Commit 3fd6f630 authored by Rogers, David's avatar Rogers, David

Merge branch 'targets' into 'master'

Targets

See merge request 99R/flow!1
parents 53c700ab f7239878
Simple file-based job manager
=============================

Simulation campaigns are listed make-style, one-by-one, in a flat yaml file::

    --- make.yaml
    proj1: &defaults
      param: 1
      dirname: Proj1
      resources:
        nrs: 500
      out:
        xtc: run.xtc
        edr: run.edr
        log: run.log
        cpt: run.cpt
        conf: run.gro
      retry:
        checkpoint:
          - tpr
          - cpt
        complete:
          - "grep -q 'Statistics over 50000001 steps' {out[log]}"


    proj2:
      param: 2
      dirname: Proj2
      <<: *defaults
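The `&defaults` anchor and `<<: *defaults` merge key above are standard YAML 1.1
features, so any loader with merge-key support resolves the inheritance.  A quick
check (assuming PyYAML is available):

```python
import yaml  # PyYAML -- assumed available

doc = """
proj1: &defaults
  param: 1
  dirname: Proj1
proj2:
  param: 2
  <<: *defaults
"""
jobs = yaml.safe_load(doc)
# The merge key has lower precedence than explicit keys, so proj2
# keeps its own param but inherits dirname from the anchor.
# jobs["proj2"] == {"param": 2, "dirname": "Proj1"}
```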

Job types contain run-scripts and list input/output files::

    --- jobtypes.yaml
    sim_flow:
      time: 60 # minutes
      resources:
        tasks: 6
        gpu: 6
        cpu: 42
        jsrun_attr: "-l gpu-cpu -d plane:6 -b packed:7"
      inp:
        tpr: topol.tpr
        params: parameter_file.txt
      out:
        log: run.log
        edr: run.edr
        conf: run.gro
        xtc: run.xtc
        cpt: run.cpt
        data: grid.hd5
      setup: |
        module load gcc spectrum-mpi
        export OMP_NUM_THREADS=7
      script: |
        {mpirun} sim_flow {inp[params]} {out[data]} >{out[log]}

Usage is also make-style::

    $ bjobs
    JOBID   USER       STAT   SLOTS    QUEUE       START_TIME    FINISH_TIME   JOB_NAME
    993162  user       PEND      -     batch             -             -       Cov2r.5
    993163  user       PEND      -     batch             -             -       Cov2r.5
    993164  user       PEND      -     batch             -             -       Cov2r.5
    
    
    $ python3 ~/src/flow/make.py -j jobtypes.yaml Cov.yaml BIP200
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov1/Norest/Cov1n.4.out completed successfully.
    Testing: grep -q 'Statistics over 50000001 steps' run.log
    [FAIL]
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov1/Norest/Cov1n.4.out requires restart (failed test: grep -q 'Statistics over 50000001 steps' run.log).
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov1/Norest/Cov1n.5.sh has been created (probably running).
    
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Wrest/Cov2r.4.out completed successfully.
    Testing: grep -q 'Statistics over 50000001 steps' run.log
    [FAIL]
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Wrest/Cov2r.4.out requires restart (failed test: grep -q 'Statistics over 50000001 steps' run.log).
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Wrest/Cov2r.5.sh has been created (probably running).
    
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Norest/Cov2n.4.out completed successfully.
    Testing: grep -q 'Statistics over 50000001 steps' run.log
    [FAIL]
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Norest/Cov2n.4.out requires restart (failed test: grep -q 'Statistics over 50000001 steps' run.log).
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Norest/Cov2n.5.sh has been created (probably running).

The jobs run inside a single batch allocation and make use of all available resources in parallel::

    #!/bin/bash
    #BSUB -P SPY007
    #BSUB -W 62
    #BSUB -nnodes 1000
    #BSUB -J AllFlow
    #BSUB -wa TERM # Send a SIGTERM
    #BSUB -wt 2    # 2 min. before job completion.

    module load python/3
    python3 make.py jobtypes.yaml make.yaml 60

Getting Started
---------------

Start by running jobs without this tool.  When you get a good workflow
going, transform it into an entry in `jobtypes.yaml`.
When you get to the point where you're listing outputs you'd like
to make, fill them into your own `make.yaml`.
For help with syntax, see the `examples` subdirectory.

The source, `jobtypes.py`, contains 4 basic types of jobs for now:
`Local`, `Simple`, `Restartable`, and `Chain`.  It should produce an
error message if you get a name wrong.

There are 2 important things to keep in mind for interacting with
the scheduler.  Every job must contain keys for `resources` and `time` (in minutes).
It is possible to choose your allocation poorly, and try running 2 jobs
requiring 500 nodes on 999 nodes, in which case 499 nodes will sit idle.
It is also possible to run out of time to complete all jobs.
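The 999-node example is just integer division; a check like this (illustrative,
not part of `make.py`) catches the mismatch before you submit:

```python
# 2 jobs of 500 nodes each cannot run side-by-side on a 999-node allocation:
alloc_nodes, nodes_per_job = 999, 500
concurrent = alloc_nodes // nodes_per_job        # only 1 job fits at a time
idle = alloc_nodes - concurrent * nodes_per_job  # 499 nodes sit idle
```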

I'm working to create a simulator for your
workflow to help select the optimal number of nodes and allocation time
to avoid the first kind of problem.
I'm working to add a timer to `make.py` to detect the second kind of
problem and exit early.  Subsequent invocations of `make.py` should
then continue where the work left off.

Other than reserved variable names, any key can be present in your yaml file.
These will be available for substitution into the job's `script`
(see Substitutions_ below).
There should also be helpful error messages if you get the syntax of yaml wrong.
Contact me if the error messages are not
helpful enough to solve your problems, or if you have new ideas on functionality.

Substitutions
-------------

As `make.py` runs jobs, it generates jobscripts from your yaml.
The `setup` and `script` variables of each job
go through substitution at creation time for each script.
In the case of a `Restartable` jobtype, the `retry_script`
and `complete` tests are also substituted.

Specifically, Python's `format()` function is invoked.
This means ordinary brackets, `{}`, need to be written as
double-brackets, `{{}}`.  Also, it means you can use the
variables defined in the yaml itself in the script.
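For example (plain `str.format` with keyword arguments, as described above;
the template string here is illustrative):

```python
# The yaml's inp/out dictionaries become keyword arguments to format(),
# so {inp[tpr]} indexes into them; {{ }} survives as a literal brace.
template = "$GMX mdrun -s {inp[tpr]} -g {out[log]} # literal: {{maxh}}"
rendered = template.format(inp={"tpr": "run.tpr"}, out={"log": "run.log"})
# rendered == "$GMX mdrun -s run.tpr -g run.log # literal: {maxh}"
```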

Finally, every project also has::

    proj = the project ID input to `make.py`
    jobname = the job name itself (top-level header for the job)

It is important to use the `{mpirun}` substitution in your
`script`.  This is expanded to the appropriate `jsrun`
or `srun` command for your chosen `resources`, depending
on whether the environment variable `SLURM_JOB_NUM_NODES` or
`LSB_MAX_NUM_PROCESSORS` is found.
If neither of these environment variables is set,
the `machine()` function in `machine.py` sets `launcher='local'`.
In this mode `{mpirun}` expands to an empty string,
and only 1-processor jobs are allowed (up to 4 simultaneously).
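A minimal sketch of that detection logic (the real implementation is
`machine()` in `machine.py`; the function name `pick_launcher` here is
illustrative):

```python
import os

def pick_launcher(env=os.environ):
    # Scheduler environments advertise themselves through these variables.
    if "SLURM_JOB_NUM_NODES" in env:
        return "srun"
    if "LSB_MAX_NUM_PROCESSORS" in env:
        return "jsrun"
    return "local"  # {mpirun} expands to an empty string in this mode
```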

In addition to variables from your yaml files, `script` substitutions
can also access everything defined in the `reqs` dictionary of the `Job` class
(see `jobtypes.py`).

examples/chain.yaml

(deleted by this merge request)
# This example shows a chain workflow that composes
# a grompp step together with an mdrun.
# The grompp has jobtype Local, and completes immediately,
# while the mdrun is Restartable, and pre-rolls.
TestRun:
  jobtype: Chain
  dirname: /gpfs/alpine/proj-shared/myproj/Simulation
  inp:
    mdp: "run.mdp"
    top: topol.top
    conf: start.pdb
  chain:
    - grompp:
        out:
          tpr: run.tpr
    - mdrun:
        preroll: 2
        out:
          xtc: run.xtc
          edr: run.edr
          log: run.log
          cpt: run.cpt
          conf: run.gro
        retry:
          checkpoint: [cpt]
          complete:
            - "grep -q 'Statistics over 5000001 steps' {out[log]}"
examples/jobtypes.yaml

# These jobtypes are make-rules that know
# how to generate a group of output files from a group of input files.
# Classic `make` permits 1 output file per rule.
grompp:
  jobtype: Local
  time: 5.0
  resource:
      nrs: 1 # 1 resource set
      cpu: 1
      # These are default values:
      #tasks: 1
      #gpu: 0
      #jsrun_attr: ""
      #srun_attr: ""
  inp:
    mdp: grompp.mdp
    top: topol.top
@@ -12,10 +23,15 @@ grompp:
    export OMP_NUM_THREADS=7
    GMX=gmx
  script: |
    {mpirun} $GMX grompp -f {inp[mdp]} -p {inp[top]} -c {inp[conf]} -o {out[tpr]} -maxwarn 99

mdrun:
  jobtype: Restartable
  time: 100.0
  resource:
      nrs: 2
      cpu: 42
      gpu: 6
      jsrun_attr: "-l gpu-cpu -d plane:6 -b packed:7"
  inp:
      tpr: topol.tpr
  out:
@@ -24,29 +40,26 @@ mdrun:
      conf: run.gro
      xtc: run.xtc
      cpt: run.cpt
  setup: |
      module load gcc spectrum-mpi gromacs/2020-rdtscp_off
      export GMX_MAXBACKUP=-1
      export OMP_NUM_THREADS=7
      GMX=gmx_mpi
  script: |
      {mpirun} $GMX mdrun -s {inp[tpr]} -g {out[log]} -e {out[edr]} \
                          -c {out[conf]} -x {out[xtc]} -cpo {out[cpt]} \
                          -pme gpu -npme 1 -nb gpu -bonded gpu -pin off
  retry_script: |
      {mpirun} $GMX mdrun -cpi {out[cpt]} \
                          -s {inp[tpr]} -g {out[log]} -e {out[edr]} \
                          -c {out[conf]} -x {out[xtc]} -cpo {out[cpt]} \
                          -pme gpu -npme 1 -nb gpu -bonded gpu -pin off

# WARNING: this one uses the same output name throughout.
gmx_remd:
  time: 120.0
  jobtype: Restartable
  resource:
      nrs: 40
      cpu: 42
      gpu: 6
      jsrun_attr: "-X 1 -l gpu-cpu -d plane:6 -b packed:7"
  inp:
    tpr: remd_00/remd.tpr
  out:
@@ -55,22 +68,13 @@ gmx_remd:
    trr: remd_00/remd.trr
    cpt: remd_00/remd.cpt
    run_log: run.log
  setup: |
    module load gcc spectrum-mpi gromacs/2020-rdtscp_off
    export GMX_MAXBACKUP=-1
    export OMP_NUM_THREADS=7
    GMX=gmx_mpi
  script: |
    {mpirun} $GMX mdrun -deffnm `basename {out[log]} .log` -multidir `ls -d */` \
        -pme gpu -npme 1 -noconfout -nb gpu -bonded gpu -pin off \
        -cpt 1 -maxh 1.95 -replex 1000 &>{out[run_log]}
  retry_script: |
    echo "Starting retry step {jobid}" >>{out[run_log]}
    {mpirun} $GMX mdrun -deffnm `basename {out[log]} .log` -multidir `ls -d */` \
        -pme gpu -npme 1 -noconfout -nb gpu -bonded gpu -pin off \
        -cpi remd.cpt -cpt 1 -maxh 1.95 -replex 1000 &>>{out[run_log]}

examples/sim.yaml

(new file)
# This example shows a 2-step workflow that composes
# a grompp step together with an mdrun.
TestRun:
  dirname: Simulation
  out:
      xtc: run.xtc
      edr: run.edr
      log: run.log
      cpt: run.cpt
      conf: run.gro

graph.py

(new file)
import networkx as nx
from heapq import heappush, heappop

from pathlib import Path
from jobtypes import Top
from helpers import TargetError, str_ok, str_warn, str_err

from functools import reduce

# Key data objects for make:
#   job-class lookup dictionary, jobtype : Job implementation class
#     - the types dict
#   job-type lookup dictionary, filename : (creating jobtype, filetype tag)
#     - present in types.lookup dict
#   job graph -- edges are directed forward in time (x triggers y)
#   leaf-set of jobs ready to run

# G       : networkx.DiGraph of the current computation
# types   : dict of jobnames -> implementation classes
# rule    : Job class being currently implemented
# args    : dict of args to pass through to all descendant jobs (includes dirname)
def append_graph(G, types, rule, args):
    addl = [rule] # worklist of rules to expand (LIFO, so depth-first)
    while len(addl) > 0:
      rule = addl.pop()
      for ttype,tname in rule.inp.items():
        p = rule.dirname / tname
        if p.exists(): # TODO: implement 'completeness' test?
            continue   # or check that all outputs of a given job (assuming lookup is successful) are present.
        try:
            jobname, ftype = types.lookup[tname]
        except KeyError:
            raise TargetError("No rule to make target '%s' needed by '%s'."%(tname, rule.jobname))
        if ftype != ttype:
            raise TargetError("Rule %s produces target file '%s' of type '%s' (but %s was requested)."%(jobname, tname, ftype, ttype))

        # instantiate the job
        job = types[jobname](args)

        G.add_edge(job, rule, obj=p) # path is created, and rule is enabled by running 'job'
        addl.append(job) # follow-up with inputs of this job

class TaskGraph:
    def __init__(self, types, jobs, testonly=False):
        self.testonly = testonly
        self.G = nx.DiGraph()
        for rule, args in jobs.items():
            if 'dirname' not in args:
                raise KeyError("Job %s is missing 'dirname:'!"%rule)
            args['dirname'] = Path(args['dirname']) # create rich repr
            if not args['dirname'].is_dir():
                raise KeyError("Job %s: missing directory %s"%(rule,args['dirname']))
            if 'out' not in args:
                raise KeyError("Job %s is missing 'out:' a dictionary of outputs!"%rule)
    
            task = Top(rule, args)
            task.inp = args['out']
            del args['out'] # not inherited
            append_graph(self.G, types, task, args)

        # create priority queue of runnable tasks
        wt = prio_heft(self.G)
        self.Q = PriorityQueue(wt)
        for task in self.G.nodes():
            if self.is_root(task):
                self.Q.push(task)

        print("%d/%d tasks ready to run."%(len(self.Q), len(self.G)))
        self.completed = 0
        self.errors = 0

    def is_root(self, task):
        try:
            _ = self.G.predecessors(task).__next__()
        except StopIteration:
            return True
        return False

    # get_work : Machine, time_in_min -> task | None
    #     get_work is responsible for calling Machine.alloc
    #     Manager calls Machine.free
    # task = {
    #          rs      = ResourceSet
    #          prepare = (testonly=False) -> [cmd_string] | None
    #          finish  = (return_val : int) -> None
    #        }
    def get_work(self, M, time):
        task = self.Q.get_task(M, time)
        if task is None:
            return None
        # wrap self.prepare & self.finish in the requested API
        return Task(task, self.prepare, self.finish)

    def prepare(self, task, testonly=False):
        cmd = task.prepare(testonly)
        if cmd is None:
            return cmd
        assert isinstance(cmd, list), "Task %s returned non-strlist command!"%(task)
        return cmd

    def finish(self, task, ret):
        if ret != 0:
            self.errors += 1
            print("Job returned nonzero exit code, %d.\n"%ret)
            return False # don't escalate, but don't continue

        self.completed += 1
    
        # enqueue newly enabled jobs
        tset = list( self.G[task] )
        self.G.remove_node(task)
        for task in tset:
            if self.is_root(task):
                self.Q.push(task)

        return False

    def stats(self):
        return "Job stats:\n" + \
               "    %s: %d\n    %s: %d\n    %s: %d" % (
                         str_ok("completed"), self.completed,
                         str_err("errors"), self.errors,
                         str_warn("incomplete"), len(self.G)
                   )

# Closure storing task and its associated TaskGraph
# and providing prepare, finish callbacks.
class Task:
    def __init__(self, task, prep, fin):
        self.task = task
        self.time = task.time
        self.rs   = task.resource

        self.prep = prep
        self.fin  = fin

    def prepare(self, testonly=False):
        return self.prep(self.task, testonly)
    def finish(self, ret):
        return self.fin(self.task, ret)

# dole out heterogeneous earliest finish time priorities
def prio_heft(G):
    queue = [k for k in G.nodes() if len(G[k]) == 0]
    wt = {}
    while len(queue) > 0:
        task = queue.pop()
        w = reduce(max, (wt[s] for s in G[task]), 0)
        wt[task] = w + task.time*task.resource.nrs*(task.resource.cpu+task.resource.gpu*25)
        for s in G.predecessors(task):
            if s not in wt:
                queue.append(s)
    return wt
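Worked example: `prio_heft` computes an upward-rank style weight, so a job's
priority includes the cost of everything downstream of it.  This standalone
sketch (the `RS`/`T` classes are toy stand-ins for the repo's resource and Job
objects, and a plain adjacency dict replaces networkx) applies the same formula:

```python
from functools import reduce

class RS:  # toy resource set with the fields the formula reads
    def __init__(self, nrs, cpu, gpu):
        self.nrs, self.cpu, self.gpu = nrs, cpu, gpu

class T:   # toy task: runtime in minutes plus a resource footprint
    def __init__(self, name, time, rs):
        self.name, self.time, self.resource = name, time, rs

def toy_heft(succ):
    # succ maps task -> list of successors (edges point forward in time)
    pred = {t: [s for s in succ if t in succ[s]] for t in succ}
    queue = [t for t in succ if not succ[t]]  # start from the sinks
    wt = {}
    while queue:
        task = queue.pop()
        downstream = reduce(max, (wt[s] for s in succ[task]), 0)
        wt[task] = downstream + task.time * task.resource.nrs * (
            task.resource.cpu + task.resource.gpu * 25)
        for s in pred[task]:
            if s not in wt:
                queue.append(s)
    return wt

grompp = T("grompp", 5.0, RS(1, 1, 0))
mdrun  = T("mdrun", 100.0, RS(2, 42, 6))
wt = toy_heft({grompp: [mdrun], mdrun: []})
# mdrun: 100*2*(42+6*25) = 38400.0; grompp: 38400 + 5*1*1 = 38405.0
```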

# wt is a dictionary mapping all potential tasks to priorities.
# self.l holds the heap of tasks actually in the queue.
# push(task)  pushes a task into the queue (must be in wt)
# pop()       returns highest priority item
# get_task    filters through tasks in prio-order
class PriorityQueue:
    def __init__(self, wt):
        self.wt = wt # task -> priority weight (larger runs sooner)
        self.l = []
        self.n = 0   # insertion counter: breaks ties so tasks never compare
    def __len__(self):
        return len(self.l)
    def push(self, item):
        heappush(self.l, (-self.wt[item], self.n, item))
        self.n += 1

    def pop(self): # raises IndexError on empty
        npri, n, item = heappop(self.l)
        return item

    # Pop the highest priority task
    # (that meets the requirements) off the queue.
    # This also removes all items from the
    # queue that can't run in the current machine allocation.
    # WARNING: This monkey-patches task to set
    #          task.mpirun = cmd
    #
    # Returns Job class or None
    # Raises StopIteration when no runnable jobs are left.
    # Cannot return None when provided the whole machine.
    #
    # M         : Machine -- machine availability status
    # time      : float   -- minutes available now, upper
    #                        limit on returned job's time
    def get_task(self, M, time):
        tmp = [] # temporarily removed tasks
        for i in range(len(self.l)):
            task = self.pop()
            if task.time > time: # won't complete
                print("Skipping %s (requires %.0f minutes, but have only %.0f)"
                     % (task, task.time, time) )
                continue

            cmd = M.alloc(task.resource)
            if cmd is not None:
                break # next task matches requirements
            if M.possible(task.resource):
                tmp.append(task) # maybe can do later
        else:
            if len(tmp) == 0:
                raise StopIteration

        for j in tmp: # contains all jobs for which cmd == None
            self.push(j)
        if cmd is None:
            return None
        task.mpirun = cmd
        return task
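The negated-priority tuples in `PriorityQueue` are the standard way to coax a
max-heap out of Python's min-heap `heapq`.  A self-contained illustration, with
the same counter tie-breaker so queued items never need to be comparable:

```python
from heapq import heappush, heappop

heap, counter = [], 0
for item, pri in [("low", 1), ("high", 10), ("mid", 5)]:
    # negate the priority: heapq pops the smallest tuple first, so the
    # largest priority surfaces; the counter breaks ties in FIFO order
    heappush(heap, (-pri, counter, item))
    counter += 1
order = [heappop(heap)[2] for _ in range(len(heap))]
# order == ["high", "mid", "low"]
```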