Skip to content
Snippets Groups Projects
Commit d130b11b authored by John Chilton
Browse files

Allow remote_object_store_copy file action.

Copy input files in from a configured object store.
parent 68021a3b
No related branches found
No related tags found
No related merge requests found
...@@ -462,6 +462,33 @@ class RemoteTransferAction(BaseAction): ...@@ -462,6 +462,33 @@ class RemoteTransferAction(BaseAction):
post_file(self.url, pulsar_path) post_file(self.url, pulsar_path)
class RemoteObjectStoreCopyAction(BaseAction):
    """Stage an input by copying it out of an object store.

    The action's ``source`` dictionary must carry an ``object_store_ref``
    entry with ``dataset_id``, ``dataset_uuid`` and ``object_store_id`` keys.
    The object store itself is not part of the serialized action: because
    ``inject_object_store`` is true, the code executing the action is
    expected to assign it to the ``object_store`` attribute before calling
    :meth:`write_to_path`.
    """
    action_type = "remote_object_store_copy"
    staging = STAGING_ACTION_REMOTE
    # Flag read by the code that executes actions: when true, that code sets
    # ``action.object_store`` before the action is used.
    inject_object_store = True

    @classmethod
    def from_dict(cls, action_dict):
        """Rebuild the action from its dictionary form.

        Only ``action_dict["source"]`` is read.
        """
        return cls(source=action_dict["source"])

    def write_to_path(self, path):
        """Copy the referenced dataset from the object store to ``path``.

        Raises ``AssertionError`` if no object store was injected or the
        source has no ``object_store_ref`` key, and ``ValueError`` if that
        key is present but ``None``.
        """
        # getattr() so a never-injected attribute fails the assertion rather
        # than surfacing as an unrelated AttributeError.
        assert getattr(self, "object_store", None), \
            "object_store must be injected before write_to_path is called"
        assert "object_store_ref" in self.source, \
            "action source is missing 'object_store_ref'"
        object_store_ref = self.source["object_store_ref"]
        if object_store_ref is None:
            # The key can be present with a None value when the client had no
            # reference for this input; fail with a clear message instead of
            # "'NoneType' object is not subscriptable".
            raise ValueError("object_store_ref is None; cannot locate dataset in object store")
        dataset_object = Bunch(
            id=object_store_ref["dataset_id"],
            uuid=object_store_ref["dataset_uuid"],
            object_store_id=object_store_ref["object_store_id"],
        )
        filename = self.object_store.get_filename(dataset_object)
        # Close the source handle deterministically once the copy finishes.
        with open(filename, 'rb') as source_file:
            copy_to_path(source_file, path)

    def write_from_path(self, pulsar_path):
        """Not supported: this action only copies data out of the object store."""
        raise NotImplementedError("Writing raw files to object store not supported at this time.")
class PubkeyAuthenticatedTransferAction(BaseAction): class PubkeyAuthenticatedTransferAction(BaseAction):
"""Base class for file transfers requiring an SSH public/private key """Base class for file transfers requiring an SSH public/private key
""" """
...@@ -585,7 +612,14 @@ class MessageAction(object): ...@@ -585,7 +612,14 @@ class MessageAction(object):
open(path, "w").write(self.contents) open(path, "w").write(self.contents)
# Action classes eligible for reconstruction from a dictionary description
# (presumably the list consulted by from_dict() below -- confirm).
DICTIFIABLE_ACTION_CLASSES = [
    RemoteCopyAction,
    RemoteTransferAction,
    MessageAction,
    RsyncTransferAction,
    ScpTransferAction,
    RemoteObjectStoreCopyAction,
]
def from_dict(action_dict): def from_dict(action_dict):
...@@ -632,7 +666,8 @@ class BasePathMapper(object): ...@@ -632,7 +666,8 @@ class BasePathMapper(object):
def matches(self, path, path_type):
    """Return whether this mapper applies to ``path`` of kind ``path_type``.

    The path type must be one of ``self.path_types``; when it is, the result
    is whatever ``self._path_matches(path)`` returns.
    """
    if path_type not in self.path_types:
        # Wrong kind of path: no need to evaluate the path pattern at all.
        return False
    return self._path_matches(path)
def _extend_base_dict(self, **kwds): def _extend_base_dict(self, **kwds):
base_dict = dict( base_dict = dict(
...@@ -761,6 +796,7 @@ ACTION_CLASSES = [ ...@@ -761,6 +796,7 @@ ACTION_CLASSES = [
CopyAction, CopyAction,
RemoteCopyAction, RemoteCopyAction,
RemoteTransferAction, RemoteTransferAction,
RemoteObjectStoreCopyAction,
RsyncTransferAction, RsyncTransferAction,
ScpTransferAction, ScpTransferAction,
] ]
......
...@@ -155,13 +155,14 @@ CLIENT_INPUT_PATH_TYPES = Bunch( ...@@ -155,13 +155,14 @@ CLIENT_INPUT_PATH_TYPES = Bunch(
class ClientInput(object):
    """Describe one job input on the client side.

    ``path`` is the input's path, ``input_type`` its kind, and
    ``object_store_ref`` an optional reference to the same data in an
    object store (``None`` when there is no such reference).
    """

    def __init__(self, path, input_type, object_store_ref=None):
        self.path = path
        self.input_type = input_type
        self.object_store_ref = object_store_ref

    @property
    def action_source(self):
        """Return the ``source`` dictionary handed to file actions.

        The ``object_store_ref`` key is always present and is ``None`` when
        no reference was supplied.
        """
        return {"path": self.path, "object_store_ref": self.object_store_ref}
class ClientOutputs(object): class ClientOutputs(object):
......
...@@ -118,18 +118,26 @@ class PulsarApp(object): ...@@ -118,18 +118,26 @@ class PulsarApp(object):
self.file_cache = Cache(file_cache_dir) if file_cache_dir else None self.file_cache = Cache(file_cache_dir) if file_cache_dir else None
def __setup_object_store(self, conf):
    """Build ``self.object_store`` from the application configuration.

    An object store is configured either by ``object_store_config_file``
    (a path) or by ``object_store_config`` (an inline configuration passed
    through as ``config_dict``). The file wins when both are given. When
    neither is usable, ``self.object_store`` is set to ``None``.
    """
    config_file = conf.get("object_store_config_file")
    config_dict = conf.get("object_store_config")
    if not config_file and config_dict is None:
        # Covers both "keys absent" and "key present but empty", which would
        # otherwise fall through to a KeyError on the inline configuration.
        self.object_store = None
        return

    config_obj_kwds = dict(
        file_path=conf.get("object_store_file_path", None),
        object_store_check_old_style=False,
        job_working_directory=conf.get("object_store_job_working_directory", None),
        new_file_path=conf.get("object_store_new_file_path", tempdir),
        # NOTE(review): the string is parsed as base 10, so "0022" becomes 22
        # rather than 0o22 -- confirm which form the object store expects.
        umask=int(conf.get("object_store_umask", "0000")),
        jobs_directory=None,
    )
    if config_file:
        config_obj_kwds["object_store_config_file"] = config_file
        # A configuration file takes precedence over an inline configuration.
        config_dict = None

    object_store_config = Bunch(**config_obj_kwds)
    self.object_store = build_object_store_from_config(object_store_config, config_dict=config_dict)
def __setup_dependency_manager(self, conf): def __setup_dependency_manager(self, conf):
dependencies_dir = conf.get("tool_dependency_dir", "dependencies") dependencies_dir = conf.get("tool_dependency_dir", "dependencies")
......
...@@ -124,5 +124,9 @@ class ManagerProxy(object): ...@@ -124,5 +124,9 @@ class ManagerProxy(object):
def system_properties(self):
    """Return the proxied manager's ``system_properties()`` result unchanged."""
    return self._proxied_manager.system_properties()
@property
def object_store(self):
    """Expose the proxied manager's ``object_store`` attribute."""
    return self._proxied_manager.object_store
def __str__(self):
    """Return a readable label that embeds the proxied manager's ``str()``."""
    return "ManagerProxy[manager=%s]" % str(self._proxied_manager)
...@@ -70,6 +70,7 @@ class BaseManager(ManagerInterface): ...@@ -70,6 +70,7 @@ class BaseManager(ManagerInterface):
self.__init_env_vars(**kwds) self.__init_env_vars(**kwds)
self.dependency_manager = app.dependency_manager self.dependency_manager = app.dependency_manager
self.job_metrics = app.job_metrics self.job_metrics = app.job_metrics
self.object_store = app.object_store
def clean(self, job_id): def clean(self, job_id):
if self.debug: if self.debug:
......
...@@ -6,11 +6,13 @@ import logging ...@@ -6,11 +6,13 @@ import logging
log = logging.getLogger(__name__) log = logging.getLogger(__name__)
def preprocess(job_directory, setup_actions, action_executor): def preprocess(job_directory, setup_actions, action_executor, object_store=None):
for setup_action in setup_actions: for setup_action in setup_actions:
name = setup_action["name"] name = setup_action["name"]
input_type = setup_action["type"] input_type = setup_action["type"]
action = from_dict(setup_action["action"]) action = from_dict(setup_action["action"])
if getattr(action, "inject_object_store", False):
action.object_store = object_store
path = job_directory.calculate_path(name, input_type) path = job_directory.calculate_path(name, input_type)
description = "Staging %s '%s' via %s to %s" % (input_type, name, action, path) description = "Staging %s '%s' via %s to %s" % (input_type, name, action, path)
log.debug(description) log.debug(description)
......
...@@ -102,7 +102,8 @@ class StatefulManagerProxy(ManagerProxy): ...@@ -102,7 +102,8 @@ class StatefulManagerProxy(ManagerProxy):
'setup' in staging_config: 'setup' in staging_config:
for action in staging_config['setup']: for action in staging_config['setup']:
action['action'].update(ssh_key=staging_config['action_mapper']['ssh_key']) action['action'].update(ssh_key=staging_config['action_mapper']['ssh_key'])
preprocess(job_directory, staging_config.get("setup", []), self.__preprocess_action_executor) setup_config = staging_config.get("setup", [])
preprocess(job_directory, setup_config, self.__preprocess_action_executor, object_store=self.object_store)
self.active_jobs.deactivate_job(job_id, active_status=ACTIVE_STATUS_PREPROCESSING) self.active_jobs.deactivate_job(job_id, active_status=ACTIVE_STATUS_PREPROCESSING)
new_thread_for_job(self, "preprocess", job_id, do_preprocess, daemon=False) new_thread_for_job(self, "preprocess", job_id, do_preprocess, daemon=False)
......
...@@ -127,6 +127,7 @@ def _app(): ...@@ -127,6 +127,7 @@ def _app():
authorizer=get_authorizer(None), authorizer=get_authorizer(None),
dependency_manager=TestDependencyManager(), dependency_manager=TestDependencyManager(),
job_metrics=Bunch(default_job_instrumenter=NULL_JOB_INSTRUMENTER), job_metrics=Bunch(default_job_instrumenter=NULL_JOB_INSTRUMENTER),
object_store=None,
) )
yield app yield app
......
...@@ -217,7 +217,8 @@ def minimal_app_for_managers(): ...@@ -217,7 +217,8 @@ def minimal_app_for_managers():
return Bunch(staging_directory=staging_directory, return Bunch(staging_directory=staging_directory,
authorizer=authorizer, authorizer=authorizer,
job_metrics=NullJobMetrics(), job_metrics=NullJobMetrics(),
dependency_manager=TestDependencyManager()) dependency_manager=TestDependencyManager(),
object_store=object())
class NullJobMetrics(object): class NullJobMetrics(object):
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment