-
Wang, Ruonan authored
fixed a few problems of DataMan/ZeroMQ, and added a simple Transport::Read() API to make the hello world workflow working again
Wang, Ruonan authoredfixed a few problems of DataMan/ZeroMQ, and added a simple Transport::Read() API to make the hello world workflow working again
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
DataManReader.cpp 3.28 KiB
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* DataManReader.cpp
*
* Created on: Feb 21, 2017
* Author: wfg
*/
#include "DataManReader.h"
#include "adios2/helper/adiosFunctions.h" //CSVToVector
namespace adios2
{
DataManReader::DataManReader(IO &io, const std::string &name,
const OpenMode openMode, MPI_Comm mpiComm)
: Engine("DataManReader", io, name, openMode, mpiComm), m_Man(mpiComm, true)
{
m_EndMessage = " in call to IO Open DataManReader " + m_Name + "\n";
Init();
}
void DataManReader::SetCallBack(
std::function<void(const void *, std::string, std::string, std::string,
Dims)>
callback)
{
m_CallBack = callback;
m_Man.SetCallback(callback);
}
void DataManReader::Close(const int transportIndex) {}
// PRIVATE
void DataManReader::Init()
{
auto itRealTime = m_IO.m_Parameters.find("real_time");
if (itRealTime != m_IO.m_Parameters.end())
{
if (itRealTime->second == "yes" || itRealTime->second == "true")
m_DoRealTime = true;
}
if (m_DoRealTime)
{
/**
* Lambda function that assigns a parameter in m_Method to a
* localVariable
* of type std::string
*/
auto lf_AssignString = [this](const std::string parameter,
std::string &localVariable) {
auto it = m_IO.m_Parameters.find(parameter);
if (it != m_IO.m_Parameters.end())
{
localVariable = it->second;
}
};
/**
* Lambda function that assigns a parameter in m_Method to a
* localVariable
* of type int
*/
auto lf_AssignInt = [this](const std::string parameter,
int &localVariable) {
auto it = m_IO.m_Parameters.find(parameter);
if (it != m_IO.m_Parameters.end())
{
localVariable = std::stoi(it->second);
}
};
auto is_number = [](const std::string &s) {
return !s.empty() && std::find_if(s.begin(), s.end(), [](char c) {
return !std::isdigit(c);
}) == s.end();
};
/*
json jmsg;
for (auto &i : m_IO.m_Parameters)
{
if (is_number(i.second))
{
jmsg[i.first] = std::stoi(i.second);
}
else
{
jmsg[i.first] = i.second;
}
}
jmsg["stream_mode"] = "receiver";
*/
int n_Transports = 1;
std::vector<Params> para(n_Transports);
for (unsigned int i = 0; i < para.size(); i++)
{
para[i]["type"] = "wan";
para[i]["transport"] = "zmq";
para[i]["name"] = "stream";
para[i]["ipaddress"] = "127.0.0.1";
}
m_Man.OpenWANTransports("zmq", adios2::OpenMode::Read, para, true);
std::string methodType;
int numChannels = 0;
lf_AssignString("method_type", methodType);
lf_AssignInt("num_channels", numChannels);
}
else
{
InitTransports();
}
}
} // end namespace adios