Skip to content
Snippets Groups Projects
Unverified Commit 2671bcf4 authored by Gigg, Martyn Anthony's avatar Gigg, Martyn Anthony Committed by GitHub
Browse files

Merge pull request #28454 from mantidproject/Add_mechanism_to_flush_out_kafka_failures_2

Add mechanism to print exception in Kafka test thread
parents cb8d1b8c 6b6a42b0
No related branches found
No related tags found
No related merge requests found
...@@ -105,38 +105,49 @@ private: ...@@ -105,38 +105,49 @@ private:
void startCapturing(Mantid::LiveData::KafkaHistoStreamDecoder &decoder, void startCapturing(Mantid::LiveData::KafkaHistoStreamDecoder &decoder,
uint8_t maxIterations) { uint8_t maxIterations) {
// Register callback to know when a whole loop as been iterated through // Register callback to know when a whole loop as been iterated through
m_maxIterations = maxIterations;
m_niterations = 0; m_niterations = 0;
auto callback = [this, maxIterations]() { decoder.registerIterationEndCb([this]() { this->iterationCallback(); });
this->iterationCallback(maxIterations);
}; decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); });
decoder.registerIterationEndCb(callback);
decoder.registerErrorCb(callback);
TS_ASSERT_THROWS_NOTHING(decoder.startCapture()); TS_ASSERT_THROWS_NOTHING(decoder.startCapture());
continueCapturing(decoder, maxIterations); continueCapturing(decoder, maxIterations);
} }
void iterationCallback(uint8_t maxIterations) { void iterationCallback() {
std::unique_lock<std::mutex> lock(this->m_callbackMutex); std::unique_lock<std::mutex> lock(this->m_callbackMutex);
this->m_niterations++; this->m_niterations++;
if (this->m_niterations == maxIterations) { if (this->m_niterations == m_maxIterations) {
lock.unlock(); lock.unlock();
this->m_callbackCondition.notify_one(); this->m_callbackCondition.notify_one();
} }
} }
void continueCapturing(Mantid::LiveData::KafkaHistoStreamDecoder &decoder, void errCallback(KafkaHistoStreamDecoder &decoder) {
try {
// Get the stored exception by calling extract data again
decoder.extractData();
} catch (std::exception &e) {
// We could try to port the exception from this child thread to the main
// thread, or just print it here which is significantly easier
std::cerr << "Exception: " << e.what() << "\n";
// Always keep incrementing so we don't deadlock
iterationCallback();
}
}
void continueCapturing(KafkaHistoStreamDecoder &decoder,
uint8_t maxIterations) { uint8_t maxIterations) {
m_maxIterations = maxIterations;
// Re-register callback with the (potentially) new value of maxIterations // Re-register callback with the (potentially) new value of maxIterations
auto callback = [this, maxIterations]() { decoder.registerIterationEndCb([this]() { this->iterationCallback(); });
this->iterationCallback(maxIterations); decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); });
};
decoder.registerIterationEndCb(callback);
decoder.registerErrorCb(callback);
{ {
std::unique_lock<std::mutex> lk(m_callbackMutex); std::unique_lock<std::mutex> lk(m_callbackMutex);
this->m_callbackCondition.wait(lk, [this, maxIterations]() { this->m_callbackCondition.wait(
return this->m_niterations == maxIterations; lk, [this]() { return this->m_niterations == m_maxIterations; });
});
} }
} }
...@@ -183,4 +194,5 @@ private: ...@@ -183,4 +194,5 @@ private:
std::mutex m_callbackMutex; std::mutex m_callbackMutex;
std::condition_variable m_callbackCondition; std::condition_variable m_callbackCondition;
uint8_t m_niterations = 0; uint8_t m_niterations = 0;
std::atomic<uint8_t> m_maxIterations = 0;
}; };
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment