diff --git a/pulsar/client/__init__.py b/pulsar/client/__init__.py index 5ada39334bc8f35a051a9ad624dd702f9bf7683d..b28ab87cd93d2b8c915c847127c9e8583741f4cb 100644 --- a/pulsar/client/__init__.py +++ b/pulsar/client/__init__.py @@ -48,6 +48,7 @@ from .client import OutputNotFoundException from .manager import build_client_manager from .destination import url_to_destination_params from .path_mapper import PathMapper +from .exceptions import PulsarClientTransportError __all__ = [ 'build_client_manager', @@ -59,4 +60,5 @@ __all__ = [ 'PulsarOutputs', 'ClientOutputs', 'PathMapper', + 'PulsarClientTransportError', ] diff --git a/pulsar/client/exceptions.py b/pulsar/client/exceptions.py new file mode 100644 index 0000000000000000000000000000000000000000..095a67a730d8cbdbdd6ff471ee454438ee919554 --- /dev/null +++ b/pulsar/client/exceptions.py @@ -0,0 +1,39 @@ +""" +Pulsar client exceptions +""" + + +class PulsarClientTransportError(Exception): + TIMEOUT = 'timeout' + CONNECTION_REFUSED = 'connection_refused' + UNKNOWN = 'unknown' + + messages = { + TIMEOUT: 'Connection timed out', + CONNECTION_REFUSED: 'Connection refused', + UNKNOWN: 'Unknown transport error' + } + + INVALID_CODE_MESSAGE = 'Unknown transport error code: %s' + + def __init__(self, code=None, message=None, + transport_code=None, transport_message=None): + self.code = code or PulsarClientTransportError.UNKNOWN + self.message = message or PulsarClientTransportError.messages.get( + self.code, + PulsarClientTransportError.INVALID_CODE_MESSAGE % code + ) + self.transport_code = transport_code + self.transport_message = transport_message + if transport_code or transport_message: + self.message += " (" + if transport_code: + self.message += "transport code: %s" % transport_code + if transport_message: + self.message += ", " + if transport_message: + self.message += "transport message: %s" % transport_message + self.message += ")" + + def __str__(self): + return self.message diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py index ab0d367ff977f547ef80394f74633ab349845798..c43db0896c2db45738a00ff6b7d47ce458056f33 100644 --- a/pulsar/client/manager.py +++ b/pulsar/client/manager.py @@ -63,7 +63,8 @@ class ClientManager(object): else: self.job_manager_interface_class = HttpPulsarInterface transport_type = kwds.get('transport', None) - transport = get_transport(transport_type) + transport_params = dict([(p.replace('transport_', '', 1), v) for p, v in kwds.items() if p.startswith('transport_')]) + transport = get_transport(transport_type, transport_params=transport_params) self.job_manager_interface_args = dict(transport=transport) cache = kwds.get('cache', None) if cache is None: diff --git a/pulsar/client/transport/__init__.py b/pulsar/client/transport/__init__.py index 4ff9c7df25f95b48374f2242f0e1b81759a3d563..db2541baabb55c83c1a861ed612873b7e29972c0 100644 --- a/pulsar/client/transport/__init__.py +++ b/pulsar/client/transport/__init__.py @@ -18,12 +18,14 @@ else: from .poster import post_file -def get_transport(transport_type=None, os_module=os): +def get_transport(transport_type=None, os_module=os, transport_params=None): transport_type = _get_transport_type(transport_type, os_module) + if not transport_params: + transport_params = {} if transport_type == 'urllib': - transport = Urllib2Transport() + transport = Urllib2Transport(**transport_params) else: - transport = PycurlTransport() + transport = PycurlTransport(**transport_params) return transport diff --git a/pulsar/client/transport/curl.py b/pulsar/client/transport/curl.py index 633a57dbac6c3f0289abc9a43da3fa59f443a02a..6056e0f048d78c513102f4ba2f4767290cd49b52 100644 --- a/pulsar/client/transport/curl.py +++ b/pulsar/client/transport/curl.py @@ -1,15 +1,17 @@ +import os.path import logging from six import string_types from six import BytesIO try: - from pycurl import Curl, HTTP_CODE + import pycurl + from pycurl import Curl, HTTP_CODE, error curl_available = True except ImportError: curl_available = False -import os.path +from ..exceptions import PulsarClientTransportError PYCURL_UNAVAILABLE_MESSAGE = \ @@ -24,6 +26,9 @@ log = logging.getLogger(__name__) class PycurlTransport(object): + def __init__(self, timeout=None, **kwrgs): + self.timeout = timeout + def execute(self, url, method=None, data=None, input_path=None, output_path=None): buf = _open_output(output_path) try: @@ -41,7 +46,15 @@ class PycurlTransport(object): if isinstance(data, string_types): data = data.encode('UTF-8') c.setopt(c.POSTFIELDS, data) - c.perform() + if self.timeout: + c.setopt(c.TIMEOUT, self.timeout) + try: + c.perform() + except error as exc: + raise PulsarClientTransportError( + _error_curl_to_pulsar(exc.args[0]), + transport_code=exc.args[0], + transport_message=exc.args[1]) if not output_path: return buf.getvalue() finally: @@ -103,6 +116,14 @@ def _new_curl_object(): except NameError: raise ImportError(PYCURL_UNAVAILABLE_MESSAGE) +def _error_curl_to_pulsar(code): + if code == pycurl.E_OPERATION_TIMEDOUT: + return PulsarClientTransportError.TIMEOUT + elif code == pycurl.E_COULDNT_CONNECT: + return PulsarClientTransportError.CONNECTION_REFUSED + return None + + __all__ = [ 'PycurlTransport', 'post_file', diff --git a/pulsar/client/transport/standard.py b/pulsar/client/transport/standard.py index 9a5c46372e4fc2daa42ecd246bd61cc6f15739ea..bde2602e14649a08b868ff62bad5e218ebb931c7 100644 --- a/pulsar/client/transport/standard.py +++ b/pulsar/client/transport/standard.py @@ -16,6 +16,9 @@ except ImportError: class Urllib2Transport(object): + def __init__(self, **kwrgs): + pass + def _url_open(self, request, data): return urlopen(request, data)