Commit 1ec1ec43 authored by Brad King's avatar Brad King
Browse files

sst: Re-implement SMPI_ interfaces as C wrappers of Comm encapsulation

`adios2::helper::Comm` encapsulates ADIOS multi-process communication,
but it is a C++ interface and SST is implemented in C.  Create a C
wrapper to use the Comm encapsulation from SST.  This will enable the
SST engine to be used by both serial and parallel applications from a
single build.
parent 58d8916e
......@@ -19,12 +19,6 @@
#include "adios2/helper/adiosFunctions.h"
#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"
#ifdef ADIOS2_HAVE_MPI
#include "adios2/helper/adiosCommMPI.h"
#else
#include "adios2/toolkit/sst/mpiwrap.h"
#endif
namespace adios2
{
namespace core
......@@ -41,13 +35,7 @@ SstReader::SstReader(IO &io, const std::string &name, const Mode mode,
Init();
m_Input = SstReaderOpen(cstr, &Params,
#ifdef ADIOS2_HAVE_MPI
CommAsMPI(m_Comm)
#else
MPI_COMM_NULL
#endif
);
m_Input = SstReaderOpen(cstr, &Params, &m_Comm);
if (!m_Input)
{
delete[] cstr;
......
......@@ -16,12 +16,6 @@
#include "SstWriter.tcc"
#include "adios2/toolkit/profiling/taustubs/tautimer.hpp"
#ifdef ADIOS2_HAVE_MPI
#include "adios2/helper/adiosCommMPI.h"
#else
#include "adios2/toolkit/sst/mpiwrap.h"
#endif
namespace adios2
{
namespace core
......@@ -115,13 +109,7 @@ SstWriter::SstWriter(IO &io, const std::string &name, const Mode mode,
Init();
m_Output = SstWriterOpen(name.c_str(), &Params,
#ifdef ADIOS2_HAVE_MPI
CommAsMPI(m_Comm)
#else
MPI_COMM_NULL
#endif
);
m_Output = SstWriterOpen(name.c_str(), &Params, &m_Comm);
if (m_MarshalMethod == SstMarshalBP)
{
......
......@@ -2,16 +2,18 @@
# Distributed under the OSI-approved Apache License, Version 2.0. See
# accompanying file Copyright.txt for details.
#------------------------------------------------------------------------------#
add_library(sst
add_library(sst OBJECT
dp/dp.c
dp/evpath_dp.c
cp/cp_reader.c
cp/cp_writer.c
cp/cp_common.c
cp/ffs_marshal.c
mpidummy.cpp
mpiwrap.cpp
sst_comm.cpp
)
if(BUILD_SHARED_LIBS)
set_property(TARGET sst PROPERTY POSITION_INDEPENDENT_CODE 1)
endif()
target_link_libraries(sst PRIVATE taustubs)
......@@ -70,13 +72,13 @@ target_include_directories(sst PRIVATE
)
target_link_libraries(sst PRIVATE adios2::thirdparty::EVPath)
if(ADIOS2_HAVE_MPI)
target_link_libraries(sst PUBLIC MPI::MPI_C)
target_link_libraries(sst PRIVATE MPI::MPI_C)
endif()
install(TARGETS sst EXPORT adios2Exports
RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR}
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
)
if(NOT BUILD_SHARED_LIBS)
# When sst is included in a static library we must export the
# target so that consumers know the link dependencies.
install(TARGETS sst EXPORT adios2Exports)
endif()
add_subdirectory(util)
......@@ -649,7 +649,7 @@ void **CP_consolidateDataToRankZero(SstStream Stream, void *LocalInfo,
{
FFSBuffer Buf = create_FFSBuffer();
int DataSize;
int *RecvCounts = NULL;
size_t *RecvCounts = NULL;
char *Buffer;
struct _CP_DP_init_info **Pointers = NULL;
......@@ -658,9 +658,10 @@ void **CP_consolidateDataToRankZero(SstStream Stream, void *LocalInfo,
if (Stream->Rank == 0)
{
RecvCounts = malloc(Stream->CohortSize * sizeof(int));
RecvCounts = malloc(Stream->CohortSize * sizeof(*RecvCounts));
}
SMPI_Gather(&DataSize, 1, SMPI_INT, RecvCounts, 1, SMPI_INT, 0,
size_t DataSz = DataSize;
SMPI_Gather(&DataSz, 1, SMPI_SIZE_T, RecvCounts, 1, SMPI_SIZE_T, 0,
Stream->mpiComm);
/*
......@@ -668,13 +669,13 @@ void **CP_consolidateDataToRankZero(SstStream Stream, void *LocalInfo,
* and displacements for each rank
*/
int *Displs = NULL;
size_t *Displs = NULL;
char *RecvBuffer = NULL;
if (Stream->Rank == 0)
{
int TotalLen = 0;
Displs = malloc(Stream->CohortSize * sizeof(int));
Displs = malloc(Stream->CohortSize * sizeof(*Displs));
Displs[0] = 0;
TotalLen = (RecvCounts[0] + 7) & ~7;
......@@ -761,16 +762,17 @@ void **CP_consolidateDataToAll(SstStream Stream, void *LocalInfo,
{
FFSBuffer Buf = create_FFSBuffer();
int DataSize;
int *RecvCounts;
size_t *RecvCounts;
char *Buffer;
struct _CP_DP_init_info **Pointers = NULL;
Buffer = FFSencode(Buf, FMFormat_of_original(Type), LocalInfo, &DataSize);
RecvCounts = malloc(Stream->CohortSize * sizeof(int));
RecvCounts = malloc(Stream->CohortSize * sizeof(*RecvCounts));
SMPI_Allgather(&DataSize, 1, SMPI_INT, RecvCounts, 1, SMPI_INT,
size_t DataSz = DataSize;
SMPI_Allgather(&DataSz, 1, SMPI_SIZE_T, RecvCounts, 1, SMPI_SIZE_T,
Stream->mpiComm);
/*
......@@ -778,12 +780,12 @@ void **CP_consolidateDataToAll(SstStream Stream, void *LocalInfo,
* and displacements for each rank
*/
int *Displs;
size_t *Displs;
char *RecvBuffer = NULL;
int i;
int TotalLen = 0;
Displs = malloc(Stream->CohortSize * sizeof(int));
Displs = malloc(Stream->CohortSize * sizeof(*Displs));
Displs[0] = 0;
TotalLen = (RecvCounts[0] + 7) & ~7;
......
......@@ -4,7 +4,7 @@
#include <sst_data.h>
#include "adios2/common/ADIOSConfig.h"
#include "mpiwrap.h"
#include "sst_comm.h"
#include <evpath.h>
/*!
......
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* ADIOS is freely available under the terms of the BSD license described
* in the COPYING file in the top level directory of this source distribution.
*
* Copyright (c) 2008 - 2009. UT-BATTELLE, LLC. All rights reserved.
A dummy MPI implementation for the BP READ API, to have an MPI-free version
of the API
*/
#include "mpidummy.h"
#include <cinttypes>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <numeric>
#include <chrono>
#include <iostream>
#include <string>
// remove warnings on Windows
#ifdef _WIN32
#pragma warning(disable : 4996) // fopen
#pragma warning(disable : 4477) // strcpy, sprintf
#endif
// Use some preprocessor hackery to achieve two objectives:
// - If we don't build with a real MPI, these functions below will go in the
// global namespace (they're extern "C" in this case, anyway).
// - If we do have a real MPI, we put these functions into the helper::mpidummy
// namespace so they can be used from the SMPI_* wrappers
#ifdef ADIOS2_HAVE_MPI
namespace adios2
{
namespace helper
{
namespace mpidummy
{
#define MPIDUMMY mpidummy
#else
#define MPIDUMMY
#endif
static inline int CheckReturn(int ier)
{
if (ier != MPI_SUCCESS)
{
std::cerr << "mpidummy: MPI function returned error code " << ier
<< ". Aborting!" << std::endl;
std::abort();
}
return ier;
}
#define RETURN_CHECK(ier) return CheckReturn(ier)
int MPI_Init(int * /*argc*/, char *** /*argv*/) { RETURN_CHECK(MPI_SUCCESS); }
int MPI_Finalize() { RETURN_CHECK(MPI_SUCCESS); }
int MPI_Initialized(int *flag)
{
*flag = 1;
RETURN_CHECK(MPI_SUCCESS);
}
int MPI_Finalized(int *flag)
{
*flag = 0;
RETURN_CHECK(MPI_SUCCESS);
}
int MPI_Comm_split(MPI_Comm /*comm*/, int /*color*/, int /*key*/,
MPI_Comm * /*comm_out*/)
{
RETURN_CHECK(MPI_SUCCESS);
}
int MPI_Barrier(MPI_Comm /*comm*/) { RETURN_CHECK(MPI_SUCCESS); }
int MPI_Bcast(void * /*buffer*/, int /*count*/, MPI_Datatype /*datatype*/,
int /*root*/, MPI_Comm /*comm*/)
{
RETURN_CHECK(MPI_SUCCESS);
}
int MPI_Comm_dup(MPI_Comm comm, MPI_Comm *newcomm)
{
*newcomm = comm;
RETURN_CHECK(MPI_SUCCESS);
}
int MPI_Comm_rank(MPI_Comm /*comm*/, int *rank)
{
*rank = 0;
RETURN_CHECK(MPI_SUCCESS);
}
int MPI_Comm_size(MPI_Comm /*comm*/, int *size)
{
*size = 1;
RETURN_CHECK(MPI_SUCCESS);
}
int MPI_Comm_free(MPI_Comm *comm)
{
*comm = 0;
RETURN_CHECK(MPI_SUCCESS);
}
#ifndef ADIOS2_HAVE_MPI
MPI_Comm MPI_Comm_f2c(MPI_Fint comm) { return comm; }
#endif
int MPI_Gather(const void *sendbuf, int sendcnt, MPI_Datatype sendtype,
void *recvbuf, int recvcnt, MPI_Datatype recvtype, int root,
MPI_Comm comm)
{
int ier = MPI_SUCCESS;
int n;
size_t nsent = 0, nrecv = 0;
if (sendcnt > 0 && !sendbuf)
{
RETURN_CHECK(MPI_ERR_BUFFER);
}
if (recvcnt > 0 && !recvbuf)
{
RETURN_CHECK(MPI_ERR_BUFFER);
}
if (root != 0)
{
RETURN_CHECK(MPI_ERR_ROOT);
}
ier = MPIDUMMY::MPI_Type_size(sendtype, &n);
if (ier != MPI_SUCCESS)
{
RETURN_CHECK(ier);
}
nsent = n * sendcnt;
ier = MPIDUMMY::MPI_Type_size(recvtype, &n);
if (ier != MPI_SUCCESS)
{
RETURN_CHECK(ier);
}
nrecv = n * recvcnt;
if (nrecv != nsent)
{
ier = MPI_ERR_COUNT;
}
if (ier == MPI_SUCCESS)
{
std::memcpy(recvbuf, sendbuf, nsent);
}
RETURN_CHECK(ier);
}
int MPI_Gatherv(const void *sendbuf, int sendcnt, MPI_Datatype sendtype,
void *recvbuf, const int *recvcnts, const int * /*displs */,
MPI_Datatype recvtype, int root, MPI_Comm comm)
{
int ier = MPI_SUCCESS;
if (*recvcnts != sendcnt)
{
ier = MPI_ERR_COUNT;
RETURN_CHECK(ier);
}
ier = MPIDUMMY::MPI_Gather(sendbuf, sendcnt, sendtype, recvbuf, *recvcnts,
recvtype, root, comm);
RETURN_CHECK(ier);
}
int MPI_Allgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int recvcount, MPI_Datatype recvtype,
MPI_Comm comm)
{
return MPIDUMMY::MPI_Gather(sendbuf, sendcount, sendtype, recvbuf,
recvcount, recvtype, 0, comm);
}
int MPI_Allgatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
void *recvbuf, int *recvcounts, int *displs,
MPI_Datatype recvtype, MPI_Comm comm)
{
return MPIDUMMY::MPI_Gatherv(sendbuf, sendcount, sendtype, recvbuf,
recvcounts, displs, recvtype, 0, comm);
}
int MPI_Scatter(const void *sendbuf, int sendcnt, MPI_Datatype sendtype,
void *recvbuf, int recvcnt, MPI_Datatype recvtype, int root,
MPI_Comm comm)
{
int ier = MPI_SUCCESS;
int n;
size_t nsent = 0, nrecv = 0;
if (sendcnt > 0 && !sendbuf)
{
RETURN_CHECK(MPI_ERR_BUFFER);
}
if (recvcnt > 0 && !recvbuf)
{
RETURN_CHECK(MPI_ERR_BUFFER);
}
if (root != 0)
{
RETURN_CHECK(MPI_ERR_ROOT);
}
ier = MPIDUMMY::MPI_Type_size(sendtype, &n);
if (ier != MPI_SUCCESS)
{
RETURN_CHECK(ier);
}
nsent = n * sendcnt;
ier = MPIDUMMY::MPI_Type_size(recvtype, &n);
if (ier != MPI_SUCCESS)
{
RETURN_CHECK(ier);
}
nrecv = n * recvcnt;
if (nrecv != nsent)
{
ier = MPI_ERR_COUNT;
}
if (ier == MPI_SUCCESS)
{
std::memcpy(recvbuf, sendbuf, nsent);
}
RETURN_CHECK(ier);
}
int MPI_Scatterv(const void *sendbuf, const int *sendcnts, const int *displs,
MPI_Datatype sendtype, void *recvbuf, int recvcnt,
MPI_Datatype recvtype, int root, MPI_Comm comm)
{
int ier = MPI_SUCCESS;
if (!sendcnts || !displs)
{
ier = MPI_ERR_BUFFER;
}
if (ier == MPI_SUCCESS)
{
ier = MPIDUMMY::MPI_Scatter(sendbuf, *sendcnts, sendtype, recvbuf,
recvcnt, recvtype, root, comm);
}
RETURN_CHECK(ier);
}
int MPI_Recv(void * /*recvbuffer*/, int /*count*/, MPI_Datatype /*type*/,
int /*source*/, int /*tag*/, MPI_Comm /*comm*/,
MPI_Status * /*status*/)
{
RETURN_CHECK(MPI_SUCCESS);
}
int MPI_Irecv(void * /*recvbuffer*/, int /*count*/, MPI_Datatype /*type*/,
int /*source*/, int /*tag*/, MPI_Comm /*comm*/,
MPI_Request * /*request*/)
{
RETURN_CHECK(MPI_SUCCESS);
}
int MPI_Send(const void * /*sendbuffer*/, int /*count*/, MPI_Datatype /*type*/,
int /*destination*/, int /*tag*/, MPI_Comm /*comm*/)
{
RETURN_CHECK(MPI_SUCCESS);
}
int MPI_Isend(const void * /*recvbuffer*/, int /*count*/, MPI_Datatype /*type*/,
int /*source*/, int /*tag*/, MPI_Comm /*comm*/,
MPI_Request * /*request*/)
{
RETURN_CHECK(MPI_SUCCESS);
}
int MPI_Wait(MPI_Request * /*request*/, MPI_Status * /*status*/)
{
RETURN_CHECK(MPI_SUCCESS);
}
double MPI_Wtime()
{
std::chrono::duration<double> now =
std::chrono::high_resolution_clock::now().time_since_epoch();
return now.count();
}
int MPI_Get_processor_name(char *name, int *resultlen)
{
std::sprintf(name, "0");
*resultlen = 1;
RETURN_CHECK(MPI_SUCCESS);
}
int MPI_Reduce(const void *sendbuf, void *recvbuf, int count,
MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm)
{
if (sendbuf == MPI_IN_PLACE)
{
RETURN_CHECK(MPI_SUCCESS);
}
int ier, size_of_type;
ier = MPIDUMMY::MPI_Type_size(datatype, &size_of_type);
if (ier != MPI_SUCCESS)
{
RETURN_CHECK(ier);
}
std::memcpy(recvbuf, sendbuf, count * static_cast<size_t>(size_of_type));
RETURN_CHECK(MPI_SUCCESS);
}
int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count,
MPI_Datatype datatype, MPI_Op op, MPI_Comm comm)
{
return MPIDUMMY::MPI_Reduce(sendbuf, recvbuf, count, datatype, op, 0, comm);
}
int MPI_Type_size(MPI_Datatype datatype, int *size)
{
if (datatype == MPI_CHAR)
{
*size = sizeof(char);
}
else if (datatype == MPI_INT)
{
*size = sizeof(int);
}
else if (datatype == MPI_LONG)
{
*size = sizeof(long);
}
else if (datatype == MPI_UNSIGNED)
{
*size = sizeof(unsigned int);
}
else if (datatype == MPI_UNSIGNED_LONG)
{
*size = sizeof(unsigned long);
}
else if (datatype == MPI_UNSIGNED_LONG_LONG)
{
*size = sizeof(unsigned long long);
}
else
{
RETURN_CHECK(MPI_ERR_TYPE);
}
RETURN_CHECK(MPI_SUCCESS);
}
#ifdef ADIOS2_HAVE_MPI
} // end namespace mpidummy
} // end namespace helper
} // end namespace adios2
#endif
/*
* Distributed under the OSI-approved Apache License, Version 2.0. See
* accompanying file Copyright.txt for details.
*
* A dummy MPI 'implementation' for the BP READ API, to have an MPI-free version
* of the API
*
*/
#ifndef ADIOS2_HELPER_MPIDUMMY_H_
#define ADIOS2_HELPER_MPIDUMMY_H_
#include "stdint.h"
#include "stdio.h"
#include "adios2/common/ADIOSConfig.h"
// If MPI is available, use the types and constants from that implementation,
// rather than our fake ones
#ifdef ADIOS2_HAVE_MPI
#include <mpi.h>
#else
typedef int MPI_Comm;
typedef uint64_t MPI_Status;
typedef uint64_t MPI_Request;
typedef int MPI_Info;
typedef int MPI_Datatype;
typedef int MPI_Fint;
typedef int MPI_Op;
#define MPI_SUCCESS 0
#define MPI_ERR_BUFFER 1 /* Invalid buffer pointer */
#define MPI_ERR_COUNT 2 /* Invalid count argument */
#define MPI_ERR_TYPE 3 /* Invalid datatype argument */
#define MPI_ERR_TAG 4 /* Invalid tag argument */
#define MPI_ERR_COMM 5 /* Invalid communicator */
#define MPI_ERR_ROOT 6 /* Invalid root process */
#define MPI_ERR_INTERN 17 /* Invalid memory */
#define MPI_MAX_ERROR_STRING 512
#define MPI_MODE_RDONLY 1
#define MPI_MODE_WRONLY 2
#define MPI_MODE_RDWR (MPI_MODE_RDONLY | MPI_MODE_RDONLY)
#define MPI_MODE_CREATE MPI_MODE_WRONLY
#define MPI_MODE_EXCL 0
#define MPI_MODE_DELETE_ON_CLOSE 0
#define MPI_MODE_UNIQUE_OPEN 0
#define MPI_MODE_SEQUENTIAL 0
#define MPI_MODE_APPEND 4
#define MPI_SEEK_SET SEEK_SET
#define MPI_SEEK_CUR SEEK_CUR
#define MPI_SEEK_END SEEK_END
#define MPI_BYTE 1 /* I need the size of the type here */