diff --git a/examples/hello/fstream.xml b/examples/hello/fstream.xml index 3065a1a2805b8c6a4202e98057ea6712302dddc2..e46437445a1af49d4a8ee0465ea5a8762cfd430e 100644 --- a/examples/hello/fstream.xml +++ b/examples/hello/fstream.xml @@ -6,8 +6,16 @@ <attribute name="description" value="1 to 10"/> </adios-group> - <transport group="Types" method="FStream">verbose=3</transport> - + + <transform type="bzip2" > + </transform> + + <!-- transport group="Types" method="FStream">verbose=3</transport> --> + + <method name="SingleFile" profile_units="mus" max_buffer_size="10000"> + <transport type="File" have_metadata_file="no" aggregators="10" profile_units="mus"/> + </method> + <!-- <buffer size-MB="40" allocate-time="now"/> --> </adios-config> diff --git a/examples/hello/timeBP/Makefile b/examples/hello/timeBP/Makefile new file mode 100644 index 0000000000000000000000000000000000000000..2c80ba83ad6861ddd6ea9c6b37a57ddac58f0bb6 --- /dev/null +++ b/examples/hello/timeBP/Makefile @@ -0,0 +1,31 @@ +# Makefile for testing purposes, will build helloWriter_OOP_mpi (make or make mpi) or helloWriter_OOP_nompi (make nompi) +# Created on: Oct 4, 2016 +# Author: wfg + +BASE_NAME=timeBPWriter + +#COMPILERS +CC=g++ +MPICC=mpic++ + +#ADIOS LOCATION +ADIOS_DIR=../../.. +ADIOS_INCLUDE=-I$(ADIOS_DIR)/include +ADIOS_LIB=$(ADIOS_DIR)/lib/libadios.a +ADIOS_NOMPI_LIB=$(ADIOS_DIR)/lib/libadios_nompi.a + +#FLAGS +CFLAGS=-Wall -Wpedantic -std=c++11 -O0 -g +LDFLAGS= + +all: mpi nompi + +mpi: $(ADIOS_LIB) $(ADIOS_HFiles) + $(MPICC) $(CFLAGS) $(ADIOS_INCLUDE) $(BASE_NAME).cpp -o $(BASE_NAME).exe $(ADIOS_LIB) $(LDFLAGS) -lpthread + +nompi: $(ADIOS_NOMPI_LIB) $(NoMPI_HFiles) + $(CC) $(CFLAGS) $(ADIOS_INCLUDE) -DADIOS_NOMPI $(BASE_NAME)_nompi.cpp -o $(BASE_NAME)_nompi.exe $(ADIOS_NOMPI_LIB) $(LDFLAGS) -lpthread + +clean: + rm *.exe + \ No newline at end of file diff --git a/examples/hello/timeBP/test.sh b/examples/hello/timeBP/test.sh new file mode 100755 index 0000000000000000000000000000000000000000..ef262a74c54cf2bdd7895fb273ce3516e9279667 --- /dev/null +++ b/examples/hello/timeBP/test.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +rm -fr *.bp +make clean +make -j4 diff --git a/examples/hello/timeBP/timeBPWriter.cpp b/examples/hello/timeBP/timeBPWriter.cpp new file mode 100644 index 0000000000000000000000000000000000000000..771a9ebecbafbed34f18aba212eb17f3192d7bb1 --- /dev/null +++ b/examples/hello/timeBP/timeBPWriter.cpp @@ -0,0 +1,113 @@ +/* + * timeBPWriter.cpp example for time aggregation + * + * Created on: Feb 16, 2017 + * Author: wfg + */ + + +#include <vector> +#include <iostream> + +#include <mpi.h> + + +#include "ADIOS_CPP.h" + + +int main( int argc, char* argv [] ) +{ + MPI_Init( &argc, &argv ); + int rank; + MPI_Comm_rank( MPI_COMM_WORLD, &rank ); + const bool adiosDebug = true; + adios::ADIOS adios( MPI_COMM_WORLD, adios::Verbose::ERROR, adiosDebug ); + + //Application variable + std::vector<double> myDoubles = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; + const std::size_t Nx = myDoubles.size(); + + const std::size_t rows = 3; + const std::size_t columns = 3; + + std::vector<float> myMatrix; + if( rank % 2 == 0 ) //even rank + { + myMatrix.reserve( rows * columns ); + myMatrix.push_back( 1 ); myMatrix.push_back( 2 ), myMatrix.push_back( 3 ); + myMatrix.push_back( 4 ); myMatrix.push_back( 5 ), myMatrix.push_back( 6 ); + myMatrix.push_back( 7 ); myMatrix.push_back( 8 ), myMatrix.push_back( 8 ); + } + + std::vector<float> myMatrix2 = { -1, -2, -3, + -4, -5, -6, + -7, -8, -9 }; + + try + { + //Define variable and local size + adios::Variable<double>& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", {Nx} ); + adios::Variable<float>& ioMyMatrix = adios.DefineVariable<float>( "myMatrix", {rows,columns} ); + adios::Variable<float>& ioMyMatrix2 = adios.DefineVariable<float>( "myMatrix2", {rows,columns} ); + + //Define method for engine creation, it is basically straight-forward parameters + adios::Method& bpWriterSettings = adios.DeclareMethod( "MyMethod" ); //default method type is BPWriter + bpWriterSettings.SetParameters( "profile_units=mus" ); + bpWriterSettings.AddTransport( "File", "profile_units=mus", "have_metadata_file=no" ); //uses default POSIX library + + //Create engine smart pointer due to polymorphism, + //Open returns a smart pointer to Engine containing the Derived class Writer + auto bpWriter = adios.Open( "time.bp", "w", bpWriterSettings ); + + if( bpWriter == nullptr ) + throw std::ios_base::failure( "ERROR: couldn't create bpWriter at Open\n" ); + + + for( unsigned int t = 0; t < 10; ++t ) + { + myDoubles[0] = t; + bpWriter->Write<double>( ioMyDoubles, myDoubles.data() ); // Base class Engine own the Write<T> that will call overloaded Write from Derived + + if( rank % 2 == 0 ) //even rank + { + myMatrix[0] = t; + myMatrix2[0] = t; + + bpWriter->Write<float>( ioMyMatrix, myMatrix.data() ); + bpWriter->Write<float>( ioMyMatrix2, myMatrix2.data() ); + } + bpWriter->Advance(); + } + + bpWriter->Close( ); + } + catch( std::invalid_argument& e ) + { + if( rank == 0 ) + { + std::cout << "Invalid argument exception, STOPPING PROGRAM\n"; + std::cout << e.what() << "\n"; + } + } + catch( std::ios_base::failure& e ) + { + if( rank == 0 ) + { + std::cout << "System exception, STOPPING PROGRAM\n"; + std::cout << e.what() << "\n"; + } + } + catch( std::exception& e ) + { + if( rank == 0 ) + { + std::cout << "Exception, STOPPING PROGRAM\n"; + std::cout << e.what() << "\n"; + } + } + + MPI_Finalize( ); + + return 0; + +} diff --git a/examples/hello/timeBP/timeBPWriter_nompi.cpp b/examples/hello/timeBP/timeBPWriter_nompi.cpp new file mode 100644 index 0000000000000000000000000000000000000000..d85a1b523be4b929242df7c102aa9dd647d38327 --- /dev/null +++ b/examples/hello/timeBP/timeBPWriter_nompi.cpp @@ -0,0 +1,86 @@ +/* + * timeBPWriter.cpp example for time aggregation + * + * Created on: Feb 16, 2017 + * Author: wfg + */ + + +#include <vector> +#include <iostream> + +#include "ADIOS_CPP.h" + + +int main( int argc, char* argv [] ) +{ + const bool adiosDebug = true; + adios::ADIOS adios( adios::Verbose::ERROR, adiosDebug ); + + //Application variable + std::vector<double> myDoubles = { 10, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; + const std::size_t Nx = myDoubles.size(); + + const std::size_t rows = 3; + const std::size_t columns = 3; + + std::vector<float> myMatrix; + myMatrix.reserve( rows * columns ); + myMatrix.push_back( 1 ); myMatrix.push_back( 2 ), myMatrix.push_back( 3 ); + myMatrix.push_back( 4 ); myMatrix.push_back( 5 ), myMatrix.push_back( 6 ); + myMatrix.push_back( 7 ); myMatrix.push_back( 8 ), myMatrix.push_back( 8 ); + + + std::vector<float> myMatrix2 = { -1, -2, -3, + -4, -5, -6, + -7, -8, -9 }; + + try + { + //Define variable and local size + adios::Variable<double>& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", {Nx} ); + adios::Variable<float>& ioMyMatrix = adios.DefineVariable<float>( "myMatrix", {rows,columns} ); + adios::Variable<float>& ioMyMatrix2 = adios.DefineVariable<float>( "myMatrix2", {rows,columns} ); + + //Define method for engine creation, it is basically straight-forward parameters + adios::Method& bpWriterSettings = adios.DeclareMethod( "SingleFile" ); //default method type is BPWriter + bpWriterSettings.SetParameters( "profile_units=mus" ); + bpWriterSettings.AddTransport( "File", "profile_units=mus", "have_metadata_file=no" ); //uses default POSIX library + + //Create object directly rather than using polymorphism with ADIOS.Open + adios::BPFileWriter bpWriter( adios, "time_nompi.bp", "w", adios.m_MPIComm, bpWriterSettings ); + + for( unsigned int t = 0; t < 3; ++t ) + { + myDoubles[0] = t; // t * -1; + myMatrix[0] = t; + myMatrix2[0] = t; + + bpWriter.Write( ioMyDoubles, myDoubles.data() ); // Base class Engine own the Write<T> that will call overloaded Write from Derived + bpWriter.Write( ioMyMatrix, myMatrix.data() ); + bpWriter.Write( ioMyMatrix2, myMatrix2.data() ); + bpWriter.Advance(); + } + + bpWriter.Close( ); + } + catch( std::invalid_argument& e ) + { + std::cout << "Invalid argument exception, STOPPING PROGRAM\n"; + std::cout << e.what() << "\n"; + + } + catch( std::ios_base::failure& e ) + { + std::cout << "System exception, STOPPING PROGRAM\n"; + std::cout << e.what() << "\n"; + } + catch( std::exception& e ) + { + std::cout << "Exception, STOPPING PROGRAM\n"; + std::cout << e.what() << "\n"; + } + + return 0; + +} diff --git a/include/ADIOS.h b/include/ADIOS.h index b91898773962f0789f588be7d41a9523235f6bee..3009c3eb4688c0f663fc7c06d6620eaa654502cc 100644 --- a/include/ADIOS.h +++ b/include/ADIOS.h @@ -149,7 +149,7 @@ public: // PUBLIC Constructors and Functions define the User Interface with ADIO * @return Derived class of base Engine depending on Method parameters, shared_ptr for potential flexibility */ std::shared_ptr<Engine> Open( const std::string streamName, const std::string accessMode, MPI_Comm mpiComm, - const Method& method, const IOMode iomode, const float timeout_sec = 0.0 ); + const Method& method, const IOMode iomode = IOMode::INDEPENDENT, const float timeout_sec = 0.0 ); /** * @brief Open to Write, Read. Creates a new engine from previously defined method. @@ -162,7 +162,7 @@ public: // PUBLIC Constructors and Functions define the User Interface with ADIO * @return Derived class of base Engine depending on Method parameters, shared_ptr for potential flexibility */ std::shared_ptr<Engine> Open( const std::string streamName, const std::string accessMode, - const Method& method, const IOMode iomode, const float timeout_sec = 0.0 ); + const Method& method, const IOMode iomode = IOMode::INDEPENDENT, const float timeout_sec = 0.0 ); /** @@ -176,7 +176,7 @@ public: // PUBLIC Constructors and Functions define the User Interface with ADIO * @return Derived class of base Engine depending on Method parameters, shared_ptr for potential flexibility */ std::shared_ptr<Engine> Open( const std::string streamName, const std::string accessMode, MPI_Comm mpiComm, - const std::string methodName, const IOMode iomode, const float timeout_sec = 0.0 ); + const std::string methodName, const IOMode iomode = IOMode::INDEPENDENT, const float timeout_sec = 0.0 ); /** * Version required by the XML config file implementation, searches method inside ADIOS through a unique name. @@ -189,7 +189,8 @@ public: // PUBLIC Constructors and Functions define the User Interface with ADIO * @return Derived class of base Engine depending on Method parameters, shared_ptr for potential flexibility */ std::shared_ptr<Engine> Open( const std::string streamName, const std::string accessMode, - const std::string methodName, const IOMode iomode, const float timeout_sec = 0.0 ); + const std::string methodName, const IOMode iomode = IOMode::INDEPENDENT, + const float timeout_sec = 0.0 ); /** * @brief Open to Read all steps from a file. No streaming, advancing is possible here. All steps in the file @@ -201,7 +202,7 @@ public: // PUBLIC Constructors and Functions define the User Interface with ADIO * @return Derived class of base Engine depending on Method parameters, shared_ptr for potential flexibility */ std::shared_ptr<Engine> OpenFileReader( const std::string fileName, MPI_Comm mpiComm, - const Method& method, const IOMode iomode ); + const Method& method, const IOMode iomode = IOMode::INDEPENDENT ); /** * @brief Open to Read all steps from a file. No streaming, advancing is possible here. All steps in the file @@ -214,7 +215,7 @@ public: // PUBLIC Constructors and Functions define the User Interface with ADIO * @return Derived class of base Engine depending on Method parameters, shared_ptr for potential flexibility */ std::shared_ptr<Engine> OpenFileReader( const std::string fileName, MPI_Comm mpiComm, - const std::string methodName, const IOMode iomode ); + const std::string methodName, const IOMode iomode = IOMode::INDEPENDENT ); /** * @brief Dumps groups information to a file stream or standard output. diff --git a/include/engine/bp/BPFileWriter.h b/include/engine/bp/BPFileWriter.h index ae9a43ba87c7db0678a6e4b565ba8cfb7bdaeab1..923d0b22764be8fa10977f6bb4c36be2e096ea82 100644 --- a/include/engine/bp/BPFileWriter.h +++ b/include/engine/bp/BPFileWriter.h @@ -33,7 +33,7 @@ public: * @param debugMode */ BPFileWriter( ADIOS& adios, const std::string name, const std::string accessMode, MPI_Comm mpiComm, - const Method& method, const IOMode iomode, const float timeout_sec, + const Method& method, const IOMode iomode = IOMode::INDEPENDENT, const float timeout_sec = 0., const bool debugMode = false, const unsigned int nthreads = 1 ); ~BPFileWriter( ); @@ -74,7 +74,7 @@ public: void Write( const std::string variableName, const std::complex<long double>* values ); void Write( const std::string variableName, const void* values ); - void Advance( ); + void Advance( float timeout_sec = 0.0 ); /** * Closes a single transport or all transports @@ -121,17 +121,15 @@ private: m_WrittenVariables.insert( variable.m_Name ); //if first timestep Write - if( m_MetadataSet.DataPGIsOpen == false ) //create a new pg index timestep ready to write variables + if( m_MetadataSet.DataPGIsOpen == false ) //create a new pg index WriteProcessGroupIndex( ); //pre-calculate new metadata and payload sizes - m_TransportFlush = CheckBuffersAllocation( m_BP1Writer.GetVariableIndexSize( variable ), variable.PayLoadSize(), - m_GrowthFactor, m_MaxBufferSize, - m_MetadataSet.VarsIndexPosition, m_MetadataSet.VarsIndex, - m_Buffer.m_DataPosition, m_Buffer.m_Data ); +// m_TransportFlush = CheckBufferAllocation( m_BP1Writer.GetVariableIndexSize( variable ) + variable.PayLoadSize(), +// m_GrowthFactor, m_MaxBufferSize, m_Buffer.m_Data ); //WRITE INDEX to data buffer and metadata structure (in memory)// - m_BP1Writer.WriteVariableIndex( variable, m_Buffer, m_MetadataSet ); + m_BP1Writer.WriteVariableMetadata( variable, m_Buffer, m_MetadataSet ); if( m_TransportFlush == true ) //in batches { diff --git a/include/format/BP1.h b/include/format/BP1.h index aac0b353499d59464845812e7291f88c3a7fdcd0..eed6b17be8484e87b5038538fb97ab6544567f6a 100644 --- a/include/format/BP1.h +++ b/include/format/BP1.h @@ -10,6 +10,10 @@ /// \cond EXCLUDE_FROM_DOXYGEN #include <memory> //std::shared_ptr +#include <cstdint> //std::uintX_t +#include <unordered_map> +#include <vector> +//#include <queue> //std::priority_queue to be added later /// \endcond #if ADIOS_NOMPI @@ -28,34 +32,44 @@ namespace format { /** - * Struct that tracks metadata indices in bp format + * Used for Variables and Attributes, needed in a container for characteristic sets merge independently for each Variable or Attribute */ -struct BP1MetadataSet +struct BP1Index { - std::string TimeStepName; ///< time step name associated with this PG - std::uint32_t TimeStep = 0; ///< current time step, updated with advance step, if append it will be updated to last + std::vector<char> Buffer; ///< metadata variable index, start with 100Kb + std::uint64_t Count = 0; ///< number of characteristics sets (time and spatial aggregation) + const std::uint32_t MemberID; + + BP1Index( const std::uint32_t memberID ): + MemberID{ memberID } + { + Buffer.reserve( 500 ); + } +}; - std::uint64_t PGCount = 0; ///< number of process groups - std::size_t PGIndexPosition = 16; - std::vector<char> PGIndex = std::vector<char>( 102400, '\0' ); ///< process group index metadata +/** + * Single struct that tracks metadata indices in bp format + */ +struct BP1MetadataSet +{ + std::uint32_t TimeStep; ///< current time step, updated with advance step, if append it will be updated to last, starts with one in ADIOS1 - std::uint32_t VarsCount = 0; ///< number of written Variables - std::size_t VarsIndexPosition = 12; ///< initial position in bytes - std::vector<char> VarsIndex = std::vector<char>( 102400, '\0' ); ///< metadata variable index, start with 1Kb + BP1Index PGIndex = BP1Index( 0 ); ///< single buffer for PGIndex - std::uint32_t AttributesCount = 0; ///< number of Attributes - std::size_t AttributesIndexPosition = 12; ///< initial position in bytes - std::vector<char> AttributesIndex = std::vector<char>( 102400, '\0' ); ///< metadata attribute index, start with 1Kb + //no priority for now + std::unordered_map< std::string, BP1Index > VarsIndices; ///< key: variable name, value: bp metadata variable index + std::unordered_map< std::string, BP1Index > AttributesIndices; ///< key: attribute name, value: bp metadata attribute index - const unsigned int MiniFooterSize = 28; ///< 28 for now + const unsigned int MiniFooterSize = 28; ///< from bpls reader //PG (relative) positions in Data buffer to be updated - std::uint32_t DataPGVarsCount = 0; + std::uint64_t DataPGCount = 0; std::size_t DataPGLengthPosition = 0; ///< current PG initial ( relative ) position, needs to be updated in every advance step or init - std::size_t DataVarsCountPosition = 0; ///< current PG variable count ( relative ) position, needs to be updated in every advance step or init + std::uint32_t DataPGVarsCount = 0; ///< variables in current PG + std::size_t DataPGVarsCountPosition = 0; ///< current PG variable count ( relative ) position, needs to be updated in every advance step or init bool DataPGIsOpen = false; - Profiler Log; + Profiler Log; ///< object that takes buffering profiling info }; /** @@ -121,8 +135,6 @@ protected: ,METHOD_FILE = 27 ,METHOD_ZMQ = 28 ,METHOD_MDTM = 29 - - }; @@ -184,6 +196,25 @@ protected: statistic_finite = 6 }; + template<class T> + struct Stats + { + T Min; + T Max; + std::uint64_t Offset; + std::uint64_t PayloadOffset; + std::uint32_t TimeIndex; + std::uint32_t MemberID; + + +// unsigned long int count; +// long double sum; +// long double sumSquare; + //unsigned long int histogram + //bool finite?? + }; + + /** * Returns data type index from enum Datatypes @@ -195,8 +226,7 @@ protected: return type_unknown; } - - std::vector<int> GetMethodIDs( const std::vector< std::shared_ptr<Transport> >& transports ) const noexcept; + std::vector<std::uint8_t> GetMethodIDs( const std::vector< std::shared_ptr<Transport> >& transports ) const noexcept; }; diff --git a/include/format/BP1Writer.h b/include/format/BP1Writer.h index 2cc30f7597b9e061f246ef1cf5a4422b4a2a8f1d..5e68f083f1e6a9ec2d62b146a6b8ef7a0a3f6f8c 100644 --- a/include/format/BP1Writer.h +++ b/include/format/BP1Writer.h @@ -9,8 +9,6 @@ #define BP1WRITER_H_ /// \cond EXCLUDE_FROM_DOXYGEN -#include <vector> -#include <cstdint> //std::intX_t fixed size integers #include <algorithm> //std::count, std::copy, std::for_each #include <cstring> //std::memcpy #include <cmath> //std::ceil @@ -57,17 +55,13 @@ public: * @param isFortran * @param name * @param processID - * @param timeStepName - * @param timeStep * @param transports * @param buffer * @param metadataSet */ - void WriteProcessGroupIndex( const bool isFortran, const std::string name, const unsigned int processID, - const std::string timeStepName, const unsigned int timeStep, + void WriteProcessGroupIndex( const bool isFortran, const std::string name, const std::uint32_t processID, const std::vector< std::shared_ptr<Transport> >& transports, - capsule::STLVector& buffer, - BP1MetadataSet& metadataSet ) const noexcept; + capsule::STLVector& heap, BP1MetadataSet& metadataSet ) const noexcept; /** * Returns the estimated variable index size @@ -89,13 +83,8 @@ public: indexSize += 28 * dimensions; //28 bytes per dimension indexSize += 1; //id - //characteristics 3, variable offset in data - indexSize += 8; - indexSize += 1; //id - //characteristics 6, variable payload offset in data - indexSize += 8; - indexSize += 1; //id - + //characteristics, offset + payload offset in data + indexSize += 2*(1+8); //characteristic 0, if scalar add value, for now only allowing string if( dimensions == 1 ) { @@ -118,205 +107,29 @@ public: } /** - * - * @param variable - * @param dataBuffers - * @param dataPositions - * @param dataAbsolutePositions - * @param metadataBuffers - * @param metadataPositions - * @param variablesCount - */ - template<class T> - void WriteVariableIndex( const Variable<T>& variable, capsule::STLVector& buffer, BP1MetadataSet& metadataSet ) const noexcept + * Version for primitive types (except std::complex<T>) + * @param variable + * @param heap + * @param metadataSet + */ + template<class T> inline + void WriteVariableMetadata( const Variable<T>& variable, capsule::STLVector& heap, BP1MetadataSet& metadataSet ) const noexcept { - auto lf_String = []( const std::string name, const std::uint16_t length, - std::vector<char>& buffer, std::size_t& position ) - { - MemcpyToBuffer( buffer, position, &length, 2 ); - MemcpyToBuffer( buffer, position, name.c_str(), length ); - }; - - auto lf_MemberID = []( const std::uint32_t memberID, capsule::STLVector& buffer, BP1MetadataSet& metadataSet ) - { - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &memberID, 4 ); - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &memberID, 4 ); - }; - - auto lf_VarName = [&]( const std::string name, capsule::STLVector& buffer, BP1MetadataSet& metadataSet ) - { - const std::uint16_t length = name.length(); - lf_String( name, length, buffer.m_Data, buffer.m_DataPosition ); - lf_String( name, length, metadataSet.VarsIndex, metadataSet.VarsIndexPosition ); - }; - - auto lf_DataType = []( const std::uint8_t dataType, capsule::STLVector& buffer, BP1MetadataSet& metadataSet ) - { - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &dataType, 1 ); - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &dataType, 1 ); - }; - - - //BODY of function starts here - //capture initial positions storing the variable Length - const std::size_t metadataVarLengthPosition = metadataSet.VarsIndexPosition; - const std::size_t dataVarLengthPosition = buffer.m_DataPosition; - - metadataSet.VarsIndexPosition += 4; //skip var length - buffer.m_DataPosition += 8; //skip var length - - lf_MemberID( metadataSet.VarsCount, buffer, metadataSet ); //memberID in metadata and data - metadataSet.VarsIndexPosition += 2; //skipping 2 bytes for length of group name which is zero, only in metadata - lf_VarName( variable.m_Name, buffer, metadataSet ); //variable name to metadata and data - - metadataSet.VarsIndexPosition += 2; //skip path - buffer.m_DataPosition += 2; //skip path - - //dataType - const std::uint8_t dataType = GetDataType<T>(); - lf_DataType( dataType, buffer, metadataSet ); - - //write in data if it's a dimension variable (scalar) y or n - const char dimensionYorN = ( variable.m_IsDimension ) ? 'y' : 'n'; - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &dimensionYorN, 1 ); - - //Characteristics Sets Count in Metadata - const std::uint64_t sets = 1; //write one for now - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &sets, 8 ); - - //Characteristics set - std::uint8_t characteristicsCounter = 0; //used for characteristics count, characteristics length will be calculated at the end - const std::size_t metadataCharacteristicsCountPosition = metadataSet.VarsIndexPosition; - metadataSet.VarsIndexPosition += 5; //here move positions 5 bytes in data and metadata for characteristics count + length - - //DIMENSIONS CHARACTERISTIC - const std::vector<std::size_t>& localDimensions = variable.m_Dimensions; - - //write to metadata characteristic - //characteristic: dimension - std::uint8_t characteristicID = characteristic_dimensions; - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &characteristicID, 1 ); - const std::uint8_t dimensions = localDimensions.size(); - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &dimensions, 1 ); - const std::uint16_t dimensionsLength = dimensions * 24; //24 is from 8 bytes for each: local dimension, global dimension, global offset - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &dimensionsLength, 2 ); - - //write dimensions count and length in data - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &dimensions, 1 ); - const std::uint16_t dimensionsLengthInData = dimensions * 27; //27 is from 9 bytes for each: var y/n + local, var y/n + global dimension, var y/n + global offset - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &dimensionsLengthInData, 2 ); - - std::size_t dataCharacteristicsCountPosition = buffer.m_DataPosition; //will be modified - - if( variable.m_GlobalDimensions.empty() ) //local variable - { - WriteDimensionRecord( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, localDimensions, 16 ); - WriteDimensionRecord( buffer.m_Data, buffer.m_DataPosition, localDimensions, 18, true ); //not using memberID for now - - dataCharacteristicsCountPosition = buffer.m_DataPosition; //very important to track as writer is going back to this position - buffer.m_DataPosition += 5; //skip characteristics count(1) + length (4) - - //dimensions in data characteristic entry - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &characteristicID, 1 ); - const std::int16_t lengthOfDimensionsCharacteristic = 24 * dimensions; // 24 = 3 local, global, global offset x 8 bytes/each - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &lengthOfDimensionsCharacteristic, 2 ); - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &dimensions, 1 ); - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &dimensionsLength, 2 ); - WriteDimensionRecord( buffer.m_Data, buffer.m_DataPosition, localDimensions, 16 ); - } - else //global variable - { - const std::vector<std::size_t>& globalDimensions = variable.m_GlobalDimensions; - const std::vector<std::size_t>& globalOffsets = variable.m_GlobalOffsets; - - WriteDimensionRecord( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, localDimensions, globalDimensions, globalOffsets ); - WriteDimensionRecord( buffer.m_Data, buffer.m_DataPosition, localDimensions, globalDimensions, globalOffsets, true ); - - dataCharacteristicsCountPosition = buffer.m_DataPosition; //very important, going back to these positions - buffer.m_DataPosition += 5; //skip characteristics count(1) + length (4) - - //dimensions in data characteristic entry - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &characteristicID, 1 ); //id - const std::int16_t lengthOfDimensionsCharacteristic = 24 * dimensions; // 24 = 3 local, global, global offset x 8 bytes/each - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &lengthOfDimensionsCharacteristic, 2 ); - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &dimensions, 1 ); - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &dimensionsLength, 2 ); - WriteDimensionRecord( buffer.m_Data, buffer.m_DataPosition, localDimensions, globalDimensions, globalOffsets ); - } - ++characteristicsCounter; - - //VALUE for SCALAR or STAT min, max for ARRAY - //Value for scalar - if( variable.m_IsScalar ) //scalar //just doing string scalars for now (by name), needs to be modified when user passes value - { - characteristicID = characteristic_value; - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &characteristicID, 1 ); - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, variable.m_AppValues, sizeof(T) ); - - //data - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &characteristicID, 1 ); - const std::uint16_t lengthOfValue = sizeof( T ); - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &lengthOfValue, 2 ); //add length of characteristic in data - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, variable.m_AppValues, sizeof(T) ); - - ++characteristicsCounter; - } - else // Stat -> Min, Max for arrays, - { - if( m_Verbosity == 0 ) //default verbose - { - WriteMinMax( variable, buffer, metadataSet ); - characteristicsCounter += 2; - } - } - - //Characteristic time index in metadata and data - characteristicID = characteristic_time_index; - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &characteristicID, 1 ); - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &metadataSet.TimeStep, 4 ); - - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &characteristicID, 1 ); - const std::uint16_t lengthOfTimeIndex = 4; - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &lengthOfTimeIndex, 2 ); //add length of characteristic in data - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &metadataSet.TimeStep, 4 ); - ++characteristicsCounter; - - //Back to characteristics count and length in Data - //count - std::memcpy( &buffer.m_Data[dataCharacteristicsCountPosition], &characteristicsCounter, 1 ); - //length - const std::uint32_t dataCharacteristicsLength = buffer.m_DataPosition - dataCharacteristicsCountPosition - 4 - 1; //remove its own length (4 bytes) + characteristic counter ( 1 byte ) - std::memcpy( &buffer.m_Data[dataCharacteristicsCountPosition+1], &dataCharacteristicsLength, 4 ); - - //Metadata only: Offsets should be last, they come from data absolute positions - characteristicID = characteristic_offset; - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &characteristicID, 1 ); //variable offset id - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &buffer.m_DataAbsolutePosition, 8 ); //variable offset - ++characteristicsCounter; - - //Back to length of var including payload size in data + update absolute position - const std::uint64_t varLength = buffer.m_DataPosition - dataVarLengthPosition + variable.PayLoadSize() - 8; //remove its own size - std::memcpy( &buffer.m_Data[dataVarLengthPosition], &varLength, 8 ); - buffer.m_DataAbsolutePosition += buffer.m_DataPosition - dataVarLengthPosition; //payload offset - - characteristicID = characteristic_payload_offset; - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &characteristicID, 1 ); //variable payload offset id - MemcpyToBuffer( metadataSet.VarsIndex, metadataSet.VarsIndexPosition, &buffer.m_DataAbsolutePosition, 8 ); //variable payload offset - ++characteristicsCounter; - - //Back to writing characteristics count and length in Metadata - //count - std::memcpy( &metadataSet.VarsIndex[metadataCharacteristicsCountPosition], &characteristicsCounter, 1 ); - //length - const std::uint32_t metadataCharacteristicsLength = metadataSet.VarsIndexPosition - metadataCharacteristicsCountPosition - 4 - 1; //remove its own size and characteristic counter size - std::memcpy( &metadataSet.VarsIndex[metadataCharacteristicsCountPosition+1], &metadataCharacteristicsLength, 4 ); - - //Back to writing var entry length in Metadata - const std::uint32_t metadataVarEntryLength = metadataSet.VarsIndexPosition - metadataVarLengthPosition - 4; //remove its own size - std::memcpy( &metadataSet.VarsIndex[metadataVarLengthPosition], &metadataVarEntryLength, 4 ); + Stats<T> stats = GetStats( variable ); + WriteVariableMetadataCommon( variable, stats, heap, metadataSet ); + } - ++metadataSet.VarsCount; - ++metadataSet.DataPGVarsCount; + /** + * Overloaded version for std::complex<T> variables + * @param variable + * @param heap + * @param metadataSet + */ + template<class T> + void WriteVariableMetadata( const Variable<std::complex<T>>& variable, capsule::STLVector& heap, BP1MetadataSet& metadataSet ) const noexcept + { + Stats<T> stats = GetStats( variable ); + WriteVariableMetadataCommon( variable, stats, heap, metadataSet ); } @@ -326,14 +139,11 @@ public: * @param buffer */ template< class T > - void WriteVariablePayload( const Variable<T>& variable, capsule::STLVector& buffer, const unsigned int nthreads = 1 ) const noexcept + void WriteVariablePayload( const Variable<T>& variable, capsule::STLVector& heap, const unsigned int nthreads = 1 ) const noexcept { - std::size_t payloadSize = variable.PayLoadSize(); //not using const due to memcpy inside Memcpythreads //EXPENSIVE part, might want to use threads if large, serial for now - MemcpyThreads( &buffer.m_Data[buffer.m_DataPosition], variable.m_AppValues, payloadSize, nthreads ); - //update indices - buffer.m_DataPosition += payloadSize; - buffer.m_DataAbsolutePosition += payloadSize; + CopyToBuffer( heap.m_Data, variable.m_AppValues, variable.TotalSize() ); + heap.m_DataAbsolutePosition += variable.PayLoadSize(); } @@ -342,12 +152,12 @@ public: /** * Function that sets metadata (if first close) and writes to a single transport * @param metadataSet current rank metadata set - * @param buffer contains data + * @param heap contains data buffer * @param transport does a write after data and metadata is setup * @param isFirstClose true: metadata has been set and aggregated * @param doAggregation true: for N-to-M, false: for N-to-N */ - void Close( BP1MetadataSet& metadataSet, capsule::STLVector& buffer, Transport& transport, bool& isFirstClose, + void Close( BP1MetadataSet& metadataSet, capsule::STLVector& heap, Transport& transport, bool& isFirstClose, const bool doAggregation ) const noexcept; @@ -363,107 +173,259 @@ public: private: - /** - * Writes name record using a - * @param name to be written - * @param length number of characters in name - * @param buffers to be written - * @param positions to be moved - */ - void WriteNameRecord( const std::string name, const std::uint16_t length, - std::vector<char>& buffer, std::size_t& position ) const noexcept; + template< class T, class U > + void WriteVariableMetadataCommon( const Variable<T>& variable, Stats<U>& stats, + capsule::STLVector& heap, BP1MetadataSet& metadataSet ) const noexcept + { + stats.TimeIndex = metadataSet.TimeStep; + + //Get new Index or point to existing index + bool isNew = true; //flag to check if variable is new + BP1Index& varIndex = GetBP1Index( variable.m_Name, metadataSet.VarsIndices, isNew ); + stats.MemberID = varIndex.MemberID; + + //write metadata header in data and extract offsets + stats.Offset = heap.m_DataAbsolutePosition; + WriteVariableMetadataInData( variable, stats, heap ); + stats.PayloadOffset = heap.m_DataAbsolutePosition; + + //write to metadata index + WriteVariableMetadataInIndex( variable, stats, isNew, varIndex ); + + ++metadataSet.DataPGVarsCount; + } + + + template< class T, class U > + void WriteVariableMetadataInData( const Variable<T>& variable, const Stats<U>& stats, + capsule::STLVector& heap ) const noexcept + { + auto& buffer = heap.m_Data; + + const std::size_t varLengthPosition = buffer.size(); //capture initial position for variable length + buffer.insert( buffer.end(), 8, 0 ); //skip var length (8) + CopyToBuffer( buffer, &stats.MemberID ); //memberID + WriteNameRecord( variable.m_Name, buffer ); //variable name + buffer.insert( buffer.end(), 2, 0 ); //skip path + const std::uint8_t dataType = GetDataType<T>(); //dataType + CopyToBuffer( buffer, &dataType ); + constexpr char no = 'n'; //isDimension + CopyToBuffer( buffer, &no ); + + //write variable dimensions + const std::uint8_t dimensions = variable.m_Dimensions.size(); + CopyToBuffer( buffer, &dimensions ); //count + std::uint16_t dimensionsLength = 27 * dimensions; //27 is from 9 bytes for each: var y/n + local, var y/n + global dimension, var y/n + global offset, changed for characteristic + CopyToBuffer( buffer, &dimensionsLength ); //length + WriteDimensionsRecord( buffer, variable.m_Dimensions, variable.m_GlobalDimensions, variable.m_GlobalOffsets, 18, true ); + + //CHARACTERISTICS + WriteVariableCharacteristics( variable, stats, buffer, true ); + + //Back to varLength including payload size + const std::uint64_t varLength = buffer.size() - varLengthPosition + variable.PayLoadSize() - 8; //remove its own size + CopyToBuffer( buffer, varLengthPosition, &varLength ); //length + + heap.m_DataAbsolutePosition += buffer.size() - varLengthPosition; // update absolute position to be used as payload position + } + + + template< class T, class U> + void WriteVariableMetadataInIndex( const Variable<T>& variable, const Stats<U>& stats, + const bool isNew, BP1Index& index ) const noexcept + { + auto& buffer = index.Buffer; + + if( isNew == true ) //write variable header (might be shared with attributes index) + { + buffer.insert( buffer.end(), 4, 0 ); //skip var length (4) + CopyToBuffer( buffer, &stats.MemberID ); + buffer.insert( buffer.end(), 2, 0 ); //skip group name + WriteNameRecord( variable.m_Name, buffer ); + buffer.insert( buffer.end(), 2, 0 ); //skip path + + const std::uint8_t dataType = GetDataType<T>(); + CopyToBuffer( buffer, &dataType ); + + //Characteristics Sets Count in Metadata + index.Count = 1; + CopyToBuffer( buffer, &index.Count ); + } + else //update characteristics sets count + { + const std::size_t characteristicsSetsCountPosition = 15 + variable.m_Name.size(); + ++index.Count; + CopyToBuffer( buffer, characteristicsSetsCountPosition, &index.Count ); //test + } + + WriteVariableCharacteristics( variable, stats, buffer ); + } + + + template<class T, class U> + void WriteVariableCharacteristics( const Variable<T>& variable, const Stats<U>& stats, std::vector<char>& buffer, + const bool addLength = false ) const noexcept + { + const std::size_t characteristicsCountPosition = buffer.size(); //very important to track as writer is going back to this position + buffer.insert( buffer.end(), 5, 0 ); //skip characteristics count(1) + length (4) + std::uint8_t characteristicsCounter = 0; + + //DIMENSIONS + std::uint8_t characteristicID = characteristic_dimensions; + CopyToBuffer( buffer, &characteristicID ); + const std::uint8_t dimensions = variable.m_Dimensions.size(); + + if( addLength == true ) + { + const std::int16_t lengthOfDimensionsCharacteristic = 24 * dimensions + 3; // 24 = 3 local, global, global offset x 8 bytes/each + CopyToBuffer( buffer, &lengthOfDimensionsCharacteristic ); + } + + CopyToBuffer( buffer, &dimensions ); //count + const std::uint16_t dimensionsLength = 24 * dimensions; + CopyToBuffer( buffer, &dimensionsLength ); //length + WriteDimensionsRecord( buffer, variable.m_Dimensions, variable.m_GlobalDimensions, variable.m_GlobalOffsets, 16, addLength ); + ++characteristicsCounter; + + //VALUE for SCALAR or STAT min, max for ARRAY + WriteBoundsRecord( variable.m_IsScalar, stats, buffer, characteristicsCounter, addLength ); + //TIME INDEX + WriteCharacteristicRecord( characteristic_time_index, stats.TimeIndex, buffer, characteristicsCounter, addLength ); + + if( addLength == false )//only in metadata offset and payload offset + { + WriteCharacteristicRecord( characteristic_offset, stats.Offset, buffer, characteristicsCounter ); + WriteCharacteristicRecord( characteristic_payload_offset, stats.PayloadOffset, buffer, characteristicsCounter ); + } + //END OF CHARACTERISTICS + + //Back to characteristics count and length + CopyToBuffer( buffer, characteristicsCountPosition, &characteristicsCounter ); //count (1) + const std::uint32_t characteristicsLength = buffer.size() - characteristicsCountPosition - 4 - 1; //remove its own length (4 bytes) + characteristic counter ( 1 byte ) + CopyToBuffer( buffer, characteristicsCountPosition+1, &characteristicsLength ); //length + } /** - * Write a dimension record for a global variable used by WriteVariableCommon + * Writes from &buffer[position]: [2 bytes:string.length()][string.length(): string.c_str()] + * @param name * @param buffer * @param position - * @param localDimensions - * @param globalDimensions - * @param globalOffsets - * @param addType true: for data buffers, false: for metadata buffer and data characteristic */ - void WriteDimensionRecord( std::vector<char>& buffer, std::size_t& position, - const std::vector<std::size_t>& localDimensions, - const std::vector<std::size_t>& globalDimensions, - const std::vector<std::size_t>& globalOffsets, - const bool addType = false ) const noexcept; + void WriteNameRecord( const std::string name, std::vector<char>& buffer ) const noexcept; + /** - * Write a dimension record for a local variable used by WriteVariableCommon + * Write a dimension record for a global variable used by WriteVariableCommon * @param buffer * @param position * @param localDimensions - * @param skip + * @param globalDimensions + * @param globalOffsets * @param addType true: for data buffers, false: for metadata buffer and data characteristic */ - void WriteDimensionRecord( std::vector<char>& buffer, std::size_t& position, - const std::vector<std::size_t>& localDimensions, - const unsigned int skip, - const bool addType = false ) const noexcept; + void WriteDimensionsRecord( std::vector<char>& buffer, + const std::vector<std::size_t>& localDimensions, + const std::vector<std::size_t>& globalDimensions, + const std::vector<std::size_t>& globalOffsets, + const unsigned int skip, + const bool addType = false ) const noexcept; /** - * Function that writes min and max into data and metadata, called from WriteVariableIndex common. - * Will be specialized for complex types, this is the version for primitive types + * GetStats for primitive types except std::complex<T> types * @param variable - * @param dataBuffers - * @param dataPositions - * @param metadataBuffers - * @param metadataPositions + * @return stats */ - template<class T> inline - void WriteMinMax( const Variable<T>& variable, capsule::STLVector& buffer, BP1MetadataSet& metadataSet ) const noexcept - { - T min, max; - const std::size_t valuesSize = variable.TotalSize(); - if( valuesSize >= 10000000 ) //ten million? this needs actual results //here we can make decisions for threads based on valuesSize - GetMinMax( variable.m_AppValues, valuesSize, min, max, m_Threads ); //here we can add threads from constructor - else - GetMinMax( variable.m_AppValues, valuesSize, min, max ); - - WriteMinMaxValues( min, max, buffer, metadataSet ); - } - + template<class T> + Stats<T> GetStats( const Variable<T>& variable ) const noexcept + { + Stats<T> stats; + const std::size_t valuesSize = variable.TotalSize(); + + if( m_Verbosity == 0 ) + { + if( valuesSize >= 10000000 ) //ten million? this needs actual results //here we can make decisions for threads based on valuesSize + GetMinMax( variable.m_AppValues, valuesSize, stats.Min, stats.Max, m_Threads ); //here we can add cores from constructor + else + GetMinMax( variable.m_AppValues, valuesSize, stats.Min, stats.Max ); + } + return stats; + } /** - * Common part of WriteMinMax specialized templates. Writes to buffers after min and max are calculated. + * GetStats for std::complex<T> types + * @param variable + * @return stats */ template<class T> - void WriteMinMaxValues( const T min, const T max, capsule::STLVector& buffer, BP1MetadataSet& metadataSet ) const noexcept + Stats<T> GetStats( const Variable<std::complex<T>>& variable ) const noexcept + { + Stats<T> stats; + const std::size_t valuesSize = variable.TotalSize(); + + if( m_Verbosity == 0 ) + { + if( valuesSize >= 10000000 ) //ten million? this needs actual results //here we can make decisions for threads based on valuesSize + GetMinMax( variable.m_AppValues, valuesSize, stats.Min, stats.Max, m_Threads ); //here we can add cores from constructor + else + GetMinMax( variable.m_AppValues, valuesSize, stats.Min, stats.Max ); + } + return stats; + } + + template< class T > + void WriteBoundsRecord( const bool isScalar, const Stats<T>& stats, std::vector<char>& buffer, + std::uint8_t& characteristicsCounter, const bool addLength ) const noexcept { - constexpr std::int8_t characteristicMinID = characteristic_min; - constexpr std::int8_t characteristicMaxID = characteristic_max; + if( isScalar == true ) + { + WriteCharacteristicRecord( characteristic_value, stats.Min, buffer, characteristicsCounter, addLength ); //stats.min = stats.max = value + return; + } - WriteValueRecord( characteristicMinID, min, metadataSet.VarsIndex, metadataSet.VarsIndexPosition ); - WriteValueRecord( characteristicMaxID, max, metadataSet.VarsIndex, metadataSet.VarsIndexPosition ); - WriteValueRecord( characteristicMinID, min, buffer.m_Data, buffer.m_DataPosition, true ); //true: addLength in between for data - WriteValueRecord( characteristicMaxID, max, buffer.m_Data, buffer.m_DataPosition, true ); //true: addLength in between for data + if( m_Verbosity == 0 ) //default verbose + { + WriteCharacteristicRecord( characteristic_min, stats.Min, buffer, characteristicsCounter, addLength ); + WriteCharacteristicRecord( characteristic_max, stats.Max, buffer, characteristicsCounter, addLength ); + } } - /** - * Write a statistics record to buffer + * Write a characteristic value record to buffer * @param id * @param value * @param buffers * @param positions + * @param characvteristicsCounter to be updated by 1 * @param addLength true for data, false for metadata */ template<class T> - void WriteValueRecord( const std::uint8_t& characteristicID, const T& value, - std::vector<char>& buffer, std::size_t& position, - const bool addLength = false ) const noexcept + void WriteCharacteristicRecord( const std::uint8_t characteristicID, const T& value, + std::vector<char>& buffer, std::uint8_t& characteristicsCounter, + const bool addLength = false ) const noexcept { - MemcpyToBuffer( buffer, position, &characteristicID, 1 ); + const std::uint8_t id = characteristicID; + CopyToBuffer( buffer, &id ); if( addLength == true ) { - const std::uint16_t lengthCharacteristic = 1 + sizeof( T ); //id - MemcpyToBuffer( buffer, position, &lengthCharacteristic, 2 ); + const std::uint16_t lengthOfCharacteristic = sizeof( T ); //id + CopyToBuffer( buffer, &lengthOfCharacteristic ); } - MemcpyToBuffer( buffer, position, &value, sizeof(T) ); + CopyToBuffer( buffer, &value ); + ++characteristicsCounter; } + /** + * Returns corresponding index of type BP1Index, if doesn't exists creates a new one. + * Used for variables and attributes + * @param name variable or attribute name to look for index + * @param indices look up hash table of indices + * @param isNew true: index is newly created, false: index already exists in indices + * @return reference to BP1Index in indices + */ + BP1Index& GetBP1Index( const std::string name, std::unordered_map<std::string, BP1Index>& indices, bool& isNew ) const noexcept; + /** * Flattens the data and fills the pg length, vars count, vars length and attributes * @param metadataSet @@ -481,63 +443,6 @@ private: }; - -/** - * Specialized version of WriteMinMax for std::complex<float> - * @param variable - * @param dataBuffers - * @param dataPositions - * @param metadataBuffers - * @param metadataPositions - */ -template<> inline -void BP1Writer::WriteMinMax<std::complex<float>>( const Variable<std::complex<float>>& variable, capsule::STLVector& buffer, - BP1MetadataSet& metadataSet ) const noexcept -{ - float min, max; - const std::size_t valuesSize = variable.TotalSize(); - if( valuesSize >= 10000000 ) //ten million? this needs actual results //here we can make decisions for threads based on valuesSize - GetMinMax( variable.m_AppValues, valuesSize, min, max, m_Threads ); //here we can add threads from constructor - else - GetMinMax( variable.m_AppValues, valuesSize, min, max ); - - WriteMinMaxValues( min, max, buffer, metadataSet ); -} - - -template<> inline -void BP1Writer::WriteMinMax<std::complex<double>>( const Variable<std::complex<double>>& variable, capsule::STLVector& buffer, - BP1MetadataSet& metadataSet ) const noexcept -{ - double min, max; - const std::size_t valuesSize = variable.TotalSize(); - if( valuesSize >= 10000000 ) //ten million? this needs actual results //here we can make decisions for threads based on valuesSize - GetMinMax( variable.m_AppValues, valuesSize, min, max, m_Threads ); //here we can add threads from constructor - else - GetMinMax( variable.m_AppValues, valuesSize, min, max ); - - WriteMinMaxValues( min, max, buffer, metadataSet ); -} - - -template<> inline -void BP1Writer::WriteMinMax<std::complex<long double>>( const Variable<std::complex<long double>>& variable, - capsule::STLVector& buffer, - BP1MetadataSet& metadataSet ) const noexcept -{ - long double min, max; - const std::size_t valuesSize = variable.TotalSize(); - if( valuesSize >= 10000000 ) //ten million? this needs actual results //here we can make decisions for threads based on valuesSize - GetMinMax( variable.m_AppValues, valuesSize, min, max, m_Threads ); //here we can add threads from constructor - else - GetMinMax( variable.m_AppValues, valuesSize, min, max ); - - WriteMinMaxValues( min, max, buffer, metadataSet ); -} - - - - } //end namespace format } //end namespace adios diff --git a/include/functions/adiosFunctions.h b/include/functions/adiosFunctions.h index 2225edec74a30ab63cab06954f0844dc3d95d6c6..46598ff855d57abf6cd24d428697a217e126db23 100644 --- a/include/functions/adiosFunctions.h +++ b/include/functions/adiosFunctions.h @@ -161,29 +161,23 @@ std::vector<int> CSVToVectorInt( const std::string csv ); /** * Common strategy to check for heap buffer allocation for data and metadata typically calculated in Write - * @param indexSize metadata index size for a variable - * @param payloadSize variable payload size from application + * @param newSize new data size * @param growthFactor user provided growth factor for index and data memory buffers ( default = 1.5 ) * @param maxBufferSize user provided maximum buffer size - * @param indexPosition - * @param indexBuffer - * @param buffer heap capsule containing data buffer for payload + * @param buffer to be reallocated * @return true: must do a transport flush, false: buffer sizes are enough to contain incoming data, no need for transport flush */ -bool CheckBuffersAllocation( const std::size_t indexSize, const std::size_t payloadSize, - const float growthFactor, const std::size_t maxBufferSize, - const std::size_t indexPosition, std::vector<char>& indexBuffer, - const std::size_t dataPosition, std::vector<char>& dataBuffer ); +bool CheckBufferAllocation( const std::size_t newSize, const float growthFactor, const std::size_t maxBufferSize, + std::vector<char>& buffer ); /** * Grows a buffer by a factor of n . growthFactor . currentCapacity to accommodate for incomingDataSize * @param incomingDataSize size of new data required to be stored in buffer * @param growthFactor buffer grows in multiples of the growth buffer - * @param currentPosition current buffer position, needed to get available space = currentCapacity - currentPosition * @param buffer to be resized * @return -1: failed to allocate (bad_alloc), 0: didn't have to allocate (enough space), 1: successful allocation */ -int GrowBuffer( const std::size_t incomingDataSize, const float growthFactor, const std::size_t currentPosition, +int GrowBuffer( const std::size_t incomingDataSize, const float growthFactor, std::vector<char>& buffer ); diff --git a/include/functions/adiosTemplates.h b/include/functions/adiosTemplates.h index 78285d949534e2251c99e5349e4a028aa95f6c04..8d373033679a98521e05f8fd6908a788e044240d 100644 --- a/include/functions/adiosTemplates.h +++ b/include/functions/adiosTemplates.h @@ -181,6 +181,43 @@ void MemcpyToBuffer( std::vector<char>& raw, std::size_t& position, const T* sou } + +/** + * Version that pushed to the end of the buffer, updates vec.size() automatically + * @param raw + * @param source using pointer notation + * @param elements + */ +template<class T> +void CopyToBuffer( std::vector<char>& buffer, const T* source, const std::size_t elements = 1 ) noexcept +{ + const char* src = reinterpret_cast<const char*>( source ); + buffer.insert( buffer.end(), src, src + elements*sizeof(T) ); +} + +/** + * Overloaded version to copies data to a specific location in the buffer, doesn't update vec.size() + * @param raw + * @param position + * @param source + * @param elements + */ +template<class T> +void CopyToBuffer( std::vector<char>& buffer, const std::size_t position, const T* source, const std::size_t elements = 1 ) noexcept +{ + const char* src = reinterpret_cast<const char*>( source ); + std::copy( src, src + elements*sizeof(T), buffer.begin() + position ); +} + + +template<class T> +void CopyFromBuffer( T* destination, std::size_t elements, const std::vector<char>& raw, std::size_t& position ) noexcept +{ + std::copy( raw.begin() + position, raw.begin() + position + sizeof(T)*elements, reinterpret_cast<char*>(destination) ); + position += elements*sizeof(T); +} + + template< class T > void PrintValues( const std::string name, const char* buffer, const std::size_t position, const std::size_t elements ) { diff --git a/src/capsule/heap/STLVector.cpp b/src/capsule/heap/STLVector.cpp index 92c42702c47e3a2e3e7ddfacb6c0c9dbb9ea323a..dafae7a7b4adba29c3b2995534b73b8058cbd0ae 100644 --- a/src/capsule/heap/STLVector.cpp +++ b/src/capsule/heap/STLVector.cpp @@ -20,9 +20,10 @@ namespace capsule STLVector::STLVector( const std::string accessMode, const int rankMPI, const bool debugMode ): - Capsule( "Heap", accessMode, rankMPI, debugMode ), - m_Data( 16777216, '\0' ) //default capacity = 16Mb -{ } + Capsule( "Heap", accessMode, rankMPI, debugMode ) +{ + m_Data.reserve( 16777216 ); +} STLVector::~STLVector( ) diff --git a/src/engine/bp/BPFileWriter.cpp b/src/engine/bp/BPFileWriter.cpp index 85c86c985d76956fb63e0174348143311c8d5e50..775fbe44a91d9f679a770b091d12c243dcbdb7d5 100644 --- a/src/engine/bp/BPFileWriter.cpp +++ b/src/engine/bp/BPFileWriter.cpp @@ -25,6 +25,7 @@ BPFileWriter::BPFileWriter( ADIOS& adios, const std::string name, const std::str m_BP1Aggregator{ format::BP1Aggregator( m_MPIComm, debugMode ) }, m_MaxBufferSize{ m_Buffer.m_Data.max_size() } { + m_MetadataSet.TimeStep = 1; //starting at one to be compatible with ADIOS1.x Init( ); } @@ -145,7 +146,7 @@ void BPFileWriter::Write( const std::string variableName, const void* values ) / { } -void BPFileWriter::Advance( ) +void BPFileWriter::Advance( float timeout_sec ) { m_BP1Writer.Advance( m_MetadataSet, m_Buffer ); } @@ -375,20 +376,20 @@ void BPFileWriter::InitProcessGroup( ) void BPFileWriter::WriteProcessGroupIndex( ) { //pg = process group - const std::size_t pgIndexSize = m_BP1Writer.GetProcessGroupIndexSize( std::to_string( m_RankMPI ), - std::to_string( m_MetadataSet.TimeStep ), - m_Transports.size() ); +// const std::size_t pgIndexSize = m_BP1Writer.GetProcessGroupIndexSize( std::to_string( m_RankMPI ), +// std::to_string( m_MetadataSet.TimeStep ), +// m_Transports.size() ); //metadata - GrowBuffer( pgIndexSize, m_GrowthFactor, m_MetadataSet.PGIndexPosition, m_MetadataSet.PGIndex ); + //GrowBuffer( pgIndexSize, m_GrowthFactor, m_MetadataSet.PGIndex ); //data? Need to be careful, maybe add some trailing tolerance in variable ???? - GrowBuffer( pgIndexSize, m_GrowthFactor, m_Buffer.m_DataPosition, m_Buffer.m_Data ); + //GrowBuffer( pgIndexSize, m_GrowthFactor, m_Buffer.m_Data ); const bool isFortran = ( m_HostLanguage == "Fortran" ) ? true : false; - m_BP1Writer.WriteProcessGroupIndex( isFortran, std::to_string( m_RankMPI ), static_cast<unsigned int> ( m_RankMPI ), - std::to_string( m_MetadataSet.TimeStep ), m_MetadataSet.TimeStep, m_Transports, - m_Buffer, m_MetadataSet ); + m_BP1Writer.WriteProcessGroupIndex( isFortran, std::to_string( m_RankMPI ), static_cast<std::uint32_t>( m_RankMPI ), + m_Transports, m_Buffer, m_MetadataSet ); + } diff --git a/src/format/BP1.cpp b/src/format/BP1.cpp index ddf74bb83bdc6a48e9a1dbbb6faffc677cadfc11..eb70ff6d22c286db96107dcb3c04fd17f31bd0fd 100644 --- a/src/format/BP1.cpp +++ b/src/format/BP1.cpp @@ -39,9 +39,9 @@ void BP1::OpenRankFiles( const std::string name, const std::string accessMode, T } -std::vector<int> BP1::GetMethodIDs( const std::vector< std::shared_ptr<Transport> >& transports ) const noexcept +std::vector<std::uint8_t> BP1::GetMethodIDs( const std::vector< std::shared_ptr<Transport> >& transports ) const noexcept { - auto lf_GetMethodID = []( const std::string method ) -> int + auto lf_GetMethodID = []( const std::string method ) -> std::uint8_t { int id = METHOD_UNKNOWN; if( method == "NULL" ) id = METHOD_NULL; @@ -53,13 +53,11 @@ std::vector<int> BP1::GetMethodIDs( const std::vector< std::shared_ptr<Transport return id; }; - std::vector<int> methodIDs; + std::vector<std::uint8_t> methodIDs; methodIDs.reserve( transports.size() ); - for( const auto transport : transports ) - { + for( const auto& transport : transports ) methodIDs.push_back( lf_GetMethodID( transport->m_Type ) ); - } return methodIDs; } diff --git a/src/format/BP1Writer.cpp b/src/format/BP1Writer.cpp index bd7ed78eb7bc4faa5b0dfc3479b7b051770cd192..a441e6392535913885223b1f615abf1dbfe1a7fa 100644 --- a/src/format/BP1Writer.cpp +++ b/src/format/BP1Writer.cpp @@ -7,8 +7,6 @@ /// \cond EXCLUDE_FROM_DOXYGEN #include <string> -#include <iostream> -#include <unistd.h> //sleep must be removed /// \endcond #include "format/BP1Writer.h" @@ -24,80 +22,80 @@ namespace format std::size_t BP1Writer::GetProcessGroupIndexSize( const std::string name, const std::string timeStepName, - const size_t numberOfTransports ) const noexcept + const std::size_t numberOfTransports ) const noexcept { //pgIndex + list of methods (transports) return ( name.length() + timeStepName.length() + 23 ) + ( 3 + numberOfTransports ); //should be sufficient for data and metadata pgindices } -void BP1Writer::WriteProcessGroupIndex( const bool isFortran, const std::string name, const unsigned int processID, - const std::string timeStepName, const unsigned int timeStep, +void BP1Writer::WriteProcessGroupIndex( const bool isFortran, const std::string name, const std::uint32_t processID, const std::vector< std::shared_ptr<Transport> >& transports, - capsule::STLVector& buffer, BP1MetadataSet& metadataSet ) const noexcept + capsule::STLVector& heap, BP1MetadataSet& metadataSet ) const noexcept { - // adapt this part to local variables - const std::vector<int> methodIDs = GetMethodIDs( transports ); + std::vector<char>& metadataBuffer = metadataSet.PGIndex.Buffer; + std::vector<char>& dataBuffer = heap.m_Data; - metadataSet.DataPGLengthPosition = buffer.m_DataPosition; - const std::size_t metadataPGLengthPosition = metadataSet.PGIndexPosition; + metadataSet.DataPGLengthPosition = dataBuffer.size(); + dataBuffer.insert( dataBuffer.end(), 8, 0 ); //skip pg length (8) - metadataSet.PGIndexPosition += 2; //skip length of pg in metadata, 2 bytes, would write at the end - buffer.m_DataPosition += 8; //skip length of pg in data, 8 bytes, would write at the end + const std::size_t metadataPGLengthPosition = metadataBuffer.size(); + metadataBuffer.insert( metadataBuffer.end(), 2, 0 ); //skip pg length (2) //write name to metadata - const std::uint16_t lengthOfName = name.length(); - WriteNameRecord( name, lengthOfName, metadataSet.PGIndex, metadataSet.PGIndexPosition ); - + WriteNameRecord( name, metadataBuffer ); //write if host language Fortran in metadata and data const char hostFortran = ( isFortran ) ? 'y' : 'n'; //if host language is fortran - MemcpyToBuffer( metadataSet.PGIndex, metadataSet.PGIndexPosition, &hostFortran, 1 ); - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &hostFortran, 1 ); - - //name in data - WriteNameRecord( name, lengthOfName, buffer.m_Data, buffer.m_DataPosition ); + CopyToBuffer( metadataBuffer, &hostFortran ); + CopyToBuffer( dataBuffer, &hostFortran ); + //write name in data + WriteNameRecord( name, dataBuffer ); - //processID, - MemcpyToBuffer( metadataSet.PGIndex, metadataSet.PGIndexPosition, &processID, 4 ); + //processID in metadata, + CopyToBuffer( metadataBuffer, &processID ); //skip coordination var in data ....what is coordination var? - buffer.m_DataPosition += 4; + dataBuffer.insert( dataBuffer.end(), 4, 0 ); //time step name to metadata and data - const std::uint16_t lengthOfTimeStep = timeStepName.length(); - WriteNameRecord( timeStepName, lengthOfTimeStep, metadataSet.PGIndex, metadataSet.PGIndexPosition ); - WriteNameRecord( timeStepName, lengthOfTimeStep, buffer.m_Data, buffer.m_DataPosition ); + const std::string timeStepName( std::to_string( metadataSet.TimeStep ) ); + WriteNameRecord( timeStepName, metadataBuffer ); + WriteNameRecord( timeStepName, dataBuffer ); //time step to metadata and data - MemcpyToBuffer( metadataSet.PGIndex, metadataSet.PGIndexPosition, &timeStep, 4 ); - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &timeStep, 4 ); + CopyToBuffer( metadataBuffer, &metadataSet.TimeStep ); + CopyToBuffer( dataBuffer, &metadataSet.TimeStep ); //offset to pg in data in metadata which is the current absolute position - MemcpyToBuffer( metadataSet.PGIndex, metadataSet.PGIndexPosition, &buffer.m_DataAbsolutePosition, 8 ); + CopyToBuffer( metadataBuffer, reinterpret_cast<std::uint64_t*>( &heap.m_DataAbsolutePosition ) ); //Back to writing metadata pg index length (length of group) - const std::uint16_t metadataPGIndexLength = metadataSet.PGIndexPosition - metadataPGLengthPosition - 2; //without length of group record - std::memcpy( &metadataSet.PGIndex[metadataPGLengthPosition], &metadataPGIndexLength, 2 ); + const std::uint16_t metadataPGIndexLength = metadataBuffer.size() - metadataPGLengthPosition - 2; //without length of group record + CopyToBuffer( metadataBuffer, metadataPGLengthPosition, &metadataPGIndexLength ); + //DONE With metadataBuffer //here write method in data - const std::uint8_t methodsSize = methodIDs.size(); - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &methodsSize, 1 ); //method count - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &methodsSize, 2 ); //method length, assume one byte for methodID for now + const std::vector<std::uint8_t> methodIDs = GetMethodIDs( transports ); + const std::uint8_t methodsCount = methodIDs.size(); + CopyToBuffer( dataBuffer, &methodsCount ); //count + const std::uint16_t methodsLength = methodIDs.size() * 3; //methodID (1) + method params length(2), no parameters for now + CopyToBuffer( dataBuffer, &methodsLength );//length - for( auto& methodID : methodIDs ) + for( const auto methodID : methodIDs ) { - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &methodID, 1 ); //method ID, unknown for now - buffer.m_DataPosition += 2; //skip method params length = 0 (2 bytes) for now + CopyToBuffer( dataBuffer, &methodID ); //method ID, + dataBuffer.insert( dataBuffer.end(), 2, 0 ); //skip method params length = 0 (2 bytes) for now } - buffer.m_DataAbsolutePosition += buffer.m_DataPosition - metadataSet.DataPGLengthPosition; //update absolute position - - metadataSet.DataVarsCountPosition = buffer.m_DataPosition; //update vars count and vars count position - - buffer.m_DataPosition += 12; //add vars count and length - buffer.m_DataAbsolutePosition += 12; //add vars count and length - - ++metadataSet.PGCount; + //update absolute position + heap.m_DataAbsolutePosition += dataBuffer.size() - metadataSet.DataPGLengthPosition; + //pg vars count and position metadataSet.DataPGVarsCount = 0; + metadataSet.DataPGVarsCountPosition = dataBuffer.size(); + //add vars count and length + dataBuffer.insert( dataBuffer.end(), 12, 0 ); + heap.m_DataAbsolutePosition += 12; //add vars count and length + + ++metadataSet.DataPGCount; metadataSet.DataPGIsOpen = true; } @@ -110,7 +108,7 @@ void BP1Writer::Advance( BP1MetadataSet& metadataSet, capsule::STLVector& buffer -void BP1Writer::Close( BP1MetadataSet& metadataSet, capsule::STLVector& buffer, Transport& transport, bool& isFirstClose, +void BP1Writer::Close( BP1MetadataSet& metadataSet, capsule::STLVector& heap, Transport& transport, bool& isFirstClose, const bool doAggregation ) const noexcept { if( metadataSet.Log.m_IsActive == true ) @@ -119,9 +117,9 @@ void BP1Writer::Close( BP1MetadataSet& metadataSet, capsule::STLVector& buffer, if( isFirstClose == true ) { if( metadataSet.DataPGIsOpen == true ) - FlattenData( metadataSet, buffer ); + FlattenData( metadataSet, heap ); - FlattenMetadata( metadataSet, buffer ); + FlattenMetadata( metadataSet, heap ); if( metadataSet.Log.m_IsActive == true ) metadataSet.Log.m_Timers[0].SetInitialTime(); @@ -139,7 +137,7 @@ void BP1Writer::Close( BP1MetadataSet& metadataSet, capsule::STLVector& buffer, } else // N-to-N { - transport.Write( buffer.m_Data.data(), buffer.m_DataPosition ); //single write + transport.Write( heap.m_Data.data(), heap.m_Data.size() ); //single write transport.Close(); } } @@ -150,7 +148,7 @@ std::string BP1Writer::GetRankProfilingLog( const int rank, const BP1MetadataSet { auto lf_WriterTimer = []( std::string& rankLog, const Timer& timer ) { - rankLog += timer.Process + "_" + timer.GetUnits() + "': " + std::to_string( timer.ProcessTime ) + ", "; + rankLog += "'" + timer.Process + "_" + timer.GetUnits() + "': " + std::to_string( timer.ProcessTime ) + ", "; }; //prepare string dictionary per rank @@ -165,160 +163,212 @@ std::string BP1Writer::GetRankProfilingLog( const int rank, const BP1MetadataSet auto& timers = transports[t]->m_Profiler.m_Timers; rankLog += "'transport_" + std::to_string(t) + "': { "; - rankLog += "'lib:' " + transports[t]->m_Type + ", "; + rankLog += "'lib': " + transports[t]->m_Type + ", "; for( unsigned int i = 0; i < 3; ++i ) lf_WriterTimer( rankLog, timers[i] ); - rankLog += " }, "; + rankLog += "}, "; } rankLog += "}, "; - //std::cout << rankLog << "\n"; return rankLog; } //PRIVATE FUNCTIONS -void BP1Writer::WriteNameRecord( const std::string name, const std::uint16_t length, - std::vector<char>& buffer, std::size_t& position ) const noexcept +void BP1Writer::WriteDimensionsRecord( std::vector<char>& buffer, + const std::vector<std::size_t>& localDimensions, + const std::vector<std::size_t>& globalDimensions, + const std::vector<std::size_t>& globalOffsets, + const unsigned int skip, + const bool addType ) const noexcept { - MemcpyToBuffer( buffer, position, &length, 2 ); - MemcpyToBuffer( buffer, position, name.c_str( ), length ); -} - + auto lf_WriteFlaggedDim = []( std::vector<char>& buffer, const char no, + const std::size_t dimension ) + { + CopyToBuffer( buffer, &no ); + CopyToBuffer( buffer, reinterpret_cast<const std::uint64_t*>( &dimension ) ); + }; -void BP1Writer::WriteDimensionRecord( std::vector<char>& buffer, std::size_t& position, - const std::vector<std::size_t>& localDimensions, - const std::vector<std::size_t>& globalDimensions, - const std::vector<std::size_t>& globalOffsets, - const bool addType ) const noexcept -{ - if( addType == true ) + //BODY Starts here + if( globalDimensions.empty() ) { - constexpr char no = 'n'; //dimension format unsigned int value for now - for( unsigned int d = 0; d < localDimensions.size(); ++d ) + if( addType == true ) { - MemcpyToBuffer( buffer, position, &no, 1 ); - MemcpyToBuffer( buffer, position, &localDimensions[d], 8 ); - MemcpyToBuffer( buffer, position, &no, 1 ); - MemcpyToBuffer( buffer, position, &globalDimensions[d], 8 ); - MemcpyToBuffer( buffer, position, &no, 1 ); - MemcpyToBuffer( buffer, position, &globalOffsets[d], 8 ); + constexpr char no = 'n'; //dimension format unsigned int value (not using memberID for now) + for( const auto& localDimension : localDimensions ) + { + lf_WriteFlaggedDim( buffer, no, localDimension ); + buffer.insert( buffer.end(), skip, 0 ); + } + } + else + { + for( const auto& localDimension : localDimensions ) + { + CopyToBuffer( buffer, reinterpret_cast<const std::uint64_t*>( &localDimension ) ); + buffer.insert( buffer.end(), skip, 0 ); + } } } else { - for( unsigned int d = 0; d < localDimensions.size(); ++d ) - { - MemcpyToBuffer( buffer, position, &localDimensions[d], 8 ); - MemcpyToBuffer( buffer, position, &globalDimensions[d], 8 ); - MemcpyToBuffer( buffer, position, &globalOffsets[d], 8 ); - } + if( addType == true ) + { + constexpr char no = 'n'; //dimension format unsigned int value for now + for( unsigned int d = 0; d < localDimensions.size(); ++d ) + { + lf_WriteFlaggedDim( buffer, no, localDimensions[d] ); + lf_WriteFlaggedDim( buffer, no, globalDimensions[d] ); + lf_WriteFlaggedDim( buffer, no, globalOffsets[d] ); + } + } + else + { + for( unsigned int d = 0; d < localDimensions.size(); ++d ) + { + CopyToBuffer( buffer, reinterpret_cast<const std::uint64_t*>( &localDimensions[d] ) ); + CopyToBuffer( buffer, reinterpret_cast<const std::uint64_t*>( &globalDimensions[d] ) ); + CopyToBuffer( buffer, reinterpret_cast<const std::uint64_t*>( &globalOffsets[d] ) ); + } + } } } -void BP1Writer::WriteDimensionRecord( std::vector<char>& buffer, std::size_t& position, - const std::vector<std::size_t>& localDimensions, - const unsigned int skip, - const bool addType ) const noexcept + +void BP1Writer::WriteNameRecord( const std::string name, std::vector<char>& buffer ) const noexcept { - if( addType == true ) - { - constexpr char no = 'n'; //dimension format unsigned int value (not using memberID for now) - for( const auto& localDimension : localDimensions ) - { - MemcpyToBuffer( buffer, position, &no, 1 ); - MemcpyToBuffer( buffer, position, &localDimension, 8 ); - position += skip; - } - } - else + const std::uint16_t length = name.length( ); + CopyToBuffer( buffer, &length ); + CopyToBuffer( buffer, name.c_str(), length ); +} + + + +BP1Index& BP1Writer::GetBP1Index( const std::string name, std::unordered_map<std::string, BP1Index>& indices, + bool& isNew ) const noexcept +{ + auto itName = indices.find( name ); + if( itName == indices.end() ) { - for( const auto& localDimension : localDimensions ) - { - MemcpyToBuffer( buffer, position, &localDimension, 8 ); - position += skip; - } + indices.emplace( name, BP1Index( indices.size() ) ); + isNew = true; + return indices.at( name ); } + + isNew = false; + return itName->second; } -void BP1Writer::FlattenData( BP1MetadataSet& metadataSet, capsule::STLVector& buffer ) const noexcept +void BP1Writer::FlattenData( BP1MetadataSet& metadataSet, capsule::STLVector& heap ) const noexcept { - //vars count and Length - std::memcpy( &buffer.m_Data[metadataSet.DataVarsCountPosition], &metadataSet.DataPGVarsCount, 4 ); //count - const std::uint64_t dataVarsLength = buffer.m_DataPosition - metadataSet.DataVarsCountPosition - 8 - 4; //without record itself and vars count - std::memcpy( &buffer.m_Data[metadataSet.DataVarsCountPosition+4], &dataVarsLength, 8 ); //length + auto& buffer = heap.m_Data; + //vars count and Length (only for PG) + CopyToBuffer( buffer, metadataSet.DataPGVarsCountPosition, &metadataSet.DataPGVarsCount ); + const std::uint64_t varsLength = buffer.size() - metadataSet.DataPGVarsCountPosition - 8 - 4; //without record itself and vars count + CopyToBuffer( buffer, metadataSet.DataPGVarsCountPosition + 4, &varsLength ); //attributes (empty for now) count (4) and length (8) are zero by moving positions in time step zero - buffer.m_DataPosition += 12; - buffer.m_DataAbsolutePosition += 12; + buffer.insert( buffer.end(), 12, 0 ); + heap.m_DataAbsolutePosition += 12; //Finish writing pg group length - const std::uint64_t dataPGLength = buffer.m_DataPosition - metadataSet.DataPGLengthPosition - 8; //without record itself, 12 due to empty attributes - std::memcpy( &buffer.m_Data[metadataSet.DataPGLengthPosition], &dataPGLength, 8 ); + const std::uint64_t dataPGLength = buffer.size() - metadataSet.DataPGLengthPosition - 8; //without record itself, 12 due to empty attributes + CopyToBuffer( buffer, metadataSet.DataPGLengthPosition, &dataPGLength ); ++metadataSet.TimeStep; metadataSet.DataPGIsOpen = false; } -void BP1Writer::FlattenMetadata( BP1MetadataSet& metadataSet, capsule::STLVector& buffer ) const noexcept +void BP1Writer::FlattenMetadata( BP1MetadataSet& metadataSet, capsule::STLVector& heap ) const noexcept { - //Finish writing metadata counts and lengths (IndexPosition) - //pg index - std::memcpy( &metadataSet.PGIndex[0], &metadataSet.PGCount, 8 ); //count - const std::uint64_t pgIndexLength = metadataSet.PGIndexPosition - 16; //without record itself - std::memcpy( &metadataSet.PGIndex[8], &pgIndexLength, 8 ); - //vars index - std::memcpy( &metadataSet.VarsIndex[0], &metadataSet.VarsCount, 4 ); //count - const std::uint64_t varsIndexLength = metadataSet.VarsIndexPosition - 12; //without record itself - std::memcpy( &metadataSet.VarsIndex[4], &varsIndexLength, 8 ); - //attributes index - std::memcpy( &metadataSet.AttributesIndex[0], &metadataSet.AttributesCount, 4 ); //count - const std::uint64_t attributesIndexLength = metadataSet.AttributesIndexPosition - 12; //without record itself - std::memcpy( &metadataSet.AttributesIndex[4], &attributesIndexLength, 8 ); - - const std::size_t metadataSize = metadataSet.PGIndexPosition + metadataSet.VarsIndexPosition + - metadataSet.AttributesIndexPosition + metadataSet.MiniFooterSize; - - buffer.m_Data.resize( buffer.m_DataPosition + metadataSize ); //resize data to fit metadata, must replace with growth buffer strategy - - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, metadataSet.PGIndex.data(), metadataSet.PGIndexPosition ); - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, metadataSet.VarsIndex.data(), metadataSet.VarsIndexPosition ); - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, metadataSet.AttributesIndex.data(), metadataSet.AttributesIndexPosition ); + auto lf_IndexCountLength = []( std::unordered_map<std::string, BP1Index>& indices, + std::uint32_t& count, std::uint64_t& length ) + { + count = indices.size(); + length = 0; + for( auto& indexPair : indices ) //set each index length + { + auto& indexBuffer = indexPair.second.Buffer; + const std::uint32_t indexLength = indexBuffer.size()-4; + CopyToBuffer( indexBuffer, 0, &indexLength ); - //getting absolute offsets, minifooter is 28 bytes for now - const std::uint64_t offsetPGIndex = buffer.m_DataAbsolutePosition; - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &offsetPGIndex, 8 ); + length += indexBuffer.size(); //overall length + } + }; + + auto lf_FlattenIndices = []( const std::uint32_t count, const std::uint64_t length, + const std::unordered_map<std::string, BP1Index>& indices, + std::vector<char>& buffer ) + { + CopyToBuffer( buffer, &count ); + CopyToBuffer( buffer, &length ); + + for( const auto& indexPair : indices ) //set each index length + { + const auto& indexBuffer = indexPair.second.Buffer; + CopyToBuffer( buffer, indexBuffer.data(), indexBuffer.size() ); + } + }; - const std::uint64_t offsetVarsIndex = offsetPGIndex + metadataSet.PGIndexPosition; - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &offsetVarsIndex, 8 ); - const std::uint64_t offsetAttributeIndex = offsetVarsIndex + metadataSet.VarsIndexPosition; - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &offsetAttributeIndex, 8 ); + //Finish writing metadata counts and lengths + //PG Index + const std::uint64_t pgCount = metadataSet.DataPGCount; + const std::uint64_t pgLength = metadataSet.PGIndex.Buffer.size(); + + //var index count and length (total), and each index length + std::uint32_t varsCount; + std::uint64_t varsLength; + lf_IndexCountLength( metadataSet.VarsIndices, varsCount, varsLength ); + //attribute index count and length, and each index length + std::uint32_t attributesCount; + std::uint64_t attributesLength; + lf_IndexCountLength( metadataSet.AttributesIndices, attributesCount, attributesLength ); + + const std::size_t footerSize = (pgLength+16) + (varsLength+12) + (attributesLength+12) + metadataSet.MiniFooterSize; + auto& buffer = heap.m_Data; + buffer.reserve( buffer.size() + footerSize ); //reserve data to fit metadata, must replace with growth buffer strategy + + //write pg index + CopyToBuffer( buffer, &pgCount ); + CopyToBuffer( buffer, &pgLength ); + CopyToBuffer( buffer, metadataSet.PGIndex.Buffer.data(), pgLength ); + //Vars indices + lf_FlattenIndices( varsCount, varsLength, metadataSet.VarsIndices, buffer ); + //Attribute indices + lf_FlattenIndices( attributesCount, attributesLength, metadataSet.AttributesIndices, buffer ); + + //getting absolute offsets, minifooter is 28 bytes for now + const std::uint64_t offsetPGIndex = heap.m_DataAbsolutePosition; + const std::uint64_t offsetVarsIndex = offsetPGIndex + (pgLength+16); + const std::uint64_t offsetAttributeIndex = offsetVarsIndex + (varsLength+12); + + CopyToBuffer( buffer, &offsetPGIndex ); + CopyToBuffer( buffer, &offsetVarsIndex ); + CopyToBuffer( buffer, &offsetAttributeIndex ); //version if( IsLittleEndian( ) ) { const std::uint8_t endian = 0; - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &endian, 1 ); - buffer.m_DataPosition += 2; - MemcpyToBuffer( buffer.m_Data, buffer.m_DataPosition, &m_Version, 1 ); + CopyToBuffer( buffer, &endian ); + buffer.insert( buffer.end(), 2, 0 ); + CopyToBuffer( buffer, &m_Version ); } else { } - buffer.m_DataAbsolutePosition += metadataSize; + heap.m_DataAbsolutePosition += footerSize; if( metadataSet.Log.m_IsActive == true ) - { - metadataSet.Log.m_TotalBytes.push_back( buffer.m_DataAbsolutePosition ); - } + metadataSet.Log.m_TotalBytes.push_back( heap.m_DataAbsolutePosition ); } diff --git a/src/functions/adiosFunctions.cpp b/src/functions/adiosFunctions.cpp index 48fd527502fbb3cd041587d9c3434e1602bbd668..11e92b07b87f945524194f45a1c7a0190a66c76b 100644 --- a/src/functions/adiosFunctions.cpp +++ b/src/functions/adiosFunctions.cpp @@ -537,35 +537,32 @@ std::vector<int> CSVToVectorInt( const std::string csv ) } -bool CheckBuffersAllocation( const std::size_t indexSize, const std::size_t payloadSize, - const float growthFactor, const std::size_t maxBufferSize, - const std::size_t indexPosition, std::vector<char>& indexBuffer, - const std::size_t dataPosition, std::vector<char>& dataBuffer ) +bool CheckBufferAllocation( const std::size_t newSize, const float growthFactor, const std::size_t maxBufferSize, + std::vector<char>& buffer ) { //Check if data in buffer needs to be reallocated - const std::size_t requiredDataSize = dataPosition + payloadSize + indexSize + 100; //adding some bytes tolerance + const std::size_t requiredDataSize = buffer.size() + newSize + 100; //adding some bytes for tolerance // might need to write payload in batches bool doTransportsFlush = ( requiredDataSize > maxBufferSize )? true : false; - if( GrowBuffer( requiredDataSize, growthFactor, dataPosition, dataBuffer ) == -1 ) + if( GrowBuffer( requiredDataSize, growthFactor, buffer ) == -1 ) doTransportsFlush = true; - GrowBuffer( indexSize, growthFactor, indexPosition, indexBuffer ); return doTransportsFlush; } -int GrowBuffer( const std::size_t incomingDataSize, const float growthFactor, const std::size_t currentPosition, +int GrowBuffer( const std::size_t incomingDataSize, const float growthFactor, std::vector<char>& buffer ) { const std::size_t currentCapacity = buffer.capacity(); - const std::size_t availableSpace = currentCapacity - currentPosition; + const std::size_t availableSpace = currentCapacity - buffer.size(); const double gf = static_cast<double>( growthFactor ); if( incomingDataSize > availableSpace ) { - const std::size_t neededCapacity = incomingDataSize + currentPosition; + const std::size_t neededCapacity = incomingDataSize + buffer.size(); const double numerator = std::log( static_cast<double>( neededCapacity ) / static_cast<double>( currentCapacity ) ); const double denominator = std::log( gf ); @@ -574,7 +571,7 @@ int GrowBuffer( const std::size_t incomingDataSize, const float growthFactor, co try { - buffer.resize( newSize ); + buffer.reserve( newSize ); } catch( std::bad_alloc& e ) {