From f21e56f250693c33356fdc0d1204244ef532f553 Mon Sep 17 00:00:00 2001 From: wfg <wfg@pc0098504.ornl.gov> Date: Fri, 18 Nov 2016 17:27:00 -0500 Subject: [PATCH] Modified CDataMan and continued working on CCapsule CDataMan now has a different constructor as CTransport need the name for CCapsule operations. Added ./include/functions/CCapsuleTemplates.h Working on CCapsule using template Write functions and a threaded version of memcpy (must be tested with large data). To do: Test current code Need to work on Metadata Need to work on Attribute Write --- include/core/CCapsule.h | 70 ++++++++------- include/core/CTransport.h | 8 +- include/functions/ADIOSFunctions.h | 33 ++++++- include/functions/ADIOSTemplates.h | 103 ++++++++++------------ include/functions/CCapsuleTemplates.h | 119 ++++++++++++++++++++++++++ include/public/ADIOS.h | 2 +- include/transport/CPOSIX.h | 2 +- src/core/CCapsule.cpp | 75 ++++++++-------- src/functions/ADIOSFunctions.cpp | 57 +++++++++++- src/public/ADIOS.cpp | 10 +-- src/transport/CDataMan.cpp | 4 +- src/transport/CFStream.cpp | 2 +- src/transport/CPOSIX.cpp | 2 +- 13 files changed, 343 insertions(+), 144 deletions(-) create mode 100644 include/functions/CCapsuleTemplates.h diff --git a/include/core/CCapsule.h b/include/core/CCapsule.h index 98baff57b..17ac013da 100644 --- a/include/core/CCapsule.h +++ b/include/core/CCapsule.h @@ -50,10 +50,11 @@ public: const bool m_DebugMode = false; - std::map< std::string, std::vector<char> > m_Buffers; ///< buffer to be managed, key is the streamName - std::map< std::string, std::shared_ptr<CTransport> > m_Transports; ///< transport associated with ADIOS run + std::map< std::string, std::vector<unsigned char> > m_Buffers; ///< buffer to be managed, key is the streamName + std::map< std::string, size_t > m_MaxBufferSize; ///< key is the streamName, value is the maximum buffer size + std::map< std::string, std::shared_ptr<CTransport> > m_Transports; ///< transport associated with ADIOS run, key is streamName - std::map< std::string, std::shared_ptr<CTransform> > m_Transforms; ///< transforms associated with ADIOS run + std::map< std::string, std::shared_ptr<CTransform> > m_Transforms; ///< transforms associated with ADIOS run, key is transform name ///Maybe add a communication class object @@ -77,47 +78,54 @@ public: ~CCapsule( ); - void SetTransform( const std::string transform ); - - void SetTransport( const std::string streamName, const std::string transport, const bool debugMode ); - - void SetBuffer( const std::string streamName, const unsigned long long int maxBufferSize ); - /** - * Open a certain stream based on transport method - * @param streamName associated file or stream - * @param accessMode "w": write, "a": append, need more info on this + * Open streamName by assigning a buffer, maxBufferSize and a transport mode + * @param streamName + * @param accessMode + * @param maxBufferSize + * @param transport */ - void Open( const std::string streamName, const std::string accessMode ); + void Open( const std::string streamName, const std::string accessMode, + const size_t maxBufferSize, const std::string transport ); /** - * Writes raw data to m_Buffer, call for local variables + * Writes raw data to m_Buffer * @param streamName key to get the corresponding buffer from m_Buffers * @param data pointer containing the data - * @param dataSize sizeof each element of data - * @param localDimensions if scalar it will have one number, if multidimensional it will start with the slowest moving dimension + * @param size of data to be written */ - void WriteDataToBuffer( const std::string streamName, const void* data, const size_t dataSize, - const std::vector<unsigned long long int>& localDimensions ); + template<class T> + void Write( const std::string streamName, const T* data, const size_t size, const unsigned int cores ) + { + auto itBuffer = m_Buffers.find( streamName ); + auto itTransport = m_Transports.find( streamName ); - /** - * Writes raw data to m_Buffer, call for global variables - * @param data pointer containing the data - * @param dataSize sizeof each element of data - * @param localDimensions if scalar it will have one number, if multidimensional it will start with the slowest moving dimension - * @param globalDimensions global dimensions, if multidimensional it will start with the slowest moving dimension - * @param globalOffsets global offsets, if multidimensional it will start with the slowest moving dimension offset - */ - void WriteDataToBuffer( const void* data, const size_t dataSize, - const std::vector<unsigned long long int>& localDimensions, - const std::vector<unsigned long long int>& globalDimensions, - const std::vector<unsigned long long int>& globalOffsets ); + if( m_DebugMode == true ) + { + if( itBuffer == m_Buffers.end() ) + throw std::invalid_argument( "ERROR: stream (file name ) " + streamName + " has not been declared" ); + } + + if( itTransport->second->m_Method == "DataMan" ) //CDataMan needs entire data in buffer + itBuffer->second.resize( size * sizeof(T) ); //resize buffer to fit all data + + //WriteToBuffer( data, size, itBuffer->second, ); + } /** * Closes a certain stream at the transport level * @param streamName passed to corresponding transport so it can be closed. */ - void CloseStream( const std::string streamName ); + void Close( const std::string streamName ); + + +private: + + void CreateTransport( const std::string streamName, const std::string transport ); + + void CreateBuffer( const std::string streamName, const size_t maxBufferSize ); + + void CreateTransform( const std::string transform ); }; diff --git a/include/core/CTransport.h b/include/core/CTransport.h index 15ca4069b..fdac65ca0 100644 --- a/include/core/CTransport.h +++ b/include/core/CTransport.h @@ -28,6 +28,7 @@ class CTransport public: + const std::string m_Method; ///< transport method name #ifdef HAVE_MPI MPI_Comm m_MPIComm = NULL; ///< only used as reference to MPI communicator passed from parallel constructor, MPI_Comm is a pointer itself. Public as called from C #else @@ -46,7 +47,8 @@ public: * @param mpiComm passed to m_MPIComm * @param debugMode passed to m_DebugMode */ - CTransport( MPI_Comm mpiComm, const bool debugMode ): + CTransport( const std::string method, MPI_Comm mpiComm, const bool debugMode ): + m_Method{ method }, m_MPIComm{ mpiComm }, m_DebugMode{ debugMode } { @@ -69,10 +71,10 @@ public: * Sets the buffer and bufferSize for certain transport methods * @param buffer to be set to transport */ - virtual void SetBuffer( const std::vector<char>& buffer ) + virtual void SetBuffer( const std::vector<unsigned char>& buffer ) { }; - virtual void Write( std::vector<char>& buffer ) + virtual void Write( std::vector<unsigned char>& buffer ) { }; virtual void Close( ) = 0; //here think what needs to be passed diff --git a/include/functions/ADIOSFunctions.h b/include/functions/ADIOSFunctions.h index ccb4c6d37..e988161cb 100644 --- a/include/functions/ADIOSFunctions.h +++ b/include/functions/ADIOSFunctions.h @@ -12,6 +12,7 @@ #include <string> #include <vector> #include <map> +#include <cstring> //std::size_t /// \endcond #ifdef HAVE_MPI @@ -94,8 +95,38 @@ void SetMembers( const std::string& fileContent, const MPI_Comm mpiComm, std::st void InitXML( const std::string xmlConfigFile, const MPI_Comm mpiComm, const bool debugMode, std::string& hostLanguage, std::map< std::string, CGroup >& groups ); +/** + * Loops through a vector containing dimensions and returns the product of all elements + * @param dimensions input containing size on each dimension {Nx, Ny, Nz} + * @return product of all dimensions Nx * Ny * Nz + */ +unsigned long long int GetTotalSize( const std::vector<unsigned long long int>& dimensions ); + + +/** + * Threaded version of memcpy + * @param destination + * @param source + * @param count + * @param cores + */ +void MemcpyThreads( void* destination, const void* source, std::size_t count, unsigned int cores ); + + +/** + * Identifies a char* variable, assigns a reference to the proper variable, and sends data to capsule so it can write to buffer + * @param group + * @param variableName + * @param values + * @param capsule + */ +void WriteChar( CGroup& group, const std::string variableName, const char* values, CCapsule& capsule ); + + + + + -void WriteChar( CGroup& group, SVariable<char>& variable, const char* values, CCapsule& capsule ); } //end namespace diff --git a/include/functions/ADIOSTemplates.h b/include/functions/ADIOSTemplates.h index fe68a0a82..1737c89da 100644 --- a/include/functions/ADIOSTemplates.h +++ b/include/functions/ADIOSTemplates.h @@ -8,9 +8,16 @@ #ifndef ADIOSTEMPLATES_H_ #define ADIOSTEMPLATES_H_ +/// \cond EXCLUDE_FROM_DOXYGEN +#include <string> +#include <stdexcept> +/// \endcond + + #include "core/CGroup.h" #include "core/SVariable.h" #include "core/CCapsule.h" +#include "functions/ADIOSFunctions.h" namespace adios @@ -24,8 +31,8 @@ namespace adios * @param group * @param capsule */ -template< class T > -void WriteVariableValues( CGroup& group, const std::string variableName, const T* values, CCapsule& capsule ) +template<class T> +void WriteVariableValues( CGroup& group, const std::string variableName, const T* values, CCapsule& capsule, const unsigned int cores ) { const bool debugMode( group.m_DebugMode ); const std::string streamName( group.m_StreamName ); @@ -37,60 +44,44 @@ void WriteVariableValues( CGroup& group, const std::string variableName, const T throw std::invalid_argument( "ERROR: from Write function, variable " + variableName + " doesn't exist\n" ); } - const unsigned int index( itVariable->second.second ); //index is second in the pair Value of the m_Variables map - - if( std::is_same<T,char>::value ) //maybe use type? - { - auto& variable = group.m_Char[index]; - variable.m_Values = values; - auto localDimensions = group.GetDimensions( variable.m_DimensionsCSV ); - - if( variable.m_GlobalBoundsIndex > -1 ) //global variable - { - auto globalDimensions = group.GetDimensions( group.m_GlobalBounds[ variable.m_GlobalBoundsIndex ].first ); - auto globalOffsets = group.GetDimensions( group.m_GlobalBounds[ variable.m_GlobalBoundsIndex ].second ); - capsule.WriteDataToBuffer( variable.m_Values, sizeof(char), localDimensions, globalDimensions, globalOffsets ); - } - else - { - capsule.WriteDataToBuffer( streamName, variable.m_Values, sizeof(char), localDimensions ); - } - } - else if( std::is_same<T,unsigned char>::value ) - group.m_UChar[index].m_Values = values; - - else if( std::is_same<T,short>::value ) - group.m_Short[index].m_Values = values; - - else if( std::is_same<T,unsigned short>::value ) - group.m_UShort[index].m_Values = values; - - else if( std::is_same<T,int>::value ) - group.m_Int[index].m_Values = values; - - else if( std::is_same<T,unsigned int>::value ) - group.m_UInt[index].m_Values = values; - - else if( std::is_same<T,long int>::value ) - group.m_LInt[index].m_Values = values; - - else if( std::is_same<T,unsigned long int>::value ) - group.m_ULInt[index].m_Values = values; - - else if( std::is_same<T,long long int>::value ) - group.m_LLInt[index].m_Values = values; - - else if( std::is_same<T,unsigned long long int>::value ) - group.m_ULLInt[index].m_Values = values; - - else if( std::is_same<T,float>::value ) - group.m_Float[index].m_Values = values; - - else if( std::is_same<T,double>::value ) - group.m_Double[index].m_Values = values; - - else if( std::is_same<T,long double>::value ) - group.m_LDouble[index].m_Values = values; + if( std::is_same<T,char>::value ) //maybe use type with debugMode? + WriteChar( group, variableName, values, capsule, cores ); + +// else if( std::is_same<T,unsigned char>::value ) +// group.m_UChar[index].m_Values = values; +// +// else if( std::is_same<T,short>::value ) +// group.m_Short[index].m_Values = values; +// +// else if( std::is_same<T,unsigned short>::value ) +// group.m_UShort[index].m_Values = values; +// +// else if( std::is_same<T,int>::value ) +// group.m_Int[index].m_Values = values; +// +// else if( std::is_same<T,unsigned int>::value ) +// group.m_UInt[index].m_Values = values; +// +// else if( std::is_same<T,long int>::value ) +// group.m_LInt[index].m_Values = values; +// +// else if( std::is_same<T,unsigned long int>::value ) +// group.m_ULInt[index].m_Values = values; +// +// else if( std::is_same<T,long long int>::value ) +// group.m_LLInt[index].m_Values = values; +// +// else if( std::is_same<T,unsigned long long int>::value ) +// group.m_ULLInt[index].m_Values = values; +// +// else if( std::is_same<T,float>::value ) +// group.m_Float[index].m_Values = values; +// +// else if( std::is_same<T,double>::value ) +// group.m_Double[index].m_Values = values; +// +// else if( std::is_same<T,long double>::value ) +// group.m_LDouble[index].m_Values = values; group.m_SetVariables.insert( variableName ); } diff --git a/include/functions/CCapsuleTemplates.h b/include/functions/CCapsuleTemplates.h new file mode 100644 index 000000000..159750003 --- /dev/null +++ b/include/functions/CCapsuleTemplates.h @@ -0,0 +1,119 @@ +/* + * CCapsuleTemplates.h + * + * Created on: Nov 18, 2016 + * Author: wfg + */ + +#ifndef CCAPSULETEMPLATES_H_ +#define CCAPSULETEMPLATES_H_ + + +#include <cstring> //std::memcpy + +namespace adios +{ + +/** + * threaded version of std::memcpy + * @param dest + * @param source + * @param count + * @param cores + */ +template<class T, class U> +void MemcpyThreads( T* destination, const U* source, std::size_t count, const unsigned int cores = 1 ) +{ + if( cores == 1 ) + { + std::memcpy( &destination[0], &source[0], count ); + return; + } + + const unsigned long long int stride = (unsigned long long int)std::floor( (unsigned long long int)count/cores ); + const unsigned long long int remainder = (unsigned long int) count % cores; + const unsigned long long int last = stride + remainder; + + std::vector<std::thread> memcpyThreads; + memcpyThreads.reserve( cores ); + + for( unsigned int core = 0; core < cores; ++core ) + { + const size_t initialDestination = stride * core / sizeof(T); + const size_t initialSource = stride * core / sizeof(U); + + if( core == cores-1 ) + memcpyThreads.push_back( std::thread( std::memcpy, &destination[initialDestination], &source[initialSource], last ) ); + else + memcpyThreads.push_back( std::thread( std::memcpy, &destination[initialDestination], &source[initialSource], stride ) ); + } + + //Now join the threads + for( auto& thread : memcpyThreads ) + thread.join( ); +} + + +/** + * Write data to buffer checking that size of data is no more than maxBufferSize + * @param data + * @param size + * @param buffer + * @param maxBufferSize + * @param cores + */ +template<class T> +void WriteToBuffer( const T* data, const size_t size, std::vector<unsigned char>& buffer, + const size_t maxBufferSize, CTransport& transport, const unsigned int cores ) +{ + const size_t dataBytes = size * sizeof( T ); + + //if buffer size is enough send all at once to transport and return + if( dataBytes <= buffer.size() ) + { + MemcpyThreads( &buffer[0], data, dataBytes, cores ); //copy memory in threaded fashion, need to test with size + return; + } + + if( dataBytes > buffer.size() ) //dataBytes > buffer.size() + { + if( dataBytes <= maxBufferSize ) // maxBufferSize > dataBytes > buffer.size() + { + buffer.resize( dataBytes ); + MemcpyThreads( &buffer[0], data, dataBytes, cores ); //copy memory in threaded fashion, need to test with size + return; + } + else + { + buffer.resize( maxBufferSize ); //resize to maxBufferSize + } + } + + // dataBytes > maxBufferSize == buffer.size() split the variable in buffer buckets + const size_t buckets = dataBytes / maxBufferSize + 1; + const size_t remainder = dataBytes % maxBufferSize; + + + for( unsigned int bucket = 0; buckets < buckets; ++bucket ) + { + const size_t dataOffset = bucket * maxBufferSize / sizeof( T ); + + if( bucket == buckets-1 ) + MemcpyThreads( &buffer[0], data[dataOffset], remainder, cores ); + else + MemcpyThreads( &buffer[0], data[dataOffset], maxBufferSize, cores ); + + transport.Write( buffer ); + } +} + + + + + + +} //end namespace + + + +#endif /* CCAPSULETEMPLATES_H_ */ diff --git a/include/public/ADIOS.h b/include/public/ADIOS.h index 5a7322c1f..4058e5058 100644 --- a/include/public/ADIOS.h +++ b/include/public/ADIOS.h @@ -95,7 +95,7 @@ public: // PUBLIC Constructors and Functions define the User Interface with ADIO * @param values pointer to the variable values passed from the user application, use dynamic_cast to check that pointer is of the same value type */ template<class T> - void Write( const std::string groupName, const std::string variableName, const T* values ) + void Write( const std::string groupName, const std::string variableName, const T* values, const unsigned int cores = 1 ) { auto itGroup = m_Groups.find( groupName ); if( m_DebugMode == true ) diff --git a/include/transport/CPOSIX.h b/include/transport/CPOSIX.h index e0256ccfa..26f85556d 100644 --- a/include/transport/CPOSIX.h +++ b/include/transport/CPOSIX.h @@ -22,7 +22,7 @@ class CPOSIX : public CTransport public: - CPOSIX( MPI_Comm mpiComm, const bool debugMode ); + CPOSIX( MPI_Comm mpiComm, const bool debugMode ); ~CPOSIX( ); diff --git a/src/core/CCapsule.cpp b/src/core/CCapsule.cpp index 3b7c467ec..98562ffe3 100644 --- a/src/core/CCapsule.cpp +++ b/src/core/CCapsule.cpp @@ -5,13 +5,16 @@ * Author: wfg */ + +#include <stdexcept> //std::invalid_argument +#include <cstring> + #include "core/CCapsule.h" #ifdef HAVE_BZIP2 #include "transform/CBZIP2.h" #endif - //transports #include "transport/CPOSIX.h" #include "transport/CFStream.h" @@ -42,68 +45,64 @@ CCapsule::~CCapsule( ) { } -void CCapsule::SetTransform( const std::string transform ) +void CCapsule::Open( const std::string streamName, const std::string accessMode, const size_t maxBufferSize, const std::string transport ) { - std::string method( transform ); - auto colonPosition = transform.find(":"); + CreateTransport( streamName, transport ); + m_Transports[streamName]->Open( streamName, accessMode ); - if( colonPosition != transform.npos ) - { - method = transform.substr( 0, colonPosition ); - } + CreateBuffer( streamName, maxBufferSize ); + m_Transports[streamName]->SetBuffer( m_Buffers[streamName] ); +} - if( m_Transforms.find( method ) != m_Transforms.end() ) //transform method already exists, do nothing - return; - if( method == "bzip2" ) //here must add debug mode exception - { - #ifdef HAVE_BZIP2 - m_Transforms["bzip2"] = std::make_shared<CBZIP2>( ); - #endif - } +void CCapsule::Close( const std::string streamName ) +{ + m_Transports[streamName]->Close( ); //should release resources } - -void CCapsule::SetTransport( const std::string streamName, const std::string transport, const bool debugMode ) +//PRIVATE FUNCTIONS +void CCapsule::CreateTransport( const std::string streamName, const std::string transport ) { if( transport == "POSIX" ) - m_Transports[streamName] = std::make_shared<CPOSIX>( m_MPIComm, debugMode ); + m_Transports[streamName] = std::make_shared<CPOSIX>( m_MPIComm, m_DebugMode ); else if( transport == "FStream" ) - m_Transports[streamName] = std::make_shared<CFStream>( m_MPIComm, debugMode ); + m_Transports[streamName] = std::make_shared<CFStream>( m_MPIComm, m_DebugMode ); else if( transport == "DataMan" ) - m_Transports[streamName] = std::make_shared<CDataMan>( m_MPIComm, debugMode ); + m_Transports[streamName] = std::make_shared<CDataMan>( m_MPIComm, m_DebugMode ); +} + +void CCapsule::CreateBuffer( const std::string streamName, const size_t maxBufferSize ) +{ + m_Buffers[streamName] = std::vector<unsigned char>( 1 ); // vector of size 1 + m_MaxBufferSize[streamName] = maxBufferSize; } -void CCapsule::SetBuffer( const std::string streamName, const unsigned long long int maxBufferSize ) +void CCapsule::CreateTransform( const std::string transform ) { - auto itBuffer = m_Buffers.find( streamName ); + std::string method( transform ); + auto colonPosition = transform.find(":"); - if( m_DebugMode == true ) + if( colonPosition != transform.npos ) { - if( itBuffer == m_Buffers.end() ) - throw std::invalid_argument( "ERROR: buffer for stream " + streamName + " not found, not setting size\n" ); + method = transform.substr( 0, colonPosition ); } - itBuffer->second.resize( maxBufferSize ); //use resize not reserve -} - + if( m_Transforms.find( method ) != m_Transforms.end() ) //transform method already exists, do nothing + return; -void CCapsule::Open( const std::string streamName, const std::string accessMode ) -{ - m_Transports[streamName]->Open( streamName, accessMode ); - m_Transports[streamName]->SetBuffer( m_Buffers[streamName] ); + if( method == "bzip2" ) //here must add debug mode exception + { + #ifdef HAVE_BZIP2 + m_Transforms["bzip2"] = std::make_shared<CBZIP2>( ); + #endif + } } -void CCapsule::CloseStream( const std::string streamName ) -{ - m_Transports[streamName]->Close( ); //should release resources -} - } //end namespace diff --git a/src/functions/ADIOSFunctions.cpp b/src/functions/ADIOSFunctions.cpp index 64ebc7a82..12ca15793 100644 --- a/src/functions/ADIOSFunctions.cpp +++ b/src/functions/ADIOSFunctions.cpp @@ -10,6 +10,8 @@ #include <sstream> #include <stdexcept> #include <iostream> +#include <thread> //std::thread +#include <cstring> //std::memcpy /// \endcond #include "functions/ADIOSFunctions.h" @@ -312,6 +314,7 @@ void SetMembers( const std::string& fileContent, const MPI_Comm mpiComm, const b lf_UIntCheck( method, iterationStr, "iteration", debugMode, iteration ); itGroup->second.m_Transport = method; + //here do something with the capsule } } @@ -348,10 +351,54 @@ void InitXML( const std::string xmlConfigFile, const MPI_Comm mpiComm, const boo } +unsigned long long int GetTotalSize( const std::vector<unsigned long long int>& dimensions ) +{ + unsigned long long int product = 1; + + for( const auto dimension : dimensions ) + product *= dimension; + + return product; +} + + +void MemcpyThreads( void* destination, const void* source, std::size_t count, const unsigned int cores ) +{ + + const size_t stride = (size_t) std::floor( count/cores ); + const size_t remainder = (size_t) count % cores; + std::vector<std::thread> memcpyThreads; + memcpyThreads.reserve( cores ); + + for( unsigned int core = 0; core < cores; ++core ) + { + const unsigned int initial = stride * core; + + if( core == cores-1 ) + memcpyThreads.push_back( std::thread( std::memcpy( &destination[initial], &source[initial], remainder ) ) ); + else + memcpyThreads.push_back( std::thread( std::memcpy( &destination[initial], &source[initial], stride ) ) ); + } + + //Now join the threads + std::for_each( memcpyThreads.begin(), memcpyThreads.end(), []( std::thread& thread ){ thread.join(); } ); + +} + + //Write helper functions -void WriteChar( CGroup& group, SVariable<char>& variable, const char* values, CCapsule& capsule ) +void WriteChar( CGroup& group, const std::string variableName, const char* values, CCapsule& capsule, const unsigned int cores ) { + if( group.m_DebugMode == true ) + { + const std::string type( group.m_Variables.at( variableName ).first ); + if( type != "char" ) + throw std::invalid_argument( "ERROR: variable " + variableName + " is not char\n" ); + } + + const unsigned int index = group.m_Variables.at( variableName ).second; + SVariable<char>& variable = group.m_Char[index]; variable.m_Values = values; auto localDimensions = group.GetDimensions( variable.m_DimensionsCSV ); @@ -359,11 +406,13 @@ void WriteChar( CGroup& group, SVariable<char>& variable, const char* values, CC { auto globalDimensions = group.GetDimensions( group.m_GlobalBounds[ variable.m_GlobalBoundsIndex ].first ); auto globalOffsets = group.GetDimensions( group.m_GlobalBounds[ variable.m_GlobalBoundsIndex ].second ); - capsule.WriteDataToBuffer( variable.m_Values, sizeof(char), localDimensions, globalDimensions, globalOffsets ); + + //capsule.Write( group.m_StreamName, variable.m_Values, sizeof(char), localDimensions, globalDimensions, globalOffsets ); } - else + else //write local variable { - capsule.WriteDataToBuffer( group.m_StreamName, variable.m_Values, sizeof(char), localDimensions ); + const unsigned long long int size = GetTotalSize( localDimensions ); + capsule.Write( group.m_StreamName, variable.m_Values, size, cores ); } } diff --git a/src/public/ADIOS.cpp b/src/public/ADIOS.cpp index 574677594..35ad01c7a 100644 --- a/src/public/ADIOS.cpp +++ b/src/public/ADIOS.cpp @@ -80,14 +80,12 @@ void ADIOS::Open( const std::string groupName, const std::string streamName, con CGroup& group = itGroup->second; group.m_IsOpen = true; group.m_StreamName = streamName; - - m_Capsule.SetTransport( streamName, itGroup->second.m_Transport, m_DebugMode ); //Set Transport - m_Capsule.SetBuffer( streamName, maxBufferSize ); - m_Capsule.Open( streamName, accessMode ); + //Open Stream/Buffer/Transport in capsule + m_Capsule.Open( streamName, accessMode, maxBufferSize, itGroup->second.m_Transport ); } -void ADIOS::Close( const std::string groupName ) +void ADIOS::Close( const std::string groupName ) //need to think if it's a group or stream { auto itGroup = m_Groups.find( groupName ); if( m_DebugMode == true ) @@ -100,7 +98,7 @@ void ADIOS::Close( const std::string groupName ) CGroup& group = itGroup->second; - m_Capsule.CloseStream( group.m_StreamName ); // first close any stream associated with group + m_Capsule.Close( group.m_StreamName ); // first close any stream associated with group group.m_StreamName.clear(); group.m_Transport.clear(); group.m_IsOpen = false; diff --git a/src/transport/CDataMan.cpp b/src/transport/CDataMan.cpp index 0422a486b..855afa687 100644 --- a/src/transport/CDataMan.cpp +++ b/src/transport/CDataMan.cpp @@ -13,10 +13,12 @@ namespace adios { + CDataMan::CDataMan( MPI_Comm mpiComm, const bool debugMode ): - CTransport( mpiComm, debugMode ) + CTransport( "DataMan", mpiComm, debugMode ) { } + CDataMan::~CDataMan( ) { } diff --git a/src/transport/CFStream.cpp b/src/transport/CFStream.cpp index a7f8029a3..a0285d817 100644 --- a/src/transport/CFStream.cpp +++ b/src/transport/CFStream.cpp @@ -20,7 +20,7 @@ namespace adios CFStream::CFStream( MPI_Comm mpiComm, const bool debugMode ): - CTransport( mpiComm, debugMode ) + CTransport( "FStream", mpiComm, debugMode ) { } diff --git a/src/transport/CPOSIX.cpp b/src/transport/CPOSIX.cpp index e5ac7ce2d..c253ec016 100644 --- a/src/transport/CPOSIX.cpp +++ b/src/transport/CPOSIX.cpp @@ -15,7 +15,7 @@ namespace adios CPOSIX::CPOSIX( MPI_Comm mpiComm, const bool debugMode ): - CTransport( mpiComm, debugMode ), + CTransport( "POSIX", mpiComm, debugMode ), m_File( NULL ) { } -- GitLab