/*
 * Distributed under the OSI-approved Apache License, Version 2.0.  See
 * accompanying file Copyright.txt for details.
 *
 * BP1Writer.cpp
 *
 *  Created on: Feb 1, 2017
 *      Author: wfg
 */

/// \cond EXCLUDE_FROM_DOXYGEN
#include "utilities/format/bp1/BP1Writer.h"

#include <string>
#include <vector>
/// \endcond

namespace adios
{
namespace format
{

/**
 * Returns a size estimate, in bytes, for a process group index plus its list
 * of methods (transports).
 *
 * @param name process group name; only its length is used
 * @param timeStepName time step name; only its length is used
 * @param numberOfTransports number of transports (methods)
 * @return name.length() + timeStepName.length() + 23 for the pg index, plus
 *         3 + numberOfTransports for the list of methods
 */
std::size_t BP1Writer::GetProcessGroupIndexSize(
    const std::string name, const std::string timeStepName,
    const std::size_t numberOfTransports) const noexcept
{
    // pg index part: both names plus a fixed 23 bytes
    const std::size_t pgIndexSize = name.length() + timeStepName.length() + 23;
    // list of methods (transports) part
    const std::size_t methodsSize = 3 + numberOfTransports;

    // should be sufficient for data and metadata pgindices
    return pgIndexSize + methodsSize;
}

void BP1Writer::WriteProcessGroupIndex(
    const bool isFortran, const std::string name, const std::uint32_t processID,
    const std::vector<std::shared_ptr<Transport>> &transports,
    capsule::STLVector &heap, BP1MetadataSet &metadataSet) const noexcept
{
wfg's avatar
wfg committed
    std::vector<char> &metadataBuffer = metadataSet.PGIndex.Buffer;
    std::vector<char> &dataBuffer = heap.m_Data;

    metadataSet.DataPGLengthPosition = dataBuffer.size();
    dataBuffer.insert(dataBuffer.end(), 8, 0); // skip pg length (8)

    const std::size_t metadataPGLengthPosition = metadataBuffer.size();
    metadataBuffer.insert(metadataBuffer.end(), 2, 0); // skip pg length (2)

    // write name to metadata
    WriteNameRecord(name, metadataBuffer);
    // write if host language Fortran in metadata and data
    const char hostFortran =
        (isFortran) ? 'y' : 'n'; // if host language is fortran
    CopyToBuffer(metadataBuffer, &hostFortran);
    CopyToBuffer(dataBuffer, &hostFortran);
    // write name in data
    WriteNameRecord(name, dataBuffer);

    // processID in metadata,
    CopyToBuffer(metadataBuffer, &processID);
    // skip coordination var in data ....what is coordination var?
    dataBuffer.insert(dataBuffer.end(), 4, 0);

    // time step name to metadata and data
    const std::string timeStepName(std::to_string(metadataSet.TimeStep));
    WriteNameRecord(timeStepName, metadataBuffer);
    WriteNameRecord(timeStepName, dataBuffer);

    // time step to metadata and data
    CopyToBuffer(metadataBuffer, &metadataSet.TimeStep);
    CopyToBuffer(dataBuffer, &metadataSet.TimeStep);

    // offset to pg in data in metadata which is the current absolute position
    CopyToBuffer(metadataBuffer, reinterpret_cast<std::uint64_t *>(
                                     &heap.m_DataAbsolutePosition));

    // Back to writing metadata pg index length (length of group)
    const std::uint16_t metadataPGIndexLength =
        metadataBuffer.size() - metadataPGLengthPosition -
        2; // without length of group record
    CopyToBuffer(metadataBuffer, metadataPGLengthPosition,
                 &metadataPGIndexLength);
    // DONE With metadataBuffer

    // here write method in data
    const std::vector<std::uint8_t> methodIDs = GetMethodIDs(transports);
    const std::uint8_t methodsCount = methodIDs.size();
    CopyToBuffer(dataBuffer, &methodsCount); // count
    const std::uint16_t methodsLength =
        methodIDs.size() *
        3; // methodID (1) + method params length(2), no parameters for now
    CopyToBuffer(dataBuffer, &methodsLength); // length

    for (const auto methodID : methodIDs)
    {
        CopyToBuffer(dataBuffer, &methodID); // method ID,
        dataBuffer.insert(dataBuffer.end(), 2,
                          0); // skip method params length = 0 (2 bytes) for now
    }

    // update absolute position
    heap.m_DataAbsolutePosition +=
        dataBuffer.size() - metadataSet.DataPGLengthPosition;
    // pg vars count and position
    metadataSet.DataPGVarsCount = 0;
    metadataSet.DataPGVarsCountPosition = dataBuffer.size();
    // add vars count and length
    dataBuffer.insert(dataBuffer.end(), 12, 0);
    heap.m_DataAbsolutePosition += 12; // add vars count and length

    ++metadataSet.DataPGCount;
    metadataSet.DataPGIsOpen = true;
}

/**
 * Advances by flattening the data of the current process group; this simply
 * forwards to FlattenData.
 *
 * @param metadataSet passed through to FlattenData
 * @param buffer passed through to FlattenData
 */
void BP1Writer::Advance(BP1MetadataSet &metadataSet, capsule::STLVector &buffer)
{
    FlattenData(metadataSet, buffer);
}

/**
 * Closes the writer for one transport.
 *
 * On the first close only, an open process group is flattened with
 * FlattenData, the metadata is appended with FlattenMetadata, and
 * isFirstClose is reset to false. Without aggregation the whole heap buffer
 * is then written to the transport in a single call and the transport is
 * closed. The aggregation branches are placeholders and do nothing yet.
 *
 * @param metadataSet metadata and profiling log
 * @param heap buffer holding the data (and, after flattening, the metadata)
 * @param transport receives the buffer and is closed (N-to-N case)
 * @param isFirstClose in/out flag, set to false after the first close
 * @param doAggregation true selects the (not yet implemented) N-to-M path
 */
void BP1Writer::Close(BP1MetadataSet &metadataSet, capsule::STLVector &heap,
                      Transport &transport, bool &isFirstClose,
                      const bool doAggregation) const noexcept
{
    if (metadataSet.Log.IsActive == true)
        metadataSet.Log.Timers[0].SetInitialTime();

    if (isFirstClose == true)
    {
        if (metadataSet.DataPGIsOpen == true)
            FlattenData(metadataSet, heap);

        FlattenMetadata(metadataSet, heap);

        // NOTE(review): this calls SetInitialTime() a second time on the same
        // timer; possibly meant to stop the timer instead - confirm against
        // the profiling::Timer API before changing.
        if (metadataSet.Log.IsActive == true)
            metadataSet.Log.Timers[0].SetInitialTime();

        if (doAggregation ==
            true) // N-to-M  where 1 <= M <= N-1, might need a new
                  // Log metadataSet.Log.m_Timers just for
                  // aggregation
        {
            // here call aggregator
        }
        isFirstClose = false;
    }

    if (doAggregation == true) // N-to-M  where 1 <= M <= N-1
    {
        // here call aggregator to select transports for Write and Close
    }
    else // N-to-N
    {
        transport.Write(heap.m_Data.data(), heap.m_Data.size()); // single write
        transport.Close();
    }
}

std::string BP1Writer::GetRankProfilingLog(
    const int rank, const BP1MetadataSet &metadataSet,
    const std::vector<std::shared_ptr<Transport>> &transports) const noexcept
{
wfg's avatar
wfg committed
    auto lf_WriterTimer = [](std::string &rankLog,
                             const profiling::Timer &timer) {
        rankLog += "'" + timer.m_Process + "_" + timer.GetUnits() + "': " +
                   std::to_string(timer.m_ProcessTime) + ", ";
    };
wfg's avatar
wfg committed
    // prepare string dictionary per rank
    std::string rankLog("'rank_" + std::to_string(rank) + "': { ");
wfg's avatar
wfg committed
    auto &profiler = metadataSet.Log;
    rankLog += "'bytes': " + std::to_string(profiler.TotalBytes[0]) + ", ";
    lf_WriterTimer(rankLog, profiler.Timers[0]);
wfg's avatar
wfg committed
    for (unsigned int t = 0; t < transports.size(); ++t)
    {
        auto &timers = transports[t]->m_Profiler.Timers;
wfg's avatar
wfg committed
        rankLog += "'transport_" + std::to_string(t) + "': { ";
        rankLog += "'lib': " + transports[t]->m_Type + ", ";
wfg's avatar
wfg committed
        for (unsigned int i = 0; i < 3; ++i)
            lf_WriterTimer(rankLog, timers[i]);
wfg's avatar
wfg committed
        rankLog += "}, ";
    }
    rankLog += "}, ";

wfg's avatar
wfg committed
    return rankLog;
}

// PRIVATE FUNCTIONS
/**
 * Appends a dimensions record to buffer.
 *
 * If globalDimensions is empty, only the local dimensions are written, each
 * followed by `skip` zero bytes. Otherwise, for every local dimension d the
 * triplet local[d], global[d], offset[d] is written. With addType == true
 * every value is preceded by the flag character 'n'.
 *
 * @param buffer destination, appended to
 * @param localDimensions local dimensions; its size drives both loops
 * @param globalDimensions global dimensions, empty for local-only variables;
 *        assumed to have at least localDimensions.size() entries otherwise
 *        - TODO confirm with callers
 * @param globalOffsets global offsets, indexed like globalDimensions
 * @param skip number of zero bytes inserted after each local dimension in the
 *        local-only case
 * @param addType true: write the 'n' flag before each value
 */
void BP1Writer::WriteDimensionsRecord(
    std::vector<char> &buffer, const std::vector<std::size_t> &localDimensions,
    const std::vector<std::size_t> &globalDimensions,
    const std::vector<std::size_t> &globalOffsets, const unsigned int skip,
    const bool addType) const noexcept
{
    // writes the flag char followed by the dimension viewed as std::uint64_t
    // NOTE(review): the cast assumes std::size_t is 64 bits wide - confirm
    auto lf_WriteFlaggedDim = [](std::vector<char> &buffer, const char no,
                                 const std::size_t dimension) {
        CopyToBuffer(buffer, &no);
        CopyToBuffer(buffer,
                     reinterpret_cast<const std::uint64_t *>(&dimension));
    };

    // BODY Starts here
    if (globalDimensions.empty())
    {
        // local dimensions only
        if (addType == true)
        {
            constexpr char no =
                'n'; // dimension format unsigned int value (not using
                     // memberID for now)
            for (const auto &localDimension : localDimensions)
            {
                lf_WriteFlaggedDim(buffer, no, localDimension);
                buffer.insert(buffer.end(), skip, 0);
            }
        }
        else
        {
            for (const auto &localDimension : localDimensions)
            {
                CopyToBuffer(buffer, reinterpret_cast<const std::uint64_t *>(
                                         &localDimension));
                buffer.insert(buffer.end(), skip, 0);
            }
        }
    }
    else
    {
        // local, global and offset for each dimension
        if (addType == true)
        {
            constexpr char no = 'n';
            for (unsigned int d = 0; d < localDimensions.size(); ++d)
            {
                lf_WriteFlaggedDim(buffer, no, localDimensions[d]);
                lf_WriteFlaggedDim(buffer, no, globalDimensions[d]);
                lf_WriteFlaggedDim(buffer, no, globalOffsets[d]);
            }
        }
        else
        {
            for (unsigned int d = 0; d < localDimensions.size(); ++d)
            {
                CopyToBuffer(buffer, reinterpret_cast<const std::uint64_t *>(
                                         &localDimensions[d]));
                CopyToBuffer(buffer, reinterpret_cast<const std::uint64_t *>(
                                         &globalDimensions[d]));
                CopyToBuffer(buffer, reinterpret_cast<const std::uint64_t *>(
                                         &globalOffsets[d]));
            }
        }
    }
}

/**
 * Appends a name record to buffer: the name length as a std::uint16_t
 * followed by the characters of the name.
 *
 * @param name string to write; its length is stored in 16 bits, so longer
 *        names are truncated to the converted length
 * @param buffer destination, appended to
 */
void BP1Writer::WriteNameRecord(const std::string name,
                                std::vector<char> &buffer) const noexcept
{
    const std::uint16_t length = static_cast<std::uint16_t>(name.length());
    CopyToBuffer(buffer, &length);
    CopyToBuffer(buffer, name.c_str(), length);
}

/**
 * Returns the index stored under name, creating it if it does not exist yet.
 *
 * @param name key to look up
 * @param indices map searched and, if name is absent, extended with a
 *        BP1Index constructed from the map size before insertion
 * @param isNew output: true if a new index was created, false if found
 * @return reference to the index stored in indices under name
 */
BP1Index &
BP1Writer::GetBP1Index(const std::string name,
                       std::unordered_map<std::string, BP1Index> &indices,
                       bool &isNew) const noexcept
{
    auto itName = indices.find(name);
    if (itName != indices.end())
    {
        isNew = false;
        return itName->second;
    }

    isNew = true;
    // emplace returns the iterator to the new element, no second lookup needed
    return indices.emplace(name, BP1Index(indices.size())).first->second;
}

/**
 * Finishes the currently open process group in the data buffer: back-fills
 * the vars count and vars length, appends the (empty) attributes count and
 * length, back-fills the process group length, then increments the time step
 * and marks the process group as closed.
 *
 * @param metadataSet provides the recorded positions and vars count; its
 *        TimeStep and DataPGIsOpen members are updated
 * @param heap data buffer and absolute position, both updated
 */
void BP1Writer::FlattenData(BP1MetadataSet &metadataSet,
                            capsule::STLVector &heap) const noexcept
{
    auto &buffer = heap.m_Data;
    // vars count and Length (only for PG)
    CopyToBuffer(buffer, metadataSet.DataPGVarsCountPosition,
                 &metadataSet.DataPGVarsCount);
    const std::uint64_t varsLength = buffer.size() -
                                     metadataSet.DataPGVarsCountPosition - 8 -
                                     4; // without record itself and vars count
    CopyToBuffer(buffer, metadataSet.DataPGVarsCountPosition + 4, &varsLength);

    // attributes (empty for now) count (4) and length (8) are zero by moving
    // positions in time step zero
    buffer.insert(buffer.end(), 12, 0);
    heap.m_DataAbsolutePosition += 12;

    // Finish writing pg group length
    const std::uint64_t dataPGLength =
        buffer.size() - metadataSet.DataPGLengthPosition -
        8; // without record itself, 12 due to empty attributes
    CopyToBuffer(buffer, metadataSet.DataPGLengthPosition, &dataPGLength);

    ++metadataSet.TimeStep;
    metadataSet.DataPGIsOpen = false;
}

/**
 * Appends the metadata (footer) to the data buffer: the PG index, the
 * variables indices, the attributes indices, the three absolute offsets of
 * those sections and, on little endian hosts, the endianness/version bytes.
 * Also advances heap.m_DataAbsolutePosition by the footer size and, if the
 * profiling log is active, records the resulting position in Log.TotalBytes.
 *
 * @param metadataSet indices to flatten; the length field at the start of
 *        each variable/attribute index buffer is filled in here
 * @param heap data buffer and absolute position, both updated
 */
void BP1Writer::FlattenMetadata(BP1MetadataSet &metadataSet,
                                capsule::STLVector &heap) const noexcept
{
    // sets count to the number of indices and length to the sum of their
    // buffer sizes; writes each index length (buffer size - 4) at position 0
    auto lf_IndexCountLength =
        [](std::unordered_map<std::string, BP1Index> &indices,
           std::uint32_t &count, std::uint64_t &length) {

            count = indices.size();
            length = 0;
            for (auto &indexPair : indices) // set each index length
            {
                auto &indexBuffer = indexPair.second.Buffer;
                const std::uint32_t indexLength = indexBuffer.size() - 4;
                CopyToBuffer(indexBuffer, 0, &indexLength);

                length += indexBuffer.size(); // overall length
            }
        };

    // writes count, length and then every index buffer into buffer
    auto lf_FlattenIndices =
        [](const std::uint32_t count, const std::uint64_t length,
           const std::unordered_map<std::string, BP1Index> &indices,
           std::vector<char> &buffer) {
            CopyToBuffer(buffer, &count);
            CopyToBuffer(buffer, &length);

            for (const auto &indexPair : indices) // copy each index buffer
            {
                const auto &indexBuffer = indexPair.second.Buffer;
                CopyToBuffer(buffer, indexBuffer.data(), indexBuffer.size());
            }
        };

    // Finish writing metadata counts and lengths
    // PG Index
    const std::uint64_t pgCount = metadataSet.DataPGCount;
    const std::uint64_t pgLength = metadataSet.PGIndex.Buffer.size();

    // var index count and length (total), and each index length
    std::uint32_t varsCount = 0;
    std::uint64_t varsLength = 0;
    lf_IndexCountLength(metadataSet.VarsIndices, varsCount, varsLength);
    // attribute index count and length, and each index length
    std::uint32_t attributesCount = 0;
    std::uint64_t attributesLength = 0;
    lf_IndexCountLength(metadataSet.AttributesIndices, attributesCount,
                        attributesLength);

    // pg section has count (8) + length (8), vars and attributes sections
    // have count (4) + length (8) in front of their payload
    const std::size_t footerSize = (pgLength + 16) + (varsLength + 12) +
                                   (attributesLength + 12) +
                                   metadataSet.MiniFooterSize;
    auto &buffer = heap.m_Data;
    buffer.reserve(buffer.size() + footerSize); // reserve data to fit metadata,
    // must replace with growth buffer
    // strategy

    // write pg index
    CopyToBuffer(buffer, &pgCount);
    CopyToBuffer(buffer, &pgLength);
    CopyToBuffer(buffer, metadataSet.PGIndex.Buffer.data(), pgLength);
    // Vars indices
    lf_FlattenIndices(varsCount, varsLength, metadataSet.VarsIndices, buffer);
    // Attribute indices
    lf_FlattenIndices(attributesCount, attributesLength,
                      metadataSet.AttributesIndices, buffer);

    // getting absolute offsets, minifooter is 28 bytes for now
    const std::uint64_t offsetPGIndex = heap.m_DataAbsolutePosition;
    const std::uint64_t offsetVarsIndex = offsetPGIndex + (pgLength + 16);
    const std::uint64_t offsetAttributeIndex =
        offsetVarsIndex + (varsLength + 12);

    CopyToBuffer(buffer, &offsetPGIndex);
    CopyToBuffer(buffer, &offsetVarsIndex);
    CopyToBuffer(buffer, &offsetAttributeIndex);

    // version
    if (IsLittleEndian())
    {
        const std::uint8_t endian = 0;
        CopyToBuffer(buffer, &endian);
        buffer.insert(buffer.end(), 2, 0);
        CopyToBuffer(buffer, &m_Version);
    }
    else
    {
        // NOTE(review): nothing is written on big endian hosts, although
        // footerSize above still includes MiniFooterSize - confirm whether
        // big endian support is intentionally missing.
    }

    heap.m_DataAbsolutePosition += footerSize;

    if (metadataSet.Log.IsActive == true)
        metadataSet.Log.TotalBytes.push_back(heap.m_DataAbsolutePosition);
}

} // end namespace format
} // end namespace adios