diff --git a/examples/kafka/pom.xml b/examples/kafka/pom.xml
index 8f3884678..b22ef38a8 100644
--- a/examples/kafka/pom.xml
+++ b/examples/kafka/pom.xml
@@ -18,6 +18,10 @@
io.quarkus
quarkus-resteasy
+
+ io.quarkus
+ quarkus-tls-registry
+
io.quarkus.qe
quarkus-test-containers
diff --git a/examples/kafka/src/main/java/io/quarkus/qe/TlsRegistryResource.java b/examples/kafka/src/main/java/io/quarkus/qe/TlsRegistryResource.java
new file mode 100644
index 000000000..ce9ffa94a
--- /dev/null
+++ b/examples/kafka/src/main/java/io/quarkus/qe/TlsRegistryResource.java
@@ -0,0 +1,33 @@
+package io.quarkus.qe;
+
+import jakarta.inject.Inject;
+import jakarta.ws.rs.GET;
+import jakarta.ws.rs.Path;
+import jakarta.ws.rs.PathParam;
+
+import io.quarkus.logging.Log;
+import io.quarkus.tls.TlsConfigurationRegistry;
+
+@Path("tls-registry")
+public class TlsRegistryResource {
+
+ @Inject
+ TlsConfigurationRegistry tlsRegistry;
+
+ @GET
+ @Path("validate-config/{tls-config-name}")
+ public boolean validateNamedTlsConfig(@PathParam("tls-config-name") String tlsConfigName) {
+ var namedConfig = tlsRegistry.get(tlsConfigName);
+ if (namedConfig.isEmpty()) {
+ Log.error("TLS config '%s' is missing".formatted(tlsConfigName));
+ return false;
+ }
+ var truststore = namedConfig.get().getTrustStore();
+ if (truststore == null) {
+ Log.error("TLS config '%s' truststore is not configured".formatted(tlsConfigName));
+ return false;
+ }
+ return true;
+ }
+
+}
diff --git a/examples/kafka/src/test/java/io/quarkus/qe/StrimziKafkaWithTlsRegistryAndSaslSslMessagingIT.java b/examples/kafka/src/test/java/io/quarkus/qe/StrimziKafkaWithTlsRegistryAndSaslSslMessagingIT.java
new file mode 100644
index 000000000..e197d5144
--- /dev/null
+++ b/examples/kafka/src/test/java/io/quarkus/qe/StrimziKafkaWithTlsRegistryAndSaslSslMessagingIT.java
@@ -0,0 +1,52 @@
+package io.quarkus.qe;
+
+import static org.hamcrest.Matchers.is;
+
+import java.time.Duration;
+
+import org.apache.http.HttpStatus;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.Test;
+
+import io.quarkus.test.bootstrap.KafkaService;
+import io.quarkus.test.bootstrap.RestService;
+import io.quarkus.test.scenarios.QuarkusScenario;
+import io.quarkus.test.services.KafkaContainer;
+import io.quarkus.test.services.QuarkusApplication;
+import io.quarkus.test.services.containers.model.KafkaProtocol;
+import io.quarkus.test.services.containers.model.KafkaVendor;
+
+@QuarkusScenario
+public class StrimziKafkaWithTlsRegistryAndSaslSslMessagingIT {
+
+ private static final String TLS_CONFIG_NAME = "tls-config-name-1";
+
+ @KafkaContainer(vendor = KafkaVendor.STRIMZI, protocol = KafkaProtocol.SASL_SSL, tlsConfigName = TLS_CONFIG_NAME, tlsRegistryEnabled = true)
+ static final KafkaService kafka = new KafkaService();
+
+ @QuarkusApplication
+ static final RestService app = new RestService()
+ .withProperties(kafka::getSslProperties)
+ .withProperty("kafka.bootstrap.servers", kafka::getBootstrapUrl);
+
+ @Test
+ public void checkUserResourceByNormalUser() {
+ Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
+ app.given().get("/prices/poll")
+ .then()
+ .statusCode(HttpStatus.SC_OK);
+ });
+ }
+
+ @Test
+ public void testTlsRegistryContainNamedTlsConfig() {
+ // this is but a smoke test so that we know @KafkaContainer#tlsConfigName had effect
+ // while the 'checkUserResourceByNormalUser' tests that configured truststore works
+ app.given()
+ .pathParam("tls-config-name", TLS_CONFIG_NAME)
+ .get("/tls-registry/validate-config/{tls-config-name}")
+ .then()
+ .statusCode(HttpStatus.SC_OK)
+ .body(is("true"));
+ }
+}
diff --git a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/KafkaContainer.java b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/KafkaContainer.java
index c9db03afd..248821275 100644
--- a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/KafkaContainer.java
+++ b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/KafkaContainer.java
@@ -37,4 +37,14 @@
String[] kafkaConfigResources() default {};
Class extends ManagedResourceBuilder> builder() default KafkaContainerManagedResourceBuilder.class;
+
+ /**
+ * @return name of the TLS configuration in the TLS Registry extension; Kafka only supports named TLS configs here
+ */
+ String tlsConfigName() default "";
+
+ /**
+ * @return whether Quarkus configuration properties prepared for SSL and SASL_SSL should use Quarkus TLS registry.
+ */
+ boolean tlsRegistryEnabled() default false;
}
diff --git a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/KafkaContainerManagedResourceBuilder.java b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/KafkaContainerManagedResourceBuilder.java
index ddcbaaff9..adf9786e4 100644
--- a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/KafkaContainerManagedResourceBuilder.java
+++ b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/KafkaContainerManagedResourceBuilder.java
@@ -27,6 +27,7 @@ public class KafkaContainerManagedResourceBuilder implements ManagedResourceBuil
private String kafkaConfigPath;
private String serverProperties;
private String[] kafkaConfigResources;
+ private String quarkusTlsRegistryConfigName = null;
protected KafkaVendor getVendor() {
return vendor;
@@ -91,6 +92,13 @@ protected String getRegistryImageVersion() {
return registryImage;
}
+ /**
+ * @return TLS config name or null if the registry is disabled
+ */
+ protected String getQuarkusTlsRegistryConfigName() {
+ return quarkusTlsRegistryConfigName;
+ }
+
@Override
public void init(Annotation annotation) {
KafkaContainer metadata = (KafkaContainer) annotation;
@@ -104,6 +112,13 @@ public void init(Annotation annotation) {
this.kafkaConfigPath = PropertiesUtils.resolveProperty(metadata.kafkaConfigPath());
this.serverProperties = PropertiesUtils.resolveProperty(metadata.serverProperties());
this.kafkaConfigResources = metadata.kafkaConfigResources();
+ if (metadata.tlsRegistryEnabled()) {
+ if (metadata.tlsConfigName().isEmpty()) {
+ throw new IllegalStateException(
+ "Kafka client must be configured with named TLS config when TLS registry is enabled");
+ }
+ this.quarkusTlsRegistryConfigName = metadata.tlsConfigName();
+ }
}
@Override
diff --git a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/StrimziKafkaContainerManagedResource.java b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/StrimziKafkaContainerManagedResource.java
index 33f9c4ccb..a17384b17 100644
--- a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/StrimziKafkaContainerManagedResource.java
+++ b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/StrimziKafkaContainerManagedResource.java
@@ -136,12 +136,20 @@ protected String[] getKafkaConfigResources() {
trustStoreLocation = "${custom-kafka-trust-store-location:}";
}
- var configPropertyIterator = Map.of(
- "kafka.ssl.enable", "true",
- "kafka.security.protocol", "SSL",
- "kafka.ssl.truststore.location", trustStoreLocation,
- "kafka.ssl.truststore.password", "top-secret",
- "kafka.ssl.truststore.type", "PKCS12");
+ var configPropertyIterator = new HashMap<>();
+ configPropertyIterator.put("kafka.ssl.enable", "true");
+ configPropertyIterator.put("kafka.security.protocol", "SSL");
+ if (model.getQuarkusTlsRegistryConfigName() == null) {
+ configPropertyIterator.put("kafka.ssl.truststore.location", trustStoreLocation);
+ configPropertyIterator.put("kafka.ssl.truststore.password", "top-secret");
+ configPropertyIterator.put("kafka.ssl.truststore.type", "PKCS12");
+ } else {
+ final String tsConfigKeyPrefix = "quarkus.tls.%s.trust-store.p12."
+ .formatted(model.getQuarkusTlsRegistryConfigName());
+ configPropertyIterator.put("kafka.tls-configuration-name", model.getQuarkusTlsRegistryConfigName());
+ configPropertyIterator.put(tsConfigKeyPrefix + "path", trustStoreLocation);
+ configPropertyIterator.put(tsConfigKeyPrefix + "password", "top-secret");
+ }
if (model.getProtocol() == KafkaProtocol.SASL_SSL) {
configPropertyIterator = new HashMap<>(configPropertyIterator);
configPropertyIterator.put("kafka.security.protocol", "SASL_SSL");