Commit 83043df9 authored by David M. Rogers
Browse files

Updated rescore for intra-pq parallelism.

parent 12da438d
......@@ -25,28 +25,12 @@ from q2 import Event, Worker, WorkQueue, time
import oddt
def fhash(x):
    """Forward hash: multiply ``x`` by 48271 modulo 2147483647 (2**31 - 1).

    The map is undone by multiplying with 1899818559 under the same modulus,
    because 48271 * 1899818559 leaves remainder 1 when divided by 2147483647.

    Args:
        x: integer to hash.

    Returns:
        ``(48271 * x) % 2147483647``.
    """
    multiplier = 48271
    modulus = 2147483647  # 2**31 - 1
    return (multiplier * x) % modulus
def ihash(y):
    """Inverse hash: multiply ``y`` by 1899818559 modulo 2147483647 (2**31 - 1).

    1899818559 is the multiplicative inverse of 48271 under this modulus
    (48271 * 1899818559 leaves remainder 1), so this reverses a
    multiply-by-48271 step for values reduced modulo 2147483647.

    Args:
        y: integer hash value to invert.

    Returns:
        ``(1899818559 * y) % 2147483647``.
    """
    inverse_multiplier = 1899818559
    modulus = 2147483647  # 2**31 - 1
    return (inverse_multiplier * y) % modulus
# Value substituted into gsutil's "GSUtil:parallel_thread_count" option;
# main() overwrites it with batch_sz+2 when the "-n" flag is given.
threads = 33
# Number of consecutive "<hash>.pq" names generated per ligand id;
# main() overwrites it from the "-n" flag.
batch_sz = 648
def gsutil(cmd):
    """Run the ``gsutil`` command-line tool with this script's fixed options.

    The invocation always uses a single process, ``threads`` worker threads
    (module-level setting), a local ``gsutil`` state directory and the ``-m``
    flag; the caller's arguments are appended after those.

    Args:
        cmd: list of gsutil arguments, e.g. ``['cp', src, dst]``.

    Returns:
        The exit status reported by ``subprocess.call``.
    """
    fixed_opts = [
        "gsutil",
        "-o", "GSUtil:parallel_process_count=1",
        "-o", "GSUtil:parallel_thread_count=%d" % threads,
        "-o", "GSUtil:state_dir=gsutil",
        "-m",
    ]
    args = fixed_opts + cmd
    return subprocess.call(args)
# process_inp slices the input dataframe into pieces of at most this many
# rows and queues each piece separately.
chunk_sz = 256 # rows of dataframe to process at once
def process_inp(r, name):
n = ihash( int(name, 16) )
inp = [ (n+i, "%x.pq" % fhash(n+i)) for i in range(batch_sz) ]
inp2 = [ "gs://ccddc/%s_docked/%s"%(r, i[1]) for i in inp ]
gsutil(['cp'] + inp2 + ['./'])
def process_inp(r, inp, out):
df = pd.read_parquet(inp)
end = Event()
start = WorkQueue(end, 1)
......@@ -63,40 +47,38 @@ def process_inp(r, name):
rf3 = Scorer(out1, out2)
rf3.name = "rf3"
rf3.model = "/apps/data/RFScore_v3_pdbbind2016.pickle"
rf3.model = "/gpfs/alpine/world-shared/bif128/covid19-rapids/RFScore_v3_pdbbind2016.pickle"
rf3.version = 3
rf3.start()
dude2 = Scorer(out2, done)
dude2.name = "vs_dude_v2"
dude2.model = "/apps/data/RFScoreVS_v2_dude.pickle"
dude2.model = "/gpfs/alpine/world-shared/bif128/covid19-rapids/RFScoreVS_v2_dude.pickle"
dude2.version = 2
dude2.start()
for i in inp:
start.put(i)
# send chunks down the pipe
for i in range(0, len(df), chunk_sz):
j = min(i+chunk_sz, len(df))
start.put(df.iloc[i:j])
start.fin()
# re-concat chunks
ans = [ df for df in done ]
if len(ans) > 0:
ans = pd.concat(ans)
else:
ans = pd.DataFrame()
ans.to_parquet(name+'.pq')
ans.to_parquet(out)
end.set()
return stop_procs(loaders + [rf3, dude2])
# Command-line entry point: validates argv, calls process_inp and prints
# whatever it returns.
# NOTE(review): this span comes from a rendered diff, so two versions of the
# argument check and of the process_inp call appear back to back below
# (2-argument and 3-argument forms) -- confirm which pair belongs in the
# final file.
def main(argv):
global threads
global batch_sz
# Optional "-n <count>": overrides batch_sz, sets threads to batch_sz+2,
# then removes both tokens so the positional checks below see only the
# remaining arguments.
if len(argv) >= 3 and argv[1] == "-n":
batch_sz = int(argv[2])
threads = batch_sz+2
del argv[1:3]
# Form taking <receptor id> <lig id>.
assert len(argv) == 3, "Usage: %s <receptor id> <lig id>"
status = process_inp(argv[1], argv[2])
# Form taking <receptor.pdbqt> <ligand pq> <output pq>.
assert len(argv) == 4, "Usage: %s <receptor.pdbqt> <ligand pq> <output pq>"
status = process_inp(argv[1], argv[2], argv[3])
print(status)
class LoadMol(Worker):
......@@ -106,8 +88,7 @@ class LoadMol(Worker):
from oddt.scoring import descriptors
# set up descriptors
receptor = next(oddt.toolkit.readfile('pdbqt',
self.r+'.pdbqt'))
receptor = next(oddt.toolkit.readfile('pdbqt', self.r))
cutoff = 12
ligand_atomic_nums = [6, 7, 8, 9, 15, 16, 17, 35, 53]
......@@ -137,15 +118,7 @@ class LoadMol(Worker):
dt = time.time() - t0
print("LoadMol setup done in %f seconds"%dt)
def fn(self, i):
n, inp = i
try:
df = pd.read_parquet(inp)
os.remove(inp)
except FileNotFoundError:
print("Error: Input file %s is missing!"%inp)
return pd.DataFrame()
df['batch'] = n
def fn(self, df):
v2 = self.v2
v3 = self.v3
for x in ['', '2', '3']:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment