diff --git a/lwr/lwr_client/client.py b/lwr/lwr_client/client.py index 59121ce22f5f3ac11400b3b466a1363fd5d3c2d6..edc452f497d690d4a2562965944b6a1e5c6c7f2e 100644 --- a/lwr/lwr_client/client.py +++ b/lwr/lwr_client/client.py @@ -79,7 +79,6 @@ class JobClient(BaseJobClient): super(JobClient, self).__init__(destination_params, job_id) self.job_manager_interface = job_manager_interface - def launch(self, command_line, requirements=[], remote_staging=[], job_config=None): """ Queue up the execution of the supplied `command_line` on the remote @@ -167,11 +166,39 @@ class JobClient(BaseJobClient): copy(path, lwr_path) return {'path': lwr_path} + def fetch_output(self, path, name, working_directory, action_type, output_type): + """ + Fetch (transfer, copy, etc...) an output from the remote LWR server. + + **Parameters** + + path : str + Local path of the dataset. + name : str + Remote name of file (i.e. path relative to remote staging output + or working directory). + working_directory : str + Local working_directory for the job. + action_type : str + Where to find file on LWR (output_workdir or output). legacy is also + an option in this case LWR is asked for location - this will only be + used if targetting an older LWR server that didn't return statuses + allowing this to be inferred. + """ + if output_type == 'legacy': + self._fetch_output_legacy(path, working_directory, action_type=action_type) + elif output_type == 'output_workdir': + self._fetch_work_dir_output(name, working_directory, path, action_type=action_type) + elif output_type == 'output': + self._fetch_output(path=path, name=name, action_type=action_type) + else: + raise Exception("Unknown output_type %s" % output_type) + def _raw_execute(self, command, args={}, data=None, input_path=None, output_path=None): return self.job_manager_interface.execute(command, args, data, input_path, output_path) # Deprecated - def fetch_output_legacy(self, path, working_directory, action_type='transfer'): + def _fetch_output_legacy(self, path, working_directory, action_type='transfer'): # Needs to determine if output is task/working directory or standard. name = os.path.basename(path) @@ -186,18 +213,7 @@ class JobClient(BaseJobClient): self.__populate_output_path(name, path, output_type, action_type) - def fetch_output(self, path, name=None, check_exists_remotely=False, action_type='transfer'): - """ - Download an output dataset from the remote server. - - **Parameters** - - path : str - Local path of the dataset. - working_directory : str - Local working_directory for the job. - """ - + def _fetch_output(self, path, name=None, check_exists_remotely=False, action_type='transfer'): if not name: # Extra files will send in the path. name = os.path.basename(path) @@ -205,20 +221,7 @@ class JobClient(BaseJobClient): output_type = "direct" # Task/from_work_dir outputs now handled with fetch_work_dir_output self.__populate_output_path(name, path, output_type, action_type) - def fetch_work_dir_output(self, name, working_directory, output_path, action_type='transfer'): - """ - Download an output dataset specified with from_work_dir from the - remote server. - - **Parameters** - - name : str - Path in job's working_directory to find output in. - working_directory : str - Local working_directory for the job. - output_path : str - Full path to output dataset. - """ + def _fetch_work_dir_output(self, name, working_directory, output_path, action_type='transfer'): self.__ensure_directory(output_path) if action_type == 'transfer': self.__raw_download_output(name, self.job_id, "work_dir", output_path) diff --git a/lwr/lwr_client/staging/down.py b/lwr/lwr_client/staging/down.py index 1c541f311611c96dc7f99e569ec045b6c324a14f..2f30ecb7c963c08c0a68febabb67abc5a843fa5c 100644 --- a/lwr/lwr_client/staging/down.py +++ b/lwr/lwr_client/staging/down.py @@ -41,15 +41,14 @@ class ClientOutputCollector(object): if not action.staging_action_local: return False - path = action.path - if output_type == 'legacy': - working_directory = results_collector.client_outputs.working_directory - self.client.fetch_output_legacy(path, working_directory, action_type=action.action_type) - elif output_type == 'output_workdir': - working_directory = results_collector.client_outputs.working_directory - self.client.fetch_work_dir_output(name, working_directory, path, action_type=action.action_type) - elif output_type == 'output': - self.client.fetch_output(path=path, name=name, action_type=action.action_type) + working_directory = results_collector.client_outputs.working_directory + self.client.fetch_output( + path=action.path, + name=name, + working_directory=working_directory, + output_type=output_type, + action_type=action.action_type + ) return True diff --git a/test/check.py b/test/check.py index 4beab463156f751d17bec06f1dc0e09467f19d41..258ef936ebd2094be2940a9e915f86b8eb951d5b 100644 --- a/test/check.py +++ b/test/check.py @@ -237,7 +237,7 @@ def __exercise_errors(options, client, temp_output_path, temp_directory): """ if getattr(options, 'test_errors', False): try: - client.fetch_output(temp_output_path + "x") + client._fetch_output(temp_output_path + "x") except BaseException: if not options.suppress_output: traceback.print_exc() diff --git a/test/client_test.py b/test/client_test.py index 0f5f457b4728487bb0d37ec77e72e2db1c435021..dda564f4e8f6203e1b58938bafd2ce90ac9afb47 100644 --- a/test/client_test.py +++ b/test/client_test.py @@ -178,7 +178,7 @@ def test_download_output(): client.expect_open(request_checker, b'"direct"') request_checker = RequestChecker("download_output", {"name": os.path.basename(temp_file.name), "output_type": "direct"}) client.expect_open(request_checker, b"test output contents") - client.fetch_output(temp_file.name) + client._fetch_output(temp_file.name) with open(temp_file.name, "r") as f: contents = f.read(1024)