From d911a2a54acb1efd5e2ecee6e304e206f5948581 Mon Sep 17 00:00:00 2001
From: John Chilton <jmchilton@gmail.com>
Date: Mon, 9 Mar 2015 00:54:39 -0400
Subject: [PATCH] Cleanup MQ resources in tests.

Go further than runtime and ensure threads join so that they cannot interfer with other tests.
---
 pulsar/client/manager.py      | 9 +++++++--
 pulsar/client/test/check.py   | 7 +++++++
 pulsar/core.py                | 3 +++
 pulsar/messaging/__init__.py  | 5 +++++
 pulsar/messaging/bind_amqp.py | 6 ++++--
 test/integration_test.py      | 8 ++++----
 test/test_utils.py            | 1 +
 7 files changed, 31 insertions(+), 8 deletions(-)

diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py
index 25e28ee5..120a6531 100644
--- a/pulsar/client/manager.py
+++ b/pulsar/client/manager.py
@@ -68,7 +68,7 @@ class ClientManager(object):
         job_manager_interface = job_manager_interface_class(**job_manager_interface_args)
         return self.client_class(destination_params, job_id, job_manager_interface, **self.extra_client_kwds)
 
-    def shutdown(self):
+    def shutdown(self, ensure_cleanup=False):
         pass
 
 
@@ -95,6 +95,9 @@ class MessageQueueClientManager(object):
                 return
 
             def callback_wrapper(body, message):
+                if not self.active:
+                    message.requeue()
+
                 try:
                     if "job_id" in body:
                         job_id = body["job_id"]
@@ -120,8 +123,10 @@ class MessageQueueClientManager(object):
             thread.start()
             self.callback_thread = thread
 
-    def shutdown(self):
+    def shutdown(self, ensure_cleanup=False):
         self.active = False
+        if ensure_cleanup:
+            self.callback_thread.join()
 
     def __nonzero__(self):
         return self.active
diff --git a/pulsar/client/test/check.py b/pulsar/client/test/check.py
index 5816f8d7..715e3a8c 100644
--- a/pulsar/client/test/check.py
+++ b/pulsar/client/test/check.py
@@ -80,6 +80,7 @@ class MockTool(object):
 
 
 def run(options):
+    waiter = None
     try:
         temp_directory = tempfile.mkdtemp()
         temp_index_dir = os.path.join(temp_directory, "idx", "bwa")
@@ -196,6 +197,8 @@ def run(options):
             traceback.print_exc()
         raise
     finally:
+        if waiter is not None:
+            waiter.shutdown()
         shutil.rmtree(temp_directory)
 
 
@@ -239,6 +242,10 @@ class Waiter(object):
 
         return final_status
 
+    def shutdown(self):
+        client_manager = self.client_manager
+        client_manager.shutdown(ensure_cleanup=True)
+
 
 def __assert_contents(path, expected_contents, pulsar_state):
     if not os.path.exists(path):
diff --git a/pulsar/core.py b/pulsar/core.py
index 30ba82ba..0a3aa81d 100644
--- a/pulsar/core.py
+++ b/pulsar/core.py
@@ -41,6 +41,7 @@ class PulsarApp(object):
         self.__setup_managers(conf)
         self.__setup_file_cache(conf)
         self.__setup_bind_to_message_queue(conf)
+        self.ensure_cleanup = conf.get("ensure_cleanup", False)
 
     def shutdown(self):
         for manager in self.managers.values():
@@ -51,6 +52,8 @@ class PulsarApp(object):
 
         if self.__queue_state:
             self.__queue_state.deactivate()
+            if self.ensure_cleanup:
+                self.__queue_state.join()
 
     def __setup_bind_to_message_queue(self, conf):
         message_queue_url = conf.get("message_queue_url", None)
diff --git a/pulsar/messaging/__init__.py b/pulsar/messaging/__init__.py
index 024d0173..c000c48c 100644
--- a/pulsar/messaging/__init__.py
+++ b/pulsar/messaging/__init__.py
@@ -21,6 +21,7 @@ class QueueState(object):
     """
     def __init__(self):
         self.active = True
+        self.threads = []
 
     def deactivate(self):
         self.active = False
@@ -28,6 +29,10 @@ class QueueState(object):
     def __nonzero__(self):
         return self.active
 
+    def join(self):
+        for t in self.threads:
+            t.join()
+
 
 def __id_to_connection_string(app, queue_id):
     return queue_id
diff --git a/pulsar/messaging/bind_amqp.py b/pulsar/messaging/bind_amqp.py
index 9eb8ba35..3403d9e9 100644
--- a/pulsar/messaging/bind_amqp.py
+++ b/pulsar/messaging/bind_amqp.py
@@ -45,8 +45,10 @@ def bind_manager_to_queue(manager, queue_state, connection_string, conf):
         log.info("Finished consuming %s queue - no more messages will be processed." % (name))
 
     if conf.get("message_queue_consume", True):
-        start_setup_consumer(pulsar_exchange, functools.partial(drain, process_setup_messages, "setup"))
-        start_kill_consumer(pulsar_exchange, functools.partial(drain, process_kill_messages, "kill"))
+        setup_thread = start_setup_consumer(pulsar_exchange, functools.partial(drain, process_setup_messages, "setup"))
+        kill_thread = start_kill_consumer(pulsar_exchange, functools.partial(drain, process_kill_messages, "kill"))
+        if hasattr(queue_state, "threads"):
+            queue_state.threads.extend([setup_thread, kill_thread])
 
     # TODO: Think through job recovery, jobs shouldn't complete until after bind
     # has occurred.
diff --git a/test/integration_test.py b/test/integration_test.py
index c8845c56..37f3ba8e 100644
--- a/test/integration_test.py
+++ b/test/integration_test.py
@@ -120,22 +120,22 @@ class IntegrationTests(BaseIntegrationTest):
     @skip_unless_environ("PULSAR_TEST_KEY")
     def test_integration_scp(self):
         self._run(
-            app_conf=dict(message_queue_url="memory://test1"),
+            app_conf=dict(message_queue_url="memory://test2"),
             private_token=None,
             default_file_action="remote_scp_transfer",
             local_setup=True,
-            manager_url="memory://test1",
+            manager_url="memory://test2",
             **self.default_kwargs
         )
 
     @skip_unless_environ("PULSAR_TEST_KEY")
     def test_integration_rsync(self):
         self._run(
-            app_conf=dict(message_queue_url="memory://test1"),
+            app_conf=dict(message_queue_url="memory://test3"),
             private_token=None,
             default_file_action="remote_rsync_transfer",
             local_setup=True,
-            manager_url="memory://test1",
+            manager_url="memory://test3",
             **self.default_kwargs
         )
 
diff --git a/test/test_utils.py b/test/test_utils.py
index 3edf53bc..4d7f5aae 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -233,6 +233,7 @@ def test_pulsar_app(global_conf={}, app_conf={}, test_conf={}):
     try:
         app_conf["staging_directory"] = staging_directory
         app_conf["file_cache_dir"] = cache_directory
+        app_conf["ensure_cleanup"] = True
         from pulsar.web.wsgi import app_factory
 
         app = app_factory(global_conf, **app_conf)
-- 
GitLab