diff --git a/pulsar/client/transport/__init__.py b/pulsar/client/transport/__init__.py index 1f1925e3c1c9d3a411d34973ce706f1e39f6be19..d6ff6469154dca2582a00755f06479a064043443 100644 --- a/pulsar/client/transport/__init__.py +++ b/pulsar/client/transport/__init__.py @@ -25,7 +25,12 @@ def __get_transport_type(transport_type, os_module): # TODO: Provide urllib implementation if these unavailable, # also explore a requests+poster option. -from .curl import get_file -from .curl import post_file +from .curl import curl_available +if curl_available: + from .curl import get_file + from .curl import post_file +else: + from .poster import get_file + from .poster import post_file __all__ = [get_transport, get_file, post_file] diff --git a/pulsar/client/transport/curl.py b/pulsar/client/transport/curl.py index 9594c7195419239e66c84934a1f98e4e4d36ce4f..e4d3d36b624d6e58537fa8c27a0283a2a23a9e87 100644 --- a/pulsar/client/transport/curl.py +++ b/pulsar/client/transport/curl.py @@ -2,10 +2,11 @@ try: from cStringIO import StringIO except ImportError: from io import StringIO +curl_available = True try: from pycurl import Curl except ImportError: - pass + curl_available = False from os.path import getsize diff --git a/pulsar/client/transport/poster.py b/pulsar/client/transport/poster.py new file mode 100644 index 0000000000000000000000000000000000000000..f1a8b9b7617b5e05bc381febec3cb77b5a11c8a4 --- /dev/null +++ b/pulsar/client/transport/poster.py @@ -0,0 +1,56 @@ +from __future__ import absolute_import +try: + from urllib2 import urlopen +except ImportError: + from urllib.request import urlopen +try: + from urllib2 import Request +except ImportError: + from urllib.request import Request +try: + from galaxy import eggs + eggs.require("poster") +except ImportError: + pass + +try: + import poster +except ImportError: + poster = None + +POSTER_UNAVAILABLE_MESSAGE = "" + +import logging +log = logging.getLogger(__name__) + + +if poster is not None: + poster.streaminghttp.register_openers() + + +def post_file(url, path): + __ensure_poster() + try: + datagen, headers = poster.encode.multipart_encode({"file": open(path, "rb")}) + request = Request(url, datagen, headers) + return urlopen(request).read() + except: + log.exception("problem") + raise + + +def get_file(url, path): + __ensure_poster() + request = Request(url=url) + response = urlopen(request) + with open(path, 'wb') as output: + while True: + buffer = response.read(1024) + if not buffer: + break + output.write(buffer) + + +def __ensure_poster(): + if poster is None: + raise ImportError(POSTER_UNAVAILABLE_MESSAGE) diff --git a/test/integration_test.py b/test/integration_test.py index b0f53c8b296b15fce27688785c8207f9a30f4792..2ff70278d1d4e148a5e47ab2e4aae5940a5fa71d 100644 --- a/test/integration_test.py +++ b/test/integration_test.py @@ -2,7 +2,12 @@ from os.path import join from os import makedirs, system from six import next, itervalues from six.moves import configparser -from .test_utils import TempDirectoryTestCase, skipUnlessExecutable, skipUnlessModule +from .test_utils import ( + TempDirectoryTestCase, + skipUnlessExecutable, + skipUnlessModule, + skipUnlessAnyModule +) from .test_utils import test_pulsar_app from .test_utils import test_pulsar_server @@ -153,7 +158,7 @@ class IntegrationTests(BaseIntegrationTest): class DirectIntegrationTests(IntegrationTests): default_kwargs = dict(direct_interface=True, test_requirement=False) - @skipUnlessModule("pycurl") + @skipUnlessAnyModule(["pycurl", "poster"]) def test_integration_remote_transfer(self): self._run( private_token=None, diff --git a/test/test_utils.py b/test/test_utils.py index 21ba03a71ede25b097917c23707b6fd8fbf3f4d4..6fe440ab3e1c6d2ce6425d68c36cdc4ed0c09bd6 100644 --- a/test/test_utils.py +++ b/test/test_utils.py @@ -187,6 +187,19 @@ def skipUnlessModule(module): return skip("Module %s could not be loaded, dependent test skipped." % module) +def skipUnlessAnyModule(modules): + available = False + for module in modules: + try: + __import__(module) + except ImportError: + continue + available = True + if available: + return lambda func: func + return skip("None of the modules %s could be loaded, dependent test skipped." % modules) + + def __which(program): def is_exe(fpath):