diff --git a/Framework/LiveData/test/KafkaEventStreamDecoderTest.h b/Framework/LiveData/test/KafkaEventStreamDecoderTest.h index 244435a658bf8c6b988ef5b672cca4226e221a57..a50564e62c1e0254636cabdb7afe59a39da8705c 100644 --- a/Framework/LiveData/test/KafkaEventStreamDecoderTest.h +++ b/Framework/LiveData/test/KafkaEventStreamDecoderTest.h @@ -19,8 +19,11 @@ #include <Poco/Path.h> #include <condition_variable> +#include <iostream> #include <thread> +using Mantid::LiveData::KafkaEventStreamDecoder; + class KafkaEventStreamDecoderTest : public CxxTest::TestSuite { public: // This pair of boilerplate methods prevent the suite being created statically @@ -512,38 +515,49 @@ private: void startCapturing(Mantid::LiveData::KafkaEventStreamDecoder &decoder, uint8_t maxIterations) { // Register callback to know when a whole loop as been iterated through + m_maxIterations = maxIterations; m_niterations = 0; - auto callback = [this, maxIterations]() { - this->iterationCallback(maxIterations); - }; - decoder.registerIterationEndCb(callback); - decoder.registerErrorCb(callback); + decoder.registerIterationEndCb([this]() { this->iterationCallback(); }); + + decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); }); TS_ASSERT_THROWS_NOTHING(decoder.startCapture()); continueCapturing(decoder, maxIterations); } - void iterationCallback(uint8_t maxIterations) { + void iterationCallback() { std::unique_lock<std::mutex> lock(this->m_callbackMutex); this->m_niterations++; - if (this->m_niterations == maxIterations) { + if (this->m_niterations == m_maxIterations) { lock.unlock(); this->m_callbackCondition.notify_one(); } } - void continueCapturing(Mantid::LiveData::KafkaEventStreamDecoder &decoder, + void errCallback(KafkaEventStreamDecoder &decoder) { + try { + // Get the stored exception by calling extract data again + decoder.extractData(); + } catch (std::exception &e) { + // We could try to port the exception from this child thread to the main + // thread, or just print it here which is significantly easier + std::cerr << "Exception: " << e.what() << "\n"; + // Always keep incrementing so we don't deadlock + iterationCallback(); + } + } + + void continueCapturing(KafkaEventStreamDecoder &decoder, uint8_t maxIterations) { + m_maxIterations = maxIterations; + // Re-register callback with the (potentially) new value of maxIterations - auto callback = [this, maxIterations]() { - this->iterationCallback(maxIterations); - }; - decoder.registerIterationEndCb(callback); - decoder.registerErrorCb(callback); + decoder.registerIterationEndCb([this]() { this->iterationCallback(); }); + decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); }); + { std::unique_lock<std::mutex> lk(m_callbackMutex); - this->m_callbackCondition.wait(lk, [this, maxIterations]() { - return this->m_niterations == maxIterations; - }); + this->m_callbackCondition.wait( + lk, [this]() { return this->m_niterations == m_maxIterations; }); } } @@ -574,8 +588,8 @@ private: void checkWorkspaceEventData( const Mantid::DataObjects::EventWorkspace &eventWksp) { - // A timer-based test and each message contains 6 events so the total should - // be divisible by 6, but not be 0 + // A timer-based test and each message contains 6 events so the total + // should be divisible by 6, but not be 0 TS_ASSERT(eventWksp.getNumberEvents() % 6 == 0); TS_ASSERT(eventWksp.getNumberEvents() != 0); } @@ -597,4 +611,5 @@ private: std::mutex m_callbackMutex; std::condition_variable m_callbackCondition; uint8_t m_niterations = 0; + std::atomic<uint8_t> m_maxIterations = 0; };