diff --git a/include/engine/dataman/DataManReader.h b/include/engine/dataman/DataManReader.h index 1c943dcb61d5bb81111b675ee79dd7f8a613a537..dfa3d77f4f03eeff558b1169234bff13e625f970 100644 --- a/include/engine/dataman/DataManReader.h +++ b/include/engine/dataman/DataManReader.h @@ -19,7 +19,7 @@ // supported capsules #include "capsule/heap/STLVector.h" -#include "utilities/realtime/dataman/DataManager.h" +#include "utilities/realtime/dataman/DataMan.h" namespace adios { @@ -38,6 +38,7 @@ public: * @param debugMode * @param nthreads */ + using json = nlohmann::json; DataManReader(ADIOS &adios, const std::string &name, const std::string accessMode, MPI_Comm mpiComm, const Method &method); @@ -108,7 +109,7 @@ private: /// m_Transports bool m_DoRealTime = false; - realtime::DataManager m_Man; + realtime::DataMan m_Man; std::function<void(const void *, std::string, std::string, std::string, Dims)> m_CallBack; ///< call back function diff --git a/include/engine/dataman/DataManWriter.h b/include/engine/dataman/DataManWriter.h index d0a17c81d74350f6ac0357982bc0f2d0848c5e31..667498dbc25344a4c981a9152bcc51b1a79a80bd 100644 --- a/include/engine/dataman/DataManWriter.h +++ b/include/engine/dataman/DataManWriter.h @@ -20,7 +20,7 @@ // supported capsules #include "capsule/heap/STLVector.h" -#include "utilities/realtime/dataman/DataManager.h" +#include "utilities/realtime/dataman/DataMan.h" namespace adios { @@ -29,6 +29,7 @@ class DataManWriter : public Engine { public: + using json = nlohmann::json; /** * Constructor for dataman engine Writer for WAN communications * @param adios @@ -106,7 +107,7 @@ private: bool m_DoRealTime = false; bool m_DoMonitor = false; - realtime::DataManager m_Man; + realtime::DataMan m_Man; std::function<void(const void *, std::string, std::string, std::string, Dims)> m_CallBack; ///< call back function @@ -157,10 +158,10 @@ private: MPI_Barrier(m_MPIComm); std::cout << "I am hooked to the DataMan library\n"; std::cout << "putshape " << variable.m_LocalDimensions.size() - << endl; + << std::endl; std::cout << "varshape " << variable.m_GlobalDimensions.size() - << endl; - std::cout << "offset " << variable.m_Offsets.size() << endl; + << std::endl; + std::cout << "offset " << variable.m_Offsets.size() << std::endl; for (int i = 0; i < m_SizeMPI; ++i) { if (i == m_RankMPI) diff --git a/include/utilities/realtime/dataman/DataMan.h b/include/utilities/realtime/dataman/DataMan.h index e5d2fff41e77b55a6d83e3b4ea14c931abdbb37a..59640e933fd9e1a5dc12800a2b3e924ff42bf92c 100644 --- a/include/utilities/realtime/dataman/DataMan.h +++ b/include/utilities/realtime/dataman/DataMan.h @@ -1,464 +1,50 @@ -#ifndef DATAMAN_H_ -#define DATAMAN_H_ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * DataMan.h + * + * Created on: Apr 12, 2017 + * Author: Jason Wang + */ -#include <chrono> -#include <cstdint> -#include <dlfcn.h> -#include <functional> -#include <iostream> -#include <memory> -#include <string> -#include <unistd.h> -#include <vector> +#ifndef DATAMANAGER_H_ +#define DATAMANAGER_H_ -#include "external/json.hpp" - -using json = nlohmann::json; -using namespace std; +#include "utilities/realtime/dataman/DataManBase.h" namespace adios { namespace realtime { -class DataMan +class DataMan : public DataManBase { public: - DataMan() - { - m_profiling["total_manager_time"] = 0.0f; - m_profiling["total_mb"] = 0.0f; - m_start_time = chrono::system_clock::now(); - } - virtual ~DataMan() {} - int put(const void *p_data, string p_doid, string p_var, string p_dtype, - vector<size_t> p_putshape, vector<size_t> p_varshape, - vector<size_t> p_offset, size_t p_timestep, int p_tolerance = 0, - int p_priority = 100) - { - json msg; - msg["doid"] = p_doid; - msg["var"] = p_var; - msg["dtype"] = p_dtype; - msg["putshape"] = p_putshape; - msg["putbytes"] = product(p_putshape, dsize(p_dtype)); - msg["varshape"] = p_varshape; - msg["varbytes"] = product(p_varshape, dsize(p_dtype)); - msg["offset"] = p_offset; - msg["timestep"] = p_timestep; - msg["tolerance"] = p_tolerance; - msg["priority"] = p_priority; - return put(p_data, msg); - } - - virtual int put_begin(const void *p_data, json &p_jmsg) - { - check_shape(p_jmsg); - p_jmsg["profiling"] = m_profiling; - m_step_time = chrono::system_clock::now(); - return 0; - } - - virtual int put_end(const void *p_data, json &p_jmsg) - { - auto end = chrono::system_clock::now(); - chrono::duration<double> duration = end - m_step_time; - m_profiling["total_manager_time"] = - m_profiling["total_manager_time"].get<double>() + duration.count(); - m_profiling["total_mb"] = - m_profiling["total_mb"].get<size_t>() + - product(p_jmsg["varshape"], dsize(p_jmsg["dtype"])) / 1000000.0f; - duration = end - m_start_time; - m_profiling["total_workflow_time"] = duration.count(); - m_profiling["workflow_mbs"] = - m_profiling["total_mb"].get<double>() / - m_profiling["total_workflow_time"].get<double>(); - m_profiling["manager_mbs"] = - m_profiling["total_mb"].get<double>() / - m_profiling["total_manager_time"].get<double>(); - if (p_jmsg["compressed_size"] != nullptr) - p_jmsg["putbytes"] = p_jmsg["compressed_size"].get<size_t>(); - put_next(p_data, p_jmsg); - return 0; - } - - virtual int put(const void *p_data, json p_jmsg) = 0; - - int get(void *p_data, string p_doid, string p_var, string p_dtype, - vector<size_t> p_getshape, vector<size_t> p_varshape, - vector<size_t> p_offset, size_t p_timestep) - { - json msg; - msg["doid"] = p_doid; - msg["var"] = p_var; - msg["dtype"] = p_dtype; - msg["getshape"] = p_getshape; - msg["varshape"] = p_varshape; - msg["offset"] = p_offset; - msg["timestep"] = p_timestep; - return get(p_data, msg); - } - - int get(void *p_data, string p_doid, string p_var, string &p_dtype, - vector<size_t> &p_varshape, size_t &p_timestep) - { - json msg; - msg["doid"] = p_doid; - msg["var"] = p_var; - return get(p_data, msg); - } - - virtual int get(void *p_data, json &p_jmsg) = 0; - virtual int init(json p_jmsg) = 0; - virtual void flush() = 0; - virtual string name() = 0; - virtual string type() = 0; - void reg_callback(std::function<void(const void *, string, string, string, - vector<size_t>)> - cb) - { - if (m_next.size() == 0) - { - m_callback = cb; - } - else - { - for (auto i : m_next) - { - i.second->reg_callback(cb); - } - } - } - - void dump(const void *p_data, json p_jmsg) - { - vector<size_t> p_varshape = p_jmsg["varshape"].get<vector<size_t>>(); - string dtype = p_jmsg["dtype"]; - size_t length = p_jmsg["dumplength"].get<size_t>(); - size_t s = 0; - for (size_t i = 0; i < product(p_varshape, 1); i++) - { - s++; - cout << ((float *)p_data)[i] << " "; - if (s == length) - { - cout << endl; - s = 0; - } - } - cout << endl; - } - - void add_next(string p_name, shared_ptr<DataMan> p_next) - { - m_next[p_name] = p_next; - } - - void remove_next(string p_name) { m_next.erase(p_name); } - - bool have_next() - { - if (m_next.size() == 0) - { - return false; - } - else - { - return true; - } - } - - void print_next() - { - for (auto i : m_next) - { - cout << i.second->name() << " -> "; - i.second->print_next(); - cout << endl; - } - } - - virtual void transform(const void *p_in, void *p_out, json &p_jmsg) = 0; - -protected: - bool auto_transform(const void *p_in, void *p_out, json &p_jmsg) - { - if (p_jmsg["compression_method"] != nullptr) - { - auto method = p_jmsg["compression_method"]; - auto man = get_man(method); - if (man == nullptr) - { - logging("Library file for compression method " + - p_jmsg["compression_method"].dump() + " not found!"); - return false; - } - man->transform(p_in, p_out, p_jmsg); - p_jmsg.erase("compression_method"); - p_jmsg.erase("compression_rate"); - p_jmsg.erase("compressed_size"); - return true; - } - else - { - return false; - } - } - - void add_man_to_path(string p_new, string p_path) - { - if (m_next.count(p_path) > 0) - { - auto man = get_man(p_new); - man->add_next(p_path, m_next[p_path]); - this->add_next(p_new, man); - this->remove_next(p_path); - } - } - - inline void logging(string p_msg, string p_man = "") - { - if (p_man == "") - p_man = name(); - cout << "["; - cout << p_man; - cout << "]"; - cout << " "; - cout << p_msg; - cout << endl; - } - - inline bool check_json(json p_jmsg, vector<string> p_strings, - string p_man = "") - { - if (p_man == "") - p_man = name(); - for (auto i : p_strings) - { - if (p_jmsg[i] == nullptr) - { - if (p_man != "") - { - logging("JSON key " + i + " not found!", p_man); - } - return false; - } - } - return true; - } - - virtual int flush_next() - { - for (auto i : m_next) - { - i.second->flush(); - } - return 0; - } - - virtual int put_next(const void *p_data, json p_jmsg) - { - for (auto i : m_next) - { - i.second->put(p_data, p_jmsg); - } - return 0; - } - - inline size_t product(size_t *shape) - { - size_t s = 1; - if (shape) - { - for (size_t i = 1; i <= shape[0]; i++) - { - s *= shape[i]; - } - } - return s; - } - - inline size_t product(vector<size_t> shape, size_t size = 1) - { - return accumulate(shape.begin(), shape.end(), size, - multiplies<size_t>()); - } - - inline size_t dsize(string dtype) - { - if (dtype == "char") - return sizeof(char); - if (dtype == "short") - return sizeof(short); - if (dtype == "int") - return sizeof(int); - if (dtype == "long") - return sizeof(long); - if (dtype == "unsigned char") - return sizeof(unsigned char); - if (dtype == "unsigned short") - return sizeof(unsigned short); - if (dtype == "unsigned int") - return sizeof(unsigned int); - if (dtype == "unsigned long") - return sizeof(unsigned long); - if (dtype == "float") - return sizeof(float); - if (dtype == "double") - return sizeof(double); - if (dtype == "long double") - return sizeof(long double); - if (dtype == "complex<float>") - return sizeof(float) * 2; - if (dtype == "complex<double>") - return sizeof(double) * 2; - - if (dtype == "int8_t") - return sizeof(int8_t); - if (dtype == "uint8_t") - return sizeof(uint8_t); - if (dtype == "int16_t") - return sizeof(int16_t); - if (dtype == "uint16_t") - return sizeof(uint16_t); - if (dtype == "int32_t") - return sizeof(int32_t); - if (dtype == "uint32_t") - return sizeof(uint32_t); - if (dtype == "int64_t") - return sizeof(int64_t); - if (dtype == "uint64_t") - return sizeof(uint64_t); - return 0; - } - - inline json atoj(unsigned int *array) - { - json j; - if (array) - { - if (array[0] > 0) - { - j = {array[1]}; - for (unsigned int i = 2; i <= array[0]; i++) - { - j.insert(j.end(), array[i]); - } - } - } - return j; - } - - inline string rmquote(string in) { return in.substr(1, in.length() - 2); } - - inline bool isin(string a, json j) - { - for (unsigned int i = 0; i < j.size(); i++) - { - if (j[i] == a) - return true; - } - return false; - } - - inline int closest(int v, json j, bool up) - { - int s = 100, k = 0, t; - for (unsigned int i = 0; i < j.size(); i++) - { - if (up) - t = j[i].get<int>() - v; - else - t = v - j[i].get<int>(); - if (t >= 0 && t < s) - { - s = t; - k = i; - } - } - return k; - } - - inline void check_shape(json &p_jmsg) - { - vector<size_t> varshape; - if (check_json(p_jmsg, {"varshape"})) - { - varshape = p_jmsg["varshape"].get<vector<size_t>>(); - } - else - { - return; - } - if (p_jmsg["putshape"] == nullptr) - { - p_jmsg["putshape"] = varshape; - } - if (p_jmsg["offset"] == nullptr) - { - p_jmsg["offset"] = vector<size_t>(varshape.size(), 0); - } - p_jmsg["putbytes"] = product(p_jmsg["putshape"].get<vector<size_t>>(), - dsize(p_jmsg["dtype"].get<string>())); - p_jmsg["varbytes"] = - product(varshape, dsize(p_jmsg["dtype"].get<string>())); - } - - inline shared_ptr<DataMan> get_man(string method) - { - string soname = "lib" + method + "man.so"; - void *so = NULL; - so = dlopen(soname.c_str(), RTLD_NOW); - if (so) - { - shared_ptr<DataMan> (*func)() = NULL; - func = (shared_ptr<DataMan>(*)())dlsym(so, "getMan"); - if (func) - { - return func(); - } - else - { - logging("getMan() not found in " + soname); - } - } - else - { - logging("Dynamic library " + soname + - " not found in LD_LIBRARY_PATH"); - } - return nullptr; - } - - string run_cmd(string cmd) - { - FILE *pipe = NULL; - char buffer[2048]; - string result; - pipe = popen(cmd.c_str(), "r"); - if (NULL == pipe) - { - perror("pipe"); - return ""; - } - while (!feof(pipe)) - { - if (fgets(buffer, sizeof(buffer), pipe) != NULL) - result = buffer; - } - pclose(pipe); - return result; - } - - std::function<void(const void *, string, string, string, vector<size_t>)> - m_callback; - map<string, shared_ptr<DataMan>> m_next; + DataMan() = default; + virtual ~DataMan() = default; + virtual int init(json p_jmsg); + virtual int put(const void *p_data, json p_jmsg); + virtual int get(void *p_data, json &p_jmsg); + void flush(); + void add_stream(json p_jmsg); + int put(const void *p_data, std::string p_doid, std::string p_var, + std::string p_dtype, std::vector<size_t> p_putshape, + std::vector<size_t> p_varshape, std::vector<size_t> p_offset, + size_t p_timestep, int p_tolerance = 0, int p_priority = 100); + void add_file(std::string p_method); + std::string name() { return "DataManager"; } + std::string type() { return "Manager"; } + virtual void transform(const void *p_in, void *p_out, json &p_jmsg){}; private: - json m_profiling; - chrono::time_point<chrono::system_clock> m_start_time; - chrono::time_point<chrono::system_clock> m_step_time; - bool m_profiling_enabled = false; + std::string m_local_ip = ""; + std::string m_remote_ip = ""; + int m_local_port = 0; + int m_remote_port = 0; + int m_num_channels = 0; + std::vector<int> m_tolerance; + std::vector<int> m_priority; }; // end namespace realtime diff --git a/include/utilities/realtime/dataman/DataManBase.h b/include/utilities/realtime/dataman/DataManBase.h new file mode 100644 index 0000000000000000000000000000000000000000..406eb02bf4d17df7c51e4a827e3b89b659dc951f --- /dev/null +++ b/include/utilities/realtime/dataman/DataManBase.h @@ -0,0 +1,307 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * DataManBase.h + * + * Created on: Apr 12, 2017 + * Author: Jason Wang + */ + +#ifndef DATAMAN_H_ +#define DATAMAN_H_ + +#include <cstdint> + +#include <dlfcn.h> +#include <unistd.h> + +#include <chrono> +#include <complex> +#include <functional> +#include <iostream> +#include <memory> +#include <string> +#include <vector> + +#include "external/json.hpp" + +namespace adios +{ +namespace realtime +{ + +class DataManBase +{ +public: + using json = nlohmann::json; + DataManBase(); + virtual ~DataManBase() = default; + int put(const void *p_data, std::string p_doid, std::string p_var, + std::string p_dtype, std::vector<size_t> p_putshape, + std::vector<size_t> p_varshape, std::vector<size_t> p_offset, + size_t p_timestep, int p_tolerance = 0, int p_priority = 100); + + virtual int put_begin(const void *p_data, json &p_jmsg); + + virtual int put_end(const void *p_data, json &p_jmsg); + + virtual int put(const void *p_data, json p_jmsg) = 0; + + int get(void *p_data, std::string p_doid, std::string p_var, + std::string p_dtype, std::vector<size_t> p_getshape, + std::vector<size_t> p_varshape, std::vector<size_t> p_offset, + size_t p_timestep); + + int get(void *p_data, std::string p_doid, std::string p_var, + std::string &p_dtype, std::vector<size_t> &p_varshape, + size_t &p_timestep); + + virtual int get(void *p_data, json &p_jmsg) = 0; + virtual int init(json p_jmsg) = 0; + virtual void flush() = 0; + virtual std::string name() = 0; + virtual std::string type() = 0; + void reg_callback(std::function<void(const void *, std::string, std::string, + std::string, std::vector<size_t>)> + cb); + + void dump(const void *p_data, json p_jmsg, std::ostream &out = std::cout); + + void add_next(std::string p_name, std::shared_ptr<DataManBase> p_next); + + void remove_next(std::string p_name); + + bool have_next(); + + void print_next(std::ostream &out = std::cout); + + virtual void transform(const void *p_in, void *p_out, json &p_jmsg) = 0; + +protected: + bool auto_transform(const void *p_in, void *p_out, json &p_jmsg); + + void add_man_to_path(std::string p_new, std::string p_path); + + virtual int flush_next(); + + virtual int put_next(const void *p_data, json p_jmsg); + + inline void logging(std::string p_msg, std::string p_man = "", + std::ostream &out = std::cout) + { + if (p_man == "") + p_man = name(); + out << "["; + out << p_man; + out << "]"; + out << " "; + out << p_msg; + out << std::endl; + } + + inline bool check_json(json p_jmsg, std::vector<std::string> p_strings, + std::string p_man = "") + { + if (p_man == "") + p_man = name(); + for (auto i : p_strings) + { + if (p_jmsg[i] == nullptr) + { + if (p_man != "") + { + logging("JSON key " + i + " not found!", p_man); + } + return false; + } + } + return true; + } + + inline size_t product(size_t *shape) + { + size_t s = 1; + if (shape) + { + for (size_t i = 1; i <= shape[0]; i++) + { + s *= shape[i]; + } + } + return s; + } + + inline size_t product(std::vector<size_t> shape, size_t size = 1) + { + return accumulate(shape.begin(), shape.end(), size, + std::multiplies<size_t>()); + } + + inline size_t dsize(std::string dtype) + { + if (dtype == "char") + return sizeof(char); + if (dtype == "short") + return sizeof(short); + if (dtype == "int") + return sizeof(int); + if (dtype == "long") + return sizeof(long); + if (dtype == "unsigned char") + return sizeof(unsigned char); + if (dtype == "unsigned short") + return sizeof(unsigned short); + if (dtype == "unsigned int") + return sizeof(unsigned int); + if (dtype == "unsigned long") + return sizeof(unsigned long); + if (dtype == "float") + return sizeof(float); + if (dtype == "double") + return sizeof(double); + if (dtype == "long double") + return sizeof(long double); + if (dtype == "std::complex<float>" or dtype == "complex<float>") + return sizeof(std::complex<float>); + if (dtype == "std::complex<double>") + return sizeof(std::complex<double>); + + if (dtype == "int8_t") + return sizeof(int8_t); + if (dtype == "uint8_t") + return sizeof(uint8_t); + if (dtype == "int16_t") + return sizeof(int16_t); + if (dtype == "uint16_t") + return sizeof(uint16_t); + if (dtype == "int32_t") + return sizeof(int32_t); + if (dtype == "uint32_t") + return sizeof(uint32_t); + if (dtype == "int64_t") + return sizeof(int64_t); + if (dtype == "uint64_t") + return sizeof(uint64_t); + return 0; + } + + inline json atoj(unsigned int *array) + { + json j; + if (array) + { + if (array[0] > 0) + { + j = {array[1]}; + for (unsigned int i = 2; i <= array[0]; i++) + { + j.insert(j.end(), array[i]); + } + } + } + return j; + } + + inline std::string rmquote(std::string in) + { + return in.substr(1, in.length() - 2); + } + + inline bool isin(std::string a, json j) + { + for (unsigned int i = 0; i < j.size(); i++) + { + if (j[i] == a) + return true; + } + return false; + } + + inline int closest(int v, json j, bool up) + { + int s = 100, k = 0, t; + for (unsigned int i = 0; i < j.size(); i++) + { + if (up) + t = j[i].get<int>() - v; + else + t = v - j[i].get<int>(); + if (t >= 0 && t < s) + { + s = t; + k = i; + } + } + return k; + } + + inline void check_shape(json &p_jmsg) + { + std::vector<size_t> varshape; + if (check_json(p_jmsg, {"varshape"})) + { + varshape = p_jmsg["varshape"].get<std::vector<size_t>>(); + } + else + { + return; + } + if (p_jmsg["putshape"] == nullptr) + { + p_jmsg["putshape"] = varshape; + } + if (p_jmsg["offset"] == nullptr) + { + p_jmsg["offset"] = std::vector<size_t>(varshape.size(), 0); + } + p_jmsg["putbytes"] = + product(p_jmsg["putshape"].get<std::vector<size_t>>(), + dsize(p_jmsg["dtype"].get<std::string>())); + p_jmsg["varbytes"] = + product(varshape, dsize(p_jmsg["dtype"].get<std::string>())); + } + + inline std::shared_ptr<DataManBase> get_man(std::string method) + { + std::string soname = "lib" + method + "man.so"; + void *so = NULL; + so = dlopen(soname.c_str(), RTLD_NOW); + if (so) + { + std::shared_ptr<DataManBase> (*func)() = NULL; + func = (std::shared_ptr<DataManBase>(*)())dlsym(so, "getMan"); + if (func) + { + return func(); + } + else + { + logging("getMan() not found in " + soname); + } + } + else + { + logging("Dynamic library " + soname + + " not found in LD_LIBRARY_PATH"); + } + return nullptr; + } + + std::function<void(const void *, std::string, std::string, std::string, + std::vector<size_t>)> + m_callback; + std::map<std::string, std::shared_ptr<DataManBase>> m_next; + +private: + json m_profiling; + std::chrono::time_point<std::chrono::system_clock> m_start_time; + std::chrono::time_point<std::chrono::system_clock> m_step_time; + bool m_profiling_enabled = false; +}; + +// end namespace realtime +} +// end namespace adios +} +#endif diff --git a/include/utilities/realtime/dataman/DataManager.h b/include/utilities/realtime/dataman/DataManager.h deleted file mode 100644 index 73320ab3bbea2621e04d857cee7d5b86c05f2bcb..0000000000000000000000000000000000000000 --- a/include/utilities/realtime/dataman/DataManager.h +++ /dev/null @@ -1,44 +0,0 @@ -#ifndef DATAMANAGER_H_ -#define DATAMANAGER_H_ - -#include "utilities/realtime/dataman/DataMan.h" - -namespace adios -{ -namespace realtime -{ - -class DataManager : public DataMan -{ -public: - DataManager(); - ~DataManager(); - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); - virtual int get(void *p_data, json &p_jmsg); - void flush(); - void add_stream(json p_jmsg); - int put(const void *p_data, string p_doid, string p_var, string p_dtype, - vector<size_t> p_putshape, vector<size_t> p_varshape, - vector<size_t> p_offset, size_t p_timestep, int p_tolerance = 0, - int p_priority = 100); - void add_file(string p_method); - string name() { return "DataManager"; } - string type() { return "Manager"; } - virtual void transform(const void *p_in, void *p_out, json &p_jmsg){}; - -private: - string m_local_ip = ""; - string m_remote_ip = ""; - int m_local_port = 0; - int m_remote_port = 0; - int m_num_channels = 0; - vector<int> m_tolerance; - vector<int> m_priority; -}; - -// end namespace realtime -} -// end namespace adios -} -#endif diff --git a/source/ADIOS.cpp b/source/ADIOS.cpp index bc9f4b66252e8c1d8ff8084e306c2a874151e398..b713687182e1d5d5732d51549f39a8d82f4bb80b 100644 --- a/source/ADIOS.cpp +++ b/source/ADIOS.cpp @@ -25,8 +25,10 @@ #include "engine/bp/BPFileReader.h" #include "engine/bp/BPFileWriter.h" +#ifdef ADIOS_HAVE_DATAMAN // external dependencies #include "engine/dataman/DataManReader.h" #include "engine/dataman/DataManWriter.h" +#endif #ifdef ADIOS_HAVE_ADIOS1 // external dependencies #include "engine/adios1/ADIOS1Reader.h" @@ -146,13 +148,25 @@ std::shared_ptr<Engine> ADIOS::Open(const std::string &name, } else if (type == "DataManWriter") { +#ifdef ADIOS_HAVE_DATAMAN return std::make_shared<DataManWriter>(*this, name, accessMode, mpiComm, method); +#else + throw std::invalid_argument( + "ERROR: this version didn't compile with " + "Dataman library, can't Open DataManWriter\n"); +#endif } else if (type == "DataManReader") { +#ifdef ADIOS_HAVE_DATAMAN return std::make_shared<DataManReader>(*this, name, accessMode, mpiComm, method); +#else + throw std::invalid_argument( + "ERROR: this version didn't compile with " + "Dataman library, can't Open DataManReader\n"); +#endif } else if (type == "ADIOS1Writer") { diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt index c07a408a078d39a4257c8a9b58e16a5a0a36be10..35b8d413c5d9107802df4a5dd6749de3b3b9e2d6 100644 --- a/source/CMakeLists.txt +++ b/source/CMakeLists.txt @@ -41,7 +41,8 @@ foreach(adios2_target IN LISTS adios2_targets) utilities/profiling/iochrono/Timer.cpp - utilities/realtime/dataman/DataManager.cpp + utilities/realtime/dataman/DataManBase.cpp + utilities/realtime/dataman/DataMan.cpp ) target_include_directories(${adios2_target} PUBLIC ${ADIOS_SOURCE_DIR}/include diff --git a/source/utilities/realtime/dataman/DataManager.cpp b/source/utilities/realtime/dataman/DataMan.cpp similarity index 52% rename from source/utilities/realtime/dataman/DataManager.cpp rename to source/utilities/realtime/dataman/DataMan.cpp index 8f2efa25b64d4eb084b2f05913c337e5c3939d5f..337fc01876adf31727560f4e17a2cffea3ddc8a8 100644 --- a/source/utilities/realtime/dataman/DataManager.cpp +++ b/source/utilities/realtime/dataman/DataMan.cpp @@ -1,37 +1,44 @@ -#include "utilities/realtime/dataman/DataManager.h" +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * DataMan.cpp + * + * Created on: Apr 12, 2017 + * Author: Jason Wang + */ + +#include "utilities/realtime/dataman/DataMan.h" namespace adios { namespace realtime { -DataManager::DataManager() : DataMan() {} -DataManager::~DataManager() {} +int DataMan::init(json p_jmsg) { return 0; } -int DataManager::init(json p_jmsg) { return 0; } - -int DataManager::put(const void *p_data, string p_doid, string p_var, - string p_dtype, vector<size_t> p_putshape, - vector<size_t> p_varshape, vector<size_t> p_offset, - size_t p_timestep, int p_tolerance, int p_priority) +int DataMan::put(const void *p_data, std::string p_doid, std::string p_var, + std::string p_dtype, std::vector<size_t> p_putshape, + std::vector<size_t> p_varshape, std::vector<size_t> p_offset, + size_t p_timestep, int p_tolerance, int p_priority) { return DataMan::put(p_data, p_doid, p_var, p_dtype, p_putshape, p_varshape, p_offset, p_timestep, p_tolerance, p_priority); } -int DataManager::put(const void *p_data, json p_jmsg) +int DataMan::put(const void *p_data, json p_jmsg) { put_begin(p_data, p_jmsg); put_end(p_data, p_jmsg); return 0; } -void DataManager::add_file(string p_method) {} +void DataMan::add_file(std::string p_method) {} -void DataManager::add_stream(json p_jmsg) +void DataMan::add_stream(json p_jmsg) { - string method = "zmq"; + std::string method = "zmq"; if (p_jmsg["method"] != nullptr) method = p_jmsg["method"]; @@ -60,9 +67,9 @@ void DataManager::add_stream(json p_jmsg) add_man_to_path("zfp", method); } -void DataManager::flush() { flush_next(); } +void DataMan::flush() { flush_next(); } -int DataManager::get(void *p_data, json &p_jmsg) { return 0; } +int DataMan::get(void *p_data, json &p_jmsg) { return 0; } // end namespace realtime } diff --git a/source/utilities/realtime/dataman/DataManBase.cpp b/source/utilities/realtime/dataman/DataManBase.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b4d6c3a4b04d86a5f5268b83b84af0828659b455 --- /dev/null +++ b/source/utilities/realtime/dataman/DataManBase.cpp @@ -0,0 +1,227 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * DataManBase.cpp + * + * Created on: Apr 12, 2017 + * Author: Jason Wang + */ + +#include "utilities/realtime/dataman/DataManBase.h" + +namespace adios +{ +namespace realtime +{ + +DataManBase::DataManBase() +{ + m_profiling["total_manager_time"] = 0.0f; + m_profiling["total_mb"] = 0.0f; + m_start_time = std::chrono::system_clock::now(); +} + +int DataManBase::put(const void *p_data, std::string p_doid, std::string p_var, + std::string p_dtype, std::vector<size_t> p_putshape, + std::vector<size_t> p_varshape, + std::vector<size_t> p_offset, size_t p_timestep, + int p_tolerance, int p_priority) +{ + json msg; + msg["doid"] = p_doid; + msg["var"] = p_var; + msg["dtype"] = p_dtype; + msg["putshape"] = p_putshape; + msg["putbytes"] = product(p_putshape, dsize(p_dtype)); + msg["varshape"] = p_varshape; + msg["varbytes"] = product(p_varshape, dsize(p_dtype)); + msg["offset"] = p_offset; + msg["timestep"] = p_timestep; + msg["tolerance"] = p_tolerance; + msg["priority"] = p_priority; + return put(p_data, msg); +} + +int DataManBase::put_begin(const void *p_data, json &p_jmsg) +{ + check_shape(p_jmsg); + p_jmsg["profiling"] = m_profiling; + m_step_time = std::chrono::system_clock::now(); + return 0; +} + +int DataManBase::put_end(const void *p_data, json &p_jmsg) +{ + auto end = std::chrono::system_clock::now(); + std::chrono::duration<double> duration = end - m_step_time; + m_profiling["total_manager_time"] = + m_profiling["total_manager_time"].get<double>() + duration.count(); + m_profiling["total_mb"] = + m_profiling["total_mb"].get<size_t>() + + product(p_jmsg["varshape"], dsize(p_jmsg["dtype"])) / 1000000.0f; + duration = end - m_start_time; + m_profiling["total_workflow_time"] = duration.count(); + m_profiling["workflow_mbs"] = + m_profiling["total_mb"].get<double>() / + m_profiling["total_workflow_time"].get<double>(); + m_profiling["manager_mbs"] = + m_profiling["total_mb"].get<double>() / + m_profiling["total_manager_time"].get<double>(); + if (p_jmsg["compressed_size"] != nullptr) + p_jmsg["putbytes"] = p_jmsg["compressed_size"].get<size_t>(); + put_next(p_data, p_jmsg); + return 0; +} + +int DataManBase::get(void *p_data, std::string p_doid, std::string p_var, + std::string p_dtype, std::vector<size_t> p_getshape, + std::vector<size_t> p_varshape, + std::vector<size_t> p_offset, size_t p_timestep) +{ + json msg; + msg["doid"] = p_doid; + msg["var"] = p_var; + msg["dtype"] = p_dtype; + msg["getshape"] = p_getshape; + msg["varshape"] = p_varshape; + msg["offset"] = p_offset; + msg["timestep"] = p_timestep; + return get(p_data, msg); +} + +int DataManBase::get(void *p_data, std::string p_doid, std::string p_var, + std::string &p_dtype, std::vector<size_t> &p_varshape, + size_t &p_timestep) +{ + json msg; + msg["doid"] = p_doid; + msg["var"] = p_var; + return get(p_data, msg); +} + +void DataManBase::reg_callback( + std::function<void(const void *, std::string, std::string, std::string, + std::vector<size_t>)> + cb) +{ + if (m_next.size() == 0) + { + m_callback = cb; + } + else + { + for (auto i : m_next) + { + i.second->reg_callback(cb); + } + } +} + +void DataManBase::dump(const void *p_data, json p_jmsg, std::ostream &out) +{ + std::vector<size_t> p_varshape = + p_jmsg["varshape"].get<std::vector<size_t>>(); + std::string dtype = p_jmsg["dtype"]; + size_t length = p_jmsg["dumplength"].get<size_t>(); + size_t s = 0; + for (size_t i = 0; i < product(p_varshape, 1); i++) + { + s++; + out << ((float *)p_data)[i] << " "; + if (s == length) + { + out << std::endl; + s = 0; + } + } + out << std::endl; +} + +void DataManBase::add_next(std::string p_name, + std::shared_ptr<DataManBase> p_next) +{ + m_next[p_name] = p_next; +} + +void DataManBase::remove_next(std::string p_name) { m_next.erase(p_name); } + +bool DataManBase::have_next() +{ + if (m_next.size() == 0) + { + return false; + } + else + { + return true; + } +} + +void DataManBase::print_next(std::ostream &out) +{ + for (auto i : m_next) + { + out << i.second->name() << " -> "; + i.second->print_next(); + out << std::endl; + } +} + +bool DataManBase::auto_transform(const void *p_in, void *p_out, json &p_jmsg) +{ + if (p_jmsg["compression_method"] != nullptr) + { + auto method = p_jmsg["compression_method"]; + auto man = get_man(method); + if (man == nullptr) + { + logging("Library file for compression method " + + p_jmsg["compression_method"].dump() + " not found!"); + return false; + } + man->transform(p_in, p_out, p_jmsg); + p_jmsg.erase("compression_method"); + p_jmsg.erase("compression_rate"); + p_jmsg.erase("compressed_size"); + return true; + } + else + { + return false; + } +} + +void DataManBase::add_man_to_path(std::string p_new, std::string p_path) +{ + if (m_next.count(p_path) > 0) + { + auto man = get_man(p_new); + man->add_next(p_path, m_next[p_path]); + this->add_next(p_new, man); + this->remove_next(p_path); + } +} + +int DataManBase::flush_next() +{ + for (auto i : m_next) + { + i.second->flush(); + } + return 0; +} + +int DataManBase::put_next(const void *p_data, json p_jmsg) +{ + for (auto i : m_next) + { + i.second->put(p_data, p_jmsg); + } + return 0; +} + +// end namespace realtime +} +// end namespace adios +}