Skip to content
Snippets Groups Projects
Commit 79029150 authored by Wang, Ruonan's avatar Wang, Ruonan
Browse files

Heat transfer read/write now works with DataMan across multiple MPI ranks in an N-to-N pattern

parent 10547b65
No related branches found
No related tags found
1 merge request: !366 "heat transfer read/write worked with dataman"
......@@ -19,7 +19,7 @@
<!-- XXKb, XXMb, or XXXGb supported, default=16Kb
(applications might choose an optimal value) -->
<parameter key="InitialBufferSize" value="16Kb"/>
<parameter key="InitialBufferSize" value="128Mb"/>
</engine>
......@@ -58,8 +58,8 @@
<!-- POSIX, stdio (C FILE*), fstream (C++) -->
<parameter key="Library" value="ZMQ"/>
<!-- Dump file on receiver side -->
<parameter key="DumpFile" value="YES"/>
<!-- timeout in seconds -->
<parameter key="Timeout" value="5"/>
<!-- IP Address for network transfer -->
<parameter key="IPAddress" value="127.0.0.1"/>
......
......@@ -35,7 +35,6 @@ public:
* @param debugMode
* @param nthreads
*/
using json = nlohmann::json;
DataManReader(IO &io, const std::string &name, const Mode mode,
MPI_Comm mpiComm);
......
......@@ -12,6 +12,7 @@
#define ADIOS2_ENGINE_DATAMAN_DATAMANREADER_TCC_
#include "DataManReader.h"
#include <iostream>
namespace adios2
{
......@@ -21,13 +22,13 @@ void DataManReader::GetSyncCommon(Variable<T> &variable, T *data)
{
if (m_UseFormat == "BP" || m_UseFormat == "bp" )
{
int mpiSize;
MPI_Comm_size(m_MPIComm, &mpiSize);
m_BP3Deserializer.GetSyncVariableDataFromStream(
variable, m_BP3Deserializer.m_Data);
size_t varsize = std::accumulate(variable.m_Shape.begin(), variable.m_Shape.end(), sizeof(T),
std::multiplies<std::size_t>());
std::memcpy(data, variable.GetData(), varsize);
std::memcpy(data, variable.GetData(), varsize/mpiSize);
}
}
......
......@@ -16,6 +16,8 @@
#include "adios2/ADIOSMPI.h"
#include "adios2/helper/adiosFunctions.h" //GetType<T>
#include <iostream>
namespace adios2
{
......@@ -38,24 +40,7 @@ void DataManWriter::PutSyncCommon(Variable<T> &variable, const T *values)
variable.m_Start.assign(variable.m_Count.size(), 0);
}
if (m_UseFormat == "json" || m_UseFormat == "JSON")
{
nlohmann::json jmsg;
jmsg["doid"] = m_Name;
jmsg["var"] = variable.m_Name;
jmsg["dtype"] = GetType<T>();
jmsg["putshape"] = variable.m_Count;
jmsg["varshape"] = variable.m_Shape;
jmsg["offset"] = variable.m_Start;
jmsg["timestep"] = 0;
jmsg["bytes"] =
std::accumulate(variable.m_Shape.begin(), variable.m_Shape.end(),
sizeof(T), std::multiplies<size_t>());
m_Man.WriteWAN(values, jmsg);
}
else if (m_UseFormat == "bp" || m_UseFormat == "BP")
if (m_UseFormat == "bp" || m_UseFormat == "BP")
{
PutSyncCommonBP(variable, values);
}
......
......@@ -31,10 +31,10 @@ DataMan::DataMan(MPI_Comm mpiComm, const bool debugMode)
/**
 * Destructor: clears the m_Listening flag and joins the worker threads held
 * by this object before it goes away.
 *
 * NOTE(review): this span appears to come from a diff view that lost its
 * +/- markers. The two consecutive `for` headers and the two `join()` calls
 * look like the before/after variants of a single line (m_ControlThreads /
 * controlThread vs. m_ReadThreads / readThread). Read literally, the loops
 * nest and each controlThread would be joined once per readThread -- confirm
 * which variant is the intended one before relying on this text.
 */
DataMan::~DataMan()
{
for (auto &controlThread : m_ControlThreads)
for (auto &readThread : m_ReadThreads)
{
// Clear the flag that ReadThread polls (`while (m_Listening)`) so the
// thread can leave its loop; otherwise join() below would not return.
m_Listening = false;
controlThread.join();
readThread.join();
}
}
......@@ -69,39 +69,36 @@ void DataMan::OpenWANTransports(const std::vector<std::string> &streamNames,
GetParameter("IPAddress", paramsVector[i], true, m_DebugMode,
"Transport IPAddress Parameter"));
std::string portControl(GetParameter("Port", paramsVector[i], false,
m_DebugMode,
"Transport Port Parameter"));
std::string port(GetParameter("Port", paramsVector[i], false,
m_DebugMode, "Transport Port Parameter"));
if (portControl.empty())
if (port.empty())
{
portControl = std::to_string(m_DefaultPort);
port = std::to_string(m_DefaultPort);
}
const std::string portData(std::to_string(stoi(portControl) + 1));
int mpiRank;
MPI_Comm_rank(m_MPIComm, &mpiRank);
port = std::to_string(stoi(port) + mpiRank);
if (library == "zmq" || library == "ZMQ")
{
#ifdef ADIOS2_HAVE_ZEROMQ
wanTransport = std::make_shared<transport::WANZmq>(
ipAddress, portData, m_MPIComm, m_DebugMode);
ipAddress, port, m_MPIComm, m_DebugMode);
wanTransport->Open(streamNames[i], mode);
m_Transports.emplace(counter, wanTransport);
controlTransport = std::make_shared<transport::WANZmq>(
ipAddress, portControl, m_MPIComm, m_DebugMode);
controlTransport->Open(streamNames[i], mode);
m_ControlTransports.emplace_back(controlTransport);
if (mode == Mode::Read)
{
if (callback == "YES" || callback == "yes" ||
callback == "TRUE" || callback == "true")
{
m_Listening = true;
m_ControlThreads.emplace_back(std::thread(
&DataMan::ReadThread, this, wanTransport,
controlTransport, streamNames[i], paramsVector[i]));
m_ReadThreads.emplace_back(
std::thread(&DataMan::ReadThread, this, wanTransport,
streamNames[i], paramsVector[i]));
}
}
++counter;
......@@ -124,24 +121,6 @@ void DataMan::OpenWANTransports(const std::vector<std::string> &streamNames,
}
}
/**
 * Sends one payload over the WAN as a JSON control message followed by the
 * raw data.
 *
 * The metadata in jmsg is serialized and written to the control transport
 * selected by m_CurrentTransport; afterwards jmsg["bytes"] bytes starting at
 * buffer are written to the data transport with the same index.
 *
 * @param buffer start of the payload; jmsg["bytes"] bytes are read from it
 * @param jmsg metadata describing the payload; its "bytes" entry gives the
 * payload size
 * @throws std::runtime_error if m_CurrentTransport is not below the number
 * of control transports or of data transports
 */
void DataMan::WriteWAN(const void *buffer, nlohmann::json jmsg)
{
    if (m_CurrentTransport >= m_ControlTransports.size())
    {
        throw std::runtime_error("ERROR: No valid control transports found, "
                                 "from DataMan::WriteWAN()");
    }
    if (m_CurrentTransport >= m_Transports.size())
    {
        throw std::runtime_error(
            "ERROR: No valid transports found, from DataMan::WriteWAN()");
    }

    // Serialize the metadata once and reuse it for both the pointer and the
    // length, instead of calling dump() twice for the same message.
    const auto controlMessage = jmsg.dump();
    m_ControlTransports[m_CurrentTransport]->Write(controlMessage.c_str(),
                                                   controlMessage.size());

    // The payload follows the control message; its size comes from the
    // metadata, so it is looked up only after the control write (same order
    // as before).
    m_Transports[m_CurrentTransport]->Write(
        reinterpret_cast<const char *>(buffer), jmsg["bytes"].get<size_t>());
}
void DataMan::WriteWAN(const void *buffer, size_t size)
{
if (m_CurrentTransport >= m_Transports.size())
......@@ -158,7 +137,7 @@ void DataMan::ReadWAN(void *buffer, size_t &size)
{
Transport::Status status;
for (int i = 0; i < 3000; ++i)
for (int i = 0; i < m_Timeout * 1000; ++i)
{
m_Transports[m_CurrentTransport]->IRead(static_cast<char *>(buffer),
m_BufferSize, status);
......@@ -167,7 +146,7 @@ void DataMan::ReadWAN(void *buffer, size_t &size)
size = status.Bytes;
return;
}
std::this_thread::sleep_for(std::chrono::microseconds(10));
std::this_thread::sleep_for(std::chrono::microseconds(1000));
}
}
......@@ -184,7 +163,6 @@ void DataMan::SetCallback(adios2::Operator &callback)
}
void DataMan::ReadThread(std::shared_ptr<Transport> trans,
std::shared_ptr<Transport> ctl_trans,
const std::string stream_name,
const Params trans_params)
{
......@@ -196,42 +174,7 @@ void DataMan::ReadThread(std::shared_ptr<Transport> trans,
format = "bp";
}
if (format == "json" || format == "JSON")
{
while (m_Listening)
{
char buffer[1024];
size_t bytes = 0;
nlohmann::json jmsg;
adios2::Transport::Status status;
ctl_trans->IRead(buffer, 1024, status, 0);
if (status.Bytes > 0)
{
std::string smsg(buffer);
jmsg = nlohmann::json::parse(smsg);
bytes = jmsg.value("bytes", 0);
if (bytes > 0)
{
std::vector<char> data(bytes);
trans->Read(data.data(), bytes);
std::string doid =
jmsg.value("doid", "Unknown Data Object");
std::string var = jmsg.value("var", "Unknown Variable");
std::string dtype =
jmsg.value("dtype", "Unknown Data Type");
std::vector<size_t> putshape =
jmsg.value("putshape", std::vector<size_t>());
if (m_Callback != nullptr &&
m_Callback->m_Type == "Signature2")
{
m_Callback->RunCallback2(data.data(), doid, var, dtype,
putshape);
}
}
}
}
}
else if (format == "bp" || format == "BP")
if (format == "bp" || format == "BP")
{
while (m_Listening)
{
......
......@@ -18,8 +18,6 @@
#include "adios2/toolkit/format/bp3/BP3.h"
#include "adios2/toolkit/transportman/TransportMan.h"
#include <json.hpp>
namespace adios2
{
namespace transportman
......@@ -38,8 +36,6 @@ public:
const std::vector<Params> &params,
const bool profile);
void WriteWAN(const void *buffer, nlohmann::json jmsg);
/**
* For BP Format
* @param buffer
......@@ -63,23 +59,19 @@ private:
/** IO object pointer, null by default */
IO *m_IO = nullptr;
/** Callback operator, null by default; ReadThread checks it for null before
 * invoking it */
Operator *m_Callback = nullptr;
/**
 * Body of the reader threads started by OpenWANTransports; loops while
 * m_Listening is true.
 * NOTE(review): the ctl_trans parameter line looks like the pre-commit
 * variant of this declaration (diff markers were lost) -- confirm.
 */
void ReadThread(std::shared_ptr<Transport> trans,
std::shared_ptr<Transport> ctl_trans,
const std::string stream_name, const Params trans_params);
void RunCallback(void *buffer, std::string doid, std::string var,
std::string dtype, std::vector<size_t> shape);
/** Control-channel transports, appended to in OpenWANTransports and indexed
 * by m_CurrentTransport in the JSON WriteWAN overload */
std::vector<std::shared_ptr<Transport>> m_ControlTransports;
/** Threads running ReadThread; joined in the destructor.
 * NOTE(review): m_ControlThreads and m_ReadThreads look like the old and new
 * names of the same member -- confirm which one is current. */
std::vector<std::thread> m_ControlThreads;
std::vector<std::thread> m_ReadThreads;
std::vector<Params> m_TransportsParameters;
/** Index of the transport used by WriteWAN / ReadWAN */
size_t m_CurrentTransport = 0;
/** Loop flag for ReadThread: set to true when a reader thread is started and
 * to false in the destructor.
 * NOTE(review): plain bool accessed from more than one thread with no
 * synchronization visible here -- consider std::atomic<bool>. */
bool m_Listening = false;
nlohmann::json m_JMessage;
/** Maximum number of bytes requested per IRead call in ReadWAN
 * (1024^3 bytes) */
size_t m_BufferSize = 1024 * 1024 * 1024;
/** Port used when the "Port" transport parameter is empty */
const int m_DefaultPort = 12306;
/** Scales the number of polling iterations in ReadWAN (m_Timeout * 1000);
 * presumably seconds, given the 1000-microsecond sleep per iteration --
 * confirm */
int m_Timeout = 5;
};
} // end namespace transportman
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment