From 89742d8bfe87ef0c3a88af260aed8843a86f327a Mon Sep 17 00:00:00 2001 From: wfg <wfg@pc0098504.ornl.gov> Date: Thu, 23 Mar 2017 17:39:25 -0400 Subject: [PATCH] Working on time aggregation Will need a new branch to implement variable merging as it implies changing the BP1.h structs --- include/engine/bp/BPFileWriter.h | 4 ++++ include/format/BP1.h | 1 + include/format/BP1Writer.h | 12 ++++++++++++ src/engine/bp/BPFileWriter.cpp | 16 ++++++---------- src/format/BP1Aggregator.cpp | 6 +++--- src/format/BP1Writer.cpp | 26 +++++++++++++------------- src/transport/file/FStream.cpp | 2 +- src/transport/file/FileDescriptor.cpp | 2 +- src/transport/file/FilePointer.cpp | 2 +- 9 files changed, 42 insertions(+), 29 deletions(-) diff --git a/include/engine/bp/BPFileWriter.h b/include/engine/bp/BPFileWriter.h index 969d54b19..da39617bf 100644 --- a/include/engine/bp/BPFileWriter.h +++ b/include/engine/bp/BPFileWriter.h @@ -119,6 +119,10 @@ private: variable.m_AppValues = values; m_WrittenVariables.insert( variable.m_Name ); + //if first timestep Write + if( m_MetadataSet.DataPGIsOpen == false ) //create a new pg index timestep ready to write variables + WriteProcessGroupIndex( ); + //pre-calculate new metadata and payload sizes m_TransportFlush = CheckBuffersAllocation( m_BP1Writer.GetVariableIndexSize( variable ), variable.PayLoadSize(), m_GrowthFactor, m_MaxBufferSize, diff --git a/include/format/BP1.h b/include/format/BP1.h index 10c28a282..aac0b3534 100644 --- a/include/format/BP1.h +++ b/include/format/BP1.h @@ -50,6 +50,7 @@ struct BP1MetadataSet const unsigned int MiniFooterSize = 28; ///< 28 for now //PG (relative) positions in Data buffer to be updated + std::uint32_t DataPGVarsCount = 0; std::size_t DataPGLengthPosition = 0; ///< current PG initial ( relative ) position, needs to be updated in every advance step or init std::size_t DataVarsCountPosition = 0; ///< current PG variable count ( relative ) position, needs to be updated in every advance step or init bool DataPGIsOpen = false; diff --git a/include/format/BP1Writer.h b/include/format/BP1Writer.h index 32521f498..77a8c7fe9 100644 --- a/include/format/BP1Writer.h +++ b/include/format/BP1Writer.h @@ -270,6 +270,17 @@ public: } } + //Characteristic time index in metadata and data + characteristicID = characteristic_time_index; + MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &characteristicID, 1 ); + MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &metadataSet.TimeStep, 4 ); + + MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &characteristicID, 1 ); + const std::uint16_t lengthOfTimeIndex = 4; + MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &lengthOfTimeIndex, 2 ); //add length of characteristic in data + MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &metadataSet.TimeStep, 4 ); + ++characteristicsCounter; + //Back to characteristics count and length in Data //count std::memcpy( &buffer.m_Data[dataCharacteristicsCountPosition], &characteristicsCounter, 1 ); @@ -305,6 +316,7 @@ public: std::memcpy( &metadataSet.VarsIndex[metadataVarLengthPosition], &metadataVarEntryLength, 4 ); ++metadataSet.VarsCount; + ++metadataSet.DataPGVarsCount; } diff --git a/src/engine/bp/BPFileWriter.cpp b/src/engine/bp/BPFileWriter.cpp index d717e6d01..cfc641e33 100644 --- a/src/engine/bp/BPFileWriter.cpp +++ b/src/engine/bp/BPFileWriter.cpp @@ -146,7 +146,7 @@ void BPFileWriter::Write( const std::string variableName, const void* values ) / void BPFileWriter::Advance( ) { - + m_BP1Writer.Advance( m_MetadataSet, m_Buffer ); } @@ -364,7 +364,6 @@ void BPFileWriter::InitProcessGroup( ) } WriteProcessGroupIndex( ); - m_MetadataSet.DataPGIsOpen = true; if( m_MetadataSet.Log.m_IsActive == true ) m_MetadataSet.Log.m_Timers[0].SetTime(); @@ -375,11 +374,9 @@ void BPFileWriter::InitProcessGroup( ) void BPFileWriter::WriteProcessGroupIndex( ) { //pg = process group - const std::string name( std::to_string( m_RankMPI ) ); //using rank as name - const unsigned int timeStep = m_MetadataSet.TimeStep; - const std::string timeStepName( std::to_string( timeStep ) ); - const std::size_t pgIndexSize = m_BP1Writer.GetProcessGroupIndexSize( name, timeStepName, m_Transports.size() ); - + const std::size_t pgIndexSize = m_BP1Writer.GetProcessGroupIndexSize( std::to_string( m_RankMPI ), + std::to_string( m_MetadataSet.TimeStep ), + m_Transports.size() ); //metadata GrowBuffer( pgIndexSize, m_GrowthFactor, m_MetadataSet.PGIndexPosition, m_MetadataSet.PGIndex ); @@ -387,11 +384,10 @@ void BPFileWriter::WriteProcessGroupIndex( ) GrowBuffer( pgIndexSize, m_GrowthFactor, m_Buffer.m_DataPosition, m_Buffer.m_Data ); const bool isFortran = ( m_HostLanguage == "Fortran" ) ? true : false; - const unsigned int processID = static_cast<unsigned int> ( m_RankMPI ); - m_BP1Writer.WriteProcessGroupIndex( isFortran, name, processID, timeStepName, timeStep, m_Transports, + m_BP1Writer.WriteProcessGroupIndex( isFortran, std::to_string( m_RankMPI ), static_cast<unsigned int> ( m_RankMPI ), + std::to_string( m_MetadataSet.TimeStep ), m_MetadataSet.TimeStep, m_Transports, m_Buffer, m_MetadataSet ); - } diff --git a/src/format/BP1Aggregator.cpp b/src/format/BP1Aggregator.cpp index 9e38f3786..89a067e7a 100644 --- a/src/format/BP1Aggregator.cpp +++ b/src/format/BP1Aggregator.cpp @@ -5,10 +5,11 @@ * Author: wfg */ + +/// \cond EXCLUDE_FROM_DOXYGEN #include <vector> #include <fstream> -#include <iostream> // must be deleted -#include <unistd.h> // must be deleted +/// \endcond #include "format/BP1Aggregator.h" @@ -74,7 +75,6 @@ void BP1Aggregator::WriteProfilingLog( const std::string fileName, const std::st } logFile += " }\n"; - std::cout << logFile; std::ofstream logStream( fileName ); logStream.write( logFile.c_str(), logFile.size() ); logStream.close(); diff --git a/src/format/BP1Writer.cpp b/src/format/BP1Writer.cpp index cb63d54fd..bd7ed78eb 100644 --- a/src/format/BP1Writer.cpp +++ b/src/format/BP1Writer.cpp @@ -36,12 +36,10 @@ void BP1Writer::WriteProcessGroupIndex( const bool isFortran, const std::string const std::vector< std::shared_ptr<Transport> >& transports, capsule::STLVector& buffer, BP1MetadataSet& metadataSet ) const noexcept { - metadataSet.DataPGLengthPosition = buffer.m_DataPosition; - // adapt this part to local variables const std::vector<int> methodIDs = GetMethodIDs( transports ); - const std::size_t dataPGLengthPosition = buffer.m_DataPosition; + metadataSet.DataPGLengthPosition = buffer.m_DataPosition; const std::size_t metadataPGLengthPosition = metadataSet.PGIndexPosition; metadataSet.PGIndexPosition += 2; //skip length of pg in metadata, 2 bytes, would write at the end @@ -80,7 +78,6 @@ void BP1Writer::WriteProcessGroupIndex( const bool isFortran, const std::string const std::uint16_t metadataPGIndexLength = metadataSet.PGIndexPosition - metadataPGLengthPosition - 2; //without length of group record std::memcpy( &metadataSet.PGIndex[metadataPGLengthPosition], &metadataPGIndexLength, 2 ); - //here write method in data const std::uint8_t methodsSize = methodIDs.size(); MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &methodsSize, 1 ); //method count @@ -92,12 +89,16 @@ void BP1Writer::WriteProcessGroupIndex( const bool isFortran, const std::string buffer.m_DataPosition += 2; //skip method params length = 0 (2 bytes) for now } - buffer.m_DataAbsolutePosition = buffer.m_DataPosition - dataPGLengthPosition; //update aboslute position + buffer.m_DataAbsolutePosition += buffer.m_DataPosition - metadataSet.DataPGLengthPosition; //update absolute position + + metadataSet.DataVarsCountPosition = buffer.m_DataPosition; //update vars count and vars count position buffer.m_DataPosition += 12; //add vars count and length buffer.m_DataAbsolutePosition += 12; //add vars count and length - metadataSet.PGCount += 1; + ++metadataSet.PGCount; + metadataSet.DataPGVarsCount = 0; + metadataSet.DataPGIsOpen = true; } @@ -118,9 +119,7 @@ void BP1Writer::Close( BP1MetadataSet& metadataSet, capsule::STLVector& buffer, if( isFirstClose == true ) { if( metadataSet.DataPGIsOpen == true ) - { FlattenData( metadataSet, buffer ); - } FlattenMetadata( metadataSet, buffer ); @@ -131,7 +130,6 @@ void BP1Writer::Close( BP1MetadataSet& metadataSet, capsule::STLVector& buffer, { //here call aggregator } - isFirstClose = false; } @@ -250,17 +248,20 @@ void BP1Writer::WriteDimensionRecord( std::vector<char>& buffer, std::size_t& po void BP1Writer::FlattenData( BP1MetadataSet& metadataSet, capsule::STLVector& buffer ) const noexcept { //vars count and Length - std::memcpy( &buffer.m_Data[metadataSet.DataVarsCountPosition], &metadataSet.VarsCount, 4 ); //count + std::memcpy( &buffer.m_Data[metadataSet.DataVarsCountPosition], &metadataSet.DataPGVarsCount, 4 ); //count const std::uint64_t dataVarsLength = buffer.m_DataPosition - metadataSet.DataVarsCountPosition - 8 - 4; //without record itself and vars count std::memcpy( &buffer.m_Data[metadataSet.DataVarsCountPosition+4], &dataVarsLength, 8 ); //length - //attributes (empty for now) count and length by moving positions + //attributes (empty for now) count (4) and length (8) are zero by moving positions in time step zero buffer.m_DataPosition += 12; buffer.m_DataAbsolutePosition += 12; - //Finish writing pg group length and, vars count and length in Data + //Finish writing pg group length const std::uint64_t dataPGLength = buffer.m_DataPosition - metadataSet.DataPGLengthPosition - 8; //without record itself, 12 due to empty attributes std::memcpy( &buffer.m_Data[metadataSet.DataPGLengthPosition], &dataPGLength, 8 ); + + ++metadataSet.TimeStep; + metadataSet.DataPGIsOpen = false; } @@ -324,6 +325,5 @@ void BP1Writer::FlattenMetadata( BP1MetadataSet& metadataSet, capsule::STLVector - } //end namespace format } //end namespace adios diff --git a/src/transport/file/FStream.cpp b/src/transport/file/FStream.cpp index e471b2f6a..9d571ebaf 100644 --- a/src/transport/file/FStream.cpp +++ b/src/transport/file/FStream.cpp @@ -19,7 +19,7 @@ namespace transport FStream::FStream( MPI_Comm mpiComm, const bool debugMode ): - Transport( "FStream", mpiComm, debugMode ) + Transport( "fstream", mpiComm, debugMode ) { } diff --git a/src/transport/file/FileDescriptor.cpp b/src/transport/file/FileDescriptor.cpp index 0128efeaf..dabb4e2a2 100644 --- a/src/transport/file/FileDescriptor.cpp +++ b/src/transport/file/FileDescriptor.cpp @@ -25,7 +25,7 @@ namespace transport { FileDescriptor::FileDescriptor( MPI_Comm mpiComm, const bool debugMode ): - Transport( "FD", mpiComm, debugMode ) + Transport( "POSIX_IO", mpiComm, debugMode ) { } diff --git a/src/transport/file/FilePointer.cpp b/src/transport/file/FilePointer.cpp index 9340664b3..181af76a9 100644 --- a/src/transport/file/FilePointer.cpp +++ b/src/transport/file/FilePointer.cpp @@ -20,7 +20,7 @@ namespace transport FilePointer::FilePointer( MPI_Comm mpiComm, const bool debugMode ): - Transport( "File", mpiComm, debugMode ) + Transport( "FILE*", mpiComm, debugMode ) { } -- GitLab