diff --git a/source/adios2/engine/dataman/DataManWriter.tcc b/source/adios2/engine/dataman/DataManWriter.tcc index f4b74f581dc1b686f3b20e3e3f30d38cc7dfffad..149830a06401752f41f710ab505ba74a627d47d1 100644 --- a/source/adios2/engine/dataman/DataManWriter.tcc +++ b/source/adios2/engine/dataman/DataManWriter.tcc @@ -30,9 +30,9 @@ void DataManWriter::WriteVariableCommon(Variable<T> &variable, const T *values) // This part will go away, this is just to monitor variables per rank - if (variable.m_Shape.empty()) + if (variable.m_Count.empty()) { - variable.m_Shape = variable.m_Count; + variable.m_Count = variable.m_Shape; } if (variable.m_Start.empty()) { diff --git a/source/dataman/CMakeLists.txt b/source/dataman/CMakeLists.txt index 5adafab771f64ee9ff140fea4a9bdaaa830f4f21..1acb8948776de361c1ca12c18a22c6d2fd784439 100644 --- a/source/dataman/CMakeLists.txt +++ b/source/dataman/CMakeLists.txt @@ -33,21 +33,16 @@ option(ADIOS_USE_DataMan_ZeroMQ "Enable ZeroMQ for DataMan" OFF) if(ADIOS_USE_DataMan_ZeroMQ) find_package(ZeroMQ REQUIRED) - # Manually add the ZeroMQ_INCLUDE_DIRS since object libs still don't support - # target usage requirements - add_library(streamman OBJECT StreamMan.h StreamMan.cpp) - target_include_directories(streamman PRIVATE ${ZeroMQ_INCLUDE_DIRS}) - add_library(zmqman MODULE + StreamMan.h StreamMan.cpp ZmqMan.h ZmqMan.cpp - $<TARGET_OBJECTS:streamman> ) target_link_libraries(zmqman PRIVATE dataman ZeroMQ::ZMQ) list(APPEND dataman_targets zmqman) add_library(mdtmman MODULE + StreamMan.h StreamMan.cpp MdtmMan.h MdtmMan.cpp - $<TARGET_OBJECTS:streamman> ) target_link_libraries(mdtmman PRIVATE dataman ZeroMQ::ZMQ) list(APPEND dataman_targets mdtmman) diff --git a/source/dataman/CacheMan.cpp b/source/dataman/CacheMan.cpp index 46384219f1a6ff0eb77995ee4f3827bac479bc6a..f53cfaa6d56adcb4fa0fa0129bd8bde0ffc950fe 100644 --- a/source/dataman/CacheMan.cpp +++ b/source/dataman/CacheMan.cpp @@ -28,6 +28,7 @@ int CacheMan::put(const void *a_data, json a_jmsg) int CacheItem::put(const void *a_data, json a_jmsg) { + if (!m_initialized) { init(a_jmsg); diff --git a/source/dataman/CacheMan.h b/source/dataman/CacheMan.h index 6cfb50a950fb19e5d7201060eeeb9d535b2f225c..9668f014f3f484fec86f0660851a5bbff33d45a6 100644 --- a/source/dataman/CacheMan.h +++ b/source/dataman/CacheMan.h @@ -13,7 +13,7 @@ #include <queue> -#include "json.hpp" +#include <json.hpp> class CacheItem { diff --git a/source/dataman/DataManBase.cpp b/source/dataman/DataManBase.cpp index e386249e7df9564fb53d4151667ded61f07bd826..3012da8bba660730e131ef71e1228beb5502776c 100644 --- a/source/dataman/DataManBase.cpp +++ b/source/dataman/DataManBase.cpp @@ -150,8 +150,9 @@ int DataManBase::put_end(const void *p_data, json &p_jmsg) m_profiling["total_manager_time"] = m_profiling["total_manager_time"].get<double>() + duration.count(); m_profiling["total_mb"] = - m_profiling["total_mb"].get<size_t>() + + m_profiling["total_mb"].get<double>() + product(p_jmsg["varshape"], dsize(p_jmsg["dtype"])) / 1000000.0f; + std::cout << product(p_jmsg["varshape"], dsize(p_jmsg["dtype"])) << "\n"; duration = end - m_start_time; m_profiling["total_workflow_time"] = duration.count(); m_profiling["workflow_mbs"] = diff --git a/source/dataman/DataManBase.h b/source/dataman/DataManBase.h index dd8a8ff97b9970b421a740a116ceec82931062b4..8ea2db345b25aaf9eda7bff371163c5bf0b2a538 100644 --- a/source/dataman/DataManBase.h +++ b/source/dataman/DataManBase.h @@ -22,7 +22,7 @@ #include <unordered_map> #include <vector> -#include "json.hpp" +#include <json.hpp> class DataManBase { diff --git a/source/dataman/ZmqMan.cpp b/source/dataman/ZmqMan.cpp index 811baa1d52f08250520dc859b566c392c239c4c3..1832ca14d6046ccd67471c16f66bc8b0376ffaf1 100644 --- a/source/dataman/ZmqMan.cpp +++ b/source/dataman/ZmqMan.cpp @@ -24,18 +24,19 @@ ZmqMan::~ZmqMan() int ZmqMan::init(json a_jmsg) { StreamMan::init(a_jmsg); - zmq_data = zmq_socket(zmq_context, ZMQ_PAIR); - std::string local_address = - make_address(m_local_ip, m_local_port + 1, "tcp"); - std::string remote_address = - make_address(m_remote_ip, m_remote_port + 1, "tcp"); if (m_stream_mode == "sender") { + zmq_data = zmq_socket(zmq_context, ZMQ_REQ); + std::string remote_address = + make_address(m_remote_ip, m_remote_port + 1, "tcp"); zmq_connect(zmq_data, remote_address.c_str()); logging("ZmqMan::init " + remote_address + " connected"); } else if (m_stream_mode == "receiver") { + zmq_data = zmq_socket(zmq_context, ZMQ_REP); + std::string local_address = + make_address(m_local_ip, m_local_port + 1, "tcp"); zmq_bind(zmq_data, local_address.c_str()); logging("ZmqMan::init " + local_address + " bound"); } @@ -44,10 +45,12 @@ int ZmqMan::init(json a_jmsg) int ZmqMan::put(const void *a_data, json a_jmsg) { - put_begin(a_data, a_jmsg); + char ret[10]; + DataManBase::put_begin(a_data, a_jmsg); StreamMan::put(a_data, a_jmsg); zmq_send(zmq_data, a_data, a_jmsg["sendbytes"].get<size_t>(), 0); - put_end(a_data, a_jmsg); + zmq_recv(zmq_data, ret, 10, 0); + DataManBase::put_end(a_data, a_jmsg); return 0; } @@ -60,6 +63,7 @@ void ZmqMan::on_recv(json a_jmsg) size_t sendbytes = a_jmsg["sendbytes"].get<size_t>(); std::vector<char> data(sendbytes); int ret = zmq_recv(zmq_data, data.data(), sendbytes, 0); + zmq_send(zmq_data, "OK", 10, 0); if (a_jmsg["compression_method"].is_string() and a_jmsg["compression_method"].get<std::string>() != "null")