Newer
Older
/*
 * Distributed under the OSI-approved Apache License, Version 2.0. See
 * accompanying file Copyright.txt for details.
 *
 * BP1Writer.cpp
 *
 * Created on: Feb 1, 2017
 * Author: William F Godoy godoywf@ornl.gov
 */
#include "BP1Writer.h"
#include "BP1Writer.tcc"
#include <chrono>
#include <future>
#include <string>
#include <vector>
#include "adios2/helper/adiosFunctions.h" //GetType<T>, ReadValue<T>,
// ReduceValue<T>
// Class-wide (static) mutex. It is locked by the threaded index
// deserialization (DeserializeIndicesPerRankThreads) and by the merge step
// (MergeSerializeIndices), which both touch shared containers from async tasks.
std::mutex BP1Writer::m_Mutex;
BP1Writer::BP1Writer(MPI_Comm mpiComm, const bool debugMode)
: BP1Base(mpiComm, debugMode)
{
}
// NOTE(review): this is the body of a member function whose first signature
// line, opening/closing braces, and the loop header that declares `methodID`
// (used further down) are missing from this view. The text appears truncated
// by extraction; restore them before building.
//
// Opens a new process group (PG). It appends a PG index record to
// m_MetadataSet.PGIndex.Buffer and writes the PG header into m_Data.m_Buffer.
// Placeholders are left for the data PG length and the variables count/length;
// SerializeData back-fills both later.
//
// Parameters visible here:
//   hostLanguage    - "Fortran" sets the host-language flag byte to 'y', else 'n'
//   transportsTypes - mapped to transport/method IDs via GetTransportIDs
const std::string hostLanguage,
const std::vector<std::string> &transportsTypes) noexcept
ProfilerStart("buffering");
std::vector<char> &metadataBuffer = m_MetadataSet.PGIndex.Buffer;
std::vector<char> &dataBuffer = m_Data.m_Buffer;
size_t &dataPosition = m_Data.m_Position;
// remember where the 8-byte data PG length goes so it can be back-filled
m_MetadataSet.DataPGLengthPosition = dataPosition;
dataPosition += 8; // skip pg length (8)
const std::size_t metadataPGLengthPosition = metadataBuffer.size();
metadataBuffer.insert(metadataBuffer.end(), 2, '\0'); // skip pg length (2)
// the PG name is the MPI rank rendered as a decimal string
const std::string name(std::to_string(m_BP1Aggregator.m_RankMPI));
WriteNameRecord(name, metadataBuffer);
// write if host language Fortran in metadata and data
const char hostFortran = (hostLanguage == "Fortran") ? 'y' : 'n';
InsertToBuffer(metadataBuffer, &hostFortran);
CopyToBuffer(dataBuffer, dataPosition, &hostFortran);
WriteNameRecord(name, dataBuffer, dataPosition);
const uint32_t processID =
static_cast<const uint32_t>(m_BP1Aggregator.m_RankMPI);
InsertToBuffer(metadataBuffer, &processID);
// skip coordination var in data ....what is coordination var?
const std::string timeStepName(std::to_string(m_MetadataSet.TimeStep));
WriteNameRecord(timeStepName, dataBuffer, dataPosition);
InsertToBuffer(metadataBuffer, &m_MetadataSet.TimeStep);
CopyToBuffer(dataBuffer, dataPosition, &m_MetadataSet.TimeStep);
// offset to pg in data in metadata which is the current absolute position
InsertU64(metadataBuffer, m_Data.m_AbsolutePosition);
// Back to writing metadata pg index length (length of group);
// the stored value excludes the 2-byte length field itself
const uint16_t metadataPGIndexLength = static_cast<const uint16_t>(
metadataBuffer.size() - metadataPGLengthPosition - 2);
size_t backPosition = metadataPGLengthPosition;
CopyToBuffer(metadataBuffer, backPosition, &metadataPGIndexLength);
// DONE With metadataBuffer
// here write method in data
const std::vector<uint8_t> methodIDs = GetTransportIDs(transportsTypes);
const uint8_t methodsCount = static_cast<const uint8_t>(methodIDs.size());
CopyToBuffer(dataBuffer, dataPosition, &methodsCount); // count
// methodID (1) + method params length(2), no parameters for now
const uint16_t methodsLength =
static_cast<const uint16_t>(methodsCount * 3);
CopyToBuffer(dataBuffer, dataPosition, &methodsLength); // length
// NOTE(review): `methodID` is not declared in this view; presumably this line
// and the next were inside a loop over `methodIDs` whose header was lost.
CopyToBuffer(dataBuffer, dataPosition, &methodID); // method ID,
dataPosition += 2; // skip method params length = 0 (2 bytes) for now
// advance the absolute offset by everything written since the PG length slot
m_Data.m_AbsolutePosition +=
dataPosition - m_MetadataSet.DataPGLengthPosition;
// vars count/length placeholder; SerializeData writes the real values here
m_MetadataSet.DataPGVarsCount = 0;
m_MetadataSet.DataPGVarsCountPosition = dataPosition;
m_Data.m_AbsolutePosition += 12; // add vars count and length
++m_MetadataSet.DataPGCount;
m_MetadataSet.DataPGIsOpen = true;
ProfilerStop("buffering");
// Advances to the next step. It serializes the buffered data of the current
// step and increments m_MetadataSet.TimeStep, timed under the "buffering"
// profiler key.
// NOTE(review): braces are missing or misplaced in this view. Presumably the
// function body opens after the signature, and the `if` only guards the
// m_MaxBufferSize assignment (current data position + 64 while it still holds
// DefaultMaxBufferSize). Confirm against the original source.
void BP1Writer::Advance(IO &io)
ProfilerStart("buffering");
if (m_MaxBufferSize == DefaultMaxBufferSize)
{
m_MaxBufferSize = m_Data.m_Position + 64;
SerializeData(io);
++m_MetadataSet.TimeStep;
ProfilerStop("buffering");
}
// Serializes the currently buffered data (SerializeData), timed under the
// "buffering" profiler key.
// NOTE(review): the function braces are missing from this view.
void BP1Writer::Flush(IO &io)
ProfilerStart("buffering");
SerializeData(io);
ProfilerStop("buffering");
// Final serialization. It writes the remaining data, then appends the metadata
// indices and footer to the data buffer (SerializeMetadataInData). Finally it
// adds the total absolute bytes to the "buffering" profiler byte counter.
// NOTE(review): braces are missing from this view. If Bytes is a std map type,
// .at() throws std::out_of_range when "buffering" has no entry, which would
// call std::terminate inside this noexcept function.
void BP1Writer::Close(IO &io) noexcept
ProfilerStart("buffering");
SerializeData(io);
SerializeMetadataInData();
m_Profiler.Bytes.at("buffering") += m_Data.m_AbsolutePosition;
ProfilerStop("buffering");
// Builds this rank's profiling entry as a JSON object string of the shape
//   { "rank": R, "start": "...", "threads": N, "bytes": B,
//     "<process>_<units>": t, "transport_0": { "type": "...", ... }, ... }
// NOTE(review): `profiler` is not declared in this view (presumably a
// reference to m_Profiler). Several braces are missing and no return statement
// is visible; the text appears truncated by extraction. String values such as
// transport types are inserted without JSON escaping.
std::string BP1Writer::GetRankProfilingJSON(
const std::vector<std::string> &transportsTypes,
const std::vector<profiling::IOChrono *> &transportsProfilers) noexcept
{
// appends "<process>_<short units>": <elapsed>, to rankLog
auto lf_WriterTimer = [](std::string &rankLog,
const profiling::Timer &timer) {
rankLog += "\"" + timer.m_Process + "_" + timer.GetShortUnits() +
"\": " + std::to_string(timer.m_ProcessTime) + ", ";
};
// prepare string dictionary per rank
std::string rankLog("{ \"rank\": " +
std::to_string(m_BP1Aggregator.m_RankMPI) + ", ");
std::string timeDate(profiler.Timers.at("buffering").m_LocalTimeDate);
// drop the last character (presumably a trailing newline; confirm)
timeDate.pop_back();
// avoid whitespace
std::replace(timeDate.begin(), timeDate.end(), ' ', '_');
rankLog += "\"start\": \"" + timeDate + "\", ";
rankLog += "\"threads\": " + std::to_string(m_Threads) + ", ";
rankLog +=
"\"bytes\": " + std::to_string(profiler.Bytes.at("buffering")) + ", ";
lf_WriterTimer(rankLog, profiler.Timers.at("buffering"));
const size_t transportsSize = transportsTypes.size();
for (unsigned int t = 0; t < transportsSize; ++t)
{
rankLog += "\"transport_" + std::to_string(t) + "\": { ";
rankLog += "\"type\": \"" + transportsTypes[t] + "\", ";
for (const auto &transportTimerPair : transportsProfilers[t]->Timers)
lf_WriterTimer(rankLog, transportTimerPair.second);
// replace last comma with space
rankLog.pop_back();
rankLog.pop_back();
rankLog += " ";
if (t == transportsSize - 1) // last element
rankLog += "}";
}
else
{
rankLog += "},";
rankLog += " }"; // end rank entry
/**
 * Combines this rank's profiling JSON with the other ranks' entries.
 * @param rankProfilingLog this rank's JSON object (see GetRankProfilingJSON)
 * @return the collective profiling buffer produced by the aggregator
 */
std::vector<char>
BP1Writer::AggregateProfilingJSON(const std::string &rankProfilingLog)
{
    auto collectiveProfiling =
        m_BP1Aggregator.SetCollectiveProfilingJSON(rankProfilingLog);
    return collectiveProfiling;
}
// Aggregates this writer's metadata into m_Metadata in three steps: the PG
// index (AggregateIndex), then the merged variables and attributes indices
// (AggregateMergeIndex).
// Those helpers call ReduceValues/GathervVectors, so presumably every rank
// must call this.
// NOTE(review): the function braces are missing from this view.
void BP1Writer::AggregateCollectiveMetadata()
AggregateIndex(m_MetadataSet.PGIndex, m_MetadataSet.DataPGCount);
AggregateMergeIndex(m_MetadataSet.VarsIndices);
AggregateMergeIndex(m_MetadataSet.AttributesIndices);
/**
 * Serializes every attribute registered in `io` into the data buffer,
 * together with its index entry.
 *
 * Layout written at the current data position:
 *   [count (uint32, 4)][length (uint64, 8)][attribute records ...]
 * The length slot is reserved first and back-filled at the end.
 *
 * @param io source of the attribute map and the typed Attribute<T> objects
 */
void BP1Writer::WriteAttributes(IO &io)
{
    const auto attributes = io.GetAttributesDataMap();

    auto &buffer = m_Data.m_Buffer;
    auto &relativePosition = m_Data.m_Position;
    auto &absolutePosition = m_Data.m_AbsolutePosition;

    // start of the count+length header, used to advance absolutePosition
    const size_t headerStart = relativePosition;

    const uint32_t count = static_cast<const uint32_t>(attributes.size());
    CopyToBuffer(buffer, relativePosition, &count);

    const size_t lengthPosition = relativePosition;
    relativePosition += 8; // reserve the 8-byte length slot
    absolutePosition += relativePosition - headerStart;

    uint32_t memberID = 0;
    for (const auto &entry : attributes)
    {
        const std::string &attributeName = entry.first;
        const std::string &attributeType = entry.second.first;

        // "unknown" (and any type not listed below) writes nothing,
        // but still consumes a member ID
        if (attributeType == "unknown")
        {
        }
#define declare_type(T)                                                        \
    else if (attributeType == GetType<T>())                                    \
    {                                                                          \
        Stats<T> stats;                                                        \
        stats.Offset = absolutePosition;                                       \
        stats.MemberID = memberID;                                             \
        Attribute<T> &attribute = io.GetAttribute<T>(attributeName);           \
        WriteAttributeInData(attribute, stats);                                \
        WriteAttributeInIndex(attribute, stats);                               \
    }
        ADIOS2_FOREACH_ATTRIBUTE_TYPE_1ARG(declare_type)
#undef declare_type

        ++memberID;
    }

    // back-fill: the length is measured from the length slot itself, so it
    // includes those 8 bytes but not the 4-byte count
    const uint64_t length =
        static_cast<const uint64_t>(relativePosition - lengthPosition);
    size_t backPosition = lengthPosition;
    CopyToBuffer(buffer, backPosition, &length);
}
// Appends a dimensions record to `buffer` using InsertU64 for each value.
// NOTE(review): this overload is truncated in this view. The remaining
// parameters (including `buffer`), the opening brace, and the condition that
// selects between the two branches are missing. Visible behavior:
//   - one branch writes the local dimensions plus 16 zero bytes
//     (2 * sizeof(uint64_t)); without braces it is unclear whether the zero
//     bytes are written once or per dimension;
//   - the other branch writes (local, global, offset) values per dimension.
void BP1Writer::WriteDimensionsRecord(const Dims &localDimensions,
const Dims &globalDimensions,
const Dims &offsets,
for (const auto localDimension : localDimensions)
InsertU64(buffer, localDimension);
buffer.insert(buffer.end(), 2 * sizeof(uint64_t), '\0');
}
else
{
for (unsigned int d = 0; d < localDimensions.size(); ++d)
InsertU64(buffer, localDimensions[d]);
InsertU64(buffer, globalDimensions[d]);
InsertU64(buffer, offsets[d]);
// Writes a dimensions record into `buffer` at `position`, advancing
// `position`. Each value is written as a uint64. When !isCharacteristic, each
// value is preceded by a one-byte 'n' flag. The skipped global-bounds slot is
// therefore 2 * (1 + 8) = 18 bytes, or 2 * 8 = 16 bytes for characteristics.
// NOTE(review): the `buffer` parameter line, the opening brace, the end of the
// lambda, and the condition choosing between the local-only and full-triplet
// loops are missing from this view.
void BP1Writer::WriteDimensionsRecord(const Dims &localDimensions,
const Dims &globalDimensions,
const Dims &offsets,
size_t &position,
const bool isCharacteristic) noexcept
// writes [optional 'n' flag][uint64 dimension] at position
auto lf_CopyDimension = [](std::vector<char> &buffer, size_t &position,
const size_t dimension,
const bool isCharacteristic) {
if (!isCharacteristic)
{
constexpr char no = 'n';
CopyToBuffer(buffer, position, &no);
}
const uint64_t dimension64 = static_cast<const uint64_t>(dimension);
CopyToBuffer(buffer, position, &dimension64);
unsigned int globalBoundsSkip = 18;
if (isCharacteristic)
{
globalBoundsSkip = 16;
}
for (const auto &localDimension : localDimensions)
lf_CopyDimension(buffer, position, localDimension,
isCharacteristic);
position += globalBoundsSkip;
for (unsigned int d = 0; d < localDimensions.size(); ++d)
lf_CopyDimension(buffer, position, localDimensions[d],
isCharacteristic);
lf_CopyDimension(buffer, position, globalDimensions[d],
isCharacteristic);
lf_CopyDimension(buffer, position, offsets[d], isCharacteristic);
}
}
// Appends a name record to `buffer`: a uint16 length followed by the name's
// bytes. The length is static_cast to uint16_t, so names of 65536+ bytes wrap
// and only the wrapped number of bytes is appended.
// NOTE(review): the second parameter line and the braces are missing from
// this view; `buffer` is presumably a std::vector<char>& parameter.
void BP1Writer::WriteNameRecord(const std::string name,
const uint16_t length = static_cast<const uint16_t>(name.length());
InsertToBuffer(buffer, &length);
InsertToBuffer(buffer, name.c_str(), length);
/**
 * Writes a name record at `position` (advancing it): a uint16 byte count
 * followed by that many bytes of `name`.
 * The count is a uint16 cast, so names of 65536+ bytes wrap.
 */
void BP1Writer::WriteNameRecord(const std::string name,
                                std::vector<char> &buffer,
                                size_t &position) noexcept
{
    const auto nameLength = static_cast<const uint16_t>(name.length());
    CopyToBuffer(buffer, position, &nameLength);

    const char *nameBytes = name.c_str();
    CopyToBuffer(buffer, position, nameBytes, nameLength);
}
// Returns the index entry for `name` in `indices`, creating it when absent.
// A new entry is built as SerialElementIndex(indices.size()), i.e. from the
// number of entries before insertion (presumably its member ID; confirm).
// @param isNew set to true when the entry was created here, false otherwise
// NOTE(review): the function braces are missing from this view.
BP1Writer::SerialElementIndex &BP1Writer::GetSerialElementIndex(
const std::string &name,
std::unordered_map<std::string, SerialElementIndex> &indices,
bool &isNew) const noexcept
auto itName = indices.find(name);
if (itName == indices.end())
{
indices.emplace(name, SerialElementIndex(indices.size()));
isNew = true;
return indices.at(name);
}
isNew = false;
return itName->second;
// Closes the open process group. It back-fills the PG variables count and
// length, writes the attributes on the first call only, then back-fills the
// 8-byte PG length and marks the PG closed.
// NOTE(review): the opening brace and the end of the `else` branch are missing
// from this view. The `else` presumably only reserves the 12-byte attributes
// header when attributes were already written; confirm against the original.
void BP1Writer::SerializeData(IO &io) noexcept
auto &buffer = m_Data.m_Buffer;
auto &position = m_Data.m_Position;
auto &absolutePosition = m_Data.m_AbsolutePosition;
CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition,
&m_MetadataSet.DataPGVarsCount);
// without record itself and vars count
// NOTE(review): the CopyToBuffer above has already advanced
// DataPGVarsCountPosition past the count. This expression therefore appears to
// subtract the count's size twice; verify the intended length.
const uint64_t varsLength =
position - m_MetadataSet.DataPGVarsCountPosition - 8 - 4;
CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition, &varsLength);
// attributes are only written once
if (!m_MetadataSet.AreAttributesWritten)
{
WriteAttributes(io);
m_MetadataSet.AreAttributesWritten = true;
}
else
{
position += 12;
absolutePosition += 12;
// Finish writing pg group length without record itself
const uint64_t dataPGLength =
position - m_MetadataSet.DataPGLengthPosition - 8;
CopyToBuffer(buffer, m_MetadataSet.DataPGLengthPosition, &dataPGLength);
m_MetadataSet.DataPGIsOpen = false;
// Appends the metadata footer to the data buffer, in this order:
//   PG index         [count uint64][length uint64][PG index bytes]
//   variables index  [count uint32][length uint64][each index buffer]
//   attributes index [count uint32][length uint64][each index buffer]
//   then the three absolute offsets (uint64 each), an endianness byte,
//   2 skipped bytes, and m_Version.
// NOTE(review): the opening brace is missing and the function appears to
// continue beyond this view.
void BP1Writer::SerializeMetadataInData() noexcept
// Sets `count` to the number of indices and `length` to their summed buffer
// sizes. It also writes each index's own length (buffer size minus the 4-byte
// length field) into the first 4 bytes of that index's buffer.
auto lf_SetIndexCountLength =
[](std::unordered_map<std::string, SerialElementIndex> &indices,
uint32_t &count, uint64_t &length) {
count = indices.size();
length = 0;
for (auto &indexPair : indices) // set each index length
{
auto &indexBuffer = indexPair.second.Buffer;
const uint32_t indexLength = indexBuffer.size() - 4;
size_t indexLengthPosition = 0;
CopyToBuffer(indexBuffer, indexLengthPosition, &indexLength);
length += indexBuffer.size(); // overall length
}
};
// Writes [count][length] followed by every index buffer at `position`.
auto lf_FlattenIndices =
[](const uint32_t count, const uint64_t length,
const std::unordered_map<std::string, SerialElementIndex> &indices,
std::vector<char> &buffer, size_t &position) {
CopyToBuffer(buffer, position, &count);
CopyToBuffer(buffer, position, &length);
for (const auto &indexPair : indices) // set each index length
{
const auto &indexBuffer = indexPair.second.Buffer;
CopyToBuffer(buffer, position, indexBuffer.data(),
indexBuffer.size());
}
};
// Finish writing metadata counts and lengths
// PG Index
const uint64_t pgCount = m_MetadataSet.DataPGCount;
const uint64_t pgLength = m_MetadataSet.PGIndex.Buffer.size();
// var index count and length (total), and each index length
uint32_t varsCount;
uint64_t varsLength;
lf_SetIndexCountLength(m_MetadataSet.VarsIndices, varsCount, varsLength);
// attribute index count and length, and each index length
uint32_t attributesCount;
uint64_t attributesLength;
lf_SetIndexCountLength(m_MetadataSet.AttributesIndices, attributesCount,
attributesLength);
// +16: uint64 count + uint64 length; +12: uint32 count + uint64 length
const size_t footerSize = static_cast<const size_t>(
(pgLength + 16) + (varsLength + 12) + (attributesLength + 12) +
m_MetadataSet.MiniFooterSize);
auto &buffer = m_Data.m_Buffer;
auto &position = m_Data.m_Position;
auto &absolutePosition = m_Data.m_AbsolutePosition;
// must replace with growth buffer strategy?
m_Data.Resize(position + footerSize,
" when writing metadata in bp data buffer");
CopyToBuffer(buffer, position, &pgCount);
CopyToBuffer(buffer, position, &pgLength);
CopyToBuffer(buffer, position, m_MetadataSet.PGIndex.Buffer.data(),
static_cast<const size_t>(pgLength));
lf_FlattenIndices(varsCount, varsLength, m_MetadataSet.VarsIndices, buffer,
position);
// Attribute indices
lf_FlattenIndices(attributesCount, attributesLength,
m_MetadataSet.AttributesIndices, buffer, position);
// getting absolute offsets, minifooter is 28 bytes for now
// (visible writes: 3 * 8 + 1 + 2 skipped + sizeof(m_Version); this is
// presumably 28 when m_Version is one byte; confirm)
const uint64_t offsetPGIndex =
static_cast<const uint64_t>(absolutePosition);
const uint64_t offsetVarsIndex =
static_cast<const uint64_t>(offsetPGIndex + (pgLength + 16));
const uint64_t offsetAttributeIndex =
static_cast<const uint64_t>(offsetVarsIndex + (varsLength + 12));
CopyToBuffer(buffer, position, &offsetPGIndex);
CopyToBuffer(buffer, position, &offsetVarsIndex);
CopyToBuffer(buffer, position, &offsetAttributeIndex);
const uint8_t endian = 0;
CopyToBuffer(buffer, position, &endian);
position += 2;
CopyToBuffer(buffer, position, &m_Version);
absolutePosition += footerSize;
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
m_Profiler.Bytes.emplace("buffering", absolutePosition);
}
}
/**
 * Gathers every rank's copy of a serial index into m_Metadata on rank 0.
 *
 * Layout written on rank 0 at the current metadata position:
 *   [total count (uint64, 8)][length (uint64, 8)][gathered index bytes]
 * The length excludes the 16-byte header.
 *
 * @param index this rank's index buffer
 * @param count this rank's element count, summed over ranks by ReduceValues
 */
void BP1Writer::AggregateIndex(const SerialElementIndex &index,
                               const size_t count)
{
    auto &buffer = m_Metadata.m_Buffer;
    auto &position = m_Metadata.m_Position;
    const bool isRoot = (m_BP1Aggregator.m_RankMPI == 0);

    const size_t headerPosition = position;
    const size_t totalCount =
        ReduceValues<size_t>(count, m_BP1Aggregator.m_MPIComm);

    if (isRoot)
    {
        position += 16; // reserve count + length
        m_Metadata.Resize(position, " in call to AggregateIndex bp1 metadata");
        size_t countPosition = headerPosition;
        const uint64_t totalCountU64 = static_cast<const uint64_t>(totalCount);
        CopyToBuffer(buffer, countPosition, &totalCountU64);
    }

    GathervVectors(index.Buffer, buffer, position, m_BP1Aggregator.m_MPIComm);

    if (isRoot)
    {
        size_t lengthPosition = headerPosition + 8;
        const uint64_t totalLengthU64 =
            static_cast<const uint64_t>(position - lengthPosition - 8);
        CopyToBuffer(buffer, lengthPosition, &totalLengthU64);
    }
}
/**
 * Gathers every rank's variables/attributes indices on rank 0, merges them
 * per element name, and appends the result to m_Metadata.
 *
 * Layout written on rank 0 at the current metadata position:
 *   [count (uint32, 4)][length (uint64, 8)][merged index entries]
 * This matches the uint32 count / uint64 length header used for the
 * variables and attributes indices in SerializeMetadataInData. The length
 * excludes the 12-byte header.
 *
 * @param indices this rank's name -> serial index map
 */
void BP1Writer::AggregateMergeIndex(
    const std::unordered_map<std::string, SerialElementIndex> &indices) noexcept
{
    // first serialize index
    std::vector<char> serializedIndices = SerializeIndices(indices);

    // gather in rank 0
    std::vector<char> gatheredSerialIndices;
    size_t gatheredSerialIndicesPosition = 0;
    GathervVectors(serializedIndices, gatheredSerialIndices,
                   gatheredSerialIndicesPosition, m_BP1Aggregator.m_MPIComm);

    // deallocate local serialized Indices
    std::vector<char>().swap(serializedIndices);

    // deserialize in [name][rank] order
    const std::unordered_map<std::string, std::vector<SerialElementIndex>>
        nameRankIndices =
            DeserializeIndicesPerRankThreads(gatheredSerialIndices);

    // deallocate gathered serial indices (full in rank 0 only)
    std::vector<char>().swap(gatheredSerialIndices);

    // to write count and length
    auto &buffer = m_Metadata.m_Buffer;
    auto &position = m_Metadata.m_Position;
    size_t countPosition = position;

    if (m_BP1Aggregator.m_RankMPI == 0)
    {
        // reserve 4-byte count + 8-byte length
        position += 12;
        m_Metadata.Resize(position,
                          ", in call to AggregateMergeIndex bp1 metadata");
        // The count must be 4 bytes. Writing a uint64_t here pushed the length
        // field to +8, where it overwrote the first 4 bytes of the merged
        // entries and under-reported the length by 4.
        const uint32_t totalCountU32 =
            static_cast<const uint32_t>(nameRankIndices.size());
        CopyToBuffer(buffer, countPosition, &totalCountU32);
    }

    MergeSerializeIndices(nameRankIndices);

    if (m_BP1Aggregator.m_RankMPI == 0)
    {
        // countPosition now points at the length field (just past the count);
        // the length excludes that 8-byte field itself
        const uint64_t totalLengthU64 =
            static_cast<const uint64_t>(position - countPosition - 8);
        CopyToBuffer(buffer, countPosition, &totalLengthU64);
    }
}
/**
 * Flattens this rank's indices into one buffer for gathering.
 * Each entry is written as [source rank (uint32)][index buffer bytes].
 * @param indices name -> serial index map to flatten
 * @return the concatenated, rank-tagged index buffers
 */
std::vector<char> BP1Writer::SerializeIndices(
    const std::unordered_map<std::string, SerialElementIndex> &indices) const
    noexcept
{
    // the rank tag is identical for every entry of this rank
    const uint32_t rankSource =
        static_cast<const uint32_t>(m_BP1Aggregator.m_RankMPI);

    std::vector<char> serialized;
    for (const auto &nameIndex : indices)
    {
        const auto &indexBuffer = nameIndex.second.Buffer;
        InsertToBuffer(serialized, &rankSource);
        InsertToBuffer(serialized, indexBuffer.data(), indexBuffer.size());
    }
    return serialized;
}
/**
 * Rank 0 only: splits the gathered, rank-tagged index buffer (see
 * SerializeIndices) into a map of element name -> per-rank index copies.
 *
 * Input entry layout: [source rank (uint32)][index length L (uint32)][L bytes].
 * Each entry's [length + bytes] is copied into deserialized[name][rank].
 * Entries are processed by up to m_Threads concurrent async tasks, with the
 * task slots reused round-robin.
 *
 * @param serialized gathered buffer (only meaningful on rank 0)
 * @return empty map on non-zero ranks or when fewer than 8 bytes are given
 */
std::unordered_map<std::string, std::vector<BP1Base::SerialElementIndex>>
BP1Writer::DeserializeIndicesPerRankThreads(
    const std::vector<char> &serialized) const noexcept
{
    // Copies one entry, starting at entryPosition (its uint32 length field),
    // into output[header.Name][rankSource].
    auto lf_Deserialize = [&](
        const int rankSource, const std::vector<char> &source,
        const size_t entryPosition,
        std::unordered_map<std::string, std::vector<SerialElementIndex>>
            &output) {
        size_t localPosition = entryPosition;
        ElementIndexHeader header =
            ReadElementIndexHeader(source, localPosition);

        // Every map access happens under the lock: an unlocked operator[]
        // lookup can race with another task's insertion (rehash). References
        // to unordered_map elements stay valid across rehashes, and the
        // per-rank vector is never resized, so the pointer is safe to use
        // after unlocking.
        SerialElementIndex *index = nullptr;
        {
            std::lock_guard<std::mutex> lock(m_Mutex);
            auto itName = output.find(header.Name);
            if (itName == output.end())
            {
                itName = output
                             .emplace(header.Name,
                                      std::vector<SerialElementIndex>(
                                          m_BP1Aggregator.m_SizeMPI,
                                          SerialElementIndex(header.MemberID,
                                                             0)))
                             .first;
            }
            index = &itName->second[rankSource];
        }

        // distinct (name, rank) pairs write distinct elements
        const size_t bufferSize = static_cast<const size_t>(header.Length) + 4;
        InsertToBuffer(index->Buffer, &source[entryPosition], bufferSize);
    };

    // BODY OF FUNCTION starts here
    std::unordered_map<std::string, std::vector<SerialElementIndex>>
        deserialized;
    const size_t serializedSize = serialized.size();

    if (m_BP1Aggregator.m_RankMPI != 0 || serializedSize < 8)
    {
        return deserialized;
    }

    // at least one slot, otherwise no progress would ever be made
    const size_t threads =
        (m_Threads > 0) ? static_cast<size_t>(m_Threads) : 1;
    std::vector<std::future<void>> asyncs(threads);

    size_t serializedPosition = 0;
    size_t slot = 0;
    // each entry needs at least rank (4) + length (4) bytes; checking before
    // every read prevents reading past the end of `serialized`
    while (serializedPosition + 8 <= serializedSize)
    {
        const int rankSource = static_cast<const int>(
            ReadValue<uint32_t>(serialized, serializedPosition));
        const size_t entryPosition = serializedPosition;
        const size_t bufferSize = static_cast<const size_t>(
            ReadValue<uint32_t>(serialized, serializedPosition));
        serializedPosition += bufferSize;

        if (serializedPosition > serializedSize)
        {
            break; // truncated trailing entry: nothing more can be read
        }

        // reuse this slot: wait for the task launched in it previously, if any
        if (asyncs[slot].valid())
        {
            asyncs[slot].get();
        }
        asyncs[slot] = std::async(std::launch::async, lf_Deserialize,
                                  rankSource, std::ref(serialized),
                                  entryPosition, std::ref(deserialized));
        slot = (slot + 1) % threads;
    }

    for (auto &async : asyncs)
    {
        if (async.valid())
        {
            async.wait();
        }
    }
    return deserialized;
}
void BP1Writer::MergeSerializeIndices(
const std::unordered_map<std::string, std::vector<SerialElementIndex>>
&nameRankIndices) noexcept
{
auto lf_GetCharacteristics = [&](const std::vector<char> &buffer,
size_t &position, const uint8_t dataType,
uint8_t &count, uint32_t &length,
uint32_t &timeStep)
{
switch (dataType)
{
case (type_byte):
{
const auto characteristics =
ReadElementIndexCharacteristics<char>(buffer, position, true);
count = characteristics.Count;
length = characteristics.Length;
timeStep = characteristics.Statistics.TimeStep;
break;
}
case (type_short):
{
const auto characteristics =
ReadElementIndexCharacteristics<short>(buffer, position, true);
count = characteristics.Count;
length = characteristics.Length;
timeStep = characteristics.Statistics.TimeStep;
break;
}
case (type_integer):
{
const auto characteristics =
ReadElementIndexCharacteristics<int>(buffer, position, true);
count = characteristics.Count;
length = characteristics.Length;
timeStep = characteristics.Statistics.TimeStep;
break;
}
case (type_long):
{
const auto characteristics =
ReadElementIndexCharacteristics<long int>(buffer, position,
true);
count = characteristics.Count;
length = characteristics.Length;
timeStep = characteristics.Statistics.TimeStep;
break;
}
case (type_unsigned_byte):
{
const auto characteristics =
ReadElementIndexCharacteristics<unsigned char>(buffer, position,
true);
count = characteristics.Count;
length = characteristics.Length;
timeStep = characteristics.Statistics.TimeStep;
break;
}
case (type_unsigned_short):
{
const auto characteristics =
ReadElementIndexCharacteristics<unsigned short>(buffer,
position, true);
count = characteristics.Count;
length = characteristics.Length;
timeStep = characteristics.Statistics.TimeStep;
break;
}
case (type_unsigned_integer):
{
const auto characteristics =
ReadElementIndexCharacteristics<unsigned int>(buffer, position,
true);
count = characteristics.Count;
length = characteristics.Length;
timeStep = characteristics.Statistics.TimeStep;
break;
}
case (type_unsigned_long):
{
auto characteristics =
ReadElementIndexCharacteristics<unsigned long int>(
buffer, position, true);
count = characteristics.Count;
length = characteristics.Length;
timeStep = characteristics.Statistics.TimeStep;
break;
}
case (type_real):
{
auto characteristics =
ReadElementIndexCharacteristics<float>(buffer, position, true);
count = characteristics.Count;
length = characteristics.Length;
timeStep = characteristics.Statistics.TimeStep;
break;
}
case (type_double):
{
auto characteristics =
ReadElementIndexCharacteristics<double>(buffer, position, true);
count = characteristics.Count;
length = characteristics.Length;
timeStep = characteristics.Statistics.TimeStep;
break;
}
// TODO: complex, string, string array, long double
}
};
auto lf_MergeRank = [&](const std::vector<SerialElementIndex> &indices) {
// extract header
ElementIndexHeader header;
// index non-empty buffer
size_t firstRank = 0;
// index positions per rank
std::vector<size_t> positions(indices.size(), 0);
// merge index length
uint32_t entryLength = 0;
size_t headerSize = 0;
for (size_t r = 0; r < indices.size(); ++r)
{
const auto &buffer = indices[r].Buffer;
if (buffer.empty())
{
continue;
}
size_t &position = positions[r];
header = ReadElementIndexHeader(buffer, position);
firstRank = r;
entryLength += position;
headerSize += position;
break;
}
uint64_t setsCount = 0;
unsigned int currentTimeStep = 1;
bool marching = true;
std::vector<char> sorted;
while (marching)
{
marching = false;
for (size_t r = firstRank; r < indices.size(); ++r)
{
const auto &buffer = indices[r].Buffer;
if (buffer.empty())
{
continue;
}
auto &position = positions[r];
if (position < buffer.size())
{
marching = true;
}
else
{
continue;
}
uint8_t count = 0;
uint32_t length = 0;
uint32_t timeStep = static_cast<uint32_t>(currentTimeStep);
while (timeStep == currentTimeStep)
{
size_t localPosition = position;
lf_GetCharacteristics(buffer, localPosition,
header.DataType, count, length,
timeStep);
if (timeStep != currentTimeStep)
{
break;
}
entryLength += length + 5;
++setsCount;
// here copy to sorted buffer
InsertToBuffer(sorted, &buffer[position], length + 5);
position += length + 5;
if (position >= buffer.size())
{
break;
}
}
}
++currentTimeStep;
}
// Copy header to metadata buffer, need mutex here
{
std::lock_guard<std::mutex> lock(m_Mutex);
auto &buffer = m_Metadata.m_Buffer;
auto &position = m_Metadata.m_Position;
m_Metadata.Resize(buffer.size() + headerSize + sorted.size(),
"in call to MergeSerializeIndices bp1 metadata");
CopyToBuffer(buffer, position, indices[firstRank].Buffer.data(),
headerSize);
CopyToBuffer(buffer, position, sorted.data(), sorted.size());
}
};
auto lf_MergeRankRange = [&](
const std::unordered_map<std::string, std::vector<SerialElementIndex>>
&nameRankIndices,
const std::vector<std::string> &names, const size_t start,
const size_t end)
{
for (size_t i = start; i < end; ++i)
{
auto itIndex = nameRankIndices.find(names[i]);
lf_MergeRank(itIndex->second);
}
};
// BODY OF FUNCTION STARTS HERE
if (m_Threads == 1) // serial version
{
for (const auto &rankIndices : nameRankIndices)
{
lf_MergeRank(rankIndices.second);
}
return;
}
// if threaded
const size_t elements = nameRankIndices.size();
const size_t stride = elements / m_Threads; // elements per thread
const size_t last = stride + elements % m_Threads; // remainder to last
std::vector<std::thread> threads;
threads.reserve(m_Threads);
// copy names in order to use threads
std::vector<std::string> names;
names.reserve(nameRankIndices.size());
for (const auto &nameRankIndexPair : nameRankIndices)
{
names.push_back(nameRankIndexPair.first);
}