Commit 396a42fe authored by William F Godoy

Merge remote-tracking branch 'upstream/master' into refactor

Conflicts:
	examples/experimental/multistep/reader_allsteps.cpp
	examples/experimental/multistep/reader_stepping.cpp
	examples/experimental/multistep/writer_multistep.cpp
	examples/hello/README.md
	examples/hello/bpWriter/helloBPWriter.cpp
	examples/hello/hdf5Writer/helloHDF5Writer.cpp
	source/adios2/engine/hdf5/HDF5Common.cpp
	source/adios2/engine/hdf5/HDF5WriterP.cpp
	source/adios2/engine/hdf5/HDF5WriterP.h
Parents: 77a5923b f25085c9
1 merge request: !124 Refactor
Showing changes with 1074 additions and 378 deletions
#------------------------------------------------------------------------------#
# Distributed under the OSI-approved Apache License, Version 2.0. See
# accompanying file Copyright.txt for details.
#------------------------------------------------------------------------------#
# This module is already included in new versions of CMake
if(CMAKE_VERSION VERSION_LESS 3.10)
include(${CMAKE_CURRENT_LIST_DIR}/upstream/GoogleTest.cmake)
else()
include(${CMAKE_ROOT}/Modules/GoogleTest.cmake)
endif()
# Distributed under the OSI-approved BSD 3-Clause License. See accompanying
# file Copyright.txt or https://cmake.org/licensing for details.
#[=======================================================================[.rst:
GoogleTest
----------
This module defines functions to help use the Google Test infrastructure.
.. command:: gtest_add_tests
Automatically add tests with CTest by scanning source code for Google Test
macros::
gtest_add_tests(TARGET target
[SOURCES src1...]
[EXTRA_ARGS arg1...]
[WORKING_DIRECTORY dir]
[TEST_PREFIX prefix]
[TEST_SUFFIX suffix]
[SKIP_DEPENDENCY]
[TEST_LIST outVar]
)
``TARGET target``
This must be a known CMake target. CMake will substitute the location of
the built executable when running the test.
``SOURCES src1...``
When provided, only the listed files will be scanned for test cases. If
this option is not given, the :prop_tgt:`SOURCES` property of the
specified ``target`` will be used to obtain the list of sources.
``EXTRA_ARGS arg1...``
Any extra arguments to pass on the command line to each test case.
``WORKING_DIRECTORY dir``
Specifies the directory in which to run the discovered test cases. If this
option is not provided, the current binary directory is used.
``TEST_PREFIX prefix``
Allows the specified ``prefix`` to be prepended to the name of each
discovered test case. This can be useful when the same source files are
being used in multiple calls to ``gtest_add_tests()`` but with different
``EXTRA_ARGS``.
``TEST_SUFFIX suffix``
Similar to ``TEST_PREFIX`` except the ``suffix`` is appended to the name of
every discovered test case. Both ``TEST_PREFIX`` and ``TEST_SUFFIX`` can be
specified.
``SKIP_DEPENDENCY``
Normally, the function creates a dependency which will cause CMake to be
re-run if any of the sources being scanned are changed. This is to ensure
that the list of discovered tests is updated. If this behavior is not
desired (as may be the case while actually writing the test cases), this
option can be used to prevent the dependency from being added.
``TEST_LIST outVar``
The variable named by ``outVar`` will be populated in the calling scope
with the list of discovered test cases. This allows the caller to do things
like manipulate test properties of the discovered tests.
.. code-block:: cmake
include(GoogleTest)
add_executable(FooTest FooUnitTest.cxx)
gtest_add_tests(TARGET FooTest
TEST_SUFFIX .noArgs
TEST_LIST noArgsTests
)
gtest_add_tests(TARGET FooTest
EXTRA_ARGS --someArg someValue
TEST_SUFFIX .withArgs
TEST_LIST withArgsTests
)
set_tests_properties(${noArgsTests} PROPERTIES TIMEOUT 10)
set_tests_properties(${withArgsTests} PROPERTIES TIMEOUT 20)
For backward compatibility reasons, the following form is also supported::
gtest_add_tests(exe args files...)
``exe``
The path to the test executable or the name of a CMake target.
``args``
A ;-list of extra arguments to be passed to executable. The entire
list must be passed as a single argument. Enclose it in quotes,
or pass ``""`` for no arguments.
``files...``
A list of source files to search for tests and test fixtures.
Alternatively, use ``AUTO`` to specify that ``exe`` is the name
of a CMake executable target whose sources should be scanned.
.. code-block:: cmake
include(GoogleTest)
set(FooTestArgs --foo 1 --bar 2)
add_executable(FooTest FooUnitTest.cxx)
gtest_add_tests(FooTest "${FooTestArgs}" AUTO)
#]=======================================================================]
function(gtest_add_tests)
if (ARGC LESS 1)
message(FATAL_ERROR "No arguments supplied to gtest_add_tests()")
endif()
set(options
SKIP_DEPENDENCY
)
set(oneValueArgs
TARGET
WORKING_DIRECTORY
TEST_PREFIX
TEST_SUFFIX
TEST_LIST
)
set(multiValueArgs
SOURCES
EXTRA_ARGS
)
set(allKeywords ${options} ${oneValueArgs} ${multiValueArgs})
unset(sources)
if("${ARGV0}" IN_LIST allKeywords)
cmake_parse_arguments(ARGS "${options}" "${oneValueArgs}" "${multiValueArgs}" ${ARGN})
set(autoAddSources YES)
else()
# Non-keyword syntax, convert to keyword form
if (ARGC LESS 3)
message(FATAL_ERROR "gtest_add_tests() without keyword options requires at least 3 arguments")
endif()
set(ARGS_TARGET "${ARGV0}")
set(ARGS_EXTRA_ARGS "${ARGV1}")
if(NOT "${ARGV2}" STREQUAL "AUTO")
set(ARGS_SOURCES "${ARGV}")
list(REMOVE_AT ARGS_SOURCES 0 1)
endif()
endif()
# The non-keyword syntax allows the first argument to be an arbitrary
# executable rather than a target if source files are also provided. In all
# other cases, both forms require a target.
if(NOT TARGET "${ARGS_TARGET}" AND NOT ARGS_SOURCES)
message(FATAL_ERROR "${ARGS_TARGET} does not define an existing CMake target")
endif()
if(NOT ARGS_WORKING_DIRECTORY)
unset(workDir)
else()
set(workDir WORKING_DIRECTORY "${ARGS_WORKING_DIRECTORY}")
endif()
if(NOT ARGS_SOURCES)
get_property(ARGS_SOURCES TARGET ${ARGS_TARGET} PROPERTY SOURCES)
endif()
unset(testList)
set(gtest_case_name_regex ".*\\( *([A-Za-z_0-9]+) *, *([A-Za-z_0-9]+) *\\).*")
set(gtest_test_type_regex "(TYPED_TEST|TEST_?[FP]?)")
foreach(source IN LISTS ARGS_SOURCES)
if(NOT ARGS_SKIP_DEPENDENCY)
set_property(DIRECTORY APPEND PROPERTY CMAKE_CONFIGURE_DEPENDS ${source})
endif()
file(READ "${source}" contents)
string(REGEX MATCHALL "${gtest_test_type_regex} *\\(([A-Za-z_0-9 ,]+)\\)" found_tests ${contents})
foreach(hit ${found_tests})
string(REGEX MATCH "${gtest_test_type_regex}" test_type ${hit})
# Parameterized tests have a different signature for the filter
if("x${test_type}" STREQUAL "xTEST_P")
string(REGEX REPLACE ${gtest_case_name_regex} "*/\\1.\\2/*" gtest_test_name ${hit})
elseif("x${test_type}" STREQUAL "xTEST_F" OR "x${test_type}" STREQUAL "xTEST")
string(REGEX REPLACE ${gtest_case_name_regex} "\\1.\\2" gtest_test_name ${hit})
elseif("x${test_type}" STREQUAL "xTYPED_TEST")
string(REGEX REPLACE ${gtest_case_name_regex} "\\1/*.\\2" gtest_test_name ${hit})
else()
message(WARNING "Could not parse GTest ${hit} for adding to CTest.")
continue()
endif()
# Make sure tests disabled in GTest get disabled in CTest
if(gtest_test_name MATCHES "(^|\\.)DISABLED_")
# Add the disabled test if CMake is new enough
# Note that this check is to allow backwards compatibility so this
# module can be copied locally in projects to use with older CMake
# versions
if(CMAKE_VERSION VERSION_GREATER_EQUAL 3.8.20170401)
string(REGEX REPLACE
"(^|\\.)DISABLED_" "\\1"
orig_test_name "${gtest_test_name}"
)
set(ctest_test_name
${ARGS_TEST_PREFIX}${orig_test_name}${ARGS_TEST_SUFFIX}
)
add_test(NAME ${ctest_test_name}
${workDir}
COMMAND ${ARGS_TARGET}
--gtest_also_run_disabled_tests
--gtest_filter=${gtest_test_name}
${ARGS_EXTRA_ARGS}
)
set_tests_properties(${ctest_test_name} PROPERTIES DISABLED TRUE)
list(APPEND testList ${ctest_test_name})
endif()
else()
set(ctest_test_name ${ARGS_TEST_PREFIX}${gtest_test_name}${ARGS_TEST_SUFFIX})
add_test(NAME ${ctest_test_name}
${workDir}
COMMAND ${ARGS_TARGET}
--gtest_filter=${gtest_test_name}
${ARGS_EXTRA_ARGS}
)
list(APPEND testList ${ctest_test_name})
endif()
endforeach()
endforeach()
if(ARGS_TEST_LIST)
set(${ARGS_TEST_LIST} ${testList} PARENT_SCOPE)
endif()
endfunction()
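To make the source-scanning behavior above concrete, here is a small, hypothetical GoogleTest source (not part of this commit) showing the macro forms the regexes pick up and the --gtest_filter strings the function generates for them; think of it as the FooUnitTest.cxx from the documentation example.

// FooUnitTest.cxx (hypothetical) -- illustrates what gtest_add_tests() scans.
#include <gtest/gtest.h>

// Plain TEST: registered as CTest test "Foo.HandlesZero"
// and run with --gtest_filter=Foo.HandlesZero
TEST(Foo, HandlesZero) { EXPECT_EQ(0, 0); }

// Disabled test: with CMake >= 3.8.20170401 it is still added, invoked with
// --gtest_also_run_disabled_tests, and given the CTest DISABLED property.
TEST(Foo, DISABLED_SlowCase) { SUCCEED(); }

// Fixture test: TEST_F is filtered the same way as TEST,
// i.e. --gtest_filter=FooFixture.UsesFixture
class FooFixture : public ::testing::Test
{
};
TEST_F(FooFixture, UsesFixture) { SUCCEED(); }

int main(int argc, char **argv)
{
    ::testing::InitGoogleTest(&argc, argv);
    return RUN_ALL_TESTS();
}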
@@ -52,4 +52,45 @@ if(ADIOS_USE_MPI)
${MPI_C_LIBRARIES} ${HDF5_C_LIBRARIES}
)
endif()
if(ADIOS_USE_HDF5)
find_package(HDF5 REQUIRED)
find_package(MPI COMPONENTS C REQUIRED)
add_executable(heatTransfer_write_ph5
main.cpp
HeatTransfer.cpp
Settings.cpp
IO_ph5.cpp
)
target_include_directories(heatTransfer_write_ph5
PRIVATE ${MPI_C_INCLUDE_PATH} ${HDF5_C_INCLUDE_DIRS}
)
target_link_libraries(heatTransfer_write_ph5
${MPI_C_LIBRARIES} ${HDF5_C_LIBRARIES}
)
endif()
if(ADIOS_USE_HDF5)
find_package(MPI COMPONENTS C REQUIRED)
add_executable(heatTransfer_write_a2h5
main.cpp
HeatTransfer.cpp
Settings.cpp
IO_ph5_adios2.cpp
)
target_include_directories(heatTransfer_write_a2h5
PRIVATE ${MPI_C_INCLUDE_PATH}
)
#target_link_libraries(heatTransfer_write_a2h5
# ${MPI_C_LIBRARIES}
#)
target_link_libraries(heatTransfer_write_a2h5 PUBLIC adios2)
endif()
endif()
@@ -16,6 +16,11 @@
#include <adios.h>
// Enable compatibility with ADIOS 1.10 adios_declare_group signature
#if !ADIOS_VERSION_GE(1, 11, 0)
#define adios_stat_default adios_flag_yes
#endif
static int64_t group;
static int rank_saved;
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* IO_ph5.cpp
*
* Created on: Feb 2017
* Author: Norbert Podhorszki
*/
#include "IO.h"
#include <hdf5.h>
#include <ios>
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
class HDF5NativeWriter
{
public:
HDF5NativeWriter(const std::string &fileName);
~HDF5NativeWriter();
bool Advance();
void Close();
void CheckWriteGroup();
void WriteScalar(const std::string &varName, const void *data,
hid_t h5Type);
void WriteSimple(const std::string &varName, int dimSize, const void *data,
hid_t h5Type, const hsize_t *shape, const hsize_t *offset,
const hsize_t *count);
int m_CurrentTimeStep;
unsigned int m_TotalTimeSteps;
private:
hid_t m_FilePropertyListId;
hid_t m_FileId;
hid_t m_GroupId;
};
HDF5NativeWriter::HDF5NativeWriter(const std::string &fileName)
: m_CurrentTimeStep(0), m_TotalTimeSteps(0)
{
m_FilePropertyListId = H5Pcreate(H5P_FILE_ACCESS);
// access the file collectively through the MPI-IO driver
H5Pset_fapl_mpio(m_FilePropertyListId, MPI_COMM_WORLD, MPI_INFO_NULL);
m_FileId = H5Fcreate(fileName.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT,
m_FilePropertyListId);
if (m_FileId < 0)
{
throw std::runtime_error("Unable to open " + fileName + " for reading");
}
std::string ts0 = "/TimeStep0";
m_GroupId = H5Gcreate2(m_FileId, ts0.c_str(), H5P_DEFAULT, H5P_DEFAULT,
H5P_DEFAULT);
if (m_GroupId < 0)
{
throw std::runtime_error("HDF5: Unable to create group " + ts0);
}
}
HDF5NativeWriter::~HDF5NativeWriter() { Close(); }
void HDF5NativeWriter::Close()
{
if (m_FileId < 0)
return;
hid_t s = H5Screate(H5S_SCALAR);
hid_t attr = H5Acreate(m_FileId, "NumTimeSteps", H5T_NATIVE_UINT, s,
H5P_DEFAULT, H5P_DEFAULT);
unsigned int totalTimeSteps = m_CurrentTimeStep + 1;
if (m_GroupId < 0)
{
totalTimeSteps = m_CurrentTimeStep;
}
H5Awrite(attr, H5T_NATIVE_UINT, &totalTimeSteps);
H5Sclose(s);
H5Aclose(attr);
if (m_GroupId >= 0)
{
H5Gclose(m_GroupId);
m_GroupId = -1;
}
H5Fclose(m_FileId);
m_FileId = -1;
H5Pclose(m_FilePropertyListId);
}
bool HDF5NativeWriter::Advance()
{
if (m_GroupId >= 0)
{
H5Gclose(m_GroupId);
m_GroupId = -1;
}
++m_CurrentTimeStep;
return true;
}
void HDF5NativeWriter::CheckWriteGroup()
{
if (m_GroupId >= 0)
{
return;
}
std::string timeStepName = "/TimeStep" + std::to_string(m_CurrentTimeStep);
m_GroupId = H5Gcreate2(m_FileId, timeStepName.c_str(), H5P_DEFAULT,
H5P_DEFAULT, H5P_DEFAULT);
if (m_GroupId < 0)
{
throw std::runtime_error("HDF5: Unable to create group " +
timeStepName);
}
}
void HDF5NativeWriter::WriteScalar(const std::string &varName, const void *data,
hid_t h5Type)
{
CheckWriteGroup();
// scalar
hid_t filespaceID = H5Screate(H5S_SCALAR);
hid_t dsetID = H5Dcreate(m_GroupId, varName.c_str(), h5Type, filespaceID,
H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
herr_t status =
H5Dwrite(dsetID, h5Type, H5S_ALL, H5S_ALL, H5P_DEFAULT, data);
H5Sclose(filespaceID);
H5Dclose(dsetID);
}
void HDF5NativeWriter::WriteSimple(const std::string &varName, int dimSize,
const void *data, hid_t h5Type,
const hsize_t *shape, const hsize_t *offset,
const hsize_t *count)
{
CheckWriteGroup();
hid_t fileSpace = H5Screate_simple(dimSize, shape, NULL);
hid_t dsetID = H5Dcreate(m_GroupId, varName.c_str(), h5Type, fileSpace,
H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
hid_t memSpace = H5Screate_simple(dimSize, count, NULL);
// Select this rank's hyperslab in the dataset's file space; close the
// creation-time dataspace first so its handle is not leaked.
H5Sclose(fileSpace);
fileSpace = H5Dget_space(dsetID);
H5Sselect_hyperslab(fileSpace, H5S_SELECT_SET, offset, NULL, count, NULL);
// Create property list for collective dataset write.
hid_t plistID = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(plistID, H5FD_MPIO_COLLECTIVE);
herr_t status;
status = H5Dwrite(dsetID, h5Type, memSpace, fileSpace, plistID, data);
if (status < 0)
{
// error
std::cerr << " Write failed. " << std::endl;
}
H5Dclose(dsetID);
H5Sclose(fileSpace);
H5Sclose(memSpace);
H5Pclose(plistID);
}
//
//
std::shared_ptr<HDF5NativeWriter> h5writer;
// HDF5NativeWriter* h5writer;
IO::IO(const Settings &s, MPI_Comm comm)
{
m_outputfilename = s.outputfile + ".h5";
if (s.outputfile[0] == '0')
{
std::cout << " no writer. " << std::endl;
h5writer = nullptr;
return;
}
h5writer = std::make_shared<HDF5NativeWriter>(m_outputfilename);
if (h5writer == nullptr)
throw std::ios_base::failure("ERROR: failed to open ADIOS h5writer\n");
}
IO::~IO()
{
if (h5writer != nullptr)
{
h5writer->Close();
}
// delete h5writer;
}
void IO::write(int step, const HeatTransfer &ht, const Settings &s,
MPI_Comm comm)
{
if (h5writer == nullptr)
{
return;
}
std::vector<hsize_t> dims = {s.gndx, s.gndy};
std::vector<hsize_t> offset = {s.offsx, s.offsy};
std::vector<hsize_t> count = {s.ndx, s.ndy};
h5writer->WriteSimple("T", 2, ht.data_noghost().data(), H5T_NATIVE_DOUBLE,
dims.data(), offset.data(), count.data());
h5writer->WriteScalar("gndy", &(s.gndy), H5T_NATIVE_UINT);
h5writer->WriteScalar("gndx", &(s.gndx), H5T_NATIVE_UINT);
h5writer->Advance();
}
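For context, a minimal read-back sketch (an illustration, not part of this commit) for the layout HDF5NativeWriter produces: one "/TimeStep<N>" group per step, a 2D "T" dataset in each group, and a file-level "NumTimeSteps" attribute. The variable name and file name are assumptions taken from IO::write above; the example appends ".h5" to Settings::outputfile, so the default name here is a placeholder.

// read_ph5_sketch.cpp (hypothetical) -- serial reader for the file layout
// written by HDF5NativeWriter above.
#include <hdf5.h>
#include <iostream>
#include <string>
#include <vector>

int main(int argc, char **argv)
{
    // "heat.h5" is a placeholder; pass the real output file on the command line.
    const std::string fileName = (argc > 1) ? argv[1] : "heat.h5";
    hid_t fileId = H5Fopen(fileName.c_str(), H5F_ACC_RDONLY, H5P_DEFAULT);
    if (fileId < 0)
    {
        std::cerr << "Unable to open " << fileName << std::endl;
        return 1;
    }

    // The writer records the step count as a file-level attribute.
    unsigned int numTimeSteps = 0;
    hid_t attr = H5Aopen(fileId, "NumTimeSteps", H5P_DEFAULT);
    H5Aread(attr, H5T_NATIVE_UINT, &numTimeSteps);
    H5Aclose(attr);

    for (unsigned int ts = 0; ts < numTimeSteps; ++ts)
    {
        const std::string groupName = "/TimeStep" + std::to_string(ts);
        hid_t groupId = H5Gopen2(fileId, groupName.c_str(), H5P_DEFAULT);
        hid_t dsetId = H5Dopen2(groupId, "T", H5P_DEFAULT);

        // Query the global 2D extent and read the whole dataset.
        hid_t fileSpace = H5Dget_space(dsetId);
        hsize_t dims[2] = {0, 0};
        H5Sget_simple_extent_dims(fileSpace, dims, NULL);
        std::vector<double> T(dims[0] * dims[1]);
        H5Dread(dsetId, H5T_NATIVE_DOUBLE, H5S_ALL, H5S_ALL, H5P_DEFAULT,
                T.data());
        std::cout << groupName << ": " << dims[0] << " x " << dims[1]
                  << ", T[0] = " << T[0] << std::endl;

        H5Sclose(fileSpace);
        H5Dclose(dsetId);
        H5Gclose(groupId);
    }
    H5Fclose(fileId);
    return 0;
}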
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* IO_ph5_adios2.cpp
*
* Created on: Feb 2017
* Author: Norbert Podhorszki
*/
#include "IO.h"
#include <string>
#include <adios2.h>
static int rank_saved;
adios::ADIOS *ad = nullptr;
std::shared_ptr<adios::Engine> h5writer;
adios::Variable<double> *varT = nullptr;
adios::Variable<unsigned int> *varGndx = nullptr;
IO::IO(const Settings &s, MPI_Comm comm)
{
rank_saved = s.rank;
m_outputfilename = s.outputfile + ".h5";
// adios::ADIOS adios(comm, adios::Verbose::INFO, false);
ad = new adios::ADIOS(comm, adios::Verbose::INFO, false);
// Define method for engine creation
// 1. Get method def from config file or define new one
adios::Method &h5writerSettings = ad->DeclareMethod("output");
if (!h5writerSettings.IsUserDefined())
{
// if not defined by user, we can change the default settings
// BPFileWriter is the default engine
h5writerSettings.SetEngine("HDF5Writer");
// Allow an extra thread for data processing
const std::string aggregatorsParam("Aggregators=" +
std::to_string((s.nproc + 1) / 2));
h5writerSettings.SetParameters("have_metadata_file=yes",
aggregatorsParam);
}
// ad->DefineScalar<unsigned int>("gndx", true);
varGndx = &(ad->DefineVariable<unsigned int>("gndx"));
ad->DefineVariable<unsigned int>("gndy");
// define T as 2D global array
varT = &(ad->DefineArray<double>(
"T",
// Global dimensions
{s.gndx, s.gndy},
// starting offset of the local array in the global space
{s.offsx, s.offsy},
// local size, could be defined later using SetSelection()
{s.ndx, s.ndy}));
// add transform to variable
// adios::Transform tr = adios::transform::BZIP2( );
// varT.AddTransform( tr, "" );
// varT.AddTransform( tr,"accuracy=0.001" ); // for ZFP
h5writer = ad->Open(m_outputfilename, "w", comm, h5writerSettings);
if (h5writer == nullptr)
throw std::ios_base::failure("ERROR: failed to open ADIOS h5writer\n");
}
IO::~IO()
{
h5writer->Close();
// delete ad;
}
void IO::write(int step, const HeatTransfer &ht, const Settings &s,
MPI_Comm comm)
{
#if 1
/* This selection is redundant and not required, since we defined
* the selection already in DefineVariable(). It is here just as an example.
*/
// Make a selection to describe the local dimensions of the variable we
// write and its offsets in the global spaces. This could have been done in
// adios.DefineVariable()
// adios::SelectionBoundingBox sel({s.offsx, s.offsy}, {s.ndx, s.ndy});
// varT->SetSelection(sel);
/* Select the area that we want to write from the data pointer we pass to
the writer. Think HDF5 memspace, just not hyperslabs, only a bounding box
selection. The engine will copy this bounding box from the data pointer
into the output buffer. The size of the bounding box should match the
"space" selection given above. The default memspace is always the full
selection.
*/
// adios::SelectionBoundingBox memspace =
// adios::SelectionBoundingBox({1, 1}, {s.ndx, s.ndy});
// varT->SetMemorySelection(memspace);
h5writer->Write<double>(*varT, ht.data_noghost().data());
// h5writer->Write(*varT, ht.data_noghost().data());
h5writer->Write<unsigned int>(*varGndx, &(s.gndx));
h5writer->Write("gndy", &(s.gndy));
h5writer->Advance();
#else
h5writer->Write<double>(*varT, ht.data_noghost().data());
h5writer->Advance();
#endif
}
<?xml version="1.0"?>
<adios-config>
<io name="Output">
<engine name="BPFileWriter">verbose=4;profile_units=mus</engine>
<transport name="File">
profile_units=mus;
abort_on_error;
have_metadata_file=no;
</transport>
<!-- Create a named transform and add variables to it here.
name is optional, required only if it is used outside the definition
options is optional to pass parameters to the transformation
-->
<transform name="LossyCompression" transform="zfp" options="accuracy=0.001">
<var name="myMatrix"/>
<var name="ThisVarDoesNotExists"/>
</transform>
<!-- Unnamed transformation -->
<transform transform="bzip2">
<var name="myMatrix2"/>
</transform>
<!-- A variable can have its own private transform definition.
Also its own ordered chain of transformations.
Also can refer to a transform defined previously
-->
<var name="myDoubles">
<transform transform="identity">verbose=DEBUG</transform>
<transform name="LossyCompression"/>
</var>
<buffer max-size-MB="20"/>
</io>
</adios-config>
@@ -85,9 +85,9 @@ endif()
if(ADIOS_USE_ADIOS1)
if(ADIOS_USE_MPI)
find_package(ADIOS1 REQUIRED)
find_package(ADIOS1 1.12.0 REQUIRED)
else()
find_package(ADIOS1 COMPONENTS sequential REQUIRED)
find_package(ADIOS1 1.12.0 COMPONENTS sequential REQUIRED)
endif()
target_sources(adios2 PRIVATE
@@ -113,7 +113,11 @@ if(ADIOS_USE_HDF5)
"of ADIOS is being built, which requires a sequential HDF5."
)
endif()
target_include_directories(adios2 PRIVATE ${HDF5_C_INCLUDE_DIRS})
if(HDF5_C_INCLUDE_DIRS)
target_include_directories(adios2 PRIVATE ${HDF5_C_INCLUDE_DIRS})
else()
target_include_directories(adios2 PRIVATE ${HDF5_INCLUDE_DIRS})
endif()
target_sources(adios2 PRIVATE
engine/hdf5/HDF5ReaderP.cpp
engine/hdf5/HDF5WriterP.cpp
@@ -14,6 +14,11 @@
#include "adios2/helper/adiosFunctions.h"
// Enable compatibility with ADIOS 1.10 adios_declare_group signature
#if !ADIOS_VERSION_GE(1, 11, 0)
#define adios_stat_default adios_flag_yes
#endif
namespace adios
{
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* HDF5Common.cpp
*
* Created on: April 20, 2017
* Author: Junmin
*/
#include "HDF5Common.h"
#include <complex>
#include <iostream>
#include "adios2/ADIOSMPI.h"
namespace adios
{
#define H5_ERROR std::cout << "[ADIOS H5 ERROR] "
HDF5Common::HDF5Common()
: m_WriteMode(false), m_CurrentTimeStep(0), m_NumTimeSteps(0)
{
m_DefH5TypeComplexFloat =
H5Tcreate(H5T_COMPOUND, sizeof(std::complex<float>));
H5Tinsert(m_DefH5TypeComplexFloat, "freal", 0, H5T_NATIVE_FLOAT);
H5Tinsert(m_DefH5TypeComplexFloat, "fimg", H5Tget_size(H5T_NATIVE_FLOAT),
H5T_NATIVE_FLOAT);
m_DefH5TypeComplexDouble =
H5Tcreate(H5T_COMPOUND, sizeof(std::complex<double>));
H5Tinsert(m_DefH5TypeComplexDouble, "dreal", 0, H5T_NATIVE_DOUBLE);
H5Tinsert(m_DefH5TypeComplexDouble, "dimg", H5Tget_size(H5T_NATIVE_DOUBLE),
H5T_NATIVE_DOUBLE);
m_DefH5TypeComplexLongDouble =
H5Tcreate(H5T_COMPOUND, sizeof(std::complex<long double>));
H5Tinsert(m_DefH5TypeComplexLongDouble, "ldouble real", 0,
H5T_NATIVE_LDOUBLE);
H5Tinsert(m_DefH5TypeComplexLongDouble, "ldouble img",
H5Tget_size(H5T_NATIVE_LDOUBLE), H5T_NATIVE_LDOUBLE);
}
void HDF5Common::Init(const std::string name, MPI_Comm comm, bool toWrite)
{
m_WriteMode = toWrite;
//
m_PropertyListId = H5Pcreate(H5P_FILE_ACCESS);
#ifdef ADIOS2_HAVE_MPI
H5Pset_fapl_mpio(m_PropertyListId, comm, MPI_INFO_NULL);
#endif
std::string ts0 = "/TimeStep0";
if (toWrite)
{
/*
* Create a new file collectively and release property list identifier.
*/
m_FileId = H5Fcreate(name.c_str(), H5F_ACC_TRUNC, H5P_DEFAULT,
m_PropertyListId);
if (m_FileId >= 0)
{
m_GroupId = H5Gcreate2(m_FileId, ts0.c_str(), H5P_DEFAULT,
H5P_DEFAULT, H5P_DEFAULT);
if (m_GroupId < 0)
{
std::clog << "H5Common::Init: ERROR" << std::endl;
throw std::runtime_error("HDF5: Unable to create group " + ts0);
}
}
}
else
{
// read a file collectively
m_FileId = H5Fopen(name.c_str(), H5F_ACC_RDONLY, H5P_DEFAULT);
if (m_FileId >= 0)
{
m_GroupId = H5Gopen(m_FileId, ts0.c_str(), H5P_DEFAULT);
}
}
H5Pclose(m_PropertyListId);
}
void HDF5Common::WriteTimeSteps()
{
if (m_FileId < 0)
{
// std::cerr<<"[ADIOS HDF5Error]: Invalid file to record timestep
// to."<<std::endl;
H5_ERROR << "Invalid file to record timestep to." << std::endl;
return;
}
if (!m_WriteMode)
{
return;
}
hid_t s = H5Screate(H5S_SCALAR);
hid_t attr = H5Acreate(m_FileId, "NumTimeSteps", H5T_NATIVE_UINT, s,
H5P_DEFAULT, H5P_DEFAULT);
unsigned int totalTimeSteps = m_CurrentTimeStep + 1;
if (m_GroupId < 0)
{
totalTimeSteps = m_CurrentTimeStep;
}
H5Awrite(attr, H5T_NATIVE_UINT, &totalTimeSteps);
H5Sclose(s);
H5Aclose(attr);
}
unsigned int HDF5Common::GetNumTimeSteps()
{
if (m_WriteMode)
{
return -1;
}
if (m_FileId < 0)
{
std::cerr
<< "[ADIOS HDF5Error]: Invalid file to read timestep attribute."
<< std::endl;
return -1;
}
if (m_NumTimeSteps <= 0)
{
hid_t attr = H5Aopen(m_FileId, "NumTimeSteps", H5P_DEFAULT);
H5Aread(attr, H5T_NATIVE_UINT, &m_NumTimeSteps);
H5Aclose(attr);
}
return m_NumTimeSteps;
}
void HDF5Common::Close()
{
if (m_FileId < 0)
{
return;
}
WriteTimeSteps();
if (m_GroupId >= 0)
{
H5Gclose(m_GroupId);
}
H5Fclose(m_FileId);
m_FileId = -1;
m_GroupId = -1;
H5Tclose(m_DefH5TypeComplexFloat);
H5Tclose(m_DefH5TypeComplexDouble);
H5Tclose(m_DefH5TypeComplexLongDouble);
}
void HDF5Common::Advance()
{
if (m_GroupId >= 0)
{
H5Gclose(m_GroupId);
m_GroupId = -1;
}
if (m_WriteMode)
{
// m_GroupId = H5Gcreate2(m_FileId, tsname.c_str(), H5P_DEFAULT,
// H5P_DEFAULT, H5P_DEFAULT);
}
else
{
if (m_NumTimeSteps == 0)
{
GetNumTimeSteps();
}
if (m_CurrentTimeStep + 1 >= m_NumTimeSteps)
{
return;
}
std::string timeStepName =
"/TimeStep" + std::to_string(m_CurrentTimeStep + 1);
m_GroupId = H5Gopen(m_FileId, timeStepName.c_str(), H5P_DEFAULT);
if (m_GroupId < 0)
{
throw std::runtime_error("HDF5: Unable to open group " +
timeStepName);
}
}
++m_CurrentTimeStep;
}
void HDF5Common::CheckWriteGroup()
{
if (!m_WriteMode)
{
return;
}
if (m_GroupId >= 0)
{
return;
}
std::string timeStepName = "/TimeStep" + std::to_string(m_CurrentTimeStep);
m_GroupId = H5Gcreate2(m_FileId, timeStepName.c_str(), H5P_DEFAULT,
H5P_DEFAULT, H5P_DEFAULT);
if (m_GroupId < 0)
{
throw std::runtime_error("HDF5: Unable to create group " +
timeStepName);
}
}
}
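As an aside (not part of this commit), the compound types registered in the HDF5Common constructor can be read back into std::complex buffers by rebuilding the matching memory type. A minimal sketch follows; the file name and dataset path are hypothetical placeholders, while the "freal"/"fimg" field layout is the one defined above.

// read_complex_sketch.cpp (hypothetical) -- reads a std::complex<float>
// dataset written through the compound type defined in HDF5Common.
#include <hdf5.h>
#include <complex>
#include <iostream>
#include <vector>

int main()
{
    // File and dataset names are placeholders for this illustration.
    hid_t fileId = H5Fopen("output.h5", H5F_ACC_RDONLY, H5P_DEFAULT);
    hid_t dsetId = H5Dopen2(fileId, "/TimeStep0/myComplexVar", H5P_DEFAULT);

    // Rebuild the same {freal, fimg} compound layout used by the writer so
    // HDF5 can convert directly into std::complex<float> storage.
    hid_t complexType = H5Tcreate(H5T_COMPOUND, sizeof(std::complex<float>));
    H5Tinsert(complexType, "freal", 0, H5T_NATIVE_FLOAT);
    H5Tinsert(complexType, "fimg", H5Tget_size(H5T_NATIVE_FLOAT),
              H5T_NATIVE_FLOAT);

    hid_t space = H5Dget_space(dsetId);
    const hssize_t n = H5Sget_simple_extent_npoints(space);
    std::vector<std::complex<float>> values(static_cast<size_t>(n));
    H5Dread(dsetId, complexType, H5S_ALL, H5S_ALL, H5P_DEFAULT, values.data());
    if (n > 0)
    {
        std::cout << "read " << n << " complex values, first = " << values[0]
                  << std::endl;
    }

    H5Sclose(space);
    H5Tclose(complexType);
    H5Dclose(dsetId);
    H5Fclose(fileId);
    return 0;
}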
@@ -19,14 +19,14 @@ int CacheItem::init(json a_jmsg)
return 0;
}
int CacheMan::put(const void *a_data, json a_jmsg)
int CacheMan::put(const void *a_data, json &a_jmsg)
{
std::string doid = a_jmsg["doid"].get<std::string>();
std::string var = a_jmsg["var"].get<std::string>();
return m_cache[doid][var].put(a_data, a_jmsg);
}
int CacheItem::put(const void *a_data, json a_jmsg)
int CacheItem::put(const void *a_data, json &a_jmsg)
{
if (!m_initialized)
@@ -21,7 +21,7 @@ public:
using json = nlohmann::json;
int init(json a_jmsg);
virtual int put(const void *a_data, json a_jmsg);
virtual int put(const void *a_data, json &a_jmsg);
virtual void transform(std::vector<char> &a_data, json &a_jmsg) {}
void *get();
@@ -81,7 +81,7 @@ class CacheMan
public:
using json = nlohmann::json;
int put(const void *a_data, json a_jmsg);
int put(const void *a_data, json &a_jmsg);
void *get(std::string doid, std::string var);
void pop();
void push();
@@ -12,16 +12,19 @@
int DataMan::init(json a_jmsg) { return 0; }
int DataMan::put(const void *a_data, std::string p_doid, std::string p_var,
std::string p_dtype, std::vector<size_t> p_putshape,
std::vector<size_t> p_varshape, std::vector<size_t> p_offset,
size_t p_timestep, int p_tolerance, int p_priority)
int DataMan::put_streams(const void *a_data, json &a_jmsg)
{
return DataMan::put(a_data, p_doid, p_var, p_dtype, p_putshape, p_varshape,
p_offset, p_timestep, p_tolerance, p_priority);
a_jmsg["channel_id"] = m_stream_index;
m_stream_mans[m_stream_index]->put(a_data, a_jmsg);
++m_stream_index;
if (m_stream_index >= m_stream_mans.size())
{
m_stream_index = 0;
}
return 0;
}
int DataMan::put(const void *a_data, json a_jmsg)
int DataMan::put(const void *a_data, json &a_jmsg)
{
a_jmsg["timestep"] = m_timestep;
if (m_cache_size > 0)
@@ -32,9 +35,10 @@ int DataMan::put(const void *a_data, json a_jmsg)
else
{
put_begin(a_data, a_jmsg);
put_streams(a_data, a_jmsg);
put_end(a_data, a_jmsg);
}
dump_profiling();
return 0;
}
@@ -57,32 +61,34 @@ void DataMan::add_stream(json a_jmsg)
m_cache_size = a_jmsg["cachesize"].get<size_t>();
}
if (m_tolerance.size() < m_num_channels)
int num_channels = 1;
if (a_jmsg["num_channels"].is_number())
{
for (int i = 0; i < m_num_channels; ++i)
{
m_tolerance.push_back(0);
}
num_channels = a_jmsg["num_channels"].get<int>();
}
if (m_priority.size() < m_num_channels)
else
{
for (int i = 0; i < m_num_channels; ++i)
{
m_priority.push_back(100 / (i + 1));
}
a_jmsg["num_channels"] = num_channels;
}
auto man = get_man(method);
if (man)
for (int i = 0; i < num_channels; i++)
{
man->init(a_jmsg);
this->add_next(method, man);
}
if (a_jmsg["compression_method"].is_string())
{
if (a_jmsg["compression_method"] != "null")
a_jmsg["channel_id"] = i;
a_jmsg["local_port"] = a_jmsg["local_port"].get<int>() + 2;
a_jmsg["remote_port"] = a_jmsg["remote_port"].get<int>() + 2;
auto man = get_man(method);
if (man)
{
add_man_to_path(a_jmsg["compression_method"], method, a_jmsg);
std::cout << a_jmsg.dump(4);
man->init(a_jmsg);
m_stream_mans.push_back(man);
}
if (a_jmsg["compression_method"].is_string())
{
if (a_jmsg["compression_method"] != "null")
{
}
}
}
}
@@ -104,10 +110,10 @@ void DataMan::flush()
{
json jmsg = m_cache.get_jmsg(j, k);
put_begin(m_cache.get(j, k), jmsg);
put_streams(m_cache.get(j, k), jmsg);
put_end(m_cache.get(j, k), jmsg);
}
}
flush_next();
m_cache.pop();
}
}
@@ -116,10 +122,6 @@ void DataMan::flush()
m_cache.push();
}
}
else
{
flush_next();
}
}
int DataMan::get(void *a_data, json &a_jmsg) { return 0; }
@@ -19,31 +19,22 @@ class DataMan : public DataManBase
public:
DataMan() = default;
virtual ~DataMan() = default;
virtual int init(json p_jmsg);
virtual int put(const void *p_data, json p_jmsg);
virtual int get(void *p_data, json &p_jmsg);
virtual int init(json a_jmsg);
virtual int put(const void *a_data, json &a_jmsg);
virtual int get(void *a_data, json &a_jmsg);
int put_streams(const void *a_data, json &a_jmsg);
void flush();
void add_stream(json p_jmsg);
int put(const void *p_data, std::string p_doid, std::string p_var,
std::string p_dtype, std::vector<size_t> p_putshape,
std::vector<size_t> p_varshape, std::vector<size_t> p_offset,
size_t p_timestep, int p_tolerance = 0, int p_priority = 100);
void add_stream(json a_jmsg);
void add_file(std::string p_method);
std::string name() { return "DataManager"; }
std::string type() { return "Manager"; }
virtual void transform(std::vector<char> &a_data, json &a_jmsg) {}
private:
std::string m_local_ip;
std::string m_remote_ip;
int m_local_port = 0;
int m_remote_port = 0;
int m_num_channels = 0;
std::vector<int> m_tolerance;
std::vector<int> m_priority;
CacheMan m_cache;
size_t m_cache_size = 0;
size_t m_timestep = 0;
int m_stream_index = 0;
};
#endif /* DATAMAN_H_ */
#endif // DATAMAN_DATAMAN_H_
@@ -106,27 +106,6 @@ DataManBase::DataManBase()
m_start_time = std::chrono::system_clock::now();
}
int DataManBase::put(const void *p_data, std::string p_doid, std::string p_var,
std::string p_dtype, std::vector<size_t> p_putshape,
std::vector<size_t> p_varshape,
std::vector<size_t> p_offset, size_t p_timestep,
int p_tolerance, int p_priority)
{
json msg;
msg["doid"] = p_doid;
msg["var"] = p_var;
msg["dtype"] = p_dtype;
msg["putshape"] = p_putshape;
msg["putbytes"] = product(p_putshape, dsize(p_dtype));
msg["varshape"] = p_varshape;
msg["varbytes"] = product(p_varshape, dsize(p_dtype));
msg["offset"] = p_offset;
msg["timestep"] = p_timestep;
msg["tolerance"] = p_tolerance;
msg["priority"] = p_priority;
return put(p_data, msg);
}
int DataManBase::put_begin(const void *p_data, json &p_jmsg)
{
check_shape(p_jmsg);
@@ -152,7 +131,6 @@ int DataManBase::put_end(const void *p_data, json &p_jmsg)
m_profiling["total_mb"] =
m_profiling["total_mb"].get<double>() +
product(p_jmsg["varshape"], dsize(p_jmsg["dtype"])) / 1000000.0f;
std::cout << product(p_jmsg["varshape"], dsize(p_jmsg["dtype"])) << "\n";
duration = end - m_start_time;
m_profiling["total_workflow_time"] = duration.count();
m_profiling["workflow_mbs"] =
@@ -161,50 +139,23 @@ int DataManBase::put_end(const void *p_data, json &p_jmsg)
m_profiling["manager_mbs"] =
m_profiling["total_mb"].get<double>() /
m_profiling["total_manager_time"].get<double>();
put_next(p_data, p_jmsg);
return 0;
}
int DataManBase::get(void *p_data, std::string p_doid, std::string p_var,
std::string p_dtype, std::vector<size_t> p_getshape,
std::vector<size_t> p_varshape,
std::vector<size_t> p_offset, size_t p_timestep)
{
json msg;
msg["doid"] = p_doid;
msg["var"] = p_var;
msg["dtype"] = p_dtype;
msg["getshape"] = p_getshape;
msg["varshape"] = p_varshape;
msg["offset"] = p_offset;
msg["timestep"] = p_timestep;
return get(p_data, msg);
}
int DataManBase::get(void *p_data, std::string p_doid, std::string p_var,
std::string &p_dtype, std::vector<size_t> &p_varshape,
size_t &p_timestep)
{
json msg;
msg["doid"] = p_doid;
msg["var"] = p_var;
return get(p_data, msg);
}
void DataManBase::reg_callback(
std::function<void(const void *, std::string, std::string, std::string,
std::vector<size_t>)>
cb)
{
if (m_next.empty())
if (m_stream_mans.empty())
{
m_callback = cb;
}
else
{
for (const auto &i : m_next)
for (const auto &i : m_stream_mans)
{
i.second->reg_callback(cb);
i->reg_callback(cb);
}
}
}
@@ -229,36 +180,6 @@ void DataManBase::dump(const void *p_data, json p_jmsg, std::ostream &out)
out << std::endl;
}
void DataManBase::add_next(std::string p_name,
std::shared_ptr<DataManBase> p_next)
{
m_next[p_name] = p_next;
}
void DataManBase::remove_next(std::string p_name) { m_next.erase(p_name); }
bool DataManBase::have_next()
{
if (m_next.empty())
{
return false;
}
else
{
return true;
}
}
void DataManBase::print_next(std::ostream &out)
{
for (const auto &i : m_next)
{
out << i.second->name() << " -> ";
i.second->print_next();
out << std::endl;
}
}
bool DataManBase::auto_transform(std::vector<char> &a_data, json &a_jmsg)
{
if (a_jmsg["compression_method"].is_string() &&
@@ -284,40 +205,6 @@ bool DataManBase::auto_transform(std::vector<char> &a_data, json &a_jmsg)
}
}
void DataManBase::add_man_to_path(std::string p_new, std::string p_path,
json p_jmsg)
{
if (m_next.count(p_path) > 0)
{
auto man = get_man(p_new);
if (man)
{
man->init(p_jmsg);
man->add_next(p_path, m_next[p_path]);
this->add_next(p_new, man);
this->remove_next(p_path);
}
}
}
int DataManBase::flush_next()
{
for (const auto &i : m_next)
{
i.second->flush();
}
return 0;
}
int DataManBase::put_next(const void *p_data, json p_jmsg)
{
for (const auto &i : m_next)
{
i.second->put(p_data, p_jmsg);
}
return 0;
}
std::shared_ptr<DataManBase> DataManBase::get_man(std::string method)
{
try
@@ -491,23 +378,6 @@ size_t DataManBase::dsize(std::string dtype)
return 0;
}
nlohmann::json DataManBase::atoj(unsigned int *array)
{
json j;
if (array)
{
if (array[0] > 0)
{
j = {array[1]};
for (unsigned int i = 2; i <= array[0]; ++i)
{
j.insert(j.end(), array[i]);
}
}
}
return j;
}
int DataManBase::closest(int v, json j, bool up)
{
int s = 100, k = 0, t;
@@ -559,3 +429,5 @@ void DataManBase::check_shape(json &p_jmsg)
p_jmsg["varbytes"] =
product(varshape, dsize(p_jmsg["dtype"].get<std::string>()));
}
void DataManBase::dump_profiling() { logging(m_profiling.dump(4)); }
@@ -30,62 +30,31 @@ public:
using json = nlohmann::json;
DataManBase();
int put(const void *p_data, std::string p_doid, std::string p_var,
std::string p_dtype, std::vector<size_t> p_putshape,
std::vector<size_t> p_varshape, std::vector<size_t> p_offset,
size_t p_timestep, int p_tolerance = 0, int p_priority = 100);
virtual int put_begin(const void *a_data, json &a_jmsg);
virtual int put_end(const void *a_data, json &a_jmsg);
virtual int put_begin(const void *p_data, json &p_jmsg);
virtual int put_end(const void *p_data, json &p_jmsg);
virtual int put(const void *p_data, json p_jmsg) = 0;
int get(void *p_data, std::string p_doid, std::string p_var,
std::string p_dtype, std::vector<size_t> p_getshape,
std::vector<size_t> p_varshape, std::vector<size_t> p_offset,
size_t p_timestep);
int get(void *p_data, std::string p_doid, std::string p_var,
std::string &p_dtype, std::vector<size_t> &p_varshape,
size_t &p_timestep);
virtual int get(void *p_data, json &p_jmsg) = 0;
virtual int init(json p_jmsg) = 0;
virtual int put(const void *a_data, json &a_jmsg) = 0;
virtual int get(void *a_data, json &a_jmsg) = 0;
virtual int init(json a_jmsg) = 0;
virtual void flush() = 0;
virtual std::string name() = 0;
virtual std::string type() = 0;
void reg_callback(std::function<void(const void *, std::string, std::string,
std::string, std::vector<size_t>)>
cb);
void dump(const void *p_data, json p_jmsg, std::ostream &out = std::cout);
void add_next(std::string p_name, std::shared_ptr<DataManBase> p_next);
void remove_next(std::string p_name);
bool have_next();
void print_next(std::ostream &out = std::cout);
void dump(const void *a_data, json a_jmsg, std::ostream &out = std::cout);
virtual void transform(std::vector<char> &a_data, json &a_jmsg) = 0;
void dump_profiling();
protected:
bool auto_transform(std::vector<char> &a_data, json &a_jmsg);
void add_man_to_path(std::string p_new, std::string p_path, json p_jmsg);
virtual int flush_next();
virtual int put_next(const void *p_data, json p_jmsg);
std::shared_ptr<DataManBase> get_man(std::string method);
void logging(std::string p_msg, std::string p_man = "",
std::ostream &out = std::cout);
bool check_json(json p_jmsg, std::vector<std::string> p_strings,
bool check_json(json a_jmsg, std::vector<std::string> p_strings,
std::string p_man = "");
size_t product(size_t *shape);
@@ -94,25 +63,22 @@ protected:
size_t dsize(std::string dtype);
json atoj(unsigned int *array);
int closest(int v, json j, bool up);
void check_shape(json &p_jmsg);
void check_shape(json &a_jmsg);
std::function<void(const void *, std::string, std::string, std::string,
std::vector<size_t>)>
m_callback;
std::map<std::string, std::shared_ptr<DataManBase>> m_next;
std::vector<std::shared_ptr<DataManBase>> m_stream_mans;
private:
struct ManagerLibrary;
std::unordered_map<std::string, ManagerLibrary *> m_LoadedManagers;
json m_profiling;
std::chrono::time_point<std::chrono::system_clock> m_start_time;
std::chrono::time_point<std::chrono::system_clock> m_step_time;
bool m_profiling_enabled = false;
json m_profiling;
};
#endif /* DATAMANBASE_H_ */
@@ -10,53 +10,53 @@
#include "DumpMan.h"
int DumpMan::init(json p_jmsg)
int DumpMan::init(json a_jmsg)
{
if (p_jmsg["dumping"].is_boolean())
if (a_jmsg["dumping"].is_boolean())
{
m_dumping = p_jmsg["dumping"].get<bool>();
m_dumping = a_jmsg["dumping"].get<bool>();
}
return 0;
}
int DumpMan::get(void *p_data, json &p_jmsg) { return 0; }
int DumpMan::get(void *a_data, json &a_jmsg) { return 0; }
int DumpMan::put(const void *p_data, json p_jmsg)
int DumpMan::put(const void *a_data, json &a_jmsg)
{
put_begin(p_data, p_jmsg);
put_begin(a_data, a_jmsg);
if (!m_dumping)
{
return 1;
}
if (!check_json(p_jmsg, {"doid", "var", "dtype", "putshape"}))
if (!check_json(a_jmsg, {"doid", "var", "dtype", "putshape"}))
{
return -1;
}
std::string doid = p_jmsg["doid"];
std::string var = p_jmsg["var"];
std::string dtype = p_jmsg["dtype"];
std::string doid = a_jmsg["doid"];
std::string var = a_jmsg["var"];
std::string dtype = a_jmsg["dtype"];
std::vector<size_t> putshape =
p_jmsg["putshape"].get<std::vector<size_t>>();
a_jmsg["putshape"].get<std::vector<size_t>>();
std::vector<size_t> varshape =
p_jmsg["varshape"].get<std::vector<size_t>>();
std::vector<size_t> offset = p_jmsg["offset"].get<std::vector<size_t>>();
a_jmsg["varshape"].get<std::vector<size_t>>();
std::vector<size_t> offset = a_jmsg["offset"].get<std::vector<size_t>>();
int numbers_to_print = 100;
if (numbers_to_print > product(putshape, 1))
{
numbers_to_print = product(putshape, 1);
}
size_t putbytes = product(putshape, dsize(dtype));
size_t sendbytes = p_jmsg["sendbytes"].get<size_t>();
size_t sendbytes = a_jmsg["sendbytes"].get<size_t>();
std::cout << p_jmsg.dump(4) << std::endl;
std::cout << a_jmsg.dump(4) << std::endl;
std::cout << "total MBs = " << product(putshape, dsize(dtype)) / 1000000
<< std::endl;
std::vector<char> data(static_cast<const char *>(p_data),
static_cast<const char *>(p_data) + sendbytes);
std::vector<char> data(static_cast<const char *>(a_data),
static_cast<const char *>(a_data) + sendbytes);
auto_transform(data, p_jmsg);
auto_transform(data, a_jmsg);
void *data_to_print = data.data();
for (size_t i = 0; i < numbers_to_print; ++i)
@@ -72,7 +72,7 @@ int DumpMan::put(const void *p_data, json p_jmsg)
}
std::cout << std::endl;
put_end(p_data, p_jmsg);
put_end(a_data, a_jmsg);
return 0;
}
@@ -19,9 +19,9 @@ public:
DumpMan() = default;
virtual ~DumpMan() = default;
virtual int init(json p_jmsg);
virtual int put(const void *p_data, json p_jmsg);
virtual int get(void *p_data, json &p_jmsg);
virtual int init(json a_jmsg);
virtual int put(const void *a_data, json &a_jmsg);
virtual int get(void *a_data, json &a_jmsg);
void flush();
std::string name() { return "DumpMan"; }
std::string type() { return "Dump"; }
@@ -16,134 +16,88 @@
#include <zmq.h>
MdtmMan::~MdtmMan()
{
if (zmq_ipc_req)
{
zmq_close(zmq_ipc_req);
}
}
int MdtmMan::init(json p_jmsg)
int MdtmMan::init(json a_jmsg)
{
StreamMan::init(p_jmsg);
StreamMan::init(a_jmsg);
if (p_jmsg["pipe_prefix"].is_string())
{
pipe_desc["pipe_prefix"] = p_jmsg["pipe_prefix"].get<std::string>();
}
else
if (a_jmsg["pipe_prefix"].is_string())
{
pipe_desc["pipe_prefix"] = "/tmp/MdtmManPipes/";
m_pipepath = a_jmsg["pipe_prefix"].get<std::string>();
}
json pipe_desc;
pipe_desc["operation"] = "init";
pipe_desc["pipe_prefix"] = m_pipepath;
pipe_desc["mode"] = m_stream_mode;
std::string pipename_prefix = "MdtmManPipe";
for (int i = 0; i < m_num_channels; ++i)
std::stringstream pname;
pname << m_pipename_prefix << m_channel_id;
m_pipename = pname.str();
m_full_pipename = m_pipepath + m_pipename;
// send JSON message to MDTM
if (m_channel_id == 0)
{
std::stringstream pipename;
pipename << pipename_prefix << i;
if (i == 0)
for (int i = 0; i < m_num_channels; ++i)
{
pipe_desc["pipe_names"] = {pipename.str()};
pipe_desc["loss_tolerance"] = {m_tolerance[i]};
pipe_desc["priority"] = {m_priority[i]};
}
else
{
pipe_desc["pipe_names"].insert(pipe_desc["pipe_names"].end(),
pipename.str());
pipe_desc["loss_tolerance"].insert(
pipe_desc["loss_tolerance"].end(), m_tolerance[i]);
pipe_desc["priority"].insert(pipe_desc["priority"].end(),
m_priority[i]);
std::stringstream pipename;
pipename << m_pipename_prefix << i;
if (i == 0)
{
pipe_desc["pipe_names"] = {pipename.str()};
}
else
{
pipe_desc["pipe_names"].insert(pipe_desc["pipe_names"].end(),
pipename.str());
}
}
}
// ZMQ_DataMan_MDTM
if (m_stream_mode == "sender")
{
zmq_ipc_req = zmq_socket(zmq_context, ZMQ_REQ);
void *zmq_ipc_req = nullptr;
zmq_ipc_req = zmq_socket(m_zmq_context, ZMQ_REQ);
zmq_connect(zmq_ipc_req, "ipc:///tmp/ADIOS_MDTM_pipe");
char buffer_return[10];
zmq_send(zmq_ipc_req, pipe_desc.dump().c_str(),
pipe_desc.dump().length(), 0);
zmq_recv(zmq_ipc_req, buffer_return, sizeof(buffer_return), 0);
if (zmq_ipc_req)
{
zmq_close(zmq_ipc_req);
}
}
// Pipes
mkdir(pipe_desc["pipe_prefix"].get<std::string>().c_str(), 0755);
for (const auto &i :
pipe_desc["pipe_names"].get<std::vector<std::string>>())
// Make pipes
mkdir(m_pipepath.c_str(), 0755);
mkfifo(m_full_pipename.c_str(), 0666);
if (m_stream_mode == "sender")
{
std::string filename = pipe_desc["pipe_prefix"].get<std::string>() + i;
mkfifo(filename.c_str(), 0666);
m_pipe_handler = open(m_full_pipename.c_str(), O_WRONLY);
}
for (int i = 0; i < m_num_channels; ++i)
if (m_stream_mode == "receiver")
{
std::stringstream pipename;
pipename << pipename_prefix << i;
std::string fullpipename =
pipe_desc["pipe_prefix"].get<std::string>() + pipename.str();
if (m_stream_mode == "sender")
{
int fp = open(fullpipename.c_str(), O_WRONLY);
pipes.push_back(fp);
}
if (m_stream_mode == "receiver")
{
int fp = open(fullpipename.c_str(), O_RDONLY | O_NONBLOCK);
pipes.push_back(fp);
}
pipenames.push_back(pipename.str());
m_pipe_handler = open(m_full_pipename.c_str(), O_RDONLY | O_NONBLOCK);
}
return 0;
}
int MdtmMan::put(const void *a_data, json a_jmsg)
int MdtmMan::put(const void *a_data, json &a_jmsg)
{
a_jmsg["pipe"] = m_pipename;
put_begin(a_data, a_jmsg);
// determine pipe to use
int priority = 100;
if (a_jmsg["priority"].is_number_integer())
{
priority = a_jmsg["priority"].get<int>();
}
int index;
if (m_parallel_mode == "round")
{
if (m_current_channel == m_num_channels - 1)
{
index = 0;
m_current_channel = 0;
}
else
{
m_current_channel++;
index = m_current_channel;
}
}
else if (m_parallel_mode == "priority")
{
index = closest(priority, pipe_desc["priority"], true);
}
a_jmsg["pipe"] = pipe_desc["pipe_names"][index];
StreamMan::put(a_data, a_jmsg);
size_t sendbytes = a_jmsg["sendbytes"].get<size_t>();
write(pipes[index], a_data, sendbytes);
StreamMan::put_stream(a_data, a_jmsg);
put_end(a_data, a_jmsg);
return 0;
}
int MdtmMan::get(void *p_data, json &p_jmsg) { return 0; }
int MdtmMan::get(void *a_data, json &a_jmsg) { return 0; }
void MdtmMan::on_recv(json a_jmsg)
void MdtmMan::on_put(std::shared_ptr<std::vector<char>> a_data)
{
write(m_pipe_handler, a_data->data(), a_data->size());
}
void MdtmMan::on_recv(json &a_jmsg)
{
// push new request
@@ -154,7 +108,7 @@ void MdtmMan::on_recv(json a_jmsg)
// for flush
if (jqueue.front()["operation"] == "flush")
{
callback();
callback_cache();
jqueue.pop();
vqueue.pop();
iqueue.pop();
@@ -176,27 +130,13 @@ void MdtmMan::on_recv(json a_jmsg)
size_t sendbytes = jmsg["sendbytes"].get<size_t>();
vqueue.front() = std::vector<char>(sendbytes);
// determine the pipe for the head request
if (jmsg == nullptr)
{
break;
}
int pipeindex = 0;
for (int i = 0; i < pipenames.size(); ++i)
{
if (jmsg["pipe"].get<std::string>() == pipenames[i])
{
pipeindex = i;
}
}
// read the head request
int error_times = 0;
while (iqueue.front() < sendbytes)
{
int ret = read(pipes[pipeindex],
vqueue.front().data() + iqueue.front(),
sendbytes - iqueue.front());
int ret =
read(m_pipe_handler, vqueue.front().data() + iqueue.front(),
sendbytes - iqueue.front());
if (ret > 0)
{
iqueue.front() += ret;
@@ -222,7 +162,17 @@ void MdtmMan::on_recv(json a_jmsg)
auto_transform(vqueue.front(), a_jmsg);
}
}
m_cache.put(vqueue.front().data(), jmsg);
if (a_jmsg["varshape"] == a_jmsg["putshape"])
{
std::cout << "callback_direct \n";
callback_direct(vqueue.front().data(), jmsg);
}
else
{
m_cache.put(vqueue.front().data(), jmsg);
}
jqueue.pop();
vqueue.pop();
iqueue.pop();
@@ -19,23 +19,25 @@ class MdtmMan : public StreamMan
{
public:
MdtmMan() = default;
virtual ~MdtmMan();
virtual ~MdtmMan() = default;
virtual int init(json p_jmsg);
virtual int put(const void *p_data, json p_jmsg);
virtual int get(void *p_data, json &p_jmsg);
virtual int init(json a_jmsg);
virtual int put(const void *a_data, json &a_jmsg);
virtual int get(void *a_data, json &a_jmsg);
virtual void transform(std::vector<char> &a_data, json &a_jmsg) {}
void on_recv(json msg);
virtual void on_recv(json &a_msg);
virtual void on_put(std::shared_ptr<std::vector<char>> a_data);
std::string name() { return "MdtmMan"; }
private:
void *zmq_ipc_req = nullptr;
int zmq_msg_size = 1024;
json pipe_desc;
std::string m_pipepath = "/tmp/MdtmManPipes/";
std::string m_pipename_prefix = "MdtmManPipe";
std::string m_pipename;
std::string m_full_pipename;
int m_pipe_handler;
std::string getmode = "callback";
std::vector<int> pipes;
std::vector<std::string> pipenames;
std::queue<json> jqueue;
std::queue<std::vector<char>> vqueue;
std::queue<int> iqueue;