Skip to content
Snippets Groups Projects
Commit d3b42bda authored by John Chilton's avatar John Chilton
Browse files

Merge pull request #115 from natefoo/transport-timeout

curl transport improvements (timeout handling)
parents 07522636 679137dc
No related branches found
No related tags found
No related merge requests found
...@@ -48,6 +48,7 @@ from .client import OutputNotFoundException ...@@ -48,6 +48,7 @@ from .client import OutputNotFoundException
from .manager import build_client_manager from .manager import build_client_manager
from .destination import url_to_destination_params from .destination import url_to_destination_params
from .path_mapper import PathMapper from .path_mapper import PathMapper
from .exceptions import PulsarClientTransportError
__all__ = [ __all__ = [
'build_client_manager', 'build_client_manager',
...@@ -59,4 +60,5 @@ __all__ = [ ...@@ -59,4 +60,5 @@ __all__ = [
'PulsarOutputs', 'PulsarOutputs',
'ClientOutputs', 'ClientOutputs',
'PathMapper', 'PathMapper',
'PulsarClientTransportError',
] ]
"""
Pulsar client exceptions
"""
class PulsarClientTransportError(Exception):
TIMEOUT = 'timeout'
CONNECTION_REFUSED = 'connection_refused'
UNKNOWN = 'unknown'
messages = {
TIMEOUT: 'Connection timed out',
CONNECTION_REFUSED: 'Connection refused',
UNKNOWN: 'Unknown transport error'
}
INVALID_CODE_MESSAGE = 'Unknown transport error code: %s'
def __init__(self, code=None, message=None,
transport_code=None, transport_message=None):
self.code = code or PulsarClientTransportError.UNKNOWN
self.message = message or PulsarClientTransportError.messages.get(
self.code,
PulsarClientTransportError.INVALID_CODE_MESSAGE % code
)
self.transport_code = transport_code
self.transport_message = transport_message
if transport_code or transport_message:
self.message += " ("
if transport_code:
self.message += "transport code: %s" % transport_code
if transport_message:
self.message += ", "
if transport_message:
self.message += "transport message: %s" % transport_message
self.message += ")"
def __str__(self):
return self.message
...@@ -63,7 +63,8 @@ class ClientManager(object): ...@@ -63,7 +63,8 @@ class ClientManager(object):
else: else:
self.job_manager_interface_class = HttpPulsarInterface self.job_manager_interface_class = HttpPulsarInterface
transport_type = kwds.get('transport', None) transport_type = kwds.get('transport', None)
transport = get_transport(transport_type) transport_params = dict([(p.replace('transport_', '', 1), v) for p, v in kwds.items() if p.startswith('transport_')])
transport = get_transport(transport_type, transport_params=transport_params)
self.job_manager_interface_args = dict(transport=transport) self.job_manager_interface_args = dict(transport=transport)
cache = kwds.get('cache', None) cache = kwds.get('cache', None)
if cache is None: if cache is None:
......
...@@ -18,12 +18,14 @@ else: ...@@ -18,12 +18,14 @@ else:
from .poster import post_file from .poster import post_file
def get_transport(transport_type=None, os_module=os): def get_transport(transport_type=None, os_module=os, transport_params=None):
transport_type = _get_transport_type(transport_type, os_module) transport_type = _get_transport_type(transport_type, os_module)
if not transport_params:
transport_params = {}
if transport_type == 'urllib': if transport_type == 'urllib':
transport = Urllib2Transport() transport = Urllib2Transport(**transport_params)
else: else:
transport = PycurlTransport() transport = PycurlTransport(**transport_params)
return transport return transport
......
import os.path
import logging import logging
from six import string_types from six import string_types
from six import BytesIO from six import BytesIO
try: try:
from pycurl import Curl, HTTP_CODE import pycurl
from pycurl import Curl, HTTP_CODE, error
curl_available = True curl_available = True
except ImportError: except ImportError:
curl_available = False curl_available = False
import os.path from ..exceptions import PulsarClientTransportError
PYCURL_UNAVAILABLE_MESSAGE = \ PYCURL_UNAVAILABLE_MESSAGE = \
...@@ -24,6 +26,9 @@ log = logging.getLogger(__name__) ...@@ -24,6 +26,9 @@ log = logging.getLogger(__name__)
class PycurlTransport(object): class PycurlTransport(object):
def __init__(self, timeout=None, **kwrgs):
self.timeout = timeout
def execute(self, url, method=None, data=None, input_path=None, output_path=None): def execute(self, url, method=None, data=None, input_path=None, output_path=None):
buf = _open_output(output_path) buf = _open_output(output_path)
try: try:
...@@ -41,7 +46,15 @@ class PycurlTransport(object): ...@@ -41,7 +46,15 @@ class PycurlTransport(object):
if isinstance(data, string_types): if isinstance(data, string_types):
data = data.encode('UTF-8') data = data.encode('UTF-8')
c.setopt(c.POSTFIELDS, data) c.setopt(c.POSTFIELDS, data)
c.perform() if self.timeout:
c.setopt(c.TIMEOUT, self.timeout)
try:
c.perform()
except error as exc:
raise PulsarClientTransportError(
_error_curl_to_pulsar(exc.args[0]),
transport_code=exc.args[0],
transport_message=exc.args[1])
if not output_path: if not output_path:
return buf.getvalue() return buf.getvalue()
finally: finally:
...@@ -103,6 +116,14 @@ def _new_curl_object(): ...@@ -103,6 +116,14 @@ def _new_curl_object():
except NameError: except NameError:
raise ImportError(PYCURL_UNAVAILABLE_MESSAGE) raise ImportError(PYCURL_UNAVAILABLE_MESSAGE)
def _error_curl_to_pulsar(code):
if code == pycurl.E_OPERATION_TIMEDOUT:
return PulsarClientTransportError.TIMEOUT
elif code == pycurl.E_COULDNT_CONNECT:
return PulsarClientTransportError.CONNECTION_REFUSED
return None
__all__ = [ __all__ = [
'PycurlTransport', 'PycurlTransport',
'post_file', 'post_file',
......
...@@ -4,20 +4,27 @@ Pulsar HTTP Client layer based on Python Standard Library (urllib2) ...@@ -4,20 +4,27 @@ Pulsar HTTP Client layer based on Python Standard Library (urllib2)
from __future__ import with_statement from __future__ import with_statement
from os.path import getsize from os.path import getsize
import mmap import mmap
import socket
try: try:
from urllib2 import urlopen from urllib2 import urlopen, URLError
except ImportError: except ImportError:
from urllib.request import urlopen from urllib.request import urlopen
from urllib.error import URLError
try: try:
from urllib2 import Request from urllib2 import Request
except ImportError: except ImportError:
from urllib.request import Request from urllib.request import Request
from ..exceptions import PulsarClientTransportError
class Urllib2Transport(object): class Urllib2Transport(object):
def __init__(self, timeout=None, **kwrgs):
self.timeout = timeout
def _url_open(self, request, data): def _url_open(self, request, data):
return urlopen(request, data) return urlopen(request, data, self.timeout)
def execute(self, url, method=None, data=None, input_path=None, output_path=None): def execute(self, url, method=None, data=None, input_path=None, output_path=None):
request = self.__request(url, data, method) request = self.__request(url, data, method)
...@@ -31,7 +38,12 @@ class Urllib2Transport(object): ...@@ -31,7 +38,12 @@ class Urllib2Transport(object):
else: else:
data = b"" data = b""
request.add_header('Content-Length', str(size)) request.add_header('Content-Length', str(size))
response = self._url_open(request, data) try:
response = self._url_open(request, data)
except socket.timeout as exc:
raise PulsarClientTransportError(code=PulsarClientTransportError.TIMEOUT)
except URLError as exc:
raise PulsarClientTransportError(transport_message=exc.reason)
finally: finally:
if input: if input:
input.close() input.close()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment