Skip to content

Commit

Permalink
Merge pull request #1276 from michalvavrik/feature/tls-registry-with-…
Browse files Browse the repository at this point in the history
…strimzi

Support TLS registry with Kafka Strimzi for SASL_SSL and SSL scenarios
  • Loading branch information
michalvavrik authored Sep 3, 2024
2 parents 8d18d28 + 1ca8418 commit 87b6219
Show file tree
Hide file tree
Showing 6 changed files with 128 additions and 6 deletions.
4 changes: 4 additions & 0 deletions examples/kafka/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
<groupId>io.quarkus</groupId>
<artifactId>quarkus-resteasy</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-tls-registry</artifactId>
</dependency>
<dependency>
<groupId>io.quarkus.qe</groupId>
<artifactId>quarkus-test-containers</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package io.quarkus.qe;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.PathParam;

import io.quarkus.logging.Log;
import io.quarkus.tls.TlsConfigurationRegistry;

@Path("tls-registry")
public class TlsRegistryResource {

@Inject
TlsConfigurationRegistry tlsRegistry;

@GET
@Path("validate-config/{tls-config-name}")
public boolean validateNamedTlsConfig(@PathParam("tls-config-name") String tlsConfigName) {
var namedConfig = tlsRegistry.get(tlsConfigName);
if (namedConfig.isEmpty()) {
Log.error("TLS config '%s' is missing".formatted(tlsConfigName));
return false;
}
var truststore = namedConfig.get().getTrustStore();
if (truststore == null) {
Log.error("TLS config '%s' truststore is not configured".formatted(tlsConfigName));
return false;
}
return true;
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.quarkus.qe;

import static org.hamcrest.Matchers.is;

import java.time.Duration;

import org.apache.http.HttpStatus;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;

import io.quarkus.test.bootstrap.KafkaService;
import io.quarkus.test.bootstrap.RestService;
import io.quarkus.test.scenarios.QuarkusScenario;
import io.quarkus.test.services.KafkaContainer;
import io.quarkus.test.services.QuarkusApplication;
import io.quarkus.test.services.containers.model.KafkaProtocol;
import io.quarkus.test.services.containers.model.KafkaVendor;

@QuarkusScenario
public class StrimziKafkaWithTlsRegistryAndSaslSslMessagingIT {

private static final String TLS_CONFIG_NAME = "tls-config-name-1";

@KafkaContainer(vendor = KafkaVendor.STRIMZI, protocol = KafkaProtocol.SASL_SSL, tlsConfigName = TLS_CONFIG_NAME, tlsRegistryEnabled = true)
static final KafkaService kafka = new KafkaService();

@QuarkusApplication
static final RestService app = new RestService()
.withProperties(kafka::getSslProperties)
.withProperty("kafka.bootstrap.servers", kafka::getBootstrapUrl);

@Test
public void checkUserResourceByNormalUser() {
Awaitility.await().atMost(Duration.ofSeconds(5)).untilAsserted(() -> {
app.given().get("/prices/poll")
.then()
.statusCode(HttpStatus.SC_OK);
});
}

@Test
public void testTlsRegistryContainNamedTlsConfig() {
// this is but a smoke test so that we know @KafkaContainer#tlsConfigName had effect
// while the 'checkUserResourceByNormalUser' tests that configured truststore works
app.given()
.pathParam("tls-config-name", TLS_CONFIG_NAME)
.get("/tls-registry/validate-config/{tls-config-name}")
.then()
.statusCode(HttpStatus.SC_OK)
.body(is("true"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,4 +37,14 @@
String[] kafkaConfigResources() default {};

Class<? extends ManagedResourceBuilder> builder() default KafkaContainerManagedResourceBuilder.class;

/**
* @return name of the TLS configuration in the TLS Registry extension; Kafka only supports named TLS configs here
*/
String tlsConfigName() default "";

/**
* @return whether Quarkus configuration properties prepared for SSL and SASL_SSL should use Quarkus TLS registry.
*/
boolean tlsRegistryEnabled() default false;
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public class KafkaContainerManagedResourceBuilder implements ManagedResourceBuil
private String kafkaConfigPath;
private String serverProperties;
private String[] kafkaConfigResources;
private String quarkusTlsRegistryConfigName = null;

protected KafkaVendor getVendor() {
return vendor;
Expand Down Expand Up @@ -91,6 +92,13 @@ protected String getRegistryImageVersion() {
return registryImage;
}

/**
* @return TLS config name or null if the registry is disabled
*/
protected String getQuarkusTlsRegistryConfigName() {
return quarkusTlsRegistryConfigName;
}

@Override
public void init(Annotation annotation) {
KafkaContainer metadata = (KafkaContainer) annotation;
Expand All @@ -104,6 +112,13 @@ public void init(Annotation annotation) {
this.kafkaConfigPath = PropertiesUtils.resolveProperty(metadata.kafkaConfigPath());
this.serverProperties = PropertiesUtils.resolveProperty(metadata.serverProperties());
this.kafkaConfigResources = metadata.kafkaConfigResources();
if (metadata.tlsRegistryEnabled()) {
if (metadata.tlsConfigName().isEmpty()) {
throw new IllegalStateException(
"Kafka client must be configured with named TLS config when TLS registry is enabled");
}
this.quarkusTlsRegistryConfigName = metadata.tlsConfigName();
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,20 @@ protected String[] getKafkaConfigResources() {
trustStoreLocation = "${custom-kafka-trust-store-location:}";
}

var configPropertyIterator = Map.of(
"kafka.ssl.enable", "true",
"kafka.security.protocol", "SSL",
"kafka.ssl.truststore.location", trustStoreLocation,
"kafka.ssl.truststore.password", "top-secret",
"kafka.ssl.truststore.type", "PKCS12");
var configPropertyIterator = new HashMap<>();
configPropertyIterator.put("kafka.ssl.enable", "true");
configPropertyIterator.put("kafka.security.protocol", "SSL");
if (model.getQuarkusTlsRegistryConfigName() == null) {
configPropertyIterator.put("kafka.ssl.truststore.location", trustStoreLocation);
configPropertyIterator.put("kafka.ssl.truststore.password", "top-secret");
configPropertyIterator.put("kafka.ssl.truststore.type", "PKCS12");
} else {
final String tsConfigKeyPrefix = "quarkus.tls.%s.trust-store.p12."
.formatted(model.getQuarkusTlsRegistryConfigName());
configPropertyIterator.put("kafka.tls-configuration-name", model.getQuarkusTlsRegistryConfigName());
configPropertyIterator.put(tsConfigKeyPrefix + "path", trustStoreLocation);
configPropertyIterator.put(tsConfigKeyPrefix + "password", "top-secret");
}
if (model.getProtocol() == KafkaProtocol.SASL_SSL) {
configPropertyIterator = new HashMap<>(configPropertyIterator);
configPropertyIterator.put("kafka.security.protocol", "SASL_SSL");
Expand Down

0 comments on commit 87b6219

Please sign in to comment.