diff --git a/CMakeLists.txt b/CMakeLists.txt index 9e267cc5b85cc4484ccfc515bb039722c71a2548..cc718b9c0476e0726dacde7e6a7e0939b861b368 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -72,7 +72,10 @@ if(ADIOS_USE_MPI) endif() option(ADIOS_USE_BZip2 "Enable support for BZip2 transforms" OFF) option(ADIOS_USE_ADIOS1 "Enable support for the ADIOS 1 engine" OFF) -option(ADIOS_USE_DataMan "Enable support for the DataMan engine" OFF) + +# DataMan is not a user-settable option. It will always be enabled if the +# platform supports it. +set(ADIOS_USE_DataMan ${SHARED_LIBS_SUPPORTED}) #------------------------------------------------------------------------------# # Third party libraries diff --git a/cmake/FindDataMan.cmake b/cmake/FindDataMan.cmake deleted file mode 100644 index cb4d2315a119a6a13d2d951c0c6dac1b273d3798..0000000000000000000000000000000000000000 --- a/cmake/FindDataMan.cmake +++ /dev/null @@ -1,72 +0,0 @@ -#------------------------------------------------------------------------------# -# Distributed under the OSI-approved Apache License, Version 2.0. See -# accompanying file Copyright.txt for details. -#------------------------------------------------------------------------------# -# -# FindDataMan -# --------- -# -# Try to find the DataMan library from Jason Wang, ORNL -# https://github.com/JasonRuonanWang/DataMan -# -# This module defines the following variables: -# -# DataMan_FOUND - System has DataMan -# DataMan_INCLUDE_DIRS - The DataMan include directory -# DataMan_LIBRARIES - Link these to use DataMan -# -# and the following imported targets: -# DataMan::DataMan - The core DataMan library -# -# You can also set the following variable to help guide the search: -# DataMan_ROOT_DIR - The install prefix for DataMan containing the -# include and lib folders -# Note: this can be set as a CMake variable or an -# environment variable. If specified as a CMake -# variable, it will override any setting specified -# as an environment variable. - -if(NOT DataMan_FOUND) - if((NOT DataMan_ROOT_DIR) AND (NOT (ENV{DataMan_ROOT_DIR} STREQUAL ""))) - set(DataMan_ROOT_DIR "$ENV{DataMan_ROOT_DIR}") - endif() - - # Search for the core libraries - if(DataMan_ROOT_DIR) - # If a root directory is specified, then don't look anywhere else - find_path(DataMan_INCLUDE_DIR DataMan.h - HINTS ${DataMan_ROOT_DIR}/include - NO_DEFAULT_PATHS - ) - set(_DataMan_LIBRARY_HINT HINTS ${DataMan_ROOT_DIR}/lib NO_DEFAULT_PATHS) - else() - # Otherwise use the include dir as a basis to search for the lib - find_path(DataMan_INCLUDE_DIR DataMan.h) - if(DataMan_INCLUDE_DIR) - get_filename_component(_DataMan_PREFIX "${DataMan_INCLUDE_DIR}" PATH) - set(_DataMan_LIBRARY_HINT HINTS ${_DataMan_PREFIX}/lib) - unset(_DataMan_PREFIX) - endif() - endif() - find_library(DataMan_LIBRARY dataman ${_DataMan_LIBRARY_HINT}) - unset(_DataMan_LIBRARY_HINT) - - include(FindPackageHandleStandardArgs) - find_package_handle_standard_args(DataMan - FOUND_VAR DataMan_FOUND - REQUIRED_VARS - DataMan_INCLUDE_DIR - DataMan_LIBRARY - ) - if(DataMan_FOUND) - set(DataMan_INCLUDE_DIRS ${DataMan_INCLUDE_DIR}) - set(DataMan_LIBRARIES ${DataMan_LIBRARY}) - if(DataMan_FOUND AND NOT TARGET DataMan::DataMan) - add_library(DataMan::DataMan UNKNOWN IMPORTED) - set_target_properties(DataMan::DataMan PROPERTIES - IMPORTED_LOCATION "${DataMan_LIBRARY}" - INTERFACE_INCLUDE_DIRECTORIES "${DataMan_INCLUDE_DIR}" - ) - endif() - endif() -endif() diff --git a/include/engine/dataman/DataManReader.h b/include/engine/dataman/DataManReader.h index f82e649bf9851cb68cafb3006461703474efcfa9..f29494f45ca7441c46c039ebfbd5d9115d41f19d 100644 --- a/include/engine/dataman/DataManReader.h +++ b/include/engine/dataman/DataManReader.h @@ -19,7 +19,7 @@ // supported capsules #include "capsule/heap/STLVector.h" -#include "DataManager.h" +#include "utilities/realtime/dataman/DataMan.h" namespace adios { @@ -38,11 +38,12 @@ public: * @param debugMode * @param nthreads */ + using json = nlohmann::json; DataManReader(ADIOS &adios, const std::string &name, const std::string accessMode, MPI_Comm mpiComm, const Method &method); - ~DataManReader(); + virtual ~DataManReader() = default; /** * Set callback function from user application @@ -108,7 +109,7 @@ private: /// m_Transports bool m_DoRealTime = false; - DataManager m_Man; + realtime::DataMan m_Man; std::function<void(const void *, std::string, std::string, std::string, Dims)> m_CallBack; ///< call back function diff --git a/include/engine/dataman/DataManWriter.h b/include/engine/dataman/DataManWriter.h index 6116de429282a89f21f7208827d09ba07c885e51..9902afb02c249eb95d762a9008352e6502ad1101 100644 --- a/include/engine/dataman/DataManWriter.h +++ b/include/engine/dataman/DataManWriter.h @@ -20,7 +20,7 @@ // supported capsules #include "capsule/heap/STLVector.h" -#include "DataManager.h" //here comes your DataMan header +#include "utilities/realtime/dataman/DataMan.h" namespace adios { @@ -29,6 +29,7 @@ class DataManWriter : public Engine { public: + using json = nlohmann::json; /** * Constructor for dataman engine Writer for WAN communications * @param adios @@ -43,7 +44,7 @@ public: const std::string accessMode, MPI_Comm mpiComm, const Method &method); - ~DataManWriter(); + virtual ~DataManWriter() = default; void SetCallBack(std::function<void(const void *, std::string, std::string, std::string, Dims)> @@ -106,7 +107,7 @@ private: bool m_DoRealTime = false; bool m_DoMonitor = false; - DataManager m_Man; + realtime::DataMan m_Man; std::function<void(const void *, std::string, std::string, std::string, Dims)> m_CallBack; ///< call back function @@ -157,10 +158,10 @@ private: MPI_Barrier(m_MPIComm); std::cout << "I am hooked to the DataMan library\n"; std::cout << "putshape " << variable.m_LocalDimensions.size() - << endl; + << std::endl; std::cout << "varshape " << variable.m_GlobalDimensions.size() - << endl; - std::cout << "offset " << variable.m_Offsets.size() << endl; + << std::endl; + std::cout << "offset " << variable.m_Offsets.size() << std::endl; for (int i = 0; i < m_SizeMPI; ++i) { if (i == m_RankMPI) diff --git a/include/transport/wan/MdtmMan.h b/include/transport/wan/MdtmMan.h index 7ddd162acc11beb73abedab6f43aad5c9dd0b7ae..78f0f92b8aadbc28d4c3858286d4392979151ee0 100644 --- a/include/transport/wan/MdtmMan.h +++ b/include/transport/wan/MdtmMan.h @@ -14,7 +14,7 @@ #include "core/Transport.h" #include "external/json.hpp" -#include "DataMan.h" //here comes your DataMan header +#include "utilities/realtime/dataman/DataManBase.h" namespace adios { @@ -43,7 +43,7 @@ public: const std::vector<int> priorities, MPI_Comm mpiComm, const bool debugMode); - ~MdtmMan(); + virtual ~MdtmMan() = default; void Open(const std::string name, const std::string accessMode); diff --git a/include/utilities/realtime/dataman/DataMan.h b/include/utilities/realtime/dataman/DataMan.h new file mode 100644 index 0000000000000000000000000000000000000000..59640e933fd9e1a5dc12800a2b3e924ff42bf92c --- /dev/null +++ b/include/utilities/realtime/dataman/DataMan.h @@ -0,0 +1,54 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * DataMan.h + * + * Created on: Apr 12, 2017 + * Author: Jason Wang + */ + +#ifndef DATAMANAGER_H_ +#define DATAMANAGER_H_ + +#include "utilities/realtime/dataman/DataManBase.h" + +namespace adios +{ +namespace realtime +{ + +class DataMan : public DataManBase +{ +public: + DataMan() = default; + virtual ~DataMan() = default; + virtual int init(json p_jmsg); + virtual int put(const void *p_data, json p_jmsg); + virtual int get(void *p_data, json &p_jmsg); + void flush(); + void add_stream(json p_jmsg); + int put(const void *p_data, std::string p_doid, std::string p_var, + std::string p_dtype, std::vector<size_t> p_putshape, + std::vector<size_t> p_varshape, std::vector<size_t> p_offset, + size_t p_timestep, int p_tolerance = 0, int p_priority = 100); + void add_file(std::string p_method); + std::string name() { return "DataManager"; } + std::string type() { return "Manager"; } + virtual void transform(const void *p_in, void *p_out, json &p_jmsg){}; + +private: + std::string m_local_ip = ""; + std::string m_remote_ip = ""; + int m_local_port = 0; + int m_remote_port = 0; + int m_num_channels = 0; + std::vector<int> m_tolerance; + std::vector<int> m_priority; +}; + +// end namespace realtime +} +// end namespace adios +} +#endif diff --git a/include/utilities/realtime/dataman/DataManBase.h b/include/utilities/realtime/dataman/DataManBase.h new file mode 100644 index 0000000000000000000000000000000000000000..9d14457427a4feeaeff65222c1d6a42fef352a12 --- /dev/null +++ b/include/utilities/realtime/dataman/DataManBase.h @@ -0,0 +1,283 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * DataManBase.h + * + * Created on: Apr 12, 2017 + * Author: Jason Wang + */ + +#ifndef DATAMAN_H_ +#define DATAMAN_H_ + +#include <cstdint> + +#include <dlfcn.h> +#include <unistd.h> + +#include <chrono> +#include <complex> +#include <functional> +#include <iostream> +#include <memory> +#include <string> +#include <vector> + +#include "external/json.hpp" + +namespace adios +{ +namespace realtime +{ + +class DataManBase +{ +public: + using json = nlohmann::json; + DataManBase(); + virtual ~DataManBase() = default; + int put(const void *p_data, std::string p_doid, std::string p_var, + std::string p_dtype, std::vector<size_t> p_putshape, + std::vector<size_t> p_varshape, std::vector<size_t> p_offset, + size_t p_timestep, int p_tolerance = 0, int p_priority = 100); + + virtual int put_begin(const void *p_data, json &p_jmsg); + + virtual int put_end(const void *p_data, json &p_jmsg); + + virtual int put(const void *p_data, json p_jmsg) = 0; + + int get(void *p_data, std::string p_doid, std::string p_var, + std::string p_dtype, std::vector<size_t> p_getshape, + std::vector<size_t> p_varshape, std::vector<size_t> p_offset, + size_t p_timestep); + + int get(void *p_data, std::string p_doid, std::string p_var, + std::string &p_dtype, std::vector<size_t> &p_varshape, + size_t &p_timestep); + + virtual int get(void *p_data, json &p_jmsg) = 0; + virtual int init(json p_jmsg) = 0; + virtual void flush() = 0; + virtual std::string name() = 0; + virtual std::string type() = 0; + void reg_callback(std::function<void(const void *, std::string, std::string, + std::string, std::vector<size_t>)> + cb); + + void dump(const void *p_data, json p_jmsg, std::ostream &out = std::cout); + + void add_next(std::string p_name, std::shared_ptr<DataManBase> p_next); + + void remove_next(std::string p_name); + + bool have_next(); + + void print_next(std::ostream &out = std::cout); + + virtual void transform(const void *p_in, void *p_out, json &p_jmsg) = 0; + +protected: + bool auto_transform(const void *p_in, void *p_out, json &p_jmsg); + + void add_man_to_path(std::string p_new, std::string p_path); + + virtual int flush_next(); + + virtual int put_next(const void *p_data, json p_jmsg); + + std::shared_ptr<DataManBase> get_man(std::string method); + + inline void logging(std::string p_msg, std::string p_man = "", + std::ostream &out = std::cout) + { + if (p_man == "") + p_man = name(); + out << "["; + out << p_man; + out << "]"; + out << " "; + out << p_msg; + out << std::endl; + } + + inline bool check_json(json p_jmsg, std::vector<std::string> p_strings, + std::string p_man = "") + { + if (p_man == "") + p_man = name(); + for (auto i : p_strings) + { + if (p_jmsg[i] == nullptr) + { + if (p_man != "") + { + logging("JSON key " + i + " not found!", p_man); + } + return false; + } + } + return true; + } + + inline size_t product(size_t *shape) + { + size_t s = 1; + if (shape) + { + for (size_t i = 1; i <= shape[0]; i++) + { + s *= shape[i]; + } + } + return s; + } + + inline size_t product(std::vector<size_t> shape, size_t size = 1) + { + return accumulate(shape.begin(), shape.end(), size, + std::multiplies<size_t>()); + } + + inline size_t dsize(std::string dtype) + { + if (dtype == "char") + return sizeof(char); + if (dtype == "short") + return sizeof(short); + if (dtype == "int") + return sizeof(int); + if (dtype == "long") + return sizeof(long); + if (dtype == "unsigned char") + return sizeof(unsigned char); + if (dtype == "unsigned short") + return sizeof(unsigned short); + if (dtype == "unsigned int") + return sizeof(unsigned int); + if (dtype == "unsigned long") + return sizeof(unsigned long); + if (dtype == "float") + return sizeof(float); + if (dtype == "double") + return sizeof(double); + if (dtype == "long double") + return sizeof(long double); + if (dtype == "std::complex<float>" or dtype == "complex<float>") + return sizeof(std::complex<float>); + if (dtype == "std::complex<double>") + return sizeof(std::complex<double>); + + if (dtype == "int8_t") + return sizeof(int8_t); + if (dtype == "uint8_t") + return sizeof(uint8_t); + if (dtype == "int16_t") + return sizeof(int16_t); + if (dtype == "uint16_t") + return sizeof(uint16_t); + if (dtype == "int32_t") + return sizeof(int32_t); + if (dtype == "uint32_t") + return sizeof(uint32_t); + if (dtype == "int64_t") + return sizeof(int64_t); + if (dtype == "uint64_t") + return sizeof(uint64_t); + return 0; + } + + inline json atoj(unsigned int *array) + { + json j; + if (array) + { + if (array[0] > 0) + { + j = {array[1]}; + for (unsigned int i = 2; i <= array[0]; i++) + { + j.insert(j.end(), array[i]); + } + } + } + return j; + } + + inline std::string rmquote(std::string in) + { + return in.substr(1, in.length() - 2); + } + + inline bool isin(std::string a, json j) + { + for (unsigned int i = 0; i < j.size(); i++) + { + if (j[i] == a) + return true; + } + return false; + } + + inline int closest(int v, json j, bool up) + { + int s = 100, k = 0, t; + for (unsigned int i = 0; i < j.size(); i++) + { + if (up) + t = j[i].get<int>() - v; + else + t = v - j[i].get<int>(); + if (t >= 0 && t < s) + { + s = t; + k = i; + } + } + return k; + } + + inline void check_shape(json &p_jmsg) + { + std::vector<size_t> varshape; + if (check_json(p_jmsg, {"varshape"})) + { + varshape = p_jmsg["varshape"].get<std::vector<size_t>>(); + } + else + { + return; + } + if (p_jmsg["putshape"] == nullptr) + { + p_jmsg["putshape"] = varshape; + } + if (p_jmsg["offset"] == nullptr) + { + p_jmsg["offset"] = std::vector<size_t>(varshape.size(), 0); + } + p_jmsg["putbytes"] = + product(p_jmsg["putshape"].get<std::vector<size_t>>(), + dsize(p_jmsg["dtype"].get<std::string>())); + p_jmsg["varbytes"] = + product(varshape, dsize(p_jmsg["dtype"].get<std::string>())); + } + + std::function<void(const void *, std::string, std::string, std::string, + std::vector<size_t>)> + m_callback; + std::map<std::string, std::shared_ptr<DataManBase>> m_next; + +private: + json m_profiling; + std::chrono::time_point<std::chrono::system_clock> m_start_time; + std::chrono::time_point<std::chrono::system_clock> m_step_time; + bool m_profiling_enabled = false; +}; + +// end namespace realtime +} +// end namespace adios +} +#endif diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt index 2ccf94700c6304df4b8eacf788824956bdc5ac8d..d9d67a90fa156d8175751e5822ae729b3976e346 100644 --- a/source/CMakeLists.txt +++ b/source/CMakeLists.txt @@ -43,14 +43,15 @@ foreach(adios2_target IN LISTS adios2_targets) ) if(ADIOS_USE_DataMan) - find_package(DataMan REQUIRED) target_sources(${adios2_target} PRIVATE engine/dataman/DataManReader.cpp engine/dataman/DataManWriter.cpp transport/wan/MdtmMan.cpp + utilities/realtime/dataman/DataManBase.cpp + utilities/realtime/dataman/DataMan.cpp ) target_compile_definitions(${adios2_target} PRIVATE ADIOS_HAVE_DATAMAN) - target_link_libraries(${adios2_target} PRIVATE DataMan::DataMan) + target_link_libraries(${adios2_target} PRIVATE ${CMAKE_DL_LIBS}) endif() if(ADIOS_USE_BZip2) diff --git a/source/engine/dataman/DataManReader.cpp b/source/engine/dataman/DataManReader.cpp index 4b383491c464397e3b2360327fe2146c15ad0c25..4ad22bcb8f7d3f47f68711499cbb2ffdfc37160d 100644 --- a/source/engine/dataman/DataManReader.cpp +++ b/source/engine/dataman/DataManReader.cpp @@ -20,8 +20,6 @@ #include "transport/file/FilePointer.h" // uses C FILE* #include "transport/wan/MdtmMan.h" //uses Mdtm library -#include "DataMan.h" //here comes your DataMan header from external dataman library - namespace adios { @@ -35,8 +33,6 @@ DataManReader::DataManReader(ADIOS &adios, const std::string &name, Init(); } -DataManReader::~DataManReader() {} - void DataManReader::SetCallBack( std::function<void(const void *, std::string, std::string, std::string, Dims)> diff --git a/source/engine/dataman/DataManWriter.cpp b/source/engine/dataman/DataManWriter.cpp index 3810d3f0ba291a1551e563e556f57e662b8876c3..7e438b535152f9acbbe80d2b037f98780772bf3a 100644 --- a/source/engine/dataman/DataManWriter.cpp +++ b/source/engine/dataman/DataManWriter.cpp @@ -32,8 +32,6 @@ DataManWriter::DataManWriter(ADIOS &adios, const std::string name, Init(); } -DataManWriter::~DataManWriter() {} - void DataManWriter::SetCallBack( std::function<void(const void *, std::string, std::string, std::string, Dims)> diff --git a/source/transport/wan/MdtmMan.cpp b/source/transport/wan/MdtmMan.cpp index cdece2ecbfa5270b2e36eedb07af430319056cb1..c3fbdafd414ce82ceba78bfb8dbd0a7e950ce28b 100644 --- a/source/transport/wan/MdtmMan.cpp +++ b/source/transport/wan/MdtmMan.cpp @@ -27,8 +27,6 @@ MdtmMan::MdtmMan(const std::string localIP, const std::string remoteIP, { } -MdtmMan::~MdtmMan() {} - void MdtmMan::Open(const std::string name, const std::string accessMode) {} void MdtmMan::SetBuffer(char *buffer, std::size_t size) {} diff --git a/source/utilities/realtime/dataman/DataMan.cpp b/source/utilities/realtime/dataman/DataMan.cpp new file mode 100644 index 0000000000000000000000000000000000000000..430c8e3633f3621504033a983f1390c1fb0f3a89 --- /dev/null +++ b/source/utilities/realtime/dataman/DataMan.cpp @@ -0,0 +1,79 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * DataMan.cpp + * + * Created on: Apr 12, 2017 + * Author: Jason Wang + */ + +#include "utilities/realtime/dataman/DataMan.h" + +namespace adios +{ +namespace realtime +{ + +int DataMan::init(json p_jmsg) { return 0; } + +int DataMan::put(const void *p_data, std::string p_doid, std::string p_var, + std::string p_dtype, std::vector<size_t> p_putshape, + std::vector<size_t> p_varshape, std::vector<size_t> p_offset, + size_t p_timestep, int p_tolerance, int p_priority) +{ + return DataMan::put(p_data, p_doid, p_var, p_dtype, p_putshape, p_varshape, + p_offset, p_timestep, p_tolerance, p_priority); +} + +int DataMan::put(const void *p_data, json p_jmsg) +{ + put_begin(p_data, p_jmsg); + put_end(p_data, p_jmsg); + return 0; +} + +void DataMan::add_file(std::string p_method) {} + +void DataMan::add_stream(json p_jmsg) +{ + + std::string method = "zmq"; + + if (p_jmsg["method"] != nullptr) + method = p_jmsg["method"]; + + logging("Streaming method " + method + " added"); + + if (m_tolerance.size() < m_num_channels) + { + for (int i = 0; i < m_num_channels; i++) + { + m_tolerance.push_back(0); + } + } + if (m_priority.size() < m_num_channels) + { + for (int i = 0; i < m_num_channels; i++) + { + m_priority.push_back(100 / (i + 1)); + } + } + + auto man = get_man(method); + if (man) + { + man->init(p_jmsg); + this->add_next(method, man); + } + add_man_to_path("zfp", method); +} + +void DataMan::flush() { flush_next(); } + +int DataMan::get(void *p_data, json &p_jmsg) { return 0; } + +// end namespace realtime +} +// end namespace adios +} diff --git a/source/utilities/realtime/dataman/DataManBase.cpp b/source/utilities/realtime/dataman/DataManBase.cpp new file mode 100644 index 0000000000000000000000000000000000000000..5081eba05fa813ab56d593a8eee2e9f270a32848 --- /dev/null +++ b/source/utilities/realtime/dataman/DataManBase.cpp @@ -0,0 +1,268 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * DataManBase.cpp + * + * Created on: Apr 12, 2017 + * Author: Jason Wang + */ + +#include "utilities/realtime/dataman/DataManBase.h" + +namespace adios +{ +namespace realtime +{ + +DataManBase::DataManBase() +{ + m_profiling["total_manager_time"] = 0.0f; + m_profiling["total_mb"] = 0.0f; + m_start_time = std::chrono::system_clock::now(); +} + +int DataManBase::put(const void *p_data, std::string p_doid, std::string p_var, + std::string p_dtype, std::vector<size_t> p_putshape, + std::vector<size_t> p_varshape, + std::vector<size_t> p_offset, size_t p_timestep, + int p_tolerance, int p_priority) +{ + json msg; + msg["doid"] = p_doid; + msg["var"] = p_var; + msg["dtype"] = p_dtype; + msg["putshape"] = p_putshape; + msg["putbytes"] = product(p_putshape, dsize(p_dtype)); + msg["varshape"] = p_varshape; + msg["varbytes"] = product(p_varshape, dsize(p_dtype)); + msg["offset"] = p_offset; + msg["timestep"] = p_timestep; + msg["tolerance"] = p_tolerance; + msg["priority"] = p_priority; + return put(p_data, msg); +} + +int DataManBase::put_begin(const void *p_data, json &p_jmsg) +{ + check_shape(p_jmsg); + p_jmsg["profiling"] = m_profiling; + m_step_time = std::chrono::system_clock::now(); + return 0; +} + +int DataManBase::put_end(const void *p_data, json &p_jmsg) +{ + auto end = std::chrono::system_clock::now(); + std::chrono::duration<double> duration = end - m_step_time; + m_profiling["total_manager_time"] = + m_profiling["total_manager_time"].get<double>() + duration.count(); + m_profiling["total_mb"] = + m_profiling["total_mb"].get<size_t>() + + product(p_jmsg["varshape"], dsize(p_jmsg["dtype"])) / 1000000.0f; + duration = end - m_start_time; + m_profiling["total_workflow_time"] = duration.count(); + m_profiling["workflow_mbs"] = + m_profiling["total_mb"].get<double>() / + m_profiling["total_workflow_time"].get<double>(); + m_profiling["manager_mbs"] = + m_profiling["total_mb"].get<double>() / + m_profiling["total_manager_time"].get<double>(); + if (p_jmsg["compressed_size"] != nullptr) + p_jmsg["putbytes"] = p_jmsg["compressed_size"].get<size_t>(); + put_next(p_data, p_jmsg); + return 0; +} + +int DataManBase::get(void *p_data, std::string p_doid, std::string p_var, + std::string p_dtype, std::vector<size_t> p_getshape, + std::vector<size_t> p_varshape, + std::vector<size_t> p_offset, size_t p_timestep) +{ + json msg; + msg["doid"] = p_doid; + msg["var"] = p_var; + msg["dtype"] = p_dtype; + msg["getshape"] = p_getshape; + msg["varshape"] = p_varshape; + msg["offset"] = p_offset; + msg["timestep"] = p_timestep; + return get(p_data, msg); +} + +int DataManBase::get(void *p_data, std::string p_doid, std::string p_var, + std::string &p_dtype, std::vector<size_t> &p_varshape, + size_t &p_timestep) +{ + json msg; + msg["doid"] = p_doid; + msg["var"] = p_var; + return get(p_data, msg); +} + +void DataManBase::reg_callback( + std::function<void(const void *, std::string, std::string, std::string, + std::vector<size_t>)> + cb) +{ + if (m_next.size() == 0) + { + m_callback = cb; + } + else + { + for (auto i : m_next) + { + i.second->reg_callback(cb); + } + } +} + +void DataManBase::dump(const void *p_data, json p_jmsg, std::ostream &out) +{ + std::vector<size_t> p_varshape = + p_jmsg["varshape"].get<std::vector<size_t>>(); + std::string dtype = p_jmsg["dtype"]; + size_t length = p_jmsg["dumplength"].get<size_t>(); + size_t s = 0; + for (size_t i = 0; i < product(p_varshape, 1); i++) + { + s++; + out << ((float *)p_data)[i] << " "; + if (s == length) + { + out << std::endl; + s = 0; + } + } + out << std::endl; +} + +void DataManBase::add_next(std::string p_name, + std::shared_ptr<DataManBase> p_next) +{ + m_next[p_name] = p_next; +} + +void DataManBase::remove_next(std::string p_name) { m_next.erase(p_name); } + +bool DataManBase::have_next() +{ + if (m_next.size() == 0) + { + return false; + } + else + { + return true; + } +} + +void DataManBase::print_next(std::ostream &out) +{ + for (auto i : m_next) + { + out << i.second->name() << " -> "; + i.second->print_next(); + out << std::endl; + } +} + +bool DataManBase::auto_transform(const void *p_in, void *p_out, json &p_jmsg) +{ + if (p_jmsg["compression_method"] != nullptr) + { + auto method = p_jmsg["compression_method"]; + auto man = get_man(method); + if (man == nullptr) + { + logging("Library file for compression method " + + p_jmsg["compression_method"].dump() + " not found!"); + return false; + } + man->transform(p_in, p_out, p_jmsg); + p_jmsg.erase("compression_method"); + p_jmsg.erase("compression_rate"); + p_jmsg.erase("compressed_size"); + return true; + } + else + { + return false; + } +} + +void DataManBase::add_man_to_path(std::string p_new, std::string p_path) +{ + if (m_next.count(p_path) > 0) + { + auto man = get_man(p_new); + if (man) + { + man->add_next(p_path, m_next[p_path]); + this->add_next(p_new, man); + this->remove_next(p_path); + } + } +} + +int DataManBase::flush_next() +{ + for (auto i : m_next) + { + i.second->flush(); + } + return 0; +} + +int DataManBase::put_next(const void *p_data, json p_jmsg) +{ + for (auto i : m_next) + { + i.second->put(p_data, p_jmsg); + } + return 0; +} + +std::shared_ptr<DataManBase> DataManBase::get_man(std::string method) +{ + void *so = NULL; +#ifdef __APPLE__ + std::string dylibname = "lib" + method + "man.dylib"; + so = dlopen(dylibname.c_str(), RTLD_NOW); + if (so) + { + std::shared_ptr<DataManBase> (*func)() = NULL; + func = (std::shared_ptr<DataManBase>(*)())dlsym(so, "getMan"); + if (func) + { + return func(); + } + } +#endif + std::string soname = "lib" + method + "man.so"; + so = dlopen(soname.c_str(), RTLD_NOW); + if (so) + { + std::shared_ptr<DataManBase> (*func)() = NULL; + func = (std::shared_ptr<DataManBase>(*)())dlsym(so, "getMan"); + if (func) + { + return func(); + } + else + { + logging("getMan() not found in " + soname); + } + } + else + { + logging("Dynamic library " + soname + " not found in LD_LIBRARY_PATH"); + } + return nullptr; +} + +// end namespace realtime +} +// end namespace adios +}