Loading graph.py +17 −13 Original line number Diff line number Diff line Loading @@ -4,10 +4,12 @@ from heapq import * from pathlib import Path from rules import Top from helpers import TargetError, str_ok, str_warn, str_err import yaml import yaml, logging from functools import reduce log = logging.getLogger(__name__) # Key data objects for make: # job-class lookup dictionary, jobtype : Rule implemention class # - the types dict Loading @@ -23,7 +25,7 @@ def gen_targets(params): assert len(params['loop']) == 2, "Loop must have a single variable." assert 'out' in params['loop'], "Loop must contain 'out:'" for var, r in params['loop'].items(): if var is not 'out': if var != 'out': break assert isinstance(r, str), "Loop must have a single variable holding an expr." if '{' in r: Loading @@ -48,8 +50,8 @@ def must_generate(dirname, tname, types): for t,f in gen_targets(nrule.params): # need to re-run f = dirname / f if f.exists() and f.stat().st_mtime > mtime: print("File %s is newer than %s - re-running %s"% (f, fname, nrule.params['rulename'])) log.warning("File %s is newer than %s - re-running %s", f, fname, nrule.params['rulename']) return True return False Loading Loading @@ -81,8 +83,7 @@ def append_graph(G, types, rule, args, verb=False): if job not in checked: addl.append(job) # follow-up with inputs of this job checked.add(job) if verb: print("%s -> %s"%(job.id, rule.id)) log.debug("%s -> %s", job.id, rule.id) G.add_edge(job, rule, obj=p) # path is created, and rule is enabled by running 'job' Loading Loading @@ -116,7 +117,7 @@ class TaskGraph: if self.is_root(task): self.Q.push(task) print("%d/%d tasks ready to run."%(len(self.Q), len(self.G))) log.warning("%d/%d tasks ready to run.", len(self.Q), len(self.G)) self.completed = 0 self.errors = 0 Loading @@ -133,7 +134,7 @@ class TaskGraph: # task = { # rs = ResourceSet # run = (testonly=False) -> Popen | None # finish = (return_val : int) -> None # finish = (return_val : int) -> ok : Bool # } def get_work(self, M): task = self.Q.get_task(M) Loading @@ -146,9 +147,11 @@ class TaskGraph: return task.run(testonly) def finish(self, task, ret): if ret != 0: ok = task.finish(ret) if not ok: self.errors += 1 print("Rule returned nonzero exit code, %d.\n"%ret) log.error("Rule '%s' returned !ok.", task.id) return False # don't escalate, but don't continue self.completed += 1 Loading @@ -162,8 +165,9 @@ class TaskGraph: self.Q.push(task) suc += 1 if self.verb and len(tset) > 0: print(" Added %d/%d successor jobs (%d items in queue)"%(suc,len(tset),len(self.Q))) if len(tset) > 0: log.info("Task '%s' added %d/%d successor jobs " "(%d items in queue).", task.id, suc, len(tset), len(self.Q)) return False def stats(self): Loading Loading @@ -242,7 +246,7 @@ class TaskGraph: # Produce statistics on this path. # path : [task] # M : Machine (will call M.possible() for node # and time info.) # return : [(expected start time, nodes used)] # return : [ [(expected start time, nodes used, time needed)] ] def path_stats(self, M, paths): time = self.earliest_start_time() stats = [] Loading helpers.py +6 −2 Original line number Diff line number Diff line Loading @@ -5,7 +5,11 @@ import codecs import yaml from os import listdir from colorama import Fore, Back, Style try: from colorama import Fore, Style except ModuleNotFoundError: Fore = type('Fore', (), dict(RED="", YELLOW="", GREEN=""))() Style = type('Style', (), dict(RESET_ALL=""))() import logging log = logging.getLogger(__name__) Loading Loading @@ -92,7 +96,7 @@ class JobState: self.err = False self.done = False self.job_in = jobfile(dirname, jobname, 'sh') self.job_out = jobfile(dirname, jobname, 'out') self.job_out = jobfile(dirname, jobname, 'log') def __str__(self): if self.err: Loading machine.py +8 −6 Original line number Diff line number Diff line Loading @@ -12,10 +12,12 @@ # for R in Rs: # free resources # M.free(R) import os, time import os, time, logging _tick = time.time tick = lambda: _tick()/60.0 log = logging.getLogger(__name__) # Set a fake timer that pull sequentially from an event list. # events = heapsorted #def queue_event(): Loading Loading @@ -61,12 +63,12 @@ class ResourceSet: def machine(time): # Rhea if 'SLURM_JOB_NUM_NODES' in os.environ: print("Running on rhea") log.info("Running on rhea") return Machine(time, int(os.environ['SLURM_JOB_NUM_NODES']), 0, 32, "srun") # Summit if 'LSB_MAX_NUM_PROCESSORS' in os.environ: print("Running on summit") log.info("Running on summit") return Machine(time, int(os.environ['LSB_MAX_NUM_PROCESSORS'])//42, 6, 42, "jsrun") # Localhost -- allow running up to 4 jobs locally Loading Loading @@ -111,14 +113,14 @@ class Machine: # Or a tuple (nodes, time) indicating max #nodes and avg. time taken. def possible(self, R): if R.time > self.end_time - tick(): print("Unable to schedule due to time limitations.") log.warning("Unable to schedule due to time limitations.") return None nodes = R.min_nodes(self.CpN, self.GpN) if nodes is None: print("Unable to schedule due to inavailability of high gpu/cpu count nodes.") log.warning("Unable to schedule due to inavailability of high gpu/cpu count nodes.") return None if nodes > len(self.N): print("Unable to schedule due to inavailability of sufficient nodes.") log.warning("Unable to schedule due to inavailability of sufficient nodes.") return None return (nodes, R.time) Loading make_test.py +13 −11 Original line number Diff line number Diff line Loading @@ -16,19 +16,21 @@ def main(argv): M = Machine(1e100, 4600, 6, 42, "jsrun") R = Manager(M, 1e100) print("Critical Path Statistics:") #print("Critical Path Statistics:") paths = G.crit_paths() print("Critical path time = %f min."%(paths[0][0])) stats = G.path_stats(M, paths) print(" ; ".join("start time nodes" for s in stats)) l = max(0, *(len(s) for s in stats)) for i in range(l): u = [] for s in stats: if len(s) <= i: u.append(" ") continue u.append("%5.0f %5.0f %5d"%(s[i][0], s[i][2], s[i][1])) print(" ; ".join(u)) print("Upper bound on nodes = %d."%sum(max(s[1] for s in step) for step in stats)) #print(" ; ".join("start time nodes" for s in stats)) #l = max(0, *(len(s) for s in stats)) #for i in range(l): # u = [] # for s in stats: # if len(s) <= i: # u.append(" ") # continue # u.append("%5.0f %5.0f %5d"%(s[i][0], s[i][2], s[i][1])) # print(" ; ".join(u)) R.run( G.get_work, True ) Loading manager.py +2 −6 Original line number Diff line number Diff line Loading @@ -8,8 +8,6 @@ # track the resources available across the machine. import os, signal, time, logging from subprocess import STDOUT #from subprocess import check_call _tick = time.time tick = lambda: _tick()/60.0 Loading Loading @@ -48,10 +46,8 @@ class Manager: log.info("Process '%s' completed.", task) self.M.free(task.rs) task.finish(ret) ok = task.finish(ret) p.wait() #if p.stdout != STDOUT: # p.stdout.close() # get_work : Machine -> task | None # get_work is responsible for calling Machine.alloc Loading @@ -59,7 +55,7 @@ class Manager: # task = { # rs = ResourceSet # run = (testonly=False) -> Popen instance | None # finish = (return_val : int) -> None # finish = (return_val : int) -> ok : Bool # } def run(self, get_work, testonly=False): while self.live and self.end_time > tick(): Loading Loading
graph.py +17 −13 Original line number Diff line number Diff line Loading @@ -4,10 +4,12 @@ from heapq import * from pathlib import Path from rules import Top from helpers import TargetError, str_ok, str_warn, str_err import yaml import yaml, logging from functools import reduce log = logging.getLogger(__name__) # Key data objects for make: # job-class lookup dictionary, jobtype : Rule implemention class # - the types dict Loading @@ -23,7 +25,7 @@ def gen_targets(params): assert len(params['loop']) == 2, "Loop must have a single variable." assert 'out' in params['loop'], "Loop must contain 'out:'" for var, r in params['loop'].items(): if var is not 'out': if var != 'out': break assert isinstance(r, str), "Loop must have a single variable holding an expr." if '{' in r: Loading @@ -48,8 +50,8 @@ def must_generate(dirname, tname, types): for t,f in gen_targets(nrule.params): # need to re-run f = dirname / f if f.exists() and f.stat().st_mtime > mtime: print("File %s is newer than %s - re-running %s"% (f, fname, nrule.params['rulename'])) log.warning("File %s is newer than %s - re-running %s", f, fname, nrule.params['rulename']) return True return False Loading Loading @@ -81,8 +83,7 @@ def append_graph(G, types, rule, args, verb=False): if job not in checked: addl.append(job) # follow-up with inputs of this job checked.add(job) if verb: print("%s -> %s"%(job.id, rule.id)) log.debug("%s -> %s", job.id, rule.id) G.add_edge(job, rule, obj=p) # path is created, and rule is enabled by running 'job' Loading Loading @@ -116,7 +117,7 @@ class TaskGraph: if self.is_root(task): self.Q.push(task) print("%d/%d tasks ready to run."%(len(self.Q), len(self.G))) log.warning("%d/%d tasks ready to run.", len(self.Q), len(self.G)) self.completed = 0 self.errors = 0 Loading @@ -133,7 +134,7 @@ class TaskGraph: # task = { # rs = ResourceSet # run = (testonly=False) -> Popen | None # finish = (return_val : int) -> None # finish = (return_val : int) -> ok : Bool # } def get_work(self, M): task = self.Q.get_task(M) Loading @@ -146,9 +147,11 @@ class TaskGraph: return task.run(testonly) def finish(self, task, ret): if ret != 0: ok = task.finish(ret) if not ok: self.errors += 1 print("Rule returned nonzero exit code, %d.\n"%ret) log.error("Rule '%s' returned !ok.", task.id) return False # don't escalate, but don't continue self.completed += 1 Loading @@ -162,8 +165,9 @@ class TaskGraph: self.Q.push(task) suc += 1 if self.verb and len(tset) > 0: print(" Added %d/%d successor jobs (%d items in queue)"%(suc,len(tset),len(self.Q))) if len(tset) > 0: log.info("Task '%s' added %d/%d successor jobs " "(%d items in queue).", task.id, suc, len(tset), len(self.Q)) return False def stats(self): Loading Loading @@ -242,7 +246,7 @@ class TaskGraph: # Produce statistics on this path. # path : [task] # M : Machine (will call M.possible() for node # and time info.) # return : [(expected start time, nodes used)] # return : [ [(expected start time, nodes used, time needed)] ] def path_stats(self, M, paths): time = self.earliest_start_time() stats = [] Loading
helpers.py +6 −2 Original line number Diff line number Diff line Loading @@ -5,7 +5,11 @@ import codecs import yaml from os import listdir from colorama import Fore, Back, Style try: from colorama import Fore, Style except ModuleNotFoundError: Fore = type('Fore', (), dict(RED="", YELLOW="", GREEN=""))() Style = type('Style', (), dict(RESET_ALL=""))() import logging log = logging.getLogger(__name__) Loading Loading @@ -92,7 +96,7 @@ class JobState: self.err = False self.done = False self.job_in = jobfile(dirname, jobname, 'sh') self.job_out = jobfile(dirname, jobname, 'out') self.job_out = jobfile(dirname, jobname, 'log') def __str__(self): if self.err: Loading
machine.py +8 −6 Original line number Diff line number Diff line Loading @@ -12,10 +12,12 @@ # for R in Rs: # free resources # M.free(R) import os, time import os, time, logging _tick = time.time tick = lambda: _tick()/60.0 log = logging.getLogger(__name__) # Set a fake timer that pull sequentially from an event list. # events = heapsorted #def queue_event(): Loading Loading @@ -61,12 +63,12 @@ class ResourceSet: def machine(time): # Rhea if 'SLURM_JOB_NUM_NODES' in os.environ: print("Running on rhea") log.info("Running on rhea") return Machine(time, int(os.environ['SLURM_JOB_NUM_NODES']), 0, 32, "srun") # Summit if 'LSB_MAX_NUM_PROCESSORS' in os.environ: print("Running on summit") log.info("Running on summit") return Machine(time, int(os.environ['LSB_MAX_NUM_PROCESSORS'])//42, 6, 42, "jsrun") # Localhost -- allow running up to 4 jobs locally Loading Loading @@ -111,14 +113,14 @@ class Machine: # Or a tuple (nodes, time) indicating max #nodes and avg. time taken. def possible(self, R): if R.time > self.end_time - tick(): print("Unable to schedule due to time limitations.") log.warning("Unable to schedule due to time limitations.") return None nodes = R.min_nodes(self.CpN, self.GpN) if nodes is None: print("Unable to schedule due to inavailability of high gpu/cpu count nodes.") log.warning("Unable to schedule due to inavailability of high gpu/cpu count nodes.") return None if nodes > len(self.N): print("Unable to schedule due to inavailability of sufficient nodes.") log.warning("Unable to schedule due to inavailability of sufficient nodes.") return None return (nodes, R.time) Loading
make_test.py +13 −11 Original line number Diff line number Diff line Loading @@ -16,19 +16,21 @@ def main(argv): M = Machine(1e100, 4600, 6, 42, "jsrun") R = Manager(M, 1e100) print("Critical Path Statistics:") #print("Critical Path Statistics:") paths = G.crit_paths() print("Critical path time = %f min."%(paths[0][0])) stats = G.path_stats(M, paths) print(" ; ".join("start time nodes" for s in stats)) l = max(0, *(len(s) for s in stats)) for i in range(l): u = [] for s in stats: if len(s) <= i: u.append(" ") continue u.append("%5.0f %5.0f %5d"%(s[i][0], s[i][2], s[i][1])) print(" ; ".join(u)) print("Upper bound on nodes = %d."%sum(max(s[1] for s in step) for step in stats)) #print(" ; ".join("start time nodes" for s in stats)) #l = max(0, *(len(s) for s in stats)) #for i in range(l): # u = [] # for s in stats: # if len(s) <= i: # u.append(" ") # continue # u.append("%5.0f %5.0f %5d"%(s[i][0], s[i][2], s[i][1])) # print(" ; ".join(u)) R.run( G.get_work, True ) Loading
manager.py +2 −6 Original line number Diff line number Diff line Loading @@ -8,8 +8,6 @@ # track the resources available across the machine. import os, signal, time, logging from subprocess import STDOUT #from subprocess import check_call _tick = time.time tick = lambda: _tick()/60.0 Loading Loading @@ -48,10 +46,8 @@ class Manager: log.info("Process '%s' completed.", task) self.M.free(task.rs) task.finish(ret) ok = task.finish(ret) p.wait() #if p.stdout != STDOUT: # p.stdout.close() # get_work : Machine -> task | None # get_work is responsible for calling Machine.alloc Loading @@ -59,7 +55,7 @@ class Manager: # task = { # rs = ResourceSet # run = (testonly=False) -> Popen instance | None # finish = (return_val : int) -> None # finish = (return_val : int) -> ok : Bool # } def run(self, get_work, testonly=False): while self.live and self.end_time > tick(): Loading