Skip to content
Snippets Groups Projects
Commit 94607efa authored by Atkins, Charles Vernon's avatar Atkins, Charles Vernon Committed by GitHub
Browse files

Merge pull request #94 from JasonRuonanWang/dataman

Fixed a bug in CacheMan for cleaning variables after a time step is finished
parents d03d8d8e 11984427
No related branches found
No related tags found
No related merge requests found
...@@ -44,25 +44,20 @@ int CacheItem::put(const void *a_data, json a_jmsg) ...@@ -44,25 +44,20 @@ int CacheItem::put(const void *a_data, json a_jmsg)
size_t varbytes = a_jmsg["varbytes"].get<size_t>(); size_t varbytes = a_jmsg["varbytes"].get<size_t>();
size_t dsize = a_jmsg["dsize"].get<size_t>(); size_t dsize = a_jmsg["dsize"].get<size_t>();
size_t timestep = 0;
if (m_cache.empty()) if (m_cache.empty())
{ {
push(); push();
} }
if (a_jmsg["timestep"].is_number())
{
timestep = a_jmsg["timestep"].get<size_t>();
}
for (size_t i = 0; i < putsize; i += chunksize) for (size_t i = 0; i < putsize; i += chunksize)
{ {
std::vector<size_t> p = one2multi(putshape, i); std::vector<size_t> p = one2multi(putshape, i);
p = apply_offset(p, offset); p = apply_offset(p, offset);
size_t ig = multi2one(varshape, p); size_t ig = multi2one(varshape, p);
std::copy((char *)a_data + i * dsize, std::copy(const_cast<char *>(static_cast<const char *>(a_data)) +
(char *)a_data + i * dsize + chunksize * dsize, i * dsize,
const_cast<char *>(static_cast<const char *>(a_data)) +
i * dsize + chunksize * dsize,
m_cache.back().data() + ig * dsize); m_cache.back().data() + ig * dsize);
} }
...@@ -78,9 +73,9 @@ void *CacheMan::get(std::string doid, std::string var) ...@@ -78,9 +73,9 @@ void *CacheMan::get(std::string doid, std::string var)
void CacheMan::pop() void CacheMan::pop()
{ {
for (auto i : m_cache) for (auto &i : m_cache)
{ {
for (auto j : i.second) for (auto &j : i.second)
{ {
j.second.pop(); j.second.pop();
} }
...@@ -93,9 +88,9 @@ void CacheItem::pop() { m_cache.pop(); } ...@@ -93,9 +88,9 @@ void CacheItem::pop() { m_cache.pop(); }
void CacheMan::push() void CacheMan::push()
{ {
for (auto i : m_cache) for (auto &i : m_cache)
{ {
for (auto j : i.second) for (auto &j : i.second)
{ {
j.second.push(); j.second.push();
} }
...@@ -106,75 +101,73 @@ void CacheMan::push() ...@@ -106,75 +101,73 @@ void CacheMan::push()
void CacheItem::push() void CacheItem::push()
{ {
size_t varbytes = m_jmsg["varbytes"].get<size_t>(); size_t varbytes = m_jmsg["varbytes"].get<size_t>();
std::vector<char> buff(varbytes); m_cache.push(std::vector<char>(varbytes));
clean(buff, m_clean_mode); clean("nan");
m_cache.push(buff);
}
void CacheItem::clean(std::string a_mode) { clean(m_cache.front(), a_mode); }
void CacheMan::clean(std::string a_mode)
{
for (auto i : m_cache)
{
for (auto j : i.second)
{
j.second.clean(a_mode);
}
}
} }
void CacheItem::clean(std::vector<char> &a_data, std::string a_mode) void CacheItem::clean(std::string a_mode)
{ {
size_t varbytes = m_jmsg["varbytes"].get<size_t>(); size_t varbytes = m_jmsg["varbytes"].get<size_t>();
size_t varsize = m_jmsg["varsize"].get<size_t>(); size_t varsize = m_jmsg["varsize"].get<size_t>();
std::string dtype = m_jmsg["dtype"].get<std::string>(); std::string dtype = m_jmsg["dtype"].get<std::string>();
if (a_mode == "zero") if (a_mode == "zero")
{ {
std::memset(a_data.data(), 0, varbytes); std::memset(m_cache.front().data(), 0, varbytes);
return; return;
} }
else if (a_mode == "nan") else if (a_mode == "nan")
{ {
if (dtype == "float") if (dtype == "float")
{ {
for (size_t j = 0; j < varsize; j++) for (size_t j = 0; j < varsize; ++j)
{ {
((float *)a_data.data())[j] = (reinterpret_cast<float *>(m_cache.front().data()))[j] =
std::numeric_limits<float>::quiet_NaN(); std::numeric_limits<float>::quiet_NaN();
} }
} }
else if (dtype == "double") else if (dtype == "double")
{ {
for (size_t j = 0; j < varsize; j++) for (size_t j = 0; j < varsize; ++j)
{ {
((double *)a_data.data())[j] = (reinterpret_cast<double *>(m_cache.front().data()))[j] =
std::numeric_limits<double>::quiet_NaN(); std::numeric_limits<double>::quiet_NaN();
} }
} }
else if (dtype == "int") else if (dtype == "int")
{ {
for (size_t j = 0; j < varsize; j++) for (size_t j = 0; j < varsize; ++j)
{ {
((int *)a_data.data())[j] = (reinterpret_cast<int *>(m_cache.front().data()))[j] =
std::numeric_limits<int>::quiet_NaN(); std::numeric_limits<int>::quiet_NaN();
} }
} }
} }
} }
void CacheMan::clean(std::string a_mode)
{
for (auto &i : m_cache)
{
for (auto &j : i.second)
{
j.second.clean(a_mode);
}
}
}
std::vector<std::string> CacheMan::get_do_list() std::vector<std::string> CacheMan::get_do_list()
{ {
std::vector<std::string> do_list; std::vector<std::string> do_list;
for (auto it = m_cache.begin(); it != m_cache.end(); ++it) for (const auto &i : m_cache)
do_list.push_back(it->first); do_list.push_back(i.first);
return do_list; return do_list;
} }
std::vector<std::string> CacheMan::get_var_list(std::string doid) std::vector<std::string> CacheMan::get_var_list(std::string doid)
{ {
std::vector<std::string> var_list; std::vector<std::string> var_list;
for (auto it = m_cache[doid].begin(); it != m_cache[doid].end(); ++it) for (const auto &i : m_cache[doid])
var_list.push_back(it->first); var_list.push_back(i.first);
return var_list; return var_list;
} }
......
...@@ -30,7 +30,7 @@ public: ...@@ -30,7 +30,7 @@ public:
void push(); void push();
json get_jmsg(); json get_jmsg();
void clean(std::vector<char> &a_data, std::string a_mode); void clean(std::vector<char> &a_data, std::string a_mode);
std::string m_clean_mode = "nan"; std::string m_clean_mode;
private: private:
std::queue<std::vector<char>> m_cache; std::queue<std::vector<char>> m_cache;
...@@ -41,7 +41,7 @@ private: ...@@ -41,7 +41,7 @@ private:
const std::vector<size_t> &o) const std::vector<size_t> &o)
{ {
std::vector<size_t> g; std::vector<size_t> g;
for (int i = 0; i < p.size(); i++) for (int i = 0; i < p.size(); ++i)
{ {
g.push_back(p[i] + o[i]); g.push_back(p[i] + o[i]);
} }
...@@ -52,7 +52,7 @@ private: ...@@ -52,7 +52,7 @@ private:
const std::vector<size_t> &p) const std::vector<size_t> &p)
{ {
size_t index = 0; size_t index = 0;
for (int i = 1; i < v.size(); i++) for (int i = 1; i < v.size(); ++i)
{ {
index += std::accumulate(v.begin() + i, v.end(), p[i - 1], index += std::accumulate(v.begin() + i, v.end(), p[i - 1],
std::multiplies<size_t>()); std::multiplies<size_t>());
...@@ -64,7 +64,7 @@ private: ...@@ -64,7 +64,7 @@ private:
inline std::vector<size_t> one2multi(const std::vector<size_t> &v, size_t p) inline std::vector<size_t> one2multi(const std::vector<size_t> &v, size_t p)
{ {
std::vector<size_t> index(v.size()); std::vector<size_t> index(v.size());
for (int i = 1; i < v.size(); i++) for (int i = 1; i < v.size(); ++i)
{ {
size_t s = std::accumulate(v.begin() + i, v.end(), 1, size_t s = std::accumulate(v.begin() + i, v.end(), 1,
std::multiplies<size_t>()); std::multiplies<size_t>());
......
...@@ -23,6 +23,7 @@ int DataMan::put(const void *a_data, std::string p_doid, std::string p_var, ...@@ -23,6 +23,7 @@ int DataMan::put(const void *a_data, std::string p_doid, std::string p_var,
int DataMan::put(const void *a_data, json a_jmsg) int DataMan::put(const void *a_data, json a_jmsg)
{ {
a_jmsg["timestep"] = m_timestep;
if (m_cache_size > 0) if (m_cache_size > 0)
{ {
check_shape(a_jmsg); check_shape(a_jmsg);
...@@ -58,14 +59,14 @@ void DataMan::add_stream(json a_jmsg) ...@@ -58,14 +59,14 @@ void DataMan::add_stream(json a_jmsg)
if (m_tolerance.size() < m_num_channels) if (m_tolerance.size() < m_num_channels)
{ {
for (int i = 0; i < m_num_channels; i++) for (int i = 0; i < m_num_channels; ++i)
{ {
m_tolerance.push_back(0); m_tolerance.push_back(0);
} }
} }
if (m_priority.size() < m_num_channels) if (m_priority.size() < m_num_channels)
{ {
for (int i = 0; i < m_num_channels; i++) for (int i = 0; i < m_num_channels; ++i)
{ {
m_priority.push_back(100 / (i + 1)); m_priority.push_back(100 / (i + 1));
} }
...@@ -93,13 +94,13 @@ void DataMan::flush() ...@@ -93,13 +94,13 @@ void DataMan::flush()
{ {
if (m_cache_size == m_cache.get_timesteps_cached()) if (m_cache_size == m_cache.get_timesteps_cached())
{ {
for (int i = 0; i < m_cache_size; i++) for (int i = 0; i < m_cache_size; ++i)
{ {
std::vector<std::string> do_list = m_cache.get_do_list(); std::vector<std::string> do_list = m_cache.get_do_list();
for (auto j : do_list) for (const auto &j : do_list)
{ {
std::vector<std::string> var_list = m_cache.get_var_list(j); std::vector<std::string> var_list = m_cache.get_var_list(j);
for (auto k : var_list) for (const auto &k : var_list)
{ {
json jmsg = m_cache.get_jmsg(j, k); json jmsg = m_cache.get_jmsg(j, k);
put_begin(m_cache.get(j, k), jmsg); put_begin(m_cache.get(j, k), jmsg);
......
...@@ -195,13 +195,13 @@ void DataManBase::reg_callback( ...@@ -195,13 +195,13 @@ void DataManBase::reg_callback(
std::vector<size_t>)> std::vector<size_t>)>
cb) cb)
{ {
if (m_next.size() == 0) if (m_next.empty())
{ {
m_callback = cb; m_callback = cb;
} }
else else
{ {
for (auto i : m_next) for (const auto &i : m_next)
{ {
i.second->reg_callback(cb); i.second->reg_callback(cb);
} }
...@@ -215,10 +215,10 @@ void DataManBase::dump(const void *p_data, json p_jmsg, std::ostream &out) ...@@ -215,10 +215,10 @@ void DataManBase::dump(const void *p_data, json p_jmsg, std::ostream &out)
std::string dtype = p_jmsg["dtype"]; std::string dtype = p_jmsg["dtype"];
size_t length = p_jmsg["dumplength"].get<size_t>(); size_t length = p_jmsg["dumplength"].get<size_t>();
size_t s = 0; size_t s = 0;
for (size_t i = 0; i < product(p_varshape, 1); i++) for (size_t i = 0; i < product(p_varshape, 1); ++i)
{ {
s++; s++;
out << ((float *)p_data)[i] << " "; out << (static_cast<const float *>(p_data))[i] << " ";
if (s == length) if (s == length)
{ {
out << std::endl; out << std::endl;
...@@ -238,7 +238,7 @@ void DataManBase::remove_next(std::string p_name) { m_next.erase(p_name); } ...@@ -238,7 +238,7 @@ void DataManBase::remove_next(std::string p_name) { m_next.erase(p_name); }
bool DataManBase::have_next() bool DataManBase::have_next()
{ {
if (m_next.size() == 0) if (m_next.empty())
{ {
return false; return false;
} }
...@@ -250,7 +250,7 @@ bool DataManBase::have_next() ...@@ -250,7 +250,7 @@ bool DataManBase::have_next()
void DataManBase::print_next(std::ostream &out) void DataManBase::print_next(std::ostream &out)
{ {
for (auto i : m_next) for (const auto &i : m_next)
{ {
out << i.second->name() << " -> "; out << i.second->name() << " -> ";
i.second->print_next(); i.second->print_next();
...@@ -301,7 +301,7 @@ void DataManBase::add_man_to_path(std::string p_new, std::string p_path, ...@@ -301,7 +301,7 @@ void DataManBase::add_man_to_path(std::string p_new, std::string p_path,
int DataManBase::flush_next() int DataManBase::flush_next()
{ {
for (auto i : m_next) for (const auto &i : m_next)
{ {
i.second->flush(); i.second->flush();
} }
...@@ -310,7 +310,7 @@ int DataManBase::flush_next() ...@@ -310,7 +310,7 @@ int DataManBase::flush_next()
int DataManBase::put_next(const void *p_data, json p_jmsg) int DataManBase::put_next(const void *p_data, json p_jmsg)
{ {
for (auto i : m_next) for (const auto &i : m_next)
{ {
i.second->put(p_data, p_jmsg); i.second->put(p_data, p_jmsg);
} }
...@@ -367,7 +367,7 @@ bool DataManBase::check_json(json p_jmsg, std::vector<std::string> p_strings, ...@@ -367,7 +367,7 @@ bool DataManBase::check_json(json p_jmsg, std::vector<std::string> p_strings,
{ {
p_man = name(); p_man = name();
} }
for (auto i : p_strings) for (const auto &i : p_strings)
{ {
if (p_jmsg[i] == nullptr) if (p_jmsg[i] == nullptr)
{ {
...@@ -386,7 +386,7 @@ size_t DataManBase::product(size_t *shape) ...@@ -386,7 +386,7 @@ size_t DataManBase::product(size_t *shape)
size_t s = 1; size_t s = 1;
if (shape) if (shape)
{ {
for (size_t i = 1; i <= shape[0]; i++) for (size_t i = 1; i <= shape[0]; ++i)
{ {
s *= shape[i]; s *= shape[i];
} }
...@@ -498,7 +498,7 @@ nlohmann::json DataManBase::atoj(unsigned int *array) ...@@ -498,7 +498,7 @@ nlohmann::json DataManBase::atoj(unsigned int *array)
if (array[0] > 0) if (array[0] > 0)
{ {
j = {array[1]}; j = {array[1]};
for (unsigned int i = 2; i <= array[0]; i++) for (unsigned int i = 2; i <= array[0]; ++i)
{ {
j.insert(j.end(), array[i]); j.insert(j.end(), array[i]);
} }
...@@ -510,7 +510,7 @@ nlohmann::json DataManBase::atoj(unsigned int *array) ...@@ -510,7 +510,7 @@ nlohmann::json DataManBase::atoj(unsigned int *array)
int DataManBase::closest(int v, json j, bool up) int DataManBase::closest(int v, json j, bool up)
{ {
int s = 100, k = 0, t; int s = 100, k = 0, t;
for (unsigned int i = 0; i < j.size(); i++) for (unsigned int i = 0; i < j.size(); ++i)
{ {
if (up) if (up)
{ {
......
...@@ -59,7 +59,7 @@ int DumpMan::put(const void *p_data, json p_jmsg) ...@@ -59,7 +59,7 @@ int DumpMan::put(const void *p_data, json p_jmsg)
auto_transform(data, p_jmsg); auto_transform(data, p_jmsg);
void *data_to_print = data.data(); void *data_to_print = data.data();
for (size_t i = 0; i < numbers_to_print; i++) for (size_t i = 0; i < numbers_to_print; ++i)
{ {
if (dtype == "float") if (dtype == "float")
{ {
......
...@@ -42,7 +42,7 @@ int MdtmMan::init(json p_jmsg) ...@@ -42,7 +42,7 @@ int MdtmMan::init(json p_jmsg)
pipe_desc["mode"] = m_stream_mode; pipe_desc["mode"] = m_stream_mode;
std::string pipename_prefix = "MdtmManPipe"; std::string pipename_prefix = "MdtmManPipe";
for (int i = 0; i < m_num_channels; i++) for (int i = 0; i < m_num_channels; ++i)
{ {
std::stringstream pipename; std::stringstream pipename;
pipename << pipename_prefix << i; pipename << pipename_prefix << i;
...@@ -76,13 +76,14 @@ int MdtmMan::init(json p_jmsg) ...@@ -76,13 +76,14 @@ int MdtmMan::init(json p_jmsg)
// Pipes // Pipes
mkdir(pipe_desc["pipe_prefix"].get<std::string>().c_str(), 0755); mkdir(pipe_desc["pipe_prefix"].get<std::string>().c_str(), 0755);
for (auto i : pipe_desc["pipe_names"].get<std::vector<std::string>>()) for (const auto &i :
pipe_desc["pipe_names"].get<std::vector<std::string>>())
{ {
std::string filename = pipe_desc["pipe_prefix"].get<std::string>() + i; std::string filename = pipe_desc["pipe_prefix"].get<std::string>() + i;
mkfifo(filename.c_str(), 0666); mkfifo(filename.c_str(), 0666);
} }
for (int i = 0; i < m_num_channels; i++) for (int i = 0; i < m_num_channels; ++i)
{ {
std::stringstream pipename; std::stringstream pipename;
pipename << pipename_prefix << i; pipename << pipename_prefix << i;
...@@ -181,7 +182,7 @@ void MdtmMan::on_recv(json a_jmsg) ...@@ -181,7 +182,7 @@ void MdtmMan::on_recv(json a_jmsg)
break; break;
} }
int pipeindex = 0; int pipeindex = 0;
for (int i = 0; i < pipenames.size(); i++) for (int i = 0; i < pipenames.size(); ++i)
{ {
if (jmsg["pipe"].get<std::string>() == pipenames[i]) if (jmsg["pipe"].get<std::string>() == pipenames[i])
{ {
......
...@@ -115,7 +115,7 @@ int StreamMan::callback() ...@@ -115,7 +115,7 @@ int StreamMan::callback()
m_cache.get_jmsg(i, j)["varshape"].get<std::vector<size_t>>()); m_cache.get_jmsg(i, j)["varshape"].get<std::vector<size_t>>());
} }
} }
m_cache.clean("nan");
return 0; return 0;
} }
...@@ -135,9 +135,9 @@ void StreamMan::zmq_meta_rep_thread_func() ...@@ -135,9 +135,9 @@ void StreamMan::zmq_meta_rep_thread_func()
std::string smsg = msg; std::string smsg = msg;
if (ret >= 0) if (ret >= 0)
{ {
json j = json::parse(msg); json jmsg = json::parse(msg);
logging("StreamMan::zmq_meta_rep_thread_func: \n" + j.dump(4)); logging("StreamMan::zmq_meta_rep_thread_func: \n" + jmsg.dump(4));
on_recv(j); on_recv(jmsg);
} }
usleep(10); usleep(10);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment