Commit 416b9e6c authored by David M. Rogers

Updates during testing.

parent 30f24e1a
#!/bin/bash
#SBATCH -p rescore
#SBATCH --nodes 1
#SBATCH -n64
#SBATCH -J breakup
#SBATCH -o %x.%A_%a.out
#SBATCH --array=1
source /apps/dock_env/bin/activate
export OMP_NUM_THREADS=1
DIR=/apps/launchad
cd /dev/shm
srun -n64 -N1 --exclusive \
$DIR/loadem.py ccddc-controller $SLURM_JOB_ID
@@ -3,26 +3,41 @@
import pandas as pd
def ihash(x):
return "%x"%( (48271*x)%2147483647 )
def main(argv):
assert len(argv) == 2, "Usage: %s <inp.pq> <out-fmt.pq>"
n = None
if len(argv) >= 3 and argv[1] == '-n':
n = int(argv[2])
del argv[1:3]
assert len(argv) == 4, "Usage: %s [-n nout] <start seq> <inp.pq> <out-fmt.pq>"
start = int(argv[1])
df = pd.read_parquet(argv[2])
out = argv[3]
if n is None:
chunk = 1024
n = len(df) // chunk # number of output files
out = argv[2]
df = pd.read_parquet(argv[1])
chunk = 1024
off = len(df)%chunk
chunk = len(df) // n
if chunk == 0:
raise IndexError("len(df) < n # need fewer output files.")
low = len(df) % n # outputs 0, 1, ..., low-1 get +1 output
# chunk*n + low == len(df)
n = len(df) // chunk # number of output files
chunk += off // n # extra number of ligands per output
low = off % n # outputs 0, 1, ..., low-1 get +1 ligand
print("Creating %d outputs of sz = %d, %d of size %d"%(
n-low, chunk+1, low, chunk))
k = 0
for i in range(n):
onema = out%i
print(oname)
oname = out % ihash(start+i)
m = chunk + (i < low)
df.iloc[k:k+m].to_parquet(oname,
compression='snappy',
engine='pyarrow'
)
z = df.iloc[k:k+m][['name', 'conf']].set_index('name')
z.to_parquet(oname,
compression='snappy',
engine='pyarrow'
)
k += m
if __name__=="__main__":
......
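For reference, a minimal standalone sketch (not part of the commit) of the even-split arithmetic the new breakup.py uses; the ligand and output counts below are made up, and the name pattern matches the one rules.yaml passes:

def ihash(x):
    # Same multiplicative hash as breakup.py: permutes the index mod 2**31 - 1
    # and renders it in hex, so output names do not follow the input order.
    return "%x" % ((48271 * x) % 2147483647)

def split_sizes(total, n):
    chunk = total // n      # base number of ligands per output file
    low = total % n         # the first `low` outputs each get one extra ligand
    return [chunk + (i < low) for i in range(n)]

sizes = split_sizes(10000, 512)        # hypothetical: 10000 ligands into 512 outputs
assert sum(sizes) == 10000             # chunk*n + low == len(df)
names = ["ligs/%s.pq" % ihash(1 + i) for i in range(3)]   # first few names for start=1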
#!/bin/bash
#SBATCH -p dock
#SBATCH --nodes 1
#SBATCH --cpus-per-task 2
#SBATCH --gres gpu:1
#SBATCH -J dock
#SBATCH -o %x.%A_%a.out
#SBATCH --array=1-2
source /apps/dock_env/bin/activate
export OMP_NUM_THREADS=2
DIR=/apps/launchad
cd /dev/shm
srun -n1 -N1 --gres=gpu:1 --cpus-per-task=2 --exclusive \
$DIR/loadem.py ccddc-controller $SLURM_JOB_ID
@@ -6,7 +6,7 @@ import redis, time, random
from datetime import datetime
base = Path(__file__).resolve().parent
rules = yaml.safe_load(base / 'rules.yaml')
rules = yaml.safe_load(open(base / 'rules.yaml'))
bucket = 'gs://ccddc'
test = False
@@ -59,15 +59,17 @@ def copyin(name, bucket = bucket, static=False):
static = base[-4:] == '.tgz' # hack for tgz
if static and Path(base).is_file():
return
ret = subprocess.call( ["gsutil", "cp", bucket + '/' name, base] )
ret = subprocess.call( ["gsutil", "cp", bucket + '/' + name, base] )
if ret: return ret
if base[-4:] == '.tgz':
ret = subprocess.call( ["tar", "xzf", base] )
return ret
def moveout(name, bucket = bucket):
if name[-1] == '/': # move whole directory
return subprocess.call( ["gsutil", "mv", name+"*", bucket + '/' + name] )
loc = PurePosixPath(name).name
return subprocess.call( ["gsutil", "mv", loc, bucket + '/' name] )
return subprocess.call( ["gsutil", "mv", loc, bucket + '/' + name] )
def run_job(job, item):
for p,x in zip(job['params'], item.split()):
@@ -85,9 +87,11 @@ def run_job(job, item):
if ret: return ret
return ret
# also adds self to hosts key
# requeue all members of `assigned`
def requeue(assigned, host, db):
def run(r):
r.sadd('hosts', assigned)
for i in range(10):
item = r.spop(assigned)
if item is None:
@@ -101,21 +105,35 @@ def requeue(assigned, host, db):
# loadem.py localhost $SLURM_JOB_ID $SLURM_ARRAY_TASK_ID
def main(argv):
global conn_retries
assert len(argv) == 4, "Usage: %s <redis host> <job type> <worker id>"
assert len(argv) == 3, "Usage: %s <redis host> <job type>"
host = argv[1]
jobname = argv[2]
rank = int(argv[3])
# Determine a unique rank name
try:
rank = os.environ['SLURM_ARRAY_TASK_ID']
except KeyError:
rank = "x"
try:
rank = rank + "-" + os.environ['OMPI_COMM_WORLD_RANK']
except KeyError:
pass
print("%s Rank %s starting."%(stamp(),rank))
if Path(rank).exists():
subprocess.call( ["rm", "-fr", rank] )
os.mkdir(rank)
os.chdir(rank)
job = rules[jobname]
db = int(job['db']) # assoc. redis db
username = os.environ['USER']
#username = os.environ['USER']
# assignment key for this rank
assigned = 'a_%d'%rank
assigned = 'a_%s'%rank
time.sleep(rank*0.001) # 1k connections per second at startup
#time.sleep(rank*0.001) # 1k connections per second at startup
requeue(assigned, host, db)
n = 0
@@ -132,13 +150,13 @@ def main(argv):
if ret:
def add_err(r):
r.sadd('errors', item)
#r.sadd('item_err/%s'%item, "%d"%rank)
#r.sadd('item_err/%s'%item, rank)
r.srem(assigned, item)
run_redis(add_err, host, db)
consecutive_errors += 1
errors += 1
if consecutive_errors >= 5:
print("%s Rank %d quitting due to %d consecutive errors."%(
print("%s Rank %s quitting due to %d consecutive errors."%(
stamp(),rank,consecutive_errors))
break
if consecutive_errors >= 2:
@@ -151,7 +169,7 @@ def main(argv):
if testone:
break
print("%s Rank %d completed (%d items processed, "
print("%s Rank %s completed (%d items processed, "
"%d errors, %d conn retries)."%(stamp(),rank,n,errors,conn_retries))
if __name__=="__main__":
......
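The worker-id argument is gone; the rank string now comes from the environment. A small sketch of that naming scheme, assuming the same variables the new loadem.py reads:

import os

def unique_rank():
    # Array task id, optionally suffixed with the MPI rank; "x" when neither is set.
    rank = os.environ.get('SLURM_ARRAY_TASK_ID', 'x')
    mpi = os.environ.get('OMPI_COMM_WORLD_RANK')
    if mpi is not None:
        rank = rank + '-' + mpi
    return rank

# e.g. SLURM_ARRAY_TASK_ID=3 and OMPI_COMM_WORLD_RANK=0 give "3-0", so the
# per-rank assignment key becomes 'a_3-0' and the scratch dir /dev/shm/3-0.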
@@ -3,31 +3,42 @@ dock:
queue: dock
db: 0
params: [r, n]
out: {r}_docked/{n}.pq
out: [ "{r}_docked/{n}.pq" ]
inp:
- targets/{r}.tgz # note: untarring is automatic
- shards/{n}.pq
- ligs/{n}.pq
script:
export OMP_NUM_THREADS=2
version="dwork v1.0"
log() { echo $(date +"%F %H:%M:%S.%N") "($version) {r}_docked/{n} $*" }
log() {{ echo $(date +"%F %H:%M:%S.%N") "($version) {r}_docked/{n} $*" }}
log started
ls *{r}*.fld >filelist
/apps/docking/create_inp.py {n}.pq >>filelist
/apps/launchad/create_inp.py {n}.pq >>filelist
rm {n}.pq
log completed file list
autodock_gpu_64wi -filelist filelist -nrun 20 -autostop 1 -nev 3000000 >/dev/null
/apps/docking/package_out.py filelist {n}.pq
/apps/launchad/package_out.py filelist {n}.pq
# Re-score ligand/receptor conf.
rescore:
queue: rescore
db: 1
params: [r, n]
out: {r}_scored/{n}.pq
out: [ "{r}_scored/{n}.pq" ]
inp:
- targets/{r}.tgz # note: untarring is automatic
- {r}_docked/{n}.pq
- "{r}_docked/{n}.pq"
script:
/apps/scoring/rescore.py *{r}*.pdbqt {n}.pq
/apps/launchad/rescore.py *{r}*.pdbqt {n}.pq
breakup:
queue: rescore
db: 2
params: [n]
out: [ ligs/ ]
inp:
- 6WQF_docked/docked.{n}.parquet
script:
mkdir ligs
/apps/launchad/breakup.py -n 512 $((1+{n}*512)) docked.{n}.parquet ligs/%s.pq
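The start sequence $((1+{n}*512)) spaces array tasks 512 indices apart, matching -n 512, so no two tasks hash the same index into an output name. A quick check of that arithmetic (task numbers are hypothetical):

def start_seq(n, per_task=512):
    # Task n hashes indices [1 + n*512, 1 + (n+1)*512)
    return 1 + n * per_task

assert start_seq(3) == start_seq(2) + 512   # consecutive tasks cover disjoint ranges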
#!/usr/bin/env python3
# WARNING: this actually sets 10 shards per .tar.gz "shard" file
# labeled with index pair: "n m", n = shard_id, m \in {0, 1, ..., 9}
import re
def get_rdb(host):
def get_rdb(host, db=0):
import redis
return redis.Redis(host=host, port=6379, password="Z1908840168_2_T1", db=0)
expr = re.compile(r"_p([0-9]*).tar.gz")
return redis.Redis(host=host, port=6379, password="Z1908840168_2_T1", db=db)
def main(argv):
el = set()
@@ -19,21 +15,20 @@ def main(argv):
del argv[1:3]
print("%d exclusions"%len(el))
assert len(argv) == 3, "Usage: %s [-e excl_list_file] <server name> <shard list file>"
r = get_rdb(argv[1])
assert len(argv) == 4, "Usage: %s [-e excl_list_file] <server name> <db> <file>"
r = get_rdb(argv[1], int(argv[2]))
jfile = argv[3]
n = 0
k = 0
lines = 0
for line in open(argv[2]):
lines += 1
m = expr.search(line)
if m is None:
continue
shards = set(["%s %d"%(m[1], i) for i in range(10)])
shards -= el
k += r.sadd('shards', *list(shards))
print("%d/%d shards added"%(k,lines*10))
for line in open(jfile):
x = line.strip()
if len(x) < 1: continue
n += 1
if x in el: continue
k += r.sadd('ready', x)
print("%d/%d items added"%(k,n))
if __name__=="__main__":
import sys
......
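get_rdb now takes the redis db index, and the loader reads one item per line into the 'ready' set (each item being the whitespace-separated param values of a rule, per run_job). A hypothetical manual use, with the host taken from the sbatch scripts above and the db and item values made up:

r = get_rdb("ccddc-controller", db=2)   # db 2 is the breakup queue in rules.yaml
r.sadd("ready", "17")                   # one item: just the {n} value for a breakup job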
#BSUB -q dock
#BSUB -n 1
#BSUB -J dock
#BSUB -gres gpu:1
#SBATCH --array=1-2
source /apps/dock_env/bin/activate
export OMP_NUM_THREADS=2
DIR=$PWD
cd /dev/shm
srun --gres=gpu:1 -n1 --exclusive \
$PWD/loadem.py localhost $SLURM_JOB_ID $SLURM_ARRAY_TASK_ID