diff --git a/local_env.sh.sample b/local_env.sh.sample index 6eb9b4085b2e921917a7a18ed0ba5d9f521ea84c..4a27ee33ad0e9cb23371e5f90a5afb38cb3275cc 100644 --- a/local_env.sh.sample +++ b/local_env.sh.sample @@ -7,7 +7,11 @@ ## If you wish to use a variety of Galaxy tools that depend on galaxy.eggs being defined, ## set GALAXY_HOME to point to a copy of Galaxy. -#export GALAXY_HOME=/path/to/galaxy-dist +#export GALAXY_HOME=/path/to/galaxy + +## In order to set metadata on the pulsar side, Pulsar needs to know where +## Galaxy's virtualenv is. If not using remote metadata, this is unnecessary. +#export GALAXY_VIRTUAL_ENV=/path/to/galaxy/.venv ## Uncomment to verify GALAXY_HOME is set properly before starting the Pulsar. #export TEST_GALAXY_LIBS=1 diff --git a/pulsar/client/client.py b/pulsar/client/client.py index e3af9783fd475c78eb42362bad96cad93af8a52b..ae6f24d280fd1be0ad34e6c65fc797d15867d516 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -63,7 +63,7 @@ class BaseJobClient(object): self.setup_handler = build_setup_handler(self, destination_params) - def setup(self, tool_id=None, tool_version=None): + def setup(self, tool_id=None, tool_version=None, preserve_galaxy_python_environment=None): """ Setup remote Pulsar server to run this job. """ @@ -72,6 +72,8 @@ class BaseJobClient(object): setup_args["tool_id"] = tool_id if tool_version: setup_args["tool_version"] = tool_version + if preserve_galaxy_python_environment: + setup_args["preserve_galaxy_python_environment"] = preserve_galaxy_python_environment return self.setup_handler.setup(**setup_args) @property @@ -394,9 +396,11 @@ def _setup_params_from_job_config(job_config): job_id = job_config.get("job_id", None) tool_id = job_config.get("tool_id", None) tool_version = job_config.get("tool_version", None) + preserve_galaxy_python_environment = job_config.get("preserve_galaxy_python_environment", None) return dict( job_id=job_id, tool_id=tool_id, tool_version=tool_version, use_metadata=True, + preserve_galaxy_python_environment=preserve_galaxy_python_environment, ) diff --git a/pulsar/client/setup_handler.py b/pulsar/client/setup_handler.py index 8147579499c37cfb22d0d9692131959c3ef0a350..e0fe6f9398f3d9a3bfbf05cd415eefec42b4fab9 100644 --- a/pulsar/client/setup_handler.py +++ b/pulsar/client/setup_handler.py @@ -39,13 +39,14 @@ class LocalSetupHandler(object): self.system_properties = system_properties self.jobs_directory = destination_args["jobs_directory"] - def setup(self, job_id, tool_id=None, tool_version=None): + def setup(self, job_id, tool_id=None, tool_version=None, preserve_galaxy_python_environment=None): return build_job_config( job_id=job_id, job_directory=self.client.job_directory, system_properties=self.system_properties, tool_id=tool_id, tool_version=tool_version, + preserve_galaxy_python_environment=preserve_galaxy_python_environment, ) @property @@ -75,7 +76,7 @@ class RemoteSetupHandler(object): return False -def build_job_config(job_id, job_directory, system_properties={}, tool_id=None, tool_version=None): +def build_job_config(job_id, job_directory, system_properties={}, tool_id=None, tool_version=None, preserve_galaxy_python_environment=None): """ """ inputs_directory = job_directory.inputs_directory() @@ -100,6 +101,7 @@ def build_job_config(job_id, job_directory, system_properties={}, tool_id=None, "job_id": job_id, "system_properties": system_properties, "pulsar_version": pulsar_version, + "preserve_galaxy_python_environment": preserve_galaxy_python_environment, } if tool_id: job_config["tool_id"] = tool_id diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py index f0baadc759309517394ca93b25ff27052c476eeb..f57163d5047b13cd8a48b16e325134730384aa6e 100644 --- a/pulsar/manager_endpoint_util.py +++ b/pulsar/manager_endpoint_util.py @@ -80,7 +80,7 @@ def submit_job(manager, job_config): input_job_id, tool_id, tool_version, - use_metadata + use_metadata, ) if job_config is not None: @@ -98,6 +98,7 @@ def submit_job(manager, job_config): submit_params, dependencies_description=dependencies_description, env=env, + setup_params=setup_params, ) diff --git a/pulsar/managers/__init__.py b/pulsar/managers/__init__.py index eb088171133525c7ab4cf0f1aa83cde7cf5fca3f..68bdc02b1644cc6b4183d811262f218d2e4cdaa0 100644 --- a/pulsar/managers/__init__.py +++ b/pulsar/managers/__init__.py @@ -26,7 +26,7 @@ class ManagerInterface(object): """ @abstractmethod - def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[]): + def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None): """ Called to indicate that the client is ready for this job with specified job id and command line to be executed (i.e. run or queue this job diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py index 91d275703d3abaeee8d3cc977d922e84641a2432..bdda6b14895a5b96e2d94c7209c2d31806042397 100644 --- a/pulsar/managers/base/__init__.py +++ b/pulsar/managers/base/__init__.py @@ -87,6 +87,7 @@ class BaseManager(ManagerInterface): def __init_galaxy_system_properties(self, kwds): self.galaxy_home = kwds.get('galaxy_home', None) + self.galaxy_virtual_env = kwds.get('galaxy_virtual_env', None) self.galaxy_config_file = kwds.get('galaxy_config_file', None) self.galaxy_dataset_files_path = kwds.get('galaxy_dataset_files_path', None) self.galaxy_datatypes_config_file = kwds.get('galaxy_datatypes_config_file', None) @@ -98,6 +99,9 @@ class BaseManager(ManagerInterface): galaxy_home = self._galaxy_home() if galaxy_home: system_properties["galaxy_home"] = galaxy_home + galaxy_virtual_env = self._galaxy_virtual_env() + if galaxy_virtual_env: + system_properties["galaxy_virtual_env"] = galaxy_virtual_env for property in ['galaxy_config_file', 'galaxy_dataset_files_path', 'galaxy_datatypes_config_file']: value = getattr(self, property, None) if value: @@ -116,6 +120,9 @@ class BaseManager(ManagerInterface): def _galaxy_home(self): return self.galaxy_home or getenv('GALAXY_HOME', None) + def _galaxy_virtual_env(self): + return self.galaxy_virtual_env or getenv('GALAXY_VIRTUAL_ENV', None) + def _galaxy_lib(self): galaxy_home = self._galaxy_home() galaxy_lib = None diff --git a/pulsar/managers/base/base_drmaa.py b/pulsar/managers/base/base_drmaa.py index cbd27dcaec873b8336c34779de135d4993711fbe..d81602acbff51a877550d9987b65ba4abde0827a 100644 --- a/pulsar/managers/base/base_drmaa.py +++ b/pulsar/managers/base/base_drmaa.py @@ -51,12 +51,18 @@ class BaseDrmaaManager(ExternalBaseManager): JobState.FAILED: status.COMPLETE, # Should be a FAILED state here as well }[drmaa_state] - def _build_template_attributes(self, job_id, command_line, dependencies_description=None, env=[], submit_params={}): + def _build_template_attributes(self, job_id, command_line, dependencies_description=None, env=[], submit_params={}, setup_params=None): stdout_path = self._stdout_path(job_id) stderr_path = self._stderr_path(job_id) attributes = { - "remoteCommand": self._setup_job_file(job_id, command_line, dependencies_description=dependencies_description, env=env), + "remoteCommand": self._setup_job_file( + job_id, + command_line, + dependencies_description=dependencies_description, + env=env, + setup_params=setup_params + ), "jobName": self._job_name(job_id), "outputPath": ":%s" % stdout_path, "errorPath": ":%s" % stderr_path, diff --git a/pulsar/managers/base/directory.py b/pulsar/managers/base/directory.py index 0463b84c9b0c2ba4de0a091cd2a882418047eece..2bdb52b041ebd5f9fc6cc6af5132577ed1a8514c 100644 --- a/pulsar/managers/base/directory.py +++ b/pulsar/managers/base/directory.py @@ -104,20 +104,23 @@ class DirectoryBaseManager(BaseManager): return tool_id # Helpers methods related to setting up job script files. - def _setup_job_file(self, job_id, command_line, dependencies_description=None, env=[]): + def _setup_job_file(self, job_id, command_line, dependencies_description=None, env=[], setup_params=None): command_line = self._expand_command_line(command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory) - script_env = self._job_template_env(job_id, command_line=command_line, env=env) + script_env = self._job_template_env(job_id, command_line=command_line, env=env, setup_params=setup_params) script = job_script(**script_env) return self._write_job_script(job_id, script) - def _job_template_env(self, job_id, command_line=None, env=[]): + def _job_template_env(self, job_id, command_line=None, env=[], setup_params=None): return_code_path = self._return_code_path(job_id) # TODO: Add option to ignore remote env. env = env + self.env_vars + setup_params = setup_params or {} env_setup_commands = map(env_to_statement, env) job_template_env = { 'job_instrumenter': self.job_metrics.default_job_instrumenter, + 'galaxy_virtual_env': self._galaxy_virtual_env(), 'galaxy_lib': self._galaxy_lib(), + 'preserve_python_environment': setup_params.get('preserve_galaxy_python_environment', False), 'env_setup_commands': env_setup_commands, 'exit_code_path': return_code_path, 'working_directory': self.job_directory(job_id).working_directory(), diff --git a/pulsar/managers/queued.py b/pulsar/managers/queued.py index 5295af15f65e06310fd3702eaab8b6c73d840ebb..ac3758272d6bcd7be6dc9355e1e8bd988534417b 100644 --- a/pulsar/managers/queued.py +++ b/pulsar/managers/queued.py @@ -45,8 +45,14 @@ class QueueManager(Manager): worker.start() self.work_threads.append(worker) - def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[]): - command_line = self._prepare_run(job_id, command_line, dependencies_description=dependencies_description, env=env) + def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None): + command_line = self._prepare_run( + job_id, + command_line, + dependencies_description=dependencies_description, + env=env, + setup_params=setup_params + ) try: self._job_directory(job_id).store_metadata(JOB_FILE_COMMAND_LINE, command_line) except Exception: diff --git a/pulsar/managers/queued_cli.py b/pulsar/managers/queued_cli.py index 016cd6c62e81c99389586854155f4be56c6b84e7..943ee062dc4b3247b45332c2eaff2be6480b5217 100644 --- a/pulsar/managers/queued_cli.py +++ b/pulsar/managers/queued_cli.py @@ -20,14 +20,19 @@ class CliQueueManager(ExternalBaseManager): self.cli_interface = CliInterface(code_dir='.') self.shell_params, self.job_params = split_params(kwds) - def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[]): + def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None): self._check_execution_with_tool_file(job_id, command_line) shell, job_interface = self.__get_cli_plugins() stdout_path = self._stdout_path(job_id) stderr_path = self._stderr_path(job_id) job_name = self._job_name(job_id) command_line = self._expand_command_line(command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory) - job_script_kwargs = self._job_template_env(job_id, command_line=command_line, env=env) + job_script_kwargs = self._job_template_env( + job_id, + command_line=command_line, + env=env, + setup_params=setup_params + ) extra_kwargs = job_interface.job_script_kwargs(stdout_path, stderr_path, job_name) job_script_kwargs.update(extra_kwargs) script = job_script(**job_script_kwargs) diff --git a/pulsar/managers/queued_condor.py b/pulsar/managers/queued_condor.py index 2baa156e20fbc3014a7184f245f018c607e54491..5fb7140a0189f31fa7b2e7c24d888d6d78b269af 100644 --- a/pulsar/managers/queued_condor.py +++ b/pulsar/managers/queued_condor.py @@ -26,9 +26,15 @@ class CondorQueueManager(ExternalBaseManager): self.user_log_sizes = {} self.state_cache = {} - def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[]): + def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None): self._check_execution_with_tool_file(job_id, command_line) - job_file_path = self._setup_job_file(job_id, command_line, dependencies_description=dependencies_description, env=env) + job_file_path = self._setup_job_file( + job_id, + command_line, + dependencies_description=dependencies_description, + env=env, + setup_params=setup_params + ) log_path = self.__condor_user_log(job_id) open(log_path, 'w') # Touch log file build_submit_params = dict( diff --git a/pulsar/managers/queued_drmaa.py b/pulsar/managers/queued_drmaa.py index 0ffc68fd0e61069f1c3c9408677df68de10c5211..d4efb9cc0db96d7f56524cc2e47a0c72d217f854 100644 --- a/pulsar/managers/queued_drmaa.py +++ b/pulsar/managers/queued_drmaa.py @@ -10,7 +10,7 @@ class DrmaaQueueManager(BaseDrmaaManager): """ manager_type = "queued_drmaa" - def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[]): + def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None): self._check_execution_with_tool_file(job_id, command_line) attributes = self._build_template_attributes( job_id, @@ -18,6 +18,7 @@ class DrmaaQueueManager(BaseDrmaaManager): dependencies_description=dependencies_description, env=env, submit_params=submit_params, + setup_params=setup_params, ) external_id = self.drmaa_session.run_job(**attributes) log.info("Submitted DRMAA job with Pulsar job id %s and external id %s", job_id, external_id) diff --git a/pulsar/managers/queued_drmaa_xsede.py b/pulsar/managers/queued_drmaa_xsede.py index aae6d209b80220b8f1b7ef9f9b7c373fa49fab48..7123ff22115b469d632d02b060c562095a92fb98 100644 --- a/pulsar/managers/queued_drmaa_xsede.py +++ b/pulsar/managers/queued_drmaa_xsede.py @@ -16,13 +16,14 @@ class XsedeDrmaaQueueManager(DrmaaQueueManager): """ manager_type = "queued_drmaa_xsede" - def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[]): + def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None): super(XsedeDrmaaQueueManager, self).launch( job_id, command_line, submit_params=submit_params, dependencies_description=dependencies_description, - env=env + env=env, + setup_params=setup_params, ) try: check_call([ diff --git a/pulsar/managers/queued_external_drmaa.py b/pulsar/managers/queued_external_drmaa.py index fa7bbb76649cac1a0469eda29bb880388e6a99f5..9925f13f09d19e499cd1a4ebaef7a16337a4cd83 100644 --- a/pulsar/managers/queued_external_drmaa.py +++ b/pulsar/managers/queued_external_drmaa.py @@ -31,7 +31,7 @@ class ExternalDrmaaQueueManager(BaseDrmaaManager): self.reclaimed = {} self.user_map = {} - def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[]): + def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None): self._check_execution_with_tool_file(job_id, command_line) attributes = self._build_template_attributes( job_id, @@ -39,6 +39,7 @@ class ExternalDrmaaQueueManager(BaseDrmaaManager): dependencies_description=dependencies_description, env=env, submit_params=submit_params, + setup_params=setup_params, ) print(open(attributes['remoteCommand'], 'r').read()) job_attributes_file = self._write_job_file(job_id, 'jt.json', dumps(attributes)) diff --git a/pulsar/managers/unqueued.py b/pulsar/managers/unqueued.py index 525215a58391c8881e826d7a5d79302f24343843..422a85d1a0e55ece4760726ccf26c65e8a57ebf1 100644 --- a/pulsar/managers/unqueued.py +++ b/pulsar/managers/unqueued.py @@ -142,11 +142,11 @@ class Manager(DirectoryBaseManager): else: self._monitor_execution(job_id, proc, stdout, stderr) - def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[]): - command_line = self._prepare_run(job_id, command_line, dependencies_description=dependencies_description, env=env) + def launch(self, job_id, command_line, submit_params={}, dependencies_description=None, env=[], setup_params=None): + command_line = self._prepare_run(job_id, command_line, dependencies_description=dependencies_description, env=env, setup_params=setup_params) self._run(job_id, command_line) - def _prepare_run(self, job_id, command_line, dependencies_description, env): + def _prepare_run(self, job_id, command_line, dependencies_description, env, setup_params=None): self._check_execution_with_tool_file(job_id, command_line) self._record_submission(job_id) if platform.system().lower() == "windows": @@ -154,7 +154,13 @@ class Manager(DirectoryBaseManager): # process them or at least warn about them being ignored. command_line = self._expand_command_line(command_line, dependencies_description, job_directory=self.job_directory(job_id).job_directory) else: - command_line = self._setup_job_file(job_id, command_line, dependencies_description=dependencies_description, env=env) + command_line = self._setup_job_file( + job_id, + command_line, + dependencies_description=dependencies_description, + env=env, + setup_params=setup_params + ) return command_line diff --git a/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh b/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh index 6b3c283bd2e7f5ab191d712112208551fd93f9db..68da1b3e36284ce963d7a39a01d404c394620af7 100644 --- a/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh +++ b/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh @@ -1,24 +1,33 @@ #!$shell $headers + +_galaxy_setup_environment() { + local _use_framework_galaxy="$1" + if [ "$GALAXY_LIB" != "None" -a "$_use_framework_galaxy" = "True" ]; then + if [ -n "$PYTHONPATH" ]; then + PYTHONPATH="$GALAXY_LIB:$PYTHONPATH" + else + PYTHONPATH="$GALAXY_LIB" + fi + export PYTHONPATH + fi + $env_setup_commands + if [ "$GALAXY_VIRTUAL_ENV" != "None" -a -f "$GALAXY_VIRTUAL_ENV/bin/activate" \ + -a "`command -v python`" != "$GALAXY_VIRTUAL_ENV/bin/python" ]; then + . "$GALAXY_VIRTUAL_ENV/bin/activate" + fi +} + $integrity_injection $slots_statement export GALAXY_SLOTS -GALAXY_LIB="$galaxy_lib" -if [ "$GALAXY_LIB" != "None" ]; then - if [ -n "$PYTHONPATH" ]; then - PYTHONPATH="$GALAXY_LIB:$PYTHONPATH" - else - PYTHONPATH="$GALAXY_LIB" - fi - export PYTHONPATH -fi -$env_setup_commands GALAXY_VIRTUAL_ENV="$galaxy_virtual_env" -if [ "$GALAXY_VIRTUAL_ENV" != "None" -a -z "$VIRTUAL_ENV" \ - -a -f "$GALAXY_VIRTUAL_ENV/bin/activate" ]; then - . "$GALAXY_VIRTUAL_ENV/bin/activate" -fi +_GALAXY_VIRTUAL_ENV="$galaxy_virtual_env" +PRESERVE_GALAXY_ENVIRONMENT="$preserve_python_environment" +GALAXY_LIB="$galaxy_lib" +_galaxy_setup_environment "$PRESERVE_GALAXY_ENVIRONMENT" +GALAXY_PYTHON=`command -v python` $instrument_pre_commands cd $working_directory $command diff --git a/pulsar/managers/util/job_script/__init__.py b/pulsar/managers/util/job_script/__init__.py index a08857c24626087d813a7e51b7ea7d1deed328d6..77af249e5c4817d3b05b9e682beb576eef133584 100644 --- a/pulsar/managers/util/job_script/__init__.py +++ b/pulsar/managers/util/job_script/__init__.py @@ -1,10 +1,11 @@ import os -from string import Template import subprocess import time -from pkg_resources import resource_string +from string import Template +from pkg_resources import resource_string from six import text_type + from galaxy.util import unicodify DEFAULT_SHELL = '/bin/bash' @@ -44,6 +45,7 @@ OPTIONAL_TEMPLATE_PARAMS = { 'instrument_post_commands': '', 'integrity_injection': INTEGRITY_INJECTION, 'shell': DEFAULT_SHELL, + 'preserve_python_environment': True, } @@ -140,9 +142,9 @@ def _handle_script_integrity(path, config): raise Exception("Failed to write job script, could not verify job script integrity.") -__all__ = [ +__all__ = ( 'check_script_integrity', 'job_script', 'write_script', 'INTEGRITY_INJECTION', -] +)