Skip to content
Snippets Groups Projects
BPFileWriter.cpp 4.56 KiB
Newer Older
/*
 * Distributed under the OSI-approved Apache License, Version 2.0.  See
 * accompanying file Copyright.txt for details.
 *
 * BPFileWriter.cpp
 *
 *  Created on: Dec 19, 2016
 *      Author: William F Godoy godoywf@ornl.gov
 */
#include "BPFileWriter.tcc"
#include "adios2/ADIOSMPI.h"
#include "adios2/core/IO.h"
#include "adios2/helper/adiosFunctions.h" //CheckIndexRange
#include "adios2/toolkit/transport/file/FileStream.h"
namespace adios2
/**
 * Constructs the writer engine.
 *
 * Forwards all arguments to the Engine base class under the engine type
 * label "BPFileWriter", then builds the BP1 writer and the transports manager
 * with the same MPI communicator and debug flag.
 *
 * @param io       IO object this engine is created from
 * @param name     engine name, used in the error-message suffix
 * @param openMode mode the engine is opened with
 * @param mpiComm  MPI communicator shared with the BP1 writer and transports
 */
BPFileWriter::BPFileWriter(IO &io, const std::string &name,
                           const OpenMode openMode, MPI_Comm mpiComm)
: Engine("BPFileWriter", io, name, openMode, mpiComm),
  m_BP1Writer(mpiComm, m_DebugMode), m_TransportsManager(mpiComm, m_DebugMode)
{
    // Suffix appended to messages raised while opening this engine
    m_EndMessage = " in call to IO Open BPFileWriter " + m_Name + "\n";
    // NOTE(review): Init() is not invoked in the visible source; confirm
    // whether it must be called here or is triggered elsewhere.
}
// Compiler-generated destructor, defined out of line in this translation unit.
BPFileWriter::~BPFileWriter() = default;

/**
 * Runs the engine initialization steps in order: parameters, then transports,
 * then the BP buffer.
 */
void BPFileWriter::Init()
{
    InitParameters();
    InitTransports();
    InitBPBuffer();
}

// Stamps out one DoWrite overload per type supplied by
// ADIOS2_FOREACH_TYPE_1ARG; each overload forwards to DoWriteCommon.
#define define_do_write(Type)                                                  \
    void BPFileWriter::DoWrite(Variable<Type> &variable, const Type *values)   \
    {                                                                          \
        DoWriteCommon(variable, values);                                       \
    }
ADIOS2_FOREACH_TYPE_1ARG(define_do_write)
#undef define_do_write

void BPFileWriter::Advance(const float /*timeout_sec*/)
    m_BP1Writer.Advance();
}

void BPFileWriter::Close(const int transportIndex)
{
William F Godoy's avatar
William F Godoy committed
    if (m_DebugMode)
        if (!m_TransportsManager.CheckTransportIndex(transportIndex))
            auto transportsSize = m_TransportsManager.m_Transports.size();
            throw std::invalid_argument(
                "ERROR: transport index " + std::to_string(transportIndex) +
                " outside range, -1 (default) to " +
                std::to_string(transportsSize - 1) + ", in call to Close\n");
    // close bp buffer by flattening data and metadata
    m_BP1Writer.Close();
    // send data to corresponding transports
    m_TransportsManager.WriteFiles(m_BP1Writer.m_HeapBuffer.GetData(),
                                   m_BP1Writer.m_HeapBuffer.m_DataPosition,
                                   transportIndex);

    m_TransportsManager.CloseFiles(transportIndex);
    if (m_BP1Writer.m_Profiler.IsActive &&
        m_TransportsManager.AllTransportsClosed())
        WriteProfilingJSONFile();
    }
}

// PRIVATE FUNCTIONS
/** Hands the parameters stored in the IO object to the BP1 writer. */
void BPFileWriter::InitParameters()
{
    auto &ioParameters = m_IO.m_Parameters;
    m_BP1Writer.InitParameters(ioParameters);
}

/**
 * Opens the output files for every transport configured in the IO object.
 *
 * If the IO object carries no transport parameters, a single default entry
 * with "transport" set to "File" is appended to it first.
 */
void BPFileWriter::InitTransports()
{
    // TODO need to add support for aggregators here later
    if (m_IO.m_TransportsParameters.empty())
    {
        Params defaultTransportParameters;
        defaultTransportParameters["transport"] = "File";
        m_IO.m_TransportsParameters.push_back(defaultTransportParameters);
    }

    // Names are std::vector<std::string>
    auto transportsNames = m_TransportsManager.GetFilesBaseNames(
        m_Name, m_IO.m_TransportsParameters);
    auto bpBaseNames = m_BP1Writer.GetBPBaseNames(transportsNames);
    auto bpNames = m_BP1Writer.GetBPNames(transportsNames);

    // The profiler flag is passed along when opening the files
    m_TransportsManager.OpenFiles(bpBaseNames, bpNames, m_OpenMode,
                                  m_IO.m_TransportsParameters,
                                  m_BP1Writer.m_Profiler.IsActive);
}

void BPFileWriter::InitBPBuffer()
William F Godoy's avatar
William F Godoy committed
    if (m_OpenMode == OpenMode::Append)
        throw std::invalid_argument(
            "ADIOS2: OpenMode Append hasn't been implemented, yet");
        // TODO: Get last pg timestep and update timestep counter in
        m_BP1Writer.WriteProcessGroupIndex(
            m_IO.m_HostLanguage, m_TransportsManager.GetTransportsTypes());
void BPFileWriter::WriteProfilingJSONFile()
{
    auto transportTypes = m_TransportsManager.GetTransportsTypes();
    auto transportProfilers = m_TransportsManager.GetTransportsProfilers();

    const std::string lineJSON(
        m_BP1Writer.GetRankProfilingJSON(transportTypes, transportProfilers));

    const std::string profilingJSON(
        m_BP1Writer.AggregateProfilingJSON(lineJSON));

    if (m_BP1Writer.m_BP1Aggregator.m_RankMPI == 0)
    {
        transport::FileStream profilingJSONStream(m_MPIComm, m_DebugMode);
        auto bpBaseNames = m_BP1Writer.GetBPBaseNames({m_Name});
        profilingJSONStream.Open(bpBaseNames[0] + "/profiling.json",
                                 OpenMode::Write);
        profilingJSONStream.Write(profilingJSON.c_str(), profilingJSON.size());
        profilingJSONStream.Close();
    }
}

} // end namespace adios2