// Mantid Repository : https://github.com/mantidproject/mantid
//
// Copyright © 2017 ISIS Rutherford Appleton Laboratory UKRI,
//     NScD Oak Ridge National Laboratory, European Spallation Source
//     & Institut Laue - Langevin
// SPDX - License - Identifier: GPL - 3.0 +
#pragma once

#include "MantidParallel/Collectives.h"
#include "MantidParallel/Communicator.h"
#include "MantidParallel/DllConfig.h"
#include "MantidParallel/IO/Chunker.h"
#include "MantidParallel/IO/EventDataPartitioner.h"
#include "MantidParallel/Nonblocking.h"
#include "MantidTypes/Event/TofEvent.h"

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <limits>
#include <memory>
#include <numeric>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

#include <xmmintrin.h>

using Mantid::Types::Event::TofEvent;

namespace Mantid {
namespace Parallel {
namespace IO {

/** Distributed (MPI) parsing of Nexus events from a data stream. Data is
distributed across MPI ranks for writing to event lists on the correct target
rank.

@author Lamar Moore
@date 2017
*/
namespace detail {
38
39
40
void MANTID_PARALLEL_DLL eventIdToGlobalSpectrumIndex(int32_t *event_id_start,
                                                      size_t count,
                                                      const int32_t bankOffset);
41
42
}

43
template <class TimeOffsetType> class EventParser {
LamarMoore's avatar
LamarMoore committed
44
public:
45
  using Event = detail::Event<TimeOffsetType>;
46
47
  EventParser(const Communicator &comm,
              std::vector<std::vector<int>> rankGroups,
48
              std::vector<int32_t> bankOffsets,
49
              std::vector<std::vector<Types::Event::TofEvent> *> eventLists);
LamarMoore's avatar
LamarMoore committed
50

LamarMoore's avatar
LamarMoore committed
51
52
53
  void setEventDataPartitioner(
      std::unique_ptr<AbstractEventDataPartitioner<TimeOffsetType>>
          partitioner);
54
  void setEventTimeOffsetUnit(const std::string &unit);
55

56
  void startAsync(int32_t *event_id_start,
57
                  const TimeOffsetType *event_time_offset_start,
58
                  const Chunker::LoadRange &range);
59

60
  void wait();
LamarMoore's avatar
LamarMoore committed
61
62

private:
63
  void doParsing(int32_t *event_id_start,
64
                 const TimeOffsetType *event_time_offset_start,
65
                 const Chunker::LoadRange &range);
66

67
  void redistributeDataMPI();
68
69
70
71
  void populateEventLists();

  // Default to 0 such that failure to set unit is easily detected.
  double m_timeOffsetScale{0.0};
72
  Communicator m_comm;
73
  std::vector<std::vector<int>> m_rankGroups;
LamarMoore's avatar
LamarMoore committed
74
  std::vector<int32_t> m_bankOffsets;
75
  std::vector<std::vector<Types::Event::TofEvent> *> m_eventLists;
76
  std::unique_ptr<AbstractEventDataPartitioner<TimeOffsetType>> m_partitioner;
77
78
  std::vector<std::vector<Event>> m_allRankData;
  std::vector<Event> m_thisRankData;
79
  std::thread m_thread;
LamarMoore's avatar
LamarMoore committed
80
81
};

82
83
84
85
86
87
88
89
90
91
92
93
94
/** Constructor for EventParser.
 *
 * @param rankGroups rank grouping for banks which determines how work is
 * partitioned. The EventParser guarantees to process data obtained from ranks
 * in the same group in-order to ensure pulse time ordering.
 * @param bankOffsets used to convert from event ID to global spectrum index.
 * This assumes that all event IDs within a bank a contiguous.
 * @param eventLists workspace event lists which will be populated by the
 * parser. The parser assumes that there always is a matching event list for any
 * event ID that will be passed in via `startAsync`.
 * @param globalToLocalSpectrumIndex lookup table which converts a global
 * spectrum index to a spectrum index local to a given mpi rank
 */
95
96
template <class TimeOffsetType>
EventParser<TimeOffsetType>::EventParser(
97
98
    const Communicator &comm, std::vector<std::vector<int>> rankGroups,
    std::vector<int32_t> bankOffsets,
99
    std::vector<std::vector<TofEvent> *> eventLists)
100
    : m_comm(comm), m_rankGroups(std::move(rankGroups)),
101
102
103
      m_bankOffsets(std::move(bankOffsets)),
      m_eventLists(std::move(eventLists)) {}

104
/// Set the EventDataPartitioner to use for parsing subsequent events.
105
template <class TimeOffsetType>
106
107
void EventParser<TimeOffsetType>::setEventDataPartitioner(
    std::unique_ptr<AbstractEventDataPartitioner<TimeOffsetType>> partitioner) {
108
109
110
  // We hold (and use) the PulseTimeGenerator via a virtual base class to avoid
  // the need of having IndexType and TimeZeroType as templates for the whole
  // class.
111
  m_partitioner = std::move(partitioner);
112
113
}

114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
/** Set the unit of the values in `event_time_offset`.
 *
 * Stores the factor that converts time-of-flight values given in `unit` to
 * microseconds, the unit used by TofEvent.
 *
 * @param unit one of "second", "microsecond" or "nanosecond" (exact,
 * case-sensitive match).
 * @throws std::runtime_error if `unit` is not one of the supported units.
 */
template <class TimeOffsetType>
void EventParser<TimeOffsetType>::setEventTimeOffsetUnit(
    const std::string &unit) {
  struct UnitScale {
    const char *name;
    double toMicroseconds;
  };
  const UnitScale knownUnits[] = {
      {"second", 1e6}, {"microsecond", 1.0}, {"nanosecond", 1e-3}};

  for (const auto &known : knownUnits) {
    if (unit == known.name) {
      m_timeOffsetScale = known.toMicroseconds;
      return;
    }
  }
  throw std::runtime_error("EventParser: unsupported unit `" + unit +
                           "` for event_time_offset");
}

/// Convert m_allRankData into m_thisRankData by means of redistribution via
/// MPI.
143
template <class TimeOffsetType>
144
145
146
void EventParser<TimeOffsetType>::redistributeDataMPI() {
  if (m_comm.size() == 1) {
    m_thisRankData = m_allRankData.front();
147
148
149
    return;
  }

150
151
  std::vector<int> sizes(m_allRankData.size());
  std::transform(m_allRankData.cbegin(), m_allRankData.cend(), sizes.begin(),
152
                 [](const std::vector<Event> &vec) {
153
154
                   return static_cast<int>(vec.size());
                 });
155
156
  std::vector<int> recv_sizes(m_allRankData.size());
  Parallel::all_to_all(m_comm, sizes, recv_sizes);
157

158
  auto total_size = std::accumulate(recv_sizes.begin(), recv_sizes.end(), 0);
159
  m_thisRankData.resize(total_size);
160
161
  size_t offset = 0;
  std::vector<Parallel::Request> recv_requests;
162
  for (int rank = 0; rank < m_comm.size(); ++rank) {
163
164
    if (recv_sizes[rank] == 0)
      continue;
165
    int tag = 0;
166
    auto buffer = reinterpret_cast<char *>(m_thisRankData.data() + offset);
167
    int size = recv_sizes[rank] * static_cast<int>(sizeof(Event));
168
    recv_requests.emplace_back(m_comm.irecv(rank, tag, buffer, size));
169
170
171
172
    offset += recv_sizes[rank];
  }

  std::vector<Parallel::Request> send_requests;
173
174
  for (int rank = 0; rank < m_comm.size(); ++rank) {
    const auto &vec = m_allRankData[rank];
175
176
    if (vec.size() == 0)
      continue;
177
178
    int tag = 0;
    send_requests.emplace_back(
179
180
        m_comm.isend(rank, tag, reinterpret_cast<const char *>(vec.data()),
                     static_cast<int>(vec.size() * sizeof(Event))));
181
182
183
184
185
186
  }

  Parallel::wait_all(send_requests.begin(), send_requests.end());
  Parallel::wait_all(recv_requests.begin(), recv_requests.end());
}

187
/// Append events in m_thisRankData to m_eventLists.
188
template <class TimeOffsetType>
189
190
191
192
void EventParser<TimeOffsetType>::populateEventLists() {
  for (const auto &event : m_thisRankData) {
    m_eventLists[event.index]->emplace_back(
        m_timeOffsetScale * static_cast<double>(event.tof), event.pulseTime);
193
194
195
196
    // In general `index` is random so this loop suffers from frequent cache
    // misses (probably because the hardware prefetchers cannot keep up with the
    // number of different memory locations that are getting accessed). We
    // manually prefetch into L2 cache to reduce the amount of misses.
197
198
199
    _mm_prefetch(
        reinterpret_cast<char *>(&m_eventLists[event.index]->back() + 1),
        _MM_HINT_T1);
200
201
202
203
204
205
206
207
208
209
210
211
212
213
  }
}

/** Accepts raw data from file which has been pre-treated and sorted into chunks
 * for parsing. The parser extracts event data from the provided buffers,
 * separates then according to MPI ranks and then appends them to the workspace
 * event list. Asynchronously starts parsing wait() must be called before
 * attempting to invoke this method subsequently.
 * @param event_id_start Buffer containing event IDs.
 * @param event_time_offset_start Buffer containing TOD.
 * @param range contains information on the detector bank which corresponds to
 * the data in the buffers, the file index offset where data starts and the
 * number of elements in the data array.
 */
214
215
template <class TimeOffsetType>
void EventParser<TimeOffsetType>::startAsync(
216
217
    int32_t *event_id_start, const TimeOffsetType *event_time_offset_start,
    const Chunker::LoadRange &range) {
218
  // Wrapped in lambda because std::thread is unable to specialize doParsing on
219
  // its own
220
  m_thread =
221
      std::thread([this, event_id_start, event_time_offset_start, range] {
222
223
        doParsing(event_id_start, event_time_offset_start, range);
      });
224
225
}

226
227
/// Worker-thread body started by startAsync(). It performs four steps:
/// 1. Maps the event IDs to global spectrum indices, in place.
/// 2. Buckets the events by target rank using the partitioner.
/// 3. Exchanges the buckets between ranks.
/// 4. Appends the received events to the event lists.
template <class TimeOffsetType>
void EventParser<TimeOffsetType>::doParsing(
    int32_t *event_id_start, const TimeOffsetType *event_time_offset_start,
    const Chunker::LoadRange &range) {
  const int32_t bankOffset = m_bankOffsets[range.bankIndex];
  detail::eventIdToGlobalSpectrumIndex(event_id_start, range.eventCount,
                                       bankOffset);

  // From here on the ID buffer holds global spectrum indices.
  int32_t *globalSpectrumIndex = event_id_start;
  m_partitioner->partition(m_allRankData, globalSpectrumIndex,
                           event_time_offset_start, range);

  redistributeDataMPI();
  populateEventLists();
}

/// Block until the parse started by the most recent startAsync() has finished.
/// Returns immediately if no parse is in flight (startAsync() was never called,
/// or wait() was already called for it), rather than letting
/// std::thread::join throw std::system_error.
template <class TimeOffsetType> void EventParser<TimeOffsetType>::wait() {
  if (m_thread.joinable())
    m_thread.join();
}

} // namespace IO
} // namespace Parallel
} // namespace Mantid