From bd05cb4cccd9d9ab33ef8c1e4c28b24395ab2f83 Mon Sep 17 00:00:00 2001
From: John Chilton <jmchilton@gmail.com>
Date: Mon, 17 Feb 2014 22:28:03 -0600
Subject: [PATCH] Rearrange JobClient methods so the highest-level, most
 important methods are toward the top.

---
 lwr/lwr_client/client.py | 219 +++++++++++++++++++--------------------
 1 file changed, 108 insertions(+), 111 deletions(-)

diff --git a/lwr/lwr_client/client.py b/lwr/lwr_client/client.py
index bb62a619..59121ce2 100644
--- a/lwr/lwr_client/client.py
+++ b/lwr/lwr_client/client.py
@@ -79,12 +79,79 @@ class JobClient(BaseJobClient):
         super(JobClient, self).__init__(destination_params, job_id)
         self.job_manager_interface = job_manager_interface
 
-    def _raw_execute(self, command, args={}, data=None, input_path=None, output_path=None):
-        return self.job_manager_interface.execute(command, args, data, input_path, output_path)
-
-    @property
-    def _submit_params(self):
-        return submit_params(self.destination_params)
+    def launch(self, command_line, requirements=[], remote_staging=[], job_config=None):
+        """
+        Queue up the execution of the supplied `command_line` on the remote
+        server. Called launch for historical reasons, should be renamed to
+        enqueue or something like that.
+
+        **Parameters**
+
+        command_line : str
+            Command to execute.
+        """
+        launch_params = dict(command_line=command_line, job_id=self.job_id)
+        submit_params_dict = submit_params(self.destination_params)
+        if submit_params_dict:
+            launch_params['params'] = dumps(submit_params_dict)
+        if requirements:
+            launch_params['requirements'] = dumps([requirement.to_dict() for requirement in requirements])
+        if remote_staging:
+            launch_params['remote_staging'] = dumps(remote_staging)
+        if job_config and self.setup_handler.local:
+            # Setup not yet called, job properties were inferred from
+            # destination arguments. Hence, must have LWR setup job
+            # before queueing.
+            setup_params = _setup_params_from_job_config(job_config)
+            launch_params["setup_params"] = dumps(setup_params)
+        return self._raw_execute("launch", launch_params)
+
+    def final_status(self):
+        """ Return a dictionary summarizing final state of job.
+        """
+        return self.raw_check_complete()
+
+    def kill(self):
+        """
+        Cancel remote job, either removing from the queue or killing it.
+        """
+        return self._raw_execute("kill", {"job_id": self.job_id})
+
+    @retry()
+    @parseJson()
+    def raw_check_complete(self):
+        """
+        Get check_complete response from the remote server.
+        """
+        check_complete_response = self._raw_execute("check_complete", {"job_id": self.job_id})
+        return check_complete_response
+
+    def get_status(self):
+        check_complete_response = self.raw_check_complete()
+        # Older LWR instances won't set status so use 'complete', at some
+        # point drop backward compatibility.
+        status = check_complete_response.get("status", None)
+        if status in ["status", None]:
+            # LEGACY: Bug in certain older LWR instances returned literal
+            # "status".
+            complete = check_complete_response["complete"] == "true"
+            old_status = "complete" if complete else "running"
+            status = old_status
+        return status
+
+    def clean(self):
+        """
+        Cleanup the remote job.
+        """
+        self._raw_execute("clean", {"job_id": self.job_id})
+
+    @parseJson()
+    def remote_setup(self, **setup_args):
+        """
+        Setup remote LWR server to run this job.
+ """ + return self._raw_execute("setup", setup_args) def put_file(self, path, input_type, name=None, contents=None, action_type='transfer'): if not name: @@ -100,31 +167,8 @@ class JobClient(BaseJobClient): copy(path, lwr_path) return {'path': lwr_path} - @parseJson() - def _upload_file(self, args, contents, input_path): - return self._raw_execute(self._upload_file_action(args), args, contents, input_path) - - def _upload_file_action(self, args): - ## Hack for backward compatibility, instead of using new upload_file - ## path. Use old paths. - input_type = args['input_type'] - action = { - # For backward compatibility just target upload_input_extra for all - # inputs, it allows nested inputs. Want to do away with distinction - # inputs and extra inputs. - 'input': 'upload_extra_input', - 'config': 'upload_config_file', - 'workdir': 'upload_working_directory_file', - 'tool': 'upload_tool_file', - 'unstructured': 'upload_unstructured_file', - }[input_type] - del args['input_type'] - return action - - @parseJson() - def _get_output_type(self, name): - return self._raw_execute("get_output_type", {"name": name, - "job_id": self.job_id}) + def _raw_execute(self, command, args={}, data=None, input_path=None, output_path=None): + return self.job_manager_interface.execute(command, args, data, input_path, output_path) # Deprecated def fetch_output_legacy(self, path, working_directory, action_type='transfer'): @@ -161,14 +205,6 @@ class JobClient(BaseJobClient): output_type = "direct" # Task/from_work_dir outputs now handled with fetch_work_dir_output self.__populate_output_path(name, path, output_type, action_type) - def __populate_output_path(self, name, output_path, output_type, action_type): - self.__ensure_directory(output_path) - if action_type == 'transfer': - self.__raw_download_output(name, self.job_id, output_type, output_path) - elif action_type == 'copy': - lwr_path = self._output_path(name, self.job_id, output_type)['path'] - copy(lwr_path, output_path) - def fetch_work_dir_output(self, name, working_directory, output_path, action_type='transfer'): """ Download an output dataset specified with from_work_dir from the @@ -190,6 +226,40 @@ class JobClient(BaseJobClient): lwr_path = self._output_path(name, self.job_id, 'work_dir')['path'] copy(lwr_path, output_path) + def __populate_output_path(self, name, output_path, output_type, action_type): + self.__ensure_directory(output_path) + if action_type == 'transfer': + self.__raw_download_output(name, self.job_id, output_type, output_path) + elif action_type == 'copy': + lwr_path = self._output_path(name, self.job_id, output_type)['path'] + copy(lwr_path, output_path) + + @parseJson() + def _upload_file(self, args, contents, input_path): + return self._raw_execute(self._upload_file_action(args), args, contents, input_path) + + def _upload_file_action(self, args): + ## Hack for backward compatibility, instead of using new upload_file + ## path. Use old paths. + input_type = args['input_type'] + action = { + # For backward compatibility just target upload_input_extra for all + # inputs, it allows nested inputs. Want to do away with distinction + # inputs and extra inputs. 
+            'input': 'upload_extra_input',
+            'config': 'upload_config_file',
+            'workdir': 'upload_working_directory_file',
+            'tool': 'upload_tool_file',
+            'unstructured': 'upload_unstructured_file',
+        }[input_type]
+        del args['input_type']
+        return action
+
+    @parseJson()
+    def _get_output_type(self, name):
+        return self._raw_execute("get_output_type", {"name": name,
+                                                     "job_id": self.job_id})
+
     def __ensure_directory(self, output_path):
         output_path_directory = os.path.dirname(output_path)
         if not os.path.exists(output_path_directory):
@@ -211,79 +281,6 @@ class JobClient(BaseJobClient):
         }
         self._raw_execute("download_output", output_params, output_path=output_path)
 
-    def launch(self, command_line, requirements=[], remote_staging=[], job_config=None):
-        """
-        Queue up the execution of the supplied `command_line` on the remote
-        server. Called launch for historical reasons, should be renamed to
-        enqueue or something like that.
-
-        **Parameters**
-
-        command_line : str
-            Command to execute.
-        """
-        launch_params = dict(command_line=command_line, job_id=self.job_id)
-        submit_params = self._submit_params
-        if submit_params:
-            launch_params['params'] = dumps(submit_params)
-        if requirements:
-            launch_params['requirements'] = dumps([requirement.to_dict() for requirement in requirements])
-        if remote_staging:
-            launch_params['remote_staging'] = dumps(remote_staging)
-        if job_config and self.setup_handler.local:
-            # Setup not yet called, job properties were inferred from
-            # destination arguments. Hence, must have LWR setup job
-            # before queueing.
-            setup_params = _setup_params_from_job_config(job_config)
-            launch_params["setup_params"] = dumps(setup_params)
-        return self._raw_execute("launch", launch_params)
-
-    def final_status(self):
-        """ Return a dictionary summarizing final state of job.
-        """
-        return self.raw_check_complete()
-
-    def kill(self):
-        """
-        Cancel remote job, either removing from the queue or killing it.
-        """
-        return self._raw_execute("kill", {"job_id": self.job_id})
-
-    @retry()
-    @parseJson()
-    def raw_check_complete(self):
-        """
-        Get check_complete response from the remote server.
-        """
-        check_complete_response = self._raw_execute("check_complete", {"job_id": self.job_id})
-        return check_complete_response
-
-    def get_status(self):
-        check_complete_response = self.raw_check_complete()
-        # Older LWR instances won't set status so use 'complete', at some
-        # point drop backward compatibility.
-        status = check_complete_response.get("status", None)
-        if status in ["status", None]:
-            # LEGACY: Bug in certains older LWR instances returned literal
-            # "status".
-            complete = check_complete_response["complete"] == "true"
-            old_status = "complete" if complete else "running"
-            status = old_status
-        return status
-
-    def clean(self):
-        """
-        Cleanup the remote job.
-        """
-        self._raw_execute("clean", {"job_id": self.job_id})
-
-    @parseJson()
-    def remote_setup(self, **setup_args):
-        """
-        Setup remote LWR server to run this job.
-        """
-        return self._raw_execute("setup", setup_args)
-
 
 class MessageJobClient(BaseJobClient):
-- 
GitLab
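
Note for reviewers: after this rearrangement, JobClient's public methods read top-to-bottom in roughly the order a caller would invoke them. A minimal usage sketch follows. It is illustrative only and not part of the patch: the construction arguments (destination_params, job_id, job_manager_interface) match the __init__ shown in the first hunk but are assumed to be wired up by the surrounding application, and the command line and five-second polling interval are arbitrary choices for this example.

    import time

    # Assumed to be provided by the surrounding application; constructing
    # these objects is outside the scope of this patch.
    client = JobClient(destination_params, job_id, job_manager_interface)

    # Queue the command on the remote LWR server (the method this patch
    # moves to the top of the class).
    client.launch("echo 'Hello LWR'")

    # Poll until the job completes; get_status() normalizes the legacy
    # "complete"/"running" responses from older LWR servers.
    while client.get_status() != "complete":
        time.sleep(5)

    # Summarize the final state, then release remote resources.
    print(client.final_status())
    client.clean()

Reading launch, get_status, final_status, and clean in source order now mirrors this call sequence, which is the motivation stated in the commit subject.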