Commit 97bb5dc8 authored by Dmitry I. Lyakh

Restricted dynamic DAG size to avoid RAM exhaustion


Signed-off-by: Dmitry I. Lyakh <quant4me@gmail.com>
parent a1afd5cf
/** ExaTN::Numerics: Numerical server
-REVISION: 2021/09/21
+REVISION: 2021/09/22
Copyright (C) 2018-2021 Dmitry I. Lyakh (Liakh)
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle) **/
......@@ -687,7 +687,7 @@ bool NumServer::submit(const ProcessGroup & process_group,
auto output_tensor = network.getTensor(0);
auto iter = tensors_.find(output_tensor->getName());
if(iter == tensors_.end()){ //output tensor does not exist and needs to be created
-implicit_tensors_.emplace_back(output_tensor); //list of implicitly created tensors (for garbage collection)
+implicit_tensors_.emplace(std::make_pair(output_tensor->getName(),output_tensor)); //list of implicitly created tensors (for garbage collection)
//Create output tensor:
std::shared_ptr<TensorOperation> op0 = tensor_op_factory_->createTensorOp(TensorOpCode::CREATE);
op0->setTensorOperand(output_tensor);
......@@ -1072,8 +1072,9 @@ bool NumServer::registerTensorIsometry(const std::string & name,
bool NumServer::withinTensorExistenceDomain(const std::string & tensor_name) const
{
-auto iter = tensors_.find(tensor_name);
-return (iter != tensors_.cend());
+bool exists = (tensors_.find(tensor_name) != tensors_.cend());
+if(!exists) exists = (implicit_tensors_.find(tensor_name) != implicit_tensors_.cend());
+return exists;
}
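For reference, a minimal standalone sketch of the two-map existence check this hunk introduces (placeholder Tensor type and hypothetical tensor names; not the actual ExaTN classes):

```cpp
// Minimal standalone sketch of the two-map existence check (placeholder
// Tensor type; names mirror the diff, but this is not the ExaTN code).
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>

struct Tensor {};

std::unordered_map<std::string,std::shared_ptr<Tensor>> tensors_;  // explicitly created (CREATE op)
std::map<std::string,std::shared_ptr<Tensor>> implicit_tensors_;   // created implicitly by the runtime

bool withinTensorExistenceDomain(const std::string & tensor_name) {
  bool exists = (tensors_.find(tensor_name) != tensors_.cend());   // check the registry first
  if(!exists) exists = (implicit_tensors_.find(tensor_name) != implicit_tensors_.cend()); // then the implicit map
  return exists;
}

int main() {
  implicit_tensors_.emplace("_TensorNet0", std::make_shared<Tensor>()); // hypothetical implicit tensor
  std::cout << withinTensorExistenceDomain("_TensorNet0") << " "  // 1: found in the implicit map
            << withinTensorExistenceDomain("Q0") << "\n";         // 0: registered nowhere
}
```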
const ProcessGroup & NumServer::getTensorProcessGroup(const std::string & tensor_name) const
......@@ -3002,21 +3003,25 @@ std::shared_ptr<talsh::Tensor> NumServer::getLocalTensor(const std::string & nam
void NumServer::destroyOrphanedTensors()
{
//std::cout << "#DEBUG(exatn::NumServer): Destroying orphaned tensors ... "; //debug
auto iter = implicit_tensors_.begin();
while(iter != implicit_tensors_.end()){
int ref_count = 1;
-auto tens = tensors_.find((*iter)->getName());
+auto tens = tensors_.find(iter->first);
if(tens != tensors_.end()) ++ref_count;
-if(iter->use_count() <= ref_count){
-auto tensor_mapper = getTensorMapper(getTensorProcessGroup((*iter)->getName()));
+if(iter->second.use_count() <= ref_count){
+//std::cout << "#DEBUG(exatn::NumServer::destroyOrphanedTensors): Orphan found with ref count "
+// << ref_count << ": " << iter->first << std::endl; //debug
+auto tensor_mapper = getTensorMapper(getTensorProcessGroup(iter->first));
std::shared_ptr<TensorOperation> destroy_op = tensor_op_factory_->createTensorOp(TensorOpCode::DESTROY);
-destroy_op->setTensorOperand(*iter);
+destroy_op->setTensorOperand(iter->second);
auto submitted = submit(destroy_op,tensor_mapper);
iter = implicit_tensors_.erase(iter);
}else{
++iter;
}
}
//std::cout << "Done\n" << std::flush; //debug
return;
}
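The sweep treats a tensor as orphaned when its shared_ptr reference count does not exceed the references held by the bookkeeping containers themselves. A self-contained sketch of that counting logic, under the assumption that implicit_tensors_ and tensors_ are the only bookkeeping holders (placeholder types, not ExaTN's; the real code submits a DESTROY operation where this prints):

```cpp
// Standalone sketch of the use_count()-based orphan sweep (assumed semantics:
// a tensor referenced only by the bookkeeping maps is an orphan).
#include <iostream>
#include <map>
#include <memory>
#include <string>
#include <unordered_map>

struct Tensor { std::string name; };

int main() {
  std::unordered_map<std::string,std::shared_ptr<Tensor>> tensors;
  std::map<std::string,std::shared_ptr<Tensor>> implicit;

  auto t = std::make_shared<Tensor>(Tensor{"T0"});
  implicit.emplace(t->name, t);
  t.reset();                                   // drop the external reference: T0 is now an orphan

  auto keep = std::make_shared<Tensor>(Tensor{"T1"});
  implicit.emplace(keep->name, keep);          // still externally referenced: use_count 2, survives

  for(auto iter = implicit.begin(); iter != implicit.end(); ){
    int ref_count = 1;                         // the implicit map's own reference
    if(tensors.find(iter->first) != tensors.end()) ++ref_count; // plus the registry's, if present
    if(iter->second.use_count() <= ref_count){
      std::cout << "Destroying orphan " << iter->first << "\n"; // DESTROY op would be submitted here
      iter = implicit.erase(iter);
    }else{
      ++iter;
    }
  }
}
```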
......@@ -3024,7 +3029,7 @@ void NumServer::printImplicitTensors() const
{
std::cout << "#DEBUG(exatn::NumServer::printImplicitTensors):" << std::endl;
for(const auto & tens: implicit_tensors_){
-std::cout << tens->getName() << ": Reference count = " << tens.use_count() << std::endl;
+std::cout << tens.first << ": Reference count = " << tens.second.use_count() << std::endl;
}
std::cout << "#END" << std::endl << std::flush;
return;
......
/** ExaTN::Numerics: Numerical server
-REVISION: 2021/09/21
+REVISION: 2021/09/22
Copyright (C) 2018-2021 Dmitry I. Lyakh (Liakh)
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle) **/
......@@ -975,7 +975,7 @@ private:
std::unordered_map<std::string,SpaceId> subname2id_; //maps a subspace name to its parental vector space id
std::unordered_map<std::string,std::shared_ptr<Tensor>> tensors_; //registered tensors (by CREATE operation)
-std::list<std::shared_ptr<Tensor>> implicit_tensors_; //tensors created implicitly by the runtime (for garbage collection)
+std::map<std::string,std::shared_ptr<Tensor>> implicit_tensors_; //tensors created implicitly by the runtime (for garbage collection)
std::unordered_map<std::string,ProcessGroup> tensor_comms_; //process group associated with each tensor
std::string contr_seq_optimizer_; //tensor contraction sequence optimizer invoked when evaluating tensor networks
......
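The container switch above is what makes the earlier hunks work: keying implicit_tensors_ by tensor name lets withinTensorExistenceDomain() and destroyOrphanedTensors() locate a tensor with find() in logarithmic time, where the former std::list offered no lookup by name and would have required a linear scan.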
......@@ -42,7 +42,7 @@
#define EXATN_TEST23
#define EXATN_TEST24
#define EXATN_TEST25
-#define EXATN_TEST26 //requires input file from source
+//#define EXATN_TEST26 //requires input file from source
#define EXATN_TEST27
#define EXATN_TEST28
......@@ -1757,7 +1757,7 @@ TEST(NumServerTester, testGarbage) {
// Create qubit tensors:
auto success = true;
-const int NB_QUBITS = 24;
+const int NB_QUBITS = 16;
for (int i = 0; i < NB_QUBITS; ++i) {
success = exatn::createTensor("Q" + std::to_string(i), TensorElementType::COMPLEX64, TensorShape{2});
......@@ -3198,7 +3198,7 @@ int main(int argc, char **argv) {
exatn::ParamConf exatn_parameters;
//Set the available CPU Host RAM size to be used by ExaTN:
exatn_parameters.setParameter("host_memory_buffer_size",8L*1024L*1024L*1024L);
exatn_parameters.setParameter("host_memory_buffer_size",2L*1024L*1024L*1024L);
#ifdef MPI_ENABLED
int thread_provided;
int mpi_error = MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &thread_provided);
......
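The test-side changes scale the workload down to the smaller 2 GiB host buffer set above. As a rough check (assuming COMPLEX64 is a 16-byte double-precision complex), a full 24-qubit state vector occupies 2^24 × 16 B = 256 MiB before counting any contraction intermediates, while 16 qubits needs only 2^16 × 16 B = 1 MiB.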
/** ExaTN: Numerics: Symbolic tensor processing
-REVISION: 2021/08/20
+REVISION: 2021/09/22
Copyright (C) 2018-2021 Dmitry I. Lyakh (Liakh)
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle) **/
......@@ -375,16 +375,27 @@ bool parse_pauli_string(const std::string & input,
const auto right_sq_pos = input.find("]",left_sq_pos);
if(right_sq_pos != std::string::npos){
paulis = input.substr(left_sq_pos,right_sq_pos-left_sq_pos+1);
const auto plus_pos = input.find("+",left_par_pos);
if(plus_pos != std::string::npos){
const auto real_len = plus_pos - left_par_pos - 1;
const auto plus_pos = input.find_last_of("+",right_par_pos);
const auto minus_pos = input.find_last_of("-",right_par_pos);
auto sep_pos = plus_pos;
if(minus_pos == std::string::npos){
sep_pos = plus_pos;
}else{
if(plus_pos == std::string::npos){
sep_pos = minus_pos;
}else{
sep_pos = std::max(minus_pos,plus_pos);
}
}
if(sep_pos != std::string::npos){
const auto real_len = sep_pos - left_par_pos - 1;
//std::cout << "#DEBUG(parse_pauli_string): Coef: " << input.substr(left_par_pos+1,real_len); //debug
if(real_len > 0) coef_real = std::stod(input.substr(left_par_pos+1,real_len));
const auto imag_end_pos = input.find("j",plus_pos);
const auto imag_end_pos = input.find("j",sep_pos);
if(imag_end_pos != std::string::npos){
-const auto imag_len = imag_end_pos - plus_pos - 1;
-//std::cout << " " << input.substr(plus_pos+1,imag_len) << std::endl; //debug
-if(imag_len > 0) coef_imag = std::stod(input.substr(plus_pos+1,imag_len));
+const auto imag_len = imag_end_pos - sep_pos - 1;
+//std::cout << " " << input.substr(sep_pos+1,imag_len) << std::endl; //debug
+if(imag_len > 0) coef_imag = std::stod(input.substr(sep_pos+1,imag_len));
coefficient = std::complex<double>{coef_real, coef_imag};
}else{
success = false;
......
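The rewritten search fixes coefficients with a negative imaginary part, e.g. "(1.5-0.5j)", which the old find("+") missed; searching backward from the closing parenthesis with find_last_of also skips any sign on the real part. Below is a standalone sketch of just the separator logic (hypothetical parse_coef() helper, not the full parse_pauli_string()); note that the sketch keeps the imaginary sign by including the separator character in the stod() substring:

```cpp
// Standalone sketch of the +/- separator search for a coefficient substring
// like "(1.5-0.5j)" (hypothetical helper; assumes well-formed input).
#include <algorithm>
#include <complex>
#include <iostream>
#include <string>

std::complex<double> parse_coef(const std::string & input) {
  const auto left_par_pos = input.find("(");
  const auto right_par_pos = input.find(")", left_par_pos);
  // Search backward from ')' so a sign on the real part is skipped:
  const auto plus_pos = input.find_last_of("+", right_par_pos);
  const auto minus_pos = input.find_last_of("-", right_par_pos);
  auto sep_pos = plus_pos;
  if(minus_pos != std::string::npos){
    sep_pos = (plus_pos == std::string::npos) ? minus_pos : std::max(minus_pos, plus_pos);
  }
  double re = 0.0, im = 0.0;
  if(sep_pos != std::string::npos){
    re = std::stod(input.substr(left_par_pos + 1, sep_pos - left_par_pos - 1));
    const auto imag_end_pos = input.find("j", sep_pos);
    if(imag_end_pos != std::string::npos)
      im = std::stod(input.substr(sep_pos, imag_end_pos - sep_pos)); // separator kept: sign preserved
  }
  return {re, im};
}

int main() {
  std::cout << parse_coef("(1.5+0.5j)") << " " << parse_coef("(1.5-0.5j)") << "\n";
  // Expected: (1.5,0.5) (1.5,-0.5)
}
```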
/** ExaTN:: Tensor Runtime: Task-based execution layer for tensor operations
-REVISION: 2021/09/21
+REVISION: 2021/09/22
Copyright (C) 2018-2021 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
......@@ -134,45 +134,45 @@ void TensorRuntime::processTensorDataRequests()
void TensorRuntime::resetLoggingLevel(int level)
{
while(!graph_executor_);
graph_executor_->resetLoggingLevel(level);
logging_ = level;
return;
}
void TensorRuntime::resetSerialization(bool serialize, bool validation_trace)
{
while(!graph_executor_);
return graph_executor_->resetSerialization(serialize,validation_trace);
}
void TensorRuntime::activateDryRun(bool dry_run)
{
while(!graph_executor_);
return graph_executor_->activateDryRun(dry_run);
}
void TensorRuntime::activateFastMath()
{
while(!graph_executor_);
return graph_executor_->activateFastMath();
}
std::size_t TensorRuntime::getMemoryBufferSize() const
{
while(!graph_executor_);
return graph_executor_->getMemoryBufferSize();
}
double TensorRuntime::getTotalFlopCount() const
{
while(!graph_executor_);
return graph_executor_->getTotalFlopCount();
}
......@@ -247,8 +247,8 @@ bool TensorRuntime::sync(TensorOperation & op, bool wait) {
auto opid = op.getId();
bool completed = current_dag_->nodeExecuted(opid);
while(wait && (!completed)){
executing_.store(true); //reactivate the execution thread to execute the DAG in case it was not active
completed = current_dag_->nodeExecuted(opid);
}
return completed;
}
......@@ -260,8 +260,8 @@ bool TensorRuntime::sync(const Tensor & tensor, bool wait) {
executing_.store(true); //reactivate the execution thread to execute the DAG in case it was not active
bool completed = (current_dag_->getTensorUpdateCount(tensor) == 0);
while(wait && (!completed)){
executing_.store(true); //reactivate the execution thread to execute the DAG in case it was not active
completed = (current_dag_->getTensorUpdateCount(tensor) == 0);
}
//if(wait) std::cout << "Synced" << std::endl; //debug
return completed;
......@@ -269,20 +269,22 @@ bool TensorRuntime::sync(const Tensor & tensor, bool wait) {
bool TensorRuntime::sync(bool wait) {
//if(wait) std::cout << "#DEBUG(TensorRuntime::sync)[MAIN_THREAD]: Syncing ... "; //debug
assert(currentScopeIsSet());
if(current_dag_->hasUnexecutedNodes()) executing_.store(true);
bool still_working = executing_.load();
while(wait && still_working){
if(current_dag_->hasUnexecutedNodes()) executing_.store(true);
still_working = executing_.load();
}
if(wait && (!still_working)){
-if(current_dag_->getNumNodes() > MAX_RUNTIME_DAG_SIZE){
-std::cout << "#DEBUG(TensorRuntime::sync)[MAIN_THREAD]: Clearing DAG ... "; //debug
-current_dag_->clear();
-std::cout << "Done\n" << std::flush; //debug
-}
+if(current_dag_->getNumNodes() > MAX_RUNTIME_DAG_SIZE){
+//std::cout << "Clearing DAG ... "; //debug
+current_dag_->clear();
+//std::cout << "Done; "; //debug
+}
}
//if(wait) std::cout << "Synced\n" << std::flush; //debug
return !still_working;
}
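This is the heart of the commit: once the wait loop drains, any DAG grown beyond MAX_RUNTIME_DAG_SIZE is cleared, so executed nodes no longer accumulate without bound; lowering the cap from 1000000000 to 8192 in the header below makes the trim actually fire in practice. A self-contained sketch of the spin-then-trim pattern (placeholder Dag type; the real executor thread that resets the flag is omitted):

```cpp
// Standalone sketch of the sync-then-trim pattern above: spin while the
// executor drains the DAG, then clear it once it exceeds the size cap.
#include <atomic>
#include <cstddef>
#include <vector>

constexpr std::size_t MAX_RUNTIME_DAG_SIZE = 8192; // cap that bounds RAM growth

struct Dag {
  std::vector<int> nodes;                           // stand-in for executed operation nodes
  bool hasUnexecutedNodes() const { return false; } // assume everything already executed here
  std::size_t getNumNodes() const { return nodes.size(); }
  void clear() { nodes.clear(); }                   // frees memory held by executed nodes
};

std::atomic<bool> executing{false};
Dag current_dag;

bool sync(bool wait) {
  if(current_dag.hasUnexecutedNodes()) executing.store(true);
  bool still_working = executing.load();
  while(wait && still_working){                     // busy-wait: the executor thread resets the flag
    if(current_dag.hasUnexecutedNodes()) executing.store(true);
    still_working = executing.load();
  }
  if(wait && !still_working){
    if(current_dag.getNumNodes() > MAX_RUNTIME_DAG_SIZE) current_dag.clear(); // trim oversized DAG
  }
  return !still_working;
}

int main() { return sync(true) ? 0 : 1; }           // trivially completes: nothing was scheduled
```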
......
/** ExaTN:: Tensor Runtime: Task-based execution layer for tensor operations
-REVISION: 2021/09/21
+REVISION: 2021/09/22
Copyright (C) 2018-2021 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
......@@ -69,7 +69,7 @@ class TensorRuntime final {
public:
-static constexpr std::size_t MAX_RUNTIME_DAG_SIZE = 1000000000;
+static constexpr std::size_t MAX_RUNTIME_DAG_SIZE = 8192; //max allowed DAG size during runtime
#ifdef MPI_ENABLED
TensorRuntime(const MPICommProxy & communicator, //MPI communicator proxy
......