Skip to content
Snippets Groups Projects
Commit 61ed774a authored by John Chilton's avatar John Chilton
Browse files

Revise job recovery behavior.

 - Fix bug where jobs may have been recovered before the MQ callback was set on the manager.
 - Add a new LOST job state for jobs that cannot be recovered and that Pulsar is going to abandon.
 - Fire off callback for LOST jobs.
 - Add test case to verify callback is called for LOST jobs.
parent f1a9e836
No related branches found
No related tags found
No related merge requests found
......@@ -41,6 +41,7 @@ class PulsarApp(object):
self.__setup_managers(conf)
self.__setup_file_cache(conf)
self.__setup_bind_to_message_queue(conf)
self.__recover_jobs()
self.ensure_cleanup = conf.get("ensure_cleanup", False)
def shutdown(self, timeout=None):
......@@ -86,6 +87,10 @@ class PulsarApp(object):
def __setup_managers(self, conf):
self.managers = build_managers(self, conf)
def __recover_jobs(self):
    """Ask every configured manager to recover its active jobs.

    Calls ``recover_active_jobs()`` on each value of ``self.managers``.
    """
    for job_manager in self.managers.values():
        job_manager.recover_active_jobs()
def __setup_private_token(self, private_token):
self.private_token = private_token
if private_token:
......
......@@ -7,6 +7,7 @@ from .directory import DirectoryBaseManager
DEFAULT_JOB_NAME_TEMPLATE = "pulsar_$job_id"
JOB_FILE_EXTERNAL_ID = "external_id"
FAILED_TO_LOAD_EXTERNAL_ID = object()
log = logging.getLogger(__name__)
......@@ -61,9 +62,11 @@ class ExternalBaseManager(DirectoryBaseManager):
return Template(self.job_name_template).safe_substitute(env)
def _recover_active_job(self, job_id):
external_id = self._job_directory(job_id).load_metadata(JOB_FILE_EXTERNAL_ID)
if external_id:
external_id = self._job_directory(job_id).load_metadata(JOB_FILE_EXTERNAL_ID, FAILED_TO_LOAD_EXTERNAL_ID)
if external_id and external_id is not FAILED_TO_LOAD_EXTERNAL_ID:
self._external_ids[job_id] = external_id
else:
raise Exception("Could not determine external ID for job_id [%s]" % job_id)
def _deactivate_job(self, job_id):
del self._external_ids[job_id]
......@@ -40,15 +40,17 @@ class StatefulManagerProxy(ManagerProxy):
self.__preprocess_action_executor = RetryActionExecutor(**preprocess_retry_action_kwds)
self.__postprocess_action_executor = RetryActionExecutor(**postprocess_retry_action_kwds)
self.min_polling_interval = datetime.timedelta(0, min_polling_interval)
self.active_jobs = ActiveJobs(manager)
self.__state_change_callback = lambda status, job_id: None
self.__recover_active_jobs()
self.active_jobs = ActiveJobs.from_manager(manager)
self.__state_change_callback = self._default_status_change_callback
self.__monitor = None
def set_state_change_callback(self, state_change_callback):
self.__state_change_callback = state_change_callback
self.__monitor = ManagerMonitor(self)
def _default_status_change_callback(self, status, job_id):
    """Log a job status change when no real callback is registered.

    This is the placeholder callback used until
    ``set_state_change_callback`` installs a real one.

    :param status: New status of the job.
    :param job_id: ID of the job whose status changed.
    """
    # Argument order must follow the message: job id first, then status.
    log.info("Status of job [%s] changed to [%s]. No callbacks enabled." % (job_id, status))
@property
def name(self):
return self._proxied_manager.name
......@@ -163,7 +165,7 @@ class StatefulManagerProxy(ManagerProxy):
log.exception("Failed to shutdown job monitor for manager %s" % self.name)
super(StatefulManagerProxy, self).shutdown(timeout)
def __recover_active_jobs(self):
def recover_active_jobs(self):
recover_method = getattr(self._proxied_manager, "_recover_active_job", None)
if recover_method is None:
return
......@@ -173,6 +175,12 @@ class StatefulManagerProxy(ManagerProxy):
recover_method(job_id)
except Exception:
log.exception("Failed to recover active job %s" % job_id)
self.__handle_recovery_problem(job_id)
def __handle_recovery_problem(self, job_id):
    """Give up on a job that could not be recovered and report it.

    The job is deactivated in ``self.active_jobs`` and the current state
    change callback is invoked with ``status.LOST`` for that job.

    :param job_id: ID of the job that failed to recover.
    """
    # Deactivate first, then make sure we tell the client we have lost
    # this job.
    self.active_jobs.deactivate_job(job_id)
    notify = self.__state_change_callback
    notify(status.LOST, job_id)
class ActiveJobs(object):
......@@ -184,10 +192,15 @@ class ActiveJobs(object):
hit disk to recover this information.
"""
def __init__(self, manager):
@staticmethod
def from_manager(manager):
    """Build an ``ActiveJobs`` from a manager's name and persistence directory.

    :param manager: Object exposing ``persistence_directory`` and ``name``.
    :returns: A new ``ActiveJobs`` instance.
    """
    # Keyword order mirrors the attribute read order: directory, then name.
    return ActiveJobs(
        persistence_directory=manager.persistence_directory,
        manager_name=manager.name,
    )
def __init__(self, manager_name, persistence_directory):
if persistence_directory:
active_job_directory = os.path.join(persistence_directory, "%s-active-jobs" % manager.name)
active_job_directory = os.path.join(persistence_directory, "%s-active-jobs" % manager_name)
if not os.path.exists(active_job_directory):
os.makedirs(active_job_directory)
else:
......
# TODO: Make objects.
# Job is staging and will be queued shortly.
PREPROCESSING = "preprocessing"
# Job manager has queued this job for execution.
QUEUED = "queued"
# Job manager believes the job is currently running.
RUNNING = "running"
# Job manager has finished and postprocessing ran successfully.
COMPLETE = "complete"
# Job was cancelled
CANCELLED = "cancelled"
# Problem submitting the job, interfacing with the job manager,
# or postprocessing the job.
FAILED = "failed"
# DRM marked job as complete and job is being unstaged.
POSTPROCESSING = "postprocessing"
# Pulsar believed this job to be active but the job manager
# cannot determine a state for it.
LOST = "lost"
def is_job_done(status):
    """Return whether *status* corresponds to a finished job.

    A job is done processing once it is complete, cancelled, failed, or
    lost.

    :param status: One of the status constants defined in this module.
    :returns: ``True`` if the job is done processing, ``False`` otherwise.
    """
    # LOST is terminal as well; only one return statement may remain here,
    # otherwise the older check without LOST would win.
    return status in (COMPLETE, CANCELLED, FAILED, LOST)
import contextlib
import threading
import time
from .test_utils import (
TempDirectoryTestCase,
skip_unless_module,
......@@ -7,9 +10,9 @@ from .test_utils import (
from pulsar.manager_endpoint_util import (
submit_job,
)
from pulsar.managers.stateful import ActiveJobs
from pulsar.client.amqp_exchange_factory import get_exchange
from pulsar.managers.util.drmaa import DrmaaSessionFactory
import time
class RestartTestCase(TempDirectoryTestCase):
......@@ -17,10 +20,7 @@ class RestartTestCase(TempDirectoryTestCase):
@skip_unless_module("drmaa")
@skip_unless_module("kombu")
def test_restart_finishes_job(self):
mq_url = "memory://test1092"
app_conf = dict(message_queue_url=mq_url)
app_conf["managers"] = {"manager_restart": {'type': 'queued_drmaa'}}
with restartable_pulsar_app_provider(app_conf=app_conf, web=False) as app_provider:
with self._setup_app_provider("restart_and_finish") as app_provider:
job_id = '12345'
with app_provider.new_app() as app:
......@@ -31,25 +31,71 @@ class RestartTestCase(TempDirectoryTestCase):
'setup': True,
}
submit_job(manager, job_info)
# TODO: unfortunate breaking of abstractions here.
time.sleep(.2)
external_id = manager._proxied_manager._external_id(job_id)
external_id = None
for i in range(10):
time.sleep(.05)
# TODO: unfortunate breaking of abstractions here.
external_id = manager._proxied_manager._external_id(job_id)
if external_id:
break
if external_id is None:
assert False, "Test failed, couldn't get exteranl id for job id."
drmaa_session = DrmaaSessionFactory().get()
drmaa_session.kill(external_id)
drmaa_session.close()
time.sleep(.2)
consumer = SimpleConsumer(queue="status_update", url=mq_url, manager="manager_restart")
consumer = self._status_update_consumer("restart_and_finish")
consumer.start()
with app_provider.new_app() as app:
time.sleep(.3)
consumer.wait_for_messages()
consumer.join()
assert len(consumer.messages) == 1, len(consumer.messages)
assert consumer.messages[0]["status"] == "complete"
@skip_unless_module("drmaa")
@skip_unless_module("kombu")
def test_recovery_failure_fires_lost_status(self):
test = "restart_and_finish"
with self._setup_app_provider(test) as app_provider:
job_id = '12345'
with app_provider.new_app() as app:
persistence_directory = app.persistence_directory
# Break some abstractions to activate a job that
# never existed.
manager_name = "manager_%s" % test
active_jobs = ActiveJobs(manager_name, persistence_directory)
active_jobs.activate_job(job_id)
consumer = self._status_update_consumer(test)
consumer.start()
with app_provider.new_app() as app:
consumer.wait_for_messages()
consumer.join()
assert len(consumer.messages) == 1, len(consumer.messages)
assert consumer.messages[0]["status"] == "lost"
@contextlib.contextmanager
def _setup_app_provider(self, test):
    """Yield a restartable app provider configured for the named test.

    The message queue URL and the single ``queued_drmaa`` manager name are
    both derived from *test*.

    :param test: Short test identifier used to build the URL and manager name.
    """
    manager_name = "manager_%s" % test
    app_conf = {
        "message_queue_url": "memory://test_%s" % test,
        "managers": {manager_name: {'type': 'queued_drmaa'}},
    }
    with restartable_pulsar_app_provider(app_conf=app_conf, web=False) as provider:
        yield provider
def _status_update_consumer(self, test):
    """Create (but do not start) a ``status_update`` consumer for the named test.

    The URL and manager name are derived from *test* with the same
    ``memory://test_%s`` / ``manager_%s`` patterns.

    :param test: Short test identifier.
    :returns: A new ``SimpleConsumer``.
    """
    return SimpleConsumer(
        queue="status_update",
        url="memory://test_%s" % test,
        manager="manager_%s" % test,
    )
class SimpleConsumer(object):
......@@ -58,7 +104,7 @@ class SimpleConsumer(object):
self.url = url
self.manager = manager
self.active = True
self.exchange = get_exchange("memory://test1092", manager, {})
self.exchange = get_exchange(url, manager, {})
self.messages = []
......@@ -71,6 +117,10 @@ class SimpleConsumer(object):
self.active = False
self.thread.join(10)
def wait_for_messages(self, n=1):
    """Block until at least *n* messages have been collected.

    Polls ``self.messages`` every 0.05 seconds. There is no timeout, so
    this does not return if fewer than *n* messages ever arrive.

    :param n: Minimum number of messages to wait for.
    """
    while True:
        if len(self.messages) >= n:
            return
        time.sleep(.05)
def _run(self):
self.exchange.consume("status_update", self._callback, check=self)
......
......@@ -33,6 +33,7 @@ def test_persistence():
assert (not(exists(touch_file)))
queue1.shutdown()
queue2 = StatefulManagerProxy(QueueManager('test', app, num_concurrent_jobs=1))
queue2.recover_active_jobs()
time.sleep(1)
assert exists(touch_file)
finally:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment