/*
 * Distributed under the OSI-approved Apache License, Version 2.0. See
 * accompanying file Copyright.txt for details.
 *
 * DataMan.cpp
 *
 *  Created on: Jun 1, 2017
 *      Author: Jason Wang wangr1@ornl.gov
 */
#include "adios2/toolkit/transportman/dataman/DataMan.h"
#include "adios2/helper/adiosString.h"
#ifdef ADIOS2_HAVE_ZEROMQ
#include "adios2/toolkit/transport/wan/WANZmq.h"
#endif
{
namespace transportman
{
/**
 * Builds a DataMan transport manager.
 *
 * Performs no work of its own: both arguments are handed unchanged to the
 * TransportMan base-class constructor.
 *
 * @param mpiComm   MPI communicator forwarded to TransportMan
 * @param debugMode debug flag forwarded to TransportMan
 */
DataMan::DataMan(MPI_Comm mpiComm, const bool debugMode)
    : TransportMan(mpiComm, debugMode)
{
    // Intentionally empty: all initialisation happens in the base class.
}
void DataMan::OpenWANTransports(const std::string &name, const Mode openMode,
const std::vector<Params> ¶metersVector,
const bool profile)
{
for (const auto ¶meters : parametersVector)
{
Wang, Ruonan
committed
std::shared_ptr<Transport> wanTransport, controlTransport;
Wang, Ruonan
committed
// to be removed
for (auto &i : parameters)
std::cout << i.first << " " << i.second << std::endl;
const std::string type(GetParameter(
"type", parameters, true, m_DebugMode, "Transport Type Parameter"));
const std::string library(GetParameter("Library", parameters, true,
m_DebugMode,
"Transport Library Parameter"));
GetParameter("IPAddress", parameters, true, m_DebugMode,
"Transport IPAddress Parameter"));
std::string portControl(GetParameter("Port", parameters, false,
m_DebugMode,
"Transport Port Parameter"));
portControl = std::to_string(m_DefaultPort);
const std::string portData(std::to_string(stoi(portControl) + 1));
Wang, Ruonan
committed
std::string messageName(GetParameter("Name", parameters, false,
m_DebugMode,
"Transport Name Parameter"));
if (messageName.empty())
{
messageName = name;
}
wanTransport = std::make_shared<transport::WANZmq>(
ipAddress, portData, m_MPIComm, m_DebugMode);
Wang, Ruonan
committed
controlTransport = std::make_shared<transport::WANZmq>(
ipAddress, portControl, m_MPIComm, m_DebugMode);
#else
throw std::invalid_argument(
"ERROR: this version of ADIOS2 didn't compile with "
"ZMQ library, in call to Open\n");
#endif
}
else
{
throw std::invalid_argument("ERROR: wan library " +
library +
" not supported or not "
"provided in IO AddTransport, "
"in call to Open\n");
}
}
wanTransport->Open(messageName, openMode);
m_Transports.emplace(counter, std::move(wanTransport));
controlTransport->Open(messageName, openMode);
m_ControlTransports.push_back(std::move(controlTransport));
Wang, Ruonan
committed
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
}
}
/**
 * Sends one message through the currently selected transport pair.
 *
 * The JSON message is serialised and written to the control transport at
 * index m_CurrentTransport; the raw payload is then written to the data
 * transport at the same index. The payload length is taken from the "bytes"
 * field of the JSON message.
 *
 * @param buffer start of the payload; jmsg["bytes"] bytes are written from it
 * @param jmsg   message metadata; its "bytes" entry is read as a size_t
 */
void DataMan::WriteWAN(const void *buffer, nlohmann::json jmsg)
{
    // Serialise once and reuse the result for both pointer and length,
    // instead of calling dump() twice for the same message.
    const std::string header = jmsg.dump();
    m_ControlTransports[m_CurrentTransport]->Write(header.c_str(),
                                                   header.size());

    m_Transports[m_CurrentTransport]->Write(static_cast<const char *>(buffer),
                                            jmsg["bytes"].get<size_t>());
}
/**
 * Stores the callable in m_CallBack, replacing any previously stored one.
 *
 * @param callback callable taking (const void *, std::string, std::string,
 *                 std::string, Dims); received by value so the caller's
 *                 copy is left untouched
 */
void DataMan::SetCallback(std::function<void(const void *, std::string,
                                             std::string, std::string, Dims)>
                              callback)
{
    // The parameter is already a private copy; move it into the member
    // rather than copying the std::function a second time.
    m_CallBack = std::move(callback);
}
/**
 * Receive loop placeholder: spins for as long as m_Listening is true and
 * does nothing inside the loop yet.
 *
 * NOTE(review): with an empty body this loop busy-waits; presumably it is
 * meant to run on its own thread until the Read API exists - confirm with
 * the caller.
 *
 * @param trans     data transport (unused until reading is implemented)
 * @param ctl_trans control transport (unused until reading is implemented)
 */
void DataMan::ReadThread(std::shared_ptr<Transport> trans,
                         std::shared_ptr<Transport> ctl_trans)
{
    while (m_Listening)
    {
        // Waiting for the Read API to be implemented. Intended shape:
        //
        //     if (ctl_trans->Read() >= 0)
        //     {
        //         std::string smsg;
        //         nlohmann::json jmsg = json::parse(smsg);
        //     }
        //     else
        //     {
        //         usleep(1);
        //     }
    }
}
} // end namespace transportman
} // end namespace adios