Skip to content
Snippets Groups Projects
Commit f477bc4a authored by John Chilton's avatar John Chilton
Browse files

Refactor toward more structured inputs.

parent f31a1525
No related branches found
No related tags found
No related merge requests found
......@@ -5,9 +5,12 @@ from os import sep
from os.path import (
basename,
dirname,
exists,
join,
)
from galaxy.util.bunch import Bunch
from ..util import PathHelper
COMMAND_VERSION_FILENAME = "COMMAND_VERSION"
......@@ -60,8 +63,9 @@ class ClientJobDescription(object):
self,
command_line,
tool=None,
config_files=[],
input_files=[],
config_files=None,
input_files=None,
client_inputs=None,
client_outputs=None,
working_directory=None,
metadata_directory=None,
......@@ -75,8 +79,12 @@ class ClientJobDescription(object):
):
self.tool = tool
self.command_line = command_line
self.config_files = config_files
self.input_files = input_files
self.config_files = config_files or []
if input_files is not None:
# Deprecated input but provided for backward compatibility.
assert client_inputs is None
client_inputs = ClientInputs.for_simple_input_paths(input_files)
self.client_inputs = client_inputs
self.client_outputs = client_outputs or ClientOutputs()
self.working_directory = working_directory
self.metadata_directory = metadata_directory
......@@ -88,6 +96,15 @@ class ClientJobDescription(object):
self.container = container
self.remote_pulsar_app_config = remote_pulsar_app_config
@property
def input_files(self):
    """Return the paths of plain input datasets as a list.

    Deprecated but provided for backward compatibility: callers that
    predate ``client_inputs`` still read ``input_files``. Only inputs whose
    type is ``CLIENT_INPUT_PATH_TYPES.INPUT_PATH`` are included; other
    input types are left out.
    """
    # ``client_inputs`` stays None when the description was created with
    # neither ``input_files`` nor ``client_inputs``; report "no inputs"
    # instead of failing while iterating over None.
    if self.client_inputs is None:
        return []
    return [
        client_input.path
        for client_input in self.client_inputs
        if client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_PATH
    ]
@property
def output_files(self):
return self.client_outputs.output_files
......@@ -106,6 +123,43 @@ class ClientJobDescription(object):
)
class ClientInputs(object):
    """Abstraction describing input datasets for a job.

    Wraps a collection of ``ClientInput`` objects; iterating over an
    instance yields the wrapped objects in order.
    """

    def __init__(self, client_inputs):
        """Store ``client_inputs``, an iterable of ``ClientInput`` objects."""
        self.client_inputs = client_inputs

    def __iter__(self):
        return iter(self.client_inputs)

    @staticmethod
    def for_simple_input_paths(input_files):
        """Build a ``ClientInputs`` from a plain list of dataset paths.

        Each path yields an ``INPUT_PATH`` entry. If a sibling
        ``<path minus 4-character extension>_files`` path exists on disk, an
        ``INPUT_EXTRA_FILES_PATH`` entry pointing at that path is added too.
        """
        # Legacy: just assume extra files path based on inputs, probably not
        # the best behavior - ignores object store for instance.
        client_inputs = []
        for input_file in input_files:
            client_inputs.append(ClientInput(input_file, CLIENT_INPUT_PATH_TYPES.INPUT_PATH))
            files_path = "%s_files" % input_file[0:-len(".dat")]
            if exists(files_path):
                # Record the extra-files path itself, not the dataset path:
                # consumers use ``ClientInput.path`` directly and would
                # otherwise see the dataset path twice.
                client_inputs.append(ClientInput(files_path, CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH))
        return ClientInputs(client_inputs)
# Kinds of path a ClientInput can describe. Each value is a plain string and
# is compared against ClientInput.input_type.
CLIENT_INPUT_PATH_TYPES = Bunch(
# A primary input dataset file.
INPUT_PATH="input_path",
# The "<dataset>_files" extra-files path that accompanies an input dataset.
INPUT_EXTRA_FILES_PATH="input_extra_files_path",
# NOTE(review): presumably a metadata file for an input; nothing visible
# here creates inputs of this type yet - confirm before relying on it.
INPUT_METADATA_PATH="input_metadata_path",
)
class ClientInput(object):
    """A single job input: a path plus the kind of input that path is."""

    def __init__(self, path, input_type):
        """Record ``path`` and its ``input_type`` on the instance."""
        self.path, self.input_type = path, input_type
class ClientOutputs(object):
""" Abstraction describing the output datasets EXPECTED by the Galaxy job
runner client.
......
......@@ -16,7 +16,7 @@ from ..action_mapper import FileActionMapper
from ..action_mapper import MessageAction
from ..action_mapper import path_type
from ..job_directory import RemoteJobDirectory
from ..staging import COMMAND_VERSION_FILENAME
from ..staging import CLIENT_INPUT_PATH_TYPES, COMMAND_VERSION_FILENAME
from ..util import directory_files
from ..util import PathHelper
......@@ -75,7 +75,7 @@ class FileStager(object):
self.client = client
self.command_line = client_job_description.command_line
self.config_files = client_job_description.config_files
self.input_files = client_job_description.input_files
self.client_inputs = client_job_description.client_inputs
self.output_files = client_job_description.output_files
if client_job_description.tool is not None:
self.tool_id = client_job_description.tool.id
......@@ -211,12 +211,19 @@ class FileStager(object):
def __upload_input_files(self):
handled_inputs = set()
for input_file in self.input_files:
if input_file in handled_inputs:
for client_input in self.client_inputs:
path = client_input.path
if path in handled_inputs:
continue
self.__upload_input_file(input_file)
self.__upload_input_extra_files(input_file)
handled_inputs.add(input_file)
if client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_PATH:
self.__upload_input_file(path)
handled_inputs.add(path)
elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH:
self.__upload_input_extra_files(path)
handled_inputs.add(path)
else:
# TODO: implement metadata...
raise NotImplementedError()
def __upload_input_file(self, input_file):
if self.__stage_input(input_file):
......@@ -224,12 +231,11 @@ class FileStager(object):
self.transfer_tracker.handle_transfer(input_file, path_type.INPUT)
else:
message = "Pulsar: __upload_input_file called on empty or missing dataset." + \
" So such file: [%s]" % input_file
" No such file: [%s]" % input_file
log.debug(message)
def __upload_input_extra_files(self, input_file):
files_path = "%s_files" % input_file[0:-len(".dat")]
if exists(files_path) and self.__stage_input(files_path):
def __upload_input_extra_files(self, files_path):
if self.__stage_input(files_path):
for extra_file_name in directory_files(files_path):
extra_file_path = join(files_path, extra_file_name)
remote_name = self.path_helper.remote_name(relpath(extra_file_path, dirname(files_path)))
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment