-
Lamar Moore authoredLamar Moore authored
Code owners
Assign users and groups as approvers for specific file changes. Learn more.
KafkaTesting.h 25.92 KiB
// Mantid Repository : https://github.com/mantidproject/mantid
//
// Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
// NScD Oak Ridge National Laboratory, European Spallation Source
// & Institut Laue - Langevin
// SPDX - License - Identifier: GPL - 3.0 +
#ifndef MANTID_LIVEDATA_ISISKAFKATESTING_H_
#define MANTID_LIVEDATA_ISISKAFKATESTING_H_
#include "MantidKernel/DateAndTimeHelpers.h"
#include "MantidKernel/WarningSuppressions.h"
#include "MantidLiveData/Kafka/IKafkaBroker.h"
#include "MantidLiveData/Kafka/IKafkaStreamSubscriber.h"
#include "MantidTypes/Core/DateAndTime.h"
#include <gmock/gmock.h>
GNU_DIAG_OFF("conversion")
#include "Kafka/private/Schema/ba57_run_info_generated.h"
#include "Kafka/private/Schema/df12_det_spec_map_generated.h"
#include "Kafka/private/Schema/ev42_events_generated.h"
#include "Kafka/private/Schema/f142_logdata_generated.h"
#include "Kafka/private/Schema/hs00_event_histogram_generated.h"
#include "Kafka/private/Schema/is84_isis_events_generated.h"
GNU_DIAG_ON("conversion")
#include <ctime>
namespace KafkaTesting {
// -----------------------------------------------------------------------------
// Mock broker to inject fake subscribers
// -----------------------------------------------------------------------------
// GMock-based IKafkaBroker used to hand test-controlled subscribers to the
// code under test.
//
// GMock cannot mock a method whose return type is non-copyable, so the mocked
// methods are the raw-pointer returning subscribe_() overloads; tests must set
// expectations with EXPECT_CALL(broker, subscribe_(...)). The subscribe()
// overrides forward to subscribe_() and wrap the returned raw pointer in a
// std::unique_ptr, which takes ownership of it.
class MockKafkaBroker : public Mantid::LiveData::IKafkaBroker {
public:
  using IKafkaStreamSubscriber_uptr =
      std::unique_ptr<Mantid::LiveData::IKafkaStreamSubscriber>;
  using IKafkaStreamSubscriber_ptr = Mantid::LiveData::IKafkaStreamSubscriber *;

  GNU_DIAG_OFF_SUGGEST_OVERRIDE
  // Mocked hook behind subscribe(topics, option).
  MOCK_CONST_METHOD2(subscribe_, IKafkaStreamSubscriber_ptr(
                                     std::vector<std::string>,
                                     Mantid::LiveData::SubscribeAtOption));
  // Adapter: forwards to the two-argument subscribe_() and owns its result.
  IKafkaStreamSubscriber_uptr
  subscribe(std::vector<std::string> s,
            Mantid::LiveData::SubscribeAtOption option) const override {
    IKafkaStreamSubscriber_ptr rawSubscriber = this->subscribe_(s, option);
    return IKafkaStreamSubscriber_uptr(rawSubscriber);
  }

  // Mocked hook behind subscribe(topics, offset, option).
  MOCK_CONST_METHOD3(subscribe_, IKafkaStreamSubscriber_ptr(
                                     std::vector<std::string>, int64_t,
                                     Mantid::LiveData::SubscribeAtOption));
  // Adapter: forwards to the three-argument subscribe_() and owns its result.
  IKafkaStreamSubscriber_uptr
  subscribe(std::vector<std::string> s, int64_t offset,
            Mantid::LiveData::SubscribeAtOption option) const override {
    IKafkaStreamSubscriber_ptr rawSubscriber =
        this->subscribe_(s, offset, option);
    return IKafkaStreamSubscriber_uptr(rawSubscriber);
  }
  GNU_DIAG_ON_SUGGEST_OVERRIDE
};
// -----------------------------------------------------------------------------
// Fake stream to raise error to tests
// -----------------------------------------------------------------------------
// Subscriber whose consumeMessage() always throws std::runtime_error, so
// tests can check how a client reacts to a failing stream.
class FakeExceptionThrowingStreamSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  // Both subscribe overloads do nothing.
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset); }

  // Empties the supplied buffer and then throws; offset, partition and topic
  // are left untouched.
  void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
    buffer->clear();
    throw std::runtime_error("FakeExceptionThrowingStreamSubscriber");
  }

  // Always reports offsets {1, 2, 3} for "topic_name", whatever the timestamp.
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    std::unordered_map<std::string, std::vector<int64_t>> offsetsByTopic;
    offsetsByTopic.emplace("topic_name", std::vector<int64_t>{1, 2, 3});
    return offsetsByTopic;
  }

  // Always reports an empty map.
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    return std::unordered_map<std::string, std::vector<int64_t>>();
  }

  // Seeking is ignored.
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
  }
};
// -----------------------------------------------------------------------------
// Fake stream to provide empty stream to client
// -----------------------------------------------------------------------------
// Subscriber that never delivers any data: every consumeMessage() call just
// hands back an empty buffer.
class FakeEmptyStreamSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  // Both subscribe overloads do nothing.
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset); }

  // Clears the buffer; offset, partition and topic are not modified.
  void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
    buffer->clear();
  }

  // Fixed answer: "topic_name" -> {1, 2, 3}, independent of the timestamp.
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    using OffsetMap = std::unordered_map<std::string, std::vector<int64_t>>;
    OffsetMap result;
    result.emplace(std::string("topic_name"), std::vector<int64_t>{1, 2, 3});
    return result;
  }

  // Fixed answer: no topics at all.
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    using OffsetMap = std::unordered_map<std::string, std::vector<int64_t>>;
    return OffsetMap();
  }

  // Seeking is ignored.
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
  }
};
namespace {
void fakeReceiveAnISISEventMessage(std::string *buffer, int32_t nextPeriod) {
flatbuffers::FlatBufferBuilder builder;
std::vector<uint32_t> spec = {5, 4, 3, 2, 1, 2};
std::vector<uint32_t> tof = {11000, 10000, 9000, 8000, 7000, 6000};
uint64_t frameTime = 1;
float protonCharge(0.5f);
auto messageFlatbuf = CreateEventMessage(
builder, builder.CreateString("KafkaTesting"), 0, frameTime,
builder.CreateVector(tof), builder.CreateVector(spec),
FacilityData_ISISData,
CreateISISData(builder, static_cast<uint32_t>(nextPeriod),
RunState_RUNNING, protonCharge)
.Union());
FinishEventMessageBuffer(builder, messageFlatbuf);
// Copy to provided buffer
buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
builder.GetSize());
}
void fakeReceiveAnEventMessage(std::string *buffer) {
flatbuffers::FlatBufferBuilder builder;
std::vector<uint32_t> spec = {5, 4, 3};
std::vector<uint32_t> tof = {11000, 10000, 9000};
uint64_t frameTime = 1;
auto messageFlatbuf = CreateEventMessage(
builder, builder.CreateString("KafkaTesting"), 0, frameTime,
builder.CreateVector(tof), builder.CreateVector(spec));
FinishEventMessageBuffer(builder, messageFlatbuf);
// Copy to provided buffer
buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
builder.GetSize());
}
void fakeReceiveHistoMessage(std::string *buffer) {
flatbuffers::FlatBufferBuilder builder;
// shape is binedges=2 nspectra=5
std::vector<uint32_t> current_shape{3, 5};
auto bin_edges = builder.CreateVector(std::vector<double>{0, 1, 2});
auto xbins = HistoSchema::CreateArrayDouble(builder, bin_edges);
auto bin_metadata = HistoSchema::CreateDimensionMetaData(
builder, 3, builder.CreateString("TOF"), builder.CreateString("TOF"),
HistoSchema::Array_ArrayDouble, xbins.Union());
auto unit_metadata = HistoSchema::CreateDimensionMetaData(
builder, 1, builder.CreateString("Counts"));
auto dim_metadata = builder.CreateVector(
std::vector<flatbuffers::Offset<HistoSchema::DimensionMetaData>>{
bin_metadata, unit_metadata});
// Data values are nspectra*nbins
auto data_values = builder.CreateVector(
std::vector<double>{100, 140, 210, 100, 110, 70, 5, 3, 20, 4});
auto data = HistoSchema::CreateArrayDouble(builder, data_values);
auto messageFlatBuf = HistoSchema::CreateEventHistogram(
builder, builder.CreateString("KafkaTesting"), 0, dim_metadata, 0,
builder.CreateVector(current_shape), 0, HistoSchema::Array_ArrayDouble,
data.Union());
FinishEventHistogramBuffer(builder, messageFlatBuf);
buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
builder.GetSize());
}
// Serialises one fake sample-environment log message into *buffer.
//
// The log entry has source name "fake source", an integer value of 42 and a
// fixed timestamp of 1495618188000000000.
//
// @param buffer Destination string; overwritten with the flatbuffer bytes.
void fakeReceiveASampleEnvMessage(std::string *buffer) {
  flatbuffers::FlatBufferBuilder builder;

  // Nested objects first, then the LogData table itself
  auto sourceName = builder.CreateString("fake source");
  auto logValue = LogSchema::CreateInt(builder, 42);
  auto logEntry =
      LogSchema::CreateLogData(builder, sourceName, LogSchema::Value_Int,
                               logValue.Union(), 1495618188000000000L);
  FinishLogDataBuffer(builder, logEntry);

  // Hand the finished bytes over to the caller's buffer
  const char *serialised =
      reinterpret_cast<const char *>(builder.GetBufferPointer());
  buffer->assign(serialised, builder.GetSize());
}
// Serialises a fake run-start message into *buffer.
//
// @param buffer    Destination string; overwritten with the flatbuffer bytes.
// @param runNumber Run number stored in the RunStart payload.
// @param startTime Start time string, parsed with
//                  Mantid::Types::Core::DateAndTime.
// @param instName  Instrument name stored in the RunStart payload.
// @param nPeriods  Number of periods stored in the RunStart payload.
void fakeReceiveARunStartMessage(std::string *buffer, int32_t runNumber,
                                 const std::string &startTime,
                                 const std::string &instName,
                                 int32_t nPeriods) {
  // Convert the start time to whole seconds scaled by 1e9. The seconds are
  // widened to 64 bits *before* scaling so that the multiplication cannot
  // overflow when std::time_t is narrower than 64 bits.
  auto mantidTime = Mantid::Types::Core::DateAndTime(startTime);
  const uint64_t startTimestamp =
      static_cast<uint64_t>(mantidTime.to_time_t()) * 1000000000ULL;

  flatbuffers::FlatBufferBuilder builder;
  auto runInfo =
      CreateRunInfo(builder, InfoTypes_RunStart,
                    CreateRunStart(builder, startTimestamp, runNumber,
                                   builder.CreateString(instName), nPeriods)
                        .Union());
  FinishRunInfoBuffer(builder, runInfo);

  // Copy to provided buffer
  buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
                 builder.GetSize());
}
// Serialises a fake run-stop message into *buffer.
//
// @param buffer   Destination string; overwritten with the flatbuffer bytes.
// @param stopTime Stop time string, parsed with
//                 Mantid::Types::Core::DateAndTime.
void fakeReceiveARunStopMessage(std::string *buffer,
                                const std::string &stopTime) {
  // Convert the stop time to whole seconds scaled by 1e9. The seconds are
  // widened to 64 bits *before* scaling so that the multiplication cannot
  // overflow when std::time_t is narrower than 64 bits.
  auto mantidTime = Mantid::Types::Core::DateAndTime(stopTime);
  const uint64_t stopTimestamp =
      static_cast<uint64_t>(mantidTime.to_time_t()) * 1000000000ULL;

  flatbuffers::FlatBufferBuilder builder;
  auto runInfo = CreateRunInfo(builder, InfoTypes_RunStop,
                               CreateRunStop(builder, stopTimestamp).Union());
  FinishRunInfoBuffer(builder, runInfo);

  // Copy to provided buffer
  buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
                 builder.GetSize());
}
} // namespace
// -----------------------------------------------------------------------------
// Fake ISIS event stream to provide event and sample environment data
// -----------------------------------------------------------------------------
// Subscriber producing an endless stream of fake ISIS event messages whose
// period number cycles through 0 .. nperiods-1.
class FakeISISEventSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  // @param nperiods Length of the period cycle; used as a modulus, so it must
  //                 be non-zero.
  explicit FakeISISEventSubscriber(int32_t nperiods)
      : m_nperiods(nperiods), m_nextPeriod(0) {}

  // Both subscribe overloads do nothing.
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset); }

  // Writes an ISIS event message for the current period into *message and
  // advances to the next period, wrapping round after the last one. offset,
  // partition and topic are not modified.
  void consumeMessage(std::string *message, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
    assert(message);
    fakeReceiveAnISISEventMessage(message, m_nextPeriod);
    const int32_t following = m_nextPeriod + 1;
    m_nextPeriod = following % m_nperiods;
  }

  // Always reports offsets {1, 2, 3} for "topic_name".
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    std::unordered_map<std::string, std::vector<int64_t>> offsetsByTopic;
    offsetsByTopic.emplace("topic_name", std::vector<int64_t>{1, 2, 3});
    return offsetsByTopic;
  }

  // Always reports an empty map.
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    return std::unordered_map<std::string, std::vector<int64_t>>();
  }

  // Seeking is ignored.
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
  }

private:
  const int32_t m_nperiods; // length of the period cycle
  int32_t m_nextPeriod;     // period used for the next message
};
// ---------------------------------------------------------------------------------------
// Fake non-institution-specific event stream to provide event and sample
// environment data
// ---------------------------------------------------------------------------------------
// Subscriber producing a short scripted run followed by event data:
//   message 0      -> run start (run 1000, "2016-08-31T12:07:42", "HRPDTEST",
//                     1 period)
//   message 2      -> run stop at m_stopTime
//   anything else  -> facility-independent event message
class FakeEventSubscriber : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  // Both subscribe overloads do nothing.
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset) }

  // Writes the next scripted message into *message and advances the internal
  // message counter. offset, partition and topic are not modified.
  void consumeMessage(std::string *message, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    assert(message);
    switch (m_nextOffset) {
    case 0:
      fakeReceiveARunStartMessage(message, 1000, "2016-08-31T12:07:42",
                                  "HRPDTEST", 1);
      break;
    case 2:
      fakeReceiveARunStopMessage(message, m_stopTime);
      break;
    default:
      fakeReceiveAnEventMessage(message);
      break;
    }
    m_nextOffset++;
    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
  }

  // Always reports offset {1} for the fake topic, whatever the timestamp.
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    return {std::pair<std::string, std::vector<int64_t>>(m_topicName, {1})};
  }

  // Always reports offset {1} for the fake topic.
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    return {std::pair<std::string, std::vector<int64_t>>(m_topicName, {1})};
  }

  // Seeking is ignored.
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
  }

private:
  std::string m_topicName = "topic_name";
  int m_nextOffset = 0; // index of the next message to hand out
  std::string m_stopTime = "2016-08-31T12:07:52";
};
// ---------------------------------------------------------------------------------------
// Fake non-institution-specific histo stream to provide histogram and sample
// environment data
// ---------------------------------------------------------------------------------------
// Subscriber producing a short scripted run followed by histogram data:
//   message 0      -> run start (run 1000, "2016-08-31T12:07:42", "HRPDTEST",
//                     1 period)
//   message 2      -> run stop at m_stopTime
//   anything else  -> histogram message
class FakeHistoSubscriber : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  // Both subscribe overloads do nothing.
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset) }

  // Writes the next scripted message into *message and advances the internal
  // message counter. offset, partition and topic are not modified.
  void consumeMessage(std::string *message, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    assert(message);
    switch (m_nextOffset) {
    case 0:
      fakeReceiveARunStartMessage(message, 1000, "2016-08-31T12:07:42",
                                  "HRPDTEST", 1);
      break;
    case 2:
      fakeReceiveARunStopMessage(message, m_stopTime);
      break;
    default:
      fakeReceiveHistoMessage(message);
      break;
    }
    m_nextOffset++;
    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
  }

  // Always reports offset {1} for the fake topic, whatever the timestamp.
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    return {std::pair<std::string, std::vector<int64_t>>(m_topicName, {1})};
  }

  // Always reports offset {1} for the fake topic.
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    return {std::pair<std::string, std::vector<int64_t>>(m_topicName, {1})};
  }

  // Seeking is ignored.
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
  }

private:
  std::string m_topicName = "topic_name";
  int m_nextOffset = 0; // index of the next message to hand out
  std::string m_stopTime = "2016-08-31T12:07:52";
};
// -----------------------------------------------------------------------------
// Fake event stream to provide sample environment data
// -----------------------------------------------------------------------------
// Subscriber producing an endless stream of identical fake
// sample-environment log messages.
class FakeSampleEnvironmentSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  // Both subscribe overloads do nothing.
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset); }

  // Writes a sample-environment message into *message; offset, partition and
  // topic are not modified.
  void consumeMessage(std::string *message, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
    assert(message);
    fakeReceiveASampleEnvMessage(message);
  }

  // Fixed answer: "topic_name" -> {1, 2, 3}, independent of the timestamp.
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    using OffsetMap = std::unordered_map<std::string, std::vector<int64_t>>;
    OffsetMap result;
    result.emplace(std::string("topic_name"), std::vector<int64_t>{1, 2, 3});
    return result;
  }

  // Fixed answer: no topics at all.
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    using OffsetMap = std::unordered_map<std::string, std::vector<int64_t>>;
    return OffsetMap();
  }

  // Seeking is ignored.
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
  }
};
// -----------------------------------------------------------------------------
// Fake run data stream
// -----------------------------------------------------------------------------
// Subscriber producing an endless stream of run-start messages with a fixed
// number of periods and a run number that goes up by one per message
// (starting at 1000).
class FakeRunInfoStreamSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  // @param nperiods Number of periods written into every run-start message.
  explicit FakeRunInfoStreamSubscriber(int32_t nperiods)
      : m_nperiods(nperiods) {}

  // Both subscribe overloads do nothing.
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset); }

  // Writes a run-start message for the current run number into *buffer, then
  // bumps the run number. offset, partition and topic are not modified.
  void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
    assert(buffer);
    fakeReceiveARunStartMessage(buffer, m_runNumber, m_startTime, m_instName,
                                m_nperiods);
    ++m_runNumber;
  }

  // Always reports offsets {1, 2, 3} for "topic_name".
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    std::unordered_map<std::string, std::vector<int64_t>> offsetsByTopic;
    offsetsByTopic.emplace("topic_name", std::vector<int64_t>{1, 2, 3});
    return offsetsByTopic;
  }

  // Always reports an empty map.
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    return std::unordered_map<std::string, std::vector<int64_t>>();
  }

  // Seeking is ignored.
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
  }

private:
  std::string m_startTime = "2016-08-31T12:07:42";
  int32_t m_runNumber = 1000; // run number of the next message
  std::string m_instName = "HRPDTEST";
  int32_t m_nperiods = 1; // overridden by the constructor argument
};
// -----------------------------------------------------------------------------
// Fake run data stream with incrementing number of periods
// -----------------------------------------------------------------------------
// Subscriber producing an endless stream of run-start messages in which both
// the run number (from 1000) and the number of periods (from 1) go up by one
// with every message.
class FakeRunInfoStreamSubscriberVaryingNPeriods
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  // Both subscribe overloads do nothing.
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset); }

  // Writes a run-start message with the current run number and period count
  // into *buffer, then increments both. offset, partition and topic are not
  // modified.
  void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
    assert(buffer);
    fakeReceiveARunStartMessage(buffer, m_runNumber, m_startTime, m_instName,
                                m_nperiods);
    ++m_nperiods;
    ++m_runNumber;
  }

  // Fixed answer: "topic_name" -> {1, 2, 3}, independent of the timestamp.
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    using OffsetMap = std::unordered_map<std::string, std::vector<int64_t>>;
    OffsetMap result;
    result.emplace(std::string("topic_name"), std::vector<int64_t>{1, 2, 3});
    return result;
  }

  // Fixed answer: no topics at all.
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    using OffsetMap = std::unordered_map<std::string, std::vector<int64_t>>;
    return OffsetMap();
  }

  // Seeking is ignored.
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
  }

private:
  std::string m_startTime = "2016-08-31T12:07:42";
  int32_t m_runNumber = 1000; // run number of the next message
  std::string m_instName = "HRPDTEST";
  int32_t m_nperiods = 1; // period count of the next message
};
// -----------------------------------------------------------------------------
// Varying period data stream with run and event messages
// -----------------------------------------------------------------------------
// Subscriber replaying a scripted sequence in which the number of periods
// changes between runs. Message index -> content:
//   0  run start, run 1000, 1 period
//   1  ISIS event data, period 0
//   2  run start, run 1001, 2 periods
//   3  run stop
//   4  ISIS event data, period 0
//   5  ISIS event data, period 1
//   6  run stop
//   7+ ISIS event data, period 0
class FakeVariablePeriodSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  // @param startOffset Index in the scripted sequence of the first message
  //                    to hand out.
  explicit FakeVariablePeriodSubscriber(uint32_t startOffset)
      : m_nextOffset(startOffset) {}

  // Both subscribe overloads do nothing.
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset); }

  // Writes the message at the current index into *buffer and reports
  // topic "topic_name", partition 0 and the message's index as its offset,
  // then moves on to the next index.
  void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    assert(buffer);
    switch (m_nextOffset) {
    case 0:
      fakeReceiveARunStartMessage(buffer, 1000, m_startTime, m_instName,
                                  m_nperiods);
      break;
    case 2:
      fakeReceiveARunStartMessage(buffer, 1001, m_startTime, m_instName, 2);
      break;
    case 3:
    case 6:
      fakeReceiveARunStopMessage(buffer, m_stopTime);
      break;
    case 5:
      fakeReceiveAnISISEventMessage(buffer, 1);
      break;
    default:
      fakeReceiveAnISISEventMessage(buffer, 0);
      break;
    }
    topic = "topic_name";
    offset = m_nextOffset;
    partition = 0;
    ++m_nextOffset;
  }

  // Always reports offset {2} for the fake topic, whatever the timestamp.
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    std::unordered_map<std::string, std::vector<int64_t>> offsetsByTopic;
    offsetsByTopic.emplace(m_topicName, std::vector<int64_t>{2});
    return offsetsByTopic;
  }

  // Always reports offset {2} for the fake topic.
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    std::unordered_map<std::string, std::vector<int64_t>> offsetsByTopic;
    offsetsByTopic.emplace(m_topicName, std::vector<int64_t>{2});
    return offsetsByTopic;
  }

  // Seeking is ignored.
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
  }

private:
  const std::string m_topicName = "topic_name";
  uint32_t m_nextOffset; // index of the next scripted message
  std::string m_startTime = "2016-08-31T12:07:42";
  std::string m_stopTime = "2016-08-31T12:07:52";
  const std::string m_instName = "HRPDTEST";
  int32_t m_nperiods = 1; // period count of the first run
};
// -----------------------------------------------------------------------------
// Fake data stream with run and event messages
// -----------------------------------------------------------------------------
// Subscriber replaying a scripted sequence of run and event messages.
// Message index -> content:
//   0  run start (run 1000)
//   1  ISIS event data, period 0
//   2  run stop
//   3  ISIS event data, period 0
//   4  run start (run 1000)
//   5+ ISIS event data, period 0 ... ad infinitum
class FakeDataStreamSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  // @param stopOffset Offset returned by getOffsetsForTimestamp().
  explicit FakeDataStreamSubscriber(int64_t stopOffset)
      : m_stopOffset(stopOffset) {}

  // Both subscribe overloads do nothing.
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset) }

  // Writes the message at the current index into *buffer and reports
  // topic "topic_name", partition 0 and the message's index as its offset,
  // then moves on to the next index.
  void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    assert(buffer);
    switch (m_nextOffset) {
    case 0:
    case 4:
      fakeReceiveARunStartMessage(buffer, 1000, m_startTime, m_instName,
                                  m_nperiods);
      break;
    case 2:
      fakeReceiveARunStopMessage(buffer, m_stopTime);
      break;
    default:
      fakeReceiveAnISISEventMessage(buffer, 0);
      break;
    }
    topic = "topic_name";
    offset = m_nextOffset;
    partition = 0;
    m_nextOffset++;
  }

  // Reports the stop offset given at construction, whatever the timestamp.
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    return {std::pair<std::string, std::vector<int64_t>>(m_topicName,
                                                         {m_stopOffset})};
  }

  // Reports the offset of the most recently consumed message, or -1 if
  // nothing has been consumed yet.
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    // Widen to a signed 64-bit value before subtracting: m_nextOffset is
    // unsigned, so "m_nextOffset - 1" would wrap to 4294967295 at zero.
    const int64_t lastConsumed = static_cast<int64_t>(m_nextOffset) - 1;
    return {std::pair<std::string, std::vector<int64_t>>(m_topicName,
                                                         {lastConsumed})};
  }

  // Seeking is ignored.
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
  }

private:
  const std::string m_topicName = "topic_name";
  uint32_t m_nextOffset = 0; // index of the next scripted message
  std::string m_startTime = "2016-08-31T12:07:42";
  std::string m_stopTime = "2016-08-31T12:07:52";
  const std::string m_instName = "HRPDTEST";
  int32_t m_nperiods = 1;
  int64_t m_stopOffset; // value handed back by getOffsetsForTimestamp()
};
// -----------------------------------------------------------------------------
// Fake ISIS spectra-detector stream
// -----------------------------------------------------------------------------
// Subscriber producing an endless stream of identical spectra-detector
// mapping messages built from the fixed m_spec / m_detid tables.
class FakeISISSpDetStreamSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  // Both subscribe overloads do nothing.
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset); }

  // Serialises the spectrum/detector tables into *buffer; offset, partition
  // and topic are not modified.
  void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
    assert(buffer);

    // Build the flatbuffer: both vectors first, then the mapping table
    flatbuffers::FlatBufferBuilder builder;
    auto spectra = builder.CreateVector(m_spec);
    auto detectorIds = builder.CreateVector(m_detid);
    const auto nSpectra = static_cast<int32_t>(m_spec.size());
    auto mapping =
        CreateSpectraDetectorMapping(builder, spectra, detectorIds, nSpectra);
    FinishSpectraDetectorMappingBuffer(builder, mapping);

    // Hand the finished bytes over to the caller's buffer
    const char *serialised =
        reinterpret_cast<const char *>(builder.GetBufferPointer());
    buffer->assign(serialised, builder.GetSize());
  }

  // Always reports offsets {1, 2, 3} for "topic_name".
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    std::unordered_map<std::string, std::vector<int64_t>> offsetsByTopic;
    offsetsByTopic.emplace("topic_name", std::vector<int64_t>{1, 2, 3});
    return offsetsByTopic;
  }

  // Always reports an empty map.
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    return std::unordered_map<std::string, std::vector<int64_t>>();
  }

  // Seeking is ignored.
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
  }

private:
  std::vector<int32_t> m_spec = {1, 2, 3, 4, 5};
  // These match the detector numbers in HRPDTEST_Definition.xml
  std::vector<int32_t> m_detid = {1001, 1002, 1100, 901000, 10100};
};
} // namespace KafkaTesting
#endif // MANTID_LIVEDATA_ISISKAFKATESTING_H_