Commit a644feaa authored by David M. Rogers's avatar David M. Rogers
Browse files

Changes for v0.6 - hopper.

parent 32e1102a
TODO: - check output files of this run (ex:)
/gpfs/alpine/world-shared/bif128/docked/149875/g02n03/p2498.7.tgz
- run for 4 hrs on 90 nodes
* v. 0.6
* Added logfile into tgz outputs
* Added 'hopper' global counter to terminate after running n ds (decishards)
* Added some comments to key steps
* v. 0.5 * v. 0.5
* fix stupid bug * fix stupid bug
* create new redis db key (done) to track completed decishards * create new redis db key (done) to track completed decishards
...@@ -15,3 +24,9 @@ ...@@ -15,3 +24,9 @@
- saved shards-in-progress to new redis db key (doing) - saved shards-in-progress to new redis db key (doing)
* run_ad.sh: - added trap for copy-out errors * run_ad.sh: - added trap for copy-out errors
* v 0.2
* change to use ds = decishards = 1/10th of a shard (shard_id seg)
* ran redis fill/empty test for listing ds
* v 0.1
* test the run_ad.sh script on 1 docking shard
...@@ -6,12 +6,13 @@ from datetime import datetime ...@@ -6,12 +6,13 @@ from datetime import datetime
test = False test = False
testone = False testone = False
testtwo = True testtwo = False
hopper = True
conn_retries = 0 conn_retries = 0
def stamp(): def stamp():
return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + " v0.5" return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + " v0.6"
def run_redis(host, fn): def run_redis(host, fn):
global conn_retries global conn_retries
...@@ -32,13 +33,19 @@ def run_redis(host, fn): ...@@ -32,13 +33,19 @@ def run_redis(host, fn):
return u return u
def get_shard(host): def get_shard(host):
# TODO: make enqueue atomic
def enqueue(r): def enqueue(r):
if hopper: # use a counter to terminate early
k = r.decr("hopper")
if k < 0:
return None
shard = r.spop('shards') shard = r.spop('shards')
if shard is not None: if shard is not None:
r.sadd('doing', shard) r.sadd('doing', shard)
return shard return shard
shard = run_redis(host, enqueue) shard = run_redis(host, enqueue) # ex. "3321 7" ~> seg. 7 of shard p3321
if shard is None: if shard is None:
return shard return shard
return shard.decode('utf8') return shard.decode('utf8')
...@@ -58,6 +65,7 @@ def main(argv): ...@@ -58,6 +65,7 @@ def main(argv):
time.sleep(rank*0.0001) # 10k connections per second at startup time.sleep(rank*0.0001) # 10k connections per second at startup
#print("%s: Host %04x requesting first shard."%(stamp(),rank)) # worried about hidden sync on file-write #print("%s: Host %04x requesting first shard."%(stamp(),rank)) # worried about hidden sync on file-write
# redis DB contains 4 sets of shard-IDs
n = 0 n = 0
while True: while True:
...@@ -69,7 +77,7 @@ def main(argv): ...@@ -69,7 +77,7 @@ def main(argv):
cmd = ["bash", "/ccs/proj/bif128/analysis/reduce/run_ad.sh"] cmd = ["bash", "/ccs/proj/bif128/analysis/reduce/run_ad.sh"]
cmd.extend(shard.split()) cmd.extend(shard.split())
cmd[2] = "p" + cmd[2] cmd[2] = "p" + cmd[2]
ret = subprocess.call(cmd) ret = subprocess.call(cmd) # ex. bash run_ad.sh p3321 7
newset = 'done' newset = 'done'
if ret: if ret:
......
...@@ -9,7 +9,7 @@ ...@@ -9,7 +9,7 @@
export OMP_NUM_THREADS=7 export OMP_NUM_THREADS=7
set -e set -e
version="run_ad.sh v0.5" version="run_ad.sh v0.6"
if [ $# -ne 2 ]; then if [ $# -ne 2 ]; then
echo "Usage: $0 shard_name shard_segment" echo "Usage: $0 shard_name shard_segment"
...@@ -78,7 +78,7 @@ log completed segment file list ${start} to ${end} ...@@ -78,7 +78,7 @@ log completed segment file list ${start} to ${end}
log completed docking log completed docking
# copy-out function # copy-out function
#cd $WORK_DIR #cd $WORK_DIR
tar czf $shard_name.$seg.tgz `awk '{printf("%s.xml\n%s.dlg\n",$0,$0);}' lignames.$seg` \ tar czf $shard_name.$seg.tgz ${seg}.log `awk '{printf("%s.xml\n%s.dlg\n",$0,$0);}' lignames.$seg` \
|| echo "Error tarring some files." || echo "Error tarring some files."
cp $shard_name.$seg.tgz $OUT_DIR cp $shard_name.$seg.tgz $OUT_DIR
#rm -fr $WORK_DIR #rm -fr $WORK_DIR
......
#BSUB -nnodes 922 #BSUB -nnodes 90
#BSUB -W 120 #BSUB -W 5:00
#BSUB -q batch #BSUB -q batch
#BSUB -P BIF128 #BSUB -P BIF128
#BSUB -J ADv0.5 #BSUB -J ADv0.6
#BSUB -o %J.out #BSUB -o %J.out
#BSUB -alloc_flags "NVME" #BSUB -alloc_flags "NVME"
...@@ -10,7 +10,7 @@ source /ccs/proj/bif128/venvs/env.sh ...@@ -10,7 +10,7 @@ source /ccs/proj/bif128/venvs/env.sh
PROJ=/gpfs/alpine/bif128/proj-shared/redis PROJ=/gpfs/alpine/bif128/proj-shared/redis
gpus=$(( (LSB_MAX_NUM_PROCESSORS-1)/7 )) gpus=$(( (LSB_MAX_NUM_PROCESSORS-1)/7 ))
echo "Starting $((gpus/6)) node run of ADv0.5 at " `date` echo "Starting $((gpus/6)) node run of ADv0.6 at " `date`
[ -s $PROJ/shards.rdb ] [ -s $PROJ/shards.rdb ]
REMAKE=$? REMAKE=$?
...@@ -34,6 +34,7 @@ export OMP_NUM_THREADS=7 ...@@ -34,6 +34,7 @@ export OMP_NUM_THREADS=7
jsrun -X 0 \ jsrun -X 0 \
-n $gpus -r6 -a1 -g1 -c7 -d cyclic -b packed:7 \ -n $gpus -r6 -a1 -g1 -c7 -d cyclic -b packed:7 \
python loadem.py `hostname` python loadem.py `hostname`
# jsrun -> loadem.py -> run_ad.sh
# Print a nice little summary: # Print a nice little summary:
memb=$(query scard shards) memb=$(query scard shards)
......
Markdown is supported
0% Try again or attach a new file.
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment