Skip to content

Commit

Permalink
Add IORuntimeMetrics and CpuStarvationMetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
iRevive committed Jul 19, 2024
1 parent 48c65c6 commit 2d3ff52
Show file tree
Hide file tree
Showing 17 changed files with 213 additions and 178 deletions.
9 changes: 8 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -676,7 +676,14 @@ lazy val core = crossProject(JSPlatform, JVMPlatform, NativePlatform)
"cats.effect.unsafe.IORuntimeBuilder.this"),
// introduced by #3695, which enabled fiber dumps on native
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.unsafe.FiberMonitorCompanionPlatform")
"cats.effect.unsafe.FiberMonitorCompanionPlatform"),
// internal API change, makes CpuStarvationMetrics available on all platforms
ProblemFilters.exclude[MissingClassProblem](
"cats.effect.metrics.JvmCpuStarvationMetrics$NoOpCpuStarvationMetrics"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.JsCpuStarvationMetrics"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.JsCpuStarvationMetrics$"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.NativeCpuStarvationMetrics"),
ProblemFilters.exclude[MissingClassProblem]("cats.effect.metrics.NativeCpuStarvationMetrics$")
) ++ {
if (tlIsScala3.value) {
// Scala 3 specific exclusions
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package cats.effect.metrics

private[metrics] abstract class IORuntimeMetricsCompanionPlatform {
this: IORuntimeMetrics.type =>

private[effect] def apply(): IORuntimeMetrics =
new IORuntimeMetrics {
private[effect] val cpuStarvationSampler: CpuStarvationSampler =
CpuStarvationSampler()

val cpuStarvation: CpuStarvationMetrics =
CpuStarvationMetrics(cpuStarvationSampler)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package cats.effect.metrics

private[metrics] trait IORuntimeMetricsPlatform { this: IORuntimeMetrics => }
4 changes: 2 additions & 2 deletions core/js/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package cats.effect

import cats.effect.metrics.{CpuStarvationWarningMetrics, JsCpuStarvationMetrics}
import cats.effect.metrics.CpuStarvationWarningMetrics
import cats.effect.std.Console
import cats.effect.tracing.TracingConstants._

Expand Down Expand Up @@ -260,7 +260,7 @@ trait IOApp {
val fiber = Spawn[IO]
.raceOutcome[ExitCode, Nothing](
CpuStarvationCheck
.run(runtimeConfig, JsCpuStarvationMetrics(), onCpuStarvationWarn)
.run(runtimeConfig, runtime.metrics.cpuStarvationSampler, onCpuStarvationWarn)
.background
.surround(run(argList)),
keepAlive)
Expand Down

This file was deleted.

6 changes: 3 additions & 3 deletions core/jvm/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -447,10 +447,10 @@ trait IOApp {
val queue = this.queue

val fiber =
JvmCpuStarvationMetrics()
.flatMap { cpuStarvationMetrics =>
JvmCpuStarvationMetrics(runtime.metrics.cpuStarvationSampler)
.flatMap { _ =>
CpuStarvationCheck
.run(runtimeConfig, cpuStarvationMetrics, onCpuStarvationWarn)
.run(runtimeConfig, runtime.metrics.cpuStarvationSampler, onCpuStarvationWarn)
.background
}
.surround(ioa)
Expand Down
59 changes: 8 additions & 51 deletions core/jvm/src/main/scala/cats/effect/metrics/CpuStarvation.scala
Original file line number Diff line number Diff line change
@@ -1,57 +1,14 @@
/*
* Copyright 2020-2024 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.metrics

import cats.effect.IO

import scala.concurrent.duration.FiniteDuration

import java.util.concurrent.atomic.AtomicLong

private[metrics] class CpuStarvation private (
counter: AtomicLong,
currentClockDrift: AtomicLong,
maxClockDrift: AtomicLong)
extends CpuStarvationMBean {

override def getCpuStarvationCount(): Long = counter.get()

override def getMaxClockDriftMs(): Long = maxClockDrift.get()
private final class CpuStarvation(sampler: CpuStarvationSampler) extends CpuStarvationMBean {
def getCpuStarvationCount(): Long =
sampler.cpuStarvationCount()

override def getCurrentClockDriftMs(): Long = currentClockDrift.get()

def incStarvationCount: IO[Unit] = IO.delay(counter.incrementAndGet()).void

def recordDrift(drift: FiniteDuration): IO[Unit] = {
val driftMs = drift.toMillis

val maxDrift =
if (driftMs > 0) IO.delay(maxClockDrift.updateAndGet(math.max(_, driftMs))).void
else IO.unit

IO.delay(currentClockDrift.set(driftMs)) >> maxDrift
}
def getMaxClockDriftMs(): Long =
sampler.clockDriftMaxMs()

def getCurrentClockDriftMs(): Long =
sampler.clockDriftCurrentMs()
}

private[metrics] object CpuStarvation {
private[metrics] def apply(): IO[CpuStarvation] = for {
counter <- IO.delay(new AtomicLong(0))
currentClockDrift <- IO.delay(new AtomicLong(0))
maxClockDrift <- IO.delay(new AtomicLong(0))
} yield new CpuStarvation(counter, currentClockDrift, maxClockDrift)
}
private object CpuStarvation
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package cats.effect.metrics

private[metrics] abstract class IORuntimeMetricsCompanionPlatform {
this: IORuntimeMetrics.type =>

private[effect] def apply(): IORuntimeMetrics =
new IORuntimeMetrics {
private[effect] val cpuStarvationSampler: CpuStarvationSampler =
CpuStarvationSampler()

val cpuStarvation: CpuStarvationMetrics =
CpuStarvationMetrics(cpuStarvationSampler)
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
package cats.effect.metrics

private[metrics] trait IORuntimeMetricsPlatform { this: IORuntimeMetrics => }
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,13 @@ package cats.effect.metrics

import cats.effect.{IO, Resource}
import cats.effect.std.Console

import scala.concurrent.duration.FiniteDuration
import cats.syntax.functor._

import java.io.{PrintWriter, StringWriter}
import java.lang.management.ManagementFactory

import javax.management.{MBeanServer, ObjectName}

private[effect] class JvmCpuStarvationMetrics private (mbean: CpuStarvation)
extends CpuStarvationMetrics {
override def incCpuStarvationCount: IO[Unit] = mbean.incStarvationCount

override def recordClockDrift(drift: FiniteDuration): IO[Unit] = mbean.recordDrift(drift)
}

private[effect] object JvmCpuStarvationMetrics {
private[this] val mBeanObjectName = new ObjectName("cats.effect.metrics:type=CpuStarvation")

Expand All @@ -46,28 +38,18 @@ private[effect] object JvmCpuStarvationMetrics {
|""".stripMargin
}

private[this] class NoOpCpuStarvationMetrics extends CpuStarvationMetrics {
override def incCpuStarvationCount: IO[Unit] = IO.unit

override def recordClockDrift(drift: FiniteDuration): IO[Unit] = IO.unit
}

private[effect] def apply(): Resource[IO, CpuStarvationMetrics] = {
val acquire: IO[(MBeanServer, JvmCpuStarvationMetrics)] = for {
private[effect] def apply(metrics: CpuStarvationSampler): Resource[IO, Unit] = {
val acquire: IO[MBeanServer] = for {
mBeanServer <- IO.delay(ManagementFactory.getPlatformMBeanServer)
mBean <- CpuStarvation()
mBean <- IO.pure(new CpuStarvation(metrics))
// To allow user-defined program to use the compute pool from the beginning,
// here we use `IO.delay` rather than `IO.blocking`.
_ <- IO.delay(mBeanServer.registerMBean(mBean, mBeanObjectName))
} yield (mBeanServer, new JvmCpuStarvationMetrics(mBean))
} yield mBeanServer

Resource
.make(acquire) {
case (mbeanServer, _) => IO.blocking(mbeanServer.unregisterMBean(mBeanObjectName))
}
.map(_._2)
.handleErrorWith[CpuStarvationMetrics, Throwable] { th =>
Resource.eval(Console[IO].errorln(warning(th))).map(_ => new NoOpCpuStarvationMetrics)
}
.make(acquire)(mbeanServer => IO.blocking(mbeanServer.unregisterMBean(mBeanObjectName)))
.void
.handleErrorWith[Unit, Throwable](th => Resource.eval(Console[IO].errorln(warning(th))))
}
}
4 changes: 2 additions & 2 deletions core/native/src/main/scala/cats/effect/IOApp.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package cats.effect

import cats.effect.metrics.{CpuStarvationWarningMetrics, NativeCpuStarvationMetrics}
import cats.effect.metrics.CpuStarvationWarningMetrics
import cats.syntax.all._

import scala.concurrent.CancellationException
Expand Down Expand Up @@ -271,7 +271,7 @@ trait IOApp {
else Resource.unit[IO]

val starvationChecker = CpuStarvationCheck
.run(runtimeConfig, NativeCpuStarvationMetrics(), onCpuStarvationWarn)
.run(runtimeConfig, runtime.metrics.cpuStarvationSampler, onCpuStarvationWarn)
.background

Spawn[IO]
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package cats.effect

import cats.effect.metrics.{CpuStarvationMetrics, CpuStarvationWarningMetrics}
import cats.effect.metrics.{CpuStarvationSampler, CpuStarvationWarningMetrics}
import cats.effect.std.Console
import cats.effect.unsafe.IORuntimeConfig
import cats.syntax.all._
Expand All @@ -27,7 +27,7 @@ private[effect] object CpuStarvationCheck extends CpuStarvationCheckPlatform {

def run(
runtimeConfig: IORuntimeConfig,
metrics: CpuStarvationMetrics,
sampler: CpuStarvationSampler,
onCpuStarvationWarn: CpuStarvationWarningMetrics => IO[Unit]): IO[Nothing] = {
import runtimeConfig._

Expand All @@ -37,15 +37,15 @@ private[effect] object CpuStarvationCheck extends CpuStarvationCheckPlatform {
IO.sleep(cpuStarvationCheckInterval) >> IO.monotonic.flatMap { now =>
val delta = now - initial

metrics.recordClockDrift(delta - cpuStarvationCheckInterval) >>
sampler.recordClockDrift(delta - cpuStarvationCheckInterval) >>
IO.realTime
.flatMap(fd =>
(onCpuStarvationWarn(
CpuStarvationWarningMetrics(
fd,
delta - cpuStarvationCheckInterval,
cpuStarvationCheckThreshold,
cpuStarvationCheckInterval)) *> metrics.incCpuStarvationCount)
cpuStarvationCheckInterval)) *> sampler.incCpuStarvationCount)
.whenA(delta >= threshold)) >> go(now)
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,27 +1,46 @@
/*
* Copyright 2020-2024 Typelevel
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package cats.effect.metrics

import cats.effect.IO
import scala.concurrent.duration._

sealed trait CpuStarvationMetrics {

/**
* Returns the current number of times CPU starvation has occurred.
*
* @note
* the value may differ between invocations
*/
def starvationCount(): Long

/**
* Returns the current (last) observed clock drift.
*
* @note
* the value may differ between invocations
*/
def clockDriftCurrent(): FiniteDuration

/**
* Returns the maximum clock drift observed since the launch.
*
* @note
* the value may differ between invocations
*/
def clockDriftMax(): FiniteDuration
}

object CpuStarvationMetrics {

private[metrics] def apply(sampler: CpuStarvationSampler): CpuStarvationMetrics =
new CpuStarvationMetrics {
def starvationCount(): Long =
sampler.cpuStarvationCount()

import scala.concurrent.duration.FiniteDuration
def clockDriftCurrent(): FiniteDuration =
sampler.clockDriftCurrentMs().millis

private[effect] trait CpuStarvationMetrics {
def incCpuStarvationCount: IO[Unit]
def clockDriftMax(): FiniteDuration =
sampler.clockDriftMaxMs().millis
}

def recordClockDrift(drift: FiniteDuration): IO[Unit]
}
Loading

0 comments on commit 2d3ff52

Please sign in to comment.