diff --git a/pulsar/managers/util/drmaa/__init__.py b/pulsar/managers/util/drmaa/__init__.py index f1af42af344b9fd9730f19356f00f8fe2b77479e..1cf98ae69326932d9bb0ea7c880e0aac3331cc69 100644 --- a/pulsar/managers/util/drmaa/__init__.py +++ b/pulsar/managers/util/drmaa/__init__.py @@ -1,3 +1,7 @@ +import contextlib +import logging +import threading + try: from drmaa import Session, JobControlAction except OSError as e: @@ -10,6 +14,8 @@ except ImportError as e: NO_DRMAA_MESSAGE = "Attempt to use DRMAA, but DRMAA Python library cannot be loaded. " +log = logging.getLogger(__name__) + class DrmaaSessionFactory(object): """ @@ -22,39 +28,67 @@ class DrmaaSessionFactory(object): session_constructor = self.session_constructor if session_constructor is None: raise Exception(NO_DRMAA_MESSAGE + LOAD_ERROR_MESSAGE) - return DrmaaSession(session_constructor(), **kwds) + return DrmaaSession(session_constructor, **kwds) class DrmaaSession(object): """ Abstraction around `drmaa` module `Session` objects. """ + session_lock = threading.Lock() + session_count = 0 + session = None + + def __init__(self, session_constructor, **kwds): + with self._session_lock(): + if DrmaaSession.session is None: + if DrmaaSession.session_count != 0: + log.warn("DrmaaSession.session is None but session_count is non-zero - logic error occurred.") + log.debug("Initializing DRMAA session from thread %s", threading.current_thread().name) + DrmaaSession.session = session_constructor() + DrmaaSession.session.initialize() + DrmaaSession.session_count += 1 - def __init__(self, session, **kwds): - self.session = session - session.initialize() + @contextlib.contextmanager + def _session_lock(self): + with DrmaaSession.session_lock: + yield def run_job(self, **kwds): """ Create a DRMAA job template, populate with specified properties, run the job, and return the external_job_id. """ - template = self.session.createJobTemplate() + template = DrmaaSession.session.createJobTemplate() try: for key in kwds: setattr(template, key, kwds[key]) - return self.session.runJob(template) + with DrmaaSession.session_lock: + return DrmaaSession.session.runJob(template) finally: - self.session.deleteJobTemplate(template) + DrmaaSession.session.deleteJobTemplate(template) def kill(self, external_job_id): - return self.session.control(str(external_job_id), JobControlAction.TERMINATE) + with DrmaaSession.session_lock: + return DrmaaSession.session.control(str(external_job_id), JobControlAction.TERMINATE) def job_status(self, external_job_id): - return self.session.jobStatus(str(external_job_id)) + return DrmaaSession.session.jobStatus(str(external_job_id)) def close(self): - return self.session.exit() + with self._session_lock(): + if DrmaaSession.session_count == 0: + log.warn("close() called with zero active session counted - logic error.") + return + + DrmaaSession.session_count -= 1 + if DrmaaSession.session_count == 0: + if DrmaaSession.session is None: + log.warn("close() called with a non-zero session count but no session is defined.") + return + + DrmaaSession.session.exit() + DrmaaSession.session = None __all__ = ['DrmaaSessionFactory']