Skip to content
Snippets Groups Projects
Commit 6b6a42b0 authored by David Fairbrother's avatar David Fairbrother
Browse files

Add mechanism to print exception in Kafka test thread

Adds a mechanism (which was added to KafkaEventStreamDecoder
incorrectly) to print the exception to help flush out our flakey tests.

See commit 92455e43 - which is the same changes in
KafkaEventStreamDecoderTest for details
parent d90e135d
No related branches found
No related tags found
No related merge requests found
......@@ -105,38 +105,49 @@ private:
void startCapturing(Mantid::LiveData::KafkaHistoStreamDecoder &decoder,
uint8_t maxIterations) {
// Register callback to know when a whole loop as been iterated through
m_maxIterations = maxIterations;
m_niterations = 0;
auto callback = [this, maxIterations]() {
this->iterationCallback(maxIterations);
};
decoder.registerIterationEndCb(callback);
decoder.registerErrorCb(callback);
decoder.registerIterationEndCb([this]() { this->iterationCallback(); });
decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); });
TS_ASSERT_THROWS_NOTHING(decoder.startCapture());
continueCapturing(decoder, maxIterations);
}
void iterationCallback(uint8_t maxIterations) {
void iterationCallback() {
std::unique_lock<std::mutex> lock(this->m_callbackMutex);
this->m_niterations++;
if (this->m_niterations == maxIterations) {
if (this->m_niterations == m_maxIterations) {
lock.unlock();
this->m_callbackCondition.notify_one();
}
}
void continueCapturing(Mantid::LiveData::KafkaHistoStreamDecoder &decoder,
void errCallback(KafkaHistoStreamDecoder &decoder) {
try {
// Get the stored exception by calling extract data again
decoder.extractData();
} catch (std::exception &e) {
// We could try to port the exception from this child thread to the main
// thread, or just print it here which is significantly easier
std::cerr << "Exception: " << e.what() << "\n";
// Always keep incrementing so we don't deadlock
iterationCallback();
}
}
void continueCapturing(KafkaHistoStreamDecoder &decoder,
uint8_t maxIterations) {
m_maxIterations = maxIterations;
// Re-register callback with the (potentially) new value of maxIterations
auto callback = [this, maxIterations]() {
this->iterationCallback(maxIterations);
};
decoder.registerIterationEndCb(callback);
decoder.registerErrorCb(callback);
decoder.registerIterationEndCb([this]() { this->iterationCallback(); });
decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); });
{
std::unique_lock<std::mutex> lk(m_callbackMutex);
this->m_callbackCondition.wait(lk, [this, maxIterations]() {
return this->m_niterations == maxIterations;
});
this->m_callbackCondition.wait(
lk, [this]() { return this->m_niterations == m_maxIterations; });
}
}
......@@ -183,4 +194,5 @@ private:
std::mutex m_callbackMutex;
std::condition_variable m_callbackCondition;
uint8_t m_niterations = 0;
std::atomic<uint8_t> m_maxIterations = 0;
};
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment