diff --git a/source/dataman/MdtmMan.cpp b/source/dataman/MdtmMan.cpp index 40aed77d70e6304dafe09a3c5c695825fb0abf9c..5707861de4d48469435d8a6522a7221eced9568a 100644 --- a/source/dataman/MdtmMan.cpp +++ b/source/dataman/MdtmMan.cpp @@ -154,7 +154,7 @@ void MdtmMan::on_recv(json a_jmsg) // for flush if (jqueue.front()["operation"] == "flush") { - callback(); + callback_cache(); jqueue.pop(); vqueue.pop(); iqueue.pop(); @@ -222,7 +222,17 @@ void MdtmMan::on_recv(json a_jmsg) auto_transform(vqueue.front(), a_jmsg); } } - m_cache.put(vqueue.front().data(), jmsg); + + if (a_jmsg["varshape"] == a_jmsg["putshape"]) + { + std::cout << "callback_direct \n"; + callback_direct(vqueue.front().data(), jmsg); + } + else + { + m_cache.put(vqueue.front().data(), jmsg); + } + jqueue.pop(); vqueue.pop(); iqueue.pop(); diff --git a/source/dataman/StreamMan.cpp b/source/dataman/StreamMan.cpp index 68e08ab946722e7fb7792904eb8e166dc3f15e05..276011cddb991db590aecf4f392b7347b60b9543 100644 --- a/source/dataman/StreamMan.cpp +++ b/source/dataman/StreamMan.cpp @@ -95,7 +95,21 @@ int StreamMan::init(json p_jmsg) } } -int StreamMan::callback() +int StreamMan::callback_direct(const void *a_data, json &a_jmsg) +{ + if (!m_callback) + { + logging("callback called but callback function not registered!"); + return -1; + } + + m_callback(a_data, a_jmsg["doid"], a_jmsg["var"], + a_jmsg["dtype"].get<std::string>(), + a_jmsg["varshape"].get<std::vector<size_t>>()); + return 0; +} + +int StreamMan::callback_cache() { if (!m_callback) { @@ -104,10 +118,10 @@ int StreamMan::callback() } std::vector<std::string> do_list = m_cache.get_do_list(); - for (std::string i : do_list) + for (const std::string &i : do_list) { std::vector<std::string> var_list = m_cache.get_var_list(i); - for (std::string j : var_list) + for (const std::string &j : var_list) { m_callback( m_cache.get(i, j), i, j, @@ -116,6 +130,7 @@ int StreamMan::callback() } } m_cache.clean("nan"); + return 0; } @@ -139,7 +154,7 @@ void StreamMan::zmq_meta_rep_thread_func() logging("StreamMan::zmq_meta_rep_thread_func: \n" + jmsg.dump(4)); on_recv(jmsg); } - usleep(10); + usleep(1); } } diff --git a/source/dataman/StreamMan.h b/source/dataman/StreamMan.h index 95897ca9c74715d6930da3149fd9dc5b52e7b659..9f02603cb9dc1bb5832945c7dc2f48600cbcbef7 100644 --- a/source/dataman/StreamMan.h +++ b/source/dataman/StreamMan.h @@ -31,7 +31,8 @@ public: protected: void *zmq_context = NULL; CacheMan m_cache; - int callback(); + int callback_direct(const void *a_data, json &a_jmsg); + int callback_cache(); std::string m_get_mode = "callback"; std::string m_stream_mode; diff --git a/source/dataman/ZmqMan.cpp b/source/dataman/ZmqMan.cpp index 1832ca14d6046ccd67471c16f66bc8b0376ffaf1..e92a3c4510000df1a29d048ab298743399a0249b 100644 --- a/source/dataman/ZmqMan.cpp +++ b/source/dataman/ZmqMan.cpp @@ -65,15 +65,26 @@ void ZmqMan::on_recv(json a_jmsg) int ret = zmq_recv(zmq_data, data.data(), sendbytes, 0); zmq_send(zmq_data, "OK", 10, 0); - if (a_jmsg["compression_method"].is_string() and - a_jmsg["compression_method"].get<std::string>() != "null") + // if data is compressed then call auto_transform to decompress + if (a_jmsg["compression_method"].is_string()) { - auto_transform(data, a_jmsg); + if (a_jmsg["compression_method"].get<std::string>() != "null") + { + auto_transform(data, a_jmsg); + } + } + + if (a_jmsg["varshape"] == a_jmsg["putshape"]) + { + callback_direct(data.data(), a_jmsg); + } + else + { + m_cache.put(data.data(), a_jmsg); } - m_cache.put(data.data(), a_jmsg); } else if (a_jmsg["operation"].get<std::string>() == "flush") { - callback(); + callback_cache(); } } diff --git a/source/dataman/ZmqMan.h b/source/dataman/ZmqMan.h index 48fa2bbe0b739c4d9de06a4edde1e7c54d6b5708..8e7ac54fd686391e444b7f41b5147b3d8ee9cac6 100644 --- a/source/dataman/ZmqMan.h +++ b/source/dataman/ZmqMan.h @@ -28,7 +28,7 @@ public: std::string name() { return "ZmqMan"; } private: - void *zmq_data = NULL; + void *zmq_data = nullptr; }; extern "C" DataManBase *getMan() { return new ZmqMan; }