Commit 30d8a648 authored by Doak, Peter W.

Merge branch 'distG4_pr' into adios2_distG4_pr

parents a047ca8a c3d865ce
dca/function/function.hpp  +46 −5
@@ -32,6 +32,9 @@
#include "dca/util/pack_operations.hpp"
#include "dca/util/type_utils.hpp"

#include "dca/parallel/util/get_workload.hpp"
#include "mpi.h"

namespace dca {
namespace func {
// dca::func::
@@ -47,7 +50,9 @@ public:
  // Default constructor
  // Constructs the function with the given name.
  // Postcondition: All elements are set to zero.
-  function(const std::string& name = default_name_);
+  // Special case: when distributed_g4_enabled is true, a G4-named function only gets an
+  // allocation of 1/p of the original G4 size, where p = number of MPI ranks.
+  function(const std::string& name = default_name_, const bool distributed_g4_enabled = false);

  // Copy constructor
  // Constructs the function with a copy of the elements and the name of other.
@@ -111,6 +116,10 @@ public:
  std::size_t size() const {
    return Nb_elements;
  }

  // Resizes the function to the given number of elements.
  // Note: only updates the stored element count; it does not reallocate fnc_values.
  void resize(std::size_t Nb_elements_new) {
    Nb_elements = Nb_elements_new;
  }
  // Returns the size of the leaf domain with the given index.
  // Does not return function values!
  int operator[](const int index) const {
@@ -156,12 +165,18 @@ public:
  void linind_2_subind(int linind, int* subind) const;
  // std::vector version
  void linind_2_subind(int linind, std::vector<int>& subind) const;
  // Modern version: returns the subindices by value (relies on RVO).
  std::vector<int> linind_2_subind(int linind) const;

  // Computes the linear index for the given subindices of the leaf domains.
  // Precondition: subind stores the subindices of all LEAF domains.
  // TODO: Use std::array or std::vector to be able to check the size of subind.
  void subind_2_linind(const int* subind, int& linind) const;

  // std::vector version that returns the linear index instead of using an output argument.
  int subind_2_linind(const std::vector<int>& subind) const;

  // Computes and returns the linear index for the given subindices of the branch or leaf domains,
  // depending on the size of subindices.
  // Enable only if all arguments are integral to prevent subind_to_linind(int*, int) to resolve to
@@ -276,7 +291,7 @@ template <typename scalartype, class domain>
const std::string function<scalartype, domain>::default_name_ = "no-name";

template <typename scalartype, class domain>
-function<scalartype, domain>::function(const std::string& name)
+function<scalartype, domain>::function(const std::string& name, const bool distributed_g4_enabled)
    : name_(name),
      function_type(__PRETTY_FUNCTION__),
      dmn(),
@@ -285,6 +300,13 @@ function<scalartype, domain>::function(const std::string& name)
      size_sbdm(dmn.get_leaf_domain_sizes()),
      step_sbdm(dmn.get_leaf_domain_steps()),
      fnc_values(nullptr) {
  if (name.substr(0, 2) == "G4" && distributed_g4_enabled) {
    int my_rank, mpi_size;
    MPI_Comm_size(MPI_COMM_WORLD, &mpi_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
    Nb_elements = dca::parallel::util::getWorkload(dmn.get_size(), mpi_size, my_rank);
  }
  fnc_values = new scalartype[Nb_elements];
  for (int linind = 0; linind < Nb_elements; ++linind)
    setToZero(fnc_values[linind]);
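
For illustration, a sketch of the resulting per-rank allocation (domain size and rank count are made-up numbers), assuming getWorkload() assigns the remainder to the lowest ranks, consistent with gatherv_helper and getComputeRange below:

// dmn.get_size() = 10, mpi_size = 4:
//   rank 0: Nb_elements = 3      rank 2: Nb_elements = 2
//   rank 1: Nb_elements = 3      rank 3: Nb_elements = 2
// Each rank allocates and zero-initializes only its ~1/p slice of G4.
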
@@ -421,6 +443,17 @@ void function<scalartype, domain>::linind_2_subind(int linind, std::vector<int>&
  }
}

template <typename scalartype, class domain>
std::vector<int> function<scalartype, domain>::linind_2_subind(int linind) const {
  std::vector<int> subind(Nb_sbdms);
  for (int i = 0; i < int(size_sbdm.size()); ++i) {
    subind[i] = linind % size_sbdm[i];
    linind = (linind - subind[i]) / size_sbdm[i];
  }
  return subind;
}
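
To make the index arithmetic concrete, a worked example with hypothetical leaf sizes:

// size_sbdm = {2, 3, 4} (so Nb_elements = 24), linind = 17:
//   subind[0] = 17 % 2 = 1,  linind = (17 - 1) / 2 = 8
//   subind[1] =  8 % 3 = 2,  linind = (8 - 2) / 3 = 2
//   subind[2] =  2 % 4 = 2
// => linind_2_subind(17) returns {1, 2, 2}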


template <typename scalartype, class domain>
void function<scalartype, domain>::subind_2_linind(const int* const subind, int& linind) const {
  linind = 0;
@@ -428,6 +461,14 @@ void function<scalartype, domain>::subind_2_linind(const int* const subind, int&
    linind += subind[i] * step_sbdm[i];
}

template <typename scalartype, class domain>
int function<scalartype, domain>::subind_2_linind(const std::vector<int>& subind) const {
  int linind = 0;
  for (int i = 0; i < int(step_sbdm.size()); ++i)
    linind += subind[i] * step_sbdm[i];
  return linind;
}
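
The inverse uses the precomputed leaf steps; with the same hypothetical sizes, step_sbdm = {1, 2, 6}:

// subind = {1, 2, 2}:  linind = 1*1 + 2*2 + 2*6 = 17
// so subind_2_linind(linind_2_subind(17)) == 17, i.e. the two new overloads round-trip.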

template <typename scalartype, class domain>
scalartype& function<scalartype, domain>::operator()(const int* const subind) {
  int linind;
dca/linalg/reshapable_matrix.hpp  +6 −10
@@ -218,12 +218,11 @@ ReshapableMatrix<ScalarType, device_name, Allocator>::ReshapableMatrix(
template <typename ScalarType, DeviceType device_name, class Allocator>
ReshapableMatrix<ScalarType, device_name, Allocator>& ReshapableMatrix<
    ScalarType, device_name, Allocator>::operator=(const ThisType& rhs) {
-  size_ = rhs.size_;
-  capacity_ = rhs.capacity_;
-
-  Allocator::deallocate(data_);
-  data_ = Allocator::allocate(capacity_);
+  if (this != &rhs) {
+    resizeNoCopy(rhs.size_);
+    util::memoryCopy(data_, leadingDimension(), rhs.data_, rhs.leadingDimension(), size_);
+  }

  return *this;
}

@@ -232,12 +231,9 @@ template <DeviceType rhs_device_name, class AllocatorRhs>
ReshapableMatrix<ScalarType, device_name, Allocator>& ReshapableMatrix<
    ScalarType, device_name,
    Allocator>::operator=(const ReshapableMatrix<ScalarType, rhs_device_name, AllocatorRhs>& rhs) {
-  size_ = rhs.size_;
-  capacity_ = rhs.capacity_;
-
-  Allocator::deallocate(data_);
-  data_ = Allocator::allocate(capacity_);
+  resizeNoCopy(rhs.size_);
+  util::memoryCopy(data_, leadingDimension(), rhs.data_, rhs.leadingDimension(), size_);

  return *this;
}
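
Design note: both assignment operators now go through resizeNoCopy(), which reallocates only when the requested size exceeds the current capacity, rather than unconditionally deallocating and reallocating. The added self-assignment guard keeps m = m from copying out of a freed buffer; the templated cross-device overload is normally selected only for a distinct matrix type, which cannot alias *this.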

dca/parallel/mpi_concurrency/mpi_collective_sum.hpp  +76 −0
@@ -17,6 +17,7 @@
#define DCA_PARALLEL_MPI_CONCURRENCY_MPI_COLLECTIVE_SUM_HPP

#include <algorithm>  // std::min
#include <numeric>  // std::partial_sum
#include <map>
#include <string>
#include <utility>  // std::move, std::swap
@@ -64,6 +65,12 @@ public:
  template <typename Scalar, class Domain>
  void localSum(func::function<Scalar, Domain>& f, int root_id) const;

  // Wrapper to MPI_Gatherv. Gathers into specified locations on the root from all processes
  // in the group. Designed for collecting G4 when distributed_g4_enabled() == true, but only
  // for testing purposes: if G4 is so large that it cannot fit into one GPU, do not call this method.
  template <typename Scalar, class Domain>
  void gatherv(func::function<Scalar, Domain>& f, int root_id) const;

  // Delay the execution of sum (implemented with MPI_Allreduce) until 'resolveSums' is called,
  // or 'delayedSum' is called with an object of different Scalar type.
  template <typename Scalar>
@@ -166,6 +173,12 @@ private:
  template <typename T>
  void sum(const T* in, T* out, std::size_t n, int rank_id = -1) const;

  // Gathers results across ranks on process 'root_id'.
  // Designed for collecting G4 when distributed_g4_enabled() == true, but only for testing
  // purposes: if G4 is so large that it cannot fit into one GPU, do not call this method.
  template <typename T>
  void gatherv_helper(const T* in, T* out, std::size_t total_size, int root_id = 0) const;

  template <typename T>
  void delayedSum(T* in, std::size_t n);
  template <typename T>
@@ -294,6 +307,26 @@ void MPICollectiveSum::localSum(func::function<scalar_type, domain>& f, int id)
  f = std::move(f_sum);
}

template <typename scalar_type, class domain>
void MPICollectiveSum::gatherv(func::function<scalar_type, domain>& f, int id) const {
  if (id < 0 || id >= get_size())
    throw(std::out_of_range("id out of range."));

  // Receive buffer: the default constructor allocates the full (non-distributed) domain size,
  // which is what the root needs to hold the reassembled G4. On the other ranks MPI_Gatherv
  // ignores the receive buffer.
  func::function<scalar_type, domain> f_gathered;

  const int my_rank = MPIProcessorGrouping::get_id();

  if (my_rank == id)
    std::cout << "\n *********performing MPI Gatherv Sum************* \n";

  gatherv_helper(f.values(), f_gathered.values(), f.get_domain().get_size(), id);

  // Only the root keeps the gathered result; the other ranks retain their local slice.
  if (my_rank == id)
    f = std::move(f_gathered);
}
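
A minimal usage sketch (the G4Domain alias, the accumulation step, and the MPICollectiveSum construction are schematic placeholders; only the gatherv call is from this diff):

// Each rank accumulates into its 1/p slice; rank 0 then reassembles the full G4 for testing.
func::function<std::complex<double>, G4Domain> g4("G4", /*distributed_g4_enabled=*/true);
// ... accumulate into the local slice of g4 ...
MPICollectiveSum concurrency;   // schematic construction
concurrency.gatherv(g4, 0);     // rank 0 now holds the full, reassembled G4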

template <typename some_type>
void MPICollectiveSum::sum_and_average(some_type& obj, const int nr_meas_rank) const {
  sum(obj);
@@ -577,6 +610,49 @@ void MPICollectiveSum::sum(const T* in, T* out, std::size_t n, int root_id) cons
  }
}

template <typename T>
void MPICollectiveSum::gatherv_helper(const T* in, T* out, std::size_t total_size, int root_id) const {
  const int mpi_size = MPIProcessorGrouping::get_size();
  const int my_rank = MPIProcessorGrouping::get_id();

  const int local_work = total_size / mpi_size;
  // If total_size does not divide evenly, the ranks below the remainder hold one extra element.
  const int more_work_before_index = total_size % mpi_size;

  // recvcounts: entry i is the number of elements contributed by process i.
  std::vector<int> ranks_workload(mpi_size, local_work);
  std::transform(ranks_workload.begin(), ranks_workload.begin() + more_work_before_index,
                 ranks_workload.begin(), [](int ele) { return ele + 1; });

  // displs: entry i is the displacement relative to recvbuf at which to place the incoming
  // data from process i (significant only at root). One extra slot is allocated so that
  // std::partial_sum can write the full running sum; the last entry is then dropped.
  std::vector<int> displs(mpi_size + 1, 0);
  std::partial_sum(ranks_workload.begin(), ranks_workload.end(), displs.begin() + 1);
  displs.pop_back();

  MPI_Gatherv(in, ranks_workload[my_rank], MPITypeMap<T>::value(), out, ranks_workload.data(),
              displs.data(), MPITypeMap<T>::value(), root_id, MPIProcessorGrouping::get());
}
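
For concreteness, a worked example of the recvcounts/displs computation with made-up sizes:

// total_size = 10, mpi_size = 4:
//   local_work = 2, more_work_before_index = 2
//   ranks_workload = {3, 3, 2, 2}   (ranks 0 and 1 take one extra element)
//   displs         = {0, 3, 6, 8}   (exclusive prefix sums of the workloads)
// Rank i's slice lands at out[displs[i]] .. out[displs[i] + ranks_workload[i] - 1] on the root.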

template <typename Scalar>
void MPICollectiveSum::delayedSum(Scalar& obj) {
  delayedSum(&obj, 1);
dca/parallel/no_concurrency/serial_collective_sum.hpp  +3 −0
@@ -39,6 +39,9 @@ public:
  template<class T>
  void localSum(const T& , int ){}

  template<class T>
  void gatherv(const T& , int ){}

  template <class T>
  void delayedSum(T&) const {}
  void resolveSums() const {}
dca/parallel/util/get_workload.hpp  +33 −0
@@ -42,6 +42,39 @@ int getWorkload(const unsigned int total_work, const unsigned int n_local_worker
  return getWorkload(local_work, n_local_workers, local_id);
}

/** Returns the first and the last linear index of this rank's compute range; the last index is
 *  inclusive, not last + 1.
 *
 *  i.e. write the loop as: for (index i = start; i <= end; ++i) ...
 *  start and end are integral indices (to be converted to subindices as needed), not iterators.
 */
inline void getComputeRange(const int& my_rank, const int& mpi_size,
                            const uint64_t& total_G4_size, uint64_t& start, uint64_t& end) {
  uint64_t offset = 0;
  // Check whether the flattened one-dimensional G4 array can be distributed evenly across the
  // ranks. If balanced, each rank computes the same number of elements; if not, the ranks with
  // rank_id < more_work_ranks compute one more element than the others.
  const bool balanced = (total_G4_size % static_cast<uint64_t>(mpi_size)) == 0;
  const uint64_t local_work = total_G4_size / static_cast<uint64_t>(mpi_size);

  if (balanced) {
    offset = static_cast<uint64_t>(my_rank) * local_work;
    end = offset + local_work - 1;
  }
  else {
    const uint64_t more_work_ranks = total_G4_size % static_cast<uint64_t>(mpi_size);

    if (static_cast<uint64_t>(my_rank) < more_work_ranks) {
      offset = static_cast<uint64_t>(my_rank) * (local_work + 1);
      end = offset + local_work;  // local_work + 1 elements, end is inclusive
    }
    else {
      offset = more_work_ranks * (local_work + 1) +
               (static_cast<uint64_t>(my_rank) - more_work_ranks) * local_work;
      end = offset + local_work - 1;  // local_work elements, end is inclusive
    }
  }
  start = offset;
}
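
A quick check with the same made-up sizes (total_G4_size = 10, mpi_size = 4, so local_work = 2 and more_work_ranks = 2):

// rank 0: start = 0, end = 2   (3 elements)     rank 2: start = 6, end = 7   (2 elements)
// rank 1: start = 3, end = 5   (3 elements)     rank 3: start = 8, end = 9   (2 elements)
// These inclusive ranges line up with the recvcounts/displs computed in gatherv_helper.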

}  // util
}  // parallel
}  // dca