Skip to content
Snippets Groups Projects
Commit 0865ba6c authored by John Chilton's avatar John Chilton
Browse files

Improve publish logging.

Assign each publish message a UUID and include this in error handling.
parent f8d832fe
No related branches found
No related tags found
No related merge requests found
...@@ -4,6 +4,8 @@ try: ...@@ -4,6 +4,8 @@ try:
except ImportError: except ImportError:
kombu = None kombu = None
import copy
import uuid
import socket import socket
import logging import logging
import threading import threading
...@@ -101,24 +103,43 @@ class PulsarExchange(object): ...@@ -101,24 +103,43 @@ class PulsarExchange(object):
log.debug('AMQP heartbeat thread exiting') log.debug('AMQP heartbeat thread exiting')
def publish(self, name, payload): def publish(self, name, payload):
# Consider optionally disabling if throughput becomes main concern.
transaction_uuid = uuid.uuid1()
key = self.__queue_name(name) key = self.__queue_name(name)
log.debug("Begin publishing to key %s" % key) publish_log_prefix = self.__publish_log_prefex(transaction_uuid)
log.debug("%sBegin publishing to key %s", publish_log_prefix, key)
with self.connection(self.__url) as connection: with self.connection(self.__url) as connection:
with pools.producers[connection].acquire() as producer: with pools.producers[connection].acquire() as producer:
log.debug("Have producer for publishing to key %s" % key) log.debug("%sHave producer for publishing to key %s", publish_log_prefix, key)
publish_kwds = self.__prepare_publish_kwds(publish_log_prefix)
producer.publish( producer.publish(
payload, payload,
serializer='json', serializer='json',
exchange=self.__exchange, exchange=self.__exchange,
declare=[self.__exchange], declare=[self.__exchange],
routing_key=key, routing_key=key,
**self.__publish_kwds **publish_kwds
) )
log.debug("Published to key %s" % key) log.debug("%sPublished to key %s", publish_log_prefix, key)
def __publish_errback(self, exc, interval): def __prepare_publish_kwds(self, publish_log_prefix):
log.error("Connection error while publishing: %r", exc, exc_info=1) if "retry_policy" in self.__publish_kwds:
log.info("Retrying in %s seconds", interval) publish_kwds = copy.deepcopy(self.__publish_kwds)
errback = lambda exc, interval: self.__publish_errback(exc, interval, publish_log_prefix)
publish_kwds["retry_policy"]["errback"] = errback
else:
publish_kwds = self.__publish_kwds
return publish_kwds
def __publish_errback(self, exc, interval, publish_log_prefix=""):
log.error("%sConnection error while publishing: %r", publish_log_prefix, exc, exc_info=1)
log.info("%sRetrying in %s seconds", publish_log_prefix, interval)
def __publish_log_prefex(self, transaction_uuid=None):
prefix = ""
if transaction_uuid:
prefix = "[publish:%s] " % str(transaction_uuid)
return prefix
def connection(self, connection_string, **kwargs): def connection(self, connection_string, **kwargs):
if "ssl" not in kwargs: if "ssl" not in kwargs:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment