Unverified Commit f9895a7c authored by pnorbert's avatar pnorbert Committed by GitHub
Browse files

Merge pull request #1068 from pnorbert/insitumpi_timeout

Insitumpi timeout
parents 3bb7950e 728ade72
Loading
Loading
Loading
Loading
+44 −8
Original line number Diff line number Diff line
@@ -188,28 +188,64 @@ int ConnectDirectPeers(const MPI_Comm commWorld, const bool IAmSender,
                       const bool IAmWriterRoot, const int globalRank,
                       const std::vector<int> &peers)
{
    /* Return value:
         on Reader: global rank of the writer root if this reader is directly
           connected to it, -1 everywhere else
         on Writer: 1 if this writer is the first rank connected to a
           particular reader (i.e. it is that reader's primary contact),
           0 everywhere else
    */
    int retval = -1;
    MPI_Status status;

    /* Phase 1: Writers send a token to each connected Reader; the Writer
     * Root sends a different value (1) than the other writers (0), so each
     * reader learns whether one of its direct peers is the writer root. */
    int writeRootToken = (IAmWriterRoot ? 1 : 0);
    for (const auto peerRank : peers)
    {
        if (IAmSender)
        {
            MPI_Send(&writeRootToken, 1, MPI_INT, peerRank, MpiTags::Connect,
                     commWorld);
        }
        else
        {
            MPI_Recv(&writeRootToken, 1, MPI_INT, peerRank, MpiTags::Connect,
                     commWorld, &status);
            if (writeRootToken == 1)
            {
                // This peer is the writer root; remember its global rank
                retval = peerRank;
            }
        }
    }

    /* Phase 2: Each Reader selects one Writer to be its main contact (for
     * BeginStep). The reader sends 1 to its first peer writer and 0 to the
     * rest; a writer that receives a 1 becomes a primary contact. */
    int firstAssignedWriterToken = 1;
    for (const auto peerRank : peers)
    {
        if (IAmSender)
        {
            MPI_Recv(&firstAssignedWriterToken, 1, MPI_INT, peerRank,
                     MpiTags::Connect, commWorld, &status);
            retval = firstAssignedWriterToken;
            /* Note that in case there are less Writer than Readers, here retval
               is set to 1
               multiple times. A mix of 0s and 1s cannot happen */
        }
        else
        {
            MPI_Send(&firstAssignedWriterToken, 1, MPI_INT, peerRank,
                     MpiTags::Connect, commWorld);
            firstAssignedWriterToken = 0; // only the first writer gets a 1
        }
    }
    return retval;
}

std::vector<MPI_Status> CompleteRequests(std::vector<MPI_Request> &requests,
+5 −2
Original line number Diff line number Diff line
@@ -60,8 +60,11 @@ std::vector<int> AssignPeers(const int rank, const int nproc,
// on the Writer and Reader side at the same time.
// IAmSender is true on the writers, false on the readers.
// IAmWriterRoot is true only on one writer who will send the global metadata.
// return:
//    on Reader: global rank of writer root on the reader who is connected
//       to writer root, -1 everywhere else.
//    on Writer: 1 on a writer who is the first rank connected to a particular
//       reader, 0 everywhere else (i.e. one Writer selected for each Reader)
int ConnectDirectPeers(const MPI_Comm commWorld, const bool IAmSender,
                       const bool IAmWriterRoot, const int globalRank,
                       const std::vector<int> &peers);
+91 −23
Original line number Diff line number Diff line
@@ -16,7 +16,9 @@

#include "adios2/helper/adiosFunctions.h" // CSVToVector

#include <chrono>
#include <iostream>
#include <thread> // sleep_for

namespace adios2
{
@@ -48,7 +50,7 @@ InSituMPIReader::InSituMPIReader(IO &io, const std::string &name,
                  << ". #readers=" << m_ReaderNproc
                  << " #writers=" << m_RankAllPeers.size()
                  << " #appsize=" << m_GlobalNproc
                  << " #direct peers=" << m_RankDirectPeers.size() << std::endl;
                  << " #direct_peers=" << m_RankDirectPeers.size() << std::endl;
    }

    m_WriteRootGlobalRank = insitumpi::ConnectDirectPeers(
@@ -118,31 +120,98 @@ StepStatus InSituMPIReader::BeginStep(const StepMode mode,
        return StepStatus::EndOfStream;
    }

    // Wait for the Step message from all peers
    // with a timeout

    // FIXME: This is a blocking receive here, must make it async to handle
    // timeouts but then should not issue receives more than once per actual
    // step
    // FIXME: All processes should timeout or succeed together.
    // If some timeouts (and never checks back) and the others succeed,
    // the global metadata operation will hang
    std::vector<MPI_Request> requests(m_RankDirectPeers.size());
    std::vector<int> steps(m_RankDirectPeers.size());
    for (int peerID = 0; peerID < m_RankDirectPeers.size(); peerID++)
    // Wait for the Step message from primary Writer
    MPI_Status status;
    if (timeoutSeconds < 0.0)
    {
        MPI_Irecv(&steps[peerID], 1, MPI_INT, m_RankDirectPeers[peerID],
                  insitumpi::MpiTags::Step, m_CommWorld, &requests[peerID]);
    }
    insitumpi::CompleteRequests(requests, false, m_ReaderRank);
        /* No timeout: do independent, blocking wait for step message from
         * primary writer */
        int step;
        MPI_Recv(&step, 1, MPI_INT, m_RankDirectPeers[0],
                 insitumpi::MpiTags::Step, m_CommWorld, &status);

        if (m_Verbosity == 5)
        {
            std::cout << "InSituMPI Reader " << m_ReaderRank << " new step "
                  << steps[0] << " arrived for " << m_Name << std::endl;
                      << step << " arrived for " << m_Name << std::endl;
        }
    m_CurrentStep = steps[0];
        m_CurrentStep = step;
        // FIXME: missing test whether all writers sent the same step
    }
    else
    {
        /* Have timeout: do a collective wait for a step within timeout.
           Make sure every writer comes to the same conclusion */
        int haveStepMsg = 0;
        uint64_t nanoTO = timeoutSeconds * 1000000000.0;
        if (nanoTO < 1)
        {
            nanoTO = 1; // avoid 0
        }
        uint64_t pollTime = nanoTO / 1000; // TO/100 seconds polling time
        if (pollTime < 1)
        {
            pollTime = 1; // min 1 nanosecond polling time
        }
        if (pollTime > 1000000000)
        {
            pollTime = 1000000000; // max 1 seconds polling time
        }
        if (m_Verbosity == 5 && !m_ReaderRank)
        {
            std::cout << "InSituMPI Reader Polling for " << nanoTO
                      << " nanosec with sleep time of " << pollTime
                      << " nanosec" << std::endl;
        }
        /* Poll */
        double waited = 0.0;
        double startTime, endTime;
        while (waited < timeoutSeconds)
        {
            startTime = MPI_Wtime();
            MPI_Iprobe(m_RankDirectPeers[0], insitumpi::MpiTags::Step,
                       m_CommWorld, &haveStepMsg, &status);
            if (haveStepMsg)
                break;
            std::this_thread::sleep_for(std::chrono::nanoseconds(pollTime));
            endTime = MPI_Wtime();
            waited += endTime - startTime;
        }
        /* Get step msg if available */
        const int NOT_A_STEP = -2; // must be less than any valid step
        int step = NOT_A_STEP;
        if (haveStepMsg)
        {

            MPI_Recv(&step, 1, MPI_INT, m_RankDirectPeers[0],
                     insitumpi::MpiTags::Step, m_CommWorld, &status);
        }
        /* Exchange steps */
        int maxstep;
        MPI_Allreduce(&step, &maxstep, 1, MPI_INT, MPI_MAX, m_MPIComm);

        if (m_Verbosity == 5 && !m_ReaderRank)
        {
            std::cout << "InSituMPI Reader Polling result is " << maxstep
                      << std::endl;
        }

        /* Mutually agreed result */
        if (maxstep != NOT_A_STEP)
        {
            /* Receive my msg now if there was a message on other process */
            if (step == NOT_A_STEP)
            {
                MPI_Recv(&step, 1, MPI_INT, m_RankDirectPeers[0],
                         insitumpi::MpiTags::Step, m_CommWorld, &status);
            }
            m_CurrentStep = step;
        }
        else
        {
            return StepStatus::NotReady;
        }
    }

    if (m_CurrentStep == -1)
    {
@@ -158,7 +227,6 @@ StepStatus InSituMPIReader::BeginStep(const StepMode mode,

        if (m_ReaderRootRank == m_ReaderRank)
        {
            MPI_Status status;
            MPI_Recv(&mdLen, 1, MPI_UNSIGNED_LONG, m_WriteRootGlobalRank,
                     insitumpi::MpiTags::MetadataLength, m_CommWorld, &status);
            if (m_Verbosity == 5)
+57 −23
Original line number Diff line number Diff line
@@ -47,17 +47,20 @@ InSituMPIWriter::InSituMPIWriter(IO &io, const std::string &name,
    MPI_Comm_size(mpiComm, &m_WriterNproc);
    m_RankDirectPeers =
        insitumpi::AssignPeers(m_WriterRank, m_WriterNproc, m_RankAllPeers);
    int primaryContact = insitumpi::ConnectDirectPeers(
        m_CommWorld, true, (m_BP3Serializer.m_RankMPI == 0), m_GlobalRank,
        m_RankDirectPeers);
    m_AmIPrimaryContact = static_cast<bool>(primaryContact);
    if (m_Verbosity == 5)
    {
        std::cout << "InSituMPI Writer " << m_WriterRank << " Open(" << m_Name
                  << "). #readers=" << m_RankAllPeers.size()
                  << " #writers=" << m_WriterNproc
                  << " #appsize=" << m_GlobalNproc
                  << " #direct peers=" << m_RankDirectPeers.size() << std::endl;
                  << " #direct_peers=" << m_RankDirectPeers.size()
                  << " primary_contact=" << (m_AmIPrimaryContact ? "yes" : "no")
                  << std::endl;
    }
    insitumpi::ConnectDirectPeers(m_CommWorld, true,
                                  (m_BP3Serializer.m_RankMPI == 0),
                                  m_GlobalRank, m_RankDirectPeers);
}

InSituMPIWriter::~InSituMPIWriter() {}
@@ -76,11 +79,13 @@ StepStatus InSituMPIWriter::BeginStep(StepMode mode, const float timeoutSeconds)
    }

    m_CurrentStep++; // 0 is the first step
    if (m_AmIPrimaryContact)
    {
        if (m_Verbosity == 5)
        {
            std::cout << "InSituMPI Writer " << m_WriterRank << " new step "
                  << m_CurrentStep << " for " << m_Name << ". Notify peers..."
                  << std::endl;
                      << m_CurrentStep << " for " << m_Name
                      << ". Notify peers..." << std::endl;
        }
        // Send the step to all reader peers, asynchronously
        // We need to call Wait on all Isend/Irecv calls at some point otherwise
@@ -89,7 +94,18 @@ StepStatus InSituMPIWriter::BeginStep(StepMode mode, const float timeoutSeconds)
        {
            m_MPIRequests.emplace_back();
            MPI_Isend(&m_CurrentStep, 1, MPI_INT, peerRank,
                  insitumpi::MpiTags::Step, m_CommWorld, &m_MPIRequests.back());
                      insitumpi::MpiTags::Step, m_CommWorld,
                      &m_MPIRequests.back());
        }
    }
    else
    {
        if (m_Verbosity == 5)
        {
            std::cout << "InSituMPI Writer " << m_WriterRank << " new step "
                      << m_CurrentStep << " for " << m_Name
                      << ". Notify nobody." << std::endl;
        }
    }

    m_NCallsPerformPuts = 0;
@@ -363,13 +379,31 @@ void InSituMPIWriter::DoClose(const int transportIndex)
        std::cout << "InSituMPI Writer " << m_WriterRank << " Close(" << m_Name
                  << ")\n";
    }
    if (m_AmIPrimaryContact)
    {
        m_CurrentStep = -1; // -1 will indicate end of stream
        // Send -1 to all reader peers, asynchronously
    MPI_Request request;
        // We need to call Wait on all Isend/Irecv calls at some point otherwise
        // MPI_Comm_free() will never release the communicator
        for (auto peerRank : m_RankDirectPeers)
        {
            m_MPIRequests.emplace_back();
            MPI_Isend(&m_CurrentStep, 1, MPI_INT, peerRank,
                  insitumpi::MpiTags::Step, m_CommWorld, &request);
                      insitumpi::MpiTags::Step, m_CommWorld,
                      &m_MPIRequests.back());
        }
        // The completion of the -1 step message is waited upon here
        // because the destructor may be called after MPI_Finalize()
        // so this is the last safe point to do this.
        if (m_Verbosity == 5)
        {
            std::cout << "InSituMPI Writer " << m_WriterRank
                      << " needs to wait on " << m_MPIRequests.size()
                      << " outstanding MPI async message request..."
                      << std::endl;
        }
        insitumpi::CompleteRequests(m_MPIRequests, true, m_WriterRank);
        m_MPIRequests.clear();
    }
}

@@ -402,8 +436,8 @@ void InSituMPIWriter::ReceiveReadSchedule(
    std::vector<int> rsLengthsTmp(nPeerReaders);

    // Receive the size of each read schedule from readers
    // We use MPI_ANY_SOURCE here because at this point, we don't know who our
    // peer readers are
    // We use MPI_ANY_SOURCE here because at this point
    // we don't know who our peer readers are
    for (auto i = 0; i < nPeerReaders; i++)
    {
        MPI_Irecv(&rsLengthsTmp[i], 1, MPI_INT, MPI_ANY_SOURCE,
+3 −0
Original line number Diff line number Diff line
@@ -61,6 +61,9 @@ private:
    std::map<int, int> m_RankToPeerID;
    // Global ranks of the readers directly assigned to me
    std::vector<int> m_RankDirectPeers;
    // true: Reader(s) selected me as primary writer contact
    // e.g. I need to send info to Readers in BeginStep()
    bool m_AmIPrimaryContact;

    int m_CurrentStep = -1; // steps start from 0

Loading