Unverified Commit 7ff6a0a2 authored by Dannon's avatar Dannon Committed by GitHub
Browse files

Merge pull request #11961 from mvdbeek/release_21.01

Merge release_20.09 forward and move string formatting to f-strings
parents 2ea1b94e 2552ab47
Loading
Loading
Loading
Loading
+9 −0
Original line number Diff line number Diff line
@@ -2135,6 +2135,15 @@ class JobWrapper(HasResourceParameters):
            log.debug("found container runtime %s" % container_runtime)
            self.app.interactivetool_manager.configure_entry_points(job, container_runtime)
            return True
        container_exception_path = os.path.join(working_directory, "container_monitor_exception.txt")
        if os.path.exists(container_exception_path):
            with open(container_exception_path) as fh:
                exception_string = fh.read()
            error_message = "Monitoring interactive tool entry point failed"
            log.error(f"Monitoring interactive tool entry point for job {self.job_id} failed: {exception_string}")
            self.fail(error_message)
            # local job runner uses return value to determine if we're done polling
            return True

    def container_monitor_command(self, container, **kwds):
        if not container or not self.tool.produces_entry_points:
+9 −7
Original line number Diff line number Diff line
@@ -416,13 +416,15 @@ class JobHandlerQueue(Monitors):
                                   JOB_USER_OVER_TOTAL_WALLTIME):
                    if job_state == JOB_USER_OVER_QUOTA:
                        log.info("(%d) User (%s) is over quota: job paused" % (job.id, job.user_id))
                        what = "your disk quota"
                    else:
                        log.info("(%d) User (%s) is over total walltime limit: job paused" % (job.id, job.user_id))
                        what = "your total job runtime"

                    job.set_state(model.Job.states.PAUSED)
                    for dataset_assoc in job.output_datasets + job.output_library_datasets:
                        dataset_assoc.dataset.dataset.state = model.Dataset.states.PAUSED
                        dataset_assoc.dataset.info = "Execution of this dataset's job is paused because you were over your disk quota at the time it was ready to run"
                        dataset_assoc.dataset.info = f"Execution of this dataset's job is paused because you were over {what} at the time it was ready to run"
                        self.sa_session.add(dataset_assoc.dataset.dataset)
                    self.sa_session.add(job)
                elif job_state == JOB_ERROR:
@@ -584,13 +586,13 @@ class JobHandlerQueue(Monitors):
        if (state == JOB_READY and
                "delta" in self.app.job_config.limits.total_walltime):
            jobs_to_check = self.sa_session.query(model.Job).filter(
                model.Job.user_id == job.user.id,
                model.Job.update_time >= datetime.datetime.now() -
                datetime.timedelta(
                    self.app.job_config.limits.total_walltime["window"]
                ),
                model.Job.update_time >= datetime.datetime.now() - datetime.timedelta(self.app.job_config.limits.total_walltime["window"]),
                model.Job.state == 'ok'
            ).all()
            )
            if job.user_id:
                jobs_to_check = jobs_to_check.filter(model.Job.user_id == job.user_id)
            else:
                jobs_to_check = jobs_to_check.filter(model.Job.session_id == job.session_id)
            time_spent = datetime.timedelta(0)
            for job in jobs_to_check:
                # History is job.state_history
+6 −1
Original line number Diff line number Diff line
@@ -227,6 +227,7 @@ def parse_port_text(port_text):
    >>> slurm_ports = parse_port_text("8888/tcp -> 0.0.0.0:32769")
    >>> slurm_ports[8888]['host']
    '0.0.0.0'
    >>> ports = parse_port_text("5432/tcp -> :::5432")
    """
    ports = None
    if port_text is not None:
@@ -235,7 +236,11 @@ def parse_port_text(port_text):
            if " -> " not in line:
                raise Exception("Cannot parse host and port from line [%s]" % line)
            tool, host = line.split(" -> ", 1)
            hostname, port = host.split(':')
            hostname, port = host.rsplit(':', 1)
            if hostname == '::':
                # Skip the unspecified IPv6 address (`::`), which is also specified as 0.0.0.0 in another line.
                # This is brittle of course, but so is parsing the container ports like this.
                continue
            port = int(port)
            tool_p, tool_prot = tool.split("/")
            tool_p = int(tool_p)
+8 −3
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ import subprocess
import sys
import tempfile
import time
import traceback

# insert *this* galaxy before all others on sys.path
sys.path.insert(1, os.path.abspath(os.path.join(os.path.dirname(__file__), os.pardir, os.pardir)))
@@ -44,6 +45,7 @@ def main():
        raise Exception("Monitoring container type [%s], not yet implemented." % container_type)

    ports_raw = None
    exc_traceback = ""
    for i in range(10):
        try:
            ports_raw = parse_ports(container_name, connection_configuration)
@@ -62,10 +64,13 @@ def main():
                break
            else:
                raise Exception("Failed to recover ports...")
        except Exception as e:
            with open("container_monitor_exception.txt", "a") as f:
                f.write(str(e) + "\n\n\n")
        except Exception:
            exc_info = sys.exc_info()
            exc_traceback = "".join(traceback.format_exception(*exc_info))
        time.sleep(i * 2)
    else:
        with open("container_monitor_exception.txt", "w") as f:
            f.write(exc_traceback)


if __name__ == "__main__":
+7 −2
Original line number Diff line number Diff line
@@ -68,12 +68,17 @@ class BaseInteractiveToolsIntegrationTestCase(ContainerizedIntegrationTestCase):
            entry_points = self.entry_points_for_job(job_id)
            if len(entry_points) != expected_num:
                return None
            elif any([not e["active"] for e in entry_points]):
            elif any(not e["active"] for e in entry_points):
                job_json = self._get(f"jobs/{job_id}?full=true").json()
                if job_json['state'] == 'error':
                    raise Exception(f"Interactive tool job {job_id} failed: {job_json}")
                return None
            else:
                return entry_points

        return wait_on(active_entry_points, "entry points to become active")
        # It currently takes at least 90 seconds until we can be sure the container monitor failed.
        # Can be decreased when galaxy_ext/container_monitor/monitor.py changes
        return wait_on(active_entry_points, "entry points to become active", timeout=120)

    def entry_points_for_job(self, job_id):
        entry_points_response = self._get("entry_points?job_id=%s" % job_id)