Commit 4e032a38 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Get initial version of telemetry working for multi-partition systems

parent 38895605
Loading
Loading
Loading
Loading
+1 −1
Original line number Diff line number Diff line
@@ -90,7 +90,7 @@ if args.replay:
        if args.scale:
            for job in tqdm(jobs, desc=f"Scaling jobs to {args.scale} nodes"):
                job['nodes_required'] = random.randint(1, args.scale)
                args.reschedule = True
                job['requested_nodes'] = None # Setting to None triggers scheduler to assign nodes

        if args.reschedule:
            print("available nodes:", config['AVAILABLE_NODES'])
+27 −6
Original line number Diff line number Diff line
@@ -11,27 +11,48 @@ from raps.ui import LayoutManager
from raps.scheduler import Scheduler
from raps.flops import FLOPSManager
from raps.power import PowerManager, compute_node_power
from raps.telemetry import Telemetry
from raps.workload import Workload
from raps.utils import convert_to_seconds
from tqdm import tqdm

# Load configurations for each partition
partition_names = args.partitions
configs = [ConfigManager(system_name=partition).get_config() for partition in partition_names]
args_dicts = [{**vars(args), 'config': config} for config in configs]

# Initialize Workload with all configurations
# Initialize Workload
if args.replay:

    td = Telemetry(**args_dict)
    # Currently this assumes that an .npz file has already been created 
    # e.g., python main.py --system marconi100 -f ~/data/marconi100/job_table.parquet
    td = Telemetry(**args_dicts[0])
    print(f"Loading {args.replay[0]}...")
    jobs = td.load_snapshot(args.replay[0])

else:
    # Randomly assign partition
    for job in jobs: 
        job['partition'] = random.choice(partition_names)

    if args.scale:
        for job in tqdm(jobs, desc=f"Scaling jobs to {args.scale} nodes"):
            job['nodes_required'] = random.randint(1, args.scale)
            job['requested_nodes'] = None # Setting to None triggers scheduler to assign nodes

    if args.reschedule:
        print("available nodes:", config['AVAILABLE_NODES'])
        for job in tqdm(jobs, desc="Rescheduling jobs"):
            job['requested_nodes'] = None
            job['submit_time'] = next_arrival(1 / configs[0]['JOB_ARRIVAL_TIME'])

else: # Synthetic workload
    wl = Workload(*configs)

    # Generate jobs based on workload type
    jobs = getattr(wl, args.workload)(num_jobs=args.numjobs)

# Group jobs by partition
jobs_by_partition = {partition: [] for partition in wl.partitions}
jobs_by_partition = {partition: [] for partition in partition_names}
for job in jobs:
    jobs_by_partition[job['partition']].append(job)

+1 −1
Original line number Diff line number Diff line
@@ -43,7 +43,7 @@ class Telemetry:
        try:
            self.dataloader = importlib.import_module(f".dataloaders.{self.system}", package = __package__)
        except:
            print("WARNIGNG: Failed to load dataloader")
            print("WARNING: Failed to load dataloader")


    def save_snapshot(self, jobs: list, filename: str):