diff --git a/source/dataman/CacheMan.cpp b/source/dataman/CacheMan.cpp index f53cfaa6d56adcb4fa0fa0129bd8bde0ffc950fe..d18c3eb140a278e2b8bccd502948ae0dae5842fa 100644 --- a/source/dataman/CacheMan.cpp +++ b/source/dataman/CacheMan.cpp @@ -19,14 +19,14 @@ int CacheItem::init(json a_jmsg) return 0; } -int CacheMan::put(const void *a_data, json a_jmsg) +int CacheMan::put(const void *a_data, json &a_jmsg) { std::string doid = a_jmsg["doid"].get<std::string>(); std::string var = a_jmsg["var"].get<std::string>(); return m_cache[doid][var].put(a_data, a_jmsg); } -int CacheItem::put(const void *a_data, json a_jmsg) +int CacheItem::put(const void *a_data, json &a_jmsg) { if (!m_initialized) diff --git a/source/dataman/CacheMan.h b/source/dataman/CacheMan.h index 9668f014f3f484fec86f0660851a5bbff33d45a6..0aaaacdef36ce02b169a81adac4bdfc7b7caa9a5 100644 --- a/source/dataman/CacheMan.h +++ b/source/dataman/CacheMan.h @@ -21,7 +21,7 @@ public: using json = nlohmann::json; int init(json a_jmsg); - virtual int put(const void *a_data, json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} void *get(); @@ -81,7 +81,7 @@ class CacheMan public: using json = nlohmann::json; - int put(const void *a_data, json a_jmsg); + int put(const void *a_data, json &a_jmsg); void *get(std::string doid, std::string var); void pop(); void push(); diff --git a/source/dataman/DataMan.cpp b/source/dataman/DataMan.cpp index 8bc55fcb79f8391c2b8f12e6a01c331fb084d214..d3ba9a83fc147368a1cf0409a54cb2a9970b37d0 100644 --- a/source/dataman/DataMan.cpp +++ b/source/dataman/DataMan.cpp @@ -12,16 +12,19 @@ int DataMan::init(json a_jmsg) { return 0; } -int DataMan::put(const void *a_data, std::string p_doid, std::string p_var, - std::string p_dtype, std::vector<size_t> p_putshape, - std::vector<size_t> p_varshape, std::vector<size_t> p_offset, - size_t p_timestep, int p_tolerance, int p_priority) +int DataMan::put_streams(const void *a_data, json &a_jmsg) { - return DataMan::put(a_data, p_doid, p_var, p_dtype, p_putshape, p_varshape, - p_offset, p_timestep, p_tolerance, p_priority); + a_jmsg["channel_id"] = m_stream_index; + m_stream_mans[m_stream_index]->put(a_data, a_jmsg); + ++m_stream_index; + if (m_stream_index >= m_stream_mans.size()) + { + m_stream_index = 0; + } + return 0; } -int DataMan::put(const void *a_data, json a_jmsg) +int DataMan::put(const void *a_data, json &a_jmsg) { a_jmsg["timestep"] = m_timestep; if (m_cache_size > 0) @@ -32,9 +35,10 @@ int DataMan::put(const void *a_data, json a_jmsg) else { put_begin(a_data, a_jmsg); + put_streams(a_data, a_jmsg); put_end(a_data, a_jmsg); } - + dump_profiling(); return 0; } @@ -57,32 +61,34 @@ void DataMan::add_stream(json a_jmsg) m_cache_size = a_jmsg["cachesize"].get<size_t>(); } - if (m_tolerance.size() < m_num_channels) + int num_channels = 1; + + if (a_jmsg["num_channels"].is_number()) { - for (int i = 0; i < m_num_channels; ++i) - { - m_tolerance.push_back(0); - } + num_channels = a_jmsg["num_channels"].get<int>(); } - if (m_priority.size() < m_num_channels) + else { - for (int i = 0; i < m_num_channels; ++i) - { - m_priority.push_back(100 / (i + 1)); - } + a_jmsg["num_channels"] = num_channels; } - auto man = get_man(method); - if (man) + for (int i = 0; i < num_channels; i++) { - man->init(a_jmsg); - this->add_next(method, man); - } - if (a_jmsg["compression_method"].is_string()) - { - if (a_jmsg["compression_method"] != "null") + a_jmsg["channel_id"] = i; + a_jmsg["local_port"] = a_jmsg["local_port"].get<int>() + 2; + a_jmsg["remote_port"] = a_jmsg["remote_port"].get<int>() + 2; + auto man = get_man(method); + if (man) { - add_man_to_path(a_jmsg["compression_method"], method, a_jmsg); + std::cout << a_jmsg.dump(4); + man->init(a_jmsg); + m_stream_mans.push_back(man); + } + if (a_jmsg["compression_method"].is_string()) + { + if (a_jmsg["compression_method"] != "null") + { + } } } } @@ -104,10 +110,10 @@ void DataMan::flush() { json jmsg = m_cache.get_jmsg(j, k); put_begin(m_cache.get(j, k), jmsg); + put_streams(m_cache.get(j, k), jmsg); put_end(m_cache.get(j, k), jmsg); } } - flush_next(); m_cache.pop(); } } @@ -116,10 +122,6 @@ void DataMan::flush() m_cache.push(); } } - else - { - flush_next(); - } } int DataMan::get(void *a_data, json &a_jmsg) { return 0; } diff --git a/source/dataman/DataMan.h b/source/dataman/DataMan.h index 20ccdc63a60545087130e0d59974321bbc10b919..f37b86d3b08165683889cf29e0a53c2fb775cd26 100644 --- a/source/dataman/DataMan.h +++ b/source/dataman/DataMan.h @@ -19,31 +19,22 @@ class DataMan : public DataManBase public: DataMan() = default; virtual ~DataMan() = default; - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); - virtual int get(void *p_data, json &p_jmsg); + virtual int init(json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); + virtual int get(void *a_data, json &a_jmsg); + int put_streams(const void *a_data, json &a_jmsg); void flush(); - void add_stream(json p_jmsg); - int put(const void *p_data, std::string p_doid, std::string p_var, - std::string p_dtype, std::vector<size_t> p_putshape, - std::vector<size_t> p_varshape, std::vector<size_t> p_offset, - size_t p_timestep, int p_tolerance = 0, int p_priority = 100); + void add_stream(json a_jmsg); void add_file(std::string p_method); std::string name() { return "DataManager"; } std::string type() { return "Manager"; } virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} private: - std::string m_local_ip; - std::string m_remote_ip; - int m_local_port = 0; - int m_remote_port = 0; - int m_num_channels = 0; - std::vector<int> m_tolerance; - std::vector<int> m_priority; CacheMan m_cache; size_t m_cache_size = 0; size_t m_timestep = 0; + int m_stream_index = 0; }; -#endif /* DATAMAN_H_ */ +#endif // DATAMAN_DATAMAN_H_ diff --git a/source/dataman/DataManBase.cpp b/source/dataman/DataManBase.cpp index 3012da8bba660730e131ef71e1228beb5502776c..5d498e31cd1758609dc201bf11d0002853332297 100644 --- a/source/dataman/DataManBase.cpp +++ b/source/dataman/DataManBase.cpp @@ -106,27 +106,6 @@ DataManBase::DataManBase() m_start_time = std::chrono::system_clock::now(); } -int DataManBase::put(const void *p_data, std::string p_doid, std::string p_var, - std::string p_dtype, std::vector<size_t> p_putshape, - std::vector<size_t> p_varshape, - std::vector<size_t> p_offset, size_t p_timestep, - int p_tolerance, int p_priority) -{ - json msg; - msg["doid"] = p_doid; - msg["var"] = p_var; - msg["dtype"] = p_dtype; - msg["putshape"] = p_putshape; - msg["putbytes"] = product(p_putshape, dsize(p_dtype)); - msg["varshape"] = p_varshape; - msg["varbytes"] = product(p_varshape, dsize(p_dtype)); - msg["offset"] = p_offset; - msg["timestep"] = p_timestep; - msg["tolerance"] = p_tolerance; - msg["priority"] = p_priority; - return put(p_data, msg); -} - int DataManBase::put_begin(const void *p_data, json &p_jmsg) { check_shape(p_jmsg); @@ -152,7 +131,6 @@ int DataManBase::put_end(const void *p_data, json &p_jmsg) m_profiling["total_mb"] = m_profiling["total_mb"].get<double>() + product(p_jmsg["varshape"], dsize(p_jmsg["dtype"])) / 1000000.0f; - std::cout << product(p_jmsg["varshape"], dsize(p_jmsg["dtype"])) << "\n"; duration = end - m_start_time; m_profiling["total_workflow_time"] = duration.count(); m_profiling["workflow_mbs"] = @@ -161,50 +139,23 @@ int DataManBase::put_end(const void *p_data, json &p_jmsg) m_profiling["manager_mbs"] = m_profiling["total_mb"].get<double>() / m_profiling["total_manager_time"].get<double>(); - put_next(p_data, p_jmsg); return 0; } -int DataManBase::get(void *p_data, std::string p_doid, std::string p_var, - std::string p_dtype, std::vector<size_t> p_getshape, - std::vector<size_t> p_varshape, - std::vector<size_t> p_offset, size_t p_timestep) -{ - json msg; - msg["doid"] = p_doid; - msg["var"] = p_var; - msg["dtype"] = p_dtype; - msg["getshape"] = p_getshape; - msg["varshape"] = p_varshape; - msg["offset"] = p_offset; - msg["timestep"] = p_timestep; - return get(p_data, msg); -} - -int DataManBase::get(void *p_data, std::string p_doid, std::string p_var, - std::string &p_dtype, std::vector<size_t> &p_varshape, - size_t &p_timestep) -{ - json msg; - msg["doid"] = p_doid; - msg["var"] = p_var; - return get(p_data, msg); -} - void DataManBase::reg_callback( std::function<void(const void *, std::string, std::string, std::string, std::vector<size_t>)> cb) { - if (m_next.empty()) + if (m_stream_mans.empty()) { m_callback = cb; } else { - for (const auto &i : m_next) + for (const auto &i : m_stream_mans) { - i.second->reg_callback(cb); + i->reg_callback(cb); } } } @@ -229,36 +180,6 @@ void DataManBase::dump(const void *p_data, json p_jmsg, std::ostream &out) out << std::endl; } -void DataManBase::add_next(std::string p_name, - std::shared_ptr<DataManBase> p_next) -{ - m_next[p_name] = p_next; -} - -void DataManBase::remove_next(std::string p_name) { m_next.erase(p_name); } - -bool DataManBase::have_next() -{ - if (m_next.empty()) - { - return false; - } - else - { - return true; - } -} - -void DataManBase::print_next(std::ostream &out) -{ - for (const auto &i : m_next) - { - out << i.second->name() << " -> "; - i.second->print_next(); - out << std::endl; - } -} - bool DataManBase::auto_transform(std::vector<char> &a_data, json &a_jmsg) { if (a_jmsg["compression_method"].is_string() && @@ -284,40 +205,6 @@ bool DataManBase::auto_transform(std::vector<char> &a_data, json &a_jmsg) } } -void DataManBase::add_man_to_path(std::string p_new, std::string p_path, - json p_jmsg) -{ - if (m_next.count(p_path) > 0) - { - auto man = get_man(p_new); - if (man) - { - man->init(p_jmsg); - man->add_next(p_path, m_next[p_path]); - this->add_next(p_new, man); - this->remove_next(p_path); - } - } -} - -int DataManBase::flush_next() -{ - for (const auto &i : m_next) - { - i.second->flush(); - } - return 0; -} - -int DataManBase::put_next(const void *p_data, json p_jmsg) -{ - for (const auto &i : m_next) - { - i.second->put(p_data, p_jmsg); - } - return 0; -} - std::shared_ptr<DataManBase> DataManBase::get_man(std::string method) { try @@ -491,23 +378,6 @@ size_t DataManBase::dsize(std::string dtype) return 0; } -nlohmann::json DataManBase::atoj(unsigned int *array) -{ - json j; - if (array) - { - if (array[0] > 0) - { - j = {array[1]}; - for (unsigned int i = 2; i <= array[0]; ++i) - { - j.insert(j.end(), array[i]); - } - } - } - return j; -} - int DataManBase::closest(int v, json j, bool up) { int s = 100, k = 0, t; @@ -559,3 +429,5 @@ void DataManBase::check_shape(json &p_jmsg) p_jmsg["varbytes"] = product(varshape, dsize(p_jmsg["dtype"].get<std::string>())); } + +void DataManBase::dump_profiling() { logging(m_profiling.dump(4)); } diff --git a/source/dataman/DataManBase.h b/source/dataman/DataManBase.h index 8ea2db345b25aaf9eda7bff371163c5bf0b2a538..37780ae7afab5abfcaebff592da2c9c355797ec9 100644 --- a/source/dataman/DataManBase.h +++ b/source/dataman/DataManBase.h @@ -30,62 +30,31 @@ public: using json = nlohmann::json; DataManBase(); - int put(const void *p_data, std::string p_doid, std::string p_var, - std::string p_dtype, std::vector<size_t> p_putshape, - std::vector<size_t> p_varshape, std::vector<size_t> p_offset, - size_t p_timestep, int p_tolerance = 0, int p_priority = 100); + virtual int put_begin(const void *a_data, json &a_jmsg); + virtual int put_end(const void *a_data, json &a_jmsg); - virtual int put_begin(const void *p_data, json &p_jmsg); - - virtual int put_end(const void *p_data, json &p_jmsg); - - virtual int put(const void *p_data, json p_jmsg) = 0; - - int get(void *p_data, std::string p_doid, std::string p_var, - std::string p_dtype, std::vector<size_t> p_getshape, - std::vector<size_t> p_varshape, std::vector<size_t> p_offset, - size_t p_timestep); - - int get(void *p_data, std::string p_doid, std::string p_var, - std::string &p_dtype, std::vector<size_t> &p_varshape, - size_t &p_timestep); - - virtual int get(void *p_data, json &p_jmsg) = 0; - virtual int init(json p_jmsg) = 0; + virtual int put(const void *a_data, json &a_jmsg) = 0; + virtual int get(void *a_data, json &a_jmsg) = 0; + virtual int init(json a_jmsg) = 0; virtual void flush() = 0; virtual std::string name() = 0; virtual std::string type() = 0; void reg_callback(std::function<void(const void *, std::string, std::string, std::string, std::vector<size_t>)> cb); - - void dump(const void *p_data, json p_jmsg, std::ostream &out = std::cout); - - void add_next(std::string p_name, std::shared_ptr<DataManBase> p_next); - - void remove_next(std::string p_name); - - bool have_next(); - - void print_next(std::ostream &out = std::cout); - + void dump(const void *a_data, json a_jmsg, std::ostream &out = std::cout); virtual void transform(std::vector<char> &a_data, json &a_jmsg) = 0; + void dump_profiling(); protected: bool auto_transform(std::vector<char> &a_data, json &a_jmsg); - void add_man_to_path(std::string p_new, std::string p_path, json p_jmsg); - - virtual int flush_next(); - - virtual int put_next(const void *p_data, json p_jmsg); - std::shared_ptr<DataManBase> get_man(std::string method); void logging(std::string p_msg, std::string p_man = "", std::ostream &out = std::cout); - bool check_json(json p_jmsg, std::vector<std::string> p_strings, + bool check_json(json a_jmsg, std::vector<std::string> p_strings, std::string p_man = ""); size_t product(size_t *shape); @@ -94,25 +63,22 @@ protected: size_t dsize(std::string dtype); - json atoj(unsigned int *array); - int closest(int v, json j, bool up); - void check_shape(json &p_jmsg); + void check_shape(json &a_jmsg); std::function<void(const void *, std::string, std::string, std::string, std::vector<size_t>)> m_callback; - std::map<std::string, std::shared_ptr<DataManBase>> m_next; + std::vector<std::shared_ptr<DataManBase>> m_stream_mans; private: struct ManagerLibrary; std::unordered_map<std::string, ManagerLibrary *> m_LoadedManagers; - json m_profiling; std::chrono::time_point<std::chrono::system_clock> m_start_time; std::chrono::time_point<std::chrono::system_clock> m_step_time; - bool m_profiling_enabled = false; + json m_profiling; }; #endif /* DATAMANBASE_H_ */ diff --git a/source/dataman/DumpMan.cpp b/source/dataman/DumpMan.cpp index b87c9ba02bd8399efa8c02e00728b5c2c88c5dd1..b3ab55f98c7595a280aeefbb7e0a93c9dd10886b 100644 --- a/source/dataman/DumpMan.cpp +++ b/source/dataman/DumpMan.cpp @@ -10,53 +10,53 @@ #include "DumpMan.h" -int DumpMan::init(json p_jmsg) +int DumpMan::init(json a_jmsg) { - if (p_jmsg["dumping"].is_boolean()) + if (a_jmsg["dumping"].is_boolean()) { - m_dumping = p_jmsg["dumping"].get<bool>(); + m_dumping = a_jmsg["dumping"].get<bool>(); } return 0; } -int DumpMan::get(void *p_data, json &p_jmsg) { return 0; } +int DumpMan::get(void *a_data, json &a_jmsg) { return 0; } -int DumpMan::put(const void *p_data, json p_jmsg) +int DumpMan::put(const void *a_data, json &a_jmsg) { - put_begin(p_data, p_jmsg); + put_begin(a_data, a_jmsg); if (!m_dumping) { return 1; } - if (!check_json(p_jmsg, {"doid", "var", "dtype", "putshape"})) + if (!check_json(a_jmsg, {"doid", "var", "dtype", "putshape"})) { return -1; } - std::string doid = p_jmsg["doid"]; - std::string var = p_jmsg["var"]; - std::string dtype = p_jmsg["dtype"]; + std::string doid = a_jmsg["doid"]; + std::string var = a_jmsg["var"]; + std::string dtype = a_jmsg["dtype"]; std::vector<size_t> putshape = - p_jmsg["putshape"].get<std::vector<size_t>>(); + a_jmsg["putshape"].get<std::vector<size_t>>(); std::vector<size_t> varshape = - p_jmsg["varshape"].get<std::vector<size_t>>(); - std::vector<size_t> offset = p_jmsg["offset"].get<std::vector<size_t>>(); + a_jmsg["varshape"].get<std::vector<size_t>>(); + std::vector<size_t> offset = a_jmsg["offset"].get<std::vector<size_t>>(); int numbers_to_print = 100; if (numbers_to_print > product(putshape, 1)) { numbers_to_print = product(putshape, 1); } size_t putbytes = product(putshape, dsize(dtype)); - size_t sendbytes = p_jmsg["sendbytes"].get<size_t>(); + size_t sendbytes = a_jmsg["sendbytes"].get<size_t>(); - std::cout << p_jmsg.dump(4) << std::endl; + std::cout << a_jmsg.dump(4) << std::endl; std::cout << "total MBs = " << product(putshape, dsize(dtype)) / 1000000 << std::endl; - std::vector<char> data(static_cast<const char *>(p_data), - static_cast<const char *>(p_data) + sendbytes); + std::vector<char> data(static_cast<const char *>(a_data), + static_cast<const char *>(a_data) + sendbytes); - auto_transform(data, p_jmsg); + auto_transform(data, a_jmsg); void *data_to_print = data.data(); for (size_t i = 0; i < numbers_to_print; ++i) @@ -72,7 +72,7 @@ int DumpMan::put(const void *p_data, json p_jmsg) } std::cout << std::endl; - put_end(p_data, p_jmsg); + put_end(a_data, a_jmsg); return 0; } diff --git a/source/dataman/DumpMan.h b/source/dataman/DumpMan.h index bc06477557fe816aa36dbbf1f75413fa8351a398..7dd27c21213549a22cf783ff3a11f26c44c5aa13 100644 --- a/source/dataman/DumpMan.h +++ b/source/dataman/DumpMan.h @@ -19,9 +19,9 @@ public: DumpMan() = default; virtual ~DumpMan() = default; - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); - virtual int get(void *p_data, json &p_jmsg); + virtual int init(json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); + virtual int get(void *a_data, json &a_jmsg); void flush(); std::string name() { return "DumpMan"; } std::string type() { return "Dump"; } diff --git a/source/dataman/MdtmMan.cpp b/source/dataman/MdtmMan.cpp index 5707861de4d48469435d8a6522a7221eced9568a..2f41d40c669d4f07439a31f437d4a112e4132517 100644 --- a/source/dataman/MdtmMan.cpp +++ b/source/dataman/MdtmMan.cpp @@ -16,134 +16,88 @@ #include <zmq.h> -MdtmMan::~MdtmMan() +int MdtmMan::init(json a_jmsg) { - if (zmq_ipc_req) - { - zmq_close(zmq_ipc_req); - } -} -int MdtmMan::init(json p_jmsg) -{ + StreamMan::init(a_jmsg); - StreamMan::init(p_jmsg); - - if (p_jmsg["pipe_prefix"].is_string()) - { - pipe_desc["pipe_prefix"] = p_jmsg["pipe_prefix"].get<std::string>(); - } - else + if (a_jmsg["pipe_prefix"].is_string()) { - pipe_desc["pipe_prefix"] = "/tmp/MdtmManPipes/"; + m_pipepath = a_jmsg["pipe_prefix"].get<std::string>(); } + json pipe_desc; pipe_desc["operation"] = "init"; + pipe_desc["pipe_prefix"] = m_pipepath; pipe_desc["mode"] = m_stream_mode; - std::string pipename_prefix = "MdtmManPipe"; - for (int i = 0; i < m_num_channels; ++i) + std::stringstream pname; + pname << m_pipename_prefix << m_channel_id; + m_pipename = pname.str(); + m_full_pipename = m_pipepath + m_pipename; + + // send JSON message to MDTM + if (m_channel_id == 0) { - std::stringstream pipename; - pipename << pipename_prefix << i; - if (i == 0) - { - pipe_desc["pipe_names"] = {pipename.str()}; - pipe_desc["loss_tolerance"] = {m_tolerance[i]}; - pipe_desc["priority"] = {m_priority[i]}; - } - else + for (int i = 0; i < m_num_channels; ++i) { - pipe_desc["pipe_names"].insert(pipe_desc["pipe_names"].end(), - pipename.str()); - pipe_desc["loss_tolerance"].insert( - pipe_desc["loss_tolerance"].end(), m_tolerance[i]); - pipe_desc["priority"].insert(pipe_desc["priority"].end(), - m_priority[i]); + std::stringstream pipename; + pipename << m_pipename_prefix << i; + if (i == 0) + { + pipe_desc["pipe_names"] = {pipename.str()}; + } + else + { + pipe_desc["pipe_names"].insert(pipe_desc["pipe_names"].end(), + pipename.str()); + } } - } - - // ZMQ_DataMan_MDTM - if (m_stream_mode == "sender") - { - zmq_ipc_req = zmq_socket(zmq_context, ZMQ_REQ); + void *zmq_ipc_req = nullptr; + zmq_ipc_req = zmq_socket(m_zmq_context, ZMQ_REQ); zmq_connect(zmq_ipc_req, "ipc:///tmp/ADIOS_MDTM_pipe"); char buffer_return[10]; zmq_send(zmq_ipc_req, pipe_desc.dump().c_str(), pipe_desc.dump().length(), 0); zmq_recv(zmq_ipc_req, buffer_return, sizeof(buffer_return), 0); + if (zmq_ipc_req) + { + zmq_close(zmq_ipc_req); + } } - // Pipes - mkdir(pipe_desc["pipe_prefix"].get<std::string>().c_str(), 0755); - for (const auto &i : - pipe_desc["pipe_names"].get<std::vector<std::string>>()) + // Make pipes + mkdir(m_pipepath.c_str(), 0755); + mkfifo(m_full_pipename.c_str(), 0666); + + if (m_stream_mode == "sender") { - std::string filename = pipe_desc["pipe_prefix"].get<std::string>() + i; - mkfifo(filename.c_str(), 0666); + m_pipe_handler = open(m_full_pipename.c_str(), O_WRONLY); } - - for (int i = 0; i < m_num_channels; ++i) + if (m_stream_mode == "receiver") { - std::stringstream pipename; - pipename << pipename_prefix << i; - std::string fullpipename = - pipe_desc["pipe_prefix"].get<std::string>() + pipename.str(); - if (m_stream_mode == "sender") - { - int fp = open(fullpipename.c_str(), O_WRONLY); - pipes.push_back(fp); - } - if (m_stream_mode == "receiver") - { - int fp = open(fullpipename.c_str(), O_RDONLY | O_NONBLOCK); - pipes.push_back(fp); - } - pipenames.push_back(pipename.str()); + m_pipe_handler = open(m_full_pipename.c_str(), O_RDONLY | O_NONBLOCK); } return 0; } -int MdtmMan::put(const void *a_data, json a_jmsg) +int MdtmMan::put(const void *a_data, json &a_jmsg) { + a_jmsg["pipe"] = m_pipename; put_begin(a_data, a_jmsg); - - // determine pipe to use - int priority = 100; - if (a_jmsg["priority"].is_number_integer()) - { - priority = a_jmsg["priority"].get<int>(); - } - int index; - if (m_parallel_mode == "round") - { - if (m_current_channel == m_num_channels - 1) - { - index = 0; - m_current_channel = 0; - } - else - { - m_current_channel++; - index = m_current_channel; - } - } - else if (m_parallel_mode == "priority") - { - index = closest(priority, pipe_desc["priority"], true); - } - a_jmsg["pipe"] = pipe_desc["pipe_names"][index]; - - StreamMan::put(a_data, a_jmsg); - size_t sendbytes = a_jmsg["sendbytes"].get<size_t>(); - write(pipes[index], a_data, sendbytes); + StreamMan::put_stream(a_data, a_jmsg); put_end(a_data, a_jmsg); return 0; } -int MdtmMan::get(void *p_data, json &p_jmsg) { return 0; } +int MdtmMan::get(void *a_data, json &a_jmsg) { return 0; } + +void MdtmMan::on_put(std::shared_ptr<std::vector<char>> a_data) +{ + write(m_pipe_handler, a_data->data(), a_data->size()); +} -void MdtmMan::on_recv(json a_jmsg) +void MdtmMan::on_recv(json &a_jmsg) { // push new request @@ -176,27 +130,13 @@ void MdtmMan::on_recv(json a_jmsg) size_t sendbytes = jmsg["sendbytes"].get<size_t>(); vqueue.front() = std::vector<char>(sendbytes); - // determine the pipe for the head request - if (jmsg == nullptr) - { - break; - } - int pipeindex = 0; - for (int i = 0; i < pipenames.size(); ++i) - { - if (jmsg["pipe"].get<std::string>() == pipenames[i]) - { - pipeindex = i; - } - } - // read the head request int error_times = 0; while (iqueue.front() < sendbytes) { - int ret = read(pipes[pipeindex], - vqueue.front().data() + iqueue.front(), - sendbytes - iqueue.front()); + int ret = + read(m_pipe_handler, vqueue.front().data() + iqueue.front(), + sendbytes - iqueue.front()); if (ret > 0) { iqueue.front() += ret; diff --git a/source/dataman/MdtmMan.h b/source/dataman/MdtmMan.h index 4a29da319fd1287a65312c8f066d4144d19166c3..59f96e55d2d721a76fc99354f3c2a6b78a276f83 100644 --- a/source/dataman/MdtmMan.h +++ b/source/dataman/MdtmMan.h @@ -19,23 +19,25 @@ class MdtmMan : public StreamMan { public: MdtmMan() = default; - virtual ~MdtmMan(); + virtual ~MdtmMan() = default; - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); - virtual int get(void *p_data, json &p_jmsg); + virtual int init(json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); + virtual int get(void *a_data, json &a_jmsg); virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} - void on_recv(json msg); + virtual void on_recv(json &a_msg); + virtual void on_put(std::shared_ptr<std::vector<char>> a_data); std::string name() { return "MdtmMan"; } private: - void *zmq_ipc_req = nullptr; int zmq_msg_size = 1024; - json pipe_desc; + std::string m_pipepath = "/tmp/MdtmManPipes/"; + std::string m_pipename_prefix = "MdtmManPipe"; + std::string m_pipename; + std::string m_full_pipename; + int m_pipe_handler; std::string getmode = "callback"; - std::vector<int> pipes; - std::vector<std::string> pipenames; std::queue<json> jqueue; std::queue<std::vector<char>> vqueue; std::queue<int> iqueue; diff --git a/source/dataman/StreamMan.cpp b/source/dataman/StreamMan.cpp index 276011cddb991db590aecf4f392b7347b60b9543..a87de51fc809e8340a503bafc4fd845bcae1c2c2 100644 --- a/source/dataman/StreamMan.cpp +++ b/source/dataman/StreamMan.cpp @@ -19,18 +19,29 @@ StreamMan::~StreamMan() { - if (zmq_meta) + if (m_zmq_rep) { - zmq_close(zmq_meta); + zmq_close(m_zmq_rep); } - if (zmq_context) + if (m_zmq_req) { - zmq_ctx_destroy(zmq_context); + zmq_close(m_zmq_req); } - zmq_meta_rep_thread_active = false; - if (zmq_meta_rep_thread.joinable()) + if (m_zmq_context) { - zmq_meta_rep_thread.join(); + zmq_ctx_destroy(m_zmq_context); + } + + m_zmq_rep_thread_active = false; + if (m_zmq_rep_thread.joinable()) + { + m_zmq_rep_thread.join(); + } + + m_zmq_req_thread_active = false; + if (m_zmq_req_thread.joinable()) + { + m_zmq_req_thread.join(); } } @@ -54,38 +65,41 @@ int StreamMan::init(json p_jmsg) { m_clean_mode = p_jmsg["clean_mode"]; } - m_tolerance.assign(m_num_channels, 0); - m_priority.assign(m_num_channels, 100); - if (p_jmsg["num_channels"].is_number_integer()) + if (p_jmsg["tolerance"].is_number()) { - m_num_channels = p_jmsg["num_channels"].get<int>(); + m_tolerance = p_jmsg["tolerance"].get<int>(); } - if (p_jmsg["tolerance"] != nullptr) + if (p_jmsg["priority"].is_number()) { - m_tolerance = p_jmsg["tolerance"].get<std::vector<int>>(); + m_priority = p_jmsg["priority"].get<int>(); } - if (p_jmsg["priority"] != nullptr) + if (p_jmsg["num_channels"].is_number()) { - m_priority = p_jmsg["priority"].get<std::vector<int>>(); + m_num_channels = p_jmsg["num_channels"].get<int>(); + } + if (p_jmsg["channel_id"].is_number()) + { + m_channel_id = p_jmsg["channel_id"].get<int>(); } - if (!zmq_context) + if (!m_zmq_context) { - zmq_context = zmq_ctx_new(); - zmq_meta = zmq_socket(zmq_context, ZMQ_PAIR); + m_zmq_context = zmq_ctx_new(); if (m_stream_mode == "sender") { - zmq_connect(zmq_meta, remote_address.c_str()); - logging("StreamMan::init " + remote_address + " connected"); + m_zmq_req = zmq_socket(m_zmq_context, ZMQ_REQ); + zmq_connect(m_zmq_req, remote_address.c_str()); + logging("Connecting " + remote_address + " ..."); } else if (m_stream_mode == "receiver") { - zmq_bind(zmq_meta, local_address.c_str()); - logging("StreamMan::init " + local_address + " bound"); + m_zmq_rep = zmq_socket(m_zmq_context, ZMQ_REP); + zmq_bind(m_zmq_rep, local_address.c_str()); + logging("Binding " + local_address + " ..."); + m_zmq_rep_thread_active = true; + m_zmq_rep_thread = + std::thread(&StreamMan::zmq_rep_thread_func, this); } - zmq_meta_rep_thread_active = true; - zmq_meta_rep_thread = - std::thread(&StreamMan::zmq_meta_rep_thread_func, this); } return 0; } @@ -138,15 +152,23 @@ void StreamMan::flush() { json msg; msg["operation"] = "flush"; - zmq_send(zmq_meta, msg.dump().c_str(), msg.dump().length(), 0); + char ret[10]; + zmq_send(m_zmq_req, msg.dump().c_str(), msg.dump().length(), 0); + zmq_recv(m_zmq_req, ret, 10, 0); +} + +void StreamMan::zmq_req_thread_func(std::shared_ptr<std::vector<char>> a_data) +{ + on_put(a_data); } -void StreamMan::zmq_meta_rep_thread_func() +void StreamMan::zmq_rep_thread_func() { - while (zmq_meta_rep_thread_active) + while (m_zmq_rep_thread_active) { char msg[1024] = ""; - int ret = zmq_recv(zmq_meta, msg, 1024, ZMQ_NOBLOCK); + int ret = zmq_recv(m_zmq_rep, msg, 1024, ZMQ_NOBLOCK); + zmq_send(m_zmq_rep, "OK", 4, 0); std::string smsg = msg; if (ret >= 0) { @@ -154,13 +176,30 @@ void StreamMan::zmq_meta_rep_thread_func() logging("StreamMan::zmq_meta_rep_thread_func: \n" + jmsg.dump(4)); on_recv(jmsg); } - usleep(1); + else + { + usleep(1); + } } } -int StreamMan::put(const void *p_data, json p_jmsg) +int StreamMan::put_stream(const void *a_data, json a_jmsg) { - p_jmsg["operation"] = "put"; - zmq_send(zmq_meta, p_jmsg.dump().c_str(), p_jmsg.dump().length(), 0); + if (m_zmq_req_thread.joinable()) + { + m_zmq_req_thread.join(); + } + a_jmsg["operation"] = "put"; + char ret[10]; + zmq_send(m_zmq_req, a_jmsg.dump().c_str(), a_jmsg.dump().length(), 0); + zmq_recv(m_zmq_req, ret, 10, 0); + + // copy + std::shared_ptr<std::vector<char>> data = + std::make_shared<std::vector<char>>(); + data->resize(a_jmsg["sendbytes"].get<size_t>()); + std::memcpy(data->data(), a_data, a_jmsg["sendbytes"].get<size_t>()); + + m_zmq_req_thread = std::thread(&StreamMan::zmq_req_thread_func, this, data); return 0; } diff --git a/source/dataman/StreamMan.h b/source/dataman/StreamMan.h index 9f02603cb9dc1bb5832945c7dc2f48600cbcbef7..d0cffdee4b2d7a65cb6023d92f953e8b40c4ec74 100644 --- a/source/dataman/StreamMan.h +++ b/source/dataman/StreamMan.h @@ -22,33 +22,31 @@ public: StreamMan() = default; virtual ~StreamMan(); - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); - virtual void on_recv(json msg) = 0; + virtual int init(json a_jmsg); + virtual void on_recv(json &a_msg) = 0; + virtual void on_put(std::shared_ptr<std::vector<char>> a_data) = 0; void flush(); virtual std::string type() { return "Stream"; } protected: - void *zmq_context = NULL; - CacheMan m_cache; int callback_direct(const void *a_data, json &a_jmsg); int callback_cache(); + int put_stream(const void *a_data, json a_jmsg); + void *m_zmq_context = nullptr; + CacheMan m_cache; std::string m_get_mode = "callback"; std::string m_stream_mode; std::string m_local_ip; std::string m_remote_ip; int m_local_port; int m_remote_port; - int m_num_channels = 10; - std::vector<int> m_tolerance; - std::vector<int> m_priority; + int m_tolerance = 0; + int m_priority = 100; + int m_channel_id = 0; + int m_num_channels = 1; std::string m_clean_mode = "nan"; - // parallel - std::string m_parallel_mode = "round"; // round, priority - int m_current_channel = 0; - inline std::string make_address(std::string ip, int port, std::string protocol) { @@ -58,10 +56,15 @@ protected: } private: - void *zmq_meta = NULL; - void zmq_meta_rep_thread_func(); - bool zmq_meta_rep_thread_active; - std::thread zmq_meta_rep_thread; + void zmq_rep_thread_func(); + std::thread m_zmq_rep_thread; + void *m_zmq_rep = nullptr; + bool m_zmq_rep_thread_active = false; + + void zmq_req_thread_func(std::shared_ptr<std::vector<char>> a_data); + std::thread m_zmq_req_thread; + void *m_zmq_req = nullptr; + bool m_zmq_req_thread_active = false; }; #endif diff --git a/source/dataman/TemporalMan.cpp b/source/dataman/TemporalMan.cpp index c87bf7b102e9ee01c62582aa6541088e4bf69d86..e60081bf0357bd2c43b3f2f506bd4d9d2ae3b6bc 100644 --- a/source/dataman/TemporalMan.cpp +++ b/source/dataman/TemporalMan.cpp @@ -12,7 +12,7 @@ int TemporalMan::init(json p_jmsg) { return 0; } -int TemporalMan::put(const void *p_data, json p_jmsg) +int TemporalMan::put(const void *p_data, json &p_jmsg) { put_begin(p_data, p_jmsg); put_end(p_data, p_jmsg); diff --git a/source/dataman/TemporalMan.h b/source/dataman/TemporalMan.h index 5f0362d1121f5a24f45a782eceee7de4cea203a7..1ced811da9df0e7b7d0e78b0e18d7c61e5da9a61 100644 --- a/source/dataman/TemporalMan.h +++ b/source/dataman/TemporalMan.h @@ -18,9 +18,9 @@ class TemporalMan : public CompressMan public: TemporalMan() = default; virtual ~TemporalMan() = default; - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); - virtual int get(void *p_data, json &p_jmsg); + virtual int init(json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); + virtual int get(void *a_data, json &a_jmsg); virtual void flush(); virtual void transform(std::vector<char> &a_data, json &a_jmsg); std::string name() { return "TemporalMan"; } diff --git a/source/dataman/ZfpMan.cpp b/source/dataman/ZfpMan.cpp index 6a2b1291b271f604c727c166eae8ae197bac5114..54c5ef48c799579c70c38464a3d43a75a7ed2f0a 100644 --- a/source/dataman/ZfpMan.cpp +++ b/source/dataman/ZfpMan.cpp @@ -21,30 +21,28 @@ int ZfpMan::init(json a_jmsg) return 0; } -int ZfpMan::put(const void *a_data, json a_jmsg) +int ZfpMan::put(const void *a_data, json &a_jmsg) { put_begin(a_data, a_jmsg); - std::vector<char> compressed_data; if (check_json(a_jmsg, {"doid", "var", "dtype", "putshape"}, "ZfpMan")) { - if (not a_jmsg["compression_rate"].is_number()) - { - a_jmsg["compression_rate"] = m_compression_rate; - } compress(const_cast<void *>(a_data), compressed_data, a_jmsg); } - put_end(compressed_data.data(), a_jmsg); return 0; } int ZfpMan::get(void *a_data, json &a_jmsg) { return 0; } -void ZfpMan::flush() { flush_next(); } +void ZfpMan::flush() {} int ZfpMan::compress(void *a_input, std::vector<char> &a_output, json &a_jmsg) { + if (!a_jmsg["compression_rate"].is_number()) + { + a_jmsg["compression_rate"] = m_compression_rate; + } std::string dtype = a_jmsg["dtype"]; std::vector<size_t> shape = a_jmsg["putshape"].get<std::vector<size_t>>(); int compression_rate = a_jmsg["compression_rate"].get<int>(); diff --git a/source/dataman/ZfpMan.h b/source/dataman/ZfpMan.h index 8f81df2564dd31d058ee2e6571c1d1b4cbb39df8..d2f27e65db5efeee89dc6f99bf076af2098b6a5d 100644 --- a/source/dataman/ZfpMan.h +++ b/source/dataman/ZfpMan.h @@ -19,7 +19,7 @@ public: ZfpMan() = default; virtual ~ZfpMan() = default; virtual int init(json a_jmsg); - virtual int put(const void *a_data, json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); virtual int get(void *a_data, json &a_jmsg); virtual void transform(std::vector<char> &a_data, json &a_jmsg); virtual void flush(); diff --git a/source/dataman/ZmqMan.cpp b/source/dataman/ZmqMan.cpp index e92a3c4510000df1a29d048ab298743399a0249b..6300dfcd48ed81957d67b4ba633b6158371fb035 100644 --- a/source/dataman/ZmqMan.cpp +++ b/source/dataman/ZmqMan.cpp @@ -26,7 +26,7 @@ int ZmqMan::init(json a_jmsg) StreamMan::init(a_jmsg); if (m_stream_mode == "sender") { - zmq_data = zmq_socket(zmq_context, ZMQ_REQ); + zmq_data = zmq_socket(m_zmq_context, ZMQ_REQ); std::string remote_address = make_address(m_remote_ip, m_remote_port + 1, "tcp"); zmq_connect(zmq_data, remote_address.c_str()); @@ -34,7 +34,7 @@ int ZmqMan::init(json a_jmsg) } else if (m_stream_mode == "receiver") { - zmq_data = zmq_socket(zmq_context, ZMQ_REP); + zmq_data = zmq_socket(m_zmq_context, ZMQ_REP); std::string local_address = make_address(m_local_ip, m_local_port + 1, "tcp"); zmq_bind(zmq_data, local_address.c_str()); @@ -43,20 +43,24 @@ int ZmqMan::init(json a_jmsg) return 0; } -int ZmqMan::put(const void *a_data, json a_jmsg) +int ZmqMan::put(const void *a_data, json &a_jmsg) { - char ret[10]; DataManBase::put_begin(a_data, a_jmsg); - StreamMan::put(a_data, a_jmsg); - zmq_send(zmq_data, a_data, a_jmsg["sendbytes"].get<size_t>(), 0); - zmq_recv(zmq_data, ret, 10, 0); + StreamMan::put_stream(a_data, a_jmsg); DataManBase::put_end(a_data, a_jmsg); return 0; } int ZmqMan::get(void *a_data, json &a_jmsg) { return 0; } -void ZmqMan::on_recv(json a_jmsg) +void ZmqMan::on_put(std::shared_ptr<std::vector<char>> a_data) +{ + char ret[10]; + zmq_send(zmq_data, a_data->data(), a_data->size(), 0); + zmq_recv(zmq_data, ret, 10, 0); +} + +void ZmqMan::on_recv(json &a_jmsg) { if (a_jmsg["operation"].get<std::string>() == "put") { diff --git a/source/dataman/ZmqMan.h b/source/dataman/ZmqMan.h index 8e7ac54fd686391e444b7f41b5147b3d8ee9cac6..9571f5d4927b7d7d84899c524804b2d7c7aa2dbb 100644 --- a/source/dataman/ZmqMan.h +++ b/source/dataman/ZmqMan.h @@ -19,12 +19,13 @@ public: ZmqMan() = default; virtual ~ZmqMan(); - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); - virtual int get(void *p_data, json &p_jmsg); + virtual int init(json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); + virtual int get(void *a_data, json &a_jmsg); virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} - virtual void on_recv(json msg); + virtual void on_recv(json &a_msg); + virtual void on_put(std::shared_ptr<std::vector<char>> a_data); std::string name() { return "ZmqMan"; } private: