diff --git a/.gitignore b/.gitignore index 31a3d855b01d91c45e4d05db543b28cde5621ee6..a6cbc787e11e74ccdfc5757259fece768c8b22f7 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ local_env.sh coverage_html_report files/staging files/persisted_data +docker/coexecutor/*whl app.yml server.ini job_managers.ini diff --git a/Makefile b/Makefile index 6f1254ab1bcd50e19ed169c3bce7cf8cb85791e5..31bdbd54678c9dc2d56a143775c788b041a7d4e7 100644 --- a/Makefile +++ b/Makefile @@ -45,6 +45,7 @@ clean-build: rm -fr build/ rm -fr dist/ rm -fr pulsar.*.egg-info + rm -rf docker/coexecutor/*whl clean-pyc: find . -name '*.pyc' -exec rm -f {} + diff --git a/docker/coexecutor/Dockerfile b/docker/coexecutor/Dockerfile index 058b8008f4ae010740dfeadcbc721e5fc0aec7ce..7d55e69a58bce36a1379b3d089153b4d15434ba7 100644 --- a/docker/coexecutor/Dockerfile +++ b/docker/coexecutor/Dockerfile @@ -15,10 +15,8 @@ RUN apt-get update \ && apt-get autoclean \ && rm -rf /var/lib/apt/lists/* /var/log/dpkg.log -RUN pip install -U pip && pip install wheel +RUN pip install -U pip && pip install wheel kombu pykube poster ADD pulsar_app-*.dev0-py2.py3-none-any.whl /pulsar_app-*.dev0-py2.py3-none-any.whl RUN pip install /pulsar_app-*.dev0-py2.py3-none-any.whl - -RUN pip install kombu pykube poster diff --git a/docs/files/file_actions_sample_1.yaml b/docs/files/file_actions_sample_1.yaml index 2851d2c51547fe9b3812616a91216cafcddbd195..c523f7a837033622a3b4b5f450814caf00512b2c 100644 --- a/docs/files/file_actions_sample_1.yaml +++ b/docs/files/file_actions_sample_1.yaml @@ -19,7 +19,7 @@ paths: match_type: glob path_types: unstructured # Set to *any* to apply to defaults & unstructured paths. action: transfer - depth: 1 # Stage whole directory with job and just file. + depth: 1 # Stage whole directory with job and not just the file. # Following block demonstrates rewriting paths without staging. Useful for # instance if Galaxy's data indices are mounted on both servers but with @@ -38,3 +38,16 @@ paths: ssh_user: galaxy ssh_host: f.q.d.n ssh_port: 22 + +# See action_mapper.py for explanation of mapper types: +# - input: Galaxy input datasets and extra files. +# - config: Galaxy config and param files. +# - tool: Files from tool's tool_dir (for now just wrapper if available). +# - workdir: Input work dir files - e.g. task-split input file. +# - metadata: Input metadata files. +# - output: Galaxy output datasets in their final home. +# - output_workdir: Galaxy from_work_dir output paths and other files (e.g. galaxy.json) +# - output_metadata: Meta job and data files (e.g. Galaxy metadata generation files and +# metric instrumentation files) +# - unstructured: Other fixed tool parameter paths (likely coming from tool data, but not +# necessarily).
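As a quick illustration of how the path types documented above are consumed, here is a minimal sketch (not part of the patch; the paths and chosen actions are hypothetical) that builds a FileActionMapper from an equivalent in-memory config. An entry without a "path" key applies to every path of the listed types, via the path_type_only match type introduced later in this changeset.

from pulsar.client.action_mapper import FileActionMapper

config = {
    "paths": [
        # Prefix match (the default match_type): applies only to inputs under /galaxy/data.
        {"path": "/galaxy/data", "action": "transfer", "path_types": "input"},
        # No "path" key: matches every path of type "output" (PathTypeOnlyMapper).
        {"action": "copy", "path_types": "output"},
    ],
}
mapper = FileActionMapper(config=config)
assert mapper.action({"path": "/galaxy/data/dataset_1.dat"}, "input").action_type == "transfer"
assert mapper.action({"path": "/anywhere/else/output_1.dat"}, "output").action_type == "copy"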
diff --git a/pulsar/client/__init__.py b/pulsar/client/__init__.py index 951cb3c7c3a2a0ca82e13ee74cd941c3e0151895..8ecdb1c128b38b0c8bdccf612370fd2f14cfabce 100644 --- a/pulsar/client/__init__.py +++ b/pulsar/client/__init__.py @@ -44,9 +44,14 @@ from .destination import url_to_destination_params from .exceptions import PulsarClientTransportError from .manager import build_client_manager from .path_mapper import PathMapper -from .staging import ClientJobDescription -from .staging import PulsarOutputs -from .staging import ClientOutputs +from .staging import ( + ClientJobDescription, + ClientInputs, + ClientInput, + ClientOutputs, + CLIENT_INPUT_PATH_TYPES, + PulsarOutputs, +) from .staging.down import finish_job from .staging.up import submit_job @@ -58,6 +63,10 @@ __all__ = [ 'submit_job', 'ClientJobDescription', 'PulsarOutputs', + 'ClientInput', + 'ClientInputs', + 'ClientOutputs', + 'CLIENT_INPUT_PATH_TYPES', 'ClientOutputs', 'PathMapper', 'PulsarClientTransportError', diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index e95dd0f10176bd3eada21bd8d2dd81c94de216af..dcb5941a75816ec36de706f13e9e5b5226cd1181 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -48,7 +48,7 @@ path_type = Bunch( TOOL="tool", # Input work dir files - e.g.task-split input file WORKDIR="workdir", - # Input work dir files - e.g. metadata files, etc.. + # Input metadata dir files - e.g. metadata files, etc.. METADATA="metadata", # Galaxy output datasets in their final home. OUTPUT="output", @@ -58,7 +58,7 @@ path_type = Bunch( # metric instrumentation files) OUTPUT_METADATA="output_metadata", # Other fixed tool parameter paths (likely coming from tool data, but not - # nessecarily). Not sure this is the best name... + # nessecarily). UNSTRUCTURED="unstructured", ) @@ -98,53 +98,62 @@ class FileActionMapper(object): ... mock_client = Bunch(default_file_action=default_action, action_config_path=f.name, files_endpoint=None) ... mapper = FileActionMapper(mock_client) ... as_dict = config=mapper.to_dict() - ... # print(as_dict["paths"]) ... mapper = FileActionMapper(config=as_dict) # Serialize and deserialize it to make sure still works ... unlink(f.name) ... return mapper >>> mapper = mapper_for(default_action='none', config_contents=json_string) >>> # Test first config line above, implicit path prefix mapper - >>> action = mapper.action('/opt/galaxy/tools/filters/catWrapper.py', 'input') + >>> action = mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input') >>> action.action_type == u'none' True >>> action.staging_needed False >>> # Test another (2nd) mapper, this one with a different action - >>> action = mapper.action('/galaxy/data/files/000/dataset_1.dat', 'input') + >>> action = mapper.action({'path': '/galaxy/data/files/000/dataset_1.dat'}, 'input') >>> action.action_type == u'transfer' True >>> action.staging_needed True >>> # Always at least copy work_dir outputs. 
- >>> action = mapper.action('/opt/galaxy/database/working_directory/45.sh', 'workdir') + >>> action = mapper.action({'path': '/opt/galaxy/database/working_directory/45.sh'}, 'workdir') >>> action.action_type == u'copy' True >>> action.staging_needed True >>> # Test glob mapper (matching test) - >>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam', 'input').action_type == u'copy' + >>> mapper.action({'path': '/cool/bamfiles/projectABC/study1/patient3.bam'}, 'input').action_type == u'copy' True >>> # Test glob mapper (non-matching test) - >>> mapper.action('/cool/bamfiles/projectABC/study1/patient3.bam.bai', 'input').action_type == u'none' + >>> mapper.action({'path': '/cool/bamfiles/projectABC/study1/patient3.bam.bai'}, 'input').action_type == u'none' True >>> # Regex mapper test. - >>> mapper.action('/old/galaxy/data/dataset_10245.dat', 'input').action_type == u'copy' + >>> mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'input').action_type == u'copy' True >>> # Doesn't map unstructured paths by default - >>> mapper.action('/old/galaxy/data/dataset_10245.dat', 'unstructured').action_type == u'none' + >>> mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'unstructured').action_type == u'none' True >>> input_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \ {"path": "/", "action": "transfer", "path_types": "input"} \ ] }''') - >>> input_only_mapper.action('/dataset_1.dat', 'input').action_type == u'transfer' + >>> input_only_mapper.action({'path': '/dataset_1.dat'}, 'input').action_type == u'transfer' True - >>> input_only_mapper.action('/dataset_1.dat', 'output').action_type == u'none' + >>> input_only_mapper.action({'path': '/dataset_1.dat'}, 'output').action_type == u'none' True >>> unstructured_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \ {"path": "/", "action": "transfer", "path_types": "*any*"} \ ] }''') - >>> unstructured_mapper.action('/old/galaxy/data/dataset_10245.dat', 'unstructured').action_type == u'transfer' + >>> unstructured_mapper.action({'path': '/old/galaxy/data/dataset_10245.dat'}, 'unstructured').action_type == u'transfer' True + >>> match_type_only_mapper = mapper_for(default_action="none", config_contents=r'''{"paths": [ \ + {"action": "transfer", "path_types": "input"}, \ + {"action": "remote_copy", "path_types": "output"} \ + ] }''') + >>> input_action = match_type_only_mapper.action({}, 'input') + >>> input_action.action_type + 'transfer' + >>> output_action = match_type_only_mapper.action({}, 'output') + >>> output_action.action_type + 'remote_copy' """ def __init__(self, client=None, config=None): @@ -161,7 +170,8 @@ class FileActionMapper(object): self.mappers = mappers_from_dicts(config.get("paths", [])) self.files_endpoint = config.get("files_endpoint", None) - def action(self, path, type, mapper=None): + def action(self, source, type, mapper=None): + path = source.get("path", None) mapper = self.__find_mapper(path, type, mapper) action_class = self.__action_class(path, type, mapper) file_lister = DEFAULT_FILE_LISTER @@ -205,7 +215,10 @@ class FileActionMapper(object): def __find_mapper(self, path, type, mapper=None): if not mapper: - normalized_path = abspath(path) + if path is not None: + normalized_path = abspath(path) + else: + normalized_path = None for query_mapper in self.mappers: if query_mapper.matches(normalized_path, type): mapper = query_mapper @@ -262,6 +275,7 @@ UNSET_ACTION_KWD = "__UNSET__" class BaseAction(object): + 
whole_directory_transfer_supported = False action_spec = {} def __init__(self, path, file_lister=None): @@ -608,6 +622,22 @@ class BasePathMapper(object): base_dict.update(**kwds) return base_dict + def to_pattern(self): + raise NotImplementedError() + + +class PathTypeOnlyMapper(BasePathMapper): + match_type = 'path_type_only' + + def __init__(self, config): + super(PathTypeOnlyMapper, self).__init__(config) + + def _path_matches(self, path): + return True + + def to_dict(self): + return self._extend_base_dict() + class PrefixPathMapper(BasePathMapper): match_type = 'prefix' @@ -617,7 +647,7 @@ class PrefixPathMapper(BasePathMapper): self.prefix_path = abspath(config['path']) def _path_matches(self, path): - return path.startswith(self.prefix_path) + return path is not None and path.startswith(self.prefix_path) def to_pattern(self): pattern_str = r"(%s%s[^\s,\"\']+)" % (escape(self.prefix_path), escape(sep)) @@ -635,7 +665,7 @@ class GlobPathMapper(BasePathMapper): self.glob_path = config['path'] def _path_matches(self, path): - return fnmatch.fnmatch(path, self.glob_path) + return path is not None and fnmatch.fnmatch(path, self.glob_path) def to_pattern(self): return compile(fnmatch.translate(self.glob_path)) @@ -653,7 +683,7 @@ class RegexPathMapper(BasePathMapper): self.pattern = compile(self.pattern_raw) def _path_matches(self, path): - return self.pattern.match(path) is not None + return path is not None and self.pattern.match(path) is not None def to_pattern(self): return self.pattern @@ -662,7 +692,7 @@ class RegexPathMapper(BasePathMapper): return self._extend_base_dict(path=self.pattern_raw) -MAPPER_CLASSES = [PrefixPathMapper, GlobPathMapper, RegexPathMapper] +MAPPER_CLASSES = [PathTypeOnlyMapper, PrefixPathMapper, GlobPathMapper, RegexPathMapper] MAPPER_CLASS_DICT = dict(map(lambda c: (c.match_type, c), MAPPER_CLASSES)) @@ -671,7 +701,10 @@ def mappers_from_dicts(mapper_def_list): def _mappper_from_dict(mapper_dict): - map_type = mapper_dict.get('match_type', DEFAULT_PATH_MAPPER_TYPE) + if "path" in mapper_dict: + map_type = mapper_dict.get('match_type', DEFAULT_PATH_MAPPER_TYPE) + else: + map_type = 'path_type_only' return MAPPER_CLASS_DICT[map_type](mapper_dict) diff --git a/pulsar/client/client.py b/pulsar/client/client.py index cfe7ccf23e8fab24a268a109eb795b1e8f286984..e8e6184ee1f8b854bbb33b6209dcb0d2743de7de 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -365,7 +365,7 @@ class MessageCoexecutionPodJobClient(BaseMessageJobClient): def __init__(self, destination_params, job_id, client_manager): super(MessageCoexecutionPodJobClient, self).__init__(destination_params, job_id, client_manager) - self.pulsar_container_image = destination_params.get("pulsar_container_image", "galaxy/pulsar-pod-staging:0.1") + self.pulsar_container_image = destination_params.get("pulsar_container_image", "galaxy/pulsar-pod-staging:0.10.0") self._default_pull_policy = pull_policy(destination_params) def launch(self, command_line, dependencies_description=None, env=[], remote_staging=[], job_config=None, container=None, pulsar_app_config=None): diff --git a/pulsar/client/path_mapper.py b/pulsar/client/path_mapper.py index 3383ed3985204ce1ca13df750526c103a081dff5..eb7ed09d6de53d5c07f10bf1355952095c3fd572 100644 --- a/pulsar/client/path_mapper.py +++ b/pulsar/client/path_mapper.py @@ -4,6 +4,7 @@ from galaxy.util import in_directory from .action_mapper import FileActionMapper from .action_mapper import path_type +from .staging import CLIENT_INPUT_PATH_TYPES from .util import 
PathHelper @@ -46,8 +47,11 @@ class PathMapper(object): remote_path = self.__remote_path_rewrite(local_path, output_type) return remote_path - def remote_input_path_rewrite(self, local_path): - remote_path = self.__remote_path_rewrite(local_path, path_type.INPUT) + def remote_input_path_rewrite(self, local_path, client_input_path_type=None): + name = None + if client_input_path_type == CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH: + name = "metadata_%s" % os.path.basename(local_path) + remote_path = self.__remote_path_rewrite(local_path, path_type.INPUT, name=name) return remote_path def remote_version_path_rewrite(self, local_path): @@ -56,7 +60,7 @@ class PathMapper(object): def check_for_arbitrary_rewrite(self, local_path): path = str(local_path) # Use false_path if needed. - action = self.action_mapper.action(path, path_type.UNSTRUCTURED) + action = self.action_mapper.action({"path": path}, path_type.UNSTRUCTURED) if not action.staging_needed: return action.path_rewrite(self.path_helper), [] unique_names = action.unstructured_map() @@ -68,7 +72,7 @@ class PathMapper(object): """ Return remote path of this file (if staging is required) else None. """ path = str(dataset_path) # Use false_path if needed. - action = self.action_mapper.action(path, dataset_path_type) + action = self.action_mapper.action({"path": path}, dataset_path_type) if action.staging_needed: if name is None: name = os.path.basename(path) @@ -81,11 +85,6 @@ class PathMapper(object): return remote_path_rewrite - def __action(self, dataset_path, dataset_path_type): - path = str(dataset_path) # Use false_path if needed. - action = self.action_mapper.action(path, dataset_path_type) - return action - def __remote_directory(self, dataset_path_type): if dataset_path_type in [path_type.OUTPUT]: return self.output_directory diff --git a/pulsar/client/staging/__init__.py b/pulsar/client/staging/__init__.py index 8be7d2bc78cfe150e6b3cdce4e4871adc9aa5684..a141968f764fc0264fe1f41d5895a7d9951e6e1b 100644 --- a/pulsar/client/staging/__init__.py +++ b/pulsar/client/staging/__init__.py @@ -5,9 +5,12 @@ from os import sep from os.path import ( basename, dirname, + exists, join, ) +from galaxy.util.bunch import Bunch + from ..util import PathHelper COMMAND_VERSION_FILENAME = "COMMAND_VERSION" @@ -60,8 +63,9 @@ class ClientJobDescription(object): self, command_line, tool=None, - config_files=[], - input_files=[], + config_files=None, + input_files=None, + client_inputs=None, client_outputs=None, working_directory=None, metadata_directory=None, @@ -75,8 +79,12 @@ class ClientJobDescription(object): ): self.tool = tool self.command_line = command_line - self.config_files = config_files - self.input_files = input_files + self.config_files = config_files or [] + if input_files is not None: + # Deprecated input but provided for backward compatibility. + assert client_inputs is None + client_inputs = ClientInputs.for_simple_input_paths(input_files) + self.client_inputs = client_inputs or ClientInputs([]) self.client_outputs = client_outputs or ClientOutputs() self.working_directory = working_directory self.metadata_directory = metadata_directory @@ -88,6 +96,15 @@ class ClientJobDescription(object): self.container = container self.remote_pulsar_app_config = remote_pulsar_app_config + @property + def input_files(self): + # Deprecated but provided for backward compatibility. 
+ input_files = [] + for client_input in self.client_inputs: + if client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_PATH: + input_files.append(client_input.path) + return input_files + @property def output_files(self): return self.client_outputs.output_files @@ -106,6 +123,47 @@ class ClientJobDescription(object): ) +class ClientInputs(object): + """Abstraction describing input datasets for a job.""" + + def __init__(self, client_inputs): + self.client_inputs = client_inputs + + def __iter__(self): + return iter(self.client_inputs) + + @staticmethod + def for_simple_input_paths(input_files): + # Legacy: just assume extra files path based on inputs, probably not + # the best behavior - ignores object store for instance. + client_inputs = [] + for input_file in input_files: + client_inputs.append(ClientInput(input_file, CLIENT_INPUT_PATH_TYPES.INPUT_PATH)) + files_path = "%s_files" % input_file[0:-len(".dat")] + if exists(files_path): + client_inputs.append(ClientInput(files_path, CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH)) + + return ClientInputs(client_inputs) + + +CLIENT_INPUT_PATH_TYPES = Bunch( + INPUT_PATH="input_path", + INPUT_EXTRA_FILES_PATH="input_extra_files_path", + INPUT_METADATA_PATH="input_metadata_path", +) + + +class ClientInput(object): + + def __init__(self, path, input_type): + self.path = path + self.input_type = input_type + + @property + def action_source(self): + return {"path": self.path} + + class ClientOutputs(object): """ Abstraction describing the output datasets EXPECTED by the Galaxy job runner client. diff --git a/pulsar/client/staging/down.py b/pulsar/client/staging/down.py index d6aad82bdf66b8e9802fd3ca25aa31d1ab057867..8bd9400109aad92b95a4b4828587f4b39973aaa6 100644 --- a/pulsar/client/staging/down.py +++ b/pulsar/client/staging/down.py @@ -135,7 +135,7 @@ class ResultsCollector(object): # path. 
collected = False with self.exception_tracker(): - action = self.action_mapper.action(path, output_type) + action = self.action_mapper.action({"path": path}, output_type) if self._collect_output(output_type, action, name): collected = True diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index 0143ce5e34d0da72f2ae7b1d667d8bf92bdc01fb..10e7b6847650d95a35382606ea4f5be5488b59fb 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -1,12 +1,10 @@ from io import open from logging import getLogger -from os import listdir, sep +from os import sep from os.path import ( abspath, basename, - dirname, exists, - isfile, join, relpath, ) @@ -16,7 +14,7 @@ from ..action_mapper import FileActionMapper from ..action_mapper import MessageAction from ..action_mapper import path_type from ..job_directory import RemoteJobDirectory -from ..staging import COMMAND_VERSION_FILENAME +from ..staging import CLIENT_INPUT_PATH_TYPES, COMMAND_VERSION_FILENAME from ..util import directory_files from ..util import PathHelper @@ -75,7 +73,7 @@ class FileStager(object): self.client = client self.command_line = client_job_description.command_line self.config_files = client_job_description.config_files - self.input_files = client_job_description.input_files + self.client_inputs = client_job_description.client_inputs self.output_files = client_job_description.output_files if client_job_description.tool is not None: self.tool_id = client_job_description.tool.id @@ -197,69 +195,65 @@ class FileStager(object): if path not in referenced_arbitrary_path_mappers: referenced_arbitrary_path_mappers[path] = mapper for path, mapper in referenced_arbitrary_path_mappers.items(): - action = self.action_mapper.action(path, path_type.UNSTRUCTURED, mapper) + action = self.action_mapper.action({"path": path}, path_type.UNSTRUCTURED, mapper) unstructured_map = action.unstructured_map(self.path_helper) self.arbitrary_files.update(unstructured_map) def __upload_tool_files(self): for referenced_tool_file in self.referenced_tool_files: - self.transfer_tracker.handle_transfer(referenced_tool_file, path_type.TOOL) + self.transfer_tracker.handle_transfer_path(referenced_tool_file, path_type.TOOL) def __upload_arbitrary_files(self): for path, name in self.arbitrary_files.items(): - self.transfer_tracker.handle_transfer(path, path_type.UNSTRUCTURED, name=name) + self.transfer_tracker.handle_transfer_path(path, path_type.UNSTRUCTURED, name=name) def __upload_input_files(self): handled_inputs = set() - for input_file in self.input_files: - if input_file in handled_inputs: + for client_input in self.client_inputs: + # TODO: use object identity to handle this. 
+ path = client_input.path + if path in handled_inputs: continue - self.__upload_input_file(input_file) - self.__upload_input_extra_files(input_file) - handled_inputs.add(input_file) - - def __upload_input_file(self, input_file): - if self.__stage_input(input_file): - if exists(input_file): - self.transfer_tracker.handle_transfer(input_file, path_type.INPUT) + + if client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_PATH: + self.__upload_input_file(client_input.action_source) + handled_inputs.add(path) + elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH: + self.__upload_input_extra_files(client_input.action_source) + handled_inputs.add(path) + elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH: + self.__upload_input_metadata_file(client_input.action_source) + handled_inputs.add(path) else: - message = "Pulsar: __upload_input_file called on empty or missing dataset." + \ - " So such file: [%s]" % input_file - log.debug(message) - - def __upload_input_extra_files(self, input_file): - files_path = "%s_files" % input_file[0:-len(".dat")] - if exists(files_path) and self.__stage_input(files_path): - for extra_file_name in directory_files(files_path): - extra_file_path = join(files_path, extra_file_name) - remote_name = self.path_helper.remote_name(relpath(extra_file_path, dirname(files_path))) - self.transfer_tracker.handle_transfer(extra_file_path, path_type.INPUT, name=remote_name) + raise NotImplementedError() + + def __upload_input_file(self, input_action_source): + if self.__stage_input(input_action_source): + self.transfer_tracker.handle_transfer_source(input_action_source, path_type.INPUT) + + def __upload_input_extra_files(self, input_action_source): + if self.__stage_input(input_action_source): + # TODO: needs to happen elsewhere if using remote object store staging + # but we don't have the action type yet. + self.transfer_tracker.handle_transfer_directory(path_type.INPUT, action_source=input_action_source) + + def __upload_input_metadata_file(self, input_action_source): + if self.__stage_input(input_action_source): + # Name must match what is generated in remote_input_path_rewrite in path_mapper. + remote_name = "metadata_%s" % basename(input_action_source['path']) + self.transfer_tracker.handle_transfer_source(input_action_source, path_type.INPUT, name=remote_name) def __upload_working_directory_files(self): # Task manager stages files into working directory, these need to be # uploaded if present.
- working_directory_files = self.__working_directory_files() - for working_directory_file in working_directory_files: - path = join(self.working_directory, working_directory_file) - self.transfer_tracker.handle_transfer(path, path_type.WORKDIR) + directory = self.working_directory + if directory and exists(directory): + self.transfer_tracker.handle_transfer_directory(path_type.WORKDIR, directory=directory) def __upload_metadata_directory_files(self): - metadata_directory_files = self.__metadata_directory_files() - for metadata_directory_file in metadata_directory_files: - path = join(self.metadata_directory, metadata_directory_file) - self.transfer_tracker.handle_transfer(path, path_type.METADATA) - - def __working_directory_files(self): - return self.__list_files(self.working_directory) - - def __metadata_directory_files(self): - return self.__list_files(self.metadata_directory) - - def __list_files(self, directory): + directory = self.metadata_directory if directory and exists(directory): - return [f for f in listdir(directory) if isfile(join(directory, f))] - else: - return [] + self.transfer_tracker.handle_transfer_directory(path_type.METADATA, directory=directory) def __initialize_version_file_rename(self): version_file = self.version_file @@ -293,7 +287,7 @@ class FileStager(object): def __upload_rewritten_config_files(self): for config_file, new_config_contents in self.job_inputs.config_files.items(): - self.transfer_tracker.handle_transfer(config_file, type=path_type.CONFIG, contents=new_config_contents) + self.transfer_tracker.handle_transfer_path(config_file, type=path_type.CONFIG, contents=new_config_contents) def get_command_line(self): """ @@ -302,10 +296,13 @@ class FileStager(object): return self.job_inputs.command_line - def __stage_input(self, file_path): + def __stage_input(self, source): + if not self.rewrite_paths: + return True + # If we have disabled path rewriting, just assume everything needs to be transferred, # else check to ensure the file is referenced before transferring it. - return (not self.rewrite_paths) or self.job_inputs.path_referenced(file_path) + return self.job_inputs.path_referenced(source['path']) class JobInputs(object): @@ -422,20 +419,58 @@ class TransferTracker(object): self.file_renames = {} self.remote_staging_actions = [] - def handle_transfer(self, path, type, name=None, contents=None): - action = self.__action_for_transfer(path, type, contents) + def handle_transfer_path(self, path, type, name=None, contents=None): + source = {"path": path} + return self.handle_transfer_source(source, type, name=name, contents=contents) + + def handle_transfer_directory(self, type, directory=None, action_source=None): + # TODO: needs to happen elsewhere if using remote object store staging + # but we don't have the action type yet. + if directory is None: + assert action_source is not None + action = self.__action_for_transfer(action_source, type, None) + if not action.staging_action_local and action.whole_directory_transfer_supported: + # If we're going to transfer the whole directory remotely, don't walk the files + # here. + + # We could still rewrite paths and just not transfer the files.
+ assert not self.rewrite_paths + self.__add_remote_staging_input(action, None, type) + return + + directory = action_source['path'] + else: + assert action_source is None + + for directory_file_name in directory_files(directory): + directory_file_path = join(directory, directory_file_name) + remote_name = self.path_helper.remote_name(relpath(directory_file_path, directory)) + self.handle_transfer_path(directory_file_path, type, name=remote_name) + + def handle_transfer_source(self, source, type, name=None, contents=None): + action = self.__action_for_transfer(source, type, contents) if action.staging_needed: local_action = action.staging_action_local if local_action: + path = source['path'] + if not exists(path): + message = "Pulsar: __upload_input_file called on empty or missing dataset." + \ + " No such file: [%s]" % path + log.debug(message) + return + response = self.client.put_file(path, type, name=name, contents=contents, action_type=action.action_type) def get_path(): return response['path'] else: + path = source['path'] job_directory = self.job_directory assert job_directory, "job directory required for action %s" % action if not name: + # TODO: consider fetching this from source so an actual input path + # isn't needed. At least it isn't used though. name = basename(path) self.__add_remote_staging_input(action, name, type) @@ -443,11 +478,11 @@ return job_directory.calculate_path(name, type) register = self.rewrite_paths or type == 'tool' # Even if inputs not rewritten, tool must be. if register: - self.register_rewrite(path, get_path(), type, force=True) + self.register_rewrite_action(action, get_path(), force=True) elif self.rewrite_paths: path_rewrite = action.path_rewrite(self.path_helper) if path_rewrite: - self.register_rewrite(path, path_rewrite, type, force=True) + self.register_rewrite_action(action, path_rewrite, force=True) # else: # No action for this file @@ -459,23 +494,29 @@ ) self.remote_staging_actions.append(input_dict) - def __action_for_transfer(self, path, type, contents): + def __action_for_transfer(self, source, type, contents): if contents: # If contents loaded in memory, no need to write out file and copy, # just transfer.
action = MessageAction(contents=contents, client=self.client) else: - if not exists(path): - message = "handle_transfer called on non-existent file - [%s]" % path + path = source.get("path") + if path is not None and not exists(path): + message = "__action_for_transfer called on non-existent file - [%s]" % path log.warn(message) raise Exception(message) - action = self.__action(path, type) + action = self.__action(source, type) return action def register_rewrite(self, local_path, remote_path, type, force=False): - action = self.__action(local_path, type) + action = self.__action({"path": local_path}, type) + self.register_rewrite_action(action, remote_path, force=force) + + def register_rewrite_action(self, action, remote_path, force=False): if action.staging_needed or force: - self.file_renames[local_path] = remote_path + path = getattr(action, 'path', None) + if path: + self.file_renames[path] = remote_path def rewrite_input_paths(self): """ @@ -485,8 +526,8 @@ class TransferTracker(object): for local_path, remote_path in self.file_renames.items(): self.job_inputs.rewrite_paths(local_path, remote_path) - def __action(self, path, type): - return self.action_mapper.action(path, type) + def __action(self, source, type): + return self.action_mapper.action(source, type) def _read(path): diff --git a/pulsar/client/test/check.py b/pulsar/client/test/check.py index dbc4550ee049c2b0092605ac022ab94a89bd93a3..b47f4a1adb6c52ed64bdf9b26edfd31600e44f73 100644 --- a/pulsar/client/test/check.py +++ b/pulsar/client/test/check.py @@ -17,14 +17,23 @@ import traceback from collections import namedtuple from io import open -from galaxy.tools.deps.dependencies import DependenciesDescription -from galaxy.tools.deps.requirements import ToolRequirement +try: + # If galaxy-lib or Galaxy 19.05 present. + from galaxy.tools.deps.dependencies import DependenciesDescription + from galaxy.tools.deps.requirements import ToolRequirement +except ImportError: + # If galaxy-tool-util or Galaxy 19.09 present. 
+ from galaxy.tool_util.deps.dependencies import DependenciesDescription + from galaxy.tool_util.deps.requirements import ToolRequirement from six import binary_type from pulsar.client import ( build_client_manager, ClientJobDescription, + ClientInputs, + ClientInput, ClientOutputs, + CLIENT_INPUT_PATH_TYPES, finish_job, PulsarOutputs, submit_job, @@ -73,6 +82,7 @@ try: assert_path_contents(sys.argv[2], "Hello world input!!@!") assert_path_contents(sys.argv[8], "INPUT_EXTRA_CONTENTS") assert_path_contents(sys.argv[13], "meta input") + assert_path_contents(sys.argv[14], "INPUT METADATA CONTENTS...") contents = config_input.read(1024) output.write(contents) open("workdir_output", "w").write("WORK DIR OUTPUT") @@ -149,6 +159,7 @@ def run(options): temp_input_path = os.path.join(temp_directory, "dataset_0.dat") temp_input_extra_path = os.path.join(temp_directory, "dataset_0_files", "input_subdir", "extra") + temp_input_metadata_path = os.path.join(temp_directory, "metadata", "12312231231231.dat") temp_index_path = os.path.join(temp_index_dir, "human.fa") temp_config_path = os.path.join(temp_work_dir, "config.txt") @@ -167,6 +178,7 @@ def run(options): __write_to_file(temp_input_path, b"Hello world input!!@!") __write_to_file(temp_input_extra_path, b"INPUT_EXTRA_CONTENTS") + __write_to_file(temp_input_metadata_path, b"INPUT METADATA CONTENTS...") __write_to_file(temp_config_path, EXPECTED_OUTPUT) __write_to_file(temp_metadata_path, "meta input") __write_to_file(temp_tool_path, TEST_SCRIPT) @@ -194,11 +206,20 @@ def run(options): temp_output4_path, temp_shared_dir, temp_metadata_path, + temp_input_metadata_path, ) assert os.path.exists(temp_index_path) - command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params + command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params config_files = [temp_config_path] - input_files = [temp_input_path, temp_input_path, empty_input] + client_inputs = [] + client_inputs.append(ClientInput(temp_input_path, CLIENT_INPUT_PATH_TYPES.INPUT_PATH)) + client_inputs.append(ClientInput(temp_input_path, CLIENT_INPUT_PATH_TYPES.INPUT_PATH)) + # Reverting empty input handling added in: + # https://github.com/galaxyproject/pulsar/commit/2fb36ba979cf047a595c53cdef833cae79cbb380 + # Seems like it really should cause a failure. 
+ # client_inputs.append(ClientInput(empty_input, CLIENT_INPUT_PATH_TYPES.INPUT_PATH)) + client_inputs.append(ClientInput(os.path.join(temp_directory, "dataset_0_files"), CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH)) + client_inputs.append(ClientInput(temp_input_metadata_path, CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH)) output_files = [ temp_output_path, temp_output2_path, @@ -224,7 +245,7 @@ def run(options): command_line=command_line, tool=MockTool(temp_tool_dir), config_files=config_files, - input_files=input_files, + client_inputs=ClientInputs(client_inputs), client_outputs=client_outputs, working_directory=temp_work_dir, metadata_directory=temp_metadata_dir, diff --git a/pulsar/core.py b/pulsar/core.py index db9818bf106353e9d778d8bd20382c86dac182ce..ee8f4f7d654572ee63170d11b276b09ba34bc149 100644 --- a/pulsar/core.py +++ b/pulsar/core.py @@ -9,8 +9,18 @@ from pulsar.tools import ToolBox from pulsar.tools.authorization import get_authorizer from pulsar import messaging from galaxy.objectstore import build_object_store_from_config -from galaxy.tools.deps import DependencyManager -from galaxy.jobs.metrics import JobMetrics +try: + # If galaxy-lib or Galaxy <19.05 present. + from galaxy.tools.deps import DependencyManager +except ImportError: + # If galaxy-tool-util or Galaxy >=19.09 present. + from galaxy.tool_util.deps import DependencyManager +try: + # If galaxy-lib or Galaxy <19.05 present. + from galaxy.jobs.metrics import JobMetrics +except ImportError: + # If galaxy-job-metrics or Galaxy >=19.09 present. + from galaxy.job_metrics import JobMetrics from galaxy.util.bunch import Bunch from logging import getLogger diff --git a/pulsar/managers/stateful.py b/pulsar/managers/stateful.py index dcee9c9c353ff20c47cb30c7c523e03f51d4a20d..bff96998c704cb1708c9e014527d11fa9b745ed8 100644 --- a/pulsar/managers/stateful.py +++ b/pulsar/managers/stateful.py @@ -6,7 +6,12 @@ import os import time import threading -from galaxy.tools.deps import dependencies +try: + # If galaxy-lib or Galaxy 19.05 present. + from galaxy.tools.deps.dependencies import DependenciesDescription +except ImportError: + # If galaxy-tool-util or Galaxy 19.09 present. 
+ from galaxy.tool_util.deps.dependencies import DependenciesDescription from pulsar.client.util import filter_destination_params from pulsar.managers import ManagerProxy @@ -109,7 +114,7 @@ class StatefulManagerProxy(ManagerProxy): yield launch_kwds = {} if launch_config.get("dependencies_description"): - dependencies_description = dependencies.DependenciesDescription.from_dict(launch_config["dependencies_description"]) + dependencies_description = DependenciesDescription.from_dict(launch_config["dependencies_description"]) launch_kwds["dependencies_description"] = dependencies_description for kwd in ["submit_params", "setup_params", "env"]: if kwd in launch_config: diff --git a/test/action_mapper_test.py b/test/action_mapper_test.py index 696f87f79c0322c6b33a71e5c097ea13bed6d440..2131cb659a10f3ac64e145d6ead5e57ef4690563 100644 --- a/test/action_mapper_test.py +++ b/test/action_mapper_test.py @@ -9,7 +9,7 @@ def test_endpoint_validation(): mapper = FileActionMapper(client) exception_found = False try: - mapper.action('/opt/galaxy/tools/filters/catWrapper.py', 'input') + mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input') except Exception as e: exception_found = True assert "files_endpoint" in str(e) @@ -21,7 +21,7 @@ def test_ssh_key_validation(): mapper = FileActionMapper(client) exception_found = False try: - mapper.action('/opt/galaxy/tools/filters/catWrapper.py', 'input') + mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input') except Exception as e: exception_found = True assert "ssh_key" in str(e) @@ -31,7 +31,7 @@ def test_ssh_key_validation(): def test_ssh_key_defaults(): client = _client("remote_rsync_transfer") mapper = FileActionMapper(client) - action = mapper.action('/opt/galaxy/tools/filters/catWrapper.py', 'input') + action = mapper.action({'path': '/opt/galaxy/tools/filters/catWrapper.py'}, 'input') action.to_dict() diff --git a/test/client_staging_test.py b/test/client_staging_test.py index d6ae5155bbac286d34907f3a030916a8ef09cbed..cc570557e9ea7c36f40fbe4510d4005609994919 100644 --- a/test/client_staging_test.py +++ b/test/client_staging_test.py @@ -45,6 +45,7 @@ class TestStager(TempDirectoryTestCase): os.makedirs(files_directory) self.input1 = os.path.join(files_directory, "dataset_1.dat") self.input1_files_path = os.path.join(files_directory, "dataset_1_files") + os.makedirs(self.input1_files_path) open(self.input1, "wb").write(b"012345") self.input2 = os.path.join(files_directory, "dataset_2.dat") open(self.input2, "wb").write(b"6789") diff --git a/test/path_mapper_test.py b/test/path_mapper_test.py index 58e5fb86411f9025397b28da103ff1e83196e416..e668f585cc42947009431c08621030a240f53846 100644 --- a/test/path_mapper_test.py +++ b/test/path_mapper_test.py @@ -75,7 +75,7 @@ class TestActionMapper(object): if not staging_needed: self._action.path_rewrite = lambda path: None - def action(self, path, type): - assert self.expected_path == path + def action(self, source, type): + assert self.expected_path == source["path"] assert self.expected_type == type return self._action
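Taken together, the staging changes above replace the flat input_files list with an explicit description of each input. A short usage sketch (paths are hypothetical; it mirrors the updated pulsar/client/test/check.py) showing how a caller would build a job description with the new client_inputs API:

from pulsar.client import (
    CLIENT_INPUT_PATH_TYPES,
    ClientInput,
    ClientInputs,
    ClientJobDescription,
)

# Hypothetical Galaxy-side paths; a real caller derives these from the job wrapper.
client_inputs = ClientInputs([
    ClientInput("/galaxy/files/dataset_1.dat", CLIENT_INPUT_PATH_TYPES.INPUT_PATH),
    ClientInput("/galaxy/files/dataset_1_files", CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH),
    ClientInput("/galaxy/metadata/metadata_input.dat", CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH),
])

job_description = ClientJobDescription(
    command_line="cat dataset_1.dat",
    client_inputs=client_inputs,  # replaces the deprecated input_files=[...]
    working_directory="/galaxy/jobs/42/working",
)

Legacy callers that still pass input_files=[...] keep working: the ClientJobDescription constructor wraps them with ClientInputs.for_simple_input_paths(), which also picks up any existing dataset_*_files directory next to each input.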