diff --git a/examples/hello/bpFlushWriter/helloBPFlushWriter.cpp b/examples/hello/bpFlushWriter/helloBPFlushWriter.cpp index 6bbbd923e5e8debb46fab2267f2607c2193cd2a7..9309811c7016a4df7aec82f052962c53fc6601a7 100644 --- a/examples/hello/bpFlushWriter/helloBPFlushWriter.cpp +++ b/examples/hello/bpFlushWriter/helloBPFlushWriter.cpp @@ -26,7 +26,7 @@ int main(int argc, char *argv[]) MPI_Comm_size(MPI_COMM_WORLD, &size); /** Application variable */ - std::vector<float> myFloats(100); + std::vector<float> myFloats(1000000); //~ 4 MB const std::size_t Nx = myFloats.size(); try @@ -38,7 +38,11 @@ int main(int argc, char *argv[]) * Parameters, Transports, and Execution: Engines */ adios2::IO &bpIO = adios.DeclareIO("BPFile_N2N_Flush"); bpIO.SetEngine("BPFileWriter"); - // bpIO.SetParameters( ) + + // bpIO.SetParameters({{"MaxBufferSize", "9Mb"}, + // {"BufferGrowthFactor", "1.5"}, + // {"Threads", "2"}}); + // bpIO.AddTransport("File", {{"ProfileUnits", "Microseconds"}}); /** global array : name, { shape (total) }, { start (local) }, { count * (local) }, all are constant dimensions */ @@ -54,8 +58,13 @@ int main(int argc, char *argv[]) "ERROR: bpWriter not created at Open\n"); } - /** Write variable for buffering */ - bpWriter->Write<float>(bpFloats, myFloats.data()); + for (unsigned int t = 0; t < 100; ++t) + { + /** values to time step */ + myFloats.assign(myFloats.size(), t); + /** Write variable for buffering */ + bpWriter->Write<float>(bpFloats, myFloats.data()); + } /** Create bp file, engine becomes unreachable after this*/ bpWriter->Close(); diff --git a/source/adios2/helper/adiosMath.inl b/source/adios2/helper/adiosMath.inl index bc31776207e0513567f013c2f7db9ebbeba95f2f..b8c70e0195ed37315311e3e4ab26b6328bf42452 100644 --- a/source/adios2/helper/adiosMath.inl +++ b/source/adios2/helper/adiosMath.inl @@ -60,7 +60,7 @@ template <class T> void GetMinMaxThreads(const T *values, const size_t size, T &min, T &max, const unsigned int threads) noexcept { - if (threads == 1) + if (threads == 1 || threads > size) { GetMinMax(values, size, min, max); return; diff --git a/source/adios2/helper/adiosMemory.inl b/source/adios2/helper/adiosMemory.inl index f736f825a94dc4d259dac91666f6b5559cca0aed..ee50425e2b1083eb2e076d8579b631de49b406eb 100644 --- a/source/adios2/helper/adiosMemory.inl +++ b/source/adios2/helper/adiosMemory.inl @@ -45,7 +45,7 @@ void CopyToBufferThreads(std::vector<char> &buffer, size_t &position, const T *source, const size_t elements, const unsigned int threads) noexcept { - if (threads == 1) + if (threads == 1 || threads > elements) { CopyToBuffer(buffer, position, source, elements); return; @@ -58,23 +58,35 @@ void CopyToBufferThreads(std::vector<char> &buffer, size_t &position, std::vector<std::thread> copyThreads; copyThreads.reserve(threads); + const char *src = reinterpret_cast<const char *>(source); + for (unsigned int t = 0; t < threads; ++t) { - size_t bufferPosition = stride * t * sizeof(T); - const size_t sourcePosition = stride * t; - + const size_t bufferStart = position + stride * t * sizeof(T); + const size_t srcStart = stride * t * sizeof(T); if (t == threads - 1) // last thread takes stride + remainder { - copyThreads.push_back(std::thread(CopyToBuffer<T>, std::ref(buffer), - std::ref(bufferPosition), - &source[sourcePosition], last)); - position = bufferPosition; // last position + copyThreads.push_back(std::thread(std::memcpy, &buffer[bufferStart], + &src[srcStart], + last * sizeof(T))); + // std::copy not working properly with std::thread...why? + // copyThreads.push_back(std::thread(std::copy, + // &src[srcStart], + // &src[srcStart] + + // last * sizeof(T), + // buffer.begin() + + // bufferStart)); } else { - copyThreads.push_back(std::thread(CopyToBuffer<T>, std::ref(buffer), - std::ref(bufferPosition), - &source[sourcePosition], stride)); + copyThreads.push_back(std::thread(std::memcpy, &buffer[bufferStart], + &src[srcStart], + stride * sizeof(T))); + // std::copy not working properly with std::thread...why? + // copyThreads.push_back(std::thread( + // std::copy, &src[srcStart], &src[srcStart] + stride + // * sizeof(T), + // buffer.begin() + bufferStart)); } } @@ -82,6 +94,8 @@ void CopyToBufferThreads(std::vector<char> &buffer, size_t &position, { copyThread.join(); } + + position += elements * sizeof(T); } template <class T> diff --git a/source/adios2/toolkit/format/bp1/BP1Base.cpp b/source/adios2/toolkit/format/bp1/BP1Base.cpp index e9710bcfed4119588694eda12b510ca79b359957..f0edcfc5117735b67438bec59afbef08a66caf88 100644 --- a/source/adios2/toolkit/format/bp1/BP1Base.cpp +++ b/source/adios2/toolkit/format/bp1/BP1Base.cpp @@ -57,6 +57,10 @@ void BP1Base::InitParameters(const Params ¶meters) { InitParameterMaxBufferSize(value); } + else if (key == "Threads") + { + InitParameterThreads(value); + } else if (key == "Verbose") { InitParameterVerbose(value); @@ -276,6 +280,38 @@ void BP1Base::InitParameterMaxBufferSize(const std::string value) } } +void BP1Base::InitParameterThreads(const std::string value) +{ + int threads = -1; + + if (m_DebugMode) + { + bool success = true; + + try + { + threads = std::stoi(value); + } + catch (std::exception &e) + { + success = false; + } + + if (!success || threads < 1) + { + throw std::invalid_argument( + "ERROR: value in Threads=value in IO SetParameters must be " + "an integer >= 1 (default), in call to Open\n"); + } + } + else + { + threads = std::stoi(value); + } + + m_Threads = static_cast<unsigned int>(threads); +} + void BP1Base::InitParameterVerbose(const std::string value) { int verbosity = -1; diff --git a/source/adios2/toolkit/format/bp1/BP1Base.h b/source/adios2/toolkit/format/bp1/BP1Base.h index 0ea66fd5353820fd062eed8f1312e831876ed0db..084a26298cbb7bbd879ddfe4f153ee85b57ee5ea 100644 --- a/source/adios2/toolkit/format/bp1/BP1Base.h +++ b/source/adios2/toolkit/format/bp1/BP1Base.h @@ -251,10 +251,14 @@ protected: /** set initial buffer size */ void InitParameterInitBufferSize(const std::string value); - /** unlimited (default), set max buffer size in Gb or Mb + /** default = DefaultMaxBufferSize in ADIOSTypes.h, set max buffer size in + * Gb or Mb * max_buffer_size=100Mb or max_buffer_size=1Gb */ void InitParameterMaxBufferSize(const std::string value); + /** Set available number of threads for vector operations */ + void InitParameterThreads(const std::string value); + /** verbose file level=0 (default) */ void InitParameterVerbose(const std::string value); diff --git a/source/adios2/toolkit/format/bp1/BP1Writer.cpp b/source/adios2/toolkit/format/bp1/BP1Writer.cpp index b49881e43678724803c0a93a38216c6d339464b8..7b93e22f61f6ab410cfd61c791dd12c69adc8c9f 100644 --- a/source/adios2/toolkit/format/bp1/BP1Writer.cpp +++ b/source/adios2/toolkit/format/bp1/BP1Writer.cpp @@ -192,7 +192,8 @@ std::string BP1Writer::GetRankProfilingLog( std::string timeDate(profiler.Timers.at("buffering").m_LocalTimeDate); timeDate.pop_back(); - rankLog += "'date_and_time': '" + timeDate + "', " + "'bytes': " + + rankLog += "'date_and_time': '" + timeDate + "', 'threads': " + + std::to_string(m_Threads) + ", 'bytes': " + std::to_string(profiler.Bytes.at("buffering")) + ", "; lf_WriterTimer(rankLog, profiler.Timers.at("buffering"));