diff --git a/include/engine/dataman/DataManReader.h b/include/engine/dataman/DataManReader.h index f82e649bf9851cb68cafb3006461703474efcfa9..1c943dcb61d5bb81111b675ee79dd7f8a613a537 100644 --- a/include/engine/dataman/DataManReader.h +++ b/include/engine/dataman/DataManReader.h @@ -19,7 +19,7 @@ // supported capsules #include "capsule/heap/STLVector.h" -#include "DataManager.h" +#include "utilities/realtime/dataman/DataManager.h" namespace adios { @@ -108,7 +108,7 @@ private: /// m_Transports bool m_DoRealTime = false; - DataManager m_Man; + realtime::DataManager m_Man; std::function<void(const void *, std::string, std::string, std::string, Dims)> m_CallBack; ///< call back function diff --git a/include/engine/dataman/DataManWriter.h b/include/engine/dataman/DataManWriter.h index 6116de429282a89f21f7208827d09ba07c885e51..d0a17c81d74350f6ac0357982bc0f2d0848c5e31 100644 --- a/include/engine/dataman/DataManWriter.h +++ b/include/engine/dataman/DataManWriter.h @@ -20,7 +20,7 @@ // supported capsules #include "capsule/heap/STLVector.h" -#include "DataManager.h" //here comes your DataMan header +#include "utilities/realtime/dataman/DataManager.h" namespace adios { @@ -106,7 +106,7 @@ private: bool m_DoRealTime = false; bool m_DoMonitor = false; - DataManager m_Man; + realtime::DataManager m_Man; std::function<void(const void *, std::string, std::string, std::string, Dims)> m_CallBack; ///< call back function diff --git a/include/utilities/realtime/dataman/DataMan.h b/include/utilities/realtime/dataman/DataMan.h new file mode 100644 index 0000000000000000000000000000000000000000..e5d2fff41e77b55a6d83e3b4ea14c931abdbb37a --- /dev/null +++ b/include/utilities/realtime/dataman/DataMan.h @@ -0,0 +1,468 @@ +#ifndef DATAMAN_H_ +#define DATAMAN_H_ + +#include <chrono> +#include <cstdint> +#include <dlfcn.h> +#include <functional> +#include <iostream> +#include <memory> +#include <string> +#include <unistd.h> +#include <vector> + +#include "external/json.hpp" + +using json = nlohmann::json; +using namespace std; + +namespace adios +{ +namespace realtime +{ + +class DataMan +{ +public: + DataMan() + { + m_profiling["total_manager_time"] = 0.0f; + m_profiling["total_mb"] = 0.0f; + m_start_time = chrono::system_clock::now(); + } + virtual ~DataMan() {} + int put(const void *p_data, string p_doid, string p_var, string p_dtype, + vector<size_t> p_putshape, vector<size_t> p_varshape, + vector<size_t> p_offset, size_t p_timestep, int p_tolerance = 0, + int p_priority = 100) + { + json msg; + msg["doid"] = p_doid; + msg["var"] = p_var; + msg["dtype"] = p_dtype; + msg["putshape"] = p_putshape; + msg["putbytes"] = product(p_putshape, dsize(p_dtype)); + msg["varshape"] = p_varshape; + msg["varbytes"] = product(p_varshape, dsize(p_dtype)); + msg["offset"] = p_offset; + msg["timestep"] = p_timestep; + msg["tolerance"] = p_tolerance; + msg["priority"] = p_priority; + return put(p_data, msg); + } + + virtual int put_begin(const void *p_data, json &p_jmsg) + { + check_shape(p_jmsg); + p_jmsg["profiling"] = m_profiling; + m_step_time = chrono::system_clock::now(); + return 0; + } + + virtual int put_end(const void *p_data, json &p_jmsg) + { + auto end = chrono::system_clock::now(); + chrono::duration<double> duration = end - m_step_time; + m_profiling["total_manager_time"] = + m_profiling["total_manager_time"].get<double>() + duration.count(); + m_profiling["total_mb"] = + m_profiling["total_mb"].get<size_t>() + + product(p_jmsg["varshape"], dsize(p_jmsg["dtype"])) / 1000000.0f; + duration = end - m_start_time; + m_profiling["total_workflow_time"] = duration.count(); + m_profiling["workflow_mbs"] = + m_profiling["total_mb"].get<double>() / + m_profiling["total_workflow_time"].get<double>(); + m_profiling["manager_mbs"] = + m_profiling["total_mb"].get<double>() / + m_profiling["total_manager_time"].get<double>(); + if (p_jmsg["compressed_size"] != nullptr) + p_jmsg["putbytes"] = p_jmsg["compressed_size"].get<size_t>(); + put_next(p_data, p_jmsg); + return 0; + } + + virtual int put(const void *p_data, json p_jmsg) = 0; + + int get(void *p_data, string p_doid, string p_var, string p_dtype, + vector<size_t> p_getshape, vector<size_t> p_varshape, + vector<size_t> p_offset, size_t p_timestep) + { + json msg; + msg["doid"] = p_doid; + msg["var"] = p_var; + msg["dtype"] = p_dtype; + msg["getshape"] = p_getshape; + msg["varshape"] = p_varshape; + msg["offset"] = p_offset; + msg["timestep"] = p_timestep; + return get(p_data, msg); + } + + int get(void *p_data, string p_doid, string p_var, string &p_dtype, + vector<size_t> &p_varshape, size_t &p_timestep) + { + json msg; + msg["doid"] = p_doid; + msg["var"] = p_var; + return get(p_data, msg); + } + + virtual int get(void *p_data, json &p_jmsg) = 0; + virtual int init(json p_jmsg) = 0; + virtual void flush() = 0; + virtual string name() = 0; + virtual string type() = 0; + void reg_callback(std::function<void(const void *, string, string, string, + vector<size_t>)> + cb) + { + if (m_next.size() == 0) + { + m_callback = cb; + } + else + { + for (auto i : m_next) + { + i.second->reg_callback(cb); + } + } + } + + void dump(const void *p_data, json p_jmsg) + { + vector<size_t> p_varshape = p_jmsg["varshape"].get<vector<size_t>>(); + string dtype = p_jmsg["dtype"]; + size_t length = p_jmsg["dumplength"].get<size_t>(); + size_t s = 0; + for (size_t i = 0; i < product(p_varshape, 1); i++) + { + s++; + cout << ((float *)p_data)[i] << " "; + if (s == length) + { + cout << endl; + s = 0; + } + } + cout << endl; + } + + void add_next(string p_name, shared_ptr<DataMan> p_next) + { + m_next[p_name] = p_next; + } + + void remove_next(string p_name) { m_next.erase(p_name); } + + bool have_next() + { + if (m_next.size() == 0) + { + return false; + } + else + { + return true; + } + } + + void print_next() + { + for (auto i : m_next) + { + cout << i.second->name() << " -> "; + i.second->print_next(); + cout << endl; + } + } + + virtual void transform(const void *p_in, void *p_out, json &p_jmsg) = 0; + +protected: + bool auto_transform(const void *p_in, void *p_out, json &p_jmsg) + { + if (p_jmsg["compression_method"] != nullptr) + { + auto method = p_jmsg["compression_method"]; + auto man = get_man(method); + if (man == nullptr) + { + logging("Library file for compression method " + + p_jmsg["compression_method"].dump() + " not found!"); + return false; + } + man->transform(p_in, p_out, p_jmsg); + p_jmsg.erase("compression_method"); + p_jmsg.erase("compression_rate"); + p_jmsg.erase("compressed_size"); + return true; + } + else + { + return false; + } + } + + void add_man_to_path(string p_new, string p_path) + { + if (m_next.count(p_path) > 0) + { + auto man = get_man(p_new); + man->add_next(p_path, m_next[p_path]); + this->add_next(p_new, man); + this->remove_next(p_path); + } + } + + inline void logging(string p_msg, string p_man = "") + { + if (p_man == "") + p_man = name(); + cout << "["; + cout << p_man; + cout << "]"; + cout << " "; + cout << p_msg; + cout << endl; + } + + inline bool check_json(json p_jmsg, vector<string> p_strings, + string p_man = "") + { + if (p_man == "") + p_man = name(); + for (auto i : p_strings) + { + if (p_jmsg[i] == nullptr) + { + if (p_man != "") + { + logging("JSON key " + i + " not found!", p_man); + } + return false; + } + } + return true; + } + + virtual int flush_next() + { + for (auto i : m_next) + { + i.second->flush(); + } + return 0; + } + + virtual int put_next(const void *p_data, json p_jmsg) + { + for (auto i : m_next) + { + i.second->put(p_data, p_jmsg); + } + return 0; + } + + inline size_t product(size_t *shape) + { + size_t s = 1; + if (shape) + { + for (size_t i = 1; i <= shape[0]; i++) + { + s *= shape[i]; + } + } + return s; + } + + inline size_t product(vector<size_t> shape, size_t size = 1) + { + return accumulate(shape.begin(), shape.end(), size, + multiplies<size_t>()); + } + + inline size_t dsize(string dtype) + { + if (dtype == "char") + return sizeof(char); + if (dtype == "short") + return sizeof(short); + if (dtype == "int") + return sizeof(int); + if (dtype == "long") + return sizeof(long); + if (dtype == "unsigned char") + return sizeof(unsigned char); + if (dtype == "unsigned short") + return sizeof(unsigned short); + if (dtype == "unsigned int") + return sizeof(unsigned int); + if (dtype == "unsigned long") + return sizeof(unsigned long); + if (dtype == "float") + return sizeof(float); + if (dtype == "double") + return sizeof(double); + if (dtype == "long double") + return sizeof(long double); + if (dtype == "complex<float>") + return sizeof(float) * 2; + if (dtype == "complex<double>") + return sizeof(double) * 2; + + if (dtype == "int8_t") + return sizeof(int8_t); + if (dtype == "uint8_t") + return sizeof(uint8_t); + if (dtype == "int16_t") + return sizeof(int16_t); + if (dtype == "uint16_t") + return sizeof(uint16_t); + if (dtype == "int32_t") + return sizeof(int32_t); + if (dtype == "uint32_t") + return sizeof(uint32_t); + if (dtype == "int64_t") + return sizeof(int64_t); + if (dtype == "uint64_t") + return sizeof(uint64_t); + return 0; + } + + inline json atoj(unsigned int *array) + { + json j; + if (array) + { + if (array[0] > 0) + { + j = {array[1]}; + for (unsigned int i = 2; i <= array[0]; i++) + { + j.insert(j.end(), array[i]); + } + } + } + return j; + } + + inline string rmquote(string in) { return in.substr(1, in.length() - 2); } + + inline bool isin(string a, json j) + { + for (unsigned int i = 0; i < j.size(); i++) + { + if (j[i] == a) + return true; + } + return false; + } + + inline int closest(int v, json j, bool up) + { + int s = 100, k = 0, t; + for (unsigned int i = 0; i < j.size(); i++) + { + if (up) + t = j[i].get<int>() - v; + else + t = v - j[i].get<int>(); + if (t >= 0 && t < s) + { + s = t; + k = i; + } + } + return k; + } + + inline void check_shape(json &p_jmsg) + { + vector<size_t> varshape; + if (check_json(p_jmsg, {"varshape"})) + { + varshape = p_jmsg["varshape"].get<vector<size_t>>(); + } + else + { + return; + } + if (p_jmsg["putshape"] == nullptr) + { + p_jmsg["putshape"] = varshape; + } + if (p_jmsg["offset"] == nullptr) + { + p_jmsg["offset"] = vector<size_t>(varshape.size(), 0); + } + p_jmsg["putbytes"] = product(p_jmsg["putshape"].get<vector<size_t>>(), + dsize(p_jmsg["dtype"].get<string>())); + p_jmsg["varbytes"] = + product(varshape, dsize(p_jmsg["dtype"].get<string>())); + } + + inline shared_ptr<DataMan> get_man(string method) + { + string soname = "lib" + method + "man.so"; + void *so = NULL; + so = dlopen(soname.c_str(), RTLD_NOW); + if (so) + { + shared_ptr<DataMan> (*func)() = NULL; + func = (shared_ptr<DataMan>(*)())dlsym(so, "getMan"); + if (func) + { + return func(); + } + else + { + logging("getMan() not found in " + soname); + } + } + else + { + logging("Dynamic library " + soname + + " not found in LD_LIBRARY_PATH"); + } + return nullptr; + } + + string run_cmd(string cmd) + { + FILE *pipe = NULL; + char buffer[2048]; + string result; + pipe = popen(cmd.c_str(), "r"); + if (NULL == pipe) + { + perror("pipe"); + return ""; + } + while (!feof(pipe)) + { + if (fgets(buffer, sizeof(buffer), pipe) != NULL) + result = buffer; + } + pclose(pipe); + return result; + } + + std::function<void(const void *, string, string, string, vector<size_t>)> + m_callback; + map<string, shared_ptr<DataMan>> m_next; + +private: + json m_profiling; + chrono::time_point<chrono::system_clock> m_start_time; + chrono::time_point<chrono::system_clock> m_step_time; + bool m_profiling_enabled = false; +}; + +// end namespace realtime +} +// end namespace adios +} +#endif diff --git a/include/utilities/realtime/dataman/DataManager.h b/include/utilities/realtime/dataman/DataManager.h new file mode 100644 index 0000000000000000000000000000000000000000..73320ab3bbea2621e04d857cee7d5b86c05f2bcb --- /dev/null +++ b/include/utilities/realtime/dataman/DataManager.h @@ -0,0 +1,44 @@ +#ifndef DATAMANAGER_H_ +#define DATAMANAGER_H_ + +#include "utilities/realtime/dataman/DataMan.h" + +namespace adios +{ +namespace realtime +{ + +class DataManager : public DataMan +{ +public: + DataManager(); + ~DataManager(); + virtual int init(json p_jmsg); + virtual int put(const void *p_data, json p_jmsg); + virtual int get(void *p_data, json &p_jmsg); + void flush(); + void add_stream(json p_jmsg); + int put(const void *p_data, string p_doid, string p_var, string p_dtype, + vector<size_t> p_putshape, vector<size_t> p_varshape, + vector<size_t> p_offset, size_t p_timestep, int p_tolerance = 0, + int p_priority = 100); + void add_file(string p_method); + string name() { return "DataManager"; } + string type() { return "Manager"; } + virtual void transform(const void *p_in, void *p_out, json &p_jmsg){}; + +private: + string m_local_ip = ""; + string m_remote_ip = ""; + int m_local_port = 0; + int m_remote_port = 0; + int m_num_channels = 0; + vector<int> m_tolerance; + vector<int> m_priority; +}; + +// end namespace realtime +} +// end namespace adios +} +#endif diff --git a/source/ADIOS.cpp b/source/ADIOS.cpp index 729e6c8ee7b8732364d7a12561a83baeb98d719a..bc9f4b66252e8c1d8ff8084e306c2a874151e398 100644 --- a/source/ADIOS.cpp +++ b/source/ADIOS.cpp @@ -25,10 +25,8 @@ #include "engine/bp/BPFileReader.h" #include "engine/bp/BPFileWriter.h" -#ifdef ADIOS_HAVE_DATAMAN // external dependencies #include "engine/dataman/DataManReader.h" #include "engine/dataman/DataManWriter.h" -#endif #ifdef ADIOS_HAVE_ADIOS1 // external dependencies #include "engine/adios1/ADIOS1Reader.h" @@ -148,25 +146,13 @@ std::shared_ptr<Engine> ADIOS::Open(const std::string &name, } else if (type == "DataManWriter") { -#ifdef ADIOS_HAVE_DATAMAN return std::make_shared<DataManWriter>(*this, name, accessMode, mpiComm, method); -#else - throw std::invalid_argument( - "ERROR: this version didn't compile with " - "Dataman library, can't Open DataManWriter\n"); -#endif } else if (type == "DataManReader") { -#ifdef ADIOS_HAVE_DATAMAN return std::make_shared<DataManReader>(*this, name, accessMode, mpiComm, method); -#else - throw std::invalid_argument( - "ERROR: this version didn't compile with " - "Dataman library, can't Open DataManReader\n"); -#endif } else if (type == "ADIOS1Writer") { diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt index 2ccf94700c6304df4b8eacf788824956bdc5ac8d..c07a408a078d39a4257c8a9b58e16a5a0a36be10 100644 --- a/source/CMakeLists.txt +++ b/source/CMakeLists.txt @@ -25,6 +25,9 @@ foreach(adios2_target IN LISTS adios2_targets) engine/bp/BPFileReader.cpp engine/bp/BPFileWriter.cpp + engine/dataman/DataManReader.cpp + engine/dataman/DataManWriter.cpp + functions/adiosFunctions.cpp @@ -37,6 +40,8 @@ foreach(adios2_target IN LISTS adios2_targets) utilities/format/bp1/BP1Writer.cpp utilities/profiling/iochrono/Timer.cpp + + utilities/realtime/dataman/DataManager.cpp ) target_include_directories(${adios2_target} PUBLIC ${ADIOS_SOURCE_DIR}/include diff --git a/source/utilities/realtime/dataman/DataManager.cpp b/source/utilities/realtime/dataman/DataManager.cpp new file mode 100644 index 0000000000000000000000000000000000000000..8f2efa25b64d4eb084b2f05913c337e5c3939d5f --- /dev/null +++ b/source/utilities/realtime/dataman/DataManager.cpp @@ -0,0 +1,70 @@ +#include "utilities/realtime/dataman/DataManager.h" + +namespace adios +{ +namespace realtime +{ + +DataManager::DataManager() : DataMan() {} +DataManager::~DataManager() {} + +int DataManager::init(json p_jmsg) { return 0; } + +int DataManager::put(const void *p_data, string p_doid, string p_var, + string p_dtype, vector<size_t> p_putshape, + vector<size_t> p_varshape, vector<size_t> p_offset, + size_t p_timestep, int p_tolerance, int p_priority) +{ + return DataMan::put(p_data, p_doid, p_var, p_dtype, p_putshape, p_varshape, + p_offset, p_timestep, p_tolerance, p_priority); +} + +int DataManager::put(const void *p_data, json p_jmsg) +{ + put_begin(p_data, p_jmsg); + put_end(p_data, p_jmsg); + return 0; +} + +void DataManager::add_file(string p_method) {} + +void DataManager::add_stream(json p_jmsg) +{ + + string method = "zmq"; + + if (p_jmsg["method"] != nullptr) + method = p_jmsg["method"]; + + logging("Streaming method " + method + " added"); + + if (m_tolerance.size() < m_num_channels) + { + for (int i = 0; i < m_num_channels; i++) + { + m_tolerance.push_back(0); + } + } + if (m_priority.size() < m_num_channels) + { + for (int i = 0; i < m_num_channels; i++) + { + m_priority.push_back(100 / (i + 1)); + } + } + + auto man = get_man(method); + man->init(p_jmsg); + this->add_next(method, man); + + add_man_to_path("zfp", method); +} + +void DataManager::flush() { flush_next(); } + +int DataManager::get(void *p_data, json &p_jmsg) { return 0; } + +// end namespace realtime +} +// end namespace adios +}