Skip to content
Snippets Groups Projects
BP1Writer.cpp 16.3 KiB
Newer Older
 * Distributed under the OSI-approved Apache License, Version 2.0.  See
 * accompanying file Copyright.txt for details.
 *
 * BP1Writer.cpp
 *
 *  Created on: Feb 1, 2017
 *      Author: William F Godoy godoywf@ornl.gov
#include <string>
#include <vector>

namespace adios2
{
namespace format
{

BP1Writer::BP1Writer(MPI_Comm mpiComm, const bool debugMode)
: BP1Base(mpiComm, debugMode)
{
}

void BP1Writer::WriteProcessGroupIndex(
    const std::string hostLanguage,
    const std::vector<std::string> &transportsTypes) noexcept
William F Godoy's avatar
William F Godoy committed
    if (m_Profiler.IsActive)
    {
        m_Profiler.Timers.at("buffering").Resume();
    }

    std::vector<char> &metadataBuffer = m_MetadataSet.PGIndex.Buffer;
    std::vector<char> &dataBuffer = m_HeapBuffer.m_Data;
    size_t &dataPosition = m_HeapBuffer.m_DataPosition;

    m_MetadataSet.DataPGLengthPosition = dataPosition;
    dataPosition += 8; // skip pg length (8)
wfg's avatar
wfg committed

    const std::size_t metadataPGLengthPosition = metadataBuffer.size();
    metadataBuffer.insert(metadataBuffer.end(), 2, 0); // skip pg length (2)

    // write name to metadata
    const std::string name(std::to_string(m_BP1Aggregator.m_RankMPI));

wfg's avatar
wfg committed
    WriteNameRecord(name, metadataBuffer);
    // write if host language Fortran in metadata and data
    const char hostFortran = (hostLanguage == "Fortran") ? 'y' : 'n';
    InsertToBuffer(metadataBuffer, &hostFortran);
    CopyToBuffer(dataBuffer, dataPosition, &hostFortran);
wfg's avatar
wfg committed
    // write name in data
    WriteNameRecord(name, dataBuffer, dataPosition);
wfg's avatar
wfg committed

    // processID in metadata,
    const uint32_t processID = static_cast<uint32_t>(m_BP1Aggregator.m_RankMPI);
    InsertToBuffer(metadataBuffer, &processID);
wfg's avatar
wfg committed
    // skip coordination var in data ....what is coordination var?
    dataPosition += 4;
wfg's avatar
wfg committed

    // time step name to metadata and data
    const std::string timeStepName(std::to_string(m_MetadataSet.TimeStep));
wfg's avatar
wfg committed
    WriteNameRecord(timeStepName, metadataBuffer);
    WriteNameRecord(timeStepName, dataBuffer, dataPosition);
wfg's avatar
wfg committed

    // time step to metadata and data
    InsertToBuffer(metadataBuffer, &m_MetadataSet.TimeStep);
    CopyToBuffer(dataBuffer, dataPosition, &m_MetadataSet.TimeStep);
wfg's avatar
wfg committed

    // offset to pg in data in metadata which is the current absolute position
    InsertToBuffer(metadataBuffer, reinterpret_cast<uint64_t *>(
                                       &m_HeapBuffer.m_DataAbsolutePosition));
wfg's avatar
wfg committed

    // Back to writing metadata pg index length (length of group)
    const uint16_t metadataPGIndexLength =
        metadataBuffer.size() - metadataPGLengthPosition - 2;
    size_t backPosition = metadataPGLengthPosition;
    CopyToBuffer(metadataBuffer, backPosition, &metadataPGIndexLength);
wfg's avatar
wfg committed
    // DONE With metadataBuffer

    // here write method in data
    const std::vector<uint8_t> methodIDs = GetTransportIDs(transportsTypes);
    const uint8_t methodsCount = methodIDs.size();
    CopyToBuffer(dataBuffer, dataPosition, &methodsCount); // count
    // methodID (1) + method params length(2), no parameters for now
    const uint16_t methodsLength = methodIDs.size() * 3;

    CopyToBuffer(dataBuffer, dataPosition, &methodsLength); // length
wfg's avatar
wfg committed

    for (const auto methodID : methodIDs)
    {
        CopyToBuffer(dataBuffer, dataPosition, &methodID); // method ID,
        dataPosition += 2; // skip method params length = 0 (2 bytes) for now
wfg's avatar
wfg committed
    }

    // update absolute position
    m_HeapBuffer.m_DataAbsolutePosition +=
        dataPosition - m_MetadataSet.DataPGLengthPosition;
wfg's avatar
wfg committed
    // pg vars count and position
    m_MetadataSet.DataPGVarsCount = 0;
    m_MetadataSet.DataPGVarsCountPosition = dataPosition;
wfg's avatar
wfg committed
    // add vars count and length
    dataPosition += 12;
    m_HeapBuffer.m_DataAbsolutePosition += 12; // add vars count and length
    ++m_MetadataSet.DataPGCount;
    m_MetadataSet.DataPGIsOpen = true;
William F Godoy's avatar
William F Godoy committed
    if (m_Profiler.IsActive)
    {
        m_Profiler.Timers.at("buffering").Pause();
    }
void BP1Writer::Advance()
{
William F Godoy's avatar
William F Godoy committed
    // enforce memory policy here to restrict buffer size for each timestep
    // this is flushing

    if (m_MaxBufferSize == DefaultMaxBufferSize)
    {
        // current position + 1Kb chunk tolerance
        m_MaxBufferSize = m_HeapBuffer.m_DataPosition + 64;
    }

William F Godoy's avatar
William F Godoy committed
    if (m_Profiler.IsActive)
    {
        m_Profiler.Timers.at("buffering").Resume();
    }
    FlattenData();
    ++m_MetadataSet.TimeStep;

    if (m_Profiler.IsActive)
    {
        m_Profiler.Timers.at("buffering").Pause();
    }
}

void BP1Writer::Flush()
{
    if (m_Profiler.IsActive)
    {
        m_Profiler.Timers.at("buffering").Resume();
    }

William F Godoy's avatar
William F Godoy committed
    if (m_Profiler.IsActive)
    {
        m_Profiler.Timers.at("buffering").Pause();
    }
}

void BP1Writer::Close() noexcept
William F Godoy's avatar
William F Godoy committed
    if (m_Profiler.IsActive)
        m_Profiler.Timers.at("buffering").Resume();
William F Godoy's avatar
William F Godoy committed
    if (!m_IsClosed)
William F Godoy's avatar
William F Godoy committed
        if (m_MetadataSet.DataPGIsOpen)
        FlattenMetadata();

        m_Profiler.Bytes.at("buffering") += m_HeapBuffer.m_DataAbsolutePosition;

        m_IsClosed = true;
    }

William F Godoy's avatar
William F Godoy committed
    if (m_Profiler.IsActive)
    {
        m_Profiler.Timers.at("buffering").Pause();
    }
}
std::string BP1Writer::GetRankProfilingJSON(
    const std::vector<std::string> &transportsTypes,
    const std::vector<profiling::IOChrono *> &transportsProfilers) noexcept
{
    auto lf_WriterTimer = [](std::string &rankLog,
                             const profiling::Timer &timer) {
        rankLog += "\"" + timer.m_Process + "_" + timer.GetShortUnits() +
                   "\": " + std::to_string(timer.m_ProcessTime) + ", ";
    };

    // prepare string dictionary per rank
    std::string rankLog("{ \"rank\": " +
                        std::to_string(m_BP1Aggregator.m_RankMPI) + ", ");

    auto &profiler = m_Profiler;

    std::string timeDate(profiler.Timers.at("buffering").m_LocalTimeDate);
    timeDate.pop_back();
    // avoid whitespace
    std::replace(timeDate.begin(), timeDate.end(), ' ', '_');
    rankLog += "\"start\": \"" + timeDate + "\", ";
    rankLog += "\"threads\": " + std::to_string(m_Threads) + ", ";
    rankLog +=
        "\"bytes\": " + std::to_string(profiler.Bytes.at("buffering")) + ", ";
    lf_WriterTimer(rankLog, profiler.Timers.at("buffering"));

    const size_t transportsSize = transportsTypes.size();

    for (unsigned int t = 0; t < transportsSize; ++t)
    {
        rankLog += "\"transport_" + std::to_string(t) + "\": { ";
        rankLog += "\"type\": \"" + transportsTypes[t] + "\", ";

        for (const auto &transportTimerPair : transportsProfilers[t]->Timers)
            lf_WriterTimer(rankLog, transportTimerPair.second);
        // replace last comma with space
        rankLog.pop_back();
        rankLog.pop_back();
        rankLog += " ";
        if (t == transportsSize - 1) // last element
            rankLog += "}";
        }
        else
        {
            rankLog += "},";
    rankLog += " }"; // end rank entry
    return rankLog;
std::string
BP1Writer::AggregateProfilingJSON(const std::string &rankProfilingLog) noexcept
    return m_BP1Aggregator.GetGlobalProfilingJSON(rankProfilingLog);
// PRIVATE FUNCTIONS
void BP1Writer::WriteDimensionsRecord(const Dims localDimensions,
                                      const Dims globalDimensions,
                                      const Dims offsets,
                                      std::vector<char> &buffer) noexcept
William F Godoy's avatar
William F Godoy committed
    if (offsets.empty())
        for (const auto &localDimension : localDimensions)
            InsertToBuffer(buffer,
                           reinterpret_cast<const uint64_t *>(&localDimension));
            buffer.insert(buffer.end(), 2 * sizeof(uint64_t), 0);
    }
    else
    {
        for (unsigned int d = 0; d < localDimensions.size(); ++d)
            InsertToBuffer(buffer, reinterpret_cast<const uint64_t *>(
                                       &localDimensions[d]));
            InsertToBuffer(buffer, reinterpret_cast<const uint64_t *>(
                                       &globalDimensions[d]));
            InsertToBuffer(buffer,
                           reinterpret_cast<const uint64_t *>(&offsets[d]));
void BP1Writer::WriteDimensionsRecord(const Dims localDimensions,
                                      const Dims globalDimensions,
                                      const Dims offsets,
                                      std::vector<char> &buffer,
                                      size_t &position,
                                      const bool isCharacteristic) noexcept
    auto lf_CopyDimension = [](std::vector<char> &buffer, size_t &position,
                               const size_t dimension,
                               const bool isCharacteristic) {
        if (!isCharacteristic)
        {
            constexpr char no = 'n';
            CopyToBuffer(buffer, position, &no);
        }

        CopyToBuffer(buffer, position,
                     reinterpret_cast<const uint64_t *>(&dimension));
wfg's avatar
wfg committed
    };

    // BODY Starts here
William F Godoy's avatar
William F Godoy committed
    if (offsets.empty())
        unsigned int globalBoundsSkip = 18;
        if (isCharacteristic)
        {
            globalBoundsSkip = 16;
        }

        for (const auto &localDimension : localDimensions)
            lf_CopyDimension(buffer, position, localDimension,
                             isCharacteristic);
            position += globalBoundsSkip;
        for (unsigned int d = 0; d < localDimensions.size(); ++d)
            lf_CopyDimension(buffer, position, localDimensions[d],
                             isCharacteristic);
            lf_CopyDimension(buffer, position, globalDimensions[d],
                             isCharacteristic);
            lf_CopyDimension(buffer, position, offsets[d], isCharacteristic);
    }
}

void BP1Writer::WriteNameRecord(const std::string name,
                                std::vector<char> &buffer) noexcept
    const uint16_t length = name.length();
    InsertToBuffer(buffer, &length);
    InsertToBuffer(buffer, name.c_str(), length);
void BP1Writer::WriteNameRecord(const std::string name,
                                std::vector<char> &buffer,
                                size_t &position) noexcept
{
    const uint16_t length = name.length();
    CopyToBuffer(buffer, position, &length);
    CopyToBuffer(buffer, position, name.c_str(), length);
}

BP1Index &
BP1Writer::GetBP1Index(const std::string name,
                       std::unordered_map<std::string, BP1Index> &indices,
                       bool &isNew) const noexcept
{
wfg's avatar
wfg committed
    auto itName = indices.find(name);
    if (itName == indices.end())
    {
        indices.emplace(name, BP1Index(indices.size()));
        isNew = true;
        return indices.at(name);
    }

    isNew = false;
    return itName->second;
void BP1Writer::FlattenData() noexcept
    auto &buffer = m_HeapBuffer.m_Data;
    auto &position = m_HeapBuffer.m_DataPosition;
wfg's avatar
wfg committed
    // vars count and Length (only for PG)
    CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition,
                 &m_MetadataSet.DataPGVarsCount);
    // without record itself and vars count
    const uint64_t varsLength =
        position - m_MetadataSet.DataPGVarsCountPosition - 8 - 4;
    CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition, &varsLength);
wfg's avatar
wfg committed

    // attributes (empty for now) count (4) and length (8) are zero by moving
    // positions in time step zero
    position += 12;
    m_HeapBuffer.m_DataAbsolutePosition += 12;
wfg's avatar
wfg committed

    // Finish writing pg group length
    // without record itself, 12 due to empty attributes
    const uint64_t dataPGLength =
        position - m_MetadataSet.DataPGLengthPosition - 8;
    CopyToBuffer(buffer, m_MetadataSet.DataPGLengthPosition, &dataPGLength);

    m_MetadataSet.DataPGIsOpen = false;
void BP1Writer::FlattenMetadata() noexcept
wfg's avatar
wfg committed
    auto lf_IndexCountLength =
        [](std::unordered_map<std::string, BP1Index> &indices, uint32_t &count,
           uint64_t &length) {
wfg's avatar
wfg committed

            count = indices.size();
            length = 0;
            for (auto &indexPair : indices) // set each index length
            {
                auto &indexBuffer = indexPair.second.Buffer;
                const uint32_t indexLength = indexBuffer.size() - 4;
                size_t indexLengthPosition = 0;
                CopyToBuffer(indexBuffer, indexLengthPosition, &indexLength);
wfg's avatar
wfg committed

                length += indexBuffer.size(); // overall length
            }
        };

    auto lf_FlattenIndices =
        [](const uint32_t count, const uint64_t length,
wfg's avatar
wfg committed
           const std::unordered_map<std::string, BP1Index> &indices,
           std::vector<char> &buffer, size_t &position) {

            CopyToBuffer(buffer, position, &count);
            CopyToBuffer(buffer, position, &length);
wfg's avatar
wfg committed

            for (const auto &indexPair : indices) // set each index length
            {
                const auto &indexBuffer = indexPair.second.Buffer;
                CopyToBuffer(buffer, position, indexBuffer.data(),
                             indexBuffer.size());
wfg's avatar
wfg committed
            }
        };

    // Finish writing metadata counts and lengths
    // PG Index
    const uint64_t pgCount = m_MetadataSet.DataPGCount;
    const uint64_t pgLength = m_MetadataSet.PGIndex.Buffer.size();
wfg's avatar
wfg committed

    // var index count and length (total), and each index length
    uint32_t varsCount;
    uint64_t varsLength;
    lf_IndexCountLength(m_MetadataSet.VarsIndices, varsCount, varsLength);
wfg's avatar
wfg committed
    // attribute index count and length, and each index length
    uint32_t attributesCount;
    uint64_t attributesLength;
    lf_IndexCountLength(m_MetadataSet.AttributesIndices, attributesCount,
wfg's avatar
wfg committed
                        attributesLength);

    const size_t footerSize = (pgLength + 16) + (varsLength + 12) +
                              (attributesLength + 12) +
                              m_MetadataSet.MiniFooterSize;

    auto &buffer = m_HeapBuffer.m_Data;
    auto &position = m_HeapBuffer.m_DataPosition;

    // reserve data to fit metadata,
    // must replace with growth buffer strategy
    m_HeapBuffer.ResizeData(position + footerSize);
    // buffer.resize(position + footerSize);
wfg's avatar
wfg committed

    // write pg index
    CopyToBuffer(buffer, position, &pgCount);
    CopyToBuffer(buffer, position, &pgLength);
    CopyToBuffer(buffer, position, m_MetadataSet.PGIndex.Buffer.data(),
                 pgLength);

wfg's avatar
wfg committed
    // Vars indices
    lf_FlattenIndices(varsCount, varsLength, m_MetadataSet.VarsIndices, buffer,
                      position);
wfg's avatar
wfg committed
    // Attribute indices
    lf_FlattenIndices(attributesCount, attributesLength,
                      m_MetadataSet.AttributesIndices, buffer, position);
wfg's avatar
wfg committed

    // getting absolute offsets, minifooter is 28 bytes for now
    const uint64_t offsetPGIndex = m_HeapBuffer.m_DataAbsolutePosition;
    const uint64_t offsetVarsIndex = offsetPGIndex + (pgLength + 16);
    const uint64_t offsetAttributeIndex = offsetVarsIndex + (varsLength + 12);
    CopyToBuffer(buffer, position, &offsetPGIndex);
    CopyToBuffer(buffer, position, &offsetVarsIndex);
    CopyToBuffer(buffer, position, &offsetAttributeIndex);
wfg's avatar
wfg committed

    // version
    if (IsLittleEndian())
    {
        const uint8_t endian = 0;
        CopyToBuffer(buffer, position, &endian);
        position += 2;
        CopyToBuffer(buffer, position, &m_Version);
    m_HeapBuffer.m_DataAbsolutePosition += footerSize;
William F Godoy's avatar
William F Godoy committed
    if (m_Profiler.IsActive)
        m_Profiler.Bytes.emplace("buffering",
                                 m_HeapBuffer.m_DataAbsolutePosition);
//------------------------------------------------------------------------------
// Explicit instantiation of only public templates

#define declare_template_instantiation(T)                                      \
    template void BP1Writer::WriteVariableMetadata(                            \
        const Variable<T> &variable) noexcept;                                 \
    template void BP1Writer::WriteVariablePayload(                             \
        const Variable<T> &variable) noexcept;
ADIOS2_FOREACH_TYPE_1ARG(declare_template_instantiation)
#undef declare_template_instantiation

//------------------------------------------------------------------------------

} // end namespace format
} // end namespace adios