Commit 9c9db23d authored by dpugmire's avatar dpugmire
Browse files

Fixes per review

parent 2d1ff10d
......@@ -77,7 +77,7 @@ void Messenger::InitializeBuffers()
void Messenger::CleanupRequests(int tag)
{
std::vector<RequestTagPair> delKeys;
for (auto&& i : this->RecvBuffers)
for (const auto& i : this->RecvBuffers)
{
if (tag == TAG_ANY || tag == i.first.second)
delKeys.push_back(i.first);
......@@ -120,42 +120,45 @@ void Messenger::PostRecv(int tag, std::size_t sz, int src)
void Messenger::CheckPendingSendRequests()
{
std::vector<std::pair<RequestTagPair, char*>> req;
if (this->CheckRequests(this->SendBuffers, {}, false, req))
std::vector<RequestTagPair> reqTags;
this->CheckRequests(this->SendBuffers, {}, false, reqTags);
//Cleanup any send buffers that have completed.
for (const auto& rt : reqTags)
{
for (const auto& it : req)
auto entry = this->SendBuffers.find(rt);
if (entry != this->SendBuffers.end())
{
delete[] it.second;
this->SendBuffers.erase(it.first);
delete[] entry->second;
this->SendBuffers.erase(entry);
}
}
}
bool Messenger::CheckRequests(const std::map<RequestTagPair, char*>& buffers,
void Messenger::CheckRequests(const std::map<RequestTagPair, char*>& buffers,
const std::set<int>& tagsToCheck,
bool BlockAndWait,
std::vector<std::pair<RequestTagPair, char*>>& ret)
std::vector<RequestTagPair>& reqTags)
{
std::vector<MPI_Request> req, copy;
std::vector<int> tags;
req.reserve(buffers.size());
tags.reserve(buffers.size());
reqTags.resize(0);
//Check the buffers for the specified tags.
for (const auto& it : buffers)
{
if (tagsToCheck.empty() || tagsToCheck.find(it.first.second) != tagsToCheck.end())
{
req.push_back(it.first.first);
copy.push_back(it.first.first);
tags.push_back(it.first.second);
}
}
//Nothing..
if (req.empty())
{
ret.resize(0);
return false;
}
return;
//Check the outstanding requests.
std::vector<MPI_Status> status(req.size());
......@@ -168,23 +171,12 @@ bool Messenger::CheckRequests(const std::map<RequestTagPair, char*>& buffers,
else
err = MPI_Testsome(req.size(), req.data(), &num, indices.data(), status.data());
if (err != MPI_SUCCESS)
throw vtkm::cont::ErrorFilterExecution(
"Error with MPI_Testsome/MPI_Waitsome in Messenger::CheckRequests");
throw vtkm::cont::ErrorFilterExecution("Error with MPI_Testsome in Messenger::RecvData");
if (num == 0)
return false;
//Add the req/tag to the return vector.
std::size_t sz = static_cast<std::size_t>(num);
ret.reserve(sz);
for (std::size_t i = 0; i < sz; i++)
{
RequestTagPair key(req[indices[i]], tags[indices[i]]);
const auto& it = buffers.find(key);
ret.push_back(std::make_pair(key, it->second));
}
return true;
for (int i = 0; i < num; i++)
reqTags.push_back(RequestTagPair(copy[indices[i]], tags[indices[i]]));
}
bool Messenger::PacketCompare(const char* a, const char* b)
......@@ -294,25 +286,31 @@ bool Messenger::RecvData(const std::set<int>& tags,
buffers.resize(0);
std::vector<RequestTagPair> reqTags;
std::vector<std::pair<RequestTagPair, char*>> req;
if (!this->CheckRequests(this->RecvBuffers, tags, blockAndWait, req))
this->CheckRequests(this->RecvBuffers, tags, blockAndWait, reqTags);
//Nothing came in.
if (reqTags.empty())
return false;
std::vector<char*> incomingBuffers;
incomingBuffers.reserve(req.size());
for (const auto& r : req)
incomingBuffers.reserve(reqTags.size());
for (const auto& rt : reqTags)
{
incomingBuffers.push_back(r.second);
this->RecvBuffers.erase(r.first);
auto it = this->RecvBuffers.find(rt);
if (it == this->RecvBuffers.end())
throw vtkm::cont::ErrorFilterExecution("receive buffer not found");
incomingBuffers.push_back(it->second);
this->RecvBuffers.erase(it);
}
this->ProcessReceivedBuffers(incomingBuffers, buffers);
//Re-post receives
for (const auto& rt : req)
this->PostRecv(rt.first.second);
for (const auto& rt : reqTags)
this->PostRecv(rt.second);
return true;
return !buffers.empty();
}
void Messenger::ProcessReceivedBuffers(std::vector<char*>& incomingBuffers,
......
......@@ -44,8 +44,8 @@ public:
#endif
}
inline int GetRank() const { return this->Rank; }
inline int GetNumRanks() const { return this->NumRanks; }
int GetRank() const { return this->Rank; }
int GetNumRanks() const { return this->NumRanks; }
#ifdef VTKM_ENABLE_MPI
VTKM_CONT void RegisterTag(int tag, std::size_t numRecvs, std::size_t size);
......@@ -96,10 +96,10 @@ private:
std::map<RequestTagPair, char*> SendBuffers;
static constexpr int TAG_ANY = -1;
bool CheckRequests(const std::map<RequestTagPair, char*>& buffer,
void CheckRequests(const std::map<RequestTagPair, char*>& buffer,
const std::set<int>& tags,
bool BlockAndWait,
std::vector<std::pair<RequestTagPair, char*>>& ret);
std::vector<RequestTagPair>& reqTags);
#else
protected:
static constexpr int NumRanks = 1;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment