Skip to content
Snippets Groups Projects
Commit 9fa2b56b authored by Lamar Moore's avatar Lamar Moore
Browse files

IKafkaStreamDecoder and unit tests

parent d75e7fbe
No related branches found
No related tags found
No related merge requests found
Showing
with 2499 additions and 2002 deletions
...@@ -56,6 +56,7 @@ find_package ( LibRDKafka 0.11 ) ...@@ -56,6 +56,7 @@ find_package ( LibRDKafka 0.11 )
if ( LIBRDKAFKA_FOUND ) if ( LIBRDKAFKA_FOUND )
set ( SRC_FILES set ( SRC_FILES
${SRC_FILES} ${SRC_FILES}
src/Kafka/IKafkaStreamDecoder.cpp
src/Kafka/KafkaEventListener.cpp src/Kafka/KafkaEventListener.cpp
src/Kafka/KafkaEventStreamDecoder.cpp src/Kafka/KafkaEventStreamDecoder.cpp
src/Kafka/KafkaHistoListener.cpp src/Kafka/KafkaHistoListener.cpp
...@@ -69,6 +70,7 @@ if ( LIBRDKAFKA_FOUND ) ...@@ -69,6 +70,7 @@ if ( LIBRDKAFKA_FOUND )
inc/MantidLiveData/Kafka/KafkaEventStreamDecoder.h inc/MantidLiveData/Kafka/KafkaEventStreamDecoder.h
inc/MantidLiveData/Kafka/IKafkaStreamSubscriber.h inc/MantidLiveData/Kafka/IKafkaStreamSubscriber.h
inc/MantidLiveData/Kafka/IKafkaBroker.h inc/MantidLiveData/Kafka/IKafkaBroker.h
inc/MantidLiveData/Kafka/IKafkaStreamDecoder.h
inc/MantidLiveData/Kafka/KafkaBroker.h inc/MantidLiveData/Kafka/KafkaBroker.h
inc/MantidLiveData/Kafka/KafkaHistoListener.h inc/MantidLiveData/Kafka/KafkaHistoListener.h
inc/MantidLiveData/Kafka/KafkaHistoStreamDecoder.h inc/MantidLiveData/Kafka/KafkaHistoStreamDecoder.h
...@@ -87,6 +89,7 @@ if ( LIBRDKAFKA_FOUND ) ...@@ -87,6 +89,7 @@ if ( LIBRDKAFKA_FOUND )
set ( TEST_FILES set ( TEST_FILES
${TEST_FILES} ${TEST_FILES}
KafkaEventStreamDecoderTest.h KafkaEventStreamDecoderTest.h
KafkaHistoStreamDecoderTest.h
KafkaTopicSubscriberTest.h KafkaTopicSubscriberTest.h
) )
endif() endif()
......
// Mantid Repository : https://github.com/mantidproject/mantid
//
// Copyright © 2016 ISIS Rutherford Appleton Laboratory UKRI,
// NScD Oak Ridge National Laboratory, European Spallation Source
// & Institut Laue - Langevin
// SPDX - License - Identifier: GPL - 3.0 +
#ifndef MANTID_LIVEDATA_IKAFKASTREAMDECODER_H_
#define MANTID_LIVEDATA_IKAFKASTREAMDECODER_H_
#include "MantidAPI/SpectraDetectorTypes.h"
#include "MantidDataObjects/EventWorkspace.h"
#include "MantidLiveData/Kafka/IKafkaBroker.h"
#include "MantidLiveData/Kafka/IKafkaStreamSubscriber.h"
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <unordered_map>
namespace Mantid {
namespace LiveData {
/**
Kafka stream decoder interface. Handles (implements) all thread synchronization
functionality for accessing the data stream and processing data.

Concrete decoders supply the message handling by overriding the pure virtual
members (hasData, hasReachedEndOfRun, captureImplExcept, initLocalCaches,
sampleDataFromMessage and extractDataImpl); this base class owns the topic
subscriptions, the capture thread and the flags used to hand data over to the
caller of extractData().
*/
class DLLExport IKafkaStreamDecoder {
public:
/// Signature of the callbacks that can be registered on the decoder
using CallbackFn = std::function<void()>;
public:
/// Stores the broker and the topic names; no subscription is made until
/// startCapture() is called
IKafkaStreamDecoder(std::shared_ptr<IKafkaBroker> broker,
const std::string &streamTopic,
const std::string &runInfoTopic,
const std::string &spDetTopic,
const std::string &sampleEnvTopic);
/// Calls stopCapture()
virtual ~IKafkaStreamDecoder();
/// Decoders are neither copy-constructible nor copy-assignable
IKafkaStreamDecoder(const IKafkaStreamDecoder &) = delete;
IKafkaStreamDecoder &operator=(const IKafkaStreamDecoder &) = delete;
public:
///@name Start/stop
///@{
/// Subscribe to the topics and launch the capture thread. With
/// startNow == false the data stream is joined at the start time given by the
/// latest run start message, otherwise at the latest messages.
void startCapture(bool startNow = true);
/// Set the interrupt flag and block until isCapturing() becomes false
void stopCapture() noexcept;
///@}
///@name Querying
///@{
/// True while the capture thread is inside captureImpl()
bool isCapturing() const noexcept { return m_capturing; }
/// Implemented by the concrete decoder
virtual bool hasData() const noexcept = 0;
/// Current value of the run number member
int runNumber() const noexcept { return m_runNumber; }
/// Pure virtual, but a definition is also supplied in the .cpp file
virtual bool hasReachedEndOfRun() noexcept = 0;
/// Return the data-reset flag and clear it
bool dataReset();
///@}
///@name Callbacks
///@{
/// Store the callback kept in m_cbIterationEnd (replaces any previous one)
virtual void registerIterationEndCb(CallbackFn cb) {
m_cbIterationEnd = std::move(cb);
}
/// Store the callback that captureImpl() invokes when it catches an exception
virtual void registerErrorCb(CallbackFn cb) { m_cbError = std::move(cb); }
///@}
///@name Modifying
///@{
/// Rethrow an error stored by the capture thread, otherwise return the result
/// of extractDataImpl()
API::Workspace_sptr extractData();
///@}
protected:
/// Values taken from a run start message
struct RunStartStruct {
std::string instrumentName;
int runNumber;
/// Run start time in nanoseconds (converted with nanosecondsToMilliseconds
/// before subscribing by time)
uint64_t startTime;
size_t nPeriods;
/// Offset of the run start message, used to seek the run info topic
int64_t runStartMsgOffset;
};
/// Main loop of listening for data messages and populating the cache
/// workspaces. Entry point of the capture thread: wraps captureImplExcept()
/// and stores any exception it throws in m_exception.
void captureImpl() noexcept;
/// Decoder-specific capture loop; may throw
virtual void captureImplExcept() = 0;
/// Create the cache workspaces, LoadLiveData extracts data from these
virtual void initLocalCaches(const std::string &rawMsgBuffer,
const RunStartStruct &runStartData) = 0;
/// Get an expected message from the run information topic
int64_t getRunInfoMessage(std::string &rawMsgBuffer);
/// Get an expected RunStart message
RunStartStruct getRunStartMessage(std::string &rawMsgBuffer);
/// Populate cache workspaces with data from messages
virtual void sampleDataFromMessage(const std::string &buffer) = 0;
/// For LoadLiveData to extract the cached data
virtual API::Workspace_sptr extractDataImpl() = 0;
/// Broker to use to subscribe to topics
std::shared_ptr<IKafkaBroker> m_broker;
/// Topic names
const std::string m_streamTopic;
const std::string m_runInfoTopic;
const std::string m_spDetTopic;
const std::string m_sampleEnvTopic;
/// Flag indicating if user interruption has been requested
std::atomic<bool> m_interrupt;
/// Subscriber for the data stream
std::unique_ptr<IKafkaStreamSubscriber> m_dataStream;
/// Mapping of spectrum number to workspace index.
spec2index_map m_specToIdx;
/// Start time of the run
Types::Core::DateAndTime m_runStart;
/// Subscriber for the run info stream
std::unique_ptr<IKafkaStreamSubscriber> m_runStream;
/// Subscriber for the spectrum-detector mapping stream
std::unique_ptr<IKafkaStreamSubscriber> m_spDetStream;
/// Run number
int m_runNumber;
/// Associated thread running the capture process
std::thread m_thread;
/// Mutex protecting event buffers
mutable std::mutex m_mutex;
/// Mutex protecting the wait flag (paired with m_cv)
mutable std::mutex m_waitMutex;
/// Mutex protecting the runStatusSeen flag (paired with m_cvRunStatus)
mutable std::mutex m_runStatusMutex;
/// Flag indicating that the decoder is capturing
std::atomic<bool> m_capturing;
/// Exception object indicating there was an error; set by captureImpl() and
/// rethrown by extractData()
boost::shared_ptr<std::runtime_error> m_exception;
/// For notifying other threads of changes to conditions (the following bools)
std::condition_variable m_cv;
std::condition_variable m_cvRunStatus;
/// Indicate that decoder has reached the last message in a run
std::atomic<bool> m_endRun;
/// Indicate that LoadLiveData is waiting for access to the buffer workspace
std::atomic<bool> m_extractWaiting;
/// Indicate that MonitorLiveData has seen the runStatus since it was set to
/// EndRun. Plain bool: read and written while holding m_runStatusMutex.
bool m_runStatusSeen;
/// Cleared when the end of a run is detected and set again by
/// waitForRunEndObservation(); hasReachedEndOfRun() returns false while it is
/// cleared
std::atomic<bool> m_extractedEndRunData;
/// Indicate if the next data to be extracted should replace LoadLiveData's
/// output workspace
std::atomic<bool> m_dataReset;
/// Block until m_extractWaiting is false
void waitForDataExtraction();
/// Wait for the end-of-run data to be extracted and for the end-of-run status
/// to be observed through hasReachedEndOfRun()
void waitForRunEndObservation();
/// Build the spectrum number -> detector IDs table consumed by
/// createBufferWorkspace()
std::map<int32_t, std::set<int32_t>>
buildSpectrumToDetectorMap(const size_t nspectra, const int32_t *spec,
const int32_t *udet, uint32_t length);
/// Create a buffer workspace from a spectrum/detector table (defined below)
template <typename T>
boost::shared_ptr<T>
createBufferWorkspace(const std::string &workspaceClassName, size_t nspectra,
const int32_t *spec, const int32_t *udet,
uint32_t length);
/// Create a buffer workspace initialised from an existing one (defined below)
template <typename T>
boost::shared_ptr<T>
createBufferWorkspace(const std::string &workspaceClassName,
const boost::shared_ptr<T> &parent);
/// Run LoadInstrument on the given workspace (defined below)
template <typename T>
void loadInstrument(const std::string &name, boost::shared_ptr<T> workspace);
/// Run-message handling helper; its definition is not part of this header
void checkRunMessage(
const std::string &buffer, bool &checkOffsets,
std::unordered_map<std::string, std::vector<int64_t>> &stopOffsets,
std::unordered_map<std::string, std::vector<bool>> &reachedEnd);
/// End-of-run bookkeeping helper; its definition is not part of this header
void checkRunEnd(
const std::string &topicName, bool &checkOffsets, const int64_t offset,
const int32_t partition,
std::unordered_map<std::string, std::vector<int64_t>> &stopOffsets,
std::unordered_map<std::string, std::vector<bool>> &reachedEnd);
/// Methods for checking if the end of a run was reached
std::unordered_map<std::string, std::vector<int64_t>> getStopOffsets(
std::unordered_map<std::string, std::vector<int64_t>> &stopOffsets,
std::unordered_map<std::string, std::vector<bool>> &reachedEnd,
uint64_t stopTime) const;
void checkIfAllStopOffsetsReached(
const std::unordered_map<std::string, std::vector<bool>> &reachedEnd,
bool &checkOffsets);
/// Callbacks for unit tests
CallbackFn m_cbIterationEnd;
CallbackFn m_cbError;
/// Waits until a run start message with higher run number is received
bool waitForNewRunStartMessage(RunStartStruct &runStartStructOutput);
/// Subscribe to data stream at the time specified in a run start message
void joinStreamAtTime(const RunStartStruct &runStartData);
/// Convert a duration in nanoseconds to milliseconds
int64_t nanosecondsToMilliseconds(uint64_t timeNanoseconds) const;
/// Get a det-spec map message using the time specified in a run start message
std::string getDetSpecMapForRun(const RunStartStruct &runStartStruct);
};
/**
 * Build an empty buffer workspace whose spectra carry the spectrum numbers and
 * detector IDs described by the spec/udet arrays.
 * @param workspaceClassName Name of the workspace class to instantiate, e.g.
 * Workspace2D or EventWorkspace
 * @param nspectra The number of unique spectrum numbers
 * @param spec Array of length ndet holding the spectrum number of each
 * detector
 * @param udet Array of length ndet holding the detector ID of each detector
 * @param length The length of the spec/udet arrays
 * @return The newly created workspace, cast to T
 */
template <typename T>
boost::shared_ptr<T> IKafkaStreamDecoder::createBufferWorkspace(
    const std::string &workspaceClassName, size_t nspectra, const int32_t *spec,
    const int32_t *udet, uint32_t length) {
  // Spectrum number -> detector IDs, built before the workspace is created
  const auto detectorsBySpectrum =
      buildSpectrumToDetectorMap(nspectra, spec, udet, length);

  auto &factory = API::WorkspaceFactory::Instance();
  auto workspace = boost::static_pointer_cast<T>(
      factory.create(workspaceClassName, nspectra, 2, 1));

  // X axis in time-of-flight, Y in counts
  workspace->getAxis(0)->unit() = Kernel::UnitFactory::Instance().create("TOF");
  workspace->setYUnit("Counts");

  // One workspace index per map entry, in the map's iteration order
  size_t index = 0;
  for (auto entry = detectorsBySpectrum.cbegin();
       entry != detectorsBySpectrum.cend(); ++entry, ++index) {
    auto &target = workspace->getSpectrum(index);
    target.setSpectrumNo(entry->first);
    target.addDetectorIDs(entry->second);
  }
  return workspace;
}
/**
 * Build a fresh buffer workspace modelled on an existing one.
 * @param workspaceClassName Name of the workspace class to instantiate, e.g.
 * Workspace2D or EventWorkspace
 * @param parent The existing workspace that supplies the histogram count and
 * the meta data
 * @return The newly created workspace, cast to T
 */
template <typename T>
boost::shared_ptr<T> IKafkaStreamDecoder::createBufferWorkspace(
    const std::string &workspaceClassName, const boost::shared_ptr<T> &parent) {
  auto &factory = API::WorkspaceFactory::Instance();
  const auto histogramCount = parent->getNumberHistograms();
  auto workspace = boost::static_pointer_cast<T>(
      factory.create(workspaceClassName, histogramCount, 2, 1));

  // Carry the parent's meta data across to the new workspace
  factory.initializeFromParent(*parent, *workspace, false);

  // Keep only the most recent entry of each time series log
  workspace->mutableRun().clearOutdatedTimeSeriesLogValues();
  return workspace;
}
/**
 * Run the LoadInstrument algorithm on a workspace. An empty name or any
 * exception raised by the algorithm is logged as a warning and otherwise
 * ignored.
 * NOTE(review): uses a logger called g_log that is not declared in the visible
 * part of this header - confirm it is in scope wherever this is instantiated.
 * @param name Name of the instrument to load
 * @param workspace The workspace that receives the instrument
 */
template <typename T>
void IKafkaStreamDecoder::loadInstrument(const std::string &name,
                                         boost::shared_ptr<T> workspace) {
  if (name.empty()) {
    g_log.warning("Empty instrument name found");
    return;
  }

  try {
    auto loader =
        API::AlgorithmManager::Instance().createUnmanaged("LoadInstrument");
    // Run as a child algorithm so the workspace is not put in the ADS
    loader->setChild(true);
    loader->initialize();
    loader->setPropertyValue("InstrumentName", name);
    loader->setProperty("Workspace", workspace);
    loader->setProperty("RewriteSpectraMap", Kernel::OptionalBool(false));
    loader->execute();
  } catch (const std::exception &error) {
    g_log.warning() << "Error loading instrument '" << name
                    << "': " << error.what() << "\n";
  }
}
} // namespace LiveData
} // namespace Mantid
#endif // MANTID_LIVEDATA_IKAFKASTREAMDECODER_H_
...@@ -10,13 +10,9 @@ ...@@ -10,13 +10,9 @@
#include "MantidAPI/SpectraDetectorTypes.h" #include "MantidAPI/SpectraDetectorTypes.h"
#include "MantidDataObjects/EventWorkspace.h" #include "MantidDataObjects/EventWorkspace.h"
#include "MantidLiveData/Kafka/IKafkaBroker.h" #include "MantidLiveData/Kafka/IKafkaBroker.h"
#include "MantidLiveData/Kafka/IKafkaStreamDecoder.h"
#include "MantidLiveData/Kafka/IKafkaStreamSubscriber.h" #include "MantidLiveData/Kafka/IKafkaStreamSubscriber.h"
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <thread>
namespace Mantid { namespace Mantid {
namespace LiveData { namespace LiveData {
...@@ -27,10 +23,7 @@ namespace LiveData { ...@@ -27,10 +23,7 @@ namespace LiveData {
A call to capture() starts the process of capturing the stream on a separate A call to capture() starts the process of capturing the stream on a separate
thread. thread.
*/ */
class DLLExport KafkaEventStreamDecoder { class DLLExport KafkaEventStreamDecoder : public IKafkaStreamDecoder {
public:
using CallbackFn = std::function<void()>;
public: public:
KafkaEventStreamDecoder(std::shared_ptr<IKafkaBroker> broker, KafkaEventStreamDecoder(std::shared_ptr<IKafkaBroker> broker,
const std::string &eventTopic, const std::string &eventTopic,
...@@ -42,150 +35,29 @@ public: ...@@ -42,150 +35,29 @@ public:
KafkaEventStreamDecoder &operator=(const KafkaEventStreamDecoder &) = delete; KafkaEventStreamDecoder &operator=(const KafkaEventStreamDecoder &) = delete;
public: public:
///@name Start/stop
///@{
void startCapture(bool startNow = true);
void stopCapture() noexcept;
///@}
///@name Querying ///@name Querying
///@{ ///@{
bool isCapturing() const noexcept { return m_capturing; } bool hasData() const noexcept override;
bool hasData() const noexcept; bool hasReachedEndOfRun() noexcept override;
int runNumber() const noexcept { return m_runNumber; }
bool hasReachedEndOfRun() noexcept;
bool dataReset();
///@}
///@name Callbacks
///@{
void registerIterationEndCb(CallbackFn cb) {
m_cbIterationEnd = std::move(cb);
}
void registerErrorCb(CallbackFn cb) { m_cbError = std::move(cb); }
///@}
///@name Modifying
///@{
API::Workspace_sptr extractData();
///@} ///@}
private: private:
struct RunStartStruct { void captureImplExcept() override;
std::string instrumentName;
int runNumber;
uint64_t startTime;
size_t nPeriods;
int64_t runStartMsgOffset;
};
/// Main loop of listening for data messages and populating the cache
/// workspaces
void captureImpl() noexcept;
void captureImplExcept();
/// Create the cache workspaces, LoadLiveData extracts data from these /// Create the cache workspaces, LoadLiveData extracts data from these
void initLocalCaches(const std::string &rawMsgBuffer, void initLocalCaches(const std::string &rawMsgBuffer,
const RunStartStruct &runStartData); const RunStartStruct &runStartData) override;
DataObjects::EventWorkspace_sptr createBufferWorkspace(size_t nspectra,
const int32_t *spec,
const int32_t *udet,
uint32_t length);
DataObjects::EventWorkspace_sptr
createBufferWorkspace(const DataObjects::EventWorkspace_sptr &parent);
/// Load a named instrument into a workspace
void loadInstrument(const std::string &name,
DataObjects::EventWorkspace_sptr workspace);
/// Get an expected message from the run information topic
int64_t getRunInfoMessage(std::string &rawMsgBuffer);
/// Get an expected RunStart message
RunStartStruct getRunStartMessage(std::string &rawMsgBuffer);
/// Populate cache workspaces with data from messages /// Populate cache workspaces with data from messages
void eventDataFromMessage(const std::string &buffer); void eventDataFromMessage(const std::string &buffer);
void sampleDataFromMessage(const std::string &buffer);
void sampleDataFromMessage(const std::string &buffer) override;
/// For LoadLiveData to extract the cached data /// For LoadLiveData to extract the cached data
API::Workspace_sptr extractDataImpl(); API::Workspace_sptr extractDataImpl() override;
/// Broker to use to subscribe to topics
std::shared_ptr<IKafkaBroker> m_broker;
/// Topic names
const std::string m_eventTopic;
const std::string m_runInfoTopic;
const std::string m_spDetTopic;
const std::string m_sampleEnvTopic;
/// Flag indicating if user interruption has been requested
std::atomic<bool> m_interrupt;
/// Subscriber for the event stream
std::unique_ptr<IKafkaStreamSubscriber> m_eventStream;
/// Local event workspace buffers /// Local event workspace buffers
std::vector<DataObjects::EventWorkspace_sptr> m_localEvents; std::vector<DataObjects::EventWorkspace_sptr> m_localEvents;
/// Mapping of spectrum number to workspace index.
spec2index_map m_specToIdx;
/// Start time of the run
Types::Core::DateAndTime m_runStart;
/// Subscriber for the run info stream
std::unique_ptr<IKafkaStreamSubscriber> m_runStream;
/// Subscriber for the run info stream
std::unique_ptr<IKafkaStreamSubscriber> m_spDetStream;
/// Run number
int m_runNumber;
/// Associated thread running the capture process
std::thread m_thread;
/// Mutex protecting event buffers
mutable std::mutex m_mutex;
/// Mutex protecting the wait flag
mutable std::mutex m_waitMutex;
/// Mutex protecting the runStatusSeen flag
mutable std::mutex m_runStatusMutex;
/// Flag indicating that the decoder is capturing
std::atomic<bool> m_capturing;
/// Exception object indicating there was an error
boost::shared_ptr<std::runtime_error> m_exception;
/// For notifying other threads of changes to conditions (the following bools)
std::condition_variable m_cv;
std::condition_variable m_cvRunStatus;
/// Indicate that decoder has reached the last message in a run
std::atomic<bool> m_endRun;
/// Indicate that LoadLiveData is waiting for access to the buffer workspace
std::atomic<bool> m_extractWaiting;
/// Indicate that MonitorLiveData has seen the runStatus since it was set to
/// EndRun
bool m_runStatusSeen;
std::atomic<bool> m_extractedEndRunData;
/// Indicate if the next data to be extracted should replace LoadLiveData's
/// output workspace
std::atomic<bool> m_dataReset;
void waitForDataExtraction();
void waitForRunEndObservation();
/// Methods for checking if the end of a run was reached
std::unordered_map<std::string, std::vector<int64_t>> getStopOffsets(
std::unordered_map<std::string, std::vector<int64_t>> &stopOffsets,
std::unordered_map<std::string, std::vector<bool>> &reachedEnd,
uint64_t stopTime) const;
void checkIfAllStopOffsetsReached(
const std::unordered_map<std::string, std::vector<bool>> &reachedEnd,
bool &checkOffsets);
/// Callbacks for unit tests
CallbackFn m_cbIterationEnd;
CallbackFn m_cbError;
/// Waits until a run start message with higher run number is received
bool waitForNewRunStartMessage(RunStartStruct &runStartStructOutput);
/// Subscribe to event stream at the time specified in a run start message
void joinEventStreamAtTime(const RunStartStruct &runStartData);
/// Convert a duration in nanoseconds to milliseconds
int64_t nanosecondsToMilliseconds(uint64_t timeNanoseconds) const;
/// Get a det-spec map message using the time specified in a run start message
std::string getDetSpecMapForRun(const RunStartStruct &runStartStruct);
}; };
} // namespace LiveData } // namespace LiveData
......
...@@ -39,6 +39,8 @@ public: ...@@ -39,6 +39,8 @@ public:
/// Does this listener buffer events (true) or histogram data (false) /// Does this listener buffer events (true) or histogram data (false)
bool buffersEvents() const override { return false; } bool buffersEvents() const override { return false; }
void setAlgorithm(const Mantid::API::IAlgorithm &callingAlgorithm) override;
//---------------------------------------------------------------------- //----------------------------------------------------------------------
// Actions // Actions
//---------------------------------------------------------------------- //----------------------------------------------------------------------
...@@ -56,6 +58,7 @@ public: ...@@ -56,6 +58,7 @@ public:
private: private:
std::unique_ptr<KafkaHistoStreamDecoder> m_decoder = nullptr; std::unique_ptr<KafkaHistoStreamDecoder> m_decoder = nullptr;
std::string m_instrumentName;
}; };
} // namespace LiveData } // namespace LiveData
......
...@@ -8,12 +8,9 @@ ...@@ -8,12 +8,9 @@
#define MANTID_LIVEDATA_ISISKAFKAHISTOSTREAMDECODER_H_ #define MANTID_LIVEDATA_ISISKAFKAHISTOSTREAMDECODER_H_
#include "MantidDataObjects/Workspace2D.h" #include "MantidDataObjects/Workspace2D.h"
#include "MantidLiveData/Kafka/IKafkaBroker.h"
#include "MantidLiveData/Kafka/IKafkaStreamDecoder.h"
#include "MantidLiveData/Kafka/IKafkaStreamSubscriber.h" #include "MantidLiveData/Kafka/IKafkaStreamSubscriber.h"
#include "MantidLiveData/Kafka/KafkaBroker.h"
#include <atomic>
#include <mutex>
#include <thread>
namespace Mantid { namespace Mantid {
namespace LiveData { namespace LiveData {
...@@ -24,65 +21,34 @@ namespace LiveData { ...@@ -24,65 +21,34 @@ namespace LiveData {
A call to startCapture() starts the process of capturing the stream on a A call to startCapture() starts the process of capturing the stream on a
separate thread. separate thread.
*/ */
class DLLExport KafkaHistoStreamDecoder { class DLLExport KafkaHistoStreamDecoder : public IKafkaStreamDecoder {
public: public:
KafkaHistoStreamDecoder(const std::string &brokerAddress, KafkaHistoStreamDecoder(std::shared_ptr<IKafkaBroker> broker,
const std::string &histoTopic, const std::string &histoTopic,
const std::string &instrumentName); const std::string &runInfoTopic,
const std::string &spDetTopic,
const std::string &sampleEnvTopic);
~KafkaHistoStreamDecoder(); ~KafkaHistoStreamDecoder();
KafkaHistoStreamDecoder(const KafkaHistoStreamDecoder &) = delete; KafkaHistoStreamDecoder(const KafkaHistoStreamDecoder &) = delete;
KafkaHistoStreamDecoder &operator=(const KafkaHistoStreamDecoder &) = delete; KafkaHistoStreamDecoder &operator=(const KafkaHistoStreamDecoder &) = delete;
public: bool hasData() const noexcept override;
///@name Start/stop bool hasReachedEndOfRun() noexcept override { return !m_capturing; }
///@{
void startCapture(bool startNow = true);
void stopCapture() noexcept;
///@}
///@name Querying private:
///@{ void captureImplExcept() override;
bool isCapturing() const { return m_capturing; }
bool hasData() const;
int runNumber() const { return 1; }
bool hasReachedEndOfRun() { return !m_capturing; }
///@}
///@name Modifying /// Create the cache workspaces, LoadLiveData extracts data from these
///@{ void initLocalCaches(const std::string &rawMsgBuffer,
API::Workspace_sptr extractData(); const RunStartStruct &runStartData) override;
///@}
private: void sampleDataFromMessage(const std::string &buffer) override;
void captureImpl();
void captureImplExcept();
API::Workspace_sptr extractDataImpl();
DataObjects::Workspace2D_sptr createBufferWorkspace(); API::Workspace_sptr extractDataImpl() override;
/// Broker to use to subscribe to topics private:
KafkaBroker m_broker;
/// Topic name
const std::string m_histoTopic;
/// Instrument name
const std::string m_instrumentName;
/// Subscriber for the histo stream
std::unique_ptr<IKafkaStreamSubscriber> m_histoStream;
/// Workspace used as template for workspaces created when extracting
DataObjects::Workspace2D_sptr m_workspace;
/// Buffer for latest FlatBuffers message
std::string m_buffer; std::string m_buffer;
DataObjects::Workspace2D_sptr m_workspace;
/// Associated thread running the capture process
std::thread m_thread;
/// Mutex protecting histo buffers
mutable std::mutex m_buffer_mutex;
/// Flag indicating if user interruption has been requested
std::atomic<bool> m_interrupt;
/// Flag indicating that the decoder is capturing
std::atomic<bool> m_capturing;
/// Exception object indicating there was an error
std::unique_ptr<std::runtime_error> m_exception;
}; };
} // namespace LiveData } // namespace LiveData
......
// Mantid Repository : https://github.com/mantidproject/mantid
//
// Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
// NScD Oak Ridge National Laboratory, European Spallation Source
// & Institut Laue - Langevin
// SPDX - License - Identifier: GPL - 3.0 +
#include "MantidLiveData/Kafka/IKafkaStreamDecoder.h"
#include "MantidKernel/Logger.h"
#include "MantidKernel/WarningSuppressions.h"
#include "MantidLiveData/Exception.h"
#include "MantidLiveData/Kafka/KafkaTopicSubscriber.h"
GNU_DIAG_OFF("conversion")
#include "private/Schema/ba57_run_info_generated.h"
#include "private/Schema/df12_det_spec_map_generated.h"
#include "private/Schema/f142_logdata_generated.h"
GNU_DIAG_ON("conversion")
using namespace Mantid::Types;
using namespace LogSchema;
namespace {
/// Logger used by the IKafkaStreamDecoder implementation
Mantid::Kernel::Logger g_log("IKafkaStreamDecoder");
/// File identifier of run information messages in the flatbuffers schema
const std::string RUN_MESSAGE_ID("ba57");
/// Pause applied before stop offsets are queried, so that late messages are
/// not missed
const std::chrono::seconds MAX_LATENCY{1};
} // namespace
namespace Mantid {
namespace LiveData {
// -----------------------------------------------------------------------------
// Public members
// -----------------------------------------------------------------------------
/**
 * Constructor. Only stores its arguments and puts every flag into a defined
 * state; no topic is subscribed to until startCapture() is called.
 * @param broker Shared pointer to the broker object used to create topic
 * streams
 * @param streamTopic The name of the topic streaming the stream data
 * @param runInfoTopic The name of the topic streaming the run information
 * @param spDetTopic The name of the topic streaming the spectrum-detector
 * run mapping
 * @param sampleEnvTopic The name of the topic streaming sample environment
 * information
 */
IKafkaStreamDecoder::IKafkaStreamDecoder(std::shared_ptr<IKafkaBroker> broker,
                                         const std::string &streamTopic,
                                         const std::string &runInfoTopic,
                                         const std::string &spDetTopic,
                                         const std::string &sampleEnvTopic)
    // Initialisers follow the member declaration order of the class.
    : m_broker(broker), m_streamTopic(streamTopic),
      m_runInfoTopic(runInfoTopic), m_spDetTopic(spDetTopic),
      m_sampleEnvTopic(sampleEnvTopic), m_interrupt(false), m_specToIdx(),
      m_runStart(), m_runNumber(-1), m_thread(), m_capturing(false),
      m_exception(),
      // The flags below were previously left uninitialised although they are
      // read by hasReachedEndOfRun(), dataReset() and the run-status wait
      // before anything assigns them.
      m_endRun(false), m_extractWaiting(false), m_runStatusSeen(false),
      // true is the value this flag holds between runs (it is only cleared
      // while end-of-run data is waiting to be extracted)
      m_extractedEndRunData(true), m_dataReset(false),
      m_cbIterationEnd([] {}), m_cbError([] {}) {}
/**
 * Destructor.
 * Requests the capture to stop and waits for it to finish before the object
 * goes away.
 */
IKafkaStreamDecoder::~IKafkaStreamDecoder() {
  stopCapture();
}
/**
* Start capturing from the stream on a separate thread. This is a non-blocking
* call and will return after the thread has started
*/
void IKafkaStreamDecoder::startCapture(bool startNow) {
// If we are not starting now, then we want to start at the start of the run
if (!startNow) {
// Get last two messages in run topic to ensure we get a runStart message
m_runStream =
m_broker->subscribe({m_runInfoTopic}, SubscribeAtOption::LASTTWO);
std::string rawMsgBuffer;
auto runStartData = getRunStartMessage(rawMsgBuffer);
joinStreamAtTime(runStartData);
} else {
m_dataStream =
m_broker->subscribe({m_streamTopic, m_runInfoTopic, m_sampleEnvTopic},
SubscribeAtOption::LATEST);
}
// Get last two messages in run topic to ensure we get a runStart message
m_runStream =
m_broker->subscribe({m_runInfoTopic}, SubscribeAtOption::LASTTWO);
m_spDetStream =
m_broker->subscribe({m_spDetTopic}, SubscribeAtOption::LASTONE);
m_thread = std::thread([this]() { this->captureImpl(); });
m_thread.detach();
}
/**
 * Indicate if the next data to be extracted should replace LoadLiveData's
 * output workspace, for example the first data of a new run. The flag is
 * cleared by this call.
 *
 * The read and the clear are a single atomic exchange, so a reset that is
 * raised by another thread while this runs is either reported now or kept for
 * the next call, never dropped.
 * @return True if a data reset was pending
 */
bool IKafkaStreamDecoder::dataReset() {
  return m_dataReset.exchange(false);
}
/**
 * Subscribe to the data, run info and sample environment topics at the start
 * time given in a run start message.
 * @param runStartData Contents of the run start message
 */
void IKafkaStreamDecoder::joinStreamAtTime(
    const IKafkaStreamDecoder::RunStartStruct &runStartData) {
  // The message carries nanoseconds; the time-based subscription is given
  // milliseconds
  const int64_t startMs = nanosecondsToMilliseconds(runStartData.startTime);
  m_dataStream =
      m_broker->subscribe({m_streamTopic, m_runInfoTopic, m_sampleEnvTopic},
                          startMs, SubscribeAtOption::TIME);

  // Listen to the run info topic from the very run start message that the
  // start time was taken from
  // NOTE(review): the literal 0 is presumably the partition - confirm against
  // IKafkaStreamSubscriber::seek
  m_dataStream->seek(m_runInfoTopic, 0, runStartData.runStartMsgOffset);
}
/**
 * Convert a duration in nanoseconds to whole milliseconds (truncating).
 * @param timeNanoseconds Duration in nanoseconds
 * @return The duration in milliseconds as a signed value
 */
int64_t
IKafkaStreamDecoder::nanosecondsToMilliseconds(uint64_t timeNanoseconds) const {
  const uint64_t nanosecondsPerMillisecond = 1000000;
  const uint64_t milliseconds = timeNanoseconds / nanosecondsPerMillisecond;
  return static_cast<int64_t>(milliseconds);
}
/**
 * Stop capturing from the stream. This call blocks until the capturing
 * function has completed.
 */
void IKafkaStreamDecoder::stopCapture() noexcept {
  // Ask the capture loop to stop
  m_interrupt = true;

  // Poll until captureImpl() reports that it has finished; the background
  // thread then exits by itself
  const std::chrono::milliseconds pollInterval(50);
  while (m_capturing) {
    std::this_thread::sleep_for(pollInterval);
  }
}
/**
 * Check if a message has indicated that end of run has been reached. When it
 * has, the decoder is told that the end of run was observed, so that (after
 * the caller had its opportunity to interrupt) it can carry on with the
 * messages of the next run.
 * @return True if end of run has been reached and its data was extracted
 */
bool IKafkaStreamDecoder::hasReachedEndOfRun() noexcept {
  // End-of-run data still has to be handed over, or an extraction is pending
  const bool extractionOutstanding =
      !m_extractedEndRunData || m_extractWaiting;
  if (extractionOutstanding || !m_endRun)
    return false;

  // Record that the run status was seen and wake the capture thread
  std::lock_guard<std::mutex> statusGuard(m_runStatusMutex);
  m_runStatusSeen = true;
  m_cvRunStatus.notify_one();
  return true;
}
/**
 * Check for an exception thrown by the background thread and rethrow
 * it if necessary. If no error occurred the data is obtained from
 * extractDataImpl() while the capture thread is asked to wait.
 *
 * The wait flag is always cleared again, also when extractDataImpl() throws;
 * otherwise the capture thread would stay blocked in waitForDataExtraction().
 * @return A pointer to the data collected since the last call to this
 * method
 * @throws std::runtime_error if the capture thread stored an error; anything
 * thrown by extractDataImpl() is propagated unchanged
 */
API::Workspace_sptr IKafkaStreamDecoder::extractData() {
  if (m_exception) {
    throw std::runtime_error(*m_exception);
  }

  m_extractWaiting = true;
  m_cv.notify_one();

  // Clear the flag while holding the mutex the waiter uses, so the change
  // cannot slip in between the waiter's predicate check and its going to
  // sleep (which would lose the notification).
  const auto releaseCaptureThread = [this]() {
    {
      std::lock_guard<std::mutex> waitGuard(m_waitMutex);
      m_extractWaiting = false;
    }
    m_cv.notify_one();
  };

  API::Workspace_sptr workspace_ptr;
  try {
    workspace_ptr = extractDataImpl();
  } catch (...) {
    releaseCaptureThread();
    throw;
  }
  releaseCaptureThread();
  return workspace_ptr;
}
/**
* Start decoding data from the streams into the internal buffers.
* Implementation designed to be entry point for new thread of execution.
* It catches all thrown exceptions.
*/
void IKafkaStreamDecoder::captureImpl() noexcept {
m_capturing = true;
try {
captureImplExcept();
} catch (std::exception &exc) {
m_cbError();
m_exception = boost::make_shared<std::runtime_error>(exc.what());
} catch (...) {
m_cbError();
m_exception = boost::make_shared<std::runtime_error>(
"IKafkaStreamDecoder: Unknown exception type caught.");
}
m_capturing = false;
}
/**
 * Check if we've reached the stop offset on every partition of every topic
 * and, if so, flag the end of the run.
 *
 * @param reachedEnd : Bool for each topic and partition to mark when stop
 * offset reached. An empty map counts as "everything reached".
 * @param checkOffsets : Set to false once all stop offsets have been reached;
 * left unchanged otherwise
 */
void IKafkaStreamDecoder::checkIfAllStopOffsetsReached(
    const std::unordered_map<std::string, std::vector<bool>> &reachedEnd,
    bool &checkOffsets) {
  using TopicFlags =
      std::unordered_map<std::string, std::vector<bool>>::value_type;

  // Entries are taken by const reference so no topic name or flag vector is
  // copied. std::all_of is true for an empty range, which covers the case of
  // there being no topics to wait for.
  const bool allReached = std::all_of(
      reachedEnd.cbegin(), reachedEnd.cend(), [](const TopicFlags &topic) {
        return std::all_of(topic.second.cbegin(), topic.second.cend(),
                           [](bool partitionEnd) { return partitionEnd; });
      });
  if (!allReached)
    return;

  m_endRun = true;
  // Mark the end-of-run data as not yet extracted: hasReachedEndOfRun() keeps
  // returning false until waitForRunEndObservation() has let the buffer be
  // emptied. Otherwise we can end up with data from two different runs in
  // the same buffer workspace which is problematic if the user wanted the
  // "Stop" or "Rename" run transition option.
  m_extractedEndRunData = false;
  checkOffsets = false;
  g_log.notice("Reached end of run in data streams.");
}
/**
 * Query the offsets at which the current run stops on every topic/partition
 * of the data stream and initialise the matching "reached end" flags.
 *
 * Blocks for MAX_LATENCY before querying so that late messages are not
 * missed. The run info topic is not tracked in reachedEnd.
 *
 * @param stopOffsets : Cleared, then filled with the stop offset of each
 * partition of each topic
 * @param reachedEnd : Cleared, then filled with one flag per partition of
 * each tracked topic; a flag is already true if the stop offset is negative
 * or the current offset is at or beyond it
 * @param stopTime : Stop time of the run in nanoseconds
 * @return A copy of stopOffsets
 */
std::unordered_map<std::string, std::vector<int64_t>>
IKafkaStreamDecoder::getStopOffsets(
    std::unordered_map<std::string, std::vector<int64_t>> &stopOffsets,
    std::unordered_map<std::string, std::vector<bool>> &reachedEnd,
    uint64_t stopTime) const {
  reachedEnd.clear();
  stopOffsets.clear();
  // Wait for max latency so that we don't miss any late messages
  std::this_thread::sleep_for(MAX_LATENCY);
  // /1000000 to convert nanosecond precision from message to millisecond
  // precision which Kafka offset query supports
  stopOffsets = m_dataStream->getOffsetsForTimestamp(
      static_cast<int64_t>(stopTime / 1000000));
  const auto currentOffsets = m_dataStream->getCurrentOffsets();

  const auto &runSuffix = KafkaTopicSubscriber::RUN_TOPIC_SUFFIX;
  for (const auto &topicOffsets : stopOffsets) {
    const auto &topicName = topicOffsets.first;
    const auto &partitionOffsets = topicOffsets.second;

    // Ignore the runInfo topic. The length is checked first: a name shorter
    // than the suffix cannot end with it, and subtracting the lengths would
    // wrap around.
    const bool isRunInfoTopic =
        topicName.length() >= runSuffix.length() &&
        topicName.compare(topicName.length() - runSuffix.length(),
                          runSuffix.length(), runSuffix) == 0;
    if (isRunInfoTopic)
      continue;

    g_log.debug() << "TOPIC: " << topicName
                  << " PARTITIONS: " << partitionOffsets.size() << std::endl;

    auto &partitionReached =
        reachedEnd
            .insert({topicName,
                     std::vector<bool>(partitionOffsets.size(), false)})
            .first->second;

    // Look the topic up without inserting; it may be absent from the current
    // offsets, or have fewer partitions listed there.
    const auto current = currentOffsets.find(topicName);
    for (size_t partition = 0; partition < partitionOffsets.size();
         ++partition) {
      const auto offset = partitionOffsets[partition];
      // If the stop offset is negative then there are no messages for us
      // to collect on this partition, so mark it as reached already
      bool reached = offset < 0;
      // Likewise if the stop offset has already been reached
      if (current != currentOffsets.end() &&
          partition < current->second.size() &&
          current->second[partition] >= offset)
        reached = true;
      partitionReached[partition] = reached;
    }
  }
  return stopOffsets;
}
/**
 * Block the calling thread while m_extractWaiting is set, i.e. while a data
 * extraction is pending. The wait is performed on m_cv under m_waitMutex and
 * returns immediately when no extraction is pending.
 */
void IKafkaStreamDecoder::waitForDataExtraction() {
  std::unique_lock<std::mutex> extractionLock(m_waitMutex);
  // Re-check the flag after every wake-up to cope with spurious wake-ups
  while (m_extractWaiting) {
    m_cv.wait(extractionLock);
  }
}
/**
 * Handle the transition between two runs once the end of a run was detected.
 *
 * Flags a pending extraction and waits for it to complete, waits until the
 * end-of-run status has been seen (m_runStatusSeen), resets the end-of-run
 * state, then waits for the run start message of the next run and
 * re-initialises the data stream and local caches for it. Returns early,
 * without re-initialising, if the decoder is interrupted.
 */
void IKafkaStreamDecoder::waitForRunEndObservation() {
m_extractWaiting = true;
// Mark extractedEndRunData true before waiting on the extraction to ensure
// an immediate request for run status after extracting the data will return
// the correct value - avoids race condition in MonitorLiveData and tests
m_extractedEndRunData = true;
waitForDataExtraction();
// Wait until MonitorLiveData has seen that end of run was
// reached before setting m_endRun back to false and continuing
std::unique_lock<std::mutex> runStatusLock(m_runStatusMutex);
m_cvRunStatus.wait(runStatusLock, [&] { return m_runStatusSeen; });
m_endRun = false;
m_runStatusSeen = false;
// Release the run status lock before the potentially long wait for the next
// run start message below
runStatusLock.unlock();
// Set to zero until we have the new run number. MonitorLiveData will query
// it each time before it extracts data until it gets a non-zero value
m_runNumber = 0;
// Get new run message now so that new run number is available for
// MonitorLiveData as early as possible
RunStartStruct runStartStruct;
// waitForNewRunStartMessage() returns true when it was interrupted
if (waitForNewRunStartMessage(runStartStruct))
return;
// Give time for MonitorLiveData to act on runStatus information
// and trigger m_interrupt for next loop iteration if user requested
// LiveData algorithm to stop at the end of the run
std::this_thread::sleep_for(std::chrono::milliseconds(100));
if (m_interrupt)
return;
// Rejoin event stream at start of new run
joinStreamAtTime(runStartStruct);
std::string detSpecMapMsgBuffer = getDetSpecMapForRun(runStartStruct);
initLocalCaches(detSpecMapMsgBuffer, runStartStruct);
}
/**
 * Try to find a detector-spectrum map message published after the
 * current run start time
 *
 * Re-subscribes m_spDetStream to the detector-spectrum topic at the run start
 * time and consumes a single message from it.
 *
 * @param runStartStruct details of the current run
 * @return received detector-spectrum map message buffer
 * @throws std::runtime_error if no message was received
 */
std::string IKafkaStreamDecoder::getDetSpecMapForRun(
    const IKafkaStreamDecoder::RunStartStruct &runStartStruct) {
  std::string rawMsgBuffer;
  int64_t offset = 0;
  int32_t partition = 0;
  std::string topicName;
  m_spDetStream = m_broker->subscribe(
      {m_spDetTopic}, nanosecondsToMilliseconds(runStartStruct.startTime),
      SubscribeAtOption::TIME);
  m_spDetStream->consumeMessage(&rawMsgBuffer, offset, partition, topicName);
  if (rawMsgBuffer.empty()) {
    // The exception used to be constructed but never thrown, so the failure
    // was silently ignored and an empty buffer was handed back to the caller.
    throw std::runtime_error(
        "No detector-spectrum map message found for run number " +
        std::to_string(runStartStruct.runNumber));
  }
  return rawMsgBuffer;
}
/**
 * Wait for a run start message until we get one with a higher run number
 * than the current run or the algorithm is interrupted
 *
 * Messages that are empty, that do not carry the run info identifier, that
 * are not RunStart messages, or whose run number is not higher than
 * m_runNumber are skipped. On success m_runNumber is updated to the new run
 * number.
 *
 * @param runStartStructOutput details of the new run
 * @return true if interrupted, false if got a new run start message
 */
bool IKafkaStreamDecoder::waitForNewRunStartMessage(
    RunStartStruct &runStartStructOutput) {
  while (!m_interrupt) {
    std::string runMsgBuffer;
    int64_t offset = 0;
    int32_t partition = 0;
    std::string topicName;
    m_runStream->consumeMessage(&runMsgBuffer, offset, partition, topicName);
    if (runMsgBuffer.empty())
      continue; // no message available, try again

    const auto *rawBytes =
        reinterpret_cast<const uint8_t *>(runMsgBuffer.c_str());
    // Only decode buffers that identify themselves as run info messages, as
    // getRunInfoMessage() and checkRunMessage() do. Interpreting any other
    // payload as a RunInfo table would read garbage.
    if (!flatbuffers::BufferHasIdentifier(rawBytes, RUN_MESSAGE_ID.c_str()))
      continue; // not a run info message, try again

    auto runMsg = GetRunInfo(rawBytes);
    if (runMsg->info_type_type() != InfoTypes_RunStart)
      continue; // received message wasn't a RunStart message, try again

    // We got a run start message, deserialise it
    auto runStartData = static_cast<const RunStart *>(runMsg->info_type());
    IKafkaStreamDecoder::RunStartStruct runStartStruct = {
        runStartData->instrument_name()->str(), runStartData->run_number(),
        runStartData->start_time(),
        static_cast<size_t>(runStartData->n_periods()), offset};
    // Only accept the start of a run newer than the one already known
    if (runStartStruct.runNumber > m_runNumber) {
      runStartStructOutput = runStartStruct;
      m_runNumber = runStartStruct.runNumber;
      return false; // not interrupted
    }
  }
  return true; // interrupted
}
/**
 * Read a run start message from the run info stream.
 *
 * Consumes one run info message and, if that is not a RunStart message,
 * consumes one more before giving up.
 *
 * @param rawMsgBuffer string used as the message buffer; holds the last
 * consumed message on return
 * @return the deserialised run start details, including the message offset
 * @throws std::runtime_error if neither of the two messages is a RunStart
 * message (getRunInfoMessage() may also throw)
 */
IKafkaStreamDecoder::RunStartStruct
IKafkaStreamDecoder::getRunStartMessage(std::string &rawMsgBuffer) {
  auto offset = getRunInfoMessage(rawMsgBuffer);
  auto runMsg =
      GetRunInfo(reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()));
  if (runMsg->info_type_type() != InfoTypes_RunStart) {
    // We want a runStart message, try the next one
    offset = getRunInfoMessage(rawMsgBuffer);
    runMsg =
        GetRunInfo(reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()));
    if (runMsg->info_type_type() != InfoTypes_RunStart) {
      // The message used to name initLocalCaches() and lacked the space
      // between "message" and "in"
      throw std::runtime_error("IKafkaStreamDecoder::getRunStartMessage() - "
                               "Could not find a run start message "
                               "in the run info topic. Unable to continue");
    }
  }
  auto runStartData = static_cast<const RunStart *>(runMsg->info_type());
  IKafkaStreamDecoder::RunStartStruct runStart = {
      runStartData->instrument_name()->str(), runStartData->run_number(),
      runStartData->start_time(),
      static_cast<size_t>(runStartData->n_periods()), offset};
  return runStart;
}
/**
 * Consume a single message from the run info stream and validate it.
 *
 * @param rawMsgBuffer : string that receives the raw message
 * @return offset of the consumed message
 * @throws std::runtime_error if the message is empty or does not carry the
 * run info identifier
 */
int64_t IKafkaStreamDecoder::getRunInfoMessage(std::string &rawMsgBuffer) {
  int64_t messageOffset;
  int32_t messagePartition;
  std::string sourceTopic;
  m_runStream->consumeMessage(&rawMsgBuffer, messageOffset, messagePartition,
                              sourceTopic);

  if (rawMsgBuffer.empty()) {
    throw std::runtime_error("IKafkaStreamDecoder::getRunInfoMessage() - "
                             "Empty message received from run info "
                             "topic. Unable to continue");
  }

  // The identifier tells us whether the payload is a run info message
  const auto *messageBytes =
      reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str());
  const bool isRunInfo =
      flatbuffers::BufferHasIdentifier(messageBytes, RUN_MESSAGE_ID.c_str());
  if (!isRunInfo) {
    throw std::runtime_error("IKafkaStreamDecoder::getRunInfoMessage() - "
                             "Received unexpected message type from run info "
                             "topic. Unable to continue");
  }
  return messageOffset;
}
/**
 * Group detector IDs by spectrum number.
 *
 * @param nspectra expected number of distinct spectrum numbers (checked with
 * an assert only)
 * @param spec array of spectrum numbers, read for indices [0, length)
 * @param udet array of detector IDs, read for indices [0, length); udet[i]
 * belongs to spec[i]
 * @param length number of entries read from spec and udet
 * @return map from spectrum number to the set of its detector IDs
 */
std::map<int32_t, std::set<int32_t>>
IKafkaStreamDecoder::buildSpectrumToDetectorMap(const size_t nspectra,
                                                const int32_t *spec,
                                                const int32_t *udet,
                                                uint32_t length) {
  // Order is important here: std::map keeps the spectrum numbers sorted
  std::map<int32_t, std::set<int32_t>> spdetMap;
  for (uint32_t entry = 0; entry < length; ++entry) {
    // operator[] creates the (empty) detector set on first sight of a
    // spectrum number and returns the existing one afterwards
    spdetMap[spec[entry]].insert(udet[entry]);
  }
  assert(nspectra == spdetMap.size());
  return spdetMap;
}
/**
 * Inspect a received message and, if it is a run stop message, start
 * tracking the stop offsets of the run.
 *
 * Nothing happens unless the buffer carries the run info identifier, offsets
 * are not already being checked and the message is a RunStop message.
 *
 * @param buffer raw message
 * @param checkOffsets set to true when a run stop message is handled; it may
 * be reset again by checkIfAllStopOffsetsReached()
 * @param stopOffsets receives the stop offsets for the run
 * @param reachedEnd receives the per-partition end-of-run flags
 */
void IKafkaStreamDecoder::checkRunMessage(
    const std::string &buffer, bool &checkOffsets,
    std::unordered_map<std::string, std::vector<int64_t>> &stopOffsets,
    std::unordered_map<std::string, std::vector<bool>> &reachedEnd) {
  const auto *messageBytes = reinterpret_cast<const uint8_t *>(buffer.c_str());
  if (!flatbuffers::BufferHasIdentifier(messageBytes, RUN_MESSAGE_ID.c_str()))
    return;

  auto runMsg = GetRunInfo(messageBytes);
  if (checkOffsets || runMsg->info_type_type() != InfoTypes_RunStop)
    return;

  auto runStopMsg = static_cast<const RunStop *>(runMsg->info_type());
  auto stopTime = runStopMsg->stop_time();
  g_log.debug() << "Received an end-of-run message with stop time = "
                << stopTime << std::endl;
  stopOffsets = getStopOffsets(stopOffsets, reachedEnd, stopTime);
  checkOffsets = true;
  // The stop offsets may already have been reached on every partition
  checkIfAllStopOffsetsReached(reachedEnd, checkOffsets);
}
/**
 * Record that a partition reached its stop offset and check whether the
 * whole run has ended.
 *
 * Topics without an entry in reachedEnd are ignored.
 *
 * @param topicName topic the message was received from
 * @param checkOffsets passed on to checkIfAllStopOffsetsReached()
 * @param offset offset of the received message
 * @param partition partition the message was received from
 * @param stopOffsets stop offsets per topic and partition
 * @param reachedEnd end-of-run flags per topic and partition, updated here
 */
void IKafkaStreamDecoder::checkRunEnd(
    const std::string &topicName, bool &checkOffsets, const int64_t offset,
    const int32_t partition,
    std::unordered_map<std::string, std::vector<int64_t>> &stopOffsets,
    std::unordered_map<std::string, std::vector<bool>> &reachedEnd) {
  if (reachedEnd.count(topicName) == 0)
    return;

  const auto partitionIndex = static_cast<size_t>(partition);
  const int64_t stopOffset = stopOffsets[topicName][partitionIndex];
  if (offset < stopOffset)
    return;

  reachedEnd[topicName][partitionIndex] = true;
  // Only log when landing exactly on the stop offset
  if (offset == stopOffset) {
    g_log.debug() << "Reached end-of-run in " << topicName << " topic."
                  << std::endl;
    g_log.debug() << "topic: " << topicName << " offset: " << offset
                  << " stopOffset: " << stopOffset << std::endl;
  }
  checkIfAllStopOffsetsReached(reachedEnd, checkOffsets);
}
} // namespace LiveData
} // namespace Mantid
This diff is collapsed.
...@@ -25,14 +25,37 @@ KafkaHistoListener::KafkaHistoListener() { ...@@ -25,14 +25,37 @@ KafkaHistoListener::KafkaHistoListener() {
declareProperty("InstrumentName", ""); declareProperty("InstrumentName", "");
} }
/**
 * Copy the property values of the calling algorithm and cache its
 * "Instrument" property as the instrument name.
 *
 * An error is logged, and the instrument name left untouched, when the
 * calling algorithm has no "Instrument" property.
 *
 * @param callingAlgorithm the algorithm that uses this listener
 */
void KafkaHistoListener::setAlgorithm(
    const Mantid::API::IAlgorithm &callingAlgorithm) {
  this->updatePropertyValues(callingAlgorithm);
  // Get the instrument name from StartLiveData so we can sub to correct topics
  if (callingAlgorithm.existsProperty("Instrument")) {
    m_instrumentName = callingAlgorithm.getPropertyValue("Instrument");
  } else {
    // Name this class in the message; it used to say KafkaEventListener
    g_log.error("KafkaHistoListener requires Instrument property to be set in "
                "calling algorithm");
  }
}
/// @copydoc ILiveListener::connect /// @copydoc ILiveListener::connect
bool KafkaHistoListener::connect(const Poco::Net::SocketAddress &address) { bool KafkaHistoListener::connect(const Poco::Net::SocketAddress &address) {
if (m_instrumentName.empty()) {
g_log.error(
"KafkaHistoListener::connect requires a non-empty instrument name");
}
try { try {
std::string instrumentName = getProperty("InstrumentName"); const std::string histoTopic(m_instrumentName +
const std::string histoTopic(instrumentName + KafkaTopicSubscriber::HISTO_TOPIC_SUFFIX),
KafkaTopicSubscriber::HISTO_TOPIC_SUFFIX); runInfoTopic(m_instrumentName + KafkaTopicSubscriber::RUN_TOPIC_SUFFIX),
spDetInfoTopic(m_instrumentName +
KafkaTopicSubscriber::DET_SPEC_TOPIC_SUFFIX),
sampleEnvTopic(m_instrumentName +
KafkaTopicSubscriber::SAMPLE_ENV_TOPIC_SUFFIX);
m_decoder = Kernel::make_unique<KafkaHistoStreamDecoder>( m_decoder = Kernel::make_unique<KafkaHistoStreamDecoder>(
address.toString(), histoTopic, instrumentName); std::make_shared<KafkaBroker>(address.toString()), histoTopic,
runInfoTopic, spDetInfoTopic, sampleEnvTopic);
} catch (std::exception &exc) { } catch (std::exception &exc) {
g_log.error() << "KafkaHistoListener::connect - Connection Error: " g_log.error() << "KafkaHistoListener::connect - Connection Error: "
<< exc.what() << "\n"; << exc.what() << "\n";
......
...@@ -12,16 +12,25 @@ ...@@ -12,16 +12,25 @@
#include "MantidDataObjects/WorkspaceCreation.h" #include "MantidDataObjects/WorkspaceCreation.h"
#include "MantidGeometry/Instrument/DetectorInfo.h" #include "MantidGeometry/Instrument/DetectorInfo.h"
#include "MantidHistogramData/BinEdges.h" #include "MantidHistogramData/BinEdges.h"
#include "MantidIndexing/IndexInfo.h"
#include "MantidKernel/Logger.h" #include "MantidKernel/Logger.h"
#include "MantidKernel/OptionalBool.h" #include "MantidKernel/OptionalBool.h"
#include "MantidKernel/TimeSeriesProperty.h"
#include "MantidKernel/UnitFactory.h"
#include "MantidKernel/WarningSuppressions.h" #include "MantidKernel/WarningSuppressions.h"
#include "MantidLiveData/Exception.h" #include "MantidLiveData/Exception.h"
GNU_DIAG_OFF("conversion") GNU_DIAG_OFF("conversion")
#include "private/Schema/df12_det_spec_map_generated.h"
#include "private/Schema/hs00_event_histogram_generated.h" #include "private/Schema/hs00_event_histogram_generated.h"
GNU_DIAG_ON("conversion") GNU_DIAG_ON("conversion")
using namespace HistoSchema;
namespace { namespace {
const std::string PROTON_CHARGE_PROPERTY = "proton_charge";
const std::string RUN_NUMBER_PROPERTY = "run_number";
const std::string RUN_START_PROPERTY = "run_start";
/// Logger /// Logger
Mantid::Kernel::Logger g_log("KafkaHistoStreamDecoder"); Mantid::Kernel::Logger g_log("KafkaHistoStreamDecoder");
} // namespace } // namespace
...@@ -35,95 +44,41 @@ namespace LiveData { ...@@ -35,95 +44,41 @@ namespace LiveData {
/** /**
* Constructor * Constructor
* @param brokerAddress The physical ipAddress of the broker * @param broker Kafka broker
* @param histoTopic The name of the topic streaming the histo data * @param histoTopic The name of the topic streaming the histo data
* @param spDetTopic The name of the topic streaming the spectrum-detector * @param spDetTopic The name of the topic streaming the spectrum-detector
* run mapping * run mapping
*/ */
KafkaHistoStreamDecoder::KafkaHistoStreamDecoder( KafkaHistoStreamDecoder::KafkaHistoStreamDecoder(
const std::string &brokerAddress, const std::string &histoTopic, std::shared_ptr<IKafkaBroker> broker, const std::string &histoTopic,
const std::string &instrumentName) const std::string &runInfoTopic, const std::string &spDetTopic,
: m_broker(brokerAddress), m_histoTopic(histoTopic), const std::string &sampleEnvTopic)
m_instrumentName(instrumentName), m_histoStream(), m_workspace(), : IKafkaStreamDecoder(broker, histoTopic, runInfoTopic, spDetTopic,
m_buffer(), m_thread(), m_interrupt(false), m_capturing(false), sampleEnvTopic),
m_exception(nullptr) { m_workspace() {}
if (histoTopic.empty())
throw std::invalid_argument(
"KafkaHistoStreamDecoder::KafkaHistoStreamDecorder "
": histogramTopic cannot be an empty string.");
if (instrumentName.empty())
throw std::invalid_argument(
"KafkaHistoStreamDecoder::KafkaHistoStreamDecorder "
": instrumentName cannot be an empty string.");
// Initialize buffer workspace
m_workspace = createBufferWorkspace();
}
/** /**
* Destructor. * Destructor.
* Stops capturing from the stream * Stops capturing from the stream
*/ */
KafkaHistoStreamDecoder::~KafkaHistoStreamDecoder() { stopCapture(); } KafkaHistoStreamDecoder::~KafkaHistoStreamDecoder() {}
/**
 * Start capturing from the stream on a separate thread. This is a non-blocking
 * call and will return after the thread has started
 *
 * Subscribes to the histogram topic at the latest offset, then runs
 * captureImpl() on a detached thread. The unnamed bool argument is ignored.
 */
void KafkaHistoStreamDecoder::startCapture(bool) {
g_log.debug() << "Starting capture on topic: " << m_histoTopic << "\n";
m_histoStream = m_broker.subscribe({m_histoTopic}, SubscribeAtOption::LATEST);
// The thread is detached, so it is never joined; stopCapture() polls
// m_capturing instead to find out when it has finished
m_thread = std::thread([this]() { this->captureImpl(); });
m_thread.detach();
}
/**
 * Stop capturing from the stream. This is a blocking call until the capturing
 * function has completed
 *
 * Sets m_interrupt and then polls m_capturing every 50 milliseconds until it
 * is false.
 */
void KafkaHistoStreamDecoder::stopCapture() noexcept {
g_log.debug() << "Stopping capture\n";
// This will interrupt the "event" loop
m_interrupt = true;
// Wait until the function has completed. The background thread will exit
// automatically
while (m_capturing) {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
};
}
/** /**
* Check if there is data available to extract * Check if there is data available to extract
* @return True if data has been accumulated so that extractData() * @return True if data has been accumulated so that extractData()
* can be called, false otherwise * can be called, false otherwise
*/ */
bool KafkaHistoStreamDecoder::hasData() const { bool KafkaHistoStreamDecoder::hasData() const noexcept {
std::lock_guard<std::mutex> lock(m_buffer_mutex); std::lock_guard<std::mutex> lock(m_mutex);
return !m_buffer.empty(); return !m_buffer.empty();
} }
/**
 * Hand over the buffered data, surfacing any failure stored in m_exception
 * first.
 *
 * @return the workspace produced by extractDataImpl()
 * @throws the exception stored in m_exception, if one is set
 */
API::Workspace_sptr KafkaHistoStreamDecoder::extractData() {
  // A stored exception takes precedence over returning data
  if (m_exception) {
    throw *m_exception;
  }
  return extractDataImpl();
}
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Private members // Private members
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
API::Workspace_sptr KafkaHistoStreamDecoder::extractDataImpl() { API::Workspace_sptr KafkaHistoStreamDecoder::extractDataImpl() {
std::lock_guard<std::mutex> lock(m_buffer_mutex); std::lock_guard<std::mutex> lock(m_mutex);
if (!m_capturing) { if (!m_capturing) {
throw Exception::NotYet("Local buffers not initialized."); throw Exception::NotYet("Local buffers not initialized.");
...@@ -169,78 +124,143 @@ API::Workspace_sptr KafkaHistoStreamDecoder::extractDataImpl() { ...@@ -169,78 +124,143 @@ API::Workspace_sptr KafkaHistoStreamDecoder::extractDataImpl() {
return ws; return ws;
} }
/**
 * Start decoding data from the streams into the internal buffers.
 * Implementation designed to be entry point for new thread of execution.
 * It catches all thrown exceptions.
 *
 * A caught exception is stored in m_exception as a std::runtime_error.
 * m_capturing is true for the duration of the call.
 */
void KafkaHistoStreamDecoder::captureImpl() {
  m_capturing = true;
  try {
    captureImplExcept();
  } catch (const std::exception &exc) {
    m_exception.reset(new std::runtime_error(exc.what()));
  } catch (...) {
    // Name this class in the message; it used to say KafkaEventStreamDecoder
    m_exception.reset(new std::runtime_error(
        "KafkaHistoStreamDecoder: Unknown exception type caught."));
  }
  m_capturing = false;
}
/** /**
* Exception-throwing variant of captureImpl(). Do not call this directly * Exception-throwing variant of captureImpl(). Do not call this directly
*/ */
void KafkaHistoStreamDecoder::captureImplExcept() { void KafkaHistoStreamDecoder::captureImplExcept() {
g_log.information("Event capture starting"); g_log.information("Event capture starting");
m_interrupt = false; m_interrupt = false; // Allow MonitorLiveData or user to interrupt
m_endRun = false;
m_runStatusSeen = false; // Flag to ensure MonitorLiveData observes end of run
std::string buffer; std::string buffer;
std::string runBuffer;
int64_t offset; int64_t offset;
int32_t partition; int32_t partition;
std::string topicName; std::string topicName;
auto runStartStruct = getRunStartMessage(runBuffer);
m_spDetStream->consumeMessage(&buffer, offset, partition, topicName);
initLocalCaches(buffer, runStartStruct);
while (!m_interrupt) { // Keep track of whether we've reached the end of a run
// Pull in events std::unordered_map<std::string, std::vector<int64_t>> stopOffsets;
m_histoStream->consumeMessage(&buffer, offset, partition, topicName); std::unordered_map<std::string, std::vector<bool>> reachedEnd;
bool checkOffsets = false;
// No events, wait for some to come along... while (!m_interrupt) {
if (buffer.empty()) { if (m_endRun) {
std::this_thread::sleep_for(std::chrono::milliseconds(100)); waitForRunEndObservation();
continue; continue;
} else {
waitForDataExtraction();
} }
// Lock so we don't overwrite buffer while workspace is being extracted
{ {
std::lock_guard<std::mutex> lock(m_buffer_mutex); // Lock so we don't overwrite buffer while workspace is being extracted or
// try to access data before it is read.
std::lock_guard<std::mutex> lock(m_mutex);
// Pull in data
m_dataStream->consumeMessage(&buffer, offset, partition, topicName);
// No events, wait for some to come along...
if (buffer.empty()) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
m_cbIterationEnd();
continue;
}
if (checkOffsets) {
checkRunEnd(topicName, checkOffsets, offset, partition, stopOffsets,
reachedEnd);
if (offset > stopOffsets[topicName][static_cast<size_t>(partition)]) {
// If the offset is beyond the end of the current run, then skip to
// the next iteration and don't process the message
m_cbIterationEnd();
continue;
}
}
// Data being accumulated before being streamed so no need to store
// messages.
m_buffer = buffer; m_buffer = buffer;
} }
checkRunMessage(buffer, checkOffsets, stopOffsets, reachedEnd);
std::this_thread::sleep_for(std::chrono::milliseconds(100)); std::this_thread::sleep_for(std::chrono::milliseconds(100));
m_cbIterationEnd();
} }
g_log.debug("Histo capture finished"); g_log.debug("Histo capture finished");
} }
DataObjects::Workspace2D_sptr KafkaHistoStreamDecoder::createBufferWorkspace() { void KafkaHistoStreamDecoder::initLocalCaches(
DataObjects::Workspace2D_sptr workspace; const std::string &rawMsgBuffer, const RunStartStruct &runStartData) {
if (rawMsgBuffer.empty()) {
try { throw std::runtime_error("KafkaEventStreamDecoder::initLocalCaches() - "
auto alg = API::AlgorithmManager::Instance().createUnmanaged( "Empty message received from spectrum-detector "
"LoadEmptyInstrument"); "topic. Unable to continue");
// Do not put the workspace in the ADS
alg->setChild(true);
alg->initialize();
alg->setPropertyValue("InstrumentName", m_instrumentName);
// Dummy workspace value "ws" as not placed in ADS
alg->setPropertyValue("OutputWorkspace", "ws");
alg->execute();
workspace = alg->getProperty("OutputWorkspace");
} catch (std::exception &exc) {
g_log.error() << "Error loading empty instrument '" << m_instrumentName
<< "': " << exc.what() << "\n";
throw;
} }
auto spDetMsg = GetSpectraDetectorMapping(
reinterpret_cast<const uint8_t *>(rawMsgBuffer.c_str()));
auto nspec = static_cast<uint32_t>(spDetMsg->n_spectra());
auto nudet = spDetMsg->detector_id()->size();
if (nudet != nspec) {
std::ostringstream os;
os << "KafkaEventStreamDecoder::initLocalEventBuffer() - Invalid "
"spectra/detector mapping. Expected matched length arrays but "
"found nspec="
<< nspec << ", ndet=" << nudet;
throw std::runtime_error(os.str());
}
m_runNumber = runStartData.runNumber;
// Create buffer
auto histoBuffer = createBufferWorkspace<DataObjects::Workspace2D>(
"Workspace2D", static_cast<size_t>(spDetMsg->n_spectra()),
spDetMsg->spectrum()->data(), spDetMsg->detector_id()->data(), nudet);
// Load the instrument if possible but continue if we can't
auto instName = runStartData.instrumentName;
if (!instName.empty())
loadInstrument<DataObjects::Workspace2D>(instName, histoBuffer);
else
g_log.warning(
"Empty instrument name received. Continuing without instrument");
auto &mutableRun = histoBuffer->mutableRun();
// Run start. Cache locally for computing frame times
// Convert nanoseconds to seconds (and discard the extra precision)
auto runStartTime = static_cast<time_t>(runStartData.startTime / 1000000000);
m_runStart.set_from_time_t(runStartTime);
auto timeString = m_runStart.toISO8601String();
// Run number
mutableRun.addProperty(RUN_START_PROPERTY, std::string(timeString));
mutableRun.addProperty(RUN_NUMBER_PROPERTY,
std::to_string(runStartData.runNumber));
// Create the proton charge property
mutableRun.addProperty(
new Kernel::TimeSeriesProperty<double>(PROTON_CHARGE_PROPERTY));
// Cache spec->index mapping. We assume it is the same across all periods
m_specToIdx = histoBuffer->getSpectrumToWorkspaceIndexMap();
// Buffers for each period
const size_t nperiods = runStartData.nPeriods;
if (nperiods > 1) {
throw std::runtime_error(
"KafkaHistoStreamDecoder - Does not support multi-period data.");
}
// New caches so LoadLiveData's output workspace needs to be replaced
m_dataReset = true;
m_workspace = histoBuffer;
}
return workspace; void KafkaHistoStreamDecoder::sampleDataFromMessage(const std::string &) {
throw Kernel::Exception::NotImplementedError("This method will require "
"implementation when processing "
"sample environment messages.");
} }
} // namespace LiveData } // namespace LiveData
......
// Mantid Repository : https://github.com/mantidproject/mantid
//
//     Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
// NScD Oak Ridge National Laboratory, European Spallation Source
// & Institut Laue - Langevin
// SPDX - License - Identifier: GPL - 3.0 +
#ifndef MANTID_LIVEDATA_KAFKAHISTOSTREAMDECODERTEST_H_
#define MANTID_LIVEDATA_KAFKAHISTOSTREAMDECODERTEST_H_
#include <cxxtest/TestSuite.h>
#include "KafkaTesting.h"
#include "MantidAPI/Run.h"
#include "MantidGeometry/Instrument.h"
#include "MantidHistogramData/FixedLengthVector.h"
#include "MantidHistogramData/HistogramX.h"
#include "MantidHistogramData/HistogramY.h"
#include "MantidKernel/ConfigService.h"
#include "MantidKernel/TimeSeriesProperty.h"
#include "MantidKernel/make_unique.h"
#include "MantidLiveData/Kafka/KafkaHistoStreamDecoder.h"
#include <Poco/Path.h>
using Mantid::LiveData::KafkaHistoStreamDecoder;
using namespace KafkaTesting;
/**
 * Unit tests for KafkaHistoStreamDecoder, driven by a mock Kafka broker and
 * fake stream subscribers from KafkaTesting.
 */
class KafkaHistoStreamDecoderTest : public CxxTest::TestSuite {
public:
  // This pair of boilerplate methods prevent the suite being created statically
  // This means the constructor isn't called when running other tests
  static KafkaHistoStreamDecoderTest *createSuite() {
    return new KafkaHistoStreamDecoderTest();
  }
  static void destroySuite(KafkaHistoStreamDecoderTest *suite) { delete suite; }

  void setUp() override {
    // Temporarily change the instrument directory to the testing one
    using Mantid::Kernel::ConfigService;
    auto &config = ConfigService::Instance();
    auto baseInstDir = config.getInstrumentDirectory();
    Poco::Path testFile =
        Poco::Path(baseInstDir)
            .resolve("IDFs_for_UNIT_TESTING/UnitTestFacilities.xml");
    // Load the test facilities file
    config.updateFacilities(testFile.toString());
    config.setFacility("TEST");
    // Update instrument search directory
    config.setString("instrumentDefinition.directory",
                     baseInstDir + "/IDFs_for_UNIT_TESTING");
  }

  void tearDown() override {
    using Mantid::Kernel::ConfigService;
    auto &config = ConfigService::Instance();
    config.reset();
    // Restore the main facilities file
    config.updateFacilities();
  }

  /// Decode one iteration of a fake histogram stream and check the workspace
  void test_Histo_Stream() {
    using namespace ::testing;
    using namespace KafkaTesting;
    using Mantid::API::Workspace_sptr;
    using Mantid::DataObjects::Workspace2D;
    using namespace Mantid::LiveData;

    auto mockBroker = std::make_shared<MockKafkaBroker>();
    EXPECT_CALL(*mockBroker, subscribe_(_, _))
        .Times(Exactly(3))
        .WillOnce(Return(new FakeHistoSubscriber()))
        .WillOnce(Return(new FakeRunInfoStreamSubscriber(1)))
        .WillOnce(Return(new FakeISISSpDetStreamSubscriber));
    auto decoder = createTestDecoder(mockBroker);
    TSM_ASSERT("Decoder should not have create data buffers yet",
               !decoder->hasData());
    startCapturing(*decoder, 1);

    // Checks
    Workspace_sptr workspace;
    TSM_ASSERT("Decoder's data buffers should be created now",
               decoder->hasData());
    TS_ASSERT_THROWS_NOTHING(workspace = decoder->extractData());
    TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
    TS_ASSERT(!decoder->isCapturing());

    // -- Workspace checks --
    TSM_ASSERT("Expected non-null workspace pointer from extractData()",
               workspace);
    auto histoWksp = boost::dynamic_pointer_cast<Workspace2D>(workspace);
    TSM_ASSERT(
        "Expected a Workspace2D from extractData(). Found something else",
        histoWksp);
    checkWorkspaceMetadata(*histoWksp);
    checkWorkspaceHistoData(*histoWksp);
  }

private:
  /// Create a decoder on the given broker with empty topic names
  std::unique_ptr<Mantid::LiveData::KafkaHistoStreamDecoder>
  createTestDecoder(std::shared_ptr<Mantid::LiveData::IKafkaBroker> broker) {
    using namespace Mantid::LiveData;
    return Mantid::Kernel::make_unique<KafkaHistoStreamDecoder>(broker, "", "",
                                                                "", "");
  }

  // Start decoding and wait until we have gathered enough data to test
  void startCapturing(Mantid::LiveData::KafkaHistoStreamDecoder &decoder,
                      uint8_t maxIterations) {
    // Register callback to know when a whole loop has been iterated through
    m_niterations = 0;
    auto callback = [this, maxIterations]() {
      this->iterationCallback(maxIterations);
    };
    decoder.registerIterationEndCb(callback);
    decoder.registerErrorCb(callback);
    TS_ASSERT_THROWS_NOTHING(decoder.startCapture());
    continueCapturing(decoder, maxIterations);
  }

  /// Count one finished iteration (or error) and wake the waiting test thread
  /// once the requested number has been reached
  void iterationCallback(uint8_t maxIterations) {
    std::unique_lock<std::mutex> lock(this->m_callbackMutex);
    this->m_niterations++;
    // Use >= rather than ==: the capture thread may complete more iterations
    // than requested before the test thread evaluates its wait predicate, and
    // with an exact comparison that waiter would never be released.
    if (this->m_niterations >= maxIterations) {
      lock.unlock();
      this->m_callbackCondition.notify_one();
    }
  }

  /// Block until at least maxIterations iterations have been counted
  void continueCapturing(Mantid::LiveData::KafkaHistoStreamDecoder &decoder,
                         uint8_t maxIterations) {
    // Re-register callback with the (potentially) new value of maxIterations
    auto callback = [this, maxIterations]() {
      this->iterationCallback(maxIterations);
    };
    decoder.registerIterationEndCb(callback);
    decoder.registerErrorCb(callback);
    {
      std::unique_lock<std::mutex> lk(m_callbackMutex);
      // Same >= comparison as in iterationCallback(), see the note there
      this->m_callbackCondition.wait(lk, [this, maxIterations]() {
        return this->m_niterations >= maxIterations;
      });
    }
  }

  /// Check instrument name, run start and the spectrum/detector mapping
  void
  checkWorkspaceMetadata(const Mantid::DataObjects::Workspace2D &histoWksp) {
    TS_ASSERT(histoWksp.getInstrument());
    TS_ASSERT_EQUALS("HRPDTEST", histoWksp.getInstrument()->getName());
    TS_ASSERT_EQUALS(
        "2016-08-31T12:07:42",
        histoWksp.run().getPropertyValueAsType<std::string>("run_start"));

    std::array<Mantid::specnum_t, 5> specs = {{1, 2, 3, 4, 5}};
    std::array<Mantid::detid_t, 5> ids = {{1001, 1002, 1100, 901000, 10100}};
    TS_ASSERT_EQUALS(specs.size(), histoWksp.getNumberHistograms());
    // A failed size assertion above does not stop the test, so also bound the
    // loop by the expectation arrays to avoid indexing past their end
    for (size_t i = 0; i < histoWksp.getNumberHistograms() && i < specs.size();
         ++i) {
      const auto &spec = histoWksp.getSpectrum(i);
      TS_ASSERT_EQUALS(specs[i], spec.getSpectrumNo());
      const auto &sid = spec.getDetectorIDs();
      TS_ASSERT_EQUALS(ids[i], *(sid.begin()));
    }
  }

  /// Check the bin edges of the first spectrum and the counts of all five
  void
  checkWorkspaceHistoData(const Mantid::DataObjects::Workspace2D &histoWksp) {
    // Inspect all 5 HRPDTEST Spectra
    auto data = histoWksp.histogram(0);
    // std::vector<double> xbins(data.x().cbegin(), data.x().cend());
    TS_ASSERT_EQUALS(data.x().rawData(), (std::vector<double>{0, 1, 2}));
    TS_ASSERT_EQUALS(data.y().rawData(), (std::vector<double>{100, 140}));
    data = histoWksp.histogram(1);
    TS_ASSERT_EQUALS(data.y().rawData(), (std::vector<double>{210, 100}));
    data = histoWksp.histogram(2);
    TS_ASSERT_EQUALS(data.y().rawData(), (std::vector<double>{110, 70}));
    data = histoWksp.histogram(3);
    TS_ASSERT_EQUALS(data.y().rawData(), (std::vector<double>{5, 3}));
    data = histoWksp.histogram(4);
    TS_ASSERT_EQUALS(data.y().rawData(), (std::vector<double>{20, 4}));
  }

private:
  /// Guards m_niterations
  std::mutex m_callbackMutex;
  /// Signalled when the requested number of iterations has been reached
  std::condition_variable m_callbackCondition;
  /// Number of iteration/error callbacks seen since startCapturing()
  uint8_t m_niterations = 0;
};
#endif /* MANTID_LIVEDATA_KAFKAHISTOSTREAMDECODERTEST_H_ */
\ No newline at end of file
This diff is collapsed.
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment