Skip to content
Snippets Groups Projects
BP3Serializer.cpp 37.8 KiB
Newer Older
 * Distributed under the OSI-approved Apache License, Version 2.0.  See
 * accompanying file Copyright.txt for details.
 *
 * BP3Serializer.cpp
 *
 *  Created on: Feb 1, 2017
 *      Author: William F Godoy godoywf@ornl.gov
#include "BP3Serializer.h"
#include "BP3Serializer.tcc"
#include <chrono>
#include <future>
#include <string>
#include <vector>

#include "adios2/helper/adiosFunctions.h" //GetType<T>, ReadValue<T>,
                                          // ReduceValue<T>
namespace adios2
{
namespace format
{

std::mutex BP3Serializer::m_Mutex;
BP3Serializer::BP3Serializer(MPI_Comm mpiComm, const bool debugMode)
: BP3Base(mpiComm, debugMode)
void BP3Serializer::PutProcessGroupIndex(
    const std::string hostLanguage,
    const std::vector<std::string> &transportsTypes) noexcept
    ProfilerStart("buffering");
    std::vector<char> &metadataBuffer = m_MetadataSet.PGIndex.Buffer;
    std::vector<char> &dataBuffer = m_Data.m_Buffer;
    size_t &dataPosition = m_Data.m_Position;

    m_MetadataSet.DataPGLengthPosition = dataPosition;
    dataPosition += 8; // skip pg length (8)
wfg's avatar
wfg committed

    const std::size_t metadataPGLengthPosition = metadataBuffer.size();
    metadataBuffer.insert(metadataBuffer.end(), 2, '\0'); // skip pg length (2)
wfg's avatar
wfg committed

    // write name to metadata
    const std::string name(std::to_string(m_RankMPI));
    PutNameRecord(name, metadataBuffer);
wfg's avatar
wfg committed
    // write if host language Fortran in metadata and data
    const char hostFortran = (hostLanguage == "Fortran") ? 'y' : 'n';
    InsertToBuffer(metadataBuffer, &hostFortran);
    CopyToBuffer(dataBuffer, dataPosition, &hostFortran);
wfg's avatar
wfg committed
    // write name in data
    PutNameRecord(name, dataBuffer, dataPosition);
wfg's avatar
wfg committed

    // processID in metadata,
    const uint32_t processID = static_cast<const uint32_t>(m_RankMPI);
    InsertToBuffer(metadataBuffer, &processID);
wfg's avatar
wfg committed
    // skip coordination var in data ....what is coordination var?
    dataPosition += 4;
wfg's avatar
wfg committed

    // time step name to metadata and data
    const std::string timeStepName(std::to_string(m_MetadataSet.TimeStep));
    PutNameRecord(timeStepName, metadataBuffer);
    PutNameRecord(timeStepName, dataBuffer, dataPosition);
wfg's avatar
wfg committed

    // time step to metadata and data
    InsertToBuffer(metadataBuffer, &m_MetadataSet.TimeStep);
    CopyToBuffer(dataBuffer, dataPosition, &m_MetadataSet.TimeStep);
wfg's avatar
wfg committed

    // offset to pg in data in metadata which is the current absolute position
    InsertU64(metadataBuffer, m_Data.m_AbsolutePosition);
wfg's avatar
wfg committed

    // Back to writing metadata pg index length (length of group)
    const uint16_t metadataPGIndexLength = static_cast<const uint16_t>(
        metadataBuffer.size() - metadataPGLengthPosition - 2);

    size_t backPosition = metadataPGLengthPosition;
    CopyToBuffer(metadataBuffer, backPosition, &metadataPGIndexLength);
wfg's avatar
wfg committed
    // DONE With metadataBuffer

    // here write method in data
    const std::vector<uint8_t> methodIDs = GetTransportIDs(transportsTypes);
    const uint8_t methodsCount = static_cast<const uint8_t>(methodIDs.size());
    CopyToBuffer(dataBuffer, dataPosition, &methodsCount); // count
    // methodID (1) + method params length(2), no parameters for now
    const uint16_t methodsLength =
        static_cast<const uint16_t>(methodsCount * 3);

    CopyToBuffer(dataBuffer, dataPosition, &methodsLength); // length
wfg's avatar
wfg committed

    for (const auto methodID : methodIDs)
    {
        CopyToBuffer(dataBuffer, dataPosition, &methodID); // method ID,
        dataPosition += 2; // skip method params length = 0 (2 bytes) for now
wfg's avatar
wfg committed
    }

    // update absolute position
    m_Data.m_AbsolutePosition +=
        dataPosition - m_MetadataSet.DataPGLengthPosition;
wfg's avatar
wfg committed
    // pg vars count and position
    m_MetadataSet.DataPGVarsCount = 0;
    m_MetadataSet.DataPGVarsCountPosition = dataPosition;
wfg's avatar
wfg committed
    // add vars count and length
    dataPosition += 12;
    m_Data.m_AbsolutePosition += 12; // add vars count and length
    ++m_MetadataSet.DataPGCount;
    m_MetadataSet.DataPGIsOpen = true;
    ProfilerStop("buffering");
William F Godoy's avatar
William F Godoy committed
void BP3Serializer::AllocateDeferredSize()
{
    if (m_MaxBufferSize == DefaultMaxBufferSize)
    {
        if (m_DeferredVariablesDataSize > DefaultInitialBufferSize)
        {
            m_MaxBufferSize = m_DeferredVariablesDataSize;
        }
        else
        {
            m_MaxBufferSize = DefaultInitialBufferSize;
        }
    }
    else if (m_MaxBufferSize > m_DeferredVariablesDataSize)
    {
        // must be a multiple integer
        const size_t times = m_MaxBufferSize / m_DeferredVariablesDataSize;
        m_MaxBufferSize = times * m_DeferredVariablesDataSize;
    }

    ResizeBuffer(m_MaxBufferSize, "in call to PerformPuts");
}

void BP3Serializer::SerializeData(IO &io, const bool advanceStep)
    ProfilerStart("buffering");
    SerializeDataBuffer(io);
    if (advanceStep)
        if (m_MaxBufferSize == DefaultMaxBufferSize)
        {
            m_MaxBufferSize = m_Data.m_Position + 64;
        }
        ++m_MetadataSet.TimeStep;
    ProfilerStop("buffering");
void BP3Serializer::CloseData(IO &io)
    ProfilerStart("buffering");
William F Godoy's avatar
William F Godoy committed
    if (!m_IsClosed)
William F Godoy's avatar
William F Godoy committed
        if (m_MetadataSet.DataPGIsOpen)
            SerializeDataBuffer(io);
        SerializeMetadataInData();
        m_Profiler.Bytes.at("buffering") += m_Data.m_AbsolutePosition;
        m_IsClosed = true;
    }

    ProfilerStop("buffering");
std::string BP3Serializer::GetRankProfilingJSON(
    const std::vector<std::string> &transportsTypes,
    const std::vector<profiling::IOChrono *> &transportsProfilers) noexcept
{
    auto lf_WriterTimer = [](std::string &rankLog,
                             const profiling::Timer &timer) {
        rankLog += "\"" + timer.m_Process + "_" + timer.GetShortUnits() +
                   "\": " + std::to_string(timer.m_ProcessTime) + ", ";
    };

    // prepare string dictionary per rank
    std::string rankLog("{ \"rank\": " + std::to_string(m_RankMPI) + ", ");

    auto &profiler = m_Profiler;

    std::string timeDate(profiler.Timers.at("buffering").m_LocalTimeDate);
    timeDate.pop_back();
    // avoid whitespace
    std::replace(timeDate.begin(), timeDate.end(), ' ', '_');
    rankLog += "\"start\": \"" + timeDate + "\", ";
    rankLog += "\"threads\": " + std::to_string(m_Threads) + ", ";
    rankLog +=
        "\"bytes\": " + std::to_string(profiler.Bytes.at("buffering")) + ", ";
    lf_WriterTimer(rankLog, profiler.Timers.at("buffering"));

    const size_t transportsSize = transportsTypes.size();

    for (unsigned int t = 0; t < transportsSize; ++t)
    {
        rankLog += "\"transport_" + std::to_string(t) + "\": { ";
        rankLog += "\"type\": \"" + transportsTypes[t] + "\", ";

        for (const auto &transportTimerPair : transportsProfilers[t]->Timers)
            lf_WriterTimer(rankLog, transportTimerPair.second);
        // replace last comma with space
        rankLog.pop_back();
        rankLog.pop_back();
        rankLog += " ";
        if (t == transportsSize - 1) // last element
            rankLog += "}";
        }
        else
        {
            rankLog += "},";
    rankLog += " }"; // end rank entry
    return rankLog;
BP3Serializer::AggregateProfilingJSON(const std::string &rankProfilingLog)
    return SetCollectiveProfilingJSON(rankProfilingLog);
void BP3Serializer::AggregateCollectiveMetadata()
    const uint64_t pgIndexStart = m_Metadata.m_Position;
    AggregateIndex(m_MetadataSet.PGIndex, m_MetadataSet.DataPGCount);

    const uint64_t variablesIndexStart = m_Metadata.m_Position;
    AggregateMergeIndex(m_MetadataSet.VarsIndices);

    const uint64_t attributesIndexStart = m_Metadata.m_Position;
    AggregateMergeIndex(m_MetadataSet.AttributesIndices);

    if (m_RankMPI == 0)
    {
        m_Metadata.Resize(m_Metadata.m_Position + m_MetadataSet.MiniFooterSize,
                          " when writing collective bp1 Minifooter");
        PutMinifooter(pgIndexStart, variablesIndexStart, attributesIndexStart,
                      m_Metadata.m_Buffer, m_Metadata.m_Position, true);
        m_Metadata.m_AbsolutePosition = m_Metadata.m_Position;
    }
// PRIVATE FUNCTIONS
void BP3Serializer::PutAttributes(IO &io)
{
    const auto attributesDataMap = io.GetAttributesDataMap();

    auto &buffer = m_Data.m_Buffer;
    auto &position = m_Data.m_Position;
    auto &absolutePosition = m_Data.m_AbsolutePosition;

    // used only to update m_HeapBuffer.m_DataAbsolutePosition;
    const size_t attributesCountPosition = position;

    // count is known ahead of time, write
    const uint32_t attributesCount =
        static_cast<const uint32_t>(attributesDataMap.size());
    CopyToBuffer(buffer, position, &attributesCount);

    // will go back
    const size_t attributesLengthPosition = position;
    position += 8; // skip attributes length

    absolutePosition += position - attributesCountPosition;

    uint32_t memberID = 0;

    for (const auto &attributePair : attributesDataMap)
    {
        const std::string name(attributePair.first);
        const std::string type(attributePair.second.first);

        if (type == "unknown")
        {
        }
#define declare_type(T)                                                        \
    else if (type == GetType<T>())                                             \
    {                                                                          \
        Stats<T> stats;                                                        \
        stats.Offset = absolutePosition;                                       \
        stats.MemberID = memberID;                                             \
        stats.Step = m_MetadataSet.TimeStep;                                   \
        stats.FileIndex = static_cast<uint32_t>(m_RankMPI);                    \
        Attribute<T> &attribute = *io.InquireAttribute<T>(name);               \
        PutAttributeInData(attribute, stats);                                  \
        PutAttributeInIndex(attribute, stats);                                 \
    }
        ADIOS2_FOREACH_ATTRIBUTE_TYPE_1ARG(declare_type)
#undef declare_type

        ++memberID;
    }

    // complete attributes length
    const uint64_t attributesLength =
        static_cast<const uint64_t>(position - attributesLengthPosition);

    size_t backPosition = attributesLengthPosition;
    CopyToBuffer(buffer, backPosition, &attributesLength);
}

void BP3Serializer::PutDimensionsRecord(const Dims &localDimensions,
                                        const Dims &globalDimensions,
                                        const Dims &offsets,
                                        std::vector<char> &buffer) noexcept
William F Godoy's avatar
William F Godoy committed
    if (offsets.empty())
        for (const auto localDimension : localDimensions)
            InsertU64(buffer, localDimension);
            buffer.insert(buffer.end(), 2 * sizeof(uint64_t), '\0');
    }
    else
    {
        for (unsigned int d = 0; d < localDimensions.size(); ++d)
            InsertU64(buffer, localDimensions[d]);
            InsertU64(buffer, globalDimensions[d]);
            InsertU64(buffer, offsets[d]);
void BP3Serializer::PutDimensionsRecord(const Dims &localDimensions,
                                        const Dims &globalDimensions,
                                        const Dims &offsets,
                                        std::vector<char> &buffer,
                                        size_t &position,
                                        const bool isCharacteristic) noexcept
    auto lf_CopyDimension = [](std::vector<char> &buffer, size_t &position,
                               const size_t dimension,
                               const bool isCharacteristic) {
        if (!isCharacteristic)
        {
            constexpr char no = 'n';
            CopyToBuffer(buffer, position, &no);
        }

        const uint64_t dimension64 = static_cast<const uint64_t>(dimension);

        CopyToBuffer(buffer, position, &dimension64);
wfg's avatar
wfg committed
    };

    // BODY Starts here
William F Godoy's avatar
William F Godoy committed
    if (offsets.empty())
        unsigned int globalBoundsSkip = 18;
        if (isCharacteristic)
        {
            globalBoundsSkip = 16;
        }

        for (const auto &localDimension : localDimensions)
            lf_CopyDimension(buffer, position, localDimension,
                             isCharacteristic);
            position += globalBoundsSkip;
        for (unsigned int d = 0; d < localDimensions.size(); ++d)
            lf_CopyDimension(buffer, position, localDimensions[d],
                             isCharacteristic);
            lf_CopyDimension(buffer, position, globalDimensions[d],
                             isCharacteristic);
            lf_CopyDimension(buffer, position, offsets[d], isCharacteristic);
void BP3Serializer::PutNameRecord(const std::string name,
                                  std::vector<char> &buffer) noexcept
    const uint16_t length = static_cast<const uint16_t>(name.length());
    InsertToBuffer(buffer, &length);
    InsertToBuffer(buffer, name.c_str(), length);
void BP3Serializer::PutNameRecord(const std::string name,
                                  std::vector<char> &buffer,
                                  size_t &position) noexcept
    const uint16_t length = static_cast<const uint16_t>(name.length());
    CopyToBuffer(buffer, position, &length);
    CopyToBuffer(buffer, position, name.c_str(), length);
}

BP3Serializer::SerialElementIndex &BP3Serializer::GetSerialElementIndex(
    const std::string &name,
    std::unordered_map<std::string, SerialElementIndex> &indices,
    bool &isNew) const noexcept
wfg's avatar
wfg committed
    auto itName = indices.find(name);
    if (itName == indices.end())
    {
        indices.emplace(name, SerialElementIndex(indices.size()));
wfg's avatar
wfg committed
        isNew = true;
        return indices.at(name);
    }

    isNew = false;
    return itName->second;
void BP3Serializer::SerializeDataBuffer(IO &io) noexcept
    auto &buffer = m_Data.m_Buffer;
    auto &position = m_Data.m_Position;
    auto &absolutePosition = m_Data.m_AbsolutePosition;
wfg's avatar
wfg committed
    // vars count and Length (only for PG)
    CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition,
                 &m_MetadataSet.DataPGVarsCount);
    // without record itself and vars count
    const uint64_t varsLength =
        position - m_MetadataSet.DataPGVarsCountPosition - 8 - 4;
    CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition, &varsLength);
    // attributes are only written once
    if (!m_MetadataSet.AreAttributesWritten)
    {
        PutAttributes(io);
        m_MetadataSet.AreAttributesWritten = true;
    }
    else
    {
        position += 12;
    // Finish writing pg group length without record itself
    const uint64_t dataPGLength =
        position - m_MetadataSet.DataPGLengthPosition - 8;
    CopyToBuffer(buffer, m_MetadataSet.DataPGLengthPosition, &dataPGLength);

    m_MetadataSet.DataPGIsOpen = false;
void BP3Serializer::SerializeMetadataInData() noexcept
    auto lf_SetIndexCountLength =
        [](std::unordered_map<std::string, SerialElementIndex> &indices,
           uint32_t &count, uint64_t &length) {
wfg's avatar
wfg committed

            count = indices.size();
            length = 0;
            for (auto &indexPair : indices) // set each index length
            {
                auto &indexBuffer = indexPair.second.Buffer;
                const uint32_t indexLength = indexBuffer.size() - 4;
                size_t indexLengthPosition = 0;
                CopyToBuffer(indexBuffer, indexLengthPosition, &indexLength);
wfg's avatar
wfg committed

                length += indexBuffer.size(); // overall length
            }
        };

    auto lf_FlattenIndices =
        [](const uint32_t count, const uint64_t length,
           const std::unordered_map<std::string, SerialElementIndex> &indices,
           std::vector<char> &buffer, size_t &position) {

            CopyToBuffer(buffer, position, &count);
            CopyToBuffer(buffer, position, &length);
wfg's avatar
wfg committed

            for (const auto &indexPair : indices) // set each index length
            {
                const auto &indexBuffer = indexPair.second.Buffer;
                CopyToBuffer(buffer, position, indexBuffer.data(),
                             indexBuffer.size());
wfg's avatar
wfg committed
            }
        };

    // Finish writing metadata counts and lengths
    // PG Index
    const uint64_t pgCount = m_MetadataSet.DataPGCount;
    const uint64_t pgLength = m_MetadataSet.PGIndex.Buffer.size();
wfg's avatar
wfg committed

    // var index count and length (total), and each index length
    uint32_t varsCount;
    uint64_t varsLength;
    lf_SetIndexCountLength(m_MetadataSet.VarsIndices, varsCount, varsLength);

wfg's avatar
wfg committed
    // attribute index count and length, and each index length
    uint32_t attributesCount;
    uint64_t attributesLength;
    lf_SetIndexCountLength(m_MetadataSet.AttributesIndices, attributesCount,
                           attributesLength);
    const size_t footerSize = static_cast<const size_t>(
        (pgLength + 16) + (varsLength + 12) + (attributesLength + 12) +
        m_MetadataSet.MiniFooterSize);
    auto &buffer = m_Data.m_Buffer;
    auto &position = m_Data.m_Position;
    auto &absolutePosition = m_Data.m_AbsolutePosition;

    // reserve data to fit metadata,
    // must replace with growth buffer strategy?
    m_Data.Resize(position + footerSize,
                  " when writing metadata in bp data buffer");
wfg's avatar
wfg committed

    // write pg index
    CopyToBuffer(buffer, position, &pgCount);
    CopyToBuffer(buffer, position, &pgLength);
    CopyToBuffer(buffer, position, m_MetadataSet.PGIndex.Buffer.data(),
                 static_cast<const size_t>(pgLength));
wfg's avatar
wfg committed
    // Vars indices
    lf_FlattenIndices(varsCount, varsLength, m_MetadataSet.VarsIndices, buffer,
                      position);
wfg's avatar
wfg committed
    // Attribute indices
    lf_FlattenIndices(attributesCount, attributesLength,
                      m_MetadataSet.AttributesIndices, buffer, position);
    // getting absolute offset start, minifooter is 28 bytes for now
    const uint64_t pgIndexStart = static_cast<const uint64_t>(absolutePosition);
    const uint64_t variablesIndexStart =
        static_cast<const uint64_t>(pgIndexStart + (pgLength + 16));
    const uint64_t attributesIndexStart =
        static_cast<const uint64_t>(variablesIndexStart + (varsLength + 12));
    PutMinifooter(pgIndexStart, variablesIndexStart, attributesIndexStart,
                  buffer, position);
    absolutePosition += footerSize;
    if (m_Profiler.IsActive)
        m_Profiler.Bytes.emplace("buffering", absolutePosition);
void BP3Serializer::PutMinifooter(const uint64_t pgIndexStart,
                                  const uint64_t variablesIndexStart,
                                  const uint64_t attributesIndexStart,
                                  std::vector<char> &buffer, size_t &position,
                                  const bool addSubfiles)
{
    CopyToBuffer(buffer, position, &pgIndexStart);
    CopyToBuffer(buffer, position, &variablesIndexStart);
    CopyToBuffer(buffer, position, &attributesIndexStart);

    // version
    uint8_t endianness = 0; // little-endian
    if (!IsLittleEndian())
        endianness = 1; // big-endian
    CopyToBuffer(buffer, position, &endianness);
    if (addSubfiles)
        position += 1;
        CopyToBuffer(buffer, position, &m_Version);
    }
    else
    {
        position += 2;
    CopyToBuffer(buffer, position, &m_Version);
void BP3Serializer::AggregateIndex(const SerialElementIndex &index,
                                   const size_t count)
{
    auto &buffer = m_Metadata.m_Buffer;
    auto &position = m_Metadata.m_Position;

    size_t countPosition = position;
    const size_t totalCount = ReduceValues<size_t>(count, m_MPIComm);
    if (m_RankMPI == 0)
    {
        // Write count
        position += 16;
        m_Metadata.Resize(position, " in call to AggregateIndex bp1 metadata");
        const uint64_t totalCountU64 = static_cast<const uint64_t>(totalCount);
        CopyToBuffer(buffer, countPosition, &totalCountU64);
    }

    // write contents
    GathervVectors(index.Buffer, buffer, position, m_MPIComm);

    // get total length and write it after count and before index
    if (m_RankMPI == 0)
    {
        const uint64_t totalLengthU64 =
            static_cast<const uint64_t>(position - countPosition - 8);
        CopyToBuffer(buffer, countPosition, &totalLengthU64);
    }
}

void BP3Serializer::AggregateMergeIndex(
    const std::unordered_map<std::string, SerialElementIndex> &indices) noexcept
{
    // first serialize index
    std::vector<char> serializedIndices = SerializeIndices(indices);
    // gather in rank 0
    std::vector<char> gatheredSerialIndices;
    size_t gatheredSerialIndicesPosition = 0;

    GathervVectors(serializedIndices, gatheredSerialIndices,
                   gatheredSerialIndicesPosition, m_MPIComm);

    // deallocate local serialized Indices
    std::vector<char>().swap(serializedIndices);

    // deserialize in [name][rank] order
    const std::unordered_map<std::string, std::vector<SerialElementIndex>>
        nameRankIndices =
            DeserializeIndicesPerRankThreads(gatheredSerialIndices);

    // deallocate gathered serial indices (full in rank 0 only)
    std::vector<char>().swap(gatheredSerialIndices);

    // to write count and length
    auto &buffer = m_Metadata.m_Buffer;
    auto &position = m_Metadata.m_Position;

    size_t countPosition = position;

    if (m_RankMPI == 0)
    {
        // Write count
        position += 12;
        m_Metadata.Resize(position,
                          ", in call to AggregateMergeIndex bp1 metadata");
        const uint32_t totalCountU32 =
            static_cast<const uint32_t>(nameRankIndices.size());
        CopyToBuffer(buffer, countPosition, &totalCountU32);
    }

    MergeSerializeIndices(nameRankIndices);

    if (m_RankMPI == 0)
    {
        // Write length
        const uint64_t totalLengthU64 =
            static_cast<const uint64_t>(position - countPosition - 8);
        CopyToBuffer(buffer, countPosition, &totalLengthU64);
    }
}

std::vector<char> BP3Serializer::SerializeIndices(
    const std::unordered_map<std::string, SerialElementIndex> &indices) const
    noexcept
{
    std::vector<char> serializedIndices;

    for (const auto &indexPair : indices)
    {
        const SerialElementIndex &index = indexPair.second;

        // add rank at the beginning
        const uint32_t rankSource = static_cast<const uint32_t>(m_RankMPI);
        InsertToBuffer(serializedIndices, &rankSource);

        // insert buffer
        InsertToBuffer(serializedIndices, index.Buffer.data(),
                       index.Buffer.size());
    }

    return serializedIndices;
}

std::unordered_map<std::string, std::vector<BP3Base::SerialElementIndex>>
BP3Serializer::DeserializeIndicesPerRankThreads(
    const std::vector<char> &serialized) const noexcept
{
    auto lf_Deserialize = [&](
        const int rankSource, const std::vector<char> &serialized,
        const size_t serializedPosition,
        std::unordered_map<std::string, std::vector<SerialElementIndex>>
            &deserialized) {

        size_t localPosition = serializedPosition;
        ElementIndexHeader header =
            ReadElementIndexHeader(serialized, localPosition);

        // mutex portion
        {
            std::lock_guard<std::mutex> lock(m_Mutex);
            if (deserialized.count(header.Name) == 0)
            {
                deserialized[header.Name] = std::vector<SerialElementIndex>(
                    m_SizeMPI, SerialElementIndex(header.MemberID, 0));
            }
        }

        const size_t bufferSize = static_cast<const size_t>(header.Length) + 4;
        SerialElementIndex &index = deserialized[header.Name][rankSource];
        InsertToBuffer(index.Buffer, &serialized[serializedPosition],
                       bufferSize);
    };

    // BODY OF FUNCTION starts here
    std::unordered_map<std::string, std::vector<SerialElementIndex>>
        deserialized;
    const size_t serializedSize = serialized.size();

    if (m_RankMPI != 0 || serializedSize < 8)
    {
        return deserialized;
    }

    size_t serializedPosition = 0;

    std::vector<std::future<void>> asyncs(m_Threads);
    std::vector<size_t> asyncPositions(m_Threads);
    std::vector<int> asyncRankSources(m_Threads);

    bool launched = false;

    while (serializedPosition < serializedSize)
    {
        // extract rank and index buffer size
        for (unsigned int t = 0; t < m_Threads; ++t)
        {
            if (serializedPosition >= serializedSize)
            {
                break;
            }

            const int rankSource = static_cast<const int>(
                ReadValue<uint32_t>(serialized, serializedPosition));
            asyncRankSources[t] = rankSource;
            asyncPositions[t] = serializedPosition;

            const size_t bufferSize = static_cast<const size_t>(
                ReadValue<uint32_t>(serialized, serializedPosition));
            serializedPosition += bufferSize;

            if (launched)
            {
                asyncs[t].get();
            }

            if (serializedPosition <= serializedSize)
            {
                asyncs[t] =
                    std::async(std::launch::async, lf_Deserialize,
                               asyncRankSources[t], std::ref(serialized),
                               asyncPositions[t], std::ref(deserialized));
            }
        }

        launched = true;
    }

    for (auto &async : asyncs)
    {
        if (async.valid())
        {
            async.wait();
        }
    }

    return deserialized;
}

void BP3Serializer::MergeSerializeIndices(
    const std::unordered_map<std::string, std::vector<SerialElementIndex>>
        &nameRankIndices) noexcept
{
    auto lf_GetCharacteristics = [&](const std::vector<char> &buffer,
                                     size_t &position, const uint8_t dataType,
                                     uint8_t &count, uint32_t &length,
                                     uint32_t &timeStep)

    {
        const DataTypes dataTypeEnum = static_cast<DataTypes>(dataType);

        switch (dataTypeEnum)
        case (type_string):
        {
            const auto characteristics =
                ReadElementIndexCharacteristics<std::string>(buffer, position,
                                                             type_string, true);
            count = characteristics.EntryCount;
            length = characteristics.EntryLength;
            timeStep = characteristics.Statistics.Step;
            break;
        }

        case (type_string_array):
        {
            const auto characteristics =
                ReadElementIndexCharacteristics<std::string>(
                    buffer, position, type_string_array, true);
            count = characteristics.EntryCount;
            length = characteristics.EntryLength;
            timeStep = characteristics.Statistics.Step;
            break;
        }

            const auto characteristics = ReadElementIndexCharacteristics<char>(
                buffer, position, type_byte, true);
            count = characteristics.EntryCount;
            length = characteristics.EntryLength;
            timeStep = characteristics.Statistics.Step;
            const auto characteristics = ReadElementIndexCharacteristics<short>(
                buffer, position, type_short, true);
            count = characteristics.EntryCount;
            length = characteristics.EntryLength;
            timeStep = characteristics.Statistics.Step;
            const auto characteristics = ReadElementIndexCharacteristics<int>(
                buffer, position, type_integer, true);
            count = characteristics.EntryCount;
            length = characteristics.EntryLength;
            timeStep = characteristics.Statistics.Step;
            break;
        }

        case (type_long):
        {
            const auto characteristics =
                ReadElementIndexCharacteristics<long int>(buffer, position,
                                                          type_long, true);
            count = characteristics.EntryCount;
            length = characteristics.EntryLength;
            timeStep = characteristics.Statistics.Step;
            break;
        }

        case (type_unsigned_byte):
        {
            const auto characteristics =
                ReadElementIndexCharacteristics<unsigned char>(
                    buffer, position, type_unsigned_byte, true);
            count = characteristics.EntryCount;
            length = characteristics.EntryLength;
            timeStep = characteristics.Statistics.Step;
            break;
        }

        case (type_unsigned_short):
        {
            const auto characteristics =
                ReadElementIndexCharacteristics<unsigned short>(
                    buffer, position, type_unsigned_short, true);
            count = characteristics.EntryCount;
            length = characteristics.EntryLength;
            timeStep = characteristics.Statistics.Step;
            break;
        }

        case (type_unsigned_integer):
        {
            const auto characteristics =
                ReadElementIndexCharacteristics<unsigned int>(
                    buffer, position, type_unsigned_integer, true);
            count = characteristics.EntryCount;
            length = characteristics.EntryLength;
            timeStep = characteristics.Statistics.Step;
            break;
        }

        case (type_unsigned_long):
        {
            auto characteristics =
                ReadElementIndexCharacteristics<unsigned long int>(
                    buffer, position, type_unsigned_long, true);
            count = characteristics.EntryCount;
            length = characteristics.EntryLength;
            timeStep = characteristics.Statistics.Step;
            auto characteristics = ReadElementIndexCharacteristics<float>(
                buffer, position, type_real, true);
            count = characteristics.EntryCount;
            length = characteristics.EntryLength;
            timeStep = characteristics.Statistics.Step;
            auto characteristics = ReadElementIndexCharacteristics<double>(
                buffer, position, type_double, true);
            count = characteristics.EntryCount;
            length = characteristics.EntryLength;
            timeStep = characteristics.Statistics.Step;
            // TODO: complex, string array, long double
            throw std::invalid_argument("ERROR: type " +
                                        std::to_string(dataType) +
                                        " not supported in Merge\n");

        } // end switch

    };

    auto lf_MergeRank = [&](const std::vector<SerialElementIndex> &indices) {

        // extract header
        ElementIndexHeader header;
        // index non-empty buffer
        size_t firstRank = 0;
        // index positions per rank
        std::vector<size_t> positions(indices.size(), 0);
        // merge index length
        size_t headerSize = 0;

        for (size_t r = 0; r < indices.size(); ++r)
        {
            const auto &buffer = indices[r].Buffer;
            if (buffer.empty())
            {
                continue;
            }
            size_t &position = positions[r];

            header = ReadElementIndexHeader(buffer, position);
            firstRank = r;

            headerSize = position;
        // move all positions to headerSize
        for (size_t r = 0; r < indices.size(); ++r)
        {
            const auto &buffer = indices[r].Buffer;
            if (buffer.empty())
            {
                continue;
            }
            positions[r] = headerSize;
        }

        uint64_t setsCount = 0;
        unsigned int currentTimeStep = 1;
        bool marching = true;
        std::vector<char> sorted;

        while (marching)
        {
            marching = false;

            for (size_t r = firstRank; r < indices.size(); ++r)
            {
                const auto &buffer = indices[r].Buffer;
                if (buffer.empty())
                {
                    continue;
                }

                auto &position = positions[r];
                if (position < buffer.size())
                {
                    marching = true;
                }
                else
                {
                    continue;
                }

                uint8_t count = 0;