diff --git a/.editorconfig b/.editorconfig index ca64b0ee..e7ef5f29 100644 --- a/.editorconfig +++ b/.editorconfig @@ -156,6 +156,9 @@ csharp_space_between_square_brackets = false dotnet_diagnostic.CA1510.severity = none dotnet_diagnostic.CA1513.severity = none +# Stream APIs with Memory parameters are not available in .NET Framework +dotnet_diagnostic.CA1835.severity = none + dotnet_diagnostic.IDE0290.severity = none # Use primary constructor dotnet_diagnostic.IDE0065.severity = none # Using directives must be placed outside of namespace diff --git a/docs/.vitepress/config.mts b/docs/.vitepress/config.mts index 2a33c264..2fc983fe 100644 --- a/docs/.vitepress/config.mts +++ b/docs/.vitepress/config.mts @@ -42,8 +42,9 @@ export default defineConfig({ { text: 'JS / .NET Marshalling', link: '/features/js-dotnet-marshalling' }, { text: 'JS types in .NET', link: '/features/js-types-in-dotnet' }, { text: 'JS value scopes', link: '/features/js-value-scopes' }, - { text: 'JS threading & async', link: '/features/js-threading-async' }, { text: 'JS references', link: '/features/js-references' }, + { text: 'JS threading & async', link: '/features/js-threading-async' }, + { text: 'Node worker threads', link: '/features/node-workers' }, { text: '.NET Native AOT', link: '/features/dotnet-native-aot' }, { text: 'Performance', link: '/features/performance' }, ] diff --git a/docs/features/node-workers.md b/docs/features/node-workers.md new file mode 100644 index 00000000..bac89384 --- /dev/null +++ b/docs/features/node-workers.md @@ -0,0 +1,65 @@ +# Node Worker Threads + +[Node worker threads](https://nodejs.org/api/worker_threads.html) enable parallel execution of +JavaScript in the same process. They are ideal for CPU-intensive JavaScript operations. They are +less suited to I/O-intensive work, where the Node.js built-in asynchronous I/O operations are more +efficient than Workers. + +The [NodeWorker](../reference/dotnet/Microsoft.JavaScript.NodeApi.Interop/NodeWorker) class enables +C# code to create Node worker threads in the same process, and communicate with them. + +## JS worker threads + +To create a worker, construct a new `NodeWorker` instance with the path to the worker JavaScript +file: + +```C# +var worker = new NodeWorker(@".\myWorker.js", new NodeWorker.Options()); +``` + +Or provide the worker script directly as a string, using the `Eval` option: +```C# +var worker = new NodeWorker(@" + const assert = require('node:assert'); + const { isMainThread } = require('node:worker_threads'); + assert(!isMainThread); // This script is running as a worker. + ", new NodeWorker.Options { Eval = true }); +``` + +Messages (any serializable JS values) can be passed back and forth between the C# host and the JS +worker: +```C# +var worker = new NodeWorker(@" + const { parentPort } = require('node:worker_threads'); + parentPort.on('message', (msg) => { + parentPort.postMessage(msg); // echo + }); + ", new NodeWorker.Options { Eval = true }); + +// Wait for the worker to start before sending a message. +TaskCompletionSource onlineCompletion = new(); +worker.Online += (sender, e) => onlineCompletion.TrySetResult(true); +worker.Error += (sender, e) => onlineCompletion.TrySetException(new JSException(e.Error)); +await onlineCompletion.Task; + +// Send a message and verify the response. +TaskCompletionSource echoCompletion = new(); +worker.Message += (_, e) => echoCompletion.TrySetResult((string)e.Value); +worker.Error += (_, e) => echoCompletion.TrySetException( + new JSException(e.Error)); +worker.Exit += (_, e) => echoCompletion.TrySetException( + new InvalidOperationException("Worker exited without echoing!")); +worker.PostMessage("hello"); +string echo = await echoCompletion.Task; +Assert.Equal("hello", echo); +``` + +## C# worker threads + +::: warning :construction: COMING SOON +This functionality is not available yet, but is coming soon. +::: + +Instead of starting a worker with a JavaScript file, it will be possible to provide a C# delegate. +The delegate callback will be invoked on the JS worker thread; then it can orchestrate importing +JavaScript packages, callilng JS functions, or whatever is needed to do the work on the thread. diff --git a/src/NodeApi.DotNetHost/JSMarshaller.cs b/src/NodeApi.DotNetHost/JSMarshaller.cs index 2db33de8..5060c5e7 100644 --- a/src/NodeApi.DotNetHost/JSMarshaller.cs +++ b/src/NodeApi.DotNetHost/JSMarshaller.cs @@ -3163,8 +3163,12 @@ private IEnumerable BuildFromJSToCollectionInterfaceExpressions( * (key) => (JSValue)key, * (value) => (JSValue)value); */ - MethodInfo asDictionaryMethod = typeof(JSCollectionExtensions).GetStaticMethod( - nameof(JSCollectionExtensions.AsDictionary))!.MakeGenericMethod(keyType, valueType); + MethodInfo asDictionaryMethod = typeof(JSCollectionExtensions) + .GetMethods(BindingFlags.Public | BindingFlags.Static) + .Where((m) => m.Name == nameof(JSCollectionExtensions.AsDictionary) && + m.GetParameters()[0].ParameterType == typeof(JSMap)) + .Single() + .MakeGenericMethod(keyType, valueType); MethodInfo asJSMapMethod = typeof(JSMap).GetExplicitConversion( typeof(JSValue), typeof(JSMap)); yield return Expression.Coalesce( @@ -3189,9 +3193,12 @@ private IEnumerable BuildFromJSToCollectionInterfaceExpressions( * (value) => (TValue)value, * (key) => (JSValue)key); */ - MethodInfo asDictionaryMethod = typeof(JSCollectionExtensions).GetStaticMethod( - nameof(JSCollectionExtensions.AsReadOnlyDictionary)) - !.MakeGenericMethod(keyType, valueType); + MethodInfo asDictionaryMethod = typeof(JSCollectionExtensions) + .GetMethods(BindingFlags.Public | BindingFlags.Static) + .Where((m) => m.Name == nameof(JSCollectionExtensions.AsReadOnlyDictionary) && + m.GetParameters()[0].ParameterType == typeof(JSMap)) + .Single() + .MakeGenericMethod(keyType, valueType); MethodInfo asJSMapMethod = typeof(JSMap).GetExplicitConversion( typeof(JSValue), typeof(JSMap)); yield return Expression.Coalesce( @@ -3248,71 +3255,6 @@ private IEnumerable BuildFromJSToCollectionClassExpressions( Expression.Convert(valueExpression, jsIterableType, asJSIterableMethod), GetFromJSValueExpression(elementType)))); } - else if (typeDefinition == typeof(Dictionary<,>)) - { - Type keyType = elementType; - Type valueType = toType.GenericTypeArguments[1]; - - /* - * value.TryUnwrap() as Dictionary ?? - * new Dictionary(((JSMap)value).AsDictionary( - * (key) => (TKey)key, - * (value) => (TValue)value, - * (key) => (JSValue)key, - * (value) => (JSValue)value); - */ - MethodInfo asDictionaryMethod = typeof(JSCollectionExtensions).GetStaticMethod( - nameof(JSCollectionExtensions.AsDictionary))!.MakeGenericMethod( - keyType, valueType); - MethodInfo asJSMapMethod = typeof(JSMap).GetExplicitConversion( - typeof(JSValue), typeof(JSMap)); - ConstructorInfo dictionaryConstructor = toType.GetConstructor( - new[] { typeof(IDictionary<,>).MakeGenericType(keyType, valueType) })!; - yield return Expression.Coalesce( - Expression.TypeAs(Expression.Call(valueExpression, s_tryUnwrap), toType), - Expression.New( - dictionaryConstructor, - Expression.Call( - asDictionaryMethod, - Expression.Convert(valueExpression, typeof(JSMap), asJSMapMethod), - GetFromJSValueExpression(keyType), - GetFromJSValueExpression(valueType), - GetToJSValueExpression(keyType), - GetToJSValueExpression(valueType)))); - } - else if (typeDefinition == typeof(SortedDictionary<,>)) - { - Type keyType = elementType; - Type valueType = toType.GenericTypeArguments[1]; - - /* - * value.TryUnwrap() as SortedDictionary ?? - * new SortedDictionary(((JSMap)value).AsDictionary( - * (key) => (TKey)key, - * (value) => (TValue)value, - * (key) => (JSValue)key, - * (value) => (JSValue)value)); - */ - MethodInfo asDictionaryMethod = typeof(JSCollectionExtensions).GetStaticMethod( - nameof(JSCollectionExtensions.AsDictionary))!.MakeGenericMethod( - keyType, valueType); - MethodInfo asJSMapMethod = typeof(JSMap).GetExplicitConversion( - typeof(JSValue), typeof(JSMap)); - // SortedDictionary doesn't have a constructor that takes IEnumerable>. - ConstructorInfo dictionaryConstructor = toType.GetConstructor( - new[] { typeof(IDictionary<,>).MakeGenericType(keyType, valueType) })!; - yield return Expression.Coalesce( - Expression.TypeAs(Expression.Call(valueExpression, s_tryUnwrap), toType), - Expression.New( - dictionaryConstructor, - Expression.Call( - asDictionaryMethod, - Expression.Convert(valueExpression, typeof(JSMap), asJSMapMethod), - GetFromJSValueExpression(keyType), - GetFromJSValueExpression(valueType), - GetToJSValueExpression(keyType), - GetToJSValueExpression(valueType)))); - } else if (typeDefinition == typeof(Collection<>) || typeDefinition == typeof(ReadOnlyCollection<>)) { @@ -3342,21 +3284,27 @@ private IEnumerable BuildFromJSToCollectionClassExpressions( GetToJSValueExpression(elementType)))); } - else if (typeDefinition == typeof(ReadOnlyDictionary<,>)) + else if (typeDefinition == typeof(Dictionary<,>) || + typeDefinition == typeof(SortedDictionary<,>) || + typeDefinition == typeof(ReadOnlyDictionary<,>)) { Type keyType = elementType; Type valueType = toType.GenericTypeArguments[1]; /* - * value.TryUnwrap() as ReadOnlyDictionary ?? + * value.TryUnwrap() as Dictionary ?? * new Dictionary(((JSMap)value).AsDictionary( * (key) => (TKey)key, * (value) => (TValue)value, * (key) => (JSValue)key, * (value) => (JSValue)value)); */ - MethodInfo asDictionaryMethod = typeof(JSCollectionExtensions).GetStaticMethod( - nameof(JSCollectionExtensions.AsDictionary))!.MakeGenericMethod(keyType, valueType); + MethodInfo asDictionaryMethod = typeof(JSCollectionExtensions) + .GetMethods(BindingFlags.Public | BindingFlags.Static) + .Where((m) => m.Name == nameof(JSCollectionExtensions.AsDictionary) && + m.GetParameters()[0].ParameterType == typeof(JSMap)) + .Single() + .MakeGenericMethod(keyType, valueType); MethodInfo asJSMapMethod = typeof(JSMap).GetExplicitConversion( typeof(JSValue), typeof(JSMap)); ConstructorInfo dictionaryConstructor = toType.GetConstructor( diff --git a/src/NodeApi.DotNetHost/ManagedHost.cs b/src/NodeApi.DotNetHost/ManagedHost.cs index b7192646..65860e50 100644 --- a/src/NodeApi.DotNetHost/ManagedHost.cs +++ b/src/NodeApi.DotNetHost/ManagedHost.cs @@ -113,7 +113,10 @@ JSValue removeListener(JSCallbackArgs args) JSPropertyDescriptor.Function("load", LoadAssembly), JSPropertyDescriptor.Function("addListener", addListener), - JSPropertyDescriptor.Function("removeListener", removeListener)); + JSPropertyDescriptor.Function("removeListener", removeListener), + + JSPropertyDescriptor.Function("runWorker", RunWorker)); + // Create a marshaller instance for the current thread. The marshaller dynamically // generates adapter delegates for calls to and from JS, for assemblies that were not @@ -560,6 +563,27 @@ private Assembly LoadAssembly(string assemblyNameOrFilePath, bool allowNativeLib return assembly; } + private JSValue RunWorker(JSCallbackArgs args) + { + nint callbackHandleValue = (nint)args[0].ToBigInteger(); + Trace($"> ManagedHost.RunWorker({callbackHandleValue})"); + + GCHandle callbackHandle = GCHandle.FromIntPtr(callbackHandleValue); + Action callback = (Action)callbackHandle.Target!; + callbackHandle.Free(); + + try + { + // Worker data and argv are available to the callback as NodejsWorker static properties. + callback(); + return JSValue.Undefined; + } + finally + { + Trace($"< ManagedHost.RunWorker({callbackHandleValue})"); + } + } + protected override void Dispose(bool disposing) { if (disposing) diff --git a/src/NodeApi/Interop/JSCollectionExtensions.cs b/src/NodeApi/Interop/JSCollectionExtensions.cs index 1b9a1012..d2b0f3ed 100644 --- a/src/NodeApi/Interop/JSCollectionExtensions.cs +++ b/src/NodeApi/Interop/JSCollectionExtensions.cs @@ -192,6 +192,35 @@ public static IDictionary AsDictionary( JSValue.From valueToJS) => ((JSValue)map).IsNullOrUndefined() ? null! : new JSMapDictionary((JSValue)map, keyFromJS, valueFromJS, keyToJS, valueToJS); + + /// + /// Creates a read-only dictionary adapter for properties of a JS object, without copying. + /// + /// + /// This method must be called from the JS thread. The returned dictionary object + /// is thread-safe and may be accessed from threads other than the JS thread + /// (though accessing from the JS thread is more efficient). + /// + public static IReadOnlyDictionary AsReadOnlyDictionary( + this JSObject obj, + JSValue.To valueFromJS) + => ((JSValue)obj).IsNullOrUndefined() ? null! : + new JSObjectReadOnlyDictionary((JSValue)obj, valueFromJS); + + /// + /// Creates a dictionary adapter for properties of a JS object, without copying. + /// + /// + /// This method must be called from the JS thread. The returned dictionary object + /// is thread-safe and may be accessed from threads other than the JS thread + /// (though accessing from the JS thread is more efficient). + /// + public static IDictionary AsDictionary( + this JSObject obj, + JSValue.To valueFromJS, + JSValue.From valueToJS) + => ((JSValue)obj).IsNullOrUndefined() ? null! : + new JSObjectDictionary((JSValue)obj, valueFromJS, valueToJS); } internal sealed class JSAsyncIterableEnumerator : IAsyncEnumerator, IDisposable @@ -850,3 +879,221 @@ bool ICollection>.Remove(KeyValuePair i }); } } + +internal class JSObjectReadOnlyDictionary : + IReadOnlyDictionary, + IEquatable, + IDisposable +{ + internal JSObjectReadOnlyDictionary( + JSValue obj, + JSValue.To valueFromJS) + { + _objReference = new JSReference(obj); + ValueFromJS = valueFromJS; + } + + private readonly JSReference _objReference; + + protected void Run(Action action) => _objReference.Run(action); + protected TResult Run(Func action) => _objReference.Run(action); + + internal JSValue Value => _objReference.GetValue(); + + bool IEquatable.Equals(JSValue other) => Run((obj) => obj.Equals(other)); + + protected JSValue.To ValueFromJS { get; } + + public int Count + => Run((obj) => (int)JSValue.Global["Object"].CallMethod("keys", obj)["length"]); + + public IEnumerable Keys + => Run((obj) => ((JSIterable)JSValue.Global["Object"].CallMethod("keys", obj)) + .AsEnumerable((key) => (string)key)); + + public IEnumerable Values + => Run((obj) => ((JSIterable)JSValue.Global["Object"].CallMethod("values", obj)) + .AsEnumerable(ValueFromJS)); + + public TValue this[string key] + { + get => Run((obj) => + { + JSValue jsValue = obj.GetProperty(key); + if (jsValue.IsUndefined()) + { + throw new KeyNotFoundException(); + } + return ValueFromJS(jsValue); + }); + } + + public bool ContainsKey(string key) => Run((obj) => (bool)obj.HasProperty(key)); + +#if NETFRAMEWORK || NETSTANDARD + public bool TryGetValue(string key, out TValue value) +#else + public bool TryGetValue(string key, [MaybeNullWhen(false)] out TValue value) +#endif + { + bool result; + (result, value) = Run((obj) => + { + JSValue jsValue = obj.GetProperty(key); + if (jsValue.IsUndefined()) + { + return (false, default(TValue)!); + } + return (true, ValueFromJS(jsValue)); + }); + return result; + } + + public IEnumerator> GetEnumerator() + { + return Run((obj) => ((JSIterable)JSValue.Global["Object"].CallMethod("values", obj)) + .AsEnumerable>((pair) => new KeyValuePair( + (string)pair[0], ValueFromJS(pair[1])))).GetEnumerator(); + } + + System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator() + => GetEnumerator(); + + public void Dispose() + { + // Do not change this code. Put cleanup code in 'Dispose(bool disposing)' method + Dispose(disposing: true); + } + + protected virtual void Dispose(bool disposing) + { + if (disposing) + { + _objReference.Dispose(); + } + } +} + +internal class JSObjectDictionary : + JSObjectReadOnlyDictionary, + IDictionary +{ + internal JSObjectDictionary( + JSValue map, + JSValue.To valueFromJS, + JSValue.From valueToJS) + : base(map, valueFromJS) + { + ValueToJS = valueToJS; + } + + public new TValue this[string key] + { + get => Run((obj) => + { + JSValue jsValue = obj.GetProperty(key); + if (jsValue.IsUndefined()) + { + throw new KeyNotFoundException(); + } + return ValueFromJS(jsValue); + }); + set => Run((obj) => + { + obj[key] = ValueToJS(value); + }); + } + + protected JSValue.From ValueToJS { get; } + + private int GetCount() => Count; + + public void Add(string key, TValue value) + { + if (ContainsKey(key)) + { + throw new ArgumentException("An item with the same key already exists."); + } + + this[key] = value; + } + + public bool Remove(string key) => Run((obj) => obj.DeleteProperty(key)); + + public void Clear() + { + Run((obj) => + { + foreach (JSValue key in (JSArray)obj.GetPropertyNames()) + { + obj.DeleteProperty(key); + } + }); + } + + ICollection IDictionary.Keys + => Run((obj) => new JSIterableCollection( + (JSIterable)JSValue.Global["Object"].CallMethod("keys", obj), + (key) => (string)key, + GetCount)); + + ICollection IDictionary.Values + => Run((obj) => new JSIterableCollection( + (JSIterable)JSValue.Global["Object"].CallMethod("values", obj), + ValueFromJS, + GetCount)); + + bool ICollection>.IsReadOnly => false; + + void ICollection>.Add(KeyValuePair item) + => Add(item.Key, item.Value); + + bool ICollection>.Contains(KeyValuePair item) + => TryGetValue(item.Key, out TValue? value) && + (item.Value?.Equals(value) ?? value == null); + + void ICollection>.CopyTo( + KeyValuePair[] array, int arrayIndex) + { + Run((obj) => + { + int i = arrayIndex; + JSValue entries = (JSIterable)JSValue.Global["Object"].CallMethod("entries", obj); + JSValue iterator = entries.CallMethod(JSSymbol.Iterator); + while (true) + { + JSValue nextResult = iterator.CallMethod("next"); + JSValue done = nextResult["done"]; + if (done.IsBoolean() && (bool)done) + { + break; + } + + JSValue pair = nextResult["value"]; + array[i++] = new KeyValuePair( + (string)pair[0], ValueFromJS(pair[1])); + } + }); + } + + bool ICollection>.Remove(KeyValuePair item) + { + return Run((obj) => + { + JSValue jsValue = obj.GetProperty(item.Key); + if (jsValue.IsUndefined()) + { + return false; + } + + TValue value = ValueFromJS(jsValue); + if (value?.Equals(item.Value) != true) + { + return false; + } + + obj.DeleteProperty(item.Key); + return true; + }); + } +} diff --git a/src/NodeApi/Interop/NodeProcess.cs b/src/NodeApi/Interop/NodeProcess.cs new file mode 100644 index 00000000..3e1cdb15 --- /dev/null +++ b/src/NodeApi/Interop/NodeProcess.cs @@ -0,0 +1,60 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System.Collections.Generic; +using System.IO; + +namespace Microsoft.JavaScript.NodeApi.Interop; + +/// +/// Provides information about, and control over, the current Node.js process. +/// +/// +/// These APIs are primarily meant for use with threads, for which the +/// process APIs are overridden to be specific to the worker thread context. +/// +public static class NodeProcess +{ + // Note the Import() function caches a reference to the imported module. + private static JSValue ProcessModule => JSRuntimeContext.Current.Import("node:process"); + + /// + /// Gets or sets the command-line arguments for the current process or worker thread. + /// The first argument (element 0) is the executable path; the second (index 1) is the + /// path to the main script file. + /// + public static IReadOnlyList Argv + => ((JSArray)ProcessModule["argv"]).AsReadOnlyList((item) => (string)item); + + /// + /// Gets a dictionary that allows getting or setting environment variables for the current + /// process or worker thread. + /// + public static IDictionary Env + => ((JSObject)ProcessModule["env"]).AsDictionary( + (value) => (string)value, (value) => (JSValue)value); + + /// + /// Gets a stream connected to the current process or worker thread stdin. + /// + public static Stream Stdin => (NodeStream)ProcessModule["stdin"]; + + /// + /// Gets a stream connected to the current process or worker thread stdout. + /// + public static Stream Stdout => (NodeStream)ProcessModule["stdout"]; + + /// + /// Gets a stream connected to the current process or worker thread stderr. + /// + public static Stream StdErr => (NodeStream)ProcessModule["stderr"]; + + /// + /// Exits the current process or worker thread. + /// + /// + public static void Exit(int exitCode) + { + ProcessModule.CallMethod("exit", exitCode); + } +} diff --git a/src/NodeApi/Interop/NodeStream.cs b/src/NodeApi/Interop/NodeStream.cs index 0b1d0307..6cb100c5 100644 --- a/src/NodeApi/Interop/NodeStream.cs +++ b/src/NodeApi/Interop/NodeStream.cs @@ -22,9 +22,17 @@ public static implicit operator JSValue(NodeStream stream) private NodeStream(JSValue value) { + bool isObject = value.IsObject(); + bool canRead = isObject && value.HasProperty("read"); + bool canWrite = isObject && value.HasProperty("write"); + if (!canRead && !canWrite) + { + throw new ArgumentException("Stream is neither readable nor writeable.", nameof(value)); + } + _valueReference = new JSReference(value); - if (CanRead) + if (canRead) { _readableSemaphore = new SemaphoreSlim(0); value.CallMethod("on", "readable", JSValue.CreateFunction("onreadable", (args) => @@ -32,9 +40,14 @@ private NodeStream(JSValue value) _readableSemaphore.Release(); return JSValue.Undefined; })); + value.CallMethod("on", "end", JSValue.CreateFunction("onend", (args) => + { + _readableSemaphore.Release(); + return JSValue.Undefined; + })); } - if (CanWrite) + if (canWrite) { _drainSemaphore = new SemaphoreSlim(0); value.CallMethod("on", "drain", JSValue.CreateFunction("ondrain", (args) => @@ -132,20 +145,31 @@ public async ValueTask ReadAsync( int count = buffer.Length; JSValue value = Value; JSValue result = value.CallMethod("read", count); - if (result.IsNull()) + while (result.IsNull()) { if ((bool)value.GetProperty("readableEnded")) { + // End of stream. return 0; } else { - // No data is currently available. Wait for the next "readable" event, which will be - // raised either when data becomes available or the end of the stream is reached. - await _readableSemaphore!.WaitAsync(cancellation); - ThrowIfError(); + int readableLength = (int)value.GetProperty("readableLength"); + if (readableLength > 0) + { + // There are some bytes available, but fewer than the requested count. + count = Math.Min(readableLength, buffer.Length); + buffer = buffer.Slice(0, count); + } + else + { + // No data is currently available. Wait for the next "readable" event, which will be + // raised either when data becomes available or the end of the stream is reached. + await _readableSemaphore!.WaitAsync(cancellation); + ThrowIfError(); + value = Value; + } - value = Value; result = value.CallMethod("read", count); } } diff --git a/src/NodeApi/Interop/NodeWorker.cs b/src/NodeApi/Interop/NodeWorker.cs new file mode 100644 index 00000000..dfa4d0fd --- /dev/null +++ b/src/NodeApi/Interop/NodeWorker.cs @@ -0,0 +1,510 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT License. + +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Runtime.InteropServices; +using System.Threading.Tasks; + +namespace Microsoft.JavaScript.NodeApi.Interop; + +/// +/// A Node.js worker thread, as documented at https://nodejs.org/api/worker_threads.html. +/// +/// +/// Static members of this class enable a worker thread to access its current context, while +/// instance members enable a parent (or main) thread to manage a specific child worker thread. +/// +public class NodeWorker +{ + private readonly JSReference _workerRef; + + // Note the Import() function caches a reference to the imported module. + private static JSValue WorkerModule => JSRuntimeContext.Current.Import("node:worker_threads"); + + /// + /// Creates a new instance of that runs a Node.js script in a + /// separate worker thread. + /// + /// Path to the script file to run in the worker. Or if + /// is true the string is a script to be directly + /// evaluated. + /// Worker options. + public NodeWorker(string workerScript, NodeWorker.Options options) + { + if (string.IsNullOrEmpty(workerScript)) + { + throw new ArgumentNullException(nameof(workerScript)); + } + if (options == null) + { + throw new ArgumentNullException(nameof(options)); + } + + JSValue workerClass = WorkerModule["Worker"]; + JSValue worker = workerClass.CallAsConstructor(workerScript, options.ToJS()); + RegisterEventHandlers(worker); + _workerRef = new JSReference(worker); + } + + /// + /// Creates a new instance of that runs a callback delegate + /// in a Node.js worker thread. + /// + /// Callback delegate to be invoked on the worker thread. + /// (The callback may then invoke script and interop with the worker JS context.) + /// Worker options. + public NodeWorker(Action workerCallback, NodeWorker.Options options) + { + if (workerCallback == null) + { + throw new ArgumentNullException(nameof(workerCallback)); + } + if (options == null) + { + throw new ArgumentNullException(nameof(options)); + } + + // It is not possible to pass a JS Function value to a worker. Instead, this saves the + // callback via a GC handle, then passes the GC handle integer to the worker. + // Do not use JSRuntimeContext.AllocGCHandle() here, because the handle will be freed from + // another runtime context (the worker thread). + nint callbackHandle = (nint)GCHandle.Alloc(workerCallback); + string workerScript = @$"require('node-api-dotnet').runWorker({callbackHandle}n);"; + + JSValue workerClass = WorkerModule["Worker"]; + JSValue worker = workerClass.CallAsConstructor( + workerScript, options.ToJS(overrideEval: true)); + RegisterEventHandlers(worker); + _workerRef = new JSReference(worker); + + // TODO: This isn't fullly implemented yet: the require('node-api-dotnet') in the worker + // script currently fails. We'll need a way to override the require() function in the + // worker context. + throw new NotImplementedException(); + } + + private void RegisterEventHandlers(JSValue worker) + { + JSValue onMethod = worker["on"]; + onMethod.Call(worker, "online", new JSFunction(() => + { + Online?.Invoke(this, EventArgs.Empty); + })); + onMethod.Call(worker, "message", new JSFunction((JSValue message) => + { + Message?.Invoke(this, new MessageEventArgs(message)); + })); + onMethod.Call(worker, "messageerror", new JSFunction((JSValue error) => + { + MessageError?.Invoke(this, new ErrorEventArgs(new JSError(error))); + })); + onMethod.Call(worker, "error", new JSFunction((JSValue error) => + { + Error?.Invoke(this, new ErrorEventArgs(new JSError(error))); + })); + onMethod.Call(worker, "exit", new JSFunction((JSValue exitCode) => + { + Exit?.Invoke(this, new ExitEventArgs((int)exitCode)); + })); + } + + /// + /// Gets a value indicating whether the current code is running on the main JS thread; + /// if false it is a worker thread. + /// + /// No JS scope was established for the current + /// thread. + public static bool IsMainThread => (bool)WorkerModule["isMainThread"]; + + /// + /// Gets an integer identifier for the current JS thread. On the corresponding worker object + /// (if there is any), it is available as . This value is unique for each + /// instance inside a single process. + /// + /// No JS scope was established for the current + /// thread. + public static int CurrentThreadId => (int)WorkerModule["threadId"]; + + /// + /// An arbitrary JavaScript value that contains a clone of the data passed to this thread's + /// Worker constructor. + /// + /// No JS scope was established for the current + /// thread. + public static JSValue CurrentWorkerData => WorkerModule["workerData"]; + + /// + /// An integer identifier for the referenced thread. Inside the worker thread, it is available + /// as . This value is unique for each Worker + /// instance inside a single process. + /// + public int ThreadId => (int)_workerRef.Run((worker) => worker["threadId"]); + + /// + /// If : true was passed to the Worker constructor, this is a writable + /// stream. The data written to this stream will be made available in the worker thread as + /// . + /// + public Stream? Stdin => _workerRef.Run((worker) => + { + JSValue stream = worker["stdin"]; + return stream.IsNullOrUndefined() ? null : (NodeStream)stream; + }); + + /// + /// A readable stream which contains data written to inside + /// the worker thread. If : true was not passed to the Worker constructor, + /// then data is piped to the parent thread's stream. + /// + public Stream Stdout => _workerRef.Run((worker) => (NodeStream)worker["stdout"]); + + /// + /// A readable stream which contains data written to inside + /// the worker thread. If : true was not passed to the Worker constructor, + /// then data is piped to the parent thread's stream. + /// + public Stream Stderr => _workerRef.Run((worker) => (NodeStream)worker["stderr"]); + + /// + /// Within a worker thread, returns a clone of data passed to the spawning thread's + /// . Every new Worker receives its own copy of the environment + /// data automatically. + /// + public static JSValue GetEnvironmentData(JSValue key) + => WorkerModule.CallMethod("getEnvironmentData", key); + + /// + /// Sets the environment data in the current thread and all new Worker instances spawned from + /// the current context. + /// + public static void SetEnvironmentData(JSValue key, JSValue value) + => WorkerModule.CallMethod("setEnvironmentData", key, value); + + /// + /// Opposite of . Calling ref() on a previously unref()ed worker does not + /// let the program exit if it's the only active handle left (the default behavior). If the + /// worker is ref()ed, calling ref() again has no effect. + /// + public void Ref() + { + _workerRef.Run((worker) => worker.CallMethod("ref")); + } + + /// + /// Allows the thread to exit if this is the only active handle in the event system. If the + /// worker is already unref()ed calling unref() again has no effect. + /// + public void Unref() + { + _workerRef.Run((worker) => worker.CallMethod("unref")); + } + + /// + /// Stops all JavaScript execution in the worker thread as soon as possible. Returns a Promise + /// for the exit code that is fulfilled when the 'exit' event is emitted. + /// + public Task Terminate() + { + return _workerRef.Run((worker) => + { + JSPromise exitPromise = (JSPromise)worker.CallMethod("terminate"); + return exitPromise.AsTask((exitCode) => (int?)exitCode ?? null); + }); + } + + /// + /// Send a message to the worker that is received via the + /// event. + /// + /// + public void PostMessage(JSValue value) + { + _workerRef.Run((worker) => worker.CallMethod("postMessage", value)); + } + + /// + /// If this thread is a Worker, this is a allowing communication with + /// the parent thread. Messages posted via the parent port + /// are available in the parent thread via the worker's event, and + /// messages sent from the parent thread using the worker's are + /// available in this thread via the parent port's event. + /// + public static MessagePort? ParentPort + { + get + { + JSValue parentPort = WorkerModule["parentPort"]; + return parentPort.IsUndefined() ? null : new MessagePort(parentPort); + } + } + + /// + /// Represents one end of an asynchronous, two-way communications channel. It can be used to + /// transfer structured data, memory regions, and other MessagePorts between different Workers. + /// + public class MessagePort + { + private readonly JSReference _portRef; + + internal MessagePort(JSValue port) + { + RegisterEventHandlers(port); + _portRef = new JSReference(port); + } + + private void RegisterEventHandlers(JSValue port) + { + JSValue onMethod = port["on"]; + onMethod.Call(port, "message", new JSFunction((JSValue message) => + { + Message?.Invoke(this, new MessageEventArgs(message)); + })); + onMethod.Call(port, "messageerror", new JSFunction((JSValue error) => + { + MessageError?.Invoke(this, new ErrorEventArgs(new JSError(error))); + })); + onMethod.Call(port, "close", new JSFunction((JSValue error) => + { + Closed?.Invoke(this, new EventArgs()); + })); + } + + public static (MessagePort Port1, MessagePort Port2) CreateChannel() + { + JSValue channel = WorkerModule["MessageChannel"].CallAsConstructor(); + return (new MessagePort(channel["port1"]), new MessagePort(channel["port2"])); + } + + public void PostMessage(JSValue value) + { + _portRef.Run((port) => port.CallMethod("postMessage", value)); + } + + public event EventHandler? Message; + public event EventHandler? MessageError; + public event EventHandler? Closed; + + public void Ref() + { + _portRef.Run((port) => port.CallMethod("ref")); + } + + /// + /// Allows the thread to exit if this is the only active handle in the event system. If the + /// worker is already unref()ed calling unref() again has no effect. + /// + public void Unref() + { + _portRef.Run((port) => port.CallMethod("unref")); + } + + /// + /// Disables further sending of messages on either side of the connection. This method can + /// be called when no further communication will happen over this MessagePort. + /// The event is emitted on both MessagePort instances that are part + /// of the channel. + /// + public void Close() + { + _portRef.Run((port) => port.CallMethod("close")); + } + } + + /// + /// Emitted when the worker thread has started executing JavaScript code. + /// + public event EventHandler? Online; + + /// + /// Emitted when the worker thread has sent a message via its + /// method. + /// + /// + /// All messages sent from the worker thread are emitted before the event is + /// emitted on the Worker object. + /// + public event EventHandler? Message; + + /// + /// Emitted when deserializing a message failed. + /// + public event EventHandler? MessageError; + + /// + /// Emitted if the worker thread throws an uncaught exception. In that case, the worker is + /// terminated. + /// + public event EventHandler? Error; + + /// + /// Emitted once the worker has stopped. If the worker exited by calling + /// , the parameter is the + /// passed exit code. If the worker was terminated, the + /// parameter is 1. + /// + public event EventHandler? Exit; + + public class MessageEventArgs : EventArgs + { + public MessageEventArgs(JSValue value) => Value = value; + public JSValue Value { get; } + } + + public class ErrorEventArgs : EventArgs + { + public ErrorEventArgs(JSError error) => Error = error; + public JSError Error { get; } + } + + public class ExitEventArgs : EventArgs + { + public ExitEventArgs(int exitCode) => ExitCode = exitCode; + public int ExitCode { get; } + } + + /// + /// Options for configuring a . + /// + public class Options + { + /// + /// If true, interpret the first argument to the constructor as a script that is executed + /// once the worker is online. Otherwise the first argument to the constructor must be + /// a file path to the script. + /// + public bool Eval { get; init; } + + /// + /// An optional name to be appended to the worker title for debugging/identification purposes. + /// + public string? Name { get; set; } + + /// + /// List of arguments which would be stringified and appended to + /// in the worker. This is mostly similar to the + /// but the values are available on the global + /// as if they were passed as CLI options to the script. + /// + public string[]? Argv { get; set; } + + /// + /// List of node CLI options passed to the worker. V8 options and options that affect the + /// process are not supported. If set, this is provided as process.execArgv inside the + /// worker. By default, options are inherited from the parent thread. + /// + public string[]? ExecArgv { get; set; } + + /// + /// Any JavaScript value that is cloned and made available as + /// . The cloning occurs as described in the HTML structured + /// clone algorithm, and an error is thrown if the object cannot be cloned (e.g. because it + /// contains functions). + /// + public JSValue? WorkerData { get; set; } + + /// + /// If set, specifies the initial value of inside the + /// Worker thread. Default: parent thread env. + /// + /// + /// Not valid if is true. + /// + public IDictionary? Env { get; set; } + + /// + /// Specifies that the parent thread and the child thread should share their environment + /// variables; in that case, changes to one thread's object + /// affect the other thread as well. + /// + /// + /// must be null if this option is set. + /// + public bool ShareEnv { get; set; } + + /// + /// If this is set to true, then provides a writable stream + /// whose contents appear as inside the Worker. By default, + /// no data is provided. + /// + public bool Stdin { get; set; } + + /// + /// If this is set to true, then is not automatically + /// piped through to in the parent. + /// + public bool Stdout { get; set; } + + /// + /// If this is set to true, then is not automatically + /// piped through to in the parent. + /// + public bool Stderr { get; set; } + + /// + /// If this is set to true, then the Worker tracks raw file descriptors managed through + /// fs.open() and fs.close(), and closes them when the Worker exits, similar + /// to other resources like network sockets or file descriptors managed through the + /// FileHandle API. This option is automatically inherited by all nested Workers. + /// Default: true. + /// + public bool TrackUnmanagedFds { get; set; } = true; + + /// + /// An optional set of resource limits for the new JS engine instance. Reaching these limits + /// leads to termination of the Worker instance. These limits only affect the JS engine, + /// and no external data, including no ArrayBuffers. Even if these limits are set, the + /// process may still abort if it encounters a global out-of-memory situation. + /// + public NodeWorker.ResourceLimits? ResourceLimits { get; set; } + + internal JSObject ToJS(bool? overrideEval = null) + { + return new JSObject + { + ["eval"] = overrideEval ?? Eval, + ["name"] = Name ?? JSValue.Undefined, + ["argv"] = Argv != null ? + new JSArray(Argv.Select((a) => (JSValue)a).ToArray()) : + JSValue.Undefined, + ["execArgv"] = ExecArgv != null ? + new JSArray(ExecArgv.Select((a) => (JSValue)a).ToArray()) : + JSValue.Undefined, + ["workerData"] = WorkerData ?? JSValue.Undefined, + ["env"] = ShareEnv ? WorkerModule["SHARE_ENV"] : Env != null ? + new JSObject(Env.Select( + kv => new KeyValuePair(kv.Key, kv.Value))) : + JSValue.Undefined, + ["stdin"] = Stdin, + ["stdout"] = Stdout, + ["stderr"] = Stderr, + ["trackUnmanagedFds"] = TrackUnmanagedFds, + ["resourceLimits"] = ResourceLimits?.ToJS() ?? JSValue.Undefined, + }; + } + } + + /// + /// Resource limits for a . + /// + public class ResourceLimits + { + public uint? MaxOldGenerationSizeMb { get; set; } + public uint? MaxYoungGenerationSizeMb { get; set; } + public uint? CodeRangeSizeMb { get; set; } + public uint? StackSizeMb { get; set; } + + internal JSObject ToJS() + { + return new JSObject + { + ["maxOldGenerationSizeMb"] = MaxOldGenerationSizeMb ?? JSValue.Undefined, + ["maxYoungGenerationSizeMb"] = MaxYoungGenerationSizeMb ?? JSValue.Undefined, + ["codeRangeSizeMb"] = CodeRangeSizeMb ?? JSValue.Undefined, + ["stackSizeMb"] = StackSizeMb ?? JSValue.Undefined, + }; + } + } +} diff --git a/src/NodeApi/Runtime/NodejsEnvironment.cs b/src/NodeApi/Runtime/NodejsEnvironment.cs index 1debc2fb..07c3b8f6 100644 --- a/src/NodeApi/Runtime/NodejsEnvironment.cs +++ b/src/NodeApi/Runtime/NodejsEnvironment.cs @@ -90,6 +90,7 @@ private static void InitializeModuleImportFunctions( JSReference originalRequireRef = new(originalRequire); JSFunction envRequire = new("require", (modulePath) => { + Debug.WriteLine($"require('{(string)modulePath}')"); JSValue require = originalRequireRef.GetValue(); JSValue resolvedPath = ResolveModulePath(require, modulePath, baseDir); return require.Call(thisArg: default, resolvedPath); diff --git a/test/NodejsEmbeddingTests.cs b/test/NodejsEmbeddingTests.cs index ae8fd481..f1c452ce 100644 --- a/test/NodejsEmbeddingTests.cs +++ b/test/NodejsEmbeddingTests.cs @@ -1,12 +1,16 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT License. +#pragma warning disable CA1822 // Mark members as static + using System; using System.IO; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Tasks; using Microsoft.JavaScript.NodeApi.DotNetHost; +using Microsoft.JavaScript.NodeApi.Interop; using Microsoft.JavaScript.NodeApi.Runtime; using Xunit; using static Microsoft.JavaScript.NodeApi.Test.TestUtils; @@ -235,6 +239,154 @@ public void ErrorPropagation() (line) => line.StartsWith($"at {typeof(NodejsEmbeddingTests).FullName}.")); } + [SkippableFact] + public async Task WorkerIsMainThread() + { + await TestWorker( + mainPrepare: () => + { + Assert.True(NodeWorker.IsMainThread); + return new NodeWorker.Options { Eval = true }; + }, + workerScript: @" +const assert = require('node:assert'); +const { isMainThread } = require('node:worker_threads'); +assert(!isMainThread); +", + mainRun: (worker) => Task.CompletedTask); + } + + [SkippableFact] + public async Task WorkerArgs() + { + await TestWorker( + mainPrepare: () => + { + return new NodeWorker.Options + { + Eval = true, +#pragma warning disable CA1861 // Prefer 'static readonly' fields over constant array arguments + Argv = new[] { "test1", "test2" }, +#pragma warning restore CA1861 + WorkerData = true, + }; + }, + workerScript: @" +const assert = require('node:assert'); +const process = require('node:process'); +const { workerData } = require('node:worker_threads'); +assert.deepStrictEqual(process.argv.slice(2), ['test1', 'test2']); +assert.strictEqual(typeof workerData, 'boolean'); +assert(workerData); +", + mainRun: (worker) => Task.CompletedTask); + } + + [SkippableFact] + public async Task WorkerEnv() + { + await TestWorker( + mainPrepare: () => + { + NodeWorker.SetEnvironmentData("test", JSValue.True); + return new NodeWorker.Options + { + Eval = true, + }; + }, + workerScript: @" +const assert = require('node:assert'); +const { getEnvironmentData } = require('node:worker_threads'); +assert.strictEqual(getEnvironmentData('test'), true); +", + mainRun: (worker) => Task.CompletedTask); + } + + [SkippableFact] + public async Task WorkerMessages() + { + await TestWorker( + mainPrepare: () => + { + return new NodeWorker.Options { Eval = true }; + }, + workerScript: @" +const { parentPort } = require('node:worker_threads'); +parentPort.on('message', (msg) => parentPort.postMessage(msg)); // echo +", + mainRun: async (worker) => + { + TaskCompletionSource echoCompletion = new(); + worker.Message += (_, e) => echoCompletion.TrySetResult((string)e.Value); + worker.Error += (_, e) => echoCompletion.TrySetException( + new JSException(e.Error)); + worker.Exit += (_, e) => echoCompletion.TrySetException( + new InvalidOperationException("Worker exited without echoing!")); + worker.PostMessage("test"); + string echo = await echoCompletion.Task; + Assert.Equal("test", echo); + }); + } + + [SkippableFact] + public async Task WorkerStdinStdout() + { + await TestWorker( + mainPrepare: () => + { + return new NodeWorker.Options + { + Eval = true, + Stdin = true, + Stdout = true, + }; + }, + workerScript: @"process.stdin.pipe(process.stdout)", + mainRun: async (worker) => + { + TaskCompletionSource echoCompletion = new(); + worker.Error += (_, e) => echoCompletion.TrySetException( + new JSException(e.Error)); + worker.Exit += (_, e) => echoCompletion.TrySetException( + new InvalidOperationException("Worker exited without echoing!")); + Assert.NotNull(worker.Stdin); + await worker.Stdin.WriteAsync(Encoding.ASCII.GetBytes("test\n"), 0, 5); + byte[] buffer = new byte[25]; + int count = await worker.Stdout.ReadAsync(buffer, 0, buffer.Length); + Assert.Equal("test\n", Encoding.ASCII.GetString(buffer, 0, count)); + }); + } + + private static async Task TestWorker( + Func mainPrepare, + string workerScript, + Func mainRun) + { + using NodejsEnvironment nodejs = CreateNodejsEnvironment(); + await nodejs.RunAsync(async () => + { + NodeWorker.Options workerOptions = mainPrepare.Invoke(); + NodeWorker worker = new(workerScript, workerOptions); + + TaskCompletionSource onlineCompletion = new(); + worker.Online += (sender, e) => onlineCompletion.SetResult(true); + TaskCompletionSource exitCompletion = new(); + worker.Error += (sender, e) => exitCompletion.SetException(new JSException(e.Error)); + worker.Exit += (sender, e) => exitCompletion.TrySetResult(e.ExitCode); + + await onlineCompletion.Task; + try + { + await mainRun.Invoke(worker); + } + finally + { + await worker.Terminate(); + } + await exitCompletion.Task; + }); + } + /// /// Tests the functionality of dynamically exporting and marshalling a class type from .NET /// to JS (as opposed to relying on [JSExport] (compile-time code-generation) for marshalling.