From d130b11b095f8354e9b0e81d488e63355d58d57a Mon Sep 17 00:00:00 2001
From: John Chilton <jmchilton@gmail.com>
Date: Tue, 4 Jun 2019 09:55:31 -0400
Subject: [PATCH] Allow remote_object_store_copy file action.

Copy input files in from a configured object store.
---
 pulsar/client/action_mapper.py    | 40 +++++++++++++++++++++++++++++--
 pulsar/client/staging/__init__.py |  5 ++--
 pulsar/core.py                    | 16 +++++++++----
 pulsar/managers/__init__.py       |  4 ++++
 pulsar/managers/base/__init__.py  |  1 +
 pulsar/managers/staging/pre.py    |  4 +++-
 pulsar/managers/stateful.py       |  3 ++-
 test/persistence_test.py          |  1 +
 test/test_utils.py                |  3 ++-
 9 files changed, 66 insertions(+), 11 deletions(-)

diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py
index 47c75289..469581ad 100644
--- a/pulsar/client/action_mapper.py
+++ b/pulsar/client/action_mapper.py
@@ -462,6 +462,33 @@ class RemoteTransferAction(BaseAction):
         post_file(self.url, pulsar_path)
 
 
+class RemoteObjectStoreCopyAction(BaseAction):
+    """Copy an input dataset's file out of the injected object store to a local path."""
+    action_type = "remote_object_store_copy"
+    staging = STAGING_ACTION_REMOTE
+    inject_object_store = True
+
+    @classmethod
+    def from_dict(cls, action_dict):
+        return RemoteObjectStoreCopyAction(source=action_dict["source"])
+
+    def write_to_path(self, path):
+        assert self.object_store  # Make sure object_store attribute injected
+        object_store_ref = self.source.get("object_store_ref")
+        assert object_store_ref  # Reject a missing key and an explicit None alike
+        dataset_object = Bunch(
+            id=object_store_ref["dataset_id"],
+            uuid=object_store_ref["dataset_uuid"],
+            object_store_id=object_store_ref["object_store_id"],
+        )
+        filename = self.object_store.get_filename(dataset_object)
+        with open(filename, 'rb') as dataset_file:  # ensure the source handle is closed
+            copy_to_path(dataset_file, path)
+
+    def write_from_path(self, pulsar_path):
+        raise NotImplementedError("Writing raw files to object store not supported at this time.")
+
+
 class PubkeyAuthenticatedTransferAction(BaseAction):
     """Base class for file transfers requiring an SSH public/private key
     """
@@ -585,7 +612,14 @@ class MessageAction(object):
         open(path, "w").write(self.contents)
 
 
-DICTIFIABLE_ACTION_CLASSES = [RemoteCopyAction, RemoteTransferAction, MessageAction, RsyncTransferAction, ScpTransferAction]
+DICTIFIABLE_ACTION_CLASSES = [
+    RemoteCopyAction,
+    RemoteTransferAction,
+    MessageAction,
+    RsyncTransferAction,
+    ScpTransferAction,
+    RemoteObjectStoreCopyAction,
+]
 
 
 def from_dict(action_dict):
@@ -632,7 +666,8 @@ class BasePathMapper(object):
 
     def matches(self, path, path_type):
         path_type_matches = path_type in self.path_types
-        return path_type_matches and self._path_matches(path)
+        rval = path_type_matches and self._path_matches(path)
+        return rval
 
     def _extend_base_dict(self, **kwds):
         base_dict = dict(
@@ -761,6 +796,7 @@ ACTION_CLASSES = [
     CopyAction,
     RemoteCopyAction,
     RemoteTransferAction,
+    RemoteObjectStoreCopyAction,
     RsyncTransferAction,
     ScpTransferAction,
 ]
diff --git a/pulsar/client/staging/__init__.py b/pulsar/client/staging/__init__.py
index a141968f..86cb2158 100644
--- a/pulsar/client/staging/__init__.py
+++ b/pulsar/client/staging/__init__.py
@@ -155,13 +155,14 @@ CLIENT_INPUT_PATH_TYPES = Bunch(
 
 class ClientInput(object):
 
-    def __init__(self, path, input_type):
+    def __init__(self, path, input_type, object_store_ref=None):
         self.path = path
         self.input_type = input_type
+        self.object_store_ref = object_store_ref
 
     @property
     def action_source(self):
-        return {"path": self.path}
+        return {"path": self.path, "object_store_ref": self.object_store_ref}
 
 
 class ClientOutputs(object):
diff --git a/pulsar/core.py b/pulsar/core.py
index ee8f4f7d..2f392d49 100644
--- a/pulsar/core.py
+++ b/pulsar/core.py
@@ -118,18 +118,26 @@ class PulsarApp(object):
         self.file_cache = Cache(file_cache_dir) if file_cache_dir else None
 
     def __setup_object_store(self, conf):
-        if "object_store_config_file" not in conf:
+        if not conf.get("object_store_config_file") and not conf.get("object_store_config"):
             self.object_store = None
             return
-        object_store_config = Bunch(
-            object_store_config_file=conf['object_store_config_file'],
+
+        config_obj_kwds = dict(
             file_path=conf.get("object_store_file_path", None),
             object_store_check_old_style=False,
             job_working_directory=conf.get("object_store_job_working_directory", None),
             new_file_path=conf.get("object_store_new_file_path", tempdir),
             umask=int(conf.get("object_store_umask", "0000")),
+            jobs_directory=None,
         )
-        self.object_store = build_object_store_from_config(object_store_config)
+        config_dict = None
+        if conf.get("object_store_config_file"):
+            config_obj_kwds["object_store_config_file"] = conf['object_store_config_file']
+        else:
+            config_dict = conf["object_store_config"]
+
+        object_store_config = Bunch(**config_obj_kwds)
+        self.object_store = build_object_store_from_config(object_store_config, config_dict=config_dict)
 
     def __setup_dependency_manager(self, conf):
         dependencies_dir = conf.get("tool_dependency_dir", "dependencies")
diff --git a/pulsar/managers/__init__.py b/pulsar/managers/__init__.py
index 68bdc02b..a9aa741b 100644
--- a/pulsar/managers/__init__.py
+++ b/pulsar/managers/__init__.py
@@ -124,5 +124,9 @@ class ManagerProxy(object):
     def system_properties(self):
         return self._proxied_manager.system_properties()
 
+    @property
+    def object_store(self):
+        return self._proxied_manager.object_store
+
     def __str__(self):
         return "ManagerProxy[manager=%s]" % str(self._proxied_manager)
diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py
index 30a6485a..c924fb9c 100644
--- a/pulsar/managers/base/__init__.py
+++ b/pulsar/managers/base/__init__.py
@@ -70,6 +70,7 @@ class BaseManager(ManagerInterface):
         self.__init_env_vars(**kwds)
         self.dependency_manager = app.dependency_manager
         self.job_metrics = app.job_metrics
+        self.object_store = app.object_store
 
     def clean(self, job_id):
         if self.debug:
diff --git a/pulsar/managers/staging/pre.py b/pulsar/managers/staging/pre.py
index df3175ae..45c2c0ef 100644
--- a/pulsar/managers/staging/pre.py
+++ b/pulsar/managers/staging/pre.py
@@ -6,11 +6,13 @@ import logging
 log = logging.getLogger(__name__)
 
 
-def preprocess(job_directory, setup_actions, action_executor):
+def preprocess(job_directory, setup_actions, action_executor, object_store=None):
     for setup_action in setup_actions:
         name = setup_action["name"]
         input_type = setup_action["type"]
         action = from_dict(setup_action["action"])
+        if getattr(action, "inject_object_store", False):
+            action.object_store = object_store
         path = job_directory.calculate_path(name, input_type)
         description = "Staging %s '%s' via %s to %s" % (input_type, name, action, path)
         log.debug(description)
diff --git a/pulsar/managers/stateful.py b/pulsar/managers/stateful.py
index bff96998..beaf781e 100644
--- a/pulsar/managers/stateful.py
+++ b/pulsar/managers/stateful.py
@@ -102,7 +102,8 @@ class StatefulManagerProxy(ManagerProxy):
                         'setup' in staging_config:
                     for action in staging_config['setup']:
                         action['action'].update(ssh_key=staging_config['action_mapper']['ssh_key'])
-                preprocess(job_directory, staging_config.get("setup", []), self.__preprocess_action_executor)
+                setup_config = staging_config.get("setup", [])
+                preprocess(job_directory, setup_config, self.__preprocess_action_executor, object_store=self.object_store)
                 self.active_jobs.deactivate_job(job_id, active_status=ACTIVE_STATUS_PREPROCESSING)
 
         new_thread_for_job(self, "preprocess", job_id, do_preprocess, daemon=False)
diff --git a/test/persistence_test.py b/test/persistence_test.py
index d7221e26..99367a4f 100644
--- a/test/persistence_test.py
+++ b/test/persistence_test.py
@@ -127,6 +127,7 @@ def _app():
             authorizer=get_authorizer(None),
             dependency_manager=TestDependencyManager(),
             job_metrics=Bunch(default_job_instrumenter=NULL_JOB_INSTRUMENTER),
+            object_store=None,
         )
         yield app
 
diff --git a/test/test_utils.py b/test/test_utils.py
index 232255d5..994e8a14 100644
--- a/test/test_utils.py
+++ b/test/test_utils.py
@@ -217,7 +217,8 @@ def minimal_app_for_managers():
     return Bunch(staging_directory=staging_directory,
                  authorizer=authorizer,
                  job_metrics=NullJobMetrics(),
-                 dependency_manager=TestDependencyManager())
+                 dependency_manager=TestDependencyManager(),
+                 object_store=object())
 
 
 class NullJobMetrics(object):
-- 
GitLab