Skip to content

Commit

Permalink
env var to enable specific queues
Browse files Browse the repository at this point in the history
  • Loading branch information
mdmatthias committed Apr 22, 2024
1 parent 5cb2a64 commit 629f815
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 50 deletions.
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "amqpstorm-flask"
version = "0.3.4"
version = "0.4.0"
description = "amqpstorm library for Flask"
readme = "README.md"
authors = [{ name = "Inuits", email = "[email protected]" }]
Expand Down
106 changes: 57 additions & 49 deletions src/amqpstorm_flask/RabbitMQ.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,55 +195,63 @@ def queue(
queue_arguments = {"x-queue-type": "quorum"}

def decorator(f):
@wraps(f)
def new_consumer():
queue = f.__name__.replace("_", getenv("MQ_DELIMITER", ".")) if queue_name is None else queue_name
retries = 0
while retries <= max_retries:
try:
self._validate_channel_connection()
self.channel.exchange.declare(
exchange_name if exchange_name else self.mq_exchange,
exchange_type=exchange_type,
durable=self.exchange_params.durable,
passive=self.exchange_params.passive,
auto_delete=self.exchange_params.auto_delete,
)
self.channel.queue.declare(
queue=queue,
durable=self.queue_params.durable,
passive=self.queue_params.passive if passive_queue is None else passive_queue,
auto_delete=self.queue_params.auto_delete,
arguments=queue_arguments,
)
self.channel.basic.qos(prefetch_count=prefetch_count)
cb_function = f if full_message_object else self.__create_wrapper_function(routing_key, f)
self.channel.basic.consume(
cb_function, queue=queue, no_ack=self.queue_params.no_ack if auto_ack is None else auto_ack
)
self.channel.queue.bind(
queue=queue,
exchange=self.mq_exchange,
routing_key=routing_key,
)
self.logger.info(f"Start consuming queue {queue}")
self.channel.start_consuming()
except Exception as ex:
retries += 1
if retries > max_retries:
exit(0)

self.logger.exception(
"An error occurred while consuming queue %s: %s",
queue,
ex,
)
self.logger.warning(f"Retrying in {retry_delay} seconds...")
sleep(retry_delay)

thread = threading.Thread(target=new_consumer)
thread.daemon = True
thread.start()
queue = f.__name__.replace("_", getenv("MQ_DELIMITER", ".")) if queue_name is None else queue_name

enabled_queues = None if getenv("MQ_QUEUES") is None else getenv("MQ_QUEUES").split(",")
if enabled_queues is None:
all_queues_enabled = True
else:
all_queues_enabled = False

This comment has been minimized.

Copy link
@gverm

gverm Apr 22, 2024

Contributor

Why not just:

all_queues_enabled = enabled_queues is None

since enabled_queues can only be None or not None.

This comment has been minimized.

Copy link
@mdmatthias

mdmatthias Apr 22, 2024

Author Contributor

correct, I'll change it


if queue in enabled_queues or all_queues_enabled:
@wraps(f)
def new_consumer():
retries = 0
while retries <= max_retries:
try:
self._validate_channel_connection()
self.channel.exchange.declare(
exchange_name if exchange_name else self.mq_exchange,
exchange_type=exchange_type,
durable=self.exchange_params.durable,
passive=self.exchange_params.passive,
auto_delete=self.exchange_params.auto_delete,
)
self.channel.queue.declare(
queue=queue,
durable=self.queue_params.durable,
passive=self.queue_params.passive if passive_queue is None else passive_queue,
auto_delete=self.queue_params.auto_delete,
arguments=queue_arguments,
)
self.channel.basic.qos(prefetch_count=prefetch_count)
cb_function = f if full_message_object else self.__create_wrapper_function(routing_key, f)
self.channel.basic.consume(
cb_function, queue=queue, no_ack=self.queue_params.no_ack if auto_ack is None else auto_ack
)
self.channel.queue.bind(
queue=queue,
exchange=self.mq_exchange,
routing_key=routing_key,
)
self.logger.info(f"Start consuming queue {queue}")
self.channel.start_consuming()
except Exception as ex:
retries += 1
if retries > max_retries:
exit(0)

self.logger.exception(
"An error occurred while consuming queue %s: %s",
queue,
ex,
)
self.logger.warning(f"Retrying in {retry_delay} seconds...")
sleep(retry_delay)

thread = threading.Thread(target=new_consumer)
thread.daemon = True
thread.start()

return f

Expand Down

0 comments on commit 629f815

Please sign in to comment.