Loading lib/galaxy/jobs/__init__.py +9 −0 Original line number Diff line number Diff line Loading @@ -2125,6 +2125,15 @@ class JobWrapper(HasResourceParameters): log.debug("found container runtime %s" % container_runtime) self.app.interactivetool_manager.configure_entry_points(job, container_runtime) return True container_exception_path = os.path.join(working_directory, "container_monitor_exception.txt") if os.path.exists(container_exception_path): with open(container_exception_path) as fh: exception_string = fh.read() error_message = "Monitoring interactive tool entry point failed" log.error("Monitoring interactive tool entry point for job {} failed: {}".format(self.job_id, exception_string)) self.fail(error_message) # local job runner uses return value to determine if we're done polling return True def container_monitor_command(self, container, **kwds): if not container or not self.tool.produces_entry_points: Loading lib/galaxy/tool_util/deps/docker_util.py +6 −1 Original line number Diff line number Diff line Loading @@ -223,6 +223,7 @@ def parse_port_text(port_text): >>> slurm_ports = parse_port_text("8888/tcp -> 0.0.0.0:32769") >>> slurm_ports[8888]['host'] '0.0.0.0' >>> ports = parse_port_text("5432/tcp -> :::5432") """ ports = None if port_text is not None: Loading @@ -231,7 +232,11 @@ def parse_port_text(port_text): if " -> " not in line: raise Exception("Cannot parse host and port from line [%s]" % line) tool, host = line.split(" -> ", 1) hostname, port = host.split(':') hostname, port = host.rsplit(':', 1) if hostname == '::': # Skip unspecified IPv6 address, which is also specified as 0:0:0:0 in another line. # This is brittle of course, but so is parsing the container ports like this. continue port = int(port) tool_p, tool_prot = tool.split("/") tool_p = int(tool_p) Loading lib/galaxy_ext/container_monitor/monitor.py +8 −3 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import subprocess import sys import tempfile import time import traceback # insert *this* galaxy before all others on sys.path sys.path.insert(1, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))) Loading Loading @@ -44,6 +45,7 @@ def main(): raise Exception("Monitoring container type [%s], not yet implemented." % container_type) ports_raw = None exc_traceback = "" for i in range(10): try: ports_raw = parse_ports(container_name, connection_configuration) Loading @@ -62,10 +64,13 @@ def main(): break else: raise Exception("Failed to recover ports...") except Exception as e: with open("container_monitor_exception.txt", "a") as f: f.write(str(e) + "\n\n\n") except Exception: exc_info = sys.exc_info() exc_traceback = "".join(traceback.format_exception(*exc_info)) time.sleep(i * 2) else: with open("container_monitor_exception.txt", "w") as f: f.write(exc_traceback) if __name__ == "__main__": Loading test/integration/test_interactivetools_api.py +7 −2 Original line number Diff line number Diff line Loading @@ -69,12 +69,17 @@ class BaseInteractiveToolsIntegrationTestCase(ContainerizedIntegrationTestCase): entry_points = self.entry_points_for_job(job_id) if len(entry_points) != expected_num: return None elif any([not e["active"] for e in entry_points]): elif any(not e["active"] for e in entry_points): job_json = self._get("jobs/{}?full=true".format(job_id)).json() if job_json['state'] == 'error': raise Exception("Interactive tool job {} failed: {}".format(job_id, job_json)) return None else: return entry_points return wait_on(active_entry_points, "entry points to become active") # It currently takes at least 90 seconds until we can be sure the container monitor failed. # Can be decreased when galaxy_ext/container_monitor/monitor.py changes return wait_on(active_entry_points, "entry points to become active", timeout=120) def entry_points_for_job(self, job_id): entry_points_response = self._get("entry_points?job_id=%s" % job_id) Loading Loading
lib/galaxy/jobs/__init__.py +9 −0 Original line number Diff line number Diff line Loading @@ -2125,6 +2125,15 @@ class JobWrapper(HasResourceParameters): log.debug("found container runtime %s" % container_runtime) self.app.interactivetool_manager.configure_entry_points(job, container_runtime) return True container_exception_path = os.path.join(working_directory, "container_monitor_exception.txt") if os.path.exists(container_exception_path): with open(container_exception_path) as fh: exception_string = fh.read() error_message = "Monitoring interactive tool entry point failed" log.error("Monitoring interactive tool entry point for job {} failed: {}".format(self.job_id, exception_string)) self.fail(error_message) # local job runner uses return value to determine if we're done polling return True def container_monitor_command(self, container, **kwds): if not container or not self.tool.produces_entry_points: Loading
lib/galaxy/tool_util/deps/docker_util.py +6 −1 Original line number Diff line number Diff line Loading @@ -223,6 +223,7 @@ def parse_port_text(port_text): >>> slurm_ports = parse_port_text("8888/tcp -> 0.0.0.0:32769") >>> slurm_ports[8888]['host'] '0.0.0.0' >>> ports = parse_port_text("5432/tcp -> :::5432") """ ports = None if port_text is not None: Loading @@ -231,7 +232,11 @@ def parse_port_text(port_text): if " -> " not in line: raise Exception("Cannot parse host and port from line [%s]" % line) tool, host = line.split(" -> ", 1) hostname, port = host.split(':') hostname, port = host.rsplit(':', 1) if hostname == '::': # Skip unspecified IPv6 address, which is also specified as 0:0:0:0 in another line. # This is brittle of course, but so is parsing the container ports like this. continue port = int(port) tool_p, tool_prot = tool.split("/") tool_p = int(tool_p) Loading
lib/galaxy_ext/container_monitor/monitor.py +8 −3 Original line number Diff line number Diff line Loading @@ -4,6 +4,7 @@ import subprocess import sys import tempfile import time import traceback # insert *this* galaxy before all others on sys.path sys.path.insert(1, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir))) Loading Loading @@ -44,6 +45,7 @@ def main(): raise Exception("Monitoring container type [%s], not yet implemented." % container_type) ports_raw = None exc_traceback = "" for i in range(10): try: ports_raw = parse_ports(container_name, connection_configuration) Loading @@ -62,10 +64,13 @@ def main(): break else: raise Exception("Failed to recover ports...") except Exception as e: with open("container_monitor_exception.txt", "a") as f: f.write(str(e) + "\n\n\n") except Exception: exc_info = sys.exc_info() exc_traceback = "".join(traceback.format_exception(*exc_info)) time.sleep(i * 2) else: with open("container_monitor_exception.txt", "w") as f: f.write(exc_traceback) if __name__ == "__main__": Loading
test/integration/test_interactivetools_api.py +7 −2 Original line number Diff line number Diff line Loading @@ -69,12 +69,17 @@ class BaseInteractiveToolsIntegrationTestCase(ContainerizedIntegrationTestCase): entry_points = self.entry_points_for_job(job_id) if len(entry_points) != expected_num: return None elif any([not e["active"] for e in entry_points]): elif any(not e["active"] for e in entry_points): job_json = self._get("jobs/{}?full=true".format(job_id)).json() if job_json['state'] == 'error': raise Exception("Interactive tool job {} failed: {}".format(job_id, job_json)) return None else: return entry_points return wait_on(active_entry_points, "entry points to become active") # It currently takes at least 90 seconds until we can be sure the container monitor failed. # Can be decreased when galaxy_ext/container_monitor/monitor.py changes return wait_on(active_entry_points, "entry points to become active", timeout=120) def entry_points_for_job(self, job_id): entry_points_response = self._get("entry_points?job_id=%s" % job_id) Loading