Skip to content
Snippets Groups Projects
BP1Writer.cpp 13.2 KiB
Newer Older
 * Distributed under the OSI-approved Apache License, Version 2.0.  See
 * accompanying file Copyright.txt for details.
 *
 * BP1Writer.cpp
 *
 *  Created on: Feb 1, 2017
 *      Author: wfg
 */

/// \cond EXCLUDE_FROM_DOXYGEN
#include "utilities/format/bp1/BP1Writer.h"

#include <string>
#include <vector>
/// \endcond

namespace adios
{
namespace format
{

std::size_t BP1Writer::GetProcessGroupIndexSize(
    const std::string name, const std::string timeStepName,
    const std::size_t numberOfTransports) const noexcept
{
  // pgIndex + list of methods (transports)
  return (name.length() + timeStepName.length() + 23) +
         (3 + numberOfTransports); // should be sufficient for data and metadata
                                   // pgindices
}

void BP1Writer::WriteProcessGroupIndex(
    const bool isFortran, const std::string name, const std::uint32_t processID,
    const std::vector<std::shared_ptr<Transport>> &transports,
    capsule::STLVector &heap, BP1MetadataSet &metadataSet) const noexcept
{
  std::vector<char> &metadataBuffer = metadataSet.PGIndex.Buffer;
  std::vector<char> &dataBuffer = heap.m_Data;

  metadataSet.DataPGLengthPosition = dataBuffer.size();
  dataBuffer.insert(dataBuffer.end(), 8, 0); // skip pg length (8)

  const std::size_t metadataPGLengthPosition = metadataBuffer.size();
  metadataBuffer.insert(metadataBuffer.end(), 2, 0); // skip pg length (2)

  // write name to metadata
  WriteNameRecord(name, metadataBuffer);
  // write if host language Fortran in metadata and data
  const char hostFortran =
      (isFortran) ? 'y' : 'n'; // if host language is fortran
  CopyToBuffer(metadataBuffer, &hostFortran);
  CopyToBuffer(dataBuffer, &hostFortran);
  // write name in data
  WriteNameRecord(name, dataBuffer);

  // processID in metadata,
  CopyToBuffer(metadataBuffer, &processID);
  // skip coordination var in data ....what is coordination var?
  dataBuffer.insert(dataBuffer.end(), 4, 0);

  // time step name to metadata and data
  const std::string timeStepName(std::to_string(metadataSet.TimeStep));
  WriteNameRecord(timeStepName, metadataBuffer);
  WriteNameRecord(timeStepName, dataBuffer);

  // time step to metadata and data
  CopyToBuffer(metadataBuffer, &metadataSet.TimeStep);
  CopyToBuffer(dataBuffer, &metadataSet.TimeStep);

  // offset to pg in data in metadata which is the current absolute position
  CopyToBuffer(metadataBuffer,
               reinterpret_cast<std::uint64_t *>(&heap.m_DataAbsolutePosition));

  // Back to writing metadata pg index length (length of group)
  const std::uint16_t metadataPGIndexLength =
      metadataBuffer.size() - metadataPGLengthPosition -
      2; // without length of group record
  CopyToBuffer(metadataBuffer, metadataPGLengthPosition,
               &metadataPGIndexLength);
  // DONE With metadataBuffer

  // here write method in data
  const std::vector<std::uint8_t> methodIDs = GetMethodIDs(transports);
  const std::uint8_t methodsCount = methodIDs.size();
  CopyToBuffer(dataBuffer, &methodsCount); // count
  const std::uint16_t methodsLength =
      methodIDs.size() *
      3; // methodID (1) + method params length(2), no parameters for now
  CopyToBuffer(dataBuffer, &methodsLength); // length

  for (const auto methodID : methodIDs)
  {
    CopyToBuffer(dataBuffer, &methodID); // method ID,
    dataBuffer.insert(dataBuffer.end(), 2,
                      0); // skip method params length = 0 (2 bytes) for now
  }

  // update absolute position
  heap.m_DataAbsolutePosition +=
      dataBuffer.size() - metadataSet.DataPGLengthPosition;
  // pg vars count and position
  metadataSet.DataPGVarsCount = 0;
  metadataSet.DataPGVarsCountPosition = dataBuffer.size();
  // add vars count and length
  dataBuffer.insert(dataBuffer.end(), 12, 0);
  heap.m_DataAbsolutePosition += 12; // add vars count and length

  ++metadataSet.DataPGCount;
  metadataSet.DataPGIsOpen = true;
}

void BP1Writer::Advance(BP1MetadataSet &metadataSet, capsule::STLVector &buffer)
{
  FlattenData(metadataSet, buffer);
}

void BP1Writer::Close(BP1MetadataSet &metadataSet, capsule::STLVector &heap,
                      Transport &transport, bool &isFirstClose,
                      const bool doAggregation) const noexcept
{
  if (metadataSet.Log.IsActive == true)
    metadataSet.Log.Timers[0].SetInitialTime();

  if (isFirstClose == true)
  {
    if (metadataSet.DataPGIsOpen == true)
      FlattenData(metadataSet, heap);

    FlattenMetadata(metadataSet, heap);

    if (metadataSet.Log.IsActive == true)
      metadataSet.Log.Timers[0].SetInitialTime();

    if (doAggregation == true) // N-to-M  where 1 <= M <= N-1, might need a new
                               // Log metadataSet.Log.m_Timers just for
                               // aggregation
    {
      // here call aggregator
    }
    isFirstClose = false;
  }

  if (doAggregation == true) // N-to-M  where 1 <= M <= N-1
  {
    // here call aggregator to select transports for Write and Close
  }
  else // N-to-N
  {
    transport.Write(heap.m_Data.data(), heap.m_Data.size()); // single write
    transport.Close();
  }
}

std::string BP1Writer::GetRankProfilingLog(
    const int rank, const BP1MetadataSet &metadataSet,
    const std::vector<std::shared_ptr<Transport>> &transports) const noexcept
{
  auto lf_WriterTimer = [](std::string &rankLog,
                           const profiling::Timer &timer) {
    rankLog += "'" + timer.m_Process + "_" + timer.GetUnits() + "': " +
               std::to_string(timer.m_ProcessTime) + ", ";
  };

  // prepare string dictionary per rank
  std::string rankLog("'rank_" + std::to_string(rank) + "': { ");

  auto &profiler = metadataSet.Log;
  rankLog += "'bytes': " + std::to_string(profiler.TotalBytes[0]) + ", ";
  lf_WriterTimer(rankLog, profiler.Timers[0]);

  for (unsigned int t = 0; t < transports.size(); ++t)
  {
    auto &timers = transports[t]->m_Profiler.Timers;

    rankLog += "'transport_" + std::to_string(t) + "': { ";
    rankLog += "'lib': " + transports[t]->m_Type + ", ";

    for (unsigned int i = 0; i < 3; ++i)
      lf_WriterTimer(rankLog, timers[i]);

    rankLog += "}, ";
  }
  rankLog += "}, ";

  return rankLog;
}

// PRIVATE FUNCTIONS
void BP1Writer::WriteDimensionsRecord(
    std::vector<char> &buffer, const std::vector<std::size_t> &localDimensions,
    const std::vector<std::size_t> &globalDimensions,
    const std::vector<std::size_t> &globalOffsets, const unsigned int skip,
    const bool addType) const noexcept
{
  auto lf_WriteFlaggedDim = [](std::vector<char> &buffer, const char no,
                               const std::size_t dimension) {
    CopyToBuffer(buffer, &no);
    CopyToBuffer(buffer, reinterpret_cast<const std::uint64_t *>(&dimension));
  };

  // BODY Starts here
  if (globalDimensions.empty())
  {
    if (addType == true)
    {
      constexpr char no = 'n'; // dimension format unsigned int value (not using
                               // memberID for now)
      for (const auto &localDimension : localDimensions)
      {
        lf_WriteFlaggedDim(buffer, no, localDimension);
        buffer.insert(buffer.end(), skip, 0);
      }
    }
    else
    {
      for (const auto &localDimension : localDimensions)
      {
        CopyToBuffer(buffer,
                     reinterpret_cast<const std::uint64_t *>(&localDimension));
        buffer.insert(buffer.end(), skip, 0);
      }
    }
  }
  else
  {
    if (addType == true)
    {
      constexpr char no = 'n'; // dimension format unsigned int value for now
      for (unsigned int d = 0; d < localDimensions.size(); ++d)
      {
        lf_WriteFlaggedDim(buffer, no, localDimensions[d]);
        lf_WriteFlaggedDim(buffer, no, globalDimensions[d]);
        lf_WriteFlaggedDim(buffer, no, globalOffsets[d]);
      }
    }
    else
    {
      for (unsigned int d = 0; d < localDimensions.size(); ++d)
      {
        CopyToBuffer(buffer, reinterpret_cast<const std::uint64_t *>(
                                 &localDimensions[d]));
        CopyToBuffer(buffer, reinterpret_cast<const std::uint64_t *>(
                                 &globalDimensions[d]));
        CopyToBuffer(
            buffer, reinterpret_cast<const std::uint64_t *>(&globalOffsets[d]));
      }
    }
  }
}

void BP1Writer::WriteNameRecord(const std::string name,
                                std::vector<char> &buffer) const noexcept
{
  const std::uint16_t length = name.length();
  CopyToBuffer(buffer, &length);
  CopyToBuffer(buffer, name.c_str(), length);
}

BP1Index &
BP1Writer::GetBP1Index(const std::string name,
                       std::unordered_map<std::string, BP1Index> &indices,
                       bool &isNew) const noexcept
{
  auto itName = indices.find(name);
  if (itName == indices.end())
  {
    indices.emplace(name, BP1Index(indices.size()));
    isNew = true;
    return indices.at(name);
  }

  isNew = false;
  return itName->second;
}

void BP1Writer::FlattenData(BP1MetadataSet &metadataSet,
                            capsule::STLVector &heap) const noexcept
{
  auto &buffer = heap.m_Data;
  // vars count and Length (only for PG)
  CopyToBuffer(buffer, metadataSet.DataPGVarsCountPosition,
               &metadataSet.DataPGVarsCount);
  const std::uint64_t varsLength = buffer.size() -
                                   metadataSet.DataPGVarsCountPosition - 8 -
                                   4; // without record itself and vars count
  CopyToBuffer(buffer, metadataSet.DataPGVarsCountPosition + 4, &varsLength);

  // attributes (empty for now) count (4) and length (8) are zero by moving
  // positions in time step zero
  buffer.insert(buffer.end(), 12, 0);
  heap.m_DataAbsolutePosition += 12;

  // Finish writing pg group length
  const std::uint64_t dataPGLength =
      buffer.size() - metadataSet.DataPGLengthPosition -
      8; // without record itself, 12 due to empty attributes
  CopyToBuffer(buffer, metadataSet.DataPGLengthPosition, &dataPGLength);

  ++metadataSet.TimeStep;
  metadataSet.DataPGIsOpen = false;
}

void BP1Writer::FlattenMetadata(BP1MetadataSet &metadataSet,
                                capsule::STLVector &heap) const noexcept
{
  auto lf_IndexCountLength =
      [](std::unordered_map<std::string, BP1Index> &indices,
         std::uint32_t &count, std::uint64_t &length) {
        count = indices.size();
        length = 0;
        for (auto &indexPair : indices) // set each index length
        {
          auto &indexBuffer = indexPair.second.Buffer;
          const std::uint32_t indexLength = indexBuffer.size() - 4;
          CopyToBuffer(indexBuffer, 0, &indexLength);

          length += indexBuffer.size(); // overall length
        }
      };

  auto lf_FlattenIndices =
      [](const std::uint32_t count, const std::uint64_t length,
         const std::unordered_map<std::string, BP1Index> &indices,
         std::vector<char> &buffer) {
        CopyToBuffer(buffer, &count);
        CopyToBuffer(buffer, &length);

        for (const auto &indexPair : indices) // set each index length
        {
          const auto &indexBuffer = indexPair.second.Buffer;
          CopyToBuffer(buffer, indexBuffer.data(), indexBuffer.size());
        }
      };

  // Finish writing metadata counts and lengths
  // PG Index
  const std::uint64_t pgCount = metadataSet.DataPGCount;
  const std::uint64_t pgLength = metadataSet.PGIndex.Buffer.size();

  // var index count and length (total), and each index length
  std::uint32_t varsCount;
  std::uint64_t varsLength;
  lf_IndexCountLength(metadataSet.VarsIndices, varsCount, varsLength);
  // attribute index count and length, and each index length
  std::uint32_t attributesCount;
  std::uint64_t attributesLength;
  lf_IndexCountLength(metadataSet.AttributesIndices, attributesCount,
                      attributesLength);

  const std::size_t footerSize = (pgLength + 16) + (varsLength + 12) +
                                 (attributesLength + 12) +
                                 metadataSet.MiniFooterSize;
  auto &buffer = heap.m_Data;
  buffer.reserve(buffer.size() + footerSize); // reserve data to fit metadata,
                                              // must replace with growth buffer
                                              // strategy

  // write pg index
  CopyToBuffer(buffer, &pgCount);
  CopyToBuffer(buffer, &pgLength);
  CopyToBuffer(buffer, metadataSet.PGIndex.Buffer.data(), pgLength);
  // Vars indices
  lf_FlattenIndices(varsCount, varsLength, metadataSet.VarsIndices, buffer);
  // Attribute indices
  lf_FlattenIndices(attributesCount, attributesLength,
                    metadataSet.AttributesIndices, buffer);

  // getting absolute offsets, minifooter is 28 bytes for now
  const std::uint64_t offsetPGIndex = heap.m_DataAbsolutePosition;
  const std::uint64_t offsetVarsIndex = offsetPGIndex + (pgLength + 16);
  const std::uint64_t offsetAttributeIndex =
      offsetVarsIndex + (varsLength + 12);

  CopyToBuffer(buffer, &offsetPGIndex);
  CopyToBuffer(buffer, &offsetVarsIndex);
  CopyToBuffer(buffer, &offsetAttributeIndex);

  // version
  if (IsLittleEndian())
  {
    const std::uint8_t endian = 0;
    CopyToBuffer(buffer, &endian);
    buffer.insert(buffer.end(), 2, 0);
    CopyToBuffer(buffer, &m_Version);
  }
  else
  {
  }

  heap.m_DataAbsolutePosition += footerSize;

  if (metadataSet.Log.IsActive == true)
    metadataSet.Log.TotalBytes.push_back(heap.m_DataAbsolutePosition);
}

} // end namespace format
} // end namespace adios