Commit c47dfe63 authored by Caitlin Ross's avatar Caitlin Ross
Browse files

fixes to inline engine

parent 20a5afe2
......@@ -160,19 +160,13 @@ template <class T>
typename Variable<T>::Info *Engine::Get(Variable<T> &variable,
const Mode launch)
{
if (m_DebugMode)
{
// CommonChecks<T>(variable, nullptr, {{Mode::Read}}, "in call to Get");
}
typename Variable<T>::Info *info = nullptr;
switch (launch)
{
case Mode::Deferred:
// TODO different? Should use DoGetDeferred?
return DoGetBlockSync(variable);
case Mode::Sync:
// TODO should use DoGetSync()?
return DoGetBlockSync(variable);
info = DoGetBlockSync(variable);
break;
default:
if (m_DebugMode)
{
......@@ -182,7 +176,12 @@ typename Variable<T>::Info *Engine::Get(Variable<T> &variable,
"GetBlock\n");
}
}
return nullptr;
if (m_DebugMode)
{
CommonChecks<T>(variable, info->Data, {{Mode::Read}}, "in call to Get");
}
return info;
}
template <class T>
......
......@@ -38,22 +38,17 @@ InlineReader::InlineReader(IO &io, const std::string &name, const Mode mode,
}
}
InlineReader::~InlineReader()
{
/* m_Inline deconstructor does close and finalize */
if (m_Verbosity == 5)
{
std::cout << "Inline Reader " << m_ReaderRank << " deconstructor on "
<< m_Name << "\n";
}
}
StepStatus InlineReader::BeginStep(const StepMode mode,
const float timeoutSeconds)
{
// step info should be received from the writer side in BeginStep()
// so this forced increase should not be here
++m_CurrentStep;
// Reader should be on same step as writer
const auto &writer =
dynamic_cast<InlineWriter &>(m_IO.GetEngine(m_WriterID));
m_CurrentStep = writer.CurrentStep();
if (m_CurrentStep == -1)
{
return StepStatus::EndOfStream;
}
if (m_Verbosity == 5)
{
......@@ -61,9 +56,6 @@ StepStatus InlineReader::BeginStep(const StepMode mode,
<< " BeginStep() new step " << m_CurrentStep << "\n";
}
// m_IO Variables and Attributes should be defined at this point
// so that the application can inquire them and start getting data
return StepStatus::OK;
}
......@@ -73,23 +65,24 @@ void InlineReader::PerformGets()
{
std::cout << "Inline Reader " << m_ReaderRank << " PerformGets()\n";
}
m_NeedPerformGets = false;
}
size_t InlineReader::CurrentStep() const { return m_CurrentStep; }
size_t InlineReader::CurrentStep() const
{
// Reader should be on same step as writer
// added here since it's not really necessary to use beginstep/endstep for
// this engine's reader so this ensures we do report the correct step
const auto &writer =
dynamic_cast<InlineWriter &>(m_IO.GetEngine(m_WriterID));
return writer.CurrentStep();
}
void InlineReader::EndStep()
{
// EndStep should call PerformGets() if there are unserved GetDeferred()
// requests
if (m_NeedPerformGets)
{
PerformGets();
}
if (m_Verbosity == 5)
{
std::cout << "Inline Reader " << m_ReaderRank << " EndStep()\n";
std::cout << "Inline Reader " << m_ReaderRank << " EndStep() Step "
<< m_CurrentStep << std::endl;
}
}
......@@ -147,7 +140,6 @@ void InlineReader::InitParameters()
std::transform(key.begin(), key.end(), key.begin(), ::tolower);
std::string value(pair.second);
// std::transform(value.begin(), value.end(), value.begin(), ::tolower);
if (key == "verbose")
{
......
......@@ -42,7 +42,7 @@ public:
InlineReader(IO &adios, const std::string &name, const Mode mode,
helper::Comm comm);
~InlineReader();
~InlineReader() = default;
StepStatus BeginStep(StepMode mode = StepMode::Read,
const float timeoutSeconds = -1.0) final;
void PerformGets() final;
......@@ -56,9 +56,6 @@ private:
// step info should be received from the writer side in BeginStep()
int m_CurrentStep = -1;
// EndStep must call PerformGets if necessary
bool m_NeedPerformGets = false;
std::string m_WriterID;
void Init() final; ///< called from constructor, gets the selected Inline
......
......@@ -23,9 +23,8 @@ namespace core
namespace engine
{
template <>
inline void InlineReader::GetSyncCommon(Variable<std::string> &variable,
std::string *data)
template <class T>
inline void InlineReader::GetSyncCommon(Variable<T> &variable, T *data)
{
variable.m_Data = data;
auto blockInfo = variable.m_BlocksInfo.back();
......@@ -44,22 +43,6 @@ inline void InlineReader::GetSyncCommon(Variable<std::string> &variable,
}
}
template <class T>
inline void InlineReader::GetSyncCommon(Variable<T> &variable, T *data)
{
variable.m_Data = data;
auto blockInfo = variable.m_BlocksInfo.back();
if (blockInfo.IsValue)
{
*data = blockInfo.Value;
}
if (m_Verbosity == 5)
{
std::cout << "Inline Reader " << m_ReaderRank << " GetSync("
<< variable.m_Name << ")\n";
}
}
template <class T>
void InlineReader::GetDeferredCommon(Variable<T> &variable, T *data)
{
......@@ -69,18 +52,16 @@ void InlineReader::GetDeferredCommon(Variable<T> &variable, T *data)
std::cout << "Inline Reader " << m_ReaderRank << " GetDeferred("
<< variable.m_Name << ")\n";
}
m_NeedPerformGets = true;
}
template <class T>
inline typename Variable<T>::Info *
InlineReader::GetBlockSyncCommon(Variable<T> &variable)
{
InlineWriter &writer =
dynamic_cast<InlineWriter &>(m_IO.GetEngine(m_WriterID));
writer.AddReadVariable(variable.m_Name);
if (m_DebugMode)
{
InlineWriter &writer =
dynamic_cast<InlineWriter &>(m_IO.GetEngine(m_WriterID));
if (variable.m_BlockID >= variable.m_BlocksInfo.size())
{
throw std::invalid_argument(
......
......@@ -45,14 +45,12 @@ StepStatus InlineWriter::BeginStep(StepMode mode, const float timeoutSeconds)
<< " BeginStep() new step " << m_CurrentStep << "\n";
}
// Need to clear block info from previous step at this point.
if (m_ReadVariables.empty())
{
return StepStatus::OK;
}
for (const std::string &name : m_ReadVariables)
// m_BlocksInfo for all variables should be cleared at this point,
// whether they were read in the last step or not.
auto availVars = m_IO.GetAvailableVariables();
for (auto &varPair : availVars)
{
const auto &name = varPair.first;
const std::string type = m_IO.InquireVariableType(name);
if (type == "compound")
......@@ -68,20 +66,10 @@ StepStatus InlineWriter::BeginStep(StepMode mode, const float timeoutSeconds)
#undef declare_type
}
m_ReadVariables.clear();
return StepStatus::OK;
}
size_t InlineWriter::CurrentStep() const
{
if (m_Verbosity == 5)
{
std::cout << "Inline Writer " << m_WriterRank
<< " CurrentStep() returns " << m_CurrentStep << "\n";
}
return m_CurrentStep;
}
size_t InlineWriter::CurrentStep() const { return m_CurrentStep; }
/* PutDeferred = PutSync, so nothing to be done in PerformPuts */
void InlineWriter::PerformPuts()
......@@ -90,21 +78,18 @@ void InlineWriter::PerformPuts()
{
std::cout << "Inline Writer " << m_WriterRank << " PerformPuts()\n";
}
m_NeedPerformPuts = false;
}
void InlineWriter::EndStep()
{
if (m_NeedPerformPuts)
{
PerformPuts();
}
if (m_Verbosity == 5)
{
std::cout << "Inline Writer " << m_WriterRank << " EndStep()\n";
std::cout << "Inline Writer " << m_WriterRank << " EndStep() Step "
<< m_CurrentStep << std::endl;
}
}
void InlineWriter::Flush(const int transportIndex)
void InlineWriter::Flush(const int)
{
if (m_Verbosity == 5)
{
......@@ -118,7 +103,6 @@ void InlineWriter::Flush(const int transportIndex)
void InlineWriter::DoPutSync(Variable<T> &variable, const T *data) \
{ \
PutSyncCommon(variable, variable.SetBlockInfo(data, CurrentStep())); \
/*reader uses: variable.m_BlocksInfo.clear();*/ \
} \
void InlineWriter::DoPutDeferred(Variable<T> &variable, const T *data) \
{ \
......@@ -170,6 +154,8 @@ void InlineWriter::DoClose(const int transportIndex)
std::cout << "Inline Writer " << m_WriterRank << " Close(" << m_Name
<< ")\n";
}
// end of stream
m_CurrentStep = -1;
}
} // end namespace engine
......
......@@ -47,22 +47,11 @@ public:
void EndStep() final;
void Flush(const int transportIndex = -1) final;
void AddReadVariable(const std::string &name)
{
m_ReadVariables.insert(name);
}
private:
int m_Verbosity = 0;
int m_WriterRank; // my rank in the writers' comm
int m_CurrentStep = -1; // steps start from 0
// EndStep must call PerformPuts if necessary
bool m_NeedPerformPuts = false;
// track which variables have been read, so their blockinfo can be cleared.
std::set<std::string> m_ReadVariables;
void Init() final;
void InitParameters() final;
void InitTransports() final;
......@@ -88,7 +77,7 @@ private:
*/
template <class T>
void PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo);
typename Variable<T>::Info &blockInfo);
template <class T>
void PutDeferredCommon(Variable<T> &variable, const T *values);
......
......@@ -23,16 +23,13 @@ namespace engine
template <class T>
void InlineWriter::PutSyncCommon(Variable<T> &variable,
const typename Variable<T>::Info &blockInfo)
typename Variable<T>::Info &blockInfo)
{
auto &info = variable.m_BlocksInfo.back();
info.BlockID = variable.m_BlocksInfo.size() - 1;
// passed in blockInfo has current blockInfo.Data member.
if (blockInfo.Shape.size() == 0 && blockInfo.Count.size() == 0 &&
blockInfo.StepsCount == 1)
if (variable.m_ShapeID == ShapeID::GlobalValue ||
variable.m_ShapeID == ShapeID::LocalValue)
{
info.IsValue = true;
info.Value = blockInfo.Data[0];
blockInfo.IsValue = true;
blockInfo.Value = blockInfo.Data[0];
}
if (m_Verbosity == 5)
{
......@@ -45,15 +42,11 @@ template <class T>
void InlineWriter::PutDeferredCommon(Variable<T> &variable, const T *data)
{
variable.SetBlockInfo(data, CurrentStep());
auto &info = variable.m_BlocksInfo.back();
info.BlockID = variable.m_BlocksInfo.size() - 1;
if (m_Verbosity == 5)
{
std::cout << "Inline Writer " << m_WriterRank << " PutDeferred("
<< variable.m_Name << ")\n";
}
m_NeedPerformPuts = true;
}
} // end namespace engine
......
......@@ -42,6 +42,15 @@ typename adios2::Variable<T>::Info setSelection(adios2::Variable<T> &var_i8,
return info;
}
template <class T>
void testBlocksInfo(adios2::Variable<T> &var, size_t step,
adios2::Engine &inlineReader)
{
var.SetStepSelection({step, 1});
auto blocksInfo = inlineReader.BlocksInfo(var, step);
ASSERT_EQ(blocksInfo.size(), 1);
}
TEST_F(InlineWriteRead, InlineWriteRead1D8)
{
// Each process would write a 1x8 array and all processes would
......@@ -239,6 +248,16 @@ TEST_F(InlineWriteRead, InlineWriteRead1D8)
std::string IString;
auto writerStep = inlineWriter.CurrentStep();
auto readerStep = inlineReader.CurrentStep();
ASSERT_EQ(writerStep, readerStep);
// Test skipping a step on the read side
if (step == 1)
{
continue;
}
inlineReader.Get(var_iString, IString);
auto info_i8 = setSelection<int8_t>(var_i8, step, inlineReader);
auto info_i16 = setSelection<int16_t>(var_i16, step, inlineReader);
......@@ -255,6 +274,19 @@ TEST_F(InlineWriteRead, InlineWriteRead1D8)
auto info_cr64 = setSelection<std::complex<double>>(var_cr64, step,
inlineReader);
testBlocksInfo<int8_t>(var_i8, step, inlineReader);
setSelection<int16_t>(var_i16, step, inlineReader);
setSelection<int32_t>(var_i32, step, inlineReader);
setSelection<int64_t>(var_i64, step, inlineReader);
setSelection<uint8_t>(var_u8, step, inlineReader);
setSelection<uint16_t>(var_u16, step, inlineReader);
setSelection<uint32_t>(var_u32, step, inlineReader);
setSelection<uint64_t>(var_u64, step, inlineReader);
setSelection<float>(var_r32, step, inlineReader);
setSelection<double>(var_r64, step, inlineReader);
setSelection<std::complex<float>>(var_cr32, step, inlineReader);
setSelection<std::complex<double>>(var_cr64, step, inlineReader);
// Generate test data for each rank uniquely
SmallTestData currentTestData = generateNewSmallTestData(
m_TestData, static_cast<int>(step), mpiRank, mpiSize);
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment