From 61ed774ae6c67e7f0d891999def7233d1572013c Mon Sep 17 00:00:00 2001
From: John Chilton <jmchilton@gmail.com>
Date: Fri, 10 Apr 2015 14:59:41 -0400
Subject: [PATCH] Revise job recovery behavior.

 - Fix a bug where jobs could be recovered before the MQ callback was set on the manager.
 - Add a new LOST job state for jobs that cannot be recovered and that Pulsar is going to abandon.
 - Fire off the state-change callback for LOST jobs, as sketched below.
 - Add a test case to verify the callback is called for LOST jobs.
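
The following minimal sketch (not part of this patch) illustrates why the
ordering matters. The DemoManager class, mq_callback function, and job id
are hypothetical; the callback signature and the "lost" status string
mirror this patch:

    LOST = "lost"

    class DemoManager(object):
        def __init__(self):
            # Pre-patch default: a no-op callback that drops updates.
            self._callback = lambda status, job_id: None

        def set_state_change_callback(self, callback):
            self._callback = callback

        def recover_active_jobs(self):
            # Pretend job "12345" cannot be recovered - notify the client.
            self._callback(LOST, "12345")

    def mq_callback(status, job_id):
        print("job %s changed state to %s" % (job_id, status))

    manager = DemoManager()
    # If recovery ran here, before set_state_change_callback(), the LOST
    # update would hit the no-op default and be silently dropped.
    manager.set_state_change_callback(mq_callback)
    manager.recover_active_jobs()  # prints: job 12345 changed state to lost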
---
 pulsar/core.py                   |  5 +++
 pulsar/managers/base/external.py |  7 ++-
 pulsar/managers/stateful.py      | 25 ++++++++---
 pulsar/managers/status.py        | 13 +++++-
 test/integration_test_restart.py | 76 ++++++++++++++++++++++++++------
 test/persistence_test.py         |  1 +
 6 files changed, 105 insertions(+), 22 deletions(-)

diff --git a/pulsar/core.py b/pulsar/core.py
index 4d801434..efe180a0 100644
--- a/pulsar/core.py
+++ b/pulsar/core.py
@@ -41,6 +41,7 @@ class PulsarApp(object):
         self.__setup_managers(conf)
         self.__setup_file_cache(conf)
         self.__setup_bind_to_message_queue(conf)
+        self.__recover_jobs()
         self.ensure_cleanup = conf.get("ensure_cleanup", False)
 
     def shutdown(self, timeout=None):
@@ -86,6 +87,10 @@ class PulsarApp(object):
     def __setup_managers(self, conf):
         self.managers = build_managers(self, conf)
 
+    def __recover_jobs(self):
+        for manager in self.managers.values():
+            manager.recover_active_jobs()
+
     def __setup_private_token(self, private_token):
         self.private_token = private_token
         if private_token:
diff --git a/pulsar/managers/base/external.py b/pulsar/managers/base/external.py
index 9faab67e..b753641e 100644
--- a/pulsar/managers/base/external.py
+++ b/pulsar/managers/base/external.py
@@ -7,6 +7,7 @@ from .directory import DirectoryBaseManager
 
 DEFAULT_JOB_NAME_TEMPLATE = "pulsar_$job_id"
 JOB_FILE_EXTERNAL_ID = "external_id"
+FAILED_TO_LOAD_EXTERNAL_ID = object()
 
 log = logging.getLogger(__name__)
 
@@ -61,9 +62,11 @@ class ExternalBaseManager(DirectoryBaseManager):
         return Template(self.job_name_template).safe_substitute(env)
 
     def _recover_active_job(self, job_id):
-        external_id = self._job_directory(job_id).load_metadata(JOB_FILE_EXTERNAL_ID)
-        if external_id:
+        external_id = self._job_directory(job_id).load_metadata(JOB_FILE_EXTERNAL_ID, FAILED_TO_LOAD_EXTERNAL_ID)
+        if external_id and external_id is not FAILED_TO_LOAD_EXTERNAL_ID:
             self._external_ids[job_id] = external_id
+        else:
+            raise Exception("Could not determine external ID for job_id [%s]" % job_id)
 
     def _deactivate_job(self, job_id):
         del self._external_ids[job_id]
diff --git a/pulsar/managers/stateful.py b/pulsar/managers/stateful.py
index bbd3d614..6455abda 100644
--- a/pulsar/managers/stateful.py
+++ b/pulsar/managers/stateful.py
@@ -40,15 +40,17 @@ class StatefulManagerProxy(ManagerProxy):
         self.__preprocess_action_executor = RetryActionExecutor(**preprocess_retry_action_kwds)
         self.__postprocess_action_executor = RetryActionExecutor(**postprocess_retry_action_kwds)
         self.min_polling_interval = datetime.timedelta(0, min_polling_interval)
-        self.active_jobs = ActiveJobs(manager)
-        self.__state_change_callback = lambda status, job_id: None
-        self.__recover_active_jobs()
+        self.active_jobs = ActiveJobs.from_manager(manager)
+        self.__state_change_callback = self._default_status_change_callback
         self.__monitor = None
 
     def set_state_change_callback(self, state_change_callback):
         self.__state_change_callback = state_change_callback
         self.__monitor = ManagerMonitor(self)
 
+    def _default_status_change_callback(self, status, job_id):
+        log.info("Status of job [%s] changed to [%s]. No callbacks enabled." % (status, job_id))
+
     @property
     def name(self):
         return self._proxied_manager.name
@@ -163,7 +165,7 @@ class StatefulManagerProxy(ManagerProxy):
                 log.exception("Failed to shutdown job monitor for manager %s" % self.name)
         super(StatefulManagerProxy, self).shutdown(timeout)
 
-    def __recover_active_jobs(self):
+    def recover_active_jobs(self):
         recover_method = getattr(self._proxied_manager, "_recover_active_job", None)
         if recover_method is None:
             return
@@ -173,6 +175,12 @@ class StatefulManagerProxy(ManagerProxy):
                 recover_method(job_id)
             except Exception:
                 log.exception("Failed to recover active job %s" % job_id)
+                self.__handle_recovery_problem(job_id)
+
+    def __handle_recovery_problem(self, job_id):
+        # Make sure we tell the client we have lost this job.
+        self.active_jobs.deactivate_job(job_id)
+        self.__state_change_callback(status.LOST, job_id)
 
 
 class ActiveJobs(object):
@@ -184,10 +192,15 @@ class ActiveJobs(object):
     hit disk to recover this information.
     """
 
-    def __init__(self, manager):
+    @staticmethod
+    def from_manager(manager):
         persistence_directory = manager.persistence_directory
+        manager_name = manager.name
+        return ActiveJobs(manager_name, persistence_directory)
+
+    def __init__(self, manager_name, persistence_directory):
         if persistence_directory:
-            active_job_directory = os.path.join(persistence_directory, "%s-active-jobs" % manager.name)
+            active_job_directory = os.path.join(persistence_directory, "%s-active-jobs" % manager_name)
             if not os.path.exists(active_job_directory):
                 os.makedirs(active_job_directory)
         else:
diff --git a/pulsar/managers/status.py b/pulsar/managers/status.py
index 45296f2d..996578eb 100644
--- a/pulsar/managers/status.py
+++ b/pulsar/managers/status.py
@@ -1,16 +1,27 @@
 # TODO: Make objects.
 
+# Job is staging and will be queued shortly.
 PREPROCESSING = "preprocessing"
+# Job manager has queued this job for execution.
 QUEUED = "queued"
+# Job manager believes the job is currently running.
 RUNNING = "running"
+# Job manager reports the job finished and postprocessing ran successfully.
 COMPLETE = "complete"
+# Job was cancelled.
 CANCELLED = "cancelled"
+# Problem submitting the job, interfacing with the job manager,
+# or postprocessing the job.
 FAILED = "failed"
+# DRM marked job as complete and job is being unstaged.
 POSTPROCESSING = "postprocessing"
+# Pulsar believed this job to be active, but the job manager
+# cannot determine a state for it.
+LOST = "lost"
 
 
 def is_job_done(status):
     """ Does the supplied status correspond to a finished
     job (done processing).
     """
-    return status in [COMPLETE, CANCELLED, FAILED]
+    return status in [COMPLETE, CANCELLED, FAILED, LOST]
diff --git a/test/integration_test_restart.py b/test/integration_test_restart.py
index 53b7ebee..36185376 100644
--- a/test/integration_test_restart.py
+++ b/test/integration_test_restart.py
@@ -1,4 +1,7 @@
+import contextlib
 import threading
+import time
+
 from .test_utils import (
     TempDirectoryTestCase,
     skip_unless_module,
@@ -7,9 +10,9 @@ from .test_utils import (
 from pulsar.manager_endpoint_util import (
     submit_job,
 )
+from pulsar.managers.stateful import ActiveJobs
 from pulsar.client.amqp_exchange_factory import get_exchange
 from pulsar.managers.util.drmaa import DrmaaSessionFactory
-import time
 
 
 class RestartTestCase(TempDirectoryTestCase):
@@ -17,10 +20,7 @@ class RestartTestCase(TempDirectoryTestCase):
     @skip_unless_module("drmaa")
     @skip_unless_module("kombu")
     def test_restart_finishes_job(self):
-        mq_url = "memory://test1092"
-        app_conf = dict(message_queue_url=mq_url)
-        app_conf["managers"] = {"manager_restart": {'type': 'queued_drmaa'}}
-        with restartable_pulsar_app_provider(app_conf=app_conf, web=False) as app_provider:
+        with self._setup_app_provider("restart_and_finish") as app_provider:
             job_id = '12345'
 
             with app_provider.new_app() as app:
@@ -31,25 +31,71 @@ class RestartTestCase(TempDirectoryTestCase):
                     'setup': True,
                 }
                 submit_job(manager, job_info)
-                # TODO: unfortunate breaking of abstractions here.
-                time.sleep(.2)
-                external_id = manager._proxied_manager._external_id(job_id)
+                external_id = None
+                for i in range(10):
+                    time.sleep(.05)
+                    # TODO: unfortunate breaking of abstractions here.
+                    external_id = manager._proxied_manager._external_id(job_id)
+                    if external_id:
+                        break
+                if external_id is None:
+                    assert False, "Test failed, couldn't get external ID for job."
 
             drmaa_session = DrmaaSessionFactory().get()
             drmaa_session.kill(external_id)
             drmaa_session.close()
-            time.sleep(.2)
-
-            consumer = SimpleConsumer(queue="status_update", url=mq_url, manager="manager_restart")
+            consumer = self._status_update_consumer("restart_and_finish")
             consumer.start()
 
             with app_provider.new_app() as app:
-                time.sleep(.3)
+                consumer.wait_for_messages()
 
             consumer.join()
             assert len(consumer.messages) == 1, len(consumer.messages)
             assert consumer.messages[0]["status"] == "complete"
 
+    @skip_unless_module("drmaa")
+    @skip_unless_module("kombu")
+    def test_recovery_failure_fires_lost_status(self):
+        test = "restart_and_finish"
+        with self._setup_app_provider(test) as app_provider:
+            job_id = '12345'
+
+            with app_provider.new_app() as app:
+                persistence_directory = app.persistence_directory
+
+            # Break some abstractions to activate a job that
+            # never existed.
+            manager_name = "manager_%s" % test
+            active_jobs = ActiveJobs(manager_name, persistence_directory)
+            active_jobs.activate_job(job_id)
+
+            consumer = self._status_update_consumer(test)
+            consumer.start()
+
+            with app_provider.new_app() as app:
+                consumer.wait_for_messages()
+
+            consumer.join()
+
+            assert len(consumer.messages) == 1, len(consumer.messages)
+            assert consumer.messages[0]["status"] == "lost"
+
+    @contextlib.contextmanager
+    def _setup_app_provider(self, test):
+        mq_url = "memory://test_%s" % test
+        manager = "manager_%s" % test
+        app_conf = dict(message_queue_url=mq_url)
+        app_conf["managers"] = {manager: {'type': 'queued_drmaa'}}
+        with restartable_pulsar_app_provider(app_conf=app_conf, web=False) as app_provider:
+            yield app_provider
+
+    def _status_update_consumer(self, test):
+        mq_url = "memory://test_%s" % test
+        manager = "manager_%s" % test
+        consumer = SimpleConsumer(queue="status_update", url=mq_url, manager=manager)
+        return consumer
+
 
 class SimpleConsumer(object):
 
@@ -58,7 +104,7 @@ class SimpleConsumer(object):
         self.url = url
         self.manager = manager
         self.active = True
-        self.exchange = get_exchange("memory://test1092", manager, {})
+        self.exchange = get_exchange(url, manager, {})
 
         self.messages = []
 
@@ -71,6 +117,10 @@ class SimpleConsumer(object):
         self.active = False
         self.thread.join(10)
 
+    def wait_for_messages(self, n=1):
+        while len(self.messages) < n:
+            time.sleep(.05)
+
     def _run(self):
         self.exchange.consume("status_update", self._callback, check=self)
 
diff --git a/test/persistence_test.py b/test/persistence_test.py
index 39c84535..86a57410 100644
--- a/test/persistence_test.py
+++ b/test/persistence_test.py
@@ -33,6 +33,7 @@ def test_persistence():
         assert (not(exists(touch_file)))
         queue1.shutdown()
         queue2 = StatefulManagerProxy(QueueManager('test', app, num_concurrent_jobs=1))
+        queue2.recover_active_jobs()
         time.sleep(1)
         assert exists(touch_file)
     finally:
-- 
GitLab