Unverified Commit 3d156466 authored by Bolea Sanchez, Vicente Adolfo's avatar Bolea Sanchez, Vicente Adolfo Committed by GitHub
Browse files

Merge pull request #3885 from vicentebolea/backport-3588

Merge pull request #3588 from vicentebolea/fix-mpi-dp
parents b91aa014 4525797b
Loading
Loading
Loading
Loading
+70 −19
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@

#include <mpi.h>

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
@@ -54,7 +55,7 @@ typedef struct _MpiWriterContactInfo
{
    char ContactString[MPI_DP_CONTACT_STRING_LEN];
    void *StreamWPR;
    int PID;
    long taskID;
} * MpiWriterContactInfo;

/* Base Stream class, used implicitly */
@@ -62,7 +63,7 @@ typedef struct _MpiStream
{
    void *CP_Stream;
    int Rank;
    int PID;
    long taskID;
} MpiStream;

/* Link Stream class, used implicitly */
@@ -231,7 +232,8 @@ static FMField MpiWriterContactList[] = {
     sizeof(char), FMOffset(MpiWriterContactInfo, ContactString)},
    {"writer_ID", "integer", sizeof(void *),
     FMOffset(MpiWriterContactInfo, StreamWPR)},
    {"PID", "integer", sizeof(int), FMOffset(MpiWriterContactInfo, PID)},
    {"taskID", "integer", sizeof(long),
      FMOffset(MpiWriterContactInfo, taskID)},
    {NULL, NULL, 0, 0}};

static FMStructDescRec MpiWriterContactStructs[] = {
@@ -241,6 +243,16 @@ static FMStructDescRec MpiWriterContactStructs[] = {

/*****Internal functions*****************************************************/

/**
 * Return a unique process ID (task ID) for the current process.  We build it
 * by packing the PID into the upper 32 bits and the hostid into the lower 32
 * bits (gethostid() returns the same value as the `hostid` command, derived
 * from /etc/machine-id on modern UNIX-like systems).
 */
static uint64_t GetUniqueTaskId()
{
    /* Shift in uint64_t: the previous `pid * (1ll << 32ll)` multiplied in
     * signed long long, which is undefined behavior on overflow. */
    return ((uint64_t)(uint32_t)getpid() << 32) | (uint32_t)gethostid();
}

static void MpiReadReplyHandler(CManager cm, CMConnection conn, void *msg_v,
                                void *client_Data, attr_list attrs);

@@ -256,9 +268,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v,
 * the reader side.  It should do whatever is necessary to initialize a new
 * reader-side data plane.  A pointer to per-reader-rank contact information
 * should be placed in *ReaderContactInfoPtr.  The structure of that
 * information should be described by DPInterface.ReaderContactFormats.  (This
 * is an FFS format description.  See
 * https://www.cc.gatech.edu/systems/projects/FFS/.)
 * information should be described by DPInterface.ReaderContactFormats.
 */
static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream,
                                  void **ReaderContactInfoPtr,
@@ -271,7 +281,7 @@ static DP_RS_Stream MpiInitReader(CP_Services Svcs, void *CP_Stream,
    CMFormat F;

    Stream->Stream.CP_Stream = CP_Stream;
    Stream->Stream.PID = getpid();
    Stream->Stream.taskID = GetUniqueTaskId();
    Stream->Link.Stats = Stats;

    SMPI_Comm_rank(comm, &Stream->Stream.Rank);
@@ -322,7 +332,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream,
    SMPI_Comm_rank(comm, &Stream->Stream.Rank);

    Stream->Stream.CP_Stream = CP_Stream;
    Stream->Stream.PID = getpid();
    Stream->Stream.taskID = GetUniqueTaskId();
    STAILQ_INIT(&Stream->TimeSteps);
    TAILQ_INIT(&Stream->Readers);

@@ -347,8 +357,7 @@ static DP_WS_Stream MpiInitWriter(CP_Services Svcs, void *CP_Stream,
 * on the connecting peer in InitReader) and should create its own
 * per-writer-rank contact information and place it in *writerContactInfoPtr.
 * The structure of that information should be described by
 * DPInterface.WriterContactFormats.   (This is an FFS format description.  See
 * https://www.cc.gatech.edu/systems/projects/FFS/.)
 * DPInterface.WriterContactFormats.
 */
static DP_WSR_Stream
MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v,
@@ -392,7 +401,7 @@ MpiInitWriterPerReader(CP_Services Svcs, DP_WS_Stream WS_Stream_v,
             "Writer Rank %d, test contact", Rank);

    StreamWPR->MyContactInfo.StreamWPR = StreamWPR;
    StreamWPR->MyContactInfo.PID = StreamWR->Stream.PID;
    StreamWPR->MyContactInfo.taskID = StreamWR->Stream.taskID;
    *WriterContactInfoPtr = &StreamWPR->MyContactInfo;

    return StreamWPR;
@@ -503,9 +512,9 @@ static void *MpiReadRemoteMemory(CP_Services Svcs, DP_RS_Stream Stream_v,
    ret->cm = cm;
    ret->CPStream = Stream->Stream.CP_Stream;
    ret->DestinationRank = Rank;
    ret->CommType = (TargetContact->PID == Stream->Stream.PID) ? MPI_DP_LOCAL
    ret->CommType =
        (TargetContact->taskID == Stream->Stream.taskID) ? MPI_DP_LOCAL
                                                         : MPI_DP_REMOTE;

    if (ret->CommType == MPI_DP_REMOTE)
    {
        CMCondition_set_client_data(cm, ReadRequestMsg.NotifyCondition, ret);
@@ -576,7 +585,7 @@ static int MpiWaitForCompletion(CP_Services Svcs, void *Handle_v)
    else
    {
        Svcs->verbose(
            Handle->CPStream, DPTraceVerbose,
            Handle->CPStream, DPCriticalVerbose,
            "Remote memory read to rank %d with condition %d has FAILED"
            "because of "
            "writer failure\n",
@@ -615,7 +624,7 @@ static void MpiReadRequestHandler(CManager cm, CMConnection conn, void *msg_v,
    if (!RequestedData)
    {
        PERFSTUBS_TIMER_STOP_FUNC(timer);
        Svcs->verbose(StreamWR->Stream.CP_Stream, DPPerStepVerbose,
        Svcs->verbose(StreamWR->Stream.CP_Stream, DPCriticalVerbose,
                      "Failed to read TimeStep %ld, not found\n",
                      ReadRequestMsg->TimeStep);
        return;
@@ -850,11 +859,42 @@ static void MpiNotifyConnFailure(CP_Services Svcs, DP_RS_Stream Stream_v,
                  FailedPeerRank);
}

/** MpiDisconnectWriterPerReader.
 *
 * This is called whenever a reader disconnects from a writer.  This function
 * only disconnects the MPI communicators; it does not free any data
 * structures.  We must do it this way since:
 *
 * - The failed peer may re-enter the network later.
 * - We must disconnect the MPI port for that particular MPI reader task,
 *   since otherwise the reader task might hang in MPI_Finalize in the case
 *   where the failure leads to a graceful application exit.
 */
static void MpiDisconnectWriterPerReader(CP_Services Svcs,
                                         DP_WSR_Stream WSR_Stream_v)
{
    MpiStreamWPR StreamWPR = (MpiStreamWPR)WSR_Stream_v;
    MpiStreamWR StreamWR = StreamWPR->StreamWR;

    const int CohortSize = StreamWPR->Link.CohortSize;

    /* Fixed: arguments were previously swapped relative to the format
     * string (cohort size was logged as the rank and vice versa). */
    Svcs->verbose(
        StreamWR->Stream.CP_Stream, DPTraceVerbose,
        "MpiDisconnectWriterPerReader invoked [rank:%d;cohortSize:%d]\n",
        StreamWR->Stream.Rank, CohortSize);

    for (int i = 0; i < CohortSize; i++)
    {
        if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
        {
            MPI_Comm_disconnect(&StreamWPR->CohortMpiComms[i]);
        }
    }
}

/**
 * MpiDestroyWriterPerReader.
 *
 * This is called whenever a reader disconnect from a writer. This function
 * also removes the StreamWPR from its own StreamWR.
 * This is called by the MpiDestroyWriter function. This function frees any
 * resource allocated to the particular WriterPerReader instance (StreamWPR).
 */
static void MpiDestroyWriterPerReader(CP_Services Svcs,
                                      DP_WSR_Stream WSR_Stream_v)
@@ -864,6 +904,10 @@ static void MpiDestroyWriterPerReader(CP_Services Svcs,

    const int CohortSize = StreamWPR->Link.CohortSize;

    Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
                  "MpiDestroyWriterPerReader invoked [rank:%d;cohortSize:%d]", CohortSize,
                  StreamWR->Stream.Rank);

    for (int i = 0; i < CohortSize; i++)
    {
        if (StreamWPR->CohortMpiComms[i] != MPI_COMM_NULL)
@@ -889,6 +933,9 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
{
    MpiStreamWR StreamWR = (MpiStreamWR)WS_Stream_v;

    Svcs->verbose(StreamWR->Stream.CP_Stream, DPTraceVerbose,
                  "MpiDestroyWriter invoked [rank:%d]\n", StreamWR->Stream.Rank);

    pthread_mutex_lock(&StreamWR->MutexReaders);
    while (!TAILQ_EMPTY(&StreamWR->Readers))
    {
@@ -918,6 +965,10 @@ static void MpiDestroyWriter(CP_Services Svcs, DP_WS_Stream WS_Stream_v)
static void MpiDestroyReader(CP_Services Svcs, DP_RS_Stream RS_Stream_v)
{
    MpiStreamRD StreamRS = (MpiStreamRD)RS_Stream_v;

    Svcs->verbose(StreamRS->Stream.CP_Stream, DPTraceVerbose,
                  "MpiDestroyReader invoked [rank:%d]\n", StreamRS->Stream.Rank);

    const int CohortSize = StreamRS->Link.CohortSize;

    for (int i = 0; i < CohortSize; i++)
@@ -948,7 +999,7 @@ extern CP_DP_Interface LoadMpiDP()
        .getPriority = MpiGetPriority,
        .destroyReader = MpiDestroyReader,
        .destroyWriter = MpiDestroyWriter,
        .destroyWriterPerReader = MpiDestroyWriterPerReader,
        .destroyWriterPerReader = MpiDisconnectWriterPerReader,
        .notifyConnFailure = MpiNotifyConnFailure,
    };