Skip to content

Commit

Permalink
Improve the reliability of SSE.
Browse files Browse the repository at this point in the history
  • Loading branch information
overcat committed Nov 7, 2023
1 parent e8cf617 commit d6c9a84
Show file tree
Hide file tree
Showing 12 changed files with 177 additions and 26 deletions.
12 changes: 10 additions & 2 deletions src/main/java/org/stellar/sdk/requests/AccountsRequestBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,19 @@ public static Page<AccountResponse> execute(OkHttpClient httpClient, HttpUrl uri
* @see <a href="https://developers.stellar.org/api/introduction/response-format/"
* target="_blank">Response Format documentation</a>
* @param listener {@link EventListener} implementation with {@link AccountResponse} type
* @param reconnectTimeout Custom stream connection timeout in ms
* @return EventSource object, so you can <code>close()</code> connection when not needed anymore
*/
public SSEStream<AccountResponse> stream(final EventListener<AccountResponse> listener) {
public SSEStream<AccountResponse> stream(
final EventListener<AccountResponse> listener, long reconnectTimeout) {
return SSEStream.create(httpClient, this, AccountResponse.class, listener, reconnectTimeout);
}

return SSEStream.create(httpClient, this, AccountResponse.class, listener);
/**
* An overloaded version of {@link #stream(EventListener, long)} with default reconnect timeout.
*/
public SSEStream<AccountResponse> stream(final EventListener<AccountResponse> listener) {
return stream(listener, SSEStream.DEFAULT_RECONNECT_TIMEOUT);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,19 @@ public static Page<EffectResponse> execute(OkHttpClient httpClient, HttpUrl uri)
* @see <a href="https://developers.stellar.org/api/introduction/response-format/"
* target="_blank">Response Format documentation</a>
* @param listener {@link EventListener} implementation with {@link EffectResponse} type
* @param reconnectTimeout Custom stream connection timeout in ms
* @return EventSource object, so you can <code>close()</code> connection when not needed anymore
*/
public SSEStream<EffectResponse> stream(
final EventListener<EffectResponse> listener, long reconnectTimeout) {
return SSEStream.create(httpClient, this, EffectResponse.class, listener, reconnectTimeout);
}

/**
* An overloaded version of {@link #stream(EventListener, long)} with default reconnect timeout.
*/
public SSEStream<EffectResponse> stream(final EventListener<EffectResponse> listener) {
return SSEStream.create(httpClient, this, EffectResponse.class, listener);
return stream(listener, SSEStream.DEFAULT_RECONNECT_TIMEOUT);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,19 @@ public static Page<LedgerResponse> execute(OkHttpClient httpClient, HttpUrl uri)
* @see <a href="https://developers.stellar.org/api/introduction/response-format/"
* target="_blank">Response Format documentation</a>
* @param listener {@link EventListener} implementation with {@link LedgerResponse} type
* @param reconnectTimeout Custom stream connection timeout in ms
* @return EventSource object, so you can <code>close()</code> connection when not needed anymore
*/
public SSEStream<LedgerResponse> stream(
final EventListener<LedgerResponse> listener, long reconnectTimeout) {
return SSEStream.create(httpClient, this, LedgerResponse.class, listener, reconnectTimeout);
}

/**
* An overloaded version of {@link #stream(EventListener, long)} with default reconnect timeout.
*/
public SSEStream<LedgerResponse> stream(final EventListener<LedgerResponse> listener) {
return SSEStream.create(httpClient, this, LedgerResponse.class, listener);
return stream(listener, SSEStream.DEFAULT_RECONNECT_TIMEOUT);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@
import okhttp3.Request;
import okhttp3.Response;
import org.stellar.sdk.LiquidityPoolID;
import org.stellar.sdk.responses.LiquidityPoolResponse;
import org.stellar.sdk.responses.Page;
import org.stellar.sdk.responses.*;

/** Builds requests connected to liquidity pools. */
public class LiquidityPoolsRequestBuilder extends RequestBuilder {
Expand Down Expand Up @@ -116,12 +115,21 @@ public static Page<LiquidityPoolResponse> execute(OkHttpClient httpClient, HttpU
* @see <a href="https://developers.stellar.org/api/introduction/response-format/"
* target="_blank">Response Format documentation</a>
* @param listener {@link EventListener} implementation with {@link LiquidityPoolResponse} type
* @param reconnectTimeout Custom stream connection timeout in ms
* @return EventSource object, so you can <code>close()</code> connection when not needed anymore
*/
public SSEStream<LiquidityPoolResponse> stream(
final EventListener<LiquidityPoolResponse> listener) {
final EventListener<LiquidityPoolResponse> listener, long reconnectTimeout) {
return SSEStream.create(
httpClient, this, LiquidityPoolResponse.class, listener, reconnectTimeout);
}

return SSEStream.create(httpClient, this, LiquidityPoolResponse.class, listener);
/**
* An overloaded version of {@link #stream(EventListener, long)} with default reconnect timeout.
*/
public SSEStream<LiquidityPoolResponse> stream(
final EventListener<LiquidityPoolResponse> listener) {
return stream(listener, SSEStream.DEFAULT_RECONNECT_TIMEOUT);
}

/**
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/org/stellar/sdk/requests/OffersRequestBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,19 @@ public static Page<OfferResponse> execute(OkHttpClient httpClient, HttpUrl uri)
* @see <a href="https://developers.stellar.org/api/introduction/response-format/"
* target="_blank">Response Format documentation</a>
* @param listener {@link EventListener} implementation with {@link OfferResponse} type
* @param reconnectTimeout Custom stream connection timeout in ms
* @return EventSource object, so you can <code>close()</code> connection when not needed anymore
*/
public SSEStream<OfferResponse> stream(
final EventListener<OfferResponse> listener, long reconnectTimeout) {
return SSEStream.create(httpClient, this, OfferResponse.class, listener, reconnectTimeout);
}

/**
* An overloaded version of {@link #stream(EventListener, long)} with default reconnect timeout.
*/
public SSEStream<OfferResponse> stream(final EventListener<OfferResponse> listener) {
return SSEStream.create(httpClient, this, OfferResponse.class, listener);
return stream(listener, SSEStream.DEFAULT_RECONNECT_TIMEOUT);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,10 +190,19 @@ public static Page<OperationResponse> execute(OkHttpClient httpClient, HttpUrl u
* @see <a href="https://developers.stellar.org/api/introduction/response-format/"
* target="_blank">Response Format documentation</a>
* @param listener {@link OperationResponse} implementation with {@link OperationResponse} type
* @param reconnectTimeout Custom stream connection timeout in ms
* @return EventSource object, so you can <code>close()</code> connection when not needed anymore
*/
public SSEStream<OperationResponse> stream(
final EventListener<OperationResponse> listener, long reconnectTimeout) {
return SSEStream.create(httpClient, this, OperationResponse.class, listener, reconnectTimeout);
}

/**
* An overloaded version of {@link #stream(EventListener, long)} with default reconnect timeout.
*/
public SSEStream<OperationResponse> stream(final EventListener<OperationResponse> listener) {
return SSEStream.create(httpClient, this, OperationResponse.class, listener);
return stream(listener, SSEStream.DEFAULT_RECONNECT_TIMEOUT);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,19 @@ public static OrderBookResponse execute(OkHttpClient httpClient, HttpUrl uri)
* @see <a href="https://developers.stellar.org/api/introduction/response-format/"
* target="_blank">Response Format documentation</a>
* @param listener {@link OrderBookResponse} implementation with {@link OrderBookResponse} type
* @param reconnectTimeout Custom stream connection timeout in ms
* @return EventSource object, so you can <code>close()</code> connection when not needed anymore
*/
public SSEStream<OrderBookResponse> stream(
final EventListener<OrderBookResponse> listener, long reconnectTimeout) {
return SSEStream.create(httpClient, this, OrderBookResponse.class, listener, reconnectTimeout);
}

/**
* An overloaded version of {@link #stream(EventListener, long)} with default reconnect timeout.
*/
public SSEStream<OrderBookResponse> stream(final EventListener<OrderBookResponse> listener) {
return SSEStream.create(httpClient, this, OrderBookResponse.class, listener);
return stream(listener, SSEStream.DEFAULT_RECONNECT_TIMEOUT);
}

public OrderBookResponse execute() throws IOException, TooManyRequestsException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,19 @@ public static Page<OperationResponse> execute(OkHttpClient httpClient, HttpUrl u
* @see <a href="https://developers.stellar.org/api/introduction/response-format/"
* target="_blank">Response Format documentation</a>
* @param listener {@link EventListener} implementation with {@link OperationResponse} type
* @param reconnectTimeout Custom stream connection timeout in ms
* @return EventSource object, so you can <code>close()</code> connection when not needed anymore
*/
public SSEStream<OperationResponse> stream(
final EventListener<OperationResponse> listener, long reconnectTimeout) {
return SSEStream.create(httpClient, this, OperationResponse.class, listener, reconnectTimeout);
}

/**
* An overloaded version of {@link #stream(EventListener, long)} with default reconnect timeout.
*/
public SSEStream<OperationResponse> stream(final EventListener<OperationResponse> listener) {
return SSEStream.create(httpClient, this, OperationResponse.class, listener);
return stream(listener, SSEStream.DEFAULT_RECONNECT_TIMEOUT);
}

/**
Expand Down
69 changes: 63 additions & 6 deletions src/main/java/org/stellar/sdk/requests/SSEStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -23,29 +25,42 @@
import org.stellar.sdk.responses.Pageable;

public class SSEStream<T extends org.stellar.sdk.responses.Response> implements Closeable {
static final long DEFAULT_RECONNECT_TIMEOUT = 15 * 1000L;
private final OkHttpClient okHttpClient;
private final RequestBuilder requestBuilder;
private final Class<T> responseClass;
private final EventListener<T> listener;

private final AtomicBoolean isStopped = new AtomicBoolean(false);
private final AtomicBoolean serverSideClosed =
new AtomicBoolean(true); // make sure we start correctly

// When the client closes the connection itself, it will be set to true.
// This is for handling cases where the SSE (Server-Sent Events) does not
// receive a response for a long time.
// If the server requests us to close the connection, we will set serverSideClosed to true.
private final AtomicBoolean clientSideClosed = new AtomicBoolean(true);
private final ScheduledExecutorService clientTimeoutTimer =
Executors.newSingleThreadScheduledExecutor();
private final AtomicLong latestEventTime =
new AtomicLong(0); // The timestamp of the last received event.
private final AtomicReference<String> lastEventId = new AtomicReference<String>(null);
private ExecutorService executorService;
private final ExecutorService executorService;
private EventSource eventSource = null;
private final Lock lock = new ReentrantLock();
private final long reconnectTimeout;

private SSEStream(
final OkHttpClient okHttpClient,
final RequestBuilder requestBuilder,
final Class<T> responseClass,
final EventListener<T> listener) {
final EventListener<T> listener,
final long reconnectTimeout) {
// Create a new client with no read timeout
this.okHttpClient = okHttpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build();
this.requestBuilder = requestBuilder;
this.responseClass = responseClass;
this.listener = listener;
this.reconnectTimeout = reconnectTimeout;

executorService = Executors.newSingleThreadExecutor();
requestBuilder.buildUri(); // call this once to add the segments
Expand All @@ -62,9 +77,10 @@ public void run() {
while (!isStopped.get()) {
try {
Thread.sleep(200);
if (serverSideClosed.get()) {
if (serverSideClosed.get() || clientSideClosed.get()) {
// don't restart until true again
serverSideClosed.set(false);
clientSideClosed.set(false);
if (!isStopped.get()) {
lock.lock();
try {
Expand All @@ -83,13 +99,29 @@ public void run() {
}
}
});

// Start a timer to check if the client has not received any event for a long time.
// If so, we will close the connection and restart it.
clientTimeoutTimer.scheduleAtFixedRate(
() -> {
if (System.currentTimeMillis() - latestEventTime.get() > reconnectTimeout) {
this.latestEventTime.set(System.currentTimeMillis());
clientSideClosed.set(true);
}
},
0,
300,
TimeUnit.MILLISECONDS);
}

public String lastPagingToken() {
return lastEventId.get();
}

private void restart() {
if (eventSource != null) {
eventSource.cancel();
}
eventSource =
doStreamRequest(
this,
Expand Down Expand Up @@ -118,8 +150,10 @@ static <T extends org.stellar.sdk.responses.Response> SSEStream<T> create(
final OkHttpClient okHttpClient,
final RequestBuilder requestBuilder,
final Class<T> responseClass,
final EventListener<T> listener) {
SSEStream<T> stream = new SSEStream<T>(okHttpClient, requestBuilder, responseClass, listener);
final EventListener<T> listener,
final long reconnectTimeout) {
SSEStream<T> stream =
new SSEStream<T>(okHttpClient, requestBuilder, responseClass, listener, reconnectTimeout);
stream.start();
return stream;
}
Expand Down Expand Up @@ -220,6 +254,9 @@ public void onFailure(
@Override
public void onEvent(
EventSource eventSource, @Nullable String id, @Nullable String type, String data) {
// Update the timestamp of the last received event.
stream.latestEventTime.set(System.currentTimeMillis());

if (data.equals("\"hello\"") || data.equals("\"byebye\"")) {
return;
}
Expand All @@ -232,4 +269,24 @@ public void onEvent(
listener.onEvent(event);
}
}

/**
* Check if the stream is stopped. Current implementation does not allow to restart the stream if
* it was stopped.
*
* @return true if the stream is stopped.
*/
public boolean isStopped() {
return isStopped.get();
}

/**
* Check if the stream is closed. Current implementation will try to restart the stream if it was
* closed.
*
* @return true if the stream is closed.
*/
public boolean isClosed() {
return serverSideClosed.get() || clientSideClosed.get();
}
}
11 changes: 10 additions & 1 deletion src/main/java/org/stellar/sdk/requests/TradesRequestBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,18 @@ public TradesRequestBuilder limit(int number) {
* @see <a href="https://developers.stellar.org/api/introduction/response-format/"
* target="_blank">Response Format documentation</a>
* @param listener {@link EventListener} implementation with {@link TradeResponse} type
* @param reconnectTimeout Custom stream connection timeout in ms
* @return EventSource object, so you can <code>close()</code> connection when not needed anymore
*/
public SSEStream<TradeResponse> stream(
final EventListener<TradeResponse> listener, long reconnectTimeout) {
return SSEStream.create(httpClient, this, TradeResponse.class, listener, reconnectTimeout);
}

/**
* An overloaded version of {@link #stream(EventListener, long)} with default reconnect timeout.
*/
public SSEStream<TradeResponse> stream(final EventListener<TradeResponse> listener) {
return SSEStream.create(httpClient, this, TradeResponse.class, listener);
return stream(listener, SSEStream.DEFAULT_RECONNECT_TIMEOUT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -149,10 +149,20 @@ public static Page<TransactionResponse> execute(OkHttpClient httpClient, HttpUrl
* @see <a href="https://developers.stellar.org/api/introduction/response-format/"
* target="_blank">Response Format documentation</a>
* @param listener {@link EventListener} implementation with {@link TransactionResponse} type
* @param reconnectTimeout Custom stream connection timeout in ms
* @return EventSource object, so you can <code>close()</code> connection when not needed anymore
*/
public SSEStream<TransactionResponse> stream(
final EventListener<TransactionResponse> listener, long reconnectTimeout) {
return SSEStream.create(
httpClient, this, TransactionResponse.class, listener, reconnectTimeout);
}

/**
* An overloaded version of {@link #stream(EventListener, long)} with default reconnect timeout.
*/
public SSEStream<TransactionResponse> stream(final EventListener<TransactionResponse> listener) {
return SSEStream.create(httpClient, this, TransactionResponse.class, listener);
return stream(listener, SSEStream.DEFAULT_RECONNECT_TIMEOUT);
}

/**
Expand Down
Loading

0 comments on commit d6c9a84

Please sign in to comment.