diff --git a/source/dataman/CacheMan.cpp b/source/dataman/CacheMan.cpp index b61177f03535274dab465edef9a4053948cdfb43..11ee6d3b38f7c3f94163a87e950b1e7af4f01df8 100644 --- a/source/dataman/CacheMan.cpp +++ b/source/dataman/CacheMan.cpp @@ -13,128 +13,151 @@ #include <algorithm> #include <limits> -int CacheItem::init(json p_jmsg) +int CacheItem::init(json a_jmsg) { - m_doid = p_jmsg["doid"].get<std::string>(); - m_var = p_jmsg["var"].get<std::string>(); - m_dtype = p_jmsg["dtype"].get<std::string>(); - m_varshape = p_jmsg["varshape"].get<std::vector<size_t>>(); - m_dsize = p_jmsg["dsize"].get<size_t>(); - m_varsize = p_jmsg["varsize"].get<size_t>(); - m_varbytes = p_jmsg["varbytes"].get<size_t>(); - - if (m_buffer[m_timestep].size() != m_varbytes) - { - m_buffer[m_timestep].resize(m_varbytes); - } + m_jmsg = a_jmsg; return 0; } -int CacheItem::put(const void *p_data, json p_jmsg) +int CacheMan::put(const void *a_data, json a_jmsg) +{ + std::string doid = a_jmsg["doid"].get<std::string>(); + std::string var = a_jmsg["var"].get<std::string>(); + return m_cache[doid][var].put(a_data, a_jmsg); +} + +int CacheItem::put(const void *a_data, json a_jmsg) { - init(p_jmsg); - std::vector<size_t> p_putshape = - p_jmsg["putshape"].get<std::vector<size_t>>(); - std::vector<size_t> p_varshape = - p_jmsg["varshape"].get<std::vector<size_t>>(); - std::vector<size_t> p_offset = p_jmsg["offset"].get<std::vector<size_t>>(); - - size_t putsize = p_jmsg["putsize"].get<size_t>(); - size_t chunksize = p_putshape.back(); + if (!m_initialized) + { + init(a_jmsg); + m_initialized = true; + } + + std::vector<size_t> varshape = + a_jmsg["varshape"].get<std::vector<size_t>>(); + std::vector<size_t> putshape = + a_jmsg["putshape"].get<std::vector<size_t>>(); + std::vector<size_t> offset = a_jmsg["offset"].get<std::vector<size_t>>(); + size_t putsize = a_jmsg["putsize"].get<size_t>(); + size_t chunksize = putshape.back(); + size_t varbytes = a_jmsg["varbytes"].get<size_t>(); + size_t dsize = a_jmsg["dsize"].get<size_t>(); + + size_t timestep = 0; + + if (m_cache.empty()) + { + push(); + } + + if (a_jmsg["timestep"].is_number()) + { + timestep = a_jmsg["timestep"].get<size_t>(); + } + for (size_t i = 0; i < putsize; i += chunksize) { - std::vector<size_t> p = one2multi(p_putshape, i); - p = apply_offset(p, p_offset); - size_t ig = multi2one(p_varshape, p); - std::copy((char *)p_data + i * m_dsize, - (char *)p_data + i * m_dsize + chunksize * m_dsize, - m_buffer[m_timestep].data() + ig * m_dsize); + std::vector<size_t> p = one2multi(putshape, i); + p = apply_offset(p, offset); + size_t ig = multi2one(varshape, p); + std::copy((char *)a_data + i * dsize, + (char *)a_data + i * dsize + chunksize * dsize, + m_cache.back().data() + ig * dsize); } return 0; } -std::vector<size_t> CacheItem::get_shape() { return m_varshape; } - -std::string CacheItem::get_dtype() { return m_dtype; } - -void CacheItem::flush() { m_timestep++; } +void *CacheItem::get() { return m_cache.front().data(); } -void CacheMan::remove(std::string doid, std::string var, size_t timestep) +void *CacheMan::get(std::string doid, std::string var) { - m_cache[doid][var].remove(timestep); + return m_cache[doid][var].get(); } -void CacheMan::remove_all(size_t timestep) +void CacheMan::pop() { for (auto i : m_cache) { - for (auto j : m_cache[i.first]) + for (auto j : i.second) { - j.second.remove(timestep); + j.second.pop(); } } + m_timesteps_cached--; + m_timestep_first++; } -void CacheItem::remove(size_t timestep) { m_buffer.erase(timestep); } +void CacheItem::pop() { m_cache.pop(); } -void CacheItem::clean(const std::string mode) +void CacheMan::push() { - if (mode == "zero") - { - std::memset(m_buffer[m_timestep].data(), 0, m_varbytes); - return; - } - if (mode == "nan") + for (auto i : m_cache) { - for (size_t i = 0; i < m_varsize; i++) + for (auto j : i.second) { - if (m_dtype == "float") - ((float *)m_buffer[m_timestep].data())[i] = - std::numeric_limits<float>::quiet_NaN(); + j.second.push(); } - return; } + m_timesteps_cached++; } -const void *CacheItem::get_buffer() { return m_buffer[m_timestep].data(); } - -int CacheMan::put(const void *p_data, json p_jmsg) +void CacheItem::push() { - std::string doid = p_jmsg["doid"].get<std::string>(); - std::string var = p_jmsg["var"].get<std::string>(); - return m_cache[doid][var].put(p_data, p_jmsg); + size_t varbytes = m_jmsg["varbytes"].get<size_t>(); + std::vector<char> buff(varbytes); + clean(buff, m_clean_mode); + m_cache.push(buff); } -void CacheMan::flush() +void CacheItem::clean(std::string a_mode) { clean(m_cache.front(), a_mode); } +void CacheMan::clean(std::string a_mode) { for (auto i : m_cache) { - for (auto j : m_cache[i.first]) + for (auto j : i.second) { - j.second.flush(); + j.second.clean(a_mode); } } } -const void *CacheMan::get_buffer(std::string doid, std::string var) -{ - return m_cache[doid][var].get_buffer(); -} - -void CacheMan::clean(std::string doid, std::string var, std::string mode) -{ - m_cache[doid][var].clean(mode); -} - -void CacheMan::clean_all(std::string mode) +void CacheItem::clean(std::vector<char> &a_data, std::string a_mode) { - for (auto i = m_cache.begin(); i != m_cache.end(); ++i) + size_t varbytes = m_jmsg["varbytes"].get<size_t>(); + size_t varsize = m_jmsg["varsize"].get<size_t>(); + std::string dtype = m_jmsg["dtype"].get<std::string>(); + if (a_mode == "zero") { - for (auto j = m_cache[i->first].begin(); j != m_cache[i->first].end(); - ++j) + std::memset(a_data.data(), 0, varbytes); + return; + } + else if (a_mode == "nan") + { + if (dtype == "float") + { + for (size_t j = 0; j < varsize; j++) + { + ((float *)a_data.data())[j] = + std::numeric_limits<float>::quiet_NaN(); + } + } + else if (dtype == "double") { - j->second.clean(mode); + for (size_t j = 0; j < varsize; j++) + { + ((double *)a_data.data())[j] = + std::numeric_limits<double>::quiet_NaN(); + } + } + else if (dtype == "int") + { + for (size_t j = 0; j < varsize; j++) + { + ((int *)a_data.data())[j] = + std::numeric_limits<int>::quiet_NaN(); + } } } } @@ -155,12 +178,18 @@ std::vector<std::string> CacheMan::get_var_list(std::string doid) return var_list; } -std::vector<size_t> CacheMan::get_shape(std::string doid, std::string var) +size_t CacheMan::get_timesteps_cached() { return m_timesteps_cached; } + +nlohmann::json CacheMan::get_jmsg(std::string a_doid, std::string a_var) { - return m_cache[doid][var].get_shape(); + return m_cache[a_doid][a_var].get_jmsg(); } -std::string CacheMan::get_dtype(std::string doid, std::string var) +nlohmann::json CacheItem::get_jmsg() { - return m_cache[doid][var].get_dtype(); + m_jmsg.erase("putsize"); + m_jmsg.erase("putshape"); + m_jmsg.erase("putbytes"); + m_jmsg.erase("offset"); + return m_jmsg; } diff --git a/source/dataman/CacheMan.h b/source/dataman/CacheMan.h index 715f928a8028e52bdd1815bfe3c1e4c8924194dc..f27051d7ec35a54b0d3255ad289203c87233d0db 100644 --- a/source/dataman/CacheMan.h +++ b/source/dataman/CacheMan.h @@ -19,34 +19,23 @@ class CacheItem { public: using json = nlohmann::json; - CacheItem() = default; - virtual ~CacheItem() = default; - void init(std::string doid, std::string var, std::string dtype, - std::vector<size_t> varshape); - - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); + int init(json a_jmsg); + virtual int put(const void *a_data, json a_jmsg); virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} - void flush(); - const void *get_buffer(); - void clean(const std::string mode); - void remove(size_t timestep); - std::vector<size_t> get_shape(); - std::string get_dtype(); + void *get(); + void clean(const std::string a_mode); + void pop(); + void push(); + json get_jmsg(); + void clean(std::vector<char> &a_data, std::string a_mode); + std::string m_clean_mode = "nan"; private: - std::map<size_t, std::vector<char>> m_buffer; - std::string m_doid; - std::string m_var; - std::string m_dtype; - size_t m_dsize; - size_t m_varsize; - size_t m_varbytes; - std::vector<size_t> m_varshape; - bool m_completed; - size_t m_timestep = 0; + std::queue<std::vector<char>> m_cache; + json m_jmsg; + bool m_initialized = false; inline std::vector<size_t> apply_offset(const std::vector<size_t> &p, const std::vector<size_t> &o) @@ -92,24 +81,22 @@ class CacheMan public: using json = nlohmann::json; - CacheMan() = default; - virtual ~CacheMan() = default; - virtual int put(const void *p_data, json p_jmsg); - void flush(); - const void *get_buffer(std::string doid, std::string var); - void clean(std::string doid, std::string var, std::string mode); - void clean_all(std::string mode); - void remove(std::string doid, std::string var, size_t timestep); - void remove_all(size_t timestep); + int put(const void *a_data, json a_jmsg); + void *get(std::string doid, std::string var); + void pop(); + void push(); std::vector<std::string> get_do_list(); std::vector<std::string> get_var_list(std::string doid); - std::vector<size_t> get_shape(std::string doid, std::string var); - std::string get_dtype(std::string doid, std::string var); + size_t get_timesteps_cached(); + json get_jmsg(std::string doid, std::string var); + void clean(std::string a_mode); private: typedef std::map<std::string, CacheItem> CacheVarMap; typedef std::map<std::string, CacheVarMap> CacheDoMap; CacheDoMap m_cache; + size_t m_timesteps_cached = 0; + size_t m_timestep_first = 0; }; #endif diff --git a/source/dataman/DataMan.cpp b/source/dataman/DataMan.cpp index 80f57137f8eec4c2b33abdaf787b0dcd152547bc..d9c4393967b056f4374398f1b15423823a3a24f1 100644 --- a/source/dataman/DataMan.cpp +++ b/source/dataman/DataMan.cpp @@ -25,7 +25,8 @@ int DataMan::put(const void *a_data, json a_jmsg) { if (m_cache_size > 0) { - // to be implemented + check_shape(a_jmsg); + m_cache.put(a_data, a_jmsg); } else { @@ -85,6 +86,39 @@ void DataMan::add_stream(json a_jmsg) } } -void DataMan::flush() { flush_next(); } +void DataMan::flush() +{ + m_timestep++; + if (m_cache_size > 0) + { + if (m_cache_size == m_cache.get_timesteps_cached()) + { + for (int i = 0; i < m_cache_size; i++) + { + std::vector<std::string> do_list = m_cache.get_do_list(); + for (auto j : do_list) + { + std::vector<std::string> var_list = m_cache.get_var_list(j); + for (auto k : var_list) + { + json jmsg = m_cache.get_jmsg(j, k); + put_begin(m_cache.get(j, k), jmsg); + put_end(m_cache.get(j, k), jmsg); + } + } + flush_next(); + m_cache.pop(); + } + } + else + { + m_cache.push(); + } + } + else + { + flush_next(); + } +} int DataMan::get(void *a_data, json &a_jmsg) { return 0; } diff --git a/source/dataman/DataMan.h b/source/dataman/DataMan.h index cb8e146a99129bd7b7a088cdd0a6c61a93da603b..20ccdc63a60545087130e0d59974321bbc10b919 100644 --- a/source/dataman/DataMan.h +++ b/source/dataman/DataMan.h @@ -41,7 +41,7 @@ private: int m_num_channels = 0; std::vector<int> m_tolerance; std::vector<int> m_priority; - std::queue<CacheMan> m_cache_q; + CacheMan m_cache; size_t m_cache_size = 0; size_t m_timestep = 0; }; diff --git a/source/dataman/MdtmMan.cpp b/source/dataman/MdtmMan.cpp index 7955f8c2de5f6c2cc248a4b622ef34c7a11bf94b..02e47706a4e0aa773331ae3bd02a0dfc7f21937b 100644 --- a/source/dataman/MdtmMan.cpp +++ b/source/dataman/MdtmMan.cpp @@ -154,7 +154,6 @@ void MdtmMan::on_recv(json a_jmsg) if (jqueue.front()["operation"] == "flush") { callback(); - m_cache.clean_all("nan"); jqueue.pop(); vqueue.pop(); iqueue.pop(); diff --git a/source/dataman/StreamMan.cpp b/source/dataman/StreamMan.cpp index 6874b73eb696be53d833b80cec8571eaba3fc139..d4d84f29f47169c65de7f91c81a7887d088aa417 100644 --- a/source/dataman/StreamMan.cpp +++ b/source/dataman/StreamMan.cpp @@ -50,6 +50,10 @@ int StreamMan::init(json p_jmsg) std::string local_address = make_address(m_local_ip, m_local_port, "tcp"); + if (p_jmsg["clean_mode"].is_string()) + { + m_clean_mode = p_jmsg["clean_mode"]; + } m_tolerance.assign(m_num_channels, 0); m_priority.assign(m_num_channels, 100); if (p_jmsg["num_channels"].is_number_integer()) @@ -91,25 +95,28 @@ int StreamMan::init(json p_jmsg) } } -void StreamMan::callback() +int StreamMan::callback() { - if (m_callback) + if (!m_callback) { - std::vector<std::string> do_list = m_cache.get_do_list(); - for (std::string i : do_list) - { - std::vector<std::string> var_list = m_cache.get_var_list(i); - for (std::string j : var_list) - { - m_callback(m_cache.get_buffer(i, j), i, j, - m_cache.get_dtype(i, j), m_cache.get_shape(i, j)); - } - } + logging("callback called but callback function not registered!"); + return -1; } - else + + std::vector<std::string> do_list = m_cache.get_do_list(); + for (std::string i : do_list) { - logging("callback called but callback function not registered!"); + std::vector<std::string> var_list = m_cache.get_var_list(i); + for (std::string j : var_list) + { + m_callback( + m_cache.get(i, j), i, j, + m_cache.get_jmsg(i, j)["dtype"].get<std::string>(), + m_cache.get_jmsg(i, j)["varshape"].get<std::vector<size_t>>()); + } } + + return 0; } void StreamMan::flush() diff --git a/source/dataman/StreamMan.h b/source/dataman/StreamMan.h index 198b403bda6d4c9904927bf9fff9f561a58d48d2..95897ca9c74715d6930da3149fd9dc5b52e7b659 100644 --- a/source/dataman/StreamMan.h +++ b/source/dataman/StreamMan.h @@ -31,7 +31,7 @@ public: protected: void *zmq_context = NULL; CacheMan m_cache; - void callback(); + int callback(); std::string m_get_mode = "callback"; std::string m_stream_mode; @@ -39,9 +39,10 @@ protected: std::string m_remote_ip; int m_local_port; int m_remote_port; - int m_num_channels = 1; + int m_num_channels = 10; std::vector<int> m_tolerance; std::vector<int> m_priority; + std::string m_clean_mode = "nan"; // parallel std::string m_parallel_mode = "round"; // round, priority diff --git a/source/dataman/ZmqMan.cpp b/source/dataman/ZmqMan.cpp index 1bed8f86b12ba92057cfa7fa5a9f4f786964e4c3..811baa1d52f08250520dc859b566c392c239c4c3 100644 --- a/source/dataman/ZmqMan.cpp +++ b/source/dataman/ZmqMan.cpp @@ -71,7 +71,5 @@ void ZmqMan::on_recv(json a_jmsg) else if (a_jmsg["operation"].get<std::string>() == "flush") { callback(); - m_cache.flush(); - m_cache.clean_all("nan"); } }