Skip to content
Snippets Groups Projects
Commit 2567b854 authored by guj's avatar guj
Browse files

added VDS support via Engine (HDF5Mixer)

parent acc392ad
No related branches found
No related tags found
1 merge request!278added VDS support via Engine (HDF5Mixer)
...@@ -66,5 +66,17 @@ if(ADIOS2_HAVE_MPI) ...@@ -66,5 +66,17 @@ if(ADIOS2_HAVE_MPI)
target_link_libraries(heatTransfer_write_a2h5 target_link_libraries(heatTransfer_write_a2h5
adios2 MPI::MPI_C ${CMAKE_THREAD_LIBS_INIT} adios2 MPI::MPI_C ${CMAKE_THREAD_LIBS_INIT}
) )
add_executable(heatTransfer_write_h5mixer
main.cpp
HeatTransfer.cpp
Settings.cpp
IO_h5mixer.cpp
)
target_link_libraries(heatTransfer_write_h5mixer
adios2 MPI::MPI_C ${CMAKE_THREAD_LIBS_INIT}
)
endif() endif()
endif() endif()
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* IO_ADIOS2.cpp
*
* Created on: Feb 2017
* Author: Norbert Podhorszki
*/
#include "IO.h"
#include <string>
#include <adios2.h>
#define str_helper(X) #X
#define str(X) str_helper(X)
//#ifndef DEFAULT_CONFIG
//#define DEFAULT_CONFIG config.xml
//#endif
#define DEFAULT_CONFIG mix.xml
#define DEFAULT_CONFIG_STR str(DEFAULT_CONFIG)
static int rank_saved;
adios2::ADIOS *ad = nullptr;
std::shared_ptr<adios2::Engine> h5mixerWriter;
adios2::Variable<double> *varT = nullptr;
adios2::Variable<unsigned int> *varGndx = nullptr;
IO::IO(const Settings &s, MPI_Comm comm)
{
rank_saved = s.rank;
// m_outputfilename = s.outputfile + ".h5";
m_outputfilename = s.outputfile;
/*ad = new adios2::ADIOS(std::string(DEFAULT_CONFIG_STR), comm,
adios2::DebugON);
*/
ad = new adios2::ADIOS(comm, adios2::DebugOFF);
// Define method for engine creation
adios2::IO &h5io = ad->DeclareIO("output");
if (!h5io.InConfigFile())
{
// if not defined by user, we can change the default settings
// BPFileWriter is the default engine
// Allow an extra thread for data processing
// ISO-POSIX file is the default transport
// Passing parameters to the transport
h5io.SetEngine("HDFMixer");
}
varGndx = &h5io.DefineVariable<unsigned int>("gndx");
h5io.DefineVariable<unsigned int>("gndy");
// define T as 2D global array
varT = &h5io.DefineVariable<double>(
"T",
// Global dimensions
{s.gndx, s.gndy},
// starting offset of the local array in the global space
{s.offsx, s.offsy},
// local size, could be defined later using SetSelection()
{s.ndx, s.ndy});
// add transform to variable
// adios2::Transform tr = adios2::transform::BZIP2( );
// varT.AddTransform( tr, "" );
// varT.AddTransform( tr,"accuracy=0.001" ); // for ZFP
h5mixerWriter = h5io.Open(m_outputfilename, adios2::OpenMode::Write, comm);
if (!h5mixerWriter)
{
throw std::ios_base::failure(
"ERROR: failed to open ADIOS h5mixerWriter\n");
}
}
IO::~IO()
{
h5mixerWriter->Close();
delete ad;
}
void IO::write(int step, const HeatTransfer &ht, const Settings &s,
MPI_Comm comm)
{
#if 1
/* This selection is redundant and not required, since we defined
* the selection already in DefineVariable(). It is here just as an example.
*/
// Make a selection to describe the local dimensions of the variable we
// write and its offsets in the global spaces. This could have been done in
// adios.DefineVariable()
adios2::SelectionBoundingBox sel({s.offsx, s.offsy}, {s.ndx, s.ndy});
varT->SetSelection(sel);
/* Select the area that we want to write from the data pointer we pass to
the
writer.
Think HDF5 memspace, just not hyperslabs, only a bounding box selection.
Engine will copy this bounding box from the data pointer into the output
buffer.
Size of the bounding box should match the "space" selection which was
given
above.
Default memspace is always the full selection.
*/
adios2::SelectionBoundingBox memspace =
adios2::SelectionBoundingBox({1, 1}, {s.ndx, s.ndy});
varT->SetMemorySelection(memspace);
h5mixerWriter->Write<unsigned int>(*varGndx, s.gndx);
h5mixerWriter->Write<unsigned int>("gndy", s.gndy);
h5mixerWriter->Write<double>(*varT, ht.data_noghost().data());
h5mixerWriter->Advance();
#else
h5mixerWriter->Write<double>(*varT, ht.data_noghost().data());
h5mixerWriter->Advance();
#endif
}
...@@ -117,6 +117,9 @@ if(ADIOS2_HAVE_HDF5) ...@@ -117,6 +117,9 @@ if(ADIOS2_HAVE_HDF5)
target_sources(adios2 PRIVATE target_sources(adios2 PRIVATE
engine/hdf5/HDF5ReaderP.cpp engine/hdf5/HDF5ReaderP.cpp
engine/hdf5/HDF5WriterP.cpp engine/hdf5/HDF5WriterP.cpp
engine/mixer/HDFMixer.cpp
engine/mixer/HDFMixer.tcc
engine/mixer/HDFMixerWriter.cpp
toolkit/interop/hdf5/HDF5Common.cpp toolkit/interop/hdf5/HDF5Common.tcc toolkit/interop/hdf5/HDF5Common.cpp toolkit/interop/hdf5/HDF5Common.tcc
) )
target_link_libraries(adios2 PRIVATE ${HDF5_C_LIBRARIES}) target_link_libraries(adios2 PRIVATE ${HDF5_C_LIBRARIES})
......
...@@ -29,6 +29,7 @@ ...@@ -29,6 +29,7 @@
#ifdef ADIOS2_HAVE_HDF5 // external dependencies #ifdef ADIOS2_HAVE_HDF5 // external dependencies
#include "adios2/engine/hdf5/HDF5ReaderP.h" #include "adios2/engine/hdf5/HDF5ReaderP.h"
#include "adios2/engine/hdf5/HDF5WriterP.h" #include "adios2/engine/hdf5/HDF5WriterP.h"
#include "adios2/engine/mixer/HDFMixer.h"
#endif #endif
namespace adios2 namespace adios2
...@@ -229,6 +230,15 @@ std::shared_ptr<Engine> IO::Open(const std::string &name, ...@@ -229,6 +230,15 @@ std::shared_ptr<Engine> IO::Open(const std::string &name,
// engine = std::make_shared<BPFileReader>(*this, name, openMode, // engine = std::make_shared<BPFileReader>(*this, name, openMode,
// mpiComm); // mpiComm);
} }
else if (m_EngineType == "HDFMixer")
{
#ifdef ADIOS2_HAVE_HDF5
engine = std::make_shared<HDFMixer>(*this, name, openMode, mpiComm);
#else
throw std::invalid_argument("ERROR: this version didn't compile with "
"HDF5 library, can't use HDF5\n");
#endif
}
else if (m_EngineType == "DataManWriter") else if (m_EngineType == "DataManWriter")
{ {
#ifdef ADIOS2_HAVE_DATAMAN #ifdef ADIOS2_HAVE_DATAMAN
......
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* HDFMixer.cpp
*
* Created on: Dec 19, 2016
* Author: Junmin GU
*/
#include "HDFMixer.h"
#include "HDFMixer.tcc"
#include "adios2/ADIOSMPI.h"
#include "adios2/core/IO.h"
#include "adios2/helper/adiosFunctions.h" //CheckIndexRange
#include "adios2/toolkit/transport/file/FileStream.h"
namespace adios2
{
HDFMixer::HDFMixer(IO &io, const std::string &name, const OpenMode openMode,
MPI_Comm mpiComm)
: Engine("HDFMixer", io, name, openMode, mpiComm),
m_HDFVDSWriter(mpiComm, m_DebugMode),
m_HDFSerialWriter(MPI_COMM_SELF, m_DebugMode),
m_TransportsManager(mpiComm, m_DebugMode)
{
m_EndMessage = " in call to IO Open HDFMixer " + m_Name + "\n";
Init();
}
HDFMixer::~HDFMixer() = default;
void HDFMixer::Init()
{
InitParameters();
InitTransports();
InitBuffer();
}
#define declare_type(T) \
void HDFMixer::DoWrite(Variable<T> &variable, const T *values) \
{ \
DoWriteCommon(variable, values); \
}
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
void HDFMixer::Advance(const float /*timeout_sec*/)
{
m_HDFSerialWriter.Advance();
m_HDFVDSWriter.Advance();
}
void HDFMixer::Close(const int transportIndex)
{
if (m_DebugMode)
{
if (!m_TransportsManager.CheckTransportIndex(transportIndex))
{
auto transportsSize = m_TransportsManager.m_Transports.size();
throw std::invalid_argument(
"ERROR: transport index " + std::to_string(transportIndex) +
" outside range, -1 (default) to " +
std::to_string(transportsSize - 1) + ", in call to Close\n");
}
}
// close bp buffer by flattening data and metadata
m_HDFSerialWriter.Close();
m_HDFVDSWriter.Close();
// send data to corresponding transports
/*
m_TransportsManager.WriteFiles(m_HDFSerialWriter.m_HeapBuffer.GetData(),
m_HDFSerialWriter.m_HeapBuffer.m_DataPosition,
transportIndex);
m_TransportsManager.CloseFiles(transportIndex);
*/
/*
// do profiling later
if (m_HDFSerialWriter.m_Profiler.IsActive &&
m_TransportsManager.AllTransportsClosed())
{
WriteProfilingJSONFile();
}
*/
}
// PRIVATE FUNCTIONS
void HDFMixer::InitParameters()
{
// no need for hdf5
// m_HDFSerialWriter.InitParameters(m_IO.m_Parameters);
}
void HDFMixer::InitTransports()
{
if (m_IO.m_TransportsParameters.empty())
{
Params defaultTransportParameters;
defaultTransportParameters["transport"] = "File";
m_IO.m_TransportsParameters.push_back(defaultTransportParameters);
}
#ifdef NEVER
/*
// TODO need to add support for aggregators here later
// Names are std::vector<std::string>
auto transportsNames = m_TransportsManager.GetFilesBaseNames(
m_Name, m_IO.m_TransportsParameters);
auto bpBaseNames = m_HDFSerialWriter.GetBPBaseNames(transportsNames);
auto bpNames = m_HDFSerialWriter.GetBPNames(transportsNames);
m_TransportsManager.OpenFiles(bpBaseNames, bpNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_HDFSerialWriter.m_Profiler.IsActive);
*/
#else
int rank;
MPI_Comm_rank(m_MPIComm, &rank);
m_HDFSerialWriter.Init(m_Name, rank);
m_HDFVDSWriter.Init(m_Name);
/*
auto transportsNames = m_TransportsManager.GetFilesBaseNames(
m_Name,
m_IO.m_TransportsParameters); auto baseNames =
m_HDFSerialWriter.GetBaseNames(transportsNames);
auto h5name = m_HDFSerialWriter.GetLocalFileNames(baseNames, m_Name); // e.g.
base=/my/path/xy_1.h5 m_TransportsManager.OpenFiles(baseNames, h5name,
m_OpenMode, m_IO.m_TransportsParameters, m_HDFSerialWriter.m_Profiler.IsActive);
*/
#endif
}
void HDFMixer::InitBuffer()
{
/*
if (m_OpenMode == OpenMode::Append)
{
throw std::invalid_argument(
"ADIOS2: OpenMode Append hasn't been implemented, yet");
// TODO: Get last pg timestep and update timestep counter in
}
else
{
m_HDFSerialWriter.WriteProcessGroupIndex(
m_IO.m_HostLanguage,
m_TransportsManager.GetTransportsTypes());
}
*/
}
void HDFMixer::WriteProfilingJSONFile()
{ /*
auto transportTypes = m_TransportsManager.GetTransportsTypes();
auto transportProfilers = m_TransportsManager.GetTransportsProfilers();
const std::string lineJSON(
m_HDFSerialWriter.GetRankProfilingJSON(transportTypes,
transportProfilers));
const std::string profilingJSON(
m_HDFSerialWriter.AggregateProfilingJSON(lineJSON));
//if (m_HDFSerialWriter.m_BP1Aggregator.m_RankMPI == 0)
if (m_HDFSerialWriter.m_MPIRank == 0)
{
transport::FileStream profilingJSONStream(m_MPIComm, m_DebugMode);
auto baseNames = m_HDFSerialWriter.GetBaseNames({m_Name});
profilingJSONStream.Open(baseNames[0] + "/profiling.json",
OpenMode::Write);
profilingJSONStream.Write(profilingJSON.c_str(), profilingJSON.size());
profilingJSONStream.Close();
}
*/}
} // end namespace adios
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* HDFMixer.h
*
* Created on: Aug 16 2017
* Author: Junmin GU
*/
#ifndef ADIOS2_ENGINE_H5_HDFMIXER_H_
#define ADIOS2_ENGINE_H5_HDFMIXER_H_
#include "adios2/ADIOSConfig.h"
#include "adios2/core/Engine.h"
//#include "adios2/toolkit/format/bp1/BP1Writer.h" //format::BP1Writer
/// \cond EXCLUDE_FROM_DOXYGEN
#include <algorithm> //std::count, std::copy, std::for_each
#include <cmath> //std::ceil
#include <cstring> //std::memcpy
/// \endcond
#include "HDFMixerWriter.h"
#include "adios2/ADIOSConfig.h"
#include "adios2/ADIOSMacros.h"
#include "adios2/ADIOSTypes.h"
#include "adios2/core/Variable.h"
#include "adios2/helper/adiosFunctions.h"
#include "adios2/toolkit/capsule/heap/STLVector.h"
#include "adios2/toolkit/transportman/TransportMan.h" //transport::TransportsMan
#include <iostream>
namespace adios2
{
class HDFMixer : public Engine
{
public:
/**
* Constructor for file Writer in H5 format
* @param name unique name given to the engine
* @param openMode w (supported), r, a from OpenMode in ADIOSTypes.h
* @param mpiComm MPI communicator
*/
HDFMixer(IO &io, const std::string &name, const OpenMode openMode,
MPI_Comm mpiComm);
~HDFMixer();
void Advance(const float timeoutSeconds = 0.0) final;
/**
* Closes a single transport or all transports
* @param transportIndex, if -1 (default) closes all transports, otherwise
* it
* closes a transport in m_Transport[transportIndex]. In debug mode the
* latter
* is bounds-checked.
*/
void Close(const int transportIndex = -1) final;
void CreateName(std::string &pathName, std::string &rootName,
std::string &fullH5Name, int rank);
private:
/** Single object controlling H5 buffering */
// format::H51Writer m_H51Writer;
HDFSerialWriter m_HDFSerialWriter;
HDFVDSWriter m_HDFVDSWriter;
/** single object controlling a vector of Transports from IO AddTransport */
transportman::TransportMan m_TransportsManager;
/** true: due to buffer overflow, move to transports manager */
bool m_DoTransportFlush = false;
void Init() final;
/** Parses parameters from IO SetParameters */
void InitParameters() final;
/** Parses transports and parameters from IO AddTransport */
void InitTransports() final;
void InitBuffer();
#define declare_type(T) \
void DoWrite(Variable<T> &variable, const T *values) final;
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
/**
* Common function for primitive (including std::complex) writes
* @param variable
* @param values
*/
template <class T>
void DoWriteCommon(Variable<T> &variable, const T *values);
/** Write a profiling.json file from m_H51Writer and m_TransportsManager
* profilers*/
void WriteProfilingJSONFile();
};
} // end namespace adios
#endif /* ADIOS2_ENGINE_H5_HDFMIXER_H_ */
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* HDFMixer.tcc implementation of template functions with known type
*
* Created on: Aug 16 2017
* Author: Junmin GU
*/
#include "HDFMixer.h"
namespace adios2
{
template <class T>
void HDFMixer::DoWriteCommon(Variable<T> &variable, const T *values)
{
variable.m_AppValues = values;
m_WrittenVariables.insert(variable.m_Name);
Variable<T> local(variable.m_Name, {}, {}, variable.m_Count,
variable.m_Count.size(), false);
// m_HDFSerialWriter.m_H5File.Write(variable, values);
// writes only the m_Count() part
int nDims = std::max(variable.m_Shape.size(), variable.m_Count.size());
if (nDims == 0)
{
// this is scalar
if (m_HDFVDSWriter.m_Rank == 0)
{
m_HDFVDSWriter.m_VDSFile.Write(local, values);
}
}
else
{
m_HDFSerialWriter.m_H5File.Write(local, values);
// std::cout<<" ==> "<< variable.m_Name<<std::endl;
// BuildVDS(AddExtension(m_Name, ".h5"), variable,
// m_HDFSerialWriter.m_H5File.GetHDF5Type<T>());
m_HDFVDSWriter.AddVar(variable,
m_HDFSerialWriter.m_H5File.GetHDF5Type<T>());
}
}
} // end namespace adios
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* HDFMixer.h
*
* Created on: Aug 16 2017
* Author: Junmin GU
*/
#include <iostream>
#include "HDFMixerWriter.h"
#include "adios2/helper/adiosFunctions.h"
//
// class HDFSerialWriter
//
adios2::HDFVDSWriter::HDFVDSWriter(MPI_Comm mpiComm, bool debugMode = false)
: m_MPISubfileComm(mpiComm), m_VDSFile(debugMode), m_Rank(-1)
{
MPI_Comm_size(m_MPISubfileComm, &m_NumSubFiles);
MPI_Comm_rank(m_MPISubfileComm, &m_Rank);
}
void adios2::HDFVDSWriter::Init(const std::string &name)
{
if (m_Rank > 0)
{
return;
}
//
// VDS can only operate on one process. So let rank = 0 handle it
//
std::string h5Name = AddExtension(name, ".h5");
m_VDSFile.Init(h5Name, MPI_COMM_SELF, true);
// m_FileName = h5Name;
m_FileName = name;
}
void adios2::HDFVDSWriter::GetVarInfo(const VariableBase &var,
std::vector<hsize_t> &dimsf, int nDims,
std::vector<hsize_t> &start,
std::vector<hsize_t> &count,
std::vector<hsize_t> &one)
{ // interop::HDF5Common summaryFile(true);
// std::vector<hsize_t> dimsf, start, one, count;
// int nDims = std::max(var.m_Shape.size(), var.m_Count.size());
for (int i = 0; i < nDims; i++)
{
if (var.m_Shape.size() > 0)
{
dimsf.push_back(var.m_Shape[i]);
}
else
{
dimsf.push_back(var.m_Count[i]);
}
if (var.m_Start.size() > 0)
{
start.push_back(var.m_Start[i]);
}
else
{
start.push_back(0);
}
if (var.m_Count.size() > 0)
{
count.push_back(var.m_Count[i]);
}
else if (var.m_Shape.size() > 0)
{
count.push_back(var.m_Shape[i]);
}
else
{
count.push_back(0);
}
one.push_back(1);
}
}
void adios2::HDFVDSWriter::AddVar(const VariableBase &var, hid_t h5Type)
{
hid_t space;
/* Create VDS dataspace. */
int nDims = std::max(var.m_Shape.size(), var.m_Count.size());
if (nDims == 0)
{
if (m_Rank == 0)
{
/*
std::cout<<" will deal with scalar later?"<<var.m_Name<<std::endl;
hid_t filespaceID = H5Screate(H5S_SCALAR);
hid_t dsetID = H5Dcreate(m_VDSFile.m_GroupId, var.m_Name.c_str(),
h5Type, filespaceID, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
//hid_t plistID = H5Pcreate(H5P_DATASET_XFER);
//H5Pset_dxpl_mpio(plistID, H5FD_MPIO_COLLECTIVE);
herr_t status = H5Dwrite(dsetID, h5Type, H5S_ALL, H5S_ALL,
H5P_DEFAULT, values);
//herr_t status = H5Dwrite(dsetID, h5Type, H5S_ALL, H5S_ALL,
plistID, values);
H5Sclose(filespaceID);
H5Dclose(dsetID);
*/
}
return; //
}
/* Initialize hyperslab values. */
size_t all_starts[m_NumSubFiles][nDims];
size_t all_counts[m_NumSubFiles][nDims];
//
std::vector<hsize_t> dimsf, start, one, count;
GetVarInfo(var, dimsf, nDims, start, count, one);
//
MPI_Gather(start.data(), nDims, ADIOS_MPI_SIZE_T, all_starts, nDims,
ADIOS_MPI_SIZE_T, 0, m_MPISubfileComm);
MPI_Gather(count.data(), nDims, ADIOS_MPI_SIZE_T, all_counts, nDims,
ADIOS_MPI_SIZE_T, 0, m_MPISubfileComm);
herr_t status;
if (m_Rank == 0)
{
m_VDSFile.CheckWriteGroup();
/* Set VDS creation property. */
hid_t dcpl = H5Pcreate(H5P_DATASET_CREATE);
// status = H5Pset_fill_value(dcpl, ADIOS_MPI_SIZE_T, 0);
/*
std::cout<<"nDims = "<<nDims<<" h5type="<<h5Type<<"
filename="<<m_FileName<<std::endl; for (int i=0; i<m_NumSubFiles; i++) {
std::cout<<((size_t*)all_starts)[i*nDims]<<",
"<<((size_t*)all_starts)[i*nDims+1]<<" :: ";
std::cout<<((size_t*)all_counts)[i*nDims]<<",
"<<((size_t*)all_counts)[i*nDims+1]<<std::endl;
}
std::cout<<"num subfiles="<<m_NumSubFiles<<std::endl;
*/
space = H5Screate_simple(nDims, dimsf.data(), NULL);
// summaryFile.Init(fileName.c_str(), MPI_COMM_SELF, true);
hsize_t currCount[nDims], currStart[nDims];
// std::string subfileVarName="TimeStep0/"+var.m_Name; // need full
// path? NEED TO GET the RIGHT SUBFILE VAR NAME RELATED to TIMESTEP!!
std::string subfileVarName;
interop::HDF5Common::StaticGetTimeStepString(
subfileVarName, m_VDSFile.m_CurrentTimeStep);
subfileVarName += "/" + var.m_Name;
for (int i = 0; i < m_NumSubFiles; i++)
{
for (int j = 0; j < nDims; j++)
{
currCount[j] = all_counts[i][j];
currStart[j] = all_starts[i][j];
// std::cout<<i<<"th: subfile, "<<j<<"th dirmention:
// count:"<<currCount[j] <<" start:"<<currStart[j]<<std::endl;
}
hid_t src_space = H5Screate_simple(
nDims, currCount,
NULL); // with factor=1, we do not flatten the data
status = H5Sselect_hyperslab(space, H5S_SELECT_SET, currStart, NULL,
one.data(), currCount);
std::string path, root, subfileName;
HDFSerialWriter::StaticCreateName(
path, root, subfileName, m_FileName,
i); // for each core, get the subfile name
// std::cout<<" subfileName="<<subfileName<<",
// var="<<subfileVarName<<std::endl;
status = H5Pset_virtual(dcpl, space, subfileName.c_str(),
subfileVarName.c_str(), src_space);
status = H5Sclose(src_space);
}
/* Create a virtual dataset. */
// hid_t dset = H5Dcreate2 (m_VDSFile.m_FileId, subfileVarName.c_str(),
// h5Type, space, H5P_DEFAULT, dcpl, H5P_DEFAULT);
hid_t dset = H5Dcreate2(m_VDSFile.m_GroupId, var.m_Name.c_str(), h5Type,
space, H5P_DEFAULT, dcpl, H5P_DEFAULT);
status = H5Sclose(space);
status = H5Dclose(dset);
status = H5Pclose(dcpl);
}
// m_VDSFile.Close();
MPI_Barrier(m_MPISubfileComm);
}
void adios2::HDFVDSWriter::Advance(const float timeoutSeconds)
{
if (m_Rank > 0)
{
return;
}
m_VDSFile.Advance();
}
void adios2::HDFVDSWriter::Close(const int transportIndex)
{
if (m_Rank > 0)
{
return;
}
m_VDSFile.Close();
}
//
// class HDFSerialWriter
//
adios2::HDFSerialWriter::HDFSerialWriter(MPI_Comm mpiComm,
const bool debugMode = false)
: m_MPILocalComm(mpiComm), m_DebugMode(debugMode), m_H5File(debugMode)
{
}
void adios2::HDFSerialWriter::Advance(const float timeoutSeconds)
{
m_H5File.Advance();
}
void adios2::HDFSerialWriter::Close(const int transportIndex)
{
m_H5File.Close();
};
void adios2::HDFSerialWriter::StaticCreateName(std::string &pathName,
std::string &rootName,
std::string &fullH5Name,
const std::string &input,
int rank)
{
auto lf_GetBaseName = [](const std::string &name) -> std::string {
const std::string baseName(AddExtension(name, ".h5") + ".dir");
return baseName;
};
auto lf_GetRootTag = [](const std::string &userTag) -> std::string {
std::string h5RootName = userTag;
const auto lastPathSeparator(userTag.find_last_of(PathSeparator));
if (lastPathSeparator != std::string::npos)
{
h5RootName = userTag.substr(lastPathSeparator);
}
return h5RootName;
};
pathName = lf_GetBaseName(input);
rootName = lf_GetRootTag(input);
fullH5Name =
(pathName + "/" + rootName + "_" + std::to_string(rank) + ".h5");
}
void adios2::HDFSerialWriter::Init(const std::string &name, int rank)
{
/*
auto lf_GetBaseName = [](const std::string &name) -> std::string {
const std::string baseName(AddExtension(name, ".h5") + ".dir");
return baseName;
};
auto lf_GetRootTag = [] (const std::string &userTag) -> std::string {
std::string h5RootName = userTag;
const auto lastPathSeparator(userTag.find_last_of(PathSeparator));
if (lastPathSeparator != std::string::npos)
{
h5RootName = userTag.substr(lastPathSeparator);
}
return h5RootName;
};
std::string baseName=lf_GetBaseName(name);
auto rootTag = lf_GetRootTag(name);
const std::string h5Name(baseName + "/" +
rootTag+"_"+std::to_string(rank)+".h5");
*/
std::string baseName, rootTag, h5Name;
StaticCreateName(baseName, rootTag, h5Name, name, rank);
// std::cout<<"rank="<<rank<<" name="<<h5Name<<std::endl;
CreateDirectory(baseName);
m_H5File.Init(h5Name, m_MPILocalComm, true);
m_FileName = h5Name;
m_Rank = rank;
// m_H5File.Init(h5Name, m_, true);
}
/*
std::vector<std::string>
GetBaseNames(const std::vector<std::string> &names) const noexcept
{
auto lf_GetBaseName = [](const std::string &name) -> std::string {
const std::string baseName(AddExtension(name, ".h5") + ".dir");
return baseName;
};
std::vector<std::string> baseNames;
baseNames.reserve(names.size());
for (const auto &name : names)
{
baseNames.push_back(lf_GetBaseName(name));
}
return baseNames;
}
std::vector<std::string>
GetLocalFileNames(const std::vector<std::string> &baseNames,
const std::string &userTag) const noexcept
{
// e.g. /some/where/xy.h5.dir
// e.g. xy
auto lf_GetH5Name = [](const std::string &baseName,
const std::string &userTag,
const int rank) -> std::string {
#ifdef NEVER
const std::string h5BaseName = AddExtension(baseName, ".h5");
std::string h5RootName = h5BaseName;
const auto lastPathSeparator(h5BaseName.find_last_of(PathSeparator));
if (lastPathSeparator != std::string::npos)
{
h5RootName = h5BaseName.substr(lastPathSeparator);
}
const std::string h5Name(h5BaseName + ".dir/" + h5RootName + "." +
std::to_string(rank));
#else
const std::string h5Name(baseName + "/" +
userTag+"_"+std::to_string(rank)+".h5"); #endif return h5Name;
};
auto lf_GetRootTag = [] (const std::string &userTag) -> std::string {
std::string h5RootName = userTag;
const auto lastPathSeparator(userTag.find_last_of(PathSeparator));
if (lastPathSeparator != std::string::npos)
{
h5RootName = userTag.substr(lastPathSeparator);
}
return h5RootName;
};
std::vector<std::string> h5Names;
h5Names.reserve(baseNames.size());
auto rootTag = lf_GetRootTag(userTag);
for (const auto &baseName : baseNames)
{
h5Names.push_back(lf_GetH5Name(baseName, rootTag, m_RankMPI));
}
return h5Names;
}
enum class ResizeResult
{
Failure, //!< FAILURE, caught a std::bad_alloc
Unchanged, //!< UNCHANGED, no need to resize (sufficient capacity)
Success, //!< SUCCESS, resize was successful
Flush //!< FLUSH, need to flush to transports for current variable
};
template <class T>
ResizeResult ResizeBuffer(const Variable<T> &variable)
{ std::cout<<"ResizeBuffer() Forcing Flush for now."<<std::endl;
return HDFSerialWriter::ResizeResult::Flush;};
capsule::STLVector m_HeapBuffer;
profiling::IOChrono m_Profiler;
void InitParameters(const Params &parameters)
{
std::cout<<"InitParameters(), empty for now. "<<std::endl;
};
//
// from H51Writer
//
std::string GetRankProfilingJSON(
const std::vector<std::string>
&transportsTypes, const std::vector<profiling::IOChrono *> &transportsProfilers)
noexcept
{
std::cout<<"GetRankProfilingJSON() returns empty string now "<<std::endl;
return "";
}
std::string
AggregateProfilingJSON(const std::string &rankProfilingJSON) noexcept
{
std::cout<<"AggregateProfilingJSON() to hdf5"<<std::endl;
return "agg.hd5";
}
void WriteProcessGroupIndex(
const std::string hostLanguage,
const std::vector<std::string> &transportsTypes)
noexcept
{
std::cout<<"WriteProcessGroupIndex() to hdf5"<<std::endl;
}
void Flush() {
std::cout<<"Flush() out hdf5"<<std::endl;
}
template <class T>
void WriteVariableMetadata(const Variable<T> &variable) noexcept
{
std::cout<<"WriteVariableMetadata() to hdf5"<<std::endl;
}
template <class T>
void WriteVariablePayload(const Variable<T> &variable) noexcept
{
std::cout<<"WriteVariablePayload() to hdf5"<<std::endl;
}
};
*/
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* HDFMixer.h
*
* Created on: Aug 16 2017
* Author: Junmin GU
*/
#ifndef ADIOS2_ENGINE_BP_HDFSERIALWRITER_H_
#define ADIOS2_ENGINE_BP_HDFSERIALWRITER_H_
#include "adios2/toolkit/interop/hdf5/HDF5Common.h"
inline MPI_Datatype mpi_typeof(char) { return MPI_CHAR; }
inline MPI_Datatype mpi_typeof(signed short) { return MPI_SHORT; }
inline MPI_Datatype mpi_typeof(signed int) { return MPI_INT; }
inline MPI_Datatype mpi_typeof(signed long) { return MPI_LONG; }
inline MPI_Datatype mpi_typeof(unsigned char) { return MPI_UNSIGNED_CHAR; }
inline MPI_Datatype mpi_typeof(unsigned short) { return MPI_UNSIGNED_SHORT; }
inline MPI_Datatype mpi_typeof(unsigned) { return MPI_UNSIGNED; }
inline MPI_Datatype mpi_typeof(unsigned long) { return MPI_UNSIGNED_LONG; }
inline MPI_Datatype mpi_typeof(signed long long) { return MPI_LONG_LONG_INT; }
inline MPI_Datatype mpi_typeof(double) { return MPI_DOUBLE; }
inline MPI_Datatype mpi_typeof(long double) { return MPI_LONG_DOUBLE; }
inline MPI_Datatype mpi_typeof(std::pair<int, int>) { return MPI_2INT; }
inline MPI_Datatype mpi_typeof(std::pair<float, int>) { return MPI_FLOAT_INT; }
inline MPI_Datatype mpi_typeof(std::pair<double, int>)
{
return MPI_DOUBLE_INT;
}
inline MPI_Datatype mpi_typeof(std::pair<long double, int>)
{
return MPI_LONG_DOUBLE_INT;
}
inline MPI_Datatype mpi_typeof(std::pair<short, int>) { return MPI_SHORT_INT; }
#define ADIOS_MPI_SIZE_T (mpi_typeof(size_t()))
namespace adios2
{
class HDFVDSWriter
{
public:
HDFVDSWriter(MPI_Comm mpiComm, bool debugMode);
void Init(const std::string &name);
void AddVar(const VariableBase &var, hid_t h5Type);
void Advance(const float timeoutSeconds = 0.0);
void Close(const int transportIndex = -1);
interop::HDF5Common m_VDSFile;
int m_Rank;
private:
void GetVarInfo(const VariableBase &var, std::vector<hsize_t> &dimsf,
int nDim, std::vector<hsize_t> &start,
std::vector<hsize_t> &count, std::vector<hsize_t> &one);
int m_NumSubFiles;
std::string m_FileName;
MPI_Comm m_MPISubfileComm; // only rank 0 in this comm can build VDS;
};
class HDFSerialWriter
{
public:
HDFSerialWriter(MPI_Comm mpiComm, bool debugMode);
void Advance(const float timeoutSeconds = 0.0);
void Close(const int transportIndex = -1);
void Init(const std::string &name, int rank);
static void StaticCreateName(std::string &pathName, std::string &rootName,
std::string &fullH5Name,
const std::string &input, int rank);
/** contains data buffer and position */
// capsule::STLVector m_HeapBuffer;
// int m_MPIRank;
interop::HDF5Common m_H5File;
std::string m_FileName;
private:
MPI_Comm m_MPILocalComm; // all ranks in this comm write to the same file
const bool m_DebugMode = false;
int m_Rank;
};
} // end namespace adios2
#endif // ADIOS2_ENGINE_BP_HDFSerialWriter
...@@ -54,7 +54,9 @@ void HDF5Common::Init(const std::string &name, MPI_Comm comm, bool toWrite) ...@@ -54,7 +54,9 @@ void HDF5Common::Init(const std::string &name, MPI_Comm comm, bool toWrite)
H5Pset_fapl_mpio(m_PropertyListId, comm, MPI_INFO_NULL); H5Pset_fapl_mpio(m_PropertyListId, comm, MPI_INFO_NULL);
#endif #endif
std::string ts0 = "/TimeStep0"; // std::string ts0 = "/TimeStep0";
std::string ts0;
StaticGetTimeStepString(ts0, 0);
if (toWrite) if (toWrite)
{ {
...@@ -195,8 +197,10 @@ void HDF5Common::Advance() ...@@ -195,8 +197,10 @@ void HDF5Common::Advance()
return; return;
} }
std::string timeStepName = // std::string timeStepName =
"/TimeStep" + std::to_string(m_CurrentTimeStep + 1); // "/TimeStep" + std::to_string(m_CurrentTimeStep + 1);
std::string timeStepName;
StaticGetTimeStepString(timeStepName, m_CurrentTimeStep + 1);
m_GroupId = H5Gopen(m_FileId, timeStepName.c_str(), H5P_DEFAULT); m_GroupId = H5Gopen(m_FileId, timeStepName.c_str(), H5P_DEFAULT);
if (m_GroupId < 0) if (m_GroupId < 0)
{ {
...@@ -218,7 +222,10 @@ void HDF5Common::CheckWriteGroup() ...@@ -218,7 +222,10 @@ void HDF5Common::CheckWriteGroup()
return; return;
} }
std::string timeStepName = "/TimeStep" + std::to_string(m_CurrentTimeStep); // std::string timeStepName = "/TimeStep" +
// std::to_string(m_CurrentTimeStep);
std::string timeStepName;
StaticGetTimeStepString(timeStepName, m_CurrentTimeStep);
m_GroupId = H5Gcreate2(m_FileId, timeStepName.c_str(), H5P_DEFAULT, m_GroupId = H5Gcreate2(m_FileId, timeStepName.c_str(), H5P_DEFAULT,
H5P_DEFAULT, H5P_DEFAULT); H5P_DEFAULT, H5P_DEFAULT);
...@@ -232,6 +239,11 @@ void HDF5Common::CheckWriteGroup() ...@@ -232,6 +239,11 @@ void HDF5Common::CheckWriteGroup()
} }
} }
void HDF5Common::StaticGetTimeStepString(std::string &timeStepName, int ts)
{
timeStepName = "/TimeStep" + std::to_string(ts);
}
#define declare_template_instantiation(T) \ #define declare_template_instantiation(T) \
template void HDF5Common::Write(Variable<T> &variable, const T *value); template void HDF5Common::Write(Variable<T> &variable, const T *value);
......
...@@ -48,6 +48,8 @@ public: ...@@ -48,6 +48,8 @@ public:
unsigned int GetNumTimeSteps(); unsigned int GetNumTimeSteps();
void WriteTimeSteps(); void WriteTimeSteps();
static void StaticGetTimeStepString(std::string &timeStepName, int ts);
hid_t m_PropertyListId = -1; hid_t m_PropertyListId = -1;
hid_t m_FileId = -1; hid_t m_FileId = -1;
hid_t m_GroupId = -1; hid_t m_GroupId = -1;
......
...@@ -12,7 +12,7 @@ ...@@ -12,7 +12,7 @@
#define ADIOS2_TOOLKIT_INTEROP_HDF5_HDF5COMMON_TCC_ #define ADIOS2_TOOLKIT_INTEROP_HDF5_HDF5COMMON_TCC_
#include "HDF5Common.h" #include "HDF5Common.h"
#include <iostream>
#include <vector> #include <vector>
namespace adios2 namespace adios2
...@@ -25,6 +25,26 @@ void HDF5Common::Write(Variable<T> &variable, const T *values) ...@@ -25,6 +25,26 @@ void HDF5Common::Write(Variable<T> &variable, const T *values)
{ {
CheckWriteGroup(); CheckWriteGroup();
int dimSize = std::max(variable.m_Shape.size(), variable.m_Count.size()); int dimSize = std::max(variable.m_Shape.size(), variable.m_Count.size());
hid_t h5Type = GetHDF5Type<T>();
if (dimSize == 0)
{
// write scalar
hid_t filespaceID = H5Screate(H5S_SCALAR);
hid_t dsetID =
H5Dcreate(m_GroupId, variable.m_Name.c_str(), h5Type, filespaceID,
H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
hid_t plistID = H5Pcreate(H5P_DATASET_XFER);
H5Pset_dxpl_mpio(plistID, H5FD_MPIO_COLLECTIVE);
herr_t status =
H5Dwrite(dsetID, h5Type, H5S_ALL, H5S_ALL, plistID, values);
H5Sclose(filespaceID);
H5Dclose(dsetID);
return;
}
std::vector<hsize_t> dimsf, count, offset; std::vector<hsize_t> dimsf, count, offset;
...@@ -60,7 +80,6 @@ void HDF5Common::Write(Variable<T> &variable, const T *values) ...@@ -60,7 +80,6 @@ void HDF5Common::Write(Variable<T> &variable, const T *values)
hid_t fileSpace = H5Screate_simple(dimSize, dimsf.data(), NULL); hid_t fileSpace = H5Screate_simple(dimSize, dimsf.data(), NULL);
hid_t h5Type = GetHDF5Type<T>();
hid_t dsetID = H5Dcreate(m_GroupId, variable.m_Name.c_str(), h5Type, hid_t dsetID = H5Dcreate(m_GroupId, variable.m_Name.c_str(), h5Type,
fileSpace, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT); fileSpace, H5P_DEFAULT, H5P_DEFAULT, H5P_DEFAULT);
// H5Sclose(fileSpace); // H5Sclose(fileSpace);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment