Commit b0a216ea authored by Hines, Jesse's avatar Hines, Jesse
Browse files

Update simulation driver script

parent 6078902e
Loading
Loading
Loading
Loading
+7 −93
Original line number Diff line number Diff line
from __future__ import annotations
from typing import Optional, Literal, Annotated as A, get_args
from datetime import timedelta, datetime, timezone
import json, math
from pydantic import AwareDatetime, model_validator, field_validator, Field
import json
from pydantic import AwareDatetime, Field
from raps import SingleSimConfig
from raps.utils import AutoAwareDatetime

from .base import BaseModel
from .job_state import JobStateEnum
from ..util.misc import omit
from ..util.api_queries import filter_params, sort_params

@@ -99,92 +99,6 @@ SIM_SORT = sort_params(omit(SIM_API_FIELDS, ['progress', 'progress_date', 'confi
])


class SimConfig(BaseModel):
    start: AwareDatetime
    end: AwareDatetime

    system: SimSystem
    scheduler: A[SchedulerSimConfig, Field(default_factory = lambda: SchedulerSimConfig())]
    cooling: A[CoolingSimConfig, Field(default_factory = lambda: CoolingSimConfig())]

    @model_validator(mode='after')
    def validate_model(self):
        if self.end <= self.start:
            raise ValueError("Start must be less than end")

        if not any(m.enabled for m in [self.scheduler, self.cooling]):
            raise ValueError("Must enable one simulation")
        if self.cooling.enabled and not self.scheduler.enabled:
            raise ValueError("Currently can't run cooling simulation without the scheduler")
        return self

    @field_validator("start", mode="after")
    @classmethod
    def trunc_start(cls, v: datetime, info):
        return v.fromtimestamp(math.floor(v.timestamp()), tz=timezone.utc)

    @field_validator("end", mode="after")
    @classmethod
    def trunc_end(cls, v: datetime, info):
        return v.fromtimestamp(math.ceil(v.timestamp()), tz=timezone.utc)


class SchedulerSimConfig(BaseModel):
    """
    Config for RAPS job simulation.
    There are 3 main "modes" for how to run the jobs.
    - replay: Replay data based on the real jobs run on Frontier during start/end
    - custom: Pass your own set of jobs to submit in the simulation in `jobs`
    - random: Run random jobs. You can pass `seed` and `num_jobs` to customize it.
    """

    enabled: bool = False
    down_nodes: list[int] = [] # List of hostnames. TODO: allow parsing from xnames
    
    jobs_mode: Literal['replay', 'custom', 'random', 'test'] = 'random'
    schedule_policy: Literal['fcfs', 'sjf', 'prq'] ='fcfs'
    """"
    Policy to use when scheduling jobs.
    Replay mode will ignore this and use the real time jobs were scheduled unless you also set
    reschedule to true.
    """
    reschedule: bool = False
    """ If true, will apply schedule_policy in replay mode """

    jobs: Optional[list[SchedulerSimCustomJob]] = None
    """
    The list of jobs.
    Only applicable if jobs_mode is "custom"
    """

    seed: Optional[int] = None
    """
    Random seed for consistent random job generation.
    Only applicable if jobs_mode is "random"
    """
    num_jobs: Optional[int] = None
    """
    Number of random jobs to generate.
    Only applicable if jobs_mode is "random"
    """


class SchedulerSimCustomJob(BaseModel):
    # This is mostly a subset of the SchedulerSimJob
    name: str
    allocation_nodes: int
    """ Number of nodes required """
    time_submission: AwareDatetime
    time_limit: timedelta

    cpu_util: float
    gpu_util: float
    cpu_trace: list[float]
    gpu_trace: list[float]

    end_state: JobStateEnum
    """ Slurm state job will end in """


class CoolingSimConfig(BaseModel):
    enabled: bool = False
class ServerSimConfig(SingleSimConfig):
    start: AutoAwareDatetime  # make start required
    """ Start of the simulation """
+39 −46
Original line number Diff line number Diff line
""" A script to run the ExaDigiT simulation """
from typing import Callable
import argparse, os, json
from pathlib import Path
from datetime import datetime, timezone
from loguru import logger
import yaml
from ..models.sim import Sim, SimConfig
from ..models.sim import Sim, ServerSimConfig
from .simulation import run_simulation
from ..util.kafka import get_kafka_producer


def cli_run(config: SimConfig):
    for i, data in enumerate(run_simulation(config)):
        print(f"TICK {i}")


def background_job(sim: Sim):
def write_sim(sim: Sim, writer: Callable[[str, bytes], None]):
    sim = sim.model_copy()
    kafka_producer = get_kafka_producer()

    def output_rows(topic, rows):
        for row in rows:
            value = json.dumps({"sim_id": sim.id, **row.model_dump(mode='json')}).encode()
            kafka_producer.send(topic=topic, value=value)
            writer(topic, value)

    logger.info(f"Starting simulation {sim.model_dump_json()}")
    config = SimConfig.model_validate(sim.config)
    config = ServerSimConfig.model_validate(sim.config)
    progress_date = sim.start

    try:
@@ -40,59 +35,57 @@ def background_job(sim: Sim):
        sim.execution_end = datetime.now(timezone.utc)
        sim.error_messages = str(e)
        sim.progress_date = progress_date
        kafka_producer.send("svc-event-exadigit-sim", value = sim.serialize_for_druid())
        kafka_producer.close() # Close and wait for messages to be sent
        writer("svc-event-exadigit-sim", sim.serialize_for_druid())
        logger.info(f"Simulation {sim.id} failed")
        raise e
    
    sim.state = "success"
    sim.execution_end = datetime.now(timezone.utc)
    sim.progress_date = sim.end
    kafka_producer.send(topic = "svc-event-exadigit-sim", value = sim.serialize_for_druid())
    kafka_producer.close() # Close and wait for messages to be sent
    writer("svc-event-exadigit-sim", sim.serialize_for_druid())
    logger.info(f"Simulation {sim.id} finished")


def write_sim_to_kafka(sim: Sim):
    kafka_producer = get_kafka_producer()
    def writer(topic: str, value: bytes):
        kafka_producer.send(topic=topic, value=value)
    try:
        write_sim(sim, writer=writer)
    finally:
        kafka_producer.close()


def write_sim_to_disk(sim: Sim, dest: str):
    Path(dest).mkdir(exist_ok=True)
    def writer(topic: str, value: bytes):
        with open(Path(dest) / f"{topic}.jsonl", 'ab') as f:
            f.write(value + b"\n")
    write_sim(sim, writer=writer)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description = __doc__.strip(),
        allow_abbrev = False,
        formatter_class = argparse.RawDescriptionHelpFormatter,
    )
    subparsers = parser.add_subparsers(required=True, dest="action")

    parser_cli_run = subparsers.add_parser('run')
    parser_cli_run.add_argument("--config", type=str, help="JSON config string")
    parser_cli_run.add_argument("--config-file", type=Path, help="Path to a yaml or json file contain the config")

    parser_cli_run = subparsers.add_parser('background-job')
    parser_cli_run.add_argument("--sim", type=str, help="JSON config string")
    parser.add_argument("--sim", type=str, help="Sim json")
    parser.add_argument("--dest", default=None)

    args = parser.parse_args()


    if args.action == "run":
        if args.config and args.config_file:
            raise Exception("You can only specify either config or config-file")
        
        if args.config:
            config = yaml.safe_load(args.config)
        elif args.config_file:
            config = yaml.safe_load(args.config_file.read_text())
        elif "SIM_CONFIG" in os.environ:
            config = yaml.safe_load(os.environ["SIM_CONFIG"])
        elif "SIM_CONFIG_FILE" in os.environ:
            config = yaml.safe_load(Path(os.environ["SIM_CONFIG_FILE"]).read_text())
    if args.sim:
        sim = args.sim
    elif os.environ.get("SIM"):
        sim = os.environ["SIM"]
    else:
        raise Exception("No configuration passed")
        config = SimConfig.model_validate(config)
        cli_run(config)
    elif args.action == "background-job":
        if args.sim:
            sim = yaml.safe_load(args.sim)
        elif "SIM" in os.environ:
            sim = yaml.safe_load(os.environ["SIM"])

    sim = Sim.model_validate(yaml.safe_load(sim))

    if args.dest:
        write_sim_to_disk(sim, args.dest)
    else:
            raise Exception("No sim passed")
        sim = Sim.model_validate(sim)
        background_job(sim)
        write_sim_to_kafka(sim)