diff --git a/source/adios2/core/adiosTemplates.h b/source/adios2/core/adiosTemplates.h index 702a5c1450a4a85d752c52f81acd7fa4578c21af..3c94ff392d5dfbcd2b66d230a5187874ccdc0b80 100644 --- a/source/adios2/core/adiosTemplates.h +++ b/source/adios2/core/adiosTemplates.h @@ -12,7 +12,8 @@ #define ADIOS2_CORE_ADIOSTEMPLATES_H_ /// \cond EXCLUDE_FROM_DOXYGEN -#include <cmath> //std::sqrt +#include <algorithm> //std::minmax_element +#include <cmath> //std::sqrt #include <complex> #include <cstring> //std::memcpy #include <iostream> @@ -22,6 +23,7 @@ /// \endcond #include "adios2/ADIOSConfig.h" +#include "adios2/ADIOSTypes.h" namespace adios { @@ -144,32 +146,69 @@ bool IsTypeAlias( return isAlias; } +/** + * Get the minimum and maximum values of an array, serial version + * @param values array of primitives, must hold at least one element + * @param size of the values array + * @param min from values + * @param max from values + */ +template <class T> +void GetMinMax(const T *values, const size_t size, T &min, T &max) noexcept +{ + auto bounds = std::minmax_element(values, values + size); + min = *bounds.first; + max = *bounds.second; +} + /** * Get the minimum and maximum values in one loop * @param values array of primitives * @param size of the values array * @param min from values * @param max from values - * @param nthreads threaded version not yet implemented + * @param threads number of threads; 0, 1 or more threads than elements + * run the serial GetMinMax */ template <class T> -inline void GetMinMax(const T *values, const std::size_t size, T &min, T &max, - const unsigned int nthreads = 1) noexcept +void GetMinMaxThreads(const T *values, const size_t size, T &min, T &max, + const unsigned int threads = 1) noexcept { - min = values[0]; - max = min; + // threads == 0 would divide by zero below, size < threads leaves empty chunks + if (threads <= 1 || size < threads) + { + GetMinMax(values, size, min, max); + return; + } + + const size_t stride = size / threads; // elements per thread + const size_t remainder = size % threads; // remainder if not aligned + const size_t last = stride + remainder; - for (std::size_t i = 1; i < size; ++i) + std::vector<T> mins(threads); // zero init + std::vector<T> maxs(threads); // zero init + + std::vector<std::thread> getMinMaxThreads; + getMinMaxThreads.reserve(threads); + + for (unsigned int t = 0; t < threads; ++t) { - if (values[i] < min) + const size_t
position = stride * t; + + if (t == threads - 1) { - min = values[i]; - continue; + getMinMaxThreads.push_back( + std::thread(adios::GetMinMax<T>, &values[position], last, + std::ref(mins[t]), std::ref(maxs[t]))); } + else + { + getMinMaxThreads.push_back( + std::thread(adios::GetMinMax<T>, &values[position], stride, + std::ref(mins[t]), std::ref(maxs[t]))); + } + } - if (values[i] > max) - max = values[i]; + for (auto &getMinMaxThread : getMinMaxThreads) + { + getMinMaxThread.join(); } + + auto itMin = std::min_element(mins.begin(), mins.end()); + min = *itMin; + + auto itMax = std::max_element(maxs.begin(), maxs.end()); + max = *itMax; } /** @@ -179,17 +218,16 @@ inline void GetMinMax(const T *values, const std::size_t size, T &min, T &max, * @param size of the values array * @param min modulus from values * @param max modulus from values - * @param nthreads */ template <class T> -inline void GetMinMax(const std::complex<T> *values, const std::size_t size, - T &min, T &max, const unsigned int nthreads = 1) noexcept +void GetMinMaxComplex(const std::complex<T> *values, const size_t size, T &min, + T &max) noexcept { min = std::norm(values[0]); max = min; - for (std::size_t i = 1; i < size; ++i) + for (size_t i = 1; i < size; ++i) { T norm = std::norm(values[i]); @@ -209,6 +247,56 @@ inline void GetMinMax(const std::complex<T> *values, const std::size_t size, max = std::sqrt(max); } +/** + * Threaded version of GetMinMaxComplex, each thread scans one contiguous chunk + * @param values array of complex numbers, must hold at least one element + * @param size of the values array + * @param min modulus from values + * @param max modulus from values + * @param threads number of threads; 0, 1 or more threads than elements + * run the serial GetMinMaxComplex + */ +template <class T> +void GetMinMaxThreads(const std::complex<T> *values, const size_t size, T &min, + T &max, const unsigned int threads = 1) noexcept +{ + // threads == 0 would divide by zero below, size < threads leaves empty chunks + if (threads <= 1 || size < threads) + { + GetMinMaxComplex(values, size, min, max); + return; + } + + const size_t stride = size / threads; // elements per thread + const size_t remainder = size % threads; // remainder if not aligned + const size_t last = stride + remainder; + + std::vector<T> mins(threads); // zero init + std::vector<T> maxs(threads); // zero init + + std::vector<std::thread> getMinMaxThreads; + getMinMaxThreads.reserve(threads); +
+ for (unsigned int t = 0; t < threads; ++t) + { + const size_t position = stride * t; + + if (t == threads - 1) + { + getMinMaxThreads.push_back( + std::thread(GetMinMaxComplex<T>, &values[position], last, + std::ref(mins[t]), std::ref(maxs[t]))); + } + else + { + getMinMaxThreads.push_back( + std::thread(GetMinMaxComplex<T>, &values[position], stride, + std::ref(mins[t]), std::ref(maxs[t]))); + } + } + + for (auto &getMinMaxThread : getMinMaxThreads) + { + getMinMaxThread.join(); + } + + auto itMin = std::min_element(mins.begin(), mins.end()); + min = *itMin; + + auto itMax = std::max_element(maxs.begin(), maxs.end()); + max = *itMax; +} + /** * threaded version of std::memcpy * @param dest @@ -217,13 +305,12 @@ inline void GetMinMax(const std::complex<T> *values, const std::size_t size, * @param nthreads */ template <class T, class U> -void MemcpyThreads(T *destination, const U *source, std::size_t count, - const unsigned int nthreads = 1) +void MemcpyThreads(T *destination, const U *source, size_t count, + const unsigned int threads = 1) { // do not decompose tasks to less than 4MB pieces - const std::size_t minBlockSize = 4194304; - const std::size_t maxNThreads = - std::max((std::size_t)nthreads, count / minBlockSize); + const size_t minBlockSize = 4194304; + const size_t maxNThreads = std::max((size_t)threads, count / minBlockSize); if (maxNThreads == 1) { @@ -281,20 +368,65 @@ void InsertToBuffer(std::vector<char> &buffer, const T *source, } /** - * Copies data to a specific location in the buffer, - * doesn't update vec.size() + * Copies data to a specific location in the buffer. + * Updates position. + * Does not update vec.size(). 
* @param raw * @param position * @param source * @param elements */ template <class T> -void CopyToBufferPosition(std::vector<char> &buffer, const std::size_t position, - const T *source, - const std::size_t elements = 1) noexcept +void CopyToBuffer(std::vector<char> &buffer, size_t &position, const T *source, + const size_t elements = 1) noexcept { const char *src = reinterpret_cast<const char *>(source); std::copy(src, src + elements * sizeof(T), buffer.begin() + position); + position += elements * sizeof(T); +} + +/** + * Threaded version of CopyToBuffer, source is split in contiguous chunks, + * one per thread. Copies starting at position and updates position past the + * copied bytes, same as the serial version. Does not update vec.size(). + * @param buffer destination, must already hold position + elements * sizeof(T) bytes + * @param position starting byte in buffer, advanced by elements * sizeof(T) + * @param source data to be copied + * @param elements number of elements of type T in source + * @param threads number of threads; 0, 1 or more threads than elements + * run the serial CopyToBuffer + */ +template <class T> +void CopyToBufferThreads(std::vector<char> &buffer, size_t &position, + const T *source, const size_t elements = 1, + const unsigned int threads = 1) noexcept +{ + // threads == 0 would divide by zero below, elements < threads leaves empty chunks + if (threads <= 1 || elements < threads) + { + CopyToBuffer(buffer, position, source, elements); + return; + } + + const size_t stride = elements / threads; // elements per thread + const size_t remainder = elements % threads; // remainder if not aligned + const size_t last = stride + remainder; + + // one position per thread, kept alive until every thread is joined + std::vector<size_t> bufferPositions(threads); + std::vector<std::thread> copyThreads; + copyThreads.reserve(threads); + + for (unsigned int t = 0; t < threads; ++t) + { + const size_t sourcePosition = stride * t; + // chunks start at the caller's position, not at the buffer start + bufferPositions[t] = position + sourcePosition * sizeof(T); + // last thread takes stride + remainder + const size_t count = (t == threads - 1) ? last : stride; + + copyThreads.push_back(std::thread(CopyToBuffer<T>, std::ref(buffer), + std::ref(bufferPositions[t]), + &source[sourcePosition], count)); + } + + for (auto &worker : copyThreads) + { + worker.join(); + } + + // all chunks are in place, advance the caller's position once + position += elements * sizeof(T); } template <class T> diff --git a/source/adios2/engine/bp/BPFileWriter.cpp b/source/adios2/engine/bp/BPFileWriter.cpp index ad033c333d7d0c5e507f0dd3e3ed04586df5f11a..afc99b1de499ef2441fc7c2ce53d8a9d9631da56 100644 --- a/source/adios2/engine/bp/BPFileWriter.cpp +++ b/source/adios2/engine/bp/BPFileWriter.cpp @@ -25,10 +25,8
@@ BPFileWriter::BPFileWriter(ADIOS &adios, const std::string &name, const Method &method) : Engine(adios, "BPFileWriter", name, accessMode, mpiComm, method, " BPFileWriter constructor (or call to ADIOS Open).\n"), - m_Heap(m_DebugMode), m_BP1Aggregator(m_MPIComm, m_DebugMode), - m_MaxBufferSize(m_Heap.m_Data.max_size()) + m_BP1Writer(mpiComm, m_DebugMode) { - m_MetadataSet.TimeStep = 1; // to be compatible with ADIOS1.x Init(); } @@ -242,10 +240,7 @@ void BPFileWriter::Write(const std::string & /*variableName*/, { } -void BPFileWriter::Advance(float /*timeout_sec*/) -{ - m_BP1Writer.Advance(m_MetadataSet, m_Heap); -} +void BPFileWriter::Advance(float /*timeout_sec*/) { m_BP1Writer.Advance(); } void BPFileWriter::Close(const int transportIndex) { @@ -253,44 +248,76 @@ void BPFileWriter::Close(const int transportIndex) if (transportIndex == -1) { for (auto &transport : m_Transports) - { // by reference or value or it doesn't matter? - m_BP1Writer.Close(m_MetadataSet, m_Heap, *transport, m_IsFirstClose, - false); // false: not using aggregation for now + { + // false: not using aggregation for now + m_BP1Writer.Close(*transport, m_IsFirstClose, false); } } else { - m_BP1Writer.Close(m_MetadataSet, m_Heap, *m_Transports[transportIndex], - m_IsFirstClose, - false); // false: not using aggregation for now + // false: not using aggregation for now + m_BP1Writer.Close(*m_Transports[transportIndex], m_IsFirstClose, false); } - if (m_MetadataSet.Log.IsActive == true) + bool allClose = true; + for (auto &transport : m_Transports) { - bool allClose = true; - for (auto &transport : m_Transports) + if (transport->m_IsOpen == true) { - if (transport->m_IsOpen == true) - { - allClose = false; - break; - } + allClose = false; + break; } - if (allClose == true) // aggregate and write profiling.log - { - const std::string rankLog = m_BP1Writer.GetRankProfilingLog( - m_RankMPI, m_MetadataSet, m_Transports); + } - const std::string fileName(m_BP1Writer.GetDirectoryName(m_Name) + -
"/profiling.log"); - m_BP1Aggregator.WriteProfilingLog(fileName, rankLog); - } + // NOTE(review): the Log.IsActive check that used to wrap this block is gone, + // confirm DumpProfilingLogFile copes with profiling being inactive + if (allClose == true) // aggregate and write profiling.log + { + m_BP1Writer.DumpProfilingLogFile(m_Name, m_RankMPI, m_Transports); } } // PRIVATE FUNCTIONS void BPFileWriter::InitParameters() { + // "profile_units": resolution of the buffering timer, activates profiling + auto itProfile = m_Method.m_Parameters.find("profile_units"); + if (itProfile != m_Method.m_Parameters.end()) + { + auto &log = m_BP1Writer.m_MetadataSet.Log; + + if (itProfile->second == "mus" || itProfile->second == "microseconds") + { + log.Timers.emplace_back("buffering", Support::Resolutions::mus); + } + else if (itProfile->second == "ms" || + itProfile->second == "milliseconds") + { + log.Timers.emplace_back("buffering", Support::Resolutions::ms); + } + else if (itProfile->second == "s" || itProfile->second == "seconds") + { + log.Timers.emplace_back("buffering", Support::Resolutions::s); + } + else if (itProfile->second == "min" || itProfile->second == "minutes") + { + log.Timers.emplace_back("buffering", Support::Resolutions::m); + } + else if (itProfile->second == "h" || itProfile->second == "hours") + { + log.Timers.emplace_back("buffering", Support::Resolutions::h); + } + else + { + if (m_DebugMode == true) + { + throw std::invalid_argument( + "ERROR: Method profile_units " + "argument must be mus, ms, s, min or h, in " + "call to Open or Engine constructor\n"); + } + } + + // Timers[0] is read whenever IsActive is true, so profiling stays off + // when no timer was created (unsupported units outside debug mode) + log.IsActive = !log.Timers.empty(); + } + auto itGrowthFactor = m_Method.m_Parameters.find("buffer_growth"); if (itGrowthFactor != m_Method.m_Parameters.end()) { @@ -307,7 +334,6 @@ void BPFileWriter::InitParameters() } m_BP1Writer.m_GrowthFactor = growthFactor; - m_GrowthFactor = growthFactor; // float } auto itMaxBufferSize = m_Method.m_Parameters.find("max_size_MB"); @@ -324,8 +350,9 @@ void BPFileWriter::InitParameters() } } - m_MaxBufferSize = std::stoul(itMaxBufferSize->second) * - 1048576; // convert from MB to bytes + // convert from MB to bytes + m_BP1Writer.m_MaxBufferSize = +
std::stoul(itMaxBufferSize->second) * 1048576; } auto itVerbosity = m_Method.m_Parameters.find("verbose"); @@ -344,46 +371,6 @@ void BPFileWriter::InitParameters() } m_BP1Writer.m_Verbosity = verbosity; } - - auto itProfile = m_Method.m_Parameters.find("profile_units"); - if (itProfile != m_Method.m_Parameters.end()) - { - auto &log = m_MetadataSet.Log; - - if (itProfile->second == "mus" || itProfile->second == "microseconds") - { - log.Timers.emplace_back("buffering", Support::Resolutions::mus); - } - else if (itProfile->second == "ms" || - itProfile->second == "milliseconds") - { - log.Timers.emplace_back("buffering", Support::Resolutions::ms); - } - else if (itProfile->second == "s" || itProfile->second == "seconds") - { - log.Timers.emplace_back("buffering", Support::Resolutions::s); - } - else if (itProfile->second == "min" || itProfile->second == "minutes") - { - log.Timers.emplace_back("buffering", Support::Resolutions::m); - } - else if (itProfile->second == "h" || itProfile->second == "hours") - { - log.Timers.emplace_back("buffering", Support::Resolutions::h); - } - else - { - if (m_DebugMode == true) - { - throw std::invalid_argument( - "ERROR: Method profile_buffering_units " - "argument must be mus, ms, s, min or h, in " - "call to Open or Engine constructor\n"); - } - } - - log.IsActive = true; - } } void BPFileWriter::InitTransports() @@ -399,6 +386,8 @@ void BPFileWriter::InitTransports() } } + bool setBuffer = false; // true once a file transport has been opened + for (const auto &parameters : m_Method.m_TransportParameters) { auto itProfile = parameters.find("profile_units"); @@ -460,6 +449,7 @@ void BPFileWriter::InitTransports() m_BP1Writer.OpenRankFiles(m_Name, m_AccessMode, *file); m_Transports.push_back(std::move(file)); + setBuffer = true; } else if (itLibrary->second == "FILE*" || itLibrary->second == "stdio") @@ -473,6 +463,7 @@ void BPFileWriter::InitTransports() m_BP1Writer.OpenRankFiles(m_Name, m_AccessMode, *file); m_Transports.push_back(std::move(file)); + setBuffer = true; } else if
(itLibrary->second == "fstream" || itLibrary->second == "std::fstream") @@ -487,6 +478,7 @@ void BPFileWriter::InitTransports() m_BP1Writer.OpenRankFiles(m_Name, m_AccessMode, *file); m_Transports.push_back(std::move(file)); + setBuffer = true; } else if (itLibrary->second == "MPI_File" || itLibrary->second == "MPI-IO") @@ -513,13 +505,27 @@ void BPFileWriter::InitTransports() } } } + + if (setBuffer == false) + { + if (m_DebugMode == true) + { + throw std::invalid_argument( + "ERROR: file transport not declared in Method " + "need call to Method.AddTransport, in " + + m_Name + m_EndMessage); + } + } + + // initial size is 16 MB (16777216 bytes), memory is initialized to zero + m_BP1Writer.m_Heap.ResizeData(16777216); } void BPFileWriter::InitProcessGroup() { - if (m_MetadataSet.Log.IsActive == true) + if (m_BP1Writer.m_MetadataSet.Log.IsActive == true) { - m_MetadataSet.Log.Timers[0].SetInitialTime(); + m_BP1Writer.m_MetadataSet.Log.Timers[0].SetInitialTime(); } if (m_AccessMode == "a") @@ -530,34 +536,19 @@ void BPFileWriter::InitProcessGroup() WriteProcessGroupIndex(); - if (m_MetadataSet.Log.IsActive == true) + if (m_BP1Writer.m_MetadataSet.Log.IsActive == true) { - m_MetadataSet.Log.Timers[0].SetTime(); + m_BP1Writer.m_MetadataSet.Log.Timers[0].SetTime(); } } void BPFileWriter::WriteProcessGroupIndex() { - // pg = process group - // const std::size_t pgIndexSize = m_BP1Writer.GetProcessGroupIndexSize( - // std::to_string( m_RankMPI ), - // std::to_string( - // m_MetadataSet.TimeStep - // ), - // m_Transports.size() - // ); - // metadata - // GrowBuffer( pgIndexSize, m_GrowthFactor, m_MetadataSet.PGIndex ); - - // data? Need to be careful, maybe add some trailing tolerance in variable - // ???? - // GrowBuffer( pgIndexSize, m_GrowthFactor, m_Buffer.m_Data ); - const bool isFortran = (m_HostLanguage == "Fortran") ?
true : false; m_BP1Writer.WriteProcessGroupIndex(isFortran, std::to_string(m_RankMPI), - static_cast<std::uint32_t>(m_RankMPI), - m_Transports, m_Heap, m_MetadataSet); + static_cast<uint32_t>(m_RankMPI), + m_Transports); } } // end namespace adios diff --git a/source/adios2/engine/bp/BPFileWriter.h b/source/adios2/engine/bp/BPFileWriter.h index 1c0c533a3fd4efd98e69a9aafda18742e353a7cb..343c3167c48b6f6f39dda8bdd85ddaa1fc55a789 100644 --- a/source/adios2/engine/bp/BPFileWriter.h +++ b/source/adios2/engine/bp/BPFileWriter.h @@ -32,7 +32,6 @@ public: * @param method * @param debugMode */ - BPFileWriter(ADIOS &adios, const std::string &name, const std::string accessMode, MPI_Comm mpiComm, const Method &method); @@ -98,31 +97,21 @@ public: void Close(const int transportIndex = -1); private: - capsule::STLVector m_Heap; ///< heap capsule using STL std::vector<char> - format::BP1Writer - m_BP1Writer; ///< format object will provide the required BP - /// functionality to be applied on m_Buffer and - /// m_Transports - format::BP1MetadataSet - m_MetadataSet; ///< metadata set accompanying the heap - /// buffer data in bp format. 
Needed by - /// m_BP1Writer - format::BP1Aggregator m_BP1Aggregator; - - bool m_IsFirstClose = - true; ///< set to false after first Close is reached so - /// metadata doesn't have to be accommodated for a - /// subsequent Close - std::size_t m_MaxBufferSize; ///< maximum allowed memory to be allocated - float m_GrowthFactor = 1.5; ///< capsule memory growth factor, new_memory = - /// m_GrowthFactor * current_memory - - bool m_TransportFlush = false; ///< true: due to buffer overflow - - bool m_CloseProcessGroup = - false; ///< set to true if advance is called, this - /// prevents flattening the data and metadata - /// in Close + format::BP1Writer m_BP1Writer; + + /** set to false after first Close is reached so metadata + doesn't have to be accommodated for a subsequent Close */ + bool m_IsFirstClose = true; + + /** data buffer exponential growth factor. NOTE(review): InitParameters now stores buffer_growth in m_BP1Writer.m_GrowthFactor only, confirm this member is still needed */ + float m_GrowthFactor = 1.5; + + /** true: due to buffer overflow */ + bool m_TransportFlush = false; + + /** set to true if advance is called, this + prevents flattening the data and metadata in Close*/ + bool m_CloseProcessGroup = false; void Init(); void InitParameters(); @@ -140,16 +129,20 @@ private: template <class T> void WriteVariableCommon(Variable<T> &variable, const T *values) { - if (m_MetadataSet.Log.IsActive == true) - m_MetadataSet.Log.Timers[0].SetInitialTime(); + if (m_BP1Writer.m_MetadataSet.Log.IsActive == true) + { + m_BP1Writer.m_MetadataSet.Log.Timers[0].SetInitialTime(); + } // set variable variable.m_AppValues = values; m_WrittenVariables.insert(variable.m_Name); - // if first timestep Write - if (m_MetadataSet.DataPGIsOpen == false) // create a new pg index + // if first timestep Write create a new pg index + if (m_BP1Writer.m_MetadataSet.DataPGIsOpen == false) + { WriteProcessGroupIndex(); + } // pre-calculate new metadata and payload sizes // m_TransportFlush = CheckBufferAllocation( @@ -160,7 +153,7 @@ private: // m_Buffer.m_Data ); // WRITE INDEX to data buffer and metadata structure
(in memory)// - m_BP1Writer.WriteVariableMetadata(variable, m_Heap, m_MetadataSet); + m_BP1Writer.WriteVariableMetadata(variable); if (m_TransportFlush == true) // in batches { @@ -172,14 +165,16 @@ private: } else // Write data to buffer { - m_BP1Writer.WriteVariablePayload(variable, m_Heap, m_nThreads); + m_BP1Writer.WriteVariablePayload(variable); } - variable.m_AppValues = - nullptr; // setting pointer to null as not needed after write + // not needed after write + variable.m_AppValues = nullptr; - if (m_MetadataSet.Log.IsActive == true) - m_MetadataSet.Log.Timers[0].SetTime(); + if (m_BP1Writer.m_MetadataSet.Log.IsActive == true) + { + m_BP1Writer.m_MetadataSet.Log.Timers[0].SetTime(); + } } }; diff --git a/source/adios2/engine/dataman/DataManReader.cpp b/source/adios2/engine/dataman/DataManReader.cpp index 6ecc32f09beff58eeeb04d064043f59f9109523f..e9349adad5708bc94a1492b35ae22b3196743cdb 100644 --- a/source/adios2/engine/dataman/DataManReader.cpp +++ b/source/adios2/engine/dataman/DataManReader.cpp @@ -24,8 +24,7 @@ DataManReader::DataManReader(ADIOS &adios, const std::string &name, const std::string accessMode, MPI_Comm mpiComm, const Method &method) : Engine(adios, "DataManReader", name, accessMode, mpiComm, method, - " DataManReader constructor (or call to ADIOS Open).\n"), - m_Heap(m_DebugMode) + " DataManReader constructor (or call to ADIOS Open).\n") { Init(); } diff --git a/source/adios2/engine/dataman/DataManReader.h b/source/adios2/engine/dataman/DataManReader.h index 4475c31059cf2bf86469f230b158968e9a8cf0c5..43588fbc89be78604b92fd9df9d438ac7207e0fd 100644 --- a/source/adios2/engine/dataman/DataManReader.h +++ b/source/adios2/engine/dataman/DataManReader.h @@ -13,13 +13,13 @@ #include <iostream> //std::cout << Needs to go +#include <DataMan.h> + #include "adios2/ADIOSConfig.h" #include "adios2/capsule/heap/STLVector.h" #include "adios2/core/Engine.h" #include "adios2/utilities/format/bp1/BP1Writer.h" -#include <DataMan.h> - namespace adios { @@ 
-100,13 +100,6 @@ public: void Close(const int transportIndex = -1); private: - capsule::STLVector - m_Heap; ///< heap capsule, contains data and metadata buffers - format::BP1Writer - m_BP1Writer; ///< format object will provide the required BP - /// functionality to be applied on m_Buffer and - /// m_Transports - bool m_DoRealTime = false; DataMan m_Man; std::function<void(const void *, std::string, std::string, std::string, diff --git a/source/adios2/engine/dataman/DataManWriter.cpp b/source/adios2/engine/dataman/DataManWriter.cpp index 1cf180ffe0fb5bbf3ec55ea8e8756ce948cdd4c5..bd4bf6aa3e5e994a4f4053996fc98c277501b6d8 100644 --- a/source/adios2/engine/dataman/DataManWriter.cpp +++ b/source/adios2/engine/dataman/DataManWriter.cpp @@ -24,8 +24,7 @@ DataManWriter::DataManWriter(ADIOS &adios, const std::string name, const std::string accessMode, MPI_Comm mpiComm, const Method &method) : Engine(adios, "DataManWriter", name, accessMode, mpiComm, method, - " DataManWriter constructor (or call to ADIOS Open).\n"), - m_Heap(m_DebugMode) + " DataManWriter constructor (or call to ADIOS Open).\n") { Init(); } diff --git a/source/adios2/engine/dataman/DataManWriter.h b/source/adios2/engine/dataman/DataManWriter.h index 44567dddff6d6fc6b497d17cfbe713b738c413db..9405cc22fbe858e06d3865c2f6809f21c378b750 100644 --- a/source/adios2/engine/dataman/DataManWriter.h +++ b/source/adios2/engine/dataman/DataManWriter.h @@ -14,13 +14,13 @@ #include <iostream> //std::cout must be removed, only used for hello example #include <unistd.h> //sleep must be removed +#include <DataMan.h> + #include "adios2/ADIOSConfig.h" #include "adios2/capsule/heap/STLVector.h" #include "adios2/core/Engine.h" #include "adios2/utilities/format/bp1/BP1Writer.h" -#include <DataMan.h> - namespace adios { @@ -97,13 +97,6 @@ public: void Close(const int transportIndex = -1); private: - capsule::STLVector - m_Heap; ///< heap capsule, contains data and metadata buffers - format::BP1Writer - m_BP1Writer; ///< format 
object will provide the required BP - /// functionality to be applied on m_Buffer and - /// m_Transports - bool m_DoRealTime = false; bool m_DoMonitor = false; DataMan m_Man; diff --git a/source/adios2/utilities/format/bp1/BP1Base.cpp b/source/adios2/utilities/format/bp1/BP1Base.cpp index 86da561c739e14a4855385f6d2bc9bc4203df254..43b1d293b6bbe94b2e36644a5ddf615ea420f8f8 100644 --- a/source/adios2/utilities/format/bp1/BP1Base.cpp +++ b/source/adios2/utilities/format/bp1/BP1Base.cpp @@ -10,13 +10,18 @@ #include "BP1Base.h" -#include "adios2/core/adiosFunctions.h" +#include "adios2/core/adiosFunctions.h" //CreateDirectory namespace adios { namespace format { +BP1Base::BP1Base(MPI_Comm mpiComm, const bool debugMode) +: m_BP1Aggregator(mpiComm, debugMode) +{ +} + std::string BP1Base::GetDirectoryName(const std::string name) const noexcept { std::string directory; @@ -37,20 +42,19 @@ void BP1Base::OpenRankFiles(const std::string name, const std::string accessMode, Transport &file) const { const std::string directory = GetDirectoryName(name); - CreateDirectory( - directory); // creates a directory and sub-directories recursively + // creates a directory and sub-directories recursively + CreateDirectory(directory); - std::string fileName(directory + "/" + directory + "." + - std::to_string(file.m_RankMPI)); - file.Open(fileName, accessMode); // opens a file transport under - // name.bp/name.bp.rank reserve that - // location for writing + // opens a file transport under name.bp/name.bp.rank + const std::string fileName(directory + "/" + directory + "." 
+ + std::to_string(file.m_RankMPI)); + file.Open(fileName, accessMode); } -std::vector<std::uint8_t> BP1Base::GetMethodIDs( +std::vector<uint8_t> BP1Base::GetMethodIDs( const std::vector<std::shared_ptr<Transport>> &transports) const noexcept { - auto lf_GetMethodID = [](const std::string method) -> std::uint8_t { + auto lf_GetMethodID = [](const std::string method) -> uint8_t { int id = METHOD_UNKNOWN; if (method == "NULL") id = METHOD_NULL; @@ -66,7 +70,7 @@ std::vector<std::uint8_t> BP1Base::GetMethodIDs( return id; }; - std::vector<std::uint8_t> methodIDs; + std::vector<uint8_t> methodIDs; methodIDs.reserve(transports.size()); for (const auto &transport : transports) diff --git a/source/adios2/utilities/format/bp1/BP1Base.h b/source/adios2/utilities/format/bp1/BP1Base.h index f9ce6493881909421cfca6ef094d9ba3e11b1f7d..80b89146202ed89d7f864256efe4564342f02301 100644 --- a/source/adios2/utilities/format/bp1/BP1Base.h +++ b/source/adios2/utilities/format/bp1/BP1Base.h @@ -12,14 +12,17 @@ #define ADIOS2_UTILITIES_FORMAT_BP1_BP1BASE_H_ /// \cond EXCLUDE_FROM_DOXYGEN -#include <cstdint> //std::uintX_t -#include <memory> //std::shared_ptr +#include <memory> //std::shared_ptr #include <unordered_map> #include <vector> /// \endcond +#include "BP1Aggregator.h" +#include "BP1Structs.h" #include "adios2/ADIOSConfig.h" +#include "adios2/ADIOSTypes.h" #include "adios2/ADIOS_MPI.h" +#include "adios2/capsule/heap/STLVector.h" #include "adios2/core/Transport.h" namespace adios @@ -34,6 +37,31 @@ class BP1Base { public: + /** statistics verbosity, only 0 is supported */ + unsigned int m_Verbosity = 0; + + /** contains data buffer and position */ + capsule::STLVector m_Heap = capsule::STLVector(true); + + /** memory growth factor */ + float m_GrowthFactor = 1.5; + + /** max buffer size */ + size_t m_MaxBufferSize = 0; + + /** contains bp1 format metadata */ + BP1MetadataSet m_MetadataSet; + + /** aggregation tasks */ + BP1Aggregator m_BP1Aggregator; + + /** + * Unique constructor 
+ * @param mpiComm for m_BP1Aggregator + * @param debugMode true: exceptions checks + */ + BP1Base(MPI_Comm mpiComm, const bool debugMode); + /** * Checks if input name has .bp extension and returns a .bp directory name * @param name input (might or not have .bp) @@ -53,37 +81,31 @@ public: Transport &file) const; protected: - /** - * method type for file I/O - */ + /** might be used in large payload copies to buffer */ + unsigned int m_Threads = 1; + + /** method type for file I/O */ enum IO_METHOD { - METHOD_UNKNOWN = -2 //!< ADIOS_METHOD_UNKNOWN - , - METHOD_NULL = -1 //!< ADIOS_METHOD_NULL - , - METHOD_MPI = 0 //!< METHOD_MPI - , + METHOD_UNKNOWN = -2, + METHOD_NULL = -1, + METHOD_MPI = 0, METHOD_DATATAP = 1 // OBSOLETE , - METHOD_POSIX = 2 //!< METHOD_POSIX - , - METHOD_DATASPACES = 3 //!< METHOD_DATASPACES - , + METHOD_POSIX = 2, + METHOD_DATASPACES = 3, METHOD_VTK = 4 // non-existent , METHOD_POSIX_ASCII = 5 // non-existent , METHOD_MPI_CIO = 6 // OBSOLETE , - METHOD_PHDF5 = 7 //!< METHOD_PHDF5 - , + METHOD_PHDF5 = 7, METHOD_PROVENANCE = 8 // OBSOLETE , METHOD_MPI_STRIPE = 9 // OBSOLETE , - METHOD_MPI_LUSTRE = 10 //!< METHOD_MPI_LUSTRE - , + METHOD_MPI_LUSTRE = 10, METHOD_MPI_STAGGER = 11 // OBSOLETE , METHOD_MPI_AGG = 12 // OBSOLETE @@ -92,28 +114,18 @@ protected: , METHOD_POSIX1 = 14 // OBSOLETE , - METHOD_NC4 = 15 //!< METHOD_NC4 - , - METHOD_MPI_AMR = 16 //!< METHOD_MPI_AMR - , + METHOD_NC4 = 15, + METHOD_MPI_AMR = 16, METHOD_MPI_AMR1 = 17 // OBSOLETE , - METHOD_FLEXPATH = 18 //!< METHOD_FLEXPATH - , - METHOD_NSSI_STAGING = 19 //!< METHOD_NSSI_STAGING - , - METHOD_NSSI_FILTER = 20 //!< METHOD_NSSI_FILTER - , - METHOD_DIMES = 21 //!< METHOD_DIMES - , - METHOD_VAR_MERGE = 22 //!< METHOD_VAR_MERGE - , - METHOD_MPI_BGQ = 23 //!< METHOD_MPI_BGQ - , - METHOD_ICEE = 24 //!< METHOD_ICEE - , - METHOD_COUNT = 25 //!< METHOD_COUNT - , + METHOD_FLEXPATH = 18, + METHOD_NSSI_STAGING = 19, + METHOD_NSSI_FILTER = 20, + METHOD_DIMES = 21, + METHOD_VAR_MERGE = 22, + 
METHOD_MPI_BGQ = 23, + METHOD_ICEE = 24, + METHOD_COUNT = 25, METHOD_FSTREAM = 26, METHOD_FILE = 27, METHOD_ZMQ = 28, @@ -182,14 +194,14 @@ protected: { T Min; T Max; - std::uint64_t Offset; - std::uint64_t PayloadOffset; - std::uint32_t TimeIndex; - std::uint32_t MemberID; - - // unsigned long int count; - // long double sum; - // long double sumSquare; + uint64_t Offset; + uint64_t PayloadOffset; + uint32_t TimeIndex; + uint32_t MemberID; + + // unsigned long int count; + // long double sum; + // long double sumSquare; // unsigned long int histogram // bool finite?? }; @@ -200,12 +212,12 @@ protected: * @return data type */ template <class T> - inline std::int8_t GetDataType() const noexcept + inline int8_t GetDataType() const noexcept { return type_unknown; } - std::vector<std::uint8_t> GetMethodIDs( + std::vector<uint8_t> GetMethodIDs( const std::vector<std::shared_ptr<Transport>> &transports) const noexcept; }; @@ -213,63 +225,63 @@ protected: // Moving template BP1Writer::GetDataType template specializations outside of // the class template <> -inline std::int8_t BP1Base::GetDataType<char>() const noexcept +inline int8_t BP1Base::GetDataType<char>() const noexcept { return type_byte; } template <> -inline std::int8_t BP1Base::GetDataType<short>() const noexcept +inline int8_t BP1Base::GetDataType<short>() const noexcept { return type_short; } template <> -inline std::int8_t BP1Base::GetDataType<int>() const noexcept +inline int8_t BP1Base::GetDataType<int>() const noexcept { return type_integer; } template <> -inline std::int8_t BP1Base::GetDataType<long int>() const noexcept +inline int8_t BP1Base::GetDataType<long int>() const noexcept { return type_long; } template <> -inline std::int8_t BP1Base::GetDataType<unsigned char>() const noexcept +inline int8_t BP1Base::GetDataType<unsigned char>() const noexcept { return type_unsigned_byte; } template <> -inline std::int8_t BP1Base::GetDataType<unsigned short>() const noexcept +inline int8_t 
BP1Base::GetDataType<unsigned short>() const noexcept { return type_unsigned_short; } template <> -inline std::int8_t BP1Base::GetDataType<unsigned int>() const noexcept +inline int8_t BP1Base::GetDataType<unsigned int>() const noexcept { return type_unsigned_integer; } template <> -inline std::int8_t BP1Base::GetDataType<unsigned long int>() const noexcept +inline int8_t BP1Base::GetDataType<unsigned long int>() const noexcept { return type_unsigned_long; } template <> -inline std::int8_t BP1Base::GetDataType<float>() const noexcept +inline int8_t BP1Base::GetDataType<float>() const noexcept { return type_real; } template <> -inline std::int8_t BP1Base::GetDataType<double>() const noexcept +inline int8_t BP1Base::GetDataType<double>() const noexcept { return type_double; } template <> -inline std::int8_t BP1Base::GetDataType<long double>() const noexcept +inline int8_t BP1Base::GetDataType<long double>() const noexcept { return type_long_double; } diff --git a/source/adios2/utilities/format/bp1/BP1Structs.h b/source/adios2/utilities/format/bp1/BP1Structs.h index 60a79a04adefac460ef621e3e83111ff4cf41bf1..825a446fbcde647c36f682c18808fa92b3128cf0 100644 --- a/source/adios2/utilities/format/bp1/BP1Structs.h +++ b/source/adios2/utilities/format/bp1/BP1Structs.h @@ -19,6 +19,7 @@ /// \endcond #include "adios2/ADIOSConfig.h" +#include "adios2/ADIOSTypes.h" #include "adios2/core/IOChrono.h" namespace adios @@ -34,11 +35,12 @@ namespace format struct BP1Index { std::vector<char> Buffer; ///< metadata variable index, start with 100Kb - std::uint64_t Count = - 0; ///< number of characteristics sets (time and spatial aggregation) - const std::uint32_t MemberID; + /** number of characteristics sets (time and spatial aggregation) */ + uint64_t Count = 0; + /** unique ID assigned to each variable for counter */ + const uint32_t MemberID; - BP1Index(const std::uint32_t memberID) : MemberID{memberID} + BP1Index(const uint32_t memberID) : MemberID(memberID) { Buffer.reserve(500); } 
@@ -49,9 +51,11 @@ struct BP1Index */ struct BP1MetadataSet { - std::uint32_t - TimeStep; ///< current time step, updated with advance step, if - /// append it will be updated to last, starts with one in ADIOS1 + /** + * updated with advance step, if append it will be updated to last, + * starts with one in ADIOS1 + */ + uint32_t TimeStep = 1; BP1Index PGIndex = BP1Index(0); ///< single buffer for PGIndex @@ -59,24 +63,23 @@ struct BP1MetadataSet /** @brief key: variable name, value: bp metadata variable index */ std::unordered_map<std::string, BP1Index> VarsIndices; - std::unordered_map<std::string, BP1Index> - AttributesIndices; ///< key: name, value: attribute bp index + /** @brief key: attribute name, value: bp metadata attribute index */ + std::unordered_map<std::string, BP1Index> AttributesIndices; const unsigned int MiniFooterSize = 28; ///< from bpls reader - // PG (relative) positions in Data buffer to be updated - std::uint64_t DataPGCount = 0; - std::size_t DataPGLengthPosition = 0; ///< current PG initial ( relative ) - /// position, needs to be updated in - /// every advance step or init - std::uint32_t DataPGVarsCount = 0; ///< variables in current PG - - /** - * current PG variable count ( relative ) position, needs to be - * updated in very Advance - */ - std::size_t DataPGVarsCountPosition = 0; - + // PG (relative) positions in Data buffer, to be updated every advance step + // or init + /** number of current PGs */ + uint64_t DataPGCount = 0; + /** current PG initial ( relative ) position in data buffer */ + size_t DataPGLengthPosition = 0; + /** number of variables in current PG */ + uint32_t DataPGVarsCount = 0; + /** current PG variable count ( relative ) position */ + size_t DataPGVarsCountPosition = 0; + + /** true: currently writing to a pg, false: no current pg */ bool DataPGIsOpen = false; profiling::IOChrono Log; ///< object that takes buffering profiling info diff --git a/source/adios2/utilities/format/bp1/BP1Writer.cpp 
b/source/adios2/utilities/format/bp1/BP1Writer.cpp index 6f42bc968b7b944be9565ed2030b3bf85e5b132f..76814161e571683ee84eedbc2748d2d6d2348234 100644 --- a/source/adios2/utilities/format/bp1/BP1Writer.cpp +++ b/source/adios2/utilities/format/bp1/BP1Writer.cpp @@ -19,27 +19,31 @@ namespace adios namespace format { +BP1Writer::BP1Writer(MPI_Comm mpiComm, const bool debugMode) +: BP1Base(mpiComm, debugMode) +{ +} + std::size_t BP1Writer::GetProcessGroupIndexSize( const std::string name, const std::string timeStepName, const std::size_t numberOfTransports) const noexcept { // pgIndex + list of methods (transports) return (name.length() + timeStepName.length() + 23) + - (3 + - numberOfTransports); // should be sufficient for data and metadata - // pgindices + (3 + numberOfTransports); } void BP1Writer::WriteProcessGroupIndex( - const bool isFortran, const std::string name, const std::uint32_t processID, - const std::vector<std::shared_ptr<Transport>> &transports, - capsule::STLVector &heap, BP1MetadataSet &metadataSet) const noexcept + const bool isFortran, const std::string name, const uint32_t processID, + const std::vector<std::shared_ptr<Transport>> &transports) noexcept { - std::vector<char> &metadataBuffer = metadataSet.PGIndex.Buffer; - std::vector<char> &dataBuffer = heap.m_Data; + std::vector<char> &metadataBuffer = m_MetadataSet.PGIndex.Buffer; - metadataSet.DataPGLengthPosition = dataBuffer.size(); - dataBuffer.insert(dataBuffer.end(), 8, 0); // skip pg length (8) + std::vector<char> &dataBuffer = m_Heap.m_Data; + size_t &dataPosition = m_Heap.m_DataPosition; + + m_MetadataSet.DataPGLengthPosition = dataPosition; + dataPosition += 8; // skip pg length (8) const std::size_t metadataPGLengthPosition = metadataBuffer.size(); metadataBuffer.insert(metadataBuffer.end(), 2, 0); // skip pg length (2) @@ -47,95 +51,93 @@ void BP1Writer::WriteProcessGroupIndex( // write name to metadata WriteNameRecord(name, metadataBuffer); // write if host language Fortran in metadata 
and data - const char hostFortran = - (isFortran) ? 'y' : 'n'; // if host language is fortran + const char hostFortran = (isFortran) ? 'y' : 'n'; InsertToBuffer(metadataBuffer, &hostFortran); - InsertToBuffer(dataBuffer, &hostFortran); + CopyToBuffer(dataBuffer, dataPosition, &hostFortran); // write name in data - WriteNameRecord(name, dataBuffer); + WriteNameRecord(name, dataBuffer, dataPosition); // processID in metadata, InsertToBuffer(metadataBuffer, &processID); // skip coordination var in data ....what is coordination var? - dataBuffer.insert(dataBuffer.end(), 4, 0); + dataPosition += 4; // time step name to metadata and data - const std::string timeStepName(std::to_string(metadataSet.TimeStep)); + const std::string timeStepName(std::to_string(m_MetadataSet.TimeStep)); WriteNameRecord(timeStepName, metadataBuffer); - WriteNameRecord(timeStepName, dataBuffer); + WriteNameRecord(timeStepName, dataBuffer, dataPosition); // time step to metadata and data - InsertToBuffer(metadataBuffer, &metadataSet.TimeStep); - InsertToBuffer(dataBuffer, &metadataSet.TimeStep); + InsertToBuffer(metadataBuffer, &m_MetadataSet.TimeStep); + CopyToBuffer(dataBuffer, dataPosition, &m_MetadataSet.TimeStep); // offset to pg in data in metadata which is the current absolute position - InsertToBuffer(metadataBuffer, reinterpret_cast<std::uint64_t *>( - &heap.m_DataAbsolutePosition)); + InsertToBuffer(metadataBuffer, reinterpret_cast<uint64_t *>( + &m_Heap.m_DataAbsolutePosition)); // Back to writing metadata pg index length (length of group) - const std::uint16_t metadataPGIndexLength = - metadataBuffer.size() - metadataPGLengthPosition - - 2; // without length of group record - CopyToBufferPosition(metadataBuffer, metadataPGLengthPosition, - &metadataPGIndexLength); + const uint16_t metadataPGIndexLength = + metadataBuffer.size() - metadataPGLengthPosition - 2; + size_t backPosition = metadataPGLengthPosition; + CopyToBuffer(metadataBuffer, backPosition, &metadataPGIndexLength); // DONE 
With metadataBuffer // here write method in data - const std::vector<std::uint8_t> methodIDs = GetMethodIDs(transports); - const std::uint8_t methodsCount = methodIDs.size(); - InsertToBuffer(dataBuffer, &methodsCount); // count - const std::uint16_t methodsLength = - methodIDs.size() * - 3; // methodID (1) + method params length(2), no parameters for now - InsertToBuffer(dataBuffer, &methodsLength); // length + const std::vector<uint8_t> methodIDs = GetMethodIDs(transports); + const uint8_t methodsCount = methodIDs.size(); + CopyToBuffer(dataBuffer, dataPosition, &methodsCount); // count + // methodID (1) + method params length(2), no parameters for now + const uint16_t methodsLength = methodIDs.size() * 3; + + CopyToBuffer(dataBuffer, dataPosition, &methodsLength); // length for (const auto methodID : methodIDs) { - InsertToBuffer(dataBuffer, &methodID); // method ID, - dataBuffer.insert(dataBuffer.end(), 2, - 0); // skip method params length = 0 (2 bytes) for now + CopyToBuffer(dataBuffer, dataPosition, &methodID); // method ID, + dataPosition += 2; // skip method params length = 0 (2 bytes) for now } // update absolute position - heap.m_DataAbsolutePosition += - dataBuffer.size() - metadataSet.DataPGLengthPosition; + m_Heap.m_DataAbsolutePosition += + dataPosition - m_MetadataSet.DataPGLengthPosition; // pg vars count and position - metadataSet.DataPGVarsCount = 0; - metadataSet.DataPGVarsCountPosition = dataBuffer.size(); + m_MetadataSet.DataPGVarsCount = 0; + m_MetadataSet.DataPGVarsCountPosition = dataPosition; // add vars count and length - dataBuffer.insert(dataBuffer.end(), 12, 0); - heap.m_DataAbsolutePosition += 12; // add vars count and length + dataPosition += 12; + m_Heap.m_DataAbsolutePosition += 12; // add vars count and length - ++metadataSet.DataPGCount; - metadataSet.DataPGIsOpen = true; + ++m_MetadataSet.DataPGCount; + m_MetadataSet.DataPGIsOpen = true; } -void BP1Writer::Advance(BP1MetadataSet &metadataSet, capsule::STLVector &buffer) -{ - 
FlattenData(metadataSet, buffer); -} +void BP1Writer::Advance() { FlattenData(); } -void BP1Writer::Close(BP1MetadataSet &metadataSet, capsule::STLVector &heap, - Transport &transport, bool &isFirstClose, - const bool doAggregation) const noexcept +void BP1Writer::Close(Transport &transport, bool &isFirstClose, + const bool doAggregation) noexcept { - if (metadataSet.Log.IsActive == true) - metadataSet.Log.Timers[0].SetInitialTime(); + if (m_MetadataSet.Log.IsActive == true) + { + m_MetadataSet.Log.Timers[0].SetInitialTime(); + } if (isFirstClose == true) { - if (metadataSet.DataPGIsOpen == true) - FlattenData(metadataSet, heap); + if (m_MetadataSet.DataPGIsOpen == true) + { + FlattenData(); + } - FlattenMetadata(metadataSet, heap); + FlattenMetadata(); - if (metadataSet.Log.IsActive == true) - metadataSet.Log.Timers[0].SetInitialTime(); + if (m_MetadataSet.Log.IsActive == true) + { + m_MetadataSet.Log.Timers[0].SetInitialTime(); + } - if (doAggregation == - true) // N-to-M where 1 <= M <= N-1, might need a new - // Log metadataSet.Log.m_Timers just for - // aggregation + // N-to-M where 1 <= M <= N-1, might need a new + // Log metadataSet.Log.m_Timers just for aggregation + if (doAggregation == true) { // here call aggregator } @@ -148,134 +150,101 @@ void BP1Writer::Close(BP1MetadataSet &metadataSet, capsule::STLVector &heap, } else // N-to-N { - transport.Write(heap.m_Data.data(), heap.m_Data.size()); // single write + // single write + transport.Write(m_Heap.m_Data.data(), m_Heap.m_Data.size()); transport.Close(); } } -std::string BP1Writer::GetRankProfilingLog( - const int rank, const BP1MetadataSet &metadataSet, - const std::vector<std::shared_ptr<Transport>> &transports) const noexcept +void BP1Writer::DumpProfilingLogFile( + const std::string name, const unsigned int rank, + const std::vector<std::shared_ptr<Transport>> &transports) noexcept { - auto lf_WriterTimer = [](std::string &rankLog, - const profiling::Timer &timer) { - rankLog += "'" + 
timer.m_Process + "_" + timer.GetUnits() + "': " + - std::to_string(timer.m_ProcessTime); - }; - - // prepare string dictionary per rank - std::string rankLog("'rank_" + std::to_string(rank) + "': { "); - - auto &profiler = metadataSet.Log; - rankLog += "'bytes': " + std::to_string(profiler.TotalBytes[0]) + ", "; - lf_WriterTimer(rankLog, profiler.Timers[0]); - rankLog += ", "; + const std::string fileName(GetDirectoryName(name) + "/profiling.log"); + const std::string rankLog = GetRankProfilingLog(rank, transports); + m_BP1Aggregator.WriteProfilingLog(fileName, rankLog); +} - for (unsigned int t = 0; t < transports.size(); ++t) +// PRIVATE FUNCTIONS +void BP1Writer::WriteDimensionsRecord( + const std::vector<size_t> &localDimensions, + const std::vector<size_t> &globalDimensions, + const std::vector<size_t> &offsets, std::vector<char> &buffer) noexcept +{ + if (offsets.empty() == true) { - rankLog += "'transport_" + std::to_string(t) + "': { "; - rankLog += "'lib': '" + transports[t]->m_Type + "', "; - - for (unsigned int i = 0; i < 3; ++i) + for (const auto &localDimension : localDimensions) { - lf_WriterTimer(rankLog, transports[t]->m_Profiler.Timers[i]); - if (i < 2) - { - rankLog += ", "; - } - else - { - rankLog += " "; - } - } - - if (t == transports.size() - 1) // last element - { - rankLog += "}"; + InsertToBuffer(buffer, + reinterpret_cast<const uint64_t *>(&localDimension)); + buffer.insert(buffer.end(), 2 * sizeof(uint64_t), 0); } - else + } + else + { + for (unsigned int d = 0; d < localDimensions.size(); ++d) { - rankLog += "},"; + InsertToBuffer(buffer, reinterpret_cast<const uint64_t *>( + &localDimensions[d])); + InsertToBuffer(buffer, reinterpret_cast<const uint64_t *>( + &globalDimensions[d])); + InsertToBuffer(buffer, + reinterpret_cast<const uint64_t *>(&offsets[d])); } } - rankLog += " },"; - - return rankLog; } -// PRIVATE FUNCTIONS void BP1Writer::WriteDimensionsRecord( - std::vector<char> &buffer, const std::vector<std::size_t> 
&localDimensions, - const std::vector<std::size_t> &globalDimensions, - const std::vector<std::size_t> &globalOffsets, const unsigned int skip, - const bool addType) const noexcept + const std::vector<size_t> &localDimensions, + const std::vector<size_t> &globalDimensions, + const std::vector<size_t> &offsets, const unsigned int skip, + std::vector<char> &buffer, size_t &position) noexcept { - auto lf_WriteFlaggedDim = [](std::vector<char> &buffer, const char no, - const std::size_t dimension) { - InsertToBuffer(buffer, &no); - InsertToBuffer(buffer, - reinterpret_cast<const std::uint64_t *>(&dimension)); + auto lf_WriteFlaggedDim = [](std::vector<char> &buffer, size_t &position, + const size_t dimension) { + constexpr char no = 'n'; + CopyToBuffer(buffer, position, &no); + CopyToBuffer(buffer, position, + reinterpret_cast<const uint64_t *>(&dimension)); }; // BODY Starts here - if (globalDimensions.empty()) + if (offsets.empty() == true) { - if (addType == true) + for (const auto &localDimension : localDimensions) { - constexpr char no = - 'n'; // dimension format unsigned int value (not using - // memberID for now) - for (const auto &localDimension : localDimensions) - { - lf_WriteFlaggedDim(buffer, no, localDimension); - buffer.insert(buffer.end(), skip, 0); - } - } - else - { - for (const auto &localDimension : localDimensions) - { - InsertToBuffer(buffer, reinterpret_cast<const std::uint64_t *>( - &localDimension)); - buffer.insert(buffer.end(), skip, 0); - } + lf_WriteFlaggedDim(buffer, position, localDimension); + position += skip; } } else { - if (addType == true) + for (unsigned int d = 0; d < localDimensions.size(); ++d) { - constexpr char no = 'n'; - for (unsigned int d = 0; d < localDimensions.size(); ++d) - { - lf_WriteFlaggedDim(buffer, no, localDimensions[d]); - lf_WriteFlaggedDim(buffer, no, globalDimensions[d]); - lf_WriteFlaggedDim(buffer, no, globalOffsets[d]); - } - } - else - { - for (unsigned int d = 0; d < localDimensions.size(); ++d) - { - 
InsertToBuffer(buffer, reinterpret_cast<const std::uint64_t *>( - &localDimensions[d])); - InsertToBuffer(buffer, reinterpret_cast<const std::uint64_t *>( - &globalDimensions[d])); - InsertToBuffer(buffer, reinterpret_cast<const std::uint64_t *>( - &globalOffsets[d])); - } + lf_WriteFlaggedDim(buffer, position, localDimensions[d]); + lf_WriteFlaggedDim(buffer, position, globalDimensions[d]); + lf_WriteFlaggedDim(buffer, position, offsets[d]); } } } void BP1Writer::WriteNameRecord(const std::string name, - std::vector<char> &buffer) const noexcept + std::vector<char> &buffer) noexcept { - const std::uint16_t length = name.length(); + const uint16_t length = name.length(); InsertToBuffer(buffer, &length); InsertToBuffer(buffer, name.c_str(), length); } +void BP1Writer::WriteNameRecord(const std::string name, + std::vector<char> &buffer, + size_t &position) noexcept +{ + const uint16_t length = name.length(); + CopyToBuffer(buffer, position, &length); + CopyToBuffer(buffer, position, name.c_str(), length); +} + BP1Index & BP1Writer::GetBP1Index(const std::string name, std::unordered_map<std::string, BP1Index> &indices, @@ -293,140 +262,196 @@ BP1Writer::GetBP1Index(const std::string name, return itName->second; } -void BP1Writer::FlattenData(BP1MetadataSet &metadataSet, - capsule::STLVector &heap) const noexcept +void BP1Writer::FlattenData() noexcept { - auto &buffer = heap.m_Data; + auto &buffer = m_Heap.m_Data; + auto &position = m_Heap.m_DataPosition; + // vars count and Length (only for PG) - CopyToBufferPosition(buffer, metadataSet.DataPGVarsCountPosition, - &metadataSet.DataPGVarsCount); - const std::uint64_t varsLength = buffer.size() - - metadataSet.DataPGVarsCountPosition - 8 - - 4; // without record itself and vars count - CopyToBufferPosition(buffer, metadataSet.DataPGVarsCountPosition + 4, - &varsLength); + CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition, + &m_MetadataSet.DataPGVarsCount); + // without record itself and vars count + const 
uint64_t varsLength = + position - m_MetadataSet.DataPGVarsCountPosition - 8 - 4; + CopyToBuffer(buffer, m_MetadataSet.DataPGVarsCountPosition, &varsLength); // attributes (empty for now) count (4) and length (8) are zero by moving // positions in time step zero - buffer.insert(buffer.end(), 12, 0); - heap.m_DataAbsolutePosition += 12; + position += 12; + m_Heap.m_DataAbsolutePosition += 12; // Finish writing pg group length - const std::uint64_t dataPGLength = - buffer.size() - metadataSet.DataPGLengthPosition - - 8; // without record itself, 12 due to empty attributes - CopyToBufferPosition(buffer, metadataSet.DataPGLengthPosition, - &dataPGLength); - - ++metadataSet.TimeStep; - metadataSet.DataPGIsOpen = false; + // without record itself, 12 due to empty attributes + const uint64_t dataPGLength = + position - m_MetadataSet.DataPGLengthPosition - 8; + CopyToBuffer(buffer, m_MetadataSet.DataPGLengthPosition, &dataPGLength); + + ++m_MetadataSet.TimeStep; + m_MetadataSet.DataPGIsOpen = false; } -void BP1Writer::FlattenMetadata(BP1MetadataSet &metadataSet, - capsule::STLVector &heap) const noexcept +void BP1Writer::FlattenMetadata() noexcept { auto lf_IndexCountLength = - [](std::unordered_map<std::string, BP1Index> &indices, - std::uint32_t &count, std::uint64_t &length) { + [](std::unordered_map<std::string, BP1Index> &indices, uint32_t &count, + uint64_t &length) { count = indices.size(); length = 0; for (auto &indexPair : indices) // set each index length { auto &indexBuffer = indexPair.second.Buffer; - const std::uint32_t indexLength = indexBuffer.size() - 4; - CopyToBufferPosition(indexBuffer, 0, &indexLength); + const uint32_t indexLength = indexBuffer.size() - 4; + size_t indexLengthPosition = 0; + CopyToBuffer(indexBuffer, indexLengthPosition, &indexLength); length += indexBuffer.size(); // overall length } }; auto lf_FlattenIndices = - [](const std::uint32_t count, const std::uint64_t length, + [](const uint32_t count, const uint64_t length, const 
std::unordered_map<std::string, BP1Index> &indices, - std::vector<char> &buffer) { - InsertToBuffer(buffer, &count); - InsertToBuffer(buffer, &length); + std::vector<char> &buffer, size_t &position) { + + CopyToBuffer(buffer, position, &count); + CopyToBuffer(buffer, position, &length); for (const auto &indexPair : indices) // set each index length { const auto &indexBuffer = indexPair.second.Buffer; - InsertToBuffer(buffer, indexBuffer.data(), indexBuffer.size()); + CopyToBuffer(buffer, position, indexBuffer.data(), + indexBuffer.size()); } }; // Finish writing metadata counts and lengths // PG Index - const std::uint64_t pgCount = metadataSet.DataPGCount; - const std::uint64_t pgLength = metadataSet.PGIndex.Buffer.size(); + const uint64_t pgCount = m_MetadataSet.DataPGCount; + const uint64_t pgLength = m_MetadataSet.PGIndex.Buffer.size(); // var index count and length (total), and each index length - std::uint32_t varsCount; - std::uint64_t varsLength; - lf_IndexCountLength(metadataSet.VarsIndices, varsCount, varsLength); + uint32_t varsCount; + uint64_t varsLength; + lf_IndexCountLength(m_MetadataSet.VarsIndices, varsCount, varsLength); // attribute index count and length, and each index length - std::uint32_t attributesCount; - std::uint64_t attributesLength; - lf_IndexCountLength(metadataSet.AttributesIndices, attributesCount, + uint32_t attributesCount; + uint64_t attributesLength; + lf_IndexCountLength(m_MetadataSet.AttributesIndices, attributesCount, attributesLength); - const std::size_t footerSize = (pgLength + 16) + (varsLength + 12) + - (attributesLength + 12) + - metadataSet.MiniFooterSize; - auto &buffer = heap.m_Data; - buffer.reserve(buffer.size() + footerSize); // reserve data to fit metadata, - // must replace with growth buffer - // strategy + const size_t footerSize = (pgLength + 16) + (varsLength + 12) + + (attributesLength + 12) + + m_MetadataSet.MiniFooterSize; + + auto &buffer = m_Heap.m_Data; + auto &position = m_Heap.m_DataPosition; + + // 
reserve data to fit metadata, + buffer.resize(position + footerSize); + // must replace with growth buffer strategy? // write pg index - InsertToBuffer(buffer, &pgCount); - InsertToBuffer(buffer, &pgLength); - InsertToBuffer(buffer, metadataSet.PGIndex.Buffer.data(), pgLength); + CopyToBuffer(buffer, position, &pgCount); + CopyToBuffer(buffer, position, &pgLength); + CopyToBuffer(buffer, position, m_MetadataSet.PGIndex.Buffer.data(), + pgLength); + // Vars indices - lf_FlattenIndices(varsCount, varsLength, metadataSet.VarsIndices, buffer); + lf_FlattenIndices(varsCount, varsLength, m_MetadataSet.VarsIndices, buffer, + position); // Attribute indices lf_FlattenIndices(attributesCount, attributesLength, - metadataSet.AttributesIndices, buffer); + m_MetadataSet.AttributesIndices, buffer, position); // getting absolute offsets, minifooter is 28 bytes for now - const std::uint64_t offsetPGIndex = heap.m_DataAbsolutePosition; - const std::uint64_t offsetVarsIndex = offsetPGIndex + (pgLength + 16); - const std::uint64_t offsetAttributeIndex = - offsetVarsIndex + (varsLength + 12); + const uint64_t offsetPGIndex = m_Heap.m_DataAbsolutePosition; + const uint64_t offsetVarsIndex = offsetPGIndex + (pgLength + 16); + const uint64_t offsetAttributeIndex = offsetVarsIndex + (varsLength + 12); - InsertToBuffer(buffer, &offsetPGIndex); - InsertToBuffer(buffer, &offsetVarsIndex); - InsertToBuffer(buffer, &offsetAttributeIndex); + CopyToBuffer(buffer, position, &offsetPGIndex); + CopyToBuffer(buffer, position, &offsetVarsIndex); + CopyToBuffer(buffer, position, &offsetAttributeIndex); // version if (IsLittleEndian()) { - const std::uint8_t endian = 0; - InsertToBuffer(buffer, &endian); - buffer.insert(buffer.end(), 2, 0); - InsertToBuffer(buffer, &m_Version); + const uint8_t endian = 0; + CopyToBuffer(buffer, position, &endian); + position += 2; + CopyToBuffer(buffer, position, &m_Version); } else { } - heap.m_DataAbsolutePosition += footerSize; + m_Heap.m_DataAbsolutePosition += 
footerSize; - if (metadataSet.Log.IsActive == true) - metadataSet.Log.TotalBytes.push_back(heap.m_DataAbsolutePosition); + if (m_MetadataSet.Log.IsActive == true) + { + m_MetadataSet.Log.TotalBytes.push_back(m_Heap.m_DataAbsolutePosition); + } +} + +std::string BP1Writer::GetRankProfilingLog( + const unsigned int rank, + const std::vector<std::shared_ptr<Transport>> &transports) noexcept +{ + auto lf_WriterTimer = [](std::string &rankLog, + const profiling::Timer &timer) { + rankLog += "'" + timer.m_Process + "_" + timer.GetUnits() + "': " + + std::to_string(timer.m_ProcessTime); + }; + + // prepare string dictionary per rank + std::string rankLog("'rank_" + std::to_string(rank) + "': { "); + + auto &profiler = m_MetadataSet.Log; + rankLog += "'bytes': " + std::to_string(profiler.TotalBytes[0]) + ", "; + lf_WriterTimer(rankLog, profiler.Timers[0]); + rankLog += ", "; + + for (unsigned int t = 0; t < transports.size(); ++t) + { + rankLog += "'transport_" + std::to_string(t) + "': { "; + rankLog += "'lib': '" + transports[t]->m_Type + "', "; + + for (unsigned int i = 0; i < 3; ++i) + { + lf_WriterTimer(rankLog, transports[t]->m_Profiler.Timers[i]); + if (i < 2) + { + rankLog += ", "; + } + else + { + rankLog += " "; + } + } + + if (t == transports.size() - 1) // last element + { + rankLog += "}"; + } + else + { + rankLog += "},"; + } + } + rankLog += " },"; + + return rankLog; } //------------------------------------------------------------------------------ -// Explicit instantiaiton of public tempaltes +// Explicit instantiation of only public templates #define declare_template_instantiation(T) \ template void BP1Writer::WriteVariablePayload( \ - const Variable<T> &variable, capsule::STLVector &heap, \ - const unsigned int nthreads) const noexcept; \ + const Variable<T> &variable) noexcept; \ \ template void BP1Writer::WriteVariableMetadata( \ - const Variable<T> &variable, capsule::STLVector &heap, \ - BP1MetadataSet &metadataSet) const noexcept; + const 
Variable<T> &variable) noexcept; ADIOS_FOREACH_TYPE_1ARG(declare_template_instantiation) #undef declare_template_instantiation diff --git a/source/adios2/utilities/format/bp1/BP1Writer.h b/source/adios2/utilities/format/bp1/BP1Writer.h index ba898ea1e118e0d428227bbed2c074e05fb61641..afffaa187de0b13c02c86bf6513e019a1d398a0f 100644 --- a/source/adios2/utilities/format/bp1/BP1Writer.h +++ b/source/adios2/utilities/format/bp1/BP1Writer.h @@ -36,10 +36,12 @@ class BP1Writer : public BP1Base { public: - unsigned int m_Threads = 1; ///< thread operations in large array (min,max) - unsigned int m_Verbosity = 0; ///< statistics verbosity, only 0 is supported - float m_GrowthFactor = 1.5; ///< memory growth factor - const std::uint8_t m_Version = 3; ///< BP format version + /** + * Unique constructor + * @param mpiComm MPI communicator for BP1 Aggregator + * @param debug true: extra checks + */ + BP1Writer(MPI_Comm mpiComm, const bool debugMode = false); /** * Calculates the Process Index size in bytes according to the BP format, @@ -49,9 +51,10 @@ public: * @param numberOfTransports * @return size of pg index */ - std::size_t GetProcessGroupIndexSize( - const std::string name, const std::string timeStepName, - const std::size_t numberOfTransports) const noexcept; + size_t GetProcessGroupIndexSize(const std::string name, + const std::string timeStepName, + const size_t numberOfTransports) const + noexcept; /** * Writes a process group index PGIndex and list of methods (from @@ -66,10 +69,8 @@ public: * @param metadataSet */ void WriteProcessGroupIndex( - const bool isFortran, const std::string name, - const std::uint32_t processID, - const std::vector<std::shared_ptr<Transport>> &transports, - capsule::STLVector &heap, BP1MetadataSet &metadataSet) const noexcept; + const bool isFortran, const std::string name, const uint32_t processID, + const std::vector<std::shared_ptr<Transport>> &transports) noexcept; /** * Returns the estimated variable index size @@ -80,8 +81,7 @@ 
public: * @return variable index size */ template <class T> - std::size_t GetVariableIndexSize(const Variable<T> &variable) const - noexcept; + size_t GetVariableIndexSize(const Variable<T> &variable) const noexcept; /** * Write metadata for a given variable @@ -90,9 +90,7 @@ public: * @param metadataSet */ template <class T> - void WriteVariableMetadata(const Variable<T> &variable, - capsule::STLVector &heap, - BP1MetadataSet &metadataSet) const noexcept; + void WriteVariableMetadata(const Variable<T> &variable) noexcept; /** * Expensive part this is only for heap buffers need to adapt to vector of @@ -101,16 +99,14 @@ public: * @param buffer */ template <class T> - void WriteVariablePayload(const Variable<T> &variable, - capsule::STLVector &heap, - const unsigned int nthreads = 1) const noexcept; + void WriteVariablePayload(const Variable<T> &variable) noexcept; /** * Flattens data * @param metadataSet * @param buffer */ - void Advance(BP1MetadataSet &metadataSet, capsule::STLVector &buffer); + void Advance(); /** * Function that sets metadata (if first close) and writes to a single @@ -121,53 +117,62 @@ public: * @param isFirstClose true: metadata has been set and aggregated * @param doAggregation true: for N-to-M, false: for N-to-N */ - void Close(BP1MetadataSet &metadataSet, capsule::STLVector &heap, - Transport &transport, bool &isFirstClose, - const bool doAggregation) const noexcept; + void Close(Transport &transport, bool &isFirstClose, + const bool doAggregation) noexcept; + + void DumpProfilingLogFile( + const std::string name, const unsigned int rank, + const std::vector<std::shared_ptr<Transport>> &transports) noexcept; + +private: + /** BP format version */ + const std::uint8_t m_Version = 3; /** - * Writes the ADIOS log information (buffering, open, write and close) for a - * rank process - * @param rank current rank - * @param metadataSet contains Profile info for buffering - * @param transports contains Profile info for transport open, writes and - 
* close - * @return string for this rank that will be aggregated into profiling.log + * Get variable statistics + * @param variable + * @return stats */ - std::string GetRankProfilingLog( - const int rank, const BP1MetadataSet &metadataSet, - const std::vector<std::shared_ptr<Transport>> &transports) const - noexcept; + template <class T> + Stats<typename TypeInfo<T>::ValueType> + GetStats(const Variable<T> &variable) const noexcept; -private: template <class T> void WriteVariableMetadataInData( const Variable<T> &variable, - const Stats<typename TypeInfo<T>::ValueType> &stats, - capsule::STLVector &heap) const noexcept; + const Stats<typename TypeInfo<T>::ValueType> &stats) noexcept; template <class T> void WriteVariableMetadataInIndex( const Variable<T> &variable, const Stats<typename TypeInfo<T>::ValueType> &stats, const bool isNew, - BP1Index &index) const noexcept; + BP1Index &index) noexcept; template <class T> void WriteVariableCharacteristics( const Variable<T> &variable, const Stats<typename TypeInfo<T>::ValueType> &stats, - std::vector<char> &buffer, const bool addLength = false) const noexcept; + std::vector<char> &buffer) noexcept; + + template <class T> + void WriteVariableCharacteristics( + const Variable<T> &variable, + const Stats<typename TypeInfo<T>::ValueType> &stats, + std::vector<char> &buffer, size_t &position) noexcept; /** * Writes from &buffer[position]: [2 * bytes:string.length()][string.length(): * string.c_str()] - * @param name - * @param buffer - * @param position + * @param name to be written in bp file + * @param buffer metadata buffer */ void WriteNameRecord(const std::string name, - std::vector<char> &buffer) const noexcept; + std::vector<char> &buffer) noexcept; + + /** Overloaded version for data buffer */ + void WriteNameRecord(const std::string name, std::vector<char> &buffer, + size_t &position) noexcept; /** * Write a dimension record for a global variable used by @@ -181,27 +186,31 @@ private: * data * characteristic */ - 
void WriteDimensionsRecord(std::vector<char> &buffer, - const std::vector<std::size_t> &localDimensions, - const std::vector<std::size_t> &globalDimensions, - const std::vector<std::size_t> &offsets, + void WriteDimensionsRecord(const std::vector<size_t> &localDimensions, + const std::vector<size_t> &globalDimensions, + const std::vector<size_t> &offsets, + std::vector<char> &buffer) noexcept; + + /** Overloaded version for data buffer */ + void WriteDimensionsRecord(const std::vector<size_t> &localDimensions, + const std::vector<size_t> &globalDimensions, + const std::vector<size_t> &offsets, const unsigned int skip, - const bool addType = false) const noexcept; + std::vector<char> &buffer, + size_t &position) noexcept; - /** - * Get variable statistics - * @param variable - * @return stats - */ + /** Writes min max */ template <class T> - Stats<typename TypeInfo<T>::ValueType> - GetStats(const Variable<T> &variable) const noexcept; + void WriteBoundsRecord(const bool isScalar, const Stats<T> &stats, + uint8_t &characteristicsCounter, + std::vector<char> &buffer) noexcept; + /** Overloaded version for data buffer */ template <class T> void WriteBoundsRecord(const bool isScalar, const Stats<T> &stats, + uint8_t &characteristicsCounter, std::vector<char> &buffer, - std::uint8_t &characteristicsCounter, - const bool addLength) const noexcept; + size_t &position) noexcept; /** * Write a characteristic value record to buffer @@ -213,10 +222,17 @@ private: * @param addLength true for data, false for metadata */ template <class T> - void WriteCharacteristicRecord(const std::uint8_t characteristicID, + void WriteCharacteristicRecord(const uint8_t characteristicID, + uint8_t &characteristicsCounter, + const T &value, + std::vector<char> &buffer) noexcept; + + /** Overloaded version for data buffer */ + template <class T> + void WriteCharacteristicRecord(const uint8_t characteristicID, + uint8_t &characteristicsCounter, const T &value, std::vector<char> &buffer, - 
std::uint8_t &characteristicsCounter, - const bool addLength = false) const noexcept; + size_t &position) noexcept; /** * Returns corresponding index of type BP1Index, if doesn't exists creates a @@ -237,26 +253,35 @@ private: * @param metadataSet * @param buffer */ - void FlattenData(BP1MetadataSet &metadataSet, - capsule::STLVector &buffer) const noexcept; + void FlattenData() noexcept; /** * Flattens the metadata indices into a single metadata buffer in capsule * @param metadataSet * @param buffer */ - void FlattenMetadata(BP1MetadataSet &metadataSet, - capsule::STLVector &buffer) const noexcept; + void FlattenMetadata() noexcept; + + /** + * Writes the ADIOS log information (buffering, open, write and close) for a + * rank process + * @param rank current rank + * @param metadataSet contains Profile info for buffering + * @param transports contains Profile info for transport open, writes and + * close + * @return string for this rank that will be aggregated into profiling.log + */ + std::string GetRankProfilingLog( + const unsigned int rank, + const std::vector<std::shared_ptr<Transport>> &transports) noexcept; }; #define declare_template_instantiation(T) \ extern template void BP1Writer::WriteVariablePayload( \ - const Variable<T> &variable, capsule::STLVector &heap, \ - const unsigned int nthreads) const noexcept; \ + const Variable<T> &variable) noexcept; \ \ extern template void BP1Writer::WriteVariableMetadata( \ - const Variable<T> &variable, capsule::STLVector &heap, \ - BP1MetadataSet &metadataSet) const noexcept; + const Variable<T> &variable) noexcept; ADIOS_FOREACH_TYPE_1ARG(declare_template_instantiation) #undef declare_template_instantiation diff --git a/source/adios2/utilities/format/bp1/BP1Writer.tcc b/source/adios2/utilities/format/bp1/BP1Writer.tcc index 1e5f0fb4fb914256dc61b28e6ecea6bd2d720470..dab05715c04ba2f2d96320259daa3313020b1b1a 100644 --- a/source/adios2/utilities/format/bp1/BP1Writer.tcc +++ 
b/source/adios2/utilities/format/bp1/BP1Writer.tcc @@ -19,19 +19,18 @@ namespace format // PUBLIC template <class T> -std::size_t BP1Writer::GetVariableIndexSize(const Variable<T> &variable) const +size_t BP1Writer::GetVariableIndexSize(const Variable<T> &variable) const noexcept { // size_t indexSize = varEntryLength + memberID + lengthGroupName + // groupName + lengthVariableName + lengthOfPath + path + datatype - std::size_t indexSize = 23; // without characteristics + size_t indexSize = 23; // without characteristics indexSize += variable.m_Name.size(); // characteristics 3 and 4, check variable number of dimensions - const std::size_t dimensions = - variable.DimensionsSize(); // commas in CSV + 1 - indexSize += 28 * dimensions; // 28 bytes per dimension - indexSize += 1; // id + const size_t dimensions = variable.DimensionsSize(); + indexSize += 28 * dimensions; // 28 bytes per dimension + indexSize += 1; // id // characteristics, offset + payload offset in data indexSize += 2 * (1 + 8); @@ -57,39 +56,35 @@ std::size_t BP1Writer::GetVariableIndexSize(const Variable<T> &variable) const } template <class T> -void BP1Writer::WriteVariableMetadata(const Variable<T> &variable, - capsule::STLVector &heap, - BP1MetadataSet &metadataSet) const - noexcept +void BP1Writer::WriteVariableMetadata(const Variable<T> &variable) noexcept { Stats<typename TypeInfo<T>::ValueType> stats = GetStats(variable); - stats.TimeIndex = metadataSet.TimeStep; + stats.TimeIndex = m_MetadataSet.TimeStep; // Get new Index or point to existing index bool isNew = true; // flag to check if variable is new - BP1Index &varIndex = - GetBP1Index(variable.m_Name, metadataSet.VarsIndices, isNew); - stats.MemberID = varIndex.MemberID; + BP1Index &variableIndex = + GetBP1Index(variable.m_Name, m_MetadataSet.VarsIndices, isNew); + stats.MemberID = variableIndex.MemberID; // write metadata header in data and extract offsets - stats.Offset = heap.m_DataAbsolutePosition; - 
WriteVariableMetadataInData(variable, stats, heap); - stats.PayloadOffset = heap.m_DataAbsolutePosition; + stats.Offset = m_Heap.m_DataAbsolutePosition; + WriteVariableMetadataInData(variable, stats); + stats.PayloadOffset = m_Heap.m_DataAbsolutePosition; // write to metadata index - WriteVariableMetadataInIndex(variable, stats, isNew, varIndex); + WriteVariableMetadataInIndex(variable, stats, isNew, variableIndex); - ++metadataSet.DataPGVarsCount; + ++m_MetadataSet.DataPGVarsCount; } template <class T> -void BP1Writer::WriteVariablePayload(const Variable<T> &variable, - capsule::STLVector &heap, - const unsigned int nthreads) const noexcept +void BP1Writer::WriteVariablePayload(const Variable<T> &variable) noexcept { // EXPENSIVE part, might want to use threads if large, serial for now - InsertToBuffer(heap.m_Data, variable.m_AppValues, variable.TotalSize()); - heap.m_DataAbsolutePosition += variable.PayLoadSize(); + CopyToBufferThreads(m_Heap.m_Data, m_Heap.m_DataPosition, + variable.m_AppValues, variable.TotalSize(), m_Threads); + m_Heap.m_DataAbsolutePosition += variable.PayLoadSize(); } // PRIVATE @@ -102,112 +97,67 @@ BP1Writer::GetStats(const Variable<T> &variable) const noexcept if (m_Verbosity == 0) { - if (valuesSize >= 10000000) // ten million? 
this needs actual results - // //we can make decisions for threads - // based on valuesSize - GetMinMax(variable.m_AppValues, valuesSize, stats.Min, stats.Max, - m_Threads); // here we can add cores from constructor - else - GetMinMax(variable.m_AppValues, valuesSize, stats.Min, stats.Max); + GetMinMaxThreads(variable.m_AppValues, valuesSize, stats.Min, stats.Max, + m_Threads); } return stats; } template <class T> -void BP1Writer::WriteBoundsRecord(const bool isScalar, const Stats<T> &stats, - std::vector<char> &buffer, - std::uint8_t &characteristicsCounter, - const bool addLength) const noexcept +void BP1Writer::WriteVariableMetadataInData( + const Variable<T> &variable, + const Stats<typename TypeInfo<T>::ValueType> &stats) noexcept { - if (isScalar == true) - { - WriteCharacteristicRecord(characteristic_value, stats.Min, buffer, - characteristicsCounter, - addLength); // stats.min = stats.max = value - return; - } + auto &buffer = m_Heap.m_Data; + auto &position = m_Heap.m_DataPosition; - if (m_Verbosity == 0) // default verbose - { - WriteCharacteristicRecord(characteristic_min, stats.Min, buffer, - characteristicsCounter, addLength); - WriteCharacteristicRecord(characteristic_max, stats.Max, buffer, - characteristicsCounter, addLength); - } -} - -template <class T> -void BP1Writer::WriteCharacteristicRecord(const std::uint8_t characteristicID, - const T &value, - std::vector<char> &buffer, - std::uint8_t &characteristicsCounter, - const bool addLength) const noexcept -{ - const std::uint8_t id = characteristicID; - InsertToBuffer(buffer, &id); + // for writing length at the end + const size_t varLengthPosition = position; + position += 8; // skip var length (8) - if (addLength == true) - { - const std::uint16_t lengthOfCharacteristic = sizeof(T); // id - InsertToBuffer(buffer, &lengthOfCharacteristic); - } + CopyToBuffer(buffer, position, &stats.MemberID); - InsertToBuffer(buffer, &value); - ++characteristicsCounter; -} + WriteNameRecord(variable.m_Name, 
buffer, position); + position += 2; // skip path -template <class T> -void BP1Writer::WriteVariableMetadataInData( - const Variable<T> &variable, - const Stats<typename TypeInfo<T>::ValueType> &stats, - capsule::STLVector &heap) const noexcept -{ - auto &buffer = heap.m_Data; + const uint8_t dataType = GetDataType<T>(); // dataType + CopyToBuffer(buffer, position, &dataType); - // for writing length at the end - const std::size_t varLengthPosition = buffer.size(); - - buffer.insert(buffer.end(), 8, 0); // skip var length (8) - InsertToBuffer(buffer, &stats.MemberID); // memberID - WriteNameRecord(variable.m_Name, buffer); // variable name - buffer.insert(buffer.end(), 2, 0); // skip path - const std::uint8_t dataType = GetDataType<T>(); // dataType - InsertToBuffer(buffer, &dataType); constexpr char no = 'n'; // isDimension - InsertToBuffer(buffer, &no); + CopyToBuffer(buffer, position, &no); - // write variable dimensions - const std::uint8_t dimensions = variable.m_LocalDimensions.size(); - InsertToBuffer(buffer, &dimensions); // count + const uint8_t dimensions = variable.m_LocalDimensions.size(); + CopyToBuffer(buffer, position, &dimensions); // count // 27 is from 9 bytes for each: var y/n + local, var y/n + global dimension, // var y/n + global offset, changed for characteristic - std::uint16_t dimensionsLength = 27 * dimensions; - InsertToBuffer(buffer, &dimensionsLength); // length - WriteDimensionsRecord(buffer, variable.m_LocalDimensions, + uint16_t dimensionsLength = 27 * dimensions; + CopyToBuffer(buffer, position, &dimensionsLength); // length + + WriteDimensionsRecord(variable.m_LocalDimensions, variable.m_GlobalDimensions, variable.m_Offsets, 18, - true); + buffer, position); // CHARACTERISTICS - WriteVariableCharacteristics(variable, stats, buffer, true); + // FIX + WriteVariableCharacteristics(variable, stats, buffer, position); // Back to varLength including payload size - const std::uint64_t varLength = buffer.size() - varLengthPosition + - 
variable.PayLoadSize() - - 8; // remove its own size + // remove its own size (8) from length + const uint64_t varLength = + position - varLengthPosition + variable.PayLoadSize() - 8; - CopyToBufferPosition(buffer, varLengthPosition, &varLength); // length + size_t backPosition = varLengthPosition; + CopyToBuffer(buffer, backPosition, &varLength); - heap.m_DataAbsolutePosition += - buffer.size() - varLengthPosition; // update absolute position to be - // used as payload position + m_Heap.m_DataAbsolutePosition += position - varLengthPosition; } template <class T> void BP1Writer::WriteVariableMetadataInIndex( const Variable<T> &variable, const Stats<typename TypeInfo<T>::ValueType> &stats, const bool isNew, - BP1Index &index) const noexcept + BP1Index &index) noexcept { auto &buffer = index.Buffer; @@ -229,75 +179,191 @@ void BP1Writer::WriteVariableMetadataInIndex( } else // update characteristics sets count { - const std::size_t characteristicsSetsCountPosition = - 15 + variable.m_Name.size(); - ++index.Count; - CopyToBufferPosition(buffer, characteristicsSetsCountPosition, - &index.Count); // test + if (m_Verbosity == 0) + { + ++index.Count; + size_t setsCountPosition = 15 + variable.m_Name.size(); + CopyToBuffer(buffer, setsCountPosition, &index.Count); + } } WriteVariableCharacteristics(variable, stats, buffer); } +template <class T> +void BP1Writer::WriteBoundsRecord(const bool isScalar, const Stats<T> &stats, + std::uint8_t &characteristicsCounter, + std::vector<char> &buffer) noexcept +{ + if (isScalar == true) + { + // stats.min = stats.max = value, need to test + WriteCharacteristicRecord(characteristic_value, characteristicsCounter, + stats.Min, buffer); + } + else + { + if (m_Verbosity == 0) // default verbose + { + WriteCharacteristicRecord( + characteristic_min, characteristicsCounter, stats.Min, buffer); + + WriteCharacteristicRecord( + characteristic_max, characteristicsCounter, stats.Max, buffer); + } + } +} + +template <class T> +void 
BP1Writer::WriteBoundsRecord(const bool isScalar, const Stats<T> &stats, + std::uint8_t &characteristicsCounter, + std::vector<char> &buffer, + size_t &position) noexcept +{ + if (isScalar == true) + { + // stats.min = stats.max = value, need to test + WriteCharacteristicRecord(characteristic_value, characteristicsCounter, + stats.Min, buffer, position); + } + else + { + if (m_Verbosity == 0) // default min and max only + { + WriteCharacteristicRecord(characteristic_min, + characteristicsCounter, stats.Min, buffer, + position); + + WriteCharacteristicRecord(characteristic_max, + characteristicsCounter, stats.Max, buffer, + position); + } + } +} + +template <class T> +void BP1Writer::WriteCharacteristicRecord(const std::uint8_t characteristicID, + std::uint8_t &characteristicsCounter, + const T &value, + std::vector<char> &buffer) noexcept +{ + const std::uint8_t id = characteristicID; + InsertToBuffer(buffer, &id); + InsertToBuffer(buffer, &value); + ++characteristicsCounter; +} + +template <class T> +void BP1Writer::WriteCharacteristicRecord(const uint8_t characteristicID, + uint8_t &characteristicsCounter, + const T &value, + std::vector<char> &buffer, + size_t &position) noexcept +{ + const std::uint8_t id = characteristicID; + CopyToBuffer(buffer, position, &id); + + const std::uint16_t lengthOfCharacteristic = sizeof(T); + CopyToBuffer(buffer, position, &lengthOfCharacteristic); + + CopyToBuffer(buffer, position, &value); + ++characteristicsCounter; +} + template <class T> void BP1Writer::WriteVariableCharacteristics( const Variable<T> &variable, const Stats<typename TypeInfo<T>::ValueType> &stats, - std::vector<char> &buffer, const bool addLength) const noexcept + std::vector<char> &buffer) noexcept { - const std::size_t characteristicsCountPosition = - buffer.size(); // very important to track as writer is going back to - // this position - buffer.insert(buffer.end(), 5, - 0); // skip characteristics count(1) + length (4) - std::uint8_t 
characteristicsCounter = 0; + // going back at the end + const size_t characteristicsCountPosition = buffer.size(); + // skip characteristics count(1) + length (4) + buffer.insert(buffer.end(), 5, 0); + uint8_t characteristicsCounter = 0; // DIMENSIONS - std::uint8_t characteristicID = characteristic_dimensions; + uint8_t characteristicID = characteristic_dimensions; InsertToBuffer(buffer, &characteristicID); - const std::uint8_t dimensions = variable.m_LocalDimensions.size(); - - if (addLength == true) - { - const std::int16_t lengthOfDimensionsCharacteristic = - 24 * dimensions + 3; // 24 = 3 local, global, offset x 8 bytes/each - InsertToBuffer(buffer, &lengthOfDimensionsCharacteristic); - } - + const uint8_t dimensions = variable.m_LocalDimensions.size(); InsertToBuffer(buffer, &dimensions); // count - const std::uint16_t dimensionsLength = 24 * dimensions; + const uint16_t dimensionsLength = 24 * dimensions; InsertToBuffer(buffer, &dimensionsLength); // length - WriteDimensionsRecord(buffer, variable.m_LocalDimensions, + WriteDimensionsRecord(variable.m_LocalDimensions, + variable.m_GlobalDimensions, variable.m_Offsets, + buffer); + ++characteristicsCounter; + + WriteBoundsRecord(variable.m_IsScalar, stats, characteristicsCounter, + buffer); + + WriteCharacteristicRecord(characteristic_time_index, characteristicsCounter, + stats.TimeIndex, buffer); + + WriteCharacteristicRecord(characteristic_offset, characteristicsCounter, + stats.Offset, buffer); + + WriteCharacteristicRecord(characteristic_payload_offset, + characteristicsCounter, stats.PayloadOffset, + buffer); + // END OF CHARACTERISTICS + + // Back to characteristics count and length + size_t backPosition = characteristicsCountPosition; + CopyToBuffer(buffer, backPosition, &characteristicsCounter); // count (1) + + // remove its own length (4) + characteristic counter (1) + const uint32_t characteristicsLength = + buffer.size() - characteristicsCountPosition - 4 - 1; + + CopyToBuffer(buffer, 
backPosition, &characteristicsLength); // length +} + +template <class T> +void BP1Writer::WriteVariableCharacteristics( + const Variable<T> &variable, + const Stats<typename TypeInfo<T>::ValueType> &stats, + std::vector<char> &buffer, size_t &position) noexcept +{ + // going back at the end + const size_t characteristicsCountPosition = position; + // skip characteristics count(1) + length (4) + position += 5; + uint8_t characteristicsCounter = 0; + + // DIMENSIONS + uint8_t characteristicID = characteristic_dimensions; + CopyToBuffer(buffer, position, &characteristicID); + const uint8_t dimensions = variable.m_LocalDimensions.size(); + + // 24 = 3 local, global, offset x 8 bytes/each + const int16_t lengthOfDimensionsCharacteristic = 24 * dimensions + 3; + CopyToBuffer(buffer, position, &lengthOfDimensionsCharacteristic); + + CopyToBuffer(buffer, position, &dimensions); // count + const uint16_t dimensionsLength = 24 * dimensions; + CopyToBuffer(buffer, position, &dimensionsLength); // length + WriteDimensionsRecord(variable.m_LocalDimensions, variable.m_GlobalDimensions, variable.m_Offsets, 16, - addLength); + buffer, position); ++characteristicsCounter; // VALUE for SCALAR or STAT min, max for ARRAY - WriteBoundsRecord(variable.m_IsScalar, stats, buffer, - characteristicsCounter, addLength); + WriteBoundsRecord(variable.m_IsScalar, stats, characteristicsCounter, + buffer, position); // TIME INDEX - WriteCharacteristicRecord(characteristic_time_index, stats.TimeIndex, - buffer, characteristicsCounter, addLength); - - if (addLength == false) // only in metadata offset and payload offset - { - WriteCharacteristicRecord(characteristic_offset, stats.Offset, buffer, - characteristicsCounter); - WriteCharacteristicRecord(characteristic_payload_offset, - stats.PayloadOffset, buffer, - characteristicsCounter); - } + WriteCharacteristicRecord(characteristic_time_index, characteristicsCounter, + stats.TimeIndex, buffer, position); // END OF CHARACTERISTICS // Back to 
characteristics count and length - CopyToBufferPosition(buffer, characteristicsCountPosition, - &characteristicsCounter); // count (1) - const std::uint32_t characteristicsLength = - buffer.size() - characteristicsCountPosition - 4 - - 1; // remove its own length (4) + characteristic counter (1) - - CopyToBufferPosition(buffer, characteristicsCountPosition + 1, - &characteristicsLength); // length + size_t backPosition = characteristicsCountPosition; + CopyToBuffer(buffer, backPosition, &characteristicsCounter); + + // remove its own length (4) + characteristic counter (1) + const uint32_t characteristicsLength = + position - characteristicsCountPosition - 4 - 1; + CopyToBuffer(buffer, backPosition, &characteristicsLength); } } // end namespace format