From 0865ba6c5286e69b7a9f0cecdd47cfa233bc7871 Mon Sep 17 00:00:00 2001
From: John Chilton <jmchilton@gmail.com>
Date: Wed, 17 Sep 2014 15:04:43 -0400
Subject: [PATCH] Improve publish logging.

Assign each publish message a UUID and include this in error handling.
---
 pulsar/client/amqp_exchange.py | 35 +++++++++++++++++++++++++++-------
 1 file changed, 28 insertions(+), 7 deletions(-)

diff --git a/pulsar/client/amqp_exchange.py b/pulsar/client/amqp_exchange.py
index 20591c22..1d909967 100644
--- a/pulsar/client/amqp_exchange.py
+++ b/pulsar/client/amqp_exchange.py
@@ -4,6 +4,8 @@ try:
 except ImportError:
     kombu = None
 
+import copy
+import uuid
 import socket
 import logging
 import threading
@@ -101,24 +103,43 @@ class PulsarExchange(object):
         log.debug('AMQP heartbeat thread exiting')
 
     def publish(self, name, payload):
+        # Consider optionally disabling if throughput becomes main concern.
+        transaction_uuid = uuid.uuid1()
         key = self.__queue_name(name)
-        log.debug("Begin publishing to key %s" % key)
+        publish_log_prefix = self.__publish_log_prefex(transaction_uuid)
+        log.debug("%sBegin publishing to key %s", publish_log_prefix, key)
         with self.connection(self.__url) as connection:
             with pools.producers[connection].acquire() as producer:
-                log.debug("Have producer for publishing to key %s" % key)
+                log.debug("%sHave producer for publishing to key %s", publish_log_prefix, key)
+                publish_kwds = self.__prepare_publish_kwds(publish_log_prefix)
                 producer.publish(
                     payload,
                     serializer='json',
                     exchange=self.__exchange,
                     declare=[self.__exchange],
                     routing_key=key,
-                    **self.__publish_kwds
+                    **publish_kwds
                 )
-                log.debug("Published to key %s" % key)
+                log.debug("%sPublished to key %s", publish_log_prefix, key)
 
-    def __publish_errback(self, exc, interval):
-        log.error("Connection error while publishing: %r", exc, exc_info=1)
-        log.info("Retrying in %s seconds", interval)
+    def __prepare_publish_kwds(self, publish_log_prefix):
+        if "retry_policy" in self.__publish_kwds:
+            publish_kwds = copy.deepcopy(self.__publish_kwds)
+            errback = lambda exc, interval: self.__publish_errback(exc, interval, publish_log_prefix)
+            publish_kwds["retry_policy"]["errback"] = errback
+        else:
+            publish_kwds = self.__publish_kwds
+        return publish_kwds
+
+    def __publish_errback(self, exc, interval, publish_log_prefix=""):
+        log.error("%sConnection error while publishing: %r", publish_log_prefix, exc, exc_info=1)
+        log.info("%sRetrying in %s seconds", publish_log_prefix, interval)
+
+    def __publish_log_prefex(self, transaction_uuid=None):
+        prefix = ""
+        if transaction_uuid:
+            prefix = "[publish:%s] " % str(transaction_uuid)
+        return prefix
 
     def connection(self, connection_string, **kwargs):
         if "ssl" not in kwargs:
-- 
GitLab