diff --git a/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp b/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp index 1c3ef2ef0b4b65f66dcb5377ab1e47e8cfddd49b..78d7a50fa008e2d9cd8841b1cd7a332ce6161ec2 100644 --- a/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp +++ b/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp @@ -101,8 +101,13 @@ void IKafkaStreamDecoder::startCapture(bool startNow) { // Get last two messages in run topic to ensure we get a runStart message m_runStream = m_broker->subscribe({m_runInfoTopic}, SubscribeAtOption::LASTTWO); - m_spDetStream = - m_broker->subscribe({m_spDetTopic}, SubscribeAtOption::LASTONE); + try { + m_spDetStream = + m_broker->subscribe({m_spDetTopic}, SubscribeAtOption::LASTONE); + } catch (const std::runtime_error &) { + g_log.debug() + << "No detector-spectrum map message found, will assume a 1:1 mapping."; + } m_thread = std::thread([this]() { this->captureImpl(); }); m_thread.detach(); @@ -341,21 +346,25 @@ void IKafkaStreamDecoder::waitForRunEndObservation() { * current run start time * * @param runStartStruct details of the current run - * @return received detector-spectrum map message buffer + * @return received detector-spectrum map message buffer, empty string if a + * mapping was not streamed */ std::string IKafkaStreamDecoder::getDetSpecMapForRun( const IKafkaStreamDecoder::RunStartStruct &runStartStruct) { std::string rawMsgBuffer; - int64_t offset; - int32_t partition; - std::string topicName; - m_spDetStream = m_broker->subscribe( - {m_spDetTopic}, nanosecondsToMilliseconds(runStartStruct.startTime), - SubscribeAtOption::TIME); - m_spDetStream->consumeMessage(&rawMsgBuffer, offset, partition, topicName); + try { + m_spDetStream = m_broker->subscribe( + {m_spDetTopic}, nanosecondsToMilliseconds(runStartStruct.startTime), + SubscribeAtOption::TIME); + int64_t offset; + int32_t partition; + std::string topicName; + m_spDetStream->consumeMessage(&rawMsgBuffer, offset, partition, topicName); + } catch (const std::runtime_error &) { + } if (rawMsgBuffer.empty()) { - std::runtime_error("No detector-spectrum map message found for run " + - runStartStruct.runId); + g_log.debug() << "No detector-spectrum map message found for run " + << runStartStruct.runId << ", will assume a 1:1 mapping."; } return rawMsgBuffer; } diff --git a/Framework/LiveData/src/Kafka/KafkaEventStreamDecoder.cpp b/Framework/LiveData/src/Kafka/KafkaEventStreamDecoder.cpp index 092e2d1aa9f1bd5219027a128742a3a26305c72f..b08b29d35ec7d6964a207cc781f364bacbdfbda4 100644 --- a/Framework/LiveData/src/Kafka/KafkaEventStreamDecoder.cpp +++ b/Framework/LiveData/src/Kafka/KafkaEventStreamDecoder.cpp @@ -206,7 +206,9 @@ void KafkaEventStreamDecoder::captureImplExcept() { int64_t offset; int32_t partition; std::string topicName; - m_spDetStream->consumeMessage(&buffer, offset, partition, topicName); + if (m_spDetStream) { + m_spDetStream->consumeMessage(&buffer, offset, partition, topicName); + } auto runStartStruct = getRunStartMessage(runBuffer); initLocalCaches(buffer, runStartStruct); @@ -530,40 +532,60 @@ void KafkaEventStreamDecoder::sampleDataFromMessage(const std::string &buffer) { */ void KafkaEventStreamDecoder::initLocalCaches( const std::string &rawMsgBuffer, const RunStartStruct &runStartData) { - if (rawMsgBuffer.empty()) { - throw std::runtime_error("KafkaEventStreamDecoder::initLocalCaches() - " - "Empty message received from spectrum-detector " - "topic. Unable to continue"); - } - auto spDetMsg = GetSpectraDetectorMapping( - reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str())); - auto nspec = static_cast<uint32_t>(spDetMsg->n_spectra()); - auto nudet = spDetMsg->detector_id()->size(); - if (nudet != nspec) { - std::ostringstream os; - os << "KafkaEventStreamDecoder::initLocalEventBuffer() - Invalid " - "spectra/detector mapping. Expected matched length arrays but " - "found nspec=" - << nspec << ", ndet=" << nudet; - throw std::runtime_error(os.str()); - } - m_runId = runStartData.runId; - // Create buffer - auto eventBuffer = createBufferWorkspace<DataObjects::EventWorkspace>( - "EventWorkspace", static_cast<size_t>(spDetMsg->n_spectra()), - spDetMsg->spectrum()->data(), spDetMsg->detector_id()->data(), nudet); - - // Load the instrument if possible but continue if we can't - auto jsonGeometry = runStartData.nexusStructure; - auto instName = runStartData.instrumentName; - if (!instName.empty()) - loadInstrument<DataObjects::EventWorkspace>(instName, eventBuffer, - jsonGeometry); - else - g_log.warning( - "Empty instrument name received. Continuing without instrument"); + const auto jsonGeometry = runStartData.nexusStructure; + const auto instName = runStartData.instrumentName; + + DataObjects::EventWorkspace_sptr eventBuffer; + if (rawMsgBuffer.empty()) { + /* Load the instrument to get the number of spectra :c */ + auto ws = + API::WorkspaceFactory::Instance().create("EventWorkspace", 1, 2, 1); + loadInstrument<API::MatrixWorkspace>(instName, ws, jsonGeometry); + const auto nspec = ws->getInstrument()->getNumberDetectors(); + + // Create buffer + eventBuffer = boost::static_pointer_cast<DataObjects::EventWorkspace>( + API::WorkspaceFactory::Instance().create("EventWorkspace", nspec, 2, + 1)); + eventBuffer->setInstrument(ws->getInstrument()); + eventBuffer->rebuildSpectraMapping(); + eventBuffer->getAxis(0)->unit() = + Kernel::UnitFactory::Instance().create("TOF"); + eventBuffer->setYUnit("Counts"); + } else { + /* Parse mapping from stream */ + auto spDetMsg = GetSpectraDetectorMapping( + reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str())); + auto nspec = static_cast<uint32_t>(spDetMsg->n_spectra()); + auto nudet = spDetMsg->detector_id()->size(); + if (nudet != nspec) { + std::ostringstream os; + os << "KafkaEventStreamDecoder::initLocalEventBuffer() - Invalid " + "spectra/detector mapping. Expected matched length arrays but " + "found nspec=" + << nspec << ", ndet=" << nudet; + throw std::runtime_error(os.str()); + } + + // Create buffer + eventBuffer = createBufferWorkspace<DataObjects::EventWorkspace>( + "EventWorkspace", static_cast<size_t>(spDetMsg->n_spectra()), + spDetMsg->spectrum()->data(), spDetMsg->detector_id()->data(), nudet); + + // Load the instrument if possible but continue if we can't + if (!instName.empty()) { + loadInstrument<DataObjects::EventWorkspace>(instName, eventBuffer, + jsonGeometry); + if (rawMsgBuffer.empty()) { + eventBuffer->rebuildSpectraMapping(); + } + } else { + g_log.warning( + "Empty instrument name received. Continuing without instrument"); + } + } auto &mutableRun = eventBuffer->mutableRun(); // Run start. Cache locally for computing frame times diff --git a/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp b/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp index eeab29a0453988202ed007b490fbf3746059075a..0caff6f6973a61c5c87b5928020880884c1129f4 100644 --- a/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp +++ b/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp @@ -206,40 +206,57 @@ void KafkaHistoStreamDecoder::captureImplExcept() { void KafkaHistoStreamDecoder::initLocalCaches( const std::string &rawMsgBuffer, const RunStartStruct &runStartData) { - if (rawMsgBuffer.empty()) { - throw std::runtime_error("KafkaEventStreamDecoder::initLocalCaches() - " - "Empty message received from spectrum-detector " - "topic. Unable to continue"); - } - auto spDetMsg = GetSpectraDetectorMapping( - reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str())); - auto nspec = static_cast<uint32_t>(spDetMsg->n_spectra()); - auto nudet = spDetMsg->detector_id()->size(); - if (nudet != nspec) { - std::ostringstream os; - os << "KafkaEventStreamDecoder::initLocalEventBuffer() - Invalid " - "spectra/detector mapping. Expected matched length arrays but " - "found nspec=" - << nspec << ", ndet=" << nudet; - throw std::runtime_error(os.str()); - } - m_runId = runStartData.runId; - // Create buffer - auto histoBuffer = createBufferWorkspace<DataObjects::Workspace2D>( - "Workspace2D", static_cast<size_t>(spDetMsg->n_spectra()), - spDetMsg->spectrum()->data(), spDetMsg->detector_id()->data(), nudet); - - // Load the instrument if possible but continue if we can't - auto jsonGeometry = runStartData.nexusStructure; - auto instName = runStartData.instrumentName; - if (!instName.empty()) - loadInstrument<DataObjects::Workspace2D>(instName, histoBuffer, - jsonGeometry); - else - g_log.warning( - "Empty instrument name received. Continuing without instrument"); + const auto jsonGeometry = runStartData.nexusStructure; + const auto instName = runStartData.instrumentName; + + DataObjects::Workspace2D_sptr histoBuffer; + if (rawMsgBuffer.empty()) { + /* Load the instrument to get the number of spectra :c */ + auto ws = API::WorkspaceFactory::Instance().create("Workspace2D", 1, 2, 1); + loadInstrument<API::MatrixWorkspace>(instName, ws, jsonGeometry); + const auto nspec = ws->getInstrument()->getNumberDetectors(); + + // Create buffer + histoBuffer = boost::static_pointer_cast<DataObjects::Workspace2D>( + API::WorkspaceFactory::Instance().create("Workspace2D", nspec, 2, 1)); + histoBuffer->setInstrument(ws->getInstrument()); + histoBuffer->rebuildSpectraMapping(); + histoBuffer->getAxis(0)->unit() = + Kernel::UnitFactory::Instance().create("TOF"); + histoBuffer->setYUnit("Counts"); + } else { + auto spDetMsg = GetSpectraDetectorMapping( + reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str())); + auto nspec = static_cast<uint32_t>(spDetMsg->n_spectra()); + auto nudet = spDetMsg->detector_id()->size(); + if (nudet != nspec) { + std::ostringstream os; + os << "KafkaEventStreamDecoder::initLocalEventBuffer() - Invalid " + "spectra/detector mapping. Expected matched length arrays but " + "found nspec=" + << nspec << ", ndet=" << nudet; + throw std::runtime_error(os.str()); + } + + // Create buffer + histoBuffer = createBufferWorkspace<DataObjects::Workspace2D>( + "Workspace2D", static_cast<size_t>(spDetMsg->n_spectra()), + spDetMsg->spectrum()->data(), spDetMsg->detector_id()->data(), nudet); + + // Load the instrument if possible but continue if we can't + if (!instName.empty()) { + loadInstrument<DataObjects::Workspace2D>(instName, histoBuffer, + jsonGeometry); + if (rawMsgBuffer.empty()) { + histoBuffer->rebuildSpectraMapping(); + } + } else { + g_log.warning( + "Empty instrument name received. Continuing without instrument"); + } + } auto &mutableRun = histoBuffer->mutableRun(); // Run start. Cache locally for computing frame times diff --git a/Framework/LiveData/test/KafkaEventStreamDecoderTest.h b/Framework/LiveData/test/KafkaEventStreamDecoderTest.h index 7711ef785e2a006e2af09e60e833810bf7e752f8..8b91f7ccfcb2f766c5e9c00c3fde1580ffb20c50 100644 --- a/Framework/LiveData/test/KafkaEventStreamDecoderTest.h +++ b/Framework/LiveData/test/KafkaEventStreamDecoderTest.h @@ -443,24 +443,6 @@ public: TS_ASSERT(!decoder->isCapturing()); } - void test_Empty_SpDet_Stream_Throws_Error_On_ExtractData() { - using namespace ::testing; - using namespace KafkaTesting; - - auto mockBroker = std::make_shared<MockKafkaBroker>(); - EXPECT_CALL(*mockBroker, subscribe_(_, _)) - .Times(Exactly(3)) - .WillOnce(Return(new FakeISISEventSubscriber(1))) - .WillOnce(Return(new FakeRunInfoStreamSubscriber(1))) - .WillOnce(Return(new FakeEmptyStreamSubscriber)); - auto decoder = createTestDecoder(mockBroker); - startCapturing(*decoder, 1); - - TS_ASSERT_THROWS(decoder->extractData(), const std::runtime_error &); - TS_ASSERT_THROWS_NOTHING(decoder->stopCapture()); - TS_ASSERT(!decoder->isCapturing()); - } - void test_Empty_RunInfo_Stream_Throws_Error_On_ExtractData() { using namespace ::testing; using namespace KafkaTesting;