Skip to content
Snippets Groups Projects
Commit 60dff7f1 authored by Martyn Gigg's avatar Martyn Gigg
Browse files

Add unit tests using faked data sources

parent 917b7caf
No related merge requests found
......@@ -71,6 +71,10 @@ if ( LIBRDKAFKA_FOUND )
inc/MantidLiveData/Kafka/IKafkaBroker.h
inc/MantidLiveData/Kafka/KafkaBroker.h
inc/MantidLiveData/Kafka/KafkaTopicSubscriber.h
src/ISIS/private/Kafka/Schema/flatbuffers/flatbuffers.h
src/ISIS/private/Kafka/Schema/det_spec_mapping_schema_generated.h
src/ISIS/private/Kafka/Schema/event_schema_generated.h
src/ISIS/private/Kafka/Schema/run_info_schema_generated.h
)
set ( TEST_FILES
${TEST_FILES}
......
......@@ -24,6 +24,7 @@ if ( CXXTEST_FOUND )
${Boost_LIBRARIES}
${POCO_LIBRARIES}
${GMOCK_LIBRARIES} )
target_include_directories ( LiveDataTest PRIVATE ../src/ )
add_dependencies ( LiveDataTest DataHandling Algorithms MDAlgorithms )
add_dependencies ( FrameworkTests LiveDataTest )
# Test data
......
......@@ -4,9 +4,13 @@
#include <cxxtest/TestSuite.h>
#include "MantidLiveData/ISIS/ISISKafkaEventStreamDecoder.h"
#include "MantidAPI/Run.h"
#include "MantidGeometry/Instrument.h"
#include "MantidKernel/ConfigService.h"
#include "MantidKernel/make_unique.h"
#include "ISISKafkaTesting.h"
#include <Poco/Path.h>
#include <thread>
class ISISKafkaEventStreamDecoderTest : public CxxTest::TestSuite {
......@@ -20,23 +24,83 @@ public:
delete suite;
}
void setUp() override {
// Temporarily change the instrument directory to the testing one
using Mantid::Kernel::ConfigService;
auto &config = ConfigService::Instance();
auto baseInstDir = config.getInstrumentDirectory();
Poco::Path testFile =
Poco::Path(baseInstDir)
.resolve("IDFs_for_UNIT_TESTING/UnitTestFacilities.xml");
// Load the test facilities file
config.updateFacilities(testFile.toString());
config.setFacility("TEST");
// Update instrument search directory
config.setString("instrumentDefinition.directory",
baseInstDir + "/IDFs_for_UNIT_TESTING");
}
void tearDown() override {
using Mantid::Kernel::ConfigService;
auto &config = ConfigService::Instance();
config.reset();
// Restore the main facilities file
config.updateFacilities();
}
//----------------------------------------------------------------------------
// Success tests
//----------------------------------------------------------------------------
void xtest_Single_Period_Event_Stream() {
void test_Single_Period_Event_Stream() {
using namespace ::testing;
using namespace ISISKafkaTesting;
using Mantid::API::Workspace_sptr;
using Mantid::DataObjects::EventWorkspace;
using namespace Mantid::LiveData;
MockKafkaBroker mockBroker;
EXPECT_CALL(mockBroker, subscribe_(_))
.Times(Exactly(3))
.WillOnce(Return(new FakeISISSinglePeriodStreamSubscriber))
.WillOnce(Return(new FakeISISSinglePeriodEventSubscriber))
.WillOnce(Return(new FakeISISRunInfoStreamSubscriber))
.WillOnce(Return(new FakeISISSpDetStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
TS_ASSERT_THROWS_NOTHING(decoder->startCapture());
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
std::this_thread::sleep_for(std::chrono::milliseconds(500));
Workspace_sptr workspace;
TS_ASSERT_THROWS_NOTHING(workspace = decoder->extractData());
TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
TS_ASSERT(!decoder->isRunning());
// Workspace checks
TS_ASSERT(workspace);
auto eventWksp = boost::dynamic_pointer_cast<EventWorkspace>(workspace);
TS_ASSERT(eventWksp);
// Metadata
TS_ASSERT(eventWksp->getInstrument());
TS_ASSERT_EQUALS("HRPDTEST", eventWksp->getInstrument()->getName());
TS_ASSERT_EQUALS(
"2016-08-31T12:07:42",
eventWksp->run().getPropertyValueAsType<std::string>("run_start"));
// Data
TS_ASSERT_EQUALS(5, eventWksp->getNumberHistograms());
// A timer-based test so only check we actually got something
TS_ASSERT(eventWksp->getNumberEvents() > 0);
}
void test_Empty_Event_Stream_Waits() {
using namespace ::testing;
using namespace ISISKafkaTesting;
MockKafkaBroker mockBroker;
EXPECT_CALL(mockBroker, subscribe_(_))
.Times(Exactly(3))
.WillOnce(Return(new FakeEmptyStreamSubscriber))
.WillOnce(Return(new FakeISISRunInfoStreamSubscriber))
.WillOnce(Return(new FakeISISSpDetStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
TS_ASSERT_THROWS_NOTHING(decoder->startCapture());
std::this_thread::sleep_for(std::chrono::milliseconds(500));
TS_ASSERT_THROWS_NOTHING(decoder->extractData());
TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
TS_ASSERT(!decoder->isRunning());
......@@ -45,15 +109,33 @@ public:
//----------------------------------------------------------------------------
// Failure tests
//----------------------------------------------------------------------------
void test_Empty_Stream_Throws_Error_On_ExtractData() {
void test_Error_In_Stream_Extraction_Throws_Error_On_ExtractData() {
using namespace ::testing;
using namespace ISISKafkaTesting;
MockKafkaBroker mockBroker;
EXPECT_CALL(mockBroker, subscribe_(_))
.Times(Exactly(3))
.WillOnce(Return(new FakeEmptyStreamSubscriber))
.WillOnce(Return(new FakeEmptyStreamSubscriber))
.WillOnce(Return(new FakeExceptionThrowingStreamSubscriber))
.WillOnce(Return(new FakeExceptionThrowingStreamSubscriber))
.WillOnce(Return(new FakeExceptionThrowingStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
TS_ASSERT_THROWS_NOTHING(decoder->startCapture());
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
TS_ASSERT_THROWS(decoder->extractData(), std::runtime_error);
TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
TS_ASSERT(!decoder->isRunning());
}
void test_Empty_SpDet_Stream_Throws_Error_On_ExtractData() {
using namespace ::testing;
using namespace ISISKafkaTesting;
MockKafkaBroker mockBroker;
EXPECT_CALL(mockBroker, subscribe_(_))
.Times(Exactly(3))
.WillOnce(Return(new FakeISISSinglePeriodEventSubscriber))
.WillOnce(Return(new FakeISISRunInfoStreamSubscriber))
.WillOnce(Return(new FakeEmptyStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
TS_ASSERT_THROWS_NOTHING(decoder->startCapture());
......@@ -63,6 +145,24 @@ public:
TS_ASSERT(!decoder->isRunning());
}
void test_Empty_RunInfo_Stream_Throws_Error_On_ExtractData() {
using namespace ::testing;
using namespace ISISKafkaTesting;
MockKafkaBroker mockBroker;
EXPECT_CALL(mockBroker, subscribe_(_))
.Times(Exactly(3))
.WillOnce(Return(new FakeISISSinglePeriodEventSubscriber))
.WillOnce(Return(new FakeEmptyStreamSubscriber))
.WillOnce(Return(new FakeISISSpDetStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
TS_ASSERT_THROWS_NOTHING(decoder->startCapture());
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
TS_ASSERT_THROWS(decoder->extractData(), std::runtime_error);
TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
TS_ASSERT(!decoder->isRunning());
}
private:
std::unique_ptr<Mantid::LiveData::ISISKafkaEventStreamDecoder>
createTestDecoder(const Mantid::LiveData::IKafkaBroker &broker) {
......
......@@ -5,6 +5,14 @@
#include "MantidKernel/WarningSuppressions.h"
#include <gmock/gmock.h>
GCC_DIAG_OFF(conversion)
#include "ISIS/private/Kafka/Schema/det_spec_mapping_schema_generated.h"
#include "ISIS/private/Kafka/Schema/event_schema_generated.h"
#include "ISIS/private/Kafka/Schema/run_info_schema_generated.h"
GCC_DIAG_ON(conversion)
#include <ctime>
namespace ISISKafkaTesting {
// -----------------------------------------------------------------------------
......@@ -29,23 +37,40 @@ public:
};
// -----------------------------------------------------------------------------
// Fake stream to provide empty data to tests
// Fake stream to raise error to tests
// -----------------------------------------------------------------------------
class FakeExceptionThrowingStreamSubscriber
: public Mantid::LiveData::IKafkaStreamSubscriber {
public:
void subscribe() override {}
void consumeMessage(std::string *buffer) override {
buffer->clear();
throw std::runtime_error("FakeExceptionThrowingStreamSubscriber");
}
};
// -----------------------------------------------------------------------------
// Fake stream to provide empty stream to client
// -----------------------------------------------------------------------------
class FakeEmptyStreamSubscriber
: public Mantid::LiveData::IKafkaStreamSubscriber {
public:
void subscribe() override {}
bool consumeMessage(std::string *) override { return false; }
void consumeMessage(std::string *buffer) override { buffer->clear(); }
};
// -----------------------------------------------------------------------------
// Fake ISIS event stream to provide event data
// -----------------------------------------------------------------------------
class FakeISISSinglePeriodStreamSubscriber
class FakeISISSinglePeriodEventSubscriber
: public Mantid::LiveData::IKafkaStreamSubscriber {
public:
void subscribe() override {}
bool consumeMessage(std::string *) override { return true; }
void consumeMessage(std::string *buffer) override { assert(buffer); }
private:
std::vector<int32_t> m_spec = {};
std::vector<float> m_tof = {};
};
// -----------------------------------------------------------------------------
......@@ -55,7 +80,30 @@ class FakeISISRunInfoStreamSubscriber
: public Mantid::LiveData::IKafkaStreamSubscriber {
public:
void subscribe() override {}
bool consumeMessage(std::string *) override { return true; }
void consumeMessage(std::string *buffer) override {
assert(buffer);
// Convert date to time_t
std::tm tmb;
strptime(m_startTime.c_str(), "%Y-%m-%dT%H:%M:%S", &tmb);
uint64_t startTime = static_cast<uint64_t>(std::mktime(&tmb));
// Serialize data with flatbuffers
flatbuffers::FlatBufferBuilder builder;
auto runInfo = ISISDAE::CreateRunInfo(builder, startTime, m_runNumber,
builder.CreateString(m_instName),
m_streamOffset);
builder.Finish(runInfo);
// Copy to provided buffer
buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
builder.GetSize());
}
private:
std::string m_startTime = "2016-08-31T12:07:42";
int32_t m_runNumber = 1000;
std::string m_instName = "HRPDTEST";
int64_t m_streamOffset = 0;
};
// -----------------------------------------------------------------------------
......@@ -65,7 +113,25 @@ class FakeISISSpDetStreamSubscriber
: public Mantid::LiveData::IKafkaStreamSubscriber {
public:
void subscribe() override {}
bool consumeMessage(std::string *) override { return true; }
void consumeMessage(std::string *buffer) override {
assert(buffer);
// Serialize data with flatbuffers
flatbuffers::FlatBufferBuilder builder;
auto specVector = builder.CreateVector(m_spec);
auto detIdsVector = builder.CreateVector(m_detid);
auto spdet = ISISDAE::CreateSpectraDetectorMapping(builder, specVector,
detIdsVector);
builder.Finish(spdet);
// Copy to provided buffer
buffer->assign(reinterpret_cast<const char *>(builder.GetBufferPointer()),
builder.GetSize());
}
private:
std::vector<int32_t> m_spec = {1, 2, 3, 4, 5};
// These match the detector numbers in HRPDTEST_Definition.xml
std::vector<int32_t> m_detid = {1001, 1002, 1100, 901000, 10100};
};
}
......
......@@ -27,6 +27,11 @@
<livedata listener="FakeEventDataListener" address="127.0.0.1:0" />
</instrument>
<instrument name="HRPDTEST">
<technique>Test Listener</technique>
<livedata listener="ISISKafkaEventListener" address="127.0.0.1:0" />
</instrument>
</facility>
</facilities>
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment