diff --git a/Framework/LiveData/CMakeLists.txt b/Framework/LiveData/CMakeLists.txt index 0f834d82c43a85f908b6041ee6dd08e705261f4d..17d3182960f9b735c0979123e6ed858c8f0947e3 100644 --- a/Framework/LiveData/CMakeLists.txt +++ b/Framework/LiveData/CMakeLists.txt @@ -71,6 +71,10 @@ if ( LIBRDKAFKA_FOUND ) inc/MantidLiveData/Kafka/IKafkaBroker.h inc/MantidLiveData/Kafka/KafkaBroker.h inc/MantidLiveData/Kafka/KafkaTopicSubscriber.h + src/ISIS/private/Kafka/Schema/flatbuffers/flatbuffers.h + src/ISIS/private/Kafka/Schema/det_spec_mapping_schema_generated.h + src/ISIS/private/Kafka/Schema/event_schema_generated.h + src/ISIS/private/Kafka/Schema/run_info_schema_generated.h ) set ( TEST_FILES ${TEST_FILES} diff --git a/Framework/LiveData/test/CMakeLists.txt b/Framework/LiveData/test/CMakeLists.txt index ffb86fcaf9730ce31f4025577b42638a0d467d77..ef70f7721abd19bf3c8d6dd58bb2a64ba6ab62d9 100644 --- a/Framework/LiveData/test/CMakeLists.txt +++ b/Framework/LiveData/test/CMakeLists.txt @@ -24,6 +24,7 @@ if ( CXXTEST_FOUND ) ${Boost_LIBRARIES} ${POCO_LIBRARIES} ${GMOCK_LIBRARIES} ) + target_include_directories ( LiveDataTest PRIVATE ../src/ ) add_dependencies ( LiveDataTest DataHandling Algorithms MDAlgorithms ) add_dependencies ( FrameworkTests LiveDataTest ) # Test data diff --git a/Framework/LiveData/test/ISISKafkaEventStreamDecoderTest.h b/Framework/LiveData/test/ISISKafkaEventStreamDecoderTest.h index 55d0bbcbdcd905d672414d492bbca5831f2126bd..afacfeee9e2cf54b8f5a58f9702d242585e6f60f 100644 --- a/Framework/LiveData/test/ISISKafkaEventStreamDecoderTest.h +++ b/Framework/LiveData/test/ISISKafkaEventStreamDecoderTest.h @@ -4,9 +4,13 @@ #include <cxxtest/TestSuite.h> #include "MantidLiveData/ISIS/ISISKafkaEventStreamDecoder.h" +#include "MantidAPI/Run.h" +#include "MantidGeometry/Instrument.h" +#include "MantidKernel/ConfigService.h" #include "MantidKernel/make_unique.h" #include "ISISKafkaTesting.h" +#include <Poco/Path.h> #include <thread> class ISISKafkaEventStreamDecoderTest : public CxxTest::TestSuite { @@ -20,23 +24,83 @@ public: delete suite; } + void setUp() override { + // Temporarily change the instrument directory to the testing one + using Mantid::Kernel::ConfigService; + auto &config = ConfigService::Instance(); + auto baseInstDir = config.getInstrumentDirectory(); + Poco::Path testFile = + Poco::Path(baseInstDir) + .resolve("IDFs_for_UNIT_TESTING/UnitTestFacilities.xml"); + // Load the test facilities file + config.updateFacilities(testFile.toString()); + config.setFacility("TEST"); + // Update instrument search directory + config.setString("instrumentDefinition.directory", + baseInstDir + "/IDFs_for_UNIT_TESTING"); + } + + void tearDown() override { + using Mantid::Kernel::ConfigService; + auto &config = ConfigService::Instance(); + config.reset(); + // Restore the main facilities file + config.updateFacilities(); + } + //---------------------------------------------------------------------------- // Success tests //---------------------------------------------------------------------------- - void xtest_Single_Period_Event_Stream() { + void test_Single_Period_Event_Stream() { using namespace ::testing; using namespace ISISKafkaTesting; + using Mantid::API::Workspace_sptr; + using Mantid::DataObjects::EventWorkspace; using namespace Mantid::LiveData; MockKafkaBroker mockBroker; EXPECT_CALL(mockBroker, subscribe_(_)) .Times(Exactly(3)) - .WillOnce(Return(new FakeISISSinglePeriodStreamSubscriber)) + .WillOnce(Return(new FakeISISSinglePeriodEventSubscriber)) .WillOnce(Return(new FakeISISRunInfoStreamSubscriber)) .WillOnce(Return(new FakeISISSpDetStreamSubscriber)); auto decoder = createTestDecoder(mockBroker); TS_ASSERT_THROWS_NOTHING(decoder->startCapture()); - std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + Workspace_sptr workspace; + TS_ASSERT_THROWS_NOTHING(workspace = decoder->extractData()); + TS_ASSERT_THROWS_NOTHING(decoder->stopCapture()); + TS_ASSERT(!decoder->isRunning()); + + // Workspace checks + TS_ASSERT(workspace); + auto eventWksp = boost::dynamic_pointer_cast<EventWorkspace>(workspace); + TS_ASSERT(eventWksp); + // Metadata + TS_ASSERT(eventWksp->getInstrument()); + TS_ASSERT_EQUALS("HRPDTEST", eventWksp->getInstrument()->getName()); + TS_ASSERT_EQUALS( + "2016-08-31T12:07:42", + eventWksp->run().getPropertyValueAsType<std::string>("run_start")); + // Data + TS_ASSERT_EQUALS(5, eventWksp->getNumberHistograms()); + // A timer-based test so only check we actually got something + TS_ASSERT(eventWksp->getNumberEvents() > 0); + } + + void test_Empty_Event_Stream_Waits() { + using namespace ::testing; + using namespace ISISKafkaTesting; + + MockKafkaBroker mockBroker; + EXPECT_CALL(mockBroker, subscribe_(_)) + .Times(Exactly(3)) + .WillOnce(Return(new FakeEmptyStreamSubscriber)) + .WillOnce(Return(new FakeISISRunInfoStreamSubscriber)) + .WillOnce(Return(new FakeISISSpDetStreamSubscriber)); + auto decoder = createTestDecoder(mockBroker); + TS_ASSERT_THROWS_NOTHING(decoder->startCapture()); + std::this_thread::sleep_for(std::chrono::milliseconds(500)); TS_ASSERT_THROWS_NOTHING(decoder->extractData()); TS_ASSERT_THROWS_NOTHING(decoder->stopCapture()); TS_ASSERT(!decoder->isRunning()); @@ -45,15 +109,33 @@ public: //---------------------------------------------------------------------------- // Failure tests //---------------------------------------------------------------------------- - void test_Empty_Stream_Throws_Error_On_ExtractData() { + void test_Error_In_Stream_Extraction_Throws_Error_On_ExtractData() { using namespace ::testing; using namespace ISISKafkaTesting; MockKafkaBroker mockBroker; EXPECT_CALL(mockBroker, subscribe_(_)) .Times(Exactly(3)) - .WillOnce(Return(new FakeEmptyStreamSubscriber)) - .WillOnce(Return(new FakeEmptyStreamSubscriber)) + .WillOnce(Return(new FakeExceptionThrowingStreamSubscriber)) + .WillOnce(Return(new FakeExceptionThrowingStreamSubscriber)) + .WillOnce(Return(new FakeExceptionThrowingStreamSubscriber)); + auto decoder = createTestDecoder(mockBroker); + TS_ASSERT_THROWS_NOTHING(decoder->startCapture()); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + TS_ASSERT_THROWS(decoder->extractData(), std::runtime_error); + TS_ASSERT_THROWS_NOTHING(decoder->stopCapture()); + TS_ASSERT(!decoder->isRunning()); + } + + void test_Empty_SpDet_Stream_Throws_Error_On_ExtractData() { + using namespace ::testing; + using namespace ISISKafkaTesting; + + MockKafkaBroker mockBroker; + EXPECT_CALL(mockBroker, subscribe_(_)) + .Times(Exactly(3)) + .WillOnce(Return(new FakeISISSinglePeriodEventSubscriber)) + .WillOnce(Return(new FakeISISRunInfoStreamSubscriber)) .WillOnce(Return(new FakeEmptyStreamSubscriber)); auto decoder = createTestDecoder(mockBroker); TS_ASSERT_THROWS_NOTHING(decoder->startCapture()); @@ -63,6 +145,24 @@ public: TS_ASSERT(!decoder->isRunning()); } + void test_Empty_RunInfo_Stream_Throws_Error_On_ExtractData() { + using namespace ::testing; + using namespace ISISKafkaTesting; + + MockKafkaBroker mockBroker; + EXPECT_CALL(mockBroker, subscribe_(_)) + .Times(Exactly(3)) + .WillOnce(Return(new FakeISISSinglePeriodEventSubscriber)) + .WillOnce(Return(new FakeEmptyStreamSubscriber)) + .WillOnce(Return(new FakeISISSpDetStreamSubscriber)); + auto decoder = createTestDecoder(mockBroker); + TS_ASSERT_THROWS_NOTHING(decoder->startCapture()); + std::this_thread::sleep_for(std::chrono::milliseconds(1000)); + TS_ASSERT_THROWS(decoder->extractData(), std::runtime_error); + TS_ASSERT_THROWS_NOTHING(decoder->stopCapture()); + TS_ASSERT(!decoder->isRunning()); + } + private: std::unique_ptr<Mantid::LiveData::ISISKafkaEventStreamDecoder> createTestDecoder(const Mantid::LiveData::IKafkaBroker &broker) { diff --git a/Framework/LiveData/test/ISISKafkaTesting.h b/Framework/LiveData/test/ISISKafkaTesting.h index 7246b62b2f2be6b863860c950639e3859c2fa69c..5cd9899a0dcf2be92880322fe507459d072dad07 100644 --- a/Framework/LiveData/test/ISISKafkaTesting.h +++ b/Framework/LiveData/test/ISISKafkaTesting.h @@ -5,6 +5,14 @@ #include "MantidKernel/WarningSuppressions.h" #include <gmock/gmock.h> +GCC_DIAG_OFF(conversion) +#include "ISIS/private/Kafka/Schema/det_spec_mapping_schema_generated.h" +#include "ISIS/private/Kafka/Schema/event_schema_generated.h" +#include "ISIS/private/Kafka/Schema/run_info_schema_generated.h" +GCC_DIAG_ON(conversion) + +#include <ctime> + namespace ISISKafkaTesting { // ----------------------------------------------------------------------------- @@ -29,23 +37,40 @@ public: }; // ----------------------------------------------------------------------------- -// Fake stream to provide empty data to tests +// Fake stream to raise error to tests +// ----------------------------------------------------------------------------- +class FakeExceptionThrowingStreamSubscriber + : public Mantid::LiveData::IKafkaStreamSubscriber { +public: + void subscribe() override {} + void consumeMessage(std::string *buffer) override { + buffer->clear(); + throw std::runtime_error("FakeExceptionThrowingStreamSubscriber"); + } +}; + +// ----------------------------------------------------------------------------- +// Fake stream to provide empty stream to client // ----------------------------------------------------------------------------- class FakeEmptyStreamSubscriber : public Mantid::LiveData::IKafkaStreamSubscriber { public: void subscribe() override {} - bool consumeMessage(std::string *) override { return false; } + void consumeMessage(std::string *buffer) override { buffer->clear(); } }; // ----------------------------------------------------------------------------- // Fake ISIS event stream to provide event data // ----------------------------------------------------------------------------- -class FakeISISSinglePeriodStreamSubscriber +class FakeISISSinglePeriodEventSubscriber : public Mantid::LiveData::IKafkaStreamSubscriber { public: void subscribe() override {} - bool consumeMessage(std::string *) override { return true; } + void consumeMessage(std::string *buffer) override { assert(buffer); } + +private: + std::vector<int32_t> m_spec = {}; + std::vector<float> m_tof = {}; }; // ----------------------------------------------------------------------------- @@ -55,7 +80,30 @@ class FakeISISRunInfoStreamSubscriber : public Mantid::LiveData::IKafkaStreamSubscriber { public: void subscribe() override {} - bool consumeMessage(std::string *) override { return true; } + void consumeMessage(std::string *buffer) override { + assert(buffer); + + // Convert date to time_t + std::tm tmb; + strptime(m_startTime.c_str(), "%Y-%m-%dT%H:%M:%S", &tmb); + uint64_t startTime = static_cast<uint64_t>(std::mktime(&tmb)); + + // Serialize data with flatbuffers + flatbuffers::FlatBufferBuilder builder; + auto runInfo = ISISDAE::CreateRunInfo(builder, startTime, m_runNumber, + builder.CreateString(m_instName), + m_streamOffset); + builder.Finish(runInfo); + // Copy to provided buffer + buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()), + builder.GetSize()); + } + +private: + std::string m_startTime = "2016-08-31T12:07:42"; + int32_t m_runNumber = 1000; + std::string m_instName = "HRPDTEST"; + int64_t m_streamOffset = 0; }; // ----------------------------------------------------------------------------- @@ -65,7 +113,25 @@ class FakeISISSpDetStreamSubscriber : public Mantid::LiveData::IKafkaStreamSubscriber { public: void subscribe() override {} - bool consumeMessage(std::string *) override { return true; } + void consumeMessage(std::string *buffer) override { + assert(buffer); + + // Serialize data with flatbuffers + flatbuffers::FlatBufferBuilder builder; + auto specVector = builder.CreateVector(m_spec); + auto detIdsVector = builder.CreateVector(m_detid); + auto spdet = ISISDAE::CreateSpectraDetectorMapping(builder, specVector, + detIdsVector); + builder.Finish(spdet); + // Copy to provided buffer + buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()), + builder.GetSize()); + } + +private: + std::vector<int32_t> m_spec = {1, 2, 3, 4, 5}; + // These match the detector numbers in HRPDTEST_Definition.xml + std::vector<int32_t> m_detid = {1001, 1002, 1100, 901000, 10100}; }; } diff --git a/instrument/IDFs_for_UNIT_TESTING/UnitTestFacilities.xml b/instrument/IDFs_for_UNIT_TESTING/UnitTestFacilities.xml index 84a5d4a155a9e18dad28851361897543c9efc08f..92b5782dee83595b95a7d4d122219d068612dad4 100644 --- a/instrument/IDFs_for_UNIT_TESTING/UnitTestFacilities.xml +++ b/instrument/IDFs_for_UNIT_TESTING/UnitTestFacilities.xml @@ -27,6 +27,11 @@ <livedata listener="FakeEventDataListener" address="127.0.0.1:0" /> </instrument> + <instrument name="HRPDTEST"> + <technique>Test Listener</technique> + <livedata listener="ISISKafkaEventListener" address="127.0.0.1:0" /> + </instrument> + </facility> </facilities>