"""Utilities allowing for high-level testing throughout Pulsar.""" from __future__ import print_function import traceback import sys import threading from contextlib import contextmanager from stat import S_IXOTH from os import pardir, stat, chmod, access, X_OK, pathsep, environ from os import makedirs, listdir from os.path import join, dirname, isfile, split from os.path import exists from tempfile import mkdtemp from shutil import rmtree import time import webob from webtest import TestApp from webtest.http import StopableWSGIServer import galaxy.util import pulsar.util from galaxy.util.bunch import Bunch from galaxy.jobs.metrics import NULL_JOB_INSTRUMENTER from pulsar.managers.util import drmaa from pulsar.tools import ToolBox from pulsar.managers.base import JobDirectory from pulsar.web.framework import file_response from unittest import TestCase, skip try: from nose.tools import nottest except ImportError: def nottest(x): return x import stopit from functools import wraps def timed(timeout): def outer_wrapper(f): @wraps(f) def wrapper(*args, **kwargs): with stopit.ThreadingTimeout(timeout) as to_ctx_mgr: f(*args, **kwargs) if to_ctx_mgr.state != to_ctx_mgr.EXECUTED: raise Exception("Test function timed out.") return wrapper return outer_wrapper INTEGRATION_MAXIMUM_TEST_TIME = 15 integration_test = timed(INTEGRATION_MAXIMUM_TEST_TIME) TEST_DIR = dirname(__file__) ROOT_DIR = join(TEST_DIR, pardir) class TempDirectoryTestCase(TestCase): def setUp(self): self.temp_directory = mkdtemp() def tearDown(self): rmtree(self.temp_directory) def get_test_toolbox(): toolbox_path = join(dirname(__file__), pardir, "test_data", "test_shed_toolbox.xml") toolbox = ToolBox(toolbox_path) return toolbox def get_test_tool(): return get_test_toolbox().get_tool("tool1") class TestManager(object): def setup_temp_directory(self): self.temp_directory = mkdtemp() self.__job_directory = JobDirectory(self.temp_directory, '1') def cleanup_temp_directory(self): rmtree(self.temp_directory) def job_directory(self, job_id): return self.__job_directory @contextmanager def test_job_directory(): with temp_directory() as directory: yield JobDirectory(directory, '1') @contextmanager def temp_directory(): directory = mkdtemp() try: yield directory finally: rmtree(directory) @contextmanager def test_manager(): manager = TestManager() manager.setup_temp_directory() yield manager manager.cleanup_temp_directory() class TestAuthorization(object): def __init__(self): self.allow_setup = True self.allow_tool_file = True self.allow_execution = True self.allow_config = True def authorize_setup(self): if not self.allow_setup: raise Exception def authorize_tool_file(self, name, contents): if not self.allow_tool_file: raise Exception def authorize_execution(self, job_directory, command_line): if not self.allow_execution: raise Exception def authorize_config_file(self, job_directory, name, path): if not self.allow_config: raise Exception class TestDependencyManager(object): def dependency_shell_commands(self, requirements, **kwds): return [] class BaseManagerTestCase(TestCase): def setUp(self): self.app = minimal_app_for_managers() self.staging_directory = self.app.staging_directory self.authorizer = self.app.authorizer def tearDown(self): rmtree(self.staging_directory) @nottest def _test_simple_execution(self, manager): command = """python -c "import sys; sys.stdout.write(\'Hello World!\'); sys.stderr.write(\'moo\')" """ job_id = manager.setup_job("123", "tool1", "1.0.0") manager.launch(job_id, command) while manager.get_status(job_id) not in ['complete', 'cancelled']: pass self.assertEquals(manager.stderr_contents(job_id), b'moo') self.assertEquals(manager.stdout_contents(job_id), b'Hello World!') self.assertEquals(manager.return_code(job_id), 0) manager.clean(job_id) self.assertEquals(len(listdir(self.staging_directory)), 0) def _test_cancelling(self, manager): job_id = manager.setup_job("124", "tool1", "1.0.0") command = self._python_to_command("import time; time.sleep(1000)") manager.launch(job_id, command) time.sleep(0.05) manager.kill(job_id) manager.kill(job_id) # Make sure kill doesn't choke if pid doesn't exist self._assert_status_becomes_cancelled(job_id, manager) manager.clean(job_id) def _python_to_command(self, code, quote='"'): assert '"' not in code return 'python -c "%s"' % "; ".join(code.split("\n")) def _assert_status_becomes_cancelled(self, job_id, manager): i = 0 while True: i += 1 status = manager.get_status(job_id) if status in ["complete", "failed"]: raise AssertionError("Expected cancelled status but got %s." % status) elif status == "cancelled": break time.sleep(0.01) if i > 100: # Wait one second raise AssertionError("Job failed to cancel quickly.") def minimal_app_for_managers(): """ Minimimal app description for consumption by managers. """ staging_directory = mkdtemp() rmtree(staging_directory) authorizer = TestAuthorizer() return Bunch(staging_directory=staging_directory, authorizer=authorizer, job_metrics=NullJobMetrics(), dependency_manager=TestDependencyManager()) class NullJobMetrics(object): def __init__(self): self.default_job_instrumenter = NULL_JOB_INSTRUMENTER @nottest @contextmanager def server_for_test_app(app): try: from paste.exceptions.errormiddleware import ErrorMiddleware error_app = ErrorMiddleware(app.app, debug=True, error_log="errors.log") server = StopableWSGIServer.create(error_app) except ImportError: # paste.exceptions not available for Python 3. error_app = app.app server = StopableWSGIServer.create(error_app) try: server.wait() yield server finally: server.shutdown() # There seem to be persistent transient problems with the testing, sleeping # between creation of test app instances for greater than .5 seconds seems # to help (async loop length in code is .5 so this maybe makes some sense?) if "TEST_WEBAPP_POST_SHUTDOWN_SLEEP" in environ: time.sleep(int(environ.get("TEST_WEBAPP_POST_SHUTDOWN_SLEEP"))) @nottest @contextmanager def test_pulsar_server(global_conf={}, app_conf={}, test_conf={}): with test_pulsar_app(global_conf, app_conf, test_conf) as app: with server_for_test_app(app) as test_pulsar_server: yield test_pulsar_server class RestartablePulsarAppProvider(object): def __init__(self, global_conf={}, app_conf={}, test_conf={}, web=True): self.staging_directory = mkdtemp() self.global_conf = global_conf self.app_conf = app_conf self.test_conf = test_conf self.web = web @contextmanager def new_app(self): with test_pulsar_app( self.global_conf, self.app_conf, self.test_conf, staging_directory=self.staging_directory, web=self.web, ) as app: yield app def cleanup(self): try: rmtree(self.staging_directory) except Exception: pass @contextmanager def restartable_pulsar_app_provider(**kwds): try: has_app = RestartablePulsarAppProvider(**kwds) yield has_app finally: has_app.cleanup() @nottest @contextmanager def test_pulsar_app( global_conf={}, app_conf={}, test_conf={}, staging_directory=None, web=True, ): clean_staging_directory = False if staging_directory is None: staging_directory = mkdtemp() clean_staging_directory = True # Make staging directory world executable for run as user tests. mode = stat(staging_directory).st_mode chmod(staging_directory, mode | S_IXOTH) cache_directory = mkdtemp() app_conf["staging_directory"] = staging_directory app_conf["file_cache_dir"] = cache_directory app_conf["ensure_cleanup"] = True try: with _yield_app(global_conf, app_conf, test_conf, web) as app: yield app finally: to_clean = [cache_directory] if clean_staging_directory: to_clean.append(staging_directory) for directory in to_clean: try: rmtree(directory) pass except Exception: pass @contextmanager def _yield_app(global_conf, app_conf, test_conf, web): # Yield either wsgi webapp of the underlying pulsar # app object if the web layer is not needed. try: if web: from pulsar.web.wsgi import app_factory app = app_factory(global_conf, **app_conf) yield TestApp(app, **test_conf) else: from pulsar.main import load_app_configuration from pulsar.core import PulsarApp app_conf = load_app_configuration(local_conf=app_conf) app = PulsarApp(**app_conf) yield app finally: try: shutdown_args = [] if not web: shutdown_args.append(2) app.shutdown(*shutdown_args) except Exception: pass def skip_unless_environ(var): if var in environ: return lambda func: func return skip("Environment variable %s not found, dependent test skipped." % var) def skip_unless_executable(executable): if _which(executable): return lambda func: func return skip("PATH doesn't contain executable %s" % executable) def skip_unless_module(module): available = True try: __import__(module) except ImportError: available = False if available: return lambda func: func return skip("Module %s could not be loaded, dependent test skipped." % module) def skip_unless_any_module(modules): available = False for module in modules: try: __import__(module) except ImportError: continue available = True if available: return lambda func: func return skip("None of the modules %s could be loaded, dependent test skipped." % modules) def skip_if_none(value): if value is not None: return lambda func: func return skip def skip_without_drmaa(f): return skip_if_none(drmaa.Session)(f) def _which(program): def is_exe(fpath): return isfile(fpath) and access(fpath, X_OK) fpath, fname = split(program) if fpath: if is_exe(program): return program else: for path in environ["PATH"].split(pathsep): path = path.strip('"') exe_file = join(path, program) if is_exe(exe_file): return exe_file return None class TestAuthorizer(object): def __init__(self): self.authorization = TestAuthorization() def get_authorization(self, tool_id): return self.authorization class JobFilesApp(object): def __init__(self, root_directory=None): self.root_directory = root_directory def __call__(self, environ, start_response): req = webob.Request(environ) params = req.params.mixed() method = req.method if method == "POST": resp = self._post(req, params) elif method == "GET": resp = self._get(req, params) else: raise Exception("Unhandled request method %s" % method) return resp(environ, start_response) def _post(self, request, params): path = params['path'] if not galaxy.util.in_directory(path, self.root_directory): assert False, "%s not in %s" % (path, self.root_directory) parent_directory = dirname(path) if not exists(parent_directory): makedirs(parent_directory) pulsar.util.copy_to_path(params["file"].file, path) return webob.Response(body='') def _get(self, request, params): path = params['path'] if not galaxy.util.in_directory(path, self.root_directory): assert False, "%s not in %s" % (path, self.root_directory) return file_response(path) @contextmanager def files_server(directory=None): if not directory: with temp_directory() as directory: app = TestApp(JobFilesApp(directory)) with server_for_test_app(app) as server: yield server, directory else: app = TestApp(JobFilesApp(directory)) with server_for_test_app(app) as server: yield server def dump_other_threads(): # Utility for debugging threads that aren't dying during # tests. main_thread = threading.current_thread() for t in threading.enumerate(): if t is main_thread: continue print(t.getName()) traceback.print_stack(sys._current_frames()[t.ident])