From 25797a1e9999f8124bdd2af1b739781b05c9b47f Mon Sep 17 00:00:00 2001 From: Ramin Gharib Date: Fri, 16 Feb 2024 11:39:33 +0100 Subject: [PATCH] Validate offset topic (#20) --- .../kafka/KafkaConnectorSourceResetter.java | 18 ++++++++--- ...onnectorSourceResetterApplicationTest.java | 32 +++++++++++++++++++ 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java index 6d09874..30da329 100644 --- a/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java +++ b/src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java @@ -80,9 +80,9 @@ * } * * Kafka Connect stores offsets for source connectors in a dedicated topic. The key of such an offset consists of the - * connector name and a connector specific partition name, e.g., {@code ["connector-name", { some-source-specific - * -data... }] }. This tool finds all partitions belonging to the connector that should be reset and deletes the - * corresponding offsets. + * connector name and a connector specific partition name, e.g., + * {@code ["connector-name", { some-source-specific -data... }] }. This tool finds all partitions belonging to the + * connector that should be reset and deletes the corresponding offsets. */ @Slf4j @@ -135,6 +135,7 @@ public void run() { log.info("Finished resetting {}", this.sharedOptions.getConnectorName()); } + private void resetPartitions(final Iterable partitions, final Map kafkaConfig) { try (final Producer producer = createProducer(kafkaConfig)) { producer.initTransactions(); @@ -179,7 +180,7 @@ private Consumer createConsumer(final Map kafkaC final Deserializer byteArrayDeserializer = new ByteArrayDeserializer(); final Consumer consumer = new KafkaConsumer<>(kafkaConfig, byteArrayDeserializer, byteArrayDeserializer); - final List partitions = consumer.partitionsFor(this.offsetTopic); + final List partitions = this.partitionsForOffsetTopic(consumer); final List topicPartitions = partitions.stream() .map(KafkaConnectorSourceResetter::toTopicPartition) .collect(Collectors.toList()); @@ -188,4 +189,13 @@ private Consumer createConsumer(final Map kafkaC return consumer; } + private List partitionsForOffsetTopic(final Consumer consumer) { + final Map> topicsWithPartition = consumer.listTopics(); + if (!topicsWithPartition.containsKey(this.offsetTopic)) { + final String message = String.format("Topic '%s' does not exist.", this.offsetTopic); + throw new IllegalArgumentException(message); + } + return topicsWithPartition.get(this.offsetTopic); + } + } diff --git a/src/test/java/com/bakdata/kafka/KafkaConnectorSourceResetterApplicationTest.java b/src/test/java/com/bakdata/kafka/KafkaConnectorSourceResetterApplicationTest.java index d02f8ad..677e3d6 100644 --- a/src/test/java/com/bakdata/kafka/KafkaConnectorSourceResetterApplicationTest.java +++ b/src/test/java/com/bakdata/kafka/KafkaConnectorSourceResetterApplicationTest.java @@ -123,6 +123,38 @@ void test() throws InterruptedException { this.softly.assertThat(valuesAfterReset).hasSize(6); } + @Test + void shouldExitOneWhenOffsetTopicIsSetIncorrectly() throws InterruptedException { + this.createConnectCluster(); + delay(10, TimeUnit.SECONDS); + final List values = this.kafkaCluster.readValues(ReadKeyValues.from(TOPIC) + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .build()); + this.softly.assertThat(values) + .hasSize(3); + this.connectCluster.stop(); + delay(10, TimeUnit.SECONDS); + final KafkaConnectResetterApplication app = new KafkaConnectResetterApplication(); + + final CommandLine commandLine = getCLI(app); + final int exitCode = commandLine.execute("source", + CONNECTOR_NAME, + "--brokers", this.kafkaCluster.getBrokerList(), + "--offset-topic", "wrong-offset-topic" + ); + this.softly.assertThat(exitCode) + .isEqualTo(1); + this.createConnectCluster(); + delay(10, TimeUnit.SECONDS); + final List valuesAfterReset = this.kafkaCluster.readValues(ReadKeyValues.from(TOPIC) + .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) + .build()); + this.softly.assertThat(valuesAfterReset) + .hasSize(3); + } + private void createConnectCluster() { this.connectCluster = new EmbeddedConnect(EmbeddedConnectConfig.kafkaConnect() .with(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, OFFSETS)