From 9867bb539c5d59b73266634d5284eedff5b5bb78 Mon Sep 17 00:00:00 2001 From: wfg <wfg@pc0098504.ornl.gov> Date: Thu, 5 Jan 2017 17:52:08 -0500 Subject: [PATCH] Working on SingleBP Engine Understanding BP file format and each record. To do: Move Metadata out of FStream transport. Keep it simple. --- Makefile | 3 +- Makefile.libs | 20 +++--- examples/dataman/datamanNoXML.cpp | 12 ++-- include/capsule/Heap.h | 2 +- include/core/Capsule.h | 12 +++- include/core/Engine.h | 15 +++-- include/core/Method.h | 1 - include/core/Transport.h | 33 +++++++--- include/engine/Single.h | 49 -------------- include/engine/SingleBP.h | 69 ++++++++++++++++++++ include/transport/FStream.h | 15 +++-- include/transport/MPIFile.h | 55 ++++++++++++++++ src/capsule/Heap.cpp | 4 +- src/core/Capsule.cpp | 10 +-- src/core/Engine.cpp | 48 +++++--------- src/core/Transport.cpp | 14 +++- src/engine/Single.cpp | 34 ---------- src/engine/SingleBP.cpp | 105 ++++++++++++++++++++++++++++++ src/transport/FStream.cpp | 91 +++++++++++++++++++++----- 19 files changed, 413 insertions(+), 179 deletions(-) delete mode 100644 include/engine/Single.h create mode 100644 include/engine/SingleBP.h create mode 100644 include/transport/MPIFile.h delete mode 100644 src/engine/Single.cpp create mode 100644 src/engine/SingleBP.cpp diff --git a/Makefile b/Makefile index 3e048587b..43b8bf1e3 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -# Makefile for testing purposes, will build libadios.a +# Makefile for testing purposes, will build libadios.a, libadios_nompi.a # Created on: Oct 4, 2016 # Author: wfg @@ -30,6 +30,7 @@ OBJMPI:=$(patsubst %.cpp, ./bin/mpi/%.o, $(notdir $(CPPFiles)) ) OBJMPI:=$(patsubst ./bin/mpi/mpidummy.o, ,$(OBJMPI) ) #remove mpidummy from compilation OBJNoMPI:=$(patsubst %.cpp, ./bin/nompi/%.o, $(notdir $(CPPFiles)) ) +OBJNoMPI:=$(patsubst ./bin/nompi/MPIFile.o, ,$(OBJNoMPI) ) #remove MPIFile from compilation (not supported in serial) .PHONY: all clean mpi nompi diff --git a/Makefile.libs b/Makefile.libs index 7547da289..5e075454b 100644 --- a/Makefile.libs +++ b/Makefile.libs @@ -17,16 +17,16 @@ ifeq ($(HAVE_NETCDF),yes) LIBS += -lnetcdf CFLAGS += -DHAVE_NETCDF else - HFiles:=$(filter-out $(TRANSPORT_INC)/CNetCDF4.h,$(HFiles)) - CPPFiles:=$(filter-out $(TRANSPORT_SRC)/CNetCDF4.cpp,$(CPPFiles)) + HFiles:=$(filter-out $(TRANSPORT_INC)/NetCDF4.h,$(HFiles)) + CPPFiles:=$(filter-out $(TRANSPORT_SRC)/NetCDF4.cpp,$(CPPFiles)) endif ifeq ($(HAVE_PHDF5),yes) LIBS += -lhdf5 CFLAGS += -DHAVE_PHDF5 else - HFiles:=$(filter-out $(TRANSPORT_INC)/CPHDF5.h,$(HFiles)) - CPPFiles:=$(filter-out $(TRANSPORT_SRC)/CPHDF5.cpp,$(CPPFiles)) + HFiles:=$(filter-out $(TRANSPORT_INC)/PHDF5.h,$(HFiles)) + CPPFiles:=$(filter-out $(TRANSPORT_SRC)/PHDF5.cpp,$(CPPFiles)) endif @@ -38,22 +38,22 @@ ifeq ($(HAVE_BZIP2),yes) LIBS += -lbz2 CFLAGS += -DHAVE_BZIP2 else - HFiles:=$(filter-out $(TRANSFORM_INC)/CBZIP2.h,$(HFiles)) - CPPFiles:=$(filter-out $(TRANSFORM_SRC)/CBZIP2.cpp,$(CPPFiles)) + HFiles:=$(filter-out $(TRANSFORM_INC)/BZIP2.h,$(HFiles)) + CPPFiles:=$(filter-out $(TRANSFORM_SRC)/BZIP2.cpp,$(CPPFiles)) endif ifeq ($(HAVE_SZIP),yes) LIBS += -lsz CFLAGS += -DHAVE_SZIP else - HFiles:=$(filter-out $(TRANSFORM_INC)/CSZIP.h,$(HFiles)) - CPPFiles:=$(filter-out $(TRANSFORM_SRC)/CSZIP.cpp,$(CPPFiles)) + HFiles:=$(filter-out $(TRANSFORM_INC)/SZIP.h,$(HFiles)) + CPPFiles:=$(filter-out $(TRANSFORM_SRC)/SZIP.cpp,$(CPPFiles)) endif ifeq ($(HAVE_ZLIB),yes) LIBS += -lz CFLAGS += -DHAVE_ZLIB else - HFiles:=$(filter-out $(TRANSFORM_INC)/CZLIB.h,$(HFiles)) - CPPFiles:=$(filter-out $(TRANSFORM_SRC)/CZLIB.cpp,$(CPPFiles)) + HFiles:=$(filter-out $(TRANSFORM_INC)/ZLIB.h,$(HFiles)) + CPPFiles:=$(filter-out $(TRANSFORM_SRC)/ZLIB.cpp,$(CPPFiles)) endif diff --git a/examples/dataman/datamanNoXML.cpp b/examples/dataman/datamanNoXML.cpp index 4bf62bcdc..8935049c1 100644 --- a/examples/dataman/datamanNoXML.cpp +++ b/examples/dataman/datamanNoXML.cpp @@ -9,12 +9,12 @@ #ifdef HAVE_MPI #include <mpi.h> #else - #include "../../include/mpidummy.h" + #include "mpidummy.h" using adios::MPI_Init; using adios::MPI_Comm_rank; #endif -#include "../../include/ADIOS.h" +#include "ADIOS.h" int main( int argc, char* argv [] ) @@ -38,10 +38,12 @@ int main( int argc, char* argv [] ) adios.DefineVariable( groupTCP, "myCharsSize", "unsigned int" ); //scalar : group, name, type adios.DefineVariable( groupTCP, "myChars", "char", "myCharsSize" ); //group, name, type, integer variables defining dimensions + //Define method or engine + //Open stream using two transports, DataMan is default, POSIX is an additional one - const std::string streamTCP( "TCP" ); - adios.Open( streamTCP, "write", "DataMan" ); //here open a stream called TCPStream for writing (w or write), name is the same as stream - adios.AddTransport( streamTCP, "write", "POSIX", "name=TCP.bp" ); //add POSIX transport with .bp name + const std::string streamTCP( "TCP.bp" ); + const unsigned int tcpHandle = adios.Open( streamTCP, "w", "SingleBP" ); //here open a stream called TCPStream for writing (w or write), name is the same as stream + adios.AddTransport( , "write", "POSIX", "name=TCP.bp" ); //add POSIX transport with .bp name adios.SetMaxBufferSize( streamTCP, 1000000000 ); // Setting max Buffer size to 1Gb //Writing diff --git a/include/capsule/Heap.h b/include/capsule/Heap.h index 8eea88849..b5085d125 100644 --- a/include/capsule/Heap.h +++ b/include/capsule/Heap.h @@ -30,7 +30,7 @@ public: * @param dataSize maximum data size set by user * @param metadataSize maximum metadata size set by user */ - Heap( const std::string accessMode, const int rankMPI, const unsigned int cores = 1 ); + Heap( const std::string accessMode, const int rankMPI, const bool debugMode = false, const unsigned int cores = 1 ); ~Heap( ); diff --git a/include/core/Capsule.h b/include/core/Capsule.h index 85a2a8ef8..9f72f064a 100644 --- a/include/core/Capsule.h +++ b/include/core/Capsule.h @@ -26,8 +26,6 @@ public: const std::string m_Type; ///< buffer type const std::string m_AccessMode; ///< 'w': write, 'r': read, 'a': append - const int m_RankMPI; ///< current MPI rank - const unsigned int m_Cores = 1; ///< number of cores for threaded operations /** * Base class constructor providing type from derived class and accessMode @@ -37,7 +35,8 @@ public: * @param cores if using threads */ Capsule( const std::string type, const std::string accessMode, const int rankMPI, - const unsigned int cores ); + const bool debugMode, const unsigned int cores ); + virtual ~Capsule( ); @@ -84,6 +83,13 @@ public: virtual void WriteMetadata( const std::size_t first, const double* metadata, const std::size_t size ) = 0; virtual void WriteMetadata( const std::size_t first, const long double* metadata, const std::size_t size ) = 0; + +private: + + const int m_RankMPI; ///< current MPI rank + const bool m_DebugMode = false; ///< true: extra checks + const unsigned int m_Cores = 1; ///< number of cores for threaded operations + }; diff --git a/include/core/Engine.h b/include/core/Engine.h index 2c2ce7d99..26c938b28 100644 --- a/include/core/Engine.h +++ b/include/core/Engine.h @@ -50,7 +50,7 @@ public: const std::string m_EngineType; ///< from derived class const std::string m_Name; ///< name used for this engine const std::string m_AccessMode; ///< accessMode for buffers used by this engine - Method* m_Method = nullptr; ///< associated method containing engine metadata + const Method& m_Method; ///< associated method containing engine metadata Group* m_Group = nullptr; ///< associated group to look for variable information int m_RankMPI = 0; ///< current MPI rank process @@ -65,7 +65,7 @@ public: * @param method */ Engine( const std::string engineType, const std::string name, const std::string accessMode, const MPI_Comm mpiComm, - const Method& method, const bool debugMode ); + const Method& method, const bool debugMode = false, const unsigned int cores = 1 ); virtual ~Engine( ); @@ -116,12 +116,14 @@ public: protected: - std::vector< std::shared_ptr<Transport> > m_Transports; ///< transports managed std::vector< std::shared_ptr<Capsule> > m_Capsules; ///< managed Capsules - const bool m_DebugMode = false; + std::vector< std::shared_ptr<Transport> > m_Transports; ///< transports managed + const bool m_DebugMode = false; ///< true: additional checks, false: by-pass checks + unsigned int m_Cores = 1; - virtual void SetCapsules( ); ///< Initialize transports from Method, called from constructor. - virtual void SetTransports( ); ///< Initialize transports from Method, called from constructor. + virtual void Init( ); ///< Initialize m_Capsules and m_Transports, called from constructor + virtual void InitCapsules( ); ///< Initialize transports from Method, called from Init in constructor. + virtual void InitTransports( ); ///< Initialize transports from Method, called from Init in constructor. /** * Performs preliminary checks before writing a variable. Throws an exception if checks fail. @@ -136,7 +138,6 @@ protected: const std::set<std::string>& types, const std::string hint ) const; - std::string GetName( const std::vector<std::string>& arguments ) const; //might move this to adiosFunctions }; diff --git a/include/core/Method.h b/include/core/Method.h index e97e68b23..e042cef0a 100644 --- a/include/core/Method.h +++ b/include/core/Method.h @@ -23,7 +23,6 @@ struct Method std::string Name; ///< Method name std::vector< std::string > Capsules; ///< Capsule type std::map< std::string, std::vector<std::string> > Transports; ///< key: transports, value: arguments to Transport - std::string Group; ///< Associated group name to this engine }; diff --git a/include/core/Transport.h b/include/core/Transport.h index 9097768ab..0637d0ef7 100644 --- a/include/core/Transport.h +++ b/include/core/Transport.h @@ -31,15 +31,15 @@ class Transport public: const std::string m_Type; ///< transport type from derived class + const std::string m_Name; ///< from Open + const std::string m_AccessMode; ///< from Open + #ifdef HAVE_MPI MPI_Comm m_MPIComm = NULL; ///< only used as reference to MPI communicator passed from parallel constructor, MPI_Comm is a pointer itself. Public as called from C #else MPI_Comm m_MPIComm = 0; ///< only used as reference to MPI communicator passed from parallel constructor, MPI_Comm is a pointer itself. Public as called from C #endif - //const unsigned int m_Iterations; ///< iteration number for this transport - const bool m_DebugMode; ///< if true: additional checks and exceptions - int m_MPIRank = 0; ///< current MPI rank process int m_MPISize = 1; ///< current MPI processes size @@ -56,23 +56,40 @@ public: /** * Open Output file accesing a mode - * @param streamName name of stream or file + * @param name name of stream or file * @param accessMode r or read, w or write, a or append */ - virtual void Open( const std::string streamName, const std::string accessMode ) = 0; + virtual void Open( const std::string name, const std::string accessMode ) = 0; + + /** + * Set buffer and size for a particular transport + * @param buffer raw data buffer + * @param size raw data buffer size + */ + virtual void SetBuffer( char* buffer, std::size_t size ); + + /** + * Write function for a transport + * @param buffer pointer to buffer to be written + * @param size size of buffer to be written + */ + virtual void Write( const char* buffer, const std::size_t size ) = 0; /** - * Write function for a transport, only called if required + * Some transports separate the data from the metadata in a different medium * @param buffer + * @param size */ - virtual void Write( const Capsule& capsule ); + virtual void WriteMetadata( const char* buffer, const std::size_t size ); + virtual void Flush( ); ///< flushes current contents to physical medium without closing the transport - virtual void Close( ) = 0; ///< closes current transport and flushes everything, can't be reachable after this call + virtual void Close( ); ///< closes current transport and flushes everything, transport becomes unreachable protected: + const bool m_DebugMode = false; ///< if true: additional checks and exceptions /** * Initialize particular derived transport class members * @param arguments particular transport arguments from ADIOS Open variadic function diff --git a/include/engine/Single.h b/include/engine/Single.h deleted file mode 100644 index ff60729c1..000000000 --- a/include/engine/Single.h +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Single.h - * - * Created on: Dec 16, 2016 - * Author: wfg - */ - -#ifndef SINGLE_H_ -#define SINGLE_H_ - -#include "core/Engine.h" - - -namespace adios -{ - - -class Single : public Engine -{ - -public: - - /** - * Constructor for single capsule engine - * @param streamName - * @param accessMode - * @param mpiComm - * @param method - * @param debugMode - */ - Single( const std::string streamName, const std::string accessMode, const MPI_Comm mpiComm, const Method& method, - const bool debugMode = false ); - - - ~Single( ); - -}; - - - - - - - - -} //end namespace - - -#endif /* SINGLE_H_ */ diff --git a/include/engine/SingleBP.h b/include/engine/SingleBP.h new file mode 100644 index 000000000..4e3d08e7d --- /dev/null +++ b/include/engine/SingleBP.h @@ -0,0 +1,69 @@ +/* + * SingleBP.h + * + * Created on: Dec 16, 2016 + * Author: wfg + */ + +#ifndef SINGLEBP_H_ +#define SINGLEBP_H_ + +#include "core/Engine.h" + + +namespace adios +{ + + +class SingleBP : public Engine +{ + +public: + + /** + * Constructor for single BP capsule engine, writes in BP format into a single heap capsule + * @param name unique name given to the engine + * @param accessMode + * @param mpiComm + * @param method + * @param debugMode + */ + SingleBP( const std::string name, const std::string accessMode, const MPI_Comm mpiComm, + const Method& method, const bool debugMode = false, const unsigned int cores = 1 ); + + ~SingleBP( ); + + void Write( Group& group, const std::string variableName, const char* values ); + void Write( Group& group, const std::string variableName, const unsigned char* values ); + void Write( Group& group, const std::string variableName, const short* values ); + void Write( Group& group, const std::string variableName, const unsigned short* values ); + void Write( Group& group, const std::string variableName, const int* values ); + void Write( Group& group, const std::string variableName, const unsigned int* values ); + void Write( Group& group, const std::string variableName, const long int* values ); + void Write( Group& group, const std::string variableName, const unsigned long int* values ); + void Write( Group& group, const std::string variableName, const long long int* values ); + void Write( Group& group, const std::string variableName, const unsigned long long int* values ); + void Write( Group& group, const std::string variableName, const float* values ); + void Write( Group& group, const std::string variableName, const double* values ); + void Write( Group& group, const std::string variableName, const long double* values ); + + +private: + + void Init( ); + void InitCapsules( ); + void InitTransports( ); + +}; + + + + + + + + +} //end namespace + + +#endif /* SINGLEBP_H_ */ diff --git a/include/transport/FStream.h b/include/transport/FStream.h index 7f8916571..a99000e38 100644 --- a/include/transport/FStream.h +++ b/include/transport/FStream.h @@ -29,17 +29,22 @@ public: ~FStream( ); - void Open( const std::string streamName, const std::string accessMode ); + void Open( const std::string name, const std::string accessMode ); - void SetBuffer( std::vector<char>& buffer ); + void SetBuffer( char* buffer, std::size_t size ); - void Write( const Capsule& capsule ); + void Write( const char* buffer, std::size_t size ); - void Close( const Capsule& capsule ); + void Flush( ); + + void Close( ); private: - std::fstream m_FStream; ///< file stream corresponding to this transport + std::fstream m_Data; ///< file stream under name.bp.dir/name.bp.rank + + bool m_HasMetadataFile = false; ///< true if metadata file is defined in arguments as have_metadata_file=1 + std::fstream m_Metadata; ///< file stream under name.bp storing metadata void Init( const std::vector<std::string>& arguments ); diff --git a/include/transport/MPIFile.h b/include/transport/MPIFile.h new file mode 100644 index 000000000..50e0055c9 --- /dev/null +++ b/include/transport/MPIFile.h @@ -0,0 +1,55 @@ +/* + * MPIFile.h + * + * Created on: Jan 5, 2017 + * Author: wfg + */ + +#ifndef MPIFILE_H_ +#define MPIFILE_H_ + +#include <mpi.h> + + +namespace adios +{ + +/** + * Class that defines a transport method using C++ file streams + */ +class MPIFile : public Transport +{ + +public: + + MPIFile( MPI_Comm mpiComm, const bool debugMode, const std::vector<std::string>& arguments ); + + ~MPIFile( ); + + void Open( const std::string streamName, const std::string accessMode ); + + void SetBuffer( char* buffer, std::size_t size ); + + void Write( const char* buffer, std::size_t size ); + + void Flush( ); + + void Close( ); + + +private: + + MPI_File m_MPIFile; + +}; + + + + +} //end namespace + + + + + +#endif /* MPIFILE_H_ */ diff --git a/src/capsule/Heap.cpp b/src/capsule/Heap.cpp index cbc2e64e5..c858a4c4a 100644 --- a/src/capsule/Heap.cpp +++ b/src/capsule/Heap.cpp @@ -14,8 +14,8 @@ namespace adios { -Heap::Heap( const std::string accessMode, const int rankMPI, const unsigned int cores ): - Capsule( "Heap", accessMode, rankMPI, cores ) +Heap::Heap( const std::string accessMode, const int rankMPI, const bool debugMode, const unsigned int cores ): + Capsule( "Heap", accessMode, rankMPI, debugMode, cores ) { m_Data.reserve( 16777216 ); //default capacity = 16Mb m_Metadata.reserve( 102400 ); //default capacity = 100Kb diff --git a/src/core/Capsule.cpp b/src/core/Capsule.cpp index 324f2eb99..55ee80985 100644 --- a/src/core/Capsule.cpp +++ b/src/core/Capsule.cpp @@ -14,10 +14,12 @@ namespace adios Capsule::Capsule( const std::string type, const std::string accessMode, const int rankMPI, - const unsigned int cores ): + const bool debugMode, const unsigned int cores ): m_Type{ type }, m_AccessMode{ accessMode }, - m_RankMPI{ rankMPI } + m_RankMPI{ rankMPI }, + m_DebugMode{ debugMode }, + m_Cores{ cores } { } @@ -25,11 +27,11 @@ Capsule::~Capsule( ) { } -void Capsule::ResizeData( std::size_t size ) +void Capsule::ResizeData( const std::size_t size ) { } -void Capsule::ResizeMetadata( std::size_t size ) +void Capsule::ResizeMetadata( const std::size_t size ) { } diff --git a/src/core/Engine.cpp b/src/core/Engine.cpp index e87bf83b3..fea99e02f 100644 --- a/src/core/Engine.cpp +++ b/src/core/Engine.cpp @@ -16,12 +16,14 @@ namespace adios Engine::Engine( const std::string engineType, const std::string name, const std::string accessMode, const MPI_Comm mpiComm, const Method& method, - const bool debugMode ): + const bool debugMode, const unsigned int cores ): m_EngineType{ engineType }, m_Name{ name }, m_AccessMode{ accessMode }, m_Method{ &method }, - m_MPIComm{ mpiComm } + m_MPIComm{ mpiComm }, + m_DebugMode{ debugMode }, + m_Cores{ cores } { MPI_Comm_rank( m_MPIComm, &m_RankMPI ); MPI_Comm_size( m_MPIComm, &m_SizeMPI ); @@ -32,6 +34,18 @@ Engine::~Engine( ) { } +void Engine::Init( ) +{ } + + +void Engine::InitCapsules( ) +{ } + + +void Engine::InitTransports( ) +{ } + + //WRITE Functions void Engine::Write( Group& group, const std::string variableName, const char* values ) { @@ -133,7 +147,7 @@ const unsigned int Engine::PreSetVariable( Group& group, const std::string varia } group.m_WrittenVariables.insert( variableName ); //should be done before writing to buffer, in case there is a crash? - unsigned int index = itVariable->second.second; + const unsigned int index = itVariable->second.second; return index; } @@ -152,33 +166,7 @@ void Engine::Close( int transportIndex ) } -//PROTECTED FUNCTIONS -void Engine::SetTransports( ) -{ - for( const auto& transportPair : m_Method->Transports ) - { - const std::string transport = transportPair.first; - const std::vector<std::string>& arguments = transportPair.second; - - if( transport == "POSIX" ) - m_Transports.push_back( std::make_shared<POSIX>( m_MPIComm, m_DebugMode, arguments ) ); - - else if( transport == "FStream" ) - m_Transports.push_back( std::make_shared<FStream>( m_MPIComm, m_DebugMode, arguments ) ); - - else - { - if( m_DebugMode == true ) - throw std::invalid_argument( "ERROR: transport + " + transport + " not supported, in Engine constructor (or Open).\n" ); - } - - std::string name = GetName( arguments ); - m_Transports.back()->Open( name, m_AccessMode ); - } -} - - - +//PROTECTED std::string Engine::GetName( const std::vector<std::string>& arguments ) const { bool isNameFound = false; diff --git a/src/core/Transport.cpp b/src/core/Transport.cpp index 6a63ef904..db5208e1f 100644 --- a/src/core/Transport.cpp +++ b/src/core/Transport.cpp @@ -26,15 +26,23 @@ Transport::~Transport( ) { } -void Transport::Write( const Capsule& capsule ) +void Transport::SetBuffer( char* buffer, size_t size ) { } -void Transport::Close( const Capsule& capsule ) +void Transport::Init( const std::vector<std::string>& arguments ) { } -void Transport::Init( const std::vector<std::string>& arguments ) +void Transport::WriteMetadata( const char* buffer, const std::size_t size ) +{ } + + +void Transport::Flush( ) +{ } + + +void Transport::Close( ) { } diff --git a/src/engine/Single.cpp b/src/engine/Single.cpp deleted file mode 100644 index ecaa64a0f..000000000 --- a/src/engine/Single.cpp +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Single.cpp - * - * Created on: Dec 19, 2016 - * Author: wfg - */ - - -#include "engine/Single.h" - -namespace adios -{ - -Single::Single( const std::string streamName, const std::string accessMode, const MPI_Comm mpiComm, const Method& method, - const bool debugMode ): - Engine( "Single Buffer Engine", streamName, accessMode, mpiComm, method, debugMode ) -{ - if( m_DebugMode == true ) - { - if( m_Method->Capsules.size() > 1 ) - throw std::invalid_argument( "ERROR: single buffer engine only allows one buffer, from Single constructor in Open.\n" ); - } - - SetTransports( ); -} - - -Single::~Single( ) -{ } - - - -} - diff --git a/src/engine/SingleBP.cpp b/src/engine/SingleBP.cpp new file mode 100644 index 000000000..0a2ef4b28 --- /dev/null +++ b/src/engine/SingleBP.cpp @@ -0,0 +1,105 @@ +/* + * SingleBP.cpp + * + * Created on: Dec 19, 2016 + * Author: wfg + */ +#include "engine/SingleBP.h" + +//supported capsules +#include "capsule/Heap.h" + +//supported transports +#include "transport/POSIX.h" +#include "transport/FStream.h" + + +namespace adios +{ + + +SingleBP::SingleBP( const std::string streamName, const std::string accessMode, const MPI_Comm mpiComm, + const Method& method, const bool debugMode, const unsigned int cores ): + Engine( "SingleBP", streamName, accessMode, mpiComm, method, debugMode, cores ) +{ + Init( ); +} + + +SingleBP::~SingleBP( ) +{ } + + +void SingleBP::Init( ) +{ + InitCapsules( ); + InitTransports( ); +} + + +void SingleBP::Write( Group& group, const std::string variableName, const double* values ) +{ + auto index = PreSetVariable( group, variableName, Support::DatatypesAliases.at("double"), " from call to Write double*" ); + + +} + + +void SingleBP::InitCapsules( ) +{ + if( m_DebugMode == true ) + { + if( m_Method.Capsules.size() != 1 ) + throw std::invalid_argument( "ERROR: SingleBP engine only allows one heap buffer, from SingleBP constructor in Open.\n" ); + + if( m_Method.Capsules[0] != "Heap" ) + throw std::invalid_argument( "ERROR: SingleBP doesn't support Capsule of type " + m_Method.Capsules[0] + ", from SingleBP constructor in Open.\n" ); + } + //Create single capsule of type heap + m_Capsules.push_back( std::make_shared<Heap>( m_AccessMode, m_RankMPI, m_Cores ) ); +} + + + +void SingleBP::InitTransports( ) +{ + std::set< std::string > transportStreamNames; //used to check for name conflict between transports + std::string name = GetName( arguments ); + m_Transports.back()->Open( name, m_AccessMode ); + + for( const auto& transportPair : m_Method.Transports ) + { + const std::string transport = transportPair.first; + const std::vector<std::string>& arguments = transportPair.second; + + if( transport == "POSIX" ) + { + m_Transports.push_back( std::make_shared<POSIX>( m_MPIComm, m_DebugMode, arguments ) ); + } + else if( transport == "FStream" ) + { + m_Transports.push_back( std::make_shared<FStream>( m_MPIComm, m_DebugMode, arguments ) ); + } + else + { + if( m_DebugMode == true ) + throw std::invalid_argument( "ERROR: transport + " + transport + " not supported, in Engine constructor (or Open).\n" ); + } + + + } +} + + + + + + + + + + + + +} //end namespace + diff --git a/src/transport/FStream.cpp b/src/transport/FStream.cpp index 41e8cf7e6..67e577013 100644 --- a/src/transport/FStream.cpp +++ b/src/transport/FStream.cpp @@ -24,7 +24,7 @@ namespace adios FStream::FStream( MPI_Comm mpiComm, const bool debugMode, const std::vector<std::string>& arguments ): Transport( "FStream", mpiComm, debugMode ) { - //here do something with arguments + Init( arguments ); } @@ -32,52 +32,111 @@ FStream::~FStream( ) { } -void FStream::Open( const std::string streamName, const std::string accessMode ) +void FStream::Open( const std::string name, const std::string accessMode ) { - const std::string directory( streamName + ".dir" ); //data.bp.dir + m_Name = name; + if( name.compare( name.size()-3, 3, ".bp" ) == 0 ) //check if .bp extension already exists + m_Name = name.substr( 0, name.size()-3 ); + + m_AccessMode = accessMode; + + const std::string directory( m_Name + ".bp.dir" ); //data.bp.dir if( m_MPIRank == 0 ) CreateDirectory( directory ); MPI_Barrier( m_MPIComm ); //all processor wait until directory is created - const std::string streamNameRank( directory + "/" + streamName + "." + std::to_string( m_MPIRank ) ); //data.bp.dir./data.bp.Rank + const std::string rankFile( directory + "/" + m_Name + ".bp." + std::to_string( m_MPIRank ) ); //data.bp.dir./data.bp.Rank if( accessMode == "w" || accessMode == "write" ) - m_FStream.open( streamNameRank, std::fstream::out ); + m_Data.open( rankFile, std::fstream::out ); else if( accessMode == "a" || accessMode == "append" ) - m_FStream.open( streamNameRank, std::fstream::out | std::fstream::app ); + m_Data.open( rankFile, std::fstream::out | std::fstream::app ); else if( accessMode == "r" || accessMode == "read" ) - m_FStream.open( streamNameRank, std::fstream::in ); + m_Data.open( rankFile, std::fstream::in ); + if( m_DebugMode == true ) { - if( !m_FStream ) - throw std::ios_base::failure( "ERROR: couldn't open file " + streamName + " in Open function of FStream transport\n" ); + if( !m_Data ) + throw std::ios_base::failure( "ERROR: couldn't open file " + rankFile + ", in call to Open from FStream transport\n" ); + } + + + if( m_HasMetadataFile == true ) + { + const std::string metadataFile( m_Name + ".bp" ); + if( accessMode == "w" || accessMode == "write" ) + m_Metadata.open( metadataFile, std::fstream::out ); + + else if( accessMode == "a" || accessMode == "append" ) + m_Metadata.open( metadataFile, std::fstream::out | std::fstream::app ); + + else if( accessMode == "r" || accessMode == "read" ) + m_Metadata.open( metadataFile, std::fstream::in ); + + if( !m_Metadata ) + throw std::ios_base::failure( "ERROR: couldn't open file " + metadataFile + ", in call to Open from FStream transport\n" ); } +} - MPI_Barrier( m_MPIComm ); //all of them must wait until the file is opened + +void FStream::SetBuffer( char* buffer, std::size_t size ) +{ + m_Data.rdbuf()->pubsetbuf( buffer, size ); } -void FStream::SetBuffer( std::vector<char>& buffer ) +void FStream::Write( const char* buffer, std::size_t size ) { - //m_FStream.rdbuf()->pubsetbuf( &buffer[0], buffer.size() ); + m_Data.write( buffer, size ); } -void FStream::Write( const Capsule& capsule ) + +void FStream::WriteMetadata( const char* buffer, std::size_t size ) { - //m_FStream.write( &buffer[0], buffer.size() ); + m_Metadata.write( buffer, size ); } -void FStream::Close( const Capsule& capsule ) +void FStream::Flush( ) { - m_FStream.close(); + m_Data.flush( ); } + +void FStream::Close( ) +{ + m_Data.close(); + + if( m_Metadata ) + m_Metadata.close( ); +} + + +void FStream::Init( const std::vector<std::string>& arguments ) +{ + for( const auto argument : arguments ) + { + if( argument.compare( 0, 19, "have_metadata_file=" ) == 0 ) + { + int haveMetadata = std::stoi( argument.substr(19) ); + + if( haveMetadata == 0 ) + m_HasMetadataFile = false; + else if( haveMetadata == 1 ) + m_HasMetadataFile = true; + else + throw std::invalid_argument( "ERROR: have_metadata_file= value must be 0 (off) or 1 (on), in FStream transport constructor\n" ); + } + } +} + + + //void CFStream::Write( const CVariable& variable ) ///this is aggregation //{ // //local buffer, to be send over MPI -- GitLab