From 3de29d23d8d8c98e79b05e06ff8ca03943fddd51 Mon Sep 17 00:00:00 2001 From: wfg <wfg@pc0098504.ornl.gov> Date: Tue, 21 Mar 2017 18:13:21 -0400 Subject: [PATCH] Working on Advance and Aggregate class for Profiling info Updated README.md files --- README.md | 57 +++++++++++---- bindings/python/README.md | 30 ++++++-- bindings/python/include/EnginePy.h | 2 + bindings/python/src/EnginePy.cpp | 14 ++-- bindings/python/src/glueBoostPython.cpp | 3 +- bindings/python/src/gluePyBind11.cpp | 1 + include/core/Engine.h | 5 +- include/core/Profiler.h | 40 +++++++---- include/core/Transport.h | 3 - include/engine/bp/BPFileWriter.h | 11 ++- include/format/BP1.h | 4 ++ include/format/BP1Aggregator.h | 28 ++++++++ include/format/BP1Writer.h | 17 +++-- include/mpidummy.h | 3 + include/transport/file/FileDescriptor.h | 4 +- src/core/Engine.cpp | 1 + src/engine/bp/BPFileWriter.cpp | 52 +++++++++++--- src/format/BP1Aggregator.cpp | 95 +++++++++++++++++++++++++ src/format/BP1Writer.cpp | 72 ++++++++++++++++--- src/mpidummy.cpp | 7 ++ src/transport/file/FileDescriptor.cpp | 4 +- src/transport/file/FilePointer.cpp | 2 + 22 files changed, 381 insertions(+), 74 deletions(-) create mode 100644 src/format/BP1Aggregator.cpp diff --git a/README.md b/README.md index adf4da6d4..828793600 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,46 @@ # ADIOSPP -Next generation ADIOS in C++11/14 for exascale computations. -Read ./doc/CodingGuidelines first if you are a developer. -Doxygen documentation can be generated running doxygen under ./doc, a ./doc/html directory will be created - -Requirements: -1) C++11 compiler (e.g. gnu gcc 4.8.x and above) in PATH, default is g++ -2) MPI compiler (e.g. openmpi, mpich2 ) in PATH, default is mpic++ - -make -> build ./lib/libadios.a and ./lib/libadios_nompi.a -make mpi -> build ./lib/libadios.a with truly MPI code (from mpi.h) using mpic++ -make nompi -> build ./lib/libadios_nompi.a with serial (dummy MPI) code only calling mpiduumy.h using g++ (C++11) - -For examples, start with examples/hello/writer/helloWriter_OOP.cpp, build as above after ADIOS library is built \ No newline at end of file + Next generation ADIOS2.0 in C++11 for exascale computations. + Read ./doc/CodingGuidelines first if you are a developer. + Doxygen documentation can be generated running doxygen under ./doc, a ./doc/html directory will be created + + Requirements: + 1) C++11 compiler (e.g. gnu gcc 4.8.x and above) in PATH, default is g++ + 2) MPI compiler (e.g. openmpi, mpich2 ) in PATH, default is mpic++ + + MULTICORE BUILD MODES: ( use -jn where n: number of cores) + + Basic modes; + build ./lib/libadios.a and ./lib/libadios_nompi.a + make -jn + e.g.: make -j4 + + build ./lib/libadios.so and ./lib/libadios_nompi.so + make -jn + e.g.: make -j4 + + build ./lib/libadios.a (.so) with truly MPI code (from mpi.h) using mpic++ + make mpi -jn + e.g: make mpi -j4 + + build ./lib/libadios_nompi.a with serial (dummy MPI) code only calling mpidummy.h using g++ (C++11) + make nompi -jn + e.g: make nompi -j4 + + Additional modes: + + For shared library mode add the SHARED=yes option to any build: + build ./lib/libadios.so and ./lib/libadios_nompi.so + make -jn SHARED=yes + e.g.: make -j4 SHARED=yes + + build with DataMan library (https://github.com/JasonRuonanWang/DataMan) for WAN transport + Make sure DataMan path is correct in Makefile.libs add HAVE_DATAMAN=yes flag + e.g.: make -j4 HAVE_DATAMAN=yes + + + For examples, start with examples/hello/bpWriter/, build as above after ADIOS basic modes library is built + cd examples/hello/bpWriter/ + make -j4 + mpirun -n 4 ./helloBPWriter.exe -> generates myDoubles.bp directory + ./helloBPWriter_nompi.exe -> generates myDoubles_nompi.bp directory \ No newline at end of file diff --git a/bindings/python/README.md b/bindings/python/README.md index dcee25ef0..b94db28b5 100644 --- a/bindings/python/README.md +++ b/bindings/python/README.md @@ -1,12 +1,28 @@ Python bindings for ADIOS 2.0 -For Boost Python use Makefile.BoostPython -Requirements: -Boost.Python version 1.63 -Don't forget to add boost python libraries path in LD_LIBRARY_PATH +Dependencies: + + Python 2.7.x + Numpy and MPI4PY + ADIOS shared library (libadios.so) built with : make SHARED=yes (see ADIOS README.md) + Add boost python and libadios.so libraries path in LD_LIBRARY_PATH or DYLD_LIBRARY_PATH +For Boost Python use: + make -jn , where n is the number of cores + e.g. + make -j4 + make -j8 -For PyBind11 use Makefile.PyBind11 -Requirements: -PyBind11 version 2.0.1 + Additional Requirements: + Boost.Python version 1.63 (latest) + + +For PyBind11 use: + make -jn HAVE_PYBIND11=yes, where n is the number of cores + e.g. + make -j4 HAVE_PYBIND11=yes + make -j8 HAVE_PYBIND11=yes + + Requirements: + PyBind11 version 2.0.1 or master from github (header only library, no need to compile) diff --git a/bindings/python/include/EnginePy.h b/bindings/python/include/EnginePy.h index 23bf36c3f..9955d7b64 100644 --- a/bindings/python/include/EnginePy.h +++ b/bindings/python/include/EnginePy.h @@ -49,6 +49,8 @@ public: void WritePy( VariablePy& variable, const pyArray& array ); + void Advance( ); + void Close( ); void GetEngineType( ) const; diff --git a/bindings/python/src/EnginePy.cpp b/bindings/python/src/EnginePy.cpp index e29b37f6c..3bb80cdd3 100644 --- a/bindings/python/src/EnginePy.cpp +++ b/bindings/python/src/EnginePy.cpp @@ -66,18 +66,24 @@ void EnginePy::WritePy( VariablePy& variable, const pyArray& array ) } - - -void EnginePy::GetEngineType( ) const +void EnginePy::Advance( ) { - std::cout << "Engine type " << m_Engine->m_EngineType << "\n"; + } + + void EnginePy::Close( ) { m_Engine->Close( -1 ); } +void EnginePy::GetEngineType( ) const +{ + std::cout << "Engine type " << m_Engine->m_EngineType << "\n"; +} + + } //end namespace diff --git a/bindings/python/src/glueBoostPython.cpp b/bindings/python/src/glueBoostPython.cpp index 4a94464aa..2adc5af2d 100644 --- a/bindings/python/src/glueBoostPython.cpp +++ b/bindings/python/src/glueBoostPython.cpp @@ -68,7 +68,8 @@ BOOST_PYTHON_MODULE( ADIOSPy ) //Engine py::class_<adios::EnginePy>("EnginePy", py::no_init ) .def("Write", &adios::EnginePy::WritePy ) - .def( "Close", &adios::EnginePy::Close ) + .def("Advance", &adios::EnginePy::WritePy ) + .def("Close", &adios::EnginePy::Close ) ; } diff --git a/bindings/python/src/gluePyBind11.cpp b/bindings/python/src/gluePyBind11.cpp index ccd727ce6..d7ee0456c 100644 --- a/bindings/python/src/gluePyBind11.cpp +++ b/bindings/python/src/gluePyBind11.cpp @@ -55,6 +55,7 @@ PYBIND11_PLUGIN( ADIOSPy ) //Engine py::class_<adios::EnginePy>( m, "Engine") .def("Write", &adios::EnginePy::WritePy ) + .def("Advance", &adios::EnginePy::WritePy ) .def("Close", &adios::EnginePy::Close ) ; diff --git a/include/core/Engine.h b/include/core/Engine.h index 05839734c..7c6016567 100644 --- a/include/core/Engine.h +++ b/include/core/Engine.h @@ -172,7 +172,7 @@ public: virtual void Write( const std::string variableName, const std::complex<float>* values ); virtual void Write( const std::string variableName, const std::complex<double>* values ); virtual void Write( const std::string variableName, const std::complex<long double>* values ); - + virtual void Write( const std::string variableName, const void* values ); /** * Indicates that a new step is going to be written as new variables come in. @@ -218,9 +218,6 @@ protected: const bool m_DebugMode = false; ///< true: additional checks, false: by-pass checks unsigned int m_Cores = 1; const std::string m_EndMessage; ///< added to exceptions to improve debugging - - Profiler m_Profiler; ///< engine time and bytes profiler - std::set<std::string> m_WrittenVariables; ///< contains the names of the variables that are being written virtual void Init( ); ///< Initialize m_Capsules and m_Transports, called from constructor diff --git a/include/core/Profiler.h b/include/core/Profiler.h index d8888fd9d..ca446ef3a 100644 --- a/include/core/Profiler.h +++ b/include/core/Profiler.h @@ -15,21 +15,16 @@ namespace adios { -/** - * Utilities for profiling using the chrono header in C++11 - */ -struct Profiler +class Timer { - class Timer - { - public: - const std::string Process; - unsigned long long int ProcessTime = 0; +public: + const std::string Process; + unsigned long long int ProcessTime = 0; - Timer( const std::string process, const Support::Resolutions resolution ): - Process{ process }, - Resolution{ resolution } + Timer( const std::string process, const Support::Resolutions resolution ): + Process{ process }, + Resolution{ resolution } { } void SetInitialTime( ) @@ -63,14 +58,31 @@ struct Profiler return -1; //failure } - private: + std::string GetUnits( ) const + { + std::string units; + if( Resolution == Support::Resolutions::mus ) units = "mus"; + else if( Resolution == Support::Resolutions::ms ) units = "ms"; + else if( Resolution == Support::Resolutions::s ) units = "s"; + else if( Resolution == Support::Resolutions::m ) units = "m"; + else if( Resolution == Support::Resolutions::h ) units = "h"; + return units; + } + +private: const Support::Resolutions Resolution; std::chrono::time_point<std::chrono::high_resolution_clock> InitialTime; std::chrono::time_point<std::chrono::high_resolution_clock> ElapsedTime; bool InitialTimeSet = false; - }; +}; + +/** + * Utilities for profiling using the chrono header in C++11 + */ +struct Profiler +{ std::vector<Timer> m_Timers; std::vector<unsigned long long int> m_TotalBytes; bool m_IsActive = false; diff --git a/include/core/Transport.h b/include/core/Transport.h index 532b286fa..dc6409e6e 100644 --- a/include/core/Transport.h +++ b/include/core/Transport.h @@ -83,9 +83,6 @@ public: protected: const bool m_DebugMode = false; ///< if true: additional checks and exceptions - - - }; diff --git a/include/engine/bp/BPFileWriter.h b/include/engine/bp/BPFileWriter.h index af45aa4b0..969d54b19 100644 --- a/include/engine/bp/BPFileWriter.h +++ b/include/engine/bp/BPFileWriter.h @@ -10,6 +10,7 @@ #include "core/Engine.h" #include "format/BP1Writer.h" +#include "format/BP1Aggregator.h" //supported capsules #include "capsule/heap/STLVector.h" @@ -72,8 +73,6 @@ public: void Write( const std::string variableName, const std::complex<long double>* values ); void Write( const std::string variableName, const void* values ); - - void Advance( ); /** @@ -88,12 +87,14 @@ private: capsule::STLVector m_Buffer; ///< heap capsule using STL std::vector<char> format::BP1Writer m_BP1Writer; ///< format object will provide the required BP functionality to be applied on m_Buffer and m_Transports format::BP1MetadataSet m_MetadataSet; ///< metadata set accompanying the heap buffer data in bp format. Needed by m_BP1Writer + format::BP1Aggregator m_BP1Aggregator; bool m_IsFirstClose = true; ///< set to false after first Close is reached so metadata doesn't have to be accommodated for a subsequent Close std::size_t m_MaxBufferSize; ///< maximum allowed memory to be allocated float m_GrowthFactor = 1.5; ///< capsule memory growth factor, new_memory = m_GrowthFactor * current_memory bool m_TransportFlush = false; ///< true: transport flush happened, buffer must be reset + bool m_CloseProcessGroup = false; ///< set to true if advance is called, this prevents flattening the data and metadata in Close void Init( ); void InitParameters( ); @@ -111,6 +112,9 @@ private: template< class T > void WriteVariableCommon( Variable<T>& variable, const T* values ) { + if( m_MetadataSet.Log.m_IsActive == true ) + m_MetadataSet.Log.m_Timers[0].SetInitialTime(); + //set variable variable.m_AppValues = values; m_WrittenVariables.insert( variable.m_Name ); @@ -139,6 +143,9 @@ private: } variable.m_AppValues = nullptr; //setting pointer to null as not needed after write + + if( m_MetadataSet.Log.m_IsActive == true ) + m_MetadataSet.Log.m_Timers[0].SetTime(); } }; diff --git a/include/format/BP1.h b/include/format/BP1.h index 0019913e8..2e551ecec 100644 --- a/include/format/BP1.h +++ b/include/format/BP1.h @@ -19,6 +19,7 @@ #endif #include "core/Transport.h" +#include "core/Profiler.h" namespace adios @@ -51,6 +52,9 @@ struct BP1MetadataSet //PG (relative) positions in Data buffer to be updated std::size_t DataPGLengthPosition = 0; ///< current PG initial ( relative ) position, needs to be updated in every advance step or init std::size_t DataVarsCountPosition = 0; ///< current PG variable count ( relative ) position, needs to be updated in every advance step or init + bool DataPGIsOpen = false; + + Profiler Log; }; /** diff --git a/include/format/BP1Aggregator.h b/include/format/BP1Aggregator.h index 481738200..60f1a8efa 100644 --- a/include/format/BP1Aggregator.h +++ b/include/format/BP1Aggregator.h @@ -10,20 +10,48 @@ +#ifdef ADIOS_NOMPI + #include "mpidummy.h" +#else + #include <mpi.h> +#endif + + namespace adios { namespace format { +/** + * Does all MPI related spatial aggregation tasks + */ class BP1Aggregator { +public: + + MPI_Comm m_MPIComm = MPI_COMM_SELF; ///< MPI communicator from Engine + int m_RankMPI = 0; ///< current MPI rank process + int m_SizeMPI = 1; ///< current MPI processes size + /** + * Unique constructor + * @param mpiComm coming from engine + */ + BP1Aggregator( MPI_Comm mpiComm, const bool debugMode = false ); + ~BP1Aggregator( ); + /** + * Function that aggregates and writes (from rank = 0) profiling.log in python dictionary format + * @param rankLog contain rank profiling info to be aggregated + */ + void WriteProfilingLog( const std::string fileName, const std::string& rankLog ); +private: + const bool m_DebugMode = false; }; diff --git a/include/format/BP1Writer.h b/include/format/BP1Writer.h index 51fbd6d1d..32521f498 100644 --- a/include/format/BP1Writer.h +++ b/include/format/BP1Writer.h @@ -19,6 +19,7 @@ #include "BP1.h" #include "core/Variable.h" #include "core/Capsule.h" +#include "core/Profiler.h" #include "capsule/heap/STLVector.h" #include "functions/adiosTemplates.h" #include "functions/adiosFunctions.h" @@ -326,19 +327,27 @@ public: void Advance( BP1MetadataSet& metadataSet, capsule::STLVector& buffer ); - /** * Function that sets metadata (if first close) and writes to a single transport * @param metadataSet current rank metadata set * @param buffer contains data * @param transport does a write after data and metadata is setup * @param isFirstClose true: metadata has been set and aggregated - * @param haveMetadata true: attach metadata buffer to each data buffer and do a transport write - * @param haveTiming true: add timing.log file + * @param doAggregation true: for N-to-M, false: for N-to-N */ void Close( BP1MetadataSet& metadataSet, capsule::STLVector& buffer, Transport& transport, bool& isFirstClose, - const bool haveMetadata = true, const bool haveTiming = false ) const noexcept; + const bool doAggregation ) const noexcept; + + /** + * Writes the ADIOS log information (buffering, open, write and close) for a rank process + * @param rank current rank + * @param metadataSet contains Profile info for buffering + * @param transports contains Profile info for transport open, writes and close + * @return string for this rank that will be aggregated into profiling.log + */ + std::string GetRankProfilingLog( const int rank, const BP1MetadataSet& metadataSet, + const std::vector< std::shared_ptr<Transport> >& transports ) const noexcept; private: diff --git a/include/mpidummy.h b/include/mpidummy.h index b287c43f8..619a50ad9 100644 --- a/include/mpidummy.h +++ b/include/mpidummy.h @@ -30,6 +30,7 @@ namespace adios typedef int MPI_Comm; typedef uint64_t MPI_Status; +typedef uint64_t MPI_Request; typedef int MPI_File; typedef int MPI_Info; typedef int MPI_Datatype; /* Store the byte size of a type in such vars */ @@ -87,7 +88,9 @@ int MPI_Scatter(void *sendbuf, int sendcnt, MPI_Datatype sendtype, void *recvbuf int MPI_Scatterv(void *sendbuf, int *sendcnts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcnt, MPI_Datatype recvtype, int root, MPI_Comm comm); int MPI_Recv( void *recvbuffer, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Status* status ); +int MPI_Irecv( void *recvbuffer, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Request* request ); int MPI_Send( void *sendbuffer, int count, MPI_Datatype type, int destination, int tag, MPI_Comm comm ); +int MPI_Isend( void *recvbuffer, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Request* request ); int MPI_File_open(MPI_Comm comm, char *filename, int amode, MPI_Info info, MPI_File *fh); int MPI_File_close(MPI_File *fh); diff --git a/include/transport/file/FileDescriptor.h b/include/transport/file/FileDescriptor.h index 5555c450c..0e199659c 100644 --- a/include/transport/file/FileDescriptor.h +++ b/include/transport/file/FileDescriptor.h @@ -1,5 +1,5 @@ /* - * FileDescriptor.h + * FileDescriptor.h uses POSIX as the underlying library * * Created on: Oct 6, 2016 * Author: wfg @@ -35,7 +35,6 @@ public: void Close( ); - private: int m_FileDescriptor = -1; ///< file descriptor returned by POSIX open @@ -44,7 +43,6 @@ private: - } //end namespace transport } //end namespace #endif /* FILEDESCRIPTOR_H_ */ diff --git a/src/core/Engine.cpp b/src/core/Engine.cpp index 76445c1d5..feb562b92 100644 --- a/src/core/Engine.cpp +++ b/src/core/Engine.cpp @@ -80,6 +80,7 @@ void Engine::Write( const std::string variableName, const long double* values ){ void Engine::Write( const std::string variableName, const std::complex<float>* values ){ } void Engine::Write( const std::string variableName, const std::complex<double>* values ){ } void Engine::Write( const std::string variableName, const std::complex<long double>* values ){ } +void Engine::Write( const std::string variableName, const void* values ){ } void Engine::Advance(){ } diff --git a/src/engine/bp/BPFileWriter.cpp b/src/engine/bp/BPFileWriter.cpp index e70a72e57..85fce4ace 100644 --- a/src/engine/bp/BPFileWriter.cpp +++ b/src/engine/bp/BPFileWriter.cpp @@ -21,6 +21,7 @@ BPFileWriter::BPFileWriter( ADIOS& adios, const std::string name, const std::str const Method& method, const bool debugMode, const unsigned int cores ): Engine( adios, "BPFileWriter", name, accessMode, mpiComm, method, debugMode, cores, " BPFileWriter constructor (or call to ADIOS Open).\n" ), m_Buffer{ capsule::STLVector( accessMode, m_RankMPI, m_DebugMode ) }, + m_BP1Aggregator{ format::BP1Aggregator( m_MPIComm, debugMode ) }, m_MaxBufferSize{ m_Buffer.m_Data.max_size() } { Init( ); @@ -138,7 +139,7 @@ void BPFileWriter::Write( const std::string variableName, const std::complex<dou void BPFileWriter::Write( const std::string variableName, const std::complex<long double>* values ) { WriteVariableCommon( m_ADIOS.GetVariable<std::complex<long double>>( variableName ), values ); } -void BPFileWriter::Write( const std::string variableName, const void* values ) +void BPFileWriter::Write( const std::string variableName, const void* values ) //Compound type { } @@ -154,12 +155,33 @@ void BPFileWriter::Close( const int transportIndex ) if( transportIndex == -1 ) { for( auto& transport : m_Transports ) //by reference or value or it doesn't matter? - m_BP1Writer.Close( m_MetadataSet, m_Buffer, *transport, m_IsFirstClose ); + m_BP1Writer.Close( m_MetadataSet, m_Buffer, *transport, m_IsFirstClose, false ); //false: not using aggregation for now } else { - m_BP1Writer.Close( m_MetadataSet, m_Buffer, *m_Transports[transportIndex], m_IsFirstClose ); + m_BP1Writer.Close( m_MetadataSet, m_Buffer, *m_Transports[transportIndex], m_IsFirstClose, false ); //false: not using aggregation for now } + + if( m_MetadataSet.Log.m_IsActive == true ) + { + bool allClose = true; + for( auto& transport : m_Transports ) + { + if( transport->m_IsOpen == true ) + { + allClose = false; + break; + } + } + if( allClose == true ) //aggregate and write profiling.log + { + const std::string rankLog = m_BP1Writer.GetRankProfilingLog( m_RankMPI, m_MetadataSet, m_Transports ); + + + + } + } + } @@ -204,30 +226,32 @@ void BPFileWriter::InitParameters( ) m_BP1Writer.m_Verbosity = verbosity; } - auto itProfile = m_Method.m_Parameters.find( "profile_buffering_units" ); + auto itProfile = m_Method.m_Parameters.find( "profile_units" ); if( itProfile != m_Method.m_Parameters.end() ) { + auto& profiler = m_MetadataSet.Log; + if( itProfile->second == "mus" || itProfile->second == "microseconds" ) - m_Profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::mus ); + profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::mus ); else if( itProfile->second == "ms" || itProfile->second == "milliseconds" ) - m_Profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::ms ); + profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::ms ); else if( itProfile->second == "s" || itProfile->second == "seconds" ) - m_Profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::s ); + profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::s ); else if( itProfile->second == "min" || itProfile->second == "minutes" ) - m_Profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::m ); + profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::m ); else if( itProfile->second == "h" || itProfile->second == "hours" ) - m_Profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::h ); + profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::h ); else { if( m_DebugMode == true ) throw std::invalid_argument( "ERROR: Method profile_buffering_units argument must be mus, ms, s, min or h, in call to Open or Engine constructor\n" ); } - m_Profiler.m_IsActive = true; + profiler.m_IsActive = true; } } @@ -327,11 +351,19 @@ void BPFileWriter::InitTransports( ) void BPFileWriter::InitProcessGroup( ) { + if( m_MetadataSet.Log.m_IsActive == true ) + m_MetadataSet.Log.m_Timers[0].SetInitialTime(); + if( m_AccessMode == "a" ) { //Get last pg timestep and update timestep counter in format::BP1MetadataSet } + WriteProcessGroupIndex( ); + m_MetadataSet.DataPGIsOpen = true; + + if( m_MetadataSet.Log.m_IsActive == true ) + m_MetadataSet.Log.m_Timers[0].SetTime(); } diff --git a/src/format/BP1Aggregator.cpp b/src/format/BP1Aggregator.cpp new file mode 100644 index 000000000..bb2328ff3 --- /dev/null +++ b/src/format/BP1Aggregator.cpp @@ -0,0 +1,95 @@ +/* + * BP1Aggregator.cpp + * + * Created on: Mar 21, 2017 + * Author: wfg + */ + +#include <vector> +#include <fstream> + +#include "format/BP1Aggregator.h" + + +namespace adios +{ +namespace format +{ + + +BP1Aggregator::BP1Aggregator( MPI_Comm mpiComm, const bool debugMode ): + m_MPIComm{ mpiComm }, + m_DebugMode{ debugMode } +{ + MPI_Comm_rank( m_MPIComm, &m_RankMPI ); + MPI_Comm_size( m_MPIComm, &m_SizeMPI ); +} + + +BP1Aggregator::~BP1Aggregator( ) +{ } + + + +void BP1Aggregator::WriteProfilingLog( const std::string fileName, const std::string& rankLog ) +{ + if( m_RankMPI == 0 ) + { + std::vector< std::vector<char> > rankLogs( m_SizeMPI - 1 ); //rankLogs from other processes + + //first receive sizes + unsigned int sizeMPI = static_cast<unsigned int>( m_SizeMPI ); + + for( unsigned int i = 1; i < sizeMPI; ++i ) + { + int size = -1; + MPI_Request request; + MPI_Irecv( &size, 1, MPI_INT, i, 0, m_MPIComm, &request ); + + if( m_DebugMode == true ) + { + if( size == -1 ) + throw std::runtime_error( "ERROR: couldn't get size from rank " + std::to_string(i) + ", in ADIOS aggregator for Profiling.log\n" ); + //here check request + } + rankLogs[i-1].resize( size ); //allocate with zeros + } + //receive rankLog from other ranks + for( unsigned int i = 1; i < sizeMPI; ++i ) + { + int size = static_cast<int>( rankLogs[i-1].size() ); + MPI_Request request; + MPI_Irecv( rankLogs[i-1].data(), size, MPI_CHAR, i, 0, m_MPIComm, &request ); + + if( m_DebugMode == true ) + { + //here check request + } + } + //write file + std::string logFile( "log = { " + rankLog + "\n" ); + for( unsigned int i = 1; i < sizeMPI; ++i ) + { + std::string rankLogStr( rankLogs[i-1].data() ); + logFile += rankLogStr + "\n"; + } + logFile += " }\n"; + + std::ofstream logStream( fileName ); + logStream.write( logFile.c_str(), logFile.size() ); + logStream.close(); + } + else + { + int size = static_cast<int>( rankLog.size() ); + MPI_Request request; + MPI_Isend( &size, 1, MPI_INT, 0, 0, m_MPIComm, &request ); + MPI_Isend( const_cast<char*>( rankLog.c_str() ), size, MPI_CHAR, 0, 0, m_MPIComm, &request ); + } +} + + + +} //end namespace format +} //end namespace adios + diff --git a/src/format/BP1Writer.cpp b/src/format/BP1Writer.cpp index 11af2e254..2ad022f86 100644 --- a/src/format/BP1Writer.cpp +++ b/src/format/BP1Writer.cpp @@ -5,8 +5,13 @@ * Author: wfg */ +/// \cond EXCLUDE_FROM_DOXYGEN +#include <string> +/// \endcond #include "format/BP1Writer.h" +#include "core/Profiler.h" + @@ -105,25 +110,70 @@ void BP1Writer::Advance( BP1MetadataSet& metadataSet, capsule::STLVector& buffer void BP1Writer::Close( BP1MetadataSet& metadataSet, capsule::STLVector& buffer, Transport& transport, bool& isFirstClose, - const bool haveMetadata, const bool haveTiming ) const noexcept + const bool doAggregation ) const noexcept { + if( metadataSet.Log.m_IsActive == true ) + metadataSet.Log.m_Timers[0].SetInitialTime(); + if( isFirstClose == true ) { - FlattenData( metadataSet, buffer ); + if( metadataSet.DataPGIsOpen == true ) + { + FlattenData( metadataSet, buffer ); + } + FlattenMetadata( metadataSet, buffer ); + + if( metadataSet.Log.m_IsActive == true ) + metadataSet.Log.m_Timers[0].SetInitialTime(); + + if( doAggregation == true ) //N-to-M where 1 <= M <= N-1, might need a new Log metadataSet.Log.m_Timers just for aggregation + { + //here call aggregator + } + isFirstClose = false; } - //implementing N-to-N for now, no aggregation - transport.Write( buffer.m_Data.data(), buffer.m_DataPosition ); //single write + if( doAggregation == true ) //N-to-M where 1 <= M <= N-1 + { + //here call aggregator to select transports for Write and Close + } + else // N-to-N + { + transport.Write( buffer.m_Data.data(), buffer.m_DataPosition ); //single write + transport.Close(); + } +} + + +std::string BP1Writer::GetRankProfilingLog( const int rank, const BP1MetadataSet& metadataSet, + const std::vector< std::shared_ptr<Transport> >& transports ) const noexcept +{ + auto lf_WriterTimer = []( const std::string name, const Timer& timer, std::string rankLog ) + { + rankLog += "'" + name + "_" + timer.GetUnits() + "': " + std::to_string( timer.ProcessTime ) + ", "; + }; + + //prepare string dictionary per rank + std::string rankLog( "'rank_" + std::to_string( rank ) + "': { " ); + auto& profiler = metadataSet.Log; + rankLog += "'totalBytes': " + std::to_string( profiler.m_TotalBytes[0] ) + ", "; + lf_WriterTimer( "t_buffering", profiler.m_Timers[0], rankLog ); - if( haveMetadata == true ) + for( unsigned int t = 0; t < transports.size(); ++t ) { - //here call aggregator - } + auto& timers = transports[t]->m_Profiler.m_Timers; - transport.Close(); + rankLog += "'transport_" + std::to_string(t) + "': { "; + lf_WriterTimer( "t_open", timers[0], rankLog ); + lf_WriterTimer( "t_write", timers[1], rankLog ); + lf_WriterTimer( "t_close", timers[2], rankLog ); + rankLog += " }, "; + } + rankLog += "}, "; + return rankLog; } @@ -259,6 +309,12 @@ void BP1Writer::FlattenMetadata( BP1MetadataSet& metadataSet, capsule::STLVector } buffer.m_DataAbsolutePosition += metadataSize; + + if( metadataSet.Log.m_IsActive == true ) + { + metadataSet.Log.m_TotalBytes.push_back( buffer.m_DataAbsolutePosition ); + } + } diff --git a/src/mpidummy.cpp b/src/mpidummy.cpp index ef00b2099..52fd646e4 100644 --- a/src/mpidummy.cpp +++ b/src/mpidummy.cpp @@ -168,9 +168,16 @@ int MPI_Scatterv( void *sendbuf, int *sendcnts, int *displs, int MPI_Recv( void *recvbuffer, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Status* status ) { return 0; } +int MPI_Irecv( void *recvbuffer, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Request* request ) +{ return 0; } + int MPI_Send( void *sendbuffer, int count, MPI_Datatype type, int destination, int tag, MPI_Comm comm ) { return 0; } +int MPI_Isend( void *recvbuffer, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Request* request ) +{ return 0; } + + int MPI_File_open(MPI_Comm comm, char *filename, int amode, MPI_Info info, MPI_File *fh) { *fh = open64 (filename, amode); diff --git a/src/transport/file/FileDescriptor.cpp b/src/transport/file/FileDescriptor.cpp index cf90c9380..0128efeaf 100644 --- a/src/transport/file/FileDescriptor.cpp +++ b/src/transport/file/FileDescriptor.cpp @@ -59,7 +59,7 @@ void FileDescriptor::Open( const std::string name, const std::string accessMode if( m_Profiler.m_IsActive == true ) m_Profiler.m_Timers[0].SetInitialTime(); - m_FileDescriptor = open( m_Name.c_str(), O_WRONLY | O_APPEND ); + m_FileDescriptor = open( m_Name.c_str(), O_WRONLY | O_APPEND ); //we need to change this if( m_Profiler.m_IsActive == true ) m_Profiler.m_Timers[0].SetTime(); @@ -125,6 +125,8 @@ void FileDescriptor::Close( ) throw std::ios_base::failure( "ERROR: couldn't close file " + m_Name + ", in call to POSIX write\n" ); } + + m_IsOpen = false; } diff --git a/src/transport/file/FilePointer.cpp b/src/transport/file/FilePointer.cpp index ce3088ebb..9340664b3 100644 --- a/src/transport/file/FilePointer.cpp +++ b/src/transport/file/FilePointer.cpp @@ -89,6 +89,8 @@ void FilePointer::Flush( ) void FilePointer::Close( ) { fclose( m_File ); + + m_IsOpen = false; } -- GitLab