diff --git a/pulsar/client/client.py b/pulsar/client/client.py index ae6f24d280fd1be0ad34e6c65fc797d15867d516..d6b6d68066d27c5259137976b13a69b44f2e5d00 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -120,6 +120,9 @@ class JobClient(BaseJobClient): launch_params['env'] = json_dumps(env) if remote_staging: launch_params['remote_staging'] = json_dumps(remote_staging) + if job_config and 'touch_outputs' in job_config: + # message clients pass the entire job config + launch_params['submit_extras'] = json_dumps({'touch_outputs': job_config['touch_outputs']}) if job_config and self.setup_handler.local: # Setup not yet called, job properties were inferred from diff --git a/pulsar/client/staging/__init__.py b/pulsar/client/staging/__init__.py index de2718ade6edc4b507af2db608cd8e4ad86defcf..4d1f86affb60a561bf27306130cee9dbef39e28f 100644 --- a/pulsar/client/staging/__init__.py +++ b/pulsar/client/staging/__init__.py @@ -69,6 +69,7 @@ class ClientJobDescription(object): env=[], arbitrary_files=None, rewrite_paths=True, + touch_outputs=None, ): self.tool = tool self.command_line = command_line @@ -81,6 +82,7 @@ class ClientJobDescription(object): self.env = env self.rewrite_paths = rewrite_paths self.arbitrary_files = arbitrary_files or {} + self.touch_outputs = touch_outputs or [] @property def output_files(self): diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index 92b93cd2faa3e271e20db5215a47ed8ed5a69669..d94dd9f639ec82cb698be3c96f3d04e7916fb540 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -92,6 +92,7 @@ class FileStager(object): self.action_mapper = FileActionMapper(client) self.__handle_setup(job_config) + self.__setup_touch_outputs(client_job_description.touch_outputs) self.transfer_tracker = TransferTracker(client, self.path_helper, self.action_mapper, self.job_inputs, rewrite_paths=self.rewrite_paths) @@ -137,6 +138,9 @@ class FileStager(object): self.client.job_id = self.job_id self.job_config = job_config + def __setup_touch_outputs(self, touch_outputs): + self.job_config['touch_outputs'] = touch_outputs + def __parse_remote_separator(self, job_config): separator = job_config.get("system_properties", {}).get("separator", None) if not separator: # Legacy Pulsar diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py index f57163d5047b13cd8a48b16e325134730384aa6e..828a31a0212df5c1b0600f92a48bd72ace9e8eec 100644 --- a/pulsar/manager_endpoint_util.py +++ b/pulsar/manager_endpoint_util.py @@ -69,6 +69,7 @@ def submit_job(manager, job_config): dependencies_description = job_config.get('dependencies_description', None) env = job_config.get('env', []) submit_params = job_config.get('submit_params', {}) + touch_outputs = job_config.get('touch_outputs', []) job_config = None if setup_params or force_setup: input_job_id = setup_params.get("job_id", job_id) @@ -89,6 +90,7 @@ def submit_job(manager, job_config): command_line = command_line.replace('__PULSAR_JOBS_DIRECTORY__', jobs_directory) # TODO: Handle __PULSAR_JOB_DIRECTORY__ config files, metadata files, etc... + manager.touch_outputs(job_id, touch_outputs) manager.handle_remote_staging(job_id, remote_staging) dependencies_description = dependencies.DependenciesDescription.from_dict(dependencies_description) diff --git a/pulsar/managers/stateful.py b/pulsar/managers/stateful.py index 2666673d0b0a361e75b6c53236c84243bd561000..b1eb5986727e5ef362b49b3c5c6ca7f76b3796a2 100644 --- a/pulsar/managers/stateful.py +++ b/pulsar/managers/stateful.py @@ -64,6 +64,12 @@ class StatefulManagerProxy(ManagerProxy): job_directory = self._proxied_manager.job_directory(job_id) job_directory.store_metadata("staging_config", staging_config) + def touch_outputs(self, job_id, touch_outputs): + job_directory = self._proxied_manager.job_directory(job_id) + for name in touch_outputs: + path = job_directory.calculate_path(name, 'output') + job_directory.open_file(path, mode='a') + def launch(self, job_id, *args, **kwargs): job_directory = self._proxied_manager.job_directory(job_id) diff --git a/pulsar/web/routes.py b/pulsar/web/routes.py index f460ef439cb488f030fe04e585a5306fd6a00b2a..5eaf5515ee0d682ba96c1c9007266daad1347c91 100644 --- a/pulsar/web/routes.py +++ b/pulsar/web/routes.py @@ -62,12 +62,13 @@ def clean(manager, job_id): @PulsarController(path="/jobs/{job_id}/submit", method="POST") -def submit(manager, job_id, command_line, params='{}', dependencies_description='null', setup_params='{}', remote_staging='{}', env='[]'): +def submit(manager, job_id, command_line, params='{}', dependencies_description='null', setup_params='{}', remote_staging='{}', env='[]', submit_extras='{}'): submit_params = loads(params) setup_params = loads(setup_params) dependencies_description = loads(dependencies_description) env = loads(env) remote_staging = loads(remote_staging) + submit_extras = loads(submit_extras) submit_config = dict( job_id=job_id, command_line=command_line, @@ -77,6 +78,7 @@ def submit(manager, job_id, command_line, params='{}', dependencies_description= env=env, remote_staging=remote_staging, ) + submit_config.update(submit_extras) submit_job(manager, submit_config)