From 63f739af0cd1b1ed4c78e329ce251c26890196a5 Mon Sep 17 00:00:00 2001 From: Holger Schmidt Date: Thu, 11 Apr 2024 00:16:05 +0200 Subject: [PATCH] rewrote Cmd.batchedThrottle to Dispatch.batchThrottled extension because it feels more natural to use it that way with a dispatch inside an ofEffect that produces values rapidly --- src/Fabulous.Tests/CmdTests.fs | 32 +++++----- src/Fabulous/Cmd.fs | 111 +++++++++++++++++---------------- 2 files changed, 72 insertions(+), 71 deletions(-) diff --git a/src/Fabulous.Tests/CmdTests.fs b/src/Fabulous.Tests/CmdTests.fs index 6d026bd87..f3aed2229 100644 --- a/src/Fabulous.Tests/CmdTests.fs +++ b/src/Fabulous.Tests/CmdTests.fs @@ -153,7 +153,7 @@ type ``Cmd tests``() = } [] - member _.``Cmd.batchedThrottle dispatches all undispatched values on interval expiry``() = + member _.``Dispatch.batchThrottled dispatches all undispatched values on interval expiry``() = async { let mutable messageCount = 0 let mutable dispatched = [] // records dispatched messages latest first @@ -162,12 +162,12 @@ type ``Cmd tests``() = messageCount <- messageCount + 1 dispatched <- msg :: dispatched - let batchedThrottleCmd, _ = Cmd.batchedThrottle 100 NewValues + let batchedThrottleCmd, _ = dispatch.batchThrottled(100, NewValues) - batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch - batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch - batchedThrottleCmd 3 |> CmdTestsHelper.execute dispatch - batchedThrottleCmd 4 |> CmdTestsHelper.execute dispatch + batchedThrottleCmd 1 + batchedThrottleCmd 2 + batchedThrottleCmd 3 + batchedThrottleCmd 4 do! Async.Sleep 200 // Wait longer than the throttle interval @@ -177,7 +177,7 @@ type ``Cmd tests``() = } [] - member _.``Cmd.batchedThrottle dispatches messages immediately if interval not expired``() = + member _.``Dispatch.batchThrottled dispatches messages immediately if interval not expired``() = async { let mutable messageCount = 0 let mutable dispatched = [] // records dispatched messages latest first @@ -186,10 +186,10 @@ type ``Cmd tests``() = messageCount <- messageCount + 1 dispatched <- msg :: dispatched - let batchedThrottleCmd, _ = Cmd.batchedThrottle 100 NewValues + let batchedThrottleCmd, _ = dispatch.batchThrottled(100, NewValues) - batchedThrottleCmd 1 |> CmdTestsHelper.execute dispatch - batchedThrottleCmd 2 |> CmdTestsHelper.execute dispatch + batchedThrottleCmd 1 + batchedThrottleCmd 2 // Only the first value should have been dispatched immediately Assert.AreEqual(1, messageCount) @@ -199,8 +199,8 @@ type ``Cmd tests``() = giving second value time to dispatch and elapsing time until next dispatch *) do! Async.Sleep 210 - batchedThrottleCmd 3 |> CmdTestsHelper.execute dispatch - batchedThrottleCmd 4 |> CmdTestsHelper.execute dispatch + batchedThrottleCmd 3 + batchedThrottleCmd 4 // Second value should have dispatched delayed, third immediately Assert.AreEqual(3, messageCount) @@ -214,7 +214,7 @@ type ``Cmd tests``() = } [] - member _.``Cmd.batchedThrottle factory can be awaited for completion``() = + member _.``Dispatch.batchThrottled factory can be awaited for completion``() = async { let mutable messageCount = 0 let mutable dispatched = [] // records dispatched messages latest first @@ -223,10 +223,10 @@ type ``Cmd tests``() = messageCount <- messageCount + 1 dispatched <- msg :: dispatched - let createCmd, awaitNextDispatch = Cmd.batchedThrottle 100 NewValues + let createCmd, awaitNextDispatch = dispatch.batchThrottled(100, NewValues) - createCmd 1 |> CmdTestsHelper.execute dispatch - createCmd 2 |> CmdTestsHelper.execute dispatch + createCmd 1 + createCmd 2 // Only the first value should have been dispatched immediately Assert.AreEqual(1, messageCount) diff --git a/src/Fabulous/Cmd.fs b/src/Fabulous/Cmd.fs index 8c41c23ac..cc83a4eb7 100644 --- a/src/Fabulous/Cmd.fs +++ b/src/Fabulous/Cmd.fs @@ -1,5 +1,6 @@ namespace Fabulous +open System.Runtime.CompilerServices open System.Threading open System.Threading.Tasks @@ -312,27 +313,28 @@ module Cmd = cts.Token )) ] +type DispatchExtensions = + /// - /// Creates a factory for Commands that dispatch messages with a list of pending values at a fixed maximum rate, - /// ensuring that all pending values are dispatched when the specified interval elapses. - /// This function is similar to , but instead of dispatching only the last value, - /// it remembers and dispatches all undispatched values within the specified interval. - /// Helpful for scenarios where you want to throttle messages but cannot afford to lose any of the values they carry, - /// ensuring all values are processed at a controlled rate. + /// Creates a throttled dispatch factory that dispatches values in batches at a fixed minimum interval/maximum rate + /// while ensuring that all values are dispatched eventually. + /// This helps throttle the message dispatch of a rapid producer to avoid overloading the MVU loop + /// without dropping any of the carried values - ensuring all values are processed in batches at a controlled rate. /// Note that this function creates an object with internal state and is intended to be used per Program /// or longer-running background process rather than once per message in the update function. /// - /// The minimum time interval between two consecutive Command executions in milliseconds. - /// A function that maps a list of factory input values to a message for dispatch. + /// The minimum time interval between two consecutive dispatches in milliseconds. + /// A function that maps a list of pending input values to a message for dispatch. /// - /// Two methods - the first being a Command factory function that maps a list of input values to a Command - /// which dispatches a message (mapped from the pending values), - /// either immediately or after a delay respecting the interval, while remembering and dispatching all remembered values - /// when the interval has elapsed, ensuring no values are lost. - /// The second can be used for awaiting the next dispatch from the outside while adding some buffer time. + /// Two functions. The first has a Dispatch signature and is used to feed a single value into the factory, + /// where it is either dispatched immediately or after a delay respecting the interval, + /// batched with other pending values in the order they were fed in. + /// The second can be used for awaiting the next dispatch from the outside + /// - while optionally adding some buffer time (in milliseconds) to account for race condiditions. /// - let batchedThrottle (interval: int) (mapValuesToMsg: 'value list -> 'msg) : ('value -> Cmd<'msg>) * (System.TimeSpan option -> Async) = - let rateLimit = System.TimeSpan.FromMilliseconds(float interval) + [] + static member batchThrottled((dispatch: Dispatch<'msg>), interval, (mapBatchToMsg: 'value list -> 'msg)) = + let rateLimit = System.TimeSpan.FromMilliseconds(interval) let funLock = obj() // ensures safe access to resources shared across different threads let mutable lastDispatch = System.DateTime.MinValue let mutable pendingValues: 'value list = [] @@ -343,49 +345,48 @@ module Cmd = lastDispatch.Add(rateLimit) - System.DateTime.UtcNow // dispatches all pendingValues and resets them while updating lastDispatch - let dispatchBatch (dispatch: 'msg -> unit) = + let dispatchBatch () = // Dispatch in the order they were received - pendingValues |> List.rev |> mapValuesToMsg |> dispatch + pendingValues |> List.rev |> mapBatchToMsg |> dispatch lastDispatch <- System.DateTime.UtcNow pendingValues <- [] - // a factory function mapping input values to sleeping Commands dispatching all pending messages - let factory = + // a function with the Dispatch signature for feeding a single value into the throttled batch factory + let dispatchSingle = fun (value: 'value) -> - [ fun dispatch -> - lock funLock (fun () -> - let untilNextDispatch = getTimeUntilNextDispatch() - pendingValues <- value :: pendingValues - - // If the interval has elapsed since the last dispatch, dispatch all pending messages - if untilNextDispatch <= System.TimeSpan.Zero then - dispatchBatch dispatch - else // schedule dispatch - - // if the the last sleeping dispatch can still be cancelled, do so - if cts <> null then - cts.Cancel() - cts.Dispose() - - // used to enable cancelling this dispatch if newer values come into the factory - cts <- new CancellationTokenSource() - - Async.Start( - async { - // wait only as long as we have to before next dispatch - do! Async.Sleep(untilNextDispatch) - - lock funLock (fun () -> - dispatchBatch dispatch - - // done; invalidate own cancellation - if cts <> null then - cts.Dispose() - cts <- null) - }, - cts.Token - )) ] + lock funLock (fun () -> + let untilNextDispatch = getTimeUntilNextDispatch() + pendingValues <- value :: pendingValues + + // If the interval has elapsed since the last dispatch, dispatch all pending messages + if untilNextDispatch <= System.TimeSpan.Zero then + dispatchBatch() + else // schedule dispatch + + // if the the last sleeping dispatch can still be cancelled, do so + if cts <> null then + cts.Cancel() + cts.Dispose() + + // used to enable cancelling this dispatch if newer values come into the factory + cts <- new CancellationTokenSource() + + Async.Start( + async { + // wait only as long as we have to before next dispatch + do! Async.Sleep(untilNextDispatch) + + lock funLock (fun () -> + dispatchBatch() + + // done; invalidate own cancellation + if cts <> null then + cts.Dispose() + cts <- null) + }, + cts.Token + )) // a function to wait until after the next async dispatch + some buffer time to ensure the dispatch is complete let awaitNextDispatch buffer = @@ -395,12 +396,12 @@ module Cmd = let untilAfterNextDispatch = getTimeUntilNextDispatch() + match buffer with - | Some value -> value + | Some value -> System.TimeSpan.FromMilliseconds(value) | None -> System.TimeSpan.Zero if untilAfterNextDispatch > System.TimeSpan.Zero then do! Async.Sleep(untilAfterNextDispatch) }) - // return both the factory and the await helper - factory, awaitNextDispatch + // return both the dispatch and the await helper + dispatchSingle, awaitNextDispatch