diff --git a/galaxy/tools/deps/requirements.py b/galaxy/tools/deps/requirements.py index 8f39d9788be77ef5ee11d08fa75fa2503dbfc1c1..cf765ef76c80f76dedf75f01446157939d47249b 100644 --- a/galaxy/tools/deps/requirements.py +++ b/galaxy/tools/deps/requirements.py @@ -15,6 +15,16 @@ class ToolRequirement( object ): self.type = type self.version = version + def to_dict( self ): + return dict(name=self.name, type=self.type, version=self.version) + + @staticmethod + def from_dict( dict ): + version = dict.get( "version", None ) + name = dict.get("name", None) + type = dict.get("type", None) + return ToolRequirement( name=name, type=type, version=version ) + def parse_requirements_from_xml( xml_root ): """ diff --git a/lwr/app.py b/lwr/app.py index fd8217bd5bbbddc5cb5d37e055bbd473262a94cb..87ba4dd04eb0ad2485b16c71871f6fa228ce8352 100644 --- a/lwr/app.py +++ b/lwr/app.py @@ -13,6 +13,7 @@ from lwr.tools.authorization import get_authorizer from lwr.util.bunch import Bunch import lwr.routes from galaxy.objectstore import build_object_store_from_config +from galaxy.tools.deps import DependencyManager from logging import getLogger log = getLogger(__name__) @@ -48,10 +49,11 @@ class LwrApp(RoutingApp): self.__setup_private_key(conf.get("private_key", DEFAULT_PRIVATE_KEY)) self.__setup_persistence_directory(conf.get("persistence_directory", None)) self.__setup_tool_config(conf) + self.__setup_object_store(conf) + self.__setup_dependency_manager(conf) self.__setup_managers(conf) self.__setup_file_cache(conf) self.__setup_routes() - self.__setup_object_store(conf) def shutdown(self): for manager in self.managers.values(): @@ -114,6 +116,11 @@ class LwrApp(RoutingApp): ) self.object_store = build_object_store_from_config(object_store_config) + def __setup_dependency_manager(self, conf): + dependencies_dir = conf.get("tool_dependency_dir", "dependencies") + resolvers_config_file = conf.get("dependency_resolvers_config_file", "dependency_resolvers_conf.xml") + self.dependency_manager = DependencyManager(dependencies_dir, resolvers_config_file) + def __add_route_for_function(self, function): route_suffix = '/%s' % function.__name__ # Default or old-style route without explicit manager specified, diff --git a/lwr/lwr_client/client.py b/lwr/lwr_client/client.py index 7b6ebd5991ca22d8e09d70d6fe85a544a0a291a6..08380f5f49817184cc797196fb042b9a810f0352 100644 --- a/lwr/lwr_client/client.py +++ b/lwr/lwr_client/client.py @@ -197,7 +197,7 @@ class JobClient(object): } self._raw_execute("download_output", output_params, output_path=output_path) - def launch(self, command_line): + def launch(self, command_line, requirements=[]): """ Run or queue up the execution of the supplied `command_line` on the remote server. @@ -211,6 +211,8 @@ class JobClient(object): submit_params = self._submit_params if submit_params: launch_params['params'] = dumps(submit_params) + if requirements: + launch_params['requirements'] = dumps([requirement.to_dict() for requirement in requirements]) return self._raw_execute("launch", launch_params) def kill(self): diff --git a/lwr/lwr_client/stager.py b/lwr/lwr_client/stager.py index d06b31daaafbd90d728712d1b7240d8483be84b0..d716083388a80af6b69023f9871992af5a3b8b7b 100644 --- a/lwr/lwr_client/stager.py +++ b/lwr/lwr_client/stager.py @@ -350,7 +350,7 @@ def submit_job(client, client_job_description, job_config=None): file_stager = FileStager(client, client_job_description, job_config) rebuilt_command_line = file_stager.get_rewritten_command_line() job_id = file_stager.job_id - client.launch(rebuilt_command_line) + client.launch(rebuilt_command_line, requirements=client_job_description.requirements) return job_id @@ -383,15 +383,17 @@ class ClientJobDescription(object): Directory containing tool to execute (if a wrapper is used, it will be transferred to remote server). working_directory : str Local path created by Galaxy for running this job. + requirements : list + List of requirements for tool execution. """ - def __init__(self, tool, command_line, config_files, input_files, output_files, working_directory): + def __init__(self, tool, command_line, config_files, input_files, output_files, working_directory, requirements): self.tool = tool self.command_line = command_line self.config_files = config_files self.input_files = input_files self.output_files = output_files self.working_directory = working_directory - + self.requirements = requirements __all__ = [submit_job, ClientJobDescription, finish_job] diff --git a/lwr/managers/base/__init__.py b/lwr/managers/base/__init__.py index ead775d7a2fc038c1e26e31b21f27f8fe32a0d54..6192f239ebe3542c78946b1385ac35ca268da73f 100644 --- a/lwr/managers/base/__init__.py +++ b/lwr/managers/base/__init__.py @@ -50,6 +50,7 @@ class BaseManager(ManagerInterface): self.debug = str(kwds.get("debug", False)).lower() == "true" self.authorizer = app.authorizer self.__init_system_properties() + self.dependency_manager = app.dependency_manager def clean(self, job_id): if self.debug: @@ -157,3 +158,9 @@ class BaseManager(ManagerInterface): path = join(config_files_dir, file) authorization.authorize_config_file(job_directory, file, path) authorization.authorize_execution(job_directory, command_line) + + def _expand_command_line(self, command_line, requirements): + dependency_commands = self.dependency_manager.dependency_shell_commands(requirements) + if dependency_commands: + command_line = "%s; %s" % ("; ".join(dependency_commands), command_line) + return command_line diff --git a/lwr/managers/base/base_drmaa.py b/lwr/managers/base/base_drmaa.py index 054063a91ee7271e066fce01bec494ed7044f026..ae91c62bbb8a8e9ad43a5a34fb03afc3560aa01c 100644 --- a/lwr/managers/base/base_drmaa.py +++ b/lwr/managers/base/base_drmaa.py @@ -37,12 +37,12 @@ class BaseDrmaaManager(ExternalBaseManager): JobState.FAILED: 'complete', # Should be a FAILED state here as well }[drmaa_state] - def _build_template_attributes(self, job_id, command_line): + def _build_template_attributes(self, job_id, command_line, requirements=[]): stdout_path = self._stdout_path(job_id) stderr_path = self._stderr_path(job_id) attributes = { - "remoteCommand": self._setup_job_file(job_id, command_line), + "remoteCommand": self._setup_job_file(job_id, command_line, requirements=requirements), "jobName": self._job_name(job_id), "outputPath": ":%s" % stdout_path, "errorPath": ":%s" % stderr_path, diff --git a/lwr/managers/base/external.py b/lwr/managers/base/external.py index c5df8e64c0d70ee2ac4998a0b78dc32fb05884c2..5d4758b89ebd9e1f95ae3662cc63ab16881e7298 100644 --- a/lwr/managers/base/external.py +++ b/lwr/managers/base/external.py @@ -36,7 +36,8 @@ class ExternalBaseManager(DirectoryBaseManager): raise KeyError("Failed to find external id for job_id %s" % job_id) return self._get_status_external(external_id) - def _setup_job_file(self, job_id, command_line): + def _setup_job_file(self, job_id, command_line, requirements=[]): + command_line = self._expand_command_line(command_line, requirements) script_env = self._job_template_env(job_id, command_line=command_line) script = job_script(**script_env) return self._write_job_script(job_id, script) diff --git a/lwr/managers/queued.py b/lwr/managers/queued.py index d926dc0c3d40981c93b57640803118ebab211cb9..02ad47fa45ad6555da3440ff854b289de931866a 100644 --- a/lwr/managers/queued.py +++ b/lwr/managers/queued.py @@ -47,8 +47,8 @@ class QueueManager(Manager): worker.start() self.work_threads.append(worker) - def launch(self, job_id, command_line, submit_params={}): - self._prepare_run(job_id, command_line) + def launch(self, job_id, command_line, submit_params={}, requirements=[]): + command_line = self._prepare_run(job_id, command_line, requirements=requirements) self.work_queue.put((RUN, (job_id, command_line))) self.persisted_job_store.enqueue(job_id, command_line) diff --git a/lwr/managers/queued_cli.py b/lwr/managers/queued_cli.py index 3a82c3d8a84db65d0b7bd1b40d920e6122fc192a..dd7803735a3822ac902f05aff21ce8dd7bb71468 100644 --- a/lwr/managers/queued_cli.py +++ b/lwr/managers/queued_cli.py @@ -20,7 +20,7 @@ class CliQueueManager(ExternalBaseManager): self.cli_interface = CliInterface(code_dir='.') self.shell_params, self.job_params = split_params(kwds) - def launch(self, job_id, command_line, submit_params={}): + def launch(self, job_id, command_line, submit_params={}, requirements=[]): self._check_execution_with_tool_file(job_id, command_line) shell, job_interface = self.__get_cli_plugins() return_code_path = self._return_code_path(job_id) @@ -28,6 +28,7 @@ class CliQueueManager(ExternalBaseManager): stderr_path = self._stderr_path(job_id) job_name = self._job_name(job_id) working_directory = self.working_directory(job_id) + command_line = self._expand_command_line(command_line, requirements) script = job_interface.get_job_template(stdout_path, stderr_path, job_name, working_directory, command_line, return_code_path) script_path = self._write_job_script(job_id, script) submission_command = job_interface.submit(script_path) diff --git a/lwr/managers/queued_condor.py b/lwr/managers/queued_condor.py index e92581202e3a9a350cfa356c3e7cc4ed669b1f22..8d1b3d2ddfe940e2105640053664ed78d4a8c527 100644 --- a/lwr/managers/queued_condor.py +++ b/lwr/managers/queued_condor.py @@ -25,9 +25,9 @@ class CondorQueueManager(ExternalBaseManager): self.user_log_sizes = {} self.state_cache = {} - def launch(self, job_id, command_line, submit_params={}): + def launch(self, job_id, command_line, submit_params={}, requirements=[]): self._check_execution_with_tool_file(job_id, command_line) - job_file_path = self._setup_job_file(job_id, command_line) + job_file_path = self._setup_job_file(job_id, command_line, requirements=requirements) log_path = self.__condor_user_log(job_id) open(log_path, 'w') # Touch log file build_submit_params = dict( diff --git a/lwr/managers/queued_drmaa.py b/lwr/managers/queued_drmaa.py index 7b723ea8bc79e65a1954b122852d26f97a1bae56..ef946fce2cbb304c7b7a64ae57cb1ff6102dd400 100644 --- a/lwr/managers/queued_drmaa.py +++ b/lwr/managers/queued_drmaa.py @@ -7,9 +7,9 @@ class DrmaaQueueManager(BaseDrmaaManager): """ manager_type = "queued_drmaa" - def launch(self, job_id, command_line, submit_params={}): + def launch(self, job_id, command_line, submit_params={}, requirements=[]): self._check_execution_with_tool_file(job_id, command_line) - attributes = self._build_template_attributes(job_id, command_line) + attributes = self._build_template_attributes(job_id, command_line, requirements=requirements) external_id = self.drmaa_session.run_job(**attributes) self._register_external_id(job_id, external_id) diff --git a/lwr/managers/queued_external_drmaa.py b/lwr/managers/queued_external_drmaa.py index 2bac0af08c55fb9baecb1f1b2d2fc1c948129c32..9ad769d9d332af94c76f38bdcd123478e9845989 100644 --- a/lwr/managers/queued_external_drmaa.py +++ b/lwr/managers/queued_external_drmaa.py @@ -27,9 +27,10 @@ class ExternalDrmaaQueueManager(BaseDrmaaManager): self.reclaimed = {} self.user_map = {} - def launch(self, job_id, command_line, submit_params={}): + def launch(self, job_id, command_line, submit_params={}, requirements=[]): self._check_execution_with_tool_file(job_id, command_line) - attributes = self._build_template_attributes(job_id, command_line) + attributes = self._build_template_attributes(job_id, command_line, requirements=requirements) + print open(attributes['remoteCommand'], 'r').read() job_attributes_file = self._write_job_file(job_id, 'jt.json', dumps(attributes)) user = submit_params.get('user', None) log.info("Submit as user %s" % user) diff --git a/lwr/managers/unqueued.py b/lwr/managers/unqueued.py index 96113e5ee00817ae1fd7764ddacac98e9c0074ba..b6c582feabde98f44f19e34cd44f26699e83cf48 100644 --- a/lwr/managers/unqueued.py +++ b/lwr/managers/unqueued.py @@ -170,12 +170,14 @@ class Manager(DirectoryBaseManager): else: self._monitor_execution(job_id, proc, stdout, stderr) - def launch(self, job_id, command_line, submit_params={}): - self._prepare_run(job_id, command_line) + def launch(self, job_id, command_line, submit_params={}, requirements=[]): + command_line = self._prepare_run(job_id, command_line, requirements=requirements) self._run(job_id, command_line) - def _prepare_run(self, job_id, command_line): + def _prepare_run(self, job_id, command_line, requirements): self._check_execution_with_tool_file(job_id, command_line) self._record_submission(job_id) + command_line = self._expand_command_line(command_line, requirements) + return command_line __all__ = [Manager] diff --git a/lwr/routes.py b/lwr/routes.py index 90988b7af3046c2fcbffb0e9e2a35d424412b8f7..dffddfcc69d90879225717e59f61e8bb2f6ed0dd 100644 --- a/lwr/routes.py +++ b/lwr/routes.py @@ -6,6 +6,8 @@ from lwr.util import get_mapped_file, copy_to_path, copy_to_temp, verify_is_in_d from lwr.framework import Controller from lwr.manager_factory import DEFAULT_MANAGER_NAME +from galaxy.tools.deps.requirements import ToolRequirement + import logging log = logging.getLogger(__name__) @@ -56,9 +58,10 @@ def clean(manager, job_id): @LwrController() -def launch(manager, job_id, command_line, params='{}'): +def launch(manager, job_id, command_line, params='{}', requirements='[]'): submit_params = loads(params) - manager.launch(job_id, command_line, submit_params) + requirements = [ToolRequirement.from_dict(requirement_dict) for requirement_dict in loads(requirements)] + manager.launch(job_id, command_line, submit_params, requirements) @LwrController(response_type='json') diff --git a/test/check.py b/test/check.py index e59558ab75f257a33df89f482efb6e4b291bd830..3b6516d1b674f9711a77d39ebb70371c6bcdc349 100644 --- a/test/check.py +++ b/test/check.py @@ -7,14 +7,18 @@ import traceback from io import open from lwr.lwr_client import submit_job, finish_job, ClientManager, ClientJobDescription +from galaxy.tools.deps.requirements import ToolRequirement TEST_SCRIPT = b""" import sys +from os import getenv + config_input = open(sys.argv[1], 'r') input_input = open(sys.argv[2], 'r') output = open(sys.argv[3], 'w') output2 = open(sys.argv[5], 'w') output2_contents = sys.argv[6] +output3 = open(sys.argv[7], 'w') try: assert input_input.read() == "Hello world input!!@!" contents = config_input.read(1024) @@ -22,14 +26,17 @@ try: open("workdir_output", "w").write("WORK DIR OUTPUT") output2.write(output2_contents) with open("galaxy.json", "w") as f: f.write("GALAXY_JSON") + output3.write(getenv("MOO", "moo_default")) finally: output.close() config_input.close() output2.close() + output3.close() """ EXPECTED_OUTPUT = b"hello world output" EXAMPLE_UNICODE_TEXT = u'єχαмÏâ„“Ñ” συтÏÏ…Ñ‚' +TEST_REQUIREMENT = ToolRequirement(name="dep1", version="1.1", type="package") class MockTool(object): @@ -53,6 +60,7 @@ def run(options): temp_tool_path = os.path.join(temp_directory, "t", "script.py") temp_output_path = os.path.join(temp_directory, "output") temp_output2_path = os.path.join(temp_directory, "output2") + temp_output3_path = os.path.join(temp_directory, "output3") __write_to_file(temp_input_path, b"Hello world input!!@!") __write_to_file(temp_config_path, EXPECTED_OUTPUT) @@ -67,12 +75,18 @@ def run(options): empty_input, temp_output2_path, EXAMPLE_UNICODE_TEXT, + temp_output3_path, ) - command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params + command_line = u'python %s "%s" "%s" "%s" "%s" "%s" "%s" "%s"' % command_line_params config_files = [temp_config_path] input_files = [temp_input_path, empty_input] - output_files = [temp_output_path, temp_output2_path] + output_files = [temp_output_path, temp_output2_path, temp_output3_path] client = __client(options) + requirements = [] + test_requirement = options.get("test_requirement", False) + if test_requirement: + requirements.append(TEST_REQUIREMENT) + job_description = ClientJobDescription( command_line=command_line, tool=MockTool(temp_tool_dir), @@ -80,6 +94,7 @@ def run(options): input_files=input_files, output_files=output_files, working_directory=temp_work_dir, + requirements=requirements, ) submit_job(client, job_description) result_status = client.wait() @@ -94,10 +109,13 @@ def run(options): ) failed = finish_job(**finish_args) if failed: - raise Exception("Failed to finish job correctly") + raise Exception("Failed to finish job correctly - %s" % result_status) __check_outputs(temp_output_path, temp_output2_path) __assert_contents(os.path.join(temp_work_dir, "galaxy.json"), b"GALAXY_JSON") - + if test_requirement: + __assert_contents(temp_output3_path, "moo_override") + else: + __assert_contents(temp_output3_path, "moo_default") __exercise_errors(options, client, temp_output_path, temp_directory) except BaseException: if not options.suppress_output: @@ -115,11 +133,11 @@ def __check_outputs(temp_output_path, temp_output2_path): __assert_contents(temp_output2_path, EXAMPLE_UNICODE_TEXT) -def __assert_contents(path, contents): - file = open(path, 'rb') +def __assert_contents(path, expected_contents): + file = open(path, 'r', encoding="utf-8") try: contents = file.read() - assert contents == contents, "Invalid contents: %s" % contents + assert contents == expected_contents, "Invalid contents: %s" % contents finally: file.close() diff --git a/test/integration_test.py b/test/integration_test.py index cf526e88e77c0f4d43656e65e8321c73e578bc35..159fed9837f9c0b60b7ab599bd41df2263916c0b 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -1,4 +1,5 @@ from os.path import join +from os import makedirs, system from six import next, itervalues from .test_utils import TempDirectoryTestCase, skipUnlessExecutable, skipUnlessModule @@ -15,19 +16,12 @@ class BaseIntegrationTest(TempDirectoryTestCase): def _run(self, app_conf={}, job_conf_props={}, **kwds): app_conf = app_conf.copy() job_conf_props = job_conf_props.copy() + if "suppress_output" not in kwds: kwds["suppress_output"] = False - if job_conf_props: - job_conf = join(self.temp_directory, "job_managers.ini") - config = ConfigParser() - section_name = "manager:_default_" - config.add_section(section_name) - for key, value in job_conf_props.iteritems(): - config.set(section_name, key, value) - with open(job_conf, "wb") as configf: - config.write(configf) - app_conf["job_managers_config"] = job_conf + self.__setup_job_properties(app_conf, job_conf_props) + self.__setup_dependencies(app_conf) if kwds.get("direct_interface", None): from .test_utils import test_app @@ -40,9 +34,41 @@ class BaseIntegrationTest(TempDirectoryTestCase): options = Bunch(url=server.application_url, **kwds) run(options) + def __setup_job_properties(self, app_conf, job_conf_props): + if job_conf_props: + job_conf = join(self.temp_directory, "job_managers.ini") + config = ConfigParser() + section_name = "manager:_default_" + config.add_section(section_name) + for key, value in job_conf_props.iteritems(): + config.set(section_name, key, value) + with open(job_conf, "wb") as configf: + config.write(configf) + + app_conf["job_managers_config"] = job_conf + + def __setup_dependencies(self, app_conf): + dependencies_dir = join(self.temp_directory, "dependencies") + dep1_directory = join(dependencies_dir, "dep1", "1.1") + makedirs(dep1_directory) + try: + # Let external users read/execute this directory for run as user + # test. + system("chmod 755 %s" % self.temp_directory) + system("chmod -R 755 %s" % dependencies_dir) + except Exception as e: + print e + env_file = join(dep1_directory, "env.sh") + with open(env_file, "w") as env: + env.write("MOO=moo_override; export MOO") + app_conf["tool_dependency_dir"] = dependencies_dir + class IntegrationTests(BaseIntegrationTest): - default_kwargs = dict(direct_interface=False) + default_kwargs = dict(direct_interface=False, test_requirement=True) + + def test_integration_no_requirement(self): + self._run(private_token=None, **self.default_kwargs) @skipUnlessModule("drmaa") def test_integration_as_user(self): @@ -85,4 +111,4 @@ class IntegrationTests(BaseIntegrationTest): class DirectIntegrationTests(IntegrationTests): - default_kwargs = dict(direct_interface=True) + default_kwargs = dict(direct_interface=True, test_requirement=False)