Commit 12da438d authored by David M. Rogers

Merged changes.

parents e4219e45 916ded6f
TODO: periodic status updates on DB health
* v. 2.0
* Created a rules.yaml entry for each job-type
- stores inp file names
- stores script generated per file
* generalized loadem.py to run prologue / epilogue
scripts copying data to/from `config.py`-specified location
- this uses the job metadata created in 2
* Improved loadem.py to store 'assigned'
items inside assigned/M (where M is config's unique_name())
and to check 'assigned/M' on startup
* Create separate DBs for each function name
- This lets the workflow crunch different jobs
with different requirements.
- associate the DB access info. with a job-type
* Packaged launch scripts to reference the launch config files
- these files are: env.sh and conf.py (see README.md)
- allow flexible methods for starting per-job services
- consolidate all system paths into one config
(used 2 config files)
* Documented configure and run steps
- esp. python/anaconda environment packaging
- db scanning and analysis steps
- user-level database interaction activities
- backup/checkpoint
- status query
- load/reset/dump
- slurm cluster care and feeding
* v. 1.2
* Add check for repeat failures of docking.
@@ -10,10 +36,17 @@ TODO: periodic status updates on DB health
* v. 1.0
* Remove hopper limits to run complete docking.
* v. 1.0
* Setup to run with multiple job types (rules.yaml)
* loadem.py is passed a worker index (e.g. SLURM_ARRAY_TASK_ID)
and resumes assigned tasks on startup
* DB keys changed to 'ready', 'a_0', 'a_1', ..., 'errors'
* removed 'done' set.
* v. 0.6
* Added logfile into tgz outputs
* Added 'hopper' global counter to terminate after running n ds
* Added some comments to key steps
* v. 0.5
* fix stupid bug
@@ -37,4 +70,6 @@ TODO: periodic status updates on DB health
* ran redis fill/empty test for listing ds
* v 0.1
* Test the run_ad.sh script on 1 docking shard.
This is a modified version of a fireworks script from
Swen Boehm (ORNL).
# Large-Scale Docking Workflow
This package contains the core of the docking
workflow manager, enabling efficient docking
runs at large scale.
It requires several subsystems working in tandem:
* Pre-requisites (not included):
* A fast docking code
- we recommend https://github.com/jvermaas/AutoDock-GPU/tree/relicensing
- or https://github.com/scottlegrand/AutoDock-GPU
* A Slurm Cluster
* A Redis server network-accessible from compute nodes
* Either a shared filesystem or a gcloud bucket
- You implement your file source/destination inside `config.py`.
* Internal machinery:
* A `rules.yaml` file listing how to run each docking step.
* A `launchad.py` file to be run on each accelerator device
to pull job-IDs from the database and run the appropriate
script from `rules.yaml`
## Interacting With the Work Queue Database
To run the process, you customize one of the Slurm/LSF
batch job templates, then submit it.
The LSF template (`run_docking.lsf`) starts
up the database during the job. The Slurm templates
(like `docker.sh`) rely on a pre-existing database.
Either way, the jobs call `loadem.py` in parallel.
That script talks to the redis DB number listed in `rules.yaml`
and operates on 3 Redis sets:
* ready: task strings ready to run
- The task strings are arbitrary, but are split by
whitespace into tokens to fill out "params" in `rules.yaml`.
* errors: task strings resulting in nonzero return value
* hosts: list of hosts currently online
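For orientation, here is a minimal sketch of one worker iteration against these sets
(plus the per-worker 'assigned/M' set described in the changelog above). The host,
DB number, and script invocation are placeholders; the authoritative logic lives in
`loadem.py` and the per-job scripts come from `rules.yaml`:

```python
# Hypothetical sketch of one worker iteration -- not the actual loadem.py.
import subprocess
import redis

r = redis.Redis(host="ccddc-controller", db=0)       # placeholder host/DB number
assigned = "assigned/" + "x-0"                       # config.unique_name() in the real code

task = r.spop("ready")                               # claim one task string
if task is not None:
    task = task.decode()
    r.sadd(assigned, task)
    params = task.split()                            # whitespace tokens fill "params" in rules.yaml
    ret = subprocess.call(["./run_job.sh"] + params) # stand-in for the rules.yaml-generated script
    if ret != 0:
        r.sadd("errors", task)                       # nonzero return value -> errors set
    r.srem(assigned, task)
```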
As a user, you need to load up the database
with task strings and check the remaining tasks
and the error set after each run.
Of course, you can use redis' standard mechanisms
for backing up the database.
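As an illustration, loading and checking tasks can be done directly with the `redis`
Python client. The host, port, DB number, and task strings below are placeholders;
take the real values from your `rules.yaml`:

```python
# Hypothetical example of user-level queue interaction.
import redis

r = redis.Redis(host="ccddc-controller", port=6379, db=0)     # placeholders

# Load task strings into the 'ready' set before a run:
tasks = ["lig_0001.pq receptor_A", "lig_0002.pq receptor_A"]  # example task strings
r.sadd("ready", *tasks)

# After a run, check what is left and what failed:
print("remaining:", r.scard("ready"))
print("errors:", [t.decode() for t in r.smembers("errors")])
print("hosts online:", r.scard("hosts"))
```

In practice, `setdb.py` and `requeue.py` (listed under "Files Included" below) wrap
this kind of interaction.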
## Rescoring Setup
The rescoring scripts pull multiple docked ligand parquet
files and combine all the results into a single parquet.
This makes a good naming scheme important. You can check
the `rescore.py` file to see what we're using now, but we plan
to make this simpler in the future.
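As a rough illustration, combining the docked-ligand parquet files is a small pandas
operation; the glob pattern below is only a stand-in for whatever naming scheme
`rescore.py` currently produces:

```python
# Hypothetical sketch: merge many docked-ligand parquet files into one.
import glob
import pandas as pd

parts = sorted(glob.glob("rescored_*.pq"))   # placeholder naming scheme
df = pd.concat(pd.read_parquet(p) for p in parts)
df.to_parquet("rescored_all.pq", compression="snappy", engine="pyarrow")
```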
### Python/Anaconda Environment Setup
* Python Packages
* redis
* pandas
* numpy
* oddt
ODDT is only required by the `rescore` scripts. I ended up
installing this with Anaconda and using a separate environment
for it. That environment still needs all the other packages
above.
# Files Included
Example Job Scripts:
- Note the jobname is passed to `env.sh` and `launchad.py`
to determine the environment to load and the job type to run.
* `breaker.sh` -- breakup a set of large ligand pq-s into smaller ones
* `docker.sh` -- version of docking used for large runs (<2000 array)
* `small_docker.sh` -- version of docking used for small runs (<1000 array)
* `rescore.sh`
* `run_docking.lsf` -- old summit script to run large docking, needs updating
* `test_dock.lsf` -- old summit script to dock with a few nodes, needs updating
* `test_redis.lsf` -- old summit script to load up, then drain the DB at max speed
Helper scripts:
* `helpers.py`
* `q2.py` -- python3 multiprocessing work stream
Analysis Scripts:
* `parse_log.py` -- parse timestamps to summarize run timings
* `pack.py` -- pack pdbqt files into a pq
* `rescore3.py` -- rescore interactively
Configuration Scripts:
* `config.py` -- functions to move data
(encapsulating specifics of global file store)
* `env.sh` -- called at start of jobs to setup environment
variables and cd to `/dev/shm`
* `shards.conf` -- you can use this as a template to run your
Redis server.
Main Working Scripts:
* `loadem.py` -- main launching script
* `breakup.py` -- breakup larger parquets into smaller ones
* `create_inp.py` -- print a list of ligands and create pdbqt-s from a pq
* `package_out.py` -- package up docking outputs
* `rescore.py` -- pq -> pq parallel rescoring
DB Interaction Helpers:
* `requeue.py` -- move all assigned jobs and errors back into the ready state (see the sketch below)
* `setdb.py` -- load new jobs into the task db
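Below is a rough sketch of what `requeue.py` does, assuming the key layout described
above ('ready', 'errors', and per-worker 'assigned/M' sets); check the script itself
for the authoritative behavior:

```python
# Hypothetical sketch of requeue logic -- not the actual requeue.py.
import redis

r = redis.Redis(host="ccddc-controller", db=0)   # placeholder host/DB number

for key in list(r.scan_iter(match="assigned/*")) + ["errors"]:
    for task in r.smembers(key):
        r.smove(key, "ready", task)              # push the task string back into 'ready'
```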
Python imports for all codes:
* numpy -- I installed these using pip in a python3 venv
* pandas
* redis -- this is the python client API (https://pypi.org/project/redis/)
1. fix run_docking.lsf to trap errors and shutdown the DB on time-up signal
#. Fix shutdown process in rescore.py so that errors don't cause it to hang!
#. change file naming scheme to something more user-defined
#. curate complete dataset for each job type [1h]
- fix input tarballs on Summit
#. build & run test jobs for each jobtype [3h]
- small database (e.g. enamine diversity)
- protein point & click
#. Change rescoring calculation to use GPU-accelerated contact
code @ https://github.com/scottlegrand/contacts
#. Create python worker that attaches to job-db API [2h]
using ZeroMQ
- make this transport agnostic and test on Summit
#. add FAQ on common error conditions
- protein conversion error
- docking step failure (reset & rerun scripts)
#. consolidate docking codebase
#. create separate directory per work item,
so stage-in/out can be completed asynchronously
#. create status query scripts parameterized over
jobtype (i.e. database name)
- tabled, since query script accepts db ID
#!/bin/bash
#SBATCH -p rescore
#SBATCH --nodes 1
#SBATCH -n64
#SBATCH -J breakup
#SBATCH -o %x.%A_%a.out
#SBATCH --array=1-5
echo "Starting $SLURM_JOB_NAME-$SLURM_ARRAY_TASK_ID at" `date`
source /apps/launchad/env.sh $SLURM_JOB_NAME
export OMP_NUM_THREADS=1
srun -n64 -N1 $LAD $redis_host $SLURM_JOB_NAME
echo "Completed $SLURM_JOB_NAME-$SLURM_ARRAY_TASK_ID at" `date`
#!/usr/bin/env python3
#
# Copyright 2020 Oak Ridge National Laboratory
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
#
# Break up large parquet into smaller ones.
import pandas as pd

# Map an integer onto a scattered hex string (minimal-standard LCG step)
# so output file names are well spread out.
def ihash(x):
    return "%x" % ((48271*x) % 2147483647)

def main(argv):
    n = None
    if len(argv) >= 3 and argv[1] == '-n':
        n = int(argv[2])
        del argv[1:3]
    assert len(argv) == 4, "Usage: %s [-n nout] <start seq> <inp.pq> <out-fmt.pq>" % argv[0]
    start = int(argv[1])
    df = pd.read_parquet(argv[2])
    out = argv[3]
    if n is None:
        chunk = 1024
        n = max(1, len(df) // chunk)  # number of output files (at least one)
    chunk = len(df) // n
    if chunk == 0:
        raise IndexError("len(df) < n # need fewer output files.")
    low = len(df) % n  # outputs 0, 1, ..., low-1 get +1 output
    # chunk*n + low == len(df)
    print("Creating %d outputs of sz = %d, %d of size %d" % (
          low, chunk+1, n-low, chunk))
    k = 0
    for i in range(n):
        oname = out % ihash(start+i)
        m = chunk + (i < low)
        z = df.iloc[k:k+m][['name', 'conf']].set_index('name')
        z.to_parquet(oname,
                     compression='snappy',
                     engine='pyarrow'
                     )
        k += m

if __name__ == "__main__":
    import sys
    main(sys.argv)
#
# Copyright 2020 Oak Ridge National Laboratory
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# Implement the copyin and moveout functions here.
# There's also a unique_name function which lets the
# database know what effective virtual host name you have.
from pathlib import Path, PurePosixPath
import os, subprocess, time

bucket = 'gs://ccddc'

# Use environment variables to determine a unique rank name:
# may potentially need username = os.environ['USER']
def unique_name():
    # Can also stagger jobs here to limit to
    # 5k connections per second at startup:
    try:
        rank = os.environ['SLURM_ARRAY_TASK_ID']
        time.sleep(int(rank)*0.0002)
    except KeyError:
        rank = "x"
    try:
        rank = rank + "-" + os.environ['SLURM_PROCID']
    except KeyError:
        pass
    return rank

def gsutil(cmd):
    sdir = Path("gsutil")
    sdir.mkdir(exist_ok=True)
    args = ["gsutil", "-o", "GSUtil:parallel_process_count=1",
            "-o", "GSUtil:parallel_thread_count=1",
            "-o", "GSUtil:state_dir=%s" % str(sdir)
           ] + cmd
    return subprocess.call(args)

def copyin(name, bucket=bucket, static=False):
    base = PurePosixPath(name).name
    static = (base[-4:] == '.tgz')  # hack for tgz
    if static and Path(base).is_file():
        print("Skipping cached input %s" % base)
        return
    ret = gsutil(["cp", bucket + '/' + name, base])
    if ret: return ret
    if base[-4:] == '.tgz':
        ret = subprocess.call("tar xzf {0} && echo >{0}".format(base), shell=True)
        if ret: return ret
    return ret

def moveout(name, bucket=bucket):
    if name[-1] == '/':  # move whole directory
        return gsutil(["-m", "cp", name+"*", bucket + '/' + name])
    loc = PurePosixPath(name).name
    return gsutil(["mv", loc, bucket + '/' + name])
#!/usr/bin/env python3
#
# Copyright 2020 Oak Ridge National Laboratory
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
# Note: we need to prune the metadata on the ligands
# down to the stuff from ROOT .. TER
import re
import pandas as pd

expr = re.compile('ROOT\n.*?\nTER\n', re.DOTALL)

# essentially we have to strip all 'MODEL', 'USER' and 'ENDMDL' lines
def fix(s):
    #return s
    s = s.replace('\r', '')
    m = expr.search(s)
    if m is not None:
        return m[0]
    return s[s.index("ROOT"):]

# Pandas has too many types
def get_name(df, i, idx):
    if idx:
        return df.index[i]
    else:
        return df.iloc[i]['name']

# This script breaks out a dataframe into several
# ligand pdbqt files. It also prints a list of <name>\n<file>
# pairs suitable for sending to autodock-GPU's filelist.
#
# Note the workarounds for different conventions of input
# dataframes: frames with name-as-index
# or pdbqt-s containing extraneous data to remove
def main(argv):
    assert len(argv) == 2, "Usage: %s <ligs.pq>" % argv[0]
    df = pd.read_parquet(argv[1])
    idx = True
    if 'name' in df.columns:
        idx = False
    for i in range(len(df)):
        name = get_name(df, i, idx)
        conf = df.iloc[i]['conf']
        fname = name + '.pdbqt'
        with open(fname, "w") as f:
            f.write(fix(conf))
        print("%s\n%s" % (name, fname))

if __name__ == "__main__":
    import sys
    main(sys.argv)
#!/bin/bash
#SBATCH -p dock
#SBATCH --nodes 2
#SBATCH --cpus-per-task 2
#SBATCH --gres gpu:1
#SBATCH -J dock
#SBATCH -o %x.%A_%a.%j.out
#SBATCH --array=1-868
echo "Starting $SLURM_JOB_NAME-$SLURM_ARRAY_TASK_ID at" `date`
source /apps/launchad/env.sh $SLURM_JOB_NAME
export OMP_NUM_THREADS=1
srun -n2 -N2 --gres=gpu:1 --cpus-per-task=2 --exclusive \
$LAD ccddc-controller $SLURM_JOB_NAME
echo "Completed $SLURM_JOB_NAME-$SLURM_ARRAY_TASK_ID at" `date`
#!/bin/bash
# This script is run once -- at the start of a job script.
#
# It must setup the environment for the programs to be run
# and then cd into the top-level working directory (usually /dev/shm).
redis_host=ccddc-controller
LAD=/apps/launchad/loadem.py # launcher to run (using srun)
# run with arg = job name
case $1 in
    rescore)
        eval "$(/apps/anaconda3/bin/conda shell.bash hook)"
        # includes oddt
        conda activate rescore2
        ;;
    *)
        source /apps/dock_env/env.sh
        ;;
esac
# run from here
cd /dev/shm
# Copyright 2020 Oak Ridge National Laboratory
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
#
import re
import tarfile as tf
from pathlib import Path
import numpy as np

newaxis = np.newaxis
# list-returning map (shadows the builtin on purpose)
map = lambda f, x: [f(y) for y in x]

# iterator over tar-file
def tar_iter(name, ext=None):
    inp = Path(name)
    fname = inp.name.split('.', 1)[0]
    with tf.open(inp, 'r|*') as tar:
        for tarinfo in tar:
            #if tarinfo.isreg():
            #    print(tarinfo.name, "is", tarinfo.size, "bytes in size")
            if ext is not None:
                if tarinfo.name[-len(ext)-1:] != '.'+ext:
                    continue
                name = tarinfo.name[:-len(ext)-1] # get a good name from the file
            else:
                name = tarinfo.name
            #if name[:2] == './':
            #    name = name[2:]
            name = name.rsplit('/', 1)[-1]
            with tar.extractfile(tarinfo) as f:
                yield fname, name, f

# parse the first cluster from each included xml file.
# returns [ (score : float, run : int) ]
# example cluster syntax is:
# <cluster cluster_rank="1" lowest_binding_energy="-4.09" run="9" mean_binding_energy="-4.09" num_in_clus="1" />
cl = re.compile(r'\s*<cluster cluster_rank="([^"]*)" lowest_binding_energy="([^"]*)" run="([^"]*)" mean_binding_energy="([^"]*)" num_in_clus="([^"]*)" />')
# *could* also parse runs:
# example run syntax is:
# <run rank="1" sub_rank="1" run="5" binding_energy="-3.96" cluster_rmsd="0.00" reference_rmsd="86.97" />
def xml_to_energy(f):
    rank = 1
    ans = []
    for line in f:
        m = cl.match(line)
        if m:
            assert rank == int(m[1])
            rank += 1
            ans.append( (float(m[2]), int(m[3])) )
    return ans

def grep_all(f, *keys):
    out = dict((k,[]) for k in keys)
    for line in f:
        #line = line.decode('utf8')
        for k in keys:
            if len(line) >= len(k) and line[:len(k)] == k:
                out[k].append(line)
    return out

# Directly parse a dlg into a list of energies and confs.
def parse_dlg(f):
    #dlg = grep_all(f, "DOCKED:", "Estimated", "Number of atoms", "Number of rotat")
    dlg = grep_all(f, "DOCKED:", "Number of rotatable bonds:")
    confs = dlg_to_confs(dlg['DOCKED:'])
    # truncate to max 5
    #confs = confs[:min(5,len(confs))]
    try:
        tors = int( dlg['Number of rotatable bonds:'][0][26:] )
    except:
        tors = np.nan
    return confs, tors # [(en, pdbqt)], int

# parse the xml and dlg file for the ligand name
def collect_lig(name, max_clusters=3):
    with open(name+'.xml', encoding='utf-8') as f:
        xml = xml_to_energy(f)
    with open(name+'.dlg', encoding='utf-8') as f:
        confs, tors = parse_dlg(f)
    if len(xml) > max_clusters:
        xml = xml[:max_clusters]
    return [(en, confs[i-1][1]) for en,i in xml], tors

# input strings all contain "DOCKED:"
# returns [(en, pdbqt)]
def dlg_to_confs(lines):
    confs = []
    conf = []
    en = None
    for s in lines:
        conf.append( s[8:].strip() )
        #print(s[8:].strip())
        # DOCKED: USER    Estimated Free Energy of Binding    =   -7.85 kcal/mol
        if "Estimated Free Energy of Binding" in s:
            tok = s.replace('=','').split()
            en = float(tok[7])
        elif s[8:14] == "ENDMDL":
            assert en is not None
            confs.append( (en, conf) )
            conf = []
            en = None
    #confs.sort()
    return confs

# Basically, autodock will spit out -nruns number of poses, which I chose to
# output to a .pdb file so I could load them in typical molecular visualization
# tools. This also means that you'd need to get the energies for all the models,
# and sort them. This is what I was doing on a much smaller set:

# lines that contain "Estimated"
def dlg_to_energy(lines):
    en = []
    for line in lines[:-1]:
        en.append( line.replace('=','').split()[7] )
    return en

# You can also get ligand data from the dlg if you like.
def dlg_liganddata(atline, rotline):
    atomcount = int(atline.split()[-1])
    torsioncount = int(rotline.split()[-1])
    return (atomcount, torsioncount)

# conf is a list of lines like:
# ATOM      2  C   LIG 1       7.314  -0.197  19.453
def validate_conf(conf):
    try:
        xyz = [ map(float, [line[30:38], line[38:46], line[46:54]]) \
                for line in conf if line[:6] == "ATOM  " or line[:6] == "HETATM" ]
        x = np.array(xyz)
        linf = np.abs( x[:,newaxis] - x ).max(2)
        linf += np.identity(len(linf))
        if linf.min() < 0.05:
            return 0
    except ValueError: # couldn't parse coords.
        return 0
    return len(xyz)

def add_score(name, confs):
    #conf, ntor = parse_dlg(f)
    conf, ntor = collect_lig(name, len(confs))
    for i in range(len(confs)):
        if len(conf) <= i:
            confs[i][0].append(None)
            confs[i][1].append(None)
        else:
            confs[i][0].append(conf[i][0]) # en[i]
            confs[i][1].append("\n".join(conf[i][1])) # crd[i]