diff --git a/examples/hello/datamanWriter/helloDataManWriter.cpp b/examples/hello/datamanWriter/helloDataManWriter.cpp index 5c413d4ac035c4d38e2cfc206537cdce6de57183..1fc22a80c7916675af32a82467273d9afc59ae42 100644 --- a/examples/hello/datamanWriter/helloDataManWriter.cpp +++ b/examples/hello/datamanWriter/helloDataManWriter.cpp @@ -48,7 +48,7 @@ int main(int argc, char *argv[]) // if not defined by user, we can change the default settings datamanSettings.SetEngine("DataManWriter"); datamanSettings.SetParameters("peer-to-peer=yes", "real_time=yes", - "compress=no"); + "compress=no", "method=cache"); datamanSettings.AddTransport("Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3"); diff --git a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp index 4a0b1ea989b0d47be10fe7bc67901a23eac2d3cb..c551d955670f6d5620802c848672ce18dc821733 100644 --- a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp +++ b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp @@ -49,7 +49,7 @@ int main(int argc, char *argv[]) // if not defined by user, we can change the default settings datamanSettings.SetEngine("DataManWriter"); datamanSettings.SetParameters( - "real_time=yes", "method_type=stream", "method=dump", + "real_time=yes", "method_type=stream", "method=cache", "monitoring=yes", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307"); // datamanSettings.AddTransport( "Mdtm", "localIP=127.0.0.1", diff --git a/source/dataman/CMakeLists.txt b/source/dataman/CMakeLists.txt index 947f927d18b1d961c902170f943c1816e2d24b60..acbf5c2ebe44fbd6d99f2c5f7c9c468a4c4f0239 100644 --- a/source/dataman/CMakeLists.txt +++ b/source/dataman/CMakeLists.txt @@ -8,10 +8,13 @@ add_library(dataman DataMan.h DataMan.cpp ) target_include_directories(dataman PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) -target_link_libraries(dataman PUBLIC ${CMAKE_DL_LIBS}) +target_link_libraries(dataman PRIVATE adios2sys) + +add_library(cacheman SHARED CacheMan.h CacheMan.cpp) +target_link_libraries(cacheman PRIVATE dataman) install( - TARGETS dataman EXPORT adios2 + TARGETS dataman cacheman EXPORT adios2 RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} diff --git a/source/dataman/CacheMan.cpp b/source/dataman/CacheMan.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b531d9e5302895f48c16cd73b44ab7e52d755f5b --- /dev/null +++ b/source/dataman/CacheMan.cpp @@ -0,0 +1,184 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * CacheMan.cpp + * + * Created on: Apr 18, 2017 + * Author: Jason Wang + */ + +#include "CacheMan.h" + +#include <algorithm> +#include <limits> + +int CacheMan::init(json p_jmsg) { return 0; } + +int CacheItem::init(json p_jmsg) +{ + m_doid = p_jmsg["doid"]; + m_var = p_jmsg["var"]; + m_dtype = p_jmsg["dtype"]; + m_varshape = p_jmsg["varshape"].get<std::vector<size_t>>(); + m_bytes = dsize(m_dtype); + m_varsize = product(m_varshape); + m_varbytes = m_varsize * m_bytes; + + if (m_buffer[m_timestep].size() != m_varbytes) + { + m_buffer[m_timestep].resize(m_varbytes); + } + return 0; +} + +int CacheItem::put(const void *p_data, json p_jmsg) +{ + if (!check_json(p_jmsg, + {"doid", "var", "dtype", "varshape", "putshape", "offset"}, + "CacheItem")) + { + return -1; + } + init(p_jmsg); + std::vector<size_t> p_putshape = + p_jmsg["putshape"].get<std::vector<size_t>>(); + std::vector<size_t> p_varshape = + p_jmsg["varshape"].get<std::vector<size_t>>(); + std::vector<size_t> p_offset = p_jmsg["offset"].get<std::vector<size_t>>(); + + size_t putsize = product(p_putshape); + size_t chunksize = p_putshape.back(); + for (size_t i = 0; i < putsize; i += chunksize) + { + std::vector<size_t> p = one2multi(p_putshape, i); + p = apply_offset(p, p_offset); + size_t ig = multi2one(p_varshape, p); + std::copy((char *)p_data + i * m_bytes, + (char *)p_data + i * m_bytes + chunksize * m_bytes, + m_buffer[m_timestep].data() + ig * m_bytes); + } + + return 0; +} + +int CacheItem::get(void *p_data, json &p_jmsg) { return 0; } + +std::vector<size_t> CacheItem::get_shape() { return m_varshape; } + +std::string CacheItem::get_dtype() { return m_dtype; } + +void CacheItem::flush() { m_timestep++; } + +void CacheMan::remove(std::string doid, std::string var, size_t timestep) +{ + m_cache[doid][var].remove(timestep); +} + +void CacheMan::remove_all(size_t timestep) +{ + for (auto i : m_cache) + { + for (auto j : m_cache[i.first]) + { + j.second.remove(timestep); + } + } +} + +void CacheItem::remove(size_t timestep) { m_buffer.erase(timestep); } + +void CacheItem::clean(const std::string mode) +{ + if (mode == "zero") + { + std::memset(m_buffer[m_timestep].data(), 0, m_varbytes); + return; + } + if (mode == "nan") + { + for (size_t i = 0; i < m_varsize; i++) + { + if (m_dtype == "float") + ((float *)m_buffer[m_timestep].data())[i] = + std::numeric_limits<float>::quiet_NaN(); + } + return; + } +} + +const void *CacheItem::get_buffer() { return m_buffer[m_timestep].data(); } + +int CacheMan::put(const void *p_data, json p_jmsg) +{ + if (check_json(p_jmsg, {"doid", "var"}, "CacheMan")) + { + std::string doid = p_jmsg["doid"]; + std::string var = p_jmsg["var"]; + return m_cache[doid][var].put(p_data, p_jmsg); + } + return -1; +} + +int CacheMan::get(void *p_data, json &p_jmsg) { return 0; } + +void CacheMan::flush() +{ + for (auto i : m_cache) + { + for (auto j : m_cache[i.first]) + { + j.second.flush(); + } + } +} + +const void *CacheMan::get_buffer(std::string doid, std::string var) +{ + return m_cache[doid][var].get_buffer(); +} + +void CacheMan::clean(std::string doid, std::string var, std::string mode) +{ + m_cache[doid][var].clean(mode); +} + +void CacheMan::clean_all(std::string mode) +{ + for (auto i = m_cache.begin(); i != m_cache.end(); ++i) + { + for (auto j = m_cache[i->first].begin(); j != m_cache[i->first].end(); + ++j) + { + j->second.clean(mode); + } + } +} + +std::vector<std::string> CacheMan::get_do_list() +{ + std::vector<std::string> do_list; + for (auto it = m_cache.begin(); it != m_cache.end(); ++it) + do_list.push_back(it->first); + return do_list; +} + +std::vector<std::string> CacheMan::get_var_list(std::string doid) +{ + std::vector<std::string> var_list; + for (auto it = m_cache[doid].begin(); it != m_cache[doid].end(); ++it) + var_list.push_back(it->first); + return var_list; +} + +std::vector<size_t> CacheMan::get_shape(std::string doid, std::string var) +{ + return m_cache[doid][var].get_shape(); +} + +std::string CacheMan::get_dtype(std::string doid, std::string var) +{ + return m_cache[doid][var].get_dtype(); +} + +DataManBase *getMan() { return new CacheMan; } diff --git a/source/dataman/CacheMan.h b/source/dataman/CacheMan.h new file mode 100644 index 0000000000000000000000000000000000000000..9bc6371b7a119714773ae5bdde85ee9da697b13b --- /dev/null +++ b/source/dataman/CacheMan.h @@ -0,0 +1,122 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * CacheMan.h + * + * Created on: Apr 18, 2017 + * Author: Jason Wang + */ +#ifndef CACHEMAN_H_ +#define CACHEMAN_H_ + +#include "DataMan.h" + +class CacheItem : public DataManBase +{ +public: + CacheItem() = default; + virtual ~CacheItem() = default; + + void init(std::string doid, std::string var, std::string dtype, + std::vector<size_t> varshape); + + virtual int init(json p_jmsg); + virtual int put(const void *p_data, json p_jmsg); + virtual int get(void *p_data, json &p_jmsg); + virtual void transform(const void *p_in, void *p_out, json &p_jmsg){}; + + void flush(); + std::string name() { return "CacheItem"; } + std::string type() { return "Cache"; } + const void *get_buffer(); + void clean(const std::string mode); + void remove(size_t timestep); + std::vector<size_t> get_shape(); + std::string get_dtype(); + +private: + std::map<size_t, std::vector<char>> m_buffer; + std::string m_doid; + std::string m_var; + std::string m_dtype; + size_t m_bytes; + size_t m_varsize; + size_t m_varbytes; + std::vector<size_t> m_varshape; + bool m_completed; + size_t m_timestep = 0; + + inline std::vector<size_t> apply_offset(const std::vector<size_t> &p, + const std::vector<size_t> &o) + { + std::vector<size_t> g; + for (int i = 0; i < p.size(); i++) + { + g.push_back(p[i] + o[i]); + } + return g; + } + + inline size_t multi2one(const std::vector<size_t> &v, + const std::vector<size_t> &p) + { + size_t index = 0; + for (int i = 1; i < v.size(); i++) + { + index += std::accumulate(v.begin() + i, v.end(), p[i - 1], + std::multiplies<size_t>()); + } + index += p.back(); + return index; + } + + inline std::vector<size_t> one2multi(const std::vector<size_t> &v, size_t p) + { + std::vector<size_t> index(v.size()); + for (int i = 1; i < v.size(); i++) + { + size_t s = std::accumulate(v.begin() + i, v.end(), 1, + std::multiplies<size_t>()); + index[i - 1] = p / s; + p -= index[i - 1] * s; + } + index.back() = p; + return index; + } +}; + +class CacheMan : public DataManBase +{ + +public: + CacheMan() = default; + virtual ~CacheMan() = default; + + virtual int init(json p_jmsg); + virtual int put(const void *p_data, json p_jmsg); + virtual int get(void *p_data, json &p_jmsg); + virtual void transform(const void *p_in, void *p_out, json &p_jmsg){}; + + void flush(); + std::string name() { return "CacheMan"; } + std::string type() { return "Cache"; } + const void *get_buffer(std::string doid, std::string var); + void clean(std::string doid, std::string var, std::string mode); + void clean_all(std::string mode); + void remove(std::string doid, std::string var, size_t timestep); + void remove_all(size_t timestep); + std::vector<std::string> get_do_list(); + std::vector<std::string> get_var_list(std::string doid); + std::vector<size_t> get_shape(std::string doid, std::string var); + std::string get_dtype(std::string doid, std::string var); + +private: + typedef std::map<std::string, CacheItem> CacheVarMap; + typedef std::map<std::string, CacheVarMap> CacheDoMap; + CacheDoMap m_cache; +}; + +extern "C" DataManBase *getMan(); + +#endif diff --git a/source/dataman/DataMan.h b/source/dataman/DataMan.h index 709f5e7f1ef2cea4e710d9507e10d4901051f725..6316eb34ed512dd4c732cdb571c34e4dd7977b9d 100644 --- a/source/dataman/DataMan.h +++ b/source/dataman/DataMan.h @@ -17,7 +17,6 @@ class DataMan : public DataManBase { public: DataMan() = default; - virtual ~DataMan() = default; virtual int init(json p_jmsg); virtual int put(const void *p_data, json p_jmsg); virtual int get(void *p_data, json &p_jmsg); diff --git a/source/dataman/DataManBase.cpp b/source/dataman/DataManBase.cpp index 5f9f73ec8b53b456a2dd2d4c589ae5e98c4276f4..f238e56095eae5f1d8308e6e9a91289182ce0303 100644 --- a/source/dataman/DataManBase.cpp +++ b/source/dataman/DataManBase.cpp @@ -10,6 +10,50 @@ #include "DataManBase.h" +#include <sstream> + +#include <adios2sys/DynamicLoader.hxx> + +struct DataManBase::ManagerLibrary +{ + adios2sys::DynamicLoader::LibraryHandle m_LibraryHandle; + DataManBase *(*m_getManFunc)(); + ManagerLibrary(std::string method) + { + std::stringstream libNameBuilder; + libNameBuilder << adios2sys::DynamicLoader::LibPrefix() << method + << "man" << adios2sys::DynamicLoader::LibExtension(); + std::string libName = libNameBuilder.str(); + + // Bind to the dynamic library + m_LibraryHandle = adios2sys::DynamicLoader::OpenLibrary(libName); + if (!m_LibraryHandle) + { + throw std::runtime_error("Unable to locate the " + libName + + " library."); + } + + // Bind to the getMan symbol + adios2sys::DynamicLoader::SymbolPointer symbolHandle = + adios2sys::DynamicLoader::GetSymbolAddress(m_LibraryHandle, + "getMan"); + if (!symbolHandle) + { + throw std::runtime_error("Unable to locate the getMan symbol in " + + libName); + } + m_getManFunc = reinterpret_cast<DataManBase *(*)()>(symbolHandle); + } + + ~ManagerLibrary() + { + if (m_LibraryHandle) + { + adios2sys::DynamicLoader::CloseLibrary(m_LibraryHandle); + } + } +}; + DataManBase::DataManBase() { m_profiling["total_manager_time"] = 0.0f; @@ -221,38 +265,23 @@ int DataManBase::put_next(const void *p_data, json p_jmsg) std::shared_ptr<DataManBase> DataManBase::get_man(std::string method) { - void *so = NULL; -#ifdef __APPLE__ - std::string dylibname = "lib" + method + "man.dylib"; - so = dlopen(dylibname.c_str(), RTLD_NOW); - if (so) - { - std::shared_ptr<DataManBase> (*func)() = NULL; - func = (std::shared_ptr<DataManBase>(*)())dlsym(so, "getMan"); - if (func) - { - return func(); - } - } -#endif - std::string soname = "lib" + method + "man.so"; - so = dlopen(soname.c_str(), RTLD_NOW); - if (so) + try { - std::shared_ptr<DataManBase> (*func)() = NULL; - func = (std::shared_ptr<DataManBase>(*)())dlsym(so, "getMan"); - if (func) + // Reuse already loaded libraries if possible + auto libIt = m_LoadedManagers.find(method); + if (libIt == m_LoadedManagers.end()) { - return func(); - } - else - { - logging("getMan() not found in " + soname); + // This insertion will only fail if an entry for method already + // exists, which this if block ensures that it doesn't. + libIt = + m_LoadedManagers.insert({method, new ManagerLibrary(method)}) + .first; } + return std::shared_ptr<DataManBase>(libIt->second->m_getManFunc()); } - else + catch (const std::runtime_error &ex) { - logging("Dynamic library " + soname + " not found in LD_LIBRARY_PATH"); + logging(ex.what()); + return nullptr; } - return nullptr; } diff --git a/source/dataman/DataManBase.h b/source/dataman/DataManBase.h index 9b8b116b6b431c659e1b1508bcb4231518e4606b..ebc8cd0097014c36bc9edbb6cfb692798ecabb4b 100644 --- a/source/dataman/DataManBase.h +++ b/source/dataman/DataManBase.h @@ -13,15 +13,13 @@ #include <cstdint> -#include <dlfcn.h> -#include <unistd.h> - #include <chrono> #include <complex> #include <functional> #include <iostream> #include <memory> #include <string> +#include <unordered_map> #include <vector> #include "json.hpp" @@ -31,7 +29,7 @@ class DataManBase public: using json = nlohmann::json; DataManBase(); - virtual ~DataManBase() = default; + int put(const void *p_data, std::string p_doid, std::string p_var, std::string p_dtype, std::vector<size_t> p_putshape, std::vector<size_t> p_varshape, std::vector<size_t> p_offset, @@ -265,6 +263,9 @@ protected: std::map<std::string, std::shared_ptr<DataManBase>> m_next; private: + struct ManagerLibrary; + std::unordered_map<std::string, ManagerLibrary *> m_LoadedManagers; + json m_profiling; std::chrono::time_point<std::chrono::system_clock> m_start_time; std::chrono::time_point<std::chrono::system_clock> m_step_time; diff --git a/thirdparty/KWSys/CMakeLists.txt b/thirdparty/KWSys/CMakeLists.txt index 15166f82a0732ae99c06184559b4479a14b76818..ef8e57769de84143c5238d86cbdfb88440fba979 100644 --- a/thirdparty/KWSys/CMakeLists.txt +++ b/thirdparty/KWSys/CMakeLists.txt @@ -7,3 +7,8 @@ if(NOT ADIOS_BUILD_SHARED_LIBS) endif() add_subdirectory(adios2sys) + +# Add the include usage requirements for KWSys +target_include_directories(adios2sys + INTERFACE ${CMAKE_CURRENT_BINARY_DIR}/adios2sys +)