From d28b7fd6c7d3d07d744c8271b6ccde952446011c Mon Sep 17 00:00:00 2001
From: Simon Heybrock <simon.heybrock@esss.se>
Date: Tue, 14 Nov 2017 15:57:37 +0100
Subject: [PATCH] Re #21181. Naive MPI implementation of IndexInfo
 det->spectrum mapper.

Probably very inefficient.
---
 .../MantidIndexing/SpectrumNumberTranslator.h |  2 +
 Framework/Indexing/src/IndexInfo.cpp          | 89 ++++++++++++++-----
 .../Indexing/src/SpectrumNumberTranslator.cpp |  8 ++
 Framework/Indexing/test/IndexInfoTest.h       | 28 ++++--
 4 files changed, 100 insertions(+), 27 deletions(-)

diff --git a/Framework/Indexing/inc/MantidIndexing/SpectrumNumberTranslator.h b/Framework/Indexing/inc/MantidIndexing/SpectrumNumberTranslator.h
index 974d19df399..486e19ed5fc 100644
--- a/Framework/Indexing/inc/MantidIndexing/SpectrumNumberTranslator.h
+++ b/Framework/Indexing/inc/MantidIndexing/SpectrumNumberTranslator.h
@@ -70,6 +70,8 @@ public:
   SpectrumIndexSet
   makeIndexSet(const std::vector<GlobalSpectrumIndex> &globalIndices) const;
 
+  PartitionIndex partitionOf(const GlobalSpectrumIndex globalIndex) const;
+
 private:
   bool isPartitioned() const;
   void checkUniqueSpectrumNumbers() const;
diff --git a/Framework/Indexing/src/IndexInfo.cpp b/Framework/Indexing/src/IndexInfo.cpp
index 30e91a8dd85..40da999a075 100644
--- a/Framework/Indexing/src/IndexInfo.cpp
+++ b/Framework/Indexing/src/IndexInfo.cpp
@@ -1,6 +1,7 @@
 #include "MantidIndexing/IndexInfo.h"
 #include "MantidIndexing/RoundRobinPartitioner.h"
 #include "MantidIndexing/SpectrumNumberTranslator.h"
+#include "MantidParallel/Collectives.h"
 #include "MantidParallel/Communicator.h"
 #include "MantidKernel/make_cow.h"
 #include "MantidKernel/make_unique.h"
@@ -226,10 +227,6 @@ SpectrumIndexSet IndexInfo::makeIndexSet(
 std::vector<GlobalSpectrumIndex>
 IndexInfo::globalSpectrumIndicesFromDetectorIndices(
     const std::vector<size_t> &detectorIndices) const {
-  if (m_communicator->size() != 1)
-    throw std::runtime_error("IndexInfo::"
-                             "globalSpectrumIndicesFromDetectorIndices does "
-                             "not support MPI runs");
   if (!m_spectrumDefinitions)
     throw std::runtime_error("IndexInfo::"
                              "globalSpectrumIndicesFromDetectorIndices -- no "
@@ -244,29 +241,81 @@ IndexInfo::globalSpectrumIndicesFromDetectorIndices(
     detectorMap[index] = 1;
   }
 
-  std::vector<GlobalSpectrumIndex> spectrumIndices;
+  // Global vector of spectrum definitions. For this purpose we do not need
+  // actual definitions which would be hard to transmit via MPI (many small
+  // vectors of unknown length). Either single detector or error flag.
+  std::vector<std::vector<int64_t>> spectrumDefinitions(communicator().size());
+  auto &thisRankSpectrumDefinitions =
+      spectrumDefinitions[communicator().rank()];
+  thisRankSpectrumDefinitions.resize(size());
   for (size_t i = 0; i < size(); ++i) {
     const auto &spectrumDefinition = m_spectrumDefinitions->operator[](i);
     if (spectrumDefinition.size() == 1) {
       const auto detectorIndex = spectrumDefinition[0].first;
-      if (detectorMap.size() > detectorIndex &&
-          detectorMap[detectorIndex] != 0) {
-        if (detectorMap[detectorIndex] > 1)
-          throw std::runtime_error(
-              "Multiple spectra correspond to the same detector");
-        // Increment flag to catch two spectra mapping to same detector.
-        ++detectorMap[detectorIndex];
-        spectrumIndices.push_back(i);
-      }
+      thisRankSpectrumDefinitions[i] = detectorIndex;
     }
+    // detectorIndex is unsigned so we can use negative values as error flags.
+    if (spectrumDefinition.size() == 0)
+      thisRankSpectrumDefinitions[i] = -1;
     if (spectrumDefinition.size() > 1)
-      throw std::runtime_error("SpectrumDefinition contains multiple entries. "
-                               "No unique mapping from detector to spectrum "
-                               "possible");
+      thisRankSpectrumDefinitions[i] = -2;
   }
-  if (detectorIndices.size() != spectrumIndices.size())
-    throw std::runtime_error(
-        "Some of the requested detectors do not have a corresponding spectrum");
+
+  std::vector<size_t> allSizes;
+  Parallel::gather(communicator(), size(), allSizes, 0);
+  std::vector<GlobalSpectrumIndex> spectrumIndices;
+  if (communicator().rank() == 0) {
+    for (int rank = 1; rank < communicator().size(); ++rank) {
+      spectrumDefinitions[rank].resize(allSizes[rank]);
+      int tag = 0;
+      auto buffer = reinterpret_cast<char *>(spectrumDefinitions[rank].data());
+      auto bytes = static_cast<int>(sizeof(int64_t) * allSizes[rank]);
+      communicator().recv(rank, tag, buffer, bytes);
+    }
+    std::vector<size_t> currentIndex(communicator().size(), 0);
+    for (size_t i = 0; i < globalSize(); ++i) {
+      int rank = static_cast<int>(
+          m_spectrumNumberTranslator->partitionOf(GlobalSpectrumIndex(i)));
+      const auto spectrumDefinition =
+          spectrumDefinitions[rank][currentIndex[rank]++];
+      if (spectrumDefinition >= 0) {
+        const auto detectorIndex = static_cast<size_t>(spectrumDefinition);
+        if (detectorMap.size() > detectorIndex &&
+            detectorMap[detectorIndex] != 0) {
+          if (detectorMap[detectorIndex] > 1)
+            throw std::runtime_error(
+                "Multiple spectra correspond to the same detector");
+          // Increment flag to catch two spectra mapping to same detector.
+          ++detectorMap[detectorIndex];
+          spectrumIndices.push_back(i);
+        }
+      }
+      if (spectrumDefinition == -2)
+        throw std::runtime_error(
+            "SpectrumDefinition contains multiple entries. "
+            "No unique mapping from detector to spectrum "
+            "possible");
+    }
+    if (detectorIndices.size() != spectrumIndices.size())
+      throw std::runtime_error("Some of the requested detectors do not have a "
+                               "corresponding spectrum");
+    for (int rank = 1; rank < communicator().size(); ++rank) {
+      int tag = 0;
+      auto buffer = reinterpret_cast<char *>(spectrumIndices.data());
+      auto bytes = static_cast<int>(sizeof(int64_t) * spectrumIndices.size());
+      communicator().send(rank, tag, buffer, bytes);
+    }
+  } else {
+    int tag = 0;
+    auto buffer = reinterpret_cast<char *>(thisRankSpectrumDefinitions.data());
+    auto bytes = static_cast<int>(sizeof(int64_t) * size());
+    communicator().send(0, tag, buffer, bytes);
+    spectrumIndices.resize(detectorIndices.size());
+    buffer = reinterpret_cast<char *>(spectrumIndices.data());
+    bytes = static_cast<int>(sizeof(int64_t) * spectrumIndices.size());
+    communicator().recv(0, tag, buffer, bytes);
+  }
+
   return spectrumIndices;
 }
 
diff --git a/Framework/Indexing/src/SpectrumNumberTranslator.cpp b/Framework/Indexing/src/SpectrumNumberTranslator.cpp
index ccbacc4393e..8802e2212e1 100644
--- a/Framework/Indexing/src/SpectrumNumberTranslator.cpp
+++ b/Framework/Indexing/src/SpectrumNumberTranslator.cpp
@@ -153,6 +153,14 @@ SpectrumIndexSet SpectrumNumberTranslator::makeIndexSet(
   return SpectrumIndexSet(indices, m_globalToLocal.size());
 }
 
+PartitionIndex SpectrumNumberTranslator::partitionOf(
+    const GlobalSpectrumIndex globalIndex) const {
+  checkUniqueSpectrumNumbers();
+  const auto spectrumNumber =
+      m_globalSpectrumNumbers[static_cast<size_t>(globalIndex)];
+  return m_spectrumNumberToPartition.at(spectrumNumber);
+}
+
 void SpectrumNumberTranslator::checkUniqueSpectrumNumbers() const {
   // To support legacy code that creates workspaces with duplicate spectrum
   // numbers we check for bad spectrum numbers only when needed, i.e., when
diff --git a/Framework/Indexing/test/IndexInfoTest.h b/Framework/Indexing/test/IndexInfoTest.h
index d825af1f46a..57edc2887c7 100644
--- a/Framework/Indexing/test/IndexInfoTest.h
+++ b/Framework/Indexing/test/IndexInfoTest.h
@@ -97,13 +97,27 @@ void run_construct_from_parent_StorageMode_Distributed(
 
 void run_globalSpectrumIndicesFromDetectorIndices_Distributed(
     const Parallel::Communicator &comm) {
-  IndexInfo i(47, Parallel::StorageMode::Distributed, comm);
-  if (comm.size() != 1)
-    TS_ASSERT_THROWS_EQUALS(
-        i.globalSpectrumIndicesFromDetectorIndices({}),
-        const std::runtime_error &e, std::string(e.what()),
-        "IndexInfo::globalSpectrumIndicesFromDetectorIndices "
-        "does not support MPI runs");
+  IndexInfo i(666, Parallel::StorageMode::Distributed, comm);
+  // Out of order
+  std::vector<size_t> detectorIndices{100, 101, 102, 200, 199};
+  std::vector<SpectrumDefinition> specDefs(i.size());
+  size_t current = 0;
+  for (size_t spec = 0; spec < 200; ++spec) {
+    if (i.isOnThisPartition(GlobalSpectrumIndex(spec))) {
+      if (spec > 42)
+        specDefs[current].add(spec + 1);
+      ++current;
+    }
+  }
+  i.setSpectrumDefinitions(specDefs);
+  const auto &indices =
+      i.globalSpectrumIndicesFromDetectorIndices(detectorIndices);
+  TS_ASSERT_EQUALS(indices.size(), 5);
+  TS_ASSERT_EQUALS(indices[0], 99);
+  TS_ASSERT_EQUALS(indices[1], 100);
+  TS_ASSERT_EQUALS(indices[2], 101);
+  TS_ASSERT_EQUALS(indices[3], 198);
+  TS_ASSERT_EQUALS(indices[4], 199);
 }
 }
 
-- 
GitLab