From db5e9455673d6777ffa87c2cd0734c15ba54e71d Mon Sep 17 00:00:00 2001 From: Nate Coraor <nate@bx.psu.edu> Date: Wed, 20 Sep 2017 12:27:31 -0400 Subject: [PATCH] Precreate Galaxy tool outputs on the remote before executing. --- pulsar/client/client.py | 3 +++ pulsar/client/staging/__init__.py | 2 ++ pulsar/client/staging/up.py | 4 ++++ pulsar/manager_endpoint_util.py | 2 ++ pulsar/managers/stateful.py | 6 ++++++ pulsar/web/routes.py | 4 +++- 6 files changed, 20 insertions(+), 1 deletion(-) diff --git a/pulsar/client/client.py b/pulsar/client/client.py index ae6f24d2..d6b6d680 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -120,6 +120,9 @@ class JobClient(BaseJobClient): launch_params['env'] = json_dumps(env) if remote_staging: launch_params['remote_staging'] = json_dumps(remote_staging) + if job_config and 'touch_outputs' in job_config: + # message clients pass the entire job config + launch_params['submit_extras'] = json_dumps({'touch_outputs': job_config['touch_outputs']}) if job_config and self.setup_handler.local: # Setup not yet called, job properties were inferred from diff --git a/pulsar/client/staging/__init__.py b/pulsar/client/staging/__init__.py index de2718ad..4d1f86af 100644 --- a/pulsar/client/staging/__init__.py +++ b/pulsar/client/staging/__init__.py @@ -69,6 +69,7 @@ class ClientJobDescription(object): env=[], arbitrary_files=None, rewrite_paths=True, + touch_outputs=None, ): self.tool = tool self.command_line = command_line @@ -81,6 +82,7 @@ class ClientJobDescription(object): self.env = env self.rewrite_paths = rewrite_paths self.arbitrary_files = arbitrary_files or {} + self.touch_outputs = touch_outputs or [] @property def output_files(self): diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index 92b93cd2..d94dd9f6 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -92,6 +92,7 @@ class FileStager(object): self.action_mapper = FileActionMapper(client) self.__handle_setup(job_config) + self.__setup_touch_outputs(client_job_description.touch_outputs) self.transfer_tracker = TransferTracker(client, self.path_helper, self.action_mapper, self.job_inputs, rewrite_paths=self.rewrite_paths) @@ -137,6 +138,9 @@ class FileStager(object): self.client.job_id = self.job_id self.job_config = job_config + def __setup_touch_outputs(self, touch_outputs): + self.job_config['touch_outputs'] = touch_outputs + def __parse_remote_separator(self, job_config): separator = job_config.get("system_properties", {}).get("separator", None) if not separator: # Legacy Pulsar diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py index f57163d5..828a31a0 100644 --- a/pulsar/manager_endpoint_util.py +++ b/pulsar/manager_endpoint_util.py @@ -69,6 +69,7 @@ def submit_job(manager, job_config): dependencies_description = job_config.get('dependencies_description', None) env = job_config.get('env', []) submit_params = job_config.get('submit_params', {}) + touch_outputs = job_config.get('touch_outputs', []) job_config = None if setup_params or force_setup: input_job_id = setup_params.get("job_id", job_id) @@ -89,6 +90,7 @@ def submit_job(manager, job_config): command_line = command_line.replace('__PULSAR_JOBS_DIRECTORY__', jobs_directory) # TODO: Handle __PULSAR_JOB_DIRECTORY__ config files, metadata files, etc... + manager.touch_outputs(job_id, touch_outputs) manager.handle_remote_staging(job_id, remote_staging) dependencies_description = dependencies.DependenciesDescription.from_dict(dependencies_description) diff --git a/pulsar/managers/stateful.py b/pulsar/managers/stateful.py index 2666673d..b1eb5986 100644 --- a/pulsar/managers/stateful.py +++ b/pulsar/managers/stateful.py @@ -64,6 +64,12 @@ class StatefulManagerProxy(ManagerProxy): job_directory = self._proxied_manager.job_directory(job_id) job_directory.store_metadata("staging_config", staging_config) + def touch_outputs(self, job_id, touch_outputs): + job_directory = self._proxied_manager.job_directory(job_id) + for name in touch_outputs: + path = job_directory.calculate_path(name, 'output') + job_directory.open_file(path, mode='a') + def launch(self, job_id, *args, **kwargs): job_directory = self._proxied_manager.job_directory(job_id) diff --git a/pulsar/web/routes.py b/pulsar/web/routes.py index f460ef43..5eaf5515 100644 --- a/pulsar/web/routes.py +++ b/pulsar/web/routes.py @@ -62,12 +62,13 @@ def clean(manager, job_id): @PulsarController(path="/jobs/{job_id}/submit", method="POST") -def submit(manager, job_id, command_line, params='{}', dependencies_description='null', setup_params='{}', remote_staging='{}', env='[]'): +def submit(manager, job_id, command_line, params='{}', dependencies_description='null', setup_params='{}', remote_staging='{}', env='[]', submit_extras='{}'): submit_params = loads(params) setup_params = loads(setup_params) dependencies_description = loads(dependencies_description) env = loads(env) remote_staging = loads(remote_staging) + submit_extras = loads(submit_extras) submit_config = dict( job_id=job_id, command_line=command_line, @@ -77,6 +78,7 @@ def submit(manager, job_id, command_line, params='{}', dependencies_description= env=env, remote_staging=remote_staging, ) + submit_config.update(submit_extras) submit_job(manager, submit_config) -- GitLab