Commit 517c5fbc authored by Yakubov, Sergey's avatar Yakubov, Sergey
Browse files

add detailed job status for pulsar jobs

parent ce66a09c
Loading
Loading
Loading
Loading
+29 −11
Original line number Diff line number Diff line
@@ -9,7 +9,6 @@ import logging
import os
import re
import subprocess
from pathlib import Path
from time import sleep
from typing import (
    Any,
@@ -35,10 +34,10 @@ from pulsar.client import (
    submit_job as pulsar_submit_job,
    url_to_destination_params,
)

# TODO: Perform pulsar release with this included in the client package
from pulsar.client.staging import DEFAULT_DYNAMIC_COLLECTION_PATTERN
from sqlalchemy import select
from sqlalchemy.orm.attributes import flag_modified

from galaxy import model
from galaxy.job_execution.compute_environment import (
@@ -53,6 +52,7 @@ from galaxy.jobs.runners import (
    JobState,
)
from galaxy.model.base import check_database_connection
from galaxy.model.base import transaction
from galaxy.tool_util.deps import dependencies
from galaxy.util import (
    galaxy_directory,
@@ -193,7 +193,6 @@ PULSAR_PARAM_SPECS = dict(
    ),
)


PARAMETER_SPECIFICATION_REQUIRED = object()
PARAMETER_SPECIFICATION_IGNORED = object()

@@ -322,6 +321,21 @@ class PulsarJobRunner(AsynchronousJobRunner):
        job_state = self._update_job_state_for_status(job_state, status)
        return job_state

    def _update_job_for_status_details(self, job, job_wrapper, status_details):
        if not job.job_messages:
            job.job_messages = [status_details]
        else:
            for message in job.job_messages:
                if "status_details" in message:
                    message.update(status_details)
                    break
            else:
                job.job_messages.append(status_details)
        flag_modified(job, "job_messages")
        job_wrapper.sa_session.add(job)
        with transaction(self.sa_session):
            self.sa_session.commit()

    def _update_job_state_for_status(self, job_state, pulsar_status, full_status=None):
        log.debug("(%s) Received status update: %s", job_state.job_id, pulsar_status)
        if pulsar_status in ["complete", "cancelled"]:
@@ -1028,7 +1042,11 @@ class PulsarJobRunner(AsynchronousJobRunner):
                galaxy_job_id = remote_job_id
            job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id(galaxy_job_id)
            job_state = self._job_state(job, job_wrapper)

            self._update_job_state_for_status(job_state, full_status["status"], full_status=full_status)
            if "status_details" in full_status:
                self._update_job_for_status_details(job, job_wrapper, full_status["status_details"])

        except Exception:
            log.exception(f"Failed to update Pulsar job status for job_id ({galaxy_job_id}/{remote_job_id})")
            raise