Commit faf14e9b authored by Podhorszki, Norbert's avatar Podhorszki, Norbert
Browse files

Implement timeout in InSituMPI Reader

parent 7cb6912a
Loading
Loading
Loading
Loading
+1 −0
Original line number Diff line number Diff line
@@ -198,6 +198,7 @@ int ConnectDirectPeers(const MPI_Comm commWorld, const bool IAmSender,
    /* Writers send msg to each connected Reader, Writer Root sends different
     * value than others */
    int writeRootToken = (IAmWriterRoot ? 1 : 0);
    retval = -1;
    for (const auto peerRank : peers)
    {
        if (IAmSender)
+92 −18
Original line number Diff line number Diff line
@@ -17,6 +17,8 @@
#include "adios2/helper/adiosFunctions.h" // CSVToVector

#include <iostream>
#include <thread> // sleep_for
#include <chrono>

namespace adios2
{
@@ -118,27 +120,99 @@ StepStatus InSituMPIReader::BeginStep(const StepMode mode,
        return StepStatus::EndOfStream;
    }

    // Wait for the Step message from all peers
    // with a timeout

    // FIXME: This is a blocking receive here, must make it async to handle
    // timeouts but then should not issue receives more than once per actual
    // step
    // FIXME: All processes should timeout or succeed together.
    // If some timeouts (and never checks back) and the others succeed,
    // the global metadata operation will hang
    int step;
    // Wait for the Step message from primary Writer
    MPI_Status status;
    MPI_Recv(&step, 1, MPI_INT, m_RankDirectPeers[0], insitumpi::MpiTags::Step,
             m_CommWorld, &status);
    if (timeoutSeconds < 0.0)
    {
        /* No timeout: do independent, blocking wait for step message from
         * primary writer */
        int step;
        MPI_Recv(&step, 1, MPI_INT, m_RankDirectPeers[0],
                 insitumpi::MpiTags::Step, m_CommWorld, &status);

        if (m_Verbosity == 5)
        {
        std::cout << "InSituMPI Reader " << m_ReaderRank << " new step " << step
                  << " arrived for " << m_Name << std::endl;
            std::cout << "InSituMPI Reader " << m_ReaderRank << " new step "
                      << step << " arrived for " << m_Name << std::endl;
        }
        m_CurrentStep = step;
        // FIXME: missing test whether all writers sent the same step
    } 
    else
    {
        /* Have timeout: do a collective wait for a step within timeout.
           Make sure every writer comes to the same conclusion */
        int haveStepMsg = 0;
        uint64_t nanoTO = timeoutSeconds*1000000000.0;
        if (nanoTO < 1)
        {
            nanoTO = 1; // avoid 0 
        }
        uint64_t pollTime = nanoTO/10; // TO/100 seconds polling time
        if (pollTime < 1)
        {
            pollTime = 1; // min 1 nanosecond polling time
        }
        if (pollTime > 1000000000)
        {
            pollTime = 1000000000; // max 1 seconds polling time
        }
        if (m_Verbosity == 5 && !m_ReaderRank)
        {
            std::cout << "InSituMPI Reader Polling for "
                      << nanoTO << " nanosec with sleep time of " 
                      << pollTime << " nanosec" << std::endl;
        }
        /* Poll */
        double waited = 0.0;
        double startTime, endTime;
        while (waited < timeoutSeconds) 
        {
            startTime = MPI_Wtime();
            MPI_Iprobe(m_RankDirectPeers[0], insitumpi::MpiTags::Step, m_CommWorld, &haveStepMsg, &status);
            if (haveStepMsg)
                break;
            std::this_thread::sleep_for(std::chrono::nanoseconds(pollTime));
            endTime = MPI_Wtime();
            waited += endTime-startTime;
        }
        /* Get step msg if available */
        const int NOT_A_STEP = -2; // must be less than any valid step
        int step = NOT_A_STEP;
        if (haveStepMsg)
        {

            MPI_Recv(&step, 1, MPI_INT, m_RankDirectPeers[0],
                 insitumpi::MpiTags::Step, m_CommWorld, &status);

        }
        /* Exchange steps */
        int maxstep;
        MPI_Allreduce(&step, &maxstep, 1, MPI_INT, MPI_MAX, m_MPIComm);
        
        if (m_Verbosity == 5 && !m_ReaderRank)
        {
            std::cout << "InSituMPI Reader Polling result is "
                      << maxstep << std::endl;
        }
        
        /* Mutually agreed result */
        if (maxstep != NOT_A_STEP)
        {
            /* Receive my msg now if there was a message on other process */
            if (step == NOT_A_STEP) 
            {
                MPI_Recv(&step, 1, MPI_INT, m_RankDirectPeers[0],
                     insitumpi::MpiTags::Step, m_CommWorld, &status);
            }
            m_CurrentStep = step;
        }
        else
        {
            return StepStatus::NotReady;
        }
           
    }
    
    if (m_CurrentStep == -1)
    {