Skip to content
Snippets Groups Projects
Unverified Commit 692f0d6e authored by Nick Draper's avatar Nick Draper Committed by GitHub
Browse files

Merge pull request #28428 from mantidproject/Add_mechanism_to_flush_out_kafka_failures

Emit Kafka exception msg when err callback used
parents f589c928 92455e43
No related branches found
No related tags found
No related merge requests found
...@@ -19,8 +19,11 @@ ...@@ -19,8 +19,11 @@
#include <Poco/Path.h> #include <Poco/Path.h>
#include <condition_variable> #include <condition_variable>
#include <iostream>
#include <thread> #include <thread>
using Mantid::LiveData::KafkaEventStreamDecoder;
class KafkaEventStreamDecoderTest : public CxxTest::TestSuite { class KafkaEventStreamDecoderTest : public CxxTest::TestSuite {
public: public:
// This pair of boilerplate methods prevent the suite being created statically // This pair of boilerplate methods prevent the suite being created statically
...@@ -512,38 +515,49 @@ private: ...@@ -512,38 +515,49 @@ private:
void startCapturing(Mantid::LiveData::KafkaEventStreamDecoder &decoder, void startCapturing(Mantid::LiveData::KafkaEventStreamDecoder &decoder,
uint8_t maxIterations) { uint8_t maxIterations) {
// Register callback to know when a whole loop as been iterated through // Register callback to know when a whole loop as been iterated through
m_maxIterations = maxIterations;
m_niterations = 0; m_niterations = 0;
auto callback = [this, maxIterations]() { decoder.registerIterationEndCb([this]() { this->iterationCallback(); });
this->iterationCallback(maxIterations);
}; decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); });
decoder.registerIterationEndCb(callback);
decoder.registerErrorCb(callback);
TS_ASSERT_THROWS_NOTHING(decoder.startCapture()); TS_ASSERT_THROWS_NOTHING(decoder.startCapture());
continueCapturing(decoder, maxIterations); continueCapturing(decoder, maxIterations);
} }
void iterationCallback(uint8_t maxIterations) { void iterationCallback() {
std::unique_lock<std::mutex> lock(this->m_callbackMutex); std::unique_lock<std::mutex> lock(this->m_callbackMutex);
this->m_niterations++; this->m_niterations++;
if (this->m_niterations == maxIterations) { if (this->m_niterations == m_maxIterations) {
lock.unlock(); lock.unlock();
this->m_callbackCondition.notify_one(); this->m_callbackCondition.notify_one();
} }
} }
void continueCapturing(Mantid::LiveData::KafkaEventStreamDecoder &decoder, void errCallback(KafkaEventStreamDecoder &decoder) {
try {
// Get the stored exception by calling extract data again
decoder.extractData();
} catch (std::exception &e) {
// We could try to port the exception from this child thread to the main
// thread, or just print it here which is significantly easier
std::cerr << "Exception: " << e.what() << "\n";
// Always keep incrementing so we don't deadlock
iterationCallback();
}
}
void continueCapturing(KafkaEventStreamDecoder &decoder,
uint8_t maxIterations) { uint8_t maxIterations) {
m_maxIterations = maxIterations;
// Re-register callback with the (potentially) new value of maxIterations // Re-register callback with the (potentially) new value of maxIterations
auto callback = [this, maxIterations]() { decoder.registerIterationEndCb([this]() { this->iterationCallback(); });
this->iterationCallback(maxIterations); decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); });
};
decoder.registerIterationEndCb(callback);
decoder.registerErrorCb(callback);
{ {
std::unique_lock<std::mutex> lk(m_callbackMutex); std::unique_lock<std::mutex> lk(m_callbackMutex);
this->m_callbackCondition.wait(lk, [this, maxIterations]() { this->m_callbackCondition.wait(
return this->m_niterations == maxIterations; lk, [this]() { return this->m_niterations == m_maxIterations; });
});
} }
} }
...@@ -574,8 +588,8 @@ private: ...@@ -574,8 +588,8 @@ private:
void checkWorkspaceEventData( void checkWorkspaceEventData(
const Mantid::DataObjects::EventWorkspace &eventWksp) { const Mantid::DataObjects::EventWorkspace &eventWksp) {
// A timer-based test and each message contains 6 events so the total should // A timer-based test and each message contains 6 events so the total
// be divisible by 6, but not be 0 // should be divisible by 6, but not be 0
TS_ASSERT(eventWksp.getNumberEvents() % 6 == 0); TS_ASSERT(eventWksp.getNumberEvents() % 6 == 0);
TS_ASSERT(eventWksp.getNumberEvents() != 0); TS_ASSERT(eventWksp.getNumberEvents() != 0);
} }
...@@ -597,4 +611,5 @@ private: ...@@ -597,4 +611,5 @@ private:
std::mutex m_callbackMutex; std::mutex m_callbackMutex;
std::condition_variable m_callbackCondition; std::condition_variable m_callbackCondition;
uint8_t m_niterations = 0; uint8_t m_niterations = 0;
std::atomic<uint8_t> m_maxIterations = 0;
}; };
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment