diff --git a/examples/hello/datamanReader/helloDataManReader.cpp b/examples/hello/datamanReader/helloDataManReader.cpp index 73c1cc99bac31ba05ad8cb1fc99a38d8cb40b0b4..0943c6c402f32867ac5f966a1bf4ee2f496bbe3e 100644 --- a/examples/hello/datamanReader/helloDataManReader.cpp +++ b/examples/hello/datamanReader/helloDataManReader.cpp @@ -24,7 +24,6 @@ void getcb( const void *data, std::string doid, std::string var, std::string dty std::cout << "data object ID = " << doid << "\n"; //do you need to flush? std::cout << "variable name = " << var << "\n"; std::cout << "data type = " << dtype << "\n"; - float *dataf = (float*)data; std::size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1, std::multiplies<std::size_t>()); @@ -33,7 +32,6 @@ void getcb( const void *data, std::string doid, std::string var, std::string dty std::cout << std::endl; } - int main( int argc, char* argv [] ) { MPI_Init( &argc, &argv ); diff --git a/examples/hello/datamanReader/helloDataManReader_nompi.cpp b/examples/hello/datamanReader/helloDataManReader_nompi.cpp index 11c17a138d1dc72ecc05b0c09d1d903d5a8e6665..be16f1149ad4a54b89ae951020bb019f9bd9ffbe 100644 --- a/examples/hello/datamanReader/helloDataManReader_nompi.cpp +++ b/examples/hello/datamanReader/helloDataManReader_nompi.cpp @@ -7,9 +7,22 @@ #include <vector> #include <iostream> +#include <numeric> #include "ADIOS_CPP.h" +void getcb( const void *data, std::string doid, std::string var, std::string dtype, std::vector<std::size_t> varshape ) +{ + std::cout << "data object ID = " << doid << "\n"; //do you need to flush? + std::cout << "variable name = " << var << "\n"; + std::cout << "data type = " << dtype << "\n"; + + std::size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1, std::multiplies<std::size_t>()); + + for (int i=0; i<varsize; i++) + std::cout << ((float*)data)[i] << " "; + std::cout << std::endl; +} int main( int argc, char* argv [] ) { @@ -20,8 +33,8 @@ int main( int argc, char* argv [] ) { //Define method for engine creation, it is basically straight-forward parameters adios::Method& datamanSettings = adios.DeclareMethod( "WAN", "DataManReader" ); //default method type is BPWriter - datamanSettings.SetParameters( "peer-to-peer=yes" ); - datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" ); + datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=zmq", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12307", "remote_port=12306" ); +// datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" ); //datamanSettings.AddTransport( "ZeroMQ", "localIP=128.0.0.0.1.1", "remoteIP=128.0.0.0.2.1", "tolerances=1,2,3" ); not yet supported , will throw an exception //Create engine smart pointer to DataManReader Engine due to polymorphism, @@ -31,6 +44,10 @@ int main( int argc, char* argv [] ) if( datamanReader == nullptr ) throw std::ios_base::failure( "ERROR: failed to create DataMan I/O engine at Open\n" ); + datamanReader->SetCallBack( getcb ); + + while(1){} + adios::Variable<double>* ioMyDoubles = datamanReader->InquireVariableDouble( "ioMyDoubles" ); if( ioMyDoubles == nullptr ) std::cout << "Variable ioMyDoubles not read...yet\n"; diff --git a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp index d9d245c79de90897fddda7e132c2c8f8582fee43..b7ad9693b85fb6ed380fa17a2f23c8d9613eee4c 100644 --- a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp +++ b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp @@ -17,6 +17,7 @@ int main( int argc, char* argv [] ) adios::ADIOS adios( adiosDebug ); //Application variable + std::vector<float> myFloats = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; std::vector<double> myDoubles = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; const std::size_t Nx = myDoubles.size(); @@ -30,12 +31,13 @@ int main( int argc, char* argv [] ) { //Define variable and local size //Define variable and local size - auto& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", adios::Dims{Nx} ); - auto& ioMyCFloats = adios.DefineVariable<std::complex<float>>( "myCFloats", {3} ); + auto& ioMyFloats = adios.DefineVariable<float>( "myfloats", adios::Dims{Nx} ); +// auto& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", adios::Dims{Nx} ); +// auto& ioMyCFloats = adios.DefineVariable<std::complex<float>>( "myCFloats", {3} ); //Define method for engine creation, it is basically straight-forward parameters adios::Method& datamanSettings = adios.DeclareMethod( "WAN", "DataManWriter" ); //default method type is Writer - datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=dump", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307" ); + datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=zmq", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307" ); // datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" ); //datamanSettings.AddTransport( "ZeroMQ", "localIP=128.0.0.0.1.1", "remoteIP=128.0.0.0.2.1", "tolerances=1,2,3" ); not yet supported , will throw an exception @@ -46,8 +48,8 @@ int main( int argc, char* argv [] ) if( datamanWriter == nullptr ) throw std::ios_base::failure( "ERROR: failed to create DataMan I/O engine at Open\n" ); - datamanWriter->Write( ioMyDoubles, myDoubles.data() ); // Base class Engine own the Write<T> that will call overloaded Write from Derived - datamanWriter->Write( ioMyCFloats, myCFloats.data() ); + datamanWriter->Write( ioMyFloats, myFloats.data() ); // Base class Engine own the Write<T> that will call overloaded Write from Derived +// datamanWriter->Write( ioMyCFloats, myCFloats.data() ); datamanWriter->Close( ); } catch( std::invalid_argument& e ) diff --git a/include/engine/dataman/DataManReader.h b/include/engine/dataman/DataManReader.h index e296687746d012dbe90efc066d61681690b27fff..9def2e9f647212e357a19b04e73a30432c861641 100644 --- a/include/engine/dataman/DataManReader.h +++ b/include/engine/dataman/DataManReader.h @@ -79,8 +79,10 @@ private: capsule::STLVector m_Buffer; ///< heap capsule, contains data and metadata buffers format::BP1Writer m_BP1Writer; ///< format object will provide the required BP functionality to be applied on m_Buffer and m_Transports - std::function<void( const void*, std::string, std::string, std::string, Dims )> m_CallBack; ///< call back function + bool m_DoRealTime = false; + DataManager m_Man; + std::function<void( const void*, std::string, std::string, std::string, Dims )> m_CallBack; ///< call back function void Init( ); ///< calls InitCapsules and InitTransports based on Method, called from constructor void InitCapsules( ); diff --git a/src/engine/dataman/DataManReader.cpp b/src/engine/dataman/DataManReader.cpp index a2e32889de2731839dc16c330c75086f4a8e3eee..0f5a5c044cf47014b6073205108cf97888f1f15f 100644 --- a/src/engine/dataman/DataManReader.cpp +++ b/src/engine/dataman/DataManReader.cpp @@ -36,6 +36,7 @@ DataManReader::~DataManReader( ) void DataManReader::SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback ) { m_CallBack = callback; + m_Man.reg_callback(callback); } Variable<void>* DataManReader::InquireVariable( const std::string name, const bool readIn ) //not yet implemented @@ -109,8 +110,69 @@ void DataManReader::Init( ) ", in call to ADIOS Open or DataManReader constructor\n" ); } - InitCapsules( ); - InitTransports( ); + auto itRealTime = m_Method.m_Parameters.find( "real_time" ); + if( itRealTime != m_Method.m_Parameters.end() ) + { + if( itRealTime->second == "yes" || itRealTime->second == "true" ) + m_DoRealTime = true; + } + + if(m_DoRealTime) + { + /** + * Lambda function that assigns a parameter in m_Method to a localVariable of type std::string + */ + auto lf_AssignString = [this]( const std::string parameter, std::string& localVariable ) + { + auto it = m_Method.m_Parameters.find( parameter ); + if( it != m_Method.m_Parameters.end() ) + { + localVariable = it->second; + } + }; + + /** + * Lambda function that assigns a parameter in m_Method to a localVariable of type int + */ + auto lf_AssignInt = [this]( const std::string parameter, int& localVariable ) + { + auto it = m_Method.m_Parameters.find( parameter ); + if( it != m_Method.m_Parameters.end() ) + { + localVariable = std::stoi( it->second ); + } + }; + + std::string method_type, method, local_ip, remote_ip; //no need to initialize to empty (it's default) + int local_port=0, remote_port=0, num_channels=0; + + lf_AssignString( "method_type", method_type ); + if( method_type == "stream" ) + { + lf_AssignString( "method", method ); + lf_AssignString( "local_ip", local_ip ); + lf_AssignString( "remote_ip", remote_ip ); + lf_AssignInt( "local_port", local_port ); + lf_AssignInt( "remote_port", remote_port ); + lf_AssignInt( "num_channels", num_channels ); + + json jmsg; + jmsg["method"] = method; + jmsg["local_ip"] = local_ip; + jmsg["remote_ip"] = remote_ip; + jmsg["local_port"] = local_port; + jmsg["remote_port"] = remote_port; + jmsg["num_channels"] = num_channels; + jmsg["stream_mode"] = "receiver"; + + m_Man.add_stream(jmsg); + } + } + else + { + InitCapsules( ); + InitTransports( ); + } } diff --git a/src/engine/dataman/DataManWriter.cpp b/src/engine/dataman/DataManWriter.cpp index 4e0b70d9f32e32c2ad027e71d1c25c685751fa6b..9152f6b15b924440ae9e09ea259a6226506861b0 100644 --- a/src/engine/dataman/DataManWriter.cpp +++ b/src/engine/dataman/DataManWriter.cpp @@ -39,6 +39,7 @@ DataManWriter::~DataManWriter( ) void DataManWriter::SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback ) { m_CallBack = callback; + m_Man.reg_callback(callback); } void DataManWriter::Write( Variable<char>& variable, const char* values ) @@ -141,6 +142,7 @@ void DataManWriter::Write( const std::string variableName, const std::complex<lo void DataManWriter::Close( const int transportIndex ) { + m_Man.flush(); //here close IPs and deallocate or free/close resources (if using STL no need for memory deallocation) } @@ -209,6 +211,7 @@ void DataManWriter::Init( ) jmsg["local_port"] = local_port; jmsg["remote_port"] = remote_port; jmsg["num_channels"] = num_channels; + jmsg["stream_mode"] = "sender"; m_Man.add_stream(jmsg); }