From b5373d9f22fe946e62f13103271adb21518a7945 Mon Sep 17 00:00:00 2001 From: wfg <wfg@pc0098504.ornl.gov> Date: Fri, 17 Feb 2017 17:32:48 -0500 Subject: [PATCH] Working on PGIndex Write and added ./buildDataman.sh --- buildTest.sh => buildDataman.sh | 12 +-- examples/hello/vis/Makefile | 35 -------- examples/hello/vis/helloVis.cpp | 134 ----------------------------- include/core/Transport.h | 4 +- include/core/Variable.h | 8 +- include/engine/writer/Writer.h | 17 ++-- include/format/BP1.h | 78 ++++++++++++++++- include/format/BP1Writer.h | 147 +++++++++++++++++++------------- src/core/Transport.cpp | 6 +- src/engine/writer/Writer.cpp | 48 +++++++++-- src/format/BP1.cpp | 29 ++++++- src/format/BP1Writer.cpp | 115 ++++++++++++++++++++----- src/transport/FStream.cpp | 66 -------------- src/transport/File.cpp | 2 +- 14 files changed, 355 insertions(+), 346 deletions(-) rename buildTest.sh => buildDataman.sh (77%) delete mode 100644 examples/hello/vis/Makefile delete mode 100644 examples/hello/vis/helloVis.cpp diff --git a/buildTest.sh b/buildDataman.sh similarity index 77% rename from buildTest.sh rename to buildDataman.sh index d4a20878f..bf57df38d 100755 --- a/buildTest.sh +++ b/buildDataman.sh @@ -1,7 +1,7 @@ #!/bin/bash -# buildTest.sh for vis engine build and run test +# buildTest.sh for Dataman example # Created on: Feb 9, 2017 # Author: wfg @@ -14,21 +14,21 @@ echo echo echo "#################################################################" -echo "Building vis example" +echo "Building Dataman example" echo "#################################################################" -make -C ./examples/hello/vis +make -C ./examples/hello/dataman echo echo echo "#################################################################" -echo "Running vis nompi example" +echo "Running helloDataman_nompi.exe example" echo "#################################################################" -./examples/hello/vis/helloVis_nompi +./examples/hello/dataman/helloDataMan_nompi.exe echo echo echo "#################################################################" echo "To run mpi version with 4 mpi processes: " -echo "mpirun -n 4 ./examples/hello/vis/helloVis_mpi" +echo "mpirun -n 4 ./examples/hello/dataman/helloDataman.exe" echo "END" echo "################################################################" diff --git a/examples/hello/vis/Makefile b/examples/hello/vis/Makefile deleted file mode 100644 index f7767abd9..000000000 --- a/examples/hello/vis/Makefile +++ /dev/null @@ -1,35 +0,0 @@ -# Makefile for testing purposes, will build helloWriter_OOP_mpi (make or make mpi) or helloWriter_OOP_nompi (make nompi) -# Created on: Oct 4, 2016 -# Author: wfg - -BASE_NAME=helloVis - -#COMPILERS -CC=g++ -MPICC=mpic++ - -#ADIOS LOCATION -ADIOS_DIR=../../.. -ADIOS_INCLUDE=-I$(ADIOS_DIR)/include -ADIOS_LIB=$(ADIOS_DIR)/lib/libadios.a -ADIOS_NOMPI_LIB=$(ADIOS_DIR)/lib/libadios_nompi.a - -#FLAGS -CFLAGS=-Wall -Wpedantic -Woverloaded-virtual -std=c++11 -O0 -g -LDFLAGS= - -all: mpi nompi - -mpi: $(BASE_NAME).cpp - @echo; - @echo Building mpi version $(BASE_NAME)_mpi; - $(MPICC) $(CFLAGS) $(ADIOS_INCLUDE) -DHAVE_MPI $(BASE_NAME).cpp -o $(BASE_NAME)_mpi $(ADIOS_LIB) $(LDFLAGS) -lpthread - -nompi: $(BASE_NAME).cpp - @echo; - @echo Building nompi version $(BASE_NAME)_nompi; - $(CC) $(CFLAGS) $(ADIOS_INCLUDE) $(BASE_NAME).cpp -o $(BASE_NAME)_nompi $(ADIOS_NOMPI_LIB) $(LDFLAGS) -lpthread - -clean: - rm *_mpi; rm *_nompi - \ No newline at end of file diff --git a/examples/hello/vis/helloVis.cpp b/examples/hello/vis/helloVis.cpp deleted file mode 100644 index bd5c67dad..000000000 --- a/examples/hello/vis/helloVis.cpp +++ /dev/null @@ -1,134 +0,0 @@ -/* - * helloADIOSNoXML_OOP.cpp - * - * Created on: Jan 9, 2017 - * Author: wfg - */ - -#include <vector> -#include <iostream> - -#ifdef HAVE_MPI - #include <mpi.h> -#else - #include "mpidummy.h" - using adios::MPI_Init; - using adios::MPI_Comm_rank; - using adios::MPI_Finalize; -#endif - - -#include "ADIOS_OOP.h" - - -int main( int argc, char* argv [] ) -{ - MPI_Init( &argc, &argv ); - int rank; - MPI_Comm_rank( MPI_COMM_WORLD, &rank ); - const bool adiosDebug = true; - adios::ADIOS adios( MPI_COMM_WORLD, adiosDebug ); - - //Application variables from fluid and solid - const unsigned int fluidNx = 3; - const unsigned int fluidNy = 3; - const unsigned int fluidNz = 3; - std::vector<double> fluidTemperature = { 1, 2, 3, 4, 5, 6, 7, 8, 9, - 1, 2, 3, 4, 5, 6, 7, 8, 9, - 1, 2, 3, 4, 5, 6, 7, 8, 9 }; - - const unsigned int solidNx = 5; - const unsigned int solidNy = 3; - const unsigned int solidNz = 3; - std::vector<double> solidTemperature = { 111, 112, 113, 121, 122, 123, 131, 132, 133, - 211, 212, 213, 221, 222, 223, 231, 232, 233, - 311, 312, 313, 321, 322, 323, 331, 332, 333, - 411, 412, 413, 421, 422, 423, 431, 432, 433, - 511, 512, 513, 521, 522, 523, 531, 532, 533 - }; - - try - { - //Define fluid group and variables - adios::Group& ioFluid = adios.DeclareGroup( "fluid" ); - adios::Var ioFluidNx = ioFluid.DefineVariable<unsigned int>( "fluidNx" ); - adios::Var ioFluidNy = ioFluid.DefineVariable<unsigned int>( "fluidNy" ); - adios::Var ioFluidNz = ioFluid.DefineVariable<unsigned int>( "fluidNz" ); - adios::Dims ioFluidDims = ioFluid.SetDimensions( { ioFluidNx, ioFluidNy, ioFluidNz } ); - adios::Var ioFluidTemperature = ioFluid.DefineVariable<double>( "fluidTemperature", ioFluidDims ); - - //add transform to variable in group...not executed (just testing API) - adios::Transform bzip2 = adios::transform::BZIP2( ); - ioFluid.AddTransform( ioFluidTemperature, bzip2 , 1 ); - - //adios::Transform merge = adios::transform::Merge( 1 ); //potential merge transform? 1 is a tag - //ioFluid.AddTransform( ioFluidTemperature, merge , 1 ); //potential merge transform? 1 is a tag - - //Define solid group and variables - adios::Group& ioSolid = adios.DeclareGroup( "solid" ); - adios::Var ioSolidNx = ioSolid.DefineVariable<unsigned int>( "solidNx" ); - adios::Var ioSolidNy = ioSolid.DefineVariable<unsigned int>( "solidNy" ); - adios::Var ioSolidNz = ioSolid.DefineVariable<unsigned int>( "solidNz" ); - adios::Dims ioSolidDims = ioSolid.SetDimensions( { ioSolidNx, ioSolidNy, ioSolidNz } ); - adios::Var ioSolidTemperature = ioSolid.DefineVariable<double>( "solidTemperature", ioSolidDims ); - - //adios::Transform merge = adios::transform::Merge( 1 ); //potential merge transform? 1 is a tag - //ioSolid.AddTransform( ioSolidTemperature, merge , 1 ); //potential merge transform? 1 is a tag - - - //Define method for engine creation - adios::Method& visSettings = adios.DeclareMethod( "SimpleTask", "Vis" ); - visSettings.AddTransport( "VisIt", "use_shared_memory=no", "hex_mesh=true", "vtk-m_cores=8" ); //as many as you want - visSettings.AddTransport( "POSIX", "have_metadata_file=yes" ); - - //Create engine, smart pointer due to polymorphism - //Open returns a smart pointer to Engine containing the Derived class Vis engine - auto visWriter = adios.Open( "visEngine.tmp", "w", visSettings ); - - visWriter->SetDefaultGroup( ioFluid ); //default group can change - visWriter->Write<unsigned int>( ioFluidNx, &fluidNx ); //group, variableName, *value - visWriter->Write<unsigned int>( ioFluidNy, &fluidNy ); - visWriter->Write<unsigned int>( ioFluidNz, &fluidNz ); - visWriter->Write<double>( ioFluidTemperature, fluidTemperature.data() ); - - visWriter->SetDefaultGroup( ioSolid ); //default group can change - visWriter->Write<unsigned int>( ioSolidNx, &solidNx ); //group, variableName, *value - visWriter->Write<unsigned int>( ioSolidNy, &solidNy ); - visWriter->Write<unsigned int>( ioSolidNz, &solidNz ); - visWriter->Write<double>( ioSolidTemperature, solidTemperature.data() ); - - visWriter->Close( ); - } - catch( std::invalid_argument& e ) - { - if( rank == 0 ) - { - std::cout << "Invalid argument exception, STOPPING PROGRAM\n"; - std::cout << e.what() << "\n"; - } - } - catch( std::ios_base::failure& e ) - { - if( rank == 0 ) - { - std::cout << "System exception, STOPPING PROGRAM\n"; - std::cout << e.what() << "\n"; - } - } - catch( std::exception& e ) - { - if( rank == 0 ) - { - std::cout << "Exception, STOPPING PROGRAM\n"; - std::cout << e.what() << "\n"; - } - } - - MPI_Finalize( ); - - return 0; - -} - - - diff --git a/include/core/Transport.h b/include/core/Transport.h index 781180699..cb4d3ea08 100644 --- a/include/core/Transport.h +++ b/include/core/Transport.h @@ -40,8 +40,8 @@ public: MPI_Comm m_MPIComm = 0; ///< only used as reference to MPI communicator passed from parallel constructor, MPI_Comm is a pointer itself. Public as called from C #endif - int m_MPIRank = 0; ///< current MPI rank process - int m_MPISize = 1; ///< current MPI processes size + int m_RankMPI = 0; ///< current MPI rank process + int m_SizeMPI = 1; ///< current MPI processes size /** * Base constructor that all derived classes pass diff --git a/include/core/Variable.h b/include/core/Variable.h index d413ea23f..24f54ef13 100644 --- a/include/core/Variable.h +++ b/include/core/Variable.h @@ -92,8 +92,12 @@ public: if( m_AppValues != nullptr ) { - logInfo << "Values (first 10): \n"; - for( unsigned int i = 0; i < 100; ++i ) + logInfo << "Values (first 10 or max_size): \n"; + std::size_t size = TotalSize(); + if( size > 10 ) + size = 10; + + for( std::size_t i = 0; i < size; ++i ) { logInfo << m_AppValues[i] << " "; } diff --git a/include/engine/writer/Writer.h b/include/engine/writer/Writer.h index b9b1d9133..8e9fc634b 100644 --- a/include/engine/writer/Writer.h +++ b/include/engine/writer/Writer.h @@ -1,5 +1,5 @@ /* - * SingleBP.h + * BPWriter.h * * Created on: Dec 16, 2016 * Author: wfg @@ -74,8 +74,14 @@ private: float m_GrowthFactor = 1.5; bool m_TransportFlush = false; ///< true: transport flush happened, buffer must be reset + void Init( ); void InitTransports( ); + void InitProcessGroup( ); + + + void WriteProcessGroupIndex( ); + /** * Common function @@ -91,10 +97,10 @@ private: m_WrittenVariables.insert( variable.m_Name ); //precalculate new metadata and payload sizes const std::size_t indexSize = m_BP1Writer.GetVariableIndexSize( variable ); - const std::size_t payloadSize = variable.PayLoadSize(); + const std::size_t payloadSize = variable.PayLoadSize(); //will change if compression is applied //Buffer reallocation, expensive part - m_TransportFlush = CheckBuffersAllocation( variable.m_Name, indexSize, payloadSize ); + m_TransportFlush = CheckBuffersAllocation( indexSize, payloadSize ); //WRITE INDEX to data buffer and metadata structure (in memory)// m_BP1Writer.WriteVariableIndex( variable, m_Buffer, m_MetadataSet ); @@ -110,7 +116,7 @@ private: } else //Write data to buffer { - //EXPENSIVE part might want to use threads if large. + //EXPENSIVE part, might want to use threads if large. MemcpyThreads( m_Buffer.m_Data.data(), variable.m_AppValues, payloadSize, m_Cores ); //update indices m_Buffer.m_DataPosition += payloadSize; @@ -120,12 +126,11 @@ private: /** * Check if heap buffers for data and metadata need reallocation or maximum sizes have been reached. - * @param variableName name of the variable to be written * @param indexSize precalculated index size * @param payloadSize payload size from variable total size * @return true: transport must be flush and buffers reset, false: buffer is sufficient */ - bool CheckBuffersAllocation( const std::string variableName, const std::size_t indexSize, const std::size_t payloadSize ); + bool CheckBuffersAllocation( const std::size_t indexSize, const std::size_t payloadSize ); }; diff --git a/include/format/BP1.h b/include/format/BP1.h index 02bcea177..b4139cd2f 100644 --- a/include/format/BP1.h +++ b/include/format/BP1.h @@ -8,6 +8,9 @@ #ifndef BP1_H_ #define BP1_H_ + +#include <memory> //std::shared_ptr + #ifdef HAVE_MPI #include <mpi.h> #else @@ -16,7 +19,6 @@ #include "core/Transport.h" - namespace adios { namespace format @@ -27,6 +29,9 @@ namespace format */ struct BP1MetadataSet { + std::string TimeStepName; ///< time step name associated with this PG + std::uint32_t TimeStep = 0; ///< current time step, updated with advance step, if append it will be updated to last + std::uint64_t PGCount = 0; ///< number of process groups std::uint64_t PGLength = 0; ///< length in bytes of process groups std::size_t PGIndexPosition = 16; @@ -36,7 +41,6 @@ struct BP1MetadataSet std::uint64_t VarsLength = 0; ///< length in bytes of written Variables std::size_t VarsIndexPosition = 12; ///< initial position in bytes std::vector<char> VarsIndex = std::vector<char>( 102400 ); ///< metadata variable index, start with 1Kb - // std::map< std::string, std::pair<std::size_t,std::size_t> > VariablePositions; std::uint32_t AttributesCount = 0; ///< number of Attributes std::uint64_t AttributesLength = 0; ///< length in bytes of Attributes @@ -63,6 +67,48 @@ public: protected: + + /** + * method type for file I/O + */ + enum IO_METHOD { + METHOD_UNKNOWN = -2//!< ADIOS_METHOD_UNKNOWN + ,METHOD_NULL = -1 //!< ADIOS_METHOD_NULL + ,METHOD_MPI = 0 //!< METHOD_MPI + ,METHOD_DATATAP = 1 //OBSOLETE + ,METHOD_POSIX = 2 //!< METHOD_POSIX + ,METHOD_DATASPACES = 3 //!< METHOD_DATASPACES + ,METHOD_VTK = 4 //non-existent + ,METHOD_POSIX_ASCII = 5 //non-existent + ,METHOD_MPI_CIO = 6 //OBSOLETE + ,METHOD_PHDF5 = 7 //!< METHOD_PHDF5 + ,METHOD_PROVENANCE = 8 //OBSOLETE + ,METHOD_MPI_STRIPE = 9 //OBSOLETE + ,METHOD_MPI_LUSTRE = 10 //!< METHOD_MPI_LUSTRE + ,METHOD_MPI_STAGGER = 11 //OBSOLETE + ,METHOD_MPI_AGG = 12 //OBSOLETE + ,METHOD_ADAPTIVE = 13 //OBSOLETE + ,METHOD_POSIX1 = 14 //OBSOLETE + ,METHOD_NC4 = 15 //!< METHOD_NC4 + ,METHOD_MPI_AMR = 16 //!< METHOD_MPI_AMR + ,METHOD_MPI_AMR1 = 17 //OBSOLETE + ,METHOD_FLEXPATH = 18 //!< METHOD_FLEXPATH + ,METHOD_NSSI_STAGING = 19 //!< METHOD_NSSI_STAGING + ,METHOD_NSSI_FILTER = 20 //!< METHOD_NSSI_FILTER + ,METHOD_DIMES = 21 //!< METHOD_DIMES + ,METHOD_VAR_MERGE = 22 //!< METHOD_VAR_MERGE + ,METHOD_MPI_BGQ = 23 //!< METHOD_MPI_BGQ + ,METHOD_ICEE = 24 //!< METHOD_ICEE + ,METHOD_COUNT = 25 //!< METHOD_COUNT + ,METHOD_FSTREAM = 26 + ,METHOD_FILE = 27 + ,METHOD_ZMQ = 28 + ,METHOD_MDTM = 29 + + + }; + + /** * DataTypes mapping in BP Format */ @@ -122,9 +168,37 @@ protected: }; + /** + * Returns data type index from enum Datatypes + * @param variable input variable + * @return data type + */ + template< class T > inline std::int8_t GetDataType( ) const noexcept + { + return type_unknown; + } + + + std::vector<int> GetMethodIDs( const std::vector< std::shared_ptr<Transport> >& transports ) const noexcept; + }; +//Moving template BP1Writer::GetDataType template specializations outside of the class +template< > inline std::int8_t BP1::GetDataType<char>( ) const noexcept { return type_byte; } +template< > inline std::int8_t BP1::GetDataType<short>( ) const noexcept{ return type_short; } +template< > inline std::int8_t BP1::GetDataType<int>( ) const noexcept{ return type_integer; } +template< > inline std::int8_t BP1::GetDataType<long int>( ) const noexcept{ return type_long; } + +template< > inline std::int8_t BP1::GetDataType<unsigned char>( ) const noexcept { return type_unsigned_byte; } +template< > inline std::int8_t BP1::GetDataType<unsigned short>( ) const noexcept{ return type_unsigned_short; } +template< > inline std::int8_t BP1::GetDataType<unsigned int>( ) const noexcept{ return type_unsigned_integer; } +template< > inline std::int8_t BP1::GetDataType<unsigned long int>( ) const noexcept{ return type_unsigned_long; } + +template< > inline std::int8_t BP1::GetDataType<float>( ) const noexcept{ return type_real; } +template< > inline std::int8_t BP1::GetDataType<double>( ) const noexcept{ return type_double; } +template< > inline std::int8_t BP1::GetDataType<long double>( ) const noexcept{ return type_long_double; } + } //end namespace format diff --git a/include/format/BP1Writer.h b/include/format/BP1Writer.h index db6f8e8e2..dd20dbc96 100644 --- a/include/format/BP1Writer.h +++ b/include/format/BP1Writer.h @@ -39,35 +39,52 @@ public: unsigned int m_Cores = 1; ///< number of cores for thread operations in large array (min,max) unsigned int m_Verbosity = 0; ///< statistics verbosity, can change if redefined in Engine method. float m_GrowthFactor = 1.5; ///< memory growth factor, can change if redefined in Engine method. - unsigned int m_VariablesTotalCount = 0; /** - * Calculates the Process Index size in bytes according to the BP format - * @param name process group name - * @param timeStepName name of the corresponding time step - * @return size of process group index in bytes + * Calculates the Process Index size in bytes according to the BP format, including list of method with no parameters (for now) + * @param name + * @param timeStepName + * @param numberOfTransports + * @return size of pg index */ - std::size_t GetProcessIndexSize( const std::string name, const std::string timeStepName ); + std::size_t GetProcessGroupIndexSize( const std::string name, const std::string timeStepName, + const std::size_t numberOfTransports ) const noexcept; /** - * Writes a PGIndex, done at Open or aggregation + * Writes a process group index PGIndex and list of methods (from transports), done at Open or aggregation of new time step + * Version that operates on a single heap buffer and metadataset. * @param isFortran * @param name * @param processID * @param timeStepName * @param timeStep - * @param dataBuffers - * @param dataPositions - * @param dataAbsolutePositions - * @param metadataBuffers - * @param metadataPositions + * @param transports + * @param buffer + * @param metadataSet */ void WriteProcessGroupIndex( const bool isFortran, const std::string name, const unsigned int processID, const std::string timeStepName, const unsigned int timeStep, - std::vector<char*>& dataBuffers, std::vector<std::size_t>& dataPositions, - std::vector<std::size_t>& dataAbsolutePositions, - std::vector<char*>& metadataBuffers, - std::vector<std::size_t>& metadataPositions ); + const std::vector< std::shared_ptr<Transport> >& transports, + Heap& buffer, + BP1MetadataSet& metadataSet ) const noexcept; + /** + * Writes a process group index PGIndex and list of methods (from transports), done at Open or aggregation of new time step + * Version that operates on many capsules and metadatasets + * @param isFortran + * @param name + * @param processID + * @param timeStepName + * @param timeStep + * @param transports + * @param capsules + * @param metadataSets + */ + void WriteProcessGroupIndex( const bool isFortran, const std::string name, const unsigned int processID, + const std::string timeStepName, const unsigned int timeStep, + const std::vector< std::shared_ptr<Transport> >& transports, + std::vector< std::shared_ptr<Capsule> >& capsules, + std::vector<BP1MetadataSet>& metadataSets ) const noexcept; + /** * Returns the estimated variable index size @@ -78,7 +95,7 @@ public: * @return variable index size */ template< class T > - size_t GetVariableIndexSize( const Variable<T>& variable ) noexcept + size_t GetVariableIndexSize( const Variable<T>& variable ) const noexcept { //size_t indexSize = varEntryLength + memberID + lengthGroupName + groupName + lengthVariableName + lengthOfPath + path + datatype size_t indexSize = 23; //without characteristics @@ -127,8 +144,7 @@ public: * @param metadataSet */ template< class T > - void WriteVariableIndex( const Variable<T>& variable, Heap& buffer, BP1MetadataSet& metadataSet ) noexcept - + void WriteVariableIndex( const Variable<T>& variable, Heap& buffer, BP1MetadataSet& metadataSet ) const noexcept { // adapt this part to local variables std::vector<char*> dataBuffers{ buffer.m_Data.data() }; @@ -139,8 +155,8 @@ public: std::vector<std::size_t> metadataPositions{ metadataSet.VarsIndexPosition }; std::vector<unsigned int> variablesCount{ metadataSet.VarsCount }; - WriteVariableCommon( variable, dataBuffers, dataPositions, dataAbsolutePositions, - metadataBuffers, metadataPositions, variablesCount ); + WriteVariableIndexCommon( variable, dataBuffers, dataPositions, dataAbsolutePositions, + metadataBuffers, metadataPositions, variablesCount ); //update positions and varsCount originally passed by value buffer.m_DataPosition = dataPositions[0]; @@ -161,7 +177,7 @@ public: template< class T > void WriteVariableIndex( const Variable<T>& variable, std::vector< std::shared_ptr<Capsule> >& capsules, - std::vector<BP1MetadataSet>& metadataSets ) noexcept + std::vector<BP1MetadataSet>& metadataSets ) const noexcept { // adapt this part to local variables std::vector<char*> metadataBuffers, dataBuffers; @@ -182,8 +198,8 @@ public: dataAbsolutePositions.push_back( capsule->m_DataAbsolutePosition ); } - WriteVariableCommon( variable, dataBuffers, dataPositions, dataAbsolutePositions, - metadataBuffers, metadataPositions, variablesCount ); + WriteVariableIndexCommon( variable, dataBuffers, dataPositions, dataAbsolutePositions, + metadataBuffers, metadataPositions, variablesCount ); //update positions and varsCount originally passed by value const unsigned int buffersSize = static_cast<unsigned int>( capsules.size() ); @@ -197,18 +213,49 @@ public: } } - void Close( const BP1MetadataSet& metadataSet, Capsule& capsule, Transport& transport ); + void Close( const BP1MetadataSet& metadataSet, Capsule& capsule, Transport& transport ) const; private: + /** + * Common function that Writes a process group index PGIndex, done at Open or aggregation of new time step. + * Called from public WriteProcessGroupIndex functions. + * @param isFortran true: using Fortran, false: other language + * @param name process group, usually the rank (maybe communicator?) + * @param processID processID, usually the rank + * @param timeStepName + * @param timeStep + * @param dataBuffers + * @param dataPositions + * @param dataAbsolutePositions + * @param metadataBuffers + * @param metadataPositions + */ + void WriteProcessGroupIndexCommon( const bool isFortran, const std::string name, const unsigned int processID, + const std::string timeStepName, const unsigned int timeStep, + const std::vector<int>& methodIDs, + std::vector<char*>& dataBuffers, std::vector<std::size_t>& dataPositions, + std::vector<std::size_t>& dataAbsolutePositions, + std::vector<char*>& metadataBuffers, + std::vector<std::size_t>& metadataPositions ) const noexcept; + /** + * + * @param variable + * @param dataBuffers + * @param dataPositions + * @param dataAbsolutePositions + * @param metadataBuffers + * @param metadataPositions + * @param variablesCount + */ template<class T> - void WriteVariableCommon( const Variable<T>& variable, - std::vector<char*>& dataBuffers, std::vector<size_t>& dataPositions, - std::vector<size_t>& dataAbsolutePositions, - std::vector<char*>& metadataBuffers, std::vector<size_t>& metadataPositions, - std::vector<unsigned int> variablesCount ) + void WriteVariableIndexCommon( const Variable<T>& variable, + std::vector<char*>& dataBuffers, std::vector<size_t>& dataPositions, + std::vector<size_t>& dataAbsolutePositions, + std::vector<char*>& metadataBuffers, std::vector<size_t>& metadataPositions, + std::vector<unsigned int> variablesCount ) const noexcept { //capture initial positions const std::vector<std::size_t> metadataLengthPositions( metadataPositions ); @@ -376,8 +423,6 @@ private: MemcpyToBuffers( metadataBuffers, metadataCharacteristicsCountPositions, &characteristicsCounter, 1 ); MemcpyToBuffers( metadataBuffers, metadataCharacteristicsCountPositions, metadataCharacteristicsLengths, 4 ); //vector to vector MovePositions( -5, metadataCharacteristicsCountPositions ); //back to original position - - ++m_VariablesTotalCount; } //end of function @@ -389,7 +434,7 @@ private: * @param positions to be moved */ void WriteNameRecord( const std::string name, const std::uint16_t length, - std::vector<char*>& buffers, std::vector<std::size_t>& positions ); + std::vector<char*>& buffers, std::vector<std::size_t>& positions ) const noexcept; /** * Write a dimension record for a global variable used by WriteVariableCommon @@ -404,7 +449,7 @@ private: const std::vector<std::size_t>& localDimensions, const std::vector<std::size_t>& globalDimensions, const std::vector<std::size_t>& globalOffsets, - const bool addType = false ); + const bool addType = false ) const noexcept; /** * Write a dimension record for a local variable used by WriteVariableCommon @@ -417,7 +462,7 @@ private: void WriteDimensionRecord( std::vector<char*>& buffers, std::vector<std::size_t>& positions, const std::vector<std::size_t>& localDimensions, const unsigned int skip, - const bool addType = false ); + const bool addType = false ) const noexcept; /** * @@ -430,7 +475,7 @@ private: template<class T> void WriteStatisticsRecord( const std::uint8_t& id, const T& value, std::vector<char*>& buffers, std::vector<std::size_t>& positions, - const bool addLength = false ) + const bool addLength = false ) const noexcept { const std::uint8_t characteristicID = characteristic_stat; MemcpyToBuffers( buffers, positions, &characteristicID, 1 ); @@ -445,36 +490,20 @@ private: MemcpyToBuffers( buffers, positions, &value, sizeof(T) ); } + /** - * Returns data type index from enum Datatypes - * @param variable input variable - * @return data type + * + * @param capsule + * @param transport */ - template< class T > inline std::int8_t GetDataType( ) noexcept { return type_unknown; } - - void CloseRankFile( Capsule& capsule, Transport& transport ); + void CloseRankFile( Capsule& capsule, Transport& transport ) const; - void SetMetadata( const BP1MetadataSet& metadataSet, Capsule& capsule ); ///< sets the metadata buffer in capsule with indices and minifooter - void SetMiniFooter( BP1MetadataSet& metadataSet ); ///< sets the minifooter + void SetMetadata( const BP1MetadataSet& metadataSet, Capsule& capsule ) const; ///< sets the metadata buffer in capsule with indices and minifooter + void SetMiniFooter( BP1MetadataSet& metadataSet ) const; ///< sets the minifooter }; -//Moving template BP1Writer::GetDataType template specializations outside of the class -template< > inline std::int8_t BP1Writer::GetDataType<char>( ) noexcept { return type_byte; } -template< > inline std::int8_t BP1Writer::GetDataType<short>( ) noexcept{ return type_short; } -template< > inline std::int8_t BP1Writer::GetDataType<int>( ) noexcept{ return type_integer; } -template< > inline std::int8_t BP1Writer::GetDataType<long int>( ) noexcept{ return type_long; } - -template< > inline std::int8_t BP1Writer::GetDataType<unsigned char>( ) noexcept { return type_unsigned_byte; } -template< > inline std::int8_t BP1Writer::GetDataType<unsigned short>( ) noexcept{ return type_unsigned_short; } -template< > inline std::int8_t BP1Writer::GetDataType<unsigned int>( ) noexcept{ return type_unsigned_integer; } -template< > inline std::int8_t BP1Writer::GetDataType<unsigned long int>( ) noexcept{ return type_unsigned_long; } - -template< > inline std::int8_t BP1Writer::GetDataType<float>( ) noexcept{ return type_real; } -template< > inline std::int8_t BP1Writer::GetDataType<double>( ) noexcept{ return type_double; } -template< > inline std::int8_t BP1Writer::GetDataType<long double>( ) noexcept{ return type_long_double; } - } //end namespace format diff --git a/src/core/Transport.cpp b/src/core/Transport.cpp index a40be2576..1b5cec286 100644 --- a/src/core/Transport.cpp +++ b/src/core/Transport.cpp @@ -11,14 +11,13 @@ namespace adios { - Transport::Transport( const std::string type, MPI_Comm mpiComm, const bool debugMode ): m_Type{ type }, m_MPIComm{ mpiComm }, m_DebugMode{ debugMode } { - MPI_Comm_rank( m_MPIComm, &m_MPIRank ); - MPI_Comm_size( m_MPIComm, &m_MPISize ); + MPI_Comm_rank( m_MPIComm, &m_RankMPI ); + MPI_Comm_size( m_MPIComm, &m_SizeMPI ); } @@ -38,6 +37,5 @@ void Transport::Close( ) { } - } //end namespace diff --git a/src/engine/writer/Writer.cpp b/src/engine/writer/Writer.cpp index 01a739c64..7ddc5a562 100644 --- a/src/engine/writer/Writer.cpp +++ b/src/engine/writer/Writer.cpp @@ -54,6 +54,8 @@ void Writer::Init( ) m_BP1Writer.m_Verbosity = std::stoi( itVerbosity->second ); InitTransports( ); + InitProcessGroup( ); + } @@ -166,9 +168,6 @@ void Writer::Close( const int transportIndex ) } - - - void Writer::InitTransports( ) { if( m_DebugMode == true ) @@ -216,9 +215,48 @@ void Writer::InitTransports( ) } +void Writer::InitProcessGroup( ) +{ + if( m_AccessMode == "a" ) + { + //Get last pg timestep and update timestep counter in format::BP1MetadataSet + } + WriteProcessGroupIndex( ); +} + + + +void Writer::WriteProcessGroupIndex( ) +{ + //pg = process group + const std::string pgName( std::to_string( m_RankMPI ) ); //using rank as name + const unsigned int timeStep = m_MetadataSet.TimeStep; + const std::string timeStepName( std::to_string( timeStep ) ); + const std::size_t pgIndexSize = m_BP1Writer.GetProcessGroupIndexSize( pgName, timeStepName, m_Transports.size() ); + + //metadata + GrowBuffer( pgIndexSize, m_GrowthFactor, m_MetadataSet.PGIndexPosition, m_MetadataSet.PGIndex ); + + //data? Need to be careful, maybe add some trailing tolerance in variable ???? + GrowBuffer( pgIndexSize, m_GrowthFactor, m_Buffer.m_DataPosition, m_Buffer.m_Data ); + +// const bool isFortran = ( m_HostLanguage == "Fortran" ) ? true : false; +// const unsigned int processID = static_cast<unsigned int> ( m_RankMPI ); + +// m_BP1Writer.WriteProcessGroupIndex( isFortran, name, processID, timeStepName, timeStep, m_Transports, +// m_Buffer.m_Data, m_Buffer.m_DataPosition, m_Buffer.m_DataAbsolutePosition, +// m_MetadataSet.PGIndex, m_MetadataSet.PGIndexPosition ); + +// const bool isFortran, const std::string name, const unsigned int processID, +// const std::string timeStepName, const unsigned int timeStep, +// std::vector<char*>& dataBuffers, std::vector<std::size_t>& dataPositions, +// std::vector<std::size_t>& dataAbsolutePositions, +// std::vector<char*>& metadataBuffers, +// std::vector<std::size_t>& metadataPositions + +} -bool Writer::CheckBuffersAllocation( const std::string variableName, const std::size_t indexSize, - const std::size_t payloadSize ) +bool Writer::CheckBuffersAllocation( const std::size_t indexSize, const std::size_t payloadSize ) { //Check if data in buffer needs to be reallocated const std::size_t dataSize = payloadSize + indexSize + 10; //adding some bytes tolerance diff --git a/src/format/BP1.cpp b/src/format/BP1.cpp index 1df1cbad6..2a29f11de 100644 --- a/src/format/BP1.cpp +++ b/src/format/BP1.cpp @@ -33,7 +33,7 @@ void BP1::OpenRankFiles( const std::string name, const std::string accessMode, T } CreateDirectory( directory ); - std::string fileName( directory + "/" + baseName + ".bp." + std::to_string( file.m_MPIRank ) ); + std::string fileName( directory + "/" + baseName + ".bp." + std::to_string( file.m_RankMPI ) ); if( file.m_MPIComm == MPI_COMM_SELF ) fileName = name; @@ -43,5 +43,32 @@ void BP1::OpenRankFiles( const std::string name, const std::string accessMode, T + +std::vector<int> BP1::GetMethodIDs( const std::vector< std::shared_ptr<Transport> >& transports ) const noexcept +{ + auto lf_GetMethodID = []( const std::string method ) -> int + { + int id = METHOD_UNKNOWN; + if( method == "NULL" ) id = METHOD_NULL; + else if( method == "POSIX" ) id = METHOD_POSIX; + else if( method == "FStream" ) id = METHOD_FSTREAM; + else if( method == "File" ) id = METHOD_FILE; + else if( method == "MPI" ) id = METHOD_MPI; + + return id; + }; + + std::vector<int> methodIDs; + methodIDs.reserve( transports.size() ); + + for( const auto transport : transports ) + { + methodIDs.push_back( lf_GetMethodID( transport->m_Type ) ); + } + + return methodIDs; +} + + } //end namespace format } //end namespace adios diff --git a/src/format/BP1Writer.cpp b/src/format/BP1Writer.cpp index 0f8697b95..d5bbaf46b 100644 --- a/src/format/BP1Writer.cpp +++ b/src/format/BP1Writer.cpp @@ -16,18 +16,98 @@ namespace format { -std::size_t BP1Writer::GetProcessIndexSize( const std::string name, const std::string timeStepName ) +std::size_t BP1Writer::GetProcessGroupIndexSize( const std::string name, const std::string timeStepName, + const size_t numberOfTransports ) const noexcept { - return name.length() + timeStepName.length() + 23; + //pgIndex + list of methods (transports) + return ( name.length() + timeStepName.length() + 23 ) + ( 3 + numberOfTransports ); //should be sufficient for data and metadata pgindices } +void BP1Writer::WriteProcessGroupIndex( const bool isFortran, const std::string name, const unsigned int processID, + const std::string timeStepName, const unsigned int timeStep, + const std::vector< std::shared_ptr<Transport> >& transports, + Heap& buffer, BP1MetadataSet& metadataSet ) const noexcept +{ + // adapt this part to local variables + std::vector<char*> dataBuffers{ buffer.m_Data.data() }; + std::vector<size_t> dataPositions{ buffer.m_DataPosition }; + std::vector<size_t> dataAbsolutePositions{ buffer.m_DataAbsolutePosition }; + + std::vector<char*> metadataBuffers{ metadataSet.PGIndex.data() }; + std::vector<std::size_t> metadataPositions{ metadataSet.PGIndexPosition }; + + const std::vector<int> methodIDs = GetMethodIDs( transports ); + + WriteProcessGroupIndexCommon( isFortran, name, processID, timeStepName, timeStep, methodIDs, + dataBuffers, dataPositions, dataAbsolutePositions, + metadataBuffers, metadataPositions ); + + buffer.m_DataPosition = dataPositions[0]; + buffer.m_DataAbsolutePosition = dataAbsolutePositions[0]; + metadataSet.PGIndexPosition = metadataPositions[0]; + +} void BP1Writer::WriteProcessGroupIndex( const bool isFortran, const std::string name, const unsigned int processID, const std::string timeStepName, const unsigned int timeStep, - std::vector<char*>& dataBuffers, std::vector<std::size_t>& dataPositions, - std::vector<std::size_t>& dataAbsolutePositions, - std::vector<char*>& metadataBuffers, - std::vector<std::size_t>& metadataPositions ) + const std::vector< std::shared_ptr<Transport> >& transports, + std::vector< std::shared_ptr<Capsule> >& capsules, + std::vector<BP1MetadataSet>& metadataSets ) const noexcept +{ + + // adapt this part to local variables + std::vector<char*> metadataBuffers, dataBuffers; + std::vector<std::size_t> metadataPositions, dataPositions, dataAbsolutePositions; + + for( auto& metadataSet : metadataSets ) + { + metadataBuffers.push_back( metadataSet.PGIndex.data() ); + metadataPositions.push_back( metadataSet.PGIndexPosition ); + } + + for( auto& capsule : capsules ) + { + dataBuffers.push_back( capsule->GetData( ) ); + dataPositions.push_back( capsule->m_DataPosition ); + dataAbsolutePositions.push_back( capsule->m_DataAbsolutePosition ); + } + + const std::vector<int> methodIDs = GetMethodIDs( transports ); + + WriteProcessGroupIndexCommon( isFortran, name, processID, timeStepName, timeStep, methodIDs, + dataBuffers, dataPositions, dataAbsolutePositions, + metadataBuffers, metadataPositions ); + + //update positions and varsCount originally passed by value + const unsigned int buffersSize = static_cast<unsigned int>( capsules.size() ); + for( unsigned int i = 0; i < buffersSize; ++i ) + { + metadataSets[i].PGIndexPosition = metadataPositions[i]; + + capsules[i]->m_DataPosition = dataPositions[i]; + capsules[i]->m_DataAbsolutePosition = dataAbsolutePositions[i]; + } +} + + + +void BP1Writer::Close( const BP1MetadataSet& metadataSet, Capsule& capsule, Transport& transport ) const +{ + + + +} + + + + +void BP1Writer::WriteProcessGroupIndexCommon( const bool isFortran, const std::string name, const unsigned int processID, + const std::string timeStepName, const unsigned int timeStep, + const std::vector<int>& methodIDs, + std::vector<char*>& dataBuffers, std::vector<std::size_t>& dataPositions, + std::vector<std::size_t>& dataAbsolutePositions, + std::vector<char*>& metadataBuffers, + std::vector<std::size_t>& metadataPositions ) const noexcept { std::vector<std::size_t> metadataLengthPositions( metadataPositions ); //get length of pg position @@ -62,7 +142,6 @@ void BP1Writer::WriteProcessGroupIndex( const bool isFortran, const std::string //write offset to pg in data on metadata which is the current absolute position MemcpyToBuffers( metadataBuffers, metadataPositions, dataAbsolutePositions, 8 ); - //get pg index length std::vector<std::uint16_t> metadataIndexLengths( metadataPositions.size() ); for( unsigned int i = 0; i < metadataPositions.size(); ++i ) @@ -73,18 +152,9 @@ void BP1Writer::WriteProcessGroupIndex( const bool isFortran, const std::string MovePositions( -2, metadataLengthPositions ); //back to original position } - - -void BP1Writer::Close( const BP1MetadataSet& metadataSet, Capsule& capsule, Transport& transport ) -{ - - - -} - //PRIVATE FUNCTIONS void BP1Writer::WriteNameRecord( const std::string name, const std::uint16_t length, - std::vector<char*>& buffers, std::vector<std::size_t>& positions ) + std::vector<char*>& buffers, std::vector<std::size_t>& positions ) const noexcept { MemcpyToBuffers( buffers, positions, &length, 2 ); MemcpyToBuffers( buffers, positions, name.c_str( ), length ); @@ -95,7 +165,7 @@ void BP1Writer::WriteDimensionRecord( std::vector<char*>& buffers, std::vector<s const std::vector<std::size_t>& localDimensions, const std::vector<std::size_t>& globalDimensions, const std::vector<std::size_t>& globalOffsets, - const bool addType ) + const bool addType ) const noexcept { if( addType == true ) { @@ -124,9 +194,8 @@ void BP1Writer::WriteDimensionRecord( std::vector<char*>& buffers, std::vector<s void BP1Writer::WriteDimensionRecord( std::vector<char*>& buffers, std::vector<std::size_t>& positions, const std::vector<std::size_t>& localDimensions, const unsigned int skip, - const bool addType ) + const bool addType ) const noexcept { - if( addType == true ) { constexpr char no = 'n'; //dimension format unsigned int value (not using memberID for now) @@ -148,7 +217,7 @@ void BP1Writer::WriteDimensionRecord( std::vector<char*>& buffers, std::vector<s } -void BP1Writer::CloseRankFile( Capsule& capsule, Transport& transport ) +void BP1Writer::CloseRankFile( Capsule& capsule, Transport& transport ) const { } @@ -156,7 +225,7 @@ void BP1Writer::CloseRankFile( Capsule& capsule, Transport& transport ) -void BP1Writer::SetMiniFooter( BP1MetadataSet& metadataSet ) +void BP1Writer::SetMiniFooter( BP1MetadataSet& metadataSet ) const { @@ -164,7 +233,7 @@ void BP1Writer::SetMiniFooter( BP1MetadataSet& metadataSet ) } -void BP1Writer::SetMetadata( const BP1MetadataSet& metadataSet, Capsule& capsule ) +void BP1Writer::SetMetadata( const BP1MetadataSet& metadataSet, Capsule& capsule ) const { //setup metadata to capsule metadata buffer diff --git a/src/transport/FStream.cpp b/src/transport/FStream.cpp index 5c0e2ec68..c299a639e 100644 --- a/src/transport/FStream.cpp +++ b/src/transport/FStream.cpp @@ -78,71 +78,5 @@ void FStream::Close( ) } -//void CFStream::Write( const CVariable& variable ) ///this is aggregation -//{ -// //local buffer, to be send over MPI -// std::vector<char> buffer; -// const std::string type( variable.m_Type ); -// auto var = GetVariableValues( variable ); - -// if( type.find( "vector" ) ) //is a vector -// { -// //find total size first -// //auto values = variable.Get< std::vector<int> >(); -// auto values = GetVariableValues( variable ); -// //auto values = GetVariableValues( variable ); -// unsigned int sizeSum = 0; -// for( auto element : *values ) -// sizeSum += (int) std::log10( (double) std::abs( element ) ) + 1; -// -// buffer.reserve( 2*sizeSum ); -// -// for( auto element : *values ) -// { -// const char* elementChar = std::to_string( element ).c_str(); -// buffer.insert( buffer.end(), elementChar, elementChar + strlen( elementChar ) ); -// buffer.push_back(' '); -// } -// -// if( m_RankMPI == 0 ) -// { -// std::cout << "Writing to file " << m_StreamName << "\n"; -// -// m_FStream << "Hello from rank " << m_RankMPI << " : "; -// m_FStream.write( &buffer[0], buffer.size() ); -// m_FStream << "\n"; -// -// MPI_Status* status = NULL; -// -// for( int r = 1; r < m_SizeMPI; ++r ) -// { -// int bufferSize; -// MPI_Recv( &bufferSize, 1, MPI_INT, r, 0, m_MPIComm, status ); //receive from r the buffer size -// std::cout << "Getting from rank: " << r << " buffer size "<< bufferSize << "\n"; -// -// buffer.resize( bufferSize ); -// MPI_Recv( &buffer[0], bufferSize, MPI_CHAR, r, 1, m_MPIComm, status ); //receive from r the buffer -// -// m_FStream << "Hello from rank " << r << " : "; -// m_FStream.write( &buffer[0], bufferSize ); -// m_FStream << "\n"; -// } -// } -// else -// { -// int bufferSize = (int)buffer.size(); -// MPI_Send( &bufferSize, 1, MPI_INT, 0, 0, m_MPIComm ); //send to rank=0 the buffer size -// -// std::cout << "Hello from rank: " << m_RankMPI << "\n"; -// std::cout << "Buffer size: " << bufferSize << "\n"; -// -// MPI_Send( &buffer[0], bufferSize, MPI_CHAR, 0, 1, m_MPIComm ); //send to rank=0 the buffer -// } -// -// MPI_Barrier( m_MPIComm ); -// } -//} - - } //end namespace diff --git a/src/transport/File.cpp b/src/transport/File.cpp index ed8a4796f..4a5f9f3cb 100644 --- a/src/transport/File.cpp +++ b/src/transport/File.cpp @@ -57,7 +57,7 @@ void File::SetBuffer( char* buffer, std::size_t size ) { if( status == 1 ) throw std::ios_base::failure( "ERROR: could not set buffer in rank " - + std::to_string( m_MPIRank ) + "\n" ); + + std::to_string( m_RankMPI ) + "\n" ); } } -- GitLab