diff --git a/cmake/GoogleTest.cmake b/cmake/GoogleTest.cmake new file mode 100644 index 0000000000000000000000000000000000000000..d13ae1dbcf1b54e9411bd3f22fa2125eac1838c0 --- /dev/null +++ b/cmake/GoogleTest.cmake @@ -0,0 +1,11 @@ +#------------------------------------------------------------------------------# +# Distributed under the OSI-approved Apache License, Version 2.0. See +# accompanying file Copyright.txt for details. +#------------------------------------------------------------------------------# + +# This module is already included in new versions of CMake +if(CMAKE_VERSION VERSION_LESS 3.10) + include(${CMAKE_CURRENT_LIST_DIR}/upstream/GoogleTest.cmake) +else() + include(${CMAKE_ROOT}/Modules/GoogleTest.cmake) +endif() diff --git a/cmake/upstream/GoogleTest.cmake b/cmake/upstream/GoogleTest.cmake new file mode 100644 index 0000000000000000000000000000000000000000..7415e068103305dca9321c4eda9f8ad48ecba501 --- /dev/null +++ b/cmake/upstream/GoogleTest.cmake @@ -0,0 +1,226 @@ +# Distributed under the OSI-approved BSD 3-Clause License. See accompanying +# file Copyright.txt or https://cmake.org/licensing for details. + +#[=======================================================================[.rst: +GoogleTest +---------- + +This module defines functions to help use the Google Test infrastructure. + +.. command:: gtest_add_tests + + Automatically add tests with CTest by scanning source code for Google Test + macros:: + + gtest_add_tests(TARGET target + [SOURCES src1...] + [EXTRA_ARGS arg1...] + [WORKING_DIRECTORY dir] + [TEST_PREFIX prefix] + [TEST_SUFFIX suffix] + [SKIP_DEPENDENCY] + [TEST_LIST outVar] + ) + + ``TARGET target`` + This must be a known CMake target. CMake will substitute the location of + the built executable when running the test. + + ``SOURCES src1...`` + When provided, only the listed files will be scanned for test cases. If + this option is not given, the :prop_tgt:`SOURCES` property of the + specified ``target`` will be used to obtain the list of sources. + + ``EXTRA_ARGS arg1...`` + Any extra arguments to pass on the command line to each test case. + + ``WORKING_DIRECTORY dir`` + Specifies the directory in which to run the discovered test cases. If this + option is not provided, the current binary directory is used. + + ``TEST_PREFIX prefix`` + Allows the specified ``prefix`` to be prepended to the name of each + discovered test case. This can be useful when the same source files are + being used in multiple calls to ``gtest_add_test()`` but with different + ``EXTRA_ARGS``. + + ``TEST_SUFFIX suffix`` + Similar to ``TEST_PREFIX`` except the ``suffix`` is appended to the name of + every discovered test case. Both ``TEST_PREFIX`` and ``TEST_SUFFIX`` can be + specified. + + ``SKIP_DEPENDENCY`` + Normally, the function creates a dependency which will cause CMake to be + re-run if any of the sources being scanned are changed. This is to ensure + that the list of discovered tests is updated. If this behavior is not + desired (as may be the case while actually writing the test cases), this + option can be used to prevent the dependency from being added. + + ``TEST_LIST outVar`` + The variable named by ``outVar`` will be populated in the calling scope + with the list of discovered test cases. This allows the caller to do things + like manipulate test properties of the discovered tests. + + .. code-block:: cmake + + include(GoogleTest) + add_executable(FooTest FooUnitTest.cxx) + gtest_add_tests(TARGET FooTest + TEST_SUFFIX .noArgs + TEST_LIST noArgsTests + ) + gtest_add_tests(TARGET FooTest + EXTRA_ARGS --someArg someValue + TEST_SUFFIX .withArgs + TEST_LIST withArgsTests + ) + set_tests_properties(${noArgsTests} PROPERTIES TIMEOUT 10) + set_tests_properties(${withArgsTests} PROPERTIES TIMEOUT 20) + + For backward compatibility reasons, the following form is also supported:: + + gtest_add_tests(exe args files...) + + ``exe`` + The path to the test executable or the name of a CMake target. + ``args`` + A ;-list of extra arguments to be passed to executable. The entire + list must be passed as a single argument. Enclose it in quotes, + or pass ``""`` for no arguments. + ``files...`` + A list of source files to search for tests and test fixtures. + Alternatively, use ``AUTO`` to specify that ``exe`` is the name + of a CMake executable target whose sources should be scanned. + + .. code-block:: cmake + + include(GoogleTest) + set(FooTestArgs --foo 1 --bar 2) + add_executable(FooTest FooUnitTest.cxx) + gtest_add_tests(FooTest "${FooTestArgs}" AUTO) + +#]=======================================================================] + +function(gtest_add_tests) + + if (ARGC LESS 1) + message(FATAL_ERROR "No arguments supplied to gtest_add_tests()") + endif() + + set(options + SKIP_DEPENDENCY + ) + set(oneValueArgs + TARGET + WORKING_DIRECTORY + TEST_PREFIX + TEST_SUFFIX + TEST_LIST + ) + set(multiValueArgs + SOURCES + EXTRA_ARGS + ) + set(allKeywords ${options} ${oneValueArgs} ${multiValueArgs}) + + unset(sources) + if("${ARGV0}" IN_LIST allKeywords) + cmake_parse_arguments(ARGS "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN}) + set(autoAddSources YES) + else() + # Non-keyword syntax, convert to keyword form + if (ARGC LESS 3) + message(FATAL_ERROR "gtest_add_tests() without keyword options requires at least 3 arguments") + endif() + set(ARGS_TARGET "${ARGV0}") + set(ARGS_EXTRA_ARGS "${ARGV1}") + if(NOT "${ARGV2}" STREQUAL "AUTO") + set(ARGS_SOURCES "${ARGV}") + list(REMOVE_AT ARGS_SOURCES 0 1) + endif() + endif() + + # The non-keyword syntax allows the first argument to be an arbitrary + # executable rather than a target if source files are also provided. In all + # other cases, both forms require a target. + if(NOT TARGET "${ARGS_TARGET}" AND NOT ARGS_SOURCES) + message(FATAL_ERROR "${ARGS_TARGET} does not define an existing CMake target") + endif() + if(NOT ARGS_WORKING_DIRECTORY) + unset(workDir) + else() + set(workDir WORKING_DIRECTORY "${ARGS_WORKING_DIRECTORY}") + endif() + + if(NOT ARGS_SOURCES) + get_property(ARGS_SOURCES TARGET ${ARGS_TARGET} PROPERTY SOURCES) + endif() + + unset(testList) + + set(gtest_case_name_regex ".*\\( *([A-Za-z_0-9]+) *, *([A-Za-z_0-9]+) *\\).*") + set(gtest_test_type_regex "(TYPED_TEST|TEST_?[FP]?)") + + foreach(source IN LISTS ARGS_SOURCES) + if(NOT ARGS_SKIP_DEPENDENCY) + set_property(DIRECTORY APPEND PROPERTY CMAKE_CONFIGURE_DEPENDS ${source}) + endif() + file(READ "${source}" contents) + string(REGEX MATCHALL "${gtest_test_type_regex} *\\(([A-Za-z_0-9 ,]+)\\)" found_tests ${contents}) + foreach(hit ${found_tests}) + string(REGEX MATCH "${gtest_test_type_regex}" test_type ${hit}) + + # Parameterized tests have a different signature for the filter + if("x${test_type}" STREQUAL "xTEST_P") + string(REGEX REPLACE ${gtest_case_name_regex} "*/\\1.\\2/*" gtest_test_name ${hit}) + elseif("x${test_type}" STREQUAL "xTEST_F" OR "x${test_type}" STREQUAL "xTEST") + string(REGEX REPLACE ${gtest_case_name_regex} "\\1.\\2" gtest_test_name ${hit}) + elseif("x${test_type}" STREQUAL "xTYPED_TEST") + string(REGEX REPLACE ${gtest_case_name_regex} "\\1/*.\\2" gtest_test_name ${hit}) + else() + message(WARNING "Could not parse GTest ${hit} for adding to CTest.") + continue() + endif() + + # Make sure tests disabled in GTest get disabled in CTest + if(gtest_test_name MATCHES "(^|\\.)DISABLED_") + # Add the disabled test if CMake is new enough + # Note that this check is to allow backwards compatibility so this + # module can be copied locally in projects to use with older CMake + # versions + if(CMAKE_VERSION VERSION_GREATER_EQUAL 3.8.20170401) + string(REGEX REPLACE + "(^|\\.)DISABLED_" "\\1" + orig_test_name "${gtest_test_name}" + ) + set(ctest_test_name + ${ARGS_TEST_PREFIX}${orig_test_name}${ARGS_TEST_SUFFIX} + ) + add_test(NAME ${ctest_test_name} + ${workDir} + COMMAND ${ARGS_TARGET} + --gtest_also_run_disabled_tests + --gtest_filter=${gtest_test_name} + ${ARGS_EXTRA_ARGS} + ) + set_tests_properties(${ctest_test_name} PROPERTIES DISABLED TRUE) + list(APPEND testList ${ctest_test_name}) + endif() + else() + set(ctest_test_name ${ARGS_TEST_PREFIX}${gtest_test_name}${ARGS_TEST_SUFFIX}) + add_test(NAME ${ctest_test_name} + ${workDir} + COMMAND ${ARGS_TARGET} + --gtest_filter=${gtest_test_name} + ${ARGS_EXTRA_ARGS} + ) + list(APPEND testList ${ctest_test_name}) + endif() + endforeach() + endforeach() + + if(ARGS_TEST_LIST) + set(${ARGS_TEST_LIST} ${testList} PARENT_SCOPE) + endif() + +endfunction() diff --git a/examples/heatTransfer/write/CMakeLists.txt b/examples/heatTransfer/write/CMakeLists.txt index 763a151aa2206afee7aa56a95e05be48744cb2fb..5034755f77f8c54ea8e46f98c650624b17b0b01b 100644 --- a/examples/heatTransfer/write/CMakeLists.txt +++ b/examples/heatTransfer/write/CMakeLists.txt @@ -52,4 +52,45 @@ if(ADIOS_USE_MPI) ${MPI_C_LIBRARIES} ${HDF5_C_LIBRARIES} ) endif() + + + if(ADIOS_USE_HDF5) + find_package(HDF5 REQUIRED) + find_package(MPI COMPONENTS C REQUIRED) + + add_executable(heatTransfer_write_ph5 + main.cpp + HeatTransfer.cpp + Settings.cpp + IO_ph5.cpp + ) + target_include_directories(heatTransfer_write_ph5 + PRIVATE ${MPI_C_INCLUDE_PATH} ${HDF5_C_INCLUDE_DIRS} + ) + target_link_libraries(heatTransfer_write_ph5 + ${MPI_C_LIBRARIES} ${HDF5_C_LIBRARIES} + ) + endif() + + + if(ADIOS_USE_HDF5) + find_package(MPI COMPONENTS C REQUIRED) + + add_executable(heatTransfer_write_a2h5 + main.cpp + HeatTransfer.cpp + Settings.cpp + IO_ph5_adios2.cpp + ) + + target_include_directories(heatTransfer_write_a2h5 + PRIVATE ${MPI_C_INCLUDE_PATH} + ) + #target_link_libraries(heatTransfer_write_a2h5 + # ${MPI_C_LIBRARIES} + #) + target_link_libraries(heatTransfer_write_a2h5 PUBLIC adios2) + + endif() + endif() diff --git a/examples/heatTransfer/write/IO_adios1.cpp b/examples/heatTransfer/write/IO_adios1.cpp index 5e0a0e41b635863e92071c9beb97656d4113662d..caba2796b3d8206d95606b6f201ce2b571dea3eb 100644 --- a/examples/heatTransfer/write/IO_adios1.cpp +++ b/examples/heatTransfer/write/IO_adios1.cpp @@ -16,6 +16,11 @@ #include <adios.h> +// Enable compatibility with ADIOS 1.10 adios_declare_group signature +#if !ADIOS_VERSION_GE(1, 11, 0) +#define adios_stat_default adios_flag_yes +#endif + static int64_t group; static int rank_saved; diff --git a/examples/heatTransfer/write/IO_ph5.cpp b/examples/heatTransfer/write/IO_ph5.cpp new file mode 100644 index 0000000000000000000000000000000000000000..674b5388761cefa4c4203374522f16105199038a --- /dev/null +++ b/examples/heatTransfer/write/IO_ph5.cpp @@ -0,0 +1,233 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * IO_ADIOS2.cpp + * + * Created on: Feb 2017 + * Author: Norbert Podhorszki + */ + +#include "IO.h" + +#include <hdf5.h> +#include <ios> +#include <iostream> +#include <memory> +#include <stdexcept> +#include <string> + +class HDF5NativeWriter +{ + +public: + HDF5NativeWriter(const std::string &fileName); + ~HDF5NativeWriter(); + + bool Advance(); + void Close(); + void CheckWriteGroup(); + + void WriteScalar(const std::string &varName, const void *data, + hid_t h5Type); + void WriteSimple(const std::string &varName, int dimSize, const void *data, + hid_t h5Type, const hsize_t *shape, const hsize_t *offset, + const hsize_t *count); + + int m_CurrentTimeStep; + unsigned int m_TotalTimeSteps; + +private: + hid_t m_FilePropertyListId; + hid_t m_FileId; + hid_t m_GroupId; +}; + +HDF5NativeWriter::HDF5NativeWriter(const std::string &fileName) +: m_CurrentTimeStep(0), m_TotalTimeSteps(0) +{ + m_FilePropertyListId = H5Pcreate(H5P_FILE_ACCESS); + + // read a file collectively + H5Pset_fapl_mpio(m_FilePropertyListId, MPI_COMM_WORLD, MPI_INFO_NULL); + + m_FileId = H5Fcreate(fileName.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT, + m_FilePropertyListId); + + if (m_FileId < 0) + { + throw std::runtime_error("Unable to open " + fileName + " for reading"); + } + + std::string ts0 = "/TimeStep0"; + + m_GroupId = H5Gcreate2(m_FileId, ts0.c_str(), H5P_DEFAULT, H5P_DEFAULT, + H5P_DEFAULT); + if (m_GroupId < 0) + { + throw std::runtime_error("HDF5: Unable to create group " + ts0); + } +} + +HDF5NativeWriter::~HDF5NativeWriter() { Close(); } + +void HDF5NativeWriter::Close() +{ + if (m_FileId < 0) + return; + + hid_t s = H5Screate(H5S_SCALAR); + hid_t attr = H5Acreate(m_FileId, "NumTimeSteps", H5T_NATIVE_UINT, s, + H5P_DEFAULT, H5P_DEFAULT); + uint totalTimeSteps = m_CurrentTimeStep + 1; + + if (m_GroupId < 0) + { + totalTimeSteps = m_CurrentTimeStep; + } + H5Awrite(attr, H5T_NATIVE_UINT, &totalTimeSteps); + H5Sclose(s); + H5Aclose(attr); + + if (m_GroupId >= 0) + { + H5Gclose(m_GroupId); + m_GroupId = -1; + } + + H5Fclose(m_FileId); + m_FileId = -1; + H5Pclose(m_FilePropertyListId); +} + +bool HDF5NativeWriter::Advance() +{ + if (m_GroupId >= 0) + { + H5Gclose(m_GroupId); + m_GroupId = -1; + } + + ++m_CurrentTimeStep; + + return true; +} + +void HDF5NativeWriter::CheckWriteGroup() +{ + if (m_GroupId >= 0) + { + return; + } + + std::string timeStepName = "/TimeStep" + std::to_string(m_CurrentTimeStep); + m_GroupId = H5Gcreate2(m_FileId, timeStepName.c_str(), H5P_DEFAULT, + H5P_DEFAULT, H5P_DEFAULT); + if (m_GroupId < 0) + { + throw std::runtime_error("HDF5: Unable to create group " + + timeStepName); + } +} + +void HDF5NativeWriter::WriteScalar(const std::string &varName, const void *data, + hid_t h5Type) +{ + CheckWriteGroup(); + // scalar + hid_t filespaceID = H5Screate(H5S_SCALAR); + hid_t dsetID = H5Dcreate(m_GroupId, varName.c_str(), h5Type, filespaceID, + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); + herr_t status = + H5Dwrite(dsetID, h5Type, H5S_ALL, H5S_ALL, H5P_DEFAULT, data); + + H5Sclose(filespaceID); + H5Dclose(dsetID); +} + +void HDF5NativeWriter::WriteSimple(const std::string &varName, int dimSize, + const void *data, hid_t h5Type, + const hsize_t *shape, const hsize_t *offset, + const hsize_t *count) +{ + CheckWriteGroup(); + hid_t fileSpace = H5Screate_simple(dimSize, shape, NULL); + + hid_t dsetID = H5Dcreate(m_GroupId, varName.c_str(), h5Type, fileSpace, + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); + + hid_t memSpace = H5Screate_simple(dimSize, count, NULL); + + // Select hyperslab + fileSpace = H5Dget_space(dsetID); + H5Sselect_hyperslab(fileSpace, H5S_SELECT_SET, offset, NULL, count, NULL); + + // Create property list for collective dataset write. + + hid_t plistID = H5Pcreate(H5P_DATASET_XFER); + H5Pset_dxpl_mpio(plistID, H5FD_MPIO_COLLECTIVE); + + herr_t status; + + status = H5Dwrite(dsetID, h5Type, memSpace, fileSpace, plistID, data); + + if (status < 0) + { + // error + std::cerr << " Write failed. " << std::endl; + } + + H5Dclose(dsetID); + H5Sclose(fileSpace); + H5Sclose(memSpace); + H5Pclose(plistID); +} + +// +// +std::shared_ptr<HDF5NativeWriter> h5writer; +// HDF5NativeWriter* h5writer; + +IO::IO(const Settings &s, MPI_Comm comm) +{ + m_outputfilename = s.outputfile + ".h5"; + + if (s.outputfile[0] == '0') + { + std::cout << " no writer. " << std::endl; + h5writer = nullptr; + return; + } + h5writer = std::make_shared<HDF5NativeWriter>(m_outputfilename); + + if (h5writer == nullptr) + throw std::ios_base::failure("ERROR: failed to open ADIOS h5writer\n"); +} + +IO::~IO() +{ + if (h5writer != nullptr) + { + h5writer->Close(); + } + // delete h5writer; +} + +void IO::write(int step, const HeatTransfer &ht, const Settings &s, + MPI_Comm comm) +{ + if (h5writer == nullptr) + { + return; + } + std::vector<hsize_t> dims = {s.gndx, s.gndy}; + std::vector<hsize_t> offset = {s.offsx, s.offsy}; + std::vector<hsize_t> count = {s.ndx, s.ndy}; + + h5writer->WriteSimple("T", 2, ht.data_noghost().data(), H5T_NATIVE_DOUBLE, + dims.data(), offset.data(), count.data()); + h5writer->WriteScalar("gndy", &(s.gndy), H5T_NATIVE_UINT); + h5writer->WriteScalar("gndx", &(s.gndx), H5T_NATIVE_UINT); + + h5writer->Advance(); +} diff --git a/examples/heatTransfer/write/IO_ph5_adios2.cpp b/examples/heatTransfer/write/IO_ph5_adios2.cpp new file mode 100644 index 0000000000000000000000000000000000000000..e694d9f291698cbb51f4bc8b0e304e32d6d90e1f --- /dev/null +++ b/examples/heatTransfer/write/IO_ph5_adios2.cpp @@ -0,0 +1,120 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * IO_ADIOS2.cpp + * + * Created on: Feb 2017 + * Author: Norbert Podhorszki + */ + +#include "IO.h" + +#include <string> + +#include <adios2.h> + +static int rank_saved; +adios::ADIOS *ad = nullptr; +std::shared_ptr<adios::Engine> h5writer; +adios::Variable<double> *varT = nullptr; +adios::Variable<unsigned int> *varGndx = nullptr; + +IO::IO(const Settings &s, MPI_Comm comm) +{ + rank_saved = s.rank; + m_outputfilename = s.outputfile + ".h5"; + // adios::ADIOS adios(comm, adios::Verbose::INFO, false); + ad = new adios::ADIOS(comm, adios::Verbose::INFO, false); + + // Define method for engine creation + // 1. Get method def from config file or define new one + + adios::Method &h5writerSettings = ad->DeclareMethod("output"); + if (!h5writerSettings.IsUserDefined()) + { + // if not defined by user, we can change the default settings + // BPFileWriter is the default engine + h5writerSettings.SetEngine("HDF5Writer"); + // Allow an extra thread for data processing + + const std::string aggregatorsParam("Aggregators=" + + std::to_string((s.nproc + 1) / 2)); + h5writerSettings.SetParameters("have_metadata_file=yes", + aggregatorsParam); + } + + // ad->DefineScalar<unsigned int>("gndx", true); + varGndx = &(ad->DefineVariable<unsigned int>("gndx")); + ad->DefineVariable<unsigned int>("gndy"); + + // define T as 2D global array + varT = &(ad->DefineArray<double>( + "T", + // Global dimensions + {s.gndx, s.gndy}, + // starting offset of the local array in the global space + {s.offsx, s.offsy}, + // local size, could be defined later using SetSelection() + {s.ndx, s.ndy})); + + // add transform to variable + // adios::Transform tr = adios::transform::BZIP2( ); + // varT.AddTransform( tr, "" ); + // varT.AddTransform( tr,"accuracy=0.001" ); // for ZFP + + h5writer = ad->Open(m_outputfilename, "w", comm, h5writerSettings); + + if (h5writer == nullptr) + throw std::ios_base::failure("ERROR: failed to open ADIOS h5writer\n"); +} + +IO::~IO() +{ + h5writer->Close(); + // delete ad; +} + +void IO::write(int step, const HeatTransfer &ht, const Settings &s, + MPI_Comm comm) +{ +#if 1 + + /* This selection is redundant and not required, since we defined + * the selection already in DefineVariable(). It is here just as an example. + */ + // Make a selection to describe the local dimensions of the variable we + // write and its offsets in the global spaces. This could have been done in + // adios.DefineVariable() + // adios::SelectionBoundingBox sel({s.offsx, s.offsy}, {s.ndx, s.ndy}); + // varT->SetSelection(sel); + + /* Select the area that we want to write from the data pointer we pass to + the + writer. + Think HDF5 memspace, just not hyperslabs, only a bounding box selection. + Engine will copy this bounding box from the data pointer into the output + buffer. + Size of the bounding box should match the "space" selection which was + given + above. + Default memspace is always the full selection. + */ + // adios::SelectionBoundingBox memspace = + // adios::SelectionBoundingBox({1, 1}, {s.ndx, s.ndy}); + // varT->SetMemorySelection(memspace); + + h5writer->Write<double>(*varT, ht.data_noghost().data()); + // h5writer->Write(*varT, ht.data_noghost().data()); + h5writer->Write<unsigned int>(*varGndx, &(s.gndx)); + h5writer->Write("gndy", &(s.gndy)); + + h5writer->Advance(); + +#else + + h5writer->Write<double>(*varT, ht.data_noghost().data()); + h5writer->Advance(); + +#endif +} diff --git a/examples/hello/bpWriter/config.xml b/examples/hello/bpWriter/config.xml new file mode 100644 index 0000000000000000000000000000000000000000..10dfa830e33baed86ec3c2667f0bd6b67b2c8aa1 --- /dev/null +++ b/examples/hello/bpWriter/config.xml @@ -0,0 +1,41 @@ +<?xml version="1.0"?> +<adios-config> + + <io name="Output"> + <engine name="BPFileWriter">verbose=4;profile_units=mus</engine> + <transport name= "File"> + profile_units=mus; + abort_on_error; + have_metadata_file + = + no; + </transport> + + <!-- Create a named transform and add variables to it here. + name is optional, required only if it is used outside the definition + options is optional to pass parameters to the transformation + --> + <transform name="LossyCompression" transform="zfp" options="accuracy=0.001"> + <var name="myMatrix"/> + <var name="ThisVarDoesNotExists"/> + </transform> + + <!-- Unnamed transformation --> + <transform transform="bzip2"> + <var name="myMatrix2"/> + </transform> + + <!-- A variable can have its own private transform definition. + Also its own ordered chain of transformations. + Also can refer to a transform defined previously + --> + <var name="myDoubles"> + <transform transform="identity">verbose=DEBUG</transform> + <transform name="LossyCompression"/> + </var> + + <buffer max-size-MB="20"/> + </io> + +</adios-config> + diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index 5efe3befa37c7c2be734f799996a0d87bfbbac96..30c544588c2e48b2d3e6aedbcbd9a7ee0488d3f3 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -85,9 +85,9 @@ endif() if(ADIOS_USE_ADIOS1) if(ADIOS_USE_MPI) - find_package(ADIOS1 REQUIRED) + find_package(ADIOS1 1.12.0 REQUIRED) else() - find_package(ADIOS1 COMPONENTS sequential REQUIRED) + find_package(ADIOS1 1.12.0 COMPONENTS sequential REQUIRED) endif() target_sources(adios2 PRIVATE @@ -113,7 +113,11 @@ if(ADIOS_USE_HDF5) "of ADIOS is being built, which requires a sequential HDF5." ) endif() - target_include_directories(adios2 PRIVATE ${HDF5_C_INCLUDE_DIRS}) + if(HDF5_C_INCLUDE_DIRS) + target_include_directories(adios2 PRIVATE ${HDF5_C_INCLUDE_DIRS}) + else() + target_include_directories(adios2 PRIVATE ${HDF5_INCLUDE_DIRS}) + endif() target_sources(adios2 PRIVATE engine/hdf5/HDF5ReaderP.cpp engine/hdf5/HDF5WriterP.cpp diff --git a/source/adios2/engine/adios1/ADIOS1Writer.cpp b/source/adios2/engine/adios1/ADIOS1Writer.cpp index bd65055de7f6e007d56c534ece65678f2b28cf79..a6108fe3f956a7f639bd8e13656bdc2c603d4e79 100644 --- a/source/adios2/engine/adios1/ADIOS1Writer.cpp +++ b/source/adios2/engine/adios1/ADIOS1Writer.cpp @@ -14,6 +14,11 @@ #include "adios2/helper/adiosFunctions.h" +// Enable compatibility with ADIOS 1.10 adios_declare_group signature +#if !ADIOS_VERSION_GE(1, 11, 0) +#define adios_stat_default adios_flag_yes +#endif + namespace adios { diff --git a/source/adios2/engine/hdf5/HDF5Common.cpp b/source/adios2/engine/hdf5/HDF5Common.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d628689c7d6a3b380e1b782f6ed331cd473fe2b9 --- /dev/null +++ b/source/adios2/engine/hdf5/HDF5Common.cpp @@ -0,0 +1,227 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * HDF5Common.cpp + * + * Created on: April 20, 2017 + * Author: Junmin + */ + +#include "HDF5Common.h" + +#include <complex> +#include <iostream> + +#include "adios2/ADIOSMPI.h" + +namespace adios +{ + +#define H5_ERROR std::cout << "[ADIOS H5 ERROR] " + +HDF5Common::HDF5Common() +: m_WriteMode(false), m_CurrentTimeStep(0), m_NumTimeSteps(0) +{ + m_DefH5TypeComplexFloat = + H5Tcreate(H5T_COMPOUND, sizeof(std::complex<float>)); + H5Tinsert(m_DefH5TypeComplexFloat, "freal", 0, H5T_NATIVE_FLOAT); + H5Tinsert(m_DefH5TypeComplexFloat, "fimg", H5Tget_size(H5T_NATIVE_FLOAT), + H5T_NATIVE_FLOAT); + + m_DefH5TypeComplexDouble = + H5Tcreate(H5T_COMPOUND, sizeof(std::complex<double>)); + H5Tinsert(m_DefH5TypeComplexDouble, "dreal", 0, H5T_NATIVE_DOUBLE); + H5Tinsert(m_DefH5TypeComplexDouble, "dimg", H5Tget_size(H5T_NATIVE_DOUBLE), + H5T_NATIVE_DOUBLE); + + m_DefH5TypeComplexLongDouble = + H5Tcreate(H5T_COMPOUND, sizeof(std::complex<long double>)); + H5Tinsert(m_DefH5TypeComplexLongDouble, "ldouble real", 0, + H5T_NATIVE_LDOUBLE); + H5Tinsert(m_DefH5TypeComplexLongDouble, "ldouble img", + H5Tget_size(H5T_NATIVE_LDOUBLE), H5T_NATIVE_LDOUBLE); +} + +void HDF5Common::Init(const std::string name, MPI_Comm comm, bool toWrite) +{ + m_WriteMode = toWrite; + + // + m_PropertyListId = H5Pcreate(H5P_FILE_ACCESS); + +#ifdef ADIOS2_HAVE_MPI + H5Pset_fapl_mpio(m_PropertyListId, comm, MPI_INFO_NULL); +#endif + + std::string ts0 = "/TimeStep0"; + + if (toWrite) + { + /* + * Create a new file collectively and release property list identifier. + */ + m_FileId = H5Fcreate(name.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT, + m_PropertyListId); + if (m_FileId >= 0) + { + m_GroupId = H5Gcreate2(m_FileId, ts0.c_str(), H5P_DEFAULT, + H5P_DEFAULT, H5P_DEFAULT); + if (m_GroupId < 0) + { + std::clog << "H5Common::Init: ERROR" << std::endl; + throw std::runtime_error("HDF5: Unable to create group " + ts0); + } + } + } + else + { + // read a file collectively + m_FileId = H5Fopen(name.c_str(), H5F_ACC_RDONLY, H5P_DEFAULT); + if (m_FileId >= 0) + { + m_GroupId = H5Gopen(m_FileId, ts0.c_str(), H5P_DEFAULT); + } + } + + H5Pclose(m_PropertyListId); +} + +void HDF5Common::WriteTimeSteps() +{ + if (m_FileId < 0) + { + // std::cerr<<"[ADIOS HDF5Error]: Invalid file to record timestep + // to."<<std::endl; + H5_ERROR << "Invalid file to record timestep to." << std::endl; + return; + } + + if (!m_WriteMode) + { + return; + } + + hid_t s = H5Screate(H5S_SCALAR); + + hid_t attr = H5Acreate(m_FileId, "NumTimeSteps", H5T_NATIVE_UINT, s, + H5P_DEFAULT, H5P_DEFAULT); + uint totalTimeSteps = m_CurrentTimeStep + 1; + + if (m_GroupId < 0) + { + totalTimeSteps = m_CurrentTimeStep; + } + + H5Awrite(attr, H5T_NATIVE_UINT, &totalTimeSteps); + + H5Sclose(s); + H5Aclose(attr); +} + +unsigned int HDF5Common::GetNumTimeSteps() +{ + if (m_WriteMode) + { + return -1; + } + + if (m_FileId < 0) + { + std::cerr + << "[ADIOS HDF5Error]: Invalid file to read timestep attribute." + << std::endl; + return -1; + } + + if (m_NumTimeSteps <= 0) + { + hid_t attr = H5Aopen(m_FileId, "NumTimeSteps", H5P_DEFAULT); + + H5Aread(attr, H5T_NATIVE_UINT, &m_NumTimeSteps); + H5Aclose(attr); + } + + return m_NumTimeSteps; +} + +void HDF5Common::Close() +{ + if (m_FileId < 0) + { + return; + } + + WriteTimeSteps(); + + if (m_GroupId >= 0) + { + H5Gclose(m_GroupId); + } + + H5Fclose(m_FileId); + m_FileId = -1; + m_GroupId = -1; + + H5Tclose(m_DefH5TypeComplexFloat); + H5Tclose(m_DefH5TypeComplexDouble); + H5Tclose(m_DefH5TypeComplexLongDouble); +} + +void HDF5Common::Advance() +{ + if (m_GroupId >= 0) + { + H5Gclose(m_GroupId); + m_GroupId = -1; + } + + if (m_WriteMode) + { + // m_GroupId = H5Gcreate2(m_FileId, tsname.c_str(), H5P_DEFAULT, + // H5P_DEFAULT, H5P_DEFAULT); + } + else + { + if (m_NumTimeSteps == 0) + { + GetNumTimeSteps(); + } + if (m_CurrentTimeStep + 1 >= m_NumTimeSteps) + { + return; + } + + std::string timeStepName = + "/TimeStep" + std::to_string(m_CurrentTimeStep + 1); + m_GroupId = H5Gopen(m_FileId, timeStepName.c_str(), H5P_DEFAULT); + if (m_GroupId < 0) + { + throw std::runtime_error("HDF5: Unable to open group " + + timeStepName); + } + } + ++m_CurrentTimeStep; +} + +void HDF5Common::CheckWriteGroup() +{ + if (!m_WriteMode) + { + return; + } + if (m_GroupId >= 0) + { + return; + } + + std::string timeStepName = "/TimeStep" + std::to_string(m_CurrentTimeStep); + m_GroupId = H5Gcreate2(m_FileId, timeStepName.c_str(), H5P_DEFAULT, + H5P_DEFAULT, H5P_DEFAULT); + if (m_GroupId < 0) + { + throw std::runtime_error("HDF5: Unable to create group " + + timeStepName); + } +} +} diff --git a/source/dataman/CacheMan.cpp b/source/dataman/CacheMan.cpp index f53cfaa6d56adcb4fa0fa0129bd8bde0ffc950fe..d18c3eb140a278e2b8bccd502948ae0dae5842fa 100644 --- a/source/dataman/CacheMan.cpp +++ b/source/dataman/CacheMan.cpp @@ -19,14 +19,14 @@ int CacheItem::init(json a_jmsg) return 0; } -int CacheMan::put(const void *a_data, json a_jmsg) +int CacheMan::put(const void *a_data, json &a_jmsg) { std::string doid = a_jmsg["doid"].get<std::string>(); std::string var = a_jmsg["var"].get<std::string>(); return m_cache[doid][var].put(a_data, a_jmsg); } -int CacheItem::put(const void *a_data, json a_jmsg) +int CacheItem::put(const void *a_data, json &a_jmsg) { if (!m_initialized) diff --git a/source/dataman/CacheMan.h b/source/dataman/CacheMan.h index 9668f014f3f484fec86f0660851a5bbff33d45a6..0aaaacdef36ce02b169a81adac4bdfc7b7caa9a5 100644 --- a/source/dataman/CacheMan.h +++ b/source/dataman/CacheMan.h @@ -21,7 +21,7 @@ public: using json = nlohmann::json; int init(json a_jmsg); - virtual int put(const void *a_data, json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} void *get(); @@ -81,7 +81,7 @@ class CacheMan public: using json = nlohmann::json; - int put(const void *a_data, json a_jmsg); + int put(const void *a_data, json &a_jmsg); void *get(std::string doid, std::string var); void pop(); void push(); diff --git a/source/dataman/DataMan.cpp b/source/dataman/DataMan.cpp index 8bc55fcb79f8391c2b8f12e6a01c331fb084d214..d3ba9a83fc147368a1cf0409a54cb2a9970b37d0 100644 --- a/source/dataman/DataMan.cpp +++ b/source/dataman/DataMan.cpp @@ -12,16 +12,19 @@ int DataMan::init(json a_jmsg) { return 0; } -int DataMan::put(const void *a_data, std::string p_doid, std::string p_var, - std::string p_dtype, std::vector<size_t> p_putshape, - std::vector<size_t> p_varshape, std::vector<size_t> p_offset, - size_t p_timestep, int p_tolerance, int p_priority) +int DataMan::put_streams(const void *a_data, json &a_jmsg) { - return DataMan::put(a_data, p_doid, p_var, p_dtype, p_putshape, p_varshape, - p_offset, p_timestep, p_tolerance, p_priority); + a_jmsg["channel_id"] = m_stream_index; + m_stream_mans[m_stream_index]->put(a_data, a_jmsg); + ++m_stream_index; + if (m_stream_index >= m_stream_mans.size()) + { + m_stream_index = 0; + } + return 0; } -int DataMan::put(const void *a_data, json a_jmsg) +int DataMan::put(const void *a_data, json &a_jmsg) { a_jmsg["timestep"] = m_timestep; if (m_cache_size > 0) @@ -32,9 +35,10 @@ int DataMan::put(const void *a_data, json a_jmsg) else { put_begin(a_data, a_jmsg); + put_streams(a_data, a_jmsg); put_end(a_data, a_jmsg); } - + dump_profiling(); return 0; } @@ -57,32 +61,34 @@ void DataMan::add_stream(json a_jmsg) m_cache_size = a_jmsg["cachesize"].get<size_t>(); } - if (m_tolerance.size() < m_num_channels) + int num_channels = 1; + + if (a_jmsg["num_channels"].is_number()) { - for (int i = 0; i < m_num_channels; ++i) - { - m_tolerance.push_back(0); - } + num_channels = a_jmsg["num_channels"].get<int>(); } - if (m_priority.size() < m_num_channels) + else { - for (int i = 0; i < m_num_channels; ++i) - { - m_priority.push_back(100 / (i + 1)); - } + a_jmsg["num_channels"] = num_channels; } - auto man = get_man(method); - if (man) + for (int i = 0; i < num_channels; i++) { - man->init(a_jmsg); - this->add_next(method, man); - } - if (a_jmsg["compression_method"].is_string()) - { - if (a_jmsg["compression_method"] != "null") + a_jmsg["channel_id"] = i; + a_jmsg["local_port"] = a_jmsg["local_port"].get<int>() + 2; + a_jmsg["remote_port"] = a_jmsg["remote_port"].get<int>() + 2; + auto man = get_man(method); + if (man) { - add_man_to_path(a_jmsg["compression_method"], method, a_jmsg); + std::cout << a_jmsg.dump(4); + man->init(a_jmsg); + m_stream_mans.push_back(man); + } + if (a_jmsg["compression_method"].is_string()) + { + if (a_jmsg["compression_method"] != "null") + { + } } } } @@ -104,10 +110,10 @@ void DataMan::flush() { json jmsg = m_cache.get_jmsg(j, k); put_begin(m_cache.get(j, k), jmsg); + put_streams(m_cache.get(j, k), jmsg); put_end(m_cache.get(j, k), jmsg); } } - flush_next(); m_cache.pop(); } } @@ -116,10 +122,6 @@ void DataMan::flush() m_cache.push(); } } - else - { - flush_next(); - } } int DataMan::get(void *a_data, json &a_jmsg) { return 0; } diff --git a/source/dataman/DataMan.h b/source/dataman/DataMan.h index 20ccdc63a60545087130e0d59974321bbc10b919..f37b86d3b08165683889cf29e0a53c2fb775cd26 100644 --- a/source/dataman/DataMan.h +++ b/source/dataman/DataMan.h @@ -19,31 +19,22 @@ class DataMan : public DataManBase public: DataMan() = default; virtual ~DataMan() = default; - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); - virtual int get(void *p_data, json &p_jmsg); + virtual int init(json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); + virtual int get(void *a_data, json &a_jmsg); + int put_streams(const void *a_data, json &a_jmsg); void flush(); - void add_stream(json p_jmsg); - int put(const void *p_data, std::string p_doid, std::string p_var, - std::string p_dtype, std::vector<size_t> p_putshape, - std::vector<size_t> p_varshape, std::vector<size_t> p_offset, - size_t p_timestep, int p_tolerance = 0, int p_priority = 100); + void add_stream(json a_jmsg); void add_file(std::string p_method); std::string name() { return "DataManager"; } std::string type() { return "Manager"; } virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} private: - std::string m_local_ip; - std::string m_remote_ip; - int m_local_port = 0; - int m_remote_port = 0; - int m_num_channels = 0; - std::vector<int> m_tolerance; - std::vector<int> m_priority; CacheMan m_cache; size_t m_cache_size = 0; size_t m_timestep = 0; + int m_stream_index = 0; }; -#endif /* DATAMAN_H_ */ +#endif // DATAMAN_DATAMAN_H_ diff --git a/source/dataman/DataManBase.cpp b/source/dataman/DataManBase.cpp index 3012da8bba660730e131ef71e1228beb5502776c..5d498e31cd1758609dc201bf11d0002853332297 100644 --- a/source/dataman/DataManBase.cpp +++ b/source/dataman/DataManBase.cpp @@ -106,27 +106,6 @@ DataManBase::DataManBase() m_start_time = std::chrono::system_clock::now(); } -int DataManBase::put(const void *p_data, std::string p_doid, std::string p_var, - std::string p_dtype, std::vector<size_t> p_putshape, - std::vector<size_t> p_varshape, - std::vector<size_t> p_offset, size_t p_timestep, - int p_tolerance, int p_priority) -{ - json msg; - msg["doid"] = p_doid; - msg["var"] = p_var; - msg["dtype"] = p_dtype; - msg["putshape"] = p_putshape; - msg["putbytes"] = product(p_putshape, dsize(p_dtype)); - msg["varshape"] = p_varshape; - msg["varbytes"] = product(p_varshape, dsize(p_dtype)); - msg["offset"] = p_offset; - msg["timestep"] = p_timestep; - msg["tolerance"] = p_tolerance; - msg["priority"] = p_priority; - return put(p_data, msg); -} - int DataManBase::put_begin(const void *p_data, json &p_jmsg) { check_shape(p_jmsg); @@ -152,7 +131,6 @@ int DataManBase::put_end(const void *p_data, json &p_jmsg) m_profiling["total_mb"] = m_profiling["total_mb"].get<double>() + product(p_jmsg["varshape"], dsize(p_jmsg["dtype"])) / 1000000.0f; - std::cout << product(p_jmsg["varshape"], dsize(p_jmsg["dtype"])) << "\n"; duration = end - m_start_time; m_profiling["total_workflow_time"] = duration.count(); m_profiling["workflow_mbs"] = @@ -161,50 +139,23 @@ int DataManBase::put_end(const void *p_data, json &p_jmsg) m_profiling["manager_mbs"] = m_profiling["total_mb"].get<double>() / m_profiling["total_manager_time"].get<double>(); - put_next(p_data, p_jmsg); return 0; } -int DataManBase::get(void *p_data, std::string p_doid, std::string p_var, - std::string p_dtype, std::vector<size_t> p_getshape, - std::vector<size_t> p_varshape, - std::vector<size_t> p_offset, size_t p_timestep) -{ - json msg; - msg["doid"] = p_doid; - msg["var"] = p_var; - msg["dtype"] = p_dtype; - msg["getshape"] = p_getshape; - msg["varshape"] = p_varshape; - msg["offset"] = p_offset; - msg["timestep"] = p_timestep; - return get(p_data, msg); -} - -int DataManBase::get(void *p_data, std::string p_doid, std::string p_var, - std::string &p_dtype, std::vector<size_t> &p_varshape, - size_t &p_timestep) -{ - json msg; - msg["doid"] = p_doid; - msg["var"] = p_var; - return get(p_data, msg); -} - void DataManBase::reg_callback( std::function<void(const void *, std::string, std::string, std::string, std::vector<size_t>)> cb) { - if (m_next.empty()) + if (m_stream_mans.empty()) { m_callback = cb; } else { - for (const auto &i : m_next) + for (const auto &i : m_stream_mans) { - i.second->reg_callback(cb); + i->reg_callback(cb); } } } @@ -229,36 +180,6 @@ void DataManBase::dump(const void *p_data, json p_jmsg, std::ostream &out) out << std::endl; } -void DataManBase::add_next(std::string p_name, - std::shared_ptr<DataManBase> p_next) -{ - m_next[p_name] = p_next; -} - -void DataManBase::remove_next(std::string p_name) { m_next.erase(p_name); } - -bool DataManBase::have_next() -{ - if (m_next.empty()) - { - return false; - } - else - { - return true; - } -} - -void DataManBase::print_next(std::ostream &out) -{ - for (const auto &i : m_next) - { - out << i.second->name() << " -> "; - i.second->print_next(); - out << std::endl; - } -} - bool DataManBase::auto_transform(std::vector<char> &a_data, json &a_jmsg) { if (a_jmsg["compression_method"].is_string() && @@ -284,40 +205,6 @@ bool DataManBase::auto_transform(std::vector<char> &a_data, json &a_jmsg) } } -void DataManBase::add_man_to_path(std::string p_new, std::string p_path, - json p_jmsg) -{ - if (m_next.count(p_path) > 0) - { - auto man = get_man(p_new); - if (man) - { - man->init(p_jmsg); - man->add_next(p_path, m_next[p_path]); - this->add_next(p_new, man); - this->remove_next(p_path); - } - } -} - -int DataManBase::flush_next() -{ - for (const auto &i : m_next) - { - i.second->flush(); - } - return 0; -} - -int DataManBase::put_next(const void *p_data, json p_jmsg) -{ - for (const auto &i : m_next) - { - i.second->put(p_data, p_jmsg); - } - return 0; -} - std::shared_ptr<DataManBase> DataManBase::get_man(std::string method) { try @@ -491,23 +378,6 @@ size_t DataManBase::dsize(std::string dtype) return 0; } -nlohmann::json DataManBase::atoj(unsigned int *array) -{ - json j; - if (array) - { - if (array[0] > 0) - { - j = {array[1]}; - for (unsigned int i = 2; i <= array[0]; ++i) - { - j.insert(j.end(), array[i]); - } - } - } - return j; -} - int DataManBase::closest(int v, json j, bool up) { int s = 100, k = 0, t; @@ -559,3 +429,5 @@ void DataManBase::check_shape(json &p_jmsg) p_jmsg["varbytes"] = product(varshape, dsize(p_jmsg["dtype"].get<std::string>())); } + +void DataManBase::dump_profiling() { logging(m_profiling.dump(4)); } diff --git a/source/dataman/DataManBase.h b/source/dataman/DataManBase.h index 8ea2db345b25aaf9eda7bff371163c5bf0b2a538..37780ae7afab5abfcaebff592da2c9c355797ec9 100644 --- a/source/dataman/DataManBase.h +++ b/source/dataman/DataManBase.h @@ -30,62 +30,31 @@ public: using json = nlohmann::json; DataManBase(); - int put(const void *p_data, std::string p_doid, std::string p_var, - std::string p_dtype, std::vector<size_t> p_putshape, - std::vector<size_t> p_varshape, std::vector<size_t> p_offset, - size_t p_timestep, int p_tolerance = 0, int p_priority = 100); + virtual int put_begin(const void *a_data, json &a_jmsg); + virtual int put_end(const void *a_data, json &a_jmsg); - virtual int put_begin(const void *p_data, json &p_jmsg); - - virtual int put_end(const void *p_data, json &p_jmsg); - - virtual int put(const void *p_data, json p_jmsg) = 0; - - int get(void *p_data, std::string p_doid, std::string p_var, - std::string p_dtype, std::vector<size_t> p_getshape, - std::vector<size_t> p_varshape, std::vector<size_t> p_offset, - size_t p_timestep); - - int get(void *p_data, std::string p_doid, std::string p_var, - std::string &p_dtype, std::vector<size_t> &p_varshape, - size_t &p_timestep); - - virtual int get(void *p_data, json &p_jmsg) = 0; - virtual int init(json p_jmsg) = 0; + virtual int put(const void *a_data, json &a_jmsg) = 0; + virtual int get(void *a_data, json &a_jmsg) = 0; + virtual int init(json a_jmsg) = 0; virtual void flush() = 0; virtual std::string name() = 0; virtual std::string type() = 0; void reg_callback(std::function<void(const void *, std::string, std::string, std::string, std::vector<size_t>)> cb); - - void dump(const void *p_data, json p_jmsg, std::ostream &out = std::cout); - - void add_next(std::string p_name, std::shared_ptr<DataManBase> p_next); - - void remove_next(std::string p_name); - - bool have_next(); - - void print_next(std::ostream &out = std::cout); - + void dump(const void *a_data, json a_jmsg, std::ostream &out = std::cout); virtual void transform(std::vector<char> &a_data, json &a_jmsg) = 0; + void dump_profiling(); protected: bool auto_transform(std::vector<char> &a_data, json &a_jmsg); - void add_man_to_path(std::string p_new, std::string p_path, json p_jmsg); - - virtual int flush_next(); - - virtual int put_next(const void *p_data, json p_jmsg); - std::shared_ptr<DataManBase> get_man(std::string method); void logging(std::string p_msg, std::string p_man = "", std::ostream &out = std::cout); - bool check_json(json p_jmsg, std::vector<std::string> p_strings, + bool check_json(json a_jmsg, std::vector<std::string> p_strings, std::string p_man = ""); size_t product(size_t *shape); @@ -94,25 +63,22 @@ protected: size_t dsize(std::string dtype); - json atoj(unsigned int *array); - int closest(int v, json j, bool up); - void check_shape(json &p_jmsg); + void check_shape(json &a_jmsg); std::function<void(const void *, std::string, std::string, std::string, std::vector<size_t>)> m_callback; - std::map<std::string, std::shared_ptr<DataManBase>> m_next; + std::vector<std::shared_ptr<DataManBase>> m_stream_mans; private: struct ManagerLibrary; std::unordered_map<std::string, ManagerLibrary *> m_LoadedManagers; - json m_profiling; std::chrono::time_point<std::chrono::system_clock> m_start_time; std::chrono::time_point<std::chrono::system_clock> m_step_time; - bool m_profiling_enabled = false; + json m_profiling; }; #endif /* DATAMANBASE_H_ */ diff --git a/source/dataman/DumpMan.cpp b/source/dataman/DumpMan.cpp index b87c9ba02bd8399efa8c02e00728b5c2c88c5dd1..b3ab55f98c7595a280aeefbb7e0a93c9dd10886b 100644 --- a/source/dataman/DumpMan.cpp +++ b/source/dataman/DumpMan.cpp @@ -10,53 +10,53 @@ #include "DumpMan.h" -int DumpMan::init(json p_jmsg) +int DumpMan::init(json a_jmsg) { - if (p_jmsg["dumping"].is_boolean()) + if (a_jmsg["dumping"].is_boolean()) { - m_dumping = p_jmsg["dumping"].get<bool>(); + m_dumping = a_jmsg["dumping"].get<bool>(); } return 0; } -int DumpMan::get(void *p_data, json &p_jmsg) { return 0; } +int DumpMan::get(void *a_data, json &a_jmsg) { return 0; } -int DumpMan::put(const void *p_data, json p_jmsg) +int DumpMan::put(const void *a_data, json &a_jmsg) { - put_begin(p_data, p_jmsg); + put_begin(a_data, a_jmsg); if (!m_dumping) { return 1; } - if (!check_json(p_jmsg, {"doid", "var", "dtype", "putshape"})) + if (!check_json(a_jmsg, {"doid", "var", "dtype", "putshape"})) { return -1; } - std::string doid = p_jmsg["doid"]; - std::string var = p_jmsg["var"]; - std::string dtype = p_jmsg["dtype"]; + std::string doid = a_jmsg["doid"]; + std::string var = a_jmsg["var"]; + std::string dtype = a_jmsg["dtype"]; std::vector<size_t> putshape = - p_jmsg["putshape"].get<std::vector<size_t>>(); + a_jmsg["putshape"].get<std::vector<size_t>>(); std::vector<size_t> varshape = - p_jmsg["varshape"].get<std::vector<size_t>>(); - std::vector<size_t> offset = p_jmsg["offset"].get<std::vector<size_t>>(); + a_jmsg["varshape"].get<std::vector<size_t>>(); + std::vector<size_t> offset = a_jmsg["offset"].get<std::vector<size_t>>(); int numbers_to_print = 100; if (numbers_to_print > product(putshape, 1)) { numbers_to_print = product(putshape, 1); } size_t putbytes = product(putshape, dsize(dtype)); - size_t sendbytes = p_jmsg["sendbytes"].get<size_t>(); + size_t sendbytes = a_jmsg["sendbytes"].get<size_t>(); - std::cout << p_jmsg.dump(4) << std::endl; + std::cout << a_jmsg.dump(4) << std::endl; std::cout << "total MBs = " << product(putshape, dsize(dtype)) / 1000000 << std::endl; - std::vector<char> data(static_cast<const char *>(p_data), - static_cast<const char *>(p_data) + sendbytes); + std::vector<char> data(static_cast<const char *>(a_data), + static_cast<const char *>(a_data) + sendbytes); - auto_transform(data, p_jmsg); + auto_transform(data, a_jmsg); void *data_to_print = data.data(); for (size_t i = 0; i < numbers_to_print; ++i) @@ -72,7 +72,7 @@ int DumpMan::put(const void *p_data, json p_jmsg) } std::cout << std::endl; - put_end(p_data, p_jmsg); + put_end(a_data, a_jmsg); return 0; } diff --git a/source/dataman/DumpMan.h b/source/dataman/DumpMan.h index bc06477557fe816aa36dbbf1f75413fa8351a398..7dd27c21213549a22cf783ff3a11f26c44c5aa13 100644 --- a/source/dataman/DumpMan.h +++ b/source/dataman/DumpMan.h @@ -19,9 +19,9 @@ public: DumpMan() = default; virtual ~DumpMan() = default; - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); - virtual int get(void *p_data, json &p_jmsg); + virtual int init(json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); + virtual int get(void *a_data, json &a_jmsg); void flush(); std::string name() { return "DumpMan"; } std::string type() { return "Dump"; } diff --git a/source/dataman/MdtmMan.cpp b/source/dataman/MdtmMan.cpp index 40aed77d70e6304dafe09a3c5c695825fb0abf9c..2f41d40c669d4f07439a31f437d4a112e4132517 100644 --- a/source/dataman/MdtmMan.cpp +++ b/source/dataman/MdtmMan.cpp @@ -16,134 +16,88 @@ #include <zmq.h> -MdtmMan::~MdtmMan() -{ - if (zmq_ipc_req) - { - zmq_close(zmq_ipc_req); - } -} - -int MdtmMan::init(json p_jmsg) +int MdtmMan::init(json a_jmsg) { - StreamMan::init(p_jmsg); + StreamMan::init(a_jmsg); - if (p_jmsg["pipe_prefix"].is_string()) - { - pipe_desc["pipe_prefix"] = p_jmsg["pipe_prefix"].get<std::string>(); - } - else + if (a_jmsg["pipe_prefix"].is_string()) { - pipe_desc["pipe_prefix"] = "/tmp/MdtmManPipes/"; + m_pipepath = a_jmsg["pipe_prefix"].get<std::string>(); } + json pipe_desc; pipe_desc["operation"] = "init"; + pipe_desc["pipe_prefix"] = m_pipepath; pipe_desc["mode"] = m_stream_mode; - std::string pipename_prefix = "MdtmManPipe"; - for (int i = 0; i < m_num_channels; ++i) + std::stringstream pname; + pname << m_pipename_prefix << m_channel_id; + m_pipename = pname.str(); + m_full_pipename = m_pipepath + m_pipename; + + // send JSON message to MDTM + if (m_channel_id == 0) { - std::stringstream pipename; - pipename << pipename_prefix << i; - if (i == 0) + for (int i = 0; i < m_num_channels; ++i) { - pipe_desc["pipe_names"] = {pipename.str()}; - pipe_desc["loss_tolerance"] = {m_tolerance[i]}; - pipe_desc["priority"] = {m_priority[i]}; - } - else - { - pipe_desc["pipe_names"].insert(pipe_desc["pipe_names"].end(), - pipename.str()); - pipe_desc["loss_tolerance"].insert( - pipe_desc["loss_tolerance"].end(), m_tolerance[i]); - pipe_desc["priority"].insert(pipe_desc["priority"].end(), - m_priority[i]); + std::stringstream pipename; + pipename << m_pipename_prefix << i; + if (i == 0) + { + pipe_desc["pipe_names"] = {pipename.str()}; + } + else + { + pipe_desc["pipe_names"].insert(pipe_desc["pipe_names"].end(), + pipename.str()); + } } - } - - // ZMQ_DataMan_MDTM - if (m_stream_mode == "sender") - { - zmq_ipc_req = zmq_socket(zmq_context, ZMQ_REQ); + void *zmq_ipc_req = nullptr; + zmq_ipc_req = zmq_socket(m_zmq_context, ZMQ_REQ); zmq_connect(zmq_ipc_req, "ipc:///tmp/ADIOS_MDTM_pipe"); char buffer_return[10]; zmq_send(zmq_ipc_req, pipe_desc.dump().c_str(), pipe_desc.dump().length(), 0); zmq_recv(zmq_ipc_req, buffer_return, sizeof(buffer_return), 0); + if (zmq_ipc_req) + { + zmq_close(zmq_ipc_req); + } } - // Pipes - mkdir(pipe_desc["pipe_prefix"].get<std::string>().c_str(), 0755); - for (const auto &i : - pipe_desc["pipe_names"].get<std::vector<std::string>>()) + // Make pipes + mkdir(m_pipepath.c_str(), 0755); + mkfifo(m_full_pipename.c_str(), 0666); + + if (m_stream_mode == "sender") { - std::string filename = pipe_desc["pipe_prefix"].get<std::string>() + i; - mkfifo(filename.c_str(), 0666); + m_pipe_handler = open(m_full_pipename.c_str(), O_WRONLY); } - - for (int i = 0; i < m_num_channels; ++i) + if (m_stream_mode == "receiver") { - std::stringstream pipename; - pipename << pipename_prefix << i; - std::string fullpipename = - pipe_desc["pipe_prefix"].get<std::string>() + pipename.str(); - if (m_stream_mode == "sender") - { - int fp = open(fullpipename.c_str(), O_WRONLY); - pipes.push_back(fp); - } - if (m_stream_mode == "receiver") - { - int fp = open(fullpipename.c_str(), O_RDONLY | O_NONBLOCK); - pipes.push_back(fp); - } - pipenames.push_back(pipename.str()); + m_pipe_handler = open(m_full_pipename.c_str(), O_RDONLY | O_NONBLOCK); } return 0; } -int MdtmMan::put(const void *a_data, json a_jmsg) +int MdtmMan::put(const void *a_data, json &a_jmsg) { + a_jmsg["pipe"] = m_pipename; put_begin(a_data, a_jmsg); - - // determine pipe to use - int priority = 100; - if (a_jmsg["priority"].is_number_integer()) - { - priority = a_jmsg["priority"].get<int>(); - } - int index; - if (m_parallel_mode == "round") - { - if (m_current_channel == m_num_channels - 1) - { - index = 0; - m_current_channel = 0; - } - else - { - m_current_channel++; - index = m_current_channel; - } - } - else if (m_parallel_mode == "priority") - { - index = closest(priority, pipe_desc["priority"], true); - } - a_jmsg["pipe"] = pipe_desc["pipe_names"][index]; - - StreamMan::put(a_data, a_jmsg); - size_t sendbytes = a_jmsg["sendbytes"].get<size_t>(); - write(pipes[index], a_data, sendbytes); + StreamMan::put_stream(a_data, a_jmsg); put_end(a_data, a_jmsg); return 0; } -int MdtmMan::get(void *p_data, json &p_jmsg) { return 0; } +int MdtmMan::get(void *a_data, json &a_jmsg) { return 0; } -void MdtmMan::on_recv(json a_jmsg) +void MdtmMan::on_put(std::shared_ptr<std::vector<char>> a_data) +{ + write(m_pipe_handler, a_data->data(), a_data->size()); +} + +void MdtmMan::on_recv(json &a_jmsg) { // push new request @@ -154,7 +108,7 @@ void MdtmMan::on_recv(json a_jmsg) // for flush if (jqueue.front()["operation"] == "flush") { - callback(); + callback_cache(); jqueue.pop(); vqueue.pop(); iqueue.pop(); @@ -176,27 +130,13 @@ void MdtmMan::on_recv(json a_jmsg) size_t sendbytes = jmsg["sendbytes"].get<size_t>(); vqueue.front() = std::vector<char>(sendbytes); - // determine the pipe for the head request - if (jmsg == nullptr) - { - break; - } - int pipeindex = 0; - for (int i = 0; i < pipenames.size(); ++i) - { - if (jmsg["pipe"].get<std::string>() == pipenames[i]) - { - pipeindex = i; - } - } - // read the head request int error_times = 0; while (iqueue.front() < sendbytes) { - int ret = read(pipes[pipeindex], - vqueue.front().data() + iqueue.front(), - sendbytes - iqueue.front()); + int ret = + read(m_pipe_handler, vqueue.front().data() + iqueue.front(), + sendbytes - iqueue.front()); if (ret > 0) { iqueue.front() += ret; @@ -222,7 +162,17 @@ void MdtmMan::on_recv(json a_jmsg) auto_transform(vqueue.front(), a_jmsg); } } - m_cache.put(vqueue.front().data(), jmsg); + + if (a_jmsg["varshape"] == a_jmsg["putshape"]) + { + std::cout << "callback_direct \n"; + callback_direct(vqueue.front().data(), jmsg); + } + else + { + m_cache.put(vqueue.front().data(), jmsg); + } + jqueue.pop(); vqueue.pop(); iqueue.pop(); diff --git a/source/dataman/MdtmMan.h b/source/dataman/MdtmMan.h index 4a29da319fd1287a65312c8f066d4144d19166c3..59f96e55d2d721a76fc99354f3c2a6b78a276f83 100644 --- a/source/dataman/MdtmMan.h +++ b/source/dataman/MdtmMan.h @@ -19,23 +19,25 @@ class MdtmMan : public StreamMan { public: MdtmMan() = default; - virtual ~MdtmMan(); + virtual ~MdtmMan() = default; - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); - virtual int get(void *p_data, json &p_jmsg); + virtual int init(json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); + virtual int get(void *a_data, json &a_jmsg); virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} - void on_recv(json msg); + virtual void on_recv(json &a_msg); + virtual void on_put(std::shared_ptr<std::vector<char>> a_data); std::string name() { return "MdtmMan"; } private: - void *zmq_ipc_req = nullptr; int zmq_msg_size = 1024; - json pipe_desc; + std::string m_pipepath = "/tmp/MdtmManPipes/"; + std::string m_pipename_prefix = "MdtmManPipe"; + std::string m_pipename; + std::string m_full_pipename; + int m_pipe_handler; std::string getmode = "callback"; - std::vector<int> pipes; - std::vector<std::string> pipenames; std::queue<json> jqueue; std::queue<std::vector<char>> vqueue; std::queue<int> iqueue; diff --git a/source/dataman/StreamMan.cpp b/source/dataman/StreamMan.cpp index 68e08ab946722e7fb7792904eb8e166dc3f15e05..a87de51fc809e8340a503bafc4fd845bcae1c2c2 100644 --- a/source/dataman/StreamMan.cpp +++ b/source/dataman/StreamMan.cpp @@ -19,18 +19,29 @@ StreamMan::~StreamMan() { - if (zmq_meta) + if (m_zmq_rep) { - zmq_close(zmq_meta); + zmq_close(m_zmq_rep); } - if (zmq_context) + if (m_zmq_req) { - zmq_ctx_destroy(zmq_context); + zmq_close(m_zmq_req); } - zmq_meta_rep_thread_active = false; - if (zmq_meta_rep_thread.joinable()) + if (m_zmq_context) { - zmq_meta_rep_thread.join(); + zmq_ctx_destroy(m_zmq_context); + } + + m_zmq_rep_thread_active = false; + if (m_zmq_rep_thread.joinable()) + { + m_zmq_rep_thread.join(); + } + + m_zmq_req_thread_active = false; + if (m_zmq_req_thread.joinable()) + { + m_zmq_req_thread.join(); } } @@ -54,38 +65,41 @@ int StreamMan::init(json p_jmsg) { m_clean_mode = p_jmsg["clean_mode"]; } - m_tolerance.assign(m_num_channels, 0); - m_priority.assign(m_num_channels, 100); - if (p_jmsg["num_channels"].is_number_integer()) + if (p_jmsg["tolerance"].is_number()) { - m_num_channels = p_jmsg["num_channels"].get<int>(); + m_tolerance = p_jmsg["tolerance"].get<int>(); } - if (p_jmsg["tolerance"] != nullptr) + if (p_jmsg["priority"].is_number()) { - m_tolerance = p_jmsg["tolerance"].get<std::vector<int>>(); + m_priority = p_jmsg["priority"].get<int>(); } - if (p_jmsg["priority"] != nullptr) + if (p_jmsg["num_channels"].is_number()) { - m_priority = p_jmsg["priority"].get<std::vector<int>>(); + m_num_channels = p_jmsg["num_channels"].get<int>(); + } + if (p_jmsg["channel_id"].is_number()) + { + m_channel_id = p_jmsg["channel_id"].get<int>(); } - if (!zmq_context) + if (!m_zmq_context) { - zmq_context = zmq_ctx_new(); - zmq_meta = zmq_socket(zmq_context, ZMQ_PAIR); + m_zmq_context = zmq_ctx_new(); if (m_stream_mode == "sender") { - zmq_connect(zmq_meta, remote_address.c_str()); - logging("StreamMan::init " + remote_address + " connected"); + m_zmq_req = zmq_socket(m_zmq_context, ZMQ_REQ); + zmq_connect(m_zmq_req, remote_address.c_str()); + logging("Connecting " + remote_address + " ..."); } else if (m_stream_mode == "receiver") { - zmq_bind(zmq_meta, local_address.c_str()); - logging("StreamMan::init " + local_address + " bound"); + m_zmq_rep = zmq_socket(m_zmq_context, ZMQ_REP); + zmq_bind(m_zmq_rep, local_address.c_str()); + logging("Binding " + local_address + " ..."); + m_zmq_rep_thread_active = true; + m_zmq_rep_thread = + std::thread(&StreamMan::zmq_rep_thread_func, this); } - zmq_meta_rep_thread_active = true; - zmq_meta_rep_thread = - std::thread(&StreamMan::zmq_meta_rep_thread_func, this); } return 0; } @@ -95,7 +109,21 @@ int StreamMan::init(json p_jmsg) } } -int StreamMan::callback() +int StreamMan::callback_direct(const void *a_data, json &a_jmsg) +{ + if (!m_callback) + { + logging("callback called but callback function not registered!"); + return -1; + } + + m_callback(a_data, a_jmsg["doid"], a_jmsg["var"], + a_jmsg["dtype"].get<std::string>(), + a_jmsg["varshape"].get<std::vector<size_t>>()); + return 0; +} + +int StreamMan::callback_cache() { if (!m_callback) { @@ -104,10 +132,10 @@ int StreamMan::callback() } std::vector<std::string> do_list = m_cache.get_do_list(); - for (std::string i : do_list) + for (const std::string &i : do_list) { std::vector<std::string> var_list = m_cache.get_var_list(i); - for (std::string j : var_list) + for (const std::string &j : var_list) { m_callback( m_cache.get(i, j), i, j, @@ -116,6 +144,7 @@ int StreamMan::callback() } } m_cache.clean("nan"); + return 0; } @@ -123,15 +152,23 @@ void StreamMan::flush() { json msg; msg["operation"] = "flush"; - zmq_send(zmq_meta, msg.dump().c_str(), msg.dump().length(), 0); + char ret[10]; + zmq_send(m_zmq_req, msg.dump().c_str(), msg.dump().length(), 0); + zmq_recv(m_zmq_req, ret, 10, 0); } -void StreamMan::zmq_meta_rep_thread_func() +void StreamMan::zmq_req_thread_func(std::shared_ptr<std::vector<char>> a_data) { - while (zmq_meta_rep_thread_active) + on_put(a_data); +} + +void StreamMan::zmq_rep_thread_func() +{ + while (m_zmq_rep_thread_active) { char msg[1024] = ""; - int ret = zmq_recv(zmq_meta, msg, 1024, ZMQ_NOBLOCK); + int ret = zmq_recv(m_zmq_rep, msg, 1024, ZMQ_NOBLOCK); + zmq_send(m_zmq_rep, "OK", 4, 0); std::string smsg = msg; if (ret >= 0) { @@ -139,13 +176,30 @@ void StreamMan::zmq_meta_rep_thread_func() logging("StreamMan::zmq_meta_rep_thread_func: \n" + jmsg.dump(4)); on_recv(jmsg); } - usleep(10); + else + { + usleep(1); + } } } -int StreamMan::put(const void *p_data, json p_jmsg) +int StreamMan::put_stream(const void *a_data, json a_jmsg) { - p_jmsg["operation"] = "put"; - zmq_send(zmq_meta, p_jmsg.dump().c_str(), p_jmsg.dump().length(), 0); + if (m_zmq_req_thread.joinable()) + { + m_zmq_req_thread.join(); + } + a_jmsg["operation"] = "put"; + char ret[10]; + zmq_send(m_zmq_req, a_jmsg.dump().c_str(), a_jmsg.dump().length(), 0); + zmq_recv(m_zmq_req, ret, 10, 0); + + // copy + std::shared_ptr<std::vector<char>> data = + std::make_shared<std::vector<char>>(); + data->resize(a_jmsg["sendbytes"].get<size_t>()); + std::memcpy(data->data(), a_data, a_jmsg["sendbytes"].get<size_t>()); + + m_zmq_req_thread = std::thread(&StreamMan::zmq_req_thread_func, this, data); return 0; } diff --git a/source/dataman/StreamMan.h b/source/dataman/StreamMan.h index 95897ca9c74715d6930da3149fd9dc5b52e7b659..d0cffdee4b2d7a65cb6023d92f953e8b40c4ec74 100644 --- a/source/dataman/StreamMan.h +++ b/source/dataman/StreamMan.h @@ -22,32 +22,31 @@ public: StreamMan() = default; virtual ~StreamMan(); - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); - virtual void on_recv(json msg) = 0; + virtual int init(json a_jmsg); + virtual void on_recv(json &a_msg) = 0; + virtual void on_put(std::shared_ptr<std::vector<char>> a_data) = 0; void flush(); virtual std::string type() { return "Stream"; } protected: - void *zmq_context = NULL; - CacheMan m_cache; - int callback(); + int callback_direct(const void *a_data, json &a_jmsg); + int callback_cache(); + int put_stream(const void *a_data, json a_jmsg); + void *m_zmq_context = nullptr; + CacheMan m_cache; std::string m_get_mode = "callback"; std::string m_stream_mode; std::string m_local_ip; std::string m_remote_ip; int m_local_port; int m_remote_port; - int m_num_channels = 10; - std::vector<int> m_tolerance; - std::vector<int> m_priority; + int m_tolerance = 0; + int m_priority = 100; + int m_channel_id = 0; + int m_num_channels = 1; std::string m_clean_mode = "nan"; - // parallel - std::string m_parallel_mode = "round"; // round, priority - int m_current_channel = 0; - inline std::string make_address(std::string ip, int port, std::string protocol) { @@ -57,10 +56,15 @@ protected: } private: - void *zmq_meta = NULL; - void zmq_meta_rep_thread_func(); - bool zmq_meta_rep_thread_active; - std::thread zmq_meta_rep_thread; + void zmq_rep_thread_func(); + std::thread m_zmq_rep_thread; + void *m_zmq_rep = nullptr; + bool m_zmq_rep_thread_active = false; + + void zmq_req_thread_func(std::shared_ptr<std::vector<char>> a_data); + std::thread m_zmq_req_thread; + void *m_zmq_req = nullptr; + bool m_zmq_req_thread_active = false; }; #endif diff --git a/source/dataman/TemporalMan.cpp b/source/dataman/TemporalMan.cpp index c87bf7b102e9ee01c62582aa6541088e4bf69d86..e60081bf0357bd2c43b3f2f506bd4d9d2ae3b6bc 100644 --- a/source/dataman/TemporalMan.cpp +++ b/source/dataman/TemporalMan.cpp @@ -12,7 +12,7 @@ int TemporalMan::init(json p_jmsg) { return 0; } -int TemporalMan::put(const void *p_data, json p_jmsg) +int TemporalMan::put(const void *p_data, json &p_jmsg) { put_begin(p_data, p_jmsg); put_end(p_data, p_jmsg); diff --git a/source/dataman/TemporalMan.h b/source/dataman/TemporalMan.h index 5f0362d1121f5a24f45a782eceee7de4cea203a7..1ced811da9df0e7b7d0e78b0e18d7c61e5da9a61 100644 --- a/source/dataman/TemporalMan.h +++ b/source/dataman/TemporalMan.h @@ -18,9 +18,9 @@ class TemporalMan : public CompressMan public: TemporalMan() = default; virtual ~TemporalMan() = default; - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); - virtual int get(void *p_data, json &p_jmsg); + virtual int init(json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); + virtual int get(void *a_data, json &a_jmsg); virtual void flush(); virtual void transform(std::vector<char> &a_data, json &a_jmsg); std::string name() { return "TemporalMan"; } diff --git a/source/dataman/ZfpMan.cpp b/source/dataman/ZfpMan.cpp index 6a2b1291b271f604c727c166eae8ae197bac5114..54c5ef48c799579c70c38464a3d43a75a7ed2f0a 100644 --- a/source/dataman/ZfpMan.cpp +++ b/source/dataman/ZfpMan.cpp @@ -21,30 +21,28 @@ int ZfpMan::init(json a_jmsg) return 0; } -int ZfpMan::put(const void *a_data, json a_jmsg) +int ZfpMan::put(const void *a_data, json &a_jmsg) { put_begin(a_data, a_jmsg); - std::vector<char> compressed_data; if (check_json(a_jmsg, {"doid", "var", "dtype", "putshape"}, "ZfpMan")) { - if (not a_jmsg["compression_rate"].is_number()) - { - a_jmsg["compression_rate"] = m_compression_rate; - } compress(const_cast<void *>(a_data), compressed_data, a_jmsg); } - put_end(compressed_data.data(), a_jmsg); return 0; } int ZfpMan::get(void *a_data, json &a_jmsg) { return 0; } -void ZfpMan::flush() { flush_next(); } +void ZfpMan::flush() {} int ZfpMan::compress(void *a_input, std::vector<char> &a_output, json &a_jmsg) { + if (!a_jmsg["compression_rate"].is_number()) + { + a_jmsg["compression_rate"] = m_compression_rate; + } std::string dtype = a_jmsg["dtype"]; std::vector<size_t> shape = a_jmsg["putshape"].get<std::vector<size_t>>(); int compression_rate = a_jmsg["compression_rate"].get<int>(); diff --git a/source/dataman/ZfpMan.h b/source/dataman/ZfpMan.h index 8f81df2564dd31d058ee2e6571c1d1b4cbb39df8..d2f27e65db5efeee89dc6f99bf076af2098b6a5d 100644 --- a/source/dataman/ZfpMan.h +++ b/source/dataman/ZfpMan.h @@ -19,7 +19,7 @@ public: ZfpMan() = default; virtual ~ZfpMan() = default; virtual int init(json a_jmsg); - virtual int put(const void *a_data, json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); virtual int get(void *a_data, json &a_jmsg); virtual void transform(std::vector<char> &a_data, json &a_jmsg); virtual void flush(); diff --git a/source/dataman/ZmqMan.cpp b/source/dataman/ZmqMan.cpp index 1832ca14d6046ccd67471c16f66bc8b0376ffaf1..6300dfcd48ed81957d67b4ba633b6158371fb035 100644 --- a/source/dataman/ZmqMan.cpp +++ b/source/dataman/ZmqMan.cpp @@ -26,7 +26,7 @@ int ZmqMan::init(json a_jmsg) StreamMan::init(a_jmsg); if (m_stream_mode == "sender") { - zmq_data = zmq_socket(zmq_context, ZMQ_REQ); + zmq_data = zmq_socket(m_zmq_context, ZMQ_REQ); std::string remote_address = make_address(m_remote_ip, m_remote_port + 1, "tcp"); zmq_connect(zmq_data, remote_address.c_str()); @@ -34,7 +34,7 @@ int ZmqMan::init(json a_jmsg) } else if (m_stream_mode == "receiver") { - zmq_data = zmq_socket(zmq_context, ZMQ_REP); + zmq_data = zmq_socket(m_zmq_context, ZMQ_REP); std::string local_address = make_address(m_local_ip, m_local_port + 1, "tcp"); zmq_bind(zmq_data, local_address.c_str()); @@ -43,20 +43,24 @@ int ZmqMan::init(json a_jmsg) return 0; } -int ZmqMan::put(const void *a_data, json a_jmsg) +int ZmqMan::put(const void *a_data, json &a_jmsg) { - char ret[10]; DataManBase::put_begin(a_data, a_jmsg); - StreamMan::put(a_data, a_jmsg); - zmq_send(zmq_data, a_data, a_jmsg["sendbytes"].get<size_t>(), 0); - zmq_recv(zmq_data, ret, 10, 0); + StreamMan::put_stream(a_data, a_jmsg); DataManBase::put_end(a_data, a_jmsg); return 0; } int ZmqMan::get(void *a_data, json &a_jmsg) { return 0; } -void ZmqMan::on_recv(json a_jmsg) +void ZmqMan::on_put(std::shared_ptr<std::vector<char>> a_data) +{ + char ret[10]; + zmq_send(zmq_data, a_data->data(), a_data->size(), 0); + zmq_recv(zmq_data, ret, 10, 0); +} + +void ZmqMan::on_recv(json &a_jmsg) { if (a_jmsg["operation"].get<std::string>() == "put") { @@ -65,15 +69,26 @@ void ZmqMan::on_recv(json a_jmsg) int ret = zmq_recv(zmq_data, data.data(), sendbytes, 0); zmq_send(zmq_data, "OK", 10, 0); - if (a_jmsg["compression_method"].is_string() and - a_jmsg["compression_method"].get<std::string>() != "null") + // if data is compressed then call auto_transform to decompress + if (a_jmsg["compression_method"].is_string()) + { + if (a_jmsg["compression_method"].get<std::string>() != "null") + { + auto_transform(data, a_jmsg); + } + } + + if (a_jmsg["varshape"] == a_jmsg["putshape"]) + { + callback_direct(data.data(), a_jmsg); + } + else { - auto_transform(data, a_jmsg); + m_cache.put(data.data(), a_jmsg); } - m_cache.put(data.data(), a_jmsg); } else if (a_jmsg["operation"].get<std::string>() == "flush") { - callback(); + callback_cache(); } } diff --git a/source/dataman/ZmqMan.h b/source/dataman/ZmqMan.h index 48fa2bbe0b739c4d9de06a4edde1e7c54d6b5708..9571f5d4927b7d7d84899c524804b2d7c7aa2dbb 100644 --- a/source/dataman/ZmqMan.h +++ b/source/dataman/ZmqMan.h @@ -19,16 +19,17 @@ public: ZmqMan() = default; virtual ~ZmqMan(); - virtual int init(json p_jmsg); - virtual int put(const void *p_data, json p_jmsg); - virtual int get(void *p_data, json &p_jmsg); + virtual int init(json a_jmsg); + virtual int put(const void *a_data, json &a_jmsg); + virtual int get(void *a_data, json &a_jmsg); virtual void transform(std::vector<char> &a_data, json &a_jmsg) {} - virtual void on_recv(json msg); + virtual void on_recv(json &a_msg); + virtual void on_put(std::shared_ptr<std::vector<char>> a_data); std::string name() { return "ZmqMan"; } private: - void *zmq_data = NULL; + void *zmq_data = nullptr; }; extern "C" DataManBase *getMan() { return new ZmqMan; } diff --git a/testing/CMakeLists.txt b/testing/CMakeLists.txt index 80385077f8485cf5b0d9ba29e44f1db93c8a8669..7ed0bbbc8764e841205b5b9ef0e33903b47fd433 100644 --- a/testing/CMakeLists.txt +++ b/testing/CMakeLists.txt @@ -3,4 +3,6 @@ # accompanying file Copyright.txt for details. #------------------------------------------------------------------------------# +include(GoogleTest) + add_subdirectory(adios2) diff --git a/testing/adios2/engine/adios1/CMakeLists.txt b/testing/adios2/engine/adios1/CMakeLists.txt index e71473103c8005ed6fff2b4c2c4963db67662eec..35fd48db97582a48a8c1b3bfc456fc3aeb151c16 100644 --- a/testing/adios2/engine/adios1/CMakeLists.txt +++ b/testing/adios2/engine/adios1/CMakeLists.txt @@ -10,5 +10,5 @@ if(NOT ADIOS_USE_MPI) add_executable(TestADIOS1WriteRead TestADIOS1WriteRead.cpp) target_link_libraries(TestADIOS1WriteRead adios2 gtest adios1::adios) - add_test(NAME adios2::engine::adios1::write_read COMMAND TestADIOS1WriteRead) + gtest_add_tests(TARGET TestADIOS1WriteRead) endif() diff --git a/testing/adios2/engine/bp/CMakeLists.txt b/testing/adios2/engine/bp/CMakeLists.txt index 87eb336a2faf5cf09dd5fcc4001421f6a5ecb4c3..270f0171ca25554ed044619b4847e2fa11a96a00 100644 --- a/testing/adios2/engine/bp/CMakeLists.txt +++ b/testing/adios2/engine/bp/CMakeLists.txt @@ -10,5 +10,5 @@ if(NOT ADIOS_USE_MPI) add_executable(TestBPWriteRead TestBPWriteRead.cpp) target_link_libraries(TestBPWriteRead adios2 gtest adios1::adios) - add_test(NAME adios2::engine::bp::write_read COMMAND TestBPWriteRead) + gtest_add_tests(TARGET TestBPWriteRead) endif() diff --git a/testing/adios2/engine/hdf5/CMakeLists.txt b/testing/adios2/engine/hdf5/CMakeLists.txt index 2f69ed7a3a83f751b1a9f756dd350389b9e6db28..9847eaee6b081946ccf2c98db47a23b587daa223 100644 --- a/testing/adios2/engine/hdf5/CMakeLists.txt +++ b/testing/adios2/engine/hdf5/CMakeLists.txt @@ -9,4 +9,4 @@ add_executable(TestHDF5WriteRead TestHDF5WriteRead.cpp) target_include_directories(TestHDF5WriteRead PRIVATE ${HDF5_C_INCLUDE_DIRS}) target_link_libraries(TestHDF5WriteRead adios2 gtest ${HDF5_C_LIBRARIES}) -add_test(NAME adios2::engine::hdf5::write_read COMMAND TestHDF5WriteRead) +gtest_add_tests(TARGET TestHDF5WriteRead) diff --git a/testing/adios2/interface/CMakeLists.txt b/testing/adios2/interface/CMakeLists.txt index 7924344f3098f0dd61e0a4d3b91b76143c70b314..e229416de28347dbe46931805724e7d05bbdfdb2 100644 --- a/testing/adios2/interface/CMakeLists.txt +++ b/testing/adios2/interface/CMakeLists.txt @@ -5,4 +5,5 @@ add_executable(TestADIOSInterfaceWrite TestADIOSInterfaceWrite.cpp) target_link_libraries(TestADIOSInterfaceWrite adios2 gtest gtest_main) -add_test(NAME adios2::interface::write COMMAND TestADIOSInterfaceWrite) + +gtest_add_tests(TARGET TestADIOSInterfaceWrite)