Skip to content
Snippets Groups Projects
Unverified Commit 90f9f9e8 authored by Simon Heybrock's avatar Simon Heybrock Committed by GitHub
Browse files

Merge pull request #21655 from...

Merge pull request #21655 from mantidproject/21480_remove_dependence_of_KafkaEventListener_on_ISIS_specific_data

Remove dependence of Kafka Live Listener on ISIS specific event data
parents 4ab9b990 5d658b61
No related branches found
No related tags found
No related merge requests found
...@@ -507,22 +507,21 @@ void KafkaEventStreamDecoder::eventDataFromMessage(const std::string &buffer) { ...@@ -507,22 +507,21 @@ void KafkaEventStreamDecoder::eventDataFromMessage(const std::string &buffer) {
const auto &detData = *(eventMsg->detector_id()); const auto &detData = *(eventMsg->detector_id());
auto nEvents = tofData.size(); auto nEvents = tofData.size();
if (eventMsg->facility_specific_data_type() != FacilityData_ISISData) { DataObjects::EventWorkspace_sptr periodBuffer;
throw std::runtime_error("KafkaEventStreamDecoder only knows how to "
"deal with ISIS facility specific data");
}
auto ISISMsg =
static_cast<const ISISData *>(eventMsg->facility_specific_data());
std::lock_guard<std::mutex> lock(m_mutex); std::lock_guard<std::mutex> lock(m_mutex);
auto &periodBuffer = if (eventMsg->facility_specific_data_type() == FacilityData_ISISData) {
*m_localEvents[static_cast<size_t>(ISISMsg->period_number())]; auto ISISMsg =
auto &mutableRunInfo = periodBuffer.mutableRun(); static_cast<const ISISData *>(eventMsg->facility_specific_data());
mutableRunInfo.getTimeSeriesProperty<double>(PROTON_CHARGE_PROPERTY) periodBuffer = m_localEvents[static_cast<size_t>(ISISMsg->period_number())];
->addValue(pulseTime, ISISMsg->proton_charge()); auto &mutableRunInfo = periodBuffer->mutableRun();
mutableRunInfo.getTimeSeriesProperty<double>(PROTON_CHARGE_PROPERTY)
->addValue(pulseTime, ISISMsg->proton_charge());
} else {
periodBuffer = m_localEvents[0];
}
for (decltype(nEvents) i = 0; i < nEvents; ++i) { for (decltype(nEvents) i = 0; i < nEvents; ++i) {
auto &spectrum = auto &spectrum = periodBuffer->getSpectrum(
periodBuffer.getSpectrum(m_specToIdx[static_cast<int32_t>(detData[i])]); m_specToIdx[static_cast<int32_t>(detData[i])]);
spectrum.addEventQuickly(TofEvent(static_cast<double>(tofData[i]) * spectrum.addEventQuickly(TofEvent(static_cast<double>(tofData[i]) *
1e-3, // nanoseconds to microseconds 1e-3, // nanoseconds to microseconds
pulseTime)); pulseTime));
......
...@@ -54,7 +54,7 @@ public: ...@@ -54,7 +54,7 @@ public:
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
void test_Single_Period_Event_Stream() { void test_Single_Period_Event_Stream() {
using namespace ::testing; using namespace ::testing;
using namespace ISISKafkaTesting; using namespace KafkaTesting;
using Mantid::API::Workspace_sptr; using Mantid::API::Workspace_sptr;
using Mantid::DataObjects::EventWorkspace; using Mantid::DataObjects::EventWorkspace;
using namespace Mantid::LiveData; using namespace Mantid::LiveData;
...@@ -92,7 +92,7 @@ public: ...@@ -92,7 +92,7 @@ public:
void test_Multiple_Period_Event_Stream() { void test_Multiple_Period_Event_Stream() {
using namespace ::testing; using namespace ::testing;
using namespace ISISKafkaTesting; using namespace KafkaTesting;
using Mantid::API::Workspace_sptr; using Mantid::API::Workspace_sptr;
using Mantid::API::WorkspaceGroup; using Mantid::API::WorkspaceGroup;
using Mantid::DataObjects::EventWorkspace; using Mantid::DataObjects::EventWorkspace;
...@@ -134,7 +134,7 @@ public: ...@@ -134,7 +134,7 @@ public:
void test_End_Of_Run_Reported_After_Run_Stop_Reached() { void test_End_Of_Run_Reported_After_Run_Stop_Reached() {
using namespace ::testing; using namespace ::testing;
using namespace ISISKafkaTesting; using namespace KafkaTesting;
using Mantid::API::Workspace_sptr; using Mantid::API::Workspace_sptr;
using Mantid::DataObjects::EventWorkspace; using Mantid::DataObjects::EventWorkspace;
using namespace Mantid::LiveData; using namespace Mantid::LiveData;
...@@ -173,7 +173,7 @@ public: ...@@ -173,7 +173,7 @@ public:
void void
test_Get_All_Run_Events_When_Run_Stop_Message_Received_Before_Last_Event_Message() { test_Get_All_Run_Events_When_Run_Stop_Message_Received_Before_Last_Event_Message() {
using namespace ::testing; using namespace ::testing;
using namespace ISISKafkaTesting; using namespace KafkaTesting;
using Mantid::API::Workspace_sptr; using Mantid::API::Workspace_sptr;
using Mantid::DataObjects::EventWorkspace; using Mantid::DataObjects::EventWorkspace;
using namespace Mantid::LiveData; using namespace Mantid::LiveData;
...@@ -211,7 +211,7 @@ public: ...@@ -211,7 +211,7 @@ public:
void test_Sample_Log_From_Event_Stream() { void test_Sample_Log_From_Event_Stream() {
using namespace ::testing; using namespace ::testing;
using namespace ISISKafkaTesting; using namespace KafkaTesting;
using Mantid::API::Workspace_sptr; using Mantid::API::Workspace_sptr;
using Mantid::DataObjects::EventWorkspace; using Mantid::DataObjects::EventWorkspace;
using namespace Mantid::LiveData; using namespace Mantid::LiveData;
...@@ -244,7 +244,7 @@ public: ...@@ -244,7 +244,7 @@ public:
void test_Empty_Event_Stream_Waits() { void test_Empty_Event_Stream_Waits() {
using namespace ::testing; using namespace ::testing;
using namespace ISISKafkaTesting; using namespace KafkaTesting;
auto mockBroker = std::make_shared<MockKafkaBroker>(); auto mockBroker = std::make_shared<MockKafkaBroker>();
EXPECT_CALL(*mockBroker, subscribe_(_, _)) EXPECT_CALL(*mockBroker, subscribe_(_, _))
...@@ -260,12 +260,44 @@ public: ...@@ -260,12 +260,44 @@ public:
TS_ASSERT(!decoder->isCapturing()); TS_ASSERT(!decoder->isCapturing());
} }
void
test_No_Exception_When_Event_Message_Without_Facility_Data_Is_Processed() {
using namespace ::testing;
using namespace KafkaTesting;
using Mantid::API::Workspace_sptr;
using Mantid::DataObjects::EventWorkspace;
auto mockBroker = std::make_shared<MockKafkaBroker>();
EXPECT_CALL(*mockBroker, subscribe_(_, _))
.Times(Exactly(3))
.WillOnce(Return(new FakeEventSubscriber))
.WillOnce(Return(new FakeRunInfoStreamSubscriber(1)))
.WillOnce(Return(new FakeISISSpDetStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
startCapturing(*decoder, 2);
Workspace_sptr workspace;
TS_ASSERT_THROWS_NOTHING(workspace = decoder->extractData());
TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
TS_ASSERT(!decoder->isCapturing());
// Check we did process the event message and extract the events
TSM_ASSERT("Expected non-null workspace pointer from extractData()",
workspace);
auto eventWksp = boost::dynamic_pointer_cast<EventWorkspace>(workspace);
TSM_ASSERT(
"Expected an EventWorkspace from extractData(). Found something else",
eventWksp);
TSM_ASSERT_EQUALS("Expected 3 events from the event message", 3,
eventWksp->getNumberEvents());
}
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
// Failure tests // Failure tests
//---------------------------------------------------------------------------- //----------------------------------------------------------------------------
void test_Error_In_Stream_Extraction_Throws_Error_On_ExtractData() { void test_Error_In_Stream_Extraction_Throws_Error_On_ExtractData() {
using namespace ::testing; using namespace ::testing;
using namespace ISISKafkaTesting; using namespace KafkaTesting;
auto mockBroker = std::make_shared<MockKafkaBroker>(); auto mockBroker = std::make_shared<MockKafkaBroker>();
EXPECT_CALL(*mockBroker, subscribe_(_, _)) EXPECT_CALL(*mockBroker, subscribe_(_, _))
...@@ -283,7 +315,7 @@ public: ...@@ -283,7 +315,7 @@ public:
void test_Empty_SpDet_Stream_Throws_Error_On_ExtractData() { void test_Empty_SpDet_Stream_Throws_Error_On_ExtractData() {
using namespace ::testing; using namespace ::testing;
using namespace ISISKafkaTesting; using namespace KafkaTesting;
auto mockBroker = std::make_shared<MockKafkaBroker>(); auto mockBroker = std::make_shared<MockKafkaBroker>();
EXPECT_CALL(*mockBroker, subscribe_(_, _)) EXPECT_CALL(*mockBroker, subscribe_(_, _))
...@@ -301,7 +333,7 @@ public: ...@@ -301,7 +333,7 @@ public:
void test_Empty_RunInfo_Stream_Throws_Error_On_ExtractData() { void test_Empty_RunInfo_Stream_Throws_Error_On_ExtractData() {
using namespace ::testing; using namespace ::testing;
using namespace ISISKafkaTesting; using namespace KafkaTesting;
auto mockBroker = std::make_shared<MockKafkaBroker>(); auto mockBroker = std::make_shared<MockKafkaBroker>();
EXPECT_CALL(*mockBroker, subscribe_(_, _)) EXPECT_CALL(*mockBroker, subscribe_(_, _))
......
...@@ -18,7 +18,7 @@ GCC_DIAG_ON(conversion) ...@@ -18,7 +18,7 @@ GCC_DIAG_ON(conversion)
#include <ctime> #include <ctime>
namespace ISISKafkaTesting { namespace KafkaTesting {
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Mock broker to inject fake subscribers // Mock broker to inject fake subscribers
...@@ -122,7 +122,7 @@ public: ...@@ -122,7 +122,7 @@ public:
} }
}; };
void fakeReceiveAnEventMessage(std::string *buffer, int32_t nextPeriod) { void fakeReceiveAnISISEventMessage(std::string *buffer, int32_t nextPeriod) {
flatbuffers::FlatBufferBuilder builder; flatbuffers::FlatBufferBuilder builder;
std::vector<uint32_t> spec = {5, 4, 3, 2, 1, 2}; std::vector<uint32_t> spec = {5, 4, 3, 2, 1, 2};
std::vector<uint32_t> tof = {11000, 10000, 9000, 8000, 7000, 6000}; std::vector<uint32_t> tof = {11000, 10000, 9000, 8000, 7000, 6000};
...@@ -143,6 +143,22 @@ void fakeReceiveAnEventMessage(std::string *buffer, int32_t nextPeriod) { ...@@ -143,6 +143,22 @@ void fakeReceiveAnEventMessage(std::string *buffer, int32_t nextPeriod) {
builder.GetSize()); builder.GetSize());
} }
void fakeReceiveAnEventMessage(std::string *buffer) {
flatbuffers::FlatBufferBuilder builder;
std::vector<uint32_t> spec = {5, 4, 3};
std::vector<uint32_t> tof = {11000, 10000, 9000};
uint64_t frameTime = 1;
auto messageFlatbuf = CreateEventMessage(
builder, builder.CreateString("KafkaTesting"), 0, frameTime,
builder.CreateVector(tof), builder.CreateVector(spec));
FinishEventMessageBuffer(builder, messageFlatbuf);
// Copy to provided buffer
buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
builder.GetSize());
}
void fakeReceiveASampleEnvMessage(std::string *buffer) { void fakeReceiveASampleEnvMessage(std::string *buffer) {
flatbuffers::FlatBufferBuilder builder; flatbuffers::FlatBufferBuilder builder;
// Sample environment log // Sample environment log
...@@ -206,7 +222,7 @@ public: ...@@ -206,7 +222,7 @@ public:
std::string &topic) override { std::string &topic) override {
assert(message); assert(message);
fakeReceiveAnEventMessage(message, m_nextPeriod); fakeReceiveAnISISEventMessage(message, m_nextPeriod);
m_nextPeriod = ((m_nextPeriod + 1) % m_nperiods); m_nextPeriod = ((m_nextPeriod + 1) % m_nperiods);
UNUSED_ARG(offset); UNUSED_ARG(offset);
...@@ -236,6 +252,58 @@ private: ...@@ -236,6 +252,58 @@ private:
int32_t m_nextPeriod; int32_t m_nextPeriod;
}; };
// ---------------------------------------------------------------------------------------
// Fake non-institution-specific event stream to provide event and sample
// environment data
// ---------------------------------------------------------------------------------------
class FakeEventSubscriber : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
void subscribe() override {}
void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
void consumeMessage(std::string *message, int64_t &offset, int32_t &partition,
std::string &topic) override {
assert(message);
switch (m_nextOffset) {
case 0:
fakeReceiveARunStartMessage(message, 1000, "2016-08-31T12:07:42",
"HRPDTEST", 1);
break;
case 2:
fakeReceiveARunStopMessage(message, m_stopTime);
break;
default:
fakeReceiveAnEventMessage(message);
}
m_nextOffset++;
UNUSED_ARG(offset);
UNUSED_ARG(partition);
UNUSED_ARG(topic);
}
std::unordered_map<std::string, std::vector<int64_t>>
getOffsetsForTimestamp(int64_t timestamp) override {
UNUSED_ARG(timestamp);
return {std::pair<std::string, std::vector<int64_t>>(m_topicName, {1})};
}
std::unordered_map<std::string, std::vector<int64_t>>
getCurrentOffsets() override {
std::unordered_map<std::string, std::vector<int64_t>> offsets;
return {std::pair<std::string, std::vector<int64_t>>(m_topicName, {1})};
}
void seek(const std::string &topic, uint32_t partition,
int64_t offset) override {
UNUSED_ARG(topic);
UNUSED_ARG(partition);
UNUSED_ARG(offset);
}
private:
std::string m_topicName = "topic_name";
int m_nextOffset = 0;
std::string m_stopTime = "2016-08-31T12:07:52";
};
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
// Fake event stream to provide sample environment data // Fake event stream to provide sample environment data
// ----------------------------------------------------------------------------- // -----------------------------------------------------------------------------
...@@ -354,7 +422,7 @@ public: ...@@ -354,7 +422,7 @@ public:
m_nperiods); m_nperiods);
break; break;
default: default:
fakeReceiveAnEventMessage(buffer, 0); fakeReceiveAnISISEventMessage(buffer, 0);
} }
topic = "topic_name"; topic = "topic_name";
offset = m_nextOffset; offset = m_nextOffset;
...@@ -441,6 +509,6 @@ private: ...@@ -441,6 +509,6 @@ private:
// These match the detector numbers in HRPDTEST_Definition.xml // These match the detector numbers in HRPDTEST_Definition.xml
std::vector<int32_t> m_detid = {1001, 1002, 1100, 901000, 10100}; std::vector<int32_t> m_detid = {1001, 1002, 1100, 901000, 10100};
}; };
} // namespace ISISKafkaTesting } // namespace KafkaTesting
#endif // MANTID_LIVEDATA_ISISKAFKAEVENTSTREAMDECODERTESTMOCKS_H_ #endif // MANTID_LIVEDATA_ISISKAFKAEVENTSTREAMDECODERTESTMOCKS_H_
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment