Skip to content
Snippets Groups Projects
Commit 0492640c authored by Lamar Moore's avatar Lamar Moore
Browse files

Review changes

parent fcc35cce
No related merge requests found
......@@ -20,9 +20,9 @@ namespace LiveData {
class KafkaHistoStreamDecoder;
/**
Implementation of a live listener to consume messages from the Kafka system
at ISIS. It currently parses the events directly using flatbuffers so will
need updating if the schema changes.
Implementation of a live listener to consume messages which are in a histogram
format from the Kafka system at ISIS. It currently parses the histogram data
directly using flatbuffers so will need updating if the schema changes.
*/
class DLLExport KafkaHistoListener : public API::LiveListener {
public:
......
......@@ -8,8 +8,8 @@
#define MANTID_LIVEDATA_ISISKAFKAHISTOSTREAMDECODER_H_
#include "MantidDataObjects/Workspace2D.h"
#include "MantidLiveData/Kafka/IKafkaBroker.h"
#include "MantidLiveData/Kafka/IKafkaStreamSubscriber.h"
#include "MantidLiveData/Kafka/KafkaBroker.h"
#include <atomic>
#include <mutex>
......@@ -26,7 +26,7 @@ namespace LiveData {
*/
class DLLExport KafkaHistoStreamDecoder {
public:
KafkaHistoStreamDecoder(std::shared_ptr<IKafkaBroker> broker,
KafkaHistoStreamDecoder(const std::string &brokerAddress,
const std::string &histoTopic,
const std::string &instrumentName);
~KafkaHistoStreamDecoder();
......@@ -37,7 +37,7 @@ public:
///@name Start/stop
///@{
void startCapture(bool startNow = true);
void stopCapture();
void stopCapture() noexcept;
///@}
///@name Querying
......@@ -61,7 +61,7 @@ private:
DataObjects::Workspace2D_sptr createBufferWorkspace();
/// Broker to use to subscribe to topics
std::shared_ptr<IKafkaBroker> m_broker;
KafkaBroker m_broker;
/// Topic name
const std::string m_histoTopic;
/// Instrument name
......@@ -82,7 +82,7 @@ private:
/// Flag indicating that the decoder is capturing
std::atomic<bool> m_capturing;
/// Exception object indicating there was an error
boost::shared_ptr<std::runtime_error> m_exception;
std::unique_ptr<std::runtime_error> m_exception;
};
} // namespace LiveData
......
......@@ -27,13 +27,12 @@ KafkaHistoListener::KafkaHistoListener() {
/// @copydoc ILiveListener::connect
bool KafkaHistoListener::connect(const Poco::Net::SocketAddress &address) {
auto broker = std::make_shared<KafkaBroker>(address.toString());
try {
std::string instrumentName = getProperty("InstrumentName");
const std::string histoTopic(instrumentName +
KafkaTopicSubscriber::HISTO_TOPIC_SUFFIX);
m_decoder = Kernel::make_unique<KafkaHistoStreamDecoder>(broker, histoTopic,
instrumentName);
m_decoder = Kernel::make_unique<KafkaHistoStreamDecoder>(
address.toString(), histoTopic, instrumentName);
} catch (std::exception &exc) {
g_log.error() << "KafkaHistoListener::connect - Connection Error: "
<< exc.what() << "\n";
......@@ -75,8 +74,8 @@ bool KafkaHistoListener::isConnected() {
/// @copydoc ILiveListener::runStatus
API::ILiveListener::RunStatus KafkaHistoListener::runStatus() {
if (!m_decoder) {
g_log.error("KafkaHistoListener::runStatus(): Kafka is not connected");
throw Kernel::Exception::InternetError("Kafka is not connected");
g_log.warning("KafkaHistoListener::runStatus(): Kafka is not connected");
return NoRun;
}
return m_decoder->hasReachedEndOfRun() ? EndRun : Running;
......
......@@ -35,18 +35,26 @@ namespace LiveData {
/**
* Constructor
* @param broker A reference to a Broker object for creating topic streams
* @param brokerAddress The physical ipAddress of the broker
* @param histoTopic The name of the topic streaming the histo data
* @param spDetTopic The name of the topic streaming the spectrum-detector
* run mapping
*/
KafkaHistoStreamDecoder::KafkaHistoStreamDecoder(
std::shared_ptr<IKafkaBroker> broker, const std::string &histoTopic,
const std::string &brokerAddress, const std::string &histoTopic,
const std::string &instrumentName)
: m_broker(broker), m_histoTopic(histoTopic),
: m_broker(brokerAddress), m_histoTopic(histoTopic),
m_instrumentName(instrumentName), m_histoStream(), m_workspace(),
m_buffer(), m_thread(), m_interrupt(false), m_capturing(false),
m_exception() {
m_exception(nullptr) {
if (histoTopic.empty())
throw std::invalid_argument(
"KafkaHistoStreamDecoder::KafkaHistoStreamDecorder "
": histogramTopic cannot be an empty string.");
if (instrumentName.empty())
throw std::invalid_argument(
"KafkaHistoStreamDecoder::KafkaHistoStreamDecorder "
": instrumentName cannot be an empty string.");
// Initialize buffer workspace
m_workspace = createBufferWorkspace();
}
......@@ -63,8 +71,7 @@ KafkaHistoStreamDecoder::~KafkaHistoStreamDecoder() { stopCapture(); }
*/
void KafkaHistoStreamDecoder::startCapture(bool) {
g_log.debug() << "Starting capture on topic: " << m_histoTopic << "\n";
m_histoStream =
m_broker->subscribe({m_histoTopic}, SubscribeAtOption::LATEST);
m_histoStream = m_broker.subscribe({m_histoTopic}, SubscribeAtOption::LATEST);
m_thread = std::thread([this]() { this->captureImpl(); });
m_thread.detach();
......@@ -74,7 +81,7 @@ void KafkaHistoStreamDecoder::startCapture(bool) {
* Stop capturing from the stream. This is a blocking call until the capturing
* function has completed
*/
void KafkaHistoStreamDecoder::stopCapture() {
void KafkaHistoStreamDecoder::stopCapture() noexcept {
g_log.debug() << "Stopping capture\n";
// This will interrupt the "event" loop
......@@ -126,9 +133,10 @@ API::Workspace_sptr KafkaHistoStreamDecoder::extractDataImpl() {
throw Exception::NotYet("No message to process yet.");
}
auto histoMsg = GetEventHistogram(m_buffer.c_str());
// Retrieve flatbuffer struct describing histogram
const auto *histoMsg = GetEventHistogram(m_buffer.c_str());
auto shape = histoMsg->current_shape();
const auto *shape = histoMsg->current_shape();
auto nbins = shape->Get(0) - 1;
auto nspectra = static_cast<size_t>(shape->Get(1));
......@@ -171,10 +179,10 @@ void KafkaHistoStreamDecoder::captureImpl() {
try {
captureImplExcept();
} catch (std::exception &exc) {
m_exception = boost::make_shared<std::runtime_error>(exc.what());
m_exception.reset(new std::runtime_error(exc.what()));
} catch (...) {
m_exception = boost::make_shared<std::runtime_error>(
"KafkaEventStreamDecoder: Unknown exception type caught.");
m_exception.reset(new std::runtime_error(
"KafkaEventStreamDecoder: Unknown exception type caught."));
}
m_capturing = false;
}
......@@ -222,6 +230,7 @@ DataObjects::Workspace2D_sptr KafkaHistoStreamDecoder::createBufferWorkspace() {
alg->setChild(true);
alg->initialize();
alg->setPropertyValue("InstrumentName", m_instrumentName);
// Dummy workspace value "ws" as not placed in ADS
alg->setPropertyValue("OutputWorkspace", "ws");
alg->execute();
workspace = alg->getProperty("OutputWorkspace");
......@@ -231,6 +240,7 @@ DataObjects::Workspace2D_sptr KafkaHistoStreamDecoder::createBufferWorkspace() {
throw;
}
// OutputWorkspace type is Worspace2D in algorithm see LoadEmptyInstrument.cpp
return boost::dynamic_pointer_cast<DataObjects::Workspace2D>(workspace);
}
......
// automatically generated by the FlatBuffers compiler, do not modify
#ifndef FLATBUFFERS_GENERATED_AI33DETCOUNTIMGS_H_
#define FLATBUFFERS_GENERATED_AI33DETCOUNTIMGS_H_
#include "flatbuffers/flatbuffers.h"
struct AccumulatedImage;
struct AccumulatedImage FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
enum {
VT_FIRST_PULSE_TIME = 4,
VT_PULSE_TIME = 6,
VT_DETECTOR_ID = 8,
VT_DETECTION_COUNT = 10
};
uint64_t first_pulse_time() const {
return GetField<uint64_t>(VT_FIRST_PULSE_TIME, 0);
}
uint64_t pulse_time() const { return GetField<uint64_t>(VT_PULSE_TIME, 0); }
const flatbuffers::Vector<uint32_t> *detector_id() const {
return GetPointer<const flatbuffers::Vector<uint32_t> *>(VT_DETECTOR_ID);
}
const flatbuffers::Vector<uint32_t> *detection_count() const {
return GetPointer<const flatbuffers::Vector<uint32_t> *>(
VT_DETECTION_COUNT);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<uint64_t>(verifier, VT_FIRST_PULSE_TIME) &&
VerifyField<uint64_t>(verifier, VT_PULSE_TIME) &&
VerifyField<flatbuffers::uoffset_t>(verifier, VT_DETECTOR_ID) &&
verifier.Verify(detector_id()) &&
VerifyField<flatbuffers::uoffset_t>(verifier, VT_DETECTION_COUNT) &&
verifier.Verify(detection_count()) && verifier.EndTable();
}
};
struct AccumulatedImageBuilder {
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_first_pulse_time(uint64_t first_pulse_time) {
fbb_.AddElement<uint64_t>(AccumulatedImage::VT_FIRST_PULSE_TIME,
first_pulse_time, 0);
}
void add_pulse_time(uint64_t pulse_time) {
fbb_.AddElement<uint64_t>(AccumulatedImage::VT_PULSE_TIME, pulse_time, 0);
}
void add_detector_id(
flatbuffers::Offset<flatbuffers::Vector<uint32_t>> detector_id) {
fbb_.AddOffset(AccumulatedImage::VT_DETECTOR_ID, detector_id);
}
void add_detection_count(
flatbuffers::Offset<flatbuffers::Vector<uint32_t>> detection_count) {
fbb_.AddOffset(AccumulatedImage::VT_DETECTION_COUNT, detection_count);
}
AccumulatedImageBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) {
start_ = fbb_.StartTable();
}
AccumulatedImageBuilder &operator=(const AccumulatedImageBuilder &);
flatbuffers::Offset<AccumulatedImage> Finish() {
const auto end = fbb_.EndTable(start_, 4);
auto o = flatbuffers::Offset<AccumulatedImage>(end);
return o;
}
};
inline flatbuffers::Offset<AccumulatedImage> CreateAccumulatedImage(
flatbuffers::FlatBufferBuilder &_fbb, uint64_t first_pulse_time = 0,
uint64_t pulse_time = 0,
flatbuffers::Offset<flatbuffers::Vector<uint32_t>> detector_id = 0,
flatbuffers::Offset<flatbuffers::Vector<uint32_t>> detection_count = 0) {
AccumulatedImageBuilder builder_(_fbb);
builder_.add_pulse_time(pulse_time);
builder_.add_first_pulse_time(first_pulse_time);
builder_.add_detection_count(detection_count);
builder_.add_detector_id(detector_id);
return builder_.Finish();
}
inline flatbuffers::Offset<AccumulatedImage> CreateAccumulatedImageDirect(
flatbuffers::FlatBufferBuilder &_fbb, uint64_t first_pulse_time = 0,
uint64_t pulse_time = 0, const std::vector<uint32_t> *detector_id = nullptr,
const std::vector<uint32_t> *detection_count = nullptr) {
return CreateAccumulatedImage(
_fbb, first_pulse_time, pulse_time,
detector_id ? _fbb.CreateVector<uint32_t>(*detector_id) : 0,
detection_count ? _fbb.CreateVector<uint32_t>(*detection_count) : 0);
}
inline const AccumulatedImage *GetAccumulatedImage(const void *buf) {
return flatbuffers::GetRoot<AccumulatedImage>(buf);
}
inline const char *AccumulatedImageIdentifier() { return "ai33"; }
inline bool AccumulatedImageBufferHasIdentifier(const void *buf) {
return flatbuffers::BufferHasIdentifier(buf, AccumulatedImageIdentifier());
}
inline bool VerifyAccumulatedImageBuffer(flatbuffers::Verifier &verifier) {
return verifier.VerifyBuffer<AccumulatedImage>(AccumulatedImageIdentifier());
}
inline void
FinishAccumulatedImageBuffer(flatbuffers::FlatBufferBuilder &fbb,
flatbuffers::Offset<AccumulatedImage> root) {
fbb.Finish(root, AccumulatedImageIdentifier());
}
#endif // FLATBUFFERS_GENERATED_AI33DETCOUNTIMGS_H_
// clang-format off
// automatically generated by the FlatBuffers compiler, do not modify
// Original schema can be found at:
// https://github.com/ess-dmsc/streaming-data-types/tree/master/schemas/hs00_event_histogram.fbs
#ifndef FLATBUFFERS_GENERATED_HS00EVENTHISTOGRAM_H_
#define FLATBUFFERS_GENERATED_HS00EVENTHISTOGRAM_H_
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment