Commit c5194504 authored by Dmitry I. Lyakh

Implemented parallel tensor expansion evaluation, needs testing ...
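
When parallel_width > 1, the input process group is now split into subgroups:
each subgroup evaluates its share of the expansion's tensor networks into a
local accumulator tensor, and the per-subgroup results are then combined into
the global accumulator tensor across the full process group.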


Signed-off-by: Dmitry I. Lyakh <quant4me@gmail.com>
parent d5e50c4f
@@ -688,6 +688,10 @@ bool NumServer::submit(const ProcessGroup & process_group,
auto iter = tensors_.find(output_tensor->getName());
if(iter == tensors_.end()){ //output tensor does not exist and needs to be created
implicit_tensors_.emplace(std::make_pair(output_tensor->getName(),output_tensor)); //list of implicitly created tensors (for garbage collection)
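//Register the tensor's process group if it differs from the default (full) group: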
if(!(process_group == getDefaultProcessGroup())){
auto saved = tensor_comms_.emplace(std::make_pair(output_tensor->getName(),process_group));
assert(saved.second);
}
//Create output tensor:
std::shared_ptr<TensorOperation> op0 = tensor_op_factory_->createTensorOp(TensorOpCode::CREATE);
op0->setTensorOperand(output_tensor);
@@ -887,13 +891,14 @@ bool NumServer::submit(const ProcessGroup & process_group,
unsigned int local_rank;
if(!process_group.rankIsIn(process_rank_,&local_rank)) return true; //process is not in the group: Do nothing
assert(accumulator);
bool success = true;
auto tensor_mapper = getTensorMapper(process_group);
if(parallel_width <= 1){ //all processes execute all tensor networks one-by-one
auto tensor_mapper = getTensorMapper(process_group);
std::list<std::shared_ptr<TensorOperation>> accumulations;
for(auto component = expansion.begin(); component != expansion.end(); ++component){
//Evaluate the tensor network component (compute its output tensor):
auto & network = *(component->network);
auto submitted = submit(process_group,network); if(!submitted) return false;
success = submit(process_group,network); assert(success);
//Create accumulation operation for the scaled computed output tensor:
bool conjugated;
auto output_tensor = network.getTensor(0,&conjugated); assert(!conjugated); //output tensor cannot be conjugated
@@ -910,12 +915,89 @@ bool NumServer::submit(const ProcessGroup & process_group,
}
//Submit all previously created accumulation operations:
for(auto & accumulation: accumulations){
auto submitted = submit(accumulation,tensor_mapper); if(!submitted) return false;
success = submit(accumulation,tensor_mapper); assert(success);
}
}else{ //tensor networks will be distributed among subgroups of processes
std::abort(); //`Finish
//Destroy output tensors of all tensor networks within the original process group:
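//(they will be re-created within the subgroups that subsequently evaluate the corresponding networks)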
for(auto component = expansion.begin(); component != expansion.end(); ++component){
auto & network = *(component->network);
bool conjugated;
auto output_tensor = network.getTensor(0,&conjugated); assert(!conjugated); //output tensor cannot be conjugated
const auto & output_tensor_name = output_tensor->getName();
if(tensorAllocated(output_tensor_name)){
success = destroyTensorSync(output_tensor_name); assert(success);
}else{
auto iter = implicit_tensors_.find(output_tensor_name);
if(iter != implicit_tensors_.end()){
std::shared_ptr<TensorOperation> destruction = tensor_op_factory_->createTensorOp(TensorOpCode::DESTROY);
destruction->setTensorOperand(iter->second);
success = submit(destruction,tensor_mapper); assert(success);
auto num_deleted = implicit_tensors_.erase(output_tensor_name); assert(num_deleted == 1);
num_deleted = tensor_comms_.erase(output_tensor_name);
success = sync(*destruction); assert(success);
}
}
}
//Split the process group into subgroups:
const auto num_procs = process_group.getSize();
const auto num_networks = expansion.getNumComponents();
if(num_networks < parallel_width) parallel_width = num_networks;
int procs_per_subgroup = num_procs / parallel_width;
int remainder_procs = num_procs % parallel_width;
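//The first remainder_procs subgroups receive (procs_per_subgroup + 1) processes each, the remaining ones receive procs_per_subgroup: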
int my_subgroup_id = -1;
if(local_rank < (procs_per_subgroup + 1) * remainder_procs){
my_subgroup_id = local_rank / (procs_per_subgroup + 1);
}else{
my_subgroup_id = remainder_procs +
(local_rank - ((procs_per_subgroup + 1) * remainder_procs)) / procs_per_subgroup;
}
assert(my_subgroup_id >= 0 && my_subgroup_id < parallel_width);
auto process_subgroup = process_group.split(my_subgroup_id);
auto local_tensor_mapper = getTensorMapper(*process_subgroup);
//Create/initialize accumulator tensors within subgroups:
auto local_accumulator = makeSharedTensor(*accumulator);
local_accumulator->rename("_lacc"+std::to_string(my_subgroup_id));
success = createTensorSync(*process_subgroup,local_accumulator,accumulator->getElementType()); assert(success);
success = initTensor(local_accumulator->getName(),0.0); assert(success);
//Distribute and evaluate tensor networks within subgroups:
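//(round-robin: component i is handled by subgroup i % parallel_width)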
for(auto component = expansion.begin(); component != expansion.end(); ++component){
if(std::distance(expansion.begin(),component) % parallel_width == my_subgroup_id){
auto & network = *(component->network);
success = submit(*process_subgroup,network); assert(success);
//Create accumulation operation for the scaled computed output tensor:
bool conjugated;
auto output_tensor = network.getTensor(0,&conjugated); assert(!conjugated); //output tensor cannot be conjugated
const auto & output_tensor_name = output_tensor->getName();
std::shared_ptr<TensorOperation> local_accumulation = tensor_op_factory_->createTensorOp(TensorOpCode::ADD);
local_accumulation->setTensorOperand(local_accumulator);
local_accumulation->setTensorOperand(output_tensor,conjugated);
local_accumulation->setScalar(0,component->coefficient);
std::string add_pattern;
auto generated = generate_addition_pattern(local_accumulator->getRank(),add_pattern,false,
local_accumulator->getName(),output_tensor_name);
assert(generated);
local_accumulation->setIndexPattern(add_pattern);
success = submit(local_accumulation,local_tensor_mapper); assert(success);
}
}
success = sync(*process_subgroup); assert(success);
//Accumulate local accumulator tensors into the global one:
std::shared_ptr<TensorOperation> accumulation = tensor_op_factory_->createTensorOp(TensorOpCode::ADD);
accumulation->setTensorOperand(accumulator);
accumulation->setTensorOperand(local_accumulator);
std::string add_pattern;
auto generated = generate_addition_pattern(accumulator->getRank(),add_pattern,false,
accumulator->getName(),local_accumulator->getName());
assert(generated);
accumulation->setIndexPattern(add_pattern);
success = submit(accumulation,local_tensor_mapper); assert(success);
success = sync(*process_subgroup); assert(success);
success = allreduceTensorSync(accumulator->getName()); assert(success);
success = sync(process_group); assert(success);
//Destroy local accumulator tensors:
success = destroyTensorSync(local_accumulator->getName()); assert(success);
}
return true;
return success;
}
bool NumServer::submit(const ProcessGroup & process_group,
@@ -1335,10 +1417,8 @@ bool NumServer::destroyTensor(const std::string & name) //always synchronous
op->setTensorOperand(iter->second);
submitted = submit(op,tensor_mapper);
if(submitted){
auto num_deleted = tensors_.erase(name);
assert(num_deleted == 1);
num_deleted = tensor_comms_.erase(name);
assert(num_deleted == 1);
auto num_deleted = tensors_.erase(name); assert(num_deleted == 1);
num_deleted = tensor_comms_.erase(name); assert(num_deleted == 1);
}
}else{
std::shared_ptr<TensorOperation> op = tensor_op_factory_->createTensorOp(TensorOpCode::DESTROY);
@@ -1367,14 +1447,12 @@ bool NumServer::destroyTensorSync(const std::string & name)
op->setTensorOperand(iter->second);
submitted = submit(op,tensor_mapper);
if(submitted){
auto num_deleted = tensors_.erase(name);
assert(num_deleted == 1);
auto num_deleted = tensors_.erase(name); assert(num_deleted == 1);
submitted = sync(*op);
#ifdef MPI_ENABLED
if(submitted) submitted = sync(process_group);
#endif
num_deleted = tensor_comms_.erase(name);
assert(num_deleted == 1);
num_deleted = tensor_comms_.erase(name); assert(num_deleted == 1);
}
}else{
std::shared_ptr<TensorOperation> op = tensor_op_factory_->createTensorOp(TensorOpCode::DESTROY);
@@ -3047,6 +3125,7 @@ void NumServer::destroyOrphanedTensors()
std::shared_ptr<TensorOperation> destroy_op = tensor_op_factory_->createTensorOp(TensorOpCode::DESTROY);
destroy_op->setTensorOperand(iter->second);
auto submitted = submit(destroy_op,tensor_mapper);
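//Also remove the tensor's process group registration, if any: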
auto num_deleted = tensor_comms_.erase(iter->first);
iter = implicit_tensors_.erase(iter);
}else{
++iter;
@@ -502,6 +502,12 @@ public:
if(!tensor_domain.isContainedIn(other_tensors_domain)){
std::cout << "#ERROR(exatn::getTensorProcessGroup): Tensor operand existence domains must be properly nested: "
<< "Tensor " << tensor_name << " violates this requirement!" << std::endl;
const auto & tensor_domain_ranks = tensor_domain.getProcessRanks();
const auto & other_tensors_domain_ranks = other_tensors_domain.getProcessRanks();
for(const auto & proc_rank: tensor_domain_ranks) std::cout << " " << proc_rank;
std::cout << std::endl;
for(const auto & proc_rank: other_tensors_domain_ranks) std::cout << " " << proc_rank;
std::cout << std::endl;
assert(false);
};
return tensor_domain;