From 2a46ba618bfb91d38a042f8ae410d5814b90fb5c Mon Sep 17 00:00:00 2001 From: Fedor Dudinsky Date: Thu, 20 Jun 2024 09:56:59 +0200 Subject: [PATCH] QQE-737 | Make Kafka use KRaft mode In KRaft mode Zookeper is not used, since brokers communicate to each other directly. --- .../strimzi-custom-server-ssl.properties | 51 ++++++--------- .../BaseKafkaContainerManagedResource.java | 6 +- ...tStrimziKafkaContainerManagedResource.java | 2 +- .../StrimziKafkaContainerManagedResource.java | 37 ++++------- .../ExtendedStrimziKafkaContainer.java | 52 ++++++++++++--- ...strimzi-default-server-sasl-ssl.properties | 49 ++++++-------- .../strimzi-default-server-sasl.properties | 64 ++++++++----------- .../strimzi-default-server-ssl.properties | 50 ++++++--------- .../resources/strimzi-deployment-template.yml | 54 ++++++---------- 9 files changed, 162 insertions(+), 203 deletions(-) diff --git a/examples/kafka/src/test/resources/strimzi-custom-server-ssl.properties b/examples/kafka/src/test/resources/strimzi-custom-server-ssl.properties index cdad609b6..aaa5f8495 100644 --- a/examples/kafka/src/test/resources/strimzi-custom-server-ssl.properties +++ b/examples/kafka/src/test/resources/strimzi-custom-server-ssl.properties @@ -17,8 +17,14 @@ ############################# Server Basics ############################# -# The id of the broker. This must be set to a unique integer for each broker. -broker.id=0 +# The role of this server. Setting this puts us in KRaft mode +process.roles=broker,controller + +# The node id associated with this instance's roles +node.id=1 + +# The connect string for the controller quorum +controller.quorum.voters=1@localhost:9094 ############################# Socket Server Settings ############################# @@ -29,19 +35,25 @@ broker.id=0 # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 -listeners=BROKER://0.0.0.0:9093,SSL://0.0.0.0:9092 - +listeners=BROKER://0.0.0.0:9093,SSL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094 +# Name of listener used for communication between brokers. +inter.broker.listener.name=BROKER # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092 -advertised.listeners=SSL://localhost:${KAFKA_MAPPED_PORT},BROKER://localhost:9093 +advertised.listeners=SSL://localhost:${KAFKA_MAPPED_PORT},BROKER://localhost:9093 + +# A comma-separated list of the names of the listeners used by the controller. +# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol +# This is required if running in KRaft mode. +controller.listener.names=CONTROLLER # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL -listener.security.protocol.map=BROKER:PLAINTEXT,SSL:SSL +listener.security.protocol.map=BROKER:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 @@ -58,10 +70,7 @@ socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 -inter.broker.listener.name=BROKER - #### SSL #### - ssl.keystore.location=/opt/kafka/config/strimzi-custom-server-ssl-keystore.p12 ssl.keystore.password=top-secret ssl.keystore.type=PKCS12 @@ -75,7 +84,7 @@ ssl.endpoint.identification.algorithm= ############################# Log Basics ############################# # A comma separated list of directories under which to store log files -log.dirs=/tmp/kafka-logs +log.dirs=/tmp/kraft-combined-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across @@ -130,25 +139,3 @@ log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 - -############################# Zookeeper ############################# - -# Zookeeper connection string (see zookeeper docs for details). -# This is a comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". -# You can also append an optional chroot string to the urls to specify the -# root directory for all kafka znodes. -zookeeper.connect=localhost:2181 - -# Timeout in ms for connecting to zookeeper -zookeeper.connection.timeout.ms=45000 - - -############################# Group Coordinator Settings ############################# - -# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. -# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. -# The default value for this is 3 seconds. -# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. -# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. -group.initial.rebalance.delay.ms=0 diff --git a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/BaseKafkaContainerManagedResource.java b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/BaseKafkaContainerManagedResource.java index 7e024b356..b4457ef94 100644 --- a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/BaseKafkaContainerManagedResource.java +++ b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/BaseKafkaContainerManagedResource.java @@ -9,12 +9,13 @@ import org.testcontainers.utility.MountableFile; import io.quarkus.test.bootstrap.KafkaService; +import io.quarkus.test.logging.Log; import io.quarkus.test.logging.TestContainersLoggingHandler; public abstract class BaseKafkaContainerManagedResource extends DockerContainerManagedResource { - private static final String SERVER_PROPERTIES = "server.properties"; - private static final String EXPECTED_LOG = ".*started \\(kafka.server.KafkaServer\\).*"; + private static final String SERVER_PROPERTIES = "kraft/server.properties"; + private static final String EXPECTED_LOG = ".*started .*kafka.server.Kafka.*Server.*"; protected final KafkaContainerManagedResourceBuilder model; @@ -79,6 +80,7 @@ protected GenericContainer initContainer() { String kafkaConfigPath = model.getKafkaConfigPath(); if (StringUtils.isNotEmpty(getServerProperties())) { + Log.info("Copying file %s to %s ", getServerProperties(), kafkaConfigPath + SERVER_PROPERTIES); kafkaContainer.withCopyFileToContainer(MountableFile.forClasspathResource(getServerProperties()), kafkaConfigPath + SERVER_PROPERTIES); } diff --git a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/OpenShiftStrimziKafkaContainerManagedResource.java b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/OpenShiftStrimziKafkaContainerManagedResource.java index 337c52fa6..95389fe78 100644 --- a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/OpenShiftStrimziKafkaContainerManagedResource.java +++ b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/OpenShiftStrimziKafkaContainerManagedResource.java @@ -28,7 +28,7 @@ public class OpenShiftStrimziKafkaContainerManagedResource implements ManagedRes private static final String REGISTRY_DEPLOYMENT_TEMPLATE_PROPERTY_DEFAULT = "/registry-deployment-template.yml"; private static final String REGISTRY_DEPLOYMENT = "registry.yml"; - private static final String EXPECTED_LOG = "started (kafka.server.KafkaServer)"; + private static final String EXPECTED_LOG = "Kafka Server started"; private static final int HTTP_PORT = 9092; diff --git a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/StrimziKafkaContainerManagedResource.java b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/StrimziKafkaContainerManagedResource.java index c8d8aaef4..33f9c4ccb 100644 --- a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/StrimziKafkaContainerManagedResource.java +++ b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/StrimziKafkaContainerManagedResource.java @@ -2,7 +2,6 @@ import static io.quarkus.test.services.Certificate.Format.PKCS12; -import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -56,37 +55,23 @@ public URILike getURI(Protocol protocol) { return uri; } - @Override - public void afterStart() { - super.afterStart(); - if (model.getProtocol() == KafkaProtocol.SASL_SSL - && (model.getServerProperties() == null || model.getServerProperties().isEmpty())) { - // make sure that client is added right after the start to Zookeeper - // see https://kafka.apache.org/documentation/#security_sasl_scram for more info - ExtendedStrimziKafkaContainer container = model.getContext().get(DOCKER_INNER_CONTAINER); - var command = ("/opt/kafka/bin/kafka-configs.sh --zookeeper localhost:2181 --alter --add-config " - + "'SCRAM-SHA-512=[password=%s]' --entity-type users --entity-name %s;") - .formatted(SASL_PASSWORD_VALUE, SASL_USERNAME_VALUE); - try { - var execResult = container.execInContainer("sh", "-c", command); - if (execResult.getExitCode() != 0) { - throw new IllegalStateException( - "Failed to add Kafka 'client' user to Zookeeper: " + execResult.getStderr()); - } - } catch (IOException | InterruptedException e) { - throw new RuntimeException("Failed to add Kafka 'client' user to Zookeeper", e); - } - } - } - @Override protected GenericContainer initKafkaContainer() { - ExtendedStrimziKafkaContainer container = new ExtendedStrimziKafkaContainer(getKafkaImageName(), getKafkaVersion()); + ExtendedStrimziKafkaContainer container = new ExtendedStrimziKafkaContainer(getKafkaImageName(), getKafkaVersion()) + .enableKraftMode(); if (StringUtils.isNotEmpty(getServerProperties())) { container.useCustomServerProperties(); } container.withCreateContainerCmdModifier(cmd -> cmd.withName(DockerUtils.generateDockerContainerName())); - + if (model.getProtocol() == KafkaProtocol.SASL_SSL + && (model.getServerProperties() == null || model.getServerProperties().isEmpty())) { + /* + * make sure that client is added before the start + * https://docs.confluent.io/platform/current/kafka/authentication_sasl/authentication_sasl_scram.html#kraft-based- + * clusters + */ + container.configureScram(SASL_USERNAME_VALUE, SASL_PASSWORD_VALUE); + } return container; } diff --git a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/strimzi/ExtendedStrimziKafkaContainer.java b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/strimzi/ExtendedStrimziKafkaContainer.java index bb717a767..01cfda1ec 100644 --- a/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/strimzi/ExtendedStrimziKafkaContainer.java +++ b/quarkus-test-service-kafka/src/main/java/io/quarkus/test/services/containers/strimzi/ExtendedStrimziKafkaContainer.java @@ -1,25 +1,29 @@ package io.quarkus.test.services.containers.strimzi; -import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; import org.testcontainers.images.builder.Transferable; import com.github.dockerjava.api.command.InspectContainerResponse; import io.quarkus.test.services.containers.model.KafkaVendor; +import io.smallrye.mutiny.tuples.Tuple2; import io.strimzi.test.container.StrimziKafkaContainer; /** * Extend the functionality of io.strimzi.StrimziKafkaContainer with: * - Do not overwrite parameters of server.properties. - * */ public class ExtendedStrimziKafkaContainer extends StrimziKafkaContainer { private static final String KAFKA_MAPPED_PORT = "${KAFKA_MAPPED_PORT}"; private static final int ALLOW_EXEC = 700; + private static final String TESTCONTAINERS_SCRIPT = "/testcontainers_start.sh"; private boolean useCustomServerProperties = false; + private Optional> credentials = Optional.empty(); public ExtendedStrimziKafkaContainer(String name, String version) { super(String.format("%s:%s", name, version)); @@ -32,17 +36,45 @@ public void useCustomServerProperties() { @Override protected void containerIsStarting(InspectContainerResponse containerInfo, boolean reused) { if (useCustomServerProperties) { + List script = new ArrayList<>(); + script.add("#!/bin/bash"); int kafkaExposedPort = this.getMappedPort(KafkaVendor.STRIMZI.getPort()); - - String command = "#!/bin/bash \n"; - command = command + "sed 's/" + KAFKA_MAPPED_PORT + "/" + kafkaExposedPort + "/g' " - + "config/server.properties > /tmp/effective_server.properties &\n"; - command = command + "bin/zookeeper-server-start.sh config/zookeeper.properties &\n"; - command = command + "bin/kafka-server-start.sh /tmp/effective_server.properties"; - this.copyFileToContainer(Transferable.of(command.getBytes(StandardCharsets.UTF_8), ALLOW_EXEC), - "/testcontainers_start.sh"); + script.add("sed 's/" + KAFKA_MAPPED_PORT + "/" + kafkaExposedPort + "/g' " + + "config/kraft/server.properties > /tmp/effective_server.properties"); + script.add("KAFKA_CLUSTER_ID=\"$(bin/kafka-storage.sh random-uuid)\""); + StringBuilder storageFormat = new StringBuilder() + .append("/opt/kafka/bin/kafka-storage.sh format") + .append(" -t ${KAFKA_CLUSTER_ID}") + .append(" -c /tmp/effective_server.properties"); + credentials.ifPresent(credentials -> { + storageFormat.append(" --add-scram 'SCRAM-SHA-512=[name=%s,password=%s]'" + .formatted(credentials.getItem1(), credentials.getItem2())); + }); + script.add(storageFormat.toString()); + script.add("bin/kafka-server-start.sh /tmp/effective_server.properties"); + this.copyFileToContainer(Transferable.of(String.join("\n", script), ALLOW_EXEC), TESTCONTAINERS_SCRIPT); } else { + // we do not process credentials here, since SASL always used together with custom properties + // see StrimziKafkaContainerManagedResource#getServerProperties super.containerIsStarting(containerInfo, reused); + // if that is to change, we will need to copy script from test containers, modify it and copy back again } + + } + + /** + * The code below requires an explanation. + * StrimziKafkaContainer has a special method which makes it use kraft mode (without a zookeeper) + * Container quay.io/strimzi/kafka requires for broker.id and node.id to have the same value in kraft mode, + * and for some reason strimzi class always overwrites broker id (to 0 by default) + * since config/kraft/server.properties contains node.id=1, we have to use this value + */ + public ExtendedStrimziKafkaContainer enableKraftMode() { + return (ExtendedStrimziKafkaContainer) super.withKraft() + .withBrokerId(1); + } + + public void configureScram(String name, String password) { + credentials = Optional.of(Tuple2.of(name, password)); } } diff --git a/quarkus-test-service-kafka/src/main/resources/strimzi-default-server-sasl-ssl.properties b/quarkus-test-service-kafka/src/main/resources/strimzi-default-server-sasl-ssl.properties index fb66b4a3c..309db0d1e 100644 --- a/quarkus-test-service-kafka/src/main/resources/strimzi-default-server-sasl-ssl.properties +++ b/quarkus-test-service-kafka/src/main/resources/strimzi-default-server-sasl-ssl.properties @@ -17,8 +17,14 @@ ############################# Server Basics ############################# -# The id of the broker. This must be set to a unique integer for each broker. -broker.id=0 +# The role of this server. Setting this puts us in KRaft mode +process.roles=broker,controller + +# The node id associated with this instance's roles +node.id=1 + +# The connect string for the controller quorum +controller.quorum.voters=1@localhost:9094 ############################# Socket Server Settings ############################# @@ -29,9 +35,10 @@ broker.id=0 # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 -listeners=BROKER://0.0.0.0:9093,SASL_SSL://0.0.0.0:9092 - +listeners=BROKER://0.0.0.0:9093,SASL_SSL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094 +# Name of listener used for communication between brokers. +inter.broker.listener.name=BROKER # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value @@ -39,9 +46,14 @@ listeners=BROKER://0.0.0.0:9093,SASL_SSL://0.0.0.0:9092 #advertised.listeners=PLAINTEXT://your.host.name:9092 advertised.listeners=SASL_SSL://localhost:${KAFKA_MAPPED_PORT},BROKER://localhost:9093 +# A comma-separated list of the names of the listeners used by the controller. +# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol +# This is required if running in KRaft mode. +controller.listener.names=CONTROLLER + # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL -listener.security.protocol.map=BROKER:PLAINTEXT,SASL_SSL:SASL_SSL +listener.security.protocol.map=BROKER:PLAINTEXT,SASL_SSL:SASL_SSL,CONTROLLER:PLAINTEXT # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 @@ -58,9 +70,6 @@ socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 - -inter.broker.listener.name=BROKER - ############################# SASL_SSL Settings ############################# sasl.enabled.mechanisms=SCRAM-SHA-512 @@ -86,7 +95,7 @@ ssl.client.auth=required ############################# Log Basics ############################# # A comma separated list of directories under which to store log files -log.dirs=/tmp/kafka-logs +log.dirs=/tmp/kraft-combined-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across @@ -141,25 +150,3 @@ log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 - -############################# Zookeeper ############################# - -# Zookeeper connection string (see zookeeper docs for details). -# This is a comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". -# You can also append an optional chroot string to the urls to specify the -# root directory for all kafka znodes. -zookeeper.connect=localhost:2181 - -# Timeout in ms for connecting to zookeeper -zookeeper.connection.timeout.ms=45000 - - -############################# Group Coordinator Settings ############################# - -# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. -# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. -# The default value for this is 3 seconds. -# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. -# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. -group.initial.rebalance.delay.ms=0 diff --git a/quarkus-test-service-kafka/src/main/resources/strimzi-default-server-sasl.properties b/quarkus-test-service-kafka/src/main/resources/strimzi-default-server-sasl.properties index 85d0e60f6..4a55cc867 100644 --- a/quarkus-test-service-kafka/src/main/resources/strimzi-default-server-sasl.properties +++ b/quarkus-test-service-kafka/src/main/resources/strimzi-default-server-sasl.properties @@ -13,35 +13,51 @@ # See the License for the specific language governing permissions and # limitations under the License. -# see kafka.server.KafkaConfig for additional details and defaults +# +# This configuration file is intended for use in KRaft mode, where +# Apache ZooKeeper is not present. +# ############################# Server Basics ############################# -# The id of the broker. This must be set to a unique integer for each broker. -broker.id=0 +# The role of this server. Setting this puts us in KRaft mode +process.roles=broker,controller + +# The node id associated with this instance's roles +node.id=1 + +# The connect string for the controller quorum +controller.quorum.voters=1@localhost:9094 ############################# Socket Server Settings ############################# -# The address the socket server listens on. It will get the value returned from -# java.net.InetAddress.getCanonicalHostName() if not configured. +# The address the socket server listens on. +# Combined nodes (i.e. those with `process.roles=broker,controller`) must list the controller listener here at a minimum. +# If the broker listener is not defined, the default listener will use a host name that is equal to the value of java.net.InetAddress.getCanonicalHostName(), +# with PLAINTEXT listener name, and port 9092. # FORMAT: # listeners = listener_name://host_name:port # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 -#listeners=PLAINTEXT://:9092 -listeners=BROKER://0.0.0.0:9093,SASL_PLAINTEXT://0.0.0.0:9092 +listeners=BROKER://0.0.0.0:9093,SASL_PLAINTEXT://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094 +# Name of listener used for communication between brokers. +inter.broker.listener.name=BROKER # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092 -advertised.listeners=SASL_PLAINTEXT://localhost:${KAFKA_MAPPED_PORT},BROKER://localhost:9093 +advertised.listeners=SASL_PLAINTEXT://localhost:${KAFKA_MAPPED_PORT},BROKER://localhost:9093 + +# A comma-separated list of the names of the listeners used by the controller. +# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol +# This is required if running in KRaft mode. +controller.listener.names=CONTROLLER # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details -#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL -listener.security.protocol.map=BROKER:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT +listener.security.protocol.map=BROKER:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 @@ -58,10 +74,6 @@ socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 - -inter.broker.listener.name=BROKER - - #### SASL #### sasl.enabled.mechanisms=PLAIN @@ -78,7 +90,7 @@ listener.name.sasl_plaintext.plain.sasl.jaas.config=org.apache.kafka.common.secu ############################# Log Basics ############################# # A comma separated list of directories under which to store log files -log.dirs=/tmp/kafka-logs +log.dirs=/tmp/kraft-combined-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across @@ -133,25 +145,3 @@ log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 - -############################# Zookeeper ############################# - -# Zookeeper connection string (see zookeeper docs for details). -# This is a comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". -# You can also append an optional chroot string to the urls to specify the -# root directory for all kafka znodes. -zookeeper.connect=localhost:2181 - -# Timeout in ms for connecting to zookeeper -zookeeper.connection.timeout.ms=45000 - - -############################# Group Coordinator Settings ############################# - -# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. -# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. -# The default value for this is 3 seconds. -# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. -# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. -group.initial.rebalance.delay.ms=0 diff --git a/quarkus-test-service-kafka/src/main/resources/strimzi-default-server-ssl.properties b/quarkus-test-service-kafka/src/main/resources/strimzi-default-server-ssl.properties index 37cc5bfb6..3e21af16b 100644 --- a/quarkus-test-service-kafka/src/main/resources/strimzi-default-server-ssl.properties +++ b/quarkus-test-service-kafka/src/main/resources/strimzi-default-server-ssl.properties @@ -17,8 +17,14 @@ ############################# Server Basics ############################# -# The id of the broker. This must be set to a unique integer for each broker. -broker.id=0 +# The role of this server. Setting this puts us in KRaft mode +process.roles=broker,controller + +# The node id associated with this instance's roles +node.id=1 + +# The connect string for the controller quorum +controller.quorum.voters=1@localhost:9094 ############################# Socket Server Settings ############################# @@ -29,19 +35,25 @@ broker.id=0 # EXAMPLE: # listeners = PLAINTEXT://your.host.name:9092 #listeners=PLAINTEXT://:9092 -listeners=BROKER://0.0.0.0:9093,SSL://0.0.0.0:9092 - +listeners=BROKER://0.0.0.0:9093,SSL://0.0.0.0:9092,CONTROLLER://0.0.0.0:9094 +# Name of listener used for communication between brokers. +inter.broker.listener.name=BROKER # Hostname and port the broker will advertise to producers and consumers. If not set, # it uses the value for "listeners" if configured. Otherwise, it will use the value # returned from java.net.InetAddress.getCanonicalHostName(). #advertised.listeners=PLAINTEXT://your.host.name:9092 -advertised.listeners=SSL://localhost:${KAFKA_MAPPED_PORT},BROKER://localhost:9093 +advertised.listeners=SSL://localhost:${KAFKA_MAPPED_PORT},BROKER://localhost:9093 + +# A comma-separated list of the names of the listeners used by the controller. +# If no explicit mapping set in `listener.security.protocol.map`, default will be using PLAINTEXT protocol +# This is required if running in KRaft mode. +controller.listener.names=CONTROLLER # Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details #listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL -listener.security.protocol.map=BROKER:PLAINTEXT,SSL:SSL +listener.security.protocol.map=BROKER:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT # The number of threads that the server uses for receiving requests from the network and sending responses to the network num.network.threads=3 @@ -58,8 +70,6 @@ socket.receive.buffer.bytes=102400 # The maximum size of a request that the socket server will accept (protection against OOM) socket.request.max.bytes=104857600 -inter.broker.listener.name=BROKER - #### SSL #### ssl.keystore.location=/opt/kafka/config/strimzi-server-ssl-keystore.p12 @@ -75,7 +85,7 @@ ssl.endpoint.identification.algorithm= ############################# Log Basics ############################# # A comma separated list of directories under which to store log files -log.dirs=/tmp/kafka-logs +log.dirs=/tmp/kraft-combined-logs # The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across @@ -130,25 +140,3 @@ log.segment.bytes=1073741824 # The interval at which log segments are checked to see if they can be deleted according # to the retention policies log.retention.check.interval.ms=300000 - -############################# Zookeeper ############################# - -# Zookeeper connection string (see zookeeper docs for details). -# This is a comma separated host:port pairs, each corresponding to a zk -# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002". -# You can also append an optional chroot string to the urls to specify the -# root directory for all kafka znodes. -zookeeper.connect=localhost:2181 - -# Timeout in ms for connecting to zookeeper -zookeeper.connection.timeout.ms=45000 - - -############################# Group Coordinator Settings ############################# - -# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance. -# The rebalance will be further delayed by the value of group.initial.rebalance.delay.ms as new members join the group, up to a maximum of max.poll.interval.ms. -# The default value for this is 3 seconds. -# We override this to 0 here as it makes for a better out-of-the-box experience for development and testing. -# However, in production environments the default value of 3 seconds is more suitable as this will help to avoid unnecessary, and potentially expensive, rebalances during application startup. -group.initial.rebalance.delay.ms=0 diff --git a/quarkus-test-service-kafka/src/main/resources/strimzi-deployment-template.yml b/quarkus-test-service-kafka/src/main/resources/strimzi-deployment-template.yml index 2dc420c61..f351eacb4 100644 --- a/quarkus-test-service-kafka/src/main/resources/strimzi-deployment-template.yml +++ b/quarkus-test-service-kafka/src/main/resources/strimzi-deployment-template.yml @@ -19,17 +19,17 @@ items: status: loadBalancer: {} - apiVersion: v1 - kind: Service + kind: ConfigMap metadata: - name: zookeeper-service labels: - app: zookeeper - spec: - ports: - - port: 2181 - name: client - selector: - app: zookeeper + app: ${SERVICE_NAME} + name: "start-script" + data: + start.sh: |- + #!/bin/bash + KAFKA_CLUSTER_ID="$(/opt/kafka/bin/kafka-storage.sh random-uuid)" + /opt/kafka/bin/kafka-storage.sh format -t ${KAFKA_CLUSTER_ID} -c config/kraft/server.properties + /opt/kafka/bin/kafka-server-start.sh config/kraft/server.properties --override listeners=PLAINTEXT://0.0.0.0:${KAFKA_PORT},CONTROLLER://localhost:9093 --override advertised.listeners=PLAINTEXT://${SERVICE_NAME}:${KAFKA_PORT} - apiVersion: "apps/v1" kind: "Deployment" metadata: @@ -50,35 +50,23 @@ items: - name: ${SERVICE_NAME}-container image: ${IMAGE}:${VERSION} imagePullPolicy: IfNotPresent - command: [ "/bin/sh" ] - args: [ "-c", "bin/kafka-server-start.sh config/server.properties --override listeners=PLAINTEXT://0.0.0.0:${KAFKA_PORT} --override advertised.listeners=PLAINTEXT://${SERVICE_NAME}:${KAFKA_PORT} --override zookeeper.connect=zookeeper-service:2181" ] + command: + - "/tmp/start.sh" env: - name: "LOG_DIR" value: "/tmp" ports: - containerPort: ${KAFKA_PORT} resources: {} + volumeMounts: + - name: script-volume + mountPath: "/tmp/start.sh" + subPath: "start.sh" + volumes: + - name: script-volume + configMap: + #Our template engine turns octal to decimal(for some reason) so we have to use decimal here + defaultMode: 365 # that is rxrxrx or 0555. + name: start-script triggers: - type: "ConfigChange" - - apiVersion: apps/v1 - kind: Deployment - metadata: - name: zookeeper - spec: - replicas: 1 - selector: - matchLabels: - app: zookeeper - template: - metadata: - labels: - app: zookeeper - spec: - containers: - - name: k8s-zookeeper - image: quay.io/debezium/zookeeper - ports: - - containerPort: 2181 - name: client - restartPolicy: Always - status: { } \ No newline at end of file