From 72e715523eabf97472c9a2567d7b1b0f5412c804 Mon Sep 17 00:00:00 2001 From: wfg <wfg@pc0098504.ornl.gov> Date: Fri, 10 Feb 2017 18:00:11 -0500 Subject: [PATCH] Added a function to check buffer memory allocation in Writer engine --- include/engine/writer/Writer.h | 87 +++++++++++++++++++--------------- include/format/BP1Writer.h | 3 +- src/engine/writer/Writer.cpp | 61 +++++++++++++++++------- 3 files changed, 95 insertions(+), 56 deletions(-) diff --git a/include/engine/writer/Writer.h b/include/engine/writer/Writer.h index e4db2d40c..ced2f65ef 100644 --- a/include/engine/writer/Writer.h +++ b/include/engine/writer/Writer.h @@ -37,43 +37,6 @@ public: ~Writer( ); - template< class T > - void WriteVariable( const Group& group, const Var variableName, const Variable<T>& variable ) - { - auto lf_CheckAllocationResult = []( const int result, const std::string variableName, const int rankMPI ) - { - if( result == -1 ) - throw std::runtime_error( "ERROR: bad_alloc when writing variable " + variableName + - " from rank " + std::to_string( rankMPI ) ); - }; - - //Check if data in buffer needs to be reallocated - const size_t indexSize = m_BP1Writer.GetVariableIndexSize( group, variableName, variable ); //metadata size - const std::size_t payloadSize = GetTotalSize( group.GetDimensions( variable.DimensionsCSV ) ) * sizeof( T ); - const std::size_t dataSize = payloadSize + indexSize + 10; //adding some bytes tolerance - const std::size_t neededSize = dataSize + m_Buffer.m_DataPosition; - // might need to write payload in batches - const bool doTransportsFlush = ( neededSize > m_MaxBufferSize )? true : false; - - int result = GrowBuffer( m_MaxBufferSize, m_GrowthFactor, m_Buffer.m_DataPosition, m_Buffer.m_Data ); - lf_CheckAllocationResult( result, variableName, m_RankMPI ); - - //WRITE INDEX// - m_BP1Writer.WriteVariableIndex( group, variableName, variable, m_Buffer, m_MetadataSet ); - - if( doTransportsFlush == true ) //in batches - { - - } - else //Write data - { - //this is the expensive part might want to use threaded memcpy - MemcpyThreads( m_Buffer.m_Data.data(), variable.Values, payloadSize, m_Cores ); - m_Buffer.m_DataPosition += payloadSize; - m_Buffer.m_DataAbsolutePosition += payloadSize; - } - } - void Write( Group& group, const std::string variableName, const char* values ); void Write( Group& group, const std::string variableName, const unsigned char* values ); void Write( Group& group, const std::string variableName, const short* values ); @@ -111,10 +74,60 @@ private: format::BP1MetadataSet m_MetadataSet; ///< metadata set accompanying the heap buffer data in bp format. Needed by m_BP1Writer std::size_t m_MaxBufferSize; float m_GrowthFactor = 1.5; + bool m_TransportFlush = false; ///< true: transport flush happened, buffer must be reset void Init( ); void InitTransports( ); + + /** + * Common function + * @param group + * @param variableName + * @param variable + */ + template< class T > + void WriteVariable( const Group& group, const Var variableName, const Variable<T>& variable ) + { + //precalculate new metadata and payload sizes + const std::size_t indexSize = m_BP1Writer.GetVariableIndexSize( group, variableName, variable ); + const std::size_t payloadSize = GetTotalSize( group.GetDimensions( variable.DimensionsCSV ) ) * sizeof( T ); + + //Buffer reallocation, expensive part + m_TransportFlush = CheckBuffersAllocation( group, variableName, indexSize, payloadSize ); + + //WRITE INDEX to data buffer and metadata structure (in memory)// + m_BP1Writer.WriteVariableIndex( group, variableName, variable, m_Buffer, m_MetadataSet ); + + if( m_TransportFlush == true ) //in batches + { + //write pg index + + //flush to transports + + //reset positions to zero, update absolute position + + } + else //Write data to buffer + { + //Values to Buffer -> Copy of data, Expensive part might want to use threads if large. Need a model to apply threading. + MemcpyThreads( m_Buffer.m_Data.data(), variable.Values, payloadSize, m_Cores ); + //update indices + m_Buffer.m_DataPosition += payloadSize; + m_Buffer.m_DataAbsolutePosition += payloadSize; + } + } + + /** + * Check if heap buffers for data and metadata need reallocation or maximum sizes have been reached. + * @param group variable owner + * @param variableName name of the variable to be written + * @param indexSize precalculated index size + * @param payloadSize payload size from variable total size + * @return true: transport must be flush and buffers reset, false: buffer is sufficient + */ + bool CheckBuffersAllocation( const Group& group, const Var variableName, const std::size_t indexSize, const std::size_t payloadSize ); + }; diff --git a/include/format/BP1Writer.h b/include/format/BP1Writer.h index 1cc60c277..425738d8a 100644 --- a/include/format/BP1Writer.h +++ b/include/format/BP1Writer.h @@ -10,7 +10,7 @@ /// \cond EXCLUDE_FROM_DOXYGEN #include <vector> -#include <cstdint> +#include <cstdint> //std::intX_t fixed size classes #include <algorithm> //std::count, std::copy, std::for_each #include <cstring> //std::memcpy #include <cmath> //std::ceil @@ -31,6 +31,7 @@ namespace format { + class BP1Writer : public BP1 { diff --git a/src/engine/writer/Writer.cpp b/src/engine/writer/Writer.cpp index 4d725d6f0..f96d2b9c1 100644 --- a/src/engine/writer/Writer.cpp +++ b/src/engine/writer/Writer.cpp @@ -258,14 +258,44 @@ void Writer::Write( const std::string variableName, const long double* values ) } +void Writer::Close( const int transportIndex ) +{ + //BP1Writer to update the metadata indices + + + //merge all metadata indices in capsule.m_Metadata buffer or capsule.m_Data buffer (depends on transport predefined functionality) + + + //BP1Writer to write to corresponding transport + + if( transportIndex == -1 ) // all transports + { + for( auto& transport : m_Transports ) + transport->Write( m_Buffer.m_Data.data(), m_Buffer.m_DataPosition ); + + for( auto& transport : m_Transports ) + transport->Close( ); //actually no need, close is in destructor (like fstream) + } + else + { + m_Transports[ transportIndex ]->Write( m_Buffer.m_Data.data(), m_Buffer.m_DataPosition ); + } + + //Close the corresponding transport +} + + + + + void Writer::InitTransports( ) { if( m_DebugMode == true ) { if( TransportNamesUniqueness( ) == false ) { - std::invalid_argument( "ERROR: two transports of the same kind (e.g file IO) " - "can't have the same name, modify with name= in Method AddTransport\n" ); + throw std::invalid_argument( "ERROR: two transports of the same kind (e.g file IO) " + "can't have the same name, modify with name= in Method AddTransport\n" ); } } @@ -306,28 +336,23 @@ void Writer::InitTransports( ) -void Writer::Close( const int transportIndex ) +bool Writer::CheckBuffersAllocation( const Group& group, const Var variableName, const std::size_t indexSize, + const std::size_t payloadSize ) { - //flush the last piece of data - if( transportIndex == -1 ) // all transports - { - for( auto& transport : m_Transports ) - transport->Write( m_Buffer.m_Data.data(), m_Buffer.m_DataPosition ); + //Check if data in buffer needs to be reallocated + const std::size_t dataSize = payloadSize + indexSize + 10; //adding some bytes tolerance + const std::size_t neededSize = dataSize + m_Buffer.m_DataPosition; + // might need to write payload in batches + bool doTransportsFlush = ( neededSize > m_MaxBufferSize )? true : false; - for( auto& transport : m_Transports ) - transport->Close( ); - } - else - { - m_Transports[ transportIndex ]->Write( m_Buffer.m_Data.data(), m_Buffer.m_DataPosition ); - } - - //BP1Writer to take care of metadata indices at Close + if( GrowBuffer( m_MaxBufferSize, m_GrowthFactor, m_Buffer.m_DataPosition, m_Buffer.m_Data ) == -1 ) + doTransportsFlush = true; + GrowBuffer( indexSize, m_GrowthFactor, m_MetadataSet.VarsIndexPosition, m_MetadataSet.VarsIndex ); //not checking for metadata + return doTransportsFlush; } - } //end namespace adios -- GitLab