Commit f9cf7569 authored by Dmitry I. Lyakh

exatn::NumServer is now aware of MPI parallelization.

parent 6993db50
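
For orientation, here is a minimal, hypothetical usage sketch of the MPI-aware entry points introduced in this commit. It mirrors the test drivers updated below; the "exatn.hpp" include name and the behaviour of exatn::finalize() are assumptions on the editor's part, and the executor names are simply the defaults declared in the headers.

// Editor's sketch, not part of the commit: driving the new MPI-aware initialize().
// Assumes an MPI_ENABLED build; the "exatn.hpp" header name is an assumption.
#include "exatn.hpp"
#include "mpi.h"
#include <cassert>

int main(int argc, char **argv) {
  int mpi_error = MPI_Init(&argc, &argv); assert(mpi_error == MPI_SUCCESS);
  // All three arguments are defaulted, so a plain exatn::initialize() also compiles
  // in both MPI and non-MPI builds.
  exatn::initialize(MPI_COMM_WORLD, "eager-dag-executor", "talsh-node-executor");
  assert(exatn::isInitialized());
  // ... issue tensor operations through the numerical server ...
  exatn::finalize();          // assumed unchanged by this commit
  mpi_error = MPI_Finalize(); assert(mpi_error == MPI_SUCCESS);
  return 0;
}
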
......@@ -4,16 +4,34 @@
namespace exatn {
void initialize() {
#ifdef MPI_ENABLED
void initialize(MPI_Comm communicator,
const std::string & graph_executor_name,
const std::string & node_executor_name)
{
if(!exatnFrameworkInitialized){
serviceRegistry->initialize();
exatnFrameworkInitialized = true;
//std::cout << "#DEBUG(exatn): ExaTN services initialized" << std::endl << std::flush;
numericalServer = std::make_shared<NumServer>();
numericalServer = std::make_shared<NumServer>(communicator,graph_executor_name,node_executor_name);
//std::cout << "#DEBUG(exatn): ExaTN numerical server initialized" << std::endl << std::flush;
}
return;
}
#else
void initialize(const std::string & graph_executor_name,
const std::string & node_executor_name)
{
if(!exatnFrameworkInitialized){
serviceRegistry->initialize();
exatnFrameworkInitialized = true;
//std::cout << "#DEBUG(exatn): ExaTN services initialized" << std::endl << std::flush;
numericalServer = std::make_shared<NumServer>(graph_executor_name,node_executor_name);
//std::cout << "#DEBUG(exatn): ExaTN numerical server initialized" << std::endl << std::flush;
}
return;
}
#endif
bool isInitialized() {
......
......@@ -10,7 +10,14 @@
namespace exatn {
/** Initializes ExaTN **/
void initialize();
#ifdef MPI_ENABLED
void initialize(MPI_Comm communicator = MPI_COMM_WORLD, //MPI communicator
const std::string & graph_executor_name = "eager-dag-executor", //DAG executor kind
const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
#else
void initialize(const std::string & graph_executor_name = "eager-dag-executor", //DAG executor kind
const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
#endif
/** Returns whether or not ExaTN has been initialized **/
bool isInitialized();
......
/** ExaTN::Numerics: Numerical server
REVISION: 2020/01/17
REVISION: 2020/02/27
Copyright (C) 2018-2020 Dmitry I. Lyakh (Liakh)
Copyright (C) 2018-2020 Oak Ridge National Laboratory (UT-Battelle) **/
......@@ -19,13 +19,32 @@ namespace exatn{
std::shared_ptr<NumServer> numericalServer {nullptr}; //initialized by exatn::initialize()
NumServer::NumServer():
contr_seq_optimizer_("dummy"), tensor_rt_(std::make_shared<runtime::TensorRuntime>())
#ifdef MPI_ENABLED
NumServer::NumServer(MPI_Comm communicator,
const std::string & graph_executor_name,
const std::string & node_executor_name):
contr_seq_optimizer_("dummy"),
tensor_rt_(std::make_shared<runtime::TensorRuntime>(communicator,graph_executor_name,node_executor_name))
{
int mpi_error = MPI_Comm_size(communicator,&num_processes_); assert(mpi_error == MPI_SUCCESS);
mpi_error = MPI_Comm_rank(communicator,&process_rank_); assert(mpi_error == MPI_SUCCESS);
tensor_op_factory_ = TensorOpFactory::get();
scopes_.push(std::pair<std::string,ScopeId>{"GLOBAL",0}); //GLOBAL scope 0 is automatically open (top scope)
tensor_rt_->openScope("GLOBAL");
}
#else
NumServer::NumServer(const std::string & graph_executor_name,
const std::string & node_executor_name):
contr_seq_optimizer_("dummy"),
tensor_rt_(std::make_shared<runtime::TensorRuntime>(graph_executor_name,node_executor_name))
{
num_processes_ = 1; process_rank_ = 0;
tensor_op_factory_ = TensorOpFactory::get();
scopes_.push(std::pair<std::string,ScopeId>{"GLOBAL",0}); //GLOBAL scope 0 is automatically open (top scope)
tensor_rt_->openScope("GLOBAL");
}
#endif
NumServer::~NumServer()
{
......@@ -42,12 +61,25 @@ NumServer::~NumServer()
scopes_.pop();
}
#ifdef MPI_ENABLED
void NumServer::reconfigureTensorRuntime(MPI_Comm communicator,
const std::string & dag_executor_name,
const std::string & node_executor_name)
{
bool synced = tensor_rt_->sync(); assert(synced);
tensor_rt_ = std::move(std::make_shared<runtime::TensorRuntime>(communicator,dag_executor_name,node_executor_name));
return;
}
#else
void NumServer::reconfigureTensorRuntime(const std::string & dag_executor_name,
const std::string & node_executor_name)
{
bool synced = tensor_rt_->sync(); assert(synced);
tensor_rt_ = std::move(std::make_shared<runtime::TensorRuntime>(dag_executor_name,node_executor_name));
return;
}
#endif
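
As an illustration of the reconfiguration path above (not part of the commit): a client holding the global numerical server could, after a sync point, swap in a fresh runtime bound to a communicator. The executor names are the defaults from this commit; access through exatn::numericalServer is an assumption.

// Editor's sketch (MPI_ENABLED build assumed); exatn::numericalServer access is an assumption.
auto server = exatn::numericalServer;            // shared_ptr<NumServer>
server->reconfigureTensorRuntime(MPI_COMM_WORLD,
                                 "eager-dag-executor",   // DAG executor kind
                                 "talsh-node-executor"); // DAG node executor kind
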
void NumServer::resetContrSeqOptimizer(const std::string & optimizer_name)
{
......
/** ExaTN::Numerics: Numerical server
REVISION: 2020/01/17
REVISION: 2020/02/27
Copyright (C) 2018-2020 Dmitry I. Lyakh (Liakh)
Copyright (C) 2018-2020 Oak Ridge National Laboratory (UT-Battelle) **/
......@@ -91,7 +91,14 @@ class NumServer final {
public:
NumServer();
#ifdef MPI_ENABLED
NumServer(MPI_Comm communicator = MPI_COMM_WORLD, //MPI communicator
const std::string & graph_executor_name = "eager-dag-executor", //DAG executor kind
const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
#else
NumServer(const std::string & graph_executor_name = "eager-dag-executor", //DAG executor kind
const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
#endif
NumServer(const NumServer &) = delete;
NumServer & operator=(const NumServer &) = delete;
NumServer(NumServer &&) noexcept = delete;
......@@ -99,8 +106,14 @@ public:
~NumServer();
/** Reconfigures tensor runtime implementation. **/
#ifdef MPI_ENABLED
void reconfigureTensorRuntime(MPI_Comm communicator,
const std::string & dag_executor_name,
const std::string & node_executor_name);
#else
void reconfigureTensorRuntime(const std::string & dag_executor_name,
const std::string & node_executor_name);
#endif
/** Resets the tensor contraction sequence optimizer that is
invoked when evaluating tensor networks. **/
......@@ -337,6 +350,8 @@ private:
TensorOpFactory * tensor_op_factory_; //tensor operation factory (non-owning pointer)
int num_processes_; //total number of parallel processes
int process_rank_; //rank of the current parallel process
std::shared_ptr<runtime::TensorRuntime> tensor_rt_; //tensor runtime (for actual execution of tensor operations)
};
......
......@@ -902,11 +902,17 @@ TEST(NumServerTester, EigenNumServer)
int main(int argc, char **argv) {
#ifdef MPI_ENABLED
int mpi_error = MPI_Init(&argc, &argv); assert(mpi_error == MPI_SUCCESS);
#endif
exatn::initialize();
::testing::InitGoogleTest(&argc, argv);
auto ret = RUN_ALL_TESTS();
exatn::finalize();
#ifdef MPI_ENABLED
mpi_error = MPI_Finalize(); assert(mpi_error == MPI_SUCCESS);
#endif
return ret;
}
......@@ -29,9 +29,17 @@ end scope main)src";
}
int main(int argc, char **argv) {
#ifdef MPI_ENABLED
int mpi_error = MPI_Init(&argc, &argv); assert(mpi_error == MPI_SUCCESS);
#endif
exatn::initialize();
::testing::InitGoogleTest(&argc, argv);
auto ret = RUN_ALL_TESTS();
exatn::finalize();
#ifdef MPI_ENABLED
mpi_error = MPI_Finalize(); assert(mpi_error == MPI_SUCCESS);
#endif
return ret;
}
\ No newline at end of file
}
......@@ -16,16 +16,35 @@ Copyright (C) 2018-2020 Oak Ridge National Laboratory (UT-Battelle)
namespace exatn {
namespace runtime {
#ifdef MPI_ENABLED
TensorRuntime::TensorRuntime(MPI_Comm communicator,
const std::string & graph_executor_name,
const std::string & node_executor_name):
communicator_(communicator),
graph_executor_name_(graph_executor_name), node_executor_name_(node_executor_name),
current_dag_(nullptr), executing_(false), alive_(false)
{
int mpi_error = MPI_Comm_size(communicator,&num_processes_); assert(mpi_error == MPI_SUCCESS);
mpi_error = MPI_Comm_rank(communicator,&process_rank_); assert(mpi_error == MPI_SUCCESS);
graph_executor_ = exatn::getService<TensorGraphExecutor>(graph_executor_name_);
std::cout << "#DEBUG(exatn::runtime::TensorRuntime)[MAIN_THREAD:Process " << process_rank_
<< "]: DAG executor set to " << graph_executor_name_ << " + "
<< node_executor_name_ << std::endl << std::flush;
launchExecutionThread();
}
#else
TensorRuntime::TensorRuntime(const std::string & graph_executor_name,
const std::string & node_executor_name):
graph_executor_name_(graph_executor_name), node_executor_name_(node_executor_name),
current_dag_(nullptr), executing_(false), alive_(false)
{
num_processes_ = 1; process_rank_ = 0;
graph_executor_ = exatn::getService<TensorGraphExecutor>(graph_executor_name_);
// std::cout << "#DEBUG(exatn::runtime::TensorRuntime)[MAIN_THREAD]: DAG executor set to "
// << graph_executor_name_ << " + " << node_executor_name_ << std::endl << std::flush;
std::cout << "#DEBUG(exatn::runtime::TensorRuntime)[MAIN_THREAD]: DAG executor set to "
<< graph_executor_name_ << " + " << node_executor_name_ << std::endl << std::flush;
launchExecutionThread();
}
#endif
TensorRuntime::~TensorRuntime()
......@@ -39,12 +58,6 @@ TensorRuntime::~TensorRuntime()
}
void TensorRuntime::enableParallelExecution(MPI_Comm communicator)
{
return;
}
void TensorRuntime::launchExecutionThread()
{
if(!(alive_.load())){
......
......@@ -49,7 +49,9 @@ Rationale:
#include "tensor_operation.hpp"
#include "tensor_method.hpp"
#ifdef MPI_ENABLED
#include "mpi.h"
#endif
#include <map>
#include <list>
......@@ -67,17 +69,21 @@ namespace runtime {
class TensorRuntime final {
public:
#ifdef MPI_ENABLED
TensorRuntime(MPI_Comm communicator = MPI_COMM_WORLD, //MPI communicator
const std::string & graph_executor_name = "eager-dag-executor", //DAG executor kind
const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
#else
TensorRuntime(const std::string & graph_executor_name = "eager-dag-executor", //DAG executor kind
const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
#endif
TensorRuntime(const TensorRuntime &) = delete;
TensorRuntime & operator=(const TensorRuntime &) = delete;
TensorRuntime(TensorRuntime &&) noexcept = delete;
TensorRuntime & operator=(TensorRuntime &&) noexcept = delete;
~TensorRuntime();
/** Sets up the parallel distributed configuration. **/
void enableParallelExecution(MPI_Comm communicator);
/** Resets the logging level (0:none) [MAIN THREAD]. **/
void resetLoggingLevel(int level = 0);
......@@ -152,10 +158,18 @@ private:
inline void lockDataReqQ(){data_req_mtx_.lock();}
inline void unlockDataReqQ(){data_req_mtx_.unlock();}
#ifdef MPI_ENABLED
/** MPI communicator **/
MPI_Comm communicator_;
#endif
/** Tensor graph (DAG) executor name **/
std::string graph_executor_name_;
/** Tensor graph (DAG) node executor name **/
std::string node_executor_name_;
/** Total number of parallel processes **/
int num_processes_;
/** Rank of the current parallel process **/
int process_rank_;
/** Current tensor graph (DAG) executor **/
std::shared_ptr<TensorGraphExecutor> graph_executor_;
/** Active execution graphs (DAGs) **/
......
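
For completeness, a hedged sketch of what the defaulted TensorRuntime constructors declared above imply for direct construction; normally this object is created inside NumServer rather than by client code.

// Editor's sketch, not part of the commit.
// MPI_ENABLED build: binds to MPI_COMM_WORLD and queries communicator size/rank.
// Non-MPI build: num_processes_ = 1, process_rank_ = 0, per the constructors above.
auto rt = std::make_shared<exatn::runtime::TensorRuntime>(); // defaults: "eager-dag-executor" + "talsh-node-executor"
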
......@@ -69,9 +69,17 @@ TEST(TensorRuntimeTester, checkSimple) {
}
int main(int argc, char **argv) {
#ifdef MPI_ENABLED
int mpi_error = MPI_Init(&argc, &argv); assert(mpi_error == MPI_SUCCESS);
#endif
exatn::initialize();
::testing::InitGoogleTest(&argc, argv);
auto ret = RUN_ALL_TESTS();
exatn::finalize();
#ifdef MPI_ENABLED
mpi_error = MPI_Finalize(); assert(mpi_error == MPI_SUCCESS);
#endif
return ret;
}