From 33a1264f6a71dffc772e666e9b40b9efc413b54a Mon Sep 17 00:00:00 2001 From: Jason Wang <wangr1@ornl.gov> Date: Wed, 10 May 2017 11:25:48 -0400 Subject: [PATCH] fixed a few DataMan problems caused by recent merges --- .../adios2/engine/dataman/DataManWriter.tcc | 4 +- source/dataman/CMakeLists.txt | 63 +++++++++---------- source/dataman/CacheMan.cpp | 1 + source/dataman/CacheMan.h | 2 +- source/dataman/DataManBase.h | 2 +- source/dataman/ZmqMan.cpp | 18 +++--- 6 files changed, 45 insertions(+), 45 deletions(-) diff --git a/source/adios2/engine/dataman/DataManWriter.tcc b/source/adios2/engine/dataman/DataManWriter.tcc index f4b74f581..149830a06 100644 --- a/source/adios2/engine/dataman/DataManWriter.tcc +++ b/source/adios2/engine/dataman/DataManWriter.tcc @@ -30,9 +30,9 @@ void DataManWriter::WriteVariableCommon(Variable<T> &variable, const T *values) // This part will go away, this is just to monitor variables per rank - if (variable.m_Shape.empty()) + if (variable.m_Count.empty()) { - variable.m_Shape = variable.m_Count; + variable.m_Count = variable.m_Shape; } if (variable.m_Start.empty()) { diff --git a/source/dataman/CMakeLists.txt b/source/dataman/CMakeLists.txt index 5adafab77..17f5095ba 100644 --- a/source/dataman/CMakeLists.txt +++ b/source/dataman/CMakeLists.txt @@ -6,15 +6,15 @@ set(dataman_targets) add_library(dataman - DataManBase.cpp DataManBase.h - DataMan.cpp DataMan.h - CacheMan.cpp CacheMan.h -) + DataManBase.cpp DataManBase.h + DataMan.cpp DataMan.h + CacheMan.cpp CacheMan.h + ) target_include_directories(dataman PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}) target_link_libraries(dataman - PRIVATE adios2sys - PUBLIC NLohmannJson -) + PRIVATE adios2sys + PUBLIC NLohmannJson + ) list(APPEND dataman_targets dataman) # Add the dataman plugins as MODULE libraries instead of SHARED libraries. @@ -31,42 +31,37 @@ list(APPEND dataman_targets temporalman) option(ADIOS_USE_DataMan_ZeroMQ "Enable ZeroMQ for DataMan" OFF) if(ADIOS_USE_DataMan_ZeroMQ) - find_package(ZeroMQ REQUIRED) + find_package(ZeroMQ REQUIRED) - # Manually add the ZeroMQ_INCLUDE_DIRS since object libs still don't support - # target usage requirements - add_library(streamman OBJECT StreamMan.h StreamMan.cpp) - target_include_directories(streamman PRIVATE ${ZeroMQ_INCLUDE_DIRS}) + add_library(zmqman MODULE + StreamMan.h StreamMan.cpp + ZmqMan.h ZmqMan.cpp + ) + target_link_libraries(zmqman PRIVATE dataman ZeroMQ::ZMQ) + list(APPEND dataman_targets zmqman) - add_library(zmqman MODULE - ZmqMan.h ZmqMan.cpp - $<TARGET_OBJECTS:streamman> - ) - target_link_libraries(zmqman PRIVATE dataman ZeroMQ::ZMQ) - list(APPEND dataman_targets zmqman) - - add_library(mdtmman MODULE - MdtmMan.h MdtmMan.cpp - $<TARGET_OBJECTS:streamman> - ) - target_link_libraries(mdtmman PRIVATE dataman ZeroMQ::ZMQ) - list(APPEND dataman_targets mdtmman) + add_library(mdtmman MODULE + StreamMan.h StreamMan.cpp + MdtmMan.h MdtmMan.cpp + ) + target_link_libraries(mdtmman PRIVATE dataman ZeroMQ::ZMQ) + list(APPEND dataman_targets mdtmman) endif() set(ADIOS_USE_DataMan_ZFP ${ADIOS_USE_ZFP} CACHE INTERNAL "Enable ZFP for DataMan" FORCE) if(ADIOS_USE_DataMan_ZFP) - find_package(ZFP REQUIRED) + find_package(ZFP REQUIRED) - add_library(zfpman MODULE ZfpMan.h ZfpMan.cpp) - target_link_libraries(zfpman PRIVATE dataman zfp::zfp) + add_library(zfpman MODULE ZfpMan.h ZfpMan.cpp) + target_link_libraries(zfpman PRIVATE dataman zfp::zfp) - list(APPEND dataman_targets zfpman) + list(APPEND dataman_targets zfpman) endif() install( - TARGETS ${dataman_targets} EXPORT adios2 - RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} - LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} - ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} -) + TARGETS ${dataman_targets} EXPORT adios2 + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} + ) diff --git a/source/dataman/CacheMan.cpp b/source/dataman/CacheMan.cpp index 46384219f..f53cfaa6d 100644 --- a/source/dataman/CacheMan.cpp +++ b/source/dataman/CacheMan.cpp @@ -28,6 +28,7 @@ int CacheMan::put(const void *a_data, json a_jmsg) int CacheItem::put(const void *a_data, json a_jmsg) { + if (!m_initialized) { init(a_jmsg); diff --git a/source/dataman/CacheMan.h b/source/dataman/CacheMan.h index 6cfb50a95..9668f014f 100644 --- a/source/dataman/CacheMan.h +++ b/source/dataman/CacheMan.h @@ -13,7 +13,7 @@ #include <queue> -#include "json.hpp" +#include <json.hpp> class CacheItem { diff --git a/source/dataman/DataManBase.h b/source/dataman/DataManBase.h index dd8a8ff97..8ea2db345 100644 --- a/source/dataman/DataManBase.h +++ b/source/dataman/DataManBase.h @@ -22,7 +22,7 @@ #include <unordered_map> #include <vector> -#include "json.hpp" +#include <json.hpp> class DataManBase { diff --git a/source/dataman/ZmqMan.cpp b/source/dataman/ZmqMan.cpp index 811baa1d5..1832ca14d 100644 --- a/source/dataman/ZmqMan.cpp +++ b/source/dataman/ZmqMan.cpp @@ -24,18 +24,19 @@ ZmqMan::~ZmqMan() int ZmqMan::init(json a_jmsg) { StreamMan::init(a_jmsg); - zmq_data = zmq_socket(zmq_context, ZMQ_PAIR); - std::string local_address = - make_address(m_local_ip, m_local_port + 1, "tcp"); - std::string remote_address = - make_address(m_remote_ip, m_remote_port + 1, "tcp"); if (m_stream_mode == "sender") { + zmq_data = zmq_socket(zmq_context, ZMQ_REQ); + std::string remote_address = + make_address(m_remote_ip, m_remote_port + 1, "tcp"); zmq_connect(zmq_data, remote_address.c_str()); logging("ZmqMan::init " + remote_address + " connected"); } else if (m_stream_mode == "receiver") { + zmq_data = zmq_socket(zmq_context, ZMQ_REP); + std::string local_address = + make_address(m_local_ip, m_local_port + 1, "tcp"); zmq_bind(zmq_data, local_address.c_str()); logging("ZmqMan::init " + local_address + " bound"); } @@ -44,10 +45,12 @@ int ZmqMan::init(json a_jmsg) int ZmqMan::put(const void *a_data, json a_jmsg) { - put_begin(a_data, a_jmsg); + char ret[10]; + DataManBase::put_begin(a_data, a_jmsg); StreamMan::put(a_data, a_jmsg); zmq_send(zmq_data, a_data, a_jmsg["sendbytes"].get<size_t>(), 0); - put_end(a_data, a_jmsg); + zmq_recv(zmq_data, ret, 10, 0); + DataManBase::put_end(a_data, a_jmsg); return 0; } @@ -60,6 +63,7 @@ void ZmqMan::on_recv(json a_jmsg) size_t sendbytes = a_jmsg["sendbytes"].get<size_t>(); std::vector<char> data(sendbytes); int ret = zmq_recv(zmq_data, data.data(), sendbytes, 0); + zmq_send(zmq_data, "OK", 10, 0); if (a_jmsg["compression_method"].is_string() and a_jmsg["compression_method"].get<std::string>() != "null") -- GitLab