Commit eeb4d259 authored by nuwang's avatar nuwang
Browse files

Don't mark job as failed if unknown exit code

parent 8276216f
Loading
Loading
Loading
Loading
+39 −7
Original line number Diff line number Diff line
@@ -794,9 +794,14 @@ class KubernetesJobRunner(AsynchronousJobRunner):
                log.debug(
                    f"Job id: {job_state.job_id} failed and it is not a deletion case. Current state: {job_state.job_wrapper.get_state()}"
                )
                self._handle_job_failure(job, job_state)
                if self._handle_job_failure(job, job_state):
                    # changes for resubmission (removed self.mark_as_failed from handle_job_failure)
                    self.work_queue.put((self.mark_as_failed, job_state))
                else:
                    # Job failure was not due to a k8s issue or something that k8s can handle, so it's a tool error.
                    job_state.running = False
                    self.mark_as_finished(job_state)
                    return None

                return None

@@ -843,6 +848,7 @@ class KubernetesJobRunner(AsynchronousJobRunner):

    def _handle_job_failure(self, job, job_state):
        # Figure out why job has failed
        mark_failed = True
        with open(job_state.error_file, "a") as error_file:
            log.debug("Trying with error file in _handle_job_failure")
            if self.__job_failed_due_to_low_memory(job_state):
@@ -855,11 +861,18 @@ class KubernetesJobRunner(AsynchronousJobRunner):
                error_file.write("DeadlineExceeded")
                job_state.fail_message = "Job was active longer than specified deadline"
                job_state.runner_state = JobState.runner_states.WALLTIME_REACHED
            elif self.__job_failed_due_to_unknown_exit_code(job_state):
                msg = f"Job: {job_state.job_id} failed due to an unknown exit code from the tool."
                log.debug(msg)
                job_state.fail_message = msg
                job_state.runner_state = JobState.runner_states.TOOL_DETECT_ERROR
                mark_failed = False
            else:
                log.debug(f"Unknown error detected in job: {job_state.job_id}")
                error_file.write("Exceeded max number of job retries allowed for job\n")
                msg = f"An unknown error occurred in this job and the maximum number of retries have been exceeded for job: {job_state.job_id}."
                log.debug(msg)
                error_file.write(msg)
                job_state.fail_message = (
                    "More job retries failed than allowed. See standard output within the info section for details."
                    "An unknown error occurered with this job. See standard output within the info section for details."
                )
        # changes for resubmission
        # job_state.running = False
@@ -870,7 +883,7 @@ class KubernetesJobRunner(AsynchronousJobRunner):
            self.__cleanup_k8s_job(job)
        except Exception:
            log.exception("Could not clean up k8s batch job. Ignoring...")
        return None
        return mark_failed

    def __cleanup_k8s_job(self, job):
        k8s_cleanup_job = self.runner_params["k8s_cleanup_job"]
@@ -930,6 +943,25 @@ class KubernetesJobRunner(AsynchronousJobRunner):
        pod = Pod(self._pykube_api, pods.response["items"][0])
        return is_pod_unschedulable(self._pykube_api, pod, self.runner_params["k8s_namespace"])

    def __job_failed_due_to_unknown_exit_code(self, job_state):
        """
        Check whether the job's pod terminated with a non-zero exit code.

        This is meant to catch failures that are not one of the specific cases handled
        separately (e.g. OOM): the container ran and the tool itself exited with an
        error. In that situation the tool failed, but the job should be considered to
        have finished from the runner's point of view.

        :param job_state: state object of the job; its ``job_id`` is used to look up the pod.
        :returns: ``True`` if the first container of the first matching pod is in the
                  ``terminated`` state with a non-zero ``exitCode``; ``False`` otherwise,
                  including when no pod or no container status is available.
        """
        pods = find_pod_object_by_name(self._pykube_api, job_state.job_id, self.runner_params["k8s_namespace"])
        if not pods.response["items"]:
            return False

        pod = pods.response["items"][0]
        if not pod:
            return False

        # A pod that never started may have no containerStatuses at all; treat that as
        # "not this failure mode" instead of raising KeyError/IndexError.
        container_statuses = (pod.get("status") or {}).get("containerStatuses")
        if not container_statuses:
            return False

        # Kubernetes nests the exit code inside the ``terminated`` state object
        # (state.terminated.exitCode), not directly on ``state``.
        terminated = (container_statuses[0].get("state") or {}).get("terminated")
        if not terminated:
            return False

        # exitCode 0 (or missing) means the container did not fail.
        return bool(terminated.get("exitCode"))

    def __cleanup_k8s_guest_ports(self, job_wrapper, k8s_job):
        k8s_job_prefix = self.__produce_k8s_job_prefix()
        k8s_job_name = f"{k8s_job_prefix}-{self.__force_label_conformity(job_wrapper.get_id_tag())}"
+1 −1

File changed.

Contains only whitespace changes.