Commit 55fff59e authored by dpugmire's avatar dpugmire
Browse files

Cleanup the messenger classes.

parent 3aa2c752
......@@ -85,10 +85,8 @@ void Messenger::CleanupRequests(int tag)
if (!delKeys.empty())
{
for (const auto& it : delKeys)
for (auto& v : delKeys)
{
RequestTagPair v = it;
char* buff = this->RecvBuffers[v];
MPI_Cancel(&(v.first));
delete[] buff;
......@@ -125,11 +123,11 @@ void Messenger::CheckPendingSendRequests()
std::vector<MPI_Request> req, copy;
std::vector<int> tags;
for (auto it = this->SendBuffers.begin(); it != this->SendBuffers.end(); it++)
for (auto&& it : this->SendBuffers)
{
req.push_back(it->first.first);
copy.push_back(it->first.first);
tags.push_back(it->first.second);
req.push_back(it.first.first);
copy.push_back(it.first.first);
tags.push_back(it.first.second);
}
if (req.empty())
......@@ -228,20 +226,20 @@ void Messenger::SendData(int dst, int tag, const vtkmdiy::MemoryBuffer& buff)
std::vector<char*> bufferList;
//Add headers, break into multiple buffers if needed.
PrepareForSend(tag, buff, bufferList);
this->PrepareForSend(tag, buff, bufferList);
Messenger::Header header;
for (std::size_t i = 0; i < bufferList.size(); i++)
for (const auto& b : bufferList)
{
memcpy(&header, bufferList[i], sizeof(header));
memcpy(&header, b, sizeof(header));
MPI_Request req;
int err = MPI_Isend(bufferList[i], header.packetSz, MPI_BYTE, dst, tag, this->MPIComm, &req);
int err = MPI_Isend(b, header.packetSz, MPI_BYTE, dst, tag, this->MPIComm, &req);
if (err != MPI_SUCCESS)
throw vtkm::cont::ErrorFilterExecution("Error in MPI_Isend inside Messenger::SendData");
//Add it to sendBuffers
RequestTagPair entry(req, tag);
this->SendBuffers[entry] = bufferList[i];
this->SendBuffers[entry] = b;
}
}
......@@ -251,7 +249,7 @@ bool Messenger::RecvData(int tag, std::vector<vtkmdiy::MemoryBuffer>& buffers, b
setTag.insert(tag);
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>> b;
buffers.resize(0);
if (RecvData(setTag, b, blockAndWait))
if (this->RecvData(setTag, b, blockAndWait))
{
buffers.resize(b.size());
for (std::size_t i = 0; i < b.size(); i++)
......@@ -270,32 +268,30 @@ bool Messenger::RecvData(std::set<int>& tags,
//Find all recv of type tag.
std::vector<MPI_Request> req, copy;
std::vector<int> reqTags;
for (auto i = this->RecvBuffers.begin(); i != this->RecvBuffers.end(); i++)
for (const auto& i : this->RecvBuffers)
{
if (tags.find(i->first.second) != tags.end())
if (tags.find(i.first.second) != tags.end())
{
req.push_back(i->first.first);
copy.push_back(i->first.first);
reqTags.push_back(i->first.second);
req.push_back(i.first.first);
copy.push_back(i.first.first);
reqTags.push_back(i.first.second);
}
}
if (req.empty())
return false;
MPI_Status* status = new MPI_Status[req.size()];
int *indices = new int[req.size()], num = 0;
std::vector<MPI_Status> status(req.size());
std::vector<int> indices(req.size());
int num = 0;
if (blockAndWait)
MPI_Waitsome(req.size(), &req[0], &num, indices, status);
MPI_Waitsome(req.size(), req.data(), &num, indices.data(), status.data());
else
MPI_Testsome(req.size(), &req[0], &num, indices, status);
MPI_Testsome(req.size(), req.data(), &num, indices.data(), status.data());
if (num == 0)
{
delete[] status;
delete[] indices;
return false;
}
std::vector<char*> incomingBuffers(num);
for (int i = 0; i < num; i++)
......@@ -303,34 +299,25 @@ bool Messenger::RecvData(std::set<int>& tags,
RequestTagPair entry(copy[indices[i]], reqTags[indices[i]]);
auto it = this->RecvBuffers.find(entry);
if (it == this->RecvBuffers.end())
{
delete[] status;
delete[] indices;
throw vtkm::cont::ErrorFilterExecution("receive buffer not found");
}
incomingBuffers[i] = it->second;
this->RecvBuffers.erase(it);
}
ProcessReceivedBuffers(incomingBuffers, buffers);
this->ProcessReceivedBuffers(incomingBuffers, buffers);
for (int i = 0; i < num; i++)
PostRecv(reqTags[indices[i]]);
delete[] status;
delete[] indices;
return !buffers.empty();
}
void Messenger::ProcessReceivedBuffers(std::vector<char*>& incomingBuffers,
std::vector<std::pair<int, vtkmdiy::MemoryBuffer>>& buffers)
{
for (std::size_t i = 0; i < incomingBuffers.size(); i++)
for (auto& buff : incomingBuffers)
{
char* buff = incomingBuffers[i];
//Grab the header.
Messenger::Header header;
memcpy(&header, buff, sizeof(header));
......
......@@ -124,7 +124,7 @@ void ParticleMessenger::Exchange(
//Do all the sends first.
if (numLocalTerm > 0)
SendAllMsg({ MSG_TERMINATE, static_cast<int>(numLocalTerm) });
this->SendAllMsg({ MSG_TERMINATE, static_cast<int>(numLocalTerm) });
this->SendParticles(sendData);
this->CheckPendingSendRequests();
......@@ -211,23 +211,23 @@ bool ParticleMessenger::RecvAny(std::vector<MsgCommType>* msgs,
if (!this->RecvData(tags, buffers, blockAndWait))
return false;
for (size_t i = 0; i < buffers.size(); i++)
for (auto& buff : buffers)
{
if (buffers[i].first == ParticleMessenger::MESSAGE_TAG)
if (buff.first == ParticleMessenger::MESSAGE_TAG)
{
int sendRank;
std::vector<int> m;
vtkmdiy::load(buffers[i].second, sendRank);
vtkmdiy::load(buffers[i].second, m);
vtkmdiy::load(buff.second, sendRank);
vtkmdiy::load(buff.second, m);
msgs->push_back(std::make_pair(sendRank, m));
}
else if (buffers[i].first == ParticleMessenger::PARTICLE_TAG)
else if (buff.first == ParticleMessenger::PARTICLE_TAG)
{
int sendRank;
std::vector<ParticleCommType> particles;
vtkmdiy::load(buffers[i].second, sendRank);
vtkmdiy::load(buffers[i].second, particles);
vtkmdiy::load(buff.second, sendRank);
vtkmdiy::load(buff.second, particles);
recvParticles->push_back(std::make_pair(sendRank, particles));
}
}
......
......@@ -133,9 +133,9 @@ template <typename P, template <typename, typename> class Container, typename Al
inline void ParticleMessenger::SendParticles(
const std::unordered_map<int, Container<P, Allocator>>& m)
{
for (auto mit = m.begin(); mit != m.end(); mit++)
if (!mit->second.empty())
this->SendParticles(mit->first, mit->second);
for (auto&& mit : m)
if (!mit.second.empty())
this->SendParticles(mit.first, mit.second);
}
#endif
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment