diff --git a/Makefile b/Makefile
index 66afe5019a6a8e9c59f5b26e873cf50075e2e053..572f7c843427965c77aaf184037e101497537111 100644
--- a/Makefile
+++ b/Makefile
@@ -85,6 +85,11 @@ coverage:
 	coverage html
 	open htmlcov/index.html || xdg-open htmlcov/index.html
 
+develop:
+	python setup.py develop
+
+develop-galaxy:
+	PULSAR_GALAXY_LIB=1 python setup.py develop
 
 ready-docs:
 	rm -f docs/$(SOURCE_DIR).rst
diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 6b7133f657e0cbc3e4e82d272ece069b823d26f2..4e8877cbce71cdc5a18465e5b9b68e7b385e34ae 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 
-__version__ = '0.7.0.dev2'
+__version__ = '0.7.0.dev3'
 
 PROJECT_NAME = "pulsar"
 PROJECT_OWNER = PROJECT_USERAME = "galaxyproject"
diff --git a/pulsar/client/client.py b/pulsar/client/client.py
index 32e3605d375826e32edc8d197c8cde4eae5bd3f3..4624776b9997cf2030b929fb34644f8949428df6 100644
--- a/pulsar/client/client.py
+++ b/pulsar/client/client.py
@@ -118,12 +118,13 @@ class JobClient(BaseJobClient):
             launch_params['env'] = json_dumps(env)
         if remote_staging:
             launch_params['remote_staging'] = json_dumps(remote_staging)
+
         if job_config and self.setup_handler.local:
             # Setup not yet called, job properties were inferred from
             # destination arguments. Hence, must have Pulsar setup job
             # before queueing.
             setup_params = _setup_params_from_job_config(job_config)
-            launch_params["setup_params"] = json_dumps(setup_params)
+            launch_params['setup_params'] = json_dumps(setup_params)
         return self._raw_execute("submit", launch_params)
 
     def full_status(self):
@@ -305,7 +306,7 @@ class MessageJobClient(BaseMessageJobClient):
             dependencies_description=dependencies_description,
             env=env,
             remote_staging=remote_staging,
-            job_config=job_config
+            job_config=job_config,
         )
         response = self.client_manager.exchange.publish("setup", launch_params)
         log.info("Job published to setup message queue.")
@@ -330,7 +331,7 @@ class MessageCLIJobClient(BaseMessageJobClient):
             dependencies_description=dependencies_description,
             env=env,
             remote_staging=remote_staging,
-            job_config=job_config
+            job_config=job_config,
         )
         base64_message = to_base64_json(launch_params)
         submit_command = os.path.join(self.remote_pulsar_path, "scripts", "submit.bash")
@@ -392,5 +393,6 @@ def _setup_params_from_job_config(job_config):
     return dict(
         job_id=job_id,
         tool_id=tool_id,
-        tool_version=tool_version
+        tool_version=tool_version,
+        use_metadata=True,
     )
diff --git a/pulsar/client/interface.py b/pulsar/client/interface.py
index a83a20b3e45d433815832f292ffc20c261e3856b..0882c91b595f06059f9ccb466e494e4d9b93b0f0 100644
--- a/pulsar/client/interface.py
+++ b/pulsar/client/interface.py
@@ -110,7 +110,13 @@ class HttpPulsarInterface(PulsarInterface):
 
 class LocalPulsarInterface(PulsarInterface):
 
-    def __init__(self, destination_params, job_manager=None, file_cache=None, object_store=None):
+    def __init__(self, destination_params, job_manager=None, pulsar_app=None, file_cache=None, object_store=None):
+        if job_manager is None:
+            job_manager_name = destination_params.get("manager", None)
+            if job_manager_name is None:
+                job_manager = pulsar_app.only_manager
+            else:
+                job_manager = pulsar_app.managers[job_manager_name]
         self.job_manager = job_manager
         self.file_cache = file_cache
         self.object_store = object_store
diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py
index c5058103290b2bd8d9f0806ca3ef072d89353cb0..ab0d367ff977f547ef80394f74633ab349845798 100644
--- a/pulsar/client/manager.py
+++ b/pulsar/client/manager.py
@@ -50,13 +50,15 @@ class ClientManager(object):
 
     def __init__(self, **kwds):
         """Build a HTTP client or a local client that talks directly to a job manger."""
-        if 'job_manager' in kwds:
+        if 'pulsar_app' in kwds or 'job_manager' in kwds:
             self.job_manager_interface_class = LocalPulsarInterface
-            job_manager = kwds['job_manager']
+            pulsar_app = kwds.get('pulsar_app', None)
+            job_manager = kwds.get('job_manager', None)
             file_cache = kwds.get('file_cache', None)
             self.job_manager_interface_args = dict(
                 job_manager=job_manager,
-                file_cache=file_cache
+                pulsar_app=pulsar_app,
+                file_cache=file_cache,
             )
         else:
             self.job_manager_interface_class = HttpPulsarInterface
diff --git a/pulsar/client/setup_handler.py b/pulsar/client/setup_handler.py
index 2cc9d1afcc4ee38f8ba84ae7e3f481117f3eb317..368deb8b9d00f73965a20f8085ebe3822453871c 100644
--- a/pulsar/client/setup_handler.py
+++ b/pulsar/client/setup_handler.py
@@ -1,6 +1,8 @@
 import os
 from .util import filter_destination_params
 
+from pulsar import __version__ as pulsar_version
+
 REMOTE_SYSTEM_PROPERTY_PREFIX = "remote_property_"
 
 
@@ -62,6 +64,7 @@ class RemoteSetupHandler(object):
         self.client = client
 
     def setup(self, **setup_args):
+        setup_args["use_metadata"] = "true"
         return self.client.remote_setup(**setup_args)
 
     @property
@@ -76,6 +79,7 @@ def build_job_config(job_id, job_directory, system_properties={}, tool_id=None,
     """
     inputs_directory = job_directory.inputs_directory()
     working_directory = job_directory.working_directory()
+    metadata_directory = job_directory.metadata_directory()
     outputs_directory = job_directory.outputs_directory()
     configs_directory = job_directory.configs_directory()
     tools_directory = job_directory.tool_files_directory()
@@ -84,6 +88,7 @@ def build_job_config(job_id, job_directory, system_properties={}, tool_id=None,
     job_config = {
         "job_directory": job_directory.path,
         "working_directory": working_directory,
+        "metadata_directory": metadata_directory,
         "outputs_directory": outputs_directory,
         "configs_directory": configs_directory,
         "tools_directory": tools_directory,
@@ -93,6 +98,7 @@ def build_job_config(job_id, job_directory, system_properties={}, tool_id=None,
         "path_separator": sep,
         "job_id": job_id,
         "system_properties": system_properties,
+        "pulsar_version": pulsar_version,
     }
     if tool_id:
         job_config["tool_id"] = tool_id
diff --git a/pulsar/core.py b/pulsar/core.py
index ff6b484745e1eadfaed5abcfd84ab6bdc9dc91be..e4547d27ec83749b5b9b9fd1eb43ad8506921f35 100644
--- a/pulsar/core.py
+++ b/pulsar/core.py
@@ -127,8 +127,11 @@ class PulsarApp(object):
         self.dependency_manager = DependencyManager(dependencies_dir, resolvers_config_file)
 
     def __setup_job_metrics(self, conf):
-        job_metrics_config_file = conf.get("job_metrics_config_file", "job_metrics_conf.xml")
-        self.job_metrics = JobMetrics(job_metrics_config_file)
+        job_metrics = conf.get("job_metrics", None)
+        if job_metrics is None:
+            job_metrics_config_file = conf.get("job_metrics_config_file", "job_metrics_conf.xml")
+            job_metrics = JobMetrics(job_metrics_config_file)
+        self.job_metrics = job_metrics
 
     @property
     def only_manager(self):
diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py
index b48b9c1788f07eb2f8a22eec6351afc24c92be04..f0baadc759309517394ca93b25ff27052c476eeb 100644
--- a/pulsar/manager_endpoint_util.py
+++ b/pulsar/manager_endpoint_util.py
@@ -1,11 +1,17 @@
 """ Composite actions over managers shared between HTTP endpoint (routes.py)
 and message queue.
 """
+
+import logging
+import os
+
+from pulsar import __version__ as pulsar_version
 from pulsar.client.setup_handler import build_job_config
 from pulsar.managers import status
 from pulsar.managers import PULSAR_UNKNOWN_RETURN_CODE
 from galaxy.tools.deps import dependencies
-import os
+
+log = logging.getLogger(__name__)
 
 
 def status_dict(manager, job_id):
@@ -39,10 +45,12 @@ def __job_complete_dict(complete_status, manager, job_id):
         stdout=stdout_contents,
         stderr=stderr_contents,
         working_directory=job_directory.working_directory(),
+        metadata_directory=job_directory.metadata_directory(),
         working_directory_contents=job_directory.working_directory_contents(),
         metadata_directory_contents=job_directory.metadata_directory_contents(),
         outputs_directory_contents=job_directory.outputs_directory_contents(),
         system_properties=manager.system_properties(),
+        pulsar_version=pulsar_version,
     )
     return as_dict
 
@@ -61,13 +69,19 @@ def submit_job(manager, job_config):
     dependencies_description = job_config.get('dependencies_description', None)
     env = job_config.get('env', [])
     submit_params = job_config.get('submit_params', {})
-    job_config = None
 
     if setup_params or force_setup:
         input_job_id = setup_params.get("job_id", job_id)
         tool_id = setup_params.get("tool_id", None)
         tool_version = setup_params.get("tool_version", None)
-        job_config = setup_job(manager, input_job_id, tool_id, tool_version)
+        use_metadata = setup_params.get("use_metadata", False)
+        job_config = setup_job(
+            manager,
+            input_job_id,
+            tool_id,
+            tool_version,
+            use_metadata
+        )
 
     if job_config is not None:
         job_directory = job_config["job_directory"]
@@ -83,19 +97,21 @@ def submit_job(manager, job_config):
         command_line,
         submit_params,
         dependencies_description=dependencies_description,
-        env=env
+        env=env,
     )
 
 
-def setup_job(manager, job_id, tool_id, tool_version):
+def setup_job(manager, job_id, tool_id, tool_version, use_metadata=False):
     """ Setup new job from these inputs and return dict summarizing state
     (used to configure command line).
     """
     job_id = manager.setup_job(job_id, tool_id, tool_version)
+    if use_metadata:
+        manager.enable_metadata_directory(job_id)
     return build_job_config(
         job_id=job_id,
         job_directory=manager.job_directory(job_id),
         system_properties=manager.system_properties(),
         tool_id=tool_id,
-        tool_version=tool_version,
+        tool_version=tool_version
     )
diff --git a/pulsar/manager_factory.py b/pulsar/manager_factory.py
index fdd68ddfca2579a5b7b86b6898ddf8690df5147d..7c2ccf31b1edc7d5d390fa1916ec7eded13185bd 100644
--- a/pulsar/manager_factory.py
+++ b/pulsar/manager_factory.py
@@ -63,8 +63,9 @@ def _populate_manager_descriptions_from_ini(manager_descriptions, job_managers_c
 
 def _get_default_options(conf):
     options = {}
-    if "assign_ids" in conf:
-        options["assign_ids"] = conf["assign_ids"]
+    for simple_key in ["assign_ids", "galaxy_home"]:
+        if simple_key in conf:
+            options[simple_key] = conf[simple_key]
     options["debug"] = conf.get("debug", False)
     # mode to create job directories with, if None just use
     # default (usually 0777 with umask applied).
diff --git a/pulsar/managers/__init__.py b/pulsar/managers/__init__.py
index c7a05926554d8752d6c6b8ca13b82281a25bd503..eb088171133525c7ab4cf0f1aa83cde7cf5fca3f 100644
--- a/pulsar/managers/__init__.py
+++ b/pulsar/managers/__init__.py
@@ -107,6 +107,9 @@ class ManagerProxy(object):
     def kill(self, *args, **kwargs):
         return self._proxied_manager.kill(*args, **kwargs)
 
+    def enable_metadata_directory(self, *args, **kwargs):
+        return self._proxied_manager.enable_metadata_directory(*args, **kwargs)
+
     def shutdown(self, timeout=None):
         """ Optional. """
         try:
diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py
index a1365d0108e90904ef07337a1611521402398177..738dc3d0721f540c66e7db726a4f24e6ff4c7053 100644
--- a/pulsar/managers/base/__init__.py
+++ b/pulsar/managers/base/__init__.py
@@ -208,6 +208,13 @@ class JobDirectory(RemoteJobDirectory):
         # Assert this job id isn't hacking path somehow.
         assert job_id == basename(job_id)
 
+    def enable_metadata_directory(self):
+        self.store_metadata("use_metadata_directory", True)
+
+    @property
+    def use_metadata_directory(self):
+        return self.has_metadata("use_metadata_directory")
+
     def _job_file(self, name):
         return os.path.join(self.job_directory, name)
 
@@ -289,6 +296,12 @@ class JobDirectory(RemoteJobDirectory):
         metadata_directory = self.metadata_directory()
         return self.__directory_contents(metadata_directory)
 
+    def metadata_directory(self):
+        if self.use_metadata_directory:
+            return super(JobDirectory, self).metadata_directory()
+        else:
+            return self.working_directory()
+
     def __directory_contents(self, directory):
         contents = []
         for path, _, files in walk(directory):
diff --git a/pulsar/managers/base/directory.py b/pulsar/managers/base/directory.py
index 3b57b97dfa1c5f7bb344a159619af60b8f4cec44..90308bef5c59c1ab9adb53b09aa1ffcf6486ed61 100644
--- a/pulsar/managers/base/directory.py
+++ b/pulsar/managers/base/directory.py
@@ -70,6 +70,9 @@ class DirectoryBaseManager(BaseManager):
         job_directory.store_metadata(JOB_FILE_TOOL_ID, tool_id)
         job_directory.store_metadata(JOB_FILE_TOOL_VERSION, tool_version)
 
+    def enable_metadata_directory(self, job_id):
+        self._job_directory(job_id).enable_metadata_directory()
+
     def _record_cancel(self, job_id):
         try:
             self._job_directory(job_id).store_metadata(JOB_FILE_CANCELLED, True)
@@ -118,6 +121,7 @@ class DirectoryBaseManager(BaseManager):
             'env_setup_commands': env_setup_commands,
             'exit_code_path': return_code_path,
             'working_directory': self.job_directory(job_id).working_directory(),
+            'metadata_directory': self.job_directory(job_id).metadata_directory(),
             'job_id': job_id,
         }
         if command_line:
diff --git a/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh b/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh
index 5820de9e4a1fcdc05944b9d3d1d6fd3596b32472..0c533ce0c538d16b10400eaa9e64c68903cbab64 100644
--- a/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh
+++ b/pulsar/managers/util/job_script/DEFAULT_JOB_FILE_TEMPLATE.sh
@@ -1,12 +1,5 @@
-#!/bin/sh
-
-# The following block can be used by the job creation system
-# to ensure this script is runnable before running it directly
-# or submitting it to a cluster manager.
-if [ -n "$ABC_TEST_JOB_SCRIPT_INTEGRITY_XYZ" ]; then
-    exit 42
-fi
-
+#!$shell
+$integrity_injection
 $headers
 $slots_statement
 export GALAXY_SLOTS
diff --git a/pulsar/managers/util/job_script/__init__.py b/pulsar/managers/util/job_script/__init__.py
index 23725bef2a319f23524ce924b416c2f6d54b7795..4c81524203aa896a9941290bfacce846bdb1959d 100644
--- a/pulsar/managers/util/job_script/__init__.py
+++ b/pulsar/managers/util/job_script/__init__.py
@@ -1,7 +1,14 @@
+import os
 from string import Template
+import subprocess
+import time
 from pkg_resources import resource_string
+
+from six import text_type
 from galaxy.util import unicodify
 
+DEFAULT_SHELL = '/bin/bash'
+
 DEFAULT_JOB_FILE_TEMPLATE = Template(
     resource_string(__name__, 'DEFAULT_JOB_FILE_TEMPLATE.sh').decode('UTF-8')
 )
@@ -13,6 +20,19 @@ SLOTS_STATEMENT_SINGLE = """
 GALAXY_SLOTS="1"
 """
 
+INTEGRITY_INJECTION = """
+# The following block can be used by the job system
+# to ensure this script is runnable before actually attempting
+# to run it.
+if [ -n "$ABC_TEST_JOB_SCRIPT_INTEGRITY_XYZ" ]; then
+    exit 42
+fi
+"""
+
+INTEGRITY_SYNC_COMMAND = "/bin/sync"
+DEFAULT_INTEGRITY_CHECK = True
+DEFAULT_INTEGRITY_COUNT = 35
+DEFAULT_INTEGRITY_SLEEP = .25
 REQUIRED_TEMPLATE_PARAMS = ['working_directory', 'command', 'exit_code_path']
 OPTIONAL_TEMPLATE_PARAMS = {
     'galaxy_lib': None,
@@ -22,6 +42,8 @@ OPTIONAL_TEMPLATE_PARAMS = {
     'slots_statement': SLOTS_STATEMENT_CLUSTER_DEFAULT,
     'instrument_pre_commands': '',
     'instrument_post_commands': '',
+    'integrity_injection': INTEGRITY_INJECTION,
+    'shell': DEFAULT_SHELL,
 }
 
 
@@ -40,10 +62,12 @@ def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds):
     True
     >>> 'GALAXY_LIB="None"' in script
     True
-    >>> script.find('#PBS -test') > 0
+    >>> script.startswith('#!/bin/bash\\n\\n# The following block can be used by the job system')
+    True
+    >>> 'PBS -test\\n' in script
     False
-    >>> script = job_script(working_directory='wd', command='uptime', exit_code_path='ec', headers='#PBS -test')
-    >>> script.find('#PBS -test') > 0
+    >>> script = job_script(working_directory='wd', command='uptime', exit_code_path='ec', headers='#PBS -test', integrity_injection='')
+    >>> script.startswith('#!/bin/bash\\n\\n#PBS -test\\n')
     True
     >>> script = job_script(working_directory='wd', command='uptime', exit_code_path='ec', slots_statement='GALAXY_SLOTS="$SLURM_JOB_NUM_NODES"')
     >>> script.find('GALAXY_SLOTS="$SLURM_JOB_NUM_NODES"\\nexport GALAXY_SLOTS\\n') > 0
@@ -54,7 +78,7 @@ def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds):
     job_instrumenter = kwds.get("job_instrumenter", None)
     if job_instrumenter:
         del kwds["job_instrumenter"]
-        working_directory = kwds["working_directory"]
+        working_directory = kwds.get("metadata_directory", kwds["working_directory"])
         kwds["instrument_pre_commands"] = job_instrumenter.pre_execute_commands(working_directory) or ''
         kwds["instrument_post_commands"] = job_instrumenter.post_execute_commands(working_directory) or ''
 
@@ -67,3 +91,60 @@ def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds):
     if not isinstance(template, Template):
         template = Template(template)
     return template.safe_substitute(template_params)
+
+
+def check_script_integrity(config):
+    return getattr(config, "check_job_script_integrity", DEFAULT_INTEGRITY_CHECK)
+
+
+def write_script(path, contents, config, mode=0o755):
+    dir = os.path.dirname(path)
+    if not os.path.exists(dir):
+        os.makedirs(dir)
+
+    with open(path, 'w') as f:
+        if isinstance(contents, text_type):
+            contents = contents.encode("UTF-8")
+        f.write(contents)
+    os.chmod(path, mode)
+    _handle_script_integrity(path, config)
+
+
+def _handle_script_integrity(path, config):
+    if not check_script_integrity(config):
+        return
+
+    script_integrity_verified = False
+    count = getattr(config, "check_job_script_integrity_count", DEFAULT_INTEGRITY_COUNT)
+    sleep_amt = getattr(config, "check_job_script_integrity_sleep", DEFAULT_INTEGRITY_SLEEP)
+    for i in range(count):
+        try:
+            proc = subprocess.Popen([path], shell=True, env={"ABC_TEST_JOB_SCRIPT_INTEGRITY_XYZ": "1"})
+            proc.wait()
+            if proc.returncode == 42:
+                script_integrity_verified = True
+                break
+
+            # Else we will sync and wait to see if the script becomes
+            # executable.
+            try:
+                # sync file system to avoid "Text file busy" problems.
+                # These have occurred both in Docker containers and on EC2 clusters
+                # under high load.
+                subprocess.check_call(INTEGRITY_SYNC_COMMAND)
+            except Exception:
+                pass
+            time.sleep(sleep_amt)
+        except Exception:
+            pass
+
+    if not script_integrity_verified:
+        raise Exception("Failed to write job script, could not verify job script integrity.")
+
+
+__all__ = [
+    'check_script_integrity',
+    'job_script',
+    'write_script',
+    'INTEGRITY_INJECTION',
+]
diff --git a/pulsar/web/routes.py b/pulsar/web/routes.py
index 45fb1ede07402f4c226b8e891320f8c53d9410dc..1f529608b86957c7f020f65f385a20f3bbabf290 100644
--- a/pulsar/web/routes.py
+++ b/pulsar/web/routes.py
@@ -43,12 +43,13 @@ class PulsarController(Controller):
 
 
 @PulsarController(path="/jobs", method="POST", response_type='json')
-def setup(manager, job_id, tool_id=None, tool_version=None):
-    return __setup(manager, job_id, tool_id=tool_id, tool_version=tool_version)
+def setup(manager, job_id, tool_id=None, tool_version=None, use_metadata='false'):
+    return __setup(manager, job_id, tool_id=tool_id, tool_version=tool_version, use_metadata=use_metadata)
 
 
-def __setup(manager, job_id, tool_id, tool_version):
-    response = setup_job(manager, job_id, tool_id, tool_version)
+def __setup(manager, job_id, tool_id, tool_version, use_metadata):
+    use_metadata = loads(use_metadata)
+    response = setup_job(manager, job_id, tool_id, tool_version, use_metadata)
     log.debug("Setup job with configuration: %s" % response)
     return response
 
@@ -72,7 +73,7 @@ def submit(manager, job_id, command_line, params='{}', dependencies_description=
         submit_params=submit_params,
         dependencies_description=dependencies_description,
         env=env,
-        remote_staging=remote_staging
+        remote_staging=remote_staging,
     )
     submit_job(manager, submit_config)
 
diff --git a/test/client_staging_test.py b/test/client_staging_test.py
index d68372cd68af8d5b3b4811df2be9b4170cefaa2e..60714100feeb0fde997b773a44cbecbbb4480dd9 100644
--- a/test/client_staging_test.py
+++ b/test/client_staging_test.py
@@ -149,7 +149,7 @@ class MockClient(object):
     def expect_put_paths(self, paths):
         self.put_paths = deque(paths)
 
-    def setup(self, tool_id, tool_version):
+    def setup(self, tool_id, tool_version, use_metadata=False):
         assert tool_id == self.expected_tool.id
         assert tool_version == self.expected_tool.version
         return {}
diff --git a/test/client_test.py b/test/client_test.py
index bf72339db128c8de669dc575dd0e4e85d31e3128..1c1b90fabdd98cfdf87235a0aaba3e98589676d7 100644
--- a/test/client_test.py
+++ b/test/client_test.py
@@ -106,7 +106,7 @@ class RequestChecker(object):
 def test_setup():
     """ Test the setup method of Client """
     client = TestClient()
-    request_checker = RequestChecker("jobs")
+    request_checker = RequestChecker("jobs", {"use_metadata": "true"})
     response_json = b'{"working_directory":"C:\\\\home\\\\dir","outputs_directory" : "C:\\\\outputs","path_separator" : "\\\\"}'
     client.expect_open(request_checker, response_json)
     setup_response = client.setup()
diff --git a/tox.ini b/tox.ini
index 48e9df037ec6aeca15988ab9b60fa17fa00d93d8..0b5a9c14302bd6424dff321fb1901f92f4ecb884 100644
--- a/tox.ini
+++ b/tox.ini
@@ -1,5 +1,5 @@
 [tox]
-envlist = py27-lint, py27-lint-readme, lint-docs, py34-lint, py27, py34, py34-unit
+envlist = py27-lint, py27-lint-readme, lint-docs, py34-lint, py27, py34, py27-unit, py34-unit
 toxworkdir={env:TOX_WORK_DIR:.tox}
 
 [testenv]
@@ -10,6 +10,9 @@ deps =
     drmaa
 passenv = DRMAA_LIBRARY_PATH
 
+[testenv:py27-unit]
+commands = nosetests --exclude '.*integration.*' []
+
 [testenv:py34]
 deps =
     -rrequirements3.txt