diff --git a/buildDataMan.sh b/buildDataMan.sh index 0e65f3f0c51e5f109fcd19ef35af398473f09a13..886d96447ab724f8f003605560cedac7b183a7cd 100755 --- a/buildDataMan.sh +++ b/buildDataMan.sh @@ -9,7 +9,7 @@ DATAMAN_LOCATION=/Users/w4g/Dropbox/lib/DataMan MPICOMPILER=mpic++ if [ "$(uname)" == "Darwin" ]; then - CCOMPILER=clang++ + CCOMPILER=g++ export DYLD_LIBRARY_PATH=$DATAMAN_LOCATION/lib:$DYLD_LIBRARY_PATH elif [ "$(expr substr $(uname -s) 1 5)" == "Linux" ]; then CCOMPILER=g++ @@ -43,7 +43,7 @@ echo "#################################################################" echo "#################################################################" echo "DataMan reader" echo "#################################################################" -./examples/hello/datamanReader/helloDataManReader_nompi.exe +#./examples/hello/datamanReader/helloDataManReader_nompi.exe echo echo diff --git a/doc/API_design_doc.h b/doc/API_design_doc.h new file mode 100644 index 0000000000000000000000000000000000000000..7f3b65369eb756a7fbe8209cd56d9ce098912ecc --- /dev/null +++ b/doc/API_design_doc.h @@ -0,0 +1,332 @@ +/* Current API by design, incorrect as code but describes all the ideas */ + +#include "ADIOS.h" + + +namespace adios +{ +typedef enum { + VARYING_DIMENSION = -1, + LOCAL_VALUE = 0, + GLOBAL_VALUE = 1 +}; + +typedef enum { ERROR = 0, WARN = 1, INFO = 2, DEBUG = 3 } VerboseFlag; + +typedef enum { + COLLECTIVE_WRITERS = 0, INDEPENDENT_WRITERS = 1, + GLOBAL_READERS = 2, ROUNDROBIN_READERS = 3, FIFO_READERS = 4, + OPEN_ALL_STEPS = 5 +} OpenMode; + +typedef enum { NOWAITFORSTREAM = 0, WAITFORSTREAM = 1 } StreamOpenMode; // default: wait for stream + +typedef enum { + APPEND = 0, UPDATE = 1, // writer advance modes + NEXT_AVAILABLE = 2, LATEST_AVAILABLE = 3, // reader advance modes +} AdvanceMode; + +typedef enum { NONBLOCKINGREAD = 0, BLOCKINGREAD = 1 } ReadMode; + +} // namespace adios + +void dummy() +{ + //Application variables + std::vector<double> Temperature; + std::vector<float> RaggedArray; + unsigned int Nx; + int Nparts, nproc; + + // Global class/object that serves for init, finalize and provides ADIOS functions + adios::ADIOS adios( std::string configfile, MPI_Comm comm, adios::verboseflag flag, bool debugflag ); + + + /************* + * WRITE API + *************/ + + /* Method + * We associate Engines and Transports and user settings into an object called Method. + * ADIOS check if it is defined by the user in the config file, and fills it out if it is. + */ + + std::shared_ptr<adios::Method> method = adios.DeclareMethod( "MethodName" ); + if( ! method.isDefinedInConfig() ) + { + // if not defined by user, we can change the default settings + method.SetEngine( "BP" ); // BP is the default engine + method.AddTransport( "File", "lucky=yes" ); // ISO-POSIX file is the default transport + method.AddTransport( "Staging" ); //"The" staging method developed in ECP + method.SetParameters("have_metadata_file","yes" ); // Passing parameters to the engine + method.SetParameters( "Aggregation", (nproc+1)/2 ); // number of aggregators + method.SetParameters( "verbose", adios::WARN ); // Verbosity level for this engine and what it calls + } + + + //Define variables with transformations. + adios::Variable<unsigned int>& varNX = adios.DefineVariable<unsigned int>( "NX" ); // global single-value across processes + adios::Variable<int>& varNproc = adios.DefineVariable<int>( "nproc", adios::GLOBAL_VALUE ); // same def for global value + adios::Variable<int>& varNparts = adios.DefineVariable<int>( "Nparts", adios::LOCAL_VALUE ); // a single-value different on every process + adios::Variable<double>& var1D = adios.DefineVariable<double>( "Temperature", {nproc*Nx} ); // 1D global array + adios::Variable<float>& varRagged = adios.DefineVariable<float>( "Ragged", {nproc,adios::VARYING_DIMENSION} ); // ragged array + + // Define a variable with local dimensions now, and make ADIOS allocate it inside its buffers (zero-copy API) + adios::Variable<double>& varZeroCopy = adios.DefineVariable<double>( "ZC", {nproc,Nx}, {1,NX}, {rank,0} ); // 2D global array, 1D decomposition + double *myVarZC = adios.AllocateVar( varZeroCopy ); + + //add transform to variable + adios::Transform bzip2 = adios::transform::BZIP2( ); + var1D->AddTransform( bzip2, 1 ); + + // open...write.write.write...advance...write.write.write...advance... ...close cycle + // "w" create/overwrite on open, "a" append at open, "u" open for update (does not increase step), "r" open for read. + std::shared_ptr<adios::Engine> writer = adios.Open( "myNumbers.bp", "w", method, adios::INDEPENDENT_WRITERS ); + if( writer == nullptr ) + throw std::ios_base::failure( "ERROR: failed to open ADIOS writer\n" ); + + for (int step = 0; step < 10; ++step) { + // write scalar value + writer->Write<int>( varNparts, Nparts ); + + // Make a selection to describe the local dimensions of the variable we write and + // its offsets in the global spaces + adios::Selection& sel = adios.SelectionBoundingBox( {Nx}, {rank*Nx} ); // local dims and offsets; both as list + var1D.SetSelection( sel ); + writer->Write<double>( var1D, Temperature.data() ); + + // Indicate we are done for this step. + // N-to-M Aggregation, disk I/O will be performed during this call, unless + // time aggregation postpones all of that to some later step. + // When Advance() returns, user can overwrite its Zero Copy variables. + // Internal buffer is freed only if there are no Zero Copy variables and there is no time aggregation going on + writer->Advance(); // same as AppendMode + writer->Advance( adios::APPEND ); // append new step at next write + writer->Advance( adios::UPDATE ); // do not increase step; ? should this cause error in staging ? + + // When AdvanceAsync returns, user need to wait for notification that he can overwrite the Zero Copy variables. + writer->AdvanceAsync( callback_func_to_notify_me() ); + } + + // Called once: indicate that we are done with this output for the run + // Zero Copy variables will be deallocated + writer->Close(); + + + + /************* + * READ API + *************/ + + // 1. Open a stream, where every reader can see everything in a stream (so that they can read a global array) + // Blocking read of a variable + try + { + + // Open a stream + std::shared_ptr<adios::Engine> reader = + adios.Open( "filename.bp", "r", method, + adios::GLOBAL_READERS, // Each reader process sees everything from the stream + adios::WAITFORSTREAM, // wait for the first step appear (default) + timeout ); // wait this long for the stream, return error afterwards + + /* Variable names are available as a vector of strings */ + std::cout << "List of variables in file: " << reader->VariableNames << "\n"; + /* read a Global scalar which has a single value in a step */ + reader->Read<unsigned int>( "NX", Nx ); + + // inquiry about a variable, whose name we know + adios::Variable var1D = reader.InquiryVariable( "Temperature" ); + vector<uint64_t> gdims = var1D->GetGlobalDimensions(); + int step = varID->GetStep(); + + struct adios::BlockInfo blocks = reader.InquiryVariableBlockInfo( reader, var1D ); // get per-writer size info + // this is adios1 ADIOS_VARBLOCK + struct adios::Statistics stats = reader.InquiryVariableStat( reader, var1D, perstepstat, perblockstat ); // get min/max statistics + // this is adios1 ADIOS_VARSTAT + + while( true ) + { + // Make a selection to describe the local dimensions of the variable we READ and + // its offsets in the global spaces + adios::Selection bbsel = adios.SelectionBoundingBox( {ldim}, {offs} ); // local dims and offsets; both as list + var1D->SetSelection( bbsel ); + reader->Read<double>( var1D, Temperature.data() ); + + // Better for staging to schedule several reads at once + reader->ScheduleRead<double>( var1D, Temperature.data() ); + reader->PerformRead( adios::BLOCKINGREAD ); + + // promise to not read more from this step/item + reader->Release(); + + // want to move on to the next available step/item + reader->Advance(adios::NEXT_AVAILABLE); // default + reader->Advance(adios::LATEST_AVAILABLE); // interested only in the latest data + } + // Close file/stream + reader->Close(); + } + catch( adios::end_of_stream& e ) + { + // Reached end of stream, end processing loop + // Close file/stream + bpReader->Close(); + } + catch( adios::file_not_found& e ) + { + // File/stream does not exist, quit + } + + + + // 2. Open a stream, where each item from the writers will get to a single reader only + // If the writers are collective, that means a whole steps go to different readers + // If the writers are independent, that means each writer's output goes to different readers + // Also show here ScheduleRead/PerformRead + //try + { + + // Open a stream + std::shared_ptr<adios::Engine> reader = + adios.Open( "filename.bp", "r", method, + adios::FIFO_READERS, // Each reader process sees everything from the stream + adios::WAITFORSTREAM, // wait for the first step appear (default) + timeout ); // wait this long for the stream, return error afterwards + + while( true ) + { + // Make a selection to describe the local dimensions of the variable we READ and + // its offsets in the global spaces if we know this somehow + adios::Selection bbsel = adios.SelectionBoundingBox( {ldim}, {offs} ); // local dims and offsets; both as list + var1D->SetSelection( bbsel ); + reader->Read<double>( var1D, Temperature.data() ); + + // Let ADIOS allocate space for the incoming (per-writer) item + double * data = reader->Read<double>( var1D ); + + // promise to not read more from this step/item + reader->Release(); + + // want to move on to the next available step/item + reader->Advance(); // default + reader->Advance(adios::LATEST_AVAILABLE); // This should be an error, or could it make sense? + } + reader->Close(); + } + + + // 3. Open a stream and return immediately, not waiting for data to appear + // In this mode we cannot inquiry variables, but can schedule reads + //try + { + + // Open a stream + std::shared_ptr<adios::Engine> reader = + adios.Open( "filename.bp", "r", method, + adios::GLOBAL_READERS, // Each reader process sees everything from the stream + adios::NOWAITFORSTREAM // wait for the first step appear (default) + ); + + while( true ) + { + + // Let ADIOS allocate space for the incoming (per-writer) item + reader->ScheduleRead<void>(); // read whatever comes + + // One way is to handle the incoming data through a callback (which will be called in a thread) + // void cb( const void *data, std::string doid, std::string var, std::string dtype, std::vector<std::size_t> varshape ); + // void cb( adios::VARCHUNK * chunk ); // see adios1 for VARCHUNK + reader->SetReadCallback( cb ); + reader->PerformRead( adios::NONBLOCKINGREAD ); + + // Another way is checking back manually like in adios1 and processing chunks + reader->PerformRead( adios::NONBLOCKINGREAD ); + int ck; + adios::VARCHUNK * chunk; + try + { + while ( (ck = reader->CheckReads( &chunk )) > 0) { + if (chunk) { + // process the chunk first + // ... + // free memory of chunk (not the data!) + adios::FreeChunk(chunk); + } else { + // no chunk was returned, slow down a little + sleep(1); + } + } + } + catch( std::exception& e ) + { + // some error happened while getting a chunk + } + + reader->Release(); + reader->Advance(); + } + reader->Close(); + } + + + // 4. Open it as file and see all steps at once. + // Allow for reading multiple steps of a variable into a contiguous array + try + { + + // Open a stream + std::shared_ptr<adios::Engine> reader = + adios.Open( "filename.bp", "r", method, + adios::OPEN_ALL_STEPS, //File mode, no advance + ); + + /* NX */ + /* There is a single value for each step. We can read all into a 1D array with a step selection. + * Steps are not automatically presented as an array dimension and read does not read it as array. + */ + // We can also just conveniently get the first step with a simple read statement. + reader->Read<unsigned int>( "NX", &Nx ); // read a Global scalar which has a single value in a step + + + adios::Variable<void> varNx = bpReader.InquiryVariable("Nx"); + std::vector<int> Nxs( varNx->GetSteps() ); // number of steps available + // make a StepSelection to select multiple steps. Args: From, #of consecutive steps + adios::StepSelection stepsNx( 0, varNx->GetSteps() ); + // ? How do we make a selection for an arbitrary list of steps ? + varNX.SetStepSelection( stepsNx ); + reader->Read<unsigned int>( varNx, Nxs.data() ); + + auto itmax = std::max_element(std::begin(Nxs), std::end(Nxs)); + auto itmin = std::min_element(std::begin(Nxs), std::end(Nxs)); + if (*itmin != *itmax) + { + throw std::ios_base::failure( "ERROR: NX is not the same at all steps!\n" ); + } + + + /* Nparts */ + // Nparts local scalar is presented as a 1D array of nproc elements. + // We can read all steps into a 2D array of nproc * nsteps + adios::Variable<void> varNparts = bpReader.InquiryVariable("Nparts"); + std::vector<int> partsV( Nproc * varNparts->GetSteps() ); + varNparts->SetStepSelection( + adios.StepSelection( 0, varNparts->GetSteps() ) + ); + bpReader->Read<int>( varNparts, partsV.data() ); // missing spatial selection = whole array at each step + + // Close file/stream + reader->Close(); + } + catch( adios::end_of_stream& e ) + { + // Reached end of stream, end processing loop + // Close file/stream + bpReader->Close(); + } + catch( adios::file_not_found& e ) + { + // File/stream does not exist, quit + } + +} diff --git a/examples/hello/datamanWriter/helloDataManWriter.cpp b/examples/hello/datamanWriter/helloDataManWriter.cpp index 4b7e2d3eb28177e63ceaa3f0bcc09a234b2e2709..d4b4dee1f25464bfd3bf8e8ebf77a1adebe7dc97 100644 --- a/examples/hello/datamanWriter/helloDataManWriter.cpp +++ b/examples/hello/datamanWriter/helloDataManWriter.cpp @@ -38,8 +38,8 @@ int main( int argc, char* argv [] ) try { //Define variable and local size - auto& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", {Nx} ); - auto& ioMyCFloats = adios.DefineVariable<std::complex<float>>( "myCFloats", {3} ); + auto ioMyDoubles = adios.DefineVariable<double>( "myDoubles", {Nx} ); + auto ioMyCFloats = adios.DefineVariable<std::complex<float>>( "myCFloats", {3} ); //Define method for engine creation, it is basically straight-forward parameters adios::Method& datamanSettings = adios.DeclareMethod( "WAN", "DataManWriter" ); //default method type is Writer diff --git a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp index eef6c02d718416240a5d8d99ac3989c2c090a931..107e313de1f648b52c1782ae05a55f100b2fbdb2 100644 --- a/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp +++ b/examples/hello/datamanWriter/helloDataManWriter_nompi.cpp @@ -31,8 +31,6 @@ int main( int argc, char* argv [] ) { //Define variable and local size //Define variable and local size - adios::ADIOS adios( "config.xml", adiosDebug ); - auto ioMyFloats = adios.DefineVariable<float>( "myfloats", adios::Dims{Nx} ); auto ioMyFloat = adios.DefineVariable<float>( "myfloat", adios::Dims{1} ); // auto& ioMyDoubles = adios.DefineVariable<double>( "myDoubles", adios::Dims{Nx} ); @@ -40,7 +38,7 @@ int main( int argc, char* argv [] ) //Define method for engine creation, it is basically straight-forward parameters adios::Method& datamanSettings = adios.DeclareMethod( "WAN", "DataManWriter" ); //default method type is Writer - datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=dump", "application=XGC1", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307" ); + datamanSettings.SetParameters( "real_time=yes", "method_type=stream", "method=dump", "monitoring=yes", "local_ip=127.0.0.1", "remote_ip=127.0.0.1", "local_port=12306", "remote_port=12307" ); // datamanSettings.AddTransport( "Mdtm", "localIP=128.0.0.0.1", "remoteIP=128.0.0.0.2", "tolerances=1,2,3" ); //datamanSettings.AddTransport( "ZeroMQ", "localIP=128.0.0.0.1.1", "remoteIP=128.0.0.0.2.1", "tolerances=1,2,3" ); not yet supported , will throw an exception diff --git a/include/engine/dataman/DataManWriter.h b/include/engine/dataman/DataManWriter.h index 58c2728617f1df0f5cdd3dc79cac16af9d6d94d8..8f00973042b981cecec402ad92ab96ba8b4df6bb 100644 --- a/include/engine/dataman/DataManWriter.h +++ b/include/engine/dataman/DataManWriter.h @@ -54,6 +54,7 @@ private: format::BP1Writer m_BP1Writer; ///< format object will provide the required BP functionality to be applied on m_Buffer and m_Transports bool m_DoRealTime = false; + bool m_DoMonitor = false; DataManager m_Man; std::function<void( const void*, std::string, std::string, std::string, Dims )> m_CallBack; ///< call back function @@ -95,7 +96,6 @@ private: void Write( const std::string variableName, const std::complex<double>* values ); void Write( const std::string variableName, const std::complex<long double>* values ); - /** * From transport Mdtm in m_Method * @param parameter must be an accepted parameter @@ -119,36 +119,35 @@ private: jmsg["doid"] = m_Name; jmsg["var"] = variable.m_Name; jmsg["dtype"] = GetType<T>(); - std::cout << "variable.m_Dimensions.size() = " << variable.m_Dimensions.size() << endl; jmsg["putshape"] = variable.m_Dimensions; -// if(variable.m_GlobalDimensions.size() == 0) variable.m_GlobalDimensions = variable.m_Dimensions; + if(variable.m_GlobalDimensions.size() == 0) variable.m_GlobalDimensions = variable.m_Dimensions; jmsg["varshape"] = variable.m_GlobalDimensions; -// if(variable.m_GlobalOffsets.size() == 0) variable.m_GlobalOffsets.assign(variable.m_Dimensions.size(),0); + if(variable.m_GlobalOffsets.size() == 0) variable.m_GlobalOffsets.assign(variable.m_Dimensions.size(),0); jmsg["offset"] = variable.m_GlobalOffsets; jmsg["timestep"] = 0; m_Man.put(values, jmsg); - std::cout << "putshape " << variable.m_Dimensions.size() << endl; - std::cout << "varshape " << variable.m_GlobalDimensions.size() << endl; - std::cout << "offset " << variable.m_GlobalOffsets.size() << endl; - - std::cout << "I am hooked to the DataMan library\n"; - MPI_Barrier( m_MPIComm ); - - for( int i = 0; i < m_SizeMPI; ++i ) - { - if( i == m_RankMPI ) - { - std::cout << "Rank: " << m_RankMPI << "\n"; - variable.Monitor( std::cout ); - std::cout << std::endl; - } - else + if(m_DoMonitor){ + MPI_Barrier( m_MPIComm ); + std::cout << "I am hooked to the DataMan library\n"; + std::cout << "putshape " << variable.m_Dimensions.size() << endl; + std::cout << "varshape " << variable.m_GlobalDimensions.size() << endl; + std::cout << "offset " << variable.m_GlobalOffsets.size() << endl; + for( int i = 0; i < m_SizeMPI; ++i ) { - sleep( 1 ); + if( i == m_RankMPI ) + { + std::cout << "Rank: " << m_RankMPI << "\n"; + variable.Monitor( std::cout ); + std::cout << std::endl; + } + else + { + sleep( 1 ); + } } + MPI_Barrier( m_MPIComm ); } - MPI_Barrier( m_MPIComm ); } }; diff --git a/src/engine/dataman/DataManReader.cpp b/src/engine/dataman/DataManReader.cpp index 0f5a5c044cf47014b6073205108cf97888f1f15f..2704088d6cefef59e80ac9b9a9bbc733873fafe5 100644 --- a/src/engine/dataman/DataManReader.cpp +++ b/src/engine/dataman/DataManReader.cpp @@ -143,30 +143,27 @@ void DataManReader::Init( ) } }; - std::string method_type, method, local_ip, remote_ip; //no need to initialize to empty (it's default) - int local_port=0, remote_port=0, num_channels=0; - - lf_AssignString( "method_type", method_type ); - if( method_type == "stream" ) + auto is_number = [] (const std::string& s) { - lf_AssignString( "method", method ); - lf_AssignString( "local_ip", local_ip ); - lf_AssignString( "remote_ip", remote_ip ); - lf_AssignInt( "local_port", local_port ); - lf_AssignInt( "remote_port", remote_port ); - lf_AssignInt( "num_channels", num_channels ); - - json jmsg; - jmsg["method"] = method; - jmsg["local_ip"] = local_ip; - jmsg["remote_ip"] = remote_ip; - jmsg["local_port"] = local_port; - jmsg["remote_port"] = remote_port; - jmsg["num_channels"] = num_channels; - jmsg["stream_mode"] = "receiver"; - - m_Man.add_stream(jmsg); + return !s.empty() && std::find_if(s.begin(), s.end(), [](char c) { return !std::isdigit(c); }) == s.end(); + }; + + json jmsg; + for(auto &i : m_Method.m_Parameters){ + if( is_number(i.second) ){ + jmsg[i.first] = std::stoi(i.second); + } + else{ + jmsg[i.first] = i.second; + } } + jmsg["stream_mode"] = "receiver"; + m_Man.add_stream(jmsg); + + std::string method_type; + int num_channels=0; + lf_AssignString( "method_type", method_type ); + lf_AssignInt( "num_channels", num_channels ); } else { diff --git a/src/engine/dataman/DataManWriter.cpp b/src/engine/dataman/DataManWriter.cpp index 9152f6b15b924440ae9e09ea259a6226506861b0..7f06b75c7d7af85a7f7358e58cdba80e062f7687 100644 --- a/src/engine/dataman/DataManWriter.cpp +++ b/src/engine/dataman/DataManWriter.cpp @@ -165,6 +165,13 @@ void DataManWriter::Init( ) m_DoRealTime = true; } + itRealTime = m_Method.m_Parameters.find( "monitoring" ); + if( itRealTime != m_Method.m_Parameters.end() ) + { + if( itRealTime->second == "yes" || itRealTime->second == "true" ) + m_DoMonitor = true; + } + if(m_DoRealTime) { /** @@ -191,37 +198,33 @@ void DataManWriter::Init( ) } }; - std::string method_type, method, local_ip, remote_ip; //no need to initialize to empty (it's default) - int local_port=0, remote_port=0, num_channels=0; - - lf_AssignString( "method_type", method_type ); - if( method_type == "stream" ) + auto is_number = [] (const std::string& s) { - lf_AssignString( "method", method ); - lf_AssignString( "local_ip", local_ip ); - lf_AssignString( "remote_ip", remote_ip ); - lf_AssignInt( "local_port", local_port ); - lf_AssignInt( "remote_port", remote_port ); - lf_AssignInt( "num_channels", num_channels ); - - json jmsg; - jmsg["method"] = method; - jmsg["local_ip"] = local_ip; - jmsg["remote_ip"] = remote_ip; - jmsg["local_port"] = local_port; - jmsg["remote_port"] = remote_port; - jmsg["num_channels"] = num_channels; - jmsg["stream_mode"] = "sender"; - - m_Man.add_stream(jmsg); + return !s.empty() && std::find_if(s.begin(), s.end(), [](char c) { return !std::isdigit(c); }) == s.end(); + }; + + json jmsg; + for(auto &i : m_Method.m_Parameters){ + if( is_number(i.second) ){ + jmsg[i.first] = std::stoi(i.second); + } + else{ + jmsg[i.first] = i.second; + } } + jmsg["stream_mode"] = "sender"; + m_Man.add_stream(jmsg); + + std::string method_type; + int num_channels=0; + lf_AssignString( "method_type", method_type ); + lf_AssignInt( "num_channels", num_channels ); } else { InitCapsules( ); InitTransports( ); } - } void DataManWriter::InitCapsules( )