diff --git a/examples/hello/datamanReader/helloDataManReader.cpp b/examples/hello/datamanReader/helloDataManReader.cpp index 9b315d702875c9ac071a36479885fd5e26ac1f03..73c1cc99bac31ba05ad8cb1fc99a38d8cb40b0b4 100644 --- a/examples/hello/datamanReader/helloDataManReader.cpp +++ b/examples/hello/datamanReader/helloDataManReader.cpp @@ -1,5 +1,5 @@ /* - * helloWriter.cpp + * helloDataManReader.cpp * * Created on: Feb 16, 2017 * Author: wfg @@ -8,7 +8,10 @@ #include <vector> -#include <iostream> +#include <iostream> //std::cout, std::endl +#include <string> +#include <numeric> //std::accumulate +#include <functional> //std::multiplies #include <mpi.h> @@ -16,6 +19,20 @@ #include "ADIOS_CPP.h" +void getcb( const void *data, std::string doid, std::string var, std::string dtype, std::vector<std::size_t> varshape ) +{ + std::cout << "data object ID = " << doid << "\n"; //do you need to flush? + std::cout << "variable name = " << var << "\n"; + std::cout << "data type = " << dtype << "\n"; + float *dataf = (float*)data; + + std::size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1, std::multiplies<std::size_t>()); + + for (int i=0; i<varsize; i++) + std::cout << ((float*)data)[i] << " "; + std::cout << std::endl; +} + int main( int argc, char* argv [] ) { @@ -25,7 +42,6 @@ int main( int argc, char* argv [] ) const bool adiosDebug = true; adios::ADIOS adios( MPI_COMM_WORLD, adiosDebug ); - try { //Define method for engine creation, it is basically straight-forward parameters @@ -41,6 +57,8 @@ int main( int argc, char* argv [] ) if( datamanReader == nullptr ) throw std::ios_base::failure( "ERROR: failed to create DataMan I/O engine at Open\n" ); + datamanReader->SetCallBack( getcb ); + adios::Variable<double>* ioMyDoubles = datamanReader->InquireVariableDouble( "ioMyDoubles" ); if( ioMyDoubles == nullptr ) std::cout << "Variable ioMyDoubles not read...yet\n"; diff --git a/include/core/Engine.h b/include/core/Engine.h index 228fed860855ba0bd99d7a3f76ff164f3ee299ff..e5e8f16cac74ea35c72a3e72628d6634ccf20211 100644 --- a/include/core/Engine.h +++ b/include/core/Engine.h @@ -77,6 +77,13 @@ public: virtual ~Engine( ); + + /** + * Needed for DataMan Engine + * @param callback + */ + virtual void SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback ); + /** * Write function that adds static checking on the variable to be passed by values * It then calls its corresponding derived class virtual function diff --git a/include/engine/dataman/DataManReader.h b/include/engine/dataman/DataManReader.h index 3e4df180242bb499c031923d3b28b5b48c7ec1a2..1509b6b97986bbedcd3c0a2fe091096b2677c4de 100644 --- a/include/engine/dataman/DataManReader.h +++ b/include/engine/dataman/DataManReader.h @@ -40,6 +40,12 @@ public: ~DataManReader( ); + /** + * Set callback function from user application + * @param callback function (get) provided by the user to be applied in DataMan + */ + void SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback ); + Variable<void>* InquireVariable( const std::string name, const bool readIn = true ); Variable<char>* InquireVariableChar( const std::string name, const bool readIn = true ); Variable<unsigned char>* InquireVariableUChar( const std::string name, const bool readIn = true ); diff --git a/include/engine/dataman/DataManWriter.h b/include/engine/dataman/DataManWriter.h index 2cffcb4ee308adcd96865e2b366d88ac464a6c83..f903a6475747653f8c38f4662d6c893810086239 100644 --- a/include/engine/dataman/DataManWriter.h +++ b/include/engine/dataman/DataManWriter.h @@ -43,6 +43,8 @@ public: ~DataManWriter( ); + void SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback ); + void Write( Variable<char>& variable, const char* values ); void Write( Variable<unsigned char>& variable, const unsigned char* values ); void Write( Variable<short>& variable, const short* values ); @@ -87,6 +89,7 @@ private: bool m_DoRealTime = false; DataManager m_Man; + std::function<void( const void*, std::string, std::string, std::string, Dims )> m_CallBack; ///< call back function void Init( ); ///< calls InitCapsules and InitTransports based on Method, called from constructor void InitCapsules( ); @@ -111,7 +114,19 @@ private: //This part will go away, this is just to monitor variables per rank - m_Man.put(values, "", variable.m_Name, GetType<T>(), variable.m_Dimensions, variable.m_GlobalDimensions, variable.m_GlobalOffsets, 0); +// put(void *p_data, +// string p_doid, +// string p_var, +// string p_dtype, +// vector<uint64_t> p_putshape, +// vector<uint64_t> p_varshape, +// vector<uint64_t> p_offset, +// uint64_t p_timestep, +// int p_tolerance, +// int p_priority +// ){ + + m_Man.put( values, "", variable.m_Name, GetType<T>(), variable.m_Dimensions, variable.m_GlobalDimensions, variable.m_GlobalOffsets, 0); std::cout << "I am hooked to the DataMan library\n"; MPI_Barrier( m_MPIComm ); diff --git a/src/core/Engine.cpp b/src/core/Engine.cpp index c8f7902851e09aaf82fc5e23536f1c3696756db4..0ad8d8c946f528e5509db4c60649a8007ee4b792 100644 --- a/src/core/Engine.cpp +++ b/src/core/Engine.cpp @@ -36,6 +36,7 @@ Engine::Engine( ADIOS& adios, const std::string engineType, const std::string na Engine::~Engine( ) { } +void Engine::SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback ){ } //should these functions throw an exception? void Engine::Write( Variable<char>& variable, const char* values ){ } diff --git a/src/engine/dataman/DataManReader.cpp b/src/engine/dataman/DataManReader.cpp index ef17b90885d07f03ee808a1f154a6ecfe8f452e9..6286cc4fb081de3e7543f743d83e9ff6bb1ba33c 100644 --- a/src/engine/dataman/DataManReader.cpp +++ b/src/engine/dataman/DataManReader.cpp @@ -33,20 +33,11 @@ DataManReader::DataManReader( ADIOS& adios, const std::string name, const std::s DataManReader::~DataManReader( ) { } -void DataManReader::Init( ) +void DataManWriter::SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback ) { - if( m_DebugMode == true ) - { - if( m_AccessMode != "r" && m_AccessMode != "read" ) - throw std::invalid_argument( "ERROR: DataManReader doesn't support access mode " + m_AccessMode + - ", in call to ADIOS Open or DataManReader constructor\n" ); - } - - InitCapsules( ); - InitTransports( ); + m_CallBack = callback; } - Variable<void>* DataManReader::InquireVariable( const std::string name, const bool readIn ) //not yet implemented { return nullptr; } @@ -109,6 +100,20 @@ void DataManReader::Close( const int transportIndex ) //PRIVATE +void DataManReader::Init( ) +{ + if( m_DebugMode == true ) + { + if( m_AccessMode != "r" && m_AccessMode != "read" ) + throw std::invalid_argument( "ERROR: DataManReader doesn't support access mode " + m_AccessMode + + ", in call to ADIOS Open or DataManReader constructor\n" ); + } + + InitCapsules( ); + InitTransports( ); +} + + void DataManReader::InitCapsules( ) { //here init memory capsules diff --git a/src/engine/dataman/DataManWriter.cpp b/src/engine/dataman/DataManWriter.cpp index 4a94dbbb08be7f0cc686a857c5cf1c3428532fd2..0cb73e1acadb394fe91def22ade9a21ca2ac335f 100644 --- a/src/engine/dataman/DataManWriter.cpp +++ b/src/engine/dataman/DataManWriter.cpp @@ -36,63 +36,11 @@ DataManWriter::DataManWriter( ADIOS& adios, const std::string name, const std::s DataManWriter::~DataManWriter( ) { } - -void DataManWriter::Init( ) +void DataManWriter::SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback ) { - if( m_DebugMode == true ) - { - if( m_AccessMode != "w" && m_AccessMode != "write" && m_AccessMode != "a" && m_AccessMode != "append" ) - throw std::invalid_argument( "ERROR: DataManWriter doesn't support access mode " + m_AccessMode + - ", in call to ADIOS Open or DataManWriter constructor\n" ); - } - - auto itRealTime = m_Method.m_Parameters.find( "real_time" ); - if( itRealTime != m_Method.m_Parameters.end() ) - { - if( itRealTime->second == "yes" || itRealTime->second == "true" ) - m_DoRealTime = true; - } - - if(m_DoRealTime) - { - string method_type="", method="", local_ip="", remote_ip=""; - int local_port=0, remote_port=0, num_channels=0; - - auto i = m_Method.m_Parameters.find( "method_type" ); - if( i != m_Method.m_Parameters.end() ) - method_type = i->second; - if(method_type == "stream"){ - i = m_Method.m_Parameters.find( "method" ); - if( i != m_Method.m_Parameters.end() ) - method = i->second; - i = m_Method.m_Parameters.find( "local_ip" ); - if( i != m_Method.m_Parameters.end() ) - local_ip = i->second; - i = m_Method.m_Parameters.find( "remote_ip" ); - if( i != m_Method.m_Parameters.end() ) - remote_ip = i->second; - i = m_Method.m_Parameters.find( "local_port" ); - if( i != m_Method.m_Parameters.end() ) - istringstream(i->second) >> local_port; - i = m_Method.m_Parameters.find( "remote_port" ); - if( i != m_Method.m_Parameters.end() ) - istringstream(i->second) >> remote_port; - i = m_Method.m_Parameters.find( "num_channels" ); - if( i != m_Method.m_Parameters.end() ) - istringstream(i->second) >> num_channels; - m_Man.add_stream(local_ip, remote_ip, local_port, remote_port, num_channels, method); - } - - } - else - { - InitCapsules( ); - InitTransports( ); - } - + m_CallBack = callback; } - void DataManWriter::Write( Variable<char>& variable, const char* values ) { WriteVariableCommon( variable, values ); } @@ -199,6 +147,72 @@ void DataManWriter::Close( const int transportIndex ) //PRIVATE functions below +void DataManWriter::Init( ) +{ + if( m_DebugMode == true ) + { + if( m_AccessMode != "w" && m_AccessMode != "write" && m_AccessMode != "a" && m_AccessMode != "append" ) + throw std::invalid_argument( "ERROR: DataManWriter doesn't support access mode " + m_AccessMode + + ", in call to ADIOS Open or DataManWriter constructor\n" ); + } + + auto itRealTime = m_Method.m_Parameters.find( "real_time" ); + if( itRealTime != m_Method.m_Parameters.end() ) + { + if( itRealTime->second == "yes" || itRealTime->second == "true" ) + m_DoRealTime = true; + } + + if(m_DoRealTime) + { + /** + * Lambda function that assigns a parameter in m_Method to a localVariable of type std::string + */ + auto lf_AssignString = [this]( const std::string parameter, std::string& localVariable ) + { + auto it = m_Method.m_Parameters.find( parameter ); + if( it != m_Method.m_Parameters.end() ) + { + localVariable = it->second; + } + }; + + /** + * Lambda function that assigns a parameter in m_Method to a localVariable of type int + */ + auto lf_AssignInt = [this]( const std::string parameter, int& localVariable ) + { + auto it = m_Method.m_Parameters.find( parameter ); + if( it != m_Method.m_Parameters.end() ) + { + localVariable = std::stoi( it->second ); + } + }; + + std::string method_type, method, local_ip, remote_ip; //no need to initialize to empty (it's default) + int local_port=0, remote_port=0, num_channels=0; + + lf_AssignString( "method_type", method_type ); + if( method_type == "stream" ) + { + lf_AssignString( "method", method ); + lf_AssignString( "local_ip", local_ip ); + lf_AssignString( "remote_ip", remote_ip ); + lf_AssignInt( "local_port", local_port ); + lf_AssignInt( "remote_port", remote_port ); + lf_AssignInt( "num_channels", num_channels ); + + m_Man.add_stream(local_ip, remote_ip, local_port, remote_port, num_channels, method); + } + } + else + { + InitCapsules( ); + InitTransports( ); + } + +} + void DataManWriter::InitCapsules( ) { //here init memory capsules