Skip to content
Snippets Groups Projects
Commit 9d0c78f2 authored by Ruonan Wang's avatar Ruonan Wang
Browse files

Improved CacheMan to allow caching multiple time steps

parent 63b771a1
No related branches found
No related tags found
1 merge request!90Improved CacheMan to allow caching multiple time steps
...@@ -13,128 +13,151 @@ ...@@ -13,128 +13,151 @@
#include <algorithm> #include <algorithm>
#include <limits> #include <limits>
int CacheItem::init(json p_jmsg) int CacheItem::init(json a_jmsg)
{ {
m_doid = p_jmsg["doid"].get<std::string>(); m_jmsg = a_jmsg;
m_var = p_jmsg["var"].get<std::string>();
m_dtype = p_jmsg["dtype"].get<std::string>();
m_varshape = p_jmsg["varshape"].get<std::vector<size_t>>();
m_dsize = p_jmsg["dsize"].get<size_t>();
m_varsize = p_jmsg["varsize"].get<size_t>();
m_varbytes = p_jmsg["varbytes"].get<size_t>();
if (m_buffer[m_timestep].size() != m_varbytes)
{
m_buffer[m_timestep].resize(m_varbytes);
}
return 0; return 0;
} }
int CacheItem::put(const void *p_data, json p_jmsg) int CacheMan::put(const void *a_data, json a_jmsg)
{
std::string doid = a_jmsg["doid"].get<std::string>();
std::string var = a_jmsg["var"].get<std::string>();
return m_cache[doid][var].put(a_data, a_jmsg);
}
int CacheItem::put(const void *a_data, json a_jmsg)
{ {
init(p_jmsg); if (!m_initialized)
std::vector<size_t> p_putshape = {
p_jmsg["putshape"].get<std::vector<size_t>>(); init(a_jmsg);
std::vector<size_t> p_varshape = m_initialized = true;
p_jmsg["varshape"].get<std::vector<size_t>>(); }
std::vector<size_t> p_offset = p_jmsg["offset"].get<std::vector<size_t>>();
std::vector<size_t> varshape =
size_t putsize = p_jmsg["putsize"].get<size_t>(); a_jmsg["varshape"].get<std::vector<size_t>>();
size_t chunksize = p_putshape.back(); std::vector<size_t> putshape =
a_jmsg["putshape"].get<std::vector<size_t>>();
std::vector<size_t> offset = a_jmsg["offset"].get<std::vector<size_t>>();
size_t putsize = a_jmsg["putsize"].get<size_t>();
size_t chunksize = putshape.back();
size_t varbytes = a_jmsg["varbytes"].get<size_t>();
size_t dsize = a_jmsg["dsize"].get<size_t>();
size_t timestep = 0;
if (m_cache.empty())
{
push();
}
if (a_jmsg["timestep"].is_number())
{
timestep = a_jmsg["timestep"].get<size_t>();
}
for (size_t i = 0; i < putsize; i += chunksize) for (size_t i = 0; i < putsize; i += chunksize)
{ {
std::vector<size_t> p = one2multi(p_putshape, i); std::vector<size_t> p = one2multi(putshape, i);
p = apply_offset(p, p_offset); p = apply_offset(p, offset);
size_t ig = multi2one(p_varshape, p); size_t ig = multi2one(varshape, p);
std::copy((char *)p_data + i * m_dsize, std::copy((char *)a_data + i * dsize,
(char *)p_data + i * m_dsize + chunksize * m_dsize, (char *)a_data + i * dsize + chunksize * dsize,
m_buffer[m_timestep].data() + ig * m_dsize); m_cache.back().data() + ig * dsize);
} }
return 0; return 0;
} }
std::vector<size_t> CacheItem::get_shape() { return m_varshape; } void *CacheItem::get() { return m_cache.front().data(); }
std::string CacheItem::get_dtype() { return m_dtype; }
void CacheItem::flush() { m_timestep++; }
void CacheMan::remove(std::string doid, std::string var, size_t timestep) void *CacheMan::get(std::string doid, std::string var)
{ {
m_cache[doid][var].remove(timestep); return m_cache[doid][var].get();
} }
void CacheMan::remove_all(size_t timestep) void CacheMan::pop()
{ {
for (auto i : m_cache) for (auto i : m_cache)
{ {
for (auto j : m_cache[i.first]) for (auto j : i.second)
{ {
j.second.remove(timestep); j.second.pop();
} }
} }
m_timesteps_cached--;
m_timestep_first++;
} }
void CacheItem::remove(size_t timestep) { m_buffer.erase(timestep); } void CacheItem::pop() { m_cache.pop(); }
void CacheItem::clean(const std::string mode) void CacheMan::push()
{ {
if (mode == "zero") for (auto i : m_cache)
{
std::memset(m_buffer[m_timestep].data(), 0, m_varbytes);
return;
}
if (mode == "nan")
{ {
for (size_t i = 0; i < m_varsize; i++) for (auto j : i.second)
{ {
if (m_dtype == "float") j.second.push();
((float *)m_buffer[m_timestep].data())[i] =
std::numeric_limits<float>::quiet_NaN();
} }
return;
} }
m_timesteps_cached++;
} }
const void *CacheItem::get_buffer() { return m_buffer[m_timestep].data(); } void CacheItem::push()
int CacheMan::put(const void *p_data, json p_jmsg)
{ {
std::string doid = p_jmsg["doid"].get<std::string>(); size_t varbytes = m_jmsg["varbytes"].get<size_t>();
std::string var = p_jmsg["var"].get<std::string>(); std::vector<char> buff(varbytes);
return m_cache[doid][var].put(p_data, p_jmsg); clean(buff, m_clean_mode);
m_cache.push(buff);
} }
void CacheMan::flush() void CacheItem::clean(std::string a_mode) { clean(m_cache.front(), a_mode); }
void CacheMan::clean(std::string a_mode)
{ {
for (auto i : m_cache) for (auto i : m_cache)
{ {
for (auto j : m_cache[i.first]) for (auto j : i.second)
{ {
j.second.flush(); j.second.clean(a_mode);
} }
} }
} }
const void *CacheMan::get_buffer(std::string doid, std::string var) void CacheItem::clean(std::vector<char> &a_data, std::string a_mode)
{
return m_cache[doid][var].get_buffer();
}
void CacheMan::clean(std::string doid, std::string var, std::string mode)
{
m_cache[doid][var].clean(mode);
}
void CacheMan::clean_all(std::string mode)
{ {
for (auto i = m_cache.begin(); i != m_cache.end(); ++i) size_t varbytes = m_jmsg["varbytes"].get<size_t>();
size_t varsize = m_jmsg["varsize"].get<size_t>();
std::string dtype = m_jmsg["dtype"].get<std::string>();
if (a_mode == "zero")
{ {
for (auto j = m_cache[i->first].begin(); j != m_cache[i->first].end(); std::memset(a_data.data(), 0, varbytes);
++j) return;
}
else if (a_mode == "nan")
{
if (dtype == "float")
{
for (size_t j = 0; j < varsize; j++)
{
((float *)a_data.data())[j] =
std::numeric_limits<float>::quiet_NaN();
}
}
else if (dtype == "double")
{ {
j->second.clean(mode); for (size_t j = 0; j < varsize; j++)
{
((double *)a_data.data())[j] =
std::numeric_limits<double>::quiet_NaN();
}
}
else if (dtype == "int")
{
for (size_t j = 0; j < varsize; j++)
{
((int *)a_data.data())[j] =
std::numeric_limits<int>::quiet_NaN();
}
} }
} }
} }
...@@ -155,12 +178,18 @@ std::vector<std::string> CacheMan::get_var_list(std::string doid) ...@@ -155,12 +178,18 @@ std::vector<std::string> CacheMan::get_var_list(std::string doid)
return var_list; return var_list;
} }
std::vector<size_t> CacheMan::get_shape(std::string doid, std::string var) size_t CacheMan::get_timesteps_cached() { return m_timesteps_cached; }
nlohmann::json CacheMan::get_jmsg(std::string a_doid, std::string a_var)
{ {
return m_cache[doid][var].get_shape(); return m_cache[a_doid][a_var].get_jmsg();
} }
std::string CacheMan::get_dtype(std::string doid, std::string var) nlohmann::json CacheItem::get_jmsg()
{ {
return m_cache[doid][var].get_dtype(); m_jmsg.erase("putsize");
m_jmsg.erase("putshape");
m_jmsg.erase("putbytes");
m_jmsg.erase("offset");
return m_jmsg;
} }
...@@ -19,34 +19,23 @@ class CacheItem ...@@ -19,34 +19,23 @@ class CacheItem
{ {
public: public:
using json = nlohmann::json; using json = nlohmann::json;
CacheItem() = default;
virtual ~CacheItem() = default;
void init(std::string doid, std::string var, std::string dtype, int init(json a_jmsg);
std::vector<size_t> varshape); virtual int put(const void *a_data, json a_jmsg);
virtual int init(json p_jmsg);
virtual int put(const void *p_data, json p_jmsg);
virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} virtual void transform(std::vector<char> &a_data, json &a_jmsg) {}
void flush(); void *get();
const void *get_buffer(); void clean(const std::string a_mode);
void clean(const std::string mode); void pop();
void remove(size_t timestep); void push();
std::vector<size_t> get_shape(); json get_jmsg();
std::string get_dtype(); void clean(std::vector<char> &a_data, std::string a_mode);
std::string m_clean_mode = "nan";
private: private:
std::map<size_t, std::vector<char>> m_buffer; std::queue<std::vector<char>> m_cache;
std::string m_doid; json m_jmsg;
std::string m_var; bool m_initialized = false;
std::string m_dtype;
size_t m_dsize;
size_t m_varsize;
size_t m_varbytes;
std::vector<size_t> m_varshape;
bool m_completed;
size_t m_timestep = 0;
inline std::vector<size_t> apply_offset(const std::vector<size_t> &p, inline std::vector<size_t> apply_offset(const std::vector<size_t> &p,
const std::vector<size_t> &o) const std::vector<size_t> &o)
...@@ -92,24 +81,22 @@ class CacheMan ...@@ -92,24 +81,22 @@ class CacheMan
public: public:
using json = nlohmann::json; using json = nlohmann::json;
CacheMan() = default; int put(const void *a_data, json a_jmsg);
virtual ~CacheMan() = default; void *get(std::string doid, std::string var);
virtual int put(const void *p_data, json p_jmsg); void pop();
void flush(); void push();
const void *get_buffer(std::string doid, std::string var);
void clean(std::string doid, std::string var, std::string mode);
void clean_all(std::string mode);
void remove(std::string doid, std::string var, size_t timestep);
void remove_all(size_t timestep);
std::vector<std::string> get_do_list(); std::vector<std::string> get_do_list();
std::vector<std::string> get_var_list(std::string doid); std::vector<std::string> get_var_list(std::string doid);
std::vector<size_t> get_shape(std::string doid, std::string var); size_t get_timesteps_cached();
std::string get_dtype(std::string doid, std::string var); json get_jmsg(std::string doid, std::string var);
void clean(std::string a_mode);
private: private:
typedef std::map<std::string, CacheItem> CacheVarMap; typedef std::map<std::string, CacheItem> CacheVarMap;
typedef std::map<std::string, CacheVarMap> CacheDoMap; typedef std::map<std::string, CacheVarMap> CacheDoMap;
CacheDoMap m_cache; CacheDoMap m_cache;
size_t m_timesteps_cached = 0;
size_t m_timestep_first = 0;
}; };
#endif #endif
...@@ -25,7 +25,8 @@ int DataMan::put(const void *a_data, json a_jmsg) ...@@ -25,7 +25,8 @@ int DataMan::put(const void *a_data, json a_jmsg)
{ {
if (m_cache_size > 0) if (m_cache_size > 0)
{ {
// to be implemented check_shape(a_jmsg);
m_cache.put(a_data, a_jmsg);
} }
else else
{ {
...@@ -85,6 +86,39 @@ void DataMan::add_stream(json a_jmsg) ...@@ -85,6 +86,39 @@ void DataMan::add_stream(json a_jmsg)
} }
} }
void DataMan::flush() { flush_next(); } void DataMan::flush()
{
m_timestep++;
if (m_cache_size > 0)
{
if (m_cache_size == m_cache.get_timesteps_cached())
{
for (int i = 0; i < m_cache_size; i++)
{
std::vector<std::string> do_list = m_cache.get_do_list();
for (auto j : do_list)
{
std::vector<std::string> var_list = m_cache.get_var_list(j);
for (auto k : var_list)
{
json jmsg = m_cache.get_jmsg(j, k);
put_begin(m_cache.get(j, k), jmsg);
put_end(m_cache.get(j, k), jmsg);
}
}
flush_next();
m_cache.pop();
}
}
else
{
m_cache.push();
}
}
else
{
flush_next();
}
}
int DataMan::get(void *a_data, json &a_jmsg) { return 0; } int DataMan::get(void *a_data, json &a_jmsg) { return 0; }
...@@ -41,7 +41,7 @@ private: ...@@ -41,7 +41,7 @@ private:
int m_num_channels = 0; int m_num_channels = 0;
std::vector<int> m_tolerance; std::vector<int> m_tolerance;
std::vector<int> m_priority; std::vector<int> m_priority;
std::queue<CacheMan> m_cache_q; CacheMan m_cache;
size_t m_cache_size = 0; size_t m_cache_size = 0;
size_t m_timestep = 0; size_t m_timestep = 0;
}; };
......
...@@ -154,7 +154,6 @@ void MdtmMan::on_recv(json a_jmsg) ...@@ -154,7 +154,6 @@ void MdtmMan::on_recv(json a_jmsg)
if (jqueue.front()["operation"] == "flush") if (jqueue.front()["operation"] == "flush")
{ {
callback(); callback();
m_cache.clean_all("nan");
jqueue.pop(); jqueue.pop();
vqueue.pop(); vqueue.pop();
iqueue.pop(); iqueue.pop();
......
...@@ -50,6 +50,10 @@ int StreamMan::init(json p_jmsg) ...@@ -50,6 +50,10 @@ int StreamMan::init(json p_jmsg)
std::string local_address = std::string local_address =
make_address(m_local_ip, m_local_port, "tcp"); make_address(m_local_ip, m_local_port, "tcp");
if (p_jmsg["clean_mode"].is_string())
{
m_clean_mode = p_jmsg["clean_mode"];
}
m_tolerance.assign(m_num_channels, 0); m_tolerance.assign(m_num_channels, 0);
m_priority.assign(m_num_channels, 100); m_priority.assign(m_num_channels, 100);
if (p_jmsg["num_channels"].is_number_integer()) if (p_jmsg["num_channels"].is_number_integer())
...@@ -91,25 +95,28 @@ int StreamMan::init(json p_jmsg) ...@@ -91,25 +95,28 @@ int StreamMan::init(json p_jmsg)
} }
} }
void StreamMan::callback() int StreamMan::callback()
{ {
if (m_callback) if (!m_callback)
{ {
std::vector<std::string> do_list = m_cache.get_do_list(); logging("callback called but callback function not registered!");
for (std::string i : do_list) return -1;
{
std::vector<std::string> var_list = m_cache.get_var_list(i);
for (std::string j : var_list)
{
m_callback(m_cache.get_buffer(i, j), i, j,
m_cache.get_dtype(i, j), m_cache.get_shape(i, j));
}
}
} }
else
std::vector<std::string> do_list = m_cache.get_do_list();
for (std::string i : do_list)
{ {
logging("callback called but callback function not registered!"); std::vector<std::string> var_list = m_cache.get_var_list(i);
for (std::string j : var_list)
{
m_callback(
m_cache.get(i, j), i, j,
m_cache.get_jmsg(i, j)["dtype"].get<std::string>(),
m_cache.get_jmsg(i, j)["varshape"].get<std::vector<size_t>>());
}
} }
return 0;
} }
void StreamMan::flush() void StreamMan::flush()
......
...@@ -31,7 +31,7 @@ public: ...@@ -31,7 +31,7 @@ public:
protected: protected:
void *zmq_context = NULL; void *zmq_context = NULL;
CacheMan m_cache; CacheMan m_cache;
void callback(); int callback();
std::string m_get_mode = "callback"; std::string m_get_mode = "callback";
std::string m_stream_mode; std::string m_stream_mode;
...@@ -39,9 +39,10 @@ protected: ...@@ -39,9 +39,10 @@ protected:
std::string m_remote_ip; std::string m_remote_ip;
int m_local_port; int m_local_port;
int m_remote_port; int m_remote_port;
int m_num_channels = 1; int m_num_channels = 10;
std::vector<int> m_tolerance; std::vector<int> m_tolerance;
std::vector<int> m_priority; std::vector<int> m_priority;
std::string m_clean_mode = "nan";
// parallel // parallel
std::string m_parallel_mode = "round"; // round, priority std::string m_parallel_mode = "round"; // round, priority
......
...@@ -71,7 +71,5 @@ void ZmqMan::on_recv(json a_jmsg) ...@@ -71,7 +71,5 @@ void ZmqMan::on_recv(json a_jmsg)
else if (a_jmsg["operation"].get<std::string>() == "flush") else if (a_jmsg["operation"].get<std::string>() == "flush")
{ {
callback(); callback();
m_cache.flush();
m_cache.clean_all("nan");
} }
} }
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment