Loading pyproject.toml +1 −0 Original line number Diff line number Diff line Loading @@ -29,6 +29,7 @@ dependencies = [ "elasticsearch==7.13.4", "elasticsearch-dbapi==0.2.11", "requests==2.32.5", "orjson==3.11.3", "raps@{root:uri}/raps", ] Loading simulation_server/simulation/simulation.py +38 −26 Original line number Diff line number Diff line from typing import NamedTuple from datetime import datetime, timedelta import functools import functools, itertools import orjson from loguru import logger from raps import Engine from raps.job import Job as RapsJob from raps.stats import get_engine_stats, get_job_stats from ..models.sim import ServerSimConfig from ..models.output import ( Loading @@ -23,9 +25,20 @@ class SimTickOutput(NamedTuple): power_history: list[SchedulerSimJobPowerHistory] def get_job_state_hash(job: SchedulerSimJob): def get_job_state_hash(job: RapsJob): """ Return string that can be used to check if any meaningful state changed """ return job.model_dump_json(exclude={"time_snapshot"}) return orjson.dumps([ str(job.id), job.name, job.nodes_required, job.submit_time, job.time_limit, job.start_time, job.end_time, job.current_state.name, # Node list shouldn't change once set so just do len instead of serializing the large list len(job.scheduled_nodes) if job.scheduled_nodes else None, ]) def run_simulation(sim_config: ServerSimConfig): Loading @@ -50,7 +63,7 @@ def run_simulation(sim_config: ServerSimConfig): # Keep record of how many power history steps we've emitted for each job power_history_counts: dict[int, int] = {} prev_jobs: dict[str, str] = {} prev_job_hashes: set[str] = set() for tick in engine.run_simulation(): timestamp: datetime = _offset_to_time(tick.current_timestep) Loading Loading @@ -89,36 +102,35 @@ def run_simulation(sim_config: ServerSimConfig): ))] scheduler_sim_jobs: list[SchedulerSimJob] = [] curr_jobs = {} tick_jobs = tick.queue + tick.running + tick.completed + tick.killed curr_job_hashes = set() tick_jobs = itertools.chain(tick.queue, tick.running, tick.completed, tick.killed) for job in tick_jobs: time_end = _offset_to_time(job.end_time) # end_time is set to its planned end once its scheduled. Set it to None for unfinished jobs here if time_end is not None and (job.start_time is None or time_end > timestamp): time_end = None parsed_job = SchedulerSimJob.model_validate(dict( job_id = str(job.id), name = job.name, node_count = job.nodes_required, time_snapshot = timestamp, time_submission = _offset_to_time(job.submit_time), time_limit = job.time_limit, time_start = _offset_to_time(job.start_time), time_end = time_end, state_current = JobStateEnum(job.current_state.name), nodes = _parse_nodes(tuple(job.scheduled_nodes)) if job.scheduled_nodes else None, job_state_hash = get_job_state_hash(job) # Output jobs if something other than time_snapshot changed if is_last_tick or job_state_hash not in prev_job_hashes: parsed_job = SchedulerSimJob.model_validate({ "job_id": str(job.id), "name": job.name, "node_count": job.nodes_required, "time_snapshot": timestamp, "time_submission": _offset_to_time(job.submit_time), "time_limit": job.time_limit, "time_start": _offset_to_time(job.start_time), "time_end": time_end, "state_current": JobStateEnum(job.current_state.name), "nodes": _parse_nodes(tuple(job.scheduled_nodes)) if job.scheduled_nodes else None, # How does the new job.power attribute work? Is it total_energy? # Or just the current wattage? # power = job.power, )) job_state_hash = get_job_state_hash(parsed_job) # Output jobs if something other than time_snapshot changed if is_last_tick or prev_jobs.get(parsed_job.job_id) != job_state_hash: }) scheduler_sim_jobs.append(parsed_job) curr_jobs[parsed_job.job_id] = job_state_hash prev_jobs = curr_jobs curr_job_hashes.add(job_state_hash) prev_job_hashes = curr_job_hashes power_history: list[SchedulerSimJobPowerHistory] = [] for job in tick_jobs: Loading Loading
pyproject.toml +1 −0 Original line number Diff line number Diff line Loading @@ -29,6 +29,7 @@ dependencies = [ "elasticsearch==7.13.4", "elasticsearch-dbapi==0.2.11", "requests==2.32.5", "orjson==3.11.3", "raps@{root:uri}/raps", ] Loading
simulation_server/simulation/simulation.py +38 −26 Original line number Diff line number Diff line from typing import NamedTuple from datetime import datetime, timedelta import functools import functools, itertools import orjson from loguru import logger from raps import Engine from raps.job import Job as RapsJob from raps.stats import get_engine_stats, get_job_stats from ..models.sim import ServerSimConfig from ..models.output import ( Loading @@ -23,9 +25,20 @@ class SimTickOutput(NamedTuple): power_history: list[SchedulerSimJobPowerHistory] def get_job_state_hash(job: SchedulerSimJob): def get_job_state_hash(job: RapsJob): """ Return string that can be used to check if any meaningful state changed """ return job.model_dump_json(exclude={"time_snapshot"}) return orjson.dumps([ str(job.id), job.name, job.nodes_required, job.submit_time, job.time_limit, job.start_time, job.end_time, job.current_state.name, # Node list shouldn't change once set so just do len instead of serializing the large list len(job.scheduled_nodes) if job.scheduled_nodes else None, ]) def run_simulation(sim_config: ServerSimConfig): Loading @@ -50,7 +63,7 @@ def run_simulation(sim_config: ServerSimConfig): # Keep record of how many power history steps we've emitted for each job power_history_counts: dict[int, int] = {} prev_jobs: dict[str, str] = {} prev_job_hashes: set[str] = set() for tick in engine.run_simulation(): timestamp: datetime = _offset_to_time(tick.current_timestep) Loading Loading @@ -89,36 +102,35 @@ def run_simulation(sim_config: ServerSimConfig): ))] scheduler_sim_jobs: list[SchedulerSimJob] = [] curr_jobs = {} tick_jobs = tick.queue + tick.running + tick.completed + tick.killed curr_job_hashes = set() tick_jobs = itertools.chain(tick.queue, tick.running, tick.completed, tick.killed) for job in tick_jobs: time_end = _offset_to_time(job.end_time) # end_time is set to its planned end once its scheduled. Set it to None for unfinished jobs here if time_end is not None and (job.start_time is None or time_end > timestamp): time_end = None parsed_job = SchedulerSimJob.model_validate(dict( job_id = str(job.id), name = job.name, node_count = job.nodes_required, time_snapshot = timestamp, time_submission = _offset_to_time(job.submit_time), time_limit = job.time_limit, time_start = _offset_to_time(job.start_time), time_end = time_end, state_current = JobStateEnum(job.current_state.name), nodes = _parse_nodes(tuple(job.scheduled_nodes)) if job.scheduled_nodes else None, job_state_hash = get_job_state_hash(job) # Output jobs if something other than time_snapshot changed if is_last_tick or job_state_hash not in prev_job_hashes: parsed_job = SchedulerSimJob.model_validate({ "job_id": str(job.id), "name": job.name, "node_count": job.nodes_required, "time_snapshot": timestamp, "time_submission": _offset_to_time(job.submit_time), "time_limit": job.time_limit, "time_start": _offset_to_time(job.start_time), "time_end": time_end, "state_current": JobStateEnum(job.current_state.name), "nodes": _parse_nodes(tuple(job.scheduled_nodes)) if job.scheduled_nodes else None, # How does the new job.power attribute work? Is it total_energy? # Or just the current wattage? # power = job.power, )) job_state_hash = get_job_state_hash(parsed_job) # Output jobs if something other than time_snapshot changed if is_last_tick or prev_jobs.get(parsed_job.job_id) != job_state_hash: }) scheduler_sim_jobs.append(parsed_job) curr_jobs[parsed_job.job_id] = job_state_hash prev_jobs = curr_jobs curr_job_hashes.add(job_state_hash) prev_job_hashes = curr_job_hashes power_history: list[SchedulerSimJobPowerHistory] = [] for job in tick_jobs: Loading