From 6edd45dc880865ef33fd3607be4752cc97a0e52f Mon Sep 17 00:00:00 2001
From: John Chilton <jmchilton@gmail.com>
Date: Wed, 8 Apr 2015 20:51:21 -0400
Subject: [PATCH] Implemented test case for restart causes MQ to get finalize message.

---
 pulsar/core.py                   | 12 ++++
 pulsar/manager_endpoint_util.py  |  5 +-
 pulsar/managers/staging/post.py  |  4 +-
 pulsar/web/routes.py             |  2 +-
 test/integration_test_restart.py | 96 ++++++++++++++++++++++++++++++++
 5 files changed, 113 insertions(+), 6 deletions(-)
 create mode 100644 test/integration_test_restart.py

diff --git a/pulsar/core.py b/pulsar/core.py
index ce0af41b..4d801434 100644
--- a/pulsar/core.py
+++ b/pulsar/core.py
@@ -123,3 +123,15 @@ class PulsarApp(object):
     def __setup_job_metrics(self, conf):
         job_metrics_config_file = conf.get("job_metrics_config_file", "job_metrics_conf.xml")
         self.job_metrics = JobMetrics(job_metrics_config_file)
+
+    @property
+    def only_manager(self):
+        """Return the single configured manager.
+
+        Convenience accessor for tests and other callers that know the
+        application was configured with exactly one manager.
+        """
+        assert len(self.managers) == 1
+        # dict.values() is a non-indexable view on Python 3, so take the
+        # first element through an iterator rather than with ``[0]``.
+        return next(iter(self.managers.values()))
diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py
index f560af3a..2ed6dc50 100644
--- a/pulsar/manager_endpoint_util.py
+++ b/pulsar/manager_endpoint_util.py
@@ -72,9 +72,8 @@ def submit_job(manager, job_config):
         jobs_directory = os.path.abspath(os.path.join(job_directory, os.pardir))
         command_line = command_line.replace('__PULSAR_JOBS_DIRECTORY__', jobs_directory)
 
-    if remote_staging:
-        # TODO: Handle __PULSAR_JOB_DIRECTORY__ config files, metadata files, etc...
-        manager.handle_remote_staging(job_id, remote_staging)
+    # TODO: Handle __PULSAR_JOB_DIRECTORY__ config files, metadata files, etc...
+    manager.handle_remote_staging(job_id, remote_staging)
 
     dependencies_description = dependencies.DependenciesDescription.from_dict(dependencies_description)
     return manager.launch(
diff --git a/pulsar/managers/staging/post.py b/pulsar/managers/staging/post.py
index b90c7714..69178217 100644
--- a/pulsar/managers/staging/post.py
+++ b/pulsar/managers/staging/post.py
@@ -15,8 +15,8 @@ def postprocess(job_directory, action_executor):
     # Returns True iff outputs were collected.
     try:
         staging_config = job_directory.load_metadata("staging_config", None)
-        if staging_config:
-            return __collect_outputs(job_directory, staging_config, action_executor)
+        collected = __collect_outputs(job_directory, staging_config, action_executor)
+        return collected
     finally:
         job_directory.write_file("postprocessed", "")
     return False
diff --git a/pulsar/web/routes.py b/pulsar/web/routes.py
index e9472edc..ca3c637c 100644
--- a/pulsar/web/routes.py
+++ b/pulsar/web/routes.py
@@ -59,7 +59,7 @@ def clean(manager, job_id):
 
 
 @PulsarController(path="/jobs/{job_id}/submit", method="POST")
-def submit(manager, job_id, command_line, params='{}', dependencies_description='null', setup_params='{}', remote_staging='[]', env='[]'):
+def submit(manager, job_id, command_line, params='{}', dependencies_description='null', setup_params='{}', remote_staging='{}', env='[]'):
     submit_params = loads(params)
     setup_params = loads(setup_params)
     dependencies_description = loads(dependencies_description)
diff --git a/test/integration_test_restart.py b/test/integration_test_restart.py
new file mode 100644
index 00000000..31c692f7
--- /dev/null
+++ b/test/integration_test_restart.py
@@ -0,0 +1,96 @@
+import threading
+import time
+
+from .test_utils import (
+    TempDirectoryTestCase,
+    skip_unless_module,
+    restartable_pulsar_app_provider,
+)
+from pulsar.manager_endpoint_util import (
+    submit_job,
+    status_dict,
+)
+from pulsar.client.amqp_exchange_factory import get_exchange
+from pulsar.managers.util.drmaa import DrmaaSessionFactory
+
+
+class RestartTestCase(TempDirectoryTestCase):
+    """Integration test for job finalization across an application restart."""
+
+    @skip_unless_module("drmaa")
+    @skip_unless_module("kombu")
+    def test_restart_finishes_job(self):
+        mq_url = "memory://test1092"
+        app_conf = dict(message_queue_url=mq_url)
+        app_conf["managers"] = {"manager_restart": {'type': 'queued_drmaa'}}
+        with restartable_pulsar_app_provider(app_conf=app_conf, web=False) as app_provider:
+            job_id = '12345'
+
+            with app_provider.new_app() as app:
+                manager = app.only_manager
+                job_info = {
+                    'job_id': job_id,
+                    'command_line': 'sleep 1000',
+                    'setup': True,
+                }
+                submit_job(manager, job_info)
+                # TODO: unfortunate breaking of abstractions here.
+                time.sleep(.2)
+                external_id = manager._proxied_manager._external_id(job_id)
+                print(status_dict(manager, job_id))
+
+            drmaa_session = DrmaaSessionFactory().get()
+            drmaa_session.kill(external_id)
+            drmaa_session.close()
+            time.sleep(.2)
+
+            consumer = SimpleConsumer(queue="status_update", url=mq_url, manager="manager_restart")
+            consumer.start()
+
+            with app_provider.new_app() as app:
+                time.sleep(.3)
+
+            consumer.join()
+            assert len(consumer.messages) == 1, len(consumer.messages)
+            assert consumer.messages[0]["status"] == "complete"
+
+
+class SimpleConsumer(object):
+    """Collect messages from a queue on a background thread.
+
+    The instance itself is handed to the exchange as ``check``; its truth
+    value is ``self.active``, which ``join`` sets to False.
+    """
+
+    def __init__(self, queue, url, manager="_default_"):
+        self.queue = queue
+        self.url = url
+        self.manager = manager
+        self.active = True
+        # Use the URL passed by the caller instead of a hard-coded one.
+        self.exchange = get_exchange(url, manager, {})
+
+        self.messages = []
+
+    def start(self):
+        t = threading.Thread(target=self._run)
+        t.start()
+        self.thread = t
+
+    def join(self):
+        self.active = False
+        self.thread.join(10)
+
+    def _run(self):
+        self.exchange.consume(self.queue, self._callback, check=self)
+
+    def _callback(self, body, message):
+        self.messages.append(body)
+        message.ack()
+
+    def __nonzero__(self):
+        return self.active
+
+    # Python 3 looks up __bool__ rather than __nonzero__; alias it so the
+    # truth value follows ``self.active`` on both major versions.
+    __bool__ = __nonzero__
-- 
GitLab