Unverified Commit 66852487 authored by John Chilton's avatar John Chilton Committed by GitHub
Browse files

Merge pull request #20862 from jmchilton/pulsar_15_10

Various Container Execution Enhancements (including GCP Batch support)
parents 286fd6e6 4e211235
Loading
Loading
Loading
Loading
+70 −0
Original line number Diff line number Diff line
@@ -338,6 +338,27 @@ runners:
    #app: {}
    #pulsar_config: path/to/pulsar/app.yml

  # Using Pulsar and co-execution to run jobs against a TES service.
  # - https://www.ga4gh.org/product/task-execution-service-tes/
  # - https://pulsar.readthedocs.io/en/latest/containers.html#ga4gh-tes
  pulsar_tes:
    load: galaxy.jobs.runners.pulsar:PulsarTesJobRunner
    # RabbitMQ URL from Galaxy server (include credentials).
    amqp_url: <amqp_url>
    # If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
    #galaxy_url: <galaxy_url>  

  # Using Pulsar and co-execution to run jobs against Google Cloud Platform's Batch service.
  # - https://cloud.google.com/batch/docs/get-started
  # - https://pulsar.readthedocs.io/en/latest/containers.html#google-cloud-platform-batch
  pulsar_gcp:
    load: galaxy.jobs.runners.pulsar:PulsarGcpBatchJobRunner
    # RabbitMQ URL from Galaxy server (include credentials).
    amqp_url: <amqp_url>
    # If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
    #galaxy_url: <galaxy_url>


# Job handler configuration - for a full discussion of job handlers, see the documentation at:
#     https://docs.galaxyproject.org/en/latest/admin/scaling.html
handling:
@@ -916,6 +937,55 @@ execution:
      # Path to Kubernetes configuration file (see Kubernetes runner description).
      #k8s_config_path: /path/to/kubeconfig

    pulsar_tes_environment:
      runner: pulsar_tes
      # A tes_url is required
      tes_url: "<tes_url>"
      #basic_auth:
      #  username: <username>
      #  password: <password>
      #cpu_cores: 1
      # Define if the task is allowed to run on preemptible compute instances,
      # for example, AWS Spot. This option may have no effect when utilized
      # on some backends that don't have the concept of preemptible jobs.
      #preemptible: false
      #ram_gb: 8
      #disk_gb: 40
      # Request that the task be run in these compute zones. How this string is
      # utilized will be dependent on the backend system. For example, a
      # system based on a cluster queueing system may use this string to define
      # the priority queue to which the job is assigned.
      #zones: us-west-1
      #backend_parameters: {}
      #backend_parameters_strict: false
      # Configure the embedded Pulsar app, only message_queue_url is required but
      # other options may be useful (unsure).
      pulsar_app_config:
          # This needs to be the RabbitMQ server, but this should be the host
          # and port that your TES nodes would connect to the server via.
          message_queue_url: "<amqp_url>"

    pulsar_gcp_environment:
      runner: pulsar_gcp
      # required
      project_id: <gcp_project_id>
      # Path to GCP service account credentials file. (not sure if ~ would be implicitly respected in this example)
      #credentials_file: ~/.config/gcloud/application_default_credentials.json
      # GCP region or zone to use (optional)
      #region: us-central1
      # Max walltime to use in seconds (defaults to 60 * 60 * 24)
      #walltime_limit: 216000
      # Maximum number of retries for the job. Maps to TaskSpec.max_retry_count.
      #retry_count: 2
      # Name of the SSD volume to be mounted in the task. Shared among all containers in job.
      #ssd_name: pulsar_staging
      # Size of the shared local SSD disk in GB (must be a multiple of 375).
      #disk_size: 375
      # Machine type for the job's VM.
      #machine_type: n1-standard-1
      #labels: {}
      # Configure the embedded Pulsar app, only message_queue_url is required but
      # other options may be useful (unsure).
      pulsar_app_config:
          # This needs to be the RabbitMQ server, but this should be the host
          # and port that your GCP compute would connect to the server via.
          message_queue_url: "<amqp_url>"

    # Example CLI runners.
    ssh_torque:
      runner: cli
+23 −12
Original line number Diff line number Diff line
@@ -818,6 +818,13 @@ class AsynchronousJobState(JobState):
        if attribute not in self.cleanup_file_attributes:
            self.cleanup_file_attributes.append(attribute)

    def init_job_stream_files(self):
        """Create empty stdout/stderr stream files for the job.

        For runners that don't create explicit job scripts, this ensures the
        output and error files exist (empty) so later collection code can
        always open them.
        """
        for stream_path in (self.output_file, self.error_file):
            # "w" mode creates the file, or truncates it if it already exists
            open(stream_path, "w").close()


class AsynchronousJobRunner(BaseJobRunner, Monitors):
    """Parent class for any job runner that runs jobs asynchronously (e.g. via
@@ -907,17 +914,7 @@ class AsynchronousJobRunner(BaseJobRunner, Monitors):
    def check_watched_item(self, job_state: AsynchronousJobState) -> Union[AsynchronousJobState, None]:
        raise NotImplementedError()

    def finish_job(self, job_state: AsynchronousJobState):
        """
        Get the output/error for a finished job, pass to `job_wrapper.finish`
        and cleanup all the job's temporary files.
        """
        galaxy_id_tag = job_state.job_wrapper.get_id_tag()
        external_job_id = job_state.job_id

        # To ensure that files below are readable, ownership must be reclaimed first
        job_state.job_wrapper.reclaim_ownership()

    def _collect_job_output(self, job_id: int, external_job_id: Optional[str], job_state: JobState):
        # wait for the files to appear
        which_try = 0
        collect_output_success = True
@@ -931,11 +928,25 @@ class AsynchronousJobRunner(BaseJobRunner, Monitors):
                if which_try == self.app.config.retry_job_output_collection:
                    stdout = ""
                    stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
                    log.error("(%s/%s) %s: %s", galaxy_id_tag, external_job_id, stderr, unicodify(e))
                    log.error("(%s/%s) %s: %s", job_id, external_job_id, stderr, unicodify(e))
                    collect_output_success = False
                else:
                    time.sleep(1)
                which_try += 1
        return collect_output_success, stdout, stderr

    def finish_job(self, job_state: AsynchronousJobState):
        """
        Get the output/error for a finished job, pass to `job_wrapper.finish`
        and cleanup all the job's temporary files.
        """
        galaxy_id_tag = job_state.job_wrapper.get_id_tag()
        external_job_id = job_state.job_id

        # To ensure that files below are readable, ownership must be reclaimed first
        job_state.job_wrapper.reclaim_ownership()

        collect_output_success, stdout, stderr = self._collect_job_output(galaxy_id_tag, external_job_id, job_state)

        if not collect_output_success:
            job_state.fail_message = stderr
+3 −44
Original line number Diff line number Diff line
@@ -7,7 +7,6 @@ import logging
import math
import os
import re
import time
from dataclasses import dataclass
from datetime import datetime
from typing import Union
@@ -50,7 +49,6 @@ from galaxy.jobs.runners.util.pykube_util import (
    Service,
    service_object_dict,
)
from galaxy.util import unicodify
from galaxy.util.bytesize import ByteSize

log = logging.getLogger(__name__)
@@ -175,12 +173,8 @@ class KubernetesJobRunner(AsynchronousJobRunner):
            job_wrapper=job_wrapper,
            job_destination=job_wrapper.job_destination,
        )
        # Kubernetes doesn't really produce meaningful "job stdout", but file needs to be present
        with open(ajs.output_file, "w"):
            pass
        with open(ajs.error_file, "w"):
            pass

        # Kubernetes doesn't really produce a "job script", but job stream files needs to be present
        ajs.init_job_stream_files()
        if not self.prepare_job(
            job_wrapper,
            include_metadata=False,
@@ -680,26 +674,6 @@ class KubernetesJobRunner(AsynchronousJobRunner):
        """
        return ByteSize(mem_value).value

    def __assemble_k8s_container_image_name(self, job_wrapper):
        """Assemble the container image reference as ``[repo/][owner/]image[:tag]``.

        ``repo``, ``owner`` and ``tag`` are read from the job destination
        params and are all optional; ``image`` is required.
        """
        params = job_wrapper.job_destination.params

        # Optional repo/owner prefixes, each terminated by a slash when present.
        prefix = ""
        for key in ("repo", "owner"):
            if key in params:
                prefix += f"{params[key]}/"

        image_name = prefix + params["image"]

        # Optional ":tag" suffix.
        if "tag" in params:
            image_name = f"{image_name}:{params['tag']}"

        return image_name

    def __get_k8s_container_name(self, job_wrapper):
        # These must follow a specific regex for Kubernetes.
        raw_id = job_wrapper.job_destination.id
@@ -1115,22 +1089,7 @@ class KubernetesJobRunner(AsynchronousJobRunner):
        # To ensure that files below are readable, ownership must be reclaimed first
        job_state.job_wrapper.reclaim_ownership()

        # wait for the files to appear
        which_try = 0
        while which_try < self.app.config.retry_job_output_collection + 1:
            try:
                with open(job_state.output_file, "rb") as stdout_file, open(job_state.error_file, "rb") as stderr_file:
                    job_stdout = self._job_io_for_db(stdout_file)
                    job_stderr = self._job_io_for_db(stderr_file)
                break
            except Exception as e:
                if which_try == self.app.config.retry_job_output_collection:
                    job_stdout = ""
                    job_stderr = job_state.runner_states.JOB_OUTPUT_NOT_RETURNED_FROM_CLUSTER
                    log.error(f"{gxy_job.id}/{gxy_job.job_runner_external_id} {job_stderr}: {unicodify(e)}")
                else:
                    time.sleep(1)
                which_try += 1
        _, job_stdout, job_stderr = self._collect_job_output(gxy_job.id, gxy_job.job_runner_external_id, job_state)

        # get stderr and stdout to database
        outputs_directory = os.path.join(job_state.job_wrapper.working_directory, "outputs")
+41 −20
Original line number Diff line number Diff line
@@ -210,6 +210,7 @@ class PulsarJobRunner(AsynchronousJobRunner):
    default_build_pulsar_app = False
    use_mq = False
    poll = True
    client_manager_kwargs: dict[str, Any] = {}

    def __init__(self, app, nworkers, **kwds):
        """Start the job runner."""
@@ -240,19 +241,27 @@ class PulsarJobRunner(AsynchronousJobRunner):
            pulsar_conf_file = self.runner_params.get("pulsar_config", None)
        self.__init_pulsar_app(pulsar_conf, pulsar_conf_file)

        client_manager_kwargs = {}
        for kwd in "manager", "cache", "transport", "persistence_directory":
            client_manager_kwargs[kwd] = self.runner_params[kwd]
        client_manager_kwargs = self._pulsar_client_manager_args()
        if self.pulsar_app is not None:
            client_manager_kwargs["pulsar_app"] = self.pulsar_app
            # TODO: Hack remove this following line pulsar lib update
            # that includes https://github.com/galaxyproject/pulsar/commit/ce0636a5b64fae52d165bcad77b2caa3f0e9c232
            client_manager_kwargs["file_cache"] = None
        self.client_manager = build_client_manager(**client_manager_kwargs)

    def _pulsar_client_manager_args(self):
        """Build the keyword arguments for Pulsar's ``build_client_manager``.

        Most connection parameters can be specified as environment parameters,
        but global parameters about message queues, what Pulsar client to use,
        etc... must be specified as runner parameters. This method returns a
        configuration based on the runner parameters that is ready for Pulsar's
        ``build_client_manager``.

        Starts from the class-level ``client_manager_kwargs`` defaults (copied
        so the shared class attribute is never mutated) and layers runner
        parameters on top. Note this method only builds and returns the kwargs;
        the caller is responsible for constructing the client manager —
        previously a stray ``self.client_manager = build_client_manager(...)``
        here duplicated the construction already performed by the caller.
        """
        client_manager_kwargs = self.client_manager_kwargs.copy()
        for kwd in "manager", "cache", "transport", "persistence_directory":
            client_manager_kwargs[kwd] = self.runner_params[kwd]

        # Forward all AMQP/transport tuning options verbatim.
        for kwd in self.runner_params.keys():
            if kwd.startswith("amqp_") or kwd.startswith("transport_"):
                client_manager_kwargs[kwd] = self.runner_params[kwd]

        return client_manager_kwargs

    def __init_pulsar_app(self, conf, pulsar_conf_path):
        if conf is None and pulsar_conf_path is None and not self.default_build_pulsar_app:
@@ -1050,7 +1059,6 @@ DEFAULT_PULSAR_CONTAINER = "galaxy/pulsar-pod-staging:0.15.0.2"
COEXECUTION_DESTINATION_DEFAULTS = {
    "default_file_action": "remote_transfer",
    "rewrite_parameters": "true",
    "jobs_directory": "/pulsar_staging",
    "pulsar_container_image": DEFAULT_PULSAR_CONTAINER,
    "remote_container_handling": True,
    "url": PARAMETER_SPECIFICATION_IGNORED,
@@ -1058,27 +1066,20 @@ COEXECUTION_DESTINATION_DEFAULTS = {
}


class PulsarCoexecutionJobRunner(PulsarMQJobRunner):
class PulsarCoexecutionJobRunner(PulsarJobRunner):
    destination_defaults = COEXECUTION_DESTINATION_DEFAULTS

    def _populate_parameter_defaults(self, job_destination):
        """Layer co-execution defaults onto the destination's embedded Pulsar app config."""
        super()._populate_parameter_defaults(job_destination)
        destination_params = job_destination.params
        # Make sure an embedded Pulsar app configuration mapping exists.
        if "pulsar_app_config" not in destination_params:
            destination_params["pulsar_app_config"] = {}
        app_config = destination_params["pulsar_app_config"]
        # coexecution always uses a fixed path for staging directory
        if "staging_directory" not in app_config:
            app_config["staging_directory"] = destination_params.get("jobs_directory")


KUBERNETES_DESTINATION_DEFAULTS: dict[str, Any] = {"k8s_enabled": True, **COEXECUTION_DESTINATION_DEFAULTS}

KUBERNETES_CLIENT_MANAGER_KWARGS = {"k8s_enabled": True}


class PulsarKubernetesJobRunner(PulsarCoexecutionJobRunner):
    """Co-execution Pulsar runner that stages jobs into Kubernetes pods."""

    destination_defaults = KUBERNETES_DESTINATION_DEFAULTS
    client_manager_kwargs = KUBERNETES_CLIENT_MANAGER_KWARGS
    use_mq = True
    # Keep polling so integration tests can check the API for the pod IP.
    poll = True


TES_DESTINATION_DEFAULTS: dict[str, Any] = {
@@ -1086,9 +1087,29 @@ TES_DESTINATION_DEFAULTS: dict[str, Any] = {
    **COEXECUTION_DESTINATION_DEFAULTS,
}

TES_CLIENT_MANAGER_KWARGS = {"tes_enabled": True}


class PulsarTesJobRunner(PulsarCoexecutionJobRunner):
    """Co-execution Pulsar runner targeting a GA4GH Task Execution Service (TES)."""

    destination_defaults = TES_DESTINATION_DEFAULTS
    client_manager_kwargs = TES_CLIENT_MANAGER_KWARGS
    use_mq = True
    # No API polling; status updates arrive via the message queue.
    poll = False


# Destination defaults for GCP Batch co-execution; a GCP project id is required
# on top of the shared co-execution defaults.
GCP_DESTINATION_DEFAULTS: dict[str, Any] = {
    "project_id": PARAMETER_SPECIFICATION_REQUIRED,
    **COEXECUTION_DESTINATION_DEFAULTS,
}
# Extra kwargs merged into build_client_manager's arguments to mark GCP Batch
# support as enabled.
GCP_BATCH_CLIENT_MANAGER_KWARGS = {"gcp_batch_enabled": True}


class PulsarGcpBatchJobRunner(PulsarCoexecutionJobRunner):
    """Co-execution Pulsar runner targeting Google Cloud Platform's Batch service."""

    destination_defaults = GCP_DESTINATION_DEFAULTS
    client_manager_kwargs = GCP_BATCH_CLIENT_MANAGER_KWARGS
    use_mq = True
    # No API polling; status updates arrive via the message queue.
    poll = False


class PulsarRESTJobRunner(PulsarJobRunner):
+1 −1
Original line number Diff line number Diff line
@@ -70,7 +70,7 @@ dependencies = [
    "pebble",
    "pillow",
    "psutil",
    "pulsar-galaxy-lib>=0.15.0.dev0",
    "pulsar-galaxy-lib>=0.15.10",
    "pycryptodome",
    "pydantic[email]>=2.7.4",  # https://github.com/pydantic/pydantic/pull/9639
    "PyJWT",