Newer
Older
Podhorszki, Norbert
committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
/* Current API design. This file is not valid, compilable code, but it describes all the ideas. */
#include "ADIOS.h"
namespace adios
{

/**
 * Special constants accepted when defining a variable.
 * VARYING_DIMENSION is used in place of a dimension size; LOCAL_VALUE and
 * GLOBAL_VALUE are passed instead of a dimension list for single values.
 * The enum is intentionally unnamed: only the enumerators are used.
 * (The former `typedef enum { ... };` had no typedef name, so the `typedef`
 * keyword declared nothing and was diagnosed by compilers.)
 */
enum
{
    VARYING_DIMENSION = -1,
    LOCAL_VALUE = 0,
    GLOBAL_VALUE = 1
};

/** Verbosity levels, ordered from least (ERROR) to most (DEBUG) numeric value. */
enum VerboseFlag
{
    ERROR = 0,
    WARN = 1,
    INFO = 2,
    DEBUG = 3
};

/** Flags passed to Open() selecting the writer or reader access pattern. */
enum OpenMode
{
    COLLECTIVE_WRITERS = 0,
    INDEPENDENT_WRITERS = 1,
    GLOBAL_READERS = 2,
    ROUNDROBIN_READERS = 3,
    FIFO_READERS = 4,
    OPEN_ALL_STEPS = 5
};

/** Whether opening a stream waits for it to appear. Default: wait for stream. */
enum StreamOpenMode
{
    NOWAITFORSTREAM = 0,
    WAITFORSTREAM = 1
};

/** Modes accepted by Advance(). */
enum AdvanceMode
{
    APPEND = 0,          // writer advance mode
    UPDATE = 1,          // writer advance mode
    NEXT_AVAILABLE = 2,  // reader advance mode
    LATEST_AVAILABLE = 3 // reader advance mode
};

/** Modes accepted by PerformRead(). */
enum ReadMode
{
    NONBLOCKINGREAD = 0,
    BLOCKINGREAD = 1
};

} // namespace adios
void dummy()
{
//Application variables
std::vector<double> Temperature;
std::vector<float> RaggedArray;
unsigned int Nx;
int Nparts, nproc;
// Global class/object that serves for init, finalize and provides ADIOS functions
adios::ADIOS adios( std::string configfile, MPI_Comm comm, adios::verboseflag flag, bool debugflag );
/*************
* WRITE API
*************/
/* Method
* We associate Engines and Transports and user settings into an object called Method.
* ADIOS check if it is defined by the user in the config file, and fills it out if it is.
*/
std::shared_ptr<adios::Method> method = adios.DeclareMethod( "MethodName" );
if( ! method.isDefinedInConfig() )
{
// if not defined by user, we can change the default settings
method.SetEngine( "BP" ); // BP is the default engine
method.AddTransport( "File", "lucky=yes" ); // ISO-POSIX file is the default transport
method.AddTransport( "Staging" ); //"The" staging method developed in ECP
method.SetParameters("have_metadata_file","yes" ); // Passing parameters to the engine
method.SetParameters( "Aggregation", (nproc+1)/2 ); // number of aggregators
method.SetParameters( "verbose", adios::WARN ); // Verbosity level for this engine and what it calls
}
//Define variables with transformations.
adios::Variable<unsigned int>& varNX = adios.DefineVariable<unsigned int>( "NX" ); // global single-value across processes
adios::Variable<int>& varNproc = adios.DefineVariable<int>( "nproc", adios::GLOBAL_VALUE ); // same def for global value
adios::Variable<int>& varNparts = adios.DefineVariable<int>( "Nparts", adios::LOCAL_VALUE ); // a single-value different on every process
adios::Variable<double>& var1D = adios.DefineVariable<double>( "Temperature", {nproc*Nx} ); // 1D global array
adios::Variable<float>& varRagged = adios.DefineVariable<float>( "Ragged", {nproc,adios::VARYING_DIMENSION} ); // ragged array
// Define a variable with local dimensions now, and make ADIOS allocate it inside its buffers (zero-copy API)
adios::Variable<double>& varZeroCopy = adios.DefineVariable<double>( "ZC", {nproc,Nx}, {1,NX}, {rank,0} ); // 2D global array, 1D decomposition
double *myVarZC = adios.AllocateVar( varZeroCopy );
//add transform to variable
adios::Transform bzip2 = adios::transform::BZIP2( );
var1D->AddTransform( bzip2, 1 );
// open...write.write.write...advance...write.write.write...advance... ...close cycle
// "w" create/overwrite on open, "a" append at open, "u" open for update (does not increase step), "r" open for read.
std::shared_ptr<adios::Engine> writer = adios.Open( "myNumbers.bp", "w", method, adios::INDEPENDENT_WRITERS );
if( writer == nullptr )
throw std::ios_base::failure( "ERROR: failed to open ADIOS writer\n" );
for (int step = 0; step < 10; ++step) {
// write scalar value
writer->Write<int>( varNparts, Nparts );
// Make a selection to describe the local dimensions of the variable we write and
// its offsets in the global spaces
adios::Selection& sel = adios.SelectionBoundingBox( {Nx}, {rank*Nx} ); // local dims and offsets; both as list
var1D.SetSelection( sel );
writer->Write<double>( var1D, Temperature.data() );
// Indicate we are done for this step.
// N-to-M Aggregation, disk I/O will be performed during this call, unless
// time aggregation postpones all of that to some later step.
// When Advance() returns, user can overwrite its Zero Copy variables.
// Internal buffer is freed only if there are no Zero Copy variables and there is no time aggregation going on
writer->Advance(); // same as AppendMode
writer->Advance( adios::APPEND ); // append new step at next write
writer->Advance( adios::UPDATE ); // do not increase step; ? should this cause error in staging ?
// When AdvanceAsync returns, user need to wait for notification that he can overwrite the Zero Copy variables.
writer->AdvanceAsync( callback_func_to_notify_me() );
}
// Called once: indicate that we are done with this output for the run
// Zero Copy variables will be deallocated
writer->Close();
/*************
* READ API
*************/
// 1. Open a stream, where every reader can see everything in a stream (so that they can read a global array)
// Blocking read of a variable
try
{
// Open a stream
std::shared_ptr<adios::Engine> reader =
adios.Open( "filename.bp", "r", method,
adios::GLOBAL_READERS, // Each reader process sees everything from the stream
adios::WAITFORSTREAM, // wait for the first step appear (default)
timeout ); // wait this long for the stream, return error afterwards
/* Variable names are available as a vector of strings */
std::cout << "List of variables in file: " << reader->VariableNames << "\n";
/* read a Global scalar which has a single value in a step */
reader->Read<unsigned int>( "NX", Nx );
// inquiry about a variable, whose name we know
adios::Variable var1D = reader.InquiryVariable( "Temperature" );
vector<uint64_t> gdims = var1D->GetGlobalDimensions();
int step = varID->GetStep();
struct adios::BlockInfo blocks = reader.InquiryVariableBlockInfo( reader, var1D ); // get per-writer size info
// this is adios1 ADIOS_VARBLOCK
struct adios::Statistics stats = reader.InquiryVariableStat( reader, var1D, perstepstat, perblockstat ); // get min/max statistics
// this is adios1 ADIOS_VARSTAT
while( true )
{
// Make a selection to describe the local dimensions of the variable we READ and
// its offsets in the global spaces
adios::Selection bbsel = adios.SelectionBoundingBox( {ldim}, {offs} ); // local dims and offsets; both as list
var1D->SetSelection( bbsel );
reader->Read<double>( var1D, Temperature.data() );
// Better for staging to schedule several reads at once
reader->ScheduleRead<double>( var1D, Temperature.data() );
reader->PerformRead( adios::BLOCKINGREAD );
// promise to not read more from this step/item
reader->Release();
// want to move on to the next available step/item
reader->Advance(adios::NEXT_AVAILABLE); // default
reader->Advance(adios::LATEST_AVAILABLE); // interested only in the latest data
}
// Close file/stream
reader->Close();
}
catch( adios::end_of_stream& e )
{
// Reached end of stream, end processing loop
// Close file/stream
bpReader->Close();
}
catch( adios::file_not_found& e )
{
// File/stream does not exist, quit
}
// 2. Open a stream, where each item from the writers will get to a single reader only
// If the writers are collective, that means a whole steps go to different readers
// If the writers are independent, that means each writer's output goes to different readers
// Also show here ScheduleRead/PerformRead
//try
{
// Open a stream
std::shared_ptr<adios::Engine> reader =
adios.Open( "filename.bp", "r", method,
adios::FIFO_READERS, // Each reader process sees everything from the stream
adios::WAITFORSTREAM, // wait for the first step appear (default)
timeout ); // wait this long for the stream, return error afterwards
while( true )
{
// Make a selection to describe the local dimensions of the variable we READ and
// its offsets in the global spaces if we know this somehow
adios::Selection bbsel = adios.SelectionBoundingBox( {ldim}, {offs} ); // local dims and offsets; both as list
var1D->SetSelection( bbsel );
reader->Read<double>( var1D, Temperature.data() );
// Let ADIOS allocate space for the incoming (per-writer) item
double * data = reader->Read<double>( var1D );
// promise to not read more from this step/item
reader->Release();
// want to move on to the next available step/item
reader->Advance(); // default
reader->Advance(adios::LATEST_AVAILABLE); // This should be an error, or could it make sense?
}
reader->Close();
}
// 3. Open a stream and return immediately, not waiting for data to appear
// In this mode we cannot inquiry variables, but can schedule reads
//try
{
// Open a stream
std::shared_ptr<adios::Engine> reader =
adios.Open( "filename.bp", "r", method,
adios::GLOBAL_READERS, // Each reader process sees everything from the stream
adios::NOWAITFORSTREAM // wait for the first step appear (default)
);
while( true )
{
// Let ADIOS allocate space for the incoming (per-writer) item
reader->ScheduleRead<void>(); // read whatever comes
// One way is to handle the incoming data through a callback (which will be called in a thread)
// void cb( const void *data, std::string doid, std::string var, std::string dtype, std::vector<std::size_t> varshape );
// void cb( adios::VARCHUNK * chunk ); // see adios1 for VARCHUNK
reader->SetReadCallback( cb );
reader->PerformRead( adios::NONBLOCKINGREAD );
// Another way is checking back manually like in adios1 and processing chunks
reader->PerformRead( adios::NONBLOCKINGREAD );
int ck;
adios::VARCHUNK * chunk;
try
{
while ( (ck = reader->CheckReads( &chunk )) > 0) {
if (chunk) {
// process the chunk first
// ...
// free memory of chunk (not the data!)
adios::FreeChunk(chunk);
} else {
// no chunk was returned, slow down a little
sleep(1);
}
}
}
catch( std::exception& e )
{
// some error happened while getting a chunk
}
reader->Release();
reader->Advance();
}
reader->Close();
}
// 4. Open it as file and see all steps at once.
// Allow for reading multiple steps of a variable into a contiguous array
try
{
// Open a stream
std::shared_ptr<adios::Engine> reader =
adios.Open( "filename.bp", "r", method,
adios::OPEN_ALL_STEPS, //File mode, no advance
);
/* NX */
/* There is a single value for each step. We can read all into a 1D array with a step selection.
* Steps are not automatically presented as an array dimension and read does not read it as array.
*/
// We can also just conveniently get the first step with a simple read statement.
reader->Read<unsigned int>( "NX", &Nx ); // read a Global scalar which has a single value in a step
adios::Variable<void> varNx = bpReader.InquiryVariable("Nx");
std::vector<int> Nxs( varNx->GetSteps() ); // number of steps available
// make a StepSelection to select multiple steps. Args: From, #of consecutive steps
adios::StepSelection stepsNx( 0, varNx->GetSteps() );
// ? How do we make a selection for an arbitrary list of steps ?
varNX.SetStepSelection( stepsNx );
reader->Read<unsigned int>( varNx, Nxs.data() );
auto itmax = std::max_element(std::begin(Nxs), std::end(Nxs));
auto itmin = std::min_element(std::begin(Nxs), std::end(Nxs));
if (*itmin != *itmax)
{
throw std::ios_base::failure( "ERROR: NX is not the same at all steps!\n" );
}
/* Nparts */
// Nparts local scalar is presented as a 1D array of nproc elements.
// We can read all steps into a 2D array of nproc * nsteps
adios::Variable<void> varNparts = bpReader.InquiryVariable("Nparts");
std::vector<int> partsV( Nproc * varNparts->GetSteps() );
varNparts->SetStepSelection(
adios.StepSelection( 0, varNparts->GetSteps() )
);
bpReader->Read<int>( varNparts, partsV.data() ); // missing spatial selection = whole array at each step
// Close file/stream
reader->Close();
}
catch( adios::end_of_stream& e )
{
// Reached end of stream, end processing loop
// Close file/stream
bpReader->Close();
}
catch( adios::file_not_found& e )
{
// File/stream does not exist, quit
}
}