Commit edacaf70 authored by Dmitry I. Lyakh

Finished CuQuantum executor backend integration, except final allreduce.


Signed-off-by: Dmitry I. Lyakh <quant4me@gmail.com>
parent 037ce2d1
......@@ -9,7 +9,7 @@ ISSUES:
That is, the order of tensor operations across all participating
processes must be consistent such that every encountered global
tensor operation will receive the same tensor operand irrespective
-of the difference in the locally generated tensor name. Special
+of the difference in the locally generated tensor names. Special
care needs to be taken in iterating over associative tensor containers,
to ensure that the keys are consistent across all participating
processes. For example, automatically generated tensor names
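
The consistency requirement above is easiest to satisfy by deriving tensor names from process-independent state. A minimal sketch (the helper `nextTensorName` is hypothetical, not part of the ExaTN API):

```cpp
#include <string>

// Deterministic name generator: every process invokes it in the same order,
// so "tensor_42" denotes the same logical tensor on every MPI rank.
// Pointer- or time-based names would break this guarantee.
std::string nextTensorName() {
  static unsigned long counter = 0; // advances identically on all processes
  return "tensor_" + std::to_string(counter++);
}
```

For the same reason, iteration that drives global tensor operations should prefer ordered containers (e.g., `std::map`) over `std::unordered_map`, whose traversal order may differ between processes.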
......@@ -21,9 +21,7 @@ ISSUES:
BUGS:
- 32-bit integer MPI message chunking issue in the backend.
-- Fix the bug(s) in the tensor order reduction mechanism in the TalshExecutor backend.
+- Fix the bug(s) in the tensor order reduction mechanism in the TalshNodeExecutor backend.
FEATURES:
......@@ -39,11 +37,13 @@ FEATURES:
Contract replaced tensors, then replace the contracted
tensor with a new tensor (sub)network.
- Implement the Renormalization procedure.
+- Implement SAVE/LOAD API for TensorExpansion.
+- Implement TensorNetwork slice computing Generator.
-- Implement b-D procedure.
+- Implement bl-D procedure.
- Implement conjugate gradient optimization procedure.
......
/** ExaTN::Numerics: General client header (free function API)
-REVISION: 2021/10/30
+REVISION: 2022/01/07
-Copyright (C) 2018-2021 Dmitry I. Lyakh (Liakh)
-Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle) **/
+Copyright (C) 2018-2022 Dmitry I. Lyakh (Liakh)
+Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle) **/
/** Rationale:
1. Vector space and subspace registration [spaces.hpp, space_register.hpp]:
......@@ -1086,6 +1086,12 @@ inline std::shared_ptr<exatn::TensorNetwork> makeTensorNetwork(const std::string
// INTERNAL CONTROL API //
//////////////////////////
+/** Switches the computational backend: {"default","cuquantum"}.
+Only applies to tensor network execution. **/
+inline void switchComputationalBackend(const std::string & backend_name)
+{return numericalServer->switchComputationalBackend(backend_name);}
/** Resets the tensor contraction sequence optimizer that is invoked
when evaluating tensor networks: {dummy,heuro,greed,metis}. **/
inline void resetContrSeqOptimizer(const std::string & optimizer_name)
......
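
Client-side usage of the new backend switch, following the pattern of the cuTensorNet test further below (a hedged sketch; it assumes a build configured with CUQUANTUM, since `switchComputationalBackend` aborts on a backend name that was not compiled in):

```cpp
exatn::switchComputationalBackend("cuquantum"); // route whole-network evaluation to cuTensorNet
bool ok = exatn::evaluateTensorNetwork("net1","D(m,x,n,y)+=A(m,h,k,n)*B(u,k,h)*C(x,u,y)");
ok = ok && exatn::sync("D");                    // wait on the output tensor of the network
exatn::switchComputationalBackend("default");   // return to the native ExaTN numerical backend
```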
/** ExaTN::Numerics: Numerical server
-REVISION: 2021/12/10
+REVISION: 2022/01/07
-Copyright (C) 2018-2021 Dmitry I. Lyakh (Liakh)
-Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle) **/
+Copyright (C) 2018-2022 Dmitry I. Lyakh (Liakh)
+Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle) **/
#include "num_server.hpp"
#include "tensor_range.hpp"
......@@ -89,7 +89,8 @@ NumServer::NumServer(const MPICommProxy & communicator,
const ParamConf & parameters,
const std::string & graph_executor_name,
const std::string & node_executor_name):
-contr_seq_optimizer_("metis"), contr_seq_caching_(false), logging_(0), intra_comm_(communicator), validation_tracing_(false)
+contr_seq_optimizer_("metis"), contr_seq_caching_(false), logging_(0), comp_backend_("default"),
+intra_comm_(communicator), validation_tracing_(false)
{
int mpi_error = MPI_Comm_size(*(communicator.get<MPI_Comm>()),&num_processes_); assert(mpi_error == MPI_SUCCESS);
mpi_error = MPI_Comm_rank(*(communicator.get<MPI_Comm>()),&process_rank_); assert(mpi_error == MPI_SUCCESS);
......@@ -117,7 +118,8 @@ NumServer::NumServer(const MPICommProxy & communicator,
NumServer::NumServer(const ParamConf & parameters,
const std::string & graph_executor_name,
const std::string & node_executor_name):
-contr_seq_optimizer_("metis"), contr_seq_caching_(false), logging_(0), validation_tracing_(false)
+contr_seq_optimizer_("metis"), contr_seq_caching_(false), logging_(0), comp_backend_("default"),
+validation_tracing_(false)
{
num_processes_ = 1; process_rank_ = 0; global_process_rank_ = 0;
process_world_ = std::make_shared<ProcessGroup>(intra_comm_,num_processes_); //intra-communicator is empty here
......@@ -194,6 +196,22 @@ void NumServer::reconfigureTensorRuntime(const ParamConf & parameters,
}
#endif
+void NumServer::switchComputationalBackend(const std::string & backend_name)
+{
+bool success = sync(); assert(success);
+if(backend_name == "default"){
+comp_backend_ = backend_name;
+#ifdef CUQUANTUM
+}else if(backend_name == "cuquantum"){
+comp_backend_ = backend_name;
+#endif
+}else{
+std::cout << "#ERROR(exatn::NumServer): switchComputationalBackend: Unknown backend: " << backend_name << std::endl;
+std::abort();
+}
+return;
+}
void NumServer::resetContrSeqOptimizer(const std::string & optimizer_name, bool caching)
{
contr_seq_optimizer_ = optimizer_name;
......@@ -612,7 +630,7 @@ bool NumServer::submit(const ProcessGroup & process_group,
//Determine parallel execution configuration:
unsigned int local_rank; //local process rank within the process group
if(!process_group.rankIsIn(process_rank_,&local_rank)) return true; //process is not in the group: Do nothing
-assert(network.isValid()); //debug
+//assert(network.isValid()); //debug
unsigned int num_procs = process_group.getSize(); //number of executing processes
assert(local_rank < num_procs);
if(logging_ > 0) logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
......@@ -883,6 +901,30 @@ bool NumServer::submit(const ProcessGroup & process_group,
bool NumServer::submit(const ProcessGroup & process_group,
std::shared_ptr<TensorNetwork> network)
{
+#ifdef CUQUANTUM
+//Try execution via an alternative computational backend:
+if(comp_backend_ == "cuquantum"){
+//Determine parallel execution configuration:
+unsigned int local_rank; //local process rank within the process group
+if(!process_group.rankIsIn(process_rank_,&local_rank)) return true; //process is not in the group: Do nothing
+//assert(network->isValid()); //debug
+unsigned int num_procs = process_group.getSize(); //number of executing processes
+assert(local_rank < num_procs);
+if(logging_ > 0) logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
+<< "]: Submitting tensor network <" << network->getName() << "> (" << network->getTensor(0)->getName()
+<< ") for execution via cuQuantum by " << num_procs << " processes with memory limit "
+<< process_group.getMemoryLimitPerProcess() << " bytes" << std::endl << std::flush;
+if(logging_ > 0) network->printItFile(logfile_);
+const auto exec_handle = tensor_rt_->submit(network,process_group.getMPICommProxy(),num_procs,local_rank);
+bool success = (exec_handle != 0);
+if(success){
+auto res = tn_exec_handles_.emplace(std::make_pair(network->getTensor(0)->getTensorHash(),exec_handle));
+success = res.second;
+if(success && logging_ > 0) logfile_ << "Number of submitted networks via cuQuantum = 1" << std::endl << std::flush;
+}
+return success;
+}
+#endif
if(network) return submit(process_group,*network);
return false;
}
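
The cuQuantum branch above keys the execution handle by the hash of the network's output tensor (tensor #0); the tensor-level sync in the next hunk performs the reverse lookup. Schematically (a simplification of the code above, not a verbatim excerpt):

```cpp
// Submission side: enqueue the whole network and remember its handle.
const auto exec_handle = tensor_rt_->submit(network,process_group.getMPICommProxy(),num_procs,local_rank);
tn_exec_handles_[network->getTensor(0)->getTensorHash()] = exec_handle;

// Sync side: map the output tensor back to the pending network, then wait/test.
auto iter = tn_exec_handles_.find(tensor.getTensorHash());
bool synced = (iter == tn_exec_handles_.end()) || tensor_rt_->syncNetwork(iter->second,wait);
```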
......@@ -1030,6 +1072,14 @@ bool NumServer::sync(const ProcessGroup & process_group, const Tensor & tensor,
{
bool success = true;
if(!process_group.rankIsIn(process_rank_)) return success; //process is not in the group: Do nothing
+#ifdef CUQUANTUM
+if(comp_backend_ == "cuquantum"){
+auto iter = tn_exec_handles_.find(tensor.getTensorHash());
+bool synced = (iter == tn_exec_handles_.end());
+if(!synced) synced = tensor_rt_->syncNetwork(iter->second,wait);
+return synced;
+}
+#endif
auto iter = tensors_.find(tensor.getName());
if(iter != tensors_.end()){
if(iter->second->isComposite()){
......@@ -1081,7 +1131,11 @@ bool NumServer::sync(const ProcessGroup & process_group, TensorNetwork & network
bool NumServer::sync(bool wait)
{
-return sync(getCurrentProcessGroup(),wait);
+bool success = sync(getCurrentProcessGroup(),wait);
+#ifdef CUQUANTUM
+if(comp_backend_ == "cuquantum" && success) tn_exec_handles_.clear();
+#endif
+return success;
}
bool NumServer::sync(const ProcessGroup & process_group, bool wait)
......@@ -1092,6 +1146,9 @@ bool NumServer::sync(const ProcessGroup & process_group, bool wait)
if(success){
if(logging_ > 0) logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
<< "]: Locally synchronized all operations" << std::endl << std::flush;
+#ifdef CUQUANTUM
+if(comp_backend_ == "cuquantum") tn_exec_handles_.clear();
+#endif
#ifdef MPI_ENABLED
if(wait){
auto errc = MPI_Barrier(process_group.getMPICommProxy().getRef<MPI_Comm>());
......
/** ExaTN::Numerics: Numerical server
-REVISION: 2021/12/22
+REVISION: 2022/01/07
-Copyright (C) 2018-2021 Dmitry I. Lyakh (Liakh)
-Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle) **/
+Copyright (C) 2018-2022 Dmitry I. Lyakh (Liakh)
+Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle) **/
/** Rationale:
(a) Numerical server provides basic tensor network processing functionality:
......@@ -270,6 +270,9 @@ public:
const std::string & node_executor_name);
#endif
+/** Switches the computational backend. **/
+void switchComputationalBackend(const std::string & backend_name);
/** Resets the tensor contraction sequence optimizer that is
invoked when evaluating tensor networks. **/
void resetContrSeqOptimizer(const std::string & optimizer_name, //in: tensor contraction sequence optimizer name
......@@ -1032,25 +1035,38 @@ protected:
private:
//Spaces:
std::shared_ptr<numerics::SpaceRegister> space_register_; //register of vector spaces and their named subspaces
std::unordered_map<std::string,SpaceId> subname2id_; //maps a subspace name to its parental vector space id
//Tensors:
std::unordered_map<std::string,std::shared_ptr<Tensor>> tensors_; //registered tensors (by CREATE operation)
std::map<std::string,std::shared_ptr<Tensor>> implicit_tensors_; //tensors created implicitly by the runtime (for garbage collection)
std::unordered_map<std::string,ProcessGroup> tensor_comms_; //process group associated with each tensor
+#ifdef CUQUANTUM
+//Tensor network execution handles:
+std::unordered_map<numerics::TensorHashType,runtime::TensorOpExecHandle> tn_exec_handles_;
+#endif
//Contraction path optimizer:
std::string contr_seq_optimizer_; //tensor contraction sequence optimizer invoked when evaluating tensor networks
bool contr_seq_caching_; //regulates whether or not to cache pseudo-optimal tensor contraction orders for later reuse
//Registered external methods and data:
std::map<std::string,std::shared_ptr<TensorMethod>> ext_methods_; //external tensor methods
std::map<std::string,std::shared_ptr<BytePacket>> ext_data_; //external data
//Program scopes:
std::stack<std::pair<std::string,ScopeId>> scopes_; //TAProL scope stack: {Scope name, Scope Id}
//Tensor operation factory:
TensorOpFactory * tensor_op_factory_; //tensor operation factory (non-owning pointer)
//Configuration:
int logging_; //logging level
std::ofstream logfile_; //log file
+std::string comp_backend_; //current computational backend
int num_processes_; //total number of parallel processes in the dedicated MPI communicator
int process_rank_; //rank of the current parallel process in the dedicated MPI communicator
int global_process_rank_; //rank of the current parallel process in MPI_COMM_WORLD
......
......@@ -3807,6 +3807,8 @@ TEST(NumServerTester, CuTensorNet) {
success = exatn::initTensorRnd("C"); assert(success);
success = exatn::initTensor("D",0.0); assert(success);
+exatn::switchComputationalBackend("default");
//Contract tensor network:
int num_repeats = NUM_REPEATS;
while(--num_repeats >= 0){
......@@ -3814,8 +3816,9 @@ TEST(NumServerTester, CuTensorNet) {
std::cout << "D(m,x,n,y)+=A(m,h,k,n)*B(u,k,h)*C(x,u,y): ";
auto flops = exatn::getTotalFlopCount();
auto time_start = exatn::Timer::timeInSecHR();
-success = exatn::evaluateTensorNetworkSync("cuNet","D(m,x,n,y)+=A(m,h,k,n)*B(u,k,h)*C(x,u,y)");
+success = exatn::evaluateTensorNetwork("cuNet","D(m,x,n,y)+=A(m,h,k,n)*B(u,k,h)*C(x,u,y)");
assert(success);
+success = exatn::sync("D"); assert(success);
auto duration = exatn::Timer::timeInSecHR(time_start);
flops = exatn::getTotalFlopCount() - flops;
std::cout << "Performance = " << (flops / (1e9 * duration)) << " Gflop/s" << std::endl;
......
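
The test change replaces the blocking `evaluateTensorNetworkSync` with an asynchronous submission followed by an explicit sync on the output tensor, matching the per-tensor synchronization path added to `NumServer::sync` in this commit. The resulting timing pattern (sketch):

```cpp
auto time_start = exatn::Timer::timeInSecHR();
bool ok = exatn::evaluateTensorNetwork("cuNet","D(m,x,n,y)+=A(m,h,k,n)*B(u,k,h)*C(x,u,y)"); // non-blocking
ok = ok && exatn::sync("D");                            // block until D has been produced
auto duration = exatn::Timer::timeInSecHR(time_start);  // covers the actual execution time
```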
......@@ -16,7 +16,9 @@ add_library(${LIBRARY_NAME}
add_dependencies(${LIBRARY_NAME} exatensor-build)
target_include_directories(${LIBRARY_NAME}
-PUBLIC . graph optimizer executor ${CMAKE_SOURCE_DIR}/src/exatn executor/cuquantum)
+PUBLIC . graph optimizer executor
+${CMAKE_SOURCE_DIR}/src/exatn ${CMAKE_SOURCE_DIR}/src/utils
+executor/cuquantum)
target_link_libraries(${LIBRARY_NAME}
PUBLIC CppMicroServices exatn-utils exatn-numerics exatn-runtime-graph
......
......@@ -20,6 +20,7 @@ target_include_directories(${LIBRARY_NAME}
graph_executors/lazy
../graph
${CMAKE_SOURCE_DIR}/src/exatn
+${CMAKE_SOURCE_DIR}/src/utils
cuquantum
)
......
......@@ -14,6 +14,7 @@ set_target_properties(${LIBRARY_NAME} PROPERTIES DEFINE_SYMBOL "")
target_include_directories(${LIBRARY_NAME}
PUBLIC .
${CMAKE_SOURCE_DIR}/src/exatn
+${CMAKE_SOURCE_DIR}/src/utils
)
target_link_libraries(${LIBRARY_NAME} PUBLIC exatn-numerics)
......
/** ExaTN: Tensor Runtime: Tensor network executor: NVIDIA cuQuantum
-REVISION: 2022/01/06
+REVISION: 2022/01/07
Copyright (C) 2018-2022 Dmitry Lyakh
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)
......@@ -53,6 +53,8 @@ struct TensorDescriptor {
struct TensorNetworkReq {
TensorNetworkQueue::ExecStat exec_status = TensorNetworkQueue::ExecStat::None; //tensor network execution status
+int num_procs = 0; //total number of executing processes
+int proc_id = -1; //id of the current executing process
std::shared_ptr<numerics::TensorNetwork> network; //tensor network specification
std::unordered_map<numerics::TensorHashType, TensorDescriptor> tensor_descriptors; //tensor descriptors (shape, volume, data type, body)
std::unordered_map<unsigned int, std::vector<int32_t>> tensor_modes; //indices associated with tensor dimensions (key = original tensor id)
......@@ -110,15 +112,16 @@ struct TensorNetworkReq {
CuQuantumExecutor::CuQuantumExecutor(TensorImplFunc tensor_data_access_func,
unsigned int pipeline_depth,
-unsigned int process_rank, unsigned int num_processes):
+unsigned int num_processes, unsigned int process_rank):
tensor_data_access_func_(std::move(tensor_data_access_func)),
-pipe_depth_(pipeline_depth), process_rank_(process_rank), num_processes_(num_processes)
+pipe_depth_(pipeline_depth), num_processes_(num_processes), process_rank_(process_rank)
{
static_assert(std::is_same<cutensornetHandle_t,void*>::value,"#FATAL(exatn::runtime::CuQuantumExecutor): cutensornetHandle_t != (void*)");
const size_t version = cutensornetGetVersion();
std::cout << "#DEBUG(exatn::runtime::CuQuantumExecutor): cuTensorNet backend version " << version << std::endl;
std::cout << "#DEBUG(exatn::runtime::CuQuantumExecutor): Total number of processes = " << num_processes_ << std::endl;
int num_gpus = 0;
auto error_code = talshDeviceCount(DEV_NVIDIA_GPU,&num_gpus); assert(error_code == TALSH_SUCCESS);
for(int i = 0; i < num_gpus; ++i){
......@@ -170,6 +173,7 @@ CuQuantumExecutor::~CuQuantumExecutor()
TensorNetworkQueue::ExecStat CuQuantumExecutor::execute(std::shared_ptr<numerics::TensorNetwork> network,
+unsigned int num_processes, unsigned int process_rank,
const TensorOpExecHandle exec_handle)
{
assert(network);
......@@ -179,6 +183,8 @@ TensorNetworkQueue::ExecStat CuQuantumExecutor::execute(std::shared_ptr<numerics
auto tn_req = res.first->second;
tn_req->network = network;
tn_req->exec_status = TensorNetworkQueue::ExecStat::Idle;
+tn_req->num_procs = num_processes;
+tn_req->proc_id = process_rank;
parseTensorNetwork(tn_req); //still Idle
loadTensors(tn_req); //Idle --> Loading
if(tn_req->exec_status == TensorNetworkQueue::ExecStat::Loading){
......@@ -455,7 +461,7 @@ void CuQuantumExecutor::contractTensorNetwork(std::shared_ptr<TensorNetworkReq>
&num_slices,sizeof(num_slices)));
assert(num_slices > 0);
HANDLE_CUDA_ERROR(cudaEventRecord(tn_req->compute_start,tn_req->stream));
-for(int64_t slice_id = process_rank_; slice_id < num_slices; slice_id += num_processes_){
+for(int64_t slice_id = tn_req->proc_id; slice_id < num_slices; slice_id += tn_req->num_procs){
HANDLE_CTN_ERROR(cutensornetContraction(gpu_attr_[gpu].second.cutn_handle,
tn_req->comp_plan,
tn_req->data_in,tn_req->data_out,
......
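
The rewritten loop distributes contraction slices cyclically over the executing processes, using the per-request configuration instead of executor-wide state. For example, with `num_procs = 4` and `num_slices = 10`, process 0 computes slices {0,4,8}, process 1 {1,5,9}, process 2 {2,6}, and process 3 {3,7}; summing the per-process partial results is the final allreduce the commit message notes is still missing. A self-contained illustration of the indexing (plain C++, independent of cuTensorNet):

```cpp
#include <cstdint>
#include <cstdio>

int main() {
  const int64_t num_slices = 10; // total slices chosen by the contraction planner
  const int num_procs = 4;       // executing processes
  for (int rank = 0; rank < num_procs; ++rank) {
    std::printf("rank %d computes slices:", rank);
    // Same indexing as the loop above: start at this rank, stride by num_procs.
    for (int64_t s = rank; s < num_slices; s += num_procs)
      std::printf(" %lld", static_cast<long long>(s));
    std::printf("\n");
  }
  return 0;
}
```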
/** ExaTN: Tensor Runtime: Tensor network executor: NVIDIA cuQuantum
-REVISION: 2022/01/06
+REVISION: 2022/01/07
Copyright (C) 2018-2022 Dmitry Lyakh
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)
......@@ -42,8 +42,8 @@ public:
CuQuantumExecutor(TensorImplFunc tensor_data_access_func,
unsigned int pipeline_depth,
-unsigned int process_rank,
-unsigned int num_processes);
+unsigned int num_processes,
+unsigned int process_rank);
CuQuantumExecutor(const CuQuantumExecutor &) = delete;
CuQuantumExecutor & operator=(CuQuantumExecutor &) = delete;
......@@ -54,14 +54,16 @@ public:
/** Submits a tensor network for execution via CuQuantumExecutor.
The associated tensor network execution handle can be used
for progressing and completing the tensor network execution. **/
-TensorNetworkQueue::ExecStat execute(std::shared_ptr<numerics::TensorNetwork> network,
-const TensorOpExecHandle exec_handle);
+TensorNetworkQueue::ExecStat execute(std::shared_ptr<numerics::TensorNetwork> network, //in: tensor network
+unsigned int num_processes, //in: total number of executing processes
+unsigned int process_rank, //in: rank of the current executing process
+const TensorOpExecHandle exec_handle); //in: tensor network execution handle
/** Synchronizes on the progress of the tensor network execution.
If wait = TRUE, waits until completion, otherwise just tests the progress.
Returns the current status of the tensor network execution. **/
-TensorNetworkQueue::ExecStat sync(const TensorOpExecHandle exec_handle,
-int * error_code);
+TensorNetworkQueue::ExecStat sync(const TensorOpExecHandle exec_handle, //in: tensor network execution handle
+int * error_code); //out: error code (0:success)
/** Synchronizes execution of all submitted tensor networks to completion. **/
void sync();
......@@ -100,10 +102,10 @@ protected:
TensorImplFunc tensor_data_access_func_; //numerics::Tensor --> {tensor_body_ptr, size_in_bytes}
/** Pipeline depth **/
const unsigned int pipe_depth_;
-/** Process rank **/
-const unsigned int process_rank_;
/** Total number of parallel processes **/
const unsigned int num_processes_;
+/** Current process rank **/
+const unsigned int process_rank_;
};
} //namespace runtime
......
/** ExaTN: Tensor Runtime: Tensor network executor: Execution queue
-REVISION: 2022/01/05
+REVISION: 2022/01/07
Copyright (C) 2018-2022 Dmitry Lyakh
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)
......@@ -42,6 +42,14 @@ public:
Completed //execution completed
};
+//Tensor network entry:
+struct NetEntry {
+ExecStat exec_status = ExecStat::None; //execution status
+unsigned int num_procs = 0; //number of executing processes
+unsigned int proc_id = 0; //id of the current executing process
+MPICommProxy comm; //MPI communicator proxy
+};
using TensorNetworkQueueIterator =
std::list<std::pair<std::shared_ptr<numerics::TensorNetwork>,TensorOpExecHandle>>::iterator;
......@@ -80,10 +88,14 @@ public:
/** Appends a new tensor network to the queue (no repeats allowed).
Upon success, returns a positive execution handle, zero otherwise. **/
-TensorOpExecHandle append(std::shared_ptr<numerics::TensorNetwork> network) {
+TensorOpExecHandle append(std::shared_ptr<numerics::TensorNetwork> network,
+const MPICommProxy & communicator,
+unsigned int num_processes,
+unsigned int process_rank) {
lock();
TensorOpExecHandle tn_hash = getTensorNetworkHash(network);
-auto res = tn_exec_stat_.emplace(std::make_pair(tn_hash,ExecStat::Idle));
+auto res = tn_exec_stat_.emplace(std::make_pair(tn_hash,
+NetEntry{ExecStat::Idle,num_processes,process_rank,communicator}));
if(res.second){
networks_.emplace_back(std::make_pair(network,tn_hash));
}else{
......@@ -100,7 +112,7 @@ public:
assert(current_network_ != networks_.end());
auto iter = tn_exec_stat_.find(current_network_->second);
if(iter != tn_exec_stat_.end()){
-if(iter->second == ExecStat::Completed){
+if(iter->second.exec_status == ExecStat::Completed){
tn_exec_stat_.erase(iter);
}else{
std::cout << "#ERROR(exatn::runtime::TensorNetworkQueue): Attempt to delete an unfinished tensor network!\n";
......@@ -118,7 +130,7 @@ public:
auto exec_stat = ExecStat::None;
lock();
auto iter = tn_exec_stat_.find(exec_handle);
-if(iter != tn_exec_stat_.cend()) exec_stat = iter->second;
+if(iter != tn_exec_stat_.cend()) exec_stat = iter->second.exec_status;
unlock();
return exec_stat;
}
......@@ -131,14 +143,29 @@ public:
auto exec_stat = ExecStat::None;
lock();
auto iter = tn_exec_stat_.find(exec_handle);
-if(iter != tn_exec_stat_.cend()){
-exec_stat = iter->second;
-iter->second = new_exec_stat;
+if(iter != tn_exec_stat_.end()){
+exec_stat = iter->second.exec_status;
+iter->second.exec_status = new_exec_stat;
}
unlock();
return exec_stat;
}
+/** Returns the parallel execution configuration associated
+with the given tensor network execution handle. **/
+std::pair<int,int> getExecConfiguration(const TensorOpExecHandle exec_handle,
+MPICommProxy * communicator = nullptr) {
+std::pair<int,int> exec_conf{0,-1};
+lock();
+auto iter = tn_exec_stat_.find(exec_handle);
+if(iter != tn_exec_stat_.cend()){
+exec_conf = std::make_pair(iter->second.num_procs,iter->second.proc_id);
+if(communicator != nullptr) *communicator = iter->second.comm;
+}
+unlock();
+return exec_conf;
+}
/** Returns the constant iterator to the current tensor network. **/
ConstTensorNetworkQueueIterator getCurrent() {
return current_network_;
......@@ -186,7 +213,7 @@ public:
protected:
/** Tensor network execution status **/
-std::unordered_map<TensorOpExecHandle,ExecStat> tn_exec_stat_;
+std::unordered_map<TensorOpExecHandle,NetEntry> tn_exec_stat_;
/** Queue of tensor networks to be executed **/
std::list<std::pair<std::shared_ptr<numerics::TensorNetwork>,
TensorOpExecHandle>> networks_;
......
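
With `NetEntry` in place, each queued network carries its execution status together with its parallel configuration. A hedged lifecycle sketch using only the queue methods visible in this diff (the driver loop itself is schematic, not verbatim ExaTN code):

```cpp
// Producer (TensorRuntime::submit): record status + {num_procs, proc_id, comm}.
TensorOpExecHandle handle = queue.append(network, comm_proxy, num_procs, my_rank); // ExecStat::Idle

// Consumer (LazyGraphExecutor::execute): recover the configuration for cuQuantum.
MPICommProxy comm;
std::pair<int,int> conf = queue.getExecConfiguration(handle, &comm); // {num_procs, proc_id}
queue.updateExecStatus(handle, TensorNetworkQueue::ExecStat::Executing);
// ... CuQuantumExecutor progresses the network asynchronously ...
queue.updateExecStatus(handle, TensorNetworkQueue::ExecStat::Completed); // now removable
```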
/** ExaTN:: Tensor Runtime: Tensor graph executor: Lazy
-REVISION: 2022/01/06
+REVISION: 2022/01/07
Copyright (C) 2018-2022 Dmitry Lyakh
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)
......@@ -38,8 +38,8 @@ void LazyGraphExecutor::resetNodeExecutor(std::shared_ptr<TensorNodeExecutor> no
return data_ptr;
},
CUQUANTUM_PIPELINE_DEPTH,
-process_rank,
-num_processes
+num_processes,
+process_rank
);
}
#endif
......@@ -302,7 +302,8 @@ void LazyGraphExecutor::execute(TensorNetworkQueue & tensor_network_queue) {
assert(error_code == 0);
}
if(exec_stat == TensorNetworkQueue::ExecStat::None){
-exec_stat = cuquantum_executor_->execute(current->first,exec_handle);
+const auto exec_conf = tensor_network_queue.getExecConfiguration(exec_handle);
+exec_stat = cuquantum_executor_->execute(current->first,exec_conf.first,exec_conf.second,exec_handle);
if(exec_stat != TensorNetworkQueue::ExecStat::None){
auto prev_exec_stat = tensor_network_queue.updateExecStatus(exec_handle,exec_stat);
std::cout << "#DEBUG(exatn::runtime::LazyGraphExecutor::execute): Submitted tensor network to cuQuantum\n";
......
/** ExaTN:: Tensor Runtime: Task-based execution layer for tensor operations
-REVISION: 2022/01/06
+REVISION: 2022/01/07
Copyright (C) 2018-2022 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)
......@@ -109,7 +109,7 @@ void TensorRuntime::executionThreadWorkflow()
executing_.store(true); //reaffirm that DAG is still executing
}else{
graph_executor_->execute(tensor_network_queue_);
-executing_.store(false); //executing_ is set to FALSE by the execution thread
+if(!(current_dag_->hasUnexecutedNodes())) executing_.store(false); //executing_ is set to FALSE by the execution thread
}
}
processTensorDataRequests(); //process all outstanding client requests for tensor data (synchronous)
......@@ -292,9 +292,11 @@ bool TensorRuntime::sync(bool wait) {
#ifdef CUQUANTUM
-TensorOpExecHandle TensorRuntime::submit(std::shared_ptr<numerics::TensorNetwork> network)
+TensorOpExecHandle TensorRuntime::submit(std::shared_ptr<numerics::TensorNetwork> network,
+const MPICommProxy & communicator,
+unsigned int num_processes, unsigned int process_rank)
{
-return tensor_network_queue_.append(network);
+return tensor_network_queue_.append(network,communicator,num_processes,process_rank);
}
......
/** ExaTN:: Tensor Runtime: Task-based execution layer for tensor operations
-REVISION: 2021/12/27
+REVISION: 2022/01/07
-Copyright (C) 2018-2021 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
-Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
+Copyright (C) 2018-2022 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
+Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)
Rationale:
(a) The execution space consists of one or more DAGs in which nodes
......@@ -146,7 +146,10 @@ public:
/** Submits an entire tensor network for processing as a whole.
The returned execution handle can be used for checking the status
of the tensor network execution. Zero on return means unsuccessful submission. **/
-TensorOpExecHandle submit(std::shared_ptr<numerics::TensorNetwork> network); //in: tensor network
+TensorOpExecHandle submit(std::shared_ptr<numerics::TensorNetwork> network, //in: tensor network
+const MPICommProxy & communicator, //in: MPI communicator proxy
+unsigned int num_processes, //in: number of executing processes
+unsigned int process_rank); //in: rank of the current executing process
/** Tests for completion of processing of a whole tensor network.
A valid execution handle obtained during tensor network
......
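
Caller-side view of the extended `TensorRuntime::submit`, mirroring how `NumServer::submit` invokes it earlier in this commit (a sketch; `process_group` is assumed to be an `exatn::ProcessGroup` the calling process belongs to):

```cpp
unsigned int local_rank = 0;
if (process_group.rankIsIn(process_rank, &local_rank)) {
  TensorOpExecHandle handle = tensor_rt->submit(network,
                                                process_group.getMPICommProxy(), // communicator proxy
                                                process_group.getSize(),         // executing processes
                                                local_rank);                     // rank within the group
  bool submitted = (handle != 0); // zero means the network was already queued (no repeats allowed)
}
```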