Skip to content
Snippets Groups Projects
BP1Writer.cpp 15.4 KiB
Newer Older
 * Distributed under the OSI-approved Apache License, Version 2.0.  See
 * accompanying file Copyright.txt for details.
 *
 * BP1Writer.cpp
 *
 *  Created on: Feb 1, 2017
 *      Author: wfg
 */

#include <string>
#include <vector>

namespace adios
{
namespace format
{

BP1Writer::BP1Writer(MPI_Comm mpiComm, const bool debugMode)
: BP1Base(mpiComm, debugMode)
{
}

std::size_t BP1Writer::GetProcessGroupIndexSize(
    const std::string name, const std::string timeStepName,
    const std::size_t numberOfTransports) const noexcept
{
wfg's avatar
wfg committed
    // pgIndex + list of methods (transports)
    return (name.length() + timeStepName.length() + 23) +
           (3 + numberOfTransports);
}

void BP1Writer::WriteProcessGroupIndex(
    const bool isFortran, const std::string name, const uint32_t processID,
    const std::vector<std::shared_ptr<Transport>> &transports) noexcept
    std::vector<char> &metadataBuffer = m_MetadataSet.PGIndex.Buffer;
    std::vector<char> &dataBuffer = m_Heap.m_Data;
    size_t &dataPosition = m_Heap.m_DataPosition;

    m_MetadataSet.DataPGLengthPosition = dataPosition;
    dataPosition += 8; // skip pg length (8)
wfg's avatar
wfg committed

    const std::size_t metadataPGLengthPosition = metadataBuffer.size();
    metadataBuffer.insert(metadataBuffer.end(), 2, 0); // skip pg length (2)

    // write name to metadata
    WriteNameRecord(name, metadataBuffer);
    // write if host language Fortran in metadata and data
    const char hostFortran = (isFortran) ? 'y' : 'n';
    InsertToBuffer(metadataBuffer, &hostFortran);
    CopyToBuffer(dataBuffer, dataPosition, &hostFortran);
wfg's avatar
wfg committed
    // write name in data
    WriteNameRecord(name, dataBuffer, dataPosition);
wfg's avatar
wfg committed

    // processID in metadata,
    InsertToBuffer(metadataBuffer, &processID);
wfg's avatar
wfg committed
    // skip coordination var in data ....what is coordination var?
    dataPosition += 4;
wfg's avatar
wfg committed

    // time step name to metadata and data
    const std::string timeStepName(std::to_string(m_MetadataSet.TimeStep));
wfg's avatar
wfg committed
    WriteNameRecord(timeStepName, metadataBuffer);
    WriteNameRecord(timeStepName, dataBuffer, dataPosition);
wfg's avatar
wfg committed

    // time step to metadata and data
    InsertToBuffer(metadataBuffer, &m_MetadataSet.TimeStep);
    CopyToBuffer(dataBuffer, dataPosition, &m_MetadataSet.TimeStep);
wfg's avatar
wfg committed

    // offset to pg in data in metadata which is the current absolute position
    InsertToBuffer(metadataBuffer, reinterpret_cast<uint64_t *>(
                                       &m_Heap.m_DataAbsolutePosition));
wfg's avatar
wfg committed

    // Back to writing metadata pg index length (length of group)
    const uint16_t metadataPGIndexLength =
        metadataBuffer.size() - metadataPGLengthPosition - 2;
    size_t backPosition = metadataPGLengthPosition;
    CopyToBuffer(metadataBuffer, backPosition, &metadataPGIndexLength);
wfg's avatar
wfg committed
    // DONE With metadataBuffer

    // here write method in data
    const std::vector<uint8_t> methodIDs = GetMethodIDs(transports);
    const uint8_t methodsCount = methodIDs.size();
    CopyToBuffer(dataBuffer, dataPosition, &methodsCount); // count
    // methodID (1) + method params length(2), no parameters for now
    const uint16_t methodsLength = methodIDs.size() * 3;

    CopyToBuffer(dataBuffer, dataPosition, &methodsLength); // length
wfg's avatar
wfg committed

    for (const auto methodID : methodIDs)
    {
        CopyToBuffer(dataBuffer, dataPosition, &methodID); // method ID,
        dataPosition += 2; // skip method params length = 0 (2 bytes) for now
wfg's avatar
wfg committed
    }

    // update absolute position
    m_Heap.m_DataAbsolutePosition +=
        dataPosition - m_MetadataSet.DataPGLengthPosition;
wfg's avatar
wfg committed
    // pg vars count and position
    m_MetadataSet.DataPGVarsCount = 0;
    m_MetadataSet.DataPGVarsCountPosition = dataPosition;
wfg's avatar
wfg committed
    // add vars count and length
    dataPosition += 12;
    m_Heap.m_DataAbsolutePosition += 12; // add vars count and length
    ++m_MetadataSet.DataPGCount;
    m_MetadataSet.DataPGIsOpen = true;
void BP1Writer::Advance() { FlattenData(); }
void BP1Writer::Close(Transport &transport, bool &isFirstClose,
                      const bool doAggregation) noexcept
    if (m_MetadataSet.Log.IsActive == true)
    {
        m_MetadataSet.Log.Timers[0].SetInitialTime();
    }
wfg's avatar
wfg committed
    if (isFirstClose == true)
    {
        if (m_MetadataSet.DataPGIsOpen == true)
        {
            FlattenData();
        }
        FlattenMetadata();
        if (m_MetadataSet.Log.IsActive == true)
        {
            m_MetadataSet.Log.Timers[0].SetInitialTime();
        }
        // N-to-M  where 1 <= M <= N-1, might need a new
        // Log metadataSet.Log.m_Timers just for aggregation
        if (doAggregation == true)
wfg's avatar
wfg committed
        {
            // here call aggregator
        }
        isFirstClose = false;
    }

    if (doAggregation == true) // N-to-M  where 1 <= M <= N-1
    {
        // here call aggregator to select transports for Write and Close
    }
    else // N-to-N
        // single write
        transport.Write(m_Heap.m_Data.data(), m_Heap.m_Data.size());
wfg's avatar
wfg committed
        transport.Close();
void BP1Writer::WriteProfilingLogFile(
    const std::string &name, const unsigned int rank,
    const std::vector<std::shared_ptr<Transport>> &transports) noexcept
    const std::string fileName(GetDirectoryName(name) + "/profiling.log");
    const std::string rankLog(GetRankProfilingLog(rank, transports));
    m_BP1Aggregator.WriteProfilingLog(fileName, rankLog);
}
// PRIVATE FUNCTIONS
void BP1Writer::WriteDimensionsRecord(
    const std::vector<size_t> &localDimensions,
    const std::vector<size_t> &globalDimensions,
    const std::vector<size_t> &offsets, std::vector<char> &buffer) noexcept
{
    if (offsets.empty() == true)
        for (const auto &localDimension : localDimensions)
            InsertToBuffer(buffer,
                           reinterpret_cast<const uint64_t *>(&localDimension));
            buffer.insert(buffer.end(), 2 * sizeof(uint64_t), 0);
    }
    else
    {
        for (unsigned int d = 0; d < localDimensions.size(); ++d)
            InsertToBuffer(buffer, reinterpret_cast<const uint64_t *>(
                                       &localDimensions[d]));
            InsertToBuffer(buffer, reinterpret_cast<const uint64_t *>(
                                       &globalDimensions[d]));
            InsertToBuffer(buffer,
                           reinterpret_cast<const uint64_t *>(&offsets[d]));
}

void BP1Writer::WriteDimensionsRecord(
    const std::vector<size_t> &localDimensions,
    const std::vector<size_t> &globalDimensions,
    const std::vector<size_t> &offsets, const unsigned int skip,
    std::vector<char> &buffer, size_t &position) noexcept
    auto lf_WriteFlaggedDim = [](std::vector<char> &buffer, size_t &position,
                                 const size_t dimension) {
        constexpr char no = 'n';
        CopyToBuffer(buffer, position, &no);
        CopyToBuffer(buffer, position,
                     reinterpret_cast<const uint64_t *>(&dimension));
wfg's avatar
wfg committed
    };

    // BODY Starts here
    if (offsets.empty() == true)
        for (const auto &localDimension : localDimensions)
            lf_WriteFlaggedDim(buffer, position, localDimension);
            position += skip;
        for (unsigned int d = 0; d < localDimensions.size(); ++d)
            lf_WriteFlaggedDim(buffer, position, localDimensions[d]);
            lf_WriteFlaggedDim(buffer, position, globalDimensions[d]);
            lf_WriteFlaggedDim(buffer, position, offsets[d]);
    }
}

void BP1Writer::WriteNameRecord(const std::string name,
                                std::vector<char> &buffer) noexcept
    const uint16_t length = name.length();
    InsertToBuffer(buffer, &length);
    InsertToBuffer(buffer, name.c_str(), length);
void BP1Writer::WriteNameRecord(const std::string name,
                                std::vector<char> &buffer,
                                size_t &position) noexcept
{
    const uint16_t length = name.length();
    CopyToBuffer(buffer, position, &length);
    CopyToBuffer(buffer, position, name.c_str(), length);
}

BP1Index &
BP1Writer::GetBP1Index(const std::string name,
                       std::unordered_map<std::string, BP1Index> &indices,
                       bool &isNew) const noexcept
{
wfg's avatar
wfg committed
    auto itName = indices.find(name);
    if (itName == indices.end())
    {
        indices.emplace(name, BP1Index(indices.size()));
        isNew = true;
        return indices.at(name);
    }

    isNew = false;
    return itName->second;
void BP1Writer::FlattenData() noexcept
    auto &buffer = m_Heap.m_Data;
    auto &position = m_Heap.m_DataPosition;

wfg's avatar
wfg committed
    // vars count and Length (only for PG)
    CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition,
                 &m_MetadataSet.DataPGVarsCount);
    // without record itself and vars count
    const uint64_t varsLength =
        position - m_MetadataSet.DataPGVarsCountPosition - 8 - 4;
    CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition, &varsLength);
wfg's avatar
wfg committed

    // attributes (empty for now) count (4) and length (8) are zero by moving
    // positions in time step zero
    position += 12;
    m_Heap.m_DataAbsolutePosition += 12;
wfg's avatar
wfg committed

    // Finish writing pg group length
    // without record itself, 12 due to empty attributes
    const uint64_t dataPGLength =
        position - m_MetadataSet.DataPGLengthPosition - 8;
    CopyToBuffer(buffer, m_MetadataSet.DataPGLengthPosition, &dataPGLength);

    ++m_MetadataSet.TimeStep;
    m_MetadataSet.DataPGIsOpen = false;
void BP1Writer::FlattenMetadata() noexcept
wfg's avatar
wfg committed
    auto lf_IndexCountLength =
        [](std::unordered_map<std::string, BP1Index> &indices, uint32_t &count,
           uint64_t &length) {
wfg's avatar
wfg committed

            count = indices.size();
            length = 0;
            for (auto &indexPair : indices) // set each index length
            {
                auto &indexBuffer = indexPair.second.Buffer;
                const uint32_t indexLength = indexBuffer.size() - 4;
                size_t indexLengthPosition = 0;
                CopyToBuffer(indexBuffer, indexLengthPosition, &indexLength);
wfg's avatar
wfg committed

                length += indexBuffer.size(); // overall length
            }
        };

    auto lf_FlattenIndices =
        [](const uint32_t count, const uint64_t length,
wfg's avatar
wfg committed
           const std::unordered_map<std::string, BP1Index> &indices,
           std::vector<char> &buffer, size_t &position) {

            CopyToBuffer(buffer, position, &count);
            CopyToBuffer(buffer, position, &length);
wfg's avatar
wfg committed

            for (const auto &indexPair : indices) // set each index length
            {
                const auto &indexBuffer = indexPair.second.Buffer;
                CopyToBuffer(buffer, position, indexBuffer.data(),
                             indexBuffer.size());
wfg's avatar
wfg committed
            }
        };

    // Finish writing metadata counts and lengths
    // PG Index
    const uint64_t pgCount = m_MetadataSet.DataPGCount;
    const uint64_t pgLength = m_MetadataSet.PGIndex.Buffer.size();
wfg's avatar
wfg committed

    // var index count and length (total), and each index length
    uint32_t varsCount;
    uint64_t varsLength;
    lf_IndexCountLength(m_MetadataSet.VarsIndices, varsCount, varsLength);
wfg's avatar
wfg committed
    // attribute index count and length, and each index length
    uint32_t attributesCount;
    uint64_t attributesLength;
    lf_IndexCountLength(m_MetadataSet.AttributesIndices, attributesCount,
wfg's avatar
wfg committed
                        attributesLength);

    const size_t footerSize = (pgLength + 16) + (varsLength + 12) +
                              (attributesLength + 12) +
                              m_MetadataSet.MiniFooterSize;

    auto &buffer = m_Heap.m_Data;
    auto &position = m_Heap.m_DataPosition;

    // reserve data to fit metadata,
    buffer.resize(position + footerSize);
    // must replace with growth buffer strategy?
wfg's avatar
wfg committed

    // write pg index
    CopyToBuffer(buffer, position, &pgCount);
    CopyToBuffer(buffer, position, &pgLength);
    CopyToBuffer(buffer, position, m_MetadataSet.PGIndex.Buffer.data(),
                 pgLength);

wfg's avatar
wfg committed
    // Vars indices
    lf_FlattenIndices(varsCount, varsLength, m_MetadataSet.VarsIndices, buffer,
                      position);
wfg's avatar
wfg committed
    // Attribute indices
    lf_FlattenIndices(attributesCount, attributesLength,
                      m_MetadataSet.AttributesIndices, buffer, position);
wfg's avatar
wfg committed

    // getting absolute offsets, minifooter is 28 bytes for now
    const uint64_t offsetPGIndex = m_Heap.m_DataAbsolutePosition;
    const uint64_t offsetVarsIndex = offsetPGIndex + (pgLength + 16);
    const uint64_t offsetAttributeIndex = offsetVarsIndex + (varsLength + 12);
    CopyToBuffer(buffer, position, &offsetPGIndex);
    CopyToBuffer(buffer, position, &offsetVarsIndex);
    CopyToBuffer(buffer, position, &offsetAttributeIndex);
wfg's avatar
wfg committed

    // version
    if (IsLittleEndian())
    {
        const uint8_t endian = 0;
        CopyToBuffer(buffer, position, &endian);
        position += 2;
        CopyToBuffer(buffer, position, &m_Version);
    m_Heap.m_DataAbsolutePosition += footerSize;
    if (m_MetadataSet.Log.IsActive == true)
    {
        m_MetadataSet.Log.TotalBytes.push_back(m_Heap.m_DataAbsolutePosition);
    }
}

std::string BP1Writer::GetRankProfilingLog(
    const unsigned int rank,
    const std::vector<std::shared_ptr<Transport>> &transports) noexcept
{
    auto lf_WriterTimer = [](std::string &rankLog,
                             const profiling::Timer &timer) {
        rankLog += "'" + timer.m_Process + "_" + timer.GetUnits() + "': " +
                   std::to_string(timer.m_ProcessTime);
    };

    // prepare string dictionary per rank
    std::string rankLog("'rank_" + std::to_string(rank) + "': { ");

    auto &profiler = m_MetadataSet.Log;
    rankLog += "'bytes': " + std::to_string(profiler.TotalBytes[0]) + ", ";
    lf_WriterTimer(rankLog, profiler.Timers[0]);
    rankLog += ", ";

    for (unsigned int t = 0; t < transports.size(); ++t)
    {
        rankLog += "'transport_" + std::to_string(t) + "': { ";
        rankLog += "'lib': '" + transports[t]->m_Type + "', ";

        for (unsigned int i = 0; i < 3; ++i)
        {
            lf_WriterTimer(rankLog, transports[t]->m_Profiler.Timers[i]);
            if (i < 2)
            {
                rankLog += ", ";
            }
            else
            {
                rankLog += " ";
            }
        }

        if (t == transports.size() - 1) // last element
        {
            rankLog += "}";
        }
        else
        {
            rankLog += "},";
        }
    }
    rankLog += " },";

    return rankLog;
//------------------------------------------------------------------------------
// Explicit instantiation of only public templates

#define declare_template_instantiation(T)                                      \
    template BP1Writer::ResizeResult BP1Writer::ResizeBuffer(                  \
        const Variable<T> &variable);                                          \
                                                                               \
    template void BP1Writer::WriteVariablePayload(                             \
        const Variable<T> &variable) noexcept;                                 \
    template void BP1Writer::WriteVariableMetadata(                            \
        const Variable<T> &variable) noexcept;
ADIOS_FOREACH_TYPE_1ARG(declare_template_instantiation)
#undef declare_template_instantiation

//------------------------------------------------------------------------------

} // end namespace format
} // end namespace adios