From 2692d64101687f3ba93b433e060ab5f01ec05216 Mon Sep 17 00:00:00 2001 From: John Chilton <jmchilton@gmail.com> Date: Sun, 24 Feb 2013 16:09:42 -0600 Subject: [PATCH] Add toolbox option to app.py. Add intial outline of authorization framework with tests. If toolbox is set, tool_id must be sent along with request and this tool_id will be checked to verify it is in the toolbox. Add new manager test to verify this behavior. Reworked base manager to rely on a more hefty JobDirectory class and named constants instead of strings more places. --- lwr/app.py | 46 +++++++++++---- lwr/managers/base.py | 118 +++++++++++++++++++++---------------- lwr/tools/authorization.py | 52 ++++++++++++++++ lwr/util.py | 41 +++++++++++++ test/authorization_test.py | 14 +++++ test/manager_test.py | 56 ++++++++++++++++++ test/persistence_test.py | 5 +- test/test_utils.py | 10 ++++ test/toolbox_test.py | 8 +-- 9 files changed, 282 insertions(+), 68 deletions(-) create mode 100644 lwr/tools/authorization.py create mode 100644 test/authorization_test.py create mode 100644 test/manager_test.py create mode 100644 test/test_utils.py diff --git a/lwr/app.py b/lwr/app.py index f59768b3..0c1d358c 100644 --- a/lwr/app.py +++ b/lwr/app.py @@ -7,8 +7,13 @@ import os from lwr.manager_factory import build_managers from lwr.persistence import PersistedJobStore from lwr.framework import RoutingApp +from lwr.tools import ToolBox +from lwr.tools.authorization import get_authorizer import lwr.routes +from logging import getLogger +log = getLogger(__name__) + def app_factory(global_conf, **local_conf): """ @@ -26,11 +31,11 @@ class LwrApp(RoutingApp): def __init__(self, **conf): RoutingApp.__init__(self) - self.private_key = None - self.staging_directory = os.path.abspath(conf.get('staging_directory', "lwr_staging")) + self.__setup_staging_directory(conf.get('staging_directory', "lwr_staging")) self.__setup_private_key(conf.get("private_key", None)) - self.persisted_job_store = PersistedJobStore(**conf) - self.managers = build_managers(self, conf.get("job_managers_config", None)) + self.__setup_persisted_job_store(conf) + self.__setup_tool_config(conf.get("toolbox_path", None)) + self.__setup_managers(conf.get("job_managers_config", None)) self.__setup_routes() def shutdown(self): @@ -40,6 +45,33 @@ class LwrApp(RoutingApp): except: pass + def __setup_tool_config(self, toolbox_path): + """ + Setups toolbox object and authorization mechanism based + on supplied toolbox_path. + """ + toolbox = None + if toolbox_path: + toolbox = ToolBox(toolbox_path) + else: + log.info("Starting the LWR without a toolbox to white-list tools with, please ensure this application is protected by firewall or a configured private token.") + self.toolbox = toolbox + self.authorizer = get_authorizer(toolbox) + + def __setup_staging_directory(self, staging_directory): + self.staging_directory = os.path.abspath(staging_directory) + + def __setup_persisted_job_store(self, conf): + self.persisted_job_store = PersistedJobStore(**conf) + + def __setup_managers(self, job_managers_config): + self.managers = build_managers(self, job_managers_config) + + def __setup_private_key(self, private_key): + self.private_key = private_key + if private_key: + print "Securing LWR web app with private key, please verify you are using HTTPS so key cannot be obtained by monitoring traffic." + def __setup_routes(self): for func_name, func in inspect.getmembers(lwr.routes, lambda x: getattr(x, '__controller__', False)): self.__add_route_for_function(func) @@ -53,9 +85,3 @@ class LwrApp(RoutingApp): # Add route for named manager as well. named_manager_route = '/managers/{manager_name}%s' % route_suffix self.add_route(named_manager_route, function) - - def __setup_private_key(self, private_key): - if not private_key: - return - print "Securing LWR web app with private key, please verify you are using HTTPS so key cannot be obtained by monitoring traffic." - self.private_key = private_key diff --git a/lwr/managers/base.py b/lwr/managers/base.py index d8cea214..60c06a15 100644 --- a/lwr/managers/base.py +++ b/lwr/managers/base.py @@ -5,7 +5,20 @@ import thread import platform from threading import Lock -from lwr.util import kill_pid +from lwr.util import kill_pid, JobDirectory + +JOB_FILE_SUBMITTED = "submitted" +JOB_FILE_CANCELLED = "cancelled" +JOB_FILE_PID = "pid" +JOB_FILE_RETURN_CODE = "return_code" +JOB_FILE_STANDARD_OUTPUT = "stdout" +JOB_FILE_STANDARD_ERROR = "stderr" +JOB_FILE_TOOL_ID = "tool_id" +JOB_FILE_TOOL_VERSION = "tool_version" + +JOB_DIRECTORY_INPUTS = "inputs" +JOB_DIRECTORY_OUTPUTS = "outputs" +JOB_DIRECTORY_WORKING = "working" class Manager(object): @@ -17,13 +30,14 @@ class Manager(object): >>> import tempfile >>> from lwr.util import Bunch + >>> from lwr.tools.authorization import get_authorizer >>> staging_directory = tempfile.mkdtemp() >>> shutil.rmtree(staging_directory) >>> class PersistedJobStore: ... def next_id(self): ... yield 1 ... yield 2 - >>> app = Bunch(staging_directory=staging_directory, persisted_job_store=PersistedJobStore()) + >>> app = Bunch(staging_directory=staging_directory, persisted_job_store=PersistedJobStore(), authorizer=get_authorizer(None)) >>> manager = Manager('_default_', app) >>> assert os.path.exists(staging_directory) >>> command = "python -c \\"import sys; sys.stdout.write('Hello World!'); sys.stderr.write('moo')\\"" @@ -45,7 +59,7 @@ class Manager(object): >>> import time >>> time.sleep(0.1) >>> manager.kill(job_id) - >>> manager.kill(job_id) # Make sure kill doesn't choke if pid doesn't exist + >>> manager.kill(job_id) # Make sure kill doesn't choke if pid doesn't exist >>> while not manager.check_complete(job_id): pass >>> manager.clean_job_directory(job_id) """ @@ -55,6 +69,7 @@ class Manager(object): self.name = name self.setup_staging_directory(app.staging_directory) self.job_locks = dict({}) + self.authorizer = app.authorizer def setup_staging_directory(self, staging_directory): assert not staging_directory == None @@ -63,57 +78,61 @@ class Manager(object): assert os.path.isdir(staging_directory) self.staging_directory = staging_directory - def __job_file(self, job_id, name): - return os.path.join(self.job_directory(job_id), name) + def __job_directory(self, job_id): + return JobDirectory(self.staging_directory, job_id) def __read_job_file(self, job_id, name): - path = self.__job_file(job_id, name) - job_file = open(path, 'r') - try: - return job_file.read() - finally: - job_file.close() + return self.__job_directory(job_id).read_file(name) def __write_job_file(self, job_id, name, contents): - path = self.__job_file(job_id, name) - job_file = open(path, 'w') - try: - job_file.write(contents) - finally: - job_file.close() + self.__job_directory(job_id).write_file(name, contents) def _record_submission(self, job_id): - self.__write_job_file(job_id, 'submitted', 'true') + self.__write_job_file(job_id, JOB_FILE_SUBMITTED, 'true') def _record_cancel(self, job_id): - self.__write_job_file(job_id, 'cancelled', 'true') + self.__write_job_file(job_id, JOB_FILE_CANCELLED, 'true') def _is_cancelled(self, job_id): - return self._has_job_file(job_id, 'cancelled') - - def _has_job_file(self, job_id, filename): - return os.path.exists(self.__job_file(job_id, filename)) + return self.__job_directory(job_id).contains_file(JOB_FILE_CANCELLED) def _record_pid(self, job_id, pid): - self.__write_job_file(job_id, 'pid', str(pid)) + self.__write_job_file(job_id, JOB_FILE_PID, str(pid)) def get_pid(self, job_id): pid = None try: - pid = self.__read_job_file(job_id, 'pid') + pid = self.__read_job_file(job_id, JOB_FILE_PID) if pid != None: pid = int(pid) except: pass return pid + def __get_authorization(self, job_id, tool_id=None): + job_directory = self.__job_directory(job_id) + if tool_id is None and job_directory.contains_file(JOB_FILE_TOOL_ID): + tool_id = job_directory.read_file(JOB_FILE_TOOL_ID) + return self.authorizer.get_authorization(tool_id) + + def __unauthorized(self, msg): + raise Exception("Unauthorized action attempted: %s" % msg) + def setup_job(self, input_job_id, tool_id, tool_version): job_id = self._register_job(input_job_id, True) - job_directory = self.job_directory(job_id) - os.mkdir(job_directory) - os.mkdir(self.inputs_directory(job_id)) - os.mkdir(self.outputs_directory(job_id)) - os.mkdir(self.working_directory(job_id)) + authorization = self.__get_authorization(job_id, tool_id) + if not authorization.can_setup(): + self.__unauthorized("Cannot submit tool with id '%s'" % tool_id) + job_directory = self.__job_directory(job_id) + job_directory.setup() + for directory in [JOB_DIRECTORY_INPUTS, JOB_DIRECTORY_WORKING, JOB_DIRECTORY_OUTPUTS]: + job_directory.make_directory(directory) + + tool_id = str(tool_id) if tool_id else "" + tool_version = str(tool_version) if tool_version else "" + + job_directory.write_file(JOB_FILE_TOOL_ID, tool_id) + job_directory.write_file(JOB_FILE_TOOL_VERSION, tool_version) return job_id def _get_job_id(self, galaxy_job_id): @@ -145,36 +164,37 @@ class Manager(object): return os.path.join(self.staging_directory, job_id) def working_directory(self, job_id): - return os.path.join(self.job_directory(job_id), 'working') + return os.path.join(self.job_directory(job_id), JOB_DIRECTORY_WORKING) def inputs_directory(self, job_id): - return os.path.join(self.job_directory(job_id), 'inputs') + return os.path.join(self.job_directory(job_id), JOB_DIRECTORY_INPUTS) def outputs_directory(self, job_id): - return os.path.join(self.job_directory(job_id), 'outputs') + return os.path.join(self.job_directory(job_id), JOB_DIRECTORY_OUTPUTS) def check_complete(self, job_id): - return not os.path.exists(self.__job_file(job_id, 'submitted')) + return not self.__job_directory(job_id).contains_file(JOB_FILE_SUBMITTED) def return_code(self, job_id): - return int(self.__read_job_file(job_id, 'return_code')) + return int(self.__read_job_file(job_id, JOB_FILE_RETURN_CODE)) def stdout_contents(self, job_id): - return self.__read_job_file(job_id, 'stdout') + return self.__read_job_file(job_id, JOB_FILE_STANDARD_OUTPUT) def stderr_contents(self, job_id): - return self.__read_job_file(job_id, 'stderr') + return self.__read_job_file(job_id, JOB_FILE_STANDARD_ERROR) def get_status(self, job_id): with self._get_job_lock(job_id): return self._get_status(job_id) def _get_status(self, job_id): + job_directory = self.__job_directory(job_id) if self._is_cancelled(job_id): return 'cancelled' - elif self._has_job_file(job_id, 'pid'): + elif job_directory.contains_file(JOB_FILE_PID): return 'running' - elif self._has_job_file(job_id, 'submitted'): + elif job_directory.contains_file(JOB_FILE_SUBMITTED): return 'queued' else: return 'complete' @@ -198,7 +218,7 @@ class Manager(object): pid = self.get_pid(job_id) if pid == None: self._record_cancel(job_id) - self._attempt_remove_job_file(job_id, 'submitted') + self.__job_directory(job_id).remove_file(JOB_FILE_SUBMITTED) if pid: kill_pid(pid) @@ -209,20 +229,14 @@ class Manager(object): stdout.close() stderr.close() return_code = proc.returncode - self.__write_job_file(job_id, 'return_code', str(return_code)) + self.__write_job_file(job_id, JOB_FILE_RETURN_CODE, str(return_code)) finally: with self._get_job_lock(job_id): self._finish_execution(job_id) def _finish_execution(self, job_id): - self._attempt_remove_job_file(job_id, 'submitted') - self._attempt_remove_job_file(job_id, 'pid') - - def _attempt_remove_job_file(self, job_id, filename): - try: - os.remove(self.__job_file(job_id, filename)) - except OSError: - pass + self.__job_directory(job_id).remove_file(JOB_FILE_SUBMITTED) + self.__job_directory(job_id).remove_file(JOB_FILE_PID) def _run(self, job_id, command_line, async=True): with self._get_job_lock(job_id): @@ -232,8 +246,8 @@ class Manager(object): preexec_fn = None if not self.is_windows(): preexec_fn = os.setpgrp - stdout = open(self.__job_file(job_id, 'stdout'), 'w') - stderr = open(self.__job_file(job_id, 'stderr'), 'w') + stdout = self.__job_directory(job_id).open_file(JOB_FILE_STANDARD_OUTPUT, 'w') + stderr = self.__job_directory(job_id).open_file(JOB_FILE_STANDARD_ERROR, 'w') proc = subprocess.Popen(args=command_line, shell=True, cwd=working_directory, @@ -250,3 +264,5 @@ class Manager(object): def launch(self, job_id, command_line): self._record_submission(job_id) self._run(job_id, command_line) + +__all__ = [Manager] diff --git a/lwr/tools/authorization.py b/lwr/tools/authorization.py new file mode 100644 index 00000000..08b305e4 --- /dev/null +++ b/lwr/tools/authorization.py @@ -0,0 +1,52 @@ + + +class AllowAnyAuthorization(object): + + def can_setup(self): + return True + + +class AllowAnyAuthorizer(object): + """ + Allow any, by default LWR is assumed to be secured + using a firewall or private_token. + """ + ALLOW_ANY_AUTHORIZATION = AllowAnyAuthorization() + + def get_authorization(self, tool_id): + return self.ALLOW_ANY_AUTHORIZATION + + +class ToolBasedAuthorization(AllowAnyAuthorization): + + def __init__(self, tool): + self.tool = tool + + def can_setup(self): + return self.tool is not None + + +class ToolBasedAuthorizer(object): + """ + Work In Progress: Implement tool based white-listing + of what jobs can run and what those jobs can do. + """ + + def __init__(self, toolbox): + self.toolbox = toolbox + + def get_authorization(self, tool_id): + tool = self.toolbox.get_tool(tool_id) + return ToolBasedAuthorization(tool) + + +def get_authorizer(toolbox): + if toolbox: + # Use toolbox as a white list. + authorizer = ToolBasedAuthorizer(toolbox) + else: + # No toolbox specified, allow any tools to run. + authorizer = AllowAnyAuthorizer() + return authorizer + +__all__ = [get_authorizer] diff --git a/lwr/util.py b/lwr/util.py index 5d5a6089..c4ccaca9 100644 --- a/lwr/util.py +++ b/lwr/util.py @@ -72,6 +72,47 @@ class JobDirectory(object): def outputs_directory(self): return self._sub_dir('outputs') + def __job_file(self, name): + return os.path.join(self.job_directory, name) + + def read_file(self, name): + path = self.__job_file(name) + job_file = open(path, 'r') + try: + return job_file.read() + finally: + job_file.close() + + def write_file(self, name, contents): + path = self.__job_file(name) + job_file = open(path, 'w') + try: + job_file.write(contents) + finally: + job_file.close() + + def remove_file(self, name): + """ + Quietly remove a job file. + """ + try: + os.remove(self.__job_file(name)) + except OSError: + pass + + def contains_file(self, name): + return os.path.exists(self.__job_file(name)) + + def open_file(self, name, mode='w'): + return open(self.__job_file(name), mode) + + def setup(self): + os.mkdir(self.job_directory) + + def make_directory(self, name): + path = self.__job_file(name) + os.mkdir(path) + def get_mapped_file(directory, remote_path, allow_nested_files=False, local_path_module=os.path, mkdir=True): """ diff --git a/test/authorization_test.py b/test/authorization_test.py new file mode 100644 index 00000000..26e93d6b --- /dev/null +++ b/test/authorization_test.py @@ -0,0 +1,14 @@ + +from lwr.tools.authorization import get_authorizer +from test_utils import get_test_toolbox + + +def test_allow_any_authorization(): + authorization_manager = get_authorizer(None) + assert authorization_manager.get_authorization('tool1').can_setup() + + +def test_tool_whitelist_authorization(): + toolbox = get_test_toolbox() + authorization_manager = get_authorizer(toolbox) + assert authorization_manager.get_authorization('tool1').can_setup() diff --git a/test/manager_test.py b/test/manager_test.py new file mode 100644 index 00000000..c1273857 --- /dev/null +++ b/test/manager_test.py @@ -0,0 +1,56 @@ +import tempfile + +from lwr.managers.base import Manager +from lwr.util import Bunch + +from unittest import TestCase +from shutil import rmtree + + +class ManagerTest(TestCase): + + def setUp(self): + staging_directory = tempfile.mkdtemp() + rmtree(staging_directory) + self.staging_directory = staging_directory + self.authorizer = TestAuthorizer() + + self.app = Bunch(staging_directory=staging_directory, + persisted_job_store=TestPersistedJobStore(), + authorizer=self.authorizer) + + self.manager = Manager('_default_', self.app) + + def tearDown(self): + rmtree(self.staging_directory) + + def test_unauthorized_tool_submission(self): + self.authorizer.authorization.allow_setup = False + with self.assertRaises(Exception): + self.manager.setup_job("123", "tool1", "1.0.0") + + +class TestAuthorization(object): + + def __init__(self): + self.allow_setup = True + + def can_setup(self): + return self.allow_setup + + +class TestAuthorizer(object): + + def __init__(self): + self.authorization = TestAuthorization() + + def get_authorization(self): + return self.authorization + + +class TestPersistedJobStore: + + def next_id(self): + yield 1 + yield 2 + yield 3 diff --git a/test/persistence_test.py b/test/persistence_test.py index ab124867..9df3d6ba 100644 --- a/test/persistence_test.py +++ b/test/persistence_test.py @@ -6,6 +6,7 @@ import time from lwr.persistence import PersistedJobStore from lwr.managers.queued import QueueManager from lwr.util import Bunch +from lwr.tools.authorization import get_authorizer def test_persistence(): @@ -15,7 +16,9 @@ def test_persistence(): staging_directory = tempfile.mkdtemp() try: persisted_job_store = PersistedJobStore(**{'shelf_filename': os.path.join(staging_directory, 'persisted_jobs')}) - app = Bunch(persisted_job_store=persisted_job_store, staging_directory=staging_directory) + app = Bunch(persisted_job_store=persisted_job_store, + staging_directory=staging_directory, + authorizer=get_authorizer(None)) queue1 = QueueManager('test', app, num_concurrent_jobs=0) job_id = queue1.setup_job('4', 'tool1', '1.0.0') touch_file = os.path.join(staging_directory, 'ran') diff --git a/test/test_utils.py b/test/test_utils.py new file mode 100644 index 00000000..ebfeba7f --- /dev/null +++ b/test/test_utils.py @@ -0,0 +1,10 @@ +from lwr.tools import ToolBox + +from os import pardir +from os.path import join, dirname + + +def get_test_toolbox(): + toolbox_path = join(dirname(__file__), pardir, "test_data", "test_shed_toolbox.xml") + toolbox = ToolBox(toolbox_path) + return toolbox diff --git a/test/toolbox_test.py b/test/toolbox_test.py index 4f73b843..d389a94b 100644 --- a/test/toolbox_test.py +++ b/test/toolbox_test.py @@ -1,11 +1,7 @@ - -from lwr.tools import ToolBox -from os import pardir -from os.path import join, dirname +from test_utils import get_test_toolbox def test_load_simple_tool(): - toolbox_path = join(dirname(__file__), pardir, "test_data", "test_shed_toolbox.xml") - toolbox = ToolBox(toolbox_path) + toolbox = get_test_toolbox() tool1 = toolbox.get_tool("tool1") assert tool1.version == "0.1" -- GitLab