diff --git a/CMakeLists.txt b/CMakeLists.txt index a851009907345f4ff50958471eaf40659dc4bbf2..9ed7d3fd2b4d1a6a428fd770ecf9e3da105d4480 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -86,6 +86,7 @@ adios_option(BZip2 "Enable support for BZip2 transforms" AUTO) adios_option(ZFP "Enable support for ZFP transforms" AUTO) adios_option(MPI "Enable support for MPI" AUTO) adios_option(DataMan "Enable support for DataMan" AUTO) +adios_option(SST "Enable support for SST" AUTO) adios_option(ZeroMQ "Enable support for ZeroMQ" AUTO) adios_option(HDF5 "Enable support for the HDF5 engine" AUTO) adios_option(ADIOS1 "Enable support for the ADIOS 1.x engine" AUTO) @@ -100,7 +101,7 @@ if(ADIOS2_HAVE_MPI) endif() set(ADIOS2_CONFIG_OPTS - BZip2 ZFP MPI DataMan ZeroMQ HDF5 ADIOS1 Python Fortran SysVShMem + BZip2 ZFP MPI DataMan SST ZeroMQ HDF5 ADIOS1 Python Fortran SysVShMem ) GenerateADIOSHeaderConfig(${ADIOS2_CONFIG_OPTS}) diff --git a/cmake/ADIOS2ConfigCommon.cmake.in b/cmake/ADIOS2ConfigCommon.cmake.in index c32881dab5ee8a31b178133d38c16698277d8c19..b49de73411f5bd764c6d44622606a9a08a7b4d7b 100644 --- a/cmake/ADIOS2ConfigCommon.cmake.in +++ b/cmake/ADIOS2ConfigCommon.cmake.in @@ -35,6 +35,11 @@ if(NOT @BUILD_SHARED_LIBS@) find_dependency(HDF5 COMPONENTS C) endif() + set(ADIOS2_HAVE_SST @ADIOS2_HAVE_SST@) + if(ADIOS2_HAVE_SST) + find_dependency(EVPATH) + endif() + set(ADIOS2_HAVE_ADIOS1 @ADIOS2_HAVE_ADIOS1@) if(ADIOS2_HAVE_ADIOS1) if(ADIOS2_HAVE_MPI) diff --git a/cmake/DetectOptions.cmake b/cmake/DetectOptions.cmake index de2a7784a1da6a68a0e059e951059e743a6baca1..e6b77ad54b6bc7dd428289dc1522abc55cfed1cf 100644 --- a/cmake/DetectOptions.cmake +++ b/cmake/DetectOptions.cmake @@ -124,6 +124,17 @@ if(PythonFull_FOUND) set(ADIOS2_HAVE_Python ON) endif() +# Sst +if(ADIOS2_USE_SST STREQUAL AUTO) + find_package(EVPATH) +elseif (ADIOS2_USE_SST) + find_package(EVPATH REQUIRED) +endif() + +if (EVPATH_FOUND) + set (ADIOS2_HAVE_SST TRUE) +endif() + #SysV IPC if(UNIX) 
include(CheckSymbolExists) diff --git a/cmake/FindEVPATH.cmake b/cmake/FindEVPATH.cmake new file mode 100644 index 0000000000000000000000000000000000000000..cd747600f798f85d767ec75328a35dfa95f297cb --- /dev/null +++ b/cmake/FindEVPATH.cmake @@ -0,0 +1,192 @@ +# - Find EVPath library +# https://www.cc.gatech.edu/systems/projects/EVPath +# +# Use this module by invoking find_package with the form: +# find_package(EVPATH +# [version] [EXACT] # Minimum or EXACT version, e.g. 1.6.0 +# [REQUIRED] # Fail with an error if EVPATH or a required +# # component is not found +# [QUIET] # ... +# [COMPONENTS <...>] # Compiled in components: fortran, readonly, + # sequential (all are case insensitive) +# ) +# +# Module that finds the includes and libraries for a working EVPATH install. +# This module invokes the `evpath_config` script that should be installed with +# the other EVPATH tools. +# +# To provide a hint to the module where to find the EVPATH installation, +# set the EVPATH_ROOT or EVPATH_DIR CMake or environment variable. +# +# If neither variable is set, make sure that at least the corresponding `bin/` +# directory of EVPATH is in your PATH environment variable. +# +# Set the following CMake variables BEFORE calling find_package to +# influence this module: +# EVPATH_USE_STATIC_LIBS - Set to ON to force the use of static +# libraries. Default: OFF +# +# This module will define the following variables: +# EVPATH_INCLUDE_DIRS - Include directories for the EVPATH headers. +# EVPATH_LIBRARY - Full path of the libevpath library (.a or .so file) +# EVPATH_LIBRARIES - libevpath followed by its dependency libraries. 
+# EVPATH_FOUND - TRUE if FindEVPATH found a working install +# EVPATH_VERSION - Version in format Major.Minor.Patch +# +# Not used for now: +# EVPATH_DEFINITIONS - Compiler definitions you should add with +# add_definitions(${EVPATH_DEFINITIONS}) +# +############################################################################### +#Copyright (c) 2014, Axel Huebl and Felix Schmitt from http://picongpu.hzdr.de +#All rights reserved. + +#Redistribution and use in source and binary forms, with or without +#modification, are permitted provided that the following conditions are met: + +#1. Redistributions of source code must retain the above copyright notice, this +#list of conditions and the following disclaimer. + +#2. Redistributions in binary form must reproduce the above copyright notice, +#this list of conditions and the following disclaimer in the documentation +#and/or other materials provided with the distribution. + +#3. Neither the name of the copyright holder nor the names of its contributors +#may be used to endorse or promote products derived from this software without +#specific prior written permission. + +#THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +#AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +#IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +#DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +#FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +#DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +#SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +#CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +#OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +#OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+############################################################################### + + +############################################################################### +# EVPATH +############################################################################### +# collect candidate bin/ directories in which to look for evpath_config + +set(evpath_config_hints) +foreach(PREFIX_VAR IN ITEMS EVPATH_ROOT EVPATH_DIR INSTALL_PREFIX) + if(${PREFIX_VAR}) + list(APPEND evpath_config_hints "${${PREFIX_VAR}}/bin") + elseif(NOT ("$ENV{${PREFIX_VAR}}" STREQUAL "")) + list(APPEND evpath_config_hints "$ENV{${PREFIX_VAR}}/bin") + endif() +endforeach() + +# find `evpath_config` program ################################################# +find_program(EVPATH_CONFIG NAMES evpath_config HINTS ${evpath_config_hints}) + +# check `evpath_config` program ################################################ +if(EVPATH_CONFIG) + execute_process(COMMAND ${EVPATH_CONFIG} -c + OUTPUT_VARIABLE evpath_config_out + RESULT_VARIABLE evpath_config_ret + OUTPUT_STRIP_TRAILING_WHITESPACE + ) + if(evpath_config_ret EQUAL 0) + string(REPLACE " " ";" evpath_match "${evpath_config_out}") + set(evpath_include_hints) + set(EVPATH_COMPILE_OPTIONS) + foreach(OPT IN LISTS evpath_match) + if(OPT MATCHES "^-I(.*)") + list(APPEND evpath_include_hints "${CMAKE_MATCH_1}") + else() + list(APPEND EVPATH_COMPILE_OPTIONS ${OPT}) + endif() + endforeach() + endif() + + execute_process(COMMAND ${EVPATH_CONFIG} -l + OUTPUT_VARIABLE evpath_config_out + RESULT_VARIABLE evpath_config_ret + OUTPUT_STRIP_TRAILING_WHITESPACE + ) + if(evpath_config_ret EQUAL 0) + string(REPLACE " " ";" evpath_match "${evpath_config_out}") + set(evpath_libs) + set(evpath_lib_hints) + set(evpath_lib_flags) + foreach(OPT IN LISTS evpath_match) + if(OPT MATCHES "^-L(.*)") + list(APPEND evpath_lib_hints "${CMAKE_MATCH_1}") + elseif(OPT STREQUAL "-lstdc++") + list(APPEND evpath_lib_flags "${OPT}") + elseif(OPT MATCHES "^-l(.*)") + list(APPEND evpath_libs "${CMAKE_MATCH_1}") + 
elseif(OPT MATCHES "^/") + list(APPEND evpath_libs "${OPT}") + else() + list(APPEND evpath_lib_flags "${OPT}") + endif() + endforeach() + endif() + + # add the version string + execute_process(COMMAND ${EVPATH_CONFIG} -v + OUTPUT_VARIABLE EVPATH_VERSION + OUTPUT_STRIP_TRAILING_WHITESPACE + ) +endif() + +# make sure at the very least we find libevpath +if(NOT evpath_libs) + set(evpath_libs evpath) +endif() + +# Search for the actual libs and headers to use based on hints from the config +find_path(EVPATH_INCLUDE_DIR evpath.h HINTS ${evpath_include_hints}) + +set(EVPATH_LIBRARY) +set(EVPATH_DEPENDENCIES) +foreach(lib IN LISTS evpath_libs) + find_library(EVPATH_${lib}_LIBRARY NAMES ${lib} HINTS ${evpath_lib_hints}) + if(EVPATH_${lib}_LIBRARY) + if(lib MATCHES "^evpath") + set(EVPATH_LIBRARY ${EVPATH_${lib}_LIBRARY}) + else() + list(APPEND EVPATH_DEPENDENCIES ${EVPATH_${lib}_LIBRARY}) + endif() + else() + list(APPEND EVPATH_DEPENDENCIES ${lib}) + endif() +endforeach() + +find_package(Threads REQUIRED) +list(APPEND EVPATH_DEPENDENCIES ${evpath_lib_flags} ${CMAKE_THREAD_LIBS_INIT}) + +############################################################################### +# FindPackage Options +############################################################################### + +# handles the REQUIRED, QUIET and version-related arguments for find_package +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(EVPATH + REQUIRED_VARS EVPATH_LIBRARY EVPATH_INCLUDE_DIR + VERSION_VAR EVPATH_VERSION +) + +if(EVPATH_FOUND) + set(EVPATH_INCLUDE_DIRS ${EVPATH_INCLUDE_DIR}) + set(EVPATH_LIBRARIES ${EVPATH_LIBRARY} ${EVPATH_DEPENDENCIES}) + ########################################################################## + # Add target and dependencies to ADIOS2 + ########################################################################## + if(NOT TARGET evpath::evpath) + add_library(evpath::evpath UNKNOWN IMPORTED) + set_target_properties(evpath::evpath PROPERTIES + 
IMPORTED_LOCATION "${EVPATH_LIBRARY}" + INTERFACE_LINK_LIBRARIES "${EVPATH_DEPENDENCIES}" + INTERFACE_INCLUDE_DIRECTORIES "${EVPATH_INCLUDE_DIRS}" + INTERFACE_COMPILE_OPTIONS "${EVPATH_COMPILE_OPTIONS}" + ) + endif() +endif() diff --git a/examples/hello/CMakeLists.txt b/examples/hello/CMakeLists.txt index f811c6372fbfbdee690de217deb9529cb803e774..4fe800cfbfa0892d1781ae46c5335187b624612b 100644 --- a/examples/hello/CMakeLists.txt +++ b/examples/hello/CMakeLists.txt @@ -19,6 +19,11 @@ if(ADIOS2_HAVE_DataMan) add_subdirectory(datamanWriter) endif() +if(ADIOS2_HAVE_SST) + add_subdirectory(sstReader) + add_subdirectory(sstWriter) +endif() + if(ADIOS2_HAVE_HDF5) add_subdirectory(hdf5Writer) endif() diff --git a/examples/hello/sstReader/CMakeLists.txt b/examples/hello/sstReader/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..28f22d092bdccd772505c6e94f71811fb352517a --- /dev/null +++ b/examples/hello/sstReader/CMakeLists.txt @@ -0,0 +1,11 @@ +#------------------------------------------------------------------------------# +# Distributed under the OSI-approved Apache License, Version 2.0. See +# accompanying file Copyright.txt for details. +#------------------------------------------------------------------------------# + +find_package(MPI COMPONENTS C REQUIRED) + +add_executable(hello_sstReader helloSstReader.cpp) +target_include_directories(hello_sstReader PRIVATE ${MPI_C_INCLUDE_PATH}) +target_link_libraries(hello_sstReader PRIVATE ${MPI_C_LIBRARIES}) +target_link_libraries(hello_sstReader PRIVATE adios2) diff --git a/examples/hello/sstReader/helloSstReader.cpp b/examples/hello/sstReader/helloSstReader.cpp new file mode 100644 index 0000000000000000000000000000000000000000..678c9c52f89283905f878f84cea5b1584b2e4a52 --- /dev/null +++ b/examples/hello/sstReader/helloSstReader.cpp @@ -0,0 +1,88 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
+ * + * helloSstReader.cpp + * + * Created on: Aug 17, 2017 + * Author: Greg Eisenhauer + */ + +#include <chrono> +#include <iostream> +#include <numeric> +#include <thread> +#include <vector> + +#include <adios2.h> + +void UserCallBack(const void *data, std::string doid, std::string var, + std::string dtype, std::vector<std::size_t> varshape) +{ + std::cout << "data object ID = " << doid << "\n"; + std::cout << "variable name = " << var << "\n"; + std::cout << "data type = " << dtype << "\n"; + + std::size_t varsize = 1; // element count = product of all extents + for (const std::size_t extent : varshape) varsize *= extent; + + for (std::size_t i = 0; i < varsize; ++i) + std::cout << ((float *)data)[i] << " "; + std::cout << std::endl; +} + +int main(int argc, char *argv[]) +{ + // Application variable + MPI_Init(&argc, &argv); + int rank, size; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &size); + + int timeout = 5; + + if (argc == 2) + { + timeout = atoi(argv[1]); + } + + try + { + adios2::ADIOS adios(MPI_COMM_WORLD, adios2::DebugON); + + adios2::IO &sstIO = adios.DeclareIO("WAN"); + sstIO.SetEngine("SstReader"); + sstIO.SetParameters({{"real_time", "yes"}, + {"method_type", "stream"}, + {"method", "dump"}}); + + adios2::Engine &sstReader = + sstIO.Open("helloSst.bp", adios2::Mode::Read); + + std::this_thread::sleep_for(std::chrono::seconds(timeout)); + + sstReader.Close(); + } + catch (std::invalid_argument &e) + { + std::cout << "Invalid argument exception, STOPPING PROGRAM from rank " + << rank << "\n"; + std::cout << e.what() << "\n"; + } + catch (std::ios_base::failure &e) + { + std::cout << "IO System base failure exception, STOPPING PROGRAM " + "from rank " + << rank << "\n"; + std::cout << e.what() << "\n"; + } + catch (std::exception &e) + { + std::cout << "Exception, STOPPING PROGRAM from rank " << rank << "\n"; + std::cout << e.what() << "\n"; + } + + MPI_Finalize(); + + return 0; +} diff --git a/examples/hello/sstWriter/CMakeLists.txt 
b/examples/hello/sstWriter/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..df0cd234a7c966288d72dfd7aa25df9d97f7d854 --- /dev/null +++ b/examples/hello/sstWriter/CMakeLists.txt @@ -0,0 +1,11 @@ +#------------------------------------------------------------------------------# +# Distributed under the OSI-approved Apache License, Version 2.0. See +# accompanying file Copyright.txt for details. +#------------------------------------------------------------------------------# + +find_package(MPI COMPONENTS C REQUIRED) + +add_executable(hello_sstWriter helloSstWriter.cpp) +target_include_directories(hello_sstWriter PRIVATE ${MPI_C_INCLUDE_PATH}) +target_link_libraries(hello_sstWriter PRIVATE ${MPI_C_LIBRARIES}) +target_link_libraries(hello_sstWriter PRIVATE adios2) diff --git a/examples/hello/sstWriter/helloSstWriter.cpp b/examples/hello/sstWriter/helloSstWriter.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b1e633095cc74c9ae35eedffe7b88fa43108dd9a --- /dev/null +++ b/examples/hello/sstWriter/helloSstWriter.cpp @@ -0,0 +1,72 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
+ * + * helloSstWriter.cpp + * + * Created on: Aug 17, 2017 + * Author: Greg Eisenhauer + */ + +#include <iostream> +#include <vector> + +#include <mpi.h> + +#include <adios2.h> + +int main(int argc, char *argv[]) +{ + // Application variable + MPI_Init(&argc, &argv); + int rank, size; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &size); + + std::vector<float> myFloats = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + const std::size_t Nx = myFloats.size(); + + try + { + adios2::ADIOS adios(MPI_COMM_WORLD, adios2::DebugON); + adios2::IO &sstIO = adios.DeclareIO("WANIO"); + sstIO.SetEngine("SstWriter"); + sstIO.SetParameters({{"peer-to-peer", "yes"}, + {"real_time", "yes"}, + {"compress", "no"}, + {"method", "dump"}}); + + // Define variable and local size + auto bpFloats = sstIO.DefineVariable<float>("bpFloats", {}, {}, {Nx}); + + // Open the stream with the engine type selected by SetEngine above; + // the result is bound here as a reference to the base Engine class + adios2::Engine &sstWriter = + sstIO.Open("helloSst.bp", adios2::Mode::Write); + + sstWriter.PutSync<float>(bpFloats, myFloats.data()); + sstWriter.Close(); + } + catch (std::invalid_argument &e) + { + std::cout << "Invalid argument exception, STOPPING PROGRAM from rank " + << rank << "\n"; + std::cout << e.what() << "\n"; + } + catch (std::ios_base::failure &e) + { + std::cout + << "IO System base failure exception, STOPPING PROGRAM from rank " + << rank << "\n"; + std::cout << e.what() << "\n"; + } + catch (std::exception &e) + { + std::cout << "Exception, STOPPING PROGRAM from rank " << rank << "\n"; + std::cout << e.what() << "\n"; + } + + MPI_Finalize(); + + return 0; +} diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index c886e6ba808868e022404d61e79f5a8539e25137..5953e7253806581e97827d57b7c2b25b605a9f46 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -73,6 +73,15 @@ if(ADIOS2_HAVE_ZeroMQ) target_link_libraries(adios2 PRIVATE 
ZeroMQ::ZMQ) endif() +if(ADIOS2_HAVE_SST) + target_sources(adios2 PRIVATE + engine/sst/SstReader.cpp + engine/sst/SstWriter.cpp + ) + target_link_libraries(adios2 PRIVATE sst evpath::evpath) + add_subdirectory(toolkit/sst) +endif() + target_sources(adios2 PRIVATE engine/dataman/DataManReader.cpp engine/dataman/DataManWriter.cpp diff --git a/source/adios2/core/IO.cpp b/source/adios2/core/IO.cpp index f0f9bd40fee503e4acba005c06e55cd4dd5a1dc9..365f5b1100cd1a0baba0cadacdcecc03d3383d29 100644 --- a/source/adios2/core/IO.cpp +++ b/source/adios2/core/IO.cpp @@ -25,6 +25,11 @@ #include "adios2/engine/dataman/DataManWriter.h" #endif +#ifdef ADIOS2_HAVE_SST // external dependencies +#include "adios2/engine/sst/SstReader.h" +#include "adios2/engine/sst/SstWriter.h" +#endif + #ifdef ADIOS2_HAVE_ADIOS1 // external dependencies #include "adios2/engine/adios1/ADIOS1Reader.h" #include "adios2/engine/adios1/ADIOS1Writer.h" @@ -302,6 +307,24 @@ Engine &IO::Open(const std::string &name, const Mode mode, MPI_Comm mpiComm) throw std::invalid_argument( "ERROR: this version didn't compile with " "DataMan library, can't Open DataManReader\n"); +#endif + } + else if (m_EngineType == "SstWriter") + { +#ifdef ADIOS2_HAVE_SST + engine = std::make_shared<SstWriter>(*this, name, mode, mpiComm); +#else + throw std::invalid_argument("ERROR: this version didn't compile with " + "Sst library, can't Open SstWriter\n"); +#endif + } + else if (m_EngineType == "SstReader") + { +#ifdef ADIOS2_HAVE_SST + engine = std::make_shared<SstReader>(*this, name, mode, mpiComm); +#else + throw std::invalid_argument("ERROR: this version didn't compile with " + "Sst library, can't Open SstReader\n"); #endif } else if (m_EngineType == "ADIOS1Writer") diff --git a/source/adios2/engine/sst/SstReader.cpp b/source/adios2/engine/sst/SstReader.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3d8367d45090f79cf07f0edbd65840b222fb0a67 --- /dev/null +++ b/source/adios2/engine/sst/SstReader.cpp @@ -0,0 
+1,40 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * SstReader.cpp + * + * Created on: Aug 17, 2017 + * Author: Greg Eisenhauer + */ + +#include <cstring> +#include <string> + +#include "SstReader.h" + +namespace adios2 +{ + +SstReader::SstReader(IO &io, const std::string &name, const Mode mode, + MPI_Comm mpiComm) +: Engine("SstReader", io, name, mode, mpiComm) +{ + // Hand the C API a mutable, NUL-terminated copy of the name. The copy + // is owned by std::string, so it is released on every exit path, + // including an exception thrown by Init(). + std::string streamName(name); + + m_Input = SstReaderOpen(&streamName[0], NULL, mpiComm); + Init(); +} + +void SstReader::Close(const int transportIndex) { SstReaderClose(m_Input); } + +// PRIVATE +void SstReader::Init() +{ + auto itRealTime = m_IO.m_Parameters.find("real_time"); +} + +} // end namespace adios2 diff --git a/source/adios2/engine/sst/SstReader.h b/source/adios2/engine/sst/SstReader.h new file mode 100644 index 0000000000000000000000000000000000000000..d0603e089e20ac384450f0c95727a7d5cc6ad9a3 --- /dev/null +++ b/source/adios2/engine/sst/SstReader.h @@ -0,0 +1,54 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
+ * + * SstReader.h + * + * Created on: Aug 17, 2017 + * Author: Greg Eisenhauer + */ + +#ifndef ADIOS2_ENGINE_SST_SSTREADER_H_ +#define ADIOS2_ENGINE_SST_SSTREADER_H_ + +#include <iostream> //std::cout << Needs to go + +#include <mpi.h> + +#include "adios2/toolkit/sst/sst.h" + +#include "adios2/core/Engine.h" +#include "adios2/core/IO.h" + +namespace adios2 +{ + +class SstReader : public Engine +{ + +public: + /** + * Constructor for sst engine Reader + * @param io IO object forwarded to the Engine base class + * @param name stream name, forwarded to Engine and used by the + * constructor to open the SST input stream + * @param mode open mode forwarded to the Engine base class + * @param mpiComm MPI communicator forwarded to Engine and used by + * the constructor to open the SST input stream + * + */ + SstReader(IO &io, const std::string &name, const Mode mode, + MPI_Comm mpiComm); + + virtual ~SstReader() = default; + + void Close(const int transportIndex = -1); + +private: + void Init(); + SstStream m_Input; +}; + +} // end namespace adios2 + +#endif /* ADIOS2_ENGINE_SST_SSTREADER_H_ */ diff --git a/source/adios2/engine/sst/SstWriter.cpp b/source/adios2/engine/sst/SstWriter.cpp new file mode 100644 index 0000000000000000000000000000000000000000..56035b3b95c09cc7ce7b2b1c373da316acd1ea04 --- /dev/null +++ b/source/adios2/engine/sst/SstWriter.cpp @@ -0,0 +1,131 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
+ * + * SstWriter.cpp + * + * Created on: Aug 17, 2017 + * Author: Greg Eisenhauer + */ + +#include <mpi.h> + +#include "SstWriter.h" +#include "SstWriter.tcc" + +#include <iostream> //needs to go away, this is just for demo purposes + +namespace adios2 +{ + +SstWriter::SstWriter(IO &io, const std::string &name, const Mode mode, + MPI_Comm mpiComm) +: Engine("SstWriter", io, name, mode, mpiComm) +{ + // Mutable, NUL-terminated copy of the name for the C API; owned by + // std::string, so it is released even if Init() throws. + std::string streamName(name); + + m_Output = SstWriterOpen(&streamName[0], NULL, mpiComm); + Init(); +} + +StepStatus SstWriter::BeginStep(StepMode mode, const float timeout_sec) +{ + return StepStatus::OK; +} +void SstWriter::EndStep() {} + +void SstWriter::Close(const int transportIndex) { SstWriterClose(m_Output); } + +// PRIVATE functions below +void SstWriter::Init() +{ + auto lf_SetBoolParameter = [&](const std::string key, bool &parameter) { + + auto itKey = m_IO.m_Parameters.find(key); + if (itKey != m_IO.m_Parameters.end()) + { + if (itKey->second == "yes" || itKey->second == "true") + { + parameter = true; + } + else if (itKey->second == "no" || itKey->second == "false") + { + parameter = false; + } + } + }; + + // lf_SetBoolParameter("real_time", m_DoRealTime); + // lf_SetBoolParameter("monitoring", m_DoMonitor); + + // if (m_DoRealTime) + // { + // /** + // * Lambda function that assigns a parameter in m_Method to a + // * localVariable + // * of type std::string + // */ + // auto lf_AssignString = [&](const std::string parameter, + // std::string &localVariable) { + // auto it = m_IO.m_Parameters.find(parameter); + // if (it != m_IO.m_Parameters.end()) + // { + // localVariable = it->second; + // } + // }; + + // /** + // * Lambda function that assigns a parameter in m_Method to a + // * localVariable + // * of type int + // */ + // auto lf_AssignInt = [&](const std::string parameter, + // int &localVariable) { + // auto it = m_IO.m_Parameters.find(parameter); + // if (it != m_IO.m_Parameters.end()) + // { + // 
localVariable = std::stoi(it->second); + // } + // }; + + // auto lf_IsNumber = [](const std::string &s) { + // return !s.empty() && std::find_if(s.begin(), s.end(), [](char c) + // { + // return !std::isdigit(c); + // }) == s.end(); + // }; + + // json jmsg; + // for (const auto &i : m_IO.m_Parameters) + // { + // if (lf_IsNumber(i.second)) + // { + // jmsg[i.first] = std::stoi(i.second); + // } + // else + // { + // jmsg[i.first] = i.second; + // } + // } + // jmsg["stream_mode"] = "sender"; + // m_Man.add_stream(jmsg); + + // std::string method_type; + // lf_AssignString("method_type", method_type); + + // int num_channels = 0; + // lf_AssignInt("num_channels", num_channels); + // } +} + +#define declare_type(T) \ + void SstWriter::DoPutSync(Variable<T> &variable, const T *values) \ + { \ + PutSyncCommon(variable, values); \ + } +ADIOS2_FOREACH_TYPE_1ARG(declare_type) +#undef declare_type + +} // end namespace adios diff --git a/source/adios2/engine/sst/SstWriter.h b/source/adios2/engine/sst/SstWriter.h new file mode 100644 index 0000000000000000000000000000000000000000..9f28ef0e4a48e2eb61d443fdae3cd69f8958dac9 --- /dev/null +++ b/source/adios2/engine/sst/SstWriter.h @@ -0,0 +1,58 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
+ * + * SstWriter.h + * + * Created on: Aug 17, 2017 + * Author: Greg Eisenhauer + */ + +#ifndef ADIOS2_ENGINE_SST_SST_WRITER_H_ +#define ADIOS2_ENGINE_SST_SST_WRITER_H_ + +#include <iostream> //std::cout must be removed, only used for hello example +#include <unistd.h> //sleep must be removed + +#include <mpi.h> + +#include "adios2/toolkit/sst/sst.h" + +#include "adios2/ADIOSConfig.h" +#include "adios2/core/Engine.h" + +namespace adios2 +{ + +class SstWriter : public Engine +{ + +public: + SstWriter(IO &io, const std::string &name, const Mode mode, + MPI_Comm mpiComm); + + virtual ~SstWriter() = default; + + StepStatus BeginStep(StepMode mode, const float timeoutSeconds = 0.f) final; + void EndStep() final; + + void Close(const int transportIndex = -1) final; + +private: + void Init(); ///< called from the constructor; currently sets no engine + /// state (its parameter-parsing code is still disabled) + +#define declare_type(T) \ + void DoPutSync(Variable<T> &variable, const T *values) final; + ADIOS2_FOREACH_TYPE_1ARG(declare_type) +#undef declare_type + + template <class T> + void PutSyncCommon(Variable<T> &variable, const T *values); + + SstStream m_Output; +}; + +} // end namespace adios2 + +#endif /* ADIOS2_ENGINE_SST_SST_WRITER_H_ */ diff --git a/source/adios2/engine/sst/SstWriter.tcc b/source/adios2/engine/sst/SstWriter.tcc new file mode 100644 index 0000000000000000000000000000000000000000..2ff052d016ddd5cf53cbe075d24fea9250cec1d9 --- /dev/null +++ b/source/adios2/engine/sst/SstWriter.tcc @@ -0,0 +1,47 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
+ * + * SstWriter.tcc + * + * Created on: Aug 17, 2017 + * Author: Greg Eisenhauer + */ + +#ifndef ADIOS2_ENGINE_SST_SST_WRITER_TCC_ +#define ADIOS2_ENGINE_SST_SST_WRITER_TCC_ + +#include "SstWriter.h" + +#include "adios2/ADIOSMPI.h" +#include "adios2/helper/adiosFunctions.h" //GetType<T> + +namespace adios2 +{ + +template <class T> +void SstWriter::PutSyncCommon(Variable<T> &variable, const T *values) +{ + variable.SetData(values); + + // This part will go away, this is just to monitor variables per rank + + if (variable.m_Count.empty()) + { + variable.m_Count = variable.m_Shape; + } + if (variable.m_Start.empty()) + { + variable.m_Start.assign(variable.m_Count.size(), 0); + } + + std::cout << "I am hooked to the Sst library\n"; + std::cout << "Variable " << variable.m_Name << "\n"; + std::cout << "putshape " << variable.m_Count.size() << "\n"; + std::cout << "varshape " << variable.m_Shape.size() << "\n"; + std::cout << "offset " << variable.m_Start.size() << "\n"; +} + +} // end namespace adios2 + +#endif /* ADIOS2_ENGINE_SST_SST_WRITER_TCC_ */ diff --git a/source/adios2/toolkit/sst/CMakeLists.txt b/source/adios2/toolkit/sst/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..725d7cce087e2d82baafb344d6496478aa2b942d --- /dev/null +++ b/source/adios2/toolkit/sst/CMakeLists.txt @@ -0,0 +1,15 @@ + +add_library(sst "") +set_property(TARGET sst PROPERTY C_STANDARD 99) +target_include_directories(sst PRIVATE ${CMAKE_CURRENT_SOURCE_DIR} ${EVPATH_INCLUDE_DIRS} ${MPI_C_INCLUDE_PATH}) +target_link_libraries(sst evpath::evpath ${MPI_C_LIBRARIES}) + +# sst sources are defined in the includes +include(dp/CMakeLists.txt) +include(cp/CMakeLists.txt) + +install(TARGETS sst EXPORT adios2Exports + RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} + LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} +) diff --git a/source/adios2/toolkit/sst/cp/CMakeLists.txt b/source/adios2/toolkit/sst/cp/CMakeLists.txt new file mode 100644 
index 0000000000000000000000000000000000000000..c0fd5765feb5299c2c3acfc38739285dd18c97d7 --- /dev/null +++ b/source/adios2/toolkit/sst/cp/CMakeLists.txt @@ -0,0 +1,2 @@ +target_sources(sst PRIVATE "${CMAKE_CURRENT_LIST_DIR}/cp.c" + "${CMAKE_CURRENT_LIST_DIR}/cp_common.c") diff --git a/source/adios2/toolkit/sst/cp/cp.c b/source/adios2/toolkit/sst/cp/cp.c new file mode 100644 index 0000000000000000000000000000000000000000..c70460c37c12f9544f84c67f811cc1523afbe2c4 --- /dev/null +++ b/source/adios2/toolkit/sst/cp/cp.c @@ -0,0 +1,1092 @@ +#include <assert.h> +#include <stdarg.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/stat.h> +#include <sys/time.h> + +#include <atl.h> +#include <evpath.h> +#include <mpi.h> +#include <pthread.h> + +#include "sst.h" + +#include "cp_internal.h" + +extern void CP_verbose(SstStream Stream, char *Format, ...); +static void DP_verbose(SstStream Stream, char *Format, ...); +static CManager CP_getCManager(SstStream Stream); +static void CP_sendToPeer(SstStream Stream, CP_PeerCohort cohort, int rank, + CMFormat Format, void *data); +static MPI_Comm CP_getMPIComm(SstStream Stream); + +struct _CP_Services Svcs = { + (CP_VerboseFunc)DP_verbose, (CP_GetCManagerFunc)CP_getCManager, + (CP_SendToPeerFunc)CP_sendToPeer, (CP_GetMPICommFunc)CP_getMPIComm}; + +static void sendOneToEachWriterRank(SstStream s, CMFormat f, void *Msg, + void **WS_StreamPtr); +/* Publish this stream's contact string in the file <Name>.bpflx. */ +static void writeContactInfo(const char *Name, SstStream Stream) +{ + char *Contact = attr_list_to_string(CMget_contact_list(Stream->CPInfo->cm)); + char *TmpName = malloc(strlen(Name) + strlen(".tmp") + 1); + char *FileName = malloc(strlen(Name) + strlen(".bpflx") + 1); + FILE *WriterInfo; + + /* + * write the contact information file with a temporary name before + * renaming it to the final version to help prevent partial reads + */ + sprintf(TmpName, "%s.tmp", Name); + sprintf(FileName, "%s.bpflx", Name); + WriterInfo = fopen(TmpName, "w"); + if (!WriterInfo) + { + perror("SST: cannot create writer contact file"); + free(TmpName); + free(FileName); + return; + } + fprintf(WriterInfo, 
"%p:%s", (void *)Stream, Contact); + fclose(WriterInfo); + rename(TmpName, FileName); + free(TmpName); + free(FileName); +} + +/* Block until <Name>.bpflx exists and is non-empty; return its contents in a + * calloc'ed, NUL-terminated buffer. */ +static char *readContactInfo(const char *Name, SstStream Stream) +{ + char *FileName = malloc(strlen(Name) + strlen(".bpflx") + 1); + FILE *WriterInfo; + sprintf(FileName, "%s.bpflx", Name); +// printf("Looking for writer contact in file %s\n", FileName); +redo: + WriterInfo = fopen(FileName, "r"); + while (!WriterInfo) + { + CMusleep(Stream->CPInfo->cm, 500); + WriterInfo = fopen(FileName, "r"); + } + struct stat Buf; + fstat(fileno(WriterInfo), &Buf); + int Size = Buf.st_size; + if (Size == 0) + { + // printf("Size of writer contact file is zero, but it shouldn't + // be! " + // "Retrying!\n"); + /* close this handle before retrying; fopen at redo makes a new one */ + fclose(WriterInfo); + goto redo; + } + + char *Buffer = calloc(1, Size + 1); + (void)fread(Buffer, Size, 1, WriterInfo); + fclose(WriterInfo); + free(FileName); + return Buffer; +} + +static int *setupPeerArray(int MySize, int MyRank, int PeerSize) +{ + int PortionSize = PeerSize / MySize; + int Leftovers = PeerSize - PortionSize * MySize; + int StartOffset = Leftovers; + int Start; + if (MyRank < Leftovers) + { + PortionSize++; + StartOffset = 0; + } + Start = PortionSize * MyRank + StartOffset; + int *MyPeers = malloc((PortionSize + 1) * sizeof(int)); + for (int i = 0; i < PortionSize; i++) + { + MyPeers[i] = Start + i; + } + MyPeers[PortionSize] = -1; + + return MyPeers; +} + +static void initWSReader(WS_ReaderInfo reader, int ReaderSize, + CP_ReaderInitInfo *reader_info) +{ + int WriterSize = reader->ParentStream->CohortSize; + int WriterRank = reader->ParentStream->Rank; + int i; + reader->ReaderCohortSize = ReaderSize; + reader->Connections = calloc(sizeof(reader->Connections[0]), ReaderSize); + for (i = 0; i < ReaderSize; i++) + { + reader->Connections[i].ContactList = + attr_list_from_string(reader_info[i]->ContactInfo); + reader->Connections[i].RemoteStreamID = reader_info[i]->ReaderID; + reader->Connections[i].CMconn = NULL; + } + reader->Peers = setupPeerArray(WriterSize, 
WriterRank, ReaderSize); + i = 0; + while (reader->Peers[i] != -1) + { + int peer = reader->Peers[i]; + reader->Connections[peer].CMconn = + CMget_conn(reader->ParentStream->CPInfo->cm, + reader->Connections[peer].ContactList); + i++; + } +} + +WS_ReaderInfo writer_participate_in_reader_open(SstStream Stream) +{ + RequestQueue Req; + reader_data_t ReturnData; + void *free_block = NULL; + int WriterResponseCondition = -1; + CMConnection conn; + CP_verbose(Stream, "Beginning writer-side reader open protocol\n"); + if (Stream->Rank == 0) + { + pthread_mutex_lock(&Stream->DataLock); + assert((Stream->ReadRequestQueue)); + Req = Stream->ReadRequestQueue; + Stream->ReadRequestQueue = Req->Next; + Req->Next = NULL; + pthread_mutex_unlock(&Stream->DataLock); + struct _CombinedReaderInfo reader_data; + reader_data.ReaderCohortSize = Req->Msg->ReaderCohortSize; + reader_data.CP_ReaderInfo = Req->Msg->CP_ReaderInfo; + reader_data.DP_ReaderInfo = Req->Msg->DP_ReaderInfo; + ReturnData = CP_distributeDataFromRankZero( + Stream, &reader_data, Stream->CPInfo->CombinedReaderInfoFormat, + &free_block); + WriterResponseCondition = Req->Msg->WriterResponseCondition; + conn = Req->Conn; + CMreturn_buffer(Stream->CPInfo->cm, Req->Msg); + free(Req); + } + else + { + ReturnData = CP_distributeDataFromRankZero( + Stream, NULL, Stream->CPInfo->CombinedReaderInfoFormat, + &free_block); + } + // printf("I am writer rank %d, my info on readers is:\n", Stream->Rank); + // FMdump_data(FMFormat_of_original(Stream->CPInfo->combined_reader_Format), + // ReturnData, 1024000); + // printf("\n"); + + Stream->Readers = realloc(Stream->Readers, sizeof(Stream->Readers[0]) * + (Stream->ReaderCount + 1)); + DP_WSR_Stream per_reader_Stream; + void *DP_WriterInfo; + void *ret_data_block; + CP_PeerConnection *connections_to_reader; + connections_to_reader = + calloc(sizeof(CP_PeerConnection), ReturnData->ReaderCohortSize); + for (int i = 0; i < ReturnData->ReaderCohortSize; i++) + { + attr_list attrs = + 
attr_list_from_string(ReturnData->CP_ReaderInfo[i]->ContactInfo); + connections_to_reader[i].ContactList = attrs; + connections_to_reader[i].RemoteStreamID = + ReturnData->CP_ReaderInfo[i]->ReaderID; + } + + per_reader_Stream = Stream->DP_Interface->initWriterPerReader( + &Svcs, Stream->DP_Stream, ReturnData->ReaderCohortSize, + connections_to_reader, ReturnData->DP_ReaderInfo, &DP_WriterInfo); + + WS_ReaderInfo CP_WSR_Stream = malloc(sizeof(*CP_WSR_Stream)); + Stream->Readers[Stream->ReaderCount] = CP_WSR_Stream; + CP_WSR_Stream->DP_WSR_Stream = per_reader_Stream; + CP_WSR_Stream->ParentStream = Stream; + CP_WSR_Stream->Connections = connections_to_reader; + initWSReader(CP_WSR_Stream, ReturnData->ReaderCohortSize, + ReturnData->CP_ReaderInfo); + + Stream->ReaderCount++; + + struct _CP_DP_PairInfo combined_init; + struct _CP_WriterInitInfo cpInfo; + + struct _CP_DP_PairInfo **pointers = NULL; + + cpInfo.ContactInfo = + attr_list_to_string(CMget_contact_list(Stream->CPInfo->cm)); + cpInfo.WriterID = CP_WSR_Stream; + + combined_init.CP_Info = (void **)&cpInfo; + combined_init.DP_Info = DP_WriterInfo; + + pointers = (struct _CP_DP_PairInfo **)CP_consolidateDataToRankZero( + Stream, &combined_init, Stream->CPInfo->PerRankWriterInfoFormat, + &ret_data_block); + + if (Stream->Rank == 0) + { + struct _WriterResponseMsg response; + memset(&response, 0, sizeof(response)); + response.WriterResponseCondition = WriterResponseCondition; + response.WriterCohortSize = Stream->CohortSize; + response.CP_WriterInfo = + malloc(response.WriterCohortSize * sizeof(void *)); + response.DP_WriterInfo = + malloc(response.WriterCohortSize * sizeof(void *)); + for (int i = 0; i < response.WriterCohortSize; i++) + { + response.CP_WriterInfo[i] = + (struct _CP_WriterInitInfo *)pointers[i]->CP_Info; + response.DP_WriterInfo[i] = pointers[i]->DP_Info; + } + CMwrite(conn, Stream->CPInfo->WriterResponseFormat, &response); + } + CP_verbose(Stream, "Finish writer-side reader open protocol for 
reader %p, " + "reader ready response pending\n", + CP_WSR_Stream); + return CP_WSR_Stream; +} + +static void waitForReaderResponse(WS_ReaderInfo Reader) +{ + SstStream Stream = Reader->ParentStream; + pthread_mutex_lock(&Stream->DataLock); + while (Reader->ReaderStatus != Established) + { + CP_verbose(Stream, "Waiting for Reader ready on WSR %p.\n", Reader); + pthread_cond_wait(&Stream->DataCondition, &Stream->DataLock); + } + assert(Reader->ReaderStatus == Established); + pthread_mutex_unlock(&Stream->DataLock); + CP_verbose(Stream, "Reader ready on WSR %p, Stream established.\n", Reader); +} + +SstStream SstWriterOpen(const char *Name, const char *params, MPI_Comm comm) +{ + SstStream Stream; + + Stream = CP_newStream(); + Stream->Role = WriterRole; + CP_parseParams(Stream, params); + + Stream->DP_Interface = LoadDP("dummy"); + + Stream->CPInfo = CP_getCPInfo(Stream->DP_Interface); + + Stream->mpiComm = comm; + if (Stream->WaitForFirstReader) + { + Stream->FirstReaderCondition = + CMCondition_get(Stream->CPInfo->cm, NULL); + } + else + { + Stream->FirstReaderCondition = -1; + } + + MPI_Comm_rank(Stream->mpiComm, &Stream->Rank); + MPI_Comm_size(Stream->mpiComm, &Stream->CohortSize); + + Stream->DP_Stream = Stream->DP_Interface->initWriter(&Svcs, Stream); + + if (Stream->Rank == 0) + { + writeContactInfo(Name, Stream); + } + + CP_verbose(Stream, "Opening Stream \"%s\"\n", Name); + + if (Stream->WaitForFirstReader) + { + WS_ReaderInfo reader; + CP_verbose( + Stream, + "Stream parameter requires rendezvous, waiting for first reader\n"); + if (Stream->Rank == 0) + { + pthread_mutex_lock(&Stream->DataLock); + if (Stream->ReadRequestQueue == NULL) + { + pthread_cond_wait(&Stream->DataCondition, &Stream->DataLock); + } + assert(Stream->ReadRequestQueue); + pthread_mutex_unlock(&Stream->DataLock); + } + MPI_Barrier(Stream->mpiComm); + + struct timeval Start, Stop, Diff; + gettimeofday(&Start, NULL); + reader = writer_participate_in_reader_open(Stream); + 
waitForReaderResponse(reader); + gettimeofday(&Stop, NULL); + timersub(&Stop, &Start, &Diff); + Stream->OpenTimeSecs = (double)Diff.tv_usec / 1e6 + Diff.tv_sec; + MPI_Barrier(Stream->mpiComm); + gettimeofday(&Stream->ValidStartTime, NULL); + } + CP_verbose(Stream, "Finish opening Stream \"%s\"\n", Name); + return Stream; +} + +void sendOneToEachReaderRank(SstStream s, CMFormat f, void *Msg, + void **RS_StreamPtr) +{ + for (int i = 0; i < s->ReaderCount; i++) + { + int j = 0; + WS_ReaderInfo CP_WSR_Stream = s->Readers[i]; + while (CP_WSR_Stream->Peers[j] != -1) + { + int peer = CP_WSR_Stream->Peers[j]; + CMConnection conn = CP_WSR_Stream->Connections[peer].CMconn; + /* add the reader-rank-specific Stream identifier to each outgoing + * message */ + *RS_StreamPtr = CP_WSR_Stream->Connections[peer].RemoteStreamID; + CMwrite(conn, f, Msg); + j++; + } + } +} + +void SstWriterClose(SstStream Stream) +{ + struct _WriterCloseMsg Msg; + struct timeval CloseTime, Diff; + Msg.FinalTimestep = Stream->LastProvidedTimestep; + sendOneToEachReaderRank(Stream, Stream->CPInfo->WriterCloseFormat, &Msg, + &Msg.RS_Stream); + + /* wait until all queued data is sent */ + CP_verbose(Stream, "Checking for queued timesteps in WriterClose\n"); + pthread_mutex_lock(&Stream->DataLock); + while (Stream->QueuedTimesteps) + { + CP_verbose(Stream, + "Waiting for timesteps to be released in WriterClose\n"); + pthread_cond_wait(&Stream->DataCondition, &Stream->DataLock); + } + pthread_mutex_unlock(&Stream->DataLock); + + gettimeofday(&CloseTime, NULL); + timersub(&CloseTime, &Stream->ValidStartTime, &Diff); + if (Stream->Stats) + Stream->Stats->ValidTimeSecs = (double)Diff.tv_usec / 1e6 + Diff.tv_sec; + + CP_verbose(Stream, "All timesteps are released in WriterClose\n"); +} + +void SstProvideTimestep(SstStream s, SstMetadata LocalMetadata, SstData Data, + long Timestep) +{ + void *data_block; + MetadataPlusDPInfo *pointers; + struct _TimestepMetadataMsg Msg; + void *DP_TimestepInfo = NULL; + struct 
_MetadataPlusDPInfo Md; + CPTimestepList Entry = malloc(sizeof(struct _CPTimestepEntry)); + + s->DP_Interface->provideTimestep(&Svcs, s->DP_Stream, Data, LocalMetadata, + Timestep, &DP_TimestepInfo); + + Md.Metadata = LocalMetadata; + Md.DP_TimestepInfo = DP_TimestepInfo; + + pointers = (MetadataPlusDPInfo *)CP_consolidateDataToAll( + s, &Md, s->CPInfo->PerRankMetadataFormat, &data_block); + + Msg.CohortSize = s->CohortSize; + Msg.Timestep = s->WriterTimestep++; + + /* separate metadata and DP_info to separate arrays */ + Msg.Metadata = malloc(s->CohortSize * sizeof(void *)); + Msg.DP_TimestepInfo = malloc(s->CohortSize * sizeof(void *)); + int NullCount = 0; + for (int i = 0; i < s->CohortSize; i++) + { + Msg.Metadata[i] = pointers[i]->Metadata; + Msg.DP_TimestepInfo[i] = pointers[i]->DP_TimestepInfo; + if (pointers[i]->DP_TimestepInfo == NULL) + NullCount++; + } + if (NullCount == s->CohortSize) + { + free(Msg.DP_TimestepInfo); + Msg.DP_TimestepInfo = NULL; + } + + CP_verbose(s, + "Sending TimestepMetadata for timestep %d, one to each reader\n", + Timestep); + + /* + * lock this Stream's data and queue the timestep + */ + pthread_mutex_lock(&s->DataLock); + s->LastProvidedTimestep = Timestep; + Entry->Data = Data; + Entry->Timestep = Timestep; + Entry->MetadataArray = Msg.Metadata; + Entry->DP_TimestepInfo = Msg.DP_TimestepInfo; + Entry->Next = s->QueuedTimesteps; + s->QueuedTimesteps = Entry; + s->QueuedTimestepCount++; + /* no one waits on timesteps being added, so no condition signal to note + * change */ + pthread_mutex_unlock(&s->DataLock); + + sendOneToEachReaderRank(s, s->CPInfo->DeliverTimestepMetadataFormat, &Msg, + &Msg.RS_Stream); +} + +static void **participate_in_reader_init_data_exchange(SstStream Stream, + void *dpInfo, + void **ret_data_block) +{ + + struct _CP_DP_PairInfo combined_init; + struct _CP_ReaderInitInfo cpInfo; + + struct _CP_DP_PairInfo **pointers = NULL; + + cpInfo.ContactInfo = + 
attr_list_to_string(CMget_contact_list(Stream->CPInfo->cm)); + cpInfo.ReaderID = Stream; + + combined_init.CP_Info = (void **)&cpInfo; + combined_init.DP_Info = dpInfo; + + pointers = (struct _CP_DP_PairInfo **)CP_consolidateDataToRankZero( + Stream, &combined_init, Stream->CPInfo->PerRankReaderInfoFormat, + ret_data_block); + return (void **)pointers; +} + +SstStream SstReaderOpen(const char *Name, const char *params, MPI_Comm comm) +{ + SstStream Stream; + void *dpInfo; + struct _CP_DP_PairInfo **pointers; + void *data_block; + void *free_block; + writer_data_t ReturnData; + struct _ReaderActivateMsg Msg; + struct timeval Start, Stop, Diff; + int i; + + Stream = CP_newStream(); + Stream->Role = ReaderRole; + + CP_parseParams(Stream, params); + + Stream->DP_Interface = LoadDP("dummy"); + + Stream->CPInfo = CP_getCPInfo(Stream->DP_Interface); + + Stream->mpiComm = comm; + + MPI_Comm_rank(Stream->mpiComm, &Stream->Rank); + MPI_Comm_size(Stream->mpiComm, &Stream->CohortSize); + printf("READER COHORT SIZE %d\n", Stream->CohortSize); + + Stream->DP_Stream = + Stream->DP_Interface->initReader(&Svcs, Stream, &dpInfo); + + pointers = + (struct _CP_DP_PairInfo **)participate_in_reader_init_data_exchange( + Stream, dpInfo, &data_block); + + gettimeofday(&Start, NULL); + + if (Stream->Rank == 0) + { + char *writer_0_contact = readContactInfo(Name, Stream); + void *writer_file_ID; + char *cm_contact_string = + malloc(strlen(writer_0_contact)); /* at least long enough */ + sscanf(writer_0_contact, "%p:%s", &writer_file_ID, cm_contact_string); + // printf("Writer contact info is fileID %p, contact info %s\n", + // writer_file_ID, cm_contact_string); + + attr_list WriterRank0Contact = attr_list_from_string(cm_contact_string); + CMConnection conn = CMget_conn(Stream->CPInfo->cm, WriterRank0Contact); + struct _ReaderRegisterMsg reader_register; + + reader_register.WriterFile = writer_file_ID; + reader_register.WriterResponseCondition = + CMCondition_get(Stream->CPInfo->cm, conn); 
+ reader_register.ReaderCohortSize = Stream->CohortSize; + reader_register.CP_ReaderInfo = + malloc(reader_register.ReaderCohortSize * sizeof(void *)); + reader_register.DP_ReaderInfo = + malloc(reader_register.ReaderCohortSize * sizeof(void *)); + for (int i = 0; i < reader_register.ReaderCohortSize; i++) + { + reader_register.CP_ReaderInfo[i] = + (CP_ReaderInitInfo)pointers[i]->CP_Info; + reader_register.DP_ReaderInfo[i] = pointers[i]->DP_Info; + } + /* the response value is set in the handler */ + struct _WriterResponseMsg *response = NULL; + CMCondition_set_client_data(Stream->CPInfo->cm, + reader_register.WriterResponseCondition, + &response); + + CMwrite(conn, Stream->CPInfo->ReaderRegisterFormat, &reader_register); + /* wait for "go" from writer */ + CP_verbose( + Stream, + "Waiting for writer response message in SstReadOpen(\"%s\")\n", + Name, reader_register.WriterResponseCondition); + CMCondition_wait(Stream->CPInfo->cm, + reader_register.WriterResponseCondition); + CP_verbose(Stream, + "finished wait writer response message in read_open\n"); + + assert(response); + struct _CombinedWriterInfo writer_data; + writer_data.WriterCohortSize = response->WriterCohortSize; + writer_data.CP_WriterInfo = response->CP_WriterInfo; + writer_data.DP_WriterInfo = response->DP_WriterInfo; + ReturnData = CP_distributeDataFromRankZero( + Stream, &writer_data, Stream->CPInfo->CombinedWriterInfoFormat, + &free_block); + } + else + { + ReturnData = CP_distributeDataFromRankZero( + Stream, NULL, Stream->CPInfo->CombinedWriterInfoFormat, + &free_block); + } + // printf("I am reader rank %d, my info on writers is:\n", Stream->Rank); + // FMdump_data(FMFormat_of_original(Stream->CPInfo->combined_writer_Format), + // ReturnData, 1024000); + // printf("\n"); + + Stream->ConnectionsToWriter = + calloc(sizeof(CP_PeerConnection), ReturnData->WriterCohortSize); + for (i = 0; i < ReturnData->WriterCohortSize; i++) + { + attr_list attrs = + 
attr_list_from_string(ReturnData->CP_WriterInfo[i]->ContactInfo); + Stream->ConnectionsToWriter[i].ContactList = attrs; + Stream->ConnectionsToWriter[i].RemoteStreamID = + ReturnData->CP_WriterInfo[i]->WriterID; + } + + Stream->Peers = setupPeerArray(Stream->CohortSize, Stream->Rank, + ReturnData->WriterCohortSize); + i = 0; + while (Stream->Peers[i] != -1) + { + int peer = Stream->Peers[i]; + Stream->ConnectionsToWriter[peer].CMconn = CMget_conn( + Stream->CPInfo->cm, Stream->ConnectionsToWriter[peer].ContactList); + i++; + } + + Stream->DP_Interface->provideWriterDataToReader( + &Svcs, Stream->DP_Stream, ReturnData->WriterCohortSize, + Stream->ConnectionsToWriter, ReturnData->DP_WriterInfo); + pthread_mutex_lock(&Stream->DataLock); + Stream->Status = Established; + pthread_mutex_unlock(&Stream->DataLock); + CP_verbose(Stream, "Sending Reader Activate messages to writer\n"); + sendOneToEachWriterRank(Stream, Stream->CPInfo->ReaderActivateFormat, &Msg, + &Msg.WSR_Stream); + CP_verbose(Stream, "Finish opening Stream \"%s\"\n", Name); + gettimeofday(&Stop, NULL); + timersub(&Stop, &Start, &Diff); + Stream->OpenTimeSecs = (double)Diff.tv_usec / 1e6 + Diff.tv_sec; + gettimeofday(&Stream->ValidStartTime, NULL); + return Stream; +} + +void queueReaderRegisterMsgAndNotify(SstStream Stream, + struct _ReaderRegisterMsg *Req, + CMConnection conn) +{ + pthread_mutex_lock(&Stream->DataLock); + RequestQueue New = malloc(sizeof(struct _RequestQueue)); + New->Msg = Req; + New->Conn = conn; + New->Next = NULL; + if (Stream->ReadRequestQueue) + { + RequestQueue Last = Stream->ReadRequestQueue; + while (Last->Next) + { + Last = Last->Next; + } + Last->Next = New; + } + else + { + Stream->ReadRequestQueue = New; + } + pthread_cond_signal(&Stream->DataCondition); + pthread_mutex_unlock(&Stream->DataLock); +} + +void queueTimestepMetadataMsgAndNotify(SstStream Stream, + struct _TimestepMetadataMsg *tsm, + CMConnection conn) +{ + pthread_mutex_lock(&Stream->DataLock); + struct 
_TimestepMetadataList *New = malloc(sizeof(struct _RequestQueue)); + New->MetadataMsg = tsm; + New->Next = NULL; + if (Stream->Timesteps) + { + struct _TimestepMetadataList *Last = Stream->Timesteps; + while (Last->Next) + { + Last = Last->Next; + } + Last->Next = New; + } + else + { + Stream->Timesteps = New; + } + pthread_cond_signal(&Stream->DataCondition); + pthread_mutex_unlock(&Stream->DataLock); +} + +void CP_ReaderRegisterHandler(CManager cm, CMConnection conn, void *Msg_v, + void *client_data, attr_list attrs) +{ + SstStream Stream; + struct _ReaderRegisterMsg *Msg = (struct _ReaderRegisterMsg *)Msg_v; + // fprintf(stderr, + // "Received a reader registration message directed at writer + // %p\n", + // Msg->writer_file); + // fprintf(stderr, "A reader cohort of size %d is requesting to be + // added\n", + // Msg->ReaderCohortSize); + // for (int i = 0; i < Msg->ReaderCohortSize; i++) { + // fprintf(stderr, " rank %d CP contact info: %s, %d, %p\n", i, + // Msg->CP_ReaderInfo[i]->ContactInfo, + // Msg->CP_ReaderInfo[i]->target_stone, + // Msg->CP_ReaderInfo[i]->ReaderID); + // } + Stream = Msg->WriterFile; + + /* arrange for this message data to stay around */ + CMtake_buffer(cm, Msg); + + queueReaderRegisterMsgAndNotify(Stream, Msg, conn); +} + +void CP_ReaderActivateHandler(CManager cm, CMConnection conn, void *Msg_v, + void *client_data, attr_list attrs) +{ + struct _ReaderActivateMsg *Msg = (struct _ReaderActivateMsg *)Msg_v; + + WS_ReaderInfo CP_WSR_Stream = Msg->WSR_Stream; + CP_verbose(CP_WSR_Stream->ParentStream, "Reader Activate message received " + "for Stream %p. 
Setting state to " + "Established.\n", + CP_WSR_Stream); + pthread_mutex_lock(&CP_WSR_Stream->ParentStream->DataLock); + CP_WSR_Stream->ReaderStatus = Established; + /* + * the main thread might be waiting for this + */ + pthread_cond_signal(&CP_WSR_Stream->ParentStream->DataCondition); + pthread_mutex_unlock(&CP_WSR_Stream->ParentStream->DataLock); +} + +void CP_TimestepMetadataHandler(CManager cm, CMConnection conn, void *Msg_v, + void *client_data, attr_list attrs) +{ + SstStream Stream; + struct _TimestepMetadataMsg *Msg = (struct _TimestepMetadataMsg *)Msg_v; + Stream = (SstStream)Msg->RS_Stream; + CP_verbose(Stream, + "Received an incoming metadata message for timestep %d\n", + Stream->Rank, Msg->Timestep); + + /* arrange for this message data to stay around */ + CMtake_buffer(cm, Msg); + + queueTimestepMetadataMsgAndNotify(Stream, Msg, conn); +} + +void CP_WriterResponseHandler(CManager cm, CMConnection conn, void *Msg_v, + void *client_data, attr_list attrs) +{ + struct _WriterResponseMsg *Msg = (struct _WriterResponseMsg *)Msg_v; + struct _WriterResponseMsg **response_ptr; + // fprintf(stderr, "Received a writer_response message for condition + // %d\n", + // Msg->WriterResponseCondition); + // fprintf(stderr, "The responding writer has cohort of size %d :\n", + // Msg->writer_CohortSize); + // for (int i = 0; i < Msg->writer_CohortSize; i++) { + // fprintf(stderr, " rank %d CP contact info: %s, %p\n", i, + // Msg->CP_WriterInfo[i]->ContactInfo, + // Msg->CP_WriterInfo[i]->WriterID); + // } + + /* arrange for this message data to stay around */ + CMtake_buffer(cm, Msg); + + /* attach the message to the CMCondition so it an be retrieved by the main + * thread */ + response_ptr = + CMCondition_get_client_data(cm, Msg->WriterResponseCondition); + *response_ptr = Msg; + + /* wake the main thread */ + CMCondition_signal(cm, Msg->WriterResponseCondition); +} + +extern CPTimestepList dequeueTimestep(SstStream Stream, long Timestep) +{ + CPTimestepList Ret = NULL; 
+ CPTimestepList List = NULL; + pthread_mutex_lock(&Stream->DataLock); + List = Stream->QueuedTimesteps; + if (Stream->QueuedTimesteps->Timestep == Timestep) + { + Stream->QueuedTimesteps = List->Next; + Ret = List; + } + else + { + CPTimestepList Last = List; + List = List->Next; + while (List != NULL) + { + if (List->Timestep == Timestep) + { + Last->Next = List->Next; + Ret = List; + } + Last = List; + List = List->Next; + } + if (Ret == NULL) + { + /* + * Shouldn't ever get here because we should never dequeue a + * timestep that we don't have. + */ + fprintf(stderr, "Failed to dequeue Timestep %ld, not found\n", + Timestep); + assert(0); + } + } + Stream->QueuedTimestepCount--; + /* main thread might be waiting on timesteps going away */ + pthread_cond_signal(&Stream->DataCondition); + pthread_mutex_unlock(&Stream->DataLock); + return NULL; +} + +extern void CP_ReleaseTimestepHandler(CManager cm, CMConnection conn, + void *Msg_v, void *client_data, + attr_list attrs) +{ + struct _ReleaseTimestepMsg *Msg = (struct _ReleaseTimestepMsg *)Msg_v; + WS_ReaderInfo Reader = (WS_ReaderInfo)Msg->WSR_Stream; + SstStream Stream = Reader->ParentStream; + CPTimestepList Entry = NULL; + + CP_verbose(Stream, "Received a release timestep message " + "for timestep %d\n", + Msg->Timestep); + + /* + * This needs reconsideration for multiple readers. Currently we do + * provideTimestep once for the "parent" Stream. We call data plane + * releaseTimestep whenever we get any release from any reader. This is + * fine while it's one-to-one. But if not, someone needs to be keeping + * track. Perhaps with reference counts, but still handling the failure + * situation where knowing how to adjust the reference count is hard. + * Left for later at the moment. 
+ */ + Stream->DP_Interface->releaseTimestep( + &Svcs, Reader->ParentStream->DP_Stream, Msg->Timestep); + + Entry = dequeueTimestep(Reader->ParentStream, Msg->Timestep); + free(Entry); +} + +extern void CP_WriterCloseHandler(CManager cm, CMConnection conn, void *Msg_v, + void *client_data, attr_list attrs) +{ + WriterCloseMsg Msg = (WriterCloseMsg)Msg_v; + SstStream Stream = (SstStream)Msg->RS_Stream; + + CP_verbose(Stream, "Received a writer close message. " + "Timestep %d was the final timestep.\n", + Msg->FinalTimestep); + + pthread_mutex_lock(&Stream->DataLock); + Stream->Status = PeerClosed; + /* wake anyone that might be waiting */ + pthread_cond_signal(&Stream->DataCondition); + pthread_mutex_unlock(&Stream->DataLock); +} + +static TSMetadataList waitForMetadata(SstStream Stream, long timestep) +{ + struct _TimestepMetadataList *Next; + pthread_mutex_lock(&Stream->DataLock); + Next = Stream->Timesteps; + while (1) + { + Next = Stream->Timesteps; + while (Next) + { + if (Next->MetadataMsg->Timestep == timestep) + { + pthread_mutex_unlock(&Stream->DataLock); + return Next; + } + Next = Next->Next; + } + /* didn't find requested timestep, check Stream status */ + if (Stream->Status != Established) + { + /* closed or failed, return NULL */ + return NULL; + } + /* wait until we get the timestep metadata or something else changes */ + pthread_cond_wait(&Stream->DataCondition, &Stream->DataLock); + } + /* NOTREACHED */ + pthread_mutex_unlock(&Stream->DataLock); +} + +extern SstFullMetadata SstGetMetadata(SstStream Stream, long timestep) +{ + TSMetadataList Entry; + SstFullMetadata Ret; + Entry = waitForMetadata(Stream, timestep); + if (Entry) + { + Ret = malloc(sizeof(struct _SstFullMetadata)); + Ret->WriterCohortSize = Entry->MetadataMsg->CohortSize; + Ret->WriterMetadata = Entry->MetadataMsg->Metadata; + if (Stream->DP_Interface->TimestepInfoFormats == NULL) + { + // DP didn't provide struct info, no valid data + Ret->DP_TimestepInfo = NULL; + } + else + { + 
Ret->DP_TimestepInfo = Entry->MetadataMsg->DP_TimestepInfo; + } + Stream->CurrentWorkingTimestep = timestep; + return Ret; + } + assert(Stream->Status != Established); + return NULL; +} + +extern void *SstReadRemoteMemory(SstStream Stream, int Rank, long Timestep, + size_t Offset, size_t Length, void *Buffer, + void *DP_TimestepInfo) +{ + if (Stream->Stats) + Stream->Stats->BytesTransferred += Length; + return Stream->DP_Interface->readRemoteMemory( + &Svcs, Stream->DP_Stream, Rank, Timestep, Offset, Length, Buffer, + DP_TimestepInfo); +} + +void sendOneToEachWriterRank(SstStream s, CMFormat f, void *Msg, + void **WS_StreamPtr) +{ + int i = 0; + while (s->Peers[i] != -1) + { + int peer = s->Peers[i]; + CMConnection conn = s->ConnectionsToWriter[peer].CMconn; + /* add the writer Stream identifier to each outgoing + * message */ + *WS_StreamPtr = s->ConnectionsToWriter[peer].RemoteStreamID; + CMwrite(conn, f, Msg); + i++; + } +} + +extern void SstReleaseStep(SstStream Stream, long Timestep) +{ + long MaxTimestep; + struct _ReleaseTimestepMsg Msg; + + /* + * remove local metadata for that timestep + */ + pthread_mutex_lock(&Stream->DataLock); + struct _TimestepMetadataList *List = Stream->Timesteps; + + if (Stream->Timesteps->MetadataMsg->Timestep == Timestep) + { + Stream->Timesteps = List->Next; + free(List); + } + else + { + struct _TimestepMetadataList *last = List; + List = List->Next; + while (List != NULL) + { + if (List->MetadataMsg->Timestep == Timestep) + { + last->Next = List->Next; + free(List); + } + last = List; + List = List->Next; + } + } + pthread_mutex_unlock(&Stream->DataLock); + + /* + * this can be just a barrier (to ensure that everyone has called + * SstReleaseStep), but doing a reduce and comparing the returned max to + * our value will detect if someone is calling with a different timestep + * value (which would be bad). This is a relatively cheap upcost from + * simple barrier in return for robustness checking. 
+ */ + MPI_Allreduce(&Timestep, &MaxTimestep, 1, MPI_LONG, MPI_MAX, + Stream->mpiComm); + assert((Timestep == MaxTimestep) && "All ranks must be in sync. Someone " + "called SstReleaseTimestep with a " + "different timestep value"); + + Msg.Timestep = Timestep; + + /* + * send each writer rank a release for this timestep (actually goes to WSR + * Streams) + */ + CP_verbose( + Stream, + "Sending ReleaseTimestep message for timestep %d, one to each writer\n", + Timestep); + sendOneToEachWriterRank(Stream, Stream->CPInfo->ReleaseTimestepFormat, &Msg, + &Msg.WSR_Stream); +} + +/* + * wait for metadata for Timestep indicated to arrive, or fail with EndOfStream + * or Error + */ +extern SstStatusValue SstAdvanceStep(SstStream Stream, long Timestep) +{ + + Stream->CurrentWorkingTimestep = Timestep; + + TSMetadataList Entry; + Entry = waitForMetadata(Stream, Timestep); + if (Entry) + { + Stream->CurrentWorkingTimestep = Timestep; + CP_verbose(Stream, "SstAdvanceStep returning Success on timestep %d\n", + Timestep); + return SstSuccess; + } + if (Stream->Status == PeerClosed) + { + CP_verbose(Stream, + "SstAdvanceStep returning EndOfStream at timestep %d\n", + Timestep); + return SstEndOfStream; + } + else + { + CP_verbose(Stream, + "SstAdvanceStep returning FatalError at timestep %d\n", + Timestep); + return SstFatalError; + } +} + +extern void SstReaderClose(SstStream Stream) +{ + /* need to have a reader-side shutdown protocol, but for now, just sleep for + * a little while to makes sure our release message for the last timestep + * got received */ + struct timeval CloseTime, Diff; + gettimeofday(&CloseTime, NULL); + timersub(&CloseTime, &Stream->ValidStartTime, &Diff); + if (Stream->Stats) + Stream->Stats->ValidTimeSecs = (double)Diff.tv_usec / 1e6 + Diff.tv_sec; + + CMsleep(Stream->CPInfo->cm, 1); +} + +extern SstStatusValue SstWaitForCompletion(SstStream Stream, void *handle) +{ + // We need a way to return an error from DP */ + 
Stream->DP_Interface->waitForCompletion(&Svcs, handle); + return SstSuccess; +} + +extern void SstSetStatsSave(SstStream Stream, SstStats Stats) +{ + Stats->OpenTimeSecs = Stream->OpenTimeSecs; + Stream->Stats = Stats; +} + +static void DP_verbose(SstStream s, char *Format, ...) +{ + if (s->Verbose) + { + va_list Args; + va_start(Args, Format); + if (s->Role == ReaderRole) + { + fprintf(stderr, "DP Reader %d (%p): ", s->Rank, s); + } + else + { + fprintf(stderr, "DP Writer %d (%p): ", s->Rank, s); + } + vfprintf(stderr, Format, Args); + va_end(Args); + } +} +extern void CP_verbose(SstStream s, char *Format, ...) +{ + if (s->Verbose) + { + va_list Args; + va_start(Args, Format); + if (s->Role == ReaderRole) + { + fprintf(stderr, "Reader %d (%p): ", s->Rank, s); + } + else + { + fprintf(stderr, "Writer %d (%p): ", s->Rank, s); + } + vfprintf(stderr, Format, Args); + va_end(Args); + } +} + +static CManager CP_getCManager(SstStream Stream) { return Stream->CPInfo->cm; } + +static MPI_Comm CP_getMPIComm(SstStream Stream) { return Stream->mpiComm; } + +static void CP_sendToPeer(SstStream s, CP_PeerCohort Cohort, int Rank, + CMFormat Format, void *Data) +{ + CP_PeerConnection *Peers = (CP_PeerConnection *)Cohort; + if (Peers[Rank].CMconn == NULL) + { + Peers[Rank].CMconn = CMget_conn(s->CPInfo->cm, Peers[Rank].ContactList); + } + CMwrite(Peers[Rank].CMconn, Format, Data); +} diff --git a/source/adios2/toolkit/sst/cp/cp_common.c b/source/adios2/toolkit/sst/cp/cp_common.c new file mode 100644 index 0000000000000000000000000000000000000000..f546279441a99083b661c1c23c787f0579dfcafa --- /dev/null +++ b/source/adios2/toolkit/sst/cp/cp_common.c @@ -0,0 +1,629 @@ +#include <stdio.h> +#include <stdlib.h> +#include <string.h> +#include <sys/stat.h> + +#include <atl.h> +#include <evpath.h> +#include <mpi.h> + +#include "sst.h" + +#include "cp_internal.h" + +void CP_parseParams(SstStream Stream, const char *Params) +{ + Stream->WaitForFirstReader = 1; +} + +static FMField 
CP_ReaderInitList[] = { + {"ContactInfo", "string", sizeof(char *), + FMOffset(CP_ReaderInitInfo, ContactInfo)}, + {"reader_ID", "integer", sizeof(void *), + FMOffset(CP_ReaderInitInfo, ReaderID)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec CP_ReaderInitStructs[] = { + {"cp_reader", CP_ReaderInitList, sizeof(struct _CP_ReaderInitInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField CP_WriterInitList[] = { + {"ContactInfo", "string", sizeof(char *), + FMOffset(CP_WriterInitInfo, ContactInfo)}, + {"WriterID", "integer", sizeof(void *), + FMOffset(CP_WriterInitInfo, WriterID)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec CP_WriterInitStructs[] = { + {"cp_writer", CP_WriterInitList, sizeof(struct _CP_WriterInitInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField CP_DP_PairList[] = { + {"CP_Info", "*CP_STRUCT", 0, FMOffset(struct _CP_DP_PairInfo *, CP_Info)}, + {"DP_Info", "*DP_STRUCT", 0, FMOffset(struct _CP_DP_PairInfo *, DP_Info)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec CP_DP_PairStructs[] = { + {"CP_DP_pair", CP_DP_PairList, sizeof(struct _CP_DP_PairInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMStructDescRec CP_DP_WriterPairStructs[] = { + {"CP_DP_WriterPair", CP_DP_PairList, sizeof(struct _CP_DP_PairInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField CP_DP_ArrayReaderList[] = { + {"ReaderCohortSize", "integer", sizeof(int), + FMOffset(struct _CombinedReaderInfo *, ReaderCohortSize)}, + {"CP_ReaderInfo", "(*CP_STRUCT)[ReaderCohortSize]", + sizeof(struct _CP_ReaderInitInfo), + FMOffset(struct _CombinedReaderInfo *, CP_ReaderInfo)}, + {"DP_ReaderInfo", "(*DP_STRUCT)[ReaderCohortSize]", 0, + FMOffset(struct _CombinedReaderInfo *, DP_ReaderInfo)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec CP_DP_ReaderArrayStructs[] = { + {"CombinedReaderInfo", CP_DP_ArrayReaderList, + sizeof(struct _CombinedReaderInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField CP_DP_ArrayWriterList[] = { + {"WriterCohortSize", "integer", 
sizeof(int), + FMOffset(struct _CombinedWriterInfo *, WriterCohortSize)}, + {"CP_WriterInfo", "(*CP_STRUCT)[WriterCohortSize]", + sizeof(struct _CP_WriterInitInfo), + FMOffset(struct _CombinedWriterInfo *, CP_WriterInfo)}, + {"DP_WriterInfo", "(*DP_STRUCT)[WriterCohortSize]", 0, + FMOffset(struct _CombinedWriterInfo *, DP_WriterInfo)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec CP_DP_WriterArrayStructs[] = { + {"CombinedWriterInfo", CP_DP_ArrayWriterList, + sizeof(struct _CombinedWriterInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField CP_ReaderRegisterList[] = { + {"writer_ID", "integer", sizeof(void *), + FMOffset(struct _ReaderRegisterMsg *, WriterFile)}, + {"writer_response_condition", "integer", sizeof(int), + FMOffset(struct _ReaderRegisterMsg *, WriterResponseCondition)}, + {"ReaderCohortSize", "integer", sizeof(int), + FMOffset(struct _ReaderRegisterMsg *, ReaderCohortSize)}, + {"CP_ReaderInfo", "(*CP_STRUCT)[ReaderCohortSize]", + sizeof(struct _CP_ReaderInitInfo), + FMOffset(struct _ReaderRegisterMsg *, CP_ReaderInfo)}, + {"DP_ReaderInfo", "(*DP_STRUCT)[ReaderCohortSize]", 0, + FMOffset(struct _ReaderRegisterMsg *, DP_ReaderInfo)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec CP_ReaderRegisterStructs[] = { + {"ReaderRegister", CP_ReaderRegisterList, sizeof(struct _ReaderRegisterMsg), + NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField CP_WriterResponseList[] = { + {"WriterResponseCondition", "integer", sizeof(int), + FMOffset(struct _WriterResponseMsg *, WriterResponseCondition)}, + {"WriterCohortSize", "integer", sizeof(int), + FMOffset(struct _WriterResponseMsg *, WriterCohortSize)}, + {"cp_WriterInfo", "(*CP_STRUCT)[WriterCohortSize]", + sizeof(struct _CP_WriterInitInfo), + FMOffset(struct _WriterResponseMsg *, CP_WriterInfo)}, + {"dp_WriterInfo", "(*DP_STRUCT)[WriterCohortSize]", 0, + FMOffset(struct _WriterResponseMsg *, DP_WriterInfo)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec CP_WriterResponseStructs[] = { + 
{"WriterResponse", CP_WriterResponseList, sizeof(struct _WriterResponseMsg), + NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField SstMetadataList[] = {{"DataSize", "integer", sizeof(size_t), + FMOffset(struct _SstMetadata *, DataSize)}, + {"VarCount", "integer", sizeof(int), + FMOffset(struct _SstMetadata *, VarCount)}, + {"Vars", "VarMetadata[VarCount]", + sizeof(struct _SstVarMeta), + FMOffset(struct _SstMetadata *, Vars)}, + {NULL, NULL, 0, 0}}; + +static FMField SstVarMetaList[] = { + {"VarName", "string", sizeof(char *), + FMOffset(struct _SstVarMeta *, VarName)}, + {"DimensionCount", "integer", sizeof(int), + FMOffset(struct _SstVarMeta *, DimensionCount)}, + {"Dimensions", "VarDimension[DimensionCount]", sizeof(struct _SstDimenMeta), + FMOffset(struct _SstVarMeta *, Dimensions)}, + {"DataOffsetInBlock", "integer", sizeof(int), + FMOffset(struct _SstVarMeta *, DataOffsetInBlock)}, + {NULL, NULL, 0, 0}}; + +static FMField SstDimenMetaList[] = { + {"Offset", "integer", sizeof(int), + FMOffset(struct _SstDimenMeta *, Offset)}, + {"Size", "integer", sizeof(int), FMOffset(struct _SstDimenMeta *, Size)}, + {"GlobalSize", "integer", sizeof(int), + FMOffset(struct _SstDimenMeta *, GlobalSize)}, + {NULL, NULL, 0, 0}}; + +static FMField MetaDataPlusDPInfoList[] = { + {"Metadata", "*SstMetadata", sizeof(struct _SstMetadata), + FMOffset(struct _MetadataPlusDPInfo *, Metadata)}, + {"DP_TimestepInfo", "*DP_STRUCT", 0, + FMOffset(struct _MetadataPlusDPInfo *, DP_TimestepInfo)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec MetaDataPlusDPInfoStructs[] = { + {"MetaDataPlusDPInfo", MetaDataPlusDPInfoList, + sizeof(struct _MetadataPlusDPInfo), NULL}, + {"SstMetadata", SstMetadataList, sizeof(struct _SstMetadata), NULL}, + {"VarMetadata", SstVarMetaList, sizeof(struct _SstVarMeta), NULL}, + {"VarDimension", SstDimenMetaList, sizeof(struct _SstDimenMeta), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField TimestepMetadataList[] = { + {"RS_Stream", "integer", sizeof(void *), + 
FMOffset(struct _TimestepMetadataMsg *, RS_Stream)}, + {"timestep", "integer", sizeof(int), + FMOffset(struct _TimestepMetadataMsg *, Timestep)}, + {"cohort_size", "integer", sizeof(int), + FMOffset(struct _TimestepMetadataMsg *, CohortSize)}, + {"metadata", "(*SstMetadata)[cohort_size]", sizeof(struct _SstMetadata), + FMOffset(struct _TimestepMetadataMsg *, Metadata)}, + {"TP_TimestepInfo", "(*DP_STRUCT)[cohort_size]", 0, + FMOffset(struct _TimestepMetadataMsg *, DP_TimestepInfo)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec TimestepMetadataStructs[] = { + {"timestepMetadata", TimestepMetadataList, + sizeof(struct _TimestepMetadataMsg), NULL}, + {"SstMetadata", SstMetadataList, sizeof(struct _SstMetadata), NULL}, + {"VarMetadata", SstVarMetaList, sizeof(struct _SstVarMeta), NULL}, + {"VarDimension", SstDimenMetaList, sizeof(struct _SstDimenMeta), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField ReleaseTimestepList[] = { + {"WSR_Stream", "integer", sizeof(void *), + FMOffset(struct _ReleaseTimestepMsg *, WSR_Stream)}, + {"Timestep", "integer", sizeof(int), + FMOffset(struct _ReleaseTimestepMsg *, Timestep)}, + {NULL, NULL, 0, 0}}; + +static FMField ReaderActivateList[] = { + {"WSR_Stream", "integer", sizeof(void *), + FMOffset(struct _ReaderActivateMsg *, WSR_Stream)}, + {NULL, NULL, 0, 0}}; + +static FMField WriterCloseList[] = { + {"RS_Stream", "integer", sizeof(void *), + FMOffset(struct _WriterCloseMsg *, RS_Stream)}, + {"FinalTimestep", "integer", sizeof(int), + FMOffset(struct _WriterCloseMsg *, FinalTimestep)}, + {NULL, NULL, 0, 0}}; + +static void replaceFormatNameInFieldList(FMStructDescList l, char *orig, + char *repl, int repl_size) +{ + int i = 0; + while (l[i].format_name) + { + int j = 0; + while (l[i].field_list[j].field_name) + { + char *loc; + if ((loc = strstr(l[i].field_list[j].field_type, orig))) + { + if (repl) + { + /* replace 'orig' with 'repl' */ + char *old = (char *)l[i].field_list[j].field_type; + char *new = + malloc(strlen(old) - 
strlen(orig) + strlen(repl) + 1); + strncpy(new, old, loc - old); + new[loc - old] = 0; + strcat(new, repl); + strcat(new, loc + strlen(orig)); + free(old); + l[i].field_list[j].field_type = new; + l[i].field_list[j].field_size = repl_size; + } + else + { + /* remove list item with 'orig' Move higher elements down 1 + */ + int index = j; + while (l[i].field_list[index].field_name != NULL) + { + l[i].field_list[index] = l[i].field_list[index + 1]; + } + j--; /* we've replaced this element, make sure we process + the one we replaced it with */ + } + } + j++; + } + i++; + } +} + +/* + * generated a combined FMStructDescList from separate top-level, cp and dp + * formats + * the format names/sizes "CP_STRUCT" and "DP_STRUCT" used in top-level field + * lists are replaced by + * the actual names/sizes provided. + */ +static FMStructDescList combineCpDpFormats(FMStructDescList top, + FMStructDescList cp, + FMStructDescList dp) +{ + FMStructDescList CombinedFormats = NULL; + int i = 0, topCount = 0, cpCount = 0, dpCount = 0; + CombinedFormats = FMcopy_struct_list(top); + + i = 0; + while (top[i++].format_name) + topCount++; + + i = 0; + while (cp && cp[i++].format_name) + cpCount++; + + i = 0; + while (dp && dp[i++].format_name) + dpCount++; + + CombinedFormats = + realloc(CombinedFormats, sizeof(CombinedFormats[0]) * + (topCount + cpCount + dpCount + 1)); + for (i = 0; i < cpCount; i++) + { + CombinedFormats[topCount + i].format_name = strdup(cp[i].format_name); + CombinedFormats[topCount + i].field_list = + copy_field_list(cp[i].field_list); + CombinedFormats[topCount + i].struct_size = cp[i].struct_size; + CombinedFormats[topCount + i].opt_info = NULL; + } + + for (i = 0; i < dpCount; i++) + { + CombinedFormats[topCount + cpCount + i].format_name = + strdup(dp[i].format_name); + CombinedFormats[topCount + cpCount + i].field_list = + copy_field_list(dp[i].field_list); + CombinedFormats[topCount + cpCount + i].struct_size = dp[i].struct_size; + CombinedFormats[topCount + 
cpCount + i].opt_info = NULL; + } + CombinedFormats[topCount + cpCount + dpCount].format_name = NULL; + CombinedFormats[topCount + cpCount + dpCount].field_list = NULL; + CombinedFormats[topCount + cpCount + dpCount].struct_size = 0; + CombinedFormats[topCount + cpCount + dpCount].opt_info = NULL; + + replaceFormatNameInFieldList(CombinedFormats, "CP_STRUCT", + cp ? cp[0].format_name : NULL, + cp ? cp[0].struct_size : 0); + replaceFormatNameInFieldList(CombinedFormats, "DP_STRUCT", + dp ? dp[0].format_name : NULL, + dp ? dp[0].struct_size : 0); + return CombinedFormats; +} + +void **CP_consolidateDataToRankZero(SstStream Stream, void *LocalInfo, + FFSTypeHandle Type, void **RetDataBlock) +{ + FFSBuffer Buf = create_FFSBuffer(); + int DataSize; + int *RecvCounts = NULL; + char *Buffer; + + struct _CP_DP_init_info **Pointers = NULL; + + Buffer = FFSencode(Buf, FMFormat_of_original(Type), LocalInfo, &DataSize); + + if (Stream->Rank == 0) + { + RecvCounts = malloc(Stream->CohortSize * sizeof(int)); + } + MPI_Gather(&DataSize, 1, MPI_INT, RecvCounts, 1, MPI_INT, 0, + MPI_COMM_WORLD); + + /* + * Figure out the total length of block + * and displacements for each rank + */ + + int *Displs = NULL; + char *RecvBuffer = NULL; + + if (Stream->Rank == 0) + { + int TotalLen = 0; + Displs = malloc(Stream->CohortSize * sizeof(int)); + + Displs[0] = 0; + TotalLen = (RecvCounts[0] + 7) & ~7; + + for (int i = 1; i < Stream->CohortSize; i++) + { + int RoundUp = (RecvCounts[i] + 7) & ~7; + Displs[i] = TotalLen; + TotalLen += RoundUp; + } + + RecvBuffer = malloc(TotalLen * sizeof(char)); + } + + /* + * Now we have the receive buffer, counts, and displacements, and + * can gather the data + */ + + MPI_Gatherv(Buffer, DataSize, MPI_CHAR, RecvBuffer, RecvCounts, Displs, + MPI_CHAR, 0, MPI_COMM_WORLD); + + if (Stream->Rank == 0) + { + FFSContext context = Stream->CPInfo->ffs_c; + // FFSTypeHandle ffs_type = FFSTypeHandle_from_encode(context, + // RecvBuffer); + + int i; + Pointers = 
malloc(Stream->CohortSize * sizeof(Pointers[0])); + for (i = 0; i < Stream->CohortSize; i++) + { + FFSdecode_in_place(context, RecvBuffer + Displs[i], + (void **)&Pointers[i]); + // printf("Decode for rank %d :\n", i); + // FMdump_data(FMFormat_of_original(ffs_type), Pointers[i], + // 1024000); + } + free(Displs); + free(RecvCounts); + } + *RetDataBlock = RecvBuffer; + return (void **)Pointers; +} + +void *CP_distributeDataFromRankZero(SstStream Stream, void *root_info, + FFSTypeHandle Type, void **RetDataBlock) +{ + int DataSize; + char *Buffer; + void *RetVal; + + if (Stream->Rank == 0) + { + FFSBuffer Buf = create_FFSBuffer(); + char *tmp = + FFSencode(Buf, FMFormat_of_original(Type), root_info, &DataSize); + MPI_Bcast(&DataSize, 1, MPI_INT, 0, MPI_COMM_WORLD); + MPI_Bcast(tmp, DataSize, MPI_CHAR, 0, MPI_COMM_WORLD); + Buffer = malloc(DataSize); + memcpy(Buffer, tmp, DataSize); + free_FFSBuffer(Buf); + } + else + { + MPI_Bcast(&DataSize, 1, MPI_INT, 0, MPI_COMM_WORLD); + Buffer = malloc(DataSize); + MPI_Bcast(Buffer, DataSize, MPI_CHAR, 0, MPI_COMM_WORLD); + } + + FFSContext context = Stream->CPInfo->ffs_c; + // FFSTypeHandle ffs_type = FFSTypeHandle_from_encode(context, Buffer); + + FFSdecode_in_place(context, Buffer, &RetVal); + // printf("Decode for rank %d is : \n", Stream->rank); + // FMdump_data(FMFormat_of_original(ffs_type), RetVal, 1024000); + *RetDataBlock = Buffer; + return RetVal; +} + +void **CP_consolidateDataToAll(SstStream Stream, void *LocalInfo, + FFSTypeHandle Type, void **RetDataBlock) +{ + FFSBuffer Buf = create_FFSBuffer(); + int DataSize; + int *RecvCounts = NULL; + char *Buffer; + + struct _CP_DP_init_info **Pointers = NULL; + + Buffer = FFSencode(Buf, FMFormat_of_original(Type), LocalInfo, &DataSize); + + RecvCounts = malloc(Stream->CohortSize * sizeof(int)); + + MPI_Allgather(&DataSize, 1, MPI_INT, RecvCounts, 1, MPI_INT, + MPI_COMM_WORLD); + + /* + * Figure out the total length of block + * and displacements for each rank + */ + + int 
*Displs = NULL; + char *RecvBuffer = NULL; + int i; + + int TotalLen = 0; + Displs = malloc(Stream->CohortSize * sizeof(int)); + + Displs[0] = 0; + TotalLen = (RecvCounts[0] + 7) & ~7; + + for (i = 1; i < Stream->CohortSize; i++) + { + int round_up = (RecvCounts[i] + 7) & ~7; + Displs[i] = TotalLen; + TotalLen += round_up; + } + + RecvBuffer = malloc(TotalLen * sizeof(char)); + + /* + * Now we have the receive Buffer, counts, and displacements, and + * can gather the data + */ + + MPI_Allgatherv(Buffer, DataSize, MPI_CHAR, RecvBuffer, RecvCounts, Displs, + MPI_CHAR, MPI_COMM_WORLD); + + FFSContext context = Stream->CPInfo->ffs_c; + + Pointers = malloc(Stream->CohortSize * sizeof(Pointers[0])); + for (i = 0; i < Stream->CohortSize; i++) + { + FFSdecode_in_place(context, RecvBuffer + Displs[i], + (void **)&Pointers[i]); + // FFSTypeHandle ffs_type = FFSTypeHandle_from_encode(context, + // RecvBuffer); + // printf("Decode for rank %d :\n", i); + // FMdump_data(FMFormat_of_original(ffs_type), Pointers[i], + // 1024000); + } + free(Displs); + free(RecvCounts); + + *RetDataBlock = RecvBuffer; + return (void **)Pointers; +} + +atom_t CM_TRANSPORT_ATOM = 0; + +static void initAtomList() +{ + if (CM_TRANSPORT_ATOM) + return; + + CM_TRANSPORT_ATOM = attr_atom_from_string("CM_TRANSPORT"); +} + +static void doFormatRegistration(CP_GlobalInfo CPInfo, CP_DP_Interface DPInfo) +{ + FMStructDescList PerRankReaderStructs, FullReaderRegisterStructs, + CombinedReaderStructs; + FMStructDescList PerRankWriterStructs, FullWriterResponseStructs, + CombinedWriterStructs; + FMStructDescList CombinedMetadataStructs, CombinedTimestepMetadataStructs; + FMFormat f; + + PerRankReaderStructs = combineCpDpFormats( + CP_DP_PairStructs, CP_ReaderInitStructs, DPInfo->ReaderContactFormats); + f = FMregister_data_format(CPInfo->fm_c, PerRankReaderStructs); + CPInfo->PerRankReaderInfoFormat = + FFSTypeHandle_by_index(CPInfo->ffs_c, FMformat_index(f)); + FFSset_fixed_target(CPInfo->ffs_c, 
PerRankReaderStructs); + + FullReaderRegisterStructs = + combineCpDpFormats(CP_ReaderRegisterStructs, CP_ReaderInitStructs, + DPInfo->ReaderContactFormats); + CPInfo->ReaderRegisterFormat = + CMregister_format(CPInfo->cm, FullReaderRegisterStructs); + CMregister_handler(CPInfo->ReaderRegisterFormat, CP_ReaderRegisterHandler, + NULL); + + CombinedReaderStructs = + combineCpDpFormats(CP_DP_ReaderArrayStructs, CP_ReaderInitStructs, + DPInfo->ReaderContactFormats); + f = FMregister_data_format(CPInfo->fm_c, CombinedReaderStructs); + CPInfo->CombinedReaderInfoFormat = + FFSTypeHandle_by_index(CPInfo->ffs_c, FMformat_index(f)); + FFSset_fixed_target(CPInfo->ffs_c, CombinedReaderStructs); + + PerRankWriterStructs = + combineCpDpFormats(CP_DP_WriterPairStructs, CP_WriterInitStructs, + DPInfo->WriterContactFormats); + f = FMregister_data_format(CPInfo->fm_c, PerRankWriterStructs); + CPInfo->PerRankWriterInfoFormat = + FFSTypeHandle_by_index(CPInfo->ffs_c, FMformat_index(f)); + FFSset_fixed_target(CPInfo->ffs_c, PerRankWriterStructs); + + FullWriterResponseStructs = + combineCpDpFormats(CP_WriterResponseStructs, CP_WriterInitStructs, + DPInfo->WriterContactFormats); + CPInfo->WriterResponseFormat = + CMregister_format(CPInfo->cm, FullWriterResponseStructs); + CMregister_handler(CPInfo->WriterResponseFormat, CP_WriterResponseHandler, + NULL); + + CombinedWriterStructs = + combineCpDpFormats(CP_DP_WriterArrayStructs, CP_WriterInitStructs, + DPInfo->WriterContactFormats); + f = FMregister_data_format(CPInfo->fm_c, CombinedWriterStructs); + CPInfo->CombinedWriterInfoFormat = + FFSTypeHandle_by_index(CPInfo->ffs_c, FMformat_index(f)); + FFSset_fixed_target(CPInfo->ffs_c, CombinedWriterStructs); + + CombinedMetadataStructs = combineCpDpFormats( + MetaDataPlusDPInfoStructs, NULL, DPInfo->TimestepInfoFormats); + f = FMregister_data_format(CPInfo->fm_c, CombinedMetadataStructs); + CPInfo->PerRankMetadataFormat = + FFSTypeHandle_by_index(CPInfo->ffs_c, FMformat_index(f)); + 
FFSset_fixed_target(CPInfo->ffs_c, CombinedMetadataStructs); + + CombinedTimestepMetadataStructs = combineCpDpFormats( + TimestepMetadataStructs, NULL, DPInfo->TimestepInfoFormats); + CPInfo->DeliverTimestepMetadataFormat = + CMregister_format(CPInfo->cm, CombinedTimestepMetadataStructs); + CMregister_handler(CPInfo->DeliverTimestepMetadataFormat, + CP_TimestepMetadataHandler, NULL); + + CPInfo->ReaderActivateFormat = CMregister_simple_format( + CPInfo->cm, "ReaderActivate", ReaderActivateList, + sizeof(struct _ReaderActivateMsg)); + CMregister_handler(CPInfo->ReaderActivateFormat, CP_ReaderActivateHandler, + NULL); + CPInfo->ReleaseTimestepFormat = CMregister_simple_format( + CPInfo->cm, "ReleaseTimestep", ReleaseTimestepList, + sizeof(struct _ReleaseTimestepMsg)); + CMregister_handler(CPInfo->ReleaseTimestepFormat, CP_ReleaseTimestepHandler, + NULL); + CPInfo->WriterCloseFormat = + CMregister_simple_format(CPInfo->cm, "WriterClose", WriterCloseList, + sizeof(struct _WriterCloseMsg)); + CMregister_handler(CPInfo->WriterCloseFormat, CP_WriterCloseHandler, NULL); +} + +extern CP_GlobalInfo CP_getCPInfo(CP_DP_Interface DPInfo) +{ + static CP_GlobalInfo CPInfo = NULL; + + if (CPInfo) + return CPInfo; + + initAtomList(); + + CPInfo = malloc(sizeof(*CPInfo)); + memset(CPInfo, 0, sizeof(*CPInfo)); + + CPInfo->cm = CManager_create(); + CMfork_comm_thread(CPInfo->cm); + + attr_list listen_list = create_attr_list(); + set_string_attr(listen_list, CM_TRANSPORT_ATOM, strdup("enet")); + CMlisten_specific(CPInfo->cm, listen_list); + free_attr_list(listen_list); + + CPInfo->fm_c = CMget_FMcontext(CPInfo->cm); + CPInfo->ffs_c = create_FFSContext_FM(CPInfo->fm_c); + + doFormatRegistration(CPInfo, DPInfo); + + return CPInfo; +} + +SstStream CP_newStream() +{ + SstStream Stream = malloc(sizeof(*Stream)); + memset(Stream, 0, sizeof(*Stream)); + pthread_mutex_init(&Stream->DataLock, NULL); + pthread_cond_init(&Stream->DataCondition, NULL); + if (getenv("SstVerbose")) + { + 
Stream->Verbose = 1; + } + else + { + Stream->Verbose = 0; + } + return Stream; +} diff --git a/source/adios2/toolkit/sst/cp/cp_internal.h b/source/adios2/toolkit/sst/cp/cp_internal.h new file mode 100644 index 0000000000000000000000000000000000000000..9170eb88d9713719314015da3f58289e94b7d934 --- /dev/null +++ b/source/adios2/toolkit/sst/cp/cp_internal.h @@ -0,0 +1,296 @@ +#include "dp_interface.h" +#include <pthread.h> + +typedef struct _CP_GlobalInfo +{ + /* exchange info */ + CManager cm; + FFSContext ffs_c; + FMContext fm_c; + FFSTypeHandle PerRankReaderInfoFormat; + FFSTypeHandle CombinedReaderInfoFormat; + CMFormat ReaderRegisterFormat; + FFSTypeHandle PerRankWriterInfoFormat; + FFSTypeHandle CombinedWriterInfoFormat; + CMFormat WriterResponseFormat; + FFSTypeHandle PerRankMetadataFormat; + CMFormat DeliverTimestepMetadataFormat; + CMFormat ReaderActivateFormat; + CMFormat ReleaseTimestepFormat; + CMFormat WriterCloseFormat; +} * CP_GlobalInfo; + +struct _ReaderRegisterMsg; + +typedef struct _RequestQueue +{ + struct _ReaderRegisterMsg *Msg; + CMConnection Conn; + struct _RequestQueue *Next; +} * RequestQueue; + +typedef struct _CP_PeerConnection +{ + attr_list ContactList; + void *RemoteStreamID; + CMConnection CMconn; +} CP_PeerConnection; + +enum StreamStatus +{ + NotOpen = 0, + Established, + PeerClosed, + PeerFailed, + Closed +}; + +typedef struct _WS_ReaderInfo +{ + SstStream ParentStream; + enum StreamStatus ReaderStatus; + void *DP_WSR_Stream; + void *RS_StreamID; + int ReaderCohortSize; + int *Peers; + CP_PeerConnection *Connections; +} * WS_ReaderInfo; + +typedef struct _TimestepMetadataList +{ + struct _TimestepMetadataMsg *MetadataMsg; + struct _TimestepMetadataList *Next; +} * TSMetadataList; + +enum StreamRole +{ + ReaderRole, + WriterRole +}; + +typedef struct _CPTimestepEntry +{ + long Timestep; + SstData Data; + void **DP_TimestepInfo; + SstMetadata *MetadataArray; + struct _CPTimestepEntry *Next; +} * CPTimestepList; + +struct _SstStream +{ 
+ CP_GlobalInfo CPInfo; + + MPI_Comm mpiComm; + enum StreamRole Role; + + /* params */ + int WaitForFirstReader; + + /* state */ + int Verbose; + double OpenTimeSecs; + struct timeval ValidStartTime; + SstStats Stats; + + /* MPI info */ + int Rank; + int CohortSize; + + CP_DP_Interface DP_Interface; + void *DP_Stream; + + pthread_mutex_t DataLock; + pthread_cond_t DataCondition; + + /* WRITER-SIDE FIELDS */ + int WriterTimestep; + CPTimestepList QueuedTimesteps; + int QueuedTimestepCount; + int LastProvidedTimestep; + + /* rendezvous condition */ + int FirstReaderCondition; + RequestQueue ReadRequestQueue; + + int ReaderCount; + WS_ReaderInfo *Readers; + + /* READER-SIDE FIELDS */ + struct _TimestepMetadataList *Timesteps; + int WriterCohortSize; + int *Peers; + CP_PeerConnection *ConnectionsToWriter; + enum StreamStatus Status; + int FinalTimestep; + int CurrentWorkingTimestep; +}; + +/* + * This is the baseline contact information for each reader-side rank. + * It will be gathered and provided to writer ranks + */ +typedef struct _CP_ReaderInitInfo +{ + char *ContactInfo; + void *ReaderID; +} * CP_ReaderInitInfo; + +/* + * This is the structure that holds reader_side CP and DP contact info for a + * single rank. + * This is gathered on reader side. + */ +struct _CP_DP_PairInfo +{ + void **CP_Info; + void **DP_Info; +}; + +/* + * This is the structure that holds local metadata and the DP info related to + * it. + * This is gathered on writer side before distribution to readers. 
+ */ +struct _MetadataPlusDPInfo +{ + SstMetadata Metadata; + void *DP_TimestepInfo; +}; + +/* + * Reader register messages are sent from reader rank 0 to writer rank 0 + * They contain basic info, plus contact information for each reader rank + */ +struct _ReaderRegisterMsg +{ + void *WriterFile; + int WriterResponseCondition; + int ReaderCohortSize; + CP_ReaderInitInfo *CP_ReaderInfo; + void **DP_ReaderInfo; +}; + +/* + * This is the consolidated reader contact info structure that is used to + * diseminate full reader contact information to all writer ranks + */ +typedef struct _CombinedReaderInfo +{ + int ReaderCohortSize; + CP_ReaderInitInfo *CP_ReaderInfo; + void **DP_ReaderInfo; +} * reader_data_t; + +/* + * This is the baseline contact information for each writer-side rank. + * It will be gathered and provided to reader ranks + */ +typedef struct _CP_WriterInitInfo +{ + char *ContactInfo; + void *WriterID; +} * CP_WriterInitInfo; + +/* + * Writer response messages from writer rank 0 to reader rank 0 after the + * initial contact request. + * They contain basic info, plus contact information for each reader rank + */ +struct _WriterResponseMsg +{ + int WriterResponseCondition; + int WriterCohortSize; + CP_WriterInitInfo *CP_WriterInfo; + void **DP_WriterInfo; +}; + +/* + * The ReaderActivate message informs the writer that this reader is now ready + * to receive data/timesteps. + * One is sent to each writer rank. + */ +struct _ReaderActivateMsg +{ + void *WSR_Stream; +}; + +/* + * The timestepMetadata message carries the metadata from all writer ranks. + * One is sent to each reader. + */ +struct _TimestepMetadataMsg +{ + void *RS_Stream; + int Timestep; + int CohortSize; + SstMetadata *Metadata; + void **DP_TimestepInfo; +}; + +/* + * The ReleaseTimestep message informs the writers that this reader is done with + * a particular timestep. + * One is sent to each writer rank. 
+ */ +struct _ReleaseTimestepMsg +{ + void *WSR_Stream; + int Timestep; +}; + +/* + * The WriterClose message informs the readers that the writer is beginning an + * orderly shutdown + * of the stream. Data will still be served, but no new timesteps will be + * forthcoming. + * One is sent to each reader rank. + */ +typedef struct _WriterCloseMsg +{ + void *RS_Stream; + int FinalTimestep; +} * WriterCloseMsg; + +/* + * This is the consolidated writer contact info structure that is used to + * diseminate full writer contact information to all reader ranks + */ +typedef struct _CombinedWriterInfo +{ + int WriterCohortSize; + CP_WriterInitInfo *CP_WriterInfo; + void **DP_WriterInfo; +} * writer_data_t; + +typedef struct _MetadataPlusDPInfo *MetadataPlusDPInfo; + +extern atom_t CM_TRANSPORT_ATOM; + +void CP_parseParams(SstStream stream, const char *params); +extern CP_GlobalInfo CP_getCPInfo(CP_DP_Interface DPInfo); +extern SstStream CP_newStream(); + +void **CP_consolidateDataToRankZero(SstStream stream, void *local_info, + FFSTypeHandle type, void **ret_data_block); +void **CP_consolidateDataToAll(SstStream stream, void *local_info, + FFSTypeHandle type, void **ret_data_block); +void *CP_distributeDataFromRankZero(SstStream stream, void *root_info, + FFSTypeHandle type, void **ret_data_block); +extern void CP_ReaderRegisterHandler(CManager cm, CMConnection conn, + void *msg_v, void *client_data, + attr_list attrs); +extern void CP_WriterResponseHandler(CManager cm, CMConnection conn, + void *msg_v, void *client_data, + attr_list attrs); +extern void CP_ReaderActivateHandler(CManager cm, CMConnection conn, + void *msg_v, void *client_data, + attr_list attrs); +extern void CP_TimestepMetadataHandler(CManager cm, CMConnection conn, + void *msg_v, void *client_data, + attr_list attrs); +extern void CP_ReleaseTimestepHandler(CManager cm, CMConnection conn, + void *msg_v, void *client_data, + attr_list attrs); +extern void CP_WriterCloseHandler(CManager cm, CMConnection 
conn, void *msg_v, + void *client_data, attr_list attrs); diff --git a/source/adios2/toolkit/sst/dp/CMakeLists.txt b/source/adios2/toolkit/sst/dp/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..6829fa114ea1a2a08d940609c724ce32bcbc4954 --- /dev/null +++ b/source/adios2/toolkit/sst/dp/CMakeLists.txt @@ -0,0 +1,2 @@ +target_sources(sst PRIVATE "${CMAKE_CURRENT_LIST_DIR}/dp.c" + "${CMAKE_CURRENT_LIST_DIR}/dummy_dp.c") diff --git a/source/adios2/toolkit/sst/dp/dp.c b/source/adios2/toolkit/sst/dp/dp.c new file mode 100644 index 0000000000000000000000000000000000000000..1add67ab5b8add719cad5be1d6db322f8da97e24 --- /dev/null +++ b/source/adios2/toolkit/sst/dp/dp.c @@ -0,0 +1,25 @@ +#include <stdio.h> +#include <string.h> + +#include <evpath.h> +#include <fm.h> +#include <mpi.h> + +#include "sst_data.h" + +#include "dp_interface.h" + +extern CP_DP_Interface LoadDummyDP(); + +CP_DP_Interface LoadDP(char *dp_name) +{ + if (strcmp(dp_name, "dummy") == 0) + { + return LoadDummyDP(); + } + else + { + fprintf(stderr, "Unknown DP interface %s, load failed\n", dp_name); + return NULL; + } +} diff --git a/source/adios2/toolkit/sst/dp/dummy_dp.c b/source/adios2/toolkit/sst/dp/dummy_dp.c new file mode 100644 index 0000000000000000000000000000000000000000..2203aa42b56b6501abed17d81e11d1a3303353fb --- /dev/null +++ b/source/adios2/toolkit/sst/dp/dummy_dp.c @@ -0,0 +1,598 @@ +#include <assert.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +#include <atl.h> +#include <evpath.h> +#include <mpi.h> + +#include "sst_data.h" + +#include "dp_interface.h" + +/* + * Some conventions: + * `RS` indicates a reader-side item. + * `WS` indicates a writer-side item. + * `WSR` indicates a writer-side per-reader item. + * + * We keep different "stream" structures for the reader side and for the + * writer side. On the writer side, there's actually a "stream" + * per-connected-reader (a WSR_Stream), with the idea that some (many?) 
+ * RDMA transports will require connections/pairing, so we'd need to track + * resources per reader. + * + * Generally the 'contact information' we exchange at init time includes + * the address of the local 'stream' data structure. This address isn't + * particularly useful to the far side, but it can be returned with + * requests to indicate what resource is targeted. For example, when a + * remote memory read request arrives at the writer from the reader, it + * includes the WSR_Stream value that is the address of the writer-side + * per-reader data structure. Upon message arrival, we just cast that + * value back into a pointer. + * + * By design, neither the data plane nor the control plane reference the + * other's symbols directly. The interface between the control plane and + * the data plane is represented by the types and structures defined in + * dp_interface.h and is a set of function pointers and FFS-style + * descriptions of the data structures to be communicated at init time. + * This allows for the future possibility of loading planes at run-time, etc. + * + * This "dummy" data plane uses control plane functionality to implement + * the ReadRemoteMemory functionality. That is, it both the request to + * read memory and the response which carries the data are actually + * accomplished using the connections and message delivery facilities of + * the control plane, made available here via CP_Services. A real data + * plane would replace one or both of these with RDMA functionality. 
+ */ + +typedef struct _Dummy_RS_Stream +{ + CManager cm; + void *CP_Stream; + CMFormat ReadRequestFormat; + int Rank; + + /* writer info */ + int WriterCohortSize; + CP_PeerCohort PeerCohort; + struct _DummyWriterContactInfo *WriterContactInfo; +} * Dummy_RS_Stream; + +typedef struct _Dummy_WSR_Stream +{ + struct _Dummy_WS_Stream *WS_Stream; + CP_PeerCohort PeerCohort; + int ReaderCohortSize; + struct _DummyReaderContactInfo *ReaderContactInfo; +} * Dummy_WSR_Stream; + +typedef struct _TimestepEntry +{ + long Timestep; + struct _SstData *Data; + struct _DummyPerTimestepInfo *DP_TimestepInfo; + struct _TimestepEntry *Next; + +} * TimestepList; + +typedef struct _Dummy_WS_Stream +{ + CManager cm; + void *CP_Stream; + int Rank; + + TimestepList Timesteps; + CMFormat ReadReplyFormat; + + int ReaderCount; + Dummy_WSR_Stream *Readers; +} * Dummy_WS_Stream; + +typedef struct _DummyReaderContactInfo +{ + char *ContactString; + void *RS_Stream; +} * DummyReaderContactInfo; + +typedef struct _DummyWriterContactInfo +{ + char *ContactString; + void *WS_Stream; +} * DummyWriterContactInfo; + +typedef struct _DummyReadRequestMsg +{ + long Timestep; + size_t Offset; + size_t Length; + void *WS_Stream; + void *RS_Stream; + int RequestingRank; + int NotifyCondition; +} * DummyReadRequestMsg; + +static FMField DummyReadRequestList[] = { + {"Timestep", "integer", sizeof(long), + FMOffset(DummyReadRequestMsg, Timestep)}, + {"Offset", "integer", sizeof(size_t), + FMOffset(DummyReadRequestMsg, Offset)}, + {"Length", "integer", sizeof(size_t), + FMOffset(DummyReadRequestMsg, Length)}, + {"WS_Stream", "integer", sizeof(void *), + FMOffset(DummyReadRequestMsg, WS_Stream)}, + {"RS_Stream", "integer", sizeof(void *), + FMOffset(DummyReadRequestMsg, RS_Stream)}, + {"RequestingRank", "integer", sizeof(int), + FMOffset(DummyReadRequestMsg, RequestingRank)}, + {"NotifyCondition", "integer", sizeof(int), + FMOffset(DummyReadRequestMsg, NotifyCondition)}, + {NULL, NULL, 0, 0}}; + +static 
FMStructDescRec DummyReadRequestStructs[] = { + {"DummyReadRequest", DummyReadRequestList, + sizeof(struct _DummyReadRequestMsg), NULL}, + {NULL, NULL, 0, NULL}}; + +typedef struct _DummyReadReplyMsg +{ + long Timestep; + size_t DataLength; + void *RS_Stream; + char *Data; + int NotifyCondition; +} * DummyReadReplyMsg; + +static FMField DummyReadReplyList[] = { + {"Timestep", "integer", sizeof(long), + FMOffset(DummyReadReplyMsg, Timestep)}, + {"RS_Stream", "integer", sizeof(void *), + FMOffset(DummyReadReplyMsg, RS_Stream)}, + {"DataLength", "integer", sizeof(size_t), + FMOffset(DummyReadReplyMsg, DataLength)}, + {"Data", "char[DataLength]", sizeof(char), + FMOffset(DummyReadReplyMsg, Data)}, + {"NotifyCondition", "integer", sizeof(int), + FMOffset(DummyReadReplyMsg, NotifyCondition)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec DummyReadReplyStructs[] = { + {"DummyReadReply", DummyReadReplyList, sizeof(struct _DummyReadReplyMsg), + NULL}, + {NULL, NULL, 0, NULL}}; + +static void DummyReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, + void *client_Data, attr_list attrs); + +static DP_RS_Stream DummyInitReader(CP_Services Svcs, void *CP_Stream, + void **ReaderContactInfoPtr) +{ + Dummy_RS_Stream Stream = malloc(sizeof(struct _Dummy_RS_Stream)); + DummyReaderContactInfo Contact = + malloc(sizeof(struct _DummyReaderContactInfo)); + CManager cm = Svcs->getCManager(CP_Stream); + char *DummyContactString = malloc(64); + MPI_Comm comm = Svcs->getMPIComm(CP_Stream); + CMFormat F; + + memset(Stream, 0, sizeof(*Stream)); + memset(Contact, 0, sizeof(*Contact)); + + /* + * save the CP_stream value of later use + */ + Stream->CP_Stream = CP_Stream; + + MPI_Comm_rank(comm, &Stream->Rank); + + sprintf(DummyContactString, "Reader Rank %d, test contact", Stream->Rank); + + /* + * add a handler for read reply messages + */ + Stream->ReadRequestFormat = CMregister_format(cm, DummyReadRequestStructs); + F = CMregister_format(cm, DummyReadReplyStructs); + 
CMregister_handler(F, DummyReadReplyHandler, Svcs); + + Contact->ContactString = DummyContactString; + Contact->RS_Stream = Stream; + + *ReaderContactInfoPtr = Contact; + + return Stream; +} + +static void DummyReadRequestHandler(CManager cm, CMConnection conn, void *msg_v, + void *client_Data, attr_list attrs) +{ + DummyReadRequestMsg ReadRequestMsg = (DummyReadRequestMsg)msg_v; + Dummy_WSR_Stream WSR_Stream = ReadRequestMsg->WS_Stream; + + Dummy_WS_Stream WS_Stream = WSR_Stream->WS_Stream; + TimestepList tmp = WS_Stream->Timesteps; + CP_Services Svcs = (CP_Services)client_Data; + + Svcs->verbose(WS_Stream->CP_Stream, "Got a request to read remote memory " + "from reader rank %d: timestep %d, " + "offset %d, length %d\n", + ReadRequestMsg->RequestingRank, ReadRequestMsg->Timestep, + ReadRequestMsg->Offset, ReadRequestMsg->Length); + while (tmp != NULL) + { + if (tmp->Timestep == ReadRequestMsg->Timestep) + { + struct _DummyReadReplyMsg ReadReplyMsg; + /* memset avoids uninit byte warnings from valgrind */ + memset(&ReadReplyMsg, 0, sizeof(ReadReplyMsg)); + ReadReplyMsg.Timestep = ReadRequestMsg->Timestep; + ReadReplyMsg.DataLength = ReadRequestMsg->Length; + ReadReplyMsg.Data = tmp->Data->block + ReadRequestMsg->Offset; + ReadReplyMsg.RS_Stream = ReadRequestMsg->RS_Stream; + ReadReplyMsg.NotifyCondition = ReadRequestMsg->NotifyCondition; + Svcs->verbose( + WS_Stream->CP_Stream, + "Sending a reply to reader rank %d for remote memory read\n", + ReadRequestMsg->RequestingRank); + Svcs->sendToPeer(WS_Stream->CP_Stream, WSR_Stream->PeerCohort, + ReadRequestMsg->RequestingRank, + WS_Stream->ReadReplyFormat, &ReadReplyMsg); + return; + } + tmp = tmp->Next; + } + /* + * Shouldn't ever get here because we should never get a request for a + * timestep that we don't have. + */ + fprintf(stderr, "Failed to read Timestep %ld, not found\n", + ReadRequestMsg->Timestep); + /* + * in the interest of not failing a writer on a reader failure, don't + * assert(0) here. 
Probably this sort of error should close the link to + * a reader though. + */ +} + +typedef struct _DummyCompletionHandle +{ + int CMcondition; + CManager cm; + void *CPStream; + void *Buffer; + int Rank; +} * DummyCompletionHandle; + +static void DummyReadReplyHandler(CManager cm, CMConnection conn, void *msg_v, + void *client_Data, attr_list attrs) +{ + DummyReadReplyMsg ReadReplyMsg = (DummyReadReplyMsg)msg_v; + Dummy_RS_Stream RS_Stream = ReadReplyMsg->RS_Stream; + CP_Services Svcs = (CP_Services)client_Data; + DummyCompletionHandle Handle = + CMCondition_get_client_data(cm, ReadReplyMsg->NotifyCondition); + + Svcs->verbose( + RS_Stream->CP_Stream, + "Got a reply to remote memory read from rank %d, condition is %d\n", + Handle->Rank, ReadReplyMsg->NotifyCondition); + + /* + * `Handle` contains the full request info and is `client_data` + * associated with the CMCondition. Once we get it, copy the incoming + * data to the buffer area given by the request + */ + memcpy(Handle->Buffer, ReadReplyMsg->Data, ReadReplyMsg->DataLength); + + /* + * Signal the condition to wake the reader if they are waiting. 
+ */ + CMCondition_signal(cm, ReadReplyMsg->NotifyCondition); +} + +static DP_WS_Stream DummyInitWriter(CP_Services Svcs, void *CP_Stream) +{ + Dummy_WS_Stream Stream = malloc(sizeof(struct _Dummy_WS_Stream)); + CManager cm = Svcs->getCManager(CP_Stream); + MPI_Comm comm = Svcs->getMPIComm(CP_Stream); + CMFormat F; + + memset(Stream, 0, sizeof(struct _Dummy_WS_Stream)); + + MPI_Comm_rank(comm, &Stream->Rank); + + /* + * save the CP_stream value of later use + */ + Stream->CP_Stream = CP_Stream; + + /* + * add a handler for read request messages + */ + F = CMregister_format(cm, DummyReadRequestStructs); + CMregister_handler(F, DummyReadRequestHandler, Svcs); + + /* + * register read reply message structure so we can send later + */ + Stream->ReadReplyFormat = CMregister_format(cm, DummyReadReplyStructs); + + return (void *)Stream; +} + +static DP_WSR_Stream DummyInitWriterPerReader(CP_Services Svcs, + DP_WS_Stream WS_Stream_v, + int readerCohortSize, + CP_PeerCohort PeerCohort, + void **providedReaderInfo_v, + void **WriterContactInfoPtr) +{ + Dummy_WS_Stream WS_Stream = (Dummy_WS_Stream)WS_Stream_v; + Dummy_WSR_Stream WSR_Stream = malloc(sizeof(*WSR_Stream)); + DummyWriterContactInfo ContactInfo; + MPI_Comm comm = Svcs->getMPIComm(WS_Stream->CP_Stream); + int Rank; + char *DummyContactString = malloc(64); + DummyReaderContactInfo *providedReaderInfo = + (DummyReaderContactInfo *)providedReaderInfo_v; + + MPI_Comm_rank(comm, &Rank); + sprintf(DummyContactString, "Writer Rank %d, test contact", Rank); + + WSR_Stream->WS_Stream = WS_Stream; /* pointer to writer struct */ + WSR_Stream->PeerCohort = PeerCohort; + + /* + * make a copy of writer contact information (original will not be + * preserved) + */ + WSR_Stream->ReaderContactInfo = + malloc(sizeof(struct _DummyReaderContactInfo) * readerCohortSize); + for (int i = 0; i < readerCohortSize; i++) + { + WSR_Stream->ReaderContactInfo[i].ContactString = + strdup(providedReaderInfo[i]->ContactString); + 
WSR_Stream->ReaderContactInfo[i].RS_Stream = + providedReaderInfo[i]->RS_Stream; + Svcs->verbose( + WS_Stream->CP_Stream, + "Received contact info \"%s\", RD_Stream %p for Reader Rank %d\n", + WSR_Stream->ReaderContactInfo[i].ContactString, + WSR_Stream->ReaderContactInfo[i].RS_Stream, i); + } + + /* + * add this writer-side reader-specific stream to the parent writer stream + * structure + */ + WS_Stream->Readers = realloc( + WS_Stream->Readers, sizeof(*WSR_Stream) * (WS_Stream->ReaderCount + 1)); + WS_Stream->Readers[WS_Stream->ReaderCount] = WSR_Stream; + WS_Stream->ReaderCount++; + + ContactInfo = malloc(sizeof(struct _DummyWriterContactInfo)); + memset(ContactInfo, 0, sizeof(struct _DummyWriterContactInfo)); + ContactInfo->ContactString = DummyContactString; + ContactInfo->WS_Stream = WSR_Stream; + *WriterContactInfoPtr = ContactInfo; + + return WSR_Stream; +} + +static void DummyProvideWriterDataToReader(CP_Services Svcs, + DP_RS_Stream RS_Stream_v, + int writerCohortSize, + CP_PeerCohort PeerCohort, + void **providedWriterInfo_v) +{ + Dummy_RS_Stream RS_Stream = (Dummy_RS_Stream)RS_Stream_v; + DummyWriterContactInfo *providedWriterInfo = + (DummyWriterContactInfo *)providedWriterInfo_v; + + RS_Stream->PeerCohort = PeerCohort; + RS_Stream->WriterCohortSize = writerCohortSize; + + /* + * make a copy of writer contact information (original will not be + * preserved) + */ + RS_Stream->WriterContactInfo = + malloc(sizeof(struct _DummyWriterContactInfo) * writerCohortSize); + for (int i = 0; i < writerCohortSize; i++) + { + RS_Stream->WriterContactInfo[i].ContactString = + strdup(providedWriterInfo[i]->ContactString); + RS_Stream->WriterContactInfo[i].WS_Stream = + providedWriterInfo[i]->WS_Stream; + Svcs->verbose( + RS_Stream->CP_Stream, + "Received contact info \"%s\", WS_stream %p for WSR Rank %d\n", + RS_Stream->WriterContactInfo[i].ContactString, + RS_Stream->WriterContactInfo[i].WS_Stream, i); + } +} + +typedef struct _DummyPerTimestepInfo +{ + char 
*CheckString; + int CheckInt; +} * DummyPerTimestepInfo; + +static void *DummyReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v, + int Rank, long Timestep, size_t Offset, + size_t Length, void *Buffer, + void *DP_TimestepInfo) +{ + Dummy_RS_Stream Stream = (Dummy_RS_Stream) + Stream_v; /* DP_RS_Stream is the return from InitReader */ + CManager cm = Svcs->getCManager(Stream->CP_Stream); + DummyCompletionHandle ret = malloc(sizeof(struct _DummyCompletionHandle)); + DummyPerTimestepInfo TimestepInfo = (DummyPerTimestepInfo)DP_TimestepInfo; + struct _DummyReadRequestMsg ReadRequestMsg; + + ret->CMcondition = CMCondition_get(cm, NULL); + ret->CPStream = Stream->CP_Stream; + ret->cm = cm; + ret->Buffer = Buffer; + ret->Rank = Rank; + /* + * set the completion handle as client Data on the condition so that + * handler has access to it. + */ + CMCondition_set_client_data(cm, ret->CMcondition, ret); + + Svcs->verbose(Stream->CP_Stream, + "Adios requesting to read remote memory for Timestep %d " + "from Rank %d, WSR_Stream = %p\n", + Timestep, Rank, Stream->WriterContactInfo[Rank].WS_Stream); + + /* send request to appropriate writer */ + /* memset avoids uninit byte warnings from valgrind */ + memset(&ReadRequestMsg, 0, sizeof(ReadRequestMsg)); + ReadRequestMsg.Timestep = Timestep; + ReadRequestMsg.Offset = Offset; + ReadRequestMsg.Length = Length; + ReadRequestMsg.WS_Stream = Stream->WriterContactInfo[Rank].WS_Stream; + ReadRequestMsg.RS_Stream = Stream; + ReadRequestMsg.RequestingRank = Stream->Rank; + ReadRequestMsg.NotifyCondition = ret->CMcondition; + Svcs->sendToPeer(Stream->CP_Stream, Stream->PeerCohort, Rank, + Stream->ReadRequestFormat, &ReadRequestMsg); + + return ret; +} + +static void DummyWaitForCompletion(CP_Services Svcs, void *Handle_v) +{ + DummyCompletionHandle Handle = (DummyCompletionHandle)Handle_v; + Svcs->verbose( + Handle->CPStream, + "Waiting for completion of memory read to rank %d, condition %d\n", + Handle->Rank, Handle->CMcondition); + 
/* + * Wait for the CM condition to be signalled. If it has been already, + * this returns immediately. Copying the incoming data to the waiting + * buffer has been done by the reply handler. + */ + CMCondition_wait(Handle->cm, Handle->CMcondition); + Svcs->verbose( + Handle->CPStream, + "Remote memory read to rank %d with condition %d has completed\n", + Handle->Rank, Handle->CMcondition); + free(Handle); +} + +static void DummyProvideTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, + struct _SstData *Data, + struct _SstMetadata *LocalMetadata, + long Timestep, void **TimestepInfoPtr) +{ + Dummy_WS_Stream Stream = (Dummy_WS_Stream)Stream_v; + TimestepList Entry = malloc(sizeof(struct _TimestepEntry)); + struct _DummyPerTimestepInfo *Info = + malloc(sizeof(struct _DummyPerTimestepInfo)); + + Info->CheckString = malloc(64); + sprintf(Info->CheckString, "Dummy info for timestep %ld from rank %d", + Timestep, Stream->Rank); + Info->CheckInt = Stream->Rank * 1000 + Timestep; + Entry->Data = Data; + Entry->Timestep = Timestep; + Entry->DP_TimestepInfo = Info; + + Entry->Next = Stream->Timesteps; + Stream->Timesteps = Entry; + *TimestepInfoPtr = Info; +} + +static void DummyReleaseTimestep(CP_Services Svcs, DP_WS_Stream Stream_v, + long Timestep) +{ + Dummy_WS_Stream Stream = (Dummy_WS_Stream)Stream_v; + TimestepList List = Stream->Timesteps; + + Svcs->verbose(Stream->CP_Stream, "Releasing timestep %ld\n", Timestep); + if (Stream->Timesteps->Timestep == Timestep) + { + Stream->Timesteps = List->Next; + free(List); + } + else + { + TimestepList last = List; + List = List->Next; + while (List != NULL) + { + if (List->Timestep == Timestep) + { + last->Next = List->Next; + free(List); + return; + } + last = List; + List = List->Next; + } + /* + * Shouldn't ever get here because we should never release a + * timestep that we don't have. 
+ */ + fprintf(stderr, "Failed to release Timestep %ld, not found\n", + Timestep); + assert(0); + } +} + +static FMField DummyReaderContactList[] = { + {"ContactString", "string", sizeof(char *), + FMOffset(DummyReaderContactInfo, ContactString)}, + {"reader_ID", "integer", sizeof(void *), + FMOffset(DummyReaderContactInfo, RS_Stream)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec DummyReaderContactStructs[] = { + {"DummyReaderContactInfo", DummyReaderContactList, + sizeof(struct _DummyReaderContactInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField DummyWriterContactList[] = { + {"ContactString", "string", sizeof(char *), + FMOffset(DummyWriterContactInfo, ContactString)}, + {"writer_ID", "integer", sizeof(void *), + FMOffset(DummyWriterContactInfo, WS_Stream)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec DummyWriterContactStructs[] = { + {"DummyWriterContactInfo", DummyWriterContactList, + sizeof(struct _DummyWriterContactInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static FMField DummyTimestepInfoList[] = { + {"CheckString", "string", sizeof(char *), + FMOffset(DummyPerTimestepInfo, CheckString)}, + {"CheckInt", "integer", sizeof(void *), + FMOffset(DummyPerTimestepInfo, CheckInt)}, + {NULL, NULL, 0, 0}}; + +static FMStructDescRec DummyTimestepInfoStructs[] = { + {"DummyTimestepInfo", DummyTimestepInfoList, + sizeof(struct _DummyPerTimestepInfo), NULL}, + {NULL, NULL, 0, NULL}}; + +static struct _CP_DP_Interface dummyDPInterface; + +extern CP_DP_Interface LoadDummyDP() +{ + memset(&dummyDPInterface, 0, sizeof(dummyDPInterface)); + dummyDPInterface.ReaderContactFormats = DummyReaderContactStructs; + dummyDPInterface.WriterContactFormats = DummyWriterContactStructs; + dummyDPInterface.TimestepInfoFormats = NULL; // DummyTimestepInfoStructs; + dummyDPInterface.initReader = DummyInitReader; + dummyDPInterface.initWriter = DummyInitWriter; + dummyDPInterface.initWriterPerReader = DummyInitWriterPerReader; + dummyDPInterface.provideWriterDataToReader = 
DummyProvideWriterDataToReader; + dummyDPInterface.readRemoteMemory = DummyReadRemoteMemory; + dummyDPInterface.waitForCompletion = DummyWaitForCompletion; + dummyDPInterface.provideTimestep = DummyProvideTimestep; + dummyDPInterface.releaseTimestep = DummyReleaseTimestep; + return &dummyDPInterface; +} diff --git a/source/adios2/toolkit/sst/dp_interface.h b/source/adios2/toolkit/sst/dp_interface.h new file mode 100644 index 0000000000000000000000000000000000000000..c4ca59df7a334d0af473ed1c01c226fbe34ff286 --- /dev/null +++ b/source/adios2/toolkit/sst/dp_interface.h @@ -0,0 +1,219 @@ +#ifndef _DP_INTERFACE_H +#define _DP_INTERFACE_H + +/*! + * + * CP_DP_Interface is the set of data format descriptions and function + * pointers that define a dataplane interface to control plane. + * + */ +typedef struct _CP_DP_Interface *CP_DP_Interface; + +/*! + * CP_Services is the type of a pointer to a struct of function pointers + * that give data plane access to control plane routines and functions. + * Generally it is the first argument to all DP functions invoked by the + * control plane. + */ +typedef struct _CP_Services *CP_Services; + +/*! + * DP_RS_Stream is an externally opaque pointer-sized value that represents + * the Reader Side DP stream. It is returned by an init function and + * provided back to the dataplane on every subsequent reader side call. + */ +typedef void *DP_RS_Stream; + +/*! + * DP_WS_Stream is an externally opaque pointer-sized value that represents + * the Writer Side DP stream. Because a stream might have multiple readers, + * this value is only provided to the per-reader writer-side initialization + * function, which returns its own opaque stream ID value. + */ +typedef void *DP_WS_Stream; + +/*! + * DP_WSR_Stream is an externally opaque pointer-sized value that represents + * the Writer Side *per Reader* DP stream. 
/*!
 * CP_DP_InitWriterFunc is the type of a dataplane writer-side stream
 * initialization function.  Its return value is DP_WS_stream, an externally
 * opaque handle which is provided to the dataplane on all subsequent
 * stream-wide operations for this stream.  'stream' is an input parameter and
 * is the control plane-level writer-side stream identifier.  This may be
 * useful for callbacks, access to MPI communicator, EVPath info, etc. so can
 * be associated with the DP_WS_stream.
 */
/*
 * DP_CompletionHandle is an externally opaque pointer-sized value that is
 * returned by the asynchronous DpReadRemoteMemory() call and which can be
 * used to wait for the completion of the read.
 */
/*!
 * CP_DP_WaitForCompletionFunc is the type of a dataplane function that
 * suspends the execution of the current thread until the asynchronous
 * CP_DP_ReadRemoteMemory call that returned its `handle` parameter has
 * completed.
 */
/*
 * The set of format descriptions and entry points a data plane hands to the
 * control plane.
 */
struct _CP_DP_Interface
{
    /* describes the struct a reader returns via ReaderContactInfoPtr */
    FMStructDescList ReaderContactFormats;
    /* describes the struct a writer returns via WriterContactInfoPtr */
    FMStructDescList WriterContactFormats;
    /* describes the struct a writer returns via TimestepInfoPtr */
    FMStructDescList TimestepInfoFormats;

    /* stream setup */
    CP_DP_InitReaderFunc initReader;
    CP_DP_InitWriterFunc initWriter;
    CP_DP_InitWriterPerReaderFunc initWriterPerReader;
    CP_DP_ProvideWriterDataToReaderFunc provideWriterDataToReader;

    /* reader-side data access */
    CP_DP_ReadRemoteMemoryFunc readRemoteMemory;
    CP_DP_WaitForCompletionFunc waitForCompletion;

    /* writer-side timestep lifetime */
    CP_DP_ProvideTimestepFunc provideTimestep;
    CP_DP_ReleaseTimestepFunc releaseTimestep;
};
#ifndef _SST_DATA_H_
#define _SST_DATA_H_

/*
 * Plain data structures exchanged between SST and its callers.
 *
 * The fixed-width and size types used below come from <stdint.h> and
 * <stddef.h>; <sys/types.h> is still included for sources that relied on
 * this header pulling it in.
 */
#include <stddef.h>
#include <stdint.h>
#include <sys/types.h>

typedef enum { SST_UINT = 1, SST_INT = 2, SST_FLOAT = 3 } SST_BASE_TYPE;

/* Metadata gathered from every writer rank for one timestep. */
struct _SstFullMetadata
{
    int WriterCohortSize;                  /* number of writer ranks */
    struct _SstMetadata **WriterMetadata;  /* per-writer-rank metadata */
    void **DP_TimestepInfo;                /* per-writer-rank data plane info */
};

/* Metadata contributed by a single writer rank. */
struct _SstMetadata
{
    size_t DataSize;
    int IntVarCount;
    struct _SstIntMeta *IntVars;
    int FloatVarCount;
    struct _SstFloatMeta *FloatVars;
    int VarCount;
    struct _SstVarMeta *Vars;
};

/* A contiguous block of timestep data. */
struct _SstData
{
    size_t DataSize;
    char *block;
};

struct _SstVarMeta
{
    char *VarName;
    int DimensionCount;
    struct _SstDimenMeta *Dimensions;
    int DataOffsetInBlock;
};

struct _SstIntMeta
{
    char *VarName;
    int64_t Value;
};

struct _SstUintMeta
{
    char *VarName;
    uint64_t Value; /* standard spelling of the former u_int64_t */
};

struct _SstFloatMeta
{
    char *VarName;
    double Value;
};

struct _SstDimenMeta
{
    int Offset;
    int Size;
    int GlobalSize;
};

#endif /* !_SST_DATA_H_ */
# SST staging tests: a writer and a reader executable that are launched
# together by the generated run_staging_test driver script.
add_executable(TestSstWrite TestSstWrite.cpp)
add_executable(TestSstRead TestSstRead.cpp)

foreach(sst_test IN ITEMS TestSstWrite TestSstRead)
  # Workaround for multiple versions of FindSst
  # NOTE(review): SST_INCLUDE_DIRS and Sst_LIBRARY are kept as in the original;
  # confirm that some find module actually defines them.
  if(SST_INCLUDE_DIRS)
    target_include_directories(${sst_test} PRIVATE ${SST_INCLUDE_DIRS})
  endif()
  target_link_libraries(${sst_test} PRIVATE adios2 gtest ${Sst_LIBRARY})

  if(ADIOS2_HAVE_MPI)
    target_link_libraries(${sst_test} PRIVATE MPI::MPI_C)
  endif()
endforeach()

# The driver script is generated next to the test executables because it
# changes into that directory and starts them by relative path.
configure_file(
  run_staging_test.in
  ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/run_staging_test
  @ONLY
)

# One writer rank and one reader rank; "TestSst" is the prefix the script
# expands to TestSstWrite / TestSstRead.
add_test(
  NAME ADIOSSstTest.Connection_1x1
  COMMAND ${CMAKE_RUNTIME_OUTPUT_DIRECTORY}/run_staging_test
          -nr 1 -nw 1 -v -p TestSst
)
set_tests_properties(ADIOSSstTest.Connection_1x1 PROPERTIES TIMEOUT 60)
+ */ +#include <cstdint> +#include <cstring> + +#include <iostream> +#include <stdexcept> + +#include <adios2.h> + +#include <gtest/gtest.h> + +#include "../SmallTestData.h" + +class SstReadTest : public ::testing::Test +{ +public: + SstReadTest() = default; + + SmallTestData m_TestData; +}; + +//****************************************************************************** +// 1D 1x8 test data +//****************************************************************************** + +// ADIOS2 Sst read +TEST_F(SstReadTest, ADIOS2SstRead1D8) +{ + // Each process would write a 1x8 array and all processes would + // form a mpiSize * Nx 1D array + const std::string fname = "ADIOS2Sst1D8.sst"; + + int mpiRank = 0, mpiSize = 1; + // Number of rows + const std::size_t Nx = 8; + + // Number of steps + const std::size_t NSteps = 3; + +#ifdef ADIOS2_HAVE_MPI + MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); + MPI_Comm_size(MPI_COMM_WORLD, &mpiSize); +#endif + +// Write test data using ADIOS2 + +#ifdef ADIOS2_HAVE_MPI + adios2::ADIOS adios(MPI_COMM_WORLD, adios2::DebugON); +#else + adios2::ADIOS adios(true); +#endif + adios2::IO &io = adios.DeclareIO("TestIO"); + + // Declare 1D variables (NumOfProcesses * Nx) + // The local process' part (start, count) can be defined now or later + // before Write(). 
+ { + adios2::Dims shape{static_cast<unsigned int>(Nx * mpiSize)}; + adios2::Dims start{static_cast<unsigned int>(Nx * mpiRank)}; + adios2::Dims count{static_cast<unsigned int>(Nx)}; + io.DefineVariable<int8_t>("i8", shape, start, count); + io.DefineVariable<int16_t>("i16", shape, start, count); + io.DefineVariable<int32_t>("i32", shape, start, count); + io.DefineVariable<int64_t>("i64", shape, start, count); + io.DefineVariable<uint8_t>("u8", shape, start, count); + io.DefineVariable<uint16_t>("u16", shape, start, count); + io.DefineVariable<uint32_t>("u32", shape, start, count); + io.DefineVariable<uint64_t>("u64", shape, start, count); + io.DefineVariable<float>("r32", shape, start, count); + io.DefineVariable<double>("r64", shape, start, count); + } + + // Create the ADIOS 1 Engine + io.SetEngine("SstReader"); + + adios2::Engine &engine = io.Open(fname, adios2::Mode::Read); + + // Close the file + engine.Close(); +} + +//****************************************************************************** +// main +//****************************************************************************** + +int main(int argc, char **argv) +{ +#ifdef ADIOS2_HAVE_MPI + MPI_Init(nullptr, nullptr); +#endif + + ::testing::InitGoogleTest(&argc, argv); + int result = RUN_ALL_TESTS(); + +#ifdef ADIOS2_HAVE_MPI + MPI_Finalize(); +#endif + + return result; +} diff --git a/testing/adios2/engine/sst/TestSstWrite.cpp b/testing/adios2/engine/sst/TestSstWrite.cpp new file mode 100644 index 0000000000000000000000000000000000000000..1c69f522b540506ab3cb7d91241b607dcc27f60c --- /dev/null +++ b/testing/adios2/engine/sst/TestSstWrite.cpp @@ -0,0 +1,149 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. 
+ */ +#include <cstdint> +#include <cstring> + +#include <iostream> +#include <stdexcept> + +#include <adios2.h> + +#include <gtest/gtest.h> + +#include "../SmallTestData.h" + +class SstWriteTest : public ::testing::Test +{ +public: + SstWriteTest() = default; + + SmallTestData m_TestData; +}; + +//****************************************************************************** +// 1D 1x8 test data +//****************************************************************************** + +// ADIOS2 SST write +TEST_F(SstWriteTest, ADIOS2SstWrite) +{ + // Each process would write a 1x8 array and all processes would + // form a mpiSize * Nx 1D array + const std::string fname = "ADIOS2Sst1D8.sst"; + + int mpiRank = 0, mpiSize = 1; + // Number of rows + const std::size_t Nx = 8; + + // Number of steps + const std::size_t NSteps = 3; + +#ifdef ADIOS2_HAVE_MPI + MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank); + MPI_Comm_size(MPI_COMM_WORLD, &mpiSize); +#endif + +// Write test data using ADIOS2 + +#ifdef ADIOS2_HAVE_MPI + adios2::ADIOS adios(MPI_COMM_WORLD, adios2::DebugON); +#else + adios2::ADIOS adios(true); +#endif + adios2::IO &io = adios.DeclareIO("TestIO"); + + // Declare 1D variables (NumOfProcesses * Nx) + // The local process' part (start, count) can be defined now or later + // before Write(). 
+ { + adios2::Dims shape{static_cast<unsigned int>(Nx * mpiSize)}; + adios2::Dims start{static_cast<unsigned int>(Nx * mpiRank)}; + adios2::Dims count{static_cast<unsigned int>(Nx)}; + io.DefineVariable<int8_t>("i8", shape, start, count); + io.DefineVariable<int16_t>("i16", shape, start, count); + io.DefineVariable<int32_t>("i32", shape, start, count); + io.DefineVariable<int64_t>("i64", shape, start, count); + io.DefineVariable<uint8_t>("u8", shape, start, count); + io.DefineVariable<uint16_t>("u16", shape, start, count); + io.DefineVariable<uint32_t>("u32", shape, start, count); + io.DefineVariable<uint64_t>("u64", shape, start, count); + io.DefineVariable<float>("r32", shape, start, count); + io.DefineVariable<double>("r64", shape, start, count); + } + + // Create the ADIOS 1 Engine + io.SetEngine("SstWriter"); + + adios2::Engine &engine = io.Open(fname, adios2::Mode::Write); + + for (size_t step = 0; step < NSteps; ++step) + { + // Generate test data for each process uniquely + SmallTestData currentTestData = + generateNewSmallTestData(m_TestData, step, mpiRank, mpiSize); + + // Retrieve the variables that previously went out of scope + auto &var_i8 = *io.InquireVariable<int8_t>("i8"); + auto &var_i16 = *io.InquireVariable<int16_t>("i16"); + auto &var_i32 = *io.InquireVariable<int32_t>("i32"); + auto &var_i64 = *io.InquireVariable<int64_t>("i64"); + auto &var_u8 = *io.InquireVariable<uint8_t>("u8"); + auto &var_u16 = *io.InquireVariable<uint16_t>("u16"); + auto &var_u32 = *io.InquireVariable<uint32_t>("u32"); + auto &var_u64 = *io.InquireVariable<uint64_t>("u64"); + auto &var_r32 = *io.InquireVariable<float>("r32"); + auto &var_r64 = *io.InquireVariable<double>("r64"); + + // Make a 1D selection to describe the local dimensions of the + // variable we write and its offsets in the global spaces + adios2::Box<adios2::Dims> sel({mpiRank * Nx}, {Nx}); + var_i8.SetSelection(sel); + var_i16.SetSelection(sel); + var_i32.SetSelection(sel); + var_i64.SetSelection(sel); 
+ var_u8.SetSelection(sel); + var_u16.SetSelection(sel); + var_u32.SetSelection(sel); + var_u64.SetSelection(sel); + var_r32.SetSelection(sel); + var_r64.SetSelection(sel); + + // Write each one + // fill in the variable with values from starting index to + // starting index + count + // engine.BeginStep(); + engine.PutSync(var_i8, currentTestData.I8.data()); + engine.PutSync(var_i16, currentTestData.I16.data()); + engine.PutSync(var_i32, currentTestData.I32.data()); + engine.PutSync(var_i64, currentTestData.I64.data()); + engine.PutSync(var_u8, currentTestData.U8.data()); + engine.PutSync(var_u16, currentTestData.U16.data()); + engine.PutSync(var_u32, currentTestData.U32.data()); + engine.PutSync(var_u64, currentTestData.U64.data()); + engine.PutSync(var_r32, currentTestData.R32.data()); + engine.PutSync(var_r64, currentTestData.R64.data()); + // Advance to the next time step + engine.EndStep(); + } + + // Close the file + engine.Close(); +} + +int main(int argc, char **argv) +{ +#ifdef ADIOS2_HAVE_MPI + MPI_Init(nullptr, nullptr); +#endif + + ::testing::InitGoogleTest(&argc, argv); + int result = RUN_ALL_TESTS(); + +#ifdef ADIOS2_HAVE_MPI + MPI_Finalize(); +#endif + + return result; +} diff --git a/testing/adios2/engine/sst/run_staging_test.in b/testing/adios2/engine/sst/run_staging_test.in new file mode 100755 index 0000000000000000000000000000000000000000..62f62b5f0d3c638d42a16ce4c092d66fe2df5ed6 --- /dev/null +++ b/testing/adios2/engine/sst/run_staging_test.in @@ -0,0 +1,170 @@ +#!/bin/bash + +cur_dir=${PWD##*/} + +qflag=no; +vflag=no; +transport="flx" +timeout="60" +nr="2" +nw="2" +debugger="gdb" + +gdb -v >/dev/null 2>&1 || { debugger="lldb"; } +$debugger -v >/dev/null 2>&1 || { debugger="not found"; } + +usage() +{ + cat << EO + Usage: $PROGNAME [options] + $PROGNAME -o <version> -c + + Run a test under flexpath_tests. 
+ + + Options: +EO + cat <<EO | column -s\& -t + -h|--help & show this output + -p <name> & prefix of Reader and Writer programs + -r <name> & name of reader-side program + -w <name> & name of writer-side program + -n <count> & number of nodes to run both sides + -nw <count> & number of nodes to run write side + -nr <count> & number of nodes to run read side + -q & be quiet + -timeout <seconds> & timeout in seconds +EO +} + +PROGNAME=${0##*/} +SHORTOPTS="p:r:w:n:t:o:qhv" +LONGOPTS="p:,nw:,nr:,help,timeout:" + +ARGS=$(getopt -a -s bash --options $SHORTOPTS \ + --longoptions $LONGOPTS --name $PROGNAME -- "$@" ) + +eval set -- "$ARGS" + +reader_prog="UNSET" +writer_prog="UNSET" + +while [ $# -gt 0 ] +do + case "$1" in + -h|--help) + usage + exit 0 + ;; + (-q) qflag=yes;; + (-v) vflag=yes;; + (-r) reader_prog="$2"; shift;; + (-w) writer_prog="$2"; shift;; + (-n) nw="$2"; nr="$2"; shift;; + (-p) prefix="$2"; shift;; + (--nw) nw="$2"; shift;; + (--nr) nr="$2"; shift;; + (--timeout) timeout="$2"; shift;; + (--) shift; break;; + (-*) echo "$0: error - unrecognized option $1" 1>&2; exit 1;; + (*) break;; + esac + shift +done + +cd @CMAKE_RUNTIME_OUTPUT_DIRECTORY@ + +if [ "$reader_prog" == "UNSET" ]; then + reader_prog="$prefix""Read" +fi +if [ "$writer_prog" == "UNSET" ]; then + writer_prog="$prefix""Write" +fi + +# remove any lingering sst contact files +rm -f *.bpflx + +# Spawn the writer +if [ $vflag == "yes" ]; then + echo "Doing (@MPIEXEC@ -q @MPIEXEC_NUMPROC_FLAG@ $nw ./$writer_prog ) & writer_pid=$!" +fi +(@MPIEXEC@ -q @MPIEXEC_NUMPROC_FLAG@ $nw ./$writer_prog ) & writer_pid=$! + +sleep 2 + +# Spawn the reader +if [ $vflag == "yes" ]; then + echo "Doing (@MPIEXEC@ -q @MPIEXEC_NUMPROC_FLAG@ $nr ./$reader_prog 2>&1 1>/dev/null) & reader_pid=$!" +fi +(@MPIEXEC@ -q @MPIEXEC_NUMPROC_FLAG@ $nr ./$reader_prog 2>&1 1>/dev/null) & reader_pid=$! 
# ---------------------------------------------------------------------------
# Supervise the reader and writer jobs spawned earlier in this script:
#   1. start a watchdog that kills both jobs once $timeout seconds elapse,
#   2. wait for each job and record its exit code,
#   3. stop the watchdog and work out whether it had to fire,
#   4. print a summary (unless -q was given) and exit 0 on success, 1 on failure.
#
# Reads:  vflag, qflag, timeout, reader_pid, writer_pid, reader_prog, writer_prog
#
# NOTE: this file is a configure_file() template. Avoid introducing at-sign
# delimited words or brace-style variable references here unless they are
# meant to be substituted by CMake.
# ---------------------------------------------------------------------------

# Print a progress message followed by the current date; verbose mode only.
#   $1 - message text (printed without a trailing newline)
verbose_stamp()
{
    if [ "$vflag" == "yes" ]; then
        echo -n "$1"
        date
    fi
}

# A non-numeric timeout would make the watchdog's countdown test fail and
# fire immediately, so fall back to the script default in that case.
case "$timeout" in
    ''|*[!0-9]*)
        echo "$0: warning - invalid timeout '$timeout', using 60 seconds" 1>&2
        timeout=60
        ;;
esac

# In the background, count down $timeout seconds and then kill both jobs.
# The countdown sleeps one second at a time so that killing the watchdog
# leaves behind at most a one-second sleep, and all of its standard streams
# are detached so that leftover child cannot keep a caller's output pipe open.
verbose_stamp "Background sleep before killing test is $timeout seconds Starting at "
(
    remaining=$timeout
    while [ "$remaining" -gt 0 ]; do
        sleep 1
        remaining=$((remaining - 1))
    done
    kill -9 "$reader_pid" "$writer_pid"
) </dev/null >/dev/null 2>&1 &
waiter=$!

# Wait for the reader and keep its exit code
verbose_stamp "Wait for reader $reader_pid "
wait "$reader_pid"
reader_exitcode=$?

# Wait for the writer and keep its exit code
verbose_stamp "Wait for writer $writer_pid "
wait "$writer_pid"
writer_exitcode=$?

# Kill the watchdog if it is still running. kill returns 0 only when the
# watchdog was still alive, i.e. both jobs finished before the timeout; a
# non-zero status means the watchdog already ran to completion and killed them.
verbose_stamp "Killing waiter $waiter "
kill -9 "$waiter" 2>/dev/null
finished_gracefully=$?

# Reap the watchdog with output discarded so the shell's "Killed" job
# notification is not shown. wait blocks until the process is gone, so no
# extra sleep is needed before it.
verbose_stamp "Waiting for $waiter "
wait "$waiter" >/dev/null 2>&1

verbose_stamp "Done with for $waiter "

# Fold the three outcomes into a single exit status
exit_value=0
if [ "$writer_exitcode" -ne "0" ]; then
    echo "$writer_prog exited with error code $writer_exitcode"
    exit_value=1
fi
if [ "$reader_exitcode" -ne "0" ]; then
    echo "$reader_prog exited with error code $reader_exitcode"
    exit_value=1
fi
if [ "$finished_gracefully" -ne "0" ]; then
    echo "One or more processes had to be killed because of timeout"
    exit_value=1
fi

if [ "$qflag" != "yes" ]; then
    if [ "$exit_value" -eq "0" ]; then
        echo "TEST PASSED";
    else
        echo "TEST FAILED";
    fi
fi

exit $exit_value