/** ExaTN:: Tensor Runtime: Task-based execution layer for tensor operations
REVISION: 2022/01/17

Copyright (C) 2018-2022 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle)

Rationale:
 (a) The execution space consists of one or more DAGs in which nodes
     represent tensor operations (tasks) and directed edges represent
     dependencies between the corresponding nodes (tensor operations).
     Each DAG is associated with a uniquely named TAProL scope such that
     all tensor operations submitted by the Client to the ExaTN numerical
     server are forwarded into the DAG associated with the TAProL scope
     in which the Client currently resides.
 (b) The DAG lifecycle:
     openScope(name): Opens a new TAProL scope and creates its associated empty DAG.
                      The .submit method can then be used to append new tensor
                      operations or whole tensor networks into the current DAG.
                      The actual execution of the submitted tensor operations
                      is asynchronous and may start any time after submission.
     pauseScope(): Completes the actual execution of all started tensor operations in the
                   current DAG and defers the execution of the rest of the DAG for later.
     resumeScope(name): Pauses the execution of the currently active DAG (if any) and
                        resumes the execution of a previously paused DAG, making it current.
     closeScope(): Completes all tensor operations in the current DAG and destroys it.
 (c) submit(TensorOperation): Submits a tensor operation for (generally deferred) execution.
     sync(TensorOperation): Tests for completion of a specific tensor operation.
     sync(tensor): Tests for completion of all submitted update operations on a given tensor.
 (d) Upon creation, the TensorRuntime object spawns an execution thread which will be executing tensor
     operations in the course of DAG traversal. The execution thread will be joined upon TensorRuntime
     destruction. After spawning the execution thread, the main thread returns control to the client
     which will then be able to submit new operations into the current DAG. The submitted operations
     will be autonomously executed by the execution thread. The DAG execution policy is specified by
     a polymorphic TensorGraphExecutor provided during the construction of the TensorRuntime.
     Correspondingly, the TensorGraphExecutor contains a polymorphic TensorNodeExecutor responsible
     for the actual execution of submitted tensor operations via an associated computational backend.
     The concrete TensorNodeExecutor is specified during the construction of the TensorRuntime object.
 (e) DEVELOPERS ONLY: The TensorGraph object (DAG) provides lock/unlock methods for concurrent update
     of the DAG structure (by Client thread) and its execution state (by Execution thread).
     Additionally, each node of the TensorGraph (a TensorOpNode object) provides a finer-grained
     locking mechanism (lock/unlock methods) for exclusive access to individual DAG nodes.
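 (f) ILLUSTRATIVE USAGE SKETCH (non-normative; `parameters` and `tensor_op` are hypothetical
     client-side objects, a ParamConf and a std::shared_ptr<TensorOperation>, prepared beforehand):
       exatn::runtime::TensorRuntime runtime(parameters); //construction spawns the execution thread
       runtime.openScope("my_scope");                     //a new empty DAG becomes current
       auto node_id = runtime.submit(tensor_op);          //deferred (asynchronous) execution
       bool done = runtime.sync(*tensor_op,true);         //block until this operation completes
       runtime.closeScope();                              //complete all operations and destroy the DAG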
**/

#ifndef EXATN_RUNTIME_TENSOR_RUNTIME_HPP_
#define EXATN_RUNTIME_TENSOR_RUNTIME_HPP_

#include "tensor_graph.hpp"
#include "tensor_network_queue.hpp"
#include "tensor_graph_executor.hpp"
#include "tensor_operation.hpp"
#include "tensor_method.hpp"

#include "param_conf.hpp"
#include "mpi_proxy.hpp"

#include <map>
#include <list>
#include <string>
#include <vector>
#include <memory>
#include <thread>
#include <atomic>
#include <future>
#include <mutex>

namespace exatn {
namespace runtime {

class TensorRuntime final {

public:

  static constexpr std::size_t MAX_RUNTIME_DAG_SIZE = 8192; //max allowed DAG size during runtime

#ifdef MPI_ENABLED
  TensorRuntime(const MPICommProxy & communicator,                               //MPI communicator proxy
                const ParamConf & parameters,                                    //runtime configuration parameters
                const std::string & graph_executor_name = "eager-dag-executor",  //DAG executor kind
                const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
#else
  TensorRuntime(const ParamConf & parameters,                                    //runtime configuration parameters
                const std::string & graph_executor_name = "eager-dag-executor",  //DAG executor kind
                const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
#endif
  TensorRuntime(const TensorRuntime &) = delete;
  TensorRuntime & operator=(const TensorRuntime &) = delete;
  TensorRuntime(TensorRuntime &&) noexcept = delete;
  TensorRuntime & operator=(TensorRuntime &&) noexcept = delete;
  ~TensorRuntime();

  /** Resets the logging level (0:none) [MAIN THREAD]. **/
  void resetLoggingLevel(int level = 0);

  /** Enforces serialized (synchronized) execution of the DAG. **/
  void resetSerialization(bool serialize,
                          bool validation_trace = false);

  /** Activates/deactivates dry run (no actual computations). **/
  void activateDryRun(bool dry_run);

  /** Activates mixed-precision fast math on all devices (if available). **/
  void activateFastMath();

  /** Returns the Host memory buffer size in bytes provided by the executor. **/
  std::size_t getMemoryBufferSize() const;

  /** Returns the current memory usage by all allocated tensors.
      Note that the returned value includes buffer fragmentation overhead. **/
  std::size_t getMemoryUsage(std::size_t * free_mem) const;
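
  // Illustrative use (hypothetical `runtime`; `free_mem` is assumed to receive the remaining free buffer space):
  //   std::size_t free_mem = 0;
  //   std::size_t used_mem = runtime.getMemoryUsage(&free_mem);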

  /** Returns the current value of the total Flop count executed by the executor. **/
  double getTotalFlopCount() const;

  /** Opens a new scope represented by a new execution graph (DAG). **/
  void openScope(const std::string & scope_name);

  /** Pauses the current scope by completing all outstanding tensor operations
      and pausing any further progress of the current execution graph until it is resumed. **/
  void pauseScope();

  /** Resumes the execution of a previously paused scope (execution graph). **/
  void resumeScope(const std::string & scope_name);

  /** Closes the current scope, fully completing all tensor operations
      in the current execution graph. **/
  void closeScope();

  /** Returns TRUE if the current scope is set. **/
  inline bool currentScopeIsSet() const {return scope_set_.load();}

  /** Submits a tensor operation into the current execution graph and returns its integer id. **/
  VertexIdType submit(std::shared_ptr<TensorOperation> op); //in: tensor operation

  /** Tests for completion of a given tensor operation.
      If wait = TRUE, it will block until completion. **/
  bool sync(TensorOperation & op,
            bool wait = true);

  /** Tests for completion of all outstanding update operations on a given tensor.
      If wait = TRUE, it will block until completion. **/
  bool sync(const Tensor & tensor,
            bool wait = true);

  /** Tests for completion of all previously submitted tensor operations.
      If wait = TRUE, it will block until completion. **/
  bool sync(bool wait = true);
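
  // Illustrative use (hypothetical `runtime`, plus `op` and `tensor` as shared pointers to a TensorOperation and a Tensor):
  //   bool op_done  = runtime.sync(*op);      //a specific tensor operation
  //   bool tns_done = runtime.sync(*tensor);  //all outstanding update operations on one tensor
  //   bool all_done = runtime.sync();         //all previously submitted operations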

#ifdef CUQUANTUM
  /** Submits an entire tensor network for processing as a whole.
      The returned execution handle can be used for checking the status
      of the tensor network execution. Zero on return means unsuccessful submission. **/
  TensorOpExecHandle submit(std::shared_ptr<numerics::TensorNetwork> network, //in: tensor network
                            const MPICommProxy & communicator, //MPI communicator proxy
                            unsigned int num_processes, //in: number of executing processes
                            unsigned int process_rank); //in: rank of the current executing process

  /** Tests for completion of processing of a whole tensor network.
      The execution handle obtained during tensor network
      submission must be valid (positive). **/
  bool syncNetwork(const TensorOpExecHandle exec_handle,
                   bool wait = true);
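
  // Illustrative use (hypothetical `runtime`, `network`, `comm`, `num_procs`, `my_rank`; CUQUANTUM build only):
  //   TensorOpExecHandle handle = runtime.submit(network,comm,num_procs,my_rank); //whole-network submission
  //   bool net_done = (handle != 0) && runtime.syncNetwork(handle,true);          //zero handle means failed submission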
#endif

  /** Returns a locally stored tensor slice (talsh::Tensor) providing access to tensor elements.
      This slice will be extracted from the exatn::numerics::Tensor implementation as a copy.
      The returned future becomes ready once the execution thread has retrieved the slice copy. **/
  std::future<std::shared_ptr<talsh::Tensor>> getLocalTensor(std::shared_ptr<Tensor> tensor, //in: exatn::numerics::Tensor to get slice of (by copy)
                            const std::vector<std::pair<DimOffset,DimExtent>> & slice_spec); //in: tensor slice specification
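
  // Illustrative use (hypothetical `runtime` and `my_tensor`, a std::shared_ptr<Tensor>):
  //   auto slice_future = runtime.getLocalTensor(my_tensor,{{0,2},{0,2}}); //request a slice copy: {offset,extent} per dimension
  //   std::shared_ptr<talsh::Tensor> slice = slice_future.get();           //blocks until the execution thread delivers the copy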

private:
  /** Tensor data request **/
  class TensorDataReq{
  public:
   std::promise<std::shared_ptr<talsh::Tensor>> slice_promise_;
   std::vector<std::pair<DimOffset,DimExtent>> slice_specs_;
   std::shared_ptr<Tensor> tensor_;

   TensorDataReq(std::promise<std::shared_ptr<talsh::Tensor>> && slice_promise,
                 const std::vector<std::pair<DimOffset,DimExtent>> & slice_specs,
                 std::shared_ptr<Tensor> tensor):
    slice_promise_(std::move(slice_promise)), slice_specs_(slice_specs), tensor_(tensor){}

   TensorDataReq(const TensorDataReq & req) = delete;
   TensorDataReq & operator=(const TensorDataReq & req) = delete;
   TensorDataReq(TensorDataReq && req) noexcept = default;
   TensorDataReq & operator=(TensorDataReq && req) noexcept = default;
   ~TensorDataReq() = default;
  };

  /** Launches the execution thread which will be executing DAGs on the fly. **/
  void launchExecutionThread();
  /** The execution thread lives here. **/
  void executionThreadWorkflow();
  /** Processes all outstanding tensor data requests (by execution thread). **/
  void processTensorDataRequests();

  inline void lockDataReqQ(){data_req_mtx_.lock();}
  inline void unlockDataReqQ(){data_req_mtx_.unlock();}

  /** Runtime configuration parameters **/
  ParamConf parameters_;
  /** Tensor graph (DAG) executor name **/
  std::string graph_executor_name_;
  /** Tensor graph (DAG) node executor name **/
  std::string node_executor_name_;
  /** Total number of parallel processes in the dedicated MPI communicator **/
  int num_processes_;
  /** Rank of the current parallel process in the dedicated MPI communicator **/
  int process_rank_;
  /** Rank of the current parallel process in MPI_COMM_WORLD **/
  int global_process_rank_;
  /** Current tensor graph (DAG) executor **/
  std::shared_ptr<TensorGraphExecutor> graph_executor_;
  /** Active execution graphs (DAGs) **/
  std::map<std::string, std::shared_ptr<TensorGraph>> dags_;
  /** Name of the current scope (current DAG name) **/
  std::string current_scope_;
  /** Current DAG **/
  std::shared_ptr<TensorGraph> current_dag_; //pointer to the current DAG
  /** Tensor data request queue **/
  std::list<TensorDataReq> data_req_queue_;
  /** List of tensor networks submitted for processing as a whole **/
  TensorNetworkQueue tensor_network_queue_;
  /** Logging level (0:none) **/
  int logging_;
  /** Current executing status (whether or not the execution thread is active) **/
  std::atomic<bool> executing_; //TRUE while the execution thread is executing the current DAG
  /** Current scope status **/
  std::atomic<bool> scope_set_; //TRUE if the current scope is set
  /** End of life flag **/
  std::atomic<bool> alive_; //TRUE while the main thread is accepting new operations from Client
  /** Execution thread **/
  std::thread exec_thread_;
  /** Data request mutex **/
  std::mutex data_req_mtx_;
};

} // namespace runtime
} // namespace exatn

#endif //EXATN_RUNTIME_TENSOR_RUNTIME_HPP_