Skip to content

Commit

Permalink
address commments
Browse files Browse the repository at this point in the history
  • Loading branch information
Rkee committed Jun 2, 2024
1 parent f1232fc commit a882647
Show file tree
Hide file tree
Showing 7 changed files with 35 additions and 58 deletions.
3 changes: 1 addition & 2 deletions src/main/java/io/lettuce/core/AbstractRedisClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import java.util.concurrent.atomic.AtomicInteger;

import reactor.core.publisher.Mono;
import io.lettuce.core.event.RecordableEvent;
import io.lettuce.core.event.command.CommandListener;
import io.lettuce.core.event.connection.ConnectEvent;
import io.lettuce.core.event.connection.ConnectionCreatedEvent;
Expand Down Expand Up @@ -401,7 +400,7 @@ protected <K, V, T extends RedisChannelHandler<K, V>> ConnectionFuture<T> initia
String uriString = connectionBuilder.getRedisURI().toString();

EventRecorder.getInstance().record(new ConnectionCreatedEvent(uriString, connectionBuilder.endpoint().getId()));
RecordableEvent event = EventRecorder.getInstance()
EventRecorder.RecordableEvent event = EventRecorder.getInstance()
.start(new ConnectEvent(uriString, connectionBuilder.endpoint().getId()));

channelReadyFuture.whenComplete((channel, throwable) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import io.lettuce.core.cluster.topology.TopologyComparators;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.event.RecordableEvent;
import io.lettuce.core.event.jfr.EventRecorder;
import io.lettuce.core.internal.Exceptions;
import io.lettuce.core.internal.Futures;
Expand Down Expand Up @@ -887,7 +886,7 @@ public CompletionStage<Void> refreshPartitionsAsync() {
sources.add(redisURI);
}

RecordableEvent event = EventRecorder.getInstance().start(new TopologyRefreshEvent(sources));
EventRecorder.RecordableEvent event = EventRecorder.getInstance().start(new TopologyRefreshEvent(sources));

if (partitions == null) {
return initializePartitions().thenAccept(Partitions::updateCache)
Expand Down
8 changes: 6 additions & 2 deletions src/main/java/io/lettuce/core/event/DefaultEventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,15 @@ public Flux<Event> get() {
@Override
public void publish(Event event) {

recorder.publish(event);
final Event sourceEvent = (event instanceof EventRecorder.RecordableEvent)
? ((EventRecorder.RecordableEvent) event).getSource()
: event;

recorder.record(sourceEvent);

Sinks.EmitResult emitResult;

while ((emitResult = bus.tryEmitNext(event)) == Sinks.EmitResult.FAIL_NON_SERIALIZED) {
while ((emitResult = bus.tryEmitNext(sourceEvent)) == Sinks.EmitResult.FAIL_NON_SERIALIZED) {
// busy-loop
}

Expand Down
18 changes: 0 additions & 18 deletions src/main/java/io/lettuce/core/event/RecordableEvent.java

This file was deleted.

18 changes: 16 additions & 2 deletions src/main/java/io/lettuce/core/event/jfr/EventRecorder.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package io.lettuce.core.event.jfr;

import io.lettuce.core.event.Event;
import io.lettuce.core.event.RecordableEvent;

/**
* Event recorder that can delegate events from the {@link io.lettuce.core.event.EventBus} into a recording facility such as
Expand Down Expand Up @@ -39,6 +38,21 @@ static EventRecorder getInstance() {
*/
RecordableEvent start(Event event);

void publish(Event event);
/**
* Interface defining a recordable event that is recorded on calling {@link #record()}.
*/
interface RecordableEvent extends Event {

/**
* Complete the event recording.
*/
void record();

/**
* Get the source event.
*/
Event getSource();

}

}
35 changes: 10 additions & 25 deletions src/main/java/io/lettuce/core/event/jfr/JfrEventRecorder.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import java.util.Map;

import io.lettuce.core.event.Event;
import io.lettuce.core.event.RecordableEvent;
import io.lettuce.core.internal.LettuceAssert;
import io.lettuce.core.internal.LettuceClassUtils;

Expand All @@ -31,10 +30,14 @@ public void record(Event event) {

LettuceAssert.notNull(event, "Event must not be null");

jdk.jfr.Event jfrEvent = createEvent(event);
if (event instanceof RecordableEvent) {
((RecordableEvent) event).record();
} else {
jdk.jfr.Event jfrEvent = createEvent(event);

if (jfrEvent != null) {
jfrEvent.commit();
if (jfrEvent != null) {
jfrEvent.commit();
}
}
}

Expand All @@ -54,24 +57,6 @@ public RecordableEvent start(Event event) {
return NoOpEventRecorder.INSTANCE;
}

/**
* When the given event is an instance of RecordableEvent, this method works the same as {@link #record}.
* Otherwise, do nothing.
*
* @param event the event to be published
*/
@Override
public void publish(Event event) {

LettuceAssert.notNull(event, "Event must not be null");

if (event instanceof RecordableEvent) {
((RecordableEvent) event).record();
} else {
record(event);
}
}

private Constructor<?> getEventConstructor(Event event) throws NoSuchMethodException {

Constructor<?> constructor;
Expand Down Expand Up @@ -119,12 +104,12 @@ private jdk.jfr.Event createEvent(Event event) {

class JfrRecordableEvent implements RecordableEvent {

private final Event originalEvent;
private final Event sourceEvent;

private final jdk.jfr.Event jfrEvent;

public JfrRecordableEvent(Event event) {
this.originalEvent = event;
this.sourceEvent = event;
this.jfrEvent = createEvent(event);
}

Expand All @@ -136,7 +121,7 @@ public void record() {

@Override
public Event getSource() {
return originalEvent;
return sourceEvent;
}

public jdk.jfr.Event getJfrEvent() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,15 +1,14 @@
package io.lettuce.core.event.jfr;

import io.lettuce.core.event.Event;
import io.lettuce.core.event.RecordableEvent;

/**
* No-op implementation.
*
* @author Mark Paluch
* @since 6.1
*/
public final class NoOpEventRecorder implements EventRecorder, RecordableEvent {
public final class NoOpEventRecorder implements EventRecorder, EventRecorder.RecordableEvent {

public final static NoOpEventRecorder INSTANCE = new NoOpEventRecorder();

Expand All @@ -32,11 +31,6 @@ public RecordableEvent start(Event event) {
return this;
}

@Override
public void publish(Event event) {

}

@Override
public void record() {
}
Expand Down

0 comments on commit a882647

Please sign in to comment.