From 1705649b7d4c2659cb1e1f13c341d701b56f44a1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Fri, 15 Dec 2023 10:50:48 +0200 Subject: [PATCH 1/9] Storage --- .github/workflows/build.yml | 7 +- CHANGELOG.md | 30 +++++++- TODO.md | 2 + build.common.gradle | 4 +- build.gradle | 22 ++++-- build.libraries.gradle | 26 +++---- gradle.properties | 2 +- gradle/wrapper/gradle-wrapper.properties | 2 +- .../kafka/tkms/ITkmsInterrupterService.java | 17 ++++ .../kafka/tkms/TkmsInterrupterService.java | 42 ++++++++++ .../kafka/tkms/TkmsStorageToKafkaProxy.java | 30 +++++++- .../tkms/TransactionalKafkaMessageSender.java | 11 +-- .../config/ITkmsKafkaProducerProvider.java | 11 ++- .../kafka/tkms/config/TkmsConfiguration.java | 8 ++ .../config/TkmsKafkaProducerProvider.java | 77 ++++++++++--------- .../kafka/tkms/config/TkmsProperties.java | 9 +++ .../kafka/tkms/KafkaMetricsIntTest.java | 66 +++++----------- .../config/TkmsKafkaProducerProviderTest.java | 4 +- .../kafka/tkms/test/BaseIntTest.java | 12 +-- .../src/test/resources/docker-compose.yml | 15 +--- 20 files changed, 254 insertions(+), 143 deletions(-) create mode 100644 tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/ITkmsInterrupterService.java create mode 100644 tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsInterrupterService.java diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 94df6cb..5a52267 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -18,10 +18,9 @@ jobs: max-parallel: 100 matrix: spring_boot_version: - - 3.1.2 - - 3.0.7 - - 2.7.13 - - 2.6.15 + - 3.1.6 + - 3.0.13 + - 2.7.18 env: SPRING_BOOT_VERSION: ${{ matrix.spring_boot_version }} GRADLE_OPTS: "-Djava.security.egd=file:/dev/./urandom -Dorg.gradle.parallel=true" diff --git a/CHANGELOG.md b/CHANGELOG.md index 84e2dab..1b5af12 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,8 +5,32 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [0.26.0] - 2023-12-14 + +### Removed + +- Support for Spring Boot 2.6 . + +### Changed + +- Every proxy has its own, independent, Kafka producer. + Before, one producer was shared by all partitions. And, the default shard's producer was also used for topics validation. + +- Kafka producer's flush will be now interrupted from another thread, by a separate housekeeping service. + Wise had an incident, where the `flush()` call hanged forever, and it was not easy to derive that this is the case. + Now we will at least get clear error logs, when this happens. + +- Proxies' Kafka producers will be closed after the poll loop exits. + This would allow to recover from unforeseen kafka clients' bugs and also release resources when another pod takes over the proxying. + +- The default linger time on kafka producer was increased from 5 ms. to 1000 ms. + This would allow potentially larger batches to get formed. We are not increasing the latency, because we override the + lingering mechanism via `flush` call anyway. + ## [0.25.1] - 2023-10-30 + ### Added + - Setting METADATA_MAX_AGE_CONFIG to two minutes for producer ## [0.25.0] - 2023-08-09 @@ -56,9 +80,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 In case there is a rare long-running transaction and its messages need to get sent out. * An option to defer insertion of messages into the database to the very end of a transaction, just before commit - - `deferMessageRegistrationUntilCommit` - This would generate fairly recent ids for the messages and earliest visible messages system has less chance to not see and thus skip those. - With that, the earliest visible message system can configure a fairly small look-back window and reduce CPU consumption even further. + - `deferMessageRegistrationUntilCommit` + This would generate fairly recent ids for the messages and earliest visible messages system has less chance to not see and thus skip those. + With that, the earliest visible message system can configure a fairly small look-back window and reduce CPU consumption even further. It can also help to reduce total transaction latency, as all individual messages collected are sent out in batches. diff --git a/TODO.md b/TODO.md index 0464780..f60c76f 100644 --- a/TODO.md +++ b/TODO.md @@ -34,3 +34,5 @@ It’s possible to have our own partitioner and producer interceptor to avoid do 10. Consider changing default partitioner to `RoundRobinPartitioner`. In that keys messages with keys will be distributed more around and consumers can have smaller latencies. + +11. Restructure the docs around deferred messages inserts. Make sure it plays together with "the risks" paragraph etc. \ No newline at end of file diff --git a/build.common.gradle b/build.common.gradle index 1b16f2f..394bca4 100644 --- a/build.common.gradle +++ b/build.common.gradle @@ -24,7 +24,7 @@ configurations { testCompileProtoPath { extendsFrom(local) } - productionRuntimeClasspath{ + productionRuntimeClasspath { extendsFrom(local) } compileClasspath { @@ -143,8 +143,6 @@ test { } tasks.findAll { it.name.startsWith("spotbugs") }*.configure { - effort = "max" - excludeFilter = file('../spotbugs-exclude.xml') reports { diff --git a/build.gradle b/build.gradle index 3ffb09c..ee3084e 100644 --- a/build.gradle +++ b/build.gradle @@ -1,19 +1,24 @@ +import com.github.spotbugs.snom.Confidence +import com.github.spotbugs.snom.Effort import org.eclipse.jgit.api.errors.RefAlreadyExistsException buildscript { + if (!project.hasProperty("springBootVersion")) { + ext.springBootVersion = System.getenv("SPRING_BOOT_VERSION") ?: "2.7.18" + } dependencies { - classpath 'com.google.protobuf:protobuf-gradle-plugin:0.9.3' - classpath "com.avast.gradle:gradle-docker-compose-plugin:0.16.12" + classpath 'com.google.protobuf:protobuf-gradle-plugin:0.9.4' + classpath "com.avast.gradle:gradle-docker-compose-plugin:0.17.5" } } plugins { id 'idea' - id 'org.springframework.boot' version '2.6.14' apply false - id "com.github.spotbugs" version "5.0.14" apply false + id 'org.springframework.boot' version "$springBootVersion" apply false + id "com.github.spotbugs" version "6.0.2" id "at.zierler.yamlvalidator" version "1.5.0" - id 'org.ajoberstar.grgit' version '5.2.0' - id 'io.github.gradle-nexus.publish-plugin' version "1.1.0" + id 'org.ajoberstar.grgit' version '5.2.1' + id 'io.github.gradle-nexus.publish-plugin' version "1.3.0" id 'com.github.johnrengelman.shadow' version '8.1.1' apply false } @@ -23,6 +28,11 @@ idea.project { targetBytecodeVersion = JavaVersion.VERSION_17 } +spotbugs { + effort = Effort.valueOf('MAX') + reportLevel = Confidence.valueOf('DEFAULT') +} + yamlValidator { searchPaths = ['.circleci/'] allowDuplicates = false diff --git a/build.libraries.gradle b/build.libraries.gradle index f9abd78..9406fd0 100644 --- a/build.libraries.gradle +++ b/build.libraries.gradle @@ -1,30 +1,30 @@ ext { - protobufVersion = "3.22.4" - springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: '2.6.15'}" + protobufVersion = "3.24.0" + springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: '2.7.18'}" libraries = [ // version defined awaitility : 'org.awaitility:awaitility:4.2.0', commonsIo : 'commons-io:commons-io:2.11.0', curatorFramework : 'org.apache.curator:curator-framework:5.5.0', curatorRecipes : 'org.apache.curator:curator-recipes:5.5.0', - guava : 'com.google.guava:guava:31.1-jre', + guava : 'com.google.guava:guava:32.1.3-jre', jakartaValidationApi : 'jakarta.validation:jakarta.validation-api:3.0.2', javaxValidationApi : "javax.validation:validation-api:2.0.1.Final", - kafkaStreams : 'org.apache.kafka:kafka-streams:3.4.0', + kafkaStreams : 'org.apache.kafka:kafka-streams:3.2.3', lz4Java : 'org.lz4:lz4-java:1.8.0', protobufJava : "com.google.protobuf:protobuf-java:${protobufVersion}", semver4j : "com.vdurmont:semver4j:3.1.0", - snappyJava : 'org.xerial.snappy:snappy-java:1.1.10.1', + snappyJava : 'org.xerial.snappy:snappy-java:1.1.10.4', spotbugsAnnotations : "com.github.spotbugs:spotbugs-annotations:${spotbugs.toolVersion.get()}", springBootDependencies : "org.springframework.boot:spring-boot-dependencies:${springBootVersion}", - twBaseUtils : 'com.transferwise.common:tw-base-utils:1.10.1', - twContext : 'com.transferwise.common:tw-context:0.12.0', - twContextStarter : 'com.transferwise.common:tw-context-starter:0.12.0', - twGracefulShutdown : 'com.transferwise.common:tw-graceful-shutdown:2.11.0', - twGracefulShutdownInterfaces : 'com.transferwise.common:tw-graceful-shutdown-interfaces:2.11.0', - twLeaderSelector : 'com.transferwise.common:tw-leader-selector:1.10.0', - twLeaderSelectorStarter : 'com.transferwise.common:tw-leader-selector-starter:1.10.0', - zstdJni : 'com.github.luben:zstd-jni:1.5.0-4', + twBaseUtils : 'com.transferwise.common:tw-base-utils:1.12.1', + twContext : 'com.transferwise.common:tw-context:1.0.0', + twContextStarter : 'com.transferwise.common:tw-context-starter:1.0.0', + twGracefulShutdown : 'com.transferwise.common:tw-graceful-shutdown:2.14.2', + twGracefulShutdownInterfaces : 'com.transferwise.common:tw-graceful-shutdown-interfaces:2.14.2', + twLeaderSelector : 'com.transferwise.common:tw-leader-selector:1.10.1', + twLeaderSelectorStarter : 'com.transferwise.common:tw-leader-selector-starter:1.10.1', + zstdJni : 'com.github.luben:zstd-jni:1.5.2-1', // versions managed by spring-boot-dependencies platform commonsLang3 : 'org.apache.commons:commons-lang3', diff --git a/gradle.properties b/gradle.properties index b73a05f..6f6dd85 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1 +1 @@ -version=0.25.1 +version=0.26.0 diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 37aef8d..3499ded 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,6 +1,6 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip networkTimeout=10000 zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/ITkmsInterrupterService.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/ITkmsInterrupterService.java new file mode 100644 index 0000000..5005865 --- /dev/null +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/ITkmsInterrupterService.java @@ -0,0 +1,17 @@ +package com.transferwise.kafka.tkms; + +import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle; +import java.time.Duration; + +public interface ITkmsInterrupterService { + + TaskHandle interruptAfter(Thread t, Duration duration); + + /** + * Cancels the previously set interruption task. + * + *

The handle has to be the one returned from the `interruptAfter` call. + */ + void cancelInterruption(TaskHandle handler); + +} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsInterrupterService.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsInterrupterService.java new file mode 100644 index 0000000..4b4e9bd --- /dev/null +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsInterrupterService.java @@ -0,0 +1,42 @@ +package com.transferwise.kafka.tkms; + +import com.transferwise.common.baseutils.concurrency.IExecutorServicesProvider; +import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor; +import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle; +import java.time.Duration; +import javax.annotation.PostConstruct; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.InitializingBean; +import org.springframework.beans.factory.annotation.Autowired; + +@Slf4j +public class TkmsInterrupterService implements ITkmsInterrupterService, InitializingBean { + + @Autowired + private IExecutorServicesProvider executorServicesProvider; + private ScheduledTaskExecutor scheduledTaskExecutor; + + @PostConstruct + public void afterPropertiesSet() { + this.scheduledTaskExecutor = executorServicesProvider.getGlobalScheduledTaskExecutor(); + } + + @Override + public TaskHandle interruptAfter(Thread t, Duration duration) { + return scheduledTaskExecutor.scheduleOnce(() -> { + var threadName = Thread.currentThread().getName(); + try { + Thread.currentThread().setName("tkms-interrupt"); + log.warn("Had to interrupt thread '{}'.", t.getName()); + t.interrupt(); + } finally { + Thread.currentThread().setName(threadName); + } + }, duration); + } + + @Override + public void cancelInterruption(TaskHandle handler) { + handler.stop(); + } +} diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java index 677f979..fb72a50 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsStorageToKafkaProxy.java @@ -17,6 +17,7 @@ import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.ITkmsDaoProvider; import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.config.TkmsProperties; import com.transferwise.kafka.tkms.dao.ITkmsDao.MessageRecord; import com.transferwise.kafka.tkms.metrics.ITkmsMetricsTemplate; @@ -41,8 +42,10 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.mutable.MutableLong; import org.apache.commons.lang3.mutable.MutableObject; +import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.internals.RecordHeader; @@ -77,6 +80,8 @@ public class TkmsStorageToKafkaProxy implements GracefulShutdownStrategy, ITkmsS private ITkmsMessageInterceptors messageIntereceptors; @Autowired private SharedReentrantLockBuilderFactory lockBuilderFactory; + @Autowired + private ITkmsInterrupterService tkmsInterrupterService; @TestOnly private volatile boolean paused = false; @@ -87,7 +92,6 @@ public class TkmsStorageToKafkaProxy implements GracefulShutdownStrategy, ITkmsS private final List leaderSelectors = new ArrayList<>(); private RateLimiter exceptionRateLimiter = RateLimiter.create(2); - @Override public void afterPropertiesSet() { for (int s = 0; s < properties.getShardsCount(); s++) { @@ -153,6 +157,16 @@ public void afterPropertiesSet() { } private void poll(Control control, TkmsShardPartition shardPartition) { + var kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(shardPartition, UseCase.PROXY); + + try { + poll0(control, shardPartition, kafkaProducer); + } finally { + tkmsKafkaProducerProvider.closeKafkaProducer(shardPartition, UseCase.PROXY); + } + } + + private void poll0(Control control, TkmsShardPartition shardPartition, KafkaProducer kafkaProducer) { int pollerBatchSize = properties.getPollerBatchSize(shardPartition.getShard()); long startTimeMs = System.currentTimeMillis(); @@ -257,7 +271,7 @@ private void poll(Control control, TkmsShardPartition shardPartition) { var contexts = new MessageProcessingContext[records.size()]; final var kafkaSendStartNanoTime = System.nanoTime(); - var kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(shardPartition.getShard()); + boolean atLeastOneSendDone = false; producerRecordMap.clear(); @@ -335,7 +349,13 @@ private void poll(Control control, TkmsShardPartition shardPartition) { } if (atLeastOneSendDone) { - kafkaProducer.flush(); + var interruptionHandle = + tkmsInterrupterService.interruptAfter(Thread.currentThread(), properties.getInternals().getFlushInterruptionDuration()); + try { + kafkaProducer.flush(); + } finally { + tkmsInterrupterService.cancelInterruption(interruptionHandle); + } } for (int i = 0; i < records.size(); i++) { @@ -372,6 +392,10 @@ private void poll(Control control, TkmsShardPartition shardPartition) { if (failedSendsCount.get() > 0) { proxyCyclePauseRequest.setValue(tkmsPaceMaker.getPollingPauseOnError(shardPartition)); } + } catch (InterruptException e) { + log.error("Kafka producer was interrupted for " + shardPartition + ".", e); + // Rethrow and force the recreation of the producer. + throw e; } catch (Throwable t) { log.error(t.getMessage(), t); proxyCyclePauseRequest.setValue(tkmsPaceMaker.getPollingPauseOnError(shardPartition)); diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java index 1e16e30..be84834 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java @@ -14,6 +14,7 @@ import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.ITkmsDaoProvider; import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.config.TkmsProperties; import com.transferwise.kafka.tkms.config.TkmsProperties.DatabaseDialect; import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationLevel; @@ -71,7 +72,7 @@ public void afterPropertiesSet() { environmentValidator.validate(); for (String topic : properties.getTopics()) { - validateTopic(properties.getDefaultShard(), topic); + validateTopic(TkmsShardPartition.of(properties.getDefaultShard(), 0), topic); } validateDeleteBatchSizes(); @@ -168,7 +169,7 @@ public SendMessagesResult sendMessages(SendMessagesRequest request) { var topic = tkmsMessage.getTopic(); if (!validatedTopics.contains(topic)) { - validateTopic(shardPartition.getShard(), topic); + validateTopic(shardPartition, topic); validatedTopics.add(topic); } @@ -264,7 +265,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) { validateMessageSize(message, 0); var topic = message.getTopic(); - validateTopic(shardPartition.getShard(), topic); + validateTopic(shardPartition, topic); if (deferMessageRegistrationUntilCommit) { // Transaction is guaranteed to be active here. @@ -374,8 +375,8 @@ public void afterCompletion(int status) { /** * Every call to normal `KafkaProducer.send()` uses metadata for a topic as well, so should be very fast. */ - protected void validateTopic(int shard, String topic) { - kafkaProducerProvider.getKafkaProducer(shard).partitionsFor(topic); + protected void validateTopic(TkmsShardPartition shardPartition, String topic) { + kafkaProducerProvider.getKafkaProducer(shardPartition, UseCase.TOPIC_VALIDATION).partitionsFor(topic); } protected void validateMessages(SendMessagesRequest request) { diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java index d10d01b..449e183 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java @@ -1,10 +1,17 @@ package com.transferwise.kafka.tkms.config; +import com.transferwise.kafka.tkms.api.TkmsShardPartition; import org.apache.kafka.clients.producer.KafkaProducer; public interface ITkmsKafkaProducerProvider { - KafkaProducer getKafkaProducer(int shard); + KafkaProducer getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); - void closeKafkaProducer(int shard); + void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); + + enum UseCase { + PROXY, + TEST, + TOPIC_VALIDATION + } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsConfiguration.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsConfiguration.java index 0d94789..08978ed 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsConfiguration.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsConfiguration.java @@ -4,11 +4,13 @@ import com.transferwise.kafka.tkms.EnvironmentValidator; import com.transferwise.kafka.tkms.IEnvironmentValidator; import com.transferwise.kafka.tkms.IProblemNotifier; +import com.transferwise.kafka.tkms.ITkmsInterrupterService; import com.transferwise.kafka.tkms.ITkmsPaceMaker; import com.transferwise.kafka.tkms.ITkmsStorageToKafkaProxy; import com.transferwise.kafka.tkms.ITkmsZookeeperOperations; import com.transferwise.kafka.tkms.JavaxValidationEnvironmentValidator; import com.transferwise.kafka.tkms.ProblemNotifier; +import com.transferwise.kafka.tkms.TkmsInterrupterService; import com.transferwise.kafka.tkms.TkmsMessageInterceptors; import com.transferwise.kafka.tkms.TkmsPaceMaker; import com.transferwise.kafka.tkms.TkmsStorageToKafkaProxy; @@ -170,4 +172,10 @@ public TkmsClusterWideStateMonitor tkmsClusterWideStateMonitor() { public ProblemNotifier tkmsProblemNotifier() { return new ProblemNotifier(); } + + @Bean + @ConditionalOnMissingBean(ITkmsInterrupterService.class) + public TkmsInterrupterService tkmsInterrupterService() { + return new TkmsInterrupterService(); + } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java index 2ce4f24..dc7f98c 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java @@ -1,6 +1,7 @@ package com.transferwise.kafka.tkms.config; import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy; +import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.TkmsProperties.ShardProperties; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.binder.kafka.KafkaClientMetrics; @@ -8,7 +9,10 @@ import java.util.HashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import lombok.Data; +import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.ByteArraySerializer; @@ -24,13 +28,11 @@ public class TkmsKafkaProducerProvider implements ITkmsKafkaProducerProvider, Gr @Autowired private MeterRegistry meterRegistry; - private Map> kafkaProducers = new ConcurrentHashMap<>(); - - private Map kafkaClientMetrics = new HashMap<>(); + private Map, ProducerEntry> producers = new ConcurrentHashMap<>(); @Override - public KafkaProducer getKafkaProducer(int shard) { - return kafkaProducers.computeIfAbsent(shard, key -> { + public KafkaProducer getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) { + return producers.computeIfAbsent(Pair.of(shardPartition, useCase), key -> { Map configs = new HashMap<>(); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -43,58 +45,63 @@ public KafkaProducer getKafkaProducer(int shard) { configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000"); configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); configs.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "10000"); - configs.put(ProducerConfig.LINGER_MS_CONFIG, "5"); + configs.put(ProducerConfig.CLIENT_ID_CONFIG, + "tw-tkms-" + shardPartition.getShard() + "-" + shardPartition.getPartition() + "-" + useCase.name().toLowerCase()); + + if (useCase == UseCase.PROXY) { + // We use large lingering time, because we are calling the `.flush()` anyway. + configs.put(ProducerConfig.LINGER_MS_CONFIG, "1000"); + } configs.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "120000"); configs.putAll(tkmsProperties.getKafka()); - ShardProperties shardProperties = tkmsProperties.getShards().get(shard); + ShardProperties shardProperties = tkmsProperties.getShards().get(shardPartition.getShard()); if (shardProperties != null) { configs.putAll(shardProperties.getKafka()); } - KafkaProducer kafkaProducer = new KafkaProducer<>(configs); - kafkaClientMetrics.put(shard, new KafkaClientMetrics(kafkaProducer)); - kafkaClientMetrics.get(shard).bindTo(meterRegistry); - return kafkaProducer; - }); + final var producer = new KafkaProducer(configs); + final var kafkaClientMetrics = new KafkaClientMetrics(producer); + kafkaClientMetrics.bindTo(meterRegistry); + + return new ProducerEntry().setProducer(producer).setKafkaClientMetric(kafkaClientMetrics); + }).getProducer(); } @Override - public void closeKafkaProducer(int shard) { - KafkaClientMetrics kafkaClientMetric = kafkaClientMetrics.remove(shard); - if (kafkaClientMetric != null) { - kafkaClientMetric.close(); + public void closeKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) { + var producerEntry = producers.remove(Pair.of(shardPartition, useCase)); + + if (producerEntry == null) { + return; } - KafkaProducer producer = kafkaProducers.remove(shard); - if (producer != null) { - try { - producer.close(Duration.ofSeconds(5)); - } catch (Throwable t) { - log.error(t.getMessage(), t); - } + producerEntry.getKafkaClientMetric().close(); + + try { + producerEntry.getProducer().close(Duration.ofSeconds(5)); + } catch (Throwable t) { + log.error("Closing Kafka producer for shard partiton " + shardPartition + " failed.", t); } } @Override public void applicationTerminating() { - kafkaProducers.forEach((shard, producer) -> { - KafkaClientMetrics kafkaClientMetric = kafkaClientMetrics.remove(shard); - if (kafkaClientMetric != null) { - kafkaClientMetric.close(); - } - - try { - producer.close(Duration.ofSeconds(5)); - } catch (Throwable t) { - log.error(t.getMessage(), t); - } - }); + producers.keySet().forEach(key -> closeKafkaProducer(key.getLeft(), key.getRight())); } @Override public boolean canShutdown() { return true; } + + @Data + @Accessors(chain = true) + protected static class ProducerEntry { + + private KafkaProducer producer; + + private KafkaClientMetrics kafkaClientMetric; + } } diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java index b4f0f85..cd49239 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsProperties.java @@ -523,6 +523,15 @@ public static class Monitoring { public static class Internals { private int assertionLevel = 0; + + /** + * We use quite large duration. + * + *

But the idea is to at least detect hangs and possibly allow self-recovery. + * + *

Too low values may cause very large batches to fail. + */ + private Duration flushInterruptionDuration = Duration.ofSeconds(30); } public enum NotificationLevel { diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaMetricsIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaMetricsIntTest.java index b0c703b..952fa63 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaMetricsIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/KafkaMetricsIntTest.java @@ -1,69 +1,39 @@ package com.transferwise.kafka.tkms; import static org.assertj.core.api.Assertions.assertThat; -import static org.awaitility.Awaitility.await; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.transferwise.common.baseutils.ExceptionUtils; -import com.transferwise.common.baseutils.transactionsmanagement.ITransactionsHelper; -import com.transferwise.kafka.tkms.api.ITransactionalKafkaMessageSender; -import com.transferwise.kafka.tkms.api.TkmsMessage; +import com.transferwise.kafka.tkms.api.TkmsShardPartition; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.test.BaseIntTest; -import com.transferwise.kafka.tkms.test.TestMessagesListener; -import com.transferwise.kafka.tkms.test.TestProperties; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; +import java.util.concurrent.TimeUnit; +import lombok.SneakyThrows; +import org.apache.kafka.clients.producer.ProducerRecord; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; class KafkaMetricsIntTest extends BaseIntTest { - @Autowired - private ObjectMapper objectMapper; - - @Autowired - private ITransactionalKafkaMessageSender transactionalKafkaMessageSender; - - @Autowired - private TestMessagesListener testMessagesListener; - - @Autowired - private TestProperties testProperties; @Autowired - protected ITransactionsHelper transactionsHelper; + private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider; @Test + @SneakyThrows void testThatProducerMetricShowsSentMessage() { - String message = "Hello Toomas!"; - - AtomicInteger receivedCount = new AtomicInteger(); - Consumer> messageCounter = cr -> ExceptionUtils.doUnchecked(() -> { - TestMessagesListener.TestEvent receivedEvent = objectMapper.readValue(cr.value(), TestMessagesListener.TestEvent.class); - if (receivedEvent.getMessage().equals(message)) { - receivedCount.incrementAndGet(); - } else { - throw new IllegalStateException("Wrong message receive: " + receivedEvent.getMessage()); - } - }); + var producer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(0, 0), UseCase.TEST); - testMessagesListener.registerConsumer(messageCounter); - try { - TestMessagesListener.TestEvent testEvent = new TestMessagesListener.TestEvent().setId(1L).setMessage(message); + producer.send(new ProducerRecord<>(testProperties.getTestTopic(), new byte[]{})).get(5, TimeUnit.SECONDS); - transactionsHelper.withTransaction().run(() -> - transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setTopic(testProperties.getTestTopic()) - .setValue(ExceptionUtils.doUnchecked(() -> objectMapper.writeValueAsBytes(testEvent))))); + assertThat(getAccumulativeCount("kafka.producer.record.send.total")) + .as("Producer's metric shows one message sent.").isPositive(); + } - await().until(() -> receivedCount.get() > 0); - waitUntilTablesAreEmpty(); - } finally { - testMessagesListener.unregisterConsumer(messageCounter); + protected double getAccumulativeCount(String metricsName) { + double sum = 0d; + for (var counter : meterRegistry.find(metricsName).functionCounters()) { + sum += counter.count(); } - - assertThat(meterRegistry.find("kafka.producer.record.send.total").tags().functionCounter().count()) - .as("Producer's metric shows one message sent.").isPositive(); + return sum; } } diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTest.java index 8abc316..514e4b2 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProviderTest.java @@ -2,6 +2,8 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.transferwise.kafka.tkms.api.TkmsShardPartition; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.test.BaseIntTest; import java.lang.reflect.Field; import org.apache.kafka.clients.producer.KafkaProducer; @@ -16,7 +18,7 @@ class TkmsKafkaProducerProviderTest extends BaseIntTest { @Test void shardKafkaPropertiesAreApplied() throws Exception { - KafkaProducer kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(1); + KafkaProducer kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(TkmsShardPartition.of(1, 0), UseCase.PROXY); Field producerConfigField = kafkaProducer.getClass().getDeclaredField("producerConfig"); producerConfigField.setAccessible(true); diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/BaseIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/BaseIntTest.java index 4a783d9..52ca713 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/BaseIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/test/BaseIntTest.java @@ -36,10 +36,13 @@ public class BaseIntTest { @Autowired protected TkmsProperties tkmsProperties; - + @Autowired protected TkmsTestDao tkmsTestDao; + @Autowired + protected TestProperties testProperties; + @AfterEach public void cleanup() { tkmsRegisteredMessagesCollector.clear(); @@ -52,17 +55,14 @@ public void cleanup() { @BeforeEach public void setup() { for (Meter meter : meterRegistry.getMeters()) { - if (!(meter instanceof Gauge) && !(isKafkaProducerMeter(meter))) { + if (!(meter instanceof Gauge)) { meterRegistry.remove(meter); } } meterCache.clear(); - TkmsClockHolder.reset(); - } - private boolean isKafkaProducerMeter(Meter meter) { - return meter.getId().getName().startsWith("kafka.producer."); + TkmsClockHolder.reset(); } protected int getTablesRowsCount() { diff --git a/tw-tkms-starter/src/test/resources/docker-compose.yml b/tw-tkms-starter/src/test/resources/docker-compose.yml index b5b9e60..8895407 100644 --- a/tw-tkms-starter/src/test/resources/docker-compose.yml +++ b/tw-tkms-starter/src/test/resources/docker-compose.yml @@ -2,14 +2,7 @@ version: '3.7' services: zookeeper: - image: bitnami/zookeeper:3.5.5 - ports: - - "2181" - environment: - ALLOW_ANONYMOUS_LOGIN: "yes" - JVMFLAGS: -server -Xms25m -Xmx512m -XX:+HeapDumpOnOutOfMemoryError -XX:GCHeapFreeLimit=5 -XX:GCTimeLimit=90 -XX:SoftRefLRUPolicyMSPerMB=5 -XX:MinHeapFreeRatio=10 -XX:MaxHeapFreeRatio=20 -XX:+ExplicitGCInvokesConcurrent - kafka-zk: - image: bitnami/zookeeper:3.4.14 + image: bitnami/zookeeper:3.7.1 ports: - "2181" environment: @@ -17,22 +10,20 @@ services: JVMFLAGS: -server -Xms25m -Xmx512m -XX:+HeapDumpOnOutOfMemoryError -XX:GCHeapFreeLimit=5 -XX:GCTimeLimit=90 -XX:SoftRefLRUPolicyMSPerMB=5 -XX:MinHeapFreeRatio=10 -XX:MaxHeapFreeRatio=20 -XX:+ExplicitGCInvokesConcurrent kafka: image: wurstmeister/kafka:2.12-2.4.1 - depends_on: - - kafka-zk ports: - "9092" container_name: "tw_tkms_kafka" environment: PORT_COMMAND: "docker port $$(docker ps -q -f name=tw_tkms_kafka) 9092/tcp | cut -d: -f2" KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: kafka-zk:2181 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 KAFKA_ADVERTISED_LISTENERS: "EXTERNAL://localhost:_{PORT_COMMAND},INTERNAL://kafka:9093" KAFKA_LISTENERS: EXTERNAL://:9092,INTERNAL://:9093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_MESSAGE_MAX_BYTES: 10485760 - KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 10000 + KAFKA_ZOOKEEPER_SESSION_TIMEOUT_MS: 20000 KAFKA_UNCLEAN_LEADER_ELECTION_ENABLE: "true" KAFKA_LEADER_IMBALANCE_CHECK_INTERVAL_SECONDS: 5 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "false" From 5c34bd56c399d71479f2f61c7f32060a4374236a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Fri, 15 Dec 2023 10:56:34 +0200 Subject: [PATCH 2/9] Fix. --- .github/workflows/build.yml | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index 5a52267..cfb073a 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -45,13 +45,8 @@ jobs: image: transferwiseworkspace/postgres12 env: POSTGRES_PASSWORD: example-password-change-me - zk-service1: - image: bitnami/zookeeper:3.5.5 - env: - ALLOW_ANONYMOUS_LOGIN: "yes" - JVMFLAGS: "-Xmx512m -Xms64m" - zk1: - image: bitnami/zookeeper:3.4.14 + zookeeper1: + image: bitnami/zookeeper:3.7.1 env: ALLOW_ANONYMOUS_LOGIN: "yes" JVMFLAGS: "-Xmx512m -Xms64m" @@ -59,7 +54,7 @@ jobs: image: wurstmeister/kafka:2.12-2.2.0 env: KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zk1:2181 + KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181 KAFKA_LISTENERS: PLAINTEXT://:9092 KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 KAFKA_MESSAGE_MAX_BYTES: "10485760" @@ -77,7 +72,7 @@ jobs: - name: Setup Gradle uses: gradle/gradle-build-action@v2 with: - gradle-version: 8.1.1 + gradle-version: 8.5 gradle-home-cache-cleanup: true # Comment out when you are upgrading gradle in a branch and doing tons of commits you would need to test. # cache-read-only: false @@ -156,7 +151,7 @@ jobs: - name: Setup Gradle uses: gradle/gradle-build-action@v2 with: - gradle-version: 8.1.1 + gradle-version: 8.5 gradle-home-cache-cleanup: true # Comment out when you are upgrading gradle in a branch and doing tons of commits you would need to test. # cache-read-only: false From 51c7bcf4a9d03bb8a73cd9432532e55b0fb037ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Fri, 15 Dec 2023 10:57:27 +0200 Subject: [PATCH 3/9] Fix. --- .../com/transferwise/kafka/tkms/TkmsInterrupterService.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsInterrupterService.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsInterrupterService.java index 4b4e9bd..159a4f3 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsInterrupterService.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TkmsInterrupterService.java @@ -4,7 +4,6 @@ import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor; import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle; import java.time.Duration; -import javax.annotation.PostConstruct; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.InitializingBean; import org.springframework.beans.factory.annotation.Autowired; @@ -16,7 +15,6 @@ public class TkmsInterrupterService implements ITkmsInterrupterService, Initiali private IExecutorServicesProvider executorServicesProvider; private ScheduledTaskExecutor scheduledTaskExecutor; - @PostConstruct public void afterPropertiesSet() { this.scheduledTaskExecutor = executorServicesProvider.getGlobalScheduledTaskExecutor(); } From 86f369150c5e8be30aec5cb785e83e03b17a85bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Fri, 15 Dec 2023 11:13:49 +0200 Subject: [PATCH 4/9] Fix. --- .github/workflows/build.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index cfb073a..907bfae 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -30,7 +30,7 @@ jobs: TW_TKMS_KAFKA_TCP_9092: 9092 TW_TKMS_KAFKA_TCP_HOST: kafka1 ZOOKEEPER_TCP_2181: 2181 - ZOOKEEPER_TCP_HOST: zk-service1 + ZOOKEEPER_TCP_HOST: zookeeper1 POSTGRES_TCP_HOST: postgres1 POSTGRES_TCP_5432: 5432 container: From 82d94a45842efb39fce291e09ae7939d33fb4352 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Fri, 15 Dec 2023 12:20:45 +0200 Subject: [PATCH 5/9] Fix. --- .../tkms/TransactionalKafkaMessageSender.java | 15 +++++++------- .../config/ITkmsKafkaProducerProvider.java | 4 ++++ .../config/TkmsKafkaProducerProvider.java | 17 +++++++++++++++- .../kafka/tkms/EndToEndIntTest.java | 20 ++++++++++++++----- 4 files changed, 42 insertions(+), 14 deletions(-) diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java index be84834..a76f1b8 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/TransactionalKafkaMessageSender.java @@ -14,7 +14,6 @@ import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.ITkmsDaoProvider; import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; -import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase; import com.transferwise.kafka.tkms.config.TkmsProperties; import com.transferwise.kafka.tkms.config.TkmsProperties.DatabaseDialect; import com.transferwise.kafka.tkms.config.TkmsProperties.NotificationLevel; @@ -72,7 +71,7 @@ public void afterPropertiesSet() { environmentValidator.validate(); for (String topic : properties.getTopics()) { - validateTopic(TkmsShardPartition.of(properties.getDefaultShard(), 0), topic); + validateTopic(topic); } validateDeleteBatchSizes(); @@ -169,7 +168,7 @@ public SendMessagesResult sendMessages(SendMessagesRequest request) { var topic = tkmsMessage.getTopic(); if (!validatedTopics.contains(topic)) { - validateTopic(shardPartition, topic); + validateTopic(topic); validatedTopics.add(topic); } @@ -265,7 +264,7 @@ public SendMessageResult sendMessage(SendMessageRequest request) { validateMessageSize(message, 0); var topic = message.getTopic(); - validateTopic(shardPartition, topic); + validateTopic(topic); if (deferMessageRegistrationUntilCommit) { // Transaction is guaranteed to be active here. @@ -375,8 +374,8 @@ public void afterCompletion(int status) { /** * Every call to normal `KafkaProducer.send()` uses metadata for a topic as well, so should be very fast. */ - protected void validateTopic(TkmsShardPartition shardPartition, String topic) { - kafkaProducerProvider.getKafkaProducer(shardPartition, UseCase.TOPIC_VALIDATION).partitionsFor(topic); + protected void validateTopic(String topic) { + kafkaProducerProvider.getKafkaProducerForTopicValidation().partitionsFor(topic); } protected void validateMessages(SendMessagesRequest request) { @@ -393,7 +392,7 @@ protected void validateMessage(TkmsMessage message, int messageIdx) { Preconditions.checkArgument(message.getPartition() >= 0, "%s: Partition number can not be negative: %s", messageIdx, message.getPartition()); } if (message.getKey() != null) { - Preconditions.checkArgument(message.getKey().length() > 0, "%s: Key can not be an empty string.", messageIdx); + Preconditions.checkArgument(!message.getKey().isEmpty(), "%s: Key can not be an empty string.", messageIdx); } if (message.getShard() != null) { Preconditions.checkArgument(message.getShard() >= 0, "%s: Shard number can not be negative :%s", messageIdx, message.getShard()); @@ -463,7 +462,7 @@ private int utf8Length(CharSequence s) { protected void fireMessageRegisteredEvent(TkmsShardPartition shardPartition, Long id, TkmsMessage message) { var listeners = getTkmsEventsListeners(); if (log.isDebugEnabled()) { - log.debug("Message was registered for {} with storage id {}. Listeners count: ", shardPartition, id, listeners.size()); + log.debug("Message was registered for {} with storage id {}. Listeners count: {}.", shardPartition, id, listeners.size()); } if (tkmsEventsListeners.isEmpty()) { diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java index 449e183..b4c0717 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/ITkmsKafkaProducerProvider.java @@ -7,8 +7,12 @@ public interface ITkmsKafkaProducerProvider { KafkaProducer getKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); + KafkaProducer getKafkaProducerForTopicValidation(); + void closeKafkaProducer(TkmsShardPartition tkmsShardPartition, UseCase useCase); + void closeKafkaProducerForTopicValidation(); + enum UseCase { PROXY, TEST, diff --git a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java index dc7f98c..8b283c1 100644 --- a/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java +++ b/tw-tkms-starter/src/main/java/com/transferwise/kafka/tkms/config/TkmsKafkaProducerProvider.java @@ -40,7 +40,12 @@ public KafkaProducer getKafkaProducer(TkmsShardPartition shardPa configs.put(ProducerConfig.ACKS_CONFIG, "all"); configs.put(ProducerConfig.BATCH_SIZE_CONFIG, "163840"); configs.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, String.valueOf(tkmsProperties.getMaximumMessageBytes())); - configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1); + + // The following block is to guarantee the messages order. + configs.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); + configs.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); + configs.put(ProducerConfig.RETRIES_CONFIG, Integer.MAX_VALUE); + configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Please specify 'tw-tkms.kafka.bootstrap.servers'."); configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "5000"); configs.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "5000"); @@ -69,6 +74,11 @@ public KafkaProducer getKafkaProducer(TkmsShardPartition shardPa }).getProducer(); } + @Override + public KafkaProducer getKafkaProducerForTopicValidation() { + return getKafkaProducer(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), UseCase.TOPIC_VALIDATION); + } + @Override public void closeKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) { var producerEntry = producers.remove(Pair.of(shardPartition, useCase)); @@ -86,6 +96,11 @@ public void closeKafkaProducer(TkmsShardPartition shardPartition, UseCase useCas } } + @Override + public void closeKafkaProducerForTopicValidation() { + closeKafkaProducer(TkmsShardPartition.of(tkmsProperties.getDefaultShard(), 0), UseCase.TOPIC_VALIDATION); + } + @Override public void applicationTerminating() { producers.keySet().forEach(key -> closeKafkaProducer(key.getLeft(), key.getRight())); diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java index 6fb41f7..ddd029e 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java @@ -16,6 +16,7 @@ import com.transferwise.kafka.tkms.api.TkmsMessage.Header; import com.transferwise.kafka.tkms.api.TkmsShardPartition; import com.transferwise.kafka.tkms.config.ITkmsDaoProvider; +import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider; import com.transferwise.kafka.tkms.dao.FaultInjectedTkmsDao; import com.transferwise.kafka.tkms.metrics.TkmsMetricsTemplate; import com.transferwise.kafka.tkms.test.BaseIntTest; @@ -65,6 +66,8 @@ abstract class EndToEndIntTest extends BaseIntTest { private ITkmsDaoProvider tkmsDaoProvider; @Autowired private TkmsStorageToKafkaProxy tkmsStorageToKafkaProxy; + @Autowired + private ITkmsKafkaProducerProvider tkmsKafkaProducerProvider; private FaultInjectedTkmsDao faultInjectedTkmsDao; @@ -168,7 +171,8 @@ void testExactlyOnceDelivery(int scenario) throws Exception { setupConfig(deferUntilCommit); tkmsProperties.setValidateSerialization(validateSerialization); - String message = "Hello World!"; + // For producer to create more batches and spread messages around different partitions. + String message = StringUtils.repeat("Hello World!", 100); int threadsCount = 20; int batchesCount = 20; int batchSize = 20; @@ -462,11 +466,17 @@ void testThatMessagesOrderForAnEntityIsPreservedWithBatches(boolean deferUntilCo @ValueSource(booleans = {false, true}) @SneakyThrows void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean deferUntilCommit) { - setupConfig(deferUntilCommit); + try { + setupConfig(deferUntilCommit); - assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender - .sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8))))) - .hasMessageContaining("Topic NotExistingTopic not present in metadata"); + assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender + .sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8))))) + .hasMessageContaining("Topic NotExistingTopic not present in metadata"); + } + finally { + // Stop logs spam about not existing topic in metadata. + tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); + } } @ParameterizedTest From c208ab3ba72361c40c25215fb77298bc4ca12456 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Fri, 15 Dec 2023 12:22:13 +0200 Subject: [PATCH 6/9] Fix. --- CHANGELOG.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1b5af12..f84093f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -27,6 +27,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 This would allow potentially larger batches to get formed. We are not increasing the latency, because we override the lingering mechanism via `flush` call anyway. +- Enabled idempotency on producers and increased the in flight requests count to 5. + ## [0.25.1] - 2023-10-30 ### Added From dc041a76f3959002db7a1442c28fbf4c7a56df2d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Fri, 15 Dec 2023 12:34:08 +0200 Subject: [PATCH 7/9] Fix. --- .../test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java index ddd029e..2d3063a 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java @@ -472,8 +472,7 @@ void sendingToUnknownTopicWillBePreventedWhenTopicAutoCreationIsDisabled(boolean assertThatThrownBy(() -> transactionsHelper.withTransaction().run(() -> transactionalKafkaMessageSender .sendMessage(new TkmsMessage().setTopic("NotExistingTopic").setValue("Stuff".getBytes(StandardCharsets.UTF_8))))) .hasMessageContaining("Topic NotExistingTopic not present in metadata"); - } - finally { + } finally { // Stop logs spam about not existing topic in metadata. tkmsKafkaProducerProvider.closeKafkaProducerForTopicValidation(); } From f6a7cfcbabb4633be4b0daba7bd9bf8a7449e10c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Fri, 15 Dec 2023 12:47:36 +0200 Subject: [PATCH 8/9] Fix. --- .../com/transferwise/kafka/tkms/EndToEndIntTest.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java index 2d3063a..010d3ff 100644 --- a/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java +++ b/tw-tkms-starter/src/test/java/com/transferwise/kafka/tkms/EndToEndIntTest.java @@ -251,13 +251,19 @@ void testExactlyOnceDelivery(int scenario) throws Exception { void testThatMessagesWithSameKeyEndUpInOnePartition(boolean deferUntilCommit) { setupConfig(deferUntilCommit); - String message = "Hello World!"; + String protoMessage = "Hello Estonia!"; + String message = StringUtils.repeat(protoMessage, 100); String key = "GrailsRocks"; - int n = 20; + int n = 200; ConcurrentHashMap partitionsMap = new ConcurrentHashMap<>(); AtomicInteger receivedCount = new AtomicInteger(); Consumer> messageCounter = cr -> { + var testEvent = ExceptionUtils.doUnchecked(() -> objectMapper.readValue(cr.value(), TestEvent.class)); + if (!message.equals(testEvent.getMessage())) { + throw new IllegalStateException("Unexpected message '" + message + "' received."); + } + partitionsMap.computeIfAbsent(cr.partition(), (k) -> new AtomicInteger()).incrementAndGet(); receivedCount.incrementAndGet(); }; From 81eb6c68e00de75d816dda6d30af508675dffde3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Mon, 18 Dec 2023 16:47:16 +0200 Subject: [PATCH 9/9] Fix. --- CHANGELOG.md | 4 ++-- docs/setup.md | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f84093f..a152613 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,8 +23,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Proxies' Kafka producers will be closed after the poll loop exits. This would allow to recover from unforeseen kafka clients' bugs and also release resources when another pod takes over the proxying. -- The default linger time on kafka producer was increased from 5 ms. to 1000 ms. - This would allow potentially larger batches to get formed. We are not increasing the latency, because we override the +- The default linger time on proxies' kafka producer was increased from 5 ms. to 1000 ms. + This would allow potentially larger batches to get formed. We are not increasing the latency substantially, because we override the lingering mechanism via `flush` call anyway. - Enabled idempotency on producers and increased the in flight requests count to 5. diff --git a/docs/setup.md b/docs/setup.md index 0855b51..9805c34 100644 --- a/docs/setup.md +++ b/docs/setup.md @@ -1,6 +1,6 @@ # Setup -We are assuming you are using Spring Boot, at least version 2.5. +We are assuming you are using Spring Boot, at least version 2.7. First ensure that you have the `mavenCentral` repository available in your Gradle buildscript: