Skip to content

Commit

Permalink
flaky test check
Browse files Browse the repository at this point in the history
  • Loading branch information
z3d1k committed Feb 2, 2024
1 parent f39b39e commit b0d75f5
Showing 1 changed file with 36 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -459,47 +459,43 @@ public CompletableFuture<Void> subscribeToShard(

requests.add(request);

return CompletableFuture.supplyAsync(
() -> {
responseHandler.responseReceived(
SubscribeToShardResponse.builder().build());
responseHandler.onEventStream(
subscriber -> {
final List<SubscribeToShardEvent> eventsToSend;

if (remainingSubscriptions > 0) {
eventsToSend = getEventsToSend();
remainingSubscriptions--;
} else {
eventsToSend =
Collections.singletonList(
SubscribeToShardEvent.builder()
.millisBehindLatest(0L)
.continuationSequenceNumber(null)
.build());
}

Subscription subscription = mock(Subscription.class);
Iterator<SubscribeToShardEvent> iterator =
eventsToSend.iterator();

doAnswer(
a -> {
if (!iterator.hasNext()) {
completeSubscription(subscriber);
} else {
subscriber.onNext(iterator.next());
}

return null;
})
.when(subscription)
.request(anyLong());

subscriber.onSubscribe(subscription);
});
return null;
responseHandler.responseReceived(SubscribeToShardResponse.builder().build());
responseHandler.onEventStream(
subscriber -> {
final List<SubscribeToShardEvent> eventsToSend;

synchronized (this) {
if (remainingSubscriptions > 0) {
eventsToSend = getEventsToSend();
remainingSubscriptions--;
} else {
eventsToSend =
Collections.singletonList(
SubscribeToShardEvent.builder()
.millisBehindLatest(0L)
.continuationSequenceNumber(null)
.build());
}
}
Subscription subscription = mock(Subscription.class);
Iterator<SubscribeToShardEvent> iterator = eventsToSend.iterator();

doAnswer(
a -> {
if (!iterator.hasNext()) {
completeSubscription(subscriber);
} else {
subscriber.onNext(iterator.next());
}

return null;
})
.when(subscription)
.request(anyLong());

subscriber.onSubscribe(subscription);
});
return CompletableFuture.completedFuture(null);
}

void completeSubscription(Subscriber<? super SubscribeToShardEventStream> subscriber) {
Expand Down

0 comments on commit b0d75f5

Please sign in to comment.