From 7f62014c5ec66ca7d1efd1ca209421740c9826a2 Mon Sep 17 00:00:00 2001
From: Jason Wang <wangr1@ornl.gov>
Date: Tue, 28 Feb 2017 14:03:12 -0500
Subject: [PATCH] DataMan (ZmqMan) example worked for streaming data

---
 .../datamanReader/helloDataManReader.cpp      |  2 -
 .../helloDataManReader_nompi.cpp              | 21 +++++-
 .../helloDataManWriter_nompi.cpp              | 12 ++--
 include/engine/dataman/DataManReader.h        |  4 +-
 src/engine/dataman/DataManReader.cpp          | 66 ++++++++++++++++++-
 src/engine/dataman/DataManWriter.cpp          |  3 +
 6 files changed, 96 insertions(+), 12 deletions(-)

diff --git a/examples/hello/datamanReader/helloDataManReader.cpp b/examples/hello/datamanReader/helloDataManReader.cpp
index 73c1cc99b..0943c6c40 100644
--- a/examples/hello/datamanReader/helloDataManReader.cpp
+++ b/examples/hello/datamanReader/helloDataManReader.cpp
@@ -24,7 +24,6 @@ void getcb( const void *data, std::string doid, std::string var, std::string dty
     std::cout << "data object ID = " << doid << "\n"; //do you need to flush?
     std::cout << "variable name = " << var << "\n";
     std::cout << "data type = " << dtype << "\n";
-    float *dataf = (float*)data;
 
     std::size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1, std::multiplies<std::size_t>());
 
@@ -33,7 +32,6 @@ void getcb( const void *data, std::string doid, std::string var, std::string dty
     std::cout << std::endl;
 }
 
-
 int main( int argc, char* argv [] )
 {
     MPI_Init( &argc, &argv );
diff --git a/examples/hello/datamanReader/helloDataManReader_nompi.cpp b/examples/hello/datamanReader/helloDataManReader_nompi.cpp
index 11c17a138..be16f1149 100644
--- a/examples/hello/datamanReader/helloDataManReader_nompi.cpp
+++ b/examples/hello/datamanReader/helloDataManReader_nompi.cpp
@@ -7,9 +7,22 @@
 
 #include <vector>
 #include <iostream>
+#include <numeric>
 
 #include "ADIOS_CPP.h"
 
+void getcb( const void *data, std::string doid, std::string var, std::string dtype, std::vector<std::size_t> varshape )
+{
+    std::cout << "data object ID = " << doid << "\n"; //do you need to flush?
+    std::cout << "variable name = " << var << "\n";
+    std::cout << "data type = " << dtype << "\n";
+
+    std::size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1, std::multiplies<std::size_t>());
+
+    for (int i=0; i<varsize; i++)
+        std::cout << ((float*)data)[i] << " ";
+    std::cout << std::endl;
+}
 
 int main( int argc, char* argv [] )
 {
@@ -20,8 +33,8 @@ int main( int argc, char* argv [] )
     {
         //Define method for engine creation, it is basically straight-forward parameters
         adios::Method& datamanSettings = adios.DeclareMethod( "WAN", "DataManReader" ); //default method type is BPWriter
-        datamanSettings.SetParameters( "peer-to-peer=yes" );
-        datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" );
+        datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=zmq", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12307", "remote_port=12306" );
+//        datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" );
         //datamanSettings.AddTransport( "ZeroMQ", "localIP=128.0.0.0.1.1", "remoteIP=128.0.0.0.2.1", "tolerances=1,2,3" ); not yet supported , will throw an exception
 
         //Create engine smart pointer to DataManReader Engine due to polymorphism,
@@ -31,6 +44,10 @@ int main( int argc, char* argv [] )
         if( datamanReader == nullptr )
             throw std::ios_base::failure( "ERROR: failed to create DataMan I/O engine at Open\n" );
 
+        datamanReader->SetCallBack( getcb );
+
+        while(1){}
+
         adios::Variable<double>* ioMyDoubles = datamanReader->InquireVariableDouble( "ioMyDoubles" );
         if( ioMyDoubles == nullptr )
             std::cout << "Variable ioMyDoubles not read...yet\n";
diff --git a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp
index d9d245c79..b7ad9693b 100644
--- a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp
+++ b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp
@@ -17,6 +17,7 @@ int main( int argc, char* argv [] )
     adios::ADIOS adios( adiosDebug );
 
     //Application variable
+    std::vector<float> myFloats = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
     std::vector<double> myDoubles = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
     const std::size_t Nx = myDoubles.size();
 
@@ -30,12 +31,13 @@ int main( int argc, char* argv [] )
     {
         //Define variable and local size
         //Define variable and local size
-        auto& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", adios::Dims{Nx} );
-        auto& ioMyCFloats = adios.DefineVariable<std::complex<float>>( "myCFloats", {3} );
+        auto& ioMyFloats = adios.DefineVariable<float>( "myfloats", adios::Dims{Nx} );
+//        auto& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", adios::Dims{Nx} );
+//        auto& ioMyCFloats = adios.DefineVariable<std::complex<float>>( "myCFloats", {3} );
 
         //Define method for engine creation, it is basically straight-forward parameters
         adios::Method& datamanSettings = adios.DeclareMethod( "WAN", "DataManWriter" ); //default method type is Writer
-        datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=dump", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307" );
+        datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=zmq", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307" );
 //        datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" );
         //datamanSettings.AddTransport( "ZeroMQ", "localIP=128.0.0.0.1.1", "remoteIP=128.0.0.0.2.1", "tolerances=1,2,3" ); not yet supported , will throw an exception
 
@@ -46,8 +48,8 @@ int main( int argc, char* argv [] )
         if( datamanWriter == nullptr )
             throw std::ios_base::failure( "ERROR: failed to create DataMan I/O engine at Open\n" );
 
-        datamanWriter->Write( ioMyDoubles, myDoubles.data() ); // Base class Engine own the Write<T> that will call overloaded Write from Derived
-        datamanWriter->Write( ioMyCFloats, myCFloats.data() );
+        datamanWriter->Write( ioMyFloats, myFloats.data() ); // Base class Engine own the Write<T> that will call overloaded Write from Derived
+//        datamanWriter->Write( ioMyCFloats, myCFloats.data() );
         datamanWriter->Close( );
     }
     catch( std::invalid_argument& e )
diff --git a/include/engine/dataman/DataManReader.h b/include/engine/dataman/DataManReader.h
index e29668774..9def2e9f6 100644
--- a/include/engine/dataman/DataManReader.h
+++ b/include/engine/dataman/DataManReader.h
@@ -79,8 +79,10 @@ private:
 
     capsule::STLVector m_Buffer; ///< heap capsule, contains data and metadata buffers
     format::BP1Writer m_BP1Writer; ///< format object will provide the required BP functionality to be applied on m_Buffer and m_Transports
-    std::function<void( const void*, std::string, std::string, std::string, Dims )> m_CallBack; ///< call back function
 
+    bool m_DoRealTime = false;
+    DataManager m_Man;
+    std::function<void( const void*, std::string, std::string, std::string, Dims )> m_CallBack; ///< call back function
 
     void Init( );  ///< calls InitCapsules and InitTransports based on Method, called from constructor
     void InitCapsules( );
diff --git a/src/engine/dataman/DataManReader.cpp b/src/engine/dataman/DataManReader.cpp
index a2e32889d..0f5a5c044 100644
--- a/src/engine/dataman/DataManReader.cpp
+++ b/src/engine/dataman/DataManReader.cpp
@@ -36,6 +36,7 @@ DataManReader::~DataManReader( )
 void DataManReader::SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback )
 {
     m_CallBack = callback;
+    m_Man.reg_callback(callback);
 }
 
 Variable<void>* DataManReader::InquireVariable( const std::string name, const bool readIn ) //not yet implemented
@@ -109,8 +110,69 @@ void DataManReader::Init( )
                                          ", in call to ADIOS Open or DataManReader constructor\n"  );
     }
 
-    InitCapsules( );
-    InitTransports( );
+    auto itRealTime = m_Method.m_Parameters.find( "real_time" );
+    if( itRealTime != m_Method.m_Parameters.end() )
+    {
+        if( itRealTime->second == "yes" || itRealTime->second == "true" )
+            m_DoRealTime = true;
+    }
+
+    if(m_DoRealTime)
+    {
+        /**
+         * Lambda function that assigns a parameter in m_Method to a localVariable of type std::string
+         */
+        auto lf_AssignString = [this]( const std::string parameter, std::string& localVariable )
+        {
+            auto it = m_Method.m_Parameters.find( parameter );
+            if( it != m_Method.m_Parameters.end() )
+            {
+                localVariable = it->second;
+            }
+        };
+
+        /**
+         * Lambda function that assigns a parameter in m_Method to a localVariable of type int
+         */
+        auto lf_AssignInt = [this]( const std::string parameter, int& localVariable )
+        {
+            auto it = m_Method.m_Parameters.find( parameter );
+            if( it != m_Method.m_Parameters.end() )
+            {
+                localVariable = std::stoi( it->second );
+            }
+        };
+
+        std::string method_type, method, local_ip, remote_ip; //no need to initialize to empty (it's default)
+        int local_port=0, remote_port=0, num_channels=0;
+
+        lf_AssignString( "method_type", method_type );
+        if( method_type == "stream" )
+        {
+            lf_AssignString( "method", method );
+            lf_AssignString( "local_ip", local_ip );
+            lf_AssignString( "remote_ip", remote_ip );
+            lf_AssignInt( "local_port", local_port );
+            lf_AssignInt( "remote_port", remote_port );
+            lf_AssignInt( "num_channels", num_channels );
+
+            json jmsg;
+            jmsg["method"] = method;
+            jmsg["local_ip"] = local_ip;
+            jmsg["remote_ip"] = remote_ip;
+            jmsg["local_port"] = local_port;
+            jmsg["remote_port"] = remote_port;
+            jmsg["num_channels"] = num_channels;
+            jmsg["stream_mode"] = "receiver";
+
+            m_Man.add_stream(jmsg);
+        }
+    }
+    else
+    {
+        InitCapsules( );
+        InitTransports( );
+    }
 }
 
 
diff --git a/src/engine/dataman/DataManWriter.cpp b/src/engine/dataman/DataManWriter.cpp
index 4e0b70d9f..9152f6b15 100644
--- a/src/engine/dataman/DataManWriter.cpp
+++ b/src/engine/dataman/DataManWriter.cpp
@@ -39,6 +39,7 @@ DataManWriter::~DataManWriter( )
 void DataManWriter::SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback )
 {
     m_CallBack = callback;
+    m_Man.reg_callback(callback);
 }
 
 void DataManWriter::Write( Variable<char>& variable, const char* values )
@@ -141,6 +142,7 @@ void DataManWriter::Write( const std::string variableName, const std::complex<lo
 
 void DataManWriter::Close( const int transportIndex )
 {
+    m_Man.flush();
     //here close IPs and deallocate or free/close resources (if using STL no need for memory deallocation)
 }
 
@@ -209,6 +211,7 @@ void DataManWriter::Init( )
             jmsg["local_port"] = local_port;
             jmsg["remote_port"] = remote_port;
             jmsg["num_channels"] = num_channels;
+            jmsg["stream_mode"] = "sender";
 
             m_Man.add_stream(jmsg);
         }
-- 
GitLab