Skip to content
Snippets Groups Projects
Commit 39de377e authored by John Chilton
Browse files

Implement staging metadata inputs in the client.

parent f477bc4a
No related branches found
No related tags found
No related merge requests found
......@@ -44,9 +44,14 @@ from .destination import url_to_destination_params
from .exceptions import PulsarClientTransportError
from .manager import build_client_manager
from .path_mapper import PathMapper
from .staging import ClientJobDescription
from .staging import PulsarOutputs
from .staging import ClientOutputs
from .staging import (
ClientJobDescription,
ClientInputs,
ClientInput,
ClientOutputs,
CLIENT_INPUT_PATH_TYPES,
PulsarOutputs,
)
from .staging.down import finish_job
from .staging.up import submit_job
......@@ -58,6 +63,10 @@ __all__ = [
'submit_job',
'ClientJobDescription',
'PulsarOutputs',
'ClientInput',
'ClientInputs',
'ClientOutputs',
'CLIENT_INPUT_PATH_TYPES',
'ClientOutputs',
'PathMapper',
'PulsarClientTransportError',
......
......@@ -4,6 +4,7 @@ from galaxy.util import in_directory
from .action_mapper import FileActionMapper
from .action_mapper import path_type
from .staging import CLIENT_INPUT_PATH_TYPES
from .util import PathHelper
......@@ -46,8 +47,11 @@ class PathMapper(object):
remote_path = self.__remote_path_rewrite(local_path, output_type)
return remote_path
def remote_input_path_rewrite(self, local_path):
remote_path = self.__remote_path_rewrite(local_path, path_type.INPUT)
def remote_input_path_rewrite(self, local_path, client_input_path_type=None):
name = None
if client_input_path_type == CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH:
name = "metadata_%s" % os.path.basename(local_path)
remote_path = self.__remote_path_rewrite(local_path, path_type.INPUT, name=name)
return remote_path
def remote_version_path_rewrite(self, local_path):
......
......@@ -84,7 +84,7 @@ class ClientJobDescription(object):
# Deprecated input but provided for backward compatibility.
assert client_inputs is None
client_inputs = ClientInputs.for_simple_input_paths(input_files)
self.client_inputs = client_inputs
self.client_inputs = client_inputs or ClientInputs([])
self.client_outputs = client_outputs or ClientOutputs()
self.working_directory = working_directory
self.metadata_directory = metadata_directory
......@@ -141,7 +141,7 @@ class ClientInputs(object):
client_inputs.append(ClientInput(input_file, CLIENT_INPUT_PATH_TYPES.INPUT_PATH))
files_path = "%s_files" % input_file[0:-len(".dat")]
if exists(files_path):
client_inputs.append(ClientInput(input_file, CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH))
client_inputs.append(ClientInput(files_path, CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH))
return ClientInputs(client_inputs)
......
......@@ -215,14 +215,17 @@ class FileStager(object):
path = client_input.path
if path in handled_inputs:
continue
if client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_PATH:
self.__upload_input_file(path)
handled_inputs.add(path)
elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH:
self.__upload_input_extra_files(path)
handled_inputs.add(path)
elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH:
self.__upload_input_metadata_file(path)
handled_inputs.add(path)
else:
# TODO: implement metadata...
raise NotImplementedError()
def __upload_input_file(self, input_file):
......@@ -241,6 +244,12 @@ class FileStager(object):
remote_name = self.path_helper.remote_name(relpath(extra_file_path, dirname(files_path)))
self.transfer_tracker.handle_transfer(extra_file_path, path_type.INPUT, name=remote_name)
def __upload_input_metadata_file(self, path):
if self.__stage_input(path):
# Name must match what is generated in remote_input_path_rewrite in path_mapper.
remote_name = "metadata_%s" % basename(path)
self.transfer_tracker.handle_transfer(path, path_type.INPUT, name=remote_name)
def __upload_working_directory_files(self):
# Task manager stages files into working directory, these need to be
# uploaded if present.
......
......@@ -24,7 +24,10 @@ from six import binary_type
from pulsar.client import (
build_client_manager,
ClientJobDescription,
ClientInputs,
ClientInput,
ClientOutputs,
CLIENT_INPUT_PATH_TYPES,
finish_job,
PulsarOutputs,
submit_job,
......@@ -73,6 +76,7 @@ try:
assert_path_contents(sys.argv[2], "Hello world input!!@!")
assert_path_contents(sys.argv[8], "INPUT_EXTRA_CONTENTS")
assert_path_contents(sys.argv[13], "meta input")
assert_path_contents(sys.argv[14], "INPUT METADATA CONTENTS...")
contents = config_input.read(1024)
output.write(contents)
open("workdir_output", "w").write("WORK DIR OUTPUT")
......@@ -149,6 +153,7 @@ def run(options):
temp_input_path = os.path.join(temp_directory, "dataset_0.dat")
temp_input_extra_path = os.path.join(temp_directory, "dataset_0_files", "input_subdir", "extra")
temp_input_metadata_path = os.path.join(temp_directory, "metadata", "12312231231231.dat")
temp_index_path = os.path.join(temp_index_dir, "human.fa")
temp_config_path = os.path.join(temp_work_dir, "config.txt")
......@@ -167,6 +172,7 @@ def run(options):
__write_to_file(temp_input_path, b"Hello world input!!@!")
__write_to_file(temp_input_extra_path, b"INPUT_EXTRA_CONTENTS")
__write_to_file(temp_input_metadata_path, b"INPUT METADATA CONTENTS...")
__write_to_file(temp_config_path, EXPECTED_OUTPUT)
__write_to_file(temp_metadata_path, "meta input")
__write_to_file(temp_tool_path, TEST_SCRIPT)
......@@ -194,11 +200,17 @@ def run(options):
temp_output4_path,
temp_shared_dir,
temp_metadata_path,
temp_input_metadata_path,
)
assert os.path.exists(temp_index_path)
command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params
command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params
config_files = [temp_config_path]
input_files = [temp_input_path, temp_input_path, empty_input]
client_inputs = []
client_inputs.append(ClientInput(temp_input_path, CLIENT_INPUT_PATH_TYPES.INPUT_PATH))
client_inputs.append(ClientInput(temp_input_path, CLIENT_INPUT_PATH_TYPES.INPUT_PATH))
client_inputs.append(ClientInput(empty_input, CLIENT_INPUT_PATH_TYPES.INPUT_PATH))
client_inputs.append(ClientInput(os.path.join(temp_directory, "dataset_0_files"), CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH))
client_inputs.append(ClientInput(temp_input_metadata_path, CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH))
output_files = [
temp_output_path,
temp_output2_path,
......@@ -224,7 +236,7 @@ def run(options):
command_line=command_line,
tool=MockTool(temp_tool_dir),
config_files=config_files,
input_files=input_files,
client_inputs=ClientInputs(client_inputs),
client_outputs=client_outputs,
working_directory=temp_work_dir,
metadata_directory=temp_metadata_dir,
......
......@@ -45,6 +45,7 @@ class TestStager(TempDirectoryTestCase):
os.makedirs(files_directory)
self.input1 = os.path.join(files_directory, "dataset_1.dat")
self.input1_files_path = os.path.join(files_directory, "dataset_1_files")
os.makedirs(self.input1_files_path)
open(self.input1, "wb").write(b"012345")
self.input2 = os.path.join(files_directory, "dataset_2.dat")
open(self.input2, "wb").write(b"6789")
......
0% Loading. Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment