Commit 6e873740 authored by David M. Rogers's avatar David M. Rogers

Renamed Job -> Rule, added tests, and implemented the file timestamp check and format() on rule dicts.

parent c6e5a4d7
+0 −0

File moved.

+0 −0

File moved.

+37 −37
@@ -2,13 +2,13 @@ import networkx as nx
from heapq import *

from pathlib import Path
from jobtypes import Top
from rules import Top
from helpers import TargetError, str_ok, str_warn, str_err

from functools import reduce

# Key data objects for make:
#   job-class lookup dictionary, jobtype : Job implemention class
#   job-class lookup dictionary, jobtype : Rule implemention class
#     - the types dict
#   job-type lookup dictionary, filename : (creating jobtype, filetype tag)
#     - present in types.lookup dict
@@ -16,38 +16,39 @@ from functools import reduce
#   leaf-set of jobs ready to run

# G       : networkx.DiGraph of the current computation
# types   : dict of jobnames -> implementation classes
# rule    : Job class being currently implemented
# types   : dict of rulenames -> implementation classes
# rule    : Rule instance being currently implemented
# args    : dict of args to pass through to all descendant jobs (includes dirname)
def append_graph(G, types, rule, args):
def append_graph(G, types, rule, args, verb=False):
    addl = [rule] # breadth-first traversal
    while len(addl) > 0:
      rule = addl.pop()
      for ttype,tname in rule.inp.items():
        p = rule.dirname / tname
      for ttype,tname in rule.params['inp'].items():
        p = rule.params['dirname'] / tname
        if p.exists():
            try:
                jobname, ftype = types.lookup[tname]
                rulename, ftype = types.lookup[tname]
            except KeyError:
                continue # no creating job found
            # Check for newer inputs.
            mtime = p.stat().st_mtime
            for t,f in types[jobname](args).inp.items(): # need to re-run
                f = rule.dirname/f
                if f.stat().st_mtime > mtime:
                    print("File %s is newer than %s - re-running %s"%(f, p, jobname))
            for t,f in types[rulename](args).params['inp'].items(): # need to re-run
                f = rule.params['dirname'] / f
                if f.exists() and f.stat().st_mtime > mtime:
                    print("File %s is newer than %s - re-running %s"%(f, p, rulename))
                    break
            else:
                continue
        try:
            jobname, ftype = types.lookup[tname]
            rulename, ftype = types.lookup[tname]
        except KeyError:
            raise TargetError("No rule to make target '%s' needed by '%s'."%(p, rule.jobname))
            raise TargetError("No rule to make target '%s' needed by '%s'."%(p, rule.id))
        if ftype != ttype:
            raise TargetError("Rule %s produces target file '%s' of type '%s' (but %s was requested)."%(jobname, tname, ftype, ttype))
            raise TargetError("Rule %s produces target file '%s' of type '%s' (but %s was requested)."%(rulename, tname, ftype, ttype))
        if verb:
            print("%s -> %s"%(rulename, rule))

        # instantiate the job
        job = types[jobname](args)
        job = types[rulename](args)

        G.add_edge(job, rule, obj=p) # path is created, and rule is enabled by running 'job'
        addl.append(job) # follow-up with inputs of this job
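
The heart of this hunk is a make-style staleness test: an existing target is re-made only when one of the inputs of the rule that creates it carries a newer mtime. A minimal standalone sketch of that for/else pattern, using invented names (needs_rerun, target, inputs) rather than anything from this repo:

from pathlib import Path

def needs_rerun(target: Path, inputs: list) -> bool:
    """True when any existing input is newer than the target."""
    if not target.exists():
        return True                    # never built: must run
    mtime = target.stat().st_mtime
    for f in inputs:
        # Missing inputs are skipped, matching the f.exists() guard
        # this commit adds; the rule that creates them is resolved
        # separately through types.lookup.
        if f.exists() and f.stat().st_mtime > mtime:
            return True                # an input changed after the target
    return False                       # target is up to date
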
@@ -57,19 +58,19 @@ class TaskGraph:
        self.testonly = testonly
        self.verb = verb
        self.G = nx.DiGraph()
        for rule, args in jobs.items():
        for tgt, args in jobs.items():
            if 'dirname' not in args:
                raise KeyError("Job %s is missing 'dirname:'!"%rule)
                raise KeyError("Target %s is missing 'dirname:'!"%tgt)
            args['dirname'] = Path(args['dirname']) # create rich repr
            if not args['dirname'].is_dir():
                raise KeyError("Job %s: missing directory %s"%(rule,args['dirname']))
                raise KeyError("Target %s: invalid directory %s"%(tgt,args['dirname']))
            if 'out' not in args:
                raise KeyError("Job %s is missing 'out:' a dictionary of outputs!"%rule)
                raise KeyError("Target %s is missing 'out:' a dictionary of target outputs!"%tgt)
    
            task = Top(rule, args)
            task.inp = args['out']
            task = Top(tgt, args)
            task.params['inp'] = args['out']
            del args['out'] # not inherited
            append_graph(self.G, types, task, args)
            append_graph(self.G, types, task, args, verb)

        # M : Machine
        # create priority queue of runnable tasks
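
For reference, this is the shape of a targets dict that passes the checks above once yaml.safe_load has parsed it; every name and path here is hypothetical:

# 'dirname' must name an existing directory and 'out' must map
# filetype tags to output filenames (it becomes the Top task's inp).
tgts = {
    'min1': {
        'dirname': 'runs/min1',
        'out': {'pdb': 'min1.pdb'},
    },
}
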
@@ -115,7 +116,7 @@ class TaskGraph:
    def finish(self, task, ret):
        if ret != 0:
            self.errors += 1
            print("Job returned nonzero exit code, %d.\n"%ret)
            print("Rule returned nonzero exit code, %d.\n"%ret)
            return False # don't escalate, but don't continue

        self.completed += 1
@@ -134,7 +135,7 @@ class TaskGraph:
        return False

    def stats(self):
        return "Job stats:\n" + \
        return "Rule stats:\n" + \
               "    %s: %d\n    %s: %d\n    %s: %d" % (
                         str_ok("completed"), self.completed,
                         str_err("errors"), self.errors,
@@ -146,7 +147,7 @@ class TaskGraph:
class Task:
    def __init__(self, task, prep, fin):
        self.task = task
        self.rs   = task.resource
        self.rs   = task.params['resource']

        self.prep = prep
        self.fin  = fin
@@ -165,7 +166,7 @@ def prio_width(G):

    for task in reversed(topo):
        wmax = reduce(max, (wt[s] for s in G[task]), 0.0)
        R = task.resource
        R = task.params['resource']
        wt[task] = wmax + R.time*R.nrs*(R.cpu+R.gpu*25)
    return wt
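
To make the weight term concrete (numbers invented): a resource with time=10.0, nrs=2, cpu=4, gpu=1 contributes 10.0*2*(4 + 1*25) = 580.0 on top of the heaviest downstream weight, so GPU-heavy rules sort first under the code's fixed 25:1 GPU:CPU cost ratio.

from types import SimpleNamespace

# Invented resource values, only to evaluate the formula above.
R = SimpleNamespace(time=10.0, nrs=2, cpu=4, gpu=1)
assert R.time*R.nrs*(R.cpu + R.gpu*25) == 580.0
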

@@ -175,7 +176,7 @@ def earliest_time(G):
    topo = list(nx.topological_sort(G))
    for task in reversed(topo):
        tmax = max(0.0, *(time[s] for s in G[task]))
        time[task] = tmax + task.resource.time
        time[task] = tmax + task.params['resource'].time

    return time

@@ -196,7 +197,7 @@ def crit_paths(G):
            crit.append(task)
            del time[task]
        paths.append( (t, crit) )
    print(paths)
    return paths

# wt is a dictionary mapping all potential tasks to priorities.
# self.l holds the heap of tasks actually in the queue.
@@ -219,10 +220,9 @@ class PriorityQueue:
    # (that meets the requirements) off the queue.
    # This also removes all items from the
    # queue that can't run in the current machine allocation.
    # WARNING: This monkey-patches task to set
    #          task.mpirun = cmd
    # WARNING: This monkey-patches task by calling task.set_mpirun.
    #
    # Returns Job class or None
    # Returns Rule class or None
    # Cannot return None when jobs passing M.possible()
    # are available and the whole machine is available
    # (so no jobs are currently running).
@@ -233,10 +233,10 @@ class PriorityQueue:
        for i in range(len(self.l)):
            task = self.pop()

            cmd = M.alloc(task.resource)
            cmd = M.alloc(task.params['resource'])
            if cmd is not None:
                break # this task matches requirements
            if M.possible(task.resource):
            if M.possible(task.params['resource']):
                tmp.append(task) # maybe can do later
        else:
            if len(tmp) == 0:
@@ -246,6 +246,6 @@ class PriorityQueue:
            self.push(j)
        if cmd is None:
            return None
        task.mpirun = cmd
        task.set_mpirun(cmd)
        return task
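
The loop above is a pop-with-requeue: the first task the machine can allocate now wins, tasks that could fit an idle machine are parked and pushed back, and tasks that can never fit are dropped. A reduced sketch of the same pattern, with fits_now and fits_ever standing in for M.alloc and M.possible:

import heapq

def pop_runnable(heap, fits_now, fits_ever):
    deferred, chosen = [], None
    while heap:
        item = heapq.heappop(heap)     # (priority, task) pairs
        if fits_now(item[1]):
            chosen = item[1]           # runnable right now
            break
        if fits_ever(item[1]):
            deferred.append(item)      # retry once resources free up
        # items failing fits_ever are discarded
    for item in deferred:
        heapq.heappush(heap, item)     # restore deferred tasks
    return chosen                      # None when nothing fits yet
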
+17 −22
@@ -35,28 +35,22 @@ def check_exit(cmd, cwd=None):
        print(str_ok("[PASS]"))
    return r

# Queue up a job and return its jobid.
def queue_job(script, dep=None):
    if dep is None:
       job = ["bsub", script]
    else:
       job = ["bsub", "-w", "done(%d)"%dep, script]
    out = run_cmd(*job)
    # regexp: search for the job number in out
    exp = re.compile(r'Job <([0-9]+)> is submitted to')
    for line in out.split('\n'):
        m = exp.match(line)
        if m:
            return int(m[1])
    print(str_err("Error launching job %s!"%script))
    return -1

def jobfile(dirname, jobname, ext):
    return dirname / (jobname + '.' + ext)

def has_file(dirname, fname):
    return (dirname / fname).exists()

def format_dict(dct, kws, skip=[]):
    fmt = [dct]
    while len(fmt) > 0:
        dct = fmt.pop()
        for k, v in dct.items():
            if isinstance(v, str) and k not in skip:
                dct[k] = v.format(**kws)
            elif isinstance(v, dict):
                fmt.append(v)
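
A usage sketch of the new format_dict, which walks nested dicts and runs str.format over every string value whose key is not in skip; the sample rule dict and keywords are invented:

rule = {'name': '{job}', 'out': {'log': '{job}.log'}, 'cmd': 'run {job}'}
format_dict(rule, {'job': 'min1'}, skip=['cmd'])
# rule == {'name': 'min1', 'out': {'log': 'min1.log'}, 'cmd': 'run {job}'}
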

def dict_merge(dct, merge_dct):
    """ Recursive dict merge. Inspired by :meth:``dict.update()``, instead of
    updating only top-level keys, dict_merge recurses down into dicts nested
@@ -66,13 +60,14 @@ def dict_merge(dct, merge_dct):
    :param merge_dct: dct merged into dct
    :return: None
    """
    mg = [(dct, merge_dct)]
    while len(mg) > 0:
        dct, merge_dct = mg.pop()
        for k, v in merge_dct.items():
        if k in dct and isinstance(dct[k], dict) and isinstance(v, dict):
            dict_merge(dct[k], v)
        else:
            if isinstance(v, dict): # deep-copy
                if not (k in dct and isinstance(dct[k], dict)):
                    dct[k] = {}
                dict_merge(dct[k], v)
                mg.append((dct[k], v))
            else:
                dct[k] = v
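
And a quick check of dict_merge in its new iterative form, where nested dicts are pushed onto the mg work list instead of recursing; the sample dicts are invented:

base = {'a': 1, 'nest': {'x': 1}}
dict_merge(base, {'nest': {'y': 2}, 'b': 3})
# base == {'a': 1, 'nest': {'x': 1, 'y': 2}, 'b': 3}
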

+5 −5
#!/usr/bin/env python3

import yaml
from jobtypes import read_jobtypes
from rules import read_rules
from graph import TaskGraph
from machine import machine, Machine
from manager import Manager
@@ -15,13 +15,13 @@ def main(argv):
      else:
        raise KeyError("Unknown option: %s"%argv[1])

    assert len(argv) == 4, "Usage: %s [-v] <jobtypes.yaml> <jobs.yaml> <minutes avail.>"%argv[0]
    types = read_jobtypes(argv[1])
    assert len(argv) == 4, "Usage: %s [-v] <rules.yaml> <targets.yaml> <minutes avail.>"%argv[0]
    rules = read_rules(argv[1])
    with open(argv[2]) as f:
        jobs = yaml.safe_load(f)
        tgts = yaml.safe_load(f)
    T = float(argv[3])

    G = TaskGraph(types, jobs)
    G = TaskGraph(rules, tgts)
    M = machine(T)
    R = Manager(M, T, verb)
    R.run( G.get_work )