Commit 69021ecf authored by Yakubov, Sergey's avatar Yakubov, Sergey
Browse files

Merge branch '137-add-detailed-job-status-field' into 'dev'

Add detailed job status field

Closes #137

See merge request !119
parents 408136de 83d2e2ca
Loading
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -9,8 +9,8 @@ variables:
  CONTAINER_GALAXY_URL: "${NDIP_DOCKER_REPOSITORY}/${CI_PROJECT_PATH}"
  CONTAINER_GALAXY_BASE_URL: "${CONTAINER_GALAXY_URL}/base"
  CONTAINER_GALAXY_COMMIT_URL: "${CONTAINER_GALAXY_URL}/commit"
  GALAXY_VERSION_PYTHON: 24.2.dev1+ornl
  GALAXY_VERSION_DOCKER: 24.2.dev1.ornl
  GALAXY_VERSION_PYTHON: 24.2.dev2+ornl
  GALAXY_VERSION_DOCKER: 24.2.dev2.ornl

# This import is for the func_rse_docker_* functions
before_script:
+1 −1
Original line number Diff line number Diff line
@@ -290,7 +290,7 @@ class AuthnzManager:
            raise exceptions.ItemAccessibilityException(msg)

    def refresh_expiring_oidc_tokens_for_provider(self, trans, auth):
        with open("/dev/null", "w") as lock:
        with open("/tmp/galaxy_refresh_lock", "w") as lock:
            try:
                fcntl.flock(lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
                success, message, backend = self._get_authnz_backend(auth.provider)
+29 −11
Original line number Diff line number Diff line
@@ -9,7 +9,6 @@ import logging
import os
import re
import subprocess
from pathlib import Path
from time import sleep
from typing import (
    Any,
@@ -35,10 +34,10 @@ from pulsar.client import (
    submit_job as pulsar_submit_job,
    url_to_destination_params,
)

# TODO: Perform pulsar release with this included in the client package
from pulsar.client.staging import DEFAULT_DYNAMIC_COLLECTION_PATTERN
from sqlalchemy import select
from sqlalchemy.orm.attributes import flag_modified

from galaxy import model
from galaxy.job_execution.compute_environment import (
@@ -53,6 +52,7 @@ from galaxy.jobs.runners import (
    JobState,
)
from galaxy.model.base import check_database_connection
from galaxy.model.base import transaction
from galaxy.tool_util.deps import dependencies
from galaxy.util import (
    galaxy_directory,
@@ -193,7 +193,6 @@ PULSAR_PARAM_SPECS = dict(
    ),
)


PARAMETER_SPECIFICATION_REQUIRED = object()
PARAMETER_SPECIFICATION_IGNORED = object()

@@ -322,6 +321,21 @@ class PulsarJobRunner(AsynchronousJobRunner):
        job_state = self._update_job_state_for_status(job_state, status)
        return job_state

    def _update_job_for_status_details(self, job, job_wrapper, status_details):
        if not job.job_messages:
            job.job_messages = [status_details]
        else:
            for message in job.job_messages:
                if "status_details" in message:
                    message.update(status_details)
                    break
            else:
                job.job_messages.append(status_details)
        flag_modified(job, "job_messages")
        job_wrapper.sa_session.add(job)
        with transaction(self.sa_session):
            self.sa_session.commit()

    def _update_job_state_for_status(self, job_state, pulsar_status, full_status=None):
        log.debug("(%s) Received status update: %s", job_state.job_id, pulsar_status)
        if pulsar_status in ["complete", "cancelled"]:
@@ -1028,7 +1042,11 @@ class PulsarJobRunner(AsynchronousJobRunner):
                galaxy_job_id = remote_job_id
            job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id(galaxy_job_id)
            job_state = self._job_state(job, job_wrapper)

            self._update_job_state_for_status(job_state, full_status["status"], full_status=full_status)
            if "status_details" in full_status:
                self._update_job_for_status_details(job, job_wrapper, full_status["status_details"])

        except Exception:
            log.exception(f"Failed to update Pulsar job status for job_id ({galaxy_job_id}/{remote_job_id})")
            raise