Commit 7cb6912a authored by Podhorszki, Norbert's avatar Podhorszki, Norbert
Browse files

Simplify BeginStep: Each Reader receives step info from only one Writer (primary Writer)

parent 6891946f
Loading
Loading
Loading
Loading
+43 −8
Original line number Diff line number Diff line
@@ -188,28 +188,63 @@ int ConnectDirectPeers(const MPI_Comm commWorld, const bool IAmSender,
                       const bool IAmWriterRoot, const int globalRank,
                       const std::vector<int> &peers)
{
    int token = (IAmWriterRoot ? 1 : 0);
    int writeRootGlobalRank = -1;
    /* Return value:
         Reader: write root global rank or -1
         Writer: am I first connect to my Reader?
    */
    int retval;
    MPI_Status status;

    /* Writers send msg to each connected Reader, Writer Root sends different
     * value than others */
    int writeRootToken = (IAmWriterRoot ? 1 : 0);
    for (const auto peerRank : peers)
    {
        if (IAmSender)
        {
            // std::cout << " Send from " << rank << " to " << peerRank
            //          << std::endl;
            MPI_Send(&token, 1, MPI_INT, peerRank, MpiTags::Connect, commWorld);
            MPI_Send(&writeRootToken, 1, MPI_INT, peerRank, MpiTags::Connect,
                     commWorld);
        }
        else
        {
            // std::cout << " Recv from " << peerRank << " by " << rank
            //          << std::endl;
            MPI_Recv(&token, 1, MPI_INT, peerRank, MpiTags::Connect, commWorld,
                     &status);
            if (token == 1)
                writeRootGlobalRank = peerRank;
            MPI_Recv(&writeRootToken, 1, MPI_INT, peerRank, MpiTags::Connect,
                     commWorld, &status);
            if (writeRootToken == 1)
                retval = peerRank;
        }
    }

    /* Each Reader selects one Writer to be main contact (for BeginStep) */
    int firstAssignedWriterToken = 1;
    for (const auto peerRank : peers)
    {
        if (IAmSender)
        {
            // std::cout << " Receive reader selection from " << peerRank << "
            // by " << rank
            //          << std::endl;
            MPI_Recv(&firstAssignedWriterToken, 1, MPI_INT, peerRank,
                     MpiTags::Connect, commWorld, &status);
            retval = firstAssignedWriterToken;
            /* Note that in case there are less Writer than Readers, here retval
               is set to 1
               multiple times. A mix of 0s and 1s cannot happen */
        }
        else
        {
            // std::cout << " Send reader selection from " << rank << " to " <<
            // peerRank
            //          << std::endl;
            MPI_Send(&firstAssignedWriterToken, 1, MPI_INT, peerRank,
                     MpiTags::Connect, commWorld);
            firstAssignedWriterToken = 0;
        }
    }
    return writeRootGlobalRank;
    return retval;
}

std::vector<MPI_Status> CompleteRequests(std::vector<MPI_Request> &requests,
+5 −2
Original line number Diff line number Diff line
@@ -60,8 +60,11 @@ std::vector<int> AssignPeers(const int rank, const int nproc,
// on the Writer and Reader side at the same time.
// IAmSender is true on the writers, false on the readers.
// IAmWriterRoot is true only on one writer who will send the global metadata.
// return global rank of writer root on the reader who is connected to writer
// root, -1 everywhere else.
// return:
//    on Reader: global rank of writer root on the reader who is connected
//       to writer root, -1 everywhere else.
//    on Writer: 1 on a writer who is the first rank connected to a particular
//       reader, 0 everywhere else (i.e. one Writer selected for each Reader)
int ConnectDirectPeers(const MPI_Comm commWorld, const bool IAmSender,
                       const bool IAmWriterRoot, const int globalRank,
                       const std::vector<int> &peers);
+7 −12
Original line number Diff line number Diff line
@@ -127,21 +127,17 @@ StepStatus InSituMPIReader::BeginStep(const StepMode mode,
    // FIXME: All processes should timeout or succeed together.
    // If some timeouts (and never checks back) and the others succeed,
    // the global metadata operation will hang
    std::vector<MPI_Request> requests(m_RankDirectPeers.size());
    std::vector<int> steps(m_RankDirectPeers.size());
    for (int peerID = 0; peerID < m_RankDirectPeers.size(); peerID++)
    {
        MPI_Irecv(&steps[peerID], 1, MPI_INT, m_RankDirectPeers[peerID],
                  insitumpi::MpiTags::Step, m_CommWorld, &requests[peerID]);
    }
    insitumpi::CompleteRequests(requests, false, m_ReaderRank);
    int step;
    MPI_Status status;
    MPI_Recv(&step, 1, MPI_INT, m_RankDirectPeers[0], insitumpi::MpiTags::Step,
             m_CommWorld, &status);

    if (m_Verbosity == 5)
    {
        std::cout << "InSituMPI Reader " << m_ReaderRank << " new step "
                  << steps[0] << " arrived for " << m_Name << std::endl;
        std::cout << "InSituMPI Reader " << m_ReaderRank << " new step " << step
                  << " arrived for " << m_Name << std::endl;
    }
    m_CurrentStep = steps[0];
    m_CurrentStep = step;
    // FIXME: missing test whether all writers sent the same step

    if (m_CurrentStep == -1)
@@ -158,7 +154,6 @@ StepStatus InSituMPIReader::BeginStep(const StepMode mode,

        if (m_ReaderRootRank == m_ReaderRank)
        {
            MPI_Status status;
            MPI_Recv(&mdLen, 1, MPI_UNSIGNED_LONG, m_WriteRootGlobalRank,
                     insitumpi::MpiTags::MetadataLength, m_CommWorld, &status);
            if (m_Verbosity == 5)
+28 −14
Original line number Diff line number Diff line
@@ -55,9 +55,10 @@ InSituMPIWriter::InSituMPIWriter(IO &io, const std::string &name,
                  << " #appsize=" << m_GlobalNproc
                  << " #direct peers=" << m_RankDirectPeers.size() << std::endl;
    }
    insitumpi::ConnectDirectPeers(m_CommWorld, true,
                                  (m_BP3Serializer.m_RankMPI == 0),
                                  m_GlobalRank, m_RankDirectPeers);
    int primaryContact = insitumpi::ConnectDirectPeers(
        m_CommWorld, true, (m_BP3Serializer.m_RankMPI == 0), m_GlobalRank,
        m_RankDirectPeers);
    m_AmIPrimaryContact = static_cast<bool>(primaryContact);
}

InSituMPIWriter::~InSituMPIWriter() {}
@@ -76,11 +77,13 @@ StepStatus InSituMPIWriter::BeginStep(StepMode mode, const float timeoutSeconds)
    }

    m_CurrentStep++; // 0 is the first step
    if (m_AmIPrimaryContact)
    {
        if (m_Verbosity == 5)
        {
            std::cout << "InSituMPI Writer " << m_WriterRank << " new step "
                  << m_CurrentStep << " for " << m_Name << ". Notify peers..."
                  << std::endl;
                      << m_CurrentStep << " for " << m_Name
                      << ". Notify peers..." << std::endl;
        }
        // Send the step to all reader peers, asynchronously
        // We need to call Wait on all Isend/Irecv calls at some point otherwise
@@ -89,7 +92,18 @@ StepStatus InSituMPIWriter::BeginStep(StepMode mode, const float timeoutSeconds)
        {
            m_MPIRequests.emplace_back();
            MPI_Isend(&m_CurrentStep, 1, MPI_INT, peerRank,
                  insitumpi::MpiTags::Step, m_CommWorld, &m_MPIRequests.back());
                      insitumpi::MpiTags::Step, m_CommWorld,
                      &m_MPIRequests.back());
        }
    }
    else
    {
        if (m_Verbosity == 5)
        {
            std::cout << "InSituMPI Writer " << m_WriterRank << " new step "
                      << m_CurrentStep << " for " << m_Name
                      << ". Notify nobody." << std::endl;
        }
    }

    m_NCallsPerformPuts = 0;
+3 −0
Original line number Diff line number Diff line
@@ -61,6 +61,9 @@ private:
    std::map<int, int> m_RankToPeerID;
    // Global ranks of the readers directly assigned to me
    std::vector<int> m_RankDirectPeers;
    // true: Reader(s) selected me as primary writer contact
    // e.g. I need to send info to Readers in BeginStep()
    bool m_AmIPrimaryContact;

    int m_CurrentStep = -1; // steps start from 0