/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* BPFileWriter.cpp
*
* Created on: Dec 19, 2016
* Author: wfg
*/
#include "BPFileWriter.h"
#include <utility>
#include "adios2/ADIOS.h"
#include "adios2/transport/file/FStream.h"
#include "adios2/transport/file/FileDescriptor.h"
#include "adios2/transport/file/FilePointer.h"
/**
 * Builds a BPFileWriter engine.
 *
 * All arguments are forwarded to the Engine base class together with the
 * engine type string "BPFileWriter" and the message suffix used in error
 * reports. The heap buffer and the BP1 aggregator are then constructed, and
 * the maximum buffer size starts at the max_size() of the heap's data
 * container (InitParameters may override it).
 *
 * @param adios      owning ADIOS object, forwarded to Engine
 * @param name       engine name, forwarded to Engine
 * @param accessMode access mode string, forwarded to Engine
 * @param mpiComm    MPI communicator, forwarded to Engine
 * @param method     Method holding engine/transport parameters
 */
BPFileWriter::BPFileWriter(ADIOS &adios, const std::string &name,
                           const std::string accessMode, MPI_Comm mpiComm,
                           const Method &method)
: Engine(adios, "BPFileWriter", name, accessMode, mpiComm, method,
         " BPFileWriter constructor (or call to ADIOS Open).\n"),
  m_Heap(m_DebugMode), m_BP1Aggregator(m_MPIComm, m_DebugMode),
  m_MaxBufferSize(m_Heap.m_Data.max_size())
{
    m_MetadataSet.TimeStep = 1; // to be compatible with ADIOS1.x

    // Initialization order: engine parameters, then transports, then the
    // first process group.
    InitParameters();
    InitTransports();
    InitProcessGroup();
}
// Write overloads taking a Variable<T> handle.
// Every primitive/complex overload forwards the variable and the caller's
// data pointer unchanged to WriteVariableCommon; the compound overload is an
// empty stub.

void BPFileWriter::Write(Variable<char> &variable, const char *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<unsigned char> &variable,
                         const unsigned char *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<short> &variable, const short *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<unsigned short> &variable,
                         const unsigned short *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<int> &variable, const int *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<unsigned int> &variable,
                         const unsigned int *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<long int> &variable, const long int *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<unsigned long int> &variable,
                         const unsigned long int *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<long long int> &variable,
                         const long long int *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<unsigned long long int> &variable,
                         const unsigned long long int *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<float> &variable, const float *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<double> &variable, const double *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<long double> &variable,
                         const long double *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<std::complex<float>> &variable,
                         const std::complex<float> *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<std::complex<double>> &variable,
                         const std::complex<double> *values)
{
    WriteVariableCommon(variable, values);
}

void BPFileWriter::Write(Variable<std::complex<long double>> &variable,
                         const std::complex<long double> *values)
{
    WriteVariableCommon(variable, values);
}

// Compound variables: intentionally a no-op in this implementation; both
// arguments are ignored.
void BPFileWriter::Write(VariableCompound & /*variable*/,
                         const void * /*values*/)
{
}
void BPFileWriter::Write(const std::string &variableName, const char *values)
WriteVariableCommon(m_ADIOS.GetVariable<char>(variableName), values);
void BPFileWriter::Write(const std::string &variableName,
WriteVariableCommon(m_ADIOS.GetVariable<unsigned char>(variableName),
values);
void BPFileWriter::Write(const std::string &variableName, const short *values)
WriteVariableCommon(m_ADIOS.GetVariable<short>(variableName), values);
void BPFileWriter::Write(const std::string &variableName,
WriteVariableCommon(m_ADIOS.GetVariable<unsigned short>(variableName),
values);
void BPFileWriter::Write(const std::string &variableName, const int *values)
WriteVariableCommon(m_ADIOS.GetVariable<int>(variableName), values);
void BPFileWriter::Write(const std::string &variableName,
WriteVariableCommon(m_ADIOS.GetVariable<unsigned int>(variableName),
values);
void BPFileWriter::Write(const std::string &variableName,
const long int *values)
WriteVariableCommon(m_ADIOS.GetVariable<long int>(variableName), values);
void BPFileWriter::Write(const std::string &variableName,
WriteVariableCommon(m_ADIOS.GetVariable<unsigned long int>(variableName),
values);
void BPFileWriter::Write(const std::string &variableName,
WriteVariableCommon(m_ADIOS.GetVariable<long long int>(variableName),
values);
void BPFileWriter::Write(const std::string &variableName,
WriteVariableCommon(
m_ADIOS.GetVariable<unsigned long long int>(variableName), values);
void BPFileWriter::Write(const std::string &variableName, const float *values)
WriteVariableCommon(m_ADIOS.GetVariable<float>(variableName), values);
void BPFileWriter::Write(const std::string &variableName, const double *values)
WriteVariableCommon(m_ADIOS.GetVariable<double>(variableName), values);
void BPFileWriter::Write(const std::string &variableName,
WriteVariableCommon(m_ADIOS.GetVariable<long double>(variableName), values);
void BPFileWriter::Write(const std::string &variableName,
WriteVariableCommon(m_ADIOS.GetVariable<std::complex<float>>(variableName),
values);
void BPFileWriter::Write(const std::string &variableName,
WriteVariableCommon(m_ADIOS.GetVariable<std::complex<double>>(variableName),
values);
void BPFileWriter::Write(const std::string &variableName,
const std::complex<long double> *values)
{
WriteVariableCommon(
m_ADIOS.GetVariable<std::complex<long double>>(variableName), values);
void BPFileWriter::Write(const std::string & /*variableName*/,
const void * /*values*/) // Compound type
void BPFileWriter::Advance(float /*timeout_sec*/)
m_BP1Writer.Advance(m_MetadataSet, m_Heap);
}
/**
 * Closes one transport, or all of them.
 *
 * @param transportIndex index into m_Transports of the transport to close;
 *        -1 closes every transport. The index is first passed to
 *        CheckTransportIndex.
 *
 * When profiling is active (m_MetadataSet.Log.IsActive) and no transport
 * reports m_IsOpen afterwards, the per-rank profiling log is built and handed
 * to m_BP1Aggregator.WriteProfilingLog, targeting
 * "<directory of m_Name>/profiling.log".
 */
void BPFileWriter::Close(const int transportIndex)
{
    CheckTransportIndex(transportIndex);

    if (transportIndex == -1)
    {
        for (auto &transport : m_Transports)
        { // by reference or value or it doesn't matter?
            m_BP1Writer.Close(m_MetadataSet, m_Heap, *transport, m_IsFirstClose,
                              false); // false: not using aggregation for now
        }
    }
    else
    {
        // Single-transport close: must not run for -1, which would index
        // m_Transports out of range.
        m_BP1Writer.Close(m_MetadataSet, m_Heap, *m_Transports[transportIndex],
                          m_IsFirstClose,
                          false); // false: not using aggregation for now
    }

    if (m_MetadataSet.Log.IsActive == true)
    {
        // The profiling log is written only once every transport is closed.
        bool allClose = true;
        for (auto &transport : m_Transports)
        {
            if (transport->m_IsOpen == true)
            {
                allClose = false;
                break;
            }
        }

        if (allClose == true) // aggregate and write profiling.log
        {
            const std::string rankLog = m_BP1Writer.GetRankProfilingLog(
                m_RankMPI, m_MetadataSet, m_Transports);

            const std::string fileName(m_BP1Writer.GetDirectoryName(m_Name) +
                                       "/profiling.log");
            m_BP1Aggregator.WriteProfilingLog(fileName, rankLog);
        }
    }
}
// PRIVATE FUNCTIONS
void BPFileWriter::InitParameters()
{
auto itGrowthFactor = m_Method.m_Parameters.find("buffer_growth");
if (itGrowthFactor != m_Method.m_Parameters.end())
const float growthFactor = std::stof(itGrowthFactor->second);
if (m_DebugMode == true)
{
if (growthFactor == 1.f)
{
throw std::invalid_argument("ERROR: buffer_growth argument "
"can't be less of equal than 1, "
"in " +
m_EndMessage + "\n");
}
}
m_BP1Writer.m_GrowthFactor = growthFactor;
m_GrowthFactor = growthFactor; // float
auto itMaxBufferSize = m_Method.m_Parameters.find("max_size_MB");
if (itMaxBufferSize != m_Method.m_Parameters.end())
if (m_DebugMode == true)
{
if (m_GrowthFactor <= 1.f)
{
throw std::invalid_argument(
"ERROR: Method buffer_growth argument "
"can't be less of equal than 1, in " +
m_EndMessage + "\n");
}
}
m_MaxBufferSize = std::stoul(itMaxBufferSize->second) *
1048576; // convert from MB to bytes
auto itVerbosity = m_Method.m_Parameters.find("verbose");
if (itVerbosity != m_Method.m_Parameters.end())
int verbosity = std::stoi(itVerbosity->second);
if (m_DebugMode == true)
{
if (verbosity < 0 || verbosity > 5)
{
throw std::invalid_argument(
"ERROR: Method verbose argument must be an "
"integer in the range [0,5], in call to "
"Open or Engine constructor\n");
}
}
m_BP1Writer.m_Verbosity = verbosity;
auto itProfile = m_Method.m_Parameters.find("profile_units");
if (itProfile != m_Method.m_Parameters.end())
auto &log = m_MetadataSet.Log;
if (itProfile->second == "mus" || itProfile->second == "microseconds")
{
log.Timers.emplace_back("buffering", Support::Resolutions::mus);
}
else if (itProfile->second == "ms" ||
itProfile->second == "milliseconds")
{
log.Timers.emplace_back("buffering", Support::Resolutions::ms);
}
else if (itProfile->second == "s" || itProfile->second == "seconds")
{
log.Timers.emplace_back("buffering", Support::Resolutions::s);
}
else if (itProfile->second == "min" || itProfile->second == "minutes")
{
log.Timers.emplace_back("buffering", Support::Resolutions::m);
}
else if (itProfile->second == "h" || itProfile->second == "hours")
{
log.Timers.emplace_back("buffering", Support::Resolutions::h);
}
else
{
if (m_DebugMode == true)
{
throw std::invalid_argument(
"ERROR: Method profile_buffering_units "
"argument must be mus, ms, s, min or h, in "
"call to Open or Engine constructor\n");
}
}
log.IsActive = true;
}
/**
 * Creates and opens one transport per entry of m_Method.m_TransportParameters.
 *
 * For every parameter set:
 *  - "profile_units" (optional) enables transport profiling with the given
 *    resolution (mus, ms, s, min, h or their long names).
 *  - "transport" must be "file" or "File".
 *  - "library" (optional) selects the file backend: POSIX (default when the
 *    key is absent), "FILE*"/"stdio", "fstream"/"std::fstream". Each created
 *    transport is opened through m_BP1Writer.OpenRankFiles and appended to
 *    m_Transports. "MPI_File"/"MPI-IO" is accepted but creates nothing here.
 *
 * @throws std::invalid_argument when TransportNamesUniqueness() reports
 *         duplicates, and — in debug mode only — for unknown profile units,
 *         file libraries, or transport kinds.
 */
void BPFileWriter::InitTransports()
{
    if (TransportNamesUniqueness() == false)
    {
        throw std::invalid_argument(
            "ERROR: two transports of the same kind (e.g file IO) "
            "can't have the same name, modify with name= in Method "
            "AddTransport\n");
    }

    for (const auto &parameters : m_Method.m_TransportParameters)
    {
        auto itProfile = parameters.find("profile_units");
        bool doProfiling = false;
        Support::Resolutions resolution =
            Support::Resolutions::s; // default is seconds

        if (itProfile != parameters.end())
        {
            if (itProfile->second == "mus" ||
                itProfile->second == "microseconds")
            {
                resolution = Support::Resolutions::mus;
            }
            else if (itProfile->second == "ms" ||
                     itProfile->second == "milliseconds")
            {
                resolution = Support::Resolutions::ms;
            }
            else if (itProfile->second == "s" || itProfile->second == "seconds")
            {
                resolution = Support::Resolutions::s;
            }
            else if (itProfile->second == "min" ||
                     itProfile->second == "minutes")
            {
                resolution = Support::Resolutions::m;
            }
            else if (itProfile->second == "h" || itProfile->second == "hours")
            {
                resolution = Support::Resolutions::h;
            }
            else
            {
                if (m_DebugMode == true)
                {
                    throw std::invalid_argument(
                        "ERROR: Transport profile_units argument "
                        "must be mus, ms, s, min or h " +
                        m_EndMessage);
                }
            }
            doProfiling = true;
        }

        // NOTE(review): itTransport is dereferenced without an end() check;
        // presumably every parameter set carries a "transport" key — confirm
        // against the code that fills m_TransportParameters.
        auto itTransport = parameters.find("transport");
        if (itTransport->second == "file" || itTransport->second == "File")
        {
            auto itLibrary = parameters.find("library");
            if (itLibrary == parameters.end() ||
                itLibrary->second == "POSIX") // use default POSIX
            {
                auto file = std::make_shared<transport::FileDescriptor>(
                    m_MPIComm, m_DebugMode);
                if (doProfiling == true)
                {
                    file->InitProfiler(m_AccessMode, resolution);
                }

                m_BP1Writer.OpenRankFiles(m_Name, m_AccessMode, *file);
                m_Transports.push_back(std::move(file));
            }
            else if (itLibrary->second == "FILE*" ||
                     itLibrary->second == "stdio")
            {
                auto file = std::make_shared<transport::FilePointer>(
                    m_MPIComm, m_DebugMode);
                if (doProfiling == true)
                {
                    file->InitProfiler(m_AccessMode, resolution);
                }

                m_BP1Writer.OpenRankFiles(m_Name, m_AccessMode, *file);
                m_Transports.push_back(std::move(file));
            }
            else if (itLibrary->second == "fstream" ||
                     itLibrary->second == "std::fstream")
            {
                auto file = std::make_shared<transport::FStream>(m_MPIComm,
                                                                 m_DebugMode);
                if (doProfiling == true)
                {
                    file->InitProfiler(m_AccessMode, resolution);
                }

                m_BP1Writer.OpenRankFiles(m_Name, m_AccessMode, *file);
                m_Transports.push_back(std::move(file));
            }
            else if (itLibrary->second == "MPI_File" ||
                     itLibrary->second == "MPI-IO")
            {
                // Recognized library name, but no transport is created here.
            }
            else
            {
                if (m_DebugMode == true)
                {
                    throw std::invalid_argument(
                        "ERROR: file transport library " + itLibrary->second +
                        " not supported, in " + m_Name + m_EndMessage);
                }
            }
        }
        else
        {
            // Only file transports are handled by this engine.
            if (m_DebugMode == true)
            {
                throw std::invalid_argument(
                    "ERROR: transport " + itTransport->second +
                    " (you mean File?) not supported, in " + m_Name +
                    m_EndMessage);
            }
        }
    }
}
/**
 * Prepares the first process group.
 *
 * When profiling is active, the first timer in m_MetadataSet.Log.Timers is
 * started before, and stopped after, the (currently empty) append-mode
 * handling.
 */
void BPFileWriter::InitProcessGroup()
{
    auto &profilingLog = m_MetadataSet.Log;

    if (profilingLog.IsActive)
    {
        profilingLog.Timers[0].SetInitialTime();
    }

    if (m_AccessMode == "a")
    {
        // Get last pg timestep and update timestep counter in
        // format::BP1MetadataSet
    }

    if (profilingLog.IsActive)
    {
        profilingLog.Timers[0].SetTime();
    }
}
void BPFileWriter::WriteProcessGroupIndex()
{
// pg = process group
// const std::size_t pgIndexSize = m_BP1Writer.GetProcessGroupIndexSize(
// std::to_string( m_RankMPI ),
// std::to_string(
// m_MetadataSet.TimeStep
// ),
// m_Transports.size()
// );
// metadata
// GrowBuffer( pgIndexSize, m_GrowthFactor, m_MetadataSet.PGIndex );
// data? Need to be careful, maybe add some trailing tolerance in variable
// ????
// GrowBuffer( pgIndexSize, m_GrowthFactor, m_Buffer.m_Data );
const bool isFortran = (m_HostLanguage == "Fortran") ? true : false;
m_BP1Writer.WriteProcessGroupIndex(isFortran, std::to_string(m_RankMPI),
static_cast<std::uint32_t>(m_RankMPI),
m_Transports, m_Heap, m_MetadataSet);