Unverified commit ddaeeb07, authored by Wang, Ruonan; committed by GitHub
Browse files

Merge pull request #2019 from JasonRuonanWang/ssc-multistream

Add MpiHandshake to enable XGC-COUPLER-GENE communication pattern
parents 4e57dba2 70baa4d5
......@@ -231,6 +231,7 @@ if(ADIOS2_HAVE_MPI)
target_sources(adios2 PRIVATE
core/IOMPI.cpp
helper/adiosCommMPI.h helper/adiosCommMPI.cpp
helper/adiosMpiHandshake.h helper/adiosMpiHandshake.cpp
engine/insitumpi/InSituMPIWriter.cpp engine/insitumpi/InSituMPIWriter.tcc
engine/insitumpi/InSituMPIReader.cpp engine/insitumpi/InSituMPIReader.tcc
engine/insitumpi/InSituMPIFunctions.cpp engine/insitumpi/InSituMPISchedules.cpp
......
......@@ -303,6 +303,45 @@ void PrintMpiInfo(const MpiInfo &writersInfo, const MpiInfo &readersInfo)
std::cout << std::endl;
}
bool GetParameter(const Params &params, const std::string &key, int &value)
{
    // Looks up `key` in `params` and parses its value as a base-10 integer.
    // On success writes the parsed number into `value` and returns true.
    // Returns false — leaving `value` untouched — when the key is absent,
    // or when the value is not a (complete) integer; in the latter case an
    // error message is also printed to stderr.
    auto it = params.find(key);
    if (it == params.end())
    {
        return false;
    }
    try
    {
        std::size_t pos = 0;
        const int parsed = std::stoi(it->second, &pos);
        // std::stoi("12abc") would silently succeed and return 12; reject
        // any trailing characters so the "integer numbers only" contract
        // promised by the error message below actually holds.
        if (pos != it->second.size())
        {
            std::string error =
                "Engine parameter " + key + " can only be integer numbers";
            std::cerr << error << std::endl;
            return false;
        }
        value = parsed;
    }
    catch (...)
    {
        // std::stoi throws std::invalid_argument (no digits) or
        // std::out_of_range (doesn't fit in int).
        std::string error =
            "Engine parameter " + key + " can only be integer numbers";
        std::cerr << error << std::endl;
        return false;
    }
    return true;
}
bool GetParameter(const Params &params, const std::string &key,
                  std::string &value)
{
    // Copies the value associated with `key` into `value`.
    // Returns whether `key` was present in `params`; on a miss,
    // `value` is left unmodified.
    const auto match = params.find(key);
    if (match == params.end())
    {
        return false;
    }
    value = match->second;
    return true;
}
} // end namespace ssc
} // end namespace engine
} // end namespace core
......
......@@ -66,6 +66,10 @@ void JsonToBlockVecVec(const std::string &input, BlockVecVec &output);
bool AreSameDims(const Dims &a, const Dims &b);
bool GetParameter(const Params &params, const std::string &key, int &value);
bool GetParameter(const Params &params, const std::string &key,
std::string &value);
} // end namespace ssc
} // end namespace engine
} // end namespace core
......
......@@ -10,6 +10,7 @@
#include "SscReader.tcc"
#include "adios2/helper/adiosComm.h"
#include "adios2/helper/adiosCommMPI.h"
#include "adios2/helper/adiosFunctions.h"
#include "adios2/helper/adiosJSONcomplex.h"
#include "nlohmann/json.hpp"
......@@ -31,24 +32,15 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
m_ReaderRank = m_Comm.Rank();
m_ReaderSize = m_Comm.Size();
auto it = m_IO.m_Parameters.find("MpiMode");
if (it != m_IO.m_Parameters.end())
{
m_MpiMode = it->second;
}
it = m_IO.m_Parameters.find("Verbose");
if (it != m_IO.m_Parameters.end())
{
try
{
m_Verbosity = std::stoi(it->second);
}
catch (...)
{
std::cerr << "Engine parameter Verbose can only be integer numbers"
<< std::endl;
}
}
ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength",
m_MaxFilenameLength);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount",
m_RendezvousAppCount);
ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp",
m_MaxStreamsPerApp);
ssc::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs", m_OpenTimeoutSecs);
m_Buffer.resize(1);
......@@ -116,12 +108,6 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
{
TAU_SCOPED_TIMER_FUNC();
if (m_Verbosity >= 5)
{
std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
if (m_InitialStep)
{
m_InitialStep = false;
......@@ -153,6 +139,13 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
}
}
if (m_Verbosity >= 5)
{
std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank
<< ", Reader Rank " << m_ReaderRank << ", Step "
<< m_CurrentStep << std::endl;
}
if (m_Buffer[0] == 1)
{
return StepStatus::EndOfStream;
......@@ -191,239 +184,29 @@ void SscReader::EndStep()
void SscReader::SyncMpiPattern()
{
TAU_SCOPED_TIMER_FUNC();
if (m_Verbosity >= 5)
{
std::cout << "SscReader::SyncMpiPattern, World Rank " << m_WorldRank
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
TAU_SCOPED_TIMER_FUNC();
if (m_WorldSize == m_ReaderSize)
{
throw(std::runtime_error("no writers are found"));
}
std::vector<int> lrbuf;
std::vector<int> grbuf;
m_MpiHandshake.Handshake(m_Name, 'r', m_OpenTimeoutSecs, m_MaxStreamsPerApp,
m_MaxFilenameLength, m_RendezvousAppCount,
CommAsMPI(m_Comm));
// Process m_WorldRank == 0 to gather all the local rank m_WriterRank, and
// find out all the m_WriterRank == 0
if (m_WorldRank == 0)
for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
{
grbuf.resize(m_WorldSize);
}
MPI_Gather(&m_ReaderRank, 1, MPI_INT, grbuf.data(), 1, MPI_INT, 0,
MPI_COMM_WORLD);
std::vector<int> AppStart; // m_WorldRank of the local rank 0 process
if (m_WorldRank == 0)
{
for (int i = 0; i < m_WorldSize; ++i)
{
if (grbuf[i] == 0)
{
AppStart.push_back(i);
}
}
m_AppSize = AppStart.size();
}
// Each local rank 0 process send their type (0 for writer, 1 for reader) to
// the world rank 0 process The AppStart are re-ordered to put all writers
// ahead of all the readers.
std::vector<int>
AppType; // Vector to record the type of the local rank 0 process
if (m_ReaderRank == 0) // Send type from each local rank 0 process to the
// world rank 0 process
{
if (m_WorldRank == 0) // App_ID
{
AppType.resize(m_AppSize);
for (int i = 0; i < m_AppSize; ++i)
{
if (i == 0)
{
AppType[i] = 1;
;
}
else
{
int tmp = 1;
MPI_Recv(&tmp, 1, MPI_INT, AppStart[i], 96, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
AppType[i] = tmp;
}
}
}
else
{
int tmp = 1; // type 1 for reader
MPI_Send(&tmp, 1, MPI_INT, 0, 96, MPI_COMM_WORLD); //
}
}
if (m_WorldRank == 0)
{
std::vector<int> AppWriter;
std::vector<int> AppReader;
for (int i = 0; i < m_AppSize; ++i)
{
if (AppType[i] == 0)
{
AppWriter.push_back(AppStart[i]);
}
else
{
AppReader.push_back(AppStart[i]);
}
}
m_WriterGlobalMpiInfo.resize(AppWriter.size());
m_ReaderGlobalMpiInfo.resize(AppReader.size());
AppStart = AppWriter;
AppStart.insert(AppStart.end(), AppReader.begin(), AppReader.end());
}
// Send the m_AppSize and m_AppID to each local rank 0 process
if (m_ReaderRank == 0) // Send m_AppID to each local rank 0 process
{
if (m_WorldRank == 0) // App_ID
{
for (int i = 0; i < m_AppSize; ++i)
{
MPI_Send(&i, 1, MPI_INT, AppStart[i], 99, MPI_COMM_WORLD); //
}
}
else
{
MPI_Recv(&m_AppID, 1, MPI_INT, 0, 99, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
}
}
m_Comm.Bcast(&m_AppID, sizeof(int),
0); // Local rank 0 process broadcast the m_AppID within the
// local communicator.
MPI_Bcast(&m_AppSize, 1, MPI_INT, 0, MPI_COMM_WORLD); // Bcast the m_AppSize
// In each local communicator, each local rank 0 process gathers the world
// rank of all the rest local processes.
if (m_ReaderRank == 0)
{
lrbuf.resize(m_ReaderSize);
}
m_Comm.Gather(&m_WorldRank, 1, lrbuf.data(), 1, 0);
// Send the WorldRank vector of each local communicator to the m_WorldRank
// == 0 process.
int WriterInfoSize = 0;
int ReaderInfoSize = 0;
if (m_ReaderRank == 0)
{
if (m_WorldRank == 0) // App_ID
{
for (int i = 0; i < m_WriterGlobalMpiInfo.size(); ++i)
{
if (i == 0)
{
m_WriterGlobalMpiInfo[i] = lrbuf;
++WriterInfoSize;
}
else
{
int j_writersize;
MPI_Recv(&j_writersize, 1, MPI_INT, AppStart[i], 96,
MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
++WriterInfoSize;
m_WriterGlobalMpiInfo[i].resize(j_writersize);
MPI_Recv(m_WriterGlobalMpiInfo[i].data(), j_writersize,
MPI_INT, AppStart[i], 98, MPI_COMM_WORLD,
MPI_STATUS_IGNORE); //
}
}
for (int i = m_WriterGlobalMpiInfo.size(); i < m_AppSize; ++i)
{
if (i == 0)
{
m_ReaderGlobalMpiInfo[i] = lrbuf;
++ReaderInfoSize;
}
else
{
int j_readersize;
MPI_Recv(&j_readersize, 1, MPI_INT, AppStart[i], 95,
MPI_COMM_WORLD, MPI_STATUS_IGNORE); //
++ReaderInfoSize;
m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
.resize(j_readersize);
MPI_Recv(
m_ReaderGlobalMpiInfo[i - m_WriterGlobalMpiInfo.size()]
.data(),
j_readersize, MPI_INT, AppStart[i], 97, MPI_COMM_WORLD,
MPI_STATUS_IGNORE); //
}
}
}
else
{
MPI_Send(&m_ReaderSize, 1, MPI_INT, 0, 95, MPI_COMM_WORLD);
MPI_Send(lrbuf.data(), lrbuf.size(), MPI_INT, 0, 97,
MPI_COMM_WORLD);
}
}
// Broadcast m_WriterGlobalMpiInfo and m_ReaderGlobalMpiInfo to all the
// processes.
MPI_Bcast(&WriterInfoSize, 1, MPI_INT, 0,
MPI_COMM_WORLD); // Broadcast writerinfo size
MPI_Bcast(&ReaderInfoSize, 1, MPI_INT, 0, MPI_COMM_WORLD);
m_WriterGlobalMpiInfo.resize(WriterInfoSize);
m_ReaderGlobalMpiInfo.resize(ReaderInfoSize);
for (int i = 0; i < WriterInfoSize; ++i)
{
int ilen;
if (m_WorldRank == 0)
{
ilen = m_WriterGlobalMpiInfo[i].size();
}
MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
m_WriterGlobalMpiInfo[i].resize(ilen);
MPI_Bcast(m_WriterGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
MPI_COMM_WORLD); // Broadcast readerinfo size
}
for (int i = 0; i < ReaderInfoSize; ++i)
{
int ilen;
if (m_WorldRank == 0)
{
ilen = m_ReaderGlobalMpiInfo[i].size();
}
MPI_Bcast(&ilen, 1, MPI_INT, 0, MPI_COMM_WORLD);
m_ReaderGlobalMpiInfo[i].resize(ilen);
MPI_Bcast(m_ReaderGlobalMpiInfo[i].data(), ilen, MPI_INT, 0,
MPI_COMM_WORLD); // Broadcast readerinfo size
}
for (const auto &app : m_WriterGlobalMpiInfo)
{
for (int rank : app)
for (int rank : app.second)
{
m_AllWriterRanks.push_back(rank);
}
}
for (const auto &app : m_ReaderGlobalMpiInfo)
for (const auto &app : m_MpiHandshake.GetReaderMap(m_Name))
{
for (int rank : app)
for (int rank : app.second)
{
m_AllReaderRanks.push_back(rank);
}
......@@ -433,11 +216,6 @@ void SscReader::SyncMpiPattern()
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, m_AllWriterRanks.size(), m_AllWriterRanks.data(),
&m_MpiAllWritersGroup);
if (m_Verbosity >= 10 and m_WorldRank == 0)
{
ssc::PrintMpiInfo(m_WriterGlobalMpiInfo, m_ReaderGlobalMpiInfo);
}
}
void SscReader::SyncWritePattern()
......
......@@ -13,6 +13,7 @@
#include "SscHelper.h"
#include "adios2/core/Engine.h"
#include "adios2/helper/adiosMpiHandshake.h"
#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"
#include <mpi.h>
#include <queue>
......@@ -56,10 +57,8 @@ private:
int m_WorldSize;
int m_ReaderRank;
int m_ReaderSize;
int m_AppID = 0;
int m_AppSize = 0;
std::vector<std::vector<int>> m_WriterGlobalMpiInfo;
std::vector<std::vector<int>> m_ReaderGlobalMpiInfo;
helper::MpiHandshake m_MpiHandshake;
std::vector<int> m_AllWriterRanks;
std::vector<int> m_AllReaderRanks;
......@@ -102,6 +101,10 @@ private:
ssc::RankPosMap &allOverlapRanks);
int m_Verbosity = 0;
int m_MaxFilenameLength = 128;
int m_MaxStreamsPerApp = 1;
int m_RendezvousAppCount = 2;
int m_OpenTimeoutSecs = 10;
};
} // end namespace engine
......
......@@ -10,11 +10,10 @@
#include "SscWriter.tcc"
#include "adios2/helper/adiosComm.h"
#include "adios2/helper/adiosCommMPI.h"
#include "adios2/helper/adiosJSONcomplex.h"
#include "nlohmann/json.hpp"
#include "adios2/helper/adiosCommMPI.h"
namespace adios2
{
namespace core
......@@ -32,24 +31,15 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,
m_WriterRank = m_Comm.Rank();
m_WriterSize = m_Comm.Size();
auto it = m_IO.m_Parameters.find("MpiMode");
if (it != m_IO.m_Parameters.end())
{
m_MpiMode = it->second;
}
it = m_IO.m_Parameters.find("Verbose");
if (it != m_IO.m_Parameters.end())
{
try
{
m_Verbosity = std::stoi(it->second);
}
catch (...)
{
std::cerr << "Engine parameter Verbose can only be integer numbers"
<< std::endl;
}
}
ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength",
m_MaxFilenameLength);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount",
m_RendezvousAppCount);
ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp",
m_MaxStreamsPerApp);
ssc::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs", m_OpenTimeoutSecs);
m_GlobalWritePattern.resize(m_WorldSize);
m_GlobalReadPattern.resize(m_WorldSize);
......@@ -61,12 +51,6 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
{
TAU_SCOPED_TIMER_FUNC();
if (m_Verbosity >= 5)
{
std::cout << "SscWriter::BeginStep, World Rank " << m_WorldRank
<< ", Writer Rank " << m_WriterRank << std::endl;
}
if (m_InitialStep)
{
m_InitialStep = false;
......@@ -75,6 +59,14 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
{
++m_CurrentStep;
}
if (m_Verbosity >= 5)
{
std::cout << "SscWriter::BeginStep, World Rank " << m_WorldRank
<< ", Writer Rank " << m_WriterRank << ", Step "
<< m_CurrentStep << std::endl;
}
return StepStatus::OK;
}
......@@ -198,229 +190,21 @@ void SscWriter::SyncMpiPattern()
<< ", Writer Rank " << m_WriterRank << std::endl;
}
if (m_WorldSize == m_WriterSize)
{
throw(std::runtime_error("no readers are found"));
}
std::vector<int> lrbuf;
std::vector<int> grbuf;
// Process m_WorldRank == 0 to gather all the local rank m_WriterRank, and
// find out all the m_WriterRank == 0
if (m_WorldRank == 0)
{
grbuf.resize(m_WorldSize);
}
MPI_Gather(&m_WriterRank, 1, MPI_INT, grbuf.data(), 1, MPI_INT, 0,
MPI_COMM_WORLD);
m_MpiHandshake.Handshake(m_Name, 'w', m_OpenTimeoutSecs, m_MaxStreamsPerApp,
m_MaxFilenameLength, m_RendezvousAppCount,
CommAsMPI(m_Comm));
std::vector<int> AppStart; // m_WorldRank of the local rank 0 process
if (m_WorldRank == 0)
for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
{
for (int i = 0; i < m_WorldSize; ++i)
{
if (grbuf[i] == 0)
{
AppStart.push_back(i);
}
}
m_AppSize = AppStart.size();
}
// Each local rank 0 process send their type (0 for writer, 1 for reader) to
// the world rank 0 process The AppStart are re-ordered to put all writers
// ahead of all the readers.
std::vector<int>
AppType; // Vector to record the type of the local rank 0 process
if (m_WriterRank == 0) // Send type from each local rank 0 process to the
// world rank 0 process
{
if (m_WorldRank == 0) // App_ID
{
AppType.resize(m_AppSize);
for (int i = 0; i < m_AppSize; ++i)
{
if (i == 0)
{
AppType[i] = 0;
}
else
{
int tmp = 1;
MPI_Recv(&tmp, 1, MPI_INT, AppStart[i], 96, MPI_COMM_WORLD,
MPI_STATUS_IGNORE);
AppType[i] = tmp;
}
}
}
else
{
int tmp = 0; // type 0 for writer
MPI_Send(&tmp, 1, MPI_INT, 0, 96, MPI_COMM_WORLD); //
}
}
if (m_WorldRank == 0)
{
std::vector<int> AppWriter;
std::vector<int> AppReader;
for (int i = 0; i < m_AppSize; ++i)
{
if (AppType[i] == 0)
{
AppWriter.push_back(AppStart[i]);
}
else
{
AppReader.push_back(AppStart[i]);
}
}
m_WriterGlobalMpiInfo.resize(AppWriter.size());
m_ReaderGlobalMpiInfo.resize(AppReader.size());
AppStart = AppWriter;
AppStart.insert(AppStart.end(), AppReader.begin(), AppReader.end());
}
// Send the m_AppSize and m_AppID to each local rank 0 process
if (m_WriterRank == 0) // Send m_AppID to each local rank 0 process
{
if (m_WorldRank == 0) // App_ID
{
for (int i = 0; i < m_AppSize; ++i)
{
MPI_Send(&i, 1, MPI_INT, AppStart[i], 99, MPI_COMM_WORLD); //
}
}
else
{