loadem.py 2.66 KB
Newer Older
David M. Rogers's avatar
David M. Rogers committed
1
2
3
4
#!/usr/bin/env python3

import os, subprocess
import redis, time, random
5
from datetime import datetime
David M. Rogers's avatar
David M. Rogers committed
6

David M. Rogers's avatar
David M. Rogers committed
7
# Debug switches for the worker loop in main().
test = False     # True: do not run run_ad.sh; every shard is recorded as OK
testone = False  # True: stop after the first shard
testtwo = True   # True: stop after the second shard
# NOTE(review): testtwo is enabled, so each rank exits after processing 2
# shards instead of draining the queue -- confirm this is intended.

# Number of failed redis connection attempts that were retried
# (incremented by run_redis, reported when main() finishes).
conn_retries = 0

13
def stamp():
    """Return the current local time formatted as a log prefix.

    The result looks like ``YYYY-mm-dd HH:MM:SS.ffffff v0.5``; the trailing
    tag identifies the script version in log output.
    """
    now = datetime.now()
    return "%s v0.5" % now.strftime("%Y-%m-%d %H:%M:%S.%f")
15

16
# NOTE(review): the redis password is hard-coded in source; consider reading
# it from the environment instead.
def run_redis(host, fn, port=6379, password="Z1908840168_2_T1", db=0, retries=120):
    """Connect to the redis server on *host*, call ``fn(r)``, and return its result.

    redis-py opens connections lazily, so constructing the client never
    raises ConnectionError.  A PING is issued inside the retry loop to
    actually establish the connection.  Up to *retries* attempts are made,
    with a random 0-0.2 s pause between them, and each failure is counted
    in the module-level ``conn_retries``.

    ``fn`` itself is called exactly once and is never replayed.  Callers
    pass non-idempotent commands (e.g. SMOVE), so retrying them could
    double-apply work.  The connection pool is always disconnected
    afterwards, even if ``fn`` raises.

    Args:
        host: redis server hostname.
        fn: callable taking the redis client and returning any value.
        port, password, db: connection parameters (defaults match the
            original hard-coded values).
        retries: maximum number of connection attempts.

    Raises:
        redis.exceptions.ConnectionError: if no connection attempt succeeds.
    """
    global conn_retries
    last_err = None
    for _ in range(retries):
        r = redis.StrictRedis(host=host, port=port, password=password, db=db)
        try:
            r.ping()  # forces the (otherwise lazy) connection
            break
        except redis.exceptions.ConnectionError as err:
            last_err = err
            r.connection_pool.disconnect()
            conn_retries += 1
            # Random jitter so many ranks do not retry in lock-step.
            time.sleep(random.random() * 0.2)
    else:
        raise redis.exceptions.ConnectionError(
            "could not connect to redis at %s:%d after %d attempts"
            % (host, port, retries)
        ) from last_err

    try:
        return fn(r)
    finally:
        r.connection_pool.disconnect()
David M. Rogers's avatar
David M. Rogers committed
33

David M. Rogers's avatar
David M. Rogers committed
34
def get_shard(host):
    """Claim one pending shard from the redis work queue on *host*.

    A random member of the 'shards' set is moved into the 'doing' set and
    returned, decoded as UTF-8.  Returns None when 'shards' is empty, which
    main() treats as the signal to shut down.
    """
    def claim(r):
        # SRANDMEMBER + SMOVE rather than SPOP + SADD.  SMOVE moves the
        # member atomically on the server, so a worker that dies or loses
        # its connection mid-claim cannot leave a shard in neither set.
        # SMOVE reports failure when another worker moved the same member
        # first; in that case just pick again.
        while True:
            shard = r.srandmember('shards')
            if shard is None:
                return None
            if r.smove('shards', 'doing', shard):
                return shard

    shard = run_redis(host, claim)
    if shard is None:
        return None
    return shard.decode('utf8')

David M. Rogers's avatar
David M. Rogers committed
46
47
# Root of the shared output tree; per-job worker logs go under <out_pre>/logs/<LSB_JOBID>/.
out_pre = "/gpfs/alpine/world-shared/bif128/docked"

David M. Rogers's avatar
David M. Rogers committed
48
49
50
51
52
def _run_shard(shard):
    """Run run_ad.sh on one shard description and return its exit status.

    The shard string is split on whitespace into run_ad.sh arguments, and
    "p" is prefixed to the first field.  When the module-level ``test``
    flag is set, nothing is executed and 0 (success) is returned.  A blank
    shard string cannot be turned into a command and is reported as 1
    (failure).
    """
    if test:
        return 0
    fields = shard.split()
    if not fields:
        # Indexing fields[0] would raise IndexError and kill the worker with
        # the shard stranded in 'doing'; record it as a failed shard instead.
        return 1
    fields[0] = "p" + fields[0]
    return subprocess.call(["bash", "/ccs/proj/bif128/analysis/reduce/run_ad.sh"] + fields)


def main(argv):
    """Worker entry point: pull shards from the redis server named in argv[1].

    For each claimed shard, run_ad.sh is run via _run_shard().  On a
    non-zero exit status the shard is moved from 'doing' to 'errors';
    otherwise it is moved to 'done'.  One line per shard is appended to
    <out_pre>/logs/<LSB_JOBID>/rank<rank as 4 hex digits>.log.  The loop
    ends when the queue is empty or a debug flag (testone/testtwo) stops
    it.  Afterwards a summary is printed and /mnt/bb/<USER>/<rank> is
    removed.

    Reads OMPI_COMM_WORLD_RANK, USER and LSB_JOBID from the environment
    (KeyError if any is missing).

    Raises:
        SystemExit: with a usage message if argv does not hold exactly one
            argument.
    """
    if len(argv) != 2:
        # Not an assert: asserts are stripped under ``python -O``.
        prog = argv[0] if argv else "loadem.py"
        raise SystemExit("Usage: %s <redis host>" % prog)
    host = argv[1]

    rank = int(os.environ['OMPI_COMM_WORLD_RANK'])
    username = os.environ['USER']
    jobid = os.environ['LSB_JOBID']

    log_dir = "%s/logs/%s" % (out_pre, jobid)
    # exist_ok tolerates every rank racing to create the same directory.
    os.makedirs(log_dir, exist_ok=True)

    n = 0
    with open("%s/rank%04x.log" % (log_dir, rank), "w") as ofile:
        time.sleep(rank*0.0001) # 10k connections per second at startup
        # No "requesting first shard" print here: worried about a hidden
        # sync on file-write.

        while True:
            shard = get_shard(host)
            if shard is None: # graceful shutdown
                break

            if _run_shard(shard):
                ofile.write("%s %s ERR\n" % (stamp(), shard))
                newset = 'errors'
            else:
                ofile.write("%s %s OK\n" % (stamp(), shard))
                newset = 'done'
            run_redis(host, lambda r: r.smove('doing', newset, shard))

            n += 1
            if n % 10 == 0: # 13k of these messages.
                ofile.flush()
                print("%s Host %04x processed %d decishards." % (stamp(), rank, n))

            if testone:
                break
            if testtwo and n == 2:
                break

    print("%s Host %04x completed (%d decishards processed, %d conn retries)." % (stamp(), rank, n, conn_retries))
    # Remove this rank's /mnt/bb/<USER>/<rank> directory (presumably
    # node-local scratch -- confirm).  List form: no shell parsing of the
    # env-derived path.
    subprocess.call(["rm", "-fr", "/mnt/bb/%s/%d" % (username, rank)])
David M. Rogers's avatar
David M. Rogers committed
95
96
97
98

if __name__ == "__main__":
    # Script entry: argv[1] names the redis host.
    from sys import argv
    main(argv)