Commit c75ff59b authored by William F Godoy's avatar William F Godoy
Browse files

Making BP3 thread safe

Enabling thread safety in deserializer
Tests pass
parent 0ff7b477
Loading
Loading
Loading
Loading
+0 −3
Original line number Diff line number Diff line
@@ -100,9 +100,6 @@ public:
     * */
    std::map<size_t, std::vector<size_t>> m_AvailableStepBlockIndexOffsets;

    /** wildcard memory space used for contiguous memory read */
    std::map<size_t, std::vector<char>> m_RawMemory;

    VariableBase(const std::string &name, const std::string type,
                 const size_t elementSize, const Dims &shape, const Dims &start,
                 const Dims &count, const bool constantShape,
+26 −96
Original line number Diff line number Diff line
@@ -71,116 +71,46 @@ void BP3Reader::ReadVariableBlocks(Variable<T> &variable)

        for (const auto &stepPair : blockInfo.StepBlockSubStreamsInfo)
        {
            const std::vector<helper::SubStreamBoxInfo> &subStreamsInfo =
                stepPair.second;

            for (const helper::SubStreamBoxInfo &subStreamInfo : subStreamsInfo)
            for (const helper::SubStreamBoxInfo &subStreamBoxInfo :
                 stepPair.second)
            {
                if (subStreamInfo.ZeroBlock)
                if (subStreamBoxInfo.ZeroBlock)
                {
                    continue;
                }

                const size_t subFileIndex = subStreamInfo.SubStreamID;

                if (m_SubFileManager.m_Transports.count(subFileIndex) == 0)
                // check if subfile is already opened
                if (m_SubFileManager.m_Transports.count(
                        subStreamBoxInfo.SubStreamID) == 0)
                {
                    const std::string subFile(
                        m_BP3Deserializer.GetBPSubFileName(m_Name,
                                                           subFileIndex));
                    const std::string subFileName =
                        m_BP3Deserializer.GetBPSubFileName(
                            m_Name, subStreamBoxInfo.SubStreamID);

                    m_SubFileManager.OpenFileID(
                        subFile, subFileIndex, Mode::Read,
                        subFileName, subStreamBoxInfo.SubStreamID, Mode::Read,
                        {{"transport", "File"}}, profile);
                }

                // need to decompress before into m_Memory
                if (subStreamInfo.OperationsInfo.size() > 0)
                {
                    const bool identity =
                        m_BP3Deserializer.IdentityOperation<T>(
                            blockInfo.Operations);

                    const helper::BlockOperationInfo &blockOperationInfo =
                        m_BP3Deserializer.InitPostOperatorBlockData(
                            subStreamInfo.OperationsInfo,
                            variable.m_RawMemory[1], identity);

                    // if identity is true, just read the entire block content
                    char *output =
                        identity ? reinterpret_cast<char *>(blockInfo.Data)
                                 : variable.m_RawMemory[1].data();
                    m_SubFileManager.ReadFile(
                        output, blockOperationInfo.PayloadSize,
                        blockOperationInfo.PayloadOffset, subFileIndex);
                    if (identity)
                    {
                        continue;
                    }
                    m_BP3Deserializer.GetPreOperatorBlockData(
                        variable.m_RawMemory[1], blockOperationInfo,
                        variable.m_RawMemory[0]);
                char *buffer = nullptr;
                size_t payloadSize = 0, payloadStart = 0;

                    helper::ClipVector(variable.m_RawMemory[0],
                                       subStreamInfo.Seeks.first,
                                       subStreamInfo.Seeks.second);
                }
                else
                {
                    const size_t payloadStart = subStreamInfo.Seeks.first;
                    const size_t payloadSize =
                        subStreamInfo.Seeks.second - subStreamInfo.Seeks.first;
                    // a single m_Memory can prevent threading per variable,
                    // need to think for later
                    variable.m_RawMemory[0].resize(payloadSize);
                    m_SubFileManager.ReadFile(variable.m_RawMemory[0].data(),
                                              payloadSize, payloadStart,
                                              subFileIndex);
                }
                //                m_BP3Deserializer.ClipContiguousMemory<T>(
                //                                       blockInfo,
                //                                       variable.m_RawMemory[0],
                //                                       subStreamInfo.BlockBox,
                //                                       subStreamInfo.IntersectionBox);
                const Box<Dims> sourceStartCount =
                    helper::StartCountBox(subStreamInfo.BlockBox.first,
                                          subStreamInfo.BlockBox.second);

// TODO: this should be a single BP3 deserializer function
#ifdef ADIOS2_HAVE_ENDIAN_REVERSE
                const bool endianReverse =
                    (helper::IsLittleEndian() !=
                     m_BP3Deserializer.m_Minifooter.IsLittleEndian)
                        ? true
                        : false;
#else
                constexpr bool endianReverse = false;
#endif
                if (variable.m_ShapeID == ShapeID::GlobalArray)
                {
                    helper::CopyMemory(
                        blockInfo.Data, blockInfo.Start, blockInfo.Count,
                        helper::IsRowMajor(m_IO.m_HostLanguage),
                        reinterpret_cast<T *>(variable.m_RawMemory[0].data()),
                        sourceStartCount.first, sourceStartCount.second,
                        m_BP3Deserializer.m_IsRowMajor, endianReverse);
                }
                else if (variable.m_ShapeID == ShapeID::LocalArray)
                {
                    helper::CopyMemory(
                        blockInfo.Data, Dims(blockInfo.Count.size(), 0),
                        blockInfo.Count,
                        helper::IsRowMajor(m_IO.m_HostLanguage),
                        reinterpret_cast<T *>(variable.m_RawMemory[0].data()),
                        sourceStartCount.first, sourceStartCount.second,
                        m_BP3Deserializer.m_IsRowMajor, endianReverse);
                }
            }
                m_BP3Deserializer.PreDataRead(variable, blockInfo,
                                              subStreamBoxInfo, buffer,
                                              payloadSize, payloadStart, 0);

                m_SubFileManager.ReadFile(buffer, payloadSize, payloadStart,
                                          subStreamBoxInfo.SubStreamID);

                m_BP3Deserializer.PostDataRead(
                    variable, blockInfo, subStreamBoxInfo,
                    helper::IsRowMajor(m_IO.m_HostLanguage), 0);
            } // substreams loop
            // advance pointer to next step
            blockInfo.Data += helper::GetTotalSize(blockInfo.Count);
        }
        } // steps loop
        blockInfo.Data = originalBlockData;
    }
    } // deferred blocks loop
}

} // end namespace engine
+10 −0
Original line number Diff line number Diff line
@@ -13,6 +13,7 @@

/// \cond EXCLUDE_FROM_DOXYGEN
#include <bitset>
#include <map>
#include <memory> //std::shared_ptr
#include <set>
#include <string>
@@ -180,6 +181,15 @@ public:
     */
    std::unordered_set<std::string> m_SerializedAttributes;

    /**
     * scratch memory buffers used for management of temporary memory buffers
     * per thread.
     * This allows thread-safety mostly is deserialization.
     * Indices:
     * [threadID][bufferID]
     */
    std::map<size_t, std::map<size_t, std::vector<char>>> m_ThreadBuffers;

    /**
     * Unique constructor
     * @param mpiComm for m_BP1Aggregator
+10 −26
Original line number Diff line number Diff line
@@ -42,10 +42,8 @@ void BP3Deserializer::ParseMetadata(const BufferSTL &bufferSTL, core::IO &io)
}

const helper::BlockOperationInfo &BP3Deserializer::InitPostOperatorBlockData(
    const std::vector<helper::BlockOperationInfo> &blockOperationsInfo,
    std::vector<char> &postOpData, const bool identity) const
    const std::vector<helper::BlockOperationInfo> &blockOperationsInfo) const
{

    size_t index = 0;
    for (const helper::BlockOperationInfo &blockOperationInfo :
         blockOperationsInfo)
@@ -53,10 +51,6 @@ const helper::BlockOperationInfo &BP3Deserializer::InitPostOperatorBlockData(
        const std::string type = blockOperationInfo.Info.at("Type");
        if (m_TransformTypes.count(type) == 1)
        {
            if (!identity)
            {
                postOpData.resize(blockOperationInfo.PayloadSize);
            }
            break;
        }
        ++index;
@@ -64,21 +58,6 @@ const helper::BlockOperationInfo &BP3Deserializer::InitPostOperatorBlockData(
    return blockOperationsInfo.at(index);
}

void BP3Deserializer::GetPreOperatorBlockData(
    const std::vector<char> &postOpData,
    const helper::BlockOperationInfo &blockOperationInfo,
    std::vector<char> &preOpData) const
{
    // pre-allocate decompressed block
    preOpData.resize(helper::GetTotalSize(blockOperationInfo.PreCount) *
                     blockOperationInfo.PreSizeOf);

    // get the right bp3Op
    std::shared_ptr<BP3Operation> bp3Op =
        SetBP3Operation(blockOperationInfo.Info.at("Type"));
    bp3Op->GetData(postOpData.data(), blockOperationInfo, preOpData.data());
}

// PRIVATE
void BP3Deserializer::ParseMinifooter(const BufferSTL &bufferSTL)
{
@@ -552,12 +531,17 @@ ADIOS2_FOREACH_TYPE_1ARG(declare_template_instantiation)
    BP3Deserializer::BlocksInfo(const core::Variable<T> &, const size_t)       \
        const;                                                                 \
                                                                               \
    template bool BP3Deserializer::IdentityOperation<T>(                       \
        const std::vector<typename core::Variable<T>::Operation> &)            \
        const noexcept;
    template void BP3Deserializer::PreDataRead(                                \
        core::Variable<T> &, typename core::Variable<T>::Info &,               \
        const helper::SubStreamBoxInfo &, char *&, size_t &, size_t &,         \
        const size_t);                                                         \
                                                                               \
    template void BP3Deserializer::PostDataRead(                               \
        core::Variable<T> &, typename core::Variable<T>::Info &,               \
        const helper::SubStreamBoxInfo &, const bool, const size_t);

ADIOS2_FOREACH_TYPE_1ARG(declare_template_instantiation)
#undef declare_template_instantiation

} // end namespace format
} // end namespace formata
} // end namespace adios2
+44 −14
Original line number Diff line number Diff line
@@ -75,20 +75,36 @@ public:
    SetVariableBlockInfo(core::Variable<T> &variable,
                         typename core::Variable<T>::Info &blockInfo) const;

    // Operation related functions
    /**
     * Prepares the information to get raw data from the transport manager for a
     * required substream box (block)
     * @param variable input Variable
     * @param blockInfo input blockInfo with information about Get request
     * @param subStreamBoxInfo contains information (e.g. bounds, operation,
     * etc.) about the available box (block) to be accessed by the Transport
     * Manager.
     * @param buffer output to be passed to Transport Manager for current box
     * @param payloadSize output to be passed to Transport Manager for current
     * box
     * @param payloadStart output to be passed to Transport Manager for current
     * box
     * @param threadID assign different thread ID to have independent raw memory
     * spaces per thread, default = 0
     */
    template <class T>
    bool IdentityOperation(
        const std::vector<typename core::Variable<T>::Operation> &operations)
        const noexcept;
    void PreDataRead(core::Variable<T> &variable,
                     typename core::Variable<T>::Info &blockInfo,
                     const helper::SubStreamBoxInfo &subStreamBoxInfo,
                     char *&buffer, size_t &payloadSize, size_t &payloadOffset,
                     const size_t threadID = 0);

    const helper::BlockOperationInfo &InitPostOperatorBlockData(
        const std::vector<helper::BlockOperationInfo> &blockOperationsInfo,
        std::vector<char> &postOpData, const bool identity) const;
    template <class T>
    void PostDataRead(core::Variable<T> &variable,
                      typename core::Variable<T>::Info &blockInfo,
                      const helper::SubStreamBoxInfo &subStreamBoxInfo,
                      const bool isRowMajorDestination,
                      const size_t threadID = 0);

    void GetPreOperatorBlockData(
        const std::vector<char> &postOpData,
        const helper::BlockOperationInfo &blockOperationInfo,
        std::vector<char> &preOpData) const;
    /**
     * Clips and assigns memory to blockInfo.Data from a contiguous memory
     * input
@@ -181,6 +197,15 @@ private:
    std::vector<typename core::Variable<T>::Info>
    BlocksInfoCommon(const core::Variable<T> &variable,
                     const std::vector<size_t> &blocksIndexOffsets) const;

    template <class T>
    bool IdentityOperation(
        const std::vector<typename core::Variable<T>::Operation> &operations)
        const noexcept;

    const helper::BlockOperationInfo &InitPostOperatorBlockData(
        const std::vector<helper::BlockOperationInfo> &blockOperationsInfo)
        const;
};

// TODO: deprecate this
@@ -224,9 +249,14 @@ ADIOS2_FOREACH_TYPE_1ARG(declare_template_instantiation)
    BP3Deserializer::BlocksInfo(const core::Variable<T> &, const size_t)       \
        const;                                                                 \
                                                                               \
    extern template bool BP3Deserializer::IdentityOperation<T>(                \
        const std::vector<typename core::Variable<T>::Operation> &)            \
        const noexcept;
    extern template void BP3Deserializer::PreDataRead(                         \
        core::Variable<T> &, typename core::Variable<T>::Info &,               \
        const helper::SubStreamBoxInfo &, char *&, size_t &, size_t &,         \
        const size_t);                                                         \
                                                                               \
    extern template void BP3Deserializer::PostDataRead(                        \
        core::Variable<T> &, typename core::Variable<T>::Info &,               \
        const helper::SubStreamBoxInfo &, const bool, const size_t);

ADIOS2_FOREACH_TYPE_1ARG(declare_template_instantiation)
#undef declare_template_instantiation
Loading