Loading graph.py +14 −9 Original line number Diff line number Diff line Loading @@ -18,43 +18,48 @@ log = logging.getLogger(__name__) # job graph -- edges are directed foward in time (x triggers y) # leaf-set of jobs ready to run def gen_targets(params): def gen_targets(params, tag): for ttype, tname in params['inp'].items(): yield ttype, tname if 'loop' in params: assert len(params['loop']) == 2, "Loop must have a single variable." tgtname = 'tgt' if tgtname not in params['loop']: tgtname = 'in' assert tgtname in params['loop'], "Loop must contain 'tgt:/in:'" tgtname = 'inp' assert tgtname in params['loop'], "Loop must contain 'tgt:/inp:'" for var, r in params['loop'].items(): if var is not tgtname: break assert isinstance(r, str), "Loop must have a single variable holding an expr." if '{' in r: try: r = r.format(**params) except KeyError as e: print("While formatting %s in %s:"%(var, tag)) raise e for n in eval(r, {}, {}): for ttype, tgt in params['loop'][tgtname].items(): fmt = {var: n} yield ttype, tgt.format(**fmt) # Determine if dirname / tname must be generated by pmake def must_generate(dirname, tname, types): def must_generate(dirname, tname, types, args): fname = dirname / tname if not fname.exists(): return True try: nrule, ftype = types[tname] nrule = nrule(args) except KeyError: return False # don't know how to generate # Check for newer inputs. mtime = fname.stat().st_mtime for t,f in gen_targets(nrule.params): # need to re-run for t,f in gen_targets(nrule.params, nrule.id): # need to re-run f = dirname / f if f.exists() and f.stat().st_mtime > mtime: log.warning("File %s is newer than %s - re-running %s", f, fname, nrule.params['rulename']) f, fname, nrule.id) return True return False Loading @@ -68,9 +73,9 @@ def append_graph(G, types, rule, args, verb=False): checked = set(addl) # set of rules already set to be added while len(addl) > 0: rule = addl.pop() for ttype, tname in gen_targets(rule.params): for ttype, tname in gen_targets(rule.params, rule.id): p = rule.params['dirname'] / tname if not must_generate(rule.params['dirname'], tname, types): if not must_generate(rule.params['dirname'], tname, types, args): continue try: nrule, ftype = types[tname] Loading helpers.py +4 −1 Original line number Diff line number Diff line Loading @@ -51,12 +51,15 @@ def jobfile(dirname, jobname, ext): def has_file(dirname, fname): return (dirname / fname).exists() # In-place replacement def format_dict(dct, kws, skip=[]): fmt = [dct] while len(fmt) > 0: dct = fmt.pop() for k, v in dct.items(): if isinstance(v, str) and k not in skip and '{' in v: if k in skip: continue if isinstance(v, str) and '{' in v: dct[k] = yaml.safe_load( v.format(**kws) ) elif isinstance(v, dict): fmt.append(v) Loading machine.py +5 −2 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # for R in Rs: # free resources # M.free(R) import os, time, logging import os, re, time, logging _tick = time.time tick = lambda: _tick()/60.0 Loading Loading @@ -67,7 +67,10 @@ def machine(time): # "grep 'physical id' /proc/cpuinfo | sort -u | wc -l" #import multiprocessing as m #cpu = m.cpu_count() // 2 cpu = int( os.environ['SLURM_JOB_CPUS_PER_NODE'] ) scpu = os.environ['SLURM_JOB_CPUS_PER_NODE'] m = re.match(r'[0-9]+', scpu) assert m is not None, "Error parsing SLURM_JOB_CPUS_PER_NODE=%s"%scpu cpu = int(m[0]) return Machine(time, int(os.environ['SLURM_JOB_NUM_NODES']), 0, cpu, "srun") Loading make.py 100644 → 100755 +0 −0 File mode changed from 100644 to 100755. View file rules.py +5 −2 Original line number Diff line number Diff line Loading @@ -96,7 +96,7 @@ class Rule: par = {} dict_merge(par, self.defaults) dict_merge(par, self.params) format_dict(par, kws, skip=["script"]) format_dict(par, kws, skip=["script", "inp"]) self.params = par assert 'rulename' in self.params, "Rule subclass must have rulename" assert 'dirname' in kws, "Instantiation must include dirname." Loading Loading @@ -240,9 +240,12 @@ class RuleTypes: if isinstance(m, tuple): params[m[0]] = m[1] dict_merge(params, self.rules[rname]) # TODO: deal with loop: if "inp" in params and isinstance(params["inp"], dict): format_dict(params["inp"], params) if isinstance(m, tuple): # substutite outputs like blob.*ext* ~> blob.txt #if "loop" in params and isinstance(params["loop"], dict): # format_dict(params["loop"], params) if isinstance(m, tuple): # substutite outputs like blob.{ext} ~> blob.txt for k, v in params["out"].items(): params["out"][k] = FMatch(v).matched(m[1]) return type(rname, (Rule,), {'params': params, Loading Loading
graph.py +14 −9 Original line number Diff line number Diff line Loading @@ -18,43 +18,48 @@ log = logging.getLogger(__name__) # job graph -- edges are directed foward in time (x triggers y) # leaf-set of jobs ready to run def gen_targets(params): def gen_targets(params, tag): for ttype, tname in params['inp'].items(): yield ttype, tname if 'loop' in params: assert len(params['loop']) == 2, "Loop must have a single variable." tgtname = 'tgt' if tgtname not in params['loop']: tgtname = 'in' assert tgtname in params['loop'], "Loop must contain 'tgt:/in:'" tgtname = 'inp' assert tgtname in params['loop'], "Loop must contain 'tgt:/inp:'" for var, r in params['loop'].items(): if var is not tgtname: break assert isinstance(r, str), "Loop must have a single variable holding an expr." if '{' in r: try: r = r.format(**params) except KeyError as e: print("While formatting %s in %s:"%(var, tag)) raise e for n in eval(r, {}, {}): for ttype, tgt in params['loop'][tgtname].items(): fmt = {var: n} yield ttype, tgt.format(**fmt) # Determine if dirname / tname must be generated by pmake def must_generate(dirname, tname, types): def must_generate(dirname, tname, types, args): fname = dirname / tname if not fname.exists(): return True try: nrule, ftype = types[tname] nrule = nrule(args) except KeyError: return False # don't know how to generate # Check for newer inputs. mtime = fname.stat().st_mtime for t,f in gen_targets(nrule.params): # need to re-run for t,f in gen_targets(nrule.params, nrule.id): # need to re-run f = dirname / f if f.exists() and f.stat().st_mtime > mtime: log.warning("File %s is newer than %s - re-running %s", f, fname, nrule.params['rulename']) f, fname, nrule.id) return True return False Loading @@ -68,9 +73,9 @@ def append_graph(G, types, rule, args, verb=False): checked = set(addl) # set of rules already set to be added while len(addl) > 0: rule = addl.pop() for ttype, tname in gen_targets(rule.params): for ttype, tname in gen_targets(rule.params, rule.id): p = rule.params['dirname'] / tname if not must_generate(rule.params['dirname'], tname, types): if not must_generate(rule.params['dirname'], tname, types, args): continue try: nrule, ftype = types[tname] Loading
helpers.py +4 −1 Original line number Diff line number Diff line Loading @@ -51,12 +51,15 @@ def jobfile(dirname, jobname, ext): def has_file(dirname, fname): return (dirname / fname).exists() # In-place replacement def format_dict(dct, kws, skip=[]): fmt = [dct] while len(fmt) > 0: dct = fmt.pop() for k, v in dct.items(): if isinstance(v, str) and k not in skip and '{' in v: if k in skip: continue if isinstance(v, str) and '{' in v: dct[k] = yaml.safe_load( v.format(**kws) ) elif isinstance(v, dict): fmt.append(v) Loading
machine.py +5 −2 Original line number Diff line number Diff line Loading @@ -12,7 +12,7 @@ # for R in Rs: # free resources # M.free(R) import os, time, logging import os, re, time, logging _tick = time.time tick = lambda: _tick()/60.0 Loading Loading @@ -67,7 +67,10 @@ def machine(time): # "grep 'physical id' /proc/cpuinfo | sort -u | wc -l" #import multiprocessing as m #cpu = m.cpu_count() // 2 cpu = int( os.environ['SLURM_JOB_CPUS_PER_NODE'] ) scpu = os.environ['SLURM_JOB_CPUS_PER_NODE'] m = re.match(r'[0-9]+', scpu) assert m is not None, "Error parsing SLURM_JOB_CPUS_PER_NODE=%s"%scpu cpu = int(m[0]) return Machine(time, int(os.environ['SLURM_JOB_NUM_NODES']), 0, cpu, "srun") Loading
rules.py +5 −2 Original line number Diff line number Diff line Loading @@ -96,7 +96,7 @@ class Rule: par = {} dict_merge(par, self.defaults) dict_merge(par, self.params) format_dict(par, kws, skip=["script"]) format_dict(par, kws, skip=["script", "inp"]) self.params = par assert 'rulename' in self.params, "Rule subclass must have rulename" assert 'dirname' in kws, "Instantiation must include dirname." Loading Loading @@ -240,9 +240,12 @@ class RuleTypes: if isinstance(m, tuple): params[m[0]] = m[1] dict_merge(params, self.rules[rname]) # TODO: deal with loop: if "inp" in params and isinstance(params["inp"], dict): format_dict(params["inp"], params) if isinstance(m, tuple): # substutite outputs like blob.*ext* ~> blob.txt #if "loop" in params and isinstance(params["loop"], dict): # format_dict(params["loop"], params) if isinstance(m, tuple): # substutite outputs like blob.{ext} ~> blob.txt for k, v in params["out"].items(): params["out"][k] = FMatch(v).matched(m[1]) return type(rname, (Rule,), {'params': params, Loading