Commit 3090cef4 authored by nuwang's avatar nuwang
Browse files

Accept any valid k8s destination param as overridable

parent 6350bab0
Loading
Loading
Loading
Loading
+4 −12
Original line number Diff line number Diff line
@@ -267,8 +267,10 @@ class KubernetesJobRunner(AsynchronousJobRunner):
        ingress.create()

    def __get_overridable_params(self, job_wrapper, param_key):
        dest_params = self.__get_destination_params(job_wrapper)
        return dest_params.get(param_key, self.runner_params[param_key])
        try:
            return job_wrapper.job_destination.params[param_key]
        except KeyError:
            return self.runner_params[param_key]

    def __get_pull_policy(self):
        return pull_policy(self.runner_params)
@@ -710,16 +712,6 @@ class KubernetesJobRunner(AsynchronousJobRunner):
    def __get_k8s_job_name(self, prefix, job_wrapper):
        return f"{prefix}-{self.__force_label_conformity(job_wrapper.get_id_tag())}"

    def __get_destination_params(self, job_wrapper):
        """Obtains allowable runner param overrides from the destination"""
        job_destination = job_wrapper.job_destination
        OVERRIDABLE_PARAMS = ["k8s_node_selector", "k8s_affinity", "k8s_extra_job_envs"]
        new_params = {}
        for each_param in OVERRIDABLE_PARAMS:
            if each_param in job_destination.params:
                new_params[each_param] = job_destination.params[each_param]
        return new_params

    def check_watched_item(self, job_state):
        """Checks the state of a job already submitted on k8s. Job state is an AsynchronousJobState"""
        jobs = find_job_object_by_name(self._pykube_api, job_state.job_id, self.runner_params["k8s_namespace"])