/** ExaTN:: Tensor Runtime: Tensor graph executor: Lazy
REVISION: 2021/12/22

Copyright (C) 2018-2021 Dmitry Lyakh
Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle)
**/

#include "graph_executor_lazy.hpp"

#include "talshxx.hpp"

#ifdef CUQUANTUM
#include "cuquantum_executor.hpp"
#endif

#include <cassert>
#include <iomanip>
#include <iostream>

#include "errors.hpp"

//#define DEBUG

namespace exatn {
namespace runtime {

/** Resets the tensor node executor used by this graph executor.
    The actual reset is delegated to TensorGraphExecutor::resetNodeExecutor
    with the same arguments. When compiled with CUQUANTUM, a new
    CuQuantumExecutor is additionally created and stored in cuquantum_executor_.
    @param node_executor Tensor node executor forwarded to the base reset.
    @param parameters Configuration parameters forwarded to the base reset.
    @param process_rank Process rank forwarded to the base reset.
    @param global_process_rank Global process rank forwarded to the base reset.
**/
void LazyGraphExecutor::resetNodeExecutor(std::shared_ptr<TensorNodeExecutor> node_executor,
                                          const ParamConf & parameters,
                                          unsigned int process_rank,
                                          unsigned int global_process_rank)
{
 //Common part of the reset is handled by the generic graph executor:
 TensorGraphExecutor::resetNodeExecutor(node_executor,
                                        parameters,
                                        process_rank,
                                        global_process_rank);
#ifdef CUQUANTUM
 //Replace any previously held cuQuantum executor with a fresh one:
 cuquantum_executor_ = std::make_shared<CuQuantumExecutor>();
#endif
}


/** Executes the tensor operations stored in the DAG.
    The main loop repeats four steps until the DAG front node reaches the
    end of the DAG and no further idle node is found:
     1. Issue every node currently registered as dependency-free;
     2. Inspect the current node: register it as dependency-free if all its
        dependencies are resolved, otherwise try to prefetch its operands
        (only within getPrefetchDepth() nodes from the front);
     3. Test all asynchronously executing nodes for completion;
     4. Move to the next idle node (within getPipelineDepth() nodes from the front).
    Errors other than TRY_LATER are reported to std::cout and trigger
    assert(false); in builds with NDEBUG defined the assertion is compiled out
    and execution continues.
    @param dag Tensor operation DAG to execute (its size may grow during execution).
**/
void LazyGraphExecutor::execute(TensorGraph & dag) {

  //Execution progress through the DAG:
  struct Progress {
    VertexIdType num_nodes; //total number of nodes in the DAG (may grow)
    VertexIdType front;     //the first unexecuted node in the DAG
    VertexIdType current;   //the current node in the DAG
  };

  Progress progress{dag.getNumNodes(),dag.getFrontNode(),0};
  progress.current = progress.front;

  //Advances progress.current to the next idle DAG node. The search wraps back
  //to the front node when it runs past the end of the DAG or past the pipeline
  //depth window. Returns TRUE if a node different from the previous one was
  //found inside the DAG, FALSE otherwise.
  auto find_next_idle_node = [this,&dag,&progress] () {
    const auto prev_node = progress.current;
    progress.front = dag.getFrontNode();
    progress.num_nodes = dag.getNumNodes();
    if(progress.front < progress.num_nodes){
      ++progress.current;
      if(progress.current >= progress.num_nodes){
        progress.current = progress.front;
        if(progress.current == prev_node) ++progress.current;
      }else{
        if(progress.current >= (progress.front + this->getPipelineDepth())){
          progress.current = progress.front;
          if(progress.current == prev_node) ++progress.current;
        }
      }
      while(progress.current < progress.num_nodes){
        if(dag.nodeIdle(progress.current)) break;
        ++progress.current;
      }
    }else{ //all DAG nodes have been executed
      progress.current = progress.front; //end-of-DAG
    }
    progress.num_nodes = dag.getNumNodes(); //update DAG size again
    return (progress.current < progress.num_nodes && progress.current != prev_node);
  };

  //Inspects the current DAG node: if it is idle and all its dependencies are
  //resolved, the node is registered as dependency-free; otherwise, if it lies
  //within the prefetch depth from the front node, a prefetch is attempted.
  //Returns TRUE if the current node is ready for execution.
  auto inspect_node_dependencies = [this,&dag,&progress] () {
    bool ready_for_execution = false;
    if(progress.current < progress.num_nodes){
      auto & dag_node = dag.getNodeProperties(progress.current);
      ready_for_execution = dag_node.isIdle();
      if(ready_for_execution){ //node is idle
        ready_for_execution = ready_for_execution && dag.nodeDependenciesResolved(progress.current);
        if(ready_for_execution){ //all node dependencies resolved (or none)
          auto registered = dag.registerDependencyFreeNode(progress.current);
          if(registered && logging_.load() > 1) logfile_ << "DAG node detected with all dependencies resolved: " << progress.current << std::endl;
        }else{ //node still has unresolved dependencies, try prefetching
          if(progress.current < (progress.front + this->getPrefetchDepth())){
            auto prefetching = this->node_executor_->prefetch(*(dag_node.getOperation()));
            if(logging_.load() != 0 && prefetching){
              logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
                       << "](LazyGraphExecutor)[EXEC_THREAD]: Initiated prefetch for tensor operation "
                       << progress.current << std::endl;
#ifdef DEBUG
              logfile_.flush();
#endif
            }
          }
        }
      }
    }
    return ready_for_execution;
  };

  //Tries to advance the DAG front node after the given node has completed
  //successfully, skipping over all subsequent nodes already executed.
  //Shared by the immediate and the deferred completion paths.
  auto advance_front_node = [this,&dag,&progress] (VertexIdType completed_node) {
    progress.num_nodes = dag.getNumNodes();
    auto progressed = dag.progressFrontNode(completed_node);
    if(progressed){
      progress.front = dag.getFrontNode();
      while(progress.front < progress.num_nodes){
        if(!(dag.nodeExecuted(progress.front))) break;
        dag.progressFrontNode(progress.front);
        progress.front = dag.getFrontNode();
      }
    }
    if(progressed && logging_.load() > 1) logfile_ << "DAG front node progressed to "
      << progress.front << " out of total of " << progress.num_nodes << std::endl;
  };

  //Extracts one dependency-free node from the DAG and submits it for execution.
  //Returns TRUE if a node was issued (completed immediately or deferred),
  //FALSE if there was nothing to issue or the submission did not succeed.
  auto issue_ready_node = [this,&dag,&advance_front_node] () {
    if(logging_.load() > 2){
      logfile_ << "DAG current list of dependency free nodes:";
      auto free_nodes = dag.getDependencyFreeNodes();
      for(const auto & node: free_nodes) logfile_ << " " << node;
      logfile_ << std::endl;
    }
    VertexIdType node;
    bool issued = dag.extractDependencyFreeNode(&node);
    if(issued){
      auto & dag_node = dag.getNodeProperties(node);
      auto op = dag_node.getOperation();
      if(logging_.load() != 0){
        logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
                 << "](LazyGraphExecutor)[EXEC_THREAD]: Submitting tensor operation "
                 << node << ": Opcode = " << static_cast<int>(op->getOpcode());
        if(logging_.load() > 1){
          logfile_ << ": Details:" << std::endl;
          op->printItFile(logfile_);
        }
#ifdef DEBUG
        logfile_.flush();
#endif
      }
      dag.setNodeExecuting(node);
      op->recordStartTime();
      //Value-initialized so that the failure path below (discard + error message)
      //never reads an indeterminate handle if accept() did not set it:
      TensorOpExecHandle exec_handle{};
      auto error_code = op->accept(*(this->node_executor_),&exec_handle);
      if(logging_.load() != 0) logfile_ << ": Status = " << error_code;
      if(error_code == 0){ //tensor operation submitted for execution successfully
        if(logging_.load() != 0) logfile_ << ": Syncing ... ";
        auto synced = this->node_executor_->sync(exec_handle,&error_code,serialize_.load());
        if(synced){ //tensor operation has completed immediately
          op->recordFinishTime();
          dag.setNodeExecuted(node,error_code);
          if(error_code == 0){
            if(logging_.load() != 0){
              logfile_ << "Success [" << std::fixed << std::setprecision(6)
                       << exatn::Timer::timeInSecHR(getTimeStampStart()) << "]" << std::endl;
              logfile_ << "[" << exatn::Timer::timeInSecHR(getTimeStampStart()) << "]"
                       << " Total Flop count = " << getTotalFlopCount() << std::endl;
#ifdef DEBUG
              logfile_.flush();
#endif
            }
            op->dissociateTensorOperands();
            advance_front_node(node);
          }else{
            if(logging_.load() != 0){
              logfile_ << "Failed: Error " << error_code << " [" << std::fixed << std::setprecision(6)
                       << exatn::Timer::timeInSecHR(getTimeStampStart()) << "]" << std::endl;
              logfile_.flush();
            }
            std::cout << "#ERROR(exatn::TensorRuntime::GraphExecutorLazy): Immediate completion error for tensor operation "
             << node << " with execution handle " << exec_handle << ": Error " << error_code << std::endl << std::flush;
            assert(false); //`Do I need to handle this case gracefully?
          }
        }else{ //tensor operation is still executing asynchronously
          dag.registerExecutingNode(node,exec_handle);
          if(logging_.load() != 0) logfile_ << "Deferred" << std::endl;
        }
      }else{ //tensor operation not submitted due to either temporary resource shortage or fatal error
        //The result of discard() is intentionally ignored: the node is returned
        //to the dependency-free list regardless.
        static_cast<void>(this->node_executor_->discard(exec_handle));
        dag.setNodeIdle(node);
        auto registered = dag.registerDependencyFreeNode(node); assert(registered);
        static_cast<void>(registered); //only checked by assert(): unused when NDEBUG is defined
        issued = false;
        if(error_code == TRY_LATER){ //temporary shortage of resources
          if(logging_.load() != 0) logfile_ << ": Postponed" << std::endl;
        }else{ //fatal error
          if(logging_.load() != 0) logfile_.flush();
          std::cout << "#ERROR(exatn::TensorRuntime::GraphExecutorLazy): Failed to submit tensor operation "
           << node << " with execution handle " << exec_handle << ": Error " << error_code << std::endl << std::flush;
          assert(false); //`Do I need to handle this case gracefully?
        }
      }
    }
    return issued;
  };

  //Tests every DAG node registered as executing for completion. Completed
  //nodes are removed from the executing list, marked as executed, and the
  //DAG front node is advanced when possible.
  auto test_nodes_for_completion = [this,&dag,&advance_front_node] () {
    auto executing_nodes = dag.executingNodesBegin();
    while(executing_nodes != dag.executingNodesEnd()){
      int error_code = 0; //defined value even if sync() does not write it
      auto exec_handle = executing_nodes->second;
      auto synced = this->node_executor_->sync(exec_handle,&error_code,serialize_.load());
      if(synced){ //tensor operation has completed
        VertexIdType node;
        executing_nodes = dag.extractExecutingNode(executing_nodes,&node); //returns the iterator to continue with
        auto & dag_node = dag.getNodeProperties(node);
        auto op = dag_node.getOperation();
        op->recordFinishTime();
        dag.setNodeExecuted(node,error_code);
        if(error_code == 0){
          if(logging_.load() != 0){
            logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
                     << "](LazyGraphExecutor)[EXEC_THREAD]: Synced tensor operation "
                     << node << ": Opcode = " << static_cast<int>(op->getOpcode()) << std::endl;
            logfile_ << "[" << exatn::Timer::timeInSecHR(getTimeStampStart()) << "]"
                       << " Total Flop count = " << getTotalFlopCount() << std::endl;
#ifdef DEBUG
            logfile_.flush();
#endif
          }
          op->dissociateTensorOperands();
          advance_front_node(node);
        }else{
          if(logging_.load() != 0){
            logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
                     << "](LazyGraphExecutor)[EXEC_THREAD]: Failed to sync tensor operation "
                     << node << ": Opcode = " << static_cast<int>(op->getOpcode()) << std::endl;
            logfile_.flush();
          }
          std::cout << "#ERROR(exatn::TensorRuntime::GraphExecutorLazy): Deferred completion error for tensor operation "
           << node << " with execution handle " << exec_handle << ": Error " << error_code << std::endl << std::flush;
          assert(false); //`Do I need to handle this case gracefully?
        }
      }else{ //tensor operation has not completed yet
        ++executing_nodes;
      }
    }
    return;
  };

  if(logging_.load() != 0){
    logfile_ << "DAG entry list of dependency free nodes:";
    auto free_nodes = dag.getDependencyFreeNodes();
    for(const auto & node: free_nodes) logfile_ << " " << node;
    logfile_ << std::endl << std::flush;
  }

  bool not_done = (progress.front < progress.num_nodes);
  while(not_done){
    //Try to issue all idle DAG nodes that are ready for execution:
    while(issue_ready_node());
    //Inspect whether the current node can be issued (it gets registered as
    //dependency-free inside, so the returned flag is not needed here):
    inspect_node_dependencies();
    //Test the currently executing DAG nodes for completion:
    test_nodes_for_completion();
    //Find the next idle DAG node (always called: it also refreshes progress):
    not_done = find_next_idle_node() || (progress.front < progress.num_nodes);
  }
  return;
}


/** Executes the tensor networks stored in the queue.
    Currently a no-op: the CUQUANTUM code path is only a placeholder and
    the queue is left untouched.
    @param tensor_network_queue Queue of tensor networks (not used yet).
**/
void LazyGraphExecutor::execute(TensorNetworkQueue & tensor_network_queue) {
#ifdef CUQUANTUM
  //`Implement
#endif
  return;
}

} //namespace runtime
} //namespace exatn