diff --git a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp index 65c0ce4e87352b3110b908e3dd02099778f663a6..9353b4552ceb43f8d8157734415c5a1e73bbf584 100644 --- a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp +++ b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp @@ -24,10 +24,8 @@ int main(int argc, char *argv[]) adios2::ADIOS adios(adios2::DebugON); adios2::IO &dataManIO = adios.DeclareIO("WANIO"); dataManIO.SetEngine("DataManWriter"); - dataManIO.SetParameters({{"peer-to-peer", "yes"}, - {"real_time", "yes"}, - {"compress", "no"}, - {"method", "dump"}}); + dataManIO.SetParameters( + {{"compress", "no"}, {"method", "dump"}, {"type", "wan"}}); // Define variable and local size auto bpFloats = diff --git a/source/CMakeLists.txt b/source/CMakeLists.txt index 9a02a753447d8906527c7b7cce852e8bec3e044a..1274b1acb469363d42838f2c89bce9305fa083db 100644 --- a/source/CMakeLists.txt +++ b/source/CMakeLists.txt @@ -2,10 +2,6 @@ # Distributed under the OSI-approved Apache License, Version 2.0. See # accompanying file Copyright.txt for details. #------------------------------------------------------------------------------# - -if(ADIOS2_HAVE_DataMan) - add_subdirectory(dataman) -endif() add_subdirectory(adios2) diff --git a/source/adios2/ADIOSConfig.h.in b/source/adios2/ADIOSConfig.h.in index f7e45783cf57a59e2188d6a5c287646a8bf4fc68..b993cbaa486abeb10714f015f6144b64eb1517a4 100644 --- a/source/adios2/ADIOSConfig.h.in +++ b/source/adios2/ADIOSConfig.h.in @@ -39,8 +39,8 @@ /* CMake Option: ADIOS_USE_HDF5=ON */ #cmakedefine ADIOS2_HAVE_HDF5 -/* CMake Option: ADIOS_USE_DataMan=ON */ -#cmakedefine ADIOS2_HAVE_DATAMAN +/* CMake Option: ADIOS_USE_ZeroMQ=ON */ +#cmakedefine ADIOS2_HAVE_ZEROMQ /* CMake Option: ADIOS_USE_SysVShMem=ON */ #cmakedefine ADIOS2_HAVE_SYSVSHMEM diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index 24c36ffe8831289b79f03ddb8a39658ab8978cd2..7c7fde182d24a19d3e018bbd8a933b0ce991fe7b 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -47,6 +47,7 @@ add_library(adios2 toolkit/transport/file/FileStream.cpp toolkit/transportman/TransportMan.cpp + toolkit/transportman/dataman/DataMan.cpp ) target_include_directories(adios2 @@ -66,14 +67,20 @@ if(ADIOS2_HAVE_SysVShMem) target_sources(adios2 PRIVATE toolkit/capsule/shmem/ShmSystemV.cpp) endif() -if(ADIOS2_HAVE_DataMan) +if(ADIOS2_HAVE_ZeroMQ) target_sources(adios2 PRIVATE - engine/dataman/DataManReader.cpp - engine/dataman/DataManWriter.cpp + toolkit/transport/wan/WANZmq.cpp ) - target_link_libraries(adios2 PRIVATE dataman) + target_link_libraries(adios2 PRIVATE ZeroMQ::ZMQ) endif() - + +target_sources(adios2 PRIVATE + engine/dataman/DataManReader.cpp + engine/dataman/DataManWriter.cpp +) +target_link_libraries(adios2 PRIVATE NLohmannJson) + + if(ADIOS2_HAVE_BZip2) target_sources(adios2 PRIVATE transform/compress/CompressBZip2.cpp) target_link_libraries(adios2 PRIVATE BZip2::BZip2) diff --git a/source/adios2/engine/dataman/DataManReader.cpp b/source/adios2/engine/dataman/DataManReader.cpp index 2f7290e75d8c77a3818d6dc9b83d97a0f9437f91..7d6a053595f53dad9a8d2aaa51831dbcda12f948 100644 --- a/source/adios2/engine/dataman/DataManReader.cpp +++ b/source/adios2/engine/dataman/DataManReader.cpp @@ -17,19 +17,19 @@ namespace adios2 DataManReader::DataManReader(IO &io, const std::string &name, const OpenMode openMode, MPI_Comm mpiComm) -: Engine("DataManReader", io, name, openMode, mpiComm) +: Engine("DataManReader", io, name, openMode, mpiComm), m_Man(mpiComm, true) { m_EndMessage = " in call to IO Open DataManReader " + m_Name + "\n"; Init(); } -void DataManReader::SetCallBack( +void DataManReader::SetCallback( std::function<void(const void *, std::string, std::string, std::string, Dims)> callback) { m_CallBack = callback; - m_Man.reg_callback(callback); + m_Man.SetCallback(callback); } void DataManReader::Close(const int transportIndex) {} @@ -80,6 +80,7 @@ void DataManReader::Init() }) == s.end(); }; + /* json jmsg; for (auto &i : m_IO.m_Parameters) { @@ -93,12 +94,24 @@ void DataManReader::Init() } } jmsg["stream_mode"] = "receiver"; - m_Man.add_stream(jmsg); + */ - std::string method_type; - int num_channels = 0; - lf_AssignString("method_type", method_type); - lf_AssignInt("num_channels", num_channels); + int n_Transports = 1; + std::vector<Params> para(n_Transports); + + for (unsigned int i = 0; i < para.size(); i++) + { + para[i]["type"] = "wan"; + para[i]["transport"] = "zmq"; + para[i]["name"] = "stream"; + para[i]["ipaddress"] = "127.0.0.1"; + } + m_Man.OpenWANTransports("zmq", adios2::OpenMode::Read, para, true); + + std::string methodType; + int numChannels = 0; + lf_AssignString("method_type", methodType); + lf_AssignInt("num_channels", numChannels); } else { diff --git a/source/adios2/engine/dataman/DataManReader.h b/source/adios2/engine/dataman/DataManReader.h index 51b41e5ba2ad08291d66e10ec1e5448b1f1541fd..c1c2c42d1403213e1530e1a14fac856f2d983a66 100644 --- a/source/adios2/engine/dataman/DataManReader.h +++ b/source/adios2/engine/dataman/DataManReader.h @@ -13,10 +13,9 @@ #include <iostream> //std::cout << Needs to go -#include <DataMan.h> - #include "adios2/ADIOSConfig.h" #include "adios2/core/Engine.h" +#include "adios2/toolkit/transportman/dataman/DataMan.h" namespace adios2 { @@ -46,7 +45,7 @@ public: * @param callback function (get) provided by the user to be applied in * DataMan */ - void SetCallBack(std::function<void(const void *, std::string, std::string, + void SetCallback(std::function<void(const void *, std::string, std::string, std::string, Dims)> callback); @@ -63,7 +62,7 @@ public: private: bool m_DoRealTime = false; - DataMan m_Man; + transportman::DataMan m_Man; std::function<void(const void *, std::string, std::string, std::string, Dims)> m_CallBack; ///< call back function diff --git a/source/adios2/engine/dataman/DataManWriter.cpp b/source/adios2/engine/dataman/DataManWriter.cpp index 34e61c34f65e2338c8f33b987a08d927771bb55a..95132cc67110cfcbba2791ce6e4bbc4b1b9df5b6 100644 --- a/source/adios2/engine/dataman/DataManWriter.cpp +++ b/source/adios2/engine/dataman/DataManWriter.cpp @@ -20,7 +20,7 @@ namespace adios2 DataManWriter::DataManWriter(IO &io, const std::string &name, const OpenMode openMode, MPI_Comm mpiComm) -: Engine("DataManWriter", io, name, openMode, mpiComm) +: Engine("DataManWriter", io, name, openMode, mpiComm), m_Man(mpiComm, true) { m_EndMessage = ", in call to Open DataManWriter\n"; Init(); @@ -32,12 +32,18 @@ void DataManWriter::SetCallBack( callback) { m_CallBack = callback; - m_Man.reg_callback(callback); + // m_Man.reg_callback(callback); } -void DataManWriter::Advance(const float timeoutSeconds) { m_Man.flush(); } +void DataManWriter::Advance(const float timeoutSeconds) +{ + // m_Man.flush(); +} -void DataManWriter::Close(const int transportIndex) { m_Man.flush(); } +void DataManWriter::Close(const int transportIndex) +{ + // m_Man.flush(); +} // PRIVATE functions below void DataManWriter::Init() @@ -97,20 +103,33 @@ void DataManWriter::Init() }) == s.end(); }; - json jmsg; - for (const auto &i : m_IO.m_Parameters) + // json jmsg; + // for (const auto &i : m_IO.m_Parameters) + // { + // if (lf_IsNumber(i.second)) + // { + // jmsg[i.first] = std::stoi(i.second); + // } + // else + // { + // jmsg[i.first] = i.second; + // } + // } + // jmsg["stream_mode"] = "sender"; + // m_Man.add_stream(jmsg); + + int n_Transports = 1; + std::vector<Params> para(n_Transports); + + for (unsigned int i = 0; i < para.size(); i++) { - if (lf_IsNumber(i.second)) - { - jmsg[i.first] = std::stoi(i.second); - } - else - { - jmsg[i.first] = i.second; - } + para[i]["type"] = "wan"; + para[i]["transport"] = "zmq"; + para[i]["name"] = "stream"; + para[i]["ipaddress"] = "127.0.0.1"; } - jmsg["stream_mode"] = "sender"; - m_Man.add_stream(jmsg); + + m_Man.OpenWANTransports("zmq", adios2::OpenMode::Write, para, true); std::string method_type; lf_AssignString("method_type", method_type); diff --git a/source/adios2/engine/dataman/DataManWriter.h b/source/adios2/engine/dataman/DataManWriter.h index 2f829ec40beee06be296d1dc25ab189eaa308d87..28f90894127b1b92d6953985c2a7bc58eedc2c9a 100644 --- a/source/adios2/engine/dataman/DataManWriter.h +++ b/source/adios2/engine/dataman/DataManWriter.h @@ -12,12 +12,10 @@ #define ADIOS2_ENGINE_DATAMAN_DATAMAN_WRITER_H_ #include <iostream> //std::cout must be removed, only used for hello example -#include <unistd.h> //sleep must be removed - -#include <DataMan.h> #include "adios2/ADIOSConfig.h" #include "adios2/core/Engine.h" +#include "adios2/toolkit/transportman/dataman/DataMan.h" namespace adios2 { @@ -26,8 +24,6 @@ class DataManWriter : public Engine { public: - using json = nlohmann::json; - DataManWriter(IO &io, const std::string &name, const OpenMode openMode, MPI_Comm mpiComm); @@ -44,7 +40,7 @@ public: private: bool m_DoRealTime = false; bool m_DoMonitor = false; - DataMan m_Man; + transportman::DataMan m_Man; std::function<void(const void *, std::string, std::string, std::string, Dims)> m_CallBack; ///< call back function diff --git a/source/adios2/engine/dataman/DataManWriter.tcc b/source/adios2/engine/dataman/DataManWriter.tcc index adcd073d24e06c6719aa3b70e45c2586ae4fdb5b..e12f03da5ad774cce459bbbcfa714bc3c8d009ff 100644 --- a/source/adios2/engine/dataman/DataManWriter.tcc +++ b/source/adios2/engine/dataman/DataManWriter.tcc @@ -40,7 +40,7 @@ void DataManWriter::DoWriteCommon(Variable<T> &variable, const T *values) variable.m_Start.assign(variable.m_Count.size(), 0); } - json jmsg; + nlohmann::json jmsg; jmsg["doid"] = m_Name; jmsg["var"] = variable.m_Name; jmsg["dtype"] = GetType<T>(); @@ -48,7 +48,11 @@ void DataManWriter::DoWriteCommon(Variable<T> &variable, const T *values) jmsg["varshape"] = variable.m_Shape; jmsg["offset"] = variable.m_Start; jmsg["timestep"] = 0; - m_Man.put(values, jmsg); + jmsg["bytes"] = + std::accumulate(variable.m_Shape.begin(), variable.m_Shape.end(), + sizeof(T), std::multiplies<size_t>()); + + m_Man.WriteWAN(values, jmsg); if (m_DoMonitor) { @@ -69,10 +73,6 @@ void DataManWriter::DoWriteCommon(Variable<T> &variable, const T *values) std::cout << "Rank: " << i << "\n"; std::cout << std::endl; } - else - { - sleep(1); - } } MPI_Barrier(m_MPIComm); } diff --git a/source/adios2/helper/adiosString.cpp b/source/adios2/helper/adiosString.cpp index 1f0c85d0cad367ce2ae88c1c7be4e59653d4dee6..f294951ab6021ba1e1bd7cda44859bbea5617caa 100644 --- a/source/adios2/helper/adiosString.cpp +++ b/source/adios2/helper/adiosString.cpp @@ -129,6 +129,27 @@ void SetParameterValue(const std::string key, const Params ¶meters, } } +std::string GetParameter(const std::string key, const Params ¶ms, + const bool isMandatory, const bool debugMode, + const std::string hint) +{ + std::string value; + auto itParameter = params.find(key); + if (itParameter == params.end()) + { + if (debugMode && isMandatory) + { + throw std::invalid_argument("ERROR: mandatory parameter " + key + + " not found, " + hint); + } + } + else + { + value = itParameter->second; + } + return value; +} + void SetParameterValueInt(const std::string key, const Params ¶meters, int &value, const bool debugMode, const std::string hint) diff --git a/source/adios2/helper/adiosString.h b/source/adios2/helper/adiosString.h index 5f795bc4e918b9dddff49245427ad621f44acee3..e323797e8a18ebaa7ae3bcec78875288a1e10476 100644 --- a/source/adios2/helper/adiosString.h +++ b/source/adios2/helper/adiosString.h @@ -68,6 +68,9 @@ GetParametersValues(const std::string &key, void SetParameterValue(const std::string key, const Params ¶meters, std::string &value) noexcept; +std::string GetParameter(const std::string key, const adios2::Params ¶ms, + const bool isMandatory, const bool debugMode, + const std::string hint); /** * Sets int value if found in parameters for input key * @param key input diff --git a/source/adios2/toolkit/transport/wan/WANZmq.cpp b/source/adios2/toolkit/transport/wan/WANZmq.cpp index 6627a6372c42ddebaab36572825787cabbfa29e2..9d9e3644108dc94fb96816ccdb034be9d15450e5 100644 --- a/source/adios2/toolkit/transport/wan/WANZmq.cpp +++ b/source/adios2/toolkit/transport/wan/WANZmq.cpp @@ -10,6 +10,7 @@ #include "WANZmq.h" +#include <iostream> #include <zmq.h> namespace adios2 @@ -22,7 +23,11 @@ WANZmq::WANZmq(const std::string ipAddress, const std::string port, : Transport("wan", "zmq", mpiComm, debugMode), m_IPAddress(ipAddress), m_Port(port) { - + m_Context = zmq_ctx_new(); + if (m_Context == nullptr || m_Context == NULL) + { + throw std::runtime_error("ERROR: Creating ZeroMQ context failed"); + } if (m_DebugMode) { // TODO verify port is unsigned int @@ -35,6 +40,10 @@ WANZmq::~WANZmq() { zmq_close(m_Socket); } + if (m_Context) + { + zmq_ctx_destroy(m_Context); + } } void WANZmq::Open(const std::string &name, const OpenMode openMode) @@ -51,7 +60,12 @@ void WANZmq::Open(const std::string &name, const OpenMode openMode) m_Socket = zmq_socket(m_Context, ZMQ_REQ); const std::string fullIP("tcp://" + m_IPAddress + ":" + m_Port); - zmq_connect(m_Socket, fullIP.c_str()); + int err = zmq_connect(m_Socket, fullIP.c_str()); + if (err) + { + throw std::runtime_error("ERROR: zmq_connect() failed with " + + std::to_string(err)); + } if (m_Profiler.IsActive) { @@ -88,7 +102,7 @@ void WANZmq::Open(const std::string &name, const OpenMode openMode) if (m_DebugMode) { - if (m_Socket == NULL) // something goes wrong + if (m_Socket == nullptr || m_Socket == NULL) // something goes wrong { throw std::ios_base::failure( "ERROR: couldn't open socket for address " + m_Name + @@ -108,10 +122,13 @@ void WANZmq::Write(const char *buffer, size_t size) m_Profiler.Timers.at("write").Resume(); } + /* + int status = zmq_send(m_Socket, buffer, size, 0); char ret[10]; zmq_recv(m_Socket, ret, 10, 0); + if (m_Profiler.IsActive) { m_Profiler.Timers.at("write").Pause(); @@ -128,6 +145,7 @@ void WANZmq::Write(const char *buffer, size_t size) ", in call to WANZmq write\n"); } } + */ } void WANZmq::Flush() {} diff --git a/source/adios2/toolkit/transport/wan/WANZmq.h b/source/adios2/toolkit/transport/wan/WANZmq.h index 7249f963cc31d0fd464c4c9d92c594c2ee600408..167814b4f84fbec00f78f16c46c1d2e2503db2b6 100644 --- a/source/adios2/toolkit/transport/wan/WANZmq.h +++ b/source/adios2/toolkit/transport/wan/WANZmq.h @@ -50,11 +50,11 @@ private: const std::string m_IPAddress; std::string m_Port; - /** TODO: find out if is provided externally */ - void *m_Context = NULL; + /** context handler created by zmq, thread safe */ + void *m_Context = nullptr; - /** handler created by zmq */ - void *m_Socket = NULL; + /** socket handler created by zmq */ + void *m_Socket = nullptr; }; } // end namespace transport diff --git a/source/adios2/toolkit/transportman/dataman/DataMan.cpp b/source/adios2/toolkit/transportman/dataman/DataMan.cpp index c30344df544dda4bd5010467047e8634accb8223..95abf19ee8c735b42293c6084383dcb5a4005fd1 100644 --- a/source/adios2/toolkit/transportman/dataman/DataMan.cpp +++ b/source/adios2/toolkit/transportman/dataman/DataMan.cpp @@ -8,7 +8,12 @@ * Author: Jason Wang wangr1@ornl.gov */ -#include "DataMan.h" +#include "adios2/toolkit/transportman/dataman/DataMan.h" +#include "adios2/helper/adiosString.h" + +#ifdef ADIOS2_HAVE_ZEROMQ +#include "adios2/toolkit/transport/wan/WANZmq.h" +#endif namespace adios2 { @@ -19,58 +24,43 @@ DataMan::DataMan(MPI_Comm mpiComm, const bool debugMode) : TransportMan(mpiComm, debugMode) { } - void DataMan::OpenWANTransports(const std::string &name, const OpenMode openMode, const std::vector<Params> ¶metersVector, const bool profile) { - auto lf_GetParameter = [](const std::string key, const Params ¶ms, - const bool isMandatory, - const bool debugMode) -> std::string { - - std::string value; - auto itParameter = params.find(key); - if (itParameter == params.end()) - { - if (debugMode && isMandatory) - { - throw std::invalid_argument( - "ERROR: wan transport doesn't have mandatory parameter " + - key + - ", provide one in IO AddTransport, in call to Open\n"); - } - } - else - { - value = itParameter->second; - } - return value; - }; for (const auto ¶meters : parametersVector) { - std::shared_ptr<Transport> wanTransport; + std::shared_ptr<Transport> wanTransport, controlTransport; + + // to be removed + for (auto &i : parameters) + { + std::cout << i.first << " " << i.second << std::endl; + } const std::string type( - lf_GetParameter("transport", parameters, true, m_DebugMode)); + GetParameter("type", parameters, true, m_DebugMode, "")); - const std::string lib( - lf_GetParameter("lib", parameters, true, m_DebugMode)); + const std::string trans( + GetParameter("transport", parameters, true, m_DebugMode, "")); const std::string ipAddress( - lf_GetParameter("ipaddress", parameters, true, m_DebugMode)); + GetParameter("ipaddress", parameters, true, m_DebugMode, "")); - const std::string port( - lf_GetParameter("port", parameters, false, m_DebugMode)); + std::string port_control( + GetParameter("port", parameters, false, m_DebugMode, "")); - if (port.empty()) + if (port_control.empty()) { - port = m_DefaultPort; + port_control = std::to_string(m_DefaultPort); } - const std::string messageName( - lf_GetParameter("name", parameters, false, m_DebugMode)); + const std::string port_data(std::to_string(stoi(port_control) + 1)); + + std::string messageName( + GetParameter("name", parameters, false, m_DebugMode, "")); if (messageName.empty()) { @@ -79,11 +69,13 @@ void DataMan::OpenWANTransports(const std::string &name, if (type == "wan") // need to create directory { - if (lib == "zmq") + if (trans == "zmq") { -#ifdef ADIOS_HAVE_ZMQ +#ifdef ADIOS2_HAVE_ZEROMQ wanTransport = std::make_shared<transport::WANZmq>( - ipAddress, port, m_MPIComm, m_DebugMode); + ipAddress, port_data, m_MPIComm, m_DebugMode); + controlTransport = std::make_shared<transport::WANZmq>( + ipAddress, port_control, m_MPIComm, m_DebugMode); #else throw std::invalid_argument( "ERROR: this version of ADIOS2 didn't compile with " @@ -94,15 +86,53 @@ void DataMan::OpenWANTransports(const std::string &name, { if (m_DebugMode) { - throw std::invalid_argument("ERROR: wan library " + lib + + throw std::invalid_argument("ERROR: wan library " + trans + " not supported or not " "provided in IO AddTransport, " "in call to Open\n"); } } } + wanTransport->Open(messageName, openMode); m_Transports.push_back(std::move(wanTransport)); + controlTransport->Open(messageName, openMode); + m_ControlTransports.push_back(std::move(controlTransport)); + } +} + +void DataMan::WriteWAN(const void *buffer, nlohmann::json jmsg) +{ + m_ControlTransports[m_CurrentTransport]->Write(jmsg.dump().c_str(), + jmsg.dump().size()); + m_Transports[m_CurrentTransport]->Write(static_cast<const char *>(buffer), + jmsg["bytes"].get<size_t>()); +} + +void DataMan::SetCallback(std::function<void(const void *, std::string, + std::string, std::string, Dims)> + callback) +{ + m_CallBack = callback; +} + +void DataMan::ReadThread(std::shared_ptr<Transport> trans, + std::shared_ptr<Transport> ctl_trans) +{ + while (m_Listening) + { + // Wait for Read API to be implemented + /* + if (ctl_trans->Read() >= 0) + { + std::string smsg; + nlohmann::json jmsg = json::parse(smsg); + } + else + { + usleep(1); + } + */ } } diff --git a/source/adios2/toolkit/transportman/dataman/DataMan.h b/source/adios2/toolkit/transportman/dataman/DataMan.h index c0e009f67fe84268f8b56519bebdd6878654543e..8466068fa98c96ac2b1fc97a88970ff505efa53e 100644 --- a/source/adios2/toolkit/transportman/dataman/DataMan.h +++ b/source/adios2/toolkit/transportman/dataman/DataMan.h @@ -12,8 +12,8 @@ #define ADIOS2_TOOLKIT_TRANSPORTMAN_DATAMAN_DATAMAN_H_ #include "adios2/toolkit/transportman/TransportMan.h" - #include <json.hpp> +#include <thread> namespace adios2 { @@ -32,11 +32,29 @@ public: const std::vector<Params> ¶metersVector, const bool profile); + void WriteWAN(const void *buffer, nlohmann::json jmsg); + + void SetCallback(std::function<void(const void *, std::string, std::string, + std::string, Dims)> + callback); + private: + void ReadThread(std::shared_ptr<Transport> trans, + std::shared_ptr<Transport> ctl_trans); + + std::vector<std::shared_ptr<Transport>> m_ControlTransports; + std::vector<std::thread> m_ControlThreads; + size_t m_CurrentTransport = 0; + bool m_Listening = false; + + std::function<void(const void *, std::string, std::string, std::string, + Dims)> + m_CallBack; + nlohmann::json m_JMessage; /** Pick the appropriate default */ - const std::string m_DefaultPort = "22"; + const int m_DefaultPort = 12306; }; } // end namespace transportman diff --git a/source/dataman/CMakeLists.txt b/source/dataman/CMakeLists.txt deleted file mode 100644 index 6defa0d619677d1d3672dda13b8cd6897b497a6b..0000000000000000000000000000000000000000 --- a/source/dataman/CMakeLists.txt +++ /dev/null @@ -1,77 +0,0 @@ -#------------------------------------------------------------------------------# -# Distributed under the OSI-approved Apache License, Version 2.0. See -# accompanying file Copyright.txt for details. -#------------------------------------------------------------------------------# - -if(NOT SHARED_LIBS_SUPPORTED) - message(FATAL_ERROR "DataMan requires shared library support") -endif() -if(MSVC) - message(FATAL_ERROR "DataMan is not currently compatible with MSVC") -endif() - -add_library(dataman - DataManBase.cpp DataManBase.h - DataMan.cpp DataMan.h - CacheMan.cpp CacheMan.h -) -target_include_directories(dataman - PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}> -) -target_link_libraries(dataman - PRIVATE adios2sys_interface - PUBLIC NLohmannJson -) -set_target_properties(dataman PROPERTIES - VERSION ${ADIOS2_VERSION} - SOVERSION ${ADIOS2_VERSION_MAJOR} -) -install(TARGETS dataman EXPORT adios2Exports - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} - LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} - ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} - INCLUDES DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/dataman -) - -# Add the dataman plugins as MODULE libraries instead of SHARED libraries. -# MODULE libraries are designed to be plugins, i.e. shared libs that nobody -# else links to. - -set(dataman_modules) - -add_library(dumpman MODULE DumpMan.h DumpMan.cpp) -target_link_libraries(dumpman PRIVATE dataman) -list(APPEND dataman_modules dumpman) - -add_library(temporalman MODULE TemporalMan.h TemporalMan.cpp) -target_link_libraries(temporalman PRIVATE dataman) -list(APPEND dataman_modules temporalman) - -if(ADIOS2_HAVE_ZeroMQ) - add_library(zmqman MODULE - StreamMan.h StreamMan.cpp - ZmqMan.h ZmqMan.cpp - ) - target_link_libraries(zmqman PRIVATE dataman ZeroMQ::ZMQ) - list(APPEND dataman_modules zmqman) - - add_library(mdtmman MODULE - StreamMan.h StreamMan.cpp - MdtmMan.h MdtmMan.cpp - ) - target_link_libraries(mdtmman PRIVATE dataman ZeroMQ::ZMQ) - list(APPEND dataman_modules mdtmman) -endif() - -if(ADIOS2_HAVE_ZFP) - add_library(zfpman MODULE ZfpMan.h ZfpMan.cpp) - target_link_libraries(zfpman PRIVATE dataman zfp::zfp) - - list(APPEND dataman_modules zfpman) -endif() - -install(TARGETS ${dataman_modules} - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} - LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} - ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} -) diff --git a/source/dataman/CacheMan.cpp b/source/dataman/CacheMan.cpp deleted file mode 100644 index d18c3eb140a278e2b8bccd502948ae0dae5842fa..0000000000000000000000000000000000000000 --- a/source/dataman/CacheMan.cpp +++ /dev/null @@ -1,189 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * CacheMan.cpp - * - * Created on: Apr 18, 2017 - * Author: Jason Wang - */ - -#include "CacheMan.h" - -#include <algorithm> -#include <limits> - -int CacheItem::init(json a_jmsg) -{ - m_jmsg = a_jmsg; - return 0; -} - -int CacheMan::put(const void *a_data, json &a_jmsg) -{ - std::string doid = a_jmsg["doid"].get<std::string>(); - std::string var = a_jmsg["var"].get<std::string>(); - return m_cache[doid][var].put(a_data, a_jmsg); -} - -int CacheItem::put(const void *a_data, json &a_jmsg) -{ - - if (!m_initialized) - { - init(a_jmsg); - m_initialized = true; - } - - std::vector<size_t> varshape = - a_jmsg["varshape"].get<std::vector<size_t>>(); - std::vector<size_t> putshape = - a_jmsg["putshape"].get<std::vector<size_t>>(); - std::vector<size_t> offset = a_jmsg["offset"].get<std::vector<size_t>>(); - size_t putsize = a_jmsg["putsize"].get<size_t>(); - size_t chunksize = putshape.back(); - size_t varbytes = a_jmsg["varbytes"].get<size_t>(); - size_t dsize = a_jmsg["dsize"].get<size_t>(); - - if (m_cache.empty()) - { - push(); - } - - for (size_t i = 0; i < putsize; i += chunksize) - { - std::vector<size_t> p = one2multi(putshape, i); - p = apply_offset(p, offset); - size_t ig = multi2one(varshape, p); - std::copy(const_cast<char *>(static_cast<const char *>(a_data)) + - i * dsize, - const_cast<char *>(static_cast<const char *>(a_data)) + - i * dsize + chunksize * dsize, - m_cache.back().data() + ig * dsize); - } - - return 0; -} - -void *CacheItem::get() { return m_cache.front().data(); } - -void *CacheMan::get(std::string doid, std::string var) -{ - return m_cache[doid][var].get(); -} - -void CacheMan::pop() -{ - for (auto &i : m_cache) - { - for (auto &j : i.second) - { - j.second.pop(); - } - } - m_timesteps_cached--; - m_timestep_first++; -} - -void CacheItem::pop() { m_cache.pop(); } - -void CacheMan::push() -{ - for (auto &i : m_cache) - { - for (auto &j : i.second) - { - j.second.push(); - } - } - m_timesteps_cached++; -} - -void CacheItem::push() -{ - size_t varbytes = m_jmsg["varbytes"].get<size_t>(); - m_cache.push(std::vector<char>(varbytes)); - clean("nan"); -} - -void CacheItem::clean(std::string a_mode) -{ - size_t varbytes = m_jmsg["varbytes"].get<size_t>(); - size_t varsize = m_jmsg["varsize"].get<size_t>(); - std::string dtype = m_jmsg["dtype"].get<std::string>(); - if (a_mode == "zero") - { - std::memset(m_cache.front().data(), 0, varbytes); - return; - } - else if (a_mode == "nan") - { - if (dtype == "float") - { - for (size_t j = 0; j < varsize; ++j) - { - (reinterpret_cast<float *>(m_cache.front().data()))[j] = - std::numeric_limits<float>::quiet_NaN(); - } - } - else if (dtype == "double") - { - for (size_t j = 0; j < varsize; ++j) - { - (reinterpret_cast<double *>(m_cache.front().data()))[j] = - std::numeric_limits<double>::quiet_NaN(); - } - } - else if (dtype == "int") - { - for (size_t j = 0; j < varsize; ++j) - { - (reinterpret_cast<int *>(m_cache.front().data()))[j] = - std::numeric_limits<int>::quiet_NaN(); - } - } - } -} - -void CacheMan::clean(std::string a_mode) -{ - for (auto &i : m_cache) - { - for (auto &j : i.second) - { - j.second.clean(a_mode); - } - } -} - -std::vector<std::string> CacheMan::get_do_list() -{ - std::vector<std::string> do_list; - for (const auto &i : m_cache) - do_list.push_back(i.first); - return do_list; -} - -std::vector<std::string> CacheMan::get_var_list(std::string doid) -{ - std::vector<std::string> var_list; - for (const auto &i : m_cache[doid]) - var_list.push_back(i.first); - return var_list; -} - -size_t CacheMan::get_timesteps_cached() { return m_timesteps_cached; } - -nlohmann::json CacheMan::get_jmsg(std::string a_doid, std::string a_var) -{ - return m_cache[a_doid][a_var].get_jmsg(); -} - -nlohmann::json CacheItem::get_jmsg() -{ - m_jmsg.erase("putsize"); - m_jmsg.erase("putshape"); - m_jmsg.erase("putbytes"); - m_jmsg.erase("offset"); - return m_jmsg; -} diff --git a/source/dataman/CacheMan.h b/source/dataman/CacheMan.h deleted file mode 100644 index 0aaaacdef36ce02b169a81adac4bdfc7b7caa9a5..0000000000000000000000000000000000000000 --- a/source/dataman/CacheMan.h +++ /dev/null @@ -1,102 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * CacheMan.h - * - * Created on: Apr 18, 2017 - * Author: Jason Wang - */ - -#ifndef DATAMAN_CACHEMAN_H_ -#define DATAMAN_CACHEMAN_H_ - -#include <queue> - -#include <json.hpp> - -class CacheItem -{ -public: - using json = nlohmann::json; - - int init(json a_jmsg); - virtual int put(const void *a_data, json &a_jmsg); - virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} - - void *get(); - void clean(const std::string a_mode); - void pop(); - void push(); - json get_jmsg(); - void clean(std::vector<char> &a_data, std::string a_mode); - std::string m_clean_mode; - -private: - std::queue<std::vector<char>> m_cache; - json m_jmsg; - bool m_initialized = false; - - inline std::vector<size_t> apply_offset(const std::vector<size_t> &p, - const std::vector<size_t> &o) - { - std::vector<size_t> g; - for (int i = 0; i < p.size(); ++i) - { - g.push_back(p[i] + o[i]); - } - return g; - } - - inline size_t multi2one(const std::vector<size_t> &v, - const std::vector<size_t> &p) - { - size_t index = 0; - for (int i = 1; i < v.size(); ++i) - { - index += std::accumulate(v.begin() + i, v.end(), p[i - 1], - std::multiplies<size_t>()); - } - index += p.back(); - return index; - } - - inline std::vector<size_t> one2multi(const std::vector<size_t> &v, size_t p) - { - std::vector<size_t> index(v.size()); - for (int i = 1; i < v.size(); ++i) - { - size_t s = std::accumulate(v.begin() + i, v.end(), 1, - std::multiplies<size_t>()); - index[i - 1] = p / s; - p -= index[i - 1] * s; - } - index.back() = p; - return index; - } -}; - -class CacheMan -{ - -public: - using json = nlohmann::json; - int put(const void *a_data, json &a_jmsg); - void *get(std::string doid, std::string var); - void pop(); - void push(); - std::vector<std::string> get_do_list(); - std::vector<std::string> get_var_list(std::string doid); - size_t get_timesteps_cached(); - json get_jmsg(std::string doid, std::string var); - void clean(std::string a_mode); - -private: - typedef std::map<std::string, CacheItem> CacheVarMap; - typedef std::map<std::string, CacheVarMap> CacheDoMap; - CacheDoMap m_cache; - size_t m_timesteps_cached = 0; - size_t m_timestep_first = 0; -}; - -#endif diff --git a/source/dataman/CompressMan.h b/source/dataman/CompressMan.h deleted file mode 100644 index 5c65cb191cf5eac59b37430d395fdacdf946ea7a..0000000000000000000000000000000000000000 --- a/source/dataman/CompressMan.h +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * CompressMan.h - * - * Created on: Apr 20, 2017 - * Author: Jason Wang - */ - -#ifndef DATAMAN_COMPRESSMAN_H_ -#define DATAMAN_COMPRESSMAN_H_ - -#include "DataMan.h" - -class CompressMan : public DataManBase -{ -public: - CompressMan() = default; - virtual ~CompressMan() = default; - virtual std::string type() { return "Compress"; } -}; - -#endif diff --git a/source/dataman/DataMan.cpp b/source/dataman/DataMan.cpp deleted file mode 100644 index 2c9ab9d27ac0b0330a3e9b48476ec963670f6526..0000000000000000000000000000000000000000 --- a/source/dataman/DataMan.cpp +++ /dev/null @@ -1,139 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * DataMan.cpp - * - * Created on: Apr 12, 2017 - * Author: Jason Wang - */ - -#include "DataMan.h" - -int DataMan::init(json a_jmsg) { return 0; } - -int DataMan::put_streams(const void *a_data, json &a_jmsg) -{ - a_jmsg["channel_id"] = m_stream_index; - m_stream_mans[m_stream_index]->put(a_data, a_jmsg); - ++m_stream_index; - if (m_stream_index >= m_stream_mans.size()) - { - m_stream_index = 0; - } - return 0; -} - -int DataMan::put(const void *a_data, json &a_jmsg) -{ - a_jmsg["timestep"] = m_timestep; - if (m_cache_size > 0) - { - check_shape(a_jmsg); - m_cache.put(a_data, a_jmsg); - } - else - { - put_begin(a_data, a_jmsg); - put_streams(a_data, a_jmsg); - put_end(a_data, a_jmsg); - } - dump_profiling(); - return 0; -} - -void DataMan::add_file(std::string p_method) {} - -void DataMan::add_stream(json a_jmsg) -{ - - std::string method; - - if (a_jmsg["method"].is_string()) - { - method = a_jmsg["method"]; - } - - logging("Streaming method " + method + " added"); - - if (a_jmsg["cachesize"].is_number()) - { - m_cache_size = a_jmsg["cachesize"].get<size_t>(); - } - - int num_channels = 1; - - if (a_jmsg["num_channels"].is_number()) - { - num_channels = a_jmsg["num_channels"].get<int>(); - } - else - { - a_jmsg["num_channels"] = num_channels; - } - - int local_port = 12306, remote_port = 12307; - - if (a_jmsg["local_port"].is_number()) - { - local_port = a_jmsg["local_port"].get<int>(); - } - - if (a_jmsg["remote_port"].is_number()) - { - local_port = a_jmsg["remote_port"].get<int>(); - } - - for (int i = 0; i < num_channels; i++) - { - a_jmsg["channel_id"] = i; - a_jmsg["local_port"] = local_port + 2; - a_jmsg["remote_port"] = remote_port + 2; - auto man = get_man(method); - if (man) - { - std::cout << a_jmsg.dump(4); - man->init(a_jmsg); - m_stream_mans.push_back(man); - } - if (a_jmsg["compression_method"].is_string()) - { - if (a_jmsg["compression_method"] != "null") - { - } - } - } -} - -void DataMan::flush() -{ - m_timestep++; - if (m_cache_size > 0) - { - if (m_cache_size == m_cache.get_timesteps_cached()) - { - for (int i = 0; i < m_cache_size; ++i) - { - std::vector<std::string> do_list = m_cache.get_do_list(); - for (const auto &j : do_list) - { - std::vector<std::string> var_list = m_cache.get_var_list(j); - for (const auto &k : var_list) - { - json jmsg = m_cache.get_jmsg(j, k); - put_begin(m_cache.get(j, k), jmsg); - put_streams(m_cache.get(j, k), jmsg); - put_end(m_cache.get(j, k), jmsg); - } - } - m_cache.pop(); - } - } - else - { - m_cache.push(); - } - } -} - -int DataMan::get(void *a_data, json &a_jmsg) { return 0; } diff --git a/source/dataman/DataMan.h b/source/dataman/DataMan.h deleted file mode 100644 index f37b86d3b08165683889cf29e0a53c2fb775cd26..0000000000000000000000000000000000000000 --- a/source/dataman/DataMan.h +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * DataMan.h - * - * Created on: Apr 12, 2017 - * Author: Jason Wang - */ - -#ifndef DATAMAN_DATAMAN_H_ -#define DATAMAN_DATAMAN_H_ - -#include "CacheMan.h" -#include "DataManBase.h" - -class DataMan : public DataManBase -{ -public: - DataMan() = default; - virtual ~DataMan() = default; - virtual int init(json a_jmsg); - virtual int put(const void *a_data, json &a_jmsg); - virtual int get(void *a_data, json &a_jmsg); - int put_streams(const void *a_data, json &a_jmsg); - void flush(); - void add_stream(json a_jmsg); - void add_file(std::string p_method); - std::string name() { return "DataManager"; } - std::string type() { return "Manager"; } - virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} - -private: - CacheMan m_cache; - size_t m_cache_size = 0; - size_t m_timestep = 0; - int m_stream_index = 0; -}; - -#endif // DATAMAN_DATAMAN_H_ diff --git a/source/dataman/DataManBase.cpp b/source/dataman/DataManBase.cpp deleted file mode 100644 index 5d498e31cd1758609dc201bf11d0002853332297..0000000000000000000000000000000000000000 --- a/source/dataman/DataManBase.cpp +++ /dev/null @@ -1,433 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * DataManBase.cpp - * - * Created on: Apr 12, 2017 - * Author: Jason Wang - */ - -#include "DataManBase.h" - -#include <sstream> - -#include <adios2sys/DynamicLoader.hxx> - -struct DataManBase::ManagerLibrary -{ - std::string m_LibraryName; - adios2sys::DynamicLoader::LibraryHandle m_LibraryHandle; - DataManBase *(*m_getManFunc)(); - - ManagerLibrary(std::string method) - { - std::vector<std::string> searchedLibs; - std::string libName; - - std::vector<std::string> libPrefixes; - libPrefixes.emplace_back(""); - libPrefixes.emplace_back("lib"); -#ifdef __CYGWIN__ - libPrefixes.emplace_back("cyg"); -#endif - - std::vector<std::string> libSuffixes; -#ifdef __APPLE__ - libSuffixes.emplace_back("man.dylib"); - libSuffixes.emplace_back("man.so"); -#endif -#ifdef __hpux - libSuffixes.emplace_back("man.sl"); -#endif -#ifdef __unix__ - libSuffixes.emplace_back("man.so"); -#endif -#ifdef _WIN32 - libSuffixes.emplace_back("man.dll"); -#endif - - // Test the various combinations of library names - for (const std::string &prefix : libPrefixes) - { - for (const std::string &suffix : libSuffixes) - { - libName = prefix + method + suffix; - m_LibraryHandle = - adios2sys::DynamicLoader::OpenLibrary(libName); - searchedLibs.push_back(libName); - if (m_LibraryHandle) - { - break; - } - } - if (m_LibraryHandle) - { - break; - } - } - if (!m_LibraryHandle) - { - std::stringstream errString; - errString << "Unable to locate the " << method << " manager " - << "library; searched for "; - std::copy(searchedLibs.begin(), searchedLibs.end(), - std::ostream_iterator<std::string>(errString, " ")); - - throw std::runtime_error(errString.str()); - } - - // Bind to the getMan symbol - adios2sys::DynamicLoader::SymbolPointer symbolHandle = - adios2sys::DynamicLoader::GetSymbolAddress(m_LibraryHandle, - "getMan"); - if (!symbolHandle) - { - throw std::runtime_error("Unable to locate the getMan symbol in " + - libName); - } - m_getManFunc = reinterpret_cast<DataManBase *(*)()>(symbolHandle); - m_LibraryName = libName; - } - - ~ManagerLibrary() - { - if (m_LibraryHandle) - { - adios2sys::DynamicLoader::CloseLibrary(m_LibraryHandle); - } - } -}; - -DataManBase::DataManBase() -{ - m_profiling["total_manager_time"] = 0.0f; - m_profiling["total_mb"] = 0.0f; - m_start_time = std::chrono::system_clock::now(); -} - -int DataManBase::put_begin(const void *p_data, json &p_jmsg) -{ - check_shape(p_jmsg); - if (p_jmsg["compressed_size"].is_number()) - { - p_jmsg["sendbytes"] = p_jmsg["compressed_size"].get<size_t>(); - } - else - { - p_jmsg["sendbytes"] = p_jmsg["putbytes"].get<size_t>(); - } - p_jmsg["profiling"] = m_profiling; - m_step_time = std::chrono::system_clock::now(); - return 0; -} - -int DataManBase::put_end(const void *p_data, json &p_jmsg) -{ - auto end = std::chrono::system_clock::now(); - std::chrono::duration<double> duration = end - m_step_time; - m_profiling["total_manager_time"] = - m_profiling["total_manager_time"].get<double>() + duration.count(); - m_profiling["total_mb"] = - m_profiling["total_mb"].get<double>() + - product(p_jmsg["varshape"], dsize(p_jmsg["dtype"])) / 1000000.0f; - duration = end - m_start_time; - m_profiling["total_workflow_time"] = duration.count(); - m_profiling["workflow_mbs"] = - m_profiling["total_mb"].get<double>() / - m_profiling["total_workflow_time"].get<double>(); - m_profiling["manager_mbs"] = - m_profiling["total_mb"].get<double>() / - m_profiling["total_manager_time"].get<double>(); - return 0; -} - -void DataManBase::reg_callback( - std::function<void(const void *, std::string, std::string, std::string, - std::vector<size_t>)> - cb) -{ - if (m_stream_mans.empty()) - { - m_callback = cb; - } - else - { - for (const auto &i : m_stream_mans) - { - i->reg_callback(cb); - } - } -} - -void DataManBase::dump(const void *p_data, json p_jmsg, std::ostream &out) -{ - std::vector<size_t> p_varshape = - p_jmsg["varshape"].get<std::vector<size_t>>(); - std::string dtype = p_jmsg["dtype"]; - size_t length = p_jmsg["dumplength"].get<size_t>(); - size_t s = 0; - for (size_t i = 0; i < product(p_varshape, 1); ++i) - { - s++; - out << (static_cast<const float *>(p_data))[i] << " "; - if (s == length) - { - out << std::endl; - s = 0; - } - } - out << std::endl; -} - -bool DataManBase::auto_transform(std::vector<char> &a_data, json &a_jmsg) -{ - if (a_jmsg["compression_method"].is_string() && - a_jmsg["compression_method"].get<std::string>() != "null") - { - auto method = a_jmsg["compression_method"].get<std::string>(); - auto man = get_man(method); - if (!man) - { - logging("Library file for compression method " + method + - " not found!"); - return false; - } - man->transform(a_data, a_jmsg); - a_jmsg.erase("compression_method"); - a_jmsg.erase("compression_rate"); - a_jmsg.erase("compressed_size"); - return true; - } - else - { - return false; - } -} - -std::shared_ptr<DataManBase> DataManBase::get_man(std::string method) -{ - try - { - // Reuse already loaded libraries if possible - auto libIt = m_LoadedManagers.find(method); - if (libIt == m_LoadedManagers.end()) - { - // This insertion will only fail if an entry for method already - // exists, which this if block ensures that it doesn't. - libIt = - m_LoadedManagers.insert({method, new ManagerLibrary(method)}) - .first; - logging("Loaded " + libIt->second->m_LibraryName); - } - else - { - logging("Using existing " + libIt->second->m_LibraryName + "."); - } - return std::shared_ptr<DataManBase>(libIt->second->m_getManFunc()); - } - catch (const std::runtime_error &ex) - { - logging(ex.what()); - return nullptr; - } -} - -void DataManBase::logging(std::string p_msg, std::string p_man, - std::ostream &out) -{ - if (p_man == "") - { - p_man = name(); - } - out << "["; - out << p_man; - out << "]"; - out << " "; - out << p_msg; - out << std::endl; -} - -bool DataManBase::check_json(json p_jmsg, std::vector<std::string> p_strings, - std::string p_man) -{ - if (p_man == "") - { - p_man = name(); - } - for (const auto &i : p_strings) - { - if (p_jmsg[i] == nullptr) - { - if (p_man != "") - { - logging("JSON key " + i + " not found!", p_man); - } - return false; - } - } - return true; -} - -size_t DataManBase::product(size_t *shape) -{ - size_t s = 1; - if (shape) - { - for (size_t i = 1; i <= shape[0]; ++i) - { - s *= shape[i]; - } - } - return s; -} - -size_t DataManBase::product(std::vector<size_t> shape, size_t size) -{ - return accumulate(shape.begin(), shape.end(), size, - std::multiplies<size_t>()); -} - -size_t DataManBase::dsize(std::string dtype) -{ - if (dtype == "char") - { - return sizeof(char); - } - if (dtype == "short") - { - return sizeof(short); - } - if (dtype == "int") - { - return sizeof(int); - } - if (dtype == "long") - { - return sizeof(long); - } - if (dtype == "unsigned char") - { - return sizeof(unsigned char); - } - if (dtype == "unsigned short") - { - return sizeof(unsigned short); - } - if (dtype == "unsigned int") - { - return sizeof(unsigned int); - } - if (dtype == "unsigned long") - { - return sizeof(unsigned long); - } - if (dtype == "float") - { - return sizeof(float); - } - if (dtype == "double") - { - return sizeof(double); - } - if (dtype == "long double") - { - return sizeof(long double); - } - if (dtype == "std::complex<float>" or dtype == "complex<float>") - { - return sizeof(std::complex<float>); - } - if (dtype == "std::complex<double>") - { - return sizeof(std::complex<double>); - } - - if (dtype == "int8_t") - { - return sizeof(int8_t); - } - if (dtype == "uint8_t") - { - return sizeof(uint8_t); - } - if (dtype == "int16_t") - { - return sizeof(int16_t); - } - if (dtype == "uint16_t") - { - return sizeof(uint16_t); - } - if (dtype == "int32_t") - { - return sizeof(int32_t); - } - if (dtype == "uint32_t") - { - return sizeof(uint32_t); - } - if (dtype == "int64_t") - { - return sizeof(int64_t); - } - if (dtype == "uint64_t") - { - return sizeof(uint64_t); - } - return 0; -} - -int DataManBase::closest(int v, json j, bool up) -{ - int s = 100, k = 0, t; - for (unsigned int i = 0; i < j.size(); ++i) - { - if (up) - { - t = j[i].get<int>() - v; - } - else - { - t = v - j[i].get<int>(); - } - if (t >= 0 && t < s) - { - s = t; - k = i; - } - } - return k; -} - -void DataManBase::check_shape(json &p_jmsg) -{ - std::vector<size_t> varshape; - if (check_json(p_jmsg, {"varshape"})) - { - varshape = p_jmsg["varshape"].get<std::vector<size_t>>(); - } - else - { - return; - } - if (not p_jmsg["putshape"].is_array()) - { - p_jmsg["putshape"] = varshape; - } - if (not p_jmsg["offset"].is_array()) - { - p_jmsg["offset"] = std::vector<size_t>(varshape.size(), 0); - } - p_jmsg["dsize"] = dsize(p_jmsg["dtype"].get<std::string>()); - - p_jmsg["putsize"] = product(p_jmsg["putshape"].get<std::vector<size_t>>()); - p_jmsg["varsize"] = product(varshape); - - p_jmsg["putbytes"] = product(p_jmsg["putshape"].get<std::vector<size_t>>(), - dsize(p_jmsg["dtype"].get<std::string>())); - p_jmsg["varbytes"] = - product(varshape, dsize(p_jmsg["dtype"].get<std::string>())); -} - -void DataManBase::dump_profiling() { logging(m_profiling.dump(4)); } diff --git a/source/dataman/DataManBase.h b/source/dataman/DataManBase.h deleted file mode 100644 index 37780ae7afab5abfcaebff592da2c9c355797ec9..0000000000000000000000000000000000000000 --- a/source/dataman/DataManBase.h +++ /dev/null @@ -1,84 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * DataManBase.h - * - * Created on: Apr 12, 2017 - * Author: Jason Wang - */ - -#ifndef DATAMAN_DATAMANBASE_H_ -#define DATAMAN_DATAMANBASE_H_ - -#include <cstdint> - -#include <chrono> -#include <complex> -#include <functional> -#include <iostream> -#include <memory> -#include <string> -#include <unordered_map> -#include <vector> - -#include <json.hpp> - -class DataManBase -{ -public: - using json = nlohmann::json; - DataManBase(); - - virtual int put_begin(const void *a_data, json &a_jmsg); - virtual int put_end(const void *a_data, json &a_jmsg); - - virtual int put(const void *a_data, json &a_jmsg) = 0; - virtual int get(void *a_data, json &a_jmsg) = 0; - virtual int init(json a_jmsg) = 0; - virtual void flush() = 0; - virtual std::string name() = 0; - virtual std::string type() = 0; - void reg_callback(std::function<void(const void *, std::string, std::string, - std::string, std::vector<size_t>)> - cb); - void dump(const void *a_data, json a_jmsg, std::ostream &out = std::cout); - virtual void transform(std::vector<char> &a_data, json &a_jmsg) = 0; - void dump_profiling(); - -protected: - bool auto_transform(std::vector<char> &a_data, json &a_jmsg); - - std::shared_ptr<DataManBase> get_man(std::string method); - - void logging(std::string p_msg, std::string p_man = "", - std::ostream &out = std::cout); - - bool check_json(json a_jmsg, std::vector<std::string> p_strings, - std::string p_man = ""); - - size_t product(size_t *shape); - - size_t product(std::vector<size_t> shape, size_t size = 1); - - size_t dsize(std::string dtype); - - int closest(int v, json j, bool up); - - void check_shape(json &a_jmsg); - - std::function<void(const void *, std::string, std::string, std::string, - std::vector<size_t>)> - m_callback; - std::vector<std::shared_ptr<DataManBase>> m_stream_mans; - -private: - struct ManagerLibrary; - std::unordered_map<std::string, ManagerLibrary *> m_LoadedManagers; - - std::chrono::time_point<std::chrono::system_clock> m_start_time; - std::chrono::time_point<std::chrono::system_clock> m_step_time; - json m_profiling; -}; - -#endif /* DATAMANBASE_H_ */ diff --git a/source/dataman/DumpMan.cpp b/source/dataman/DumpMan.cpp deleted file mode 100644 index b3ab55f98c7595a280aeefbb7e0a93c9dd10886b..0000000000000000000000000000000000000000 --- a/source/dataman/DumpMan.cpp +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * DumpMan.cpp - * - * Created on: Apr 20, 2017 - * Author: Jason Wang - */ - -#include "DumpMan.h" - -int DumpMan::init(json a_jmsg) -{ - if (a_jmsg["dumping"].is_boolean()) - { - m_dumping = a_jmsg["dumping"].get<bool>(); - } - return 0; -} -int DumpMan::get(void *a_data, json &a_jmsg) { return 0; } - -int DumpMan::put(const void *a_data, json &a_jmsg) -{ - put_begin(a_data, a_jmsg); - - if (!m_dumping) - { - return 1; - } - if (!check_json(a_jmsg, {"doid", "var", "dtype", "putshape"})) - { - return -1; - } - - std::string doid = a_jmsg["doid"]; - std::string var = a_jmsg["var"]; - std::string dtype = a_jmsg["dtype"]; - std::vector<size_t> putshape = - a_jmsg["putshape"].get<std::vector<size_t>>(); - std::vector<size_t> varshape = - a_jmsg["varshape"].get<std::vector<size_t>>(); - std::vector<size_t> offset = a_jmsg["offset"].get<std::vector<size_t>>(); - int numbers_to_print = 100; - if (numbers_to_print > product(putshape, 1)) - { - numbers_to_print = product(putshape, 1); - } - size_t putbytes = product(putshape, dsize(dtype)); - size_t sendbytes = a_jmsg["sendbytes"].get<size_t>(); - - std::cout << a_jmsg.dump(4) << std::endl; - std::cout << "total MBs = " << product(putshape, dsize(dtype)) / 1000000 - << std::endl; - - std::vector<char> data(static_cast<const char *>(a_data), - static_cast<const char *>(a_data) + sendbytes); - - auto_transform(data, a_jmsg); - - void *data_to_print = data.data(); - for (size_t i = 0; i < numbers_to_print; ++i) - { - if (dtype == "float") - { - std::cout << static_cast<float *>(data_to_print)[i] << " "; - } - if (dtype == "double") - { - std::cout << static_cast<double *>(data_to_print)[i] << " "; - } - } - - std::cout << std::endl; - put_end(a_data, a_jmsg); - return 0; -} - -void DumpMan::flush() {} diff --git a/source/dataman/DumpMan.h b/source/dataman/DumpMan.h deleted file mode 100644 index 7dd27c21213549a22cf783ff3a11f26c44c5aa13..0000000000000000000000000000000000000000 --- a/source/dataman/DumpMan.h +++ /dev/null @@ -1,36 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * DumpMan.h - * - * Created on: Apr 20, 2017 - * Author: Jason Wang - */ - -#ifndef DATAMAN_DUMPMAN_H_ -#define DATAMAN_DUMPMAN_H_ - -#include "DataMan.h" - -class DumpMan : public DataManBase -{ -public: - DumpMan() = default; - virtual ~DumpMan() = default; - - virtual int init(json a_jmsg); - virtual int put(const void *a_data, json &a_jmsg); - virtual int get(void *a_data, json &a_jmsg); - void flush(); - std::string name() { return "DumpMan"; } - std::string type() { return "Dump"; } - virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} - -private: - bool m_dumping = true; -}; - -extern "C" DataManBase *getMan() { return new DumpMan; } - -#endif diff --git a/source/dataman/MdtmMan.cpp b/source/dataman/MdtmMan.cpp deleted file mode 100644 index 6bfa3af8c5d860f8db16cfe8f9b380aca392bc42..0000000000000000000000000000000000000000 --- a/source/dataman/MdtmMan.cpp +++ /dev/null @@ -1,190 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * MdtmMan.cpp - * - * Created on: Apr 20, 2017 - * Author: Jason Wang - */ - -#include "MdtmMan.h" - -#include <fcntl.h> -#include <sys/stat.h> -#include <unistd.h> - -#include <zmq.h> - -int MdtmMan::init(json a_jmsg) -{ - - std::cout << " 1 MdtmMan::init " << m_channel_id << std::endl; - StreamMan::init(a_jmsg); - - if (a_jmsg["pipe_prefix"].is_string()) - { - m_pipepath = a_jmsg["pipe_prefix"].get<std::string>(); - } - - json pipe_desc; - pipe_desc["operation"] = "init"; - pipe_desc["pipe_prefix"] = m_pipepath; - pipe_desc["mode"] = m_stream_mode; - - std::stringstream pname; - pname << m_pipename_prefix << m_channel_id; - m_pipename = pname.str(); - m_full_pipename = m_pipepath + m_pipename; - - // send JSON message to MDTM - if (m_channel_id == 0) - { - for (int i = 0; i < m_num_channels; ++i) - { - std::stringstream pipename; - pipename << m_pipename_prefix << i; - if (i == 0) - { - pipe_desc["pipe_names"] = {pipename.str()}; - } - else - { - pipe_desc["pipe_names"].insert(pipe_desc["pipe_names"].end(), - pipename.str()); - } - } - void *zmq_ipc_req = nullptr; - zmq_ipc_req = zmq_socket(m_zmq_context, ZMQ_REQ); - zmq_connect(zmq_ipc_req, "ipc:///tmp/ADIOS_MDTM_pipe"); - char buffer_return[10]; - zmq_send(zmq_ipc_req, pipe_desc.dump().c_str(), - pipe_desc.dump().length(), 0); - zmq_recv(zmq_ipc_req, buffer_return, sizeof(buffer_return), 0); - if (zmq_ipc_req) - { - zmq_close(zmq_ipc_req); - } - } - - // Make pipes - mkdir(m_pipepath.c_str(), 0755); - - std::cout << "making " << m_full_pipename << std::endl; - mkfifo(m_full_pipename.c_str(), 0666); - std::cout << "made " << m_full_pipename << std::endl; - - if (m_stream_mode == "sender") - { - std::cout << "opening " << m_full_pipename << std::endl; - m_pipe_handler = open(m_full_pipename.c_str(), O_WRONLY); - std::cout << "opened " << m_full_pipename << std::endl; - } - if (m_stream_mode == "receiver") - { - m_pipe_handler = open(m_full_pipename.c_str(), O_RDONLY | O_NONBLOCK); - } - - return 0; -} - -int MdtmMan::put(const void *a_data, json &a_jmsg) -{ - a_jmsg["pipe"] = m_pipename; - put_begin(a_data, a_jmsg); - StreamMan::put_stream(a_data, a_jmsg); - put_end(a_data, a_jmsg); - return 0; -} - -int MdtmMan::get(void *a_data, json &a_jmsg) { return 0; } - -void MdtmMan::on_put(std::shared_ptr<std::vector<char>> a_data) -{ - write(m_pipe_handler, a_data->data(), a_data->size()); -} - -void MdtmMan::on_recv(json &a_jmsg) -{ - - // push new request - jqueue.push(a_jmsg); - vqueue.push(std::vector<char>()); - iqueue.push(0); - - // for flush - if (jqueue.front()["operation"] == "flush") - { - callback_cache(); - jqueue.pop(); - vqueue.pop(); - iqueue.pop(); - } - - if (jqueue.empty()) - { - return; - } - - // for put - for (int outloop = 0; outloop < jqueue.size() * 2; outloop++) - { - if (jqueue.front()["operation"] == "put") - { - json &jmsg = jqueue.front(); - - // allocate buffer - size_t sendbytes = jmsg["sendbytes"].get<size_t>(); - vqueue.front() = std::vector<char>(sendbytes); - - // read the head request - int error_times = 0; - while (iqueue.front() < sendbytes) - { - int ret = - read(m_pipe_handler, vqueue.front().data() + iqueue.front(), - sendbytes - iqueue.front()); - if (ret > 0) - { - iqueue.front() += ret; - } - else - { - error_times++; - continue; - } - if (error_times > 1000000) - { - break; - } - } - - if (iqueue.front() == sendbytes) - { - if (a_jmsg["compression_method"].is_string()) - { - if (a_jmsg["compression_method"].get<std::string>() != - "null") - { - auto_transform(vqueue.front(), a_jmsg); - } - } - - if (a_jmsg["varshape"] == a_jmsg["putshape"]) - { - std::cout << "callback_direct \n"; - callback_direct(vqueue.front().data(), jmsg); - } - else - { - m_cache.put(vqueue.front().data(), jmsg); - } - - jqueue.pop(); - vqueue.pop(); - iqueue.pop(); - break; - } - } - } -} diff --git a/source/dataman/MdtmMan.h b/source/dataman/MdtmMan.h deleted file mode 100644 index 59f96e55d2d721a76fc99354f3c2a6b78a276f83..0000000000000000000000000000000000000000 --- a/source/dataman/MdtmMan.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * MdtmMan.h - * - * Created on: Apr 20, 2017 - * Author: Jason Wang - */ - -#ifndef DATAMAN_MDTMMAN_H_ -#define DATAMAN_MDTMMAN_H_ - -#include "StreamMan.h" - -#include <queue> - -class MdtmMan : public StreamMan -{ -public: - MdtmMan() = default; - virtual ~MdtmMan() = default; - - virtual int init(json a_jmsg); - virtual int put(const void *a_data, json &a_jmsg); - virtual int get(void *a_data, json &a_jmsg); - virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} - - virtual void on_recv(json &a_msg); - virtual void on_put(std::shared_ptr<std::vector<char>> a_data); - std::string name() { return "MdtmMan"; } - -private: - int zmq_msg_size = 1024; - std::string m_pipepath = "/tmp/MdtmManPipes/"; - std::string m_pipename_prefix = "MdtmManPipe"; - std::string m_pipename; - std::string m_full_pipename; - int m_pipe_handler; - std::string getmode = "callback"; - std::queue<json> jqueue; - std::queue<std::vector<char>> vqueue; - std::queue<int> iqueue; - -}; // end of class MdtmMan - -extern "C" DataManBase *getMan() { return new MdtmMan; } - -#endif diff --git a/source/dataman/StreamMan.cpp b/source/dataman/StreamMan.cpp deleted file mode 100644 index a87de51fc809e8340a503bafc4fd845bcae1c2c2..0000000000000000000000000000000000000000 --- a/source/dataman/StreamMan.cpp +++ /dev/null @@ -1,205 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * StreamMan.cpp - * - * Created on: Apr 20, 2017 - * Author: Jason Wang - */ - -#include "StreamMan.h" - -#include <unistd.h> - -#include <iostream> -#include <sstream> - -#include "zmq.h" - -StreamMan::~StreamMan() -{ - if (m_zmq_rep) - { - zmq_close(m_zmq_rep); - } - if (m_zmq_req) - { - zmq_close(m_zmq_req); - } - if (m_zmq_context) - { - zmq_ctx_destroy(m_zmq_context); - } - - m_zmq_rep_thread_active = false; - if (m_zmq_rep_thread.joinable()) - { - m_zmq_rep_thread.join(); - } - - m_zmq_req_thread_active = false; - if (m_zmq_req_thread.joinable()) - { - m_zmq_req_thread.join(); - } -} - -int StreamMan::init(json p_jmsg) -{ - if (check_json(p_jmsg, {"stream_mode", "remote_ip", "local_ip", - "remote_port", "local_port"}, - "StreamMan")) - { - m_stream_mode = p_jmsg["stream_mode"]; - m_local_ip = p_jmsg["local_ip"]; - m_remote_ip = p_jmsg["remote_ip"]; - m_local_port = p_jmsg["local_port"]; - m_remote_port = p_jmsg["remote_port"]; - std::string remote_address = - make_address(m_remote_ip, m_remote_port, "tcp"); - std::string local_address = - make_address(m_local_ip, m_local_port, "tcp"); - - if (p_jmsg["clean_mode"].is_string()) - { - m_clean_mode = p_jmsg["clean_mode"]; - } - if (p_jmsg["tolerance"].is_number()) - { - m_tolerance = p_jmsg["tolerance"].get<int>(); - } - if (p_jmsg["priority"].is_number()) - { - m_priority = p_jmsg["priority"].get<int>(); - } - if (p_jmsg["num_channels"].is_number()) - { - m_num_channels = p_jmsg["num_channels"].get<int>(); - } - if (p_jmsg["channel_id"].is_number()) - { - m_channel_id = p_jmsg["channel_id"].get<int>(); - } - - if (!m_zmq_context) - { - m_zmq_context = zmq_ctx_new(); - if (m_stream_mode == "sender") - { - m_zmq_req = zmq_socket(m_zmq_context, ZMQ_REQ); - zmq_connect(m_zmq_req, remote_address.c_str()); - logging("Connecting " + remote_address + " ..."); - } - else if (m_stream_mode == "receiver") - { - m_zmq_rep = zmq_socket(m_zmq_context, ZMQ_REP); - zmq_bind(m_zmq_rep, local_address.c_str()); - logging("Binding " + local_address + " ..."); - m_zmq_rep_thread_active = true; - m_zmq_rep_thread = - std::thread(&StreamMan::zmq_rep_thread_func, this); - } - } - return 0; - } - else - { - return -1; - } -} - -int StreamMan::callback_direct(const void *a_data, json &a_jmsg) -{ - if (!m_callback) - { - logging("callback called but callback function not registered!"); - return -1; - } - - m_callback(a_data, a_jmsg["doid"], a_jmsg["var"], - a_jmsg["dtype"].get<std::string>(), - a_jmsg["varshape"].get<std::vector<size_t>>()); - return 0; -} - -int StreamMan::callback_cache() -{ - if (!m_callback) - { - logging("callback called but callback function not registered!"); - return -1; - } - - std::vector<std::string> do_list = m_cache.get_do_list(); - for (const std::string &i : do_list) - { - std::vector<std::string> var_list = m_cache.get_var_list(i); - for (const std::string &j : var_list) - { - m_callback( - m_cache.get(i, j), i, j, - m_cache.get_jmsg(i, j)["dtype"].get<std::string>(), - m_cache.get_jmsg(i, j)["varshape"].get<std::vector<size_t>>()); - } - } - m_cache.clean("nan"); - - return 0; -} - -void StreamMan::flush() -{ - json msg; - msg["operation"] = "flush"; - char ret[10]; - zmq_send(m_zmq_req, msg.dump().c_str(), msg.dump().length(), 0); - zmq_recv(m_zmq_req, ret, 10, 0); -} - -void StreamMan::zmq_req_thread_func(std::shared_ptr<std::vector<char>> a_data) -{ - on_put(a_data); -} - -void StreamMan::zmq_rep_thread_func() -{ - while (m_zmq_rep_thread_active) - { - char msg[1024] = ""; - int ret = zmq_recv(m_zmq_rep, msg, 1024, ZMQ_NOBLOCK); - zmq_send(m_zmq_rep, "OK", 4, 0); - std::string smsg = msg; - if (ret >= 0) - { - json jmsg = json::parse(msg); - logging("StreamMan::zmq_meta_rep_thread_func: \n" + jmsg.dump(4)); - on_recv(jmsg); - } - else - { - usleep(1); - } - } -} - -int StreamMan::put_stream(const void *a_data, json a_jmsg) -{ - if (m_zmq_req_thread.joinable()) - { - m_zmq_req_thread.join(); - } - a_jmsg["operation"] = "put"; - char ret[10]; - zmq_send(m_zmq_req, a_jmsg.dump().c_str(), a_jmsg.dump().length(), 0); - zmq_recv(m_zmq_req, ret, 10, 0); - - // copy - std::shared_ptr<std::vector<char>> data = - std::make_shared<std::vector<char>>(); - data->resize(a_jmsg["sendbytes"].get<size_t>()); - std::memcpy(data->data(), a_data, a_jmsg["sendbytes"].get<size_t>()); - - m_zmq_req_thread = std::thread(&StreamMan::zmq_req_thread_func, this, data); - return 0; -} diff --git a/source/dataman/StreamMan.h b/source/dataman/StreamMan.h deleted file mode 100644 index d0cffdee4b2d7a65cb6023d92f953e8b40c4ec74..0000000000000000000000000000000000000000 --- a/source/dataman/StreamMan.h +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * StreamMan.h - * - * Created on: Apr 20, 2017 - * Author: Jason Wang - */ - -#ifndef DATAMAN_STREAMMAN_H_ -#define DATAMAN_STREAMMAN_H_ - -#include "CacheMan.h" -#include "DataManBase.h" - -#include <thread> - -class StreamMan : public DataManBase -{ -public: - StreamMan() = default; - virtual ~StreamMan(); - - virtual int init(json a_jmsg); - virtual void on_recv(json &a_msg) = 0; - virtual void on_put(std::shared_ptr<std::vector<char>> a_data) = 0; - void flush(); - virtual std::string type() { return "Stream"; } - -protected: - int callback_direct(const void *a_data, json &a_jmsg); - int callback_cache(); - int put_stream(const void *a_data, json a_jmsg); - - void *m_zmq_context = nullptr; - CacheMan m_cache; - std::string m_get_mode = "callback"; - std::string m_stream_mode; - std::string m_local_ip; - std::string m_remote_ip; - int m_local_port; - int m_remote_port; - int m_tolerance = 0; - int m_priority = 100; - int m_channel_id = 0; - int m_num_channels = 1; - std::string m_clean_mode = "nan"; - - inline std::string make_address(std::string ip, int port, - std::string protocol) - { - std::stringstream address; - address << protocol << "://" << ip << ":" << port; - return address.str(); - } - -private: - void zmq_rep_thread_func(); - std::thread m_zmq_rep_thread; - void *m_zmq_rep = nullptr; - bool m_zmq_rep_thread_active = false; - - void zmq_req_thread_func(std::shared_ptr<std::vector<char>> a_data); - std::thread m_zmq_req_thread; - void *m_zmq_req = nullptr; - bool m_zmq_req_thread_active = false; -}; - -#endif diff --git a/source/dataman/TemporalMan.cpp b/source/dataman/TemporalMan.cpp deleted file mode 100644 index e60081bf0357bd2c43b3f2f506bd4d9d2ae3b6bc..0000000000000000000000000000000000000000 --- a/source/dataman/TemporalMan.cpp +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * TemporalMan.cpp - * - * Created on: Apr 20, 2017 - * Author: Jason Wang - */ - -#include "TemporalMan.h" - -int TemporalMan::init(json p_jmsg) { return 0; } - -int TemporalMan::put(const void *p_data, json &p_jmsg) -{ - put_begin(p_data, p_jmsg); - put_end(p_data, p_jmsg); - return 0; -} - -int TemporalMan::get(void *p_data, json &p_jmsg) { return 0; } - -void TemporalMan::flush() {} - -void TemporalMan::transform(std::vector<char> &a_data, json &a_jmsg) {} diff --git a/source/dataman/TemporalMan.h b/source/dataman/TemporalMan.h deleted file mode 100644 index 1ced811da9df0e7b7d0e78b0e18d7c61e5da9a61..0000000000000000000000000000000000000000 --- a/source/dataman/TemporalMan.h +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * TemporalMan.h - * - * Created on: Apr 20, 2017 - * Author: Jason Wang - */ - -#ifndef DATAMAN_TEMPORALMAN_H_ -#define DATAMAN_TEMPORALMAN_H_ - -#include "CompressMan.h" - -class TemporalMan : public CompressMan -{ -public: - TemporalMan() = default; - virtual ~TemporalMan() = default; - virtual int init(json a_jmsg); - virtual int put(const void *a_data, json &a_jmsg); - virtual int get(void *a_data, json &a_jmsg); - virtual void flush(); - virtual void transform(std::vector<char> &a_data, json &a_jmsg); - std::string name() { return "TemporalMan"; } -}; - -extern "C" DataManBase *getMan() { return new TemporalMan; } - -#endif diff --git a/source/dataman/ZfpMan.cpp b/source/dataman/ZfpMan.cpp deleted file mode 100644 index 54c5ef48c799579c70c38464a3d43a75a7ed2f0a..0000000000000000000000000000000000000000 --- a/source/dataman/ZfpMan.cpp +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * ZfpMan.cpp - * - * Created on: Apr 20, 2017 - * Author: Jason Wang - */ - -#include "ZfpMan.h" - -#include <zfp.h> - -int ZfpMan::init(json a_jmsg) -{ - if (a_jmsg["compression_rate"].is_number()) - { - m_compression_rate = a_jmsg["compression_rate"].get<double>(); - } - return 0; -} - -int ZfpMan::put(const void *a_data, json &a_jmsg) -{ - put_begin(a_data, a_jmsg); - std::vector<char> compressed_data; - if (check_json(a_jmsg, {"doid", "var", "dtype", "putshape"}, "ZfpMan")) - { - compress(const_cast<void *>(a_data), compressed_data, a_jmsg); - } - put_end(compressed_data.data(), a_jmsg); - return 0; -} - -int ZfpMan::get(void *a_data, json &a_jmsg) { return 0; } - -void ZfpMan::flush() {} - -int ZfpMan::compress(void *a_input, std::vector<char> &a_output, json &a_jmsg) -{ - if (!a_jmsg["compression_rate"].is_number()) - { - a_jmsg["compression_rate"] = m_compression_rate; - } - std::string dtype = a_jmsg["dtype"]; - std::vector<size_t> shape = a_jmsg["putshape"].get<std::vector<size_t>>(); - int compression_rate = a_jmsg["compression_rate"].get<int>(); - - int status = 0; // return value: 0 = success - uint dim = 1; - zfp_type type = zfp_type_none; // array scalar type - zfp_field *field; // array meta data - zfp_stream *zfp; // compressed stream - size_t bufsize; // byte size of compressed buffer - bitstream *stream; // bit stream to write to or read from - size_t zfpsize; // byte size of compressed stream - - // allocate meta data for the 3D array a[nz][ny][nx] - if (dtype == "int") - { - type = zfp_type_int32; - } - else if (dtype == "long") - { - type = zfp_type_int64; - } - else if (dtype == "float") - { - type = zfp_type_float; - } - else if (dtype == "double") - { - type = zfp_type_double; - } - - switch (shape.size()) - { - case 3: - field = zfp_field_3d(a_input, type, shape[0], shape[1], shape[2]); - dim = 3; - break; - case 2: - field = zfp_field_2d(a_input, type, shape[0], shape[1]); - dim = 2; - break; - case 1: - field = zfp_field_1d(a_input, type, shape[0]); - break; - default: - field = zfp_field_1d(a_input, type, product(shape)); - } - - // allocate meta data for a compressed stream - zfp = zfp_stream_open(NULL); - - // set compression mode and parameters via one of three functions - zfp_stream_set_rate(zfp, compression_rate, type, dim, 0); - // zfp_stream_set_precision(zfp, m_precision, type); - // zfp_stream_set_accuracy(zfp, m_accuracy, type); - - // allocate buffer for compressed data - bufsize = zfp_stream_maximum_size(zfp, field); - a_output.resize(bufsize); - - // associate bit stream with allocated buffer - stream = stream_open(a_output.data(), bufsize); - zfp_stream_set_bit_stream(zfp, stream); - zfp_stream_rewind(zfp); - - // compress or decompress entire array - - zfpsize = zfp_compress(zfp, field); - - if (!zfpsize) - { - logging("ZFP compression failed!"); - status = 1; - } - - a_jmsg["compressed_size"] = bufsize; - a_jmsg["compression_method"] = "zfp"; - - // clean up - zfp_field_free(field); - zfp_stream_close(zfp); - stream_close(stream); - - return 0; -} - -int ZfpMan::decompress(void *a_input, std::vector<char> &a_output, json &a_jmsg) -{ - - std::string dtype = a_jmsg["dtype"]; - std::vector<size_t> shape = a_jmsg["putshape"].get<std::vector<size_t>>(); - int compression_rate = a_jmsg["compression_rate"].get<int>(); - - int status = 0; // return value: 0 = success - uint dim = 1; - zfp_type type = zfp_type_none; // array scalar type - zfp_field *field; // array meta data - zfp_stream *zfp; // compressed stream - size_t bufsize = a_jmsg["compressed_size"] - .get<size_t>(); // byte size of compressed buffer - bitstream *stream; // bit stream to write to or read from - size_t zfpsize; // byte size of compressed stream - - // allocate meta data for the 3D array a[nz][ny][nx] - if (dtype == "int") - { - type = zfp_type_int32; - } - else if (dtype == "long") - { - type = zfp_type_int64; - } - else if (dtype == "float") - { - type = zfp_type_float; - } - else if (dtype == "double") - { - type = zfp_type_double; - } - - a_output.resize(product(shape, dsize(dtype))); - - switch (shape.size()) - { - case 3: - field = - zfp_field_3d(a_output.data(), type, shape[0], shape[1], shape[2]); - dim = 3; - break; - case 2: - field = zfp_field_2d(a_output.data(), type, shape[0], shape[1]); - dim = 2; - break; - case 1: - field = zfp_field_1d(a_output.data(), type, shape[0]); - break; - default: - field = zfp_field_1d(a_output.data(), type, product(shape)); - } - - zfp = zfp_stream_open(NULL); - zfp_stream_set_rate(zfp, compression_rate, type, dim, 0); - stream = stream_open(a_input, bufsize); - zfp_stream_set_bit_stream(zfp, stream); - zfp_stream_rewind(zfp); - if (!zfp_decompress(zfp, field)) - { - fprintf(stderr, "decompression failed\n"); - status = 1; - } - zfp_field_free(field); - zfp_stream_close(zfp); - stream_close(stream); - - return 0; -} - -void ZfpMan::transform(std::vector<char> &a_data, json &a_jmsg) -{ - std::string dtype = a_jmsg["dtype"]; - std::vector<size_t> shape = a_jmsg["putshape"].get<std::vector<size_t>>(); - int compression_rate = a_jmsg["compression_rate"].get<int>(); - size_t putbytes = a_jmsg["putbytes"].get<size_t>(); - std::vector<char> output(putbytes); - - int status = 0; // return value: 0 = success - uint dim = 1; - zfp_type type = zfp_type_none; // array scalar type - zfp_field *field; // array meta data - zfp_stream *zfp; // compressed stream - size_t bufsize = a_jmsg["compressed_size"] - .get<size_t>(); // byte size of compressed buffer - bitstream *stream; // bit stream to write to or read from - size_t zfpsize; // byte size of compressed stream - - // allocate meta data for the 3D array a[nz][ny][nx] - if (dtype == "int") - { - type = zfp_type_int32; - } - else if (dtype == "long") - { - type = zfp_type_int64; - } - else if (dtype == "float") - { - type = zfp_type_float; - } - else if (dtype == "double") - { - type = zfp_type_double; - } - - switch (shape.size()) - { - case 3: - field = zfp_field_3d(output.data(), type, shape[0], shape[1], shape[2]); - dim = 3; - break; - case 2: - field = zfp_field_2d(output.data(), type, shape[0], shape[1]); - dim = 2; - break; - case 1: - field = zfp_field_1d(output.data(), type, shape[0]); - break; - default: - field = zfp_field_1d(output.data(), type, product(shape)); - } - - zfp = zfp_stream_open(NULL); - zfp_stream_set_rate(zfp, compression_rate, type, dim, 0); - stream = stream_open(a_data.data(), bufsize); - zfp_stream_set_bit_stream(zfp, stream); - zfp_stream_rewind(zfp); - if (!zfp_decompress(zfp, field)) - { - fprintf(stderr, "decompression failed\n"); - status = 1; - } - zfp_field_free(field); - zfp_stream_close(zfp); - stream_close(stream); - - a_data = output; -} diff --git a/source/dataman/ZfpMan.h b/source/dataman/ZfpMan.h deleted file mode 100644 index d2f27e65db5efeee89dc6f99bf076af2098b6a5d..0000000000000000000000000000000000000000 --- a/source/dataman/ZfpMan.h +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * ZfpMan.h - * - * Created on: Apr 20, 2017 - * Author: Jason Wang - */ - -#ifndef DATAMAN_ZFPMAN_H_ -#define DATAMAN_ZFPMAN_H_ - -#include "CompressMan.h" - -class ZfpMan : public CompressMan -{ -public: - ZfpMan() = default; - virtual ~ZfpMan() = default; - virtual int init(json a_jmsg); - virtual int put(const void *a_data, json &a_jmsg); - virtual int get(void *a_data, json &a_jmsg); - virtual void transform(std::vector<char> &a_data, json &a_jmsg); - virtual void flush(); - int compress(void *a_input, std::vector<char> &a_output, json &a_jmsg); - int decompress(void *a_input, std::vector<char> &a_output, json &a_jmsg); - std::string name() { return "ZfpMan"; } -private: - double m_compression_rate = 8; -}; - -extern "C" DataManBase *getMan() { return new ZfpMan; } - -#endif diff --git a/source/dataman/ZmqMan.cpp b/source/dataman/ZmqMan.cpp deleted file mode 100644 index 6300dfcd48ed81957d67b4ba633b6158371fb035..0000000000000000000000000000000000000000 --- a/source/dataman/ZmqMan.cpp +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * ZmqMan.cpp - * - * Created on: Apr 20, 2017 - * Author: Jason Wang - */ - -#include "ZmqMan.h" - -#include <sys/stat.h> -#include <unistd.h> - -#include "zmq.h" - -ZmqMan::~ZmqMan() -{ - if (zmq_data) - zmq_close(zmq_data); -} - -int ZmqMan::init(json a_jmsg) -{ - StreamMan::init(a_jmsg); - if (m_stream_mode == "sender") - { - zmq_data = zmq_socket(m_zmq_context, ZMQ_REQ); - std::string remote_address = - make_address(m_remote_ip, m_remote_port + 1, "tcp"); - zmq_connect(zmq_data, remote_address.c_str()); - logging("ZmqMan::init " + remote_address + " connected"); - } - else if (m_stream_mode == "receiver") - { - zmq_data = zmq_socket(m_zmq_context, ZMQ_REP); - std::string local_address = - make_address(m_local_ip, m_local_port + 1, "tcp"); - zmq_bind(zmq_data, local_address.c_str()); - logging("ZmqMan::init " + local_address + " bound"); - } - return 0; -} - -int ZmqMan::put(const void *a_data, json &a_jmsg) -{ - DataManBase::put_begin(a_data, a_jmsg); - StreamMan::put_stream(a_data, a_jmsg); - DataManBase::put_end(a_data, a_jmsg); - return 0; -} - -int ZmqMan::get(void *a_data, json &a_jmsg) { return 0; } - -void ZmqMan::on_put(std::shared_ptr<std::vector<char>> a_data) -{ - char ret[10]; - zmq_send(zmq_data, a_data->data(), a_data->size(), 0); - zmq_recv(zmq_data, ret, 10, 0); -} - -void ZmqMan::on_recv(json &a_jmsg) -{ - if (a_jmsg["operation"].get<std::string>() == "put") - { - size_t sendbytes = a_jmsg["sendbytes"].get<size_t>(); - std::vector<char> data(sendbytes); - int ret = zmq_recv(zmq_data, data.data(), sendbytes, 0); - zmq_send(zmq_data, "OK", 10, 0); - - // if data is compressed then call auto_transform to decompress - if (a_jmsg["compression_method"].is_string()) - { - if (a_jmsg["compression_method"].get<std::string>() != "null") - { - auto_transform(data, a_jmsg); - } - } - - if (a_jmsg["varshape"] == a_jmsg["putshape"]) - { - callback_direct(data.data(), a_jmsg); - } - else - { - m_cache.put(data.data(), a_jmsg); - } - } - else if (a_jmsg["operation"].get<std::string>() == "flush") - { - callback_cache(); - } -} diff --git a/source/dataman/ZmqMan.h b/source/dataman/ZmqMan.h deleted file mode 100644 index 9571f5d4927b7d7d84899c524804b2d7c7aa2dbb..0000000000000000000000000000000000000000 --- a/source/dataman/ZmqMan.h +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * ZmqMan.h - * - * Created on: Apr 20, 2017 - * Author: Jason Wang - */ - -#ifndef DATAMAN_ZMQMAN_H_ -#define DATAMAN_ZMQMAN_H_ - -#include "StreamMan.h" - -class ZmqMan : public StreamMan -{ -public: - ZmqMan() = default; - virtual ~ZmqMan(); - - virtual int init(json a_jmsg); - virtual int put(const void *a_data, json &a_jmsg); - virtual int get(void *a_data, json &a_jmsg); - virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} - - virtual void on_recv(json &a_msg); - virtual void on_put(std::shared_ptr<std::vector<char>> a_data); - std::string name() { return "ZmqMan"; } - -private: - void *zmq_data = nullptr; -}; - -extern "C" DataManBase *getMan() { return new ZmqMan; } - -#endif