Skip to content

Commit

Permalink
NOD-841 ehub wip
Browse files Browse the repository at this point in the history
  • Loading branch information
mscattarella committed May 31, 2024
1 parent 77e9a6b commit 6e18b4c
Show file tree
Hide file tree
Showing 11 changed files with 125 additions and 85 deletions.
8 changes: 4 additions & 4 deletions helm/values-dev.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ microservice-chart:
ADAPTER_API_CONFIG_CACHE_URL: "https://api.dev.platform.pagopa.it/api-config-cache/o/v1"
QUEUE_CONVERSION_NAME: "flowidsendqueue"
EVENT_HUB_RE_NAME: "fdr-re"
EVENT_HUB_FLUSSIRENDICONTAZIONE_NAME: "FLUSSI_RENDICONTAZIONE"
EVENT_HUB_IUVRENDICONTATI_NAME: "IUV_RENDICONTATI"
EVENT_HUB_FLOWTX_NAME: "FLUSSI_RENDICONTAZIONE"
EVENT_HUB_REPORTEDIUV_NAME: "IUV_RENDICONTATI"
BLOB_RE_CONTAINER_NAME: "payload"
BLOB_HISTORY_CONTAINER_NAME: "fdrhistory"
TABLE_HISTORY_FDR_PUBLISH_TABLE: "fdrpublish"
Expand All @@ -44,8 +44,8 @@ microservice-chart:
MONGODB_CONNECTION_STRING: "mongodb-connection-string"
QUEUE_CONVERSION_CONNECTION_STRING: "fdr-sa-connection-string"
EVENT_HUB_RE_CONNECTION_STRING: "azure-event-hub-re-connection-string"
EVENT_HUB_IUVRENDICONTATI_CONNECTION_STRING: "azure-event-hub-re-connection-string"
EVENT_HUB_FLUSSIRENDICONTAZIONE_CONNECTION_STRING: "azure-event-hub-re-connection-string"
EVENT_HUB_REPORTEDIUV_CONNECTION_STRING: "fdr-qi-reported-iuv-tx-connection-string"
EVENT_HUB_FLOWTX_CONNECTION_STRING: "fdr-qi-flows-tx-connection-string"
BLOB_RE_CONNECTION_STRING: "fdr-re-sa-connection-string"
BLOB_HISTORY_CONNECTION_STRING: "fdr-history-sa-connection-string"
TABLE_HISTORY_CONNECTION_STRING: "fdr-history-sa-connection-string"
Expand Down
8 changes: 4 additions & 4 deletions helm/values-uat.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ microservice-chart:
ADAPTER_API_CONFIG_CACHE_URL: "https://api.uat.platform.pagopa.it/api-config-cache/o/v1"
QUEUE_CONVERSION_NAME: "flowidsendqueue"
EVENT_HUB_RE_NAME: "fdr-re"
EVENT_HUB_FLUSSIRENDICONTAZIONE_NAME: "FLUSSI_RENDICONTAZIONE"
EVENT_HUB_IUVRENDICONTATI_NAME: "IUV_RENDICONTATI"
EVENT_HUB_FLOWTX_NAME: "FLUSSI_RENDICONTAZIONE"
EVENT_HUB_REPORTEDIUV_NAME: "IUV_RENDICONTATI"
BLOB_RE_CONTAINER_NAME: "payload"
BLOB_HISTORY_CONTAINER_NAME: "fdrhistory"
TABLE_HISTORY_FDR_PUBLISH_TABLE: "fdrpublish"
Expand All @@ -44,8 +44,8 @@ microservice-chart:
MONGODB_CONNECTION_STRING: "mongodb-connection-string"
QUEUE_CONVERSION_CONNECTION_STRING: "fdr-sa-connection-string"
EVENT_HUB_RE_CONNECTION_STRING: "azure-event-hub-re-connection-string"
EVENT_HUB_IUVRENDICONTATI_CONNECTION_STRING: "azure-event-hub-re-connection-string"
EVENT_HUB_FLUSSIRENDICONTAZIONE_CONNECTION_STRING: "azure-event-hub-re-connection-string"
EVENT_HUB_REPORTEDIUV_CONNECTION_STRING: "fdr-qi-reported-iuv-tx-connection-string"
EVENT_HUB_FLOWTX_CONNECTION_STRING: "fdr-qi-flows-tx-connection-string"
BLOB_RE_CONNECTION_STRING: "fdr-re-sa-connection-string"
BLOB_HISTORY_CONNECTION_STRING: "fdr-history-sa-connection-string"
TABLE_HISTORY_CONNECTION_STRING: "fdr-history-sa-connection-string"
Expand Down
31 changes: 30 additions & 1 deletion src/main/java/it/gov/pagopa/fdr/AppStartup.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,10 @@

import io.quarkus.runtime.Startup;
import it.gov.pagopa.fdr.service.conversion.ConversionService;
import it.gov.pagopa.fdr.service.flowTx.FlowTxService;
import it.gov.pagopa.fdr.service.history.HistoryService;
import it.gov.pagopa.fdr.service.re.ReService;
import it.gov.pagopa.fdr.service.reportedIuv.ReportedIuvService;
import jakarta.annotation.PostConstruct;
import jakarta.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.inject.ConfigProperty;
Expand All @@ -25,6 +27,12 @@ public class AppStartup {
@ConfigProperty(name = "history.enabled")
boolean historyEnabled;

@ConfigProperty(name = "eHub.reportediuv.enabled")
boolean eHubReportedIuvEnabled;

@ConfigProperty(name = "eHub.flowtx.enabled")
boolean eHubFlowTxEnabled;

private final Logger log;

private final Config config;
Expand All @@ -34,17 +42,24 @@ public class AppStartup {
private final ReService reService;
private final HistoryService historyService;

private final ReportedIuvService reportedIuvService;
private final FlowTxService flowTxService;

public AppStartup(
Logger log,
Config config,
ConversionService conversionQueue,
ReService reService,
HistoryService historyService) {
HistoryService historyService,
ReportedIuvService reportedIuvService,
FlowTxService flowTxService) {
this.log = log;
this.config = config;
this.conversionQueue = conversionQueue;
this.reService = reService;
this.historyService = historyService;
this.reportedIuvService = reportedIuvService;
this.flowTxService = flowTxService;
}

@PostConstruct
Expand Down Expand Up @@ -76,5 +91,19 @@ public void init() {
} else {
log.info("History DISABLED");
}

if (eHubReportedIuvEnabled) {
log.info("Start EventHub ReportedIUV");
reportedIuvService.init();
} else {
log.info("Start EventHub ReportedIUV");
}

if (eHubFlowTxEnabled) {
log.info("Start EventHub FlowTx");
flowTxService.init();
} else {
log.info("Start EventHub FlowTx");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,11 @@ public enum AppErrorCodeMessageEnum implements AppErrorCodeMessageInterface {
FILE_UTILS_FILE_NOT_FOUND("0730", "fdr.fileUtilsFileNotFound", Status.INTERNAL_SERVER_ERROR),
COMPRESS_JSON("0731", "compress.json.error", Status.INTERNAL_SERVER_ERROR),

EVENT_HUB_IUVRENDICONTATI_PARSE_JSON(
"0732", "eHub.iuvrendicontati.parse", Status.INTERNAL_SERVER_ERROR),
EVENT_HUB_IUVRENDICONTATI_TOO_LARGE(
"0733", "eHub.iuvrendicontati.tooLarge", Status.INTERNAL_SERVER_ERROR),
EVENT_HUB_FLUSSIRENDICONTAZIONE_PARSE_JSON(
"0734", "eHub.flussirendicontazione.parse", Status.INTERNAL_SERVER_ERROR),
EVENT_HUB_FLUSSIRENDICONTAZIONE_TOO_LARGE(
"0735", "eHub.flussirendicontazione.tooLarge", Status.INTERNAL_SERVER_ERROR);
EVENT_HUB_REPORTEDIUV_PARSE_JSON("0732", "eHub.reportediuv.parse", Status.INTERNAL_SERVER_ERROR),
EVENT_HUB_REPORTEDIUV_TOO_LARGE(
"0733", "eHub.reportediuv.tooLarge", Status.INTERNAL_SERVER_ERROR),
EVENT_HUB_FLOWTX_PARSE_JSON("0734", "eHub.flowtx.parse", Status.INTERNAL_SERVER_ERROR),
EVENT_HUB_FLOWTX_TOO_LARGE("0735", "eHub.flowtx.tooLarge", Status.INTERNAL_SERVER_ERROR);

private final String errorCode;
private final String errorMessageKey;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package it.gov.pagopa.fdr.service.flussiRendicontazione;
package it.gov.pagopa.fdr.service.flowTx;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
Expand All @@ -8,7 +8,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import it.gov.pagopa.fdr.exception.AppErrorCodeMessageEnum;
import it.gov.pagopa.fdr.exception.AppException;
import it.gov.pagopa.fdr.service.flussiRendicontazione.model.FlussiRendicontazione;
import it.gov.pagopa.fdr.service.flowTx.model.FlowTx;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Arrays;
import java.util.List;
Expand All @@ -17,35 +17,35 @@
import org.jboss.logging.Logger;

@ApplicationScoped
public class FlussiRendicontazioneService {
public class FlowTxService {

private final Logger log;

@ConfigProperty(name = "ehub.flussirendicontazione.connect-str")
@ConfigProperty(name = "ehub.flowtx.connect-str")
String eHubConnectStr;

@ConfigProperty(name = "ehub.flussirendicontazione.name")
@ConfigProperty(name = "ehub.flowtx.name")
String eHubName;

private EventHubProducerClient producer;

private final ObjectMapper objectMapper;

public FlussiRendicontazioneService(Logger log, ObjectMapper objectMapper) {
public FlowTxService(Logger log, ObjectMapper objectMapper) {
this.log = log;
this.objectMapper = objectMapper;
}

public void init() {
log.infof("EventHub flussirendicontazione init. EventHub name [%s]", eHubName);
log.infof("EventHub flowtx init. EventHub name [%s]", eHubName);

this.producer =
new EventHubClientBuilder()
.connectionString(eHubConnectStr, eHubName)
.buildProducerClient();
}

public final void sendEvent(FlussiRendicontazione... list) {
public final void sendEvent(FlowTx... list) {
if (this.producer == null) {
log.debugf("EventHub re [%s] NOT INITIALIZED", eHubName);
} else {
Expand All @@ -59,8 +59,7 @@ public final void sendEvent(FlussiRendicontazione... list) {
return new EventData(objectMapper.writeValueAsString(l));
} catch (JsonProcessingException e) {
log.errorf("Producer SDK Azure RE event error", e);
throw new AppException(
AppErrorCodeMessageEnum.EVENT_HUB_FLUSSIRENDICONTAZIONE_PARSE_JSON);
throw new AppException(AppErrorCodeMessageEnum.EVENT_HUB_FLOWTX_PARSE_JSON);
}
})
.toList();
Expand All @@ -84,7 +83,7 @@ public void publishEvents(List<EventData> allEvents) {
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new AppException(
AppErrorCodeMessageEnum.EVENT_HUB_FLUSSIRENDICONTAZIONE_TOO_LARGE,
AppErrorCodeMessageEnum.EVENT_HUB_FLOWTX_TOO_LARGE,
eventDataBatch.getMaxSizeInBytes());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package it.gov.pagopa.fdr.service.flussiRendicontazione.model;
package it.gov.pagopa.fdr.service.flowTx.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigDecimal;
Expand All @@ -9,7 +9,7 @@

@Data
@Builder
public class FlussiRendicontazione {
public class FlowTx {

@JsonProperty("ID_FLUSSO")
private String idFlusso;
Expand Down
25 changes: 12 additions & 13 deletions src/main/java/it/gov/pagopa/fdr/service/psps/PspsService.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,15 @@
import it.gov.pagopa.fdr.service.conversion.ConversionService;
import it.gov.pagopa.fdr.service.conversion.message.FdrMessage;
import it.gov.pagopa.fdr.service.dto.*;
import it.gov.pagopa.fdr.service.flussiRendicontazione.FlussiRendicontazioneService;
import it.gov.pagopa.fdr.service.flussiRendicontazione.model.FlussiRendicontazione;
import it.gov.pagopa.fdr.service.flowTx.FlowTxService;
import it.gov.pagopa.fdr.service.flowTx.model.FlowTx;
import it.gov.pagopa.fdr.service.history.HistoryService;
import it.gov.pagopa.fdr.service.history.model.HistoryBlobBody;
import it.gov.pagopa.fdr.service.iuvRendicontati.IUVRendicontatiService;
import it.gov.pagopa.fdr.service.iuvRendicontati.model.IUVRendicontati;
import it.gov.pagopa.fdr.service.psps.mapper.PspsServiceServiceMapper;
import it.gov.pagopa.fdr.service.re.ReService;
import it.gov.pagopa.fdr.service.re.model.*;
import it.gov.pagopa.fdr.service.reportedIuv.ReportedIuvService;
import it.gov.pagopa.fdr.service.reportedIuv.model.ReportedIuv;
import it.gov.pagopa.fdr.util.AppDBUtil;
import it.gov.pagopa.fdr.util.AppMessageUtil;
import jakarta.enterprise.context.ApplicationScoped;
Expand All @@ -54,9 +54,9 @@ public class PspsService {

private final ReService reService;

private final FlussiRendicontazioneService flussiRendicontazioneService;
private final FlowTxService flowTxService;

private final IUVRendicontatiService iuvRendicontatiService;
private final ReportedIuvService reportedIuvService;

private final HistoryService historyService;

Expand All @@ -66,15 +66,15 @@ public PspsService(
ConversionService conversionQueue,
ReService reService,
HistoryService historyService,
FlussiRendicontazioneService flussiRendicontazioneService,
IUVRendicontatiService iuvRendicontatiService) {
FlowTxService flowTxService,
ReportedIuvService reportedIuvService) {
this.mapper = mapper;
this.log = log;
this.conversionQueue = conversionQueue;
this.reService = reService;
this.historyService = historyService;
this.flussiRendicontazioneService = flussiRendicontazioneService;
this.iuvRendicontatiService = iuvRendicontatiService;
this.flowTxService = flowTxService;
this.reportedIuvService = reportedIuvService;
}

@WithSpan(kind = SERVER)
Expand Down Expand Up @@ -378,10 +378,9 @@ public void publishByFdr(String action, String pspId, String fdr, boolean intern
.fdrAction(FdrActionEnum.PUBLISH)
.build());

flussiRendicontazioneService.sendEvent(
FlussiRendicontazione.builder().build()); // FIXME popolare campi
flowTxService.sendEvent(FlowTx.builder().build()); // FIXME popolare campi

iuvRendicontatiService.sendEvent(IUVRendicontati.builder().build()); // FIXME popolare campi
reportedIuvService.sendEvent(ReportedIuv.builder().build()); // FIXME popolare campi
}

@WithSpan(kind = SERVER)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package it.gov.pagopa.fdr.service.iuvRendicontati;
package it.gov.pagopa.fdr.service.reportedIuv;

import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
Expand All @@ -8,7 +8,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import it.gov.pagopa.fdr.exception.AppErrorCodeMessageEnum;
import it.gov.pagopa.fdr.exception.AppException;
import it.gov.pagopa.fdr.service.iuvRendicontati.model.IUVRendicontati;
import it.gov.pagopa.fdr.service.reportedIuv.model.ReportedIuv;
import jakarta.enterprise.context.ApplicationScoped;
import java.util.Arrays;
import java.util.List;
Expand All @@ -17,35 +17,35 @@
import org.jboss.logging.Logger;

@ApplicationScoped
public class IUVRendicontatiService {
public class ReportedIuvService {

private final Logger log;

@ConfigProperty(name = "ehub.iuvrendicontati.connect-str")
@ConfigProperty(name = "ehub.reportediuv.connect-str")
String eHubConnectStr;

@ConfigProperty(name = "ehub.iuvrendicontati.name")
@ConfigProperty(name = "ehub.reportediuv.name")
String eHubName;

private EventHubProducerClient producer;

private final ObjectMapper objectMapper;

public IUVRendicontatiService(Logger log, ObjectMapper objectMapper) {
public ReportedIuvService(Logger log, ObjectMapper objectMapper) {
this.log = log;
this.objectMapper = objectMapper;
}

public void init() {
log.infof("EventHub iuvrendicontati init. EventHub name [%s]", eHubName);
log.infof("EventHub reportediuv init. EventHub name [%s]", eHubName);

this.producer =
new EventHubClientBuilder()
.connectionString(eHubConnectStr, eHubName)
.buildProducerClient();
}

public final void sendEvent(IUVRendicontati... list) {
public final void sendEvent(ReportedIuv... list) {
if (this.producer == null) {
log.debugf("EventHub re [%s] NOT INITIALIZED", eHubName);
} else {
Expand All @@ -60,7 +60,7 @@ public final void sendEvent(IUVRendicontati... list) {
} catch (JsonProcessingException e) {
log.errorf("Producer SDK Azure RE event error", e);
throw new AppException(
AppErrorCodeMessageEnum.EVENT_HUB_IUVRENDICONTATI_PARSE_JSON);
AppErrorCodeMessageEnum.EVENT_HUB_REPORTEDIUV_PARSE_JSON);
}
})
.toList();
Expand All @@ -84,7 +84,7 @@ public void publishEvents(List<EventData> allEvents) {
// Try to add that event that couldn't fit before.
if (!eventDataBatch.tryAdd(eventData)) {
throw new AppException(
AppErrorCodeMessageEnum.EVENT_HUB_IUVRENDICONTATI_TOO_LARGE,
AppErrorCodeMessageEnum.EVENT_HUB_REPORTEDIUV_TOO_LARGE,
eventDataBatch.getMaxSizeInBytes());
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package it.gov.pagopa.fdr.service.iuvRendicontati.model;
package it.gov.pagopa.fdr.service.reportedIuv.model;

import com.fasterxml.jackson.annotation.JsonProperty;
import java.math.BigDecimal;
Expand All @@ -8,7 +8,7 @@

@Data
@Builder
public class IUVRendicontati {
public class ReportedIuv {

@JsonProperty("IUV")
private String identificativoUnivocoVersamento;
Expand Down
Loading

0 comments on commit 6e18b4c

Please sign in to comment.