Commit a0af5e6b authored by Dan Nixon's avatar Dan Nixon
Browse files

Implement grouped parallel event insertion

This is based on the prototype derived from the meeting at the DMSC last December.

The workflow is as follows:
  - Pool all events received from the Kafka stream into a single buffer
  - When the buffer is to be flushed (i.e. events populated in the EventWorkspace):
    - Parallel sort the event buffer by the detector ID/workspace index
    - Determine group boundaries
    - Parallel insert events into EventWorkspace

Also included is the optimisation of using a spec to ID mapping based on
a vector rather than map.
This provides better performance during the initial event buffering
(where this lookup is made).

Initial benchamrking shows 25M events is a intermediate buffer threshold
for LOKI (9 banks) at 10^7 events per second.
This manages to maintain well over 14Hz pulse rate (with no post
processing).
parent dfa0ada6
......@@ -546,6 +546,10 @@ public:
void invalidateCachedSpectrumNumbers();
/// Invalidates the commons bins flag. This is generally called when a method
/// could allow the X values to be changed.
void invalidateCommonBinsFlag() { m_isCommonBinsFlagValid.store(false); }
protected:
/// Protected copy constructor. May be used by childs for cloning.
MatrixWorkspace(const MatrixWorkspace &other);
......@@ -561,10 +565,6 @@ protected:
virtual ISpectrum &getSpectrumWithoutInvalidation(const size_t index) = 0;
/// Invalidates the commons bins flag. This is generally called when a method
/// could allow the X values to be changed.
void invalidateCommonBinsFlag() { m_isCommonBinsFlagValid.store(false); }
void updateCachedDetectorGrouping(const size_t index) const override;
/// A vector of pointers to the axes for this workspace
......
......@@ -77,6 +77,7 @@ public:
return getSpectrumWithoutInvalidation(index);
}
const EventList &getSpectrum(const size_t index) const override;
EventList *getSpectrumUnsafe(const size_t index);
//------------------------------------------------------------
......
......@@ -184,6 +184,21 @@ const EventList &EventWorkspace::getSpectrum(const size_t index) const {
return *data[index];
}
/**
* Returns a pointer to the EventList for a given spectrum in a timely manner.
*
* Very minimal checking and preprocessing is performed by this function and it
* should only be used in tight loops where getSpectrum is too costly.
*
* See the implementation of the non-const getSpectrum to see what is missing.
*
* @param index Workspace index
* @return Pointer to EventList
*/
EventList *EventWorkspace::getSpectrumUnsafe(const size_t index) {
return data[index].get();
}
double EventWorkspace::getTofMin() const { return this->getEventXMin(); }
double EventWorkspace::getTofMax() const { return this->getEventXMax(); }
......
......@@ -137,7 +137,8 @@ protected:
/// Subscriber for the data stream
std::unique_ptr<IKafkaStreamSubscriber> m_dataStream;
/// Mapping of spectrum number to workspace index.
spec2index_map m_specToIdx;
std::vector<size_t> m_specToIdx;
specnum_t m_specToIdxOffset;
/// Start time of the run
Types::Core::DateAndTime m_runStart;
/// Subscriber for the run info stream
......
......@@ -31,7 +31,7 @@ class KafkaEventStreamDecoder;
*/
class DLLExport KafkaEventListener : public API::LiveListener {
public:
KafkaEventListener() = default;
KafkaEventListener();
~KafkaEventListener() override = default;
//----------------------------------------------------------------------
......
......@@ -13,9 +13,10 @@
#include "MantidLiveData/Kafka/IKafkaStreamDecoder.h"
#include "MantidLiveData/Kafka/IKafkaStreamSubscriber.h"
#include <vector>
namespace Mantid {
namespace LiveData {
/**
High-level interface to Kafka event system. It requires
3 topic names of the data streams.
......@@ -24,12 +25,25 @@ namespace LiveData {
thread.
*/
class DLLExport KafkaEventStreamDecoder : public IKafkaStreamDecoder {
public:
struct BufferedPulse {
Types::Core::DateAndTime pulseTime;
int periodNumber;
};
struct BufferedEvent {
size_t wsIdx;
uint64_t tof;
size_t pulseIndex;
};
public:
KafkaEventStreamDecoder(std::shared_ptr<IKafkaBroker> broker,
const std::string &eventTopic,
const std::string &runInfoTopic,
const std::string &spDetTopic,
const std::string &sampleEnvTopic);
const std::string &sampleEnvTopic,
const std::size_t bufferThreshold);
~KafkaEventStreamDecoder();
KafkaEventStreamDecoder(const KafkaEventStreamDecoder &) = delete;
KafkaEventStreamDecoder &operator=(const KafkaEventStreamDecoder &) = delete;
......@@ -44,13 +58,15 @@ public:
private:
void captureImplExcept() override;
void eventDataFromMessage(const std::string &buffer, size_t &eventCount,
uint64_t &pulseTimeRet);
void flushIntermediateBuffer();
/// Create the cache workspaces, LoadLiveData extracts data from these
void initLocalCaches(const std::string &rawMsgBuffer,
const RunStartStruct &runStartData) override;
/// Populate cache workspaces with data from messages
void eventDataFromMessage(const std::string &buffer);
void sampleDataFromMessage(const std::string &buffer) override;
/// For LoadLiveData to extract the cached data
......@@ -58,8 +74,21 @@ private:
/// Local event workspace buffers
std::vector<DataObjects::EventWorkspace_sptr> m_localEvents;
/// Intermediate buffer for received events yet to be populated in
/// m_localEvents
std::vector<BufferedEvent> m_receivedEventBuffer;
std::vector<BufferedPulse> m_receivedPulseBuffer;
/// Mutex protecting intermediate buffers
mutable std::mutex m_intermediateBufferMutex;
/// The number of events above which the intermediate buffer will be flushed
const std::size_t m_intermediateBufferFlushThreshold;
};
DLLExport std::vector<size_t> computeGroupBoundaries(
const std::vector<KafkaEventStreamDecoder::BufferedEvent> &eventBuffer,
const size_t numberOfGroups);
} // namespace LiveData
} // namespace Mantid
......
......@@ -20,6 +20,12 @@ namespace LiveData {
DECLARE_LISTENER(KafkaEventListener)
KafkaEventListener::KafkaEventListener() : API::LiveListener() {
declareProperty("BufferThreshold", static_cast<uint64_t>(1000000),
"Threshold number of events at which the intermediate event "
"buffer will be flushed to the buffered EventWorkspace.");
}
void KafkaEventListener::setAlgorithm(
const Mantid::API::IAlgorithm &callingAlgorithm) {
this->updatePropertyValues(callingAlgorithm);
......@@ -38,6 +44,8 @@ bool KafkaEventListener::connect(const Poco::Net::SocketAddress &address) {
g_log.error(
"KafkaEventListener::connect requires a non-empty instrument name");
}
const std::size_t bufferThreshold = getProperty("BufferThreshold");
auto broker = std::make_shared<KafkaBroker>(address.toString());
try {
const std::string eventTopic(m_instrumentName +
......@@ -48,7 +56,8 @@ bool KafkaEventListener::connect(const Poco::Net::SocketAddress &address) {
sampleEnvTopic(m_instrumentName +
KafkaTopicSubscriber::SAMPLE_ENV_TOPIC_SUFFIX);
m_decoder = std::make_unique<KafkaEventStreamDecoder>(
broker, eventTopic, runInfoTopic, spDetInfoTopic, sampleEnvTopic);
broker, eventTopic, runInfoTopic, spDetInfoTopic, sampleEnvTopic,
bufferThreshold);
} catch (std::exception &exc) {
g_log.error() << "KafkaEventListener::connect - Connection Error: "
<< exc.what() << "\n";
......
......@@ -10,6 +10,7 @@
#include "MantidAPI/WorkspaceGroup.h"
#include "MantidKernel/DateAndTimeHelpers.h"
#include "MantidKernel/Logger.h"
#include "MantidKernel/MultiThreaded.h"
#include "MantidKernel/TimeSeriesProperty.h"
#include "MantidKernel/WarningSuppressions.h"
......@@ -25,8 +26,18 @@ GNU_DIAG_OFF("conversion")
#include "private/Schema/is84_isis_events_generated.h"
GNU_DIAG_ON("conversion")
#include <chrono>
#include <numeric>
#include <tbb/parallel_sort.h>
using namespace Mantid::Types;
using namespace LogSchema;
size_t totalNumEventsSinceStart = 0;
size_t totalNumEventsBeforeLastTimeout = 0;
double totalPopulateWorkspaceDuration = 0;
double numPopulateWorkspaceCalls = 0;
double totalEventFromMessageDuration = 0;
double numEventFromMessageCalls = 0;
namespace {
/// Logger
......@@ -63,6 +74,27 @@ void appendToLog(Mantid::API::Run &mutableRunInfo, const std::string &name,
mutableRunInfo.addLogData(property);
}
}
void sortIntermediateEventBuffer(
std::vector<Mantid::LiveData::KafkaEventStreamDecoder::BufferedEvent>
&eventBuffer,
const std::vector<Mantid::LiveData::KafkaEventStreamDecoder::BufferedPulse>
&pulseBuffer) {
tbb::parallel_sort(
eventBuffer.begin(), eventBuffer.end(),
[&](const Mantid::LiveData::KafkaEventStreamDecoder::BufferedEvent &lhs,
const Mantid::LiveData::KafkaEventStreamDecoder::BufferedEvent &rhs)
-> bool {
const auto &lhsPulse = pulseBuffer[lhs.pulseIndex];
const auto &rhsPulse = pulseBuffer[rhs.pulseIndex];
/* If events are from different periods compare the period
* numbers, otherwise compare the workspace index */
return (lhsPulse.periodNumber != rhsPulse.periodNumber)
? lhsPulse.periodNumber < rhsPulse.periodNumber
: lhs.wsIdx < rhs.wsIdx;
});
}
} // namespace
namespace Mantid {
......@@ -83,9 +115,10 @@ using Types::Event::TofEvent;
KafkaEventStreamDecoder::KafkaEventStreamDecoder(
std::shared_ptr<IKafkaBroker> broker, const std::string &eventTopic,
const std::string &runInfoTopic, const std::string &spDetTopic,
const std::string &sampleEnvTopic)
const std::string &sampleEnvTopic, const std::size_t bufferThreshold)
: IKafkaStreamDecoder(broker, eventTopic, runInfoTopic, spDetTopic,
sampleEnvTopic) {}
sampleEnvTopic),
m_intermediateBufferFlushThreshold(bufferThreshold) {}
/**
* Destructor.
......@@ -99,7 +132,7 @@ KafkaEventStreamDecoder::~KafkaEventStreamDecoder() {}
* can be called, false otherwise
*/
bool KafkaEventStreamDecoder::hasData() const noexcept {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> workspaceLock(m_mutex);
return !m_localEvents.empty();
}
......@@ -127,7 +160,12 @@ bool KafkaEventStreamDecoder::hasReachedEndOfRun() noexcept {
// -----------------------------------------------------------------------------
API::Workspace_sptr KafkaEventStreamDecoder::extractDataImpl() {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> workspaceLock(m_mutex);
g_log.notice() << "Events since last timeout "
<< totalNumEventsSinceStart - totalNumEventsBeforeLastTimeout
<< std::endl;
totalNumEventsBeforeLastTimeout = totalNumEventsSinceStart;
if (m_localEvents.size() == 1) {
auto temp = createBufferWorkspace<DataObjects::EventWorkspace>(
"EventWorkspace", m_localEvents.front());
......@@ -175,21 +213,63 @@ void KafkaEventStreamDecoder::captureImplExcept() {
std::unordered_map<std::string, std::vector<bool>> reachedEnd;
bool checkOffsets = false;
size_t nEvents = 0;
size_t nMessages = 0;
size_t totalMessages = 0;
size_t eventsPerMessage = 0;
size_t lastMessageEvents = 0;
uint64_t lastPulseTime = 0;
size_t messagesPerPulse = 0;
size_t numMessagesForSinglePulse = 0;
size_t pulseTimeCount = 0;
auto globstart = std::chrono::system_clock::now();
auto start = std::chrono::system_clock::now();
while (!m_interrupt) {
if (m_endRun) {
/* Ensure the intermediate buffer is flushed so as to prevent
* EventWorksapces containing events from other runs. */
flushIntermediateBuffer();
waitForRunEndObservation();
continue;
} else {
waitForDataExtraction();
}
// Pull in events
m_dataStream->consumeMessage(&buffer, offset, partition, topicName);
// No events, wait for some to come along...
if (buffer.empty()) {
start = std::chrono::system_clock::now();
globstart = std::chrono::system_clock::now();
g_log.notice() << "Waiting to start..." << std::endl;
m_cbIterationEnd();
continue;
}
const auto end = std::chrono::system_clock::now();
const std::chrono::duration<double> dur = end - start;
if (dur.count() >= 60) {
g_log.notice() << "Message count " << nMessages << '\n';
const auto rate = static_cast<double>(nMessages) / dur.count();
g_log.notice() << "Consuming " << rate << "Hz\n";
nMessages = 0;
g_log.notice() << eventsPerMessage << " events per message\n";
const auto mpp = static_cast<double>(numMessagesForSinglePulse) /
static_cast<double>(pulseTimeCount);
g_log.notice() << mpp << " event messages per pulse\n";
g_log.notice() << "Achievable pulse rate is " << rate / mpp << "Hz\n";
g_log.notice() << "Average time taken to convert event messages "
<< totalEventFromMessageDuration / numEventFromMessageCalls
<< " seconds\n";
g_log.notice() << "Average time taken to populate workspace "
<< totalPopulateWorkspaceDuration /
numPopulateWorkspaceCalls
<< " seconds\n";
start = std::chrono::system_clock::now();
}
if (checkOffsets) {
checkRunEnd(topicName, checkOffsets, offset, partition, stopOffsets,
reachedEnd);
......@@ -206,7 +286,32 @@ void KafkaEventStreamDecoder::captureImplExcept() {
if (flatbuffers::BufferHasIdentifier(
reinterpret_cast<const uint8_t *>(buffer.c_str()),
EVENT_MESSAGE_ID.c_str())) {
eventDataFromMessage(buffer);
uint64_t currentPulseTime(-1);
eventDataFromMessage(buffer, nEvents, currentPulseTime);
if (lastPulseTime == 0)
lastPulseTime = currentPulseTime;
else if (lastPulseTime != currentPulseTime) {
++pulseTimeCount;
lastPulseTime = currentPulseTime;
numMessagesForSinglePulse += messagesPerPulse;
messagesPerPulse = 0;
}
++messagesPerPulse;
eventsPerMessage = nEvents - lastMessageEvents;
lastMessageEvents = nEvents;
/* If there are enough events in the receive buffer then empty it into
* the EventWorkspace(s) */
if (m_receivedEventBuffer.size() > m_intermediateBufferFlushThreshold) {
flushIntermediateBuffer();
}
totalNumEventsSinceStart = nEvents;
++nMessages;
++totalMessages;
}
// Check if we have a sample environment log message
else if (flatbuffers::BufferHasIdentifier(
......@@ -219,39 +324,146 @@ void KafkaEventStreamDecoder::captureImplExcept() {
checkRunMessage(buffer, checkOffsets, stopOffsets, reachedEnd);
m_cbIterationEnd();
}
/* Flush any remaining events when capture is terminated */
flushIntermediateBuffer();
const auto globend = std::chrono::system_clock::now();
const std::chrono::duration<double> dur = globend - globstart;
g_log.notice() << "Consumed at a rate of "
<< static_cast<double>(totalMessages) / dur.count() << "Hz"
<< std::endl;
g_log.debug("Event capture finished");
totalNumEventsBeforeLastTimeout = 0;
totalNumEventsSinceStart = 0;
totalNumEventsSinceStart = 0;
totalNumEventsBeforeLastTimeout = 0;
totalPopulateWorkspaceDuration = 0;
numPopulateWorkspaceCalls = 0;
totalEventFromMessageDuration = 0;
numEventFromMessageCalls = 0;
}
void KafkaEventStreamDecoder::eventDataFromMessage(const std::string &buffer) {
auto eventMsg =
void KafkaEventStreamDecoder::eventDataFromMessage(const std::string &buffer,
size_t &eventCount,
uint64_t &pulseTimeRet) {
/* Parse message */
const auto eventMsg =
GetEventMessage(reinterpret_cast<const uint8_t *>(buffer.c_str()));
DateAndTime pulseTime = static_cast<int64_t>(eventMsg->pulse_time());
/* Parse pulse time */
pulseTimeRet = static_cast<uint64_t>(eventMsg->pulse_time());
const DateAndTime pulseTime(pulseTimeRet);
/* Get TOF and detector ID buffers */
const auto &tofData = *(eventMsg->time_of_flight());
const auto &detData = *(eventMsg->detector_id());
auto nEvents = tofData.size();
DataObjects::EventWorkspace_sptr periodBuffer;
std::lock_guard<std::mutex> lock(m_mutex);
/* Increment event count */
const auto nEvents = tofData.size();
eventCount += nEvents;
/* Create buffered pulse */
BufferedPulse pulse{pulseTime, 0};
/* Perform facility specific operations */
if (eventMsg->facility_specific_data_type() == FacilityData_ISISData) {
auto ISISMsg =
std::lock_guard<std::mutex> workspaceLock(m_mutex);
const auto ISISMsg =
static_cast<const ISISData *>(eventMsg->facility_specific_data());
periodBuffer = m_localEvents[static_cast<size_t>(ISISMsg->period_number())];
auto &mutableRunInfo = periodBuffer->mutableRun();
pulse.periodNumber = static_cast<int>(ISISMsg->period_number());
auto periodWs = m_localEvents[pulse.periodNumber];
auto &mutableRunInfo = periodWs->mutableRun();
mutableRunInfo.getTimeSeriesProperty<double>(PROTON_CHARGE_PROPERTY)
->addValue(pulseTime, ISISMsg->proton_charge());
} else {
periodBuffer = m_localEvents[0];
}
for (decltype(nEvents) i = 0; i < nEvents; ++i) {
auto &spectrum = periodBuffer->getSpectrum(
m_specToIdx[static_cast<int32_t>(detData[i])]);
spectrum.addEventQuickly(TofEvent(static_cast<double>(tofData[i]) *
1e-3, // nanoseconds to microseconds
pulseTime));
const auto starttime = std::chrono::system_clock::now();
{
std::lock_guard<std::mutex> bufferLock(m_intermediateBufferMutex);
/* Store the buffered pulse */
m_receivedPulseBuffer.push_back(pulse);
const auto pulseIndex = m_receivedPulseBuffer.size() - 1;
/* Ensure storage for newly received events */
const auto oldBufferSize(m_receivedEventBuffer.size());
m_receivedEventBuffer.reserve(oldBufferSize + nEvents);
/* Store the buffered events */
std::transform(detData.begin(), detData.end(), detData.begin(),
std::back_inserter(m_receivedEventBuffer),
[&](uint64_t detId, uint64_t tof) -> BufferedEvent {
const auto workspaceIndex =
m_specToIdx[detId + m_specToIdxOffset];
return {workspaceIndex, tof, pulseIndex};
});
}
const auto endTime = std::chrono::system_clock::now();
const std::chrono::duration<double> dur = endTime - starttime;
totalEventFromMessageDuration += dur.count();
numEventFromMessageCalls += 1;
}
void KafkaEventStreamDecoder::flushIntermediateBuffer() {
/* Do nothing if there are no buffered events */
if (m_receivedEventBuffer.empty()) {
return;
}
g_log.notice() << "Populating event workspace with "
<< m_receivedEventBuffer.size() << " events\n";
const auto startTime = std::chrono::system_clock::now();
std::lock_guard<std::mutex> bufferLock(m_intermediateBufferMutex);
sortIntermediateEventBuffer(m_receivedEventBuffer, m_receivedPulseBuffer);
/* Compute groups for parallel insertion */
const auto numberOfGroups = PARALLEL_GET_MAX_THREADS;
const auto groupBoundaries =
computeGroupBoundaries(m_receivedEventBuffer, numberOfGroups);
/* Insert events into EventWorkspace(s) */
{
std::lock_guard<std::mutex> workspaceLock(m_mutex);
for (auto &ws : m_localEvents) {
ws->invalidateCommonBinsFlag();
}
PARALLEL_FOR_NO_WSP_CHECK()
for (auto group = 0; group < numberOfGroups; ++group) {
for (auto idx = groupBoundaries[group]; idx < groupBoundaries[group + 1];
++idx) {
const auto &event = m_receivedEventBuffer[idx];
const auto &pulse = m_receivedPulseBuffer[event.pulseIndex];
auto *spectrum =
m_localEvents[pulse.periodNumber]->getSpectrumUnsafe(event.wsIdx);
// nanoseconds to microseconds
spectrum->addEventQuickly(
TofEvent(static_cast<double>(event.tof) * 1e3, pulse.pulseTime));
}
}
}
/* Clear buffers */
m_receivedPulseBuffer.clear();
m_receivedEventBuffer.clear();
const auto endTime = std::chrono::system_clock::now();
const std::chrono::duration<double> dur = endTime - startTime;
g_log.notice() << "Time to populate EventWorkspace: " << dur.count() << '\n';
totalPopulateWorkspaceDuration += dur.count();
numPopulateWorkspaceCalls += 1;
} // namespace LiveData
/**
* Get sample environment log data from the flatbuffer and append it to the
* workspace
......@@ -261,8 +473,7 @@ void KafkaEventStreamDecoder::eventDataFromMessage(const std::string &buffer) {
* @param mutableRunInfo : Log manager containing the existing sample logs
*/
void KafkaEventStreamDecoder::sampleDataFromMessage(const std::string &buffer) {
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> workspaceLock(m_mutex);
// Add sample log values to every workspace for every period
for (const auto &periodBuffer : m_localEvents) {
auto &mutableRunInfo = periodBuffer->mutableRun();
......@@ -309,7 +520,6 @@ void KafkaEventStreamDecoder::sampleDataFromMessage(const std::string &buffer) {
*/
void KafkaEventStreamDecoder::initLocalCaches(
const std::string &rawMsgBuffer, const RunStartStruct &runStartData) {
if (rawMsgBuffer.empty()) {
throw std::runtime_error("KafkaEventStreamDecoder::initLocalCaches() - "
"Empty message received from spectrum-detector "
......@@ -358,7 +568,8 @@ void KafkaEventStreamDecoder::initLocalCaches(
new Kernel::TimeSeriesProperty<double>(PROTON_CHARGE_PROPERTY));
// Cache spec->index mapping. We assume it is the same across all periods
m_specToIdx = eventBuffer->getSpectrumToWorkspaceIndexMap();
m_specToIdx =
eventBuffer->getSpectrumToWorkspaceIndexVector(m_specToIdxOffset);
// Buffers for each period
const size_t nperiods = runStartData.nPeriods;
......@@ -368,7 +579,7 @@ void KafkaEventStreamDecoder::initLocalCaches(
"an error by the data producer");
}
{
std::lock_guard<std::mutex> lock(m_mutex);
std::lock_guard<std::mutex> workspaceLock(m_mutex);
m_localEvents.resize(nperiods);
m_localEvents[0] = eventBuffer;
for (size_t i = 1; i < nperiods; ++i) {
......@@ -381,6 +592,48 @@ void KafkaEventStreamDecoder::initLocalCaches(
m_dataReset = true;
}
std::vector<size_t> computeGroupBoundaries(
const std::vector<Mantid::LiveData::KafkaEventStreamDecoder::BufferedEvent>
&eventBuffer,
const size_t numberOfGroups) {
std::vector<size_t> groupBoundaries(numberOfGroups + 1);
/* Fill the buffer with the end of the event buffer index */
std::fill(groupBoundaries.begin(), groupBoundaries.end(), eventBuffer.size());
/* First group always starts at beginning of buffer */
groupBoundaries[0] = 0;
const auto eventsPerGroup =
std::max<size_t>(1, eventBuffer.size() / numberOfGroups);
/* Iterate over groups */
for (size_t group = 1; group < numberOfGroups; ++group) {
/* Calculate a reasonable end boundary for the group */
groupBoundaries[group] = groupBoundaries[group - 1] + eventsPerGroup - 1;