diff --git a/ubo-common/src/main/java/org/mycore/ubo/http/RateLimitedHttpClient.java b/ubo-common/src/main/java/org/mycore/ubo/http/RateLimitedHttpClient.java new file mode 100644 index 000000000..ef20312f0 --- /dev/null +++ b/ubo-common/src/main/java/org/mycore/ubo/http/RateLimitedHttpClient.java @@ -0,0 +1,200 @@ +package org.mycore.ubo.http; + +import java.io.IOException; +import java.net.Authenticator; +import java.net.CookieHandler; +import java.net.ProxySelector; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.time.Instant; +import java.time.temporal.ChronoUnit; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; + +import org.mycore.common.MCRException; + +import com.google.common.util.concurrent.RateLimiter; + +public class RateLimitedHttpClient extends HttpClient { + private final HttpClient client; + + private final RateLimiter rateLimiter; + + RateLimitedHttpClient(HttpClient client, RateLimiter rateLimiter) { + this.client = client; + this.rateLimiter = rateLimiter; + } + + public double getRate() { + return rateLimiter.getRate(); + } + + @Override public Optional cookieHandler() { + return client.cookieHandler(); + } + + @Override public Optional connectTimeout() { + return client.connectTimeout(); + } + + @Override public Redirect followRedirects() { + return client.followRedirects(); + } + + @Override public Optional proxy() { + return client.proxy(); + } + + @Override public SSLContext sslContext() { + return client.sslContext(); + } + + @Override public SSLParameters sslParameters() { + return client.sslParameters(); + } + + @Override public Optional authenticator() { + return client.authenticator(); + } + + @Override public Version version() { + return client.version(); + } + + @Override public Optional executor() { + return client.executor(); + } + + @Override public HttpResponse send(HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler) throws IOException, InterruptedException { + try { + return sendAsync(request, responseBodyHandler).get(); + } catch (ExecutionException e) { + final Throwable cause = e.getCause(); + if (cause instanceof RuntimeException) { + throw (RuntimeException) cause; + } + if (cause instanceof IOException) { + throw (IOException) cause; + } + if (cause instanceof InterruptedException) { + throw (InterruptedException) cause; + } + throw new IOException(cause); + } + } + + @Override public CompletableFuture> sendAsync( + HttpRequest request, HttpResponse.BodyHandler responseBodyHandler) { + return sendAsync(request, responseBodyHandler, null); + } + + @Override public CompletableFuture> sendAsync( + HttpRequest request, HttpResponse.BodyHandler responseBodyHandler, + HttpResponse.PushPromiseHandler pushPromiseHandler) { + return acquireRequestPermission(request) + .thenCompose(waitTime -> client.sendAsync(request, responseBodyHandler, pushPromiseHandler)); + } + + private CompletableFuture acquireRequestPermission(HttpRequest request) { + final Optional connectionTimeOut = request.timeout() + .or(this::connectTimeout); + final Executor executor = executor() + .orElseGet(ForkJoinPool::commonPool); + return connectionTimeOut.map(duration -> CompletableFuture.supplyAsync(() -> { + final Instant start = Instant.now(); + if (rateLimiter.tryAcquire(duration)) { + return Duration.between(start, Instant.now()) + .get(ChronoUnit.MILLIS) / (double) 1000; + } + throw new MCRException("Timeout"); + }, executor)).orElseGet(() -> CompletableFuture.supplyAsync(rateLimiter::acquire, executor)); + } + + public static Builder newBuilder() { + return new BuilderImpl(); + } + + public interface Builder extends HttpClient.Builder { + Builder rateLimit(int rate, TimeUnit unit); + + @Override RateLimitedHttpClient build(); + } + + private static class BuilderImpl implements Builder { + private final HttpClient.Builder builder = HttpClient.newBuilder(); + + private RateLimiter rateLimiter; + + @Override public HttpClient.Builder cookieHandler(CookieHandler cookieHandler) { + builder.cookieHandler(cookieHandler); + return this; + } + + @Override public HttpClient.Builder connectTimeout(Duration duration) { + builder.connectTimeout(duration); + return this; + } + + @Override public HttpClient.Builder sslContext(SSLContext sslContext) { + builder.sslContext(sslContext); + return this; + } + + @Override public HttpClient.Builder sslParameters(SSLParameters sslParameters) { + builder.sslParameters(sslParameters); + return this; + } + + @Override public HttpClient.Builder executor(Executor executor) { + builder.executor(executor); + return this; + } + + @Override public HttpClient.Builder followRedirects(Redirect policy) { + builder.followRedirects(policy); + return this; + } + + @Override public HttpClient.Builder version(Version version) { + builder.version(version); + return this; + } + + @Override public HttpClient.Builder priority(int priority) { + builder.priority(priority); + return this; + } + + @Override public HttpClient.Builder proxy(ProxySelector proxySelector) { + builder.proxy(proxySelector); + return this; + } + + @Override public HttpClient.Builder authenticator(Authenticator authenticator) { + builder.authenticator(authenticator); + return this; + } + + @Override public RateLimitedHttpClient build() { + if (rateLimiter == null) { + throw new IllegalStateException("No rate limit defined!"); + } + return new RateLimitedHttpClient(builder.build(), rateLimiter); + } + + @Override public Builder rateLimit(int rate, TimeUnit unit) { + rateLimiter = RateLimiter.create(rate / (double) unit.toSeconds(1)); + return this; + } + } +} diff --git a/ubo-common/src/test/java/org/mycore/ubo/http/RateLimitedHttpClientTest.java b/ubo-common/src/test/java/org/mycore/ubo/http/RateLimitedHttpClientTest.java new file mode 100644 index 000000000..506e0812d --- /dev/null +++ b/ubo-common/src/test/java/org/mycore/ubo/http/RateLimitedHttpClientTest.java @@ -0,0 +1,117 @@ +package org.mycore.ubo.http; + +import java.io.IOException; +import java.net.Authenticator; +import java.net.CookieHandler; +import java.net.ProxySelector; +import java.net.URI; +import java.net.http.HttpClient; +import java.net.http.HttpRequest; +import java.net.http.HttpResponse; +import java.time.Duration; +import java.time.Instant; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; + +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLParameters; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.util.concurrent.RateLimiter; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +public class RateLimitedHttpClientTest { + + private HttpClient client; + + @Before + public void setUp() throws Exception { + this.client = new HttpClient() { + @Override public Optional cookieHandler() { + return Optional.empty(); + } + + @Override public Optional connectTimeout() { + return Optional.empty(); + } + + @Override public Redirect followRedirects() { + return null; + } + + @Override public Optional proxy() { + return Optional.empty(); + } + + @Override public SSLContext sslContext() { + return null; + } + + @Override public SSLParameters sslParameters() { + return null; + } + + @Override public Optional authenticator() { + return Optional.empty(); + } + + @Override public Version version() { + return null; + } + + @Override public Optional executor() { + return Optional.empty(); + } + + @Override + public HttpResponse send(HttpRequest request, HttpResponse.BodyHandler responseBodyHandler) + throws IOException, InterruptedException { + return null; + } + + @Override + public CompletableFuture> sendAsync(HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler) { + return CompletableFuture.completedFuture(null); + } + + @Override + public CompletableFuture> sendAsync(HttpRequest request, + HttpResponse.BodyHandler responseBodyHandler, + HttpResponse.PushPromiseHandler pushPromiseHandler) { + return CompletableFuture.completedFuture(null); + } + }; + } + + @After + public void tearDown() throws Exception { + } + + @Test + public void getRate() { + final RateLimitedHttpClient httpClient = RateLimitedHttpClient.newBuilder() + .rateLimit(10, TimeUnit.MINUTES).build(); + assertEquals(10 / 60, httpClient.getRate(), 1d); + } + + @Test + public void send() throws IOException, InterruptedException { + final RateLimitedHttpClient httpClient = new RateLimitedHttpClient(client, RateLimiter.create(1)); + final Instant start = Instant.now(); + final int runs = 5; + for (int i = 0; i < runs; i++) { + httpClient.send(HttpRequest.newBuilder().uri(URI.create("http://junit.test")).build(), null); + } + final Duration duration = Duration.between(start, Instant.now()); + assertTrue("Request should have taken at least " + (runs - 1) + " seconds.", + duration.getSeconds() + 1 >= (runs - 1)); + } +}