diff --git a/pulsar/client/__init__.py b/pulsar/client/__init__.py index 951cb3c7c3a2a0ca82e13ee74cd941c3e0151895..8ecdb1c128b38b0c8bdccf612370fd2f14cfabce 100644 --- a/pulsar/client/__init__.py +++ b/pulsar/client/__init__.py @@ -44,9 +44,14 @@ from .destination import url_to_destination_params from .exceptions import PulsarClientTransportError from .manager import build_client_manager from .path_mapper import PathMapper -from .staging import ClientJobDescription -from .staging import PulsarOutputs -from .staging import ClientOutputs +from .staging import ( + ClientJobDescription, + ClientInputs, + ClientInput, + ClientOutputs, + CLIENT_INPUT_PATH_TYPES, + PulsarOutputs, +) from .staging.down import finish_job from .staging.up import submit_job @@ -58,6 +63,10 @@ __all__ = [ 'submit_job', 'ClientJobDescription', 'PulsarOutputs', + 'ClientInput', + 'ClientInputs', + 'ClientOutputs', + 'CLIENT_INPUT_PATH_TYPES', 'ClientOutputs', 'PathMapper', 'PulsarClientTransportError', diff --git a/pulsar/client/path_mapper.py b/pulsar/client/path_mapper.py index 3383ed3985204ce1ca13df750526c103a081dff5..dab809e56b3b59cdb21de65ae544379cfd0dfaf8 100644 --- a/pulsar/client/path_mapper.py +++ b/pulsar/client/path_mapper.py @@ -4,6 +4,7 @@ from galaxy.util import in_directory from .action_mapper import FileActionMapper from .action_mapper import path_type +from .staging import CLIENT_INPUT_PATH_TYPES from .util import PathHelper @@ -46,8 +47,11 @@ class PathMapper(object): remote_path = self.__remote_path_rewrite(local_path, output_type) return remote_path - def remote_input_path_rewrite(self, local_path): - remote_path = self.__remote_path_rewrite(local_path, path_type.INPUT) + def remote_input_path_rewrite(self, local_path, client_input_path_type=None): + name = None + if client_input_path_type == CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH: + name = "metadata_%s" % os.path.basename(local_path) + remote_path = self.__remote_path_rewrite(local_path, path_type.INPUT, name=name) return remote_path def remote_version_path_rewrite(self, local_path): diff --git a/pulsar/client/staging/__init__.py b/pulsar/client/staging/__init__.py index 17e257ea4ba48c737c32b5a3488b62adac15483b..453d276a9723f350d6260a2a1d532489074b426c 100644 --- a/pulsar/client/staging/__init__.py +++ b/pulsar/client/staging/__init__.py @@ -84,7 +84,7 @@ class ClientJobDescription(object): # Deprecated input but provided for backward compatibility. assert client_inputs is None client_inputs = ClientInputs.for_simple_input_paths(input_files) - self.client_inputs = client_inputs + self.client_inputs = client_inputs or ClientInputs([]) self.client_outputs = client_outputs or ClientOutputs() self.working_directory = working_directory self.metadata_directory = metadata_directory @@ -141,7 +141,7 @@ class ClientInputs(object): client_inputs.append(ClientInput(input_file, CLIENT_INPUT_PATH_TYPES.INPUT_PATH)) files_path = "%s_files" % input_file[0:-len(".dat")] if exists(files_path): - client_inputs.append(ClientInput(input_file, CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH)) + client_inputs.append(ClientInput(files_path, CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH)) return ClientInputs(client_inputs) diff --git a/pulsar/client/staging/up.py b/pulsar/client/staging/up.py index a6d8f8f28cf22e426127481e1f2dbf4164235ccb..f88d29b49ccdf6b488bcb08d2ade58077a9a1a1e 100644 --- a/pulsar/client/staging/up.py +++ b/pulsar/client/staging/up.py @@ -215,14 +215,17 @@ class FileStager(object): path = client_input.path if path in handled_inputs: continue + if client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_PATH: self.__upload_input_file(path) handled_inputs.add(path) elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH: self.__upload_input_extra_files(path) handled_inputs.add(path) + elif client_input.input_type == CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH: + self.__upload_input_metadata_file(path) + handled_inputs.add(path) else: - # TODO: implement metadata... raise NotImplementedError() def __upload_input_file(self, input_file): @@ -241,6 +244,12 @@ class FileStager(object): remote_name = self.path_helper.remote_name(relpath(extra_file_path, dirname(files_path))) self.transfer_tracker.handle_transfer(extra_file_path, path_type.INPUT, name=remote_name) + def __upload_input_metadata_file(self, path): + if self.__stage_input(path): + # Name must match what is generated in remote_input_path_rewrite in path_mapper. + remote_name = "metadata_%s" % basename(path) + self.transfer_tracker.handle_transfer(path, path_type.INPUT, name=remote_name) + def __upload_working_directory_files(self): # Task manager stages files into working directory, these need to be # uploaded if present. diff --git a/pulsar/client/test/check.py b/pulsar/client/test/check.py index dbc4550ee049c2b0092605ac022ab94a89bd93a3..5935c9c4cdd58cdbe95d9d9761a321251cf2cdba 100644 --- a/pulsar/client/test/check.py +++ b/pulsar/client/test/check.py @@ -24,7 +24,10 @@ from six import binary_type from pulsar.client import ( build_client_manager, ClientJobDescription, + ClientInputs, + ClientInput, ClientOutputs, + CLIENT_INPUT_PATH_TYPES, finish_job, PulsarOutputs, submit_job, @@ -73,6 +76,7 @@ try: assert_path_contents(sys.argv[2], "Hello world input!!@!") assert_path_contents(sys.argv[8], "INPUT_EXTRA_CONTENTS") assert_path_contents(sys.argv[13], "meta input") + assert_path_contents(sys.argv[14], "INPUT METADATA CONTENTS...") contents = config_input.read(1024) output.write(contents) open("workdir_output", "w").write("WORK DIR OUTPUT") @@ -149,6 +153,7 @@ def run(options): temp_input_path = os.path.join(temp_directory, "dataset_0.dat") temp_input_extra_path = os.path.join(temp_directory, "dataset_0_files", "input_subdir", "extra") + temp_input_metadata_path = os.path.join(temp_directory, "metadata", "12312231231231.dat") temp_index_path = os.path.join(temp_index_dir, "human.fa") temp_config_path = os.path.join(temp_work_dir, "config.txt") @@ -167,6 +172,7 @@ def run(options): __write_to_file(temp_input_path, b"Hello world input!!@!") __write_to_file(temp_input_extra_path, b"INPUT_EXTRA_CONTENTS") + __write_to_file(temp_input_metadata_path, b"INPUT METADATA CONTENTS...") __write_to_file(temp_config_path, EXPECTED_OUTPUT) __write_to_file(temp_metadata_path, "meta input") __write_to_file(temp_tool_path, TEST_SCRIPT) @@ -194,11 +200,17 @@ def run(options): temp_output4_path, temp_shared_dir, temp_metadata_path, + temp_input_metadata_path, ) assert os.path.exists(temp_index_path) - command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params + command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params config_files = [temp_config_path] - input_files = [temp_input_path, temp_input_path, empty_input] + client_inputs = [] + client_inputs.append(ClientInput(temp_input_path, CLIENT_INPUT_PATH_TYPES.INPUT_PATH)) + client_inputs.append(ClientInput(temp_input_path, CLIENT_INPUT_PATH_TYPES.INPUT_PATH)) + client_inputs.append(ClientInput(empty_input, CLIENT_INPUT_PATH_TYPES.INPUT_PATH)) + client_inputs.append(ClientInput(os.path.join(temp_directory, "dataset_0_files"), CLIENT_INPUT_PATH_TYPES.INPUT_EXTRA_FILES_PATH)) + client_inputs.append(ClientInput(temp_input_metadata_path, CLIENT_INPUT_PATH_TYPES.INPUT_METADATA_PATH)) output_files = [ temp_output_path, temp_output2_path, @@ -224,7 +236,7 @@ def run(options): command_line=command_line, tool=MockTool(temp_tool_dir), config_files=config_files, - input_files=input_files, + client_inputs=ClientInputs(client_inputs), client_outputs=client_outputs, working_directory=temp_work_dir, metadata_directory=temp_metadata_dir, diff --git a/test/client_staging_test.py b/test/client_staging_test.py index d6ae5155bbac286d34907f3a030916a8ef09cbed..cc570557e9ea7c36f40fbe4510d4005609994919 100644 --- a/test/client_staging_test.py +++ b/test/client_staging_test.py @@ -45,6 +45,7 @@ class TestStager(TempDirectoryTestCase): os.makedirs(files_directory) self.input1 = os.path.join(files_directory, "dataset_1.dat") self.input1_files_path = os.path.join(files_directory, "dataset_1_files") + os.makedirs(self.input1_files_path) open(self.input1, "wb").write(b"012345") self.input2 = os.path.join(files_directory, "dataset_2.dat") open(self.input2, "wb").write(b"6789")