diff --git a/pulsar/core.py b/pulsar/core.py index ce0af41bd0d6525ff781ea2aa3ae5781b9f58723..4d801434df56d349329fa54e8ef494f53881af2c 100644 --- a/pulsar/core.py +++ b/pulsar/core.py @@ -123,3 +123,10 @@ class PulsarApp(object): def __setup_job_metrics(self, conf): job_metrics_config_file = conf.get("job_metrics_config_file", "job_metrics_conf.xml") self.job_metrics = JobMetrics(job_metrics_config_file) + + @property + def only_manager(self): + # convience method for tests, etc... where when we know there + # is only one manager. + assert len(self.managers) == 1 + return self.managers.values()[0] diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py index f560af3a738b1fbc6b7fc1636dfe3e4b5fb40d4e..2ed6dc5057b4627ebd648dd1653a8389fafd1142 100644 --- a/pulsar/manager_endpoint_util.py +++ b/pulsar/manager_endpoint_util.py @@ -72,9 +72,8 @@ def submit_job(manager, job_config): jobs_directory = os.path.abspath(os.path.join(job_directory, os.pardir)) command_line = command_line.replace('__PULSAR_JOBS_DIRECTORY__', jobs_directory) - if remote_staging: - # TODO: Handle __PULSAR_JOB_DIRECTORY__ config files, metadata files, etc... - manager.handle_remote_staging(job_id, remote_staging) + # TODO: Handle __PULSAR_JOB_DIRECTORY__ config files, metadata files, etc... + manager.handle_remote_staging(job_id, remote_staging) dependencies_description = dependencies.DependenciesDescription.from_dict(dependencies_description) return manager.launch( diff --git a/pulsar/managers/staging/post.py b/pulsar/managers/staging/post.py index b90c771414aa344ca1e04c749e55cf0fbd3bf4ef..69178217acaf95fbc1c232f6f9e33f0a67d5f322 100644 --- a/pulsar/managers/staging/post.py +++ b/pulsar/managers/staging/post.py @@ -15,8 +15,8 @@ def postprocess(job_directory, action_executor): # Returns True iff outputs were collected. try: staging_config = job_directory.load_metadata("staging_config", None) - if staging_config: - return __collect_outputs(job_directory, staging_config, action_executor) + collected = __collect_outputs(job_directory, staging_config, action_executor) + return collected finally: job_directory.write_file("postprocessed", "") return False diff --git a/pulsar/web/routes.py b/pulsar/web/routes.py index e9472edce979846e768f83ec08bf3be474803d02..ca3c637cf7277933bf32e1fbc5667e154f61f46a 100644 --- a/pulsar/web/routes.py +++ b/pulsar/web/routes.py @@ -59,7 +59,7 @@ def clean(manager, job_id): @PulsarController(path="/jobs/{job_id}/submit", method="POST") -def submit(manager, job_id, command_line, params='{}', dependencies_description='null', setup_params='{}', remote_staging='[]', env='[]'): +def submit(manager, job_id, command_line, params='{}', dependencies_description='null', setup_params='{}', remote_staging='{}', env='[]'): submit_params = loads(params) setup_params = loads(setup_params) dependencies_description = loads(dependencies_description) diff --git a/test/integration_test_restart.py b/test/integration_test_restart.py new file mode 100644 index 0000000000000000000000000000000000000000..31c692f77973142f37ed81012cb813021f38f270 --- /dev/null +++ b/test/integration_test_restart.py @@ -0,0 +1,84 @@ +import threading +from .test_utils import ( + TempDirectoryTestCase, + skip_unless_module, + restartable_pulsar_app_provider, +) +from pulsar.manager_endpoint_util import ( + submit_job, + status_dict, +) +from pulsar.client.amqp_exchange_factory import get_exchange +from pulsar.managers.util.drmaa import DrmaaSessionFactory +import time + + +class RestartTestCase(TempDirectoryTestCase): + + @skip_unless_module("drmaa") + @skip_unless_module("kombu") + def test_restart_finishes_job(self): + mq_url = "memory://test1092" + app_conf = dict(message_queue_url=mq_url) + app_conf["managers"] = {"manager_restart": {'type': 'queued_drmaa'}} + with restartable_pulsar_app_provider(app_conf=app_conf, web=False) as app_provider: + job_id = '12345' + + with app_provider.new_app() as app: + manager = app.only_manager + job_info = { + 'job_id': job_id, + 'command_line': 'sleep 1000', + 'setup': True, + } + submit_job(manager, job_info) + # TODO: unfortunate breaking of abstractions here. + time.sleep(.2) + external_id = manager._proxied_manager._external_id(job_id) + print status_dict(manager, job_id) + + drmaa_session = DrmaaSessionFactory().get() + drmaa_session.kill(external_id) + drmaa_session.close() + time.sleep(.2) + + consumer = SimpleConsumer(queue="status_update", url=mq_url, manager="manager_restart") + consumer.start() + + with app_provider.new_app() as app: + time.sleep(.3) + + consumer.join() + assert len(consumer.messages) == 1, len(consumer.messages) + assert consumer.messages[0]["status"] == "complete" + + +class SimpleConsumer(object): + + def __init__(self, queue, url, manager="_default_"): + self.queue = queue + self.url = url + self.manager = manager + self.active = True + self.exchange = get_exchange("memory://test1092", manager, {}) + + self.messages = [] + + def start(self): + t = threading.Thread(target=self._run) + t.start() + self.thread = t + + def join(self): + self.active = False + self.thread.join(10) + + def _run(self): + self.exchange.consume("status_update", self._callback, check=self) + + def _callback(self, body, message): + self.messages.append(body) + message.ack() + + def __nonzero__(self): + return self.active