Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

untimed invokeAny and invokeAll with virtual threads #29787

Open
wants to merge 8 commits into
base: integration
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,25 @@ max=Maximum concurrency
max.desc=Specifies the maximum number of tasks that can run simultaneously. The default is Integer.MAX_VALUE. Maximum concurrency can be updated while tasks are in progress. If the maximum concurrency is reduced below the number of concurrently running tasks, the update goes into effect gradually as in-progress tasks complete, rather than canceling them.

maxPolicy=Maximum policy
maxPolicy.desc=Indicates whether to loosely or strictly enforce maximum concurrency for tasks that run on the task submitter's thread. Tasks can run on the task submitter's thread when using the untimed invokeAll method, or, if only invoking a single task, the untimed invokeAny method. If the run-if-queue-full attribute is configured, it is also possible for tasks to run the task submitter's thread when using the execute and submit methods. In all of these cases, this attribute determines whether or not running on the submitter's thread counts against the maximum concurrency.
maxPolicy.loose.desc=Maximum concurrency is loosely enforced. Tasks are allowed to run on the task submitter's thread without counting against maximum concurrency.
maxPolicy.strict.desc=Maximum concurrency is strictly enforced. Tasks that run on the task submitter's thread count towards maximum concurrency. This policy does not allow tasks to run on the task submitter's thread when already at maximum concurrency.
maxPolicy.desc=Indicates whether to loosely or strictly enforce maximum \
concurrency for tasks that run on the task submitter's thread. If the \
dmuelle marked this conversation as resolved.
Show resolved Hide resolved
concurrency policy is not configured to run tasks on virtual threads, the \
invokeAll and invokeAny methods can sometimes run tasks on the task submitter's \
thread. This situation happens when you use the untimed invokeAll method, or, \
if you only supply a single task, when you use the untimed invokeAny method. \
If the run-if-queue-full attribute is configured, tasks can also \
run on the task submitter's thread when you use the execute and submit methods. \
In all of these cases, this attribute determines whether running on the \
submitter's thread counts against the maximum concurrency. Completion stage \
tasks that run inline do not count against the maximum concurrency, regardless \
of the maximum policy.
maxPolicy.loose.desc=Maximum concurrency is loosely enforced. \
Tasks are allowed to run on the task submitter's thread without \
counting against maximum concurrency.
maxPolicy.strict.desc=Maximum concurrency is strictly enforced. \
Tasks that run on the task submitter's thread count towards maximum concurrency. \
This policy does not allow tasks to run on the task submitter's thread when \
already at maximum concurrency.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note to reviewers: maxPolicy.loose.desc and maxPolicy.strict.desc were not changed. I only split the existing text onto multiple lines so that I could read them in a code editor without needing to keep scrolling back and forth. I also split up maxPolicy.desc for the same reason, but added some text to that one, so that is the one to review.


maxQueueSize=Maximum queue size
maxQueueSize.desc=Specifies the maximum number of tasks that can be in the queue waiting for execution. As tasks are started, canceled, or aborted, they are removed from the queue. When the queue is at capacity and another task is submitted, the behavior is determined by the maximum wait for enqueue and run-if-queue-full attributes. To ensure that a specific number of tasks can be queued within a short interval of time, use a maximum queue size that is at least as large as that amount. The default maximum queue size is Integer.MAX_VALUE. Maximum queue size can be updated while tasks are both in progress or queued for execution. If the maximum queue size is reduced below the current number of queued tasks, the update goes into effect gradually rather than automatically canceling the excess queued tasks.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import static jakarta.enterprise.concurrent.ContextServiceDefinition.APPLICATION;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.fail;

import java.time.ZoneId;
Expand All @@ -25,6 +26,7 @@
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
Expand Down Expand Up @@ -253,7 +255,119 @@ public void testTimedInvokeAnyOnVirtualThreads() throws Exception {
assertEquals(Boolean.TRUE, Thread.class.getMethod("isVirtual").invoke(thread));
}

// TODO after the workaround is removed, write: public void testUntimedInvokeAllOnVirtualThreads() throws Exception {
/**
* Use ManagedExecutorDefinition with virtual=true to submit multiple task
* to run on a virtual thread via the untimed invokeAll method.
*/
@Test
public void testUntimedInvokeAllOnVirtualThreads() throws Exception {
Callable<Thread> task = () -> {
InitialContext.doLookup("java:comp/env/TestEntry");
return Thread.currentThread();
};

ManagedExecutorService executor = InitialContext //
.doLookup("java:module/concurrent/virtual-executor");

List<Future<Thread>> futures = executor.invokeAll(List.of(task, task, task));

assertEquals(futures.toString(), 3, futures.size());

Set<Thread> uniqueThreads = new HashSet<Thread>();
uniqueThreads.add(Thread.currentThread());

Thread thread;
assertNotNull(thread = futures.get(0).get(TIMEOUT_NS, TimeUnit.NANOSECONDS));
assertEquals(Boolean.TRUE,
Thread.class.getMethod("isVirtual").invoke(thread));
uniqueThreads.add(thread);

assertNotNull(thread = futures.get(1).get(TIMEOUT_NS, TimeUnit.NANOSECONDS));
assertEquals(Boolean.TRUE,
Thread.class.getMethod("isVirtual").invoke(thread));
uniqueThreads.add(thread);

assertNotNull(thread = futures.get(2).get(TIMEOUT_NS, TimeUnit.NANOSECONDS));
assertEquals(Boolean.TRUE,
Thread.class.getMethod("isVirtual").invoke(thread));
uniqueThreads.add(thread);

assertEquals(uniqueThreads.toString(), 4, uniqueThreads.size());
uniqueThreads.clear();

// run from a virtual thread:
CompletableFuture<List<Future<Thread>>> ff = new CompletableFuture<>();
ManagedThreadFactory threadFactory = InitialContext //
.doLookup("java:module/concurrent/virtual-thread-factory");
threadFactory.newThread(() -> {
try {
uniqueThreads.add(Thread.currentThread());
ff.complete(executor.invokeAll(List.of(task, task, task)));
} catch (Throwable x) {
ff.completeExceptionally(x);
}
}).start();

futures = ff.get(TIMEOUT_NS, TimeUnit.NANOSECONDS);

assertEquals(futures.toString(), 3, futures.size());

assertNotNull(thread = futures.get(0).get(TIMEOUT_NS, TimeUnit.NANOSECONDS));
assertEquals(Boolean.TRUE,
Thread.class.getMethod("isVirtual").invoke(thread));
uniqueThreads.add(thread);

assertNotNull(thread = futures.get(1).get(TIMEOUT_NS, TimeUnit.NANOSECONDS));
assertEquals(Boolean.TRUE,
Thread.class.getMethod("isVirtual").invoke(thread));
uniqueThreads.add(thread);

assertNotNull(thread = futures.get(2).get(TIMEOUT_NS, TimeUnit.NANOSECONDS));
assertEquals(Boolean.TRUE,
Thread.class.getMethod("isVirtual").invoke(thread));
uniqueThreads.add(thread);

assertEquals(uniqueThreads.toString(), 4, uniqueThreads.size());
}

/**
* Use ManagedExecutorDefinition with virtual=true to submit a single task
* to run on a virtual thread via the untimed invokeAny method.
*/
@Test
public void testUntimedInvokeAnyOneOnVirtualThread() throws Exception {
Callable<Thread> anyTask = () -> {
InitialContext.doLookup("java:comp/env/TestEntry");
return Thread.currentThread();
};

ManagedExecutorService executor = InitialContext //
.doLookup("java:module/concurrent/virtual-executor");

Thread thread = executor.invokeAny(List.of(anyTask));

assertNotSame(Thread.currentThread(), thread); // does not run inline
assertEquals(Boolean.TRUE,
Thread.class.getMethod("isVirtual").invoke(thread));

// run from a virtual thread:
CompletableFuture<Thread> futureThread = new CompletableFuture<>();
ManagedThreadFactory threadFactory = InitialContext //
.doLookup("java:module/concurrent/virtual-thread-factory");
Thread newThread = threadFactory.newThread(() -> {
try {
futureThread.complete(executor.invokeAny(List.of(anyTask)));
} catch (Throwable x) {
futureThread.completeExceptionally(x);
}
});
newThread.start();

thread = futureThread.get(TIMEOUT_NS, TimeUnit.NANOSECONDS);
assertNotSame(newThread, thread); // does not run inline
assertEquals(Boolean.TRUE,
Thread.class.getMethod("isVirtual").invoke(thread));
}

/**
* Use ManagedExecutorDefinition with virtual=true to submit multiple tasks to run on virtual threads,
Expand Down Expand Up @@ -282,8 +396,6 @@ public void testUntimedInvokeAnyOnVirtualThreads() throws Exception {
assertEquals(Boolean.TRUE, Thread.class.getMethod("isVirtual").invoke(thread));
}

// TODO after the workaround is removed, write: public void testUntimedInvokeAnyOneOnVirtualThread() throws Exception {

/**
* Use a managed-executor from the application.xml deployment descriptor with
* virtual=true to request that tasks run on virtual threads.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,6 @@ public enum MaxPolicy {
* This policy does not allow running on the submitter's thread if already at maximum concurrency.
*/
strict

/*
* Leaving the policy unspecified indicates that the policy is to be chosen on a case-by-case basis
* based on whether the submitter thread is a virtual thread (try to avoid running on the submitter's thread)
* or a platform thread (use a policy of loose).
*/
// unspecified
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -803,14 +803,12 @@ public <T> List<PolicyTaskFuture<T>> invokeAll(Collection<? extends Callable<T>>
boolean havePermit = false;
boolean useCurrentThread;
MaxPolicy policy = maxPolicy;
if (policy == MaxPolicy.loose) // can always run inline
if (virtual) // always run asynchronously on new virtual thread
useCurrentThread = false;
else if (policy == MaxPolicy.loose) // can always run inline
useCurrentThread = true;
else if (policy == MaxPolicy.strict) // must acquire a permit to run inline
else // policy == MaxPolicy.strict // must acquire a permit to run inline
useCurrentThread = havePermit = taskCount > 0 && maxConcurrencyConstraint.tryAcquire();
else // can run inline on platform threads (loose); Never run inline on virtual threads
throw new UnsupportedOperationException("maxPolicy=null"); // currently unreachable, waiting for a pull that is blocked
// TODO:
// useCurrentThread = !(virtualThreadOps.isSupported() && virtualThreadOps.isVirtual(Thread.currentThread()));

List<PolicyTaskFutureImpl<T>> futures = new ArrayList<PolicyTaskFutureImpl<T>>(taskCount);
try {
Expand Down Expand Up @@ -983,14 +981,12 @@ public <T> T invokeAny(Collection<? extends Callable<T>> tasks, PolicyTaskCallba
boolean havePermit = false;
boolean useCurrentThread;
MaxPolicy policy = maxPolicy;
if (policy == MaxPolicy.loose) // can always run inline
if (virtual) // always run asynchronously on new virtual thread
useCurrentThread = false;
else if (policy == MaxPolicy.loose) // can always run inline
useCurrentThread = true;
else if (policy == MaxPolicy.strict) // must acquire a permit to run inline
else // policy == MaxPolicy.strict // must acquire a permit to run inline
useCurrentThread = havePermit = maxConcurrencyConstraint.tryAcquire();
else // can run inline on platform threads (loose); Never run inline on virtual threads
throw new UnsupportedOperationException("maxPolicy=null"); // currently unreachable, waiting for a pull that is blocked
// TODO:
// useCurrentThread = !(virtualThreadOps.isSupported() && virtualThreadOps.isVirtual(Thread.currentThread()));

if (useCurrentThread)
try {
Expand Down Expand Up @@ -1538,7 +1534,7 @@ public void updateConfig(Map<String, Object> props) {
long u_maxWaitForEnqueue = (Long) props.get("maxWaitForEnqueue");
boolean u_runIfQueueFull = (Boolean) props.get("runIfQueueFull");
long u_startTimeout = null == (v = props.get("startTimeout")) ? -1l : (Long) v;
boolean useVirtualThreads = null == (v = props.get("virtual")) ? false : (Boolean) v;;
boolean useVirtualThreads = null == (v = props.get("virtual")) ? false : (Boolean) v;

// Validation that cannot be performed by metatype:
if (useVirtualThreads && !virtualThreadOps.isSupported()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,11 +219,10 @@ else if (maxAsync != -1) // unbounded
concurrencyPolicyProps.put("maxWaitForEnqueue", 0L);
concurrencyPolicyProps.put("runIfQueueFull", false);

if (Boolean.TRUE.equals(virtual) && JavaInfo.majorVersion() >= 21) { // only available in Concurrency 3.1+ and Java 21+
if (Boolean.TRUE.equals(virtual) && JavaInfo.majorVersion() >= 21) {
// only available in Concurrency 3.1+ and Java 21+
concurrencyPolicyProps.put("virtual", virtual);
// maxPolicy unspecified makes the policy conditional on whether or not the submitter thread is virtual
// TODO remove the following once unspecified is supported
concurrencyPolicyProps.put("maxPolicy", "loose");
concurrencyPolicyProps.put("maxPolicy", "strict");
} else {
if (Boolean.TRUE.equals(virtual))
Tr.info(tc, "CWWKC1217.no.virtual.threads",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,11 +222,10 @@ else if (maxAsync != -1) // unbounded
concurrencyPolicyProps.put("maxWaitForEnqueue", 0L);
concurrencyPolicyProps.put("runIfQueueFull", false);

if (Boolean.TRUE.equals(virtual) && JavaInfo.majorVersion() >= 21) { // only available in Concurrency 3.1+ and Java 21+
if (Boolean.TRUE.equals(virtual) && JavaInfo.majorVersion() >= 21) {
// only available in Concurrency 3.1+ and Java 21+
concurrencyPolicyProps.put("virtual", virtual);
// maxPolicy unspecified makes the policy conditional on whether or not the submitter thread is virtual
// TODO remove the following once unspecified is supported
concurrencyPolicyProps.put("maxPolicy", "loose");
concurrencyPolicyProps.put("maxPolicy", "strict");
} else {
if (Boolean.TRUE.equals(virtual))
Tr.info(tc, "CWWKC1217.no.virtual.threads",
Expand Down