Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handling null in Transfer method #5

Merged
merged 7 commits into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,4 +42,4 @@ jobs:
if: ${{ inputs.publish }}
run: |
dotnet pack -c ${{ env.BUILD_CONFIG }} --no-build
dotnet nuget push build_output/packages/*.nupkg -k ${{ secrets.NUGET_API_KEY }} -s https://api.nuget.org/v3/index.json
dotnet nuget push build_output/packages/*.nupkg -k ${{ secrets.NUGETTEST_API_KEY }} -s https://apiint.nugettest.org/v3/index.json
2 changes: 1 addition & 1 deletion src/Directory.Build.props
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
<GeneratePackageOnBuild Condition="'$(Configuration)'=='Package'">True</GeneratePackageOnBuild>
<IncludeSymbols>true</IncludeSymbols>

<PackageVersion>1.0.0</PackageVersion>
<PackageVersion>1.0.1-preview.1</PackageVersion>
<Version>$(PackageVersion)</Version>
</PropertyGroup>

Expand Down
104 changes: 83 additions & 21 deletions src/DotNetty.Common/ThreadLocalPool.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public override void Release<T>(T value)
{
throw new InvalidOperationException("recycled already");
}

stack.Push(this);
}
}
Expand Down Expand Up @@ -107,6 +108,7 @@ internal Head(StrongBox<int> availableSharedCapacity, StrongBox<int> weakTableCo
{
Interlocked.Decrement(ref this.weakTableCounter.Value);
}

if (this.availableSharedCapacity == null)
{
return;
Expand Down Expand Up @@ -138,13 +140,14 @@ internal bool ReserveSpace(int space)
internal static bool ReserveSpace(StrongBox<int> availableSharedCapacity, int space)
{
Debug.Assert(space >= 0);
for (; ; )
for (;;)
{
int available = Volatile.Read(ref availableSharedCapacity.Value);
if (available < space)
{
return false;
}

if (Interlocked.CompareExchange(ref availableSharedCapacity.Value, available - space, available) == available)
{
return true;
Expand All @@ -155,7 +158,9 @@ internal static bool ReserveSpace(StrongBox<int> availableSharedCapacity, int sp

// chain of data items
readonly Head head;

Link tail;

// pointer to another queue of delayed items for the same stack
internal WeakOrderQueue next;
internal readonly WeakReference<Thread> owner;
Expand Down Expand Up @@ -220,10 +225,12 @@ internal void Add(DefaultHandle handle)
// Drop it.
return;
}

// We allocate a Link so reserve the space
this.tail = tail = tail.next = new Link();
writeIndex = tail.WriteIndex;
}

tail.elements[writeIndex] = handle;
handle.Stack = null;
// we lazy set to ensure that setting stack to null appears before we unnull it in the owning thread;
Expand All @@ -236,7 +243,7 @@ internal void Add(DefaultHandle handle)
// transfer as many items as we can from this queue to the stack, returning true if any were transferred
internal bool Transfer(Stack dst)
{
Link head = this.head.link;
Link head = this.head?.link;
if (head == null)
{
return false;
Expand All @@ -248,6 +255,7 @@ internal bool Transfer(Stack dst)
{
return false;
}

this.head.link = head = head.next;
}

Expand All @@ -259,6 +267,11 @@ internal bool Transfer(Stack dst)
return false;
}

if (dst?.elements == null)
{
return false;
}

int dstSize = dst.size;
int expectedCapacity = dstSize + srcSize;

Expand All @@ -273,9 +286,19 @@ internal bool Transfer(Stack dst)
DefaultHandle[] srcElems = head.elements;
DefaultHandle[] dstElems = dst.elements;
int newDstSize = dstSize;
if (head.elements == null)
{
return false;
}

for (int i = srcStart; i < srcEnd; i++)
{
DefaultHandle element = srcElems[i];
if (element == null)
{
return false;
}

Comment on lines +297 to +301
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: shouldn't those elements be just ignored, instead of stopping everything

if (element.recycleId == 0)
{
element.recycleId = element.lastRecycledId;
Expand All @@ -284,13 +307,15 @@ internal bool Transfer(Stack dst)
{
throw new InvalidOperationException("recycled already");
}

srcElems[i] = null;

if (dst.DropHandle(element))
{
// Drop the object.
continue;
}

element.Stack = dst;
dstElems[newDstSize++] = element;
}
Expand All @@ -307,6 +332,7 @@ internal bool Transfer(Stack dst)
{
return false;
}

dst.size = newDstSize;
return true;
}
Expand Down Expand Up @@ -344,8 +370,13 @@ protected sealed class Stack
WeakOrderQueue cursorQueue, prevQueue;
volatile WeakOrderQueue headQueue;

internal Stack(ThreadLocalPool parent, Thread thread, int maxCapacity, int maxSharedCapacityFactor,
int ratioMask, int maxDelayedQueues)
internal Stack(
ThreadLocalPool parent,
Thread thread,
int maxCapacity,
int maxSharedCapacityFactor,
int ratioMask,
int maxDelayedQueues)
{
this.parent = parent;
this.threadRef = new WeakReference<Thread>(thread);
Expand Down Expand Up @@ -410,6 +441,7 @@ void PushNow(DefaultHandle item)
{
throw new InvalidOperationException("released already");
}

item.recycleId = item.lastRecycledId = ownThreadId;

int size = this.size;
Expand All @@ -418,6 +450,7 @@ void PushNow(DefaultHandle item)
// Hit the maximum capacity - drop the possibly youngest object.
return;
}

if (size == this.elements.Length)
{
Array.Resize(ref this.elements, Math.Min(size << 1, this.maxCapacity));
Expand All @@ -443,12 +476,14 @@ void PushLater(DefaultHandle item, Thread thread)
delayedRecycled.Add(this, WeakOrderQueue.Dummy);
return;
}

// Check if we already reached the maximum number of delayed queues and if we can allocate at all.
if ((queue = WeakOrderQueue.Allocate(this, thread, countedWeakTable)) == null)
{
// drop object
return;
}

delayedRecycled.Add(this, queue);
}
else if (queue == WeakOrderQueue.Dummy)
Expand All @@ -469,8 +504,10 @@ internal bool DropHandle(DefaultHandle handle)
// Drop the object.
return true;
}

handle.hasBeenRecycled = true;
}

return false;
}

Expand All @@ -486,15 +523,18 @@ internal bool TryPop(out DefaultHandle item)
item = null;
return false;
}

size = this.size;
}

size--;
DefaultHandle ret = this.elements[size];
elements[size] = null;
if (ret.lastRecycledId != ret.recycleId)
{
throw new InvalidOperationException("recycled multiple times");
}

ret.recycleId = 0;
ret.lastRecycledId = 0;
this.size = size;
Expand Down Expand Up @@ -564,6 +604,7 @@ bool ScavengeSome()
}
}
}

if (prev != null)
{
prev.Next = next;
Expand Down Expand Up @@ -604,6 +645,7 @@ public class CountedWeakTable

internal readonly StrongBox<int> Counter = new StrongBox<int>();
}

protected override CountedWeakTable GetInitialValue() => new CountedWeakTable();
}

Expand All @@ -612,26 +654,31 @@ static ThreadLocalPool()
// In the future, we might have different maxCapacity for different object types.
// e.g. io.netty.recycler.maxCapacity.writeTask
// io.netty.recycler.maxCapacity.outboundBuffer
int maxCapacityPerThread = SystemPropertyUtil.GetInt("io.netty.recycler.maxCapacityPerThread",
SystemPropertyUtil.GetInt("io.netty.recycler.maxCapacity", DefaultInitialMaxCapacityPerThread));
int maxCapacityPerThread = SystemPropertyUtil.GetInt(
"io.netty.recycler.maxCapacityPerThread",
SystemPropertyUtil.GetInt("io.netty.recycler.maxCapacity", DefaultInitialMaxCapacityPerThread));
if (maxCapacityPerThread < 0)
{
maxCapacityPerThread = DefaultInitialMaxCapacityPerThread;
}

DefaultMaxCapacityPerThread = maxCapacityPerThread;

DefaultMaxSharedCapacityFactor = Math.Max(2,
SystemPropertyUtil.GetInt("io.netty.recycler.maxSharedCapacityFactor",
2));
DefaultMaxSharedCapacityFactor = Math.Max(
2,
SystemPropertyUtil.GetInt(
"io.netty.recycler.maxSharedCapacityFactor",
2));

DefaultMaxDelayedQueuesPerThread = Math.Max(0,
SystemPropertyUtil.GetInt("io.netty.recycler.maxDelayedQueuesPerThread",
// We use the same value as default EventLoop number
Environment.ProcessorCount * 2));
DefaultMaxDelayedQueuesPerThread = Math.Max(
0,
SystemPropertyUtil.GetInt(
"io.netty.recycler.maxDelayedQueuesPerThread",
// We use the same value as default EventLoop number
Environment.ProcessorCount * 2));

LinkCapacity = MathUtil.SafeFindNextPositivePowerOfTwo(
Math.Max(SystemPropertyUtil.GetInt("io.netty.recycler.linkCapacity", 16), 16));
Math.Max(SystemPropertyUtil.GetInt("io.netty.recycler.linkCapacity", 16), 16));

// By default we allow one push to a Recycler for each 8th try on handles that were never recycled before.
// This should help to slowly increase the capacity of the recycler while not be too sensitive to allocation
Expand Down Expand Up @@ -663,12 +710,15 @@ static ThreadLocalPool()
}

public ThreadLocalPool(int maxCapacityPerThread)
: this (maxCapacityPerThread, DefaultMaxSharedCapacityFactor, DefaultRatio, DefaultMaxDelayedQueuesPerThread)
: this(maxCapacityPerThread, DefaultMaxSharedCapacityFactor, DefaultRatio, DefaultMaxDelayedQueuesPerThread)
{
}

public ThreadLocalPool(int maxCapacityPerThread, int maxSharedCapacityFactor,
int ratio, int maxDelayedQueuesPerThread)
public ThreadLocalPool(
int maxCapacityPerThread,
int maxSharedCapacityFactor,
int ratio,
int maxDelayedQueuesPerThread)
{
this.ratioMask = MathUtil.SafeFindNextPositivePowerOfTwo(ratio) - 1;
if (maxCapacityPerThread <= 0)
Expand Down Expand Up @@ -718,8 +768,13 @@ public ThreadLocalPool(Func<Handle, T> valueFactory, int maxCapacityPerThread, i
{
}

public ThreadLocalPool(Func<Handle, T> valueFactory, int maxCapacityPerThread, int maxSharedCapacityFactor,
int ratio, int maxDelayedQueuesPerThread, bool preCreate = false)
public ThreadLocalPool(
Func<Handle, T> valueFactory,
int maxCapacityPerThread,
int maxSharedCapacityFactor,
int ratio,
int maxDelayedQueuesPerThread,
bool preCreate = false)
: base(maxCapacityPerThread, maxSharedCapacityFactor, ratio, maxDelayedQueuesPerThread)
{
Contract.Requires(valueFactory != null);
Expand All @@ -743,6 +798,7 @@ public T Take()
{
handle = CreateValue(stack);
}

return (T)handle.Value;
}

Expand All @@ -768,15 +824,21 @@ public ThreadLocalStack(ThreadLocalPool<T> owner)

protected override Stack GetInitialValue()
{
var stack = new Stack(this.owner, Thread.CurrentThread, this.owner.maxCapacityPerThread,
this.owner.maxSharedCapacityFactor, this.owner.ratioMask, this.owner.maxDelayedQueuesPerThread);
var stack = new Stack(
this.owner,
Thread.CurrentThread,
this.owner.maxCapacityPerThread,
this.owner.maxSharedCapacityFactor,
this.owner.ratioMask,
this.owner.maxDelayedQueuesPerThread);
if (this.owner.preCreate)
{
for (int i = 0; i < this.owner.maxCapacityPerThread; i++)
{
stack.Push(this.owner.CreateValue(stack));
}
}

return stack;
}

Expand Down