Separate kafka producers for each proxy (#78)
* Improves reliability by separating Kafka producers.
onukristo authored Dec 19, 2023
1 parent accd398 commit b6cb80a
Showing 22 changed files with 304 additions and 165 deletions.
24 changes: 9 additions & 15 deletions .github/workflows/build.yml
@@ -18,10 +18,9 @@ jobs:
max-parallel: 100
matrix:
spring_boot_version:
- 3.1.2
- 3.0.7
- 2.7.13
- 2.6.15
- 3.1.6
- 3.0.13
- 2.7.18
env:
SPRING_BOOT_VERSION: ${{ matrix.spring_boot_version }}
GRADLE_OPTS: "-Djava.security.egd=file:/dev/./urandom -Dorg.gradle.parallel=true"
@@ -31,7 +30,7 @@ jobs:
TW_TKMS_KAFKA_TCP_9092: 9092
TW_TKMS_KAFKA_TCP_HOST: kafka1
ZOOKEEPER_TCP_2181: 2181
ZOOKEEPER_TCP_HOST: zk-service1
ZOOKEEPER_TCP_HOST: zookeeper1
POSTGRES_TCP_HOST: postgres1
POSTGRES_TCP_5432: 5432
container:
@@ -46,21 +45,16 @@ jobs:
image: transferwiseworkspace/postgres12
env:
POSTGRES_PASSWORD: example-password-change-me
zk-service1:
image: bitnami/zookeeper:3.5.5
env:
ALLOW_ANONYMOUS_LOGIN: "yes"
JVMFLAGS: "-Xmx512m -Xms64m"
zk1:
image: bitnami/zookeeper:3.4.14
zookeeper1:
image: bitnami/zookeeper:3.7.1
env:
ALLOW_ANONYMOUS_LOGIN: "yes"
JVMFLAGS: "-Xmx512m -Xms64m"
kafka1:
image: wurstmeister/kafka:2.12-2.2.0
env:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: zk1:2181
KAFKA_ZOOKEEPER_CONNECT: zookeeper1:2181
KAFKA_LISTENERS: PLAINTEXT://:9092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_MESSAGE_MAX_BYTES: "10485760"
@@ -78,7 +72,7 @@ jobs:
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
with:
gradle-version: 8.1.1
gradle-version: 8.5
gradle-home-cache-cleanup: true
# Comment out when you are upgrading gradle in a branch and doing tons of commits you would need to test.
# cache-read-only: false
@@ -157,7 +151,7 @@ jobs:
- name: Setup Gradle
uses: gradle/gradle-build-action@v2
with:
gradle-version: 8.1.1
gradle-version: 8.5
gradle-home-cache-cleanup: true
# Comment out when you are upgrading gradle in a branch and doing tons of commits you would need to test.
# cache-read-only: false
32 changes: 29 additions & 3 deletions CHANGELOG.md
@@ -5,8 +5,34 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.26.0] - 2023-12-14

### Removed

- Support for Spring Boot 2.6.

### Changed

- Every proxy has its own, independent Kafka producer.
Previously, one producer was shared by all partitions, and the default shard's producer was also used for topic validation.

- The Kafka producer's `flush()` can now be interrupted from another thread by a separate housekeeping service.
Wise had an incident where the `flush()` call hung forever, and it was not easy to determine that this was the case.
Now we will at least get clear error logs when this happens.

- Proxies' Kafka producers will be closed after the poll loop exits.
This allows recovering from unforeseen Kafka client bugs and also releases resources when another pod takes over the proxying.

- The default linger time on proxies' Kafka producers was increased from 5 ms to 1000 ms.
This allows potentially larger batches to form. We are not increasing latency substantially, because we override the
lingering mechanism via the `flush` call anyway.

- Enabled idempotency on producers and increased the in-flight requests count to 5 (see the configuration sketch below).
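
For readers of this commit, a minimal sketch of the producer settings the last two entries describe. The config keys are standard Kafka producer settings; the class and method names are illustrative only and not part of tw-tkms, which derives these values from its own properties.

```java
import java.util.Map;
import org.apache.kafka.clients.producer.ProducerConfig;

// Illustrative only: mirrors the changelog entries above, not the library's actual configuration code.
public class ProxyProducerConfigSketch {

  static Map<String, Object> proxyProducerOverrides() {
    return Map.of(
        // A longer linger lets larger batches form; latency stays bounded because the proxy
        // explicitly calls flush() after registering a batch of sends.
        ProducerConfig.LINGER_MS_CONFIG, 1000,
        // Idempotent producer with up to 5 in-flight requests per connection.
        ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true,
        ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
  }
}
```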

## [0.25.1] - 2023-10-30

### Added

- Setting METADATA_MAX_AGE_CONFIG to two minutes for producer

## [0.25.0] - 2023-08-09
@@ -56,9 +82,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
In case there is a rare long-running transaction and its messages need to get sent out.

* An option to defer insertion of messages into the database to the very end of a transaction, just before commit
- `deferMessageRegistrationUntilCommit`
This would generate fairly recent ids for the messages and earliest visible messages system has less chance to not see and thus skip those.
With that, the earliest visible message system can configure a fairly small look-back window and reduce CPU consumption even further.
- `deferMessageRegistrationUntilCommit`
This would generate fairly recent ids for the messages and earliest visible messages system has less chance to not see and thus skip those.
With that, the earliest visible message system can configure a fairly small look-back window and reduce CPU consumption even further.

It can also help to reduce total transaction latency, as all individual messages collected are sent out in batches.

2 changes: 2 additions & 0 deletions TODO.md
@@ -34,3 +34,5 @@ It’s possible to have our own partitioner and producer interceptor to avoid do

10. Consider changing default partitioner to `RoundRobinPartitioner`.
In that case, messages with <null> keys will be distributed more evenly and consumers can have smaller latencies (see the sketch below).

11. Restructure the docs around deferred message inserts. Make sure it plays together with "the risks" paragraph etc.
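
For illustration of item 10, a minimal sketch of switching a plain Kafka producer to `RoundRobinPartitioner`. This is generic Kafka client usage under assumed bootstrap and serializer settings, not tw-tkms configuration.

```java
import java.util.Map;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.RoundRobinPartitioner;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;

// Illustrative only: shows the partitioner switch the TODO item above considers.
public class RoundRobinPartitionerSketch {

  static KafkaProducer<String, byte[]> producer(String bootstrapServers) {
    var configs = Map.<String, Object>of(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers,
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class,
        // Spreads records with <null> keys across partitions instead of sticking to one partition per batch.
        ProducerConfig.PARTITIONER_CLASS_CONFIG, RoundRobinPartitioner.class);
    return new KafkaProducer<>(configs);
  }
}
```
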
4 changes: 1 addition & 3 deletions build.common.gradle
@@ -24,7 +24,7 @@ configurations {
testCompileProtoPath {
extendsFrom(local)
}
productionRuntimeClasspath{
productionRuntimeClasspath {
extendsFrom(local)
}
compileClasspath {
@@ -143,8 +143,6 @@ test {
}

tasks.findAll { it.name.startsWith("spotbugs") }*.configure {
effort = "max"

excludeFilter = file('../spotbugs-exclude.xml')

reports {
22 changes: 16 additions & 6 deletions build.gradle
@@ -1,19 +1,24 @@
import com.github.spotbugs.snom.Confidence
import com.github.spotbugs.snom.Effort
import org.eclipse.jgit.api.errors.RefAlreadyExistsException

buildscript {
if (!project.hasProperty("springBootVersion")) {
ext.springBootVersion = System.getenv("SPRING_BOOT_VERSION") ?: "2.7.18"
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.9.3'
classpath "com.avast.gradle:gradle-docker-compose-plugin:0.16.12"
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.9.4'
classpath "com.avast.gradle:gradle-docker-compose-plugin:0.17.5"
}
}

plugins {
id 'idea'
id 'org.springframework.boot' version '2.6.14' apply false
id "com.github.spotbugs" version "5.0.14" apply false
id 'org.springframework.boot' version "$springBootVersion" apply false
id "com.github.spotbugs" version "6.0.2"
id "at.zierler.yamlvalidator" version "1.5.0"
id 'org.ajoberstar.grgit' version '5.2.0'
id 'io.github.gradle-nexus.publish-plugin' version "1.1.0"
id 'org.ajoberstar.grgit' version '5.2.1'
id 'io.github.gradle-nexus.publish-plugin' version "1.3.0"
id 'com.github.johnrengelman.shadow' version '8.1.1' apply false
}

@@ -23,6 +28,11 @@ idea.project {
targetBytecodeVersion = JavaVersion.VERSION_17
}

spotbugs {
effort = Effort.valueOf('MAX')
reportLevel = Confidence.valueOf('DEFAULT')
}

yamlValidator {
searchPaths = ['.circleci/']
allowDuplicates = false
26 changes: 13 additions & 13 deletions build.libraries.gradle
@@ -1,30 +1,30 @@
ext {
protobufVersion = "3.22.4"
springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: '2.6.15'}"
protobufVersion = "3.24.0"
springBootVersion = "${System.getenv("SPRING_BOOT_VERSION") ?: '2.7.18'}"
libraries = [
// version defined
awaitility : 'org.awaitility:awaitility:4.2.0',
commonsIo : 'commons-io:commons-io:2.11.0',
curatorFramework : 'org.apache.curator:curator-framework:5.5.0',
curatorRecipes : 'org.apache.curator:curator-recipes:5.5.0',
guava : 'com.google.guava:guava:31.1-jre',
guava : 'com.google.guava:guava:32.1.3-jre',
jakartaValidationApi : 'jakarta.validation:jakarta.validation-api:3.0.2',
javaxValidationApi : "javax.validation:validation-api:2.0.1.Final",
kafkaStreams : 'org.apache.kafka:kafka-streams:3.4.0',
kafkaStreams : 'org.apache.kafka:kafka-streams:3.2.3',
lz4Java : 'org.lz4:lz4-java:1.8.0',
protobufJava : "com.google.protobuf:protobuf-java:${protobufVersion}",
semver4j : "com.vdurmont:semver4j:3.1.0",
snappyJava : 'org.xerial.snappy:snappy-java:1.1.10.1',
snappyJava : 'org.xerial.snappy:snappy-java:1.1.10.4',
spotbugsAnnotations : "com.github.spotbugs:spotbugs-annotations:${spotbugs.toolVersion.get()}",
springBootDependencies : "org.springframework.boot:spring-boot-dependencies:${springBootVersion}",
twBaseUtils : 'com.transferwise.common:tw-base-utils:1.10.1',
twContext : 'com.transferwise.common:tw-context:0.12.0',
twContextStarter : 'com.transferwise.common:tw-context-starter:0.12.0',
twGracefulShutdown : 'com.transferwise.common:tw-graceful-shutdown:2.11.0',
twGracefulShutdownInterfaces : 'com.transferwise.common:tw-graceful-shutdown-interfaces:2.11.0',
twLeaderSelector : 'com.transferwise.common:tw-leader-selector:1.10.0',
twLeaderSelectorStarter : 'com.transferwise.common:tw-leader-selector-starter:1.10.0',
zstdJni : 'com.github.luben:zstd-jni:1.5.0-4',
twBaseUtils : 'com.transferwise.common:tw-base-utils:1.12.1',
twContext : 'com.transferwise.common:tw-context:1.0.0',
twContextStarter : 'com.transferwise.common:tw-context-starter:1.0.0',
twGracefulShutdown : 'com.transferwise.common:tw-graceful-shutdown:2.14.2',
twGracefulShutdownInterfaces : 'com.transferwise.common:tw-graceful-shutdown-interfaces:2.14.2',
twLeaderSelector : 'com.transferwise.common:tw-leader-selector:1.10.1',
twLeaderSelectorStarter : 'com.transferwise.common:tw-leader-selector-starter:1.10.1',
zstdJni : 'com.github.luben:zstd-jni:1.5.2-1',

// versions managed by spring-boot-dependencies platform
commonsLang3 : 'org.apache.commons:commons-lang3',
2 changes: 1 addition & 1 deletion docs/setup.md
@@ -1,6 +1,6 @@
# Setup

We are assuming you are using Spring Boot, at least version 2.5.
We are assuming you are using Spring Boot, at least version 2.7.

First ensure that you have the `mavenCentral` repository available in your Gradle buildscript:

2 changes: 1 addition & 1 deletion gradle.properties
@@ -1 +1 @@
version=0.25.1
version=0.26.0
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-8.1.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-8.5-bin.zip
networkTimeout=10000
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package com.transferwise.kafka.tkms;

import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle;
import java.time.Duration;

public interface ITkmsInterrupterService {

TaskHandle interruptAfter(Thread t, Duration duration);

/**
* Cancels the previously set interruption task.
*
* <p>The handle has to be the one returned from the `interruptAfter` call.
*/
void cancelInterruption(TaskHandle handler);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package com.transferwise.kafka.tkms;

import com.transferwise.common.baseutils.concurrency.IExecutorServicesProvider;
import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor;
import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle;
import java.time.Duration;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;

@Slf4j
public class TkmsInterrupterService implements ITkmsInterrupterService, InitializingBean {

@Autowired
private IExecutorServicesProvider executorServicesProvider;
private ScheduledTaskExecutor scheduledTaskExecutor;

public void afterPropertiesSet() {
this.scheduledTaskExecutor = executorServicesProvider.getGlobalScheduledTaskExecutor();
}

@Override
public TaskHandle interruptAfter(Thread t, Duration duration) {
return scheduledTaskExecutor.scheduleOnce(() -> {
var threadName = Thread.currentThread().getName();
try {
Thread.currentThread().setName("tkms-interrupt");
log.warn("Had to interrupt thread '{}'.", t.getName());
t.interrupt();
} finally {
Thread.currentThread().setName(threadName);
}
}, duration);
}

@Override
public void cancelInterruption(TaskHandle handler) {
handler.stop();
}
}
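
A minimal usage sketch for the interrupter above, mirroring how the proxy wraps `kafkaProducer.flush()` further down in this commit. The surrounding class, the 15-second duration, and the `blockingCall()` placeholder are assumptions for illustration; only `ITkmsInterrupterService`, `interruptAfter`, and `cancelInterruption` come from the interface itself.

```java
import com.transferwise.common.baseutils.concurrency.ScheduledTaskExecutor.TaskHandle;
import com.transferwise.kafka.tkms.ITkmsInterrupterService;
import java.time.Duration;

// Illustrative only: shows the intended interruptAfter/cancelInterruption pairing.
public class TkmsInterrupterUsageSketch {

  private final ITkmsInterrupterService interrupterService;

  public TkmsInterrupterUsageSketch(ITkmsInterrupterService interrupterService) {
    this.interrupterService = interrupterService;
  }

  void runWithInterruptionSafetyNet() {
    // Interrupt the current thread if the blocking call has not returned within 15 seconds.
    TaskHandle handle = interrupterService.interruptAfter(Thread.currentThread(), Duration.ofSeconds(15));
    try {
      blockingCall();
    } finally {
      // Always cancel, so a call that finished in time does not get interrupted later.
      interrupterService.cancelInterruption(handle);
    }
  }

  void blockingCall() {
    // Hypothetical placeholder for a call that can hang, e.g. a Kafka producer flush().
  }
}
```
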
Original file line number Diff line number Diff line change
@@ -17,6 +17,7 @@
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import com.transferwise.kafka.tkms.config.ITkmsDaoProvider;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase;
import com.transferwise.kafka.tkms.config.TkmsProperties;
import com.transferwise.kafka.tkms.dao.ITkmsDao.MessageRecord;
import com.transferwise.kafka.tkms.metrics.ITkmsMetricsTemplate;
@@ -41,8 +42,10 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.commons.lang3.mutable.MutableObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
@@ -77,6 +80,8 @@ public class TkmsStorageToKafkaProxy implements GracefulShutdownStrategy, ITkmsS
private ITkmsMessageInterceptors messageIntereceptors;
@Autowired
private SharedReentrantLockBuilderFactory lockBuilderFactory;
@Autowired
private ITkmsInterrupterService tkmsInterrupterService;

@TestOnly
private volatile boolean paused = false;
@@ -87,7 +92,6 @@ public class TkmsStorageToKafkaProxy implements GracefulShutdownStrategy, ITkmsS
private final List<LeaderSelectorV2> leaderSelectors = new ArrayList<>();
private RateLimiter exceptionRateLimiter = RateLimiter.create(2);


@Override
public void afterPropertiesSet() {
for (int s = 0; s < properties.getShardsCount(); s++) {
@@ -153,6 +157,16 @@ public void afterPropertiesSet() {
}

private void poll(Control control, TkmsShardPartition shardPartition) {
var kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(shardPartition, UseCase.PROXY);

try {
poll0(control, shardPartition, kafkaProducer);
} finally {
tkmsKafkaProducerProvider.closeKafkaProducer(shardPartition, UseCase.PROXY);
}
}

private void poll0(Control control, TkmsShardPartition shardPartition, KafkaProducer<String, byte[]> kafkaProducer) {

int pollerBatchSize = properties.getPollerBatchSize(shardPartition.getShard());
long startTimeMs = System.currentTimeMillis();
@@ -257,7 +271,7 @@ private void poll(Control control, TkmsShardPartition shardPartition) {
var contexts = new MessageProcessingContext[records.size()];

final var kafkaSendStartNanoTime = System.nanoTime();
var kafkaProducer = tkmsKafkaProducerProvider.getKafkaProducer(shardPartition.getShard());

boolean atLeastOneSendDone = false;

producerRecordMap.clear();
@@ -335,7 +349,13 @@ private void poll(Control control, TkmsShardPartition shardPartition) {
}

if (atLeastOneSendDone) {
kafkaProducer.flush();
var interruptionHandle =
tkmsInterrupterService.interruptAfter(Thread.currentThread(), properties.getInternals().getFlushInterruptionDuration());
try {
kafkaProducer.flush();
} finally {
tkmsInterrupterService.cancelInterruption(interruptionHandle);
}
}

for (int i = 0; i < records.size(); i++) {
@@ -372,6 +392,10 @@ private void poll(Control control, TkmsShardPartition shardPartition) {
if (failedSendsCount.get() > 0) {
proxyCyclePauseRequest.setValue(tkmsPaceMaker.getPollingPauseOnError(shardPartition));
}
} catch (InterruptException e) {
log.error("Kafka producer was interrupted for " + shardPartition + ".", e);
// Rethrow and force the recreation of the producer.
throw e;
} catch (Throwable t) {
log.error(t.getMessage(), t);
proxyCyclePauseRequest.setValue(tkmsPaceMaker.getPollingPauseOnError(shardPartition));
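
The diff above only shows the consuming side of the new provider contract. As a hedged sketch, a per-`(shard partition, use case)` producer cache could look roughly like the following; `TkmsShardPartition`, `UseCase`, and the two method signatures mirror the calls in the code above, while the class name, the cache key format, and the `createProducer` stub are assumptions and do not claim to be the library's real `ITkmsKafkaProducerProvider` implementation.

```java
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import com.transferwise.kafka.tkms.config.ITkmsKafkaProducerProvider.UseCase;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.kafka.clients.producer.KafkaProducer;

// Illustrative only: one independent producer per (shard partition, use case) instead of a shared one.
public class PerProxyProducerCacheSketch {

  private final Map<String, KafkaProducer<String, byte[]>> producers = new ConcurrentHashMap<>();

  public KafkaProducer<String, byte[]> getKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) {
    return producers.computeIfAbsent(key(shardPartition, useCase), k -> createProducer(shardPartition));
  }

  public void closeKafkaProducer(TkmsShardPartition shardPartition, UseCase useCase) {
    var producer = producers.remove(key(shardPartition, useCase));
    if (producer != null) {
      // Closing after the poll loop exits releases resources and recovers from stuck client state.
      producer.close();
    }
  }

  private String key(TkmsShardPartition shardPartition, UseCase useCase) {
    return shardPartition + "/" + useCase;
  }

  private KafkaProducer<String, byte[]> createProducer(TkmsShardPartition shardPartition) {
    // Configuration is omitted; in the real library it would be derived from TkmsProperties per shard.
    throw new UnsupportedOperationException("illustrative sketch only");
  }
}
```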