Skip to content
Snippets Groups Projects
Commit cb057396 authored by John Chilton's avatar John Chilton
Browse files

Allow managers/app shutdown to define timeout.

Useful for debugging deadlock situations.
parent 651dd06b
No related branches found
No related tags found
No related merge requests found
......@@ -43,17 +43,17 @@ class PulsarApp(object):
self.__setup_bind_to_message_queue(conf)
self.ensure_cleanup = conf.get("ensure_cleanup", False)
def shutdown(self):
def shutdown(self, timeout=None):
for manager in self.managers.values():
try:
manager.shutdown()
manager.shutdown(timeout)
except Exception:
pass
if self.__queue_state:
self.__queue_state.deactivate()
if self.ensure_cleanup:
self.__queue_state.join()
self.__queue_state.join(timeout)
def __setup_bind_to_message_queue(self, conf):
message_queue_url = conf.get("message_queue_url", None)
......
......@@ -107,13 +107,13 @@ class ManagerProxy(object):
def kill(self, *args, **kwargs):
return self._proxied_manager.kill(*args, **kwargs)
def shutdown(self):
def shutdown(self, timeout=None):
""" Optional. """
try:
shutdown_method = self._proxied_manager.shutdown
except AttributeError:
return
shutdown_method()
shutdown_method(timeout)
def job_directory(self, *args, **kwargs):
return self._proxied_manager.job_directory(*args, **kwargs)
......
......@@ -20,10 +20,10 @@ class BaseDrmaaManager(ExternalBaseManager):
drmaa_session_factory = drmaa_session_factory_class()
self.drmaa_session = drmaa_session_factory.get()
def shutdown(self):
def shutdown(self, timeout=None):
try:
super(BaseDrmaaManager, self).shutdown()
except:
super(BaseDrmaaManager, self).shutdown(timeout)
except Exception:
pass
self.drmaa_session.close()
......
......@@ -57,11 +57,13 @@ class QueueManager(Manager):
if command_line:
self.work_queue.put((RUN, (job_id, command_line)))
def shutdown(self):
def shutdown(self, timeout=None):
for i in range(len(self.work_threads)):
self.work_queue.put((STOP_SIGNAL, None))
for worker in self.work_threads:
worker.join()
worker.join(timeout)
if worker.isAlive():
log.warn("Failed to stop worker thread [%s]" % worker)
def run_next(self):
"""
......
......@@ -155,13 +155,13 @@ class StatefulManagerProxy(ManagerProxy):
self.__state_change_callback(final_status, job_id)
new_thread_for_manager(self, "postprocess", do_postprocess, daemon=False)
def shutdown(self):
def shutdown(self, timeout=None):
if self.__monitor:
try:
self.__monitor.shutdown()
self.__monitor.shutdown(timeout)
except Exception:
log.exception("Failed to shutdown job monitor for manager %s" % self.name)
super(StatefulManagerProxy, self).shutdown()
super(StatefulManagerProxy, self).shutdown(timeout)
def __recover_active_jobs(self):
recover_method = getattr(self._proxied_manager, "_recover_active_job", None)
......@@ -231,9 +231,11 @@ class ManagerMonitor(object):
thread = new_thread_for_manager(self, "monitor", self._run, True)
self.thread = thread
def shutdown(self):
def shutdown(self, timeout=None):
self.active = False
self.thread.join()
self.thread.join(timeout)
if self.thread.isAlive():
log.warn("Failed to join monitor thread [%s]" % self.thread)
def _run(self):
""" Main loop, repeatedly checking active jobs of stateful manager.
......
......@@ -3,9 +3,13 @@ message queues. Code shared between client and server can be found in
submodules of ``pulsar.client``.
"""
import logging
from ..messaging import bind_amqp
from six import itervalues
log = logging.getLogger(__name__)
def bind_app(app, queue_id, connect_ssl=None):
connection_string = __id_to_connection_string(app, queue_id)
......@@ -29,9 +33,11 @@ class QueueState(object):
def __nonzero__(self):
return self.active
def join(self):
def join(self, timeout=None):
for t in self.threads:
t.join()
t.join(timeout)
if t.isAlive():
log.warn("Failed to join thread [%s]." % t)
def __id_to_connection_string(app, queue_id):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment