Commit 7ebb964a authored by John Chilton's avatar John Chilton
Browse files

Eliminate copy-paste from Kubernetes runner.

parent fcf16544
Loading
Loading
Loading
Loading
+16 −12
Original line number Diff line number Diff line
@@ -907,17 +907,7 @@ class AsynchronousJobRunner(BaseJobRunner, Monitors):
    def check_watched_item(self, job_state: AsynchronousJobState) -> Union[AsynchronousJobState, None]:
        """
        Not implemented in this class: calling it here always raises
        ``NotImplementedError``.

        NOTE(review): presumably concrete runners override this to inspect one
        watched job and return its state (or ``None``) - confirm against the
        subclasses and the caller, neither of which is visible here.
        """
        raise NotImplementedError()

    def finish_job(self, job_state: AsynchronousJobState):
        """
        Get the output/error for a finished job, pass to `job_wrapper.finish`
        and cleanup all the job's temporary files.
        """
        galaxy_id_tag = job_state.job_wrapper.get_id_tag()
        external_job_id = job_state.job_id

        # To ensure that files below are readable, ownership must be reclaimed first
        job_state.job_wrapper.reclaim_ownership()

    def _collect_job_output(self, job_id: int, external_job_id: Optional[str], job_state: JobState):
        # wait for the files to appear
        which_try = 0
        collect_output_success = True
@@ -931,11 +921,25 @@ class AsynchronousJobRunner(BaseJobRunner, Monitors):
                if which_try == self.app.config.retry_job_output_collection:
                    stdout = ""
                    stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
                    log.error("(%s/%s) %s: %s", galaxy_id_tag, external_job_id, stderr, unicodify(e))
                    log.error("(%s/%s) %s: %s", job_id, external_job_id, stderr, unicodify(e))
                    collect_output_success = False
                else:
                    time.sleep(1)
                which_try += 1
        return collect_output_success, stdout, stderr

    def finish_job(self, job_state: AsynchronousJobState):
        """
        Get the output/error for a finished job, pass to `job_wrapper.finish`
        and cleanup all the job's temporary files.
        """
        galaxy_id_tag = job_state.job_wrapper.get_id_tag()
        external_job_id = job_state.job_id

        # To ensure that files below are readable, ownership must be reclaimed first
        job_state.job_wrapper.reclaim_ownership()

        collect_output_success, stdout, stderr = self._collect_job_output(galaxy_id_tag, external_job_id, job_state)

        if not collect_output_success:
            job_state.fail_message = stderr
+1 −18
Original line number Diff line number Diff line
@@ -7,7 +7,6 @@ import logging
import math
import os
import re
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Union
@@ -50,7 +49,6 @@ from galaxy.jobs.runners.util.pykube_util import (
    Service,
    service_object_dict,
)
from galaxy.util import unicodify
from galaxy.util.bytesize import ByteSize

log = logging.getLogger(__name__)
@@ -1095,22 +1093,7 @@ class KubernetesJobRunner(AsynchronousJobRunner):
        # To ensure that files below are readable, ownership must be reclaimed first
        job_state.job_wrapper.reclaim_ownership()

        # wait for the files to appear
        which_try = 0
        while which_try < self.app.config.retry_job_output_collection + 1:
            try:
                with open(job_state.output_file, "rb") as stdout_file, open(job_state.error_file, "rb") as stderr_file:
                    job_stdout = self._job_io_for_db(stdout_file)
                    job_stderr = self._job_io_for_db(stderr_file)
                break
            except Exception as e:
                if which_try == self.app.config.retry_job_output_collection:
                    job_stdout = ""
                    job_stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
                    log.error(f"{gxy_job.id}/{gxy_job.job_runner_external_id} {job_stderr}: {unicodify(e)}")
                else:
                    time.sleep(1)
                which_try += 1
        _, job_stdout, job_stderr = self._collect_job_output(gxy_job.id, gxy_job.job_runner_external_id, job_state)

        # get stderr and stdout to database
        outputs_directory = os.path.join(job_state.job_wrapper.working_directory, "outputs")