From 6f3664e144a934bd6201373f8923a7b22a7f486d Mon Sep 17 00:00:00 2001 From: wfg <wfg@pc0098504.ornl.gov> Date: Wed, 1 Feb 2017 18:30:45 -0500 Subject: [PATCH] Working on BP1Writer.h Tested memory leaks with valgrind for simple example Verifying if variable index is written correctly Added ./src/format/BP1Writer.cpp To do: Need to test for array index and payload write --- Makefile | 2 +- examples/hello/writer/helloWriter_OOP.cpp | 12 +-- include/core/Capsule.h | 6 +- include/core/Engine.h | 2 +- include/core/Group.h | 2 +- include/core/Transport.h | 2 +- include/engine/writer/Writer.h | 2 + include/engine/writer/WriterTemplates.h | 90 +++++++++++++++-- include/format/BP1Writer.h | 117 ++++++++++------------ include/functions/adiosTemplates.h | 70 +++++++++++++ include/functions/capsuleTemplates.h | 59 ----------- src/capsule/Heap.cpp | 6 +- src/core/Group.cpp | 7 +- src/core/Method.cpp | 1 + src/engine/writer/Writer.cpp | 51 +++++++--- src/format/BP1Writer.cpp | 47 +++++++++ src/mpidummy.cpp | 2 +- 17 files changed, 313 insertions(+), 165 deletions(-) create mode 100644 src/format/BP1Writer.cpp diff --git a/Makefile b/Makefile index 898f40eee..86b405b8d 100644 --- a/Makefile +++ b/Makefile @@ -22,7 +22,7 @@ CPPFiles:=$(shell find ./src -type f -name "*.cpp") INC:=-I./include VPATH = ./src ./src/core ./src/functions \ ./src/engine ./src/engine/writer ./src/engine/dataman \ - ./src/capsule ./src/transform ./src/transport + ./src/capsule ./src/transform ./src/transport ./src/format #SEPARATE EXTERNAL LIBRARIES HANDLING in Makefile.libs export $(HFiles) $(CPPFiles) $(CFLAGS) $(LIBS) diff --git a/examples/hello/writer/helloWriter_OOP.cpp b/examples/hello/writer/helloWriter_OOP.cpp index 08cfa24ce..35b526f9c 100644 --- a/examples/hello/writer/helloWriter_OOP.cpp +++ b/examples/hello/writer/helloWriter_OOP.cpp @@ -40,15 +40,15 @@ int main( int argc, char* argv [] ) adios::Group& ioGroup = adios.DeclareGroup( "ioGroup" ); adios::Var ioNx = ioGroup.DefineVariable<unsigned int>( "Nx" ); - //adios::Dims dimNx = ioGroup.SetDimensions( {ioNx} ); + adios::Dims dimNx = ioGroup.SetDimensions( {ioNx} ); - adios::Var ioMyDoubles = ioGroup.DefineVariable<double>( "myDoubles", "Nx" ); - adios::Var ioMyFloats = ioGroup.DefineVariable<float>( "myFloats", "Nx" ); + adios::Var ioMyDoubles = ioGroup.DefineVariable<double>( "myDoubles", dimNx ); + //adios::Var ioMyFloats = ioGroup.DefineVariable<float>( "myFloats", dimNx ) ; //add transform to variable in group...not executed (just testing API) adios::Transform bzip2 = adios::transform::BZIP2( ); ioGroup.AddTransform( ioMyDoubles, bzip2, 1 ); - ioGroup.AddTransform( ioMyFloats, bzip2, 1 ); + //ioGroup.AddTransform( ioMyFloats, bzip2, 1 ); //Define method for engine creation, it is basically straight-forward parameters adios::Method& bpWriterSettings = adios.DeclareMethod( "SinglePOSIXFile" ); //default method type is Writer @@ -63,8 +63,8 @@ int main( int argc, char* argv [] ) throw std::ios_base::failure( "ERROR: failed to open ADIOS bpWriter\n" ); bpWriter->Write<unsigned int>( ioNx, &Nx ); - bpWriter->Write<double>( ioMyDoubles, myDoubles.data() ); // Base class Engine own the Write<T> that will call overloaded Write from Derived - bpWriter->Write<float>( ioMyFloats, myFloats.data() ); + //bpWriter->Write<double>( ioMyDoubles, myDoubles.data() ); // Base class Engine own the Write<T> that will call overloaded Write from Derived + //bpWriter->Write<float>( ioMyFloats, myFloats.data() ); bpWriter->Close( ); } catch( std::invalid_argument& e ) diff --git a/include/core/Capsule.h b/include/core/Capsule.h index 372ffdfaf..852154187 100644 --- a/include/core/Capsule.h +++ b/include/core/Capsule.h @@ -27,8 +27,10 @@ public: const std::string m_Type; ///< buffer type const std::string m_AccessMode; ///< 'w': write, 'r': read, 'a': append - std::size_t m_AbsoluteDataPosition; ///< includes the data flushed to transports - std::size_t m_AbsoluteMetadataPosition; ///< includes the metadata flushed to transports + std::size_t m_DataPosition = 0; ///< position in current data buffer (not included data flushed to transports) + std::size_t m_DataAbsolutePosition = 0; ///< includes the data flushed to transports + + std::size_t m_MetadataPosition = 0; ///< position in metadata buffer /** * Base class constructor providing type from derived class and accessMode diff --git a/include/core/Engine.h b/include/core/Engine.h index 65e41b5de..5f053edee 100644 --- a/include/core/Engine.h +++ b/include/core/Engine.h @@ -76,7 +76,7 @@ public: template< class T > void Write( const std::string variableName, const T* values ) { - Write( *m_Group, variableName, values ); + Write( variableName, values ); } template< class T > diff --git a/include/core/Group.h b/include/core/Group.h index 879d85903..07dbcd3d5 100644 --- a/include/core/Group.h +++ b/include/core/Group.h @@ -109,7 +109,7 @@ public: template< class T > Var DefineVariable( const std::string variableName, - const Dims dimensionsCSV = "", + const Dims dimensionsCSV = "1", const Dims globalDimensionsCSV = "", const Dims globalOffsetsCSV = "", const std::vector<Transform*> transforms = std::vector<Transform*>(), const std::vector<int> parameters = std::vector<int>() ) diff --git a/include/core/Transport.h b/include/core/Transport.h index 8e5810684..bae14aba9 100644 --- a/include/core/Transport.h +++ b/include/core/Transport.h @@ -72,7 +72,7 @@ public: * @param buffer pointer to buffer to be written * @param size size of buffer to be written */ - virtual void Write( const char* buffer, const std::size_t size ) = 0; + virtual void Write( const char* buffer, std::size_t size ) = 0; /** * Some transports separate the data from the metadata in a different medium diff --git a/include/engine/writer/Writer.h b/include/engine/writer/Writer.h index 7e455e563..a5c3a309e 100644 --- a/include/engine/writer/Writer.h +++ b/include/engine/writer/Writer.h @@ -68,6 +68,8 @@ public: private: Heap m_Buffer; ///< heap capsule + float m_GrowthFactor = 1.5; + std::size_t m_MaxBufferSize; format::BP1Writer m_BP1Writer; ///< format object will provide the required BP functionality to be applied on m_Buffer and m_Transports void Init( ); diff --git a/include/engine/writer/WriterTemplates.h b/include/engine/writer/WriterTemplates.h index d21dd82b1..3ceaa2cdd 100644 --- a/include/engine/writer/WriterTemplates.h +++ b/include/engine/writer/WriterTemplates.h @@ -8,10 +8,14 @@ #ifndef WRITERTEMPLATES_H_ #define WRITERTEMPLATES_H_ +/// \cond EXCLUDE_FROM_DOXYGEN #include <string> #include <vector> #include <iostream> -#include <memory> +#include <memory> //std::shared_ptr +#include <cmath> //std::ceil +#include <new> //std::bad_alloc +/// \endcond #include "core/Group.h" @@ -27,31 +31,95 @@ namespace adios * Unique template function that replaces macros to write any variable type to a single heap capsule * @param group variable owner * @param variableName - * @param variable + * @param variable to be written + * @param growthFactor buffer growth factor + * @param maxBufferSize buffer maximum size, if reached transport will be called * @param buffers single heap capsule containing data and metadata buffers * @param transports all transports from Writer Engine, info is flushed in case of buffer overflow * @param bp1Writer from Writer Engine */ template <class T> -void WriterWriteVariable( const Group& group, const std::string variableName, const Variable<T>& variable, +void WriterWriteVariable( const Group& group, const Var variableName, const Variable<T>& variable, + const float growthFactor, const std::size_t maxBufferSize, const int rankMPI, Heap& buffer, std::vector< std::shared_ptr<Transport> >& transports, format::BP1Writer& bp1Writer ) { + //Check if data in buffer needs to be reallocated + const size_t indexSize = bp1Writer.GetVariableIndexSize( group, variableName, variable ); //metadata size + const std::size_t payloadSize = GetTotalSize( group.GetDimensions( variable.DimensionsCSV ) ) * sizeof( T ); + const std::size_t dataSize = payloadSize + indexSize + 10; //adding some bytes tolerance - std::cout << "Hello from bp Writer, writing variable " << variableName << " of typeid(T).name() = " << typeid(T).name() << "\n"; - if( variable.IsDimension ) + const std::size_t currentCapacity = buffer.m_Data.capacity( ); + const std::size_t availableSize = currentCapacity - buffer.m_DataPosition; + + bool doTransportsFlush = false; // might need to write payload in batches + + if( dataSize > availableSize ) { - std::cout << "Which is a dimension variable\n"; + const std::size_t minimumNewSize = dataSize + buffer.m_DataPosition; + const unsigned int factor = static_cast<unsigned int>( std::ceil( minimumNewSize / currentCapacity / growthFactor ) ); + const std::size_t newSize = factor * growthFactor * currentCapacity; + + if( newSize <= maxBufferSize ) + { + try + { + buffer.m_Data.resize( newSize ); + } + catch( std::bad_alloc& e ) + { + throw std::invalid_argument( "ERROR: couldn't allocate maximum buffer size of " + std::to_string( maxBufferSize ) + + " in rank " + std::to_string( rankMPI ) + " when Write variable " + variableName + "\n" ); + } + } + else + { + doTransportsFlush = true; + try + { + buffer.m_Data.resize( maxBufferSize ); + } + catch( std::bad_alloc& e ) + { + throw std::invalid_argument( "ERROR: couldn't allocate maximum buffer size of " + std::to_string( maxBufferSize ) + + " in rank " + std::to_string( rankMPI ) + " when Write variable " + variableName + "\n" ); + } + } } + //WRITE INDEX// + bp1Writer.CheckVariableIndexSize( indexSize ); //checks bp1Writer m_VariableIndex + + //Write in BP Format + std::vector<char*> dataBuffers { buffer.m_Data.data() }; + std::vector<std::size_t> dataPositions { buffer.m_DataPosition }; //needs to be updated + std::vector<std::size_t> dataAbsolutePositions { buffer.m_DataAbsolutePosition }; //needs to be updated at the end + std::vector<char*> metadataBuffers { bp1Writer.m_VariableIndex.data() }; + std::vector<std::size_t> metadataPositions { bp1Writer.m_VariableIndexPosition }; //needs to be updated + + bp1Writer.WriteVariableIndex( group, variableName, variable, dataBuffers, dataPositions, + dataAbsolutePositions, metadataBuffers, metadataPositions ); + + buffer.m_DataPosition = dataPositions[0]; + buffer.m_DataAbsolutePosition = dataAbsolutePositions[0]; + bp1Writer.m_VariableIndexPosition = metadataPositions[0]; + + + + //here write payload to data + if( doTransportsFlush == true ) + { + + } + else + { + bp1Writer.MemcpyToBuffers( dataBuffers, dataPositions, variable.Values, payloadSize ); //this is the expensive part might want to use Cores + buffer.m_DataPosition = dataPositions[0]; + buffer.m_DataAbsolutePosition += buffer.m_DataPosition; + } - //here deal with buffers allocation -// const auto localDimensions = group.GetDimensions( variable.DimensionsCSV ); -// const auto size = GetTotalSize( localDimensions ); -// T min, max; -// GetMinMax( variable.Values, size, min, max ); // // const std::size_t bytesToWrite = localSize * sizeof( double ); //size of values + min + max in bytes diff --git a/include/format/BP1Writer.h b/include/format/BP1Writer.h index dbb007046..5cbd167a1 100644 --- a/include/format/BP1Writer.h +++ b/include/format/BP1Writer.h @@ -8,15 +8,18 @@ #ifndef BP1WRITER_H_ #define BP1WRITER_H_ - +/// \cond EXCLUDE_FROM_DOXYGEN #include <vector> #include <cstdint> #include <algorithm> //std::count, std::copy, std::for_each #include <cstring> //std::memcpy +#include <cmath> //std::ceil +/// \endcond #include "core/Variable.h" #include "core/Group.h" - +#include "functions/adiosTemplates.h" +#include "functions/adiosFunctions.h" namespace adios @@ -36,14 +39,17 @@ public: std::uint32_t m_VariablesCount = 0; ///< number of written Variables std::uint64_t m_VariablesLength = 0; ///< length in bytes of written Variables - std::vector<char> m_VariableIndex; ///< metadata variable index + std::size_t m_VariableIndexPosition = 12; ///< initial position in bytes + std::vector<char> m_VariableIndex = std::vector<char>( 102400 ); ///< metadata variable index, start with 1Kb + std::map< std::string, std::pair<std::size_t,std::size_t> > m_VariablePositions; std::uint32_t m_AttributesCount = 0; ///< number of Attributes std::uint64_t m_AttributesLength = 0; ///< length in bytes of Attributes - std::vector<char> m_AttributeIndex; ///< metadata attribute index + std::vector<char> m_AttributeIndex = std::vector<char>( 102400 ); ///< metadata attribute index const unsigned int m_Cores = 1; const unsigned int m_Verbosity = 0; + const float m_GrowthFactor = 1.5; /** * DataTypes mapping in BP Format @@ -103,7 +109,6 @@ public: statistic_finite = 6 }; - /** * Returns the estimated variable index size * @param group @@ -168,7 +173,10 @@ public: for( unsigned int i = 0; i < length; ++i ) { - std::memcpy( &buffers[positions[i]], source, size ); + //std::memcpy( buffers[positions[i]], source, size ); + char* buffer = buffers[i]; + std::memcpy( &buffer[ positions[i] ], source, size ); + //std::copy( source, source+size, &buffers[ positions[i] ] ); positions[i] += size; } } @@ -181,34 +189,9 @@ public: for( unsigned int i = 0; i < length; ++i ) { - std::memcpy( &buffers[positions[i]], &source[i], size ); - positions[i] += size; - } - } - - - - template< class T, class U > - void CopyToBuffers( std::vector<char*>& buffers, std::vector<std::size_t>& positions, const T* source, U size ) noexcept - { - const unsigned int length = buffers.size( ); - - for( unsigned int i = 0; i < length; ++i ) - { - std::copy( source, source+size, &buffers[ positions[i] ] ); - positions[i] += size; - } - } - - - template< class T, class U > - void CopyToBuffers( std::vector<char*>& buffers, std::vector<std::size_t>& positions, const std::vector<T>& source, U size ) noexcept - { - const unsigned int length = buffers.size( ); - - for( unsigned int i = 0; i < length; ++i ) - { - std::copy( &source[i], &source[i]+size, &buffers[ positions[i] ] ); + char* buffer = buffers[i]; + std::memcpy( &buffer[ positions[i] ], &source[i], size ); + //std::copy( &source[i], &source[i]+size, &buffers[ positions[i] ] ); positions[i] += size; } } @@ -224,16 +207,16 @@ public: * @param metadataPosition position in metadataBuffer */ template< class T > - void WriteVariable( const Group& group, const Var variableName, const Variable<T>& variable, - std::vector<char*>& dataBuffers, - std::vector<std::size_t>& dataPositions, - std::vector<std::size_t>& dataAbsolutePositions, - std::vector<char*>& metadataBuffers, - std::vector<std::size_t>& metadataPositions ) noexcept + void WriteVariableIndex( const Group& group, const Var variableName, const Variable<T>& variable, + std::vector<char*>& dataBuffers, + std::vector<std::size_t>& dataPositions, + std::vector<std::size_t>& dataAbsolutePositions, + std::vector<char*>& metadataBuffers, + std::vector<std::size_t>& metadataPositions ) noexcept { auto lf_MovePositions = []( const int bytes, std::vector<std::size_t>& positions ) { - for( auto& position : positions ) // value or reference? + for( auto& position : positions ) position += bytes; }; @@ -304,7 +287,7 @@ public: lf_MovePositions( 16, metadataPositions ); //skipping global dimension(8), global offset (8) } - const char no = 'n'; //dimension format unsigned int value (not using memberID for now) + constexpr char no = 'n'; //dimension format unsigned int value (not using memberID for now) for( unsigned int d = 0; d < (unsigned int)localDimensions.size(); ++d ) { //data dimensions @@ -344,7 +327,7 @@ public: } //data dimensions entry - const char no = 'n'; //dimension format unsigned int value + constexpr char no = 'n'; //dimension format unsigned int value for( unsigned int d = 0; d < (unsigned int)localDimensions.size(); ++d ) { MemcpyToBuffers( dataBuffers, dataPositions, &no, 1 ); @@ -405,8 +388,8 @@ public: //set characteristic ids for min and max characteristicID = characteristic_stat; - const std::int8_t statisticMinID = statistic_min; - const std::int8_t statisticMaxID = statistic_max; + constexpr std::int8_t statisticMinID = statistic_min; + constexpr std::int8_t statisticMaxID = statistic_max; //write min and max to metadata MemcpyToBuffers( metadataBuffers, metadataPositions, &characteristicID, 1 ); //min @@ -419,7 +402,7 @@ public: //write min and max to data MemcpyToBuffers( dataBuffers, dataPositions, &characteristicID, 1 ); //min - const std::int16_t lengthCharacteristic = 1 + sizeof( T ); + constexpr std::int16_t lengthCharacteristic = 1 + sizeof( T ); MemcpyToBuffers( dataBuffers, dataPositions, &lengthCharacteristic, 2 ); MemcpyToBuffers( dataBuffers, dataPositions, &statisticMinID, 1 ); MemcpyToBuffers( dataBuffers, dataPositions, &min, sizeof(T) ); @@ -433,10 +416,9 @@ public: ++characteristicsCounter; //Characteristics count and length in Data - std::vector<std::uint32_t> dataCharacteristicsLengths( dataPositions ); - std::transform( dataCharacteristicsLengths.begin( ), dataCharacteristicsLengths.end( ), - dataCharacteristicsCountPositions.begin(), dataCharacteristicsCountPositions.end(), - std::minus<std::uint32_t>() ); + std::vector<std::uint32_t> dataCharacteristicsLengths( dataPositions.size() ); + for( unsigned int i = 0; i < dataPositions.size(); ++i ) + dataCharacteristicsLengths[i] = dataPositions[i] - dataCharacteristicsCountPositions[i]; MemcpyToBuffers( dataBuffers, dataCharacteristicsCountPositions, &characteristicsCounter, 1 ); MemcpyToBuffers( dataBuffers, dataCharacteristicsCountPositions, dataCharacteristicsLengths, 4 ); //vector to vector @@ -449,31 +431,32 @@ public: ++characteristicsCounter; //update absolute positions with dataPositions, this is the payload offset - std::transform( dataAbsolutePositions.begin(), dataAbsolutePositions.end(), - dataPositions.begin(), dataPositions.end(), std::plus<std::size_t>() ); + for( unsigned int i = 0; i < dataAbsolutePositions.size(); ++i ) + dataAbsolutePositions[i] += dataPositions[i]; characteristicID = characteristic_payload_offset; MemcpyToBuffers( metadataBuffers, metadataPositions, &characteristicID, 1 ); //variable payload offset id MemcpyToBuffers( metadataBuffers, metadataPositions, dataAbsolutePositions, 8 ); //variable payload offset ++characteristicsCounter; - //Characteristics count and length in Metadata - std::vector<std::uint32_t> metadataCharacteristicsLengths( metadataPositions ); - std::transform( metadataCharacteristicsLengths.begin( ), metadataCharacteristicsLengths.end( ), - metadataCharacteristicsCountPositions.begin(), metadataCharacteristicsCountPositions.end(), - std::minus<std::uint32_t>() ); + //Back to writing characteristics count and length in Metadata + std::vector<std::uint32_t> metadataCharacteristicsLengths( metadataPositions.size() ); + for( unsigned int i = 0; i < metadataPositions.size(); ++i ) + metadataCharacteristicsLengths[i] = metadataPositions[i] - metadataCharacteristicsCountPositions[i]; + MemcpyToBuffers( metadataBuffers, metadataCharacteristicsCountPositions, &characteristicsCounter, 1 ); MemcpyToBuffers( metadataBuffers, metadataCharacteristicsCountPositions, metadataCharacteristicsLengths, 4 ); //vector to vector lf_MovePositions( -5, metadataCharacteristicsCountPositions ); //back to original position - //here write payload - - - - ++m_VariablesCount; } //end of function + /** + * Checks for var index current size and reallocates if needed for newIndexSize (coming from GetVariableIndexSize) + * @param newIndexSize size of new variable index to be written + */ + void CheckVariableIndexSize( const std::size_t newIndexSize ); + private: @@ -484,6 +467,16 @@ private: */ template< class T > inline std::int8_t GetDataType( ) noexcept { return type_unknown; } + /** + * Grows a buffer until newDataSize bytes can be written into the buffer from currentPosition. + * Buffer will grow at a rate given by m_GrowthFactor + * @param newDataSize size of data to be written after current position + * @param currentPosition current buffer position + * @param buffer to be reallocated + */ + void GrowBuffer( const std::size_t newDataSize, const std::size_t currentPosition, std::vector<char>& buffer ); + + }; diff --git a/include/functions/adiosTemplates.h b/include/functions/adiosTemplates.h index 1e675933e..edeb177f0 100644 --- a/include/functions/adiosTemplates.h +++ b/include/functions/adiosTemplates.h @@ -8,6 +8,14 @@ #ifndef ADIOSTEMPLATES_H_ #define ADIOSTEMPLATES_H_ +/// \cond EXCLUDE_FROM_DOXYGEN +#include <cstring> //std::memcpy +#include <vector> +#include <thread> +/// \endcond + + + namespace adios { @@ -52,6 +60,68 @@ bool IsTypeAlias( const std::string type, } +/** + * Get the minimum and maximum values in one loop + * @param values + * @param size + * @param min + * @param max + */ +template<class T> +void GetMinMax( const T* values, const size_t size, T& min, T& max, const unsigned int cores = 1 ) noexcept +{ + min = values[0]; + max = values[0]; + + for( unsigned int i = 0; i < size; ++i ) + { + if( min < values[0] ) + min = values[0]; + + if( max > values[0] ) + max = values[0]; + } +} + + +/** + * threaded version of std::memcpy + * @param dest + * @param source + * @param count + * @param cores + */ +template<class T, class U> +void MemcpyThreads( T* destination, const U* source, std::size_t count, const unsigned int cores = 1 ) +{ + if( cores == 1 ) + { + std::memcpy( &destination[0], &source[0], count ); + return; + } + + const unsigned long long int stride = count/cores; + const unsigned long long int remainder = count % cores; + const unsigned long long int last = stride + remainder; + + std::vector<std::thread> memcpyThreads; + memcpyThreads.reserve( cores ); + + for( unsigned int core = 0; core < cores; ++core ) + { + const size_t initialDestination = stride * core / sizeof(T); + const size_t initialSource = stride * core / sizeof(U); + + if( core == cores-1 ) + memcpyThreads.push_back( std::thread( std::memcpy, &destination[initialDestination], &source[initialSource], last ) ); + else + memcpyThreads.push_back( std::thread( std::memcpy, &destination[initialDestination], &source[initialSource], stride ) ); + } + //Now join the threads (is this really needed?) + for( auto& thread : memcpyThreads ) + thread.join( ); +} + } //end namespace diff --git a/include/functions/capsuleTemplates.h b/include/functions/capsuleTemplates.h index d8f234d6e..8262ebedc 100644 --- a/include/functions/capsuleTemplates.h +++ b/include/functions/capsuleTemplates.h @@ -22,66 +22,7 @@ namespace adios { -/** - * Get the minimum and maximum values in one loop - * @param values - * @param size - * @param min - * @param max - */ -template<class T> -void GetMinMax( const T* values, const size_t size, T& min, T& max, const unsigned int cores = 1 ) noexcept -{ - min = values[0]; - max = values[0]; - - for( unsigned int i = 0; i < size; ++i ) - { - if( min < values[0] ) - min = values[0]; - if( max > values[0] ) - max = values[0]; - } -} - -/** - * threaded version of std::memcpy - * @param dest - * @param source - * @param count - * @param cores - */ -template<class T, class U> -void MemcpyThreads( T* destination, const U* source, std::size_t count, const unsigned int cores = 1 ) -{ - if( cores == 1 ) - { - std::memcpy( &destination[0], &source[0], count ); - return; - } - - const unsigned long long int stride = count/cores; - const unsigned long long int remainder = count % cores; - const unsigned long long int last = stride + remainder; - - std::vector<std::thread> memcpyThreads; - memcpyThreads.reserve( cores ); - - for( unsigned int core = 0; core < cores; ++core ) - { - const size_t initialDestination = stride * core / sizeof(T); - const size_t initialSource = stride * core / sizeof(U); - - if( core == cores-1 ) - memcpyThreads.push_back( std::thread( std::memcpy, &destination[initialDestination], &source[initialSource], last ) ); - else - memcpyThreads.push_back( std::thread( std::memcpy, &destination[initialDestination], &source[initialSource], stride ) ); - } - //Now join the threads (is this really needed?) - for( auto& thread : memcpyThreads ) - thread.join( ); -} /** diff --git a/src/capsule/Heap.cpp b/src/capsule/Heap.cpp index 10c5016fb..0472815c3 100644 --- a/src/capsule/Heap.cpp +++ b/src/capsule/Heap.cpp @@ -15,10 +15,10 @@ namespace adios Heap::Heap( const std::string accessMode, const int rankMPI, const bool debugMode, const unsigned int cores ): - Capsule( "Heap", accessMode, rankMPI, debugMode, cores ) + Capsule( "Heap", accessMode, rankMPI, debugMode, cores ), + m_Data( std::vector<char>( 16777216, '\0' ) ) { - m_Data.reserve( 16777216 ); //default capacity = 16Mb - m_Metadata.reserve( 102400 ); //default capacity = 100Kb + //m_Data.resize( 16777216 ); //default capacity = 16Mb using resize } diff --git a/src/core/Group.cpp b/src/core/Group.cpp index 341381d51..0bc699ae3 100644 --- a/src/core/Group.cpp +++ b/src/core/Group.cpp @@ -60,7 +60,7 @@ Dims Group::SetDimensions( std::initializer_list<Var> variableList ) } dimensionsCSV += variable + ","; } - dimensionsCSV.pop_back(); + dimensionsCSV.pop_back(); //remove last comma return dimensionsCSV; } @@ -72,7 +72,7 @@ Var Group::DefineVariable( const std::string variableName, const std::string typ { auto lf_CheckDimensionVariables = [&]( const std::string csv, const std::string dimensionType, const std::string variableName ) { - if( csv.empty() == false ) + if( csv.empty() == false && csv != "1" ) //skip scalars SetDimensionVariablesFlag( csv, " in " + dimensionType + " of variable " + variableName ); }; @@ -81,6 +81,9 @@ Var Group::DefineVariable( const std::string variableName, const std::string typ { if( m_Variables.count( variableName ) == 1 ) throw std::invalid_argument( "ERROR: variable " + variableName + " already exists, in call to DefineVariable\n" ); + + if( dimensionsCSV.empty() == true ) + throw std::invalid_argument( "ERROR: variable " + variableName + " dimensions can't be empty, in call to DefineVariable\n" ); } //Check for dimension variables diff --git a/src/core/Method.cpp b/src/core/Method.cpp index 989bc468f..2b304cc63 100644 --- a/src/core/Method.cpp +++ b/src/core/Method.cpp @@ -52,6 +52,7 @@ void Method::AddTransportParameters( const std::string type, const std::vector<s m_TransportParameters.push_back( mapParameters ); } + void Method::SetDefaultGroup( Group& group ) { m_Group = &group; diff --git a/src/engine/writer/Writer.cpp b/src/engine/writer/Writer.cpp index 9e5fc7c22..875f754fc 100644 --- a/src/engine/writer/Writer.cpp +++ b/src/engine/writer/Writer.cpp @@ -28,7 +28,8 @@ namespace adios Writer::Writer( const std::string streamName, const std::string accessMode, const MPI_Comm mpiComm, const Method& method, const bool debugMode, const unsigned int cores ): Engine( "Writer", streamName, accessMode, mpiComm, method, debugMode, cores, " Writer constructor (or call to ADIOS Open).\n" ), - m_Buffer{ Heap( accessMode, m_RankMPI, m_DebugMode, cores ) } + m_Buffer{ Heap( accessMode, m_RankMPI, m_DebugMode, cores ) }, + m_MaxBufferSize{ m_Buffer.m_Data.max_size() } { Init( ); } @@ -40,7 +41,14 @@ Writer::~Writer( ) void Writer::Init( ) { - InitCapsules( ); + auto itGrowthFactor = m_Method.m_Parameters.find( "buffer_growth" ); + if( itGrowthFactor != m_Method.m_Parameters.end() ) + m_GrowthFactor = std::stof( itGrowthFactor->second ); //float + + auto itMaxBufferSize = m_Method.m_Parameters.find( "max_size_MB" ); + if( itMaxBufferSize != m_Method.m_Parameters.end() ) + m_MaxBufferSize = std::stoul( itGrowthFactor->second ) * 1000000; //convert to bytes + InitTransports( ); } @@ -50,7 +58,7 @@ void Writer::Write( Group& group, const std::string variableName, const char* va auto index = PreSetVariable( group, variableName, " from call to Write char*" ); Variable<char>& variable = group.m_Char[index]; //must be a reference variable.Values = values; - WriterWriteVariable( group, variableName, variable, m_Buffer, m_Transports, m_BP1Writer ); + WriterWriteVariable( group, variableName, variable, m_GrowthFactor, m_MaxBufferSize, m_RankMPI, m_Buffer, m_Transports, m_BP1Writer ); } void Writer::Write( Group& group, const std::string variableName, const unsigned char* values ) @@ -58,7 +66,7 @@ void Writer::Write( Group& group, const std::string variableName, const unsigned auto index = PreSetVariable( group, variableName, " from call to Write unsigned char*" ); Variable<unsigned char>& variable = group.m_UChar[index]; //must be a reference variable.Values = values; - WriterWriteVariable( group, variableName, variable, m_Buffer, m_Transports, m_BP1Writer ); + WriterWriteVariable( group, variableName, variable, m_GrowthFactor, m_MaxBufferSize, m_RankMPI, m_Buffer, m_Transports, m_BP1Writer ); } void Writer::Write( Group& group, const std::string variableName, const short* values ) @@ -66,7 +74,7 @@ void Writer::Write( Group& group, const std::string variableName, const short* v auto index = PreSetVariable( group, variableName, " from call to Write short*" ); Variable<short>& variable = group.m_Short[index]; //must be a reference variable.Values = values; - WriterWriteVariable( group, variableName, variable, m_Buffer, m_Transports, m_BP1Writer ); + WriterWriteVariable( group, variableName, variable, m_GrowthFactor, m_MaxBufferSize, m_RankMPI, m_Buffer, m_Transports, m_BP1Writer ); } void Writer::Write( Group& group, const std::string variableName, const unsigned short* values ) @@ -74,7 +82,7 @@ void Writer::Write( Group& group, const std::string variableName, const unsigned auto index = PreSetVariable( group, variableName, " from call to Write unsigned short*" ); Variable<unsigned short>& variable = group.m_UShort[index]; //must be a reference variable.Values = values; - WriterWriteVariable( group, variableName, variable, m_Buffer, m_Transports, m_BP1Writer ); + WriterWriteVariable( group, variableName, variable, m_GrowthFactor, m_MaxBufferSize, m_RankMPI, m_Buffer, m_Transports, m_BP1Writer ); } void Writer::Write( Group& group, const std::string variableName, const int* values ) @@ -82,7 +90,7 @@ void Writer::Write( Group& group, const std::string variableName, const int* val auto index = PreSetVariable( group, variableName, " from call to Write int*" ); Variable<int>& variable = group.m_Int[index]; //must be a reference variable.Values = values; - WriterWriteVariable( group, variableName, variable, m_Buffer, m_Transports, m_BP1Writer ); + WriterWriteVariable( group, variableName, variable, m_GrowthFactor, m_MaxBufferSize, m_RankMPI, m_Buffer, m_Transports, m_BP1Writer ); } void Writer::Write( Group& group, const std::string variableName, const unsigned int* values ) @@ -90,7 +98,7 @@ void Writer::Write( Group& group, const std::string variableName, const unsigned auto index = PreSetVariable( group, variableName, " from call to Write unsigned int*" ); Variable<unsigned int>& variable = group.m_UInt[index]; //must be a reference variable.Values = values; - WriterWriteVariable( group, variableName, variable, m_Buffer, m_Transports, m_BP1Writer ); + WriterWriteVariable( group, variableName, variable, m_GrowthFactor, m_MaxBufferSize, m_RankMPI, m_Buffer, m_Transports, m_BP1Writer ); } void Writer::Write( Group& group, const std::string variableName, const long int* values ) @@ -98,7 +106,7 @@ void Writer::Write( Group& group, const std::string variableName, const long int auto index = PreSetVariable( group, variableName, " from call to Write long int*" ); Variable<long int>& variable = group.m_LInt[index]; //must be a reference variable.Values = values; - WriterWriteVariable( group, variableName, variable, m_Buffer, m_Transports, m_BP1Writer ); + WriterWriteVariable( group, variableName, variable, m_GrowthFactor, m_MaxBufferSize, m_RankMPI, m_Buffer, m_Transports, m_BP1Writer ); } void Writer::Write( Group& group, const std::string variableName, const unsigned long int* values ) @@ -106,7 +114,7 @@ void Writer::Write( Group& group, const std::string variableName, const unsigned auto index = PreSetVariable( group, variableName, " from call to Write unsigned long int*" ); Variable<unsigned long int>& variable = group.m_ULInt[index]; //must be a reference variable.Values = values; - WriterWriteVariable( group, variableName, variable, m_Buffer, m_Transports, m_BP1Writer ); + WriterWriteVariable( group, variableName, variable, m_GrowthFactor, m_MaxBufferSize, m_RankMPI, m_Buffer, m_Transports, m_BP1Writer ); } void Writer::Write( Group& group, const std::string variableName, const long long int* values ) @@ -114,7 +122,7 @@ void Writer::Write( Group& group, const std::string variableName, const long lon auto index = PreSetVariable( group, variableName, " from call to Write long long int*" ); Variable<long long int>& variable = group.m_LLInt[index]; //must be a reference variable.Values = values; - WriterWriteVariable( group, variableName, variable, m_Buffer, m_Transports, m_BP1Writer ); + WriterWriteVariable( group, variableName, variable, m_GrowthFactor, m_MaxBufferSize, m_RankMPI, m_Buffer, m_Transports, m_BP1Writer ); } void Writer::Write( Group& group, const std::string variableName, const unsigned long long int* values ) @@ -122,7 +130,7 @@ void Writer::Write( Group& group, const std::string variableName, const unsigned auto index = PreSetVariable( group, variableName, " from call to Write unsigned long long int*" ); Variable<unsigned long long int>& variable = group.m_ULLInt[index]; //must be a reference variable.Values = values; - WriterWriteVariable( group, variableName, variable, m_Buffer, m_Transports, m_BP1Writer ); + WriterWriteVariable( group, variableName, variable, m_GrowthFactor, m_MaxBufferSize, m_RankMPI, m_Buffer, m_Transports, m_BP1Writer ); } void Writer::Write( Group& group, const std::string variableName, const float* values ) @@ -130,7 +138,7 @@ void Writer::Write( Group& group, const std::string variableName, const float* v auto index = PreSetVariable( group, variableName, " from call to Write float*" ); Variable<float>& variable = group.m_Float[index]; //must be a reference variable.Values = values; - WriterWriteVariable( group, variableName, variable, m_Buffer, m_Transports, m_BP1Writer ); + WriterWriteVariable( group, variableName, variable, m_GrowthFactor, m_MaxBufferSize, m_RankMPI, m_Buffer, m_Transports, m_BP1Writer ); } @@ -139,7 +147,7 @@ void Writer::Write( Group& group, const std::string variableName, const double* auto index = PreSetVariable( group, variableName, " from call to Write double*" ); Variable<double>& variable = group.m_Double[index]; //must be a reference variable.Values = values; - WriterWriteVariable( group, variableName, variable, m_Buffer, m_Transports, m_BP1Writer ); + WriterWriteVariable( group, variableName, variable, m_GrowthFactor, m_MaxBufferSize, m_RankMPI, m_Buffer, m_Transports, m_BP1Writer ); } @@ -148,7 +156,7 @@ void Writer::Write( Group& group, const std::string variableName, const long dou auto index = PreSetVariable( group, variableName, " from call to Write long double*" ); Variable<long double>& variable = group.m_LDouble[index]; //must be a reference variable.Values = values; - WriterWriteVariable( group, variableName, variable, m_Buffer, m_Transports, m_BP1Writer ); + WriterWriteVariable( group, variableName, variable, m_GrowthFactor, m_MaxBufferSize, m_RankMPI, m_Buffer, m_Transports, m_BP1Writer ); } @@ -309,7 +317,20 @@ void Writer::InitTransports( ) void Writer::Close( const int transportIndex ) { + //this should be done by BP1Format + if( transportIndex == -1 ) // all transports + { + for( auto& transport : m_Transports ) + transport->Write( m_Buffer.m_Data.data(), m_Buffer.m_Data.size() ); + for( auto& transport : m_Transports ) + transport->Close( ); + } + else + { + m_Transports[ transportIndex ]->Write( m_Buffer.m_Data.data(), m_Buffer.m_Data.size() ); + m_Transports[ transportIndex ]->Close( ); + } } diff --git a/src/format/BP1Writer.cpp b/src/format/BP1Writer.cpp new file mode 100644 index 000000000..9c8960482 --- /dev/null +++ b/src/format/BP1Writer.cpp @@ -0,0 +1,47 @@ +/* + * BP1Writer.cpp + * + * Created on: Feb 1, 2017 + * Author: wfg + */ + + +#include "format/BP1Writer.h" + + + +namespace adios +{ +namespace format +{ + + +void BP1Writer::CheckVariableIndexSize( const std::size_t newIndexSize ) +{ + const std::size_t currentCapacity = m_VariableIndex.capacity( ); + const std::size_t availableSpace = currentCapacity - m_VariableIndexPosition; + + if( newIndexSize > availableSpace ) + GrowBuffer( newIndexSize, m_VariableIndexPosition, m_VariableIndex ); +} + + +void BP1Writer::GrowBuffer( const std::size_t newDataSize, const std::size_t currentPosition, std::vector<char>& buffer ) +{ + const size_t currentCapacity = buffer.capacity( ); + std::size_t newCapacity = static_cast<std::size_t>( std::ceil( m_GrowthFactor * currentCapacity ) ); + std::size_t newAvailableSpace = newCapacity - currentPosition; + + while( newDataSize > newAvailableSpace ) + { + newCapacity = static_cast<std::size_t>( std::ceil( m_GrowthFactor * newCapacity ) ); + newAvailableSpace = newCapacity - currentPosition; + } + + m_VariableIndex.resize( newCapacity ); +} + + + +} //end namespace format +} //end namespace adios diff --git a/src/mpidummy.cpp b/src/mpidummy.cpp index 18970f760..811e90c7b 100644 --- a/src/mpidummy.cpp +++ b/src/mpidummy.cpp @@ -198,7 +198,7 @@ int MPI_File_read(MPI_File fh, void *buf, int count, MPI_Datatype datatype, MPI_ std::uint64_t bytes_read; bytes_read = read (fh, buf, bytes_to_read); if (bytes_read != bytes_to_read) { - snprintf(mpierrmsg, MPI_MAX_ERROR_STRING, "could not read %llu bytes. read only: %llu \n", bytes_to_read, bytes_read); + snprintf(mpierrmsg, MPI_MAX_ERROR_STRING, "could not read %" PRId64 " bytes. read only: %" PRId64 "\n", bytes_to_read, bytes_read); return -2; } *status = bytes_read; -- GitLab