From ec577686400d1f015bfb0a2cf5ddb68d49d0967d Mon Sep 17 00:00:00 2001
From: Nate Coraor <nate@bx.psu.edu>
Date: Wed, 8 Jun 2016 15:48:58 -0400
Subject: [PATCH] Allow the curl client transport to have a timeout value set
 (includes a general framework for setting any other transport options that
 might be necessary), and add an exception that can be raised up to the pulsar
 runner in Galaxy to handle communication problems while checking state or
 submitting jobs.

Fixes a problem encountered on usegalaxy.org where the monitor thread
would encounter some network problem and block forever (until handler
restart).

Partially fixes #114: a Pulsar restart will no longer kill running jobs,
but a job that Galaxy attempts to submit while Pulsar is down will still fail.
---
 pulsar/client/__init__.py           |  2 ++
 pulsar/client/exceptions.py         | 39 +++++++++++++++++++++++++++++
 pulsar/client/manager.py            |  3 ++-
 pulsar/client/transport/__init__.py |  8 +++---
 pulsar/client/transport/curl.py     | 27 +++++++++++++++++---
 pulsar/client/transport/standard.py |  3 +++
 6 files changed, 75 insertions(+), 7 deletions(-)
 create mode 100644 pulsar/client/exceptions.py

diff --git a/pulsar/client/__init__.py b/pulsar/client/__init__.py
index 5ada3933..b28ab87c 100644
--- a/pulsar/client/__init__.py
+++ b/pulsar/client/__init__.py
@@ -48,6 +48,7 @@ from .client import OutputNotFoundException
 from .manager import build_client_manager
 from .destination import url_to_destination_params
 from .path_mapper import PathMapper
+from .exceptions import PulsarClientTransportError
 
 __all__ = [
     'build_client_manager',
@@ -59,4 +60,5 @@ __all__ = [
     'PulsarOutputs',
     'ClientOutputs',
     'PathMapper',
+    'PulsarClientTransportError',
 ]
diff --git a/pulsar/client/exceptions.py b/pulsar/client/exceptions.py
new file mode 100644
index 00000000..095a67a7
--- /dev/null
+++ b/pulsar/client/exceptions.py
@@ -0,0 +1,39 @@
+"""
+Pulsar client exceptions
+"""
+
+
+class PulsarClientTransportError(Exception):
+    TIMEOUT = 'timeout'
+    CONNECTION_REFUSED = 'connection_refused'
+    UNKNOWN = 'unknown'
+
+    messages = {
+        TIMEOUT: 'Connection timed out',
+        CONNECTION_REFUSED: 'Connection refused',
+        UNKNOWN: 'Unknown transport error'
+    }
+
+    INVALID_CODE_MESSAGE = 'Unknown transport error code: %s'
+
+    def __init__(self, code=None, message=None,
+                 transport_code=None, transport_message=None):
+        self.code = code or PulsarClientTransportError.UNKNOWN
+        self.message = message or PulsarClientTransportError.messages.get(
+            self.code,
+            PulsarClientTransportError.INVALID_CODE_MESSAGE % code
+        )
+        self.transport_code = transport_code
+        self.transport_message = transport_message
+        if transport_code or transport_message:
+            self.message += " ("
+            if transport_code:
+                self.message += "transport code: %s" % transport_code
+                if transport_message:
+                    self.message += ", "
+            if transport_message:
+                self.message += "transport message: %s" % transport_message
+            self.message += ")"
+
+    def __str__(self):
+        return self.message
diff --git a/pulsar/client/manager.py b/pulsar/client/manager.py
index ab0d367f..c43db089 100644
--- a/pulsar/client/manager.py
+++ b/pulsar/client/manager.py
@@ -63,7 +63,8 @@ class ClientManager(object):
         else:
             self.job_manager_interface_class = HttpPulsarInterface
             transport_type = kwds.get('transport', None)
-            transport = get_transport(transport_type)
+            transport_params = dict([(p.replace('transport_', '', 1), v) for p, v in kwds.items() if p.startswith('transport_')])
+            transport = get_transport(transport_type, transport_params=transport_params)
             self.job_manager_interface_args = dict(transport=transport)
         cache = kwds.get('cache', None)
         if cache is None:
diff --git a/pulsar/client/transport/__init__.py b/pulsar/client/transport/__init__.py
index 4ff9c7df..db2541ba 100644
--- a/pulsar/client/transport/__init__.py
+++ b/pulsar/client/transport/__init__.py
@@ -18,12 +18,14 @@ else:
     from .poster import post_file
 
 
-def get_transport(transport_type=None, os_module=os):
+def get_transport(transport_type=None, os_module=os, transport_params=None):
     transport_type = _get_transport_type(transport_type, os_module)
+    if not transport_params:
+        transport_params = {}
     if transport_type == 'urllib':
-        transport = Urllib2Transport()
+        transport = Urllib2Transport(**transport_params)
     else:
-        transport = PycurlTransport()
+        transport = PycurlTransport(**transport_params)
     return transport
 
 
diff --git a/pulsar/client/transport/curl.py b/pulsar/client/transport/curl.py
index 633a57db..6056e0f0 100644
--- a/pulsar/client/transport/curl.py
+++ b/pulsar/client/transport/curl.py
@@ -1,15 +1,17 @@
+import os.path
 import logging
 
 from six import string_types
 from six import BytesIO
 
 try:
-    from pycurl import Curl, HTTP_CODE
+    import pycurl
+    from pycurl import Curl, HTTP_CODE, error
     curl_available = True
 except ImportError:
     curl_available = False
 
-import os.path
+from ..exceptions import PulsarClientTransportError
 
 
 PYCURL_UNAVAILABLE_MESSAGE = \
@@ -24,6 +26,9 @@ log = logging.getLogger(__name__)
 
 class PycurlTransport(object):
 
+    def __init__(self, timeout=None, **kwrgs):
+        self.timeout = timeout
+
     def execute(self, url, method=None, data=None, input_path=None, output_path=None):
         buf = _open_output(output_path)
         try:
@@ -41,7 +46,15 @@ class PycurlTransport(object):
                 if isinstance(data, string_types):
                     data = data.encode('UTF-8')
                 c.setopt(c.POSTFIELDS, data)
-            c.perform()
+            if self.timeout:
+                c.setopt(c.TIMEOUT, self.timeout)
+            try:
+                c.perform()
+            except error as exc:
+                raise PulsarClientTransportError(
+                        _error_curl_to_pulsar(exc.args[0]),
+                        transport_code=exc.args[0],
+                        transport_message=exc.args[1])
             if not output_path:
                 return buf.getvalue()
         finally:
@@ -103,6 +116,14 @@ def _new_curl_object():
     except NameError:
         raise ImportError(PYCURL_UNAVAILABLE_MESSAGE)
 
+def _error_curl_to_pulsar(code):
+    if code == pycurl.E_OPERATION_TIMEDOUT:
+        return PulsarClientTransportError.TIMEOUT
+    elif code == pycurl.E_COULDNT_CONNECT:
+        return PulsarClientTransportError.CONNECTION_REFUSED
+    return None
+
+
 __all__ = [
     'PycurlTransport',
     'post_file',
diff --git a/pulsar/client/transport/standard.py b/pulsar/client/transport/standard.py
index 9a5c4637..bde2602e 100644
--- a/pulsar/client/transport/standard.py
+++ b/pulsar/client/transport/standard.py
@@ -16,6 +16,9 @@ except ImportError:
 
 class Urllib2Transport(object):
 
+    def __init__(self, **kwrgs):
+        pass
+
     def _url_open(self, request, data):
         return urlopen(request, data)
 
-- 
GitLab