From 525c7087f3a975b557b737c016180f2a6207c4ff Mon Sep 17 00:00:00 2001 From: Jan Lukavsky Date: Tue, 23 Jan 2024 09:21:44 +0100 Subject: [PATCH] [proxima-direct-ingest-server] add latency metrics to IngestService --- .../java/cz/o2/proxima/direct/server/IngestServer.java | 4 +++- .../cz/o2/proxima/direct/server/IngestService.java | 10 +++++----- .../cz/o2/proxima/direct/server/metrics/Metrics.java | 3 +++ 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/IngestServer.java b/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/IngestServer.java index 115f22330..aafecf607 100644 --- a/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/IngestServer.java +++ b/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/IngestServer.java @@ -134,6 +134,7 @@ static boolean ingestRequest( String uuid, Consumer responseConsumer) { + long start = System.currentTimeMillis(); AttributeDescriptor attributeDesc = ingest.getAttributeDescriptor(); OnlineAttributeWriter writer = getWriterForAttributeInTransform(direct, attributeDesc); @@ -156,11 +157,12 @@ static boolean ingestRequest( } Metrics.COMMIT_LOG_APPEND.increment(); - // write the ingest into the commit log and confirm to the client + // write the element into the commit log and confirm to the client log.debug("Writing {} to commit log {}", ingest, writer.getUri()); writer.write( ingest, (s, exc) -> { + Metrics.INGEST_LATENCY.increment(System.currentTimeMillis() - start); if (s) { responseConsumer.accept(ok(uuid)); } else { diff --git a/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/IngestService.java b/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/IngestService.java index f43470a2f..509ec62d6 100644 --- a/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/IngestService.java +++ b/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/IngestService.java @@ -357,6 +357,7 @@ private StreamElement validateAndConvertToStreamElement( if (Strings.isNullOrEmpty(request.getKey()) || Strings.isNullOrEmpty(request.getEntity()) || Strings.isNullOrEmpty(request.getAttribute())) { + consumer.accept(status(request.getUuid(), 400, "Missing required fields in input message")); return null; } @@ -398,7 +399,6 @@ private StreamElement validateAndConvertToStreamElement( @Override public void ingest(Ingest request, StreamObserver responseObserver) { - Metrics.INGEST_SINGLE.increment(); processSingleIngest( request, @@ -410,17 +410,14 @@ public void ingest(Ingest request, StreamObserver responseObserver) { @Override public StreamObserver ingestSingle(StreamObserver responseObserver) { - return new IngestObserver(responseObserver); } @Override public StreamObserver ingestBulk(StreamObserver responseObserver) { - // the responseObserver doesn't have to be synchronized in this // case, because the communication with the observer is done // in single flush thread - return new IngestBulkObserver(responseObserver); } @@ -431,13 +428,17 @@ public void commit( log.debug("Committing transaction {}", request.getTransactionId()); try { + long start = System.currentTimeMillis(); transactionContext .get(request.getTransactionId()) .commit( (succ, exc) -> { + Metrics.INGEST_LATENCY.increment(System.currentTimeMillis() - start); if (exc != null) { log.warn( "Error during committing transaction {}", request.getTransactionId(), exc); + } else { + log.info("Transaction {} committed", request.getTransactionId()); } responseObserver.onNext( TransactionCommitResponse.newBuilder() @@ -448,7 +449,6 @@ public void commit( .build()); responseObserver.onCompleted(); }); - log.info("Transaction {} committed", request.getTransactionId()); } catch (TransactionRejectedException e) { log.info("Transaction {} rejected.", request.getTransactionId()); responseObserver.onNext( diff --git a/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/metrics/Metrics.java b/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/metrics/Metrics.java index 13db1171d..0b8451f06 100644 --- a/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/metrics/Metrics.java +++ b/direct/ingest-server/src/main/java/cz/o2/proxima/direct/server/metrics/Metrics.java @@ -85,6 +85,9 @@ public class Metrics { public static final GaugeMetric LIVENESS = FACTORY.gauge(GROUP, "liveness"); + public static final ApproxPercentileMetric INGEST_LATENCY = + FACTORY.percentile(GROUP, "ingest-latency", 60_000, 1_000); + private static final Map> consumerMetrics = Collections.synchronizedMap(new HashMap<>());