From b00f97df96e1642ae2ae1be1703678176d3efa57 Mon Sep 17 00:00:00 2001 From: John Chilton <jmchilton@gmail.com> Date: Wed, 11 Dec 2013 00:53:46 -0600 Subject: [PATCH] Allow launch to take a list of serialized tool requirements. Build a Galaxy 'DependencyManager' in LwrApp and utilize it in managers to resolve these requirements. A more correct approach would probably be to read the tool on LWR side, but this mode is potentially very useful for LWR instances not bound to toolboxes (i.e. every LWR instance I know about). --- galaxy/tools/deps/requirements.py | 10 ++++++ lwr/app.py | 9 ++++- lwr/lwr_client/client.py | 4 ++- lwr/lwr_client/stager.py | 8 +++-- lwr/managers/base/__init__.py | 7 ++++ lwr/managers/base/base_drmaa.py | 4 +-- lwr/managers/base/external.py | 3 +- lwr/managers/queued.py | 4 +-- lwr/managers/queued_cli.py | 3 +- lwr/managers/queued_condor.py | 4 +-- lwr/managers/queued_drmaa.py | 4 +-- lwr/managers/queued_external_drmaa.py | 5 +-- lwr/managers/unqueued.py | 8 +++-- lwr/routes.py | 7 ++-- test/check.py | 32 +++++++++++++---- test/integration_test.py | 50 ++++++++++++++++++++------- 16 files changed, 121 insertions(+), 41 deletions(-) diff --git a/galaxy/tools/deps/requirements.py b/galaxy/tools/deps/requirements.py index 8f39d978..cf765ef7 100644 --- a/galaxy/tools/deps/requirements.py +++ b/galaxy/tools/deps/requirements.py @@ -15,6 +15,16 @@ class ToolRequirement( object ): self.type = type self.version = version + def to_dict( self ): + return dict(name=self.name, type=self.type, version=self.version) + + @staticmethod + def from_dict( dict ): + version = dict.get( "version", None ) + name = dict.get("name", None) + type = dict.get("type", None) + return ToolRequirement( name=name, type=type, version=version ) + def parse_requirements_from_xml( xml_root ): """ diff --git a/lwr/app.py b/lwr/app.py index fd8217bd..87ba4dd0 100644 --- a/lwr/app.py +++ b/lwr/app.py @@ -13,6 +13,7 @@ from lwr.tools.authorization import get_authorizer from lwr.util.bunch import Bunch import lwr.routes from galaxy.objectstore import build_object_store_from_config +from galaxy.tools.deps import DependencyManager from logging import getLogger log = getLogger(__name__) @@ -48,10 +49,11 @@ class LwrApp(RoutingApp): self.__setup_private_key(conf.get("private_key", DEFAULT_PRIVATE_KEY)) self.__setup_persistence_directory(conf.get("persistence_directory", None)) self.__setup_tool_config(conf) + self.__setup_object_store(conf) + self.__setup_dependency_manager(conf) self.__setup_managers(conf) self.__setup_file_cache(conf) self.__setup_routes() - self.__setup_object_store(conf) def shutdown(self): for manager in self.managers.values(): @@ -114,6 +116,11 @@ class LwrApp(RoutingApp): ) self.object_store = build_object_store_from_config(object_store_config) + def __setup_dependency_manager(self, conf): + dependencies_dir = conf.get("tool_dependency_dir", "dependencies") + resolvers_config_file = conf.get("dependency_resolvers_config_file", "dependency_resolvers_conf.xml") + self.dependency_manager = DependencyManager(dependencies_dir, resolvers_config_file) + def __add_route_for_function(self, function): route_suffix = '/%s' % function.__name__ # Default or old-style route without explicit manager specified, diff --git a/lwr/lwr_client/client.py b/lwr/lwr_client/client.py index 7b6ebd59..08380f5f 100644 --- a/lwr/lwr_client/client.py +++ b/lwr/lwr_client/client.py @@ -197,7 +197,7 @@ class JobClient(object): } self._raw_execute("download_output", output_params, output_path=output_path) - def launch(self, command_line): + def launch(self, command_line, requirements=[]): """ Run or queue up the execution of the supplied `command_line` on the remote server. @@ -211,6 +211,8 @@ class JobClient(object): submit_params = self._submit_params if submit_params: launch_params['params'] = dumps(submit_params) + if requirements: + launch_params['requirements'] = dumps([requirement.to_dict() for requirement in requirements]) return self._raw_execute("launch", launch_params) def kill(self): diff --git a/lwr/lwr_client/stager.py b/lwr/lwr_client/stager.py index d06b31da..d7160833 100644 --- a/lwr/lwr_client/stager.py +++ b/lwr/lwr_client/stager.py @@ -350,7 +350,7 @@ def submit_job(client, client_job_description, job_config=None): file_stager = FileStager(client, client_job_description, job_config) rebuilt_command_line = file_stager.get_rewritten_command_line() job_id = file_stager.job_id - client.launch(rebuilt_command_line) + client.launch(rebuilt_command_line, requirements=client_job_description.requirements) return job_id @@ -383,15 +383,17 @@ class ClientJobDescription(object): Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server). working_directory : str Local path created by Galaxy for running this job. + requirements : list + List of requirements for tool execution. """ - def __init__(self, tool, command_line, config_files, input_files, output_files, working_directory): + def __init__(self, tool, command_line, config_files, input_files, output_files, working_directory, requirements): self.tool = tool self.command_line = command_line self.config_files = config_files self.input_files = input_files self.output_files = output_files self.working_directory = working_directory - + self.requirements = requirements __all__ = [submit_job, ClientJobDescription, finish_job] diff --git a/lwr/managers/base/__init__.py b/lwr/managers/base/__init__.py index ead775d7..6192f239 100644 --- a/lwr/managers/base/__init__.py +++ b/lwr/managers/base/__init__.py @@ -50,6 +50,7 @@ class BaseManager(ManagerInterface): self.debug = str(kwds.get("debug", False)).lower() == "true" self.authorizer = app.authorizer self.__init_system_properties() + self.dependency_manager = app.dependency_manager def clean(self, job_id): if self.debug: @@ -157,3 +158,9 @@ class BaseManager(ManagerInterface): path = join(config_files_dir, file) authorization.authorize_config_file(job_directory, file, path) authorization.authorize_execution(job_directory, command_line) + + def _expand_command_line(self, command_line, requirements): + dependency_commands = self.dependency_manager.dependency_shell_commands(requirements) + if dependency_commands: + command_line = "%s; %s" % ("; ".join(dependency_commands), command_line) + return command_line diff --git a/lwr/managers/base/base_drmaa.py b/lwr/managers/base/base_drmaa.py index 054063a9..ae91c62b 100644 --- a/lwr/managers/base/base_drmaa.py +++ b/lwr/managers/base/base_drmaa.py @@ -37,12 +37,12 @@ class BaseDrmaaManager(ExternalBaseManager): JobState.FAILED: 'complete', # Should be a FAILED state here as well }[drmaa_state] - def _build_template_attributes(self, job_id, command_line): + def _build_template_attributes(self, job_id, command_line, requirements=[]): stdout_path = self._stdout_path(job_id) stderr_path = self._stderr_path(job_id) attributes = { - "remoteCommand": self._setup_job_file(job_id, command_line), + "remoteCommand": self._setup_job_file(job_id, command_line, requirements=requirements), "jobName": self._job_name(job_id), "outputPath": ":%s" % stdout_path, "errorPath": ":%s" % stderr_path, diff --git a/lwr/managers/base/external.py b/lwr/managers/base/external.py index c5df8e64..5d4758b8 100644 --- a/lwr/managers/base/external.py +++ b/lwr/managers/base/external.py @@ -36,7 +36,8 @@ class ExternalBaseManager(DirectoryBaseManager): raise KeyError("Failed to find external id for job_id %s" % job_id) return self._get_status_external(external_id) - def _setup_job_file(self, job_id, command_line): + def _setup_job_file(self, job_id, command_line, requirements=[]): + command_line = self._expand_command_line(command_line, requirements) script_env = self._job_template_env(job_id, command_line=command_line) script = job_script(**script_env) return self._write_job_script(job_id, script) diff --git a/lwr/managers/queued.py b/lwr/managers/queued.py index d926dc0c..02ad47fa 100644 --- a/lwr/managers/queued.py +++ b/lwr/managers/queued.py @@ -47,8 +47,8 @@ class QueueManager(Manager): worker.start() self.work_threads.append(worker) - def launch(self, job_id, command_line, submit_params={}): - self._prepare_run(job_id, command_line) + def launch(self, job_id, command_line, submit_params={}, requirements=[]): + command_line = self._prepare_run(job_id, command_line, requirements=requirements) self.work_queue.put((RUN, (job_id, command_line))) self.persisted_job_store.enqueue(job_id, command_line) diff --git a/lwr/managers/queued_cli.py b/lwr/managers/queued_cli.py index 3a82c3d8..dd780373 100644 --- a/lwr/managers/queued_cli.py +++ b/lwr/managers/queued_cli.py @@ -20,7 +20,7 @@ class CliQueueManager(ExternalBaseManager): self.cli_interface = CliInterface(code_dir='.') self.shell_params, self.job_params = split_params(kwds) - def launch(self, job_id, command_line, submit_params={}): + def launch(self, job_id, command_line, submit_params={}, requirements=[]): self._check_execution_with_tool_file(job_id, command_line) shell, job_interface = self.__get_cli_plugins() return_code_path = self._return_code_path(job_id) @@ -28,6 +28,7 @@ class CliQueueManager(ExternalBaseManager): stderr_path = self._stderr_path(job_id) job_name = self._job_name(job_id) working_directory = self.working_directory(job_id) + command_line = self._expand_command_line(command_line, requirements) script = job_interface.get_job_template(stdout_path, stderr_path, job_name, working_directory, command_line, return_code_path) script_path = self._write_job_script(job_id, script) submission_command = job_interface.submit(script_path) diff --git a/lwr/managers/queued_condor.py b/lwr/managers/queued_condor.py index e9258120..8d1b3d2d 100644 --- a/lwr/managers/queued_condor.py +++ b/lwr/managers/queued_condor.py @@ -25,9 +25,9 @@ class CondorQueueManager(ExternalBaseManager): self.user_log_sizes = {} self.state_cache = {} - def launch(self, job_id, command_line, submit_params={}): + def launch(self, job_id, command_line, submit_params={}, requirements=[]): self._check_execution_with_tool_file(job_id, command_line) - job_file_path = self._setup_job_file(job_id, command_line) + job_file_path = self._setup_job_file(job_id, command_line, requirements=requirements) log_path = self.__condor_user_log(job_id) open(log_path, 'w') # Touch log file build_submit_params = dict( diff --git a/lwr/managers/queued_drmaa.py b/lwr/managers/queued_drmaa.py index 7b723ea8..ef946fce 100644 --- a/lwr/managers/queued_drmaa.py +++ b/lwr/managers/queued_drmaa.py @@ -7,9 +7,9 @@ class DrmaaQueueManager(BaseDrmaaManager): """ manager_type = "queued_drmaa" - def launch(self, job_id, command_line, submit_params={}): + def launch(self, job_id, command_line, submit_params={}, requirements=[]): self._check_execution_with_tool_file(job_id, command_line) - attributes = self._build_template_attributes(job_id, command_line) + attributes = self._build_template_attributes(job_id, command_line, requirements=requirements) external_id = self.drmaa_session.run_job(**attributes) self._register_external_id(job_id, external_id) diff --git a/lwr/managers/queued_external_drmaa.py b/lwr/managers/queued_external_drmaa.py index 2bac0af0..9ad769d9 100644 --- a/lwr/managers/queued_external_drmaa.py +++ b/lwr/managers/queued_external_drmaa.py @@ -27,9 +27,10 @@ class ExternalDrmaaQueueManager(BaseDrmaaManager): self.reclaimed = {} self.user_map = {} - def launch(self, job_id, command_line, submit_params={}): + def launch(self, job_id, command_line, submit_params={}, requirements=[]): self._check_execution_with_tool_file(job_id, command_line) - attributes = self._build_template_attributes(job_id, command_line) + attributes = self._build_template_attributes(job_id, command_line, requirements=requirements) + print open(attributes['remoteCommand'], 'r').read() job_attributes_file = self._write_job_file(job_id, 'jt.json', dumps(attributes)) user = submit_params.get('user', None) log.info("Submit as user %s" % user) diff --git a/lwr/managers/unqueued.py b/lwr/managers/unqueued.py index 96113e5e..b6c582fe 100644 --- a/lwr/managers/unqueued.py +++ b/lwr/managers/unqueued.py @@ -170,12 +170,14 @@ class Manager(DirectoryBaseManager): else: self._monitor_execution(job_id, proc, stdout, stderr) - def launch(self, job_id, command_line, submit_params={}): - self._prepare_run(job_id, command_line) + def launch(self, job_id, command_line, submit_params={}, requirements=[]): + command_line = self._prepare_run(job_id, command_line, requirements=requirements) self._run(job_id, command_line) - def _prepare_run(self, job_id, command_line): + def _prepare_run(self, job_id, command_line, requirements): self._check_execution_with_tool_file(job_id, command_line) self._record_submission(job_id) + command_line = self._expand_command_line(command_line, requirements) + return command_line __all__ = [Manager] diff --git a/lwr/routes.py b/lwr/routes.py index 90988b7a..dffddfcc 100644 --- a/lwr/routes.py +++ b/lwr/routes.py @@ -6,6 +6,8 @@ from lwr.util import get_mapped_file, copy_to_path, copy_to_temp, verify_is_in_d from lwr.framework import Controller from lwr.manager_factory import DEFAULT_MANAGER_NAME +from galaxy.tools.deps.requirements import ToolRequirement + import logging log = logging.getLogger(__name__) @@ -56,9 +58,10 @@ def clean(manager, job_id): @LwrController() -def launch(manager, job_id, command_line, params='{}'): +def launch(manager, job_id, command_line, params='{}', requirements='[]'): submit_params = loads(params) - manager.launch(job_id, command_line, submit_params) + requirements = [ToolRequirement.from_dict(requirement_dict) for requirement_dict in loads(requirements)] + manager.launch(job_id, command_line, submit_params, requirements) @LwrController(response_type='json') diff --git a/test/check.py b/test/check.py index e59558ab..3b6516d1 100644 --- a/test/check.py +++ b/test/check.py @@ -7,14 +7,18 @@ import traceback from io import open from lwr.lwr_client import submit_job, finish_job, ClientManager, ClientJobDescription +from galaxy.tools.deps.requirements import ToolRequirement TEST_SCRIPT = b""" import sys +from os import getenv + config_input = open(sys.argv[1], 'r') input_input = open(sys.argv[2], 'r') output = open(sys.argv[3], 'w') output2 = open(sys.argv[5], 'w') output2_contents = sys.argv[6] +output3 = open(sys.argv[7], 'w') try: assert input_input.read() == "Hello world input!!@!" contents = config_input.read(1024) @@ -22,14 +26,17 @@ try: open("workdir_output", "w").write("WORK DIR OUTPUT") output2.write(output2_contents) with open("galaxy.json", "w") as f: f.write("GALAXY_JSON") + output3.write(getenv("MOO", "moo_default")) finally: output.close() config_input.close() output2.close() + output3.close() """ EXPECTED_OUTPUT = b"hello world output" EXAMPLE_UNICODE_TEXT = u'єχαмÏâ„“Ñ” συтÏÏ…Ñ‚' +TEST_REQUIREMENT = ToolRequirement(name="dep1", version="1.1", type="package") class MockTool(object): @@ -53,6 +60,7 @@ def run(options): temp_tool_path = os.path.join(temp_directory, "t", "script.py") temp_output_path = os.path.join(temp_directory, "output") temp_output2_path = os.path.join(temp_directory, "output2") + temp_output3_path = os.path.join(temp_directory, "output3") __write_to_file(temp_input_path, b"Hello world input!!@!") __write_to_file(temp_config_path, EXPECTED_OUTPUT) @@ -67,12 +75,18 @@ def run(options): empty_input, temp_output2_path, EXAMPLE_UNICODE_TEXT, + temp_output3_path, ) - command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params + command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params config_files = [temp_config_path] input_files = [temp_input_path, empty_input] - output_files = [temp_output_path, temp_output2_path] + output_files = [temp_output_path, temp_output2_path, temp_output3_path] client = __client(options) + requirements = [] + test_requirement = options.get("test_requirement", False) + if test_requirement: + requirements.append(TEST_REQUIREMENT) + job_description = ClientJobDescription( command_line=command_line, tool=MockTool(temp_tool_dir), @@ -80,6 +94,7 @@ def run(options): input_files=input_files, output_files=output_files, working_directory=temp_work_dir, + requirements=requirements, ) submit_job(client, job_description) result_status = client.wait() @@ -94,10 +109,13 @@ def run(options): ) failed = finish_job(**finish_args) if failed: - raise Exception("Failed to finish job correctly") + raise Exception("Failed to finish job correctly - %s" % result_status) __check_outputs(temp_output_path, temp_output2_path) __assert_contents(os.path.join(temp_work_dir, "galaxy.json"), b"GALAXY_JSON") - + if test_requirement: + __assert_contents(temp_output3_path, "moo_override") + else: + __assert_contents(temp_output3_path, "moo_default") __exercise_errors(options, client, temp_output_path, temp_directory) except BaseException: if not options.suppress_output: @@ -115,11 +133,11 @@ def __check_outputs(temp_output_path, temp_output2_path): __assert_contents(temp_output2_path, EXAMPLE_UNICODE_TEXT) -def __assert_contents(path, contents): - file = open(path, 'rb') +def __assert_contents(path, expected_contents): + file = open(path, 'r', encoding="utf-8") try: contents = file.read() - assert contents == contents, "Invalid contents: %s" % contents + assert contents == expected_contents, "Invalid contents: %s" % contents finally: file.close() diff --git a/test/integration_test.py b/test/integration_test.py index cf526e88..159fed98 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -1,4 +1,5 @@ from os.path import join +from os import makedirs, system from six import next, itervalues from .test_utils import TempDirectoryTestCase, skipUnlessExecutable, skipUnlessModule @@ -15,19 +16,12 @@ class BaseIntegrationTest(TempDirectoryTestCase): def _run(self, app_conf={}, job_conf_props={}, **kwds): app_conf = app_conf.copy() job_conf_props = job_conf_props.copy() + if "suppress_output" not in kwds: kwds["suppress_output"] = False - if job_conf_props: - job_conf = join(self.temp_directory, "job_managers.ini") - config = ConfigParser() - section_name = "manager:_default_" - config.add_section(section_name) - for key, value in job_conf_props.iteritems(): - config.set(section_name, key, value) - with open(job_conf, "wb") as configf: - config.write(configf) - app_conf["job_managers_config"] = job_conf + self.__setup_job_properties(app_conf, job_conf_props) + self.__setup_dependencies(app_conf) if kwds.get("direct_interface", None): from .test_utils import test_app @@ -40,9 +34,41 @@ class BaseIntegrationTest(TempDirectoryTestCase): options = Bunch(url=server.application_url, **kwds) run(options) + def __setup_job_properties(self, app_conf, job_conf_props): + if job_conf_props: + job_conf = join(self.temp_directory, "job_managers.ini") + config = ConfigParser() + section_name = "manager:_default_" + config.add_section(section_name) + for key, value in job_conf_props.iteritems(): + config.set(section_name, key, value) + with open(job_conf, "wb") as configf: + config.write(configf) + + app_conf["job_managers_config"] = job_conf + + def __setup_dependencies(self, app_conf): + dependencies_dir = join(self.temp_directory, "dependencies") + dep1_directory = join(dependencies_dir, "dep1", "1.1") + makedirs(dep1_directory) + try: + # Let external users read/execute this directory for run as user + # test. + system("chmod 755 %s" % self.temp_directory) + system("chmod -R 755 %s" % dependencies_dir) + except Exception as e: + print e + env_file = join(dep1_directory, "env.sh") + with open(env_file, "w") as env: + env.write("MOO=moo_override; export MOO") + app_conf["tool_dependency_dir"] = dependencies_dir + class IntegrationTests(BaseIntegrationTest): - default_kwargs = dict(direct_interface=False) + default_kwargs = dict(direct_interface=False, test_requirement=True) + + def test_integration_no_requirement(self): + self._run(private_token=None, **self.default_kwargs) @skipUnlessModule("drmaa") def test_integration_as_user(self): @@ -85,4 +111,4 @@ class IntegrationTests(BaseIntegrationTest): class DirectIntegrationTests(IntegrationTests): - default_kwargs = dict(direct_interface=True) + default_kwargs = dict(direct_interface=True, test_requirement=False) -- GitLab