loadem.py 3.35 KB
Newer Older
David M. Rogers's avatar
David M. Rogers committed
1
2
3
4
#!/usr/bin/env python3

import os, subprocess
import redis, time, random
5
from datetime import datetime
David M. Rogers's avatar
David M. Rogers committed
6

David M. Rogers's avatar
David M. Rogers committed
7
# Run-mode switches; all False for a production run.
#   test    -- skip run_ad.sh entirely; claimed shards are still moved to 'done'.
#   testone -- stop main()'s loop after the first shard.
#   hopper  -- get_shard() decrements the redis "hopper" counter and stops
#              (returns None) once it drops below zero.
test = testone = hopper = False

# Number of failed redis connection attempts; reported when main() finishes.
conn_retries = 0

13
def stamp():
    """Return the local time as 'YYYY-MM-DD HH:MM:SS.ffffff v1.2'.

    Used as the prefix of every log and progress line; the ' v1.2' suffix is
    presumably the script's version tag.
    """
    now = datetime.now()
    return format(now, "%Y-%m-%d %H:%M:%S.%f") + " v1.2"
15

16
def run_redis(host, fn, port=6379, password="Z1908840168_2_T1", attempts=120):
    """Connect to the redis server on *host*, call ``fn(r)`` and return its result.

    The connection is established with ``r.ping()`` up to *attempts* times,
    sleeping a random 0-0.2 s between tries and counting each failure in the
    module-level ``conn_retries``. ``fn`` runs exactly once, after a
    successful ping, so non-idempotent commands (SPOP, DECR, ...) are never
    replayed by the retry loop. The client's connection pool is always
    disconnected before returning, including when ``fn`` raises.

    Raises:
        redis.exceptions.ConnectionError: if none of the *attempts* succeed.
        Exceptions raised by ``fn`` propagate unchanged.
    """
    global conn_retries
    # NOTE(review): the password is hard-coded in source; consider loading it
    # from the environment or a user-only-readable credentials file.
    r = redis.StrictRedis(host=host, port=port, password=password, db=0)
    try:
        # StrictRedis() connects lazily, so constructing it never raises
        # ConnectionError. ping() forces the connect here, where a failure
        # can be retried safely.
        for _ in range(attempts):
            try:
                r.ping()
                break
            except redis.exceptions.ConnectionError:
                conn_retries += 1
                time.sleep(random.random()*0.2)
        else:
            raise redis.exceptions.ConnectionError(
                "could not connect to redis at %s:%d after %d attempts"
                % (host, port, attempts))
        return fn(r)
    finally:
        r.connection_pool.disconnect()
David M. Rogers's avatar
David M. Rogers committed
33

David M. Rogers's avatar
David M. Rogers committed
34
def get_shard(host):
    """Claim one shard ID from redis and return it as a str, or None to stop.

    A random member of the redis set 'shards' is moved into the set 'doing'
    in a single atomic step. Returns None when 'shards' is empty, or (when
    the module-level ``hopper`` flag is set) once the redis 'hopper'
    counter has been decremented below zero.

    Shard IDs look like "3321 7" (segment 7 of shard p3321).
    """
    # SPOP + SADD as one server-side Lua script. Redis runs a script
    # atomically, so a worker dying (or its connection dropping) between the
    # two steps can no longer leave a shard in neither 'shards' nor 'doing'.
    # An empty set makes SPOP return nil (false in Lua), which comes back to
    # Python as None.
    claim = """
local shard = redis.call('SPOP', KEYS[1])
if shard then
    redis.call('SADD', KEYS[2], shard)
end
return shard
"""

    def enqueue(r):
        if hopper:  # use a counter to terminate early
            if r.decr("hopper") < 0:
                return None
        return r.eval(claim, 2, 'shards', 'doing')

    shard = run_redis(host, enqueue)  # ex. b"3321 7" ~> seg. 7 of shard p3321
    if shard is None:
        return None
    return shard.decode('utf8')

David M. Rogers's avatar
David M. Rogers committed
52
# Root of the shared GPFS output tree; main() writes per-rank logs under
# <out_pre>/logs/<LSB_JOBID>/.
out_pre = '/gpfs/alpine/world-shared/bif128/6WQF_docked'
David M. Rogers's avatar
David M. Rogers committed
53

David M. Rogers's avatar
David M. Rogers committed
54
55
56
57
58
def main(argv):
    """Worker loop for one MPI rank: claim shards from redis and process them.

    ``argv`` must be ``[prog, redis_host]``. Reads OMPI_COMM_WORLD_RANK, USER
    and LSB_JOBID from the environment.

    For every shard claimed with get_shard() it:
      - runs run_ad.sh on the shard (unless ``test`` is set);
      - logs OK/ERR to <out_pre>/logs/<jobid>/rank<rank in hex>.log;
      - moves the shard from the redis set 'doing' to 'done' or 'errors'.

    The loop stops when get_shard() returns None, after 10 consecutive
    failures, or after one shard when ``testone`` is set. The rank's scratch
    directory /mnt/bb/<user>/<rank> is removed at start and at the end.
    """
    import shutil  # local import: only main() needs it

    if len(argv) != 2:
        # Not an assert: asserts are stripped under ``python -O``.
        raise SystemExit("Usage: %s <redis host>" % (argv[0] if argv else "loadem.py"))
    host = argv[1]

    rank = int(os.environ['OMPI_COMM_WORLD_RANK'])
    username = os.environ['USER']
    jobid = os.environ['LSB_JOBID']

    # In-process equivalents of `mkdir -p` / `rm -fr`: no shell spawned per
    # rank, and no shell interpretation of environment-derived path parts.
    logdir = "%s/logs/%s" % (out_pre, jobid)
    scratch = "/mnt/bb/%s/%d" % (username, rank)
    os.makedirs(logdir, exist_ok=True)
    shutil.rmtree(scratch, ignore_errors=True)

    time.sleep(rank*0.0001)  # stagger start-up: ~10k connections per second

    script = "/ccs/proj/bif128/analysis/reduce/run_ad.sh"
    n = 0                   # shards processed by this rank
    errors = 0              # total failed shards
    consecutive_errors = 0  # failures since the last success

    with open('%s/rank%04x.log' % (logdir, rank), "w") as ofile:
        while True:
            shard = get_shard(host)
            if shard is None:  # graceful shutdown
                break

            ret = 0
            if not test:
                fields = shard.split()
                # ex. bash run_ad.sh p3321 7
                ret = subprocess.call(["bash", script, "p" + fields[0]] + fields[1:])

            if ret:
                ofile.write("%s %s ERR\n" % (stamp(), shard))
                newset = 'errors'
                errors += 1
                consecutive_errors += 1
            else:
                ofile.write("%s %s OK\n" % (stamp(), shard))
                newset = 'done'
                consecutive_errors = 0

            # Move the shard out of 'doing' before any early exit or back-off
            # so it is never stranded there (e.g. on the 10th consecutive
            # error) or held through the 60 s sleep.
            run_redis(host, lambda r: r.smove('doing', newset, shard))

            n += 1
            if n % 10 == 0:  # 13k of these messages.
                ofile.flush()
                print("%s Host %04x processed %d decishards." % (stamp(), rank, n))

            if consecutive_errors >= 10:
                print("%s Host %04x quitting due to %d consecutive errors." % (stamp(), rank, consecutive_errors))
                break
            if consecutive_errors >= 2:
                time.sleep(60)  # back off while failures persist
            if testone:
                break

    print("%s Host %04x completed (%d decishards processed, %d errors, %d conn retries)." % (stamp(), rank, n, errors, conn_retries))
    shutil.rmtree(scratch, ignore_errors=True)
David M. Rogers's avatar
David M. Rogers committed
111
112
113
114

if __name__ == "__main__":
    # Script entry point: expects exactly one argument, the redis host name.
    from sys import argv
    main(argv)