Skip to content

Commit

Permalink
Concurrency fixes (#3642)
Browse files Browse the repository at this point in the history
* Swap from `ForkJoinPool` to `ThreadPoolExecutor`
* `ForkJoinPool` does this weird thing where fork-join-tasks can be
re-entrant at `join` points, resulting in weird scenarios where a
mill-task that hits a yield point (e.g. inside Zinc /
parallel-collections / FJP) can start running a second mill-task even
before the first has finished, violating all sorts of invariants (# of
running tasks exceeds `--jobs`, `FixSizeCache` semaphores get taken
twice by the same thread, all sorts of craziness)
* We replace `ForkJoinPool#ManagedBlocker` with our own manual logic
increasing and decreasing the `ThreadPoolExecutor`s `maximumPoolSize`
and `corePoolSize` in our `blocking{...}` wrapper


* We need to `Thread#interrupt()` the `promptUpdaterThread` thread when
we close the `PromptLogger`, so we don't need to wait the
`promptUpdateInterval` (0.1ms for interactive, 60s for non-interactive)
before exiting

This should fix some of the flakiness we've been seeing in master, that
seems to have started from
05bef7e
(just eyeballing the CI history), and blocking our re-bootstrapping in
#3637
  • Loading branch information
lihaoyi authored Oct 1, 2024
1 parent 0362cdb commit 26cbc8a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 21 deletions.
39 changes: 25 additions & 14 deletions main/eval/src/mill/eval/ExecutionContexts.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package mill.eval

import os.Path

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import java.util.concurrent.ForkJoinPool.ManagedBlocker
import java.util.concurrent.{ExecutorService, ForkJoinPool}
import java.util.concurrent.{ExecutorService, LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

private object ExecutionContexts {

Expand All @@ -29,21 +29,32 @@ private object ExecutionContexts {
* A simple thread-pool-based ExecutionContext with configurable thread count
* and AutoCloseable support
*/
class ThreadPool(threadCount: Int) extends mill.api.Ctx.Fork.Impl {
class ThreadPool(threadCount0: Int) extends mill.api.Ctx.Fork.Impl {
def await[T](t: Future[T]): T = blocking { Await.result(t, Duration.Inf) }
val forkJoinPool: ForkJoinPool = new ForkJoinPool(threadCount)
val threadPool: ExecutorService = forkJoinPool
val executor: ThreadPoolExecutor = new ThreadPoolExecutor(
threadCount0,
threadCount0,
0,
TimeUnit.SECONDS,
new LinkedBlockingQueue[Runnable]()
)

val threadPool: ExecutorService = executor

def updateThreadCount(delta: Int): Unit = synchronized {
if (delta > 0) {
executor.setMaximumPoolSize(executor.getMaximumPoolSize + delta)
executor.setCorePoolSize(executor.getCorePoolSize + delta)
} else {
executor.setCorePoolSize(executor.getCorePoolSize + delta)
executor.setMaximumPoolSize(executor.getMaximumPoolSize + delta)
}
}

def blocking[T](t: => T): T = {
@volatile var res: Option[T] = None
ForkJoinPool.managedBlock(new ManagedBlocker {
def block(): Boolean = {
if (res.isEmpty) res = Some(t)
true
}
def isReleasable: Boolean = res.nonEmpty
})
res.get
updateThreadCount(1)
try t
finally updateThreadCount(-1)
}

def execute(runnable: Runnable): Unit = {
Expand Down
16 changes: 9 additions & 7 deletions main/util/src/mill/util/PromptLogger.scala
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ private[mill] class PromptLogger(
if (termDimensions._1.isDefined) promptUpdateIntervalMillis
else nonInteractivePromptUpdateIntervalMillis

Thread.sleep(promptUpdateInterval)
try Thread.sleep(promptUpdateInterval)
catch { case e: InterruptedException => /*do nothing*/ }

if (!paused) {
synchronized {
Expand All @@ -85,8 +86,9 @@ private[mill] class PromptLogger(

override def setPromptLeftHeader(s: String): Unit = synchronized { state.updateGlobal(s) }
override def clearPromptStatuses(): Unit = synchronized { state.clearStatuses() }
override def removePromptLine(key: Seq[String]): Unit =
synchronized { state.updateCurrent(key, None) }
override def removePromptLine(key: Seq[String]): Unit = synchronized {
state.updateCurrent(key, None)
}

def ticker(s: String): Unit = ()
override def setPromptDetail(key: Seq[String], s: String): Unit = {
Expand All @@ -111,9 +113,8 @@ private[mill] class PromptLogger(
synchronized {
state.updateCurrent(key, Some(s"[${key.mkString("-")}] $message"))
seenIdentifiers(key) = (verboseKeySuffix, message)
super.setPromptLine(key.map(infoColor(_).toString()), verboseKeySuffix, message)

}

def debug(s: String): Unit = synchronized { if (debugEnabled) systemStreams.err.println(s) }

override def rawOutputStream: PrintStream = systemStreams0.out
Expand All @@ -126,6 +127,7 @@ private[mill] class PromptLogger(
}
// Needs to be outside the lock so we don't deadlock with `promptUpdaterThread`
// trying to take the lock one last time before exiting
promptUpdaterThread.interrupt()
promptUpdaterThread.join()
}

Expand All @@ -140,7 +142,7 @@ private[mill] class PromptLogger(
// After the prompt gets paused, wait until the `promptUpdaterThread` marks
// `pauseNoticed = true`, so we can be sure it's done printing out prompt updates for
// now and we can proceed with running `t` without any last updates slipping through
while (!pauseNoticed) Thread.sleep(1)
while (!pauseNoticed) Thread.sleep(2)
// Clear the prompt so the code in `t` has a blank terminal to work with
systemStreams0.err.write(AnsiNav.clearScreen(0).getBytes)
systemStreams0.err.flush()
Expand Down Expand Up @@ -174,7 +176,7 @@ private[mill] object PromptLogger {
)

def awaitPumperEmpty(): Unit = {
while (pipe.input.available() != 0) Thread.sleep(10)
while (pipe.input.available() != 0) Thread.sleep(2)
}
object pumper extends ProxyStream.Pumper(pipe.input, systemStreams0.out, systemStreams0.err) {
object PumperState extends Enumeration {
Expand Down

0 comments on commit 26cbc8a

Please sign in to comment.