diff --git a/pulsar/client/action_mapper.py b/pulsar/client/action_mapper.py index dcb5941a75816ec36de706f13e9e5b5226cd1181..469581adcbfd63613b6b5aa1b8eea8018573b71d 100644 --- a/pulsar/client/action_mapper.py +++ b/pulsar/client/action_mapper.py @@ -58,7 +58,7 @@ path_type = Bunch( # metric instrumentation files) OUTPUT_METADATA="output_metadata", # Other fixed tool parameter paths (likely coming from tool data, but not - # nessecarily). + # necessarily). UNSTRUCTURED="unstructured", ) @@ -179,7 +179,7 @@ class FileActionMapper(object): if mapper: file_lister = mapper.file_lister action_kwds = mapper.action_kwds - action = action_class(path, file_lister=file_lister, **action_kwds) + action = action_class(source, file_lister=file_lister, **action_kwds) self.__process_action(action, type) return action @@ -278,10 +278,14 @@ class BaseAction(object): whole_directory_transfer_supported = False action_spec = {} - def __init__(self, path, file_lister=None): - self.path = path + def __init__(self, source, file_lister=None): + self.source = source self.file_lister = file_lister or DEFAULT_FILE_LISTER + @property + def path(self): + return self.source.get("path") + def unstructured_map(self, path_helper): unstructured_map = self.file_lister.unstructured_map(self.path) if self.staging_needed: @@ -306,13 +310,29 @@ class BaseAction(object): def staging_action_local(self): return self.staging == STAGING_ACTION_LOCAL + def _extend_base_dict(self, **kwds): + base_dict = dict( + path=self.path, # For older Pulsar servers (pre-0.13.0?) + source=self.source, + action_type=self.action_type, + ) + base_dict.update(**kwds) + return base_dict + def to_dict(self): - return dict(action_type=self.action_type) + return self._extend_base_dict() def __str__(self): as_dict = self.to_dict() attribute_str = "" + first = True for key, value in as_dict.items(): + if key == "source": + continue + if first: + first = False + else: + attribute_str += "," attribute_str += "%s=%s" % (key, value) return "FileAction[%s]" % attribute_str @@ -326,11 +346,11 @@ class NoneAction(BaseAction): staging = STAGING_ACTION_NONE def to_dict(self): - return dict(path=self.path, action_type=self.action_type) + return self._extend_base_dict() @classmethod def from_dict(cls, action_dict): - return NoneAction(path=action_dict["path"]) + return NoneAction(source=action_dict["source"]) def path_rewrite(self, path_helper, path=None): return None @@ -347,16 +367,13 @@ class RewriteAction(BaseAction): action_type = "rewrite" staging = STAGING_ACTION_NONE - def __init__(self, path, file_lister=None, source_directory=None, destination_directory=None): - self.path = path - self.file_lister = file_lister or DEFAULT_FILE_LISTER + def __init__(self, source, file_lister=None, source_directory=None, destination_directory=None): + super(RewriteAction, self).__init__(source, file_lister=file_lister) self.source_directory = source_directory self.destination_directory = destination_directory def to_dict(self): - return dict( - path=self.path, - action_type=self.action_type, + return self._extend_base_dict( source_directory=self.source_directory, destination_directory=self.destination_directory, ) @@ -364,7 +381,7 @@ class RewriteAction(BaseAction): @classmethod def from_dict(cls, action_dict): return RewriteAction( - path=action_dict["path"], + source=action_dict["source"], source_directory=action_dict["source_directory"], destination_directory=action_dict["destination_directory"], ) @@ -401,12 +418,9 @@ class RemoteCopyAction(BaseAction): action_type = "remote_copy" staging = STAGING_ACTION_REMOTE - def to_dict(self): - return dict(path=self.path, action_type=self.action_type) - @classmethod def from_dict(cls, action_dict): - return RemoteCopyAction(path=action_dict["path"]) + return RemoteCopyAction(source=action_dict["source"]) def write_to_path(self, path): copy_to_path(open(self.path, "rb"), path) @@ -430,16 +444,16 @@ class RemoteTransferAction(BaseAction): action_type = "remote_transfer" staging = STAGING_ACTION_REMOTE - def __init__(self, path, file_lister=None, url=None): - super(RemoteTransferAction, self).__init__(path, file_lister=file_lister) + def __init__(self, source, file_lister=None, url=None): + super(RemoteTransferAction, self).__init__(source, file_lister=file_lister) self.url = url def to_dict(self): - return dict(path=self.path, action_type=self.action_type, url=self.url) + return self._extend_base_dict(url=self.url) @classmethod def from_dict(cls, action_dict): - return RemoteTransferAction(path=action_dict["path"], url=action_dict["url"]) + return RemoteTransferAction(source=action_dict["source"], url=action_dict["url"]) def write_to_path(self, path): get_file(self.url, path) @@ -448,6 +462,33 @@ class RemoteTransferAction(BaseAction): post_file(self.url, pulsar_path) +class RemoteObjectStoreCopyAction(BaseAction): + """ + """ + action_type = "remote_object_store_copy" + staging = STAGING_ACTION_REMOTE + inject_object_store = True + + @classmethod + def from_dict(cls, action_dict): + return RemoteObjectStoreCopyAction(source=action_dict["source"]) + + def write_to_path(self, path): + assert self.object_store # Make sure object_store attribute injected + assert "object_store_ref" in self.source + object_store_ref = self.source["object_store_ref"] + dataset_object = Bunch( + id=object_store_ref["dataset_id"], + uuid=object_store_ref["dataset_uuid"], + object_store_id=object_store_ref["object_store_id"], + ) + filename = self.object_store.get_filename(dataset_object) + copy_to_path(open(filename, 'rb'), path) + + def write_from_path(self, pulsar_path): + raise NotImplementedError("Writing raw files to object store not supported at this time.") + + class PubkeyAuthenticatedTransferAction(BaseAction): """Base class for file transfers requiring an SSH public/private key """ @@ -460,18 +501,20 @@ class PubkeyAuthenticatedTransferAction(BaseAction): ) staging = STAGING_ACTION_REMOTE - def __init__(self, path, file_lister=None, ssh_user=UNSET_ACTION_KWD, + def __init__(self, source, file_lister=None, ssh_user=UNSET_ACTION_KWD, ssh_host=UNSET_ACTION_KWD, ssh_port=UNSET_ACTION_KWD, ssh_key=UNSET_ACTION_KWD): - super(PubkeyAuthenticatedTransferAction, self).__init__(path, file_lister=file_lister) + super(PubkeyAuthenticatedTransferAction, self).__init__(source, file_lister=file_lister) self.ssh_user = ssh_user self.ssh_host = ssh_host self.ssh_port = ssh_port self.ssh_key = ssh_key def to_dict(self): - return dict(path=self.path, action_type=self.action_type, - ssh_user=self.ssh_user, ssh_host=self.ssh_host, - ssh_port=self.ssh_port) + return self._extend_base_dict( + ssh_user=self.ssh_user, + ssh_host=self.ssh_host, + ssh_port=self.ssh_port + ) @contextmanager def _serialized_key(self): @@ -497,7 +540,7 @@ class RsyncTransferAction(PubkeyAuthenticatedTransferAction): @classmethod def from_dict(cls, action_dict): - return RsyncTransferAction(path=action_dict["path"], + return RsyncTransferAction(source=action_dict["source"], ssh_user=action_dict["ssh_user"], ssh_host=action_dict["ssh_host"], ssh_port=action_dict["ssh_port"], @@ -519,7 +562,7 @@ class ScpTransferAction(PubkeyAuthenticatedTransferAction): @classmethod def from_dict(cls, action_dict): - return ScpTransferAction(path=action_dict["path"], + return ScpTransferAction(source=action_dict["source"], ssh_user=action_dict["ssh_user"], ssh_host=action_dict["ssh_host"], ssh_port=action_dict["ssh_port"], @@ -569,7 +612,14 @@ class MessageAction(object): open(path, "w").write(self.contents) -DICTIFIABLE_ACTION_CLASSES = [RemoteCopyAction, RemoteTransferAction, MessageAction, RsyncTransferAction, ScpTransferAction] +DICTIFIABLE_ACTION_CLASSES = [ + RemoteCopyAction, + RemoteTransferAction, + MessageAction, + RsyncTransferAction, + ScpTransferAction, + RemoteObjectStoreCopyAction +] def from_dict(action_dict): @@ -581,6 +631,13 @@ def from_dict(action_dict): if not target_class: message = "Failed to recover action from dictionary - invalid action type specified %s." % action_type raise Exception(message) + if "source" in action_dict: + action_dict.pop("path") # remove redundant information stored for backward compatibility. + elif "path" in action_dict: + # legacy message received from older Pulsar client, pop the path from the dict + # and convert it to a source. + source = {"path": action_dict.pop("path")} + action_dict["source"] = source return target_class.from_dict(action_dict) @@ -609,7 +666,8 @@ class BasePathMapper(object): def matches(self, path, path_type): path_type_matches = path_type in self.path_types - return path_type_matches and self._path_matches(path) + rval = path_type_matches and self._path_matches(path) + return rval def _extend_base_dict(self, **kwds): base_dict = dict( @@ -738,6 +796,7 @@ ACTION_CLASSES = [ CopyAction, RemoteCopyAction, RemoteTransferAction, + RemoteObjectStoreCopyAction, RsyncTransferAction, ScpTransferAction, ] diff --git a/pulsar/client/staging/__init__.py b/pulsar/client/staging/__init__.py index a141968f764fc0264fe1f41d5895a7d9951e6e1b..86cb2158ae011d1bb3d611838baf197ac93b694c 100644 --- a/pulsar/client/staging/__init__.py +++ b/pulsar/client/staging/__init__.py @@ -155,13 +155,14 @@ CLIENT_INPUT_PATH_TYPES = Bunch( class ClientInput(object): - def __init__(self, path, input_type): + def __init__(self, path, input_type, object_store_ref=None): self.path = path self.input_type = input_type + self.object_store_ref = object_store_ref @property def action_source(self): - return {"path": self.path} + return {"path": self.path, "object_store_ref": self.object_store_ref} class ClientOutputs(object): diff --git a/pulsar/core.py b/pulsar/core.py index ee8f4f7d654572ee63170d11b276b09ba34bc149..2f392d492560ac4f73e88df7e30dd5684b1b2040 100644 --- a/pulsar/core.py +++ b/pulsar/core.py @@ -118,18 +118,26 @@ class PulsarApp(object): self.file_cache = Cache(file_cache_dir) if file_cache_dir else None def __setup_object_store(self, conf): - if "object_store_config_file" not in conf: + if "object_store_config_file" not in conf and "object_store_config" not in conf: self.object_store = None return - object_store_config = Bunch( - object_store_config_file=conf['object_store_config_file'], + + config_obj_kwds = dict( file_path=conf.get("object_store_file_path", None), object_store_check_old_style=False, job_working_directory=conf.get("object_store_job_working_directory", None), new_file_path=conf.get("object_store_new_file_path", tempdir), umask=int(conf.get("object_store_umask", "0000")), + jobs_directory=None, ) - self.object_store = build_object_store_from_config(object_store_config) + config_dict = None + if conf.get("object_store_config_file"): + config_obj_kwds["object_store_config_file"] = conf['object_store_config_file'] + else: + config_dict = conf["object_store_config"] + + object_store_config = Bunch(**config_obj_kwds) + self.object_store = build_object_store_from_config(object_store_config, config_dict=config_dict) def __setup_dependency_manager(self, conf): dependencies_dir = conf.get("tool_dependency_dir", "dependencies") diff --git a/pulsar/managers/__init__.py b/pulsar/managers/__init__.py index 68bdc02b1644cc6b4183d811262f218d2e4cdaa0..a9aa741bec9ce5932764818836c75a69df4d2395 100644 --- a/pulsar/managers/__init__.py +++ b/pulsar/managers/__init__.py @@ -124,5 +124,9 @@ class ManagerProxy(object): def system_properties(self): return self._proxied_manager.system_properties() + @property + def object_store(self): + return self._proxied_manager.object_store + def __str__(self): return "ManagerProxy[manager=%s]" % str(self._proxied_manager) diff --git a/pulsar/managers/base/__init__.py b/pulsar/managers/base/__init__.py index 30a6485a1f1dcda42f5a526bd61c790b2321fb3e..c924fb9c67298f3231e715e4a371ef2894de12c5 100644 --- a/pulsar/managers/base/__init__.py +++ b/pulsar/managers/base/__init__.py @@ -70,6 +70,7 @@ class BaseManager(ManagerInterface): self.__init_env_vars(**kwds) self.dependency_manager = app.dependency_manager self.job_metrics = app.job_metrics + self.object_store = app.object_store def clean(self, job_id): if self.debug: diff --git a/pulsar/managers/staging/pre.py b/pulsar/managers/staging/pre.py index df3175ae89d376c19cb7fafd9c1785c3a6ccba60..45c2c0ef49486646a46391de8ed025f1f5a2d6ec 100644 --- a/pulsar/managers/staging/pre.py +++ b/pulsar/managers/staging/pre.py @@ -6,11 +6,13 @@ import logging log = logging.getLogger(__name__) -def preprocess(job_directory, setup_actions, action_executor): +def preprocess(job_directory, setup_actions, action_executor, object_store=None): for setup_action in setup_actions: name = setup_action["name"] input_type = setup_action["type"] action = from_dict(setup_action["action"]) + if getattr(action, "inject_object_store", False): + action.object_store = object_store path = job_directory.calculate_path(name, input_type) description = "Staging %s '%s' via %s to %s" % (input_type, name, action, path) log.debug(description) diff --git a/pulsar/managers/stateful.py b/pulsar/managers/stateful.py index bff96998c704cb1708c9e014527d11fa9b745ed8..beaf781e782b5a0f78248f6d04873bbb0b5119b6 100644 --- a/pulsar/managers/stateful.py +++ b/pulsar/managers/stateful.py @@ -102,7 +102,8 @@ class StatefulManagerProxy(ManagerProxy): 'setup' in staging_config: for action in staging_config['setup']: action['action'].update(ssh_key=staging_config['action_mapper']['ssh_key']) - preprocess(job_directory, staging_config.get("setup", []), self.__preprocess_action_executor) + setup_config = staging_config.get("setup", []) + preprocess(job_directory, setup_config, self.__preprocess_action_executor, object_store=self.object_store) self.active_jobs.deactivate_job(job_id, active_status=ACTIVE_STATUS_PREPROCESSING) new_thread_for_job(self, "preprocess", job_id, do_preprocess, daemon=False) diff --git a/test/persistence_test.py b/test/persistence_test.py index d7221e26a89f72c3f9693f2d1ff4a48cbe0596ac..99367a4fee0f57b180441a977047e5510e619af9 100644 --- a/test/persistence_test.py +++ b/test/persistence_test.py @@ -127,6 +127,7 @@ def _app(): authorizer=get_authorizer(None), dependency_manager=TestDependencyManager(), job_metrics=Bunch(default_job_instrumenter=NULL_JOB_INSTRUMENTER), + object_store=None, ) yield app diff --git a/test/test_utils.py b/test/test_utils.py index 232255d550b9c46331426a537e3e449c8ca6b786..994e8a149ba905334d127f2c0f0da07bc9652952 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -217,7 +217,8 @@ def minimal_app_for_managers(): return Bunch(staging_directory=staging_directory, authorizer=authorizer, job_metrics=NullJobMetrics(), - dependency_manager=TestDependencyManager()) + dependency_manager=TestDependencyManager(), + object_store=object()) class NullJobMetrics(object): diff --git a/test/transfer_action_test.py b/test/transfer_action_test.py index 4155167e48b90597e7530b454f1dbf6cf9eedd27..30b927f9e2e42014d9c4a43da57031b61f206020 100644 --- a/test/transfer_action_test.py +++ b/test/transfer_action_test.py @@ -11,7 +11,7 @@ def test_write_to_file(): to_path = os.path.join(directory, "local_get") url = server.application_url + "?path=%s" % from_path - RemoteTransferAction(to_path, url=url).write_to_path(to_path) + RemoteTransferAction({"path": to_path}, url=url).write_to_path(to_path) assert open(to_path, "rb").read() == b"123456" @@ -23,7 +23,7 @@ def test_write_from_file(): to_path = os.path.join(directory, "remote_post") url = server.application_url + "?path=%s" % to_path - RemoteTransferAction(to_path, url=url).write_from_path(from_path) + RemoteTransferAction({"path": to_path}, url=url).write_from_path(from_path) posted_contents = open(to_path, "rb").read() assert posted_contents == b"123456", posted_contents