diff --git a/pyproject.toml b/pyproject.toml index 371ca5e..6ca5159 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta" [project] name = "amqpstorm-flask" -version = "0.3.4" +version = "0.4.0" description = "amqpstorm library for Flask" readme = "README.md" authors = [{ name = "Inuits", email = "developers@inuits.eu" }] diff --git a/src/amqpstorm_flask/RabbitMQ.py b/src/amqpstorm_flask/RabbitMQ.py index 1688cd4..95625c6 100644 --- a/src/amqpstorm_flask/RabbitMQ.py +++ b/src/amqpstorm_flask/RabbitMQ.py @@ -195,55 +195,63 @@ def queue( queue_arguments = {"x-queue-type": "quorum"} def decorator(f): - @wraps(f) - def new_consumer(): - queue = f.__name__.replace("_", getenv("MQ_DELIMITER", ".")) if queue_name is None else queue_name - retries = 0 - while retries <= max_retries: - try: - self._validate_channel_connection() - self.channel.exchange.declare( - exchange_name if exchange_name else self.mq_exchange, - exchange_type=exchange_type, - durable=self.exchange_params.durable, - passive=self.exchange_params.passive, - auto_delete=self.exchange_params.auto_delete, - ) - self.channel.queue.declare( - queue=queue, - durable=self.queue_params.durable, - passive=self.queue_params.passive if passive_queue is None else passive_queue, - auto_delete=self.queue_params.auto_delete, - arguments=queue_arguments, - ) - self.channel.basic.qos(prefetch_count=prefetch_count) - cb_function = f if full_message_object else self.__create_wrapper_function(routing_key, f) - self.channel.basic.consume( - cb_function, queue=queue, no_ack=self.queue_params.no_ack if auto_ack is None else auto_ack - ) - self.channel.queue.bind( - queue=queue, - exchange=self.mq_exchange, - routing_key=routing_key, - ) - self.logger.info(f"Start consuming queue {queue}") - self.channel.start_consuming() - except Exception as ex: - retries += 1 - if retries > max_retries: - exit(0) - - self.logger.exception( - "An error occurred while consuming queue %s: %s", - queue, - ex, - ) - self.logger.warning(f"Retrying in {retry_delay} seconds...") - sleep(retry_delay) - - thread = threading.Thread(target=new_consumer) - thread.daemon = True - thread.start() + queue = f.__name__.replace("_", getenv("MQ_DELIMITER", ".")) if queue_name is None else queue_name + + enabled_queues = None if getenv("MQ_QUEUES") is None else getenv("MQ_QUEUES").split(",") + if enabled_queues is None: + all_queues_enabled = True + else: + all_queues_enabled = False + + if queue in enabled_queues or all_queues_enabled: + @wraps(f) + def new_consumer(): + retries = 0 + while retries <= max_retries: + try: + self._validate_channel_connection() + self.channel.exchange.declare( + exchange_name if exchange_name else self.mq_exchange, + exchange_type=exchange_type, + durable=self.exchange_params.durable, + passive=self.exchange_params.passive, + auto_delete=self.exchange_params.auto_delete, + ) + self.channel.queue.declare( + queue=queue, + durable=self.queue_params.durable, + passive=self.queue_params.passive if passive_queue is None else passive_queue, + auto_delete=self.queue_params.auto_delete, + arguments=queue_arguments, + ) + self.channel.basic.qos(prefetch_count=prefetch_count) + cb_function = f if full_message_object else self.__create_wrapper_function(routing_key, f) + self.channel.basic.consume( + cb_function, queue=queue, no_ack=self.queue_params.no_ack if auto_ack is None else auto_ack + ) + self.channel.queue.bind( + queue=queue, + exchange=self.mq_exchange, + routing_key=routing_key, + ) + self.logger.info(f"Start consuming queue {queue}") + self.channel.start_consuming() + except Exception as ex: + retries += 1 + if retries > max_retries: + exit(0) + + self.logger.exception( + "An error occurred while consuming queue %s: %s", + queue, + ex, + ) + self.logger.warning(f"Retrying in {retry_delay} seconds...") + sleep(retry_delay) + + thread = threading.Thread(target=new_consumer) + thread.daemon = True + thread.start() return f