Skip to content

Commit

Permalink
refactor: expose s3 metric names for doc generation
Browse files Browse the repository at this point in the history
  • Loading branch information
jeqo committed Oct 22, 2024
1 parent 34bac1b commit ef05c49
Show file tree
Hide file tree
Showing 2 changed files with 587 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Map;
import java.util.stream.Collectors;

import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.metrics.JmxReporter;
import org.apache.kafka.common.metrics.KafkaMetricsContext;
import org.apache.kafka.common.metrics.MetricConfig;
Expand All @@ -39,64 +40,240 @@
import software.amazon.awssdk.metrics.MetricCollection;
import software.amazon.awssdk.metrics.MetricPublisher;

import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.ABORT_MULTIPART_UPLOAD_REQUESTS;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.ABORT_MULTIPART_UPLOAD_REQUESTS_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.ABORT_MULTIPART_UPLOAD_REQUESTS_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.ABORT_MULTIPART_UPLOAD_TIME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.ABORT_MULTIPART_UPLOAD_TIME_AVG_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.ABORT_MULTIPART_UPLOAD_TIME_MAX_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.COMPLETE_MULTIPART_UPLOAD_REQUESTS;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.COMPLETE_MULTIPART_UPLOAD_REQUESTS_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.COMPLETE_MULTIPART_UPLOAD_REQUESTS_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.COMPLETE_MULTIPART_UPLOAD_TIME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.COMPLETE_MULTIPART_UPLOAD_TIME_AVG_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.COMPLETE_MULTIPART_UPLOAD_TIME_MAX_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.CONFIGURED_TIMEOUT_ERRORS;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.CONFIGURED_TIMEOUT_ERRORS_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.CONFIGURED_TIMEOUT_ERRORS_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.CREATE_MULTIPART_UPLOAD_REQUESTS;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.CREATE_MULTIPART_UPLOAD_REQUESTS_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.CREATE_MULTIPART_UPLOAD_REQUESTS_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.CREATE_MULTIPART_UPLOAD_TIME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.CREATE_MULTIPART_UPLOAD_TIME_AVG_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.CREATE_MULTIPART_UPLOAD_TIME_MAX_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.DELETE_OBJECTS_REQUESTS;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.DELETE_OBJECTS_REQUESTS_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.DELETE_OBJECTS_REQUESTS_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.DELETE_OBJECTS_TIME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.DELETE_OBJECTS_TIME_AVG_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.DELETE_OBJECTS_TIME_MAX_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.DELETE_OBJECT_REQUESTS;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.DELETE_OBJECT_REQUESTS_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.DELETE_OBJECT_REQUESTS_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.DELETE_OBJECT_TIME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.DELETE_OBJECT_TIME_AVG_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.DELETE_OBJECT_TIME_MAX_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.GET_OBJECT_REQUESTS;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.GET_OBJECT_REQUESTS_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.GET_OBJECT_REQUESTS_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.GET_OBJECT_TIME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.GET_OBJECT_TIME_AVG_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.GET_OBJECT_TIME_MAX_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.IO_ERRORS;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.IO_ERRORS_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.IO_ERRORS_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.METRIC_CONTEXT;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.OTHER_ERRORS;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.OTHER_ERRORS_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.OTHER_ERRORS_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.PUT_OBJECT_REQUESTS;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.PUT_OBJECT_REQUESTS_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.PUT_OBJECT_REQUESTS_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.PUT_OBJECT_TIME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.PUT_OBJECT_TIME_AVG_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.PUT_OBJECT_TIME_MAX_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.SERVER_ERRORS;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.SERVER_ERRORS_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.SERVER_ERRORS_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.THROTTLING_ERRORS;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.THROTTLING_ERRORS_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.THROTTLING_ERRORS_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.UPLOAD_PART_REQUESTS;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.UPLOAD_PART_REQUESTS_RATE_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.UPLOAD_PART_REQUESTS_TOTAL_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.UPLOAD_PART_TIME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.UPLOAD_PART_TIME_AVG_METRIC_NAME;
import static io.aiven.kafka.tieredstorage.storage.s3.MetricRegistry.UPLOAD_PART_TIME_MAX_METRIC_NAME;
import static software.amazon.awssdk.core.internal.metrics.SdkErrorType.CONFIGURED_TIMEOUT;
import static software.amazon.awssdk.core.internal.metrics.SdkErrorType.IO;
import static software.amazon.awssdk.core.internal.metrics.SdkErrorType.OTHER;
import static software.amazon.awssdk.core.internal.metrics.SdkErrorType.SERVER_ERROR;
import static software.amazon.awssdk.core.internal.metrics.SdkErrorType.THROTTLING;

class MetricCollector implements MetricPublisher {
public class MetricCollector implements MetricPublisher {
private static final Logger log = LoggerFactory.getLogger(MetricCollector.class);

private final org.apache.kafka.common.metrics.Metrics metrics;

private static final String METRIC_GROUP = "s3-client-metrics";
private final Map<String, Sensor> requestMetrics = new HashMap<>();
private final Map<String, Sensor> latencyMetrics = new HashMap<>();
private final Map<String, Sensor> errorMetrics = new HashMap<>();

MetricCollector() {
public MetricCollector() {
final MetricsReporter reporter = new JmxReporter();

metrics = new org.apache.kafka.common.metrics.Metrics(
new MetricConfig(), List.of(reporter), Time.SYSTEM,
new KafkaMetricsContext("aiven.kafka.server.tieredstorage.s3")
);
requestMetrics.put("GetObject", createRequestsSensor("get-object-requests"));
latencyMetrics.put("GetObject", createLatencySensor("get-object-time"));
requestMetrics.put("UploadPart", createRequestsSensor("upload-part-requests"));
latencyMetrics.put("UploadPart", createLatencySensor("upload-part-time"));
requestMetrics.put("CreateMultipartUpload", createRequestsSensor("create-multipart-upload-requests"));
latencyMetrics.put("CreateMultipartUpload", createLatencySensor("create-multipart-upload-time"));
requestMetrics.put("CompleteMultipartUpload", createRequestsSensor("complete-multipart-upload-requests"));
latencyMetrics.put("CompleteMultipartUpload", createLatencySensor("complete-multipart-upload-time"));
requestMetrics.put("PutObject", createRequestsSensor("put-object-requests"));
latencyMetrics.put("PutObject", createLatencySensor("put-object-time"));
requestMetrics.put("DeleteObject", createRequestsSensor("delete-object-requests"));
latencyMetrics.put("DeleteObject", createLatencySensor("delete-object-time"));
requestMetrics.put("DeleteObjects", createRequestsSensor("delete-objects-requests"));
latencyMetrics.put("DeleteObjects", createLatencySensor("delete-objects-time"));
requestMetrics.put("AbortMultipartUpload", createRequestsSensor("abort-multipart-upload-requests"));
latencyMetrics.put("AbortMultipartUpload", createLatencySensor("abort-multipart-upload-time"));

errorMetrics.put(THROTTLING.toString(), createRequestsSensor("throttling-errors"));
errorMetrics.put(SERVER_ERROR.toString(), createRequestsSensor("server-errors"));
errorMetrics.put(CONFIGURED_TIMEOUT.toString(), createRequestsSensor("configured-timeout-errors"));
errorMetrics.put(IO.toString(), createRequestsSensor("io-errors"));
errorMetrics.put(OTHER.toString(), createRequestsSensor("other-errors"));
new KafkaMetricsContext(METRIC_CONTEXT)
);
final Sensor getObjectRequestsSensor = createRequestsSensor(
GET_OBJECT_REQUESTS,
GET_OBJECT_REQUESTS_RATE_METRIC_NAME,
GET_OBJECT_REQUESTS_TOTAL_METRIC_NAME
);
requestMetrics.put("GetObject", getObjectRequestsSensor);
final Sensor getObjectTimeSensor = createLatencySensor(
GET_OBJECT_TIME,
GET_OBJECT_TIME_AVG_METRIC_NAME,
GET_OBJECT_TIME_MAX_METRIC_NAME
);
latencyMetrics.put("GetObject", getObjectTimeSensor);
final Sensor uploadPartRequestsSensor = createRequestsSensor(
UPLOAD_PART_REQUESTS,
UPLOAD_PART_REQUESTS_RATE_METRIC_NAME,
UPLOAD_PART_REQUESTS_TOTAL_METRIC_NAME
);
requestMetrics.put("UploadPart", uploadPartRequestsSensor);
final Sensor uploadPartTimeSensor = createLatencySensor(
UPLOAD_PART_TIME,
UPLOAD_PART_TIME_AVG_METRIC_NAME,
UPLOAD_PART_TIME_MAX_METRIC_NAME
);
latencyMetrics.put("UploadPart", uploadPartTimeSensor);
final Sensor createMpuRequestsSensor = createRequestsSensor(
CREATE_MULTIPART_UPLOAD_REQUESTS,
CREATE_MULTIPART_UPLOAD_REQUESTS_RATE_METRIC_NAME,
CREATE_MULTIPART_UPLOAD_REQUESTS_TOTAL_METRIC_NAME
);
requestMetrics.put("CreateMultipartUpload", createMpuRequestsSensor);
final Sensor createMpuTimeSensor = createLatencySensor(
CREATE_MULTIPART_UPLOAD_TIME,
CREATE_MULTIPART_UPLOAD_TIME_AVG_METRIC_NAME,
CREATE_MULTIPART_UPLOAD_TIME_MAX_METRIC_NAME
);
latencyMetrics.put("CreateMultipartUpload", createMpuTimeSensor);
final Sensor completeMpuRequestsSensor = createRequestsSensor(
COMPLETE_MULTIPART_UPLOAD_REQUESTS,
COMPLETE_MULTIPART_UPLOAD_REQUESTS_RATE_METRIC_NAME,
COMPLETE_MULTIPART_UPLOAD_REQUESTS_TOTAL_METRIC_NAME
);
requestMetrics.put("CompleteMultipartUpload", completeMpuRequestsSensor);
final Sensor completeMpuTimeSensor = createLatencySensor(
COMPLETE_MULTIPART_UPLOAD_TIME,
COMPLETE_MULTIPART_UPLOAD_TIME_AVG_METRIC_NAME,
COMPLETE_MULTIPART_UPLOAD_TIME_MAX_METRIC_NAME
);
latencyMetrics.put("CompleteMultipartUpload", completeMpuTimeSensor);
final Sensor putObjectRequestsSensor = createRequestsSensor(
PUT_OBJECT_REQUESTS,
PUT_OBJECT_REQUESTS_RATE_METRIC_NAME,
PUT_OBJECT_REQUESTS_TOTAL_METRIC_NAME
);
requestMetrics.put("PutObject", putObjectRequestsSensor);
final Sensor putObjectTimeSensor = createLatencySensor(
PUT_OBJECT_TIME,
PUT_OBJECT_TIME_AVG_METRIC_NAME,
PUT_OBJECT_TIME_MAX_METRIC_NAME
);
latencyMetrics.put("PutObject", putObjectTimeSensor);
final Sensor deleteObjectRequestsSensor = createRequestsSensor(
DELETE_OBJECT_REQUESTS,
DELETE_OBJECT_REQUESTS_RATE_METRIC_NAME,
DELETE_OBJECT_REQUESTS_TOTAL_METRIC_NAME
);
requestMetrics.put("DeleteObject", deleteObjectRequestsSensor);
final Sensor deleteObjectTimeSensor = createLatencySensor(
DELETE_OBJECT_TIME,
DELETE_OBJECT_TIME_AVG_METRIC_NAME,
DELETE_OBJECT_TIME_MAX_METRIC_NAME
);
latencyMetrics.put("DeleteObject", deleteObjectTimeSensor);
final Sensor deleteObjectsRequestsSensor = createRequestsSensor(
DELETE_OBJECTS_REQUESTS,
DELETE_OBJECTS_REQUESTS_RATE_METRIC_NAME,
DELETE_OBJECTS_REQUESTS_TOTAL_METRIC_NAME
);
requestMetrics.put("DeleteObjects", deleteObjectsRequestsSensor);
final Sensor deleteObjectsTimeSensor = createLatencySensor(
DELETE_OBJECTS_TIME,
DELETE_OBJECTS_TIME_AVG_METRIC_NAME,
DELETE_OBJECTS_TIME_MAX_METRIC_NAME
);
latencyMetrics.put("DeleteObjects", deleteObjectsTimeSensor);
final Sensor abortMpuRequestsSensor = createRequestsSensor(
ABORT_MULTIPART_UPLOAD_REQUESTS,
ABORT_MULTIPART_UPLOAD_REQUESTS_RATE_METRIC_NAME,
ABORT_MULTIPART_UPLOAD_REQUESTS_TOTAL_METRIC_NAME
);
requestMetrics.put("AbortMultipartUpload", abortMpuRequestsSensor);
final Sensor abortMpuTimeSensor = createLatencySensor(
ABORT_MULTIPART_UPLOAD_TIME,
ABORT_MULTIPART_UPLOAD_TIME_AVG_METRIC_NAME,
ABORT_MULTIPART_UPLOAD_TIME_MAX_METRIC_NAME
);
latencyMetrics.put("AbortMultipartUpload", abortMpuTimeSensor);

final Sensor throttlingErrorsSensor = createRequestsSensor(
THROTTLING_ERRORS,
THROTTLING_ERRORS_RATE_METRIC_NAME,
THROTTLING_ERRORS_TOTAL_METRIC_NAME
);
errorMetrics.put(THROTTLING.toString(), throttlingErrorsSensor);
final Sensor serverErrorsSensor = createRequestsSensor(
SERVER_ERRORS,
SERVER_ERRORS_RATE_METRIC_NAME,
SERVER_ERRORS_TOTAL_METRIC_NAME
);
errorMetrics.put(SERVER_ERROR.toString(), serverErrorsSensor);
final Sensor configuredTimeoutErrorsSensor = createRequestsSensor(
CONFIGURED_TIMEOUT_ERRORS,
CONFIGURED_TIMEOUT_ERRORS_RATE_METRIC_NAME,
CONFIGURED_TIMEOUT_ERRORS_TOTAL_METRIC_NAME
);
errorMetrics.put(CONFIGURED_TIMEOUT.toString(), configuredTimeoutErrorsSensor);
final Sensor ioErrorsSensor = createRequestsSensor(
IO_ERRORS,
IO_ERRORS_RATE_METRIC_NAME,
IO_ERRORS_TOTAL_METRIC_NAME
);
errorMetrics.put(IO.toString(), ioErrorsSensor);
final Sensor otherErrorsSensor = createRequestsSensor(
OTHER_ERRORS,
OTHER_ERRORS_RATE_METRIC_NAME,
OTHER_ERRORS_TOTAL_METRIC_NAME
);
errorMetrics.put(OTHER.toString(), otherErrorsSensor);
}

private Sensor createRequestsSensor(final String name) {
private Sensor createRequestsSensor(
final String name,
final MetricNameTemplate rateMetricName,
final MetricNameTemplate totalMetricName
) {
final Sensor sensor = metrics.sensor(name);
sensor.add(metrics.metricName(name + "-rate", METRIC_GROUP), new Rate());
sensor.add(metrics.metricName(name + "-total", METRIC_GROUP), new CumulativeCount());
sensor.add(metrics.metricInstance(rateMetricName), new Rate());
sensor.add(metrics.metricInstance(totalMetricName), new CumulativeCount());
return sensor;
}

private Sensor createLatencySensor(final String name) {
private Sensor createLatencySensor(
final String name,
final MetricNameTemplate avgMetricName,
final MetricNameTemplate maxMetricName
) {
final Sensor sensor = metrics.sensor(name);
sensor.add(metrics.metricName(name + "-max", METRIC_GROUP), new Max());
sensor.add(metrics.metricName(name + "-avg", METRIC_GROUP), new Avg());
sensor.add(metrics.metricInstance(maxMetricName), new Max());
sensor.add(metrics.metricInstance(avgMetricName), new Avg());
return sensor;
}

Expand Down
Loading

0 comments on commit ef05c49

Please sign in to comment.