/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* BP1Writer.cpp
*
* Created on: Feb 1, 2017
* Author: wfg
*/
#include "BP1Writer.h"
#include "BP1Writer.tcc"
#include <string>
#include <vector>
namespace adios
{
namespace format
{
// Constructor: no state of its own is set up here, both arguments are simply
// forwarded to the BP1Base constructor.
BP1Writer::BP1Writer(MPI_Comm mpiComm, const bool debugMode)
: BP1Base(mpiComm, debugMode)
{
    // intentionally empty
}
// Returns a byte estimate for one process group index plus its list of
// methods (one method per transport).
//   name:               process group name
//   timeStepName:       time step rendered as a string
//   numberOfTransports: number of methods that will be listed
std::size_t BP1Writer::GetProcessGroupIndexSize(
    const std::string name, const std::string timeStepName,
    const std::size_t numberOfTransports) const noexcept
{
    // fixed-size fields of the pg index, everything except the two strings
    constexpr std::size_t fixedFieldsSize = 23;
    // methods count (1 byte) + methods length (2 bytes)
    constexpr std::size_t methodsHeaderSize = 3;
    // per method: method ID (1 byte) + method params length (2 bytes),
    // matching the 3 bytes per method used by WriteProcessGroupIndex
    constexpr std::size_t methodRecordSize = 3;

    // NOTE(review): the original expression was cut off after the pg index
    // term; the methods term is rebuilt from the layout written in
    // WriteProcessGroupIndex. Confirm against the upstream file.
    return (name.length() + timeStepName.length() + fixedFieldsSize) +
           (methodsHeaderSize + methodRecordSize * numberOfTransports);
}
void BP1Writer::WriteProcessGroupIndex(
const bool isFortran, const std::string name, const uint32_t processID,
const std::vector<std::shared_ptr<Transport>> &transports) noexcept
std::vector<char> &metadataBuffer = m_MetadataSet.PGIndex.Buffer;
std::vector<char> &dataBuffer = m_Heap.m_Data;
size_t &dataPosition = m_Heap.m_DataPosition;
m_MetadataSet.DataPGLengthPosition = dataPosition;
dataPosition += 8; // skip pg length (8)
const std::size_t metadataPGLengthPosition = metadataBuffer.size();
metadataBuffer.insert(metadataBuffer.end(), 2, 0); // skip pg length (2)
// write name to metadata
WriteNameRecord(name, metadataBuffer);
// write if host language Fortran in metadata and data
const char hostFortran = (isFortran) ? 'y' : 'n';
InsertToBuffer(metadataBuffer, &hostFortran);
CopyToBuffer(dataBuffer, dataPosition, &hostFortran);
WriteNameRecord(name, dataBuffer, dataPosition);
InsertToBuffer(metadataBuffer, &processID);
// skip coordination var in data ....what is coordination var?
const std::string timeStepName(std::to_string(m_MetadataSet.TimeStep));
WriteNameRecord(timeStepName, dataBuffer, dataPosition);
InsertToBuffer(metadataBuffer, &m_MetadataSet.TimeStep);
CopyToBuffer(dataBuffer, dataPosition, &m_MetadataSet.TimeStep);
// offset to pg in data in metadata which is the current absolute position
InsertToBuffer(metadataBuffer, reinterpret_cast<uint64_t *>(
&m_Heap.m_DataAbsolutePosition));
// Back to writing metadata pg index length (length of group)
const uint16_t metadataPGIndexLength =
metadataBuffer.size() - metadataPGLengthPosition - 2;
size_t backPosition = metadataPGLengthPosition;
CopyToBuffer(metadataBuffer, backPosition, &metadataPGIndexLength);
// DONE With metadataBuffer
// here write method in data
const std::vector<uint8_t> methodIDs = GetMethodIDs(transports);
const uint8_t methodsCount = methodIDs.size();
CopyToBuffer(dataBuffer, dataPosition, &methodsCount); // count
// methodID (1) + method params length(2), no parameters for now
const uint16_t methodsLength = methodIDs.size() * 3;
CopyToBuffer(dataBuffer, dataPosition, &methodsLength); // length
CopyToBuffer(dataBuffer, dataPosition, &methodID); // method ID,
dataPosition += 2; // skip method params length = 0 (2 bytes) for now
m_Heap.m_DataAbsolutePosition +=
dataPosition - m_MetadataSet.DataPGLengthPosition;
m_MetadataSet.DataPGVarsCount = 0;
m_MetadataSet.DataPGVarsCountPosition = dataPosition;
dataPosition += 12;
m_Heap.m_DataAbsolutePosition += 12; // add vars count and length
++m_MetadataSet.DataPGCount;
m_MetadataSet.DataPGIsOpen = true;
// Advancing only delegates to FlattenData().
void BP1Writer::Advance()
{
    FlattenData();
}
// Finishes the buffer (on the first call only) and writes it to a transport.
//   transport:     receives the whole data buffer in the N-to-N case
//   isFirstClose:  in/out flag; when true the buffer is flattened here and
//                  the flag is reset to false
//   doAggregation: selects the (not yet implemented) aggregation path
//
// NOTE(review): the original block had unbalanced braces. The isFirstClose
// guard and the FlattenMetadata() call below are reconstructed, please
// confirm against the upstream file.
void BP1Writer::Close(Transport &transport, bool &isFirstClose,
                      const bool doAggregation) noexcept
{
    if (m_MetadataSet.Log.IsActive == true)
    {
        m_MetadataSet.Log.Timers[0].SetInitialTime();
    }

    // guard inferred from the flag being reset right before a closing brace
    if (isFirstClose == true)
    {
        if (m_MetadataSet.DataPGIsOpen == true)
        {
            FlattenData();
        }

        // NOTE(review): restored call. FlattenMetadata() is private and no
        // other visible function calls it, and the buffer written below
        // needs the metadata footer appended first.
        FlattenMetadata();

        // NOTE(review): this is the second SetInitialTime() on the same
        // timer; confirm whether a stop/elapsed call was intended here.
        if (m_MetadataSet.Log.IsActive == true)
        {
            m_MetadataSet.Log.Timers[0].SetInitialTime();
        }

        // N-to-M where 1 <= M <= N-1, might need a new
        // Log metadataSet.Log.m_Timers just for aggregation
        if (doAggregation == true)
        {
            // here call aggregator
        }

        isFirstClose = false;
    }

    if (doAggregation == true) // N-to-M where 1 <= M <= N-1
    {
        // here call aggregator to select transports for Write and Close
    }
    else // N-to-N
    {
        // single write
        // NOTE(review): only the Write call is visible in this block;
        // confirm whether the transport is closed here or by the caller.
        transport.Write(m_Heap.m_Data.data(), m_Heap.m_Data.size());
    }
}
void BP1Writer::WriteProfilingLogFile(
const std::string &name, const unsigned int rank,
const std::vector<std::shared_ptr<Transport>> &transports) noexcept
const std::string fileName(GetDirectoryName(name) + "/profiling.log");
const std::string rankLog(GetRankProfilingLog(rank, transports));
m_BP1Aggregator.WriteProfilingLog(fileName, rankLog);
}
// PRIVATE FUNCTIONS
// Appends one dimensions record per dimension at the end of buffer. Each
// record has three 8-byte fields: local, global and offset. When offsets is
// empty only the local dimension is written and the other two fields are
// zero-filled.
//   localDimensions:  drives the number of records
//   globalDimensions: read only when offsets is not empty
//   offsets:          empty selects the local-only layout
//   buffer:           grows by 24 bytes per dimension
// NOTE(review): globalDimensions and offsets are indexed with the size of
// localDimensions and are not bounds-checked here.
void BP1Writer::WriteDimensionsRecord(
    const std::vector<size_t> &localDimensions,
    const std::vector<size_t> &globalDimensions,
    const std::vector<size_t> &offsets, std::vector<char> &buffer) noexcept
{
    if (offsets.empty() == true)
    {
        for (const auto &localDimension : localDimensions)
        {
            // widen through a copy instead of reinterpreting the size_t
            // storage, so 8 correct bytes are written for any size_t width
            const uint64_t local = static_cast<uint64_t>(localDimension);
            InsertToBuffer(buffer, &local);
            // global dimension and offset are left as zeros
            buffer.insert(buffer.end(), 2 * sizeof(uint64_t), 0);
        }
    }
    else
    {
        for (unsigned int d = 0; d < localDimensions.size(); ++d)
        {
            const uint64_t local = static_cast<uint64_t>(localDimensions[d]);
            const uint64_t global = static_cast<uint64_t>(globalDimensions[d]);
            const uint64_t offset = static_cast<uint64_t>(offsets[d]);

            InsertToBuffer(buffer, &local);
            InsertToBuffer(buffer, &global);
            InsertToBuffer(buffer, &offset);
        }
    }
}
void BP1Writer::WriteDimensionsRecord(
const std::vector<size_t> &localDimensions,
const std::vector<size_t> &globalDimensions,
const std::vector<size_t> &offsets, const unsigned int skip,
std::vector<char> &buffer, size_t &position) noexcept
auto lf_WriteFlaggedDim = [](std::vector<char> &buffer, size_t &position,
const size_t dimension) {
constexpr char no = 'n';
CopyToBuffer(buffer, position, &no);
CopyToBuffer(buffer, position,
reinterpret_cast<const uint64_t *>(&dimension));
for (const auto &localDimension : localDimensions)
lf_WriteFlaggedDim(buffer, position, localDimension);
position += skip;
for (unsigned int d = 0; d < localDimensions.size(); ++d)
lf_WriteFlaggedDim(buffer, position, localDimensions[d]);
lf_WriteFlaggedDim(buffer, position, globalDimensions[d]);
lf_WriteFlaggedDim(buffer, position, offsets[d]);
}
}
void BP1Writer::WriteNameRecord(const std::string name,
InsertToBuffer(buffer, &length);
InsertToBuffer(buffer, name.c_str(), length);
// Writes a name record at position: first the length as a 2-byte value, then
// the characters of name. position is passed on to CopyToBuffer for both
// writes.
void BP1Writer::WriteNameRecord(const std::string name,
                                std::vector<char> &buffer,
                                size_t &position) noexcept
{
    // the string length is narrowed to the 16 bits of the length field
    const uint16_t nameLength = name.length();

    CopyToBuffer(buffer, position, &nameLength);             // length field
    CopyToBuffer(buffer, position, name.c_str(), nameLength); // characters
}
// Returns the index stored under name, creating it when it does not exist.
//   name:    key to look up
//   indices: map searched and, on a miss, extended with a BP1Index built
//            from the current number of entries
//   isNew:   set to true when the entry was created by this call, false when
//            it already existed
BP1Index &
BP1Writer::GetBP1Index(const std::string name,
                       std::unordered_map<std::string, BP1Index> &indices,
                       bool &isNew) const noexcept
{
    auto itName = indices.find(name);
    isNew = (itName == indices.end());

    if (isNew)
    {
        // reuse the iterator returned by emplace instead of a second lookup
        // through at(), which could throw inside this noexcept function
        itName = indices.emplace(name, BP1Index(indices.size())).first;
    }

    return itName->second;
}
auto &buffer = m_Heap.m_Data;
auto &position = m_Heap.m_DataPosition;
CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition,
&m_MetadataSet.DataPGVarsCount);
// without record itself and vars count
const uint64_t varsLength =
position - m_MetadataSet.DataPGVarsCountPosition - 8 - 4;
CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition, &varsLength);
// attributes (empty for now) count (4) and length (8) are zero by moving
// positions in time step zero
position += 12;
m_Heap.m_DataAbsolutePosition += 12;
// without record itself, 12 due to empty attributes
const uint64_t dataPGLength =
position - m_MetadataSet.DataPGLengthPosition - 8;
CopyToBuffer(buffer, m_MetadataSet.DataPGLengthPosition, &dataPGLength);
++m_MetadataSet.TimeStep;
m_MetadataSet.DataPGIsOpen = false;
void BP1Writer::FlattenMetadata() noexcept
[](std::unordered_map<std::string, BP1Index> &indices, uint32_t &count,
uint64_t &length) {
count = indices.size();
length = 0;
for (auto &indexPair : indices) // set each index length
{
auto &indexBuffer = indexPair.second.Buffer;
const uint32_t indexLength = indexBuffer.size() - 4;
size_t indexLengthPosition = 0;
CopyToBuffer(indexBuffer, indexLengthPosition, &indexLength);
length += indexBuffer.size(); // overall length
}
};
auto lf_FlattenIndices =
[](const uint32_t count, const uint64_t length,
const std::unordered_map<std::string, BP1Index> &indices,
std::vector<char> &buffer, size_t &position) {
CopyToBuffer(buffer, position, &count);
CopyToBuffer(buffer, position, &length);
for (const auto &indexPair : indices) // set each index length
{
const auto &indexBuffer = indexPair.second.Buffer;
CopyToBuffer(buffer, position, indexBuffer.data(),
indexBuffer.size());
}
};
// Finish writing metadata counts and lengths
// PG Index
const uint64_t pgCount = m_MetadataSet.DataPGCount;
const uint64_t pgLength = m_MetadataSet.PGIndex.Buffer.size();
// var index count and length (total), and each index length
uint32_t varsCount;
uint64_t varsLength;
lf_IndexCountLength(m_MetadataSet.VarsIndices, varsCount, varsLength);
// attribute index count and length, and each index length
uint32_t attributesCount;
uint64_t attributesLength;
lf_IndexCountLength(m_MetadataSet.AttributesIndices, attributesCount,
const size_t footerSize = (pgLength + 16) + (varsLength + 12) +
(attributesLength + 12) +
m_MetadataSet.MiniFooterSize;
auto &buffer = m_Heap.m_Data;
auto &position = m_Heap.m_DataPosition;
// reserve data to fit metadata,
buffer.resize(position + footerSize);
// must replace with growth buffer strategy?
CopyToBuffer(buffer, position, &pgCount);
CopyToBuffer(buffer, position, &pgLength);
CopyToBuffer(buffer, position, m_MetadataSet.PGIndex.Buffer.data(),
pgLength);
lf_FlattenIndices(varsCount, varsLength, m_MetadataSet.VarsIndices, buffer,
position);
// Attribute indices
lf_FlattenIndices(attributesCount, attributesLength,
m_MetadataSet.AttributesIndices, buffer, position);
// getting absolute offsets, minifooter is 28 bytes for now
const uint64_t offsetPGIndex = m_Heap.m_DataAbsolutePosition;
const uint64_t offsetVarsIndex = offsetPGIndex + (pgLength + 16);
const uint64_t offsetAttributeIndex = offsetVarsIndex + (varsLength + 12);
CopyToBuffer(buffer, position, &offsetPGIndex);
CopyToBuffer(buffer, position, &offsetVarsIndex);
CopyToBuffer(buffer, position, &offsetAttributeIndex);
const uint8_t endian = 0;
CopyToBuffer(buffer, position, &endian);
position += 2;
CopyToBuffer(buffer, position, &m_Version);
m_Heap.m_DataAbsolutePosition += footerSize;
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
if (m_MetadataSet.Log.IsActive == true)
{
m_MetadataSet.Log.TotalBytes.push_back(m_Heap.m_DataAbsolutePosition);
}
}
// Builds the profiling dictionary entry of one rank as text:
//   'rank_<rank>': { 'bytes': <n>, <timer>, 'transport_<t>': { 'lib':
//   '<type>', <timer>, <timer>, <timer> },... },
//   rank:       rank used in the entry key
//   transports: one nested 'transport_<t>' entry is emitted per element
// NOTE(review): reads TotalBytes[0], Timers[0] and the first three timers
// of every transport profiler without checking that they exist.
std::string BP1Writer::GetRankProfilingLog(
    const unsigned int rank,
    const std::vector<std::shared_ptr<Transport>> &transports) noexcept
{
    // appends "'<process>_<units>': <process time>"
    auto lf_WriterTimer = [](std::string &rankLog,
                             const profiling::Timer &timer) {
        rankLog += "'" + timer.m_Process + "_" + timer.GetUnits() + "': " +
                   std::to_string(timer.m_ProcessTime);
    };

    // prepare string dictionary per rank
    std::string rankLog("'rank_" + std::to_string(rank) + "': { ");

    auto &profiler = m_MetadataSet.Log;
    rankLog += "'bytes': " + std::to_string(profiler.TotalBytes[0]) + ", ";
    lf_WriterTimer(rankLog, profiler.Timers[0]);
    rankLog += ", ";

    for (unsigned int t = 0; t < transports.size(); ++t)
    {
        rankLog += "'transport_" + std::to_string(t) + "': { ";
        rankLog += "'lib': '" + transports[t]->m_Type + "', ";

        for (unsigned int i = 0; i < 3; ++i)
        {
            lf_WriterTimer(rankLog, transports[t]->m_Profiler.Timers[i]);
            // comma between timers, plain space after the last one
            rankLog += (i < 2) ? ", " : " ";
        }

        // no trailing comma after the last transport
        rankLog += (t == transports.size() - 1) ? "}" : "},";
    }

    rankLog += " },";
    return rankLog;
}
//------------------------------------------------------------------------------
// Explicit instantiation of only public templates
#define declare_template_instantiation(T) \
template BP1Writer::ResizeResult BP1Writer::ResizeBuffer( \
const Variable<T> &variable); \
\
template void BP1Writer::WriteVariablePayload( \
const Variable<T> &variable) noexcept; \
Atkins, Charles Vernon
committed
\
template void BP1Writer::WriteVariableMetadata( \
Atkins, Charles Vernon
committed
ADIOS_FOREACH_TYPE_1ARG(declare_template_instantiation)
#undef declare_template_instantiation
//------------------------------------------------------------------------------
} // end namespace format
} // end namespace adios