Newer
Older
#include "MantidLiveData/Kafka/KafkaEventStreamDecoder.h"
#include "MantidAPI/AlgorithmManager.h"
#include "MantidAPI/Axis.h"
#include "MantidAPI/WorkspaceGroup.h"
#include "MantidKernel/DateAndTime.h"
#include "MantidKernel/TimeSeriesProperty.h"
#include "MantidKernel/UnitFactory.h"
#include "MantidKernel/WarningSuppressions.h"
#include "MantidKernel/make_unique.h"
#include "MantidLiveData/Exception.h"
GCC_DIAG_OFF(conversion)
#include "private/Schema/ba57_run_info_generated.h"
#include "private/Schema/df12_det_spec_map_generated.h"
#include "private/Schema/ev42_events_generated.h"
#include "private/Schema/is84_isis_events_generated.h"
GCC_DIAG_ON(conversion)
// File-local state. The unnamed namespace gives these objects internal
// linkage so that they cannot clash with identically named globals defined in
// other translation units.
namespace {
/// Logger used by every function in this file
Mantid::Kernel::Logger g_log("KafkaEventStreamDecoder");

/// Names of the run properties written to the buffer workspaces. They are
/// only ever read, hence const.
const std::string PROTON_CHARGE_PROPERTY = "proton_charge";
const std::string RUN_NUMBER_PROPERTY = "run_number";
const std::string RUN_START_PROPERTY = "run_start";
} // namespace
namespace Mantid {
namespace LiveData {
using DataObjects::TofEvent;
using Kernel::DateAndTime;
// -----------------------------------------------------------------------------
// Public members
// -----------------------------------------------------------------------------
/**
* Constructor
* @param broker A reference to a Broker object for creating topic streams
* @param eventTopic The name of the topic streaming the event data
* @param spDetTopic The name of the topic streaming the spectrum-detector
KafkaEventStreamDecoder::KafkaEventStreamDecoder(
std::shared_ptr<IKafkaBroker> broker, const std::string &eventTopic,
const std::string &runInfoTopic, const std::string &spDetTopic)
: m_broker(broker), m_eventTopic(eventTopic), m_runInfoTopic(runInfoTopic),
m_spDetTopic(spDetTopic), m_interrupt(false), m_localEvents(),
m_specToIdx(), m_runStart(), m_runNumber(-1), m_thread(),
m_capturing(false), m_exception(), m_extractWaiting(false) {}
/**
 * Destructor.
 * Delegates to stopCapture() so that capturing from the stream is stopped
 * before the object goes away.
 */
KafkaEventStreamDecoder::~KafkaEventStreamDecoder() {
  // stopCapture() is declared noexcept, so nothing can propagate out of here
  stopCapture();
}
/**
* Start capturing from the stream on a separate thread. This is a non-blocking
* call and will return after the thread has started
*/
void KafkaEventStreamDecoder::startCapture(bool startNow) {
// If we are not starting now, then we want to start at the start of the run
// Get last two messages in run topic to ensure we get a runStart message
m_broker->subscribe({m_runInfoTopic}, subscribeAtOption::LASTTWO);
std::string rawMsgBuffer;
auto runStartData = getRunStartMessage(rawMsgBuffer);
auto startTimeMilliseconds =
runStartData.startTime * 1000; // seconds to milliseconds
m_eventStream =
m_broker->subscribe({m_eventTopic, m_runInfoTopic},
startTimeMilliseconds, subscribeAtOption::TIME);
m_eventStream =
m_broker->subscribe({m_eventTopic}, subscribeAtOption::LATEST);
// Get last two messages in run topic to ensure we get a runStart message
m_runStream =
m_broker->subscribe({m_runInfoTopic}, subscribeAtOption::LASTTWO);
m_spDetStream =
m_broker->subscribe({m_spDetTopic}, subscribeAtOption::LASTONE);
auto m_thread = std::thread([this]() { this->captureImpl(); });
m_thread.detach();
}
/**
* Stop capturing from the stream. This is a blocking call until the capturing
* function has completed
*/
void KafkaEventStreamDecoder::stopCapture() noexcept {
// This will interrupt the "event" loop
m_interrupt = true;
// Wait until the function has completed. The background thread
// will exit automatically
std::this_thread::sleep_for(std::chrono::milliseconds(50));
};
}
/**
 * Report whether any local event buffer exists.
 * @return True if the list of local buffers is non-empty, i.e. extractData()
 * has something to hand over; false otherwise
 */
bool KafkaEventStreamDecoder::hasData() const noexcept {
  // m_localEvents is shared with the capture thread, so read it under m_mutex
  std::lock_guard<std::mutex> guard(m_mutex);
  const bool noBuffers = m_localEvents.empty();
  return !noBuffers;
}
/**
 * Check if a message has indicated that end of run has been reached.
 * Always reports false while the end-of-run data has not been extracted yet.
 * @return True if end of run has been reached
 */
bool KafkaEventStreamDecoder::hasReachedEndOfRun() noexcept {
  if (m_extractedEndRunData) {
    if (m_endRun) {
      // Tell the capture thread that the caller has observed the end of the
      // run. It waits on m_cvRunStatus for m_runStatusSeen before it clears
      // m_endRun and carries on with the messages of the next run.
      std::lock_guard<std::mutex> runStatusLock(m_runStatusMutex);
      m_runStatusSeen = true;
      m_cvRunStatus.notify_one();
    }
    return m_endRun;
  }
  return false;
}
/**
 * Check for an exception thrown by the background thread and rethrow
 * it if necessary. If no error occurred swap the current internal buffer
 * for a fresh one and return the old buffer.
 * @return A pointer to the data collected since the last call to this
 * method
 * @throws std::runtime_error (a copy of the stored one) if the background
 * thread recorded an error; anything thrown by extractDataImpl() also
 * propagates
 */
API::Workspace_sptr KafkaEventStreamDecoder::extractData() {
  if (m_exception) {
    throw * m_exception;
  }
  // Flag that an extraction is in progress; the capture thread holds off
  // while this is set.
  m_extractWaiting = true;
  auto workspace_ptr = extractDataImpl();
  m_extractWaiting = false;
  // The capture thread blocks on m_cv until m_extractWaiting is false.
  // Without this notification it would only resume on a spurious wake-up.
  m_cv.notify_one();
  return workspace_ptr;
}
// -----------------------------------------------------------------------------
// Private members
// -----------------------------------------------------------------------------
/**
 * Swap every filled local buffer for a fresh, empty one and hand the filled
 * buffers back. Runs under m_mutex so that the swap cannot interleave with
 * other users of the buffers.
 * @return The single filled workspace if there is exactly one local buffer,
 * otherwise a WorkspaceGroup with one filled workspace per local buffer
 * @throws Exception::NotYet if there are no local buffers
 */
API::Workspace_sptr KafkaEventStreamDecoder::extractDataImpl() {
  std::lock_guard<std::mutex> lock(m_mutex);
  if (m_localEvents.size() == 1) {
    auto temp = createBufferWorkspace(m_localEvents.front());
    // After the swap "temp" holds the filled buffer and the cache the new one
    std::swap(m_localEvents.front(), temp);
    return temp;
  } else if (m_localEvents.size() > 1) {
    auto group = boost::make_shared<API::WorkspaceGroup>();
    for (auto &filledBuffer : m_localEvents) {
      auto temp = createBufferWorkspace(filledBuffer);
      // filledBuffer refers to the slot inside m_localEvents, so swapping
      // through it replaces the cached buffer in place
      std::swap(filledBuffer, temp);
      group->addWorkspace(temp);
    }
    return group;
  } else {
    throw Exception::NotYet("Local buffers not initialized.");
  }
}
* Start decoding data from the streams into the internal buffers.
* Implementation designed to be entry point for new thread of execution.
* It catches all thrown exceptions.
void KafkaEventStreamDecoder::captureImpl() noexcept {
m_capturing = true;
try {
captureImplExcept();
} catch (std::exception &exc) {
m_exception = boost::make_shared<std::runtime_error>(exc.what());
} catch (...) {
m_exception = boost::make_shared<std::runtime_error>(
"KafkaEventStreamDecoder: Unknown exception type caught.");
m_capturing = false;
/**
 * Exception-throwing variant of captureImpl(). Do not call this directly
 */
// Worker body called by captureImpl(): initialises the local caches, resets
// the run-status flags, then handles end-of-run bookkeeping and the hand-over
// of buffers to extractData().
// NOTE(review): no loop header is visible in this function even though a
// `continue` statement appears further down, and not every brace opened here
// is closed. Lines appear to be missing from this listing - confirm against
// version control before changing any logic.
void KafkaEventStreamDecoder::captureImplExcept() {
g_log.debug("Event capture starting");
initLocalCaches();
// File identifiers from flatbuffers schema
std::string runMessageID = "ba57";
std::string eventMessageID = "ev42";
// Reset the flags shared with the consumer side before decoding starts
m_interrupt = false;
m_endRun = false;
m_runStatusSeen = false;
m_extractedEndRunData = true;
std::string buffer;
// Set to true when a runStop message has been received and we need to start
// checking if we have allowed enough time for late messages (0.5 seconds?)
bool runStopping = false;
// NOTE(review): as listed, runStopping has just been set to false, so this
// branch can only be taken if the code is re-entered (e.g. by a loop).
if (runStopping) {
// todo if clock has reached max latency time
m_endRun = true;
// If we've reached the end of a run then set m_extractWaiting to true
// so that we wait until the buffer is emptied before continuing.
// Otherwise we can end up with data from two different runs in the
// same buffer workspace which is problematic if the user wanted the
// "Stop" or "Rename" run transition option.
m_extractWaiting = true;
m_extractedEndRunData = false;
g_log.debug() << "Reached end of run in data stream." << std::endl;
// If extractData method is waiting for access to the buffer workspace
// then we wait for it to finish
std::unique_lock<std::mutex> readyLock(m_waitMutex);
if (m_extractWaiting) {
// Blocks until m_extractWaiting is false; extractData() clears the flag
m_cv.wait(readyLock, [&] { return !m_extractWaiting; });
readyLock.unlock();
if (m_endRun) {
// From here on hasReachedEndOfRun() is allowed to report the end of run
m_extractedEndRunData = true;
// Wait until MonitorLiveData has seen that end of run was
// reached before setting m_endRun back to false and continuing
std::unique_lock<std::mutex> runStatusLock(m_runStatusMutex);
// hasReachedEndOfRun() sets m_runStatusSeen and notifies m_cvRunStatus
m_cvRunStatus.wait(runStatusLock, [&] { return m_runStatusSeen; });
m_endRun = false;
m_runStatusSeen = false;
runStatusLock.unlock();
// Give time for MonitorLiveData to act on runStatus information
// and trigger m_interrupt for next loop iteration if user requested
// LiveData algorithm to stop at the end of the run
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
// Message handling: read one message from the event stream and dispatch on
// its flatbuffers file identifier (event message or run info message).
// NOTE(review): the `continue` below needs an enclosing loop that is not
// visible in this listing; the loop header appears to be missing.
// Pull in events
m_eventStream->consumeMessage(&buffer);
// No events, wait for some to come along...
if (buffer.empty())
continue;
// Check if we have an event message
// Most will be event messages so we check for this type first
if (flatbuffers::BufferHasIdentifier(
reinterpret_cast<const uint8_t *>(buffer.c_str()),
eventMessageID.c_str())) {
auto eventMsg =
GetEventMessage(reinterpret_cast<const uint8_t *>(buffer.c_str()));
DateAndTime pulseTime = static_cast<int64_t>(eventMsg->pulse_time());
const auto &tofData = *(eventMsg->time_of_flight());
const auto &detData = *(eventMsg->detector_id());
auto nEvents = tofData.size();
// Only ISIS facility-specific data is handled; anything else is an error
if (eventMsg->facility_specific_data_type() != FacilityData_ISISData) {
throw std::runtime_error("KafkaEventStreamDecoder only knows how to "
"deal with ISIS facility specific data");
}
auto ISISMsg =
static_cast<const ISISData *>(eventMsg->facility_specific_data());
// The local buffers are shared with extractDataImpl(), which swaps them
// under the same mutex
std::lock_guard<std::mutex> lock(m_mutex);
// NOTE(review): period_number() is used as an index without a range check
// against m_localEvents.size() - confirm that the producer guarantees it.
auto &periodBuffer =
*m_localEvents[static_cast<size_t>(ISISMsg->period_number())];
auto &mutableRunInfo = periodBuffer.mutableRun();
// Record the proton charge of this message against its pulse time
mutableRunInfo.getTimeSeriesProperty<double>(PROTON_CHARGE_PROPERTY)
->addValue(pulseTime, ISISMsg->proton_charge());
// NOTE(review): the loop bound is taken from tofData alone; presumably
// detData has the same length - verify.
for (decltype(nEvents) i = 0; i < nEvents; ++i) {
auto &spectrum = periodBuffer.getSpectrum(m_specToIdx[detData[i]]);
spectrum.addEventQuickly(TofEvent(static_cast<double>(tofData[i]) *
1e-9, // nanoseconds to seconds
pulseTime));
}
}
// Check if we have a runMessage
if (flatbuffers::BufferHasIdentifier(
reinterpret_cast<const uint8_t *>(buffer.c_str()),
runMessageID.c_str())) {
// Check if we have a runStop message
auto runMsg =
GetRunInfo(reinterpret_cast<const uint8_t *>(buffer.c_str()));
if (runMsg->info_type_type() == InfoTypes_RunStop) {
// Picked up by the end-of-run handling earlier in this function
runStopping = true;
// todo start the stopwatch
}
}
/**
 * Read the run info stream until a run start message is found. At most two
 * messages are consumed: if the first is not a run start the next one is
 * tried.
 * @param rawMsgBuffer : string to use as message buffer; on return it holds
 * the raw bytes of the last message consumed
 * @return The instrument name, run number, start time and number of periods
 * taken from the run start message
 * @throws std::runtime_error if neither message is a run start message, or
 * (from getRunInfoMessage) if an empty message is received
 */
KafkaEventStreamDecoder::RunStartStruct
KafkaEventStreamDecoder::getRunStartMessage(std::string &rawMsgBuffer) {
  getRunInfoMessage(rawMsgBuffer);
  auto runMsg =
      GetRunInfo(reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()));
  if (runMsg->info_type_type() != InfoTypes_RunStart) {
    // We want a runStart message, try the next one
    getRunInfoMessage(rawMsgBuffer);
    runMsg =
        GetRunInfo(reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()));
    if (runMsg->info_type_type() != InfoTypes_RunStart) {
      throw std::runtime_error(
          "KafkaEventStreamDecoder::getRunStartMessage() - "
          "Could not find a run start message "
          "in the run info topic. Unable to continue");
    }
  }
  auto runStartData = static_cast<const RunStart *>(runMsg->info_type());
  // A flatbuffers string accessor returns a null pointer when the field was
  // not written by the producer. Map that to an empty name, which
  // initLocalCaches() handles by skipping the instrument load.
  const auto *instrumentName = runStartData->instrument_name();
  KafkaEventStreamDecoder::RunStartStruct runStart = {
      instrumentName ? instrumentName->str() : std::string(),
      runStartData->run_number(), runStartData->start_time(),
      static_cast<size_t>(runStartData->n_periods())};
  return runStart;
}
/**
* Pull information from the run & detector-spectrum stream and initialize
* the internal EventWorkspace buffer + other cached information such as run
* start. This includes loading the instrument.
* By the end of this method the local event buffer is ready to accept
* events
*/
void KafkaEventStreamDecoder::initLocalCaches() {
std::string rawMsgBuffer;
// Load spectra-detector mapping from stream
m_spDetStream->consumeMessage(&rawMsgBuffer);
throw std::runtime_error("KafkaEventStreamDecoder::initLocalCaches() - "
"Empty message received from spectrum-detector "
"topic. Unable to continue");
}
auto spDetMsg = GetSpectraDetectorMapping(
reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()));
auto nspec = spDetMsg->spectrum()->size();
auto nudet = spDetMsg->detector_id()->size();
if (nudet != nspec) {
std::ostringstream os;
os << "KafkaEventStreamDecoder::initLocalEventBuffer() - Invalid "
"spectra/detector mapping. Expected matched length arrays but "
"found nspec="
<< nspec << ", ndet=" << nudet;
throw std::runtime_error(os.str());
}
auto eventBuffer = createBufferWorkspace(
static_cast<size_t>(spDetMsg->n_spectra()), spDetMsg->spectrum()->data(),
spDetMsg->detector_id()->data(), nudet);
auto runStartData = getRunStartMessage(rawMsgBuffer);
// Load the instrument if possible but continue if we can't
auto instName = runStartData.instrumentName;
if (instName.size() > 0)
loadInstrument(instName, eventBuffer);
"Empty instrument name received. Continuing without instrument");
auto &mutableRun = eventBuffer->mutableRun();
// Run start. Cache locally for computing frame times
auto runStartTime = static_cast<time_t>(runStartData.startTime);
char timeString[32];
strftime(timeString, 32, "%Y-%m-%dT%H:%M:%S", localtime(&runStartTime));
m_runStart.setFromISO8601(timeString, false);
// Run number
mutableRun.addProperty(RUN_START_PROPERTY, std::string(timeString));
m_runNumber = runStartData.runNumber;
mutableRun.addProperty(RUN_NUMBER_PROPERTY, std::to_string(m_runNumber));
// Create the proton charge property
mutableRun.addProperty(
new Kernel::TimeSeriesProperty<double>(PROTON_CHARGE_PROPERTY));
// Cache spec->index mapping. We assume it is the same across all periods
m_specToIdx = eventBuffer->getSpectrumToWorkspaceIndexMap();
const size_t nperiods = runStartData.nPeriods;
if (nperiods == 0) {
throw std::runtime_error(
"KafkaEventStreamDecoder - Message has n_periods==0. This is "
"an error by the data producer");
}
std::lock_guard<std::mutex> lock(m_mutex);
m_localEvents.resize(nperiods);
m_localEvents[0] = eventBuffer;
for (size_t i = 1; i < nperiods; ++i) {
// A clone should be cheap here as there are no events yet
m_localEvents[i] = eventBuffer->clone();
}
/**
 * Consume one message from the run info stream into the supplied buffer.
 * @param rawMsgBuffer : string that receives the raw message
 * @throws std::runtime_error if the buffer is empty after the read
 */
void KafkaEventStreamDecoder::getRunInfoMessage(std::string &rawMsgBuffer) {
  m_runStream->consumeMessage(&rawMsgBuffer);
  const bool gotMessage = !rawMsgBuffer.empty();
  if (gotMessage) {
    return;
  }
  throw std::runtime_error("KafkaEventStreamDecoder::initLocalCaches() - "
                           "Empty message received from run info "
                           "topic. Unable to continue");
}
* Create a buffer workspace of the correct size based on the values given.
* @param nspectra The number of unique spectrum numbers
* @param spec An array of length ndet specifying the spectrum number of each
* detector
* @param udet An array of length ndet specifying the detector ID of each
* detector
* @param length The length of the spec/udet arrays
* @return A new workspace of the appropriate size
*/
DataObjects::EventWorkspace_sptr KafkaEventStreamDecoder::createBufferWorkspace(
const size_t nspectra, const int32_t *spec, const int32_t *udet,
const uint32_t length) {
// Order is important here
std::map<int32_t, std::set<int32_t>> spdetMap;
for (uint32_t i = 0; i < length; ++i) {
auto specNo = spec[i];
auto detId = udet[i];
auto search = spdetMap.find(specNo);
if (search != spdetMap.end()) {
search->second.insert(detId);
} else {
spdetMap.insert({specNo, {detId}});
}
}
assert(nspectra == spdetMap.size());
// Create event workspace
auto eventBuffer = boost::static_pointer_cast<DataObjects::EventWorkspace>(
API::WorkspaceFactory::Instance().create("EventWorkspace", nspectra, 2,
1));
// Set the units
eventBuffer->getAxis(0)->unit() =
Kernel::UnitFactory::Instance().create("TOF");
eventBuffer->setYUnit("Counts");
// Setup spectra-detector mapping.
size_t wsIdx(0);
for (const auto &spIter : spdetMap) {
auto &spectrum = eventBuffer->getSpectrum(wsIdx);
spectrum.setSpectrumNo(spIter.first);
spectrum.addDetectorIDs(spIter.second);
++wsIdx;
}
return eventBuffer;
/**
 * Build a new, empty buffer workspace modelled on an existing one.
 * @param parent A pointer to the workspace whose size and meta data are
 * copied
 * @return A new EventWorkspace with as many histograms as the parent
 */
DataObjects::EventWorkspace_sptr KafkaEventStreamDecoder::createBufferWorkspace(
    const DataObjects::EventWorkspace_sptr &parent) {
  auto &factory = API::WorkspaceFactory::Instance();
  const auto nHistograms = parent->getNumberHistograms();
  auto freshBuffer = boost::static_pointer_cast<DataObjects::EventWorkspace>(
      factory.create("EventWorkspace", nHistograms, 2, 1));
  // Copy meta data
  factory.initializeFromParent(*parent, *freshBuffer, false);
  // Clear out the old logs, except for the most recent entry
  auto &runLogs = freshBuffer->mutableRun();
  runLogs.clearOutdatedTimeSeriesLogValues();
  return freshBuffer;
}
/**
* Run LoadInstrument for the given instrument name. If it cannot succeed it
* does nothing to the internal workspace
* @param name Name of an instrument to load
* @param workspace A pointer to the workspace receiving the instrument
*/
void KafkaEventStreamDecoder::loadInstrument(
const std::string &name, DataObjects::EventWorkspace_sptr workspace) {
if (name.empty()) {
g_log.warning("Empty instrument name found");
return;
}
try {
auto alg =
API::AlgorithmManager::Instance().createUnmanaged("LoadInstrument");
// Do not put the workspace in the ADS
alg->setChild(true);
alg->initialize();
alg->setPropertyValue("InstrumentName", name);
alg->setProperty("Workspace", workspace);
alg->setProperty("RewriteSpectraMap", Kernel::OptionalBool(false));
alg->execute();
} catch (std::exception &exc) {
g_log.warning() << "Error loading instrument '" << name
<< "': " << exc.what() << "\n";
}