diff --git a/Framework/LiveData/CMakeLists.txt b/Framework/LiveData/CMakeLists.txt index 93a7f515a089cbdeab6580d7459e25f18f7ca2d3..53571ba3431fcf0763867580caf2025865e79d0f 100644 --- a/Framework/LiveData/CMakeLists.txt +++ b/Framework/LiveData/CMakeLists.txt @@ -75,6 +75,7 @@ if(LIBRDKAFKA_FOUND) src/Kafka/private/Schema/flatbuffers/base.h src/Kafka/private/Schema/flatbuffers/stl_emulation.h src/Kafka/private/Schema/df12_det_spec_map_generated.h + src/Kafka/private/Schema/dtdb_adc_pulse_debug_generated.h src/Kafka/private/Schema/ev42_events_generated.h src/Kafka/private/Schema/f142_logdata_generated.h src/Kafka/private/Schema/fwdi_forwarder_internal_generated.h diff --git a/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.h b/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.h index b865a7ffb75e0f5c023398d0beac35ad69b2342a..04f89d19eca57def565da2b5e697c0e3d17f37a5 100644 --- a/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.h +++ b/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.h @@ -60,7 +60,8 @@ public: const std::string &runInfoTopic, const std::string &spDetTopic, const std::string &sampleEnvTopic, - const std::string &chopperTopic); + const std::string &chopperTopic, + const std::string &monitorTopic); virtual ~IKafkaStreamDecoder(); IKafkaStreamDecoder(const IKafkaStreamDecoder &) = delete; IKafkaStreamDecoder &operator=(const IKafkaStreamDecoder &) = delete; @@ -139,6 +140,7 @@ protected: const std::string m_spDetTopic; const std::string m_sampleEnvTopic; const std::string m_chopperTopic; + const std::string m_monitorTopic; /// Flag indicating if user interruption has been requested std::atomic<bool> m_interrupt; /// Subscriber for the data stream diff --git a/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.tcc b/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.tcc index 90fbc5fc87b56002231c188f1c16e0fd1fe5850e..689f62c4fb095f25930533b394a5085e300d8128 100644 --- a/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.tcc +++ b/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.tcc @@ -153,7 +153,8 @@ void IKafkaStreamDecoder::writeChopperTimestampsToWorkspaceLogs( const auto *timestamps = chopperMsg->timestamps(); std::vector<uint64_t> mantidTimestamps; - std::copy(timestamps->begin(), timestamps->end(), mantidTimestamps.begin()); + std::copy(timestamps->begin(), timestamps->end(), + std::back_inserter(mantidTimestamps)); auto name = chopperMsg->name()->str(); for (auto workspace : workspaces) { diff --git a/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaEventStreamDecoder.h b/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaEventStreamDecoder.h index 4a058c2053b1557b04aa8dc58aa8b27391d882cb..dad15c81599fb148b4c799614098ceb225964c7a 100644 --- a/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaEventStreamDecoder.h +++ b/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaEventStreamDecoder.h @@ -38,13 +38,11 @@ public: }; public: - KafkaEventStreamDecoder(std::shared_ptr<IKafkaBroker> broker, - const std::string &eventTopic, - const std::string &runInfoTopic, - const std::string &spDetTopic, - const std::string &sampleEnvTopic, - const std::string &chopperTopic, - const std::size_t bufferThreshold); + KafkaEventStreamDecoder( + std::shared_ptr<IKafkaBroker> broker, const std::string &eventTopic, + const std::string &runInfoTopic, const std::string &spDetTopic, + const std::string &sampleEnvTopic, const std::string &chopperTopic, + const std::string &monitorTopic, const std::size_t bufferThreshold); ~KafkaEventStreamDecoder(); KafkaEventStreamDecoder(const KafkaEventStreamDecoder &) = delete; KafkaEventStreamDecoder &operator=(const KafkaEventStreamDecoder &) = delete; diff --git a/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaTopicSubscriber.h b/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaTopicSubscriber.h index 227eb2969b947cd4294f63448b5e943567322e0e..4d74fa43b5be0bac83a695ea5846ff62ed382f62 100644 --- a/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaTopicSubscriber.h +++ b/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaTopicSubscriber.h @@ -49,6 +49,7 @@ public: static const std::string DET_SPEC_TOPIC_SUFFIX; static const std::string SAMPLE_ENV_TOPIC_SUFFIX; static const std::string CHOPPER_TOPIC_SUFFIX; + static const std::string MONITOR_TOPIC_SUFFIX; static const int64_t IGNORE_OFFSET = -1; diff --git a/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp b/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp index 78d7a50fa008e2d9cd8841b1cd7a332ce6161ec2..6076da9832b8c96a78dbdacaa111549c33d79851 100644 --- a/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp +++ b/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp @@ -53,13 +53,14 @@ IKafkaStreamDecoder::IKafkaStreamDecoder(std::shared_ptr<IKafkaBroker> broker, const std::string &runInfoTopic, const std::string &spDetTopic, const std::string &sampleEnvTopic, - const std::string &chopperTopic) + const std::string &chopperTopic, + const std::string &monitorTopic) : m_broker(broker), m_streamTopic(streamTopic), m_runInfoTopic(runInfoTopic), m_spDetTopic(spDetTopic), m_sampleEnvTopic(sampleEnvTopic), m_chopperTopic(chopperTopic), - m_interrupt(false), m_specToIdx(), m_runStart(), m_runId(""), m_thread(), - m_capturing(false), m_exception(), m_extractWaiting(false), - m_cbIterationEnd([] {}), m_cbError([] {}) {} + m_monitorTopic(monitorTopic), m_interrupt(false), m_specToIdx(), + m_runStart(), m_runId(""), m_thread(), m_capturing(false), m_exception(), + m_extractWaiting(false), m_cbIterationEnd([] {}), m_cbError([] {}) {} /** * Destructor. @@ -83,7 +84,7 @@ void IKafkaStreamDecoder::startCapture(bool startNow) { joinStreamAtTime(runStartData); } else { m_dataStream = - m_broker->subscribe({m_streamTopic, m_runInfoTopic, m_sampleEnvTopic}, + m_broker->subscribe({m_streamTopic, m_runInfoTopic, m_sampleEnvTopic, m_monitorTopic}, SubscribeAtOption::LATEST); } @@ -425,7 +426,7 @@ IKafkaStreamDecoder::getRunStartMessage(std::string &rawMsgBuffer) { GetRunInfo(reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str())); if (runMsg->info_type_type() != InfoTypes::RunStart) { throw std::runtime_error("IKafkaStreamDecoder::initLocalCaches() - " - "Could not find a run start message" + "Could not find a run start message " "in the run info topic. Unable to continue"); } } diff --git a/Framework/LiveData/src/Kafka/KafkaEventListener.cpp b/Framework/LiveData/src/Kafka/KafkaEventListener.cpp index 79c7b7a7c63069ae9a94e58c5b6d703792bddb26..32764ef8276551edac7b5c14c2753f4cc8eb78fa 100644 --- a/Framework/LiveData/src/Kafka/KafkaEventListener.cpp +++ b/Framework/LiveData/src/Kafka/KafkaEventListener.cpp @@ -59,7 +59,9 @@ bool KafkaEventListener::connect(const Poco::Net::SocketAddress &address) { sampleEnvTopic(m_instrumentName + KafkaTopicSubscriber::SAMPLE_ENV_TOPIC_SUFFIX), chopperTopic(m_instrumentName + - KafkaTopicSubscriber::CHOPPER_TOPIC_SUFFIX); + KafkaTopicSubscriber::CHOPPER_TOPIC_SUFFIX), + monitorTopic(m_instrumentName + + KafkaTopicSubscriber::MONITOR_TOPIC_SUFFIX); for (const auto &topic : topics) { switch (topic.type()) { @@ -75,6 +77,9 @@ bool KafkaEventListener::connect(const Poco::Net::SocketAddress &address) { case Kernel::TopicType::Run: runInfoTopic = topic.name(); break; + case Kernel::TopicType::Monitor: + monitorTopic = topic.name(); + break; } } @@ -86,7 +91,7 @@ bool KafkaEventListener::connect(const Poco::Net::SocketAddress &address) { m_decoder = std::make_unique<KafkaEventStreamDecoder>( broker, eventTopic, runInfoTopic, spDetInfoTopic, sampleEnvTopic, - chopperTopic, bufferThreshold); + chopperTopic, monitorTopic, bufferThreshold); } catch (std::exception &exc) { g_log.error() << "KafkaEventListener::connect - Connection Error: " << exc.what() << "\n"; diff --git a/Framework/LiveData/src/Kafka/KafkaEventStreamDecoder.cpp b/Framework/LiveData/src/Kafka/KafkaEventStreamDecoder.cpp index f505d8a11b53cc70ec1509daa0907df031eae562..2d5655a9b7c414ba33ef3117eab347e4d53dc750 100644 --- a/Framework/LiveData/src/Kafka/KafkaEventStreamDecoder.cpp +++ b/Framework/LiveData/src/Kafka/KafkaEventStreamDecoder.cpp @@ -118,9 +118,9 @@ KafkaEventStreamDecoder::KafkaEventStreamDecoder( std::shared_ptr<IKafkaBroker> broker, const std::string &eventTopic, const std::string &runInfoTopic, const std::string &spDetTopic, const std::string &sampleEnvTopic, const std::string &chopperTopic, - const std::size_t bufferThreshold) + const std::string &monitorTopic, const std::size_t bufferThreshold) : IKafkaStreamDecoder(broker, eventTopic, runInfoTopic, spDetTopic, - sampleEnvTopic, chopperTopic), + sampleEnvTopic, chopperTopic, monitorTopic), m_intermediateBufferFlushThreshold(bufferThreshold) { #ifndef _OPENMP g_log.warning() << "Multithreading is not available on your system. This " @@ -379,7 +379,7 @@ void KafkaEventStreamDecoder::eventDataFromMessage(const std::string &buffer, BufferedPulse pulse{pulseTime, 0}; /* Perform facility specific operations */ - if (eventMsg->facility_specific_data_type() == FacilityData_ISISData) { + if (eventMsg->facility_specific_data_type() == FacilityData::ISISData) { std::lock_guard<std::mutex> workspaceLock(m_mutex); const auto ISISMsg = static_cast<const ISISData *>(eventMsg->facility_specific_data()); @@ -404,13 +404,24 @@ void KafkaEventStreamDecoder::eventDataFromMessage(const std::string &buffer, m_receivedEventBuffer.reserve(oldBufferSize + nEvents); /* Store the buffered events */ + for (flatbuffers::uoffset_t i = 0; i < tofData.size(); ++i) { + auto detId = detData[i]; + auto tof = tofData[i]; + uint32_t index = detId + m_specToIdxOffset; + if (index < m_specToIdx.size()) { + const auto workspaceIndex = m_specToIdx[index]; + auto event = BufferedEvent{workspaceIndex, tof, pulseIndex}; + m_receivedEventBuffer.emplace_back(std::move(event)); + } + } + /* std::transform(detData.begin(), detData.end(), tofData.begin(), std::back_inserter(m_receivedEventBuffer), [&](uint64_t detId, uint64_t tof) -> BufferedEvent { const auto workspaceIndex = m_specToIdx[detId + m_specToIdxOffset]; return {workspaceIndex, tof, pulseIndex}; - }); + });*/ } const auto endTime = std::chrono::system_clock::now(); @@ -549,6 +560,8 @@ void KafkaEventStreamDecoder::initLocalCaches( eventBuffer = boost::static_pointer_cast<DataObjects::EventWorkspace>( API::WorkspaceFactory::Instance().create("EventWorkspace", nspec, 2, 1)); + eventBuffer->setInstrument(ws->getInstrument()); + eventBuffer->rebuildSpectraMapping(); eventBuffer->getAxis(0)->unit() = Kernel::UnitFactory::Instance().create("TOF"); eventBuffer->setYUnit("Counts"); @@ -574,15 +587,17 @@ void KafkaEventStreamDecoder::initLocalCaches( } // Load the instrument if possible but continue if we can't - if (!instName.empty()) { - loadInstrument<DataObjects::EventWorkspace>(instName, eventBuffer, - jsonGeometry); - if (rawMsgBuffer.empty()) { - eventBuffer->rebuildSpectraMapping(); - } - } else - g_log.warning( - "Empty instrument name received. Continuing without instrument"); + if (!eventBuffer) { + if (!instName.empty()) { + loadInstrument<DataObjects::EventWorkspace>(instName, eventBuffer, + jsonGeometry); + if (rawMsgBuffer.empty()) { + eventBuffer->rebuildSpectraMapping(); + } + } else + g_log.warning( + "Empty instrument name received. Continuing without instrument"); + } auto &mutableRun = eventBuffer->mutableRun(); // Run start. Cache locally for computing frame times @@ -602,7 +617,8 @@ void KafkaEventStreamDecoder::initLocalCaches( eventBuffer->getSpectrumToWorkspaceIndexVector(m_specToIdxOffset); // Buffers for each period - const size_t nperiods = runStartData.nPeriods; + const size_t nperiods = + runStartData.nPeriods == 0 ? 1 : runStartData.nPeriods; if (nperiods == 0) { throw std::runtime_error( "KafkaEventStreamDecoder - Message has n_periods==0. This is " diff --git a/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp b/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp index 6e19376937389a1b7a0c342cb2142c1ccf225342..6c5a60c8ebdbc8f68bdb08eaf95785995ae8ee50 100644 --- a/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp +++ b/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp @@ -60,7 +60,7 @@ KafkaHistoStreamDecoder::KafkaHistoStreamDecoder( const std::string &runInfoTopic, const std::string &spDetTopic, const std::string &sampleEnvTopic, const std::string &chopperTopic) : IKafkaStreamDecoder(broker, histoTopic, runInfoTopic, spDetTopic, - sampleEnvTopic, chopperTopic), + sampleEnvTopic, chopperTopic, ""), m_workspace() {} /** diff --git a/Framework/LiveData/src/Kafka/KafkaTopicSubscriber.cpp b/Framework/LiveData/src/Kafka/KafkaTopicSubscriber.cpp index 09ce48b3b8fa514f10a9440f14c8158b5afd3d33..c2268bb2a3f320c4e480c841107118d5c523df32 100644 --- a/Framework/LiveData/src/Kafka/KafkaTopicSubscriber.cpp +++ b/Framework/LiveData/src/Kafka/KafkaTopicSubscriber.cpp @@ -12,6 +12,7 @@ #include <chrono> #include <sstream> #include <thread> +#include <iostream> using RdKafka::Conf; using RdKafka::KafkaConsumer; @@ -29,18 +30,21 @@ Mantid::Kernel::Logger &LOGGER() { } /// Create and return the global configuration object -std::unique_ptr<Conf> createGlobalConfiguration(const std::string &brokerAddr) { - auto conf = std::unique_ptr<Conf>(Conf::create(Conf::CONF_GLOBAL)); +std::unique_ptr<RdKafka::Conf> +createGlobalConfiguration(const std::string &brokerAddr) { + auto conf = + std::unique_ptr<RdKafka::Conf>(RdKafka::Conf::create(Conf::CONF_GLOBAL)); std::string errorMsg; conf->set("metadata.broker.list", brokerAddr, errorMsg); conf->set("session.timeout.ms", "10000", errorMsg); conf->set("group.id", "mantid", errorMsg); conf->set("message.max.bytes", MAX_MESSAGE_SIZE, errorMsg); + conf->set("fetch.max.bytes", MAX_MESSAGE_SIZE, errorMsg); conf->set("fetch.message.max.bytes", MAX_MESSAGE_SIZE, errorMsg); + conf->set("receive.message.max.bytes", "100000512", errorMsg); conf->set("replica.fetch.max.bytes", MAX_MESSAGE_SIZE, errorMsg); conf->set("enable.auto.commit", "false", errorMsg); conf->set("enable.auto.offset.store", "false", errorMsg); - conf->set("offset.store.method", "none", errorMsg); conf->set("api.version.request", "true", errorMsg); return conf; } @@ -59,6 +63,7 @@ const std::string KafkaTopicSubscriber::RUN_TOPIC_SUFFIX = "_runInfo"; const std::string KafkaTopicSubscriber::DET_SPEC_TOPIC_SUFFIX = "_detSpecMap"; const std::string KafkaTopicSubscriber::SAMPLE_ENV_TOPIC_SUFFIX = "_sampleEnv"; const std::string KafkaTopicSubscriber::CHOPPER_TOPIC_SUFFIX = "_choppers"; +const std::string KafkaTopicSubscriber::MONITOR_TOPIC_SUFFIX = "_monitors"; /** * Construct a topic subscriber @@ -278,9 +283,7 @@ void KafkaTopicSubscriber::seek(const std::string &topic, uint32_t partition, * Create the KafkaConsumer for required configuration */ void KafkaTopicSubscriber::createConsumer() { - // Create configurations auto globalConf = createGlobalConfiguration(m_brokerAddr); - std::string errorMsg; m_consumer = std::unique_ptr<KafkaConsumer>( KafkaConsumer::create(globalConf.get(), errorMsg)); diff --git a/Framework/LiveData/src/Kafka/private/Schema/dtdb_adc_pulse_debug_generated.h b/Framework/LiveData/src/Kafka/private/Schema/dtdb_adc_pulse_debug_generated.h new file mode 100644 index 0000000000000000000000000000000000000000..ffd63fb4be037bbf849baac702346911b57aedc7 --- /dev/null +++ b/Framework/LiveData/src/Kafka/private/Schema/dtdb_adc_pulse_debug_generated.h @@ -0,0 +1,130 @@ +// automatically generated by the FlatBuffers compiler, do not modify + + +#ifndef FLATBUFFERS_GENERATED_DTDBADCPULSEDEBUG_H_ +#define FLATBUFFERS_GENERATED_DTDBADCPULSEDEBUG_H_ + +#include "flatbuffers/flatbuffers.h" + +struct AdcPulseDebug; + +struct AdcPulseDebug FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + static FLATBUFFERS_CONSTEXPR const char *GetFullyQualifiedName() { + return "AdcPulseDebug"; + } + enum { + VT_AMPLITUDE = 4, + VT_PEAK_AREA = 6, + VT_BACKGROUND = 8, + VT_THRESHOLD_TIME = 10, + VT_PEAK_TIME = 12 + }; + const flatbuffers::Vector<uint32_t> *amplitude() const { + return GetPointer<const flatbuffers::Vector<uint32_t> *>(VT_AMPLITUDE); + } + flatbuffers::Vector<uint32_t> *mutable_amplitude() { + return GetPointer<flatbuffers::Vector<uint32_t> *>(VT_AMPLITUDE); + } + const flatbuffers::Vector<uint32_t> *peak_area() const { + return GetPointer<const flatbuffers::Vector<uint32_t> *>(VT_PEAK_AREA); + } + flatbuffers::Vector<uint32_t> *mutable_peak_area() { + return GetPointer<flatbuffers::Vector<uint32_t> *>(VT_PEAK_AREA); + } + const flatbuffers::Vector<uint32_t> *background() const { + return GetPointer<const flatbuffers::Vector<uint32_t> *>(VT_BACKGROUND); + } + flatbuffers::Vector<uint32_t> *mutable_background() { + return GetPointer<flatbuffers::Vector<uint32_t> *>(VT_BACKGROUND); + } + const flatbuffers::Vector<uint64_t> *threshold_time() const { + return GetPointer<const flatbuffers::Vector<uint64_t> *>(VT_THRESHOLD_TIME); + } + flatbuffers::Vector<uint64_t> *mutable_threshold_time() { + return GetPointer<flatbuffers::Vector<uint64_t> *>(VT_THRESHOLD_TIME); + } + const flatbuffers::Vector<uint64_t> *peak_time() const { + return GetPointer<const flatbuffers::Vector<uint64_t> *>(VT_PEAK_TIME); + } + flatbuffers::Vector<uint64_t> *mutable_peak_time() { + return GetPointer<flatbuffers::Vector<uint64_t> *>(VT_PEAK_TIME); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_AMPLITUDE) && + verifier.Verify(amplitude()) && + VerifyOffset(verifier, VT_PEAK_AREA) && + verifier.Verify(peak_area()) && + VerifyOffset(verifier, VT_BACKGROUND) && + verifier.Verify(background()) && + VerifyOffset(verifier, VT_THRESHOLD_TIME) && + verifier.Verify(threshold_time()) && + VerifyOffset(verifier, VT_PEAK_TIME) && + verifier.Verify(peak_time()) && + verifier.EndTable(); + } +}; + +struct AdcPulseDebugBuilder { + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_amplitude(flatbuffers::Offset<flatbuffers::Vector<uint32_t>> amplitude) { + fbb_.AddOffset(AdcPulseDebug::VT_AMPLITUDE, amplitude); + } + void add_peak_area(flatbuffers::Offset<flatbuffers::Vector<uint32_t>> peak_area) { + fbb_.AddOffset(AdcPulseDebug::VT_PEAK_AREA, peak_area); + } + void add_background(flatbuffers::Offset<flatbuffers::Vector<uint32_t>> background) { + fbb_.AddOffset(AdcPulseDebug::VT_BACKGROUND, background); + } + void add_threshold_time(flatbuffers::Offset<flatbuffers::Vector<uint64_t>> threshold_time) { + fbb_.AddOffset(AdcPulseDebug::VT_THRESHOLD_TIME, threshold_time); + } + void add_peak_time(flatbuffers::Offset<flatbuffers::Vector<uint64_t>> peak_time) { + fbb_.AddOffset(AdcPulseDebug::VT_PEAK_TIME, peak_time); + } + explicit AdcPulseDebugBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + AdcPulseDebugBuilder &operator=(const AdcPulseDebugBuilder &); + flatbuffers::Offset<AdcPulseDebug> Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset<AdcPulseDebug>(end); + return o; + } +}; + +inline flatbuffers::Offset<AdcPulseDebug> CreateAdcPulseDebug( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset<flatbuffers::Vector<uint32_t>> amplitude = 0, + flatbuffers::Offset<flatbuffers::Vector<uint32_t>> peak_area = 0, + flatbuffers::Offset<flatbuffers::Vector<uint32_t>> background = 0, + flatbuffers::Offset<flatbuffers::Vector<uint64_t>> threshold_time = 0, + flatbuffers::Offset<flatbuffers::Vector<uint64_t>> peak_time = 0) { + AdcPulseDebugBuilder builder_(_fbb); + builder_.add_peak_time(peak_time); + builder_.add_threshold_time(threshold_time); + builder_.add_background(background); + builder_.add_peak_area(peak_area); + builder_.add_amplitude(amplitude); + return builder_.Finish(); +} + +inline flatbuffers::Offset<AdcPulseDebug> CreateAdcPulseDebugDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector<uint32_t> *amplitude = nullptr, + const std::vector<uint32_t> *peak_area = nullptr, + const std::vector<uint32_t> *background = nullptr, + const std::vector<uint64_t> *threshold_time = nullptr, + const std::vector<uint64_t> *peak_time = nullptr) { + return CreateAdcPulseDebug( + _fbb, + amplitude ? _fbb.CreateVector<uint32_t>(*amplitude) : 0, + peak_area ? _fbb.CreateVector<uint32_t>(*peak_area) : 0, + background ? _fbb.CreateVector<uint32_t>(*background) : 0, + threshold_time ? _fbb.CreateVector<uint64_t>(*threshold_time) : 0, + peak_time ? _fbb.CreateVector<uint64_t>(*peak_time) : 0); +} + +#endif // FLATBUFFERS_GENERATED_DTDBADCPULSEDEBUG_H_ diff --git a/Framework/LiveData/src/Kafka/private/Schema/ev42_events_generated.h b/Framework/LiveData/src/Kafka/private/Schema/ev42_events_generated.h index 09d38e051526fe8a76d5a1e1fcd96b3fe9416ccd..09a5a6188794cd65bcf3f8c3d3675058af64ebbb 100644 --- a/Framework/LiveData/src/Kafka/private/Schema/ev42_events_generated.h +++ b/Framework/LiveData/src/Kafka/private/Schema/ev42_events_generated.h @@ -1,41 +1,67 @@ -// Mantid Repository : https://github.com/mantidproject/mantid -// -// Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI, -// NScD Oak Ridge National Laboratory, European Spallation Source -// & Institut Laue - Langevin -// SPDX - License - Identifier: GPL - 3.0 + -// clang-format off // automatically generated by the FlatBuffers compiler, do not modify + #ifndef FLATBUFFERS_GENERATED_EV42EVENTS_H_ #define FLATBUFFERS_GENERATED_EV42EVENTS_H_ #include "flatbuffers/flatbuffers.h" +#include "dtdb_adc_pulse_debug_generated.h" #include "is84_isis_events_generated.h" -struct ISISData; - - struct EventMessage; -enum FacilityData { - FacilityData_NONE = 0, - FacilityData_ISISData = 1, - FacilityData_MIN = FacilityData_NONE, - FacilityData_MAX = FacilityData_ISISData +enum class FacilityData : uint8_t { + NONE = 0, + ISISData = 1, + AdcPulseDebug = 2, + MIN = NONE, + MAX = AdcPulseDebug }; -inline const char **EnumNamesFacilityData() { - static const char *names[] = { "NONE", "ISISData", nullptr }; +inline const FacilityData (&EnumValuesFacilityData())[3] { + static const FacilityData values[] = { + FacilityData::NONE, + FacilityData::ISISData, + FacilityData::AdcPulseDebug + }; + return values; +} + +inline const char * const *EnumNamesFacilityData() { + static const char * const names[] = { + "NONE", + "ISISData", + "AdcPulseDebug", + nullptr + }; return names; } -inline const char *EnumNameFacilityData(FacilityData e) { return EnumNamesFacilityData()[static_cast<int>(e)]; } +inline const char *EnumNameFacilityData(FacilityData e) { + const size_t index = static_cast<int>(e); + return EnumNamesFacilityData()[index]; +} -inline bool VerifyFacilityData(flatbuffers::Verifier &verifier, const void *union_obj, FacilityData type); +template<typename T> struct FacilityDataTraits { + static const FacilityData enum_value = FacilityData::NONE; +}; + +template<> struct FacilityDataTraits<ISISData> { + static const FacilityData enum_value = FacilityData::ISISData; +}; + +template<> struct FacilityDataTraits<AdcPulseDebug> { + static const FacilityData enum_value = FacilityData::AdcPulseDebug; +}; + +bool VerifyFacilityData(flatbuffers::Verifier &verifier, const void *obj, FacilityData type); +bool VerifyFacilityDataVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector<flatbuffers::Offset<void>> *values, const flatbuffers::Vector<uint8_t> *types); struct EventMessage FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + static FLATBUFFERS_CONSTEXPR const char *GetFullyQualifiedName() { + return "EventMessage"; + } enum { VT_SOURCE_NAME = 4, VT_MESSAGE_ID = 6, @@ -45,56 +71,125 @@ struct EventMessage FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VT_FACILITY_SPECIFIC_DATA_TYPE = 14, VT_FACILITY_SPECIFIC_DATA = 16 }; - const flatbuffers::String *source_name() const { return GetPointer<const flatbuffers::String *>(VT_SOURCE_NAME); } - uint64_t message_id() const { return GetField<uint64_t>(VT_MESSAGE_ID, 0); } - uint64_t pulse_time() const { return GetField<uint64_t>(VT_PULSE_TIME, 0); } - const flatbuffers::Vector<uint32_t> *time_of_flight() const { return GetPointer<const flatbuffers::Vector<uint32_t> *>(VT_TIME_OF_FLIGHT); } - const flatbuffers::Vector<uint32_t> *detector_id() const { return GetPointer<const flatbuffers::Vector<uint32_t> *>(VT_DETECTOR_ID); } - FacilityData facility_specific_data_type() const { return static_cast<FacilityData>(GetField<uint8_t>(VT_FACILITY_SPECIFIC_DATA_TYPE, 0)); } - const void *facility_specific_data() const { return GetPointer<const void *>(VT_FACILITY_SPECIFIC_DATA); } + const flatbuffers::String *source_name() const { + return GetPointer<const flatbuffers::String *>(VT_SOURCE_NAME); + } + flatbuffers::String *mutable_source_name() { + return GetPointer<flatbuffers::String *>(VT_SOURCE_NAME); + } + uint64_t message_id() const { + return GetField<uint64_t>(VT_MESSAGE_ID, 0); + } + bool mutate_message_id(uint64_t _message_id) { + return SetField<uint64_t>(VT_MESSAGE_ID, _message_id, 0); + } + uint64_t pulse_time() const { + return GetField<uint64_t>(VT_PULSE_TIME, 0); + } + bool mutate_pulse_time(uint64_t _pulse_time) { + return SetField<uint64_t>(VT_PULSE_TIME, _pulse_time, 0); + } + const flatbuffers::Vector<uint32_t> *time_of_flight() const { + return GetPointer<const flatbuffers::Vector<uint32_t> *>(VT_TIME_OF_FLIGHT); + } + flatbuffers::Vector<uint32_t> *mutable_time_of_flight() { + return GetPointer<flatbuffers::Vector<uint32_t> *>(VT_TIME_OF_FLIGHT); + } + const flatbuffers::Vector<uint32_t> *detector_id() const { + return GetPointer<const flatbuffers::Vector<uint32_t> *>(VT_DETECTOR_ID); + } + flatbuffers::Vector<uint32_t> *mutable_detector_id() { + return GetPointer<flatbuffers::Vector<uint32_t> *>(VT_DETECTOR_ID); + } + FacilityData facility_specific_data_type() const { + return static_cast<FacilityData>(GetField<uint8_t>(VT_FACILITY_SPECIFIC_DATA_TYPE, 0)); + } + bool mutate_facility_specific_data_type(FacilityData _facility_specific_data_type) { + return SetField<uint8_t>(VT_FACILITY_SPECIFIC_DATA_TYPE, static_cast<uint8_t>(_facility_specific_data_type), 0); + } + const void *facility_specific_data() const { + return GetPointer<const void *>(VT_FACILITY_SPECIFIC_DATA); + } + template<typename T> const T *facility_specific_data_as() const; + const ISISData *facility_specific_data_as_ISISData() const { + return facility_specific_data_type() == FacilityData::ISISData ? static_cast<const ISISData *>(facility_specific_data()) : nullptr; + } + const AdcPulseDebug *facility_specific_data_as_AdcPulseDebug() const { + return facility_specific_data_type() == FacilityData::AdcPulseDebug ? static_cast<const AdcPulseDebug *>(facility_specific_data()) : nullptr; + } + void *mutable_facility_specific_data() { + return GetPointer<void *>(VT_FACILITY_SPECIFIC_DATA); + } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && - VerifyField<flatbuffers::uoffset_t>(verifier, VT_SOURCE_NAME) && + VerifyOffset(verifier, VT_SOURCE_NAME) && verifier.Verify(source_name()) && VerifyField<uint64_t>(verifier, VT_MESSAGE_ID) && VerifyField<uint64_t>(verifier, VT_PULSE_TIME) && - VerifyField<flatbuffers::uoffset_t>(verifier, VT_TIME_OF_FLIGHT) && + VerifyOffset(verifier, VT_TIME_OF_FLIGHT) && verifier.Verify(time_of_flight()) && - VerifyField<flatbuffers::uoffset_t>(verifier, VT_DETECTOR_ID) && + VerifyOffset(verifier, VT_DETECTOR_ID) && verifier.Verify(detector_id()) && VerifyField<uint8_t>(verifier, VT_FACILITY_SPECIFIC_DATA_TYPE) && - VerifyField<flatbuffers::uoffset_t>(verifier, VT_FACILITY_SPECIFIC_DATA) && + VerifyOffset(verifier, VT_FACILITY_SPECIFIC_DATA) && VerifyFacilityData(verifier, facility_specific_data(), facility_specific_data_type()) && verifier.EndTable(); } }; +template<> inline const ISISData *EventMessage::facility_specific_data_as<ISISData>() const { + return facility_specific_data_as_ISISData(); +} + +template<> inline const AdcPulseDebug *EventMessage::facility_specific_data_as<AdcPulseDebug>() const { + return facility_specific_data_as_AdcPulseDebug(); +} + struct EventMessageBuilder { flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; - void add_source_name(flatbuffers::Offset<flatbuffers::String> source_name) { fbb_.AddOffset(EventMessage::VT_SOURCE_NAME, source_name); } - void add_message_id(uint64_t message_id) { fbb_.AddElement<uint64_t>(EventMessage::VT_MESSAGE_ID, message_id, 0); } - void add_pulse_time(uint64_t pulse_time) { fbb_.AddElement<uint64_t>(EventMessage::VT_PULSE_TIME, pulse_time, 0); } - void add_time_of_flight(flatbuffers::Offset<flatbuffers::Vector<uint32_t>> time_of_flight) { fbb_.AddOffset(EventMessage::VT_TIME_OF_FLIGHT, time_of_flight); } - void add_detector_id(flatbuffers::Offset<flatbuffers::Vector<uint32_t>> detector_id) { fbb_.AddOffset(EventMessage::VT_DETECTOR_ID, detector_id); } - void add_facility_specific_data_type(FacilityData facility_specific_data_type) { fbb_.AddElement<uint8_t>(EventMessage::VT_FACILITY_SPECIFIC_DATA_TYPE, static_cast<uint8_t>(facility_specific_data_type), 0); } - void add_facility_specific_data(flatbuffers::Offset<void> facility_specific_data) { fbb_.AddOffset(EventMessage::VT_FACILITY_SPECIFIC_DATA, facility_specific_data); } - EventMessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); } + void add_source_name(flatbuffers::Offset<flatbuffers::String> source_name) { + fbb_.AddOffset(EventMessage::VT_SOURCE_NAME, source_name); + } + void add_message_id(uint64_t message_id) { + fbb_.AddElement<uint64_t>(EventMessage::VT_MESSAGE_ID, message_id, 0); + } + void add_pulse_time(uint64_t pulse_time) { + fbb_.AddElement<uint64_t>(EventMessage::VT_PULSE_TIME, pulse_time, 0); + } + void add_time_of_flight(flatbuffers::Offset<flatbuffers::Vector<uint32_t>> time_of_flight) { + fbb_.AddOffset(EventMessage::VT_TIME_OF_FLIGHT, time_of_flight); + } + void add_detector_id(flatbuffers::Offset<flatbuffers::Vector<uint32_t>> detector_id) { + fbb_.AddOffset(EventMessage::VT_DETECTOR_ID, detector_id); + } + void add_facility_specific_data_type(FacilityData facility_specific_data_type) { + fbb_.AddElement<uint8_t>(EventMessage::VT_FACILITY_SPECIFIC_DATA_TYPE, static_cast<uint8_t>(facility_specific_data_type), 0); + } + void add_facility_specific_data(flatbuffers::Offset<void> facility_specific_data) { + fbb_.AddOffset(EventMessage::VT_FACILITY_SPECIFIC_DATA, facility_specific_data); + } + explicit EventMessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } EventMessageBuilder &operator=(const EventMessageBuilder &); flatbuffers::Offset<EventMessage> Finish() { - auto o = flatbuffers::Offset<EventMessage>(fbb_.EndTable(start_, 7)); + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset<EventMessage>(end); return o; } }; -inline flatbuffers::Offset<EventMessage> CreateEventMessage(flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset<flatbuffers::String> source_name = 0, - uint64_t message_id = 0, - uint64_t pulse_time = 0, - flatbuffers::Offset<flatbuffers::Vector<uint32_t>> time_of_flight = 0, - flatbuffers::Offset<flatbuffers::Vector<uint32_t>> detector_id = 0, - FacilityData facility_specific_data_type = FacilityData_NONE, - flatbuffers::Offset<void> facility_specific_data = 0) { +inline flatbuffers::Offset<EventMessage> CreateEventMessage( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset<flatbuffers::String> source_name = 0, + uint64_t message_id = 0, + uint64_t pulse_time = 0, + flatbuffers::Offset<flatbuffers::Vector<uint32_t>> time_of_flight = 0, + flatbuffers::Offset<flatbuffers::Vector<uint32_t>> detector_id = 0, + FacilityData facility_specific_data_type = FacilityData::NONE, + flatbuffers::Offset<void> facility_specific_data = 0) { EventMessageBuilder builder_(_fbb); builder_.add_pulse_time(pulse_time); builder_.add_message_id(message_id); @@ -106,24 +201,96 @@ inline flatbuffers::Offset<EventMessage> CreateEventMessage(flatbuffers::FlatBuf return builder_.Finish(); } -inline bool VerifyFacilityData(flatbuffers::Verifier &verifier, const void *union_obj, FacilityData type) { +inline flatbuffers::Offset<EventMessage> CreateEventMessageDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const char *source_name = nullptr, + uint64_t message_id = 0, + uint64_t pulse_time = 0, + const std::vector<uint32_t> *time_of_flight = nullptr, + const std::vector<uint32_t> *detector_id = nullptr, + FacilityData facility_specific_data_type = FacilityData::NONE, + flatbuffers::Offset<void> facility_specific_data = 0) { + return CreateEventMessage( + _fbb, + source_name ? _fbb.CreateString(source_name) : 0, + message_id, + pulse_time, + time_of_flight ? _fbb.CreateVector<uint32_t>(*time_of_flight) : 0, + detector_id ? _fbb.CreateVector<uint32_t>(*detector_id) : 0, + facility_specific_data_type, + facility_specific_data); +} + +inline bool VerifyFacilityData(flatbuffers::Verifier &verifier, const void *obj, FacilityData type) { switch (type) { - case FacilityData_NONE: return true; - case FacilityData_ISISData: return verifier.VerifyTable(reinterpret_cast<const ISISData *>(union_obj)); + case FacilityData::NONE: { + return true; + } + case FacilityData::ISISData: { + auto ptr = reinterpret_cast<const ISISData *>(obj); + return verifier.VerifyTable(ptr); + } + case FacilityData::AdcPulseDebug: { + auto ptr = reinterpret_cast<const AdcPulseDebug *>(obj); + return verifier.VerifyTable(ptr); + } default: return false; } } -inline const EventMessage *GetEventMessage(const void *buf) { return flatbuffers::GetRoot<EventMessage>(buf); } +inline bool VerifyFacilityDataVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector<flatbuffers::Offset<void>> *values, const flatbuffers::Vector<uint8_t> *types) { + if (!values || !types) return !values && !types; + if (values->size() != types->size()) return false; + for (flatbuffers::uoffset_t i = 0; i < values->size(); ++i) { + if (!VerifyFacilityData( + verifier, values->Get(i), types->GetEnum<FacilityData>(i))) { + return false; + } + } + return true; +} + +inline const EventMessage *GetEventMessage(const void *buf) { + return flatbuffers::GetRoot<EventMessage>(buf); +} -inline bool VerifyEventMessageBuffer(flatbuffers::Verifier &verifier) { return verifier.VerifyBuffer<EventMessage>(); } +inline const EventMessage *GetSizePrefixedEventMessage(const void *buf) { + return flatbuffers::GetSizePrefixedRoot<EventMessage>(buf); +} -inline const char *EventMessageIdentifier() { return "ev42"; } +inline EventMessage *GetMutableEventMessage(void *buf) { + return flatbuffers::GetMutableRoot<EventMessage>(buf); +} -inline bool EventMessageBufferHasIdentifier(const void *buf) { return flatbuffers::BufferHasIdentifier(buf, EventMessageIdentifier()); } +inline const char *EventMessageIdentifier() { + return "ev42"; +} -inline void FinishEventMessageBuffer(flatbuffers::FlatBufferBuilder &fbb, flatbuffers::Offset<EventMessage> root) { fbb.Finish(root, EventMessageIdentifier()); } +inline bool EventMessageBufferHasIdentifier(const void *buf) { + return flatbuffers::BufferHasIdentifier( + buf, EventMessageIdentifier()); +} +inline bool VerifyEventMessageBuffer( + flatbuffers::Verifier &verifier) { + return verifier.VerifyBuffer<EventMessage>(EventMessageIdentifier()); +} + +inline bool VerifySizePrefixedEventMessageBuffer( + flatbuffers::Verifier &verifier) { + return verifier.VerifySizePrefixedBuffer<EventMessage>(EventMessageIdentifier()); +} + +inline void FinishEventMessageBuffer( + flatbuffers::FlatBufferBuilder &fbb, + flatbuffers::Offset<EventMessage> root) { + fbb.Finish(root, EventMessageIdentifier()); +} + +inline void FinishSizePrefixedEventMessageBuffer( + flatbuffers::FlatBufferBuilder &fbb, + flatbuffers::Offset<EventMessage> root) { + fbb.FinishSizePrefixed(root, EventMessageIdentifier()); +} #endif // FLATBUFFERS_GENERATED_EV42EVENTS_H_ -// clang-format on diff --git a/buildconfig/CMake/Bootstrap.cmake b/buildconfig/CMake/Bootstrap.cmake index b8f0b387d619880bce3f8f3799ca15c89bb3fc0c..d5dc7276e6e21f57610a31456f5f1624d4c0b4d4 100644 --- a/buildconfig/CMake/Bootstrap.cmake +++ b/buildconfig/CMake/Bootstrap.cmake @@ -10,7 +10,7 @@ if( MSVC ) include ( ExternalProject ) set( EXTERNAL_ROOT ${PROJECT_SOURCE_DIR}/external CACHE PATH "Location to clone third party dependencies to" ) set( THIRD_PARTY_GIT_URL "https://github.com/mantidproject/thirdparty-msvc2015.git" ) - set ( THIRD_PARTY_GIT_SHA1 c6e26fc65fad0dea30777ed29f343c382b5e2b29 ) + set ( THIRD_PARTY_GIT_SHA1 c0ab451678efa06bea22748c1ee811eccb70d62b ) set ( THIRD_PARTY_DIR ${EXTERNAL_ROOT}/src/ThirdParty ) # Generates a script to do the clone/update in tmp set ( _project_name ThirdParty ) diff --git a/instrument/Facilities.xml b/instrument/Facilities.xml index 11a540caa38f1f3a3cfc94b030db402ff0117c95..199be25e5199736d304990e5df80fa8593dcf7b9 100644 --- a/instrument/Facilities.xml +++ b/instrument/Facilities.xml @@ -882,6 +882,14 @@ <!-- HZB --> <facility name="HZB" FileExtensions=".nxs"> <timezone>Europe/Berlin</timezone> + + <instrument name="TEST" shortname="TEST"> + <zeropadding size="8" /> + <technique>ESS Test Beamline</technique> + <livedata default="event"> + <connection name="event" address="192.168.1.80:9092" listener="KafkaEventListener" /> + </livedata> + </instrument> <instrument name="EXED"> <technique>TOF Direct Geometry Spectroscopy</technique>