// Mantid Repository : https://github.com/mantidproject/mantid
//
// Copyright © 2018 ISIS Rutherford Appleton Laboratory UKRI,
//   NScD Oak Ridge National Laboratory, European Spallation Source,
//   Institut Laue - Langevin & CSNS, Institute of High Energy Physics, CAS
// SPDX - License - Identifier: GPL - 3.0 +
#pragma once

#include <cxxtest/TestSuite.h>

#include "KafkaTesting.h"
#include "MantidAPI/Run.h"
#include "MantidGeometry/Instrument.h"
#include "MantidHistogramData/FixedLengthVector.h"
#include "MantidHistogramData/HistogramX.h"
#include "MantidHistogramData/HistogramY.h"
#include "MantidKernel/ConfigService.h"
#include "MantidKernel/TimeSeriesProperty.h"
#include "MantidLiveData/Kafka/KafkaHistoStreamDecoder.h"
#include <Poco/Path.h>

#include <atomic>
#include <condition_variable>
#include <iostream>
#include <memory>
#include <mutex>

using Mantid::LiveData::KafkaHistoStreamDecoder;
using namespace KafkaTesting;

class KafkaHistoStreamDecoderTest : public CxxTest::TestSuite {
public:
  // This pair of boilerplate methods prevent the suite being created statically
  // This means the constructor isn't called when running other tests
  static KafkaHistoStreamDecoderTest *createSuite() {
    return new KafkaHistoStreamDecoderTest();
  }
  static void destroySuite(KafkaHistoStreamDecoderTest *suite) { delete suite; }

  void setUp() override {
    // Temporarily change the instrument directory to the testing one
    using Mantid::Kernel::ConfigService;
    auto &config = ConfigService::Instance();
    auto baseInstDir = config.getInstrumentDirectory();
    Poco::Path testFile =
        Poco::Path(baseInstDir).resolve("unit_testing/UnitTestFacilities.xml");
    // Load the test facilities file
    config.updateFacilities(testFile.toString());
    config.setFacility("TEST");
    // Update instrument search directory
    config.setString("instrumentDefinition.directory",
                     baseInstDir + "/unit_testing");
  }

  void tearDown() override {
    using Mantid::Kernel::ConfigService;
    auto &config = ConfigService::Instance();
    config.reset();
    // Restore the main facilities file
    config.updateFacilities();
  }

  void test_Histo_Stream() {
    using namespace ::testing;
    using namespace KafkaTesting;
    using Mantid::API::Workspace_sptr;
    using Mantid::DataObjects::Workspace2D;
    using namespace Mantid::LiveData;

    auto mockBroker = std::make_shared<MockKafkaBroker>();
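    // The decoder subscribes to three streams: histogram data, run info and
    // the spectrum-detector mapping, so return a fake subscriber for each in
    // that order.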
    EXPECT_CALL(*mockBroker, subscribe_(_, _))
        .Times(Exactly(3))
        .WillOnce(Return(new FakeHistoSubscriber()))
        .WillOnce(Return(new FakeRunInfoStreamSubscriber(1)))
        .WillOnce(Return(new FakeISISSpDetStreamSubscriber));
    auto decoder = createTestDecoder(mockBroker);
    TSM_ASSERT("Decoder should not have create data buffers yet",
               !decoder->hasData());
    startCapturing(*decoder, 1);

    // Checks
    Workspace_sptr workspace;
    TSM_ASSERT("Decoder's data buffers should be created now",
               decoder->hasData());
    TS_ASSERT_THROWS_NOTHING(workspace = decoder->extractData());
    TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
    TS_ASSERT(!decoder->isCapturing());

    // -- Workspace checks --
    TSM_ASSERT("Expected non-null workspace pointer from extractData()",
               workspace);
    auto histoWksp = boost::dynamic_pointer_cast<Workspace2D>(workspace);
    TSM_ASSERT(
        "Expected a Workspace2D from extractData(). Found something else",
        histoWksp);
    checkWorkspaceMetadata(*histoWksp);
    checkWorkspaceHistoData(*histoWksp);

    TS_ASSERT(Mock::VerifyAndClear(mockBroker.get()));
  }

private:
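  // Create a decoder connected to the given broker; the topic names are
  // irrelevant here since the mock broker returns the fake subscribers
  // regardless of the names passed.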
  std::unique_ptr<Mantid::LiveData::KafkaHistoStreamDecoder> createTestDecoder(
      const std::shared_ptr<Mantid::LiveData::IKafkaBroker> &broker) {
    using namespace Mantid::LiveData;
    return std::make_unique<KafkaHistoStreamDecoder>(broker, "", "", "", "",
                                                     "");
  }

  // Start decoding and wait until we have gathered enough data to test
  void startCapturing(Mantid::LiveData::KafkaHistoStreamDecoder &decoder,
                      uint8_t maxIterations) {
    // Register a callback to know when a whole loop has been iterated through
    m_maxIterations = maxIterations;
    m_niterations = 0;
    decoder.registerIterationEndCb([this]() { this->iterationCallback(); });

    decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); });
    TS_ASSERT_THROWS_NOTHING(decoder.startCapture());
    continueCapturing(decoder, maxIterations);
  }

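  // Invoked at the end of each decoder iteration; notifies the waiting test
  // thread once the requested number of iterations has been reached.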
  void iterationCallback() {
    std::unique_lock<std::mutex> lock(this->m_callbackMutex);
    this->m_niterations++;
    if (this->m_niterations == m_maxIterations) {
      lock.unlock();
      this->m_callbackCondition.notify_one();
    }
  }

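  // Called by the decoder when an error occurs: report the stored exception
  // and advance the iteration count so the waiting test thread does not
  // deadlock.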
  void errCallback(KafkaHistoStreamDecoder &decoder) {
    try {
      // Get the stored exception by calling extract data again
      decoder.extractData();
    } catch (std::exception &e) {
      // We could try to propagate the exception from this child thread to the
      // main thread, but simply printing it here is significantly easier
      std::cerr << "Exception: " << e.what() << "\n";
      // Always keep incrementing so we don't deadlock
      iterationCallback();
    }
  }

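  // Block the calling thread until the decoder has run the requested number
  // of iterations (or the error callback has advanced the count).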
  void continueCapturing(KafkaHistoStreamDecoder &decoder,
                         uint8_t maxIterations) {
    m_maxIterations = maxIterations;

    // Re-register callback with the (potentially) new value of maxIterations
    decoder.registerIterationEndCb([this]() { this->iterationCallback(); });
    decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); });

    {
      std::unique_lock<std::mutex> lk(m_callbackMutex);
      this->m_callbackCondition.wait(
          lk, [this]() { return this->m_niterations == m_maxIterations; });
    }
  }

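  // Verify the instrument, run start time and spectrum-detector mapping
  // provided by the fake run-info and spectrum-detector streams.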
  void
  checkWorkspaceMetadata(const Mantid::DataObjects::Workspace2D &histoWksp) {
    TS_ASSERT(histoWksp.getInstrument());
    TS_ASSERT_EQUALS("HRPDTEST", histoWksp.getInstrument()->getName());
    TS_ASSERT_EQUALS(
        "2016-08-31T12:07:42",
        histoWksp.run().getPropertyValueAsType<std::string>("run_start"));
    std::array<Mantid::specnum_t, 5> specs = {{1, 2, 3, 4, 5}};
    std::array<Mantid::detid_t, 5> ids = {{1001, 1002, 1100, 901000, 10100}};
    TS_ASSERT_EQUALS(specs.size(), histoWksp.getNumberHistograms());
    for (size_t i = 0; i < histoWksp.getNumberHistograms(); ++i) {
      const auto &spec = histoWksp.getSpectrum(i);
      TS_ASSERT_EQUALS(specs[i], spec.getSpectrumNo());
      const auto &sid = spec.getDetectorIDs();
      TS_ASSERT_EQUALS(ids[i], *(sid.begin()));
    }
  }

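  // Verify the counts (and, for the first spectrum, the bin edges) of all
  // five spectra against the fixed values supplied by the fake histogram
  // stream.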
  void
  checkWorkspaceHistoData(const Mantid::DataObjects::Workspace2D &histoWksp) {
    // Inspect all 5 HRPDTEST Spectra
    auto data = histoWksp.histogram(0);
    // std::vector<double> xbins(data.x().cbegin(), data.x().cend());
    TS_ASSERT_EQUALS(data.x().rawData(), (std::vector<double>{0, 1, 2}));
    TS_ASSERT_EQUALS(data.y().rawData(), (std::vector<double>{100, 140}));

    data = histoWksp.histogram(1);
    TS_ASSERT_EQUALS(data.y().rawData(), (std::vector<double>{210, 100}));

    data = histoWksp.histogram(2);
    TS_ASSERT_EQUALS(data.y().rawData(), (std::vector<double>{110, 70}));

    data = histoWksp.histogram(3);
    TS_ASSERT_EQUALS(data.y().rawData(), (std::vector<double>{5, 3}));

    data = histoWksp.histogram(4);
    TS_ASSERT_EQUALS(data.y().rawData(), (std::vector<double>{20, 4}));
  }

private:
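  // Synchronisation state shared between the test thread and the decoder's
  // callback thread.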
  std::mutex m_callbackMutex;
  std::condition_variable m_callbackCondition;
  uint8_t m_niterations = 0;
  std::atomic<uint8_t> m_maxIterations = 0;
};