Add metrics to Kafka Source #3118

Merged · 11 commits · Aug 10, 2023
@@ -23,7 +23,6 @@ public class KafkaTopicMetrics {
     static final String NUMBER_OF_DESERIALIZATION_ERRORS = "numberOfDeserializationErrors";
     static final String NUMBER_OF_BUFFER_SIZE_OVERFLOWS = "numberOfBufferSizeOverflows";
     static final String NUMBER_OF_POLL_AUTH_ERRORS = "numberOfPollAuthErrors";
-    static final String NUMBER_OF_NON_CONSUMERS = "numberOfNonConsumers";
     static final String NUMBER_OF_RECORDS_COMMITTED = "numberOfRecordsCommitted";
     static final String NUMBER_OF_RECORDS_CONSUMED = "numberOfRecordsConsumed";
     static final String NUMBER_OF_BYTES_CONSUMED = "numberOfBytesConsumed";
@@ -42,7 +41,6 @@ public class KafkaTopicMetrics {
     private final Counter numberOfRecordsCommitted;
     private final Counter numberOfRecordsConsumed;
     private final Counter numberOfBytesConsumed;
-    private final Counter numberOfNonConsumers;

     public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics) {
         this.pluginMetrics = pluginMetrics;
@@ -57,7 +55,6 @@ public KafkaTopicMetrics(final String topicName, final PluginMetrics pluginMetrics) {
         this.numberOfDeserializationErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_DESERIALIZATION_ERRORS));
         this.numberOfBufferSizeOverflows = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_BUFFER_SIZE_OVERFLOWS));
         this.numberOfPollAuthErrors = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POLL_AUTH_ERRORS));
-        this.numberOfNonConsumers = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_NON_CONSUMERS));
         this.numberOfPositiveAcknowledgements = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_POSITIVE_ACKNOWLEDGEMENTS));
         this.numberOfNegativeAcknowledgements = pluginMetrics.counter(getTopicMetricName(NUMBER_OF_NEGATIVE_ACKNOWLEDGEMENTS));
     }
@@ -74,34 +71,40 @@ private void initializeMetricNamesMap() {
         metricsNameMap.put("join-rate", "joinRate");
         metricsNameMap.put("incoming-byte-rate", "incomingByteRate");
         metricsNameMap.put("outgoing-byte-rate", "outgoingByteRate");
+        metricsNameMap.put("assigned-partitions", "numberOfNonConsumers");
         metricsNameMap.forEach((metricName, camelCaseName) -> {
-            if (!metricName.contains("-total")) {
+            if (metricName.equals("records-lag-max")) {
                 pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> {
Member
This code should be unit tested.
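No unit test appears in this view of the diff; the following is a minimal sketch of the kind of test the comment asks for. It assumes JUnit 5 and Mockito, that the constructor registers the gauges, that PluginMetrics.gauge mirrors Micrometer's (String, T, ToDoubleFunction<T>) signature, and that records-lag-max maps to recordsLagMax as the other map entries suggest; the class and test names are made up:

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import io.micrometer.core.instrument.Counter;
import java.util.Map;
import java.util.function.ToDoubleFunction;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.opensearch.dataprepper.metrics.PluginMetrics;

class KafkaTopicMetricsTest {
    @Test
    @SuppressWarnings({"unchecked", "rawtypes"})
    void recordsLagMaxGaugeReportsMaxAcrossConsumers() {
        final PluginMetrics pluginMetrics = mock(PluginMetrics.class);
        // The constructor also registers plain counters; stub those out.
        when(pluginMetrics.counter(anyString())).thenReturn(mock(Counter.class));

        new KafkaTopicMetrics("test-topic", pluginMetrics);

        // Capture the aggregation function registered for the records-lag-max gauge.
        final ArgumentCaptor<ToDoubleFunction<Map<KafkaConsumer, Map<String, Double>>>> captor =
                ArgumentCaptor.forClass((Class) ToDoubleFunction.class);
        verify(pluginMetrics, atLeastOnce())
                .gauge(eq("topic.test-topic.recordsLagMax"), any(), captor.capture());

        // Two consumers with different lags; the gauge should report the larger value.
        final Map<KafkaConsumer, Map<String, Double>> values = Map.of(
                mock(KafkaConsumer.class), Map.of("records-lag-max", 5.0),
                mock(KafkaConsumer.class), Map.of("records-lag-max", 9.0));

        assertEquals(9.0, captor.getValue().applyAsDouble(values));
    }
}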

Member
Also, it might make sense to invert the if-else with the gauge to clarify the behavior some. The condition is really on the metric function, not inside it.

Something like:

if (metricName.equals("records-lag-max")) {
    pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> {
        double max = 0.0;
        for (Map.Entry<KafkaConsumer, Map<String, Double>> entry : metricValues.entrySet()) {
            if (entry.getValue().get(metricName) > max) {
                max = entry.getValue().get(metricName);
            }
        }
        return max;
    });
} else if (...) {
    pluginMetrics.gauge(...);
}
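The merged changes below follow this shape: each metric-name branch registers its own gauge, so the aggregation applied (max, min, or sum) is fixed once at registration time rather than re-tested inside a single shared lambda.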

-                    synchronized(metricValues) {
-                        if (metricName.equals("records-lag-max")) {
-                            double max = 0.0;
-                            for (Map.Entry<KafkaConsumer, Map<String, Double>> entry : metricValues.entrySet()) {
-                                if (entry.getValue().get(metricName) > max) {
-                                    max = entry.getValue().get(metricName);
-                                }
-                            }
-                            return max;
-                        } else if (metricName.equals("records-lead-min")) {
-                            double min = Double.MAX_VALUE;
-                            for (Map.Entry<KafkaConsumer, Map<String, Double>> entry : metricValues.entrySet()) {
-                                if (entry.getValue().get(metricName) < min) {
-                                    min = entry.getValue().get(metricName);
-                                }
-                            }
-                            return min;
-                        } else {
-                            double sum = 0;
-                            for (Map.Entry<KafkaConsumer, Map<String, Double>> entry : metricValues.entrySet()) {
-                                sum += entry.getValue().get(metricName);
-                            }
-                            return sum;
+                    double max = 0.0;
+                    for (Map.Entry<KafkaConsumer, Map<String, Double>> entry : metricValues.entrySet()) {
+                        Map<String, Double> consumerMetrics = entry.getValue();
+                        synchronized(consumerMetrics) {
+                            max = Math.max(max, consumerMetrics.get(metricName));
+                        }
+                    }
+                    return max;
                 });
+            } else if (metricName.equals("records-lead-min")) {
+                pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> {
+                    double min = Double.MAX_VALUE;
+                    for (Map.Entry<KafkaConsumer, Map<String, Double>> entry : metricValues.entrySet()) {
+                        Map<String, Double> consumerMetrics = entry.getValue();
+                        synchronized(consumerMetrics) {
+                            min = Math.min(min, consumerMetrics.get(metricName));
+                        }
+                    }
+                    return min;
+                });
+            } else if (!metricName.contains("-total")) {
+                pluginMetrics.gauge(getTopicMetricName(camelCaseName), metricValues, metricValues -> {
+                    double sum = 0;
+                    for (Map.Entry<KafkaConsumer, Map<String, Double>> entry : metricValues.entrySet()) {
+                        Map<String, Double> consumerMetrics = entry.getValue();
+                        synchronized(consumerMetrics) {
+                            sum += consumerMetrics.get(metricName);
+                        }
+                    }
+                    return sum;
+                });
             }
         });
@@ -115,12 +118,16 @@ public void register(final KafkaConsumer consumer) {
         });
     }

-    public Counter getNumberOfRecordsCommitted() {
-        return numberOfRecordsCommitted;
+    Counter getNumberOfRecordsConsumed() {
+        return numberOfRecordsConsumed;
     }

-    public Counter getNumberOfNonConsumers() {
-        return numberOfNonConsumers;
+    Counter getNumberOfBytesConsumed() {
+        return numberOfBytesConsumed;
     }

+    public Counter getNumberOfRecordsCommitted() {
+        return numberOfRecordsCommitted;
+    }

     public Counter getNumberOfPollAuthErrors() {
@@ -147,7 +154,7 @@ public Counter getNumberOfPositiveAcknowledgements() {
         return numberOfPositiveAcknowledgements;
     }

-    public String getTopicMetricName(final String metricName) {
+    private String getTopicMetricName(final String metricName) {
         return "topic."+topicName+"."+metricName;
     }

@@ -159,36 +166,47 @@ private String getCamelCaseName(final String name) {
         return camelCaseName;
     }

+    Map<KafkaConsumer, Map<String, Double>> getMetricValues() {
+        return metricValues;
+    }

     public void update(final KafkaConsumer consumer) {
-        Map<MetricName, ? extends Metric> metrics = consumer.metrics();
-        Map<String, Double> consumerMetrics = metricValues.get(consumer);
-
+        synchronized(metricValues) {
+            Map<String, Double> consumerMetrics = metricValues.get(consumer);
-        for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
-            MetricName metric = entry.getKey();
-            Metric value = entry.getValue();
-            String metricName = metric.name();
-            if (Objects.nonNull(metricsNameMap.get(metricName))) {
-                if (metric.tags().containsKey("partition") &&
-                        (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) {
-                    continue;
-                }
+            Map<MetricName, ? extends Metric> metrics = consumer.metrics();
+            for (Map.Entry<MetricName, ? extends Metric> entry : metrics.entrySet()) {
+                MetricName metric = entry.getKey();
+                Metric value = entry.getValue();
+                String metricName = metric.name();
+                if (Objects.nonNull(metricsNameMap.get(metricName))) {
+                    if (metric.tags().containsKey("partition") &&
+                            (metricName.equals("records-lag-max") || metricName.equals("records-lead-min"))) {
+                        continue;
+                    }
-                if (metricName.contains("consumed-total") && !metric.tags().containsKey("topic")) {
-                    continue;
-                }
-                if (metricName.contains("byte-rate") && metric.tags().containsKey("node-id")) {
-                    continue;
-                }
-                double newValue = (Double)value.metricValue();
-                if (metricName.equals("records-consumed-total")) {
+                    if (metricName.contains("consumed-total") && !metric.tags().containsKey("topic")) {
+                        continue;
+                    }
+                    if (metricName.contains("byte-rate") && metric.tags().containsKey("node-id")) {
+                        continue;
+                    }
+                    double newValue = (Double)value.metricValue();
+                    if (metricName.equals("records-consumed-total")) {
+                        synchronized(consumerMetrics) {
+                            double prevValue = consumerMetrics.get(metricName);
+                            numberOfRecordsConsumed.increment(newValue - prevValue);
Member
Rather than tracking the previous value, do you think we could use the metric's built-in time feature?

https://kafka.apache.org/27/javadoc/org/apache/kafka/common/metrics/Measurable.html#measure-org.apache.kafka.common.metrics.MetricConfig-long-

Thus:

previousValue = ???.measure(???, lastTime);

It's not critical, but might be something to consider. Perhaps for a follow-on.

Collaborator Author
Will do this later.
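For reference, a rough sketch of the direction this suggestion points in, not code from this PR. It assumes the Metric is a KafkaMetric backed by a Measurable (KafkaMetric.measurable() throws otherwise) and a hypothetical lastUpdateTimeMs field recording when update() last ran; whether measuring at an earlier timestamp yields a meaningful historical value depends on the underlying metric implementation:

import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Measurable;

// Hypothetical alternative to caching prevValue in consumerMetrics:
// ask Kafka to measure the metric as of the previous update time.
if (value instanceof KafkaMetric) {
    final KafkaMetric kafkaMetric = (KafkaMetric) value;
    final Measurable measurable = kafkaMetric.measurable(); // throws if not Measurable
    final double previousValue = measurable.measure(kafkaMetric.config(), lastUpdateTimeMs);
    numberOfRecordsConsumed.increment(newValue - previousValue);
}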

-                } else if (metricName.equals("bytes-consumed-total")) {
+                        }
+                    } else if (metricName.equals("bytes-consumed-total")) {
+                        synchronized(consumerMetrics) {
+                            double prevValue = consumerMetrics.get(metricName);
+                            numberOfBytesConsumed.increment(newValue - prevValue);
+                        }
+                    }
+                    // Keep a count of consumers without any assigned partitions. This value
+                    // can go up or down, so it is exposed as a Gauge rather than a Counter.
+                    if (metricName.equals("assigned-partitions")) {
+                        newValue = (newValue == 0.0) ? 1.0 : 0.0;
+                    }
+                    synchronized(consumerMetrics) {
+                        consumerMetrics.put(metricName, newValue);
+                    }
+                }
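Two details of the update() path are worth spelling out. The records-consumed-total and bytes-consumed-total metrics are cumulative per consumer, so the counters are fed deltas: if the cached records-consumed-total for a consumer is 1000 and the next poll reports 1150, numberOfRecordsConsumed is incremented by 150 and the cached value is replaced. The assigned-partitions value is inverted (0.0 becomes 1.0, anything else becomes 0.0) so that the summed numberOfNonConsumers gauge reports how many registered consumers currently hold no partitions; that count can rise and fall, which is why it is a gauge rather than a counter.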