From d85096f34424b3807061da01334b64a49b252c1b Mon Sep 17 00:00:00 2001
From: John Chilton <jmchilton@gmail.com>
Date: Mon, 16 Mar 2015 21:05:21 -0400
Subject: [PATCH] Improved heartbeat thread handling in amqp_exchange.

 - Add a join timeout incase this is is where things are hanging.
 - Don't allow heartbeat or sleep exceptions to halt queue consumption - shutdown events need to fire to actually stop this computation.
 - Add more logging to determine if this is a problematic spot.
---
 pulsar/client/amqp_exchange.py | 12 +++++++++---
 1 file changed, 9 insertions(+), 3 deletions(-)

diff --git a/pulsar/client/amqp_exchange.py b/pulsar/client/amqp_exchange.py
index 55e9a055..792a0892 100644
--- a/pulsar/client/amqp_exchange.py
+++ b/pulsar/client/amqp_exchange.py
@@ -86,9 +86,15 @@ class PulsarExchange(object):
             except (IOError, socket.error), exc:
                 # In testing, errno is None
                 log.warning('Got %s, will retry: %s', exc.__class__.__name__, exc)
-                if heartbeat_thread:
-                    heartbeat_thread.join()
-                sleep(DEFAULT_RECONNECT_CONSUMER_WAIT)
+                try:
+                    if heartbeat_thread:
+                        heartbeat_thread.join(DEFAULT_HEARTBEAT_JOIN_TIMEOUT)
+                except Exception:
+                    log.exception("Failed to join heartbeat thread, this is bad?")
+                try:
+                    sleep(DEFAULT_RECONNECT_CONSUMER_WAIT)
+                except Exception:
+                    log.exception("Interrupted sleep while waiting to reconnect to message queue, may restart unless problems encountered.")
             except BaseException:
                 log.exception("Problem consuming queue, consumer quitting in problematic fashion!")
                 raise
-- 
GitLab