From 4903b4dec3b8934ef109074ecd489ac7e4cbabf7 Mon Sep 17 00:00:00 2001 From: John Chilton <jmchilton@gmail.com> Date: Fri, 16 May 2014 15:40:38 -0500 Subject: [PATCH] More work toward 'rewrite' action type - LWR integration test working. --- lwr/lwr_client/action_mapper.py | 38 ++++++++++++++++++++++----------- lwr/lwr_client/staging/up.py | 14 +++++++++--- test/check.py | 29 +++++++++++++++++++------ test/integration_test.py | 2 +- 4 files changed, 60 insertions(+), 23 deletions(-) diff --git a/lwr/lwr_client/action_mapper.py b/lwr/lwr_client/action_mapper.py index eec61591..96d2628e 100644 --- a/lwr/lwr_client/action_mapper.py +++ b/lwr/lwr_client/action_mapper.py @@ -136,9 +136,11 @@ class FileActionMapper(object): mapper = self.__find_mapper(path, type, mapper) action_class = self.__action_class(path, type, mapper) file_lister = DEFAULT_FILE_LISTER + action_kwds = {} if mapper: file_lister = mapper.file_lister - action = action_class(path, file_lister=file_lister) + action_kwds = mapper.action_kwds + action = action_class(path, file_lister=file_lister, **action_kwds) self.__process_action(action, type) return action @@ -217,12 +219,20 @@ class BaseAction(object): self.path = path self.file_lister = file_lister or DEFAULT_FILE_LISTER - def unstructured_map(self): + def unstructured_map(self, path_helper): unstructured_map = self.file_lister.unstructured_map(self.path) - # To ensure uniqueness, prepend unique prefix to each name - prefix = unique_path_prefix(self.path) - for path, name in unstructured_map.iteritems(): - unstructured_map[path] = join(prefix, name) + if self.staging_needed: + # To ensure uniqueness, prepend unique prefix to each name + prefix = unique_path_prefix(self.path) + for path, name in unstructured_map.iteritems(): + unstructured_map[path] = join(prefix, name) + else: + path_rewrites = {} + for path in unstructured_map: + rewrite = self.path_rewrite(path_helper, path) + if rewrite: + path_rewrites[path] = rewrite + unstructured_map = path_rewrites return unstructured_map @property @@ -249,7 +259,7 @@ class NoneAction(BaseAction): def from_dict(cls, action_dict): return NoneAction(path=action_dict["path"]) - def path_rewrite(self, path_helper): + def path_rewrite(self, path_helper, path=None): return None @@ -286,8 +296,10 @@ class RewriteAction(BaseAction): destination_directory=action_dict["destination_directory"], ) - def path_rewrite(self, path_helper): - new_path = path_helper.from_posix_with_new_base(self.path, self.from_posix_with_new_base, self.destination_directory) + def path_rewrite(self, path_helper, path=None): + if not path: + path = self.path + new_path = path_helper.from_posix_with_new_base(self.path, self.source_directory, self.destination_directory) return None if new_path == self.path else new_path @@ -416,9 +428,10 @@ class BasePathMapper(object): action_type = config.get('action', DEFAULT_MAPPED_ACTION) action_class = actions.get(action_type, None) action_kwds = action_class.action_spec.copy() - action_kwds.update(config) for key, value in action_kwds.items(): - if value is REQUIRED_ACTION_KWD: + if key in config: + action_kwds[key] = config[key] + elif value is REQUIRED_ACTION_KWD: message_template = "action_type %s requires key word argument %s" message = message_template % (action_type, key) raise Exception( message ) @@ -535,10 +548,11 @@ DEFAULT_FILE_LISTER = FileLister(dict(depth=0)) ACTION_CLASSES = [ NoneAction, + RewriteAction, TransferAction, CopyAction, RemoteCopyAction, - RemoteTransferAction + RemoteTransferAction, ] actions = dict([(clazz.action_type, clazz) for clazz in ACTION_CLASSES]) diff --git a/lwr/lwr_client/staging/up.py b/lwr/lwr_client/staging/up.py index 1bbe9487..fa9f70ec 100644 --- a/lwr/lwr_client/staging/up.py +++ b/lwr/lwr_client/staging/up.py @@ -78,9 +78,11 @@ class FileStager(object): self.job_inputs = JobInputs(self.command_line, self.config_files) self.action_mapper = FileActionMapper(client) - self.transfer_tracker = TransferTracker(client, self.action_mapper, self.job_inputs, rewrite_paths=self.rewrite_paths) self.__handle_setup(job_config) + + self.transfer_tracker = TransferTracker(client, self.path_helper, self.action_mapper, self.job_inputs, rewrite_paths=self.rewrite_paths) + self.__initialize_referenced_tool_files() if self.rewrite_paths: self.__initialize_referenced_arbitrary_files() @@ -142,7 +144,7 @@ class FileStager(object): referenced_arbitrary_path_mappers[path] = mapper for path, mapper in referenced_arbitrary_path_mappers.iteritems(): action = self.action_mapper.action(path, path_type.UNSTRUCTURED, mapper) - unstructured_map = action.unstructured_map() + unstructured_map = action.unstructured_map(self.path_helper) self.arbitrary_files.update(unstructured_map) def __upload_tool_files(self): @@ -330,8 +332,9 @@ class JobInputs(object): class TransferTracker(object): - def __init__(self, client, action_mapper, job_inputs, rewrite_paths): + def __init__(self, client, path_helper, action_mapper, job_inputs, rewrite_paths): self.client = client + self.path_helper = path_helper self.action_mapper = action_mapper self.job_inputs = job_inputs @@ -357,6 +360,11 @@ class TransferTracker(object): register = self.rewrite_paths or type == 'tool' # Even if inputs not rewritten, tool must be. if register: self.register_rewrite(path, get_path(), type, force=True) + elif self.rewrite_paths: + path_rewrite = action.path_rewrite(self.path_helper) + if path_rewrite: + self.register_rewrite(path, path_rewrite, type, force=True) + # else: # No action for this file def __add_remote_staging_input(self, action, name, type): diff --git a/test/check.py b/test/check.py index 6bc71a81..6a5abd15 100644 --- a/test/check.py +++ b/test/check.py @@ -47,6 +47,7 @@ try: output.write(contents) open("workdir_output", "w").write("WORK DIR OUTPUT") open("env_test", "w").write(getenv("TEST_ENV", "DEFAULT")) + open("rewrite_action_test", "w").write(sys.argv[12]) output2.write(output2_contents) with open("galaxy.json", "w") as f: f.write("GALAXY_JSON") output3.write(getenv("MOO", "moo_default")) @@ -83,10 +84,11 @@ def run(options): temp_directory = tempfile.mkdtemp() temp_index_dir = os.path.join(temp_directory, "idx", "bwa") temp_index_dir_sibbling = os.path.join(temp_directory, "idx", "seq") + temp_shared_dir = os.path.join(temp_directory, "shared", "test1") temp_work_dir = os.path.join(temp_directory, "w") temp_tool_dir = os.path.join(temp_directory, "t") - __makedirs([temp_tool_dir, temp_work_dir, temp_index_dir, temp_index_dir_sibbling]) + __makedirs([temp_tool_dir, temp_work_dir, temp_index_dir, temp_index_dir_sibbling, temp_shared_dir]) temp_input_path = os.path.join(temp_directory, "dataset_0.dat") temp_input_extra_path = os.path.join(temp_directory, "dataset_0_files", "input_subdir", "extra") @@ -102,6 +104,9 @@ def run(options): temp_output_workdir_destination = os.path.join(temp_directory, "dataset_77.dat") temp_output_workdir = os.path.join(temp_work_dir, "env_test") + temp_output_workdir_destination2 = os.path.join(temp_directory, "dataset_78.dat") + temp_output_workdir2 = os.path.join(temp_work_dir, "rewrite_action_test") + __write_to_file(temp_input_path, b"Hello world input!!@!") __write_to_file(temp_input_extra_path, b"INPUT_EXTRA_CONTENTS") __write_to_file(temp_config_path, EXPECTED_OUTPUT) @@ -128,17 +133,21 @@ def run(options): temp_version_output_path, temp_index_path, temp_output4_path, + temp_shared_dir, ) assert os.path.exists(temp_index_path) - command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params + command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params config_files = [temp_config_path] input_files = [temp_input_path, empty_input] - output_files = [temp_output_path, temp_output2_path, temp_output3_path, temp_output4_path, os.path.join(temp_directory, "dataset_77.dat")] + output_files = [temp_output_path, temp_output2_path, temp_output3_path, temp_output4_path, temp_output_workdir_destination, temp_output_workdir_destination2] client, client_manager = __client(temp_directory, options) waiter = Waiter(client, client_manager) client_outputs = ClientOutputs( working_directory=temp_work_dir, - work_dir_outputs=[(temp_output_workdir, temp_output_workdir_destination)], + work_dir_outputs=[ + (temp_output_workdir, temp_output_workdir_destination), + (temp_output_workdir2, temp_output_workdir_destination2), + ], output_files=output_files, version_file=temp_version_output_path, ) @@ -160,6 +169,8 @@ def run(options): __assert_contents(temp_output2_path, cmd_text, result_status) __assert_contents(os.path.join(temp_work_dir, "galaxy.json"), b"GALAXY_JSON", result_status) __assert_contents(os.path.join(temp_directory, "dataset_1_files", "extra"), b"EXTRA_OUTPUT_CONTENTS", result_status) + if getattr(options, "test_rewrite_action", False): + __assert_contents(temp_output_workdir_destination2, os.path.join(temp_directory, "shared2", "test1"), result_status) if job_description.env: __assert_contents(temp_output_workdir_destination, b"TEST_ENV_VALUE", result_status) __assert_contents(temp_version_output_path, b"1.0.1", result_status) @@ -251,12 +262,16 @@ def __exercise_errors(options, client, temp_output_path, temp_directory): def __client(temp_directory, options): default_file_action = getattr(options, "default_file_action", None) unstructured_action = default_file_action or "transfer" + path_defs = [ + dict(path=os.path.join(temp_directory, "idx"), path_types="unstructured", depth=2, action=unstructured_action), + ] + if getattr(options, "test_rewrite_action", False): + rewrite_def = dict(path=os.path.join(temp_directory, "shared"), path_types="unstructured", action="rewrite", source_directory=os.path.join(temp_directory, "shared"), destination_directory=os.path.join(temp_directory, "shared2")) + path_defs.append(rewrite_def) client_options = { "url": getattr(options, "url", None), "private_token": getattr(options, "private_token", None), - "file_action_config": write_json_config(temp_directory, dict(paths=[ - dict(path=os.path.join(temp_directory, "idx"), path_types="unstructured", depth=2, action=unstructured_action) - ])), + "file_action_config": write_json_config(temp_directory, dict(paths=path_defs)), } if default_file_action: client_options["default_file_action"] = default_file_action diff --git a/test/integration_test.py b/test/integration_test.py index 9b76c767..cff9790f 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -86,7 +86,7 @@ class BaseIntegrationTest(TempDirectoryTestCase): class IntegrationTests(BaseIntegrationTest): - default_kwargs = dict(direct_interface=False, test_requirement=True, test_unicode=True, test_env=True) + default_kwargs = dict(direct_interface=False, test_requirement=True, test_unicode=True, test_env=True, test_rewrite_action=True) def test_integration_no_requirement(self): self._run(private_token=None, **self.default_kwargs) -- GitLab