Skip to content
Snippets Groups Projects
BP1Writer.h 17.4 KiB
Newer Older
wfg's avatar
wfg committed
/*
 * BP1.h
 *
 *  Created on: Jan 24, 2017
 *      Author: wfg
 */

#ifndef BP1WRITER_H_
#define BP1WRITER_H_

wfg's avatar
wfg committed
/// \cond EXCLUDE_FROM_DOXYGEN
#include <algorithm> //std::count, std::copy, std::for_each
wfg's avatar
wfg committed
#include <cstring> //std::memcpy
wfg's avatar
wfg committed
#include <cmath>   //std::ceil
wfg's avatar
wfg committed

#include "BP1.h"
wfg's avatar
wfg committed
#include "core/Variable.h"
wfg's avatar
wfg committed
#include "core/Capsule.h"
#include "core/Profiler.h"
#include "capsule/heap/STLVector.h"
wfg's avatar
wfg committed
#include "functions/adiosTemplates.h"
#include "functions/adiosFunctions.h"
wfg's avatar
wfg committed


namespace adios
{
namespace format
{


class BP1Writer : public BP1
    unsigned int m_Threads = 1;  ///< number of threads for thread operations in large array (min,max)
wfg's avatar
wfg committed
    unsigned int m_Verbosity = 0; ///< statistics verbosity, can change if redefined in Engine method.
    float m_GrowthFactor = 1.5; ///< memory growth factor, can change if redefined in Engine method.
    const std::uint8_t m_Version = 3; ///< BP format version
wfg's avatar
wfg committed

    /**
     * Calculates the Process Index size in bytes according to the BP format, including list of method with no parameters (for now)
     * @param name
     * @param timeStepName
     * @param numberOfTransports
     * @return size of pg index
wfg's avatar
wfg committed
     */
    std::size_t GetProcessGroupIndexSize( const std::string name, const std::string timeStepName,
                                          const std::size_t numberOfTransports ) const noexcept;
     * Writes a process group index PGIndex and list of methods (from transports), done at Open or aggregation of new time step
     * Version that operates on a single heap buffer and metadataset.
     * @param isFortran
     * @param name
     * @param processID
     * @param transports
     * @param buffer
     * @param metadataSet
wfg's avatar
wfg committed
    void WriteProcessGroupIndex( const bool isFortran, const std::string name, const std::uint32_t processID,
                                 const std::vector< std::shared_ptr<Transport> >& transports,
wfg's avatar
wfg committed
                                 capsule::STLVector& heap, BP1MetadataSet& metadataSet ) const noexcept;
wfg's avatar
wfg committed
    /**
     * Returns the estimated variable index size
     * @param group
     * @param variableName
     * @param variable
     * @param verbosity
     * @return variable index size
     */
    template< class T >
    size_t GetVariableIndexSize( const Variable<T>& variable ) const noexcept
wfg's avatar
wfg committed
    {
        //size_t indexSize = varEntryLength + memberID + lengthGroupName + groupName + lengthVariableName + lengthOfPath + path + datatype
        size_t indexSize = 23; //without characteristics
wfg's avatar
wfg committed
        indexSize += variable.m_Name.size();
wfg's avatar
wfg committed

        // characteristics 3 and 4, check variable number of dimensions
        const std::size_t dimensions = variable.DimensionsSize(); //number of commas in CSV + 1
wfg's avatar
wfg committed
        indexSize += 28 * dimensions; //28 bytes per dimension
        indexSize += 1; //id

wfg's avatar
wfg committed
        //characteristics, offset + payload offset in data
        indexSize += 2*(1+8);
wfg's avatar
wfg committed
        //characteristic 0, if scalar add value, for now only allowing string
        if( dimensions == 1 )
        {
            indexSize += sizeof(T);
            indexSize += 1; //id
            //must have an if here
wfg's avatar
wfg committed
            indexSize += 2 + variable.m_Name.size();
wfg's avatar
wfg committed
            indexSize += 1; //id
        }

        //characteristic statistics
        if( m_Verbosity == 0 ) //default, only min and max
wfg's avatar
wfg committed
        {
            indexSize += 2 * ( sizeof(T) + 1 );
            indexSize += 1 + 1; //id
        }

        return indexSize + 12; ///extra 12 bytes in case of attributes
wfg's avatar
wfg committed
        //need to add transform characteristics
    }

     * Version for primitive types (except std::complex<T>)
wfg's avatar
wfg committed
     * @param variable
     * @param heap
     * @param metadataSet
wfg's avatar
wfg committed
     */
    template<class T> inline
    void WriteVariableMetadata( const Variable<T>& variable, capsule::STLVector& heap, BP1MetadataSet& metadataSet ) const noexcept
    	Stats<T> stats = GetStats( variable );
    	WriteVariableMetadataCommon( variable, stats, heap, metadataSet );
wfg's avatar
wfg committed
    }
wfg's avatar
wfg committed

    /**
     * Overloaded version for std::complex<T> variables
     * @param variable
     * @param heap
     * @param metadataSet
     */
    template<class T>
    void WriteVariableMetadata( const Variable<std::complex<T>>& variable, capsule::STLVector& heap, BP1MetadataSet& metadataSet ) const noexcept
wfg's avatar
wfg committed
    {
    	Stats<T> stats = GetStats( variable );
    	WriteVariableMetadataCommon( variable, stats, heap, metadataSet );
wfg's avatar
wfg committed
    /**
     * Expensive part this is only for heap buffers need to adapt to vector of capsules
     * @param variable
     * @param buffer
     */
    template< class T >
    void WriteVariablePayload( const Variable<T>& variable, capsule::STLVector& heap, const unsigned int nthreads = 1 ) const noexcept
wfg's avatar
wfg committed
    {
        //EXPENSIVE part, might want to use threads if large, serial for now
wfg's avatar
wfg committed
        CopyToBuffer( heap.m_Data, variable.m_AppValues, variable.TotalSize() );
        heap.m_DataAbsolutePosition += variable.PayLoadSize();
wgodoy's avatar
wgodoy committed
    }
    void Advance( BP1MetadataSet& metadataSet, capsule::STLVector& buffer );

wfg's avatar
wfg committed
    /**
     * Function that sets metadata (if first close) and writes to a single transport
     * @param metadataSet current rank metadata set
wfg's avatar
wfg committed
     * @param heap contains data buffer
wfg's avatar
wfg committed
     * @param transport does a write after data and metadata is setup
     * @param isFirstClose true: metadata has been set and aggregated
     * @param doAggregation true: for N-to-M, false: for N-to-N
wfg's avatar
wfg committed
    void Close( BP1MetadataSet& metadataSet, capsule::STLVector& heap, Transport& transport, bool& isFirstClose,
                const bool doAggregation ) const noexcept;

    /**
     * Writes the ADIOS log information (buffering, open, write and close) for a rank process
     * @param rank current rank
     * @param metadataSet contains Profile info for buffering
     * @param transports  contains Profile info for transport open, writes and close
     * @return string for this rank that will be aggregated into profiling.log
     */
    std::string GetRankProfilingLog( const int rank, const BP1MetadataSet& metadataSet,
                                     const std::vector< std::shared_ptr<Transport> >& transports ) const noexcept;
    template< class T, class U >
    void WriteVariableMetadataCommon( const Variable<T>& variable, Stats<U>& stats,
    		                          capsule::STLVector& heap, BP1MetadataSet& metadataSet ) const noexcept
	{
wfg's avatar
wfg committed
        stats.TimeIndex = metadataSet.TimeStep;

        //Get new Index or point to existing index
        bool isNew = true; //flag to check if variable is new
    	BP1Index& varIndex = GetBP1Index( variable.m_Name, metadataSet.VarsIndices, isNew );
wfg's avatar
wfg committed
    	stats.MemberID = varIndex.MemberID;
wfg's avatar
wfg committed
    	//write metadata header in data and extract offsets
    	stats.Offset = heap.m_DataAbsolutePosition;
wfg's avatar
wfg committed
    	WriteVariableMetadataInData( variable, stats, heap );
    	stats.PayloadOffset = heap.m_DataAbsolutePosition;

wfg's avatar
wfg committed
    	//write to metadata  index
    	WriteVariableMetadataInIndex( variable, stats, isNew, varIndex );

    	++metadataSet.DataPGVarsCount;
	}


    template< class T, class U >
wfg's avatar
wfg committed
    void WriteVariableMetadataInData( const Variable<T>& variable, const Stats<U>& stats,
    								  capsule::STLVector& heap ) const noexcept
    {
        auto& buffer = heap.m_Data;

wfg's avatar
wfg committed
        const std::size_t varLengthPosition = buffer.size(); //capture initial position for variable length
        buffer.insert( buffer.end(), 8, 0 ); //skip var length (8)
        CopyToBuffer( buffer, &stats.MemberID ); //memberID
        WriteNameRecord( variable.m_Name, buffer ); //variable name
        buffer.insert( buffer.end(), 2, 0 ); //skip path
        const std::uint8_t dataType = GetDataType<T>(); //dataType
wfg's avatar
wfg committed
        CopyToBuffer( buffer, &dataType );
        constexpr char no = 'n';  //isDimension
wfg's avatar
wfg committed
        CopyToBuffer( buffer, &no );

        //write variable dimensions
wfg's avatar
wfg committed
        const std::uint8_t dimensions = variable.m_Dimensions.size();
        CopyToBuffer( buffer, &dimensions ); //count
        std::uint16_t dimensionsLength = 27 * dimensions; //27 is from 9 bytes for each: var y/n + local, var y/n + global dimension, var y/n + global offset, changed for characteristic
wfg's avatar
wfg committed
        CopyToBuffer( buffer, &dimensionsLength ); //length
        WriteDimensionsRecord( buffer, variable.m_Dimensions, variable.m_GlobalDimensions, variable.m_GlobalOffsets, 18, true );

        //CHARACTERISTICS
        WriteVariableCharacteristics( variable, stats, buffer, true );

        //Back to varLength including payload size
        const std::uint64_t varLength = buffer.size() - varLengthPosition + variable.PayLoadSize() - 8; //remove its own size
        CopyToBuffer( buffer, varLengthPosition, &varLength ); //length

        heap.m_DataAbsolutePosition += buffer.size() - varLengthPosition; // update absolute position to be used as payload position
    }

wfg's avatar
wfg committed
    template< class T, class U>
    void WriteVariableMetadataInIndex( const Variable<T>& variable, const Stats<U>& stats,
                                       const bool isNew, BP1Index& index ) const noexcept
    {
        auto& buffer = index.Buffer;

        if( isNew == true ) //write variable header (might be shared with attributes index)
        {
            buffer.insert( buffer.end(), 4, 0 ); //skip var length (4)
            CopyToBuffer( buffer, &stats.MemberID );
            buffer.insert( buffer.end(), 2, 0 ); //skip group name
            WriteNameRecord( variable.m_Name, buffer );
            buffer.insert( buffer.end(), 2, 0 ); //skip path

            const std::uint8_t dataType = GetDataType<T>();
            CopyToBuffer( buffer, &dataType );

            //Characteristics Sets Count in Metadata
            index.Count = 1;
            CopyToBuffer( buffer, &index.Count );
        }
        else //update characteristics sets count
        {
            const std::size_t characteristicsSetsCountPosition = 15 + variable.m_Name.size();
wfg's avatar
wfg committed
            ++index.Count;
            CopyToBuffer( buffer, characteristicsSetsCountPosition, &index.Count ); //test
        }

        WriteVariableCharacteristics( variable, stats, buffer );
    }


    template<class T, class U>
    void WriteVariableCharacteristics( const Variable<T>& variable, const Stats<U>& stats, std::vector<char>& buffer,
                                       const bool addLength = false ) const noexcept
    {
        const std::size_t characteristicsCountPosition = buffer.size(); //very important to track as writer is going back to this position
        buffer.insert( buffer.end(), 5, 0 ); //skip characteristics count(1) + length (4)
        std::uint8_t characteristicsCounter = 0;

        //DIMENSIONS
        std::uint8_t characteristicID = characteristic_dimensions;
wfg's avatar
wfg committed
        CopyToBuffer( buffer, &characteristicID );
        const std::uint8_t dimensions = variable.m_Dimensions.size();

        if( addLength == true )
        {
            const std::int16_t lengthOfDimensionsCharacteristic = 24 * dimensions + 3; // 24 = 3 local, global, global offset x 8 bytes/each
            CopyToBuffer( buffer, &lengthOfDimensionsCharacteristic );
        }

        CopyToBuffer( buffer, &dimensions ); //count
        const std::uint16_t dimensionsLength = 24 * dimensions;
        CopyToBuffer( buffer, &dimensionsLength ); //length
        WriteDimensionsRecord( buffer, variable.m_Dimensions, variable.m_GlobalDimensions, variable.m_GlobalOffsets, 16, addLength );
        ++characteristicsCounter;

        //VALUE for SCALAR or STAT min, max for ARRAY
wfg's avatar
wfg committed
        WriteBoundsRecord( variable.m_IsScalar, stats, buffer, characteristicsCounter, addLength );
wfg's avatar
wfg committed
        WriteCharacteristicRecord( characteristic_time_index, stats.TimeIndex, buffer, characteristicsCounter, addLength );

        if( addLength == false )//only in metadata offset and payload offset
        {
            WriteCharacteristicRecord( characteristic_offset, stats.Offset, buffer, characteristicsCounter );
            WriteCharacteristicRecord( characteristic_payload_offset, stats.PayloadOffset, buffer, characteristicsCounter );
        }
        //END OF CHARACTERISTICS

        //Back to characteristics count and length
wfg's avatar
wfg committed
        CopyToBuffer( buffer, characteristicsCountPosition, &characteristicsCounter ); //count (1)
        const std::uint32_t characteristicsLength = buffer.size() - characteristicsCountPosition - 4 - 1; //remove its own length (4 bytes) + characteristic counter ( 1 byte )
        CopyToBuffer( buffer, characteristicsCountPosition+1, &characteristicsLength ); //length
wfg's avatar
wfg committed

wfg's avatar
wfg committed
     * Writes from &buffer[position]:  [2 bytes:string.length()][string.length(): string.c_str()]
     * @param name
wfg's avatar
wfg committed
     * @param buffer
     * @param position
wfg's avatar
wfg committed
    void WriteNameRecord( const std::string name, std::vector<char>& buffer ) const noexcept;
wfg's avatar
wfg committed

    /**
     * Write a dimension record for a global variable used by WriteVariableCommon
wfg's avatar
wfg committed
     * @param buffer
     * @param position
     * @param localDimensions
     * @param globalDimensions
     * @param globalOffsets
     * @param addType true: for data buffers, false: for metadata buffer and data characteristic
     */
wfg's avatar
wfg committed
    void WriteDimensionsRecord( std::vector<char>& buffer,
                                const std::vector<std::size_t>& localDimensions,
                                const std::vector<std::size_t>& globalDimensions,
                                const std::vector<std::size_t>& globalOffsets,
							    const unsigned int skip,
                                const bool addType = false ) const noexcept;
wfg's avatar
wfg committed
    /**
     * GetStats for primitive types except std::complex<T> types
     * @param variable
     * @return stats
    template<class T>
	Stats<T> GetStats( const Variable<T>& variable ) const noexcept
	{
		Stats<T> stats;
		const std::size_t valuesSize = variable.TotalSize();

		if( m_Verbosity == 0 )
		{
			if( valuesSize >= 10000000 ) //ten million? this needs actual results //here we can make decisions for threads based on valuesSize
				GetMinMax( variable.m_AppValues, valuesSize, stats.Min, stats.Max, m_Threads ); //here we can add cores from constructor
			else
				GetMinMax( variable.m_AppValues, valuesSize, stats.Min, stats.Max );
		}
		return stats;
	}
     * GetStats for std::complex<T> types
     * @param variable
     * @return stats
     */
    template<class T>
	Stats<T> GetStats( const Variable<std::complex<T>>& variable ) const noexcept
	{
		Stats<T> stats;
		const std::size_t valuesSize = variable.TotalSize();

		if( m_Verbosity == 0 )
		{
			if( valuesSize >= 10000000 ) //ten million? this needs actual results //here we can make decisions for threads based on valuesSize
				GetMinMax( variable.m_AppValues, valuesSize, stats.Min, stats.Max, m_Threads ); //here we can add cores from constructor
			else
				GetMinMax( variable.m_AppValues, valuesSize, stats.Min, stats.Max );
		}
		return stats;
	}
    template< class T >
wfg's avatar
wfg committed
    void WriteBoundsRecord( const bool isScalar, const Stats<T>& stats, std::vector<char>& buffer,
    		                std::uint8_t& characteristicsCounter, const bool addLength ) const noexcept
    	if( isScalar == true )
    	{
wfg's avatar
wfg committed
    	    WriteCharacteristicRecord( characteristic_value, stats.Min, buffer, characteristicsCounter, addLength ); //stats.min = stats.max = value
    	if( m_Verbosity == 0 ) //default verbose
        {
wfg's avatar
wfg committed
    	    WriteCharacteristicRecord( characteristic_min, stats.Min, buffer, characteristicsCounter, addLength );
    	    WriteCharacteristicRecord( characteristic_max, stats.Max, buffer, characteristicsCounter, addLength );
     * Write a characteristic value record to buffer
wfg's avatar
wfg committed
     * @param id
     * @param value
     * @param buffers
     * @param positions
     * @param characvteristicsCounter to be updated by 1
wfg's avatar
wfg committed
     * @param addLength true for data, false for metadata
     */
    template<class T>
wfg's avatar
wfg committed
    void WriteCharacteristicRecord( const std::uint8_t characteristicID, const T& value,
                                    std::vector<char>& buffer, std::uint8_t& characteristicsCounter,
                                    const bool addLength = false ) const noexcept
wfg's avatar
wfg committed
    {
        const std::uint8_t id = characteristicID;
wfg's avatar
wfg committed
    	CopyToBuffer( buffer, &id );
wfg's avatar
wfg committed

        if( addLength == true )
        {
wfg's avatar
wfg committed
            const std::uint16_t lengthOfCharacteristic = sizeof( T ); //id
            CopyToBuffer( buffer, &lengthOfCharacteristic );
wfg's avatar
wfg committed
        }

wfg's avatar
wfg committed
        CopyToBuffer( buffer, &value );
        ++characteristicsCounter;
wfg's avatar
wfg committed
    }
    /**
     * Returns corresponding index of type BP1Index, if doesn't exists creates a new one.
     * Used for variables and attributes
     * @param name variable or attribute name to look for index
     * @param indices look up hash table of indices
     * @param isNew true: index is newly created, false: index already exists in indices
     * @return reference to BP1Index in indices
     */
wfg's avatar
wfg committed
    BP1Index& GetBP1Index( const std::string name, std::unordered_map<std::string, BP1Index>& indices, bool& isNew ) const noexcept;
wfg's avatar
wfg committed
     * Flattens the data and fills the pg length, vars count, vars length and attributes
     * @param metadataSet
wfg's avatar
wfg committed
     * @param buffer
wfg's avatar
wfg committed
    void FlattenData( BP1MetadataSet& metadataSet, capsule::STLVector& buffer ) const noexcept;
wfg's avatar
wfg committed
     * Flattens the metadata indices into a single metadata buffer in capsule
     * @param metadataSet
wfg's avatar
wfg committed
     * @param buffer
wfg's avatar
wfg committed
    void FlattenMetadata( BP1MetadataSet& metadataSet, capsule::STLVector& buffer ) const noexcept; ///< sets the metadata buffer in capsule with indices and minifooter
wfg's avatar
wfg committed
};


} //end namespace format
} //end namespace adios

#endif /* BP1WRITER_H_ */