Skip to content
Snippets Groups Projects
Commit 68cbb967 authored by Nate Coraor's avatar Nate Coraor
Browse files

Merge pull request #97 from natefoo/global-drmaa-session

Use a global DRMAA session
parents 251a418a 5b0d7436
No related branches found
No related tags found
No related merge requests found
import contextlib
import logging
import threading
try: try:
from drmaa import Session, JobControlAction from drmaa import Session, JobControlAction
except OSError as e: except OSError as e:
...@@ -10,6 +14,8 @@ except ImportError as e: ...@@ -10,6 +14,8 @@ except ImportError as e:
NO_DRMAA_MESSAGE = "Attempt to use DRMAA, but DRMAA Python library cannot be loaded. " NO_DRMAA_MESSAGE = "Attempt to use DRMAA, but DRMAA Python library cannot be loaded. "
log = logging.getLogger(__name__)
class DrmaaSessionFactory(object): class DrmaaSessionFactory(object):
""" """
...@@ -22,39 +28,67 @@ class DrmaaSessionFactory(object): ...@@ -22,39 +28,67 @@ class DrmaaSessionFactory(object):
session_constructor = self.session_constructor session_constructor = self.session_constructor
if session_constructor is None: if session_constructor is None:
raise Exception(NO_DRMAA_MESSAGE + LOAD_ERROR_MESSAGE) raise Exception(NO_DRMAA_MESSAGE + LOAD_ERROR_MESSAGE)
return DrmaaSession(session_constructor(), **kwds) return DrmaaSession(session_constructor, **kwds)
class DrmaaSession(object): class DrmaaSession(object):
""" """
Abstraction around `drmaa` module `Session` objects. Abstraction around `drmaa` module `Session` objects.
""" """
session_lock = threading.Lock()
session_count = 0
session = None
def __init__(self, session_constructor, **kwds):
with self._session_lock():
if DrmaaSession.session is None:
if DrmaaSession.session_count != 0:
log.warn("DrmaaSession.session is None but session_count is non-zero - logic error occurred.")
log.debug("Initializing DRMAA session from thread %s", threading.current_thread().name)
DrmaaSession.session = session_constructor()
DrmaaSession.session.initialize()
DrmaaSession.session_count += 1
def __init__(self, session, **kwds): @contextlib.contextmanager
self.session = session def _session_lock(self):
session.initialize() with DrmaaSession.session_lock:
yield
def run_job(self, **kwds): def run_job(self, **kwds):
""" """
Create a DRMAA job template, populate with specified properties, Create a DRMAA job template, populate with specified properties,
run the job, and return the external_job_id. run the job, and return the external_job_id.
""" """
template = self.session.createJobTemplate() template = DrmaaSession.session.createJobTemplate()
try: try:
for key in kwds: for key in kwds:
setattr(template, key, kwds[key]) setattr(template, key, kwds[key])
return self.session.runJob(template) with DrmaaSession.session_lock:
return DrmaaSession.session.runJob(template)
finally: finally:
self.session.deleteJobTemplate(template) DrmaaSession.session.deleteJobTemplate(template)
def kill(self, external_job_id): def kill(self, external_job_id):
return self.session.control(str(external_job_id), JobControlAction.TERMINATE) with DrmaaSession.session_lock:
return DrmaaSession.session.control(str(external_job_id), JobControlAction.TERMINATE)
def job_status(self, external_job_id): def job_status(self, external_job_id):
return self.session.jobStatus(str(external_job_id)) return DrmaaSession.session.jobStatus(str(external_job_id))
def close(self): def close(self):
return self.session.exit() with self._session_lock():
if DrmaaSession.session_count == 0:
log.warn("close() called with zero active session counted - logic error.")
return
DrmaaSession.session_count -= 1
if DrmaaSession.session_count == 0:
if DrmaaSession.session is None:
log.warn("close() called with a non-zero session count but no session is defined.")
return
DrmaaSession.session.exit()
DrmaaSession.session = None
__all__ = ['DrmaaSessionFactory'] __all__ = ['DrmaaSessionFactory']
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment