Commit f2b77f8f authored by Maiterth, Matthias's avatar Maiterth, Matthias
Browse files

Large Process toward live system simulation.

parent 58231a00
Loading
Loading
Loading
Loading
+11 −2
Original line number Diff line number Diff line
@@ -83,7 +83,16 @@ def main():
    args_dict['config'] = config
    flops_manager = FLOPSManager(**args_dict)

    if args.replay:
    if args.live and not args.replay:
        assert args.time is not None, {"--time must be set, specifing how long we want to predict"}
        td = Telemetry(**args_dict)
        jobs, timestep_start, timestep_end = \
            td.load_jobs_times_args_from_live_system()
        if args.output is not None:
            td.save_snapshot(jobs=jobs, timestep_start=timestep_start,
                             timestep_end=timestep_end, args=args, filename=td.dirname)

    elif args.replay:

        td = Telemetry(**args_dict)
        jobs, timestep_start, timestep_end, args_from_file = \
@@ -113,7 +122,7 @@ def main():
                         timestep_end=timestep_end, args=args, filename=td.dirname)

    if args.fastforward is not None:
        timestep_start = args.fastforward
        timestep_start = timestep_start + args.fastforward

    if args.time is not None:
        timestep_end = timestep_start + args.time
+3 −0
Original line number Diff line number Diff line
@@ -156,6 +156,9 @@ parser.add_argument("--jid", type=str, default="*",
parser.add_argument("--scale", type=int, default=0,
                    help=("Scale telemetry to a smaller target system, "
                          "e.g., --scale 192"))
parser.add_argument("--live", action="store_true",
                    help="Grab data from live system.")


# Synthetic workloads
parser = add_workload_to_parser(parser)
+2 −2
Original line number Diff line number Diff line
@@ -157,7 +157,7 @@ class ThermoFluidsModel:
            self.weather and self.weather.start is not None and \
                self.weather.has_coords:
            # Convert total seconds to timedelta object
            delta = timedelta(seconds=engine.current_time)
            delta = timedelta(seconds=engine.current_timestep)
            target_datetime = self.weather.start + delta

            # Get temperature from weather data
@@ -332,7 +332,7 @@ class ThermoFluidsModel:
        # FMU inputs are N powers and the wetbulb temp
        fmu_inputs = self.generate_fmu_inputs(runtime_values,
                                              uncertainties=engine.power_manager.uncertainties)
        cooling_inputs, cooling_outputs = self.step(engine.current_time,
        cooling_inputs, cooling_outputs = self.step(engine.current_timestep,
                                                    fmu_inputs,
                                                    engine.config['POWER_UPDATE_FREQ'])
        return cooling_inputs, cooling_outputs
+3 −3
Original line number Diff line number Diff line
@@ -81,14 +81,12 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):

    # Map dataframe to job state. Add results to jobs list
    for jidx in tqdm(range(num_jobs - 1), total=num_jobs, desc="Processing Jobs"):

        job_id = jobs_df.loc[jidx, 'job_id']
        if not jid == '*':
            if int(jid) == int(job_id):
                print(f'Extracting {job_id} profile')
            else:
                continue

        nodes_required = jobs_df.loc[jidx, 'num_nodes_alloc']
        name = str(uuid.uuid4())[:6]
        account = jobs_df.loc[jidx, 'user_id']
@@ -193,10 +191,12 @@ def load_data_from_df(jobs_df: pd.DataFrame, **kwargs):
                                time_limit=time_limit,
                                start_time=start_time,
                                end_time=end_time,
                                wall_time=wall_time,
                                expected_run_time=wall_time,
                                current_run_time=0,
                                trace_time=trace_time,
                                trace_start_time=trace_start_time,
                                trace_end_time=trace_end_time,
                                trace_quanta=None,
                                trace_missing_values=True
                                )
            job = Job(job_info)
+7 −5
Original line number Diff line number Diff line
@@ -220,10 +220,11 @@ def load_data(local_dataset_path, **kwargs):
            id=jid,
            priority=0,
            submit_time=sub,
            time_limit=0,
            time_limit=int(rec.get("wall_time")),
            start_time=st,
            end_time=et,
            wall_time=duration,
            expected_run_time=duration,
            current_run_time=0,
            trace_time=sub,
            trace_start_time=st,
            trace_end_time=et,
@@ -268,7 +269,7 @@ def load_data(local_dataset_path, **kwargs):
    bin_s = config.get("TRACE_QUANTA")
    jobs = []

    for r in jobs_raw:
    for r in jobs_raw:  # Is this intended? We go throught the 'raw' jobs_dicts that were creeated above?
        st_abs = int(r["start_time"])
        et_abs = int(r["end_time"])
        nodes = r.get("scheduled_nodes") or []
@@ -300,10 +301,11 @@ def load_data(local_dataset_path, **kwargs):
            id=jid,
            priority=0,
            submit_time=int(r["submit_time"]),
            time_limit=0,
            time_limit=int(r["time_limit"]),
            start_time=st_abs,
            end_time=et_abs,
            wall_time=et_abs - st_abs,
            expected_run_time=et_abs - st_abs,
            current_run_time=0,
            trace_time=st_abs,
            trace_start_time=st_abs,
            trace_end_time=st_abs + samples * bin_s,
Loading