diff --git a/pulsar/__init__.py b/pulsar/__init__.py index 6b7133f657e0cbc3e4e82d272ece069b823d26f2..4e8877cbce71cdc5a18465e5b9b68e7b385e34ae 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- -__version__ = '0.7.0.dev2' +__version__ = '0.7.0.dev3' PROJECT_NAME = "pulsar" PROJECT_OWNER = PROJECT_USERAME = "galaxyproject" diff --git a/pulsar/client/client.py b/pulsar/client/client.py index 32e3605d375826e32edc8d197c8cde4eae5bd3f3..4624776b9997cf2030b929fb34644f8949428df6 100644 --- a/pulsar/client/client.py +++ b/pulsar/client/client.py @@ -118,12 +118,13 @@ class JobClient(BaseJobClient): launch_params['env'] = json_dumps(env) if remote_staging: launch_params['remote_staging'] = json_dumps(remote_staging) + if job_config and self.setup_handler.local: # Setup not yet called, job properties were inferred from # destination arguments. Hence, must have Pulsar setup job # before queueing. setup_params = _setup_params_from_job_config(job_config) - launch_params["setup_params"] = json_dumps(setup_params) + launch_params['setup_params'] = json_dumps(setup_params) return self._raw_execute("submit", launch_params) def full_status(self): @@ -305,7 +306,7 @@ class MessageJobClient(BaseMessageJobClient): dependencies_description=dependencies_description, env=env, remote_staging=remote_staging, - job_config=job_config + job_config=job_config, ) response = self.client_manager.exchange.publish("setup", launch_params) log.info("Job published to setup message queue.") @@ -330,7 +331,7 @@ class MessageCLIJobClient(BaseMessageJobClient): dependencies_description=dependencies_description, env=env, remote_staging=remote_staging, - job_config=job_config + job_config=job_config, ) base64_message = to_base64_json(launch_params) submit_command = os.path.join(self.remote_pulsar_path, "scripts", "submit.bash") @@ -392,5 +393,6 @@ def _setup_params_from_job_config(job_config): return dict( job_id=job_id, tool_id=tool_id, - tool_version=tool_version + tool_version=tool_version, + use_metadata=True, ) diff --git a/pulsar/client/setup_handler.py b/pulsar/client/setup_handler.py index 007421700d8e43ffe8c6ce0d5184b0d1e5c98f81..368deb8b9d00f73965a20f8085ebe3822453871c 100644 --- a/pulsar/client/setup_handler.py +++ b/pulsar/client/setup_handler.py @@ -64,6 +64,7 @@ class RemoteSetupHandler(object): self.client = client def setup(self, **setup_args): + setup_args["use_metadata"] = "true" return self.client.remote_setup(**setup_args) @property @@ -78,6 +79,7 @@ def build_job_config(job_id, job_directory, system_properties={}, tool_id=None, """ inputs_directory = job_directory.inputs_directory() working_directory = job_directory.working_directory() + metadata_directory = job_directory.metadata_directory() outputs_directory = job_directory.outputs_directory() configs_directory = job_directory.configs_directory() tools_directory = job_directory.tool_files_directory() @@ -86,6 +88,7 @@ def build_job_config(job_id, job_directory, system_properties={}, tool_id=None, job_config = { "job_directory": job_directory.path, "working_directory": working_directory, + "metadata_directory": metadata_directory, "outputs_directory": outputs_directory, "configs_directory": configs_directory, "tools_directory": tools_directory, diff --git a/pulsar/core.py b/pulsar/core.py index ff6b484745e1eadfaed5abcfd84ab6bdc9dc91be..e4547d27ec83749b5b9b9fd1eb43ad8506921f35 100644 --- a/pulsar/core.py +++ b/pulsar/core.py @@ -127,8 +127,11 @@ class PulsarApp(object): self.dependency_manager = DependencyManager(dependencies_dir, resolvers_config_file) def __setup_job_metrics(self, conf): - job_metrics_config_file = conf.get("job_metrics_config_file", "job_metrics_conf.xml") - self.job_metrics = JobMetrics(job_metrics_config_file) + job_metrics = conf.get("job_metrics", None) + if job_metrics is None: + job_metrics_config_file = conf.get("job_metrics_config_file", "job_metrics_conf.xml") + job_metrics = JobMetrics(job_metrics_config_file) + self.job_metrics = job_metrics @property def only_manager(self): diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py index b0df90eaf089924b3bc232d87d3943f056c5746e..f0baadc759309517394ca93b25ff27052c476eeb 100644 --- a/pulsar/manager_endpoint_util.py +++ b/pulsar/manager_endpoint_util.py @@ -1,12 +1,17 @@ """ Composite actions over managers shared between HTTP endpoint (routes.py) and message queue. """ + +import logging +import os + from pulsar import __version__ as pulsar_version from pulsar.client.setup_handler import build_job_config from pulsar.managers import status from pulsar.managers import PULSAR_UNKNOWN_RETURN_CODE from galaxy.tools.deps import dependencies -import os + +log = logging.getLogger(__name__) def status_dict(manager, job_id): @@ -40,6 +45,7 @@ def __job_complete_dict(complete_status, manager, job_id): stdout=stdout_contents, stderr=stderr_contents, working_directory=job_directory.working_directory(), + metadata_directory=job_directory.metadata_directory(), working_directory_contents=job_directory.working_directory_contents(), metadata_directory_contents=job_directory.metadata_directory_contents(), outputs_directory_contents=job_directory.outputs_directory_contents(), @@ -63,13 +69,19 @@ def submit_job(manager, job_config): dependencies_description = job_config.get('dependencies_description', None) env = job_config.get('env', []) submit_params = job_config.get('submit_params', {}) - job_config = None if setup_params or force_setup: input_job_id = setup_params.get("job_id", job_id) tool_id = setup_params.get("tool_id", None) tool_version = setup_params.get("tool_version", None) - job_config = setup_job(manager, input_job_id, tool_id, tool_version) + use_metadata = setup_params.get("use_metadata", False) + job_config = setup_job( + manager, + input_job_id, + tool_id, + tool_version, + use_metadata + ) if job_config is not None: job_directory = job_config["job_directory"] @@ -85,20 +97,21 @@ def submit_job(manager, job_config): command_line, submit_params, dependencies_description=dependencies_description, - env=env + env=env, ) -def setup_job(manager, job_id, tool_id, tool_version): +def setup_job(manager, job_id, tool_id, tool_version, use_metadata=False): """ Setup new job from these inputs and return dict summarizing state (used to configure command line). """ job_id = manager.setup_job(job_id, tool_id, tool_version) + if use_metadata: + manager.enable_metadata_directory(job_id) return build_job_config( job_id=job_id, job_directory=manager.job_directory(job_id), system_properties=manager.system_properties(), tool_id=tool_id, - tool_version=tool_version, - pulsar_version=pulsar_version, + tool_version=tool_version ) diff --git a/pulsar/manager_factory.py b/pulsar/manager_factory.py index fdd68ddfca2579a5b7b86b6898ddf8690df5147d..7c2ccf31b1edc7d5d390fa1916ec7eded13185bd 100644 --- a/pulsar/manager_factory.py +++ b/pulsar/manager_factory.py @@ -63,8 +63,9 @@ def _populate_manager_descriptions_from_ini(manager_descriptions, job_managers_c def _get_default_options(conf): options = {} - if "assign_ids" in conf: - options["assign_ids"] = conf["assign_ids"] + for simple_key in ["assign_ids", "galaxy_home"]: + if simple_key in conf: + options[simple_key] = conf[simple_key] options["debug"] = conf.get("debug", False) # mode to create job directories with, if None just use # default (usually 0777 with umask applied). diff --git a/pulsar/managers/__init__.py b/pulsar/managers/__init__.py index c7a05926554d8752d6c6b8ca13b82281a25bd503..eb088171133525c7ab4cf0f1aa83cde7cf5fca3f 100644 --- a/pulsar/managers/__init__.py +++ b/pulsar/managers/__init__.py @@ -107,6 +107,9 @@ class ManagerProxy(object): def kill(self, *args, **kwargs): return self._proxied_manager.kill(*args, **kwargs) + def enable_metadata_directory(self, *args, **kwargs): + return self._proxied_manager.enable_metadata_directory(*args, **kwargs) + def shutdown(self, timeout=None): """ Optional. """ try: diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py index a1365d0108e90904ef07337a1611521402398177..738dc3d0721f540c66e7db726a4f24e6ff4c7053 100644 --- a/pulsar/managers/base/__init__.py +++ b/pulsar/managers/base/__init__.py @@ -208,6 +208,13 @@ class JobDirectory(RemoteJobDirectory): # Assert this job id isn't hacking path somehow. assert job_id == basename(job_id) + def enable_metadata_directory(self): + self.store_metadata("use_metadata_directory", True) + + @property + def use_metadata_directory(self): + return self.has_metadata("use_metadata_directory") + def _job_file(self, name): return os.path.join(self.job_directory, name) @@ -289,6 +296,12 @@ class JobDirectory(RemoteJobDirectory): metadata_directory = self.metadata_directory() return self.__directory_contents(metadata_directory) + def metadata_directory(self): + if self.use_metadata_directory: + return super(JobDirectory, self).metadata_directory() + else: + return self.working_directory() + def __directory_contents(self, directory): contents = [] for path, _, files in walk(directory): diff --git a/pulsar/managers/base/directory.py b/pulsar/managers/base/directory.py index 3b57b97dfa1c5f7bb344a159619af60b8f4cec44..90308bef5c59c1ab9adb53b09aa1ffcf6486ed61 100644 --- a/pulsar/managers/base/directory.py +++ b/pulsar/managers/base/directory.py @@ -70,6 +70,9 @@ class DirectoryBaseManager(BaseManager): job_directory.store_metadata(JOB_FILE_TOOL_ID, tool_id) job_directory.store_metadata(JOB_FILE_TOOL_VERSION, tool_version) + def enable_metadata_directory(self, job_id): + self._job_directory(job_id).enable_metadata_directory() + def _record_cancel(self, job_id): try: self._job_directory(job_id).store_metadata(JOB_FILE_CANCELLED, True) @@ -118,6 +121,7 @@ class DirectoryBaseManager(BaseManager): 'env_setup_commands': env_setup_commands, 'exit_code_path': return_code_path, 'working_directory': self.job_directory(job_id).working_directory(), + 'metadata_directory': self.job_directory(job_id).metadata_directory(), 'job_id': job_id, } if command_line: diff --git a/pulsar/managers/util/job_script/__init__.py b/pulsar/managers/util/job_script/__init__.py index fffd29fe150d416ff8780f8f1695dc5ce825e93f..4c81524203aa896a9941290bfacce846bdb1959d 100644 --- a/pulsar/managers/util/job_script/__init__.py +++ b/pulsar/managers/util/job_script/__init__.py @@ -78,7 +78,7 @@ def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds): job_instrumenter = kwds.get("job_instrumenter", None) if job_instrumenter: del kwds["job_instrumenter"] - working_directory = kwds["working_directory"] + working_directory = kwds.get("metadata_directory", kwds["working_directory"]) kwds["instrument_pre_commands"] = job_instrumenter.pre_execute_commands(working_directory) or '' kwds["instrument_post_commands"] = job_instrumenter.post_execute_commands(working_directory) or '' diff --git a/pulsar/web/routes.py b/pulsar/web/routes.py index 45fb1ede07402f4c226b8e891320f8c53d9410dc..1f529608b86957c7f020f65f385a20f3bbabf290 100644 --- a/pulsar/web/routes.py +++ b/pulsar/web/routes.py @@ -43,12 +43,13 @@ class PulsarController(Controller): @PulsarController(path="/jobs", method="POST", response_type='json') -def setup(manager, job_id, tool_id=None, tool_version=None): - return __setup(manager, job_id, tool_id=tool_id, tool_version=tool_version) +def setup(manager, job_id, tool_id=None, tool_version=None, use_metadata='false'): + return __setup(manager, job_id, tool_id=tool_id, tool_version=tool_version, use_metadata=use_metadata) -def __setup(manager, job_id, tool_id, tool_version): - response = setup_job(manager, job_id, tool_id, tool_version) +def __setup(manager, job_id, tool_id, tool_version, use_metadata): + use_metadata = loads(use_metadata) + response = setup_job(manager, job_id, tool_id, tool_version, use_metadata) log.debug("Setup job with configuration: %s" % response) return response @@ -72,7 +73,7 @@ def submit(manager, job_id, command_line, params='{}', dependencies_description= submit_params=submit_params, dependencies_description=dependencies_description, env=env, - remote_staging=remote_staging + remote_staging=remote_staging, ) submit_job(manager, submit_config) diff --git a/test/client_staging_test.py b/test/client_staging_test.py index d68372cd68af8d5b3b4811df2be9b4170cefaa2e..60714100feeb0fde997b773a44cbecbbb4480dd9 100644 --- a/test/client_staging_test.py +++ b/test/client_staging_test.py @@ -149,7 +149,7 @@ class MockClient(object): def expect_put_paths(self, paths): self.put_paths = deque(paths) - def setup(self, tool_id, tool_version): + def setup(self, tool_id, tool_version, use_metadata=False): assert tool_id == self.expected_tool.id assert tool_version == self.expected_tool.version return {} diff --git a/test/client_test.py b/test/client_test.py index bf72339db128c8de669dc575dd0e4e85d31e3128..1c1b90fabdd98cfdf87235a0aaba3e98589676d7 100644 --- a/test/client_test.py +++ b/test/client_test.py @@ -106,7 +106,7 @@ class RequestChecker(object): def test_setup(): """ Test the setup method of Client """ client = TestClient() - request_checker = RequestChecker("jobs") + request_checker = RequestChecker("jobs", {"use_metadata": "true"}) response_json = b'{"working_directory":"C:\\\\home\\\\dir","outputs_directory" : "C:\\\\outputs","path_separator" : "\\\\"}' client.expect_open(request_checker, response_json) setup_response = client.setup()