Skip to content
Snippets Groups Projects
BP3Serializer.cpp 36.4 KiB
Newer Older
            m_Metadata.Resize(buffer.size() +
                                  static_cast<size_t>(entryLength + 4),
                              "in call to MergeSerializeIndices bp3 index");
            CopyToBuffer(buffer, position, &entryLength);
            CopyToBuffer(buffer, position, &indices[firstRank].Buffer[4],
                         headerSize - 8 - 4);
            CopyToBuffer(buffer, position, &setsCount);
            CopyToBuffer(buffer, position, sorted.data(), sorted.size());
        }
    };

    auto lf_MergeRankRange = [&](
        const std::unordered_map<std::string, std::vector<SerialElementIndex>>
            &nameRankIndices,
        const std::vector<std::string> &names, const size_t start,
        const size_t end)

    {
        for (size_t i = start; i < end; ++i)
        {
            auto itIndex = nameRankIndices.find(names[i]);
            lf_MergeRank(itIndex->second);
        }
    };
    // BODY OF FUNCTION STARTS HERE
William F Godoy's avatar
William F Godoy committed
    // if (m_Threads == 1) // enforcing serial version for now
    {
        for (const auto &rankIndices : nameRankIndices)
        {
            lf_MergeRank(rankIndices.second);
        }
        return;
    }
William F Godoy's avatar
William F Godoy committed
    // TODO need to debug this part, if threaded per variable
    const size_t elements = nameRankIndices.size();
    const size_t stride = elements / m_Threads;        // elements per thread
    const size_t last = stride + elements % m_Threads; // remainder to last

    std::vector<std::thread> threads;
    threads.reserve(m_Threads);

    // copy names in order to use threads
    std::vector<std::string> names;
    names.reserve(nameRankIndices.size());

    for (const auto &nameRankIndexPair : nameRankIndices)
    {
        names.push_back(nameRankIndexPair.first);
    }

    for (unsigned int t = 0; t < m_Threads; ++t)
    {
        const size_t start = stride * t;
        size_t end;

        if (t == m_Threads - 1)
        {
            end = start + stride;
        }
        else
        {
            end = start + last;
        }

        threads.push_back(std::thread(lf_MergeRankRange,
                                      std::ref(nameRankIndices),
                                      std::ref(names), start, end));
    }

    for (auto &thread : threads)
    {
        thread.join();
/**
 * Gathers every rank's profiling log onto rank 0 and wraps the concatenated
 * logs in a JSON array: "[\n" + <gathered logs minus their last (up to) two
 * bytes> + "\n]\n".
 *
 * Calls GatherValues and GathervArrays on m_MPIComm, which presumably are
 * collective operations, so every rank must call this function
 * (NOTE(review): confirm against those helpers).
 *
 * @param rankLog this rank's profiling log text
 * @return on rank 0 the assembled JSON bytes (not NUL-terminated); on other
 *         ranks a small placeholder buffer whose contents are not meaningful
 */
std::vector<char>
BP3Serializer::SetCollectiveProfilingJSON(const std::string &rankLog) const
{
    // Gather sizes
    const size_t rankLogSize = rankLog.size();
    std::vector<size_t> rankLogsSizes = GatherValues(rankLogSize, m_MPIComm);

    // Gatherv JSON per rank.
    // Start non-empty so that &profilingJSON[position] below indexes a valid
    // element on non-root ranks, which never resize this buffer.
    std::vector<char> profilingJSON(3);
    const std::string header("[\n");
    const std::string footer("\n]\n");
    size_t gatheredSize = 0;
    // Number of trailing gathered bytes that the footer overwrites. This is
    // presumably a separator such as ",\n" that ends the last rank's entry
    // (NOTE(review): confirm against the code that produces rankLog).
    size_t trailingSize = 0;
    size_t position = 0;

    if (m_RankMPI == 0) // pre-allocate in destination
    {
        // Seed with size_t: an int literal 0 would make std::accumulate sum
        // in int, truncating once the combined logs exceed INT_MAX.
        gatheredSize = std::accumulate(rankLogsSizes.begin(),
                                       rankLogsSizes.end(),
                                       static_cast<size_t>(0));

        // Never trim more bytes than were gathered, so the unsigned
        // subtractions below cannot wrap around for (nearly) empty logs.
        trailingSize = gatheredSize < 2 ? gatheredSize : 2;

        // The buffer always holds the full gathered data: trailingSize <= 2
        // is smaller than footer.size(), so the footer covers the tail.
        profilingJSON.resize(header.size() + gatheredSize - trailingSize +
                             footer.size());
        // CopyToBuffer advances position past the header, so the gathered
        // logs land right after it.
        CopyToBuffer(profilingJSON, position, header.c_str(), header.size());
    }

    GathervArrays(rankLog.c_str(), rankLogSize, rankLogsSizes.data(),
                  rankLogsSizes.size(), &profilingJSON[position], m_MPIComm);

    if (m_RankMPI == 0) // add footer to close JSON
    {
        // Overwrite the trimmed trailing bytes of the last log with the
        // footer.
        position += gatheredSize - trailingSize;
        CopyToBuffer(profilingJSON, position, footer.c_str(), footer.size());
    }

    return profilingJSON;
}

//------------------------------------------------------------------------------
// Explicit instantiation of only public templates

// Helper macro: for a single type T, explicitly instantiates the public
// member templates PutVariableMetadata and PutVariablePayload so their
// definitions in this translation unit are available to other units.
#define declare_template_instantiation(T)                                      \
    template void BP3Serializer::PutVariableMetadata(                          \
        const Variable<T> &variable) noexcept;                                 \
    template void BP3Serializer::PutVariablePayload(                           \
        const Variable<T> &variable) noexcept;
// Apply the helper to every type in the ADIOS2_FOREACH_TYPE_1ARG list
// (defined elsewhere; presumably the library's supported variable types).
ADIOS2_FOREACH_TYPE_1ARG(declare_template_instantiation)
// Remove the helper so it does not leak past this block.
#undef declare_template_instantiation

//------------------------------------------------------------------------------

} // end namespace format