#include "MantidLiveData/Kafka/KafkaEventStreamDecoder.h"
#include "MantidAPI/AlgorithmManager.h"
#include "MantidAPI/Axis.h"
#include "MantidAPI/WorkspaceGroup.h"
#include "MantidKernel/DateAndTimeHelpers.h"
#include "MantidKernel/TimeSeriesProperty.h"
#include "MantidKernel/UnitFactory.h"
#include "MantidKernel/WarningSuppressions.h"
#include "MantidKernel/make_unique.h"
#include "MantidLiveData/Exception.h"
#include "MantidLiveData/Kafka/KafkaTopicSubscriber.h"
GCC_DIAG_OFF(conversion)
#include "private/Schema/ba57_run_info_generated.h"
#include "private/Schema/df12_det_spec_map_generated.h"
#include "private/Schema/ev42_events_generated.h"
#include "private/Schema/f142_logdata_generated.h"
#include "private/Schema/is84_isis_events_generated.h"
GCC_DIAG_ON(conversion)
using namespace Mantid::Types;

namespace {
/// Logger
Mantid::Kernel::Logger g_log("KafkaEventStreamDecoder");
const std::string PROTON_CHARGE_PROPERTY = "proton_charge";
const std::string RUN_NUMBER_PROPERTY = "run_number";
const std::string RUN_START_PROPERTY = "run_start";
// File identifiers from flatbuffers schema
const std::string RUN_MESSAGE_ID = "ba57";
const std::string EVENT_MESSAGE_ID = "ev42";
const std::string SAMPLE_MESSAGE_ID = "f142";
const std::chrono::seconds MAX_LATENCY(1);
/**
* Append sample log data to existing log or create a new log if one with
* specified name does not already exist
*
* @tparam T : Type of the log value
* @param mutableRunInfo : Log manager containing the existing sample logs
* @param name : Name of the sample log
* @param time : Time at which the value was measured
* @param value : Sample log measured value
*/
template <typename T>
void appendToLog(Mantid::API::Run &mutableRunInfo, const std::string &name,
const Core::DateAndTime &time, T value) {
if (mutableRunInfo.hasProperty(name)) {
auto property = mutableRunInfo.getTimeSeriesProperty<T>(name);
property->addValue(time, value);
} else {
auto property = new Mantid::Kernel::TimeSeriesProperty<T>(name);
property->addValue(time, value);
mutableRunInfo.addLogData(property);
}
}
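// Illustrative usage sketch of the helper above (hypothetical values, not
// part of the decoder): appending a temperature reading to a workspace's
// sample logs would look like:
//
//   auto &run = eventWorkspace->mutableRun();
//   appendToLog<double>(run, "temperature",
//                       Core::DateAndTime("2018-01-01T00:00:00"), 293.1);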
} // namespace

namespace Mantid {
namespace LiveData {
using Types::Event::TofEvent;
using Types::Core::DateAndTime;
// -----------------------------------------------------------------------------
// Public members
// -----------------------------------------------------------------------------
/**
 * Constructor
 * @param broker A reference to a Broker object for creating topic streams
 * @param eventTopic The name of the topic streaming the event data
 * @param runInfoTopic The name of the topic streaming the run information
 * @param spDetTopic The name of the topic streaming the spectrum-detector
 * mapping
 * @param sampleEnvTopic The name of the topic streaming the sample
 * environment logs
 */
KafkaEventStreamDecoder::KafkaEventStreamDecoder(
std::shared_ptr<IKafkaBroker> broker, const std::string &eventTopic,
const std::string &runInfoTopic, const std::string &spDetTopic,
const std::string &sampleEnvTopic)
: m_broker(broker), m_eventTopic(eventTopic), m_runInfoTopic(runInfoTopic),
m_spDetTopic(spDetTopic), m_sampleEnvTopic(sampleEnvTopic),
m_interrupt(false), m_localEvents(), m_specToIdx(), m_runStart(),
m_runNumber(-1), m_thread(), m_capturing(false), m_exception(),
m_extractWaiting(false), m_cbIterationEnd([] {}), m_cbError([] {}) {}
/**
* Destructor.
* Stops capturing from the stream
*/
KafkaEventStreamDecoder::~KafkaEventStreamDecoder() { stopCapture(); }
/**
* Start capturing from the stream on a separate thread. This is a non-blocking
* call and will return after the thread has started
*/
void KafkaEventStreamDecoder::startCapture(bool startNow) {
  // If we are not starting now, then we want to start at the start of the run
  if (!startNow) {
    // Get last two messages in run topic to ensure we get a runStart message
    m_runStream =
        m_broker->subscribe({m_runInfoTopic}, SubscribeAtOption::LASTTWO);
    std::string rawMsgBuffer;
    auto runStartData = getRunStartMessage(rawMsgBuffer);
    joinEventStreamAtTime(runStartData);
  } else {
    m_eventStream =
        m_broker->subscribe({m_eventTopic, m_runInfoTopic, m_sampleEnvTopic},
                            SubscribeAtOption::LATEST);
  }

  // Get last two messages in run topic to ensure we get a runStart message
  m_runStream =
      m_broker->subscribe({m_runInfoTopic}, SubscribeAtOption::LASTTWO);
  m_spDetStream =
      m_broker->subscribe({m_spDetTopic}, SubscribeAtOption::LASTONE);

  m_thread = std::thread([this]() { this->captureImpl(); });
}
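// Lifecycle sketch (illustrative; `decoder` is a hypothetical owner of this
// object, not a name used in this file):
//
//   decoder->startCapture();  // spawns the background capture thread
//   // ... poll hasData()/extractData() from the live-data algorithm ...
//   decoder->stopCapture();   // blocks until the capture loop has exited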
/**
 * Indicate if the next data to be extracted should replace LoadLiveData's
 * output workspace, for example the first data of a new run
*/
bool KafkaEventStreamDecoder::dataReset() {
bool result = (m_dataReset == true); // copy from atomic bool
m_dataReset = false; // reset to false
return result;
}
void KafkaEventStreamDecoder::joinEventStreamAtTime(
const KafkaEventStreamDecoder::RunStartStruct &runStartData) {
auto runStartTime = runStartData.startTime;
int64_t startTimeMilliseconds = nanosecondsToMilliseconds(runStartTime);
m_eventStream =
m_broker->subscribe({m_eventTopic, m_runInfoTopic, m_sampleEnvTopic},
startTimeMilliseconds, SubscribeAtOption::TIME);
// make sure we listen to the run start topic starting from the run start
// message we already got the start time from
m_eventStream->seek(m_runInfoTopic, 0, runStartData.runStartMsgOffset);
}
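/**
 * Convert a nanosecond timestamp to the millisecond precision expected by
 * the Kafka offset-by-time queries. Integer division truncates, e.g.
 * 1500000000 ns becomes 1500 ms.
 */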
int64_t KafkaEventStreamDecoder::nanosecondsToMilliseconds(
uint64_t timeNanoseconds) const {
return static_cast<int64_t>(timeNanoseconds / 1000000);
}
/**
* Stop capturing from the stream. This is a blocking call until the capturing
* function has completed
*/
void KafkaEventStreamDecoder::stopCapture() noexcept {
// This will interrupt the "event" loop
m_interrupt = true;
// Wait until the function has completed. The background thread
// will exit automatically
  while (m_capturing) {
    std::this_thread::sleep_for(std::chrono::milliseconds(50));
};
}
/**
* Check if there is data available to extract
* @return True if data has been accumulated so that extractData()
* can be called, false otherwise
*/
bool KafkaEventStreamDecoder::hasData() const noexcept {
std::lock_guard<std::mutex> lock(m_mutex);
return !m_localEvents.empty();
}
/**
* Check if a message has indicated that end of run has been reached
* @return True if end of run has been reached
*/
bool KafkaEventStreamDecoder::hasReachedEndOfRun() noexcept {
// Notify the decoder that MonitorLiveData knows it has reached end of run
// and after giving it opportunity to interrupt, decoder can continue with
// messages of the next run
if (!m_extractedEndRunData || m_extractWaiting)
return false;
if (m_endRun) {
std::lock_guard<std::mutex> runStatusLock(m_runStatusMutex);
m_runStatusSeen = true;
    m_cvRunStatus.notify_one();
    return true;
  }
  return false;
}
/**
* Check for an exception thrown by the background thread and rethrow
* it if necessary. If no error occurred swap the current internal buffer
* for a fresh one and return the old buffer.
* @return A pointer to the data collected since the last call to this
* method
*/
API::Workspace_sptr KafkaEventStreamDecoder::extractData() {
  if (m_exception) {
    throw std::runtime_error(*m_exception);
  }

  m_extractWaiting = true;
auto workspace_ptr = extractDataImpl();
  m_extractWaiting = false;
  m_cv.notify_one();
return workspace_ptr;
}
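// Typical polling pattern on the client side (illustrative sketch; `decoder`
// is a hypothetical owner of this object):
//
//   if (decoder->hasData()) {
//     API::Workspace_sptr chunk = decoder->extractData(); // swaps buffers
//     // ... hand chunk to LoadLiveData for accumulation ...
//   }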
// -----------------------------------------------------------------------------
// Private members
// -----------------------------------------------------------------------------
API::Workspace_sptr KafkaEventStreamDecoder::extractDataImpl() {
std::lock_guard<std::mutex> lock(m_mutex);
if (m_localEvents.size() == 1) {
auto temp = createBufferWorkspace(m_localEvents.front());
std::swap(m_localEvents.front(), temp);
return temp;
} else if (m_localEvents.size() > 1) {
auto group = boost::make_shared<API::WorkspaceGroup>();
size_t index(0);
for (auto &filledBuffer : m_localEvents) {
auto temp = createBufferWorkspace(filledBuffer);
std::swap(m_localEvents[index++], temp);
group->addWorkspace(temp);
}
    return group;
  } else {
    throw Exception::NotYet("Local buffers not initialized.");
  }
}

/**
 * Start decoding data from the streams into the internal buffers.
 * Implementation designed to be entry point for new thread of execution.
 * It catches all thrown exceptions.
 */
void KafkaEventStreamDecoder::captureImpl() noexcept {
m_capturing = true;
try {
captureImplExcept();
  } catch (std::exception &exc) {
    m_cbError();
    m_exception = boost::make_shared<std::runtime_error>(exc.what());
  } catch (...) {
    m_cbError();
    m_exception = boost::make_shared<std::runtime_error>(
        "KafkaEventStreamDecoder: Unknown exception type caught.");
  }
  m_capturing = false;
}

/**
 * Exception-throwing variant of captureImpl(). Do not call this directly.
 */
void KafkaEventStreamDecoder::captureImplExcept() {
g_log.debug("Event capture starting");
  // Load spectra-detector and runstart struct then initialise cache variables
  std::string buffer;
  std::string runBuffer;
  int64_t offset;
  int32_t partition;
  std::string topicName;
m_spDetStream->consumeMessage(&buffer, offset, partition, topicName);
auto runStartStruct = getRunStartMessage(runBuffer);
initLocalCaches(buffer, runStartStruct);
m_interrupt = false;
m_endRun = false;
m_runStatusSeen = false;
m_extractedEndRunData = true;
std::unordered_map<std::string, std::vector<int64_t>> stopOffsets;
std::unordered_map<std::string, std::vector<bool>> reachedEnd;
  // True when we should be checking if the run stop offsets have been reached
  bool checkOffsets = false;

  while (!m_interrupt) {
    if (m_endRun) {
      waitForRunEndObservation();
      continue;
    } else {
      waitForDataExtraction();
    }

    // Pull in events
    m_eventStream->consumeMessage(&buffer, offset, partition, topicName);
    // No events, wait for some to come along...
    if (buffer.empty()) {
      m_cbIterationEnd();
      continue;
    }
if (checkOffsets) {
if (reachedEnd.count(topicName) &&
offset >= stopOffsets[topicName][static_cast<size_t>(partition)]) {
reachedEnd[topicName][static_cast<size_t>(partition)] = true;
if (offset == stopOffsets[topicName][static_cast<size_t>(partition)]) {
g_log.debug() << "Reached end-of-run in " << topicName << " topic."
<< std::endl;
g_log.debug()
<< "topic: " << topicName << " offset: " << offset
<< " stopOffset: "
<< stopOffsets[topicName][static_cast<size_t>(partition)]
<< std::endl;
        }
        checkIfAllStopOffsetsReached(reachedEnd, checkOffsets);
      }
      if (offset > stopOffsets[topicName][static_cast<size_t>(partition)]) {
        // If the offset is beyond the end of the current run, then skip to
        // the next iteration and don't process the message
        m_cbIterationEnd();
        continue;
      }
    }

    // Check if we have an event message
    // Most will be event messages so we check for this type first
if (flatbuffers::BufferHasIdentifier(
reinterpret_cast<const uint8_t *>(buffer.c_str()),
EVENT_MESSAGE_ID.c_str())) {
eventDataFromMessage(buffer);
}
// Check if we have a sample environment log message
else if (flatbuffers::BufferHasIdentifier(
reinterpret_cast<const uint8_t *>(buffer.c_str()),
SAMPLE_MESSAGE_ID.c_str())) {
      sampleDataFromMessage(buffer);
    }
// Check if we have a runMessage
else if (flatbuffers::BufferHasIdentifier(
reinterpret_cast<const uint8_t *>(buffer.c_str()),
RUN_MESSAGE_ID.c_str())) {
auto runMsg =
GetRunInfo(reinterpret_cast<const uint8_t *>(buffer.c_str()));
if (!checkOffsets && runMsg->info_type_type() == InfoTypes_RunStop) {
auto runStopMsg = static_cast<const RunStop *>(runMsg->info_type());
auto stopTime = runStopMsg->stop_time();
g_log.debug() << "Received an end-of-run message with stop time = "
<< stopTime << std::endl;
        stopOffsets = getStopOffsets(stopOffsets, reachedEnd, stopTime);
        checkOffsets = true;
        // Check in case we have already passed the stop offsets everywhere
        checkIfAllStopOffsetsReached(reachedEnd, checkOffsets);
      }
    }
    m_cbIterationEnd();
  }
  g_log.debug("Event capture finished");
}
/**
* Check if we've reached the stop offset on every partition of every topic
*
* @param reachedEnd : Bool for each topic and partition to mark when stop
* offset reached
 * @param checkOffsets : Set to false once the end of the run has been
 * reached on every partition of every topic
 */
void KafkaEventStreamDecoder::checkIfAllStopOffsetsReached(
const std::unordered_map<std::string, std::vector<bool>> &reachedEnd,
bool &checkOffsets) {
if (std::all_of(reachedEnd.cbegin(), reachedEnd.cend(),
[](std::pair<std::string, std::vector<bool>> kv) {
return std::all_of(
kv.second.cbegin(), kv.second.cend(),
                        [](bool partitionEnd) { return partitionEnd; });
                  })) {
    m_endRun = true;
// If we've reached the end of a run then set m_extractWaiting to true
// so that we wait until the buffer is emptied before continuing.
// Otherwise we can end up with data from two different runs in the
// same buffer workspace which is problematic if the user wanted the
// "Stop" or "Rename" run transition option.
m_extractedEndRunData = false;
checkOffsets = false;
g_log.notice("Reached end of run in data streams.");
}
}
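// Shape of the book-keeping maps consumed above (illustrative values only):
//
//   stopOffsets = { "INST_events"    : {120, 98},
//                   "INST_sampleEnv" : {7} }
//   reachedEnd  = { "INST_events"    : {true, false},
//                   "INST_sampleEnv" : {true} }
//
// The run is considered ended only once every partition of every topic has
// been marked true.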
std::unordered_map<std::string, std::vector<int64_t>>
KafkaEventStreamDecoder::getStopOffsets(
std::unordered_map<std::string, std::vector<int64_t>> &stopOffsets,
std::unordered_map<std::string, std::vector<bool>> &reachedEnd,
uint64_t stopTime) const {
reachedEnd.clear();
stopOffsets.clear();
// Wait for max latency so that we don't miss any late messages
std::this_thread::sleep_for(MAX_LATENCY);
stopOffsets = m_eventStream->getOffsetsForTimestamp(
static_cast<int64_t>(stopTime / 1000000));
// /1000000 to convert nanosecond precision from message to millisecond
// precision which Kafka offset query supports
auto currentOffsets = m_eventStream->getCurrentOffsets();
// Set reachedEnd to false for each topic and partition
for (auto &topicOffsets : stopOffsets) {
auto topicName = topicOffsets.first;
if (topicName.substr(topicName.length() -
KafkaTopicSubscriber::RUN_TOPIC_SUFFIX.length()) !=
KafkaTopicSubscriber::RUN_TOPIC_SUFFIX) {
g_log.debug() << "TOPIC: " << topicName
<< " PARTITIONS: " << topicOffsets.second.size()
<< std::endl;
      reachedEnd.insert(
          {topicName, std::vector<bool>(topicOffsets.second.size(), false)});
auto &partitionOffsets = topicOffsets.second;
for (uint32_t partitionNumber = 0;
partitionNumber < partitionOffsets.size(); partitionNumber++) {
auto offset = partitionOffsets[partitionNumber];
// If the stop offset is negative then there are no messages for us
// to collect on this topic, so mark reachedEnd as true already
reachedEnd[topicName][partitionNumber] = offset < 0;
// If the stop offset has already been reached then mark reachedEnd as
// true
if (currentOffsets[topicName][partitionNumber] >= offset)
reachedEnd[topicName][partitionNumber] = true;
}
}
}
return stopOffsets;
}
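// Example of the precision reduction applied above (illustrative values): a
// run-stop time of 1530000000123456789 ns becomes 1530000000123 ms for the
// offset-for-timestamp query.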
/**
* If extractData method is waiting for access to the buffer workspace
* then we wait for it to finish
*/
void KafkaEventStreamDecoder::waitForDataExtraction() {
{
std::unique_lock<std::mutex> readyLock(m_waitMutex);
m_cv.wait(readyLock, [&] { return !m_extractWaiting; });
}
}
void KafkaEventStreamDecoder::waitForRunEndObservation() {
m_extractWaiting = true;
// Mark extractedEndRunData true before waiting on the extraction to ensure
// an immediate request for run status after extracting the data will return
// the correct value - avoids race condition in MonitorLiveData and tests
m_extractedEndRunData = true;
waitForDataExtraction();
// Wait until MonitorLiveData has seen that end of run was
// reached before setting m_endRun back to false and continuing
std::unique_lock<std::mutex> runStatusLock(m_runStatusMutex);
m_cvRunStatus.wait(runStatusLock, [&] { return m_runStatusSeen; });
m_endRun = false;
m_runStatusSeen = false;
runStatusLock.unlock();
  // Set to zero until we have the new run number; MonitorLiveData will query
  // it before each time it extracts data until it gets a non-zero value
m_runNumber = 0;
// Get new run message now so that new run number is available for
// MonitorLiveData as early as possible
RunStartStruct runStartStruct;
if (waitForNewRunStartMessage(runStartStruct))
return;
// Give time for MonitorLiveData to act on runStatus information
// and trigger m_interrupt for next loop iteration if user requested
// LiveData algorithm to stop at the end of the run
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (m_interrupt)
return;
// Rejoin event stream at start of new run
joinEventStreamAtTime(runStartStruct);
std::string detSpecMapMsgBuffer = getDetSpecMapForRun(runStartStruct);
initLocalCaches(detSpecMapMsgBuffer, runStartStruct);
}
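// Summary of the end-of-run handshake implemented above:
//   1. the capture thread sets m_extractWaiting and waits for extractData()
//   2. MonitorLiveData extracts the final data chunk of the run
//   3. MonitorLiveData calls hasReachedEndOfRun(), which sets m_runStatusSeen
//   4. the capture thread resumes, resets the flags and waits for the next
//      run start message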
std::string KafkaEventStreamDecoder::getDetSpecMapForRun(
const KafkaEventStreamDecoder::RunStartStruct &runStartStruct) {
std::string rawMsgBuffer;
int64_t offset;
int32_t partition;
std::string topicName;
m_spDetStream = m_broker->subscribe(
{m_spDetTopic}, nanosecondsToMilliseconds(runStartStruct.startTime),
SubscribeAtOption::TIME);
m_spDetStream->consumeMessage(&rawMsgBuffer, offset, partition, topicName);
  if (rawMsgBuffer.empty()) {
    throw std::runtime_error(
        "No detector-spectrum map message found for run number " +
        std::to_string(runStartStruct.runNumber));
}
return rawMsgBuffer;
}
bool KafkaEventStreamDecoder::waitForNewRunStartMessage(
RunStartStruct &runStartStructOutput) {
while (!m_interrupt) {
std::string runMsgBuffer;
int64_t offset;
int32_t partition;
std::string topicName;
m_runStream->consumeMessage(&runMsgBuffer, offset, partition, topicName);
if (runMsgBuffer.empty()) {
continue;
} else {
auto runMsg =
GetRunInfo(reinterpret_cast<const uint8_t *>(runMsgBuffer.c_str()));
if (runMsg->info_type_type() == InfoTypes_RunStart) {
auto runStartData = static_cast<const RunStart *>(runMsg->info_type());
KafkaEventStreamDecoder::RunStartStruct runStartStruct = {
runStartData->instrument_name()->str(), runStartData->run_number(),
runStartData->start_time(),
static_cast<size_t>(runStartData->n_periods()), offset};
if (runStartStruct.runNumber > m_runNumber) {
runStartStructOutput = runStartStruct;
m_runNumber = runStartStruct.runNumber;
return false; // not interrupted
}
} else {
continue;
}
}
}
  return true; // interrupted
}
/**
 * Get sample environment log data from a flatbuffer message and append it
 * to the sample logs of the buffer workspace for every period
 *
 * @param buffer : raw message buffer containing the f142 log data flatbuffer
 */
void KafkaEventStreamDecoder::sampleDataFromMessage(const std::string &buffer) {
std::lock_guard<std::mutex> lock(m_mutex);
// Add sample log values to every workspace for every period
for (const auto &periodBuffer : m_localEvents) {
auto &mutableRunInfo = periodBuffer->mutableRun();
auto seEvent =
GetLogData(reinterpret_cast<const uint8_t *>(buffer.c_str()));
auto name = seEvent->source_name()->str();
// Convert time from nanoseconds since 1 Jan 1970 to nanoseconds since 1 Jan
// 1990 to create a Mantid timestamp
const int64_t nanoseconds1970To1990 = 631152000000000000L;
auto time = Core::DateAndTime(static_cast<int64_t>(seEvent->timestamp()) -
nanoseconds1970To1990);
// If sample log with this name already exists then append to it
// otherwise create a new log
if (seEvent->value_type() == Value_Int) {
auto value = static_cast<const Int *>(seEvent->value());
appendToLog<int32_t>(mutableRunInfo, name, time, value->value());
} else if (seEvent->value_type() == Value_Long) {
auto value = static_cast<const Long *>(seEvent->value());
appendToLog<int64_t>(mutableRunInfo, name, time, value->value());
} else if (seEvent->value_type() == Value_Double) {
auto value = static_cast<const Double *>(seEvent->value());
appendToLog<double>(mutableRunInfo, name, time, value->value());
} else if (seEvent->value_type() == Value_Float) {
auto value = static_cast<const Float *>(seEvent->value());
appendToLog<double>(mutableRunInfo, name, time,
static_cast<double>(value->value()));
} else {
g_log.warning() << "Value for sample log named '" << name
<< "' was not of recognised type" << std::endl;
}
}
}
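// Sanity check for the epoch shift used above: 1 Jan 1970 to 1 Jan 1990 is
// 20 years including 5 leap days (1972, 1976, 1980, 1984, 1988), i.e.
// (20 * 365 + 5) * 86400 s = 631152000 s = 631152000000000000 ns.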
void KafkaEventStreamDecoder::eventDataFromMessage(const std::string &buffer) {
auto eventMsg =
GetEventMessage(reinterpret_cast<const uint8_t *>(buffer.c_str()));
DateAndTime pulseTime = static_cast<int64_t>(eventMsg->pulse_time());
const auto &tofData = *(eventMsg->time_of_flight());
const auto &detData = *(eventMsg->detector_id());
auto nEvents = tofData.size();
DataObjects::EventWorkspace_sptr periodBuffer;
std::lock_guard<std::mutex> lock(m_mutex);
if (eventMsg->facility_specific_data_type() == FacilityData_ISISData) {
auto ISISMsg =
static_cast<const ISISData *>(eventMsg->facility_specific_data());
periodBuffer = m_localEvents[static_cast<size_t>(ISISMsg->period_number())];
auto &mutableRunInfo = periodBuffer->mutableRun();
mutableRunInfo.getTimeSeriesProperty<double>(PROTON_CHARGE_PROPERTY)
->addValue(pulseTime, ISISMsg->proton_charge());
} else {
periodBuffer = m_localEvents[0];
}
for (decltype(nEvents) i = 0; i < nEvents; ++i) {
auto &spectrum = periodBuffer->getSpectrum(
m_specToIdx[static_cast<int32_t>(detData[i])]);
spectrum.addEventQuickly(TofEvent(static_cast<double>(tofData[i]) *
1e-3, // nanoseconds to microseconds
pulseTime));
}
}
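// The scaling above assumes time_of_flight values arrive in nanoseconds, as
// the inline comment notes; e.g. a raw value of 15000000 ns is stored as a
// TofEvent at 15000 microseconds.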
KafkaEventStreamDecoder::RunStartStruct
KafkaEventStreamDecoder::getRunStartMessage(std::string &rawMsgBuffer) {
auto offset = getRunInfoMessage(rawMsgBuffer);
auto runMsg =
GetRunInfo(reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()));
if (runMsg->info_type_type() != InfoTypes_RunStart) {
// We want a runStart message, try the next one
offset = getRunInfoMessage(rawMsgBuffer);
runMsg =
GetRunInfo(reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()));
if (runMsg->info_type_type() != InfoTypes_RunStart) {
throw std::runtime_error("KafkaEventStreamDecoder::initLocalCaches() - "
"Could not find a run start message"
"in the run info topic. Unable to continue");
}
}
auto runStartData = static_cast<const RunStart *>(runMsg->info_type());
KafkaEventStreamDecoder::RunStartStruct runStart = {
runStartData->instrument_name()->str(), runStartData->run_number(),
runStartData->start_time(),
static_cast<size_t>(runStartData->n_periods()), offset};
return runStart;
}
/**
* Pull information from the run & detector-spectrum stream and initialize
* the internal EventWorkspace buffer + other cached information such as run
* start. This includes loading the instrument.
* By the end of this method the local event buffer is ready to accept
* events
*/
void KafkaEventStreamDecoder::initLocalCaches(
    const std::string &rawMsgBuffer, const RunStartStruct &runStartData) {
  if (rawMsgBuffer.empty()) {
throw std::runtime_error("KafkaEventStreamDecoder::initLocalCaches() - "
"Empty message received from spectrum-detector "
"topic. Unable to continue");
}
auto spDetMsg = GetSpectraDetectorMapping(
reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()));
auto nspec = spDetMsg->spectrum()->size();
auto nudet = spDetMsg->detector_id()->size();
if (nudet != nspec) {
std::ostringstream os;
os << "KafkaEventStreamDecoder::initLocalEventBuffer() - Invalid "
"spectra/detector mapping. Expected matched length arrays but "
"found nspec=" << nspec << ", ndet=" << nudet;
throw std::runtime_error(os.str());
}
m_runNumber = runStartData.runNumber;
auto eventBuffer = createBufferWorkspace(
static_cast<size_t>(spDetMsg->n_spectra()), spDetMsg->spectrum()->data(),
spDetMsg->detector_id()->data(), nudet);
// Load the instrument if possible but continue if we can't
auto instName = runStartData.instrumentName;
if (!instName.empty())
    loadInstrument(instName, eventBuffer);
  else
    g_log.warning(
        "Empty instrument name received. Continuing without instrument");
auto &mutableRun = eventBuffer->mutableRun();
// Run start. Cache locally for computing frame times
// Convert nanoseconds to seconds (and discard the extra precision)
auto runStartTime = static_cast<time_t>(runStartData.startTime / 1000000000);
m_runStart.set_from_time_t(runStartTime);
auto timeString = m_runStart.toISO8601String();
  // Run start
  mutableRun.addProperty(RUN_START_PROPERTY, std::string(timeString));
  // Run number
  mutableRun.addProperty(RUN_NUMBER_PROPERTY,
std::to_string(runStartData.runNumber));
// Create the proton charge property
mutableRun.addProperty(
new Kernel::TimeSeriesProperty<double>(PROTON_CHARGE_PROPERTY));
// Cache spec->index mapping. We assume it is the same across all periods
m_specToIdx = eventBuffer->getSpectrumToWorkspaceIndexMap();
const size_t nperiods = runStartData.nPeriods;
if (nperiods == 0) {
throw std::runtime_error(
"KafkaEventStreamDecoder - Message has n_periods==0. This is "
"an error by the data producer");
}
{
std::lock_guard<std::mutex> lock(m_mutex);
m_localEvents.resize(nperiods);
m_localEvents[0] = eventBuffer;
for (size_t i = 1; i < nperiods; ++i) {
// A clone should be cheap here as there are no events yet
m_localEvents[i] = eventBuffer->clone();
    }
  }
  // New caches so LoadLiveData's output workspace needs to be replaced
  m_dataReset = true;
}
/**
* Try to get a runInfo message from Kafka, throw error if it fails
* @param rawMsgBuffer : string to use as message buffer
 * @return Offset of the received message in the run info topic
 */
int64_t KafkaEventStreamDecoder::getRunInfoMessage(std::string &rawMsgBuffer) {
int64_t offset;
int32_t partition;
std::string topicName;
m_runStream->consumeMessage(&rawMsgBuffer, offset, partition, topicName);
if (rawMsgBuffer.empty()) {
throw std::runtime_error("KafkaEventStreamDecoder::getRunInfoMessage() - "
"Empty message received from run info "
"topic. Unable to continue");
}
if (!flatbuffers::BufferHasIdentifier(
reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()),
RUN_MESSAGE_ID.c_str())) {
throw std::runtime_error("KafkaEventStreamDecoder::getRunInfoMessage() - "
"Received unexpected message type from run info "
"topic. Unable to continue");
}
  return offset;
}

/**
 * Create a buffer workspace of the correct size based on the values given.
* @param nspectra The number of unique spectrum numbers
* @param spec An array of length ndet specifying the spectrum number of each
* detector
* @param udet An array of length ndet specifying the detector ID of each
* detector
* @param length The length of the spec/udet arrays
* @return A new workspace of the appropriate size
*/
DataObjects::EventWorkspace_sptr KafkaEventStreamDecoder::createBufferWorkspace(
const size_t nspectra, const int32_t *spec, const int32_t *udet,
const uint32_t length) {
// Order is important here
std::map<int32_t, std::set<int32_t>> spdetMap;
for (uint32_t i = 0; i < length; ++i) {
auto specNo = spec[i];
auto detId = udet[i];
auto search = spdetMap.find(specNo);
if (search != spdetMap.end()) {
search->second.insert(detId);
} else {
spdetMap.insert({specNo, {detId}});
}
}
assert(nspectra == spdetMap.size());
// Create event workspace
auto eventBuffer = boost::static_pointer_cast<DataObjects::EventWorkspace>(
API::WorkspaceFactory::Instance().create("EventWorkspace", nspectra, 2,
1));
// Set the units
eventBuffer->getAxis(0)->unit() =
Kernel::UnitFactory::Instance().create("TOF");
eventBuffer->setYUnit("Counts");
// Setup spectra-detector mapping.
size_t wsIdx(0);
for (const auto &spIter : spdetMap) {
auto &spectrum = eventBuffer->getSpectrum(wsIdx);
spectrum.setSpectrumNo(spIter.first);
spectrum.addDetectorIDs(spIter.second);
++wsIdx;
}
  return eventBuffer;
}
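// Worked example of the grouping above (illustrative values): spec = {1, 1, 2}
// with udet = {1001, 1002, 1003} yields
//   spdetMap = { 1 -> {1001, 1002}, 2 -> {1003} }
// i.e. two spectra, the first associated with two detectors.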
/**
* Create new buffer workspace from an existing copy
* @param parent A pointer to an existing workspace
*/
DataObjects::EventWorkspace_sptr KafkaEventStreamDecoder::createBufferWorkspace(
const DataObjects::EventWorkspace_sptr &parent) {
auto buffer = boost::static_pointer_cast<DataObjects::EventWorkspace>(
API::WorkspaceFactory::Instance().create(
"EventWorkspace", parent->getNumberHistograms(), 2, 1));
// Copy meta data
API::WorkspaceFactory::Instance().initializeFromParent(*parent, *buffer,
false);
// Clear out the old logs, except for the most recent entry
buffer->mutableRun().clearOutdatedTimeSeriesLogValues();
return buffer;
}
/**
* Run LoadInstrument for the given instrument name. If it cannot succeed it
* does nothing to the internal workspace
* @param name Name of an instrument to load
* @param workspace A pointer to the workspace receiving the instrument
*/
void KafkaEventStreamDecoder::loadInstrument(
const std::string &name, DataObjects::EventWorkspace_sptr workspace) {
if (name.empty()) {
g_log.warning("Empty instrument name found");
return;
}
try {
auto alg =
API::AlgorithmManager::Instance().createUnmanaged("LoadInstrument");
// Do not put the workspace in the ADS
alg->setChild(true);
alg->initialize();
alg->setPropertyValue("InstrumentName", name);
alg->setProperty("Workspace", workspace);
alg->setProperty("RewriteSpectraMap", Kernel::OptionalBool(false));
alg->execute();
} catch (std::exception &exc) {
g_log.warning() << "Error loading instrument '" << name
<< "': " << exc.what() << "\n";
}