Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

UBO-122 add RateLimitedHttpClient #123

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,200 @@
package org.mycore.ubo.http;

import java.io.IOException;
import java.net.Authenticator;
import java.net.CookieHandler;
import java.net.ProxySelector;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;

import org.mycore.common.MCRException;

import com.google.common.util.concurrent.RateLimiter;

public class RateLimitedHttpClient extends HttpClient {
private final HttpClient client;

private final RateLimiter rateLimiter;

RateLimitedHttpClient(HttpClient client, RateLimiter rateLimiter) {
this.client = client;
this.rateLimiter = rateLimiter;
}

public double getRate() {
return rateLimiter.getRate();
}

@Override public Optional<CookieHandler> cookieHandler() {
return client.cookieHandler();
}

@Override public Optional<Duration> connectTimeout() {
return client.connectTimeout();
}

@Override public Redirect followRedirects() {
return client.followRedirects();
}

@Override public Optional<ProxySelector> proxy() {
return client.proxy();
}

@Override public SSLContext sslContext() {
return client.sslContext();
}

@Override public SSLParameters sslParameters() {
return client.sslParameters();
}

@Override public Optional<Authenticator> authenticator() {
return client.authenticator();
}

@Override public Version version() {
return client.version();
}

@Override public Optional<Executor> executor() {
return client.executor();
}

@Override public <T> HttpResponse<T> send(HttpRequest request,
HttpResponse.BodyHandler<T> responseBodyHandler) throws IOException, InterruptedException {
try {
return sendAsync(request, responseBodyHandler).get();
} catch (ExecutionException e) {
final Throwable cause = e.getCause();
if (cause instanceof RuntimeException) {
throw (RuntimeException) cause;
}
if (cause instanceof IOException) {
throw (IOException) cause;
}
if (cause instanceof InterruptedException) {
throw (InterruptedException) cause;
}
throw new IOException(cause);
}
}

@Override public <T> CompletableFuture<HttpResponse<T>> sendAsync(
HttpRequest request, HttpResponse.BodyHandler<T> responseBodyHandler) {
return sendAsync(request, responseBodyHandler, null);
}

@Override public <T> CompletableFuture<HttpResponse<T>> sendAsync(
HttpRequest request, HttpResponse.BodyHandler<T> responseBodyHandler,
HttpResponse.PushPromiseHandler<T> pushPromiseHandler) {
return acquireRequestPermission(request)
.thenCompose(waitTime -> client.sendAsync(request, responseBodyHandler, pushPromiseHandler));
}

private CompletableFuture<Double> acquireRequestPermission(HttpRequest request) {
final Optional<Duration> connectionTimeOut = request.timeout()
.or(this::connectTimeout);
final Executor executor = executor()
.orElseGet(ForkJoinPool::commonPool);
return connectionTimeOut.map(duration -> CompletableFuture.supplyAsync(() -> {
final Instant start = Instant.now();
if (rateLimiter.tryAcquire(duration)) {
return Duration.between(start, Instant.now())
.get(ChronoUnit.MILLIS) / (double) 1000;
}
throw new MCRException("Timeout");
}, executor)).orElseGet(() -> CompletableFuture.supplyAsync(rateLimiter::acquire, executor));
}

public static Builder newBuilder() {
return new BuilderImpl();
}

public interface Builder extends HttpClient.Builder {
Builder rateLimit(int rate, TimeUnit unit);

@Override RateLimitedHttpClient build();
}

private static class BuilderImpl implements Builder {
private final HttpClient.Builder builder = HttpClient.newBuilder();

private RateLimiter rateLimiter;

@Override public HttpClient.Builder cookieHandler(CookieHandler cookieHandler) {
builder.cookieHandler(cookieHandler);
return this;
}

@Override public HttpClient.Builder connectTimeout(Duration duration) {
builder.connectTimeout(duration);
return this;
}

@Override public HttpClient.Builder sslContext(SSLContext sslContext) {
builder.sslContext(sslContext);
return this;
}

@Override public HttpClient.Builder sslParameters(SSLParameters sslParameters) {
builder.sslParameters(sslParameters);
return this;
}

@Override public HttpClient.Builder executor(Executor executor) {
builder.executor(executor);
return this;
}

@Override public HttpClient.Builder followRedirects(Redirect policy) {
builder.followRedirects(policy);
return this;
}

@Override public HttpClient.Builder version(Version version) {
builder.version(version);
return this;
}

@Override public HttpClient.Builder priority(int priority) {
builder.priority(priority);
return this;
}

@Override public HttpClient.Builder proxy(ProxySelector proxySelector) {
builder.proxy(proxySelector);
return this;
}

@Override public HttpClient.Builder authenticator(Authenticator authenticator) {
builder.authenticator(authenticator);
return this;
}

@Override public RateLimitedHttpClient build() {
if (rateLimiter == null) {
throw new IllegalStateException("No rate limit defined!");
}
return new RateLimitedHttpClient(builder.build(), rateLimiter);
}

@Override public Builder rateLimit(int rate, TimeUnit unit) {
rateLimiter = RateLimiter.create(rate / (double) unit.toSeconds(1));
return this;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package org.mycore.ubo.http;

import java.io.IOException;
import java.net.Authenticator;
import java.net.CookieHandler;
import java.net.ProxySelector;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;

import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLParameters;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import com.google.common.util.concurrent.RateLimiter;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

public class RateLimitedHttpClientTest {

private HttpClient client;

@Before
public void setUp() throws Exception {
this.client = new HttpClient() {
@Override public Optional<CookieHandler> cookieHandler() {
return Optional.empty();
}

@Override public Optional<Duration> connectTimeout() {
return Optional.empty();
}

@Override public Redirect followRedirects() {
return null;
}

@Override public Optional<ProxySelector> proxy() {
return Optional.empty();
}

@Override public SSLContext sslContext() {
return null;
}

@Override public SSLParameters sslParameters() {
return null;
}

@Override public Optional<Authenticator> authenticator() {
return Optional.empty();
}

@Override public Version version() {
return null;
}

@Override public Optional<Executor> executor() {
return Optional.empty();
}

@Override
public <T> HttpResponse<T> send(HttpRequest request, HttpResponse.BodyHandler<T> responseBodyHandler)
throws IOException, InterruptedException {
return null;
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request,
HttpResponse.BodyHandler<T> responseBodyHandler) {
return CompletableFuture.completedFuture(null);
}

@Override
public <T> CompletableFuture<HttpResponse<T>> sendAsync(HttpRequest request,
HttpResponse.BodyHandler<T> responseBodyHandler,
HttpResponse.PushPromiseHandler<T> pushPromiseHandler) {
return CompletableFuture.completedFuture(null);
}
};
}

@After
public void tearDown() throws Exception {
}

@Test
public void getRate() {
final RateLimitedHttpClient httpClient = RateLimitedHttpClient.newBuilder()
.rateLimit(10, TimeUnit.MINUTES).build();
assertEquals(10 / 60, httpClient.getRate(), 1d);
}

@Test
public void send() throws IOException, InterruptedException {
final RateLimitedHttpClient httpClient = new RateLimitedHttpClient(client, RateLimiter.create(1));
final Instant start = Instant.now();
final int runs = 5;
for (int i = 0; i < runs; i++) {
httpClient.send(HttpRequest.newBuilder().uri(URI.create("http://junit.test")).build(), null);
}
final Duration duration = Duration.between(start, Instant.now());
assertTrue("Request should have taken at least " + (runs - 1) + " seconds.",
duration.getSeconds() + 1 >= (runs - 1));
}
}