Skip to content
Snippets Groups Projects
BP3Serializer.cpp 35.7 KiB
Newer Older
            lf_MergeRank(itIndex->second);
        }
    };
    // BODY OF FUNCTION STARTS HERE
    if (m_Threads == 1) // serial version
    {
        for (const auto &rankIndices : nameRankIndices)
        {
            lf_MergeRank(rankIndices.second);
        }
        return;
    }

    // if threaded
    const size_t elements = nameRankIndices.size();
    const size_t stride = elements / m_Threads;        // elements per thread
    const size_t last = stride + elements % m_Threads; // remainder to last

    std::vector<std::thread> threads;
    threads.reserve(m_Threads);

    // copy names in order to use threads
    std::vector<std::string> names;
    names.reserve(nameRankIndices.size());

    for (const auto &nameRankIndexPair : nameRankIndices)
    {
        names.push_back(nameRankIndexPair.first);
    }

    for (unsigned int t = 0; t < m_Threads; ++t)
    {
        const size_t start = stride * t;
        size_t end;

        if (t == m_Threads - 1)
        {
            end = start + stride;
        }
        else
        {
            end = start + last;
        }

        threads.push_back(std::thread(lf_MergeRankRange,
                                      std::ref(nameRankIndices),
                                      std::ref(names), start, end));
    }

    for (auto &thread : threads)
    {
        thread.join();
/**
 * Gathers the profiling log of every rank and, on rank 0, wraps the gathered
 * bytes between a "[\n" header and a "\n]\n" footer.
 *
 * @param rankLog profiling log of the calling rank
 * @return buffer holding header + gathered logs + footer on rank 0; on the
 * other ranks the buffer keeps its initial 3-element size
 */
std::vector<char>
BP3Serializer::SetCollectiveProfilingJSON(const std::string &rankLog) const
{
    // Gather the per-rank log sizes
    const size_t rankLogSize = rankLog.size();
    std::vector<size_t> rankLogsSizes = GatherValues(rankLogSize, m_MPIComm);

    // Gatherv JSON per rank.
    // The buffer starts non-empty so that &profilingJSON[position] below is
    // taken on an existing element even on ranks that never resize it.
    std::vector<char> profilingJSON(3);
    const std::string header("[\n");
    const std::string footer("\n]\n");
    size_t gatheredSize = 0;
    size_t position = 0;

    if (m_RankMPI == 0) // pre-allocate in destination
    {
        // Seed with size_t: an int seed makes std::accumulate sum in int,
        // which can truncate or overflow for large gathered logs.
        gatheredSize = std::accumulate(rankLogsSizes.begin(),
                                       rankLogsSizes.end(), size_t(0));

        // "- 2": the footer is written over the last 2 gathered bytes (see
        // below), so they need no extra room.
        profilingJSON.resize(gatheredSize + header.size() + footer.size() - 2);
        // NOTE(review): assumes CopyToBuffer advances position past the bytes
        // it writes, so the gather lands right after the header — confirm.
        CopyToBuffer(profilingJSON, position, header.c_str(), header.size());
    }

    GathervArrays(rankLog.c_str(), rankLogSize, rankLogsSizes.data(),
                  rankLogsSizes.size(), &profilingJSON[position], m_MPIComm);

    if (m_RankMPI == 0) // add footer to close JSON
    {
        // Step back 2 bytes from the end of the gathered data so the footer
        // replaces them. NOTE(review): presumably a trailing separator left
        // by the last rank's log — verify against the producer of rankLog.
        position += gatheredSize - 2;
        CopyToBuffer(profilingJSON, position, footer.c_str(), footer.size());
    }

    return profilingJSON;
}

//------------------------------------------------------------------------------
// Explicit instantiation of only public templates

// Helper macro, local to this section: emits the explicit instantiations of
// the public Put templates for one type T. It is handed to
// ADIOS2_FOREACH_TYPE_1ARG (presumably expanded once per supported type) and
// undefined right after use.
#define instantiate_put_variable_templates(T)                                  \
    template void BP3Serializer::PutVariablePayload(                           \
        const Variable<T> &variable) noexcept;                                 \
    template void BP3Serializer::PutVariableMetadata(                          \
        const Variable<T> &variable) noexcept;
ADIOS2_FOREACH_TYPE_1ARG(instantiate_put_variable_templates)
#undef instantiate_put_variable_templates

//------------------------------------------------------------------------------

} // end namespace format