Skip to content
Snippets Groups Projects
Commit 4903b4de authored by John Chilton's avatar John Chilton
Browse files

More work toward 'rewrite' action type - LWR integration test working.

parent 2d980b49
No related branches found
No related tags found
No related merge requests found
...@@ -136,9 +136,11 @@ class FileActionMapper(object): ...@@ -136,9 +136,11 @@ class FileActionMapper(object):
mapper = self.__find_mapper(path, type, mapper) mapper = self.__find_mapper(path, type, mapper)
action_class = self.__action_class(path, type, mapper) action_class = self.__action_class(path, type, mapper)
file_lister = DEFAULT_FILE_LISTER file_lister = DEFAULT_FILE_LISTER
action_kwds = {}
if mapper: if mapper:
file_lister = mapper.file_lister file_lister = mapper.file_lister
action = action_class(path, file_lister=file_lister) action_kwds = mapper.action_kwds
action = action_class(path, file_lister=file_lister, **action_kwds)
self.__process_action(action, type) self.__process_action(action, type)
return action return action
...@@ -217,12 +219,20 @@ class BaseAction(object): ...@@ -217,12 +219,20 @@ class BaseAction(object):
self.path = path self.path = path
self.file_lister = file_lister or DEFAULT_FILE_LISTER self.file_lister = file_lister or DEFAULT_FILE_LISTER
def unstructured_map(self): def unstructured_map(self, path_helper):
unstructured_map = self.file_lister.unstructured_map(self.path) unstructured_map = self.file_lister.unstructured_map(self.path)
# To ensure uniqueness, prepend unique prefix to each name if self.staging_needed:
prefix = unique_path_prefix(self.path) # To ensure uniqueness, prepend unique prefix to each name
for path, name in unstructured_map.iteritems(): prefix = unique_path_prefix(self.path)
unstructured_map[path] = join(prefix, name) for path, name in unstructured_map.iteritems():
unstructured_map[path] = join(prefix, name)
else:
path_rewrites = {}
for path in unstructured_map:
rewrite = self.path_rewrite(path_helper, path)
if rewrite:
path_rewrites[path] = rewrite
unstructured_map = path_rewrites
return unstructured_map return unstructured_map
@property @property
...@@ -249,7 +259,7 @@ class NoneAction(BaseAction): ...@@ -249,7 +259,7 @@ class NoneAction(BaseAction):
def from_dict(cls, action_dict): def from_dict(cls, action_dict):
return NoneAction(path=action_dict["path"]) return NoneAction(path=action_dict["path"])
def path_rewrite(self, path_helper): def path_rewrite(self, path_helper, path=None):
return None return None
...@@ -286,8 +296,10 @@ class RewriteAction(BaseAction): ...@@ -286,8 +296,10 @@ class RewriteAction(BaseAction):
destination_directory=action_dict["destination_directory"], destination_directory=action_dict["destination_directory"],
) )
def path_rewrite(self, path_helper): def path_rewrite(self, path_helper, path=None):
new_path = path_helper.from_posix_with_new_base(self.path, self.from_posix_with_new_base, self.destination_directory) if not path:
path = self.path
new_path = path_helper.from_posix_with_new_base(self.path, self.source_directory, self.destination_directory)
return None if new_path == self.path else new_path return None if new_path == self.path else new_path
...@@ -416,9 +428,10 @@ class BasePathMapper(object): ...@@ -416,9 +428,10 @@ class BasePathMapper(object):
action_type = config.get('action', DEFAULT_MAPPED_ACTION) action_type = config.get('action', DEFAULT_MAPPED_ACTION)
action_class = actions.get(action_type, None) action_class = actions.get(action_type, None)
action_kwds = action_class.action_spec.copy() action_kwds = action_class.action_spec.copy()
action_kwds.update(config)
for key, value in action_kwds.items(): for key, value in action_kwds.items():
if value is REQUIRED_ACTION_KWD: if key in config:
action_kwds[key] = config[key]
elif value is REQUIRED_ACTION_KWD:
message_template = "action_type %s requires key word argument %s" message_template = "action_type %s requires key word argument %s"
message = message_template % (action_type, key) message = message_template % (action_type, key)
raise Exception( message ) raise Exception( message )
...@@ -535,10 +548,11 @@ DEFAULT_FILE_LISTER = FileLister(dict(depth=0)) ...@@ -535,10 +548,11 @@ DEFAULT_FILE_LISTER = FileLister(dict(depth=0))
ACTION_CLASSES = [ ACTION_CLASSES = [
NoneAction, NoneAction,
RewriteAction,
TransferAction, TransferAction,
CopyAction, CopyAction,
RemoteCopyAction, RemoteCopyAction,
RemoteTransferAction RemoteTransferAction,
] ]
actions = dict([(clazz.action_type, clazz) for clazz in ACTION_CLASSES]) actions = dict([(clazz.action_type, clazz) for clazz in ACTION_CLASSES])
......
...@@ -78,9 +78,11 @@ class FileStager(object): ...@@ -78,9 +78,11 @@ class FileStager(object):
self.job_inputs = JobInputs(self.command_line, self.config_files) self.job_inputs = JobInputs(self.command_line, self.config_files)
self.action_mapper = FileActionMapper(client) self.action_mapper = FileActionMapper(client)
self.transfer_tracker = TransferTracker(client, self.action_mapper, self.job_inputs, rewrite_paths=self.rewrite_paths)
self.__handle_setup(job_config) self.__handle_setup(job_config)
self.transfer_tracker = TransferTracker(client, self.path_helper, self.action_mapper, self.job_inputs, rewrite_paths=self.rewrite_paths)
self.__initialize_referenced_tool_files() self.__initialize_referenced_tool_files()
if self.rewrite_paths: if self.rewrite_paths:
self.__initialize_referenced_arbitrary_files() self.__initialize_referenced_arbitrary_files()
...@@ -142,7 +144,7 @@ class FileStager(object): ...@@ -142,7 +144,7 @@ class FileStager(object):
referenced_arbitrary_path_mappers[path] = mapper referenced_arbitrary_path_mappers[path] = mapper
for path, mapper in referenced_arbitrary_path_mappers.iteritems(): for path, mapper in referenced_arbitrary_path_mappers.iteritems():
action = self.action_mapper.action(path, path_type.UNSTRUCTURED, mapper) action = self.action_mapper.action(path, path_type.UNSTRUCTURED, mapper)
unstructured_map = action.unstructured_map() unstructured_map = action.unstructured_map(self.path_helper)
self.arbitrary_files.update(unstructured_map) self.arbitrary_files.update(unstructured_map)
def __upload_tool_files(self): def __upload_tool_files(self):
...@@ -330,8 +332,9 @@ class JobInputs(object): ...@@ -330,8 +332,9 @@ class JobInputs(object):
class TransferTracker(object): class TransferTracker(object):
def __init__(self, client, action_mapper, job_inputs, rewrite_paths): def __init__(self, client, path_helper, action_mapper, job_inputs, rewrite_paths):
self.client = client self.client = client
self.path_helper = path_helper
self.action_mapper = action_mapper self.action_mapper = action_mapper
self.job_inputs = job_inputs self.job_inputs = job_inputs
...@@ -357,6 +360,11 @@ class TransferTracker(object): ...@@ -357,6 +360,11 @@ class TransferTracker(object):
register = self.rewrite_paths or type == 'tool' # Even if inputs not rewritten, tool must be. register = self.rewrite_paths or type == 'tool' # Even if inputs not rewritten, tool must be.
if register: if register:
self.register_rewrite(path, get_path(), type, force=True) self.register_rewrite(path, get_path(), type, force=True)
elif self.rewrite_paths:
path_rewrite = action.path_rewrite(self.path_helper)
if path_rewrite:
self.register_rewrite(path, path_rewrite, type, force=True)
# else: # No action for this file # else: # No action for this file
def __add_remote_staging_input(self, action, name, type): def __add_remote_staging_input(self, action, name, type):
......
...@@ -47,6 +47,7 @@ try: ...@@ -47,6 +47,7 @@ try:
output.write(contents) output.write(contents)
open("workdir_output", "w").write("WORK DIR OUTPUT") open("workdir_output", "w").write("WORK DIR OUTPUT")
open("env_test", "w").write(getenv("TEST_ENV", "DEFAULT")) open("env_test", "w").write(getenv("TEST_ENV", "DEFAULT"))
open("rewrite_action_test", "w").write(sys.argv[12])
output2.write(output2_contents) output2.write(output2_contents)
with open("galaxy.json", "w") as f: f.write("GALAXY_JSON") with open("galaxy.json", "w") as f: f.write("GALAXY_JSON")
output3.write(getenv("MOO", "moo_default")) output3.write(getenv("MOO", "moo_default"))
...@@ -83,10 +84,11 @@ def run(options): ...@@ -83,10 +84,11 @@ def run(options):
temp_directory = tempfile.mkdtemp() temp_directory = tempfile.mkdtemp()
temp_index_dir = os.path.join(temp_directory, "idx", "bwa") temp_index_dir = os.path.join(temp_directory, "idx", "bwa")
temp_index_dir_sibbling = os.path.join(temp_directory, "idx", "seq") temp_index_dir_sibbling = os.path.join(temp_directory, "idx", "seq")
temp_shared_dir = os.path.join(temp_directory, "shared", "test1")
temp_work_dir = os.path.join(temp_directory, "w") temp_work_dir = os.path.join(temp_directory, "w")
temp_tool_dir = os.path.join(temp_directory, "t") temp_tool_dir = os.path.join(temp_directory, "t")
__makedirs([temp_tool_dir, temp_work_dir, temp_index_dir, temp_index_dir_sibbling]) __makedirs([temp_tool_dir, temp_work_dir, temp_index_dir, temp_index_dir_sibbling, temp_shared_dir])
temp_input_path = os.path.join(temp_directory, "dataset_0.dat") temp_input_path = os.path.join(temp_directory, "dataset_0.dat")
temp_input_extra_path = os.path.join(temp_directory, "dataset_0_files", "input_subdir", "extra") temp_input_extra_path = os.path.join(temp_directory, "dataset_0_files", "input_subdir", "extra")
...@@ -102,6 +104,9 @@ def run(options): ...@@ -102,6 +104,9 @@ def run(options):
temp_output_workdir_destination = os.path.join(temp_directory, "dataset_77.dat") temp_output_workdir_destination = os.path.join(temp_directory, "dataset_77.dat")
temp_output_workdir = os.path.join(temp_work_dir, "env_test") temp_output_workdir = os.path.join(temp_work_dir, "env_test")
temp_output_workdir_destination2 = os.path.join(temp_directory, "dataset_78.dat")
temp_output_workdir2 = os.path.join(temp_work_dir, "rewrite_action_test")
__write_to_file(temp_input_path, b"Hello world input!!@!") __write_to_file(temp_input_path, b"Hello world input!!@!")
__write_to_file(temp_input_extra_path, b"INPUT_EXTRA_CONTENTS") __write_to_file(temp_input_extra_path, b"INPUT_EXTRA_CONTENTS")
__write_to_file(temp_config_path, EXPECTED_OUTPUT) __write_to_file(temp_config_path, EXPECTED_OUTPUT)
...@@ -128,17 +133,21 @@ def run(options): ...@@ -128,17 +133,21 @@ def run(options):
temp_version_output_path, temp_version_output_path,
temp_index_path, temp_index_path,
temp_output4_path, temp_output4_path,
temp_shared_dir,
) )
assert os.path.exists(temp_index_path) assert os.path.exists(temp_index_path)
command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params
config_files = [temp_config_path] config_files = [temp_config_path]
input_files = [temp_input_path, empty_input] input_files = [temp_input_path, empty_input]
output_files = [temp_output_path, temp_output2_path, temp_output3_path, temp_output4_path, os.path.join(temp_directory, "dataset_77.dat")] output_files = [temp_output_path, temp_output2_path, temp_output3_path, temp_output4_path, temp_output_workdir_destination, temp_output_workdir_destination2]
client, client_manager = __client(temp_directory, options) client, client_manager = __client(temp_directory, options)
waiter = Waiter(client, client_manager) waiter = Waiter(client, client_manager)
client_outputs = ClientOutputs( client_outputs = ClientOutputs(
working_directory=temp_work_dir, working_directory=temp_work_dir,
work_dir_outputs=[(temp_output_workdir, temp_output_workdir_destination)], work_dir_outputs=[
(temp_output_workdir, temp_output_workdir_destination),
(temp_output_workdir2, temp_output_workdir_destination2),
],
output_files=output_files, output_files=output_files,
version_file=temp_version_output_path, version_file=temp_version_output_path,
) )
...@@ -160,6 +169,8 @@ def run(options): ...@@ -160,6 +169,8 @@ def run(options):
__assert_contents(temp_output2_path, cmd_text, result_status) __assert_contents(temp_output2_path, cmd_text, result_status)
__assert_contents(os.path.join(temp_work_dir, "galaxy.json"), b"GALAXY_JSON", result_status) __assert_contents(os.path.join(temp_work_dir, "galaxy.json"), b"GALAXY_JSON", result_status)
__assert_contents(os.path.join(temp_directory, "dataset_1_files", "extra"), b"EXTRA_OUTPUT_CONTENTS", result_status) __assert_contents(os.path.join(temp_directory, "dataset_1_files", "extra"), b"EXTRA_OUTPUT_CONTENTS", result_status)
if getattr(options, "test_rewrite_action", False):
__assert_contents(temp_output_workdir_destination2, os.path.join(temp_directory, "shared2", "test1"), result_status)
if job_description.env: if job_description.env:
__assert_contents(temp_output_workdir_destination, b"TEST_ENV_VALUE", result_status) __assert_contents(temp_output_workdir_destination, b"TEST_ENV_VALUE", result_status)
__assert_contents(temp_version_output_path, b"1.0.1", result_status) __assert_contents(temp_version_output_path, b"1.0.1", result_status)
...@@ -251,12 +262,16 @@ def __exercise_errors(options, client, temp_output_path, temp_directory): ...@@ -251,12 +262,16 @@ def __exercise_errors(options, client, temp_output_path, temp_directory):
def __client(temp_directory, options): def __client(temp_directory, options):
default_file_action = getattr(options, "default_file_action", None) default_file_action = getattr(options, "default_file_action", None)
unstructured_action = default_file_action or "transfer" unstructured_action = default_file_action or "transfer"
path_defs = [
dict(path=os.path.join(temp_directory, "idx"), path_types="unstructured", depth=2, action=unstructured_action),
]
if getattr(options, "test_rewrite_action", False):
rewrite_def = dict(path=os.path.join(temp_directory, "shared"), path_types="unstructured", action="rewrite", source_directory=os.path.join(temp_directory, "shared"), destination_directory=os.path.join(temp_directory, "shared2"))
path_defs.append(rewrite_def)
client_options = { client_options = {
"url": getattr(options, "url", None), "url": getattr(options, "url", None),
"private_token": getattr(options, "private_token", None), "private_token": getattr(options, "private_token", None),
"file_action_config": write_json_config(temp_directory, dict(paths=[ "file_action_config": write_json_config(temp_directory, dict(paths=path_defs)),
dict(path=os.path.join(temp_directory, "idx"), path_types="unstructured", depth=2, action=unstructured_action)
])),
} }
if default_file_action: if default_file_action:
client_options["default_file_action"] = default_file_action client_options["default_file_action"] = default_file_action
......
...@@ -86,7 +86,7 @@ class BaseIntegrationTest(TempDirectoryTestCase): ...@@ -86,7 +86,7 @@ class BaseIntegrationTest(TempDirectoryTestCase):
class IntegrationTests(BaseIntegrationTest): class IntegrationTests(BaseIntegrationTest):
default_kwargs = dict(direct_interface=False, test_requirement=True, test_unicode=True, test_env=True) default_kwargs = dict(direct_interface=False, test_requirement=True, test_unicode=True, test_env=True, test_rewrite_action=True)
def test_integration_no_requirement(self): def test_integration_no_requirement(self):
self._run(private_token=None, **self.default_kwargs) self._run(private_token=None, **self.default_kwargs)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment