Newer
Older
#ifndef MANTID_LIVEDATA_ISISKAFKATESTING_H_
#define MANTID_LIVEDATA_ISISKAFKATESTING_H_
#include "MantidKernel/DateAndTimeHelpers.h"
#include "MantidKernel/WarningSuppressions.h"
#include "MantidLiveData/Kafka/IKafkaBroker.h"
#include "MantidLiveData/Kafka/IKafkaStreamSubscriber.h"
#include "MantidTypes/Core/DateAndTime.h"
#include "Kafka/private/Schema/ba57_run_info_generated.h"
#include "Kafka/private/Schema/df12_det_spec_map_generated.h"
#include "Kafka/private/Schema/ev42_events_generated.h"
#include "Kafka/private/Schema/is84_isis_events_generated.h"
#include "Kafka/private/Schema/f142_logdata_generated.h"
GCC_DIAG_ON(conversion)
#include <ctime>
namespace ISISKafkaTesting {
// -----------------------------------------------------------------------------
// Mock broker to inject fake subscribers
// -----------------------------------------------------------------------------
class MockKafkaBroker : public Mantid::LiveData::IKafkaBroker {
public:
using IKafkaStreamSubscriber_uptr =
std::unique_ptr<Mantid::LiveData::IKafkaStreamSubscriber>;
using IKafkaStreamSubscriber_ptr = Mantid::LiveData::IKafkaStreamSubscriber *;
GCC_DIAG_OFF_SUGGEST_OVERRIDE
// GMock cannot mock non-copyable return types so we resort to a small
// adapter method. Users have to use EXPECT_CALL(subscribe_) instead
MOCK_CONST_METHOD2(subscribe_, IKafkaStreamSubscriber_ptr(
std::vector<std::string>,
Mantid::LiveData::SubscribeAtOption));
IKafkaStreamSubscriber_uptr
subscribe(std::vector<std::string> s,
Mantid::LiveData::SubscribeAtOption option) const override {
return std::unique_ptr<Mantid::LiveData::IKafkaStreamSubscriber>(
MOCK_CONST_METHOD3(subscribe_, IKafkaStreamSubscriber_ptr(
std::vector<std::string>, int64_t,
Mantid::LiveData::SubscribeAtOption));
IKafkaStreamSubscriber_uptr
subscribe(std::vector<std::string> s, int64_t offset,
Mantid::LiveData::SubscribeAtOption option) const override {
return std::unique_ptr<Mantid::LiveData::IKafkaStreamSubscriber>(
GCC_DIAG_ON_SUGGEST_OVERRIDE
};
// -----------------------------------------------------------------------------
// Fake stream to raise error to tests
// -----------------------------------------------------------------------------
class FakeExceptionThrowingStreamSubscriber
: public Mantid::LiveData::IKafkaStreamSubscriber {
public:
void subscribe() override {}
void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
std::string &topic) override {
UNUSED_ARG(offset);
UNUSED_ARG(partition);
UNUSED_ARG(topic);
throw std::runtime_error("FakeExceptionThrowingStreamSubscriber");
}
std::unordered_map<std::string, std::vector<int64_t>>
getOffsetsForTimestamp(int64_t timestamp) override {
UNUSED_ARG(timestamp);
return {
std::pair<std::string, std::vector<int64_t>>("topic_name", {1, 2, 3})};
}
std::unordered_map<std::string, std::vector<int64_t>>
getCurrentOffsets() override {
std::unordered_map<std::string, std::vector<int64_t>> offsets;
return offsets;
}
void seek(const std::string &topic, uint32_t partition,
int64_t offset) override {
UNUSED_ARG(topic);
UNUSED_ARG(partition);
UNUSED_ARG(offset);
}
};
// -----------------------------------------------------------------------------
// Fake stream to provide empty stream to client
// -----------------------------------------------------------------------------
class FakeEmptyStreamSubscriber
: public Mantid::LiveData::IKafkaStreamSubscriber {
public:
void subscribe() override {}
void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
std::string &topic) override {
buffer->clear();
UNUSED_ARG(offset);
UNUSED_ARG(partition);
UNUSED_ARG(topic);
}
std::unordered_map<std::string, std::vector<int64_t>>
getOffsetsForTimestamp(int64_t timestamp) override {
UNUSED_ARG(timestamp);
return {
std::pair<std::string, std::vector<int64_t>>("topic_name", {1, 2, 3})};
}
std::unordered_map<std::string, std::vector<int64_t>>
getCurrentOffsets() override {
std::unordered_map<std::string, std::vector<int64_t>> offsets;
return offsets;
}
void seek(const std::string &topic, uint32_t partition,
int64_t offset) override {
UNUSED_ARG(topic);
UNUSED_ARG(partition);
UNUSED_ARG(offset);
}
void fakeReceiveAnEventMessage(std::string *buffer, int32_t nextPeriod) {
flatbuffers::FlatBufferBuilder builder;
std::vector<uint32_t> spec = {5, 4, 3, 2, 1, 2};
std::vector<uint32_t> tof = {11000, 10000, 9000, 8000, 7000, 6000};
uint64_t frameTime = 1;
float protonCharge(0.5f);
auto messageFlatbuf = CreateEventMessage(
builder, builder.CreateString("KafkaTesting"), 0, frameTime,
builder.CreateVector(tof), builder.CreateVector(spec),
FacilityData_ISISData,
CreateISISData(builder, static_cast<uint32_t>(nextPeriod),
RunState_RUNNING, protonCharge).Union());
FinishEventMessageBuffer(builder, messageFlatbuf);
// Copy to provided buffer
buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
builder.GetSize());
}
void fakeReceiveASampleEnvMessage(std::string *buffer) {
flatbuffers::FlatBufferBuilder builder;
// Sample environment log
auto logDataMessage =
CreateLogData(builder, builder.CreateString("fake source"), Value_Int,
CreateInt(builder, 42).Union(), 1495618188000000000L);
FinishLogDataBuffer(builder, logDataMessage);
// Copy to provided buffer
buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
builder.GetSize());
}
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
void fakeReceiveARunStartMessage(std::string *buffer, int32_t runNumber,
const std::string &startTime,
const std::string &instName,
int32_t nPeriods) {
// Convert date to time_t
auto mantidTime = Mantid::Types::Core::DateAndTime(startTime);
auto startTimestamp =
static_cast<uint64_t>(mantidTime.to_time_t() * 1000000000);
flatbuffers::FlatBufferBuilder builder;
auto runInfo = CreateRunInfo(
builder, InfoTypes_RunStart,
CreateRunStart(builder, startTimestamp, runNumber,
builder.CreateString(instName), nPeriods).Union());
FinishRunInfoBuffer(builder, runInfo);
// Copy to provided buffer
buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
builder.GetSize());
}
void fakeReceiveARunStopMessage(std::string *buffer,
const std::string &stopTime) {
// Convert date to time_t
auto mantidTime = Mantid::Types::Core::DateAndTime(stopTime);
auto stopTimestamp =
static_cast<uint64_t>(mantidTime.to_time_t() * 1000000000);
flatbuffers::FlatBufferBuilder builder;
auto runInfo = CreateRunInfo(builder, InfoTypes_RunStop,
CreateRunStop(builder, stopTimestamp).Union());
FinishRunInfoBuffer(builder, runInfo);
// Copy to provided buffer
buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
builder.GetSize());
}
// -----------------------------------------------------------------------------
// Fake ISIS event stream to provide event and sample environment data
// -----------------------------------------------------------------------------
class FakeISISEventSubscriber
: public Mantid::LiveData::IKafkaStreamSubscriber {
public:
explicit FakeISISEventSubscriber(int32_t nperiods)
: m_nperiods(nperiods), m_nextPeriod(0) {}
void subscribe() override {}
void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
void consumeMessage(std::string *message, int64_t &offset, int32_t &partition,
std::string &topic) override {
fakeReceiveAnEventMessage(message, m_nextPeriod);
m_nextPeriod = ((m_nextPeriod + 1) % m_nperiods);
UNUSED_ARG(offset);
UNUSED_ARG(partition);
UNUSED_ARG(topic);
}
std::unordered_map<std::string, std::vector<int64_t>>
getOffsetsForTimestamp(int64_t timestamp) override {
UNUSED_ARG(timestamp);
return {
std::pair<std::string, std::vector<int64_t>>("topic_name", {1, 2, 3})};
std::unordered_map<std::string, std::vector<int64_t>>
getCurrentOffsets() override {
std::unordered_map<std::string, std::vector<int64_t>> offsets;
return offsets;
}
void seek(const std::string &topic, uint32_t partition,
int64_t offset) override {
UNUSED_ARG(topic);
UNUSED_ARG(partition);
UNUSED_ARG(offset);
}
private:
const int32_t m_nperiods;
int32_t m_nextPeriod;
};
// -----------------------------------------------------------------------------
// Fake event stream to provide sample environment data
// -----------------------------------------------------------------------------
class FakeSampleEnvironmentSubscriber
: public Mantid::LiveData::IKafkaStreamSubscriber {
public:
void subscribe() override {}
void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
void consumeMessage(std::string *message, int64_t &offset, int32_t &partition,
std::string &topic) override {
assert(message);
fakeReceiveASampleEnvMessage(message);
UNUSED_ARG(offset);
UNUSED_ARG(partition);
UNUSED_ARG(topic);
}
std::unordered_map<std::string, std::vector<int64_t>>
getOffsetsForTimestamp(int64_t timestamp) override {
UNUSED_ARG(timestamp);
return {
std::pair<std::string, std::vector<int64_t>>("topic_name", {1, 2, 3})};
std::unordered_map<std::string, std::vector<int64_t>>
getCurrentOffsets() override {
std::unordered_map<std::string, std::vector<int64_t>> offsets;
return offsets;
}
void seek(const std::string &topic, uint32_t partition,
int64_t offset) override {
UNUSED_ARG(topic);
UNUSED_ARG(partition);
UNUSED_ARG(offset);
}
// -----------------------------------------------------------------------------
// -----------------------------------------------------------------------------
: public Mantid::LiveData::IKafkaStreamSubscriber {
public:
explicit FakeRunInfoStreamSubscriber(int32_t nperiods)
: m_nperiods(nperiods) {}
void subscribe() override {}
void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
std::string &topic) override {
fakeReceiveARunStartMessage(buffer, m_runNumber, m_startTime, m_instName,
m_nperiods);
UNUSED_ARG(offset);
UNUSED_ARG(partition);
UNUSED_ARG(topic);
}
std::unordered_map<std::string, std::vector<int64_t>>
getOffsetsForTimestamp(int64_t timestamp) override {
UNUSED_ARG(timestamp);
return {
std::pair<std::string, std::vector<int64_t>>("topic_name", {1, 2, 3})};
std::unordered_map<std::string, std::vector<int64_t>>
getCurrentOffsets() override {
std::unordered_map<std::string, std::vector<int64_t>> offsets;
return offsets;
}
void seek(const std::string &topic, uint32_t partition,
int64_t offset) override {
UNUSED_ARG(topic);
UNUSED_ARG(partition);
UNUSED_ARG(offset);
}
private:
std::string m_startTime = "2016-08-31T12:07:42";
int32_t m_runNumber = 1000;
std::string m_instName = "HRPDTEST";
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
// -----------------------------------------------------------------------------
// Fake data stream with run and event messages
// -----------------------------------------------------------------------------
class FakeDataStreamSubscriber
: public Mantid::LiveData::IKafkaStreamSubscriber {
public:
void subscribe() override {}
void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
std::string &topic) override {
assert(buffer);
// Return a messages in this order:
// Run start
// Event data
// Run stop
// Run start
// Event data... ad infinitum
switch (m_currentOffset) {
case 0:
fakeReceiveARunStartMessage(buffer, 1000, m_startTime, m_instName,
m_nperiods);
break;
case 2:
fakeReceiveARunStopMessage(buffer, m_stopTime);
break;
case 3:
fakeReceiveARunStartMessage(buffer, 1000, m_startTime, m_instName,
m_nperiods);
break;
default:
fakeReceiveAnEventMessage(buffer, 0);
}
topic = "topic_name";
offset = m_currentOffset;
partition = 0;
m_currentOffset++;
}
std::unordered_map<std::string, std::vector<int64_t>>
getOffsetsForTimestamp(int64_t timestamp) override {
UNUSED_ARG(timestamp);
return {std::pair<std::string, std::vector<int64_t>>(m_topicName,
{m_stopOffset})};
}
std::unordered_map<std::string, std::vector<int64_t>>
getCurrentOffsets() override {
return {std::pair<std::string, std::vector<int64_t>>(m_topicName,
{m_currentOffset})};
}
void seek(const std::string &topic, uint32_t partition,
int64_t offset) override {
UNUSED_ARG(topic);
UNUSED_ARG(partition);
UNUSED_ARG(offset);
}
private:
const std::string m_topicName = "topic_name";
uint32_t m_currentOffset = 0;
std::string m_startTime = "2016-08-31T12:07:42";
std::string m_stopTime = "2016-08-31T12:07:52";
const std::string m_instName = "HRPDTEST";
int32_t m_nperiods = 1;
int64_t m_stopOffset = 1;
};
// -----------------------------------------------------------------------------
// Fake ISIS spectra-detector stream
// -----------------------------------------------------------------------------
class FakeISISSpDetStreamSubscriber
: public Mantid::LiveData::IKafkaStreamSubscriber {
public:
void subscribe() override {}
void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
std::string &topic) override {
assert(buffer);
// Serialize data with flatbuffers
flatbuffers::FlatBufferBuilder builder;
auto specVector = builder.CreateVector(m_spec);
auto detIdsVector = builder.CreateVector(m_detid);
builder, specVector, detIdsVector, static_cast<int32_t>(m_spec.size()));
FinishSpectraDetectorMappingBuffer(builder, spdet);
// Copy to provided buffer
buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
builder.GetSize());
UNUSED_ARG(offset);
UNUSED_ARG(partition);
UNUSED_ARG(topic);
}
std::unordered_map<std::string, std::vector<int64_t>>
getOffsetsForTimestamp(int64_t timestamp) override {
UNUSED_ARG(timestamp);
return {
std::pair<std::string, std::vector<int64_t>>("topic_name", {1, 2, 3})};
std::unordered_map<std::string, std::vector<int64_t>>
getCurrentOffsets() override {
std::unordered_map<std::string, std::vector<int64_t>> offsets;
return offsets;
}
void seek(const std::string &topic, uint32_t partition,
int64_t offset) override {
UNUSED_ARG(topic);
UNUSED_ARG(partition);
UNUSED_ARG(offset);
}
private:
std::vector<int32_t> m_spec = {1, 2, 3, 4, 5};
// These match the detector numbers in HRPDTEST_Definition.xml
std::vector<int32_t> m_detid = {1001, 1002, 1100, 901000, 10100};
#endif // MANTID_LIVEDATA_ISISKAFKAEVENTSTREAMDECODERTESTMOCKS_H_