Avoiding and softening issues around corrupted messages. (#74)
onukristo authored Aug 10, 2023
1 parent 3acf194 commit e4ececc
Showing 12 changed files with 336 additions and 91 deletions.
21 changes: 18 additions & 3 deletions CHANGELOG.md
@@ -5,6 +5,21 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [0.25.0] - 2023-08-09

### Added

* Message id added to error logs.
* Message id added to MDC.
* `validateSerialization` option as a guardrail against corrupted gzip inflation, due to zlib bugs etc.
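The idea behind such a guardrail can be illustrated with a round-trip check: decompress the bytes you just compressed and compare them with the original payload before storing anything. This is a hypothetical sketch, not the library's actual implementation; class and method names are illustrative.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipRoundTripCheck {

  // Compresses the payload and immediately inflates it back, throwing if the
  // round-trip does not reproduce the original bytes (e.g. due to a zlib bug).
  static byte[] compressValidated(byte[] payload) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (GZIPOutputStream gzip = new GZIPOutputStream(bos)) {
      gzip.write(payload);
    }
    byte[] compressed = bos.toByteArray();

    byte[] inflated = inflate(compressed);
    if (!Arrays.equals(payload, inflated)) {
      throw new IllegalStateException("Gzip round-trip produced different bytes, refusing to store message.");
    }
    return compressed;
  }

  static byte[] inflate(byte[] compressed) throws IOException {
    try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
      return gzip.readAllBytes();
    }
  }

  public static void main(String[] args) throws IOException {
    byte[] original = "stored-message-payload".getBytes(StandardCharsets.UTF_8);
    byte[] compressed = compressValidated(original);
    System.out.println(Arrays.equals(original, inflate(compressed))); // prints "true"
  }
}
```

The trade-off is an extra inflation per message; a guardrail like this makes sense when corrupted-at-rest messages are costlier than the added CPU.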

### Changed

* `protobuf-java` is now shaded to avoid incompatibility issues in services.
  It is generally recommended to use the same `protobuf-java` version that was used to generate the Java stubs (in our case `StoredMessage`).
  Even though `protobuf-java` has historically had good backward compatibility, that is not guaranteed, and forward compatibility has been poor
  in our experience.

## [0.24.3] - 2023-08-01

### Added
@@ -37,9 +52,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
In case there is a rare long-running transaction and its messages need to get sent out.

* An option to defer insertion of messages into the database to the very end of a transaction, just before commit
  - `deferMessageRegistrationUntilCommit`
  This generates fairly recent ids for the messages, so the earliest visible messages system has less chance of missing and thus skipping them.
  With that, the earliest visible messages system can be configured with a fairly small look-back window, reducing CPU consumption even further.

It can also help to reduce total transaction latency, as all individual messages collected are sent out in batches.
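The deferral mechanism described above can be modelled roughly as follows. This is a toy sketch, not the library's actual code; all names are illustrative. Messages are buffered during the transaction and only receive ids at flush time, which is why the ids end up recent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

// Toy model of deferring message registration until just before commit:
// messages are buffered in memory during the transaction and only get their
// ids (from a monotonic sequence) at flush time, so the ids are always recent.
public class DeferredRegistrationSketch {

  static final AtomicLong ID_SEQUENCE = new AtomicLong();

  final List<String> buffered = new ArrayList<>();
  final List<Long> registeredIds = new ArrayList<>();

  void sendMessage(String payload) {
    buffered.add(payload); // no database insert yet
  }

  // Called by the transaction machinery just before commit.
  void beforeCommit() {
    for (String payload : buffered) {
      long id = ID_SEQUENCE.incrementAndGet(); // id allocated late, so it is "recent"
      registeredIds.add(id);
      // a real implementation would batch-insert the payloads into the outbox table here
    }
    buffered.clear();
  }
}
```

Because all buffered messages are flushed in one batch, the per-message insert latency is also taken out of the transaction body, which is where the reduced total transaction latency comes from.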

3 changes: 3 additions & 0 deletions build.common.gradle
@@ -45,6 +45,9 @@ configurations {
testAnnotationProcessor {
extendsFrom(local)
}
shadow {
extendsFrom(local)
}

all {
resolutionStrategy {
1 change: 1 addition & 0 deletions build.gradle
@@ -14,6 +14,7 @@ plugins {
id "at.zierler.yamlvalidator" version "1.5.0"
id 'org.ajoberstar.grgit' version '5.2.0'
id 'io.github.gradle-nexus.publish-plugin' version "1.1.0"
id 'com.github.johnrengelman.shadow' version '8.1.1' apply false
}

idea.project {
1 change: 1 addition & 0 deletions build.library.gradle
@@ -76,4 +76,5 @@ publishing {
}
}
}

}
2 changes: 1 addition & 1 deletion gradle.properties
@@ -1 +1 @@
version=0.24.3
version=0.25.0
120 changes: 119 additions & 1 deletion tw-tkms-starter/build.gradle
@@ -3,14 +3,19 @@ plugins {
id 'idea'
id "com.google.protobuf"
id "docker-compose"
id 'com.github.johnrengelman.shadow'
id 'maven-publish'
id 'signing'
}

ext.projectGitHubRepoName = "tw-tkms"
ext.projectScmUrl = "https://github.com/transferwise/${projectGitHubRepoName}"
ext.projectScmConnection = "scm:git:git://github.com/transferwise/${projectGitHubRepoName}.git"
ext.projectName = "tw-tkms-starter"
ext.projectDescription = "tw-tkms-starter"
ext.projectArtifactName = "tw-tkms-starter"

apply from: "$rootProject.rootDir/build.common.gradle"
apply from: "$rootProject.rootDir/build.library.gradle"

dependencies {
annotationProcessor libraries.springBootConfigurationProcessor
@@ -99,3 +104,116 @@ test {
dockerCompose.exposeAsEnvironment(test)
}
}

/*
Protobuf version in a service may not be compatible with our generated `StoredMessage`.
It is safer to shade the version we used.
*/
shadowJar {
  dependencies {
    exclude(dependency {
      it.moduleName != 'protobuf-java'
    })
  }
manifest {
attributes 'Implementation-Version': "$project.version"
}
relocate('com.google.protobuf', 'com.transferwise.kafka.tkms.shadow.com.google.protobuf')

// Minimize does not reduce the jar much (1.9->1.5 MB), so let's not risk/mess with that.
/*
minimize {}
*/
}

jar.enabled = false
jar.dependsOn shadowJar

shadowJar {
archiveClassifier.set('')
}

publishing {
publications {
entrypoints(MavenPublication) { publication ->
artifactId projectArtifactName

artifacts = [shadowJar, javadocJar, sourcesJar]
/*
This ensures that libraries will have explicit dependency versions in their Maven POM and Gradle module files, so that there would be less
ambiguity and less chances of dependency conflicts.
*/
versionMapping {
usage('java-api') {
fromResolutionOf('runtimeClasspath')
}
usage('java-runtime') {
fromResolutionOf('runtimeClasspath')
}
}

pom {
name = projectName
description = projectDescription
url = projectScmUrl
packaging = "jar"
licenses {
license {
name = 'The Apache License, Version 2.0, Copyright 2021 TransferWise Ltd'
url = 'http://www.apache.org/licenses/LICENSE-2.0.txt'
}
}
developers {
developer {
id = 'onukristo'
name = 'Kristo Kuusküll'
email = "[email protected]"
organization = "Transferwise Ltd"
organizationUrl = "https://github.com/transferwise"
}
}
scm {
connection = projectScmConnection
developerConnection = projectScmConnection
url = projectScmUrl
}
withXml { xml ->
def dependenciesNode = xml.asNode().get('dependencies') ?: xml.asNode().appendNode('dependencies')

project.configurations.getByName("runtimeClasspath").resolvedConfiguration.firstLevelModuleDependencies.forEach {
if (it.configuration != "platform-runtime" && it.moduleName != 'protobuf-java') {
def dependencyNode = dependenciesNode.appendNode('dependency')
dependencyNode.appendNode('groupId', it.moduleGroup)
dependencyNode.appendNode('artifactId', it.moduleName)
dependencyNode.appendNode('version', it.moduleVersion)
dependencyNode.appendNode('scope', 'runtime')
}
}

if (!asNode().dependencyManagement.isEmpty()) {
throw new IllegalStateException("There should not be any `dependencyManagement` block in POM.")
}
}
}
}
}

if (System.getenv("OSS_SIGNING_KEY")) {
signing {
useInMemoryPgpKeys(System.getenv("OSS_SIGNING_KEY"), System.getenv("OSS_SIGNING_PASSWORD"))
sign publishing.publications.entrypoints
}
}

repositories {
maven {
url System.getenv("MAVEN_URL")
credentials {
username = System.getenv("MAVEN_USER")
password = System.getenv("MAVEN_PASSWORD")
}
}
}
}
@@ -46,6 +46,7 @@
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.slf4j.MDC;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
@@ -223,7 +224,7 @@ private void poll(Control control, TkmsShardPartition shardPartition) {
// Essentially forces polling of all records
earliestMessageIdToUse = -1L;

log.info("Polling all messages for {}, to make sure we are not missing some created by long running transactions.",
log.info("Polling all messages for '{}', to make sure we are not missing some created by long running transactions.",
shardPartition);

lastPollAllTimeMs.setValue(System.currentTimeMillis());
@@ -271,57 +272,65 @@ private void poll(Control control, TkmsShardPartition shardPartition) {

for (int i = 0; i < records.size(); i++) {
MessageRecord messageRecord = records.get(i);
ProducerRecord<String, byte[]> preCreatedProducerRecord = producerRecordMap.get(i);
ProducerRecord<String, byte[]> producerRecord =
preCreatedProducerRecord == null ? toProducerRecord(messageRecord) : preCreatedProducerRecord;
contexts[i] = new MessageProcessingContext().setProducerRecord(producerRecord).setMessageRecord(messageRecord)
.setShardPartition(shardPartition);
MessageProcessingContext context = contexts[i];

MessageInterceptionDecision interceptionDecision = interceptionDecisions == null ? null : interceptionDecisions.get(i);
if (interceptionDecision != null) {
if (interceptionDecision == MessageInterceptionDecision.DISCARD) {
log.warn("Discarding message {}:{}.", shardPartition, messageRecord.getId());
context.setAcked(true);
continue;
} else if (interceptionDecision == MessageInterceptionDecision.RETRY) {
// In this context retry means - allowing interceptors to try to execute their logic again.
continue;
MDC.put(properties.getMdc().getMessageIdKey(), String.valueOf(messageRecord.getId()));
try {
ProducerRecord<String, byte[]> preCreatedProducerRecord = producerRecordMap.get(i);
ProducerRecord<String, byte[]> producerRecord =
preCreatedProducerRecord == null ? toProducerRecord(messageRecord) : preCreatedProducerRecord;
contexts[i] = new MessageProcessingContext().setProducerRecord(producerRecord).setMessageRecord(messageRecord)
.setShardPartition(shardPartition);
MessageProcessingContext context = contexts[i];

MessageInterceptionDecision interceptionDecision = interceptionDecisions == null ? null : interceptionDecisions.get(i);
if (interceptionDecision != null) {
if (interceptionDecision == MessageInterceptionDecision.DISCARD) {
log.warn("Discarding message {}:{}.", shardPartition, messageRecord.getId());
context.setAcked(true);
continue;
} else if (interceptionDecision == MessageInterceptionDecision.RETRY) {
// In this context retry means - allowing interceptors to try to execute their logic again.
continue;
}
}
}

try {
// Theoretically, to be absolutely sure, about the ordering, we would need to wait for the future result immediately.
// But it would not be practical. I mean we could send one message from each partitions concurrently, but
// there is a high chance that all the messages in this thread would reside in the same transaction, so it would not work.
// TODO: Consider transactions. They would need heavy performance testing though.
Future<RecordMetadata> future = kafkaProducer.send(producerRecord, (metadata, exception) -> {
try {
shardPartition.putIntoMdc();

if (exception == null) {
context.setAcked(true);
fireMessageAcknowledgedEvent(shardPartition, messageRecord.getId(), producerRecord);
Instant insertTime = messageRecord.getMessage().hasInsertTimestamp()
? Instant.ofEpochMilli(messageRecord.getMessage().getInsertTimestamp().getValue()) : null;
metricsTemplate.recordProxyMessageSendSuccess(shardPartition, producerRecord.topic(), insertTime);
} else {
failedSendsCount.incrementAndGet();
handleKafkaError(shardPartition, "Sending message " + messageRecord.getId() + " in " + shardPartition + " failed.",
exception,
context);
metricsTemplate.recordProxyMessageSendFailure(shardPartition, producerRecord.topic());
try {
// Theoretically, to be absolutely sure, about the ordering, we would need to wait for the future result immediately.
// But it would not be practical. I mean we could send one message from each partitions concurrently, but
// there is a high chance that all the messages in this thread would reside in the same transaction, so it would not work.
// TODO: Consider transactions. They would need heavy performance testing though.
Future<RecordMetadata> future = kafkaProducer.send(producerRecord, (metadata, exception) -> {
MDC.put(properties.getMdc().getMessageIdKey(), String.valueOf(messageRecord.getId()));
try {
shardPartition.putIntoMdc();

if (exception == null) {
context.setAcked(true);
fireMessageAcknowledgedEvent(shardPartition, messageRecord.getId(), producerRecord);
Instant insertTime = messageRecord.getMessage().hasInsertTimestamp()
? Instant.ofEpochMilli(messageRecord.getMessage().getInsertTimestamp().getValue()) : null;
metricsTemplate.recordProxyMessageSendSuccess(shardPartition, producerRecord.topic(), insertTime);
} else {
failedSendsCount.incrementAndGet();
handleKafkaError(shardPartition, "Sending message " + messageRecord.getId() + " in " + shardPartition + " failed.",
exception,
context);
metricsTemplate.recordProxyMessageSendFailure(shardPartition, producerRecord.topic());
}
} finally {
shardPartition.removeFromMdc();
MDC.remove(properties.getMdc().getMessageIdKey());
}
} finally {
shardPartition.removeFromMdc();
}
});
atLeastOneSendDone = true;
});
atLeastOneSendDone = true;

contexts[i].setKafkaSenderFuture(future);
} catch (Throwable t) {
failedSendsCount.incrementAndGet();
handleKafkaError(shardPartition, "Sending message " + messageRecord.getId() + " in " + shardPartition + " failed.", t, context);
contexts[i].setKafkaSenderFuture(future);
} catch (Throwable t) {
failedSendsCount.incrementAndGet();
handleKafkaError(shardPartition, "Sending message " + messageRecord.getId() + " in " + shardPartition + " failed.", t, context);
}
} finally {
MDC.remove(properties.getMdc().getMessageIdKey());
}
}

@@ -335,7 +344,8 @@ private void poll(Control control, TkmsShardPartition shardPartition) {
try {
context.getKafkaSenderFuture().get();
} catch (Throwable t) {
handleKafkaError(shardPartition, "Sending message in " + shardPartition + " failed.", t, context);
handleKafkaError(shardPartition, "Sending message " + context.getMessageRecord().getId() + " in " + shardPartition + " failed.",
t, context);
}
}
}
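The diff above wraps message handling in `MDC.put` / `try` / `finally` so the message id is attached to any log lines emitted while processing and is always cleaned up afterwards, including inside the producer callback, which may run on a different thread. The shape of that pattern can be sketched with a simplified stand-in for SLF4J's MDC (a `ThreadLocal` map; the key name here is illustrative, the real key comes from `properties.getMdc().getMessageIdKey()`):

```java
import java.util.HashMap;
import java.util.Map;

public class MdcPatternSketch {

  // Simplified stand-in for org.slf4j.MDC: a per-thread map of diagnostic context values.
  static final ThreadLocal<Map<String, String>> MDC = ThreadLocal.withInitial(HashMap::new);

  static void put(String key, String value) { MDC.get().put(key, value); }
  static String get(String key) { return MDC.get().get(key); }
  static void remove(String key) { MDC.get().remove(key); }

  // Mirrors the shape of the polling loop: the message id is put into the
  // context before processing and removed in finally, so neither an exception
  // nor a `continue` can leak the id into log lines of later messages on this thread.
  static void process(long messageId, Runnable handler) {
    put("tkmsMessageId", String.valueOf(messageId));
    try {
      handler.run();
    } finally {
      remove("tkmsMessageId");
    }
  }

  public static void main(String[] args) {
    process(42L, () -> System.out.println("processing message " + get("tkmsMessageId")));
    System.out.println("after processing: " + get("tkmsMessageId")); // prints "after processing: null"
  }
}
```

The callback repeats the `put` rather than relying on the outer one precisely because MDC is thread-local: the Kafka producer invokes the callback on its own I/O thread, where the outer `put` never happened.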