// KafkaTesting.h: mock broker and fake stream subscribers for Kafka tests.
#ifndef MANTID_LIVEDATA_ISISKAFKATESTING_H_
#define MANTID_LIVEDATA_ISISKAFKATESTING_H_

#include "MantidKernel/DateAndTimeHelpers.h"
#include "MantidKernel/WarningSuppressions.h"
#include "MantidLiveData/Kafka/IKafkaBroker.h"
#include "MantidLiveData/Kafka/IKafkaStreamSubscriber.h"
#include "MantidTypes/Core/DateAndTime.h"
#include <gmock/gmock.h>

GCC_DIAG_OFF(conversion)
#include "Kafka/private/Schema/ba57_run_info_generated.h"
#include "Kafka/private/Schema/df12_det_spec_map_generated.h"
#include "Kafka/private/Schema/ev42_events_generated.h"
#include "Kafka/private/Schema/is84_isis_events_generated.h"
#include "Kafka/private/Schema/f142_logdata_generated.h"
GCC_DIAG_ON(conversion)

#include <cassert>
#include <cstdint>
#include <ctime>
#include <memory>
#include <stdexcept>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

namespace ISISKafkaTesting {

// -----------------------------------------------------------------------------
// Mock broker to inject fake subscribers
// -----------------------------------------------------------------------------
class MockKafkaBroker : public Mantid::LiveData::IKafkaBroker {
public:
  using IKafkaStreamSubscriber_uptr =
      std::unique_ptr<Mantid::LiveData::IKafkaStreamSubscriber>;
  using IKafkaStreamSubscriber_ptr = Mantid::LiveData::IKafkaStreamSubscriber *;

  GCC_DIAG_OFF_SUGGEST_OVERRIDE
  // GMock cannot mock non-copyable return types so we resort to a small
  // adapter method. Users have to use EXPECT_CALL(subscribe_) instead
Matthew D Jones's avatar
Matthew D Jones committed
  MOCK_CONST_METHOD2(subscribe_, IKafkaStreamSubscriber_ptr(
                                     std::vector<std::string>,
                                     Mantid::LiveData::SubscribeAtOption));
Matthew D Jones's avatar
Matthew D Jones committed
  IKafkaStreamSubscriber_uptr
  subscribe(std::vector<std::string> s,
            Mantid::LiveData::SubscribeAtOption option) const override {
    return std::unique_ptr<Mantid::LiveData::IKafkaStreamSubscriber>(
Matthew D Jones's avatar
Matthew D Jones committed
        this->subscribe_(s, option));
Matthew D Jones's avatar
Matthew D Jones committed
  MOCK_CONST_METHOD3(subscribe_, IKafkaStreamSubscriber_ptr(
                                     std::vector<std::string>, int64_t,
                                     Mantid::LiveData::SubscribeAtOption));
Matthew D Jones's avatar
Matthew D Jones committed
  IKafkaStreamSubscriber_uptr
  subscribe(std::vector<std::string> s, int64_t offset,
            Mantid::LiveData::SubscribeAtOption option) const override {
    return std::unique_ptr<Mantid::LiveData::IKafkaStreamSubscriber>(
Matthew D Jones's avatar
Matthew D Jones committed
        this->subscribe_(s, offset, option));
  GCC_DIAG_ON_SUGGEST_OVERRIDE
};

// -----------------------------------------------------------------------------
// Fake stream to raise error to tests
// -----------------------------------------------------------------------------
class FakeExceptionThrowingStreamSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
  void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    buffer->clear();
    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
    throw std::runtime_error("FakeExceptionThrowingStreamSubscriber");
  }
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    return {
        std::pair<std::string, std::vector<int64_t>>("topic_name", {1, 2, 3})};
  }
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    std::unordered_map<std::string, std::vector<int64_t>> offsets;
    return offsets;
  }
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
  }
};

// -----------------------------------------------------------------------------
// Fake stream to provide empty stream to client
// -----------------------------------------------------------------------------
class FakeEmptyStreamSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
  void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    buffer->clear();
    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
  }
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    return {
        std::pair<std::string, std::vector<int64_t>>("topic_name", {1, 2, 3})};
  }
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    std::unordered_map<std::string, std::vector<int64_t>> offsets;
    return offsets;
  }
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
  }
/// Serialises a hard-coded flatbuffers EventMessage into the given string.
///
/// The message is built from the source name "KafkaTesting", a frame time of
/// 1, six fixed values in each of the `tof` and `spec` vectors, and an
/// ISISData payload holding the requested period, RunState_RUNNING and a
/// proton charge of 0.5.
///
/// Declared inline because this definition lives in a header and would
/// otherwise be multiply defined when included from several translation units.
///
/// @param buffer Receives the serialised bytes (previous content is replaced).
/// Must not be null; it is dereferenced unconditionally.
/// @param nextPeriod Period number stored in the message (cast to uint32_t)
inline void fakeReceiveAnEventMessage(std::string *buffer,
                                      int32_t nextPeriod) {
  flatbuffers::FlatBufferBuilder builder;
  std::vector<uint32_t> spec = {5, 4, 3, 2, 1, 2};
  std::vector<uint32_t> tof = {11000, 10000, 9000, 8000, 7000, 6000};

  uint64_t frameTime = 1;
  float protonCharge(0.5f);

  auto messageFlatbuf = CreateEventMessage(
      builder, builder.CreateString("KafkaTesting"), 0, frameTime,
      builder.CreateVector(tof), builder.CreateVector(spec),
      FacilityData_ISISData,
      CreateISISData(builder, static_cast<uint32_t>(nextPeriod),
                     RunState_RUNNING, protonCharge).Union());
  FinishEventMessageBuffer(builder, messageFlatbuf);

  // Copy to provided buffer
  buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
                 builder.GetSize());
}

/// Serialises a hard-coded flatbuffers LogData message into the given string.
///
/// The message uses the source name "fake source", an Int value of 42 and the
/// fixed timestamp constant 1495618188000000000.
///
/// Declared inline because this definition lives in a header and would
/// otherwise be multiply defined when included from several translation units.
///
/// @param buffer Receives the serialised bytes (previous content is replaced).
/// Must not be null; it is dereferenced unconditionally.
inline void fakeReceiveASampleEnvMessage(std::string *buffer) {
  flatbuffers::FlatBufferBuilder builder;
  // Sample environment log
  auto logDataMessage =
      CreateLogData(builder, builder.CreateString("fake source"), Value_Int,
                    CreateInt(builder, 42).Union(), 1495618188000000000L);
  FinishLogDataBuffer(builder, logDataMessage);

  // Copy to provided buffer
  buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
                 builder.GetSize());
}

/// Serialises a flatbuffers RunInfo message of type RunStart into the given
/// string.
///
/// Declared inline because this definition lives in a header and would
/// otherwise be multiply defined when included from several translation units.
///
/// @param buffer Receives the serialised bytes (previous content is replaced).
/// Must not be null; it is dereferenced unconditionally.
/// @param runNumber Run number stored in the message
/// @param startTime Date/time string parsed by Mantid::Types::Core::DateAndTime
/// @param instName Instrument name stored in the message
/// @param nPeriods Number of periods stored in the message
inline void fakeReceiveARunStartMessage(std::string *buffer, int32_t runNumber,
                                        const std::string &startTime,
                                        const std::string &instName,
                                        int32_t nPeriods) {
  // Convert date to time_t, then scale by 1e9. Widen to 64 bits *before*
  // multiplying so the product cannot overflow where time_t is 32 bits wide.
  auto mantidTime = Mantid::Types::Core::DateAndTime(startTime);
  const uint64_t startTimestamp =
      static_cast<uint64_t>(mantidTime.to_time_t()) * 1000000000ULL;

  flatbuffers::FlatBufferBuilder builder;
  auto runInfo = CreateRunInfo(
      builder, InfoTypes_RunStart,
      CreateRunStart(builder, startTimestamp, runNumber,
                     builder.CreateString(instName), nPeriods).Union());
  FinishRunInfoBuffer(builder, runInfo);
  // Copy to provided buffer
  buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
                 builder.GetSize());
}

/// Serialises a flatbuffers RunInfo message of type RunStop into the given
/// string.
///
/// Declared inline because this definition lives in a header and would
/// otherwise be multiply defined when included from several translation units.
///
/// @param buffer Receives the serialised bytes (previous content is replaced).
/// Must not be null; it is dereferenced unconditionally.
/// @param stopTime Date/time string parsed by Mantid::Types::Core::DateAndTime
inline void fakeReceiveARunStopMessage(std::string *buffer,
                                       const std::string &stopTime) {
  // Convert date to time_t, then scale by 1e9. Widen to 64 bits *before*
  // multiplying so the product cannot overflow where time_t is 32 bits wide.
  auto mantidTime = Mantid::Types::Core::DateAndTime(stopTime);
  const uint64_t stopTimestamp =
      static_cast<uint64_t>(mantidTime.to_time_t()) * 1000000000ULL;

  flatbuffers::FlatBufferBuilder builder;
  auto runInfo = CreateRunInfo(builder, InfoTypes_RunStop,
                               CreateRunStop(builder, stopTimestamp).Union());
  FinishRunInfoBuffer(builder, runInfo);
  // Copy to provided buffer
  buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
                 builder.GetSize());
}

// -----------------------------------------------------------------------------
// Fake ISIS event stream to provide event and sample environment data
// -----------------------------------------------------------------------------
class FakeISISEventSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  explicit FakeISISEventSubscriber(int32_t nperiods)
      : m_nperiods(nperiods), m_nextPeriod(0) {}
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
  void consumeMessage(std::string *message, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    assert(message);
    fakeReceiveAnEventMessage(message, m_nextPeriod);
    m_nextPeriod = ((m_nextPeriod + 1) % m_nperiods);

    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
  }
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    return {
        std::pair<std::string, std::vector<int64_t>>("topic_name", {1, 2, 3})};
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    std::unordered_map<std::string, std::vector<int64_t>> offsets;
    return offsets;
  }
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
  }
private:
  const int32_t m_nperiods;
  int32_t m_nextPeriod;
};

// -----------------------------------------------------------------------------
// Fake event stream to provide sample environment data
// -----------------------------------------------------------------------------
class FakeSampleEnvironmentSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
  void consumeMessage(std::string *message, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    assert(message);

    fakeReceiveASampleEnvMessage(message);

    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
  }
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    return {
        std::pair<std::string, std::vector<int64_t>>("topic_name", {1, 2, 3})};
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    std::unordered_map<std::string, std::vector<int64_t>> offsets;
    return offsets;
  }
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
  }
// -----------------------------------------------------------------------------
// Fake run data stream
// -----------------------------------------------------------------------------
class FakeRunInfoStreamSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  explicit FakeRunInfoStreamSubscriber(int32_t nperiods)
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
  void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    fakeReceiveARunStartMessage(buffer, m_runNumber, m_startTime, m_instName,
                                m_nperiods);

    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
  }
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    return {
        std::pair<std::string, std::vector<int64_t>>("topic_name", {1, 2, 3})};
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    std::unordered_map<std::string, std::vector<int64_t>> offsets;
    return offsets;
  }
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
  }

private:
  std::string m_startTime = "2016-08-31T12:07:42";
  int32_t m_runNumber = 1000;
  std::string m_instName = "HRPDTEST";
  int32_t m_nperiods = 1;
// -----------------------------------------------------------------------------
// Fake data stream with run and event messages
// -----------------------------------------------------------------------------
class FakeDataStreamSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
  void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    assert(buffer);

    // Return a messages in this order:
    // Run start
    // Event data
    // Run stop
    // Run start
    // Event data... ad infinitum

    switch (m_currentOffset) {
    case 0:
      fakeReceiveARunStartMessage(buffer, 1000, m_startTime, m_instName,
                                  m_nperiods);
      break;
    case 2:
      fakeReceiveARunStopMessage(buffer, m_stopTime);
      break;
    case 3:
      fakeReceiveARunStartMessage(buffer, 1000, m_startTime, m_instName,
                                  m_nperiods);
      break;
    default:
      fakeReceiveAnEventMessage(buffer, 0);
    }
    topic = "topic_name";
    offset = m_currentOffset;
    partition = 0;
    m_currentOffset++;
  }
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    return {std::pair<std::string, std::vector<int64_t>>(m_topicName,
                                                         {m_stopOffset})};
  }
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    return {std::pair<std::string, std::vector<int64_t>>(m_topicName,
                                                         {m_currentOffset})};
  }
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
  }

private:
  const std::string m_topicName = "topic_name";
  uint32_t m_currentOffset = 0;
  std::string m_startTime = "2016-08-31T12:07:42";
  std::string m_stopTime = "2016-08-31T12:07:52";
  const std::string m_instName = "HRPDTEST";
  int32_t m_nperiods = 1;
  int64_t m_stopOffset = 1;
};

// -----------------------------------------------------------------------------
// Fake ISIS spectra-detector stream
// -----------------------------------------------------------------------------
class FakeISISSpDetStreamSubscriber
    : public Mantid::LiveData::IKafkaStreamSubscriber {
public:
  void subscribe() override {}
  void subscribe(int64_t offset) override { UNUSED_ARG(offset) }
  void consumeMessage(std::string *buffer, int64_t &offset, int32_t &partition,
                      std::string &topic) override {
    assert(buffer);

    // Serialize data with flatbuffers
    flatbuffers::FlatBufferBuilder builder;
    auto specVector = builder.CreateVector(m_spec);
    auto detIdsVector = builder.CreateVector(m_detid);
Matthew D Jones's avatar
Matthew D Jones committed
    auto spdet = CreateSpectraDetectorMapping(
        builder, specVector, detIdsVector, static_cast<int32_t>(m_spec.size()));
    FinishSpectraDetectorMappingBuffer(builder, spdet);
    // Copy to provided buffer
    buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
                   builder.GetSize());

    UNUSED_ARG(offset);
    UNUSED_ARG(partition);
    UNUSED_ARG(topic);
  }
  std::unordered_map<std::string, std::vector<int64_t>>
  getOffsetsForTimestamp(int64_t timestamp) override {
    UNUSED_ARG(timestamp);
    return {
        std::pair<std::string, std::vector<int64_t>>("topic_name", {1, 2, 3})};
  std::unordered_map<std::string, std::vector<int64_t>>
  getCurrentOffsets() override {
    std::unordered_map<std::string, std::vector<int64_t>> offsets;
    return offsets;
  }
  void seek(const std::string &topic, uint32_t partition,
            int64_t offset) override {
    UNUSED_ARG(topic);
    UNUSED_ARG(partition);
    UNUSED_ARG(offset);
  }

private:
  std::vector<int32_t> m_spec = {1, 2, 3, 4, 5};
  // These match the detector numbers in HRPDTEST_Definition.xml
  std::vector<int32_t> m_detid = {1001, 1002, 1100, 901000, 10100};
} // namespace ISISKafkaTesting

#endif // MANTID_LIVEDATA_ISISKAFKATESTING_H_