Commit 9e106087 authored by Podhorszki, Norbert's avatar Podhorszki, Norbert
Browse files

Send -1 only from primary contacts and then wait for the completion of the -1...

Send -1 only from primary contacts and then wait for the completion of the -1 step message in destructor. This fixes the TestStagingMPMD tests
parent 113de17e
Loading
Loading
Loading
Loading
+21 −22
Original line number Diff line number Diff line
@@ -16,9 +16,9 @@

#include "adios2/helper/adiosFunctions.h" // CSVToVector

#include <chrono>
#include <iostream>
#include <thread> // sleep_for
#include <chrono>

namespace adios2
{
@@ -50,7 +50,7 @@ InSituMPIReader::InSituMPIReader(IO &io, const std::string &name,
                  << ". #readers=" << m_ReaderNproc
                  << " #writers=" << m_RankAllPeers.size()
                  << " #appsize=" << m_GlobalNproc
                  << " #direct peers=" << m_RankDirectPeers.size() << std::endl;
                  << " #direct_peers=" << m_RankDirectPeers.size() << std::endl;
    }

    m_WriteRootGlobalRank = insitumpi::ConnectDirectPeers(
@@ -148,7 +148,7 @@ StepStatus InSituMPIReader::BeginStep(const StepMode mode,
        {
            nanoTO = 1; // avoid 0
        }
        uint64_t pollTime = nanoTO/10; // TO/100 seconds polling time
        uint64_t pollTime = nanoTO / 1000; // TO/100 seconds polling time
        if (pollTime < 1)
        {
            pollTime = 1; // min 1 nanosecond polling time
@@ -159,9 +159,9 @@ StepStatus InSituMPIReader::BeginStep(const StepMode mode,
        }
        if (m_Verbosity == 5 && !m_ReaderRank)
        {
            std::cout << "InSituMPI Reader Polling for "
                      << nanoTO << " nanosec with sleep time of " 
                      << pollTime << " nanosec" << std::endl;
            std::cout << "InSituMPI Reader Polling for " << nanoTO
                      << " nanosec with sleep time of " << pollTime
                      << " nanosec" << std::endl;
        }
        /* Poll */
        double waited = 0.0;
@@ -169,7 +169,8 @@ StepStatus InSituMPIReader::BeginStep(const StepMode mode,
        while (waited < timeoutSeconds)
        {
            startTime = MPI_Wtime();
            MPI_Iprobe(m_RankDirectPeers[0], insitumpi::MpiTags::Step, m_CommWorld, &haveStepMsg, &status);
            MPI_Iprobe(m_RankDirectPeers[0], insitumpi::MpiTags::Step,
                       m_CommWorld, &haveStepMsg, &status);
            if (haveStepMsg)
                break;
            std::this_thread::sleep_for(std::chrono::nanoseconds(pollTime));
@@ -184,7 +185,6 @@ StepStatus InSituMPIReader::BeginStep(const StepMode mode,

            MPI_Recv(&step, 1, MPI_INT, m_RankDirectPeers[0],
                     insitumpi::MpiTags::Step, m_CommWorld, &status);

        }
        /* Exchange steps */
        int maxstep;
@@ -192,8 +192,8 @@ StepStatus InSituMPIReader::BeginStep(const StepMode mode,

        if (m_Verbosity == 5 && !m_ReaderRank)
        {
            std::cout << "InSituMPI Reader Polling result is "
                      << maxstep << std::endl;
            std::cout << "InSituMPI Reader Polling result is " << maxstep
                      << std::endl;
        }

        /* Mutually agreed result */
@@ -211,7 +211,6 @@ StepStatus InSituMPIReader::BeginStep(const StepMode mode,
        {
            return StepStatus::NotReady;
        }
           
    }

    if (m_CurrentStep == -1)
+39 −14
Original line number Diff line number Diff line
@@ -47,21 +47,39 @@ InSituMPIWriter::InSituMPIWriter(IO &io, const std::string &name,
    MPI_Comm_size(mpiComm, &m_WriterNproc);
    m_RankDirectPeers =
        insitumpi::AssignPeers(m_WriterRank, m_WriterNproc, m_RankAllPeers);
    int primaryContact = insitumpi::ConnectDirectPeers(
        m_CommWorld, true, (m_BP3Serializer.m_RankMPI == 0), m_GlobalRank,
        m_RankDirectPeers);
    m_AmIPrimaryContact = static_cast<bool>(primaryContact);
    if (m_Verbosity == 5)
    {
        std::cout << "InSituMPI Writer " << m_WriterRank << " Open(" << m_Name
                  << "). #readers=" << m_RankAllPeers.size()
                  << " #writers=" << m_WriterNproc
                  << " #appsize=" << m_GlobalNproc
                  << " #direct peers=" << m_RankDirectPeers.size() << std::endl;
                  << " #direct_peers=" << m_RankDirectPeers.size()
                  << " primary_contact=" << (m_AmIPrimaryContact ? "yes" : "no")
                  << std::endl;
    }
    int primaryContact = insitumpi::ConnectDirectPeers(
        m_CommWorld, true, (m_BP3Serializer.m_RankMPI == 0), m_GlobalRank,
        m_RankDirectPeers);
    m_AmIPrimaryContact = static_cast<bool>(primaryContact);
}

InSituMPIWriter::~InSituMPIWriter() {}
InSituMPIWriter::~InSituMPIWriter()
{
    // The completion of the -1 step message (sent by primary contacts only) is
    // waited upon here
    if (m_MPIRequests.size())
    {
        if (m_Verbosity == 5)
        {
            std::cout << "InSituMPI Writer " << m_WriterRank
                      << " needs to wait on " << m_MPIRequests.size()
                      << " outstanding MPI async message request..."
                      << std::endl;
        }
        insitumpi::CompleteRequests(m_MPIRequests, true, m_WriterRank);
        m_MPIRequests.clear();
    }
}

StepStatus InSituMPIWriter::BeginStep(StepMode mode, const float timeoutSeconds)
{
@@ -377,13 +395,20 @@ void InSituMPIWriter::DoClose(const int transportIndex)
        std::cout << "InSituMPI Writer " << m_WriterRank << " Close(" << m_Name
                  << ")\n";
    }
    if (m_AmIPrimaryContact)
    {
        m_CurrentStep = -1; // -1 will indicate end of stream
        // Send -1 to all reader peers, asynchronously
        // We need to call Wait on all Isend/Irecv calls at some point otherwise
        // MPI_Comm_free() will never release the communicator
        MPI_Request request;
        for (auto peerRank : m_RankDirectPeers)
        {
            m_MPIRequests.emplace_back();
            MPI_Isend(&m_CurrentStep, 1, MPI_INT, peerRank,
                  insitumpi::MpiTags::Step, m_CommWorld, &request);
                      insitumpi::MpiTags::Step, m_CommWorld,
                      &m_MPIRequests.back());
        }
    }
}

@@ -416,8 +441,8 @@ void InSituMPIWriter::ReceiveReadSchedule(
    std::vector<int> rsLengthsTmp(nPeerReaders);

    // Receive the size of each read schedule from readers
    // We use MPI_ANY_SOURCE here because at this point, we don't know who our
    // peer readers are
    // We use MPI_ANY_SOURCE here because at this point
    // we don't know who our peer readers are
    for (auto i = 0; i < nPeerReaders; i++)
    {
        MPI_Irecv(&rsLengthsTmp[i], 1, MPI_INT, MPI_ANY_SOURCE,