From b6cb80a65e7cb73843a679b0f5e2ac8acdd642ec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Tue, 19 Dec 2023 11:52:51 +0200 Subject: [PATCH] Separate kafka producers for each proxy (#78) * Introduces more reliability by separating Kafka Producers. --- .github/workflows/build.yml | 24 ++--- CHANGELOG.md | 32 ++++++- TODO.md | 2 + build.common.gradle | 4 +- build.gradle | 22 +++-- build.libraries.gradle | 26 ++--- docs/setup.md | 2 +- gradle.properties | 2 +- gradle/wrapper/gradle-wrapper.properties | 2 +- .../kafka/tkms/ITkmsInterrupterService.java | 17 ++++ .../kafka/tkms/TkmsInterrupterService.java | 40 ++++++++ .../kafka/tkms/TkmsStorageToKafkaProxy.java | 30 +++++- .../tkms/TransactionalKafkaMessageSender.java | 14 +-- .../config/ITkmsKafkaProducerProvider.java | 15 ++- .../kafka/tkms/config/TkmsConfiguration.java | 8 ++ .../config/TkmsKafkaProducerProvider.java | 94 ++++++++++++------- .../kafka/tkms/config/TkmsProperties.java | 9 ++ .../kafka/tkms/EndToEndIntTest.java | 29 ++++-- .../kafka/tkms/KafkaMetricsIntTest.java | 66 ++++--------- .../config/TkmsKafkaProducerProviderTest.java | 4 +- .../kafka/tkms/test/BaseIntTest.java | 12 +-- .../src/test/resources/docker-compose.yml | 15 +-- 22 files changed, 304 insertions(+), 165 deletions(-) create mode 100644 tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/ITkmsInterrupterService.java create mode 100644 tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsInterrupterService.java diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 94df6cb..907bfae 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -18,10 +18,9 @@ jobs: max-parallel: 100 matrix: spring_boot_version: - - 3.1.2 - - 3.0.7 - - 2.7.13 - - 2.6.15 + - 3.1.6 + - 3.0.13 + - 2.7.18 env: SPRING_BOOT_VERSION: ${{ matrix.spring_boot_version }} GRADLE_OPTS: "-Djava.security.egd=file:/dev/./urandom -Dorg.gradle.parallel=true" @@ -31,7 +30,7 @@ jobs: TW_TKMS_KAFKA_TCP_9092: 9092 TW_TKMS_KAFKA_TCP_HOST: kafka1 ZOOKEEPER_TCP_2181: 2181 - ZOOKEEPER_TCP_HOST: zk-service1 + ZOOKEEPER_TCP_HOST: zookeeper1 POSTGRES_TCP_HOST: postgres1 POSTGRES_TCP_5432: 5432 container: @@ -46,13 +45,8 @@ jobs: image: transferwiseworkspace/postgres12 env: POSTGRES_PASSWORD: example-password-change-me - zk-service1: - image: bitnami/zookeeper:3.5.5 - env: - ALLOW_ANONYMOUS_LOGIN: "yes" - JVMFLAGS: "-Xmx512m -Xms64m" - zk1: - image: bitnami/zookeeper:3.4.14 + zookeeper1: + image: bitnami/zookeeper:3.7.1 env: ALLOW_ANONYMOUS_LOGIN: "yes" JVMFLAGS: "-Xmx512m -Xms64m" @@ -60,7 +54,7 @@ jobs: image: wurstmeister/kafka:2.12-2.2.0 env: KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zk1:2181 + KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181 KAFKA_LISTENERS: PLAINTEXT://:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_MESSAGE_MAX_BYTES: "10485760" @@ -78,7 +72,7 @@ jobs: - name: Setup Gradle uses: gradle/gradle-build-action@v2 with: - gradle-version: 8.1.1 + gradle-version: 8.5 gradle-home-cache-cleanup: true # Comment out when you are upgrading gradle in a branch and doing tons of commits you would need to test. # cache-read-only: false @@ -157,7 +151,7 @@ jobs: - name: Setup Gradle uses: gradle/gradle-build-action@v2 with: - gradle-version: 8.1.1 + gradle-version: 8.5 gradle-home-cache-cleanup: true # Comment out when you are upgrading gradle in a branch and doing tons of commits you would need to test. # cache-read-only: false diff --git a/CHANGELOG.md b/CHANGELOG.md index 84e2dab..a152613 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,34 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.26.0] - 2023-12-14 + +### Removed + +- Support for Spring Boot 2.6 . + +### Changed + +- Every proxy has its own, independent, Kafka producer. + Before, one producer was shared by all partitions. And, the default shard's producer was also used for topics validation. + +- Kafka producer's flush will be now interrupted from another thread, by a separate housekeeping service. + Wise had an incident, where the `flush()` call hanged forever, and it was not easy to derive that this is the case. + Now we will at least get clear error logs, when this happens. + +- Proxies' Kafka producers will be closed after the poll loop exits. + This would allow to recover from unforeseen kafka clients' bugs and also release resources when another pod takes over the proxying. + +- The default linger time on proxies' kafka producer was increased from 5 ms. to 1000 ms. + This would allow potentially larger batches to get formed. We are not increasing the latency substantially, because we override the + lingering mechanism via `flush` call anyway. + +- Enabled idempotency on producers and increased the in flight requests count to 5. + ## [0.25.1] - 2023-10-30 + ### Added + - Setting METADATA_MAX_AGE_CONFIG to two minutes for producer ## [0.25.0] - 2023-08-09 @@ -56,9 +82,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 In case there is a rare long-running transaction and its messages need to get sent out. * An option to defer insertion of messages into the database to the very end of a transaction, just before commit - - `deferMessageRegistrationUntilCommit` - This would generate fairly recent ids for the messages and earliest visible messages system has less chance to not see and thus skip those. - With that, the earliest visible message system can configure a fairly small look-back window and reduce CPU consumption even further. + - `deferMessageRegistrationUntilCommit` + This would generate fairly recent ids for the messages and earliest visible messages system has less chance to not see and thus skip those. + With that, the earliest visible message system can configure a fairly small look-back window and reduce CPU consumption even further. It can also help to reduce total transaction latency, as all individual messages collected are sent out in batches. diff --git a/TODO.md b/TODO.md index 0464780..f60c76f 100644 --- a/TODO.md +++ b/TODO.md @@ -34,3 +34,5 @@ It’s possible to have our own partitioner and producer interceptor to avoid do 10. Consider changing default partitioner to `RoundRobinPartitioner`. In that keys messages with keys will be distributed more around and consumers can have smaller latencies. + +11. Restructure the docs around deferred messages inserts. Make sure it plays together with "the risks" paragraph etc. \ No newline at end of file diff --git a/build.common.gradle b/build.common.gradle index 1b16f2f..394bca4 100644 --- a/build.common.gradle +++ b/build.common.gradle @@ -24,7 +24,7 @@ configurations { testCompileProtoPath { extendsFrom(local) } - productionRuntimeClasspath{ + productionRuntimeClasspath { extendsFrom(local) } compileClasspath { @@ -143,8 +143,6 @@ test { } tasks.findAll { it.name.startsWith("spotbugs") }*.configure { - effort = "max" - excludeFilter = file('../spotbugs-exclude.xml') reports { diff --git a/build.gradle b/build.gradle index 3ffb09c..ee3084e 100644 --- a/build.gradle +++ b/build.gradle @@ -1,19 +1,24 @@ +import com.github.spotbugs.snom.Confidence +import com.github.spotbugs.snom.Effort import org.eclipse.jgit.api.errors.RefAlreadyExistsException buildscript { + if (!project.hasProperty("springBootVersion")) { + ext.springBootVersion = System.getenv("SPRING_BOOT_VERSION") ?: "2.7.18" + } dependencies { - classpath 'com.google.protobuf:protobuf-gradle-plugin:0.9.3' - classpath "com.avast.gradle:gradle-docker-compose-plugin:0.16.12" + classpath 'com.google.protobuf:protobuf-gradle-plugin:0.9.4' + classpath "com.avast.gradle:gradle-docker-compose-plugin:0.17.5" } } plugins { id 'idea' - id 'org.springframework.boot' version '2.6.14' apply false - id "com.github.spotbugs" version "5.0.14" apply false + id 'org.springframework.boot' version "$springBootVersion" apply false + id "com.github.spotbugs" version "6.0.2" id "at.zierler.yamlvalidator" version "1.5.0" - id 'org.ajoberstar.grgit' version '5.2.0' - id 'io.github.gradle-nexus.publish-plugin' version "1.1.0" + id 'org.ajoberstar.grgit' version '5.2.1' + id 'io.github.gradle-nexus.publish-plugin' version "1.3.0" id 'com.github.johnrengelman.shadow' version '8.1.1' apply false } @@ -23,6 +28,11 @@ idea.project { targetBytecodeVersion = JavaVersion.VERSION_17 } +spotbugs { + effort = Effort.valueOf('MAX') + reportLevel = Confidence.valueOf('DEFAULT') +} + yamlValidator { searchPaths = ['.circleci/'] allowDuplicates = false diff --git a/build.libraries.gradle b/build.libraries.gradle index f9abd78..9406fd0 100644 --- a/build.libraries.gradle +++ b/build.libraries.gradle @@ -1,30 +1,30 @@ ext { - protobufVersion = "3.22.4" - springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: '2.6.15'}" + protobufVersion = "3.24.0" + springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: '2.7.18'}" libraries = [ // version defined awaitility : 'org.awaitility:awaitility:4.2.0', commonsIo : 'commons-io:commons-io:2.11.0', curatorFramework : 'org.apache.curator:curator-framework:5.5.0', curatorRecipes : 'org.apache.curator:curator-recipes:5.5.0', - guava : 'com.google.guava:guava:31.1-jre', + guava : 'com.google.guava:guava:32.1.3-jre', jakartaValidationApi : 'jakarta.validation:jakarta.validation-api:3.0.2', javaxValidationApi : "javax.validation:validation-api:2.0.1.Final", - kafkaStreams : 'org.apache.kafka:kafka-streams:3.4.0', + kafkaStreams : 'org.apache.kafka:kafka-streams:3.2.3', lz4Java : 'org.lz4:lz4-java:1.8.0', protobufJava : "com.google.protobuf:protobuf-java:${protobufVersion}", semver4j : "com.vdurmont:semver4j:3.1.0", - snappyJava : 'org.xerial.snappy:snappy-java:1.1.10.1', + snappyJava : 'org.xerial.snappy:snappy-java:1.1.10.4', spotbugsAnnotations : "com.github.spotbugs:spotbugs-annotations:${spotbugs.toolVersion.get()}", springBootDependencies : "org.springframework.boot:spring-boot-dependencies:${springBootVersion}", - twBaseUtils : 'com.transferwise.common:tw-base-utils:1.10.1', - twContext : 'com.transferwise.common:tw-context:0.12.0', - twContextStarter : 'com.transferwise.common:tw-context-starter:0.12.0', - twGracefulShutdown : 'com.transferwise.common:tw-graceful-shutdown:2.11.0', - twGracefulShutdownInterfaces : 'com.transferwise.common:tw-graceful-shutdown-interfaces:2.11.0', - twLeaderSelector : 'com.transferwise.common:tw-leader-selector:1.10.0', - twLeaderSelectorStarter : 'com.transferwise.common:tw-leader-selector-starter:1.10.0', - zstdJni : 'com.github.luben:zstd-jni:1.5.0-4', + twBaseUtils : 'com.transferwise.common:tw-base-utils:1.12.1', + twContext : 'com.transferwise.common:tw-context:1.0.0', + twContextStarter : 'com.transferwise.common:tw-context-starter:1.0.0', + twGracefulShutdown : 'com.transferwise.common:tw-graceful-shutdown:2.14.2', + twGracefulShutdownInterfaces : 'com.transferwise.common:tw-graceful-shutdown-interfaces:2.14.2', + twLeaderSelector : 'com.transferwise.common:tw-leader-selector:1.10.1', + twLeaderSelectorStarter : 'com.transferwise.common:tw-leader-selector-starter:1.10.1', + zstdJni : 'com.github.luben:zstd-jni:1.5.2-1', // versions managed by spring-boot-dependencies platform commonsLang3 : 'org.apache.commons:commons-lang3', diff --git a/docs/setup.md b/docs/setup.md index 0855b51..9805c34 100644 --- a/docs/setup.md +++ b/docs/setup.md @@ -1,6 +1,6 @@ # Setup -We are assuming you are using Spring Boot, at least version 2.5. +We are assuming you are using Spring Boot, at least version 2.7. First ensure that you have the `mavenCentral` repository available in your Gradle buildscript: diff --git a/gradle.properties b/gradle.properties index b73a05f..6f6dd85 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.25.1 +version=0.26.0 diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 37aef8d..3499ded 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip networkTimeout=10000 zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/ITkmsInterrupterService.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/ITkmsInterrupterService.java new file mode 100644 index 0000000..5005865 --- /dev/null +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/ITkmsInterrupterService.java @@ -0,0 +1,17 @@ +package com.transferwise.kafka.tkms; + +import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle; +import java.time.Duration; + +public interface ITkmsInterrupterService { + + TaskHandle interruptAfter(Thread t, Duration duration); + + /** + * Cancels the previously set interruption task. + * + *

The handle has to be the one returned from the `interruptAfter` call. + */ + void cancelInterruption(TaskHandle handler); + +} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsInterrupterService.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsInterrupterService.java new file mode 100644 index 0000000..159a4f3 --- /dev/null +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsInterrupterService.java @@ -0,0 +1,40 @@ +package com.transferwise.kafka.tkms; + +import com.transferwise.common.baseutils.concurrency.IExecutorServicesProvider; +import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor; +import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle; +import java.time.Duration; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; + +@Slf4j +public class TkmsInterrupterService implements ITkmsInterrupterService, InitializingBean { + + @Autowired + private IExecutorServicesProvider executorServicesProvider; + private ScheduledTaskExecutor scheduledTaskExecutor; + + public void afterPropertiesSet() { + this.scheduledTaskExecutor = executorServicesProvider.getGlobalScheduledTaskExecutor(); + } + + @Override + public TaskHandle interruptAfter(Thread t, Duration duration) { + return scheduledTaskExecutor.scheduleOnce(() -> { + var threadName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName("tkms-interrupt"); + log.warn("Had to interrupt thread '{}'.", t.getName()); + t.interrupt(); + } finally { + Thread.currentThread().setName(threadName); + } + }, duration); + } + + @Override + public void cancelInterruption(TaskHandle handler) { + handler.stop(); + } +} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java index 677f979..fb72a50 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java @@ -17,6 +17,7 @@ import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.ITkmsDaoProvider; import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.config.TkmsProperties; import com.transferwise.kafka.tkms.dao.ITkmsDao.MessageRecord; import com.transferwise.kafka.tkms.metrics.ITkmsMetricsTemplate; @@ -41,8 +42,10 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; @@ -77,6 +80,8 @@ public class TkmsStorageToKafkaProxy implements GracefulShutdownStrategy, ITkmsS private ITkmsMessageInterceptors messageIntereceptors; @Autowired private SharedReentrantLockBuilderFactory lockBuilderFactory; + @Autowired + private ITkmsInterrupterService tkmsInterrupterService; @TestOnly private volatile boolean paused = false; @@ -87,7 +92,6 @@ public class TkmsStorageToKafkaProxy implements GracefulShutdownStrategy, ITkmsS private final List leaderSelectors = new ArrayList<>(); private RateLimiter exceptionRateLimiter = RateLimiter.create(2); - @Override public void afterPropertiesSet() { for (int s = 0; s < properties.getShardsCount(); s++) { @@ -153,6 +157,16 @@ public void afterPropertiesSet() { } private void poll(Control control, TkmsShardPartition shardPartition) { + var kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(shardPartition, UseCase.PROXY); + + try { + poll0(control, shardPartition, kafkaProducer); + } finally { + tkmsKafkaProducerProvider.closeKafkaProducer(shardPartition, UseCase.PROXY); + } + } + + private void poll0(Control control, TkmsShardPartition shardPartition, KafkaProducer kafkaProducer) { int pollerBatchSize = properties.getPollerBatchSize(shardPartition.getShard()); long startTimeMs = System.currentTimeMillis(); @@ -257,7 +271,7 @@ private void poll(Control control, TkmsShardPartition shardPartition) { var contexts = new MessageProcessingContext[records.size()]; final var kafkaSendStartNanoTime = System.nanoTime(); - var kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(shardPartition.getShard()); + boolean atLeastOneSendDone = false; producerRecordMap.clear(); @@ -335,7 +349,13 @@ private void poll(Control control, TkmsShardPartition shardPartition) { } if (atLeastOneSendDone) { - kafkaProducer.flush(); + var interruptionHandle = + tkmsInterrupterService.interruptAfter(Thread.currentThread(), properties.getInternals().getFlushInterruptionDuration()); + try { + kafkaProducer.flush(); + } finally { + tkmsInterrupterService.cancelInterruption(interruptionHandle); + } } for (int i = 0; i < records.size(); i++) { @@ -372,6 +392,10 @@ private void poll(Control control, TkmsShardPartition shardPartition) { if (failedSendsCount.get() > 0) { proxyCyclePauseRequest.setValue(tkmsPaceMaker.getPollingPauseOnError(shardPartition)); } + } catch (InterruptException e) { + log.error("Kafka producer was interrupted for " + shardPartition + ".", e); + // Rethrow and force the recreation of the producer. + throw e; } catch (Throwable t) { log.error(t.getMessage(), t); proxyCyclePauseRequest.setValue(tkmsPaceMaker.getPollingPauseOnError(shardPartition)); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java index 1e16e30..a76f1b8 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java @@ -71,7 +71,7 @@ public void afterPropertiesSet() { environmentValidator.validate(); for (String topic : properties.getTopics()) { - validateTopic(properties.getDefaultShard(), topic); + validateTopic(topic); } validateDeleteBatchSizes(); @@ -168,7 +168,7 @@ public SendMessagesResult sendMessages(SendMessagesRequest request) { var topic = tkmsMessage.getTopic(); if (!validatedTopics.contains(topic)) { - validateTopic(shardPartition.getShard(), topic); + validateTopic(topic); validatedTopics.add(topic); } @@ -264,7 +264,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) { validateMessageSize(message, 0); var topic = message.getTopic(); - validateTopic(shardPartition.getShard(), topic); + validateTopic(topic); if (deferMessageRegistrationUntilCommit) { // Transaction is guaranteed to be active here. @@ -374,8 +374,8 @@ public void afterCompletion(int status) { /** * Every call to normal `KafkaProducer.send()` uses metadata for a topic as well, so should be very fast. */ - protected void validateTopic(int shard, String topic) { - kafkaProducerProvider.getKafkaProducer(shard).partitionsFor(topic); + protected void validateTopic(String topic) { + kafkaProducerProvider.getKafkaProducerForTopicValidation().partitionsFor(topic); } protected void validateMessages(SendMessagesRequest request) { @@ -392,7 +392,7 @@ protected void validateMessage(TkmsMessage message, int messageIdx) { Preconditions.checkArgument(message.getPartition() >= 0, "%s: Partition number can not be negative: %s", messageIdx, message.getPartition()); } if (message.getKey() != null) { - Preconditions.checkArgument(message.getKey().length() > 0, "%s: Key can not be an empty string.", messageIdx); + Preconditions.checkArgument(!message.getKey().isEmpty(), "%s: Key can not be an empty string.", messageIdx); } if (message.getShard() != null) { Preconditions.checkArgument(message.getShard() >= 0, "%s: Shard number can not be negative :%s", messageIdx, message.getShard()); @@ -462,7 +462,7 @@ private int utf8Length(CharSequence s) { protected void fireMessageRegisteredEvent(TkmsShardPartition shardPartition, Long id, TkmsMessage message) { var listeners = getTkmsEventsListeners(); if (log.isDebugEnabled()) { - log.debug("Message was registered for {} with storage id {}. Listeners count: ", shardPartition, id, listeners.size()); + log.debug("Message was registered for {} with storage id {}. Listeners count: {}.", shardPartition, id, listeners.size()); } if (tkmsEventsListeners.isEmpty()) { diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java index d10d01b..b4c0717 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java @@ -1,10 +1,21 @@ package com.transferwise.kafka.tkms.config; +import com.transferwise.kafka.tkms.api.TkmsShardPartition; import org.apache.kafka.clients.producer.KafkaProducer; public interface ITkmsKafkaProducerProvider { - KafkaProducer getKafkaProducer(int shard); + KafkaProducer getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); - void closeKafkaProducer(int shard); + KafkaProducer getKafkaProducerForTopicValidation(); + + void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); + + void closeKafkaProducerForTopicValidation(); + + enum UseCase { + PROXY, + TEST, + TOPIC_VALIDATION + } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsConfiguration.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsConfiguration.java index 0d94789..08978ed 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsConfiguration.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsConfiguration.java @@ -4,11 +4,13 @@ import com.transferwise.kafka.tkms.EnvironmentValidator; import com.transferwise.kafka.tkms.IEnvironmentValidator; import com.transferwise.kafka.tkms.IProblemNotifier; +import com.transferwise.kafka.tkms.ITkmsInterrupterService; import com.transferwise.kafka.tkms.ITkmsPaceMaker; import com.transferwise.kafka.tkms.ITkmsStorageToKafkaProxy; import com.transferwise.kafka.tkms.ITkmsZookeeperOperations; import com.transferwise.kafka.tkms.JavaxValidationEnvironmentValidator; import com.transferwise.kafka.tkms.ProblemNotifier; +import com.transferwise.kafka.tkms.TkmsInterrupterService; import com.transferwise.kafka.tkms.TkmsMessageInterceptors; import com.transferwise.kafka.tkms.TkmsPaceMaker; import com.transferwise.kafka.tkms.TkmsStorageToKafkaProxy; @@ -170,4 +172,10 @@ public TkmsClusterWideStateMonitor tkmsClusterWideStateMonitor() { public ProblemNotifier tkmsProblemNotifier() { return new ProblemNotifier(); } + + @Bean + @ConditionalOnMissingBean(ITkmsInterrupterService.class) + public TkmsInterrupterService tkmsInterrupterService() { + return new TkmsInterrupterService(); + } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java index 2ce4f24..8b283c1 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java @@ -1,6 +1,7 @@ package com.transferwise.kafka.tkms.config; import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy; +import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.TkmsProperties.ShardProperties; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; @@ -8,7 +9,10 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import lombok.Data; +import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -24,13 +28,11 @@ public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, Gr @Autowired private MeterRegistry meterRegistry; - private Map> kafkaProducers = new ConcurrentHashMap<>(); - - private Map kafkaClientMetrics = new HashMap<>(); + private Map, ProducerEntry> producers = new ConcurrentHashMap<>(); @Override - public KafkaProducer getKafkaProducer(int shard) { - return kafkaProducers.computeIfAbsent(shard, key -> { + public KafkaProducer getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) { + return producers.computeIfAbsent(Pair.of(shardPartition, useCase), key -> { Map configs = new HashMap<>(); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -38,63 +40,83 @@ public KafkaProducer getKafkaProducer(int shard) { configs.put(ProducerConfig.ACKS_CONFIG, "all"); configs.put(ProducerConfig.BATCH_SIZE_CONFIG, "163840"); configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(tkmsProperties.getMaximumMessageBytes())); - configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); + + // The following block is to guarantee the messages order. + configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); + configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + configs.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Please specify 'tw-tkms.kafka.bootstrap.servers'."); configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000"); configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "10000"); - configs.put(ProducerConfig.LINGER_MS_CONFIG, "5"); + configs.put(ProducerConfig.CLIENT_ID_CONFIG, + "tw-tkms-" + shardPartition.getShard() + "-" + shardPartition.getPartition() + "-" + useCase.name().toLowerCase()); + + if (useCase == UseCase.PROXY) { + // We use large lingering time, because we are calling the `.flush()` anyway. + configs.put(ProducerConfig.LINGER_MS_CONFIG, "1000"); + } configs.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "120000"); configs.putAll(tkmsProperties.getKafka()); - ShardProperties shardProperties = tkmsProperties.getShards().get(shard); + ShardProperties shardProperties = tkmsProperties.getShards().get(shardPartition.getShard()); if (shardProperties != null) { configs.putAll(shardProperties.getKafka()); } - KafkaProducer kafkaProducer = new KafkaProducer<>(configs); - kafkaClientMetrics.put(shard, new KafkaClientMetrics(kafkaProducer)); - kafkaClientMetrics.get(shard).bindTo(meterRegistry); - return kafkaProducer; - }); + final var producer = new KafkaProducer(configs); + final var kafkaClientMetrics = new KafkaClientMetrics(producer); + kafkaClientMetrics.bindTo(meterRegistry); + + return new ProducerEntry().setProducer(producer).setKafkaClientMetric(kafkaClientMetrics); + }).getProducer(); } @Override - public void closeKafkaProducer(int shard) { - KafkaClientMetrics kafkaClientMetric = kafkaClientMetrics.remove(shard); - if (kafkaClientMetric != null) { - kafkaClientMetric.close(); + public KafkaProducer getKafkaProducerForTopicValidation() { + return getKafkaProducer(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), UseCase.TOPIC_VALIDATION); + } + + @Override + public void closeKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) { + var producerEntry = producers.remove(Pair.of(shardPartition, useCase)); + + if (producerEntry == null) { + return; } - KafkaProducer producer = kafkaProducers.remove(shard); - if (producer != null) { - try { - producer.close(Duration.ofSeconds(5)); - } catch (Throwable t) { - log.error(t.getMessage(), t); - } + producerEntry.getKafkaClientMetric().close(); + + try { + producerEntry.getProducer().close(Duration.ofSeconds(5)); + } catch (Throwable t) { + log.error("Closing Kafka producer for shard partiton " + shardPartition + " failed.", t); } } @Override - public void applicationTerminating() { - kafkaProducers.forEach((shard, producer) -> { - KafkaClientMetrics kafkaClientMetric = kafkaClientMetrics.remove(shard); - if (kafkaClientMetric != null) { - kafkaClientMetric.close(); - } + public void closeKafkaProducerForTopicValidation() { + closeKafkaProducer(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), UseCase.TOPIC_VALIDATION); + } - try { - producer.close(Duration.ofSeconds(5)); - } catch (Throwable t) { - log.error(t.getMessage(), t); - } - }); + @Override + public void applicationTerminating() { + producers.keySet().forEach(key -> closeKafkaProducer(key.getLeft(), key.getRight())); } @Override public boolean canShutdown() { return true; } + + @Data + @Accessors(chain = true) + protected static class ProducerEntry { + + private KafkaProducer producer; + + private KafkaClientMetrics kafkaClientMetric; + } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java index b4f0f85..cd49239 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java @@ -523,6 +523,15 @@ public static class Monitoring { public static class Internals { private int assertionLevel = 0; + + /** + * We use quite large duration. + * + *

But the idea is to at least detect hangs and possibly allow self-recovery. + * + *

Too low values may cause very large batches to fail. + */ + private Duration flushInterruptionDuration = Duration.ofSeconds(30); } public enum NotificationLevel { diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java index 6fb41f7..010d3ff 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java @@ -16,6 +16,7 @@ import com.transferwise.kafka.tkms.api.TkmsMessage.Header; import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.ITkmsDaoProvider; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; import com.transferwise.kafka.tkms.dao.FaultInjectedTkmsDao; import com.transferwise.kafka.tkms.metrics.TkmsMetricsTemplate; import com.transferwise.kafka.tkms.test.BaseIntTest; @@ -65,6 +66,8 @@ abstract class EndToEndIntTest extends BaseIntTest { private ITkmsDaoProvider tkmsDaoProvider; @Autowired private TkmsStorageToKafkaProxy tkmsStorageToKafkaProxy; + @Autowired + private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider; private FaultInjectedTkmsDao faultInjectedTkmsDao; @@ -168,7 +171,8 @@ void testExactlyOnceDelivery(int scenario) throws Exception { setupConfig(deferUntilCommit); tkmsProperties.setValidateSerialization(validateSerialization); - String message = "Hello World!"; + // For producer to create more batches and spread messages around different partitions. + String message = StringUtils.repeat("Hello World!", 100); int threadsCount = 20; int batchesCount = 20; int batchSize = 20; @@ -247,13 +251,19 @@ void testExactlyOnceDelivery(int scenario) throws Exception { void testThatMessagesWithSameKeyEndUpInOnePartition(boolean deferUntilCommit) { setupConfig(deferUntilCommit); - String message = "Hello World!"; + String protoMessage = "Hello Estonia!"; + String message = StringUtils.repeat(protoMessage, 100); String key = "GrailsRocks"; - int n = 20; + int n = 200; ConcurrentHashMap partitionsMap = new ConcurrentHashMap<>(); AtomicInteger receivedCount = new AtomicInteger(); Consumer> messageCounter = cr -> { + var testEvent = ExceptionUtils.doUnchecked(() -> objectMapper.readValue(cr.value(), TestEvent.class)); + if (!message.equals(testEvent.getMessage())) { + throw new IllegalStateException("Unexpected message '" + message + "' received."); + } + partitionsMap.computeIfAbsent(cr.partition(), (k) -> new AtomicInteger()).incrementAndGet(); receivedCount.incrementAndGet(); }; @@ -462,11 +472,16 @@ void testThatMessagesOrderForAnEntityIsPreservedWithBatches(boolean deferUntilCo @ValueSource(booleans = {false, true}) @SneakyThrows void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean deferUntilCommit) { - setupConfig(deferUntilCommit); + try { + setupConfig(deferUntilCommit); - assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8))))) - .hasMessageContaining("Topic NotExistingTopic not present in metadata"); + assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender + .sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8))))) + .hasMessageContaining("Topic NotExistingTopic not present in metadata"); + } finally { + // Stop logs spam about not existing topic in metadata. + tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); + } } @ParameterizedTest diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaMetricsIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaMetricsIntTest.java index b0c703b..952fa63 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaMetricsIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaMetricsIntTest.java @@ -1,69 +1,39 @@ package com.transferwise.kafka.tkms; import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.transferwise.common.baseutils.ExceptionUtils; -import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; -import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender; -import com.transferwise.kafka.tkms.api.TkmsMessage; +import com.transferwise.kafka.tkms.api.TkmsShardPartition; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.test.BaseIntTest; -import com.transferwise.kafka.tkms.test.TestMessagesListener; -import com.transferwise.kafka.tkms.test.TestProperties; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import java.util.concurrent.TimeUnit; +import lombok.SneakyThrows; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; class KafkaMetricsIntTest extends BaseIntTest { - @Autowired - private ObjectMapper objectMapper; - - @Autowired - private ITransactionalKafkaMessageSender transactionalKafkaMessageSender; - - @Autowired - private TestMessagesListener testMessagesListener; - - @Autowired - private TestProperties testProperties; @Autowired - protected ITransactionsHelper transactionsHelper; + private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider; @Test + @SneakyThrows void testThatProducerMetricShowsSentMessage() { - String message = "Hello Toomas!"; - - AtomicInteger receivedCount = new AtomicInteger(); - Consumer> messageCounter = cr -> ExceptionUtils.doUnchecked(() -> { - TestMessagesListener.TestEvent receivedEvent = objectMapper.readValue(cr.value(), TestMessagesListener.TestEvent.class); - if (receivedEvent.getMessage().equals(message)) { - receivedCount.incrementAndGet(); - } else { - throw new IllegalStateException("Wrong message receive: " + receivedEvent.getMessage()); - } - }); + var producer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(0, 0), UseCase.TEST); - testMessagesListener.registerConsumer(messageCounter); - try { - TestMessagesListener.TestEvent testEvent = new TestMessagesListener.TestEvent().setId(1L).setMessage(message); + producer.send(new ProducerRecord<>(testProperties.getTestTopic(), new byte[]{})).get(5, TimeUnit.SECONDS); - transactionsHelper.withTransaction().run(() -> - transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()) - .setValue(ExceptionUtils.doUnchecked(() -> objectMapper.writeValueAsBytes(testEvent))))); + assertThat(getAccumulativeCount("kafka.producer.record.send.total")) + .as("Producer's metric shows one message sent.").isPositive(); + } - await().until(() -> receivedCount.get() > 0); - waitUntilTablesAreEmpty(); - } finally { - testMessagesListener.unregisterConsumer(messageCounter); + protected double getAccumulativeCount(String metricsName) { + double sum = 0d; + for (var counter : meterRegistry.find(metricsName).functionCounters()) { + sum += counter.count(); } - - assertThat(meterRegistry.find("kafka.producer.record.send.total").tags().functionCounter().count()) - .as("Producer's metric shows one message sent.").isPositive(); + return sum; } } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTest.java index 8abc316..514e4b2 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTest.java @@ -2,6 +2,8 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.transferwise.kafka.tkms.api.TkmsShardPartition; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.test.BaseIntTest; import java.lang.reflect.Field; import org.apache.kafka.clients.producer.KafkaProducer; @@ -16,7 +18,7 @@ class TkmsKafkaProducerProviderTest extends BaseIntTest { @Test void shardKafkaPropertiesAreApplied() throws Exception { - KafkaProducer kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(1); + KafkaProducer kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(1, 0), UseCase.PROXY); Field producerConfigField = kafkaProducer.getClass().getDeclaredField("producerConfig"); producerConfigField.setAccessible(true); diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/BaseIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/BaseIntTest.java index 4a783d9..52ca713 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/BaseIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/BaseIntTest.java @@ -36,10 +36,13 @@ public class BaseIntTest { @Autowired protected TkmsProperties tkmsProperties; - + @Autowired protected TkmsTestDao tkmsTestDao; + @Autowired + protected TestProperties testProperties; + @AfterEach public void cleanup() { tkmsRegisteredMessagesCollector.clear(); @@ -52,17 +55,14 @@ public void cleanup() { @BeforeEach public void setup() { for (Meter meter : meterRegistry.getMeters()) { - if (!(meter instanceof Gauge) && !(isKafkaProducerMeter(meter))) { + if (!(meter instanceof Gauge)) { meterRegistry.remove(meter); } } meterCache.clear(); - TkmsClockHolder.reset(); - } - private boolean isKafkaProducerMeter(Meter meter) { - return meter.getId().getName().startsWith("kafka.producer."); + TkmsClockHolder.reset(); } protected int getTablesRowsCount() { diff --git a/tw-tkms-starter/src/test/resources/docker-compose.yml b/tw-tkms-starter/src/test/resources/docker-compose.yml index b5b9e60..8895407 100644 --- a/tw-tkms-starter/src/test/resources/docker-compose.yml +++ b/tw-tkms-starter/src/test/resources/docker-compose.yml @@ -2,14 +2,7 @@ version: '3.7' services: zookeeper: - image: bitnami/zookeeper:3.5.5 - ports: - - "2181" - environment: - ALLOW_ANONYMOUS_LOGIN: "yes" - JVMFLAGS: -server -Xms25m -Xmx512m -XX:+HeapDumpOnOutOfMemoryError -XX:GCHeapFreeLimit=5 -XX:GCTimeLimit=90 -XX:SoftRefLRUPolicyMSPerMB=5 -XX:MinHeapFreeRatio=10 -XX:MaxHeapFreeRatio=20 -XX:+ExplicitGCInvokesConcurrent - kafka-zk: - image: bitnami/zookeeper:3.4.14 + image: bitnami/zookeeper:3.7.1 ports: - "2181" environment: @@ -17,22 +10,20 @@ services: JVMFLAGS: -server -Xms25m -Xmx512m -XX:+HeapDumpOnOutOfMemoryError -XX:GCHeapFreeLimit=5 -XX:GCTimeLimit=90 -XX:SoftRefLRUPolicyMSPerMB=5 -XX:MinHeapFreeRatio=10 -XX:MaxHeapFreeRatio=20 -XX:+ExplicitGCInvokesConcurrent kafka: image: wurstmeister/kafka:2.12-2.4.1 - depends_on: - - kafka-zk ports: - "9092" container_name: "tw_tkms_kafka" environment: PORT_COMMAND: "docker port $$(docker ps -q -f name=tw_tkms_kafka) 9092/tcp | cut -d: -f2" KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: kafka-zk:2181 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: "EXTERNAL://localhost:_{PORT_COMMAND},INTERNAL://kafka:9093" KAFKA_LISTENERS: EXTERNAL://:9092,INTERNAL://:9093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_MESSAGE_MAX_BYTES: 10485760 - KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 10000 + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 20000 KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "true" KAFKA_LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 5 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false"