/** ExaTN:: Tensor Runtime: Task-based execution layer for tensor operations
REVISION: 2021/12/22

Copyright (C) 2018-2021 Dmitry Lyakh, Tiffany Mintz, Alex McCaskey
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)

Rationale:
 (a) The execution space consists of one or more DAGs in which nodes
     represent tensor operations (tasks) and directed edges represent
     dependencies between the corresponding nodes (tensor operations).
     Each DAG is associated with a uniquely named TAProL scope such that
     all tensor operations submitted by the Client to the ExaTN numerical
     server are forwarded into the DAG associated with the TAProL scope
     in which the Client currently resides.
 (b) The DAG lifecycle:
     openScope(name): Opens a new TAProL scope and creates its associated empty DAG.
                      The .submit method can then be used to append new tensor
                      operations or whole tensor networks to the current DAG.
                      The actual execution of the submitted tensor operations
                      is asynchronous and may start any time after submission.
     pauseScope(): Completes the actual execution of all started tensor operations in the
                   current DAG and defers the execution of the rest of the DAG until later.
     resumeScope(name): Pauses the execution of the currently active DAG (if any) and
                        resumes the execution of a previously paused DAG, making it current.
     closeScope(): Completes all tensor operations in the current DAG and destroys it.
 (c) submit(TensorOperation): Submits a tensor operation for (generally deferred) execution.
     sync(TensorOperation): Tests for completion of a specific tensor operation.
     sync(tensor): Tests for completion of all submitted update operations on a given tensor.
 (d) Upon creation, the TensorRuntime object spawns an execution thread which will execute tensor
     operations in the course of DAG traversal. The execution thread will be joined upon TensorRuntime
     destruction. After spawning the execution thread, the main thread returns control to the client,
     which will then be able to submit new operations into the current DAG. The submitted operations
     will be autonomously executed by the execution thread. The DAG execution policy is specified by
     a polymorphic TensorGraphExecutor provided during the construction of the TensorRuntime.
     Correspondingly, the TensorGraphExecutor contains a polymorphic TensorNodeExecutor responsible
     for the actual execution of submitted tensor operations via an associated computational backend.
     The concrete TensorNodeExecutor is specified during the construction of the TensorRuntime object.
 (e) DEVELOPERS ONLY: The TensorGraph object (DAG) provides lock/unlock methods for concurrent updates
     of the DAG structure (by the Client thread) and its execution state (by the Execution thread).
     Additionally, each node of the TensorGraph (TensorOpNode object) provides a finer-grained
     locking mechanism (lock/unlock methods) for exclusive access to individual DAG nodes.
**/
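
/** Illustrative usage sketch (hypothetical client code, not part of this header;
    assumes a constructed TensorRuntime `runtime` and a std::shared_ptr<TensorOperation>
    `op` referring to a concrete tensor operation obtained elsewhere):

      runtime.openScope("my_scope");        // create an empty DAG and make it current
      auto node_id = runtime.submit(op);    // append the operation to the current DAG (execution is deferred/asynchronous)
      bool done = runtime.sync(*op, true);  // block until this particular operation has completed
      runtime.closeScope();                 // complete all remaining operations and destroy the DAG
**/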

#ifndef EXATN_RUNTIME_TENSOR_RUNTIME_HPP_
#define EXATN_RUNTIME_TENSOR_RUNTIME_HPP_

#include "tensor_graph.hpp"
#include "tensor_network_queue.hpp"
#include "tensor_graph_executor.hpp"
#include "tensor_operation.hpp"
#include "tensor_method.hpp"

#include "param_conf.hpp"
#include "mpi_proxy.hpp"

#include <map>
#include <list>
#include <string>
#include <vector>
#include <memory>
#include <thread>
#include <atomic>
#include <future>
#include <mutex>

namespace exatn {
namespace runtime {

class TensorRuntime final {

public:

  static constexpr std::size_t MAX_RUNTIME_DAG_SIZE = 8192; //max allowed DAG size during runtime

#ifdef MPI_ENABLED
  TensorRuntime(const MPICommProxy & communicator,                               //MPI communicator proxy
                const ParamConf & parameters,                                    //runtime configuration parameters
                const std::string & graph_executor_name = "eager-dag-executor",  //DAG executor kind
                const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
#else
  TensorRuntime(const ParamConf & parameters,                                    //runtime configuration parameters
                const std::string & graph_executor_name = "eager-dag-executor",  //DAG executor kind
                const std::string & node_executor_name = "talsh-node-executor"); //DAG node executor kind
#endif

  TensorRuntime(const TensorRuntime &) = delete;
  TensorRuntime & operator=(const TensorRuntime &) = delete;
  TensorRuntime(TensorRuntime &&) noexcept = delete;
  TensorRuntime & operator=(TensorRuntime &&) noexcept = delete;
  ~TensorRuntime();

  /** Resets the logging level (0:none) [MAIN THREAD]. **/
  void resetLoggingLevel(int level = 0);

  /** Enforces serialized (synchronized) execution of the DAG. **/
  void resetSerialization(bool serialize,
                          bool validation_trace = false);

  /** Activates/deactivates dry run (no actual computations). **/
  void activateDryRun(bool dry_run);

  /** Activates mixed-precision fast math on all devices (if available). **/
  void activateFastMath();

  /** Returns the Host memory buffer size in bytes provided by the executor. **/
  std::size_t getMemoryBufferSize() const;

  /** Returns the current value of the total Flop count executed by the executor. **/
  double getTotalFlopCount() const;

  /** Opens a new scope represented by a new execution graph (DAG). **/
  void openScope(const std::string & scope_name);

  /** Pauses the current scope by completing all outstanding tensor operations
      and pausing further progress of the current execution graph until resumed.
      Returns TRUE upon successful pausing, FALSE otherwise. **/
  void pauseScope();

  /** Resumes the execution of a previously paused scope (execution graph). **/
  void resumeScope(const std::string & scope_name);

  /** Closes the current scope, fully completing all tensor operations
      in the current execution graph. **/
  void closeScope();

  /** Returns TRUE if the current scope is set. **/
  inline bool currentScopeIsSet() const {return scope_set_.load();}

  /** Submits a tensor operation into the current execution graph and returns its integer id. **/
  VertexIdType submit(std::shared_ptr<TensorOperation> op); //in: tensor operation

#ifdef CUQUANTUM
  /** Submits an entire tensor network for processing as a whole. **/
  bool submit(std::shared_ptr<numerics::TensorNetwork> network, //in: tensor network
              TensorOpExecHandle * exec_handle = nullptr);      //out: assigned execution handle
#endif
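
  /** Sketch (CUQUANTUM builds only): submitting a whole tensor network, assuming a
      previously constructed std::shared_ptr<numerics::TensorNetwork> `network` and a
      TensorRuntime instance `runtime` (hypothetical client code, not a guaranteed recipe):

        TensorOpExecHandle handle;
        if(runtime.submit(network, &handle)){
          //the network was queued for processing as a whole; `handle` identifies its execution
        }
  **/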

  /** Tests for completion of a given tensor operation.
      If wait = TRUE, it will block until completion. **/
  bool sync(TensorOperation & op,
            bool wait = true);

  /** Tests for completion of all outstanding update operations on a given tensor.
      If wait = TRUE, it will block until completion. **/
  bool sync(const Tensor & tensor,
            bool wait = true);

  /** Tests for completion of all previously submitted tensor operations.
      If wait = TRUE, it will block until completion. **/
  bool sync(bool wait = true);
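
  /** Sketch of the intended polling vs. blocking use of the sync overloads above
      (hypothetical client code with a TensorRuntime instance `runtime`):

        if(!runtime.sync(false)){ //non-blocking test of overall DAG completion
          //... do useful work while tensor operations are still in flight ...
        }
        runtime.sync(true);       //block until every submitted operation has completed
  **/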

  /** Returns a locally stored tensor slice (talsh::Tensor) providing access to tensor elements.
      This slice will be extracted from the exatn::numerics::Tensor implementation as a copy.
      The returned future becomes ready once the execution thread has retrieved the slice copy. **/
  std::future<std::shared_ptr<talsh::Tensor>> getLocalTensor(std::shared_ptr<Tensor> tensor, //in: exatn::numerics::Tensor to get slice of (by copy)
                            const std::vector<std::pair<DimOffset,DimExtent>> & slice_spec); //in: tensor slice specification
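
  /** Sketch of retrieving a local tensor slice (hypothetical client code; `tensor` is a
      std::shared_ptr<Tensor> and the single-dimension slice specification is illustrative):

        auto future_slice = runtime.getLocalTensor(tensor, {{0,extent}}); //offset 0, extent `extent`
        std::shared_ptr<talsh::Tensor> slice = future_slice.get(); //blocks until the execution thread delivers the copy
  **/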

private:
  /** Tensor data request **/
  class TensorDataReq{
  public:
   std::promise<std::shared_ptr<talsh::Tensor>> slice_promise_;
   std::vector<std::pair<DimOffset,DimExtent>> slice_specs_;
   std::shared_ptr<Tensor> tensor_;

   TensorDataReq(std::promise<std::shared_ptr<talsh::Tensor>> && slice_promise,
                 const std::vector<std::pair<DimOffset,DimExtent>> & slice_specs,
                 std::shared_ptr<Tensor> tensor):
    slice_promise_(std::move(slice_promise)), slice_specs_(slice_specs), tensor_(tensor){}

   TensorDataReq(const TensorDataReq & req) = delete;
   TensorDataReq & operator=(const TensorDataReq & req) = delete;
   TensorDataReq(TensorDataReq && req) noexcept = default;
   TensorDataReq & operator=(TensorDataReq && req) noexcept = default;
   ~TensorDataReq() = default;
  };
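
  /** Minimal sketch (assumption, for orientation only) of how getLocalTensor is expected to
      pair a promise with the future it returns; the actual wiring lives in the implementation file:

        std::promise<std::shared_ptr<talsh::Tensor>> slice_promise;
        auto slice_future = slice_promise.get_future();   //returned to the client by getLocalTensor
        data_req_queue_.emplace_back(std::move(slice_promise),
                                     slice_spec, tensor); //consumed later by processTensorDataRequests()
  **/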

  /** Launches the execution thread which will be executing DAGs on the fly. **/
  void launchExecutionThread();
  /** The execution thread lives here. **/
  void executionThreadWorkflow();

  /** Processes all outstanding tensor data requests (by execution thread). **/
  void processTensorDataRequests();

  /** Locks/unlocks the tensor data request queue (guards data_req_queue_). **/
  inline void lockDataReqQ(){data_req_mtx_.lock();}
  inline void unlockDataReqQ(){data_req_mtx_.unlock();}

  /** Runtime configuration parameters **/
  ParamConf parameters_;
  /** Tensor graph (DAG) executor name **/
  std::string graph_executor_name_;
  /** Tensor graph (DAG) node executor name **/
  std::string node_executor_name_;
  /** Total number of parallel processes in the dedicated MPI communicator **/
  int num_processes_;
  /** Rank of the current parallel process in the dedicated MPI communicator **/
  int process_rank_;
  /** Rank of the current parallel process in MPI_COMM_WORLD **/
  int global_process_rank_;
  /** Current tensor graph (DAG) executor **/
  std::shared_ptr<TensorGraphExecutor> graph_executor_;
  /** Active execution graphs (DAGs) **/
  std::map<std::string, std::shared_ptr<TensorGraph>> dags_;
  /** Name of the current scope (current DAG name) **/
  std::string current_scope_;
  /** Current DAG **/
  std::shared_ptr<TensorGraph> current_dag_; //pointer to the current DAG
  /** Tensor data request queue **/
  std::list<TensorDataReq> data_req_queue_;
  /** List of tensor networks submitted for processing as a whole **/
  TensorNetworkQueue tensor_network_queue_;
  /** Logging level (0:none) **/
  int logging_;
  /** Current executing status (whether or not the execution thread is active) **/
  std::atomic<bool> executing_; //TRUE while the execution thread is executing the current DAG
  /** Current scope status **/
  std::atomic<bool> scope_set_; //TRUE if the current scope is set
  /** End of life flag **/
  std::atomic<bool> alive_; //TRUE while the main thread is accepting new operations from Client
  /** Execution thread **/
  std::thread exec_thread_;
  /** Data request mutex **/
  std::mutex data_req_mtx_;
};

} // namespace runtime
} // namespace exatn

#endif //EXATN_RUNTIME_TENSOR_RUNTIME_HPP_