From f477bc4a764c1d8bf9ade43e7e8c541a32a3addb Mon Sep 17 00:00:00 2001
From: John Chilton <jmchilton@gmail.com>
Date: Fri, 10 May 2019 10:58:16 -0400
Subject: [PATCH] Refactor toward more structured inputs.

---
Review notes (this region is not part of the commit message):

 * ClientJobDescription now takes ``client_inputs`` (a ClientInputs
   collection of typed ClientInput entries).  The legacy ``input_files``
   list is still accepted and converted via
   ClientInputs.for_simple_input_paths(); the ``input_files`` property
   returns only the INPUT_PATH entries.
 * Fixed relative to the original patch: the extra-files entry created in
   for_simple_input_paths() now carries ``files_path`` (the "<name>_files"
   directory) instead of ``input_file``.  FileStager hands the entry's path
   straight to __upload_input_extra_files(), and an entry that reuses the
   dataset path is skipped by the ``handled_inputs`` check.
 * Fixed relative to the original patch: ``self.client_inputs`` falls back
   to an empty ClientInputs when neither ``input_files`` nor
   ``client_inputs`` is supplied, so iterating it keeps working as it did
   with the old ``[]`` default.
 * Line breaks and indentation were reconstructed from a
   whitespace-collapsed copy; the post-image blob ids on the ``index``
   lines predate the two fixes above.

 pulsar/client/staging/__init__.py | 62 +++++++++++++++++++++++++++++--
 pulsar/client/staging/up.py       | 28 ++++++++------
 2 files changed, 75 insertions(+), 15 deletions(-)

diff --git a/pulsar/client/staging/__init__.py b/pulsar/client/staging/__init__.py
index 8be7d2bc..17e257ea 100644
--- a/pulsar/client/staging/__init__.py
+++ b/pulsar/client/staging/__init__.py
@@ -5,9 +5,12 @@ from os import sep
 from os.path import (
     basename,
     dirname,
+    exists,
     join,
 )
 
+from galaxy.util.bunch import Bunch
+
 from ..util import PathHelper
 
 COMMAND_VERSION_FILENAME = "COMMAND_VERSION"
@@ -60,8 +63,9 @@ class ClientJobDescription(object):
         self,
         command_line,
         tool=None,
-        config_files=[],
-        input_files=[],
+        config_files=None,
+        input_files=None,
+        client_inputs=None,
         client_outputs=None,
         working_directory=None,
         metadata_directory=None,
@@ -75,8 +79,12 @@ class ClientJobDescription(object):
     ):
         self.tool = tool
         self.command_line = command_line
-        self.config_files = config_files
-        self.input_files = input_files
+        self.config_files = config_files or []
+        if input_files is not None:
+            # Deprecated input but provided for backward compatibility.
+            assert client_inputs is None
+            client_inputs = ClientInputs.for_simple_input_paths(input_files)
+        self.client_inputs = client_inputs or ClientInputs([])
         self.client_outputs = client_outputs or ClientOutputs()
         self.working_directory = working_directory
         self.metadata_directory = metadata_directory
@@ -88,6 +96,15 @@ class ClientJobDescription(object):
         self.container = container
         self.remote_pulsar_app_config = remote_pulsar_app_config
 
+    @property
+    def input_files(self):
+        # Deprecated but provided for backward compatibility.
+        input_files = []
+        for client_input in self.client_inputs:
+            if client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_PATH:
+                input_files.append(client_input.path)
+        return input_files
+
     @property
     def output_files(self):
         return self.client_outputs.output_files
@@ -106,6 +123,43 @@ class ClientJobDescription(object):
         )
 
 
+class ClientInputs(object):
+    """Abstraction describing input datasets for a job."""
+
+    def __init__(self, client_inputs):
+        self.client_inputs = client_inputs
+
+    def __iter__(self):
+        return iter(self.client_inputs)
+
+    @staticmethod
+    def for_simple_input_paths(input_files):
+        # Legacy: just assume extra files path based on inputs, probably not
+        # the best behavior - ignores object store for instance.
+        client_inputs = []
+        for input_file in input_files:
+            client_inputs.append(ClientInput(input_file, CLIENT_INPUT_PATH_TYPES.INPUT_PATH))
+            files_path = "%s_files" % input_file[0:-len(".dat")]
+            if exists(files_path):
+                client_inputs.append(ClientInput(files_path, CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH))
+
+        return ClientInputs(client_inputs)
+
+
+CLIENT_INPUT_PATH_TYPES = Bunch(
+    INPUT_PATH="input_path",
+    INPUT_EXTRA_FILES_PATH="input_extra_files_path",
+    INPUT_METADATA_PATH="input_metadata_path",
+)
+
+
+class ClientInput(object):
+
+    def __init__(self, path, input_type):
+        self.path = path
+        self.input_type = input_type
+
+
 class ClientOutputs(object):
     """ Abstraction describing the output datasets EXPECTED by the Galaxy job
     runner client.
diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py
index 0143ce5e..a6d8f8f2 100644
--- a/pulsar/client/staging/up.py
+++ b/pulsar/client/staging/up.py
@@ -16,7 +16,7 @@ from ..action_mapper import FileActionMapper
 from ..action_mapper import MessageAction
 from ..action_mapper import path_type
 from ..job_directory import RemoteJobDirectory
-from ..staging import COMMAND_VERSION_FILENAME
+from ..staging import CLIENT_INPUT_PATH_TYPES, COMMAND_VERSION_FILENAME
 from ..util import directory_files
 from ..util import PathHelper
 
@@ -75,7 +75,7 @@ class FileStager(object):
         self.client = client
         self.command_line = client_job_description.command_line
         self.config_files = client_job_description.config_files
-        self.input_files = client_job_description.input_files
+        self.client_inputs = client_job_description.client_inputs
         self.output_files = client_job_description.output_files
         if client_job_description.tool is not None:
             self.tool_id = client_job_description.tool.id
@@ -211,12 +211,19 @@ class FileStager(object):
 
     def __upload_input_files(self):
         handled_inputs = set()
-        for input_file in self.input_files:
-            if input_file in handled_inputs:
+        for client_input in self.client_inputs:
+            path = client_input.path
+            if path in handled_inputs:
                 continue
-            self.__upload_input_file(input_file)
-            self.__upload_input_extra_files(input_file)
-            handled_inputs.add(input_file)
+            if client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_PATH:
+                self.__upload_input_file(path)
+                handled_inputs.add(path)
+            elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH:
+                self.__upload_input_extra_files(path)
+                handled_inputs.add(path)
+            else:
+                # TODO: implement metadata...
+                raise NotImplementedError()
 
     def __upload_input_file(self, input_file):
         if self.__stage_input(input_file):
@@ -224,12 +231,11 @@ class FileStager(object):
             self.transfer_tracker.handle_transfer(input_file, path_type.INPUT)
         else:
             message = "Pulsar: __upload_input_file called on empty or missing dataset." + \
-                      " So such file: [%s]" % input_file
+                      " No such file: [%s]" % input_file
             log.debug(message)
 
-    def __upload_input_extra_files(self, input_file):
-        files_path = "%s_files" % input_file[0:-len(".dat")]
-        if exists(files_path) and self.__stage_input(files_path):
+    def __upload_input_extra_files(self, files_path):
+        if self.__stage_input(files_path):
             for extra_file_name in directory_files(files_path):
                 extra_file_path = join(files_path, extra_file_name)
                 remote_name = self.path_helper.remote_name(relpath(extra_file_path, dirname(files_path)))
-- 
GitLab