From 92455e437ad73f220d80c98974535215bd69fe5c Mon Sep 17 00:00:00 2001
From: David Fairbrother <DavidFair@users.noreply.github.com>
Date: Wed, 25 Mar 2020 11:38:31 +0000
Subject: [PATCH] Emit Kafka exception when err callback used

Previously Kafka dropped the exception and just incremented using the
same callback our wait. This will now print the exception message
associated, so we can distinguish between a successful and failed
callback in unit test logs.

I could have made it so that the exception is thrown in the main thread
by taking a s_ptr. However it quickly becomes unweildly for a simple
probe, and we only gain the stack trace - which was already lost when
extractData re throws the exception.
---
 .../test/KafkaEventStreamDecoderTest.h        | 51 ++++++++++++-------
 1 file changed, 33 insertions(+), 18 deletions(-)

diff --git a/Framework/LiveData/test/KafkaEventStreamDecoderTest.h b/Framework/LiveData/test/KafkaEventStreamDecoderTest.h
index 244435a658b..a50564e62c1 100644
--- a/Framework/LiveData/test/KafkaEventStreamDecoderTest.h
+++ b/Framework/LiveData/test/KafkaEventStreamDecoderTest.h
@@ -19,8 +19,11 @@
 
 #include <Poco/Path.h>
 #include <condition_variable>
+#include <iostream>
 #include <thread>
 
+using Mantid::LiveData::KafkaEventStreamDecoder;
+
 class KafkaEventStreamDecoderTest : public CxxTest::TestSuite {
 public:
   // This pair of boilerplate methods prevent the suite being created statically
@@ -512,38 +515,49 @@ private:
   void startCapturing(Mantid::LiveData::KafkaEventStreamDecoder &decoder,
                       uint8_t maxIterations) {
     // Register callback to know when a whole loop as been iterated through
+    m_maxIterations = maxIterations;
     m_niterations = 0;
-    auto callback = [this, maxIterations]() {
-      this->iterationCallback(maxIterations);
-    };
-    decoder.registerIterationEndCb(callback);
-    decoder.registerErrorCb(callback);
+    decoder.registerIterationEndCb([this]() { this->iterationCallback(); });
+
+    decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); });
     TS_ASSERT_THROWS_NOTHING(decoder.startCapture());
     continueCapturing(decoder, maxIterations);
   }
 
-  void iterationCallback(uint8_t maxIterations) {
+  void iterationCallback() {
     std::unique_lock<std::mutex> lock(this->m_callbackMutex);
     this->m_niterations++;
-    if (this->m_niterations == maxIterations) {
+    if (this->m_niterations == m_maxIterations) {
       lock.unlock();
       this->m_callbackCondition.notify_one();
     }
   }
 
-  void continueCapturing(Mantid::LiveData::KafkaEventStreamDecoder &decoder,
+  void errCallback(KafkaEventStreamDecoder &decoder) {
+    try {
+      // Get the stored exception by calling extract data again
+      decoder.extractData();
+    } catch (std::exception &e) {
+      // We could try to port the exception from this child thread to the main
+      // thread, or just print it here which is significantly easier
+      std::cerr << "Exception: " << e.what() << "\n";
+      // Always keep incrementing so we don't deadlock
+      iterationCallback();
+    }
+  }
+
+  void continueCapturing(KafkaEventStreamDecoder &decoder,
                          uint8_t maxIterations) {
+    m_maxIterations = maxIterations;
+
     // Re-register callback with the (potentially) new value of maxIterations
-    auto callback = [this, maxIterations]() {
-      this->iterationCallback(maxIterations);
-    };
-    decoder.registerIterationEndCb(callback);
-    decoder.registerErrorCb(callback);
+    decoder.registerIterationEndCb([this]() { this->iterationCallback(); });
+    decoder.registerErrorCb([this, &decoder]() { errCallback(decoder); });
+
     {
       std::unique_lock<std::mutex> lk(m_callbackMutex);
-      this->m_callbackCondition.wait(lk, [this, maxIterations]() {
-        return this->m_niterations == maxIterations;
-      });
+      this->m_callbackCondition.wait(
+          lk, [this]() { return this->m_niterations == m_maxIterations; });
     }
   }
 
@@ -574,8 +588,8 @@ private:
 
   void checkWorkspaceEventData(
       const Mantid::DataObjects::EventWorkspace &eventWksp) {
-    // A timer-based test and each message contains 6 events so the total should
-    // be divisible by 6, but not be 0
+    // A timer-based test and each message contains 6 events so the total
+    // should be divisible by 6, but not be 0
     TS_ASSERT(eventWksp.getNumberEvents() % 6 == 0);
     TS_ASSERT(eventWksp.getNumberEvents() != 0);
   }
@@ -597,4 +611,5 @@ private:
   std::mutex m_callbackMutex;
   std::condition_variable m_callbackCondition;
   uint8_t m_niterations = 0;
+  std::atomic<uint8_t> m_maxIterations = 0;
 };
-- 
GitLab