diff --git a/pulsar/client/client.py b/pulsar/client/client.py
index 6c4eb05e0faaf20fe610b9f3f39f6d4ef20cf9a3..b589a219b3b58cba1428fa0b709f7af1fcdcb195 100644
--- a/pulsar/client/client.py
+++ b/pulsar/client/client.py
@@ -2,16 +2,16 @@ import logging
 import os

 from pulsar.managers.util.pykube_util import (
-    delete_job,
     ensure_pykube,
     find_job_object_by_name,
     find_pod_object_by_name,
     galaxy_instance_id,
     Job,
     job_object_dict,
-    produce_k8s_job_prefix,
+    produce_unique_k8s_job_name,
     pull_policy,
     pykube_client_from_dict,
+    stop_job,
 )
 from .action_mapper import (
     actions,
@@ -479,7 +479,7 @@ class MessageCoexecutionPodJobClient(BaseMessageJobClient):
         base64_message = to_base64_json(launch_params)
         base64_app_conf = to_base64_json(pulsar_app_config)

-        k8s_job_prefix = self._k8s_job_prefix
+        job_name = self._k8s_job_name
         params = self.destination_params

         pulsar_container_image = self.pulsar_container_image
@@ -527,7 +527,7 @@ class MessageCoexecutionPodJobClient(BaseMessageJobClient):

         template = {
             "metadata": {
-                "labels": {"app": k8s_job_prefix},
+                "labels": {"app": job_name},
             },
             "spec": {
                 "volumes": volumes,
@@ -537,23 +537,23 @@ class MessageCoexecutionPodJobClient(BaseMessageJobClient):
         }
         spec = {"template": template}
         spec.update(self._job_spec_params(params))
-        k8s_job_obj = job_object_dict(params, k8s_job_prefix, spec)
+        k8s_job_obj = job_object_dict(params, job_name, spec)
         pykube_client = self._pykube_client
         job = Job(pykube_client, k8s_job_obj)
         job.create()

     def kill(self):
-        job_name = self._k8s_job_prefix
+        job_name = self._k8s_job_name
         pykube_client = self._pykube_client
         job = find_job_object_by_name(pykube_client, job_name)
         if job:
             log.info("Kill k8s job with name %s" % job_name)
-            delete_job(job)
+            stop_job(job)
         else:
             log.info("Attempted to kill k8s job but it is unavailable.")

     def job_ip(self):
-        job_name = self._k8s_job_prefix
+        job_name = self._k8s_job_name
         pykube_client = self._pykube_client
         pod = find_pod_object_by_name(pykube_client, job_name)
         if pod:
@@ -572,10 +572,10 @@ class MessageCoexecutionPodJobClient(BaseMessageJobClient):
         return pykube_client_from_dict(self.destination_params)

     @property
-    def _k8s_job_prefix(self):
+    def _k8s_job_name(self):
         job_id = self.job_id
-        job_prefix = produce_k8s_job_prefix(app_prefix="pulsar", job_id=job_id, instance_id=self.instance_id)
-        return job_prefix
+        job_name = produce_unique_k8s_job_name(app_prefix="pulsar", job_id=job_id, instance_id=self.instance_id)
+        return job_name

     def _job_spec_params(self, params):
         spec = {}
diff --git a/pulsar/client/job_directory.py b/pulsar/client/job_directory.py
index 6295727a446d4cad955a592cbbb0d61db5be051d..f9e136fa9a1ab01ad7e747881ca82ece4295eaea 100644
--- a/pulsar/client/job_directory.py
+++ b/pulsar/client/job_directory.py
@@ -42,6 +42,9 @@ class RemoteJobDirectory:
         else:
             self.job_directory = remote_staging_directory

+    def home_directory(self):
+        return self._sub_dir('home')
+
     def metadata_directory(self):
         return self._sub_dir('metadata')

diff --git a/pulsar/managers/base/directory.py b/pulsar/managers/base/directory.py
index 98e133e5d8a27291850e498c2eec03c4bf207230..96a0055ec8e159eba210c7c5fd8e788b251d9b44 100644
--- a/pulsar/managers/base/directory.py
+++ b/pulsar/managers/base/directory.py
@@ -160,9 +160,10 @@ class DirectoryBaseManager(BaseManager):
             'galaxy_lib': self._galaxy_lib(),
             'preserve_python_environment': setup_params.get('preserve_galaxy_python_environment', False),
             'env_setup_commands': env_setup_commands,
-            'job_directory': self.job_directory(job_id).job_directory,
+            # job_directory is not used by job_script, which refers to the working directory as the job directory
             'working_directory': self.job_directory(job_id).working_directory(),
             'metadata_directory': self.job_directory(job_id).metadata_directory(),
+            'home_directory': self.job_directory(job_id).home_directory(),
             'job_id': job_id,
             'tmp_dir_creation_statement': self._tmp_dir(job_id),
         }
diff --git a/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh b/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh
index 7b4d04d0a58b81bb04d66c0d2239cecf5fd24170..08d698ab77928eb8f50497cf711884c43df684f3 100644
--- a/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh
+++ b/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh
@@ -5,7 +5,7 @@ $headers
 _galaxy_setup_environment() {
     local _use_framework_galaxy="$1"
     _GALAXY_JOB_DIR="$working_directory"
-    _GALAXY_JOB_HOME_DIR="$working_directory/home"
+    _GALAXY_JOB_HOME_DIR="$home_directory"
     _GALAXY_JOB_TMP_DIR=$tmp_dir_creation_statement
     $env_setup_commands
     if [ "$GALAXY_LIB" != "None" -a "$_use_framework_galaxy" = "True" ]; then
diff --git a/pulsar/managers/util/job_script/__init__.py b/pulsar/managers/util/job_script/__init__.py
index 3d662811cc1b64b25607a12963fb3b3383b42aca..d3512d9f370dcde4038700152cdb93ef40c5225f 100644
--- a/pulsar/managers/util/job_script/__init__.py
+++ b/pulsar/managers/util/job_script/__init__.py
@@ -93,6 +93,9 @@ def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds):
         kwds["instrument_pre_commands"] = job_instrumenter.pre_execute_commands(working_directory) or ""
         kwds["instrument_post_commands"] = job_instrumenter.post_execute_commands(working_directory) or ""

+    # Setup home directory var
+    kwds["home_directory"] = kwds.get("home_directory", os.path.join(kwds["working_directory"], "home"))
+
     template_params = OPTIONAL_TEMPLATE_PARAMS.copy()
     template_params.update(**kwds)
     env_setup_commands_str = "\n".join(template_params["env_setup_commands"])
diff --git a/pulsar/managers/util/pykube_util.py b/pulsar/managers/util/pykube_util.py
index 7c3f32d87b09537a3f3a930a7c1cf00469167ebb..53fb0f4447907c3142186275122ca8f266ba910c 100644
--- a/pulsar/managers/util/pykube_util.py
+++ b/pulsar/managers/util/pykube_util.py
@@ -2,42 +2,30 @@ import logging
 import os
 import re

-from pathlib import PurePath
+import uuid

 try:
     from pykube.config import KubeConfig
-    from pykube.exceptions import HTTPError
     from pykube.http import HTTPClient
     from pykube.objects import (
-        Ingress,
         Job,
-        Pod,
-        Service,
+        Pod
     )
 except ImportError as exc:
     KubeConfig = None
-    Ingress = None
     Job = None
     Pod = None
-    Service = None
-    HTTPError = None
-    K8S_IMPORT_MESSAGE = (
-        "The Python pykube package is required to use "
-        "this feature, please install it or correct the "
-        "following error:\nImportError %s" % str(exc)
-    )
+    K8S_IMPORT_MESSAGE = ('The Python pykube package is required to use '
+                          'this feature, please install it or correct the '
+                          'following error:\nImportError %s' % str(exc))

 log = logging.getLogger(__name__)

 DEFAULT_JOB_API_VERSION = "batch/v1"
-DEFAULT_SERVICE_API_VERSION = "v1"
-DEFAULT_INGRESS_API_VERSION = "extensions/v1beta1"
 DEFAULT_NAMESPACE = "default"
-INSTANCE_ID_INVALID_MESSAGE = (
-    "Galaxy instance [%s] is either too long "
-    "(>20 characters) or it includes non DNS "
-    "acceptable characters, ignoring it."
-)
+INSTANCE_ID_INVALID_MESSAGE = ("Galaxy instance [%s] is either too long "
+                               "(>20 characters) or it includes non DNS "
+                               "acceptable characters, ignoring it.")


 def ensure_pykube():
@@ -51,233 +39,87 @@ def pykube_client_from_dict(params):
     else:
         config_path = params.get("k8s_config_path")
         if config_path is None:
-            config_path = os.environ.get("KUBECONFIG", None)
+            config_path = os.environ.get('KUBECONFIG', None)
         if config_path is None:
-            config_path = "~/.kube/config"
+            config_path = '~/.kube/config'
         pykube_client = HTTPClient(KubeConfig.from_file(config_path))
     return pykube_client


-def produce_k8s_job_prefix(app_prefix=None, instance_id=None):
-    job_name_elems = [app_prefix or "", instance_id or ""]
-    return "-".join(elem for elem in job_name_elems if elem)
+def produce_unique_k8s_job_name(app_prefix=None, instance_id=None, job_id=None):
+    if job_id is None:
+        job_id = str(uuid.uuid4())
+
+    job_name = ""
+    if app_prefix:
+        job_name += "%s-" % app_prefix
+
+    if instance_id and len(instance_id) > 0:
+        job_name += "%s-" % instance_id
+
+    return job_name + job_id


 def pull_policy(params):
     # If this doesn't validate it returns None, that seems odd?
     if "k8s_pull_policy" in params:
-        if params["k8s_pull_policy"] in ["Always", "IfNotPresent", "Never"]:
-            return params["k8s_pull_policy"]
+        if params['k8s_pull_policy'] in ["Always", "IfNotPresent", "Never"]:
+            return params['k8s_pull_policy']
     return None


-def find_service_object_by_name(pykube_api, service_name, namespace=None):
-    if not service_name:
-        raise ValueError("service name must not be empty")
-    return Service.objects(pykube_api).filter(field_selector={"metadata.name": service_name}, namespace=namespace)
-
-
-def find_ingress_object_by_name(pykube_api, ingress_name, namespace=None):
-    if not ingress_name:
-        raise ValueError("ingress name must not be empty")
-    return Ingress.objects(pykube_api).filter(field_selector={"metadata.name": ingress_name}, namespace=namespace)
-
-
 def find_job_object_by_name(pykube_api, job_name, namespace=None):
-    if not job_name:
-        raise ValueError("job name must not be empty")
-    return Job.objects(pykube_api).filter(field_selector={"metadata.name": job_name}, namespace=namespace)
+    return _find_object_by_name(Job, pykube_api, job_name, namespace=namespace)


-def find_pod_object_by_name(pykube_api, job_name, namespace=None):
-    return Pod.objects(pykube_api).filter(selector=f"job-name={job_name}", namespace=namespace)
+def find_pod_object_by_name(pykube_api, pod_name, namespace=None):
+    return _find_object_by_name(Pod, pykube_api, pod_name, namespace=namespace)


-def is_pod_unschedulable(pykube_api, pod, namespace=None):
-    is_unschedulable = any(c.get("reason") == "Unschedulable" for c in pod.obj["status"].get("conditions", []))
-    if pod.obj["status"].get("phase") == "Pending" and is_unschedulable:
-        return True
+def _find_object_by_name(clazz, pykube_api, object_name, namespace=None):
+    filter_kwd = dict(selector="app=%s" % object_name)
+    if namespace is not None:
+        filter_kwd["namespace"] = namespace

-    return False
+    objs = clazz.objects(pykube_api).filter(**filter_kwd)
+    obj = None
+    if len(objs.response['items']) > 0:
+        obj = clazz(pykube_api, objs.response['items'][0])
+    return obj


-def delete_job(job, cleanup="always"):
-    job_failed = job.obj["status"]["failed"] > 0 if "failed" in job.obj["status"] else False
+def stop_job(job, cleanup="always"):
+    job_failed = (job.obj['status']['failed'] > 0
+                  if 'failed' in job.obj['status'] else False)
     # Scale down the job just in case even if cleanup is never
     job.scale(replicas=0)
-    api_delete = cleanup == "always"
-    if not api_delete and cleanup == "onsuccess" and not job_failed:
-        api_delete = True
-    if api_delete:
-        delete_options = {"apiVersion": "v1", "kind": "DeleteOptions", "propagationPolicy": "Background"}
+    if (cleanup == "always" or
+            (cleanup == "onsuccess" and not job_failed)):
+        delete_options = {
+            "apiVersion": "v1",
+            "kind": "DeleteOptions",
+            "propagationPolicy": "Background"
+        }
         r = job.api.delete(json=delete_options, **job.api_kwargs())
         job.api.raise_for_status(r)


-def delete_ingress(ingress, cleanup="always", job_failed=False):
-    api_delete = cleanup == "always"
-    if not api_delete and cleanup == "onsuccess" and not job_failed:
-        api_delete = True
-    if api_delete:
-        delete_options = {"apiVersion": "v1", "kind": "DeleteOptions", "propagationPolicy": "Background"}
-        r = ingress.api.delete(json=delete_options, **ingress.api_kwargs())
-        ingress.api.raise_for_status(r)
-
-
-def delete_service(service, cleanup="always", job_failed=False):
-    api_delete = cleanup == "always"
-    if not api_delete and cleanup == "onsuccess" and not job_failed:
-        api_delete = True
-    if api_delete:
-        delete_options = {"apiVersion": "v1", "kind": "DeleteOptions", "propagationPolicy": "Background"}
-        r = service.api.delete(json=delete_options, **service.api_kwargs())
-        service.api.raise_for_status(r)
-
-
-def job_object_dict(params, job_prefix, spec):
+def job_object_dict(params, job_name, spec):
     k8s_job_obj = {
-        "apiVersion": params.get("k8s_job_api_version", DEFAULT_JOB_API_VERSION),
+        "apiVersion": params.get('k8s_job_api_version', DEFAULT_JOB_API_VERSION),
         "kind": "Job",
         "metadata": {
-            "generateName": f"{job_prefix}-",
-            "namespace": params.get("k8s_namespace", DEFAULT_NAMESPACE),
+            # metadata.name is the name of the pod resource created, and must be unique
+            # http://kubernetes.io/docs/user-guide/configuring-containers/
+            "name": job_name,
+            "namespace": params.get('k8s_namespace', DEFAULT_NAMESPACE),
+            "labels": {"app": job_name}
         },
         "spec": spec,
     }
     return k8s_job_obj


-def service_object_dict(params, service_name, spec):
-    k8s_service_obj = {
-        "apiVersion": params.get("k8s_service_api_version", DEFAULT_SERVICE_API_VERSION),
-        "kind": "Service",
-        "metadata": {
-            "name": service_name,
-            "namespace": params.get("k8s_namespace", DEFAULT_NAMESPACE),
-        },
-    }
-    k8s_service_obj["metadata"].update(spec.pop("metadata", {}))
-    k8s_service_obj.update(spec)
-    return k8s_service_obj
-
-
-def ingress_object_dict(params, ingress_name, spec):
-    k8s_ingress_obj = {
-        "apiVersion": params.get("k8s_ingress_api_version", DEFAULT_INGRESS_API_VERSION),
-        "kind": "Ingress",
-        "metadata": {
-            "name": ingress_name,
-            "namespace": params.get("k8s_namespace", DEFAULT_NAMESPACE),
-            # TODO: Add default annotations
-        },
-    }
-    k8s_ingress_obj["metadata"].update(spec.pop("metadata", {}))
-    k8s_ingress_obj.update(spec)
-    return k8s_ingress_obj
-
-
-def parse_pvc_param_line(pvc_param):
-    """
-    :type pvc_param: str
-    :param pvc_param: the pvc mount param in the format ``pvc-name/subpath/desired:/mountpath/desired[:r]``
-
-    :rtype: dict
-    :return: a dict
-             like::
-
-                 {"name": "pvc-name",
-                  "subPath": "subpath/desired",
-                  "mountPath": "/mountpath/desired",
-                  "readOnly": False}
-    """
-    claim, _, rest = pvc_param.partition(":")
-    mount_path, _, mode = rest.partition(":")
-    read_only = mode == "r"
-    claim_name, _, subpath = claim.partition("/")
-    return {
-        "name": claim_name.strip(),
-        "subPath": subpath.strip(),
-        "mountPath": mount_path.strip(),
-        "readOnly": read_only,
-    }
-
-
-def generate_relative_mounts(pvc_param, files):
-    """
-    Maps a list of files as mounts, relative to the base volume mount.
-    For example, given the pvc mount:
-    {
-        'name': 'my_pvc',
-        'mountPath': '/galaxy/database/jobs',
-        'subPath': 'data',
-        'readOnly': False
-    }
-
-    and files: ['/galaxy/database/jobs/01/input.txt', '/galaxy/database/jobs/01/working']
-
-    returns each file as a relative mount as follows:
-    [
-        {
-            'name': 'my_pvc',
-            'mountPath': '/galaxy/database/jobs/01/input.txt',
-            'subPath': 'data/01/input.txt',
-            'readOnly': False
-        },
-        {
-            'name': 'my_pvc',
-            'mountPath': '/galaxy/database/jobs/01/working',
-            'subPath': 'data/01/working',
-            'readOnly': False
-        }
-    ]
-
-    :param pvc_param: the pvc claim dict
-    :param files: a list of file or folder names
-    :return: A list of volume mounts
-    """
-    if not pvc_param:
-        return
-    param_claim = parse_pvc_param_line(pvc_param)
-    claim_name = param_claim["name"]
-    base_subpath = PurePath(param_claim.get("subPath", ""))
-    base_mount = PurePath(param_claim["mountPath"])
-    read_only = param_claim["readOnly"]
-    volume_mounts = []
-    for f in files:
-        file_path = PurePath(str(f))
-        if base_mount not in file_path.parents:
-            # force relative directory, needed for the job working directory in particular
-            file_path = base_mount.joinpath(file_path.relative_to("/") if file_path.is_absolute() else file_path)
-        relpath = file_path.relative_to(base_mount)
-        subpath = base_subpath.joinpath(relpath)
-        volume_mounts.append(
-            {"name": claim_name, "mountPath": str(file_path), "subPath": str(subpath), "readOnly": read_only}
-        )
-    return volume_mounts
-
-
-def deduplicate_entries(obj_list):
-    # remove duplicate entries in a list of dictionaries
-    # based on: https://stackoverflow.com/a/9428041
-    return [i for n, i in enumerate(obj_list) if i not in obj_list[n + 1 :]]
-
-
-def get_volume_mounts_for_job(job_wrapper, data_claim=None, working_claim=None):
-    volume_mounts = []
-    if data_claim:
-        volume_mounts.extend(generate_relative_mounts(data_claim, job_wrapper.job_io.get_input_fnames()))
-    # for individual output files, mount the parent folder of each output as there could be wildcard outputs
-    output_folders = deduplicate_entries(
-        [str(PurePath(str(f)).parent) for f in job_wrapper.job_io.get_output_fnames()]
-    )
-    volume_mounts.extend(generate_relative_mounts(data_claim, output_folders))
-
-    if working_claim:
-        volume_mounts.extend(generate_relative_mounts(working_claim, [job_wrapper.working_directory]))
-
-    return deduplicate_entries(volume_mounts)
-
-
 def galaxy_instance_id(params):
     """Parse and validate the id of the Galaxy instance from supplied dict.
@@ -291,7 +133,7 @@ def galaxy_instance_id(params):
     setup of a Job that is being recovered or restarted after a downtime/reboot.
""" if "k8s_galaxy_instance_id" in params: - raw_value = params["k8s_galaxy_instance_id"] + raw_value = params['k8s_galaxy_instance_id'] if re.match(r"(?!-)[a-z\d-]{1,20}(?<!-)$", raw_value): return raw_value else: @@ -301,29 +143,15 @@ def galaxy_instance_id(params): __all__ = ( "DEFAULT_JOB_API_VERSION", - "DEFAULT_SERVICE_API_VERSION", - "DEFAULT_INGRESS_API_VERSION", "ensure_pykube", - "find_service_object_by_name", - "find_ingress_object_by_name", "find_job_object_by_name", "find_pod_object_by_name", "galaxy_instance_id", - "HTTPError", - "is_pod_unschedulable", "Job", - "Service", - "Ingress", "job_object_dict", - "service_object_dict", - "ingress_object_dict", "Pod", - "produce_k8s_job_prefix", + "produce_unique_k8s_job_name", "pull_policy", "pykube_client_from_dict", - "delete_job", - "delete_service", - "delete_ingress", - "get_volume_mounts_for_job", - "parse_pvc_param_line", + "stop_job", )