Commit 67452e37 authored by Dmitry I. Lyakh

Connected some of the TalshNodeExecutor functionality to CuQuantumExecutor.


Signed-off-by: Dmitry I. Lyakh <quant4me@gmail.com>
parent 5e56a54b
/** ExaTN: Tensor Runtime: Tensor network executor: NVIDIA cuQuantum
REVISION: 2021/12/24
REVISION: 2021/12/27
Copyright (C) 2018-2021 Dmitry Lyakh
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
......@@ -16,6 +16,7 @@ Rationale:
#include <vector>
#include <unordered_map>
#include <type_traits>
#include <iostream>
......@@ -57,43 +58,45 @@ struct TensorNetworkReq {
};
CuQuantumExecutor::CuQuantumExecutor()
CuQuantumExecutor::CuQuantumExecutor(TensorImplFunc tensor_data_access_func):
tensor_data_access_func_(std::move(tensor_data_access_func))
{
static_assert(std::is_same<cutensornetHandle_t,void*>::value,"#FATAL(exatn::runtime::CuQuantumExecutor): cutensornetHandle_t != (void*)");
const size_t version = cutensornetGetVersion();
std::cout << "#DEBUG(exatn::runtime::CuQuantumExecutor): cuTensorNet backend version " << version << std::endl;
int num_gpus = 0;
auto error_code = talshDeviceCount(DEV_NVIDIA_GPU,&num_gpus); assert(error_code == TALSH_SUCCESS);
for(int i = 0; i < num_gpus; ++i){
if(talshDeviceState(i,DEV_NVIDIA_GPU) >= DEV_ON) gpus.emplace_back(i);
if(talshDeviceState(i,DEV_NVIDIA_GPU) >= DEV_ON) gpus_.emplace_back(i);
}
std::cout << "#DEBUG(exatn::runtime::CuQuantumExecutor): Number of available GPUs = " << gpus.size() << std::endl;
std::cout << "#DEBUG(exatn::runtime::CuQuantumExecutor): Number of available GPUs = " << gpus_.size() << std::endl;
ctn_handles.resize(gpus.size());
for(const auto & gpu_id: gpus){
ctn_handles_.resize(gpus_.size());
for(const auto & gpu_id: gpus_){
HANDLE_CUDA_ERROR(cudaSetDevice(gpu_id));
HANDLE_CTN_ERROR(cutensornetCreate((cutensornetHandle_t*)(&ctn_handles[gpu_id])));
HANDLE_CTN_ERROR(cutensornetCreate((cutensornetHandle_t*)(&ctn_handles_[gpu_id])));
}
std::cout << "#DEBUG(exatn::runtime::CuQuantumExecutor): Created cuTensorNet contexts for all available GPUs" << std::endl;
}
CuQuantumExecutor::~CuQuantumExecutor()
{
bool success = sync(); assert(success);
for(const auto & gpu_id: gpus){
for(const auto & gpu_id: gpus_){
HANDLE_CUDA_ERROR(cudaSetDevice(gpu_id));
HANDLE_CTN_ERROR(cutensornetDestroy((cutensornetHandle_t)(ctn_handles[gpu_id])));
HANDLE_CTN_ERROR(cutensornetDestroy((cutensornetHandle_t)(ctn_handles_[gpu_id])));
}
std::cout << "#DEBUG(exatn::runtime::CuQuantumExecutor): Destroyed cuTensorNet contexts for all available GPUs" << std::endl;
ctn_handles.clear();
gpus.clear();
ctn_handles_.clear();
gpus_.clear();
}
int CuQuantumExecutor::execute(std::shared_ptr<numerics::TensorNetwork> network,
TensorOpExecHandle exec_handle)
const TensorOpExecHandle exec_handle)
{
int error_code = 0;
//`Finish
......@@ -101,14 +104,14 @@ int CuQuantumExecutor::execute(std::shared_ptr<numerics::TensorNetwork> network,
}
bool CuQuantumExecutor::executing(TensorOpExecHandle exec_handle)
bool CuQuantumExecutor::executing(const TensorOpExecHandle exec_handle)
{
auto iter = active_networks_.find(exec_handle);
return (iter != active_networks_.end());
}
bool CuQuantumExecutor::sync(TensorOpExecHandle exec_handle,
bool CuQuantumExecutor::sync(const TensorOpExecHandle exec_handle,
int * error_code,
bool wait)
{
......
/** ExaTN: Tensor Runtime: Tensor network executor: NVIDIA cuQuantum
REVISION: 2021/12/24
REVISION: 2021/12/27
Copyright (C) 2018-2021 Dmitry Lyakh
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
......@@ -18,19 +18,29 @@ Rationale:
#include <unordered_map>
#include <vector>
#include <functional>
#include "tensor_network_queue.hpp"
namespace talsh{
class Tensor;
}
namespace exatn {
namespace runtime {
using TensorImplFunc = std::function<const void*(const numerics::Tensor &, int, int, std::size_t *)>;
using TensorImplTalshFunc = std::function<std::shared_ptr<talsh::Tensor>(const numerics::Tensor &, int, int)>;
struct TensorNetworkReq;
class CuQuantumExecutor {
public:
CuQuantumExecutor();
CuQuantumExecutor(TensorImplFunc tensor_data_access_func);
CuQuantumExecutor(const CuQuantumExecutor &) = delete;
CuQuantumExecutor & operator=(CuQuantumExecutor &) = delete;
CuQuantumExecutor(CuQuantumExecutor &&) noexcept = delete;
......@@ -38,11 +48,11 @@ public:
virtual ~CuQuantumExecutor();
int execute(std::shared_ptr<numerics::TensorNetwork> network,
TensorOpExecHandle exec_handle);
const TensorOpExecHandle exec_handle);
bool executing(TensorOpExecHandle exec_handle);
bool executing(const TensorOpExecHandle exec_handle);
bool sync(TensorOpExecHandle exec_handle,
bool sync(const TensorOpExecHandle exec_handle,
int * error_code,
bool wait = true);
......@@ -53,9 +63,11 @@ protected:
/** Currently processed tensor networks **/
std::unordered_map<TensorOpExecHandle,std::shared_ptr<TensorNetworkReq>> active_networks_;
/** GPU Ids available to the current process **/
std::vector<int> gpus;
std::vector<int> gpus_;
/** cuTensorNet contexts for all available GPUs **/
std::vector<void*> ctn_handles; //cutensornetHandle_t = void*
std::vector<void*> ctn_handles_; //cutensornetHandle_t = void*
/** Tensor data access function **/
TensorImplFunc tensor_data_access_func_;
};
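For orientation, the interface above composes into the pattern below. This is a hedged sketch rather than code from the commit: node_executor, network and exec_handle are placeholders (in the commit the handle comes from TensorNetworkQueue::append and the data-access callback is wired up in LazyGraphExecutor::resetNodeExecutor further down), and a zero error code is assumed to mean success, as elsewhere in this diff.

//Sketch only: names other than the CuQuantumExecutor API are assumptions.
CuQuantumExecutor cuquantum(
  [&node_executor](const numerics::Tensor & tensor, int device_kind, int device_id, std::size_t * size){
    return node_executor.getTensorImage(tensor,device_kind,device_id,size); //non-owning tensor data access
  });
int errc = cuquantum.execute(network,exec_handle); assert(errc == 0); //submit the whole tensor network
while(cuquantum.executing(exec_handle)){                   //poll while the network is still active
  int error_code = 0;
  if(cuquantum.sync(exec_handle,&error_code,false)) break; //non-blocking progress check
  assert(error_code == 0);
}
bool done = cuquantum.sync(); assert(done);                //blocking sync over all submitted networks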
} //namespace runtime
......
/** ExaTN: Tensor Runtime: Tensor network executor: Execution queue
REVISION: 2021/12/24
REVISION: 2021/12/27
Copyright (C) 2018-2021 Dmitry Lyakh
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
......@@ -17,6 +17,7 @@ Rationale:
#include "tensor_network.hpp"
#include "tensor_operation.hpp"
#include <unordered_map>
#include <list>
#include <memory>
#include <atomic>
......@@ -31,6 +32,15 @@ class TensorNetworkQueue {
public:
//Tensor network execution status:
enum class ExecStat {
None, //no execution status
Idle, //submitted but execution has not yet started
Preparing, //preparation for execution has started
Executing, //actual execution (numerical computation) has started
Completed //execution completed
};
using TensorNetworkQueueIterator =
std::list<std::pair<std::shared_ptr<numerics::TensorNetwork>,TensorOpExecHandle>>::iterator;
......@@ -51,6 +61,7 @@ public:
ConstTensorNetworkQueueIterator cbegin() {return networks_.cbegin();}
ConstTensorNetworkQueueIterator cend() {return networks_.cend();}
/** Returns TRUE if the tensor network queue is empty, FALSE otherwise. **/
bool isEmpty() {
lock();
bool empt = networks_.empty();
......@@ -58,6 +69,7 @@ public:
return empt;
}
/** Returns the current size of the tensor network queue. **/
std::size_t getSize() {
lock();
const std::size_t current_size = networks_.size();
......@@ -65,26 +77,57 @@ public:
return current_size;
}
/** Appends a new tensor network to the queue (no repeats allowed).
Upon success, returns a positive execution handle, zero otherwise. **/
TensorOpExecHandle append(std::shared_ptr<numerics::TensorNetwork> network) {
lock();
const TensorOpExecHandle tn_hash = getTensorNetworkHash(network);
networks_.emplace_back(std::make_pair(network,tn_hash));
TensorOpExecHandle tn_hash = getTensorNetworkHash(network);
auto res = tn_exec_stat_.emplace(std::make_pair(tn_hash,ExecStat::Idle));
if(res.second){
networks_.emplace_back(std::make_pair(network,tn_hash));
}else{
tn_hash = 0;
}
unlock();
return tn_hash;
}
/** Removes the tensor network currently pointed to from the queue.
The tensor network execution status must be marked Completed. **/
void remove() {
lock();
assert(current_network_ != networks_.end());
auto iter = tn_exec_stat_.find(current_network_->second);
if(iter != tn_exec_stat_.end()){
if(iter->second == ExecStat::Completed){
tn_exec_stat_.erase(iter);
}else{
std::cout << "#ERROR(exatn::runtime::TensorNetworkQueue): Attempt to delete an unfinished tensor network!\n";
assert(false);
}
}
current_network_ = networks_.erase(current_network_);
unlock();
return;
}
/** Returns the execution status associated with
the given tensor network execution handle. **/
ExecStat checkExecStatus(const TensorOpExecHandle exec_handle) {
auto exec_stat = ExecStat::None;
lock();
auto iter = tn_exec_stat_.find(exec_handle);
if(iter != tn_exec_stat_.cend()) exec_stat = iter->second;
unlock();
return exec_stat;
}
/** Returns the constant iterator to the current tensor network. **/
ConstTensorNetworkQueueIterator getCurrent() {
return current_network_;
}
/** Resets the current iterator to the beginning of the queue. **/
void reset() {
lock();
current_network_ = networks_.begin();
......@@ -92,6 +135,8 @@ public:
return;
}
/** Returns TRUE if the current iterator is positioned
past the end of the queue, FALSE otherwise. **/
bool isOver() {
lock();
bool over = (current_network_ == networks_.end());
......@@ -99,6 +144,9 @@ public:
return over;
}
/** Moves the current iterator to the next element of the queue.
Returns FALSE if the iterator moves past the end, TRUE otherwise.
The current iterator must be valid on entry. **/
bool next() {
lock();
assert(current_network_ != networks_.end());
......@@ -108,14 +156,18 @@ public:
return not_over;
}
/** Locks/unlocks the queue for exclusive access. **/
inline void lock(){queue_lock_.lock();}
inline void unlock(){queue_lock_.unlock();}
protected:
/** Tensor network execution status **/
std::unordered_map<TensorOpExecHandle,ExecStat> tn_exec_stat_;
/** Queue of tensor networks to be executed **/
std::list<std::pair<std::shared_ptr<numerics::TensorNetwork>,
TensorOpExecHandle>> networks_;
/** Tensor network iterator **/
TensorNetworkQueueIterator current_network_;
std::mutex queue_lock_;
};
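Taken together, the new members form a producer/consumer protocol: the submission side appends a network and polls its status by handle, while the execution thread scans the queue with reset/getCurrent/next and removes entries once they complete. A producer-side sketch (queue and network are placeholders; this mirrors TensorRuntime::syncNetwork further down):

TensorOpExecHandle handle = queue.append(network);     //returns 0 if the network is already queued
if(handle != 0){
  auto stat = queue.checkExecStatus(handle);
  while(stat != TensorNetworkQueue::ExecStat::Completed &&
        stat != TensorNetworkQueue::ExecStat::None){   //None: the entry was already removed after completion
    stat = queue.checkExecStatus(handle);              //the execution thread makes progress concurrently
  }
}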
......
/** ExaTN:: Tensor Runtime: Tensor graph executor: Lazy
REVISION: 2021/12/24
REVISION: 2021/12/27
Copyright (C) 2018-2021 Dmitry Lyakh
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
......@@ -28,9 +28,16 @@ void LazyGraphExecutor::resetNodeExecutor(std::shared_ptr<TensorNodeExecutor> no
unsigned int process_rank,
unsigned int global_process_rank)
{
TensorGraphExecutor::resetNodeExecutor(node_executor,parameters,process_rank,global_process_rank);
TensorGraphExecutor::resetNodeExecutor(node_executor,parameters,process_rank,global_process_rank);
#ifdef CUQUANTUM
if(node_executor) cuquantum_executor_ = std::make_shared<CuQuantumExecutor>();
if(node_executor){
cuquantum_executor_ = std::make_shared<CuQuantumExecutor>(
[this](const numerics::Tensor & tensor, int device_kind, int device_id, std::size_t * size){
const void * data_ptr = this->node_executor_->getTensorImage(tensor,device_kind,device_id,size);
return data_ptr;
}
);
}
#endif
return;
}
......@@ -268,10 +275,12 @@ void LazyGraphExecutor::execute(TensorGraph & dag) {
void LazyGraphExecutor::execute(TensorNetworkQueue & tensor_network_queue) {
std::cout << "#DEBUG(exatn::runtime::LazyGraphExecutor::execute): Started executing the tensor network queue via cuQuantum\n";
std::cout << "#DEBUG(exatn::runtime::LazyGraphExecutor::execute): Started executing the tensor network queue via cuQuantum: "
<< tensor_network_queue.getSize() << " elements detected" << std::endl;
#ifdef CUQUANTUM
assert(node_executor_);
//Synchronize the node executor:
node_executor_->sync();
bool synced = node_executor_->sync(); assert(synced);
node_executor_->clearCache();
//Process the tensor network queue:
while(!tensor_network_queue.isEmpty()){
......@@ -282,7 +291,7 @@ void LazyGraphExecutor::execute(TensorNetworkQueue & tensor_network_queue) {
const auto exec_handle = current->second;
if(cuquantum_executor_->executing(exec_handle)){
int error_code = 0;
auto synced = cuquantum_executor_->sync(exec_handle,&error_code,false);
synced = cuquantum_executor_->sync(exec_handle,&error_code,false);
assert(error_code == 0);
if(synced){
tensor_network_queue.remove();
......@@ -299,7 +308,9 @@ void LazyGraphExecutor::execute(TensorNetworkQueue & tensor_network_queue) {
}
}
}
bool synced = cuquantum_executor_->sync(); assert(synced);
synced = cuquantum_executor_->sync(); assert(synced);
#else
assert(tensor_network_queue.isEmpty());
#endif
std::cout << "#DEBUG(exatn::runtime::LazyGraphExecutor::execute): Finished executing the tensor network queue via cuQuantum\n";
return;
......
/** ExaTN:: Tensor Runtime: Tensor graph node executor: Exatensor
REVISION: 2021/12/24
REVISION: 2021/12/27
Copyright (C) 2018-2021 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
......@@ -87,6 +87,11 @@ public:
std::shared_ptr<talsh::Tensor> getLocalTensor(const numerics::Tensor & tensor,
const std::vector<std::pair<DimOffset,DimExtent>> & slice_spec) override;
const void * getTensorImage(const numerics::Tensor & tensor,
int device_kind,
int device_id,
std::size_t * size = nullptr) const override {return nullptr;}
const std::string name() const override {return "exatensor-node-executor";}
const std::string description() const override {return "ExaTENSOR tensor graph node executor";}
std::shared_ptr<TensorNodeExecutor> clone() override {return std::make_shared<ExatensorNodeExecutor>();}
......
/** ExaTN:: Tensor Runtime: Tensor graph node executor: Talsh
REVISION: 2021/12/24
REVISION: 2021/12/27
Copyright (C) 2018-2021 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
......@@ -1459,6 +1459,15 @@ std::shared_ptr<talsh::Tensor> TalshNodeExecutor::getLocalTensor(const numerics:
}
const void * TalshNodeExecutor::getTensorImage(const numerics::Tensor & tensor,
int device_kind, int device_id,
std::size_t * size) const
{
//`Implement
return nullptr;
}
bool TalshNodeExecutor::finishPrefetching(const numerics::TensorOperation & op)
{
bool synced = true;
......
/** ExaTN:: Tensor Runtime: Tensor graph node executor: Talsh
REVISION: 2021/12/24
REVISION: 2021/12/27
Copyright (C) 2018-2021 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
......@@ -97,6 +97,13 @@ public:
std::shared_ptr<talsh::Tensor> getLocalTensor(const numerics::Tensor & tensor,
const std::vector<std::pair<DimOffset,DimExtent>> & slice_spec) override;
/** Returns a non-owning pointer to a local tensor data image on a given device.
If unsuccessful, returns nullptr. **/
const void * getTensorImage(const numerics::Tensor & tensor, //in: tensor
int device_kind, //in: device kind (implementation specific)
int device_id, //in: device id: [0,1,2,..]
std::size_t * size = nullptr) const override; //out: tensor data image size in bytes
/** Finishes tensor operand prefetching for a given tensor operation. **/
bool finishPrefetching(const numerics::TensorOperation & op); //in: tensor operation
......
/** ExaTN:: Tensor Runtime: Tensor graph node executor
REVISION: 2021/12/24
REVISION: 2021/12/27
Copyright (C) 2018-2021 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
......@@ -117,6 +117,14 @@ public:
virtual std::shared_ptr<talsh::Tensor> getLocalTensor(const numerics::Tensor & tensor,
const std::vector<std::pair<DimOffset,DimExtent>> & slice_spec) = 0;
/** Returns a non-owning pointer to a local tensor data image on a given device.
If unsuccessful, returns nullptr. **/
virtual const void * getTensorImage(const numerics::Tensor & tensor, //in: tensor
int device_kind, //in: device kind (implementation specific)
int device_id, //in: device id: [0,1,2,..]
std::size_t * size = nullptr) const = 0; //out: tensor data image size in bytes
/** Clones. **/
virtual std::shared_ptr<TensorNodeExecutor> clone() = 0;
};
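A hedged usage example for the new accessor (node_executor and tensor are placeholders; DEV_NVIDIA_GPU is the TAL-SH device kind already used elsewhere in this commit and is assumed here to be a valid device_kind value):

std::size_t image_size = 0;
const void * image = node_executor.getTensorImage(tensor,DEV_NVIDIA_GPU,0,&image_size);
if(image != nullptr){
  //image points to image_size bytes of the locally stored tensor body on GPU 0 (non-owning view)
}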
......
/** ExaTN:: Tensor Runtime: Task-based execution layer for tensor operations
REVISION: 2021/12/22
REVISION: 2021/12/27
Copyright (C) 2018-2021 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
......@@ -242,17 +242,6 @@ VertexIdType TensorRuntime::submit(std::shared_ptr<TensorOperation> op) {
}
#ifdef CUQUANTUM
bool TensorRuntime::submit(std::shared_ptr<numerics::TensorNetwork> network,
TensorOpExecHandle * exec_handle)
{
assert(exec_handle != nullptr);
*exec_handle = tensor_network_queue_.append(network);
return true;
}
#endif
bool TensorRuntime::sync(TensorOperation & op, bool wait) {
assert(currentScopeIsSet());
executing_.store(true); //reactivate the execution thread to execute the DAG in case it was not active
......@@ -301,6 +290,28 @@ bool TensorRuntime::sync(bool wait) {
}
#ifdef CUQUANTUM
TensorOpExecHandle TensorRuntime::submit(std::shared_ptr<numerics::TensorNetwork> network)
{
return tensor_network_queue_.append(network);
}
bool TensorRuntime::syncNetwork(const TensorOpExecHandle exec_handle, bool wait)
{
assert(exec_handle != 0);
bool synced = false;
while(!synced){
const auto exec_stat = tensor_network_queue_.checkExecStatus(exec_handle);
synced = (exec_stat == TensorNetworkQueue::ExecStat::None ||
exec_stat == TensorNetworkQueue::ExecStat::Completed);
if(!wait) break;
};
return synced;
}
#endif
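Client code would compose the relocated pair of calls roughly as follows; a sketch only, with runtime and network as placeholders:

#ifdef CUQUANTUM
TensorOpExecHandle handle = runtime.submit(network); //whole tensor network submission
if(handle != 0){                                     //zero signals an unsuccessful submission
  bool done = runtime.syncNetwork(handle);           //blocks until the network has completed (wait = true)
  assert(done);
}
#endif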
std::future<std::shared_ptr<talsh::Tensor>> TensorRuntime::getLocalTensor(std::shared_ptr<Tensor> tensor,
const std::vector<std::pair<DimOffset,DimExtent>> & slice_spec)
{
......
/** ExaTN:: Tensor Runtime: Task-based execution layer for tensor operations
REVISION: 2021/12/22
REVISION: 2021/12/27
Copyright (C) 2018-2021 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
......@@ -128,12 +128,6 @@ public:
/** Submits a tensor operation into the current execution graph and returns its integer id. **/
VertexIdType submit(std::shared_ptr<TensorOperation> op); //in: tensor operation
#ifdef CUQUANTUM
/** Submits an entire tensor network for processing as a whole. **/
bool submit(std::shared_ptr<numerics::TensorNetwork> network, //in: tensor network
TensorOpExecHandle * exec_handle = nullptr); //out: assigned execution handle
#endif
/** Tests for completion of a given tensor operation.
If wait = TRUE, it will block until completion. **/
bool sync(TensorOperation & op,
......@@ -148,6 +142,19 @@ public:
If wait = TRUE, it will block until completion. **/
bool sync(bool wait = true);
#ifdef CUQUANTUM
/** Submits an entire tensor network for processing as a whole.
The returned execution handle can be used for checking the status
of the tensor network execution. Zero on return means unsuccessful submission. **/
TensorOpExecHandle submit(std::shared_ptr<numerics::TensorNetwork> network); //in: tensor network
/** Tests for completion of processing of a whole tensor network.
A valid execution handle obtained during tensor network
submission must be positive. **/
bool syncNetwork(const TensorOpExecHandle exec_handle,
bool wait = true);
#endif
/** Returns a locally stored tensor slice (talsh::Tensor) providing access to tensor elements.
This slice will be extracted from the exatn::numerics::Tensor implementation as a copy.
The returned future becomes ready once the execution thread has retrieved the slice copy. **/
......