Skip to content
Snippets Groups Projects
Commit e6f07398 authored by wgodoy's avatar wgodoy
Browse files

Merge branch 'groupless' of https://github.com/williamfgc/ADIOSPP.git into groupless

parents 7c2f6b1b 8f6bdac2
No related branches found
No related tags found
1 merge request!8Integrate groupless
Showing
with 241 additions and 47 deletions
......@@ -162,14 +162,14 @@ void HeatTransfer::exchange( MPI_Comm comm )
MPI_Status status;
if( m_s->rank_left >= 0 )
{
std::cout << "Rank " << m_s->rank << " send left to rank " << m_s->rank_left;
std::cout << "Rank " << m_s->rank << " send left to rank " << m_s->rank_left << std::endl;
for( int i = 0; i < m_s->ndx+2; ++i)
send_x[i] = m_TCurrent[i][1];
MPI_Send(send_x, m_s->ndx+2, MPI_REAL8, m_s->rank_left, tag, comm);
}
if( m_s->rank_right >= 0 )
{
std::cout << "Rank " << m_s->rank << " receive from right from rank " << m_s->rank_right;
std::cout << "Rank " << m_s->rank << " receive from right from rank " << m_s->rank_right << std::endl;
MPI_Recv(recv_x, m_s->ndx+2, MPI_REAL8, m_s->rank_right, tag, comm, &status);
for( int i = 0; i < m_s->ndx+2; ++i)
m_TCurrent[i][m_s->ndy+1] = recv_x[i];
......@@ -179,14 +179,14 @@ void HeatTransfer::exchange( MPI_Comm comm )
tag = 2;
if( m_s->rank_right >= 0 )
{
std::cout << "Rank " << m_s->rank << " send right to rank " << m_s->rank_right;
std::cout << "Rank " << m_s->rank << " send right to rank " << m_s->rank_right << std::endl;
for( int i = 0; i < m_s->ndx+2; ++i)
send_x[i] = m_TCurrent[i][m_s->ndy];
MPI_Send(send_x, m_s->ndx+2, MPI_REAL8, m_s->rank_right, tag, comm);
}
if( m_s->rank_left >= 0 )
{
std::cout << "Rank " << m_s->rank << " receive from left from rank " << m_s->rank_left;
std::cout << "Rank " << m_s->rank << " receive from left from rank " << m_s->rank_left << std::endl;
MPI_Recv(recv_x, m_s->ndx+2, MPI_REAL8, m_s->rank_left, tag, comm, &status);
for( int i = 0; i < m_s->ndx+2; ++i)
m_TCurrent[i][0] = recv_x[i];
......@@ -196,12 +196,12 @@ void HeatTransfer::exchange( MPI_Comm comm )
tag = 3;
if( m_s->rank_down >= 0 )
{
std::cout << "Rank " << m_s->rank << " send down to rank " << m_s->rank_down;
std::cout << "Rank " << m_s->rank << " send down to rank " << m_s->rank_down << std::endl;
MPI_Send(m_TCurrent[m_s->ndx], m_s->ndy+2, MPI_REAL8, m_s->rank_down, tag, comm);
}
if ( m_s->rank_up >= 0 )
{
std::cout << "Rank " << m_s->rank << " receive from above from rank " << m_s->rank_up;
std::cout << "Rank " << m_s->rank << " receive from above from rank " << m_s->rank_up << std::endl;
MPI_Recv(m_TCurrent[0], m_s->ndy+2, MPI_REAL8, m_s->rank_up, tag, comm, &status);
}
......@@ -209,12 +209,12 @@ void HeatTransfer::exchange( MPI_Comm comm )
tag = 4;
if( m_s->rank_up >= 0 )
{
std::cout << "Rank " << m_s->rank << " send up to rank " << m_s->rank_up;
std::cout << "Rank " << m_s->rank << " send up to rank " << m_s->rank_up << std::endl;
MPI_Send(m_TCurrent[1], m_s->ndy+2, MPI_REAL8, m_s->rank_up, tag, comm);
}
if ( m_s->rank_down >= 0 )
{
std::cout << "Rank " << m_s->rank << " receive from below from rank " << m_s->rank_down;
std::cout << "Rank " << m_s->rank << " receive from below from rank " << m_s->rank_down << std::endl;
MPI_Recv(m_TCurrent[m_s->ndx+1], m_s->ndy+2, MPI_REAL8, m_s->rank_down, tag, comm, &status);
}
......@@ -222,4 +222,17 @@ void HeatTransfer::exchange( MPI_Comm comm )
delete[] recv_x;
}
#include <cstring>
/* Copies the internal ndx*ndy section of the ndx+2 * ndy+2 local array
* into a separate contiguous vector and returns it.
* @return A vector with ndx*ndy elements
*/
/* Extracts the interior ndx x ndy region (ghost cells excluded) of the
 * (ndx+2) x (ndy+2) local temperature array into a contiguous buffer.
 * @return vector of ndx*ndy doubles, row-major, without ghost cells
 */
std::vector<double> HeatTransfer::data_noghost()
{
    const int nx = m_s->ndx;
    const int ny = m_s->ndy;
    std::vector<double> flat( nx * ny );
    for( int row = 1; row <= nx; ++row )
    {
        // Copy one interior row, skipping the ghost cells at columns 0 and ny+1
        for( int col = 1; col <= ny; ++col )
        {
            flat[ (row-1)*ny + (col-1) ] = m_TCurrent[row][col];
        }
    }
    return flat;
}
......@@ -9,6 +9,8 @@
#define HEATTRANSFER_H_
#include <mpi.h>
#include <vector>
#include "Settings.h"
class HeatTransfer
......@@ -21,8 +23,10 @@ public:
void heatEdges(); // reset the heat values at the global edge
void exchange( MPI_Comm comm ); // send updates to neighbors
const double *data() {return m_TCurrent[0];}; // return (1D) pointer to current T data
const double T(int i, int j) {return m_TCurrent[i][j];}; // return current T value at i,j local coordinate
// return (1D) pointer to current T data, ndx+2 * ndy+2 elements
const double *data() {return m_TCurrent[0];};
// return (1D) pointer to current T data without ghost cells, ndx*ndy elements
std::vector<double> data_noghost();
void printT(std::string message, MPI_Comm comm); // debug: print local TCurrent on stdout
......
......@@ -16,7 +16,7 @@ class IO
{
public:
IO( std::string output_basename, MPI_Comm comm );
IO( std::shared_ptr<Settings> s, MPI_Comm comm );
~IO();
void write( int step, std::shared_ptr<HeatTransfer> ht, std::shared_ptr<Settings> s, MPI_Comm comm );
......
/*
* IO_ADIOS1.cpp
*
* Created on: Feb 2017
* Author: Norbert Podhorszki
*/
#include <cstdint>
#include <iomanip>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

#include "IO.h"
#include "adios.h"
static int64_t group;
static int rank_saved;
/* Sets up the ADIOS1 no-XML output: declares the "heat" group with the
 * MPI method and defines the gndx/gndy scalars plus the distributed 2D
 * array T (local block ndx x ndy at offset offsx,offsy in gndx x gndy).
 */
IO::IO( std::shared_ptr<Settings> s, MPI_Comm comm )
{
    // The destructor needs the rank for adios_finalize, but has no access
    // to Settings anymore, so it is stashed in a file-static here.
    rank_saved = s->rank;
    m_outputfilename = s->outputfile + ".bp";

    adios_init_noxml( comm );
    adios_declare_group( &group, "heat", "", adios_stat_default );
    adios_select_method( group, "MPI", "", "" );

    // Global scalar dimensions of the temperature grid
    adios_define_var( group, "gndx", "", adios_integer, "", "", "");
    adios_define_var( group, "gndy", "", adios_integer, "", "", "");

    // Local / global / offset dimension strings for the distributed array
    const std::string localDims( std::to_string( s->ndx ) + "," + std::to_string( s->ndy ));
    const std::string globalDims( std::to_string( s->gndx ) + "," + std::to_string( s->gndy ));
    const std::string offsets( std::to_string( s->offsx ) + "," + std::to_string( s->offsy ));

    const uint64_t varid = adios_define_var( group, "T", "", adios_double,
                                             localDims.c_str(), globalDims.c_str(), offsets.c_str() );
    adios_set_transform( varid, "none");
    //adios_set_transform( T_id, "zfp:accuracy=0.001");
}
// Shuts down the ADIOS1 library. adios_finalize takes the process rank,
// which the constructor saved in rank_saved because Settings is no longer
// reachable from the destructor.
IO::~IO()
{
adios_finalize(rank_saved);
}
/* Writes one output step: the global dimensions and each process's local
 * temperature block (ghost cells excluded) into the shared BP file.
 * @param step  output step index; step 0 creates the file, later steps append
 * @param ht    heat solver holding the current temperature field
 * @param s     run settings (global/local sizes, rank, nproc)
 * @param comm  communicator of all writing processes
 */
void IO::write( int step, std::shared_ptr<HeatTransfer> ht, std::shared_ptr<Settings> s, MPI_Comm comm )
{
    char mode[2] = "w";
    if (step > 0)
    {
        mode[0] = 'a'; // append to the file created at step 0
    }

    // for time measurements, let's synchronize the processes
    MPI_Barrier( comm );
    double time_start = MPI_Wtime();

    int64_t f;
    adios_open( &f, "heat", m_outputfilename.c_str(), mode, comm);
    adios_write( f, "gndx", &s->gndx);
    adios_write( f, "gndy", &s->gndy);
    // Keep the ghost-free copy alive until adios_close(): ADIOS1 may defer
    // buffering the payload, so handing it the .data() of an unnamed
    // temporary (destroyed at the end of the adios_write statement) would
    // leave a dangling pointer.
    std::vector<double> T = ht->data_noghost();
    adios_write( f, "T", T.data() );
    adios_close( f );

    MPI_Barrier( comm );
    double total_time = MPI_Wtime() - time_start;
    // NOTE(review): the factor 2 on the T payload mirrors the original code;
    // presumably it accounts for internal double-buffering — confirm.
    uint64_t adios_totalsize = 2*sizeof(int) + 2*s->ndx*s->ndy*sizeof(double);
    // Two /1024 divisions give MB (the old code divided three times, i.e. GB,
    // while claiming MB); keep the math in double to avoid truncation.
    double sizeMB = static_cast<double>( adios_totalsize * s->nproc ) / 1024.0 / 1024.0;
    double mbs = sizeMB / total_time; // throughput in MB/s
    if (s->rank==0)
        std::cout << "Step " << step << ": " << m_outputfilename
                  << " " << sizeMB << " MB in " << total_time << " s, "
                  << mbs << " MB/s" << std::endl;
}
......@@ -8,16 +8,16 @@
#include "IO.h"
IO::IO( std::string output_basename )
IO::IO( std::shared_ptr<Settings> s, MPI_Comm comm )
{
m_outputfilename = output_basename + ".bp";
m_outputfilename = s->outputfile + ".bp";
}
IO::~IO()
{
}
void IO::write( int step, int curr, Settings& s )
void IO::write(int step, std::shared_ptr<HeatTransfer> ht, std::shared_ptr<Settings> s, MPI_Comm comm )
{
}
/*
* IO_ADIOS2.cpp
* IO_ascii.cpp
*
* Created on: Feb 2017
* Author: Norbert Podhorszki
......@@ -14,9 +14,9 @@
static std::ofstream of;
static std::streambuf *buf;
IO::IO( std::string output_basename, MPI_Comm comm )
IO::IO( std::shared_ptr<Settings> s, MPI_Comm comm )
{
m_outputfilename = output_basename;
m_outputfilename = s->outputfile;
if (m_outputfilename == "cout")
{
......@@ -27,7 +27,7 @@ IO::IO( std::string output_basename, MPI_Comm comm )
int rank;
MPI_Comm_rank( comm, &rank );
std::string rs = std::to_string(rank);
of.open(output_basename + rs + ".txt");
of.open(m_outputfilename + rs + ".txt");
buf = of.rdbuf();
}
}
......@@ -43,7 +43,7 @@ IO::~IO()
void IO::write( int step, std::shared_ptr<HeatTransfer> ht, std::shared_ptr<Settings> s, MPI_Comm comm)
{
std::ostream out(buf);
if( step == 1)
if( step == 0)
{
out << "rank=" << s->rank
<< " size=" << s->ndx << "x" << s->ndy
......
# Makefile for testing purposes, will build writer_mpi (make or make mpi) or writer_nompi (make nompi)
# Makefile
# Created on: Feb 13, 2017
# Author: pnorbert
#
# SYSTEM SPECIFIC SETTINGS
#
#COMPILERS
CC=g++
MPICC=mpic++
#ADIOS LOCATION
ADIOS_DIR=../..
ADIOS_INCLUDE=-I$(ADIOS_DIR)/include
ADIOS_LIB=$(ADIOS_DIR)/lib/libadios.a
ADIOS_NOMPI_LIB=$(ADIOS_DIR)/lib/libadios_nompi.a
#FLAGS
CFLAGS=-Wall -Wpedantic -Woverloaded-virtual -std=c++11 -O0 -g
LDFLAGS=
LDFLAGS=-lpthread
#ADIOS1 LOCATION
ADIOS1_DIR=/opt/adios/1.11
#ADIOS2 LOCATION
ADIOS2_DIR=../..
#
# SHOULD NOT NEED TO CHANGE ANYTHING BELOW
#
ADIOS1_INCLUDE=-I$(ADIOS1_DIR)/include
ADIOS1_LIB=`${ADIOS1_DIR}/bin/adios_config -l`
ADIOS2_INCLUDE= -DHAVE_MPI -I$(ADIOS2_DIR)/include
ADIOS2_LIB=$(ADIOS2_DIR)/lib/libadios.a
default:
@echo "Make targets: ascii hdf5_a hdf5_b phdf5 adios1 adios2"
......@@ -31,11 +45,14 @@ help: default
ascii: main.cpp HeatTransfer.cpp Settings.cpp IO_ascii.cpp
$(MPICC) $(CFLAGS) $(ADIOS_INCLUDE) -DHAVE_MPI -o heatTransfer_ascii $^ $(ADIOS_LIB) $(LDFLAGS) -lpthread
$(MPICC) $(CFLAGS) -o heatTransfer_ascii $^ $(LDFLAGS)
adios1: main.cpp HeatTransfer.cpp Settings.cpp IO_adios1.cpp
$(MPICC) $(CFLAGS) $(ADIOS1_INCLUDE) -o heatTransfer_adios1 $^ $(ADIOS1_LIB) $(LDFLAGS)
adios2: main.cpp HeatTransfer.cpp Settings.cpp IO_adios2.cpp
$(MPICC) $(CFLAGS) $(ADIOS_INCLUDE) -DHAVE_MPI -o heatTransfer_adios2 $^ $(ADIOS_LIB) $(LDFLAGS) -lpthread
$(MPICC) $(CFLAGS) $(ADIOS2_INCLUDE) -o heatTransfer_adios2 $^ $(ADIOS2_LIB) $(LDFLAGS)
clean:
rm -f heatTransfer_ascii heatTransfer_adios2
rm -f heatTransfer_ascii heatTransfer_adios1 heatTransfer_adios2
......@@ -59,13 +59,15 @@ int main( int argc, char* argv [] )
double timeStart = MPI_Wtime();
std::shared_ptr<Settings> settings( new Settings( argc, argv, rank, nproc ));
std::shared_ptr<HeatTransfer> ht( new HeatTransfer( settings ));
std::shared_ptr<IO> io( new IO( settings->outputfile, mpiHeatTransferComm ));
std::shared_ptr<IO> io( new IO( settings, mpiHeatTransferComm ));
ht->init(false);
ht->init(true);
ht->printT("Initialized T:", mpiHeatTransferComm);
ht->heatEdges();
//ht->exchange( mpiHeatTransferComm );
ht->printT("Heated T:", mpiHeatTransferComm);
io->write( 0, ht, settings, mpiHeatTransferComm );
for( int t = 1; t <= settings->steps; ++t )
{
if( rank == 0 )
......@@ -73,8 +75,8 @@ int main( int argc, char* argv [] )
for( int iter = 1; iter <= settings->iterations; ++iter )
{
ht->iterate();
ht->heatEdges();
ht->exchange( mpiHeatTransferComm );
ht->heatEdges();
}
io->write( t, ht, settings, mpiHeatTransferComm );
}
......@@ -101,6 +103,7 @@ int main( int argc, char* argv [] )
std::cout << e.what() << std::endl;
}
MPI_Finalize();
return 0;
}
......
......@@ -24,7 +24,6 @@ void getcb( const void *data, std::string doid, std::string var, std::string dty
std::cout << "data object ID = " << doid << "\n"; //do you need to flush?
std::cout << "variable name = " << var << "\n";
std::cout << "data type = " << dtype << "\n";
float *dataf = (float*)data;
std::size_t varsize = std::accumulate(varshape.begin(), varshape.end(), 1, std::multiplies<std::size_t>());
......@@ -33,7 +32,6 @@ void getcb( const void *data, std::string doid, std::string var, std::string dty
std::cout << std::endl;
}
int main( int argc, char* argv [] )
{
MPI_Init( &argc, &argv );
......
......@@ -7,9 +7,22 @@
#include <vector>
#include <iostream>
#include <numeric>
#include "ADIOS_CPP.h"
/* Callback invoked by the DataMan reader when a variable arrives.
 * Prints the variable's metadata and all its values to stdout.
 * @param data     pointer to the received payload, interpreted as a float
 *                 array — TODO confirm against the writer's element type
 * @param doid     data object ID
 * @param var      variable name
 * @param dtype    data type name as reported by the transport
 * @param varshape extent of each dimension; their product is the element count
 */
void getcb( const void *data, std::string doid, std::string var, std::string dtype, std::vector<std::size_t> varshape )
{
    std::cout << "data object ID = " << doid << "\n"; //do you need to flush?
    std::cout << "variable name = " << var << "\n";
    std::cout << "data type = " << dtype << "\n";
    // Seed with std::size_t{1} so the accumulation runs in size_t rather
    // than int (the old `1` literal made std::accumulate compute in int).
    const std::size_t varsize = std::accumulate( varshape.begin(), varshape.end(),
                                                 std::size_t{1},
                                                 std::multiplies<std::size_t>() );
    const float *values = static_cast<const float*>( data ); // was a C-style cast
    // size_t index fixes the signed/unsigned comparison of the old int loop
    for (std::size_t i = 0; i < varsize; ++i)
        std::cout << values[i] << " ";
    std::cout << std::endl;
}
int main( int argc, char* argv [] )
{
......@@ -20,8 +33,8 @@ int main( int argc, char* argv [] )
{
//Define method for engine creation, it is basically straight-forward parameters
adios::Method& datamanSettings = adios.DeclareMethod( "WAN", "DataManReader" ); //default method type is BPWriter
datamanSettings.SetParameters( "peer-to-peer=yes" );
datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" );
datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=zmq", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12307", "remote_port=12306" );
// datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" );
//datamanSettings.AddTransport( "ZeroMQ", "localIP=128.0.0.0.1.1", "remoteIP=128.0.0.0.2.1", "tolerances=1,2,3" ); not yet supported , will throw an exception
//Create engine smart pointer to DataManReader Engine due to polymorphism,
......@@ -31,6 +44,10 @@ int main( int argc, char* argv [] )
if( datamanReader == nullptr )
throw std::ios_base::failure( "ERROR: failed to create DataMan I/O engine at Open\n" );
datamanReader->SetCallBack( getcb );
while(1){}
adios::Variable<double>* ioMyDoubles = datamanReader->InquireVariableDouble( "ioMyDoubles" );
if( ioMyDoubles == nullptr )
std::cout << "Variable ioMyDoubles not read...yet\n";
......
......@@ -17,6 +17,7 @@ int main( int argc, char* argv [] )
adios::ADIOS adios( adiosDebug );
//Application variable
std::vector<float> myFloats = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
std::vector<double> myDoubles = { 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 };
const std::size_t Nx = myDoubles.size();
......@@ -30,12 +31,13 @@ int main( int argc, char* argv [] )
{
//Define variable and local size
//Define variable and local size
auto& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", adios::Dims{Nx} );
auto& ioMyCFloats = adios.DefineVariable<std::complex<float>>( "myCFloats", {3} );
auto& ioMyFloats = adios.DefineVariable<float>( "myfloats", adios::Dims{Nx} );
// auto& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", adios::Dims{Nx} );
// auto& ioMyCFloats = adios.DefineVariable<std::complex<float>>( "myCFloats", {3} );
//Define method for engine creation, it is basically straight-forward parameters
adios::Method& datamanSettings = adios.DeclareMethod( "WAN", "DataManWriter" ); //default method type is Writer
datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=dump", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307" );
datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=zmq", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307" );
// datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" );
//datamanSettings.AddTransport( "ZeroMQ", "localIP=128.0.0.0.1.1", "remoteIP=128.0.0.0.2.1", "tolerances=1,2,3" ); not yet supported , will throw an exception
......@@ -46,8 +48,8 @@ int main( int argc, char* argv [] )
if( datamanWriter == nullptr )
throw std::ios_base::failure( "ERROR: failed to create DataMan I/O engine at Open\n" );
datamanWriter->Write( ioMyDoubles, myDoubles.data() ); // Base class Engine own the Write<T> that will call overloaded Write from Derived
datamanWriter->Write( ioMyCFloats, myCFloats.data() );
datamanWriter->Write( ioMyFloats, myFloats.data() ); // Base class Engine own the Write<T> that will call overloaded Write from Derived
// datamanWriter->Write( ioMyCFloats, myCFloats.data() );
datamanWriter->Close( );
}
catch( std::invalid_argument& e )
......
......@@ -79,8 +79,10 @@ private:
capsule::STLVector m_Buffer; ///< heap capsule, contains data and metadata buffers
format::BP1Writer m_BP1Writer; ///< format object will provide the required BP functionality to be applied on m_Buffer and m_Transports
std::function<void( const void*, std::string, std::string, std::string, Dims )> m_CallBack; ///< call back function
bool m_DoRealTime = false;
DataManager m_Man;
std::function<void( const void*, std::string, std::string, std::string, Dims )> m_CallBack; ///< call back function
void Init( ); ///< calls InitCapsules and InitTransports based on Method, called from constructor
void InitCapsules( );
......
......@@ -36,6 +36,7 @@ DataManReader::~DataManReader( )
/* Registers a user callback that is invoked for each received variable.
 * The callback is stored locally and also handed to the data manager.
 */
void DataManReader::SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback )
{
    // Keep our own copy first, then move the (now expendable) argument into
    // the data manager to avoid copying the std::function a second time.
    m_CallBack = callback;
    m_Man.reg_callback( std::move(callback) );
}
Variable<void>* DataManReader::InquireVariable( const std::string name, const bool readIn ) //not yet implemented
......@@ -109,8 +110,69 @@ void DataManReader::Init( )
", in call to ADIOS Open or DataManReader constructor\n" );
}
InitCapsules( );
InitTransports( );
auto itRealTime = m_Method.m_Parameters.find( "real_time" );
if( itRealTime != m_Method.m_Parameters.end() )
{
if( itRealTime->second == "yes" || itRealTime->second == "true" )
m_DoRealTime = true;
}
if(m_DoRealTime)
{
/**
* Lambda function that assigns a parameter in m_Method to a localVariable of type std::string
*/
auto lf_AssignString = [this]( const std::string parameter, std::string& localVariable )
{
auto it = m_Method.m_Parameters.find( parameter );
if( it != m_Method.m_Parameters.end() )
{
localVariable = it->second;
}
};
/**
* Lambda function that assigns a parameter in m_Method to a localVariable of type int
*/
auto lf_AssignInt = [this]( const std::string parameter, int& localVariable )
{
auto it = m_Method.m_Parameters.find( parameter );
if( it != m_Method.m_Parameters.end() )
{
localVariable = std::stoi( it->second );
}
};
std::string method_type, method, local_ip, remote_ip; //no need to initialize to empty (it's default)
int local_port=0, remote_port=0, num_channels=0;
lf_AssignString( "method_type", method_type );
if( method_type == "stream" )
{
lf_AssignString( "method", method );
lf_AssignString( "local_ip", local_ip );
lf_AssignString( "remote_ip", remote_ip );
lf_AssignInt( "local_port", local_port );
lf_AssignInt( "remote_port", remote_port );
lf_AssignInt( "num_channels", num_channels );
json jmsg;
jmsg["method"] = method;
jmsg["local_ip"] = local_ip;
jmsg["remote_ip"] = remote_ip;
jmsg["local_port"] = local_port;
jmsg["remote_port"] = remote_port;
jmsg["num_channels"] = num_channels;
jmsg["stream_mode"] = "receiver";
m_Man.add_stream(jmsg);
}
}
else
{
InitCapsules( );
InitTransports( );
}
}
......
......@@ -39,6 +39,7 @@ DataManWriter::~DataManWriter( )
/* Registers a user callback that is invoked for each written variable.
 * The callback is stored locally and also handed to the data manager.
 */
void DataManWriter::SetCallBack( std::function<void( const void*, std::string, std::string, std::string, Dims )> callback )
{
    // Keep our own copy first, then move the (now expendable) argument into
    // the data manager to avoid copying the std::function a second time.
    m_CallBack = callback;
    m_Man.reg_callback( std::move(callback) );
}
void DataManWriter::Write( Variable<char>& variable, const char* values )
......@@ -141,6 +142,7 @@ void DataManWriter::Write( const std::string variableName, const std::complex<lo
// Flushes any data still buffered in the data manager. Transport teardown
// is not implemented yet (see the TODO-style comment below).
// NOTE(review): transportIndex is currently unused — presumably intended to
// select which transport to close once teardown is implemented; confirm.
void DataManWriter::Close( const int transportIndex )
{
m_Man.flush();
//here close IPs and deallocate or free/close resources (if using STL no need for memory deallocation)
}
......@@ -209,6 +211,7 @@ void DataManWriter::Init( )
jmsg["local_port"] = local_port;
jmsg["remote_port"] = remote_port;
jmsg["num_channels"] = num_channels;
jmsg["stream_mode"] = "sender";
m_Man.add_stream(jmsg);
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment