-
Notifications
You must be signed in to change notification settings - Fork 519
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
IOLocal
propagation for unsafe access
#3636
base: series/3.x
Are you sure you want to change the base?
Changes from all commits
0b88c01
db743e2
716ef32
0a69caf
2775064
270764f
d55489d
2cf72a5
cb3859d
7dce01c
5e171ac
c2f312d
638930d
9174c6a
1987e3a
02a43a6
a7bf748
145fc0e
fa99a5c
6cad03c
bb5d4b1
7517755
8d8e004
3589db4
522677e
6cc4d38
ac88480
49e5c30
925f504
d63a6ff
d4549fb
2502045
535fc8a
d854799
f070552
2cf1d8a
0eec9dd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,19 @@ | ||
/* | ||
* Copyright 2020-2024 Typelevel | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package cats.effect | ||
|
||
private[effect] trait IOLocalPlatform[A] |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Copyright 2020-2024 Typelevel | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package cats.effect | ||
|
||
import IOFiberConstants.ioLocalPropagation | ||
|
||
private[effect] trait IOLocalPlatform[A] { self: IOLocal[A] => | ||
|
||
/** | ||
* Returns a [[java.lang.ThreadLocal]] view of this [[IOLocal]] that allows to unsafely get, | ||
* set, and remove (aka reset) the value in the currently running fiber. The system property | ||
* `cats.effect.ioLocalPropagation` must be `true`, otherwise throws an | ||
* [[java.lang.UnsupportedOperationException]]. | ||
*/ | ||
def unsafeThreadLocal(): ThreadLocal[A] = if (ioLocalPropagation) | ||
Comment on lines
+23
to
+29
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is the new API based on the
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this is great, all of it. |
||
new ThreadLocal[A] { | ||
override def get(): A = { | ||
val fiber = IOFiber.currentIOFiber() | ||
val state = if (fiber ne null) fiber.getLocalState() else IOLocalState.empty | ||
self.getOrDefault(state) | ||
} | ||
|
||
override def set(value: A): Unit = { | ||
val fiber = IOFiber.currentIOFiber() | ||
if (fiber ne null) { | ||
fiber.setLocalState(self.set(fiber.getLocalState(), value)) | ||
} | ||
} | ||
|
||
override def remove(): Unit = { | ||
val fiber = IOFiber.currentIOFiber() | ||
if (fiber ne null) { | ||
fiber.setLocalState(self.reset(fiber.getLocalState())) | ||
} | ||
} | ||
} | ||
else | ||
throw new UnsupportedOperationException( | ||
"IOLocal-ThreadLocal propagation is disabled.\n" + | ||
"Enable by setting cats.effect.ioLocalPropagation=true." | ||
) | ||
|
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -107,9 +107,18 @@ private final class IOFiber[A]( | |
@volatile | ||
private[this] var outcome: OutcomeIO[A] = _ | ||
|
||
def getLocalState(): IOLocalState = localState | ||
|
||
def setLocalState(s: IOLocalState): Unit = localState = s | ||
|
||
override def run(): Unit = { | ||
// insert a read barrier after every async boundary | ||
readBarrier() | ||
|
||
if (ioLocalPropagation) { | ||
IOFiber.setCurrentIOFiber(this) | ||
} | ||
|
||
(resumeTag: @switch) match { | ||
case 0 => execR() | ||
case 1 => asyncContinueSuccessfulR() | ||
|
@@ -121,6 +130,10 @@ private final class IOFiber[A]( | |
case 7 => autoCedeR() | ||
case 8 => () // DoneR | ||
} | ||
|
||
if (ioLocalPropagation) { | ||
IOFiber.setCurrentIOFiber(null) | ||
} | ||
Comment on lines
+134
to
+136
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We definitely need to do some careful benchmarking here. This particular bit is very sensitive to inliner shenanigans. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We meaning you 😛 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah yeah yeah… Give me a pair of shas! :) |
||
} | ||
|
||
/* backing fields for `cancel` and `join` */ | ||
|
@@ -1559,6 +1572,23 @@ private object IOFiber { | |
@static private[IOFiber] val OutcomeCanceled = Outcome.Canceled() | ||
@static private[effect] val RightUnit = Right(()) | ||
|
||
@static private[this] val threadLocal = new ThreadLocal[IOFiber[_]] | ||
@static def currentIOFiber(): IOFiber[_] = { | ||
val thread = Thread.currentThread() | ||
if (thread.isInstanceOf[WorkerThread[_]]) | ||
thread.asInstanceOf[WorkerThread[_]].currentIOFiber | ||
else | ||
threadLocal.get() | ||
} | ||
|
||
@static private def setCurrentIOFiber(f: IOFiber[_]): Unit = { | ||
val thread = Thread.currentThread() | ||
if (thread.isInstanceOf[WorkerThread[_]]) | ||
thread.asInstanceOf[WorkerThread[_]].currentIOFiber = f | ||
else | ||
threadLocal.set(f) | ||
} | ||
|
||
@static def onFatalFailure(t: Throwable): Nothing = { | ||
val interrupted = Thread.interrupted() | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since JS/Native currently lack a mechanism to eliminate branches based on system properties at link- or run-time we just hard-code to
false
to DCE at compile-time.