From 86842f6c811850a1bbc11cdb0f695970f061ef95 Mon Sep 17 00:00:00 2001
From: John Chilton <jmchilton@gmail.com>
Date: Tue, 4 Jun 2019 09:44:27 -0400
Subject: [PATCH] Refactor file action to push source into action class.

More general information than just path was passed through layers above this in #180, this pushes that information down a layer... the idea is to add an object store reference here for remote staging of data directly from an object store.
---
 pulsar/client/action_mapper.py | 74 +++++++++++++++++++++-------------
 test/transfer_action_test.py   |  4 +-
 2 files changed, 48 insertions(+), 30 deletions(-)

diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py
index 725a9a93..85d8e29d 100644
--- a/pulsar/client/action_mapper.py
+++ b/pulsar/client/action_mapper.py
@@ -179,7 +179,7 @@ class FileActionMapper(object):
         if mapper:
             file_lister = mapper.file_lister
             action_kwds = mapper.action_kwds
-        action = action_class(path, file_lister=file_lister, **action_kwds)
+        action = action_class(source, file_lister=file_lister, **action_kwds)
         self.__process_action(action, type)
         return action
 
@@ -278,10 +278,14 @@ class BaseAction(object):
     whole_directory_transfer_supported = False
     action_spec = {}
 
-    def __init__(self, path, file_lister=None):
-        self.path = path
+    def __init__(self, source, file_lister=None):
+        self.source = source
         self.file_lister = file_lister or DEFAULT_FILE_LISTER
 
+    @property
+    def path(self):
+        return self.source.get("path")
+
     def unstructured_map(self, path_helper):
         unstructured_map = self.file_lister.unstructured_map(self.path)
         if self.staging_needed:
@@ -306,13 +310,24 @@ class BaseAction(object):
     def staging_action_local(self):
         return self.staging == STAGING_ACTION_LOCAL
 
+    def _extend_base_dict(self, **kwds):
+        base_dict = dict(
+            path=self.path,  # For older Pulsar servers (pre-0.13.0?)
+            source=self.source,
+            action_type=self.action_type,
+        )
+        base_dict.update(**kwds)
+        return base_dict
+
     def to_dict(self):
-        return dict(action_type=self.action_type)
+        return self._extend_base_dict()
 
     def __str__(self):
         as_dict = self.to_dict()
         attribute_str = ""
         for key, value in as_dict.items():
+            if key == "source":
+                continue
             attribute_str += "%s=%s" % (key, value)
         return "FileAction[%s]" % attribute_str
 
@@ -326,11 +341,11 @@ class NoneAction(BaseAction):
     staging = STAGING_ACTION_NONE
 
     def to_dict(self):
-        return dict(path=self.path, action_type=self.action_type)
+        return self._extend_base_dict()
 
     @classmethod
     def from_dict(cls, action_dict):
-        return NoneAction(path=action_dict["path"])
+        return NoneAction(source=action_dict["source"])
 
     def path_rewrite(self, path_helper, path=None):
         return None
@@ -347,16 +362,13 @@ class RewriteAction(BaseAction):
     action_type = "rewrite"
     staging = STAGING_ACTION_NONE
 
-    def __init__(self, path, file_lister=None, source_directory=None, destination_directory=None):
-        self.path = path
-        self.file_lister = file_lister or DEFAULT_FILE_LISTER
+    def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None):
+        super(RewriteAction, self).__init__(source, file_lister=file_lister)
         self.source_directory = source_directory
         self.destination_directory = destination_directory
 
     def to_dict(self):
-        return dict(
-            path=self.path,
-            action_type=self.action_type,
+        return self._extend_base_dict(
             source_directory=self.source_directory,
             destination_directory=self.destination_directory,
         )
@@ -364,7 +376,7 @@ class RewriteAction(BaseAction):
     @classmethod
     def from_dict(cls, action_dict):
         return RewriteAction(
-            path=action_dict["path"],
+            source=action_dict["source"],
             source_directory=action_dict["source_directory"],
             destination_directory=action_dict["destination_directory"],
         )
@@ -401,12 +413,9 @@ class RemoteCopyAction(BaseAction):
     action_type = "remote_copy"
     staging = STAGING_ACTION_REMOTE
 
-    def to_dict(self):
-        return dict(path=self.path, action_type=self.action_type)
-
     @classmethod
     def from_dict(cls, action_dict):
-        return RemoteCopyAction(path=action_dict["path"])
+        return RemoteCopyAction(source=action_dict["source"])
 
     def write_to_path(self, path):
         copy_to_path(open(self.path, "rb"), path)
@@ -430,16 +439,16 @@ class RemoteTransferAction(BaseAction):
     action_type = "remote_transfer"
     staging = STAGING_ACTION_REMOTE
 
-    def __init__(self, path, file_lister=None, url=None):
-        super(RemoteTransferAction, self).__init__(path, file_lister=file_lister)
+    def __init__(self, source, file_lister=None, url=None):
+        super(RemoteTransferAction, self).__init__(source, file_lister=file_lister)
         self.url = url
 
     def to_dict(self):
-        return dict(path=self.path, action_type=self.action_type, url=self.url)
+        return self._extend_base_dict(url=self.url)
 
     @classmethod
     def from_dict(cls, action_dict):
-        return RemoteTransferAction(path=action_dict["path"], url=action_dict["url"])
+        return RemoteTransferAction(source=action_dict["source"], url=action_dict["url"])
 
     def write_to_path(self, path):
         get_file(self.url, path)
@@ -460,18 +469,20 @@ class PubkeyAuthenticatedTransferAction(BaseAction):
     )
     staging = STAGING_ACTION_REMOTE
 
-    def __init__(self, path, file_lister=None, ssh_user=UNSET_ACTION_KWD,
+    def __init__(self, source, file_lister=None, ssh_user=UNSET_ACTION_KWD,
                  ssh_host=UNSET_ACTION_KWD, ssh_port=UNSET_ACTION_KWD, ssh_key=UNSET_ACTION_KWD):
-        super(PubkeyAuthenticatedTransferAction, self).__init__(path, file_lister=file_lister)
+        super(PubkeyAuthenticatedTransferAction, self).__init__(source, file_lister=file_lister)
         self.ssh_user = ssh_user
         self.ssh_host = ssh_host
         self.ssh_port = ssh_port
         self.ssh_key = ssh_key
 
     def to_dict(self):
-        return dict(path=self.path, action_type=self.action_type,
-                    ssh_user=self.ssh_user, ssh_host=self.ssh_host,
-                    ssh_port=self.ssh_port)
+        return self._extend_base_dict(
+            ssh_user=self.ssh_user,
+            ssh_host=self.ssh_host,
+            ssh_port=self.ssh_port
+        )
 
     @contextmanager
     def _serialized_key(self):
@@ -497,7 +508,7 @@ class RsyncTransferAction(PubkeyAuthenticatedTransferAction):
 
     @classmethod
     def from_dict(cls, action_dict):
-        return RsyncTransferAction(path=action_dict["path"],
+        return RsyncTransferAction(source=action_dict["source"],
                                    ssh_user=action_dict["ssh_user"],
                                    ssh_host=action_dict["ssh_host"],
                                    ssh_port=action_dict["ssh_port"],
@@ -519,7 +530,7 @@ class ScpTransferAction(PubkeyAuthenticatedTransferAction):
 
     @classmethod
     def from_dict(cls, action_dict):
-        return ScpTransferAction(path=action_dict["path"],
+        return ScpTransferAction(source=action_dict["source"],
                                  ssh_user=action_dict["ssh_user"],
                                  ssh_host=action_dict["ssh_host"],
                                  ssh_port=action_dict["ssh_port"],
@@ -581,6 +592,13 @@ def from_dict(action_dict):
     if not target_class:
         message = "Failed to recover action from dictionary - invalid action type specified %s." % action_type
         raise Exception(message)
+    if "source" in action_dict:
+        action_dict.pop("path")  # remove redundant information stored for backward compatibility.
+    elif "path" in action_dict:
+        # legacy message received from older Pulsar client, pop the path from the dict
+        # and convert it to a source.
+        source = {"path": action_dict.pop("path")}
+        action_dict["source"] = source
     return target_class.from_dict(action_dict)
 
 
diff --git a/test/transfer_action_test.py b/test/transfer_action_test.py
index 4155167e..30b927f9 100644
--- a/test/transfer_action_test.py
+++ b/test/transfer_action_test.py
@@ -11,7 +11,7 @@ def test_write_to_file():
 
         to_path = os.path.join(directory, "local_get")
         url = server.application_url + "?path=%s" % from_path
-        RemoteTransferAction(to_path, url=url).write_to_path(to_path)
+        RemoteTransferAction({"path": to_path}, url=url).write_to_path(to_path)
 
         assert open(to_path, "rb").read() == b"123456"
 
@@ -23,7 +23,7 @@ def test_write_from_file():
 
         to_path = os.path.join(directory, "remote_post")
         url = server.application_url + "?path=%s" % to_path
-        RemoteTransferAction(to_path, url=url).write_from_path(from_path)
+        RemoteTransferAction({"path": to_path}, url=url).write_from_path(from_path)
 
         posted_contents = open(to_path, "rb").read()
         assert posted_contents == b"123456", posted_contents
-- 
GitLab