Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ConcurrentDictionary lookups from Unix socket event loop #109052

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions src/libraries/System.Net.Sockets/src/Resources/Strings.resx
Original file line number Diff line number Diff line change
Expand Up @@ -312,9 +312,6 @@
<data name="SystemNetSockets_PlatformNotSupported" xml:space="preserve">
<value>System.Net.Sockets is not supported on this platform.</value>
</data>
<data name="net_sockets_handle_already_used" xml:space="preserve">
<value>Handle is already used by another Socket.</value>
</data>
<data name="net_sockets_address_small" xml:space="preserve">
<value>Provided SocketAddress is too small for given AddressFamily.</value>
</data>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1262,6 +1262,8 @@ public void Trace(SocketAsyncContext context, string message, [CallerMemberName]
private SocketAsyncEngine? _asyncEngine;
private bool IsRegistered => _asyncEngine != null;
private bool _isHandleNonBlocking = OperatingSystem.IsWasi(); // WASI sockets are always non-blocking, because we don't have another thread which could be blocked
/// <summary>Handle associated with a <see cref="SocketAsyncEngine"/> while <see cref="IsRegistered"/>.</summary>
internal GCHandle GCHandle;

private readonly object _registerLock = new object();

Expand Down Expand Up @@ -1330,7 +1332,10 @@ public bool StopAndAbort()
// We don't need to synchronize with Register.
// This method is called when the handle gets released.
// The Register method will throw ODE when it tries to use the handle at this point.
_asyncEngine?.UnregisterSocket(_socket.DangerousGetHandle(), this);
if (IsRegistered)
{
SocketAsyncEngine.UnregisterSocket(this);
}

return aborted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,14 +74,20 @@ private static SocketAsyncEngine[] CreateEngines()
return engines;
}

/// <summary>
/// Each <see cref="SocketAsyncContext"/> is assigned a <see cref="GCHandle"/> while registered with a <see cref="SocketAsyncEngine"/>.
/// <para>The handle is used as the <see cref="Interop.Sys.SocketEvent.Data"/> to quickly map events to <see cref="SocketAsyncContext"/>s.</para>
/// <para>When a socket is disposed and unregistered, there is a small race condition where we may still receive events with the
/// now-unused <see cref="GCHandle"/>. Since we assume that the handle is always allocated in <see cref="SocketEventHandler.HandleSocketEvents"/>,
/// we can never <see cref="GCHandle.Free"/> these handles.</para>
/// <para>Instead, we reuse handles for new sockets. The race condition may therefore trigger a notification for the wrong socket,
/// but that is okay as those operations can be retried.</para>
/// </summary>
private static readonly ConcurrentQueue<GCHandle> s_gcHandles = new();
Copy link
Member

@stephentoub stephentoub Oct 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this pool really necessary?

#95098

Copy link
Member Author

@MihaZupan MihaZupan Oct 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not a typical pool (correctness more-so than perf). As the comment above describes, we're never freeing these handles since their lifetime is tricky.
This pool is here to avoid completely leaking a handle per socket, instead capping their number at whatever peak load the process reaches. That seems fine given that even with the dictionary, we wouldn't ever resize it back down.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The dictionary has only a small overhead of memory per slot. What is the impact of never releasing a GCHandle?

Copy link
Member Author

@MihaZupan MihaZupan Oct 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

About 8 bytes * peak socket count.
I'm guessing the more meaningful impact would be on the GC throughput if you really have a ton of these laying around.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


private readonly IntPtr _port;
private readonly Interop.Sys.SocketEvent* _buffer;

//
// Maps handle values to SocketAsyncContext instances.
//
private readonly ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> _handleToContextMap = new ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper>();

//
// Queue of events generated by EventLoop() that would be processed by the thread pool
//
Expand Down Expand Up @@ -119,28 +125,39 @@ public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext con

private bool TryRegisterCore(IntPtr socketHandle, SocketAsyncContext context, out Interop.Error error)
{
bool added = _handleToContextMap.TryAdd(socketHandle, new SocketAsyncContextWrapper(context));
if (!added)
Debug.Assert(!context.GCHandle.IsAllocated);

if (s_gcHandles.TryDequeue(out context.GCHandle))
{
context.GCHandle.Target = context;
}
else
{
// Using public SafeSocketHandle(IntPtr) a user can add the same handle
// from a different Socket instance.
throw new InvalidOperationException(SR.net_sockets_handle_already_used);
context.GCHandle = GCHandle.Alloc(context, GCHandleType.Normal);
}

error = Interop.Sys.TryChangeSocketEventRegistration(_port, socketHandle, Interop.Sys.SocketEvents.None,
Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, socketHandle);
Interop.Sys.SocketEvents.Read | Interop.Sys.SocketEvents.Write, GCHandle.ToIntPtr(context.GCHandle));
if (error == Interop.Error.SUCCESS)
{
return true;
}

_handleToContextMap.TryRemove(socketHandle, out _);
UnregisterSocket(context);
return false;
}

public void UnregisterSocket(IntPtr socketHandle, SocketAsyncContext __)
public static void UnregisterSocket(SocketAsyncContext context)
{
_handleToContextMap.TryRemove(socketHandle, out _);
Debug.Assert(context.GCHandle.IsAllocated);
Debug.Assert(ReferenceEquals(context.GCHandle.Target, context));

GCHandle handle = context.GCHandle;
context.GCHandle = default;

// See comments on s_gcHandles for why we're reusing the handle instead of freeing it.
handle.Target = null;
s_gcHandles.Enqueue(handle);
}

private SocketAsyncEngine()
Expand Down Expand Up @@ -324,13 +341,11 @@ private readonly struct SocketEventHandler
{
public Interop.Sys.SocketEvent* Buffer { get; }

private readonly ConcurrentDictionary<IntPtr, SocketAsyncContextWrapper> _handleToContextMap;
private readonly ConcurrentQueue<SocketIOEvent> _eventQueue;

public SocketEventHandler(SocketAsyncEngine engine)
{
Buffer = engine._buffer;
_handleToContextMap = engine._handleToContextMap;
_eventQueue = engine._eventQueue;
}

Expand All @@ -340,10 +355,12 @@ public bool HandleSocketEvents(int numEvents)
bool enqueuedEvent = false;
foreach (var socketEvent in new ReadOnlySpan<Interop.Sys.SocketEvent>(Buffer, numEvents))
{
if (_handleToContextMap.TryGetValue(socketEvent.Data, out SocketAsyncContextWrapper contextWrapper))
{
SocketAsyncContext context = contextWrapper.Context;
GCHandle handle = GCHandle.FromIntPtr(socketEvent.Data);
Debug.Assert(handle.IsAllocated);
Debug.Assert(handle.Target is null or SocketAsyncContext);

if (handle.IsAllocated && Unsafe.As<SocketAsyncContext>(handle.Target) is { } context)
{
if (context.PreferInlineCompletions)
{
context.HandleEventsInline(socketEvent.Events);
Expand All @@ -365,18 +382,6 @@ public bool HandleSocketEvents(int numEvents)
}
}

// struct wrapper is used in order to improve the performance of the epoll thread hot path by up to 3% of some TechEmpower benchmarks
// the goal is to have a dedicated generic instantiation and using:
// System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.Net.Sockets.SocketAsyncContextWrapper]::TryGetValueInternal(!0,int32,!1&)
// instead of:
// System.Collections.Concurrent.ConcurrentDictionary`2[System.IntPtr,System.__Canon]::TryGetValueInternal(!0,int32,!1&)
private readonly struct SocketAsyncContextWrapper
{
public SocketAsyncContextWrapper(SocketAsyncContext context) => Context = context;

internal SocketAsyncContext Context { get; }
}

private readonly struct SocketIOEvent
{
public SocketAsyncContext Context { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ public static bool TryRegisterSocket(IntPtr socketHandle, SocketAsyncContext con
return true;
}

#pragma warning disable CA1822
public void UnregisterSocket(IntPtr _, SocketAsyncContext context)
#pragma warning restore CA1822
public static void UnregisterSocket(SocketAsyncContext context)
{
context.unregisterPollHook.Cancel();
}
Expand Down
Loading