Loading README.rst +11 −9 Original line number Diff line number Diff line Loading @@ -5,16 +5,14 @@ Simulation Campaigns are listed make-style, one-by-one in a flat yaml-file:: --- make.yaml proj1: &defaults param: 1 sauce: 1 dirname: Proj1 resources: nrs: 500 out: log: run.log data: grid.hd5 proj2: param: 2 sauce: 2 dirname: Proj2 <<: *defaults Loading @@ -22,8 +20,11 @@ Job types contain run-scripts and list input/output files:: --- jobtypes.yaml sim_flow: time: 60 # minutes vars: - sauce resources: time: 60 # minutes nrs: 500 tasks: 6 gpu: 6 cpu: 42 Loading @@ -37,7 +38,7 @@ Job types contain run-scripts and list input/output files:: module load gcc spectrum-mpi export OMP_NUM_THREADS=7 script: | {mpirun} sim_flow {inp[params]} {out[data]} >{out[log]} {mpirun} sim_flow -s {sauce} {inp[params]} {out[data]} >{out[log]} The jobs are used within an allocation and make use of all available resources in parallel:: Loading @@ -50,6 +51,7 @@ The jobs are used within an allocation and make use of all available resources i #BSUB -wt 2 # 2 min. before job completion. module load python/3 umask 0002 python3 make.py jobtypes.yaml make.yaml 60 Getting Started Loading @@ -62,7 +64,7 @@ to make, fill them into your own `make.yaml`. For help with syntax, see the `examples` subdirectory. There are 2 important things to keep in mind for interacting with the scheduler. Every job must contain keys for `resources` and `time` (in minutes). the scheduler. Every job must contain a `resources` section. It is possible to choose your allocation poorly, and try running 2 jobs requiring 500 nodes on 999 nodes, in which case 499 nodes will sit idle. It is also possible to run out of time to complete all jobs. Loading graph.py +54 −21 Original line number Diff line number Diff line Loading @@ -25,8 +25,20 @@ def append_graph(G, types, rule, args): rule = addl.pop() for ttype,tname in rule.inp.items(): p = rule.dirname / tname if p.exists(): # TODO: implement 'completeness' test? 
continue # or check that all outputs of a given job (assuming lookup is successful) are present. if p.exists(): try: jobname, ftype = types.lookup[tname] except KeyError: continue # no creating job found # Check for newer inputs. mtime = p.stat().st_mtime for t,f in types[jobname](args).inp.items(): # need to re-run f = rule.dirname/f if f.stat().st_mtime > mtime: print("File %s is newer than %s - re-running %s"%(f, p, jobname)) break else: continue try: jobname, ftype = types.lookup[tname] except KeyError: Loading Loading @@ -59,8 +71,9 @@ class TaskGraph: del args['out'] # not inherited append_graph(self.G, types, task, args) # M : Machine # create priority queue of runnable tasks wt = prio_heft(self.G) wt = prio_width(self.G) self.Q = PriorityQueue(wt) for task in self.G.nodes(): if self.is_root(task): Loading @@ -86,8 +99,6 @@ class TaskGraph: # finish = (return_val : int) -> None # } def get_work(self, M): if self.verb: print("Called get_work (%d items)"%len(self.Q)) task = self.Q.get_task(M) if task is None: return None Loading @@ -102,8 +113,6 @@ class TaskGraph: return cmd def finish(self, task, ret): if self.verb: print("Called finish (%d items in queue)"%len(self.Q)) if ret != 0: self.errors += 1 print("Job returned nonzero exit code, %d.\n"%ret) Loading @@ -113,13 +122,15 @@ class TaskGraph: # enqueue newly enabled jobs tset = list( self.G[task] ) suc = 0 self.G.remove_node(task) for task in tset: if self.is_root(task): self.Q.push(task) suc += 1 if self.verb: print(" Added dependent jobs (%d items in queue)"%len(self.Q)) if self.verb and len(tset) > 0: print(" Added %d/%d successor jobs (%d items in queue)"%(suc,len(tset),len(self.Q))) return False def stats(self): Loading Loading @@ -147,24 +158,46 @@ class Task: def finish(self, ret): return self.fin(self.task, ret) # dole out heterogeneous earliest finish time priorities def prio_heft(G): # prioritize by job resources 'till done def prio_width(G): wt = {} def visit(task): wmax = 0.0 for s in G[task]: 
if s not in wt: visit(s) wmax = max(wmax, wt[s]) topo = list(nx.topological_sort(G)) for task in reversed(topo): wmax = reduce(max, (wt[s] for s in G[task]), 0.0) R = task.resource wt[task] = wmax + R.time*R.nrs*(R.cpu+R.gpu*25) for task in G.nodes(): if task not in wt: visit(task) return wt def earliest_time(G): # dole out heterogeneous earliest finish time priorities time = {} # dictionary of earliest possible finish time topo = list(nx.topological_sort(G)) for task in reversed(topo): tmax = max(0.0, *(time[s] for s in G[task])) time[task] = tmax + task.resource.time return time def crit_paths(G): # Gather a list of critical paths, sorted from # longest finish time to shortest. time = earliest_time(G) paths = [] while len(time) > 0: # Start from any node with max finish time and determine # the graph's critical path. t, task = max( (t,j) for j,t in time.items() ) crit = [ task ] del time[task] while len(G[task]) > 0: _, task = max( (time[j], j) for j in G[task] ) crit.append(task) del time[task] paths.append( (t, crit) ) print(paths) # wt is a dictionary mapping all potential tasks to priorities. # self.l holds the heap of tasks actually in the queue. # push(task) pushes a task into the queue (must be in wt) Loading jobtypes.py +8 −2 Original line number Diff line number Diff line import yaml #from jinja2 import Template from helpers import * from machine import ResourceSet Loading Loading @@ -68,15 +69,20 @@ class Job: def __eq__(a, b): return repr(a) == repr(b) # TODO: use Template(getattr(self, var)).render(kws) for non-script tags # to (and include kws in self & reqs) to implement waterfall substitution. # potentially create self-contained dict. of params to make this happen instead # of patching self-attributes... def typecheck(self, kws): for uniq in ['inp', 'out']: # non-overridable by args for uniq in ['inp', 'out', 'resource', 'setup', 'script']: # non-overridable by args assert uniq not in kws, "Invalid input." 
# Copy kws into reqs # Fill-in from kws and type-check class vars for var, cls in self.reqs.items(): if hasattr(self, var): # vars in jobs take precedence pass elif var in kws: # initialize from yaml data elif var in kws: # initialize at instantiation time val = kws[var] if isinstance(val, dict) and hasattr(self, var) \ and isinstance(getattr(self,var), dict): # merge together Loading Loading
README.rst +11 −9 Original line number Diff line number Diff line Loading @@ -5,16 +5,14 @@ Simulation Campaigns are listed make-style, one-by-one in a flat yaml-file:: --- make.yaml proj1: &defaults param: 1 sauce: 1 dirname: Proj1 resources: nrs: 500 out: log: run.log data: grid.hd5 proj2: param: 2 sauce: 2 dirname: Proj2 <<: *defaults Loading @@ -22,8 +20,11 @@ Job types contain run-scripts and list input/output files:: --- jobtypes.yaml sim_flow: time: 60 # minutes vars: - sauce resources: time: 60 # minutes nrs: 500 tasks: 6 gpu: 6 cpu: 42 Loading @@ -37,7 +38,7 @@ Job types contain run-scripts and list input/output files:: module load gcc spectrum-mpi export OMP_NUM_THREADS=7 script: | {mpirun} sim_flow {inp[params]} {out[data]} >{out[log]} {mpirun} sim_flow -s {sauce} {inp[params]} {out[data]} >{out[log]} The jobs are used within an allocation and make use of all available resources in parallel:: Loading @@ -50,6 +51,7 @@ The jobs are used within an allocation and make use of all available resources i #BSUB -wt 2 # 2 min. before job completion. module load python/3 umask 0002 python3 make.py jobtypes.yaml make.yaml 60 Getting Started Loading @@ -62,7 +64,7 @@ to make, fill them into your own `make.yaml`. For help with syntax, see the `examples` subdirectory. There are 2 important things to keep in mind for interacting with the scheduler. Every job must contain keys for `resources` and `time` (in minutes). the scheduler. Every job must contain a `resources` section. It is possible to choose your allocation poorly, and try running 2 jobs requiring 500 nodes on 999 nodes, in which case 499 nodes will sit idle. It is also possible to run out of time to complete all jobs. Loading
graph.py +54 −21 Original line number Diff line number Diff line Loading @@ -25,8 +25,20 @@ def append_graph(G, types, rule, args): rule = addl.pop() for ttype,tname in rule.inp.items(): p = rule.dirname / tname if p.exists(): # TODO: implement 'completeness' test? continue # or check that all outputs of a given job (assuming lookup is successful) are present. if p.exists(): try: jobname, ftype = types.lookup[tname] except KeyError: continue # no creating job found # Check for newer inputs. mtime = p.stat().st_mtime for t,f in types[jobname](args).inp.items(): # need to re-run f = rule.dirname/f if f.stat().st_mtime > mtime: print("File %s is newer than %s - re-running %s"%(f, p, jobname)) break else: continue try: jobname, ftype = types.lookup[tname] except KeyError: Loading Loading @@ -59,8 +71,9 @@ class TaskGraph: del args['out'] # not inherited append_graph(self.G, types, task, args) # M : Machine # create priority queue of runnable tasks wt = prio_heft(self.G) wt = prio_width(self.G) self.Q = PriorityQueue(wt) for task in self.G.nodes(): if self.is_root(task): Loading @@ -86,8 +99,6 @@ class TaskGraph: # finish = (return_val : int) -> None # } def get_work(self, M): if self.verb: print("Called get_work (%d items)"%len(self.Q)) task = self.Q.get_task(M) if task is None: return None Loading @@ -102,8 +113,6 @@ class TaskGraph: return cmd def finish(self, task, ret): if self.verb: print("Called finish (%d items in queue)"%len(self.Q)) if ret != 0: self.errors += 1 print("Job returned nonzero exit code, %d.\n"%ret) Loading @@ -113,13 +122,15 @@ class TaskGraph: # enqueue newly enabled jobs tset = list( self.G[task] ) suc = 0 self.G.remove_node(task) for task in tset: if self.is_root(task): self.Q.push(task) suc += 1 if self.verb: print(" Added dependent jobs (%d items in queue)"%len(self.Q)) if self.verb and len(tset) > 0: print(" Added %d/%d successor jobs (%d items in queue)"%(suc,len(tset),len(self.Q))) return False def stats(self): Loading Loading @@ -147,24 
+158,46 @@ class Task: def finish(self, ret): return self.fin(self.task, ret) # dole out heterogeneous earliest finish time priorities def prio_heft(G): # prioritize by job resources 'till done def prio_width(G): wt = {} def visit(task): wmax = 0.0 for s in G[task]: if s not in wt: visit(s) wmax = max(wmax, wt[s]) topo = list(nx.topological_sort(G)) for task in reversed(topo): wmax = reduce(max, (wt[s] for s in G[task]), 0.0) R = task.resource wt[task] = wmax + R.time*R.nrs*(R.cpu+R.gpu*25) for task in G.nodes(): if task not in wt: visit(task) return wt def earliest_time(G): # dole out heterogeneous earliest finish time priorities time = {} # dictionary of earliest possible finish time topo = list(nx.topological_sort(G)) for task in reversed(topo): tmax = max(0.0, *(time[s] for s in G[task])) time[task] = tmax + task.resource.time return time def crit_paths(G): # Gather a list of critical paths, sorted from # longest finish time to shortest. time = earliest_time(G) paths = [] while len(time) > 0: # Start from any node with max finish time and determine # the graph's critical path. t, task = max( (t,j) for j,t in time.items() ) crit = [ task ] del time[task] while len(G[task]) > 0: _, task = max( (time[j], j) for j in G[task] ) crit.append(task) del time[task] paths.append( (t, crit) ) print(paths) # wt is a dictionary mapping all potential tasks to priorities. # self.l holds the heap of tasks actually in the queue. # push(task) pushes a task into the queue (must be in wt) Loading
jobtypes.py +8 −2 Original line number Diff line number Diff line import yaml #from jinja2 import Template from helpers import * from machine import ResourceSet Loading Loading @@ -68,15 +69,20 @@ class Job: def __eq__(a, b): return repr(a) == repr(b) # TODO: use Template(getattr(self, var)).render(kws) for non-script tags # to (and include kws in self & reqs) to implement waterfall substitution. # potentially create self-contained dict. of params to make this happen instead # of patching self-attributes... def typecheck(self, kws): for uniq in ['inp', 'out']: # non-overridable by args for uniq in ['inp', 'out', 'resource', 'setup', 'script']: # non-overridable by args assert uniq not in kws, "Invalid input." # Copy kws into reqs # Fill-in from kws and type-check class vars for var, cls in self.reqs.items(): if hasattr(self, var): # vars in jobs take precedence pass elif var in kws: # initialize from yaml data elif var in kws: # initialize at instantiation time val = kws[var] if isinstance(val, dict) and hasattr(self, var) \ and isinstance(getattr(self,var), dict): # merge together Loading