Commit a60de3da authored by Dmitry I. Lyakh's avatar Dmitry I. Lyakh

Fixed the use of atomic attributes in DAG nodes.

parent 77b44c8b
Pipeline #68921 passed with stage
in 5 minutes and 9 seconds
......@@ -107,6 +107,7 @@ std::vector<VertexIdType> DirectedBoostGraph::getNeighborList(VertexIdType verte
}
unlock();
return l;
}
......@@ -133,6 +134,7 @@ void DirectedBoostGraph::computeShortestPath(VertexIdType startIndex,
for (const auto & pi: p) paths.push_back(pi);
unlock();
return;
}
......
/** ExaTN:: Tensor Runtime: Directed acyclic graph of tensor operations
REVISION: 2019/07/29
REVISION: 2019/08/26
Copyright (C) 2018-2019 Tiffany Mintz, Dmitry Lyakh, Alex McCaskey
Copyright (C) 2018-2019 Oak Ridge National Laboratory (UT-Battelle)
......@@ -10,9 +10,9 @@ Rationale:
dependencies between them: A directed edge from node1 to
node2 indicates that node1 depends on node2. Each DAG node
has its unique integer vertex id (VertexIdType) returned
when the node is added to the DAG.
when the node is appended to the DAG.
(b) The tensor graph contains:
1. The DAG implementation (in the directed Boost graph subclass);
1. The DAG implementation (DirectedBoostGraph subclass);
2. The DAG execution state (TensorExecState data member).
**/
......
/** ExaTN:: Tensor Runtime: Tensor graph execution state
REVISION: 2019/08/04
REVISION: 2019/08/26
Copyright (C) 2018-2019 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2019 Oak Ridge National Laboratory (UT-Battelle)
......@@ -8,7 +8,7 @@ Copyright (C) 2018-2019 Oak Ridge National Laboratory (UT-Battelle)
#include "tensor_exec_state.hpp"
#include <iostream>
#include <assert.h>
#include <cassert>
namespace exatn {
namespace runtime {
......@@ -19,8 +19,9 @@ const std::vector<VertexIdType> * TensorExecState::getTensorEpochNodes(const Ten
auto tens_hash = tensor.getTensorHash();
auto iter = tensor_info_.find(tens_hash);
if(iter == tensor_info_.end()) return nullptr;
*epoch = (iter->second).rw_epoch;
return &((iter->second).rw_epoch_nodes);
auto & tens_info = *(iter->second);
*epoch = tens_info.rw_epoch.load();
return &(tens_info.rw_epoch_nodes);
}
int TensorExecState::registerTensorRead(const Tensor & tensor, VertexIdType node_id)
......@@ -28,13 +29,13 @@ int TensorExecState::registerTensorRead(const Tensor & tensor, VertexIdType node
auto tens_hash = tensor.getTensorHash();
auto iter = tensor_info_.find(tens_hash);
if(iter == tensor_info_.end()){
auto pos = tensor_info_.emplace(std::make_pair(tens_hash,TensorExecInfo()));
auto pos = tensor_info_.emplace(std::make_pair(tens_hash,std::make_shared<TensorExecInfo>()));
iter = pos.first;
}
auto & tens_info = iter->second;
if(tens_info.rw_epoch < 0){ //write epoch
auto & tens_info = *(iter->second);
if(tens_info.rw_epoch.load() < 0){ //write epoch
tens_info.rw_epoch_nodes.clear();
tens_info.rw_epoch = 0;
tens_info.rw_epoch.store(0);
}
tens_info.rw_epoch_nodes.emplace_back(node_id);
return ++(tens_info.rw_epoch);
......@@ -45,13 +46,13 @@ int TensorExecState::registerTensorWrite(const Tensor & tensor, VertexIdType nod
auto tens_hash = tensor.getTensorHash();
auto iter = tensor_info_.find(tens_hash);
if(iter == tensor_info_.end()){
auto pos = tensor_info_.emplace(std::make_pair(tens_hash,TensorExecInfo()));
auto pos = tensor_info_.emplace(std::make_pair(tens_hash,std::make_shared<TensorExecInfo>()));
iter = pos.first;
}
auto & tens_info = iter->second;
if(tens_info.rw_epoch != 0){ //either read or write epoch
auto & tens_info = *(iter->second);
if(tens_info.rw_epoch.load() != 0){ //either read or write epoch
tens_info.rw_epoch_nodes.clear();
tens_info.rw_epoch = 0;
tens_info.rw_epoch.store(0);
}
tens_info.rw_epoch_nodes.emplace_back(node_id);
++(tens_info.update_count);
......@@ -63,7 +64,7 @@ std::size_t TensorExecState::registerWriteCompletion(const Tensor & tensor)
auto tens_hash = tensor.getTensorHash();
auto iter = tensor_info_.find(tens_hash);
assert(iter != tensor_info_.end());
return --((iter->second).update_count);
return --(iter->second->update_count);
}
std::size_t TensorExecState::getTensorUpdateCount(const Tensor & tensor)
......@@ -71,7 +72,7 @@ std::size_t TensorExecState::getTensorUpdateCount(const Tensor & tensor)
auto tens_hash = tensor.getTensorHash();
auto iter = tensor_info_.find(tens_hash);
if(iter == tensor_info_.end()) return 0;
return (iter->second).update_count;
return iter->second->update_count.load();
}
void TensorExecState::registerDependencyFreeNode(VertexIdType node_id)
......
/** ExaTN:: Tensor Runtime: Tensor graph execution state
REVISION: 2019/07/29
REVISION: 2019/08/26
Copyright (C) 2018-2019 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2019 Oak Ridge National Laboratory (UT-Battelle)
......@@ -10,7 +10,7 @@ Rationale:
dependencies between them: A directed edge from node1 to
node2 indicates that node1 depends on node2. Each DAG node
has its unique integer vertex id (VertexIdType) returned
when the node is added into the DAG.
when the node is appended into the DAG.
(b) The tensor graph contains:
1. The DAG implementation (DirectedBoostGraph subclass);
2. The DAG execution state (TensorExecState data member).
......@@ -26,8 +26,8 @@ Rationale:
current (write) epoch.
The execution state of a Tensor is progressing through alternating
read and write epochs, introducing read-after-write, write-after-write,
and write-after-read dependencies between tensor nodes sharing the same
data (Tensor).
and write-after-read dependencies between tensor nodes with stored
tensor operations operating on the same data (Tensor).
**/
#ifndef EXATN_RUNTIME_TENSOR_EXEC_STATE_HPP_
......@@ -39,6 +39,7 @@ Rationale:
#include <unordered_map>
#include <list>
#include <memory>
#include <atomic>
namespace exatn {
namespace runtime {
......@@ -57,19 +58,20 @@ class TensorExecState {
protected:
struct TensorExecInfo {
std::size_t update_count; //total number of outstanding updates on a given Tensor in the current DAG
int rw_epoch; //>0: number of current epoch reads; -1: current epoch write (single)
std::vector<VertexIdType> rw_epoch_nodes; //nodes participating in the current R/W epoch
std::atomic<std::size_t> update_count; //total number of outstanding updates on a given Tensor in the current DAG
std::atomic<int> rw_epoch; //>0: number of current epoch reads; -1: current epoch write (single)
std::vector<VertexIdType> rw_epoch_nodes; //nodes participating in the current R/W epoch (either read or write)
TensorExecInfo(): update_count(0), rw_epoch(0) {}
TensorExecInfo(const TensorExecInfo &) = default;
TensorExecInfo & operator=(const TensorExecInfo &) = default;
TensorExecInfo(TensorExecInfo &&) noexcept = default;
TensorExecInfo & operator=(TensorExecInfo &&) noexcept = default;
TensorExecInfo(const TensorExecInfo &) = delete;
TensorExecInfo & operator=(const TensorExecInfo &) = delete;
TensorExecInfo(TensorExecInfo &&) noexcept = delete;
TensorExecInfo & operator=(TensorExecInfo &&) noexcept = delete;
~TensorExecInfo() = default;
};
public:
TensorExecState() = default;
TensorExecState(const TensorExecState &) = delete;
TensorExecState & operator=(const TensorExecState &) = delete;
......@@ -109,10 +111,10 @@ public:
private:
/** Table for tracking the execution status of a given tensor:
Tensor Hash --> TensorExecInfo **/
std::unordered_map<TensorHashType,TensorExecInfo> tensor_info_;
std::unordered_map<TensorHashType,std::shared_ptr<TensorExecInfo>> tensor_info_;
/** List of dependency-free unexecuted DAG nodes **/
std::list<VertexIdType> nodes_ready_;
/** List of the currently executed DAG nodes **/
/** List of the DAG nodes being currently executed **/
std::list<VertexIdType> nodes_executing_;
};
......
/** ExaTN:: Tensor Runtime: Directed acyclic graph (DAG) of tensor operations
REVISION: 2019/08/04
REVISION: 2019/08/26
Copyright (C) 2018-2019 Tiffany Mintz, Dmitry Lyakh, Alex McCaskey
Copyright (C) 2018-2019 Oak Ridge National Laboratory (UT-Battelle)
......@@ -9,19 +9,21 @@ Rationale:
represent tensor operations (tasks) and directed edges represent
dependencies between the corresponding nodes (tensor operations).
Each DAG is associated with a uniquely named TAProL scope such that
all tensor operations submitted by the Client to the ExaTN numerics
all tensor operations submitted by the Client to the ExaTN numerical
server are forwarded into the DAG associated with the TAProL scope
in which the Client currently resides.
(b) The tensor graph contains:
1. The DAG implementation (in the directed Boost graph subclass);
1. The DAG implementation (in the DirectedBoostGraph subclass);
2. The DAG execution state (TensorExecState data member).
(c) DEVELOPERS ONLY: The TensorGraph object provides lock/unlock methods for concurrent update
of the DAG structure (by Client thread) and its execution state (by Execution thread).
Additionally each node of the TensorGraph (TensorOpNode object) provides more fine grain
locking mechanism (lock/unlock methods) for providing exclusive access to individual DAG nodes.
Public virtual methods of TensorGraph implemented in DirectedBoostGraph subclass perform
locking/unlocking from there. Other (non-virtual) public methods of TensorGraph perform
locking/unlocking from here.
Public virtual methods of TensorGraph implemented in the DirectedBoostGraph subclass
perform locking/unlocking from there. Other (non-virtual) public methods of TensorGraph
perform locking/unlocking from here. Additionally each node of the TensorGraph (TensorOpNode object)
provides more fine grained locking mechanism (lock/unlock methods) for providing exclusive access to
individual DAG nodes, which is only related to TensorOpNode.getOperation() method since it returns a
reference to the stored tensor operation (shared pointer reference), thus may require external locking
for securing an exclusive access to this data member of TensorOpNode.
**/
#ifndef EXATN_RUNTIME_TENSOR_GRAPH_HPP_
......@@ -38,7 +40,7 @@ Rationale:
#include <atomic>
#include <mutex>
#include <assert.h>
#include <cassert>
namespace exatn {
namespace runtime {
......@@ -57,25 +59,38 @@ public:
TensorOpNode(const TensorOpNode &) = delete;
TensorOpNode & operator=(const TensorOpNode &) = delete;
TensorOpNode(TensorOpNode &&) noexcept = default;
TensorOpNode & operator=(TensorOpNode &&) noexcept = default;
TensorOpNode(TensorOpNode &&) noexcept = delete;
TensorOpNode & operator=(TensorOpNode &&) noexcept = delete;
~TensorOpNode() = default;
/** Returns TRUE if the TensorOpNode is a dummy node, that is,
    its stored tensor operation is a NOOP placeholder. **/
inline bool isDummy() const {return is_noop_;}
/** Returns a reference to the stored tensor operation (shared pointer).
    Because the returned reference aliases the op_ data member, callers
    may need to lock/unlock the TensorOpNode object externally in order
    to guarantee exclusive access to it. **/
inline std::shared_ptr<TensorOperation> & getOperation() {return op_;}
/** Returns the (unique) id of the tensor graph node. **/
inline VertexIdType getId() const {return id_;}
inline bool isDummy() const {return is_noop_;}
/** Returns TRUE if the tensor graph node is currently being executed
    (atomic read of the executing_ flag). **/
inline bool isExecuting() {return executing_.load();}
/** Returns TRUE if the tensor graph node has been executed to completion. **/
inline bool isExecuted(int * error_code = nullptr) {
bool ans = executed_.load();
if(error_code != nullptr && ans) *error_code = error_;
if(error_code != nullptr && ans) *error_code = error_.load();
return ans;
}
/** Assigns the (unique) graph vertex id to this tensor graph node. **/
inline void setId(VertexIdType id) {
  id_ = id;
}
/** Marks the tensor graph node as being currently executed. **/
inline void setExecuting() {
auto executing = executing_.load();
auto executed = executed_.load();
......@@ -84,12 +99,14 @@ public:
return;
}
/** Marks the tensor graph node as executed to completion. **/
inline void setExecuted(int error_code = 0) {
auto executing = executing_.load();
auto executed = executed_.load();
assert(executing && !executed);
error_ = error_code;
executed_.store(true); executing_.store(false);
error_.store(error_code);
executing_.store(false);
executed_.store(true);
return;
}
......@@ -98,11 +115,11 @@ public:
protected:
std::shared_ptr<TensorOperation> op_; //stored tensor operation
VertexIdType id_; //vertex id
int error_; //execution error code (0:success)
bool is_noop_; //TRUE if the stored tensor operation is NOOP (dummy node)
std::atomic<bool> executing_; //TRUE if the stored tensor operation is currently being executed
std::atomic<bool> executed_; //TRUE if the stored tensor operation has been executed to completion
VertexIdType id_; //graph vertex id
std::atomic<int> error_; //execution error code (0:success)
std::atomic<bool> executing_; //TRUE if the stored tensor operation is currently being executed
std::atomic<bool> executed_; //TRUE if the stored tensor operation has been executed to completion
bool is_noop_; //TRUE if the stored tensor operation is NOOP (dummy node)
private:
std::recursive_mutex mtx_; //object access mutex
......@@ -120,60 +137,62 @@ public:
TensorGraph & operator=(TensorGraph &&) noexcept = default;
virtual ~TensorGraph() = default;
/** Adds a new node (tensor operation) to the DAG and returns its id **/
/** Adds a new node (tensor operation) into the DAG and returns its id. **/
virtual VertexIdType addOperation(std::shared_ptr<TensorOperation> op) = 0;
/** Adds a directed edge between dependent and dependee DAG nodes:
<dependent> depends on <dependee> (dependent --> dependee) **/
<dependent> depends on <dependee> (dependent --> dependee). **/
virtual void addDependency(VertexIdType dependent,
VertexIdType dependee) = 0;
/** Returns TRUE if there is a dependency between two DAG nodes:
If vertex_id1 node depends on vertex_id2 node **/
If vertex_id1 node depends on vertex_id2 node. **/
virtual bool dependencyExists(VertexIdType vertex_id1,
VertexIdType vertex_id2) = 0;
/** Returns the properties (TensorOpNode) of a given DAG node **/
/** Returns the properties (TensorOpNode) of a given DAG node (by reference).
This function may require external locking in order to provide an
exclusive access to the DAG node properties returned by reference. **/
virtual TensorOpNode & getNodeProperties(VertexIdType vertex_id) = 0;
/** Returns the number of nodes the given node is connected to **/
/** Returns the number of nodes the given node is connected to. **/
virtual std::size_t getNodeDegree(VertexIdType vertex_id) = 0;
/** Returns the total number of nodes in the DAG **/
/** Returns the total number of nodes in the DAG. **/
virtual std::size_t getNumNodes() = 0;
/** Returns the total number of dependencies (directed edges) in the DAG **/
/** Returns the total number of dependencies (directed edges) in the DAG. **/
virtual std::size_t getNumDependencies() = 0;
/** Returns the list of nodes connected to the given DAG node **/
/** Returns the list of nodes connected to the given DAG node. **/
virtual std::vector<VertexIdType> getNeighborList(VertexIdType vertex_id) = 0;
/** Computes the shortest path from the start index **/
/** Computes the shortest path from the start index. **/
virtual void computeShortestPath(VertexIdType startIndex,
std::vector<double> & distances,
std::vector<VertexIdType> & paths) = 0;
/** Clones an empty subclass instance (needed for plugin registry) **/
/** Clones an empty subclass instance (needed for plugin registry). **/
virtual std::shared_ptr<TensorGraph> clone() = 0;
/** Marks the given DAG node as being currently executed. **/
void setNodeExecuting(VertexIdType vertex_id) {
  auto & node = getNodeProperties(vertex_id);
  node.setExecuting();
}
/** Marks the given DAG node as executed to completion,
    recording the execution error code (0: success). **/
void setNodeExecuted(VertexIdType vertex_id, int error_code = 0) {
  auto & node = getNodeProperties(vertex_id);
  node.setExecuted(error_code);
}
/** Returns TRUE if the given DAG node is currently being executed. **/
bool nodeExecuting(VertexIdType vertex_id) {
  auto & node = getNodeProperties(vertex_id);
  return node.isExecuting();
}
/** Returns TRUE if the DAG node has been executed to completion,
error_code will return the error code if executed. **/
error_code will return the error code (if executed). **/
bool nodeExecuted(VertexIdType vertex_id, int * error_code = nullptr) {
return getNodeProperties(vertex_id).isExecuted(error_code);
}
......
/** ExaTN:: Tensor Runtime: Task-based execution layer for tensor operations
REVISION: 2019/07/29
REVISION: 2019/08/26
Copyright (C) 2018-2019 Tiffany Mintz, Dmitry Lyakh, Alex McCaskey
Copyright (C) 2018-2019 Oak Ridge National Laboratory (UT-Battelle)
......@@ -9,15 +9,15 @@ Rationale:
represent tensor operations (tasks) and directed edges represent
dependencies between the corresponding nodes (tensor operations).
Each DAG is associated with a uniquely named TAProL scope such that
all tensor operations submitted by the Client to the ExaTN numerics
all tensor operations submitted by the Client to the ExaTN numerical
server are forwarded into the DAG associated with the TAProL scope
in which the Client currently resides.
(b) The DAG lifecycle:
openScope(name): Opens a new TAProL scope and creates its associated empty DAG.
The .submit method can then be used to append new tensor
operations into the current DAG. The actual execution
of the submitted tensor operations may start at any time
after submission (asynchronously).
operations or whole tensor networks into the current DAG.
The actual execution of the submitted tensor operations
is asynchronous and may start any time after submission.
pauseScope(): Completes the actual execution of all started tensor operations in the
current DAG and defers the execution of the rest of the DAG for later.
resumeScope(name): Pauses the execution of the currently active DAG (if any) and
......@@ -28,14 +28,14 @@ Rationale:
sync(tensor): Tests for completion of all submitted update operations on a given tensor.
(d) Upon creation, the TensorRuntime object spawns an execution thread which will be executing tensor
operations in the course of DAG traversal. The execution thread will be joined upon TensorRuntime
destruction. The main thread will return control to the client which will then be able to submit
new operations into the current DAG. The submitted operations will be autonomously executed by
the execution thread. The DAG execution policy is specified by a polymorphic TensorGraphExecutor
provided during the construction of the TensorRuntime. Correspondingly, the TensorGraphExecutor
contains a polymorphic TensorNodeExecutor responsible for the actual execution of stored tensor
operations via an associated computational backend. The concrete TensorNodeExecutor is also
specified during the construction of the TensorRuntime object.
(e) DEVELOPERS ONLY: The TensorGraph object provides lock/unlock methods for concurrent update
destruction. After spawning the execution thread, the main thread returns control to the client
which will then be able to submit new operations into the current DAG. The submitted operations
will be autonomously executed by the execution thread. The DAG execution policy is specified by
a polymorphic TensorGraphExecutor provided during the construction of the TensorRuntime.
Correspondingly, the TensorGraphExecutor contains a polymorphic TensorNodeExecutor responsible
for the actual execution of submitted tensor operations via an associated computational backend.
The concrete TensorNodeExecutor is specified during the construction of the TensorRuntime object.
(e) DEVELOPERS ONLY: The TensorGraph object (DAG) provides lock/unlock methods for concurrent update
of the DAG structure (by Client thread) and its execution state (by Execution thread).
Additionally each node of the TensorGraph (TensorOpNode object) provides more fine grain
locking mechanism (lock/unlock methods) for providing exclusive access to individual DAG nodes.
......@@ -58,7 +58,7 @@ Rationale:
namespace exatn {
namespace runtime {
class TensorRuntime {
class TensorRuntime final {
public:
TensorRuntime(const std::string & graph_executor_name = "eager-dag-executor", //DAG executor kind
......
......@@ -15,7 +15,7 @@
TEST(TensorRuntimeTester, checkSimple) {
// TODO(developer): implement tests exercising the tensor runtime here
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment