Verified Commit a2c21486 authored by Hines, Jesse's avatar Hines, Jesse
Browse files

Fixes to cleanup task

parent 98110807
Loading
Loading
Loading
Loading
+14 −15
Original line number Diff line number Diff line
@@ -16,7 +16,7 @@ from ..models.output import (
    SCHEDULER_SIM_JOB_POWER_HISTORY_API_FIELDS, SCHEDULER_SIM_JOB_POWER_HISTORY_FIELD_SELECTORS,
)
from ..util.misc import pick, omit
from ..util.k8s import submit_job, get_k8s_jobs, get_k8s_job_state, get_k8s_job_end_time
from ..util.k8s import submit_job, get_k8s_job, get_k8s_job_state, get_k8s_job_end_time
from ..util.druid import to_timestamp, any_value, latest, execute_ignore_missing
from ..util.api_queries import (
    Filters, Sort, QuerySpan, Granularity, expand_field_selectors, DatetimeValidator,
@@ -137,16 +137,22 @@ def cleanup_jobs(druid_engine, kafka_producer, settings):
    now = datetime.now(timezone.utc)
    # threshold after job has ended before sending a cancel (incase the job did send its own
    # cancel message and it just hasn't shown up in Druid yet)
    threshold = timedelta(minutes=1)
    threshold = timedelta(minutes=2)

    running_sims, _ = query_sims(
        filters=SIM_FILTERS(state = ["eq:running"]),
        fields = ["all"],
        limit = 1000, # If somehow there's more than that we'll just get them next trigger
        druid_engine = druid_engine,
    )

    running_jobs = set()
    if 'KUBERNETES_SERVICE_HOST' in os.environ:
        for job in get_k8s_jobs():
            if job.metadata.name.startswith('exadigit-simulation-server-'):
                sim_id = job.metadata.name.removeprefix('exadigit-simulation-server-')
        for sim in running_sims:
            job = get_k8s_job(f"exadigit-simulation-server-{sim.id}")
            # Add a little bit of threshold to avoid potentially sending duplicate fail messages
                if get_k8s_job_state(job) == "running" or get_k8s_job_end_time(job) < now - threshold:
                    running_jobs.add(sim_id)
            if get_k8s_job_state(job) == "running" or (job and now - get_k8s_job_end_time(job) < threshold):
                running_jobs.add(sim.id)
    else:
        for proc in psutil.Process().children():
            try:
@@ -157,13 +163,6 @@ def cleanup_jobs(druid_engine, kafka_producer, settings):
            except (psutil.Error):
                pass

    running_sims, _ = query_sims(
        filters=SIM_FILTERS(state = ["eq:running"]),
        fields = ["all"],
        limit = 1000, # If somehow there's more than that we'll just get them next trigger
        druid_engine = druid_engine,
    )

    for sim in running_sims:
        if sim.id not in running_jobs and now - sim.execution_start > threshold:
            sim.state = 'fail'
+2 −2
Original line number Diff line number Diff line
@@ -17,9 +17,9 @@ def submit_job(job: dict):
    return get_batch_api().create_namespaced_job(namespace = get_namespace(), body = job)


def get_k8s_jobs():
def get_k8s_job(name):
    try:
        return get_batch_api().list_namespaced_job(namespace = get_namespace())
        return get_batch_api().read_namespaced_job(namespace = get_namespace(), name = name)
    except k8s.client.ApiException as e:
        if e.status == 404:
            return None