Commit 5e7ffcba authored by David M. Rogers's avatar David M. Rogers

Created DAG and HEFT queue functionality.

parent 53c700ab
+57 −89
@@ -3,112 +3,83 @@ Simple file-based job manager

Simulation campaigns are listed make-style, one-by-one, in a flat YAML file::

    Cov1n: &CovRun
      jobtype: mdrun
      dirname: /gpfs/alpine/proj-shared/bip200/Cov/Cov1/Norest
      inp:
        tpr: run.tpr
    --- make.yaml
    proj1: &defaults
      param: 1
      dirname: Proj1
      nodes: 500
      out:
        xtc: run.xtc
        edr: run.edr
        log: run.log
        cpt: run.cpt
        conf: run.gro
      retry:
        checkpoint:
          - tpr
          - cpt
        complete:
          - "grep -q 'Statistics over 50000001 steps' {out[log]}"

    Cov2r:
      <<: *CovRun
      dirname: /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Wrest

    Cov2n:
      <<: *CovRun
      dirname: /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Norest
      data: grid.hd5

    proj2:
      param: 2
      dirname: Proj2
      <<: *defaults
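The `&CovRun`/`*CovRun` and `&defaults`/`<<:` syntax above is standard YAML
anchors and merge keys, which PyYAML resolves at load time. A minimal sketch
with hypothetical project names, showing that explicit keys win over merged ones:

```python
import yaml  # PyYAML, the same loader make.py relies on

doc = """
proj1: &defaults
  param: 1
  nodes: 500
proj2:
  param: 2
  <<: *defaults
"""
data = yaml.safe_load(doc)
# proj2 inherits nodes from the anchor; its own param overrides the merge.
print(data["proj2"])
```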

Job types contain run-scripts and list input/output files::

    mdrun:
      jobtype: Restartable
    --- jobtypes.yaml
    sim_flow:
      time: 60 # minutes
      inp:
        tpr: topol.tpr
        params: parameter_file.txt
      out:
        log: run.log
        edr: run.edr
        conf: run.gro
        xtc: run.xtc
        cpt: run.cpt
      nodes: 2
      preroll: 6 # number of jobs to stack in the queue (this makes 12 h)
      time: "2:00"
      data: grid.hd5
      setup: |
        module load gcc spectrum-mpi gromacs/2020-rdtscp_off
        export GMX_MAXBACKUP=-1
        module load gcc spectrum-mpi
        export OMP_NUM_THREADS=7
        GMX=gmx_mpi
      script: |
        jsrun -n {nodes} -l gpu-cpu -a 6 -g 6 -c 42 -d plane:6 -b packed:7 \
                 $GMX mdrun -s {inp[tpr]} -g {out[log]} -e {out[edr]} \
                            -c {out[conf]} -x {out[xtc]} -cpo {out[cpt]} \
                            -maxh 2 -pme gpu -npme 1 -nb gpu -bonded gpu -pin off
      retry_script: |
        jsrun -n {nodes} -l gpu-cpu -a 6 -g 6 -c 42 -d plane:6 -b packed:7 \
                 $GMX mdrun -cpi {out[cpt]} \
                            -s {inp[tpr]} -g {out[log]} -e {out[edr]} \
                            -c {out[conf]} -x {out[xtc]} -cpo {out[cpt]} \
                            -maxh 2 -pme gpu -npme 1 -nb gpu -bonded gpu -pin off

Usage is also make-style::

    $ bjobs
    JOBID   USER       STAT   SLOTS    QUEUE       START_TIME    FINISH_TIME   JOB_NAME
    993162  user       PEND      -     batch             -             -       Cov2r.5
    993163  user       PEND      -     batch             -             -       Cov2r.5
    993164  user       PEND      -     batch             -             -       Cov2r.5
    
    
    $ python3 ~/src/flow/make.py -j jobtypes.yaml Cov.yaml BIP200
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov1/Norest/Cov1n.4.out completed successfully.
    Testing: grep -q 'Statistics over 50000001 steps' run.log
    [FAIL]
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov1/Norest/Cov1n.4.out requires restart (failed test: grep -q 'Statistics over 50000001 steps' run.log).
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov1/Norest/Cov1n.5.sh has been created (probably running).
    
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Wrest/Cov2r.4.out completed successfully.
    Testing: grep -q 'Statistics over 50000001 steps' run.log
    [FAIL]
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Wrest/Cov2r.4.out requires restart (failed test: grep -q 'Statistics over 50000001 steps' run.log).
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Wrest/Cov2r.5.sh has been created (probably running).
    
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Norest/Cov2n.4.out completed successfully.
    Testing: grep -q 'Statistics over 50000001 steps' run.log
    [FAIL]
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Norest/Cov2n.4.out requires restart (failed test: grep -q 'Statistics over 50000001 steps' run.log).
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Norest/Cov2n.5.sh has been created (probably running).
                 sim_flow {inp[params]} {out[data]} >{out[log]}

The jobs are used within an allocation and make use of all available resources in parallel::

    #!/bin/bash
    #BSUB -P SPY007
    #BSUB -W 60
    #BSUB -nnodes 1000
    #BSUB -J AllFlow

    module load python/3
    python3 make.py jobtypes.yaml make.yaml

Getting Started
---------------

Start by running just as you normally would.  When you get a good jobscript
going, transform it into an entry in `jobtypes.yaml` and point to it from
your own `make.yaml`.  For help with syntax, see the `examples` subdirectory.

The source, `jobtypes.py`, contains 4 basic types of jobs for now,
`Local`, `Simple`, `Restartable`, and `Chain`.  It should produce error
messages if you get the names wrong.  Contact me if the error messages are not
Start by running jobs without this tool.  When you get a good workflow
going, transform it into an entry in `jobtypes.yaml`.
When you get to the point where you're listing outputs you'd like
to make, fill them into your own `make.yaml`.
For help with syntax, see the `examples` subdirectory.

There are two important things to keep in mind when interacting with
the scheduler.  Every job must contain keys for `nodes` and `time` (in minutes).
It is possible to choose your allocation poorly: try running two jobs
that each require 500 nodes on a 999-node allocation, and 499 nodes will sit idle.
It is also possible to run out of time before all jobs complete.
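The first pitfall can be checked with simple arithmetic before submitting.
A back-of-envelope sketch (the function is hypothetical, not part of `make.py`):

```python
def idle_nodes(alloc_nodes, nodes_per_job, njobs):
    # How many jobs fit side by side, and how many nodes are left over.
    concurrent = min(njobs, alloc_nodes // nodes_per_job)
    return alloc_nodes - concurrent * nodes_per_job

# Two 500-node jobs on a 999-node allocation: only one fits at a time.
print(idle_nodes(999, 500, 2))  # 499
```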

I'm working to create a simulator for your
workflow to help select the optimal number of nodes and allocation time
to avoid the first kind of problem.
I'm working to add a timer to `make.py` to detect the second kind of
problem and exit early.  Subsequent invocations of `make.py` should
then continue where the work left off.

Other than reserved variable names, any key can be present in your yaml file.
These will be available for substitution into the job's `script`
(see Substitutions_ below).
There should also be helpful error messages if you get the YAML syntax wrong.
Contact me if the error messages are not
helpful enough to solve your problems, or if you have new ideas on functionality.

Substitutions
-------------

Before `make.py` runs jobs, it generates jobscripts from your yaml.
The `environment`, `script` and `retry_script` variables of each job
As `make.py` runs jobs, it generates jobscripts from your yaml.
The `setup` and `script` variables of each job
go through substitution at creation time for each script.
In the case of a `Restartable` jobtype, the `retry_script`
and `complete` tests are also substituted.

Specifically, the Python `format()` function is invoked.
This means ordinary brackets, `{}`, need to be written as
@@ -116,9 +87,6 @@ double-brackets, `{{}}`. Also, it means you can use the
variables defined in the yaml itself in the script.
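Concretely, the substitution is a plain `str.format` call with keyword
arguments, so nested keys use the `{inp[tpr]}` index form and doubled braces
survive as literals. A minimal sketch on a hypothetical template:

```python
# The same str.format mechanism, on a made-up command template:
template = "gmx mdrun -s {inp[tpr]} -g {out[log]} -maxh {{2}}"
args = {"inp": {"tpr": "run.tpr"}, "out": {"log": "run.log"}}
print(template.format(**args))
# gmx mdrun -s run.tpr -g run.log -maxh {2}
```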

In addition to those variables, you also have access to
everything defined in the `reqs` dictionary of jobtypes.py.
Finally, every project also has:

    proj = the project ID input to `make.py`
    jobname = the job name itself (top-level header for the job)
everything defined in the `reqs` dictionary of the `Job` class
(see `jobtypes.py`).
+8 −25
# This example shows a chain workflow that composes
# This example shows a 2-step workflow that composes
# a grompp step together with an mdrun.
# The grompp has jobtype Local, and completes immediately,
# while the mdrun is Restartable, and pre-rolls.
TestRun:
  jobtype: Chain
  dirname: /gpfs/alpine/proj-shared/myproj/Simulation
  inp:
    mdp: "run.mdp"
    top: topol.top
    conf: start.pdb
  chain:
    - grompp:
        out:
          tpr: run.tpr
    - mdrun:
        preroll: 2
  dirname: Simulation
  out:
      xtc: run.xtc
      edr: run.edr
      log: run.log
      cpt: run.cpt
      conf: run.gro
        retry:
          checkpoint: [cpt]
          complete:
            - "grep -q 'Statistics over 5000001 steps' {out[log]}"
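The chaining works by feeding each step's `out` files in as the next step's
`inp`, as `Chain.run` does in `jobtypes.py`. A minimal sketch of that
threading, with hypothetical file names:

```python
# Each step: previous outputs become inputs, like Chain.run's `inp = z.out`.
steps = [
    {"name": "grompp", "out": {"tpr": "run.tpr"}},
    {"name": "mdrun", "out": {"xtc": "run.xtc", "log": "run.log"}},
]
inp = {"mdp": "run.mdp", "top": "topol.top"}  # the chain's own inputs
for step in steps:
    step["inp"] = dict(inp)  # connect inputs and outputs
    inp = step["out"]
print(steps[1]["inp"])  # mdrun consumes grompp's tpr
```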
+8 −18
# These jobtypes are make-rules that know
# how to generate a group of output files from a group of input files.
# Classic `make` permits 1 output file per rule.
grompp:
  jobtype: Local
  time: 10.0
  nodes: 1
  inp:
    mdp: grompp.mdp
    top: topol.top
@@ -15,7 +19,8 @@ grompp:
    $GMX grompp -f {inp[mdp]} -p {inp[top]} -c {inp[conf]} -o {out[tpr]} -maxwarn 99

mdrun:
  jobtype: Restartable
  nodes: 2
  time: 120.0
  inp:
    tpr: topol.tpr
  out:
@@ -24,20 +29,12 @@ mdrun:
    conf: run.gro
    xtc: run.xtc
    cpt: run.cpt
  nodes: 2
  preroll: 6 # number of jobs to stack in the queue (this makes 12 h)
  time: "2:00"
  setup: |
    module load gcc spectrum-mpi gromacs/2020-rdtscp_off
    export GMX_MAXBACKUP=-1
    export OMP_NUM_THREADS=7
    GMX=gmx_mpi
  script: |
    jsrun -n {nodes} -l gpu-cpu -a 6 -g 6 -c 42 -d plane:6 -b packed:7 \
             $GMX mdrun -s {inp[tpr]} -g {out[log]} -e {out[edr]} \
                        -c {out[conf]} -x {out[xtc]} -cpo {out[cpt]} \
                        -maxh 2 -pme gpu -npme 1 -nb gpu -bonded gpu -pin off
  retry_script: |
    jsrun -n {nodes} -l gpu-cpu -a 6 -g 6 -c 42 -d plane:6 -b packed:7 \
             $GMX mdrun -cpi {out[cpt]} \
                        -s {inp[tpr]} -g {out[log]} -e {out[edr]} \
@@ -46,6 +43,7 @@ mdrun:

# WARNING: this one uses the same output name throughout.
gmx_remd:
  time: 120.0
  jobtype: Restartable
  inp:
    tpr: remd_00/remd.tpr
@@ -55,20 +53,12 @@ gmx_remd:
    trr: remd_00/remd.trr
    cpt: remd_00/remd.cpt
    run_log: run.log
  preroll: 6 # number of jobs to stack in the queue (this makes 12 h)
  time: "2:00"
  setup: |
    module load gcc spectrum-mpi gromacs/2020-rdtscp_off
    export GMX_MAXBACKUP=-1
    export OMP_NUM_THREADS=7
    GMX=gmx_mpi
  script: |
    jsrun -X 1 -n {nodes} -c 42 -a 6 -g 6 -d plane:6 -b packed:7 \
        $GMX mdrun -deffnm `basename {out[log]} .log` -multidir `ls -d */` \
        -pme gpu -npme 1 -noconfout -nb gpu -bonded gpu -pin off \
        -cpt 1 -maxh 1.95 -replex 1000 &>{out[run_log]}
  retry_script: |
    echo "Starting retry step {jobid}" >>{out[run_log]}
    jsrun -X 1 -n {nodes} -c 42 -a 6 -g 6 -d plane:6 -b packed:7 \
        $GMX mdrun -deffnm `basename {out[log]} .log` -multidir `ls -d */` \
        -pme gpu -npme 1 -noconfout -nb gpu -bonded gpu -pin off \
+15 −40
import re
import subprocess
import os.path
from pathlib import Path
import codecs
from os import listdir

from colorama import Fore, Back, Style 

class TargetError(Exception):
    pass

def str_err(msg):
    return Fore.RED + str(msg) + Style.RESET_ALL

@@ -48,27 +51,11 @@ def queue_job(script, dep=None):
    print(str_err("Error launching job %s!"%script))
    return -1

def last_job(dirname, jobname):
    # Determine the highest-numbered jobname.{jobid}.out file.
    maxjob = -1
    for f in listdir(dirname):
        tok = f.rsplit('.', 3)
        if len(tok) != 3 or tok[0] != jobname or tok[2] != 'out':
            continue
        i = int(tok[1])
        if i > maxjob:
            maxjob = i
    return JobState(dirname, jobname, maxjob)

def jobfile_name(dirname, jobname, jobid, ext):
    assert jobid >= 0
    return os.path.join(dirname, jobname + '.' + str(jobid) + '.' + ext)
def jobfile(dirname, jobname, ext):
    return dirname / (jobname + '.' + ext)

def has_file(dirname, fname):
    return os.path.isfile( os.path.join(dirname, fname) )

def isdir(dirname):
    return os.path.isdir(dirname)
    return (dirname / fname).exists()

def dict_merge(dct, merge_dct):
    """ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
@@ -97,43 +84,31 @@ def grep(fname, exp):
            return True
    return False
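The `dict_merge` helper lets a yaml entry override nested keys (e.g. one file
inside a job's `out` dict) without clobbering its siblings. Its body is elided
by the diff; a minimal re-sketch of the recursive merge it describes:

```python
def dict_merge_sketch(dct, merge_dct):
    # Recursively fold merge_dct into dct in place; nested dicts merge,
    # everything else is overwritten.
    for k, v in merge_dct.items():
        if isinstance(dct.get(k), dict) and isinstance(v, dict):
            dict_merge_sketch(dct[k], v)
        else:
            dct[k] = v

cfg = {"out": {"xtc": "run.xtc", "log": "run.log"}}
dict_merge_sketch(cfg, {"out": {"log": "other.log"}})
print(cfg["out"])  # xtc survives, log is overridden
```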

# Possible state list:
class JobState:
    def __init__(self, dirname, jobname, jobid):
    def __init__(self, dirname, jobname):
        self.dirname = dirname
        self.jobname = jobname
        self.jobid   = jobid
        self.err = False
        if jobid >= 0:
            self.job_in  = jobfile_name(dirname, jobname, jobid, 'sh')
            self.started = os.path.isfile(self.job_in)
            self.job_out = jobfile_name(dirname, jobname, jobid, 'out')
            self.done = os.path.isfile(self.job_out) \
			and grep(self.job_out, r'The output \(if any\) is above this job summary.')
            if self.done:
                self.err = grep(self.job_out, r'MPI_ABORT was invoked')
        else:
            self.done = True
        self.done = False
        self.job_in  = jobfile(dirname, jobname, 'sh')
        self.job_out = jobfile(dirname, jobname, 'out')

    def __str__(self):
        if self.jobid < 0:
            return str_ok("New project")
        if self.err:
            with open(self.job_out) as f:
                head = "".join(f.readlines()[:25])
            return str_err("Job %s resulted in error:\n"%self.job_out) + head
        if self.done:
            return str_ok("Job %s completed successfully."%self.job_out)
        if self.started:
            return str_warn("Job %s has been created (probably running)."%self.job_in)
        return str_warn("Blank job %s/%s.%d"%(self.dirname, self.jobname, self.jobid))
        return str_warn("Blank job %s"%(self.dirname / self.jobname))
    def file_paths(self, fnames, verb=False):
        # Get the file paths from this job's dir.
        # returns (True/False, {key:path}), where True == all paths present
        ok = True
        paths = {}
        for key, name in fnames.items():
            path = os.path.join(self.dirname, name)
            if (verb or ok) and not os.path.isfile(path):
            path = self.dirname/name
            if (verb or ok) and not path.exists():
                ok = False
                if verb:
                    print(str_err("Job %s is missing file %s."%(self.jobname, path)))
+50 −192
@@ -2,15 +2,7 @@
import yaml
from helpers import *

jobscript = """#!/bin/bash
#BSUB -P {proj}
#BSUB -W {time}
#BSUB -alloc_flags "gpumps smt1"
#BSUB -nnodes {nodes}
#BSUB -J {jobname}.{jobid}
#BSUB -o {dirname}/{jobname}.{jobid}.out
#BSUB -e {dirname}/{jobname}.{jobid}.out

jobscript = """
cd {dirname}
{setup}
%s
@@ -22,27 +14,37 @@ cd {dirname}
# "error"   == error detected
# "done"    == completed job

class Simple:
    reqs = { 'proj'    : str,
             'dirname' : str,
class Job:
    reqs = { 'dirname' : Path,
             'jobname' : str,
             'time'    : str,
             'nodes'   : int,
             'time'    : float,
             'inp'     : dict,
             'out'     : dict,
             'nodes'   : int,
             'setup'   : str,
             'script'  : str
           }
    inp = {}
    setup = ""

    def __init__(self, proj, jobname, kws): # kws : {}
        self.proj    = proj
        self.jobname = jobname
    def __init__(self, kws): # kws : {}
        self.typecheck(kws)
    def __repr__(self): # for uniqueness, we can ignore other params
        return "Job(%s, {'dirname': '%s'})"%(self.jobname, self.dirname)
    def __lt__(a, b):
        return repr(a) < repr(b)
    def __gt__(a, b):
        return repr(a) > repr(b)

    def typecheck(self, kws):
        for uniq in ['inp', 'out']: # non-overridable by args
            assert uniq not in kws, "Invalid input."

        # Fill-in from kws and type-check class vars
        for var, cls in self.reqs.items():
            if var in kws: # initialize from yaml data
            if hasattr(self, var): # vars in jobs take precedence
                pass
            elif var in kws: # initialize from yaml data
                val = kws[var]
                if isinstance(val, dict) and hasattr(self, var) \
                         and isinstance(getattr(self,var), dict): # merge together
@@ -53,213 +55,69 @@ class Simple:
                        dict_merge( getattr(self,var), val )
                    else:
                        setattr(self, var, val)
            elif hasattr(self, var):
                pass
            else:   
                raise SyntaxError("Job %s requires variable '%s' (type %s)"%(self.jobname, var, str(cls)))
            if not isinstance(getattr(self,var), cls):
                raise SyntaxError("Job %s requires variable '%s' to have type %s (incorrect type)"%(self.jobname, var, str(cls)))

        assert isdir(self.dirname), "Job directory (%s) not present."%self.dirname
        assert self.dirname.is_dir(), "Job directory (%s) not present."%self.dirname

    def format(self, script, jobid):
    def format(self, script):
        args = {}
        for var in self.reqs.keys():
            args[var] = getattr(self, var)
        if jobid is not None:
            args['jobid'] = jobid
        return script.format(**args)

    # Queries files to determine current state of the job.
    # Returns True if there is no more work to be done.
    def get_state(self):
        self.last_job = last_job(self.dirname, self.jobname)
        print(self.last_job)
        if not self.last_job.done: # can't continue
            return "started"

        if self.last_job.jobid < 0: # not started
            return "none" # fresh start
        if self.last_job.err:
            return "error"
        self.last_job.output_paths(self.out) # grok outputs

        return self.is_complete() # retry

    def is_complete(self):
        # All outputs must be present.
        if self.last_job.all_outputs_present:
            return "done"
        print(str_err("Job %s is complete, but missing output files!"%self.last_job.job_out))
        return "error"

    def setup_job(self):
        self.next_job.input_paths(self.inp) # grok inputs
        if not self.next_job.all_inputs_present:
            print(str_err("Not all inputs for %s present."%self.jobname))
            return True
        with open(self.next_job.job_in,'w') as f:
            f.write(self.format(jobscript % self.script, self.next_job.jobid))
            f.write(self.format(jobscript % self.script))
        return False

    # run returns True on success and False on error
    def run(self, testonly=False):
        """ Check on current state and submit next job if necessary.
        """
        state = self.get_state()
        if state != "none": # no more work
            return state

        # Create next job and check whether its 'sh' is present.
        self.next_job = JobState(self.dirname, self.jobname, self.last_job.jobid+1)
        if self.next_job.started:
            print(self.next_job)
            return "started"
        self.next_job = JobState(self.dirname, self.jobname)

        if testonly:
            return "none"
        if self.setup_job(): # error creating jobscript
            return "error"
        job_n = queue_job(self.next_job.job_in)
        if job_n < 0:
            return "error"
        return job_n

class Local(Simple):
    time = "0:10"
    nodes = 1
    def run(self, testonly=False):
        state = self.get_state()
        if state != "none": # no more work
            return state

        # Create next job and check whether its 'sh' is present.
        self.next_job = JobState(self.dirname, self.jobname, self.last_job.jobid+1)
        if self.next_job.started:
            print(self.next_job)
            return "started"

        if testonly:
            return "none"
            print("Would run " + str(self.next_job))
            return True
        if self.setup_job(): # error creating jobscript
            return "error"
            return False
        if self.next_job.exec():
            return "error"
        return "done"

class Restartable(Simple):
    def __init__(self, proj, jobname, kws):
        self.reqs = self.reqs.copy()
        self.reqs.update({
                      'retry'   : dict,
                      'preroll'  : int,
                      'retry_script'  : str
                      })
        Simple.__init__(self, proj, jobname, kws)

    def is_complete(self):
        # All completeness tests must be met,
        for exp in self.retry['complete']:
            #if not grep(self.last_job.out[name], exp): self.retry['complete'] # requires retry be a dict (self.retry.items())
            exp = self.format(exp, self.last_job.jobid)
            if check_exit(exp, self.dirname): # WARNING: executing arbitrary shell string here.
                print(str_warn("Job %s requires restart (failed test: %s)." % (self.last_job.job_out, exp)))
                return "none"
        # and all outputs must be present.
        return Simple.is_complete(self)

    def setup_retry(self):
        for oname in self.retry['checkpoint']:
            fname = self.last_job.out[oname]
            if not has_file(self.dirname, fname):
                print(str_err("Checkpoint file '%s' not present."%fname))
                return True
        with open(self.next_job.job_in,'w') as f:
            f.write(self.format(jobscript % self.retry_script, self.next_job.jobid))
        return False

    def run(self, testonly=False):
        """ Check on current state and submit next job if necessary.
        """
        state = self.get_state()
        if state != "none": # no more work
            return state

        # Create next job and check whether its 'sh' is present.
        self.next_job = JobState(self.dirname, self.jobname, self.last_job.jobid+1)
        if self.next_job.started:
            print(self.next_job)
            return "started"

        if testonly:
            return "none"
        if self.next_job.jobid == 0: # initial run
            mk = self.setup_job
        else:
            mk = self.setup_retry

        # Create and run next `preroll` job(s).
        job_n = None
        for jobid in range(self.next_job.jobid, self.next_job.jobid+self.preroll):
            if mk(): # error creating jobscript
                return "error"
            job_n = queue_job(self.next_job.job_in, job_n)
            if job_n < 0: # error queueing job
                break

            mk = self.setup_retry
            self.next_job = JobState(self.dirname, self.jobname, jobid+1)

        if job_n < 0:
            return "error"
        return job_n

class Chain(Simple):
    time = "0:00"
    nodes = 1
    setup = ""
    script = ""
    out = {}
    def __init__(self, proj, jobname, kws):
        self.reqs = self.reqs.copy()
        self.reqs.update({
                      'chain'   : list
                      })
        Simple.__init__(self, proj, jobname, kws)
        #self.out = self.chain[-1][1]['out']

    def run(self, testonly=False):
        """ Check on current state and submit next job if necessary.
        """
        inp = self.inp
        for i, (job,args) in enumerate(self.chain):
            if 'dirname' not in args:
                args['dirname'] = self.dirname
            z = job(self.proj, "%s.%d"%(self.jobname,i), args)
            z.inp.update(inp) # connect inputs and outputs
            state = z.run(testonly)
            inp = z.out
            if isinstance(state, int):
                return "started"
            if state != "done":
                return state
        self.out = inp
        return "done"

# extend the current file's defaults with yaml-defined jobtypes
# Mapping from rule name to implementation class
#  lookup : fname -> (rule, ftype)
class Jobtypes(dict):
    def mk_lookup(self): # create a lookup table
        self.lookup = {}
        for rule, cls in self.items():
            for ftype, fname in cls.out.items():
                if fname in self.lookup:
                    jt,ft = self.lookup[fname]
                    raise TargetError("Duplicate target (%s: %s) declared by %s.\n"%(ftype,fname,rule) +
                             "  conflicts with earlier target (%s: %s) declared by %s.\n"%(ft,fname,jt))
                self.lookup[fname] = (rule, ftype)

# Read yaml-defined jobtypes.
def read_jobtypes(filename):
    types = { 'Local': Local, 'Simple': Simple,
              'Restartable': Restartable, 'Chain': Chain }
    types = Jobtypes()
    if filename is None:
        types.mk_lookup()
        return types

    with open(filename) as f:
        x = yaml.safe_load(f)

    for name, info in x.items():
        bname = info['jobtype']
        info['jobtype'] = name
        base = types[bname]
        types[name] = type(name, (base,), info)
        info['jobname'] = name # name of make-rule
        types[name] = type(name, (Job,), info)

    types.mk_lookup()
    return types
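`read_jobtypes` builds one class per yaml rule with the three-argument `type()`
call, so every yaml key becomes a class attribute that `typecheck` later treats
as a job-level default. A minimal sketch of that trick, with a stand-in base
class and rule data:

```python
# Hypothetical base and rule data standing in for Job and a yaml entry.
class Job:
    nodes = 1

info = {"jobname": "mdrun", "nodes": 2}
Mdrun = type("mdrun", (Job,), info)  # same call shape as read_jobtypes

# The new class inherits from Job, with yaml keys as attribute overrides.
print(Mdrun.jobname, Mdrun.nodes)  # mdrun 2
```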