Commit 48086d33 authored by Weile

Add bidirectional ring communication; support is still rudimentary and needs cleanup

parent 66556a17
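
The change implements a bidirectional pipeline ring: on odd measurement steps each rank circulates its single-particle Green's function buffers around the ring in both directions at once, pairing a receive-from-left with a send-to-right (the "odd" direction) and a receive-from-right with a send-to-left (the "even" direction), and keeping the two directions apart by MPI tag (thread_id_ + 1 versus thread_id_ + 1 + nr_accumulators_). Below is a minimal, self-contained sketch of that communication pattern, independent of the DCA++ classes in this diff; the buffer contents, message size, and tag values are illustrative assumptions, not the accumulator's actual data.

#include <mpi.h>
#include <vector>

int main(int argc, char** argv) {
  MPI_Init(&argc, &argv);

  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  const int right = (rank + 1) % size;        // "odd" direction: receive from left, send to right
  const int left = (rank + size - 1) % size;  // "even" direction: receive from right, send to left
  const int tag_odd = 1;                      // stands in for thread_id_ + 1
  const int tag_even = 2;                     // stands in for thread_id_ + 1 + nr_accumulators_

  const int n = 1024;                         // illustrative message size
  std::vector<double> odd_send(n, rank), odd_recv(n);
  std::vector<double> even_send(n, rank), even_recv(n);

  // One full pipeline: after size - 1 steps every rank has seen
  // every other rank's payload once from each direction.
  for (int step = 0; step < size - 1; ++step) {
    MPI_Request reqs[4];
    // Post the receives first so each incoming message has a buffer waiting.
    MPI_Irecv(odd_recv.data(), n, MPI_DOUBLE, left, tag_odd, MPI_COMM_WORLD, &reqs[0]);
    MPI_Irecv(even_recv.data(), n, MPI_DOUBLE, right, tag_even, MPI_COMM_WORLD, &reqs[1]);
    MPI_Isend(odd_send.data(), n, MPI_DOUBLE, right, tag_odd, MPI_COMM_WORLD, &reqs[2]);
    MPI_Isend(even_send.data(), n, MPI_DOUBLE, left, tag_even, MPI_COMM_WORLD, &reqs[3]);
    MPI_Waitall(4, reqs, MPI_STATUSES_IGNORE);

    // ... consume odd_recv and even_recv here (the analogue of updateG4) ...

    // The data just received becomes the payload forwarded in the next step.
    odd_send.swap(odd_recv);
    even_send.swap(even_recv);
  }

  MPI_Finalize();
  return 0;
}

Posting both receives before the matching sends and swapping each receive buffer into the send role afterwards mirrors what perform_one_communication_step below does with G_, even_G_, and the odd/even send buffers; after mpi_size - 1 steps every rank has accumulated every other rank's G2 contribution into its G4.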
+80 −41
@@ -114,19 +114,25 @@ private:
 
   void perform_one_communication_step(float& flop);
 
-  float updateG4(const std::size_t channel_index);
+  float updateG4(const std::size_t channel_index, std::array<RMatrix, 2>& G_array);
 
-  void send(const std::array<RMatrix, 2>& data, int target, std::array<MPI_Request, 2>& request);
-  void receive(std::array<RMatrix, 2>& data, int source, std::array<MPI_Request, 2>& request);
+  void send(const std::array<RMatrix, 2>& data, int target, std::array<MPI_Request, 2>& request,
+            const bool odd);
+  void receive(std::array<RMatrix, 2>& data, int source, std::array<MPI_Request, 2>& request,
+               const bool odd);
 
   using BaseClass::channels_;
   using BaseClass::G4_;
 
   // send buffer for pipeline ring algorithm
+  std::array<RMatrix, 2> even_G_;
   std::array<RMatrix, 2> odd_sendbuff_G_;
+  std::array<RMatrix, 2> even_sendbuff_G_;
 
-  std::array<MPI_Request, 2> recv_requests_{MPI_REQUEST_NULL, MPI_REQUEST_NULL};
-  std::array<MPI_Request, 2> send_requests_{MPI_REQUEST_NULL, MPI_REQUEST_NULL};
+  std::array<MPI_Request, 2> odd_recv_requests_{MPI_REQUEST_NULL, MPI_REQUEST_NULL};
+  std::array<MPI_Request, 2> even_recv_requests_{MPI_REQUEST_NULL, MPI_REQUEST_NULL};
+  std::array<MPI_Request, 2> odd_send_requests_{MPI_REQUEST_NULL, MPI_REQUEST_NULL};
+  std::array<MPI_Request, 2> even_send_requests_{MPI_REQUEST_NULL, MPI_REQUEST_NULL};
 
 #ifndef DCA_HAVE_CUDA_AWARE_MPI
   std::array<std::vector<Complex>, 2> sendbuffer_;
@@ -181,7 +187,7 @@ float TpAccumulator<Parameters, linalg::GPU, DistType::MPI>::accumulate(
   BaseClass::computeG();
 
   for (std::size_t channel = 0; channel < G4_.size(); ++channel) {
-    flop += updateG4(channel);
+    flop += updateG4(channel, G_);
   }
 
   ringG(flop, meas_id);
@@ -256,7 +262,8 @@ void TpAccumulator<Parameters, linalg::GPU, DistType::MPI>::resetG4() {
 }
 
 template <class Parameters>
-float TpAccumulator<Parameters, linalg::GPU, DistType::MPI>::updateG4(const std::size_t channel_index) {
+float TpAccumulator<Parameters, linalg::GPU, DistType::MPI>::updateG4(
+    const std::size_t channel_index, std::array<RMatrix, 2>& G_array) {
   // G4 is stored with the following band convention:
   // b1 ------------------------ b3
   //        |           |
@@ -272,33 +279,39 @@ float TpAccumulator<Parameters, linalg::GPU, DistType::MPI>::updateG4(const std:
   switch (channel) {
     case PARTICLE_HOLE_TRANSVERSE:
       return details::updateG4<Real, PARTICLE_HOLE_TRANSVERSE>(
-          get_G4()[channel_index].ptr(), G_[0].ptr(), G_[0].leadingDimension(), G_[1].ptr(),
-          G_[1].leadingDimension(), sign_, multiple_accumulators_, queues_[0], start_, end_);
+          get_G4()[channel_index].ptr(), G_array[0].ptr(), G_array[0].leadingDimension(),
+          G_array[1].ptr(), G_array[1].leadingDimension(), sign_, multiple_accumulators_,
+          queues_[0], start_, end_);
 
     case PARTICLE_HOLE_MAGNETIC:
       return details::updateG4<Real, PARTICLE_HOLE_MAGNETIC>(
-          get_G4()[channel_index].ptr(), G_[0].ptr(), G_[0].leadingDimension(), G_[1].ptr(),
-          G_[1].leadingDimension(), sign_, multiple_accumulators_, queues_[0], start_, end_);
+          get_G4()[channel_index].ptr(), G_array[0].ptr(), G_array[0].leadingDimension(),
+          G_array[1].ptr(), G_array[1].leadingDimension(), sign_, multiple_accumulators_,
+          queues_[0], start_, end_);
 
     case PARTICLE_HOLE_CHARGE:
       return details::updateG4<Real, PARTICLE_HOLE_CHARGE>(
-          get_G4()[channel_index].ptr(), G_[0].ptr(), G_[0].leadingDimension(), G_[1].ptr(),
-          G_[1].leadingDimension(), sign_, multiple_accumulators_, queues_[0], start_, end_);
+          get_G4()[channel_index].ptr(), G_array[0].ptr(), G_array[0].leadingDimension(),
+          G_array[1].ptr(), G_array[1].leadingDimension(), sign_, multiple_accumulators_,
+          queues_[0], start_, end_);
 
     case PARTICLE_HOLE_LONGITUDINAL_UP_UP:
       return details::updateG4<Real, PARTICLE_HOLE_LONGITUDINAL_UP_UP>(
-          get_G4()[channel_index].ptr(), G_[0].ptr(), G_[0].leadingDimension(), G_[1].ptr(),
-          G_[1].leadingDimension(), sign_, multiple_accumulators_, queues_[0], start_, end_);
+          get_G4()[channel_index].ptr(), G_array[0].ptr(), G_array[0].leadingDimension(),
+          G_array[1].ptr(), G_array[1].leadingDimension(), sign_, multiple_accumulators_,
+          queues_[0], start_, end_);
 
     case PARTICLE_HOLE_LONGITUDINAL_UP_DOWN:
       return details::updateG4<Real, PARTICLE_HOLE_LONGITUDINAL_UP_DOWN>(
-          get_G4()[channel_index].ptr(), G_[0].ptr(), G_[0].leadingDimension(), G_[1].ptr(),
-          G_[1].leadingDimension(), sign_, multiple_accumulators_, queues_[0], start_, end_);
+          get_G4()[channel_index].ptr(), G_array[0].ptr(), G_array[0].leadingDimension(),
+          G_array[1].ptr(), G_array[1].leadingDimension(), sign_, multiple_accumulators_,
+          queues_[0], start_, end_);
 
     case PARTICLE_PARTICLE_UP_DOWN:
       return details::updateG4<Real, PARTICLE_PARTICLE_UP_DOWN>(
-          get_G4()[channel_index].ptr(), G_[0].ptr(), G_[0].leadingDimension(), G_[1].ptr(),
-          G_[1].leadingDimension(), sign_, multiple_accumulators_, queues_[0], start_, end_);
+          get_G4()[channel_index].ptr(), G_array[0].ptr(), G_array[0].leadingDimension(),
+          G_array[1].ptr(), G_array[1].leadingDimension(), sign_, multiple_accumulators_,
+          queues_[0], start_, end_);
 
     default:
       throw std::logic_error("Specified four point type not implemented.");
@@ -320,25 +333,33 @@ void TpAccumulator<Parameters, linalg::GPU, DistType::MPI>::perform_one_communic
   //      b) and, local measurements are equal, and each accumulator should have same #measurement, i.e.
   //         measurements % ranks == 0 && local_measurement % threads == 0.
 
-  send(odd_sendbuff_G_, right_neighbor, send_requests_);
-  receive(G_, left_neighbor, recv_requests_);
+  receive(G_, left_neighbor, odd_recv_requests_, true);
+  receive(even_G_, right_neighbor, even_recv_requests_, false);
+  send(odd_sendbuff_G_, right_neighbor, odd_send_requests_, true);
+  send(even_sendbuff_G_, left_neighbor, even_send_requests_, false);
 
   // wait for G2 to be available again
   for (int s = 0; s < 2; ++s)
-    MPI_Wait(&recv_requests_[s], MPI_STATUSES_IGNORE);
+    MPI_Wait(&odd_recv_requests_[s], MPI_STATUSES_IGNORE);
+  for (int s = 0; s < 2; ++s)
+    MPI_Wait(&even_recv_requests_[s], MPI_STATUSES_IGNORE);
 
   // use newly copied G2 to update G4
   for (std::size_t channel = 0; channel < G4_.size(); ++channel) {
-    flop += updateG4(channel);
+    flop += updateG4(channel, G_);
+    flop += updateG4(channel, even_G_);
   }
 
   // wait for sendbuf_G2 to be available again
   for (int s = 0; s < 2; ++s)
-    MPI_Wait(&send_requests_[s], MPI_STATUSES_IGNORE);
+    MPI_Wait(&odd_send_requests_[s], MPI_STATUSES_IGNORE);
+  for (int s = 0; s < 2; ++s)
+    MPI_Wait(&even_send_requests_[s], MPI_STATUSES_IGNORE);
 
   // get ready for send again
   for (int s = 0; s < 2; ++s) {
     odd_sendbuff_G_[s].swap(G_[s]);
+    even_sendbuff_G_[s].swap(even_G_[s]);
   }
 }

@@ -347,7 +368,7 @@ void TpAccumulator<Parameters, linalg::GPU, DistType::MPI>::ringG(float& flop, c
   auto is_odd = [](int const i) -> bool { return ((i % 2) == 1); };
   bool const perform_bidirectional_ring = is_odd(meas_id);
 
-//  if (perform_bidirectional_ring == true) {
+  if (perform_bidirectional_ring == true) {
     // get ready for current send and receive (i.e. meas_id = 1, 3, 5...)
     for (int s = 0; s < 2; ++s) {
       odd_sendbuff_G_[s] = G_[s];
@@ -356,14 +377,14 @@ void TpAccumulator<Parameters, linalg::GPU, DistType::MPI>::ringG(float& flop, c
     for (int icount = 0; icount < (mpi_size - 1); icount++) {
       perform_one_communication_step(flop);
     }
-//  }
-//  else {
-//    // store G2 in even measurement step (i.e. meas_id = 0, 2, 4...) as a previous G2
-//    for (int s = 0; s < 2; ++s) {
-//      even_G_[s] = G_[s];
-//      even_sendbuff_G_[s] = G_[s];
-//    }
-//  }
+  }
+  else {
+    // store G2 in even measurement steps (i.e. meas_id = 0, 2, 4...) as the even G2
+    for (int s = 0; s < 2; ++s) {
+      even_G_[s] = G_[s];
+      even_sendbuff_G_[s] = G_[s];
+    }
+  }
 }
 
 template <class Parameters>
@@ -375,15 +396,25 @@ auto TpAccumulator<Parameters, linalg::GPU, DistType::MPI>::get_G4() -> std::vec
 template <class Parameters>
 void TpAccumulator<Parameters, linalg::GPU, DistType::MPI>::send(const std::array<RMatrix, 2>& data,
                                                                  int target,
-                                                                 std::array<MPI_Request, 2>& request) {
+                                                                 std::array<MPI_Request, 2>& request,
+                                                                 const bool odd) {
   using dca::parallel::MPITypeMap;
   const auto g_size = data[0].size().first * data[0].size().second;
 
 #ifdef DCA_HAVE_CUDA_AWARE_MPI
+  if (odd) {
+    for (int s = 0; s < 2; ++s) {
+      MPI_Isend(data[s].ptr(), g_size, MPITypeMap<Complex>::value(), target, thread_id_ + 1,
+                MPI_COMM_WORLD, &request[s]);
+    }
+  }
+  else {
+    for (int s = 0; s < 2; ++s) {
+      MPI_Isend(data[s].ptr(), g_size, MPITypeMap<Complex>::value(), target,
+                thread_id_ + 1 + nr_accumulators_, MPI_COMM_WORLD, &request[s]);
+    }
+  }
 
 #else
 
   for (int s = 0; s < 2; ++s) {
@@ -399,15 +430,23 @@ void TpAccumulator<Parameters, linalg::GPU, DistType::MPI>::send(const std::arra
 
 template <class Parameters>
 void TpAccumulator<Parameters, linalg::GPU, DistType::MPI>::receive(
-    std::array<RMatrix, 2>& data, int source, std::array<MPI_Request, 2>& request) {
+    std::array<RMatrix, 2>& data, int source, std::array<MPI_Request, 2>& request, const bool odd) {
   using dca::parallel::MPITypeMap;
   const auto g_size = data[0].size().first * data[0].size().second;
 
 #ifdef DCA_HAVE_CUDA_AWARE_MPI
+  if (odd) {
+    for (int s = 0; s < 2; ++s) {
+      MPI_Irecv(data[s].ptr(), g_size, MPITypeMap<Complex>::value(), source, thread_id_ + 1,
+                MPI_COMM_WORLD, &request[s]);
+    }
+  }
+  else {
+    for (int s = 0; s < 2; ++s) {
+      MPI_Irecv(data[s].ptr(), g_size, MPITypeMap<Complex>::value(), source,
+                thread_id_ + 1 + nr_accumulators_, MPI_COMM_WORLD, &request[s]);
+    }
+  }
 
 #else
   for (int s = 0; s < 2; ++s) {