Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use topic hook for schema registry reset #251

Draft
wants to merge 2 commits into
base: v3
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,9 @@ public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig to
/**
 * Creates a new {@code ProducerCleanUpRunner}. If a schema registry URL is configured in the Kafka
 * properties, a {@link SchemaRegistryTopicHook} is registered so that schema registry subjects are
 * reset whenever a topic is deleted.
 *
 * @param topics topic configuration of the producer app
 * @param kafkaProperties Kafka properties, also used to create an optional schema registry client
 * @param configuration clean-up configuration the schema registry hook is registered on
 * @return {@code ProducerCleanUpRunner}
 */
public static ProducerCleanUpRunner create(@NonNull final ProducerTopicConfig topics,
        @NonNull final Map<String, Object> kafkaProperties,
        @NonNull final ProducerCleanUpConfiguration configuration) {
    // Only registered when SCHEMA_REGISTRY_URL_CONFIG is present in the properties
    SchemaRegistryTopicHook.createSchemaRegistryClient(kafkaProperties)
            .ifPresent(client -> configuration.registerTopicHook(new SchemaRegistryTopicHook(client)));
    return new ProducerCleanUpRunner(topics, kafkaProperties, configuration);
}

Expand Down Expand Up @@ -105,8 +108,8 @@ private void deleteTopics() {
}

/**
 * Deletes a topic if it exists and runs the registered topic deletion hooks.
 *
 * @param topic name of the topic to delete
 */
private void deleteTopic(final String topic) {
    this.adminClient.getTopicClient()
            .deleteTopicIfExists(topic);
    // Schema registry reset is no longer done here; it is performed by the registered topic hooks
    ProducerCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,94 +22,65 @@
* SOFTWARE.
*/

package com.bakdata.kafka.util;
package com.bakdata.kafka;

import com.bakdata.kafka.CleanUpException;
import com.bakdata.kafka.HasTopicHooks.TopicHook;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClientFactory;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import lombok.NonNull;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.admin.AdminClient;

/**
* Client to interact with Kafka topics and its associated schema registry subjects in a unified way
*/
@Slf4j
@RequiredArgsConstructor
public final class SchemaTopicClient implements AutoCloseable {
public class SchemaRegistryTopicHook implements TopicHook {
private static final int CACHE_CAPACITY = 100;
private final @NonNull TopicClient topicClient;
private final SchemaRegistryClient schemaRegistryClient;

/**
* Creates a new {@code SchemaTopicClient} using the specified configuration.
*
* @param configs properties passed to {@link AdminClient#create(Map)}
* @param schemaRegistryUrl URL of schema registry
* @param timeout timeout for waiting for Kafka admin calls
* @return {@code SchemaTopicClient}
*/
public static SchemaTopicClient create(final Map<String, Object> configs, final String schemaRegistryUrl,
final Duration timeout) {
final SchemaRegistryClient schemaRegistryClient =
createSchemaRegistryClient(configs, schemaRegistryUrl);
final TopicClient topicClient = TopicClient.create(configs, timeout);
return new SchemaTopicClient(topicClient, schemaRegistryClient);
}
private final @NonNull SchemaRegistryClient schemaRegistryClient;

/**
* Creates a new {@code SchemaTopicClient} with no {@link SchemaRegistryClient} using the specified configuration.
* Creates a new {@code SchemaRegistryClient} using the specified configuration if
* {@link AbstractKafkaSchemaSerDeConfig#SCHEMA_REGISTRY_URL_CONFIG} is configured.
*
* @param configs properties passed to {@link AdminClient#create(Map)}
* @param timeout timeout for waiting for Kafka admin calls
* @return {@code SchemaTopicClient}
* @param kafkaProperties properties for creating {@code SchemaRegistryClient}
* @return {@code SchemaRegistryClient} if {@link AbstractKafkaSchemaSerDeConfig#SCHEMA_REGISTRY_URL_CONFIG} is
* configured
* @see SchemaRegistryTopicHook#createSchemaRegistryClient(Map, String)
*/
public static SchemaTopicClient create(final Map<String, Object> configs, final Duration timeout) {
final TopicClient topicClient = TopicClient.create(configs, timeout);
return new SchemaTopicClient(topicClient, null);
public static Optional<SchemaRegistryClient> createSchemaRegistryClient(final Map<String, Object> kafkaProperties) {
final String schemaRegistryUrl =
(String) kafkaProperties.get(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
if (schemaRegistryUrl == null) {
return Optional.empty();
}
final Map<String, Object> properties = new HashMap<>(kafkaProperties);
properties.remove(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
return Optional.of(createSchemaRegistryClient(properties, schemaRegistryUrl));
}

/**
 * Creates a new {@code SchemaRegistryClient} using the specified configuration.
 *
 * @param configs properties passed to
 *         {@link SchemaRegistryClientFactory#newClient(List, int, List, Map, Map)}
 * @param schemaRegistryUrl URL of schema registry
 * @return {@code SchemaRegistryClient}
 */
public static SchemaRegistryClient createSchemaRegistryClient(@NonNull final Map<String, Object> configs,
        @NonNull final String schemaRegistryUrl) {
    return SchemaRegistryClientFactory.newClient(List.of(schemaRegistryUrl), CACHE_CAPACITY, null, configs, null);
}

/**
* Delete a topic if it exists and reset the corresponding Schema Registry subjects.
*
* @param topic the topic name
*/
public void deleteTopicAndResetSchemaRegistry(final String topic) {
this.topicClient.deleteTopicIfExists(topic);
this.resetSchemaRegistry(topic);
}

/**
* Delete key and value schemas associated with a topic from the schema registry.
*
* @param topic the topic name
*/
public void resetSchemaRegistry(final String topic) {
if (this.schemaRegistryClient == null) {
log.debug("No Schema Registry URL set. Skipping schema deletion for topic {}.", topic);
return;
}
@Override
public void deleted(final String topic) {
log.info("Resetting Schema Registry for topic '{}'", topic);
try {
final Collection<String> allSubjects = this.schemaRegistryClient.getAllSubjects();
Expand All @@ -134,13 +105,10 @@ public void resetSchemaRegistry(final String topic) {

@Override
public void close() {
this.topicClient.close();
if (this.schemaRegistryClient != null) {
try {
this.schemaRegistryClient.close();
} catch (final IOException e) {
throw new UncheckedIOException("Error closing schema registry client", e);
}
try {
this.schemaRegistryClient.close();
} catch (final IOException e) {
throw new UncheckedIOException("Error closing schema registry client", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ public static StreamsCleanUpRunner create(final @NonNull Topology topology,
final @NonNull StreamsConfig streamsConfig, final @NonNull StreamsCleanUpConfiguration configuration) {
final ImprovedStreamsConfig config = new ImprovedStreamsConfig(streamsConfig);
final TopologyInformation topologyInformation = new TopologyInformation(topology, config.getAppId());
SchemaRegistryTopicHook.createSchemaRegistryClient(config.getKafkaProperties())
.map(SchemaRegistryTopicHook::new)
.ifPresent(configuration::registerTopicHook);
return new StreamsCleanUpRunner(topologyInformation, topology, config, configuration);
}

Expand Down Expand Up @@ -242,14 +245,12 @@ private void deleteTopics() {
}

/**
 * Runs the registered topic deletion hooks for an internal topic.
 *
 * @param topic name of the internal topic to reset
 */
private void resetInternalTopic(final String topic) {
    // Schema registry reset is handled by the registered topic deletion hooks
    StreamsCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic);
}

/**
 * Deletes a topic if it exists and runs the registered topic deletion hooks.
 *
 * @param topic name of the topic to delete
 */
private void deleteTopic(final String topic) {
    this.adminClient.getTopicClient()
            .deleteTopicIfExists(topic);
    // Schema registry reset is no longer done here; it is performed by the registered topic hooks
    StreamsCleanUpRunner.this.cleanHooks.runTopicDeletionHooks(topic);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,9 @@

package com.bakdata.kafka.util;

import static com.bakdata.kafka.util.SchemaTopicClient.createSchemaRegistryClient;
import static com.bakdata.kafka.SchemaRegistryTopicHook.createSchemaRegistryClient;

import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
Expand Down Expand Up @@ -75,10 +74,7 @@ public static ImprovedAdminClient create(@NonNull final Map<String, Object> prop
String.format("%s must be specified in properties", AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
}
final Admin adminClient = AdminClient.create(properties);
final String schemaRegistryUrl =
(String) properties.get(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG);
final SchemaRegistryClient schemaRegistryClient =
schemaRegistryUrl == null ? null : createSchemaRegistryClient(properties, schemaRegistryUrl);
final SchemaRegistryClient schemaRegistryClient = createSchemaRegistryClient(properties).orElse(null);
return builder()
.adminClient(adminClient)
.schemaRegistryClient(schemaRegistryClient)
Expand All @@ -95,10 +91,6 @@ public Optional<SchemaRegistryClient> getSchemaRegistryClient() {
.map(PooledSchemaRegistryClient::new);
}

public SchemaTopicClient getSchemaTopicClient() {
return new SchemaTopicClient(this.getTopicClient(), this.getSchemaRegistryClient().orElse(null));
}

/**
 * Creates a new {@code TopicClient} using this client's admin client and configured timeout.
 *
 * @return {@code TopicClient}
 */
public TopicClient getTopicClient() {
return new TopicClient(this.getAdminClient(), this.timeout);
}
Expand Down
Loading
Loading