Newer
Older
#ifndef MANTID_LIVEDATA_ISISKAFKAEVENTSTREAMDECODERTEST_H_
#define MANTID_LIVEDATA_ISISKAFKAEVENTSTREAMDECODERTEST_H_
#include <cxxtest/TestSuite.h>
#include "MantidAPI/WorkspaceGroup.h"
#include "MantidGeometry/Instrument.h"
#include "MantidKernel/ConfigService.h"
#include "MantidKernel/TimeSeriesProperty.h"
#include "MantidKernel/make_unique.h"
#include "MantidLiveData/Kafka/KafkaEventStreamDecoder.h"
class KafkaEventStreamDecoderTest : public CxxTest::TestSuite {
public:
// This pair of boilerplate methods prevent the suite being created statically
// This means the constructor isn't called when running other tests
static KafkaEventStreamDecoderTest *createSuite() {
return new KafkaEventStreamDecoderTest();
static void destroySuite(KafkaEventStreamDecoderTest *suite) { delete suite; }
void setUp() override {
// Temporarily change the instrument directory to the testing one
using Mantid::Kernel::ConfigService;
auto &config = ConfigService::Instance();
auto baseInstDir = config.getInstrumentDirectory();
Poco::Path testFile =
Poco::Path(baseInstDir)
.resolve("IDFs_for_UNIT_TESTING/UnitTestFacilities.xml");
// Load the test facilities file
config.updateFacilities(testFile.toString());
config.setFacility("TEST");
// Update instrument search directory
config.setString("instrumentDefinition.directory",
baseInstDir + "/IDFs_for_UNIT_TESTING");
}
void tearDown() override {
using Mantid::Kernel::ConfigService;
auto &config = ConfigService::Instance();
config.reset();
// Restore the main facilities file
config.updateFacilities();
}
//----------------------------------------------------------------------------
// Success tests
//----------------------------------------------------------------------------
void test_Single_Period_Event_Stream() {
using namespace ::testing;
using namespace ISISKafkaTesting;
using Mantid::API::Workspace_sptr;
using Mantid::DataObjects::EventWorkspace;
using namespace Mantid::LiveData;
auto mockBroker = std::make_shared<MockKafkaBroker>();
.WillOnce(Return(new FakeISISEventSubscriber(1)))
.WillOnce(Return(new FakeISISRunInfoStreamSubscriber(1)))
.WillOnce(Return(new FakeISISSpDetStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
TSM_ASSERT("Decoder should not have create data buffers yet",
!decoder->hasData());
startCapturing(*decoder, 1);
// Checks
TSM_ASSERT("Decoder's data buffers should be created now",
decoder->hasData());
TS_ASSERT_THROWS_NOTHING(workspace = decoder->extractData());
TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
TS_ASSERT(!decoder->isCapturing());
TSM_ASSERT("Expected non-null workspace pointer from extractData()",
workspace);
auto eventWksp = boost::dynamic_pointer_cast<EventWorkspace>(workspace);
"Expected an EventWorkspace from extractData(). Found something else",
eventWksp);
checkWorkspaceMetadata(*eventWksp);
checkWorkspaceEventData(*eventWksp);
void test_Multiple_Period_Event_Stream() {
using namespace ::testing;
using namespace ISISKafkaTesting;
using Mantid::API::Workspace_sptr;
using Mantid::API::WorkspaceGroup;
using Mantid::DataObjects::EventWorkspace;
using namespace Mantid::LiveData;
auto mockBroker = std::make_shared<MockKafkaBroker>();
.WillOnce(Return(new FakeISISEventSubscriber(2)))
.WillOnce(Return(new FakeISISRunInfoStreamSubscriber(2)))
.WillOnce(Return(new FakeISISSpDetStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
// Need 2 full loops to get both periods
startCapturing(*decoder, 2);
TS_ASSERT_THROWS_NOTHING(workspace = decoder->extractData());
TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
TS_ASSERT(!decoder->isCapturing());
// --- Workspace checks ---
TSM_ASSERT("Expected non-null workspace pointer from extractData()",
workspace);
auto group = boost::dynamic_pointer_cast<WorkspaceGroup>(workspace);
TSM_ASSERT(
"Expected a WorkspaceGroup from extractData(). Found something else.",
group);
TS_ASSERT_EQUALS(2, group->size());
for (size_t i = 0; i < 2; ++i) {
auto eventWksp =
boost::dynamic_pointer_cast<EventWorkspace>(group->getItem(i));
TSM_ASSERT("Expected an EventWorkspace for each member of the group",
eventWksp);
checkWorkspaceMetadata(*eventWksp);
checkWorkspaceEventData(*eventWksp);
}
void test_Sample_Log_From_Event_Stream() {
using namespace ::testing;
using namespace ISISKafkaTesting;
using Mantid::API::Workspace_sptr;
using Mantid::DataObjects::EventWorkspace;
using namespace Mantid::LiveData;
auto mockBroker = std::make_shared<MockKafkaBroker>();
EXPECT_CALL(*mockBroker, subscribe_(_, _))
.WillOnce(Return(new FakeSampleEnvironmentSubscriber))
.WillOnce(Return(new FakeISISRunInfoStreamSubscriber(1)))
.WillOnce(Return(new FakeISISSpDetStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
TSM_ASSERT("Decoder should not have create data buffers yet",
!decoder->hasData());
startCapturing(*decoder, 1);
Workspace_sptr workspace;
TS_ASSERT_THROWS_NOTHING(workspace = decoder->extractData());
TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
TS_ASSERT(!decoder->isCapturing());
// -- Workspace checks --
TSM_ASSERT("Expected non-null workspace pointer from extractData()",
workspace);
auto eventWksp = boost::dynamic_pointer_cast<EventWorkspace>(workspace);
TSM_ASSERT(
"Expected an EventWorkspace from extractData(). Found something else",
eventWksp);
checkWorkspaceLogData(*eventWksp);
}
void test_Empty_Event_Stream_Waits() {
using namespace ::testing;
using namespace ISISKafkaTesting;
auto mockBroker = std::make_shared<MockKafkaBroker>();
.Times(Exactly(3))
.WillOnce(Return(new FakeEmptyStreamSubscriber))
.WillOnce(Return(new FakeISISRunInfoStreamSubscriber(1)))
.WillOnce(Return(new FakeISISSpDetStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
TS_ASSERT_THROWS_NOTHING(decoder->extractData());
TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
TS_ASSERT(!decoder->isCapturing());
}
//----------------------------------------------------------------------------
// Failure tests
//----------------------------------------------------------------------------
void test_Error_In_Stream_Extraction_Throws_Error_On_ExtractData() {
using namespace ::testing;
using namespace ISISKafkaTesting;
auto mockBroker = std::make_shared<MockKafkaBroker>();
.WillOnce(Return(new FakeExceptionThrowingStreamSubscriber))
.WillOnce(Return(new FakeExceptionThrowingStreamSubscriber))
.WillOnce(Return(new FakeExceptionThrowingStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
TS_ASSERT_THROWS(decoder->extractData(), std::runtime_error);
TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
TS_ASSERT(!decoder->isCapturing());
}
void test_Empty_SpDet_Stream_Throws_Error_On_ExtractData() {
using namespace ::testing;
using namespace ISISKafkaTesting;
auto mockBroker = std::make_shared<MockKafkaBroker>();
.WillOnce(Return(new FakeISISEventSubscriber(1)))
.WillOnce(Return(new FakeISISRunInfoStreamSubscriber(1)))
.WillOnce(Return(new FakeEmptyStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
TS_ASSERT_THROWS(decoder->extractData(), std::runtime_error);
TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
TS_ASSERT(!decoder->isCapturing());
void test_Empty_RunInfo_Stream_Throws_Error_On_ExtractData() {
using namespace ::testing;
using namespace ISISKafkaTesting;
auto mockBroker = std::make_shared<MockKafkaBroker>();
.WillOnce(Return(new FakeISISEventSubscriber(1)))
.WillOnce(Return(new FakeEmptyStreamSubscriber))
.WillOnce(Return(new FakeISISSpDetStreamSubscriber));
auto decoder = createTestDecoder(mockBroker);
TS_ASSERT_THROWS(decoder->extractData(), std::runtime_error);
TS_ASSERT_THROWS_NOTHING(decoder->stopCapture());
TS_ASSERT(!decoder->isCapturing());
// Start decoding and wait until we have gathered enough data to test
void startCapturing(Mantid::LiveData::KafkaEventStreamDecoder &decoder,
uint8_t maxIterations) {
// Register callback to know when a whole loop as been iterated through
m_niterations = 0;
auto callback = [this, maxIterations]() {
{
std::unique_lock<std::mutex> lock(this->m_callbackMutex);
this->m_niterations++;
if (this->m_niterations == maxIterations) {
lock.unlock();
this->m_callbackCondition.notify_one();
}
}
};
decoder.registerIterationEndCb(callback);
decoder.registerErrorCb(callback);
TS_ASSERT_THROWS_NOTHING(decoder.startCapture());
{
std::unique_lock<std::mutex> lk(m_callbackMutex);
this->m_callbackCondition.wait(lk, [this, maxIterations]() {
return this->m_niterations == maxIterations;
});
}
}
std::unique_ptr<Mantid::LiveData::KafkaEventStreamDecoder>
createTestDecoder(std::shared_ptr<Mantid::LiveData::IKafkaBroker> broker) {
using namespace Mantid::LiveData;
return Mantid::Kernel::make_unique<KafkaEventStreamDecoder>(broker, "", "",
void
checkWorkspaceMetadata(const Mantid::DataObjects::EventWorkspace &eventWksp) {
TS_ASSERT(eventWksp.getInstrument());
TS_ASSERT_EQUALS("HRPDTEST", eventWksp.getInstrument()->getName());
TS_ASSERT_EQUALS(
"2016-08-31T12:07:42",
eventWksp.run().getPropertyValueAsType<std::string>("run_start"));
std::array<Mantid::specnum_t, 5> specs = {{1, 2, 3, 4, 5}};
std::array<Mantid::detid_t, 5> ids = {{1001, 1002, 1100, 901000, 10100}};
TS_ASSERT_EQUALS(specs.size(), eventWksp.getNumberHistograms());
for (size_t i = 0; i < eventWksp.getNumberHistograms(); ++i) {
const auto &spec = eventWksp.getSpectrum(i);
TS_ASSERT_EQUALS(specs[i], spec.getSpectrumNo());
const auto &sid = spec.getDetectorIDs();
TS_ASSERT_EQUALS(ids[i], *(sid.begin()));
}
}
void checkWorkspaceEventData(
const Mantid::DataObjects::EventWorkspace &eventWksp) {
// A timer-based test and each message contains 6 events so the total should
// be divisible by 6, but not be 0
TS_ASSERT(eventWksp.getNumberEvents() % 6 == 0);
TS_ASSERT(eventWksp.getNumberEvents() != 0);
void checkWorkspaceLogData(Mantid::DataObjects::EventWorkspace &eventWksp) {
Mantid::Kernel::TimeSeriesProperty<int32_t> *log = nullptr;
auto run = eventWksp.mutableRun();
// We should find a sample log with this name
TS_ASSERT_THROWS_NOTHING(
log = run.getTimeSeriesProperty<int32_t>("fake source"));
TS_ASSERT_EQUALS(log->firstTime().toISO8601String(),
"2017-05-24T09:29:48")
TS_ASSERT_EQUALS(log->firstValue(), 42)
private:
std::mutex m_callbackMutex;
std::condition_variable m_callbackCondition;
uint8_t m_niterations = 0;
};
#endif /* MANTID_LIVEDATA_ISISKAFKAEVENTSTREAMDECODERTEST_H_ */