Unverified Commit 1f436939 authored by pnorbert's avatar pnorbert Committed by GitHub
Browse files

Merge pull request #991 from pnorbert/iotest_hdf5

Added Native Parallel HDF5 writer/reader to adios_iotest
parents ae4b1dae e7dfa40c
Loading
Loading
Loading
Loading
+14 −1
Original line number Diff line number Diff line
@@ -7,9 +7,22 @@ file(COPY ${CMAKE_CURRENT_SOURCE_DIR}/iotest-config
  DESTINATION ${PROJECT_BINARY_DIR}
)

add_executable(adios_iotest settings.cpp decomp.cpp processConfig.cpp adios_iotest.cpp)
add_executable(adios_iotest settings.cpp decomp.cpp processConfig.cpp ioGroup.cpp stream.cpp adiosStream.cpp adios_iotest.cpp)
target_link_libraries(adios_iotest adios2 MPI::MPI_C)

if(ADIOS2_HAVE_HDF5)
  if(HDF5_C_INCLUDE_DIRS)
    target_include_directories(adios_iotest PRIVATE ${HDF5_C_INCLUDE_DIRS})
  else()
    target_include_directories(adios_iotest PRIVATE ${HDF5_INCLUDE_DIRS})
  endif()

  target_sources(adios_iotest PRIVATE hdf5Stream.cpp)

  target_link_libraries(adios_iotest ${HDF5_C_LIBRARIES})
endif()


install(TARGETS adios_iotest EXPORT adios2
  RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
)
+251 −0
Original line number Diff line number Diff line
/*
 * adiosStream.cpp
 *
 *  Created on: Nov 2018
 *      Author: Norbert Podhorszki
 */

#include "adiosStream.h"

#include <cstdlib>
#include <iostream>
#include <map>
#include <math.h>
#include <stdexcept>
#include <string>

adiosStream::adiosStream(const std::string &streamName, adios2::IO &io,
                         const adios2::Mode mode, MPI_Comm comm)
: Stream(streamName, mode), io(io)
{

    if (mode == adios2::Mode::Write)
    {
        engine = io.Open(streamName, adios2::Mode::Write, comm);
    }
    else
    {
        engine = io.Open(streamName, adios2::Mode::Read, comm);
    }
}

adiosStream::~adiosStream(){};

void adiosStream::defineADIOSArray(const std::shared_ptr<VariableInfo> ov)
{
    if (ov->type == "double")
    {
        adios2::Variable<double> v = io.DefineVariable<double>(
            ov->name, ov->shape, ov->start, ov->count, true);
        // v = io->InquireVariable<double>(ov->name);
    }
    else if (ov->type == "float")
    {
        adios2::Variable<float> v = io.DefineVariable<float>(
            ov->name, ov->shape, ov->start, ov->count, true);
    }
    else if (ov->type == "int")
    {
        adios2::Variable<int> v = io.DefineVariable<int>(
            ov->name, ov->shape, ov->start, ov->count, true);
    }
}

void adiosStream::putADIOSArray(const std::shared_ptr<VariableInfo> ov)
{
    if (ov->type == "double")
    {
        const double *a = reinterpret_cast<const double *>(ov->data.data());
        engine.Put<double>(ov->name, a);
    }
    else if (ov->type == "float")
    {
        const float *a = reinterpret_cast<const float *>(ov->data.data());
        engine.Put<float>(ov->name, a);
    }
    else if (ov->type == "int")
    {
        const int *a = reinterpret_cast<const int *>(ov->data.data());
        engine.Put<int>(ov->name, a);
    }
}

void adiosStream::getADIOSArray(std::shared_ptr<VariableInfo> ov)
{
    if (ov->type == "double")
    {
        adios2::Variable<double> v = io.InquireVariable<double>(ov->name);
        if (!v)
        {
            ov->readFromInput = false;
            return;
        }
        v.SetSelection({ov->start, ov->count});
        double *a = reinterpret_cast<double *>(ov->data.data());
        engine.Get<double>(v, a);
        ov->readFromInput = true;
    }
    else if (ov->type == "float")
    {
        adios2::Variable<float> v = io.InquireVariable<float>(ov->name);
        if (!v)
        {
            ov->readFromInput = false;
            return;
        }
        v.SetSelection({ov->start, ov->count});
        float *a = reinterpret_cast<float *>(ov->data.data());
        engine.Get<float>(v, a);
        ov->readFromInput = true;
    }
    else if (ov->type == "int")
    {
        adios2::Variable<int> v = io.InquireVariable<int>(ov->name);
        if (!v)
        {
            ov->readFromInput = false;
            return;
        }
        v.SetSelection({ov->start, ov->count});
        int *a = reinterpret_cast<int *>(ov->data.data());
        engine.Get<int>(v, a);
        ov->readFromInput = true;
    }
}

/* return true if read-in completed */
adios2::StepStatus adiosStream::readADIOS(CommandRead *cmdR, Config &cfg,
                                          const Settings &settings, size_t step)
{
    if (!settings.myRank && settings.verbose)
    {
        std::cout << "    Read ";
        if (cmdR->stepMode == adios2::StepMode::NextAvailable)
        {
            std::cout << "next available step from ";
        }
        else
        {
            std::cout << "latest step from ";
        }

        std::cout << cmdR->streamName << " using the group " << cmdR->groupName;
        if (!cmdR->variables.empty())
        {
            std::cout << " with selected variables:  ";
            for (const auto &v : cmdR->variables)
            {
                std::cout << v->name << " ";
            }
        }
        std::cout << std::endl;
    }
    adios2::StepStatus status =
        engine.BeginStep(cmdR->stepMode, cmdR->timeout_sec);
    if (status != adios2::StepStatus::OK)
    {
        return status;
    }

    if (!settings.myRank && settings.verbose && step == 1)
    {
        const auto varmap = io.AvailableVariables();
        std::cout << "    Variables in input for reading: " << std::endl;
        for (const auto &v : varmap)
        {
            std::cout << "        " << v.first << std::endl;
        }
    }

    if (!settings.myRank && settings.verbose)
    {
        std::cout << "    Read data " << std::endl;
    }

    for (auto ov : cmdR->variables)
    {
        getADIOSArray(ov);
    }
    engine.EndStep();
    return status;
}

void adiosStream::writeADIOS(CommandWrite *cmdW, Config &cfg,
                             const Settings &settings, size_t step)
{
    if (!settings.myRank && settings.verbose)
    {
        std::cout << "    Write to output " << cmdW->streamName << " the group "
                  << cmdW->groupName;
        if (!cmdW->variables.empty())
        {
            std::cout << " with selected variables:  ";
            for (const auto &v : cmdW->variables)
            {
                std::cout << v->name << " ";
            }
        }
        std::cout << std::endl;
    }

    const double div =
        pow(10.0, static_cast<const double>(settings.ndigits(cfg.nSteps - 1)));
    double myValue = static_cast<double>(settings.myRank) +
                     static_cast<double>(step - 1) / div;

    std::map<std::string, adios2::Params> definedVars = io.AvailableVariables();
    for (auto ov : cmdW->variables)
    {
        // if the variable is not in the IO group it means
        // we have not defined it yet (e.g. a write-only variable or a linked
        // variable defined in another read group)
        const auto it = definedVars.find(ov->name);
        if (it == definedVars.end())
        {
            if (!settings.myRank && settings.verbose)
            {
                std::cout << "        Define array  " << ov->name
                          << "  for output" << std::endl;
            }
            defineADIOSArray(ov);
        }

        // if we read the variable, use the read values otherwise generate data
        // now
        if (!ov->readFromInput)
        {
            if (!settings.myRank && settings.verbose)
            {
                std::cout << "        Fill array  " << ov->name
                          << "  for output" << std::endl;
            }
            fillArray(ov, myValue);
        }
    }

    if (!settings.myRank && settings.verbose)
    {
        std::cout << "        Write data " << std::endl;
    }
    engine.BeginStep();
    for (const auto ov : cmdW->variables)
    {
        putADIOSArray(ov);
    }
    engine.EndStep();
}

void adiosStream::Write(CommandWrite *cmdW, Config &cfg,
                        const Settings &settings, size_t step)
{

    writeADIOS(cmdW, cfg, settings, step);
}

adios2::StepStatus adiosStream::Read(CommandRead *cmdR, Config &cfg,
                                     const Settings &settings, size_t step)
{
    return readADIOS(cmdR, cfg, settings, step);
}

void adiosStream::Close() { engine.Close(); }
+38 −0
Original line number Diff line number Diff line
/*
 * adiosStream.h
 *
 *  Created on: Nov 2018
 *      Author: Norbert Podhorszki
 */

#ifndef ADIOSSTREAM_H
#define ADIOSSTREAM_H

#include "adios2.h"
#include "stream.h"

class adiosStream : public Stream
{
public:
    adios2::Engine engine;
    adios2::IO io;
    adiosStream(const std::string &streamName, adios2::IO &io,
                const adios2::Mode mode, MPI_Comm comm);
    ~adiosStream();
    void Write(CommandWrite *cmdW, Config &cfg, const Settings &settings,
               size_t step);
    adios2::StepStatus Read(CommandRead *cmdR, Config &cfg,
                            const Settings &settings, size_t step);
    void Close();

private:
    void defineADIOSArray(const std::shared_ptr<VariableInfo> ov);
    void putADIOSArray(const std::shared_ptr<VariableInfo> ov);
    void getADIOSArray(std::shared_ptr<VariableInfo> ov);
    adios2::StepStatus readADIOS(CommandRead *cmdR, Config &cfg,
                                 const Settings &settings, size_t step);
    void writeADIOS(CommandWrite *cmdW, Config &cfg, const Settings &settings,
                    size_t step);
};

#endif /* ADIOSSTREAM_H */
+33 −272
Original line number Diff line number Diff line
@@ -13,244 +13,7 @@
#include "decomp.h"
#include "processConfig.h"
#include "settings.h"

void defineADIOSArray(adios2::IO &io, const std::shared_ptr<VariableInfo> ov)
{
    if (ov->type == "double")
    {
        adios2::Variable<double> v = io.DefineVariable<double>(
            ov->name, ov->shape, ov->start, ov->count, true);
        // v = io->InquireVariable<double>(ov->name);
    }
    else if (ov->type == "float")
    {
        adios2::Variable<float> v = io.DefineVariable<float>(
            ov->name, ov->shape, ov->start, ov->count, true);
    }
    else if (ov->type == "int")
    {
        adios2::Variable<int> v = io.DefineVariable<int>(
            ov->name, ov->shape, ov->start, ov->count, true);
    }
}

void fillArray(std::shared_ptr<VariableInfo> ov, double value)
{
    if (ov->type == "double")
    {
        double *a = reinterpret_cast<double *>(ov->data.data());
        for (size_t i = 0; i < ov->datasize / ov->elemsize; ++i)
        {
            a[i] = value;
        }
    }
    else if (ov->type == "float")
    {
        float v = static_cast<float>(value);
        float *a = reinterpret_cast<float *>(ov->data.data());
        for (size_t i = 0; i < ov->datasize / ov->elemsize; ++i)
        {
            a[i] = v;
        }
    }
    else if (ov->type == "int")
    {
        int v = static_cast<int>(value);
        int *a = reinterpret_cast<int *>(ov->data.data());
        for (size_t i = 0; i < ov->datasize / ov->elemsize; ++i)
        {
            a[i] = v;
        }
    }
}

void putADIOSArray(std::shared_ptr<adios2::Engine> writer,
                   const std::shared_ptr<VariableInfo> ov)
{
    if (ov->type == "double")
    {
        const double *a = reinterpret_cast<const double *>(ov->data.data());
        writer->Put<double>(ov->name, a);
    }
    else if (ov->type == "float")
    {
        const float *a = reinterpret_cast<const float *>(ov->data.data());
        writer->Put<float>(ov->name, a);
    }
    else if (ov->type == "int")
    {
        const int *a = reinterpret_cast<const int *>(ov->data.data());
        writer->Put<int>(ov->name, a);
    }
}

void getADIOSArray(std::shared_ptr<adios2::Engine> reader, adios2::IO &io,
                   std::shared_ptr<VariableInfo> ov)
{
    if (ov->type == "double")
    {
        adios2::Variable<double> v = io.InquireVariable<double>(ov->name);
        if (!v)
        {
            ov->readFromInput = false;
            return;
        }
        v.SetSelection({ov->start, ov->count});
        double *a = reinterpret_cast<double *>(ov->data.data());
        reader->Get<double>(v, a);
        ov->readFromInput = true;
    }
    else if (ov->type == "float")
    {
        adios2::Variable<float> v = io.InquireVariable<float>(ov->name);
        if (!v)
        {
            ov->readFromInput = false;
            return;
        }
        v.SetSelection({ov->start, ov->count});
        float *a = reinterpret_cast<float *>(ov->data.data());
        reader->Get<float>(v, a);
        ov->readFromInput = true;
    }
    else if (ov->type == "int")
    {
        adios2::Variable<int> v = io.InquireVariable<int>(ov->name);
        if (!v)
        {
            ov->readFromInput = false;
            return;
        }
        v.SetSelection({ov->start, ov->count});
        int *a = reinterpret_cast<int *>(ov->data.data());
        reader->Get<int>(v, a);
        ov->readFromInput = true;
    }
}

/* return true if read-in completed */
adios2::StepStatus readADIOS(std::shared_ptr<adios2::Engine> reader,
                             adios2::IO &io, CommandRead *cmdR, Config &cfg,
                             const Settings &settings, size_t step)
{
    if (!settings.myRank && settings.verbose)
    {
        std::cout << "    Read ";
        if (cmdR->stepMode == adios2::StepMode::NextAvailable)
        {
            std::cout << "next available step from ";
        }
        else
        {
            std::cout << "latest step from ";
        }

        std::cout << cmdR->streamName << " using the group " << cmdR->groupName;
        if (!cmdR->variables.empty())
        {
            std::cout << " with selected variables:  ";
            for (const auto &v : cmdR->variables)
            {
                std::cout << v->name << " ";
            }
        }
        std::cout << std::endl;
    }
    adios2::StepStatus status =
        reader->BeginStep(cmdR->stepMode, cmdR->timeout_sec);
    if (status != adios2::StepStatus::OK)
    {
        return status;
    }

    if (!settings.myRank && settings.verbose && step == 1)
    {
        const auto varmap = io.AvailableVariables();
        std::cout << "    Variables in input for reading: " << std::endl;
        for (const auto &v : varmap)
        {
            std::cout << "        " << v.first << std::endl;
        }
    }

    if (!settings.myRank && settings.verbose)
    {
        std::cout << "    Read data " << std::endl;
    }

    for (auto ov : cmdR->variables)
    {
        getADIOSArray(reader, io, ov);
    }
    reader->EndStep();
    return status;
}

void writeADIOS(std::shared_ptr<adios2::Engine> writer, adios2::IO &io,
                CommandWrite *cmdW, Config &cfg, const Settings &settings,
                size_t step)
{
    if (!settings.myRank && settings.verbose)
    {
        std::cout << "    Write to output " << cmdW->streamName << " the group "
                  << cmdW->groupName;
        if (!cmdW->variables.empty())
        {
            std::cout << " with selected variables:  ";
            for (const auto &v : cmdW->variables)
            {
                std::cout << v->name << " ";
            }
        }
        std::cout << std::endl;
    }

    const double div =
        pow(10.0, static_cast<const double>(settings.ndigits(cfg.nSteps - 1)));
    double myValue = static_cast<double>(settings.myRank) +
                     static_cast<double>(step - 1) / div;

    std::map<std::string, adios2::Params> definedVars = io.AvailableVariables();
    for (auto ov : cmdW->variables)
    {
        // if the variable is not in the IO group it means
        // we have not defined it yet (e.g. a write-only variable or a linked
        // variable defined in another read group)
        const auto it = definedVars.find(ov->name);
        if (it == definedVars.end())
        {
            if (!settings.myRank && settings.verbose)
            {
                std::cout << "        Define array  " << ov->name
                          << "  for output" << std::endl;
            }
            defineADIOSArray(io, ov);
        }

        // if we read the variable, use the read values otherwise generate data
        // now
        if (!ov->readFromInput)
        {
            if (!settings.myRank && settings.verbose)
            {
                std::cout << "        Fill array  " << ov->name
                          << "  for output" << std::endl;
            }
            fillArray(ov, myValue);
        }
    }

    if (!settings.myRank && settings.verbose)
    {
        std::cout << "        Write data " << std::endl;
    }
    writer->BeginStep();
    for (const auto ov : cmdW->variables)
    {
        putADIOSArray(writer, ov);
    }
    writer->EndStep();
}
#include "stream.h"

int main(int argc, char *argv[])
{
@@ -335,24 +98,22 @@ int main(int argc, char *argv[])
                }
            }

            std::map<std::string, adios2::IO> ioMap;
            std::map<std::string, std::shared_ptr<ioGroup>> ioMap;

            /* 2. Declare/define groups and open streams in the order they
             * appear */
            std::map<std::string, std::shared_ptr<adios2::Engine>>
                readEngineMap;
            std::map<std::string, std::shared_ptr<adios2::Engine>>
                writeEngineMap;
            std::map<std::string, std::shared_ptr<Stream>> readStreamMap;
            std::map<std::string, std::shared_ptr<Stream>> writeStreamMap;

            for (const auto &st : streamsInOrder)
            {
                const std::string &streamName = st.first;
                adios2::IO io;
                std::shared_ptr<ioGroup> io;
                auto &groupName = groupMap[streamName];
                auto it = ioMap.find(groupName);
                if (it == ioMap.end())
                {
                    io = adios.DeclareIO(groupName);
                    io = createGroup(groupName, settings.iolib, adios);
                    ioMap[groupName] = io;
                }
                else
@@ -362,24 +123,24 @@ int main(int argc, char *argv[])
                const bool isWrite = (st.second == Operation::Write);
                if (isWrite)
                {
                    auto it = writeEngineMap.find(streamName);
                    if (it == writeEngineMap.end())
                    auto it = writeStreamMap.find(streamName);
                    if (it == writeStreamMap.end())
                    {
                        adios2::Engine writer = io.Open(
                            streamName, adios2::Mode::Write, settings.appComm);
                        writeEngineMap[streamName] =
                            std::make_shared<adios2::Engine>(writer);
                        std::shared_ptr<Stream> writer =
                            openStream(streamName, io, adios2::Mode::Write,
                                       settings.iolib, settings.appComm);
                        writeStreamMap[streamName] = writer;
                    }
                }
                else /* Read */
                {
                    auto it = readEngineMap.find(streamName);
                    if (it == readEngineMap.end())
                    auto it = readStreamMap.find(streamName);
                    if (it == readStreamMap.end())
                    {
                        adios2::Engine reader = io.Open(
                            streamName, adios2::Mode::Read, settings.appComm);
                        readEngineMap[streamName] =
                            std::make_shared<adios2::Engine>(reader);
                        std::shared_ptr<Stream> reader =
                            openStream(streamName, io, adios2::Mode::Read,
                                       settings.iolib, settings.appComm);
                        readStreamMap[streamName] = reader;
                    }
                }
            }
@@ -428,9 +189,9 @@ int main(int argc, char *argv[])
                    case Operation::Write:
                    {
                        auto cmdW = dynamic_cast<CommandWrite *>(cmd.get());
                        auto writer = writeEngineMap[cmdW->streamName];
                        auto io = ioMap[cmdW->groupName];
                        writeADIOS(writer, io, cmdW, cfg, settings, step);
                        auto stream = writeStreamMap[cmdW->streamName];
                        // auto io = ioMap[cmdW->groupName];
                        stream->Write(cmdW, cfg, settings, step);
                        break;
                    }
                    case Operation::Read:
@@ -440,10 +201,10 @@ int main(int argc, char *argv[])
                        if (statusIt->second == adios2::StepStatus::OK ||
                            statusIt->second == adios2::StepStatus::NotReady)
                        {
                            auto reader = readEngineMap[cmdR->streamName];
                            auto io = ioMap[cmdR->groupName];
                            adios2::StepStatus status = readADIOS(
                                reader, io, cmdR, cfg, settings, step);
                            auto stream = readStreamMap[cmdR->streamName];
                            // auto io = ioMap[cmdR->groupName];
                            adios2::StepStatus status =
                                stream->Read(cmdR, cfg, settings, step);
                            statusIt->second = status;
                            switch (status)
                            {
@@ -491,22 +252,22 @@ int main(int argc, char *argv[])
                const bool isWrite = (st.second == Operation::Write);
                if (isWrite)
                {
                    auto writerIt = writeEngineMap.find(streamName);
                    if (writerIt != writeEngineMap.end())
                    auto writerIt = writeStreamMap.find(streamName);
                    if (writerIt != writeStreamMap.end())
                    {
                        auto writer = writeEngineMap[streamName];
                        auto writer = writeStreamMap[streamName];
                        writerIt->second->Close();
                        writeEngineMap.erase(writerIt);
                        writeStreamMap.erase(writerIt);
                    }
                }
                else /* Read */
                {
                    auto readerIt = readEngineMap.find(streamName);
                    if (readerIt != readEngineMap.end())
                    auto readerIt = readStreamMap.find(streamName);
                    if (readerIt != readStreamMap.end())
                    {
                        auto reader = readEngineMap[streamName];
                        auto reader = readStreamMap[streamName];
                        readerIt->second->Close();
                        readEngineMap.erase(readerIt);
                        readStreamMap.erase(readerIt);
                    }
                }
            }
+305 −0

File added.

Preview size limit exceeded, changes collapsed.

Loading