Commit b8fc7346 authored by Wang, Ruonan's avatar Wang, Ruonan
Browse files

refined MpiHandshake and added timeout

parent 12ae522d
......@@ -38,8 +38,9 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
m_MaxFilenameLength);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount",
m_RendezvousAppCount);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount",
m_RendezvousStreamCount);
ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp",
m_MaxStreamsPerApp);
ssc::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs", m_OpenTimeoutSecs);
m_Buffer.resize(1);
......@@ -191,10 +192,9 @@ void SscReader::SyncMpiPattern()
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength,
m_RendezvousAppCount, 'r', m_Name, CommAsMPI(m_Comm));
m_MpiHandshake.Wait(m_Name);
m_MpiHandshake.PrintMaps();
m_MpiHandshake.Handshake(m_Name, 'r', m_OpenTimeoutSecs, m_MaxStreamsPerApp,
m_MaxFilenameLength, m_RendezvousAppCount,
CommAsMPI(m_Comm));
for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
{
......@@ -212,8 +212,6 @@ void SscReader::SyncMpiPattern()
}
}
m_MpiHandshake.Finalize();
MPI_Group worldGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, m_AllWriterRanks.size(), m_AllWriterRanks.data(),
......
......@@ -102,8 +102,9 @@ private:
int m_Verbosity = 0;
int m_MaxFilenameLength = 128;
int m_RendezvousStreamCount = 1;
int m_MaxStreamsPerApp = 1;
int m_RendezvousAppCount = 2;
int m_OpenTimeoutSecs = 10;
};
} // end namespace engine
......
......@@ -37,8 +37,9 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,
m_MaxFilenameLength);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount",
m_RendezvousAppCount);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount",
m_RendezvousStreamCount);
ssc::GetParameter(m_IO.m_Parameters, "MaxStreamsPerApp",
m_MaxStreamsPerApp);
ssc::GetParameter(m_IO.m_Parameters, "OpenTimeoutSecs", m_OpenTimeoutSecs);
m_GlobalWritePattern.resize(m_WorldSize);
m_GlobalReadPattern.resize(m_WorldSize);
......@@ -189,10 +190,9 @@ void SscWriter::SyncMpiPattern()
<< ", Writer Rank " << m_WriterRank << std::endl;
}
m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength,
m_RendezvousAppCount, 'w', m_Name, CommAsMPI(m_Comm));
m_MpiHandshake.Wait(m_Name);
m_MpiHandshake.PrintMaps();
m_MpiHandshake.Handshake(m_Name, 'w', m_OpenTimeoutSecs, m_MaxStreamsPerApp,
m_MaxFilenameLength, m_RendezvousAppCount,
CommAsMPI(m_Comm));
for (const auto &app : m_MpiHandshake.GetWriterMap(m_Name))
{
......@@ -210,8 +210,6 @@ void SscWriter::SyncMpiPattern()
}
}
m_MpiHandshake.Finalize();
MPI_Group worldGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, m_AllReaderRanks.size(), m_AllReaderRanks.data(),
......
......@@ -92,8 +92,9 @@ private:
int m_Verbosity = 0;
int m_MaxFilenameLength = 128;
int m_RendezvousStreamCount = 1;
int m_MaxStreamsPerApp = 1;
int m_RendezvousAppCount = 2;
int m_OpenTimeoutSecs = 10;
};
} // end namespace engine
......
......@@ -6,6 +6,8 @@
*/
#include "adiosMpiHandshake.h"
#include <algorithm>
#include <chrono>
#include <cstring>
#include <iostream>
......@@ -20,7 +22,7 @@ std::vector<std::vector<MPI_Request>> MpiHandshake::m_RecvRequests;
size_t MpiHandshake::m_MaxStreamsPerApp;
size_t MpiHandshake::m_MaxFilenameLength;
size_t MpiHandshake::m_ItemSize;
std::map<std::string, size_t> MpiHandshake::m_AppsForStreams;
std::map<std::string, size_t> MpiHandshake::m_RendezvousAppCounts;
size_t MpiHandshake::m_StreamID = 0;
int MpiHandshake::m_WorldSize;
int MpiHandshake::m_WorldRank;
......@@ -64,15 +66,8 @@ void MpiHandshake::Test()
if (mode == 'w')
{
auto &ranks = m_WritersMap[filename][appMasterRank];
bool existed = false;
for (const auto r : ranks)
{
if (r == rank)
{
existed = true;
}
}
if (!existed)
if (std::find(ranks.begin(), ranks.end(), rank) ==
ranks.end())
{
ranks.push_back(rank);
}
......@@ -80,15 +75,8 @@ void MpiHandshake::Test()
else if (mode == 'r')
{
auto &ranks = m_ReadersMap[filename][appMasterRank];
bool existed = false;
for (const auto r : ranks)
{
if (r == rank)
{
existed = true;
}
}
if (!existed)
if (std::find(ranks.begin(), ranks.end(), rank) ==
ranks.end())
{
ranks.push_back(rank);
}
......@@ -102,12 +90,16 @@ bool MpiHandshake::Check(const std::string &filename)
{
Test();
// check if RendezvousAppCount reached
if (m_WritersMap[filename].size() + m_ReadersMap[filename].size() !=
m_AppsForStreams[filename])
m_RendezvousAppCounts[filename])
{
return false;
}
// check if all ranks' info is received
for (const auto &app : m_WritersMap[filename])
{
if (app.second.size() != m_AppsSize[app.first])
......@@ -127,56 +119,62 @@ bool MpiHandshake::Check(const std::string &filename)
return true;
}
void MpiHandshake::Start(const size_t maxStreamsPerApp,
const size_t maxFilenameLength,
const size_t appsForThisStream, const char mode,
const std::string &filename, MPI_Comm localComm)
void MpiHandshake::Handshake(const std::string &filename, const char mode,
const int timeoutSeconds,
const size_t maxStreamsPerApp,
const size_t maxFilenameLength,
const size_t rendezvousAppCountForStream,
MPI_Comm localComm)
{
m_AppsForStreams[filename] = appsForThisStream;
if (m_StreamID == 0)
// initialize variables
if (filename.size() > maxFilenameLength)
{
MPI_Comm_size(MPI_COMM_WORLD, &m_WorldSize);
MPI_Comm_rank(MPI_COMM_WORLD, &m_WorldRank);
MPI_Comm_size(localComm, &m_LocalSize);
MPI_Comm_rank(localComm, &m_LocalRank);
m_MaxStreamsPerApp = maxStreamsPerApp;
m_MaxFilenameLength = maxFilenameLength;
m_ItemSize = maxFilenameLength + sizeof(char) + sizeof(int) * 2;
if (m_LocalRank == 0)
{
m_LocalMasterRank = m_WorldRank;
}
throw(std::runtime_error("Filename too long"));
}
MPI_Bcast(&m_LocalMasterRank, 1, MPI_INT, 0, localComm);
MPI_Comm_size(MPI_COMM_WORLD, &m_WorldSize);
MPI_Comm_rank(MPI_COMM_WORLD, &m_WorldRank);
MPI_Comm_size(localComm, &m_LocalSize);
MPI_Comm_rank(localComm, &m_LocalRank);
m_MaxStreamsPerApp = maxStreamsPerApp;
m_MaxFilenameLength = maxFilenameLength;
m_RendezvousAppCounts[filename] = rendezvousAppCountForStream;
m_SendRequests.resize(m_WorldSize);
m_RecvRequests.resize(m_WorldSize);
for (int i = 0; i < m_WorldSize; ++i)
{
m_SendRequests[i].resize(maxStreamsPerApp);
m_RecvRequests[i].resize(maxStreamsPerApp);
}
m_SendRequests.resize(m_WorldSize);
m_RecvRequests.resize(m_WorldSize);
for (int rank = 0; rank < m_WorldSize; ++rank)
{
m_SendRequests[rank].resize(maxStreamsPerApp);
m_RecvRequests[rank].resize(maxStreamsPerApp);
}
size_t bufferSize = m_WorldSize * maxStreamsPerApp * m_ItemSize;
m_Buffer.resize(bufferSize);
m_ItemSize = maxFilenameLength + sizeof(char) + sizeof(int) * 2;
m_Buffer.resize(m_WorldSize * maxStreamsPerApp * m_ItemSize);
for (int rank = 0; rank < m_WorldSize; ++rank)
{
for (size_t stream = 0; stream < maxStreamsPerApp; ++stream)
{
MPI_Irecv(m_Buffer.data() + PlaceInBuffer(stream, rank),
m_ItemSize, MPI_CHAR, rank, rank, MPI_COMM_WORLD,
&m_RecvRequests[rank][stream]);
}
}
// broadcast local master rank's world rank to use as app ID
if (m_LocalRank == 0)
{
m_LocalMasterRank = m_WorldRank;
}
MPI_Bcast(&m_LocalMasterRank, 1, MPI_INT, 0, localComm);
if (filename.size() > maxFilenameLength)
// start receiving
for (int rank = 0; rank < m_WorldSize; ++rank)
{
throw(std::runtime_error("Filename too long"));
for (size_t stream = 0; stream < maxStreamsPerApp; ++stream)
{
MPI_Irecv(m_Buffer.data() + PlaceInBuffer(stream, rank), m_ItemSize,
MPI_CHAR, rank, rank, MPI_COMM_WORLD,
&m_RecvRequests[rank][stream]);
}
}
// start sending
size_t offset = 0;
std::vector<char> buffer(m_ItemSize);
std::memcpy(buffer.data(), &mode, sizeof(char));
......@@ -187,22 +185,44 @@ void MpiHandshake::Start(const size_t maxStreamsPerApp,
offset += sizeof(int);
std::memcpy(buffer.data() + offset, filename.data(), filename.size());
for (int i = 0; i < m_WorldSize; ++i)
for (int rank = 0; rank < m_WorldSize; ++rank)
{
MPI_Isend(buffer.data(), m_ItemSize, MPI_CHAR, i, m_WorldRank,
MPI_COMM_WORLD, &m_SendRequests[i][m_StreamID]);
MPI_Isend(buffer.data(), m_ItemSize, MPI_CHAR, rank, m_WorldRank,
MPI_COMM_WORLD, &m_SendRequests[rank][m_StreamID]);
}
++m_StreamID;
}
// wait and check if required RendezvousAppCount reached
void MpiHandshake::Wait(const std::string &filename)
{
bool finished = false;
while (!finished)
auto start_time = std::chrono::system_clock::now();
while (!Check(filename))
{
finished = Check(filename);
auto now_time = std::chrono::system_clock::now();
auto duration = std::chrono::duration_cast<std::chrono::seconds>(
now_time - start_time);
if (duration.count() > timeoutSeconds)
{
throw(std::runtime_error("Mpi handshake timeout"));
}
}
// clean up MPI requests
for (auto &rs : m_RecvRequests)
{
for (auto &r : rs)
{
MPI_Status status;
int success;
MPI_Test(&r, &success, &status);
if (!success)
{
MPI_Cancel(&r);
}
}
}
m_RecvRequests.clear();
++m_StreamID;
}
const std::map<int, std::vector<int>> &
......@@ -216,29 +236,6 @@ MpiHandshake::GetReaderMap(const std::string &filename)
return m_ReadersMap[filename];
}
void MpiHandshake::Finalize()
{
--m_StreamID;
if (m_StreamID == 0)
{
for (auto &rs : m_RecvRequests)
{
for (auto &r : rs)
{
MPI_Status status;
int success;
MPI_Test(&r, &success, &status);
if (!success)
{
MPI_Cancel(&r);
}
}
}
m_RecvRequests.clear();
m_SendRequests.clear();
}
}
void MpiHandshake::PrintMaps()
{
for (int printRank = 0; printRank < m_WorldSize; ++printRank)
......@@ -246,7 +243,8 @@ void MpiHandshake::PrintMaps()
MPI_Barrier(MPI_COMM_WORLD);
if (m_WorldRank == printRank)
{
std::cout << "For rank " << printRank << " ********************* "
std::cout << "For rank " << printRank
<< "============================================"
<< std::endl;
std::cout << "Writers: " << std::endl;
for (const auto &stream : m_WritersMap)
......
......@@ -26,12 +26,12 @@ namespace helper
class MpiHandshake
{
public:
static void Start(const size_t maxStreamsPerApp,
const size_t maxFilenameLength,
const size_t appsForThisStream, const char mode,
const std::string &filename, MPI_Comm localComm);
static void Wait(const std::string &filename);
static void Finalize();
static void Handshake(const std::string &filename, const char mode,
const int timeoutSeconds,
const size_t maxStreamsPerApp,
const size_t maxFilenameLength,
const size_t RendezvousAppCountForStream,
MPI_Comm localComm);
static const std::map<int, std::vector<int>> &
GetWriterMap(const std::string &filename);
static const std::map<int, std::vector<int>> &
......@@ -48,7 +48,7 @@ private:
static size_t m_MaxStreamsPerApp;
static size_t m_MaxFilenameLength;
static size_t m_ItemSize;
static std::map<std::string, size_t> m_AppsForStreams;
static std::map<std::string, size_t> m_RendezvousAppCounts;
static size_t m_StreamID;
static int m_WorldSize;
static int m_WorldRank;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment