Commit 4e2ac835 authored by Ruonan Wang's avatar Ruonan Wang
Browse files

minor changes and clang-format

parent 90906069
......@@ -318,7 +318,8 @@ bool GetParameter(const Params &params, const std::string &key, int &value)
}
catch (...)
{
std::string error = "Engine parameter " + key + " can only be integer numbers";
std::string error =
"Engine parameter " + key + " can only be integer numbers";
std::cerr << error << std::endl;
return false;
}
......@@ -326,7 +327,8 @@ bool GetParameter(const Params &params, const std::string &key, int &value)
return true;
}
bool GetParameter(const Params &params, const std::string &key, std::string &value)
bool GetParameter(const Params &params, const std::string &key,
std::string &value)
{
auto it = params.find(key);
if (it == params.end())
......
......@@ -67,7 +67,8 @@ void JsonToBlockVecVec(const std::string &input, BlockVecVec &output);
bool AreSameDims(const Dims &a, const Dims &b);
bool GetParameter(const Params &params, const std::string &key, int &value);
bool GetParameter(const Params &params, const std::string &key, std::string &value);
bool GetParameter(const Params &params, const std::string &key,
std::string &value);
} // end namespace ssc
} // end namespace engine
......
......@@ -34,9 +34,12 @@ SscReader::SscReader(IO &io, const std::string &name, const Mode mode,
ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength", m_MaxFilenameLength);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", m_RendezvousAppCount);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount", m_RendezvousStreamCount);
ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength",
m_MaxFilenameLength);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount",
m_RendezvousAppCount);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount",
m_RendezvousStreamCount);
m_Buffer.resize(1);
......@@ -104,7 +107,6 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
{
TAU_SCOPED_TIMER_FUNC();
if (m_InitialStep)
{
m_InitialStep = false;
......@@ -139,7 +141,8 @@ StepStatus SscReader::BeginStep(const StepMode stepMode,
if (m_Verbosity >= 5)
{
std::cout << "SscReader::BeginStep, World Rank " << m_WorldRank
<< ", Reader Rank " << m_ReaderRank << ", Step "<< m_CurrentStep << std::endl;
<< ", Reader Rank " << m_ReaderRank << ", Step "
<< m_CurrentStep << std::endl;
}
if (m_Buffer[0] == 1)
......@@ -188,7 +191,8 @@ void SscReader::SyncMpiPattern()
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength, m_RendezvousAppCount, 'r', m_Name, CommAsMPI(m_Comm) );
m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength,
m_RendezvousAppCount, 'r', m_Name, CommAsMPI(m_Comm));
m_MpiHandshake.Wait(m_Name);
m_MpiHandshake.PrintMaps();
......@@ -208,11 +212,12 @@ void SscReader::SyncMpiPattern()
}
}
m_MpiHandshake.Finalize();
MPI_Group worldGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, m_AllWriterRanks.size(), m_AllWriterRanks.data(),
&m_MpiAllWritersGroup);
}
void SscReader::SyncWritePattern()
......@@ -462,7 +467,6 @@ void SscReader::DoClose(const int transportIndex)
<< ", Reader Rank " << m_ReaderRank << std::endl;
}
MPI_Win_free(&m_MpiWin);
m_MpiHandshake.Finalize();
}
} // end namespace engine
......
......@@ -13,8 +13,8 @@
#include "SscHelper.h"
#include "adios2/core/Engine.h"
#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"
#include "adios2/helper/adiosMpiHandshake.h"
#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"
#include <mpi.h>
#include <queue>
......
......@@ -14,7 +14,6 @@
#include "adios2/helper/adiosJSONcomplex.h"
#include "nlohmann/json.hpp"
namespace adios2
{
namespace core
......@@ -34,9 +33,12 @@ SscWriter::SscWriter(IO &io, const std::string &name, const Mode mode,
ssc::GetParameter(m_IO.m_Parameters, "MpiMode", m_MpiMode);
ssc::GetParameter(m_IO.m_Parameters, "Verbose", m_Verbosity);
ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength", m_MaxFilenameLength);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount", m_RendezvousAppCount);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount", m_RendezvousStreamCount);
ssc::GetParameter(m_IO.m_Parameters, "MaxFilenameLength",
m_MaxFilenameLength);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousAppCount",
m_RendezvousAppCount);
ssc::GetParameter(m_IO.m_Parameters, "RendezvousStreamCount",
m_RendezvousStreamCount);
m_GlobalWritePattern.resize(m_WorldSize);
m_GlobalReadPattern.resize(m_WorldSize);
......@@ -60,7 +62,8 @@ StepStatus SscWriter::BeginStep(StepMode mode, const float timeoutSeconds)
if (m_Verbosity >= 5)
{
std::cout << "SscWriter::BeginStep, World Rank " << m_WorldRank
<< ", Writer Rank " << m_WriterRank << ", Step "<< m_CurrentStep << std::endl;
<< ", Writer Rank " << m_WriterRank << ", Step "
<< m_CurrentStep << std::endl;
}
return StepStatus::OK;
......@@ -186,7 +189,8 @@ void SscWriter::SyncMpiPattern()
<< ", Writer Rank " << m_WriterRank << std::endl;
}
m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength, m_RendezvousAppCount, 'w', m_Name, CommAsMPI(m_Comm) );
m_MpiHandshake.Start(m_RendezvousStreamCount, m_MaxFilenameLength,
m_RendezvousAppCount, 'w', m_Name, CommAsMPI(m_Comm));
m_MpiHandshake.Wait(m_Name);
m_MpiHandshake.PrintMaps();
......@@ -206,9 +210,12 @@ void SscWriter::SyncMpiPattern()
}
}
m_MpiHandshake.Finalize();
MPI_Group worldGroup;
MPI_Comm_group(MPI_COMM_WORLD, &worldGroup);
MPI_Group_incl(worldGroup, m_AllReaderRanks.size(), m_AllReaderRanks.data(), &m_MpiAllReadersGroup);
MPI_Group_incl(worldGroup, m_AllReaderRanks.size(), m_AllReaderRanks.data(),
&m_MpiAllReadersGroup);
}
void SscWriter::SyncWritePattern()
......@@ -473,7 +480,6 @@ void SscWriter::DoClose(const int transportIndex)
}
MPI_Win_free(&m_MpiWin);
m_MpiHandshake.Finalize();
}
} // end namespace engine
......
......@@ -13,8 +13,8 @@
#include "SscHelper.h"
#include "adios2/core/Engine.h"
#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"
#include "adios2/helper/adiosMpiHandshake.h"
#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"
#include <mpi.h>
#include <queue>
......
......@@ -5,8 +5,8 @@
* adiosMpiHandshake.cpp
*/
#include <iostream>
#include "adiosMpiHandshake.h"
#include <iostream>
namespace adios2
{
......@@ -26,11 +26,12 @@ int MpiHandshake::m_WorldRank;
int MpiHandshake::m_LocalSize;
int MpiHandshake::m_LocalRank;
int MpiHandshake::m_LocalMasterRank;
std::map<std::string, std::map<int, std::vector<int>>> MpiHandshake::m_WritersMap;
std::map<std::string, std::map<int, std::vector<int>>> MpiHandshake::m_ReadersMap;
std::map<std::string, std::map<int, std::vector<int>>>
MpiHandshake::m_WritersMap;
std::map<std::string, std::map<int, std::vector<int>>>
MpiHandshake::m_ReadersMap;
std::map<int, int> MpiHandshake::m_AppsSize;
size_t MpiHandshake::PlaceInBuffer(size_t stream, int rank)
{
return rank * m_MaxStreamsPerApp * m_ItemSize + stream * m_ItemSize;
......@@ -41,50 +42,52 @@ void MpiHandshake::Test()
int success;
MPI_Status status;
for(int rank=0; rank<m_WorldSize; ++rank)
for (int rank = 0; rank < m_WorldSize; ++rank)
{
for(size_t stream=0; stream<m_MaxStreamsPerApp; ++stream)
for (size_t stream = 0; stream < m_MaxStreamsPerApp; ++stream)
{
MPI_Test(&m_RecvRequests[rank][stream], &success, &status);
if(success)
if (success)
{
size_t offset = PlaceInBuffer(stream, rank);
char mode = m_Buffer[offset];
offset += sizeof(char);
int appMasterRank = reinterpret_cast<int *>(m_Buffer.data() + offset)[0];
int appMasterRank =
reinterpret_cast<int *>(m_Buffer.data() + offset)[0];
offset += sizeof(int);
int appSize = reinterpret_cast<int *>(m_Buffer.data() + offset)[0];
int appSize =
reinterpret_cast<int *>(m_Buffer.data() + offset)[0];
offset += sizeof(int);
std::string filename = m_Buffer.data() + offset;
m_AppsSize[appMasterRank] = appSize;
if(mode == 'w')
if (mode == 'w')
{
auto &ranks = m_WritersMap[filename][appMasterRank];
bool existed = false;
for(const auto r : ranks)
for (const auto r : ranks)
{
if(r == rank)
if (r == rank)
{
existed = true;
}
}
if(not existed)
if (not existed)
{
ranks.push_back(rank);
}
}
else if(mode == 'r')
else if (mode == 'r')
{
auto &ranks = m_ReadersMap[filename][appMasterRank];
bool existed = false;
for(const auto r : ranks)
for (const auto r : ranks)
{
if(r == rank)
if (r == rank)
{
existed = true;
}
}
if(not existed)
if (not existed)
{
ranks.push_back(rank);
}
......@@ -98,14 +101,15 @@ bool MpiHandshake::Check(const std::string &filename)
{
Test();
if(m_WritersMap[filename].size() + m_ReadersMap[filename].size() != m_AppsForStreams[filename])
if (m_WritersMap[filename].size() + m_ReadersMap[filename].size() !=
m_AppsForStreams[filename])
{
return false;
}
for (const auto &app : m_WritersMap[filename])
{
if(app.second.size() != m_AppsSize[app.first])
if (app.second.size() != m_AppsSize[app.first])
{
return false;
}
......@@ -113,7 +117,7 @@ bool MpiHandshake::Check(const std::string &filename)
for (const auto &app : m_ReadersMap[filename])
{
if(app.second.size() != m_AppsSize[app.first])
if (app.second.size() != m_AppsSize[app.first])
{
return false;
}
......@@ -122,11 +126,14 @@ bool MpiHandshake::Check(const std::string &filename)
return true;
}
void MpiHandshake::Start(const size_t maxStreamsPerApp, const size_t maxFilenameLength, const size_t appsForThisStream, const char mode, const std::string &filename, MPI_Comm localComm)
void MpiHandshake::Start(const size_t maxStreamsPerApp,
const size_t maxFilenameLength,
const size_t appsForThisStream, const char mode,
const std::string &filename, MPI_Comm localComm)
{
m_AppsForStreams[filename] = appsForThisStream;
if(m_StreamID == 0)
if (m_StreamID == 0)
{
MPI_Comm_size(MPI_COMM_WORLD, &m_WorldSize);
MPI_Comm_rank(MPI_COMM_WORLD, &m_WorldRank);
......@@ -136,7 +143,7 @@ void MpiHandshake::Start(const size_t maxStreamsPerApp, const size_t maxFilename
m_MaxFilenameLength = maxFilenameLength;
m_ItemSize = maxFilenameLength + sizeof(char) + sizeof(int) * 2;
if(m_LocalRank == 0)
if (m_LocalRank == 0)
{
m_LocalMasterRank = m_WorldRank;
}
......@@ -145,7 +152,7 @@ void MpiHandshake::Start(const size_t maxStreamsPerApp, const size_t maxFilename
m_SendRequests.resize(m_WorldSize);
m_RecvRequests.resize(m_WorldSize);
for(int i=0; i<m_WorldSize; ++i)
for (int i = 0; i < m_WorldSize; ++i)
{
m_SendRequests[i].resize(maxStreamsPerApp);
m_RecvRequests[i].resize(maxStreamsPerApp);
......@@ -154,16 +161,18 @@ void MpiHandshake::Start(const size_t maxStreamsPerApp, const size_t maxFilename
size_t bufferSize = m_WorldSize * maxStreamsPerApp * m_ItemSize;
m_Buffer.resize(bufferSize);
for(int rank=0; rank<m_WorldSize; ++rank)
for (int rank = 0; rank < m_WorldSize; ++rank)
{
for(size_t stream=0; stream<maxStreamsPerApp; ++stream)
for (size_t stream = 0; stream < maxStreamsPerApp; ++stream)
{
MPI_Irecv(m_Buffer.data() + PlaceInBuffer(stream, rank), m_ItemSize, MPI_CHAR, rank, rank, MPI_COMM_WORLD, &m_RecvRequests[rank][stream]);
MPI_Irecv(m_Buffer.data() + PlaceInBuffer(stream, rank),
m_ItemSize, MPI_CHAR, rank, rank, MPI_COMM_WORLD,
&m_RecvRequests[rank][stream]);
}
}
}
if(filename.size() > maxFilenameLength)
if (filename.size() > maxFilenameLength)
{
throw(std::runtime_error("Filename too long"));
}
......@@ -177,29 +186,31 @@ void MpiHandshake::Start(const size_t maxStreamsPerApp, const size_t maxFilename
offset += sizeof(int);
std::memcpy(buffer.data() + offset, filename.data(), filename.size());
for(int i=0; i<m_WorldSize; ++i)
for (int i = 0; i < m_WorldSize; ++i)
{
MPI_Isend(buffer.data(), m_ItemSize, MPI_CHAR, i, m_WorldRank, MPI_COMM_WORLD, &m_SendRequests[i][m_StreamID]);
MPI_Isend(buffer.data(), m_ItemSize, MPI_CHAR, i, m_WorldRank,
MPI_COMM_WORLD, &m_SendRequests[i][m_StreamID]);
}
++m_StreamID;
}
void MpiHandshake::Wait(const std::string &filename)
{
bool finished = false;
while(not finished)
while (not finished)
{
finished = Check(filename);
}
}
const std::map<int, std::vector<int>> & MpiHandshake::GetWriterMap(const std::string &filename)
const std::map<int, std::vector<int>> &
MpiHandshake::GetWriterMap(const std::string &filename)
{
return m_WritersMap[filename];
}
const std::map<int, std::vector<int>> & MpiHandshake::GetReaderMap(const std::string &filename)
const std::map<int, std::vector<int>> &
MpiHandshake::GetReaderMap(const std::string &filename)
{
return m_ReadersMap[filename];
}
......@@ -207,16 +218,16 @@ const std::map<int, std::vector<int>> & MpiHandshake::GetReaderMap(const std::st
void MpiHandshake::Finalize()
{
--m_StreamID;
if(m_StreamID == 0)
if (m_StreamID == 0)
{
for(auto &rs : m_RecvRequests)
for (auto &rs : m_RecvRequests)
{
for(auto &r : rs)
for (auto &r : rs)
{
MPI_Status status;
int success;
MPI_Test(&r, &success, &status);
if(not success)
if (not success)
{
MPI_Cancel(&r);
}
......@@ -229,21 +240,23 @@ void MpiHandshake::Finalize()
void MpiHandshake::PrintMaps()
{
for(int printRank = 0; printRank < m_WorldSize; ++printRank)
for (int printRank = 0; printRank < m_WorldSize; ++printRank)
{
MPI_Barrier(MPI_COMM_WORLD);
if(m_WorldRank == printRank)
if (m_WorldRank == printRank)
{
std::cout << "For rank " << printRank << " ********************* " << std::endl;
std::cout << "For rank " << printRank << " ********************* "
<< std::endl;
std::cout << "Writers: " << std::endl;
for (const auto &stream : m_WritersMap)
{
std::cout << " Stream " << stream.first << std::endl;
for (const auto &app : stream.second)
{
std::cout << " App Master Rank " << app.first << std::endl;
std::cout << " ";
for(const auto &rank : app.second)
std::cout << " App Master Rank " << app.first
<< std::endl;
std::cout << " ";
for (const auto &rank : app.second)
{
std::cout << rank << ", ";
}
......@@ -256,9 +269,10 @@ void MpiHandshake::PrintMaps()
std::cout << " Stream " << stream.first << std::endl;
for (const auto &app : stream.second)
{
std::cout << " App Master Rank " << app.first << std::endl;
std::cout << " ";
for(const auto &rank : app.second)
std::cout << " App Master Rank " << app.first
<< std::endl;
std::cout << " ";
for (const auto &rank : app.second)
{
std::cout << rank << ", ";
}
......
......@@ -10,13 +10,13 @@
#include "adios2/common/ADIOSConfig.h"
#ifndef ADIOS2_HAVE_MPI
#error "Do not include adiosCommMPI.h without ADIOS2_HAVE_MPI."
#error "Do not include adiosMpiHandshake.h without ADIOS2_HAVE_MPI."
#endif
#include <string>
#include <map>
#include <vector>
#include <mpi.h>
#include <string>
#include <vector>
namespace adios2
{
......@@ -25,33 +25,39 @@ namespace helper
class MpiHandshake
{
public:
static void Start(const size_t maxStreamsPerApp, const size_t maxFilenameLength, const size_t appsForThisStream, const char mode, const std::string &filename, MPI_Comm localComm);
static void Wait(const std::string &filename);
static void Finalize();
static const std::map<int, std::vector<int>> & GetWriterMap(const std::string &filename);
static const std::map<int, std::vector<int>> & GetReaderMap(const std::string &filename);
static void PrintMaps();
private:
static void Test();
static bool Check(const std::string &filename);
static size_t PlaceInBuffer(const size_t stream, const int rank);
static std::vector<char> m_Buffer;
static std::vector<std::vector<MPI_Request>> m_SendRequests;
static std::vector<std::vector<MPI_Request>> m_RecvRequests;
static size_t m_MaxStreamsPerApp;
static size_t m_MaxFilenameLength;
static size_t m_ItemSize;
static std::map<std::string, size_t> m_AppsForStreams;
static size_t m_StreamID;
static int m_WorldSize;
static int m_WorldRank;
static int m_LocalSize;
static int m_LocalRank;
static int m_LocalMasterRank;
static std::map<std::string, std::map<int, std::vector<int>>> m_WritersMap;
static std::map<std::string, std::map<int, std::vector<int>>> m_ReadersMap;
static std::map<int, int> m_AppsSize;
public:
static void Start(const size_t maxStreamsPerApp,
const size_t maxFilenameLength,
const size_t appsForThisStream, const char mode,
const std::string &filename, MPI_Comm localComm);
static void Wait(const std::string &filename);
static void Finalize();
static const std::map<int, std::vector<int>> &
GetWriterMap(const std::string &filename);
static const std::map<int, std::vector<int>> &
GetReaderMap(const std::string &filename);
static void PrintMaps();
private:
static void Test();
static bool Check(const std::string &filename);
static size_t PlaceInBuffer(const size_t stream, const int rank);
static std::vector<char> m_Buffer;
static std::vector<std::vector<MPI_Request>> m_SendRequests;
static std::vector<std::vector<MPI_Request>> m_RecvRequests;
static size_t m_MaxStreamsPerApp;
static size_t m_MaxFilenameLength;
static size_t m_ItemSize;
static std::map<std::string, size_t> m_AppsForStreams;
static size_t m_StreamID;
static int m_WorldSize;
static int m_WorldRank;
static int m_LocalSize;
static int m_LocalRank;
static int m_LocalMasterRank;
static std::map<std::string, std::map<int, std::vector<int>>> m_WritersMap;
static std::map<std::string, std::map<int, std::vector<int>>> m_ReadersMap;
static std::map<int, int> m_AppsSize;
};
} // end namespace helper
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment