Skip to content

Commit

Permalink
Add injection of custom executor service to S3Base supplyAsync calls
Browse files Browse the repository at this point in the history
  • Loading branch information
thomasva committed Jul 8, 2024
1 parent 245f63d commit a491d99
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 13 deletions.
Binary file added .DS_Store
Binary file not shown.
29 changes: 20 additions & 9 deletions api/src/main/java/io/minio/MinioAsyncClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;
import java.util.regex.Matcher;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;
Expand Down Expand Up @@ -140,7 +142,8 @@ private MinioAsyncClient(
String region,
Provider provider,
OkHttpClient httpClient,
boolean closeHttpClient) {
boolean closeHttpClient,
ExecutorService executorService) {
super(
baseUrl,
awsS3Prefix,
Expand All @@ -150,7 +153,8 @@ private MinioAsyncClient(
region,
provider,
httpClient,
closeHttpClient);
closeHttpClient,
executorService);
}

protected MinioAsyncClient(MinioAsyncClient client) {
Expand Down Expand Up @@ -453,7 +457,7 @@ public CompletableFuture<ObjectWriteResponse> copyObject(CopyObjectArgs args)
args.validateSse(this.baseUrl);

return CompletableFuture.supplyAsync(
() -> args.source().offset() != null && args.source().length() != null)
() -> args.source().offset() != null && args.source().length() != null, executorService)
.thenCompose(
condition -> {
if (condition) {
Expand Down Expand Up @@ -667,7 +671,7 @@ public CompletableFuture<ObjectWriteResponse> composeObject(ComposeObjectArgs ar
Multimap<String, String> headers = newMultimap(args.extraHeaders());
headers.putAll(args.genHeaders());
return headers;
})
}, executorService)
.thenCompose(
headers -> {
try {
Expand Down Expand Up @@ -705,7 +709,7 @@ public CompletableFuture<ObjectWriteResponse> composeObject(ComposeObjectArgs ar
CompletableFuture.supplyAsync(
() -> {
return new Part[partCount[0]];
});
}, executorService);
for (ComposeSource src : sources) {
long size = 0;
try {
Expand Down Expand Up @@ -801,8 +805,8 @@ public CompletableFuture<ObjectWriteResponse> composeObject(ComposeObjectArgs ar
throw new CompletionException(e);
}
});
offset += length;
size -= length;
offset = startBytes;
size -= (endBytes - startBytes);
}
}

Expand Down Expand Up @@ -3155,7 +3159,7 @@ public CompletableFuture<ObjectWriteResponse> uploadSnowballObjects(
}
}
return baos;
})
}, executorService)
.thenCompose(
baos -> {
Multimap<String, String> headers = newMultimap(args.extraHeaders());
Expand Down Expand Up @@ -3223,6 +3227,7 @@ public static final class Builder {
private String region;
private Provider provider;
private OkHttpClient httpClient;
private ExecutorService executorService = ForkJoinPool.commonPool();

private void setAwsInfo(String host, boolean https) {
this.awsS3Prefix = null;
Expand Down Expand Up @@ -3329,6 +3334,11 @@ public Builder httpClient(OkHttpClient httpClient) {
return this;
}

public Builder executorService(ExecutorService executorService) {
this.executorService = executorService;
return this;
}

public MinioAsyncClient build() {
HttpUtils.validateNotNull(this.baseUrl, "endpoint");

Expand Down Expand Up @@ -3357,7 +3367,8 @@ public MinioAsyncClient build() {
region,
provider,
httpClient,
closeHttpClient);
closeHttpClient,
executorService());
}
}
}
30 changes: 26 additions & 4 deletions api/src/main/java/io/minio/S3Base.java
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -137,6 +138,7 @@ public abstract class S3Base implements AutoCloseable {
protected Provider provider;
protected OkHttpClient httpClient;
protected boolean closeHttpClient;
protected final ExecutorService executorService;

/** @deprecated This method is no longer supported. */
@Deprecated
Expand All @@ -161,6 +163,20 @@ protected S3Base(
false);
}

protected S3Base(
HttpUrl baseUrl,
String awsS3Prefix,
String awsDomainSuffix,
boolean awsDualstack,
boolean useVirtualStyle,
String region,
Provider provider,
OkHttpClient httpClient,
boolean closeHttpClient) {
this(baseUrl, awsS3Prefix, awsDomainSuffix, awsDualstack, useVirtualStyle, region, provider, httpClient, closeHttpClient);
this.executorService = ForkJoinPool.commonPool();
}

protected S3Base(
HttpUrl baseUrl,
String awsS3Prefix,
Expand All @@ -170,7 +186,8 @@ protected S3Base(
String region,
Provider provider,
OkHttpClient httpClient,
boolean closeHttpClient) {
boolean closeHttpClient,
ExecutorService executorService) {
this.baseUrl = baseUrl;
this.awsS3Prefix = awsS3Prefix;
this.awsDomainSuffix = awsDomainSuffix;
Expand All @@ -180,6 +197,7 @@ protected S3Base(
this.provider = provider;
this.httpClient = httpClient;
this.closeHttpClient = closeHttpClient;
this.executorService = executorService;
}

/** @deprecated This method is no longer supported. */
Expand Down Expand Up @@ -221,6 +239,7 @@ protected S3Base(S3Base client) {
this.provider = client.provider;
this.httpClient = client.httpClient;
this.closeHttpClient = client.closeHttpClient;
this.executorService = client.executorService;
}

/** Check whether argument is valid or not. */
Expand Down Expand Up @@ -1163,7 +1182,8 @@ protected CompletableFuture<Integer> calculatePartCountAsync(List<ComposeSource>
long[] objectSize = {0};
int index = 0;

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> 0);
CompletableFuture<Integer> completableFuture =
CompletableFuture.supplyAsync(() -> 0, executorService);
for (ComposeSource src : sources) {
index++;
final int i = index;
Expand Down Expand Up @@ -2882,7 +2902,8 @@ private CompletableFuture<ObjectWriteResponse> putMultipartObjectAsync(
}
}
return response;
});
},
executorService);
}

/**
Expand Down Expand Up @@ -2928,7 +2949,8 @@ protected CompletableFuture<ObjectWriteResponse> putObjectAsync(
} catch (NoSuchAlgorithmException | IOException e) {
throw new CompletionException(e);
}
})
},
executorService)
.thenCompose(
partSource -> {
try {
Expand Down

0 comments on commit a491d99

Please sign in to comment.