Unverified Commit 590fd5c1 authored by Podhorszki, Norbert's avatar Podhorszki, Norbert Committed by GitHub
Browse files

Merge pull request #3234 from pnorbert/test-performdatawrite

Test writing data with multiple PerformDataWrite and read back. This …
parents a1ad2061 57e4eaad
Loading
Loading
Loading
Loading
+22 −20
Original line number Diff line number Diff line
@@ -200,35 +200,37 @@ void BP5Reader::ReadData(const size_t WriterRank, const size_t Timestep,
                                     {{"transport", "File"}}, false);
    }

    /* Each block is in exactly one flush. The StartOffset was calculated
       as if all the flushes were in a single contiguous block in file.
    */
    size_t InfoStartPos =
        DataPosPos + (WriterRank * (2 * FlushCount + 1) * sizeof(uint64_t));
    size_t ThisFlushInfo = InfoStartPos;
    size_t RemainingLength = Length;
    size_t ThisDataPos;
    size_t Offset = StartOffset;
    size_t SumDataSize = 0; // count in contiguous space
    for (size_t flush = 0; flush < FlushCount; flush++)
    {

        ThisDataPos =
            helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
        size_t ThisDataPos =
            helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, InfoStartPos,
                                        m_Minifooter.IsLittleEndian);
        size_t ThisDataSize =
            helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, ThisFlushInfo,
            helper::ReadValue<uint64_t>(m_MetadataIndex.m_Buffer, InfoStartPos,
                                        m_Minifooter.IsLittleEndian);
        if (ThisDataSize > RemainingLength)
            ThisDataSize = RemainingLength;
        m_DataFileManager.ReadFile(Destination, ThisDataSize,

        if (StartOffset < SumDataSize + ThisDataSize)
        {
            // discount offsets of skipped flushes
            size_t Offset = StartOffset - SumDataSize;
            m_DataFileManager.ReadFile(Destination, Length,
                                       ThisDataPos + Offset, SubfileNum);
        Destination += ThisDataSize;
        RemainingLength -= ThisDataSize;
        Offset = 0;
        if (RemainingLength == 0)
            return;
        }
    ThisDataPos = helper::ReadValue<uint64_t>(
        m_MetadataIndex.m_Buffer, ThisFlushInfo, m_Minifooter.IsLittleEndian);
    m_DataFileManager.ReadFile(Destination, RemainingLength,
                               ThisDataPos + Offset, SubfileNum);
        SumDataSize += ThisDataSize;
    }

    size_t ThisDataPos = helper::ReadValue<uint64_t>(
        m_MetadataIndex.m_Buffer, InfoStartPos, m_Minifooter.IsLittleEndian);
    size_t Offset = StartOffset - SumDataSize;
    m_DataFileManager.ReadFile(Destination, Length, ThisDataPos + Offset,
                               SubfileNum);
}

void BP5Reader::PerformGets()
+143 −0
Original line number Diff line number Diff line
@@ -2085,6 +2085,149 @@ TEST_F(BPWriteReadMultiblockTest, ADIOS2BPWriteReadMultiblock2D4x2)
    }
}

//******************************************************************************
// Test flushing data within the step and that read works properly for all
// blocks across all flushes
//******************************************************************************

TEST_F(BPWriteReadMultiblockTest, MultiblockPerformDataWrite)
{
    if (engineName != "BP5")
    {
        std::cout << "Engine " << engineName
                  << " is not tested for this feature." << std::endl;
        return;
    }
    // Each process would write a 1x8 array and all processes would
    // form a mpiSize * Nx 1D array
    const std::string fname("MultiblockPerformDataWrite.bp");

    int mpiRank = 0, mpiSize = 1;
    // Number of elements per blocks (blocksize)
    const size_t Nx = 8;
    // Number of blocks per process (= number of flushes)
    const size_t Nblocks = 3;
    // Number of steps
    const size_t NSteps = 3;

#if ADIOS2_USE_MPI
    MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
    MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);
#endif

    // Write test data using BP

#if ADIOS2_USE_MPI
    adios2::ADIOS adios(MPI_COMM_WORLD);
#else
    adios2::ADIOS adios;
#endif
    /* Write output */
    {
        adios2::IO io = adios.DeclareIO("TestIO");
        const adios2::Dims shape{static_cast<size_t>(mpiSize),
                                 static_cast<size_t>(Nx * Nblocks)};
        const adios2::Dims start{static_cast<size_t>(mpiRank), 0};
        const adios2::Dims count{1, Nx};

        auto var_i32 = io.DefineVariable<int32_t>("i32", shape, start, count);

        if (!engineName.empty())
        {
            io.SetEngine(engineName);
        }
        else
        {
            // Create the BP Engine
            io.SetEngine("BPFile");
        }

        adios2::Engine bpWriter = io.Open(fname, adios2::Mode::Write);

        for (size_t step = 0; step < NSteps; ++step)
        {
            bpWriter.BeginStep();

            for (size_t b = 0; b < Nblocks; ++b)
            {
                // Generate test data for each process / block uniquely
                int t = static_cast<int>(step * Nblocks + b);
                SmallTestData currentTestData =
                    generateNewSmallTestData(m_TestData, t, mpiRank, mpiSize);

                const adios2::Box<adios2::Dims> sel({(size_t)mpiRank, b * Nx},
                                                    {1, Nx});
                var_i32.SetSelection(sel);
                bpWriter.Put(var_i32, currentTestData.I32.data());

                bpWriter.PerformDataWrite();
            }
            bpWriter.EndStep();
        }

        // Close the file
        bpWriter.Close();
    }

    /* Read back each step, each block and check the unique values.
       Different blocks in each step are coming from different flushes.
    */
    {
        adios2::IO io = adios.DeclareIO("ReadIO");

        if (!engineName.empty())
        {
            io.SetEngine(engineName);
        }

        adios2::Engine bpReader =
            io.Open(fname, adios2::Mode::ReadRandomAccess);

        auto var_i32 = io.InquireVariable<int32_t>("i32");
        EXPECT_TRUE(var_i32);
        EXPECT_EQ(var_i32.ShapeID(), adios2::ShapeID::GlobalArray);
        EXPECT_EQ(var_i32.Steps(), NSteps);
        EXPECT_EQ(var_i32.Shape()[0], mpiSize);
        EXPECT_EQ(var_i32.Shape()[1], Nx * Nblocks);

        SmallTestData testData;
        std::array<int32_t, Nx> I32;

        const auto i32AllInfo = bpReader.AllStepsBlocksInfo(var_i32);
        EXPECT_EQ(i32AllInfo.size(), NSteps);

        for (size_t step = 0; step < NSteps; step++)
        {
            var_i32.SetStepSelection({step, 1});
            for (size_t b = 0; b < Nblocks; ++b)
            {
                std::cout << "Read step " << step << " block=" << b
                          << std::endl;
                // Generate test data for each process / block uniquely
                int t = static_cast<int>(step * Nblocks + b);
                SmallTestData currentTestData =
                    generateNewSmallTestData(m_TestData, t, mpiRank, mpiSize);

                const adios2::Box<adios2::Dims> sel({(size_t)mpiRank, b * Nx},
                                                    {1, Nx});
                var_i32.SetSelection(sel);
                bpReader.Get(var_i32, I32.data(), adios2::Mode::Sync);

                /* check content of a single block */
                for (size_t i = 0; i < Nx; ++i)
                {
                    std::stringstream ss;
                    ss << "step=" << step << " block=" << b << " i=" << i
                       << " rank=" << mpiRank;
                    std::string msg = ss.str();
                    EXPECT_EQ(I32[i], currentTestData.I32[i]) << msg;
                }
            }
        }
        bpReader.Close();
    }
}

//******************************************************************************
// main
//******************************************************************************