From 2567b854d940995551d6cdaa70540474f540144b Mon Sep 17 00:00:00 2001 From: guj <jgu@lbl.gov> Date: Sun, 8 Oct 2017 13:54:51 -0700 Subject: [PATCH] added VDS support via Engine (HDF5Mixer) --- examples/heatTransfer/write/CMakeLists.txt | 12 + examples/heatTransfer/write/IO_h5mixer.cpp | 128 +++++ source/adios2/CMakeLists.txt | 3 + source/adios2/core/IO.cpp | 10 + source/adios2/engine/mixer/HDFMixer.cpp | 182 +++++++ source/adios2/engine/mixer/HDFMixer.h | 109 +++++ source/adios2/engine/mixer/HDFMixer.tcc | 46 ++ source/adios2/engine/mixer/HDFMixerWriter.cpp | 447 ++++++++++++++++++ source/adios2/engine/mixer/HDFMixerWriter.h | 91 ++++ .../toolkit/interop/hdf5/HDF5Common.cpp | 20 +- .../adios2/toolkit/interop/hdf5/HDF5Common.h | 2 + .../toolkit/interop/hdf5/HDF5Common.tcc | 23 +- 12 files changed, 1067 insertions(+), 6 deletions(-) create mode 100644 examples/heatTransfer/write/IO_h5mixer.cpp create mode 100644 source/adios2/engine/mixer/HDFMixer.cpp create mode 100644 source/adios2/engine/mixer/HDFMixer.h create mode 100644 source/adios2/engine/mixer/HDFMixer.tcc create mode 100644 source/adios2/engine/mixer/HDFMixerWriter.cpp create mode 100644 source/adios2/engine/mixer/HDFMixerWriter.h diff --git a/examples/heatTransfer/write/CMakeLists.txt b/examples/heatTransfer/write/CMakeLists.txt index 51028cb8b..2bf6568d9 100644 --- a/examples/heatTransfer/write/CMakeLists.txt +++ b/examples/heatTransfer/write/CMakeLists.txt @@ -66,5 +66,17 @@ if(ADIOS2_HAVE_MPI) target_link_libraries(heatTransfer_write_a2h5 adios2 MPI::MPI_C ${CMAKE_THREAD_LIBS_INIT} ) + + add_executable(heatTransfer_write_h5mixer + main.cpp + HeatTransfer.cpp + Settings.cpp + IO_h5mixer.cpp + ) + + target_link_libraries(heatTransfer_write_h5mixer + adios2 MPI::MPI_C ${CMAKE_THREAD_LIBS_INIT} + ) + endif() endif() diff --git a/examples/heatTransfer/write/IO_h5mixer.cpp b/examples/heatTransfer/write/IO_h5mixer.cpp new file mode 100644 index 000000000..a6365cff3 --- /dev/null +++ b/examples/heatTransfer/write/IO_h5mixer.cpp @@ -0,0 +1,128 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * IO_ADIOS2.cpp + * + * Created on: Feb 2017 + * Author: Norbert Podhorszki + */ + +#include "IO.h" + +#include <string> + +#include <adios2.h> + +#define str_helper(X) #X +#define str(X) str_helper(X) +//#ifndef DEFAULT_CONFIG +//#define DEFAULT_CONFIG config.xml +//#endif +#define DEFAULT_CONFIG mix.xml +#define DEFAULT_CONFIG_STR str(DEFAULT_CONFIG) + +static int rank_saved; +adios2::ADIOS *ad = nullptr; +std::shared_ptr<adios2::Engine> h5mixerWriter; +adios2::Variable<double> *varT = nullptr; +adios2::Variable<unsigned int> *varGndx = nullptr; + +IO::IO(const Settings &s, MPI_Comm comm) +{ + rank_saved = s.rank; + // m_outputfilename = s.outputfile + ".h5"; + m_outputfilename = s.outputfile; + /*ad = new adios2::ADIOS(std::string(DEFAULT_CONFIG_STR), comm, + adios2::DebugON); + */ + ad = new adios2::ADIOS(comm, adios2::DebugOFF); + + // Define method for engine creation + + adios2::IO &h5io = ad->DeclareIO("output"); + if (!h5io.InConfigFile()) + { + // if not defined by user, we can change the default settings + // BPFileWriter is the default engine + + // Allow an extra thread for data processing + // ISO-POSIX file is the default transport + // Passing parameters to the transport + h5io.SetEngine("HDFMixer"); + } + + varGndx = &h5io.DefineVariable<unsigned int>("gndx"); + h5io.DefineVariable<unsigned int>("gndy"); + + // define T as 2D global array + varT = &h5io.DefineVariable<double>( + "T", + // Global dimensions + {s.gndx, s.gndy}, + // starting offset of the local array in the global space + {s.offsx, s.offsy}, + // local size, could be defined later using SetSelection() + {s.ndx, s.ndy}); + + // add transform to variable + // adios2::Transform tr = adios2::transform::BZIP2( ); + // varT.AddTransform( tr, "" ); + // varT.AddTransform( tr,"accuracy=0.001" ); // for ZFP + + h5mixerWriter = h5io.Open(m_outputfilename, adios2::OpenMode::Write, comm); + + if (!h5mixerWriter) + { + throw std::ios_base::failure( + "ERROR: failed to open ADIOS h5mixerWriter\n"); + } +} + +IO::~IO() +{ + h5mixerWriter->Close(); + delete ad; +} + +void IO::write(int step, const HeatTransfer &ht, const Settings &s, + MPI_Comm comm) +{ +#if 1 + + /* This selection is redundant and not required, since we defined + * the selection already in DefineVariable(). It is here just as an example. + */ + // Make a selection to describe the local dimensions of the variable we + // write and its offsets in the global spaces. This could have been done in + // adios.DefineVariable() + adios2::SelectionBoundingBox sel({s.offsx, s.offsy}, {s.ndx, s.ndy}); + varT->SetSelection(sel); + + /* Select the area that we want to write from the data pointer we pass to + the + writer. + Think HDF5 memspace, just not hyperslabs, only a bounding box selection. + Engine will copy this bounding box from the data pointer into the output + buffer. + Size of the bounding box should match the "space" selection which was + given + above. + Default memspace is always the full selection. + */ + adios2::SelectionBoundingBox memspace = + adios2::SelectionBoundingBox({1, 1}, {s.ndx, s.ndy}); + varT->SetMemorySelection(memspace); + + h5mixerWriter->Write<unsigned int>(*varGndx, s.gndx); + h5mixerWriter->Write<unsigned int>("gndy", s.gndy); + h5mixerWriter->Write<double>(*varT, ht.data_noghost().data()); + h5mixerWriter->Advance(); + +#else + + h5mixerWriter->Write<double>(*varT, ht.data_noghost().data()); + h5mixerWriter->Advance(); + +#endif +} diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index 1a7e04532..011b56c97 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -117,6 +117,9 @@ if(ADIOS2_HAVE_HDF5) target_sources(adios2 PRIVATE engine/hdf5/HDF5ReaderP.cpp engine/hdf5/HDF5WriterP.cpp + engine/mixer/HDFMixer.cpp + engine/mixer/HDFMixer.tcc + engine/mixer/HDFMixerWriter.cpp toolkit/interop/hdf5/HDF5Common.cpp toolkit/interop/hdf5/HDF5Common.tcc ) target_link_libraries(adios2 PRIVATE ${HDF5_C_LIBRARIES}) diff --git a/source/adios2/core/IO.cpp b/source/adios2/core/IO.cpp index e635a1008..dc8e29c56 100644 --- a/source/adios2/core/IO.cpp +++ b/source/adios2/core/IO.cpp @@ -29,6 +29,7 @@ #ifdef ADIOS2_HAVE_HDF5 // external dependencies #include "adios2/engine/hdf5/HDF5ReaderP.h" #include "adios2/engine/hdf5/HDF5WriterP.h" +#include "adios2/engine/mixer/HDFMixer.h" #endif namespace adios2 @@ -229,6 +230,15 @@ std::shared_ptr<Engine> IO::Open(const std::string &name, // engine = std::make_shared<BPFileReader>(*this, name, openMode, // mpiComm); } + else if (m_EngineType == "HDFMixer") + { +#ifdef ADIOS2_HAVE_HDF5 + engine = std::make_shared<HDFMixer>(*this, name, openMode, mpiComm); +#else + throw std::invalid_argument("ERROR: this version didn't compile with " + "HDF5 library, can't use HDF5\n"); +#endif + } else if (m_EngineType == "DataManWriter") { #ifdef ADIOS2_HAVE_DATAMAN diff --git a/source/adios2/engine/mixer/HDFMixer.cpp b/source/adios2/engine/mixer/HDFMixer.cpp new file mode 100644 index 000000000..9455b642b --- /dev/null +++ b/source/adios2/engine/mixer/HDFMixer.cpp @@ -0,0 +1,182 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * HDFMixer.cpp + * + * Created on: Dec 19, 2016 + * Author: Junmin GU + */ + +#include "HDFMixer.h" +#include "HDFMixer.tcc" + +#include "adios2/ADIOSMPI.h" +#include "adios2/core/IO.h" +#include "adios2/helper/adiosFunctions.h" //CheckIndexRange +#include "adios2/toolkit/transport/file/FileStream.h" + +namespace adios2 +{ + +HDFMixer::HDFMixer(IO &io, const std::string &name, const OpenMode openMode, + MPI_Comm mpiComm) +: Engine("HDFMixer", io, name, openMode, mpiComm), + m_HDFVDSWriter(mpiComm, m_DebugMode), + m_HDFSerialWriter(MPI_COMM_SELF, m_DebugMode), + m_TransportsManager(mpiComm, m_DebugMode) +{ + m_EndMessage = " in call to IO Open HDFMixer " + m_Name + "\n"; + Init(); +} + +HDFMixer::~HDFMixer() = default; + +void HDFMixer::Init() +{ + InitParameters(); + InitTransports(); + InitBuffer(); +} + +#define declare_type(T) \ + void HDFMixer::DoWrite(Variable<T> &variable, const T *values) \ + { \ + DoWriteCommon(variable, values); \ + } +ADIOS2_FOREACH_TYPE_1ARG(declare_type) +#undef declare_type + +void HDFMixer::Advance(const float /*timeout_sec*/) +{ + m_HDFSerialWriter.Advance(); + m_HDFVDSWriter.Advance(); +} + +void HDFMixer::Close(const int transportIndex) +{ + if (m_DebugMode) + { + if (!m_TransportsManager.CheckTransportIndex(transportIndex)) + { + auto transportsSize = m_TransportsManager.m_Transports.size(); + throw std::invalid_argument( + "ERROR: transport index " + std::to_string(transportIndex) + + " outside range, -1 (default) to " + + std::to_string(transportsSize - 1) + ", in call to Close\n"); + } + } + + // close bp buffer by flattening data and metadata + m_HDFSerialWriter.Close(); + m_HDFVDSWriter.Close(); + // send data to corresponding transports + /* + m_TransportsManager.WriteFiles(m_HDFSerialWriter.m_HeapBuffer.GetData(), + m_HDFSerialWriter.m_HeapBuffer.m_DataPosition, + transportIndex); + + m_TransportsManager.CloseFiles(transportIndex); + */ + /* + // do profiling later + if (m_HDFSerialWriter.m_Profiler.IsActive && + m_TransportsManager.AllTransportsClosed()) + { + WriteProfilingJSONFile(); + } + */ +} + +// PRIVATE FUNCTIONS +void HDFMixer::InitParameters() +{ + // no need for hdf5 + // m_HDFSerialWriter.InitParameters(m_IO.m_Parameters); +} + +void HDFMixer::InitTransports() +{ + if (m_IO.m_TransportsParameters.empty()) + { + Params defaultTransportParameters; + defaultTransportParameters["transport"] = "File"; + m_IO.m_TransportsParameters.push_back(defaultTransportParameters); + } + +#ifdef NEVER +/* + // TODO need to add support for aggregators here later + + // Names are std::vector<std::string> + auto transportsNames = m_TransportsManager.GetFilesBaseNames( + m_Name, m_IO.m_TransportsParameters); + auto bpBaseNames = m_HDFSerialWriter.GetBPBaseNames(transportsNames); + auto bpNames = m_HDFSerialWriter.GetBPNames(transportsNames); + + m_TransportsManager.OpenFiles(bpBaseNames, bpNames, m_OpenMode, + m_IO.m_TransportsParameters, + m_HDFSerialWriter.m_Profiler.IsActive); +*/ +#else + + int rank; + MPI_Comm_rank(m_MPIComm, &rank); + m_HDFSerialWriter.Init(m_Name, rank); + m_HDFVDSWriter.Init(m_Name); +/* +auto transportsNames = m_TransportsManager.GetFilesBaseNames( + m_Name, +m_IO.m_TransportsParameters); auto baseNames = +m_HDFSerialWriter.GetBaseNames(transportsNames); + +auto h5name = m_HDFSerialWriter.GetLocalFileNames(baseNames, m_Name); // e.g. +base=/my/path/xy_1.h5 m_TransportsManager.OpenFiles(baseNames, h5name, +m_OpenMode, m_IO.m_TransportsParameters, m_HDFSerialWriter.m_Profiler.IsActive); +*/ +#endif +} + +void HDFMixer::InitBuffer() +{ + /* + if (m_OpenMode == OpenMode::Append) + { + throw std::invalid_argument( + "ADIOS2: OpenMode Append hasn't been implemented, yet"); + // TODO: Get last pg timestep and update timestep counter in + } + else + { + m_HDFSerialWriter.WriteProcessGroupIndex( + m_IO.m_HostLanguage, + m_TransportsManager.GetTransportsTypes()); + } + */ +} + +void HDFMixer::WriteProfilingJSONFile() +{ /* + auto transportTypes = m_TransportsManager.GetTransportsTypes(); + auto transportProfilers = m_TransportsManager.GetTransportsProfilers(); + + const std::string lineJSON( + m_HDFSerialWriter.GetRankProfilingJSON(transportTypes, + transportProfilers)); + + const std::string profilingJSON( + m_HDFSerialWriter.AggregateProfilingJSON(lineJSON)); + + //if (m_HDFSerialWriter.m_BP1Aggregator.m_RankMPI == 0) + if (m_HDFSerialWriter.m_MPIRank == 0) + { + transport::FileStream profilingJSONStream(m_MPIComm, m_DebugMode); + auto baseNames = m_HDFSerialWriter.GetBaseNames({m_Name}); + profilingJSONStream.Open(baseNames[0] + "/profiling.json", + OpenMode::Write); + profilingJSONStream.Write(profilingJSON.c_str(), profilingJSON.size()); + profilingJSONStream.Close(); + } + */} + +} // end namespace adios diff --git a/source/adios2/engine/mixer/HDFMixer.h b/source/adios2/engine/mixer/HDFMixer.h new file mode 100644 index 000000000..353950763 --- /dev/null +++ b/source/adios2/engine/mixer/HDFMixer.h @@ -0,0 +1,109 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * HDFMixer.h + * + * Created on: Aug 16 2017 + * Author: Junmin GU + */ + +#ifndef ADIOS2_ENGINE_H5_HDFMIXER_H_ +#define ADIOS2_ENGINE_H5_HDFMIXER_H_ + +#include "adios2/ADIOSConfig.h" +#include "adios2/core/Engine.h" +//#include "adios2/toolkit/format/bp1/BP1Writer.h" //format::BP1Writer + +/// \cond EXCLUDE_FROM_DOXYGEN +#include <algorithm> //std::count, std::copy, std::for_each +#include <cmath> //std::ceil +#include <cstring> //std::memcpy +/// \endcond + +#include "HDFMixerWriter.h" +#include "adios2/ADIOSConfig.h" +#include "adios2/ADIOSMacros.h" +#include "adios2/ADIOSTypes.h" +#include "adios2/core/Variable.h" +#include "adios2/helper/adiosFunctions.h" +#include "adios2/toolkit/capsule/heap/STLVector.h" +#include "adios2/toolkit/transportman/TransportMan.h" //transport::TransportsMan + +#include <iostream> + +namespace adios2 +{ + +class HDFMixer : public Engine +{ + +public: + /** + * Constructor for file Writer in H5 format + * @param name unique name given to the engine + * @param openMode w (supported), r, a from OpenMode in ADIOSTypes.h + * @param mpiComm MPI communicator + */ + HDFMixer(IO &io, const std::string &name, const OpenMode openMode, + MPI_Comm mpiComm); + + ~HDFMixer(); + + void Advance(const float timeoutSeconds = 0.0) final; + + /** + * Closes a single transport or all transports + * @param transportIndex, if -1 (default) closes all transports, otherwise + * it + * closes a transport in m_Transport[transportIndex]. In debug mode the + * latter + * is bounds-checked. + */ + void Close(const int transportIndex = -1) final; + + void CreateName(std::string &pathName, std::string &rootName, + std::string &fullH5Name, int rank); + +private: + /** Single object controlling H5 buffering */ + // format::H51Writer m_H51Writer; + HDFSerialWriter m_HDFSerialWriter; + HDFVDSWriter m_HDFVDSWriter; + + /** single object controlling a vector of Transports from IO AddTransport */ + transportman::TransportMan m_TransportsManager; + + /** true: due to buffer overflow, move to transports manager */ + bool m_DoTransportFlush = false; + + void Init() final; + + /** Parses parameters from IO SetParameters */ + void InitParameters() final; + /** Parses transports and parameters from IO AddTransport */ + void InitTransports() final; + + void InitBuffer(); + +#define declare_type(T) \ + void DoWrite(Variable<T> &variable, const T *values) final; + ADIOS2_FOREACH_TYPE_1ARG(declare_type) +#undef declare_type + + /** + * Common function for primitive (including std::complex) writes + * @param variable + * @param values + */ + template <class T> + void DoWriteCommon(Variable<T> &variable, const T *values); + + /** Write a profiling.json file from m_H51Writer and m_TransportsManager + * profilers*/ + void WriteProfilingJSONFile(); +}; + +} // end namespace adios + +#endif /* ADIOS2_ENGINE_H5_HDFMIXER_H_ */ diff --git a/source/adios2/engine/mixer/HDFMixer.tcc b/source/adios2/engine/mixer/HDFMixer.tcc new file mode 100644 index 000000000..63da2019d --- /dev/null +++ b/source/adios2/engine/mixer/HDFMixer.tcc @@ -0,0 +1,46 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * HDFMixer.tcc implementation of template functions with known type + * + * Created on: Aug 16 2017 + * Author: Junmin GU + */ + +#include "HDFMixer.h" + +namespace adios2 +{ + +template <class T> +void HDFMixer::DoWriteCommon(Variable<T> &variable, const T *values) +{ + variable.m_AppValues = values; + m_WrittenVariables.insert(variable.m_Name); + Variable<T> local(variable.m_Name, {}, {}, variable.m_Count, + variable.m_Count.size(), false); + + // m_HDFSerialWriter.m_H5File.Write(variable, values); + // writes only the m_Count() part + int nDims = std::max(variable.m_Shape.size(), variable.m_Count.size()); + if (nDims == 0) + { + // this is scalar + if (m_HDFVDSWriter.m_Rank == 0) + { + m_HDFVDSWriter.m_VDSFile.Write(local, values); + } + } + else + { + m_HDFSerialWriter.m_H5File.Write(local, values); + // std::cout<<" ==> "<< variable.m_Name<<std::endl; + // BuildVDS(AddExtension(m_Name, ".h5"), variable, + // m_HDFSerialWriter.m_H5File.GetHDF5Type<T>()); + m_HDFVDSWriter.AddVar(variable, + m_HDFSerialWriter.m_H5File.GetHDF5Type<T>()); + } +} + +} // end namespace adios diff --git a/source/adios2/engine/mixer/HDFMixerWriter.cpp b/source/adios2/engine/mixer/HDFMixerWriter.cpp new file mode 100644 index 000000000..b760fa19c --- /dev/null +++ b/source/adios2/engine/mixer/HDFMixerWriter.cpp @@ -0,0 +1,447 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * HDFMixer.h + * + * Created on: Aug 16 2017 + * Author: Junmin GU + */ + +#include <iostream> + +#include "HDFMixerWriter.h" +#include "adios2/helper/adiosFunctions.h" + +// +// class HDFSerialWriter +// + +adios2::HDFVDSWriter::HDFVDSWriter(MPI_Comm mpiComm, bool debugMode = false) +: m_MPISubfileComm(mpiComm), m_VDSFile(debugMode), m_Rank(-1) +{ + MPI_Comm_size(m_MPISubfileComm, &m_NumSubFiles); + MPI_Comm_rank(m_MPISubfileComm, &m_Rank); +} + +void adios2::HDFVDSWriter::Init(const std::string &name) +{ + if (m_Rank > 0) + { + return; + } + + // + // VDS can only operate on one process. So let rank = 0 handle it + // + std::string h5Name = AddExtension(name, ".h5"); + m_VDSFile.Init(h5Name, MPI_COMM_SELF, true); + // m_FileName = h5Name; + m_FileName = name; +} + +void adios2::HDFVDSWriter::GetVarInfo(const VariableBase &var, + std::vector<hsize_t> &dimsf, int nDims, + std::vector<hsize_t> &start, + std::vector<hsize_t> &count, + std::vector<hsize_t> &one) +{ // interop::HDF5Common summaryFile(true); + // std::vector<hsize_t> dimsf, start, one, count; + // int nDims = std::max(var.m_Shape.size(), var.m_Count.size()); + + for (int i = 0; i < nDims; i++) + { + if (var.m_Shape.size() > 0) + { + dimsf.push_back(var.m_Shape[i]); + } + else + { + dimsf.push_back(var.m_Count[i]); + } + if (var.m_Start.size() > 0) + { + start.push_back(var.m_Start[i]); + } + else + { + start.push_back(0); + } + if (var.m_Count.size() > 0) + { + count.push_back(var.m_Count[i]); + } + else if (var.m_Shape.size() > 0) + { + count.push_back(var.m_Shape[i]); + } + else + { + count.push_back(0); + } + one.push_back(1); + } +} + +void adios2::HDFVDSWriter::AddVar(const VariableBase &var, hid_t h5Type) +{ + hid_t space; + /* Create VDS dataspace. */ + int nDims = std::max(var.m_Shape.size(), var.m_Count.size()); + + if (nDims == 0) + { + if (m_Rank == 0) + { + /* + std::cout<<" will deal with scalar later?"<<var.m_Name<<std::endl; + + hid_t filespaceID = H5Screate(H5S_SCALAR); + hid_t dsetID = H5Dcreate(m_VDSFile.m_GroupId, var.m_Name.c_str(), + h5Type, filespaceID, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); + //hid_t plistID = H5Pcreate(H5P_DATASET_XFER); + //H5Pset_dxpl_mpio(plistID, H5FD_MPIO_COLLECTIVE); + herr_t status = H5Dwrite(dsetID, h5Type, H5S_ALL, H5S_ALL, + H5P_DEFAULT, values); + //herr_t status = H5Dwrite(dsetID, h5Type, H5S_ALL, H5S_ALL, + plistID, values); + + H5Sclose(filespaceID); + H5Dclose(dsetID); + */ + } + return; // + } + + /* Initialize hyperslab values. */ + size_t all_starts[m_NumSubFiles][nDims]; + size_t all_counts[m_NumSubFiles][nDims]; + + // + std::vector<hsize_t> dimsf, start, one, count; + GetVarInfo(var, dimsf, nDims, start, count, one); + // + + MPI_Gather(start.data(), nDims, ADIOS_MPI_SIZE_T, all_starts, nDims, + ADIOS_MPI_SIZE_T, 0, m_MPISubfileComm); + + MPI_Gather(count.data(), nDims, ADIOS_MPI_SIZE_T, all_counts, nDims, + ADIOS_MPI_SIZE_T, 0, m_MPISubfileComm); + + herr_t status; + if (m_Rank == 0) + { + m_VDSFile.CheckWriteGroup(); + /* Set VDS creation property. */ + hid_t dcpl = H5Pcreate(H5P_DATASET_CREATE); + // status = H5Pset_fill_value(dcpl, ADIOS_MPI_SIZE_T, 0); + + /* + std::cout<<"nDims = "<<nDims<<" h5type="<<h5Type<<" + filename="<<m_FileName<<std::endl; for (int i=0; i<m_NumSubFiles; i++) { + std::cout<<((size_t*)all_starts)[i*nDims]<<", + "<<((size_t*)all_starts)[i*nDims+1]<<" :: "; + std::cout<<((size_t*)all_counts)[i*nDims]<<", + "<<((size_t*)all_counts)[i*nDims+1]<<std::endl; + } + std::cout<<"num subfiles="<<m_NumSubFiles<<std::endl; + */ + + space = H5Screate_simple(nDims, dimsf.data(), NULL); + // summaryFile.Init(fileName.c_str(), MPI_COMM_SELF, true); + + hsize_t currCount[nDims], currStart[nDims]; + // std::string subfileVarName="TimeStep0/"+var.m_Name; // need full + // path? NEED TO GET the RIGHT SUBFILE VAR NAME RELATED to TIMESTEP!! + std::string subfileVarName; + interop::HDF5Common::StaticGetTimeStepString( + subfileVarName, m_VDSFile.m_CurrentTimeStep); + subfileVarName += "/" + var.m_Name; + + for (int i = 0; i < m_NumSubFiles; i++) + { + for (int j = 0; j < nDims; j++) + { + currCount[j] = all_counts[i][j]; + currStart[j] = all_starts[i][j]; + // std::cout<<i<<"th: subfile, "<<j<<"th dirmention: + // count:"<<currCount[j] <<" start:"<<currStart[j]<<std::endl; + } + hid_t src_space = H5Screate_simple( + nDims, currCount, + NULL); // with factor=1, we do not flatten the data + + status = H5Sselect_hyperslab(space, H5S_SELECT_SET, currStart, NULL, + one.data(), currCount); + + std::string path, root, subfileName; + HDFSerialWriter::StaticCreateName( + path, root, subfileName, m_FileName, + i); // for each core, get the subfile name + + // std::cout<<" subfileName="<<subfileName<<", + // var="<<subfileVarName<<std::endl; + + status = H5Pset_virtual(dcpl, space, subfileName.c_str(), + subfileVarName.c_str(), src_space); + status = H5Sclose(src_space); + } + + /* Create a virtual dataset. */ + // hid_t dset = H5Dcreate2 (m_VDSFile.m_FileId, subfileVarName.c_str(), + // h5Type, space, H5P_DEFAULT, dcpl, H5P_DEFAULT); + + hid_t dset = H5Dcreate2(m_VDSFile.m_GroupId, var.m_Name.c_str(), h5Type, + space, H5P_DEFAULT, dcpl, H5P_DEFAULT); + + status = H5Sclose(space); + status = H5Dclose(dset); + status = H5Pclose(dcpl); + } + + // m_VDSFile.Close(); + MPI_Barrier(m_MPISubfileComm); +} + +void adios2::HDFVDSWriter::Advance(const float timeoutSeconds) +{ + if (m_Rank > 0) + { + return; + } + + m_VDSFile.Advance(); +} + +void adios2::HDFVDSWriter::Close(const int transportIndex) +{ + if (m_Rank > 0) + { + return; + } + + m_VDSFile.Close(); +} + +// +// class HDFSerialWriter +// +adios2::HDFSerialWriter::HDFSerialWriter(MPI_Comm mpiComm, + const bool debugMode = false) +: m_MPILocalComm(mpiComm), m_DebugMode(debugMode), m_H5File(debugMode) +{ +} + +void adios2::HDFSerialWriter::Advance(const float timeoutSeconds) +{ + m_H5File.Advance(); +} +void adios2::HDFSerialWriter::Close(const int transportIndex) +{ + m_H5File.Close(); +}; + +void adios2::HDFSerialWriter::StaticCreateName(std::string &pathName, + std::string &rootName, + std::string &fullH5Name, + const std::string &input, + int rank) +{ + + auto lf_GetBaseName = [](const std::string &name) -> std::string { + const std::string baseName(AddExtension(name, ".h5") + ".dir"); + return baseName; + }; + + auto lf_GetRootTag = [](const std::string &userTag) -> std::string { + std::string h5RootName = userTag; + const auto lastPathSeparator(userTag.find_last_of(PathSeparator)); + if (lastPathSeparator != std::string::npos) + { + h5RootName = userTag.substr(lastPathSeparator); + } + return h5RootName; + }; + + pathName = lf_GetBaseName(input); + rootName = lf_GetRootTag(input); + + fullH5Name = + (pathName + "/" + rootName + "_" + std::to_string(rank) + ".h5"); +} + +void adios2::HDFSerialWriter::Init(const std::string &name, int rank) +{ + /* + auto lf_GetBaseName = [](const std::string &name) -> std::string { + const std::string baseName(AddExtension(name, ".h5") + ".dir"); + return baseName; + }; + + auto lf_GetRootTag = [] (const std::string &userTag) -> std::string { + std::string h5RootName = userTag; + const auto lastPathSeparator(userTag.find_last_of(PathSeparator)); + if (lastPathSeparator != std::string::npos) + { + h5RootName = userTag.substr(lastPathSeparator); + } + return h5RootName; + }; + + std::string baseName=lf_GetBaseName(name); + + auto rootTag = lf_GetRootTag(name); + const std::string h5Name(baseName + "/" + + rootTag+"_"+std::to_string(rank)+".h5"); + + */ + std::string baseName, rootTag, h5Name; + StaticCreateName(baseName, rootTag, h5Name, name, rank); + // std::cout<<"rank="<<rank<<" name="<<h5Name<<std::endl; + CreateDirectory(baseName); + m_H5File.Init(h5Name, m_MPILocalComm, true); + + m_FileName = h5Name; + m_Rank = rank; + // m_H5File.Init(h5Name, m_, true); +} + +/* + std::vector<std::string> + GetBaseNames(const std::vector<std::string> &names) const noexcept + { + auto lf_GetBaseName = [](const std::string &name) -> std::string { + const std::string baseName(AddExtension(name, ".h5") + ".dir"); + return baseName; + }; + + std::vector<std::string> baseNames; + baseNames.reserve(names.size()); + + for (const auto &name : names) + { + baseNames.push_back(lf_GetBaseName(name)); + } + return baseNames; + } + + + std::vector<std::string> + GetLocalFileNames(const std::vector<std::string> &baseNames, + const std::string &userTag) const noexcept + { + // e.g. /some/where/xy.h5.dir + // e.g. xy + + auto lf_GetH5Name = [](const std::string &baseName, + const std::string &userTag, + const int rank) -> std::string { +#ifdef NEVER + const std::string h5BaseName = AddExtension(baseName, ".h5"); + + std::string h5RootName = h5BaseName; + const auto lastPathSeparator(h5BaseName.find_last_of(PathSeparator)); + + if (lastPathSeparator != std::string::npos) + { + h5RootName = h5BaseName.substr(lastPathSeparator); + } + const std::string h5Name(h5BaseName + ".dir/" + h5RootName + "." + + std::to_string(rank)); +#else + const std::string h5Name(baseName + "/" + +userTag+"_"+std::to_string(rank)+".h5"); #endif return h5Name; + }; + + + auto lf_GetRootTag = [] (const std::string &userTag) -> std::string { + std::string h5RootName = userTag; + const auto lastPathSeparator(userTag.find_last_of(PathSeparator)); + if (lastPathSeparator != std::string::npos) + { + h5RootName = userTag.substr(lastPathSeparator); + } + return h5RootName; + }; + + std::vector<std::string> h5Names; + h5Names.reserve(baseNames.size()); + + auto rootTag = lf_GetRootTag(userTag); + for (const auto &baseName : baseNames) + { + h5Names.push_back(lf_GetH5Name(baseName, rootTag, m_RankMPI)); + } + return h5Names; + + } + + + enum class ResizeResult + { + Failure, //!< FAILURE, caught a std::bad_alloc + Unchanged, //!< UNCHANGED, no need to resize (sufficient capacity) + Success, //!< SUCCESS, resize was successful + Flush //!< FLUSH, need to flush to transports for current variable + }; + + + template <class T> + ResizeResult ResizeBuffer(const Variable<T> &variable) + { std::cout<<"ResizeBuffer() Forcing Flush for now."<<std::endl; + return HDFSerialWriter::ResizeResult::Flush;}; + + capsule::STLVector m_HeapBuffer; + profiling::IOChrono m_Profiler; + + void InitParameters(const Params ¶meters) + { + std::cout<<"InitParameters(), empty for now. "<<std::endl; + }; + + // + // from H51Writer + // + + + std::string GetRankProfilingJSON( + const std::vector<std::string> +&transportsTypes, const std::vector<profiling::IOChrono *> &transportsProfilers) +noexcept + { + std::cout<<"GetRankProfilingJSON() returns empty string now "<<std::endl; + return ""; + } + + std::string + AggregateProfilingJSON(const std::string &rankProfilingJSON) noexcept + { + std::cout<<"AggregateProfilingJSON() to hdf5"<<std::endl; + return "agg.hd5"; + } + + void WriteProcessGroupIndex( + const std::string hostLanguage, + const std::vector<std::string> &transportsTypes) +noexcept + { + std::cout<<"WriteProcessGroupIndex() to hdf5"<<std::endl; + } + + void Flush() { + std::cout<<"Flush() out hdf5"<<std::endl; + } + + template <class T> + void WriteVariableMetadata(const Variable<T> &variable) noexcept + { + std::cout<<"WriteVariableMetadata() to hdf5"<<std::endl; + } + template <class T> + void WriteVariablePayload(const Variable<T> &variable) noexcept + { + std::cout<<"WriteVariablePayload() to hdf5"<<std::endl; + } + +}; +*/ diff --git a/source/adios2/engine/mixer/HDFMixerWriter.h b/source/adios2/engine/mixer/HDFMixerWriter.h new file mode 100644 index 000000000..c9c8ab1b8 --- /dev/null +++ b/source/adios2/engine/mixer/HDFMixerWriter.h @@ -0,0 +1,91 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * HDFMixer.h + * + * Created on: Aug 16 2017 + * Author: Junmin GU + */ + +#ifndef ADIOS2_ENGINE_BP_HDFSERIALWRITER_H_ +#define ADIOS2_ENGINE_BP_HDFSERIALWRITER_H_ + +#include "adios2/toolkit/interop/hdf5/HDF5Common.h" + +inline MPI_Datatype mpi_typeof(char) { return MPI_CHAR; } +inline MPI_Datatype mpi_typeof(signed short) { return MPI_SHORT; } +inline MPI_Datatype mpi_typeof(signed int) { return MPI_INT; } +inline MPI_Datatype mpi_typeof(signed long) { return MPI_LONG; } +inline MPI_Datatype mpi_typeof(unsigned char) { return MPI_UNSIGNED_CHAR; } +inline MPI_Datatype mpi_typeof(unsigned short) { return MPI_UNSIGNED_SHORT; } +inline MPI_Datatype mpi_typeof(unsigned) { return MPI_UNSIGNED; } +inline MPI_Datatype mpi_typeof(unsigned long) { return MPI_UNSIGNED_LONG; } +inline MPI_Datatype mpi_typeof(signed long long) { return MPI_LONG_LONG_INT; } +inline MPI_Datatype mpi_typeof(double) { return MPI_DOUBLE; } +inline MPI_Datatype mpi_typeof(long double) { return MPI_LONG_DOUBLE; } +inline MPI_Datatype mpi_typeof(std::pair<int, int>) { return MPI_2INT; } +inline MPI_Datatype mpi_typeof(std::pair<float, int>) { return MPI_FLOAT_INT; } +inline MPI_Datatype mpi_typeof(std::pair<double, int>) +{ + return MPI_DOUBLE_INT; +} +inline MPI_Datatype mpi_typeof(std::pair<long double, int>) +{ + return MPI_LONG_DOUBLE_INT; +} +inline MPI_Datatype mpi_typeof(std::pair<short, int>) { return MPI_SHORT_INT; } + +#define ADIOS_MPI_SIZE_T (mpi_typeof(size_t())) + +namespace adios2 +{ + +class HDFVDSWriter +{ +public: + HDFVDSWriter(MPI_Comm mpiComm, bool debugMode); + void Init(const std::string &name); + void AddVar(const VariableBase &var, hid_t h5Type); + void Advance(const float timeoutSeconds = 0.0); + void Close(const int transportIndex = -1); + + interop::HDF5Common m_VDSFile; + int m_Rank; + +private: + void GetVarInfo(const VariableBase &var, std::vector<hsize_t> &dimsf, + int nDim, std::vector<hsize_t> &start, + std::vector<hsize_t> &count, std::vector<hsize_t> &one); + + int m_NumSubFiles; + std::string m_FileName; + MPI_Comm m_MPISubfileComm; // only rank 0 in this comm can build VDS; +}; + +class HDFSerialWriter +{ +public: + HDFSerialWriter(MPI_Comm mpiComm, bool debugMode); + void Advance(const float timeoutSeconds = 0.0); + void Close(const int transportIndex = -1); + void Init(const std::string &name, int rank); + + static void StaticCreateName(std::string &pathName, std::string &rootName, + std::string &fullH5Name, + const std::string &input, int rank); + /** contains data buffer and position */ + // capsule::STLVector m_HeapBuffer; + + // int m_MPIRank; + interop::HDF5Common m_H5File; + std::string m_FileName; + +private: + MPI_Comm m_MPILocalComm; // all ranks in this comm write to the same file + const bool m_DebugMode = false; + int m_Rank; +}; +} // end namespace adios2 + +#endif // ADIOS2_ENGINE_BP_HDFSerialWriter diff --git a/source/adios2/toolkit/interop/hdf5/HDF5Common.cpp b/source/adios2/toolkit/interop/hdf5/HDF5Common.cpp index 1da60eda1..50561adee 100644 --- a/source/adios2/toolkit/interop/hdf5/HDF5Common.cpp +++ b/source/adios2/toolkit/interop/hdf5/HDF5Common.cpp @@ -54,7 +54,9 @@ void HDF5Common::Init(const std::string &name, MPI_Comm comm, bool toWrite) H5Pset_fapl_mpio(m_PropertyListId, comm, MPI_INFO_NULL); #endif - std::string ts0 = "/TimeStep0"; + // std::string ts0 = "/TimeStep0"; + std::string ts0; + StaticGetTimeStepString(ts0, 0); if (toWrite) { @@ -195,8 +197,10 @@ void HDF5Common::Advance() return; } - std::string timeStepName = - "/TimeStep" + std::to_string(m_CurrentTimeStep + 1); + // std::string timeStepName = + // "/TimeStep" + std::to_string(m_CurrentTimeStep + 1); + std::string timeStepName; + StaticGetTimeStepString(timeStepName, m_CurrentTimeStep + 1); m_GroupId = H5Gopen(m_FileId, timeStepName.c_str(), H5P_DEFAULT); if (m_GroupId < 0) { @@ -218,7 +222,10 @@ void HDF5Common::CheckWriteGroup() return; } - std::string timeStepName = "/TimeStep" + std::to_string(m_CurrentTimeStep); + // std::string timeStepName = "/TimeStep" + + // std::to_string(m_CurrentTimeStep); + std::string timeStepName; + StaticGetTimeStepString(timeStepName, m_CurrentTimeStep); m_GroupId = H5Gcreate2(m_FileId, timeStepName.c_str(), H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); @@ -232,6 +239,11 @@ void HDF5Common::CheckWriteGroup() } } +void HDF5Common::StaticGetTimeStepString(std::string &timeStepName, int ts) +{ + timeStepName = "/TimeStep" + std::to_string(ts); +} + #define declare_template_instantiation(T) \ template void HDF5Common::Write(Variable<T> &variable, const T *value); diff --git a/source/adios2/toolkit/interop/hdf5/HDF5Common.h b/source/adios2/toolkit/interop/hdf5/HDF5Common.h index 8aa53eaf6..56029726a 100644 --- a/source/adios2/toolkit/interop/hdf5/HDF5Common.h +++ b/source/adios2/toolkit/interop/hdf5/HDF5Common.h @@ -48,6 +48,8 @@ public: unsigned int GetNumTimeSteps(); void WriteTimeSteps(); + static void StaticGetTimeStepString(std::string &timeStepName, int ts); + hid_t m_PropertyListId = -1; hid_t m_FileId = -1; hid_t m_GroupId = -1; diff --git a/source/adios2/toolkit/interop/hdf5/HDF5Common.tcc b/source/adios2/toolkit/interop/hdf5/HDF5Common.tcc index e5555692b..14d55d435 100644 --- a/source/adios2/toolkit/interop/hdf5/HDF5Common.tcc +++ b/source/adios2/toolkit/interop/hdf5/HDF5Common.tcc @@ -12,7 +12,7 @@ #define ADIOS2_TOOLKIT_INTEROP_HDF5_HDF5COMMON_TCC_ #include "HDF5Common.h" - +#include <iostream> #include <vector> namespace adios2 @@ -25,6 +25,26 @@ void HDF5Common::Write(Variable<T> &variable, const T *values) { CheckWriteGroup(); int dimSize = std::max(variable.m_Shape.size(), variable.m_Count.size()); + hid_t h5Type = GetHDF5Type<T>(); + + if (dimSize == 0) + { + // write scalar + hid_t filespaceID = H5Screate(H5S_SCALAR); + hid_t dsetID = + H5Dcreate(m_GroupId, variable.m_Name.c_str(), h5Type, filespaceID, + H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); + hid_t plistID = H5Pcreate(H5P_DATASET_XFER); + H5Pset_dxpl_mpio(plistID, H5FD_MPIO_COLLECTIVE); + + herr_t status = + H5Dwrite(dsetID, h5Type, H5S_ALL, H5S_ALL, plistID, values); + + H5Sclose(filespaceID); + H5Dclose(dsetID); + + return; + } std::vector<hsize_t> dimsf, count, offset; @@ -60,7 +80,6 @@ void HDF5Common::Write(Variable<T> &variable, const T *values) hid_t fileSpace = H5Screate_simple(dimSize, dimsf.data(), NULL); - hid_t h5Type = GetHDF5Type<T>(); hid_t dsetID = H5Dcreate(m_GroupId, variable.m_Name.c_str(), h5Type, fileSpace, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); // H5Sclose(fileSpace); -- GitLab