Commit 6b0e3a93 authored by Dmitry I. Lyakh

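Fixed an issue in the Lazy DAG executor: a DAG node could be registered as ready (dependency-free) more than once. Registration now skips nodes that are already in the ready list and reports whether the node was actually inserted.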

parent a171ede9
......@@ -62,8 +62,8 @@ void LazyGraphExecutor::execute(TensorGraph & dag) {
if(ready_for_execution){ //node is idle
ready_for_execution = ready_for_execution && dag.nodeDependenciesResolved(progress.current);
if(ready_for_execution){ //all node dependencies resolved (or none)
dag.registerDependencyFreeNode(progress.current);
if(logging_.load() > 1) logfile_ << "DAG node detected with all dependencies resolved: " << progress.current << std::endl;
auto registered = dag.registerDependencyFreeNode(progress.current);
if(registered && logging_.load() > 1) logfile_ << "DAG node detected with all dependencies resolved: " << progress.current << std::endl;
}else{ //node still has unresolved dependencies, try prefetching
if(progress.current < (progress.front + this->getPrefetchDepth())){
auto prefetching = this->node_executor_->prefetch(*(dag_node.getOperation()));
......@@ -146,7 +146,7 @@ void LazyGraphExecutor::execute(TensorGraph & dag) {
}else{ //tensor operation not submitted due to either temporary resource shortage or fatal error
auto discarded = this->node_executor_->discard(exec_handle);
dag.setNodeIdle(node);
dag.registerDependencyFreeNode(node);
auto registered = dag.registerDependencyFreeNode(node); assert(registered);
issued = false;
if(error_code == TRY_LATER){ //temporary shortage of resources
if(logging_.load() != 0) logfile_ << ": Postponed" << std::endl;
......
......@@ -75,10 +75,14 @@ std::size_t TensorExecState::getTensorUpdateCount(const Tensor & tensor)
return iter->second->update_count.load();
}
void TensorExecState::registerDependencyFreeNode(VertexIdType node_id)
bool TensorExecState::registerDependencyFreeNode(VertexIdType node_id)
{
nodes_ready_.emplace_back(node_id);
return;
bool inserted = true;
for(auto iter = nodes_ready_.begin(); iter != nodes_ready_.end(); ++iter){
if(*iter == node_id){inserted = false; break;}
}
if(inserted) nodes_ready_.emplace_back(node_id);
return inserted;
}
bool TensorExecState::extractDependencyFreeNode(VertexIdType * node_id)
......
......@@ -107,7 +107,7 @@ public:
std::size_t getTensorUpdateCount(const Tensor & tensor);
/** Registers a DAG node without dependencies. **/
void registerDependencyFreeNode(VertexIdType node_id);
bool registerDependencyFreeNode(VertexIdType node_id);
/** Extracts a dependency-free node from the list.
Returns FALSE if no such node exists. **/
bool extractDependencyFreeNode(VertexIdType * node_id);
......
......@@ -274,11 +274,11 @@ public:
}
/** Registers a DAG node without dependencies. **/
inline void registerDependencyFreeNode(VertexIdType node_id) {
inline bool registerDependencyFreeNode(VertexIdType node_id) {
lock();
exec_state_.registerDependencyFreeNode(node_id);
auto registered = exec_state_.registerDependencyFreeNode(node_id);
unlock();
return;
return registered;
}
/** Extracts a dependency-free node from the list.
......
Markdown is supported
0%
or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment