From 4c6dda3644d884c173a1905ba52060e15b7488b2 Mon Sep 17 00:00:00 2001 From: John Chilton <jmchilton@gmail.com> Date: Mon, 13 May 2019 10:21:34 -0400 Subject: [PATCH] Progress toward structured inputs... --- pulsar/client/staging/__init__.py | 4 +++ pulsar/client/staging/up.py | 48 ++++++++++++++++++------------- 2 files changed, 32 insertions(+), 20 deletions(-) diff --git a/pulsar/client/staging/__init__.py b/pulsar/client/staging/__init__.py index 453d276a..a141968f 100644 --- a/pulsar/client/staging/__init__.py +++ b/pulsar/client/staging/__init__.py @@ -159,6 +159,10 @@ class ClientInput(object): self.path = path self.input_type = input_type + @property + def action_source(self): + return {"path": self.path} + class ClientOutputs(object): """ Abstraction describing the output datasets EXPECTED by the Galaxy job diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index 2207cd02..e9b0c043 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -212,43 +212,42 @@ class FileStager(object): def __upload_input_files(self): handled_inputs = set() for client_input in self.client_inputs: + # TODO: use object identity to handle this. path = client_input.path if path in handled_inputs: continue if client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_PATH: - self.__upload_input_file(path) + self.__upload_input_file(client_input.action_source) handled_inputs.add(path) elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH: - self.__upload_input_extra_files(path) + self.__upload_input_extra_files(client_input.action_source) handled_inputs.add(path) elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH: - self.__upload_input_metadata_file(path) + self.__upload_input_metadata_file(client_input.action_source) handled_inputs.add(path) else: raise NotImplementedError() - def __upload_input_file(self, input_file): - if self.__stage_input(input_file): - if exists(input_file): - self.transfer_tracker.handle_transfer_path(input_file, path_type.INPUT) - else: - message = "Pulsar: __upload_input_file called on empty or missing dataset." + \ - " No such file: [%s]" % input_file - log.debug(message) + def __upload_input_file(self, input_action_source): + if self.__stage_input(input_action_source): + self.transfer_tracker.handle_transfer_source(input_action_source, path_type.INPUT) - def __upload_input_extra_files(self, files_path): - if self.__stage_input(files_path): + def __upload_input_extra_files(self, input_action_source): + if self.__stage_input(input_action_source): + # TODO: needs to happen else where if using remote object store staging + # but we don't have the action type yet. + files_path = input_action_source['path'] for extra_file_name in directory_files(files_path): extra_file_path = join(files_path, extra_file_name) remote_name = self.path_helper.remote_name(relpath(extra_file_path, dirname(files_path))) self.transfer_tracker.handle_transfer_path(extra_file_path, path_type.INPUT, name=remote_name) - def __upload_input_metadata_file(self, path): - if self.__stage_input(path): + def __upload_input_metadata_file(self, input_action_source): + if self.__stage_input(input_action_source): # Name must match what is generated in remote_input_path_rewrite in path_mapper. - remote_name = "metadata_%s" % basename(path) - self.transfer_tracker.handle_transfer_path(path, path_type.INPUT, name=remote_name) + remote_name = "metadata_%s" % basename(input_action_source['path']) + self.transfer_tracker.handle_transfer_source(input_action_source, path_type.INPUT, name=remote_name) def __upload_working_directory_files(self): # Task manager stages files into working directory, these need to be @@ -317,10 +316,13 @@ class FileStager(object): """ return self.job_inputs.command_line - def __stage_input(self, file_path): + def __stage_input(self, source): + if not self.rewrite_paths: + return True + # If we have disabled path rewriting, just assume everything needs to be transferred, # else check to ensure the file is referenced before transferring it. - return (not self.rewrite_paths) or self.job_inputs.path_referenced(file_path) + return self.job_inputs.path_referenced(source['path']) class JobInputs(object): @@ -448,6 +450,12 @@ class TransferTracker(object): local_action = action.staging_action_local if local_action: path = source['path'] + if not exists(path): + message = "Pulsar: __upload_input_file called on empty or missing dataset." + \ + " No such file: [%s]" % path + log.debug(message) + return + response = self.client.put_file(path, type, name=name, contents=contents, action_type=action.action_type) def get_path(): @@ -490,7 +498,7 @@ class TransferTracker(object): else: path = source.get("path") if path is not None and not exists(path): - message = "handle_transfer_path called on non-existent file - [%s]" % path + message = "__action_for_transfer called on non-existent file - [%s]" % path log.warn(message) raise Exception(message) action = self.__action(source, type) -- GitLab