Commit e4704b17 authored by David M. Rogers

Part-way to GCP implementation.

parent 91f9dc30
# Pre-requisites
* Python Packages
  * pandas
  * numpy
  * oddt
1. Fix run_docking.lsf to trap errors and shut down the DB on the time-up signal.
x. Create a rules.yaml entry for each job-type
   - store input file names
   - store the script generated per file
x. Generalize loadem.py to run prologue / epilogue
   scripts copying data to/from gs://ccddc
   - this uses the job metadata created in 2
2.5. Improve loadem.py to store 'assigned'
   items inside assigned/M (where M is the job-array ID),
   and check 'assigned/M' on startup!
3. Create separate DBs for each function name.
   - This lets the workflow crunch different jobs
     with different requirements.
   - Associate the DB access info with a job-type.
4. Create status query scripts parameterized over
   jobtype (i.e. database name) -- see the sketch after this list.
5. Build & run test jobs for each jobtype.
6. Curate the complete dataset for each job type.
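
A minimal sketch of the status query from item 4, assuming the Redis set names used by loadem.py ('ready', 'assigned', 'errors', and the per-rank 'a_<rank>' assignment sets) and the per-jobtype `db` field from rules.yaml. The script name, argument handling, and password placeholder are illustrative only, not part of this commit.

```python
#!/usr/bin/env python3
# status.py -- sketch of a per-jobtype queue status query (not part of this commit).
import sys
import yaml, redis
from pathlib import Path

rules = yaml.safe_load((Path(__file__).resolve().parent / 'rules.yaml').read_text())

def main(argv):
    assert len(argv) == 3, "Usage: %s <redis host> <jobtype>" % argv[0]
    host, jobtype = argv[1], argv[2]
    db = int(rules[jobtype]['db'])   # the jobtype selects its redis db
    r = redis.StrictRedis(host=host, port=6379, password="...", db=db)
    print("jobtype = %s (db %d)" % (jobtype, db))
    for key in ('ready', 'assigned', 'errors'):
        print("%-8s %6d" % (key, r.scard(key)))
    # items checked out into per-rank sets a_0, a_1, ... by loadem.py
    checked_out = sum(r.scard(k) for k in r.keys('a_*'))
    print("%-8s %6d" % ('a_*', checked_out))

if __name__ == "__main__":
    main(sys.argv)
```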
#!/usr/bin/env python3
# Write one .pdbqt file per ligand from a parquet file and print
# (name, filename) pairs for the docking file list.
import pandas as pd

def main(argv):
    assert len(argv) == 2, "Usage: %s <ligs.pq>" % argv[0]
    df = pd.read_parquet(argv[1])
    for lig in df.itertuples():
        fname = lig.name + '.pdbqt'
        with open(fname, "w") as f:
            f.write(lig.pdbqt)
        print( "%s\n%s"%(lig.name, fname) )

if __name__=="__main__":
    import sys
    main(sys.argv)
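
For reference, a hedged sketch of the parquet layout this script expects: a 'name' column and a 'pdbqt' column holding the ligand text. The ligand name and coordinates below are made up for illustration.

```python
# Build a tiny ligs.pq for exercising create_inp.py (fake content).
import pandas as pd

df = pd.DataFrame({'name':  ['lig00001'],
                   'pdbqt': ['ROOT\nATOM      1  C   LIG     1       0.000   0.000   0.000\nENDROOT\n']})
df.to_parquet('ligs.pq')
# then: ./create_inp.py ligs.pq >>filelist
```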
import re
import tarfile as tf
from pathlib import Path
import numpy as np

newaxis = np.newaxis
map = lambda f, x: [f(y) for y in x]

# Iterate over the regular members of a tar file, yielding
# (archive stem, member stem, open file object) for each member whose
# name ends in '.<ext>' (or for every regular member if ext is None).
def tar_iter(name, ext=None):
    inp = Path(name)
    fname = inp.name.split('.', 1)[0]
    with tf.open(inp, 'r|*') as tar:
        for tarinfo in tar:
            if not tarinfo.isreg():
                continue
            if ext is not None:
                if tarinfo.name[-len(ext)-1:] != '.'+ext:
                    continue
                name = tarinfo.name[:-len(ext)-1]  # strip '.<ext>'
            else:
                name = tarinfo.name
            name = name.rsplit('/', 1)[-1]  # drop any leading directories
            with tar.extractfile(tarinfo) as f:
                yield fname, name, f
# Parse the first cluster from each included xml file.
# Example cluster syntax is:
# <cluster cluster_rank="1" lowest_binding_energy="-4.09" run="9" mean_binding_energy="-4.09" num_in_clus="1" />
cl = re.compile(rb'\s*<cluster cluster_rank="1" lowest_binding_energy="([^"]*)" run="([^"]*)" mean_binding_energy="([^"]*)" num_in_clus="([^"]*)" />')

# Return (lowest energy, run, mean energy, cluster size) for the
# rank-1 cluster, or None if no such line is present.
def xml_to_energy(f):
    for line in f:
        m = cl.match(line)
        if m:
            return float(m[1]), int(m[2]), float(m[3]), int(m[4])
    return None

# Collect all lines (decoded to str) containing any of the given keys.
def grep_all(f, *keys):
    out = dict((k,[]) for k in keys)
    for line in f:
        line = line.decode('utf8')
        for k in keys:
            if k in line:
                out[k].append(line)
    return out
# Parse an AutoDock .dlg file (opened in binary mode) into
# (confs, tors): a sorted list of (energy, pdbqt lines) and the
# number of rotatable bonds.
def parse_dlg(f):
    dlg = grep_all(f, "DOCKED:", "Number of rotatable bonds:")
    confs = dlg_to_confs(dlg['DOCKED:'])
    try:
        tors = int( dlg['Number of rotatable bonds:'][0][26:] )
    except (IndexError, ValueError):
        tors = np.nan
    return confs, tors # [(en, pdbqt)], ntor

# Input lines all contain "DOCKED:".  Group them into models and
# return a list of (energy, lines) sorted by increasing energy.
def dlg_to_confs(lines):
    confs = []
    conf = []
    en = None
    for s in lines:
        conf.append( s[8:].strip() )
        # DOCKED: USER Estimated Free Energy of Binding = -7.85 kcal/mol
        if "Estimated Free Energy of Binding" in s:
            tok = s.replace('=','').split()
            en = float(tok[7])
        elif s[8:14] == "ENDMDL":
            assert en is not None
            confs.append( (en, conf) )
            conf = []
            en = None
    confs.sort()
    return confs
# Basically, autodock will spit out -nrun poses, which I chose to output
# to a .pdb file so I could load them in typical molecular visualization
# tools.  That also means you need to get the energies for all the models
# and sort them.  This is what I was doing on a much smaller set:
# pull the energy value from each line that contains "Estimated".
def dlg_to_energy(lines):
    en = []
    for line in lines[:-1]:
        en.append( line.replace('=','').split()[7] )
    return en

# You can also get ligand data from the dlg if you like.
def dlg_liganddata(atline, rotline):
    atomcount = int(atline.split()[-1])
    torsioncount = int(rotline.split()[-1])
    return (atomcount, torsioncount)
# conf is a list of lines like:
# ATOM      2  C   LIG     1       7.314  -0.197  19.453
# Return the number of atoms, or 0 if the coordinates are unparsable
# or any two atoms sit closer than 0.05 A (a collapsed pose).
def validate_conf(conf):
    try:
        xyz = [ map(float, [line[30:38], line[38:46], line[46:54]])
                for line in conf if line[:6] == "ATOM  " or line[:6] == "HETATM" ]
        x = np.array(xyz)
        linf = np.abs( x[:,newaxis] - x ).max(2)  # pairwise L-inf distances
        linf += np.identity(len(linf))            # mask self-distances
        if linf.min() < 0.05:
            return 0
    except ValueError: # couldn't parse coords.
        return 0
    return len(xyz)

# confs is a list of 3 [energy_list, pdbqt_list] pairs; append this
# ligand's i-th best energy/conformation from dlg file `f` (or None)
# onto each.
def add_score(f, confs):
    conf, ntor = parse_dlg(f)
    for i in range(len(confs)):
        if len(conf) <= i:
            confs[i][0].append(None)
            confs[i][1].append(None)
        else:
            confs[i][0].append(conf[i][0]) # en[i]
            confs[i][1].append("\n".join(conf[i][1])) # crd[i]
    return True
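
A short usage sketch for the helpers above, assuming a tarball of AutoDock .dlg result files; the archive name here is hypothetical. parse_dlg wants binary lines, which tar_iter's extracted file objects provide.

```python
# Print the best (lowest) docking energy for every .dlg in an archive.
from helpers import tar_iter, parse_dlg, validate_conf

for archive, lig, f in tar_iter('p0001.tar.gz', ext='dlg'):
    confs, ntor = parse_dlg(f)            # sorted [(energy, pdbqt lines)], ntorsions
    if confs and validate_conf(confs[0][1]):
        print(archive, lig, confs[0][0], ntor)
```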
#!/usr/bin/env python3
import os, subprocess, yaml
from pathlib import Path, PurePosixPath
import redis, time, random
from datetime import datetime

base = Path(__file__).resolve().parent
rules = yaml.safe_load((base / 'rules.yaml').read_text())

bucket = 'gs://ccddc'
test = False
testone = False
hopper = False
@@ -11,13 +16,13 @@ hopper = False
conn_retries = 0

def stamp():
    return datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f") + " v1.0"

# Run fn(r) against the redis server on `host`, retrying the
# connection up to 120 times before giving up.
def run_redis(fn, host, db=0):
    global conn_retries
    for i in range(120):
        try:
            r = redis.StrictRedis(host=host, port=6379, password="Z1908840168_2_T1", db=db)
            break
        except redis.exceptions.ConnectionError:
            conn_retries += 1
@@ -31,7 +36,7 @@ def run_redis(host, fn):
    del r
    return u
# Pop one work item for this job type and mark it assigned.
# Returns the item as a str, or None when no work is left.
def get_item(host, db):
    # TODO: make enqueue atomic (see the sketch after this file)
    def enqueue(r):
        if hopper: # use a counter to terminate early
@@ -39,75 +44,115 @@ def get_shard(host):
            if k < 0:
                return None
        item = r.spop('ready')
        if item is not None:
            # TODO (2.5): move this into the per-rank assigned/M set
            r.sadd('assigned', item)
        return item
    item = run_redis(enqueue, host, db)
    if item is None:
        return item
    return item.decode('utf8')
# Fetch an input file from the bucket into the working directory.
# Tarballs (.tgz) are treated as static downloads and unpacked in place.
def copyin(name, bucket = bucket, static=False):
    base = PurePosixPath(name).name
    static = base[-4:] == '.tgz' # hack for tgz
    if static and Path(base).is_file():
        return
    ret = subprocess.call( ["gsutil", "cp", bucket + '/' + name, base] )
    if ret: return ret
    if base[-4:] == '.tgz':
        ret = subprocess.call( ["tar", "xzf", base] )
    return ret

# Move an output file from the working directory up to the bucket.
def moveout(name, bucket = bucket):
    loc = PurePosixPath(name).name
    return subprocess.call( ["gsutil", "mv", loc, bucket + '/' + name] )
# Run one work item: bind its parameters, copy inputs in, run the
# job's script, and move outputs back to the bucket.
# Returns a nonzero code on the first failure.
def run_job(job, item):
    for p,x in zip(job['params'], item.split()):
        job[p] = x
    for inp in job['inp']:
        ret = copyin(inp.format(**job))
        if ret: return ret
    ret = subprocess.call( job['script'].format(**job), shell=True )
    if ret: return ret
    for out in job['out']:
        ret = moveout(out.format(**job))
        if ret: return ret
    return ret
# Requeue all members of `assigned` (left over from a previous,
# interrupted run of this rank) back onto the 'ready' set.
def requeue(assigned, host, db):
    def run(r):
        for i in range(10):
            item = r.spop(assigned)
            if item is None:
                break
            r.sadd('ready', item) # spop already removed it from `assigned`
        else:
            raise IndexError("More than 10 items assigned to %s!"%assigned)
    run_redis(run, host, db)
# usually run as:
#   loadem.py localhost <jobtype> $SLURM_ARRAY_TASK_ID
def main(argv):
    global conn_retries
    assert len(argv) == 4, "Usage: %s <redis host> <job type> <worker id>"%argv[0]
    host = argv[1]
    jobname = argv[2]
    rank = int(argv[3])
    job = rules[jobname]
    db = int(job['db']) # assoc. redis db

    # assignment key for this rank
    assigned = 'a_%d'%rank
    time.sleep(rank*0.001) # 1k connections per second at startup
    requeue(assigned, host, db)
    n = 0
    errors = 0
    consecutive_errors = 0
    while True:
        item = get_item(host, db)
        if item is None: # graceful shutdown
            break
        ret = False
        if not test:
            ret = run_job(job, item)
        if ret:
            def add_err(r):
                r.sadd('errors', item)
                #r.sadd('item_err/%s'%item, "%d"%rank)
                r.srem(assigned, item)
            run_redis(add_err, host, db)
            consecutive_errors += 1
            errors += 1
            if consecutive_errors >= 5:
                print("%s Rank %d quitting due to %d consecutive errors."%(
                            stamp(),rank,consecutive_errors))
                break
            if consecutive_errors >= 2:
                time.sleep(60)
        else:
            run_redis(lambda r: r.srem(assigned, item), host, db)
            consecutive_errors = 0
        n += 1
        if n%10 == 0:
            print("%s Rank %d processed %d items."%(stamp(),rank,n))
        if testone:
            break
    print("%s Rank %d completed (%d items processed, "
          "%d errors, %d conn retries)."%(stamp(),rank,n,errors,conn_retries))
if __name__=="__main__":
    import sys
    main(sys.argv)
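
The TODO in loadem.py's get_item notes that enqueue is not atomic: the spop and sadd are two round trips, so a worker killed between them loses the item. One way that gap could be closed, sketched here and not part of this commit, is a small server-side Lua script registered through redis-py; the function name and the destination-set argument are illustrative.

```python
# Sketch: atomically pop an item from 'ready' and record it as assigned.
# The destination set is passed in so it can be loadem.py's per-rank 'a_<rank>' set.
import redis

POP_AND_ASSIGN = """
local item = redis.call('SPOP', KEYS[1])
if item then redis.call('SADD', KEYS[2], item) end
return item
"""

def get_item_atomic(r, assigned):
    pop_and_assign = r.register_script(POP_AND_ASSIGN)
    item = pop_and_assign(keys=['ready', assigned])
    return item.decode('utf8') if item is not None else None
```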
#!/usr/bin/env python3
# Directly parse ligand dlg files into a pandas dataframe.
from helpers import *
import pandas as pd

# Write a parquet file holding the top-3 docking scores and
# conformations for every ligand name.
def write_df(out, names, conf):
    df = pd.DataFrame(data={'name':   names,
                            'score':  conf[0][0],
                            'score2': conf[1][0],
                            'score3': conf[2][0],
                            'conf':   conf[0][1],
                            'conf2':  conf[1][1],
                            'conf3':  conf[2][1]
                           })
    df = df.set_index('name')
    df.to_parquet(out)

def main(argv):
    assert len(argv) == 3, "Usage: %s <filelist> <ligs.pq>" % argv[0]
    names = []
    confs = [[[], []],
             [[], []],
             [[], []]]
    # every other line of the file list contains a ligand name,
    # beginning on the second line
    n = 0
    with open(argv[1]) as lst:
        for line in lst:
            n += 1
            if n % 2 == 1: # skip lines 1, 3, 5, ...
                continue
            name = line.strip()
            names.append(name)
            fname = Path(name + '.dlg')
            if not fname.is_file():
                for c in confs:   # scores 1, 2, 3
                    for j in c:   # en, crd
                        j.append(None)
                continue
            with open(fname, 'rb') as f:  # parse_dlg expects bytes
                add_score(f, confs)
    write_df(argv[2], names, confs)

if __name__=="__main__":
    import sys
    main(sys.argv)
# Dock ligand set n to receptor r
dock:
  queue: dock
  db: 0
  params: [r, n]
  out:
    - "{r}_docked/{n}.pq"
  inp:
    - targets/{r}.tgz   # note: untarring is automatic
    - shards/{n}.pq
  # Note: the script is expanded with str.format, so literal shell
  # braces are doubled.
  script: |
    export OMP_NUM_THREADS=2
    version="dwork v1.0"
    log() {{ echo $(date +"%F %H:%M:%S.%N") "($version) {r}_docked/{n} $*"; }}
    log started
    ls *{r}*.fld >filelist
    /apps/docking/create_inp.py {n}.pq >>filelist
    rm {n}.pq
    log completed file list
    autodock_gpu_64wi -filelist filelist -nrun 20 -autostop 1 -nev 3000000 >/dev/null
    /apps/docking/package_out.py filelist {n}.pq

# Re-score ligand/receptor conf.
rescore:
  queue: rescore
  db: 1
  params: [r, n]
  out:
    - "{r}_scored/{n}.pq"
  inp:
    - targets/{r}.tgz   # note: untarring is automatic
    - "{r}_docked/{n}.pq"
  script: |
    /apps/scoring/rescore.py {r}.pdbqt {n}.pq
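
To make the rules format concrete, here is roughly how loadem.py's run_job expands the dock entry for one work item; the receptor and shard names are made-up examples, and `rules` is assumed to be the parsed rules.yaml.

```python
# Work items are "<r> <n>" strings matching params: [r, n].
job = dict(rules['dock'])          # rules loaded with yaml.safe_load
item = "6WQF 1234"                 # hypothetical receptor / ligand-shard pair
job.update(zip(job['params'], item.split()))

print(job['inp'][0].format(**job))   # targets/6WQF.tgz    (copied in, untarred)
print(job['out'][0].format(**job))   # 6WQF_docked/1234.pq (moved out to gs://ccddc)
print(job['script'].format(**job))   # shell script with {r}/{n} substituted
```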
#BSUB -q dock
#BSUB -n 1
#BSUB -J dock
#BSUB -gres gpu:1
#SBATCH --array=1-2
source /apps/dock_env/bin/activate
export OMP_NUM_THREADS=2
DIR=$PWD
cd /dev/shm
srun --gres=gpu:1 -n1 --exclusive \
    $DIR/loadem.py localhost dock $SLURM_ARRAY_TASK_ID
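
Workers only consume the 'ready' set, so something has to seed it per jobtype before the array job starts. A minimal sketch, with invented item names and a password placeholder standing in for the real credentials:

```python
# Seed the dock queue (db 0 in rules.yaml) with "<receptor> <shard>" items.
import redis

r = redis.StrictRedis(host='localhost', port=6379, password="...", db=0)
for n in range(10):
    r.sadd('ready', "6WQF %d" % n)
print("ready:", r.scard('ready'))
```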