/** ExaTN::Numerics: Numerical server
REVISION: 2022/01/17

Copyright (C) 2018-2022 Dmitry I. Lyakh (Liakh)
Copyright (C) 2018-2022 Oak Ridge National Laboratory (UT-Battelle) **/

#include "num_server.hpp"
#include "tensor_range.hpp"
#include "timers.hpp"

#include <unordered_set>
#include <complex>
#include <vector>
#include <stack>
#include <list>
#include <map>
#include <future>
#include <algorithm>

#ifdef MPI_ENABLED
#include "mpi.h"
#endif

#include <cstddef>

namespace exatn{

/** Numerical server (singleton) **/
std::shared_ptr<NumServer> numericalServer {nullptr}; //initialized by exatn::initialize()

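/** Returns the rank of the process owning a given subtensor of a composite tensor.
    When num_subtensors <= num_processes, each consecutive bin of num_subtensors
    processes holds a replica of all subtensors and the owner is the corresponding
    process within the caller's bin; otherwise num_subtensors must be an integer
    multiple of num_processes and each process owns a contiguous block of subtensors. **/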
unsigned int subtensor_owner_id(unsigned int process_rank,
                                unsigned int num_processes,
                                unsigned long long subtensor_id,
                                unsigned long long num_subtensors)
{
 unsigned int owner_id;
 if(num_subtensors <= num_processes){
  auto bin = static_cast<unsigned long long>(process_rank) / num_subtensors;
  owner_id = bin * num_subtensors + subtensor_id; assert(owner_id < num_processes);
 }else{
  assert(num_subtensors % num_processes == 0);
  unsigned long long num_subtensors_per_process = num_subtensors / num_processes;
  owner_id = subtensor_id / num_subtensors_per_process;
 }
 return owner_id;
}


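/** Returns the inclusive range [first,last] of subtensor ids stored on the given process,
    complementing subtensor_owner_id: either a single subtensor id (when subtensors are
    replicated across process bins) or a contiguous power-of-two block of subtensor ids
    (when each process owns num_subtensors/num_processes subtensors). **/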
std::pair<unsigned long long, unsigned long long> owned_subtensors(unsigned int process_rank,
                                                                   unsigned int num_processes,
                                                                   unsigned long long num_subtensors)
{
 std::pair<unsigned long long, unsigned long long> subtensor_range;
 if(num_subtensors <= num_processes){
  unsigned long long subtensor_id = static_cast<unsigned long long>(process_rank) % num_subtensors;
  subtensor_range = std::make_pair(subtensor_id,subtensor_id);
 }else{
  assert(num_subtensors % num_processes == 0);
  unsigned long long num_subtensors_per_process = num_subtensors / num_processes;
  auto nspp = num_subtensors_per_process;
  unsigned long long num_minor_bits = 0;
  while(nspp >>= 1) ++num_minor_bits; assert(num_minor_bits > 0);
  unsigned long long subtensor_begin = static_cast<unsigned long long>(process_rank) << num_minor_bits;
  unsigned long long subtensor_end = subtensor_begin | ((1ULL << num_minor_bits) - 1);
  subtensor_range = std::make_pair(subtensor_begin,subtensor_end);
 }
 return subtensor_range;
}


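/** Returns the number of processes within the process group holding a replica of each
    subtensor of the given tensor (1 for non-composite tensors or when the number of
    subtensors is not smaller than the group size). **/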
unsigned int replication_level(const ProcessGroup & process_group,
                               std::shared_ptr<Tensor> & tensor)
{
 unsigned int repl_level = 1;
 unsigned long long num_procs = process_group.getSize();
 unsigned long long num_subtensors = 1;
 if(tensor->isComposite()) num_subtensors = castTensorComposite(tensor)->getNumSubtensors();
 if(num_subtensors < num_procs){
  assert(num_procs % num_subtensors == 0);
  repl_level = num_procs / num_subtensors;
 }
 return repl_level;
}


#ifdef MPI_ENABLED
NumServer::NumServer(const MPICommProxy & communicator,
                     const ParamConf & parameters,
                     const std::string & graph_executor_name,
                     const std::string & node_executor_name):
 contr_seq_optimizer_("metis"), contr_seq_caching_(false), logging_(0), comp_backend_("default"),
 intra_comm_(communicator), validation_tracing_(false)
{
 int mpi_error = MPI_Comm_size(*(communicator.get<MPI_Comm>()),&num_processes_); assert(mpi_error == MPI_SUCCESS);
 mpi_error = MPI_Comm_rank(*(communicator.get<MPI_Comm>()),&process_rank_); assert(mpi_error == MPI_SUCCESS);
 mpi_error = MPI_Comm_rank(MPI_COMM_WORLD,&global_process_rank_); assert(mpi_error == MPI_SUCCESS);
 process_world_ = std::make_shared<ProcessGroup>(intra_comm_,num_processes_);
 std::vector<unsigned int> myself = {static_cast<unsigned int>(process_rank_)};
 process_self_ = std::make_shared<ProcessGroup>(MPICommProxy(MPI_COMM_SELF),myself);
 int64_t provided_buf_size; //bytes
 if(parameters.getParameter("host_memory_buffer_size",&provided_buf_size)){
  process_world_->resetMemoryLimitPerProcess(static_cast<std::size_t>(provided_buf_size));
  process_self_->resetMemoryLimitPerProcess(static_cast<std::size_t>(provided_buf_size));
 }
 default_tensor_mapper_ = std::shared_ptr<TensorMapper>(
  new CompositeTensorMapper(intra_comm_,process_rank_,num_processes_,process_world_->getMemoryLimitPerProcess(),tensors_));
 initBytePacket(&byte_packet_);
 space_register_ = getSpaceRegister(); assert(space_register_);
 tensor_op_factory_ = TensorOpFactory::get();
 mpi_error = MPI_Barrier(*(communicator.get<MPI_Comm>())); assert(mpi_error == MPI_SUCCESS);
 time_start_ = exatn::Timer::timeInSecHR();
 tensor_rt_ = std::move(std::make_shared<runtime::TensorRuntime>(communicator,parameters,graph_executor_name,node_executor_name));
 scopes_.push(std::pair<std::string,ScopeId>{"GLOBAL",0}); //GLOBAL scope 0 is automatically open (top scope)
 tensor_rt_->openScope("GLOBAL");
}
#else
NumServer::NumServer(const ParamConf & parameters,
                     const std::string & graph_executor_name,
                     const std::string & node_executor_name):
 contr_seq_optimizer_("metis"), contr_seq_caching_(false), logging_(0), comp_backend_("default"),
 validation_tracing_(false)
{
 num_processes_ = 1; process_rank_ = 0; global_process_rank_ = 0;
 process_world_ = std::make_shared<ProcessGroup>(intra_comm_,num_processes_); //intra-communicator is empty here
 std::vector<unsigned int> myself = {static_cast<unsigned int>(process_rank_)};
 process_self_ = std::make_shared<ProcessGroup>(intra_comm_,myself); //intra-communicator is empty here
 int64_t provided_buf_size; //bytes
 if(parameters.getParameter("host_memory_buffer_size",&provided_buf_size)){
  process_world_->resetMemoryLimitPerProcess(static_cast<std::size_t>(provided_buf_size));
  process_self_->resetMemoryLimitPerProcess(static_cast<std::size_t>(provided_buf_size));
 }
 default_tensor_mapper_ = std::shared_ptr<TensorMapper>(
  new CompositeTensorMapper(process_rank_,num_processes_,process_world_->getMemoryLimitPerProcess(),tensors_));
 initBytePacket(&byte_packet_);
 space_register_ = getSpaceRegister(); assert(space_register_);
 tensor_op_factory_ = TensorOpFactory::get();
 time_start_ = exatn::Timer::timeInSecHR();
 tensor_rt_ = std::move(std::make_shared<runtime::TensorRuntime>(parameters,graph_executor_name,node_executor_name));
 scopes_.push(std::pair<std::string,ScopeId>{"GLOBAL",0}); //GLOBAL scope 0 is automatically open (top scope)
 tensor_rt_->openScope("GLOBAL");
}
#endif


NumServer::~NumServer()
{
 //Garbage collection:
 destroyOrphanedTensors();
 //Destroy composite tensors:
 auto iter = tensors_.begin();
 while(iter != tensors_.end()){
  if(iter->second->isComposite()){
   const auto tensor_name = iter->first;
   auto success = destroyTensorSync(tensor_name); assert(success);
   iter = tensors_.begin();
  }else{
   ++iter;
  }
 }
 //Destroy remaining simple tensors:
 iter = tensors_.begin();
 while(iter != tensors_.end()){
  const auto tensor_name = iter->first;
  auto success = destroyTensorSync(tensor_name); assert(success);
  iter = tensors_.begin();
 }
 //Close scope and clean:
 tensor_rt_->closeScope(); //contains sync() inside
 scopes_.pop();
 destroyBytePacket(&byte_packet_);
 resetClientLoggingLevel();
}


#ifdef MPI_ENABLED
void NumServer::reconfigureTensorRuntime(const MPICommProxy & communicator,
                                         const ParamConf & parameters,
                                         const std::string & dag_executor_name,
                                         const std::string & node_executor_name)
{
 while(!tensor_rt_);
 bool synced = tensor_rt_->sync(); assert(synced);
 tensor_rt_ = std::move(std::make_shared<runtime::TensorRuntime>(communicator,parameters,dag_executor_name,node_executor_name));
 return;
}
#else
void NumServer::reconfigureTensorRuntime(const ParamConf & parameters,
                                         const std::string & dag_executor_name,
                                         const std::string & node_executor_name)
{
 while(!tensor_rt_);
 bool synced = tensor_rt_->sync(); assert(synced);
 tensor_rt_ = std::move(std::make_shared<runtime::TensorRuntime>(parameters,dag_executor_name,node_executor_name));
 return;
}
#endif


void NumServer::switchComputationalBackend(const std::string & backend_name)
{
 //bool success = sync(); assert(success);
 if(logging_ > 0 && backend_name != comp_backend_){
  logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
           << "]: Switched computational backend to " << backend_name << std::endl << std::flush;
 }
 if(backend_name == "default"){
  comp_backend_ = backend_name;
#ifdef CUQUANTUM
 }else if(backend_name == "cuquantum"){
  comp_backend_ = backend_name;
#endif
 }else{
  std::cout << "#ERROR(exatn::NumServer): switchComputationalBackend: Unknown backend: "
            << backend_name << std::endl << std::flush;
  std::abort();
 }
 return;
}

void NumServer::resetContrSeqOptimizer(const std::string & optimizer_name, bool caching)
{
 contr_seq_optimizer_ = optimizer_name;
 contr_seq_caching_ = caching;
 return;
}

void NumServer::activateContrSeqCaching(bool persist)
{
 numerics::ContractionSeqOptimizer::activatePersistentCaching(persist);
 contr_seq_caching_ = true;
 return;
}

void NumServer::deactivateContrSeqCaching()
{
 numerics::ContractionSeqOptimizer::activatePersistentCaching(false);
 contr_seq_caching_ = false;
 return;
}

bool NumServer::queryContrSeqCaching() const
{
 return contr_seq_caching_;
}

void NumServer::resetClientLoggingLevel(int level){
 if(logging_ == 0){
  if(level != 0) logfile_.open("exatn_main_thread."+std::to_string(global_process_rank_)+".log", std::ios::out | std::ios::trunc);
 }else{
  if(level == 0) logfile_.close();
 }
 logging_ = level;
 return;
}

void NumServer::resetRuntimeLoggingLevel(int level)
{
 while(!tensor_rt_);
 bool synced = tensor_rt_->sync(); assert(synced);
 tensor_rt_->resetLoggingLevel(level);
 return;
}

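/** Enables/disables serialized DAG execution in the tensor runtime; optionally turns on
    validation tracing, which logs a 1-norm stamp for every output tensor operand of each
    executed tensor operation (debugging aid). **/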
void NumServer::resetExecutionSerialization(bool serialize, bool validation_trace)
{
 while(!tensor_rt_);
 bool synced = tensor_rt_->sync(); assert(synced);
 validation_tracing_ = serialize && validation_trace;
 tensor_rt_->resetSerialization(serialize,validation_tracing_);
 if(logging_ > 0){
  logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
           << "]: DAG execution serialization = " << serialize
           << ": Validation tracing = " << validation_tracing_
           << "; Tensor runtime synced" << std::endl << std::flush;
 }
 synced = tensor_rt_->sync(); assert(synced);
 return;
}

void NumServer::activateDryRun(bool dry_run)
{
 while(!tensor_rt_);
 bool synced = tensor_rt_->sync(); assert(synced);
 tensor_rt_->activateDryRun(dry_run);
 if(logging_ > 0){
  logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
           << "]: Dry run activation status = " << dry_run << std::endl << std::flush;
 }
 synced = tensor_rt_->sync(); assert(synced);
 return;
}

void NumServer::activateFastMath()
{
 while(!tensor_rt_);
 bool synced = tensor_rt_->sync(); assert(synced);
 tensor_rt_->activateFastMath();
 if(logging_ > 0){
  logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
           << "]: Fast math activated (if available); Tensor runtime synced" << std::endl << std::flush;
 }
 synced = tensor_rt_->sync(); assert(synced);
 return;
}

std::size_t NumServer::getMemoryBufferSize() const
{
 while(!tensor_rt_);
 return tensor_rt_->getMemoryBufferSize();
}

std::size_t NumServer::getMemoryUsage(std::size_t * free_mem) const
{
 while(!tensor_rt_);
 return tensor_rt_->getMemoryUsage(free_mem);
}

double NumServer::getTotalFlopCount() const
{
 while(!tensor_rt_);
 return tensor_rt_->getTotalFlopCount();
}

const ProcessGroup & NumServer::getDefaultProcessGroup() const
{
 return *process_world_;
}

const ProcessGroup & NumServer::getCurrentProcessGroup() const
{
 return *process_self_;
}

int NumServer::getProcessRank(const ProcessGroup & process_group) const
{
 unsigned int local_rank;
 auto rank_is_in = process_group.rankIsIn(getProcessRank(),&local_rank);
 if(!rank_is_in) return -1;
 assert(local_rank >= 0);
 return static_cast<int>(local_rank);
}

int NumServer::getProcessRank() const
{
 return process_rank_;
}

int NumServer::getNumProcesses(const ProcessGroup & process_group) const
{
 return static_cast<int>(process_group.getSize());
}

int NumServer::getNumProcesses() const
{
 return num_processes_;
}

std::shared_ptr<TensorMapper> NumServer::getTensorMapper(const ProcessGroup & process_group) const
{
 unsigned int local_rank;
 bool rank_is_in_group = process_group.rankIsIn(process_rank_,&local_rank);
 assert(rank_is_in_group);
#ifdef MPI_ENABLED
 return std::shared_ptr<TensorMapper>(new CompositeTensorMapper(process_group.getMPICommProxy(),
         local_rank,process_group.getSize(),process_group.getMemoryLimitPerProcess(),tensors_));
#else
 return std::shared_ptr<TensorMapper>(new CompositeTensorMapper(local_rank,process_group.getSize(),
                                      process_group.getMemoryLimitPerProcess(),tensors_));
#endif
}

std::shared_ptr<TensorMapper> NumServer::getTensorMapper() const
{
 return default_tensor_mapper_;
}

void NumServer::registerTensorMethod(const std::string & tag, std::shared_ptr<TensorMethod> method)
{
 auto res = ext_methods_.insert({tag,method});
 if(!(std::get<1>(res))) std::cout << "#ERROR(NumServer::registerTensorMethod): Method already exists: " << tag << std::endl;
 assert(std::get<1>(res));
 return;
}

std::shared_ptr<TensorMethod> NumServer::getTensorMethod(const std::string & tag)
{
 return ext_methods_[tag];
}

void NumServer::registerExternalData(const std::string & tag, std::shared_ptr<BytePacket> packet)
{
 auto res = ext_data_.insert({tag,packet});
 if(!(std::get<1>(res))) std::cout << "#ERROR(NumServer::registerExternalData): Data already exists: " << tag << std::endl;
 assert(std::get<1>(res));
 return;
}

std::shared_ptr<BytePacket> NumServer::getExternalData(const std::string & tag)
{
 return ext_data_[tag];
}


ScopeId NumServer::openScope(const std::string & scope_name)
{
 assert(scope_name.length() > 0);
 ScopeId new_scope_id = scopes_.size();
 scopes_.push(std::pair<std::string,ScopeId>{scope_name,new_scope_id});
 return new_scope_id;
}

ScopeId NumServer::closeScope()
{
 assert(!scopes_.empty());
 const auto & prev_scope = scopes_.top();
 ScopeId prev_scope_id = std::get<1>(prev_scope);
 scopes_.pop();
 return prev_scope_id;
}


SpaceId NumServer::createVectorSpace(const std::string & space_name, DimExtent space_dim,
                                     const VectorSpace ** space_ptr)
{
 assert(space_name.length() > 0);
 SpaceId space_id = space_register_->registerSpace(std::make_shared<VectorSpace>(space_dim,space_name));
 if(space_ptr != nullptr) *space_ptr = space_register_->getSpace(space_id);
 return space_id;
}

void NumServer::destroyVectorSpace(const std::string & space_name)
{
 assert(false);
 //`Finish
 return;
}

void NumServer::destroyVectorSpace(SpaceId space_id)
{
 assert(false);
 //`Finish
 return;
}

const VectorSpace * NumServer::getVectorSpace(const std::string & space_name) const
{
 return space_register_->getSpace(space_name);
}


SubspaceId NumServer::createSubspace(const std::string & subspace_name,
                                     const std::string & space_name,
                                     std::pair<DimOffset,DimOffset> bounds,
                                     const Subspace ** subspace_ptr)
{
 assert(subspace_name.length() > 0 && space_name.length() > 0);
 const VectorSpace * space = space_register_->getSpace(space_name);
 assert(space != nullptr);
 SubspaceId subspace_id = space_register_->registerSubspace(std::make_shared<Subspace>(space,bounds,subspace_name));
 if(subspace_ptr != nullptr) *subspace_ptr = space_register_->getSubspace(space_name,subspace_name);
 auto res = subname2id_.insert({subspace_name,space->getRegisteredId()});
 if(!(res.second)) std::cout << "#ERROR(NumServer::createSubspace): Subspace already exists: " << subspace_name << std::endl;
 assert(res.second);
 return subspace_id;
}

void NumServer::destroySubspace(const std::string & subspace_name)
{
 assert(false);
 //`Finish
 return;
}

void NumServer::destroySubspace(SubspaceId subspace_id)
{
 assert(false);
 //`Finish
 return;
}

const Subspace * NumServer::getSubspace(const std::string & subspace_name) const
{
 assert(subspace_name.length() > 0);
 auto it = subname2id_.find(subspace_name);
 if(it == subname2id_.end()) std::cout << "#ERROR(NumServer::getSubspace): Subspace not found: " << subspace_name << std::endl;
 assert(it != subname2id_.end());
 SpaceId space_id = (*it).second;
 const VectorSpace * space = space_register_->getSpace(space_id);
 assert(space != nullptr);
 const std::string & space_name = space->getName();
 assert(space_name.length() > 0);
 return space_register_->getSubspace(space_name,subspace_name);
}

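/** Submits a single (non-composite) tensor operation to the tensor runtime, registering or
    unregistering the tensor in the name-to-tensor map on CREATE/DESTROY and, if validation
    tracing is enabled, logging a 1-norm stamp for each output tensor operand. **/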
bool NumServer::submitOp(std::shared_ptr<TensorOperation> operation)
{
 bool submitted = false;
 if(operation){
  if(logging_ > 1 || (validation_tracing_ && logging_ > 0)){
   logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
            << "]: Submitting tensor operation:" << std::endl;
   operation->printItFile(logfile_);
   logfile_.flush(); //`Debug only
  }
  submitted = true;
  //Register/unregister tensor existence:
  if(operation->getOpcode() == TensorOpCode::CREATE){ //TENSOR_CREATE sets tensor element type for future references
   auto tensor = operation->getTensorOperand(0);
   auto elem_type = std::dynamic_pointer_cast<numerics::TensorOpCreate>(operation)->getTensorElementType();
   if(elem_type == TensorElementType::VOID){
    elem_type = tensor->getElementType();
    std::dynamic_pointer_cast<numerics::TensorOpCreate>(operation)->resetTensorElementType(elem_type);
   }else{
    tensor->setElementType(elem_type);
   }
   auto res = tensors_.emplace(std::make_pair(tensor->getName(),tensor)); //registers the tensor
   if(!(res.second)){
    std::cout << "#ERROR(exatn::NumServer::submitOp): Attempt to CREATE an already existing tensor "
              << tensor->getName() << std::endl << std::flush;
    submitted = false;
   }
  }else if(operation->getOpcode() == TensorOpCode::DESTROY){
   auto tensor = operation->getTensorOperand(0);
   auto num_deleted = tensors_.erase(tensor->getName()); //unregisters the tensor
   if(num_deleted != 1){
    std::cout << "#ERROR(exatn::NumServer::submitOp): Attempt to DESTROY a non-existing tensor "
              << tensor->getName() << std::endl << std::flush;
    submitted = false;
   }
  }
  //Submit tensor operation to tensor runtime:
  if(submitted) tensor_rt_->submit(operation);
  //Compute validation stamps for all output tensor operands, if needed (debug):
  if(validation_tracing_ && submitted){
   const auto opcode = operation->getOpcode();
   if(opcode != TensorOpCode::NOOP && opcode != TensorOpCode::CREATE && opcode != TensorOpCode::DESTROY){
    const auto num_out_operands = operation->getNumOperandsOut();
    if(logging_ > 0 && num_out_operands > 0) logfile_ << "#Validation stamp";
    for(unsigned int oper = 0; oper < num_out_operands; ++oper){
     auto functor_norm1 = std::shared_ptr<TensorMethod>(new numerics::FunctorNorm1());
     std::shared_ptr<TensorOperation> op_trace = tensor_op_factory_->createTensorOp(TensorOpCode::TRANSFORM);
     op_trace->setTensorOperand(operation->getTensorOperand(oper));
     std::dynamic_pointer_cast<numerics::TensorOpTransform>(op_trace)->resetFunctor(functor_norm1);
     submitted = tensor_rt_->submit(op_trace);
     if(submitted){
      submitted = tensor_rt_->sync(*op_trace);
      if(logging_ > 0 && submitted){
       double norm = std::dynamic_pointer_cast<numerics::FunctorNorm1>(functor_norm1)->getNorm();
       logfile_ << " " << norm;
      }
     }
    }
    if(logging_ > 0 && num_out_operands > 0) logfile_ << std::endl << std::flush;
   }
  }
 }
 return submitted;
}

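/** Submits a tensor operation for execution: creates and initializes any implicit Kronecker
    Delta tensor operands (names starting with "_d") beforehand, decomposes composite
    operations via the given tensor mapper, and destroys the temporary Delta tensors afterwards. **/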
bool NumServer::submit(std::shared_ptr<TensorOperation> operation, std::shared_ptr<TensorMapper> tensor_mapper)
{
 bool success = true;
 std::stack<unsigned int> deltas;
 const auto opcode = operation->getOpcode();
 const auto num_operands = operation->getNumOperands();
 //Create and initialize implicit Kronecker Delta tensors:
 if(opcode != TensorOpCode::CREATE && opcode != TensorOpCode::DESTROY){
  auto elem_type = TensorElementType::VOID;
  for(unsigned int i = 0; i < num_operands; ++i){
   auto operand = operation->getTensorOperand(i);
   elem_type = operand->getElementType();
   if(elem_type != TensorElementType::VOID) break;
  }
  for(unsigned int i = 0; i < num_operands; ++i){
   auto operand = operation->getTensorOperand(i);
   const auto & tensor_name = operand->getName();
   if(tensor_name.length() >= 2){
    if(tensor_name[0] == '_' && tensor_name[1] == 'd'){ //_d: explicit Kronecker Delta tensor
     if(!tensorAllocated(tensor_name)){
      //std::cout << "#DEBUG(exatn::NumServer::submitOp): Kronecker Delta tensor creation: "
      //          << tensor_name << ": Element type = " << static_cast<int>(elem_type) << std::endl; //debug
      assert(elem_type != TensorElementType::VOID);
      std::shared_ptr<TensorOperation> op0 = tensor_op_factory_->createTensorOp(TensorOpCode::CREATE);
      op0->setTensorOperand(operand);
      std::dynamic_pointer_cast<numerics::TensorOpCreate>(op0)->resetTensorElementType(elem_type);
      success = submitOp(op0);
      if(success){
       deltas.push(i);
       std::shared_ptr<TensorOperation> op1 = tensor_op_factory_->createTensorOp(TensorOpCode::TRANSFORM);
       op1->setTensorOperand(operand);
       std::dynamic_pointer_cast<numerics::TensorOpTransform>(op1)->
        resetFunctor(std::shared_ptr<TensorMethod>(new numerics::FunctorInitDelta()));
       success = submitOp(op1);
      }
     }
    }
   }
   if(!success) break;
  }
 }
 if(success){
  //Submit the main tensor operation:
  if(operation->isComposite()){
   const auto num_ops = operation->decompose(*tensor_mapper); assert(num_ops > 0);
   for(std::size_t op_id = 0; op_id < num_ops; ++op_id){
    success = submitOp((*operation)[op_id]); if(!success) break;
   }
  }else{
   success = submitOp(operation);
  }
  if(success){
   //Destroy temporarily created Kronecker Delta tensors:
   while(!deltas.empty()){
    auto operand = operation->getTensorOperand(deltas.top());
    std::shared_ptr<TensorOperation> op2 = tensor_op_factory_->createTensorOp(TensorOpCode::DESTROY);
    op2->setTensorOperand(operand);
    success = submitOp(op2); if(!success) break;
    deltas.pop();
   }
  }
 }
 return success;
}

bool NumServer::submit(TensorNetwork & network)
{
 return submit(getDefaultProcessGroup(),network);
}

bool NumServer::submit(std::shared_ptr<TensorNetwork> network)
{
 return submit(getDefaultProcessGroup(),network);
}

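/** Submits a tensor network for evaluation by the given process group: determines (or reuses
    a cached) contraction sequence, agrees on the cheapest sequence across processes (MPI builds),
    slices intermediates to fit the per-process memory limit, distributes the resulting tensor
    sub-networks among the processes, and finally allreduces the output tensor within the group. **/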
bool NumServer::submit(const ProcessGroup & process_group,
                       TensorNetwork & network)
{
 const bool debugging = false;
 const bool serialize = false;

#ifdef CUQUANTUM
 if(comp_backend_ == "cuquantum"){
  auto sh_network = std::shared_ptr<TensorNetwork>(&network,[](TensorNetwork * net_ptr){});
  return submit(process_group,sh_network);
 }
#endif

 //Determine parallel execution configuration:
 unsigned int local_rank; //local process rank within the process group
 if(!process_group.rankIsIn(process_rank_,&local_rank)) return true; //process is not in the group: Do nothing
 //assert(network.isValid()); //debug
 unsigned int num_procs = process_group.getSize(); //number of executing processes
 assert(local_rank < num_procs);
 if(logging_ > 0) logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
                           << "]: Submitting tensor network <" << network.getName() << "> (" << network.getTensor(0)->getName()
                           << ") for execution by " << num_procs << " processes with memory limit "
                           << process_group.getMemoryLimitPerProcess() << " bytes" << std::endl << std::flush;
 if(logging_ > 0) network.printItFile(logfile_);

 //Determine the pseudo-optimal tensor contraction sequence:
 const auto num_input_tensors = network.getNumTensors();
 bool new_contr_seq = network.exportContractionSequence().empty();
 if(contr_seq_caching_ && new_contr_seq){ //check whether the optimal tensor contraction sequence is already available from the past
  auto cached_seq = ContractionSeqOptimizer::findContractionSequence(network);
  if(cached_seq.first != nullptr){
   network.importContractionSequence(*(cached_seq.first),cached_seq.second);
   new_contr_seq = false;
  }
 }
 if(new_contr_seq){
  double flops = network.determineContractionSequence(contr_seq_optimizer_);
 }
 if(logging_ > 0) logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
                           << "]: Found a contraction sequence candidate locally (caching = " << contr_seq_caching_
                           << ")" << std::endl;
#ifdef MPI_ENABLED
 //Synchronize on the best tensor contraction sequence across processes:
 if(num_procs > 1 && num_input_tensors > 2){
  double flops = 0.0;
  std::vector<double> proc_flops(num_procs,0.0);
  std::vector<unsigned int> contr_seq_content;
  packContractionSequenceIntoVector(network.exportContractionSequence(&flops),contr_seq_content);
  auto errc = MPI_Gather(&flops,1,MPI_DOUBLE,proc_flops.data(),1,MPI_DOUBLE,
                         0,process_group.getMPICommProxy().getRef<MPI_Comm>());
  assert(errc == MPI_SUCCESS);
  auto min_flops = std::min_element(proc_flops.cbegin(),proc_flops.cend());
  int root_id = static_cast<int>(std::distance(proc_flops.cbegin(),min_flops));
  errc = MPI_Bcast(&root_id,1,MPI_INT,0,process_group.getMPICommProxy().getRef<MPI_Comm>());
  assert(errc == MPI_SUCCESS);
  errc = MPI_Bcast(&flops,1,MPI_DOUBLE,root_id,process_group.getMPICommProxy().getRef<MPI_Comm>());
  assert(errc == MPI_SUCCESS);
  errc = MPI_Bcast(contr_seq_content.data(),contr_seq_content.size(),MPI_UNSIGNED,
                   root_id,process_group.getMPICommProxy().getRef<MPI_Comm>());
  assert(errc == MPI_SUCCESS);
  network.importContractionSequence(contr_seq_content,flops);
 }
 if(logging_ > 0) logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
                           << "]: Found the optimal contraction sequence across all processes" << std::endl;
#endif
 if(logging_ > 0) network.printContractionSequence(logfile_);

 //Generate the primitive tensor operation list:
 auto tensor_mapper = getTensorMapper(process_group);
 auto & op_list = network.getOperationList(contr_seq_optimizer_,(num_procs > 1));
 if(contr_seq_caching_ && new_contr_seq) ContractionSeqOptimizer::cacheContractionSequence(network);
 const double max_intermediate_presence_volume = network.getMaxIntermediatePresenceVolume();
 unsigned int max_intermediate_rank = 0;
 double max_intermediate_volume = network.getMaxIntermediateVolume(&max_intermediate_rank);
 if(logging_ > 0) logfile_ << "[" << std::fixed << std::setprecision(6) << exatn::Timer::timeInSecHR(getTimeStampStart())
                           << "]: Contraction info: FMA flop count = " << std::scientific << network.getFMAFlops()
                           << "; Max intermediate presence volume = " << max_intermediate_presence_volume
                           << "; Max intermediate rank = " << max_intermediate_rank
                           << " with volume " << max_intermediate_volume << " -> ";

 //Split some of the tensor network indices based on the requested memory limit:
 const std::size_t proc_mem_volume = process_group.getMemoryLimitPerProcess() / sizeof(std::complex<double>);
 if(max_intermediate_presence_volume > 0.0 && max_intermediate_volume > 0.0){
  const double shrink_coef = std::min(1.0,
   static_cast<double>(proc_mem_volume) / (max_intermediate_presence_volume * 1.5 * 2.0)); //{1.5:memory fragmentation}; {2.0:tensor transpose}
  max_intermediate_volume *= shrink_coef;
 }
 if(logging_ > 0) logfile_ << max_intermediate_volume << " (after slicing)" << std::endl << std::flush;
 //if(max_intermediate_presence_volume > 0.0 && max_intermediate_volume > 0.0)
 network.splitIndices(static_cast<std::size_t>(max_intermediate_volume));
 if(logging_ > 0) network.printSplitIndexInfo(logfile_,logging_ > 1);

 //Create the output tensor of the tensor network if needed:
 bool submitted = false;
 auto output_tensor = network.getTensor(0);
 auto iter = tensors_.find(output_tensor->getName());
 if(iter == tensors_.end()){ //output tensor does not exist and needs to be created
  implicit_tensors_.emplace(std::make_pair(output_tensor->getName(),output_tensor)); //list of implicitly created tensors (for garbage collection)
  if(!(process_group == getDefaultProcessGroup())){
   auto saved = tensor_comms_.emplace(std::make_pair(output_tensor->getName(),process_group));
   assert(saved.second);
  }
  //Create output tensor:
  std::shared_ptr<TensorOperation> op0 = tensor_op_factory_->createTensorOp(TensorOpCode::CREATE);
  op0->setTensorOperand(output_tensor);
  std::dynamic_pointer_cast<numerics::TensorOpCreate>(op0)->
   resetTensorElementType(output_tensor->getElementType());
  submitted = submit(op0,tensor_mapper); if(!submitted) return false; //this CREATE operation will also register the output tensor
 }

 //Initialize the output tensor to zero:
 std::shared_ptr<TensorOperation> op1 = tensor_op_factory_->createTensorOp(TensorOpCode::TRANSFORM);
 op1->setTensorOperand(output_tensor);
 std::dynamic_pointer_cast<numerics::TensorOpTransform>(op1)->
  resetFunctor(std::shared_ptr<TensorMethod>(new numerics::FunctorInitVal(0.0)));
 submitted = submit(op1,tensor_mapper); if(!submitted) return false;

 //Submit all tensor operations for tensor network evaluation:
 std::size_t num_tens_ops_in_fly = 0;
 const auto num_split_indices = network.getNumSplitIndices(); //total number of indices that were split
 if(logging_ > 0) logfile_ << "Number of split indices = " << num_split_indices << std::endl << std::flush;
 std::size_t num_items_executed = 0; //number of tensor sub-networks executed
 if(num_split_indices > 0){ //multiple tensor sub-networks need to be executed by all processes distributively
  //Distribute tensor sub-networks among processes:
  std::vector<DimExtent> work_extents(num_split_indices);
  for(int i = 0; i < num_split_indices; ++i) work_extents[i] = network.getSplitIndexInfo(i).second.size(); //number of segments per split index
  numerics::TensorRange work_range(work_extents); //each range dimension refers to the number of segments per the corresponding split index
  bool not_done = true;
  if(num_procs > 1) not_done = work_range.reset(num_procs,local_rank); //work subrange for the current local process rank (may be empty)
  if(logging_ > 0) logfile_ << "Total number of sub-networks = " << work_range.localVolume()
                            << "; Current process has a share (0/1) = " << not_done << std::endl << std::flush;
  //Each process executes its share of tensor sub-networks:
  while(not_done){
   if(logging_ > 1){
    logfile_ << "Submitting sub-network ";
    work_range.printCurrent(logfile_);
    logfile_ << std::endl;
   }
   std::unordered_map<numerics::TensorHashType,std::shared_ptr<numerics::Tensor>> intermediate_slices; //temporary slices of intermediates
   std::list<std::shared_ptr<numerics::Tensor>> input_slices; //temporary slices of input tensors
   //Execute all tensor operations for the current tensor sub-network:
   for(auto op = op_list.begin(); op != op_list.end(); ++op){
    if(debugging && logging_ > 1){ //debug
     logfile_ << "Next tensor operation from the tensor network operation list:" << std::endl;
     (*op)->printItFile(logfile_);
    }
    const auto num_operands = (*op)->getNumOperands();
    std::shared_ptr<TensorOperation> tens_op = (*op)->clone();
    //Substitute sliced tensor operands with their respective slices from the current tensor sub-network:
    std::shared_ptr<numerics::Tensor> output_tensor_slice;
    for(unsigned int op_num = 0; op_num < num_operands; ++op_num){
     auto tensor = (*op)->getTensorOperand(op_num);
     const auto tensor_rank = tensor->getRank();
     if(debugging && logging_ > 1){ //debug
      logfile_ << "Next tensor operand " << op_num << " named " << tensor->getName();
     }
     bool tensor_is_output;
     bool tensor_is_intermediate = tensorNameIsIntermediate(*tensor,&tensor_is_output);
     tensor_is_output = (tensor == output_tensor);
     //Look up the tensor operand in the table of sliced tensor operands:
     std::pair<numerics::TensorHashType,numerics::TensorHashType> key;
     if(tensor_is_intermediate || tensor_is_output){ //intermediate tensor (including output tensor)
      numerics::TensorHashType zero = 0;
      key = std::make_pair(zero,tensor->getTensorHash());
     }else{ //input tensor
      numerics::TensorHashType pos = op_num;
      key = std::make_pair((*op)->getTensorOpHash(),pos);
     }
     const auto * tensor_info = network.getSplitTensorInfo(key);
     //Replace the full tensor operand with its respective slice (if found):
     if(tensor_info != nullptr){ //tensor has split indices
      if(debugging && logging_ > 1) logfile_ << " with split indices" << std::endl; //debug
      std::shared_ptr<numerics::Tensor> tensor_slice;
      //Look up the tensor slice in case it has already been created:
      if(tensor_is_intermediate && (!tensor_is_output)){ //pure intermediate tensor
       auto slice_iter = intermediate_slices.find(tensor->getTensorHash()); //look up by the hash of the parental tensor
       if(slice_iter != intermediate_slices.end()) tensor_slice = slice_iter->second;
      }
      //Create the tensor slice upon first encounter:
      if(!tensor_slice){
       //Import original subspaces and dimension extents from the parental tensor:
       std::vector<SubspaceId> subspaces(tensor_rank);
       for(unsigned int i = 0; i < tensor_rank; ++i) subspaces[i] = tensor->getDimSubspaceId(i);
       std::vector<DimExtent> dim_extents(tensor_rank);
       for(unsigned int i = 0; i < tensor_rank; ++i) dim_extents[i] = tensor->getDimExtent(i);
       //Replace the sliced dimensions with their updated subspaces and dimension extents:
       for(const auto & index_desc: *tensor_info){
        const auto gl_index_id = index_desc.first;
        const auto index_pos = index_desc.second;
        const auto & index_info = network.getSplitIndexInfo(gl_index_id);
        const auto segment_selector = work_range.getIndex(gl_index_id);
        subspaces[index_pos] = index_info.second[segment_selector].first;
        dim_extents[index_pos] = index_info.second[segment_selector].second;
        if(logging_ > 1) logfile_ << "Index replacement in tensor " << tensor->getName()
         << ": " << index_info.first << " in position " << index_pos << std::endl;
       }
       //Construct the tensor slice from the parental tensor:
       tensor_slice = tensor->createSubtensor(subspaces,dim_extents);
       tensor_slice->rename(); //unique automatic name will be generated
       //Store the tensor in the table for subsequent referencing:
       if(tensor_is_intermediate && (!tensor_is_output)){ //pure intermediate tensor
        auto res = intermediate_slices.emplace(std::make_pair(tensor->getTensorHash(),tensor_slice));
        assert(res.second);
       }else{ //input/output tensor
        input_slices.emplace_back(tensor_slice);
       }
      }
      //Replace the sliced tensor operand with its current slice in the primary tensor operation:
      bool replaced = tens_op->resetTensorOperand(op_num,tensor_slice); assert(replaced);
      //Allocate the input/output tensor slice and extract its contents (not for intermediates):
      if(!tensor_is_intermediate || tensor_is_output){ //input/output tensor: create slice and extract its contents
       //Create an empty slice of the input/output tensor:
       std::shared_ptr<TensorOperation> create_slice = tensor_op_factory_->createTensorOp(TensorOpCode::CREATE);
       create_slice->setTensorOperand(tensor_slice);
       std::dynamic_pointer_cast<numerics::TensorOpCreate>(create_slice)->
        resetTensorElementType(tensor->getElementType());
       submitted = submit(create_slice,tensor_mapper); if(!submitted) return false;
       ++num_tens_ops_in_fly;
       //Extract the slice contents from the input/output tensor:
       if(tensor_is_output){ //make sure the output tensor slice only shows up once
        //assert(tensor == output_tensor);
        assert(!output_tensor_slice);
        output_tensor_slice = tensor_slice;
       }
       std::shared_ptr<TensorOperation> extract_slice = tensor_op_factory_->createTensorOp(TensorOpCode::SLICE);
       extract_slice->setTensorOperand(tensor_slice);
       extract_slice->setTensorOperand(tensor);
       submitted = submit(extract_slice,tensor_mapper); if(!submitted) return false;
       ++num_tens_ops_in_fly;
      }
     }else{
      if(debugging && logging_ > 1) logfile_ << " without split indices" << std::endl; //debug
     }
    } //loop over tensor operands
    //Submit the primary tensor operation with the current slices:
    submitted = submit(tens_op,tensor_mapper); if(!submitted) return false;
    ++num_tens_ops_in_fly;
    //Insert the output tensor slice back into the output tensor:
    if(output_tensor_slice){
     std::shared_ptr<TensorOperation> insert_slice = tensor_op_factory_->createTensorOp(TensorOpCode::INSERT);
     insert_slice->setTensorOperand(output_tensor);
     insert_slice->setTensorOperand(output_tensor_slice);
     submitted = submit(insert_slice,tensor_mapper); if(!submitted) return false;
     ++num_tens_ops_in_fly;
     output_tensor_slice.reset();
    }
    //Destroy temporary input tensor slices:
    for(auto & input_slice: input_slices){
     std::shared_ptr<TensorOperation> destroy_slice = tensor_op_factory_->createTensorOp(TensorOpCode::DESTROY);
     destroy_slice->setTensorOperand(input_slice);
     submitted = submit(destroy_slice,tensor_mapper); if(!submitted) return false;
     ++num_tens_ops_in_fly;
    }
    if(serialize || num_tens_ops_in_fly > exatn::runtime::TensorRuntime::MAX_RUNTIME_DAG_SIZE){
     sync(process_group);
     num_tens_ops_in_fly = 0;
    }
    input_slices.clear();
   } //loop over tensor operations
   //Erase intermediate tensor slices once all tensor operations have been executed:
   intermediate_slices.clear();
   ++num_items_executed;
   //Proceed to the next tensor sub-network:
   not_done = work_range.next();
  } //loop over tensor sub-networks
  //Allreduce the tensor network output tensor within the executing process group:
  if(num_procs > 1){
   std::shared_ptr<TensorOperation> allreduce = tensor_op_factory_->createTensorOp(TensorOpCode::ALLREDUCE);
   allreduce->setTensorOperand(output_tensor);
   std::dynamic_pointer_cast<numerics::TensorOpAllreduce>(allreduce)->resetMPICommunicator(process_group.getMPICommProxy());
   submitted = submit(allreduce,tensor_mapper); if(!submitted) return false;
   ++num_tens_ops_in_fly;
  }
 }else{ //only a single tensor (sub-)network executed redundantly by all processes
  for(auto op = op_list.begin(); op != op_list.end(); ++op){
   submitted = submit(*op,tensor_mapper); if(!submitted) return false;
   ++num_tens_ops_in_fly;
  }
  ++num_items_executed;
 }
 if(logging_ > 0) logfile_ << "Number of submitted sub-networks = " << num_items_executed << std::endl << std::