From dae33c0c1a2965f76e33bdd4f216c440ca23d80b Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Fri, 9 Aug 2024 15:11:50 -0400 Subject: [PATCH 01/19] Bug fix: new job could be ignored if it arrived with precise timing. --- src/lime/system/ThreadPool.hx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index 7fd6561b95..4a3bae7ef7 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -438,7 +438,7 @@ class ThreadPool extends WorkOutput if (interruption == null || output.__jobComplete.value) { // Work is done; wait for more. - event = null; + event = interruption; } else if(#if (haxe_ver >= 4.2) Std.isOfType #else Std.is #end (interruption, ThreadEvent)) { From bf4711a01d7a3cd9bb8e2bdf86fa653bdea356d5 Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Fri, 9 Aug 2024 15:58:48 -0400 Subject: [PATCH 02/19] Avoid sending `JobData` back to the main thread. The main thread can easily look these up by ID, and in HTML5, sending the full `JobData` can cause errors. --- src/lime/app/Future.hx | 6 ---- src/lime/system/ThreadPool.hx | 36 ++++++++++++----------- src/lime/system/WorkOutput.hx | 54 ++++++++++------------------------- 3 files changed, 35 insertions(+), 61 deletions(-) diff --git a/src/lime/app/Future.hx b/src/lime/app/Future.hx index 2af97182d4..780a243b08 100644 --- a/src/lime/app/Future.hx +++ b/src/lime/app/Future.hx @@ -416,17 +416,11 @@ import lime.utils.Log; var result = bundle.work.dispatch(bundle.state); if (result != null || bundle.legacyCode) { - #if (lime_threads && html5) - bundle.work.makePortable(); - #end output.sendComplete(result); } } catch (e:Dynamic) { - #if (lime_threads && html5) - bundle.work.makePortable(); - #end output.sendError(e); } } diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index 4a3bae7ef7..f997abe69e 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -245,12 +245,12 @@ class ThreadPool extends WorkOutput var thread:Thread = __activeThreads[job.id]; if (idleThreads < minThreads) { - thread.sendMessage(new ThreadEvent(WORK, null, null)); + thread.sendMessage({event: CANCEL}); __idleThreads.push(thread); } else { - thread.sendMessage(new ThreadEvent(EXIT, null, null)); + thread.sendMessage({event: EXIT}); } } #end @@ -270,10 +270,10 @@ class ThreadPool extends WorkOutput __activeJobs.clear(); #if lime_threads - // Cancel idle threads if there are more than the minimum. + // Exit idle threads if there are more than the minimum. while (idleThreads > minThreads) { - __idleThreads.pop().sendMessage(new ThreadEvent(EXIT, null, null)); + __idleThreads.pop().sendMessage({event: EXIT}); } #end @@ -310,7 +310,7 @@ class ThreadPool extends WorkOutput var thread:Thread = __activeThreads[data.id]; if (thread != null) { - thread.sendMessage(new ThreadEvent(WORK, null, null)); + thread.sendMessage({event: CANCEL}); __activeThreads.remove(data.id); __idleThreads.push(thread); } @@ -395,7 +395,7 @@ class ThreadPool extends WorkOutput { event = Thread.readMessage(true); } - while (!#if (haxe_ver >= 4.2) Std.isOfType #else Std.is #end (event, ThreadEvent)); + while (event == null || !Reflect.hasField(event, "event")); output.resetJobProgress(); } @@ -409,7 +409,7 @@ class ThreadPool extends WorkOutput return; } - if (event.event != WORK || event.job == null) + if (event.event != WORK || !#if (haxe_ver >= 4.2) Std.isOfType #else Std.is #end (event.job, JobData)) { // Go idle. event = null; @@ -440,7 +440,7 @@ class ThreadPool extends WorkOutput // Work is done; wait for more. event = interruption; } - else if(#if (haxe_ver >= 4.2) Std.isOfType #else Std.is #end (interruption, ThreadEvent)) + else if(Reflect.hasField(interruption, "event")) { // Work on the new job. event = interruption; @@ -494,7 +494,7 @@ class ThreadPool extends WorkOutput var thread:Thread = __idleThreads.isEmpty() ? createThread(__executeThread) : __idleThreads.pop(); __activeThreads[job.id] = thread; - thread.sendMessage(new ThreadEvent(WORK, null, job)); + thread.sendMessage({event: WORK, job: job}); } #end } @@ -539,15 +539,19 @@ class ThreadPool extends WorkOutput var threadEvent:ThreadEvent; while ((threadEvent = __jobOutput.pop(false)) != null) { - if (!__activeJobs.exists(threadEvent.job)) + if (threadEvent.jobID != null) { - // Ignore events from canceled jobs. - continue; + activeJob = __activeJobs.getByID(threadEvent.jobID); + } + else + { + activeJob = threadEvent.job; } - // Get by ID because in HTML5, the object will have been cloned, - // which will interfere with attempts to test equality. - activeJob = __activeJobs.getByID(threadEvent.job.id); + if (activeJob == null || !__activeJobs.exists(activeJob)) + { + continue; + } if (mode == MULTI_THREADED) { @@ -582,7 +586,7 @@ class ThreadPool extends WorkOutput if (currentThreads > maxThreads || __jobQueue.length == 0 && currentThreads > minThreads) { - thread.sendMessage(new ThreadEvent(EXIT, null, null)); + thread.sendMessage({event: EXIT}); } else { diff --git a/src/lime/system/WorkOutput.hx b/src/lime/system/WorkOutput.hx index 433fad10e3..a35b5fb71c 100644 --- a/src/lime/system/WorkOutput.hx +++ b/src/lime/system/WorkOutput.hx @@ -105,12 +105,11 @@ class WorkOutput #if (lime_threads && html5) if (mode == MULTI_THREADED) { - activeJob.doWork.makePortable(); - Thread.returnMessage(new ThreadEvent(COMPLETE, message, activeJob), transferList); + Thread.returnMessage({event: COMPLETE, message: message, jobID: activeJob.id}, transferList); } else #end - __jobOutput.add(new ThreadEvent(COMPLETE, message, activeJob)); + __jobOutput.add({event: COMPLETE, message: message, jobID: activeJob.id}); } } @@ -130,12 +129,11 @@ class WorkOutput #if (lime_threads && html5) if (mode == MULTI_THREADED) { - activeJob.doWork.makePortable(); - Thread.returnMessage(new ThreadEvent(ERROR, message, activeJob), transferList); + Thread.returnMessage({event: ERROR, message: message, jobID: activeJob.id}, transferList); } else #end - __jobOutput.add(new ThreadEvent(ERROR, message, activeJob)); + __jobOutput.add({event: ERROR, message: message, jobID: activeJob.id}); } } @@ -153,12 +151,11 @@ class WorkOutput #if (lime_threads && html5) if (mode == MULTI_THREADED) { - activeJob.doWork.makePortable(); - Thread.returnMessage(new ThreadEvent(PROGRESS, message, activeJob), transferList); + Thread.returnMessage({event: PROGRESS, message: message, jobID: activeJob.id}, transferList); } else #end - __jobOutput.add(new ThreadEvent(PROGRESS, message, activeJob)); + __jobOutput.add({event: PROGRESS, message: message, jobID: activeJob.id}); } } @@ -343,43 +340,22 @@ class JobData #if haxe4 enum #else @:enum #end abstract ThreadEventType(String) { - /** - Sent by the background thread, indicating completion. - **/ + // Events sent from a worker thread to the main thread var COMPLETE = "COMPLETE"; - /** - Sent by the background thread, indicating failure. - **/ var ERROR = "ERROR"; - /** - Sent by the background thread. - **/ var PROGRESS = "PROGRESS"; - /** - Sent by the main thread, indicating that the provided job should begin - in place of any ongoing job. If `state == null`, the existing job will - stop and the thread will go idle. (To run a job with no argument, set - `state = {}` instead.) - **/ + + // Commands sent from the main thread to a worker thread var WORK = "WORK"; - /** - Sent by the main thread to shut down a thread. - **/ + var CANCEL = "CANCEL"; var EXIT = "EXIT"; } -class ThreadEvent -{ - public var event(default, null):ThreadEventType; - public var message(default, null):State; - public var job(default, null):JobData; - - public inline function new(event:ThreadEventType, message:State, job:JobData) - { - this.event = event; - this.message = message; - this.job = job; - } +typedef ThreadEvent = { + var event:ThreadEventType; + @:optional var message:Dynamic; + @:optional var job:JobData; + @:optional var jobID:Int; } class JSAsync From 0b83b7d45ef18b4bb9a77ae37f18d6c5169e3a83 Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Sun, 11 Aug 2024 16:11:19 -0400 Subject: [PATCH 03/19] Fix typo. --- src/lime/_internal/backend/html5/HTML5Thread.hx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/lime/_internal/backend/html5/HTML5Thread.hx b/src/lime/_internal/backend/html5/HTML5Thread.hx index 845a2924d7..022a085da1 100644 --- a/src/lime/_internal/backend/html5/HTML5Thread.hx +++ b/src/lime/_internal/backend/html5/HTML5Thread.hx @@ -477,7 +477,7 @@ abstract Message(Dynamic) from Dynamic to Dynamic // Skip `null` for obvious reasons. return object == null // No need to preserve a primitive type. - || !#if (haxe_ver >= 4.2) Std.isOfType #else untyped __js__ #end (object, Object) + || !#if (haxe_ver >= 4.2) Std.isOfType #else Std.is #end (object, Object) // Objects with this field have been deliberately excluded. || Reflect.field(object, SKIP_FIELD) == true // A `Uint8Array` (the type used by `haxe.io.Bytes`) can have From 8f631fe3ad2d7ad256ff2fafb7cb00f7dac9335c Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Sun, 11 Aug 2024 21:35:12 -0400 Subject: [PATCH 04/19] Optimize `ThreadPool` slightly. `__activeThreads` and `__idleThreads` only need to be allocated for multi-threaded pools. Plus, there's no benefit to using a `List` here; we only add to and remove from the end. And finally, checking `event.job == null` instead of `isOfType()` is faster and avoids an issue in HTML5. Sadly it is less safe, so we might need to revisit it eventually. --- src/lime/system/ThreadPool.hx | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index f997abe69e..c33390a008 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -188,13 +188,13 @@ class ThreadPool extends WorkOutput /** The set of threads actively running a job. **/ - private var __activeThreads:Map = new Map(); + private var __activeThreads:Map; /** A list of idle threads. Not to be confused with `idleThreads`, a public variable equal to `__idleThreads.length`. **/ - private var __idleThreads:List = new List(); + private var __idleThreads:Array; #end private var __jobQueue:JobList = new JobList(); @@ -219,6 +219,14 @@ class ThreadPool extends WorkOutput this.minThreads = minThreads; this.maxThreads = maxThreads; + + #if lime_threads + if (this.mode == MULTI_THREADED) + { + __activeThreads = new Map(); + __idleThreads = []; + } + #end } /** @@ -409,7 +417,7 @@ class ThreadPool extends WorkOutput return; } - if (event.event != WORK || !#if (haxe_ver >= 4.2) Std.isOfType #else Std.is #end (event.job, JobData)) + if (event.event != WORK || event.job == null) { // Go idle. event = null; @@ -492,7 +500,7 @@ class ThreadPool extends WorkOutput job.doWork.makePortable(); #end - var thread:Thread = __idleThreads.isEmpty() ? createThread(__executeThread) : __idleThreads.pop(); + var thread:Thread = __idleThreads.length == 0 ? createThread(__executeThread) : __idleThreads.pop(); __activeThreads[job.id] = thread; thread.sendMessage({event: WORK, job: job}); } From d0cef427bcfec2f4f4cb6b5cd019e51d6d33181a Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Thu, 15 Aug 2024 15:39:07 -0400 Subject: [PATCH 05/19] Revert `BackgroundWorker` to its 8.1.3 version. It looks like we'll want to take `BackgroundWorker` in a different direction, so for the moment it's safest not to change anything about it. That way, there's only one historical version to maintain backwards compatibility with. --- src/lime/system/BackgroundWorker.hx | 177 +++++++++++++++++++++++++++- 1 file changed, 175 insertions(+), 2 deletions(-) diff --git a/src/lime/system/BackgroundWorker.hx b/src/lime/system/BackgroundWorker.hx index 5be17e9eae..ba1b7340a0 100644 --- a/src/lime/system/BackgroundWorker.hx +++ b/src/lime/system/BackgroundWorker.hx @@ -1,4 +1,177 @@ package lime.system; -@:deprecated("Replace references to lime.system.BackgroundWorker with lime.system.ThreadPool. As the API is identical, no other changes are necessary.") -typedef BackgroundWorker = ThreadPool; +import lime.app.Application; +import lime.app.Event; +#if sys +#if haxe4 +import sys.thread.Deque; +import sys.thread.Thread; +#elseif cpp +import cpp.vm.Deque; +import cpp.vm.Thread; +#elseif neko +import neko.vm.Deque; +import neko.vm.Thread; +#end +#end +#if !lime_debug +@:fileXml('tags="haxe,release"') +@:noDebug +#end + +/** + A background worker executes a single function on a background thread, + allowing it to avoid blocking the main thread. However, only system targets + have thread support, meaning the function will block on any other target. + @see `ThreadPool` for improved thread safety, HTML5 threads, and more. +**/ +class BackgroundWorker +{ + private static var MESSAGE_COMPLETE = "__COMPLETE__"; + private static var MESSAGE_ERROR = "__ERROR__"; + + public var canceled(default, null):Bool; + public var completed(default, null):Bool; + public var doWork = new EventVoid>(); + public var onComplete = new EventVoid>(); + public var onError = new EventVoid>(); + public var onProgress = new EventVoid>(); + + @:noCompletion private var __runMessage:Dynamic; + #if (cpp || neko) + @:noCompletion private var __messageQueue:Deque; + @:noCompletion private var __workerThread:Thread; + #end + + public function new() {} + + public function cancel():Void + { + canceled = true; + + #if (cpp || neko) + __workerThread = null; + #end + } + + public function run(message:Dynamic = null):Void + { + canceled = false; + completed = false; + __runMessage = message; + + #if (cpp || neko) + __messageQueue = new Deque(); + __workerThread = Thread.create(__doWork); + + // TODO: Better way to do this + + if (Application.current != null) + { + Application.current.onUpdate.add(__update); + } + #else + __doWork(); + #end + } + + public function sendComplete(message:Dynamic = null):Void + { + completed = true; + + #if (cpp || neko) + __messageQueue.add(MESSAGE_COMPLETE); + __messageQueue.add(message); + #else + if (!canceled) + { + canceled = true; + onComplete.dispatch(message); + } + #end + } + + public function sendError(message:Dynamic = null):Void + { + #if (cpp || neko) + __messageQueue.add(MESSAGE_ERROR); + __messageQueue.add(message); + #else + if (!canceled) + { + canceled = true; + onError.dispatch(message); + } + #end + } + + public function sendProgress(message:Dynamic = null):Void + { + #if (cpp || neko) + __messageQueue.add(message); + #else + if (!canceled) + { + onProgress.dispatch(message); + } + #end + } + + @:noCompletion private function __doWork():Void + { + doWork.dispatch(__runMessage); + + // #if (cpp || neko) + // + // __messageQueue.add (MESSAGE_COMPLETE); + // + // #else + // + // if (!canceled) { + // + // canceled = true; + // onComplete.dispatch (null); + // + // } + // + // #end + } + + @:noCompletion private function __update(deltaTime:Int):Void + { + #if (cpp || neko) + var message = __messageQueue.pop(false); + + if (message != null) + { + if (message == MESSAGE_ERROR) + { + Application.current.onUpdate.remove(__update); + + if (!canceled) + { + canceled = true; + onError.dispatch(__messageQueue.pop(false)); + } + } + else if (message == MESSAGE_COMPLETE) + { + Application.current.onUpdate.remove(__update); + + if (!canceled) + { + canceled = true; + onComplete.dispatch(__messageQueue.pop(false)); + } + } + else + { + if (!canceled) + { + onProgress.dispatch(message); + } + } + } + #end + } +} From 52931a8dc704f3cc2686381885f23159fd0b45bf Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Thu, 15 Aug 2024 16:16:17 -0400 Subject: [PATCH 06/19] Revert to `Future`'s behavior from 8.1.3. As with `BackgroundWorker`, we're postponing major changes to give us more time to consider. --- src/lime/app/Future.hx | 216 +++++++++------------------------- src/lime/graphics/Image.hx | 2 +- src/lime/media/AudioBuffer.hx | 2 +- 3 files changed, 55 insertions(+), 165 deletions(-) diff --git a/src/lime/app/Future.hx b/src/lime/app/Future.hx index 780a243b08..8fb51d96c3 100644 --- a/src/lime/app/Future.hx +++ b/src/lime/app/Future.hx @@ -67,24 +67,36 @@ import lime.utils.Log; @:noCompletion private var __progressListeners:ArrayInt->Void>; /** - @param work Deprecated; use `Future.withEventualValue()` instead. - @param useThreads Deprecated; use `Future.withEventualValue()` instead. + @param work Optional: a function to compute this future's value. + @param useThreads Whether to run `work` on a background thread, where supported. + If false or if this isn't a system target, it will run immediately on the main thread. **/ - public function new(work:WorkFunctionT> = null, useThreads:Bool = false) + public function new(work:Void->T = null, useThreads:Bool = false) { if (work != null) { - var promise = new Promise(); - promise.future = this; - - #if (lime_threads && html5) + #if (lime_threads && !html5) if (useThreads) { - work.makePortable(); + var promise = new Promise(); + promise.future = this; + + FutureWork.run(work, promise); } + else #end - - FutureWork.run(dispatchWorkFunction, work, promise, useThreads ? MULTI_THREADED : SINGLE_THREADED, true); + { + try + { + value = work(); + isComplete = true; + } + catch (e:Dynamic) + { + error = e; + isError = true; + } + } } } @@ -196,29 +208,14 @@ import lime.utils.Log; else { var time = System.getTimer(); - var prevTime = time; var end = time + waitTime; - while (!isComplete && !isError && time <= end) + while (!isComplete && !isError && time <= end && FutureWork.activeJobs > 0) { - if (FutureWork.activeJobs < 1) - { - Log.error('Cannot block for a Future without a "work" function.'); - return this; - } - - if (FutureWork.singleThreadPool != null && FutureWork.singleThreadPool.activeJobs > 0) - { - @:privateAccess FutureWork.singleThreadPool.__update(time - prevTime); - } - else - { - #if sys - Sys.sleep(0.01); - #end - } + #if sys + Sys.sleep(0.01); + #end - prevTime = time; time = System.getTimer(); } @@ -305,41 +302,9 @@ import lime.utils.Log; future.value = value; return future; } - - /** - Creates a `Future` instance which will asynchronously compute a value. - - Once `work()` returns a non-null value, the `Future` will finish with that value. - If `work()` throws an error, the `Future` will finish with that error instead. - @param work A function that computes a value of type `T`. - @param state An argument to pass to `work()`. As this may be used on another thread, the - main thread must not access or modify `state` until the `Future` finishes. - @param mode Whether to use real threads as opposed to green threads. Green threads rely - on cooperative multitasking, meaning `work()` must return periodically to allow other code - enough time to run. In these cases, `work()` should return null to signal that it isn't finished. - @return A new `Future` instance. - @see https://en.wikipedia.org/wiki/Cooperative_multitasking - **/ - public static function withEventualValue(work:WorkFunction Null>, state:State, mode:ThreadMode = #if html5 SINGLE_THREADED #else MULTI_THREADED #end):Future - { - var future = new Future(); - var promise = new Promise(); - promise.future = future; - - FutureWork.run(work, state, promise, mode); - - return future; - } - - /** - (For backwards compatibility.) Dispatches the given zero-argument function. - **/ - @:noCompletion private static function dispatchWorkFunction(work:WorkFunction T>):Null - { - return work.dispatch(); - } } +#if (lime_threads && !html5) /** The class that handles asynchronous `work` functions passed to `new Future()`. **/ @@ -349,75 +314,34 @@ import lime.utils.Log; #end @:dox(hide) class FutureWork { - @:allow(lime.app.Future) - private static var singleThreadPool:ThreadPool; - #if lime_threads - private static var multiThreadPool:ThreadPool; - // It isn't safe to pass a promise object to a web worker, but since it's - // `@:generic` we can't store it as `Promise`. Instead, we'll store - // the two methods we need. - private static var promises:Map Dynamic, error:Dynamic -> Dynamic}> = new Map(); - #end + private static var threadPool:ThreadPool; + private static var promises:Map Dynamic, error:Dynamic -> Dynamic}>; + public static var minThreads(default, set):Int = 0; public static var maxThreads(default, set):Int = 1; public static var activeJobs(get, never):Int; - private static function getPool(mode:ThreadMode):ThreadPool - { - #if lime_threads - if (mode == MULTI_THREADED) { - if(multiThreadPool == null) { - multiThreadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED); - multiThreadPool.onComplete.add(multiThreadPool_onComplete); - multiThreadPool.onError.add(multiThreadPool_onError); - } - return multiThreadPool; - } - #end - if(singleThreadPool == null) { - singleThreadPool = new ThreadPool(minThreads, maxThreads, SINGLE_THREADED); - singleThreadPool.onComplete.add(singleThreadPool_onComplete); - singleThreadPool.onError.add(singleThreadPool_onError); - } - return singleThreadPool; - } - @:allow(lime.app.Future) - private static function run(work:WorkFunctionNull>, state:State, promise:Promise, mode:ThreadMode = MULTI_THREADED, legacyCode:Bool = false):Void + private static function run(work:Void->T, promise:Promise):Void { - var bundle = {work: work, state: state, promise: promise, legacyCode: legacyCode}; + if(threadPool == null) { + threadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED); + threadPool.onComplete.add(threadPool_onComplete); + threadPool.onError.add(threadPool_onError); - #if lime_threads - if (mode == MULTI_THREADED) - { - #if html5 - work.makePortable(); - #end - - bundle.promise = null; + promises = new Map(); } - #end - - var jobID:Int = getPool(mode).run(threadPool_doWork, bundle); - #if lime_threads - if (mode == MULTI_THREADED) - { - promises[jobID] = {complete: promise.complete, error: promise.error}; - } - #end + var jobID:Int = threadPool.run(threadPool_doWork, work); + promises[jobID] = {complete: promise.complete, error: promise.error}; } // Event Handlers - private static function threadPool_doWork(bundle:{work:WorkFunctionDynamic>, state:State, legacyCode:Bool}, output:WorkOutput):Void + private static function threadPool_doWork(work:Void->Dynamic, output:WorkOutput):Void { try { - var result = bundle.work.dispatch(bundle.state); - if (result != null || bundle.legacyCode) - { - output.sendComplete(result); - } + output.sendComplete(work()); } catch (e:Dynamic) { @@ -425,76 +349,42 @@ import lime.utils.Log; } } - private static function singleThreadPool_onComplete(result:Dynamic):Void + private static function threadPool_onComplete(result:Dynamic):Void { - singleThreadPool.activeJob.state.promise.complete(result); - } - - private static function singleThreadPool_onError(error:Dynamic):Void - { - singleThreadPool.activeJob.state.promise.error(error); - } - - #if lime_threads - private static function multiThreadPool_onComplete(result:Dynamic):Void - { - var promise = promises[multiThreadPool.activeJob.id]; - promises.remove(multiThreadPool.activeJob.id); + var promise = promises[threadPool.activeJob.id]; + promises.remove(threadPool.activeJob.id); promise.complete(result); } - private static function multiThreadPool_onError(error:Dynamic):Void + private static function threadPool_onError(error:Dynamic):Void { - var promise = promises[multiThreadPool.activeJob.id]; - promises.remove(multiThreadPool.activeJob.id); + var promise = promises[threadPool.activeJob.id]; + promises.remove(threadPool.activeJob.id); promise.error(error); } - #end // Getters & Setters @:noCompletion private static inline function set_minThreads(value:Int):Int { - if (singleThreadPool != null) - { - singleThreadPool.minThreads = value; - } - #if lime_threads - if (multiThreadPool != null) + if (threadPool != null) { - multiThreadPool.minThreads = value; + threadPool.minThreads = value; } - #end return minThreads = value; } @:noCompletion private static inline function set_maxThreads(value:Int):Int { - if (singleThreadPool != null) + if (threadPool != null) { - singleThreadPool.maxThreads = value; + threadPool.maxThreads = value; } - #if lime_threads - if (multiThreadPool != null) - { - multiThreadPool.maxThreads = value; - } - #end return maxThreads = value; } - @:noCompletion private static function get_activeJobs():Int + @:noCompletion private static inline function get_activeJobs():Int { - var sum:Int = 0; - if (singleThreadPool != null) - { - sum += singleThreadPool.activeJobs; - } - #if lime_threads - if (multiThreadPool != null) - { - sum += multiThreadPool.activeJobs; - } - #end - return sum; + return threadPool != null ? threadPool.activeJobs : 0; } } +#end diff --git a/src/lime/graphics/Image.hx b/src/lime/graphics/Image.hx index c825d9d84d..97385cdc3a 100644 --- a/src/lime/graphics/Image.hx +++ b/src/lime/graphics/Image.hx @@ -1002,7 +1002,7 @@ class Image return promise.future; #else - return Future.withEventualValue(fromBytes, bytes, MULTI_THREADED); + return new Future(fromBytes.bind(bytes), true); #end } diff --git a/src/lime/media/AudioBuffer.hx b/src/lime/media/AudioBuffer.hx index ff067229ab..967c8fbbe9 100644 --- a/src/lime/media/AudioBuffer.hx +++ b/src/lime/media/AudioBuffer.hx @@ -335,7 +335,7 @@ class AudioBuffer return promise.future; #else - return Future.withEventualValue(fromFiles, paths, MULTI_THREADED); + return new Future(fromFiles.bind(paths), true); #end } From 2866d099a2b094cec3890aece7a6a9f66dccdd85 Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Thu, 15 Aug 2024 16:31:48 -0400 Subject: [PATCH 07/19] Remove external link. While I put a lot of effort into that guide, we're changing several things suddenly, and I don't have time to make sure it's up to date. --- src/lime/system/ThreadPool.hx | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index c33390a008..1f1ef98dac 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -37,9 +37,7 @@ import lime._internal.backend.html5.HTML5Thread as Thread; `WorkOutput` object it receives. Calling `output.sendComplete()` will trigger an `onComplete` event on the main thread. - @see `lime.system.WorkOutput.WorkFunction` for important information about - `doWork`. - @see https://player03.com/openfl/threads-guide/ for a tutorial. + @see `lime.system.WorkOutput.WorkFunction` for important information about `doWork`. **/ #if !lime_debug @:fileXml('tags="haxe,release"') From 6873ae1fd4c763a01e2a252cfcadcc12f54e303b Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Thu, 15 Aug 2024 16:44:52 -0400 Subject: [PATCH 08/19] `Future.ready()` only works when threads are available. --- src/lime/app/Future.hx | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/lime/app/Future.hx b/src/lime/app/Future.hx index 8fb51d96c3..d409dced8e 100644 --- a/src/lime/app/Future.hx +++ b/src/lime/app/Future.hx @@ -201,6 +201,7 @@ import lime.utils.Log; **/ public function ready(waitTime:Int = -1):Future { + #if (lime_threads && !html5) if (isComplete || isError) { return this; @@ -221,6 +222,9 @@ import lime.utils.Log; return this; } + #else + return this; + #end } /** From ed05aa267407aa2cac7f8fef99f8d8a7c13f9081 Mon Sep 17 00:00:00 2001 From: ACrazyTown Date: Fri, 16 Aug 2024 21:55:23 +0200 Subject: [PATCH 09/19] Fix getContextsDevice on Hashlink --- src/lime/media/openal/ALC.hx | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/lime/media/openal/ALC.hx b/src/lime/media/openal/ALC.hx index 62a3b6e264..4cd4379d21 100644 --- a/src/lime/media/openal/ALC.hx +++ b/src/lime/media/openal/ALC.hx @@ -76,12 +76,13 @@ class ALC public static function getContextsDevice(context:ALContext):ALDevice { - #if (lime_cffi && lime_openal && !macro) #if !hl var handle:Dynamic = NativeCFFI.lime_alc_get_contexts_device(context); + #if (lime_cffi && lime_openal && !macro) + var handle:Dynamic = NativeCFFI.lime_alc_get_contexts_device(context); if (handle != null) { return new ALDevice(handle); - } #else #end + } #end return null; From 9b7c7914bf6339f315358b6feeaf9b757045fc62 Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Sat, 17 Aug 2024 23:42:23 -0400 Subject: [PATCH 10/19] Update `ThreadPool` documentation. --- src/lime/system/ThreadPool.hx | 41 ++++++++++++++++++++++++++--------- 1 file changed, 31 insertions(+), 10 deletions(-) diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index 1f1ef98dac..4eb9c3c7eb 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -15,12 +15,18 @@ import lime._internal.backend.html5.HTML5Thread as Thread; #end /** - A simple and thread-safe way to run a one or more asynchronous jobs. It - manages a queue of jobs, starting new ones once the old ones are done. + A thread pool executes one or more functions asynchronously. - It can also keep a certain number of threads (configurable via `minThreads`) - running in the background even when no jobs are available. This avoids the - not-insignificant overhead of stopping and restarting threads. + In multi-threaded mode, jobs run on background threads. In HTML5, this means + using web workers, which impose additional restrictions (see below). In + single-threaded mode, jobs run between frames on the main thread. To avoid + blocking, these jobs should only do a small amount of work at a time. + + In multi-threaded mode, the pool spins up new threads as jobs arrive (up to + `maxThreads`). If too many jobs arrive at once, it places them in a queue to + run when threads open up. If you run jobs frequently but not constantly, you + can also set `minThreads` to keep a certain number of threads alive, + avoiding the overhead of repeatedly spinning them up. Sample usage: @@ -33,11 +39,26 @@ import lime._internal.backend.html5.HTML5Thread as Thread; threadPool.run(processFile, url); } - For thread safety, the worker function should only give output through the - `WorkOutput` object it receives. Calling `output.sendComplete()` will - trigger an `onComplete` event on the main thread. - - @see `lime.system.WorkOutput.WorkFunction` for important information about `doWork`. + If upgrading from `BackgroundWorker`, you can use `ThreadPool` as a drop-in + replacement. It will then display some deprecation warnings indicating which + steps to take next. + + --- + + Guidelines to make your code work on all targets and configurations: + + - For thread safety and web worker compatibility, your work function should + only return data through the `WorkOutput` object it receives. + - For web worker compatibility, you should only send data to your work + function via the `State` object. But since this can be any object, you can + put an arbitrary amount of data there. + - For web worker compatibility, your work function must be static, and you + can't `bind()` any extra arguments. + - For single-threaded performance, your function should only do a small + amount of work at a time. Store progress in the `State` object so you can + pick up where you left off. You don't have to worry about timing: just aim + to take a small fraction of the frame's time, and `ThreadPool` will keep + running the function until enough time passes. **/ #if !lime_debug @:fileXml('tags="haxe,release"') From b3e44ba03dce56b5a2f7d3eff8e202bdbb29698f Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Sat, 17 Aug 2024 23:47:46 -0400 Subject: [PATCH 11/19] Run formatter. --- src/lime/app/Future.hx | 5 ++- src/lime/system/ThreadPool.hx | 77 ++++++++++++++++++++++++++--------- src/lime/system/WorkOutput.hx | 23 +++++++---- 3 files changed, 74 insertions(+), 31 deletions(-) diff --git a/src/lime/app/Future.hx b/src/lime/app/Future.hx index d409dced8e..64f36c5200 100644 --- a/src/lime/app/Future.hx +++ b/src/lime/app/Future.hx @@ -319,7 +319,7 @@ import lime.utils.Log; @:dox(hide) class FutureWork { private static var threadPool:ThreadPool; - private static var promises:Map Dynamic, error:Dynamic -> Dynamic}>; + private static var promises:MapDynamic, error:Dynamic->Dynamic}>; public static var minThreads(default, set):Int = 0; public static var maxThreads(default, set):Int = 1; @@ -328,7 +328,8 @@ import lime.utils.Log; @:allow(lime.app.Future) private static function run(work:Void->T, promise:Promise):Void { - if(threadPool == null) { + if (threadPool == null) + { threadPool = new ThreadPool(minThreads, maxThreads, MULTI_THREADED); threadPool.onComplete.add(threadPool_onComplete); threadPool.onError.add(threadPool_onError); diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index 4eb9c3c7eb..9dca868885 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -88,7 +88,7 @@ class ThreadPool extends WorkOutput frame. See `workIterations` for instructions to improve the accuracy of this estimate. **/ - public static var workLoad:Float = 1/2; + public static var workLoad:Float = 1 / 2; /** __Access this only from the main thread.__ @@ -171,16 +171,19 @@ class ThreadPool extends WorkOutput Dispatched at most once per job. **/ public var onComplete(default, null) = new EventVoid>(); + /** Dispatched on the main thread when `doWork` calls `sendError()`. Dispatched at most once per job. **/ public var onError(default, null) = new EventVoid>(); + /** Dispatched on the main thread when `doWork` calls `sendProgress()`. May be dispatched any number of times per job. **/ public var onProgress(default, null) = new EventVoid>(); + /** Dispatched on the main thread when a new job begins. Dispatched exactly once per job. @@ -199,6 +202,7 @@ class ThreadPool extends WorkOutput @:deprecated("Instead pass the callback to ThreadPool.run().") @:noCompletion @:dox(hide) public var doWork(get, never):PseudoEvent; + private var __doWork:WorkFunctionWorkOutput->Void>; private var __activeJobs:JobList; @@ -409,6 +413,7 @@ class ThreadPool extends WorkOutput **/ private static function __executeThread():Void { + // @formatter:off JSAsync.async({ var output:WorkOutput = #if html5 new WorkOutput(MULTI_THREADED) #else cast(Thread.readMessage(true), WorkOutput) #end; var event:ThreadEvent = null; @@ -467,7 +472,7 @@ class ThreadPool extends WorkOutput // Work is done; wait for more. event = interruption; } - else if(Reflect.hasField(interruption, "event")) + else if (Reflect.hasField(interruption, "event")) { // Work on the new job. event = interruption; @@ -481,6 +486,7 @@ class ThreadPool extends WorkOutput // Do it all again. } }); + // @formatter:on } #end @@ -538,8 +544,7 @@ class ThreadPool extends WorkOutput // `workLoad / frameRate` is the total time that pools may use per // frame. `workPriority / __totalWorkPriority` is this pool's // fraction of that total. - var maxTimeElapsed:Float = workPriority * workLoad - / (__totalWorkPriority * Application.current.window.frameRate); + var maxTimeElapsed:Float = workPriority * workLoad / (__totalWorkPriority * Application.current.window.frameRate); var startTime:Float = timestamp(); var timeElapsed:Float = 0; @@ -683,33 +688,56 @@ class ThreadPool extends WorkOutput } @:access(lime.system.ThreadPool) @:forward(canceled) -private abstract PseudoEvent(ThreadPool) from ThreadPool { +private abstract PseudoEvent(ThreadPool) from ThreadPool +{ @:noCompletion @:dox(hide) public var __listeners(get, never):Array; - private inline function get___listeners():Array { return []; }; + + private inline function get___listeners():Array + { + return []; + }; + @:noCompletion @:dox(hide) public var __repeat(get, never):Array; - private inline function get___repeat():Array { return []; }; - public function add(callback:Dynamic -> Void):Void { + private inline function get___repeat():Array + { + return []; + }; + + public function add(callback:Dynamic->Void):Void + { function callCallback(state:State, output:WorkOutput):Void { callback(state); } #if (lime_threads && html5) - if (this.mode == MULTI_THREADED) - throw "Unsupported operation; instead pass the callback to ThreadPool's constructor."; + if (this.mode == MULTI_THREADED) throw "Unsupported operation; instead pass the callback to ThreadPool's constructor."; else - this.__doWork = { func: callCallback }; + this.__doWork = {func: callCallback}; #else this.__doWork = callCallback; #end } public inline function cancel():Void {} + public inline function dispatch():Void {} - public inline function has(callback:Dynamic -> Void):Bool { return this.__doWork != null; } - public inline function remove(callback:Dynamic -> Void):Void { this.__doWork = null; } - public inline function removeAll():Void { this.__doWork = null; } + + public inline function has(callback:Dynamic->Void):Bool + { + return this.__doWork != null; + } + + public inline function remove(callback:Dynamic->Void):Void + { + this.__doWork = null; + } + + public inline function removeAll():Void + { + this.__doWork = null; + } } class JobList @@ -853,7 +881,8 @@ class JobList // Getters & Setters - private inline function set___addingWorkPriority(value:Bool):Bool { + private inline function set___addingWorkPriority(value:Bool):Bool + { if (pool != null && __addingWorkPriority != value && ThreadPool.isMainThread()) { if (value) @@ -888,17 +917,25 @@ class JobList that's in use by multiple jobs, the wrong job may be selected or canceled. **/ @:forward -abstract JobIdentifier(JobIdentifierImpl) from JobIdentifierImpl { - @:from private static inline function fromJob(job:JobData):JobIdentifier { +abstract JobIdentifier(JobIdentifierImpl) from JobIdentifierImpl +{ + @:from private static inline function fromJob(job:JobData):JobIdentifier + { return ID(job.id); } - @:from private static inline function fromID(id:Int):JobIdentifier { + + @:from private static inline function fromID(id:Int):JobIdentifier + { return ID(id); } - @:from private static inline function fromFunction(doWork:WorkFunctionWorkOutput->Void>):JobIdentifier { + + @:from private static inline function fromFunction(doWork:WorkFunctionWorkOutput->Void>):JobIdentifier + { return FUNCTION(doWork); } - @:from private static inline function fromState(state:State):JobIdentifier { + + @:from private static inline function fromState(state:State):JobIdentifier + { return STATE(state); } } diff --git a/src/lime/system/WorkOutput.hx b/src/lime/system/WorkOutput.hx index a35b5fb71c..48db2a5e2d 100644 --- a/src/lime/system/WorkOutput.hx +++ b/src/lime/system/WorkOutput.hx @@ -13,12 +13,10 @@ import neko.vm.Deque; import neko.vm.Thread; import neko.vm.Tls; #end - #if html5 import lime._internal.backend.html5.HTML5Thread as Thread; import lime._internal.backend.html5.HTML5Thread.Transferable; #end - #if macro import haxe.macro.Expr; @@ -54,6 +52,7 @@ class WorkOutput available on this target, `mode` will always be `SINGLE_THREADED`. **/ public var mode(get, never):ThreadMode; + #if lime_threads /** __Set this only via the constructor.__ @@ -65,6 +64,7 @@ class WorkOutput Messages sent by active jobs, received by the main thread. **/ private var __jobOutput:Deque = new Deque(); + /** Thread-local storage. Tracks whether `sendError()` or `sendComplete()` was called by this job. @@ -77,6 +77,7 @@ class WorkOutput Will be null in all other cases. **/ public var activeJob(get, set):Null; + @:noCompletion private var __activeJob:Tls = new Tls(); private inline function new(mode:Null) @@ -171,7 +172,8 @@ class WorkOutput var thread:Thread = Thread.create(executeThread); #if html5 - thread.onMessage.add(function(event:ThreadEvent) { + thread.onMessage.add(function(event:ThreadEvent) + { __jobOutput.add(event); }); #end @@ -195,6 +197,7 @@ class WorkOutput { return __activeJob.value; } + private inline function set_activeJob(value:JobData):JobData { return __activeJob.value = value; @@ -261,8 +264,8 @@ abstract WorkFunction(T) from T to T { switch (self.typeof().follow().toComplexType()) { - case TPath({ sub: "WorkFunction", params: [TPType(t)] }): - return macro ($self:$t)($a{args}); + case TPath({sub: "WorkFunction", params: [TPType(t)]}): + return macro($self : $t)($a{args}); default: throw "Underlying function type not found."; } @@ -275,8 +278,8 @@ abstract WorkFunction(T) from T to T only accepts a single argument, you can pass multiple values as part of an anonymous structure. (Or an array, or a class.) - // Does not work: too many arguments. - // threadPool.run(doWork, argument0, argument1, argument2); + // Does not work: too many arguments. + // threadPool.run(doWork, argument0, argument1, argument2); // Works: all arguments are combined into one `State` object. threadPool.run(doWork, { arg0: argument0, arg1: argument1, arg2: argument2 }); @@ -299,6 +302,7 @@ typedef State = Dynamic; class JobData { private static var nextID:Int = 0; + /** `JobData` instances will regularly be copied in HTML5, so checking equality won't work. Instead, compare identifiers. @@ -339,6 +343,7 @@ class JobData } #if haxe4 enum #else @:enum #end abstract ThreadEventType(String) + { // Events sent from a worker thread to the main thread var COMPLETE = "COMPLETE"; @@ -351,7 +356,8 @@ class JobData var EXIT = "EXIT"; } -typedef ThreadEvent = { +typedef ThreadEvent = +{ var event:ThreadEventType; @:optional var message:Dynamic; @:optional var job:JobData; @@ -379,7 +385,6 @@ class JSAsync } // Define platform-specific types - #if target.threaded // Haxe 3 compatibility: "target.threaded" can't go in parentheses. #elseif !(cpp || neko) From b7dd45586ce8e041ebc5fe78604260c423a11abc Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Sun, 18 Aug 2024 21:42:44 -0400 Subject: [PATCH 12/19] Haxe expects documentation above metadata. --- src/lime/system/BackgroundWorker.hx | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/lime/system/BackgroundWorker.hx b/src/lime/system/BackgroundWorker.hx index ba1b7340a0..1a250062e1 100644 --- a/src/lime/system/BackgroundWorker.hx +++ b/src/lime/system/BackgroundWorker.hx @@ -14,10 +14,6 @@ import neko.vm.Deque; import neko.vm.Thread; #end #end -#if !lime_debug -@:fileXml('tags="haxe,release"') -@:noDebug -#end /** A background worker executes a single function on a background thread, @@ -25,6 +21,10 @@ import neko.vm.Thread; have thread support, meaning the function will block on any other target. @see `ThreadPool` for improved thread safety, HTML5 threads, and more. **/ +#if !lime_debug +@:fileXml('tags="haxe,release"') +@:noDebug +#end class BackgroundWorker { private static var MESSAGE_COMPLETE = "__COMPLETE__"; From a774bac183571f80c5bf19532745eb0055f2e1a2 Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Sun, 18 Aug 2024 21:44:47 -0400 Subject: [PATCH 13/19] Remove `canceled` and `completed` from `ThreadPool`. These were added for drop-in compatibility with `BackgroundWorker`, but we might not need that level of compatibility. We can discuss adding these back later if there's demand. --- src/lime/system/ThreadPool.hx | 27 ++------------------------- 1 file changed, 2 insertions(+), 25 deletions(-) diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index 9dca868885..55efff7c79 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -39,12 +39,6 @@ import lime._internal.backend.html5.HTML5Thread as Thread; threadPool.run(processFile, url); } - If upgrading from `BackgroundWorker`, you can use `ThreadPool` as a drop-in - replacement. It will then display some deprecation warnings indicating which - steps to take next. - - --- - Guidelines to make your code work on all targets and configurations: - For thread safety and web worker compatibility, your work function should @@ -110,17 +104,6 @@ class ThreadPool extends WorkOutput #end } - /** - Indicates that no further events will be dispatched. - **/ - public var canceled(default, null):Bool = false; - - /** - Indicates that the latest job finished successfully, and no other job - has been started/is ongoing. - **/ - public var completed(default, null):Bool = false; - /** The number of live threads in this pool, including both active and idle threads. Does not count threads that have been instructed to shut down. @@ -321,8 +304,6 @@ class ThreadPool extends WorkOutput __jobComplete.value = false; activeJob = null; - completed = false; - canceled = true; } /** @@ -391,8 +372,6 @@ class ThreadPool extends WorkOutput var job:JobData = new JobData(doWork, state); __jobQueue.push(job); - completed = false; - canceled = false; if (!Application.current.onUpdate.has(__update)) { @@ -627,15 +606,13 @@ class ThreadPool extends WorkOutput } #end - completed = threadEvent.event == COMPLETE && activeJobs == 0 && __jobQueue.length == 0; - default: } activeJob = null; } - if (completed) + if (activeJobs == 0 && __jobQueue.length == 0) { Application.current.onUpdate.remove(__update); } @@ -687,7 +664,7 @@ class ThreadPool extends WorkOutput } } -@:access(lime.system.ThreadPool) @:forward(canceled) +@:access(lime.system.ThreadPool) private abstract PseudoEvent(ThreadPool) from ThreadPool { @:noCompletion @:dox(hide) public var __listeners(get, never):Array; From d3a39b560dd92b16b47c979e3aa5f47007c78ab4 Mon Sep 17 00:00:00 2001 From: Joseph Cloutier Date: Thu, 22 Aug 2024 01:44:39 -0400 Subject: [PATCH 14/19] One more documentation pass. Hopefully my last before 8.2.0. --- src/lime/system/ThreadPool.hx | 31 ++++++++++++++----------------- src/lime/system/WorkOutput.hx | 16 +++++++--------- 2 files changed, 21 insertions(+), 26 deletions(-) diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index 55efff7c79..61d60763cd 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -78,9 +78,11 @@ class ThreadPool extends WorkOutput /** A rough estimate of how much of the app's time should be spent on single-threaded `ThreadPool`s. For instance, the default value of 1/2 - means they will aim to take up about half the app's available time every - frame. See `workIterations` for instructions to improve the accuracy of - this estimate. + means they'll use about half the app's available time every frame. + + The accuracy of this estimate depends on how often your work functions + return. If you find that a `ThreadPool` is taking longer than scheduled, + try making the work function return more often. **/ public static var workLoad:Float = 1 / 2; @@ -128,13 +130,6 @@ class ThreadPool extends WorkOutput The maximum number of live threads this pool can have at once. If this value decreases, active jobs will still be allowed to finish. - - You can set this in single-threaded mode, but it's rarely useful. For - instance, suppose you have six jobs, each of which takes about a second. - If you leave `maxThreads` at 1, then one will finish every second for - six seconds. If you set `maxThreads = 6`, then none will finish for five - seconds, and then they'll all finish at once. The total duration is - unchanged, but none of them finish early. **/ public var maxThreads:Int; @@ -142,10 +137,8 @@ class ThreadPool extends WorkOutput __Set this only from the main thread.__ The number of threads that will be kept alive at all times, even if - there's no work to do. Setting this won't add new threads, it'll just - keep existing ones running. - - Has no effect in single-threaded mode. + there's no work to do. Setting this won't immediately spin up new + threads; you must still call `run()` to get them started. **/ public var minThreads:Int; @@ -343,7 +336,13 @@ class ThreadPool extends WorkOutput } /** - Queues a new job, to be run once a thread becomes available. + Runs the given function asynchronously, or queues it for later if all + threads are busy. + @param doWork The function to run. For best results, see the guidelines + in the `ThreadPool` class overview. In brief: `doWork` should be static, + only access its arguments, and return often. + @param state An object to pass to `doWork`, ideally a mutable object so + that `doWork` can save its progress. @return The job's unique ID. **/ public function run(doWork:WorkFunctionWorkOutput->Void> = null, state:State = null):Int @@ -647,8 +646,6 @@ class ThreadPool extends WorkOutput return activeJobs + idleThreads; } - // Note the distinction between `doWork` and `__doWork`: the former is for - // backwards compatibility, while the latter is always used. private function get_doWork():PseudoEvent { return this; diff --git a/src/lime/system/WorkOutput.hx b/src/lime/system/WorkOutput.hx index 48db2a5e2d..c2673703bd 100644 --- a/src/lime/system/WorkOutput.hx +++ b/src/lime/system/WorkOutput.hx @@ -42,8 +42,9 @@ class WorkOutput the current job, including (if applicable) the ongoing call. In single-threaded mode, it only counts the number of calls this frame. - This helps you adjust `doWork`'s length: too few iterations per frame - means `workLoad` may be inaccurate, while too many may add overhead. + The lower the number, the less accurate `ThreadPool.workLoad` becomes, + but the higher the number, the more overhead there is. As a ballpark + estimate, aim for 10-100 iterations. **/ public var workIterations(default, null):Tls = new Tls(); @@ -236,21 +237,18 @@ class WorkOutput /** A function that performs asynchronous work. This can either be work on - another thread ("multi-threaded mode"), or it can represent a virtual - thread ("single-threaded mode"). + another thread ("multi-threaded mode"), or it can represent a green thread + ("single-threaded mode"). In single-threaded mode, the work function shouldn't complete the job all at once, as the main thread would lock up. Instead, it should perform a fraction of the job each time it's called. `ThreadPool` provides the - function with a persistent `State` argument that can track progress. - Alternatively, you may be able to bind your own `State` argument. + function with a persistent `State` argument for tracking progress, which can + be any object of your choice. Caution: if using multi-threaded mode in HTML5, this must be a static function and binding arguments is forbidden. Compile with `-Dlime-warn-portability` to highlight functions that won't work. - - The exact length of `doWork` can vary, but single-threaded mode will run - more smoothly if it's short enough to run several times per frame. **/ #if (lime_threads && html5) typedef WorkFunction = lime._internal.backend.html5.HTML5Thread.WorkFunction; From 2907e42431e27e0e46c93a95ca1cb92b9a6d4180 Mon Sep 17 00:00:00 2001 From: Chris Speciale Date: Thu, 22 Aug 2024 17:19:38 -0400 Subject: [PATCH 15/19] BackgroundWorker:Docs Originally had docs for this but we nixed it for a while so they never got added. --- src/lime/system/BackgroundWorker.hx | 40 +++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 5 deletions(-) diff --git a/src/lime/system/BackgroundWorker.hx b/src/lime/system/BackgroundWorker.hx index 1a250062e1..97bec2bae7 100644 --- a/src/lime/system/BackgroundWorker.hx +++ b/src/lime/system/BackgroundWorker.hx @@ -16,10 +16,17 @@ import neko.vm.Thread; #end /** - A background worker executes a single function on a background thread, - allowing it to avoid blocking the main thread. However, only system targets - have thread support, meaning the function will block on any other target. - @see `ThreadPool` for improved thread safety, HTML5 threads, and more. + A `BackgroundWorker` allows the execution of a function on a background thread, + avoiding the blocking of the main thread. This is particularly useful for long-running + operations like file I/O, network requests, or computationally intensive tasks. + + ### Notes: + - **Thread Support:** Only system targets (such as C++, Neko) support threading. + - **Events:** The class uses the `Event` class to dispatch completion, error, + and progress notifications. + + @see `ThreadPool` for more advanced threading capabilities, including thread + safety, HTML5 threads, and more robust handling of tasks. **/ #if !lime_debug @:fileXml('tags="haxe,release"') @@ -43,8 +50,15 @@ class BackgroundWorker @:noCompletion private var __workerThread:Thread; #end + /** + Creates a new `BackgroundWorker` instance. + **/ public function new() {} + /** + Cancels the worker's task if it is still running. This won't stop the thread + immediately. + **/ public function cancel():Void { canceled = true; @@ -54,6 +68,10 @@ class BackgroundWorker #end } + /** + Starts the worker's task, optionally passing a message to the task. + @param message An optional message to pass to the worker's task. + **/ public function run(message:Dynamic = null):Void { canceled = false; @@ -75,6 +93,10 @@ class BackgroundWorker #end } + /** + Sends a completion message, indicating that the worker has finished its task. + @param message An optional message to pass to the `onComplete` event. + **/ public function sendComplete(message:Dynamic = null):Void { completed = true; @@ -91,6 +113,10 @@ class BackgroundWorker #end } + /** + Sends an error message, indicating that an error occurred during the worker's task. + @param message An optional message to pass to the `onError` event. + **/ public function sendError(message:Dynamic = null):Void { #if (cpp || neko) @@ -104,7 +130,11 @@ class BackgroundWorker } #end } - + + /** + Sends a progress update message. + @param message An optional message to pass to the `onProgress` event. + **/ public function sendProgress(message:Dynamic = null):Void { #if (cpp || neko) From 5c8538efcb52d44fd6fc00803847ef70b9c9ab92 Mon Sep 17 00:00:00 2001 From: Chris Speciale Date: Thu, 22 Aug 2024 17:22:39 -0400 Subject: [PATCH 16/19] BackgroundWorker: More docs Forgot the properties. --- src/lime/system/BackgroundWorker.hx | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/src/lime/system/BackgroundWorker.hx b/src/lime/system/BackgroundWorker.hx index 97bec2bae7..f92ca0ca17 100644 --- a/src/lime/system/BackgroundWorker.hx +++ b/src/lime/system/BackgroundWorker.hx @@ -37,11 +37,35 @@ class BackgroundWorker private static var MESSAGE_COMPLETE = "__COMPLETE__"; private static var MESSAGE_ERROR = "__ERROR__"; + /** + Indicates whether the worker has been canceled. + **/ public var canceled(default, null):Bool; + + /** + Indicates whether the worker has completed its task. + **/ public var completed(default, null):Bool; + + /** + Dispatched when the worker is about to perform its task. + The function to execute should be added as a listener to this event. + **/ public var doWork = new EventVoid>(); + + /** + Dispatched when the worker has successfully completed its task. + **/ public var onComplete = new EventVoid>(); + + /** + Dispatched if an error occurs during the execution of the worker's task. + **/ public var onError = new EventVoid>(); + + /** + Dispatched periodically during the worker's task to provide progress updates. + **/ public var onProgress = new EventVoid>(); @:noCompletion private var __runMessage:Dynamic; From 4f4f5df7d83c59b49ffb7dbadecbdc872d9cd841 Mon Sep 17 00:00:00 2001 From: Chris Speciale Date: Thu, 22 Aug 2024 17:30:50 -0400 Subject: [PATCH 17/19] AudioSource: Docs We need more docs! --- src/lime/media/AudioSource.hx | 63 +++++++++++++++++++++++++++++++++++ 1 file changed, 63 insertions(+) diff --git a/src/lime/media/AudioSource.hx b/src/lime/media/AudioSource.hx index ab0992e4fe..2ccc9b1465 100644 --- a/src/lime/media/AudioSource.hx +++ b/src/lime/media/AudioSource.hx @@ -9,20 +9,71 @@ import lime.math.Vector4; @:fileXml('tags="haxe,release"') @:noDebug #end +/** + The `AudioSource` class provides a way to control audio playback in a Lime application. + It allows for playing, pausing, and stopping audio, as well as controlling various + audio properties such as gain, pitch, and looping. + + Depending on the platform, the audio backend may vary, but the API remains consistent. + + @see lime.media.AudioBuffer +**/ class AudioSource { + /** + An event that is dispatched when the audio playback is complete. + **/ public var onComplete = new EventVoid>(); + + /** + The `AudioBuffer` associated with this `AudioSource`. + **/ public var buffer:AudioBuffer; + + /** + The current playback position of the audio, in milliseconds. + **/ public var currentTime(get, set):Int; + + /** + The gain (volume) of the audio. A value of `1.0` represents the default volume. + **/ public var gain(get, set):Float; + + /** + The length of the audio, in milliseconds. + **/ public var length(get, set):Int; + + /** + The number of times the audio will loop. A value of `0` means the audio will not loop. + **/ public var loops(get, set):Int; + + /** + The pitch of the audio. A value of `1.0` represents the default pitch. + **/ public var pitch(get, set):Float; + + /** + The offset within the audio buffer to start playback, in samples. + **/ public var offset:Int; + + /** + The 3D position of the audio source, represented as a `Vector4`. + **/ public var position(get, set):Vector4; @:noCompletion private var __backend:AudioSourceBackend; + /** + Creates a new `AudioSource` instance. + @param buffer The `AudioBuffer` to associate with this `AudioSource`. + @param offset The starting offset within the audio buffer, in samples. + @param length The length of the audio to play, in milliseconds. If `null`, the full buffer is used. + @param loops The number of times to loop the audio. `0` means no looping. + **/ public function new(buffer:AudioBuffer = null, offset:Int = 0, length:Null = null, loops:Int = 0) { this.buffer = buffer; @@ -43,6 +94,9 @@ class AudioSource } } + /** + Releases any resources used by this `AudioSource`. + **/ public function dispose():Void { __backend.dispose(); @@ -53,16 +107,25 @@ class AudioSource __backend.init(); } + /** + Starts or resumes audio playback. + **/ public function play():Void { __backend.play(); } + /** + Pauses audio playback. + **/ public function pause():Void { __backend.pause(); } + /** + Stops audio playback and resets the playback position to the beginning. + **/ public function stop():Void { __backend.stop(); From 0b936846d9fb59b8c4b2e9c5cc9ab5db3f9d390d Mon Sep 17 00:00:00 2001 From: Chris Speciale Date: Thu, 22 Aug 2024 17:40:58 -0400 Subject: [PATCH 18/19] AudioBuffer: docs Missing docs. --- src/lime/media/AudioBuffer.hx | 78 +++++++++++++++++++++++++++++++++++ 1 file changed, 78 insertions(+) diff --git a/src/lime/media/AudioBuffer.hx b/src/lime/media/AudioBuffer.hx index 967c8fbbe9..e74b531ec3 100644 --- a/src/lime/media/AudioBuffer.hx +++ b/src/lime/media/AudioBuffer.hx @@ -31,12 +31,41 @@ import flash.net.URLRequest; @:fileXml('tags="haxe,release"') @:noDebug #end + +/** + The `AudioBuffer` class represents a buffer of audio data that can be played back using an `AudioSource`. + It supports a variety of audio formats and platforms, providing a consistent API for loading and managing audio data. + + Depending on the platform, the audio backend may differ, but the class provides a unified interface for accessing + audio data, whether it's stored in memory, loaded from a file, or streamed. + + @see lime.media.AudioSource +**/ class AudioBuffer { + /** + The number of bits per sample in the audio data. + **/ public var bitsPerSample:Int; + + /** + The number of audio channels (e.g., 1 for mono, 2 for stereo). + **/ public var channels:Int; + + /** + The raw audio data stored as a `UInt8Array`. + **/ public var data:UInt8Array; + + /** + The sample rate of the audio data, in Hz. + **/ public var sampleRate:Int; + + /** + The source of the audio data. This can be an `Audio`, `Sound`, `Howl`, or other platform-specific object. + **/ public var src(get, set):Dynamic; @:noCompletion private var __srcAudio:#if (js && html5) Audio #else Dynamic #end; @@ -57,8 +86,14 @@ class AudioBuffer } #end + /** + Creates a new, empty `AudioBuffer` instance. + **/ public function new() {} + /** + Disposes of the resources used by this `AudioBuffer`, such as unloading any associated audio data. + **/ public function dispose():Void { #if (js && html5 && lime_howlerjs) @@ -66,6 +101,12 @@ class AudioBuffer #end } + /** + Creates an `AudioBuffer` from a Base64-encoded string. + + @param base64String The Base64-encoded audio data. + @return An `AudioBuffer` instance with the decoded audio data. + **/ public static function fromBase64(base64String:String):AudioBuffer { if (base64String == null) return null; @@ -112,6 +153,12 @@ class AudioBuffer return null; } + /** + Creates an `AudioBuffer` from a `Bytes` object. + + @param bytes The `Bytes` object containing the audio data. + @return An `AudioBuffer` instance with the decoded audio data. + **/ public static function fromBytes(bytes:Bytes):AudioBuffer { if (bytes == null) return null; @@ -145,6 +192,12 @@ class AudioBuffer return null; } + /** + Creates an `AudioBuffer` from a file. + + @param path The file path to the audio data. + @return An `AudioBuffer` instance with the audio data loaded from the file. + **/ public static function fromFile(path:String):AudioBuffer { if (path == null) return null; @@ -196,6 +249,12 @@ class AudioBuffer #end } + /** + Creates an `AudioBuffer` from an array of file paths. + + @param paths An array of file paths to search for audio data. + @return An `AudioBuffer` instance with the audio data loaded from the first valid file found. + **/ public static function fromFiles(paths:Array):AudioBuffer { #if (js && html5 && lime_howlerjs) @@ -221,7 +280,14 @@ class AudioBuffer #end } + /** + Creates an `AudioBuffer` from a `VorbisFile`. + + @param vorbisFile The `VorbisFile` object containing the audio data. + @return An `AudioBuffer` instance with the decoded audio data. + **/ #if lime_vorbis + public static function fromVorbisFile(vorbisFile:VorbisFile):AudioBuffer { if (vorbisFile == null) return null; @@ -243,6 +309,12 @@ class AudioBuffer } #end + /** + Asynchronously loads an `AudioBuffer` from a file. + + @param path The file path to the audio data. + @return A `Future` that resolves to the loaded `AudioBuffer`. + **/ public static function loadFromFile(path:String):Future { #if (flash || (js && html5)) @@ -307,6 +379,12 @@ class AudioBuffer #end } + /** + Asynchronously loads an `AudioBuffer` from multiple files. + + @param paths An array of file paths to search for audio data. + @return A `Future` that resolves to the loaded `AudioBuffer`. + **/ public static function loadFromFiles(paths:Array):Future { #if (js && html5 && lime_howlerjs) From 96c5c1c1213d5a351c0a4e77639581544ccd50d5 Mon Sep 17 00:00:00 2001 From: player-03 Date: Sat, 24 Aug 2024 16:19:46 -0400 Subject: [PATCH 19/19] Simplify `ThreadPool.cancelJob()`. Offering four options makes the underlying code more complicated for minimal benefit. --- src/lime/system/ThreadPool.hx | 102 +++++----------------------------- 1 file changed, 13 insertions(+), 89 deletions(-) diff --git a/src/lime/system/ThreadPool.hx b/src/lime/system/ThreadPool.hx index 61d60763cd..ec8669b295 100644 --- a/src/lime/system/ThreadPool.hx +++ b/src/lime/system/ThreadPool.hx @@ -301,30 +301,21 @@ class ThreadPool extends WorkOutput /** Cancels one active or queued job. Does not dispatch an error event. - @param job A `JobData` object, or a job's unique `id`, `state`, or - `doWork` function. @return Whether a job was canceled. **/ - public function cancelJob(job:JobIdentifier):Bool + public function cancelJob(jobID:Int):Bool { - var data:JobData = __activeJobs.get(job); - - if (data != null) + #if lime_threads + var thread:Thread = __activeThreads[jobID]; + if (thread != null) { - #if lime_threads - var thread:Thread = __activeThreads[data.id]; - if (thread != null) - { - thread.sendMessage({event: CANCEL}); - __activeThreads.remove(data.id); - __idleThreads.push(thread); - } - #end - - return __activeJobs.remove(data); + thread.sendMessage({event: CANCEL}); + __activeThreads.remove(jobID); + __idleThreads.push(thread); } + #end - return __jobQueue.remove(__jobQueue.get(job)); + return __activeJobs.remove(__activeJobs.get(jobID)) || __jobQueue.remove(__jobQueue.get(jobID)); } /** @@ -551,7 +542,7 @@ class ThreadPool extends WorkOutput { if (threadEvent.jobID != null) { - activeJob = __activeJobs.getByID(threadEvent.jobID); + activeJob = __activeJobs.get(threadEvent.jobID); } else { @@ -750,7 +741,7 @@ class JobList public inline function exists(job:JobData):Bool { - return getByID(job.id) != null; + return get(job.id) != null; } public inline function hasNext():Bool @@ -798,7 +789,7 @@ class JobList public inline function removeByID(id:Int):Bool { - if (__jobs.remove(getByID(id))) + if (__jobs.remove(get(id))) { __addingWorkPriority = length > 0; return true; @@ -809,7 +800,7 @@ class JobList } } - public function getByID(id:Int):JobData + public function get(id:Int):JobData { for (job in __jobs) { @@ -820,33 +811,6 @@ class JobList } return null; } - - public function get(jobIdentifier:JobIdentifier):JobData - { - switch (jobIdentifier) - { - case ID(id): - return getByID(id); - case FUNCTION(doWork): - for (job in __jobs) - { - if (job.doWork == doWork) - { - return job; - } - } - case STATE(state): - for (job in __jobs) - { - if (job.state == state) - { - return job; - } - } - } - return null; - } - public inline function push(job:JobData):Void { __jobs.push(job); @@ -880,43 +844,3 @@ class JobList return __jobs.length; } } - -/** - A piece of data that uniquely represents a job. This can be the integer ID - (and integers will be assumed to be such), the `doWork` function, or the - `JobData` object itself. Failing any of those, a value will be assumed to be - the job's `state`. - - Caution: if the provided data isn't unique, such as a `doWork` function - that's in use by multiple jobs, the wrong job may be selected or canceled. -**/ -@:forward -abstract JobIdentifier(JobIdentifierImpl) from JobIdentifierImpl -{ - @:from private static inline function fromJob(job:JobData):JobIdentifier - { - return ID(job.id); - } - - @:from private static inline function fromID(id:Int):JobIdentifier - { - return ID(id); - } - - @:from private static inline function fromFunction(doWork:WorkFunctionWorkOutput->Void>):JobIdentifier - { - return FUNCTION(doWork); - } - - @:from private static inline function fromState(state:State):JobIdentifier - { - return STATE(state); - } -} - -private enum JobIdentifierImpl -{ - ID(id:Int); - FUNCTION(doWork:WorkFunctionWorkOutput->Void>); - STATE(state:State); -}