diff --git a/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.h b/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.h index af83e9bb7ba8d0c9bd350653441f7e26f4c256f9..b865a7ffb75e0f5c023398d0beac35ad69b2342a 100644 --- a/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.h +++ b/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.h @@ -125,7 +125,8 @@ protected: /// Populate cache workspaces with data from messages virtual void sampleDataFromMessage(const std::string &buffer) = 0; - void writeChopperInfoToWorkspaceLogs(Mantid::API::Run &mutableRun); + template <typename T = API::MatrixWorkspace> + void writeChopperTimestampsToWorkspaceLogs(std::vector<T> workspaces); /// For LoadLiveData to extract the cached data virtual API::Workspace_sptr extractDataImpl() = 0; diff --git a/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.tcc b/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.tcc index c250b2ad1919314f56235f689a1a9b3620cb158e..bb9449b0512b3b928dac10a5c3027182cfc0cb60 100644 --- a/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.tcc +++ b/Framework/LiveData/inc/MantidLiveData/Kafka/IKafkaStreamDecoder.tcc @@ -117,5 +117,50 @@ void IKafkaStreamDecoder::loadInstrument(const std::string &name, "instrument geometry. \n"; } } + +/** + * Add chopper timestamps to the mutable run info of all workspaces used to + * buffer data from the kafka stream. + * @param qorkspaces buffer workspaces storing kafka data. + */ +template <typename T> +void IKafkaStreamDecoder::writeChopperTimestampsToWorkspaceLogs( + std::vector<T> workspaces) { + if (!m_chopperStream) + return; + + std::string buffer; + int64_t offset; + int32_t partition; + std::string topicName; + m_chopperStream->consumeMessage(&buffer, offset, partition, topicName); + + if (buffer.empty()) + return; + + if (flatbuffers::BufferHasIdentifier( + reinterpret_cast<const uint8_t *>(buffer.c_str()), + CHOPPER_MESSAGE_ID.c_str())) { + auto chopperMsg = + Gettimestamp(reinterpret_cast<const uint8_t *>(buffer.c_str())); + + const auto *timestamps = chopperMsg->timestamps(); + std::vector<uint64_t> mantidTimestamps; + std::copy(timestamps->begin(), timestamps->end(), mantidTimestamps.begin()); + auto name = chopperMsg->name()->str(); + + for (auto workspace : workspaces) { + auto mutableRunInfo = workspace->mutableRun(); + Kernel::ArrayProperty<uint64_t> *property; + if (mutableRunInfo.hasProperty(name)) { + property = dynamic_cast<Kernel::ArrayProperty<uint64_t> *>( + mutableRunInfo.getProperty(name)); + } else { + property = new Mantid::Kernel::ArrayProperty<uint64_t>(name); + } + *property = mantidTimestamps; + } + } +} } // namespace LiveData } // namespace Mantid diff --git a/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp b/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp index e44bbf4c2a6e00aac2732a4400e25ff03f82be24..5cca34e00a4cdabff35f584f2583b26f12e1e272 100644 --- a/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp +++ b/Framework/LiveData/src/Kafka/IKafkaStreamDecoder.cpp @@ -6,19 +6,18 @@ // SPDX - License - Identifier: GPL - 3.0 + #include "MantidLiveData/Kafka/IKafkaStreamDecoder.tcc" #include "MantidAPI/Run.h" +#include "MantidKernel/ArrayProperty.h" #include "MantidKernel/Logger.h" #include "MantidKernel/WarningSuppressions.h" #include "MantidLiveData/Exception.h" #include "MantidLiveData/Kafka/KafkaTopicSubscriber.h" #include "MantidNexusGeometry/JSONGeometryParser.h" -#include "MantidTypes/Core/DateAndTime.h" -#include "MantidKernel/ArrayProperty.h" GNU_DIAG_OFF("conversion") #include "private/Schema/df12_det_spec_map_generated.h" #include "private/Schema/f142_logdata_generated.h" -#include "private/Schema/y2gw_run_info_generated.h" #include "private/Schema/tdct_timestamps_generated.h" +#include "private/Schema/y2gw_run_info_generated.h" GNU_DIAG_ON("conversion") #include <json/json.h> @@ -451,45 +450,6 @@ int64_t IKafkaStreamDecoder::getRunInfoMessage(std::string &rawMsgBuffer) { return offset; } -void IKafkaStreamDecoder::writeChopperInfoToWorkspaceLogs( - API::Run &mutableRunInfo) { - std::string buffer; - int64_t offset; - int32_t partition; - std::string topicName; - m_chopperStream->consumeMessage(&buffer, offset, partition, topicName); - - if (buffer.empty()) - return; - - if (flatbuffers::BufferHasIdentifier( - reinterpret_cast<const uint8_t *>(buffer.c_str()), - CHOPPER_MESSAGE_ID.c_str())) { - auto chopperMsg = - Gettimestamp(reinterpret_cast<const uint8_t *>(buffer.c_str())); - const auto *timestamps = chopperMsg->timestamps(); - std::vector<Types::Core::DateAndTime> mantidTimestamps; - mantidTimestamps.reserve(timestamps->size()); - const int64_t nanoseconds1970To1990 = 631152000000000000L; - std::transform(timestamps->begin(), timestamps->end(), - std::back_inserter(mantidTimestamps), - [&nanoseconds1970To1990](const int64_t ×tamp) { - return Types::Core::DateAndTime( - static_cast<int64_t>(timestamp) - - nanoseconds1970To1990); - }); - auto name = chopperMsg->name()->str(); - Kernel::ArrayProperty<Core::DateAndTime> *property; - if (mutableRunInfo.hasProperty(name)) { - property = dynamic_cast<Kernel::ArrayProperty<Core::DateAndTime> *>( - mutableRunInfo.getProperty(name)); - } else { - property = new Mantid::Kernel::ArrayProperty<Core::DateAndTime>(name); - } - *property = mantidTimestamps; - } -} - std::map<int32_t, std::set<int32_t>> IKafkaStreamDecoder::buildSpectrumToDetectorMap(const int32_t *spec, const int32_t *udet, @@ -510,58 +470,57 @@ IKafkaStreamDecoder::buildSpectrumToDetectorMap(const int32_t *spec, return spdetMap; } - void IKafkaStreamDecoder::checkRunMessage( - const std::string &buffer, bool &checkOffsets, - std::unordered_map<std::string, std::vector<int64_t>> &stopOffsets, - std::unordered_map<std::string, std::vector<bool>> &reachedEnd) { - if (flatbuffers::BufferHasIdentifier( - reinterpret_cast<const uint8_t *>(buffer.c_str()), - RUN_MESSAGE_ID.c_str())) { - auto runMsg = - GetRunInfo(reinterpret_cast<const uint8_t *>(buffer.c_str())); - if (!checkOffsets && runMsg->info_type_type() == InfoTypes::RunStop) { - auto runStopMsg = static_cast<const RunStop *>(runMsg->info_type()); - auto stopTime = runStopMsg->stop_time(); - g_log.debug() << "Received an end-of-run message with stop time = " - << stopTime << std::endl; - stopOffsets = getStopOffsets(stopOffsets, reachedEnd, stopTime); - checkOffsets = true; - checkIfAllStopOffsetsReached(reachedEnd, checkOffsets); - } +void IKafkaStreamDecoder::checkRunMessage( + const std::string &buffer, bool &checkOffsets, + std::unordered_map<std::string, std::vector<int64_t>> &stopOffsets, + std::unordered_map<std::string, std::vector<bool>> &reachedEnd) { + if (flatbuffers::BufferHasIdentifier( + reinterpret_cast<const uint8_t *>(buffer.c_str()), + RUN_MESSAGE_ID.c_str())) { + auto runMsg = GetRunInfo(reinterpret_cast<const uint8_t *>(buffer.c_str())); + if (!checkOffsets && runMsg->info_type_type() == InfoTypes::RunStop) { + auto runStopMsg = static_cast<const RunStop *>(runMsg->info_type()); + auto stopTime = runStopMsg->stop_time(); + g_log.debug() << "Received an end-of-run message with stop time = " + << stopTime << std::endl; + stopOffsets = getStopOffsets(stopOffsets, reachedEnd, stopTime); + checkOffsets = true; + checkIfAllStopOffsetsReached(reachedEnd, checkOffsets); } } +} - void IKafkaStreamDecoder::checkRunEnd( - const std::string &topicName, bool &checkOffsets, const int64_t offset, - const int32_t partition, - std::unordered_map<std::string, std::vector<int64_t>> &stopOffsets, - std::unordered_map<std::string, std::vector<bool>> &reachedEnd) { - if (reachedEnd.count(topicName) && - offset >= stopOffsets[topicName][static_cast<size_t>(partition)]) { - - reachedEnd[topicName][static_cast<size_t>(partition)] = true; - - if (offset == stopOffsets[topicName][static_cast<size_t>(partition)]) { - g_log.debug() << "Reached end-of-run in " << topicName << " topic." - << std::endl; - g_log.debug() << "topic: " << topicName << " offset: " << offset - << " stopOffset: " - << stopOffsets[topicName][static_cast<size_t>(partition)] - << std::endl; - } - checkIfAllStopOffsetsReached(reachedEnd, checkOffsets); +void IKafkaStreamDecoder::checkRunEnd( + const std::string &topicName, bool &checkOffsets, const int64_t offset, + const int32_t partition, + std::unordered_map<std::string, std::vector<int64_t>> &stopOffsets, + std::unordered_map<std::string, std::vector<bool>> &reachedEnd) { + if (reachedEnd.count(topicName) && + offset >= stopOffsets[topicName][static_cast<size_t>(partition)]) { + + reachedEnd[topicName][static_cast<size_t>(partition)] = true; + + if (offset == stopOffsets[topicName][static_cast<size_t>(partition)]) { + g_log.debug() << "Reached end-of-run in " << topicName << " topic." + << std::endl; + g_log.debug() << "topic: " << topicName << " offset: " << offset + << " stopOffset: " + << stopOffsets[topicName][static_cast<size_t>(partition)] + << std::endl; } + checkIfAllStopOffsetsReached(reachedEnd, checkOffsets); } +} - int IKafkaStreamDecoder::runNumber() const noexcept { - if (m_runId.empty() || - (std::find_if_not(m_runId.cbegin(), m_runId.cend(), [](const char c) { - return std::isdigit(c); - }) == m_runId.end())) - return -1; +int IKafkaStreamDecoder::runNumber() const noexcept { + if (m_runId.empty() || + (std::find_if_not(m_runId.cbegin(), m_runId.cend(), [](const char c) { + return std::isdigit(c); + }) == m_runId.end())) + return -1; - return std::atoi(m_runId.c_str()); - } + return std::atoi(m_runId.c_str()); +} } // namespace LiveData } // namespace Mantid diff --git a/Framework/LiveData/src/Kafka/KafkaEventStreamDecoder.cpp b/Framework/LiveData/src/Kafka/KafkaEventStreamDecoder.cpp index f68487780da3f9f7619d0c4a9c63804f9bddb6bf..74a90853af3ec8623ef7da064e1629b075e19f3e 100644 --- a/Framework/LiveData/src/Kafka/KafkaEventStreamDecoder.cpp +++ b/Framework/LiveData/src/Kafka/KafkaEventStreamDecoder.cpp @@ -258,6 +258,8 @@ void KafkaEventStreamDecoder::captureImplExcept() { continue; } + writeChopperTimestampsToWorkspaceLogs(m_localEvents); + const auto end = std::chrono::system_clock::now(); const std::chrono::duration<double> dur = end - start; if (dur.count() >= 60) { diff --git a/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp b/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp index 1ac66d7a1ace3455ba53fbf7e76fcf27a27d8ff5..db2b2a21067c13a48bf84d46056b55f7903a8e2b 100644 --- a/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp +++ b/Framework/LiveData/src/Kafka/KafkaHistoStreamDecoder.cpp @@ -60,7 +60,7 @@ KafkaHistoStreamDecoder::KafkaHistoStreamDecoder( const std::string &runInfoTopic, const std::string &spDetTopic, const std::string &sampleEnvTopic) : IKafkaStreamDecoder(broker, histoTopic, runInfoTopic, spDetTopic, - sampleEnvTopic), + sampleEnvTopic, ""), m_workspace() {} /**