diff --git a/examples/hello/datamanWriter/helloDataManWriter.cpp b/examples/hello/datamanWriter/helloDataManWriter.cpp index 5c413d4ac035c4d38e2cfc206537cdce6de57183..1fc22a80c7916675af32a82467273d9afc59ae42 100644 --- a/examples/hello/datamanWriter/helloDataManWriter.cpp +++ b/examples/hello/datamanWriter/helloDataManWriter.cpp @@ -48,7 +48,7 @@ int main(int argc, char *argv[]) // if not defined by user, we can change the default settings datamanSettings.SetEngine("DataManWriter"); datamanSettings.SetParameters("peer-to-peer=yes", "real_time=yes", - "compress=no"); + "compress=no", "method=cache"); datamanSettings.AddTransport("Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3"); diff --git a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp index 4a0b1ea989b0d47be10fe7bc67901a23eac2d3cb..c551d955670f6d5620802c848672ce18dc821733 100644 --- a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp +++ b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp @@ -49,7 +49,7 @@ int main(int argc, char *argv[]) // if not defined by user, we can change the default settings datamanSettings.SetEngine("DataManWriter"); datamanSettings.SetParameters( - "real_time=yes", "method_type=stream", "method=dump", + "real_time=yes", "method_type=stream", "method=cache", "monitoring=yes", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307"); // datamanSettings.AddTransport( "Mdtm", "localIP=127.0.0.1", diff --git a/source/dataman/CMakeLists.txt b/source/dataman/CMakeLists.txt index 947f927d18b1d961c902170f943c1816e2d24b60..17028d5df31991798349d71077f59d92d2d20863 100644 --- a/source/dataman/CMakeLists.txt +++ b/source/dataman/CMakeLists.txt @@ -10,8 +10,11 @@ add_library(dataman target_include_directories(dataman PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) target_link_libraries(dataman PUBLIC ${CMAKE_DL_LIBS}) +add_library(cacheman SHARED CacheMan.h CacheMan.cpp) +target_link_libraries(cacheman PRIVATE dataman) + install( - TARGETS dataman EXPORT adios2 + TARGETS dataman cacheman EXPORT adios2 RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} diff --git a/source/dataman/CacheMan.cpp b/source/dataman/CacheMan.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5e5c42901e5d3a79c1744d92c990d5667cdb0447 --- /dev/null +++ b/source/dataman/CacheMan.cpp @@ -0,0 +1,188 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * CacheMan.cpp + * + * Created on: Apr 18, 2017 + * Author: Jason Wang + */ + +#include "CacheMan.h" + +#include <algorithm> +#include <limits> + +int CacheMan::init(json p_jmsg) { return 0; } + +int CacheItem::init(json p_jmsg) +{ + m_doid = p_jmsg["doid"]; + m_var = p_jmsg["var"]; + m_dtype = p_jmsg["dtype"]; + m_varshape = p_jmsg["varshape"].get<std::vector<size_t>>(); + m_bytes = dsize(m_dtype); + m_varsize = product(m_varshape); + m_varbytes = m_varsize * m_bytes; + + if (m_buffer[m_timestep].size() != m_varbytes) + { + m_buffer[m_timestep].resize(m_varbytes); + } + return 0; +} + +int CacheItem::put(const void *p_data, json p_jmsg) +{ + if (!check_json(p_jmsg, + {"doid", "var", "dtype", "varshape", "putshape", "offset"}, + "CacheItem")) + { + return -1; + } + init(p_jmsg); + std::vector<size_t> p_putshape = + p_jmsg["putshape"].get<std::vector<size_t>>(); + std::vector<size_t> p_varshape = + p_jmsg["varshape"].get<std::vector<size_t>>(); + std::vector<size_t> p_offset = p_jmsg["offset"].get<std::vector<size_t>>(); + + size_t putsize = product(p_putshape); + size_t chunksize = p_putshape.back(); + for (size_t i = 0; i < putsize; i += chunksize) + { + std::vector<size_t> p = one2multi(p_putshape, i); + p = apply_offset(p, p_offset); + size_t ig = multi2one(p_varshape, p); + std::copy((char *)p_data + i * m_bytes, + (char *)p_data + i * m_bytes + chunksize * m_bytes, + m_buffer[m_timestep].data() + ig * m_bytes); + } + + return 0; +} + +int CacheItem::get(void *p_data, json &p_jmsg) { return 0; } + +std::vector<size_t> CacheItem::get_shape() { return m_varshape; } + +std::string CacheItem::get_dtype() { return m_dtype; } + +void CacheItem::flush() { m_timestep++; } + +void CacheMan::remove(std::string doid, std::string var, size_t timestep) +{ + m_cache[doid][var].remove(timestep); +} + +void CacheMan::remove_all(size_t timestep) +{ + for (auto i : m_cache) + { + for (auto j : m_cache[i.first]) + { + j.second.remove(timestep); + } + } +} + +void CacheItem::remove(size_t timestep) { m_buffer.erase(timestep); } + +void CacheItem::clean(const std::string mode) +{ + if (mode == "zero") + { + std::memset(m_buffer[m_timestep].data(), 0, m_varbytes); + return; + } + if (mode == "nan") + { + for (size_t i = 0; i < m_varsize; i++) + { + if (m_dtype == "float") + ((float *)m_buffer[m_timestep].data())[i] = + std::numeric_limits<float>::quiet_NaN(); + } + return; + } +} + +const void *CacheItem::get_buffer() { return m_buffer[m_timestep].data(); } + +int CacheMan::put(const void *p_data, json p_jmsg) +{ + if (check_json(p_jmsg, {"doid", "var"}, "CacheMan")) + { + std::string doid = p_jmsg["doid"]; + std::string var = p_jmsg["var"]; + return m_cache[doid][var].put(p_data, p_jmsg); + } + return -1; +} + +int CacheMan::get(void *p_data, json &p_jmsg) { return 0; } + +void CacheMan::flush() +{ + for (auto i : m_cache) + { + for (auto j : m_cache[i.first]) + { + j.second.flush(); + } + } +} + +const void *CacheMan::get_buffer(std::string doid, std::string var) +{ + return m_cache[doid][var].get_buffer(); +} + +void CacheMan::clean(std::string doid, std::string var, std::string mode) +{ + m_cache[doid][var].clean(mode); +} + +void CacheMan::clean_all(std::string mode) +{ + for (auto i = m_cache.begin(); i != m_cache.end(); ++i) + { + for (auto j = m_cache[i->first].begin(); j != m_cache[i->first].end(); + ++j) + { + j->second.clean(mode); + } + } +} + +std::vector<std::string> CacheMan::get_do_list() +{ + std::vector<std::string> do_list; + for (auto it = m_cache.begin(); it != m_cache.end(); ++it) + do_list.push_back(it->first); + return do_list; +} + +std::vector<std::string> CacheMan::get_var_list(std::string doid) +{ + std::vector<std::string> var_list; + for (auto it = m_cache[doid].begin(); it != m_cache[doid].end(); ++it) + var_list.push_back(it->first); + return var_list; +} + +std::vector<size_t> CacheMan::get_shape(std::string doid, std::string var) +{ + return m_cache[doid][var].get_shape(); +} + +std::string CacheMan::get_dtype(std::string doid, std::string var) +{ + return m_cache[doid][var].get_dtype(); +} + +DataManBase* getMan() +{ + return new CacheMan; +} + diff --git a/source/dataman/CacheMan.h b/source/dataman/CacheMan.h new file mode 100644 index 0000000000000000000000000000000000000000..b362e2a6290052c4d197748497a2a49881cc74e8 --- /dev/null +++ b/source/dataman/CacheMan.h @@ -0,0 +1,122 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * CacheMan.h + * + * Created on: Apr 18, 2017 + * Author: Jason Wang + */ +#ifndef CACHEMAN_H_ +#define CACHEMAN_H_ + +#include "DataMan.h" + +class CacheItem : public DataManBase +{ +public: + CacheItem() = default; + virtual ~CacheItem() = default; + + void init(std::string doid, std::string var, std::string dtype, + std::vector<size_t> varshape); + + virtual int init(json p_jmsg); + virtual int put(const void *p_data, json p_jmsg); + virtual int get(void *p_data, json &p_jmsg); + virtual void transform(const void *p_in, void *p_out, json &p_jmsg){}; + + void flush(); + std::string name() { return "CacheItem"; } + std::string type() { return "Cache"; } + const void *get_buffer(); + void clean(const std::string mode); + void remove(size_t timestep); + std::vector<size_t> get_shape(); + std::string get_dtype(); + +private: + std::map<size_t, std::vector<char>> m_buffer; + std::string m_doid; + std::string m_var; + std::string m_dtype; + size_t m_bytes; + size_t m_varsize; + size_t m_varbytes; + std::vector<size_t> m_varshape; + bool m_completed; + size_t m_timestep = 0; + + inline std::vector<size_t> apply_offset(const std::vector<size_t> &p, + const std::vector<size_t> &o) + { + std::vector<size_t> g; + for (int i = 0; i < p.size(); i++) + { + g.push_back(p[i] + o[i]); + } + return g; + } + + inline size_t multi2one(const std::vector<size_t> &v, + const std::vector<size_t> &p) + { + size_t index = 0; + for (int i = 1; i < v.size(); i++) + { + index += std::accumulate(v.begin() + i, v.end(), p[i - 1], + std::multiplies<size_t>()); + } + index += p.back(); + return index; + } + + inline std::vector<size_t> one2multi(const std::vector<size_t> &v, size_t p) + { + std::vector<size_t> index(v.size()); + for (int i = 1; i < v.size(); i++) + { + size_t s = std::accumulate(v.begin() + i, v.end(), 1, + std::multiplies<size_t>()); + index[i - 1] = p / s; + p -= index[i - 1] * s; + } + index.back() = p; + return index; + } +}; + +class CacheMan : public DataManBase +{ + +public: + CacheMan() = default; + virtual ~CacheMan() = default; + + virtual int init(json p_jmsg); + virtual int put(const void *p_data, json p_jmsg); + virtual int get(void *p_data, json &p_jmsg); + virtual void transform(const void *p_in, void *p_out, json &p_jmsg){}; + + void flush(); + std::string name() { return "CacheMan"; } + std::string type() { return "Cache"; } + const void *get_buffer(std::string doid, std::string var); + void clean(std::string doid, std::string var, std::string mode); + void clean_all(std::string mode); + void remove(std::string doid, std::string var, size_t timestep); + void remove_all(size_t timestep); + std::vector<std::string> get_do_list(); + std::vector<std::string> get_var_list(std::string doid); + std::vector<size_t> get_shape(std::string doid, std::string var); + std::string get_dtype(std::string doid, std::string var); + +private: + typedef std::map<std::string, CacheItem> CacheVarMap; + typedef std::map<std::string, CacheVarMap> CacheDoMap; + CacheDoMap m_cache; +}; + +extern "C" DataManBase* getMan(); + +#endif