Commit 32e1102a authored by David M. Rogers's avatar David M. Rogers
Browse files

Fix stupid python bug.

parent 6cfbbb89
* v. 0.5
* fix stupid bug
* create new redis db key (done) to track completed decishards
* added requeue.py to move 'doing' keys back to 'shards'
* added mk_done.sh to list completed tarfiles using find (in case a complete
recovery is needed) -- this file now works with setdb.py
* upgraded parse_log.py to give stats on each AD process stage
* v. 0.4
* timestamp all log messages
* introduced subtle bug on "testone = True"
* v. 0.3 * v. 0.3
loadem.py: - moved logs/rank0000.log into logs/jobid/rank0000.log * loadem.py: - moved logs/rank0000.log into logs/jobid/rank0000.log
- saved shards-in-progress to new redis db key - saved shards-in-progress to new redis db key (doing)
run_ad.sh: - added trap for copy-out errors * run_ad.sh: - added trap for copy-out errors
...@@ -11,7 +11,7 @@ testtwo = True ...@@ -11,7 +11,7 @@ testtwo = True
conn_retries = 0 conn_retries = 0
def stamp(): def stamp():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + " v0.4" return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + " v0.5"
def run_redis(host, fn): def run_redis(host, fn):
global conn_retries global conn_retries
...@@ -70,21 +70,23 @@ def main(argv): ...@@ -70,21 +70,23 @@ def main(argv):
cmd.extend(shard.split()) cmd.extend(shard.split())
cmd[2] = "p" + cmd[2] cmd[2] = "p" + cmd[2]
ret = subprocess.call(cmd) ret = subprocess.call(cmd)
newset = 'done'
if ret: if ret:
ofile.write("%s %s ERR\n"%(stamp(),shard)) ofile.write("%s %s ERR\n"%(stamp(),shard))
run_redis(host, lambda r: r.sadd('errors', shard)) newset = 'errors'
else: else:
ofile.write("%s %s OK\n"%(stamp(),shard)) ofile.write("%s %s OK\n"%(stamp(),shard))
run_redis(host, lambda r: r.smove('doing', newset, shard))
run_redis(host, lambda r: r.srem('doing', shard))
n += 1 n += 1
if n%10 == 0: # 13k of these messages. if n%10 == 0: # 13k of these messages.
ofile.flush() ofile.flush()
print("%s Host %04x processed %d decishards."%(stamp(),rank,n)) print("%s Host %04x processed %d decishards."%(stamp(),rank,n))
if testone: if testone:
break break
if testtwo: if testtwo and n == 2:
testone = True break
ofile.close() ofile.close()
......
# Create the 'done' file listing completed decishards.
# Each p<N>.<M>.tgz tarfile found under docked/ becomes one "N M" line;
# paths that do not match that pattern are dropped (sed -n ... p).
find /gpfs/alpine/world-shared/bif128/docked/ -name 'p*.tgz' \
  | sed -n -e 's|.*/p\([0-9]*\)\.\([0-9]*\)\.tgz|\1 \2|p' \
  > done
#!/usr/bin/env python3
# This script will re-queue all 'doing' entries back into 'shards'
# It is meant to be run by hand to recover from a failed run.
import re
def get_rdb(host):
    """Return a redis client for database 0 on `host`, port 6379."""
    # Imported inside the function, as in the original, so the redis
    # package is only required once a connection is actually requested.
    from redis import Redis

    # NOTE(review): the password is hard-coded in source; consider moving
    # it to configuration or the environment.
    settings = dict(host=host, port=6379, password="Z1908840168_2_T1", db=0)
    return Redis(**settings)
def main(argv):
    """Move every member of the redis set 'doing' back into 'shards'.

    Args:
        argv: full command line, i.e. [program name, redis server name].

    Prints "<k>/<todo> shards requeued", where <todo> is the size of
    'doing' before draining and <k> is the sum of the sadd() results.

    Raises:
        SystemExit: with a usage message if the argument count is wrong.
    """
    if len(argv) != 2:
        # The previous assert never filled in the "%s" placeholder and is
        # stripped under `python -O`; validate explicitly instead.
        prog = argv[0] if argv else "requeue.py"
        raise SystemExit("Usage: %s <server name>" % prog)

    r = get_rdb(argv[1])
    todo = r.scard('doing')  # snapshot of the set size, used only for the report
    k = 0
    # NOTE(review): spop followed by sadd is two separate commands; if the
    # script dies between them that shard is in neither set.
    while True:
        sh = r.spop('doing')
        if sh is None:  # nothing left to pop
            break
        # sadd reports how many members were newly added (0 if already queued)
        k += r.sadd('shards', sh)
    print("%d/%d shards requeued" % (k, todo))
if __name__ == "__main__":
    # Script entry point: pass the complete command line (including the
    # program name in argv[0]) straight through to main().
    import sys

    main(sys.argv)
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
export OMP_NUM_THREADS=7 export OMP_NUM_THREADS=7
set -e set -e
version="run_ad.sh v0.4" version="run_ad.sh v0.5"
if [ $# -ne 2 ]; then if [ $# -ne 2 ]; then
echo "Usage: $0 shard_name shard_segment" echo "Usage: $0 shard_name shard_segment"
......
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
#BSUB -W 120 #BSUB -W 120
#BSUB -q batch #BSUB -q batch
#BSUB -P BIF128 #BSUB -P BIF128
#BSUB -J ADv0.4 #BSUB -J ADv0.5
#BSUB -o %J.out #BSUB -o %J.out
#BSUB -alloc_flags "NVME" #BSUB -alloc_flags "NVME"
...@@ -10,7 +10,7 @@ source /ccs/proj/bif128/venvs/env.sh ...@@ -10,7 +10,7 @@ source /ccs/proj/bif128/venvs/env.sh
PROJ=/gpfs/alpine/bif128/proj-shared/redis PROJ=/gpfs/alpine/bif128/proj-shared/redis
gpus=$(( (LSB_MAX_NUM_PROCESSORS-1)/7 )) gpus=$(( (LSB_MAX_NUM_PROCESSORS-1)/7 ))
echo "Starting $((gpus/6)) node run of v0.4 at " `date` echo "Starting $((gpus/6)) node run of ADv0.5 at " `date`
[ -s $PROJ/shards.rdb ] [ -s $PROJ/shards.rdb ]
REMAKE=$? REMAKE=$?
......
...@@ -11,7 +11,15 @@ def get_rdb(host): ...@@ -11,7 +11,15 @@ def get_rdb(host):
expr = re.compile(r"_p([0-9]*).tar.gz") expr = re.compile(r"_p([0-9]*).tar.gz")
def main(argv): def main(argv):
assert len(argv) == 3, "Usage: %s <server name> <shard list file>" el = set()
if len(argv) >= 3 and argv[1] == "-e":
with open(argv[2]) as f:
for line in f:
el.add(line.strip())
del argv[1:3]
print("%d exclusions"%len(el))
assert len(argv) == 3, "Usage: %s [-e excl_list_file] <server name> <shard list file>"
r = get_rdb(argv[1]) r = get_rdb(argv[1])
k = 0 k = 0
...@@ -21,11 +29,11 @@ def main(argv): ...@@ -21,11 +29,11 @@ def main(argv):
m = expr.search(line) m = expr.search(line)
if m is None: if m is None:
continue continue
shards = ["%s %d"%(m[1], i) for i in range(10)] shards = set(["%s %d"%(m[1], i) for i in range(10)])
r.sadd('shards', *shards) shards -= el
k += 1 k += r.sadd('shards', *list(shards))
print("%d/%d shards added"%(k,lines)) print("%d/%d shards added"%(k,lines*10))
if __name__=="__main__": if __name__=="__main__":
import sys import sys
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment