diff --git a/README.md b/README.md index adf4da6d499f5f164830a9bdaa5ab4ec51526f7b..8287936002d165ea408eb74ce7bab420b3e5d42b 100644 --- a/README.md +++ b/README.md @@ -1,15 +1,46 @@ # ADIOSPP -Next generation ADIOS in C++11/14 for exascale computations. -Read ./doc/CodingGuidelines first if you are a developer. -Doxygen documentation can be generated running doxygen under ./doc, a ./doc/html directory will be created - -Requirements: -1) C++11 compiler (e.g. gnu gcc 4.8.x and above) in PATH, default is g++ -2) MPI compiler (e.g. openmpi, mpich2 ) in PATH, default is mpic++ - -make -> build ./lib/libadios.a and ./lib/libadios_nompi.a -make mpi -> build ./lib/libadios.a with truly MPI code (from mpi.h) using mpic++ -make nompi -> build ./lib/libadios_nompi.a with serial (dummy MPI) code only calling mpiduumy.h using g++ (C++11) - -For examples, start with examples/hello/writer/helloWriter_OOP.cpp, build as above after ADIOS library is built \ No newline at end of file + Next generation ADIOS2.0 in C++11 for exascale computations. + Read ./doc/CodingGuidelines first if you are a developer. + Doxygen documentation can be generated running doxygen under ./doc, a ./doc/html directory will be created + + Requirements: + 1) C++11 compiler (e.g. gnu gcc 4.8.x and above) in PATH, default is g++ + 2) MPI compiler (e.g. openmpi, mpich2 ) in PATH, default is mpic++ + + MULTICORE BUILD MODES: ( use -jn where n: number of cores) + + Basic modes; + build ./lib/libadios.a and ./lib/libadios_nompi.a + make -jn + e.g.: make -j4 + + build ./lib/libadios.so and ./lib/libadios_nompi.so + make -jn + e.g.: make -j4 + + build ./lib/libadios.a (.so) with truly MPI code (from mpi.h) using mpic++ + make mpi -jn + e.g: make mpi -j4 + + build ./lib/libadios_nompi.a with serial (dummy MPI) code only calling mpidummy.h using g++ (C++11) + make nompi -jn + e.g: make nompi -j4 + + Additional modes: + + For shared library mode add the SHARED=yes option to any build: + build ./lib/libadios.so and ./lib/libadios_nompi.so + make -jn SHARED=yes + e.g.: make -j4 SHARED=yes + + build with DataMan library (https://github.com/JasonRuonanWang/DataMan) for WAN transport + Make sure DataMan path is correct in Makefile.libs add HAVE_DATAMAN=yes flag + e.g.: make -j4 HAVE_DATAMAN=yes + + + For examples, start with examples/hello/bpWriter/, build as above after ADIOS basic modes library is built + cd examples/hello/bpWriter/ + make -j4 + mpirun -n 4 ./helloBPWriter.exe -> generates myDoubles.bp directory + ./helloBPWriter_nompi.exe -> generates myDoubles_nompi.bp directory \ No newline at end of file diff --git a/bindings/python/README.md b/bindings/python/README.md index dcee25ef042caae1d2f339d2840600f0b35c3719..b94db28b5b90ead79adfbc5433d17cf5023c1373 100644 --- a/bindings/python/README.md +++ b/bindings/python/README.md @@ -1,12 +1,28 @@ Python bindings for ADIOS 2.0 -For Boost Python use Makefile.BoostPython -Requirements: -Boost.Python version 1.63 -Don't forget to add boost python libraries path in LD_LIBRARY_PATH +Dependencies: + + Python 2.7.x + Numpy and MPI4PY + ADIOS shared library (libadios.so) built with : make SHARED=yes (see ADIOS README.md) + Add boost python and libadios.so libraries path in LD_LIBRARY_PATH or DYLD_LIBRARY_PATH +For Boost Python use: + make -jn , where n is the number of cores + e.g. + make -j4 + make -j8 -For PyBind11 use Makefile.PyBind11 -Requirements: -PyBind11 version 2.0.1 + Additional Requirements: + Boost.Python version 1.63 (latest) + + +For PyBind11 use: + make -jn HAVE_PYBIND11=yes, where n is the number of cores + e.g. + make -j4 HAVE_PYBIND11=yes + make -j8 HAVE_PYBIND11=yes + + Requirements: + PyBind11 version 2.0.1 or master from github (header only library, no need to compile) diff --git a/bindings/python/include/EnginePy.h b/bindings/python/include/EnginePy.h index 23bf36c3fe33dea23ceb21dc53dc38da8dcc7380..9955d7b64b4331c85e10f77a49c28fd90d35a885 100644 --- a/bindings/python/include/EnginePy.h +++ b/bindings/python/include/EnginePy.h @@ -49,6 +49,8 @@ public: void WritePy( VariablePy& variable, const pyArray& array ); + void Advance( ); + void Close( ); void GetEngineType( ) const; diff --git a/bindings/python/src/EnginePy.cpp b/bindings/python/src/EnginePy.cpp index e29b37f6ce8a7b4d5bc00457fb3f0140c8569037..3bb80cdd31b78054c57b9ddc1f6adc7ebc7fd7f8 100644 --- a/bindings/python/src/EnginePy.cpp +++ b/bindings/python/src/EnginePy.cpp @@ -66,18 +66,24 @@ void EnginePy::WritePy( VariablePy& variable, const pyArray& array ) } - - -void EnginePy::GetEngineType( ) const +void EnginePy::Advance( ) { - std::cout << "Engine type " << m_Engine->m_EngineType << "\n"; + } + + void EnginePy::Close( ) { m_Engine->Close( -1 ); } +void EnginePy::GetEngineType( ) const +{ + std::cout << "Engine type " << m_Engine->m_EngineType << "\n"; +} + + } //end namespace diff --git a/bindings/python/src/glueBoostPython.cpp b/bindings/python/src/glueBoostPython.cpp index 4a94464aa3557da29aef5114954be713df55f205..2adc5af2d7f83b27e4ee4e831966a9eac4f7fb11 100644 --- a/bindings/python/src/glueBoostPython.cpp +++ b/bindings/python/src/glueBoostPython.cpp @@ -68,7 +68,8 @@ BOOST_PYTHON_MODULE( ADIOSPy ) //Engine py::class_<adios::EnginePy>("EnginePy", py::no_init ) .def("Write", &adios::EnginePy::WritePy ) - .def( "Close", &adios::EnginePy::Close ) + .def("Advance", &adios::EnginePy::WritePy ) + .def("Close", &adios::EnginePy::Close ) ; } diff --git a/bindings/python/src/gluePyBind11.cpp b/bindings/python/src/gluePyBind11.cpp index ccd727ce6479a9b644697b1f760bc9a7374f5aec..d7ee0456c69307d2ae3cff53255642209fa48e46 100644 --- a/bindings/python/src/gluePyBind11.cpp +++ b/bindings/python/src/gluePyBind11.cpp @@ -55,6 +55,7 @@ PYBIND11_PLUGIN( ADIOSPy ) //Engine py::class_<adios::EnginePy>( m, "Engine") .def("Write", &adios::EnginePy::WritePy ) + .def("Advance", &adios::EnginePy::WritePy ) .def("Close", &adios::EnginePy::Close ) ; diff --git a/include/core/Engine.h b/include/core/Engine.h index 05839734c18c332eb794a6d1d2c2e08ac4e37988..7c6016567cc2b10aad70b0054d538ccb2285d1e3 100644 --- a/include/core/Engine.h +++ b/include/core/Engine.h @@ -172,7 +172,7 @@ public: virtual void Write( const std::string variableName, const std::complex<float>* values ); virtual void Write( const std::string variableName, const std::complex<double>* values ); virtual void Write( const std::string variableName, const std::complex<long double>* values ); - + virtual void Write( const std::string variableName, const void* values ); /** * Indicates that a new step is going to be written as new variables come in. @@ -218,9 +218,6 @@ protected: const bool m_DebugMode = false; ///< true: additional checks, false: by-pass checks unsigned int m_Cores = 1; const std::string m_EndMessage; ///< added to exceptions to improve debugging - - Profiler m_Profiler; ///< engine time and bytes profiler - std::set<std::string> m_WrittenVariables; ///< contains the names of the variables that are being written virtual void Init( ); ///< Initialize m_Capsules and m_Transports, called from constructor diff --git a/include/core/Profiler.h b/include/core/Profiler.h index d8888fd9d855ea33d89366533982116ba38a86b5..ca446ef3a145fc0b0f9e328e1a3f780128efa494 100644 --- a/include/core/Profiler.h +++ b/include/core/Profiler.h @@ -15,21 +15,16 @@ namespace adios { -/** - * Utilities for profiling using the chrono header in C++11 - */ -struct Profiler +class Timer { - class Timer - { - public: - const std::string Process; - unsigned long long int ProcessTime = 0; +public: + const std::string Process; + unsigned long long int ProcessTime = 0; - Timer( const std::string process, const Support::Resolutions resolution ): - Process{ process }, - Resolution{ resolution } + Timer( const std::string process, const Support::Resolutions resolution ): + Process{ process }, + Resolution{ resolution } { } void SetInitialTime( ) @@ -63,14 +58,31 @@ struct Profiler return -1; //failure } - private: + std::string GetUnits( ) const + { + std::string units; + if( Resolution == Support::Resolutions::mus ) units = "mus"; + else if( Resolution == Support::Resolutions::ms ) units = "ms"; + else if( Resolution == Support::Resolutions::s ) units = "s"; + else if( Resolution == Support::Resolutions::m ) units = "m"; + else if( Resolution == Support::Resolutions::h ) units = "h"; + return units; + } + +private: const Support::Resolutions Resolution; std::chrono::time_point<std::chrono::high_resolution_clock> InitialTime; std::chrono::time_point<std::chrono::high_resolution_clock> ElapsedTime; bool InitialTimeSet = false; - }; +}; + +/** + * Utilities for profiling using the chrono header in C++11 + */ +struct Profiler +{ std::vector<Timer> m_Timers; std::vector<unsigned long long int> m_TotalBytes; bool m_IsActive = false; diff --git a/include/core/Transport.h b/include/core/Transport.h index 532b286fa3936c86406080ef78fa7c58c729d427..dc6409e6eb9ae9f8ad984cc038ee10d77dad8957 100644 --- a/include/core/Transport.h +++ b/include/core/Transport.h @@ -83,9 +83,6 @@ public: protected: const bool m_DebugMode = false; ///< if true: additional checks and exceptions - - - }; diff --git a/include/engine/bp/BPFileWriter.h b/include/engine/bp/BPFileWriter.h index af45aa4b09c77e0c89c9068931eb7b11e50c1307..969d54b191d5d27d9d47de986d09d617cd388bc8 100644 --- a/include/engine/bp/BPFileWriter.h +++ b/include/engine/bp/BPFileWriter.h @@ -10,6 +10,7 @@ #include "core/Engine.h" #include "format/BP1Writer.h" +#include "format/BP1Aggregator.h" //supported capsules #include "capsule/heap/STLVector.h" @@ -72,8 +73,6 @@ public: void Write( const std::string variableName, const std::complex<long double>* values ); void Write( const std::string variableName, const void* values ); - - void Advance( ); /** @@ -88,12 +87,14 @@ private: capsule::STLVector m_Buffer; ///< heap capsule using STL std::vector<char> format::BP1Writer m_BP1Writer; ///< format object will provide the required BP functionality to be applied on m_Buffer and m_Transports format::BP1MetadataSet m_MetadataSet; ///< metadata set accompanying the heap buffer data in bp format. Needed by m_BP1Writer + format::BP1Aggregator m_BP1Aggregator; bool m_IsFirstClose = true; ///< set to false after first Close is reached so metadata doesn't have to be accommodated for a subsequent Close std::size_t m_MaxBufferSize; ///< maximum allowed memory to be allocated float m_GrowthFactor = 1.5; ///< capsule memory growth factor, new_memory = m_GrowthFactor * current_memory bool m_TransportFlush = false; ///< true: transport flush happened, buffer must be reset + bool m_CloseProcessGroup = false; ///< set to true if advance is called, this prevents flattening the data and metadata in Close void Init( ); void InitParameters( ); @@ -111,6 +112,9 @@ private: template< class T > void WriteVariableCommon( Variable<T>& variable, const T* values ) { + if( m_MetadataSet.Log.m_IsActive == true ) + m_MetadataSet.Log.m_Timers[0].SetInitialTime(); + //set variable variable.m_AppValues = values; m_WrittenVariables.insert( variable.m_Name ); @@ -139,6 +143,9 @@ private: } variable.m_AppValues = nullptr; //setting pointer to null as not needed after write + + if( m_MetadataSet.Log.m_IsActive == true ) + m_MetadataSet.Log.m_Timers[0].SetTime(); } }; diff --git a/include/format/BP1.h b/include/format/BP1.h index 0019913e888e1da3ddbb1a1992819b4114749eda..2e551ecec359039b984a8f4ff3259bf0eaaa1b8d 100644 --- a/include/format/BP1.h +++ b/include/format/BP1.h @@ -19,6 +19,7 @@ #endif #include "core/Transport.h" +#include "core/Profiler.h" namespace adios @@ -51,6 +52,9 @@ struct BP1MetadataSet //PG (relative) positions in Data buffer to be updated std::size_t DataPGLengthPosition = 0; ///< current PG initial ( relative ) position, needs to be updated in every advance step or init std::size_t DataVarsCountPosition = 0; ///< current PG variable count ( relative ) position, needs to be updated in every advance step or init + bool DataPGIsOpen = false; + + Profiler Log; }; /** diff --git a/include/format/BP1Aggregator.h b/include/format/BP1Aggregator.h index 4817382009fcff97557a41f675dd7bb976ac4d2c..60f1a8efafeb5024ce807dbd5f32dd07e889cc01 100644 --- a/include/format/BP1Aggregator.h +++ b/include/format/BP1Aggregator.h @@ -10,20 +10,48 @@ +#ifdef ADIOS_NOMPI + #include "mpidummy.h" +#else + #include <mpi.h> +#endif + + namespace adios { namespace format { +/** + * Does all MPI related spatial aggregation tasks + */ class BP1Aggregator { +public: + + MPI_Comm m_MPIComm = MPI_COMM_SELF; ///< MPI communicator from Engine + int m_RankMPI = 0; ///< current MPI rank process + int m_SizeMPI = 1; ///< current MPI processes size + /** + * Unique constructor + * @param mpiComm coming from engine + */ + BP1Aggregator( MPI_Comm mpiComm, const bool debugMode = false ); + ~BP1Aggregator( ); + /** + * Function that aggregates and writes (from rank = 0) profiling.log in python dictionary format + * @param rankLog contain rank profiling info to be aggregated + */ + void WriteProfilingLog( const std::string fileName, const std::string& rankLog ); +private: + const bool m_DebugMode = false; }; diff --git a/include/format/BP1Writer.h b/include/format/BP1Writer.h index 51fbd6d1dd8ce817549e61c039cd4dc9ba29908a..32521f4985e56e3e34b14d872836e79ac463994b 100644 --- a/include/format/BP1Writer.h +++ b/include/format/BP1Writer.h @@ -19,6 +19,7 @@ #include "BP1.h" #include "core/Variable.h" #include "core/Capsule.h" +#include "core/Profiler.h" #include "capsule/heap/STLVector.h" #include "functions/adiosTemplates.h" #include "functions/adiosFunctions.h" @@ -326,19 +327,27 @@ public: void Advance( BP1MetadataSet& metadataSet, capsule::STLVector& buffer ); - /** * Function that sets metadata (if first close) and writes to a single transport * @param metadataSet current rank metadata set * @param buffer contains data * @param transport does a write after data and metadata is setup * @param isFirstClose true: metadata has been set and aggregated - * @param haveMetadata true: attach metadata buffer to each data buffer and do a transport write - * @param haveTiming true: add timing.log file + * @param doAggregation true: for N-to-M, false: for N-to-N */ void Close( BP1MetadataSet& metadataSet, capsule::STLVector& buffer, Transport& transport, bool& isFirstClose, - const bool haveMetadata = true, const bool haveTiming = false ) const noexcept; + const bool doAggregation ) const noexcept; + + /** + * Writes the ADIOS log information (buffering, open, write and close) for a rank process + * @param rank current rank + * @param metadataSet contains Profile info for buffering + * @param transports contains Profile info for transport open, writes and close + * @return string for this rank that will be aggregated into profiling.log + */ + std::string GetRankProfilingLog( const int rank, const BP1MetadataSet& metadataSet, + const std::vector< std::shared_ptr<Transport> >& transports ) const noexcept; private: diff --git a/include/mpidummy.h b/include/mpidummy.h index b287c43f8017219fafe299f61a4663fdad7e3f54..619a50ad9d554b5d06f4eca0352942934dfe5fb8 100644 --- a/include/mpidummy.h +++ b/include/mpidummy.h @@ -30,6 +30,7 @@ namespace adios typedef int MPI_Comm; typedef uint64_t MPI_Status; +typedef uint64_t MPI_Request; typedef int MPI_File; typedef int MPI_Info; typedef int MPI_Datatype; /* Store the byte size of a type in such vars */ @@ -87,7 +88,9 @@ int MPI_Scatter(void *sendbuf, int sendcnt, MPI_Datatype sendtype, void *recvbuf int MPI_Scatterv(void *sendbuf, int *sendcnts, int *displs, MPI_Datatype sendtype, void *recvbuf, int recvcnt, MPI_Datatype recvtype, int root, MPI_Comm comm); int MPI_Recv( void *recvbuffer, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Status* status ); +int MPI_Irecv( void *recvbuffer, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Request* request ); int MPI_Send( void *sendbuffer, int count, MPI_Datatype type, int destination, int tag, MPI_Comm comm ); +int MPI_Isend( void *recvbuffer, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Request* request ); int MPI_File_open(MPI_Comm comm, char *filename, int amode, MPI_Info info, MPI_File *fh); int MPI_File_close(MPI_File *fh); diff --git a/include/transport/file/FileDescriptor.h b/include/transport/file/FileDescriptor.h index 5555c450c61f9cd25f2e7516818bf7696f2d83f7..0e199659c9861b16861deda4387d657a49d4b0ae 100644 --- a/include/transport/file/FileDescriptor.h +++ b/include/transport/file/FileDescriptor.h @@ -1,5 +1,5 @@ /* - * FileDescriptor.h + * FileDescriptor.h uses POSIX as the underlying library * * Created on: Oct 6, 2016 * Author: wfg @@ -35,7 +35,6 @@ public: void Close( ); - private: int m_FileDescriptor = -1; ///< file descriptor returned by POSIX open @@ -44,7 +43,6 @@ private: - } //end namespace transport } //end namespace #endif /* FILEDESCRIPTOR_H_ */ diff --git a/src/core/Engine.cpp b/src/core/Engine.cpp index 76445c1d575406def3eef4827e919367ee6bd70a..feb562b92c60d559cf22dad7b8489397d59a0d7a 100644 --- a/src/core/Engine.cpp +++ b/src/core/Engine.cpp @@ -80,6 +80,7 @@ void Engine::Write( const std::string variableName, const long double* values ){ void Engine::Write( const std::string variableName, const std::complex<float>* values ){ } void Engine::Write( const std::string variableName, const std::complex<double>* values ){ } void Engine::Write( const std::string variableName, const std::complex<long double>* values ){ } +void Engine::Write( const std::string variableName, const void* values ){ } void Engine::Advance(){ } diff --git a/src/engine/bp/BPFileWriter.cpp b/src/engine/bp/BPFileWriter.cpp index e70a72e57756ab1876507db68b0d19cddb51ab11..85fce4ace4aac55d5a75c7b30a6ac040ed381cb4 100644 --- a/src/engine/bp/BPFileWriter.cpp +++ b/src/engine/bp/BPFileWriter.cpp @@ -21,6 +21,7 @@ BPFileWriter::BPFileWriter( ADIOS& adios, const std::string name, const std::str const Method& method, const bool debugMode, const unsigned int cores ): Engine( adios, "BPFileWriter", name, accessMode, mpiComm, method, debugMode, cores, " BPFileWriter constructor (or call to ADIOS Open).\n" ), m_Buffer{ capsule::STLVector( accessMode, m_RankMPI, m_DebugMode ) }, + m_BP1Aggregator{ format::BP1Aggregator( m_MPIComm, debugMode ) }, m_MaxBufferSize{ m_Buffer.m_Data.max_size() } { Init( ); @@ -138,7 +139,7 @@ void BPFileWriter::Write( const std::string variableName, const std::complex<dou void BPFileWriter::Write( const std::string variableName, const std::complex<long double>* values ) { WriteVariableCommon( m_ADIOS.GetVariable<std::complex<long double>>( variableName ), values ); } -void BPFileWriter::Write( const std::string variableName, const void* values ) +void BPFileWriter::Write( const std::string variableName, const void* values ) //Compound type { } @@ -154,12 +155,33 @@ void BPFileWriter::Close( const int transportIndex ) if( transportIndex == -1 ) { for( auto& transport : m_Transports ) //by reference or value or it doesn't matter? - m_BP1Writer.Close( m_MetadataSet, m_Buffer, *transport, m_IsFirstClose ); + m_BP1Writer.Close( m_MetadataSet, m_Buffer, *transport, m_IsFirstClose, false ); //false: not using aggregation for now } else { - m_BP1Writer.Close( m_MetadataSet, m_Buffer, *m_Transports[transportIndex], m_IsFirstClose ); + m_BP1Writer.Close( m_MetadataSet, m_Buffer, *m_Transports[transportIndex], m_IsFirstClose, false ); //false: not using aggregation for now } + + if( m_MetadataSet.Log.m_IsActive == true ) + { + bool allClose = true; + for( auto& transport : m_Transports ) + { + if( transport->m_IsOpen == true ) + { + allClose = false; + break; + } + } + if( allClose == true ) //aggregate and write profiling.log + { + const std::string rankLog = m_BP1Writer.GetRankProfilingLog( m_RankMPI, m_MetadataSet, m_Transports ); + + + + } + } + } @@ -204,30 +226,32 @@ void BPFileWriter::InitParameters( ) m_BP1Writer.m_Verbosity = verbosity; } - auto itProfile = m_Method.m_Parameters.find( "profile_buffering_units" ); + auto itProfile = m_Method.m_Parameters.find( "profile_units" ); if( itProfile != m_Method.m_Parameters.end() ) { + auto& profiler = m_MetadataSet.Log; + if( itProfile->second == "mus" || itProfile->second == "microseconds" ) - m_Profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::mus ); + profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::mus ); else if( itProfile->second == "ms" || itProfile->second == "milliseconds" ) - m_Profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::ms ); + profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::ms ); else if( itProfile->second == "s" || itProfile->second == "seconds" ) - m_Profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::s ); + profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::s ); else if( itProfile->second == "min" || itProfile->second == "minutes" ) - m_Profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::m ); + profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::m ); else if( itProfile->second == "h" || itProfile->second == "hours" ) - m_Profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::h ); + profiler.m_Timers.emplace_back( "Buffering", Support::Resolutions::h ); else { if( m_DebugMode == true ) throw std::invalid_argument( "ERROR: Method profile_buffering_units argument must be mus, ms, s, min or h, in call to Open or Engine constructor\n" ); } - m_Profiler.m_IsActive = true; + profiler.m_IsActive = true; } } @@ -327,11 +351,19 @@ void BPFileWriter::InitTransports( ) void BPFileWriter::InitProcessGroup( ) { + if( m_MetadataSet.Log.m_IsActive == true ) + m_MetadataSet.Log.m_Timers[0].SetInitialTime(); + if( m_AccessMode == "a" ) { //Get last pg timestep and update timestep counter in format::BP1MetadataSet } + WriteProcessGroupIndex( ); + m_MetadataSet.DataPGIsOpen = true; + + if( m_MetadataSet.Log.m_IsActive == true ) + m_MetadataSet.Log.m_Timers[0].SetTime(); } diff --git a/src/format/BP1Aggregator.cpp b/src/format/BP1Aggregator.cpp new file mode 100644 index 0000000000000000000000000000000000000000..bb2328ff305d4e1f7a7cc7bf328e29bbee2dcb64 --- /dev/null +++ b/src/format/BP1Aggregator.cpp @@ -0,0 +1,95 @@ +/* + * BP1Aggregator.cpp + * + * Created on: Mar 21, 2017 + * Author: wfg + */ + +#include <vector> +#include <fstream> + +#include "format/BP1Aggregator.h" + + +namespace adios +{ +namespace format +{ + + +BP1Aggregator::BP1Aggregator( MPI_Comm mpiComm, const bool debugMode ): + m_MPIComm{ mpiComm }, + m_DebugMode{ debugMode } +{ + MPI_Comm_rank( m_MPIComm, &m_RankMPI ); + MPI_Comm_size( m_MPIComm, &m_SizeMPI ); +} + + +BP1Aggregator::~BP1Aggregator( ) +{ } + + + +void BP1Aggregator::WriteProfilingLog( const std::string fileName, const std::string& rankLog ) +{ + if( m_RankMPI == 0 ) + { + std::vector< std::vector<char> > rankLogs( m_SizeMPI - 1 ); //rankLogs from other processes + + //first receive sizes + unsigned int sizeMPI = static_cast<unsigned int>( m_SizeMPI ); + + for( unsigned int i = 1; i < sizeMPI; ++i ) + { + int size = -1; + MPI_Request request; + MPI_Irecv( &size, 1, MPI_INT, i, 0, m_MPIComm, &request ); + + if( m_DebugMode == true ) + { + if( size == -1 ) + throw std::runtime_error( "ERROR: couldn't get size from rank " + std::to_string(i) + ", in ADIOS aggregator for Profiling.log\n" ); + //here check request + } + rankLogs[i-1].resize( size ); //allocate with zeros + } + //receive rankLog from other ranks + for( unsigned int i = 1; i < sizeMPI; ++i ) + { + int size = static_cast<int>( rankLogs[i-1].size() ); + MPI_Request request; + MPI_Irecv( rankLogs[i-1].data(), size, MPI_CHAR, i, 0, m_MPIComm, &request ); + + if( m_DebugMode == true ) + { + //here check request + } + } + //write file + std::string logFile( "log = { " + rankLog + "\n" ); + for( unsigned int i = 1; i < sizeMPI; ++i ) + { + std::string rankLogStr( rankLogs[i-1].data() ); + logFile += rankLogStr + "\n"; + } + logFile += " }\n"; + + std::ofstream logStream( fileName ); + logStream.write( logFile.c_str(), logFile.size() ); + logStream.close(); + } + else + { + int size = static_cast<int>( rankLog.size() ); + MPI_Request request; + MPI_Isend( &size, 1, MPI_INT, 0, 0, m_MPIComm, &request ); + MPI_Isend( const_cast<char*>( rankLog.c_str() ), size, MPI_CHAR, 0, 0, m_MPIComm, &request ); + } +} + + + +} //end namespace format +} //end namespace adios + diff --git a/src/format/BP1Writer.cpp b/src/format/BP1Writer.cpp index 11af2e25429003cc77b8aa16595326747a3e9021..2ad022f860b346623c309e4e46702622251576fd 100644 --- a/src/format/BP1Writer.cpp +++ b/src/format/BP1Writer.cpp @@ -5,8 +5,13 @@ * Author: wfg */ +/// \cond EXCLUDE_FROM_DOXYGEN +#include <string> +/// \endcond #include "format/BP1Writer.h" +#include "core/Profiler.h" + @@ -105,25 +110,70 @@ void BP1Writer::Advance( BP1MetadataSet& metadataSet, capsule::STLVector& buffer void BP1Writer::Close( BP1MetadataSet& metadataSet, capsule::STLVector& buffer, Transport& transport, bool& isFirstClose, - const bool haveMetadata, const bool haveTiming ) const noexcept + const bool doAggregation ) const noexcept { + if( metadataSet.Log.m_IsActive == true ) + metadataSet.Log.m_Timers[0].SetInitialTime(); + if( isFirstClose == true ) { - FlattenData( metadataSet, buffer ); + if( metadataSet.DataPGIsOpen == true ) + { + FlattenData( metadataSet, buffer ); + } + FlattenMetadata( metadataSet, buffer ); + + if( metadataSet.Log.m_IsActive == true ) + metadataSet.Log.m_Timers[0].SetInitialTime(); + + if( doAggregation == true ) //N-to-M where 1 <= M <= N-1, might need a new Log metadataSet.Log.m_Timers just for aggregation + { + //here call aggregator + } + isFirstClose = false; } - //implementing N-to-N for now, no aggregation - transport.Write( buffer.m_Data.data(), buffer.m_DataPosition ); //single write + if( doAggregation == true ) //N-to-M where 1 <= M <= N-1 + { + //here call aggregator to select transports for Write and Close + } + else // N-to-N + { + transport.Write( buffer.m_Data.data(), buffer.m_DataPosition ); //single write + transport.Close(); + } +} + + +std::string BP1Writer::GetRankProfilingLog( const int rank, const BP1MetadataSet& metadataSet, + const std::vector< std::shared_ptr<Transport> >& transports ) const noexcept +{ + auto lf_WriterTimer = []( const std::string name, const Timer& timer, std::string rankLog ) + { + rankLog += "'" + name + "_" + timer.GetUnits() + "': " + std::to_string( timer.ProcessTime ) + ", "; + }; + + //prepare string dictionary per rank + std::string rankLog( "'rank_" + std::to_string( rank ) + "': { " ); + auto& profiler = metadataSet.Log; + rankLog += "'totalBytes': " + std::to_string( profiler.m_TotalBytes[0] ) + ", "; + lf_WriterTimer( "t_buffering", profiler.m_Timers[0], rankLog ); - if( haveMetadata == true ) + for( unsigned int t = 0; t < transports.size(); ++t ) { - //here call aggregator - } + auto& timers = transports[t]->m_Profiler.m_Timers; - transport.Close(); + rankLog += "'transport_" + std::to_string(t) + "': { "; + lf_WriterTimer( "t_open", timers[0], rankLog ); + lf_WriterTimer( "t_write", timers[1], rankLog ); + lf_WriterTimer( "t_close", timers[2], rankLog ); + rankLog += " }, "; + } + rankLog += "}, "; + return rankLog; } @@ -259,6 +309,12 @@ void BP1Writer::FlattenMetadata( BP1MetadataSet& metadataSet, capsule::STLVector } buffer.m_DataAbsolutePosition += metadataSize; + + if( metadataSet.Log.m_IsActive == true ) + { + metadataSet.Log.m_TotalBytes.push_back( buffer.m_DataAbsolutePosition ); + } + } diff --git a/src/mpidummy.cpp b/src/mpidummy.cpp index ef00b2099bf51f3272bbcb55c299d1301da7fdf1..52fd646e4c02b8cfe2c5e60099f1f0d5a0dd9f23 100644 --- a/src/mpidummy.cpp +++ b/src/mpidummy.cpp @@ -168,9 +168,16 @@ int MPI_Scatterv( void *sendbuf, int *sendcnts, int *displs, int MPI_Recv( void *recvbuffer, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Status* status ) { return 0; } +int MPI_Irecv( void *recvbuffer, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Request* request ) +{ return 0; } + int MPI_Send( void *sendbuffer, int count, MPI_Datatype type, int destination, int tag, MPI_Comm comm ) { return 0; } +int MPI_Isend( void *recvbuffer, int count, MPI_Datatype type, int source, int tag, MPI_Comm comm, MPI_Request* request ) +{ return 0; } + + int MPI_File_open(MPI_Comm comm, char *filename, int amode, MPI_Info info, MPI_File *fh) { *fh = open64 (filename, amode); diff --git a/src/transport/file/FileDescriptor.cpp b/src/transport/file/FileDescriptor.cpp index cf90c938036dfb8f023cd12960578503b1d74f81..0128efeaf5fe3233b743f07c3d237044994fd4f1 100644 --- a/src/transport/file/FileDescriptor.cpp +++ b/src/transport/file/FileDescriptor.cpp @@ -59,7 +59,7 @@ void FileDescriptor::Open( const std::string name, const std::string accessMode if( m_Profiler.m_IsActive == true ) m_Profiler.m_Timers[0].SetInitialTime(); - m_FileDescriptor = open( m_Name.c_str(), O_WRONLY | O_APPEND ); + m_FileDescriptor = open( m_Name.c_str(), O_WRONLY | O_APPEND ); //we need to change this if( m_Profiler.m_IsActive == true ) m_Profiler.m_Timers[0].SetTime(); @@ -125,6 +125,8 @@ void FileDescriptor::Close( ) throw std::ios_base::failure( "ERROR: couldn't close file " + m_Name + ", in call to POSIX write\n" ); } + + m_IsOpen = false; } diff --git a/src/transport/file/FilePointer.cpp b/src/transport/file/FilePointer.cpp index ce3088ebbf57128e3c74a91fb9fc236595a2484b..9340664b31bb363f8ad0289a610664ba63875306 100644 --- a/src/transport/file/FilePointer.cpp +++ b/src/transport/file/FilePointer.cpp @@ -89,6 +89,8 @@ void FilePointer::Flush( ) void FilePointer::Close( ) { fclose( m_File ); + + m_IsOpen = false; }