From 60dff7f12eb8c91a8ccb60d9ca1132771f26fd8c Mon Sep 17 00:00:00 2001
From: Martyn Gigg <martyn.gigg@gmail.com>
Date: Wed, 31 Aug 2016 16:21:40 +0100
Subject: [PATCH] Add unit tests using faked data sources

---
 Framework/LiveData/CMakeLists.txt             |   4 +
 Framework/LiveData/test/CMakeLists.txt        |   1 +
 .../test/ISISKafkaEventStreamDecoderTest.h    | 112 +++++++++++++++++-
 Framework/LiveData/test/ISISKafkaTesting.h    |  78 +++++++++++-
 .../UnitTestFacilities.xml                    |   5 +
 5 files changed, 188 insertions(+), 12 deletions(-)

diff --git a/Framework/LiveData/CMakeLists.txt b/Framework/LiveData/CMakeLists.txt
index 0f834d82c43..17d3182960f 100644
--- a/Framework/LiveData/CMakeLists.txt
+++ b/Framework/LiveData/CMakeLists.txt
@@ -71,6 +71,10 @@ if ( LIBRDKAFKA_FOUND )
     inc/MantidLiveData/Kafka/IKafkaBroker.h
     inc/MantidLiveData/Kafka/KafkaBroker.h
     inc/MantidLiveData/Kafka/KafkaTopicSubscriber.h
+    src/ISIS/private/Kafka/Schema/flatbuffers/flatbuffers.h
+    src/ISIS/private/Kafka/Schema/det_spec_mapping_schema_generated.h
+    src/ISIS/private/Kafka/Schema/event_schema_generated.h
+    src/ISIS/private/Kafka/Schema/run_info_schema_generated.h
   )
   set ( TEST_FILES
     ${TEST_FILES}
diff --git a/Framework/LiveData/test/CMakeLists.txt b/Framework/LiveData/test/CMakeLists.txt
index ffb86fcaf97..ef70f7721ab 100644
--- a/Framework/LiveData/test/CMakeLists.txt
+++ b/Framework/LiveData/test/CMakeLists.txt
@@ -24,6 +24,7 @@ if ( CXXTEST_FOUND )
             ${Boost_LIBRARIES}
             ${POCO_LIBRARIES}
             ${GMOCK_LIBRARIES} )
+  target_include_directories ( LiveDataTest PRIVATE ../src/ )
   add_dependencies ( LiveDataTest DataHandling Algorithms MDAlgorithms )
   add_dependencies ( FrameworkTests LiveDataTest )
   # Test data
diff --git a/Framework/LiveData/test/ISISKafkaEventStreamDecoderTest.h b/Framework/LiveData/test/ISISKafkaEventStreamDecoderTest.h
index 55d0bbcbdcd..afacfeee9e2 100644
--- a/Framework/LiveData/test/ISISKafkaEventStreamDecoderTest.h
+++ b/Framework/LiveData/test/ISISKafkaEventStreamDecoderTest.h
@@ -4,9 +4,13 @@
 #include <cxxtest/TestSuite.h>
 
 #include "MantidLiveData/ISIS/ISISKafkaEventStreamDecoder.h"
+#include "MantidAPI/Run.h"
+#include "MantidGeometry/Instrument.h"
+#include "MantidKernel/ConfigService.h"
 #include "MantidKernel/make_unique.h"
 #include "ISISKafkaTesting.h"
 
+#include <Poco/Path.h>
 #include <thread>
 
 class ISISKafkaEventStreamDecoderTest : public CxxTest::TestSuite {
@@ -20,23 +24,83 @@ public:
     delete suite;
   }
 
+  void setUp() override {
+    // Temporarily change the instrument directory to the testing one
+    using Mantid::Kernel::ConfigService;
+    auto &config = ConfigService::Instance();
+    auto baseInstDir = config.getInstrumentDirectory();
+    Poco::Path testFile =
+        Poco::Path(baseInstDir)
+            .resolve("IDFs_for_UNIT_TESTING/UnitTestFacilities.xml");
+    // Load the test facilities file
+    config.updateFacilities(testFile.toString());
+    config.setFacility("TEST");
+    // Update instrument search directory
+    config.setString("instrumentDefinition.directory",
+                     baseInstDir + "/IDFs_for_UNIT_TESTING");
+  }
+
+  void tearDown() override {
+    using Mantid::Kernel::ConfigService;
+    auto &config = ConfigService::Instance();
+    config.reset();
+    // Restore the main facilities file
+    config.updateFacilities();
+  }
+
   //----------------------------------------------------------------------------
   // Success tests
   //----------------------------------------------------------------------------
-  void xtest_Single_Period_Event_Stream() {
+  void test_Single_Period_Event_Stream() {
     using namespace ::testing;
     using namespace ISISKafkaTesting;
+    using Mantid::API::Workspace_sptr;
+    using Mantid::DataObjects::EventWorkspace;
     using namespace Mantid::LiveData;
 
     MockKafkaBroker mockBroker;
     EXPECT_CALL(mockBroker, subscribe_(_))
         .Times(Exactly(3))
-        .WillOnce(Return(new FakeISISSinglePeriodStreamSubscriber))
+        .WillOnce(Return(new FakeISISSinglePeriodEventSubscriber))
         .WillOnce(Return(new FakeISISRunInfoStreamSubscriber))
         .WillOnce(Return(new FakeISISSpDetStreamSubscriber));
     auto decoder = createTestDecoder(mockBroker);
     TS_ASSERT_THROWS_NOTHING(decoder->startCapture());
-    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
+    Workspace_sptr workspace;
+    TS_ASSERT_THROWS_NOTHING(workspace = decoder->extractData());
+    TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
+    TS_ASSERT(!decoder->isRunning());
+
+    // Workspace checks
+    TS_ASSERT(workspace);
+    auto eventWksp = boost::dynamic_pointer_cast<EventWorkspace>(workspace);
+    TS_ASSERT(eventWksp);
+    // Metadata
+    TS_ASSERT(eventWksp->getInstrument());
+    TS_ASSERT_EQUALS("HRPDTEST", eventWksp->getInstrument()->getName());
+    TS_ASSERT_EQUALS(
+        "2016-08-31T12:07:42",
+        eventWksp->run().getPropertyValueAsType<std::string>("run_start"));
+    // Data
+    TS_ASSERT_EQUALS(5, eventWksp->getNumberHistograms());
+    // A timer-based test so only check we actually got something
+    TS_ASSERT(eventWksp->getNumberEvents() > 0);
+  }
+
+  void test_Empty_Event_Stream_Waits() {
+    using namespace ::testing;
+    using namespace ISISKafkaTesting;
+
+    MockKafkaBroker mockBroker;
+    EXPECT_CALL(mockBroker, subscribe_(_))
+        .Times(Exactly(3))
+        .WillOnce(Return(new FakeEmptyStreamSubscriber))
+        .WillOnce(Return(new FakeISISRunInfoStreamSubscriber))
+        .WillOnce(Return(new FakeISISSpDetStreamSubscriber));
+    auto decoder = createTestDecoder(mockBroker);
+    TS_ASSERT_THROWS_NOTHING(decoder->startCapture());
+    std::this_thread::sleep_for(std::chrono::milliseconds(500));
     TS_ASSERT_THROWS_NOTHING(decoder->extractData());
     TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
     TS_ASSERT(!decoder->isRunning());
@@ -45,15 +109,33 @@ public:
   //----------------------------------------------------------------------------
   // Failure tests
   //----------------------------------------------------------------------------
-  void test_Empty_Stream_Throws_Error_On_ExtractData() {
+  void test_Error_In_Stream_Extraction_Throws_Error_On_ExtractData() {
     using namespace ::testing;
     using namespace ISISKafkaTesting;
 
     MockKafkaBroker mockBroker;
     EXPECT_CALL(mockBroker, subscribe_(_))
         .Times(Exactly(3))
-        .WillOnce(Return(new FakeEmptyStreamSubscriber))
-        .WillOnce(Return(new FakeEmptyStreamSubscriber))
+        .WillOnce(Return(new FakeExceptionThrowingStreamSubscriber))
+        .WillOnce(Return(new FakeExceptionThrowingStreamSubscriber))
+        .WillOnce(Return(new FakeExceptionThrowingStreamSubscriber));
+    auto decoder = createTestDecoder(mockBroker);
+    TS_ASSERT_THROWS_NOTHING(decoder->startCapture());
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    TS_ASSERT_THROWS(decoder->extractData(), std::runtime_error);
+    TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
+    TS_ASSERT(!decoder->isRunning());
+  }
+
+  void test_Empty_SpDet_Stream_Throws_Error_On_ExtractData() {
+    using namespace ::testing;
+    using namespace ISISKafkaTesting;
+
+    MockKafkaBroker mockBroker;
+    EXPECT_CALL(mockBroker, subscribe_(_))
+        .Times(Exactly(3))
+        .WillOnce(Return(new FakeISISSinglePeriodEventSubscriber))
+        .WillOnce(Return(new FakeISISRunInfoStreamSubscriber))
         .WillOnce(Return(new FakeEmptyStreamSubscriber));
     auto decoder = createTestDecoder(mockBroker);
     TS_ASSERT_THROWS_NOTHING(decoder->startCapture());
@@ -63,6 +145,24 @@ public:
     TS_ASSERT(!decoder->isRunning());
   }
 
+  void test_Empty_RunInfo_Stream_Throws_Error_On_ExtractData() {
+    using namespace ::testing;
+    using namespace ISISKafkaTesting;
+
+    MockKafkaBroker mockBroker;
+    EXPECT_CALL(mockBroker, subscribe_(_))
+        .Times(Exactly(3))
+        .WillOnce(Return(new FakeISISSinglePeriodEventSubscriber))
+        .WillOnce(Return(new FakeEmptyStreamSubscriber))
+        .WillOnce(Return(new FakeISISSpDetStreamSubscriber));
+    auto decoder = createTestDecoder(mockBroker);
+    TS_ASSERT_THROWS_NOTHING(decoder->startCapture());
+    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
+    TS_ASSERT_THROWS(decoder->extractData(), std::runtime_error);
+    TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
+    TS_ASSERT(!decoder->isRunning());
+  }
+
 private:
   std::unique_ptr<Mantid::LiveData::ISISKafkaEventStreamDecoder>
   createTestDecoder(const Mantid::LiveData::IKafkaBroker &broker) {
diff --git a/Framework/LiveData/test/ISISKafkaTesting.h b/Framework/LiveData/test/ISISKafkaTesting.h
index 7246b62b2f2..5cd9899a0dc 100644
--- a/Framework/LiveData/test/ISISKafkaTesting.h
+++ b/Framework/LiveData/test/ISISKafkaTesting.h
@@ -5,6 +5,14 @@
 #include "MantidKernel/WarningSuppressions.h"
 #include <gmock/gmock.h>
 
+GCC_DIAG_OFF(conversion)
+#include "ISIS/private/Kafka/Schema/det_spec_mapping_schema_generated.h"
+#include "ISIS/private/Kafka/Schema/event_schema_generated.h"
+#include "ISIS/private/Kafka/Schema/run_info_schema_generated.h"
+GCC_DIAG_ON(conversion)
+
+#include <ctime>
+
 namespace ISISKafkaTesting {
 
 // -----------------------------------------------------------------------------
@@ -29,23 +37,40 @@ public:
 };
 
 // -----------------------------------------------------------------------------
-// Fake stream to provide empty data to tests
+// Fake stream to raise error to tests
+// -----------------------------------------------------------------------------
+class FakeExceptionThrowingStreamSubscriber
+    : public Mantid::LiveData::IKafkaStreamSubscriber {
+public:
+  void subscribe() override {}
+  void consumeMessage(std::string *buffer) override {
+    buffer->clear();
+    throw std::runtime_error("FakeExceptionThrowingStreamSubscriber");
+  }
+};
+
+// -----------------------------------------------------------------------------
+// Fake stream to provide empty stream to client
 // -----------------------------------------------------------------------------
 class FakeEmptyStreamSubscriber
     : public Mantid::LiveData::IKafkaStreamSubscriber {
 public:
   void subscribe() override {}
-  bool consumeMessage(std::string *) override { return false; }
+  void consumeMessage(std::string *buffer) override { buffer->clear(); }
 };
 
 // -----------------------------------------------------------------------------
 // Fake ISIS event stream to provide event data
 // -----------------------------------------------------------------------------
-class FakeISISSinglePeriodStreamSubscriber
+class FakeISISSinglePeriodEventSubscriber
     : public Mantid::LiveData::IKafkaStreamSubscriber {
 public:
   void subscribe() override {}
-  bool consumeMessage(std::string *) override { return true; }
+  void consumeMessage(std::string *buffer) override { assert(buffer); }
+
+private:
+  std::vector<int32_t> m_spec = {};
+  std::vector<float> m_tof = {};
 };
 
 // -----------------------------------------------------------------------------
@@ -55,7 +80,30 @@ class FakeISISRunInfoStreamSubscriber
     : public Mantid::LiveData::IKafkaStreamSubscriber {
 public:
   void subscribe() override {}
-  bool consumeMessage(std::string *) override { return true; }
+  void consumeMessage(std::string *buffer) override {
+    assert(buffer);
+
+    // Convert date to time_t
+    std::tm tmb;
+    strptime(m_startTime.c_str(), "%Y-%m-%dT%H:%M:%S", &tmb);
+    uint64_t startTime = static_cast<uint64_t>(std::mktime(&tmb));
+
+    // Serialize data with flatbuffers
+    flatbuffers::FlatBufferBuilder builder;
+    auto runInfo = ISISDAE::CreateRunInfo(builder, startTime, m_runNumber,
+                                          builder.CreateString(m_instName),
+                                          m_streamOffset);
+    builder.Finish(runInfo);
+    // Copy to provided buffer
+    buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
+                   builder.GetSize());
+  }
+
+private:
+  std::string m_startTime = "2016-08-31T12:07:42";
+  int32_t m_runNumber = 1000;
+  std::string m_instName = "HRPDTEST";
+  int64_t m_streamOffset = 0;
 };
 
 // -----------------------------------------------------------------------------
@@ -65,7 +113,25 @@ class FakeISISSpDetStreamSubscriber
     : public Mantid::LiveData::IKafkaStreamSubscriber {
 public:
   void subscribe() override {}
-  bool consumeMessage(std::string *) override { return true; }
+  void consumeMessage(std::string *buffer) override {
+    assert(buffer);
+
+    // Serialize data with flatbuffers
+    flatbuffers::FlatBufferBuilder builder;
+    auto specVector = builder.CreateVector(m_spec);
+    auto detIdsVector = builder.CreateVector(m_detid);
+    auto spdet = ISISDAE::CreateSpectraDetectorMapping(builder, specVector,
+                                                       detIdsVector);
+    builder.Finish(spdet);
+    // Copy to provided buffer
+    buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
+                   builder.GetSize());
+  }
+
+private:
+  std::vector<int32_t> m_spec = {1, 2, 3, 4, 5};
+  // These match the detector numbers in HRPDTEST_Definition.xml
+  std::vector<int32_t> m_detid = {1001, 1002, 1100, 901000, 10100};
 };
 }
 
diff --git a/instrument/IDFs_for_UNIT_TESTING/UnitTestFacilities.xml b/instrument/IDFs_for_UNIT_TESTING/UnitTestFacilities.xml
index 84a5d4a155a..92b5782dee8 100644
--- a/instrument/IDFs_for_UNIT_TESTING/UnitTestFacilities.xml
+++ b/instrument/IDFs_for_UNIT_TESTING/UnitTestFacilities.xml
@@ -27,6 +27,11 @@
     <livedata listener="FakeEventDataListener" address="127.0.0.1:0" />
   </instrument>
 
+  <instrument name="HRPDTEST">
+    <technique>Test Listener</technique>
+    <livedata listener="ISISKafkaEventListener" address="127.0.0.1:0" />
+  </instrument>
+
 </facility>
 
 </facilities>
-- 
GitLab