Unverified Commit b1f1ac08 authored by Wang, Ruonan's avatar Wang, Ruonan Committed by GitHub
Browse files

Merge pull request #2024 from JasonRuonanWang/ssc-multistream

Fix a few multi-stream issues in SSC and add tests for xgc-coupler-gene use case
parents c066f19e 843a788a
......@@ -27,10 +27,6 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
: Engine("SscReader", io, name, mode, std::move(comm))
{
TAU_SCOPED_TIMER_FUNC();
MPI_Comm_rank(MPI_COMM_WORLD, &m_WorldRank);
MPI_Comm_size(MPI_COMM_WORLD, &m_WorldSize);
m_ReaderRank = m_Comm.Rank();
m_ReaderSize = m_Comm.Size();
ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
......@@ -44,9 +40,13 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
m_Buffer.resize(1);
m_GlobalWritePattern.resize(m_WorldSize);
SyncMpiPattern();
SyncWritePattern();
m_ReaderRank = m_Comm.Rank();
m_ReaderSize = m_Comm.Size();
MPI_Comm_rank(m_StreamComm, &m_StreamRank);
MPI_Comm_size(m_StreamComm, &m_StreamSize);
m_GlobalWritePattern.resize(m_StreamSize);
}
SscReader::~SscReader() { TAU_SCOPED_TIMER_FUNC(); }
......@@ -97,7 +97,7 @@ void SscReader::GetTwoSided()
{
requests.emplace_back();
MPI_Irecv(m_Buffer.data() + i.second.first, i.second.second, MPI_CHAR,
i.first, 0, MPI_COMM_WORLD, &requests.back());
i.first, 0, m_StreamComm, &requests.back());
}
MPI_Status statuses[requests.size()];
MPI_Waitall(requests.size(), requests.data(), statuses);
......@@ -111,7 +111,8 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
if (m_InitialStep)
{
m_InitialStep = false;
MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, MPI_COMM_WORLD, &m_MpiWin);
SyncWritePattern();
MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, m_StreamComm, &m_MpiWin);
MPI_Win_start(m_MpiAllWritersGroup, 0, m_MpiWin);
}
else
......@@ -141,7 +142,7 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
if (m_Verbosity >= 5)
{
std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank
std::cout << "SscReader::BeginStep, World Rank " << m_StreamRank
<< ", Reader Rank " << m_ReaderRank << ", Step "
<< m_CurrentStep << std::endl;
}
......@@ -171,11 +172,11 @@ void SscReader::EndStep()
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
MPI_COMM_WORLD, &m_MpiWin);
m_StreamComm, &m_MpiWin);
}
if (m_Verbosity >= 5)
{
std::cout << "SscReader::EndStep, World Rank " << m_WorldRank
std::cout << "SscReader::EndStep, World Rank " << m_StreamRank
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
}
......@@ -188,7 +189,7 @@ void SscReader::SyncMpiPattern()
if (m_Verbosity >= 5)
{
std::cout << "SscReader::SyncMpiPattern, World Rank " << m_WorldRank
std::cout << "SscReader::SyncMpiPattern, World Rank " << m_StreamRank
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
......@@ -196,11 +197,14 @@ void SscReader::SyncMpiPattern()
m_MaxFilenameLength, m_RendezvousAppCount,
CommAsMPI(m_Comm));
m_MpiAllWritersGroup = m_MpiHandshake.GetAllWritersGroup(m_Name);
std::vector<int> allStreamRanks;
for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
{
for (int rank : app.second)
{
m_AllWriterRanks.push_back(rank);
allStreamRanks.push_back(rank);
}
}
......@@ -208,14 +212,17 @@ void SscReader::SyncMpiPattern()
{
for (int rank : app.second)
{
m_AllReaderRanks.push_back(rank);
allStreamRanks.push_back(rank);
}
}
MPI_Group worldGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, m_AllWriterRanks.size(), m_AllWriterRanks.data(),
&m_MpiAllWritersGroup);
std::sort(allStreamRanks.begin(), allStreamRanks.end());
MPI_Group allWorkersGroup;
MPI_Group_incl(worldGroup, allStreamRanks.size(), allStreamRanks.data(),
&allWorkersGroup);
MPI_Comm_create_group(MPI_COMM_WORLD, allWorkersGroup, 0, &m_StreamComm);
}
void SscReader::SyncWritePattern()
......@@ -223,7 +230,7 @@ void SscReader::SyncWritePattern()
TAU_SCOPED_TIMER_FUNC();
if (m_Verbosity >= 5)
{
std::cout << "SscReader::SyncWritePattern, World Rank " << m_WorldRank
std::cout << "SscReader::SyncWritePattern, World Rank " << m_StreamRank
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
......@@ -231,16 +238,16 @@ void SscReader::SyncWritePattern()
size_t localSize = 0;
size_t maxLocalSize;
MPI_Allreduce(&localSize, &maxLocalSize, 1, MPI_UNSIGNED_LONG_LONG, MPI_MAX,
MPI_COMM_WORLD);
m_StreamComm);
std::vector<char> localVec(maxLocalSize, '\0');
std::vector<char> globalVec(maxLocalSize * m_WorldSize);
std::vector<char> globalVec(maxLocalSize * m_StreamSize);
MPI_Allgather(localVec.data(), maxLocalSize, MPI_CHAR, globalVec.data(),
maxLocalSize, MPI_CHAR, MPI_COMM_WORLD);
maxLocalSize, MPI_CHAR, m_StreamComm);
// deserialize global metadata Json
try
{
for (size_t i = 0; i < m_WorldSize; ++i)
for (size_t i = 0; i < m_StreamSize; ++i)
{
if (globalVec[i * maxLocalSize] == '\0')
{
......@@ -287,7 +294,7 @@ void SscReader::SyncWritePattern()
}
}
for (int i = 0; i < m_WorldSize; ++i)
for (int i = 0; i < m_StreamSize; ++i)
{
if (m_GlobalWritePatternJson[i] == nullptr)
{
......@@ -338,7 +345,7 @@ void SscReader::SyncReadPattern()
TAU_SCOPED_TIMER_FUNC();
if (m_Verbosity >= 5)
{
std::cout << "SscReader::SyncReadPattern, World Rank " << m_WorldRank
std::cout << "SscReader::SyncReadPattern, World Rank " << m_StreamRank
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
......@@ -348,18 +355,18 @@ void SscReader::SyncReadPattern()
size_t localSize = localStr.size();
size_t maxLocalSize;
MPI_Allreduce(&localSize, &maxLocalSize, 1, MPI_UNSIGNED_LONG_LONG, MPI_MAX,
MPI_COMM_WORLD);
m_StreamComm);
std::vector<char> localVec(maxLocalSize, '\0');
std::memcpy(localVec.data(), localStr.c_str(), localStr.size());
std::vector<char> globalVec(maxLocalSize * m_WorldSize);
std::vector<char> globalVec(maxLocalSize * m_StreamSize);
MPI_Allgather(localVec.data(), maxLocalSize, MPI_CHAR, globalVec.data(),
maxLocalSize, MPI_CHAR, MPI_COMM_WORLD);
maxLocalSize, MPI_CHAR, m_StreamComm);
// deserialize global metadata Json
nlohmann::json globalJson;
try
{
for (size_t i = 0; i < m_WorldSize; ++i)
for (size_t i = 0; i < m_StreamSize; ++i)
{
if (globalVec[i * maxLocalSize] == '\0')
{
......@@ -461,7 +468,7 @@ void SscReader::DoClose(const int transportIndex)
TAU_SCOPED_TIMER_FUNC();
if (m_Verbosity >= 5)
{
std::cout << "SscReader::DoClose, World Rank " << m_WorldRank
std::cout << "SscReader::DoClose, World Rank " << m_StreamRank
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
MPI_Win_free(&m_MpiWin);
......
......@@ -51,16 +51,15 @@ private:
std::vector<char> m_Buffer;
MPI_Win m_MpiWin;
MPI_Group m_MpiAllWritersGroup;
MPI_Comm m_StreamComm;
std::string m_MpiMode = "TwoSided";
int m_WorldRank;
int m_WorldSize;
int m_StreamRank;
int m_StreamSize;
int m_ReaderRank;
int m_ReaderSize;
helper::MpiHandshake m_MpiHandshake;
std::vector<int> m_AllWriterRanks;
std::vector<int> m_AllReaderRanks;
void SyncMpiPattern();
void SyncWritePattern();
......
......@@ -26,10 +26,6 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,
: Engine("SscWriter", io, name, mode, std::move(comm))
{
TAU_SCOPED_TIMER_FUNC();
MPI_Comm_rank(MPI_COMM_WORLD, &m_WorldRank);
MPI_Comm_size(MPI_COMM_WORLD, &m_WorldSize);
m_WriterRank = m_Comm.Rank();
m_WriterSize = m_Comm.Size();
ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
......@@ -41,10 +37,16 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,
m_MaxStreamsPerApp);
ssc::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs", m_OpenTimeoutSecs);
m_GlobalWritePattern.resize(m_WorldSize);
m_GlobalReadPattern.resize(m_WorldSize);
m_Buffer.resize(1);
SyncMpiPattern();
m_WriterRank = m_Comm.Rank();
m_WriterSize = m_Comm.Size();
MPI_Comm_rank(m_StreamComm, &m_StreamRank);
MPI_Comm_size(m_StreamComm, &m_StreamSize);
m_GlobalWritePattern.resize(m_StreamSize);
m_GlobalReadPattern.resize(m_StreamSize);
}
StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
......@@ -62,7 +64,7 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
if (m_Verbosity >= 5)
{
std::cout << "SscWriter::BeginStep, World Rank " << m_WorldRank
std::cout << "SscWriter::BeginStep, World Rank " << m_StreamRank
<< ", Writer Rank " << m_WriterRank << ", Step "
<< m_CurrentStep << std::endl;
}
......@@ -124,7 +126,7 @@ void SscWriter::PutTwoSided()
{
requests.emplace_back();
MPI_Isend(m_Buffer.data(), m_Buffer.size(), MPI_CHAR, i.first, 0,
MPI_COMM_WORLD, &requests.back());
m_StreamComm, &requests.back());
}
MPI_Status statuses[requests.size()];
MPI_Waitall(requests.size(), requests.data(), statuses);
......@@ -135,7 +137,7 @@ void SscWriter::EndStep()
TAU_SCOPED_TIMER_FUNC();
if (m_Verbosity >= 5)
{
std::cout << "SscWriter::EndStep, World Rank " << m_WorldRank
std::cout << "SscWriter::EndStep, World Rank " << m_StreamRank
<< ", Writer Rank " << m_WriterRank << ", Step "
<< m_CurrentStep << std::endl;
}
......@@ -144,12 +146,12 @@ void SscWriter::EndStep()
{
SyncWritePattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
MPI_COMM_WORLD, &m_MpiWin);
m_StreamComm, &m_MpiWin);
PutOneSidedPostPull();
MPI_Win_free(&m_MpiWin);
SyncReadPattern();
MPI_Win_create(m_Buffer.data(), m_Buffer.size(), 1, MPI_INFO_NULL,
MPI_COMM_WORLD, &m_MpiWin);
m_StreamComm, &m_MpiWin);
}
else
{
......@@ -186,7 +188,7 @@ void SscWriter::SyncMpiPattern()
if (m_Verbosity >= 5)
{
std::cout << "SscWriter::SyncMpiPattern, World Rank " << m_WorldRank
std::cout << "SscWriter::SyncMpiPattern, World Rank " << m_StreamRank
<< ", Writer Rank " << m_WriterRank << std::endl;
}
......@@ -194,11 +196,15 @@ void SscWriter::SyncMpiPattern()
m_MaxFilenameLength, m_RendezvousAppCount,
CommAsMPI(m_Comm));
m_MpiAllReadersGroup = m_MpiHandshake.GetAllReadersGroup(m_Name);
std::vector<int> allStreamRanks;
for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
{
for (int rank : app.second)
{
m_AllWriterRanks.push_back(rank);
allStreamRanks.push_back(rank);
}
}
......@@ -206,14 +212,17 @@ void SscWriter::SyncMpiPattern()
{
for (int rank : app.second)
{
m_AllReaderRanks.push_back(rank);
allStreamRanks.push_back(rank);
}
}
MPI_Group worldGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, m_AllReaderRanks.size(), m_AllReaderRanks.data(),
&m_MpiAllReadersGroup);
std::sort(allStreamRanks.begin(), allStreamRanks.end());
MPI_Group allWorkersGroup;
MPI_Group_incl(worldGroup, allStreamRanks.size(), allStreamRanks.data(),
&allWorkersGroup);
MPI_Comm_create_group(MPI_COMM_WORLD, allWorkersGroup, 0, &m_StreamComm);
}
void SscWriter::SyncWritePattern()
......@@ -221,13 +230,13 @@ void SscWriter::SyncWritePattern()
TAU_SCOPED_TIMER_FUNC();
if (m_Verbosity >= 5)
{
std::cout << "SscWriter::SyncWritePattern, World Rank " << m_WorldRank
std::cout << "SscWriter::SyncWritePattern, World Rank " << m_StreamRank
<< ", Writer Rank " << m_WriterRank << std::endl;
}
// serialize local writer rank variables metadata
nlohmann::json localRankMetaJ;
for (const auto &b : m_GlobalWritePattern[m_WorldRank])
for (const auto &b : m_GlobalWritePattern[m_StreamRank])
{
localRankMetaJ["Variables"].emplace_back();
auto &jref = localRankMetaJ["Variables"].back();
......@@ -278,18 +287,18 @@ void SscWriter::SyncWritePattern()
size_t localSize = localStr.size();
size_t maxLocalSize;
MPI_Allreduce(&localSize, &maxLocalSize, 1, MPI_UNSIGNED_LONG_LONG, MPI_MAX,
MPI_COMM_WORLD);
m_StreamComm);
std::vector<char> localVec(maxLocalSize, '\0');
std::memcpy(localVec.data(), localStr.data(), localStr.size());
std::vector<char> globalVec(maxLocalSize * m_WorldSize, '\0');
std::vector<char> globalVec(maxLocalSize * m_StreamSize, '\0');
MPI_Allgather(localVec.data(), maxLocalSize, MPI_CHAR, globalVec.data(),
maxLocalSize, MPI_CHAR, MPI_COMM_WORLD);
maxLocalSize, MPI_CHAR, m_StreamComm);
// deserialize global metadata Json
nlohmann::json globalJson;
try
{
for (size_t i = 0; i < m_WorldSize; ++i)
for (size_t i = 0; i < m_StreamSize; ++i)
{
if (globalVec[i * maxLocalSize] == '\0')
{
......@@ -319,24 +328,24 @@ void SscWriter::SyncReadPattern()
TAU_SCOPED_TIMER_FUNC();
if (m_Verbosity >= 5)
{
std::cout << "SscWriter::SyncReadPattern, World Rank " << m_WorldRank
std::cout << "SscWriter::SyncReadPattern, World Rank " << m_StreamRank
<< ", Writer Rank " << m_WriterRank << std::endl;
}
size_t localSize = 0;
size_t maxLocalSize;
MPI_Allreduce(&localSize, &maxLocalSize, 1, MPI_UNSIGNED_LONG_LONG, MPI_MAX,
MPI_COMM_WORLD);
m_StreamComm);
std::vector<char> localVec(maxLocalSize, '\0');
std::vector<char> globalVec(maxLocalSize * m_WorldSize);
std::vector<char> globalVec(maxLocalSize * m_StreamSize);
MPI_Allgather(localVec.data(), maxLocalSize, MPI_CHAR, globalVec.data(),
maxLocalSize, MPI_CHAR, MPI_COMM_WORLD);
maxLocalSize, MPI_CHAR, m_StreamComm);
// deserialize global metadata Json
nlohmann::json globalJson;
try
{
for (size_t i = 0; i < m_WorldSize; ++i)
for (size_t i = 0; i < m_StreamSize; ++i)
{
if (globalVec[i * maxLocalSize] == '\0')
{
......@@ -359,7 +368,7 @@ void SscWriter::SyncReadPattern()
ssc::JsonToBlockVecVec(globalJson, m_GlobalReadPattern);
ssc::CalculateOverlap(m_GlobalReadPattern,
m_GlobalWritePattern[m_WorldRank]);
m_GlobalWritePattern[m_StreamRank]);
m_AllSendingReaderRanks = ssc::AllOverlapRanks(m_GlobalReadPattern);
CalculatePosition(m_GlobalWritePattern, m_GlobalReadPattern, m_WriterRank,
m_AllSendingReaderRanks);
......@@ -428,7 +437,7 @@ void SscWriter::DoClose(const int transportIndex)
TAU_SCOPED_TIMER_FUNC();
if (m_Verbosity >= 5)
{
std::cout << "SscWriter::DoClose, World Rank " << m_WorldRank
std::cout << "SscWriter::DoClose, World Rank " << m_StreamRank
<< ", Writer Rank " << m_WriterRank << std::endl;
}
......@@ -440,7 +449,7 @@ void SscWriter::DoClose(const int transportIndex)
for (const auto &i : m_AllSendingReaderRanks)
{
requests.emplace_back();
MPI_Isend(m_Buffer.data(), 1, MPI_CHAR, i.first, 0, MPI_COMM_WORLD,
MPI_Isend(m_Buffer.data(), 1, MPI_CHAR, i.first, 0, m_StreamComm,
&requests.back());
}
MPI_Status statuses[requests.size()];
......
......@@ -52,16 +52,15 @@ private:
std::vector<char> m_Buffer;
MPI_Win m_MpiWin;
MPI_Group m_MpiAllReadersGroup;
MPI_Comm m_StreamComm;
std::string m_MpiMode = "TwoSided";
int m_WorldRank;
int m_WorldSize;
int m_StreamRank;
int m_StreamSize;
int m_WriterRank;
int m_WriterSize;
helper::MpiHandshake m_MpiHandshake;
std::vector<int> m_AllWriterRanks;
std::vector<int> m_AllReaderRanks;
void SyncMpiPattern();
void SyncWritePattern();
......
......@@ -24,7 +24,7 @@ namespace engine
template <class T>
bool SscWriter::HasBlock(const Variable<T> &variable)
{
for (const auto &b : m_GlobalWritePattern[m_WorldRank])
for (const auto &b : m_GlobalWritePattern[m_StreamRank])
{
if (b.name == variable.m_Name and
ssc::AreSameDims(variable.m_Start, b.start) and
......@@ -59,8 +59,8 @@ void SscWriter::PutDeferredCommon(Variable<T> &variable, const T *data)
{
if (not HasBlock(variable))
{
m_GlobalWritePattern[m_WorldRank].emplace_back();
auto &b = m_GlobalWritePattern[m_WorldRank].back();
m_GlobalWritePattern[m_StreamRank].emplace_back();
auto &b = m_GlobalWritePattern[m_StreamRank].back();
b.name = variable.m_Name;
b.type = helper::GetType<T>();
b.shape = variable.m_Shape;
......@@ -75,7 +75,7 @@ void SscWriter::PutDeferredCommon(Variable<T> &variable, const T *data)
variable.SetData(data);
bool found = false;
for (const auto &b : m_GlobalWritePattern[m_WorldRank])
for (const auto &b : m_GlobalWritePattern[m_StreamRank])
{
if (b.name == variable.m_Name and
ssc::AreSameDims(variable.m_Start, b.start) and
......
......@@ -3,6 +3,9 @@
* accompanying file Copyright.txt for details.
*
* adiosMpiHandshake.cpp
*
* Created on: Mar 1, 2020
* Author: Jason Wang
*/
#include "adiosMpiHandshake.h"
......@@ -91,6 +94,8 @@ void MpiHandshake::Test()
bool MpiHandshake::Check(const std::string &filename)
{
int m_Verbosity = 1;
Test();
// check if RendezvousAppCount reached
......@@ -98,6 +103,15 @@ bool MpiHandshake::Check(const std::string &filename)
if (m_WritersMap[filename].size() + m_ReadersMap[filename].size() !=
m_RendezvousAppCounts[filename])
{
if (m_Verbosity >= 10)
{
std::cout << "MpiHandshake Rank " << m_WorldRank << " Stream "
<< filename << ": " << m_WritersMap[filename].size()
<< " writers and " << m_ReadersMap[filename].size()
<< " readers found out of "
<< m_RendezvousAppCounts[filename]
<< " total rendezvous apps" << std::endl;
}
return false;
}
......@@ -205,7 +219,9 @@ void MpiHandshake::Handshake(const std::string &filename, const char mode,
nowTime - startTime);
if (duration.count() > timeoutSeconds)
{
throw(std::runtime_error("Mpi handshake timeout"));
throw(std::runtime_error("Mpi handshake timeout on Rank" +
std::to_string(m_WorldRank) +
" for Stream " + filename));
}
}
......@@ -240,41 +256,75 @@ MpiHandshake::GetReaderMap(const std::string &filename)
return m_ReadersMap[filename];
}
void MpiHandshake::PrintMaps(const int printRank)
MPI_Group MpiHandshake::GetAllReadersGroup(const std::string &filename)
{
std::vector<int> allReaderRanks;
for (const auto &app : GetReaderMap(filename))
{
for (int rank : app.second)
{
allReaderRanks.push_back(rank);
}
}
MPI_Group worldGroup;
MPI_Group allReadersGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, allReaderRanks.size(), allReaderRanks.data(),
&allReadersGroup);
return allReadersGroup;
}
MPI_Group MpiHandshake::GetAllWritersGroup(const std::string &filename)
{
std::vector<int> allWriterRanks;
for (const auto &app : GetWriterMap(filename))
{
for (int rank : app.second)
{
allWriterRanks.push_back(rank);
}
}
MPI_Group worldGroup;
MPI_Group allWritersGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, allWriterRanks.size(), allWriterRanks.data(),
&allWritersGroup);
return allWritersGroup;
}
void MpiHandshake::PrintMaps(const int printRank, const std::string &filename)
{
if (m_WorldRank == printRank)
{
std::cout << "Writers: " << std::endl;
for (const auto &stream : m_WritersMap)
std::cout << "Printing MPI handshake map for Stream " << filename
<< " from Rank " << printRank << std::endl;
std::cout << " Writers: " << std::endl;
for (const auto &app : m_WritersMap[filename])
{
std::cout << " Stream " << stream.first << std::endl;
for (const auto &app : stream.second)
std::cout << " App Master Rank " << app.first << std::endl;
std::cout << " ";
for (const auto &rank : app.second)
{
std::cout << " App Master Rank " << app.first
<< std::endl;
std::cout << " ";
for (const auto &rank : app.second)
{
std::cout << rank << ", ";
}
std::cout << std::endl;
std::cout << rank << ", ";
}
std::cout << std::endl;
}
std::cout << "Readers: " << std::endl;
for (const auto &stream : m_ReadersMap)
std::cout << " Readers: " << std::endl;
for (const auto &app : m_ReadersMap[filename])
{
std::cout << " Stream " << stream.first << std::endl;
for (const auto &app : stream.second)
std::cout << " App Master Rank " << app.first << std::endl;
std::cout << " ";
for (const auto &rank : app.second)
{
std::cout << " App Master Rank " << app.first
<< std::endl;
std::cout << " ";
for (const auto &rank : app.second)