Commit 12b87ac9 authored by Dmitry I. Lyakh

Fixed a few bugs; memory corruption still present ...


Signed-off-by: Dmitry I. Lyakh <quant4me@gmail.com>
parent da2a00d4
/** ExaTN::Numerics: General client header (free function API)
REVISION: 2022/01/07
REVISION: 2022/01/08
Copyright (C) 2018-2022 Dmitry I. Lyakh (Liakh)
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle) **/
......@@ -866,12 +866,12 @@ inline bool evaluateSync(const ProcessGroup & process_group, //in: chosen group
/** Synchronizes all outstanding update operations on a given tensor specified by
its symbolic name. If ProcessGroup is not provided, defaults to the local process.**/
inline bool sync(const std::string & name, //in: tensor name
bool wait = true) //in: wait versus test for completion
bool wait) //in: wait versus test for completion
{return numericalServer->sync(name,wait);}
inline bool sync(const ProcessGroup & process_group, //in: chosen group of MPI processes
const std::string & name, //in: tensor name
bool wait = true) //in: wait versus test for completion
bool wait) //in: wait versus test for completion
{return numericalServer->sync(process_group,name,wait);}
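Reviewer note: with the signature change in this hunk the free-function sync() appears to drop its default wait = true argument, so callers now pass the flag explicitly. A minimal usage sketch (assumes ExaTN has been initialized via the usual exatn.hpp client header and that a tensor named "T" already exists; both are assumptions, not taken from this diff):

```cpp
#include "exatn.hpp"  // assumed client header exposing the free-function API
#include <cassert>

void waitForTensorUpdates()
{
 // Block until all outstanding update operations on tensor "T" have completed:
 bool done = exatn::sync("T", true);
 assert(done);
 // With wait = false the call only tests for completion instead of blocking:
 bool completed = exatn::sync("T", false);
 (void)completed;
}
```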
......
/** ExaTN::Numerics: Numerical server
REVISION: 2022/01/07
REVISION: 2022/01/08
Copyright (C) 2018-2022 Dmitry I. Lyakh (Liakh)
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle) **/
......@@ -198,10 +198,10 @@ void NumServer::reconfigureTensorRuntime(const ParamConf & parameters,
void NumServer::switchComputationalBackend(const std::string & backend_name)
{
bool success = tensor_rt_->sync(); assert(success);
//bool success = sync(); assert(success);
if(logging_ > 0 && backend_name != comp_backend_){
logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
<< "]: Switching computational backend to " << backend_name << std::endl << std::flush;
<< "]: Switched computational backend to " << backend_name << std::endl << std::flush;
}
if(backend_name == "default"){
comp_backend_ = backend_name;
......@@ -210,7 +210,8 @@ void NumServer::switchComputationalBackend(const std::string & backend_name)
comp_backend_ = backend_name;
#endif
}else{
std::cout << "#ERROR(exatn::NumServer): switchComputationalBackend: Unknown backend: " << backend_name << std::endl;
std::cout << "#ERROR(exatn::NumServer): switchComputationalBackend: Unknown backend: "
<< backend_name << std::endl << std::flush;
std::abort();
}
return;
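Reviewer note: per this hunk, switchComputationalBackend() first synchronizes the tensor runtime, logs the switch, and aborts on any backend name other than "default" or (when built with CUQUANTUM) "cuquantum". A usage sketch mirroring the CuTensorNet test further down (tensor names and the network label are placeholders; exatn.hpp is the assumed client header):

```cpp
#include "exatn.hpp"  // assumed client header
#include <cassert>

void contractOnGpu()
{
 exatn::switchComputationalBackend("cuquantum");     // aborts if the backend name is unknown
 bool ok = exatn::evaluateTensorNetwork("cuNet",
           "D(m,x,n,y)+=A(m,h,k,n)*B(u,k,h)*C(x,u,y)");
 assert(ok);
 ok = exatn::sync("D", true); assert(ok);            // wait for the cuQuantum execution to finish
 exatn::switchComputationalBackend("default");       // switch back to the regular executor
}
```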
......@@ -916,15 +917,16 @@ bool NumServer::submit(const ProcessGroup & process_group,
assert(local_rank < num_procs);
if(logging_ > 0) logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
<< "]: Submitting tensor network <" << network->getName() << "> (" << network->getTensor(0)->getName()
<< ") for execution via cuQuantum by " << num_procs << " processes with memory limit "
<< process_group.getMemoryLimitPerProcess() << " bytes" << std::endl << std::flush;
<< ":" << getTensorNetworkHash(network) << ") for execution via cuQuantum by " << num_procs
<< " processes with memory limit " << process_group.getMemoryLimitPerProcess() << " bytes\n" << std::flush;
if(logging_ > 0) network->printItFile(logfile_);
const auto exec_handle = tensor_rt_->submit(network,process_group.getMPICommProxy(),num_procs,local_rank);
bool success = (exec_handle != 0);
if(success){
auto res = tn_exec_handles_.emplace(std::make_pair(network->getTensor(0)->getTensorHash(),exec_handle));
success = res.second;
if(success && logging_ > 0) logfile_ << "Number of submitted networks via cuQuantum = 1" << std::endl << std::flush;
if(success && logging_ > 0) logfile_ << "Execution handle of the submitted network via cuQuantum is "
<< exec_handle << std::endl << std::flush;
}
return success;
}
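Reviewer note: submit() records the pair (output tensor hash, execution handle) in tn_exec_handles_, and the updated sync() below looks the handle up by hash and erases it once the network has completed. A stripped-down sketch of that bookkeeping pattern (plain std::unordered_map with placeholder integer types; not the actual ExaTN definitions):

```cpp
#include <cstdint>
#include <unordered_map>

using TensorHash = std::uint64_t;   // stand-in for the tensor hash type
using ExecHandle = std::uint64_t;   // stand-in for TensorOpExecHandle

std::unordered_map<TensorHash, ExecHandle> exec_handles;

void onSubmit(TensorHash output_tensor, ExecHandle handle)
{
 exec_handles.emplace(output_tensor, handle);   // remember the in-flight network
}

bool onSync(TensorHash output_tensor)
{
 auto it = exec_handles.find(output_tensor);
 if(it == exec_handles.end()) return true;      // nothing outstanding for this tensor
 // ... synchronize the network identified by it->second here ...
 exec_handles.erase(it);                        // drop the entry once the network has completed
 return true;
}
```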
......@@ -1076,16 +1078,25 @@ bool NumServer::sync(const ProcessGroup & process_group, const Tensor & tensor,
{
bool success = true;
if(!process_group.rankIsIn(process_rank_)) return success; //process is not in the group: Do nothing
#ifdef CUQUANTUM
if(comp_backend_ == "cuquantum"){
auto iter = tn_exec_handles_.find(tensor.getTensorHash());
bool synced = (iter == tn_exec_handles_.end());
if(!synced) synced = tensor_rt_->syncNetwork(iter->second,wait);
return synced;
}
#endif
auto iter = tensors_.find(tensor.getName());
if(iter != tensors_.end()){
#ifdef CUQUANTUM
if(comp_backend_ == "cuquantum"){
auto cuter = tn_exec_handles_.find(iter->second->getTensorHash());
success = (cuter == tn_exec_handles_.end());
if(!success){
success = tensor_rt_->syncNetwork(cuter->second,wait);
if(success){
if(logging_ > 0) logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
<< "]: Locally synchronized cuQuantum execution handle " << cuter->second << " via tensor <" << tensor.getName() << ">"
<< std::endl << std::flush;
tn_exec_handles_.erase(cuter);
}
}
return success;
}
#endif
if(iter->second->isComposite()){
auto composite_tensor = castTensorComposite(iter->second); assert(composite_tensor);
for(auto subtens = composite_tensor->begin(); subtens != composite_tensor->end(); ++subtens){
......
......@@ -3791,7 +3791,7 @@ TEST(NumServerTester, CuTensorNet) {
const int NUM_REPEATS = 1;
exatn::resetLoggingLevel(1,2); //debug
exatn::resetLoggingLevel(2,2); //debug
bool success = true;
......@@ -3807,23 +3807,24 @@ TEST(NumServerTester, CuTensorNet) {
success = exatn::initTensorRnd("C"); assert(success);
success = exatn::initTensor("D",0.0); assert(success);
exatn::switchComputationalBackend("default");
success = exatn::sync(); assert(success);
exatn::switchComputationalBackend("cuquantum");
//Contract tensor network:
int num_repeats = NUM_REPEATS;
while(--num_repeats >= 0){
success = exatn::sync(); assert(success);
std::cout << "D(m,x,n,y)+=A(m,h,k,n)*B(u,k,h)*C(x,u,y): ";
auto flops = exatn::getTotalFlopCount();
auto time_start = exatn::Timer::timeInSecHR();
success = exatn::evaluateTensorNetwork("cuNet","D(m,x,n,y)+=A(m,h,k,n)*B(u,k,h)*C(x,u,y)");
assert(success);
success = exatn::sync("D"); assert(success);
success = exatn::evaluateTensorNetwork("cuNet","D(m,x,n,y)+=A(m,h,k,n)*B(u,k,h)*C(x,u,y)"); assert(success);
success = exatn::sync("D",true); assert(success);
auto duration = exatn::Timer::timeInSecHR(time_start);
flops = exatn::getTotalFlopCount() - flops;
std::cout << "Performance = " << (flops / (1e9 * duration)) << " Gflop/s" << std::endl;
std::cout << "Duration = " << duration << " s; Performance = " << (flops / (1e9 * duration)) << " Gflop/s\n";
}
//std::this_thread::sleep_for(std::chrono::microseconds(1000000));
//Destroy tensors:
success = exatn::sync(); assert(success);
success = exatn::destroyTensor("D"); assert(success);
......
/** ExaTN: Tensor basic types and parameters
REVISION: 2021/10/15
REVISION: 2022/01/07
Copyright (C) 2018-2021 Dmitry I. Lyakh (Liakh)
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle) **/
Copyright (C) 2018-2022 Dmitry I. Lyakh (Liakh)
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle) **/
#ifndef EXATN_NUMERICS_TENSOR_BASIC_HPP_
#define EXATN_NUMERICS_TENSOR_BASIC_HPP_
......@@ -63,22 +63,22 @@ enum class IndexKind{
//Basic tensor operations:
enum class TensorOpCode{
NOOP, //no operation
CREATE, //tensor creation
DESTROY, //tensor destruction
TRANSFORM, //tensor transformation/initialization
SLICE, //tensor slicing
INSERT, //tensor insertion
ADD, //tensor addition
CONTRACT, //tensor contraction
DECOMPOSE_SVD3, //tensor decomposition via SVD into three tensor factors
DECOMPOSE_SVD2, //tensor decomposition via SVD into two tensor factors
ORTHOGONALIZE_SVD, //tensor orthogonalization via SVD
ORTHOGONALIZE_MGS, //tensor orthogonalization via Modified Gram-Schmidt
FETCH, //fetch tensor data from another MPI process (parallel execution only)
UPLOAD, //upload tensor data to another MPI process (parallel execution only)
BROADCAST, //tensor broadcast (parallel execution only)
ALLREDUCE //tensor allreduce (parallel execution only)
NOOP, //0: no operation
CREATE, //1: tensor creation
DESTROY, //2: tensor destruction
TRANSFORM, //3: tensor transformation/initialization
SLICE, //4: tensor slicing
INSERT, //5: tensor insertion
ADD, //6: tensor addition
CONTRACT, //7: tensor contraction
DECOMPOSE_SVD3, //8: tensor decomposition via SVD into three tensor factors
DECOMPOSE_SVD2, //9: tensor decomposition via SVD into two tensor factors
ORTHOGONALIZE_SVD, //10: tensor orthogonalization via SVD
ORTHOGONALIZE_MGS, //11: tensor orthogonalization via Modified Gram-Schmidt
FETCH, //12: fetch tensor data from another MPI process (parallel execution only)
UPLOAD, //13: upload tensor data to another MPI process (parallel execution only)
BROADCAST, //14: tensor broadcast (parallel execution only)
ALLREDUCE //15: tensor allreduce (parallel execution only)
};
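Reviewer note: the renumbered comments record each enumerator's ordinal. A self-contained check of that numbering (the enum is copied verbatim from this hunk; since no explicit values are assigned, static_cast yields the ordinal):

```cpp
#include <iostream>

enum class TensorOpCode { NOOP, CREATE, DESTROY, TRANSFORM, SLICE, INSERT, ADD, CONTRACT,
                          DECOMPOSE_SVD3, DECOMPOSE_SVD2, ORTHOGONALIZE_SVD, ORTHOGONALIZE_MGS,
                          FETCH, UPLOAD, BROADCAST, ALLREDUCE };

int main()
{
 std::cout << static_cast<int>(TensorOpCode::CONTRACT)  << "\n";  // prints 7
 std::cout << static_cast<int>(TensorOpCode::ALLREDUCE) << "\n";  // prints 15
 return 0;
}
```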
......
/** ExaTN: Tensor Runtime: Tensor network executor: NVIDIA cuQuantum
REVISION: 2022/01/07
REVISION: 2022/01/08
Copyright (C) 2018-2022 Dmitry Lyakh
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)
......@@ -114,7 +114,7 @@ CuQuantumExecutor::CuQuantumExecutor(TensorImplFunc tensor_data_access_func,
unsigned int pipeline_depth,
unsigned int num_processes, unsigned int process_rank):
tensor_data_access_func_(std::move(tensor_data_access_func)),
pipe_depth_(pipeline_depth), num_processes_(num_processes), process_rank_(process_rank)
pipe_depth_(pipeline_depth), num_processes_(num_processes), process_rank_(process_rank), flops_(0.0)
{
static_assert(std::is_same<cutensornetHandle_t,void*>::value,"#FATAL(exatn::runtime::CuQuantumExecutor): cutensornetHandle_t != (void*)");
......@@ -442,6 +442,7 @@ void CuQuantumExecutor::planExecution(std::shared_ptr<TensorNetworkReq> tn_req)
tn_req->opt_info,
CUTENSORNET_CONTRACTION_OPTIMIZER_INFO_FLOP_COUNT,
&flops,sizeof(flops)));
flops_ += flops;
}
tn_req->exec_status = TensorNetworkQueue::ExecStat::Planning;
return;
......
/** ExaTN: Tensor Runtime: Tensor network executor: NVIDIA cuQuantum
REVISION: 2022/01/07
REVISION: 2022/01/08
Copyright (C) 2018-2022 Dmitry Lyakh
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)
......@@ -68,6 +68,9 @@ public:
/** Synchronizes execution of all submitted tensor networks to completion. **/
void sync();
/** Returns total executed flop count. **/
double getTotalFlopCount() const {return flops_;}
protected:
static constexpr float WORKSPACE_FRACTION = 0.6;
......@@ -106,6 +109,8 @@ protected:
const unsigned int num_processes_;
/** Current process rank **/
const unsigned int process_rank_;
/** Executed flops **/
double flops_;
};
} //namespace runtime
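Reviewer note: the new flops_ counter accumulated in planExecution() is what getTotalFlopCount() returns, and it ultimately feeds the Gflop/s figure printed by the test above. A sketch of that measurement pattern at the client level, using only calls that appear elsewhere in this commit (the network label and contraction expression are placeholders; exatn.hpp is the assumed client header):

```cpp
#include "exatn.hpp"  // assumed client header
#include <cassert>
#include <iostream>

void timeContraction()
{
 auto flops = exatn::getTotalFlopCount();            // flop count before execution
 auto t0 = exatn::Timer::timeInSecHR();
 bool ok = exatn::evaluateTensorNetwork("net",
           "D(m,x,n,y)+=A(m,h,k,n)*B(u,k,h)*C(x,u,y)");
 assert(ok);
 ok = exatn::sync("D", true); assert(ok);
 auto duration = exatn::Timer::timeInSecHR(t0);
 flops = exatn::getTotalFlopCount() - flops;         // flops executed by this contraction
 std::cout << "Performance = " << (flops / (1e9 * duration)) << " Gflop/s\n";
}
```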
......
/** ExaTN:: Tensor Runtime: Tensor graph executor: Lazy
REVISION: 2022/01/07
REVISION: 2022/01/08
Copyright (C) 2018-2022 Dmitry Lyakh
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)
......@@ -18,7 +18,9 @@ Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)
#include "errors.hpp"
//#define DEBUG
#ifndef NDEBUG
#define DEBUG
#endif
namespace exatn {
namespace runtime {
......@@ -37,7 +39,7 @@ void LazyGraphExecutor::resetNodeExecutor(std::shared_ptr<TensorNodeExecutor> no
void * data_ptr = this->node_executor_->getTensorImage(tensor,device_kind,device_id,size);
return data_ptr;
},
CUQUANTUM_PIPELINE_DEPTH,
cuquantum_pipe_depth_,
num_processes,
process_rank
);
......@@ -280,8 +282,8 @@ void LazyGraphExecutor::execute(TensorGraph & dag) {
void LazyGraphExecutor::execute(TensorNetworkQueue & tensor_network_queue) {
#ifdef CUQUANTUM
std::cout << "#DEBUG(exatn::runtime::LazyGraphExecutor::execute): Started executing the tensor network queue via cuQuantum: "
<< tensor_network_queue.getSize() << " networks detected" << std::endl;
//std::cout << "#DEBUG(exatn::runtime::LazyGraphExecutor::execute): Started executing the tensor network queue via cuQuantum: "
// << tensor_network_queue.getSize() << " networks detected" << std::endl;
assert(node_executor_);
//Synchronize the node executor:
bool synced = node_executor_->sync(); assert(synced);
......@@ -292,7 +294,7 @@ void LazyGraphExecutor::execute(TensorNetworkQueue & tensor_network_queue) {
bool not_over = !tensor_network_queue.isOver();
while(not_over){
const auto current_pos = tensor_network_queue.getCurrentPos();
if(current_pos < CUQUANTUM_PIPELINE_DEPTH){
if(current_pos < cuquantum_pipe_depth_){
const auto current = tensor_network_queue.getCurrent();
const auto exec_handle = current->second;
int error_code = 0;
......@@ -302,18 +304,40 @@ void LazyGraphExecutor::execute(TensorNetworkQueue & tensor_network_queue) {
assert(error_code == 0);
}
if(exec_stat == TensorNetworkQueue::ExecStat::None){
if(logging_.load() != 0){
logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
<< "](LazyGraphExecutor)[EXEC_THREAD]: Submitting to cuQuantum tensor network "
<< exec_handle << ": Status = ";
#ifdef DEBUG
logfile_.flush();
#endif
}
const auto exec_conf = tensor_network_queue.getExecConfiguration(exec_handle);
exec_stat = cuquantum_executor_->execute(current->first,exec_conf.first,exec_conf.second,exec_handle);
if(logging_.load() != 0){
logfile_ << static_cast<int>(exec_stat) << std::endl;
#ifdef DEBUG
logfile_.flush();
#endif
}
if(exec_stat != TensorNetworkQueue::ExecStat::None){
auto prev_exec_stat = tensor_network_queue.updateExecStatus(exec_handle,exec_stat);
std::cout << "#DEBUG(exatn::runtime::LazyGraphExecutor::execute): Submitted tensor network to cuQuantum\n";
//std::cout << "#DEBUG(exatn::runtime::LazyGraphExecutor::execute): Submitted tensor network to cuQuantum\n";
}
not_over = tensor_network_queue.next();
}else if(exec_stat == TensorNetworkQueue::ExecStat::Completed){
if(logging_.load() != 0){
logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
<< "](LazyGraphExecutor)[EXEC_THREAD]: Completed via cuQuantum tensor network " << exec_handle << std::endl;
#ifdef DEBUG
logfile_.flush();
#endif
}
auto prev_exec_stat = tensor_network_queue.updateExecStatus(exec_handle,exec_stat);
assert(current_pos == 0);
tensor_network_queue.remove();
std::cout << "#DEBUG(exatn::runtime::LazyGraphExecutor::execute): Completed tensor network execution via cuQuantum\n";
//std::cout << "#DEBUG(exatn::runtime::LazyGraphExecutor::execute): Completed tensor network execution via cuQuantum\n";
not_over = !tensor_network_queue.isOver();
}else{
auto prev_exec_stat = tensor_network_queue.updateExecStatus(exec_handle,exec_stat);
......@@ -325,12 +349,23 @@ void LazyGraphExecutor::execute(TensorNetworkQueue & tensor_network_queue) {
}
}
cuquantum_executor_->sync();
std::cout << "#DEBUG(exatn::runtime::LazyGraphExecutor::execute): Finished executing the tensor network queue via cuQuantum\n";
//std::cout << "#DEBUG(exatn::runtime::LazyGraphExecutor::execute): Finished executing the tensor network queue via cuQuantum\n";
#else
assert(tensor_network_queue.isEmpty());
#endif
return;
}
double LazyGraphExecutor::getTotalFlopCount() const
{
while(!node_executor_);
double flops = node_executor_->getTotalFlopCount();
#ifdef CUQUANTUM
while(!cuquantum_executor_);
flops += cuquantum_executor_->getTotalFlopCount();
#endif
return flops;
}
} //namespace runtime
} //namespace exatn
/** ExaTN:: Tensor Runtime: Tensor graph executor: Lazy
REVISION: 2022/01/06
REVISION: 2022/01/08
Copyright (C) 2018-2022 Dmitry Lyakh, Alex McCaskey
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)
......@@ -32,6 +32,9 @@ public:
LazyGraphExecutor(): pipeline_depth_(DEFAULT_PIPELINE_DEPTH),
prefetch_depth_(DEFAULT_PREFETCH_DEPTH)
#ifdef CUQUANTUM
,cuquantum_pipe_depth_(CUQUANTUM_PIPELINE_DEPTH)
#endif
{
}
......@@ -71,6 +74,9 @@ public:
return pipeline_depth_;
}
/** Returns the current value of the total Flop count executed by the node executor. **/
virtual double getTotalFlopCount() const override;
const std::string name() const override {return "lazy-dag-executor";}
const std::string description() const override {return "Lazy tensor graph executor";}
std::shared_ptr<TensorGraphExecutor> clone() override {return std::make_shared<LazyGraphExecutor>();}
......@@ -80,6 +86,7 @@ protected:
unsigned int pipeline_depth_; //max number of active tensor operations in flight
unsigned int prefetch_depth_; //max number of tensor operations with active prefetch in flight
#ifdef CUQUANTUM
unsigned int cuquantum_pipe_depth_; //max number of actively executed tensor networks via cuQuantum
std::shared_ptr<CuQuantumExecutor> cuquantum_executor_; //cuQuantum executor
#endif
};
......
/** ExaTN:: Tensor Runtime: Tensor graph executor
REVISION: 2022/01/06
REVISION: 2022/01/08
Copyright (C) 2018-2022 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)
......@@ -124,7 +124,7 @@ public:
}
/** Returns the current value of the total Flop count executed by the node executor. **/
double getTotalFlopCount() const {
virtual double getTotalFlopCount() const {
while(!node_executor_);
return node_executor_->getTotalFlopCount();
}
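Reviewer note: the base-class getTotalFlopCount() is made virtual in this hunk so that the LazyGraphExecutor override (which adds the cuQuantum flops) is reached through a TensorGraphExecutor pointer. A minimal sketch of that dispatch with placeholder classes (not ExaTN code):

```cpp
#include <iostream>
#include <memory>

struct GraphExecutor {                                              // stands in for TensorGraphExecutor
 virtual ~GraphExecutor() = default;
 virtual double getTotalFlopCount() const {return 100.0;}          // node-executor flops only
};

struct LazyExecutor : GraphExecutor {                               // stands in for LazyGraphExecutor
 double getTotalFlopCount() const override {return 100.0 + 50.0;}  // adds cuQuantum flops
};

int main()
{
 std::unique_ptr<GraphExecutor> exec = std::make_unique<LazyExecutor>();
 std::cout << exec->getTotalFlopCount() << "\n";   // prints 150: the override is reached
 return 0;
}
```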
......
/** ExaTN:: Tensor Runtime: Task-based execution layer for tensor operations
REVISION: 2022/01/07
REVISION: 2022/01/08
Copyright (C) 2018-2022 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)
......@@ -296,13 +296,16 @@ TensorOpExecHandle TensorRuntime::submit(std::shared_ptr<numerics::TensorNetwork
const MPICommProxy & communicator,
unsigned int num_processes, unsigned int process_rank)
{
return tensor_network_queue_.append(network,communicator,num_processes,process_rank);
const auto exec_handle = tensor_network_queue_.append(network,communicator,num_processes,process_rank);
executing_.store(true); //signal to the execution thread to execute the queue
return exec_handle;
}
bool TensorRuntime::syncNetwork(const TensorOpExecHandle exec_handle, bool wait)
{
assert(exec_handle != 0);
executing_.store(true); //reactivate the execution thread in case it was not active
bool synced = false;
while(!synced){
const auto exec_stat = tensor_network_queue_.checkExecStatus(exec_handle);
......
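Reviewer note: submit() now sets executing_ so the execution thread picks the queue up immediately, and syncNetwork() re-activates the thread before polling the execution status. A sketch of the call sequence NumServer follows in this commit; the runtime, network and communicator types are taken as template parameters so no header paths or namespaces are assumed:

```cpp
#include <cassert>

template <typename Runtime, typename NetworkPtr, typename Comm>
void runAndWait(Runtime & tensor_rt, NetworkPtr network, const Comm & comm,
                unsigned int num_procs, unsigned int my_rank)
{
 const auto exec_handle = tensor_rt.submit(network, comm, num_procs, my_rank);
 assert(exec_handle != 0);                                               // a zero handle signals a failed submission
 const bool synced = tensor_rt.syncNetwork(exec_handle, /*wait=*/true);  // block until completed
 assert(synced);
}
```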