diff --git a/examples/hello/CMakeLists.txt b/examples/hello/CMakeLists.txt index d01f1256fc13d301d7616c0ddf55547b8fce2b23..3795d3ab7fd5fd5849237c5ad6bff3de3e938923 100644 --- a/examples/hello/CMakeLists.txt +++ b/examples/hello/CMakeLists.txt @@ -5,6 +5,7 @@ add_subdirectory(bpWriter) add_subdirectory(bpTimeWriter) +add_subdirectory(bpFlushWriter) if(ADIOS_USE_ADIOS1) add_subdirectory(adios1Writer) diff --git a/examples/hello/bpFlushWriter/CMakeLists.txt b/examples/hello/bpFlushWriter/CMakeLists.txt new file mode 100644 index 0000000000000000000000000000000000000000..2061028eddfcac5742d798db434168de9cae19d5 --- /dev/null +++ b/examples/hello/bpFlushWriter/CMakeLists.txt @@ -0,0 +1,15 @@ +#------------------------------------------------------------------------------# +# Distributed under the OSI-approved Apache License, Version 2.0. See +# accompanying file Copyright.txt for details. +#------------------------------------------------------------------------------# + +if(ADIOS_USE_MPI) + find_package(MPI COMPONENTS C REQUIRED) + + add_executable(hello_bpFlushWriter helloBPFlushWriter.cpp) + target_include_directories(hello_bpFlushWriter PRIVATE ${MPI_C_INCLUDE_PATH}) + target_link_libraries(hello_bpFlushWriter ${MPI_C_LIBRARIES}) +else() + add_executable(hello_bpFlushWriter helloBPFlushWriter_nompi.cpp) +endif() +target_link_libraries(hello_bpFlushWriter adios2) diff --git a/examples/hello/bpFlushWriter/helloBPFlushWriter.cpp b/examples/hello/bpFlushWriter/helloBPFlushWriter.cpp new file mode 100644 index 0000000000000000000000000000000000000000..2342571c11ae8b96c1be6cdf551dfa51945419e2 --- /dev/null +++ b/examples/hello/bpFlushWriter/helloBPFlushWriter.cpp @@ -0,0 +1,85 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * helloBPFlushWriter.cpp: Example that tests buffer overflow forcing a flush to + * transports when writing a large variable in independent N-to-N mode. This + * will have performance penalties, but it's safer. + * + * Created on: Feb 16, 2017 + * Author: William F Godoy godoywf@ornl.gov + */ + +#include <ios> //std::ios_base::failure +#include <iostream> //std::cout +#include <mpi.h> +#include <stdexcept> //std::invalid_argument std::exception +#include <vector> + +#include <adios2.h> + +int main(int argc, char *argv[]) +{ + MPI_Init(&argc, &argv); + int rank, size; + MPI_Comm_rank(MPI_COMM_WORLD, &rank); + MPI_Comm_size(MPI_COMM_WORLD, &size); + + /** Application variable */ + std::vector<float> myFloats(100); + const std::size_t Nx = myFloats.size(); + + try + { + /** ADIOS class factory of IO class objects, DebugON is recommended */ + adios::ADIOS adios(MPI_COMM_WORLD, adios::DebugON); + + /*** IO class object: settings and factory of Settings: Variables, + * Parameters, Transports, and Execution: Engines */ + adios::IO &bpIO = adios.DeclareIO("BPFile_N2N_Flush"); + bpIO.SetEngine("BPFileWriter"); + // bpIO.SetParameters( ) + + /** global array : name, { shape (total) }, { start (local) }, { count + * (local) }, all are constant dimensions */ + adios::Variable<float> &bpFloats = bpIO.DefineVariable<float>( + "bpFloats", {size * Nx}, {rank * Nx}, {Nx}, adios::ConstantDims); + + /** Engine derived class, spawned to start IO operations */ + auto bpWriter = bpIO.Open("myVectorFlush.bp", adios::OpenMode::Write); + + if (!bpWriter) + { + throw std::ios_base::failure( + "ERROR: bpWriter not created at Open\n"); + } + + /** Write variable for buffering */ + bpWriter->Write<float>(bpFloats, myFloats.data()); + + /** Create bp file, engine becomes unreachable after this*/ + bpWriter->Close(); + } + catch (std::invalid_argument &e) + { + std::cout << "Invalid argument exception, STOPPING PROGRAM from rank " + << rank << "\n"; + std::cout << e.what() << "\n"; + } + catch (std::ios_base::failure &e) + { + std::cout + << "IO System base failure exception, STOPPING PROGRAM from rank " + << rank << "\n"; + std::cout << e.what() << "\n"; + } + catch (std::exception &e) + { + std::cout << "Exception, STOPPING PROGRAM from rank " << rank << "\n"; + std::cout << e.what() << "\n"; + } + + MPI_Finalize(); + + return 0; +} diff --git a/examples/hello/bpFlushWriter/helloBPFlushWriter_nompi.cpp b/examples/hello/bpFlushWriter/helloBPFlushWriter_nompi.cpp new file mode 100644 index 0000000000000000000000000000000000000000..b9ba169e39342e8faec177d3c09b4e63f48e0ab2 --- /dev/null +++ b/examples/hello/bpFlushWriter/helloBPFlushWriter_nompi.cpp @@ -0,0 +1,70 @@ +/* + * Distributed under the OSI-approved Apache License, Version 2.0. See + * accompanying file Copyright.txt for details. + * + * helloBPWriter_nompi.cpp sequential non-mpi version of helloBPWriter + * + * Created on: Jan 9, 2017 + * Author: William F Godoy godoywf@ornl.gov + */ + +#include <ios> //std::ios_base::failure +#include <iostream> //std::cout +#include <stdexcept> //std::invalid_argument std::exception +#include <vector> + +#include <adios2.h> + +int main(int argc, char *argv[]) +{ + /** Application variable */ + std::vector<float> myFloats = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9}; + const std::size_t Nx = myFloats.size(); + + try + { + /** ADIOS class factory of IO class objects, DebugON is recommended */ + adios::ADIOS adios(adios::DebugON); + + /*** IO class object: settings and factory of Settings: Variables, + * Parameters, Transports, and Execution: Engines */ + adios::IO &bpIO = adios.DeclareIO("BPFile_N2N"); + + /** global array: name, { shape (total dimensions) }, { start (local) }, + * { count (local) }, all are constant dimensions */ + adios::Variable<float> &bpFloats = bpIO.DefineVariable<float>( + "bpFloats", {}, {}, {Nx}, adios::ConstantDims); + + /** Engine derived class, spawned to start IO operations */ + auto bpWriter = bpIO.Open("myVector.bp", adios::OpenMode::Write); + + if (!bpWriter) + { + throw std::ios_base::failure( + "ERROR: bpWriter not created at Open\n"); + } + + /** Write variable for buffering */ + bpWriter->Write<float>(bpFloats, myFloats.data()); + + /** Create bp file, engine becomes unreachable after this*/ + bpWriter->Close(); + } + catch (std::invalid_argument &e) + { + std::cout << "Invalid argument exception, STOPPING PROGRAM\n"; + std::cout << e.what() << "\n"; + } + catch (std::ios_base::failure &e) + { + std::cout << "IO System base failure exception, STOPPING PROGRAM\n"; + std::cout << e.what() << "\n"; + } + catch (std::exception &e) + { + std::cout << "Exception, STOPPING PROGRAM from rank\n"; + std::cout << e.what() << "\n"; + } + + return 0; +} diff --git a/source/adios2/ADIOSTypes.h b/source/adios2/ADIOSTypes.h index 455b7a42b7715de48a93f0eb5fe4e93dd2ddb554..15e79ceed1e51859613e542140eecae9e7963075 100644 --- a/source/adios2/ADIOSTypes.h +++ b/source/adios2/ADIOSTypes.h @@ -144,7 +144,13 @@ enum class SelectionType const std::string DefaultFileLibrary("POSIX"); const std::string DefaultTimeUnit("Microseconds"); constexpr TimeUnit DefaultTimeUnitEnum(TimeUnit::Microseconds); -constexpr size_t DefaultBufferSize(16384); ///< in bytes + +/** default initial bp buffer size, 16Kb, in bytes */ +constexpr size_t DefaultInitialBufferSize(16384); +/** default maximum bp buffer size, 16Mb, in bytes */ +constexpr size_t DefaultMaxBufferSize(16777216); +/** default buffer growth factor (from STL vector = 2.) */ +constexpr float DefaultBufferGrowthFactor(2.); // adios alias values and types constexpr bool DebugON = true; diff --git a/source/adios2/CMakeLists.txt b/source/adios2/CMakeLists.txt index 084fe733e90df32ce8647ef96ec58103ddde151d..004dd81364dc1e82d3d60bb4f8b387c11aa1e7d3 100644 --- a/source/adios2/CMakeLists.txt +++ b/source/adios2/CMakeLists.txt @@ -17,7 +17,6 @@ add_library(adios2 #helper helper/adiosMath.cpp - helper/adiosMemory.cpp helper/adiosString.cpp helper/adiosSystem.cpp helper/adiosType.cpp diff --git a/source/adios2/engine/bp/BPFileWriter.cpp b/source/adios2/engine/bp/BPFileWriter.cpp index ee8c74883e564ad8d7188ea821ff49d9450251a2..3d1e2fe74267d233c80437f3badf42d5f134c139 100644 --- a/source/adios2/engine/bp/BPFileWriter.cpp +++ b/source/adios2/engine/bp/BPFileWriter.cpp @@ -70,10 +70,13 @@ void BPFileWriter::Close(const int transportIndex) // close bp buffer by flattening data and metadata m_BP1Writer.Close(); - // send data to corresponding transport - m_TransportsManager.CloseFiles(transportIndex, - m_BP1Writer.m_HeapBuffer.m_Data.data(), - m_BP1Writer.m_HeapBuffer.m_DataPosition); + // send data to corresponding transports + m_TransportsManager.WriteFiles(m_BP1Writer.m_HeapBuffer.GetData(), + m_BP1Writer.m_HeapBuffer.m_DataPosition, + transportIndex); + + // close them + m_TransportsManager.CloseFiles(transportIndex); if (m_BP1Writer.m_Profiler.IsActive) { diff --git a/source/adios2/engine/bp/BPFileWriter.tcc b/source/adios2/engine/bp/BPFileWriter.tcc index baf8999269438db0c6c34e39c950e8e5f1b2cc07..b9de2e96d8c83197e3926f9b6d76636d0c2e5d7a 100644 --- a/source/adios2/engine/bp/BPFileWriter.tcc +++ b/source/adios2/engine/bp/BPFileWriter.tcc @@ -10,6 +10,8 @@ #include "BPFileWriter.h" +#include <iostream> //TODO temporary + namespace adios { @@ -27,29 +29,39 @@ void BPFileWriter::DoWriteCommon(Variable<T> &variable, const T *values) m_IO.m_HostLanguage, m_TransportsManager.GetTransportsTypes()); } - // pre-calculate new metadata and payload sizes - // m_TransportFlush = CheckBufferAllocation( - // m_BP1Writer.GetVariableIndexSize( variable ) + - // variable.PayLoadSize(), - // m_GrowthFactor, - // m_MaxBufferSize, - // m_Buffer.m_Data ); + const size_t oldSize = m_BP1Writer.m_HeapBuffer.GetDataSize(); - // WRITE INDEX to data buffer and metadata structure (in memory)// - m_BP1Writer.WriteVariableMetadata(variable); + format::BP1Base::ResizeResult resizeResult = + m_BP1Writer.ResizeBuffer(variable); - if (m_DoTransportFlush) // in batches - { - // flatten data + const size_t newSize = m_BP1Writer.m_HeapBuffer.GetDataSize(); - // flush to transports + // if (resizeResult == format::BP1Base::ResizeResult::Success) + // { + // std::cout << "Old buffer size: " << oldSize << "\n"; + // std::cout << "New buffer size: " << newSize << "\n"; + // } - // reset relative positions to zero, update absolute position - } - else // Write data to buffer + if (resizeResult == format::BP1Base::ResizeResult::Flush) { - m_BP1Writer.WriteVariablePayload(variable); + m_BP1Writer.Flush(); + auto &heapBuffer = m_BP1Writer.m_HeapBuffer; + + m_TransportsManager.WriteFiles(heapBuffer.GetData(), + heapBuffer.m_DataPosition); + // set relative position to zero + heapBuffer.m_DataPosition = 0; + // reset buffer to zero values + heapBuffer.m_Data.assign(heapBuffer.GetDataSize(), '\0'); + + m_BP1Writer.WriteProcessGroupIndex( + m_IO.m_HostLanguage, m_TransportsManager.GetTransportsTypes()); } + + // WRITE INDEX to data buffer and metadata structure (in memory)// + m_BP1Writer.WriteVariableMetadata(variable); + m_BP1Writer.WriteVariablePayload(variable); + variable.m_AppValues = nullptr; // not needed after write } diff --git a/source/adios2/helper/adiosMath.cpp b/source/adios2/helper/adiosMath.cpp index 91f27372d296fd98427db2bf243b02b08c850b78..4ee27a2ff4844e5d2d611342a8cf65afc8f3c325 100644 --- a/source/adios2/helper/adiosMath.cpp +++ b/source/adios2/helper/adiosMath.cpp @@ -10,6 +10,8 @@ #include "adiosMath.h" +#include <cmath> + namespace adios { @@ -36,4 +38,25 @@ bool CheckIndexRange(const int index, const int upperLimit, return inRange; } +size_t NextExponentialSize(const size_t requiredSize, const size_t currentSize, + const float growthFactor) noexcept +{ + if (currentSize >= requiredSize) + { + return currentSize; + } + + const double growthFactorDouble = static_cast<double>(growthFactor); + + const double numerator = std::log(static_cast<double>(requiredSize) / + static_cast<double>(currentSize)); + const double denominator = std::log(growthFactorDouble); + const double n = std::ceil(numerator / denominator); + + const size_t nextExponentialSize = static_cast<size_t>( + std::ceil(std::pow(growthFactorDouble, n) * currentSize)); + + return nextExponentialSize; +} + } // end namespace adios diff --git a/source/adios2/helper/adiosMath.h b/source/adios2/helper/adiosMath.h index 69d941b2d502f409ae1c4bfc9bfb3d96ece1c981..1c0967cff97a72553098d2e6dd5f9dc9afc4271b 100644 --- a/source/adios2/helper/adiosMath.h +++ b/source/adios2/helper/adiosMath.h @@ -88,6 +88,17 @@ void GetMinMaxThreads(const std::complex<T> *values, const size_t size, T &min, bool CheckIndexRange(const int index, const int upperLimit, const int lowerLimit = 0) noexcept; +/** + * Returns the appropriate size larger than requiredSize + * @param requiredSize + * @param currentSize + * @param growthFactor larger than 1. (typically 1.5 or 2. ) + * @return next currentSize * growthFactor^n (n is a signed integer) larger than + * requiredSize + */ +size_t NextExponentialSize(const size_t requiredSize, const size_t currentSize, + const float growthFactor) noexcept; + } // end namespace adios #include "adiosMath.inl" diff --git a/source/adios2/helper/adiosMemory.cpp b/source/adios2/helper/adiosMemory.cpp deleted file mode 100644 index 5420b698d975b72c13105b86d54d5aa77eb5fa50..0000000000000000000000000000000000000000 --- a/source/adios2/helper/adiosMemory.cpp +++ /dev/null @@ -1,49 +0,0 @@ -/* - * Distributed under the OSI-approved Apache License, Version 2.0. See - * accompanying file Copyright.txt for details. - * - * adiosMemory.cpp - * - * Created on: May 17, 2017 - * Author: William F Godoy godoywf@ornl.gov - */ - -#include "adiosMemory.h" - -namespace adios -{ - -int GrowBuffer(const size_t incomingDataSize, const float growthFactor, - std::vector<char> &buffer, const size_t position) -{ - const size_t currentCapacity = buffer.capacity(); - const size_t availableSpace = currentCapacity - position; - const double gf = static_cast<double>(growthFactor); - - if (incomingDataSize < availableSpace) - { - return 0; - } - - const size_t neededCapacity = incomingDataSize + position; - const double numerator = std::log(static_cast<double>(neededCapacity) / - static_cast<double>(currentCapacity)); - const double denominator = std::log(gf); - - const double n = std::ceil(numerator / denominator); - const size_t newSize = - static_cast<size_t>(std::ceil(std::pow(gf, n) * currentCapacity)); - - try - { - buffer.resize(newSize); - } - catch (std::bad_alloc &e) - { - return -1; - } - - return 1; -} - -} // end namespace adios diff --git a/source/adios2/helper/adiosMemory.h b/source/adios2/helper/adiosMemory.h index aa2c6f785a95ecebae1f31db80936ffc6401cd86..76739251ac68115a4c87bdd428be6fdf24772529 100644 --- a/source/adios2/helper/adiosMemory.h +++ b/source/adios2/helper/adiosMemory.h @@ -83,19 +83,6 @@ void MemcpyToBufferThreads(std::vector<char> &buffer, size_t &position, const T *source, size_t size, const unsigned int threads = 1); -/** - * Grows a buffer by a factor of n . growthFactor . currentCapacity to - * accommodate for incomingDataSize - * @param incomingDataSize size of new data required to be stored in buffer - * @param growthFactor buffer grows in multiples of the growth buffer - * @param buffer to be resized - * @param position, current buffer position - * @return -1: failed to allocate (bad_alloc), 0: didn't have to allocate - * (enough space), 1: successful allocation - */ -int GrowBuffer(const size_t incomingDataSize, const float growthFactor, - std::vector<char> &buffer, const size_t position); - } // end namespace adios #include "adiosMemory.inl" diff --git a/source/adios2/toolkit/capsule/Capsule.cpp b/source/adios2/toolkit/capsule/Capsule.cpp index 2388c03fdf73556dc8658ae247eddf60ab8e1cd6..22568b63b6e8e693d149a1ef73dfac172b6a2a0b 100644 --- a/source/adios2/toolkit/capsule/Capsule.cpp +++ b/source/adios2/toolkit/capsule/Capsule.cpp @@ -18,6 +18,11 @@ Capsule::Capsule(const std::string type, const bool debugMode) { } +size_t Capsule::GetAvailableDataSize() const +{ + return GetDataSize() - m_DataPosition; +} + void Capsule::ResizeData(size_t /*size*/) {} void Capsule::ResizeMetadata(size_t /*size*/) {} diff --git a/source/adios2/toolkit/capsule/Capsule.h b/source/adios2/toolkit/capsule/Capsule.h index fc90bd09023e82034b36dec61b3f70843aaebe64..66e3aa8f1472b59888cce2bc179965d6291d61bc 100644 --- a/source/adios2/toolkit/capsule/Capsule.h +++ b/source/adios2/toolkit/capsule/Capsule.h @@ -59,6 +59,8 @@ public: virtual size_t GetDataSize() const = 0; ///< data buffer memory size virtual size_t GetMetadataSize() const = 0; ///< metadata buffer memory size + size_t GetAvailableDataSize() const; + virtual void ResizeData(size_t size); ///< resize data buffer virtual void ResizeMetadata(size_t size); ///< resize metadata buffer diff --git a/source/adios2/toolkit/format/bp1/BP1Base.cpp b/source/adios2/toolkit/format/bp1/BP1Base.cpp index 81e71392d7e88b84a59f06cc15931c9fd6ee16ed..95f7dcff06348f4c264c556e930b46a3800d8ffe 100644 --- a/source/adios2/toolkit/format/bp1/BP1Base.cpp +++ b/source/adios2/toolkit/format/bp1/BP1Base.cpp @@ -27,7 +27,7 @@ BP1Base::BP1Base(MPI_Comm mpiComm, const bool debugMode) void BP1Base::InitParameters(const Params ¶meters) { // flags for defaults that require constructors - bool useDefaultBufferSize = true; + bool useDefaultInitialBufferSize = true; bool useDefaultProfileUnits = true; for (const auto &pair : parameters) @@ -44,14 +44,14 @@ void BP1Base::InitParameters(const Params ¶meters) InitParameterProfileUnits(value); useDefaultProfileUnits = false; } - else if (key == "BufferGrowth") + else if (key == "BufferGrowthFactor") { InitParameterBufferGrowth(value); } - else if (key == "BufferSize") + else if (key == "InitialBufferSize") { InitParameterInitBufferSize(value); - useDefaultBufferSize = false; + useDefaultInitialBufferSize = false; } else if (key == "MaxBufferSize") { @@ -73,9 +73,9 @@ void BP1Base::InitParameters(const Params ¶meters) m_Profiler.Bytes.emplace("buffering", 0); } - if (useDefaultBufferSize) + if (useDefaultInitialBufferSize) { - m_HeapBuffer.ResizeData(DefaultBufferSize); + m_HeapBuffer.ResizeData(DefaultInitialBufferSize); } } @@ -122,11 +122,11 @@ std::string BP1Base::GetBPName(const std::string &name) const noexcept // PROTECTED void BP1Base::InitParameterProfile(const std::string value) { - if (value == "off") + if (value == "off" || value == "Off") { m_Profiler.IsActive = false; } - else if (value == "on") + else if (value == "on" || value == "On") { m_Profiler.IsActive = true; // default } @@ -189,8 +189,8 @@ void BP1Base::InitParameterInitBufferSize(const std::string value) { const std::string errorMessage( "ERROR: couldn't convert value of init_buffer_size IO " - "SetParameter, valid syntax: init_buffer_size=10Gb, " - "init_buffer_size=1000Mb, init_buffer_size=16Kb (minimum default), " + "SetParameter, valid syntax: InitialBufferSize=10Gb, " + "InitialBufferSize=1000Mb, InitialBufferSize=16Kb (minimum default), " " in call to Open"); if (m_DebugMode) @@ -204,7 +204,7 @@ void BP1Base::InitParameterInitBufferSize(const std::string value) const std::string number(value.substr(0, value.size() - 2)); const std::string units(value.substr(value.size() - 2)); const size_t factor = BytesFactor(units, m_DebugMode); - size_t bufferSize = DefaultBufferSize; // from ADIOSTypes.h + size_t bufferSize = DefaultInitialBufferSize; // from ADIOSTypes.h if (m_DebugMode) { @@ -218,7 +218,7 @@ void BP1Base::InitParameterInitBufferSize(const std::string value) success = false; } - if (!success || bufferSize < 16 * 1024) // 16384b + if (!success || bufferSize < DefaultInitialBufferSize) // 16384b { throw std::invalid_argument(errorMessage); } @@ -339,5 +339,24 @@ BP1Base::GetTransportIDs(const std::vector<std::string> &transportsTypes) const return transportsIDs; } +size_t BP1Base::GetProcessGroupIndexSize(const std::string name, + const std::string timeStepName, + const size_t transportsSize) const + noexcept +{ + // pgIndex + list of methods (transports) + size_t pgSize = + (name.length() + timeStepName.length() + 23) + (3 + transportsSize); + + return pgSize; +} + +#define declare_template_instantiation(T) \ + template BP1Base::ResizeResult BP1Base::ResizeBuffer( \ + const Variable<T> &variable); + +ADIOS2_FOREACH_TYPE_1ARG(declare_template_instantiation) +#undef declare_template_instantiation + } // end namespace format } // end namespace adios diff --git a/source/adios2/toolkit/format/bp1/BP1Base.h b/source/adios2/toolkit/format/bp1/BP1Base.h index d2e9024fcd3b529d4b261b77faa36e6819d3e3cd..ee61870c4b4e3e75f0efdf9ff5037bdd7191d6c1 100644 --- a/source/adios2/toolkit/format/bp1/BP1Base.h +++ b/source/adios2/toolkit/format/bp1/BP1Base.h @@ -20,6 +20,7 @@ #include "adios2/ADIOSMPICommOnly.h" #include "adios2/ADIOSMacros.h" #include "adios2/ADIOSTypes.h" +#include "adios2/core/Variable.h" #include "adios2/toolkit/capsule/heap/STLVector.h" #include "adios2/toolkit/format/bp1/BP1Aggregator.h" #include "adios2/toolkit/format/bp1/BP1Structs.h" @@ -44,10 +45,10 @@ public: capsule::STLVector m_HeapBuffer; /** memory growth factor,s set by the user */ - float m_GrowthFactor = 1.5; + float m_GrowthFactor = DefaultBufferGrowthFactor; /** max buffer size, set by the user */ - size_t m_MaxBufferSize = 0; + size_t m_MaxBufferSize = DefaultMaxBufferSize; /** contains bp1 format metadata indices*/ BP1MetadataSet m_MetadataSet; @@ -105,17 +106,26 @@ public: */ std::string GetBPName(const std::string &name) const noexcept; - /** - * Return type of the CheckAllocation function. - */ + /** Return type of the CheckAllocation function. */ enum class ResizeResult { - FAILURE, //!< FAILURE, caught a std::bad_alloc - UNCHANGED, //!< UNCHANGED, no need to resize (sufficient capacity) - SUCCESS, //!< SUCCESS, resize was successful - FLUSH //!< FLUSH, need to flush to transports for current variable + Failure, //!< FAILURE, caught a std::bad_alloc + Unchanged, //!< UNCHANGED, no need to resize (sufficient capacity) + Success, //!< SUCCESS, resize was successful + Flush //!< FLUSH, need to flush to transports for current variable }; + /** + * @param variable + * @return + * -1: allocation failed, + * 0: no allocation needed, + * 1: reallocation is sucessful + * 2: need a transport flush + */ + template <class T> + ResizeResult ResizeBuffer(const Variable<T> &variable); + protected: /** might be used in large payload copies to buffer */ unsigned int m_Threads = 1; @@ -275,8 +285,35 @@ protected: std::vector<uint8_t> GetTransportIDs(const std::vector<std::string> &transportsTypes) const noexcept; + + /** + * Calculates the Process Index size in bytes according to the BP + * format, + * including list of method with no parameters (for now) + * @param name + * @param timeStepName + * @param transportsSize + * @return size of pg index + */ + size_t GetProcessGroupIndexSize(const std::string name, + const std::string timeStepName, + const size_t transportsSize) const noexcept; + + /** + * Returns the estimated variable index size + * @param variable + */ + template <class T> + size_t GetVariableIndexSize(const Variable<T> &variable) const noexcept; }; +#define declare_template_instantiation(T) \ + extern template BP1Base::ResizeResult BP1Base::ResizeBuffer( \ + const Variable<T> &variable); + +ADIOS2_FOREACH_TYPE_1ARG(declare_template_instantiation) +#undef declare_template_instantiation + } // end namespace format } // end namespace adios diff --git a/source/adios2/toolkit/format/bp1/BP1Base.tcc b/source/adios2/toolkit/format/bp1/BP1Base.tcc index dfd39f6363985ac63b4055464065111c1d84d9bc..509861567737a0fb8bf5aa3370d13b9e2d55fde4 100644 --- a/source/adios2/toolkit/format/bp1/BP1Base.tcc +++ b/source/adios2/toolkit/format/bp1/BP1Base.tcc @@ -112,6 +112,92 @@ int8_t BP1Base::GetDataType<cldouble>() const noexcept return type_long_double_complex; } +template <class T> +BP1Base::ResizeResult BP1Base::ResizeBuffer(const Variable<T> &variable) +{ + size_t currentCapacity = m_HeapBuffer.m_Data.capacity(); + size_t variableData = + GetVariableIndexSize(variable) + variable.PayLoadSize(); + size_t requiredCapacity = variableData + m_HeapBuffer.m_DataPosition; + + ResizeResult result = ResizeResult::Unchanged; + + if (variableData > m_MaxBufferSize) + { + throw std::runtime_error( + "ERROR: variable " + variable.m_Name + " data size: " + + std::to_string(static_cast<float>(variableData) / (1024. * 1024.)) + + " Mb is too large for adios2 bp MaxBufferSize=" + + std::to_string(static_cast<float>(m_MaxBufferSize) / + (1024. * 1024.)) + + "Mb, try increasing MaxBufferSize in call to IO SetParameters, in " + "call to Write\n"); + } + + if (requiredCapacity <= currentCapacity) + { + // do nothing, unchanged is default + } + else if (requiredCapacity > m_MaxBufferSize) + { + if (currentCapacity < m_MaxBufferSize) + { + m_HeapBuffer.ResizeData(m_MaxBufferSize); + } + result = ResizeResult::Flush; + } + else // buffer must grow + { + if (currentCapacity < m_MaxBufferSize) + { + const size_t nextSize = + std::min(m_MaxBufferSize, + NextExponentialSize(requiredCapacity, currentCapacity, + m_GrowthFactor)); + m_HeapBuffer.ResizeData(nextSize); + result = ResizeResult::Success; + } + } + + return result; +} + +template <class T> +size_t BP1Base::GetVariableIndexSize(const Variable<T> &variable) const noexcept +{ + // size_t indexSize = varEntryLength + memberID + lengthGroupName + + // groupName + lengthVariableName + lengthOfPath + path + datatype + size_t indexSize = 23; // without characteristics + indexSize += variable.m_Name.size(); + + // characteristics 3 and 4, check variable number of dimensions + const size_t dimensions = variable.m_Count.size(); + indexSize += 28 * dimensions; // 28 bytes per dimension + indexSize += 1; // id + + // characteristics, offset + payload offset in data + indexSize += 2 * (1 + 8); + // characteristic 0, if scalar add value, for now only allowing string + if (dimensions == 1) + { + indexSize += sizeof(T); + indexSize += 1; // id + // must have an if here + indexSize += 2 + variable.m_Name.size(); + indexSize += 1; // id + } + + // characteristic statistics + if (m_Verbosity == 0) // default, only min and max + { + indexSize += 2 * (sizeof(T) + 1); + indexSize += 1 + 1; // id + } + + return indexSize + 12; // extra 12 bytes in case of attributes + // need to add transform characteristics +} + } // end namespace format } // end namespace adios diff --git a/source/adios2/toolkit/format/bp1/BP1Writer.cpp b/source/adios2/toolkit/format/bp1/BP1Writer.cpp index 2762b0208ff0782b37e35a337ca764cb4523879a..f5ed8fd362739e9b36080112038b7bd540eb7682 100644 --- a/source/adios2/toolkit/format/bp1/BP1Writer.cpp +++ b/source/adios2/toolkit/format/bp1/BP1Writer.cpp @@ -24,18 +24,6 @@ BP1Writer::BP1Writer(MPI_Comm mpiComm, const bool debugMode) { } -size_t BP1Writer::GetProcessGroupIndexSize(const std::string name, - const std::string timeStepName, - const size_t transportsSize) const - noexcept -{ - // pgIndex + list of methods (transports) - size_t pgSize = - (name.length() + timeStepName.length() + 23) + (3 + transportsSize); - - return pgSize; -} - void BP1Writer::WriteProcessGroupIndex( const std::string hostLanguage, const std::vector<std::string> &transportsTypes) noexcept @@ -134,6 +122,22 @@ void BP1Writer::Advance() m_Profiler.Timers.at("buffering").Resume(); } + FlattenData(); + ++m_MetadataSet.TimeStep; + + if (m_Profiler.IsActive) + { + m_Profiler.Timers.at("buffering").Pause(); + } +} + +void BP1Writer::Flush() +{ + if (m_Profiler.IsActive) + { + m_Profiler.Timers.at("buffering").Resume(); + } + FlattenData(); if (m_Profiler.IsActive) @@ -360,7 +364,6 @@ void BP1Writer::FlattenData() noexcept position - m_MetadataSet.DataPGLengthPosition - 8; CopyToBuffer(buffer, m_MetadataSet.DataPGLengthPosition, &dataPGLength); - ++m_MetadataSet.TimeStep; m_MetadataSet.DataPGIsOpen = false; } @@ -473,9 +476,6 @@ void BP1Writer::FlattenMetadata() noexcept // Explicit instantiation of only public templates #define declare_template_instantiation(T) \ - template BP1Writer::ResizeResult BP1Writer::ResizeBuffer( \ - const Variable<T> &variable); \ - \ template void BP1Writer::WriteVariableMetadata( \ const Variable<T> &variable) noexcept; \ \ diff --git a/source/adios2/toolkit/format/bp1/BP1Writer.h b/source/adios2/toolkit/format/bp1/BP1Writer.h index 4601215bf474c856e84cf6d995a78a9563e43bac..761500fa12b382a3305a89c0c56e234e183ed748 100644 --- a/source/adios2/toolkit/format/bp1/BP1Writer.h +++ b/source/adios2/toolkit/format/bp1/BP1Writer.h @@ -53,18 +53,6 @@ public: const std::string hostLanguage, const std::vector<std::string> &transportsTypes) noexcept; - /** - * - * @param variable - * @return - * -1: allocation failed, - * 0: no allocation needed, - * 1: reallocation is sucessful - * 2: need a transport flush - */ - template <class T> - ResizeResult ResizeBuffer(const Variable<T> &variable); - /** * Write metadata for a given variable * @param variable @@ -80,9 +68,13 @@ public: template <class T> void WriteVariablePayload(const Variable<T> &variable) noexcept; - /** Flattens data buffer */ + /** Flattens data buffer and closes current process group */ void Advance(); + /** Flattens data buffer and close current process group, doesn't + * advance time index */ + void Flush(); + /** * @param isFirstClose true: first time close, false: already closed buffer */ @@ -105,25 +97,6 @@ private: /** BP format version */ const uint8_t m_Version = 3; - /** - * Calculates the Process Index size in bytes according to the BP format, - * including list of method with no parameters (for now) - * @param name - * @param timeStepName - * @param transportsSize - * @return size of pg index - */ - size_t GetProcessGroupIndexSize(const std::string name, - const std::string timeStepName, - const size_t transportsSize) const noexcept; - - /** - * Returns the estimated variable index size - * @param variable - */ - template <class T> - size_t GetVariableIndexSize(const Variable<T> &variable) const noexcept; - /** * Get variable statistics * @param variable @@ -257,9 +230,6 @@ private: }; #define declare_template_instantiation(T) \ - extern template BP1Writer::ResizeResult BP1Writer::ResizeBuffer( \ - const Variable<T> &variable); \ - \ extern template void BP1Writer::WriteVariablePayload( \ const Variable<T> &variable) noexcept; \ \ diff --git a/source/adios2/toolkit/format/bp1/BP1Writer.tcc b/source/adios2/toolkit/format/bp1/BP1Writer.tcc index eef815c26204cd9a0a8ccc112ffcfebec8fd660c..de9f394b8936596891481a9b02c181a25a6b2fbb 100644 --- a/source/adios2/toolkit/format/bp1/BP1Writer.tcc +++ b/source/adios2/toolkit/format/bp1/BP1Writer.tcc @@ -19,26 +19,6 @@ namespace adios namespace format { -// PUBLIC -template <class T> -BP1Writer::ResizeResult BP1Writer::ResizeBuffer(const Variable<T> &variable) -{ - size_t variableData = - GetVariableIndexSize(variable) + variable.PayLoadSize(); - size_t requiredCapacity = variableData + m_HeapBuffer.m_DataPosition; - - if (requiredCapacity > m_MaxBufferSize && m_MaxBufferSize > 0) // is set - { - if (m_HeapBuffer.GetDataSize() < m_MaxBufferSize) - { - m_HeapBuffer.ResizeData(m_MaxBufferSize); - return ResizeResult::FLUSH; - } - } - - return ResizeResult::SUCCESS; -} - template <class T> void BP1Writer::WriteVariableMetadata(const Variable<T> &variable) noexcept { @@ -91,43 +71,6 @@ void BP1Writer::WriteVariablePayload(const Variable<T> &variable) noexcept } // PRIVATE -template <class T> -size_t BP1Writer::GetVariableIndexSize(const Variable<T> &variable) const - noexcept -{ - // size_t indexSize = varEntryLength + memberID + lengthGroupName + - // groupName + lengthVariableName + lengthOfPath + path + datatype - size_t indexSize = 23; // without characteristics - indexSize += variable.m_Name.size(); - - // characteristics 3 and 4, check variable number of dimensions - const size_t dimensions = variable.m_Count.size(); - indexSize += 28 * dimensions; // 28 bytes per dimension - indexSize += 1; // id - - // characteristics, offset + payload offset in data - indexSize += 2 * (1 + 8); - // characteristic 0, if scalar add value, for now only allowing string - if (dimensions == 1) - { - indexSize += sizeof(T); - indexSize += 1; // id - // must have an if here - indexSize += 2 + variable.m_Name.size(); - indexSize += 1; // id - } - - // characteristic statistics - if (m_Verbosity == 0) // default, only min and max - { - indexSize += 2 * (sizeof(T) + 1); - indexSize += 1 + 1; // id - } - - return indexSize + 12; // extra 12 bytes in case of attributes - // need to add transform characteristics -} - template <class T> BP1Writer::Stats<typename TypeInfo<T>::ValueType> BP1Writer::GetStats(const Variable<T> &variable) const noexcept diff --git a/source/adios2/toolkit/transportman/TransportMan.cpp b/source/adios2/toolkit/transportman/TransportMan.cpp index fef4ec08bfd8844c417cb4dfd54053ac5057fb67..7492922c102b83e0f58914e1690030d7a3f5a1b5 100644 --- a/source/adios2/toolkit/transportman/TransportMan.cpp +++ b/source/adios2/toolkit/transportman/TransportMan.cpp @@ -122,8 +122,8 @@ TransportMan::GetTransportsProfilers() noexcept return profilers; } -void TransportMan::CloseFiles(const int transportIndex, const char *buffer, - const size_t size) +void TransportMan::WriteFiles(const char *buffer, const size_t size, + const int transportIndex) { if (transportIndex == -1) { @@ -133,6 +133,34 @@ void TransportMan::CloseFiles(const int transportIndex, const char *buffer, { // make this truly asynch? transport->Write(buffer, size); + } + } + } + else + { + if (m_DebugMode) + { + if (m_Transports[transportIndex]->m_Type != "File") + { + throw std::invalid_argument( + "ERROR: index " + std::to_string(transportIndex) + + " doesn't come from a file transport in IO AddTransport, " + "in call to Write (flush) or Close\n"); + } + } + + m_Transports[transportIndex]->Write(buffer, size); + } +} + +void TransportMan::CloseFiles(const int transportIndex) +{ + if (transportIndex == -1) + { + for (auto &transport : m_Transports) + { + if (transport->m_Type == "File") + { transport->Close(); } } @@ -150,7 +178,6 @@ void TransportMan::CloseFiles(const int transportIndex, const char *buffer, } } - m_Transports[transportIndex]->Write(buffer, size); m_Transports[transportIndex]->Close(); } } diff --git a/source/adios2/toolkit/transportman/TransportMan.h b/source/adios2/toolkit/transportman/TransportMan.h index 2e984eb18496d86cb4bc97999f2537b93178150d..59e48f7c6f33720daa159513d13815f1c0b22836 100644 --- a/source/adios2/toolkit/transportman/TransportMan.h +++ b/source/adios2/toolkit/transportman/TransportMan.h @@ -94,15 +94,21 @@ public: * m_Transports.m_Profiler */ std::vector<profiling::IOChrono *> GetTransportsProfilers() noexcept; + /** + * Write to file transports + * @param transportIndex + * @param buffer + * @param size + */ + void WriteFiles(const char *buffer, const size_t size, + const int transportIndex = -1); + /** * Close file or files depending on transport index. Throws an exception * if transport is not a file when transportIndex > -1. * @param transportIndex -1: all transports, otherwise index in m_Transports - * @param buffer pass buffer to be sent to transport - * @param size passes the buffer size */ - void CloseFiles(const int transportIndex, const char *buffer, - const size_t size); + void CloseFiles(const int transportIndex = -1); /** Checks if all transports are closed */ bool AllTransportsClosed() const noexcept;