-
William F Godoy authored
Moved to sync functions Added exception in EndStep if Perform Puts/Gets is missing
William F Godoy authoredMoved to sync functions Added exception in EndStep if Perform Puts/Gets is missing
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
BPFileWriter.cpp 6.70 KiB
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* BPFileWriter.cpp
*
* Created on: Dec 19, 2016
* Author: William F Godoy godoywf@ornl.gov
*/
#include "BPFileWriter.h"
#include "BPFileWriter.tcc"
#include <iostream>
#include "adios2/ADIOSMPI.h"
#include "adios2/ADIOSMacros.h"
#include "adios2/core/IO.h"
#include "adios2/helper/adiosFunctions.h" //CheckIndexRange
#include "adios2/toolkit/transport/file/FileFStream.h"
namespace adios2
{
BPFileWriter::BPFileWriter(IO &io, const std::string &name, const Mode mode,
MPI_Comm mpiComm)
: Engine("BPFileWriter", io, name, mode, mpiComm),
m_BP3Serializer(mpiComm, m_DebugMode),
m_FileDataManager(mpiComm, m_DebugMode),
m_FileMetadataManager(mpiComm, m_DebugMode)
{
m_EndMessage = " in call to IO Open BPFileWriter " + m_Name + "\n";
Init();
}
BPFileWriter::~BPFileWriter() = default;
StepStatus BPFileWriter::BeginStep(StepMode mode, const float timeoutSeconds)
{
m_BP3Serializer.m_DeferredVariables.clear();
m_BP3Serializer.m_DeferredVariablesDataSize = 0;
return StepStatus::OK;
}
void BPFileWriter::PerformPuts()
{
m_BP3Serializer.AllocateDeferredSize();
for (const auto &variableName : m_BP3Serializer.m_DeferredVariables)
{
PutSync(variableName);
}
m_BP3Serializer.m_DeferredVariables.clear();
}
void BPFileWriter::EndStep()
{
if (m_DebugMode && m_BP3Serializer.m_DeferredVariables.size() > 0)
{
throw std::invalid_argument("ERROR: existing variables subscribed with "
"PutDeferred, did you forget to call "
"PerformPuts()?, in call to EndStep\n");
}
m_BP3Serializer.SerializeData(m_IO, true); // true: advances step
}
// PRIVATE
void BPFileWriter::Init()
{
InitParameters();
InitTransports();
InitBPBuffer();
}
#define declare_type(T) \
void BPFileWriter::DoPutSync(Variable<T> &variable, const T *values) \
{ \
PutSyncCommon(variable, values); \
} \
void BPFileWriter::DoPutDeferred(Variable<T> &variable, const T *values) \
{ \
PutDeferredCommon(variable, values); \
} \
void BPFileWriter::DoPutDeferred(Variable<T> &, const T &value) {}
ADIOS2_FOREACH_TYPE_1ARG(declare_type)
#undef declare_type
void BPFileWriter::Close(const int transportIndex)
{
// close bp buffer by serializing data and metadata
m_BP3Serializer.CloseData(m_IO);
// send data to corresponding transports
m_FileDataManager.WriteFiles(m_BP3Serializer.m_Data.m_Buffer.data(),
m_BP3Serializer.m_Data.m_Position,
transportIndex);
m_FileDataManager.CloseFiles(transportIndex);
if (m_BP3Serializer.m_Profiler.IsActive &&
m_FileDataManager.AllTransportsClosed())
{
WriteProfilingJSONFile();
}
if (m_BP3Serializer.m_CollectiveMetadata &&
m_FileDataManager.AllTransportsClosed())
{
WriteCollectiveMetadataFile();
}
}
// PRIVATE FUNCTIONS
void BPFileWriter::InitParameters()
{
m_BP3Serializer.InitParameters(m_IO.m_Parameters);
}
void BPFileWriter::InitTransports()
{
// TODO need to add support for aggregators here later
if (m_IO.m_TransportsParameters.empty())
{
Params defaultTransportParameters;
defaultTransportParameters["transport"] = "File";
m_IO.m_TransportsParameters.push_back(defaultTransportParameters);
}
// Names passed to IO AddTransport option with key "Name"
const std::vector<std::string> transportsNames =
m_FileDataManager.GetFilesBaseNames(m_Name,
m_IO.m_TransportsParameters);
// /path/name.bp.dir/name.bp.rank
const std::vector<std::string> bpRankNames =
m_BP3Serializer.GetBPRankNames(transportsNames);
m_FileDataManager.OpenFiles(bpRankNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.IsActive);
}
void BPFileWriter::InitBPBuffer()
{
if (m_OpenMode == Mode::Append)
{
throw std::invalid_argument(
"ADIOS2: OpenMode Append hasn't been implemented, yet");
// TODO: Get last pg timestep and update timestep counter in
}
else
{
m_BP3Serializer.PutProcessGroupIndex(
m_IO.m_HostLanguage, m_FileDataManager.GetTransportsTypes());
}
}
void BPFileWriter::WriteProfilingJSONFile()
{
auto transportTypes = m_FileDataManager.GetTransportsTypes();
auto transportProfilers = m_FileDataManager.GetTransportsProfilers();
const std::string lineJSON(m_BP3Serializer.GetRankProfilingJSON(
transportTypes, transportProfilers) +
",\n");
const std::vector<char> profilingJSON(
m_BP3Serializer.AggregateProfilingJSON(lineJSON));
if (m_BP3Serializer.m_RankMPI == 0)
{
transport::FileFStream profilingJSONStream(m_MPIComm, m_DebugMode);
auto bpBaseNames = m_BP3Serializer.GetBPBaseNames({m_Name});
profilingJSONStream.Open(bpBaseNames[0] + "/profiling.json",
Mode::Write);
profilingJSONStream.Write(profilingJSON.data(), profilingJSON.size());
profilingJSONStream.Close();
}
}
void BPFileWriter::WriteCollectiveMetadataFile()
{
m_BP3Serializer.AggregateCollectiveMetadata();
if (m_BP3Serializer.m_RankMPI == 0)
{
// first init metadata files
const std::vector<std::string> transportsNames =
m_FileMetadataManager.GetFilesBaseNames(
m_Name, m_IO.m_TransportsParameters);
const std::vector<std::string> bpMetadataFileNames =
m_BP3Serializer.GetBPMetadataFileNames(transportsNames);
m_FileMetadataManager.OpenFiles(bpMetadataFileNames, m_OpenMode,
m_IO.m_TransportsParameters,
m_BP3Serializer.m_Profiler.IsActive);
const auto &buffer = m_BP3Serializer.m_Metadata.m_Buffer;
const size_t size = m_BP3Serializer.m_Metadata.m_AbsolutePosition;
m_FileMetadataManager.WriteFiles(buffer.data(), size);
m_FileMetadataManager.CloseFiles();
}
}
} // end namespace adios2