/*
 * Distributed under the OSI-approved Apache License, Version 2.0.  See
 * accompanying file Copyright.txt for details.
 *
 * BP1Writer.h
 *
 *  Created on: Jan 24, 2017
 *      Author: wfg
 */

#ifndef BP1WRITER_H_
#define BP1WRITER_H_

/// \cond EXCLUDE_FROM_DOXYGEN
#include <algorithm> //std::count, std::copy, std::for_each
#include <cmath>     //std::ceil
#include <cstring>   //std::memcpy

#include "utilities/format/bp1/BP1Base.h"
#include "utilities/format/bp1/BP1Structs.h"

#include "capsule/heap/STLVector.h"
#include "core/Variable.h"
#include "functions/adiosFunctions.h"
#include "functions/adiosTemplates.h"

namespace adios
{
namespace format
{

class BP1Writer : public BP1Base
{

public:
    /// threads passed to GetMinMax for arrays of >= 10,000,000 values
    unsigned int m_Threads{1};
    /// statistics verbosity level; only 0 (min and max) is supported
    unsigned int m_Verbosity{0};
    /// memory growth factor
    float m_GrowthFactor{1.5f};
    /// BP format version
    const std::uint8_t m_Version{3};

    /**
     * Calculates the size in bytes of a process group (PG) index entry
     * according to the BP format, including the list of methods
     * (transports), which for now are written without parameters.
     * @param name process group name
     * @param timeStepName time step name
     * @param numberOfTransports number of transports listed as methods
     * @return size in bytes of the process group index
     */
    std::size_t GetProcessGroupIndexSize(
        const std::string name, const std::string timeStepName,
        const std::size_t numberOfTransports) const noexcept;

    /**
     * Writes a process group index (PGIndex) and the list of methods (taken
     * from transports). Done at Open or at aggregation of a new time step.
     * This version operates on a single heap buffer and metadata set.
     * @param isFortran true if the calling application is Fortran
     * (NOTE(review): how this is encoded is not visible here — confirm)
     * @param name process group name
     * @param processID process identifier
     * @param transports transports listed as methods
     * @param heap data buffer
     * @param metadataSet current rank metadata set
     */
    void WriteProcessGroupIndex(
        const bool isFortran, const std::string name,
        const std::uint32_t processID,
        const std::vector<std::shared_ptr<Transport>> &transports,
        capsule::STLVector &heap, BP1MetadataSet &metadataSet) const noexcept;

    /**
     * Returns the estimated variable index size
     * @param group
     * @param variableName
     * @param variable
     * @param verbosity
     * @return variable index size
     */
    template <class T>
    size_t GetVariableIndexSize(const Variable<T> &variable) const noexcept
wfg's avatar
wfg committed
    {
wfg's avatar
wfg committed
        // size_t indexSize = varEntryLength + memberID + lengthGroupName +
        // groupName + lengthVariableName + lengthOfPath + path + datatype
        size_t indexSize = 23; // without characteristics
        indexSize += variable.m_Name.size();

        // characteristics 3 and 4, check variable number of dimensions
        const std::size_t dimensions =
            variable.DimensionsSize(); // commas in CSV + 1
        indexSize += 28 * dimensions;  // 28 bytes per dimension
        indexSize += 1;                // id

        // characteristics, offset + payload offset in data
        indexSize += 2 * (1 + 8);
        // characteristic 0, if scalar add value, for now only allowing string
        if (dimensions == 1)
        {
            indexSize += sizeof(T);
            indexSize += 1; // id
            // must have an if here
            indexSize += 2 + variable.m_Name.size();
            indexSize += 1; // id
        }

        // characteristic statistics
        if (m_Verbosity == 0) // default, only min and max
        {
            indexSize += 2 * (sizeof(T) + 1);
            indexSize += 1 + 1; // id
        }

        return indexSize + 12; /// extra 12 bytes in case of attributes
                               // need to add transform characteristics
wfg's avatar
wfg committed
    /**
     * Version for primitive types (except std::complex<T>)
     * @param variable
     * @param heap
     * @param metadataSet
     */
    template <class T>
    void WriteVariableMetadata(const Variable<T> &variable,
                               capsule::STLVector &heap,
                               BP1MetadataSet &metadataSet) const noexcept
wfg's avatar
wfg committed
        Stats<T> stats = GetStats(variable);
        WriteVariableMetadataCommon(variable, stats, heap, metadataSet);
wfg's avatar
wfg committed
    }
wfg's avatar
wfg committed

wfg's avatar
wfg committed
    /**
     * std::complex<T> overload of WriteVariableMetadata.
     * Statistics are kept in the underlying real type T (GetStats returns
     * Stats<T> for complex variables). They are then handed to the shared
     * WriteVariableMetadataCommon writer.
     * @param variable complex-valued variable to describe
     * @param heap data buffer receiving the variable header
     * @param metadataSet current rank metadata set receiving the index entry
     */
    template <class T>
    void WriteVariableMetadata(const Variable<std::complex<T>> &variable,
                               capsule::STLVector &heap,
                               BP1MetadataSet &metadataSet) const noexcept
    {
        auto realStats = GetStats(variable); // Stats<T>, not Stats<complex>
        WriteVariableMetadataCommon(variable, realStats, heap, metadataSet);
    }
    /**
     * Expensive part this is only for heap buffers need to adapt to vector of
     * capsules
     * @param variable
     * @param buffer
     */
    template <class T>
    void WriteVariablePayload(const Variable<T> &variable,
                              capsule::STLVector &heap,
                              const unsigned int nthreads = 1) const noexcept
wfg's avatar
wfg committed
    {
wfg's avatar
wfg committed
        // EXPENSIVE part, might want to use threads if large, serial for now
        InsertToBuffer(heap.m_Data, variable.m_AppValues, variable.TotalSize());
wfg's avatar
wfg committed
        heap.m_DataAbsolutePosition += variable.PayLoadSize();
wfg's avatar
wfg committed

    /**
     * NOTE(review): undocumented in the original. It operates on the rank's
     * metadata set and a data buffer. Unlike most members of this class it
     * is neither const nor noexcept. Presumably it advances to the next
     * step — TODO confirm against the implementation.
     * @param metadataSet current rank metadata set
     * @param buffer data buffer
     */
    void Advance(BP1MetadataSet &metadataSet, capsule::STLVector &buffer);

    /**
     * Sets metadata (on the first close) and writes to a single transport.
     * @param metadataSet current rank metadata set
     * @param heap contains the data buffer
     * @param transport performs the write once data and metadata are set up
     * @param isFirstClose true: metadata has been set and aggregated; passed
     * by non-const reference, so the callee may update it
     * @param doAggregation true: for N-to-M, false: for N-to-N
     */
    void Close(BP1MetadataSet &metadataSet, capsule::STLVector &heap,
               Transport &transport, bool &isFirstClose,
               const bool doAggregation) const noexcept;

    /**
     * Builds the ADIOS profiling log (buffering, open, write and close) for
     * one rank.
     * @param rank current rank
     * @param metadataSet contains profiling info for buffering
     * @param transports contains profiling info for transport open, writes
     * and close
     * @return string for this rank that will be aggregated into profiling.log
     */
    std::string GetRankProfilingLog(
        const int rank, const BP1MetadataSet &metadataSet,
        const std::vector<std::shared_ptr<Transport>> &transports) const
        noexcept;

private:
    template <class T, class U>
    void WriteVariableMetadataCommon(const Variable<T> &variable,
                                     Stats<U> &stats, capsule::STLVector &heap,
                                     BP1MetadataSet &metadataSet) const noexcept
wfg's avatar
wfg committed
        stats.TimeIndex = metadataSet.TimeStep;

        // Get new Index or point to existing index
        bool isNew = true; // flag to check if variable is new
        BP1Index &varIndex =
            GetBP1Index(variable.m_Name, metadataSet.VarsIndices, isNew);
        stats.MemberID = varIndex.MemberID;

        // write metadata header in data and extract offsets
        stats.Offset = heap.m_DataAbsolutePosition;
        WriteVariableMetadataInData(variable, stats, heap);
        stats.PayloadOffset = heap.m_DataAbsolutePosition;

        // write to metadata  index
        WriteVariableMetadataInIndex(variable, stats, isNew, varIndex);

        ++metadataSet.DataPGVarsCount;
wgodoy's avatar
wgodoy committed
    }
wfg's avatar
wfg committed
    template <class T, class U>
    void WriteVariableMetadataInData(const Variable<T> &variable,
                                     const Stats<U> &stats,
                                     capsule::STLVector &heap) const noexcept
wfg's avatar
wfg committed
        auto &buffer = heap.m_Data;

        const std::size_t varLengthPosition =
            buffer.size(); // capture initial position for variable length
        buffer.insert(buffer.end(), 8, 0);              // skip var length (8)
        InsertToBuffer(buffer, &stats.MemberID);        // memberID
wfg's avatar
wfg committed
        WriteNameRecord(variable.m_Name, buffer);       // variable name
        buffer.insert(buffer.end(), 2, 0);              // skip path
        const std::uint8_t dataType = GetDataType<T>(); // dataType
        InsertToBuffer(buffer, &dataType);
wfg's avatar
wfg committed
        constexpr char no = 'n'; // isDimension
        InsertToBuffer(buffer, &no);
wfg's avatar
wfg committed

        // write variable dimensions
        const std::uint8_t dimensions = variable.m_LocalDimensions.size();
        InsertToBuffer(buffer, &dimensions); // count
wfg's avatar
wfg committed
        std::uint16_t dimensionsLength =
            27 *
            dimensions; // 27 is from 9 bytes for each: var y/n + local, var
                        // y/n + global dimension, var y/n + global offset,
                        // changed for characteristic
        InsertToBuffer(buffer, &dimensionsLength); // length
wfg's avatar
wfg committed
        WriteDimensionsRecord(buffer, variable.m_LocalDimensions,
                              variable.m_GlobalDimensions, variable.m_Offsets,
                              18, true);

        // CHARACTERISTICS
        WriteVariableCharacteristics(variable, stats, buffer, true);

        // Back to varLength including payload size
        const std::uint64_t varLength = buffer.size() - varLengthPosition +
                                        variable.PayLoadSize() -
                                        8; // remove its own size
        CopyToBufferPosition(buffer, varLengthPosition, &varLength); // length
wfg's avatar
wfg committed

        heap.m_DataAbsolutePosition +=
            buffer.size() - varLengthPosition; // update absolute position to be
                                               // used as payload position
wfg's avatar
wfg committed
    template <class T, class U>
    void WriteVariableMetadataInIndex(const Variable<T> &variable,
                                      const Stats<U> &stats, const bool isNew,
                                      BP1Index &index) const noexcept
wfg's avatar
wfg committed
        auto &buffer = index.Buffer;

        if (isNew == true) // write variable header (might be shared with
                           // attributes index)
        {
            buffer.insert(buffer.end(), 4, 0); // skip var length (4)
            InsertToBuffer(buffer, &stats.MemberID);
wfg's avatar
wfg committed
            buffer.insert(buffer.end(), 2, 0); // skip group name
            WriteNameRecord(variable.m_Name, buffer);
            buffer.insert(buffer.end(), 2, 0); // skip path

            const std::uint8_t dataType = GetDataType<T>();
            InsertToBuffer(buffer, &dataType);
wfg's avatar
wfg committed

            // Characteristics Sets Count in Metadata
            index.Count = 1;
            InsertToBuffer(buffer, &index.Count);
wfg's avatar
wfg committed
        }
        else // update characteristics sets count
        {
            const std::size_t characteristicsSetsCountPosition =
                15 + variable.m_Name.size();
            ++index.Count;
            CopyToBufferPosition(buffer, characteristicsSetsCountPosition,
                                 &index.Count); // test
wfg's avatar
wfg committed
        }

        WriteVariableCharacteristics(variable, stats, buffer);
wfg's avatar
wfg committed

    template <class T, class U>
    void WriteVariableCharacteristics(const Variable<T> &variable,
                                      const Stats<U> &stats,
                                      std::vector<char> &buffer,
                                      const bool addLength = false) const
        noexcept
wfg's avatar
wfg committed
        const std::size_t characteristicsCountPosition =
            buffer.size(); // very important to track as writer is going back to
                           // this position
        buffer.insert(buffer.end(), 5,
                      0); // skip characteristics count(1) + length (4)
        std::uint8_t characteristicsCounter = 0;

        // DIMENSIONS
        std::uint8_t characteristicID = characteristic_dimensions;
        InsertToBuffer(buffer, &characteristicID);
wfg's avatar
wfg committed
        const std::uint8_t dimensions = variable.m_LocalDimensions.size();

        if (addLength == true)
        {
            const std::int16_t lengthOfDimensionsCharacteristic =
                24 * dimensions +
                3; // 24 = 3 local, global, offset x 8 bytes/each
            InsertToBuffer(buffer, &lengthOfDimensionsCharacteristic);
        InsertToBuffer(buffer, &dimensions); // count
wfg's avatar
wfg committed
        const std::uint16_t dimensionsLength = 24 * dimensions;
        InsertToBuffer(buffer, &dimensionsLength); // length
wfg's avatar
wfg committed
        WriteDimensionsRecord(buffer, variable.m_LocalDimensions,
                              variable.m_GlobalDimensions, variable.m_Offsets,
                              16, addLength);
        ++characteristicsCounter;

        // VALUE for SCALAR or STAT min, max for ARRAY
        WriteBoundsRecord(variable.m_IsScalar, stats, buffer,
                          characteristicsCounter, addLength);
        // TIME INDEX
        WriteCharacteristicRecord(characteristic_time_index, stats.TimeIndex,
                                  buffer, characteristicsCounter, addLength);

        if (addLength == false) // only in metadata offset and payload offset
        {
            WriteCharacteristicRecord(characteristic_offset, stats.Offset,
                                      buffer, characteristicsCounter);
            WriteCharacteristicRecord(characteristic_payload_offset,
                                      stats.PayloadOffset, buffer,
                                      characteristicsCounter);
        }
        // END OF CHARACTERISTICS

        // Back to characteristics count and length
        CopyToBufferPosition(buffer, characteristicsCountPosition,
                             &characteristicsCounter); // count (1)
wfg's avatar
wfg committed
        const std::uint32_t characteristicsLength =
            buffer.size() - characteristicsCountPosition - 4 -
            1; // remove its own length (4 bytes) + characteristic counter ( 1
               // byte
               // )
        CopyToBufferPosition(buffer, characteristicsCountPosition + 1,
                             &characteristicsLength); // length
wfg's avatar
wfg committed

    /**
     * Writes a name record to buffer:
     * [2 bytes: name.length()][name.length() bytes: name characters]
     * NOTE(review): there is no position parameter. Callers use it in
     * sequence with buffer.insert(buffer.end(), ...), so it presumably
     * appends at buffer.end() — confirm against the implementation.
     * @param name string to record
     * @param buffer destination buffer
     */
    void WriteNameRecord(const std::string name,
                         std::vector<char> &buffer) const noexcept;

    /**
     * Writes a dimensions record for a variable. Used by
     * WriteVariableMetadataInData and WriteVariableCharacteristics.
     * @param buffer destination buffer
     * @param localDimensions local dimensions of the variable
     * @param globalDimensions global dimensions of the variable
     * @param offsets global offsets of the variable
     * @param skip NOTE(review): callers pass 18 (data header) and 16
     * (characteristics); presumably bytes skipped per dimension — confirm
     * @param addType true: for data buffers, false: for the metadata buffer.
     * NOTE(review): WriteVariableCharacteristics forwards its addLength
     * here, so the characteristic written into the data buffer passes true;
     * an earlier wording listed it under false — confirm the intended format
     */
    void WriteDimensionsRecord(std::vector<char> &buffer,
                               const std::vector<std::size_t> &localDimensions,
                               const std::vector<std::size_t> &globalDimensions,
                               const std::vector<std::size_t> &offsets,
                               const unsigned int skip,
                               const bool addType = false) const noexcept;

    /**
     * Computes statistics for a primitive (non-complex) variable.
     * With verbosity 0 this is the min and max of variable.m_AppValues.
     * Arrays of at least 10,000,000 elements pass m_Threads to GetMinMax.
     * @param variable variable whose TotalSize() application values are
     * scanned
     * @return stats with Min/Max set when m_Verbosity is 0; otherwise the
     * default-initialized Stats<T>
     */
    template <class T>
    Stats<T> GetStats(const Variable<T> &variable) const noexcept
    {
        Stats<T> stats;
        const std::size_t valuesSize = variable.TotalSize();

        if (m_Verbosity == 0)
        {
            // ten million is a placeholder threshold: it needs actual
            // measurements to decide when threading pays off
            if (valuesSize >= 10000000)
            {
                GetMinMax(variable.m_AppValues, valuesSize, stats.Min,
                          stats.Max, m_Threads);
            }
            else
            {
                GetMinMax(variable.m_AppValues, valuesSize, stats.Min,
                          stats.Max);
            }
        }
        return stats;
    }

    /**
     * std::complex<T> overload of GetStats.
     * Statistics are reported in the underlying real type T. For verbosity 0,
     * GetMinMax scans all TotalSize() values, and m_Threads is passed along
     * when there are ten million or more of them.
     * @param variable complex-valued variable
     * @return Stats<T> with Min/Max set when m_Verbosity is 0
     */
    template <class T>
    Stats<T> GetStats(const Variable<std::complex<T>> &variable) const noexcept
    {
        Stats<T> result;
        const std::size_t count = variable.TotalSize();

        if (m_Verbosity != 0)
        {
            return result; // only verbosity 0 (min/max) is supported
        }

        // placeholder threshold for threading; needs measurements
        const bool useThreads = count >= 10000000;
        if (useThreads)
        {
            GetMinMax(variable.m_AppValues, count, result.Min, result.Max,
                      m_Threads);
        }
        else
        {
            GetMinMax(variable.m_AppValues, count, result.Min, result.Max);
        }
        return result;
    }
    template <class T>
    void WriteBoundsRecord(const bool isScalar, const Stats<T> &stats,
                           std::vector<char> &buffer,
                           std::uint8_t &characteristicsCounter,
                           const bool addLength) const noexcept
wfg's avatar
wfg committed
        if (isScalar == true)
        {
            WriteCharacteristicRecord(
                characteristic_value, stats.Min, buffer, characteristicsCounter,
                addLength); // stats.min = stats.max = value
            return;
        }

        if (m_Verbosity == 0) // default verbose
        {
            WriteCharacteristicRecord(characteristic_min, stats.Min, buffer,
                                      characteristicsCounter, addLength);
            WriteCharacteristicRecord(characteristic_max, stats.Max, buffer,
                                      characteristicsCounter, addLength);
        }
wfg's avatar
wfg committed

    /**
     * Write a characteristic value record to buffer
     * @param id
     * @param value
     * @param buffers
     * @param positions
     * @param characvteristicsCounter to be updated by 1
     * @param addLength true for data, false for metadata
     */
    template <class T>
    void WriteCharacteristicRecord(const std::uint8_t characteristicID,
                                   const T &value, std::vector<char> &buffer,
                                   std::uint8_t &characteristicsCounter,
                                   const bool addLength = false) const noexcept
wfg's avatar
wfg committed
        const std::uint8_t id = characteristicID;
        InsertToBuffer(buffer, &id);
wfg's avatar
wfg committed

        if (addLength == true)
        {
            const std::uint16_t lengthOfCharacteristic = sizeof(T); // id
            InsertToBuffer(buffer, &lengthOfCharacteristic);
        InsertToBuffer(buffer, &value);
wfg's avatar
wfg committed
        ++characteristicsCounter;
wfg's avatar
wfg committed
    /**
     * Returns the BP1Index for name in indices, creating a new one if it
     * does not exist yet. Used for variables and attributes.
     * @param name variable or attribute name to look up
     * @param indices lookup hash table of indices
     * @param isNew out: true if the index was newly created, false if it
     * already existed in indices
     * @return reference to the BP1Index stored in indices
     */
    BP1Index &GetBP1Index(const std::string name,
                          std::unordered_map<std::string, BP1Index> &indices,
                          bool &isNew) const noexcept;

    /**
     * Flattens the data and fills in the process group length, variables
     * count, variables length and attributes.
     * @param metadataSet current rank metadata set
     * @param buffer data buffer capsule
     */
    void FlattenData(BP1MetadataSet &metadataSet,
                     capsule::STLVector &buffer) const noexcept;

    /**
     * Flattens the metadata indices into a single metadata buffer in the
     * capsule.
     * @param metadataSet current rank metadata set holding the indices
     * @param buffer capsule receiving the flattened metadata
     */
    void FlattenMetadata(BP1MetadataSet &metadataSet,
                         capsule::STLVector &buffer) const noexcept;
}; // end class BP1Writer

} // end namespace format
} // end namespace adios

#endif /* BP1WRITER_H_ */