diff --git a/source/adios2/engine/bp/BPFileReader.cpp b/source/adios2/engine/bp/BPFileReader.cpp
index 5249d3ca7a640d68de1b0b2c2c5eac57dfba7208..8abcb1405de1b93e4b7d9b0183e3ec814cc6deef 100644
--- a/source/adios2/engine/bp/BPFileReader.cpp
+++ b/source/adios2/engine/bp/BPFileReader.cpp
@@ -25,6 +25,43 @@ BPFileReader::BPFileReader(IO &io, const std::string &name, const Mode mode,
     Init();
 }
 
+StepStatus BPFileReader::BeginStep(StepMode mode, const float timeoutSeconds)
+{
+    if (m_DebugMode && mode != StepMode::NextAvailable)
+    {
+        throw std::invalid_argument("ERROR: mode is not supported yet for "
+                                    "engine BPFileReader, in call to "
+                                    "BeginStep\n");
+    }
+
+    const auto &variablesData = m_IO.GetVariablesDataMap();
+
+    for (const auto &variableData : variablesData)
+    {
+        const std::string name = variableData.first;
+        const std::string type = m_IO.InquireVariableType(name);
+
+        if (type == "compound")
+        {
+        }
+#define declare_type(T)                                                        \
+    else if (type == GetType<T>())                                             \
+    {                                                                          \
+        auto variable = m_IO.InquireVariable<T>(name);                         \
+        if (mode == StepMode::NextAvailable)                                   \
+        {                                                                      \
+            variable->SetStepSelection({m_CurrentStep + 1, 1});                \
+        }                                                                      \
+    }
+        ADIOS2_FOREACH_TYPE_1ARG(declare_type)
+#undef declare_type
+    }
+
+    return StepStatus::OK;
+}
+
+void BPFileReader::EndStep() { ++m_CurrentStep; }
+
 void BPFileReader::PerformGets()
 {
     const std::map<std::string, SubFileInfoMap> variablesSubfileInfo =
diff --git a/source/adios2/engine/bp/BPFileReader.h b/source/adios2/engine/bp/BPFileReader.h
index 14f04f4bb6eead78044abfcfa83dc00c224c9aef..18f4be8cfbd140a3330e6d6a1e684c7eb9d3606c 100644
--- a/source/adios2/engine/bp/BPFileReader.h
+++ b/source/adios2/engine/bp/BPFileReader.h
@@ -42,6 +42,11 @@ public:
 
     virtual ~BPFileReader() = default;
 
+    StepStatus BeginStep(StepMode mode = StepMode::NextAvailable,
+                         const float timeoutSeconds = 0.f) final;
+
+    void EndStep() final;
+
     void PerformGets() final;
 
     void Close(const int transportIndex = -1);
@@ -51,6 +56,9 @@ private:
     transportman::TransportMan m_FileManager;
     transportman::TransportMan m_SubFileManager;
 
+    /** updates if step=1 in EndStep function, used for per-step reads */
+    size_t m_CurrentStep = 0;
+
     void Init();
     void InitTransports();
     void InitBuffer();
diff --git a/testing/adios2/engine/SmallTestData.h b/testing/adios2/engine/SmallTestData.h
index d6b82eae1ab89b7e83612415423581d282401584..240f5aece68c0c3b68eafc9e6d463b1215f35018 100644
--- a/testing/adios2/engine/SmallTestData.h
+++ b/testing/adios2/engine/SmallTestData.h
@@ -64,4 +64,19 @@ SmallTestData generateNewSmallTestData(SmallTestData in, int step, int rank,
     return in;
 }
 
+void UpdateSmallTestData(SmallTestData &in, int step, int rank, int size)
+{
+    int j = rank + 1 + step * size;
+    std::for_each(in.I8.begin(), in.I8.end(), [&](int8_t &v) { v += j; });
+    std::for_each(in.I16.begin(), in.I16.end(), [&](int16_t &v) { v += j; });
+    std::for_each(in.I32.begin(), in.I32.end(), [&](int32_t &v) { v += j; });
+    std::for_each(in.I64.begin(), in.I64.end(), [&](int64_t &v) { v += j; });
+    std::for_each(in.U8.begin(), in.U8.end(), [&](uint8_t &v) { v += j; });
+    std::for_each(in.U16.begin(), in.U16.end(), [&](uint16_t &v) { v += j; });
+    std::for_each(in.U32.begin(), in.U32.end(), [&](uint32_t &v) { v += j; });
+    std::for_each(in.U64.begin(), in.U64.end(), [&](uint64_t &v) { v += j; });
+    std::for_each(in.R32.begin(), in.R32.end(), [&](float &v) { v += j; });
+    std::for_each(in.R64.begin(), in.R64.end(), [&](double &v) { v += j; });
+}
+
 #endif // TESTING_ADIOS2_ENGINE_SMALLTESTDATA_H_
diff --git a/testing/adios2/engine/bp/CMakeLists.txt b/testing/adios2/engine/bp/CMakeLists.txt
index 736278bf6fb12af33c1da3b2f80f1f66e6885cc7..61a7d569f8c231f2e2fbc0bbe0a75881ae86d161 100644
--- a/testing/adios2/engine/bp/CMakeLists.txt
+++ b/testing/adios2/engine/bp/CMakeLists.txt
@@ -6,18 +6,23 @@
 add_executable(TestBPWriteReadADIOS2 TestBPWriteReadADIOS2.cpp)
 target_link_libraries(TestBPWriteReadADIOS2 adios2 gtest gtest_main)
 
+add_executable(TestBPWriteReadAsStreamADIOS2 TestBPWriteReadAsStreamADIOS2.cpp)
+target_link_libraries(TestBPWriteReadAsStreamADIOS2 adios2 gtest gtest_main)
+
 add_executable(TestBPWriteReadAttributesADIOS2 TestBPWriteReadAttributesADIOS2.cpp)
 target_link_libraries(TestBPWriteReadAttributesADIOS2 adios2 gtest gtest_main)
 
 
 if(ADIOS2_HAVE_MPI)
   target_link_libraries(TestBPWriteReadADIOS2 MPI::MPI_C)
+  target_link_libraries(TestBPWriteReadAsStreamADIOS2 MPI::MPI_C)
   target_link_libraries(TestBPWriteReadAttributesADIOS2 MPI::MPI_C)
   
   set(extra_test_args EXEC_WRAPPER ${MPIEXEC_COMMAND})
 endif()
 
 gtest_add_tests(TARGET TestBPWriteReadADIOS2 ${extra_test_args})
+gtest_add_tests(TARGET TestBPWriteReadAsStreamADIOS2 ${extra_test_args})
 gtest_add_tests(TARGET TestBPWriteReadAttributesADIOS2 ${extra_test_args})
   
 if (ADIOS2_HAVE_ADIOS1)
diff --git a/testing/adios2/engine/bp/TestBPWriteReadAsStreamADIOS2.cpp b/testing/adios2/engine/bp/TestBPWriteReadAsStreamADIOS2.cpp
new file mode 100644
index 0000000000000000000000000000000000000000..1ee87873a9f0c480889404cf8ddc08a8423c6c44
--- /dev/null
+++ b/testing/adios2/engine/bp/TestBPWriteReadAsStreamADIOS2.cpp
@@ -0,0 +1,785 @@
+/*
+ * Distributed under the OSI-approved Apache License, Version 2.0.  See
+ * accompanying file Copyright.txt for details.
+ */
+#include <cstdint>
+#include <cstring>
+
+#include <iostream>
+#include <stdexcept>
+
+#include <adios2.h>
+
+#include <gtest/gtest.h>
+
+#include "../SmallTestData.h"
+
+class BPWriteReadAsStreamTestADIOS2 : public ::testing::Test
+{
+public:
+    BPWriteReadAsStreamTestADIOS2() = default;
+
+    SmallTestData m_TestData;
+    SmallTestData m_OriginalData;
+};
+
+//******************************************************************************
+// 1D 1x8 test data
+//******************************************************************************
+
+// ADIOS2 BP write, native ADIOS1 read
+TEST_F(BPWriteReadAsStreamTestADIOS2, ADIOS2BPWriteRead1D8)
+{
+    // Each process would write a 1x8 array and all processes would
+    // form a mpiSize * Nx 1D array
+    const std::string fname("ADIOS2BPWriteReadAsStream1D8.bp");
+
+    int mpiRank = 0, mpiSize = 1;
+    // Number of rows
+    const size_t Nx = 8;
+
+    // Number of steps
+    const size_t NSteps = 3;
+
+#ifdef ADIOS2_HAVE_MPI
+    MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
+    MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);
+#endif
+
+// Write test data using BP
+
+#ifdef ADIOS2_HAVE_MPI
+    adios2::ADIOS adios(MPI_COMM_WORLD, adios2::DebugON);
+#else
+    adios2::ADIOS adios(true);
+#endif
+    {
+        adios2::IO &io = adios.DeclareIO("TestIO");
+
+        // Declare 1D variables (NumOfProcesses * Nx)
+        // The local process' part (start, count) can be defined now or later
+        // before Write().
+        {
+            const adios2::Dims shape{static_cast<size_t>(Nx * mpiSize)};
+            const adios2::Dims start{static_cast<size_t>(Nx * mpiRank)};
+            const adios2::Dims count{Nx};
+
+            io.DefineVariable<int8_t>("i8", shape, start, count,
+                                      adios2::ConstantDims,
+                                      m_TestData.I8.data());
+            io.DefineVariable<int16_t>("i16", shape, start, count,
+                                       adios2::ConstantDims,
+                                       m_TestData.I16.data());
+            io.DefineVariable<int32_t>("i32", shape, start, count,
+                                       adios2::ConstantDims,
+                                       m_TestData.I32.data());
+            io.DefineVariable<int64_t>("i64", shape, start, count,
+                                       adios2::ConstantDims,
+                                       m_TestData.I64.data());
+
+            io.DefineVariable<uint8_t>("u8", shape, start, count,
+                                       adios2::ConstantDims,
+                                       m_TestData.U8.data());
+
+            io.DefineVariable<uint16_t>("u16", shape, start, count,
+                                        adios2::ConstantDims,
+                                        m_TestData.U16.data());
+            io.DefineVariable<uint32_t>("u32", shape, start, count,
+                                        adios2::ConstantDims,
+                                        m_TestData.U32.data());
+            io.DefineVariable<uint64_t>("u64", shape, start, count,
+                                        adios2::ConstantDims,
+                                        m_TestData.U64.data());
+
+            io.DefineVariable<float>("r32", shape, start, count,
+                                     adios2::ConstantDims,
+                                     m_TestData.R32.data());
+            io.DefineVariable<double>("r64", shape, start, count,
+                                      adios2::ConstantDims,
+                                      m_TestData.R64.data());
+        }
+
+        // Create the BP Engine
+        io.SetEngine("BPFileWriter");
+
+        io.AddTransport("file");
+
+        // QUESTION: It seems that BPFilterWriter cannot overwrite existing
+        // files
+        // Ex. if you tune Nx and NSteps, the test would fail. But if you clear
+        // the cache in
+        // ${adios2Build}/testing/adios2/engine/bp/ADIOS2BPWriteADIOS1Read1D8.bp.dir,
+        // then it works
+        adios2::Engine &bpWriter = io.Open(fname, adios2::Mode::Write);
+
+        for (size_t step = 0; step < NSteps; ++step)
+        {
+            UpdateSmallTestData(m_TestData, static_cast<int>(step), mpiRank,
+                                mpiSize);
+            bpWriter.WriteStep();
+        }
+
+        bpWriter.Close();
+    }
+
+    {
+        adios2::IO &io = adios.DeclareIO("ReadIO");
+
+        adios2::Engine &bpReader = io.Open(fname, adios2::Mode::Read);
+
+        auto var_i8 = io.InquireVariable<int8_t>("i8");
+        ASSERT_NE(var_i8, nullptr);
+        ASSERT_EQ(var_i8->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_i8->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_i8->m_Shape[0], mpiSize * Nx);
+
+        auto var_i16 = io.InquireVariable<int16_t>("i16");
+        ASSERT_NE(var_i16, nullptr);
+        ASSERT_EQ(var_i16->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_i16->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_i16->m_Shape[0], mpiSize * Nx);
+
+        auto var_i32 = io.InquireVariable<int32_t>("i32");
+        ASSERT_NE(var_i32, nullptr);
+        ASSERT_EQ(var_i32->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_i32->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_i32->m_Shape[0], mpiSize * Nx);
+
+        auto var_i64 = io.InquireVariable<int64_t>("i64");
+        ASSERT_NE(var_i64, nullptr);
+        ASSERT_EQ(var_i64->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_i64->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_i64->m_Shape[0], mpiSize * Nx);
+
+        auto var_u8 = io.InquireVariable<uint8_t>("u8");
+        ASSERT_NE(var_u8, nullptr);
+        ASSERT_EQ(var_u8->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_u8->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_u8->m_Shape[0], mpiSize * Nx);
+
+        auto var_u16 = io.InquireVariable<uint16_t>("u16");
+        ASSERT_NE(var_u16, nullptr);
+        ASSERT_EQ(var_u16->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_u16->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_u16->m_Shape[0], mpiSize * Nx);
+
+        auto var_u32 = io.InquireVariable<uint32_t>("u32");
+        ASSERT_NE(var_u32, nullptr);
+        ASSERT_EQ(var_u32->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_u32->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_u32->m_Shape[0], mpiSize * Nx);
+
+        auto var_u64 = io.InquireVariable<uint64_t>("u64");
+        ASSERT_NE(var_u64, nullptr);
+        ASSERT_EQ(var_u64->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_u64->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_u64->m_Shape[0], mpiSize * Nx);
+
+        auto var_r32 = io.InquireVariable<float>("r32");
+        ASSERT_NE(var_r32, nullptr);
+        ASSERT_EQ(var_r32->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_r32->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_r32->m_Shape[0], mpiSize * Nx);
+
+        auto var_r64 = io.InquireVariable<double>("r64");
+        ASSERT_NE(var_r64, nullptr);
+        ASSERT_EQ(var_r64->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_r64->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_r64->m_Shape[0], mpiSize * Nx);
+
+        std::string IString;
+        std::array<int8_t, Nx> I8;
+        std::array<int16_t, Nx> I16;
+        std::array<int32_t, Nx> I32;
+        std::array<int64_t, Nx> I64;
+        std::array<uint8_t, Nx> U8;
+        std::array<uint16_t, Nx> U16;
+        std::array<uint32_t, Nx> U32;
+        std::array<uint64_t, Nx> U64;
+        std::array<float, Nx> R32;
+        std::array<double, Nx> R64;
+
+        const adios2::Dims start{mpiRank * Nx};
+        const adios2::Dims count{Nx};
+
+        const adios2::Box<adios2::Dims> sel(start, count);
+
+        var_i8->SetSelection(sel);
+        var_i16->SetSelection(sel);
+        var_i32->SetSelection(sel);
+        var_i64->SetSelection(sel);
+
+        var_u8->SetSelection(sel);
+        var_u16->SetSelection(sel);
+        var_u32->SetSelection(sel);
+        var_u64->SetSelection(sel);
+
+        var_r32->SetSelection(sel);
+        var_r64->SetSelection(sel);
+
+        for (size_t t = 0; t < NSteps; ++t)
+        {
+            bpReader.BeginStep();
+
+            bpReader.GetDeferred(*var_i8, I8.data());
+            bpReader.GetDeferred(*var_i16, I16.data());
+            bpReader.GetDeferred(*var_i32, I32.data());
+            bpReader.GetDeferred(*var_i64, I64.data());
+
+            bpReader.GetDeferred(*var_u8, U8.data());
+            bpReader.GetDeferred(*var_u16, U16.data());
+            bpReader.GetDeferred(*var_u32, U32.data());
+            bpReader.GetDeferred(*var_u64, U64.data());
+
+            bpReader.GetDeferred(*var_r32, R32.data());
+            bpReader.GetDeferred(*var_r64, R64.data());
+
+            bpReader.PerformGets();
+
+            bpReader.EndStep();
+
+            UpdateSmallTestData(m_OriginalData, static_cast<int>(t), mpiRank,
+                                mpiSize);
+
+            for (size_t i = 0; i < Nx; ++i)
+            {
+                std::stringstream ss;
+                ss << "t=" << t << " i=" << i << " rank=" << mpiRank;
+                std::string msg = ss.str();
+
+                EXPECT_EQ(I8[i], m_OriginalData.I8[i]) << msg;
+                EXPECT_EQ(I16[i], m_OriginalData.I16[i]) << msg;
+                EXPECT_EQ(I32[i], m_OriginalData.I32[i]) << msg;
+                EXPECT_EQ(I64[i], m_OriginalData.I64[i]) << msg;
+                EXPECT_EQ(U8[i], m_OriginalData.U8[i]) << msg;
+                EXPECT_EQ(U16[i], m_OriginalData.U16[i]) << msg;
+                EXPECT_EQ(U32[i], m_OriginalData.U32[i]) << msg;
+                EXPECT_EQ(U64[i], m_OriginalData.U64[i]) << msg;
+                EXPECT_EQ(R32[i], m_OriginalData.R32[i]) << msg;
+                EXPECT_EQ(R64[i], m_OriginalData.R64[i]) << msg;
+            }
+        }
+        bpReader.Close();
+    }
+}
+
+//******************************************************************************
+// 2D 2x4 test data
+//******************************************************************************
+
+// ADIOS2 BP write, native ADIOS1 read
+TEST_F(BPWriteReadAsStreamTestADIOS2, ADIOS2BPWriteRead2D2x4)
+{
+    // Each process would write a 2x4 array and all processes would
+    // form a 2D 2 * (numberOfProcess*Nx) matrix where Nx is 4 here
+    const std::string fname("ADIOS2BPWriteReadAsStream2D2x4Test.bp");
+
+    int mpiRank = 0, mpiSize = 1;
+    // Number of rows
+    const std::size_t Nx = 4;
+
+    // Number of rows
+    const std::size_t Ny = 2;
+
+    // Number of steps
+    const std::size_t NSteps = 3;
+
+#ifdef ADIOS2_HAVE_MPI
+    MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
+    MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);
+#endif
+
+// Write test data using ADIOS2
+
+#ifdef ADIOS2_HAVE_MPI
+    adios2::ADIOS adios(MPI_COMM_WORLD, adios2::DebugON);
+#else
+    adios2::ADIOS adios(true);
+#endif
+    {
+        adios2::IO &io = adios.DeclareIO("TestIO");
+
+        // Declare 2D variables (Ny * (NumOfProcesses * Nx))
+        // The local process' part (start, count) can be defined now or later
+        // before Write().
+        {
+            const adios2::Dims shape{Ny, static_cast<size_t>(Nx * mpiSize)};
+            const adios2::Dims start{0, static_cast<size_t>(mpiRank * Nx)};
+            const adios2::Dims count{Ny, Nx};
+
+            io.DefineVariable<int8_t>("i8", shape, start, count,
+                                      adios2::ConstantDims,
+                                      m_TestData.I8.data());
+            io.DefineVariable<int16_t>("i16", shape, start, count,
+                                       adios2::ConstantDims,
+                                       m_TestData.I16.data());
+            io.DefineVariable<int32_t>("i32", shape, start, count,
+                                       adios2::ConstantDims,
+                                       m_TestData.I32.data());
+            io.DefineVariable<int64_t>("i64", shape, start, count,
+                                       adios2::ConstantDims,
+                                       m_TestData.I64.data());
+
+            io.DefineVariable<uint8_t>("u8", shape, start, count,
+                                       adios2::ConstantDims,
+                                       m_TestData.U8.data());
+
+            io.DefineVariable<uint16_t>("u16", shape, start, count,
+                                        adios2::ConstantDims,
+                                        m_TestData.U16.data());
+            io.DefineVariable<uint32_t>("u32", shape, start, count,
+                                        adios2::ConstantDims,
+                                        m_TestData.U32.data());
+            io.DefineVariable<uint64_t>("u64", shape, start, count,
+                                        adios2::ConstantDims,
+                                        m_TestData.U64.data());
+
+            io.DefineVariable<float>("r32", shape, start, count,
+                                     adios2::ConstantDims,
+                                     m_TestData.R32.data());
+            io.DefineVariable<double>("r64", shape, start, count,
+                                      adios2::ConstantDims,
+                                      m_TestData.R64.data());
+        }
+
+        // Create the BP Engine
+        io.SetEngine("BPFileWriter");
+        io.AddTransport("file");
+
+        adios2::Engine &bpWriter = io.Open(fname, adios2::Mode::Write);
+
+        for (size_t step = 0; step < NSteps; ++step)
+        {
+            UpdateSmallTestData(m_TestData, static_cast<int>(step), mpiRank,
+                                mpiSize);
+            bpWriter.WriteStep();
+        }
+
+        // Close the file
+        bpWriter.Close();
+    }
+
+    {
+        adios2::IO &io = adios.DeclareIO("ReadIO");
+
+        adios2::Engine &bpReader = io.Open(fname, adios2::Mode::Read);
+
+        auto var_i8 = io.InquireVariable<int8_t>("i8");
+        ASSERT_NE(var_i8, nullptr);
+        ASSERT_EQ(var_i8->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_i8->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_i8->m_Shape[0], Ny);
+        ASSERT_EQ(var_i8->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_i16 = io.InquireVariable<int16_t>("i16");
+        ASSERT_NE(var_i16, nullptr);
+        ASSERT_EQ(var_i16->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_i16->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_i16->m_Shape[0], Ny);
+        ASSERT_EQ(var_i16->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_i32 = io.InquireVariable<int32_t>("i32");
+        ASSERT_NE(var_i32, nullptr);
+        ASSERT_EQ(var_i32->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_i32->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_i32->m_Shape[0], Ny);
+        ASSERT_EQ(var_i32->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_i64 = io.InquireVariable<int64_t>("i64");
+        ASSERT_NE(var_i64, nullptr);
+        ASSERT_EQ(var_i64->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_i64->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_i64->m_Shape[0], Ny);
+        ASSERT_EQ(var_i64->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_u8 = io.InquireVariable<uint8_t>("u8");
+        ASSERT_NE(var_u8, nullptr);
+        ASSERT_EQ(var_u8->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_u8->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_u8->m_Shape[0], Ny);
+        ASSERT_EQ(var_u8->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_u16 = io.InquireVariable<uint16_t>("u16");
+        ASSERT_NE(var_u16, nullptr);
+        ASSERT_EQ(var_u16->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_u16->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_u16->m_Shape[0], Ny);
+        ASSERT_EQ(var_u16->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_u32 = io.InquireVariable<uint32_t>("u32");
+        ASSERT_NE(var_u32, nullptr);
+        ASSERT_EQ(var_u32->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_u32->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_u32->m_Shape[0], Ny);
+        ASSERT_EQ(var_u32->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_u64 = io.InquireVariable<uint64_t>("u64");
+        ASSERT_NE(var_u64, nullptr);
+        ASSERT_EQ(var_u64->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_u64->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_u64->m_Shape[0], Ny);
+        ASSERT_EQ(var_u64->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_r32 = io.InquireVariable<float>("r32");
+        ASSERT_NE(var_r32, nullptr);
+        ASSERT_EQ(var_r32->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_r32->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_r32->m_Shape[0], Ny);
+        ASSERT_EQ(var_r32->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_r64 = io.InquireVariable<double>("r64");
+        ASSERT_NE(var_r64, nullptr);
+        ASSERT_EQ(var_r64->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_r64->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_r64->m_Shape[0], Ny);
+        ASSERT_EQ(var_r64->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        // If the size of the array is smaller than the data
+        // the result is weird... double and uint64_t would get
+        // completely garbage data
+        std::array<int8_t, Nx * Ny> I8;
+        std::array<int16_t, Nx * Ny> I16;
+        std::array<int32_t, Nx * Ny> I32;
+        std::array<int64_t, Nx * Ny> I64;
+        std::array<uint8_t, Nx * Ny> U8;
+        std::array<uint16_t, Nx * Ny> U16;
+        std::array<uint32_t, Nx * Ny> U32;
+        std::array<uint64_t, Nx * Ny> U64;
+        std::array<float, Nx * Ny> R32;
+        std::array<double, Nx * Ny> R64;
+
+        const adios2::Dims start{0, static_cast<size_t>(mpiRank * Nx)};
+        const adios2::Dims count{Ny, Nx};
+
+        const adios2::Box<adios2::Dims> sel(start, count);
+
+        var_i8->SetSelection(sel);
+        var_i16->SetSelection(sel);
+        var_i32->SetSelection(sel);
+        var_i64->SetSelection(sel);
+
+        var_u8->SetSelection(sel);
+        var_u16->SetSelection(sel);
+        var_u32->SetSelection(sel);
+        var_u64->SetSelection(sel);
+
+        var_r32->SetSelection(sel);
+        var_r64->SetSelection(sel);
+
+        for (size_t t = 0; t < NSteps; ++t)
+        {
+            bpReader.BeginStep();
+
+            bpReader.GetDeferred(*var_i8, I8.data());
+            bpReader.GetDeferred(*var_i16, I16.data());
+            bpReader.GetDeferred(*var_i32, I32.data());
+            bpReader.GetDeferred(*var_i64, I64.data());
+
+            bpReader.GetDeferred(*var_u8, U8.data());
+            bpReader.GetDeferred(*var_u16, U16.data());
+            bpReader.GetDeferred(*var_u32, U32.data());
+            bpReader.GetDeferred(*var_u64, U64.data());
+
+            bpReader.GetDeferred(*var_r32, R32.data());
+            bpReader.GetDeferred(*var_r64, R64.data());
+
+            bpReader.PerformGets();
+            bpReader.EndStep();
+
+            // Generate test data for each rank uniquely
+            UpdateSmallTestData(m_OriginalData, static_cast<int>(t), mpiRank,
+                                mpiSize);
+
+            for (size_t i = 0; i < Nx * Ny; ++i)
+            {
+                std::stringstream ss;
+                ss << "t=" << t << " i=" << i << " rank=" << mpiRank;
+                std::string msg = ss.str();
+
+                EXPECT_EQ(I8[i], m_OriginalData.I8[i]) << msg;
+                EXPECT_EQ(I16[i], m_OriginalData.I16[i]) << msg;
+                EXPECT_EQ(I32[i], m_OriginalData.I32[i]) << msg;
+                EXPECT_EQ(I64[i], m_OriginalData.I64[i]) << msg;
+                EXPECT_EQ(U8[i], m_OriginalData.U8[i]) << msg;
+                EXPECT_EQ(U16[i], m_OriginalData.U16[i]) << msg;
+                EXPECT_EQ(U32[i], m_OriginalData.U32[i]) << msg;
+                EXPECT_EQ(U64[i], m_OriginalData.U64[i]) << msg;
+                EXPECT_EQ(R32[i], m_OriginalData.R32[i]) << msg;
+                EXPECT_EQ(R64[i], m_OriginalData.R64[i]) << msg;
+            }
+        }
+        bpReader.Close();
+    }
+}
+
+//******************************************************************************
+// 2D 4x2 test data
+//******************************************************************************
+
+TEST_F(BPWriteReadAsStreamTestADIOS2, ADIOS2BPWriteRead2D4x2)
+{
+    // Each process would write a 4x2 array and all processes would
+    // form a 2D 4 * (NumberOfProcess * Nx) matrix where Nx is 2 here
+    const std::string fname("ADIOS2BPWriteReadAsStream2D4x2Test.bp");
+
+    int mpiRank = 0, mpiSize = 1;
+    // Number of rows
+    const std::size_t Nx = 2;
+    // Number of cols
+    const std::size_t Ny = 4;
+
+    // Number of steps
+    const std::size_t NSteps = 3;
+
+#ifdef ADIOS2_HAVE_MPI
+    MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
+    MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);
+#endif
+
+// Write test data using ADIOS2
+
+#ifdef ADIOS2_HAVE_MPI
+    adios2::ADIOS adios(MPI_COMM_WORLD, adios2::DebugON);
+#else
+    adios2::ADIOS adios(true);
+#endif
+    {
+        adios2::IO &io = adios.DeclareIO("TestIO");
+
+        // Declare 2D variables (4 * (NumberOfProcess * Nx))
+        // The local process' part (start, count) can be defined now or later
+        // before Write().
+        {
+            adios2::Dims shape{static_cast<unsigned int>(Ny),
+                               static_cast<unsigned int>(mpiSize * Nx)};
+            adios2::Dims start{static_cast<unsigned int>(0),
+                               static_cast<unsigned int>(mpiRank * Nx)};
+            adios2::Dims count{static_cast<unsigned int>(Ny),
+                               static_cast<unsigned int>(Nx)};
+
+            io.DefineVariable<int8_t>("i8", shape, start, count,
+                                      adios2::ConstantDims,
+                                      m_TestData.I8.data());
+            io.DefineVariable<int16_t>("i16", shape, start, count,
+                                       adios2::ConstantDims,
+                                       m_TestData.I16.data());
+            io.DefineVariable<int32_t>("i32", shape, start, count,
+                                       adios2::ConstantDims,
+                                       m_TestData.I32.data());
+            io.DefineVariable<int64_t>("i64", shape, start, count,
+                                       adios2::ConstantDims,
+                                       m_TestData.I64.data());
+
+            io.DefineVariable<uint8_t>("u8", shape, start, count,
+                                       adios2::ConstantDims,
+                                       m_TestData.U8.data());
+
+            io.DefineVariable<uint16_t>("u16", shape, start, count,
+                                        adios2::ConstantDims,
+                                        m_TestData.U16.data());
+            io.DefineVariable<uint32_t>("u32", shape, start, count,
+                                        adios2::ConstantDims,
+                                        m_TestData.U32.data());
+            io.DefineVariable<uint64_t>("u64", shape, start, count,
+                                        adios2::ConstantDims,
+                                        m_TestData.U64.data());
+
+            io.DefineVariable<float>("r32", shape, start, count,
+                                     adios2::ConstantDims,
+                                     m_TestData.R32.data());
+            io.DefineVariable<double>("r64", shape, start, count,
+                                      adios2::ConstantDims,
+                                      m_TestData.R64.data());
+        }
+
+        // Create the BP Engine
+        io.SetEngine("BPFileWriter");
+
+        io.AddTransport("file");
+
+        adios2::Engine &bpWriter = io.Open(fname, adios2::Mode::Write);
+
+        for (size_t step = 0; step < NSteps; ++step)
+        {
+            // Generate test data for each process uniquely
+            UpdateSmallTestData(m_TestData, static_cast<int>(step), mpiRank,
+                                mpiSize);
+            bpWriter.WriteStep();
+        }
+
+        // Close the file
+        bpWriter.Close();
+    }
+
+    {
+        adios2::IO &io = adios.DeclareIO("ReadIO");
+
+        adios2::Engine &bpReader = io.Open(fname, adios2::Mode::Read);
+
+        auto var_i8 = io.InquireVariable<int8_t>("i8");
+        ASSERT_NE(var_i8, nullptr);
+        ASSERT_EQ(var_i8->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_i8->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_i8->m_Shape[0], Ny);
+        ASSERT_EQ(var_i8->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_i16 = io.InquireVariable<int16_t>("i16");
+        ASSERT_NE(var_i16, nullptr);
+        ASSERT_EQ(var_i16->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_i16->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_i16->m_Shape[0], Ny);
+        ASSERT_EQ(var_i16->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_i32 = io.InquireVariable<int32_t>("i32");
+        ASSERT_NE(var_i32, nullptr);
+        ASSERT_EQ(var_i32->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_i32->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_i32->m_Shape[0], Ny);
+        ASSERT_EQ(var_i32->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_i64 = io.InquireVariable<int64_t>("i64");
+        ASSERT_NE(var_i64, nullptr);
+        ASSERT_EQ(var_i64->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_i64->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_i64->m_Shape[0], Ny);
+        ASSERT_EQ(var_i64->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_u8 = io.InquireVariable<uint8_t>("u8");
+        ASSERT_NE(var_u8, nullptr);
+        ASSERT_EQ(var_u8->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_u8->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_u8->m_Shape[0], Ny);
+        ASSERT_EQ(var_u8->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_u16 = io.InquireVariable<uint16_t>("u16");
+        ASSERT_NE(var_u16, nullptr);
+        ASSERT_EQ(var_u16->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_u16->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_u16->m_Shape[0], Ny);
+        ASSERT_EQ(var_u16->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_u32 = io.InquireVariable<uint32_t>("u32");
+        ASSERT_NE(var_u32, nullptr);
+        ASSERT_EQ(var_u32->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_u32->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_u32->m_Shape[0], Ny);
+        ASSERT_EQ(var_u32->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_u64 = io.InquireVariable<uint64_t>("u64");
+        ASSERT_NE(var_u64, nullptr);
+        ASSERT_EQ(var_u64->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_u64->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_u64->m_Shape[0], Ny);
+        ASSERT_EQ(var_u64->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_r32 = io.InquireVariable<float>("r32");
+        ASSERT_NE(var_r32, nullptr);
+        ASSERT_EQ(var_r32->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_r32->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_r32->m_Shape[0], Ny);
+        ASSERT_EQ(var_r32->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        auto var_r64 = io.InquireVariable<double>("r64");
+        ASSERT_NE(var_r64, nullptr);
+        ASSERT_EQ(var_r64->m_ShapeID, adios2::ShapeID::GlobalArray);
+        ASSERT_EQ(var_r64->m_AvailableStepsCount, NSteps);
+        ASSERT_EQ(var_r64->m_Shape[0], Ny);
+        ASSERT_EQ(var_r64->m_Shape[1], static_cast<size_t>(mpiSize * Nx));
+
+        // If the size of the array is smaller than the data
+        // the result is weird... double and uint64_t would get
+        // completely garbage data
+        std::array<int8_t, Nx * Ny> I8;
+        std::array<int16_t, Nx * Ny> I16;
+        std::array<int32_t, Nx * Ny> I32;
+        std::array<int64_t, Nx * Ny> I64;
+        std::array<uint8_t, Nx * Ny> U8;
+        std::array<uint16_t, Nx * Ny> U16;
+        std::array<uint32_t, Nx * Ny> U32;
+        std::array<uint64_t, Nx * Ny> U64;
+        std::array<float, Nx * Ny> R32;
+        std::array<double, Nx * Ny> R64;
+
+        const adios2::Dims start{0, static_cast<size_t>(mpiRank * Nx)};
+        const adios2::Dims count{Ny, Nx};
+
+        const adios2::Box<adios2::Dims> sel(start, count);
+
+        var_i8->SetSelection(sel);
+        var_i16->SetSelection(sel);
+        var_i32->SetSelection(sel);
+        var_i64->SetSelection(sel);
+
+        var_u8->SetSelection(sel);
+        var_u16->SetSelection(sel);
+        var_u32->SetSelection(sel);
+        var_u64->SetSelection(sel);
+
+        var_r32->SetSelection(sel);
+        var_r64->SetSelection(sel);
+
+        for (size_t t = 0; t < NSteps; ++t)
+        {
+            bpReader.BeginStep();
+
+            bpReader.GetDeferred(*var_i8, I8.data());
+            bpReader.GetDeferred(*var_i16, I16.data());
+            bpReader.GetDeferred(*var_i32, I32.data());
+            bpReader.GetDeferred(*var_i64, I64.data());
+
+            bpReader.GetDeferred(*var_u8, U8.data());
+            bpReader.GetDeferred(*var_u16, U16.data());
+            bpReader.GetDeferred(*var_u32, U32.data());
+            bpReader.GetDeferred(*var_u64, U64.data());
+
+            bpReader.GetDeferred(*var_r32, R32.data());
+            bpReader.GetDeferred(*var_r64, R64.data());
+
+            bpReader.PerformGets();
+            bpReader.EndStep();
+
+            // Generate test data for each rank uniquely
+            UpdateSmallTestData(m_OriginalData, static_cast<int>(t), mpiRank,
+                                mpiSize);
+
+            for (size_t i = 0; i < Nx * Ny; ++i)
+            {
+                std::stringstream ss;
+                ss << "t=" << t << " i=" << i << " rank=" << mpiRank;
+                std::string msg = ss.str();
+
+                EXPECT_EQ(I8[i], m_OriginalData.I8[i]) << msg;
+                EXPECT_EQ(I16[i], m_OriginalData.I16[i]) << msg;
+                EXPECT_EQ(I32[i], m_OriginalData.I32[i]) << msg;
+                EXPECT_EQ(I64[i], m_OriginalData.I64[i]) << msg;
+                EXPECT_EQ(U8[i], m_OriginalData.U8[i]) << msg;
+                EXPECT_EQ(U16[i], m_OriginalData.U16[i]) << msg;
+                EXPECT_EQ(U32[i], m_OriginalData.U32[i]) << msg;
+                EXPECT_EQ(U64[i], m_OriginalData.U64[i]) << msg;
+                EXPECT_EQ(R32[i], m_OriginalData.R32[i]) << msg;
+                EXPECT_EQ(R64[i], m_OriginalData.R64[i]) << msg;
+            }
+        }
+        bpReader.Close();
+    }
+}
+
+//******************************************************************************
+// main
+//******************************************************************************
+
+int main(int argc, char **argv)
+{
+#ifdef ADIOS2_HAVE_MPI
+    MPI_Init(nullptr, nullptr);
+#endif
+
+    ::testing::InitGoogleTest(&argc, argv);
+    int result = RUN_ALL_TESTS();
+
+#ifdef ADIOS2_HAVE_MPI
+    MPI_Finalize();
+#endif
+
+    return result;
+}