Commit 10fef229 authored by David M. Rogers's avatar David M. Rogers
Browse files

Bugfix graph.py, swap prepare to run, better logging, deprecate check_exit.

parent 6051bc89
Loading
Loading
Loading
Loading
+35 −39
Original line number Diff line number Diff line
@@ -73,18 +73,18 @@ def append_graph(G, types, rule, args, verb=False):
                raise TargetError("No rule to make target '%s' needed by '%s'."%(p, rule.id))
            rulename = nrule.params['rulename']
            if ftype != ttype:
            raise TargetError("Rule %s produces target file '%s' of type '%s' (but %s was requested)."%(rulename, tname, ftype, ttype))
                raise TargetError("Rule %s produces target file '%s' of type '%s' (but %s was requested)."%(
                        rulename, tname, ftype, ttype))

            # instantiate the Rule -- creating a job
            job = nrule(args)
        if job in checked:
            continue
            if job not in checked:
                addl.append(job) # follow-up with inputs of this job
                checked.add(job)
            if verb:
                print("%s -> %s"%(job.id, rule.id))

            G.add_edge(job, rule, obj=p) # path is created, and rule is enabled by running 'job'
        addl.append(job) # follow-up with inputs of this job

class TaskGraph:
    def __init__(self, types, jobs, testonly=False, verb=False):
@@ -132,22 +132,18 @@ class TaskGraph:
    #     Manager calls Machine.free
    # task = {
    #          rs      = ResourceSet
    #          prepare = (testonly=False) -> [cmd_string] | None
    #          run     = (testonly=False) -> Popen | None
    #          finish  = (return_val : int) -> None
    #        }
    def get_work(self, M):
        task = self.Q.get_task(M)
        if task is None:
            return None
        # wrap self.prepare & self.finish in the requested API
        return Task(task, self.prepare, self.finish)
        # wrap self.run & self.finish in the requested API
        return Task(task, self.run, self.finish)

    def prepare(self, task, testonly=False):
        cmd = task.prepare(testonly)
        if cmd is None:
            return cmd
        assert isinstance(cmd, list), "Task %s returned non-strlist command!"%(task)
        return cmd
    def run(self, task, testonly=False):
        return task.run(testonly)

    def finish(self, task, ret):
        if ret != 0:
@@ -262,21 +258,21 @@ class TaskGraph:
        return stats

# Closure storing task and its associated TaskGraph
# and providing prepare, finish callbacks.
# and providing run, finish callbacks.
class Task:
    def __init__(self, task, prep, fin):
    def __init__(self, task, run_fn, fin_fn):
        self.task = task
        self.rs   = task.params['resource']

        self.prep = prep
        self.fin  = fin
        self.run_fn = run_fn
        self.fin_fn = fin_fn

    def __str__(self):
        return str(self.task)
    def prepare(self, testonly=False):
        return self.prep(self.task, testonly)
    def run(self, testonly=False):
        return self.run_fn(self.task, testonly)
    def finish(self, ret):
        return self.fin(self.task, ret)
        return self.fin_fn(self.task, ret)

# wt is a dictionary mapping all potential tasks to priorities.
# self.l holds the heap of tasks actually in the queue.
+19 −17
Original line number Diff line number Diff line
@@ -7,6 +7,9 @@ from os import listdir

from colorama import Fore, Back, Style 

import logging
log = logging.getLogger(__name__)

class RuleError(Exception):
    pass
class TargetError(Exception):
@@ -22,21 +25,21 @@ def str_ok(msg):
    return Fore.GREEN + str(msg) + Style.RESET_ALL

# raises an exception on nonzero return
def run_cmd(*args):
    print(str_ok("Running: " + " ".join(args)))
    out = subprocess.check_output(args)
    out = out.decode("utf-8")
    print(out)
    return out

def check_exit(cmd, cwd=None):
    print(str_ok("Testing: %s"%cmd))
    r = bool(subprocess.call(cmd, shell=True, cwd=cwd))
    if r:
        print(str_err("[FAIL]"))
    else:
        print(str_ok("[PASS]"))
    return r
#def run_cmd(*args):
#    print(str_ok("Running: " + " ".join(args)))
#    out = subprocess.check_output(args)
#    out = out.decode("utf-8")
#    print(out)
#    return out

#def check_exit(cmd, cwd=None):
#    print(str_ok("Testing: %s"%cmd))
#    r = bool(subprocess.call(cmd, shell=True, cwd=cwd))
#    if r:
#        print(str_err("[FAIL]"))
#    else:
#        print(str_ok("[PASS]"))
#    return r

def jobfile(dirname, jobname, ext):
    return dirname / (jobname + '.' + ext)
@@ -108,8 +111,7 @@ class JobState:
            path = self.dirname/name
            if (verb or ok) and not path.exists():
                ok = False
                if verb:
                    print(str_err("Job %s is missing file %s."%(self.jobname, path)))
                log.error("Job %s is missing file %s.", self.jobname, path)
            paths[key] = path
        return ok, paths
    def input_paths(self, fnames):
+5 −0
Original line number Diff line number Diff line
@@ -61,10 +61,12 @@ class ResourceSet:
def machine(time):
    # Rhea
    if 'SLURM_JOB_NUM_NODES' in os.environ:
        print("Running on rhea")
        return Machine(time, int(os.environ['SLURM_JOB_NUM_NODES']), 0, 32, "srun")

    # Summit
    if 'LSB_MAX_NUM_PROCESSORS' in os.environ:
        print("Running on summit")
        return Machine(time, int(os.environ['LSB_MAX_NUM_PROCESSORS'])//42, 6, 42, "jsrun")

    # Localhost -- allow running up to 4 jobs locally
@@ -100,6 +102,9 @@ class Machine:
                jsrun += " %s" % R.jsrun_attr
            return jsrun
        
        elif self.batch == 'local':
            return ""

        raise IndexError(self.batch)

    # Returns None if this allocation is impossible to schedule.
+2 −10
Original line number Diff line number Diff line
@@ -7,15 +7,7 @@ from machine import machine, Machine
from manager import Manager

def main(argv):
    verb = False
    while len(argv) > 1 and argv[1][0] == '-':
      if argv[1] == "-v":
        verb = True
        del argv[1]
      else:
        raise KeyError("Unknown option: %s"%argv[1])

    assert len(argv) == 4, "Usage: %s [-v] <rules.yaml> <targets.yaml> <minutes avail.>"%argv[0]
    assert len(argv) == 4, "Usage: %s <rules.yaml> <targets.yaml> <minutes avail.>"%argv[0]
    rules = read_rules(argv[1])
    with open(argv[2]) as f:
        tgts = yaml.safe_load(f)
@@ -23,7 +15,7 @@ def main(argv):

    G = TaskGraph(rules, tgts)
    M = machine(T)
    R = Manager(M, T, verb)
    R = Manager(M, T)
    R.run( G.get_work )

if __name__=="__main__":

make_test.py

100644 → 100755
+1 −1
Original line number Diff line number Diff line
@@ -14,7 +14,7 @@ def main(argv):

    G = TaskGraph(rules, tgts, True, True)
    M = Machine(1e100, 4600, 6, 42, "jsrun")
    R = Manager(M, 1e100, True)
    R = Manager(M, 1e100)

    print("Critical Path Statistics:")
    paths = G.crit_paths()
Loading