Commit e20b9e78 authored by Brewer, Wes's avatar Brewer, Wes
Browse files

Add initial wfbench capability

parent 34dd1f2d
Loading
Loading
Loading
Loading
+6 −0
Original line number Diff line number Diff line
@@ -167,6 +167,12 @@ There are three ways to modify replaying of telemetry data:

4. `--shuffle`. Shuffle the jobs before playing.

## WfBench

    pip install wfcommons networkx

    raps run -w wfbench

## Job-level power output example for replay of single job

    raps run -f $DPATH/slurm/joblive/$DATEDIR,$DPATH/jobprofile/$DATEDIR --jid 1234567 -o
+1 −1
Original line number Diff line number Diff line
@@ -143,7 +143,7 @@ class SimConfig(RAPSBaseModel, abc.ABC):
    # Workload arguments (TODO split into separate model)
    workload: Literal['random', 'benchmark', 'peak', 'idle', 'synthetic',
                      'multitenant', 'replay', 'randomAI', 'network_test',
                      'inter_job_congestion', 'calculon', 'hpl'] = "random"
                      'inter_job_congestion', 'calculon', 'hpl', 'wfbench'] = "random"

    """ Type of synthetic workload """
    multimodal: list[float] = [1.0]
+3 −1
Original line number Diff line number Diff line
@@ -19,6 +19,7 @@ from .multitenant import MultitenantWorkload
from .network import NetworkTestWorkload
from .inter_job_congestion import InterJobCongestionWorkload
from .utils import plot_job_hist
from .wfbench import WfBenchWorkload


class BaseWorkload:
@@ -59,7 +60,8 @@ class Workload(
    NetworkTestWorkload,
    InterJobCongestionWorkload,
    Calculon,
    HPL
    HPL,
    WfBenchWorkload
):
    """Final workload class with all workload types."""
    pass
+125 −0
Original line number Diff line number Diff line
import pathlib
import random
import tempfile

from raps.job import Job, job_dict

# It is assumed that wfcommons is installed in the environment.
# The user should have activated the venv with `source /opt/venvs/exadigit/bin/activate`
# which should have wfcommons installed.
try:
    from wfcommons import BlastRecipe
    from wfcommons.wfbench import WorkflowBenchmark
    import networkx as nx
except ImportError:
    print("wfcommons or networkx not found. Please install them.")
    print("pip install wfcommons networkx")
    # This will fail later, but we give a hint.
    pass


class WfBenchWorkload:
    """Workload mixin that generates RAPS jobs from a WfBench workflow.

    Requires the optional ``wfcommons`` and ``networkx`` packages (see the
    guarded imports at the top of this module).
    """

    # Minimal stand-in for a wfcommons Task when a node does not carry a
    # 'task' attribute; holds only the fields generate() reads.
    # NOTE: hoisted out of the per-task loop so the class is defined once,
    # not re-created on every iteration.
    class _SimpleTask:
        def __init__(self, name, runtime):
            self.name = name
            self.runtime = runtime

    def generate(self, **kwargs):
        """Generate a list of RAPS jobs from a WfBench workflow.

        Keyword Args:
            args: parsed CLI arguments. Optional attributes
                ``wfbench_num_tasks`` (default 50), ``wfbench_cpu_work`` (100),
                ``wfbench_data`` (10) and ``wfbench_percent_cpu`` (0.6) tune
                the generated benchmark.

        Returns:
            list[Job]: one job per workflow task; each job's submit time is
            the latest end time of its parent tasks, so dependency order is
            respected.

        Raises:
            ValueError: if ``args`` is not supplied in kwargs.
        """
        args = kwargs.get('args')
        if args is None:
            raise ValueError("args not found in kwargs")

        # --- WfBench specific arguments ---
        # More command-line arguments can be added later.
        num_tasks = getattr(args, 'wfbench_num_tasks', 50)
        cpu_work = getattr(args, 'wfbench_cpu_work', 100)
        data = getattr(args, 'wfbench_data', 10)
        percent_cpu = getattr(args, 'wfbench_percent_cpu', 0.6)

        # 1. Generate the workflow benchmark specification.
        #    BlastRecipe is the default for now; make it configurable later.
        benchmark = WorkflowBenchmark(recipe=BlastRecipe, num_tasks=num_tasks)
        # The benchmark file itself is throwaway; write it to the system temp
        # directory instead of a hard-coded "/tmp/" so this also works on
        # platforms where /tmp does not exist.
        benchmark.create_benchmark(pathlib.Path(tempfile.gettempdir()),
                                   cpu_work=cpu_work, data=data,
                                   percent_cpu=percent_cpu)

        workflow = benchmark.workflow
        if not workflow:
            # An empty DiGraph is falsy (len == number of nodes).
            print("Workflow generation failed.")
            return []

        # 2. Convert the workflow (a networkx.DiGraph) into RAPS jobs,
        #    visiting tasks in dependency order.
        jobs = []
        task_end_times = {}  # task name -> simulated completion time

        for job_id_counter, task_name in enumerate(nx.topological_sort(workflow)):
            task = workflow.nodes[task_name].get('task')
            if task is None:
                # Fallback: assume the attributes live directly on the node
                # data rather than under the 'task' key.
                # TODO(review): confirm against the installed wfcommons
                # version's actual node-data structure.
                node_data = workflow.nodes[task_name]
                task = self._SimpleTask(name=task_name,
                                        runtime=node_data.get('runtime'))

            # Submit only after every already-seen parent task has finished.
            submit_time = max(
                (task_end_times[parent]
                 for parent in workflow.predecessors(task_name)
                 if parent in task_end_times),
                default=0,
            )

            # For simplicity, each task occupies a single node for now.
            nodes_required = 1

            # wfcommons runtimes are in seconds; fall back to a default when
            # the value is missing or zero.
            expected_run_time = int(task.runtime) if task.runtime else 60

            start_time = submit_time  # RAPS will do the actual scheduling.
            end_time = start_time + expected_run_time
            task_end_times[task.name] = end_time

            jobs.append(Job(job_dict(
                nodes_required=nodes_required,
                name=f"wfbench_{task.name}",
                account="wfbench_user",
                end_state="Success",
                id=f"wfbench_{job_id_counter}",
                cpu_trace=1,  # Placeholder
                gpu_trace=0,  # Placeholder
                ntx_trace=None,
                nrx_trace=None,
                submit_time=submit_time,
                time_limit=expected_run_time + 60,  # Add a buffer
                start_time=start_time,
                end_time=end_time,
                expected_run_time=expected_run_time,
            )))

        return jobs

    def wfbench(self, **kwargs):
        """Alias for generate(); entry point for `raps run -w wfbench`."""
        return self.generate(**kwargs)