diff --git a/examples/hello/datamanReader/helloDataManReader.cpp b/examples/hello/datamanReader/helloDataManReader.cpp index fbbee475e0e83ac8198c93677ec2b9ac3b876d96..5c8fadbaec57396997c26a715612ee3aecf871d2 100644 --- a/examples/hello/datamanReader/helloDataManReader.cpp +++ b/examples/hello/datamanReader/helloDataManReader.cpp @@ -63,7 +63,7 @@ int main(int argc, char *argv[]) adios2::Engine &dataManReader = dataManIO.Open("myDoubles.bp", adios2::Mode::Read); - for (unsigned int i = 0; i < 3; ++i) + for (unsigned int i = 0; i < 10; ++i) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } diff --git a/source/adios2/engine/dataman/DataManReader.cpp b/source/adios2/engine/dataman/DataManReader.cpp index 2c6fa387b9c2f9aaf1ee0817350e9336dea1d43a..5ade03a3a1ee20fd9ce1b7facb561c3012f754a5 100644 --- a/source/adios2/engine/dataman/DataManReader.cpp +++ b/source/adios2/engine/dataman/DataManReader.cpp @@ -75,11 +75,19 @@ void DataManReader::Init() for (unsigned int i = 0; i < parameters.size(); i++) { parameters[i]["type"] = "wan"; - parameters[i]["transport"] = "zmq"; + parameters[i]["Library"] = "zmq"; parameters[i]["name"] = "stream"; - parameters[i]["ipaddress"] = "127.0.0.1"; + parameters[i]["IPAddress"] = "127.0.0.1"; } m_Man.OpenWANTransports("zmq", Mode::Read, parameters, true); + for (auto &j : m_IO.m_Operators) + { + if (j.ADIOSOperator.m_Type == "Signature2") + { + m_Man.SetCallback(j.ADIOSOperator); + break; + } + } std::string methodType; int numChannels = 0; diff --git a/source/adios2/engine/dataman/DataManReader.h b/source/adios2/engine/dataman/DataManReader.h index 7b7ec1934d9692216b9f3571bd027f6f0936ed47..ee64d3c203c5cbbc6677064bf5c70c23ef841b18 100644 --- a/source/adios2/engine/dataman/DataManReader.h +++ b/source/adios2/engine/dataman/DataManReader.h @@ -40,23 +40,11 @@ public: virtual ~DataManReader() = default; - /** - * Set callback function from user application - * @param callback function (get) provided by the user to be applied in - * DataMan - */ - void SetCallBack(std::function<void(const void *, std::string, std::string, - std::string, Dims)> - callback); - void Close(const int transportIndex = -1); private: bool m_DoRealTime = false; transportman::DataMan m_Man; - std::function<void(const void *, std::string, std::string, std::string, - Dims)> - m_CallBack; ///< call back function void Init(); diff --git a/source/adios2/engine/dataman/DataManWriter.cpp b/source/adios2/engine/dataman/DataManWriter.cpp index bb90bd6fb88c00c1efe5d9dd1a825e0e4d033b4c..5430f4b182924a062c5c76ed5a847637d960385c 100644 --- a/source/adios2/engine/dataman/DataManWriter.cpp +++ b/source/adios2/engine/dataman/DataManWriter.cpp @@ -95,9 +95,9 @@ void DataManWriter::Init() for (unsigned int i = 0; i < parameters.size(); i++) { parameters[i]["type"] = "wan"; - parameters[i]["transport"] = "zmq"; + parameters[i]["Library"] = "zmq"; parameters[i]["name"] = "stream"; - parameters[i]["ipaddress"] = "127.0.0.1"; + parameters[i]["IPAddress"] = "127.0.0.1"; } m_Man.OpenWANTransports("zmq", Mode::Write, parameters, true); diff --git a/source/adios2/engine/dataman/DataManWriter.h b/source/adios2/engine/dataman/DataManWriter.h index c690770d402757274630695044e3bc9c43564f32..5f67a4bb1f111bcaf13783fa7f90ca15884fe090 100644 --- a/source/adios2/engine/dataman/DataManWriter.h +++ b/source/adios2/engine/dataman/DataManWriter.h @@ -36,10 +36,6 @@ private: bool m_DoRealTime = false; bool m_DoMonitor = false; transportman::DataMan m_Man; - std::function<void(const void *, std::string, std::string, std::string, - Dims)> - m_CallBack; ///< call back function - void Init(); ///< calls InitCapsules and InitTransports based on Method, /// called from constructor diff --git a/source/adios2/toolkit/transport/wan/WANZmq.cpp b/source/adios2/toolkit/transport/wan/WANZmq.cpp index 96d77799a8d8b887e4c317773d750bcb85a30b8f..690adf2b301ae53805e66c1421304f9e8368da2d 100644 --- a/source/adios2/toolkit/transport/wan/WANZmq.cpp +++ b/source/adios2/toolkit/transport/wan/WANZmq.cpp @@ -126,7 +126,8 @@ void WANZmq::Write(const char *buffer, size_t size, size_t start) void WANZmq::Read(char *buffer, size_t size, size_t start) { - // TODO: Implement read function + zmq_recv(m_Socket, buffer, size, 0); + int status = zmq_send(m_Socket, "OK", 4, 0); } void WANZmq::Flush() {} diff --git a/source/adios2/toolkit/transportman/dataman/DataMan.cpp b/source/adios2/toolkit/transportman/dataman/DataMan.cpp index fe8c13d135c1758bad13591e49223d927eca56c0..690c49a93ae842831b4436c7e97b5ee27577e7a2 100644 --- a/source/adios2/toolkit/transportman/dataman/DataMan.cpp +++ b/source/adios2/toolkit/transportman/dataman/DataMan.cpp @@ -72,20 +72,20 @@ void DataMan::OpenWANTransports(const std::string &name, const Mode mode, #ifdef ADIOS2_HAVE_ZEROMQ wanTransport = std::make_shared<transport::WANZmq>( ipAddress, portData, m_MPIComm, m_DebugMode); + wanTransport->Open(messageName, mode); + m_Transports.emplace(counter, wanTransport); + controlTransport = std::make_shared<transport::WANZmq>( ipAddress, portControl, m_MPIComm, m_DebugMode); - - wanTransport->Open(messageName, mode); - m_Transports.emplace(counter, std::move(wanTransport)); controlTransport->Open(messageName, mode); - m_ControlTransports.push_back(std::move(controlTransport)); + m_ControlTransports.emplace_back(controlTransport); if (mode == Mode::Read) { m_Listening = true; - m_ControlThreads.push_back(std::thread(&DataMan::ReadThread, - this, wanTransport, - controlTransport)); + m_ControlThreads.emplace_back( + std::thread(&DataMan::ReadThread, this, wanTransport, + controlTransport)); } ++counter; @@ -120,11 +120,9 @@ void DataMan::WriteWAN(const void *buffer, nlohmann::json jmsg) void DataMan::ReadWAN(void *buffer, nlohmann::json jmsg) {} -void DataMan::SetCallback(std::function<void(const void *, std::string, - std::string, std::string, Dims)> - callback) +void DataMan::SetCallback(adios2::Operator &callback) { - m_CallBack = callback; + m_Callback = &callback; } void DataMan::ReadThread(std::shared_ptr<Transport> trans, @@ -133,13 +131,12 @@ void DataMan::ReadThread(std::shared_ptr<Transport> trans, while (m_Listening) { char buffer[1024]; - size_t bytes; + size_t bytes = 0; nlohmann::json jmsg; - ctl_trans->Read(buffer, 1024); + ctl_trans->Read(buffer, 1024, 0); std::string smsg(buffer); jmsg = nlohmann::json::parse(smsg); bytes = jmsg.value("bytes", 0); - if (bytes > 0) { std::vector<char> data(bytes); @@ -149,9 +146,10 @@ void DataMan::ReadThread(std::shared_ptr<Transport> trans, std::string dtype = jmsg.value("dtype", "Unknown Data Type"); std::vector<size_t> putshape = jmsg.value("putshape", std::vector<size_t>()); - if (m_CallBack) + if (m_Callback != nullptr && m_Callback->m_Type == "Signature2") { - m_CallBack(data.data(), doid, var, dtype, putshape); + m_Callback->RunCallback2(data.data(), doid, var, dtype, + putshape); } } } diff --git a/source/adios2/toolkit/transportman/dataman/DataMan.h b/source/adios2/toolkit/transportman/dataman/DataMan.h index 6e69ba1ea3493c193b454458de234724fec4a0b3..e3648f74576679c36818f5ebf6b11ffe5b908a2c 100644 --- a/source/adios2/toolkit/transportman/dataman/DataMan.h +++ b/source/adios2/toolkit/transportman/dataman/DataMan.h @@ -11,6 +11,7 @@ #ifndef ADIOS2_TOOLKIT_TRANSPORTMAN_DATAMAN_DATAMAN_H_ #define ADIOS2_TOOLKIT_TRANSPORTMAN_DATAMAN_DATAMAN_H_ +#include "adios2/core/Operator.h" #include "adios2/toolkit/transportman/TransportMan.h" #include <json.hpp> #include <thread> @@ -35,11 +36,10 @@ public: void WriteWAN(const void *buffer, nlohmann::json jmsg); void ReadWAN(void *buffer, nlohmann::json jmsg); - void SetCallback(std::function<void(const void *, std::string, std::string, - std::string, Dims)> - callback); + void SetCallback(adios2::Operator &callback); private: + adios2::Operator *m_Callback = nullptr; void ReadThread(std::shared_ptr<Transport> trans, std::shared_ptr<Transport> ctl_trans); @@ -47,11 +47,6 @@ private: std::vector<std::thread> m_ControlThreads; size_t m_CurrentTransport = 0; bool m_Listening = false; - - std::function<void(const void *, std::string, std::string, std::string, - Dims)> - m_CallBack = nullptr; - nlohmann::json m_JMessage; /** Pick the appropriate default */