Commit a2afb2ce authored by John Chilton's avatar John Chilton
Browse files

Co-execution update - implement GCP, improve TES.

parent 93bbc008
Loading
Loading
Loading
Loading
+70 −0
Original line number Diff line number Diff line
@@ -338,6 +338,27 @@ runners:
    #app: {}
    #pulsar_config: path/to/pulsar/app.yml

  # Using Pulsar and co-execution to run jobs against a TES service.
  # - https://www.ga4gh.org/product/task-execution-service-tes/
  # - https://pulsar.readthedocs.io/en/latest/containers.html#ga4gh-tes
  pulsar_tes:
    load: galaxy.jobs.runners.pulsar:PulsarTesJobRunner
    # RabbitMQ URL from Galaxy server (include credentials).
    amqp_url: <amqp_url>
    # If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
    #galaxy_url: <galaxy_url>  

  # Using Pulsar and co-execution to run jobs against Google Cloud Platform's Batch service.
  # - https://cloud.google.com/batch/docs/get-started
  # - https://pulsar.readthedocs.io/en/latest/containers.html#google-cloud-platform-batch
  pulsar_gcp:
    load: galaxy.jobs.runners.pulsar:PulsarGcpBatchJobRunner
    # RabbitMQ URL from Galaxy server (include credentials).
    amqp_url: <amqp_url>
    # If Pulsar needs to talk to Galaxy at a particular host and port, set that here.
    #galaxy_url: <galaxy_url>


# Job handler configuration - for a full discussion of job handlers, see the documentation at:
#     https://docs.galaxyproject.org/en/latest/admin/scaling.html
handling:
@@ -916,6 +937,55 @@ execution:
      # Path to Kubernetes configuration fil (see Kubernetes runner description.)
      #k8s_config_path: /path/to/kubeconfig

    pulsar_tes_environment:
      runner: pulsar_tes
      # A tes_url is required
      tes_url: "<tes_url>"
      #basic_auth:
      #  username: <username>
      #  password: <password>
      #cpu_cores: 1
      # Define if the task is allowed to run on preemptible compute instances,\nfor example, AWS Spot. This option may have no effect when utilized\non some backends that don't have the concept of preemptible jobs.
      #preemptible: false
      #ram_gb: 8
      #disk_gb: 40
      # Request that the task be run in these compute zones. How this string is utilized will be dependent on the backend system. For example, a\nsystem based on a cluster queueing system may use this string to define\npriorty queue to which the job is assigned.
      #zones: us-west-1
      #backend_parameters: {}
      #backend_parameters_strict: false
      # Configure the embedded Pulsar app, only message_queue_url is required but
      # other options may be useful (unsure).
      pulsar_app_config:
          # This needs to be the RabbitMQ server, but this should be the host
          # and port that your TES nodes would connect to the server via.
          message_queue_url: "<amqp_url>"

    pulsar_gcp_environment:
      runner: pulsar_gcp
      # required
      project_id: <gcp_project_id>
      # Path to GCP service account credentials file. (not sure if ~ would be implicitly respected in this example)
      #credentials_file: ~/.config/gcloud/application_default_credentials.json
      # GCP region or zone to use (optional)
      #region: us-central1
      # Max walltime to use in seconds (defaults to 60 * 60 * 24)
      #walltime_limit: 216000
      # Maximum number of retries for the job. Maps to TaskSpec.max_retry_count.
      #retry_count: 2
      # Name of the SSD volume to be mounted in the task. Shared among all containers in job.
      #ssd_name: pulsar_staging
      # Size of the shared local SSD disk in GB (must be a multiple of 375).
      #disk_size: 375
      # Machine type for the job's VM.
      #machine_type: n1-standard-1
      #labels: {}
      # Configure the embedded Pulsar app, only message_queue_url is required but
      # other options may be useful (unsure).
      pulsar_app_config:
          # This needs to be the RabbitMQ server, but this should be the host
          # and port that your GCP compute would connect to the server via.
          message_queue_url: "<amqp_url>"

    # Example CLI runners.
    ssh_torque:
      runner: cli
+41 −17
Original line number Diff line number Diff line
@@ -210,6 +210,7 @@ class PulsarJobRunner(AsynchronousJobRunner):
    default_build_pulsar_app = False
    use_mq = False
    poll = True
    client_manager_kwargs: dict[str, Any] = {}

    def __init__(self, app, nworkers, **kwds):
        """Start the job runner."""
@@ -240,16 +241,27 @@ class PulsarJobRunner(AsynchronousJobRunner):
            pulsar_conf_file = self.runner_params.get("pulsar_config", None)
        self.__init_pulsar_app(pulsar_conf, pulsar_conf_file)

        client_manager_kwargs = {}
        for kwd in "manager", "cache", "transport", "persistence_directory":
            client_manager_kwargs[kwd] = self.runner_params[kwd]
        client_manager_kwargs = self._pulsar_client_manager_args()
        if self.pulsar_app is not None:
            client_manager_kwargs["pulsar_app"] = self.pulsar_app
        self.client_manager = build_client_manager(**client_manager_kwargs)

    def _pulsar_client_manager_args(self):
        """Most connection parameters can be specified as environment parameters, but...

        ... global parameters about message queues, what Pulsar client to use, etc... must
        be specified as runner parameters. This method returns a configuration based
        on the runner parameters that is ready for Pulsar's build_client_manager.
        """
        client_manager_kwargs = self.client_manager_kwargs.copy()
        for kwd in "manager", "cache", "transport", "persistence_directory":
            client_manager_kwargs[kwd] = self.runner_params[kwd]

        for kwd in self.runner_params.keys():
            if kwd.startswith("amqp_") or kwd.startswith("transport_"):
                client_manager_kwargs[kwd] = self.runner_params[kwd]
        self.client_manager = build_client_manager(**client_manager_kwargs)

        return client_manager_kwargs

    def __init_pulsar_app(self, conf, pulsar_conf_path):
        if conf is None and pulsar_conf_path is None and not self.default_build_pulsar_app:
@@ -1047,7 +1059,6 @@ DEFAULT_PULSAR_CONTAINER = "galaxy/pulsar-pod-staging:0.15.0.2"
COEXECUTION_DESTINATION_DEFAULTS = {
    "default_file_action": "remote_transfer",
    "rewrite_parameters": "true",
    "jobs_directory": "/pulsar_staging",
    "pulsar_container_image": DEFAULT_PULSAR_CONTAINER,
    "remote_container_handling": True,
    "url": PARAMETER_SPECIFICATION_IGNORED,
@@ -1055,27 +1066,20 @@ COEXECUTION_DESTINATION_DEFAULTS = {
}


class PulsarCoexecutionJobRunner(PulsarMQJobRunner):
class PulsarCoexecutionJobRunner(PulsarJobRunner):
    destination_defaults = COEXECUTION_DESTINATION_DEFAULTS

    def _populate_parameter_defaults(self, job_destination):
        super()._populate_parameter_defaults(job_destination)
        params = job_destination.params
        # Set some sensible defaults for Pulsar application that runs in staging container.
        if "pulsar_app_config" not in params:
            params["pulsar_app_config"] = {}
        pulsar_app_config = params["pulsar_app_config"]
        if "staging_directory" not in pulsar_app_config:
            # coexecution always uses a fixed path for staging directory
            pulsar_app_config["staging_directory"] = params.get("jobs_directory")


KUBERNETES_DESTINATION_DEFAULTS: dict[str, Any] = {"k8s_enabled": True, **COEXECUTION_DESTINATION_DEFAULTS}

KUBERNETES_CLIENT_MANAGER_KWARGS = {"k8s_enabled": True}


class PulsarKubernetesJobRunner(PulsarCoexecutionJobRunner):
    destination_defaults = KUBERNETES_DESTINATION_DEFAULTS
    use_mq = True
    poll = True  # Poll so we can check API for pod IP for ITs.
    client_manager_kwargs = KUBERNETES_CLIENT_MANAGER_KWARGS


TES_DESTINATION_DEFAULTS: dict[str, Any] = {
@@ -1083,9 +1087,29 @@ TES_DESTINATION_DEFAULTS: dict[str, Any] = {
    **COEXECUTION_DESTINATION_DEFAULTS,
}

TES_CLIENT_MANAGER_KWARGS = {"tes_enabled": True}


class PulsarTesJobRunner(PulsarCoexecutionJobRunner):
    destination_defaults = TES_DESTINATION_DEFAULTS
    client_manager_kwargs = TES_CLIENT_MANAGER_KWARGS
    use_mq = True
    poll = False


GCP_DESTINATION_DEFAULTS: dict[str, Any] = {
    "project_id": PARAMETER_SPECIFICATION_REQUIRED,
    **COEXECUTION_DESTINATION_DEFAULTS,
}
GCP_BATCH_CLIENT_MANAGER_KWARGS = {"gcp_batch_enabled": True}


class PulsarGcpBatchJobRunner(PulsarCoexecutionJobRunner):
    use_mq = True
    poll = False

    client_manager_kwargs = GCP_BATCH_CLIENT_MANAGER_KWARGS
    destination_defaults = GCP_DESTINATION_DEFAULTS


class PulsarRESTJobRunner(PulsarJobRunner):
+1 −1
Original line number Diff line number Diff line
@@ -70,7 +70,7 @@ dependencies = [
    "pebble",
    "pillow",
    "psutil",
    "pulsar-galaxy-lib>=0.15.0.dev0",
    "pulsar-galaxy-lib>=0.15.10",
    "pycryptodome",
    "pydantic[email]>=2.7.4",  # https://github.com/pydantic/pydantic/pull/9639
    "PyJWT",