Commit 46f5b5e4 authored by David M. Rogers's avatar David M. Rogers

refactoring for modularity

parent 0842e46a
* v. 2.0
* Created a rules.yaml entry for each job-type
- stores inp file names
- stores script generated per file
* generalized loadem.py to run prologue / epilogue
scripts that copy data to/from the `config.py`-specified location
- this uses the job metadata created in 2
* Improved loadem.py to store 'assigned'
items inside assigned/M (where M is config's unique_name()),
and to check 'assigned/M' on startup
* Create separate DBs for each function name
- This lets the workflow crunch different jobs
with different requirements.
- associate the DB access info. with a job-type
* Package launch scripts to reference launch config. file
- these files are: env.sh and conf.py (see README.md)
- allow flexible methods for starting per-job services
- consolidate all system paths into one config
(used 2 config files)
* Documented configure and run steps
- esp. python/anaconda environment packaging
- db scanning and analysis steps
- user-level database interaction activities
- backup/checkpoint
- status query
- load/reset/dump
- slurm cluster care and feeding
* v. 1.0
* Setup to run with multiple job types (rules.yaml)
* loadem.py is passed a worker index (e.g. SLURM_ARRAY_TASK_ID)
......
# Large-Scale Docking Workflow
This package contains the heart of the docking
workflow manager, enabling efficient docking
runs at large scale.
It requires several subsystems working in tandem:
* Pre-requisites (not included):
  * a fast docking code
  * a Slurm cluster
  * a `dwork` server network-accessible from compute nodes
  * either a shared filesystem or a gcloud bucket
* Internal machinery:
  * A `rules.yaml` file describing how to run each docking step.
  * A `launchad.py` script, run on each accelerator device, that pulls
    job-IDs from the database and runs the appropriate script from `rules.yaml`.
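A `rules.yaml` entry might look like the following sketch. The `dock` job-type name and the file paths are illustrative; the `db`, `inp`, and `out` fields correspond to keys that `loadem.py` reads, while the `script` field name is an assumption about how the per-file script is stored.

```yaml
# Hypothetical rules.yaml entry for one job-type (names/paths are examples):
dock:
  db: 0                    # redis database number holding this job-type's queue
  inp:                     # input files copied in before the job runs
    - shards/{item}.pq
  out:                     # output files moved out after the job completes
    - docked/{item}.pq
  script: |                # script generated per work item (assumed field name)
    create_inp.py {item}.pq
```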
## Interacting With the Work Queue Database
User-level database interactions include:
- db scanning and analysis
- backup/checkpoint
- status query
- load/reset/dump
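A status query, for example, amounts to counting the items in each queue state. A minimal sketch (a plain dict stands in for a live redis connection, and the key names are illustrative — a real query would call redis commands such as `llen` or `scard`):

```python
# Sketch of a status query: count work items per queue state.
# A plain dict stands in for the redis server here.
def summarize(queues):
    """Return (state, count) pairs sorted by state name."""
    return sorted((state, len(items)) for state, items in queues.items())

queues = {
    'ready':  ['job1', 'job2', 'job3'],
    'a_3-0':  ['job4'],   # assigned to rank 3-0
    'errors': [],
}
for state, count in summarize(queues):
    print("%-8s %d" % (state, count))
```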
## Rescoring Setup
### Python/Anaconda Environment Setup
* Python Packages
* redis
* pandas
* numpy
* oddt
# Files Included
Example Job Scripts:
- Note the jobname is passed to `env.sh` and `launchad.py`
to determine the environment to load and the job type to run.
* `breaker.sh` -- breakup a set of large ligand pq-s into smaller ones
* `docker.sh` -- version of docking used for large runs (<2000 array)
* `small_docker.sh` -- version of docking used for small runs (<1000 array)
* `rescore.sh`
* `run_docking.lsf` -- old Summit script to run large docking, needs updating
* `test_dock.lsf` -- old Summit script to dock with a few nodes, needs updating
* `test_redis.lsf` -- old Summit script to load up, then drain the DB at max speed
Helper scripts:
* `helpers.py`
* `q2.py` -- python3 multiprocessing work stream
Analysis Scripts:
* `parse_log.py` -- parse timestamps to summarize run timings
* `pack.py` -- pack pdbqt files into a pq
* `rescore3.py` -- rescore interactively
Configuration Scripts:
* `config.py` -- functions to move data
(encapsulating specifics of global file store)
* `env.sh` -- called at start of jobs to setup environment
variables and cd to `/dev/shm`
* `shards.conf` -- you can use this as a template to run your
Redis server.
Main Working Scripts:
* `loadem.py` -- main launching script
* `breakup.py` -- breakup larger parquets into smaller ones
* `create_inp.py` -- print a list of ligands and create pdbqt-s from a pq
* `package_out.py` -- package up docking outputs
* `rescore.py` -- pq -> pq parallel rescoring
DB Interaction Helpers:
* `requeue.py` -- move all assigned jobs and errors back into ready state
* `setdb.py` -- load new jobs into the task db
Python imports for all codes:
* numpy -- I installed these using pip in a python3 venv
* pandas
* redis -- this is the python client API (https://pypi.org/project/redis/)
Rescore imports:
* oddt -- I installed this with Anaconda
* don't forget the above libraries too!
x. Create a rules.yaml entry for each job-type
- store inp file names
- store script generated per file
1. change file naming scheme to something more user-defined
x. generalize loadem.py to run prologue / epilogue
scripts copying data to/from GS://ccddc
- this uses the job metadata created in 2
2. curate complete dataset for each job type [1h]
- fix input tarballs on Summit
2.5. Improve loadem.py to store 'assigned'
items inside assigned/M (where M is the job-array-ID).
And check 'assigned/M' on startup!
3. build & run test jobs for each jobtype [3h]
- small database (e.g. enamine diversity)
- protein point & click
3. Create separate DBs for each function name
- This lets the workflow crunch different jobs
with different reqs.
- associate the DB access info. with a job-type
4. Create python worker that attaches to job-db API [2h]
using ZeroMQ
- make this transport agnostic and test on Summit
4. create status query scripts parameterized over
jobtype (i.e. database name)
6. add FAQ on common error conditions
- protein conversion error
- docking step failure (reset & rerun scripts)
7. consolidate docking codebase
5. build & run test jobs for each jobtype
8. create separate directory per work item,
so stage-in/out can be completed asynchronously
*. create status query scripts parameterized over
jobtype (i.e. database name)
- tabled, since query script accepts db ID
6. curate complete dataset for each job type
......@@ -7,12 +7,9 @@
#SBATCH --array=1-5
echo "Starting $SLURM_JOB_NAME-$SLURM_ARRAY_TASK_ID at" `date`
source /apps/dock_env/env.sh
source /apps/launchad/env.sh $SLURM_JOB_NAME
export OMP_NUM_THREADS=1
DIR=/apps/launchad
cd /dev/shm
srun -n64 -N1 $DIR/loadem.py ccddc-controller $SLURM_JOB_NAME
srun -n64 -N1 $LAD $redis_host $SLURM_JOB_NAME
echo "Completed $SLURM_JOB_NAME-$SLURM_ARRAY_TASK_ID at" `date`
# Implement the copyin and moveout functions here.
# There's also a unique_name function which lets the
# database know what effective virtual host name you have.
from pathlib import Path, PurePosixPath
import os, subprocess, time

bucket = 'gs://ccddc'

# Use environment variables to determine a unique rank name:
# may potentially need username = os.environ['USER']
def unique_name():
    # Can also stagger jobs here to limit to
    # 5k connections per second at startup:
    try:
        rank = os.environ['SLURM_ARRAY_TASK_ID']
        time.sleep(int(rank)*0.0002)
    except KeyError:
        rank = "x"
    try:
        rank = rank + "-" + os.environ['SLURM_PROCID']
    except KeyError:
        pass
    return rank

def gsutil(cmd):
    sdir = Path("gsutil")
    sdir.mkdir(exist_ok=True)
    args = ["gsutil", "-o", "GSUtil:parallel_process_count=1",
            "-o", "GSUtil:parallel_thread_count=1",
            "-o", "GSUtil:state_dir=%s" % str(sdir)
           ] + cmd
    return subprocess.call(args)

def copyin(name, bucket=bucket, static=False):
    base = PurePosixPath(name).name
    static = (base[-4:] == '.tgz')  # hack for tgz
    if static and Path(base).is_file():
        print("Skipping cached input %s" % base)
        return
    ret = gsutil(["cp", bucket + '/' + name, base])
    if ret: return ret
    if base[-4:] == '.tgz':
        ret = subprocess.call("tar xzf {0} && echo >{0}".format(base), shell=True)
        if ret: return ret
    return ret

def moveout(name, bucket=bucket):
    if name[-1] == '/':  # move whole directory
        return gsutil(["-m", "cp", name + "*", bucket + '/' + name])
    loc = PurePosixPath(name).name
    return gsutil(["mv", loc, bucket + '/' + name])
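For example, with the SLURM variables set, `unique_name()` above joins the array task ID and the process rank into one string (the logic is reproduced here standalone so the behavior can be checked):

```python
import os, time

# Same logic as unique_name() in config.py, reproduced for a standalone check.
def unique_name():
    try:
        rank = os.environ['SLURM_ARRAY_TASK_ID']
        time.sleep(int(rank)*0.0002)  # stagger startup connections
    except KeyError:
        rank = "x"
    try:
        rank = rank + "-" + os.environ['SLURM_PROCID']
    except KeyError:
        pass
    return rank

os.environ['SLURM_ARRAY_TASK_ID'] = '3'
os.environ['SLURM_PROCID'] = '0'
print(unique_name())   # -> 3-0
```

Outside of a Slurm allocation, neither variable is set, so the name falls back to "x".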
......@@ -7,15 +7,11 @@
#SBATCH -o %x.%A_%a.%j.out
#SBATCH --array=1-866
# TODO: add date/time to output filename
echo "Starting $SLURM_JOB_NAME-$SLURM_ARRAY_TASK_ID at" `date`
source /apps/dock_env/env.sh
source /apps/launchad/env.sh $SLURM_JOB_NAME
export OMP_NUM_THREADS=1
DIR=/apps/launchad
cd /dev/shm
export OMP_NUM_THREADS=2
srun -n2 -N2 --gres=gpu:1 --cpus-per-task=2 --exclusive \
$DIR/loadem.py ccddc-controller $SLURM_JOB_NAME
$LAD ccddc-controller $SLURM_JOB_NAME
echo "Completed $SLURM_JOB_NAME-$SLURM_ARRAY_TASK_ID at" `date`
#!/bin/bash
# This script is run once -- at the start of a job script.
#
# It must setup the environment for the programs to be run
# and then cd into the top-level working directory (usually /dev/shm).
redis_host=ccddc-controller
LAD=/apps/launchad/loadem.py # launcher to run (using srun)
# run with arg = job name
case $1 in
    rescore)
        eval "$(/apps/anaconda3/bin/conda shell.bash hook)"
        # includes oddt
        conda activate rescore2
        ;;
    *)
        source /apps/dock_env/env.sh
        ;;
esac

# run from here
cd /dev/shm
......@@ -5,9 +5,10 @@ from pathlib import Path, PurePosixPath
import redis, time, random
from datetime import datetime
from config import copyin, moveout, unique_name
base = Path(__file__).resolve().parent
rules = yaml.safe_load(open(base / 'rules.yaml'))
bucket = 'gs://ccddc'
test = False
hopper = False
......@@ -53,32 +54,6 @@ def get_item(host, db, assigned):
return item
return item.decode('utf8')
def gsutil(cmd):
    args = ["gsutil", "-o", "GSUtil:parallel_process_count=1",
            "-o", "GSUtil:parallel_thread_count=1",
            "-o", "GSUtil:state_dir=%s" % str(base / "gsutil")
           ] + cmd
    return subprocess.call(args)

def copyin(name, bucket=bucket, static=False):
    base = PurePosixPath(name).name
    static = (base[-4:] == '.tgz')  # hack for tgz
    if static and Path(base).is_file():
        print("Skipping cached input %s" % base)
        return
    ret = gsutil(["cp", bucket + '/' + name, base])
    if ret: return ret
    if base[-4:] == '.tgz':
        ret = subprocess.call("tar xzf {0} && echo >{0}".format(base), shell=True)
        if ret: return ret
    return ret

def moveout(name, bucket=bucket):
    if name[-1] == '/':  # move whole directory
        return gsutil(["-m", "cp", name + "*", bucket + '/' + name])
    loc = PurePosixPath(name).name
    return gsutil(["mv", loc, bucket + '/' + name])

def cleanup(job):
    for inp in job['inp'] + job['out']:
        loc = PurePosixPath(inp.format(**job)).name
......@@ -118,14 +93,14 @@ def requeue(assigned, host, db):
        raise IndexError("More than 10 items assigned to %s!" % assigned)
    run_redis(run, host, db)

# Apparently, we need to get some things sorted first.
# Set up the working directory and fix broken ulimits
def setup(rank):
    print("%s Rank %s starting." % (stamp(), rank))
    if Path(rank).exists():
        subprocess.call(["rm", "-fr", rank])
    os.mkdir(rank)
    os.chdir(rank)
    os.mkdir("gsutil")
    # max out resource limits
    import resource
    s, h = resource.getrlimit(resource.RLIMIT_NPROC)
    resource.setrlimit(resource.RLIMIT_NPROC, (h, h))
......@@ -133,34 +108,23 @@ def setup(rank):
    resource.setrlimit(resource.RLIMIT_NOFILE, (h, h))

# usually run as:
# loadem.py localhost $SLURM_JOB_ID $SLURM_ARRAY_TASK_ID
# loadem.py localhost $SLURM_JOB_ID
def main(argv):
    global conn_retries
    assert len(argv) == 3, "Usage: %s <redis host> <job type>" % argv[0]
    host = argv[1]
    jobname = argv[2]
    # Determine a unique rank name
    try:
        rank = os.environ['SLURM_ARRAY_TASK_ID']
    except KeyError:
        rank = "x"
    try:
        rank = rank + "-" + os.environ['SLURM_PROCID']
    except KeyError:
        pass
    rank = unique_name()
    setup(rank)
    job = rules[jobname]
    db = int(job['db'])  # assoc. redis db
    #username = os.environ['USER']
    # assignment key for this rank
    assigned = 'a_%s' % rank
    #time.sleep(rank*0.001)  # 1k connections per second at startup
    requeue(assigned, host, db)
    n = 0
......
......@@ -15,7 +15,4 @@ z = pd.DataFrame(data={ 'name': names
}
)
z.to_parquet('control.pq',
compression='snappy',
engine='pyarrow'
)
z.to_parquet('control.pq')
......@@ -66,7 +66,7 @@ def process_inp(r, name):
else:
ans = pd.DataFrame()
ans.to_parquet(name+'.pq', compression='snappy', engine='pyarrow')
ans.to_parquet(name+'.pq')
end.set()
return stop_procs(loaders + [rf3, dude2])
......
......@@ -7,14 +7,8 @@
#SBATCH --array=1-13
echo "Starting $SLURM_JOB_NAME-$SLURM_ARRAY_TASK_ID at" `date`
source /apps/launchad/env.sh $SLURM_JOB_NAME
#export OMP_NUM_THREADS=1
#source /apps/dock_env/env.sh
eval "$(/apps/anaconda3/bin/conda shell.bash hook)"
conda activate rescore2
DIR=/apps/launchad
cd /dev/shm
srun --cpus-per-task=64 -N1 $DIR/loadem.py ccddc-controller $SLURM_JOB_NAME
srun --cpus-per-task=64 -N1 $LAD ccddc-controller $SLURM_JOB_NAME
echo "Completed $SLURM_JOB_NAME-$SLURM_ARRAY_TASK_ID at" `date`
......@@ -68,7 +68,7 @@ def process_inp(r, name):
else:
ans = pd.DataFrame()
ans.to_parquet(name+'.pq', compression='snappy', engine='pyarrow')
ans.to_parquet(name+'.pq')
end.set()
return stop_procs(loaders + [rf3, dude2])
......
#!/bin/bash
# modules: cuda gcc
# workflow copied from:
# /ccs/proj/bif128/fireworks/docking/docking_launcher/dynamic/docking.py
# example shard: /gpfs/alpine/bif128/world-shared/ligand_shards/A_C_F_N_P_Si_HD_Cl_NA_S_OA_Br_SA_13types_output_p9512.tar.gz
# - shard_name: p9512
# - shard_segment: 0
export OMP_NUM_THREADS=7
set -e
version="run_ad.sh v1.2"
if [ $# -ne 2 ]; then
echo "Usage: $0 shard_name shard_segment"
exit 1
fi
shard_name=$1
seg=$2
sfile=`grep $shard_name.tar.gz /gpfs/alpine/bif128/world-shared/ligand_shards.txt`
if [ ! -s "$sfile" ]; then
echo "Missing $sfile"
exit 1
fi
pfile=/gpfs/alpine/world-shared/bif128/6WQF_maps_Josh.tgz
if [ ! -s "$pfile" ]; then
echo "Missing $pfile"
exit 1
fi
HOST=`hostname`
OUT_DIR=/gpfs/alpine/world-shared/bif128/6WQF_docked/$LSB_JOBID/$HOST
mkdir -p $OUT_DIR
log() {
echo $(date +"%F %H:%M:%S.%N") "($version) $*" >>$OUT_DIR/$OMPI_COMM_WORLD_RANK.status
}
log started $shard_name $seg
# prep directories
WORK_DIR=/mnt/bb/$USER/$OMPI_COMM_WORLD_RANK/$shard_name
mkdir -p $WORK_DIR
cd $WORK_DIR
if [ ! -s `basename $sfile` ]; then
# copy-in function
cp $sfile $pfile .
tar -xzf `basename $sfile` # populates ligands/ dir.
tar -xzf `basename $pfile` # unzips *fld
# ls
# A_C_F_N_P_Si_HD_Cl_NA_S_OA_Br_SA_13types_output_p9512.tar.gz
# final_Mpro_pdbqt_maps_for_1B.tgz
# final_MPro_pdbqt_maps_for_1B
# ligands
[ -d final_MPro_pdbqt_maps_for_1B ] && mv final_MPro_pdbqt_maps_for_1B/* .
ls ligands/* >liglist
ls -1 ligands | cut -f 1 -d '.' >lignames
log completed copyin
fi
# build the file list
ls *fld >filelist.$seg
n=`wc -l <lignames`
start=$(( n*seg/10 + 1 ))
end=$(( n*(seg+1)/10 ))
sed -n -e ${start},${end}p lignames >lignames.$seg
sed -n -e ${start},${end}p liglist >liglist.$seg
paste -d '\n' lignames.$seg liglist.$seg >>filelist.$seg
log completed segment file list ${start} to ${end}
# run AD
/ccs/proj/bif128/docking/autodock_gpu_64wi -filelist filelist.$seg -nrun 20 -autostop 1 -nev 3000000 >${seg}.log
log completed docking
# copy-out function
#cd $WORK_DIR
tar czf $shard_name.$seg.tgz ${seg}.log `awk '{printf("%s.xml\n%s.dlg\n",$0,$0);}' lignames.$seg` \
|| echo "Error tarring some files."
cp $shard_name.$seg.tgz $OUT_DIR
#rm -fr $WORK_DIR
log completed copyout
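The segment arithmetic above splits the `n` ligands into 10 contiguous, non-overlapping pieces. Mirroring the shell arithmetic in python makes the partition invariant easy to check (the ligand count here is arbitrary):

```python
# Mirror of the shell arithmetic in run_ad.sh:
#   start = n*seg/10 + 1 ; end = n*(seg+1)/10   (1-based, inclusive)
def segment(n, seg):
    return n*seg//10 + 1, n*(seg+1)//10

n = 9973  # arbitrary ligand count
covered = []
for seg in range(10):
    start, end = segment(n, seg)
    covered.extend(range(start, end + 1))
assert covered == list(range(1, n + 1))  # every ligand covered exactly once
print("10 segments cover all", n, "ligands")
```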
......@@ -8,12 +8,10 @@
#SBATCH --array=1-2
echo "Starting $SLURM_JOB_NAME-$SLURM_ARRAY_TASK_ID at" `date`
source /apps/dock_env/env.sh
source /apps/launchad/env.sh $SLURM_JOB_NAME
export OMP_NUM_THREADS=1
DIR=/apps/launchad
cd /dev/shm
srun -n1 -N1 --gres=gpu:1 --cpus-per-task=2 --exclusive \
$DIR/loadem.py ccddc-controller $SLURM_JOB_NAME
$LAD ccddc-controller $SLURM_JOB_NAME
echo "Completed $SLURM_JOB_NAME-$SLURM_ARRAY_TASK_ID at" `date`