Commit bdd55e33 authored by David M. Rogers's avatar David M. Rogers
Browse files

Added Chain jobtype and updated docs.

parent 011774b2
Loading
Loading
Loading
Loading
+12 −0
Original line number Diff line number Diff line
@@ -89,3 +89,15 @@ Usage is also make-style::
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Norest/Cov2n.4.out requires restart (failed test: grep -q 'Statistics over 50000001 steps' run.log).
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Norest/Cov2n.5.sh has been created (probably running).

Getting Started
---------------

Start by running just as you normally would.  When you get a good jobscript
going, transform it into an entry in `jobtypes.yaml` and point to it from
your own `make.yaml`.  For help with syntax, see the `examples` subdirectory.

The source, `jobtypes.py`, contains 4 basic types of jobs for now,
`Local`, `Simple`, `Restartable`, and `Chain`.  It should produce error
messages if you get the names wrong.  Contact me if the error messages are not
helpful enough to solve your problems, or if you have new ideas on functionality.

examples/chain.yaml

0 → 100644
+28 −0
Original line number Diff line number Diff line
# This example shows a chain workflow that composes
# a grompp step together with an mdrun.
# The grompp has jobtype Local, and completes immediately,
# while the mdrun is Restartable, and pre-rolls.
TestRun:
  jobtype: Chain
  dirname: /gpfs/alpine/proj-shared/myproj/Simulation
  inp:                      # inputs handed to the first link of the chain
    mdp: "run.mdp"
    top: topol.top
    conf: start.pdb
  chain:                    # links run in order; each link's outputs feed the next link's inputs
    - grompp:               # jobtype defined in jobtypes.yaml (Local)
        out:
          tpr: run.tpr      # consumed as the next link's inp[tpr]
    - mdrun:                # jobtype defined in jobtypes.yaml (Restartable)
        preroll: 2          # number of jobs to stack in the queue
        out:
          xtc: run.xtc
          edr: run.edr
          log: run.log
          cpt: run.cpt
          conf: run.gro
        retry:
          checkpoint: [cpt]
          complete:         # shell test(s) that must pass for the run to count as done
            - "grep -q 'Statistics over 5000001 steps' {out[log]}"

examples/jobtypes.yaml

0 → 100644
+76 −0
Original line number Diff line number Diff line
# Preprocessing step: builds the portable run input (.tpr) from mdp/top/conf.
grompp:
  jobtype: Local            # runs immediately, without going through the batch queue
  inp:
    mdp: grompp.mdp
    top: topol.top
    conf: conf.gro
  out:
    tpr: topol.tpr          # run input consumed by the mdrun jobtype below
  setup: |
    module load gcc gromacs/2020-rdtscp_off-analysis
    export GMX_MAXBACKUP=-1
    export OMP_NUM_THREADS=7
    GMX=gmx
  script: |
    $GMX grompp -f {inp[mdp]} -p {inp[top]} -c {inp[conf]} -o {out[tpr]} -maxwarn 99

# Production MD step: restartable, chained legs of 2 h each.
mdrun:
  jobtype: Restartable      # first leg uses `script`, later legs use `retry_script`
  inp:
    tpr: topol.tpr          # produced by the grompp jobtype above
  out:
    log: run.log
    edr: run.edr
    conf: run.gro
    xtc: run.xtc
    cpt: run.cpt            # checkpoint read back by retry_script's -cpi restart
  nodes: 2
  preroll: 6 # number of jobs to stack in the queue (this makes 12 h)
  time: "2:00"              # wall time per leg; matches -maxh 2 below
  setup: |
    module load gcc spectrum-mpi gromacs/2020-rdtscp_off
    export GMX_MAXBACKUP=-1
    export OMP_NUM_THREADS=7
    GMX=gmx_mpi
  script: |
    jsrun -n {nodes} -l gpu-cpu -a 6 -g 6 -c 42 -d plane:6 -b packed:7 \
             $GMX mdrun -s {inp[tpr]} -g {out[log]} -e {out[edr]} \
                        -c {out[conf]} -x {out[xtc]} -cpo {out[cpt]} \
                        -maxh 2 -pme gpu -npme 1 -nb gpu -bonded gpu -pin off
  retry_script: |
    jsrun -n {nodes} -l gpu-cpu -a 6 -g 6 -c 42 -d plane:6 -b packed:7 \
             $GMX mdrun -cpi {out[cpt]} \
                        -s {inp[tpr]} -g {out[log]} -e {out[edr]} \
                        -c {out[conf]} -x {out[xtc]} -cpo {out[cpt]} \
                        -maxh 2 -pme gpu -npme 1 -nb gpu -bonded gpu -pin off

# WARNING: this one uses the same output name throughout.
# Replica-exchange MD: -multidir runs one replica per `remd_NN/` directory.
gmx_remd:
  jobtype: Restartable
  inp:
    tpr: remd_00/remd.tpr   # paths are relative to replica 00; other replicas mirror it
  out:
    log: remd_00/remd.log
    edr: remd_00/remd.edr
    trr: remd_00/remd.trr
    cpt: remd_00/remd.cpt
    run_log: run.log        # aggregate stdout/stderr of the whole jsrun command
  preroll: 6 # number of jobs to stack in the queue (this makes 12 h)
  time: "2:00"
  setup: |
    module load gcc spectrum-mpi gromacs/2020-rdtscp_off
    export GMX_MAXBACKUP=-1
    export OMP_NUM_THREADS=7
    GMX=gmx_mpi
  script: |
    jsrun -X 1 -n {nodes} -c 42 -a 6 -g 6 -d plane:6 -b packed:7 \
        $GMX mdrun -deffnm `basename {out[log]} .log` -multidir `ls -d */` \
        -pme gpu -npme 1 -noconfout -nb gpu -bonded gpu -pin off \
        -cpt 1 -maxh 1.95 -replex 1000 &>{out[run_log]}
  retry_script: |
    echo "Starting retry step {jobid}" >>{out[run_log]}
    jsrun -X 1 -n {nodes} -c 42 -a 6 -g 6 -d plane:6 -b packed:7 \
        $GMX mdrun -deffnm `basename {out[log]} .log` -multidir `ls -d */` \
        -pme gpu -npme 1 -noconfout -nb gpu -bonded gpu -pin off \
        -cpi remd.cpt -cpt 1 -maxh 1.95 -replex 1000 &>>{out[run_log]}
+17 −1
Original line number Diff line number Diff line
@@ -52,7 +52,7 @@ def last_job(dirname, jobname):
    # Determine the highest-numbered jobname.{jobid}.out file.
    maxjob = -1
    for f in listdir(dirname):
        tok = f.split('.')
        tok = f.rsplit('.', 3)
        if len(tok) != 3 or tok[0] != jobname or tok[2] != 'out':
            continue
        i = int(tok[1])
@@ -70,6 +70,22 @@ def has_file(dirname, fname):
def isdir(dirname):
    """Return True when ``dirname`` exists and is a directory (thin os.path.isdir wrapper)."""
    return os.path.isdir(dirname)

def dict_merge(dct, merge_dct):
    """Recursively merge ``merge_dct`` into ``dct`` in place.

    Unlike :meth:`dict.update`, which only replaces top-level keys,
    nested dicts present in both arguments are merged key-by-key at
    arbitrary depth.

    :param dct: dict onto which the merge is executed (modified in place)
    :param merge_dct: dict merged into ``dct``
    :return: None
    """
    for key, incoming in merge_dct.items():
        current = dct.get(key)
        # Only recurse when both sides hold dicts; otherwise overwrite.
        if isinstance(current, dict) and isinstance(incoming, dict):
            dict_merge(current, incoming)
        else:
            dct[key] = incoming


# Check for an MPI error in this file
def grep(fname, exp):
    err = re.compile(exp)
+97 −42
Original line number Diff line number Diff line
@@ -16,11 +16,14 @@ cd {dirname}
%s
"""

# NOTE: potential job return states are
# "none"    == not started
# "started" == in progress
# "error"   == error detected
# "done"    == completed job

class Simple:
    def __init__(self, proj, jobname, kws): # kws : {}
        self.proj    = proj
        self.jobname = jobname
        self.reqs = { 'proj'    : str,
    reqs = { 'proj'    : str,
             'dirname' : str,
             'jobname' : str,
             'time'    : str,
@@ -30,19 +33,28 @@ class Simple:
             'setup'   : str,
             'script'  : str
           }

    def __init__(self, proj, jobname, kws): # kws : {}
        self.proj    = proj
        self.jobname = jobname
        self.typecheck(kws)

    def typecheck(self, kws):
        # Fill-in from kws and type-check class vars
        for var, cls in self.reqs.items():
            if var in kws:
                setattr(self, var, kws[var])
            elif hasattr(self, var): # can be over-written by the jobscript (not recommended)
            if var in kws: # initialize from yaml data
                val = kws[var]
                if isinstance(val, dict) and hasattr(self, var) \
                         and isinstance(getattr(self,var), dict): # merge together
                   dict_merge( getattr(self,var), val ) # recursively merge parameter hierarchies
                else:
                    setattr(self, var, val)
            elif hasattr(self, var):
                pass
            else:   
                raise SyntaxError("Job requires variable '%s' (type %s)"%(var, str(cls)))
                raise SyntaxError("Job %s requires variable '%s' (type %s)"%(self.jobname, var, str(cls)))
            if not isinstance(getattr(self,var), cls):
                raise SyntaxError("Job requires variable '%s' to have type %s (incorrect type)"%(var, str(cls)))
                raise SyntaxError("Job %s requires variable '%s' to have type %s (incorrect type)"%(self.jobname, var, str(cls)))

        assert isdir(self.dirname), "Job directory (%s) not present."%self.dirname

@@ -60,12 +72,12 @@ class Simple:
        self.last_job = last_job(self.dirname, self.jobname)
        print(self.last_job)
        if not self.last_job.done: # can't continue
            return True
            return "started"

        if self.last_job.jobid < 0: # not started
            return False # fresh start
            return "none" # fresh start
        if self.last_job.err:
            return True
            return "error"
        self.last_job.output_paths(self.out) # grok outputs

        return self.is_complete() # retry
@@ -73,9 +85,9 @@ class Simple:
    def is_complete(self):
        # All outputs must be present.
        if self.last_job.all_outputs_present:
            return True
            return "done"
        print(str_err("Job %s is complete, but missing output files!", self.last_job.job_out))
        return True
        return "error"

    def setup_job(self):
        self.next_job.input_paths(self.inp) # grok inputs
@@ -88,46 +100,53 @@ class Simple:

    def run(self):
        """ Check on current state and submit next job if necessary.
            returns True if an error prevents progress.
        """
        if self.get_state(): # no more work
            return self.last_job.err
        state = self.get_state()
        if state is not "none": # no more work
            return state

        # Create next job and check whether it's 'sh' is present.
        self.next_job = JobState(self.dirname, self.jobname, self.last_job.jobid+1)
        if self.next_job.started:
            print(self.next_job)
            return True
            return "started"

        if self.setup_job(): # error creating jobscript
            return True
            return "error"
        job_n = queue_job(self.next_job.job_in)
        return job_n >= 0
        if job_n < 0:
            return "error"
        return job_n

class Local(Simple):
    time = "0:10"
    nodes = 1
    def run(self):
        if self.get_state(): # no more work
            return self.last_job.err
        state = self.get_state()
        if state is not "none": # no more work
            return state

        # Create next job and check whether it's 'sh' is present.
        self.next_job = JobState(self.dirname, self.jobname, self.last_job.jobid+1)
        if self.next_job.started:
            print(self.next_job)
            return True
            return "started"

        if self.setup_job(): # error creating jobscript
            return True
        return self.next_job.exec()
            return "error"
        if self.next_job.exec():
            return "error"
        return "done"

class Restartable(Simple):
    def __init__(self, proj, jobname, kws):
        Simple.__init__(self, proj, jobname, kws)
        self.reqs = self.reqs.copy()
        self.reqs.update({
                      'retry'   : dict,
                      'preroll'  : int,
                      'retry_script'  : str
                      })
        self.typecheck(kws)
        Simple.__init__(self, proj, jobname, kws)

    def is_complete(self):
        # All completeness tests must be met,
@@ -136,7 +155,7 @@ class Restartable(Simple):
            exp = self.format(exp, self.last_job.jobid)
            if check_exit(exp, self.dirname): # WARNING: executing arbitrary shell string here.
                print(str_warn("Job %s requires restart (failed test: %s)." % (self.last_job.job_out, exp)))
                return False
                return "none"
        # and all outputs must be present.
        return Simple.is_complete(self)

@@ -152,16 +171,16 @@ class Restartable(Simple):

    def run(self):
        """ Check on current state and submit next job if necessary.
            returns True if an error prevents progress.
        """
        if self.get_state(): # no more work
            return self.last_job.err
        state = self.get_state()
        if state is not "none": # no more work
            return state

        # Create next job and check whether it's 'sh' is present.
        self.next_job = JobState(self.dirname, self.jobname, self.last_job.jobid+1)
        if self.next_job.started:
            print(self.next_job)
            return True
            return "started"

        if self.next_job.jobid == 0: # initial run
            mk = self.setup_job
@@ -172,19 +191,55 @@ class Restartable(Simple):
        job_n = None
        for jobid in range(self.next_job.jobid, self.next_job.jobid+self.preroll):
            if mk(): # error creating jobscript
                return True
                return "error"
            job_n = queue_job(self.next_job.job_in, job_n)
            if job_n < 0: # error queueing job
                return True
                break

            mk = self.setup_retry
            self.next_job = JobState(self.dirname, self.jobname, jobid+1)

        return job_n >= 0
        if job_n < 0:
            return "error"
        return job_n

class Chain(Simple):
    # Class-level defaults so Simple.typecheck() does not require these
    # from the yaml entry -- a Chain has no jobscript of its own, it
    # only drives its sub-jobs.
    time = "0:00"
    nodes = 1
    setup = ""
    script = ""
    out = {}
    def __init__(self, proj, jobname, kws):
        """A composite job that runs a list of sub-jobs in sequence.

        In addition to Simple's variables, requires 'chain': a list of
        (jobtype-class, args) links executed in order; each link's
        outputs become the next link's inputs.
        """
        # Shadow the shared class-level dict so typecheck()'s dict_merge
        # cannot pollute `Chain.out` across instances.
        self.out = {}
        self.reqs = self.reqs.copy()
        self.reqs.update({
                      'chain'   : list
                      })
        Simple.__init__(self, proj, jobname, kws)
        #self.out = self.chain[-1][1]['out']

    def run(self):
        """ Check on current state and submit next job if necessary.

            Returns a job-state string for the chain as a whole: the
            first link that is not "done" determines the result.
        """
        inp = self.inp
        for i, (job,args) in enumerate(self.chain):
            if 'dirname' not in args: # sub-jobs default to the chain's directory
                args['dirname'] = self.dirname
            # `job` is a jobtype class; base classes define no `jobtype`
            # attribute, so guard the debug print against AttributeError.
            print(i, getattr(job, 'jobtype', job.__name__), job.reqs)
            z = job(self.proj, "%s.%d"%(self.jobname,i), args)
            z.inp.update(inp) # connect inputs and outputs
            state = z.run()
            inp = z.out
            if isinstance(state, int): # run() handed back a queued job number
                return "started"
            if state != "done": # was `is not`: identity test on a str literal
                return state
        self.out = inp # expose the final link's outputs as the chain's
        return "done"

# extend the current file's defaults with yaml-defined jobtypes
def read_jobtypes(filename):
    types = { 'Local': Local, 'Simple': Simple, 'Restartable': Restartable }
    types = { 'Local': Local, 'Simple': Simple,
              'Restartable': Restartable, 'Chain': Chain }
    if filename is None:
        return types

Loading