-
William F Godoy authoredWilliam F Godoy authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
HDFMixer.cpp 5.46 KiB
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* HDFMixer.cpp
*
* Created on: Dec 19, 2016
* Author: Junmin GU
*/
#include "HDFMixer.h"
#include "HDFMixer.tcc"
#include "adios2/ADIOSMPI.h"
#include "adios2/core/IO.h"
#include "adios2/helper/adiosFunctions.h" //CheckIndexRange
#include "adios2/toolkit/transport/file/FileStream.h"
namespace adios2
{
HDFMixer::HDFMixer(IO &io, const std::string &name, const Mode mode,
MPI_Comm mpiComm)
: Engine("HDFMixer", io, name, openMode, mpiComm),
m_HDFVDSWriter(mpiComm, m_DebugMode),
m_HDFSerialWriter(MPI_COMM_SELF, m_DebugMode),
m_TransportsManager(mpiComm, m_DebugMode)
{
m_EndMessage = " in call to IO Open HDFMixer " + m_Name + "\n";
Init();
}
HDFMixer::~HDFMixer() = default;
void HDFMixer::Init()
{
InitParameters();
InitTransports();
InitBuffer();
}
#define declare_type(T) \
void HDFMixer::DoWrite(Variable<T> &variable, const T *values) \
{ \
DoWriteCommon(variable, values); \
}
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
void HDFMixer::Advance(const float /*timeout_sec*/)
{
m_HDFSerialWriter.Advance();
m_HDFVDSWriter.Advance();
}
void HDFMixer::Close(const int transportIndex)
{
if (m_DebugMode)
{
if (!m_TransportsManager.CheckTransportIndex(transportIndex))
{
auto transportsSize = m_TransportsManager.m_Transports.size();
throw std::invalid_argument(
"ERROR: transport index " + std::to_string(transportIndex) +
" outside range, -1 (default) to " +
std::to_string(transportsSize - 1) + ", in call to Close\n");
}
}
// close bp buffer by flattening data and metadata
m_HDFSerialWriter.Close();
m_HDFVDSWriter.Close();
// send data to corresponding transports
/*
m_TransportsManager.WriteFiles(m_HDFSerialWriter.m_HeapBuffer.GetData(),
m_HDFSerialWriter.m_HeapBuffer.m_DataPosition,
transportIndex);
m_TransportsManager.CloseFiles(transportIndex);
*/
/*
// do profiling later
if (m_HDFSerialWriter.m_Profiler.IsActive &&
m_TransportsManager.AllTransportsClosed())
{
WriteProfilingJSONFile();
}
*/
}
// PRIVATE FUNCTIONS
void HDFMixer::InitParameters()
{
// no need for hdf5
// m_HDFSerialWriter.InitParameters(m_IO.m_Parameters);
}
void HDFMixer::InitTransports()
{
if (m_IO.m_TransportsParameters.empty())
{
Params defaultTransportParameters;
defaultTransportParameters["transport"] = "File";
m_IO.m_TransportsParameters.push_back(defaultTransportParameters);
}
#ifdef NEVER
/*
// TODO need to add support for aggregators here later
// Names are std::vector<std::string>
auto transportsNames = m_TransportsManager.GetFilesBaseNames(
m_Name, m_IO.m_TransportsParameters);
auto bpBaseNames = m_HDFSerialWriter.GetBPBaseNames(transportsNames);
auto bpNames = m_HDFSerialWriter.GetBPNames(transportsNames);
m_TransportsManager.OpenFiles(bpBaseNames, bpNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_HDFSerialWriter.m_Profiler.IsActive);
*/
#else
int rank;
MPI_Comm_rank(m_MPIComm, &rank);
m_HDFSerialWriter.Init(m_Name, rank);
m_HDFVDSWriter.Init(m_Name);
/*
auto transportsNames = m_TransportsManager.GetFilesBaseNames(
m_Name,
m_IO.m_TransportsParameters); auto baseNames =
m_HDFSerialWriter.GetBaseNames(transportsNames);
auto h5name = m_HDFSerialWriter.GetLocalFileNames(baseNames, m_Name); // e.g.
base=/my/path/xy_1.h5 m_TransportsManager.OpenFiles(baseNames, h5name,
m_OpenMode, m_IO.m_TransportsParameters, m_HDFSerialWriter.m_Profiler.IsActive);
*/
#endif
}
void HDFMixer::InitBuffer()
{
/*
if (m_OpenMode == OpenMode::Append)
{
throw std::invalid_argument(
"ADIOS2: OpenMode Append hasn't been implemented, yet");
// TODO: Get last pg timestep and update timestep counter in
}
else
{
m_HDFSerialWriter.WriteProcessGroupIndex(
m_IO.m_HostLanguage,
m_TransportsManager.GetTransportsTypes());
}
*/
}
void HDFMixer::WriteProfilingJSONFile() { /*
auto transportTypes = m_TransportsManager.GetTransportsTypes();
auto transportProfilers = m_TransportsManager.GetTransportsProfilers();
const std::string lineJSON(
m_HDFSerialWriter.GetRankProfilingJSON(transportTypes,
transportProfilers));
const std::string profilingJSON(
m_HDFSerialWriter.AggregateProfilingJSON(lineJSON));
//if (m_HDFSerialWriter.m_BP1Aggregator.m_RankMPI == 0)
if (m_HDFSerialWriter.m_MPIRank == 0)
{
transport::FileStream profilingJSONStream(m_MPIComm, m_DebugMode);
auto baseNames = m_HDFSerialWriter.GetBaseNames({m_Name});
profilingJSONStream.Open(baseNames[0] + "/profiling.json",
OpenMode::Write);
profilingJSONStream.Write(profilingJSON.c_str(), profilingJSON.size());
profilingJSONStream.Close();
}
*/}
} // end namespace adios