Commit 011774b2 authored by David M. Rogers's avatar David M. Rogers
Browse files

Initial project.

parents
Loading
Loading
Loading
Loading

README.rst

0 → 100644
+91 −0
Original line number Diff line number Diff line
Simple file-based job manager
=============================

Simulation campaigns are listed make-style, one entry per job, in a flat yaml file::

    Cov1n: &CovRun
      jobtype: mdrun
      dirname: /gpfs/alpine/proj-shared/bip200/Cov/Cov1/Norest
      inp:
        tpr: run.tpr
      out:
        xtc: run.xtc
        edr: run.edr
        log: run.log
        cpt: run.cpt
        conf: run.gro
      retry:
        checkpoint:
          - tpr
          - cpt
        complete:
          - "grep -q 'Statistics over 50000001 steps' {out[log]}"

    Cov2r:
      <<: *CovRun
      dirname: /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Wrest

    Cov2n:
      <<: *CovRun
      dirname: /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Norest

Job types contain run-scripts and list input/output files::

    mdrun:
      jobtype: Restartable
      inp:
        tpr: topol.tpr
      out:
        log: run.log
        edr: run.edr
        conf: run.gro
        xtc: run.xtc
        cpt: run.cpt
      nodes: 2
      preroll: 6 # number of jobs to stack in the queue (this makes 12 h)
      time: "2:00"
      setup: |
        module load gcc spectrum-mpi gromacs/2020-rdtscp_off
        export GMX_MAXBACKUP=-1
        export OMP_NUM_THREADS=7
        GMX=gmx_mpi
      script: |
        jsrun -n {nodes} -l gpu-cpu -a 6 -g 6 -c 42 -d plane:6 -b packed:7 \
                 $GMX mdrun -s {inp[tpr]} -g {out[log]} -e {out[edr]} \
                            -c {out[conf]} -x {out[xtc]} -cpo {out[cpt]} \
                            -maxh 2 -pme gpu -npme 1 -nb gpu -bonded gpu -pin off
      retry_script: |
        jsrun -n {nodes} -l gpu-cpu -a 6 -g 6 -c 42 -d plane:6 -b packed:7 \
                 $GMX mdrun -cpi {out[cpt]} \
                            -s {inp[tpr]} -g {out[log]} -e {out[edr]} \
                            -c {out[conf]} -x {out[xtc]} -cpo {out[cpt]} \
                            -maxh 2 -pme gpu -npme 1 -nb gpu -bonded gpu -pin off

Usage is also make-style::

    $ bjobs
    JOBID   USER       STAT   SLOTS    QUEUE       START_TIME    FINISH_TIME   JOB_NAME
    993162  user       PEND      -     batch             -             -       Cov2r.5
    993163  user       PEND      -     batch             -             -       Cov2r.5
    993164  user       PEND      -     batch             -             -       Cov2r.5
    
    
    $ python3 ~/src/flow/make.py -j jobtypes.yaml Cov.yaml BIP200
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov1/Norest/Cov1n.4.out completed successfully.
    Testing: grep -q 'Statistics over 50000001 steps' run.log
    [FAIL]
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov1/Norest/Cov1n.4.out requires restart (failed test: grep -q 'Statistics over 50000001 steps' run.log).
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov1/Norest/Cov1n.5.sh has been created (probably running).
    
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Wrest/Cov2r.4.out completed successfully.
    Testing: grep -q 'Statistics over 50000001 steps' run.log
    [FAIL]
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Wrest/Cov2r.4.out requires restart (failed test: grep -q 'Statistics over 50000001 steps' run.log).
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Wrest/Cov2r.5.sh has been created (probably running).
    
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Norest/Cov2n.4.out completed successfully.
    Testing: grep -q 'Statistics over 50000001 steps' run.log
    [FAIL]
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Norest/Cov2n.4.out requires restart (failed test: grep -q 'Statistics over 50000001 steps' run.log).
    Job /gpfs/alpine/proj-shared/bip200/Cov/Cov2/Norest/Cov2n.5.sh has been created (probably running).

helpers.py

0 → 100644
+137 −0
Original line number Diff line number Diff line
import re
import subprocess
import os.path
import codecs
from os import listdir

from colorama import Fore, Back, Style 

def str_err(msg):
    """Render *msg* in red (error color) for terminal output."""
    return "".join([Fore.RED, str(msg), Style.RESET_ALL])

def str_warn(msg):
    """Render *msg* in yellow (warning color) for terminal output."""
    return "".join([Fore.YELLOW, str(msg), Style.RESET_ALL])

def str_ok(msg):
    """Render *msg* in green (success color) for terminal output."""
    return "".join([Fore.GREEN, str(msg), Style.RESET_ALL])

# raises an exception on nonzero return
def run_cmd(*args):
    """Echo and execute *args* as a subprocess, returning its stdout as text.

    subprocess.check_output raises CalledProcessError on a nonzero exit.
    """
    print(str_ok("Running: " + " ".join(args)))
    text = subprocess.check_output(args).decode("utf-8")
    print(text)
    return text

def check_exit(cmd, cwd=None):
    """Run shell command *cmd* (optionally in directory *cwd*).

    Returns True when the command FAILED (nonzero exit status).
    """
    print(str_ok("Testing: %s"%cmd))
    failed = subprocess.call(cmd, shell=True, cwd=cwd) != 0
    print(str_err("[FAIL]") if failed else str_ok("[PASS]"))
    return failed

# Queue up a job and return its jobid.
def queue_job(script, dep=None):
    """Submit *script* with bsub, optionally chained after job id *dep*.

    Returns the parsed LSF job id, or -1 when no id could be found in
    bsub's output.
    """
    if dep is None:
        cmd = ["bsub", script]
    else:
        # Only start after the dependency finishes successfully.
        cmd = ["bsub", "-w", "done(%d)"%dep, script]
    out = run_cmd(*cmd)
    # bsub prints e.g. "Job <12345> is submitted to queue <batch>."
    pat = re.compile(r'Job <([0-9]+)> is submitted to')
    for line in out.split('\n'):
        found = pat.match(line)
        if found:
            return int(found[1])
    print(str_err("Error launching job %s!"%script))
    return -1

def last_job(dirname, jobname):
    """Return a JobState for the highest-numbered jobname.{jobid}.out in *dirname*.

    If no matching output file exists, the JobState gets jobid -1
    ("new project").
    """
    maxjob = -1
    for entry in listdir(dirname):
        parts = entry.split('.')
        if len(parts) == 3 and parts[0] == jobname and parts[2] == 'out':
            maxjob = max(maxjob, int(parts[1]))
    return JobState(dirname, jobname, maxjob)

def jobfile_name(dirname, jobname, jobid, ext):
    """Path of a per-attempt job file: dirname/jobname.{jobid}.{ext}."""
    assert jobid >= 0
    return os.path.join(dirname, '.'.join([jobname, str(jobid), ext]))

def has_file(dirname, fname):
    """True when *fname* exists as a regular file inside *dirname*."""
    path = os.path.join(dirname, fname)
    return os.path.isfile(path)

def isdir(dirname):
    """Thin convenience wrapper around os.path.isdir."""
    return os.path.isdir(dirname)

# Check for an MPI error in this file
def grep(fname, exp):
    """True when regex *exp* matches anywhere in the utf-8 text of *fname*."""
    pattern = re.compile(exp)
    with codecs.open(fname, encoding='utf-8') as f:
        return pattern.search(f.read()) is not None

# Possible state list:
# A JobState is a filesystem snapshot of one job attempt:
#   jobid < 0                     -> "new project", nothing run yet (done=True)
#   .sh file present              -> started (probably queued or running)
#   .out file with summary line   -> done; err=True when an MPI abort was logged
class JobState:
    """State of a single {jobname}.{jobid} attempt inside *dirname*."""
    def __init__(self, dirname, jobname, jobid):
        self.dirname = dirname
        self.jobname = jobname
        self.jobid   = jobid
        self.err = False
        if jobid >= 0:
            # Jobscript (.sh) and captured output (.out) for this attempt.
            self.job_in  = jobfile_name(dirname, jobname, jobid, 'sh')
            self.started = os.path.isfile(self.job_in)
            self.job_out = jobfile_name(dirname, jobname, jobid, 'out')
            # The batch system appends this summary line when the job ends,
            # so its presence in the .out file marks completion.
            self.done = os.path.isfile(self.job_out) \
                        and grep(self.job_out, r'The output \(if any\) is above this job summary.')
            if self.done:
                # A finished job still counts as errored if MPI aborted.
                self.err = grep(self.job_out, r'MPI_ABORT was invoked')
        else:
            # No prior attempt exists; treated as "done" (fresh start).
            self.done = True
    def __str__(self):
        # Human-readable, color-coded status summary.
        if self.jobid < 0:
            return str_ok("New project")
        if self.err:
            # Include the first 25 lines of the output file for context.
            with open(self.job_out) as f:
                head = "".join(f.readlines()[:25])
            return str_err("Job %s resulted in error:\n"%self.job_out) + head
        if self.done:
            return str_ok("Job %s completed successfully."%self.job_out)
        if self.started:
            return str_warn("Job %s has been created (probably running)."%self.job_in)
        return str_warn("Blank job %s/%s.%d"%(self.dirname, self.jobname, self.jobid))
    def file_paths(self, fnames, verb=False):
        # Get the file paths from this job's dir.
        # returns (True/False, {key:path}), where True == all paths present
        ok = True
        paths = {}
        for key, name in fnames.items():
            path = os.path.join(self.dirname, name)
            # Once ok is False, further isfile() checks are skipped unless
            # verbose; verb=True reports every missing file, not just the first.
            if (verb or ok) and not os.path.isfile(path):
                ok = False
                if verb:
                    print(str_err("Job %s is missing file %s."%(self.jobname, path)))
            paths[key] = path
        return ok, paths
    def input_paths(self, fnames):
        # Resolve input files verbosely (each missing input is reported).
        self.all_inputs_present, self.inp = self.file_paths(fnames, True)
    def output_paths(self, fnames):
        # Resolve output files quietly (the caller decides how to report).
        self.all_outputs_present, self.out = self.file_paths(fnames, False)

    # Directly run a jobscript
    def exec(self):
        # Run the jobscript with bash, redirecting stdout into the .out file.
        # Returns True on a nonzero exit status (failure), like check_exit().
        print(str_ok("Running: %s"%self.job_in))
        r = bool(subprocess.call("bash %s >%s"%(self.job_in, self.job_out), shell=True, cwd=self.dirname))
        if r:
            print(str_err("[FAIL]"))
        else:
            print(str_ok("[PASS]"))
        return r

jobtypes.py

0 → 100644
+200 −0
Original line number Diff line number Diff line

import yaml
from helpers import *

# Template for every generated LSF batch script.  The single %s receives the
# jobtype's `script` (or `retry_script`) body; the {braced} placeholders are
# filled in later by Simple.format() from the job's variables plus `jobid`.
jobscript = """#!/bin/bash
#BSUB -P {proj}
#BSUB -W {time}
#BSUB -alloc_flags "gpumps smt1"
#BSUB -nnodes {nodes}
#BSUB -J {jobname}.{jobid}
#BSUB -o {dirname}/{jobname}.{jobid}.out
#BSUB -e {dirname}/{jobname}.{jobid}.out

cd {dirname}
{setup}
%s
"""

class Simple:
    """A fire-once batch job.

    Validates its configuration from the yaml job spec (*kws*), inspects
    the filesystem for the state of its last attempt, and queues the
    next jobscript with bsub when more work remains.
    """
    def __init__(self, proj, jobname, kws): # kws : {}
        self.proj    = proj
        self.jobname = jobname
        # Required configuration variables and their expected types.
        self.reqs = { 'proj'    : str,
                      'dirname' : str,
                      'jobname' : str,
                      'time'    : str,
                      'inp'     : dict,
                      'out'     : dict,
                      'nodes'   : int,
                      'setup'   : str,
                      'script'  : str
                    }
        self.typecheck(kws)

    def typecheck(self, kws):
        # Fill-in from kws and type-check class vars
        for var, cls in self.reqs.items():
            if var in kws:
                setattr(self, var, kws[var])
            elif hasattr(self, var): # can be over-written by the jobscript (not recommended)
                pass
            else:
                raise SyntaxError("Job requires variable '%s' (type %s)"%(var, str(cls)))
            if not isinstance(getattr(self,var), cls):
                raise SyntaxError("Job requires variable '%s' to have type %s (incorrect type)"%(var, str(cls)))

        assert isdir(self.dirname), "Job directory (%s) not present."%self.dirname

    def format(self, script, jobid):
        """str.format *script* using every required variable, plus `jobid`
        when one is given (jobid=None leaves that key out)."""
        args = {}
        for var in self.reqs.keys():
            args[var] = getattr(self, var)
        if jobid is not None:
            args['jobid'] = jobid
        return script.format(**args)

    # Queries files to determine current state of the job.
    # Returns True if there is no more work to be done.
    def get_state(self):
        self.last_job = last_job(self.dirname, self.jobname)
        print(self.last_job)
        if not self.last_job.done: # can't continue
            return True

        if self.last_job.jobid < 0: # not started
            return False # fresh start
        if self.last_job.err:
            return True
        self.last_job.output_paths(self.out) # grok outputs

        return self.is_complete() # retry

    def is_complete(self):
        """Check the last job's results.  True == nothing left to (re)run."""
        # All outputs must be present.
        if self.last_job.all_outputs_present:
            return True
        # BUG FIX: str_err takes a single message argument -- the original
        # call passed two positional args, a TypeError at runtime.
        print(str_err("Job %s is complete, but missing output files!" % self.last_job.job_out))
        return True

    def setup_job(self):
        """Write the next jobscript.  Returns True on error (missing inputs)."""
        self.next_job.input_paths(self.inp) # grok inputs
        if not self.next_job.all_inputs_present:
            # BUG FIX: single-argument str_err (was a two-arg TypeError).
            print(str_err("Not all inputs for %s present." % self.jobname))
            return True
        with open(self.next_job.job_in,'w') as f:
            f.write(self.format(jobscript % self.script, self.next_job.jobid))
        return False

    def run(self):
        """ Check on current state and submit next job if necessary.
            returns True if an error prevents progress.
        """
        if self.get_state(): # no more work
            return self.last_job.err

        # Create next job and check whether its 'sh' is present.
        self.next_job = JobState(self.dirname, self.jobname, self.last_job.jobid+1)
        if self.next_job.started:
            print(self.next_job)
            return True

        if self.setup_job(): # error creating jobscript
            return True
        job_n = queue_job(self.next_job.job_in)
        # BUG FIX: queue_job returns -1 on failure, and this method's
        # contract is "True == error" -- the original `job_n >= 0`
        # inverted that (reported success as an error and vice versa).
        return job_n < 0

class Local(Simple):
    """Variant of Simple that executes the jobscript directly with bash
    instead of submitting it to the batch queue."""
    def run(self):
        # No more work: report whether the last job ended in error.
        if self.get_state():
            return self.last_job.err

        # Next attempt; if its jobscript already exists, assume it is running.
        nxt = JobState(self.dirname, self.jobname, self.last_job.jobid+1)
        self.next_job = nxt
        if nxt.started:
            print(nxt)
            return True

        # Failed to create the jobscript -> error.
        if self.setup_job():
            return True
        # Run in-process; JobState.exec returns True on failure.
        return self.next_job.exec()

class Restartable(Simple):
    """A checkpoint/restart job.

    Queues up to `preroll` chained attempts (each bsub depends on the
    previous), and is complete only when every `retry: complete:` shell
    test passes in addition to all outputs being present.
    """
    def __init__(self, proj, jobname, kws):
        Simple.__init__(self, proj, jobname, kws)
        # Restartable jobs need extra variables; extend the requirements
        # and re-run the typecheck over them.
        self.reqs.update({
                      'retry'   : dict,
                      'preroll'  : int,
                      'retry_script'  : str
                      })
        self.typecheck(kws)

    def is_complete(self):
        # All completeness tests must be met,
        for exp in self.retry['complete']:
            exp = self.format(exp, self.last_job.jobid)
            if check_exit(exp, self.dirname): # WARNING: executing arbitrary shell string here.
                print(str_warn("Job %s requires restart (failed test: %s)." % (self.last_job.job_out, exp)))
                return False
        # and all outputs must be present.
        return Simple.is_complete(self)

    def setup_retry(self):
        """Write the next jobscript from retry_script.  Requires every
        `retry: checkpoint:` output file to exist.  Returns True on error."""
        for oname in self.retry['checkpoint']:
            # NOTE(review): out[oname] is already a dirname-joined path, so
            # has_file joins dirname twice -- harmless only when dirname is
            # absolute (as in the README examples).  Verify for relative dirs.
            fname = self.last_job.out[oname]
            if not has_file(self.dirname, fname):
                # BUG FIX: str_err takes one message argument -- the original
                # call passed two positional args, a TypeError at runtime.
                print(str_err("Checkpoint file '%s' not present." % fname))
                return True
        with open(self.next_job.job_in,'w') as f:
            f.write(self.format(jobscript % self.retry_script, self.next_job.jobid))
        return False

    def run(self):
        """ Check on current state and submit next job if necessary.
            returns True if an error prevents progress.
        """
        if self.get_state(): # no more work
            return self.last_job.err

        # Create next job and check whether its 'sh' is present.
        self.next_job = JobState(self.dirname, self.jobname, self.last_job.jobid+1)
        if self.next_job.started:
            print(self.next_job)
            return True

        if self.next_job.jobid == 0: # initial run
            mk = self.setup_job
        else:
            mk = self.setup_retry

        # Create and queue the next `preroll` job(s), each chained to the
        # previous via bsub -w "done(jobid)".
        job_n = None
        for jobid in range(self.next_job.jobid, self.next_job.jobid+self.preroll):
            if mk(): # error creating jobscript
                return True
            job_n = queue_job(self.next_job.job_in, job_n)
            if job_n < 0: # error queueing job
                return True

            # Every subsequent job in the chain is a restart.
            mk = self.setup_retry
            self.next_job = JobState(self.dirname, self.jobname, jobid+1)

        # BUG FIX: any failure already returned True inside the loop, so
        # reaching here means progress was made.  The original
        # `return job_n >= 0` inverted the "True == error" convention and
        # crashed (`None >= 0`) when preroll == 0.
        return False

# extend the current file's defaults with yaml-defined jobtypes
def read_jobtypes(filename):
    """Return the dict of available jobtype classes.

    Starts from the built-in types (Local, Simple, Restartable) and, when
    *filename* is given, extends it with dynamically-created subclasses
    whose yaml `jobtype:` key names the base class.  Raises KeyError for
    an unknown base jobtype.
    """
    types = { 'Local': Local, 'Simple': Simple, 'Restartable': Restartable }
    if filename is None:
        return types

    with open(filename) as f:
        # BUG FIX: bare yaml.load is deprecated and unsafe (it can execute
        # arbitrary python tags); safe_load parses plain data only.
        x = yaml.safe_load(f)

    for name, info in x.items():
        bname = info['jobtype']
        base = types[bname]
        # The yaml mapping becomes class attributes of a new subclass.
        types[name] = type(name, (base,), info)

    return types

make.py

0 → 100644
+37 −0
Original line number Diff line number Diff line
#!/usr/bin/env python3
#
# TODO:
#  yaml input validation:
#      - make sure retry/checkpoint is a list of names from outputs section
#      - error on unknown keys in job spec

import yaml
from jobtypes import read_jobtypes

def main(argv):
    """Command-line entry point: [-j jobtypes.yaml] <jobs.yaml> <proj id>.

    Loads optional extra jobtypes, reads the campaign yaml, and runs
    (checks / queues) each listed job in turn.
    """
    jtfile = None
    if len(argv) > 2 and argv[1] == "-j":
        jtfile = argv[2]
        del argv[1:3]
    types = read_jobtypes(jtfile)

    # BUG FIX: assert is stripped under `python -O`; use an explicit exit
    # for CLI argument validation instead.
    if len(argv) != 3:
        raise SystemExit("Usage: %s [-j jobtypes.yaml] <jobs.yaml> <proj id>"%argv[0])
    with open(argv[1]) as f:
        # BUG FIX: bare yaml.load is deprecated and unsafe on untrusted
        # input; safe_load parses plain data only.
        jobs = yaml.safe_load(f)
    proj = argv[2]

    for jobname, args in jobs.items():
        try:
            jobtype = args['jobtype']
        except KeyError:
            raise KeyError("Job spec for %s has no jobtype key!"%jobname)
        if jobtype not in types:
            raise KeyError("Unknown jobtype (%s) for job %s!"%(jobtype, jobname))

        job = types[jobtype](proj, jobname, args)
        job.run()

# Script entry point: delegate to main() with the raw command line.
if __name__=="__main__":
    import sys
    main(sys.argv)