Commit 07153cf1 authored by Shawn Yang's avatar Shawn Yang
Browse files

Merge branch 'ndcopy' of github.com:JasonRuonanWang/ADIOS2 into ndcopy

parents b9a23533 ba24281b
Loading
Loading
Loading
Loading
+7 −0
Original line number Diff line number Diff line
@@ -10,6 +10,13 @@ if(ADIOS2_HAVE_MPI)
endif()
gtest_add_tests(TARGET TestDataManP2P2DMemSelect)

add_executable(TestDataManP2P3DMemSelect TestDataManP2P3DMemSelect.cpp)
target_link_libraries(TestDataManP2P3DMemSelect adios2 gtest)
if(ADIOS2_HAVE_MPI)
    target_link_libraries(TestDataManP2P3DMemSelect MPI::MPI_C)
endif()
gtest_add_tests(TARGET TestDataManP2P3DMemSelect)

add_executable(TestDataManP2P1D TestDataManP2P1D.cpp)
target_link_libraries(TestDataManP2P1D adios2 gtest)
if(ADIOS2_HAVE_MPI)
+10 −8
Original line number Diff line number Diff line
@@ -52,20 +52,22 @@ void PrintData(const T *data, const size_t step, const Dims &start,

    std::cout << "]" << std::endl;
}

template <class T>
void GenData(std::vector<T> &data, const size_t step, const Dims &start,
             const Dims &count, const Dims &shape)
{
    if (start.size() == 2)
    {
        for (size_t i = 0; i < count[0]; ++i)
        {
            for (size_t j = 0; j < count[1]; ++j)
            {
            data[i * count[1] + j] = (i + start[1]) * (shape[1]) + j + start[0];
                data[i * count[1] + j] =
                    (i + start[1]) * shape[1] + j + start[0];
            }
        }
    }
    std::cout << "GenData\n";

    PrintData(data.data(), step, start, count);
}

template <class T>
@@ -334,8 +336,8 @@ void DataManReaderP2PMemSelect(const Dims &shape, const Dims &start,
    dataManReader.Close();
    print_lines = 0;
}

#ifdef ADIOS2_HAVE_ZEROMQ
#ifdef ADIOS2_HAVE_SZ
TEST_F(DataManEngineTest, WriteRead_2D_MemSelect)
{
    // set parameters
@@ -366,7 +368,7 @@ TEST_F(DataManEngineTest, WriteRead_2D_MemSelect)
    r.join();
    std::cout << "Reader thread ended" << std::endl;
}
#endif // SZ

#endif // ZEROMQ

int main(int argc, char **argv)
+237 −0
Original line number Diff line number Diff line
/*
 * Distributed under the OSI-approved Apache License, Version 2.0.  See
 * accompanying file Copyright.txt for details.
 *
 * TestDataManP2P3SDMemSelect.cpp
 *
 *  Created on: Nov 24, 2018
 *      Author: Jason Wang
 */

#include <adios2.h>
#include <gtest/gtest.h>
#include <numeric>
#include <thread>

using namespace adios2;
int mpiRank = 0;
int mpiSize = 1;
size_t print_lines = 0;

Dims shape = {4, 4, 4};
std::vector<int> global_data = {0,   1,   2,   3,   10,  11,  12,  13,
                                20,  21,  22,  23,  30,  31,  32,  33,

                                100, 101, 102, 103, 110, 111, 112, 113,
                                120, 121, 122, 123, 130, 131, 132, 133,

                                200, 201, 202, 203, 210, 211, 212, 213,
                                220, 221, 222, 223, 230, 231, 232, 233,

                                300, 301, 302, 303, 310, 311, 312, 313,
                                320, 321, 322, 323, 330, 331, 332, 333};

Dims start = {1, 2, 1};
Dims count = {2, 1, 2};
std::vector<int> writer_data = {121, 122, 221, 222};

Dims memstart = {0, 1, 1};
Dims memcount = {3, 3, 3};
std::vector<int> reader_data = {11,  12,  13,  21,  22,  23,  31,  32,  33,

                                111, 112, 113, 121, 122, 123, 131, 132, 133,

                                211, 212, 213, 221, 222, 223, 231, 232, 233};

class DataManEngineTest : public ::testing::Test
{
public:
    DataManEngineTest() = default;
};

template <class T>
void PrintData(const T *data, const size_t step, const Dims &start,
               const Dims &count)
{
    size_t size = std::accumulate(count.begin(), count.end(), 1,
                                  std::multiplies<size_t>());
    std::cout << "Rank: " << mpiRank << " Step: " << step << " Size:" << size
              << "\n";
    size_t printsize = 128;

    if (size < printsize)
    {
        printsize = size;
    }
    int s = 0;
    for (size_t i = 0; i < printsize; ++i)
    {
        ++s;
        std::cout << data[i] << " ";
        if (s == count[1])
        {
            std::cout << std::endl;
            s = 0;
        }
    }

    std::cout << "]" << std::endl;
}

void VerifyData(const int *data, size_t step, const Dims &start,
                const Dims &count, const Dims &shape)
{
    size_t size = std::accumulate(count.begin(), count.end(), 1,
                                  std::multiplies<size_t>());
    bool compressed = false;
    if (print_lines < 100)
    {
        PrintData(data, step, start, count);
        ++print_lines;
    }
    for (size_t i = 0; i < size; ++i)
    {
        if (!compressed)
        {
            ASSERT_EQ(data[i], reader_data[i]);
        }
    }
}

void DataManWriterP2PMemSelect(const Dims &shape, const Dims &start,
                               const Dims &count, const size_t steps,
                               const adios2::Params &engineParams,
                               const std::vector<adios2::Params> &transParams)
{
    size_t datasize = std::accumulate(count.begin(), count.end(), 1,
                                      std::multiplies<size_t>());
    adios2::ADIOS adios(MPI_COMM_SELF, adios2::DebugON);
    adios2::IO dataManIO = adios.DeclareIO("WAN");
    dataManIO.SetEngine("DataMan");
    dataManIO.SetParameters(engineParams);
    for (const auto &params : transParams)
    {
        dataManIO.AddTransport("WAN", params);
    }
    auto bpInts = dataManIO.DefineVariable<int>("bpInts", shape, start, count);
    adios2::Engine dataManWriter =
        dataManIO.Open("stream", adios2::Mode::Write);
    for (int i = 0; i < steps; ++i)
    {
        dataManWriter.BeginStep();
        dataManWriter.Put(bpInts, writer_data.data(), adios2::Mode::Sync);
        dataManWriter.EndStep();
    }
    dataManWriter.Close();
}

void DataManReaderP2PMemSelect(const Dims &shape, const Dims &start,
                               const Dims &count, const Dims &memStart,
                               const Dims &memCount, const size_t steps,
                               const adios2::Params &engineParams,
                               const std::vector<adios2::Params> &transParams)
{
    adios2::ADIOS adios(MPI_COMM_SELF, adios2::DebugON);
    adios2::IO dataManIO = adios.DeclareIO("WAN");
    dataManIO.SetEngine("DataMan");
    dataManIO.SetParameters(engineParams);
    for (const auto &params : transParams)
    {
        dataManIO.AddTransport("WAN", params);
    }
    adios2::Engine dataManReader = dataManIO.Open("stream", adios2::Mode::Read);

    size_t datasize = std::accumulate(memCount.begin(), memCount.end(), 1,
                                      std::multiplies<size_t>());
    std::vector<int> myInts = reader_data;
    size_t i;
    for (i = 0; i < steps; ++i)
    {
        adios2::StepStatus status =
            dataManReader.BeginStep(StepMode::NextAvailable, 5);
        if (status == adios2::StepStatus::OK)
        {
            const auto &vars = dataManIO.AvailableVariables();
            ASSERT_EQ(vars.size(), 1);
            if (print_lines == 0)
            {
                std::cout << "All available variables : ";
                for (const auto &var : vars)
                {
                    std::cout << var.first << ", ";
                }
                std::cout << std::endl;
            }
            size_t currentStep = dataManReader.CurrentStep();
            ASSERT_EQ(i, currentStep);
            adios2::Variable<int> bpInts =
                dataManIO.InquireVariable<int>("bpInts");

            bpInts.SetSelection({start, count});

            bpInts.SetMemorySelection({memStart, memCount});

            dataManReader.Get(bpInts, myInts.data(), adios2::Mode::Sync);
            VerifyData(myInts.data(), currentStep, memStart, memCount, shape);
            dataManReader.EndStep();
        }
        else
        {
            std::cout << "DataManReader end of stream at Step " << i
                      << std::endl;
            break;
        }
    }
    ASSERT_EQ(i, steps);
    dataManReader.Close();
    print_lines = 0;
}

#ifdef ADIOS2_HAVE_ZEROMQ
TEST_F(DataManEngineTest, WriteRead_3D_MemSelect)
{

    size_t steps = 1;
    adios2::Params engineParams = {{"WorkflowMode", "p2p"}};
    std::vector<adios2::Params> transportParams = {{
        {"Library", "ZMQ"}, {"IPAddress", "127.0.0.1"}, {"Port", "12322"},
    }};
    // run workflow
    auto r =
        std::thread(DataManReaderP2PMemSelect, shape, start, count, memstart,
                    memcount, steps, engineParams, transportParams);
    std::cout << "Reader thread started" << std::endl;

    auto w = std::thread(DataManWriterP2PMemSelect, shape, start, count, steps,
                         engineParams, transportParams);
    std::cout << "Writer thread started" << std::endl;
    w.join();
    std::cout << "Writer thread ended" << std::endl;
    r.join();
    std::cout << "Reader thread ended" << std::endl;
}

#endif // ZEROMQ

int main(int argc, char **argv)
{
    int mpi_provided;
    MPI_Init_thread(&argc, &argv, MPI_THREAD_MULTIPLE, &mpi_provided);
    std::cout << "MPI_Init_thread required Mode " << MPI_THREAD_MULTIPLE
              << " and provided Mode " << mpi_provided << std::endl;
    if (mpi_provided != MPI_THREAD_MULTIPLE)
    {
        MPI_Finalize();
        return 0;
    }
    MPI_Comm_rank(MPI_COMM_WORLD, &mpiRank);
    MPI_Comm_size(MPI_COMM_WORLD, &mpiSize);

    int result;
    ::testing::InitGoogleTest(&argc, argv);
    result = RUN_ALL_TESTS();

    MPI_Finalize();

    return result;
}