Commit 75f8f1c7 authored by Yakubov, Sergey's avatar Yakubov, Sergey
Browse files

Merge branch 'dev' into 'main'

Sync with dev

See merge request !120
parents 8a47610d 69021ecf
Loading
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -9,8 +9,8 @@ variables:
  CONTAINER_GALAXY_URL: "${NDIP_DOCKER_REPOSITORY}/${CI_PROJECT_PATH}"
  CONTAINER_GALAXY_BASE_URL: "${CONTAINER_GALAXY_URL}/base"
  CONTAINER_GALAXY_COMMIT_URL: "${CONTAINER_GALAXY_URL}/commit"
  GALAXY_VERSION_PYTHON: 24.2.dev1+ornl
  GALAXY_VERSION_DOCKER: 24.2.dev1.ornl
  GALAXY_VERSION_PYTHON: 24.2.dev2+ornl
  GALAXY_VERSION_DOCKER: 24.2.dev2.ornl

# This import is for the func_rse_docker_* functions
before_script:
+1 −1
Original line number Diff line number Diff line
@@ -290,7 +290,7 @@ class AuthnzManager:
            raise exceptions.ItemAccessibilityException(msg)

    def refresh_expiring_oidc_tokens_for_provider(self, trans, auth):
        with open("/dev/null", "w") as lock:
        with open("/tmp/galaxy_refresh_lock", "w") as lock:
            try:
                fcntl.flock(lock, fcntl.LOCK_EX | fcntl.LOCK_NB)
                success, message, backend = self._get_authnz_backend(auth.provider)
+29 −11
Original line number Diff line number Diff line
@@ -9,7 +9,6 @@ import logging
import os
import re
import subprocess
from pathlib import Path
from time import sleep
from typing import (
    Any,
@@ -35,10 +34,10 @@ from pulsar.client import (
    submit_job as pulsar_submit_job,
    url_to_destination_params,
)

# TODO: Perform pulsar release with this included in the client package
from pulsar.client.staging import DEFAULT_DYNAMIC_COLLECTION_PATTERN
from sqlalchemy import select
from sqlalchemy.orm.attributes import flag_modified

from galaxy import model
from galaxy.job_execution.compute_environment import (
@@ -53,6 +52,7 @@ from galaxy.jobs.runners import (
    JobState,
)
from galaxy.model.base import check_database_connection
from galaxy.model.base import transaction
from galaxy.tool_util.deps import dependencies
from galaxy.util import (
    galaxy_directory,
@@ -193,7 +193,6 @@ PULSAR_PARAM_SPECS = dict(
    ),
)


PARAMETER_SPECIFICATION_REQUIRED = object()
PARAMETER_SPECIFICATION_IGNORED = object()

@@ -322,6 +321,21 @@ class PulsarJobRunner(AsynchronousJobRunner):
        job_state = self._update_job_state_for_status(job_state, status)
        return job_state

    def _update_job_for_status_details(self, job, job_wrapper, status_details):
        if not job.job_messages:
            job.job_messages = [status_details]
        else:
            for message in job.job_messages:
                if "status_details" in message:
                    message.update(status_details)
                    break
            else:
                job.job_messages.append(status_details)
        flag_modified(job, "job_messages")
        job_wrapper.sa_session.add(job)
        with transaction(self.sa_session):
            self.sa_session.commit()

    def _update_job_state_for_status(self, job_state, pulsar_status, full_status=None):
        log.debug("(%s) Received status update: %s", job_state.job_id, pulsar_status)
        if pulsar_status in ["complete", "cancelled"]:
@@ -1028,7 +1042,11 @@ class PulsarJobRunner(AsynchronousJobRunner):
                galaxy_job_id = remote_job_id
            job, job_wrapper = self.app.job_manager.job_handler.job_queue.job_pair_for_id(galaxy_job_id)
            job_state = self._job_state(job, job_wrapper)

            self._update_job_state_for_status(job_state, full_status["status"], full_status=full_status)
            if "status_details" in full_status:
                self._update_job_for_status_details(job, job_wrapper, full_status["status_details"])

        except Exception:
            log.exception(f"Failed to update Pulsar job status for job_id ({galaxy_job_id}/{remote_job_id})")
            raise