diff --git a/source/dataman/CacheMan.cpp b/source/dataman/CacheMan.cpp index 11ee6d3b38f7c3f94163a87e950b1e7af4f01df8..b575460c8a01ff981de38778fe06f576bf8dd0d4 100644 --- a/source/dataman/CacheMan.cpp +++ b/source/dataman/CacheMan.cpp @@ -44,17 +44,13 @@ int CacheItem::put(const void *a_data, json a_jmsg) size_t varbytes = a_jmsg["varbytes"].get<size_t>(); size_t dsize = a_jmsg["dsize"].get<size_t>(); - size_t timestep = 0; - if (m_cache.empty()) { push(); } - if (a_jmsg["timestep"].is_number()) - { - timestep = a_jmsg["timestep"].get<size_t>(); - } + printf("put begin front, pointer=%d\n", m_cache.front().data()); + printf("put begin back, pointer=%d\n", m_cache.back().data()); for (size_t i = 0; i < putsize; i += chunksize) { @@ -66,10 +62,16 @@ int CacheItem::put(const void *a_data, json a_jmsg) m_cache.back().data() + ig * dsize); } + printf("put end front, pointer=%d\n", m_cache.front().data()); + printf("put end back, pointer=%d\n", m_cache.back().data()); return 0; } -void *CacheItem::get() { return m_cache.front().data(); } +void *CacheItem::get() { + + printf("get, pointer=%d\n", m_cache.front().data()); + return m_cache.front().data(); +} void *CacheMan::get(std::string doid, std::string var) { @@ -89,7 +91,11 @@ void CacheMan::pop() m_timestep_first++; } -void CacheItem::pop() { m_cache.pop(); } +void CacheItem::pop() +{ + std::cout << "CacheItem::pop() ---------------------------" << std::endl; + m_cache.pop(); +} void CacheMan::push() { @@ -105,13 +111,55 @@ void CacheMan::push() void CacheItem::push() { + std::cout << "CacheItem::push() ---------------------------" << std::endl; size_t varbytes = m_jmsg["varbytes"].get<size_t>(); std::vector<char> buff(varbytes); - clean(buff, m_clean_mode); m_cache.push(buff); + clean(m_cache.front(), "nan"); +} + +void CacheItem::clean(std::string a_mode) +{ + size_t varbytes = m_jmsg["varbytes"].get<size_t>(); + size_t varsize = m_jmsg["varsize"].get<size_t>(); + std::string dtype = m_jmsg["dtype"].get<std::string>(); + if (a_mode == "zero") + { + printf("zero, pointer=%d\n", m_cache.front().data()); + std::memset(m_cache.front().data(), 0, varbytes); + return; + } + else if (a_mode == "nan") + { + if (dtype == "float") + { + printf("nan, pointer=%d\n", m_cache.front().data()); + for (size_t j = 0; j < varsize; j++) + { + ((float*)(m_cache.front().data()))[j] = + std::numeric_limits<float>::quiet_NaN(); + } + } + else if (dtype == "double") + { + for (size_t j = 0; j < varsize; j++) + { + ((double *)m_cache.front().data())[j] = + std::numeric_limits<double>::quiet_NaN(); + } + } + else if (dtype == "int") + { + for (size_t j = 0; j < varsize; j++) + { + ((int *)m_cache.front().data())[j] = + std::numeric_limits<int>::quiet_NaN(); + } + } + } +// clean(m_cache.front(), a_mode); } -void CacheItem::clean(std::string a_mode) { clean(m_cache.front(), a_mode); } void CacheMan::clean(std::string a_mode) { for (auto i : m_cache) @@ -130,6 +178,7 @@ void CacheItem::clean(std::vector<char> &a_data, std::string a_mode) std::string dtype = m_jmsg["dtype"].get<std::string>(); if (a_mode == "zero") { + printf("zero, pointer=%d\n", a_data.data()); std::memset(a_data.data(), 0, varbytes); return; } @@ -137,9 +186,10 @@ void CacheItem::clean(std::vector<char> &a_data, std::string a_mode) { if (dtype == "float") { + printf("nan, pointer=%d\n", a_data.data()); for (size_t j = 0; j < varsize; j++) { - ((float *)a_data.data())[j] = + ((float*)(a_data.data()))[j] = std::numeric_limits<float>::quiet_NaN(); } } @@ -193,3 +243,5 @@ nlohmann::json CacheItem::get_jmsg() m_jmsg.erase("offset"); return m_jmsg; } + + diff --git a/source/dataman/CacheMan.h b/source/dataman/CacheMan.h index f27051d7ec35a54b0d3255ad289203c87233d0db..ad248d04a3dcf0fe1e5650a6a58ded180c5ceb76 100644 --- a/source/dataman/CacheMan.h +++ b/source/dataman/CacheMan.h @@ -30,7 +30,7 @@ public: void push(); json get_jmsg(); void clean(std::vector<char> &a_data, std::string a_mode); - std::string m_clean_mode = "nan"; + std::string m_clean_mode; private: std::queue<std::vector<char>> m_cache; diff --git a/source/dataman/DataMan.cpp b/source/dataman/DataMan.cpp index d9c4393967b056f4374398f1b15423823a3a24f1..d871e0f3a29b518b48a4e9191bc39200d9105606 100644 --- a/source/dataman/DataMan.cpp +++ b/source/dataman/DataMan.cpp @@ -23,6 +23,7 @@ int DataMan::put(const void *a_data, std::string p_doid, std::string p_var, int DataMan::put(const void *a_data, json a_jmsg) { + a_jmsg["timestep"] = m_timestep; if (m_cache_size > 0) { check_shape(a_jmsg); diff --git a/source/dataman/StreamMan.cpp b/source/dataman/StreamMan.cpp index d4d84f29f47169c65de7f91c81a7887d088aa417..b993f7a2f2a792e428761fc5a4354ec95dabf15a 100644 --- a/source/dataman/StreamMan.cpp +++ b/source/dataman/StreamMan.cpp @@ -115,7 +115,7 @@ int StreamMan::callback() m_cache.get_jmsg(i, j)["varshape"].get<std::vector<size_t>>()); } } - + m_cache.clean("nan"); return 0; } @@ -135,9 +135,9 @@ void StreamMan::zmq_meta_rep_thread_func() std::string smsg = msg; if (ret >= 0) { - json j = json::parse(msg); - logging("StreamMan::zmq_meta_rep_thread_func: \n" + j.dump(4)); - on_recv(j); + json jmsg = json::parse(msg); + logging("StreamMan::zmq_meta_rep_thread_func: \n" + jmsg.dump(4)); + on_recv(jmsg); } usleep(10); } @@ -149,3 +149,6 @@ int StreamMan::put(const void *p_data, json p_jmsg) zmq_send(zmq_meta, p_jmsg.dump().c_str(), p_jmsg.dump().length(), 0); return 0; } + + + diff --git a/source/dataman/ZmqMan.cpp b/source/dataman/ZmqMan.cpp index 811baa1d52f08250520dc859b566c392c239c4c3..5d6f693739efda380e3fc8e94e8adae565762bda 100644 --- a/source/dataman/ZmqMan.cpp +++ b/source/dataman/ZmqMan.cpp @@ -73,3 +73,7 @@ void ZmqMan::on_recv(json a_jmsg) callback(); } } + + + +