Newer
Older
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* DataMan.cpp
*
* Created on: Jun 1, 2017
* Author: Jason Wang wangr1@ornl.gov
*/
#include "DataMan.h"
#include "adios2/helper/adiosFunctions.h"
#ifdef ADIOS2_HAVE_ZEROMQ
#include "adios2/toolkit/transport/wan/WANZmq.h"
#endif
{
namespace transportman
{
/**
 * Constructs the DataMan transport manager.
 *
 * Both arguments are forwarded unchanged to the TransportMan base class;
 * the constructor does no other work.
 *
 * @param mpiComm   MPI communicator handed to the base class
 * @param debugMode debug flag handed to the base class
 */
DataMan::DataMan(MPI_Comm mpiComm, const bool debugMode)
    : TransportMan(mpiComm, debugMode)
{
    // Nothing else to initialise here.
}
/**
 * Stops the listener threads started by OpenWANTransports and waits for
 * them to finish.
 *
 * m_Listening is cleared first so that loops polling it can exit, then
 * every thread in m_ControlThreads is joined. Destroying a std::thread
 * that is still joinable calls std::terminate, so the join must happen
 * before the member container is destroyed.
 *
 * NOTE(review): m_Listening is read from the listener threads; confirm
 * in the header that its type makes this cross-thread write safe.
 */
DataMan::~DataMan()
{
    m_Listening = false;

    for (auto &controlThread : m_ControlThreads)
    {
        // Skip threads that were already joined or never started.
        if (controlThread.joinable())
        {
            controlThread.join();
        }
    }
}
// Opens one data transport and one control transport per entry of the
// parameters vector.
//
// For every parameter set the visible code:
//   - reads "TransportType", "Transport", "IPAddress" (all requested with
//     the third GetParameter argument set to true) and "Port", "Name",
//     "Format" (requested with that argument set to false);
//   - derives the data port as the control port + 1;
//   - defaults the format to "json" when it is empty;
//   - for TransportType "wan"/"WAN" and Transport "zmq"/"ZMQ", creates two
//     transport::WANZmq objects (data on portData, control on portControl),
//     opens both with the transport name and mode, stores them in
//     m_Transports / m_ControlTransports, sets m_Listening and starts a
//     std::thread running DataMan::ReadThread, kept in m_ControlThreads;
//   - throws std::invalid_argument when ZMQ support was not compiled in, or
//     when the requested wan transport is not supported.
//
// The `name` and `profile` arguments are not used in the visible text.
//
// NOTE(review): this text appears to have lost lines and picked up non-code
// artifacts (the "Wang, Ruonan" / "committed" lines). As shown there is no
// opening brace after the signature, "¶metersVector" / "¶meters" look like
// mis-decoded "&parametersVector" / "&parameters", and several braces are
// unbalanced. Confirm against the real file before relying on details.
void DataMan::OpenWANTransports(const std::string &name, const Mode mode,
const std::vector<Params> ¶metersVector,
const bool profile)
#ifdef ADIOS2_HAVE_ZEROMQ
size_t counter = 0; // remove MACRO when more libraries are added
#endif
for (const auto ¶meters : parametersVector)
{
Wang, Ruonan
committed
std::shared_ptr<Transport> wanTransport, controlTransport;
const std::string TransportType(
GetParameter("TransportType", parameters, true, m_DebugMode,
"Transport Type Parameter"));
const std::string transport(GetParameter(
"Transport", parameters, true, m_DebugMode, "Transport Parameter"));
// NOTE(review): `ipAddress` is used further down, but the head of its
// declaration is not visible; the next two lines look like its tail.
GetParameter("IPAddress", parameters, true, m_DebugMode,
"Transport IPAddress Parameter"));
std::string portControl(GetParameter("Port", parameters, false,
m_DebugMode,
"Transport Port Parameter"));
// NOTE(review): as shown, the "Port" value is always overwritten with
// m_DefaultPort; presumably an emptiness guard was lost here -- confirm.
portControl = std::to_string(m_DefaultPort);
// Data port is the control port + 1. stoi throws if portControl is not
// numeric.
const std::string portData(std::to_string(stoi(portControl) + 1));
Wang, Ruonan
committed
std::string transportName(GetParameter("Name", parameters, false,
m_DebugMode,
"Transport Name Parameter"));
std::string format(
GetParameter("Format", parameters, false, m_DebugMode, "Format"));
// Fall back to the JSON message format when none was given.
if (format.empty())
{
format = "json";
}
if (TransportType == "wan" || TransportType == "WAN")
if (transport == "zmq" || transport == "ZMQ")
// Data channel: bound to portData.
wanTransport = std::make_shared<transport::WANZmq>(
ipAddress, portData, m_MPIComm, m_DebugMode);
wanTransport->Open(transportName, mode);
// NOTE(review): `counter` is never incremented in the visible text, so
// every emplace would use key 0 -- confirm an increment was not lost.
m_Transports.emplace(counter, wanTransport);
Wang, Ruonan
committed
// Control channel: bound to portControl.
controlTransport = std::make_shared<transport::WANZmq>(
ipAddress, portControl, m_MPIComm, m_DebugMode);
controlTransport->Open(transportName, mode);
m_ControlTransports.emplace_back(controlTransport);
// NOTE(review): the condition guarding this brace block is not visible.
{
// Start the listener; it receives both transports and the format.
m_Listening = true;
m_ControlThreads.emplace_back(
std::thread(&DataMan::ReadThread, this, wanTransport,
controlTransport, format));
// NOTE(review): no matching #ifdef for this #else is visible inside the
// loop (the one above the loop is already closed by its own #endif).
#else
throw std::invalid_argument(
"ERROR: this version of ADIOS2 didn't compile with "
"ZMQ library, in call to Open\n");
#endif
}
else
{
// Any transport other than zmq/ZMQ is rejected.
throw std::invalid_argument("ERROR: wan transport " +
transport +
" not supported or not "
"provided in IO AddTransport, "
"in call to Open\n");
}
}
}
Wang, Ruonan
committed
}
}
/**
 * Sends one described payload over the currently selected transport pair.
 *
 * The JSON description is written to the control transport first, then
 * jmsg["bytes"] bytes of `buffer` are written to the data transport.
 *
 * @param buffer start of the payload; at least jmsg["bytes"] bytes are read
 * @param jmsg   message description; "bytes" gives the payload length
 * @throws std::runtime_error if m_CurrentTransport does not index a valid
 *         control transport or data transport
 */
void DataMan::WriteWAN(const void *buffer, nlohmann::json jmsg)
{
    if (m_CurrentTransport >= m_ControlTransports.size())
    {
        throw std::runtime_error("ERROR: No valid control transports found, "
                                 "from DataMan::WriteWAN()");
    }
    if (m_CurrentTransport >= m_Transports.size())
    {
        throw std::runtime_error(
            "ERROR: No valid transports found, from DataMan::WriteWAN()");
    }

    // Serialize once so the pointer and the length passed to Write come
    // from the same string and the JSON is not dumped twice.
    const std::string header = jmsg.dump();
    m_ControlTransports[m_CurrentTransport]->Write(header.c_str(),
                                                   header.size());

    m_Transports[m_CurrentTransport]->Write(
        static_cast<const char *>(buffer), jmsg["bytes"].get<size_t>());
}
void DataMan::WriteWAN(const void *buffer, size_t size)
if (m_CurrentTransport >= m_Transports.size())
{
throw std::runtime_error(
"ERROR: No valid transports found, from DataMan::WriteWAN()");
}
// m_Transports[m_CurrentTransport]->Write(static_cast<const char
// *>(buffer), size);
m_Transports[m_CurrentTransport]->Write(
reinterpret_cast<const char *>(buffer), size);
std::ofstream bpfile("datamanW.bp", std::ios_base::binary);
bpfile.write(reinterpret_cast<const char *>(buffer), size);
bpfile.close();
for (int i = 0; i < size / 4; i++)
{
std::cout << static_cast<const float *>(buffer)[i] << " ";
}
Wang, Ruonan
committed
/**
 * Placeholder for a synchronous WAN read; the body is empty and both
 * arguments are ignored.
 *
 * @param buffer unused
 * @param jmsg   unused
 */
void DataMan::ReadWAN(void *buffer, nlohmann::json jmsg)
{
    // Intentionally left empty.
}
/**
 * Stores the address of the given BP3 deserializer in m_BP3Deserializer.
 * Only the pointer is kept, so the referenced object must outlive its use
 * by this DataMan.
 *
 * @param bp3Deserializer deserializer to reference
 */
void DataMan::SetBP3Deserializer(format::BP3Deserializer &bp3Deserializer)
{
    format::BP3Deserializer *const deserializer = &bp3Deserializer;
    m_BP3Deserializer = deserializer;
}
/**
 * Stores the address of the given IO object in m_IO. Only the pointer is
 * kept, so the referenced object must outlive its use by this DataMan.
 *
 * @param io IO object to reference
 */
void DataMan::SetIO(IO &io)
{
    IO *const ioAddress = &io;
    m_IO = ioAddress;
}
/**
 * Registers the callback operator invoked by the listener thread.
 *
 * Stores the address of `callback` in m_Callback, mirroring SetIO and
 * SetBP3Deserializer. Only the pointer is kept, so the operator must
 * outlive its use by this DataMan.
 *
 * @param callback operator to invoke when data arrives
 */
void DataMan::SetCallback(adios2::Operator &callback)
{
    m_Callback = &callback;
}
// Listener routine started on a std::thread by OpenWANTransports.
//
// trans     : transport the payload bytes are read from
// ctl_trans : transport the JSON control messages are read from
// format    : "json"/"JSON" or "bp"/"BP"; any other value does nothing
//
// json branch (as visible): reads up to 1024 bytes from ctl_trans, parses
// them as JSON, reads "bytes" payload bytes from trans and, when a callback
// of type "Signature2" is registered, passes it the payload together with
// the "doid", "var", "dtype" and "putshape" fields.
//
// bp branch (as visible): while m_Listening is true, copies status.Bytes
// bytes into m_BP3Deserializer->m_Data, runs the callback, dumps the data
// to "datamanR.bp", parses the metadata into *m_IO and prints the available
// variables with their parameters to std::cout.
//
// NOTE(review): this text appears to have lost lines and picked up non-code
// artifacts (the "Wang, Ruonan" / "committed" lines and a run of bare
// numbers). Confirm against the real file before relying on details.
void DataMan::ReadThread(std::shared_ptr<Transport> trans,
std::shared_ptr<Transport> ctl_trans,
const std::string format)
Wang, Ruonan
committed
{
if (format == "json" || format == "JSON")
Wang, Ruonan
committed
{
std::cout << "json" << std::endl;
// NOTE(review): no loop head is visible before this brace; presumably a
// `while (m_Listening)` like the bp branch was lost -- confirm.
Wang, Ruonan
committed
{
char buffer[1024];
size_t bytes = 0;
nlohmann::json jmsg;
adios2::Transport::Status status;
// Read one control message of at most 1024 bytes.
ctl_trans->IRead(buffer, 1024, status, 0);
if (status.Bytes > 0)
Wang, Ruonan
committed
{
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
// NOTE(review): this constructor reads up to the first NUL byte, but
// `buffer` is not zero-initialised and status.Bytes is not used to bound
// the copy; consider std::string(buffer, status.Bytes).
std::string smsg(buffer);
jmsg = nlohmann::json::parse(smsg);
// A missing "bytes" field yields 0, which skips the payload read.
bytes = jmsg.value("bytes", 0);
if (bytes > 0)
{
std::vector<char> data(bytes);
trans->Read(data.data(), bytes);
// Descriptive fields fall back to placeholder strings when absent.
std::string doid =
jmsg.value("doid", "Unknown Data Object");
std::string var = jmsg.value("var", "Unknown Variable");
std::string dtype =
jmsg.value("dtype", "Unknown Data Type");
std::vector<size_t> putshape =
jmsg.value("putshape", std::vector<size_t>());
// Only callbacks registered as "Signature2" are invoked.
if (m_Callback != nullptr &&
m_Callback->m_Type == "Signature2")
{
m_Callback->RunCallback2(data.data(), doid, var, dtype,
putshape);
}
}
}
}
}
else if (format == "bp" || format == "BP")
{
while (m_Listening)
{
// char *buffer = new char[m_BufferSize];
std::vector<char> buffer(m_BufferSize);
// NOTE(review): `status` is used below, but neither its declaration nor
// the read that fills `buffer` is visible in this branch -- lines appear
// to be missing here.
{
m_BP3Deserializer->m_Data.Resize(
status.Bytes, "in DataMan Streaming Listener");
std::memcpy(m_BP3Deserializer->m_Data.m_Buffer.data(),
buffer.data(), status.Bytes);
/* TODO: remove this part */
// NOTE(review): unlike the json branch, m_Callback is dereferenced here
// without a nullptr check.
m_Callback->RunCallback2(buffer.data(), "ss", "rr", "char",
adios2::Dims{128});
// Dump the received bytes to a local file.
std::ofstream bpfile("datamanR.bp", std::ios_base::binary);
bpfile.write(m_BP3Deserializer->m_Data.m_Buffer.data(),
m_BP3Deserializer->m_Data.m_Buffer.size());
bpfile.close();
m_BP3Deserializer->ParseMetadata(*m_IO);
// Print every variable known to m_IO with its key/value parameters.
const auto variablesInfo = m_IO->GetAvailableVariables();
for (const auto &variableInfoPair : variablesInfo)
Wang, Ruonan
committed
{
std::cout << "Variable Name: " << variableInfoPair.first
<< "\n";
// NOTE(review): "¶meter" looks like a mis-decoded "&parameter".
for (const auto ¶meter : variableInfoPair.second)
{
std::cout << "\tKey: " << parameter.first
<< "\t Value: " << parameter.second << "\n";
}
Wang, Ruonan
committed
}
Wang, Ruonan
committed
}
}
}
}
} // end namespace transportman
} // end namespace adios2