From 29b8a0f699dd1afa4355e02f5b8b83cf8bd0490d Mon Sep 17 00:00:00 2001
From: John Chilton <jmchilton@gmail.com>
Date: Sun, 10 Apr 2016 19:41:47 -0400
Subject: [PATCH] New backward-compatible meta directory handling.

Pulsar has to be a little smarter than I assumed in 09f7ec3ede3c70c9c2a854214748c1c73f747aa0 because it writes out job metrics on the server.
---
 pulsar/__init__.py                          |  2 +-
 pulsar/client/client.py                     | 10 +++++---
 pulsar/client/setup_handler.py              |  3 +++
 pulsar/core.py                              |  7 ++++--
 pulsar/manager_endpoint_util.py             | 27 +++++++++++++++------
 pulsar/manager_factory.py                   |  5 ++--
 pulsar/managers/__init__.py                 |  3 +++
 pulsar/managers/base/__init__.py            | 13 ++++++++++
 pulsar/managers/base/directory.py           |  4 +++
 pulsar/managers/util/job_script/__init__.py |  2 +-
 pulsar/web/routes.py                        | 11 +++++----
 test/client_staging_test.py                 |  2 +-
 test/client_test.py                         |  2 +-
 13 files changed, 67 insertions(+), 24 deletions(-)

diff --git a/pulsar/__init__.py b/pulsar/__init__.py
index 6b7133f6..4e8877cb 100644
--- a/pulsar/__init__.py
+++ b/pulsar/__init__.py
@@ -1,6 +1,6 @@
 # -*- coding: utf-8 -*-
 
-__version__ = '0.7.0.dev2'
+__version__ = '0.7.0.dev3'
 
 PROJECT_NAME = "pulsar"
 PROJECT_OWNER = PROJECT_USERAME = "galaxyproject"
diff --git a/pulsar/client/client.py b/pulsar/client/client.py
index 32e3605d..4624776b 100644
--- a/pulsar/client/client.py
+++ b/pulsar/client/client.py
@@ -118,12 +118,13 @@ class JobClient(BaseJobClient):
             launch_params['env'] = json_dumps(env)
         if remote_staging:
             launch_params['remote_staging'] = json_dumps(remote_staging)
+
         if job_config and self.setup_handler.local:
             # Setup not yet called, job properties were inferred from
             # destination arguments. Hence, must have Pulsar setup job
             # before queueing.
             setup_params = _setup_params_from_job_config(job_config)
-            launch_params["setup_params"] = json_dumps(setup_params)
+            launch_params['setup_params'] = json_dumps(setup_params)
         return self._raw_execute("submit", launch_params)
 
     def full_status(self):
@@ -305,7 +306,7 @@ class MessageJobClient(BaseMessageJobClient):
             dependencies_description=dependencies_description,
             env=env,
             remote_staging=remote_staging,
-            job_config=job_config
+            job_config=job_config,
         )
         response = self.client_manager.exchange.publish("setup", launch_params)
         log.info("Job published to setup message queue.")
@@ -330,7 +331,7 @@ class MessageCLIJobClient(BaseMessageJobClient):
             dependencies_description=dependencies_description,
             env=env,
             remote_staging=remote_staging,
-            job_config=job_config
+            job_config=job_config,
         )
         base64_message = to_base64_json(launch_params)
         submit_command = os.path.join(self.remote_pulsar_path, "scripts", "submit.bash")
@@ -392,5 +393,6 @@ def _setup_params_from_job_config(job_config):
     return dict(
         job_id=job_id,
         tool_id=tool_id,
-        tool_version=tool_version
+        tool_version=tool_version,
+        use_metadata=True,
     )
diff --git a/pulsar/client/setup_handler.py b/pulsar/client/setup_handler.py
index 00742170..368deb8b 100644
--- a/pulsar/client/setup_handler.py
+++ b/pulsar/client/setup_handler.py
@@ -64,6 +64,7 @@ class RemoteSetupHandler(object):
         self.client = client
 
     def setup(self, **setup_args):
+        setup_args["use_metadata"] = "true"
         return self.client.remote_setup(**setup_args)
 
     @property
@@ -78,6 +79,7 @@ def build_job_config(job_id, job_directory, system_properties={}, tool_id=None,
     """
     inputs_directory = job_directory.inputs_directory()
     working_directory = job_directory.working_directory()
+    metadata_directory = job_directory.metadata_directory()
     outputs_directory = job_directory.outputs_directory()
     configs_directory = job_directory.configs_directory()
     tools_directory = job_directory.tool_files_directory()
@@ -86,6 +88,7 @@ def build_job_config(job_id, job_directory, system_properties={}, tool_id=None,
     job_config = {
         "job_directory": job_directory.path,
         "working_directory": working_directory,
+        "metadata_directory": metadata_directory,
         "outputs_directory": outputs_directory,
         "configs_directory": configs_directory,
         "tools_directory": tools_directory,
diff --git a/pulsar/core.py b/pulsar/core.py
index ff6b4847..e4547d27 100644
--- a/pulsar/core.py
+++ b/pulsar/core.py
@@ -127,8 +127,11 @@ class PulsarApp(object):
         self.dependency_manager = DependencyManager(dependencies_dir, resolvers_config_file)
 
     def __setup_job_metrics(self, conf):
-        job_metrics_config_file = conf.get("job_metrics_config_file", "job_metrics_conf.xml")
-        self.job_metrics = JobMetrics(job_metrics_config_file)
+        job_metrics = conf.get("job_metrics", None)
+        if job_metrics is None:
+            job_metrics_config_file = conf.get("job_metrics_config_file", "job_metrics_conf.xml")
+            job_metrics = JobMetrics(job_metrics_config_file)
+        self.job_metrics = job_metrics
 
     @property
     def only_manager(self):
diff --git a/pulsar/manager_endpoint_util.py b/pulsar/manager_endpoint_util.py
index b0df90ea..f0baadc7 100644
--- a/pulsar/manager_endpoint_util.py
+++ b/pulsar/manager_endpoint_util.py
@@ -1,12 +1,17 @@
 """ Composite actions over managers shared between HTTP endpoint (routes.py)
 and message queue.
 """
+
+import logging
+import os
+
 from pulsar import __version__ as pulsar_version
 from pulsar.client.setup_handler import build_job_config
 from pulsar.managers import status
 from pulsar.managers import PULSAR_UNKNOWN_RETURN_CODE
 from galaxy.tools.deps import dependencies
-import os
+
+log = logging.getLogger(__name__)
 
 
 def status_dict(manager, job_id):
@@ -40,6 +45,7 @@ def __job_complete_dict(complete_status, manager, job_id):
         stdout=stdout_contents,
         stderr=stderr_contents,
         working_directory=job_directory.working_directory(),
+        metadata_directory=job_directory.metadata_directory(),
         working_directory_contents=job_directory.working_directory_contents(),
         metadata_directory_contents=job_directory.metadata_directory_contents(),
         outputs_directory_contents=job_directory.outputs_directory_contents(),
@@ -63,13 +69,19 @@ def submit_job(manager, job_config):
     dependencies_description = job_config.get('dependencies_description', None)
     env = job_config.get('env', [])
     submit_params = job_config.get('submit_params', {})
-
     job_config = None
     if setup_params or force_setup:
         input_job_id = setup_params.get("job_id", job_id)
         tool_id = setup_params.get("tool_id", None)
         tool_version = setup_params.get("tool_version", None)
-        job_config = setup_job(manager, input_job_id, tool_id, tool_version)
+        use_metadata = setup_params.get("use_metadata", False)
+        job_config = setup_job(
+            manager,
+            input_job_id,
+            tool_id,
+            tool_version,
+            use_metadata
+        )
 
     if job_config is not None:
         job_directory = job_config["job_directory"]
@@ -85,20 +97,21 @@ def submit_job(manager, job_config):
         command_line,
         submit_params,
         dependencies_description=dependencies_description,
-        env=env
+        env=env,
     )
 
 
-def setup_job(manager, job_id, tool_id, tool_version):
+def setup_job(manager, job_id, tool_id, tool_version, use_metadata=False):
     """ Setup new job from these inputs and return dict summarizing state
     (used to configure command line).
     """
     job_id = manager.setup_job(job_id, tool_id, tool_version)
+    if use_metadata:
+        manager.enable_metadata_directory(job_id)
     return build_job_config(
         job_id=job_id,
         job_directory=manager.job_directory(job_id),
         system_properties=manager.system_properties(),
         tool_id=tool_id,
-        tool_version=tool_version,
-        pulsar_version=pulsar_version,
+        tool_version=tool_version
     )
diff --git a/pulsar/manager_factory.py b/pulsar/manager_factory.py
index fdd68ddf..7c2ccf31 100644
--- a/pulsar/manager_factory.py
+++ b/pulsar/manager_factory.py
@@ -63,8 +63,9 @@ def _populate_manager_descriptions_from_ini(manager_descriptions, job_managers_c
 
 def _get_default_options(conf):
     options = {}
-    if "assign_ids" in conf:
-        options["assign_ids"] = conf["assign_ids"]
+    for simple_key in ["assign_ids", "galaxy_home"]:
+        if simple_key in conf:
+            options[simple_key] = conf[simple_key]
     options["debug"] = conf.get("debug", False)
     # mode to create job directories with, if None just use
     # default (usually 0777 with umask applied).
diff --git a/pulsar/managers/__init__.py b/pulsar/managers/__init__.py
index c7a05926..eb088171 100644
--- a/pulsar/managers/__init__.py
+++ b/pulsar/managers/__init__.py
@@ -107,6 +107,9 @@ class ManagerProxy(object):
     def kill(self, *args, **kwargs):
         return self._proxied_manager.kill(*args, **kwargs)
 
+    def enable_metadata_directory(self, *args, **kwargs):
+        return self._proxied_manager.enable_metadata_directory(*args, **kwargs)
+
     def shutdown(self, timeout=None):
         """ Optional. """
         try:
diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py
index a1365d01..738dc3d0 100644
--- a/pulsar/managers/base/__init__.py
+++ b/pulsar/managers/base/__init__.py
@@ -208,6 +208,13 @@ class JobDirectory(RemoteJobDirectory):
         # Assert this job id isn't hacking path somehow.
         assert job_id == basename(job_id)
 
+    def enable_metadata_directory(self):
+        self.store_metadata("use_metadata_directory", True)
+
+    @property
+    def use_metadata_directory(self):
+        return self.has_metadata("use_metadata_directory")
+
     def _job_file(self, name):
         return os.path.join(self.job_directory, name)
 
@@ -289,6 +296,12 @@ class JobDirectory(RemoteJobDirectory):
         metadata_directory = self.metadata_directory()
         return self.__directory_contents(metadata_directory)
 
+    def metadata_directory(self):
+        if self.use_metadata_directory:
+            return super(JobDirectory, self).metadata_directory()
+        else:
+            return self.working_directory()
+
     def __directory_contents(self, directory):
         contents = []
         for path, _, files in walk(directory):
diff --git a/pulsar/managers/base/directory.py b/pulsar/managers/base/directory.py
index 3b57b97d..90308bef 100644
--- a/pulsar/managers/base/directory.py
+++ b/pulsar/managers/base/directory.py
@@ -70,6 +70,9 @@ class DirectoryBaseManager(BaseManager):
         job_directory.store_metadata(JOB_FILE_TOOL_ID, tool_id)
         job_directory.store_metadata(JOB_FILE_TOOL_VERSION, tool_version)
 
+    def enable_metadata_directory(self, job_id):
+        self._job_directory(job_id).enable_metadata_directory()
+
     def _record_cancel(self, job_id):
         try:
             self._job_directory(job_id).store_metadata(JOB_FILE_CANCELLED, True)
@@ -118,6 +121,7 @@ class DirectoryBaseManager(BaseManager):
             'env_setup_commands': env_setup_commands,
             'exit_code_path': return_code_path,
             'working_directory': self.job_directory(job_id).working_directory(),
+            'metadata_directory': self.job_directory(job_id).metadata_directory(),
             'job_id': job_id,
         }
         if command_line:
diff --git a/pulsar/managers/util/job_script/__init__.py b/pulsar/managers/util/job_script/__init__.py
index fffd29fe..4c815242 100644
--- a/pulsar/managers/util/job_script/__init__.py
+++ b/pulsar/managers/util/job_script/__init__.py
@@ -78,7 +78,7 @@ def job_script(template=DEFAULT_JOB_FILE_TEMPLATE, **kwds):
     job_instrumenter = kwds.get("job_instrumenter", None)
     if job_instrumenter:
         del kwds["job_instrumenter"]
-        working_directory = kwds["working_directory"]
+        working_directory = kwds.get("metadata_directory", kwds["working_directory"])
         kwds["instrument_pre_commands"] = job_instrumenter.pre_execute_commands(working_directory) or ''
         kwds["instrument_post_commands"] = job_instrumenter.post_execute_commands(working_directory) or ''
 
diff --git a/pulsar/web/routes.py b/pulsar/web/routes.py
index 45fb1ede..1f529608 100644
--- a/pulsar/web/routes.py
+++ b/pulsar/web/routes.py
@@ -43,12 +43,13 @@ class PulsarController(Controller):
 
 
 @PulsarController(path="/jobs", method="POST", response_type='json')
-def setup(manager, job_id, tool_id=None, tool_version=None):
-    return __setup(manager, job_id, tool_id=tool_id, tool_version=tool_version)
+def setup(manager, job_id, tool_id=None, tool_version=None, use_metadata='false'):
+    return __setup(manager, job_id, tool_id=tool_id, tool_version=tool_version, use_metadata=use_metadata)
 
 
-def __setup(manager, job_id, tool_id, tool_version):
-    response = setup_job(manager, job_id, tool_id, tool_version)
+def __setup(manager, job_id, tool_id, tool_version, use_metadata):
+    use_metadata = loads(use_metadata)
+    response = setup_job(manager, job_id, tool_id, tool_version, use_metadata)
     log.debug("Setup job with configuration: %s" % response)
     return response
 
@@ -72,7 +73,7 @@ def submit(manager, job_id, command_line, params='{}', dependencies_description=
         submit_params=submit_params,
         dependencies_description=dependencies_description,
         env=env,
-        remote_staging=remote_staging
+        remote_staging=remote_staging,
     )
     submit_job(manager, submit_config)
 
diff --git a/test/client_staging_test.py b/test/client_staging_test.py
index d68372cd..60714100 100644
--- a/test/client_staging_test.py
+++ b/test/client_staging_test.py
@@ -149,7 +149,7 @@ class MockClient(object):
     def expect_put_paths(self, paths):
         self.put_paths = deque(paths)
 
-    def setup(self, tool_id, tool_version):
+    def setup(self, tool_id, tool_version, use_metadata=False):
         assert tool_id == self.expected_tool.id
         assert tool_version == self.expected_tool.version
         return {}
diff --git a/test/client_test.py b/test/client_test.py
index bf72339d..1c1b90fa 100644
--- a/test/client_test.py
+++ b/test/client_test.py
@@ -106,7 +106,7 @@ class RequestChecker(object):
 def test_setup():
     """ Test the setup method of Client """
     client = TestClient()
-    request_checker = RequestChecker("jobs")
+    request_checker = RequestChecker("jobs", {"use_metadata": "true"})
     response_json = b'{"working_directory":"C:\\\\home\\\\dir","outputs_directory" : "C:\\\\outputs","path_separator" : "\\\\"}'
     client.expect_open(request_checker, response_json)
     setup_response = client.setup()
-- 
GitLab