Loading examples/rules.yaml +1 −1 Original line number Diff line number Diff line Loading @@ -28,7 +28,7 @@ grompp: mdrun: resource: time: 100.0 nrs: 2 nrs: "{replicas}" cpu: 42 gpu: 6 jsrun_attr: "-l gpu-cpu -d plane:6 -b packed:7" Loading examples/targets.yaml +2 −0 Original line number Diff line number Diff line Loading @@ -2,11 +2,13 @@ # a grompp step together with an mdrun. Cov1: dirname: Cov1 replicas: 20 out: xtc: run.xtc Cov2: dirname: Cov2 replicas: 10 out: xtc: run.xtc Loading graph.py +81 −42 Original line number Diff line number Diff line Loading @@ -74,7 +74,7 @@ class TaskGraph: # M : Machine # create priority queue of runnable tasks wt = prio_width(self.G) wt = self.prio_width() self.Q = PriorityQueue(wt) for task in self.G.nodes(): if self.is_root(task): Loading Loading @@ -142,25 +142,9 @@ class TaskGraph: str_warn("incomplete"), len(self.G) ) # Closure storing task and its associated TaskGraph # and providing prepare, finish callbacks. class Task: def __init__(self, task, prep, fin): self.task = task self.rs = task.params['resource'] self.prep = prep self.fin = fin def __str__(self): return str(self.task) def prepare(self, testonly=False): return self.prep(self.task, testonly) def finish(self, ret): return self.fin(self.task, ret) # prioritize by job resources 'till done def prio_width(G): def prio_width(self): G = self.G wt = {} topo = list(nx.topological_sort(G)) Loading @@ -170,20 +154,40 @@ def prio_width(G): wt[task] = wmax + R.time*R.nrs*(R.cpu+R.gpu*25) return wt def earliest_time(G): def earliest_start_time(self): G = self.G # Earliest possible starting time for job time = {} topo = list(nx.topological_sort(G)) for task in topo: if self.is_root(task): tmax = 0.0 else: tmax = max(0.0, *(time[s]+s.params['resource'].time for s in G.predecessors(task))) time[task] = tmax return time def earliest_finish_time(self): G = self.G # dole out heterogeneous earliest finish time priorities # By listing smallest possible "minutes remaining" for ea. task time = {} # dictionary of earliest possible finish time topo = list(nx.topological_sort(G)) for task in reversed(topo): if len(G[task]) == 0: tmax = 0.0 else: tmax = max(0.0, *(time[s] for s in G[task])) time[task] = tmax + task.params['resource'].time return time def crit_paths(G): def crit_paths(self): G = self.G # Gather a list of critical paths, sorted from # longest finish time to shortest. time = earliest_time(G) time = self.earliest_finish_time() paths = [] while len(time) > 0: Loading @@ -199,6 +203,41 @@ def crit_paths(G): paths.append( (t, crit) ) return paths # Produce statistics on this path. # path : [task] # M : Machine (will call M.possible() for node # and time info.) # return : [(expected start time, nodes used)] def path_stats(self, M, paths): time = self.earliest_start_time() stats = [] for t_remain, path in paths: stat = [] for task in path: x = M.possible(task.params['resource']) if x is None: break stat.append( (time[task],) + x ) # t_start, nodes, dt stats.append(stat) return stats # Closure storing task and its associated TaskGraph # and providing prepare, finish callbacks. class Task: def __init__(self, task, prep, fin): self.task = task self.rs = task.params['resource'] self.prep = prep self.fin = fin def __str__(self): return str(self.task) def prepare(self, testonly=False): return self.prep(self.task, testonly) def finish(self, ret): return self.fin(self.task, ret) # wt is a dictionary mapping all potential tasks to priorities. # self.l holds the heap of tasks actually in the queue. # push(task) pushes a task into the queue (must be in wt) Loading Loading @@ -236,7 +275,7 @@ class PriorityQueue: cmd = M.alloc(task.params['resource']) if cmd is not None: break # this task matches requirements if M.possible(task.params['resource']): if M.possible(task.params['resource']) is not None: tmp.append(task) # maybe can do later else: if len(tmp) == 0: Loading helpers.py +3 −2 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ import re import subprocess from pathlib import Path import codecs import yaml from os import listdir from colorama import Fore, Back, Style Loading Loading @@ -46,8 +47,8 @@ def format_dict(dct, kws, skip=[]): while len(fmt) > 0: dct = fmt.pop() for k, v in dct.items(): if isinstance(v, str) and k not in skip: dct[k] = v.format(**kws) if isinstance(v, str) and k not in skip and '{' in v: dct[k] = yaml.safe_load( v.format(**kws) ) elif isinstance(v, dict): fmt.append(v) Loading machine.py +14 −4 Original line number Diff line number Diff line Loading @@ -47,6 +47,11 @@ class ResourceSet: self.srun += " %s" % self.srun_attr self.local = "" def __str__(self): return repr(self) def __repr__(self): return 'ResourceSet("%f, %d, %d, gpu=%d, tasks=%d, jsrun_attr=%s, srun_attr=%s")'%( self.time,self.nrs,self.cpu,self.gpu,self.tasks,self.jsrun_attr,self.srun_attr) def ready(self): self.ready_time = tick() Loading Loading @@ -80,18 +85,23 @@ class Machine: self.N = [[GpN, CpN] for i in range(N)] # Returns True if this allocation is possible # on the machine at all. # Returns None if this allocation is impossible to schedule. # Or a tuple (nodes, time) indicating max #nodes and avg. time taken. def possible(self, R): if R.time > self.end_time - tick(): print("Unable to schedule due to time limitations.") return None if R.cpu > self.CpN or R.gpu > self.GpN or R.cpu < 1 or R.gpu < 0: return False print("Unable to schedule due to inavailability of high gpu/cpu count nodes.") return None n = self.CpN//R.cpu if R.gpu > 0: n = min(n, self.GpN//R.gpu) # max nrs per node return R.nrs <= len(self.N)*n nodes = (R.nrs+n-1) // n if nodes > len(self.N): print("Unable to schedule due to inavailability of sufficient nodes.") return None return (nodes, R.time) # Returns `cmd : str` if allocation succeeds None otherwise. # if test == True, then resource are not actually consumed. Loading Loading
examples/rules.yaml +1 −1 Original line number Diff line number Diff line Loading @@ -28,7 +28,7 @@ grompp: mdrun: resource: time: 100.0 nrs: 2 nrs: "{replicas}" cpu: 42 gpu: 6 jsrun_attr: "-l gpu-cpu -d plane:6 -b packed:7" Loading
examples/targets.yaml +2 −0 Original line number Diff line number Diff line Loading @@ -2,11 +2,13 @@ # a grompp step together with an mdrun. Cov1: dirname: Cov1 replicas: 20 out: xtc: run.xtc Cov2: dirname: Cov2 replicas: 10 out: xtc: run.xtc Loading
graph.py +81 −42 Original line number Diff line number Diff line Loading @@ -74,7 +74,7 @@ class TaskGraph: # M : Machine # create priority queue of runnable tasks wt = prio_width(self.G) wt = self.prio_width() self.Q = PriorityQueue(wt) for task in self.G.nodes(): if self.is_root(task): Loading Loading @@ -142,25 +142,9 @@ class TaskGraph: str_warn("incomplete"), len(self.G) ) # Closure storing task and its associated TaskGraph # and providing prepare, finish callbacks. class Task: def __init__(self, task, prep, fin): self.task = task self.rs = task.params['resource'] self.prep = prep self.fin = fin def __str__(self): return str(self.task) def prepare(self, testonly=False): return self.prep(self.task, testonly) def finish(self, ret): return self.fin(self.task, ret) # prioritize by job resources 'till done def prio_width(G): def prio_width(self): G = self.G wt = {} topo = list(nx.topological_sort(G)) Loading @@ -170,20 +154,40 @@ def prio_width(G): wt[task] = wmax + R.time*R.nrs*(R.cpu+R.gpu*25) return wt def earliest_time(G): def earliest_start_time(self): G = self.G # Earliest possible starting time for job time = {} topo = list(nx.topological_sort(G)) for task in topo: if self.is_root(task): tmax = 0.0 else: tmax = max(0.0, *(time[s]+s.params['resource'].time for s in G.predecessors(task))) time[task] = tmax return time def earliest_finish_time(self): G = self.G # dole out heterogeneous earliest finish time priorities # By listing smallest possible "minutes remaining" for ea. task time = {} # dictionary of earliest possible finish time topo = list(nx.topological_sort(G)) for task in reversed(topo): if len(G[task]) == 0: tmax = 0.0 else: tmax = max(0.0, *(time[s] for s in G[task])) time[task] = tmax + task.params['resource'].time return time def crit_paths(G): def crit_paths(self): G = self.G # Gather a list of critical paths, sorted from # longest finish time to shortest. time = earliest_time(G) time = self.earliest_finish_time() paths = [] while len(time) > 0: Loading @@ -199,6 +203,41 @@ def crit_paths(G): paths.append( (t, crit) ) return paths # Produce statistics on this path. # path : [task] # M : Machine (will call M.possible() for node # and time info.) # return : [(expected start time, nodes used)] def path_stats(self, M, paths): time = self.earliest_start_time() stats = [] for t_remain, path in paths: stat = [] for task in path: x = M.possible(task.params['resource']) if x is None: break stat.append( (time[task],) + x ) # t_start, nodes, dt stats.append(stat) return stats # Closure storing task and its associated TaskGraph # and providing prepare, finish callbacks. class Task: def __init__(self, task, prep, fin): self.task = task self.rs = task.params['resource'] self.prep = prep self.fin = fin def __str__(self): return str(self.task) def prepare(self, testonly=False): return self.prep(self.task, testonly) def finish(self, ret): return self.fin(self.task, ret) # wt is a dictionary mapping all potential tasks to priorities. # self.l holds the heap of tasks actually in the queue. # push(task) pushes a task into the queue (must be in wt) Loading Loading @@ -236,7 +275,7 @@ class PriorityQueue: cmd = M.alloc(task.params['resource']) if cmd is not None: break # this task matches requirements if M.possible(task.params['resource']): if M.possible(task.params['resource']) is not None: tmp.append(task) # maybe can do later else: if len(tmp) == 0: Loading
helpers.py +3 −2 Original line number Diff line number Diff line Loading @@ -2,6 +2,7 @@ import re import subprocess from pathlib import Path import codecs import yaml from os import listdir from colorama import Fore, Back, Style Loading Loading @@ -46,8 +47,8 @@ def format_dict(dct, kws, skip=[]): while len(fmt) > 0: dct = fmt.pop() for k, v in dct.items(): if isinstance(v, str) and k not in skip: dct[k] = v.format(**kws) if isinstance(v, str) and k not in skip and '{' in v: dct[k] = yaml.safe_load( v.format(**kws) ) elif isinstance(v, dict): fmt.append(v) Loading
machine.py +14 −4 Original line number Diff line number Diff line Loading @@ -47,6 +47,11 @@ class ResourceSet: self.srun += " %s" % self.srun_attr self.local = "" def __str__(self): return repr(self) def __repr__(self): return 'ResourceSet("%f, %d, %d, gpu=%d, tasks=%d, jsrun_attr=%s, srun_attr=%s")'%( self.time,self.nrs,self.cpu,self.gpu,self.tasks,self.jsrun_attr,self.srun_attr) def ready(self): self.ready_time = tick() Loading Loading @@ -80,18 +85,23 @@ class Machine: self.N = [[GpN, CpN] for i in range(N)] # Returns True if this allocation is possible # on the machine at all. # Returns None if this allocation is impossible to schedule. # Or a tuple (nodes, time) indicating max #nodes and avg. time taken. def possible(self, R): if R.time > self.end_time - tick(): print("Unable to schedule due to time limitations.") return None if R.cpu > self.CpN or R.gpu > self.GpN or R.cpu < 1 or R.gpu < 0: return False print("Unable to schedule due to inavailability of high gpu/cpu count nodes.") return None n = self.CpN//R.cpu if R.gpu > 0: n = min(n, self.GpN//R.gpu) # max nrs per node return R.nrs <= len(self.N)*n nodes = (R.nrs+n-1) // n if nodes > len(self.N): print("Unable to schedule due to inavailability of sufficient nodes.") return None return (nodes, R.time) # Returns `cmd : str` if allocation succeeds None otherwise. # if test == True, then resource are not actually consumed. Loading