Newer
Older
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* MdtmMan.cpp
*
* Created on: Apr 20, 2017
* Author: Jason Wang
*/
#include "MdtmMan.h"
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
Ruonan Wang
committed
int MdtmMan::init(json a_jmsg)
std::cout << " 1 MdtmMan::init " << m_channel_id << std::endl;
Ruonan Wang
committed
StreamMan::init(a_jmsg);
Ruonan Wang
committed
if (a_jmsg["pipe_prefix"].is_string())
Ruonan Wang
committed
m_pipepath = a_jmsg["pipe_prefix"].get<std::string>();
Ruonan Wang
committed
json pipe_desc;
Ruonan Wang
committed
pipe_desc["pipe_prefix"] = m_pipepath;
Ruonan Wang
committed
std::stringstream pname;
pname << m_pipename_prefix << m_channel_id;
m_pipename = pname.str();
m_full_pipename = m_pipepath + m_pipename;
// send JSON message to MDTM
if (m_channel_id == 0)
Ruonan Wang
committed
for (int i = 0; i < m_num_channels; ++i)
Ruonan Wang
committed
std::stringstream pipename;
pipename << m_pipename_prefix << i;
if (i == 0)
{
pipe_desc["pipe_names"] = {pipename.str()};
}
else
{
pipe_desc["pipe_names"].insert(pipe_desc["pipe_names"].end(),
pipename.str());
}
Ruonan Wang
committed
void *zmq_ipc_req = nullptr;
zmq_ipc_req = zmq_socket(m_zmq_context, ZMQ_REQ);
zmq_connect(zmq_ipc_req, "ipc:///tmp/ADIOS_MDTM_pipe");
zmq_send(zmq_ipc_req, pipe_desc.dump().c_str(),
pipe_desc.dump().length(), 0);
zmq_recv(zmq_ipc_req, buffer_return, sizeof(buffer_return), 0);
Ruonan Wang
committed
if (zmq_ipc_req)
{
zmq_close(zmq_ipc_req);
}
Ruonan Wang
committed
// Make pipes
mkdir(m_pipepath.c_str(), 0755);
std::cout << "making " << m_full_pipename << std::endl;
mkfifo(m_full_pipename.c_str(), 0666);
std::cout << "made " << m_full_pipename << std::endl;
Ruonan Wang
committed
if (m_stream_mode == "sender")
std::cout << "opening " << m_full_pipename << std::endl;
m_pipe_handler = open(m_full_pipename.c_str(), O_WRONLY);
std::cout << "opened " << m_full_pipename << std::endl;
Ruonan Wang
committed
if (m_stream_mode == "receiver")
m_pipe_handler = open(m_full_pipename.c_str(), O_RDONLY | O_NONBLOCK);
Ruonan Wang
committed
int MdtmMan::put(const void *a_data, json &a_jmsg)
Ruonan Wang
committed
a_jmsg["pipe"] = m_pipename;
put_begin(a_data, a_jmsg);
Ruonan Wang
committed
StreamMan::put_stream(a_data, a_jmsg);
put_end(a_data, a_jmsg);
Ruonan Wang
committed
int MdtmMan::get(void *a_data, json &a_jmsg) { return 0; }
void MdtmMan::on_put(std::shared_ptr<std::vector<char>> a_data)
{
write(m_pipe_handler, a_data->data(), a_data->size());
}
Ruonan Wang
committed
void MdtmMan::on_recv(json &a_jmsg)
jqueue.push(a_jmsg);
vqueue.push(std::vector<char>());
if (jqueue.front()["operation"] == "flush")
{
callback_cache();
vqueue.pop();
iqueue.pop();
if (jqueue.empty())
for (int outloop = 0; outloop < jqueue.size() * 2; outloop++)
{
if (jqueue.front()["operation"] == "put")
{
json &jmsg = jqueue.front();
size_t sendbytes = jmsg["sendbytes"].get<size_t>();
vqueue.front() = std::vector<char>(sendbytes);
while (iqueue.front() < sendbytes)
Ruonan Wang
committed
int ret =
read(m_pipe_handler, vqueue.front().data() + iqueue.front(),
sendbytes - iqueue.front());
iqueue.front() += ret;
if (iqueue.front() == sendbytes)
if (a_jmsg["compression_method"].is_string())
if (a_jmsg["compression_method"].get<std::string>() !=
"null")
{
auto_transform(vqueue.front(), a_jmsg);
}
if (a_jmsg["varshape"] == a_jmsg["putshape"])
{
std::cout << "callback_direct \n";
callback_direct(vqueue.front().data(), jmsg);
}
else
{
m_cache.put(vqueue.front().data(), jmsg);
}
vqueue.pop();
iqueue.pop();