Skip to content
Snippets Groups Projects
Commit 92455e43 authored by David Fairbrother's avatar David Fairbrother
Browse files

Emit Kafka exception when err callback used

Previously Kafka dropped the exception and just incremented using the
same callback our wait. This will now print the exception message
associated, so we can distinguish between a successful and failed
callback in unit test logs.

I could have made it so that the exception is thrown in the main thread
by taking a s_ptr. However it quickly becomes unweildly for a simple
probe, and we only gain the stack trace - which was already lost when
extractData re throws the exception.
parent ecd6193f
No related branches found
No related tags found
No related merge requests found
......@@ -19,8 +19,11 @@
#include <Poco/Path.h>
#include <condition_variable>
#include <iostream>
#include <thread>
using Mantid::LiveData::KafkaEventStreamDecoder;
class KafkaEventStreamDecoderTest : public CxxTest::TestSuite {
public:
// This pair of boilerplate methods prevent the suite being created statically
......@@ -512,38 +515,49 @@ private:
void startCapturing(Mantid::LiveData::KafkaEventStreamDecoder &decoder,
uint8_t maxIterations) {
// Register callback to know when a whole loop as been iterated through
m_maxIterations = maxIterations;
m_niterations = 0;
auto callback = [this, maxIterations]() {
this->iterationCallback(maxIterations);
};
decoder.registerIterationEndCb(callback);
decoder.registerErrorCb(callback);
decoder.registerIterationEndCb([this]() { this->iterationCallback(); });
decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); });
TS_ASSERT_THROWS_NOTHING(decoder.startCapture());
continueCapturing(decoder, maxIterations);
}
void iterationCallback(uint8_t maxIterations) {
void iterationCallback() {
std::unique_lock<std::mutex> lock(this->m_callbackMutex);
this->m_niterations++;
if (this->m_niterations == maxIterations) {
if (this->m_niterations == m_maxIterations) {
lock.unlock();
this->m_callbackCondition.notify_one();
}
}
void continueCapturing(Mantid::LiveData::KafkaEventStreamDecoder &decoder,
void errCallback(KafkaEventStreamDecoder &decoder) {
try {
// Get the stored exception by calling extract data again
decoder.extractData();
} catch (std::exception &e) {
// We could try to port the exception from this child thread to the main
// thread, or just print it here which is significantly easier
std::cerr << "Exception: " << e.what() << "\n";
// Always keep incrementing so we don't deadlock
iterationCallback();
}
}
void continueCapturing(KafkaEventStreamDecoder &decoder,
uint8_t maxIterations) {
m_maxIterations = maxIterations;
// Re-register callback with the (potentially) new value of maxIterations
auto callback = [this, maxIterations]() {
this->iterationCallback(maxIterations);
};
decoder.registerIterationEndCb(callback);
decoder.registerErrorCb(callback);
decoder.registerIterationEndCb([this]() { this->iterationCallback(); });
decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); });
{
std::unique_lock<std::mutex> lk(m_callbackMutex);
this->m_callbackCondition.wait(lk, [this, maxIterations]() {
return this->m_niterations == maxIterations;
});
this->m_callbackCondition.wait(
lk, [this]() { return this->m_niterations == m_maxIterations; });
}
}
......@@ -574,8 +588,8 @@ private:
void checkWorkspaceEventData(
const Mantid::DataObjects::EventWorkspace &eventWksp) {
// A timer-based test and each message contains 6 events so the total should
// be divisible by 6, but not be 0
// A timer-based test and each message contains 6 events so the total
// should be divisible by 6, but not be 0
TS_ASSERT(eventWksp.getNumberEvents() % 6 == 0);
TS_ASSERT(eventWksp.getNumberEvents() != 0);
}
......@@ -597,4 +611,5 @@ private:
std::mutex m_callbackMutex;
std::condition_variable m_callbackCondition;
uint8_t m_niterations = 0;
std::atomic<uint8_t> m_maxIterations = 0;
};
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment