Skip to content

Commit

Permalink
Validate offset topic (#20)
Browse files Browse the repository at this point in the history
  • Loading branch information
raminqaf authored Feb 16, 2024
1 parent 6e4b03c commit 25797a1
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 4 deletions.
18 changes: 14 additions & 4 deletions src/main/java/com/bakdata/kafka/KafkaConnectorSourceResetter.java
Original file line number Diff line number Diff line change
Expand Up @@ -80,9 +80,9 @@
* }</pre>
*
* Kafka Connect stores offsets for source connectors in a dedicated topic. The key of such an offset consists of the
* connector name and a connector specific partition name, e.g., {@code ["connector-name", { some-source-specific
* -data... }] }. This tool finds all partitions belonging to the connector that should be reset and deletes the
* corresponding offsets.
 * connector name and a connector specific partition name, e.g.,
 * {@code ["connector-name", { some-source-specific-data... }] }. This tool finds all partitions belonging to the
 * connector that should be reset and deletes the corresponding offsets.
*/

@Slf4j
Expand Down Expand Up @@ -135,6 +135,7 @@ public void run() {
log.info("Finished resetting {}", this.sharedOptions.getConnectorName());
}


private void resetPartitions(final Iterable<byte[]> partitions, final Map<String, Object> kafkaConfig) {
try (final Producer<byte[], byte[]> producer = createProducer(kafkaConfig)) {
producer.initTransactions();
Expand Down Expand Up @@ -179,7 +180,7 @@ private Consumer<byte[], byte[]> createConsumer(final Map<String, Object> kafkaC
final Deserializer<byte[]> byteArrayDeserializer = new ByteArrayDeserializer();
final Consumer<byte[], byte[]> consumer =
new KafkaConsumer<>(kafkaConfig, byteArrayDeserializer, byteArrayDeserializer);
final List<PartitionInfo> partitions = consumer.partitionsFor(this.offsetTopic);
final List<PartitionInfo> partitions = this.partitionsForOffsetTopic(consumer);
final List<TopicPartition> topicPartitions = partitions.stream()
.map(KafkaConnectorSourceResetter::toTopicPartition)
.collect(Collectors.toList());
Expand All @@ -188,4 +189,13 @@ private Consumer<byte[], byte[]> createConsumer(final Map<String, Object> kafkaC
return consumer;
}

/**
 * Looks up the partitions of the configured offset topic on the cluster.
 *
 * @param consumer consumer used to list the topics available on the cluster
 * @param <K> key type of the consumer
 * @param <V> value type of the consumer
 * @return the partitions of the offset topic
 * @throws IllegalArgumentException if the offset topic does not exist on the cluster
 */
private <K, V> List<PartitionInfo> partitionsForOffsetTopic(final Consumer<K, V> consumer) {
    // Single map lookup instead of containsKey(...) followed by get(...)
    final List<PartitionInfo> partitions = consumer.listTopics().get(this.offsetTopic);
    if (partitions == null) {
        final String message = String.format("Topic '%s' does not exist.", this.offsetTopic);
        throw new IllegalArgumentException(message);
    }
    return partitions;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,38 @@ void test() throws InterruptedException {
this.softly.assertThat(valuesAfterReset).hasSize(6);
}

/**
 * Verifies that the source resetter exits with code 1 when pointed at a
 * non-existent offset topic, and that the connector's data is left untouched
 * (i.e. no reset is performed) in that case.
 */
@Test
void shouldExitOneWhenOffsetTopicIsSetIncorrectly() throws InterruptedException {
    this.createConnectCluster();
    // Give the connector time to produce its records before reading them.
    delay(10, TimeUnit.SECONDS);
    final List<String> values = this.kafkaCluster.readValues(ReadKeyValues.from(TOPIC)
            .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
            .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
            .build());
    // Sanity check: the connector produced the expected records before the reset attempt.
    this.softly.assertThat(values)
            .hasSize(3);
    // Stop Connect so the resetter can act on the offsets without interference.
    this.connectCluster.stop();
    delay(10, TimeUnit.SECONDS);
    final KafkaConnectResetterApplication app = new KafkaConnectResetterApplication();

    final CommandLine commandLine = getCLI(app);
    // Point the resetter at a topic that does not exist on the cluster.
    final int exitCode = commandLine.execute("source",
            CONNECTOR_NAME,
            "--brokers", this.kafkaCluster.getBrokerList(),
            "--offset-topic", "wrong-offset-topic"
    );
    // The resetter must fail fast with exit code 1 instead of silently doing nothing.
    this.softly.assertThat(exitCode)
            .isEqualTo(1);
    this.createConnectCluster();
    delay(10, TimeUnit.SECONDS);
    final List<String> valuesAfterReset = this.kafkaCluster.readValues(ReadKeyValues.from(TOPIC)
            .with(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
            .with(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
            .build());
    // Still 3 records: the offsets were NOT reset, so the restarted connector did not re-produce.
    this.softly.assertThat(valuesAfterReset)
            .hasSize(3);
}

private void createConnectCluster() {
this.connectCluster = new EmbeddedConnect(EmbeddedConnectConfig.kafkaConnect()
.with(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, OFFSETS)
Expand Down

0 comments on commit 25797a1

Please sign in to comment.