Skip to content
Snippets Groups Projects
Commit ec577686 authored by Nate Coraor's avatar Nate Coraor
Browse files

Allow the curl client transport to have a timeout value set (includes a

general framework for setting any other transport options that might be
necessary), and add an exception that can be raised up to the pulsar
runner in Galaxy to handle communication problems while checking state
or submitting jobs.

Fixes a problem encountered on usegalaxy.org where the monitor thread
would encounter some network problem and block forever (until handler
restart).

Partially fixes #114 in that restarts will not kill running jobs, but if
Galaxy attempts to submit a job while Pulsar is down it will still fail.
parent 07522636
No related branches found
No related tags found
No related merge requests found
...@@ -48,6 +48,7 @@ from .client import OutputNotFoundException ...@@ -48,6 +48,7 @@ from .client import OutputNotFoundException
from .manager import build_client_manager from .manager import build_client_manager
from .destination import url_to_destination_params from .destination import url_to_destination_params
from .path_mapper import PathMapper from .path_mapper import PathMapper
from .exceptions import PulsarClientTransportError
__all__ = [ __all__ = [
'build_client_manager', 'build_client_manager',
...@@ -59,4 +60,5 @@ __all__ = [ ...@@ -59,4 +60,5 @@ __all__ = [
'PulsarOutputs', 'PulsarOutputs',
'ClientOutputs', 'ClientOutputs',
'PathMapper', 'PathMapper',
'PulsarClientTransportError',
] ]
"""
Pulsar client exceptions
"""
class PulsarClientTransportError(Exception):
TIMEOUT = 'timeout'
CONNECTION_REFUSED = 'connection_refused'
UNKNOWN = 'unknown'
messages = {
TIMEOUT: 'Connection timed out',
CONNECTION_REFUSED: 'Connection refused',
UNKNOWN: 'Unknown transport error'
}
INVALID_CODE_MESSAGE = 'Unknown transport error code: %s'
def __init__(self, code=None, message=None,
transport_code=None, transport_message=None):
self.code = code or PulsarClientTransportError.UNKNOWN
self.message = message or PulsarClientTransportError.messages.get(
self.code,
PulsarClientTransportError.INVALID_CODE_MESSAGE % code
)
self.transport_code = transport_code
self.transport_message = transport_message
if transport_code or transport_message:
self.message += " ("
if transport_code:
self.message += "transport code: %s" % transport_code
if transport_message:
self.message += ", "
if transport_message:
self.message += "transport message: %s" % transport_message
self.message += ")"
def __str__(self):
return self.message
...@@ -63,7 +63,8 @@ class ClientManager(object): ...@@ -63,7 +63,8 @@ class ClientManager(object):
else: else:
self.job_manager_interface_class = HttpPulsarInterface self.job_manager_interface_class = HttpPulsarInterface
transport_type = kwds.get('transport', None) transport_type = kwds.get('transport', None)
transport = get_transport(transport_type) transport_params = dict([(p.replace('transport_', '', 1), v) for p, v in kwds.items() if p.startswith('transport_')])
transport = get_transport(transport_type, transport_params=transport_params)
self.job_manager_interface_args = dict(transport=transport) self.job_manager_interface_args = dict(transport=transport)
cache = kwds.get('cache', None) cache = kwds.get('cache', None)
if cache is None: if cache is None:
......
...@@ -18,12 +18,14 @@ else: ...@@ -18,12 +18,14 @@ else:
from .poster import post_file from .poster import post_file
def get_transport(transport_type=None, os_module=os): def get_transport(transport_type=None, os_module=os, transport_params=None):
transport_type = _get_transport_type(transport_type, os_module) transport_type = _get_transport_type(transport_type, os_module)
if not transport_params:
transport_params = {}
if transport_type == 'urllib': if transport_type == 'urllib':
transport = Urllib2Transport() transport = Urllib2Transport(**transport_params)
else: else:
transport = PycurlTransport() transport = PycurlTransport(**transport_params)
return transport return transport
......
import os.path
import logging import logging
from six import string_types from six import string_types
from six import BytesIO from six import BytesIO
try: try:
from pycurl import Curl, HTTP_CODE import pycurl
from pycurl import Curl, HTTP_CODE, error
curl_available = True curl_available = True
except ImportError: except ImportError:
curl_available = False curl_available = False
import os.path from ..exceptions import PulsarClientTransportError
PYCURL_UNAVAILABLE_MESSAGE = \ PYCURL_UNAVAILABLE_MESSAGE = \
...@@ -24,6 +26,9 @@ log = logging.getLogger(__name__) ...@@ -24,6 +26,9 @@ log = logging.getLogger(__name__)
class PycurlTransport(object): class PycurlTransport(object):
def __init__(self, timeout=None, **kwrgs):
self.timeout = timeout
def execute(self, url, method=None, data=None, input_path=None, output_path=None): def execute(self, url, method=None, data=None, input_path=None, output_path=None):
buf = _open_output(output_path) buf = _open_output(output_path)
try: try:
...@@ -41,7 +46,15 @@ class PycurlTransport(object): ...@@ -41,7 +46,15 @@ class PycurlTransport(object):
if isinstance(data, string_types): if isinstance(data, string_types):
data = data.encode('UTF-8') data = data.encode('UTF-8')
c.setopt(c.POSTFIELDS, data) c.setopt(c.POSTFIELDS, data)
c.perform() if self.timeout:
c.setopt(c.TIMEOUT, self.timeout)
try:
c.perform()
except error as exc:
raise PulsarClientTransportError(
_error_curl_to_pulsar(exc.args[0]),
transport_code=exc.args[0],
transport_message=exc.args[1])
if not output_path: if not output_path:
return buf.getvalue() return buf.getvalue()
finally: finally:
...@@ -103,6 +116,14 @@ def _new_curl_object(): ...@@ -103,6 +116,14 @@ def _new_curl_object():
except NameError: except NameError:
raise ImportError(PYCURL_UNAVAILABLE_MESSAGE) raise ImportError(PYCURL_UNAVAILABLE_MESSAGE)
def _error_curl_to_pulsar(code):
if code == pycurl.E_OPERATION_TIMEDOUT:
return PulsarClientTransportError.TIMEOUT
elif code == pycurl.E_COULDNT_CONNECT:
return PulsarClientTransportError.CONNECTION_REFUSED
return None
__all__ = [ __all__ = [
'PycurlTransport', 'PycurlTransport',
'post_file', 'post_file',
......
...@@ -16,6 +16,9 @@ except ImportError: ...@@ -16,6 +16,9 @@ except ImportError:
class Urllib2Transport(object): class Urllib2Transport(object):
def __init__(self, **kwrgs):
pass
def _url_open(self, request, data): def _url_open(self, request, data):
return urlopen(request, data) return urlopen(request, data)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment