Commit 90906069 authored by Wang, Ruonan

MPI handshake now works for all SSC tests

parent ef25fb76
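
For readers skimming this commit: the hand-rolled rank exchange removed below is replaced by the `helper::MpiHandshake` class. A minimal sketch of the new calling pattern, assembled only from the calls visible in this diff — the wrapper function, the header path, and the meaning of the first `Start` argument (inferred from the `MaxStreamsPerApp` → `RendezvousStreamCount` rename) are assumptions:

```cpp
// Sketch assembled from the call sites in this diff; not the engine code.
#include <mpi.h>
#include <string>
#include <vector>
// #include "adios2/helper/adiosMpiHandshake.h" // header path assumed

void SyncSketch(helper::MpiHandshake &handshake, const std::string &name,
                MPI_Comm comm)
{
    // (streams per rendezvous, max filename length, rendezvous app count,
    //  role 'w'/'r', stream name, MPI communicator)
    handshake.Start(1, 128, 2, 'r', name, comm);
    handshake.Wait(name); // block until all rendezvous apps have arrived

    // The maps are keyed per app; .second holds that app's world ranks.
    std::vector<int> allWriterRanks;
    for (const auto &app : handshake.GetWriterMap(name))
    {
        for (int rank : app.second)
        {
            allWriterRanks.push_back(rank);
        }
    }
    // GetReaderMap(name) is flattened the same way; the engine calls
    // handshake.Finalize() from DoClose().
}
```
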
@@ -34,9 +34,9 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp", m_MaxStreamsPerApp);
ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength", m_MaxFilenameLength);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", m_RendezvousAppCount);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount", m_RendezvousStreamCount);
m_Buffer.resize(1);
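
Both engines now read the handshake configuration from the user-facing parameter set (defaults per the header diff below: `MaxFilenameLength = 128`, `RendezvousStreamCount = 1`, `RendezvousAppCount = 2`). A hedged user-side example of setting them through the public ADIOS2 API; the IO name, stream name, and values are illustrative:

```cpp
#include <adios2.h>
#include <mpi.h>

int main(int argc, char **argv)
{
    MPI_Init(&argc, &argv);
    {
        adios2::ADIOS adios(MPI_COMM_WORLD);
        adios2::IO io = adios.DeclareIO("TestIO");
        io.SetEngine("Ssc");
        // Keys match the GetParameter calls above; values are the defaults.
        io.SetParameter("RendezvousAppCount", "2");
        io.SetParameter("RendezvousStreamCount", "1");
        io.SetParameter("MaxFilenameLength", "128");
        adios2::Engine reader = io.Open("HandshakeTest", adios2::Mode::Read);
        reader.Close();
    }
    MPI_Finalize();
    return 0;
}
```
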
@@ -104,11 +104,6 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
{
TAU_SCOPED_TIMER_FUNC();
if (m_Verbosity >= 5)
{
std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
if (m_InitialStep)
{
@@ -141,6 +136,12 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
}
}
if (m_Verbosity >= 5)
{
std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank
<< ", Reader Rank " << m_ReaderRank << ", Step "<< m_CurrentStep << std::endl;
}
if (m_Buffer[0] == 1)
{
return StepStatus::EndOfStream;
@@ -187,236 +188,21 @@ void SscReader::SyncMpiPattern()
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
m_MpiHandshake.Start(m_MaxStreamsPerApp, m_MaxFilenameLength, m_RendezvousAppCount, 'r', m_Name, CommAsMPI(m_Comm));
m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength, m_RendezvousAppCount, 'r', m_Name, CommAsMPI(m_Comm));
m_MpiHandshake.Wait(m_Name);
m_MpiHandshake.PrintMaps();
if (m_WorldSize == m_ReaderSize)
for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
{
throw(std::runtime_error("no writers are found"));
}
std::vector<int> lrbuf;
std::vector<int> grbuf;
// The m_WorldRank == 0 process gathers every process's local rank
// (m_ReaderRank) and finds all processes with m_ReaderRank == 0
if (m_WorldRank == 0)
{
grbuf.resize(m_WorldSize);
}
MPI_Gather(&m_ReaderRank, 1, MPI_INT, grbuf.data(), 1, MPI_INT, 0,
MPI_COMM_WORLD);
std::vector<int> AppStart; // m_WorldRank of the local rank 0 process
if (m_WorldRank == 0)
{
for (int i = 0; i < m_WorldSize; ++i)
{
if (grbuf[i] == 0)
{
AppStart.push_back(i);
}
}
m_AppSize = AppStart.size();
}
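
This is the core of the (now removed) discovery step: every process contributes its rank within its own application, and world rank 0 records where each application's rank 0 sits in `MPI_COMM_WORLD`. A standalone restatement — function and variable names are local to this sketch:

```cpp
#include <mpi.h>
#include <vector>

// localRank: this process's rank inside its application's communicator.
// Returns the world rank of each app's local rank 0 (world rank 0 only).
std::vector<int> FindAppRoots(int localRank)
{
    int worldRank, worldSize;
    MPI_Comm_rank(MPI_COMM_WORLD, &worldRank);
    MPI_Comm_size(MPI_COMM_WORLD, &worldSize);

    std::vector<int> allLocalRanks;
    if (worldRank == 0)
    {
        allLocalRanks.resize(worldSize); // receive buffer on the root only
    }
    MPI_Gather(&localRank, 1, MPI_INT, allLocalRanks.data(), 1, MPI_INT, 0,
               MPI_COMM_WORLD);

    std::vector<int> appStart;
    if (worldRank == 0)
    {
        for (int i = 0; i < worldSize; ++i)
        {
            if (allLocalRanks[i] == 0) // an application starts here
            {
                appStart.push_back(i);
            }
        }
    }
    return appStart;
}
```
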
// Each local rank 0 process sends its type (0 for writer, 1 for reader) to
// the world rank 0 process. AppStart is then re-ordered to put all writers
// ahead of all the readers.
std::vector<int> AppType; // records the type of each app's local rank 0 process
if (m_ReaderRank == 0) // Send type from each local rank 0 process to the
// world rank 0 process
{
if (m_WorldRank == 0) // App_ID
{
AppType.resize(m_AppSize);
for (int i = 0; i < m_AppSize; ++i)
{
if (i == 0)
{
AppType[i] = 1; // type 1 for reader
}
else
{
int tmp = 1;
MPI_Recv(&tmp, 1, MPI_INT, AppStart[i], 96, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
AppType[i] = tmp;
}
}
}
else
{
int tmp = 1; // type 1 for reader
MPI_Send(&tmp, 1, MPI_INT, 0, 96, MPI_COMM_WORLD); //
}
}
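
The block above is the removed role exchange: each application's rank 0 reports its type over tag 96 (0 for writer, 1 for reader, per the comments), and world rank 0 records its own app's type directly. Restated as a sketch with local names; `appStart` is the root list from the previous sketch:

```cpp
#include <mpi.h>
#include <vector>

// Call on each app's local rank 0. Returns one type per app, writer = 0,
// reader = 1; the result is meaningful on world rank 0 only.
std::vector<int> ExchangeRoles(int worldRank, int myType,
                               const std::vector<int> &appStart)
{
    std::vector<int> appType;
    if (worldRank == 0)
    {
        appType.resize(appStart.size());
        appType[0] = myType; // app 0 is the world root's own application
        for (std::size_t i = 1; i < appStart.size(); ++i)
        {
            MPI_Recv(&appType[i], 1, MPI_INT, appStart[i], 96,
                     MPI_COMM_WORLD, MPI_STATUS_IGNORE);
        }
    }
    else
    {
        MPI_Send(&myType, 1, MPI_INT, 0, 96, MPI_COMM_WORLD);
    }
    return appType;
}
```
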
if (m_WorldRank == 0)
{
std::vector<int> AppWriter;
std::vector<int> AppReader;
for (int i = 0; i < m_AppSize; ++i)
{
if (AppType[i] == 0)
{
AppWriter.push_back(AppStart[i]);
}
else
{
AppReader.push_back(AppStart[i]);
}
}
m_WriterGlobalMpiInfo.resize(AppWriter.size());
m_ReaderGlobalMpiInfo.resize(AppReader.size());
AppStart = AppWriter;
AppStart.insert(AppStart.end(), AppReader.begin(), AppReader.end());
}
// Send m_AppID to each local rank 0 process (m_AppSize is broadcast below)
if (m_ReaderRank == 0) // Send m_AppID to each local rank 0 process
{
if (m_WorldRank == 0) // App_ID
{
for (int i = 0; i < m_AppSize; ++i)
{
MPI_Send(&i, 1, MPI_INT, AppStart[i], 99, MPI_COMM_WORLD); //
}
}
else
{
MPI_Recv(&m_AppID, 1, MPI_INT, 0, 99, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
}
}
m_Comm.Bcast(&m_AppID, 1, 0); // Local rank 0 process broadcasts m_AppID
// within the local communicator.
MPI_Bcast(&m_AppSize, 1, MPI_INT, 0, MPI_COMM_WORLD); // Bcast the m_AppSize
// In each local communicator, the local rank 0 process gathers the world
// ranks of all the other local processes.
if (m_ReaderRank == 0)
{
lrbuf.resize(m_ReaderSize);
}
m_Comm.Gather(&m_WorldRank, 1, lrbuf.data(), 1, 0);
// Send the WorldRank vector of each local communicator to the m_WorldRank
// == 0 process.
int WriterInfoSize = 0;
int ReaderInfoSize = 0;
if (m_ReaderRank == 0)
{
if (m_WorldRank == 0) // App_ID
{
for (int i = 0; i < m_WriterGlobalMpiInfo.size(); ++i)
{
if (i == 0)
{
m_WriterGlobalMpiInfo[i] = lrbuf;
++WriterInfoSize;
}
else
{
int j_writersize;
MPI_Recv(&j_writersize, 1, MPI_INT, AppStart[i], 96,
MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
++WriterInfoSize;
m_WriterGlobalMpiInfo[i].resize(j_writersize);
MPI_Recv(m_WriterGlobalMpiInfo[i].data(), j_writersize,
MPI_INT, AppStart[i], 98, MPI_COMM_WORLD,
MPI_STATUS_IGNORE); //
}
}
for (int i = m_WriterGlobalMpiInfo.size(); i < m_AppSize; ++i)
{
if (i == 0)
{
m_ReaderGlobalMpiInfo[i] = lrbuf;
++ReaderInfoSize;
}
else
{
int j_readersize;
MPI_Recv(&j_readersize, 1, MPI_INT, AppStart[i], 95,
MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
++ReaderInfoSize;
m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
.resize(j_readersize);
MPI_Recv(
m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
.data(),
j_readersize, MPI_INT, AppStart[i], 97, MPI_COMM_WORLD,
MPI_STATUS_IGNORE); //
}
}
}
else
{
MPI_Send(&m_ReaderSize, 1, MPI_INT, 0, 95, MPI_COMM_WORLD);
MPI_Send(lrbuf.data(), lrbuf.size(), MPI_INT, 0, 97,
MPI_COMM_WORLD);
}
}
// Broadcast m_WriterGlobalMpiInfo and m_ReaderGlobalMpiInfo to all the
// processes.
MPI_Bcast(&WriterInfoSize, 1, MPI_INT, 0,
MPI_COMM_WORLD); // Broadcast writerinfo size
MPI_Bcast(&ReaderInfoSize, 1, MPI_INT, 0, MPI_COMM_WORLD);
m_WriterGlobalMpiInfo.resize(WriterInfoSize);
m_ReaderGlobalMpiInfo.resize(ReaderInfoSize);
for (int i = 0; i < WriterInfoSize; ++i)
{
int ilen;
if (m_WorldRank == 0)
{
ilen = m_WriterGlobalMpiInfo[i].size();
}
MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
m_WriterGlobalMpiInfo[i].resize(ilen);
MPI_Bcast(m_WriterGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
MPI_COMM_WORLD); // Broadcast this writer app's ranks
}
for (int i = 0; i < ReaderInfoSize; ++i)
{
int ilen;
if (m_WorldRank == 0)
{
ilen = m_ReaderGlobalMpiInfo[i].size();
}
MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
m_ReaderGlobalMpiInfo[i].resize(ilen);
MPI_Bcast(m_ReaderGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
MPI_COMM_WORLD); // Broadcast this reader app's ranks
}
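
The two loops above (also removed by this commit) broadcast a jagged `std::vector<std::vector<int>>`: because the rows are not one contiguous buffer, each row's length is broadcast before its payload. The same pattern as a standalone helper — a restatement, not the engine's code:

```cpp
#include <mpi.h>
#include <vector>

// Collective: every rank in comm must call this. Rank `root` holds the
// authoritative contents; afterwards all ranks hold a copy.
void BcastJagged(std::vector<std::vector<int>> &rows, int root, MPI_Comm comm)
{
    int rank;
    MPI_Comm_rank(comm, &rank);

    int rowCount = (rank == root) ? static_cast<int>(rows.size()) : 0;
    MPI_Bcast(&rowCount, 1, MPI_INT, root, comm);
    rows.resize(rowCount);

    for (int i = 0; i < rowCount; ++i)
    {
        int len = (rank == root) ? static_cast<int>(rows[i].size()) : 0;
        MPI_Bcast(&len, 1, MPI_INT, root, comm); // row length first
        rows[i].resize(len);
        MPI_Bcast(rows[i].data(), len, MPI_INT, root, comm); // then payload
    }
}
```
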
for (const auto &app : m_WriterGlobalMpiInfo)
{
for (int rank : app)
for (int rank : app.second)
{
m_AllWriterRanks.push_back(rank);
}
}
for (const auto &app : m_ReaderGlobalMpiInfo)
for (const auto &app : m_MpiHandshake.GetReaderMap(m_Name))
{
for (int rank : app)
for (int rank : app.second)
{
m_AllReaderRanks.push_back(rank);
}
@@ -427,10 +213,6 @@ void SscReader::SyncMpiPattern()
MPI_Group_incl(worldGroup, m_AllWriterRanks.size(), m_AllWriterRanks.data(),
&m_MpiAllWritersGroup);
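
The `MPI_Group_incl` call above (kept by this commit) carves the flattened writer ranks out of the world group so that later one-sided synchronization can address all writers as a unit. A self-contained sketch of that construction; names are local to the sketch, while the engine stores the result in `m_MpiAllWritersGroup`:

```cpp
#include <mpi.h>
#include <vector>

// Build an MPI group containing only the given world ranks.
// Caller releases the returned group with MPI_Group_free.
MPI_Group MakeRankGroup(const std::vector<int> &ranks)
{
    MPI_Group worldGroup, subGroup;
    MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
    MPI_Group_incl(worldGroup, static_cast<int>(ranks.size()), ranks.data(),
                   &subGroup);
    MPI_Group_free(&worldGroup); // subGroup stays valid on its own
    return subGroup;
}
```
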
if (m_Verbosity >= 10 and m_WorldRank == 0)
{
ssc::PrintMpiInfo(m_WriterGlobalMpiInfo, m_ReaderGlobalMpiInfo);
}
}
void SscReader::SyncWritePattern()
@@ -680,6 +462,7 @@ void SscReader::DoClose(const int transportIndex)
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
MPI_Win_free(&m_MpiWin);
m_MpiHandshake.Finalize();
}
} // end namespace engine
......
@@ -59,11 +59,6 @@ private:
int m_ReaderSize;
helper::MpiHandshake m_MpiHandshake;
int m_AppID = 0;
int m_AppSize = 0;
std::vector<std::vector<int>> m_WriterGlobalMpiInfo;
std::vector<std::vector<int>> m_ReaderGlobalMpiInfo;
std::vector<int> m_AllWriterRanks;
std::vector<int> m_AllReaderRanks;
@@ -106,8 +101,8 @@ private:
ssc::RankPosMap &allOverlapRanks);
int m_Verbosity = 0;
int m_MaxStreamsPerApp = 4;
int m_MaxFilenameLength = 128;
int m_RendezvousStreamCount = 1;
int m_RendezvousAppCount = 2;
};
......
@@ -34,9 +34,9 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,
ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp", m_MaxStreamsPerApp);
ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength", m_MaxFilenameLength);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", m_RendezvousAppCount);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount", m_RendezvousStreamCount);
m_GlobalWritePattern.resize(m_WorldSize);
m_GlobalReadPattern.resize(m_WorldSize);
@@ -48,12 +48,6 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
{
TAU_SCOPED_TIMER_FUNC();
if (m_Verbosity >= 5)
{
std::cout << "SscWriter::BeginStep, World Rank " << m_WorldRank
<< ", Writer Rank " << m_WriterRank << std::endl;
}
if (m_InitialStep)
{
m_InitialStep = false;
@@ -62,6 +56,13 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
{
++m_CurrentStep;
}
if (m_Verbosity >= 5)
{
std::cout << "SscWriter::BeginStep, World Rank " << m_WorldRank
<< ", Writer Rank " << m_WriterRank << ", Step "<< m_CurrentStep << std::endl;
}
return StepStatus::OK;
}
@@ -179,239 +180,27 @@ void SscWriter::SyncMpiPattern()
{
TAU_SCOPED_TIMER_FUNC();
m_MpiHandshake.Start(m_MaxStreamsPerApp, m_MaxFilenameLength, m_RendezvousAppCount, 'w', m_Name, CommAsMPI(m_Comm));
m_MpiHandshake.Wait(m_Name);
m_MpiHandshake.PrintMaps();
if (m_Verbosity >= 5)
{
std::cout << "SscWriter::SyncMpiPattern, World Rank " << m_WorldRank
<< ", Writer Rank " << m_WriterRank << std::endl;
}
if (m_WorldSize == m_WriterSize)
{
throw(std::runtime_error("no readers are found"));
}
std::vector<int> lrbuf;
std::vector<int> grbuf;
// The m_WorldRank == 0 process gathers every process's local rank
// (m_WriterRank) and finds all processes with m_WriterRank == 0
if (m_WorldRank == 0)
{
grbuf.resize(m_WorldSize);
}
MPI_Gather(&m_WriterRank, 1, MPI_INT, grbuf.data(), 1, MPI_INT, 0,
MPI_COMM_WORLD);
std::vector<int> AppStart; // m_WorldRank of the local rank 0 process
if (m_WorldRank == 0)
{
for (int i = 0; i < m_WorldSize; ++i)
{
if (grbuf[i] == 0)
{
AppStart.push_back(i);
}
}
m_AppSize = AppStart.size();
}
// Each local rank 0 process sends its type (0 for writer, 1 for reader) to
// the world rank 0 process. AppStart is then re-ordered to put all writers
// ahead of all the readers.
std::vector<int> AppType; // records the type of each app's local rank 0 process
if (m_WriterRank == 0) // Send type from each local rank 0 process to the
// world rank 0 process
{
if (m_WorldRank == 0) // App_ID
{
AppType.resize(m_AppSize);
for (int i = 0; i < m_AppSize; ++i)
{
if (i == 0)
{
AppType[i] = 0;
}
else
{
int tmp = 1;
MPI_Recv(&tmp, 1, MPI_INT, AppStart[i], 96, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
AppType[i] = tmp;
}
}
}
else
{
int tmp = 0; // type 0 for writer
MPI_Send(&tmp, 1, MPI_INT, 0, 96, MPI_COMM_WORLD); //
}
}
if (m_WorldRank == 0)
{
std::vector<int> AppWriter;
std::vector<int> AppReader;
for (int i = 0; i < m_AppSize; ++i)
{
if (AppType[i] == 0)
{
AppWriter.push_back(AppStart[i]);
}
else
{
AppReader.push_back(AppStart[i]);
}
}
m_WriterGlobalMpiInfo.resize(AppWriter.size());
m_ReaderGlobalMpiInfo.resize(AppReader.size());
AppStart = AppWriter;
AppStart.insert(AppStart.end(), AppReader.begin(), AppReader.end());
}
// Send m_AppID to each local rank 0 process (m_AppSize is broadcast below)
if (m_WriterRank == 0) // Send m_AppID to each local rank 0 process
{
if (m_WorldRank == 0) // App_ID
{
for (int i = 0; i < m_AppSize; ++i)
{
MPI_Send(&i, 1, MPI_INT, AppStart[i], 99, MPI_COMM_WORLD); //
}
}
else
{
MPI_Recv(&m_AppID, 1, MPI_INT, 0, 99, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
}
}
m_Comm.Bcast(&m_AppID, 1, 0); // Local rank 0 process broadcasts m_AppID
// within the local communicator.
MPI_Bcast(&m_AppSize, 1, MPI_INT, 0, MPI_COMM_WORLD); // Bcast the m_AppSize
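
The two broadcasts above work at different scopes: the app ID travels inside each application's communicator (through the `helper::Comm` wrapper), while the app count goes world-wide. Restated in plain MPI — `appComm` standing in for `m_Comm` is an assumption:

```cpp
#include <mpi.h>

// appID is already correct on each app's local rank 0 (tag-99 exchange);
// appCount is known on world rank 0. Afterwards every process has both.
void DistributeIds(MPI_Comm appComm, int &appID, int &appCount)
{
    MPI_Bcast(&appID, 1, MPI_INT, 0, appComm);           // within one app
    MPI_Bcast(&appCount, 1, MPI_INT, 0, MPI_COMM_WORLD); // across all apps
}
```
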
// In each local communicator, the local rank 0 process gathers the world
// ranks of all the other local processes.
if (m_WriterRank == 0)
{
lrbuf.resize(m_WriterSize);
}
m_Comm.Gather(&m_WorldRank, 1, lrbuf.data(), 1, 0);
// Send the WorldRank vector of each local communicator to the m_WorldRank
// == 0 process.
int WriterInfoSize = 0;
int ReaderInfoSize = 0;
if (m_WriterRank == 0)
{
if (m_WorldRank == 0) // App_ID
{
for (int i = 0; i < m_WriterGlobalMpiInfo.size(); ++i)
{
if (i == 0)
{
m_WriterGlobalMpiInfo[i] = lrbuf;
++WriterInfoSize;
}
else
{
int j_writersize;
MPI_Recv(&j_writersize, 1, MPI_INT, AppStart[i], 96,
MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
++WriterInfoSize;
m_WriterGlobalMpiInfo[i].resize(j_writersize);
MPI_Recv(m_WriterGlobalMpiInfo[i].data(), j_writersize,
MPI_INT, AppStart[i], 98, MPI_COMM_WORLD,
MPI_STATUS_IGNORE); //
}
}
for (int i = m_WriterGlobalMpiInfo.size(); i < m_AppSize; ++i)
{
if (i == 0)
{
m_ReaderGlobalMpiInfo[i] = lrbuf;
++ReaderInfoSize;
}
else
{
int j_readersize;
MPI_Recv(&j_readersize, 1, MPI_INT, AppStart[i], 95,
MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
++ReaderInfoSize;
m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
.resize(j_readersize);
MPI_Recv(
m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
.data(),
j_readersize, MPI_INT, AppStart[i], 97, MPI_COMM_WORLD,
MPI_STATUS_IGNORE); //
}
}
}
else
{
MPI_Send(&m_WriterSize, 1, MPI_INT, 0, 96, MPI_COMM_WORLD);
MPI_Send(lrbuf.data(), lrbuf.size(), MPI_INT, 0, 98,
MPI_COMM_WORLD);
}
}
// Broadcast m_WriterGlobalMpiInfo and m_ReaderGlobalMpiInfo to all the
// processes.
MPI_Bcast(&WriterInfoSize, 1, MPI_INT, 0, MPI_COMM_WORLD);
MPI_Bcast(&ReaderInfoSize, 1, MPI_INT, 0, MPI_COMM_WORLD);
m_WriterGlobalMpiInfo.resize(WriterInfoSize);
m_ReaderGlobalMpiInfo.resize(ReaderInfoSize);
for (int i = 0; i < WriterInfoSize; ++i)
{
int ilen;
if (m_WorldRank == 0)
{
ilen = m_WriterGlobalMpiInfo[i].size();
}
MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
m_WriterGlobalMpiInfo[i].resize(ilen);
MPI_Bcast(m_WriterGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
MPI_COMM_WORLD);
}
for (int i = 0; i < ReaderInfoSize; ++i)
{
int ilen;
if (m_WorldRank == 0)
{
ilen = m_ReaderGlobalMpiInfo[i].size();
}
MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
m_ReaderGlobalMpiInfo[i].resize(ilen);
MPI_Bcast(m_ReaderGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
MPI_COMM_WORLD);
}
m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength, m_RendezvousAppCount, 'w', m_Name, CommAsMPI(m_Comm));
m_MpiHandshake.Wait(m_Name);
m_MpiHandshake.PrintMaps();
for (const auto &app : m_WriterGlobalMpiInfo)
for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
{
for (int rank : app)
for (int rank : app.second)
{
m_AllWriterRanks.push_back(rank);
}
}
for (const auto &app : m_ReaderGlobalMpiInfo)
for (const auto &app : m_MpiHandshake.GetReaderMap(m_Name))
{
for (int rank : app)
for (int rank : app.second)
{
m_AllReaderRanks.push_back(rank);
}
@@ -419,13 +208,7 @@ void SscWriter::SyncMpiPattern()
MPI_Group worldGroup;