Commit a5bf765a authored by David M. Rogers

MPI in-stream enqueue example.

parent 2e1a1e2e

build.rc

0 → 100644
+10 −0
cflags = (-march=znver3 -fPIC -O2 -I$ROCM_PATH/include -I$ROCM_PATH/hip/include -I$ROCM_PATH/hipcub/include -I$ROCM_PATH/rocblas/include --amdgpu-target=gfx90a -I$MPICH_DIR/include)
libs = (-L$ROCM_PATH/lib -lamdhip64 -L$ROCM_PATH/rocblas/lib -lrocblas -L$MPICH_DIR/lib -lmpi -L$CRAY_MPICH_ROOTDIR/gtl/lib -lmpi_gtl_hsa)

cflags=$"cflags
libs=$"libs

CXXFLAGS=$cflags \
LDFLAGS=$libs \
  cmake -DCMAKE_HIP_ARCHITECTURES=gfx90a  \
        -DCMAKE_CXX_COMPILER=hipcc ..

include/helper.hh

0 → 100644
+28 −0
#pragma once

#include <cstdio>
#include <cstdlib>

#ifdef ENABLE_GPU
#include <cuda_runtime.h>

#define CUDACHECK(cmd) do {                         \
  cudaError_t e = cmd;                              \
  if( e != cudaSuccess ) {                          \
    printf("Failed: Cuda error %s:%d '%s'\n",       \
        __FILE__,__LINE__,cudaGetErrorString(e));   \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)
#define devMalloc(ptr,size) CUDACHECK( cudaMalloc(&ptr, size) )
#define devFree(ptr) CUDACHECK( cudaFree(ptr) )
#else

#define ERRCHECK(cmd) do {                          \
  int e = cmd;                                      \
  if( e != 0 ) {                                    \
    printf("Failed: error %s:%d '%d'\n",            \
        __FILE__,__LINE__,e);                       \
    exit(EXIT_FAILURE);                             \
  }                                                 \
} while(0)
#define devMalloc(ptr,size) ERRCHECK( (ptr = reinterpret_cast<decltype(ptr)>(malloc(size))) == nullptr )
#define devFree(ptr) free( ptr )
#endif
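
// Usage sketch (hypothetical): call sites look the same with or without
// ENABLE_GPU, since devMalloc/devFree expand either to the checked
// cudaMalloc/cudaFree calls above or to plain malloc/free:
//
//   double *buf;
//   devMalloc(buf, 1024*sizeof(double));
//   /* ... use buf on the device (or on the host in the CPU-only build) ... */
//   devFree(buf);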
+65 −32
#include <mpiwrap.hh>
#include <stdio.h>
#include <memory>

#include "helper.hh"

using MPIp = std::shared_ptr<MPIH>;

/** Fake NCCL/RCCL providing an allreduce function, but
 * actually doing nothing more than enqueueing a host callback that:
 *   1. waits for prior work on the stream to complete
 *   2. runs MPI_Allreduce
 *   3. lets the stream continue once it returns
 */
struct CCLH {
    MPIp mpi;
    cudaStream_t stream;

    CCLH(MPIp _mpi) : mpi(_mpi) {
        CUDACHECK( cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking) );
    }
    ~CCLH() {
        CUDACHECK(cudaStreamDestroy(stream));
    }
    struct ReduceData {
        const void *send;
        void *recv;
        int count;
        MPI_Comm comm;
        ReduceData(const void *s, void *r, int c, MPI_Comm cm)
            :  send(s), recv(r), count(c), comm(cm) { }
    };
    static void doAllReduce(void *_data) {
        ReduceData *data = (ReduceData *)_data;
        MPI_Allreduce(data->send, data->recv, data->count,
                      MPI_DOUBLE, MPI_SUM, data->comm);
        delete data;
    }

    // Warning: Assuming double here.
    //
    // Enqueue the doAllReduce operation onto the cuda stream.
    // see also: cudaError_t cudaStreamAddCallback ( 
    //               cudaStream_t stream, cudaStreamCallback_t callback,
    //               void* userData, unsigned int  flags )
    //           and cudaLaunchHostFunc
    void Allreduce(const void* send, void* recv, int count) {
        ReduceData *data = new ReduceData(send, recv, count, mpi->comm);
        CUDACHECK( cudaLaunchHostFunc(stream, CCLH::doAllReduce, data) );
    }
};
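
// The comment in Allreduce() also mentions cudaStreamAddCallback; a hedged
// sketch of the same enqueue using that older API (the callback signature
// differs, and the flags argument must be 0) would look roughly like:
//
//   static void doAllReduceCB(cudaStream_t, cudaError_t status, void *userData) {
//       if (status == cudaSuccess)
//           CCLH::doAllReduce(userData);
//   }
//   ...
//   CUDACHECK( cudaStreamAddCallback(stream, doAllReduceCB, data, 0) );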
using CCLp = std::shared_ptr<CCLH>;

/* Note: ?CCL allreduce was:
int cclAllReduce((const void*)send, (void*)recv, size, ncclFloat, ncclSum,
                            nccl->comm, nccl->stream) {
    std::cout << "Hello" << std::endl;
    run(nccl, sendbuff, recvbuff);
    return 0;
}*/
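
/* For comparison, the real RCCL/NCCL entry point this fake stands in for is
 * roughly (sketch; communicator and stream setup via ncclCommInitRank and
 * stream creation are not shown, and the header is <rccl/rccl.h> on ROCm or
 * <nccl.h> on CUDA):
 *
 *   ncclAllReduce(send, recv, count, ncclDouble, ncclSum, comm, stream);
 *
 * Unlike CCLH::Allreduce above, that call enqueues the reduction itself onto
 * the stream rather than a host callback that blocks in MPI_Allreduce.
 */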

int main(int argc, char *argv[]) {
    MPIp mpi = std::make_shared<MPIH>(&argc, &argv);
    CCLp ccl = std::make_shared<CCLH>(mpi);
    double *src, *dst;
    int count = 8*1024*1024; // 64 MB

    devMalloc(src, count*sizeof(double));
    devMalloc(dst, count*sizeof(double));

    if(mpi->rank == 0)
        printf("Allreduce: %d doubles.\n", count);
    //MPI_Allreduce(src, dst, count, MPI_DOUBLE, MPI_SUM, mpi->comm);
    ccl->Allreduce(src, dst, count);


    // Ensure the enqueued host callback (and the MPI_Allreduce it runs)
    // has completed before the buffers are freed.
    CUDACHECK( cudaStreamSynchronize(ccl->stream) );

    devFree(src);
    devFree(dst);