Unverified commit 0067de9b authored by Peter Doak, committed by GitHub

Merge pull request #138 from gbalduzz/optimize_summit_mpi

Optimize summit mpi
parents 4382f69e a9f24894
+99 −0
// Copyright (C) 2018 ETH Zurich
// Copyright (C) 2018 UT-Battelle, LLC
// All rights reserved.
//
// See LICENSE for terms of usage.
// See CITATION.md for citation guidelines, if DCA++ is used for scientific publications.
//
// Author: Giovanni Balduzzi (gbalduzz@itp.phys.ethz.ch)
//
// This file implements a class to store a subset of a domain distributed among different MPI ranks.
//
// INTERNAL: This class can only be used with dmn_variadic when it's wrapped with dmn_0.

#ifndef DCA_FUNCTION_DOMAINS_LOCAL_DOMAIN_HPP
#define DCA_FUNCTION_DOMAINS_LOCAL_DOMAIN_HPP

#include <algorithm>
#include <cassert>
#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

#include "dca/util/integer_division.hpp"

namespace dca {
namespace func {
// dca::func::

template <typename BaseDomain, int id = 0>
class LocalDomain {
public:
  using element_type = typename BaseDomain::element_type;  // For putting LocalDomain in dmn_0.
  using this_type = LocalDomain<BaseDomain, id>;

  template <class Grouping>
  static void initialize(const Grouping& group);

  static std::size_t get_physical_size() {
    return elements_.size();
  }
  static std::size_t get_offset() {
    return offset_;
  }

  static std::size_t get_size() {
    assert(initialized_);
    return padded_size_;
  }
  static std::string get_name() {
    return "Local_" + BaseDomain::get_name();
  }
  static const std::vector<element_type>& get_elements() {
    assert(initialized_);
    return elements_;
  }
  static bool is_initialized() {
    return initialized_;
  }

private:
  static bool initialized_;
  static std::vector<element_type> elements_;
  static std::size_t padded_size_;
  static std::size_t offset_;
};

template <class BaseDomain, int id>
bool LocalDomain<BaseDomain, id>::initialized_ = false;
template <class BaseDomain, int id>
std::vector<typename LocalDomain<BaseDomain, id>::element_type> LocalDomain<BaseDomain, id>::elements_;
template <class BaseDomain, int id>
std::size_t LocalDomain<BaseDomain, id>::padded_size_ = 0;
template <class BaseDomain, int id>
std::size_t LocalDomain<BaseDomain, id>::offset_ = 0;

template <class BaseDomain, int id>
template <class Grouping>
void LocalDomain<BaseDomain, id>::initialize(const Grouping& group) {
  if (initialized_) {
    throw(std::logic_error("Domain " + get_name() + " is already initialized."));
  }

  const std::size_t global_size = BaseDomain::get_size();
  padded_size_ = dca::util::ceilDiv(global_size, std::size_t(group.get_size()));

  const std::size_t start = std::min(padded_size_ * group.get_id(), global_size);
  const std::size_t end = std::min(start + padded_size_, global_size);

  const auto physical_size = end - start;
  offset_ = start;
  elements_.resize(physical_size);
  std::copy_n(BaseDomain::get_elements().data() + start, physical_size, elements_.data());

  initialized_ = true;
}

}  // namespace func
}  // namespace dca

#endif  // DCA_FUNCTION_DOMAINS_LOCAL_DOMAIN_HPP
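For reference, a minimal stand-alone sketch of the partitioning arithmetic used by LocalDomain::initialize, with made-up sizes (10 elements over 4 ranks): each rank owns a contiguous slice of padded_size = ceilDiv(global_size, n_ranks) elements starting at rank * padded_size, and trailing ranks may hold fewer (possibly zero) physical elements.

// Stand-alone illustration of LocalDomain::initialize's partitioning; the sizes
// below are made up and the helper expression replicates dca::util::ceilDiv.
#include <algorithm>
#include <cstddef>
#include <iostream>

int main() {
  const std::size_t global_size = 10;  // plays the role of BaseDomain::get_size()
  const std::size_t n_ranks = 4;       // plays the role of group.get_size()

  const std::size_t padded_size = (global_size + n_ranks - 1) / n_ranks;  // ceilDiv -> 3

  for (std::size_t rank = 0; rank < n_ranks; ++rank) {
    const std::size_t start = std::min(padded_size * rank, global_size);
    const std::size_t end = std::min(start + padded_size, global_size);
    // Ranks 0-2 own 3 elements each; rank 3 owns the remaining single element.
    std::cout << "rank " << rank << ": offset = " << start
              << ", physical size = " << end - start
              << ", padded size = " << padded_size << '\n';
  }
  return 0;
}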
+18 −9
@@ -24,6 +24,10 @@ class Buffer : public std::vector<unsigned char> {
public:
  using Container = std::vector<unsigned char>;

  Buffer() = default;

  Buffer(std::size_t n) : Container(n) {}

  // Copies obj into the buffer as a plain sequence of bytes.
  template <class T, typename = std::enable_if_t<std::is_trivially_copyable<T>::value>>
  Buffer& operator<<(const T& obj);
@@ -46,6 +50,13 @@ public:
  template <class T1, class T2>
  Buffer& operator>>(std::pair<T1, T2>& p);

  // Copies n objects of type T into the buffer as a plain sequence of bytes.
  template <class T>
  void write(const T* ptr, std::size_t n = 1);
  // Reads n objects of type T from the buffer as a plain sequence of bytes.
  template <class T>
  void read(T* ptr, std::size_t n = 1);

  // Retrieves the position of the next read.
  std::size_t tellg() const {
    return read_idx_;
@@ -55,14 +66,12 @@ public:
    read_idx_ = idx;
  }

private:
  // Copies n objects of type T into the buffer as a plain sequence of bytes.
  template <class T>
  void write(const T* ptr, std::size_t n = 1);
  // Reads n objects of type T from the buffer as a plain sequence of bytes.
  template <class T>
  void read(T* ptr, std::size_t n = 1);
  void clear() {
    Container::clear();
    read_idx_ = 0;
  }

private:
  std::size_t read_idx_ = 0;
};

@@ -152,7 +161,7 @@ Buffer& Buffer::operator>>(std::pair<T1, T2>& p) {
  return (*this) >> p.first >> p.second;
}

}  // io
}  // dca
}  // namespace io
}  // namespace dca

#endif  // DCA_IO_BUFFER_HPP
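A short round-trip sketch of the reworked dca::io::Buffer, using only the interface visible in this diff (operator<< / operator>> for trivially copyable types, tellg(), and the newly added clear()); the function name and values below are hypothetical.

#include <cassert>
#include "dca/io/buffer.hpp"

void buffer_roundtrip_sketch() {
  dca::io::Buffer buff;

  const double x = 3.14;
  const int n = 42;
  buff << x << n;  // appended as plain byte sequences, no size headers

  double x_out;
  int n_out;
  buff >> x_out >> n_out;  // read back in the same order
  assert(x_out == x && n_out == n);
  assert(buff.tellg() == sizeof(double) + sizeof(int));

  buff.clear();  // empties the container and resets the read position
  assert(buff.size() == 0 && buff.tellg() == 0);
}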
+7 −2
@@ -29,6 +29,7 @@ template <typename ScalarType, DeviceType device_name = linalg::CPU>
class MatrixView {
public:
  using Pair = std::pair<int, int>;
  MatrixView(ScalarType* data, Pair size);
  MatrixView(ScalarType* data, Pair size, int ld);
  MatrixView(ScalarType* data, int size, int ld);
  MatrixView(ScalarType* data, int size);
@@ -93,6 +94,10 @@ private:
  const std::pair<int, int> size_;
};

template <typename ScalarType, DeviceType device_t>
MatrixView<ScalarType, device_t>::MatrixView(ScalarType* const data, const Pair size)
    : MatrixView(data, size, size.first) {}

template <typename ScalarType, DeviceType device_t>
MatrixView<ScalarType, device_t>::MatrixView(ScalarType* const data, const Pair size, const int ld)
    : ptr_(data), ldm_(ld), size_(size) {}
@@ -189,7 +194,7 @@ auto inline makeConstantView(const Matrix<ScalarType, device_t>& mat, const int
                                                mat.leadingDimension());
}

}  // linalg
}  // dca
}  // namespace linalg
}  // namespace dca

#endif  // DCA_LINALG_MATRIX_VIEW_HPP
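A brief sketch of the new two-argument MatrixView constructor added above: it wraps a contiguous column-major block and assumes the leading dimension equals the number of rows (it delegates with ld = size.first). The element access below relies on MatrixView's existing operator(), which is not part of this diff.

#include <utility>
#include <vector>
#include "dca/linalg/matrix_view.hpp"

void matrix_view_sketch() {
  std::vector<double> storage(3 * 4, 0.);  // 3 x 4 column-major block, ld == 3

  dca::linalg::MatrixView<double, dca::linalg::CPU> view(storage.data(), std::make_pair(3, 4));
  view(1, 2) = 7.;  // writes storage[1 + 2 * 3]
}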
+163 −35
@@ -26,6 +26,7 @@

#include "dca/function/domains.hpp"
#include "dca/function/function.hpp"
#include "dca/io/buffer.hpp"
#include "dca/linalg/matrix.hpp"
#include "dca/linalg/vector.hpp"
#include "dca/parallel/mpi_concurrency/mpi_processor_grouping.hpp"
@@ -40,12 +41,13 @@ class MPICollectiveSum : public virtual MPIProcessorGrouping {
public:
  MPICollectiveSum() = default;

  // Wrappers to MPI_Allreduce.
  template <typename scalar_type>
  void sum(scalar_type& value) const;
  template <typename scalar_type>
  void sum(std::vector<scalar_type>& m) const;
  template <typename scalartype>
  void sum(std::map<std::string, std::vector<scalartype>>& m) const;
  void sum(std::map<std::string, std::vector<scalartype>>& m);
  template <typename scalar_type, class domain>
  void sum(func::function<scalar_type, domain>& f) const;
  template <typename scalar_type, class domain>
@@ -58,6 +60,22 @@ public:
  template <typename scalar_type>
  void sum(linalg::Matrix<scalar_type, linalg::CPU>& f) const;

  // Wrapper to MPI_Reduce.
  template <typename Scalar, class Domain>
  void localSum(func::function<Scalar, Domain>& f, int root_id) const;

  // Delays the execution of the sum (implemented with MPI_Allreduce) until 'resolveSums' is
  // called, or until 'delayedSum' is called with an object of a different Scalar type.
  template <typename Scalar>
  void delayedSum(Scalar& obj);
  template <typename Scalar>
  void delayedSum(std::vector<Scalar>& f);
  template <typename Scalar, class domain>
  void delayedSum(func::function<Scalar, domain>& f);

  // Executes all the reductions scheduled with 'delayedSum' or with delayed 'leaveOneOutSum' calls.
  void resolveSums();

  template <typename some_type>
  void sum_and_average(some_type& obj, int nr_meas_rank = 1) const;
  template <typename some_type>
@@ -74,20 +92,22 @@ public:
  // in s.
  // Does nothing if there is only one rank.
  // In/Out: s
  // In: delay. If true, the sum is delayed until 'resolveSums' is called.
  template <typename T>
  void leaveOneOutSum(T& s) const;
  void leaveOneOutSum(T& s, bool delay = false);

  // Element-wise implementations for dca::func::function.
  // In/Out: f
  // In: delay. If true, the sum is delayed until 'resolveSums' is called.
  template <typename Scalar, class Domain>
  void leaveOneOutSum(func::function<Scalar, Domain>& f) const;
  void leaveOneOutSum(func::function<Scalar, Domain>& f, bool delay = false);

  // Computes the average of s over all ranks excluding the local value and stores the result back
  // in s.
  // Does nothing if there is only one rank.
  // In/Out: s
  template <typename T>
  void leaveOneOutAvg(T& s) const;
  void leaveOneOutAvg(T& s);

  // Computes and returns the element-wise jackknife error
  // \Delta f_{jack} = \sqrt( (n-1)/n \sum_i^n (f_i - f_avg)^2 ),
@@ -142,8 +162,25 @@ public:
                                           const std::vector<int>& orders) const;

private:
  // Computes the sum on process 'rank_id', or on all processes if rank_id == -1.
  template <typename T>
  void sum(const T* in, T* out, std::size_t n, int rank_id = -1) const;

  template <typename T>
  void sum(const T* in, T* out, std::size_t n) const;
  void delayedSum(T* in, std::size_t n);
  template <typename T>
  void delayedSum(std::complex<T>* in, std::size_t n);

  template <typename T>
  void resolveSumsImplementation();

  // Objects for delayed sums and "leave one out" sums.
  MPI_Datatype current_type_ = 0;
  io::Buffer packed_;
  // Each vector entry stores the properties of a single delayed sum.
  std::vector<char*> pointers_;
  std::vector<std::size_t> sizes_;  // number of scalars involved in each operation, not bytes.
  std::vector<std::int8_t> remove_local_value_;  // use int8_t to avoid vector<bool> issues.
};

template <typename scalar_type>
@@ -165,23 +202,13 @@ void MPICollectiveSum::sum(std::vector<scalar_type>& m) const {
  m = std::move(result);
}

template <typename scalar_type>
void MPICollectiveSum::sum(std::map<std::string, std::vector<scalar_type>>& m) const {
  typedef typename std::map<std::string, std::vector<scalar_type>>::iterator iterator_type;

  iterator_type it = m.begin();

  for (; it != m.end(); ++it) {
    std::vector<scalar_type> values((it->second).size());

    for (size_t l = 0; l < (it->second).size(); l++)
      values[l] = (it->second)[l];

    sum(values);

    for (size_t l = 0; l < (it->second).size(); l++)
      (it->second)[l] = values[l];
template <typename Scalar>
void MPICollectiveSum::sum(std::map<std::string, std::vector<Scalar>>& m) {
  for (auto it = m.begin(); it != m.end(); ++it) {
    delayedSum((it->second));
  }

  resolveSums();
}

template <typename scalar_type, class domain>
@@ -255,6 +282,18 @@ void MPICollectiveSum::sum(linalg::Matrix<scalar_type, linalg::CPU>& f) const {
  f = std::move(F);
}

template <typename scalar_type, class domain>
void MPICollectiveSum::localSum(func::function<scalar_type, domain>& f, int id) const {
  if (id < 0 || id >= get_size())
    throw(std::out_of_range("id out of range."));

  func::function<scalar_type, domain> f_sum;

  sum(f.values(), f_sum.values(), f.size(), id);

  f = std::move(f_sum);
}

template <typename some_type>
void MPICollectiveSum::sum_and_average(some_type& obj, const int nr_meas_rank) const {
  sum(obj);
@@ -275,29 +314,40 @@ void MPICollectiveSum::sum_and_average(const some_type& in, some_type& out,
}

template <typename Scalar>
void MPICollectiveSum::leaveOneOutSum(Scalar& s) const {
void MPICollectiveSum::leaveOneOutSum(Scalar& s, bool delay) {
  if (MPIProcessorGrouping::get_size() == 1)
    return;

  const Scalar s_local(s);
  if (delay == false) {
    sum(s);
    s = s - s_local;
  }
  else {
    delayedSum(s);
    remove_local_value_.back() = true;
  }
}

template <typename Scalar, class Domain>
void MPICollectiveSum::leaveOneOutSum(func::function<Scalar, Domain>& f) const {
void MPICollectiveSum::leaveOneOutSum(func::function<Scalar, Domain>& f, const bool delay) {
  if (MPIProcessorGrouping::get_size() == 1)
    return;

  if (delay == false) {
    const func::function<Scalar, Domain> f_local(f);
    sum(f_local, f);

    for (int i = 0; i < f.size(); ++i)
      f(i) = f(i) - f_local(i);
  }
  else {
    delayedSum(f);
    remove_local_value_.back() = true;
  }
}

template <typename T>
void MPICollectiveSum::leaveOneOutAvg(T& x) const {
void MPICollectiveSum::leaveOneOutAvg(T& x) {
  if (MPIProcessorGrouping::get_size() == 1)
    return;

@@ -508,17 +558,95 @@ std::vector<Scalar> MPICollectiveSum::avgNormalizedMomenta(const func::function<
}

template <typename T>
void MPICollectiveSum::sum(const T* in, T* out, std::size_t n) const {
  // On summit large messages hangs if sizeof(floating point type) type * message_size > 2^31-1.
void MPICollectiveSum::sum(const T* in, T* out, std::size_t n, int root_id) const {
  // On Summit, large messages hang if sizeof(floating point type) * message_size > 2^31-1.
  constexpr std::size_t max_size = dca::util::IsComplex<T>::value
                                       ? 2 * (std::numeric_limits<int>::max() / sizeof(T))
                                       : std::numeric_limits<int>::max() / sizeof(T);
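  // E.g. for T = double: max_size = 2147483647 / 8 = 268435455 elements, so a buffer of
  // 400 million doubles is reduced in two chunks of 268435455 and 131564545 elements.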

  for (std::size_t start = 0; start < n; start += max_size) {
    const int msg_size = std::min(n - start, max_size);
    if (root_id == -1) {
      MPI_Allreduce(in + start, out + start, msg_size, MPITypeMap<T>::value(), MPI_SUM,
                    MPIProcessorGrouping::get());
    }
    else {
      MPI_Reduce(in + start, out + start, msg_size, MPITypeMap<T>::value(), MPI_SUM, root_id,
                 MPIProcessorGrouping::get());
    }
  }
}

template <typename Scalar>
void MPICollectiveSum::delayedSum(Scalar& obj) {
  delayedSum(&obj, 1);
}

template <typename Scalar>
void MPICollectiveSum::delayedSum(std::vector<Scalar>& v) {
  delayedSum(v.data(), v.size());
}

template <typename Scalar, class Domain>
void MPICollectiveSum::delayedSum(func::function<Scalar, Domain>& f) {
  delayedSum(f.values(), f.size());
}

template <typename T>
void MPICollectiveSum::delayedSum(std::complex<T>* in, std::size_t n) {
  delayedSum(reinterpret_cast<T*>(in), n * 2);
}

template <typename T>
void MPICollectiveSum::delayedSum(T* in, std::size_t n) {
  if (current_type_ != 0 && current_type_ != MPITypeMap<T>::value())
    resolveSums();

  current_type_ = MPITypeMap<T>::value();
  pointers_.push_back(reinterpret_cast<char*>(in));
  sizes_.push_back(n);
  remove_local_value_.push_back(false);
  packed_.write(in, n);
}

// TODO: move to .cpp file.
inline void MPICollectiveSum::resolveSums() {
  // Note: we cannot use a switch statement, as MPI_Datatype is not an integer type on the Summit system.
  if (current_type_ == MPI_DOUBLE)
    return resolveSumsImplementation<double>();
  else if (current_type_ == MPI_FLOAT)
    return resolveSumsImplementation<float>();
  else if (current_type_ == MPI_UNSIGNED_LONG)
    return resolveSumsImplementation<unsigned long int>();
  else
    throw(std::logic_error("Type not supported."));
}

template <typename T>
inline void MPICollectiveSum::resolveSumsImplementation() {
  io::Buffer output(packed_.size());

  sum(reinterpret_cast<T*>(packed_.data()), reinterpret_cast<T*>(output.data()),
      packed_.size() / sizeof(T));

  for (int i = 0; i < pointers_.size(); ++i) {
    const std::size_t position = output.tellg();
    output.read(pointers_[i], sizes_[i] * sizeof(T));

    if (remove_local_value_[i]) {
      // Subtract local value from sum.
      T* sum = reinterpret_cast<T*>(pointers_[i]);
      const T* local = reinterpret_cast<T*>(packed_.data() + position);
      // sum[0:n] = sum[0:n] - local[0:n]
      std::transform(sum, sum + sizes_[i], local, sum, std::minus<T>());
    }
  }

  current_type_ = 0;
  sizes_.clear();
  remove_local_value_.clear();
  pointers_.clear();
  packed_.clear();
}

}  // namespace parallel
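A usage sketch of the new delayed-reduction and local-sum interface declared above; the function name, parameter names, and the assumption that all arguments share the double scalar type are illustrative, not part of the commit.

// Hypothetical client code: 'concurrency' is any object exposing the
// MPICollectiveSum interface from this diff; F is a func::function over doubles.
template <class Concurrency, class F>
void reduce_batch_sketch(Concurrency& concurrency, F& f1, F& f2, double& acc, F& f_root) {
  // Schedule several reductions; no MPI traffic happens yet.
  concurrency.delayedSum(f1);
  concurrency.delayedSum(f2);             // same scalar type as f1: packed into the same buffer
  concurrency.leaveOneOutSum(acc, true);  // delayed; the local value is subtracted on resolve

  // A single MPI_Allreduce (chunked only for very large buffers) covers everything above.
  concurrency.resolveSums();

  // Independent wrapper to MPI_Reduce: the summed f_root is valid on rank 0 only.
  concurrency.localSum(f_root, 0);
}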
+9 −4
@@ -25,6 +25,8 @@
#include "dca/parallel/mpi_concurrency/mpi_collective_min.hpp"
#include "dca/parallel/mpi_concurrency/mpi_collective_sum.hpp"
#include "dca/parallel/mpi_concurrency/mpi_initializer.hpp"
#include "dca/parallel/mpi_concurrency/mpi_gang.hpp"
#include "dca/parallel/mpi_concurrency/mpi_gather.hpp"
#include "dca/parallel/mpi_concurrency/mpi_packing.hpp"
#include "dca/parallel/mpi_concurrency/mpi_processor_grouping.hpp"
#include "dca/parallel/util/get_bounds.hpp"
@@ -34,12 +36,15 @@ namespace parallel {
// dca::parallel::

class MPIConcurrency final : public virtual MPIInitializer,
                             private virtual MPIProcessorGrouping,
                             public virtual MPIProcessorGrouping,
                             public MPIPacking,
                             public MPICollectiveMax,
                             public MPICollectiveMin,
                             public MPICollectiveSum {
                             public MPICollectiveSum,
                             public MPIGather {
public:
  using Gang = MPIGang;

  MPIConcurrency(int argc, char** argv);

  inline int id() const {
@@ -147,7 +152,7 @@ bool MPIConcurrency::broadcast_object(object_type& object, int root_id) const {
  return true;
}

}  // parallel
}  // dca
}  // namespace parallel
}  // namespace dca

#endif  // DCA_PARALLEL_MPI_CONCURRENCY_MPI_CONCURRENCY_HPP
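A minimal sketch of what the inheritance change enables: with MPIProcessorGrouping now a public base and MPIGather mixed in, their members become reachable directly through MPIConcurrency; only id() and sum() below are taken from interfaces visible in this diff.

#include "dca/parallel/mpi_concurrency/mpi_concurrency.hpp"

int main(int argc, char** argv) {
  dca::parallel::MPIConcurrency concurrency(argc, argv);

  double value = concurrency.id();  // rank index, declared in this header
  concurrency.sum(value);           // MPICollectiveSum::sum through the public base
  return 0;
}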