Commit cb00ccd7 authored by Dmitry I. Lyakh

Fixed the MPI-enabled interface. MPI is no longer a public dependency:

Option 1: The application does not initialize MPI; ExaTN does.
Option 2: The application initializes MPI itself; ExaTN receives the communicator via a proxy type.
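A minimal application-side sketch of the two options (assuming an MPI_ENABLED build; modeled on the updated test drivers in this commit). Under Option 1 the application never includes mpi.h: it only calls exatn::initialize() and exatn::finalize(), and ExaTN performs MPI_Init_thread / MPI_Finalize internally. Under Option 2 the application owns MPI and hands ExaTN the communicator through the MPICommProxy wrapper:

#include "exatn.hpp"
#include "mpi.h"
#include <cassert>

int main(int argc, char **argv) {
  // Option 2: the application initializes MPI itself (MPI_THREAD_MULTIPLE is required).
  int thread_provided;
  int mpi_error = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &thread_provided);
  assert(mpi_error == MPI_SUCCESS && thread_provided == MPI_THREAD_MULTIPLE);

  MPI_Comm global_comm = MPI_COMM_WORLD;
  exatn::initialize(exatn::MPICommProxy(&global_comm)); // ExaTN will not re-initialize MPI

  // ... create and process tensors / tensor networks ...

  exatn::finalize();          // ExaTN does not call MPI_Finalize here (it did not initialize MPI)
  mpi_error = MPI_Finalize(); assert(mpi_error == MPI_SUCCESS);
  return 0;
}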
parent fbbd2f5e
Pipeline #92959 passed with stage in 4 minutes and 25 seconds
......@@ -291,7 +291,7 @@ void create_exatn_py_module(py::module &m) {
py::class_<exatn::NumServer, std::shared_ptr<exatn::NumServer>>(
m, "NumServer", "")
.def(py::init<>())
.def(py::init<const exatn::MPICommProxy &>())
.def("registerTensorMethod", &exatn::NumServer::registerTensorMethod, "")
.def("getTensorMethod", &exatn::NumServer::getTensorMethod, "")
.def("registerExternalData", &exatn::NumServer::registerExternalData, "")
......@@ -547,19 +547,25 @@ void create_exatn_py_module(py::module &m) {
/**
ExaTN module definitions
*/
m.def("Initialize", (void (*)()) & exatn::initialize,
m.def("Initialize",
[](){return exatn::initialize();},
"Initialize the exatn framework.");
m.def("Initialize",
[](const exatn::MPICommProxy & communicator){
return exatn::initialize(communicator);
},
"Initialize the exatn framework.");
m.def(
"getDriverClient",
[](const std::string name) -> std::shared_ptr<exatn::rpc::DriverClient> {
return exatn::getService<exatn::rpc::DriverClient>(name);
},
"");
m.def("Finalize", &exatn::finalize, "Finalize the framework");
m.def(
"getNumServer", []() { return exatn::numericalServer; },
py::return_value_policy::reference, "");
m.def("getDriverClient",
[](const std::string name) -> std::shared_ptr<exatn::rpc::DriverClient> {
return exatn::getService<exatn::rpc::DriverClient>(name);
},
"");
m.def("Finalize", &exatn::finalize, "Finalize the framework.");
m.def("getNumServer", []() { return exatn::numericalServer; },
py::return_value_policy::reference, "");
m.def(
"createVectorSpace",
......
......@@ -9,16 +9,17 @@
namespace exatn {
#ifdef MPI_ENABLED
void initialize(MPICommProxy & communicator,
void initialize(const MPICommProxy & communicator,
const std::string & graph_executor_name,
const std::string & node_executor_name)
{
if(!exatnFrameworkInitialized){
serviceRegistry->initialize();
exatnFrameworkInitialized = true;
exatnInitializedMPI = false;
//std::cout << "#DEBUG(exatn): ExaTN services initialized" << std::endl << std::flush;
numericalServer = std::make_shared<NumServer>(communicator,graph_executor_name,node_executor_name);
//std::cout << "#DEBUG(exatn): ExaTN numerical server initialized" << std::endl << std::flush;
//std::cout << "#DEBUG(exatn): ExaTN numerical server initialized with MPI" << std::endl << std::flush;
}
return;
}
......@@ -33,11 +34,19 @@ void initialize(const std::string & graph_executor_name,
exatnFrameworkInitialized = true;
//std::cout << "#DEBUG(exatn): ExaTN services initialized" << std::endl << std::flush;
#ifdef MPI_ENABLED
numericalServer = std::make_shared<NumServer>(???,graph_executor_name,node_executor_name);
int thread_provided;
int mpi_error = MPI_Init_thread(NULL,NULL,MPI_THREAD_MULTIPLE,&thread_provided);
assert(mpi_error == MPI_SUCCESS);
assert(thread_provided == MPI_THREAD_MULTIPLE);
exatnInitializedMPI = true;
MPI_Comm global_comm = MPI_COMM_WORLD;
numericalServer = std::make_shared<NumServer>(MPICommProxy(&global_comm),
graph_executor_name,node_executor_name);
//std::cout << "#DEBUG(exatn): ExaTN numerical server initialized with MPI" << std::endl << std::flush;
#else
numericalServer = std::make_shared<NumServer>(graph_executor_name,node_executor_name);
#endif
//std::cout << "#DEBUG(exatn): ExaTN numerical server initialized" << std::endl << std::flush;
#endif
}
return;
}
......@@ -50,8 +59,16 @@ bool isInitialized() {
void finalize() {
numericalServer.reset();
exatnFrameworkInitialized = false;
//std::cout << "#DEBUG(exatn): ExaTN numerical server shut down" << std::endl << std::flush;
if(exatnFrameworkInitialized){
#ifdef MPI_ENABLED
if(exatnInitializedMPI){
int mpi_error = MPI_Finalize(); assert(mpi_error == MPI_SUCCESS);
exatnInitializedMPI = false;
}
#endif
exatnFrameworkInitialized = false;
//std::cout << "#DEBUG(exatn): ExaTN numerical server shut down" << std::endl << std::flush;
}
return;
}
......
......@@ -11,7 +11,7 @@ namespace exatn {
/** Initializes ExaTN **/
#ifdef MPI_ENABLED
void initialize(MPICommProxy & communicator, //MPI communicator proxy
void initialize(const MPICommProxy & communicator, //MPI communicator proxy
const std::string & graph_executor_name = "eager-dag-executor", //DAG executor kind
const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
#endif
......
......@@ -3,6 +3,7 @@
namespace exatn {
bool exatnFrameworkInitialized = false;
bool exatnInitializedMPI = false;
std::shared_ptr<ServiceRegistry> serviceRegistry = std::make_shared<ServiceRegistry>();
......
......@@ -11,6 +11,7 @@
namespace exatn {
extern bool exatnFrameworkInitialized;
extern bool exatnInitializedMPI;
extern std::shared_ptr<ServiceRegistry> serviceRegistry;
template <typename Service>
......
......@@ -24,7 +24,7 @@ std::shared_ptr<NumServer> numericalServer {nullptr}; //initialized by exatn::in
#ifdef MPI_ENABLED
NumServer::NumServer(MPICommProxy & communicator,
NumServer::NumServer(const MPICommProxy & communicator,
const std::string & graph_executor_name,
const std::string & node_executor_name):
contr_seq_optimizer_("dummy"),
......@@ -69,7 +69,7 @@ NumServer::~NumServer()
#ifdef MPI_ENABLED
void NumServer::reconfigureTensorRuntime(MPICommProxy & communicator,
void NumServer::reconfigureTensorRuntime(const MPICommProxy & communicator,
const std::string & dag_executor_name,
const std::string & node_executor_name)
{
......
......@@ -92,7 +92,7 @@ class NumServer final {
public:
#ifdef MPI_ENABLED
NumServer(MPICommProxy & communicator, //MPI communicator proxy
NumServer(const MPICommProxy & communicator, //MPI communicator proxy
const std::string & graph_executor_name = "eager-dag-executor", //DAG executor kind
const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
#else
......@@ -107,7 +107,7 @@ public:
/** Reconfigures tensor runtime implementation. **/
#ifdef MPI_ENABLED
void reconfigureTensorRuntime(MPICommProxy & communicator,
void reconfigureTensorRuntime(const MPICommProxy & communicator,
const std::string & dag_executor_name,
const std::string & node_executor_name);
#else
......
......@@ -907,9 +907,15 @@ TEST(NumServerTester, EigenNumServer)
int main(int argc, char **argv) {
#ifdef MPI_ENABLED
int mpi_error = MPI_Init(&argc, &argv); assert(mpi_error == MPI_SUCCESS);
#endif
int thread_provided;
int mpi_error = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &thread_provided);
assert(mpi_error == MPI_SUCCESS);
assert(thread_provided == MPI_THREAD_MULTIPLE);
MPI_Comm global_comm = MPI_COMM_WORLD;
exatn::initialize(MPICommProxy(&global_comm));
#else
exatn::initialize();
#endif
::testing::InitGoogleTest(&argc, argv);
auto ret = RUN_ALL_TESTS();
......
......@@ -2,10 +2,6 @@
#include <gtest/gtest.h>
#include "exatn.hpp"
#ifdef MPI_ENABLED
#include "mpi.h"
#endif
using namespace exatn::parser;
TEST(TAProLInterpreterTester, checkSimple) {
......@@ -35,17 +31,11 @@ end scope main
}
int main(int argc, char **argv) {
#ifdef MPI_ENABLED
int mpi_error = MPI_Init(&argc, &argv); assert(mpi_error == MPI_SUCCESS);
#endif
exatn::initialize();
::testing::InitGoogleTest(&argc, argv);
auto ret = RUN_ALL_TESTS();
exatn::finalize();
#ifdef MPI_ENABLED
mpi_error = MPI_Finalize(); assert(mpi_error == MPI_SUCCESS);
#endif
return ret;
}
/** ExaTN: MPI proxy types
/** ExaTN: MPI Communicator Proxy
REVISION: 2020/03/10
Copyright (C) 2018-2020 Dmitry I. Lyakh (Liakh)
Copyright (C) 2018-2020 Oak Ridge National Laboratory (UT-Battelle) **/
#ifndef EXATN_MPI_PROXY_HPP_
#define EXATN_MPI_PROXY_HPP_
#ifndef EXATN_MPI_COMM_PROXY_HPP_
#define EXATN_MPI_COMM_PROXY_HPP_
namespace exatn {
class MPICommProxy{
class MPICommProxy {
public:
template<typename MPITypeName>
MPICommProxy(MPITypeName * mpi_object_ptr):
object_(static_cast<void*>(mpi_object_ptr)) {}
template<typename MPICommType>
MPICommProxy(MPICommType * mpi_comm_ptr): mpi_comm_ptr_(static_cast<void*>(mpi_comm_ptr)) {}
MPICommProxy(const MPICommProxy &) = default;
MPICommProxy & operator=(const MPICommProxy &) = default;
......@@ -22,15 +21,15 @@ public:
MPICommProxy & operator=(MPICommProxy &&) noexcept = default;
~MPICommProxy() = default;
bool isEmpty() const {return (object_ == nullptr);}
bool isEmpty() const {return (mpi_comm_ptr_ == nullptr);}
template<typename MPITypeName>
MPITypeName * get(){return static_cast<MPITypeName*>(object_);}
template<typename MPICommType>
MPICommType * get() const {return static_cast<MPICommType*>(mpi_comm_ptr_);}
private:
void * object_;
void * mpi_comm_ptr_; //weak non-owning pointer to an MPI communicator
};
} //namespace exatn
#endif //EXATN_MPI_PROXY_HPP_
#endif //EXATN_MPI_COMM_PROXY_HPP_
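For reference, a short, illustrative round trip through the proxy (the helper name below is hypothetical; an MPI_ENABLED translation unit with mpi.h and the MPICommProxy declaration in scope is assumed). The proxy only type-erases a non-owning pointer, so public ExaTN headers never need to include mpi.h, but the wrapped communicator must outlive the proxy:

#include "mpi.h"
#include <cassert>

void proxy_round_trip() {                    // hypothetical helper, for illustration only
  MPI_Comm my_comm = MPI_COMM_WORLD;         // communicator owned by the caller
  exatn::MPICommProxy proxy(&my_comm);       // wraps the raw pointer as void*, takes no ownership
  assert(!proxy.isEmpty());
  MPI_Comm comm = *(proxy.get<MPI_Comm>());  // MPI-aware code casts the pointer back
  int rank = 0;
  MPI_Comm_rank(comm, &rank);
}

The TensorRuntime constructor in the next hunk uses exactly this get<MPI_Comm>() pattern to query the process count and rank.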
......@@ -21,15 +21,17 @@ namespace exatn {
namespace runtime {
#ifdef MPI_ENABLED
TensorRuntime::TensorRuntime(MPICommProxy & communicator,
static MPI_Comm global_mpi_comm; //MPI communicator used to initialize the tensor runtime
TensorRuntime::TensorRuntime(const MPICommProxy & communicator,
const std::string & graph_executor_name,
const std::string & node_executor_name):
mpi_comm_(communicator),
graph_executor_name_(graph_executor_name), node_executor_name_(node_executor_name),
current_dag_(nullptr), executing_(false), alive_(false)
{
int mpi_error = MPI_Comm_size(*(mpi_comm_.get<MPI_Comm>()),&num_processes_); assert(mpi_error == MPI_SUCCESS);
mpi_error = MPI_Comm_rank(*(mpi_comm_.get<MPI_Comm>()),&process_rank_); assert(mpi_error == MPI_SUCCESS);
global_mpi_comm = *(communicator.get<MPI_Comm>());
int mpi_error = MPI_Comm_size(global_mpi_comm,&num_processes_); assert(mpi_error == MPI_SUCCESS);
mpi_error = MPI_Comm_rank(global_mpi_comm,&process_rank_); assert(mpi_error == MPI_SUCCESS);
graph_executor_ = exatn::getService<TensorGraphExecutor>(graph_executor_name_);
std::cout << "#DEBUG(exatn::runtime::TensorRuntime)[MAIN_THREAD:Process " << process_rank_
<< "]: DAG executor set to " << graph_executor_name_ << " + "
......
......@@ -69,7 +69,7 @@ class TensorRuntime final {
public:
#ifdef MPI_ENABLED
TensorRuntime(MPICommProxy & communicator, //MPI communicator proxy
TensorRuntime(const MPICommProxy & communicator, //MPI communicator proxy
const std::string & graph_executor_name = "eager-dag-executor", //DAG executor kind
const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
#else
......@@ -156,10 +156,6 @@ private:
inline void lockDataReqQ(){data_req_mtx_.lock();}
inline void unlockDataReqQ(){data_req_mtx_.unlock();}
#ifdef MPI_ENABLED
/** MPI communicator proxy **/
MPICommProxy mpi_comm_;
#endif
/** Tensor graph (DAG) executor name **/
std::string graph_executor_name_;
/** Tensor graph (DAG) node executor name **/
......
......@@ -13,10 +13,6 @@
#include <gtest/gtest.h>
#include "exatn.hpp"
#ifdef MPI_ENABLED
#include "mpi.h"
#endif
TEST(TensorRuntimeTester, checkSimple) {
using exatn::numerics::Tensor;
......@@ -72,18 +68,13 @@ TEST(TensorRuntimeTester, checkSimple) {
}
int main(int argc, char **argv) {
#ifdef MPI_ENABLED
int mpi_error = MPI_Init(&argc, &argv); assert(mpi_error == MPI_SUCCESS);
#endif
exatn::initialize();
::testing::InitGoogleTest(&argc, argv);
auto ret = RUN_ALL_TESTS();
exatn::finalize();
#ifdef MPI_ENABLED
mpi_error = MPI_Finalize(); assert(mpi_error == MPI_SUCCESS);
#endif
return ret;
}