Unverified Commit ef6812db authored by mvdbeek's avatar mvdbeek
Browse files

Automatically disable parallelism on pulsar job runner

parent e29f927f
Loading
Loading
Loading
Loading
+22 −24
Original line number Diff line number Diff line
@@ -64,7 +64,7 @@ log = get_logger(__name__)
    "user_over_quota",
    "user_over_total_walltime",
)
DEFAULT_JOB_PUT_FAILURE_MESSAGE = "Unable to run job due to a misconfiguration of the Galaxy job running system.  Please contact a site administrator."
DEFAULT_JOB_RUNNER_FAILURE_MESSAGE = "Unable to run job due to a misconfiguration of the Galaxy job running system.  Please contact a site administrator."


class JobHandlerI:
@@ -707,8 +707,8 @@ class JobHandlerQueue(Monitors):
            job_state = e.job_state or JOB_WAIT
            return job_state, None
        except Exception as e:
            failure_message = getattr(e, "failure_message", DEFAULT_JOB_PUT_FAILURE_MESSAGE)
            if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
            failure_message = getattr(e, "failure_message", DEFAULT_JOB_RUNNER_FAILURE_MESSAGE)
            if failure_message == DEFAULT_JOB_RUNNER_FAILURE_MESSAGE:
                log.exception("Failed to generate job destination")
            else:
                log.debug(f"Intentionally failing job with message ({failure_message})")
@@ -1211,13 +1211,6 @@ class DefaultJobDispatcher:
        for runner in self.job_runners.values():
            runner.start()

    def __get_runner_name(self, job_wrapper):
        """Choose the name of the runner that should handle ``job_wrapper``.

        Jobs that report ``can_split()`` are routed to the ``"tasks"`` runner;
        every other job uses the runner named by its job destination.
        """
        # Guard clause: jobs that cannot be split go straight to their destination's runner.
        if not job_wrapper.can_split():
            return job_wrapper.job_destination.runner
        return "tasks"

    def url_to_destination(self, url):
        """This is used by the runner mapper (a.k.a. dynamic runner) and
        recovery methods to have runners convert URLs to destinations.
@@ -1235,18 +1228,25 @@ class DefaultJobDispatcher:
            )
            return JobDestination(runner=runner_name)

    def put(self, job_wrapper):
        runner_name = self.__get_runner_name(job_wrapper)
    def get_job_runner(self, job_wrapper, get_task_runner=False):
        """Resolve the job runner that should handle ``job_wrapper``.

        The runner is looked up in ``self.job_runners`` by the name stored in
        ``job_wrapper.job_destination.runner``.

        :param job_wrapper: wrapper providing ``job_destination.runner``,
            ``job_id``, ``can_split()`` and ``fail()``.
        :param get_task_runner: when true, a job that can be split is handed
            to the ``"tasks"`` runner instead, unless the resolved runner is
            the Pulsar job runner (no task splitting is applied there).
        :returns: the resolved runner, or ``None`` when the destination names
            a runner that is not registered. In that case the error has been
            logged and the job wrapper has already been failed, so callers
            must check for ``None`` before dispatching.
        """
        runner_name = job_wrapper.job_destination.runner
        try:
            runner = self.job_runners[runner_name]
        except KeyError:
            log.error(f"({job_wrapper.job_id}) Invalid job runner: {runner_name}")
            job_wrapper.fail(DEFAULT_JOB_RUNNER_FAILURE_MESSAGE)
            # Return explicitly: falling through would read the unbound local
            # ``runner`` below and raise UnboundLocalError after the job has
            # already been failed.
            return None
        if get_task_runner and job_wrapper.can_split() and runner.runner_name != "PulsarJobRunner":
            return self.job_runners["tasks"]
        return runner

    def put(self, job_wrapper):
        runner = self.get_job_runner(job_wrapper, get_task_runner=True)
        if isinstance(job_wrapper, TaskWrapper):
            # DBTODO Refactor
                log.debug(f"({job_wrapper.job_id}) Dispatching task {job_wrapper.task_id} to {runner_name} runner")
            log.debug(f"({job_wrapper.job_id}) Dispatching task {job_wrapper.task_id} to task runner")
        else:
                log.debug(f"({job_wrapper.job_id}) Dispatching to {runner_name} runner")
            self.job_runners[runner_name].put(job_wrapper)
        except KeyError:
            log.error(f"put(): ({job_wrapper.job_id}) Invalid job runner: {runner_name}")
            job_wrapper.fail(DEFAULT_JOB_PUT_FAILURE_MESSAGE)
            log.debug(f"({job_wrapper.job_id}) Dispatching to {job_wrapper.job_destination.runner} runner")
        runner.put(job_wrapper)

    def stop(self, job, job_wrapper):
        """
@@ -1277,11 +1277,9 @@ class DefaultJobDispatcher:
    def recover(self, job, job_wrapper):
        runner_name = (job.job_runner_name.split(":", 1))[0]
        log.debug("recovering job %d in %s runner" % (job.id, runner_name))
        runner = self.get_job_runner(job_wrapper)
        try:
            self.job_runners[runner_name].recover(job, job_wrapper)
        except KeyError:
            log.error(f"recover(): ({job_wrapper.job_id}) Invalid job runner: {runner_name}")
            job_wrapper.fail(DEFAULT_JOB_PUT_FAILURE_MESSAGE)
            runner.recover(job, job_wrapper)
        except ObjectNotFound:
            msg = "Could not recover job working directory after Galaxy restart"
            log.exception(f"recover(): ({job_wrapper.job_id}) {msg}")
+2 −2
Original line number Diff line number Diff line
@@ -11,7 +11,7 @@ import time

from galaxy import model
from galaxy.jobs import JobDestination
from galaxy.jobs.handler import DEFAULT_JOB_PUT_FAILURE_MESSAGE
from galaxy.jobs.handler import DEFAULT_JOB_RUNNER_FAILURE_MESSAGE
from galaxy.jobs.runners import (
    AsynchronousJobRunner,
    AsynchronousJobState,
@@ -201,7 +201,7 @@ class DRMAAJobRunner(AsynchronousJobRunner):
            else:
                log.error(f"({galaxy_id_tag}) All attempts to submit job failed")
                if not fail_msg:
                    fail_msg = DEFAULT_JOB_PUT_FAILURE_MESSAGE
                    fail_msg = DEFAULT_JOB_RUNNER_FAILURE_MESSAGE
                job_wrapper.fail(fail_msg)
                return
        else: