Commit e7a6d22e authored by Maiterth, Matthias
Browse files

Merge branch 'refactor-from-sim-config' into 'develop'

Simplify Engine creation

See merge request !120
parents 238c4aba b627c166
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -155,8 +155,8 @@ class ThermoFluidsModel:
        # If replay mode is on and weather data is available
        if self.weather and self.weather.has_coords:
            # Convert total seconds to timedelta object
            delta = timedelta(seconds=engine.current_timestep)
            target_datetime = self.weather.start + delta
            delta = timedelta(seconds=engine.current_timestep - engine.timestep_start)
            target_datetime = engine.start + delta

            # Get temperature from weather data
            temperature = self.weather.get_temperature(target_datetime) or self.config['WET_BULB_TEMP']
+107 −114
Original line number Diff line number Diff line
@@ -15,6 +15,7 @@ from raps.policy import PolicyType
from raps.utils import (
    summarize_ranges,
    get_current_utilization,
    WorkloadData,
)
from raps.resmgr import ResourceManager
from raps.schedulers import load_scheduler
@@ -39,7 +40,6 @@ from raps.account import Accounts
from raps.downtime import Downtime
from raps.weather import Weather
from raps.sim_config import SimConfig
from raps.system_config import SystemConfig
from bisect import bisect_right


@@ -125,94 +125,7 @@ def keyboard_listener(state):
class Engine:
    """Job scheduling simulation engine."""

    def __init__(self, *,
                 power_manager: PowerManager,
                 flops_manager: FLOPSManager,
                 telemetry: Telemetry,
                 cooling_model: ThermoFluidsModel | None = None,
                 jobs=None,
                 total_initial_jobs=0,
                 # Workload class to generate from for continuous generation
                 continuous_workload: Workload | None = None,
                 accounts=None,
                 sim_config: SimConfig,
                 system_config: SystemConfig,
                 ):
        self.config = system_config.get_legacy()
        self.down_nodes = summarize_ranges(self.config['DOWN_NODES'])
        self.resource_manager = ResourceManager(
            total_nodes=self.config['TOTAL_NODES'],
            down_nodes=self.config['DOWN_NODES'],
            config=self.config
        )
        # Initialize running and queue, etc.
        self.running = []
        self.queue = []
        self.accounts = accounts
        self.telemetry = telemetry
        self.job_history_dict = []
        self.jobs_completed = 0
        self.jobs_killed = 0
        self.total_initial_jobs = total_initial_jobs
        self.current_timestep = 0
        self.cooling_model = cooling_model
        self.sys_power = 0
        self.power_manager = power_manager
        self.flops_manager = flops_manager
        self.debug = sim_config.debug
        self.continuous_workload = continuous_workload
        self.replay = sim_config.replay
        self.downscale = sim_config.downscale  # Factor to downscale the 1s timesteps (power of 10)
        self.simulate_network = sim_config.simulate_network
        self.sys_util_history = []
        self.scheduler_queue_history = []
        self.scheduler_running_history = []
        self.avg_net_tx = []
        self.avg_net_rx = []
        self.net_util_history = []
        self.avg_slowdown_history = []
        self.max_slowdown_history = []
        self.node_occupancy_history = []
        self.downtime = Downtime(first_downtime=sim_config.downtime_first_int,
                                 downtime_interval=sim_config.downtime_interval_int,
                                 downtime_length=sim_config.downtime_length_int,
                                 debug=sim_config.debug,
                                 )

        # Set scheduler type - either based on config or command-line args - defaults to 'default'
        if self.config['multitenant']:
            scheduler_type = 'multitenant'
        else:
            scheduler_type = sim_config.scheduler

        policy_type = sim_config.policy
        backfill_type = sim_config.backfill

        self.scheduler = load_scheduler(scheduler_type)(
            config=self.config,
            policy=policy_type,
            bfpolicy=backfill_type,
            resource_manager=self.resource_manager,
            jobs=jobs
        )
        if sim_config.live:
            assert self.scheduler.policy != PolicyType.REPLAY, \
                "Cannot replay from a live system. Choose a scheduling policy!"
        print(f"Using scheduler: {str(self.scheduler.__class__).split('.')[2]}"
              f", with policy {self.scheduler.policy} "
              f"and backfill {self.scheduler.bfpolicy}")

        if self.simulate_network:
            available_nodes = self.resource_manager.available_nodes
            self.network_model = NetworkModel(
                available_nodes=available_nodes,
                config=self.config,
            )
        else:
            self.network_model = None

    @staticmethod
    def from_sim_config(sim_config: SimConfig, partition: str | None = None):
    def __init__(self, sim_config: SimConfig, partition: str | None = None):
        if partition:
            system_config = sim_config.get_system_config_by_name(partition)
        elif len(sim_config.system_configs) > 1:
@@ -234,12 +147,12 @@ class Engine:
            np.random.seed(sim_config.seed + 1)

        if sim_config.live and not sim_config.replay:
            td = Telemetry(**sim_config_dict)
            wd = td.load_from_live_system()
            telemetry = Telemetry(**sim_config_dict)
            wd = telemetry.load_from_live_system()
        elif sim_config.replay:
            # TODO: this will have issues if running separate systems or custom systems
            partition_short = partition.split("/")[-1] if partition else None
            td = Telemetry(
            telemetry = Telemetry(
                **sim_config_dict,
                partition=partition,
            )
@@ -254,15 +167,18 @@ class Engine:
            else:
                replay_files = sim_config.replay

            wd = td.load_from_files(replay_files)
            wd = telemetry.load_from_files(replay_files)
        else:  # Synthetic jobs
            wl = Workload(sim_config_args, system_config_dict)
            wd = wl.generate_jobs()
            td = Telemetry(**sim_config_dict)
            telemetry = Telemetry(**sim_config_dict)

        jobs = wd.jobs
        if len(jobs) == 0:
            print(f"Warning no jobs found for {partition or 'system'}")
        if partition and len(sim_config.system_configs) > 1:
            for job in jobs:
                job.partition = partition

        if sim_config.start:
            start = sim_config.start
@@ -319,19 +235,94 @@ class Engine:
            else:
                accounts = job_accounts

        engine = Engine(
            power_manager=power_manager,
            flops_manager=flops_manager,
            cooling_model=cooling_model,
            continuous_workload=continuous_workload,
            jobs=jobs,
            accounts=accounts,
            telemetry=td,
            sim_config=sim_config,
            system_config=system_config,
        self.sim_config = sim_config
        self.system_config = system_config
        self.config = system_config.get_legacy()

        self.start = start
        self.timestep_start = wd.telemetry_start
        self.timestep_end = wd.telemetry_end
        self.time_delta = time_delta

        self.down_nodes = summarize_ranges(self.config['DOWN_NODES'])
        self.resource_manager = ResourceManager(
            total_nodes=self.config['TOTAL_NODES'],
            down_nodes=self.config['DOWN_NODES'],
            config=self.config
        )
        # Initialize running and queue, etc.
        self.running = []
        self.queue = []
        self.accounts = accounts
        self.telemetry = telemetry
        self.job_history_dict = []
        self.jobs_completed = 0
        self.jobs_killed = 0
        self.jobs = jobs
        self.total_initial_jobs = len(jobs)
        self.current_timestep = 0
        self.cooling_model = cooling_model
        self.sys_power = 0
        self.power_manager = power_manager
        self.flops_manager = flops_manager
        self.debug = sim_config.debug
        self.continuous_workload = continuous_workload
        self.replay = sim_config.replay
        self.downscale = sim_config.downscale  # Factor to downscale the 1s timesteps (power of 10)
        self.simulate_network = sim_config.simulate_network
        self.sys_util_history = []
        self.scheduler_queue_history = []
        self.scheduler_running_history = []
        self.avg_net_tx = []
        self.avg_net_rx = []
        self.net_util_history = []
        self.avg_slowdown_history = []
        self.max_slowdown_history = []
        self.node_occupancy_history = []
        self.downtime = Downtime(first_downtime=sim_config.downtime_first_int,
                                 downtime_interval=sim_config.downtime_interval_int,
                                 downtime_length=sim_config.downtime_length_int,
                                 debug=sim_config.debug,
                                 )

        # Set scheduler type - either based on config or command-line args - defaults to 'default'
        if self.config['multitenant']:
            scheduler_type = 'multitenant'
        else:
            scheduler_type = sim_config.scheduler

        policy_type = sim_config.policy
        backfill_type = sim_config.backfill

        self.scheduler = load_scheduler(scheduler_type)(
            config=self.config,
            policy=policy_type,
            bfpolicy=backfill_type,
            resource_manager=self.resource_manager,
            jobs=jobs
        )
        if sim_config.live:
            assert self.scheduler.policy != PolicyType.REPLAY, \
                "Cannot replay from a live system. Choose a scheduling policy!"
        print(f"Using scheduler: {str(self.scheduler.__class__).split('.')[2]}"
              f", with policy {self.scheduler.policy} "
              f"and backfill {self.scheduler.bfpolicy}")

        if self.simulate_network:
            available_nodes = self.resource_manager.available_nodes
            self.network_model = NetworkModel(
                available_nodes=available_nodes,
                config=self.config,
            )
        else:
            self.network_model = None

        return engine, wd, time_delta
    def get_workload_data(self) -> WorkloadData:
        """Package this engine's jobs and time bounds as a ``WorkloadData``.

        The jobs are passed as a full slice of ``self.jobs`` (a shallow copy
        when ``self.jobs`` is a list), so the returned container does not
        alias the engine's own job sequence; the job objects are shared.

        Returns:
            WorkloadData built from ``self.jobs``, ``self.timestep_start``,
            ``self.timestep_end`` and ``self.start``.
        """
        jobs_snapshot = self.jobs[:]
        first_timestep = self.timestep_start
        last_timestep = self.timestep_end
        workload = WorkloadData(
            jobs=jobs_snapshot,
            telemetry_start=first_timestep,
            telemetry_end=last_timestep,
            start_date=self.start,
        )
        return workload

    def add_running_jobs_to_queue(self, jobs_to_submit: List):
        """
@@ -713,7 +704,7 @@ class Engine:
        self.scheduler.policy = target_policy
        self.scheduler.bfpolicy = target_bfpolicy

    def run_simulation(self, jobs, timestep_start, timestep_end, time_delta=1, autoshutdown=False):
    def run_simulation(self, autoshutdown=False):
        """Generator that yields after each simulation tick."""

        if self.scheduler.policy == PolicyType.REPLAY:
@@ -722,24 +713,26 @@ class Engine:
            replay = False

        if self.debug:
            print(f"[DEBUG] run_simulation: Initial jobs count: {len(jobs)}")
            if jobs:
            print(f"[DEBUG] run_simulation: Initial jobs count: {len(self.jobs)}")
            if self.jobs:
                print("[DEBUG] run_simulation: First job submit_time: "
                      f"{jobs[0].submit_time}, start_time: {jobs[0].start_time}")
                      f"{self.jobs[0].submit_time}, start_time: {self.jobs[0].start_time}")

        # Set times and place jobs that are currently running, onto the system.
        self.prepare_system_state(all_jobs=jobs, timestep_start=timestep_start, timestep_end=timestep_end)
        self.prepare_system_state(all_jobs=self.jobs,
                                  timestep_start=self.timestep_start, timestep_end=self.timestep_end,
                                  )

        # Process jobs in batches for better performance of timestep loop
        all_jobs = jobs.copy()
        all_jobs = self.jobs.copy()
        submit_times = [j.submit_time for j in all_jobs]
        cursor = 0

        jobs = []
        # Batch Jobs into 6h windows based on submit_time or twice the time_delta if larger
        batch_window = max(60 * 60 * 6, 2 * time_delta)  # at least 6h
        batch_window = max(60 * 60 * 6, 2 * self.time_delta)  # at least 6h

        sim_state = SimulationState(time_delta)
        sim_state = SimulationState(self.time_delta)
        # listener_thread = threading.Thread(target=keyboard_listener, args=(sim_state,), daemon=True)
        # listener_thread.start()

@@ -751,7 +744,7 @@ class Engine:

            current_time_delta = sim_state.get_time_delta()

            if (self.current_timestep % batch_window == 0) or (self.current_timestep == timestep_start):
            if (self.current_timestep % batch_window == 0) or (self.current_timestep == self.timestep_start):
                # Add jobs that are within the batching window and remove them from all jobs
                # jobs += [job for job in all_jobs if job.submit_time <= self.current_timestep + batch_window]
                # all_jobs[:] = [job for job in all_jobs if job.submit_time > self.current_timestep + batch_window]
+3 −5
Original line number Diff line number Diff line
@@ -59,12 +59,10 @@ class RAPSEnv(gym.Env):
        self.action_space = spaces.Discrete(max_jobs)

    def _create_engine(self):
        engine, workload_data, time_delta = Engine.from_sim_config(self.sim_config)
        engine = Engine(self.sim_config)
        engine.scheduler.env = self
        self.jobs = workload_data.jobs
        timestep_start = workload_data.telemetry_start
        timestep_end = workload_data.telemetry_end
        self.generator = engine.run_simulation(self.jobs, timestep_start, timestep_end, time_delta)
        self.jobs = engine.jobs
        self.generator = engine.run_simulation()
        return engine

    def reset(self, **kwargs):
+13 −33
Original line number Diff line number Diff line
from collections.abc import Iterable
from raps.engine import Engine, TickData
from raps.sim_config import MultiPartSimConfig
from raps.utils import WorkloadData


class MultiPartEngine:
    def __init__(self, engines: dict[str, Engine], jobs: dict[str, list]):
        self.partition_names = sorted(engines.keys())
        self.engines = engines
        self.jobs = jobs

    @staticmethod
    def from_sim_config(sim_config: MultiPartSimConfig):
    def __init__(self, sim_config: MultiPartSimConfig):
        if sim_config.replay:
            root_systems = set(s.system_name.split("/")[0] for s in sim_config.system_configs)
            # TODO should consider how to pass separate replay values for separate systems
            if len(root_systems) > 1:
                raise ValueError("Replay for multi-system runs is not supported")

        workloads_by_partition: dict[str, WorkloadData] = {}
        engines: dict[str, Engine] = {}

        time_delta = 0
        for partition in sim_config.system_configs:
            name = partition.system_name
            engine, workload_data, time_delta = Engine.from_sim_config(
                sim_config, partition=name,
            )
            for job in workload_data.jobs:
                job.partition = name
            workloads_by_partition[name] = workload_data
            engines[name] = engine
        timestep_start = min(w.telemetry_start for w in workloads_by_partition.values())
        timestep_end = min(w.telemetry_end for w in workloads_by_partition.values())

        total_initial_jobs = sum(len(j.jobs) for j in workloads_by_partition.values())
            engine = Engine(sim_config, partition=partition.system_name)
            engines[partition.system_name] = engine

        total_initial_jobs = sum(len(e.jobs) for e in engines.values())
        for engine in engines.values():
            engine.total_initial_jobs = total_initial_jobs

        multi_engine = MultiPartEngine(
            engines=engines,
            jobs={p: w.jobs for p, w in workloads_by_partition.items()},
        )

        return multi_engine, workloads_by_partition, timestep_start, timestep_end, time_delta
        self.partition_names = sorted(engines.keys())
        self.engines = engines
        first_engine = list(engines.values())[0]
        self.start = first_engine.start
        self.timestep_start = first_engine.timestep_start
        self.timestep_end = first_engine.timestep_end

    def run_simulation(self, jobs: dict, timestep_start, timestep_end, time_delta=1
                       ) -> Iterable[dict[str, TickData | None]]:
    def run_simulation(self) -> Iterable[dict[str, TickData | None]]:
        generators = []
        for part in self.partition_names:
            generators.append(self.engines[part].run_simulation(
                jobs[part], timestep_start, timestep_end, time_delta,
            ))
            generators.append(self.engines[part].run_simulation())
        for tick_datas in zip(*generators, strict=True):
            yield dict(zip(self.partition_names, tick_datas))

+9 −15
Original line number Diff line number Diff line
@@ -49,37 +49,34 @@ def run_sim(sim_config: SingleSimConfig):
        print("Use run-parts to run multi-partition simulations")
        sys.exit(1)

    engine, workload_data, time_delta = Engine.from_sim_config(sim_config)
    engine = Engine(sim_config)

    out = sim_config.get_output()
    if out:
        out.mkdir(parents=True)
        engine.telemetry.save_snapshot(
            dest=str(out / 'snapshot.npz'),
            result=workload_data,
            result=engine.get_workload_data(),
            args=sim_config,
        )
        (out / 'sim_config.yaml').write_text(sim_config.dump_yaml())

    jobs = workload_data.jobs
    timestep_start, timestep_end = workload_data.telemetry_start, workload_data.telemetry_end
    jobs = engine.jobs
    timestep_start, timestep_end = engine.timestep_start, engine.timestep_end
    total_timesteps = timestep_end - timestep_start

    downscale = sim_config.downscale
    downscale_str = ""if downscale == 1 else f"/{downscale}"
    print(f"Simulating {len(jobs)} jobs for {total_timesteps}{downscale_str}"
          f" seconds from {timestep_start} to {timestep_end}.")
    print(f"Simulation time delta: {time_delta}{downscale_str} s,"
    print(f"Simulation time delta: {engine.time_delta}{downscale_str} s,"
          f"Telemetry trace quanta: {jobs[0].trace_quanta}{downscale_str} s.")
    layout_manager = LayoutManager(
        sim_config.layout, engine=engine,
        debug=sim_config.debug, total_timesteps=total_timesteps,
        args_dict=sim_config.get_legacy_args_dict(), **sim_config.system_configs[0].get_legacy(),
    )
    layout_manager.run(
        jobs,
        timestep_start=timestep_start, timestep_end=timestep_end, time_delta=time_delta,
    )
    layout_manager.run()

    engine_stats = get_engine_stats(engine)
    job_stats = get_job_stats(engine)
@@ -223,8 +220,7 @@ def run_parts_sim(sim_config: MultiPartSimConfig):
            UserWarning
        )

    multi_engine, workload_results, timestep_start, timestep_end, time_delta = \
        MultiPartEngine.from_sim_config(sim_config)
    multi_engine = MultiPartEngine(sim_config)

    out = sim_config.get_output()
    if out:
@@ -232,15 +228,13 @@ def run_parts_sim(sim_config: MultiPartSimConfig):
        for part, engine in multi_engine.engines.items():
            engine.telemetry.save_snapshot(
                dest=str(out / part.split('/')[-1]),
                result=workload_results[part],
                result=engine.get_workload_data(),
                args=sim_config,
            )
        (out / 'sim_config.yaml').write_text(sim_config.dump_yaml())

    jobs = {p: w.jobs for p, w in workload_results.items()}

    ui_update_freq = sim_config.system_configs[0].scheduler.ui_update_freq
    gen = multi_engine.run_simulation(jobs, timestep_start, timestep_end, time_delta)
    gen = multi_engine.run_simulation()

    for tick_datas in gen:
        sys_power = 0
Loading