Poller.remove socket disposed fix #835

Open
wants to merge 33 commits into
base: master
Changes from 11 commits
3b8dbb9
added NetMQPollerTest.RemoveAndDisposeSocket() as a repro for issue #834
jasells Dec 3, 2019
d0f2725
Task refactor of .Remove(ISocketPollable) passes test!
jasells Dec 4, 2019
53e1833
speed up the test a little
jasells Dec 4, 2019
68429d7
async fix supporting .NET40, broke a test
jasells Dec 4, 2019
dea90f4
un-break the API back to synchronous (but now working)
jasells Dec 8, 2019
6b344ca
rename new test to avoid confusion
jasells Dec 8, 2019
c879804
fixed RemoveAndDispose()
jasells Dec 8, 2019
54c2931
fix other Remove methods
jasells Dec 8, 2019
bc362a9
clean up the Run method for .NET40 compat
jasells Dec 8, 2019
98a0f00
undo changes in RemoveThrowsIfSocketAlreadyDisposed
jasells Dec 8, 2019
a3819e1
fixing spacing complaints
jasells Dec 8, 2019
4f3b1e6
return task from tests
jasells Dec 21, 2019
33816cb
fixing wait() calls
jasells Dec 21, 2019
bda8b3f
use Task.Factory.StartNew
jasells Dec 21, 2019
cb16939
initial refactor works, except for one test.
jasells Feb 18, 2020
544bb94
fix Selector to handle disposed sockets
jasells Feb 18, 2020
87b7649
added comment about previous commit
jasells Feb 18, 2020
92163b1
removed comment
jasells Feb 18, 2020
51457d1
fix region tag formatting
jasells Feb 18, 2020
fce2bb0
sync with master
jasells Feb 29, 2020
38910d8
updated comments
jasells Feb 29, 2020
3bacfb8
clearer names for async socket remove tests
jasells Feb 29, 2020
455f6b0
I believe this should address the net45/net40 Task.FromResult() concern
jasells Feb 29, 2020
52fa20e
ensure tasks are queued on the poller, not the task pool.
jasells Mar 31, 2020
884d4ea
remove the dictionary checks.
jasells Mar 31, 2020
9f17258
fix indentation
jasells Mar 31, 2020
3ea0693
removed comment that needs separate PR
jasells Apr 7, 2020
72ebdbc
added RemoveAsync(Socket ) overload
jasells Apr 24, 2020
c14f9f9
comment update
jasells Apr 24, 2020
0a767c6
added RemoveAsync(timer) overload and test
jasells Apr 24, 2020
fa5e49b
fix deprecated message
jasells Apr 24, 2020
83a34b6
refactored ContainsAsync() methods
jasells Apr 24, 2020
217e4f4
added doc block
jasells Apr 24, 2020
151 changes: 151 additions & 0 deletions src/NetMQ.Tests/NetMQPollerTest.cs
@@ -347,6 +347,8 @@ public void RemoveSocket()
// identity
e.Socket.SkipFrame();

//**Note: bad to assert from worker thread!
// If it fails, the test will crash, not report failure!
Assert.Equal("Hello", e.Socket.ReceiveFrameString(out bool more));
Assert.False(more);

@@ -400,6 +402,150 @@ public void RemoveSocket()
}
}

[Fact]
public async void RemoveAndDisposeSocket()
{
//set up poller, start it
var patient = new NetMQPoller();
patient.RunAsync();

Assert.True(patient.IsRunning);

//create a pub-sub pair
var port = 55667;
var conn = $"tcp://127.0.0.1:{port}";

var pub = new PublisherSocket();
pub.Bind(conn);

var sub = new SubscriberSocket();
sub.Connect(conn);
sub.SubscribeToAnyTopic();

//handle callbacks from poller thread
sub.ReceiveReady += (s, e) =>
{
var msg = e.Socket.ReceiveFrameString();

Debug.WriteLine($"sub has data: {msg}");
};

//add the subscriber socket to poller
patient.Add(sub);

//set up pub on separate thread
var canceller = new CancellationTokenSource();

var pubAction = new Action(async () =>
{
var token = canceller.Token;

uint i = 0;

while (!token.IsCancellationRequested)
{
pub.SendFrame($"Hello-{++i}");

// send ~ 5Hz
await Task.Delay(200);
}
});

var pubThread = Task.Run(pubAction);

//allow a little time to run
await Task.Delay(2000);

//now try to remove the sub from poller
patient.RemoveAndDispose(sub);

Assert.True(sub.IsDisposed);

//allow for poller to continue running
await Task.Delay(2000);

patient.Stop();
Assert.False(patient.IsRunning);

canceller.Cancel();

pub?.Dispose();
patient?.Dispose();
}

[Fact]
public async void DisposeSocketAfterRemoval()
{
//set up poller, start it
var patient = new NetMQPoller();
patient.RunAsync();

Assert.True(patient.IsRunning);

//create a pub-sub pair
var port = 55667;
var conn = $"tcp://127.0.0.1:{port}";

var pub = new PublisherSocket();
pub.Bind(conn);

var sub = new SubscriberSocket();
sub.Connect(conn);
sub.SubscribeToAnyTopic();

//handle callbacks from poller thread
sub.ReceiveReady += (s, e) =>
{
var msg = e.Socket.ReceiveFrameString();

Debug.WriteLine($"sub has data: {msg}");
};

//add the subscriber socket to poller
patient.Add(sub);

//set up pub on separate thread
var canceller = new CancellationTokenSource();

var pubAction = new Action(async () =>
{
var token = canceller.Token;

uint i = 0;

while(!token.IsCancellationRequested)
{
pub.SendFrame($"Hello-{++i}");

// send ~ 5Hz
await Task.Delay(200);
}
});

var pubThread = Task.Run(pubAction);

//allow a little time to run
await Task.Delay(2000);

//now try to remove the sub from poller
patient.Remove(sub);

// Dispose the sub. This causes an exception on the poller's worker thread, and it can't be caught!
sub.Dispose();
sub = null;

//allow for poller to continue running
await Task.Delay(2000);

patient.Stop();
Assert.False(patient.IsRunning);

canceller.Cancel();

pub?.Dispose();
patient?.Dispose();
}

[Fact]
public void AddThrowsIfSocketAlreadyDisposed()
{
@@ -454,6 +600,11 @@ public void DisposeThrowsIfSocketAlreadyDisposed()
// Dispose the socket.
// It is incorrect to have a disposed socket in a poller.
// Disposed sockets can throw into the poller's thread.

//**JASells: And what does that have to do with removing one? We should check for a disposed
// socket on Add, not Remove! Check only internally, to avoid accessing a potentially
// disposed socket; otherwise, removing it from the list is safe, and preferable to
// throwing an exception, since it makes the poller more resilient to unintended misuse!
socket.Dispose();

// Dispose throws if a polled socket is disposed
1 change: 1 addition & 0 deletions src/NetMQ/ISocketPollableCollection.cs
@@ -1,4 +1,5 @@
using System;
using System.Threading.Tasks;
using JetBrains.Annotations;
using NetMQ.Monitoring;

18 changes: 18 additions & 0 deletions src/NetMQ/NetMQ.csproj
@@ -130,4 +130,22 @@
<PackageReference Include="SourceLink.Create.CommandLine" Version="2.7.4" PrivateAssets="All" />
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net47'">
<PackageReference Include="Microsoft.Bcl.Async">
<Version>1.0.168</Version>
</PackageReference>
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'net40'">
<PackageReference Include="Microsoft.Bcl.Async">
<Version>1.0.168</Version>
</PackageReference>
</ItemGroup>

<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.0'">
<PackageReference Include="Microsoft.Bcl.Async">
<Version>1.0.168</Version>
</PackageReference>
</ItemGroup>

</Project>
72 changes: 45 additions & 27 deletions src/NetMQ/NetMQPoller.cs
@@ -5,6 +5,7 @@
using System.Linq;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using NetMQ.Core.Utils;
#if !NET35
@@ -112,21 +113,47 @@ protected override void QueueTask(Task task)
m_tasksQueue.Enqueue(task);
}

- public void Run([NotNull] Action action)
+ public Task Run([NotNull] Action action)
Member

Breaking public API is not a good idea, and goes against the C4.1 process that this project uses.

I agree that returning a task would be better. It'd be better to create overloads:

public Task RunAsync([NotNull] Action action);
public Task RunAsync([NotNull] Func<CancellationToken> action, CancellationToken cancellationToken);

Author

@jasells Dec 21, 2019

100% agree, but I was trying to avoid that because I originally thought only the Remove methods were public, and changing only the non-public RunAsync method would avoid breaking changes...

Why is RunAsync even public?

The only reason I can see is to work around the original thread race bug when calling Remove()?

Member

> Why is RunAsync even public?

NetMQPoller is a TaskScheduler and can be used to schedule work to run on the same synchronisation context (in this case: thread) as the poller. RunAsync is useful when you need to schedule work that needs to be on that thread, which is not an uncommon requirement.

Regardless, it's public API and must be preserved.

Author

@jasells Jan 21, 2020

> Regardless, it's public API and must be preserved.

Got that.

> RunAsync is useful when you need to schedule work that needs to be on that thread, which is not an uncommon requirement.

I think a public RunAsync is a design flaw. NetMQ objects (NetMQPoller in this case) could and should internally handle marshalling work that needs to be done on the polling thread. Making it public exposes a large path for external code to break NetMQPoller, and is the root source of the issue being discussed. The only case I can currently think of is this one (adding/removing a socket from the polled group). Certainly, the primary use case for this method is the async polling of the sockets in the list for data, for which all the rest of the framework code is either internal to NetMQ or protected/private, which makes sense.

I suggest that the public RunAsync be marked as deprecated. This doesn't break the API, but does start the process toward resolving the root issue.

{
Task t;

if (!IsRunning || CanExecuteTaskInline)
{
action();

t = FromResult<object>(null);
Member

Rather than instantiate a new completed task for each invocation, cache an instance in a CompletedTask field and reuse it. You can do that in an internal utility class if you like, maybe called Tasks.

Author

This same pattern is also in existing overloaded method ContainsAsync. Also, I'm not convinced it is a good idea to share TaskCompletionSource instances among (potentially) multiple threads...?

Member

@drewnoakes Mar 12, 2020

A completed task is safe to share. Use Task.CompletedTask for TFMs that support it, and cache your own (from a TaskCompletionSource<>) for remaining TFMs.

The benefit of doing so is that this code path will not perform any heap allocations.
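
The caching suggestion in this thread could look roughly like the sketch below. It is only an illustration: the `Tasks` class name follows the reviewer's suggestion, but the class and its members are hypothetical and are not part of NetMQ. On target frameworks that provide `Task.CompletedTask`, that property could be returned instead.

```csharp
using System.Threading.Tasks;

// Hypothetical internal helper (not part of NetMQ): builds one completed task
// and hands out the same instance on every call, so the synchronous code path
// does not allocate.
internal static class Tasks
{
    // A completed task never changes state again, so sharing it between threads is safe.
    public static readonly Task CompletedTask = CreateCompleted();

    private static Task CreateCompleted()
    {
        var tcs = new TaskCompletionSource<object>();
        tcs.SetResult(null);
        return tcs.Task;
    }
}
```

With such a helper, the inline branch of `Run` could return `Tasks.CompletedTask` rather than calling `FromResult<object>(null)` each time.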

}
else
- new Task(action).Start(this);
+ {
+ t = new Task(action);
+ t.Start(this);
Comment on lines +135 to +136
Member

Task.Factory.StartNew is preferred over new Task(...).Start (article).

Author

ahh.... yes, I was wishing Thread.Start() was fluent.

Member

Even if Thread.Start did exist, it would be too heavyweight. We don't need or want to start a new thread for this background work. We need this task to run asynchronously on the poller's thread, which is achieved by passing this as the Task's TaskScheduler.

Author

Sry, I said Thread.Start but I meant Task.Start()
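
As a rough illustration of the `Task.Factory.StartNew` form discussed in this thread, the sketch below queues an action on an explicitly supplied `TaskScheduler` and returns the started task. `SchedulerSketch` and `RunOn` are hypothetical names; in the PR the scheduler argument would be the poller itself (`this`), whereas any `TaskScheduler` works for the sketch.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, for illustration only.
internal static class SchedulerSketch
{
    // Creates and starts the task in one call, targeting the given scheduler
    // instead of the default thread pool scheduler.
    public static Task RunOn(TaskScheduler scheduler, Action action)
    {
        return Task.Factory.StartNew(
            action,
            CancellationToken.None,
            TaskCreationOptions.None,
            scheduler);
    }
}
```

Passing the scheduler explicitly matters here: `StartNew` overloads without a scheduler argument use `TaskScheduler.Current`, which may not be the poller.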

}

return t;
}

/// <summary>
/// Provides a completed task with the result for a synchronously run action.
/// This is only needed for .NET 4.0. Superseded by <see cref="Task.FromResult{TResult}(TResult)"/> in .NET 4.5+.
/// </summary>
/// <typeparam name="TResult"></typeparam>
/// <param name="result"></param>
/// <returns></returns>
private static Task<TResult> FromResult<TResult>(TResult result)
{
var tcs = new TaskCompletionSource<TResult>();
tcs.SetResult(result);
return tcs.Task;
}

#else
private void Run(Action action)
{
action();
}
#endif

#endregion
#endregion

public NetMQPoller()
{
@@ -225,6 +252,11 @@ public void Remove(ISocketPollable socket)
{
if (socket == null)
throw new ArgumentNullException(nameof(socket));

// JASells: not sure I agree with this throw.
// If trying to remove a disposed socket, why complain? It *might* work. The issue
// is if the poller's thread tries to actually service the socket before the remove call...

Member

I'd remove this from the code and discuss in an issue/PR instead where conversation can happen more easily.

The rationale here was to help people discover when they're tearing things down in the wrong order, as that is a common cause of deadlocks and other problems. If you're going to fail some of the time, it's better to fail all of the time as it makes it easier to write code correctly.

Author

Yes, this is a note to do that =)

if (socket.IsDisposed)
throw new ArgumentException("Must not be disposed.", nameof(socket));
CheckDisposed();
@@ -243,33 +275,18 @@ public void Remove(ISocketPollable socket)
socket.Socket.EventsChanged -= OnSocketEventsChanged;
m_sockets.Remove(socket.Socket);
m_isPollSetDirty = true;
- });
+ })
+ .Wait();
Member

Calling Wait on tasks can lead to deadlocks in complex code.

I'd suggest making a RemoveAsync overload that returns Task.

As an aside, it's better to use GetAwaiter().GetResult() than Wait(), mostly because the latter wraps any thrown exception in a redundant AggregateException which can make debugging more difficult. Still, synchronous blocking waits in library code like this should be avoided. There are definitely user scenarios where deadlocks can occur.

Author

Lol.

At first I tried await, but there was an issue using await in .NET40, so I changed it to Wait() and forgot about these methods.

Member

The problem likely was that you cannot await in a non-async method.

Author

I think the latest addresses this by adding new RemoveAsync() method, and reverting the old version, along with the original issue.
I marked the Remove methods as Obsolete...
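
The exception-wrapping difference between `Wait()` and `GetAwaiter().GetResult()` raised earlier in this thread can be seen with a small standalone sketch. `BlockingWaitSketch` and its members are hypothetical names used only for illustration; both calls still block the calling thread, so this shows the debugging difference and does nothing about the deadlock concern.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo class, for illustration only.
internal static class BlockingWaitSketch
{
    // Builds a task that has already faulted with an InvalidOperationException.
    private static Task Faulted()
    {
        var tcs = new TaskCompletionSource<object>();
        tcs.SetException(new InvalidOperationException("boom"));
        return tcs.Task;
    }

    // Wait() wraps the failure: the caller catches an AggregateException.
    public static string ObservedByWait()
    {
        try { Faulted().Wait(); return "none"; }
        catch (Exception ex) { return ex.GetType().Name; }
    }

    // GetAwaiter().GetResult() rethrows the original exception unwrapped.
    public static string ObservedByGetResult()
    {
        try { Faulted().GetAwaiter().GetResult(); return "none"; }
        catch (Exception ex) { return ex.GetType().Name; }
    }
}
```

`ObservedByWait()` returns `"AggregateException"`, while `ObservedByGetResult()` returns `"InvalidOperationException"`.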

// keep the API synchronous by blocking the calling thread via Wait(), else RemoveThrowsIfSocketAlreadyDisposed() test fails
}

public void RemoveAndDispose<T>(T socket) where T : ISocketPollable, IDisposable
{
- if (socket == null)
- throw new ArgumentNullException(nameof(socket));
- if (socket.IsDisposed)
- throw new ArgumentException("Must not be disposed.", nameof(socket));
- CheckDisposed();
-
- Run(() =>
- {
- // Ensure the socket wasn't disposed while this code was waiting to be run on the poller thread
- if (socket.IsDisposed)
- throw new InvalidOperationException(
- $"{nameof(NetMQPoller)}.{nameof(RemoveAndDispose)} was called from a non-poller thread, " +
- "so ran asynchronously. " +
- $"The {socket.GetType().Name} being removed was disposed while the remove " +
- $"operation waited to start on the poller thread. When using {nameof(RemoveAndDispose)} " +
- "you should not dispose the pollable object .");
+ //call the remove method
+ Remove(socket);

- socket.Socket.EventsChanged -= OnSocketEventsChanged;
- m_sockets.Remove(socket.Socket);
- m_isPollSetDirty = true;
- socket.Dispose();
- });
+ //dispose of socket
+ socket.Dispose();
}

public void Remove([NotNull] NetMQTimer timer)
@@ -280,7 +297,7 @@ public void Remove([NotNull] NetMQTimer timer)

timer.When = -1;

- Run(() => m_timers.Remove(timer));
+ Run(() => m_timers.Remove(timer)).Wait();
Member

Again, synchronous waits are not safe. This would have to be a RemoveAsync overload.

}

public void Remove([NotNull] Socket socket)
@@ -293,7 +310,8 @@ public void Remove([NotNull] Socket socket)
{
m_pollinSockets.Remove(socket);
m_isPollSetDirty = true;
- });
+ })
+ .Wait();
}

#endregion