diff --git a/pulsar/client/transport/__init__.py b/pulsar/client/transport/__init__.py index d6ff6469154dca2582a00755f06479a064043443..e9d10d3f2c5d52e2df8abda74fef7ea35cfa457c 100644 --- a/pulsar/client/transport/__init__.py +++ b/pulsar/client/transport/__init__.py @@ -23,12 +23,14 @@ def __get_transport_type(transport_type, os_module): transport_type = 'curl' return transport_type -# TODO: Provide urllib implementation if these unavailable, -# also explore a requests+poster option. from .curl import curl_available +from .requests import requests_multipart_post_available if curl_available: from .curl import get_file from .curl import post_file +elif requests_multipart_post_available: + from .requests import get_file + from .requests import post_file else: from .poster import get_file from .poster import post_file diff --git a/pulsar/client/transport/poster.py b/pulsar/client/transport/poster.py index f1a8b9b7617b5e05bc381febec3cb77b5a11c8a4..a79cd8dc57e6fc15dff0dba667d93ebbb1f87026 100644 --- a/pulsar/client/transport/poster.py +++ b/pulsar/client/transport/poster.py @@ -18,7 +18,7 @@ try: except ImportError: poster = None -POSTER_UNAVAILABLE_MESSAGE = "" +POSTER_UNAVAILABLE_MESSAGE = "Pulsar configured to use poster module - but it is unavailable. Please install poster." import logging log = logging.getLogger(__name__) diff --git a/pulsar/client/transport/requests.py b/pulsar/client/transport/requests.py new file mode 100644 index 0000000000000000000000000000000000000000..5c7715192c62780679a739e08ebb717822113ec6 --- /dev/null +++ b/pulsar/client/transport/requests.py @@ -0,0 +1,50 @@ +from __future__ import absolute_import +try: + from galaxy import eggs + eggs.require("requets") +except ImportError: + pass + +try: + import requests +except ImportError: + requests = None +requests_multipart_post_available = False +try: + import requests_toolbelt + requests_multipart_post_available = True +except ImportError: + requests_toolbelt = None + + +REQUESTS_UNAVAILABLE_MESSAGE = "Pulsar configured to use requests module - but it is unavailable. Please install requests." +REQUESTS_TOOLBELT_UNAVAILABLE_MESSAGE = "Pulsar configured to use requests_toolbelt module - but it is unavailable. Please install requests_toolbelt." + +import logging +log = logging.getLogger(__name__) + + +def post_file(url, path): + if requests_toolbelt is None: + raise ImportError(REQUESTS_TOOLBELT_UNAVAILABLE_MESSAGE) + + __ensure_requests() + m = requests_toolbelt.MultipartEncoder( + fields={'file': ('filename', open(path, 'rb'))} + ) + requests.post(url, data=m, headers={'Content-Type': m.content_type}) + + +def get_file(url, path): + __ensure_requests() + r = requests.get(url, stream=True) + with open(path, 'wb') as f: + for chunk in r.iter_content(chunk_size=1024): + if chunk: # filter out keep-alive new chunks + f.write(chunk) + f.flush() + + +def __ensure_requests(): + if requests is None: + raise ImportError(REQUESTS_UNAVAILABLE_MESSAGE) diff --git a/test/integration_test.py b/test/integration_test.py index 2ff70278d1d4e148a5e47ab2e4aae5940a5fa71d..a795a775081c7f8d0b35deb6dc9e80099fe62dba 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -158,7 +158,7 @@ class IntegrationTests(BaseIntegrationTest): class DirectIntegrationTests(IntegrationTests): default_kwargs = dict(direct_interface=True, test_requirement=False) - @skipUnlessAnyModule(["pycurl", "poster"]) + @skipUnlessAnyModule(["pycurl", "poster", "requests_toolbelt"]) def test_integration_remote_transfer(self): self._run( private_token=None,