diff --git a/src/main/java/com/spotify/confidence/Confidence.java b/src/main/java/com/spotify/confidence/Confidence.java index 19cda817..1ff6f49c 100644 --- a/src/main/java/com/spotify/confidence/Confidence.java +++ b/src/main/java/com/spotify/confidence/Confidence.java @@ -210,6 +210,11 @@ public void emit( this.eventSenderEngine.emit(name, context, message); } + @Override + public void flush() { + this.eventSenderEngine.flush(); + } + @Override public CompletableFuture resolveFlags( String flag, ConfidenceValue.Struct context) { @@ -249,6 +254,11 @@ protected ClientDelegate client() { public void close() throws IOException { closed = true; } + + @Override + public void flush() { + parent.flush(); + } } private static class RootInstance extends Confidence { @@ -272,6 +282,11 @@ public void close() throws IOException { client = null; } } + + @Override + public void flush() { + client.flush(); + } } public static class Builder { diff --git a/src/main/java/com/spotify/confidence/EventSender.java b/src/main/java/com/spotify/confidence/EventSender.java index e1164d2d..97febc29 100644 --- a/src/main/java/com/spotify/confidence/EventSender.java +++ b/src/main/java/com/spotify/confidence/EventSender.java @@ -9,6 +9,8 @@ public interface EventSender extends Contextual { public void track(String eventName); + void flush(); + @Override EventSender withContext(ConfidenceValue.Struct context); diff --git a/src/main/java/com/spotify/confidence/EventSenderEngine.java b/src/main/java/com/spotify/confidence/EventSenderEngine.java index 0494040c..27d2fc1d 100644 --- a/src/main/java/com/spotify/confidence/EventSenderEngine.java +++ b/src/main/java/com/spotify/confidence/EventSenderEngine.java @@ -5,4 +5,6 @@ interface EventSenderEngine extends Closeable { void emit(String name, ConfidenceValue.Struct context, Optional message); + + void flush(); } diff --git a/src/main/java/com/spotify/confidence/EventSenderEngineImpl.java b/src/main/java/com/spotify/confidence/EventSenderEngineImpl.java index 1c7da912..9552dbbe 100644 --- a/src/main/java/com/spotify/confidence/EventSenderEngineImpl.java +++ b/src/main/java/com/spotify/confidence/EventSenderEngineImpl.java @@ -91,13 +91,26 @@ public void emit( LockSupport.unpark(pollingThread); } + @Override + public void flush() { + sendQueue.add(Event.newBuilder().setEventDefinition("manual_flash").build()); + LockSupport.unpark(pollingThread); + } + private void pollLoop() { Instant latestFlushTime = Instant.now(); ArrayList events = new ArrayList<>(); while (true) { final var event = sendQueue.poll(); if (event != null) { - events.add(event); + if ("manual_flash".equals(event.getEventDefinition())) { + log.debug("Starting events upload due to manual flush"); + upload(events); + events = new ArrayList<>(); + break; + } else { + events.add(event); + } } else { if (intakeClosed) break; LockSupport.parkUntil(Instant.now().plus(maxFlushInterval).toEpochMilli()); diff --git a/src/test/java/com/spotify/confidence/EventSenderEngineTest.java b/src/test/java/com/spotify/confidence/EventSenderEngineTest.java index f28bfc07..53976d2a 100644 --- a/src/test/java/com/spotify/confidence/EventSenderEngineTest.java +++ b/src/test/java/com/spotify/confidence/EventSenderEngineTest.java @@ -17,9 +17,6 @@ public class EventSenderEngineTest { - private final ResolverClientTestUtils.FakeFlagResolverClient fakeFlagResolverClient = - new ResolverClientTestUtils.FakeFlagResolverClient(); - private final FakeClock clock = new FakeClock(); @Test @@ -161,6 +158,36 @@ public void testEngineUploadsTriggeredByFlushTimeout() throws IOException, Inter engine.close(); } + @Test + public void testFlushForcesUploadsDespiteBatchLimits() throws IOException, InterruptedException { + final FakeUploader alwaysSucceedUploader = new FakeUploader(List.of()); + final EventSenderEngine engine = + new EventSenderEngineImpl( + 10000, // Big batch size + alwaysSucceedUploader, + clock, + Duration.ofMillis(10000), // Long max flush interval + DEFAULT_MAX_MEMORY_CONSUMPTION); + + // send only one event + engine.emit( + "navigate", + ConfidenceValue.of(ImmutableMap.of("key", ConfidenceValue.of("size"))), + Optional.empty()); + + engine.flush(); + + // wait for the upload to be triggered + Thread.sleep(100); + + // assert + assertThat(alwaysSucceedUploader.uploadCalls.size()).isEqualTo(1); + assertThat(alwaysSucceedUploader.uploadCalls.peek().size()).isEqualTo(1); + + // close + engine.close(); + } + @Test public void testEngineUploadsWhenIntermittentErrorWillRetry() throws IOException { final int maxBatchSize = 3; diff --git a/src/test/java/com/spotify/confidence/FakeEventSenderEngine.java b/src/test/java/com/spotify/confidence/FakeEventSenderEngine.java index 9e52e508..8741e64b 100644 --- a/src/test/java/com/spotify/confidence/FakeEventSenderEngine.java +++ b/src/test/java/com/spotify/confidence/FakeEventSenderEngine.java @@ -28,4 +28,9 @@ public void emit( String name, ConfidenceValue.Struct context, Optional message) { events.add(event(name, context, message).setEventTime(clock.getTimestamp()).build()); } + + @Override + public void flush() { + // NOOP + } }