Commit 2d6075ec authored by Dmitry I. Lyakh

Added a FALSE assert when attempting to broadcast/allreduce composite tensors; needs implementation ...



Signed-off-by: Dmitry I. Lyakh <quant4me@gmail.com>
parent 85d53eae
 /** ExaTN::Numerics: Numerical server
-REVISION: 2021/10/01
+REVISION: 2021/10/02
 Copyright (C) 2018-2021 Dmitry I. Lyakh (Liakh)
 Copyright (C) 2018-2021 Oak Ridge National Laboratory (UT-Battelle) **/

@@ -1953,41 +1953,55 @@ bool NumServer::broadcastTensorSync(const std::string & name, int root_process_rank)
 bool NumServer::broadcastTensor(const ProcessGroup & process_group, const std::string & name, int root_process_rank)
 {
  if(!process_group.rankIsIn(process_rank_)) return true; //process is not in the group: Do nothing
+ bool success = true;
  auto tensor_mapper = getTensorMapper(process_group);
  auto iter = tensors_.find(name);
  if(iter == tensors_.end()){
   std::cout << "#ERROR(exatn::NumServer::broadcastTensor): Tensor " << name << " not found!" << std::endl;
   return false;
  }
- std::shared_ptr<TensorOperation> op = tensor_op_factory_->createTensorOp(TensorOpCode::BROADCAST);
- op->setTensorOperand(iter->second);
- std::dynamic_pointer_cast<numerics::TensorOpBroadcast>(op)->resetMPICommunicator(process_group.getMPICommProxy());
- std::dynamic_pointer_cast<numerics::TensorOpBroadcast>(op)->resetRootRank(root_process_rank);
- auto submitted = submit(op,tensor_mapper);
- return submitted;
+ if(iter->second->isComposite()){
+  std::cout << "#ERROR(exatn::NumServer::broadcastTensor): Tensor " << name
+            << " is composite, broadcast not implemented!" << std::endl << std::flush;
+  assert(false);
+ }else{
+  std::shared_ptr<TensorOperation> op = tensor_op_factory_->createTensorOp(TensorOpCode::BROADCAST);
+  op->setTensorOperand(iter->second);
+  std::dynamic_pointer_cast<numerics::TensorOpBroadcast>(op)->resetMPICommunicator(process_group.getMPICommProxy());
+  std::dynamic_pointer_cast<numerics::TensorOpBroadcast>(op)->resetRootRank(root_process_rank);
+  success = submit(op,tensor_mapper);
+ }
+ return success;
 }

 bool NumServer::broadcastTensorSync(const ProcessGroup & process_group, const std::string & name, int root_process_rank)
 {
  if(!process_group.rankIsIn(process_rank_)) return true; //process is not in the group: Do nothing
+ bool success = true;
  auto tensor_mapper = getTensorMapper(process_group);
  auto iter = tensors_.find(name);
  if(iter == tensors_.end()){
   std::cout << "#ERROR(exatn::NumServer::broadcastTensorSync): Tensor " << name << " not found!" << std::endl;
   return false;
  }
- std::shared_ptr<TensorOperation> op = tensor_op_factory_->createTensorOp(TensorOpCode::BROADCAST);
- op->setTensorOperand(iter->second);
- std::dynamic_pointer_cast<numerics::TensorOpBroadcast>(op)->resetMPICommunicator(process_group.getMPICommProxy());
- std::dynamic_pointer_cast<numerics::TensorOpBroadcast>(op)->resetRootRank(root_process_rank);
- auto submitted = submit(op,tensor_mapper);
- if(submitted){
-  submitted = sync(*op);
+ if(iter->second->isComposite()){
+  std::cout << "#ERROR(exatn::NumServer::broadcastTensorSync): Tensor " << name
+            << " is composite, broadcast not implemented!" << std::endl << std::flush;
+  assert(false);
+ }else{
+  std::shared_ptr<TensorOperation> op = tensor_op_factory_->createTensorOp(TensorOpCode::BROADCAST);
+  op->setTensorOperand(iter->second);
+  std::dynamic_pointer_cast<numerics::TensorOpBroadcast>(op)->resetMPICommunicator(process_group.getMPICommProxy());
+  std::dynamic_pointer_cast<numerics::TensorOpBroadcast>(op)->resetRootRank(root_process_rank);
+  success = submit(op,tensor_mapper);
+  if(success){
+   success = sync(*op);
 #ifdef MPI_ENABLED
-  if(submitted) submitted = sync(process_group);
+   if(success) success = sync(process_group);
 #endif
+  }
  }
- return submitted;
+ return success;
 }

 bool NumServer::allreduceTensor(const std::string & name)
@@ -2003,39 +2017,53 @@ bool NumServer::allreduceTensorSync(const std::string & name)
 bool NumServer::allreduceTensor(const ProcessGroup & process_group, const std::string & name)
 {
  if(!process_group.rankIsIn(process_rank_)) return true; //process is not in the group: Do nothing
+ bool success = true;
  auto tensor_mapper = getTensorMapper(process_group);
  auto iter = tensors_.find(name);
  if(iter == tensors_.end()){
   std::cout << "#ERROR(exatn::NumServer::allreduceTensor): Tensor " << name << " not found!" << std::endl;
   return false;
  }
- std::shared_ptr<TensorOperation> op = tensor_op_factory_->createTensorOp(TensorOpCode::ALLREDUCE);
- op->setTensorOperand(iter->second);
- std::dynamic_pointer_cast<numerics::TensorOpAllreduce>(op)->resetMPICommunicator(process_group.getMPICommProxy());
- auto submitted = submit(op,tensor_mapper);
- return submitted;
+ if(iter->second->isComposite()){
+  std::cout << "#ERROR(exatn::NumServer::allreduceTensor): Tensor " << name
+            << " is composite, allreduce not implemented!" << std::endl << std::flush;
+  assert(false);
+ }else{
+  std::shared_ptr<TensorOperation> op = tensor_op_factory_->createTensorOp(TensorOpCode::ALLREDUCE);
+  op->setTensorOperand(iter->second);
+  std::dynamic_pointer_cast<numerics::TensorOpAllreduce>(op)->resetMPICommunicator(process_group.getMPICommProxy());
+  success = submit(op,tensor_mapper);
+ }
+ return success;
 }

 bool NumServer::allreduceTensorSync(const ProcessGroup & process_group, const std::string & name)
 {
  if(!process_group.rankIsIn(process_rank_)) return true; //process is not in the group: Do nothing
+ bool success = true;
  auto tensor_mapper = getTensorMapper(process_group);
  auto iter = tensors_.find(name);
  if(iter == tensors_.end()){
   std::cout << "#ERROR(exatn::NumServer::allreduceTensorSync): Tensor " << name << " not found!" << std::endl;
   return false;
  }
- std::shared_ptr<TensorOperation> op = tensor_op_factory_->createTensorOp(TensorOpCode::ALLREDUCE);
- op->setTensorOperand(iter->second);
- std::dynamic_pointer_cast<numerics::TensorOpAllreduce>(op)->resetMPICommunicator(process_group.getMPICommProxy());
- auto submitted = submit(op,tensor_mapper);
- if(submitted){
-  submitted = sync(*op);
+ if(iter->second->isComposite()){
+  std::cout << "#ERROR(exatn::NumServer::allreduceTensorSync): Tensor " << name
+            << " is composite, allreduce not implemented!" << std::endl << std::flush;
+  assert(false);
+ }else{
+  std::shared_ptr<TensorOperation> op = tensor_op_factory_->createTensorOp(TensorOpCode::ALLREDUCE);
+  op->setTensorOperand(iter->second);
+  std::dynamic_pointer_cast<numerics::TensorOpAllreduce>(op)->resetMPICommunicator(process_group.getMPICommProxy());
+  success = submit(op,tensor_mapper);
+  if(success){
+   success = sync(*op);
 #ifdef MPI_ENABLED
-  if(submitted) submitted = sync(process_group);
+   if(success) success = sync(process_group);
 #endif
+  }
  }
- return submitted;
+ return success;
 }

 bool NumServer::transformTensor(const std::string & name, std::shared_ptr<TensorMethod> functor)
...
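Note on the composite branch: the new assert(false) is a deliberate placeholder until broadcast/allreduce of composite tensors is implemented. Purely as an illustration of one possible direction (not code from this commit), the sketch below would replace the assert in NumServer::broadcastTensor by submitting one BROADCAST per stored subtensor of the composite. The castTensorComposite() helper and the iteration over (subtensor id, subtensor) pairs are assumptions about the TensorComposite interface, not verified API.

//Hypothetical sketch only, NOT part of this commit: broadcast a composite tensor
//by broadcasting each of its constituent subtensors. castTensorComposite() and the
//iteration over {id, subtensor} pairs are assumed interfaces of TensorComposite.
auto composite = castTensorComposite(iter->second); //assumed helper: downcast Tensor -> TensorComposite (nullptr if not composite)
assert(composite);
for(const auto & entry : *composite){ //assumed: iterates over {subtensor_id, std::shared_ptr<Tensor>} pairs
 std::shared_ptr<TensorOperation> op = tensor_op_factory_->createTensorOp(TensorOpCode::BROADCAST);
 op->setTensorOperand(entry.second); //each subtensor becomes the operand of its own BROADCAST
 std::dynamic_pointer_cast<numerics::TensorOpBroadcast>(op)->resetMPICommunicator(process_group.getMPICommProxy());
 std::dynamic_pointer_cast<numerics::TensorOpBroadcast>(op)->resetRootRank(root_process_rank);
 success = submit(op,tensor_mapper) && success;
}

A real implementation would also have to respect the subtensor-to-process distribution of the composite tensor (each subtensor generally lives on a subset of the process group, so the effective communicator and root rank can differ per subtensor), which is presumably why the commit stops at a hard assert for now.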