Skip to content
Snippets Groups Projects
Commit afd38941 authored by Simon Heybrock's avatar Simon Heybrock
Browse files

Re #21181. Add Parallel::Status for recv.

parent d28b7fd6
No related branches found
No related tags found
No related merge requests found
...@@ -23,6 +23,7 @@ set ( INC_FILES ...@@ -23,6 +23,7 @@ set ( INC_FILES
inc/MantidParallel/IO/PulseTimeGenerator.h inc/MantidParallel/IO/PulseTimeGenerator.h
inc/MantidParallel/Nonblocking.h inc/MantidParallel/Nonblocking.h
inc/MantidParallel/Request.h inc/MantidParallel/Request.h
inc/MantidParallel/Status.h
inc/MantidParallel/StorageMode.h inc/MantidParallel/StorageMode.h
inc/MantidParallel/ThreadingBackend.h inc/MantidParallel/ThreadingBackend.h
) )
......
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include "MantidParallel/DllConfig.h" #include "MantidParallel/DllConfig.h"
#include "MantidParallel/Request.h" #include "MantidParallel/Request.h"
#include "MantidParallel/Status.h"
#include "MantidParallel/ThreadingBackend.h" #include "MantidParallel/ThreadingBackend.h"
#ifdef MPI_EXPERIMENTAL #ifdef MPI_EXPERIMENTAL
...@@ -62,7 +63,7 @@ public: ...@@ -62,7 +63,7 @@ public:
int rank() const; int rank() const;
int size() const; int size() const;
template <typename... T> void send(T &&... args) const; template <typename... T> void send(T &&... args) const;
template <typename... T> void recv(T &&... args) const; template <typename... T> Status recv(T &&... args) const;
template <typename... T> Request isend(T &&... args) const; template <typename... T> Request isend(T &&... args) const;
template <typename... T> Request irecv(T &&... args) const; template <typename... T> Request irecv(T &&... args) const;
...@@ -95,14 +96,14 @@ template <typename... T> void Communicator::send(T &&... args) const { ...@@ -95,14 +96,14 @@ template <typename... T> void Communicator::send(T &&... args) const {
backend().send(m_rank, std::forward<T>(args)...); backend().send(m_rank, std::forward<T>(args)...);
} }
template <typename... T> void Communicator::recv(T &&... args) const { template <typename... T> Status Communicator::recv(T &&... args) const {
// Not returning a status since it would usually not get initialized. See // Not returning a status since it would usually not get initialized. See
// http://mpi-forum.org/docs/mpi-1.1/mpi-11-html/node35.html#Node35. // http://mpi-forum.org/docs/mpi-1.1/mpi-11-html/node35.html#Node35.
#ifdef MPI_EXPERIMENTAL #ifdef MPI_EXPERIMENTAL
if (!hasBackend()) if (!hasBackend())
return static_cast<void>(m_communicator.recv(std::forward<T>(args)...)); return static_cast<void>(m_communicator.recv(std::forward<T>(args)...));
#endif #endif
backend().recv(m_rank, std::forward<T>(args)...); return backend().recv(m_rank, std::forward<T>(args)...);
} }
template <typename... T> Request Communicator::isend(T &&... args) const { template <typename... T> Request Communicator::isend(T &&... args) const {
......
#ifndef MANTID_PARALLEL_STATUS_H_
#define MANTID_PARALLEL_STATUS_H_
#include "MantidParallel/DllConfig.h"
#include <boost/optional/optional.hpp>
#ifdef MPI_EXPERIMENTAL
#include <boost/mpi/status.hpp>
#endif
namespace Mantid {
namespace Parallel {
namespace detail {
class ThreadingBackend;
}
/** Wrapper for boost::mpi::status. For non-MPI builds an equivalent
implementation is provided.
@author Simon Heybrock
@date 2017
Copyright &copy; 2017 ISIS Rutherford Appleton Laboratory, NScD Oak Ridge
National Laboratory & European Spallation Source
This file is part of Mantid.
Mantid is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 3 of the License, or
(at your option) any later version.
Mantid is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
File change history is stored at: <https://github.com/mantidproject/mantid>
Code Documentation is available at: <http://doxygen.mantidproject.org>
*/
class MANTID_PARALLEL_DLL Status {
public:
#ifdef MPI_EXPERIMENTAL
Status(const boost::mpi::status &status);
#endif
template <typename T> boost::optional<int> count() const {
#ifdef MPI_EXPERIMENTAL
if (!m_threadingBackend)
return m_status.count<T>();
#endif
return static_cast<int>(m_size / sizeof(T));
}
private:
Status(const size_t size) : m_size(size), m_threadingBackend(true) {}
#ifdef MPI_EXPERIMENTAL
boost::mpi::status m_status;
#endif
const size_t m_size{0};
const bool m_threadingBackend{false};
// For accessing constructor based on size.
friend class detail::ThreadingBackend;
};
} // namespace Parallel
} // namespace Mantid
#endif /* MANTID_PARALLEL_STATUS_H_ */
...@@ -3,6 +3,7 @@ ...@@ -3,6 +3,7 @@
#include "MantidParallel/DllConfig.h" #include "MantidParallel/DllConfig.h"
#include "MantidParallel/Request.h" #include "MantidParallel/Request.h"
#include "MantidParallel/Status.h"
#include "MantidKernel/make_unique.h" #include "MantidKernel/make_unique.h"
#include <boost/archive/binary_oarchive.hpp> #include <boost/archive/binary_oarchive.hpp>
...@@ -63,7 +64,7 @@ public: ...@@ -63,7 +64,7 @@ public:
void send(int source, int dest, int tag, T &&... args); void send(int source, int dest, int tag, T &&... args);
template <typename... T> template <typename... T>
void recv(int dest, int source, int tag, T &&... args); Status recv(int dest, int source, int tag, T &&... args);
template <typename... T> template <typename... T>
Request isend(int source, int dest, int tag, T &&... args); Request isend(int source, int dest, int tag, T &&... args);
...@@ -92,22 +93,32 @@ void saveToStream(boost::archive::binary_oarchive &oa, ...@@ -92,22 +93,32 @@ void saveToStream(boost::archive::binary_oarchive &oa,
template <class T> template <class T>
void saveToStream(boost::archive::binary_oarchive &oa, const T *data, void saveToStream(boost::archive::binary_oarchive &oa, const T *data,
const size_t count) { const size_t count) {
oa.operator<<(count);
for (size_t i = 0; i < count; ++i) for (size_t i = 0; i < count; ++i)
oa.operator<<(data[i]); oa.operator<<(data[i]);
} }
template <class T> template <class T>
void loadFromStream(boost::archive::binary_iarchive &ia, T &data) { size_t loadFromStream(boost::archive::binary_iarchive &ia, T &data) {
ia.operator>>(data); ia.operator>>(data);
return sizeof(T);
} }
template <class T> template <class T>
void loadFromStream(boost::archive::binary_iarchive &ia, std::vector<T> &data) { size_t loadFromStream(boost::archive::binary_iarchive &ia,
std::vector<T> &data) {
ia.operator>>(data); ia.operator>>(data);
return data.size() * sizeof(T);
} }
template <class T> template <class T>
void loadFromStream(boost::archive::binary_iarchive &ia, T *data, size_t loadFromStream(boost::archive::binary_iarchive &ia, T *data,
const size_t count) { const size_t count) {
for (size_t i = 0; i < count; ++i) size_t received;
ia.operator>>(received);
for (size_t i = 0; i < count; ++i) {
if (i >= received)
return i * sizeof(T);
ia.operator>>(data[i]); ia.operator>>(data[i]);
}
return count * sizeof(T);
} }
} }
...@@ -131,7 +142,7 @@ void ThreadingBackend::send(int source, int dest, int tag, T &&... args) { ...@@ -131,7 +142,7 @@ void ThreadingBackend::send(int source, int dest, int tag, T &&... args) {
} }
template <typename... T> template <typename... T>
void ThreadingBackend::recv(int dest, int source, int tag, T &&... args) { Status ThreadingBackend::recv(int dest, int source, int tag, T &&... args) {
const auto key = std::make_tuple(source, dest, tag); const auto key = std::make_tuple(source, dest, tag);
std::unique_ptr<std::stringbuf> buf; std::unique_ptr<std::stringbuf> buf;
while (true) { while (true) {
...@@ -151,7 +162,7 @@ void ThreadingBackend::recv(int dest, int source, int tag, T &&... args) { ...@@ -151,7 +162,7 @@ void ThreadingBackend::recv(int dest, int source, int tag, T &&... args) {
} }
std::istream is(buf.get()); std::istream is(buf.get());
boost::archive::binary_iarchive ia(is); boost::archive::binary_iarchive ia(is);
detail::loadFromStream(ia, std::forward<T>(args)...); return Status(detail::loadFromStream(ia, std::forward<T>(args)...));
} }
template <typename... T> template <typename... T>
......
...@@ -25,6 +25,27 @@ void send_recv(const Communicator &comm) { ...@@ -25,6 +25,27 @@ void send_recv(const Communicator &comm) {
} }
} }
void send_recv_status(const Communicator &comm) {
if (comm.size() < 2)
return;
std::vector<double> data{1.1, 2.2};
if (comm.rank() == 0)
(comm.send(1, 123, data.data(), 2));
(comm.send(1, 123, data.data(), 1));
if (comm.rank() == 1) {
std::vector<double> result1(2);
const auto status1 = comm.recv(0, 123, result1.data(), 2);
TS_ASSERT_EQUALS(*status1.count<double>(), 2);
TS_ASSERT_EQUALS(result1, data);
std::vector<double> result2(2);
const auto status2 = comm.recv(0, 123, result2.data(), 2);
TS_ASSERT_EQUALS(*status2.count<double>(), 1);
TS_ASSERT_EQUALS(result2, (std::vector<double>{1.1, 0.0}));
}
}
void isend_recv(const Communicator &comm) { void isend_recv(const Communicator &comm) {
int64_t data = 123456789 + comm.rank(); int64_t data = 123456789 + comm.rank();
int dest = (comm.rank() + 1) % comm.size(); int dest = (comm.rank() + 1) % comm.size();
...@@ -90,6 +111,8 @@ public: ...@@ -90,6 +111,8 @@ public:
void test_send_recv() { runParallel(send_recv); } void test_send_recv() { runParallel(send_recv); }
void test_send_recv_status() { runParallel(send_recv_status); }
void test_isend_recv() { runParallel(isend_recv); } void test_isend_recv() { runParallel(isend_recv); }
void test_send_irecv() { runParallel(send_irecv); } void test_send_irecv() { runParallel(send_irecv); }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment