/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* BP1Writer.h
*
* Created on: Jan 24, 2017
* Author: wfg
*/
#ifndef BP1WRITER_H_
#define BP1WRITER_H_
#include <algorithm> //std::count, std::copy, std::for_each
#include <cmath> //std::ceil
#include <cstring> //std::memcpy
#include "utilities/format/bp1/BP1Base.h"
#include "utilities/format/bp1/BP1Structs.h"
#include "capsule/heap/STLVector.h"
#include "core/Variable.h"
#include "functions/adiosTemplates.h"
class BP1Writer : public BP1Base
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
/// Number of threads used for the min/max reduction over large arrays.
unsigned int m_Threads{1};
/// Statistics verbosity level; only 0 (min and max) is supported.
unsigned int m_Verbosity{0};
/// Memory growth factor.
float m_GrowthFactor{1.5F};
/// BP format version.
const std::uint8_t m_Version{3};
/**
 * Computes, following the BP format, the number of bytes taken by the
 * process group (PG) index entry, including the list of methods (methods
 * carry no parameters for now).
 * @param name name written into the PG index entry
 * @param timeStepName time step name written into the PG index entry
 * @param numberOfTransports number of transports listed as methods
 * @return size in bytes of the PG index entry
 */
std::size_t
GetProcessGroupIndexSize(const std::string name,
                         const std::string timeStepName,
                         const std::size_t numberOfTransports) const noexcept;
/**
 * Writes a process group (PG) index entry and the list of methods (one per
 * transport). Done at Open or at aggregation of a new time step.
 * This version operates on a single heap buffer and metadata set.
 * @param isFortran flag recorded for the PG (presumably marks a Fortran
 *        application / ordering — confirm against the implementation)
 * @param name process group name
 * @param processID process (rank) identifier
 * @param transports transports whose methods are listed in the entry
 * @param heap heap data buffer written by this call
 * @param metadataSet metadata set updated with the PG index
 */
void WriteProcessGroupIndex(
    const bool isFortran, const std::string name,
    const std::uint32_t processID,
    const std::vector<std::shared_ptr<Transport>> &transports,
    capsule::STLVector &heap, BP1MetadataSet &metadataSet) const noexcept;
/**
* Returns the estimated variable index size
* @param group
* @param variableName
* @param variable
* @param verbosity
* @return variable index size
*/
template <class T>
size_t GetVariableIndexSize(const Variable<T> &variable) const noexcept
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
// size_t indexSize = varEntryLength + memberID + lengthGroupName +
// groupName + lengthVariableName + lengthOfPath + path + datatype
size_t indexSize = 23; // without characteristics
indexSize += variable.m_Name.size();
// characteristics 3 and 4, check variable number of dimensions
const std::size_t dimensions =
variable.DimensionsSize(); // commas in CSV + 1
indexSize += 28 * dimensions; // 28 bytes per dimension
indexSize += 1; // id
// characteristics, offset + payload offset in data
indexSize += 2 * (1 + 8);
// characteristic 0, if scalar add value, for now only allowing string
if (dimensions == 1)
{
indexSize += sizeof(T);
indexSize += 1; // id
// must have an if here
indexSize += 2 + variable.m_Name.size();
indexSize += 1; // id
}
// characteristic statistics
if (m_Verbosity == 0) // default, only min and max
{
indexSize += 2 * (sizeof(T) + 1);
indexSize += 1 + 1; // id
}
return indexSize + 12; /// extra 12 bytes in case of attributes
// need to add transform characteristics
/**
* Version for primitive types (except std::complex<T>)
* @param variable
* @param heap
* @param metadataSet
*/
template <class T>
void WriteVariableMetadata(const Variable<T> &variable,
capsule::STLVector &heap,
BP1MetadataSet &metadataSet) const noexcept
Stats<T> stats = GetStats(variable);
WriteVariableMetadataCommon(variable, stats, heap, metadataSet);
/**
 * std::complex<T> overload of WriteVariableMetadata. The statistics are
 * typed on the underlying real type T; the common writer then records the
 * metadata header in the heap and the characteristics in the variable index.
 * @param variable complex-valued variable being written
 * @param heap heap data buffer receiving the metadata header
 * @param metadataSet metadata set holding the variable indices
 */
template <class T>
void WriteVariableMetadata(const Variable<std::complex<T>> &variable,
                           capsule::STLVector &heap,
                           BP1MetadataSet &metadataSet) const noexcept
{
    // Stats<T>, not Stats<std::complex<T>>: resolved to the complex GetStats
    auto complexStats = GetStats(variable);
    WriteVariableMetadataCommon(variable, complexStats, heap, metadataSet);
}
/**
* Expensive part this is only for heap buffers need to adapt to vector of
* capsules
* @param variable
* @param buffer
*/
template <class T>
void WriteVariablePayload(const Variable<T> &variable,
capsule::STLVector &heap,
const unsigned int nthreads = 1) const noexcept
// EXPENSIVE part, might want to use threads if large, serial for now
InsertToBuffer(heap.m_Data, variable.m_AppValues, variable.TotalSize());
heap.m_DataAbsolutePosition += variable.PayLoadSize();
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
/**
 * Advance step over the current metadata set and data buffer.
 * NOTE(review): behavior is defined in the implementation file — document it.
 * @param metadataSet current rank metadata set
 * @param buffer data buffer
 */
void Advance(BP1MetadataSet &metadataSet,
             capsule::STLVector &buffer);
/**
 * Sets the metadata (on the first close) and writes to a single transport.
 * @param metadataSet current rank metadata set
 * @param heap buffer holding the data
 * @param transport performs the write once data and metadata are set up
 * @param isFirstClose true: metadata has been set and aggregated (passed by
 *        reference, so the call may update it)
 * @param doAggregation true for N-to-M, false for N-to-N
 */
void Close(BP1MetadataSet &metadataSet,
           capsule::STLVector &heap,
           Transport &transport,
           bool &isFirstClose,
           const bool doAggregation) const noexcept;
/**
 * Builds the ADIOS profiling information (buffering, open, write and close)
 * for one rank; the result is aggregated into profiling.log.
 * @param rank current rank
 * @param metadataSet provides the buffering Profile info
 * @param transports provide the Profile info for transport open, writes and
 *        close
 * @return this rank's profiling string
 */
std::string
GetRankProfilingLog(const int rank, const BP1MetadataSet &metadataSet,
                    const std::vector<std::shared_ptr<Transport>> &transports)
    const noexcept;
private:
template <class T, class U>
void WriteVariableMetadataCommon(const Variable<T> &variable,
Stats<U> &stats, capsule::STLVector &heap,
BP1MetadataSet &metadataSet) const noexcept
stats.TimeIndex = metadataSet.TimeStep;
// Get new Index or point to existing index
bool isNew = true; // flag to check if variable is new
BP1Index &varIndex =
GetBP1Index(variable.m_Name, metadataSet.VarsIndices, isNew);
stats.MemberID = varIndex.MemberID;
// write metadata header in data and extract offsets
stats.Offset = heap.m_DataAbsolutePosition;
WriteVariableMetadataInData(variable, stats, heap);
stats.PayloadOffset = heap.m_DataAbsolutePosition;
// write to metadata index
WriteVariableMetadataInIndex(variable, stats, isNew, varIndex);
++metadataSet.DataPGVarsCount;
template <class T, class U>
void WriteVariableMetadataInData(const Variable<T> &variable,
const Stats<U> &stats,
capsule::STLVector &heap) const noexcept
auto &buffer = heap.m_Data;
const std::size_t varLengthPosition =
buffer.size(); // capture initial position for variable length
buffer.insert(buffer.end(), 8, 0); // skip var length (8)
InsertToBuffer(buffer, &stats.MemberID); // memberID
WriteNameRecord(variable.m_Name, buffer); // variable name
buffer.insert(buffer.end(), 2, 0); // skip path
const std::uint8_t dataType = GetDataType<T>(); // dataType
InsertToBuffer(buffer, &dataType);
// write variable dimensions
const std::uint8_t dimensions = variable.m_LocalDimensions.size();
InsertToBuffer(buffer, &dimensions); // count
std::uint16_t dimensionsLength =
27 *
dimensions; // 27 is from 9 bytes for each: var y/n + local, var
// y/n + global dimension, var y/n + global offset,
// changed for characteristic
InsertToBuffer(buffer, &dimensionsLength); // length
WriteDimensionsRecord(buffer, variable.m_LocalDimensions,
variable.m_GlobalDimensions, variable.m_Offsets,
18, true);
// CHARACTERISTICS
WriteVariableCharacteristics(variable, stats, buffer, true);
// Back to varLength including payload size
const std::uint64_t varLength = buffer.size() - varLengthPosition +
variable.PayLoadSize() -
8; // remove its own size
CopyToBufferPosition(buffer, varLengthPosition, &varLength); // length
heap.m_DataAbsolutePosition +=
buffer.size() - varLengthPosition; // update absolute position to be
// used as payload position
template <class T, class U>
void WriteVariableMetadataInIndex(const Variable<T> &variable,
const Stats<U> &stats, const bool isNew,
BP1Index &index) const noexcept
auto &buffer = index.Buffer;
if (isNew == true) // write variable header (might be shared with
// attributes index)
{
buffer.insert(buffer.end(), 4, 0); // skip var length (4)
buffer.insert(buffer.end(), 2, 0); // skip group name
WriteNameRecord(variable.m_Name, buffer);
buffer.insert(buffer.end(), 2, 0); // skip path
const std::uint8_t dataType = GetDataType<T>();
InsertToBuffer(buffer, &dataType);
// Characteristics Sets Count in Metadata
index.Count = 1;
InsertToBuffer(buffer, &index.Count);
}
else // update characteristics sets count
{
const std::size_t characteristicsSetsCountPosition =
15 + variable.m_Name.size();
++index.Count;
CopyToBufferPosition(buffer, characteristicsSetsCountPosition,
&index.Count); // test
}
WriteVariableCharacteristics(variable, stats, buffer);
template <class T, class U>
void WriteVariableCharacteristics(const Variable<T> &variable,
const Stats<U> &stats,
std::vector<char> &buffer,
const bool addLength = false) const
noexcept
const std::size_t characteristicsCountPosition =
buffer.size(); // very important to track as writer is going back to
// this position
buffer.insert(buffer.end(), 5,
0); // skip characteristics count(1) + length (4)
std::uint8_t characteristicsCounter = 0;
// DIMENSIONS
std::uint8_t characteristicID = characteristic_dimensions;
InsertToBuffer(buffer, &characteristicID);
const std::uint8_t dimensions = variable.m_LocalDimensions.size();
if (addLength == true)
{
const std::int16_t lengthOfDimensionsCharacteristic =
24 * dimensions +
3; // 24 = 3 local, global, offset x 8 bytes/each
InsertToBuffer(buffer, &lengthOfDimensionsCharacteristic);
InsertToBuffer(buffer, &dimensions); // count
const std::uint16_t dimensionsLength = 24 * dimensions;
InsertToBuffer(buffer, &dimensionsLength); // length
WriteDimensionsRecord(buffer, variable.m_LocalDimensions,
variable.m_GlobalDimensions, variable.m_Offsets,
16, addLength);
++characteristicsCounter;
// VALUE for SCALAR or STAT min, max for ARRAY
WriteBoundsRecord(variable.m_IsScalar, stats, buffer,
characteristicsCounter, addLength);
// TIME INDEX
WriteCharacteristicRecord(characteristic_time_index, stats.TimeIndex,
buffer, characteristicsCounter, addLength);
if (addLength == false) // only in metadata offset and payload offset
{
WriteCharacteristicRecord(characteristic_offset, stats.Offset,
buffer, characteristicsCounter);
WriteCharacteristicRecord(characteristic_payload_offset,
stats.PayloadOffset, buffer,
characteristicsCounter);
}
// END OF CHARACTERISTICS
// Back to characteristics count and length
CopyToBufferPosition(buffer, characteristicsCountPosition,
&characteristicsCounter); // count (1)
const std::uint32_t characteristicsLength =
buffer.size() - characteristicsCountPosition - 4 -
1; // remove its own length (4 bytes) + characteristic counter ( 1
// byte
// )
CopyToBufferPosition(buffer, characteristicsCountPosition + 1,
&characteristicsLength); // length
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
/**
 * Writes a name record:
 * [2 bytes: name.length()][name.length() bytes: name characters].
 * Call sites invoke it between appends at buffer.end(), so the record is
 * expected to be appended at the end of buffer.
 * @param name name to write
 * @param buffer destination buffer
 */
void WriteNameRecord(const std::string name,
                     std::vector<char> &buffer) const noexcept;
/**
 * Writes a dimensions record for a variable into buffer. Used for the
 * variable header in the data buffer and for the dimensions characteristic.
 * @param buffer destination buffer
 * @param localDimensions local dimensions of the variable
 * @param globalDimensions global dimensions of the variable
 * @param offsets offsets of the variable
 * @param skip byte count used by the implementation; callers pass 18 for the
 *        data header and 16 for the dimensions characteristic.
 *        TODO: document its exact meaning.
 * @param addType true: for data buffers, false: for the metadata buffer.
 *        NOTE(review): WriteVariableCharacteristics forwards its addLength
 *        flag here, which is true for data characteristics — confirm this is
 *        intended.
 */
void WriteDimensionsRecord(std::vector<char> &buffer,
                           const std::vector<std::size_t> &localDimensions,
                           const std::vector<std::size_t> &globalDimensions,
                           const std::vector<std::size_t> &offsets,
                           const unsigned int skip,
                           const bool addType = false) const noexcept;
/**
 * Computes statistics for a variable of primitive type (every type except
 * std::complex<T>).
 * When m_Verbosity == 0, stats.Min and stats.Max are computed over the
 * variable's TotalSize() application values; inputs of at least ten million
 * values pass m_Threads to GetMinMax. Other verbosity levels compute nothing.
 * @param variable variable whose values are analyzed
 * @return statistics for variable
 */
template <class T>
Stats<T> GetStats(const Variable<T> &variable) const noexcept
{
    // TODO: tune with actual measurements; above this size threads are used
    constexpr std::size_t threadingThreshold = 10000000; // ten million

    Stats<T> stats;
    const std::size_t valuesSize = variable.TotalSize();

    if (m_Verbosity == 0)
    {
        if (valuesSize >= threadingThreshold)
        {
            GetMinMax(variable.m_AppValues, valuesSize, stats.Min, stats.Max,
                      m_Threads);
        }
        else
        {
            GetMinMax(variable.m_AppValues, valuesSize, stats.Min, stats.Max);
        }
    }

    return stats;
}
/**
 * Computes statistics for a std::complex<T> variable; the result is typed on
 * the underlying real type T.
 * When m_Verbosity == 0, stats.Min and stats.Max are computed over the
 * variable's TotalSize() application values; inputs of at least ten million
 * values pass m_Threads to GetMinMax. Other verbosity levels compute nothing.
 * @param variable complex-valued variable whose values are analyzed
 * @return statistics for variable
 */
template <class T>
Stats<T> GetStats(const Variable<std::complex<T>> &variable) const noexcept
{
    // TODO: tune with actual measurements; above this size threads are used
    constexpr std::size_t threadingThreshold = 10000000; // ten million

    Stats<T> stats;
    const std::size_t valuesSize = variable.TotalSize();

    if (m_Verbosity == 0)
    {
        if (valuesSize >= threadingThreshold)
        {
            GetMinMax(variable.m_AppValues, valuesSize, stats.Min, stats.Max,
                      m_Threads);
        }
        else
        {
            GetMinMax(variable.m_AppValues, valuesSize, stats.Min, stats.Max);
        }
    }

    return stats;
}
template <class T>
void WriteBoundsRecord(const bool isScalar, const Stats<T> &stats,
std::vector<char> &buffer,
std::uint8_t &characteristicsCounter,
const bool addLength) const noexcept
if (isScalar == true)
{
WriteCharacteristicRecord(
characteristic_value, stats.Min, buffer, characteristicsCounter,
addLength); // stats.min = stats.max = value
return;
}
if (m_Verbosity == 0) // default verbose
{
WriteCharacteristicRecord(characteristic_min, stats.Min, buffer,
characteristicsCounter, addLength);
WriteCharacteristicRecord(characteristic_max, stats.Max, buffer,
characteristicsCounter, addLength);
}
/**
* Write a characteristic value record to buffer
* @param characteristicID characteristic identifier
* @param value characteristic value
* @param buffer destination buffer
* @param characteristicsCounter to be updated by 1
* @param addLength true for data, false for metadata
*/
template <class T>
void WriteCharacteristicRecord(const std::uint8_t characteristicID,
const T &value, std::vector<char> &buffer,
std::uint8_t &characteristicsCounter,
const bool addLength = false) const noexcept
if (addLength == true)
{
const std::uint16_t lengthOfCharacteristic = sizeof(T); // id
InsertToBuffer(buffer, &lengthOfCharacteristic);
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
/**
 * Looks up name in indices and returns its BP1Index, creating a new one when
 * none exists. Shared by variables and attributes.
 * @param name variable or attribute name to look up
 * @param indices lookup hash table of indices
 * @param isNew set to true if the index was newly created, false if it
 *        already existed in indices
 * @return reference to the BP1Index in indices
 */
BP1Index &GetBP1Index(const std::string name,
                      std::unordered_map<std::string, BP1Index> &indices,
                      bool &isNew) const noexcept;
/**
 * Flattens the data, filling in the process group length, the variables
 * count, the variables length and the attributes.
 * @param metadataSet current rank metadata set
 * @param buffer data buffer being flattened
 */
void FlattenData(BP1MetadataSet &metadataSet,
                 capsule::STLVector &buffer) const noexcept;
/**
 * Flattens the metadata indices into a single metadata buffer held in the
 * capsule.
 * @param metadataSet metadata set holding the indices
 * @param buffer capsule receiving the flattened metadata
 */
void FlattenMetadata(BP1MetadataSet &metadataSet,
                     capsule::STLVector &buffer) const noexcept;
} // end namespace format
} // end namespace adios