From 9a267b93403586be9a78194bb614a2af2a7955d8 Mon Sep 17 00:00:00 2001 From: "cristopher.moccia" Date: Thu, 30 Nov 2023 12:56:42 +0100 Subject: [PATCH] some fix --- config/application.properties | 2 +- .../interop/signalhub/persister/config/AwsBeanBuilder.java | 2 +- .../persister/queue/consumer/SqsInternalListener.java | 4 +++- 3 files changed, 5 insertions(+), 3 deletions(-) diff --git a/config/application.properties b/config/application.properties index 4bc1109..553af30 100644 --- a/config/application.properties +++ b/config/application.properties @@ -1,2 +1,2 @@ aws.profile=default - +aws.region=eu-south-1 diff --git a/src/main/java/it/pagopa/interop/signalhub/persister/config/AwsBeanBuilder.java b/src/main/java/it/pagopa/interop/signalhub/persister/config/AwsBeanBuilder.java index 27c3a05..cb20932 100644 --- a/src/main/java/it/pagopa/interop/signalhub/persister/config/AwsBeanBuilder.java +++ b/src/main/java/it/pagopa/interop/signalhub/persister/config/AwsBeanBuilder.java @@ -36,7 +36,7 @@ public SqsMessageListenerContainerFactory defaultSqsListenerContainerFac return SqsMessageListenerContainerFactory .builder() .configure(options -> options - //.acknowledgementMode(AcknowledgementMode.ON_SUCCESS) + .acknowledgementMode(AcknowledgementMode.ON_SUCCESS) .maxConcurrentMessages(10) .maxMessagesPerPoll(10)) .sqsAsyncClient(sqsAsyncClient()) diff --git a/src/main/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListener.java b/src/main/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListener.java index 9290da0..2798173 100644 --- a/src/main/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListener.java +++ b/src/main/java/it/pagopa/interop/signalhub/persister/queue/consumer/SqsInternalListener.java @@ -37,7 +37,9 @@ public CompletableFuture pullFromAwsInternalQueue(@Payload String node, @H .doOnNext(json -> log.info("payloadBody: {}, headers: {}, PullFromInternalQueue received input", node, headers)) .map(json -> Utility.jsonToObject(node, SignalEvent.class)) .map(signalEvent -> signalMapper.signalEventToSignal(signalEvent, correlationId)) - .flatMap(signalService::signalServiceFlow) + .flatMap(signalEvent -> signalService.signalServiceFlow(signalEvent) + .contextWrite(Context.of(TRACE_ID_KEY, correlationId)) + ) .then() .toFuture(); }