Skip to content

Commit

Permalink
Merge pull request #870: [proxima-direct-ingest-server] add latency m…
Browse files Browse the repository at this point in the history
…etrics to IngestService
  • Loading branch information
je-ik authored Jan 23, 2024
2 parents 220cb84 + 525c708 commit 8150cd8
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ static boolean ingestRequest(
String uuid,
Consumer<Rpc.Status> responseConsumer) {

long start = System.currentTimeMillis();
AttributeDescriptor<?> attributeDesc = ingest.getAttributeDescriptor();

OnlineAttributeWriter writer = getWriterForAttributeInTransform(direct, attributeDesc);
Expand All @@ -156,11 +157,12 @@ static boolean ingestRequest(
}

Metrics.COMMIT_LOG_APPEND.increment();
// write the ingest into the commit log and confirm to the client
// write the element into the commit log and confirm to the client
log.debug("Writing {} to commit log {}", ingest, writer.getUri());
writer.write(
ingest,
(s, exc) -> {
Metrics.INGEST_LATENCY.increment(System.currentTimeMillis() - start);
if (s) {
responseConsumer.accept(ok(uuid));
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,7 @@ private StreamElement validateAndConvertToStreamElement(
if (Strings.isNullOrEmpty(request.getKey())
|| Strings.isNullOrEmpty(request.getEntity())
|| Strings.isNullOrEmpty(request.getAttribute())) {

consumer.accept(status(request.getUuid(), 400, "Missing required fields in input message"));
return null;
}
Expand Down Expand Up @@ -398,7 +399,6 @@ private StreamElement validateAndConvertToStreamElement(

@Override
public void ingest(Ingest request, StreamObserver<Status> responseObserver) {

Metrics.INGEST_SINGLE.increment();
processSingleIngest(
request,
Expand All @@ -410,17 +410,14 @@ public void ingest(Ingest request, StreamObserver<Status> responseObserver) {

@Override
public StreamObserver<Ingest> ingestSingle(StreamObserver<Status> responseObserver) {

return new IngestObserver(responseObserver);
}

@Override
public StreamObserver<IngestBulk> ingestBulk(StreamObserver<StatusBulk> responseObserver) {

// the responseObserver doesn't have to be synchronized in this
// case, because the communication with the observer is done
// in single flush thread

return new IngestBulkObserver(responseObserver);
}

Expand All @@ -431,13 +428,17 @@ public void commit(

log.debug("Committing transaction {}", request.getTransactionId());
try {
long start = System.currentTimeMillis();
transactionContext
.get(request.getTransactionId())
.commit(
(succ, exc) -> {
Metrics.INGEST_LATENCY.increment(System.currentTimeMillis() - start);
if (exc != null) {
log.warn(
"Error during committing transaction {}", request.getTransactionId(), exc);
} else {
log.info("Transaction {} committed", request.getTransactionId());
}
responseObserver.onNext(
TransactionCommitResponse.newBuilder()
Expand All @@ -448,7 +449,6 @@ public void commit(
.build());
responseObserver.onCompleted();
});
log.info("Transaction {} committed", request.getTransactionId());
} catch (TransactionRejectedException e) {
log.info("Transaction {} rejected.", request.getTransactionId());
responseObserver.onNext(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public class Metrics {

public static final GaugeMetric LIVENESS = FACTORY.gauge(GROUP, "liveness");

public static final ApproxPercentileMetric INGEST_LATENCY =
FACTORY.percentile(GROUP, "ingest-latency", 60_000, 1_000);

private static final Map<String, Pair<Boolean, GaugeMetric>> consumerMetrics =
Collections.synchronizedMap(new HashMap<>());

Expand Down

0 comments on commit 8150cd8

Please sign in to comment.