diff --git a/examples/hello/datamanReader/helloDataManReader.cpp b/examples/hello/datamanReader/helloDataManReader.cpp index 89f3e8472f3bca26141747913b7c0c1cdb18e6a9..12a7d91a6a4480396b1c646e3c4aba83e55d8ef7 100644 --- a/examples/hello/datamanReader/helloDataManReader.cpp +++ b/examples/hello/datamanReader/helloDataManReader.cpp @@ -26,7 +26,7 @@ void UserCallBack(const void *data, std::string doid, std::string var, std::size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1, std::multiplies<std::size_t>()); - for (unsigned int i = 0; i < varsize; ++i) + for (size_t i = 0; i < varsize; ++i) std::cout << ((float *)data)[i] << " "; std::cout << std::endl; } @@ -60,7 +60,7 @@ int main(int argc, char *argv[]) dataManReader->SetCallBack(UserCallBack); - for (unsigned int i = 0; i < 3; ++i) + for (unsigned int i = 0; i < 30; ++i) { std::this_thread::sleep_for(std::chrono::milliseconds(1000)); } diff --git a/source/adios2/engine/dataman/DataManReader.cpp b/source/adios2/engine/dataman/DataManReader.cpp index 7d6a053595f53dad9a8d2aaa51831dbcda12f948..a357490ef47c97b811397a14c85560c84c736700 100644 --- a/source/adios2/engine/dataman/DataManReader.cpp +++ b/source/adios2/engine/dataman/DataManReader.cpp @@ -23,7 +23,7 @@ DataManReader::DataManReader(IO &io, const std::string &name, Init(); } -void DataManReader::SetCallback( +void DataManReader::SetCallBack( std::function<void(const void *, std::string, std::string, std::string, Dims)> callback) diff --git a/source/adios2/engine/dataman/DataManReader.h b/source/adios2/engine/dataman/DataManReader.h index c1c2c42d1403213e1530e1a14fac856f2d983a66..dafa6b8dcdaede63f38ffd9faff0b360fa6122c5 100644 --- a/source/adios2/engine/dataman/DataManReader.h +++ b/source/adios2/engine/dataman/DataManReader.h @@ -45,7 +45,7 @@ public: * @param callback function (get) provided by the user to be applied in * DataMan */ - void SetCallback(std::function<void(const void *, std::string, std::string, + void SetCallBack(std::function<void(const void *, std::string, std::string, std::string, Dims)> callback); diff --git a/source/adios2/engine/dataman/DataManWriter.tcc b/source/adios2/engine/dataman/DataManWriter.tcc index e12f03da5ad774cce459bbbcfa714bc3c8d009ff..c11435b6ddc48b87ebb8199c7f528eec6051c08b 100644 --- a/source/adios2/engine/dataman/DataManWriter.tcc +++ b/source/adios2/engine/dataman/DataManWriter.tcc @@ -31,6 +31,10 @@ void DataManWriter::DoWriteCommon(Variable<T> &variable, const T *values) // This part will go away, this is just to monitor variables per rank + if (variable.m_Shape.empty()) + { + variable.m_Shape = variable.m_Count; + } if (variable.m_Count.empty()) { variable.m_Count = variable.m_Shape; diff --git a/source/adios2/toolkit/transport/wan/WANZmq.cpp b/source/adios2/toolkit/transport/wan/WANZmq.cpp index 7d002374db9377b786eb74b1d83506fe7bda9a74..0a13a18d5f9d72756f01498500c531220d86bbc5 100644 --- a/source/adios2/toolkit/transport/wan/WANZmq.cpp +++ b/source/adios2/toolkit/transport/wan/WANZmq.cpp @@ -122,13 +122,10 @@ void WANZmq::Write(const char *buffer, size_t size) m_Profiler.Timers.at("write").Resume(); } - /* - int status = zmq_send(m_Socket, buffer, size, 0); char ret[10]; zmq_recv(m_Socket, ret, 10, 0); - if (m_Profiler.IsActive) { m_Profiler.Timers.at("write").Pause(); @@ -145,12 +142,12 @@ void WANZmq::Write(const char *buffer, size_t size) ", in call to WANZmq write\n"); } } - */ } void WANZmq::Read(char *buffer, size_t size) { - // TODO: Implement read function + zmq_recv(m_Socket, buffer, size, 0); + int status = zmq_send(m_Socket, "OK", 4, 0); } void WANZmq::Flush() {} diff --git a/source/adios2/toolkit/transportman/dataman/DataMan.cpp b/source/adios2/toolkit/transportman/dataman/DataMan.cpp index 95abf19ee8c735b42293c6084383dcb5a4005fd1..72fa4dd5ba73af5fd2ad2f52c7d9475175f61e82 100644 --- a/source/adios2/toolkit/transportman/dataman/DataMan.cpp +++ b/source/adios2/toolkit/transportman/dataman/DataMan.cpp @@ -34,12 +34,6 @@ void DataMan::OpenWANTransports(const std::string &name, { std::shared_ptr<Transport> wanTransport, controlTransport; - // to be removed - for (auto &i : parameters) - { - std::cout << i.first << " " << i.second << std::endl; - } - const std::string type( GetParameter("type", parameters, true, m_DebugMode, "")); @@ -67,7 +61,7 @@ void DataMan::OpenWANTransports(const std::string &name, messageName = name; } - if (type == "wan") // need to create directory + if (type == "wan") { if (trans == "zmq") { @@ -95,9 +89,16 @@ void DataMan::OpenWANTransports(const std::string &name, } wanTransport->Open(messageName, openMode); - m_Transports.push_back(std::move(wanTransport)); + m_Transports.push_back(wanTransport); controlTransport->Open(messageName, openMode); - m_ControlTransports.push_back(std::move(controlTransport)); + m_ControlTransports.push_back(controlTransport); + + if (openMode == OpenMode::Read) + { + m_Listening = true; + m_ControlThreads.push_back(std::thread( + &DataMan::ReadThread, this, wanTransport, controlTransport)); + } } } @@ -109,6 +110,8 @@ void DataMan::WriteWAN(const void *buffer, nlohmann::json jmsg) jmsg["bytes"].get<size_t>()); } +void DataMan::ReadWAN(void *buffer, nlohmann::json jmsg) {} + void DataMan::SetCallback(std::function<void(const void *, std::string, std::string, std::string, Dims)> callback) @@ -121,18 +124,28 @@ void DataMan::ReadThread(std::shared_ptr<Transport> trans, { while (m_Listening) { - // Wait for Read API to be implemented - /* - if (ctl_trans->Read() >= 0) - { - std::string smsg; - nlohmann::json jmsg = json::parse(smsg); - } - else + char buffer[1024]; + size_t bytes; + nlohmann::json jmsg; + ctl_trans->Read(buffer, 1024); + std::string smsg(buffer); + jmsg = nlohmann::json::parse(smsg); + bytes = jmsg.value("bytes", 0); + + if (bytes > 0) { - usleep(1); + char data[bytes]; + trans->Read(data, bytes); + std::string doid = jmsg.value("doid", "Unknown Data Object"); + std::string var = jmsg.value("var", "Unknown Variable"); + std::string dtype = jmsg.value("dtype", "Unknown Data Type"); + std::vector<size_t> putshape = + jmsg.value("putshape", std::vector<size_t>()); + if (m_CallBack) + { + m_CallBack(data, doid, var, dtype, putshape); + } } - */ } } diff --git a/source/adios2/toolkit/transportman/dataman/DataMan.h b/source/adios2/toolkit/transportman/dataman/DataMan.h index 8466068fa98c96ac2b1fc97a88970ff505efa53e..9366e8a66c17d58e91f20aef9412bfeb81e93b14 100644 --- a/source/adios2/toolkit/transportman/dataman/DataMan.h +++ b/source/adios2/toolkit/transportman/dataman/DataMan.h @@ -33,6 +33,7 @@ public: const bool profile); void WriteWAN(const void *buffer, nlohmann::json jmsg); + void ReadWAN(void *buffer, nlohmann::json jmsg); void SetCallback(std::function<void(const void *, std::string, std::string, std::string, Dims)> @@ -49,7 +50,7 @@ private: std::function<void(const void *, std::string, std::string, std::string, Dims)> - m_CallBack; + m_CallBack = nullptr; nlohmann::json m_JMessage;