diff --git a/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaHistoStreamDecoder.h b/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaHistoStreamDecoder.h index 25e5a6133fc451b24150cf6bfa76730abcde24eb..2608a036f6e0c6815976cc73ffd0214b4809bb12 100644 --- a/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaHistoStreamDecoder.h +++ b/Framework/LiveData/inc/MantidLiveData/Kafka/KafkaHistoStreamDecoder.h @@ -27,7 +27,8 @@ public: const std::string &histoTopic, const std::string &runInfoTopic, const std::string &spDetTopic, - const std::string &sampleEnvTopic); + const std::string &sampleEnvTopic, + const std::string &chopperTopic); ~KafkaHistoStreamDecoder(); KafkaHistoStreamDecoder(const KafkaHistoStreamDecoder &) = delete; KafkaHistoStreamDecoder &operator=(const KafkaHistoStreamDecoder &) = delete; diff --git a/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp b/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp index f7e6acd3e2426eddbfa060ecdf657d6b11947b4d..1c3ef2ef0b4b65f66dcb5377ab1e47e8cfddd49b 100644 --- a/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp +++ b/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp @@ -87,9 +87,16 @@ void IKafkaStreamDecoder::startCapture(bool startNow) { SubscribeAtOption::LATEST); } - if (!m_chopperTopic.empty()) - m_chopperStream = - m_broker->subscribe({m_chopperTopic}, SubscribeAtOption::LATEST); + try { + if (!m_chopperTopic.empty()) + m_chopperStream = + m_broker->subscribe({m_chopperTopic}, SubscribeAtOption::LATEST); + } catch (std::exception &) { + g_log.notice() << "Could not subscribe to topic " + m_chopperTopic + + ". This topic does not exist. No chopper information " + "will be written to the logs." + << std::endl; + } // Get last two messages in run topic to ensure we get a runStart message m_runStream = diff --git a/Framework/LiveData/src/Kafka/KafkaHistoListener.cpp b/Framework/LiveData/src/Kafka/KafkaHistoListener.cpp index 18ca57301b7bb580ec00faf8d1d4c05d94b6a358..0268feb36e404ee437ba12d321e3d216dad770d8 100644 --- a/Framework/LiveData/src/Kafka/KafkaHistoListener.cpp +++ b/Framework/LiveData/src/Kafka/KafkaHistoListener.cpp @@ -51,11 +51,13 @@ bool KafkaHistoListener::connect(const Poco::Net::SocketAddress &address) { spDetInfoTopic(m_instrumentName + KafkaTopicSubscriber::DET_SPEC_TOPIC_SUFFIX), sampleEnvTopic(m_instrumentName + - KafkaTopicSubscriber::SAMPLE_ENV_TOPIC_SUFFIX); + KafkaTopicSubscriber::SAMPLE_ENV_TOPIC_SUFFIX), + chopperTimestampTopic(m_instrumentName + + KafkaTopicSubscriber::CHOPPER_TOPIC_SUFFIX); m_decoder = std::make_unique<KafkaHistoStreamDecoder>( std::make_shared<KafkaBroker>(address.toString()), histoTopic, - runInfoTopic, spDetInfoTopic, sampleEnvTopic); + runInfoTopic, spDetInfoTopic, sampleEnvTopic, chopperTimestampTopic); } catch (std::exception &exc) { g_log.error() << "KafkaHistoListener::connect - Connection Error: " << exc.what() << "\n"; diff --git a/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp b/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp index db2b2a21067c13a48bf84d46056b55f7903a8e2b..eeab29a0453988202ed007b490fbf3746059075a 100644 --- a/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp +++ b/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp @@ -58,9 +58,9 @@ namespace LiveData { KafkaHistoStreamDecoder::KafkaHistoStreamDecoder( std::shared_ptr<IKafkaBroker> broker, const std::string &histoTopic, const std::string &runInfoTopic, const std::string &spDetTopic, - const std::string &sampleEnvTopic) + const std::string &sampleEnvTopic, const std::string &chopperTopic) : IKafkaStreamDecoder(broker, histoTopic, runInfoTopic, spDetTopic, - sampleEnvTopic, ""), + sampleEnvTopic, chopperTopic), m_workspace() {} /** diff --git a/Framework/LiveData/test/KafkaHistoStreamDecoderTest.h b/Framework/LiveData/test/KafkaHistoStreamDecoderTest.h index 9dcf5ee29ee6d5fd1a567ac3162177f166df483c..4ebf592ae848061f28958534062218a77d89f413 100644 --- a/Framework/LiveData/test/KafkaHistoStreamDecoderTest.h +++ b/Framework/LiveData/test/KafkaHistoStreamDecoderTest.h @@ -98,7 +98,8 @@ private: std::unique_ptr<Mantid::LiveData::KafkaHistoStreamDecoder> createTestDecoder(std::shared_ptr<Mantid::LiveData::IKafkaBroker> broker) { using namespace Mantid::LiveData; - return std::make_unique<KafkaHistoStreamDecoder>(broker, "", "", "", ""); + return std::make_unique<KafkaHistoStreamDecoder>(broker, "", "", "", "", + ""); } // Start decoding and wait until we have gathered enough data to test