Commit 2bd49146 authored by Dmitry I. Lyakh's avatar Dmitry I. Lyakh

Fixed TAL-SH initialization in TalshNodeExecutor.

TensorRuntimeTester works now, but incomplete (requires DAG executor implementation).
parent 680564c0
Pipeline #69765 passed with stage
in 5 minutes and 57 seconds
#include "exatn.hpp"
#include <iostream>
namespace exatn {
void initialize() {
if(!exatnFrameworkInitialized){
serviceRegistry->initialize();
numericalServer = std::make_shared<NumServer>();
exatnFrameworkInitialized = true;
std::cout << "#DEBUG(exatn): ExaTN services initialized" << std::endl << std::flush;
numericalServer = std::make_shared<NumServer>();
std::cout << "#DEBUG(exatn): ExaTN numerical server initialized" << std::endl << std::flush;
}
return;
}
......@@ -18,8 +22,9 @@ bool isInitialized() {
void finalize() {
exatnFrameworkInitialized = false;
numericalServer.reset();
exatnFrameworkInitialized = false;
std::cout << "#DEBUG(exatn): ExaTN numerical server shut down" << std::endl << std::flush;
return;
}
......
#ifndef EXATN_SERVICE_HPP_
#define EXATN_SERVICE_HPP_
#include "ServiceRegistry.hpp"
#include <iostream>
#include <memory>
#include "ServiceRegistry.hpp"
#include <cassert>
namespace exatn {
......@@ -13,23 +15,28 @@ extern std::shared_ptr<ServiceRegistry> serviceRegistry;
template <typename Service>
std::shared_ptr<Service> getService(const std::string &serviceName) {
if (!exatn::exatnFrameworkInitialized) {
std::cerr << "ExaTN is not initialized: Please execute "
if(!exatn::exatnFrameworkInitialized) {
std::cerr << "#FATAL(exatn::service): Unable to get service " << serviceName << std::endl
<< "ExaTN is not initialized: Please execute "
"exatn::initialize() before using its API.\n";
assert(false);
}
auto service = serviceRegistry->getService<Service>(serviceName);
if (!service) {
std::cerr << "Invalid ExaTN Service: Could not find " << serviceName
if(!service) {
std::cerr << "#ERROR(exatn::service): Invalid ExaTN service: " << serviceName
<< " in the Service Registry.\n";
assert(false);
}
return service;
}
template <typename Service>
bool hasService(const std::string &serviceName) {
if (!exatn::exatnFrameworkInitialized) {
std::cerr << "ExaTN is not initialized: Please execute "
if(!exatn::exatnFrameworkInitialized) {
std::cerr << "#FATAL(exatn::service): Unable to check service "+serviceName << std::endl <<
"ExaTN is not initialized: Please execute "
"exatn::initialize() before using its API.\n";
assert(false);
}
return serviceRegistry->hasService<Service>(serviceName);
}
......
/** ExaTN::Numerics: Numerical server
REVISION: 2019/08/26
REVISION: 2019/09/02
Copyright (C) 2018-2019 Dmitry I. Lyakh (Liakh)
Copyright (C) 2018-2019 Oak Ridge National Laboratory (UT-Battelle) **/
......@@ -59,8 +59,8 @@ public:
NumServer();
NumServer(const NumServer &) = delete;
NumServer & operator=(const NumServer &) = delete;
NumServer(NumServer &&) noexcept = default;
NumServer & operator=(NumServer &&) noexcept = default;
NumServer(NumServer &&) noexcept = delete;
NumServer & operator=(NumServer &&) noexcept = delete;
~NumServer() = default;
/** Reconfigures tensor runtime implementation. **/
......
......@@ -24,21 +24,23 @@ public:
*/
void Start(BundleContext context) {
auto gex1 = std::make_shared<exatn::runtime::EagerGraphExecutor>();
auto gex2 = std::make_shared<exatn::runtime::LazyGraphExecutor>();
context.RegisterService<exatn::runtime::TensorGraphExecutor>(gex1);
context.RegisterService<exatn::runtime::TensorGraphExecutor>(gex2);
auto nex1 = std::make_shared<exatn::runtime::TalshNodeExecutor>();
auto nex2 = std::make_shared<exatn::runtime::ExatensorNodeExecutor>();
context.RegisterService<exatn::runtime::TensorNodeExecutor>(nex1);
context.RegisterService<exatn::runtime::TensorNodeExecutor>(nex2);
//Activate tensor graph (DAG) executors:
context.RegisterService<exatn::runtime::TensorGraphExecutor>(
std::make_shared<exatn::runtime::EagerGraphExecutor>()
);
context.RegisterService<exatn::runtime::TensorGraphExecutor>(
std::make_shared<exatn::runtime::LazyGraphExecutor>()
);
//Activate tensor graph (DAG) node executors:
context.RegisterService<exatn::runtime::TensorNodeExecutor>(
std::make_shared<exatn::runtime::TalshNodeExecutor>()
);
context.RegisterService<exatn::runtime::TensorNodeExecutor>(
std::make_shared<exatn::runtime::ExatensorNodeExecutor>()
);
}
/**
*/
void Stop(BundleContext /*context*/) {}
};
......
......@@ -3,6 +3,12 @@
namespace exatn {
namespace runtime {
void ExatensorNodeExecutor::initialize()
{
return;
}
int ExatensorNodeExecutor::execute(numerics::TensorOpCreate & op,
TensorOpExecHandle * exec_handle)
{
......
/** ExaTN:: Tensor Runtime: Tensor graph node executor: Exatensor
REVISION: 2019/09/01
REVISION: 2019/09/02
Copyright (C) 2018-2019 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2019 Oak Ridge National Laboratory (UT-Battelle)
......@@ -27,6 +27,8 @@ public:
ExatensorNodeExecutor & operator=(ExatensorNodeExecutor &&) noexcept = delete;
~ExatensorNodeExecutor() = default;
void initialize() override;
int execute(numerics::TensorOpCreate & op,
TensorOpExecHandle * exec_handle) override;
int execute(numerics::TensorOpDestroy & op,
......
......@@ -6,40 +6,44 @@
namespace exatn {
namespace runtime {
int TalshNodeExecutor::talsh_initialized_ = 0;
bool TalshNodeExecutor::talsh_initialized_ = false;
int TalshNodeExecutor::talsh_node_exec_count_ = 0;
std::mutex talsh_init_lock;
TalshNodeExecutor::TalshNodeExecutor()
void TalshNodeExecutor::initialize()
{
talsh_init_lock.lock();
if(talsh_initialized_ == 0){
if(!talsh_initialized_){
std::size_t host_buffer_size = 1024*1024*1024; //`Get max Host memory from OS
auto error_code = talsh::initialize(&host_buffer_size);
if(error_code == TALSH_SUCCESS){
std::cout << "#DEBUG(exatn::runtime::TalshNodeExecutor): TAL-SH initialized with Host buffer size of " <<
host_buffer_size << " Bytes" << std::endl;
//std::cout << "#DEBUG(exatn::runtime::TalshNodeExecutor): TAL-SH initialized with Host buffer size of " <<
//host_buffer_size << " Bytes" << std::endl << std::flush;
talsh_initialized_ = true;
}else{
std::cout << "#FATAL(exatn::runtime::TalshNodeExecutor): Unable to initialize TAL-SH!" << std::endl;
std::cerr << "#FATAL(exatn::runtime::TalshNodeExecutor): Unable to initialize TAL-SH!" << std::endl;
assert(false);
}
}
++talsh_initialized_;
++talsh_node_exec_count_;
talsh_init_lock.unlock();
return;
}
TalshNodeExecutor::~TalshNodeExecutor()
{
talsh_init_lock.lock();
--talsh_initialized_;
if(talsh_initialized_ == 0){
--talsh_node_exec_count_;
if(talsh_initialized_ && talsh_node_exec_count_ == 0){
auto error_code = talsh::shutdown();
if(error_code == TALSH_SUCCESS){
std::cout << "#DEBUG(exatn::runtime::TalshNodeExecutor): TAL-SH shut down" << std::endl;
//std::cout << "#DEBUG(exatn::runtime::TalshNodeExecutor): TAL-SH shut down" << std::endl << std::flush;
talsh_initialized_ = false;
}else{
std::cout << "#FATAL(exatn::runtime::TalshNodeExecutor): Unable to shut down TAL-SH!" << std::endl;
std::cerr << "#FATAL(exatn::runtime::TalshNodeExecutor): Unable to shut down TAL-SH!" << std::endl;
assert(false);
}
}
......
/** ExaTN:: Tensor Runtime: Tensor graph node executor: Talsh
REVISION: 2019/09/01
REVISION: 2019/09/02
Copyright (C) 2018-2019 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2019 Oak Ridge National Laboratory (UT-Battelle)
......@@ -25,13 +25,15 @@ class TalshNodeExecutor : public TensorNodeExecutor {
public:
TalshNodeExecutor();
TalshNodeExecutor() = default;
TalshNodeExecutor(const TalshNodeExecutor &) = delete;
TalshNodeExecutor & operator=(const TalshNodeExecutor &) = delete;
TalshNodeExecutor(TalshNodeExecutor &&) noexcept = delete;
TalshNodeExecutor & operator=(TalshNodeExecutor &&) noexcept = delete;
~TalshNodeExecutor();
void initialize() override;
int execute(numerics::TensorOpCreate & op,
TensorOpExecHandle * exec_handle) override;
int execute(numerics::TensorOpDestroy & op,
......@@ -57,7 +59,9 @@ protected:
/** Active execution handles associated with tensor operations currently executed by TAL-SH **/
std::unordered_map<TensorOpExecHandle,std::shared_ptr<talsh::TensorTask>> tasks_;
/** TAL-SH initialization status **/
static int talsh_initialized_; //number of active TAL-SH node executors
static bool talsh_initialized_;
/** Number of instances of TAL-SH node executors **/
static int talsh_node_exec_count_;
};
......
/** ExaTN:: Tensor Runtime: Tensor graph executor
REVISION: 2019/08/26
REVISION: 2019/09/02
Copyright (C) 2018-2019 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2019 Oak Ridge National Laboratory (UT-Battelle)
......@@ -41,9 +41,10 @@ public:
TensorGraphExecutor & operator=(TensorGraphExecutor &&) noexcept = delete;
~TensorGraphExecutor() = default;
/** Resets the DAG node executor (tensor operation executor). **/
/** Sets/resets the DAG node executor (tensor operation executor). **/
void resetNodeExecutor(std::shared_ptr<TensorNodeExecutor> node_executor) {
node_executor_ = node_executor;
if(node_executor_) node_executor_->initialize();
return;
}
......
/** ExaTN:: Tensor Runtime: Tensor graph node executor
REVISION: 2019/08/30
REVISION: 2019/09/02
Copyright (C) 2018-2019 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2019 Oak Ridge National Laboratory (UT-Battelle)
......@@ -32,6 +32,9 @@ class TensorNodeExecutor : public Identifiable, public Cloneable<TensorNodeExecu
public:
/** Explicitly initializes the underlying numerical service, if needed **/
virtual void initialize() = 0;
/** Executes the tensor operation found in a DAG node asynchronously,
returning the execution handle in exec_handle that can later be
used for testing for completion of the operation execution.
......
......@@ -6,11 +6,14 @@
namespace exatn {
namespace runtime {
TensorRuntime::TensorRuntime(const std::string & graph_executor_name, const std::string & node_executor_name):
TensorRuntime::TensorRuntime(const std::string & graph_executor_name,
const std::string & node_executor_name):
graph_executor_name_(graph_executor_name), node_executor_name_(node_executor_name),
current_dag_(nullptr), executing_(false), alive_(false)
{
graph_executor_ = exatn::getService<TensorGraphExecutor>(graph_executor_name);
graph_executor_->resetNodeExecutor(exatn::getService<TensorNodeExecutor>(node_executor_name));
graph_executor_ = exatn::getService<TensorGraphExecutor>(graph_executor_name_);
std::cout << "#DEBUG(exatn::runtime::TensorRuntime)[MAIN_THREAD]: DAG executor set to "
<< graph_executor_name_ << "+" << node_executor_name_ << std::endl << std::flush;
launchExecutionThread();
}
......@@ -19,9 +22,9 @@ TensorRuntime::~TensorRuntime()
{
if(alive_.load()){
alive_.store(false); //signal for the execution thread to finish
std::cout << "#DEBUG(exatn::runtime::TensorRuntime)[MAIN_THREAD]: Waiting Execution Thread ... ";
std::cout << "#DEBUG(exatn::runtime::TensorRuntime)[MAIN_THREAD]: Waiting Execution Thread ... " << std::flush;
exec_thread_.join(); //wait until the execution thread has finished
std::cout << "Joined" << std::endl;
std::cout << "Joined" << std::endl << std::flush;
}
}
......@@ -30,9 +33,9 @@ void TensorRuntime::launchExecutionThread()
{
if(!(alive_.load())){
alive_.store(true);
std::cout << "#DEBUG(exatn::runtime::TensorRuntime)[MAIN_THREAD]: Launching Execution Thread ... ";
std::cout << "#DEBUG(exatn::runtime::TensorRuntime)[MAIN_THREAD]: Launching Execution Thread ... " << std::flush;
exec_thread_ = std::thread(&TensorRuntime::executionThreadWorkflow,this);
std::cout << "Done" << std::endl;
std::cout << "Done" << std::endl << std::flush;
}
return; //only the main thread returns to the client
}
......@@ -40,12 +43,18 @@ void TensorRuntime::launchExecutionThread()
void TensorRuntime::executionThreadWorkflow()
{
graph_executor_->resetNodeExecutor(exatn::getService<TensorNodeExecutor>(node_executor_name_));
//std::cout << "#DEBUG(exatn::runtime::TensorRuntime)[EXEC_THREAD]: DAG node executor set to "
//<< node_executor_name_ << std::endl << std::flush;
while(alive_.load()){
if(executing_.load()){ //executing_ is set to TRUE by the main thread when new operations are submitted
graph_executor_->execute(*current_dag_);
executing_.store(false); //executing_ is set to FALSE by the execution thread
}
}
graph_executor_->resetNodeExecutor(std::shared_ptr<TensorNodeExecutor>(nullptr));
//std::cout << "#DEBUG(exatn::runtime::TensorRuntime)[EXEC_THREAD]: DAG node executor reset. End of life."
//<< std::endl << std::flush;
return; //end of execution thread life
}
......
/** ExaTN:: Tensor Runtime: Task-based execution layer for tensor operations
REVISION: 2019/09/01
REVISION: 2019/09/02
Copyright (C) 2018-2019 Tiffany Mintz, Dmitry Lyakh, Alex McCaskey
Copyright (C) 2018-2019 Oak Ridge National Laboratory (UT-Battelle)
......@@ -109,6 +109,10 @@ protected:
/** The execution thread lives here **/
void executionThreadWorkflow();
/** Tensor graph (DAG) executor name **/
std::string graph_executor_name_;
/** Tensor graph (DAG) node executor name **/
std::string node_executor_name_;
/** Current tensor graph (DAG) executor **/
std::shared_ptr<TensorGraphExecutor> graph_executor_;
/** Active execution graphs (DAGs) **/
......
......@@ -62,6 +62,7 @@ TEST(TensorRuntimeTester, checkSimple) {
exatn::numericalServer->submit(create_tensor1);
exatn::numericalServer->submit(create_tensor2);
exatn::numericalServer->submit(contract_tensors);
auto synced = exatn::numericalServer->sync(*tensor0,true); assert(synced);
exatn::numericalServer->submit(destroy_tensor2);
exatn::numericalServer->submit(destroy_tensor1);
exatn::numericalServer->submit(destroy_tensor0);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment