diff --git a/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaHistoListener.h b/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaHistoListener.h index 0b54d8dbed9e76ad8b4667ac49beef3ed4e18e5c..796962d211f3748935c1a44f680583494ede51f7 100644 --- a/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaHistoListener.h +++ b/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaHistoListener.h @@ -20,9 +20,9 @@ namespace LiveData { class KafkaHistoStreamDecoder; /** - Implementation of a live listener to consume messages from the Kafka system - at ISIS. It currently parses the events directly using flatbuffers so will - need updating if the schema changes. + Implementation of a live listener to consume messages which are in a histogram + format from the Kafka system at ISIS. It currently parses the histogram data + directly using flatbuffers so will need updating if the schema changes. */ class DLLExport KafkaHistoListener : public API::LiveListener { public: diff --git a/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaHistoStreamDecoder.h b/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaHistoStreamDecoder.h index 3251e7eb6d241130c97c94c43feac248b2904aa7..c40b313764e3d40c7473916398b394acbf780fd6 100644 --- a/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaHistoStreamDecoder.h +++ b/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaHistoStreamDecoder.h @@ -8,8 +8,8 @@ #define MANTID_LIVEDATA_ISISKAFKAHISTOSTREAMDECODER_H_ #include "MantidDataObjects/Workspace2D.h" -#include "MantidLiveData/Kafka/IKafkaBroker.h" #include "MantidLiveData/Kafka/IKafkaStreamSubscriber.h" +#include "MantidLiveData/Kafka/KafkaBroker.h" #include <atomic> #include <mutex> @@ -26,7 +26,7 @@ namespace LiveData { */ class DLLExport KafkaHistoStreamDecoder { public: - KafkaHistoStreamDecoder(std::shared_ptr<IKafkaBroker> broker, + KafkaHistoStreamDecoder(const std::string &brokerAddress, const std::string &histoTopic, const std::string &instrumentName); ~KafkaHistoStreamDecoder(); @@ -37,7 +37,7 @@ public: ///@name Start/stop ///@{ void startCapture(bool startNow = true); - void stopCapture(); + void stopCapture() noexcept; ///@} ///@name Querying @@ -61,7 +61,7 @@ private: DataObjects::Workspace2D_sptr createBufferWorkspace(); /// Broker to use to subscribe to topics - std::shared_ptr<IKafkaBroker> m_broker; + KafkaBroker m_broker; /// Topic name const std::string m_histoTopic; /// Instrument name @@ -82,7 +82,7 @@ private: /// Flag indicating that the decoder is capturing std::atomic<bool> m_capturing; /// Exception object indicating there was an error - boost::shared_ptr<std::runtime_error> m_exception; + std::unique_ptr<std::runtime_error> m_exception; }; } // namespace LiveData diff --git a/Framework/LiveData/src/Kafka/KafkaHistoListener.cpp b/Framework/LiveData/src/Kafka/KafkaHistoListener.cpp index 81b2841f5fbae47ec9cfcdbcd2f764d27419446d..b7f3d5ed22f52c7f8eaa0be77aece79cf1d4b0ba 100644 --- a/Framework/LiveData/src/Kafka/KafkaHistoListener.cpp +++ b/Framework/LiveData/src/Kafka/KafkaHistoListener.cpp @@ -27,13 +27,12 @@ KafkaHistoListener::KafkaHistoListener() { /// @copydoc ILiveListener::connect bool KafkaHistoListener::connect(const Poco::Net::SocketAddress &address) { - auto broker = std::make_shared<KafkaBroker>(address.toString()); try { std::string instrumentName = getProperty("InstrumentName"); const std::string histoTopic(instrumentName + KafkaTopicSubscriber::HISTO_TOPIC_SUFFIX); - m_decoder = Kernel::make_unique<KafkaHistoStreamDecoder>(broker, histoTopic, - instrumentName); + m_decoder = Kernel::make_unique<KafkaHistoStreamDecoder>( + address.toString(), histoTopic, instrumentName); } catch (std::exception &exc) { g_log.error() << "KafkaHistoListener::connect - Connection Error: " << exc.what() << "\n"; @@ -75,8 +74,8 @@ bool KafkaHistoListener::isConnected() { /// @copydoc ILiveListener::runStatus API::ILiveListener::RunStatus KafkaHistoListener::runStatus() { if (!m_decoder) { - g_log.error("KafkaHistoListener::runStatus(): Kafka is not connected"); - throw Kernel::Exception::InternetError("Kafka is not connected"); + g_log.warning("KafkaHistoListener::runStatus(): Kafka is not connected"); + return NoRun; } return m_decoder->hasReachedEndOfRun() ? EndRun : Running; diff --git a/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp b/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp index 5b1f91369433fdc105d1a7b8573317a92d08e136..5e9b11deb6ca4c230ec34e7c05b62159eb4322ab 100644 --- a/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp +++ b/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp @@ -35,18 +35,26 @@ namespace LiveData { /** * Constructor - * @param broker A reference to a Broker object for creating topic streams + * @param brokerAddress The physical ipAddress of the broker * @param histoTopic The name of the topic streaming the histo data * @param spDetTopic The name of the topic streaming the spectrum-detector * run mapping */ KafkaHistoStreamDecoder::KafkaHistoStreamDecoder( - std::shared_ptr<IKafkaBroker> broker, const std::string &histoTopic, + const std::string &brokerAddress, const std::string &histoTopic, const std::string &instrumentName) - : m_broker(broker), m_histoTopic(histoTopic), + : m_broker(brokerAddress), m_histoTopic(histoTopic), m_instrumentName(instrumentName), m_histoStream(), m_workspace(), m_buffer(), m_thread(), m_interrupt(false), m_capturing(false), - m_exception() { + m_exception(nullptr) { + if (histoTopic.empty()) + throw std::invalid_argument( + "KafkaHistoStreamDecoder::KafkaHistoStreamDecorder " + ": histogramTopic cannot be an empty string."); + if (instrumentName.empty()) + throw std::invalid_argument( + "KafkaHistoStreamDecoder::KafkaHistoStreamDecorder " + ": instrumentName cannot be an empty string."); // Initialize buffer workspace m_workspace = createBufferWorkspace(); } @@ -63,8 +71,7 @@ KafkaHistoStreamDecoder::~KafkaHistoStreamDecoder() { stopCapture(); } */ void KafkaHistoStreamDecoder::startCapture(bool) { g_log.debug() << "Starting capture on topic: " << m_histoTopic << "\n"; - m_histoStream = - m_broker->subscribe({m_histoTopic}, SubscribeAtOption::LATEST); + m_histoStream = m_broker.subscribe({m_histoTopic}, SubscribeAtOption::LATEST); m_thread = std::thread([this]() { this->captureImpl(); }); m_thread.detach(); @@ -74,7 +81,7 @@ void KafkaHistoStreamDecoder::startCapture(bool) { * Stop capturing from the stream. This is a blocking call until the capturing * function has completed */ -void KafkaHistoStreamDecoder::stopCapture() { +void KafkaHistoStreamDecoder::stopCapture() noexcept { g_log.debug() << "Stopping capture\n"; // This will interrupt the "event" loop @@ -126,9 +133,10 @@ API::Workspace_sptr KafkaHistoStreamDecoder::extractDataImpl() { throw Exception::NotYet("No message to process yet."); } - auto histoMsg = GetEventHistogram(m_buffer.c_str()); + // Retrieve flatbuffer struct describing histogram + const auto *histoMsg = GetEventHistogram(m_buffer.c_str()); - auto shape = histoMsg->current_shape(); + const auto *shape = histoMsg->current_shape(); auto nbins = shape->Get(0) - 1; auto nspectra = static_cast<size_t>(shape->Get(1)); @@ -171,10 +179,10 @@ void KafkaHistoStreamDecoder::captureImpl() { try { captureImplExcept(); } catch (std::exception &exc) { - m_exception = boost::make_shared<std::runtime_error>(exc.what()); + m_exception.reset(new std::runtime_error(exc.what())); } catch (...) { - m_exception = boost::make_shared<std::runtime_error>( - "KafkaEventStreamDecoder: Unknown exception type caught."); + m_exception.reset(new std::runtime_error( + "KafkaEventStreamDecoder: Unknown exception type caught.")); } m_capturing = false; } @@ -222,6 +230,7 @@ DataObjects::Workspace2D_sptr KafkaHistoStreamDecoder::createBufferWorkspace() { alg->setChild(true); alg->initialize(); alg->setPropertyValue("InstrumentName", m_instrumentName); + // Dummy workspace value "ws" as not placed in ADS alg->setPropertyValue("OutputWorkspace", "ws"); alg->execute(); workspace = alg->getProperty("OutputWorkspace"); @@ -231,6 +240,7 @@ DataObjects::Workspace2D_sptr KafkaHistoStreamDecoder::createBufferWorkspace() { throw; } + // OutputWorkspace type is Worspace2D in algorithm see LoadEmptyInstrument.cpp return boost::dynamic_pointer_cast<DataObjects::Workspace2D>(workspace); } diff --git a/Framework/LiveData/src/Kafka/private/Schema/ai33_det_count_imgs_generated.h b/Framework/LiveData/src/Kafka/private/Schema/ai33_det_count_imgs_generated.h deleted file mode 100644 index ca7d077acc4ab21eeacce1e921ef4b3a288a83c3..0000000000000000000000000000000000000000 --- a/Framework/LiveData/src/Kafka/private/Schema/ai33_det_count_imgs_generated.h +++ /dev/null @@ -1,111 +0,0 @@ -// automatically generated by the FlatBuffers compiler, do not modify - -#ifndef FLATBUFFERS_GENERATED_AI33DETCOUNTIMGS_H_ -#define FLATBUFFERS_GENERATED_AI33DETCOUNTIMGS_H_ - -#include "flatbuffers/flatbuffers.h" - -struct AccumulatedImage; - -struct AccumulatedImage FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { - enum { - VT_FIRST_PULSE_TIME = 4, - VT_PULSE_TIME = 6, - VT_DETECTOR_ID = 8, - VT_DETECTION_COUNT = 10 - }; - uint64_t first_pulse_time() const { - return GetField<uint64_t>(VT_FIRST_PULSE_TIME, 0); - } - uint64_t pulse_time() const { return GetField<uint64_t>(VT_PULSE_TIME, 0); } - const flatbuffers::Vector<uint32_t> *detector_id() const { - return GetPointer<const flatbuffers::Vector<uint32_t> *>(VT_DETECTOR_ID); - } - const flatbuffers::Vector<uint32_t> *detection_count() const { - return GetPointer<const flatbuffers::Vector<uint32_t> *>( - VT_DETECTION_COUNT); - } - bool Verify(flatbuffers::Verifier &verifier) const { - return VerifyTableStart(verifier) && - VerifyField<uint64_t>(verifier, VT_FIRST_PULSE_TIME) && - VerifyField<uint64_t>(verifier, VT_PULSE_TIME) && - VerifyField<flatbuffers::uoffset_t>(verifier, VT_DETECTOR_ID) && - verifier.Verify(detector_id()) && - VerifyField<flatbuffers::uoffset_t>(verifier, VT_DETECTION_COUNT) && - verifier.Verify(detection_count()) && verifier.EndTable(); - } -}; - -struct AccumulatedImageBuilder { - flatbuffers::FlatBufferBuilder &fbb_; - flatbuffers::uoffset_t start_; - void add_first_pulse_time(uint64_t first_pulse_time) { - fbb_.AddElement<uint64_t>(AccumulatedImage::VT_FIRST_PULSE_TIME, - first_pulse_time, 0); - } - void add_pulse_time(uint64_t pulse_time) { - fbb_.AddElement<uint64_t>(AccumulatedImage::VT_PULSE_TIME, pulse_time, 0); - } - void add_detector_id( - flatbuffers::Offset<flatbuffers::Vector<uint32_t>> detector_id) { - fbb_.AddOffset(AccumulatedImage::VT_DETECTOR_ID, detector_id); - } - void add_detection_count( - flatbuffers::Offset<flatbuffers::Vector<uint32_t>> detection_count) { - fbb_.AddOffset(AccumulatedImage::VT_DETECTION_COUNT, detection_count); - } - AccumulatedImageBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { - start_ = fbb_.StartTable(); - } - AccumulatedImageBuilder &operator=(const AccumulatedImageBuilder &); - flatbuffers::Offset<AccumulatedImage> Finish() { - const auto end = fbb_.EndTable(start_, 4); - auto o = flatbuffers::Offset<AccumulatedImage>(end); - return o; - } -}; - -inline flatbuffers::Offset<AccumulatedImage> CreateAccumulatedImage( - flatbuffers::FlatBufferBuilder &_fbb, uint64_t first_pulse_time = 0, - uint64_t pulse_time = 0, - flatbuffers::Offset<flatbuffers::Vector<uint32_t>> detector_id = 0, - flatbuffers::Offset<flatbuffers::Vector<uint32_t>> detection_count = 0) { - AccumulatedImageBuilder builder_(_fbb); - builder_.add_pulse_time(pulse_time); - builder_.add_first_pulse_time(first_pulse_time); - builder_.add_detection_count(detection_count); - builder_.add_detector_id(detector_id); - return builder_.Finish(); -} - -inline flatbuffers::Offset<AccumulatedImage> CreateAccumulatedImageDirect( - flatbuffers::FlatBufferBuilder &_fbb, uint64_t first_pulse_time = 0, - uint64_t pulse_time = 0, const std::vector<uint32_t> *detector_id = nullptr, - const std::vector<uint32_t> *detection_count = nullptr) { - return CreateAccumulatedImage( - _fbb, first_pulse_time, pulse_time, - detector_id ? _fbb.CreateVector<uint32_t>(*detector_id) : 0, - detection_count ? _fbb.CreateVector<uint32_t>(*detection_count) : 0); -} - -inline const AccumulatedImage *GetAccumulatedImage(const void *buf) { - return flatbuffers::GetRoot<AccumulatedImage>(buf); -} - -inline const char *AccumulatedImageIdentifier() { return "ai33"; } - -inline bool AccumulatedImageBufferHasIdentifier(const void *buf) { - return flatbuffers::BufferHasIdentifier(buf, AccumulatedImageIdentifier()); -} - -inline bool VerifyAccumulatedImageBuffer(flatbuffers::Verifier &verifier) { - return verifier.VerifyBuffer<AccumulatedImage>(AccumulatedImageIdentifier()); -} - -inline void -FinishAccumulatedImageBuffer(flatbuffers::FlatBufferBuilder &fbb, - flatbuffers::Offset<AccumulatedImage> root) { - fbb.Finish(root, AccumulatedImageIdentifier()); -} - -#endif // FLATBUFFERS_GENERATED_AI33DETCOUNTIMGS_H_ diff --git a/Framework/LiveData/src/Kafka/private/Schema/hs00_event_histogram_generated.h b/Framework/LiveData/src/Kafka/private/Schema/hs00_event_histogram_generated.h index 9c1f682d213972bc2921c6b421fad6491d961873..3a55e3dcd539ab1f2620e0f5f11c6703b923b92a 100644 --- a/Framework/LiveData/src/Kafka/private/Schema/hs00_event_histogram_generated.h +++ b/Framework/LiveData/src/Kafka/private/Schema/hs00_event_histogram_generated.h @@ -1,6 +1,7 @@ // clang-format off // automatically generated by the FlatBuffers compiler, do not modify - +// Original schema can be found at: +// https://github.com/ess-dmsc/streaming-data-types/tree/master/schemas/hs00_event_histogram.fbs #ifndef FLATBUFFERS_GENERATED_HS00EVENTHISTOGRAM_H_ #define FLATBUFFERS_GENERATED_HS00EVENTHISTOGRAM_H_