Skip to content

Commit

Permalink
add extra stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
hussainkarafallah committed Aug 6, 2024
1 parent e8846b8 commit 7eab357
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,20 @@

import com.transferwise.kafka.tkms.api.TkmsMessage.Header;
import java.util.List;
import org.springframework.core.Ordered;

public interface ITkmsMessageDecorator {
public interface ITkmsMessageDecorator extends Ordered {

default List<Header> getHeaders(TkmsMessage message) {
default List<Header> getAdditionalHeaders(TkmsMessage message) {
return List.of();
}

default TkmsShardPartition getOverridedPartition(TkmsMessage message) {
return null;
}

default int getOrder() {
return LOWEST_PRECEDENCE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -80,10 +80,15 @@ public TkmsMessage addHeader(Header header) {
}

public TkmsMessage accept(ITkmsMessageDecorator decorator) {
var headers = decorator.getHeaders(this);
var headers = decorator.getAdditionalHeaders(this);
if (headers != null) {
headers.forEach(this::addHeader);
}
var overridedPartition = decorator.getOverridedPartition(this);
if (overridedPartition != null) {
setShard(overridedPartition.getShard());
setPartition(overridedPartition.getPartition());
}
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ public class TkmsShardPartition {
private Tag micrometerPartitionTag;
private String stringPresentation;

private TkmsShardPartition(int shard, int partition) {
public TkmsShardPartition(int shard, int partition) {
this.shard = shard;
this.partition = partition;
this.micrometerShardTag = Tag.of("shard", String.valueOf(shard));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public IMeterCache twDefaultMeterCache(MeterRegistry meterRegistry) {
public TransactionsHelper twTransactionsHelper() {
return new TransactionsHelper();
}

@Bean
@ConditionalOnMissingBean(ITkmsMessageDecorator.class)
public List<ITkmsMessageDecorator> messageDecorators() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,20 @@ void messagesAreDecorateWithJambi() {

transactionsHelper.withTransaction().run(() ->
transactionalKafkaMessageSender.sendMessages(new SendMessagesRequest()
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("adam-jones").setValue(someValue))
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("danny-carey").setValue(someValue))
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("adam-jones").setShard(4).setValue(someValue))
.addTkmsMessage(new TkmsMessage().setTopic(topic).setKey("danny-carey").setPartition(5).setValue(someValue))
));

await().until(() -> tkmsSentMessagesCollector.getSentMessages(topic).size() == 2);
var messages = tkmsSentMessagesCollector.getSentMessages(topic);

assertEquals(2, messages.size());
checkForHeader(messages.get(0), "tool", "jambi");
assertEquals(0, messages.get(0).getShardPartition().getShard());
assertEquals(0, messages.get(0).getShardPartition().getPartition());
checkForHeader(messages.get(1), "tool", "jambi");
assertEquals(0, messages.get(1).getShardPartition().getShard());
assertEquals(0, messages.get(1).getShardPartition().getPartition());
}

private void checkForHeader(SentMessage sentMessage, String key, String value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.transferwise.kafka.tkms.api.ITkmsMessageDecorator;
import com.transferwise.kafka.tkms.api.TkmsMessage;
import com.transferwise.kafka.tkms.api.TkmsMessage.Header;
import com.transferwise.kafka.tkms.api.TkmsShardPartition;
import java.nio.charset.StandardCharsets;
import java.util.List;
import org.springframework.stereotype.Component;
Expand All @@ -11,12 +12,17 @@
public class TestMessageDecorator implements ITkmsMessageDecorator {

@Override
public List<Header> getHeaders(TkmsMessage message) {
public List<Header> getAdditionalHeaders(TkmsMessage message) {
var h1 = new Header().setKey("tool").setValue("jambi".getBytes(StandardCharsets.UTF_8));
if (message.getValue() != null && new String(message.getValue(), StandardCharsets.UTF_8).startsWith("Here from")) {
return List.of(h1);
}
return List.of();
}

@Override
public TkmsShardPartition getOverridedPartition(TkmsMessage message) {
return new TkmsShardPartition(0, 0);
}

}

0 comments on commit 7eab357

Please sign in to comment.