Skip to content
Snippets Groups Projects
Unverified Commit 9ee54039 authored by pnorbert's avatar pnorbert Committed by GitHub
Browse files

Merge pull request #366 from JasonRuonanWang/bpindataman

heat transfer read/write worked with dataman
parents 26990b0c 79029150
No related branches found
No related tags found
No related merge requests found
......@@ -19,7 +19,7 @@
<!-- XXKb, XXMb, or XXXGb supported, default=16Kb
(applications might choose an optimal value) -->
<parameter key="InitialBufferSize" value="16Kb"/>
<parameter key="InitialBufferSize" value="128Mb"/>
</engine>
......@@ -56,7 +56,10 @@
<transport type="WAN">
<!-- POSIX, stdio (C FILE*), fstream (C++) -->
<parameter key="Library" value="POSIX"/>
<parameter key="Library" value="ZMQ"/>
<!-- timeout in seconds -->
<parameter key="Timeout" value="5"/>
<!-- IP Address for network transfer -->
<parameter key="IPAddress" value="127.0.0.1"/>
......
......@@ -91,10 +91,12 @@ int main(int argc, char *argv[])
dataManIO.SetParameters({
{"Format", "bp"},
});
dataManIO.AddTransport(
"WAN", {
{"Library", "ZMQ"}, {"IPAddress", "127.0.0.1"},
});
dataManIO.AddTransport("WAN", {
{"Library", "ZMQ"},
{"IPAddress", "127.0.0.1"},
{"DumpFile", "YES"},
{"Callback", "YES"},
});
dataManIO.AddOperator(callbackFloat); // propagate to all Engines
adios2::Engine &dataManReader =
......
......@@ -29,11 +29,30 @@ DataManReader::DataManReader(IO &io, const std::string &name, const Mode mode,
StepStatus DataManReader::BeginStep(StepMode stepMode,
const float timeoutSeconds)
{
StepStatus status = StepStatus::OK;
// here fill the logic for the while loop listener...
// m_BP3Deserializer.m_MetadataSet.StepsCount will have the number of steps
// per buffer
// Look at the BPFileReader.BeginStep implementation
std::vector<char> buffer;
buffer.reserve(m_BufferSize);
size_t size = 0;
m_Man.ReadWAN(buffer.data(), size);
StepStatus status;
if (size > 0)
{
status = StepStatus::OK;
m_BP3Deserializer.m_Data.Resize(size, "in DataMan Streaming Listener");
std::memcpy(m_BP3Deserializer.m_Data.m_Buffer.data(), buffer.data(),
size);
m_BP3Deserializer.ParseMetadata(m_BP3Deserializer.m_Data, m_IO);
}
else
{
status = StepStatus::EndOfStream;
}
return status;
}
......@@ -93,15 +112,12 @@ bool DataManReader::GetUIntParameter(Params &params, std::string key,
}
void DataManReader::InitParameters()
{
GetUIntParameter(m_IO.m_Parameters, "NChannels", m_NChannels);
GetStringParameter(m_IO.m_Parameters, "Format", m_UseFormat);
}
void DataManReader::InitTransports()
{
size_t channels = m_IO.m_TransportsParameters.size();
std::vector<std::string> names;
for (size_t i = 0; i < channels; ++i)
......@@ -114,7 +130,6 @@ void DataManReader::InitTransports()
}
void DataManReader::Init()
{
for (auto &j : m_IO.m_Operators)
{
if (j.ADIOSOperator.m_Type == "Signature2")
......
......@@ -35,7 +35,6 @@ public:
* @param debugMode
* @param nthreads
*/
using json = nlohmann::json;
DataManReader(IO &io, const std::string &name, const Mode mode,
MPI_Comm mpiComm);
......@@ -54,8 +53,9 @@ private:
format::BP3Deserializer m_BP3Deserializer;
transportman::DataMan m_Man;
size_t m_BufferSize = 1024 * 1024 * 1024;
unsigned int m_NChannels = 1;
std::string m_UseFormat = "json";
std::string m_UseFormat = "BP";
bool m_DoMonitor = false;
void Init();
......
......@@ -12,6 +12,7 @@
#define ADIOS2_ENGINE_DATAMAN_DATAMANREADER_TCC_
#include "DataManReader.h"
#include <iostream>
namespace adios2
{
......@@ -19,21 +20,22 @@ namespace adios2
template <class T>
void DataManReader::GetSyncCommon(Variable<T> &variable, T *data)
{
if (m_UseFormat == "BP")
if (m_UseFormat == "BP" || m_UseFormat == "bp" )
{
variable.SetData(data);
int mpiSize;
MPI_Comm_size(m_MPIComm, &mpiSize);
m_BP3Deserializer.GetSyncVariableDataFromStream(
variable, m_BP3Deserializer.m_Data);
size_t varsize = std::accumulate(variable.m_Shape.begin(), variable.m_Shape.end(), sizeof(T),
std::multiplies<std::size_t>());
std::memcpy(data, variable.GetData(), varsize/mpiSize);
}
}
// TODO: let;s try with GetSync first, GetDeferred, PerformGets is just a
// wrapper
template <class T>
void DataManReader::GetDeferredCommon(Variable<T> &variable, T *data)
{
m_BP3Deserializer.GetDeferredVariable(variable, data);
m_BP3Deserializer.m_PerformedGets = false;
GetSyncCommon(variable, data);
}
} // end namespace adios2
......
......@@ -37,7 +37,7 @@ void DataManWriter::EndStep()
{
if (m_UseFormat == "bp" || m_UseFormat == "BP")
{
// m_BP3Serializer.SerializeData(m_IO, true);
m_BP3Serializer.SerializeData(m_IO, true);
}
}
......
......@@ -16,6 +16,8 @@
#include "adios2/ADIOSMPI.h"
#include "adios2/helper/adiosFunctions.h" //GetType<T>
#include <iostream>
namespace adios2
{
......@@ -38,24 +40,7 @@ void DataManWriter::PutSyncCommon(Variable<T> &variable, const T *values)
variable.m_Start.assign(variable.m_Count.size(), 0);
}
if (m_UseFormat == "json" || m_UseFormat == "JSON")
{
nlohmann::json jmsg;
jmsg["doid"] = m_Name;
jmsg["var"] = variable.m_Name;
jmsg["dtype"] = GetType<T>();
jmsg["putshape"] = variable.m_Count;
jmsg["varshape"] = variable.m_Shape;
jmsg["offset"] = variable.m_Start;
jmsg["timestep"] = 0;
jmsg["bytes"] =
std::accumulate(variable.m_Shape.begin(), variable.m_Shape.end(),
sizeof(T), std::multiplies<size_t>());
m_Man.WriteWAN(values, jmsg);
}
else if (m_UseFormat == "bp" || m_UseFormat == "BP")
if (m_UseFormat == "bp" || m_UseFormat == "BP")
{
PutSyncCommonBP(variable, values);
}
......
......@@ -31,10 +31,10 @@ DataMan::DataMan(MPI_Comm mpiComm, const bool debugMode)
DataMan::~DataMan()
{
for (auto &controlThread : m_ControlThreads)
for (auto &readThread : m_ReadThreads)
{
m_Listening = false;
controlThread.join();
readThread.join();
}
}
......@@ -61,47 +61,45 @@ void DataMan::OpenWANTransports(const std::vector<std::string> &streamNames,
m_DebugMode,
"Transport Library Parameter"));
const std::string callback(
GetParameter("Callback", paramsVector[i], false, m_DebugMode,
"Transport Callback Parameter"));
const std::string ipAddress(
GetParameter("IPAddress", paramsVector[i], true, m_DebugMode,
"Transport IPAddress Parameter"));
std::string portControl(GetParameter("Port", paramsVector[i], false,
m_DebugMode,
"Transport Port Parameter"));
std::string port(GetParameter("Port", paramsVector[i], false,
m_DebugMode, "Transport Port Parameter"));
if (portControl.empty())
if (port.empty())
{
portControl = std::to_string(m_DefaultPort);
port = std::to_string(m_DefaultPort);
}
const std::string portData(std::to_string(stoi(portControl) + 1));
int mpiRank;
MPI_Comm_rank(m_MPIComm, &mpiRank);
std::string format(GetParameter("Format", paramsVector[i], false,
m_DebugMode, "Format"));
if (format.empty())
{
format = "bp";
}
port = std::to_string(stoi(port) + mpiRank);
if (library == "zmq" || library == "ZMQ")
{
#ifdef ADIOS2_HAVE_ZEROMQ
wanTransport = std::make_shared<transport::WANZmq>(
ipAddress, portData, m_MPIComm, m_DebugMode);
ipAddress, port, m_MPIComm, m_DebugMode);
wanTransport->Open(streamNames[i], mode);
m_Transports.emplace(counter, wanTransport);
controlTransport = std::make_shared<transport::WANZmq>(
ipAddress, portControl, m_MPIComm, m_DebugMode);
controlTransport->Open(streamNames[i], mode);
m_ControlTransports.emplace_back(controlTransport);
if (mode == Mode::Read)
{
m_Listening = true;
m_ControlThreads.emplace_back(
std::thread(&DataMan::ReadThread, this, wanTransport,
controlTransport, format));
if (callback == "YES" || callback == "yes" ||
callback == "TRUE" || callback == "true")
{
m_Listening = true;
m_ReadThreads.emplace_back(
std::thread(&DataMan::ReadThread, this, wanTransport,
streamNames[i], paramsVector[i]));
}
}
++counter;
#else
......@@ -123,45 +121,35 @@ void DataMan::OpenWANTransports(const std::vector<std::string> &streamNames,
}
}
void DataMan::WriteWAN(const void *buffer, nlohmann::json jmsg)
void DataMan::WriteWAN(const void *buffer, size_t size)
{
if (m_CurrentTransport >= m_ControlTransports.size())
{
throw std::runtime_error("ERROR: No valid control transports found, "
"from DataMan::WriteWAN()");
}
if (m_CurrentTransport >= m_Transports.size())
{
throw std::runtime_error(
"ERROR: No valid transports found, from DataMan::WriteWAN()");
}
m_ControlTransports[m_CurrentTransport]->Write(jmsg.dump().c_str(),
jmsg.dump().size());
m_Transports[m_CurrentTransport]->Write(
reinterpret_cast<const char *>(buffer), jmsg["bytes"].get<size_t>());
reinterpret_cast<const char *>(buffer), size);
}
void DataMan::WriteWAN(const void *buffer, size_t size)
void DataMan::ReadWAN(void *buffer, size_t &size)
{
if (m_CurrentTransport >= m_Transports.size())
Transport::Status status;
for (int i = 0; i < m_Timeout * 1000; ++i)
{
throw std::runtime_error(
"ERROR: No valid transports found, from DataMan::WriteWAN()");
m_Transports[m_CurrentTransport]->IRead(static_cast<char *>(buffer),
m_BufferSize, status);
if (status.Bytes > 0)
{
size = status.Bytes;
return;
}
std::this_thread::sleep_for(std::chrono::microseconds(1000));
}
m_Transports[m_CurrentTransport]->Write(
reinterpret_cast<const char *>(buffer), size);
/*
// dumping file for debugging
std::ofstream bpfile("datamanW.bp", std::ios_base::binary);
bpfile.write(reinterpret_cast<const char *>(buffer), size);
bpfile.close();
*/
}
void DataMan::ReadWAN(void *buffer, nlohmann::json jmsg) {}
void DataMan::SetBP3Deserializer(format::BP3Deserializer &bp3Deserializer)
{
m_BP3Deserializer = &bp3Deserializer;
......@@ -175,45 +163,18 @@ void DataMan::SetCallback(adios2::Operator &callback)
}
void DataMan::ReadThread(std::shared_ptr<Transport> trans,
std::shared_ptr<Transport> ctl_trans,
const std::string format)
const std::string stream_name,
const Params trans_params)
{
if (format == "json" || format == "JSON")
std::string format(GetParameter("Format", trans_params, false, m_DebugMode,
"Transport Format Parameter"));
if (format.empty())
{
while (m_Listening)
{
char buffer[1024];
size_t bytes = 0;
nlohmann::json jmsg;
adios2::Transport::Status status;
ctl_trans->IRead(buffer, 1024, status, 0);
if (status.Bytes > 0)
{
std::string smsg(buffer);
jmsg = nlohmann::json::parse(smsg);
bytes = jmsg.value("bytes", 0);
if (bytes > 0)
{
std::vector<char> data(bytes);
trans->Read(data.data(), bytes);
std::string doid =
jmsg.value("doid", "Unknown Data Object");
std::string var = jmsg.value("var", "Unknown Variable");
std::string dtype =
jmsg.value("dtype", "Unknown Data Type");
std::vector<size_t> putshape =
jmsg.value("putshape", std::vector<size_t>());
if (m_Callback != nullptr &&
m_Callback->m_Type == "Signature2")
{
m_Callback->RunCallback2(data.data(), doid, var, dtype,
putshape);
}
}
}
}
format = "bp";
}
else if (format == "bp" || format == "BP")
if (format == "bp" || format == "BP")
{
while (m_Listening)
{
......@@ -231,7 +192,20 @@ void DataMan::ReadThread(std::shared_ptr<Transport> trans,
std::memcpy(m_BP3Deserializer->m_Data.m_Buffer.data(),
buffer.data(), status.Bytes);
/* write bp file for debugging */
const std::string dumpFile(
GetParameter("DumpFile", trans_params, false, m_DebugMode,
"Transport DumpFile Parameter"));
// TODO: Aggregation
if (dumpFile == "yes" || dumpFile == "YES" ||
dumpFile == "TRUE" || dumpFile == "true")
{
std::ofstream bpfile(stream_name, std::ios_base::binary);
bpfile.write(reinterpret_cast<const char *>(buffer.data()),
status.Bytes);
bpfile.close();
}
m_BP3Deserializer->ParseMetadata(m_BP3Deserializer->m_Data,
*m_IO);
......
......@@ -18,8 +18,6 @@
#include "adios2/toolkit/format/bp3/BP3.h"
#include "adios2/toolkit/transportman/TransportMan.h"
#include <json.hpp>
namespace adios2
{
namespace transportman
......@@ -38,8 +36,6 @@ public:
const std::vector<Params> &params,
const bool profile);
void WriteWAN(const void *buffer, nlohmann::json jmsg);
/**
* For BP Format
* @param buffer
......@@ -47,7 +43,7 @@ public:
*/
void WriteWAN(const void *buffer, size_t size);
void ReadWAN(void *buffer, nlohmann::json jmsg);
void ReadWAN(void *buffer, size_t &size);
/**
* Set BP3 deserializer private pointer m_BP3Deserializer, from Engine
......@@ -63,20 +59,19 @@ private:
IO *m_IO = nullptr;
Operator *m_Callback = nullptr;
void ReadThread(std::shared_ptr<Transport> trans,
std::shared_ptr<Transport> ctl_trans,
const std::string format);
const std::string stream_name, const Params trans_params);
void RunCallback(void *buffer, std::string doid, std::string var,
std::string dtype, std::vector<size_t> shape);
std::vector<std::shared_ptr<Transport>> m_ControlTransports;
std::vector<std::thread> m_ControlThreads;
std::vector<std::thread> m_ReadThreads;
std::vector<Params> m_TransportsParameters;
size_t m_CurrentTransport = 0;
bool m_Listening = false;
nlohmann::json m_JMessage;
size_t m_BufferSize = 1024 * 1024 * 1024;
/** Pick the appropriate default */
const int m_DefaultPort = 12306;
int m_Timeout = 5;
};
} // end namespace transportman
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment