Unverified Commit f668f832 authored by Atkins, Charles Vernon's avatar Atkins, Charles Vernon Committed by GitHub
Browse files

Merge pull request #2016 from caitlinross/inline-engine

Inline engine fixes
parents 1e7fd8f8 c84eceab
......@@ -160,19 +160,13 @@ template <class T>
typename Variable<T>::Info *Engine::Get(Variable<T> &variable,
const Mode launch)
{
if (m_DebugMode)
{
// CommonChecks<T>(variable, nullptr, {{Mode::Read}}, "in call to Get");
}
typename Variable<T>::Info *info = nullptr;
switch (launch)
{
case Mode::Deferred:
// TODO different? Should use DoGetDeferred?
return DoGetBlockSync(variable);
case Mode::Sync:
// TODO should use DoGetSync()?
return DoGetBlockSync(variable);
info = DoGetBlockSync(variable);
break;
default:
if (m_DebugMode)
{
......@@ -182,7 +176,12 @@ typename Variable<T>::Info *Engine::Get(Variable<T> &variable,
"GetBlock\n");
}
}
return nullptr;
if (m_DebugMode)
{
CommonChecks<T>(variable, info->Data, {{Mode::Read}}, "in call to Get");
}
return info;
}
template <class T>
......
......@@ -12,6 +12,7 @@
#include "InlineReader.tcc"
#include "adios2/helper/adiosFunctions.h" // CSVToVector
#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"
#include <iostream>
......@@ -26,6 +27,7 @@ InlineReader::InlineReader(IO &io, const std::string &name, const Mode mode,
helper::Comm comm)
: Engine("InlineReader", io, name, mode, std::move(comm))
{
TAU_SCOPED_TIMER("InlineReader::Open");
m_EndMessage = " in call to IO Open InlineReader " + m_Name + "\n";
m_ReaderRank = m_Comm.Rank();
Init();
......@@ -38,22 +40,18 @@ InlineReader::InlineReader(IO &io, const std::string &name, const Mode mode,
}
}
InlineReader::~InlineReader()
{
/* m_Inline deconstructor does close and finalize */
if (m_Verbosity == 5)
{
std::cout << "Inline Reader " << m_ReaderRank << " deconstructor on "
<< m_Name << "\n";
}
}
StepStatus InlineReader::BeginStep(const StepMode mode,
const float timeoutSeconds)
{
// step info should be received from the writer side in BeginStep()
// so this forced increase should not be here
++m_CurrentStep;
TAU_SCOPED_TIMER("InlineReader::BeginStep");
// Reader should be on same step as writer
const auto &writer =
dynamic_cast<InlineWriter &>(m_IO.GetEngine(m_WriterID));
m_CurrentStep = writer.CurrentStep();
if (m_CurrentStep == -1)
{
return StepStatus::EndOfStream;
}
if (m_Verbosity == 5)
{
......@@ -61,35 +59,35 @@ StepStatus InlineReader::BeginStep(const StepMode mode,
<< " BeginStep() new step " << m_CurrentStep << "\n";
}
// m_IO Variables and Attributes should be defined at this point
// so that the application can inquire them and start getting data
return StepStatus::OK;
}
void InlineReader::PerformGets()
{
TAU_SCOPED_TIMER("InlineReader::PerformGets");
if (m_Verbosity == 5)
{
std::cout << "Inline Reader " << m_ReaderRank << " PerformGets()\n";
}
m_NeedPerformGets = false;
}
size_t InlineReader::CurrentStep() const { return m_CurrentStep; }
size_t InlineReader::CurrentStep() const
{
// Reader should be on same step as writer
// added here since it's not really necessary to use beginstep/endstep for
// this engine's reader so this ensures we do report the correct step
const auto &writer =
dynamic_cast<InlineWriter &>(m_IO.GetEngine(m_WriterID));
return writer.CurrentStep();
}
void InlineReader::EndStep()
{
// EndStep should call PerformGets() if there are unserved GetDeferred()
// requests
if (m_NeedPerformGets)
{
PerformGets();
}
TAU_SCOPED_TIMER("InlineReader::EndStep");
if (m_Verbosity == 5)
{
std::cout << "Inline Reader " << m_ReaderRank << " EndStep()\n";
std::cout << "Inline Reader " << m_ReaderRank << " EndStep() Step "
<< m_CurrentStep << std::endl;
}
}
......@@ -98,15 +96,18 @@ void InlineReader::EndStep()
#define declare_type(T) \
void InlineReader::DoGetSync(Variable<T> &variable, T *data) \
{ \
TAU_SCOPED_TIMER("InlineReader::DoGetSync"); \
GetSyncCommon(variable, data); \
} \
void InlineReader::DoGetDeferred(Variable<T> &variable, T *data) \
{ \
TAU_SCOPED_TIMER("InlineReader::DoGetDeferred"); \
GetDeferredCommon(variable, data); \
} \
typename Variable<T>::Info *InlineReader::DoGetBlockSync( \
Variable<T> &variable) \
{ \
TAU_SCOPED_TIMER("InlineReader::DoGetBlockSync"); \
return GetBlockSyncCommon(variable); \
}
......@@ -121,12 +122,14 @@ ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
std::map<size_t, std::vector<typename Variable<T>::Info>> \
InlineReader::DoAllStepsBlocksInfo(const Variable<T> &variable) const \
{ \
TAU_SCOPED_TIMER("InlineReader::AllStepsBlockInfo"); \
return std::map<size_t, std::vector<typename Variable<T>::Info>>(); \
} \
\
std::vector<typename Variable<T>::Info> InlineReader::DoBlocksInfo( \
const Variable<T> &variable, const size_t step) const \
{ \
TAU_SCOPED_TIMER("InlineReader::DoBlocksInfo"); \
return variable.m_BlocksInfo; \
}
......@@ -147,7 +150,6 @@ void InlineReader::InitParameters()
std::transform(key.begin(), key.end(), key.begin(), ::tolower);
std::string value(pair.second);
// std::transform(value.begin(), value.end(), value.begin(), ::tolower);
if (key == "verbose")
{
......@@ -180,6 +182,7 @@ void InlineReader::InitTransports()
void InlineReader::DoClose(const int transportIndex)
{
TAU_SCOPED_TIMER("InlineReader::DoClose");
if (m_Verbosity == 5)
{
std::cout << "Inline Reader " << m_ReaderRank << " Close(" << m_Name
......
......@@ -42,7 +42,7 @@ public:
InlineReader(IO &adios, const std::string &name, const Mode mode,
helper::Comm comm);
~InlineReader();
~InlineReader() = default;
StepStatus BeginStep(StepMode mode = StepMode::Read,
const float timeoutSeconds = -1.0) final;
void PerformGets() final;
......@@ -56,9 +56,6 @@ private:
// step info should be received from the writer side in BeginStep()
int m_CurrentStep = -1;
// EndStep must call PerformGets if necessary
bool m_NeedPerformGets = false;
std::string m_WriterID;
void Init() final; ///< called from constructor, gets the selected Inline
......
......@@ -23,9 +23,8 @@ namespace core
namespace engine
{
template <>
inline void InlineReader::GetSyncCommon(Variable<std::string> &variable,
std::string *data)
template <class T>
inline void InlineReader::GetSyncCommon(Variable<T> &variable, T *data)
{
variable.m_Data = data;
auto blockInfo = variable.m_BlocksInfo.back();
......@@ -44,22 +43,6 @@ inline void InlineReader::GetSyncCommon(Variable<std::string> &variable,
}
}
template <class T>
inline void InlineReader::GetSyncCommon(Variable<T> &variable, T *data)
{
variable.m_Data = data;
auto blockInfo = variable.m_BlocksInfo.back();
if (blockInfo.IsValue)
{
*data = blockInfo.Value;
}
if (m_Verbosity == 5)
{
std::cout << "Inline Reader " << m_ReaderRank << " GetSync("
<< variable.m_Name << ")\n";
}
}
template <class T>
void InlineReader::GetDeferredCommon(Variable<T> &variable, T *data)
{
......@@ -69,18 +52,16 @@ void InlineReader::GetDeferredCommon(Variable<T> &variable, T *data)
std::cout << "Inline Reader " << m_ReaderRank << " GetDeferred("
<< variable.m_Name << ")\n";
}
m_NeedPerformGets = true;
}
template <class T>
inline typename Variable<T>::Info *
InlineReader::GetBlockSyncCommon(Variable<T> &variable)
{
InlineWriter &writer =
dynamic_cast<InlineWriter &>(m_IO.GetEngine(m_WriterID));
writer.AddReadVariable(variable.m_Name);
if (m_DebugMode)
{
InlineWriter &writer =
dynamic_cast<InlineWriter &>(m_IO.GetEngine(m_WriterID));
if (variable.m_BlockID >= variable.m_BlocksInfo.size())
{
throw std::invalid_argument(
......
......@@ -12,6 +12,7 @@
#include "InlineWriter.tcc"
#include "adios2/helper/adiosFunctions.h"
#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"
#include <iostream>
......@@ -26,6 +27,7 @@ InlineWriter::InlineWriter(IO &io, const std::string &name, const Mode mode,
helper::Comm comm)
: Engine("InlineWriter", io, name, mode, std::move(comm))
{
TAU_SCOPED_TIMER("InlineWriter::Open");
m_EndMessage = " in call to InlineWriter " + m_Name + " Open\n";
m_WriterRank = m_Comm.Rank();
Init();
......@@ -38,6 +40,7 @@ InlineWriter::InlineWriter(IO &io, const std::string &name, const Mode mode,
StepStatus InlineWriter::BeginStep(StepMode mode, const float timeoutSeconds)
{
TAU_SCOPED_TIMER("InlineWriter::BeginStep");
m_CurrentStep++; // 0 is the first step
if (m_Verbosity == 5)
{
......@@ -45,14 +48,12 @@ StepStatus InlineWriter::BeginStep(StepMode mode, const float timeoutSeconds)
<< " BeginStep() new step " << m_CurrentStep << "\n";
}
// Need to clear block info from previous step at this point.
if (m_ReadVariables.empty())
{
return StepStatus::OK;
}
for (const std::string &name : m_ReadVariables)
// m_BlocksInfo for all variables should be cleared at this point,
// whether they were read in the last step or not.
auto availVars = m_IO.GetAvailableVariables();
for (auto &varPair : availVars)
{
const auto &name = varPair.first;
const std::string type = m_IO.InquireVariableType(name);
if (type == "compound")
......@@ -68,44 +69,34 @@ StepStatus InlineWriter::BeginStep(StepMode mode, const float timeoutSeconds)
#undef declare_type
}
m_ReadVariables.clear();
return StepStatus::OK;
}
size_t InlineWriter::CurrentStep() const
{
if (m_Verbosity == 5)
{
std::cout << "Inline Writer " << m_WriterRank
<< " CurrentStep() returns " << m_CurrentStep << "\n";
}
return m_CurrentStep;
}
size_t InlineWriter::CurrentStep() const { return m_CurrentStep; }
/* PutDeferred = PutSync, so nothing to be done in PerformPuts */
void InlineWriter::PerformPuts()
{
TAU_SCOPED_TIMER("InlineWriter::PerformPuts");
if (m_Verbosity == 5)
{
std::cout << "Inline Writer " << m_WriterRank << " PerformPuts()\n";
}
m_NeedPerformPuts = false;
}
void InlineWriter::EndStep()
{
if (m_NeedPerformPuts)
{
PerformPuts();
}
TAU_SCOPED_TIMER("InlineWriter::EndStep");
if (m_Verbosity == 5)
{
std::cout << "Inline Writer " << m_WriterRank << " EndStep()\n";
std::cout << "Inline Writer " << m_WriterRank << " EndStep() Step "
<< m_CurrentStep << std::endl;
}
}
void InlineWriter::Flush(const int transportIndex)
void InlineWriter::Flush(const int)
{
TAU_SCOPED_TIMER("InlineWriter::Flush");
if (m_Verbosity == 5)
{
std::cout << "Inline Writer " << m_WriterRank << " Flush()\n";
......@@ -117,11 +108,12 @@ void InlineWriter::Flush(const int transportIndex)
#define declare_type(T) \
void InlineWriter::DoPutSync(Variable<T> &variable, const T *data) \
{ \
TAU_SCOPED_TIMER("InlineWriter::DoPutSync"); \
PutSyncCommon(variable, variable.SetBlockInfo(data, CurrentStep())); \
/*reader uses: variable.m_BlocksInfo.clear();*/ \
} \
void InlineWriter::DoPutDeferred(Variable<T> &variable, const T *data) \
{ \
TAU_SCOPED_TIMER("InlineWriter::DoPutDeferred"); \
PutDeferredCommon(variable, data); \
}
ADIOS2_FOREACH_STDTYPE_1ARG(declare_type)
......@@ -165,11 +157,14 @@ void InlineWriter::InitTransports()
void InlineWriter::DoClose(const int transportIndex)
{
TAU_SCOPED_TIMER("InlineWriter::DoClose");
if (m_Verbosity == 5)
{
std::cout << "Inline Writer " << m_WriterRank << " Close(" << m_Name
<< ")\n";
}
// end of stream
m_CurrentStep = -1;
}
} // end namespace engine
......
......@@ -47,22 +47,11 @@ public:
void EndStep() final;
void Flush(const int transportIndex = -1) final;
void AddReadVariable(const std::string &name)
{
m_ReadVariables.insert(name);
}
private:
int m_Verbosity = 0;
int m_WriterRank; // my rank in the writers' comm
int m_CurrentStep = -1; // steps start from 0
// EndStep must call PerformPuts if necessary
bool m_NeedPerformPuts = false;
// track which variables have been read, so their blockinfo can be cleared.
std::set<std::string> m_ReadVariables;
void Init() final;
void InitParameters() final;
void InitTransports() final;
......@@ -88,7 +77,7 @@ private:
*/
template <class T>
void PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo);
typename Variable<T>::Info &blockInfo);
template <class T>
void PutDeferredCommon(Variable<T> &variable, const T *values);
......
......@@ -23,16 +23,13 @@ namespace engine
template <class T>
void InlineWriter::PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo)
typename Variable<T>::Info &blockInfo)
{
auto &info = variable.m_BlocksInfo.back();
info.BlockID = variable.m_BlocksInfo.size() - 1;
// passed in blockInfo has current blockInfo.Data member.
if (blockInfo.Shape.size() == 0 && blockInfo.Count.size() == 0 &&
blockInfo.StepsCount == 1)
if (variable.m_ShapeID == ShapeID::GlobalValue ||
variable.m_ShapeID == ShapeID::LocalValue)
{
info.IsValue = true;
info.Value = blockInfo.Data[0];
blockInfo.IsValue = true;
blockInfo.Value = blockInfo.Data[0];
}
if (m_Verbosity == 5)
{
......@@ -45,15 +42,11 @@ template <class T>
void InlineWriter::PutDeferredCommon(Variable<T> &variable, const T *data)
{
variable.SetBlockInfo(data, CurrentStep());
auto &info = variable.m_BlocksInfo.back();
info.BlockID = variable.m_BlocksInfo.size() - 1;
if (m_Verbosity == 5)
{
std::cout << "Inline Writer " << m_WriterRank << " PutDeferred("
<< variable.m_Name << ")\n";
}
m_NeedPerformPuts = true;
}
} // end namespace engine
......
......@@ -42,6 +42,15 @@ typename adios2::Variable<T>::Info setSelection(adios2::Variable<T> &var_i8,
return info;
}
template <class T>
void testBlocksInfo(adios2::Variable<T> &var, size_t step,
adios2::Engine &inlineReader)
{
var.SetStepSelection({step, 1});
auto blocksInfo = inlineReader.BlocksInfo(var, step);
ASSERT_EQ(blocksInfo.size(), 1);
}
TEST_F(InlineWriteRead, InlineWriteRead1D8)
{
// Each process would write a 1x8 array and all processes would
......@@ -239,6 +248,16 @@ TEST_F(InlineWriteRead, InlineWriteRead1D8)
std::string IString;
auto writerStep = inlineWriter.CurrentStep();
auto readerStep = inlineReader.CurrentStep();
ASSERT_EQ(writerStep, readerStep);
// Test skipping a step on the read side
if (step == 1)
{
continue;
}
inlineReader.Get(var_iString, IString);
auto info_i8 = setSelection<int8_t>(var_i8, step, inlineReader);
auto info_i16 = setSelection<int16_t>(var_i16, step, inlineReader);
......@@ -255,6 +274,19 @@ TEST_F(InlineWriteRead, InlineWriteRead1D8)
auto info_cr64 = setSelection<std::complex<double>>(var_cr64, step,
inlineReader);
testBlocksInfo<int8_t>(var_i8, step, inlineReader);
setSelection<int16_t>(var_i16, step, inlineReader);
setSelection<int32_t>(var_i32, step, inlineReader);
setSelection<int64_t>(var_i64, step, inlineReader);
setSelection<uint8_t>(var_u8, step, inlineReader);
setSelection<uint16_t>(var_u16, step, inlineReader);
setSelection<uint32_t>(var_u32, step, inlineReader);
setSelection<uint64_t>(var_u64, step, inlineReader);
setSelection<float>(var_r32, step, inlineReader);
setSelection<double>(var_r64, step, inlineReader);
setSelection<std::complex<float>>(var_cr32, step, inlineReader);
setSelection<std::complex<double>>(var_cr64, step, inlineReader);
// Generate test data for each rank uniquely
SmallTestData currentTestData = generateNewSmallTestData(
m_TestData, static_cast<int>(step), mpiRank, mpiSize);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment