Commit 5c57e9f4 authored by Ruonan Wang's avatar Ruonan Wang
Browse files

moved mpi group and stream communicator creation to MpiHandshake

parent 6ec7bebf
......@@ -197,36 +197,9 @@ void SscReader::SyncMpiPattern()
m_MaxFilenameLength, m_RendezvousAppCount,
CommAsMPI(m_Comm));
std::vector<int> allWorkerRanks;
std::vector<int> allWriterRanks;
for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
{
for (int rank : app.second)
{
allWriterRanks.push_back(rank);
allWorkerRanks.push_back(rank);
}
}
for (const auto &app : m_MpiHandshake.GetReaderMap(m_Name))
{
for (int rank : app.second)
{
allWorkerRanks.push_back(rank);
}
}
MPI_Group worldGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, allWriterRanks.size(), allWriterRanks.data(),
&m_MpiAllWritersGroup);
std::sort(allWorkerRanks.begin(), allWorkerRanks.end());
MPI_Group allWorkersGroup;
MPI_Group_incl(worldGroup, allWorkerRanks.size(), allWorkerRanks.data(),
&allWorkersGroup);
MPI_Comm_create_group(MPI_COMM_WORLD, allWorkersGroup, 0, &m_StreamComm);
MPI_Barrier(m_StreamComm);
m_MpiAllWritersGroup = m_MpiHandshake.GetAllWritersGroup(m_Name);
m_StreamComm = m_MpiHandshake.GetStreamComm(m_Name);
}
void SscReader::SyncWritePattern()
......
......@@ -196,37 +196,9 @@ void SscWriter::SyncMpiPattern()
m_MaxFilenameLength, m_RendezvousAppCount,
CommAsMPI(m_Comm));
std::vector<int> allWorkerRanks;
std::vector<int> allReaderRanks;
m_MpiAllReadersGroup = m_MpiHandshake.GetAllReadersGroup(m_Name);
for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
{
for (int rank : app.second)
{
allWorkerRanks.push_back(rank);
}
}
for (const auto &app : m_MpiHandshake.GetReaderMap(m_Name))
{
for (int rank : app.second)
{
allWorkerRanks.push_back(rank);
allReaderRanks.push_back(rank);
}
}
MPI_Group worldGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, allReaderRanks.size(), allReaderRanks.data(),
&m_MpiAllReadersGroup);
std::sort(allWorkerRanks.begin(), allWorkerRanks.end());
MPI_Group allWorkersGroup;
MPI_Group_incl(worldGroup, allWorkerRanks.size(), allWorkerRanks.data(),
&allWorkersGroup);
MPI_Comm_create_group(MPI_COMM_WORLD, allWorkersGroup, 0, &m_StreamComm);
MPI_Barrier(m_StreamComm);
m_StreamComm = m_MpiHandshake.GetStreamComm(m_Name);
}
void SscWriter::SyncWritePattern()
......
......@@ -254,6 +254,74 @@ MpiHandshake::GetReaderMap(const std::string &filename)
return m_ReadersMap[filename];
}
MPI_Comm MpiHandshake::GetStreamComm(const std::string &filename)
{
std::vector<int> allStreamRanks;
for (const auto &app : GetWriterMap(filename))
{
for (int rank : app.second)
{
allStreamRanks.push_back(rank);
}
}
for (const auto &app : GetReaderMap(filename))
{
for (int rank : app.second)
{
allStreamRanks.push_back(rank);
}
}
MPI_Group worldGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
std::sort(allStreamRanks.begin(), allStreamRanks.end());
MPI_Group allWorkersGroup;
MPI_Group_incl(worldGroup, allStreamRanks.size(), allStreamRanks.data(), &allWorkersGroup);
MPI_Comm streamComm;
MPI_Comm_create_group(MPI_COMM_WORLD, allWorkersGroup, 0, &streamComm);
return streamComm;
}
MPI_Group MpiHandshake::GetAllReadersGroup(const std::string &filename)
{
std::vector<int> allReaderRanks;
for (const auto &app : GetReaderMap(filename))
{
for (int rank : app.second)
{
allReaderRanks.push_back(rank);
}
}
MPI_Group worldGroup;
MPI_Group allReadersGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, allReaderRanks.size(), allReaderRanks.data(), &allReadersGroup);
return allReadersGroup;
}
MPI_Group MpiHandshake::GetAllWritersGroup(const std::string &filename)
{
std::vector<int> allWriterRanks;
for (const auto &app : GetWriterMap(filename))
{
for (int rank : app.second)
{
allWriterRanks.push_back(rank);
}
}
MPI_Group worldGroup;
MPI_Group allWritersGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, allWriterRanks.size(), allWriterRanks.data(), &allWritersGroup);
return allWritersGroup;
}
void MpiHandshake::PrintMaps(const int printRank, const std::string &filename)
{
if (m_WorldRank == printRank)
......
......@@ -95,6 +95,33 @@ public:
static const std::map<int, std::vector<int>> &
GetReaderMap(const std::string &filename);
/**
* Get the MPI communicator for the stream, containing all writer ranks and reader ranks that work on the stream filename
*
* @param filename: name of the staging stream
*
* @return the communicator for the stream
*/
static MPI_Comm GetStreamComm(const std::string &filename);
/**
* Get the MPI group for all readers in the stream filename
*
* @param filename: name of the staging stream
*
* @return the MPI group
*/
static MPI_Group GetAllReadersGroup(const std::string &filename);
/**
* Get the MPI group for all writers in the stream filename
*
* @param filename: name of the staging stream
*
* @return the MPI group
*/
static MPI_Group GetAllWritersGroup(const std::string &filename);
private:
static void Test();
static bool Check(const std::string &filename);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment