diff --git a/pulsar/client/amqp_exchange.py b/pulsar/client/amqp_exchange.py index 9c47ee0803a7441189daf913d763f13ed4441808..c0e34e3eca88418af222d28a6433e20887eeb83e 100644 --- a/pulsar/client/amqp_exchange.py +++ b/pulsar/client/amqp_exchange.py @@ -214,13 +214,16 @@ class PulsarExchange(object): def ack_manager(self): log.debug('Acknowledgement manager thread alive') + failed = set() try: while True: sleep(DEFAULT_ACK_MANAGER_SLEEP) with self.publish_ack_lock: for unack_uuid in self.publish_uuid_store.keys(): if self.publish_uuid_store.get_time(unack_uuid) < time() - self.__republish_time: - payload = self.publish_uuid_store[unack_uuid] + payload = self.__get_payload(unack_uuid, failed) + if payload is None: + continue payload[ACK_FORCE_NOACK_KEY] = True resubmit_queue = payload[ACK_SUBMIT_QUEUE_KEY] log.debug('UUID %s has not been acknowledged, ' @@ -233,6 +236,28 @@ class PulsarExchange(object): raise log.debug('Acknowledgedment manager thread exiting') + def __get_payload(self, uuid, failed): + """Retry reading a message from the publish_uuid_store once, delete on the second failure.""" + # Caller should have the publish_uuid_store lock + try: + return self.publish_uuid_store[uuid] + except Exception as exc: + msg = "Failed to load payload from publish store for UUID %s, %s: %s" + if uuid in failed: + log.error(msg, uuid, "discarding", str(exc)) + self.__discard_publish_uuid(uuid, failed) + else: + log.error(msg, uuid, "will try agan", str(exc)) + failed.add(uuid) + return None + + def __discard_publish_uuid(self, uuid, failed): + try: + del self.publish_uuid_store[uuid] + failed.discard(uuid) + except Exception as exc: + log.error("Failed to discard UUID %s from publish store: %s", uuid, str(exc)) + def __prepare_publish_kwds(self, publish_log_prefix): if "retry_policy" in self.__publish_kwds: publish_kwds = copy.deepcopy(self.__publish_kwds)