Unverified Commit 128736b0 authored by Jason Wang's avatar Jason Wang Committed by GitHub
Browse files

Merge pull request #982 from JasonRuonanWang/dataman-query

refined dataman metadata as preparation for the new staging engine
parents dadff59a 2ddb52e2
Loading
Loading
Loading
Loading
+6 −6
Original line number Diff line number Diff line
@@ -57,15 +57,15 @@ void DataManWriter::PutDeferredCommon(Variable<T> &variable, const T *values)
        {
            if (m_WorkflowMode == "subscribe")
            {
                m_DataManSerializer[i]->Put(
                    variable, m_Name, CurrentStep(), m_MPIRank,
                    m_IO.m_TransportsParameters[i], false);
                m_DataManSerializer[i]->Put(variable, m_Name, CurrentStep(),
                                            m_MPIRank, "",
                                            m_IO.m_TransportsParameters[i]);
            }
            else
            {
                m_DataManSerializer[i]->Put(
                    variable, m_Name, CurrentStep(), m_MPIRank,
                    m_IO.m_TransportsParameters[i], true);
                m_DataManSerializer[i]->Put(variable, m_Name, CurrentStep(),
                                            m_MPIRank, "",
                                            m_IO.m_TransportsParameters[i]);
            }
        }
    }
+81 −124
Original line number Diff line number Diff line
@@ -50,113 +50,73 @@ int DataManDeserializer::Put(
        key = rand();
    }
    m_BufferMap[key] = data;
    size_t position = 0;
    while (position < data->capacity())

    uint64_t metaPosition =
        (reinterpret_cast<const uint64_t *>(data->data()))[0];
    uint64_t metaSize = (reinterpret_cast<const uint64_t *>(data->data()))[1];

    nlohmann::json metaJ =
        nlohmann::json::from_msgpack(data->data() + metaPosition, metaSize);

    for (auto stepMapIt = metaJ.begin(); stepMapIt != metaJ.end(); ++stepMapIt)
    {
        uint32_t metasize;
        std::memcpy(&metasize, data->data() + position, sizeof(metasize));
        position += sizeof(metasize);
        if (position + metasize > data->size())
        for (auto rankMapIt = stepMapIt.value().begin();
             rankMapIt != stepMapIt.value().end(); ++rankMapIt)
        {
            for (const auto &varBlock : rankMapIt.value())
            {
            break;
        }
                DataManVar var;
                try
                {
            nlohmann::json metaj =
                nlohmann::json::from_msgpack(data->data() + position, metasize);
            position += metasize;

                    // compulsory properties
            var.name = metaj["N"].get<std::string>();
            var.start = metaj["O"].get<Dims>();
            var.count = metaj["C"].get<Dims>();
            var.step = metaj["T"].get<size_t>();
            var.size = metaj["I"].get<size_t>();
                    var.name = varBlock["N"].get<std::string>();
                    var.start = varBlock["O"].get<Dims>();
                    var.count = varBlock["C"].get<Dims>();
                    var.step = varBlock["T"].get<size_t>();
                    var.size = varBlock["I"].get<size_t>();

                    // optional properties

            auto itMap = m_VarDefaultsMap.find(var.name);
            auto itJson = metaj.find("D");
            if (itJson != metaj.end())
                    auto itJson = varBlock.find("D");
                    if (itJson != varBlock.end())
                    {
                        var.doid = itJson->get<std::string>();
                m_VarDefaultsMap[var.name].doid = var.doid;
            }
            else
            {
                if (itMap != m_VarDefaultsMap.end())
                {
                    var.doid = itMap->second.doid;
                }
                    }

            itJson = metaj.find("M");
            if (itJson != metaj.end())
                    itJson = varBlock.find("M");
                    if (itJson != varBlock.end())
                    {
                        var.isRowMajor = itJson->get<bool>();
                m_VarDefaultsMap[var.name].isRowMajor = var.isRowMajor;
            }
            else
            {
                if (itMap != m_VarDefaultsMap.end())
                {
                    var.isRowMajor = itMap->second.isRowMajor;
                }
                    }

            itJson = metaj.find("E");
            if (itJson != metaj.end())
                    itJson = varBlock.find("E");
                    if (itJson != varBlock.end())
                    {
                        var.isLittleEndian = itJson->get<bool>();
                m_VarDefaultsMap[var.name].isLittleEndian = var.isLittleEndian;
            }
            else
            {
                if (itMap != m_VarDefaultsMap.end())
                {
                    var.isLittleEndian = itMap->second.isLittleEndian;
                }
                    }

            itJson = metaj.find("Y");
            if (itJson != metaj.end())
                    itJson = varBlock.find("Y");
                    if (itJson != varBlock.end())
                    {
                        var.type = itJson->get<std::string>();
                m_VarDefaultsMap[var.name].type = var.type;
            }
            else
            {
                if (itMap != m_VarDefaultsMap.end())
                {
                    var.type = itMap->second.type;
                }
                    }

            itJson = metaj.find("S");
            if (itJson != metaj.end())
                    itJson = varBlock.find("S");
                    if (itJson != varBlock.end())
                    {
                        var.shape = itJson->get<Dims>();
                m_VarDefaultsMap[var.name].shape = var.shape;
            }
            else
            {
                if (itMap != m_VarDefaultsMap.end())
                {
                    var.shape = itMap->second.shape;
                }
                    }

            var.position = position;
                    var.position = varBlock["P"].get<size_t>();
                    var.index = key;

            auto it = metaj.find("Z");
            if (it != metaj.end())
                    auto it = varBlock.find("Z");
                    if (it != varBlock.end())
                    {
                        var.compression = it->get<std::string>();
                    }

            for (auto i = metaj.begin(); i != metaj.end(); ++i)
                    for (auto i = varBlock.begin(); i != varBlock.end(); ++i)
                    {
                        auto pos = i.key().find(":");
                        if (pos != std::string::npos)
@@ -165,17 +125,12 @@ int DataManDeserializer::Put(
                        }
                    }

            if (position + var.size > data->capacity())
            {
                break;
            }
                    if (m_MetaDataMap[var.step] == nullptr)
                    {
                        m_MetaDataMap[var.step] =
                            std::make_shared<std::vector<DataManVar>>();
                    }
            m_MetaDataMap[var.step]->push_back(std::move(var));
            position += var.size;
                    m_MetaDataMap[var.step]->emplace_back(std::move(var));
                }
                catch (std::exception &e)
                {
@@ -191,6 +146,8 @@ int DataManDeserializer::Put(
                    m_MinStep = var.step;
                }
            }
        }
    }
    return 0;
}

+0 −10
Original line number Diff line number Diff line
@@ -71,16 +71,6 @@ private:
    bool m_IsLittleEndian;

    std::mutex m_Mutex;

    struct VarDefaults
    {
        std::string doid;
        bool isRowMajor;
        bool isLittleEndian;
        std::string type;
        Dims shape;
    };
    std::map<std::string, VarDefaults> m_VarDefaultsMap;
};

} // end namespace format
+9 −6
Original line number Diff line number Diff line
@@ -31,21 +31,24 @@ void DataManSerializer::New(size_t size)
    // still be alive and needed somewhere in the workflow, for example the
    // queue in transport manager. It will be automatically released when the
    // entire workflow finishes using it.
    m_Metadata = nullptr;
    m_Buffer = std::make_shared<std::vector<char>>();
    m_Buffer->reserve(size);
    m_Position = 0;
    m_Position = sizeof(uint64_t) * 2;
}

const std::shared_ptr<std::vector<char>> DataManSerializer::Get()
{
    std::vector<uint8_t> metacbor = nlohmann::json::to_msgpack(m_Metadata);
    size_t metasize = metacbor.size();
    m_Buffer->resize(m_Position + metasize);
    (reinterpret_cast<uint64_t *>(m_Buffer->data()))[0] = m_Position;
    (reinterpret_cast<uint64_t *>(m_Buffer->data()))[1] = metasize;
    std::memcpy(m_Buffer->data() + m_Position, metacbor.data(), metasize);
    return m_Buffer;
}

float DataManSerializer::GetMetaRatio()
{
    return static_cast<float>(m_TotalMetadataSize) /
           static_cast<float>(m_TotalMetadataSize + m_TotalDataSize);
}
float DataManSerializer::GetMetaRatio() { return 0; }

std::shared_ptr<std::vector<char>> DataManSerializer::EndSignal(size_t step)
{
+20 −16
Original line number Diff line number Diff line
@@ -19,6 +19,22 @@
#include <mutex>
#include <unordered_map>

// A - Address
// C - Count
// D - Data Object ID or File Name
// E - Endian
// H - Meatadata Hash
// I - Data Size
// M - Major
// N - Variable Name
// O - Start
// P - Position of Memory Block
// S - Shape
// X - Index (Used only in deserializer)
// Y - Data Type
// Z - Compression Method
// ZP - Compression Parameters

namespace adios2
{
namespace format
@@ -33,17 +49,18 @@ public:
    void Put(const T *inputData, const std::string &varName,
             const Dims &varShape, const Dims &varStart, const Dims &varCount,
             const std::string &doid, const size_t step, const int rank,
             const Params &params, const bool optimizeMetadata);
             std::string address, const Params &params);
    template <class T>
    void Put(const core::Variable<T> &variable, const std::string &doid,
             const size_t step, const int rank, const Params &params,
             const bool optimizeMetadata);
             const size_t step, const int rank, std::string address,
             const Params &params);
    const std::shared_ptr<std::vector<char>> Get();
    float GetMetaRatio();
    static std::shared_ptr<std::vector<char>> EndSignal(size_t step);

private:
    std::shared_ptr<std::vector<char>> m_Buffer;
    nlohmann::json m_Metadata;
    std::vector<char> m_CompressBuffer;
    size_t m_Position = 0;
    bool m_IsRowMajor;
@@ -60,19 +77,6 @@ private:

    bool IsCompressionAvailable(const std::string &method,
                                const std::string &type, const Dims &count);

    size_t m_TotalDataSize;
    size_t m_TotalMetadataSize;

    struct VarDefaults
    {
        std::string doid;
        bool isRowMajor;
        bool isLittleEndian;
        std::string type;
        Dims shape;
    };
    std::map<std::string, VarDefaults> m_VarDefaultsMap;
};

} // end namespace format
Loading