Commit f7239878 authored by David M. Rogers's avatar David M. Rogers
Browse files

Working jsrun/srun job step launch.

parent 5e7ffcba
Loading
Loading
Loading
Loading
+25 −8
Original line number Diff line number Diff line
@@ -7,7 +7,8 @@ Simulation Campaigns are listed make-style, one-by-one in a flat yaml-file::
    proj1: &defaults
      param: 1
      dirname: Proj1
      nodes: 500
      resources:
        nrs: 500
      out:
        log: run.log
        data: grid.hd5
@@ -22,6 +23,11 @@ Job types contain run-scripts and list input/output files::
    --- jobtypes.yaml
    sim_flow:
      time: 60 # minutes
      resources:
        tasks: 6
        gpu: 6
        cpu: 42
        jsrun_attr: "-l gpu-cpu -d plane:6 -b packed:7"
      inp:
        params: parameter_file.txt
      out:
@@ -31,19 +37,20 @@ Job types contain run-scripts and list input/output files::
        module load gcc spectrum-mpi
        export OMP_NUM_THREADS=7
      script: |
        jsrun -n {nodes} -l gpu-cpu -a 6 -g 6 -c 42 -d plane:6 -b packed:7 \
                 sim_flow {inp[params} {out[data]} >{out[log]}
        {mpirun} sim_flow {inp[params]} {out[data]} >{out[log]}

The jobs are used within an allocation and make use of all available resources in parallel::

    #!/bin/bash
    #BSUB -P SPY007
    #BSUB -W 60
    #BSUB -W 62
    #BSUB -nnodes 1000
    #BSUB -J AllFlow
    #BSUB -wa TERM # Send a SIGTERM
    #BSUB -wt 2    # 2 min. before job completion.

    module load python/3
    python3 make.py jobtypes.yaml make.yaml
    python3 make.py jobtypes.yaml make.yaml 60

Getting Started
---------------
@@ -55,7 +62,7 @@ to make, fill them into your own `make.yaml`.
For help with syntax, see the `examples` subdirectory.

There are 2 important things to keep in mind for interacting with
the scheduler.  Every job must contain keys for `nodes` and `time` (in minutes).
the scheduler.  Every job must contain keys for `resources` and `time` (in minutes).
It is possible to choose your allocation poorly, and try running 2 jobs
requiring 500 nodes on 999 nodes, in which case 499 nodes will sit idle.
It is also possible to run out of time to complete all jobs.
@@ -86,7 +93,17 @@ This means ordinary brackets, `{}`, need to be written as
double-brackets, `{{}}`.  Also, it means you can use the
variables defined in the yaml itself in the script.

In addition to those variables, you also have access to
everything defined the `reqs` dictionary of the `Job` class
It is important to use the `{mpirun}` substitution in your
`script`.  This is expanded to the appropriate `jsrun`
or `srun` command for your chosen `resources` depending
on whether the environment variables `SLURM_JOB_NUM_NODES` or
`LSB_MAX_NUM_PROCESSORS` are found.
If neither of these environment variables are set,
the `machine()` function in `machine.py` sets `launcher='local'`.
In this mode `{mpirun}` expands to an empty string,
and only 1-processor jobs are allowed (up to 4 simultaneously).

In addition to variables from your yaml files, `script` substitutions
can also access everything defined in the `reqs` dictionary of the `Job` class
(see `jobtypes.py`).
+34 −20
Original line number Diff line number Diff line
@@ -2,8 +2,15 @@
# how to generate a group of output files from a group of input files.
# Classic `make` permits 1 output file per rule.
grompp:
  time: 10.0
  nodes: 1
  time: 5.0
  resource:
      nrs: 1 # 1 resource set
      cpu: 1
      # These are default values:
      #tasks: 1
      #gpu: 0
      #jsrun_attr: ""
      #srun_attr: ""
  inp:
    mdp: grompp.mdp
    top: topol.top
@@ -16,11 +23,15 @@ grompp:
    export OMP_NUM_THREADS=7
    GMX=gmx
  script: |
    $GMX grompp -f {inp[mdp]} -p {inp[top]} -c {inp[conf]} -o {out[tpr]} -maxwarn 99
    {mpirun} $GMX grompp -f {inp[mdp]} -p {inp[top]} -c {inp[conf]} -o {out[tpr]} -maxwarn 99

mdrun:
  nodes: 2
  time: 120.0
  time: 100.0
  resource:
      nrs: 2
      cpu: 42
      gpu: 6
      jsrun_attr: "-l gpu-cpu -d plane:6 -b packed:7"
  inp:
      tpr: topol.tpr
  out:
@@ -35,16 +46,20 @@ mdrun:
      export OMP_NUM_THREADS=7
      GMX=gmx_mpi
  script: |
    jsrun -n {nodes} -l gpu-cpu -a 6 -g 6 -c 42 -d plane:6 -b packed:7 \
             $GMX mdrun -cpi {out[cpt]} \
      {mpirun} $GMX mdrun -cpi {out[cpt]} \
                        -s {inp[tpr]} -g {out[log]} -e {out[edr]} \
                        -c {out[conf]} -x {out[xtc]} -cpo {out[cpt]} \
                        -maxh 2 -pme gpu -npme 1 -nb gpu -bonded gpu -pin off
                        -pme gpu -npme 1 -nb gpu -bonded gpu -pin off

# WARNING: this one uses the same output name throughout.
gmx_remd:
  time: 120.0
  jobtype: Restartable
  resource:
      nrs: 40
      cpu: 42
      gpu: 6
      jsrun_attr: "-X 1 -l gpu-cpu -d plane:6 -b packed:7"
  inp:
    tpr: remd_00/remd.tpr
  out:
@@ -59,8 +74,7 @@ gmx_remd:
    export OMP_NUM_THREADS=7
    GMX=gmx_mpi
  script: |
    jsrun -X 1 -n {nodes} -c 42 -a 6 -g 6 -d plane:6 -b packed:7 \
        $GMX mdrun -deffnm `basename {out[log]} .log` -multidir `ls -d */` \
    {mpirun} $GMX mdrun -deffnm `basename {out[log]} .log` -multidir `ls -d */` \
        -pme gpu -npme 1 -noconfout -nb gpu -bonded gpu -pin off \
        -cpi remd.cpt -cpt 1 -maxh 1.95 -replex 1000 &>>{out[run_log]}
+0 −0

File moved.

graph.py

0 → 100644
+210 −0
Original line number Diff line number Diff line
import networkx as nx
from heapq import *

from pathlib import Path
from jobtypes import Top
from helpers import TargetError, str_ok, str_warn, str_err

from functools import reduce

# Key data objects for make:
#   job-class lookup dictionary, jobtype : Job implementation class
#     - the types dict
#   job-type lookup dictionary, filename : (creating jobtype, filetype tag)
#     - present in types.lookup dict
#   job graph -- edges are directed forward in time (x triggers y)
#   leaf-set of jobs ready to run

# G       : networkx.DiGraph of the current computation
# types   : dict of jobnames -> implementation classes
# rule    : Job class being currently implemented
# args    : dict of args to pass through to all descendant jobs (includes dirname)
def append_graph(G, types, rule, args):
    """Recursively extend G with the jobs needed to produce rule's inputs.

    G     -- networkx.DiGraph of the current computation
    types -- dict of jobnames -> implementation classes (with a .lookup
             dict mapping filename -> (creating jobtype, filetype tag))
    rule  -- Job whose inputs must be satisfied
    args  -- dict of args passed through to all descendant jobs

    Raises TargetError when no rule produces a needed file, or when the
    producing rule's filetype tag disagrees with what was requested.
    """
    pending = [rule]  # stack of jobs whose inputs still need resolving (LIFO)
    while pending:
        node = pending.pop()
        for ttype, tname in node.inp.items():
            path = node.dirname / tname
            # Input already on disk -> no producing job required.
            if path.exists():  # TODO: implement 'completeness' test?
                continue       # or check that all outputs of a given job (assuming lookup is successful) are present.
            try:
                jobname, ftype = types.lookup[tname]
            except KeyError:
                raise TargetError("No rule to make target '%s' needed by '%s'."%(tname, node.jobname))
            if ftype != ttype:
                raise TargetError("Rule %s produces target file '%s' of type '%s' (but %s was requested)."%(jobname, tname, ftype, ttype))

            # instantiate the producing job and chase its inputs next
            producer = types[jobname](args)
            G.add_edge(producer, node, obj=path)  # running 'producer' enables 'node'
            pending.append(producer)

# Dependency graph of pending jobs plus a priority queue of the
# subset that is currently runnable (all inputs satisfied).
class TaskGraph:
    def __init__(self, types, jobs, testonly=False):
        """Build the job graph from top-level targets.

        types    -- dict of jobname -> Job implementation class (with .lookup)
        jobs     -- dict of rule name -> argument dict; each entry must
                    provide 'dirname' (an existing directory) and 'out'
                    (a dict of outputs), which seed a Top-level target.
                    NOTE: each args dict is mutated ('dirname' wrapped in
                    Path, 'out' deleted) before being handed to descendants.
        testonly -- stored on self for callers; not acted on here.

        Raises KeyError on a malformed job entry.
        """
        self.testonly = testonly
        self.G = nx.DiGraph()
        for rule, args in jobs.items():
            if 'dirname' not in args:
                raise KeyError("Job %s is missing 'dirname:'!"%rule)
            args['dirname'] = Path(args['dirname']) # create rich repr
            if not args['dirname'].is_dir():
                raise KeyError("Job %s: missing directory %s"%(rule,args['dirname']))
            if 'out' not in args:
                raise KeyError("Job %s is missing 'out:' a dictionary of outputs!"%rule)

            # A Top node "consumes" the rule's outputs as its inputs, so
            # append_graph recursively creates the jobs that produce them.
            task = Top(rule, args)
            task.inp = args['out']
            del args['out'] # not inherited
            append_graph(self.G, types, task, args)

        # create priority queue of runnable tasks
        wt = prio_heft(self.G)
        self.Q = PriorityQueue(wt)
        for task in self.G.nodes():
            if self.is_root(task):
                self.Q.push(task)

        print("%d/%d tasks ready to run."%(len(self.Q), len(self.G)))
        self.completed = 0
        self.errors = 0

    def is_root(self, task):
        # A task with no predecessors has all inputs satisfied and may run.
        try:
            _ = self.G.predecessors(task).__next__()
        except StopIteration:
            return True
        return False

    # get_work : Machine, time_in_min -> task | None
    #     get_work is responsible for calling Machine.alloc
    #     Manager calls Machine.free
    # task = {
    #          rs      = ResourceSet
    #          prepare = (testonly=False) -> [cmd_string] | None
    #          finish  = (return_val : int) -> None
    #        }
    def get_work(self, M, time):
        task = self.Q.get_task(M, time)
        if task is None:
            return None
        # wrap self.prepare & self.finish in the requested API
        return Task(task, self.prepare, self.finish)

    def prepare(self, task, testonly=False):
        # Delegate to the job; None means "nothing to run for this task".
        cmd = task.prepare(testonly)
        if cmd is None:
            return cmd
        assert isinstance(cmd, list), "Task %s returned non-strlist command!"%(task)
        return cmd

    def finish(self, task, ret):
        """Record a finished task's result and release its successors.

        ret -- integer exit status of the job; nonzero counts as an error
               and leaves the task (and its descendants) in the graph.
        Always returns False ("don't escalate").
        """
        if ret != 0:
            self.errors += 1
            print("Job returned nonzero exit code, %d.\n"%ret)
            return False # don't escalate, but don't continue

        self.completed += 1

        # enqueue newly enabled jobs
        tset = list( self.G[task] )
        self.G.remove_node(task)
        for task in tset:
            if self.is_root(task):
                self.Q.push(task)

        return False

    def stats(self):
        # Human-readable summary; str_ok/str_err/str_warn colorize labels.
        return "Job stats:\n" + \
               "    %s: %d\n    %s: %d\n    %s: %d" % (
                         str_ok("completed"), self.completed,
                         str_err("errors"), self.errors,
                         str_warn("incomplete"), len(self.G)
                   )

# Closure storing task and its associated TaskGraph
# and providing prepare, finish callbacks.
# Adapter handed to the Manager: bundles one schedulable task with its
# owning TaskGraph's prepare/finish callbacks, exposing the task's
# time budget and resource set (as .rs) alongside them.
class Task:
    def __init__(self, task, prep, fin):
        """Capture the task plus the graph's prepare/finish callbacks."""
        self.task = task
        self.time, self.rs = task.time, task.resource
        self.prep, self.fin = prep, fin

    def prepare(self, testonly=False):
        """Forward to the graph's prepare with our wrapped task."""
        return self.prep(self.task, testonly)

    def finish(self, ret):
        """Forward the job's exit status to the graph's finish."""
        return self.fin(self.task, ret)

# dole out heterogeneous earliest finish time priorities
def prio_heft(G):
    """Assign HEFT-style upward-rank priorities to every node of G.

    A node's weight is the largest successor weight plus its own cost,
    time * nrs * (cpu + 25*gpu), so tasks heading long/expensive chains
    sort first.  Returns a dict mapping node -> weight.
    """
    weights = {}
    # start from the sinks (nodes with no successors) and walk backward
    frontier = [node for node in G.nodes() if len(G[node]) == 0]
    while frontier:
        node = frontier.pop()
        downstream = 0
        for succ in G[node]:
            if weights[succ] > downstream:
                downstream = weights[succ]
        res = node.resource
        weights[node] = downstream + node.time*res.nrs*(res.cpu + res.gpu*25)
        for pred in G.predecessors(node):
            if pred not in weights:
                frontier.append(pred)
    return weights

# wt is a dictionary mapping all potential tasks to priorities.
# self.l holds the heap of tasks actually in the queue.
# push(task)  pushes a task into the queue (must be in wt)
# pop()       returns highest priority item
# get_task    filters through tasks in prio-order
class PriorityQueue:
    def __init__(self, wt):
        # FIX: wt was previously discarded, so every push() defaulted to
        # priority 0 and the HEFT ordering computed by prio_heft() was
        # never applied.  Keep the table and use it as the default priority.
        self.wt = wt
        self.l = []

    def __len__(self):
        return len(self.l)

    def push(self, item, pri=None):
        """Insert item; priority defaults to its precomputed weight in wt.

        An explicit pri= overrides the table (items absent from wt get 0).
        """
        if pri is None:
            pri = self.wt.get(item, 0)
        heappush(self.l, (-pri, item))  # negate: heapq is a min-heap

    def pop(self): # raises IndexError on empty
        npri, item = heappop(self.l)
        return item

    # Pop the highest priority task
    # (that meets the requirements) off the queue.
    # This also removes all items from the
    # queue that can't run in the current machine allocation.
    # WARNING: This monkey-patches task to set
    #          task.mpirun = cmd
    #
    # Returns Job class or None
    # Raises StopIteration when no runnable jobs are left.
    # Cannot return None when provided the whole machine.
    #
    # M         : Machine -- machine availability status
    # time      : float   -- minutes available now, upper
    #                        limit on returned job's time
    def get_task(self, M, time):
        tmp = [] # tasks runnable later, but not in M's current state
        cmd = None # defensive default in case no alloc is ever attempted
        for _ in range(len(self.l)):
            task = self.pop()
            if task.time > time: # won't complete before allocation ends
                print("Skipping %s (requires %.0f minutes, but have only %.0f)"
                     % (task, task.time, time) )
                continue # dropped for good: it can never fit this allocation

            cmd = M.alloc(task.resource)
            if cmd is not None:
                break # next task matches requirements
            if M.possible(task.resource):
                tmp.append(task) # maybe can do later
        else:
            if len(tmp) == 0:
                raise StopIteration

        for j in tmp: # contains all jobs for which cmd == None
            self.push(j)
        if cmd is None:
            return None
        task.mpirun = cmd
        return task
+36 −16
Original line number Diff line number Diff line

import yaml
from helpers import *
from machine import ResourceSet

jobscript = """
cd {dirname}
@@ -8,16 +8,30 @@ cd {dirname}
%s
"""

# NOTE: potential job return states are
# "none"    == not started
# "started" == in progress
# "error"   == error detected
# "done"    == completed job
# Top-level rules are weak
class Top:
    # Top-level targets cost nothing to "run": they only echo completion.
    time = 0.0
    resource = ResourceSet(1, 1, 0)
    def __init__(self, jobname, args):
        """A sentinel job representing a user-requested top-level target."""
        self.jobname = jobname
        self.dirname = args['dirname']
    def __lt__(a, b):
        # Ordering via repr so Top nodes can live in heapq tuples.
        return repr(a) < repr(b)
    def __gt__(a, b):
        return repr(a) > repr(b)
    def __repr__(self):
        return "Top(%s, {'dirname': %s})"%(self.jobname, self.dirname)
    def prepare(self, testonly=False):
        # FIX: return an argv-style list instead of a shell string.
        # TaskGraph.prepare asserts isinstance(cmd, list), and Job.prepare
        # returns lists (e.g. ["echo", ...]); a bare string tripped that
        # assertion whenever a Top target became runnable.
        if testonly:
            return ["echo", str_ok("Would have finished target %s."%self.jobname)]
        return ["echo", str_ok("Finished target %s."%self.jobname)]
    def finish(self):
        # Nothing to clean up for a sentinel target.
        pass

class Job:
    reqs = { 'dirname' : Path,
             'jobname' : str,
             'nodes'   : int,
             'resource': ResourceSet,
             'time'    : float,
             'inp'     : dict,
             'out'     : dict,
@@ -57,6 +71,10 @@ class Job:
                        setattr(self, var, val)
            else:   
                raise SyntaxError("Job %s requires variable '%s' (type %s)"%(self.jobname, var, str(cls)))
            
            # Process 'resource' into a real ResourceSet
            if var == 'resource':
                self.resource = ResourceSet(**self.resource)
            if not isinstance(getattr(self,var), cls):
                raise SyntaxError("Job %s requires variable '%s' to have type %s (incorrect type)"%(self.jobname, var, str(cls)))

@@ -66,6 +84,7 @@ class Job:
        args = {}
        for var in self.reqs.keys():
            args[var] = getattr(self, var)
        args['mpirun'] = self.mpirun
        return script.format(**args)

    def setup_job(self):
@@ -77,19 +96,20 @@ class Job:
            f.write(self.format(jobscript % self.script))
        return False

    # run returns True on success and False on error
    def run(self, testonly=False):
    def prepare(self, testonly=False):
        # Create next job and check whether it's 'sh' is present.
        # Note: the Manager object has monkey-patched
        # an 'mpirun' attribute into self.
        self.next_job = JobState(self.dirname, self.jobname)

        if testonly:
            print("Would run " + str(self.next_job))
            return True
            return ["echo", "Would run %s" % str(self.next_job)]
        if self.setup_job(): # error creating jobscript
            return False
        if self.next_job.exec():
            return False
        return True
            return None
        return [ "bash", self.next_job.job_in ]

    def finish(self):
        pass

# Mapping from rule name to implementation class
#  lookup : fname -> (rule, ftype)
@@ -112,7 +132,7 @@ def read_jobtypes(filename):
        return types

    with open(filename) as f:
        x = yaml.load(f)
        x = yaml.safe_load(f)

    for name, info in x.items():
        info['jobname'] = name # name of make-rule
Loading