Skip to content
Snippets Groups Projects
Commit 86842f6c authored by John Chilton's avatar John Chilton
Browse files

Refactor file action to push source into action class.

More general information than just path was passed through layers above this in #180, this pushes that information down a layer... the idea is to add an object store reference here for remote staging of data directly from an object store.
parent 612baccc
No related branches found
No related tags found
No related merge requests found
......@@ -179,7 +179,7 @@ class FileActionMapper(object):
if mapper:
file_lister = mapper.file_lister
action_kwds = mapper.action_kwds
action = action_class(path, file_lister=file_lister, **action_kwds)
action = action_class(source, file_lister=file_lister, **action_kwds)
self.__process_action(action, type)
return action
......@@ -278,10 +278,14 @@ class BaseAction(object):
whole_directory_transfer_supported = False
action_spec = {}
def __init__(self, path, file_lister=None):
self.path = path
def __init__(self, source, file_lister=None):
self.source = source
self.file_lister = file_lister or DEFAULT_FILE_LISTER
@property
def path(self):
return self.source.get("path")
def unstructured_map(self, path_helper):
unstructured_map = self.file_lister.unstructured_map(self.path)
if self.staging_needed:
......@@ -306,13 +310,24 @@ class BaseAction(object):
def staging_action_local(self):
return self.staging == STAGING_ACTION_LOCAL
def _extend_base_dict(self, **kwds):
base_dict = dict(
path=self.path, # For older Pulsar servers (pre-0.13.0?)
source=self.source,
action_type=self.action_type,
)
base_dict.update(**kwds)
return base_dict
def to_dict(self):
return dict(action_type=self.action_type)
return self._extend_base_dict()
def __str__(self):
as_dict = self.to_dict()
attribute_str = ""
for key, value in as_dict.items():
if key == "source":
continue
attribute_str += "%s=%s" % (key, value)
return "FileAction[%s]" % attribute_str
......@@ -326,11 +341,11 @@ class NoneAction(BaseAction):
staging = STAGING_ACTION_NONE
def to_dict(self):
return dict(path=self.path, action_type=self.action_type)
return self._extend_base_dict()
@classmethod
def from_dict(cls, action_dict):
return NoneAction(path=action_dict["path"])
return NoneAction(source=action_dict["source"])
def path_rewrite(self, path_helper, path=None):
return None
......@@ -347,16 +362,13 @@ class RewriteAction(BaseAction):
action_type = "rewrite"
staging = STAGING_ACTION_NONE
def __init__(self, path, file_lister=None, source_directory=None, destination_directory=None):
self.path = path
self.file_lister = file_lister or DEFAULT_FILE_LISTER
def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None):
super(RewriteAction, self).__init__(source, file_lister=file_lister)
self.source_directory = source_directory
self.destination_directory = destination_directory
def to_dict(self):
return dict(
path=self.path,
action_type=self.action_type,
return self._extend_base_dict(
source_directory=self.source_directory,
destination_directory=self.destination_directory,
)
......@@ -364,7 +376,7 @@ class RewriteAction(BaseAction):
@classmethod
def from_dict(cls, action_dict):
return RewriteAction(
path=action_dict["path"],
source=action_dict["source"],
source_directory=action_dict["source_directory"],
destination_directory=action_dict["destination_directory"],
)
......@@ -401,12 +413,9 @@ class RemoteCopyAction(BaseAction):
action_type = "remote_copy"
staging = STAGING_ACTION_REMOTE
def to_dict(self):
return dict(path=self.path, action_type=self.action_type)
@classmethod
def from_dict(cls, action_dict):
return RemoteCopyAction(path=action_dict["path"])
return RemoteCopyAction(source=action_dict["source"])
def write_to_path(self, path):
copy_to_path(open(self.path, "rb"), path)
......@@ -430,16 +439,16 @@ class RemoteTransferAction(BaseAction):
action_type = "remote_transfer"
staging = STAGING_ACTION_REMOTE
def __init__(self, path, file_lister=None, url=None):
super(RemoteTransferAction, self).__init__(path, file_lister=file_lister)
def __init__(self, source, file_lister=None, url=None):
super(RemoteTransferAction, self).__init__(source, file_lister=file_lister)
self.url = url
def to_dict(self):
return dict(path=self.path, action_type=self.action_type, url=self.url)
return self._extend_base_dict(url=self.url)
@classmethod
def from_dict(cls, action_dict):
return RemoteTransferAction(path=action_dict["path"], url=action_dict["url"])
return RemoteTransferAction(source=action_dict["source"], url=action_dict["url"])
def write_to_path(self, path):
get_file(self.url, path)
......@@ -460,18 +469,20 @@ class PubkeyAuthenticatedTransferAction(BaseAction):
)
staging = STAGING_ACTION_REMOTE
def __init__(self, path, file_lister=None, ssh_user=UNSET_ACTION_KWD,
def __init__(self, source, file_lister=None, ssh_user=UNSET_ACTION_KWD,
ssh_host=UNSET_ACTION_KWD, ssh_port=UNSET_ACTION_KWD, ssh_key=UNSET_ACTION_KWD):
super(PubkeyAuthenticatedTransferAction, self).__init__(path, file_lister=file_lister)
super(PubkeyAuthenticatedTransferAction, self).__init__(source, file_lister=file_lister)
self.ssh_user = ssh_user
self.ssh_host = ssh_host
self.ssh_port = ssh_port
self.ssh_key = ssh_key
def to_dict(self):
return dict(path=self.path, action_type=self.action_type,
ssh_user=self.ssh_user, ssh_host=self.ssh_host,
ssh_port=self.ssh_port)
return self._extend_base_dict(
ssh_user=self.ssh_user,
ssh_host=self.ssh_host,
ssh_port=self.ssh_port
)
@contextmanager
def _serialized_key(self):
......@@ -497,7 +508,7 @@ class RsyncTransferAction(PubkeyAuthenticatedTransferAction):
@classmethod
def from_dict(cls, action_dict):
return RsyncTransferAction(path=action_dict["path"],
return RsyncTransferAction(source=action_dict["source"],
ssh_user=action_dict["ssh_user"],
ssh_host=action_dict["ssh_host"],
ssh_port=action_dict["ssh_port"],
......@@ -519,7 +530,7 @@ class ScpTransferAction(PubkeyAuthenticatedTransferAction):
@classmethod
def from_dict(cls, action_dict):
return ScpTransferAction(path=action_dict["path"],
return ScpTransferAction(source=action_dict["source"],
ssh_user=action_dict["ssh_user"],
ssh_host=action_dict["ssh_host"],
ssh_port=action_dict["ssh_port"],
......@@ -581,6 +592,13 @@ def from_dict(action_dict):
if not target_class:
message = "Failed to recover action from dictionary - invalid action type specified %s." % action_type
raise Exception(message)
if "source" in action_dict:
action_dict.pop("path") # remove redundant information stored for backward compatibility.
elif "path" in action_dict:
# legacy message received from older Pulsar client, pop the path from the dict
# and convert it to a source.
source = {"path": action_dict.pop("path")}
action_dict["source"] = source
return target_class.from_dict(action_dict)
......
......@@ -11,7 +11,7 @@ def test_write_to_file():
to_path = os.path.join(directory, "local_get")
url = server.application_url + "?path=%s" % from_path
RemoteTransferAction(to_path, url=url).write_to_path(to_path)
RemoteTransferAction({"path": to_path}, url=url).write_to_path(to_path)
assert open(to_path, "rb").read() == b"123456"
......@@ -23,7 +23,7 @@ def test_write_from_file():
to_path = os.path.join(directory, "remote_post")
url = server.application_url + "?path=%s" % to_path
RemoteTransferAction(to_path, url=url).write_from_path(from_path)
RemoteTransferAction({"path": to_path}, url=url).write_from_path(from_path)
posted_contents = open(to_path, "rb").read()
assert posted_contents == b"123456", posted_contents
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment