diff --git a/source/adios2/engine/dataman/DataManWriter.tcc b/source/adios2/engine/dataman/DataManWriter.tcc index f4b74f581dc1b686f3b20e3e3f30d38cc7dfffad..149830a06401752f41f710ab505ba74a627d47d1 100644 --- a/source/adios2/engine/dataman/DataManWriter.tcc +++ b/source/adios2/engine/dataman/DataManWriter.tcc @@ -30,9 +30,9 @@ void DataManWriter::WriteVariableCommon(Variable<T> &variable, const T *values) // This part will go away, this is just to monitor variables per rank - if (variable.m_Shape.empty()) + if (variable.m_Count.empty()) { - variable.m_Shape = variable.m_Count; + variable.m_Count = variable.m_Shape; } if (variable.m_Start.empty()) { diff --git a/source/dataman/CMakeLists.txt b/source/dataman/CMakeLists.txt index 5adafab771f64ee9ff140fea4a9bdaaa830f4f21..17f5095ba4838a0b5765dc0465a2d7be342e3c1f 100644 --- a/source/dataman/CMakeLists.txt +++ b/source/dataman/CMakeLists.txt @@ -6,15 +6,15 @@ set(dataman_targets) add_library(dataman - DataManBase.cpp DataManBase.h - DataMan.cpp DataMan.h - CacheMan.cpp CacheMan.h -) + DataManBase.cpp DataManBase.h + DataMan.cpp DataMan.h + CacheMan.cpp CacheMan.h + ) target_include_directories(dataman PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) target_link_libraries(dataman - PRIVATE adios2sys - PUBLIC NLohmannJson -) + PRIVATE adios2sys + PUBLIC NLohmannJson + ) list(APPEND dataman_targets dataman) # Add the dataman plugins as MODULE libraries instead of SHARED libraries. @@ -31,42 +31,37 @@ list(APPEND dataman_targets temporalman) option(ADIOS_USE_DataMan_ZeroMQ "Enable ZeroMQ for DataMan" OFF) if(ADIOS_USE_DataMan_ZeroMQ) - find_package(ZeroMQ REQUIRED) + find_package(ZeroMQ REQUIRED) - # Manually add the ZeroMQ_INCLUDE_DIRS since object libs still don't support - # target usage requirements - add_library(streamman OBJECT StreamMan.h StreamMan.cpp) - target_include_directories(streamman PRIVATE ${ZeroMQ_INCLUDE_DIRS}) + add_library(zmqman MODULE + StreamMan.h StreamMan.cpp + ZmqMan.h ZmqMan.cpp + ) + target_link_libraries(zmqman PRIVATE dataman ZeroMQ::ZMQ) + list(APPEND dataman_targets zmqman) - add_library(zmqman MODULE - ZmqMan.h ZmqMan.cpp - $<TARGET_OBJECTS:streamman> - ) - target_link_libraries(zmqman PRIVATE dataman ZeroMQ::ZMQ) - list(APPEND dataman_targets zmqman) - - add_library(mdtmman MODULE - MdtmMan.h MdtmMan.cpp - $<TARGET_OBJECTS:streamman> - ) - target_link_libraries(mdtmman PRIVATE dataman ZeroMQ::ZMQ) - list(APPEND dataman_targets mdtmman) + add_library(mdtmman MODULE + StreamMan.h StreamMan.cpp + MdtmMan.h MdtmMan.cpp + ) + target_link_libraries(mdtmman PRIVATE dataman ZeroMQ::ZMQ) + list(APPEND dataman_targets mdtmman) endif() set(ADIOS_USE_DataMan_ZFP ${ADIOS_USE_ZFP} CACHE INTERNAL "Enable ZFP for DataMan" FORCE) if(ADIOS_USE_DataMan_ZFP) - find_package(ZFP REQUIRED) + find_package(ZFP REQUIRED) - add_library(zfpman MODULE ZfpMan.h ZfpMan.cpp) - target_link_libraries(zfpman PRIVATE dataman zfp::zfp) + add_library(zfpman MODULE ZfpMan.h ZfpMan.cpp) + target_link_libraries(zfpman PRIVATE dataman zfp::zfp) - list(APPEND dataman_targets zfpman) + list(APPEND dataman_targets zfpman) endif() install( - TARGETS ${dataman_targets} EXPORT adios2 - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} - LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} - ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} -) + TARGETS ${dataman_targets} EXPORT adios2 + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} + ) diff --git a/source/dataman/CacheMan.cpp b/source/dataman/CacheMan.cpp index 46384219f1a6ff0eb77995ee4f3827bac479bc6a..f53cfaa6d56adcb4fa0fa0129bd8bde0ffc950fe 100644 --- a/source/dataman/CacheMan.cpp +++ b/source/dataman/CacheMan.cpp @@ -28,6 +28,7 @@ int CacheMan::put(const void *a_data, json a_jmsg) int CacheItem::put(const void *a_data, json a_jmsg) { + if (!m_initialized) { init(a_jmsg); diff --git a/source/dataman/CacheMan.h b/source/dataman/CacheMan.h index 6cfb50a950fb19e5d7201060eeeb9d535b2f225c..9668f014f3f484fec86f0660851a5bbff33d45a6 100644 --- a/source/dataman/CacheMan.h +++ b/source/dataman/CacheMan.h @@ -13,7 +13,7 @@ #include <queue> -#include "json.hpp" +#include <json.hpp> class CacheItem { diff --git a/source/dataman/DataManBase.h b/source/dataman/DataManBase.h index dd8a8ff97b9970b421a740a116ceec82931062b4..8ea2db345b25aaf9eda7bff371163c5bf0b2a538 100644 --- a/source/dataman/DataManBase.h +++ b/source/dataman/DataManBase.h @@ -22,7 +22,7 @@ #include <unordered_map> #include <vector> -#include "json.hpp" +#include <json.hpp> class DataManBase { diff --git a/source/dataman/ZmqMan.cpp b/source/dataman/ZmqMan.cpp index 811baa1d52f08250520dc859b566c392c239c4c3..1832ca14d6046ccd67471c16f66bc8b0376ffaf1 100644 --- a/source/dataman/ZmqMan.cpp +++ b/source/dataman/ZmqMan.cpp @@ -24,18 +24,19 @@ ZmqMan::~ZmqMan() int ZmqMan::init(json a_jmsg) { StreamMan::init(a_jmsg); - zmq_data = zmq_socket(zmq_context, ZMQ_PAIR); - std::string local_address = - make_address(m_local_ip, m_local_port + 1, "tcp"); - std::string remote_address = - make_address(m_remote_ip, m_remote_port + 1, "tcp"); if (m_stream_mode == "sender") { + zmq_data = zmq_socket(zmq_context, ZMQ_REQ); + std::string remote_address = + make_address(m_remote_ip, m_remote_port + 1, "tcp"); zmq_connect(zmq_data, remote_address.c_str()); logging("ZmqMan::init " + remote_address + " connected"); } else if (m_stream_mode == "receiver") { + zmq_data = zmq_socket(zmq_context, ZMQ_REP); + std::string local_address = + make_address(m_local_ip, m_local_port + 1, "tcp"); zmq_bind(zmq_data, local_address.c_str()); logging("ZmqMan::init " + local_address + " bound"); } @@ -44,10 +45,12 @@ int ZmqMan::init(json a_jmsg) int ZmqMan::put(const void *a_data, json a_jmsg) { - put_begin(a_data, a_jmsg); + char ret[10]; + DataManBase::put_begin(a_data, a_jmsg); StreamMan::put(a_data, a_jmsg); zmq_send(zmq_data, a_data, a_jmsg["sendbytes"].get<size_t>(), 0); - put_end(a_data, a_jmsg); + zmq_recv(zmq_data, ret, 10, 0); + DataManBase::put_end(a_data, a_jmsg); return 0; } @@ -60,6 +63,7 @@ void ZmqMan::on_recv(json a_jmsg) size_t sendbytes = a_jmsg["sendbytes"].get<size_t>(); std::vector<char> data(sendbytes); int ret = zmq_recv(zmq_data, data.data(), sendbytes, 0); + zmq_send(zmq_data, "OK", 10, 0); if (a_jmsg["compression_method"].is_string() and a_jmsg["compression_method"].get<std::string>() != "null")