Unverified Commit 387660b0 authored by Marius van den Beek's avatar Marius van den Beek Committed by GitHub
Browse files

Merge pull request #15919 from mvdbeek/pulsar_fixes_22_05

[22.05] Various fixes for running pulsar jobs
parents 356d33b7 2f9b7212
Loading
Loading
Loading
Loading
+22 −24
Original line number Diff line number Diff line
@@ -64,7 +64,7 @@ log = get_logger(__name__)
    "user_over_quota",
    "user_over_total_walltime",
)
DEFAULT_JOB_PUT_FAILURE_MESSAGE = "Unable to run job due to a misconfiguration of the Galaxy job running system.  Please contact a site administrator."
DEFAULT_JOB_RUNNER_FAILURE_MESSAGE = "Unable to run job due to a misconfiguration of the Galaxy job running system.  Please contact a site administrator."


class JobHandlerI:
@@ -707,8 +707,8 @@ class JobHandlerQueue(Monitors):
            job_state = e.job_state or JOB_WAIT
            return job_state, None
        except Exception as e:
            failure_message = getattr(e, "failure_message", DEFAULT_JOB_PUT_FAILURE_MESSAGE)
            if failure_message == DEFAULT_JOB_PUT_FAILURE_MESSAGE:
            failure_message = getattr(e, "failure_message", DEFAULT_JOB_RUNNER_FAILURE_MESSAGE)
            if failure_message == DEFAULT_JOB_RUNNER_FAILURE_MESSAGE:
                log.exception("Failed to generate job destination")
            else:
                log.debug(f"Intentionally failing job with message ({failure_message})")
@@ -1211,13 +1211,6 @@ class DefaultJobDispatcher:
        for runner in self.job_runners.values():
            runner.start()

    def __get_runner_name(self, job_wrapper):
        """Return the name of the runner that should handle *job_wrapper*.

        Jobs that can be split into tasks are routed to the "tasks" runner;
        all other jobs use the runner configured on their job destination.
        """
        return "tasks" if job_wrapper.can_split() else job_wrapper.job_destination.runner

    def url_to_destination(self, url):
        """This is used by the runner mapper (a.k.a. dynamic runner) and
        recovery methods to have runners convert URLs to destinations.
@@ -1235,18 +1228,25 @@ class DefaultJobDispatcher:
            )
            return JobDestination(runner=runner_name)

    def get_job_runner(self, job_wrapper, get_task_runner=False):
        """Resolve the runner plugin responsible for *job_wrapper*.

        :param job_wrapper: wrapper whose ``job_destination.runner`` names the
            configured runner plugin.
        :param get_task_runner: when True and the job can be split into tasks,
            return the "tasks" runner instead — unless the resolved runner is
            the Pulsar runner, which handles its own jobs unsplit.
        :returns: the runner plugin, or ``None`` when the destination names an
            unknown runner (in which case the job has already been failed).
        """
        runner_name = job_wrapper.job_destination.runner
        try:
            runner = self.job_runners[runner_name]
        except KeyError:
            log.error(f"({job_wrapper.job_id}) Invalid job runner: {runner_name}")
            job_wrapper.fail(DEFAULT_JOB_RUNNER_FAILURE_MESSAGE)
            # fail() does not raise; without this return the code below would
            # hit an UnboundLocalError on the never-assigned `runner`.
            return None
        if get_task_runner and job_wrapper.can_split() and runner.runner_name != "PulsarJobRunner":
            return self.job_runners["tasks"]
        return runner

    def put(self, job_wrapper):
        """Dispatch *job_wrapper* to the appropriate runner plugin's queue.

        Splittable jobs are redirected to the task runner by
        ``get_job_runner(get_task_runner=True)``; everything else goes to the
        runner configured on the job's destination.
        """
        runner = self.get_job_runner(job_wrapper, get_task_runner=True)
        if isinstance(job_wrapper, TaskWrapper):
            # DBTODO Refactor
            log.debug(f"({job_wrapper.job_id}) Dispatching task {job_wrapper.task_id} to task runner")
        else:
            log.debug(f"({job_wrapper.job_id}) Dispatching to {job_wrapper.job_destination.runner} runner")
        runner.put(job_wrapper)

    def stop(self, job, job_wrapper):
        """
@@ -1277,11 +1277,9 @@ class DefaultJobDispatcher:
    def recover(self, job, job_wrapper):
        runner_name = (job.job_runner_name.split(":", 1))[0]
        log.debug("recovering job %d in %s runner" % (job.id, runner_name))
        runner = self.get_job_runner(job_wrapper)
        try:
            self.job_runners[runner_name].recover(job, job_wrapper)
        except KeyError:
            log.error(f"recover(): ({job_wrapper.job_id}) Invalid job runner: {runner_name}")
            job_wrapper.fail(DEFAULT_JOB_PUT_FAILURE_MESSAGE)
            runner.recover(job, job_wrapper)
        except ObjectNotFound:
            msg = "Could not recover job working directory after Galaxy restart"
            log.exception(f"recover(): ({job_wrapper.job_id}) {msg}")
+2 −2
Original line number Diff line number Diff line
@@ -11,7 +11,7 @@ import time

from galaxy import model
from galaxy.jobs import JobDestination
from galaxy.jobs.handler import DEFAULT_JOB_PUT_FAILURE_MESSAGE
from galaxy.jobs.handler import DEFAULT_JOB_RUNNER_FAILURE_MESSAGE
from galaxy.jobs.runners import (
    AsynchronousJobRunner,
    AsynchronousJobState,
@@ -201,7 +201,7 @@ class DRMAAJobRunner(AsynchronousJobRunner):
            else:
                log.error(f"({galaxy_id_tag}) All attempts to submit job failed")
                if not fail_msg:
                    fail_msg = DEFAULT_JOB_PUT_FAILURE_MESSAGE
                    fail_msg = DEFAULT_JOB_RUNNER_FAILURE_MESSAGE
                job_wrapper.fail(fail_msg)
                return
        else:
+5 −0
Original line number Diff line number Diff line
@@ -646,6 +646,11 @@ class PulsarJobRunner(AsynchronousJobRunner):
            log.exception("failure finishing job %d", job_wrapper.job_id)
            return
        if not PulsarJobRunner.__remote_metadata(client):
            # we need an actual exit code file in the job working directory to detect job errors in the metadata script
            with open(
                os.path.join(job_wrapper.working_directory, f"galaxy_{job_wrapper.job_id}.ec"), "w"
            ) as exit_code_file:
                exit_code_file.write(str(exit_code))
            self._handle_metadata_externally(job_wrapper, resolve_requirements=True)
        # Finish the job
        try:
+25 −13
Original line number Diff line number Diff line
@@ -283,7 +283,7 @@ def set_metadata_portable(
        import_model_store = None

    tool_script_file = tool_job_working_directory / "tool_script.sh"
    job = None
    job: Optional[Job] = None
    if import_model_store and export_store:
        job = next(iter(import_model_store.sa_session.objects[Job].values()))

@@ -328,7 +328,7 @@ def set_metadata_portable(
            final_job_state = Job.states.ERROR
            job_messages.append(str(e))
        if job:
            job.job_messages = job_messages
            job.set_streams(tool_stdout=tool_stdout, tool_stderr=tool_stderr, job_messages=job_messages)
            job.state = final_job_state
            if os.path.exists(tool_script_file):
                with open(tool_script_file) as command_fh:
@@ -407,7 +407,19 @@ def set_metadata_portable(
                    files_path = os.path.abspath(
                        os.path.join(tool_job_working_directory, "working", extra_files_dir_name)
                    )
                    if os.path.exists(files_path):
                        dataset.dataset.external_extra_files_path = files_path
                    else:
                        # could be pulsar, stores extra files in outputs directory
                        pulsar_extra_files_path = os.path.join(
                            tool_job_working_directory, "outputs", extra_files_dir_name
                        )
                        if os.path.exists(pulsar_extra_files_path):
                            dataset.dataset.external_extra_files_path = pulsar_extra_files_path
                        elif dataset_filename_override and not object_store:
                            # pulsar, no remote metadata and no extended metadata
                            dataset.dataset.external_extra_files_path = os.path.join(os.path.dirname(dataset_filename_override), extra_files_dir_name)

            file_dict = tool_provided_metadata.get_dataset_meta(output_name, dataset.dataset.id, dataset.dataset.uuid)
            if "ext" in file_dict:
                dataset.extension = file_dict["ext"]
+1 −0
Original line number Diff line number Diff line
@@ -34,5 +34,6 @@ test_tools = integration_util.integration_tool_runner(
        "strict_shell",
        "tool_provided_metadata_9",
        "simple_constructs_y",
        "composite_output",
    ]
)