Commit a3be0f3a authored by David M. Rogers's avatar David M. Rogers
Browse files

Added loop-rules.

parent 68b19459
Loading
Loading
Loading
Loading
+1 −3
Original line number Diff line number Diff line
docs:
    resource: # 1
        time: 1.0
        time: 0.1
        nrs:  1
        cpu:  1
    out:
        txt: file_*n*.txt # 1
#   inp:
#       doc: test_{n}.rst # 2
    script: | # 3
        echo {n} >{out[txt]}

+10 −5
Original line number Diff line number Diff line
@@ -5,21 +5,26 @@ Cov1:
  replicas: 20
  out:
      xtc: run.xtc
  loop:
      n: [0, "{replicas}"] # range(0,20)
      out:
          txt: file_{n:02d}.txt
          #txt: cov_{n:02d}.txt
      txt: file_00.txt

Cov2:
  dirname: Cov2
  replicas: 10
  out:
      xtc: run.xtc
  loop:
      n: [0, "{replicas}"] # range(0,10)
      out:
          txt: file_{n:02d}.txt

# Implied computation graph
#
#  1 cpu,5 min;  2 nodes, 100 min.;  notification
#       grompp ->    mdrun         ->    Cov1
#       docs [n="00"] --------------------^
#       grompp ->    mdrun         ->    Cov2
#       docs [n="00"] --------------------^
#       docs [n="01"] --------------------^
#       ...
#       docs [n="09"] --------------------^
#   
+5 −0
Original line number Diff line number Diff line
@@ -45,6 +45,11 @@ class FMatch:
            return (self.var, fname[self.ls:-self.le])
        return False

    def matched(self, x):
        if self.var is None:
            return self.start
        return self.start + x + self.end

    # returns True if any file could match both names
    # False otherwise
    def same(a, b):
+46 −10
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ from heapq import *
from pathlib import Path
from rules import Top
from helpers import TargetError, str_ok, str_warn, str_err
import yaml

from functools import reduce

@@ -15,15 +16,39 @@ from functools import reduce
#   job graph -- edges are directed foward in time (x triggers y)
#   leaf-set of jobs ready to run

def gen_targets(params):
    """Yield (type, filename) pairs for every input target of a rule.

    Plain inputs are taken from params['inp'].  If the rule carries a
    'loop' section it must contain exactly one loop variable mapped to a
    range list, plus an 'out:' dict of filename templates; one target is
    yielded per loop iteration with the variable substituted into each
    template (e.g. 'file_{n:02d}.txt' -> 'file_00.txt').

    Raises AssertionError when the 'loop' section is malformed.
    """
    for ttype, tname in params['inp'].items():
        yield ttype, tname
    if 'loop' not in params:
        return

    loop = params['loop']
    assert len(loop) == 2, "Loop must have a single variable."
    assert 'out' in loop, "Loop must contain 'out:'"
    # The single key other than 'out' names the loop variable.
    # (Original used `var is not 'out'` -- an identity test on a string
    # literal that only works by CPython interning accident; use `!=`.)
    var = next(k for k in loop if k != 'out')
    r = loop[var]
    assert isinstance(r, list), "Loop must have a single variable holding a range list."
    # Resolve templated endpoints like "{replicas}" into numbers via
    # yaml.safe_load.  Work on a copy so the caller's params stay
    # unmutated across repeated generator runs.
    bounds = [yaml.safe_load(x.format(**params)) if isinstance(x, str) and '{' in x
              else x
              for x in r]
    for n in range(*bounds):
        for ttype, out in loop['out'].items():
            yield ttype, out.format(**{var: n})

# FIXME: prevent gen_targets from running too many times
#  1. remove 'loop' from rules - don't inherit
#  2. add 'n = ...' to the Rule's __repr__ (so comparison is possible!)

# G       : networkx.DiGraph of the current computation
# types   : RuleTypes object
# rule    : Rule instance being currently implemented
# args    : dict of args to pass through to all descendant jobs (includes dirname)
def append_graph(G, types, rule, args, verb=False):
    G.add_node(rule)
    addl = [rule] # breadth-first traversal
    checked = set(addl) # set of rules already set to be added
    while len(addl) > 0:
      rule = addl.pop()
      for ttype, tname in rule.params['inp'].items():
      for ttype, tname in gen_targets(rule.params):
        p = rule.params['dirname'] / tname
        if p.exists():
            try:
@@ -32,11 +57,12 @@ def append_graph(G, types, rule, args, verb=False):
                continue # no creating job found
            # Check for newer inputs.
            mtime = p.stat().st_mtime
            for t,f in nrule.params['inp'].items(): # need to re-run
            for t,f in gen_targets(nrule.params): # need to re-run
                f = rule.params['dirname'] / f
                if f.exists() and f.stat().st_mtime > mtime:
                    print("File %s is newer than %s - re-running %s"%(f, p, nrule.params['rulename']))
                    break
            else:
                continue
        try:
            nrule, ftype = types[tname]
@@ -45,11 +71,14 @@ def append_graph(G, types, rule, args, verb=False):
        rulename = nrule.params['rulename']
        if ftype != ttype:
            raise TargetError("Rule %s produces target file '%s' of type '%s' (but %s was requested)."%(rulename, tname, ftype, ttype))
        if verb:
            print("%s -> %s"%(rulename, rule))

        # instantiate the Rule -- creating a job
        job = nrule(args)
        if job in checked:
            continue
        checked.add(job)
        if verb:
            print("%s -> %s"%(job.id, rule.id))

        G.add_edge(job, rule, obj=p) # path is created, and rule is enabled by running 'job'
        addl.append(job) # follow-up with inputs of this job
@@ -71,6 +100,9 @@ class TaskGraph:
            task = Top(tgt, args)
            task.params['inp'] = args['out']
            del args['out'] # not inherited
            if 'loop' in args:
                task.params['loop'] = args['loop']
                del args['loop'] # not inherited
            append_graph(self.G, types, task, args, verb)

        # M : Machine
@@ -191,16 +223,20 @@ class TaskGraph:
        time = self.earliest_finish_time()

        paths = []
        while len(time) > 0:
        remain = set(G.nodes) # nodes not added to a crit. path
        while len(remain) > 0:
            # Start from any node with max finish time and determine
            # the graph's critical path.
            t, task = max( (t,j) for j,t in time.items() )
            t, task = max( (time[j],j) for j in remain )
            crit = [ task ]
            del time[task]
            remain.remove(task)
            while len(G[task]) > 0:
                _, task = max( (time[j], j) for j in G[task] )
                succ = set(G[task]) and remain
                if len(succ) == 0:
                    break
                _, task = max( (time[j], j) for j in succ )
                crit.append(task)
                del time[task]
                remain.remove(task)
            paths.append( (t, crit) )
        return paths

+34 −7
Original line number Diff line number Diff line
@@ -15,8 +15,10 @@ class Top:
        self.id = "%s/%s"%(args['dirname'], rulename)
        self.params = { 'rulename': rulename,
                        'dirname': args['dirname'],
                        'resource': ResourceSet(0.0, 1, 1, 0)
                        'resource': ResourceSet(0.0, 1, 1, 0),
                        'varname': None
                      }
        dict_merge(self.params, args)
    def __lt__(a, b):
        return repr(a) < repr(b)
    def __gt__(a, b):
@@ -70,7 +72,7 @@ class Rule:
          'mpirun': ""
    }
    # These variables cannot be overridden by kws to init.
    uniq = ['rulename', 'inp', 'out', 'resource', 'setup', 'script']
    uniq = ['rulename', 'inp', 'out', 'resource', 'setup', 'script', 'loop']

    def __init__(self, kws): # kws : {}
        """ By the time init is called, this class
@@ -92,12 +94,22 @@ class Rule:
        self.params = par
        assert 'rulename' in self.params, "Rule subclass must have rulename"
        assert 'dirname' in kws, "Instantiation must include dirname."
        if self.varname is None:
            self.id = "%s/%s"%(kws['dirname'], self.params['rulename'])
        else:
            assert self.varname in self.params, \
                   "Rule %s/%s requires setting var %s"%(kws['dirname'],
                           self.params['rulename'], self.varname)
            assert isinstance(self.params[self.varname], str), \
                   "Rule %s/%s var %s is not a string."%(kws['dirname'],
                           self.params['rulename'], self.varname)
            self.id = "%s/%s [%s=%s]"%(kws['dirname'], self.params['rulename'],
                                       self.varname, self.params[self.varname])

        # Merge
        try:
            self.typecheck(kws)
        except TypeError as e:
        except SyntaxError as e:
            print("Error instantiating rule %s" % self.id)
            raise e

@@ -158,7 +170,11 @@ class Rule:
        # Create next job and check whether it's 'sh' is present.
        # Note: the Manager object has monkey-patched
        # an 'mpirun' attribute into self.
        self.next_job = JobState(self.params['dirname'], self.params['rulename'])
        if self.varname is None:
            var = ""
        else:
            var = ".%s" % self.params[self.varname]
        self.next_job = JobState(self.params['dirname'], self.params['rulename']+var)

        if testonly:
            return ["echo", "Would run %s" % str(self.next_job)]
@@ -184,14 +200,21 @@ class RuleTypes:
        if rname in self.rules:
                raise RuleError("Duplicate rule name '%s'"%rname)
        self.rules[rname] = params
        varname = None
        for ftype, fname in params['out'].items():
            f = FMatch(fname)
            if varname is not None:
                if f.var is not None and f.var != varname:
                    raise RuleError(
                            "Variable names (%s and %s) differ in Rule %s"%(varname, f.var, rname))
            else:
                varname = f.var
            for f2,ft,rn in self.files:
                if f2.same(f):
                    raise RuleError(
                        "Duplicate target (%s: %s) declared by %s.\n"%(f,fname,rname) +
                        "  conflicts with earlier target (%s: %s) declared by %s.\n"%(f2,ft,rn))
            self.files.append(f, ftype, rname)
            self.files.append( (f, ftype, rname) )

    def __getitem__(self, fname):
        for f, ftype, rname in self.files:
@@ -204,7 +227,11 @@ class RuleTypes:
            dict_merge(params, self.rules[rname])
            if "inp" in params and isinstance(params["inp"], dict):
                format_dict(params["inp"], params)
            return type(rname, (Rule,), {'params': params}), ftype
            if isinstance(m, tuple): # substitute outputs like blob.*ext* ~> blob.txt
                for k, v in params["out"].items():
                    params["out"][k] = FMatch(v).matched(m[1])
            return type(rname, (Rule,), {'params': params,
                                         'varname': f.var}), ftype
        raise KeyError(fname)

# Read yaml-defined make-rules.
Loading