Skip to content

Commit

Permalink
[SELC-5128] refactor: Adding pagination size and limit in Send Event …
Browse files Browse the repository at this point in the history
…Metrics on sc-users (#161)
  • Loading branch information
manuraf authored Jun 24, 2024
1 parent 5d512a2 commit ca16cc6
Show file tree
Hide file tree
Showing 11 changed files with 112 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import it.pagopa.selfcare.user.event.entity.UserInstitution;
import it.pagopa.selfcare.user.event.mapper.NotificationMapper;
import it.pagopa.selfcare.user.event.repository.UserInstitutionRepository;
import it.pagopa.selfcare.user.model.TrackEventInput;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import jakarta.ws.rs.WebApplicationException;
Expand Down Expand Up @@ -127,7 +128,7 @@ private void initOrderStream(Boolean sendEventsEnabled) {
this::consumerUserInstitutionRepositoryEvent,
failure -> {
log.error("Error during subscribe collection, exception: {} , message: {}", failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(failure.getClass().toString(), null,null), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
Quarkus.asyncExit();
});

Expand All @@ -136,7 +137,7 @@ private void initOrderStream(Boolean sendEventsEnabled) {
this::consumerToSendScUserEvent,
failure -> {
log.error("Error during subscribe collection, exception: {} , message: {}", failure.toString(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(failure.getClass().toString(), null,null), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(TrackEventInput.builder().exception(failure.getClass().toString()).build()), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
Quarkus.asyncExit();
});
}
Expand Down Expand Up @@ -165,11 +166,11 @@ protected void consumerUserInstitutionRepositoryEvent(ChangeStreamDocument<UserI
result -> {
log.info("UserInfo collection successfully updated from UserInstitution document having id: {}", userInstitutionId);
updateLastResumeToken(document.getResumeToken());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(userInstitutionId, userInstitutionChanged.getUserId(), null), Map.of(USER_INFO_UPDATE_SUCCESS, 1D));
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userInstitutionChanged, null)), Map.of(USER_INFO_UPDATE_SUCCESS, 1D));
},
failure -> {
log.error("Error during UserInfo collection updating, from UserInstitution document having id: {} , message: {}", userInstitutionId, failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(userInstitutionId, userInstitutionChanged.getUserId(), null), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userInstitutionChanged, null)), Map.of(USER_INFO_UPDATE_FAILURE, 1D));
});
}

Expand Down Expand Up @@ -198,23 +199,32 @@ public void consumerToSendScUserEvent(ChangeStreamDocument<UserInstitution> docu
.onItem().transformToUni(userResource -> Multi.createFrom().iterable(UserUtils.groupingProductAndReturnMinStateProduct(userInstitutionChanged.getProducts()))
.map(onboardedProduct -> notificationMapper.toUserNotificationToSend(userInstitutionChanged, onboardedProduct, userResource))
.onItem().transformToUniAndMerge(userNotificationToSend -> eventHubRestClient.sendMessage(userNotificationToSend)
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(userInstitutionChanged.getId().toHexString(), userInstitutionChanged.getUserId(), userNotificationToSend.getProductId()), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(userInstitutionChanged.getId().toHexString(), userInstitutionChanged.getUserId(), userNotificationToSend.getProductId()), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))))
.onItem().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userInstitutionChanged, userNotificationToSend.getProductId())), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS, 1D)))
.onFailure().invoke(() -> telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userInstitutionChanged, userNotificationToSend.getProductId())), Map.of(EVENTS_USER_INSTITUTION_PRODUCT_FAILURE, 1D))))
.toUni()
)
.subscribe().with(
result -> {
log.info("SendEvents successfully performed from UserInstitution document having id: {}", document.getDocumentKey().toJson());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(userInstitutionChanged.getId().toHexString(), userInstitutionChanged.getUserId(), null), Map.of(EVENTS_USER_INSTITUTION_SUCCESS, 1D));
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userInstitutionChanged, null)), Map.of(EVENTS_USER_INSTITUTION_SUCCESS, 1D));
},
failure -> {
log.error("Error during SendEvents from UserInstitution document having id: {} , message: {}", document.getDocumentKey().toJson(), failure.getMessage());
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(userInstitutionChanged.getId().toHexString(), userInstitutionChanged.getUserId(), null), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
telemetryClient.trackEvent(EVENT_USER_CDC_NAME, mapPropsForTrackEvent(toTrackEventInput(userInstitutionChanged, null)), Map.of(EVENTS_USER_INSTITUTION_FAILURE, 1D));
});
}

private boolean checkIfIsRetryableException(Throwable throwable) {
return throwable instanceof TimeoutException ||
(throwable instanceof WebApplicationException webApplicationException && webApplicationException.getResponse().getStatus() == 429);
}

private TrackEventInput toTrackEventInput(UserInstitution userInstitution, String productId) {
return TrackEventInput.builder()
.documentKey(userInstitution.getInstitutionId())
.userId(userInstitution.getUserId())
.institutionId(userInstitution.getInstitutionId())
.productId(productId)
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.bson.types.ObjectId;
import org.eclipse.microprofile.config.inject.ConfigProperty;

import java.time.LocalDateTime;
import java.util.HashMap;
Expand All @@ -39,7 +40,9 @@
@RequiredArgsConstructor
public class UserInstitutionServiceDefault implements UserInstitutionService {

public static final Page PAGE_SIZE_FIND_USER_INSTITUTION = Page.ofSize(1000);
@ConfigProperty(name = "user-ms.eventhub.users.page-size")
Integer pageSizeFindUserInstitutions;

private final UserInstitutionMapper userInstitutionMapper;
private final QueryUtils queryUtils;
private final UserUtils userUtils;
Expand Down Expand Up @@ -168,13 +171,13 @@ public Multi<UserInstitution> findUserInstitutionsAfterDateWithFilter(Map<String
@Override
public Multi<UserInstitution> findUserInstitutionsAfterDateWithFilter(Map<String, Object> queryParameter, LocalDateTime fromDate, Integer page) {
Document query = queryUtils.buildQueryDocumentByDate(queryParameter, USER_INSTITUTION_COLLECTION, fromDate);
return runUserInstitutionFindQuery(query, null).page(PAGE_SIZE_FIND_USER_INSTITUTION.index(page)).stream();
return runUserInstitutionFindQuery(query, null).page(Page.ofSize(pageSizeFindUserInstitutions).index(page)).stream();
}

@Override
public Uni<Integer> pageCountUserInstitutionsAfterDateWithFilter(Map<String, Object> queryParameter, LocalDateTime fromDate) {
Document query = queryUtils.buildQueryDocumentByDate(queryParameter, USER_INSTITUTION_COLLECTION, fromDate);
return runUserInstitutionFindQuery(query, null).page(PAGE_SIZE_FIND_USER_INSTITUTION).pageCount();
return runUserInstitutionFindQuery(query, null).page(Page.ofSize(pageSizeFindUserInstitutions)).pageCount();
}

private Uni<Long> updateUserStatusDao(Map<String, Object> filterMap, OnboardedProductState status) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
import it.pagopa.selfcare.user.conf.CloudTemplateLoader;
import it.pagopa.selfcare.user.entity.UserInstitution;
import it.pagopa.selfcare.user.exception.InvalidRequestException;
import it.pagopa.selfcare.user.model.LoggedUser;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.UserNotificationToSend;
import it.pagopa.selfcare.user.model.*;
import it.pagopa.selfcare.user.model.constants.OnboardedProductState;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
Expand Down Expand Up @@ -63,7 +61,6 @@ public UserNotificationServiceImpl(Configuration freemarkerConfig, CloudTemplate
public Uni<UserNotificationToSend> sendKafkaNotification(UserNotificationToSend userNotificationToSend) {
return eventHubUsersEnabled
? eventHubRestClient.sendMessage(userNotificationToSend)
.onItem().invoke(() -> log.info("sent dataLake notification for id : {}", userNotificationToSend.getId()))
.onItem().invoke(trackTelemetryEvent(userNotificationToSend, EVENTS_USER_INSTITUTION_PRODUCT_SUCCESS))
.onFailure().invoke(throwable -> log.warn("error during send dataLake notification for id {}: {} ", userNotificationToSend.getId(), throwable.getMessage(), throwable))
.onFailure().invoke(trackTelemetryEvent(userNotificationToSend, EVENTS_USER_INSTITUTION_PRODUCT_FAILURE))
Expand All @@ -72,7 +69,13 @@ public Uni<UserNotificationToSend> sendKafkaNotification(UserNotificationToSend
}

private Runnable trackTelemetryEvent(UserNotificationToSend userNotificationToSend, String metricsName) {
return () -> telemetryClient.trackEvent(EVENT_USER_MS_NAME, mapPropsForTrackEvent(userNotificationToSend.getId(), null, userNotificationToSend.getProductId()), Map.of(metricsName, 1D));
TrackEventInput trackEventInput = TrackEventInput.builder()
.documentKey(userNotificationToSend.getInstitutionId())
.userId(Optional.ofNullable(userNotificationToSend.getUser()).map(UserToNotify::getUserId).orElse(null))
.institutionId(userNotificationToSend.getInstitutionId())
.productId(userNotificationToSend.getProductId())
.build();
return () -> telemetryClient.trackEvent(EVENT_USER_MS_NAME, mapPropsForTrackEvent(trackEventInput), Map.of(metricsName, 1D));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import it.pagopa.selfcare.user.mapper.UserMapper;
import it.pagopa.selfcare.user.model.LoggedUser;
import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.TrackEventInput;
import it.pagopa.selfcare.user.model.UserNotificationToSend;
import it.pagopa.selfcare.user.model.constants.OnboardedProductState;
import it.pagopa.selfcare.user.model.constants.QueueEvent;
Expand All @@ -40,6 +41,7 @@
import org.owasp.encoder.Encode;
import software.amazon.awssdk.utils.CollectionUtils;

import java.time.Duration;
import java.time.LocalDateTime;
import java.util.*;
import java.util.function.Function;
Expand Down Expand Up @@ -545,7 +547,8 @@ public Uni<Void> sendEventsByDateAndUserIdAndInstitutionId(LocalDateTime fromDat
.onItem().transformToUni(pageCount -> pageCount > 0
? Uni.combine().all().unis(
IntStream.range(0, pageCount).boxed()
.map(index -> sendEventsByDateAndUserIdAndInstitutionId(fromDate,institutionId,userId,index))
.map(index -> sendEventsByDateAndUserIdAndInstitutionId(fromDate,institutionId,userId,index)
.onItem().delayIt().by(Duration.ofSeconds(5)))
.toList())
.usingConcurrencyOf(eventhubUsersConcurrencyLevel)
.discardItems()
Expand All @@ -559,22 +562,27 @@ public Uni<Void> sendEventsByDateAndUserIdAndInstitutionId(LocalDateTime fromDat
.onItem().transformToUni(userInstitution -> {
String userIdToUse = userId != null ? userId : userInstitution.getUserId();
Uni<UserResource> userResourceUni = userRegistryService.findByIdUsingGET(USERS_FIELD_LIST_WITHOUT_FISCAL_CODE, userIdToUse);
TrackEventInput trackEventInput = TrackEventInput.builder()
.documentKey(userInstitution.getInstitutionId())
.userId(userIdToUse)
.institutionId(userInstitution.getInstitutionId())
.build();

return userResourceUni
.onItem().transformToUni(userResource -> buildAndSendKafkaNotifications(userInstitution, userResource)
.collect().asList()
.replaceWithVoid())
.onItem().invoke(trackTelemetryEvent(userInstitution, userIdToUse, EVENTS_USER_INSTITUTION_SUCCESS))
.onItem().invoke(() -> trackTelemetryEvent(trackEventInput, EVENTS_USER_INSTITUTION_SUCCESS))
.onFailure().invoke(exception -> log.error("Failed to retrieve UserResource userId:{}", userIdToUse, exception))
.onFailure().invoke(trackTelemetryEvent(userInstitution, userIdToUse, EVENTS_USER_INSTITUTION_FAILURE))
.onFailure().invoke(exception -> trackTelemetryEvent(trackEventInput.toBuilder().exception(exception.getMessage()).build(), EVENTS_USER_INSTITUTION_FAILURE))
.onFailure().recoverWithNull();
})
.merge().toUni()
.onFailure().invoke(exception -> log.error("Failed to send Events for page: {}, message: {}", page, exception.getMessage()));
}

private Runnable trackTelemetryEvent(UserInstitution userInstitution, String userIdToUse, String metricsName) {
return () -> telemetryClient.trackEvent(EVENT_USER_MS_NAME, mapPropsForTrackEvent(userInstitution.getId().toHexString(), userIdToUse, null), Map.of(metricsName, 1D));
private void trackTelemetryEvent(TrackEventInput trackEventInput, String metricsName) {
telemetryClient.trackEvent(EVENT_USER_MS_NAME, mapPropsForTrackEvent(trackEventInput), Map.of(metricsName, 1D));
}

private void applyFiltersToRemoveProducts(UserInstitution userInstitution, List<String> states, List<String> products, List<String> roles, List<String> productRoles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,11 @@ private List<Bson> constructBsonWithDateFilter(Map<String, Object> parameters, S
bsonList.addAll(addEqAndInFilters(parameters));
}
if (fromDate != null) {
bsonList.add(Filters.elemMatch("products", Filters.gt("createdAt", fromDate)));
bsonList.add(
Filters.or(
Filters.elemMatch("products", Filters.gt("createdAt", fromDate)),
Filters.elemMatch("products", Filters.gt("updatedAt", fromDate))
));
}

return bsonList;
Expand Down
3 changes: 2 additions & 1 deletion apps/user-ms/src/main/resources/application.properties
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ quarkus.rest-client.event-hub.url=${EVENT_HUB_BASE_PATH:test}
eventhub.rest-client.keyName=${SHARED_ACCESS_KEY_NAME:test}
eventhub.rest-client.key=${EVENTHUB-SC-USERS-SELFCARE-WO-KEY-LC:test}
user-ms.eventhub.users.enabled=${USER_MS_EVENTHUB_USERS_ENABLED:false}
user-ms.eventhub.users.concurrency-level=${USER_MS_EVENTHUB_USERS_CONCURRENCY_LEVEL:5}
user-ms.eventhub.users.concurrency-level=${USER_MS_EVENTHUB_USERS_CONCURRENCY_LEVEL:1}
user-ms.eventhub.users.page-size=${USER_MS_EVENTHUB_USERS_PAGE_SIZE:50}

quarkus.log.level=INFO
quarkus.http.limits.max-form-attribute-size=4096
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ void sendEventsByDateAndUserIdAndInstitutionId(){
userService
.sendEventsByDateAndUserIdAndInstitutionId(fromDate, institutionId, userId)
.subscribe()
.withSubscriber(UniAssertSubscriber.create()).assertCompleted();
.withSubscriber(UniAssertSubscriber.create()).awaitItem();

// Verify the result
verify(userInstitutionService, times(1))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package it.pagopa.selfcare.user;

import it.pagopa.selfcare.user.model.OnboardedProduct;
import it.pagopa.selfcare.user.model.TrackEventInput;
import it.pagopa.selfcare.user.model.constants.OnboardedProductState;

import java.util.*;
Expand Down Expand Up @@ -54,11 +55,13 @@ public static Collection<OnboardedProduct> groupingProductAndReturnMinStateProdu
return onboardedProductMap.values();
}

public static Map<String, String> mapPropsForTrackEvent(String documentKey, String userId, String productId) {
public static Map<String, String> mapPropsForTrackEvent(TrackEventInput trackEventInput) {
Map<String, String> propertiesMap = new HashMap<>();
Optional.ofNullable(documentKey).ifPresent(value -> propertiesMap.put("documentKey", value));
Optional.ofNullable(userId).ifPresent(value -> propertiesMap.put("userId", value));
Optional.ofNullable(productId).ifPresent(value -> propertiesMap.put("productId", value));
Optional.ofNullable(trackEventInput.getDocumentKey()).ifPresent(value -> propertiesMap.put("documentKey", value));
Optional.ofNullable(trackEventInput.getUserId()).ifPresent(value -> propertiesMap.put("userId", value));
Optional.ofNullable(trackEventInput.getProductId()).ifPresent(value -> propertiesMap.put("productId", value));
Optional.ofNullable(trackEventInput.getInstitutionId()).ifPresent(value -> propertiesMap.put("institutionId", value));
Optional.ofNullable(trackEventInput.getException()).ifPresent(value -> propertiesMap.put("exec", value));
return propertiesMap;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package it.pagopa.selfcare.user.model;


import lombok.Builder;
import lombok.Data;

@Data
@Builder(toBuilder = true)
public class TrackEventInput {

private String documentKey;
private String userId;
private String productId;

private String institutionId;

private String exception;
}
Loading

0 comments on commit ca16cc6

Please sign in to comment.