diff --git a/pulsar/client/amqp_exchange.py b/pulsar/client/amqp_exchange.py index 65b1e8010b054aba488a4b53c00e11ab877d50f7..bde9b36e381e6e7e7850bcb4550629aeaa86d363 100644 --- a/pulsar/client/amqp_exchange.py +++ b/pulsar/client/amqp_exchange.py @@ -105,7 +105,7 @@ class PulsarExchange(object): # queue once before the ack manager starts doing its # thing if self.acks_enabled and queue_name.endswith(ACK_QUEUE_SUFFIX): - ack_manager_thread = self.__start_ack_manager(queue_name) + self.__start_ack_manager(queue_name) while check and connection.connected: try: connection.drain_events(timeout=self.__timeout) @@ -184,7 +184,7 @@ class PulsarExchange(object): publish_log_prefix = self.__publish_log_prefex(transaction_uuid) log.debug("%sBegin publishing to key %s", publish_log_prefix, key) if (self.acks_enabled and not name.endswith(ACK_QUEUE_SUFFIX) - and ACK_FORCE_NOACK_KEY not in payload): + and ACK_FORCE_NOACK_KEY not in payload): # Publishing a message on a normal queue and it's not a republish # (or explicitly forced do-not-ack), so add ack keys ack_uuid = str(transaction_uuid) diff --git a/pulsar/client/util.py b/pulsar/client/util.py index ae382c178f4494593983b13e04a6f435b7e8557f..35696befdc1d04a8cd1249236aa668a6068acb06 100644 --- a/pulsar/client/util.py +++ b/pulsar/client/util.py @@ -262,7 +262,7 @@ class MessageQueueUUIDStore(object): raise def keys(self): - return iter(os.listdir(self.__store)) + return iter(listdir(self.__store)) def get_time(self, key): try: