From 94b39f8cccdb42dac68276905ca52ff605c61dca Mon Sep 17 00:00:00 2001 From: Jason Wang <wangr1@ornl.gov> Date: Fri, 27 Oct 2017 13:32:33 -0400 Subject: [PATCH] fixed a few problems of DataMan/ZeroMQ, and added a simple Transport::Read() API to make the hello world workflow working again --- .../datamanReader/helloDataManReader.cpp | 4 +- .../adios2/engine/dataman/DataManReader.cpp | 2 +- source/adios2/engine/dataman/DataManReader.h | 2 +- .../adios2/engine/dataman/DataManWriter.tcc | 4 ++ .../adios2/toolkit/transport/wan/WANZmq.cpp | 7 +-- .../toolkit/transportman/dataman/DataMan.cpp | 51 ++++++++++++------- .../toolkit/transportman/dataman/DataMan.h | 3 +- 7 files changed, 44 insertions(+), 29 deletions(-) diff --git a/examples/hello/datamanReader/helloDataManReader.cpp b/examples/hello/datamanReader/helloDataManReader.cpp index 89f3e8472..12a7d91a6 100644 --- a/examples/hello/datamanReader/helloDataManReader.cpp +++ b/examples/hello/datamanReader/helloDataManReader.cpp @@ -26,7 +26,7 @@ void UserCallBack(const void *data, std::string doid, std::string var, std::size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1, std::multiplies<std::size_t>()); - for (unsigned int i = 0; i < varsize; ++i) + for (size_t i = 0; i < varsize; ++i) std::cout << ((float *)data)[i] << " "; std::cout << std::endl; } @@ -60,7 +60,7 @@ int main(int argc, char *argv[]) dataManReader->SetCallBack(UserCallBack); - for (unsigned int i = 0; i < 3; ++i) + for (unsigned int i = 0; i < 30; ++i) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } diff --git a/source/adios2/engine/dataman/DataManReader.cpp b/source/adios2/engine/dataman/DataManReader.cpp index 7d6a05359..a357490ef 100644 --- a/source/adios2/engine/dataman/DataManReader.cpp +++ b/source/adios2/engine/dataman/DataManReader.cpp @@ -23,7 +23,7 @@ DataManReader::DataManReader(IO &io, const std::string &name, Init(); } -void DataManReader::SetCallback( +void DataManReader::SetCallBack( std::function<void(const void *, std::string, std::string, std::string, Dims)> callback) diff --git a/source/adios2/engine/dataman/DataManReader.h b/source/adios2/engine/dataman/DataManReader.h index c1c2c42d1..dafa6b8dc 100644 --- a/source/adios2/engine/dataman/DataManReader.h +++ b/source/adios2/engine/dataman/DataManReader.h @@ -45,7 +45,7 @@ public: * @param callback function (get) provided by the user to be applied in * DataMan */ - void SetCallback(std::function<void(const void *, std::string, std::string, + void SetCallBack(std::function<void(const void *, std::string, std::string, std::string, Dims)> callback); diff --git a/source/adios2/engine/dataman/DataManWriter.tcc b/source/adios2/engine/dataman/DataManWriter.tcc index e12f03da5..c11435b6d 100644 --- a/source/adios2/engine/dataman/DataManWriter.tcc +++ b/source/adios2/engine/dataman/DataManWriter.tcc @@ -31,6 +31,10 @@ void DataManWriter::DoWriteCommon(Variable<T> &variable, const T *values) // This part will go away, this is just to monitor variables per rank + if (variable.m_Shape.empty()) + { + variable.m_Shape = variable.m_Count; + } if (variable.m_Count.empty()) { variable.m_Count = variable.m_Shape; diff --git a/source/adios2/toolkit/transport/wan/WANZmq.cpp b/source/adios2/toolkit/transport/wan/WANZmq.cpp index 7d002374d..0a13a18d5 100644 --- a/source/adios2/toolkit/transport/wan/WANZmq.cpp +++ b/source/adios2/toolkit/transport/wan/WANZmq.cpp @@ -122,13 +122,10 @@ void WANZmq::Write(const char *buffer, size_t size) m_Profiler.Timers.at("write").Resume(); } - /* - int status = zmq_send(m_Socket, buffer, size, 0); char ret[10]; zmq_recv(m_Socket, ret, 10, 0); - if (m_Profiler.IsActive) { m_Profiler.Timers.at("write").Pause(); @@ -145,12 +142,12 @@ void WANZmq::Write(const char *buffer, size_t size) ", in call to WANZmq write\n"); } } - */ } void WANZmq::Read(char *buffer, size_t size) { - // TODO: Implement read function + zmq_recv(m_Socket, buffer, size, 0); + int status = zmq_send(m_Socket, "OK", 4, 0); } void WANZmq::Flush() {} diff --git a/source/adios2/toolkit/transportman/dataman/DataMan.cpp b/source/adios2/toolkit/transportman/dataman/DataMan.cpp index 95abf19ee..72fa4dd5b 100644 --- a/source/adios2/toolkit/transportman/dataman/DataMan.cpp +++ b/source/adios2/toolkit/transportman/dataman/DataMan.cpp @@ -34,12 +34,6 @@ void DataMan::OpenWANTransports(const std::string &name, { std::shared_ptr<Transport> wanTransport, controlTransport; - // to be removed - for (auto &i : parameters) - { - std::cout << i.first << " " << i.second << std::endl; - } - const std::string type( GetParameter("type", parameters, true, m_DebugMode, "")); @@ -67,7 +61,7 @@ void DataMan::OpenWANTransports(const std::string &name, messageName = name; } - if (type == "wan") // need to create directory + if (type == "wan") { if (trans == "zmq") { @@ -95,9 +89,16 @@ void DataMan::OpenWANTransports(const std::string &name, } wanTransport->Open(messageName, openMode); - m_Transports.push_back(std::move(wanTransport)); + m_Transports.push_back(wanTransport); controlTransport->Open(messageName, openMode); - m_ControlTransports.push_back(std::move(controlTransport)); + m_ControlTransports.push_back(controlTransport); + + if (openMode == OpenMode::Read) + { + m_Listening = true; + m_ControlThreads.push_back(std::thread( + &DataMan::ReadThread, this, wanTransport, controlTransport)); + } } } @@ -109,6 +110,8 @@ void DataMan::WriteWAN(const void *buffer, nlohmann::json jmsg) jmsg["bytes"].get<size_t>()); } +void DataMan::ReadWAN(void *buffer, nlohmann::json jmsg) {} + void DataMan::SetCallback(std::function<void(const void *, std::string, std::string, std::string, Dims)> callback) @@ -121,18 +124,28 @@ void DataMan::ReadThread(std::shared_ptr<Transport> trans, { while (m_Listening) { - // Wait for Read API to be implemented - /* - if (ctl_trans->Read() >= 0) - { - std::string smsg; - nlohmann::json jmsg = json::parse(smsg); - } - else + char buffer[1024]; + size_t bytes; + nlohmann::json jmsg; + ctl_trans->Read(buffer, 1024); + std::string smsg(buffer); + jmsg = nlohmann::json::parse(smsg); + bytes = jmsg.value("bytes", 0); + + if (bytes > 0) { - usleep(1); + char data[bytes]; + trans->Read(data, bytes); + std::string doid = jmsg.value("doid", "Unknown Data Object"); + std::string var = jmsg.value("var", "Unknown Variable"); + std::string dtype = jmsg.value("dtype", "Unknown Data Type"); + std::vector<size_t> putshape = + jmsg.value("putshape", std::vector<size_t>()); + if (m_CallBack) + { + m_CallBack(data, doid, var, dtype, putshape); + } } - */ } } diff --git a/source/adios2/toolkit/transportman/dataman/DataMan.h b/source/adios2/toolkit/transportman/dataman/DataMan.h index 8466068fa..9366e8a66 100644 --- a/source/adios2/toolkit/transportman/dataman/DataMan.h +++ b/source/adios2/toolkit/transportman/dataman/DataMan.h @@ -33,6 +33,7 @@ public: const bool profile); void WriteWAN(const void *buffer, nlohmann::json jmsg); + void ReadWAN(void *buffer, nlohmann::json jmsg); void SetCallback(std::function<void(const void *, std::string, std::string, std::string, Dims)> @@ -49,7 +50,7 @@ private: std::function<void(const void *, std::string, std::string, std::string, Dims)> - m_CallBack; + m_CallBack = nullptr; nlohmann::json m_JMessage; -- GitLab