From d50d35bc7d322f982d7b907ada0b1c4138317c09 Mon Sep 17 00:00:00 2001 From: Ryan Block Date: Tue, 15 Aug 2023 20:08:28 -0700 Subject: [PATCH] Ensure FIFO queues get `MessageGroupId` parameter in `queues.publish()` --- arc/queues/__init__.py | 9 ++++++++- tests/test_queues.py | 4 +++- 2 files changed, 11 insertions(+), 2 deletions(-) diff --git a/arc/queues/__init__.py b/arc/queues/__init__.py index 47311fd..bf56075 100644 --- a/arc/queues/__init__.py +++ b/arc/queues/__init__.py @@ -1,5 +1,7 @@ import json +import os import urllib.request +import uuid import boto3 from arc.services import _services from arc._lib import use_aws, get_ports @@ -37,8 +39,13 @@ def publish_aws(name, payload): _sqs_client_cache = boto3.client("sqs") def pub(arn): + stack = os.environ.get("ARC_STACK_NAME") return _sqs_client_cache.send_message( - QueueUrl=arn, MessageBody=json.dumps(payload), DelaySeconds=0 + QueueUrl=arn, + MessageBody=json.dumps(payload), + DelaySeconds=0, + MessageGroupId=stack, + MessageDeduplicationId=str(uuid.uuid4()), ) if _queues_cache.get(name): diff --git a/tests/test_queues.py b/tests/test_queues.py index 99bb8da..1dd49d5 100644 --- a/tests/test_queues.py +++ b/tests/test_queues.py @@ -25,7 +25,9 @@ def test_parse(): def test_queue_publish(arc_services, sqs_client): queue_name = "continuum" - sqs_client.create_queue(QueueName=queue_name) + sqs_client.create_queue( + QueueName=queue_name + ".fifo", Attributes={"FifoQueue": "true"} + ) queues = sqs_client.list_queues() arc_services(params={f"queues/{queue_name}": queues["QueueUrls"][0]})