Skip to content
Snippets Groups Projects
Commit 8909d16d authored by William F Godoy's avatar William F Godoy
Browse files

Testing bp buffer memory reallocation with heat transfer example

parent 2e039d02
No related branches found
No related tags found
1 merge request!132Initial implementation of buffer resizing and flush for independent IO
......@@ -145,10 +145,10 @@ const std::string DefaultFileLibrary("POSIX");
const std::string DefaultTimeUnit("Microseconds");
constexpr TimeUnit DefaultTimeUnitEnum(TimeUnit::Microseconds);
/** default initial bp buffer size in bytes */
/** default initial bp buffer size, 16Kb, in bytes */
constexpr size_t DefaultInitialBufferSize(16384);
/** default maximum bp buffer size in bytes */
constexpr size_t DefaultMaxBufferSize(1048576);
/** default maximum bp buffer size, 16Mb, in bytes */
constexpr size_t DefaultMaxBufferSize(16777216);
/** default buffer growth factor (from STL vector = 2.) */
constexpr float DefaultBufferGrowthFactor(2.);
......
......@@ -10,7 +10,7 @@
#include "BPFileWriter.h"
#include <cmath>
#include <iostream> //TODO temporary
namespace adios
{
......@@ -29,57 +29,39 @@ void BPFileWriter::DoWriteCommon(Variable<T> &variable, const T *values)
m_IO.m_HostLanguage, m_TransportsManager.GetTransportsTypes());
}
const size_t oldSize = m_BP1Writer.m_HeapBuffer.GetDataSize();
format::BP1Base::ResizeResult resizeResult =
m_BP1Writer.ResizeBuffer(variable);
// WRITE INDEX to data buffer and metadata structure (in memory)//
m_BP1Writer.WriteVariableMetadata(variable);
const size_t newSize = m_BP1Writer.m_HeapBuffer.GetDataSize();
if (resizeResult == format::BP1Base::ResizeResult::FLUSH)
// if (resizeResult == format::BP1Base::ResizeResult::Success)
// {
// std::cout << "Old buffer size: " << oldSize << "\n";
// std::cout << "New buffer size: " << newSize << "\n";
// }
if (resizeResult == format::BP1Base::ResizeResult::Flush)
{
m_BP1Writer.Flush();
auto &heapBuffer = m_BP1Writer.m_HeapBuffer;
// first batch fills current buffer and sends to transports
const size_t firstBatchSize = heapBuffer.GetAvailableDataSize();
CopyToBuffer(heapBuffer.m_Data, heapBuffer.m_DataPosition, values,
firstBatchSize / sizeof(T));
m_TransportsManager.WriteFiles(heapBuffer.GetData(),
heapBuffer.m_DataPosition);
// start writing missing size in batches directly to transport
const size_t missingSize = variable.PayLoadSize() - firstBatchSize;
const size_t bufferSize = heapBuffer.GetDataSize();
const size_t batches = missingSize / bufferSize;
const size_t lastSize = missingSize % batches;
// flush to transports in uniform batches
for (size_t batch = 0; batch < batches; ++batch)
{
const size_t start = batch * bufferSize / sizeof(T);
const char *valuesPtr =
reinterpret_cast<const char *>(&values[start]);
if (batch == batches - 1) // lastSize
{
m_TransportsManager.WriteFiles(valuesPtr, lastSize);
}
else
{
m_TransportsManager.WriteFiles(valuesPtr, bufferSize);
}
}
// update absolute position
heapBuffer.m_DataAbsolutePosition += variable.PayLoadSize();
// set relative position to zero
heapBuffer.m_DataPosition = 0;
// reset buffer to zero values
heapBuffer.m_Data.assign(heapBuffer.GetDataSize(), '\0');
m_BP1Writer.WriteProcessGroupIndex(
m_IO.m_HostLanguage, m_TransportsManager.GetTransportsTypes());
}
else // Write data to buffer
{
m_BP1Writer.WriteVariablePayload(variable);
}
// WRITE INDEX to data buffer and metadata structure (in memory)//
m_BP1Writer.WriteVariableMetadata(variable);
m_BP1Writer.WriteVariablePayload(variable);
variable.m_AppValues = nullptr; // not needed after write
}
......
......@@ -353,7 +353,7 @@ size_t BP1Base::GetProcessGroupIndexSize(const std::string name,
#define declare_template_instantiation(T) \
template BP1Base::ResizeResult BP1Base::ResizeBuffer( \
const Variable<T> &variable) noexcept;
const Variable<T> &variable);
ADIOS2_FOREACH_TYPE_1ARG(declare_template_instantiation)
#undef declare_template_instantiation
......
......@@ -109,10 +109,10 @@ public:
/** Return type of the CheckAllocation function. */
enum class ResizeResult
{
FAILURE, //!< FAILURE, caught a std::bad_alloc
UNCHANGED, //!< UNCHANGED, no need to resize (sufficient capacity)
SUCCESS, //!< SUCCESS, resize was successful
FLUSH //!< FLUSH, need to flush to transports for current variable
Failure, //!< FAILURE, caught a std::bad_alloc
Unchanged, //!< UNCHANGED, no need to resize (sufficient capacity)
Success, //!< SUCCESS, resize was successful
Flush //!< FLUSH, need to flush to transports for current variable
};
/**
......@@ -124,7 +124,7 @@ public:
* 2: need a transport flush
*/
template <class T>
ResizeResult ResizeBuffer(const Variable<T> &variable) noexcept;
ResizeResult ResizeBuffer(const Variable<T> &variable);
protected:
/** might be used in large payload copies to buffer */
......@@ -309,7 +309,7 @@ protected:
#define declare_template_instantiation(T) \
extern template BP1Base::ResizeResult BP1Base::ResizeBuffer( \
const Variable<T> &variable) noexcept;
const Variable<T> &variable);
ADIOS2_FOREACH_TYPE_1ARG(declare_template_instantiation)
#undef declare_template_instantiation
......
......@@ -113,15 +113,26 @@ int8_t BP1Base::GetDataType<cldouble>() const noexcept
}
template <class T>
BP1Base::ResizeResult
BP1Base::ResizeBuffer(const Variable<T> &variable) noexcept
BP1Base::ResizeResult BP1Base::ResizeBuffer(const Variable<T> &variable)
{
size_t currentCapacity = m_HeapBuffer.m_Data.capacity();
size_t variableData =
GetVariableIndexSize(variable) + variable.PayLoadSize();
size_t requiredCapacity = variableData + m_HeapBuffer.m_DataPosition;
ResizeResult result = ResizeResult::UNCHANGED;
ResizeResult result = ResizeResult::Unchanged;
if (variableData > m_MaxBufferSize)
{
throw std::runtime_error(
"ERROR: variable " + variable.m_Name + " data size: " +
std::to_string(static_cast<float>(variableData) / (1024. * 1024.)) +
" Mb is too large for adios2 bp MaxBufferSize=" +
std::to_string(static_cast<float>(m_MaxBufferSize) /
(1024. * 1024.)) +
"Mb, try increasing MaxBufferSize in call to IO SetParameters, in "
"call to Write\n");
}
if (requiredCapacity <= currentCapacity)
{
......@@ -133,7 +144,7 @@ BP1Base::ResizeBuffer(const Variable<T> &variable) noexcept
{
m_HeapBuffer.ResizeData(m_MaxBufferSize);
}
result = ResizeResult::FLUSH;
result = ResizeResult::Flush;
}
else // buffer must grow
{
......@@ -144,7 +155,7 @@ BP1Base::ResizeBuffer(const Variable<T> &variable) noexcept
NextExponentialSize(requiredCapacity, currentCapacity,
m_GrowthFactor));
m_HeapBuffer.ResizeData(nextSize);
result = ResizeResult::SUCCESS;
result = ResizeResult::Success;
}
}
......
......@@ -122,6 +122,22 @@ void BP1Writer::Advance()
m_Profiler.Timers.at("buffering").Resume();
}
FlattenData();
++m_MetadataSet.TimeStep;
if (m_Profiler.IsActive)
{
m_Profiler.Timers.at("buffering").Pause();
}
}
void BP1Writer::Flush()
{
if (m_Profiler.IsActive)
{
m_Profiler.Timers.at("buffering").Resume();
}
FlattenData();
if (m_Profiler.IsActive)
......@@ -348,7 +364,6 @@ void BP1Writer::FlattenData() noexcept
position - m_MetadataSet.DataPGLengthPosition - 8;
CopyToBuffer(buffer, m_MetadataSet.DataPGLengthPosition, &dataPGLength);
++m_MetadataSet.TimeStep;
m_MetadataSet.DataPGIsOpen = false;
}
......
......@@ -68,9 +68,13 @@ public:
template <class T>
void WriteVariablePayload(const Variable<T> &variable) noexcept;
/** Flattens data buffer */
/** Flattens data buffer and closes current process group */
void Advance();
/** Flattens data buffer and close current process group, doesn't
* advance time index */
void Flush();
/**
* @param isFirstClose true: first time close, false: already closed buffer
*/
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment