Commit d3f5d050 authored by David M. Rogers
Browse files

Manage machine at top-level, move time into Machine and ResourceSet classes.

parent 3f070a4d
Loading
Loading
Loading
Loading
+10 −14
Original line number Diff line number Diff line
@@ -77,7 +77,7 @@ class TaskGraph:
            return True
        return False

    # get_work : Machine, time_in_min -> task | None
    # get_work : Machine -> task | None
    #     get_work is responsible for calling Machine.alloc
    #     Manager calls Machine.free
    # task = {
@@ -85,10 +85,10 @@ class TaskGraph:
    #          prepare = (testonly=False) -> [cmd_string] | None
    #          finish  = (return_val : int) -> None
    #        }
    def get_work(self, M, time):
    def get_work(self, M):
        if self.verb:
            print("Called get_work (%d items)"%len(self.Q))
        task = self.Q.get_task(M, time)
        task = self.Q.get_task(M)
        if task is None:
            return None
        # wrap self.prepare & self.finish in the requested API
@@ -135,7 +135,6 @@ class TaskGraph:
class Task:
    def __init__(self, task, prep, fin):
        self.task = task
        self.time = task.time
        self.rs   = task.resource

        self.prep = prep
@@ -156,7 +155,8 @@ def prio_heft(G):
                visit(s)
            wmax = max(wmax, wt[s])

        wt[task] = wmax + task.time*task.resource.nrs*(task.resource.cpu+task.resource.gpu*25)
        R = task.resource
        wt[task] = wmax + R.time*R.nrs*(R.cpu+R.gpu*25)

    for task in G.nodes():
        if task not in wt:
@@ -188,23 +188,19 @@ class PriorityQueue:
    #          task.mpirun = cmd
    #
    # Returns Job class or None
    # Cannot return None when provided the whole machine.
    # Cannot return None when jobs passing M.possible()
    # are available and the whole machine is available
    # (so no jobs are currently running).
    #
    # M         : Machine -- machine availability status
    # time      : float   -- minutes available now, upper
    #                        limit on returned job's time
    def get_task(self, M, time):
    def get_task(self, M):
        tmp = [] # temporarily removed tasks
        for i in range(len(self.l)):
            task = self.pop()
            if task.time > time: # won't complete
                print("Skipping %s (requires %.0f minutes, but have only %.0f)"
                     % (task, task.time, time) )
                continue

            cmd = M.alloc(task.resource)
            if cmd is not None:
                break # next task matches requirements
                break # this task matches requirements
            if M.possible(task.resource):
                tmp.append(task) # maybe can do later
        else:
+1 −3
Original line number Diff line number Diff line
@@ -10,8 +10,7 @@ cd {dirname}

# Top-level rules are weak
class Top:
    time = 0.0
    resource = ResourceSet(1, 1, 0)
    resource = ResourceSet(0.0, 1, 1, 0)
    def __init__(self, jobname, args):
        self.jobname = jobname
        self.dirname = args['dirname']
@@ -36,7 +35,6 @@ class Job:
    reqs = { 'dirname' : Path,
             'jobname' : str,
             'resource': ResourceSet,
             'time'    : float,
             'inp'     : dict,
             'out'     : dict,
             'setup'   : str,
+18 −9
Original line number Diff line number Diff line
# Models the job launch environment by tracking available GPUs and CPUs.
# Basic usage:
#   M = machine()
#   M = machine(time_avail)
#   print( "%d nodes (%d CPUs, %d GPUs) on %s"%(len(M.N), len(M.N)*M.CpN, len(M.N)*M.GpN, M.launcher) )
#   Rs = [ ResourceSet(1, 1, 0)  # 1 CPU job
#        , ResourceSet(1, 6, 6)  # 6 GPU job
@@ -12,11 +12,14 @@
#   for R in Rs: # free resources
#      M.free(R)

import os
import os, time
_tick = time.time
tick = lambda: _tick()/60.0

class ResourceSet:
    def __init__(self, nrs, cpu, gpu=0, tasks=1, jsrun_attr=None, srun_attr=None):
    def __init__(self, time, nrs, cpu, gpu=0, tasks=1, jsrun_attr=None, srun_attr=None):
        assert cpu > 0
        self.time = time
        self.nrs = nrs
        self.cpu = cpu
        self.gpu = gpu
@@ -37,23 +40,25 @@ class ResourceSet:

        self.local = ""

def machine():
def machine(time):
    # Rhea
    if 'SLURM_JOB_NUM_NODES' in os.environ:
        return Machine(int(os.environ['SLURM_JOB_NUM_NODES']), 0, 16, "srun") # or 32 (2-thread?)
        return Machine(time, int(os.environ['SLURM_JOB_NUM_NODES']), 0, 16, "srun") # or 32 (2-thread?)

    # Summit
    if 'LSB_MAX_NUM_PROCESSORS' in os.environ:
        return Machine(int(os.environ['LSB_MAX_NUM_PROCESSORS'])//42, 6, 42, "jsrun")
        return Machine(time, int(os.environ['LSB_MAX_NUM_PROCESSORS'])//42, 6, 42, "jsrun")

    # Localhost -- allow running up to 4 jobs locally
    return Machine(4, 0, 1, "local")
    return Machine(time, 4, 0, 1, "local")

class Machine:
    # time = max available time in minutes
    # N = Nodes
    # GpN = GPU / Node
    # CpN = CPU / Node
    def __init__(self, N, GpN, CpN, launcher):
    def __init__(self, time, N, GpN, CpN, launcher):
        self.end_time = tick() + time
        self.GpN = GpN
        self.CpN = CpN
        self.launcher = launcher
@@ -63,6 +68,9 @@ class Machine:
    # Returns True if this allocation is possible
    # on the machine at all.
    def possible(self, R):
        if R.time > self.end_time - tick():
            print("Unable to schedule due to time limitations.")
            return None
        if R.cpu > self.CpN or R.gpu > self.GpN or R.cpu < 1 or R.gpu < 0:
            return False
        n = self.CpN//R.cpu
@@ -73,6 +81,8 @@ class Machine:
    # Returns `cmd : str` if allocation succeeds, None otherwise.
    # If test == True, then resources are not actually consumed.
    def alloc(self, R, test=False):
        if R.time > self.end_time - tick(): # won't complete
            return None
        #assert R.cpu <= self.CpN
        #assert R.gpu <= self.GpN

@@ -115,4 +125,3 @@ class Machine:
            self.N[i][1] += c
        R.use = None
+4 −2
Original line number Diff line number Diff line
@@ -3,6 +3,7 @@
import yaml
from jobtypes import read_jobtypes
from graph import TaskGraph
from machine import machine
from manager import Manager

def main(argv):
@@ -21,8 +22,9 @@ def main(argv):
    T = float(argv[3])

    G = TaskGraph(types, jobs)
    M = Manager(time=T, verb=True)
    M.run( G.get_work, testonly )
    M = machine(T)
    R = Manager(M, T, verb=True)
    R.run( G.get_work, testonly )

if __name__=="__main__":
    import sys
+7 −11
Original line number Diff line number Diff line
@@ -9,13 +9,15 @@

import os, signal, time
from subprocess import Popen, check_call
from machine import *
_tick = time.time
tick = lambda: _tick()/60.0

class Manager:
    def __init__(self, time, verb=False):
        self.M = machine()
    # Inputs:
    #   M : `Machine`
    #   time : float
    def __init__(self, M, time=1e100, verb=False):
        self.M = M
        self.verb = verb

        self.end_time = tick() + time
@@ -47,13 +49,7 @@ class Manager:
        task.finish(ret)
        p.wait()

    # TODO: make rs = ResourceSet(task.(...))
    #  resource:
    #      task: 1
    #      cpu: 1
    #      gpu: 0
    #
    # get_work : Machine, time_in_min -> task | None
    # get_work : Machine -> task | None
    #     get_work is responsible for calling Machine.alloc
    #     Manager calls Machine.free
    # task = {
@@ -63,7 +59,7 @@ class Manager:
    #        }
    def run(self, get_work, testonly=False):
        while self.live and self.end_time > tick():
            task = get_work(self.M, self.end_time-tick())
            task = get_work(self.M)

            if task is None:
                if len(self.procs) == 0: # done