diff --git a/examples/heatTransfer/HeatTransfer.cpp b/examples/heatTransfer/HeatTransfer.cpp index de71231a5b1895a7805b6c9bf7ef6c68b880dc82..473a5e000fc9b8a2ade2597244dbcff331a712f3 100644 --- a/examples/heatTransfer/HeatTransfer.cpp +++ b/examples/heatTransfer/HeatTransfer.cpp @@ -162,14 +162,14 @@ void HeatTransfer::exchange( MPI_Comm comm ) MPI_Status status; if( m_s->rank_left >= 0 ) { - std::cout << "Rank " << m_s->rank << " send left to rank " << m_s->rank_left; + std::cout << "Rank " << m_s->rank << " send left to rank " << m_s->rank_left << std::endl; for( int i = 0; i < m_s->ndx+2; ++i) send_x[i] = m_TCurrent[i][1]; MPI_Send(send_x, m_s->ndx+2, MPI_REAL8, m_s->rank_left, tag, comm); } if( m_s->rank_right >= 0 ) { - std::cout << "Rank " << m_s->rank << " receive from right from rank " << m_s->rank_right; + std::cout << "Rank " << m_s->rank << " receive from right from rank " << m_s->rank_right << std::endl; MPI_Recv(recv_x, m_s->ndx+2, MPI_REAL8, m_s->rank_right, tag, comm, &status); for( int i = 0; i < m_s->ndx+2; ++i) m_TCurrent[i][m_s->ndy+1] = recv_x[i]; @@ -179,14 +179,14 @@ void HeatTransfer::exchange( MPI_Comm comm ) tag = 2; if( m_s->rank_right >= 0 ) { - std::cout << "Rank " << m_s->rank << " send right to rank " << m_s->rank_right; + std::cout << "Rank " << m_s->rank << " send right to rank " << m_s->rank_right << std::endl; for( int i = 0; i < m_s->ndx+2; ++i) send_x[i] = m_TCurrent[i][m_s->ndy]; MPI_Send(send_x, m_s->ndx+2, MPI_REAL8, m_s->rank_right, tag, comm); } if( m_s->rank_left >= 0 ) { - std::cout << "Rank " << m_s->rank << " receive from left from rank " << m_s->rank_left; + std::cout << "Rank " << m_s->rank << " receive from left from rank " << m_s->rank_left << std::endl; MPI_Recv(recv_x, m_s->ndx+2, MPI_REAL8, m_s->rank_left, tag, comm, &status); for( int i = 0; i < m_s->ndx+2; ++i) m_TCurrent[i][0] = recv_x[i]; @@ -196,12 +196,12 @@ void HeatTransfer::exchange( MPI_Comm comm ) tag = 3; if( m_s->rank_down >= 0 ) { - std::cout << "Rank " << m_s->rank << " send down to rank " << m_s->rank_down; + std::cout << "Rank " << m_s->rank << " send down to rank " << m_s->rank_down << std::endl; MPI_Send(m_TCurrent[m_s->ndx], m_s->ndy+2, MPI_REAL8, m_s->rank_down, tag, comm); } if ( m_s->rank_up >= 0 ) { - std::cout << "Rank " << m_s->rank << " receive from above from rank " << m_s->rank_up; + std::cout << "Rank " << m_s->rank << " receive from above from rank " << m_s->rank_up << std::endl; MPI_Recv(m_TCurrent[0], m_s->ndy+2, MPI_REAL8, m_s->rank_up, tag, comm, &status); } @@ -209,12 +209,12 @@ void HeatTransfer::exchange( MPI_Comm comm ) tag = 4; if( m_s->rank_up >= 0 ) { - std::cout << "Rank " << m_s->rank << " send up to rank " << m_s->rank_up; + std::cout << "Rank " << m_s->rank << " send up to rank " << m_s->rank_up << std::endl; MPI_Send(m_TCurrent[1], m_s->ndy+2, MPI_REAL8, m_s->rank_up, tag, comm); } if ( m_s->rank_down >= 0 ) { - std::cout << "Rank " << m_s->rank << " receive from below from rank " << m_s->rank_down; + std::cout << "Rank " << m_s->rank << " receive from below from rank " << m_s->rank_down << std::endl; MPI_Recv(m_TCurrent[m_s->ndx+1], m_s->ndy+2, MPI_REAL8, m_s->rank_down, tag, comm, &status); } @@ -222,4 +222,17 @@ void HeatTransfer::exchange( MPI_Comm comm ) delete[] recv_x; } - +#include <cstring> +/* Copies the internal ndx*ndy section of the ndx+2 * ndy+2 local array + * into a separate contiguous vector and returns it. + * @return A vector with ndx*ndy elements + */ +std::vector<double> HeatTransfer::data_noghost() +{ + std::vector<double>d( m_s->ndx * m_s->ndy ); + for( int i = 1; i <= m_s->ndx; ++i ) + { + std::memcpy( &d[(i-1)*m_s->ndy], m_TCurrent[i]+1, m_s->ndy*sizeof(double)); + } + return d; +} diff --git a/examples/heatTransfer/HeatTransfer.h b/examples/heatTransfer/HeatTransfer.h index 6008f8ea74aa8292cebc4771489c536837546c1b..1f05505113db73601f60c42820a62a1f472977c9 100644 --- a/examples/heatTransfer/HeatTransfer.h +++ b/examples/heatTransfer/HeatTransfer.h @@ -9,6 +9,8 @@ #define HEATTRANSFER_H_ #include <mpi.h> +#include <vector> + #include "Settings.h" class HeatTransfer @@ -21,8 +23,10 @@ public: void heatEdges(); // reset the heat values at the global edge void exchange( MPI_Comm comm ); // send updates to neighbors - const double *data() {return m_TCurrent[0];}; // return (1D) pointer to current T data - const double T(int i, int j) {return m_TCurrent[i][j];}; // return current T value at i,j local coordinate + // return (1D) pointer to current T data, ndx+2 * ndy+2 elements + const double *data() {return m_TCurrent[0];}; + // return (1D) pointer to current T data without ghost cells, ndx*ndy elements + std::vector<double> data_noghost(); void printT(std::string message, MPI_Comm comm); // debug: print local TCurrent on stdout diff --git a/examples/heatTransfer/IO.h b/examples/heatTransfer/IO.h index 1e6539eb3ab71989f6f0525af1c862467ad83166..542112f1c8d9c03c73f126e1f7a5da5e40e4f37b 100644 --- a/examples/heatTransfer/IO.h +++ b/examples/heatTransfer/IO.h @@ -16,7 +16,7 @@ class IO { public: - IO( std::string output_basename, MPI_Comm comm ); + IO( std::shared_ptr<Settings> s, MPI_Comm comm ); ~IO(); void write( int step, std::shared_ptr<HeatTransfer> ht, std::shared_ptr<Settings> s, MPI_Comm comm ); diff --git a/examples/heatTransfer/IO_adios1.cpp b/examples/heatTransfer/IO_adios1.cpp new file mode 100644 index 0000000000000000000000000000000000000000..3cd1705811590b27966801a04287cf1d558feb6d --- /dev/null +++ b/examples/heatTransfer/IO_adios1.cpp @@ -0,0 +1,73 @@ +/* + * IO_ADIOS1.cpp + * + * Created on: Feb 2017 + * Author: Norbert Podhorszki + */ + +#include <string> +#include <iostream> +#include <iomanip> + +#include "IO.h" +#include "adios.h" + +static int64_t group; +static int rank_saved; + +IO::IO( std::shared_ptr<Settings> s, MPI_Comm comm ) +{ + rank_saved = s->rank; + m_outputfilename = s->outputfile + ".bp"; + adios_init_noxml( comm ); + adios_declare_group( &group, "heat", "", adios_stat_default ); + adios_select_method( group, "MPI", "", "" ); + + adios_define_var( group, "gndx", "", adios_integer, "", "", ""); + adios_define_var( group, "gndy", "", adios_integer, "", "", ""); + + std::string ldims( std::to_string( s->ndx ) + "," + std::to_string( s->ndy )); + std::string gdims( std::to_string( s->gndx ) + "," + std::to_string( s->gndy )); + std::string offs( std::to_string( s->offsx ) + "," + std::to_string( s->offsy )); + uint64_t T_id; + T_id = adios_define_var( group, "T", "", adios_double, + ldims.c_str(), gdims.c_str(), offs.c_str() ); + + adios_set_transform( T_id, "none"); + //adios_set_transform( T_id, "zfp:accuracy=0.001"); +} + +IO::~IO() +{ + adios_finalize(rank_saved); +} + +void IO::write( int step, std::shared_ptr<HeatTransfer> ht, std::shared_ptr<Settings> s, MPI_Comm comm ) +{ + char mode[2] = "w"; + if (step > 0) + { + mode[0] = 'a'; + } + + // for time measurements, let's synchronize the processes + MPI_Barrier( comm ); + double time_start = MPI_Wtime(); + + int64_t f; + adios_open( &f, "heat", m_outputfilename.c_str(), mode, comm); + adios_write( f, "gndx", &s->gndx); + adios_write( f, "gndy", &s->gndy); + adios_write( f, "T", ht->data_noghost().data() ); + adios_close( f ); + + MPI_Barrier( comm ); + double total_time = MPI_Wtime() - time_start; + uint64_t adios_totalsize = 2*sizeof(int) + 2*s->ndx*s->ndy*sizeof(double); + uint64_t sizeMB = adios_totalsize * s->nproc/1024/1024/1024; // size in MB + uint64_t mbs = sizeMB/total_time; + if (s->rank==0) + std::cout << "Step " << step << ": " << m_outputfilename + << " " << sizeMB << " " << total_time << "" << mbs << std::endl; +} + diff --git a/examples/heatTransfer/IO_adios2.cpp b/examples/heatTransfer/IO_adios2.cpp index 0617a3f1ca4a8e01cf65eb904304d85aada59258..c6f3615b414c45a1c6103812e28a489f0ce9e9df 100644 --- a/examples/heatTransfer/IO_adios2.cpp +++ b/examples/heatTransfer/IO_adios2.cpp @@ -8,16 +8,16 @@ #include "IO.h" -IO::IO( std::string output_basename ) +IO::IO( std::shared_ptr<Settings> s, MPI_Comm comm ) { - m_outputfilename = output_basename + ".bp"; + m_outputfilename = s->outputfile + ".bp"; } IO::~IO() { } -void IO::write( int step, int curr, Settings& s ) +void IO::write(int step, std::shared_ptr<HeatTransfer> ht, std::shared_ptr<Settings> s, MPI_Comm comm ) { } diff --git a/examples/heatTransfer/IO_ascii.cpp b/examples/heatTransfer/IO_ascii.cpp index ce71ecd5897d2cdb44b144a4c564fb5aa024c72a..4ab36749bd10cdb77c527b6385eb2d29a3fe33f7 100644 --- a/examples/heatTransfer/IO_ascii.cpp +++ b/examples/heatTransfer/IO_ascii.cpp @@ -1,5 +1,5 @@ /* - * IO_ADIOS2.cpp + * IO_ascii.cpp * * Created on: Feb 2017 * Author: Norbert Podhorszki @@ -14,9 +14,9 @@ static std::ofstream of; static std::streambuf *buf; -IO::IO( std::string output_basename, MPI_Comm comm ) +IO::IO( std::shared_ptr<Settings> s, MPI_Comm comm ) { - m_outputfilename = output_basename; + m_outputfilename = s->outputfile; if (m_outputfilename == "cout") { @@ -27,7 +27,7 @@ IO::IO( std::string output_basename, MPI_Comm comm ) int rank; MPI_Comm_rank( comm, &rank ); std::string rs = std::to_string(rank); - of.open(output_basename + rs + ".txt"); + of.open(m_outputfilename + rs + ".txt"); buf = of.rdbuf(); } } @@ -43,7 +43,7 @@ IO::~IO() void IO::write( int step, std::shared_ptr<HeatTransfer> ht, std::shared_ptr<Settings> s, MPI_Comm comm) { std::ostream out(buf); - if( step == 1) + if( step == 0) { out << "rank=" << s->rank << " size=" << s->ndx << "x" << s->ndy diff --git a/examples/heatTransfer/Makefile b/examples/heatTransfer/Makefile index e1ed1b5b88fbbc8b04a1e54493e0afdf9573360c..1d8b8c5acc840678d8b566946c8ed09d000fbecd 100644 --- a/examples/heatTransfer/Makefile +++ b/examples/heatTransfer/Makefile @@ -1,20 +1,34 @@ -# Makefile for testing purposes, will build writer_mpi (make or make mpi) or writer_nompi (make nompi) +# Makefile # Created on: Feb 13, 2017 # Author: pnorbert +# +# SYSTEM SPECIFIC SETTINGS +# #COMPILERS CC=g++ MPICC=mpic++ -#ADIOS LOCATION -ADIOS_DIR=../.. -ADIOS_INCLUDE=-I$(ADIOS_DIR)/include -ADIOS_LIB=$(ADIOS_DIR)/lib/libadios.a -ADIOS_NOMPI_LIB=$(ADIOS_DIR)/lib/libadios_nompi.a - #FLAGS CFLAGS=-Wall -Wpedantic -Woverloaded-virtual -std=c++11 -O0 -g -LDFLAGS= +LDFLAGS=-lpthread + +#ADIOS1 LOCATION +ADIOS1_DIR=/opt/adios/1.11 + +#ADIOS2 LOCATION +ADIOS2_DIR=../.. + +# +# SHOULD NOT NEED TO CHANGE ANYTHING BELOW +# + +ADIOS1_INCLUDE=-I$(ADIOS1_DIR)/include +ADIOS1_LIB=`${ADIOS1_DIR}/bin/adios_config -l` + + +ADIOS2_INCLUDE= -DHAVE_MPI -I$(ADIOS2_DIR)/include +ADIOS2_LIB=$(ADIOS2_DIR)/lib/libadios.a default: @echo "Make targets: ascii hdf5_a hdf5_b phdf5 adios1 adios2" @@ -31,11 +45,14 @@ help: default ascii: main.cpp HeatTransfer.cpp Settings.cpp IO_ascii.cpp - $(MPICC) $(CFLAGS) $(ADIOS_INCLUDE) -DHAVE_MPI -o heatTransfer_ascii $^ $(ADIOS_LIB) $(LDFLAGS) -lpthread + $(MPICC) $(CFLAGS) -o heatTransfer_ascii $^ $(LDFLAGS) + +adios1: main.cpp HeatTransfer.cpp Settings.cpp IO_adios1.cpp + $(MPICC) $(CFLAGS) $(ADIOS1_INCLUDE) -o heatTransfer_adios1 $^ $(ADIOS1_LIB) $(LDFLAGS) adios2: main.cpp HeatTransfer.cpp Settings.cpp IO_adios2.cpp - $(MPICC) $(CFLAGS) $(ADIOS_INCLUDE) -DHAVE_MPI -o heatTransfer_adios2 $^ $(ADIOS_LIB) $(LDFLAGS) -lpthread + $(MPICC) $(CFLAGS) $(ADIOS2_INCLUDE) -o heatTransfer_adios2 $^ $(ADIOS2_LIB) $(LDFLAGS) clean: - rm -f heatTransfer_ascii heatTransfer_adios2 + rm -f heatTransfer_ascii heatTransfer_adios1 heatTransfer_adios2 diff --git a/examples/heatTransfer/main.cpp b/examples/heatTransfer/main.cpp index 0fcde73ffa3f1c2e0e4300ea6225bca65d0c2319..c4978446b218fe326419f83fa2a48ccf85ef69c5 100644 --- a/examples/heatTransfer/main.cpp +++ b/examples/heatTransfer/main.cpp @@ -59,13 +59,15 @@ int main( int argc, char* argv [] ) double timeStart = MPI_Wtime(); std::shared_ptr<Settings> settings( new Settings( argc, argv, rank, nproc )); std::shared_ptr<HeatTransfer> ht( new HeatTransfer( settings )); - std::shared_ptr<IO> io( new IO( settings->outputfile, mpiHeatTransferComm )); + std::shared_ptr<IO> io( new IO( settings, mpiHeatTransferComm )); - - ht->init(false); + ht->init(true); ht->printT("Initialized T:", mpiHeatTransferComm); ht->heatEdges(); + //ht->exchange( mpiHeatTransferComm ); ht->printT("Heated T:", mpiHeatTransferComm); + io->write( 0, ht, settings, mpiHeatTransferComm ); + for( int t = 1; t <= settings->steps; ++t ) { if( rank == 0 ) @@ -73,8 +75,8 @@ int main( int argc, char* argv [] ) for( int iter = 1; iter <= settings->iterations; ++iter ) { ht->iterate(); - ht->heatEdges(); ht->exchange( mpiHeatTransferComm ); + ht->heatEdges(); } io->write( t, ht, settings, mpiHeatTransferComm ); } @@ -101,6 +103,7 @@ int main( int argc, char* argv [] ) std::cout << e.what() << std::endl; } + MPI_Finalize(); return 0; } diff --git a/examples/hello/datamanReader/helloDataManReader.cpp b/examples/hello/datamanReader/helloDataManReader.cpp index 73c1cc99bac31ba05ad8cb1fc99a38d8cb40b0b4..0943c6c402f32867ac5f966a1bf4ee2f496bbe3e 100644 --- a/examples/hello/datamanReader/helloDataManReader.cpp +++ b/examples/hello/datamanReader/helloDataManReader.cpp @@ -24,7 +24,6 @@ void getcb( const void *data, std::string doid, std::string var, std::string dty std::cout << "data object ID = " << doid << "\n"; //do you need to flush? std::cout << "variable name = " << var << "\n"; std::cout << "data type = " << dtype << "\n"; - float *dataf = (float*)data; std::size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1, std::multiplies<std::size_t>()); @@ -33,7 +32,6 @@ void getcb( const void *data, std::string doid, std::string var, std::string dty std::cout << std::endl; } - int main( int argc, char* argv [] ) { MPI_Init( &argc, &argv ); diff --git a/examples/hello/datamanReader/helloDataManReader_nompi.cpp b/examples/hello/datamanReader/helloDataManReader_nompi.cpp index 11c17a138d1dc72ecc05b0c09d1d903d5a8e6665..be16f1149ad4a54b89ae951020bb019f9bd9ffbe 100644 --- a/examples/hello/datamanReader/helloDataManReader_nompi.cpp +++ b/examples/hello/datamanReader/helloDataManReader_nompi.cpp @@ -7,9 +7,22 @@ #include <vector> #include <iostream> +#include <numeric> #include "ADIOS_CPP.h" +void getcb( const void *data, std::string doid, std::string var, std::string dtype, std::vector<std::size_t> varshape ) +{ + std::cout << "data object ID = " << doid << "\n"; //do you need to flush? + std::cout << "variable name = " << var << "\n"; + std::cout << "data type = " << dtype << "\n"; + + std::size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1, std::multiplies<std::size_t>()); + + for (int i=0; i<varsize; i++) + std::cout << ((float*)data)[i] << " "; + std::cout << std::endl; +} int main( int argc, char* argv [] ) { @@ -20,8 +33,8 @@ int main( int argc, char* argv [] ) { //Define method for engine creation, it is basically straight-forward parameters adios::Method& datamanSettings = adios.DeclareMethod( "WAN", "DataManReader" ); //default method type is BPWriter - datamanSettings.SetParameters( "peer-to-peer=yes" ); - datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" ); + datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=zmq", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12307", "remote_port=12306" ); +// datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" ); //datamanSettings.AddTransport( "ZeroMQ", "localIP=128.0.0.0.1.1", "remoteIP=128.0.0.0.2.1", "tolerances=1,2,3" ); not yet supported , will throw an exception //Create engine smart pointer to DataManReader Engine due to polymorphism, @@ -31,6 +44,10 @@ int main( int argc, char* argv [] ) if( datamanReader == nullptr ) throw std::ios_base::failure( "ERROR: failed to create DataMan I/O engine at Open\n" ); + datamanReader->SetCallBack( getcb ); + + while(1){} + adios::Variable<double>* ioMyDoubles = datamanReader->InquireVariableDouble( "ioMyDoubles" ); if( ioMyDoubles == nullptr ) std::cout << "Variable ioMyDoubles not read...yet\n"; diff --git a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp index d9d245c79de90897fddda7e132c2c8f8582fee43..b7ad9693b85fb6ed380fa17a2f23c8d9613eee4c 100644 --- a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp +++ b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp @@ -17,6 +17,7 @@ int main( int argc, char* argv [] ) adios::ADIOS adios( adiosDebug ); //Application variable + std::vector<float> myFloats = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; std::vector<double> myDoubles = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }; const std::size_t Nx = myDoubles.size(); @@ -30,12 +31,13 @@ int main( int argc, char* argv [] ) { //Define variable and local size //Define variable and local size - auto& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", adios::Dims{Nx} ); - auto& ioMyCFloats = adios.DefineVariable<std::complex<float>>( "myCFloats", {3} ); + auto& ioMyFloats = adios.DefineVariable<float>( "myfloats", adios::Dims{Nx} ); +// auto& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", adios::Dims{Nx} ); +// auto& ioMyCFloats = adios.DefineVariable<std::complex<float>>( "myCFloats", {3} ); //Define method for engine creation, it is basically straight-forward parameters adios::Method& datamanSettings = adios.DeclareMethod( "WAN", "DataManWriter" ); //default method type is Writer - datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=dump", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307" ); + datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=zmq", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307" ); // datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" ); //datamanSettings.AddTransport( "ZeroMQ", "localIP=128.0.0.0.1.1", "remoteIP=128.0.0.0.2.1", "tolerances=1,2,3" ); not yet supported , will throw an exception @@ -46,8 +48,8 @@ int main( int argc, char* argv [] ) if( datamanWriter == nullptr ) throw std::ios_base::failure( "ERROR: failed to create DataMan I/O engine at Open\n" ); - datamanWriter->Write( ioMyDoubles, myDoubles.data() ); // Base class Engine own the Write<T> that will call overloaded Write from Derived - datamanWriter->Write( ioMyCFloats, myCFloats.data() ); + datamanWriter->Write( ioMyFloats, myFloats.data() ); // Base class Engine own the Write<T> that will call overloaded Write from Derived +// datamanWriter->Write( ioMyCFloats, myCFloats.data() ); datamanWriter->Close( ); } catch( std::invalid_argument& e ) diff --git a/include/engine/dataman/DataManReader.h b/include/engine/dataman/DataManReader.h index e296687746d012dbe90efc066d61681690b27fff..9def2e9f647212e357a19b04e73a30432c861641 100644 --- a/include/engine/dataman/DataManReader.h +++ b/include/engine/dataman/DataManReader.h @@ -79,8 +79,10 @@ private: capsule::STLVector m_Buffer; ///< heap capsule, contains data and metadata buffers format::BP1Writer m_BP1Writer; ///< format object will provide the required BP functionality to be applied on m_Buffer and m_Transports - std::function<void( const void*, std::string, std::string, std::string, Dims )> m_CallBack; ///< call back function + bool m_DoRealTime = false; + DataManager m_Man; + std::function<void( const void*, std::string, std::string, std::string, Dims )> m_CallBack; ///< call back function void Init( ); ///< calls InitCapsules and InitTransports based on Method, called from constructor void InitCapsules( ); diff --git a/src/engine/dataman/DataManReader.cpp b/src/engine/dataman/DataManReader.cpp index a2e32889de2731839dc16c330c75086f4a8e3eee..0f5a5c044cf47014b6073205108cf97888f1f15f 100644 --- a/src/engine/dataman/DataManReader.cpp +++ b/src/engine/dataman/DataManReader.cpp @@ -36,6 +36,7 @@ DataManReader::~DataManReader( ) void DataManReader::SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback ) { m_CallBack = callback; + m_Man.reg_callback(callback); } Variable<void>* DataManReader::InquireVariable( const std::string name, const bool readIn ) //not yet implemented @@ -109,8 +110,69 @@ void DataManReader::Init( ) ", in call to ADIOS Open or DataManReader constructor\n" ); } - InitCapsules( ); - InitTransports( ); + auto itRealTime = m_Method.m_Parameters.find( "real_time" ); + if( itRealTime != m_Method.m_Parameters.end() ) + { + if( itRealTime->second == "yes" || itRealTime->second == "true" ) + m_DoRealTime = true; + } + + if(m_DoRealTime) + { + /** + * Lambda function that assigns a parameter in m_Method to a localVariable of type std::string + */ + auto lf_AssignString = [this]( const std::string parameter, std::string& localVariable ) + { + auto it = m_Method.m_Parameters.find( parameter ); + if( it != m_Method.m_Parameters.end() ) + { + localVariable = it->second; + } + }; + + /** + * Lambda function that assigns a parameter in m_Method to a localVariable of type int + */ + auto lf_AssignInt = [this]( const std::string parameter, int& localVariable ) + { + auto it = m_Method.m_Parameters.find( parameter ); + if( it != m_Method.m_Parameters.end() ) + { + localVariable = std::stoi( it->second ); + } + }; + + std::string method_type, method, local_ip, remote_ip; //no need to initialize to empty (it's default) + int local_port=0, remote_port=0, num_channels=0; + + lf_AssignString( "method_type", method_type ); + if( method_type == "stream" ) + { + lf_AssignString( "method", method ); + lf_AssignString( "local_ip", local_ip ); + lf_AssignString( "remote_ip", remote_ip ); + lf_AssignInt( "local_port", local_port ); + lf_AssignInt( "remote_port", remote_port ); + lf_AssignInt( "num_channels", num_channels ); + + json jmsg; + jmsg["method"] = method; + jmsg["local_ip"] = local_ip; + jmsg["remote_ip"] = remote_ip; + jmsg["local_port"] = local_port; + jmsg["remote_port"] = remote_port; + jmsg["num_channels"] = num_channels; + jmsg["stream_mode"] = "receiver"; + + m_Man.add_stream(jmsg); + } + } + else + { + InitCapsules( ); + InitTransports( ); + } } diff --git a/src/engine/dataman/DataManWriter.cpp b/src/engine/dataman/DataManWriter.cpp index 4e0b70d9f32e32c2ad027e71d1c25c685751fa6b..9152f6b15b924440ae9e09ea259a6226506861b0 100644 --- a/src/engine/dataman/DataManWriter.cpp +++ b/src/engine/dataman/DataManWriter.cpp @@ -39,6 +39,7 @@ DataManWriter::~DataManWriter( ) void DataManWriter::SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback ) { m_CallBack = callback; + m_Man.reg_callback(callback); } void DataManWriter::Write( Variable<char>& variable, const char* values ) @@ -141,6 +142,7 @@ void DataManWriter::Write( const std::string variableName, const std::complex<lo void DataManWriter::Close( const int transportIndex ) { + m_Man.flush(); //here close IPs and deallocate or free/close resources (if using STL no need for memory deallocation) } @@ -209,6 +211,7 @@ void DataManWriter::Init( ) jmsg["local_port"] = local_port; jmsg["remote_port"] = remote_port; jmsg["num_channels"] = num_channels; + jmsg["stream_mode"] = "sender"; m_Man.add_stream(jmsg); }