/* * Distributed under the OSI-approved Apache License, Version 2.0. See * accompanying file Copyright.txt for details. * * BPFileWriter.cpp * * Created on: Dec 19, 2016 * Author: wfg */ #include "engine/bp/BPFileWriter.h" #include "ADIOS.h" // supported transports #include "transport/file/FStream.h" #include "transport/file/FileDescriptor.h" #include "transport/file/FilePointer.h" namespace adios { BPFileWriter::BPFileWriter(ADIOS &adios, const std::string name, const std::string accessMode, MPI_Comm mpiComm, const Method &method, const IOMode iomode, const float timeout_sec, const bool debugMode, const unsigned int nthreads) : Engine(adios, "BPFileWriter", name, accessMode, mpiComm, method, debugMode, nthreads, " BPFileWriter constructor (or call to ADIOS Open).\n"), m_Buffer{capsule::STLVector(accessMode, m_RankMPI, m_DebugMode)}, m_BP1Aggregator{format::BP1Aggregator(m_MPIComm, debugMode)}, m_MaxBufferSize{m_Buffer.m_Data.max_size()} { m_MetadataSet.TimeStep = 1; // starting at one to be compatible with ADIOS1.x Init(); } BPFileWriter::~BPFileWriter() {} void BPFileWriter::Init() { InitParameters(); InitTransports(); InitProcessGroup(); } void BPFileWriter::Write(Variable<char> &variable, const char *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<unsigned char> &variable, const unsigned char *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<short> &variable, const short *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<unsigned short> &variable, const unsigned short *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<int> &variable, const int *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<unsigned int> &variable, const unsigned int *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<long int> &variable, const long int *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<unsigned long int> &variable, const unsigned long int *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<long long int> &variable, const long long int *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<unsigned long long int> &variable, const unsigned long long int *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<float> &variable, const float *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<double> &variable, const double *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<long double> &variable, const long double *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<std::complex<float>> &variable, const std::complex<float> *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<std::complex<double>> &variable, const std::complex<double> *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(Variable<std::complex<long double>> &variable, const std::complex<long double> *values) { WriteVariableCommon(variable, values); } void BPFileWriter::Write(VariableCompound &variable, const void *values) {} // String version void BPFileWriter::Write(const std::string variableName, const char *values) { WriteVariableCommon(m_ADIOS.GetVariable<char>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const unsigned char *values) { WriteVariableCommon(m_ADIOS.GetVariable<unsigned char>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const short *values) { WriteVariableCommon(m_ADIOS.GetVariable<short>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const unsigned short *values) { WriteVariableCommon(m_ADIOS.GetVariable<unsigned short>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const int *values) { WriteVariableCommon(m_ADIOS.GetVariable<int>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const unsigned int *values) { WriteVariableCommon(m_ADIOS.GetVariable<unsigned int>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const long int *values) { WriteVariableCommon(m_ADIOS.GetVariable<long int>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const unsigned long int *values) { WriteVariableCommon(m_ADIOS.GetVariable<unsigned long int>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const long long int *values) { WriteVariableCommon(m_ADIOS.GetVariable<long long int>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const unsigned long long int *values) { WriteVariableCommon(m_ADIOS.GetVariable<unsigned long long int>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const float *values) { WriteVariableCommon(m_ADIOS.GetVariable<float>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const double *values) { WriteVariableCommon(m_ADIOS.GetVariable<double>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const long double *values) { WriteVariableCommon(m_ADIOS.GetVariable<long double>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const std::complex<float> *values) { WriteVariableCommon(m_ADIOS.GetVariable<std::complex<float>>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const std::complex<double> *values) { WriteVariableCommon(m_ADIOS.GetVariable<std::complex<double>>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const std::complex<long double> *values) { WriteVariableCommon( m_ADIOS.GetVariable<std::complex<long double>>(variableName), values); } void BPFileWriter::Write(const std::string variableName, const void *values) // Compound type { } void BPFileWriter::Advance(float timeout_sec) { m_BP1Writer.Advance(m_MetadataSet, m_Buffer); } void BPFileWriter::Close(const int transportIndex) { CheckTransportIndex(transportIndex); if (transportIndex == -1) { for (auto &transport : m_Transports) // by reference or value or it doesn't matter? m_BP1Writer.Close(m_MetadataSet, m_Buffer, *transport, m_IsFirstClose, false); // false: not using aggregation for now } else { m_BP1Writer.Close(m_MetadataSet, m_Buffer, *m_Transports[transportIndex], m_IsFirstClose, false); // false: not using aggregation for now } if (m_MetadataSet.Log.m_IsActive == true) { bool allClose = true; for (auto &transport : m_Transports) { if (transport->m_IsOpen == true) { allClose = false; break; } } if (allClose == true) // aggregate and write profiling.log { const std::string rankLog = m_BP1Writer.GetRankProfilingLog( m_RankMPI, m_MetadataSet, m_Transports); const std::string fileName(m_BP1Writer.GetDirectoryName(m_Name) + "/profiling.log"); m_BP1Aggregator.WriteProfilingLog(fileName, rankLog); } } } // PRIVATE FUNCTIONS void BPFileWriter::InitParameters() { auto itGrowthFactor = m_Method.m_Parameters.find("buffer_growth"); if (itGrowthFactor != m_Method.m_Parameters.end()) { const float growthFactor = std::stof(itGrowthFactor->second); if (m_DebugMode == true) { if (growthFactor == 1.f) throw std::invalid_argument( "ERROR: buffer_growth argument can't be less of equal than 1, in " + m_EndMessage + "\n"); } m_BP1Writer.m_GrowthFactor = growthFactor; m_GrowthFactor = growthFactor; // float } auto itMaxBufferSize = m_Method.m_Parameters.find("max_size_MB"); if (itMaxBufferSize != m_Method.m_Parameters.end()) { if (m_DebugMode == true) { if (m_GrowthFactor <= 1.f) throw std::invalid_argument("ERROR: Method buffer_growth argument " "can't be less of equal than 1, in " + m_EndMessage + "\n"); } m_MaxBufferSize = std::stoul(itMaxBufferSize->second) * 1048576; // convert from MB to bytes } auto itVerbosity = m_Method.m_Parameters.find("verbose"); if (itVerbosity != m_Method.m_Parameters.end()) { int verbosity = std::stoi(itVerbosity->second); if (m_DebugMode == true) { if (verbosity < 0 || verbosity > 5) throw std::invalid_argument("ERROR: Method verbose argument must be an " "integer in the range [0,5], in call to " "Open or Engine constructor\n"); } m_BP1Writer.m_Verbosity = verbosity; } auto itProfile = m_Method.m_Parameters.find("profile_units"); if (itProfile != m_Method.m_Parameters.end()) { auto &profiler = m_MetadataSet.Log; if (itProfile->second == "mus" || itProfile->second == "microseconds") profiler.m_Timers.emplace_back("buffering", Support::Resolutions::mus); else if (itProfile->second == "ms" || itProfile->second == "milliseconds") profiler.m_Timers.emplace_back("buffering", Support::Resolutions::ms); else if (itProfile->second == "s" || itProfile->second == "seconds") profiler.m_Timers.emplace_back("buffering", Support::Resolutions::s); else if (itProfile->second == "min" || itProfile->second == "minutes") profiler.m_Timers.emplace_back("buffering", Support::Resolutions::m); else if (itProfile->second == "h" || itProfile->second == "hours") profiler.m_Timers.emplace_back("buffering", Support::Resolutions::h); else { if (m_DebugMode == true) throw std::invalid_argument("ERROR: Method profile_buffering_units " "argument must be mus, ms, s, min or h, in " "call to Open or Engine constructor\n"); } profiler.m_IsActive = true; } } void BPFileWriter::InitTransports() { if (m_DebugMode == true) { if (TransportNamesUniqueness() == false) { throw std::invalid_argument( "ERROR: two transports of the same kind (e.g file IO) " "can't have the same name, modify with name= in Method " "AddTransport\n"); } } for (const auto ¶meters : m_Method.m_TransportParameters) { auto itProfile = parameters.find("profile_units"); bool doProfiling = false; Support::Resolutions resolution = Support::Resolutions::s; // default is seconds if (itProfile != parameters.end()) { if (itProfile->second == "mus" || itProfile->second == "microseconds") resolution = Support::Resolutions::mus; else if (itProfile->second == "ms" || itProfile->second == "milliseconds") resolution = Support::Resolutions::ms; else if (itProfile->second == "s" || itProfile->second == "seconds") resolution = Support::Resolutions::s; else if (itProfile->second == "min" || itProfile->second == "minutes") resolution = Support::Resolutions::m; else if (itProfile->second == "h" || itProfile->second == "hours") resolution = Support::Resolutions::h; else { if (m_DebugMode == true) throw std::invalid_argument("ERROR: Transport profile_units argument " "must be mus, ms, s, min or h " + m_EndMessage); } doProfiling = true; } auto itTransport = parameters.find("transport"); if (itTransport->second == "file" || itTransport->second == "File") { auto itLibrary = parameters.find("library"); if (itLibrary == parameters.end() || itLibrary->second == "POSIX") // use default POSIX { auto file = std::make_shared<transport::FileDescriptor>(m_MPIComm, m_DebugMode); if (doProfiling == true) file->InitProfiler(m_AccessMode, resolution); m_BP1Writer.OpenRankFiles(m_Name, m_AccessMode, *file); m_Transports.push_back(std::move(file)); } else if (itLibrary->second == "FILE*" || itLibrary->second == "stdio") { auto file = std::make_shared<transport::FilePointer>(m_MPIComm, m_DebugMode); if (doProfiling == true) file->InitProfiler(m_AccessMode, resolution); m_BP1Writer.OpenRankFiles(m_Name, m_AccessMode, *file); m_Transports.push_back(std::move(file)); } else if (itLibrary->second == "fstream" || itLibrary->second == "std::fstream") { auto file = std::make_shared<transport::FStream>(m_MPIComm, m_DebugMode); if (doProfiling == true) file->InitProfiler(m_AccessMode, resolution); m_BP1Writer.OpenRankFiles(m_Name, m_AccessMode, *file); m_Transports.push_back(std::move(file)); } else if (itLibrary->second == "MPI_File" || itLibrary->second == "MPI-IO") { } else { if (m_DebugMode == true) throw std::invalid_argument( "ERROR: file transport library " + itLibrary->second + " not supported, in " + m_Name + m_EndMessage); } } else { if (m_DebugMode == true) throw std::invalid_argument("ERROR: transport " + itTransport->second + " (you mean File?) not supported, in " + m_Name + m_EndMessage); } } } void BPFileWriter::InitProcessGroup() { if (m_MetadataSet.Log.m_IsActive == true) m_MetadataSet.Log.m_Timers[0].SetInitialTime(); if (m_AccessMode == "a") { // Get last pg timestep and update timestep counter in // format::BP1MetadataSet } WriteProcessGroupIndex(); if (m_MetadataSet.Log.m_IsActive == true) m_MetadataSet.Log.m_Timers[0].SetTime(); } void BPFileWriter::WriteProcessGroupIndex() { // pg = process group // const std::size_t pgIndexSize = m_BP1Writer.GetProcessGroupIndexSize( // std::to_string( m_RankMPI ), // std::to_string( // m_MetadataSet.TimeStep // ), // m_Transports.size() // ); // metadata // GrowBuffer( pgIndexSize, m_GrowthFactor, m_MetadataSet.PGIndex ); // data? Need to be careful, maybe add some trailing tolerance in variable // ???? // GrowBuffer( pgIndexSize, m_GrowthFactor, m_Buffer.m_Data ); const bool isFortran = (m_HostLanguage == "Fortran") ? true : false; m_BP1Writer.WriteProcessGroupIndex(isFortran, std::to_string(m_RankMPI), static_cast<std::uint32_t>(m_RankMPI), m_Transports, m_Buffer, m_MetadataSet); } } // end namespace adios