diff --git a/pulsar/client/interface.py b/pulsar/client/interface.py index a83a20b3e45d433815832f292ffc20c261e3856b..0882c91b595f06059f9ccb466e494e4d9b93b0f0 100644 --- a/pulsar/client/interface.py +++ b/pulsar/client/interface.py @@ -110,7 +110,13 @@ class HttpPulsarInterface(PulsarInterface): class LocalPulsarInterface(PulsarInterface): - def __init__(self, destination_params, job_manager=None, file_cache=None, object_store=None): + def __init__(self, destination_params, job_manager=None, pulsar_app=None, file_cache=None, object_store=None): + if job_manager is None: + job_manager_name = destination_params.get("manager", None) + if job_manager_name is None: + job_manager = pulsar_app.only_manager + else: + job_manager = pulsar_app.managers[job_manager_name] self.job_manager = job_manager self.file_cache = file_cache self.object_store = object_store diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py index c5058103290b2bd8d9f0806ca3ef072d89353cb0..ab0d367ff977f547ef80394f74633ab349845798 100644 --- a/pulsar/client/manager.py +++ b/pulsar/client/manager.py @@ -50,13 +50,15 @@ class ClientManager(object): def __init__(self, **kwds): """Build a HTTP client or a local client that talks directly to a job manger.""" - if 'job_manager' in kwds: + if 'pulsar_app' in kwds or 'job_manager' in kwds: self.job_manager_interface_class = LocalPulsarInterface - job_manager = kwds['job_manager'] + pulsar_app = kwds.get('pulsar_app', None) + job_manager = kwds.get('job_manager', None) file_cache = kwds.get('file_cache', None) self.job_manager_interface_args = dict( job_manager=job_manager, - file_cache=file_cache + pulsar_app=pulsar_app, + file_cache=file_cache, ) else: self.job_manager_interface_class = HttpPulsarInterface