/*
 * Distributed under the OSI-approved Apache License, Version 2.0.  See
 * accompanying file Copyright.txt for details.
 *
 * Author: William F Godoy godoywf@ornl.gov
 */
#include "BP3Serializer.h"
#include "BP3Serializer.tcc"
#include <chrono>
#include <future>
#include <string>
#include <vector>
#include "adios2/helper/adiosFunctions.h" //GetType<T>, ReadValue<T>,
// ReduceValue<T>
// Out-of-class definition of the BP3Serializer::m_Mutex static member. In this
// file it is locked through std::lock_guard inside the index deserialization
// and index merge lambdas.
std::mutex BP3Serializer::m_Mutex;
/**
 * Constructs the serializer by forwarding both arguments to the BP3Base
 * constructor; nothing else is initialized here.
 * @param mpiComm MPI communicator handed to BP3Base
 * @param debugMode debug flag handed to BP3Base
 */
BP3Serializer::BP3Serializer(MPI_Comm mpiComm, const bool debugMode)
: BP3Base(mpiComm, debugMode)
{
}
// Writes the header of a new process group (PG): one entry appended to the
// metadata PG index buffer and the matching PG header in the data buffer.
// Afterwards it advances m_Data.m_AbsolutePosition, resets the PG variable
// counter, increments DataPGCount and marks the PG as open. The whole call is
// timed under the "buffering" profiler entry.
//   hostLanguage    - only compared with "Fortran" to emit a 'y'/'n' flag
//   transportsTypes - handed to GetTransportIDs to obtain the method IDs
// NOTE(review): this copy has no braces around the function body, and
// `methodID` (used near the end) has no visible declaration, so a loop over
// methodIDs was presumably lost - confirm against the upstream file.
void BP3Serializer::PutProcessGroupIndex(
const std::string hostLanguage,
const std::vector<std::string> &transportsTypes) noexcept
ProfilerStart("buffering");
std::vector<char> &metadataBuffer = m_MetadataSet.PGIndex.Buffer;
std::vector<char> &dataBuffer = m_Data.m_Buffer;
size_t &dataPosition = m_Data.m_Position;
// remember where the PG length has to be written once the PG is finished
m_MetadataSet.DataPGLengthPosition = dataPosition;
dataPosition += 8; // skip pg length (8)
const std::size_t metadataPGLengthPosition = metadataBuffer.size();
metadataBuffer.insert(metadataBuffer.end(), 2, '\0'); // skip pg length (2)
// the PG name is the decimal MPI rank, written to metadata and (below) data
const std::string name(std::to_string(m_RankMPI));
PutNameRecord(name, metadataBuffer);
// write if host language Fortran in metadata and data
const char hostFortran = (hostLanguage == "Fortran") ? 'y' : 'n';
InsertToBuffer(metadataBuffer, &hostFortran);
CopyToBuffer(dataBuffer, dataPosition, &hostFortran);
PutNameRecord(name, dataBuffer, dataPosition);
// the MPI rank is also stored as the process ID in the metadata entry
const uint32_t processID = static_cast<const uint32_t>(m_RankMPI);
InsertToBuffer(metadataBuffer, &processID);
// skip coordination var in data ....what is coordination var?
// NOTE(review): no statement advancing dataPosition follows the comment
// above in this copy - verify whether a skip was lost.
const std::string timeStepName(std::to_string(m_MetadataSet.TimeStep));
PutNameRecord(timeStepName, metadataBuffer);
PutNameRecord(timeStepName, dataBuffer, dataPosition);
InsertToBuffer(metadataBuffer, &m_MetadataSet.TimeStep);
CopyToBuffer(dataBuffer, dataPosition, &m_MetadataSet.TimeStep);
// offset to pg in data in metadata which is the current absolute position
InsertU64(metadataBuffer, m_Data.m_AbsolutePosition);
// Back to writing metadata pg index length (length of group)
const uint16_t metadataPGIndexLength = static_cast<const uint16_t>(
metadataBuffer.size() - metadataPGLengthPosition - 2);
// scratch position so the back-patch does not disturb any live position
// (presumably CopyToBuffer advances the position it is given - TODO confirm)
size_t backPosition = metadataPGLengthPosition;
CopyToBuffer(metadataBuffer, backPosition, &metadataPGIndexLength);
// DONE With metadataBuffer
// here write method in data
const std::vector<uint8_t> methodIDs = GetTransportIDs(transportsTypes);
const uint8_t methodsCount = static_cast<const uint8_t>(methodIDs.size());
CopyToBuffer(dataBuffer, dataPosition, &methodsCount); // count
// methodID (1) + method params length(2), no parameters for now
const uint16_t methodsLength =
static_cast<const uint16_t>(methodsCount * 3);
CopyToBuffer(dataBuffer, dataPosition, &methodsLength); // length
CopyToBuffer(dataBuffer, dataPosition, &methodID); // method ID,
dataPosition += 2; // skip method params length = 0 (2 bytes) for now
// account for everything written since the PG length slot was reserved
m_Data.m_AbsolutePosition +=
dataPosition - m_MetadataSet.DataPGLengthPosition;
m_MetadataSet.DataPGVarsCount = 0;
m_MetadataSet.DataPGVarsCountPosition = dataPosition;
m_Data.m_AbsolutePosition += 12; // add vars count and length
++m_MetadataSet.DataPGCount;
m_MetadataSet.DataPGIsOpen = true;
ProfilerStop("buffering");
// Finishes the currently buffered data by calling SerializeDataBuffer(io),
// timed under the "buffering" profiler entry.
//   io          - forwarded to SerializeDataBuffer
//   advanceStep - guards the statement(s) following `if (advanceStep)`
// NOTE(review): the body braces are missing in this copy, so it cannot be
// told from here whether ++m_MetadataSet.TimeStep is meant to depend on
// advanceStep - confirm against the upstream file.
void BP3Serializer::SerializeData(IO &io, const bool advanceStep)
ProfilerStart("buffering");
SerializeDataBuffer(io);
if (advanceStep)
// while the max buffer size still equals DefaultMaxBufferSize, replace it
// with the current data position plus 64 bytes
if (m_MaxBufferSize == DefaultMaxBufferSize)
{
m_MaxBufferSize = m_Data.m_Position + 64;
}
++m_MetadataSet.TimeStep;
ProfilerStop("buffering");
// Serializes the metadata indices into the data buffer through
// SerializeMetadataInData and adds the absolute data position to the
// "buffering" byte counter, all timed under the "buffering" profiler entry.
//   io - not referenced by any statement visible here
// NOTE(review): the body braces are missing in this copy and `io` is unused,
// so statements may have been lost. Also presumably Bytes.at("buffering")
// fails when that key is absent - TODO confirm it always exists at this point.
void BP3Serializer::CloseData(IO &io)
ProfilerStart("buffering");
SerializeMetadataInData();
m_Profiler.Bytes.at("buffering") += m_Data.m_AbsolutePosition;
ProfilerStop("buffering");
// Builds this rank's profiling entry as JSON-style text: rank, start time
// stamp, thread count, buffered bytes, the "buffering" timer, and one nested
// "transport_<t>" object per transport holding its type and its timers.
//   transportsTypes     - type name printed for each transport
//   transportsProfilers - per-transport profilers whose Timers are dumped;
//                         indexed with the same t as transportsTypes
// NOTE(review): `profiler` is used below without a visible declaration, no
// return statement is visible, and the braces are unbalanced in this copy -
// lines appear to be missing; confirm against the upstream file.
std::string BP3Serializer::GetRankProfilingJSON(
const std::vector<std::string> &transportsTypes,
const std::vector<profiling::IOChrono *> &transportsProfilers) noexcept
{
// appends `"<process>_<units>": <time>, ` for one timer to rankLog
auto lf_WriterTimer = [](std::string &rankLog,
const profiling::Timer &timer) {
rankLog += "\"" + timer.m_Process + "_" + timer.GetShortUnits() +
"\": " + std::to_string(timer.m_ProcessTime) + ", ";
};
// prepare string dictionary per rank
std::string rankLog("{ \"rank\": " + std::to_string(m_RankMPI) + ", ");
std::string timeDate(profiler.Timers.at("buffering").m_LocalTimeDate);
// drop the last character of the time stamp
// (presumably a trailing newline - TODO confirm)
timeDate.pop_back();
// avoid whitespace
std::replace(timeDate.begin(), timeDate.end(), ' ', '_');
rankLog += "\"start\": \"" + timeDate + "\", ";
rankLog += "\"threads\": " + std::to_string(m_Threads) + ", ";
rankLog +=
"\"bytes\": " + std::to_string(profiler.Bytes.at("buffering")) + ", ";
lf_WriterTimer(rankLog, profiler.Timers.at("buffering"));
const size_t transportsSize = transportsTypes.size();
for (unsigned int t = 0; t < transportsSize; ++t)
{
rankLog += "\"transport_" + std::to_string(t) + "\": { ";
rankLog += "\"type\": \"" + transportsTypes[t] + "\", ";
for (const auto &transportTimerPair : transportsProfilers[t]->Timers)
lf_WriterTimer(rankLog, transportTimerPair.second);
// replace last comma with space
rankLog.pop_back();
rankLog.pop_back();
rankLog += " ";
// the last transport object is closed without a trailing comma
if (t == transportsSize - 1) // last element
rankLog += "}";
}
else
{
rankLog += "},";
rankLog += " }"; // end rank entry
std::vector<char>
BP3Serializer::AggregateProfilingJSON(const std::string &rankProfilingLog)
return SetCollectiveProfilingJSON(rankProfilingLog);
void BP3Serializer::AggregateCollectiveMetadata()
const uint64_t pgIndexStart = m_Metadata.m_Position;
AggregateIndex(m_MetadataSet.PGIndex, m_MetadataSet.DataPGCount);
const uint64_t variablesIndexStart = m_Metadata.m_Position;
AggregateMergeIndex(m_MetadataSet.VarsIndices);
const uint64_t attributesIndexStart = m_Metadata.m_Position;
AggregateMergeIndex(m_MetadataSet.AttributesIndices);
if (m_RankMPI == 0)
{
m_Metadata.Resize(m_Metadata.m_Position + m_MetadataSet.MiniFooterSize,
" when writing collective bp1 Minifooter");
PutMinifooter(pgIndexStart, variablesIndexStart, attributesIndexStart,
m_Metadata.m_Buffer, m_Metadata.m_Position, true);
m_Metadata.m_AbsolutePosition = m_Metadata.m_Position;
}
/**
 * Writes the attributes section into the data buffer: a 32-bit attribute
 * count, an 8-byte length slot that is back-patched at the end, and then one
 * record per attribute. Each attribute with a recognized type is written to
 * the data buffer (PutAttributeInData) and to the index
 * (PutAttributeInIndex).
 * @param io source of the attribute map and of the typed attribute objects
 */
void BP3Serializer::PutAttributes(IO &io)
{
    const auto attributesDataMap = io.GetAttributesDataMap();

    auto &buffer = m_Data.m_Buffer;
    auto &position = m_Data.m_Position;
    auto &absolutePosition = m_Data.m_AbsolutePosition;

    // start of the section, needed to advance the absolute position below
    const size_t sectionStart = position;

    // the count is known up front, so it is written immediately
    const uint32_t attributesCount =
        static_cast<uint32_t>(attributesDataMap.size());
    CopyToBuffer(buffer, position, &attributesCount);

    // the length is only known at the end: reserve its 8 bytes for now
    const size_t lengthSlot = position;
    position += 8;

    absolutePosition += position - sectionStart;

    uint32_t memberID = 0;

    for (const auto &entry : attributesDataMap)
    {
        const std::string name(entry.first);
        const std::string type(entry.second.first);

        // attributes of unknown type are skipped but still consume a memberID
        if (type == "unknown")
        {
        }
#define declare_type(T)                                                        \
    else if (type == GetType<T>())                                             \
    {                                                                          \
        Stats<T> stats;                                                        \
        stats.Offset = absolutePosition;                                       \
        stats.MemberID = memberID;                                             \
        Attribute<T> &attribute = *io.InquireAttribute<T>(name);               \
        PutAttributeInData(attribute, stats);                                  \
        PutAttributeInIndex(attribute, stats);                                 \
    }
        ADIOS2_FOREACH_ATTRIBUTE_TYPE_1ARG(declare_type)
#undef declare_type

        ++memberID;
    }

    // back-patch the length slot through a scratch position so the live
    // write position is left untouched
    const uint64_t attributesLength =
        static_cast<uint64_t>(position - lengthSlot);
    size_t patchPosition = lengthSlot;
    CopyToBuffer(buffer, patchPosition, &attributesLength);
}
// Appends dimension records at the end of `buffer` through InsertU64.
// Two layouts are visible below: each local dimension followed by
// 2 * sizeof(uint64_t) zero bytes, or a (local, global, offset) triplet per
// dimension index.
// NOTE(review): the `if` condition that selects between the two layouts, the
// body braces and the loop braces are missing in this copy - confirm against
// the upstream file.
void BP3Serializer::PutDimensionsRecord(const Dims &localDimensions,
const Dims &globalDimensions,
const Dims &offsets,
std::vector<char> &buffer) noexcept
for (const auto localDimension : localDimensions)
InsertU64(buffer, localDimension);
// zero-fill the two 64-bit slots that follow the local dimension
buffer.insert(buffer.end(), 2 * sizeof(uint64_t), '\0');
}
else
{
// all three vectors are indexed with d up to localDimensions.size()
for (unsigned int d = 0; d < localDimensions.size(); ++d)
InsertU64(buffer, localDimensions[d]);
InsertU64(buffer, globalDimensions[d]);
InsertU64(buffer, offsets[d]);
// Position-based overload: copies dimension values into `buffer` at
// `position` through CopyToBuffer instead of appending at the end.
//   isCharacteristic - when false every dimension value is preceded by the
//                      character 'n'; it also selects how many bytes are
//                      skipped after a local dimension (16 instead of 18)
// NOTE(review): the terminator of the lf_CopyDimension lambda, the `if`
// condition choosing between the two loops, and several braces are missing
// in this copy - confirm against the upstream file.
void BP3Serializer::PutDimensionsRecord(const Dims &localDimensions,
const Dims &globalDimensions,
const Dims &offsets,
std::vector<char> &buffer,
size_t &position,
const bool isCharacteristic) noexcept
// writes one dimension as 64 bits, prefixed by 'n' unless isCharacteristic
auto lf_CopyDimension = [](std::vector<char> &buffer, size_t &position,
const size_t dimension,
const bool isCharacteristic) {
if (!isCharacteristic)
{
constexpr char no = 'n';
CopyToBuffer(buffer, position, &no);
}
const uint64_t dimension64 = static_cast<const uint64_t>(dimension);
CopyToBuffer(buffer, position, &dimension64);
// bytes skipped after a local dimension: 18 = 2 * (1 + 8) with the 'n'
// prefix, 16 = 2 * 8 without it (arithmetic only; semantics to be confirmed)
unsigned int globalBoundsSkip = 18;
if (isCharacteristic)
{
globalBoundsSkip = 16;
}
for (const auto &localDimension : localDimensions)
lf_CopyDimension(buffer, position, localDimension,
isCharacteristic);
position += globalBoundsSkip;
for (unsigned int d = 0; d < localDimensions.size(); ++d)
lf_CopyDimension(buffer, position, localDimensions[d],
isCharacteristic);
lf_CopyDimension(buffer, position, globalDimensions[d],
isCharacteristic);
lf_CopyDimension(buffer, position, offsets[d], isCharacteristic);
void BP3Serializer::PutNameRecord(const std::string name,
std::vector<char> &buffer) noexcept
const uint16_t length = static_cast<const uint16_t>(name.length());
InsertToBuffer(buffer, &length);
InsertToBuffer(buffer, name.c_str(), length);
void BP3Serializer::PutNameRecord(const std::string name,
std::vector<char> &buffer,
size_t &position) noexcept
const uint16_t length = static_cast<const uint16_t>(name.length());
CopyToBuffer(buffer, position, &length);
CopyToBuffer(buffer, position, name.c_str(), length);
}
/**
 * Returns the index entry stored under name, creating it when absent.
 * A new entry is constructed from the current number of entries in indices.
 * @param name key of the requested entry
 * @param indices map that is searched and, if needed, extended
 * @param isNew set to true when the entry was created by this call,
 * false when it already existed
 * @return reference to the entry held inside indices
 */
BP3Serializer::SerialElementIndex &BP3Serializer::GetSerialElementIndex(
    const std::string &name,
    std::unordered_map<std::string, SerialElementIndex> &indices,
    bool &isNew) const noexcept
{
    auto itName = indices.find(name);
    isNew = (itName == indices.end());

    if (isNew)
    {
        // reuse the iterator returned by emplace instead of a second lookup
        itName =
            indices.emplace(name, SerialElementIndex(indices.size())).first;
    }

    return itName->second;
}
// Completes the open process group in the data buffer: writes the variable
// count and the variables length at DataPGVarsCountPosition, handles the
// "attributes written" flag, writes the process group length at
// DataPGLengthPosition and marks the group as closed.
//   io - not referenced by any statement visible here
// NOTE(review): the body braces and the closing brace of the `else` branch
// are missing in this copy, and the first branch only sets the flag without
// any visible attribute write - lines appear to be lost; confirm upstream.
void BP3Serializer::SerializeDataBuffer(IO &io) noexcept
auto &buffer = m_Data.m_Buffer;
auto &position = m_Data.m_Position;
auto &absolutePosition = m_Data.m_AbsolutePosition;
// the member position itself is passed, so presumably it is advanced by the
// write and the length below lands right after the count - TODO confirm
CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition,
&m_MetadataSet.DataPGVarsCount);
// without record itself and vars count
const uint64_t varsLength =
position - m_MetadataSet.DataPGVarsCountPosition - 8 - 4;
CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition, &varsLength);
// attributes are only written once
if (!m_MetadataSet.AreAttributesWritten)
{
m_MetadataSet.AreAttributesWritten = true;
}
else
{
// 12 bytes are skipped in both the local and the absolute position
// (presumably an empty attributes count + length - TODO confirm)
position += 12;
absolutePosition += 12;
// Finish writing pg group length without record itself
const uint64_t dataPGLength =
position - m_MetadataSet.DataPGLengthPosition - 8;
CopyToBuffer(buffer, m_MetadataSet.DataPGLengthPosition, &dataPGLength);
m_MetadataSet.DataPGIsOpen = false;
// Appends the metadata footer to the data buffer: the process group index,
// the variables index, the attributes index and finally the minifooter that
// records where each of the three indices starts. The data buffer is resized
// once for the whole footer and the absolute position is advanced by the
// footer size.
// NOTE(review): the braces around the function body (and around the final
// `if`) are missing in this copy - confirm against the upstream file.
void BP3Serializer::SerializeMetadataInData() noexcept
// Writes each index's own length (buffer size minus 4) at offset 0 of its
// buffer and reports the number of indices and their summed buffer sizes.
auto lf_SetIndexCountLength =
[](std::unordered_map<std::string, SerialElementIndex> &indices,
uint32_t &count, uint64_t &length) {
count = indices.size();
length = 0;
for (auto &indexPair : indices) // set each index length
{
auto &indexBuffer = indexPair.second.Buffer;
// NOTE(review): wraps around if an index buffer holds fewer than 4 bytes
const uint32_t indexLength = indexBuffer.size() - 4;
size_t indexLengthPosition = 0;
CopyToBuffer(indexBuffer, indexLengthPosition, &indexLength);
length += indexBuffer.size(); // overall length
}
};
// Writes count, total length and then every index buffer back to back.
auto lf_FlattenIndices =
[](const uint32_t count, const uint64_t length,
const std::unordered_map<std::string, SerialElementIndex> &indices,
std::vector<char> &buffer, size_t &position) {
CopyToBuffer(buffer, position, &count);
CopyToBuffer(buffer, position, &length);
for (const auto &indexPair : indices) // set each index length
{
const auto &indexBuffer = indexPair.second.Buffer;
CopyToBuffer(buffer, position, indexBuffer.data(),
indexBuffer.size());
}
};
// Finish writing metadata counts and lengths
// PG Index
const uint64_t pgCount = m_MetadataSet.DataPGCount;
const uint64_t pgLength = m_MetadataSet.PGIndex.Buffer.size();
// var index count and length (total), and each index length
uint32_t varsCount;
uint64_t varsLength;
lf_SetIndexCountLength(m_MetadataSet.VarsIndices, varsCount, varsLength);
// attribute index count and length, and each index length
uint32_t attributesCount;
uint64_t attributesLength;
lf_SetIndexCountLength(m_MetadataSet.AttributesIndices, attributesCount,
attributesLength);
// 16 = pg count (8) + pg length (8); 12 = index count (4) + index length (8)
const size_t footerSize = static_cast<const size_t>(
(pgLength + 16) + (varsLength + 12) + (attributesLength + 12) +
m_MetadataSet.MiniFooterSize);
auto &buffer = m_Data.m_Buffer;
auto &position = m_Data.m_Position;
auto &absolutePosition = m_Data.m_AbsolutePosition;
// must replace with growth buffer strategy?
m_Data.Resize(position + footerSize,
" when writing metadata in bp data buffer");
CopyToBuffer(buffer, position, &pgCount);
CopyToBuffer(buffer, position, &pgLength);
CopyToBuffer(buffer, position, m_MetadataSet.PGIndex.Buffer.data(),
static_cast<const size_t>(pgLength));
lf_FlattenIndices(varsCount, varsLength, m_MetadataSet.VarsIndices, buffer,
position);
// Attribute indices
lf_FlattenIndices(attributesCount, attributesLength,
m_MetadataSet.AttributesIndices, buffer, position);
// getting absolute offset start, minifooter is 28 bytes for now
const uint64_t pgIndexStart = static_cast<const uint64_t>(absolutePosition);
const uint64_t variablesIndexStart =
static_cast<const uint64_t>(pgIndexStart + (pgLength + 16));
const uint64_t attributesIndexStart =
static_cast<const uint64_t>(variablesIndexStart + (varsLength + 12));
PutMinifooter(pgIndexStart, variablesIndexStart, attributesIndexStart,
buffer, position);
absolutePosition += footerSize;
// record the final absolute position as the "buffering" byte count
if (m_Profiler.IsActive)
m_Profiler.Bytes.emplace("buffering", absolutePosition);
// Writes the minifooter at `position`: the three index start offsets
// (process groups, variables, attributes) followed by the version bytes,
// where an endianness byte, skipped byte(s) and m_Version are placed.
//   addSubfiles - selects between skipping 1 byte or 2 bytes before m_Version
// NOTE(review): the if/else structure is damaged in this copy (the
// `!IsLittleEndian()` branch has no visible assignment to `endianness`, and
// braces are unbalanced) - confirm against the upstream file.
void BP3Serializer::PutMinifooter(const uint64_t pgIndexStart,
const uint64_t variablesIndexStart,
const uint64_t attributesIndexStart,
std::vector<char> &buffer, size_t &position,
const bool addSubfiles)
{
CopyToBuffer(buffer, position, &pgIndexStart);
CopyToBuffer(buffer, position, &variablesIndexStart);
CopyToBuffer(buffer, position, &attributesIndexStart);
// version
uint8_t endianness = 0; // little-endian
if (!IsLittleEndian())
CopyToBuffer(buffer, position, &endianness);
if (addSubfiles)
position += 1;
CopyToBuffer(buffer, position, &m_Version);
}
else
{
position += 2;
CopyToBuffer(buffer, position, &m_Version);
/**
 * Appends an aggregated index to the metadata buffer as
 * [count: 8 bytes][length: 8 bytes][gathered index contents].
 * The count is the reduction of every rank's count; the contents are the
 * rank buffers collected with GathervVectors.
 * @param index local index whose Buffer is contributed to the gather
 * @param count local number of entries, reduced over m_MPIComm
 */
void BP3Serializer::AggregateIndex(const SerialElementIndex &index,
                                   const size_t count)
{
    auto &metadataBuffer = m_Metadata.m_Buffer;
    auto &metadataPosition = m_Metadata.m_Position;

    // cursor over the 16-byte header; each header write goes through it
    size_t headerPosition = metadataPosition;

    const size_t totalCount = ReduceValues<size_t>(count, m_MPIComm);

    // reserve the header, then fill in the count
    metadataPosition += 16;
    m_Metadata.Resize(metadataPosition,
                      " in call to AggregateIndex bp1 metadata");
    const uint64_t totalCountU64 = static_cast<uint64_t>(totalCount);
    CopyToBuffer(metadataBuffer, headerPosition, &totalCountU64);

    // index contents
    GathervVectors(index.Buffer, metadataBuffer, metadataPosition, m_MPIComm);

    // total length goes after the count and before the contents
    const uint64_t totalLengthU64 =
        static_cast<uint64_t>(metadataPosition - headerPosition - 8);
    CopyToBuffer(metadataBuffer, headerPosition, &totalLengthU64);
}
void BP3Serializer::AggregateMergeIndex(
const std::unordered_map<std::string, SerialElementIndex> &indices) noexcept
{
// first serialize index
std::vector<char> serializedIndices = SerializeIndices(indices);
// gather in rank 0
std::vector<char> gatheredSerialIndices;
size_t gatheredSerialIndicesPosition = 0;
GathervVectors(serializedIndices, gatheredSerialIndices,
gatheredSerialIndicesPosition, m_MPIComm);
// deallocate local serialized Indices
std::vector<char>().swap(serializedIndices);
// deserialize in [name][rank] order
const std::unordered_map<std::string, std::vector<SerialElementIndex>>
nameRankIndices =
DeserializeIndicesPerRankThreads(gatheredSerialIndices);
// deallocate gathered serial indices (full in rank 0 only)
std::vector<char>().swap(gatheredSerialIndices);
// to write count and length
auto &buffer = m_Metadata.m_Buffer;
auto &position = m_Metadata.m_Position;
size_t countPosition = position;
{
// Write count
position += 12;
m_Metadata.Resize(position,
", in call to AggregateMergeIndex bp1 metadata");
const uint32_t totalCountU32 =
static_cast<const uint32_t>(nameRankIndices.size());
CopyToBuffer(buffer, countPosition, &totalCountU32);
}
MergeSerializeIndices(nameRankIndices);
{
// Write length
const uint64_t totalLengthU64 =
static_cast<const uint64_t>(position - countPosition - 8);
CopyToBuffer(buffer, countPosition, &totalLengthU64);
}
}
/**
 * Flattens the given indices into one byte stream. Every index contributes
 * a record made of this rank's number (32 bits) followed by the index
 * buffer contents.
 * @param indices map of name -> index whose buffers are serialized
 * @return the concatenated records, in the map's iteration order
 */
std::vector<char> BP3Serializer::SerializeIndices(
    const std::unordered_map<std::string, SerialElementIndex> &indices) const
    noexcept
{
    // the rank prefix is identical for every record
    const uint32_t rankSource = static_cast<uint32_t>(m_RankMPI);

    // size the output once instead of growing it record by record
    size_t serializedSize = 0;
    for (const auto &indexPair : indices)
    {
        serializedSize += sizeof(rankSource) + indexPair.second.Buffer.size();
    }

    std::vector<char> serializedIndices;
    serializedIndices.reserve(serializedSize);

    for (const auto &indexPair : indices)
    {
        const SerialElementIndex &index = indexPair.second;

        // rank first, then the index buffer
        InsertToBuffer(serializedIndices, &rankSource);
        InsertToBuffer(serializedIndices, index.Buffer.data(),
                       index.Buffer.size());
    }

    return serializedIndices;
}
/**
 * Splits a gathered stream of [rank: 4 bytes][index buffer] records into a
 * map name -> per-rank indices, deserializing records concurrently on up to
 * m_Threads asynchronous tasks.
 *
 * Each index buffer starts with a 32-bit length that does not count the
 * length field itself, so a record occupies 4 + 4 + length bytes.
 *
 * Only rank 0 does any work; other ranks, and streams shorter than one
 * record header (8 bytes), yield an empty map.
 *
 * @param serialized gathered records from all ranks
 * @return map from index name to a vector of m_SizeMPI indices, where the
 * element at position r holds what rank r contributed (left as constructed
 * when rank r sent nothing for that name)
 */
std::unordered_map<std::string, std::vector<BP3Base::SerialElementIndex>>
BP3Serializer::DeserializeIndicesPerRankThreads(
    const std::vector<char> &serialized) const noexcept
{
    // Deserializes the record whose index buffer starts at recordPosition
    // and appends that buffer to target[name][rankSource].
    auto lf_Deserialize = [this](
        const int rankSource, const std::vector<char> &source,
        const size_t recordPosition,
        std::unordered_map<std::string, std::vector<SerialElementIndex>>
            &target) {

        size_t localPosition = recordPosition;
        ElementIndexHeader header =
            ReadElementIndexHeader(source, localPosition);

        // Every access to the shared map happens under the mutex: a lookup
        // racing with another task's insertion would be a data race.
        // References to unordered_map elements survive rehashing, so the
        // pointer taken here stays valid after the lock is released.
        SerialElementIndex *index = nullptr;
        {
            std::lock_guard<std::mutex> lock(m_Mutex);
            auto itName = target.find(header.Name);
            if (itName == target.end())
            {
                itName = target
                             .emplace(header.Name,
                                      std::vector<SerialElementIndex>(
                                          m_SizeMPI,
                                          SerialElementIndex(header.MemberID,
                                                             0)))
                             .first;
            }
            index = &itName->second[rankSource];
        }

        // length field (4 bytes) + payload
        const size_t bufferSize = static_cast<size_t>(header.Length) + 4;
        InsertToBuffer(index->Buffer, &source[recordPosition], bufferSize);
    };

    // BODY OF FUNCTION starts here
    std::unordered_map<std::string, std::vector<SerialElementIndex>>
        deserialized;
    const size_t serializedSize = serialized.size();

    if (m_RankMPI != 0 || serializedSize < 8)
    {
        return deserialized;
    }

    // one task slot per thread, reused round-robin; never fewer than one
    const size_t slots = (m_Threads > 0) ? static_cast<size_t>(m_Threads) : 1;
    std::vector<std::future<void>> asyncs(slots);

    size_t serializedPosition = 0;
    size_t slot = 0;

    // Stop as soon as fewer than 8 bytes (rank + length) remain, so a record
    // header is never read past the end of the stream.
    while (serializedPosition + 8 <= serializedSize)
    {
        // rank, then the length that opens the index buffer; ReadValue is
        // expected to advance serializedPosition past each value it reads
        const int rankSource = static_cast<int>(
            ReadValue<uint32_t>(serialized, serializedPosition));

        const size_t recordPosition = serializedPosition;

        const size_t bufferSize = static_cast<size_t>(
            ReadValue<uint32_t>(serialized, serializedPosition));
        serializedPosition += bufferSize;

        // wait for the task that previously used this slot, if there is one
        if (asyncs[slot].valid())
        {
            asyncs[slot].get();
        }

        // a record reaching past the end of the stream is not deserialized
        if (serializedPosition > serializedSize)
        {
            break;
        }

        asyncs[slot] = std::async(std::launch::async, lf_Deserialize,
                                  rankSource, std::ref(serialized),
                                  recordPosition, std::ref(deserialized));

        slot = (slot + 1) % slots;
    }

    for (auto &async : asyncs)
    {
        if (async.valid())
        {
            async.wait();
        }
    }

    return deserialized;
}
void BP3Serializer::MergeSerializeIndices(
const std::unordered_map<std::string, std::vector<SerialElementIndex>>
&nameRankIndices) noexcept
{
auto lf_GetCharacteristics = [&](const std::vector<char> &buffer,
size_t &position, const uint8_t dataType,
uint8_t &count, uint32_t &length,
uint32_t &timeStep)
{
switch (dataType)
{
case (type_byte):
{
const auto characteristics =
ReadElementIndexCharacteristics<char>(buffer, position, true);
count = characteristics.EntryCount;
length = characteristics.EntryLength;
timeStep = characteristics.Statistics.Step;
break;
}
case (type_short):
{
const auto characteristics =
ReadElementIndexCharacteristics<short>(buffer, position, true);
count = characteristics.EntryCount;
length = characteristics.EntryLength;
timeStep = characteristics.Statistics.Step;
break;
}
case (type_integer):
{
const auto characteristics =
ReadElementIndexCharacteristics<int>(buffer, position, true);
count = characteristics.EntryCount;
length = characteristics.EntryLength;
timeStep = characteristics.Statistics.Step;
break;
}
case (type_long):
{
const auto characteristics =
ReadElementIndexCharacteristics<long int>(buffer, position,
true);
count = characteristics.EntryCount;
length = characteristics.EntryLength;
timeStep = characteristics.Statistics.Step;
break;
}
case (type_unsigned_byte):
{
const auto characteristics =
ReadElementIndexCharacteristics<unsigned char>(buffer, position,
true);
count = characteristics.EntryCount;
length = characteristics.EntryLength;
timeStep = characteristics.Statistics.Step;
break;
}
case (type_unsigned_short):
{
const auto characteristics =
ReadElementIndexCharacteristics<unsigned short>(buffer,
position, true);
count = characteristics.EntryCount;
length = characteristics.EntryLength;
timeStep = characteristics.Statistics.Step;
break;
}
case (type_unsigned_integer):
{
const auto characteristics =
ReadElementIndexCharacteristics<unsigned int>(buffer, position,
true);
count = characteristics.EntryCount;
length = characteristics.EntryLength;
timeStep = characteristics.Statistics.Step;
break;
}
case (type_unsigned_long):
{
auto characteristics =
ReadElementIndexCharacteristics<unsigned long int>(
buffer, position, true);
count = characteristics.EntryCount;
length = characteristics.EntryLength;
timeStep = characteristics.Statistics.Step;
break;
}
case (type_real):
{
auto characteristics =
ReadElementIndexCharacteristics<float>(buffer, position, true);
count = characteristics.EntryCount;
length = characteristics.EntryLength;
timeStep = characteristics.Statistics.Step;
break;
}
case (type_double):
{
auto characteristics =
ReadElementIndexCharacteristics<double>(buffer, position, true);
count = characteristics.EntryCount;
length = characteristics.EntryLength;
timeStep = characteristics.Statistics.Step;
break;
}
// TODO: complex, string, string array, long double
};
auto lf_MergeRank = [&](const std::vector<SerialElementIndex> &indices) {
// extract header
ElementIndexHeader header;
// index non-empty buffer
size_t firstRank = 0;
// index positions per rank
std::vector<size_t> positions(indices.size(), 0);
// merge index length
size_t headerSize = 0;
for (size_t r = 0; r < indices.size(); ++r)
{
const auto &buffer = indices[r].Buffer;
if (buffer.empty())
{
continue;
}
size_t &position = positions[r];
header = ReadElementIndexHeader(buffer, position);
firstRank = r;
// move all positions to headerSize
for (size_t r = 0; r < indices.size(); ++r)
{
const auto &buffer = indices[r].Buffer;
if (buffer.empty())
{
continue;
}
positions[r] = headerSize;
}
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
uint64_t setsCount = 0;
unsigned int currentTimeStep = 1;
bool marching = true;
std::vector<char> sorted;
while (marching)
{
marching = false;
for (size_t r = firstRank; r < indices.size(); ++r)
{
const auto &buffer = indices[r].Buffer;
if (buffer.empty())
{
continue;
}
auto &position = positions[r];
if (position < buffer.size())
{
marching = true;
}
else
{
continue;
}
uint8_t count = 0;
uint32_t length = 0;
uint32_t timeStep = static_cast<uint32_t>(currentTimeStep);
while (timeStep == currentTimeStep)
{
size_t localPosition = position;
lf_GetCharacteristics(buffer, localPosition,
header.DataType, count, length,
timeStep);
if (timeStep != currentTimeStep)
{
break;
}
++setsCount;
// here copy to sorted buffer
InsertToBuffer(sorted, &buffer[position], length + 5);
position += length + 5;
if (position >= buffer.size())
{
break;
}
}
}
++currentTimeStep;
}
const uint32_t entryLength = headerSize + sorted.size() - 4;
// Copy header to metadata buffer, need mutex here
{
std::lock_guard<std::mutex> lock(m_Mutex);
auto &buffer = m_Metadata.m_Buffer;
auto &position = m_Metadata.m_Position;
m_Metadata.Resize(buffer.size() +
static_cast<size_t>(entryLength + 4),
"in call to MergeSerializeIndices bp3 index");
CopyToBuffer(buffer, position, &entryLength);
CopyToBuffer(buffer, position, &indices[firstRank].Buffer[4],
headerSize - 8 - 4);
CopyToBuffer(buffer, position, &setsCount);
CopyToBuffer(buffer, position, sorted.data(), sorted.size());
}
};
auto lf_MergeRankRange = [&](
const std::unordered_map<std::string, std::vector<SerialElementIndex>>
&nameRankIndices,
const std::vector<std::string> &names, const size_t start,
const size_t end)
{
for (size_t i = start; i < end; ++i)
{
auto itIndex = nameRankIndices.find(names[i]);