Skip to content

Commit

Permalink
feat: Add flush functionality (#130)
Browse files Browse the repository at this point in the history
* feat: Add flush functionality

* refactor: Remove unused variable in test

* feat: Add debug logging for flushing
  • Loading branch information
fabriziodemaria authored May 24, 2024
1 parent a0ad7a5 commit 0483702
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 4 deletions.
15 changes: 15 additions & 0 deletions src/main/java/com/spotify/confidence/Confidence.java
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,11 @@ public void emit(
this.eventSenderEngine.emit(name, context, message);
}

@Override
public void flush() {
this.eventSenderEngine.flush();
}

@Override
public CompletableFuture<ResolveFlagsResponse> resolveFlags(
String flag, ConfidenceValue.Struct context) {
Expand Down Expand Up @@ -249,6 +254,11 @@ protected ClientDelegate client() {
public void close() throws IOException {
closed = true;
}

@Override
public void flush() {
parent.flush();
}
}

private static class RootInstance extends Confidence {
Expand All @@ -272,6 +282,11 @@ public void close() throws IOException {
client = null;
}
}

@Override
public void flush() {
client.flush();
}
}

public static class Builder {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/spotify/confidence/EventSender.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ public interface EventSender extends Contextual {

public void track(String eventName);

void flush();

@Override
EventSender withContext(ConfidenceValue.Struct context);

Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/spotify/confidence/EventSenderEngine.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,6 @@

interface EventSenderEngine extends Closeable {
void emit(String name, ConfidenceValue.Struct context, Optional<ConfidenceValue.Struct> message);

void flush();
}
15 changes: 14 additions & 1 deletion src/main/java/com/spotify/confidence/EventSenderEngineImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,26 @@ public void emit(
LockSupport.unpark(pollingThread);
}

@Override
public void flush() {
sendQueue.add(Event.newBuilder().setEventDefinition("manual_flash").build());
LockSupport.unpark(pollingThread);
}

private void pollLoop() {
Instant latestFlushTime = Instant.now();
ArrayList<com.spotify.confidence.events.v1.Event> events = new ArrayList<>();
while (true) {
final var event = sendQueue.poll();
if (event != null) {
events.add(event);
if ("manual_flash".equals(event.getEventDefinition())) {
log.debug("Starting events upload due to manual flush");
upload(events);
events = new ArrayList<>();
break;
} else {
events.add(event);
}
} else {
if (intakeClosed) break;
LockSupport.parkUntil(Instant.now().plus(maxFlushInterval).toEpochMilli());
Expand Down
33 changes: 30 additions & 3 deletions src/test/java/com/spotify/confidence/EventSenderEngineTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@

public class EventSenderEngineTest {

private final ResolverClientTestUtils.FakeFlagResolverClient fakeFlagResolverClient =
new ResolverClientTestUtils.FakeFlagResolverClient();

private final FakeClock clock = new FakeClock();

@Test
Expand Down Expand Up @@ -161,6 +158,36 @@ public void testEngineUploadsTriggeredByFlushTimeout() throws IOException, Inter
engine.close();
}

@Test
public void testFlushForcesUploadsDespiteBatchLimits() throws IOException, InterruptedException {
final FakeUploader alwaysSucceedUploader = new FakeUploader(List.of());
final EventSenderEngine engine =
new EventSenderEngineImpl(
10000, // Big batch size
alwaysSucceedUploader,
clock,
Duration.ofMillis(10000), // Long max flush interval
DEFAULT_MAX_MEMORY_CONSUMPTION);

// send only one event
engine.emit(
"navigate",
ConfidenceValue.of(ImmutableMap.of("key", ConfidenceValue.of("size"))),
Optional.empty());

engine.flush();

// wait for the upload to be triggered
Thread.sleep(100);

// assert
assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(1);
assertThat(alwaysSucceedUploader.uploadCalls.peek().size()).isEqualTo(1);

// close
engine.close();
}

@Test
public void testEngineUploadsWhenIntermittentErrorWillRetry() throws IOException {
final int maxBatchSize = 3;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,9 @@ public void emit(
String name, ConfidenceValue.Struct context, Optional<ConfidenceValue.Struct> message) {
events.add(event(name, context, message).setEventTime(clock.getTimestamp()).build());
}

@Override
public void flush() {
// NOOP
}
}

0 comments on commit 0483702

Please sign in to comment.