Commit a42bb1b0 authored by Dave Pugmire's avatar Dave Pugmire Committed by Kitware Robot
Browse files

Merge topic 'messenger_cleanup'

8bf53bc3 Cleanup per review.
9c9db23d Fixes per review
2d1ff10d Code cleanup
5c453769 Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into messenger_cleanup
43b4bf46 Fix for serial builds.
ebb1019a Add CheckRequests method.
af14f583 Merge branch 'master' of https://gitlab.kitware.com/vtk/vtk-m into messenger_cleanup
55fff59e

 Cleanup the messenger classes.
Acked-by: default avatarKitware Robot <kwrobot@kitware.com>
Acked-by: Yenpure, Abhishek's avatarAbhishek Yenpure <abhishek@uoregon.edu>
Merge-request: !2410
parents 548ea5ee 8bf53bc3
......@@ -77,7 +77,7 @@ void Messenger::InitializeBuffers()
void Messenger::CleanupRequests(int tag)
{
std::vector<RequestTagPair> delKeys;
for (auto&& i : this->RecvBuffers)
for (const auto& i : this->RecvBuffers)
{
if (tag == TAG_ANY || tag == i.first.second)
delKeys.push_back(i.first);
......@@ -85,10 +85,8 @@ void Messenger::CleanupRequests(int tag)
if (!delKeys.empty())
{
for (const auto& it : delKeys)
for (auto& v : delKeys)
{
RequestTagPair v = it;
char* buff = this->RecvBuffers[v];
MPI_Cancel(&(v.first));
delete[] buff;
......@@ -121,44 +119,64 @@ void Messenger::PostRecv(int tag, std::size_t sz, int src)
}
void Messenger::CheckPendingSendRequests()
{
std::vector<RequestTagPair> reqTags;
this->CheckRequests(this->SendBuffers, {}, false, reqTags);
//Cleanup any send buffers that have completed.
for (const auto& rt : reqTags)
{
auto entry = this->SendBuffers.find(rt);
if (entry != this->SendBuffers.end())
{
delete[] entry->second;
this->SendBuffers.erase(entry);
}
}
}
void Messenger::CheckRequests(const std::map<RequestTagPair, char*>& buffers,
const std::set<int>& tagsToCheck,
bool BlockAndWait,
std::vector<RequestTagPair>& reqTags)
{
std::vector<MPI_Request> req, copy;
std::vector<int> tags;
for (auto it = this->SendBuffers.begin(); it != this->SendBuffers.end(); it++)
reqTags.resize(0);
//Check the buffers for the specified tags.
for (const auto& it : buffers)
{
req.push_back(it->first.first);
copy.push_back(it->first.first);
tags.push_back(it->first.second);
if (tagsToCheck.empty() || tagsToCheck.find(it.first.second) != tagsToCheck.end())
{
req.push_back(it.first.first);
copy.push_back(it.first.first);
tags.push_back(it.first.second);
}
}
//Nothing..
if (req.empty())
return;
//See if any sends are done.
int num = 0, *indices = new int[req.size()];
MPI_Status* status = new MPI_Status[req.size()];
int err = MPI_Testsome(req.size(), &req[0], &num, indices, status);
//Check the outstanding requests.
std::vector<MPI_Status> status(req.size());
std::vector<int> indices(req.size());
int num = 0;
int err;
if (BlockAndWait)
err = MPI_Waitsome(req.size(), req.data(), &num, indices.data(), status.data());
else
err = MPI_Testsome(req.size(), req.data(), &num, indices.data(), status.data());
if (err != MPI_SUCCESS)
throw vtkm::cont::ErrorFilterExecution(
"Error iwth MPI_Testsome in Messenger::CheckPendingSendRequests");
throw vtkm::cont::ErrorFilterExecution("Error with MPI_Testsome in Messenger::RecvData");
//Add the req/tag to the return vector.
reqTags.reserve(static_cast<std::size_t>(num));
for (int i = 0; i < num; i++)
{
MPI_Request r = copy[indices[i]];
int tag = tags[indices[i]];
RequestTagPair k(r, tag);
auto entry = this->SendBuffers.find(k);
if (entry != this->SendBuffers.end())
{
delete[] entry->second;
this->SendBuffers.erase(entry);
}
}
delete[] indices;
delete[] status;
reqTags.push_back(RequestTagPair(copy[indices[i]], tags[indices[i]]));
}
bool Messenger::PacketCompare(const char* a, const char* b)
......@@ -228,20 +246,20 @@ void Messenger::SendData(int dst, int tag, const vtkmdiy::MemoryBuffer& buff)
std::vector<char*> bufferList;
//Add headers, break into multiple buffers if needed.
PrepareForSend(tag, buff, bufferList);
this->PrepareForSend(tag, buff, bufferList);
Messenger::Header header;
for (std::size_t i = 0; i < bufferList.size(); i++)
for (const auto& b : bufferList)
{
memcpy(&header, bufferList[i], sizeof(header));
memcpy(&header, b, sizeof(header));
MPI_Request req;
int err = MPI_Isend(bufferList[i], header.packetSz, MPI_BYTE, dst, tag, this->MPIComm, &req);
int err = MPI_Isend(b, header.packetSz, MPI_BYTE, dst, tag, this->MPIComm, &req);
if (err != MPI_SUCCESS)
throw vtkm::cont::ErrorFilterExecution("Error in MPI_Isend inside Messenger::SendData");
//Add it to sendBuffers
RequestTagPair entry(req, tag);
this->SendBuffers[entry] = bufferList[i];
this->SendBuffers[entry] = b;
}
}
......@@ -251,7 +269,7 @@ bool Messenger::RecvData(int tag, std::vector<vtkmdiy::MemoryBuffer>& buffers, b
setTag.insert(tag);
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>> b;
buffers.resize(0);
if (RecvData(setTag, b, blockAndWait))
if (this->RecvData(setTag, b, blockAndWait))
{
buffers.resize(b.size());
for (std::size_t i = 0; i < b.size(); i++)
......@@ -261,65 +279,36 @@ bool Messenger::RecvData(int tag, std::vector<vtkmdiy::MemoryBuffer>& buffers, b
return false;
}
bool Messenger::RecvData(std::set<int>& tags,
bool Messenger::RecvData(const std::set<int>& tags,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
bool blockAndWait)
{
buffers.resize(0);
//Find all recv of type tag.
std::vector<MPI_Request> req, copy;
std::vector<int> reqTags;
for (auto i = this->RecvBuffers.begin(); i != this->RecvBuffers.end(); i++)
{
if (tags.find(i->first.second) != tags.end())
{
req.push_back(i->first.first);
copy.push_back(i->first.first);
reqTags.push_back(i->first.second);
}
}
if (req.empty())
return false;
MPI_Status* status = new MPI_Status[req.size()];
int *indices = new int[req.size()], num = 0;
if (blockAndWait)
MPI_Waitsome(req.size(), &req[0], &num, indices, status);
else
MPI_Testsome(req.size(), &req[0], &num, indices, status);
std::vector<RequestTagPair> reqTags;
this->CheckRequests(this->RecvBuffers, tags, blockAndWait, reqTags);
if (num == 0)
{
delete[] status;
delete[] indices;
//Nothing came in.
if (reqTags.empty())
return false;
}
std::vector<char*> incomingBuffers(num);
for (int i = 0; i < num; i++)
std::vector<char*> incomingBuffers;
incomingBuffers.reserve(reqTags.size());
for (const auto& rt : reqTags)
{
RequestTagPair entry(copy[indices[i]], reqTags[indices[i]]);
auto it = this->RecvBuffers.find(entry);
auto it = this->RecvBuffers.find(rt);
if (it == this->RecvBuffers.end())
{
delete[] status;
delete[] indices;
throw vtkm::cont::ErrorFilterExecution("receive buffer not found");
}
incomingBuffers[i] = it->second;
incomingBuffers.push_back(it->second);
this->RecvBuffers.erase(it);
}
ProcessReceivedBuffers(incomingBuffers, buffers);
this->ProcessReceivedBuffers(incomingBuffers, buffers);
for (int i = 0; i < num; i++)
PostRecv(reqTags[indices[i]]);
delete[] status;
delete[] indices;
//Re-post receives
for (const auto& rt : reqTags)
this->PostRecv(rt.second);
return !buffers.empty();
}
......@@ -327,10 +316,8 @@ bool Messenger::RecvData(std::set<int>& tags,
void Messenger::ProcessReceivedBuffers(std::vector<char*>& incomingBuffers,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers)
{
for (std::size_t i = 0; i < incomingBuffers.size(); i++)
for (auto& buff : incomingBuffers)
{
char* buff = incomingBuffers[i];
//Grab the header.
Messenger::Header header;
memcpy(&header, buff, sizeof(header));
......
......@@ -44,20 +44,28 @@ public:
#endif
}
int GetRank() const { return this->Rank; }
int GetNumRanks() const { return this->NumRanks; }
#ifdef VTKM_ENABLE_MPI
VTKM_CONT void RegisterTag(int tag, std::size_t numRecvs, std::size_t size);
protected:
static std::size_t CalcMessageBufferSize(std::size_t msgSz);
void InitializeBuffers();
void CleanupRequests(int tag = TAG_ANY);
void CheckPendingSendRequests();
void PostRecv(int tag);
void PostRecv(int tag, std::size_t sz, int src = -1);
void CleanupRequests(int tag = TAG_ANY);
void SendData(int dst, int tag, const vtkmdiy::MemoryBuffer& buff);
bool RecvData(std::set<int>& tags,
bool RecvData(const std::set<int>& tags,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers,
bool blockAndWait = false);
private:
void PostRecv(int tag);
void PostRecv(int tag, std::size_t sz, int src = -1);
//Message headers.
typedef struct
{
......@@ -87,12 +95,16 @@ protected:
std::map<RankIdPair, std::list<char*>> RecvPackets;
std::map<RequestTagPair, char*> SendBuffers;
static constexpr int TAG_ANY = -1;
void CheckRequests(const std::map<RequestTagPair, char*>& buffer,
const std::set<int>& tags,
bool BlockAndWait,
std::vector<RequestTagPair>& reqTags);
#else
protected:
static constexpr int NumRanks = 1;
static constexpr int Rank = 0;
#endif
static std::size_t CalcMessageBufferSize(std::size_t msgSz);
};
}
}
......
......@@ -106,7 +106,7 @@ void ParticleMessenger::Exchange(
numTerminateMessages = 0;
inDataBlockIDsMap.clear();
if (this->NumRanks == 1)
if (this->GetNumRanks() == 1)
return this->SerialExchange(
outData, outBlockIDsMap, numLocalTerm, inData, inDataBlockIDsMap, blockAndWait);
......@@ -124,7 +124,7 @@ void ParticleMessenger::Exchange(
//Do all the sends first.
if (numLocalTerm > 0)
SendAllMsg({ MSG_TERMINATE, static_cast<int>(numLocalTerm) });
this->SendAllMsg({ MSG_TERMINATE, static_cast<int>(numLocalTerm) });
this->SendParticles(sendData);
this->CheckPendingSendRequests();
......@@ -160,7 +160,7 @@ void ParticleMessenger::RegisterMessages(int msgSz, int nParticles, int numBlock
std::size_t messageBuffSz = CalcMessageBufferSize(msgSz + 1);
std::size_t particleBuffSz = CalcParticleBufferSize(nParticles, numBlockIds);
int numRecvs = std::min(64, this->NumRanks - 1);
int numRecvs = std::min(64, this->GetNumRanks() - 1);
this->RegisterTag(ParticleMessenger::MESSAGE_TAG, numRecvs, messageBuffSz);
this->RegisterTag(ParticleMessenger::PARTICLE_TAG, numRecvs, particleBuffSz);
......@@ -174,7 +174,7 @@ void ParticleMessenger::SendMsg(int dst, const std::vector<int>& msg)
vtkmdiy::MemoryBuffer buff;
//Write data.
vtkmdiy::save(buff, this->Rank);
vtkmdiy::save(buff, this->GetRank());
vtkmdiy::save(buff, msg);
this->SendData(dst, ParticleMessenger::MESSAGE_TAG, buff);
}
......@@ -182,8 +182,8 @@ void ParticleMessenger::SendMsg(int dst, const std::vector<int>& msg)
VTKM_CONT
void ParticleMessenger::SendAllMsg(const std::vector<int>& msg)
{
for (int i = 0; i < this->NumRanks; i++)
if (i != this->Rank)
for (int i = 0; i < this->GetNumRanks(); i++)
if (i != this->GetRank())
this->SendMsg(i, msg);
}
......@@ -211,23 +211,23 @@ bool ParticleMessenger::RecvAny(std::vector<MsgCommType>* msgs,
if (!this->RecvData(tags, buffers, blockAndWait))
return false;
for (size_t i = 0; i < buffers.size(); i++)
for (auto& buff : buffers)
{
if (buffers[i].first == ParticleMessenger::MESSAGE_TAG)
if (buff.first == ParticleMessenger::MESSAGE_TAG)
{
int sendRank;
std::vector<int> m;
vtkmdiy::load(buffers[i].second, sendRank);
vtkmdiy::load(buffers[i].second, m);
vtkmdiy::load(buff.second, sendRank);
vtkmdiy::load(buff.second, m);
msgs->push_back(std::make_pair(sendRank, m));
}
else if (buffers[i].first == ParticleMessenger::PARTICLE_TAG)
else if (buff.first == ParticleMessenger::PARTICLE_TAG)
{
int sendRank;
std::vector<ParticleCommType> particles;
vtkmdiy::load(buffers[i].second, sendRank);
vtkmdiy::load(buffers[i].second, particles);
vtkmdiy::load(buff.second, sendRank);
vtkmdiy::load(buff.second, particles);
recvParticles->push_back(std::make_pair(sendRank, particles));
}
}
......
......@@ -114,7 +114,7 @@ VTKM_CONT
template <typename P, template <typename, typename> class Container, typename Allocator>
inline void ParticleMessenger::SendParticles(int dst, const Container<P, Allocator>& c)
{
if (dst == this->Rank)
if (dst == this->GetRank())
{
VTKM_LOG_S(vtkm::cont::LogLevel::Error, "Error. Sending a particle to yourself.");
return;
......@@ -123,7 +123,7 @@ inline void ParticleMessenger::SendParticles(int dst, const Container<P, Allocat
return;
vtkmdiy::MemoryBuffer bb;
vtkmdiy::save(bb, this->Rank);
vtkmdiy::save(bb, this->GetRank());
vtkmdiy::save(bb, c);
this->SendData(dst, ParticleMessenger::PARTICLE_TAG, bb);
}
......@@ -133,9 +133,9 @@ template <typename P, template <typename, typename> class Container, typename Al
inline void ParticleMessenger::SendParticles(
const std::unordered_map<int, Container<P, Allocator>>& m)
{
for (auto mit = m.begin(); mit != m.end(); mit++)
if (!mit->second.empty())
this->SendParticles(mit->first, mit->second);
for (const auto& mit : m)
if (!mit.second.empty())
this->SendParticles(mit.first, mit.second);
}
#endif
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment