Skip to content
Snippets Groups Projects
Commit a945ec61 authored by wfg's avatar wfg
Browse files

Merge branch 'groupless'

parents c2dc375d 9d7bd89e
No related branches found
No related tags found
1 merge request!8Integrate groupless
......@@ -9,7 +9,7 @@ DATAMAN_LOCATION=/Users/w4g/Dropbox/lib/DataMan
MPICOMPILER=mpic++
if [ "$(uname)" == "Darwin" ]; then
CCOMPILER=clang++
CCOMPILER=g++
export DYLD_LIBRARY_PATH=$DATAMAN_LOCATION/lib:$DYLD_LIBRARY_PATH
elif [ "$(expr substr $(uname -s) 1 5)" == "Linux" ]; then
CCOMPILER=g++
......@@ -43,7 +43,7 @@ echo "#################################################################"
echo "#################################################################"
echo "DataMan reader"
echo "#################################################################"
./examples/hello/datamanReader/helloDataManReader_nompi.exe
#./examples/hello/datamanReader/helloDataManReader_nompi.exe
echo
echo
......
/* Current API by design, incorrect as code but describes all the ideas */
#include "ADIOS.h"
namespace adios
{
typedef enum {
VARYING_DIMENSION = -1,
LOCAL_VALUE = 0,
GLOBAL_VALUE = 1
};
typedef enum { ERROR = 0, WARN = 1, INFO = 2, DEBUG = 3 } VerboseFlag;
typedef enum {
COLLECTIVE_WRITERS = 0, INDEPENDENT_WRITERS = 1,
GLOBAL_READERS = 2, ROUNDROBIN_READERS = 3, FIFO_READERS = 4,
OPEN_ALL_STEPS = 5
} OpenMode;
typedef enum { NOWAITFORSTREAM = 0, WAITFORSTREAM = 1 } StreamOpenMode; // default: wait for stream
typedef enum {
APPEND = 0, UPDATE = 1, // writer advance modes
NEXT_AVAILABLE = 2, LATEST_AVAILABLE = 3, // reader advance modes
} AdvanceMode;
typedef enum { NONBLOCKINGREAD = 0, BLOCKINGREAD = 1 } ReadMode;
} // namespace adios
void dummy()
{
//Application variables
std::vector<double> Temperature;
std::vector<float> RaggedArray;
unsigned int Nx;
int Nparts, nproc;
// Global class/object that serves for init, finalize and provides ADIOS functions
adios::ADIOS adios( std::string configfile, MPI_Comm comm, adios::verboseflag flag, bool debugflag );
/*************
* WRITE API
*************/
/* Method
* We associate Engines and Transports and user settings into an object called Method.
* ADIOS check if it is defined by the user in the config file, and fills it out if it is.
*/
std::shared_ptr<adios::Method> method = adios.DeclareMethod( "MethodName" );
if( ! method.isDefinedInConfig() )
{
// if not defined by user, we can change the default settings
method.SetEngine( "BP" ); // BP is the default engine
method.AddTransport( "File", "lucky=yes" ); // ISO-POSIX file is the default transport
method.AddTransport( "Staging" ); //"The" staging method developed in ECP
method.SetParameters("have_metadata_file","yes" ); // Passing parameters to the engine
method.SetParameters( "Aggregation", (nproc+1)/2 ); // number of aggregators
method.SetParameters( "verbose", adios::WARN ); // Verbosity level for this engine and what it calls
}
//Define variables with transformations.
adios::Variable<unsigned int>& varNX = adios.DefineVariable<unsigned int>( "NX" ); // global single-value across processes
adios::Variable<int>& varNproc = adios.DefineVariable<int>( "nproc", adios::GLOBAL_VALUE ); // same def for global value
adios::Variable<int>& varNparts = adios.DefineVariable<int>( "Nparts", adios::LOCAL_VALUE ); // a single-value different on every process
adios::Variable<double>& var1D = adios.DefineVariable<double>( "Temperature", {nproc*Nx} ); // 1D global array
adios::Variable<float>& varRagged = adios.DefineVariable<float>( "Ragged", {nproc,adios::VARYING_DIMENSION} ); // ragged array
// Define a variable with local dimensions now, and make ADIOS allocate it inside its buffers (zero-copy API)
adios::Variable<double>& varZeroCopy = adios.DefineVariable<double>( "ZC", {nproc,Nx}, {1,NX}, {rank,0} ); // 2D global array, 1D decomposition
double *myVarZC = adios.AllocateVar( varZeroCopy );
//add transform to variable
adios::Transform bzip2 = adios::transform::BZIP2( );
var1D->AddTransform( bzip2, 1 );
// open...write.write.write...advance...write.write.write...advance... ...close cycle
// "w" create/overwrite on open, "a" append at open, "u" open for update (does not increase step), "r" open for read.
std::shared_ptr<adios::Engine> writer = adios.Open( "myNumbers.bp", "w", method, adios::INDEPENDENT_WRITERS );
if( writer == nullptr )
throw std::ios_base::failure( "ERROR: failed to open ADIOS writer\n" );
for (int step = 0; step < 10; ++step) {
// write scalar value
writer->Write<int>( varNparts, Nparts );
// Make a selection to describe the local dimensions of the variable we write and
// its offsets in the global spaces
adios::Selection& sel = adios.SelectionBoundingBox( {Nx}, {rank*Nx} ); // local dims and offsets; both as list
var1D.SetSelection( sel );
writer->Write<double>( var1D, Temperature.data() );
// Indicate we are done for this step.
// N-to-M Aggregation, disk I/O will be performed during this call, unless
// time aggregation postpones all of that to some later step.
// When Advance() returns, user can overwrite its Zero Copy variables.
// Internal buffer is freed only if there are no Zero Copy variables and there is no time aggregation going on
writer->Advance(); // same as AppendMode
writer->Advance( adios::APPEND ); // append new step at next write
writer->Advance( adios::UPDATE ); // do not increase step; ? should this cause error in staging ?
// When AdvanceAsync returns, user need to wait for notification that he can overwrite the Zero Copy variables.
writer->AdvanceAsync( callback_func_to_notify_me() );
}
// Called once: indicate that we are done with this output for the run
// Zero Copy variables will be deallocated
writer->Close();
/*************
* READ API
*************/
// 1. Open a stream, where every reader can see everything in a stream (so that they can read a global array)
// Blocking read of a variable
try
{
// Open a stream
std::shared_ptr<adios::Engine> reader =
adios.Open( "filename.bp", "r", method,
adios::GLOBAL_READERS, // Each reader process sees everything from the stream
adios::WAITFORSTREAM, // wait for the first step appear (default)
timeout ); // wait this long for the stream, return error afterwards
/* Variable names are available as a vector of strings */
std::cout << "List of variables in file: " << reader->VariableNames << "\n";
/* read a Global scalar which has a single value in a step */
reader->Read<unsigned int>( "NX", Nx );
// inquiry about a variable, whose name we know
adios::Variable var1D = reader.InquiryVariable( "Temperature" );
vector<uint64_t> gdims = var1D->GetGlobalDimensions();
int step = varID->GetStep();
struct adios::BlockInfo blocks = reader.InquiryVariableBlockInfo( reader, var1D ); // get per-writer size info
// this is adios1 ADIOS_VARBLOCK
struct adios::Statistics stats = reader.InquiryVariableStat( reader, var1D, perstepstat, perblockstat ); // get min/max statistics
// this is adios1 ADIOS_VARSTAT
while( true )
{
// Make a selection to describe the local dimensions of the variable we READ and
// its offsets in the global spaces
adios::Selection bbsel = adios.SelectionBoundingBox( {ldim}, {offs} ); // local dims and offsets; both as list
var1D->SetSelection( bbsel );
reader->Read<double>( var1D, Temperature.data() );
// Better for staging to schedule several reads at once
reader->ScheduleRead<double>( var1D, Temperature.data() );
reader->PerformRead( adios::BLOCKINGREAD );
// promise to not read more from this step/item
reader->Release();
// want to move on to the next available step/item
reader->Advance(adios::NEXT_AVAILABLE); // default
reader->Advance(adios::LATEST_AVAILABLE); // interested only in the latest data
}
// Close file/stream
reader->Close();
}
catch( adios::end_of_stream& e )
{
// Reached end of stream, end processing loop
// Close file/stream
bpReader->Close();
}
catch( adios::file_not_found& e )
{
// File/stream does not exist, quit
}
// 2. Open a stream, where each item from the writers will get to a single reader only
// If the writers are collective, that means a whole steps go to different readers
// If the writers are independent, that means each writer's output goes to different readers
// Also show here ScheduleRead/PerformRead
//try
{
// Open a stream
std::shared_ptr<adios::Engine> reader =
adios.Open( "filename.bp", "r", method,
adios::FIFO_READERS, // Each reader process sees everything from the stream
adios::WAITFORSTREAM, // wait for the first step appear (default)
timeout ); // wait this long for the stream, return error afterwards
while( true )
{
// Make a selection to describe the local dimensions of the variable we READ and
// its offsets in the global spaces if we know this somehow
adios::Selection bbsel = adios.SelectionBoundingBox( {ldim}, {offs} ); // local dims and offsets; both as list
var1D->SetSelection( bbsel );
reader->Read<double>( var1D, Temperature.data() );
// Let ADIOS allocate space for the incoming (per-writer) item
double * data = reader->Read<double>( var1D );
// promise to not read more from this step/item
reader->Release();
// want to move on to the next available step/item
reader->Advance(); // default
reader->Advance(adios::LATEST_AVAILABLE); // This should be an error, or could it make sense?
}
reader->Close();
}
// 3. Open a stream and return immediately, not waiting for data to appear
// In this mode we cannot inquiry variables, but can schedule reads
//try
{
// Open a stream
std::shared_ptr<adios::Engine> reader =
adios.Open( "filename.bp", "r", method,
adios::GLOBAL_READERS, // Each reader process sees everything from the stream
adios::NOWAITFORSTREAM // wait for the first step appear (default)
);
while( true )
{
// Let ADIOS allocate space for the incoming (per-writer) item
reader->ScheduleRead<void>(); // read whatever comes
// One way is to handle the incoming data through a callback (which will be called in a thread)
// void cb( const void *data, std::string doid, std::string var, std::string dtype, std::vector<std::size_t> varshape );
// void cb( adios::VARCHUNK * chunk ); // see adios1 for VARCHUNK
reader->SetReadCallback( cb );
reader->PerformRead( adios::NONBLOCKINGREAD );
// Another way is checking back manually like in adios1 and processing chunks
reader->PerformRead( adios::NONBLOCKINGREAD );
int ck;
adios::VARCHUNK * chunk;
try
{
while ( (ck = reader->CheckReads( &chunk )) > 0) {
if (chunk) {
// process the chunk first
// ...
// free memory of chunk (not the data!)
adios::FreeChunk(chunk);
} else {
// no chunk was returned, slow down a little
sleep(1);
}
}
}
catch( std::exception& e )
{
// some error happened while getting a chunk
}
reader->Release();
reader->Advance();
}
reader->Close();
}
// 4. Open it as file and see all steps at once.
// Allow for reading multiple steps of a variable into a contiguous array
try
{
// Open a stream
std::shared_ptr<adios::Engine> reader =
adios.Open( "filename.bp", "r", method,
adios::OPEN_ALL_STEPS, //File mode, no advance
);
/* NX */
/* There is a single value for each step. We can read all into a 1D array with a step selection.
* Steps are not automatically presented as an array dimension and read does not read it as array.
*/
// We can also just conveniently get the first step with a simple read statement.
reader->Read<unsigned int>( "NX", &Nx ); // read a Global scalar which has a single value in a step
adios::Variable<void> varNx = bpReader.InquiryVariable("Nx");
std::vector<int> Nxs( varNx->GetSteps() ); // number of steps available
// make a StepSelection to select multiple steps. Args: From, #of consecutive steps
adios::StepSelection stepsNx( 0, varNx->GetSteps() );
// ? How do we make a selection for an arbitrary list of steps ?
varNX.SetStepSelection( stepsNx );
reader->Read<unsigned int>( varNx, Nxs.data() );
auto itmax = std::max_element(std::begin(Nxs), std::end(Nxs));
auto itmin = std::min_element(std::begin(Nxs), std::end(Nxs));
if (*itmin != *itmax)
{
throw std::ios_base::failure( "ERROR: NX is not the same at all steps!\n" );
}
/* Nparts */
// Nparts local scalar is presented as a 1D array of nproc elements.
// We can read all steps into a 2D array of nproc * nsteps
adios::Variable<void> varNparts = bpReader.InquiryVariable("Nparts");
std::vector<int> partsV( Nproc * varNparts->GetSteps() );
varNparts->SetStepSelection(
adios.StepSelection( 0, varNparts->GetSteps() )
);
bpReader->Read<int>( varNparts, partsV.data() ); // missing spatial selection = whole array at each step
// Close file/stream
reader->Close();
}
catch( adios::end_of_stream& e )
{
// Reached end of stream, end processing loop
// Close file/stream
bpReader->Close();
}
catch( adios::file_not_found& e )
{
// File/stream does not exist, quit
}
}
......@@ -38,8 +38,8 @@ int main( int argc, char* argv [] )
try
{
//Define variable and local size
auto& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", {Nx} );
auto& ioMyCFloats = adios.DefineVariable<std::complex<float>>( "myCFloats", {3} );
auto ioMyDoubles = adios.DefineVariable<double>( "myDoubles", {Nx} );
auto ioMyCFloats = adios.DefineVariable<std::complex<float>>( "myCFloats", {3} );
//Define method for engine creation, it is basically straight-forward parameters
adios::Method& datamanSettings = adios.DeclareMethod( "WAN", "DataManWriter" ); //default method type is Writer
......
......@@ -31,8 +31,6 @@ int main( int argc, char* argv [] )
{
//Define variable and local size
//Define variable and local size
adios::ADIOS adios( "config.xml", adiosDebug );
auto ioMyFloats = adios.DefineVariable<float>( "myfloats", adios::Dims{Nx} );
auto ioMyFloat = adios.DefineVariable<float>( "myfloat", adios::Dims{1} );
// auto& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", adios::Dims{Nx} );
......@@ -40,7 +38,7 @@ int main( int argc, char* argv [] )
//Define method for engine creation, it is basically straight-forward parameters
adios::Method& datamanSettings = adios.DeclareMethod( "WAN", "DataManWriter" ); //default method type is Writer
datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=dump", "application=XGC1", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307" );
datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=dump", "monitoring=yes", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307" );
// datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" );
//datamanSettings.AddTransport( "ZeroMQ", "localIP=128.0.0.0.1.1", "remoteIP=128.0.0.0.2.1", "tolerances=1,2,3" ); not yet supported , will throw an exception
......
......@@ -54,6 +54,7 @@ private:
format::BP1Writer m_BP1Writer; ///< format object will provide the required BP functionality to be applied on m_Buffer and m_Transports
bool m_DoRealTime = false;
bool m_DoMonitor = false;
DataManager m_Man;
std::function<void( const void*, std::string, std::string, std::string, Dims )> m_CallBack; ///< call back function
......@@ -95,7 +96,6 @@ private:
void Write( const std::string variableName, const std::complex<double>* values );
void Write( const std::string variableName, const std::complex<long double>* values );
/**
* From transport Mdtm in m_Method
* @param parameter must be an accepted parameter
......@@ -119,36 +119,35 @@ private:
jmsg["doid"] = m_Name;
jmsg["var"] = variable.m_Name;
jmsg["dtype"] = GetType<T>();
std::cout << "variable.m_Dimensions.size() = " << variable.m_Dimensions.size() << endl;
jmsg["putshape"] = variable.m_Dimensions;
// if(variable.m_GlobalDimensions.size() == 0) variable.m_GlobalDimensions = variable.m_Dimensions;
if(variable.m_GlobalDimensions.size() == 0) variable.m_GlobalDimensions = variable.m_Dimensions;
jmsg["varshape"] = variable.m_GlobalDimensions;
// if(variable.m_GlobalOffsets.size() == 0) variable.m_GlobalOffsets.assign(variable.m_Dimensions.size(),0);
if(variable.m_GlobalOffsets.size() == 0) variable.m_GlobalOffsets.assign(variable.m_Dimensions.size(),0);
jmsg["offset"] = variable.m_GlobalOffsets;
jmsg["timestep"] = 0;
m_Man.put(values, jmsg);
std::cout << "putshape " << variable.m_Dimensions.size() << endl;
std::cout << "varshape " << variable.m_GlobalDimensions.size() << endl;
std::cout << "offset " << variable.m_GlobalOffsets.size() << endl;
std::cout << "I am hooked to the DataMan library\n";
MPI_Barrier( m_MPIComm );
for( int i = 0; i < m_SizeMPI; ++i )
{
if( i == m_RankMPI )
{
std::cout << "Rank: " << m_RankMPI << "\n";
variable.Monitor( std::cout );
std::cout << std::endl;
}
else
if(m_DoMonitor){
MPI_Barrier( m_MPIComm );
std::cout << "I am hooked to the DataMan library\n";
std::cout << "putshape " << variable.m_Dimensions.size() << endl;
std::cout << "varshape " << variable.m_GlobalDimensions.size() << endl;
std::cout << "offset " << variable.m_GlobalOffsets.size() << endl;
for( int i = 0; i < m_SizeMPI; ++i )
{
sleep( 1 );
if( i == m_RankMPI )
{
std::cout << "Rank: " << m_RankMPI << "\n";
variable.Monitor( std::cout );
std::cout << std::endl;
}
else
{
sleep( 1 );
}
}
MPI_Barrier( m_MPIComm );
}
MPI_Barrier( m_MPIComm );
}
};
......
......@@ -143,30 +143,27 @@ void DataManReader::Init( )
}
};
std::string method_type, method, local_ip, remote_ip; //no need to initialize to empty (it's default)
int local_port=0, remote_port=0, num_channels=0;
lf_AssignString( "method_type", method_type );
if( method_type == "stream" )
auto is_number = [] (const std::string& s)
{
lf_AssignString( "method", method );
lf_AssignString( "local_ip", local_ip );
lf_AssignString( "remote_ip", remote_ip );
lf_AssignInt( "local_port", local_port );
lf_AssignInt( "remote_port", remote_port );
lf_AssignInt( "num_channels", num_channels );
json jmsg;
jmsg["method"] = method;
jmsg["local_ip"] = local_ip;
jmsg["remote_ip"] = remote_ip;
jmsg["local_port"] = local_port;
jmsg["remote_port"] = remote_port;
jmsg["num_channels"] = num_channels;
jmsg["stream_mode"] = "receiver";
m_Man.add_stream(jmsg);
return !s.empty() && std::find_if(s.begin(), s.end(), [](char c) { return !std::isdigit(c); }) == s.end();
};
json jmsg;
for(auto &i : m_Method.m_Parameters){
if( is_number(i.second) ){
jmsg[i.first] = std::stoi(i.second);
}
else{
jmsg[i.first] = i.second;
}
}
jmsg["stream_mode"] = "receiver";
m_Man.add_stream(jmsg);
std::string method_type;
int num_channels=0;
lf_AssignString( "method_type", method_type );
lf_AssignInt( "num_channels", num_channels );
}
else
{
......
......@@ -165,6 +165,13 @@ void DataManWriter::Init( )
m_DoRealTime = true;
}
itRealTime = m_Method.m_Parameters.find( "monitoring" );
if( itRealTime != m_Method.m_Parameters.end() )
{
if( itRealTime->second == "yes" || itRealTime->second == "true" )
m_DoMonitor = true;
}
if(m_DoRealTime)
{
/**
......@@ -191,37 +198,33 @@ void DataManWriter::Init( )
}
};
std::string method_type, method, local_ip, remote_ip; //no need to initialize to empty (it's default)
int local_port=0, remote_port=0, num_channels=0;
lf_AssignString( "method_type", method_type );
if( method_type == "stream" )
auto is_number = [] (const std::string& s)
{
lf_AssignString( "method", method );
lf_AssignString( "local_ip", local_ip );
lf_AssignString( "remote_ip", remote_ip );
lf_AssignInt( "local_port", local_port );
lf_AssignInt( "remote_port", remote_port );
lf_AssignInt( "num_channels", num_channels );
json jmsg;
jmsg["method"] = method;
jmsg["local_ip"] = local_ip;
jmsg["remote_ip"] = remote_ip;
jmsg["local_port"] = local_port;
jmsg["remote_port"] = remote_port;
jmsg["num_channels"] = num_channels;
jmsg["stream_mode"] = "sender";
m_Man.add_stream(jmsg);
return !s.empty() && std::find_if(s.begin(), s.end(), [](char c) { return !std::isdigit(c); }) == s.end();
};
json jmsg;
for(auto &i : m_Method.m_Parameters){
if( is_number(i.second) ){
jmsg[i.first] = std::stoi(i.second);
}
else{
jmsg[i.first] = i.second;
}
}
jmsg["stream_mode"] = "sender";
m_Man.add_stream(jmsg);
std::string method_type;
int num_channels=0;
lf_AssignString( "method_type", method_type );
lf_AssignInt( "num_channels", num_channels );
}
else
{
InitCapsules( );
InitTransports( );
}
}
void DataManWriter::InitCapsules( )
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment