Loading graph.py +20 −10 Original line number Diff line number Diff line Loading @@ -41,8 +41,9 @@ def append_graph(G, types, rule, args): addl.append(job) # follow-up with inputs of this job class TaskGraph: def __init__(self, types, jobs, testonly=False): def __init__(self, types, jobs, testonly=False, verb=False): self.testonly = testonly self.verb = verb self.G = nx.DiGraph() for rule, args in jobs.items(): if 'dirname' not in args: Loading Loading @@ -85,6 +86,8 @@ class TaskGraph: # finish = (return_val : int) -> None # } def get_work(self, M, time): if self.verb: print("Called get_work (%d items)"%len(self.Q)) task = self.Q.get_task(M, time) if task is None: return None Loading @@ -99,6 +102,8 @@ class TaskGraph: return cmd def finish(self, task, ret): if self.verb: print("Called finish (%d items in queue)"%len(self.Q)) if ret != 0: self.errors += 1 print("Job returned nonzero exit code, %d.\n"%ret) Loading @@ -113,6 +118,8 @@ class TaskGraph: if self.is_root(task): self.Q.push(task) if self.verb: print(" Added dependent jobs (%d items in queue)"%len(self.Q)) return False def stats(self): Loading Loading @@ -141,15 +148,19 @@ class Task: # dole out heterogeneous earliest finish time priorities def prio_heft(G): queue = [k for k in G.nodes() if len(G[k]) == 0] wt = {} while len(queue) > 0: task = queue.pop() w = reduce(max, (wt[s] for s in G[task]), 0) wt[task] = w + task.time*task.resource.nrs*(task.resource.cpu+task.resource.gpu*25) for s in G.predecessors(task): def visit(task): wmax = 0.0 for s in G[task]: if s not in wt: queue.append(s) visit(s) wmax = max(wmax, wt[s]) wt[task] = wmax + task.time*task.resource.nrs*(task.resource.cpu+task.resource.gpu*25) for task in G.nodes(): if task not in wt: visit(task) return wt # wt is a dictionary mapping all potential tasks to priorities. Loading Loading @@ -177,7 +188,6 @@ class PriorityQueue: # task.mpirun = cmd # # Returns Job class or None # Raises StopIteration when no runnable jobs are left. # Cannot return None when provided the whole machine. # # M : Machine -- machine availability status Loading @@ -199,7 +209,7 @@ class PriorityQueue: tmp.append(task) # maybe can do later else: if len(tmp) == 0: raise StopIteration return None for j in tmp: # contains all jobs for which cmd == None self.push(j) Loading helpers.py +1 −1 Original line number Diff line number Diff line Loading @@ -100,7 +100,7 @@ class JobState: return str_err("Job %s resulted in error:\n"%self.job_out) + head if self.done: return str_ok("Job %s completed successfully."%self.job_out) return str_warn("Blank job %s"%(self.dirname / self.jobname)) return str_warn("job %s"%(self.dirname / self.jobname)) def file_paths(self, fnames, verb=False): # Get the file paths from this job's dir. # returns (True/False, {key:path}), where True == all paths present Loading jobtypes.py +23 −3 Original line number Diff line number Diff line Loading @@ -21,6 +21,10 @@ class Top: return repr(a) > repr(b) def __repr__(self): return "Top(%s, {'dirname': %s})"%(self.jobname, self.dirname) def __hash__(self): return hash(repr(self)) def __eq__(a, b): return repr(a) == repr(b) def prepare(self, testonly=False): if testonly: return 'echo "%s"' % str_ok("Would have finished target %s."%self.jobname) Loading @@ -40,15 +44,31 @@ class Job: } inp = {} setup = "" mpirun = "" def __init__(self, kws): # kws : {} try: self.typecheck(kws) def __repr__(self): # for uniqueness, we can ignore other params return "Job(%s, {'dirname': '%s'})"%(self.jobname, self.dirname) except TypeError as e: print("Error instantiating jobtype %s"%self.jobname) raise try: self.format(jobscript % self.script) except (KeyError, IndexError) as e: print("Error substituting script for jobtype %s"%self.jobname) raise def __lt__(a, b): return repr(a) < repr(b) def __gt__(a, b): return repr(a) > repr(b) def __repr__(self): # for uniqueness, we can ignore other params return "Job(%s, {'dirname': '%s'})"%(self.jobname, self.dirname) def __hash__(self): return hash(repr(self)) def __eq__(a, b): return repr(a) == repr(b) def typecheck(self, kws): for uniq in ['inp', 'out']: # non-overridable by args Loading manager.py +3 −4 Original line number Diff line number Diff line Loading @@ -63,12 +63,11 @@ class Manager: # } def run(self, get_work, testonly=False): while self.live and self.end_time > tick(): try: task = get_work(self.M, self.end_time-tick()) except StopIteration: break if task is None: if len(self.procs) == 0: # done break try: self.wait_any() except Exception as e: # FIXME: distinguish types of brokenness Loading Loading
graph.py +20 −10 Original line number Diff line number Diff line Loading @@ -41,8 +41,9 @@ def append_graph(G, types, rule, args): addl.append(job) # follow-up with inputs of this job class TaskGraph: def __init__(self, types, jobs, testonly=False): def __init__(self, types, jobs, testonly=False, verb=False): self.testonly = testonly self.verb = verb self.G = nx.DiGraph() for rule, args in jobs.items(): if 'dirname' not in args: Loading Loading @@ -85,6 +86,8 @@ class TaskGraph: # finish = (return_val : int) -> None # } def get_work(self, M, time): if self.verb: print("Called get_work (%d items)"%len(self.Q)) task = self.Q.get_task(M, time) if task is None: return None Loading @@ -99,6 +102,8 @@ class TaskGraph: return cmd def finish(self, task, ret): if self.verb: print("Called finish (%d items in queue)"%len(self.Q)) if ret != 0: self.errors += 1 print("Job returned nonzero exit code, %d.\n"%ret) Loading @@ -113,6 +118,8 @@ class TaskGraph: if self.is_root(task): self.Q.push(task) if self.verb: print(" Added dependent jobs (%d items in queue)"%len(self.Q)) return False def stats(self): Loading Loading @@ -141,15 +148,19 @@ class Task: # dole out heterogeneous earliest finish time priorities def prio_heft(G): queue = [k for k in G.nodes() if len(G[k]) == 0] wt = {} while len(queue) > 0: task = queue.pop() w = reduce(max, (wt[s] for s in G[task]), 0) wt[task] = w + task.time*task.resource.nrs*(task.resource.cpu+task.resource.gpu*25) for s in G.predecessors(task): def visit(task): wmax = 0.0 for s in G[task]: if s not in wt: queue.append(s) visit(s) wmax = max(wmax, wt[s]) wt[task] = wmax + task.time*task.resource.nrs*(task.resource.cpu+task.resource.gpu*25) for task in G.nodes(): if task not in wt: visit(task) return wt # wt is a dictionary mapping all potential tasks to priorities. Loading Loading @@ -177,7 +188,6 @@ class PriorityQueue: # task.mpirun = cmd # # Returns Job class or None # Raises StopIteration when no runnable jobs are left. # Cannot return None when provided the whole machine. # # M : Machine -- machine availability status Loading @@ -199,7 +209,7 @@ class PriorityQueue: tmp.append(task) # maybe can do later else: if len(tmp) == 0: raise StopIteration return None for j in tmp: # contains all jobs for which cmd == None self.push(j) Loading
helpers.py +1 −1 Original line number Diff line number Diff line Loading @@ -100,7 +100,7 @@ class JobState: return str_err("Job %s resulted in error:\n"%self.job_out) + head if self.done: return str_ok("Job %s completed successfully."%self.job_out) return str_warn("Blank job %s"%(self.dirname / self.jobname)) return str_warn("job %s"%(self.dirname / self.jobname)) def file_paths(self, fnames, verb=False): # Get the file paths from this job's dir. # returns (True/False, {key:path}), where True == all paths present Loading
jobtypes.py +23 −3 Original line number Diff line number Diff line Loading @@ -21,6 +21,10 @@ class Top: return repr(a) > repr(b) def __repr__(self): return "Top(%s, {'dirname': %s})"%(self.jobname, self.dirname) def __hash__(self): return hash(repr(self)) def __eq__(a, b): return repr(a) == repr(b) def prepare(self, testonly=False): if testonly: return 'echo "%s"' % str_ok("Would have finished target %s."%self.jobname) Loading @@ -40,15 +44,31 @@ class Job: } inp = {} setup = "" mpirun = "" def __init__(self, kws): # kws : {} try: self.typecheck(kws) def __repr__(self): # for uniqueness, we can ignore other params return "Job(%s, {'dirname': '%s'})"%(self.jobname, self.dirname) except TypeError as e: print("Error instantiating jobtype %s"%self.jobname) raise try: self.format(jobscript % self.script) except (KeyError, IndexError) as e: print("Error substituting script for jobtype %s"%self.jobname) raise def __lt__(a, b): return repr(a) < repr(b) def __gt__(a, b): return repr(a) > repr(b) def __repr__(self): # for uniqueness, we can ignore other params return "Job(%s, {'dirname': '%s'})"%(self.jobname, self.dirname) def __hash__(self): return hash(repr(self)) def __eq__(a, b): return repr(a) == repr(b) def typecheck(self, kws): for uniq in ['inp', 'out']: # non-overridable by args Loading
manager.py +3 −4 Original line number Diff line number Diff line Loading @@ -63,12 +63,11 @@ class Manager: # } def run(self, get_work, testonly=False): while self.live and self.end_time > tick(): try: task = get_work(self.M, self.end_time-tick()) except StopIteration: break if task is None: if len(self.procs) == 0: # done break try: self.wait_any() except Exception as e: # FIXME: distinguish types of brokenness Loading