From f2a7282b850c6df1c754e734844e9041d726f4b3 Mon Sep 17 00:00:00 2001
From: John Chilton <jmchilton@gmail.com>
Date: Tue, 17 Jun 2014 22:22:21 -0500
Subject: [PATCH] Rework API - remove lots of legacy cruft and improve endpoint naming.

Routes going from:

GET: [/managers/<manager_name>]/setup
GET: [/managers/<manager_name>]/clean?job_id=
GET: [/managers/<manager_name>]/launch?job_id=
GET: [/managers/<manager_name>]/check_complete?job_id=
GET: [/managers/<manager_name>]/kill?job_id=
GET: [/managers/<manager_name>]/upload_input?job_id=
GET: [/managers/<manager_name>]/upload_extra_input?job_id=
GET: [/managers/<manager_name>]/upload_config_file?job_id=
GET: [/managers/<manager_name>]/upload_working_directory_file?job_id=
GET: [/managers/<manager_name>]/upload_unstructured_file?job_id=
GET: [/managers/<manager_name>]/download_output?job_id=
GET: [/managers/<manager_name>]/output_path?job_id=
GET: [/managers/<manager_name>]/get_output_type?job_id=

To:

GET: [/managers/<manager_name>]/setup
GET: [/managers/<manager_name>]/clean?job_id=
GET: [/managers/<manager_name>]/submit?job_id= (was launch)
GET: [/managers/<manager_name>]/status?job_id= (was check_complete)
GET: [/managers/<manager_name>]/cancel?job_id= (was kill)
GET: [/managers/<manager_name>]/upload_input?job_id=
GET: [/managers/<manager_name>]/download_output?job_id=
GET: [/managers/<manager_name>]/output_path?job_id=
---
 lwr/lwr_client/client.py           | 55 +++-----------------
 lwr/lwr_client/job_directory.py    |  4 +-
 lwr/lwr_client/staging/__init__.py | 19 ++-----
 lwr/lwr_client/staging/down.py     | 13 ++---
 lwr/web/routes.py                  | 82 +++---------------------
 test/app_test.py                   | 12 ++---
 test/client_test.py                | 41 +++------------
 7 files changed, 34 insertions(+), 192 deletions(-)

diff --git a/lwr/lwr_client/client.py b/lwr/lwr_client/client.py
index 7656b052..012da15f 100644
--- a/lwr/lwr_client/client.py
+++ b/lwr/lwr_client/client.py
@@ -111,7 +111,7 @@ class JobClient(BaseJobClient):
         # before queueing.
         setup_params = _setup_params_from_job_config(job_config)
         launch_params["setup_params"] = dumps(setup_params)
-        return self._raw_execute("launch", launch_params)
+        return self._raw_execute("submit", launch_params)
 
     def full_status(self):
         """ Return a dictionary summarizing final state of job.
@@ -122,7 +122,7 @@ class JobClient(BaseJobClient):
         """
         Cancel remote job, either removing from the queue or killing it.
         """
-        return self._raw_execute("kill", {"job_id": self.job_id})
+        return self._raw_execute("cancel", {"job_id": self.job_id})
 
     @retry()
     @parseJson()
@@ -130,7 +130,7 @@ class JobClient(BaseJobClient):
         """
         Get check_complete response from the remote server.
         """
-        check_complete_response = self._raw_execute("check_complete", {"job_id": self.job_id})
+        check_complete_response = self._raw_execute("status", {"job_id": self.job_id})
         return check_complete_response
 
     def get_status(self):
@@ -138,12 +138,6 @@ class JobClient(BaseJobClient):
         # Older LWR instances won't set status so use 'complete', at some
         # point drop backward compatibility.
         status = check_complete_response.get("status", None)
-        if status in ["status", None]:
-            # LEGACY: Bug in certains older LWR instances returned literal
-            # "status".
-            complete = check_complete_response["complete"] == "true"
-            old_status = "complete" if complete else "running"
-            status = old_status
         return status
 
     def clean(self):
@@ -192,34 +186,16 @@ class JobClient(BaseJobClient):
         used if targetting an older LWR server that didn't return statuses
         allowing this to be inferred.
         """
-        if output_type == 'legacy':
-            self._fetch_output_legacy(path, working_directory, action_type=action_type)
-        elif output_type == 'output_workdir':
+        if output_type == 'output_workdir':
             self._fetch_work_dir_output(name, working_directory, path, action_type=action_type)
         elif output_type == 'output':
            self._fetch_output(path=path, name=name, action_type=action_type)
         else:
             raise Exception("Unknown output_type %s" % output_type)
-
+
     def _raw_execute(self, command, args={}, data=None, input_path=None, output_path=None):
         return self.job_manager_interface.execute(command, args, data, input_path, output_path)
 
-    # Deprecated
-    def _fetch_output_legacy(self, path, working_directory, action_type='transfer'):
-        # Needs to determine if output is task/working directory or standard.
-        name = os.path.basename(path)
-
-        output_type = self._get_output_type(name)
-        if output_type == "none":
-            # Just make sure the file was created.
-            if not os.path.exists(path):
-                raise OutputNotFoundException(path)
-            return
-        elif output_type in ["task"]:
-            path = os.path.join(working_directory, name)
-
-        self.__populate_output_path(name, path, output_type, action_type)
-
     def _fetch_output(self, path, name=None, check_exists_remotely=False, action_type='transfer'):
         if not name:
             # Extra files will send in the path.
@@ -249,26 +225,7 @@ class JobClient(BaseJobClient):
         return self._raw_execute(self._upload_file_action(args), args, contents, input_path)
 
     def _upload_file_action(self, args):
-        # Hack for backward compatibility, instead of using new upload_file
-        # path. Use old paths.
-        input_type = args['input_type']
-        action = {
-            # For backward compatibility just target upload_input_extra for all
-            # inputs, it allows nested inputs. Want to do away with distinction
-            # inputs and extra inputs.
-            'input': 'upload_extra_input',
-            'config': 'upload_config_file',
-            'workdir': 'upload_working_directory_file',
-            'tool': 'upload_tool_file',
-            'unstructured': 'upload_unstructured_file',
-        }[input_type]
-        del args['input_type']
-        return action
-
-    @parseJson()
-    def _get_output_type(self, name):
-        return self._raw_execute("get_output_type", {"name": name,
-                                                      "job_id": self.job_id})
+        return "upload_file"
 
     @parseJson()
     def _output_path(self, name, job_id, output_type):
diff --git a/lwr/lwr_client/job_directory.py b/lwr/lwr_client/job_directory.py
index b07e1c53..2ebaff79 100644
--- a/lwr/lwr_client/job_directory.py
+++ b/lwr/lwr_client/job_directory.py
@@ -13,11 +13,9 @@ log = getLogger(__name__)
 
 TYPES_TO_METHOD = dict(
     input="inputs_directory",
-    input_extra="inputs_directory",
     unstructured="unstructured_files_directory",
     config="configs_directory",
     tool="tool_files_directory",
-    work_dir="working_directory",
     workdir="working_directory",
     output="outputs_directory",
     output_workdir="working_directory",
@@ -74,7 +72,7 @@ class RemoteJobDirectory(object):
         # Obviously this client won't be legacy because this is in the
         # client module, but this code is reused on server which may
         # serve legacy clients.
-        allow_nested_files = file_type in ['input', 'input_extra', 'unstructured', 'output', 'output_workdir']
+        allow_nested_files = file_type in ['input', 'unstructured', 'output', 'output_workdir']
         directory_function = getattr(self, TYPES_TO_METHOD.get(file_type, None), None)
         if not directory_function:
             raise Exception("Unknown file_type specified %s" % file_type)
diff --git a/lwr/lwr_client/staging/__init__.py b/lwr/lwr_client/staging/__init__.py
index c2a51da1..9341e762 100644
--- a/lwr/lwr_client/staging/__init__.py
+++ b/lwr/lwr_client/staging/__init__.py
@@ -132,8 +132,8 @@ class LwrOutputs(object):
     def from_status_response(complete_response):
         # Default to None instead of [] to distinguish between empty contents and it not set
         # by the LWR - older LWR instances will not set these in complete response.
-        working_directory_contents = complete_response.get("working_directory_contents", None)
-        output_directory_contents = complete_response.get("outputs_directory_contents", None)
+        working_directory_contents = complete_response.get("working_directory_contents")
+        output_directory_contents = complete_response.get("outputs_directory_contents")
         # Older (pre-2014) LWR servers will not include separator in response,
         # so this should only be used when reasoning about outputs in
         # subdirectories (which was not previously supported prior to that).
@@ -145,25 +145,12 @@ class LwrOutputs(object):
         )
 
     def has_output_file(self, output_file):
-        if self.output_directory_contents is None:
-            # Legacy LWR doesn't report this, return None indicating unsure if
-            # output was generated.
-            return None
-        else:
-            return basename(output_file) in self.output_directory_contents
-
-    def has_output_directory_listing(self):
-        return self.output_directory_contents is not None
+        return basename(output_file) in self.output_directory_contents
 
     def output_extras(self, output_file):
         """
         Returns dict mapping local path to remote name.
         """
-        if not self.has_output_directory_listing():
-            # Fetching $output.extra_files_path is not supported with legacy
-            # LWR (pre-2014) severs.
-            return {}
-
         output_directory = dirname(output_file)
 
         def local_path(name):
diff --git a/lwr/lwr_client/staging/down.py b/lwr/lwr_client/staging/down.py
index 08e9db41..33de8100 100644
--- a/lwr/lwr_client/staging/down.py
+++ b/lwr/lwr_client/staging/down.py
@@ -91,9 +91,7 @@ class ResultsCollector(object):
         for output_file in self.output_files:
             # Fetch output directly...
             output_generated = self.lwr_outputs.has_output_file(output_file)
-            if output_generated is None:
-                self._attempt_collect_output('legacy', output_file)
-            elif output_generated:
+            if output_generated:
                 self._attempt_collect_output('output', output_file)
 
             for galaxy_path, lwr_name in self.lwr_outputs.output_extras(output_file).iteritems():
@@ -102,8 +100,7 @@ class ResultsCollector(object):
 
     def __collect_version_file(self):
         version_file = self.client_outputs.version_file
-        # output_directory_contents may be none for legacy LWR servers.
-        lwr_output_directory_contents = (self.lwr_outputs.output_directory_contents or [])
+        lwr_output_directory_contents = self.lwr_outputs.output_directory_contents
         if version_file and COMMAND_VERSION_FILENAME in lwr_output_directory_contents:
             self._attempt_collect_output('output', version_file, name=COMMAND_VERSION_FILENAME)
 
@@ -124,11 +121,7 @@ class ResultsCollector(object):
         # path.
         collected = False
         with self.exception_tracker():
-            # output_action_type cannot be 'legacy' but output_type may be
-            # eventually drop support for legacy mode (where type wasn't known)
-            # ahead of time.
-            output_action_type = 'output_workdir' if output_type == 'output_workdir' else 'output'
-            action = self.action_mapper.action(path, output_action_type)
+            action = self.action_mapper.action(path, output_type)
             if self._collect_output(output_type, action, name):
                 collected = True
 
diff --git a/lwr/web/routes.py b/lwr/web/routes.py
index 443a5d6d..af56f130 100644
--- a/lwr/web/routes.py
+++ b/lwr/web/routes.py
@@ -58,7 +58,7 @@ def clean(manager, job_id):
 
 
 @LwrController()
-def launch(manager, job_id, command_line, params='{}', dependencies_description='null', setup_params='{}', remote_staging='[]', env='[]'):
+def submit(manager, job_id, command_line, params='{}', dependencies_description='null', setup_params='{}', remote_staging='[]', env='[]'):
     submit_params = loads(params)
     setup_params = loads(setup_params)
     dependencies_description = loads(dependencies_description)
@@ -76,88 +76,20 @@ def launch(manager, job_id, command_line, params='{}', dependencies_description=
     submit_job(manager, submit_config)
 
 
-@LwrController(response_type='json')
-def check_complete(manager, job_id):
+@LwrController(path="/jobs/{job_id}/status", method="GET", response_type='json')
+def status(manager, job_id):
     status = manager.get_status(job_id)
     return full_status(manager, status, job_id)
 
 
-@LwrController()
-def kill(manager, job_id):
+@LwrController(path="/jobs/{job_id}/cancel", method="PUT")
+def cancel(manager, job_id):
     manager.kill(job_id)
 
 
-# Following routes allow older clients to talk to new LWR, should be considered
-# deprecated in favor of generic upload_file route.
-@LwrController(response_type='json')
-def upload_tool_file(manager, file_cache, job_id, name, body, cache_token=None):
-    path = manager.job_directory(job_id).calculate_path(name, 'tool')
-    return _handle_upload(
-        file_cache,
-        path,
-        body,
-        cache_token=cache_token
-    )
-
-
-@LwrController(response_type='json')
-def upload_input(manager, file_cache, job_id, name, body, cache_token=None):
-    path = manager.job_directory(job_id).calculate_path(name, 'input')
-    return _handle_upload(
-        file_cache,
-        path,
-        body,
-        cache_token=cache_token
-    )
-
-
-@LwrController(response_type='json')
-def upload_extra_input(manager, file_cache, job_id, name, body, cache_token=None):
-    path = manager.job_directory(job_id).calculate_path(name, 'input')
-    return _handle_upload(
-        file_cache,
-        path,
-        body,
-        cache_token=cache_token
-    )
-
-
-@LwrController(response_type='json')
-def upload_config_file(manager, file_cache, job_id, name, body, cache_token=None):
-    path = manager.job_directory(job_id).calculate_path(name, 'config')
-    return _handle_upload(
-        file_cache,
-        path,
-        body,
-        cache_token=cache_token,
-    )
-
-
-@LwrController(response_type='json')
-def upload_working_directory_file(manager, file_cache, job_id, name, body, cache_token=None):
-    path = manager.job_directory(job_id).calculate_path(name, 'workdir')
-    return _handle_upload(
-        file_cache,
-        path,
-        body,
-        cache_token=cache_token,
-    )
-
-
-@LwrController(response_type='json')
-def upload_unstructured_file(manager, file_cache, job_id, name, body, cache_token=None):
-    path = manager.job_directory(job_id).calculate_path(name, 'unstructured')
-    return _handle_upload(
-        file_cache,
-        path,
-        body,
-        cache_token=cache_token,
-    )
-
-
-@LwrController(response_type='json')
+@LwrController(path="/jobs/{job_id}/files", method="POST", response_type='json')
 def upload_file(manager, input_type, file_cache, job_id, name, body, cache_token=None):
-    # Input type should be one of input, config, workdir, tool, or unstructured.
+    # Input type should be one of input, config, workdir, tool, or unstructured (see action_mapper.path_type)
     path = manager.job_directory(job_id).calculate_path(name, input_type)
     return _handle_upload(file_cache, path, body, cache_token=cache_token)
 
diff --git a/test/app_test.py b/test/app_test.py
index 0ea8c6de..48008048 100644
--- a/test/app_test.py
+++ b/test/app_test.py
@@ -21,7 +21,7 @@ def test_standard_requests():
         job_id = setup_config["job_id"]
 
         def test_upload(upload_type):
-            url = "/upload_%s?job_id=%s&name=input1" % (upload_type, job_id)
+            url = "/upload_file?job_id=%s&name=input1&input_type=%s" % (job_id, upload_type)
             upload_input_response = app.post(url, "Test Contents")
             upload_input_config = json.loads(upload_input_response.body)
             staged_input_path = upload_input_config["path"]
@@ -31,7 +31,7 @@ def test_standard_requests():
             finally:
                 staged_input.close()
         test_upload("input")
-        test_upload("tool_file")
+        test_upload("tool")
 
         test_output = open(os.path.join(outputs_directory, "test_output"), "w")
         try:
@@ -48,21 +48,21 @@ def test_standard_requests():
             pass
 
         command_line = urllib.quote("""python -c "import sys; sys.stdout.write('test_out')" """)
-        launch_response = app.get("/launch?job_id=%s&command_line=%s" % (job_id, command_line))
+        launch_response = app.get("/submit?job_id=%s&command_line=%s" % (job_id, command_line))
         assert launch_response.body == 'OK'
 
         # Hack: Call twice to ensure postprocessing occurs and has time to
         # complete. Monitor thread should get this.
         time.sleep(.2)
-        check_response = app.get("/check_complete?job_id=%s" % job_id)
+        check_response = app.get("/status?job_id=%s" % job_id)
         time.sleep(.2)
-        check_response = app.get("/check_complete?job_id=%s" % job_id)
+        check_response = app.get("/status?job_id=%s" % job_id)
         check_config = json.loads(check_response.body)
         assert check_config['returncode'] == 0
         assert check_config['stdout'] == "test_out"
         assert check_config['stderr'] == ""
 
-        kill_response = app.get("/kill?job_id=%s" % job_id)
+        kill_response = app.get("/cancel?job_id=%s" % job_id)
         assert kill_response.body == 'OK'
 
         clean_response = app.get("/clean?job_id=%s" % job_id)
diff --git a/test/client_test.py b/test/client_test.py
index dda564f4..19ef2253 100644
--- a/test/client_test.py
+++ b/test/client_test.py
@@ -119,7 +119,7 @@ def test_setup():
 def test_launch():
     """ Test the launch method of client. """
     client = TestClient()
-    request_checker = RequestChecker("launch", {"command_line": "python"})
+    request_checker = RequestChecker("submit", {"command_line": "python"})
     client.expect_open(request_checker, b'OK')
     client.launch("python")
     request_checker.assert_called()
@@ -133,10 +133,10 @@ def __test_upload(upload_type):
         temp_file.write("Hello World!")
     finally:
         temp_file.close()
-    request_checker = RequestChecker("upload_%s" % upload_type, {"name": os.path.basename(temp_file_path)}, b"Hello World!")
+    request_checker = RequestChecker("upload_file", {"name": os.path.basename(temp_file_path), "input_type": upload_type}, b"Hello World!")
     client.expect_open(request_checker, b'{"path" : "C:\\\\tools\\\\foo"}')
-    if(upload_type == 'tool_file'):
+    if upload_type == 'tool':
         upload_result = client.put_file(temp_file_path, 'tool')
     else:
         upload_result = client.put_file(temp_file_path, 'input')
@@ -146,11 +146,11 @@
 
 
 def test_upload_tool():
-    __test_upload("tool_file")
+    __test_upload("tool")
 
 
 def test_upload_input():
-    __test_upload("extra_input")
+    __test_upload("input")
 
 
 def test_upload_config():
@@ -162,7 +162,7 @@ def test_upload_config():
     finally:
         temp_file.close()
     modified_contents = "Hello World! <Modified>"
-    request_checker = RequestChecker("upload_config_file", {"name": os.path.basename(temp_file_path)}, modified_contents)
+    request_checker = RequestChecker("upload_file", {"name": os.path.basename(temp_file_path), "input_type": "config"}, modified_contents)
     client.expect_open(request_checker, b'{"path" : "C:\\\\tools\\\\foo"}')
     upload_result = client.put_file(temp_file_path, 'config', contents=modified_contents)
     request_checker.assert_called()
@@ -185,42 +185,17 @@ def test_download_output():
     assert contents == "test output contents", "Unxpected contents %s" % contents
 
 
-def test_get_status_complete_legacy():
-    client = TestClient()
-    request_checker = RequestChecker("check_complete")
-    client.expect_open(request_checker, b'{"complete": "true", "stdout" : "output"}')
-    assert client.get_status() == "complete"
-    request_checker.assert_called()
-
-
-def test_get_status_running_legacy():
-    client = TestClient()
-    request_checker = RequestChecker("check_complete")
-    client.expect_open(request_checker, b'{"complete": "false"}')
-    assert client.get_status() == "running"
-    request_checker.assert_called()
-
-
 def test_get_status_queued():
     client = TestClient()
-    request_checker = RequestChecker("check_complete")
+    request_checker = RequestChecker("status")
     client.expect_open(request_checker, b'{"complete": "false", "status" : "queued"}')
     assert client.get_status() == "queued"
     request_checker.assert_called()
 
 
-def test_get_status_invalid():
-    client = TestClient()
-    request_checker = RequestChecker("check_complete")
-    # Mimic bug in specific older LWR instances.
-    client.expect_open(request_checker, b'{"complete": "false", "status" : "status"}')
-    assert client.get_status() == "running"
-    request_checker.assert_called()
-
-
 def test_kill():
     client = TestClient()
-    request_checker = RequestChecker("kill")
+    request_checker = RequestChecker("cancel")
     client.expect_open(request_checker, b'OK')
     client.kill()
     request_checker.assert_called()
--
GitLab