Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client Republish improvements #2468

Merged
merged 17 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 15 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Libraries/Opc.Ua.Client/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5668,7 +5668,7 @@ ITransportChannel transportChannel

default:
result = false;
Utils.LogError(e, "Unexpected error sending republish request.");
Utils.LogError("Unexpected error {0} sending republish request.", error);
break;
}

Expand Down Expand Up @@ -5767,6 +5767,7 @@ private void ProcessPublishResponse(
_ = availableSequenceNumbers?.Remove(notificationMessage.SequenceNumber);
}

// match an acknowledgement to be sent back to the server.
for (int ii = 0; ii < m_acknowledgementsToSend.Count; ii++)
{
SubscriptionAcknowledgement acknowledgement = m_acknowledgementsToSend[ii];
Expand All @@ -5775,7 +5776,8 @@ private void ProcessPublishResponse(
{
acknowledgementsToSend.Add(acknowledgement);
}
else if (availableSequenceNumbers == null || availableSequenceNumbers.Remove(acknowledgement.SequenceNumber))
else if (availableSequenceNumbers == null ||
availableSequenceNumbers.Remove(acknowledgement.SequenceNumber))
{
acknowledgementsToSend.Add(acknowledgement);
UpdateLatestSequenceNumberToSend(ref latestSequenceNumberToSend, acknowledgement.SequenceNumber);
Expand Down
2 changes: 1 addition & 1 deletion Libraries/Opc.Ua.Client/SessionAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1502,7 +1502,7 @@ private async Task ReconnectAsync(ITransportWaitingConnection connection, ITrans

try
{
_ = await operation.EndAsync(kReconnectTimeout / 2, true, ct).ConfigureAwait(false);
_ = await operation.EndAsync(kReconnectTimeout, true, ct).ConfigureAwait(false);
}
catch (ServiceResultException)
{
Expand Down
75 changes: 54 additions & 21 deletions Libraries/Opc.Ua.Client/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Runtime.Serialization;
using System.Threading;
Expand All @@ -43,8 +44,10 @@ namespace Opc.Ua.Client
[DataContract(Namespace = Namespaces.OpcUaXsd)]
public partial class Subscription : IDisposable, ICloneable
{
const int MinKeepAliveTimerInterval = 1000;
const int KeepAliveTimerMargin = 1000;
const int kMinKeepAliveTimerInterval = 1000;
const int kKeepAliveTimerMargin = 1000;
const int kRepublishMessageTimeout = 2000;
const int kRepublishMessageExpiredTimeout = 10000;

#region Constructors
/// <summary>
Expand Down Expand Up @@ -506,7 +509,7 @@ public IEnumerable<MonitoredItem> MonitoredItems
{
lock (m_cache)
{
return new List<MonitoredItem>(m_monitoredItems.Values);
return m_monitoredItems.Values.AsEnumerable();
mregen marked this conversation as resolved.
Show resolved Hide resolved
}
}
}
Expand Down Expand Up @@ -797,7 +800,7 @@ public bool PublishingStopped
get
{
TimeSpan timeSinceLastNotification = TimeSpan.FromTicks(DateTime.UtcNow.Ticks - Interlocked.Read(ref m_lastNotificationTime));
if (timeSinceLastNotification.TotalMilliseconds > m_keepAliveInterval + KeepAliveTimerMargin)
if (timeSinceLastNotification.TotalMilliseconds > m_keepAliveInterval + kKeepAliveTimerMargin)
{
return true;
}
Expand Down Expand Up @@ -1475,8 +1478,9 @@ public void SaveMessageInCache(
entry = node.Value;
LinkedListNode<IncomingMessage> next = node.Next;

// can only pull off processed or expired messages.
if (!entry.Processed && !(entry.Republished && entry.Timestamp.AddSeconds(10) < now))
// can only pull off processed or expired or missing messages.
if (!entry.Processed &&
!(entry.Republished && (entry.RepublishStatus != StatusCodes.Good || entry.Timestamp.AddMilliseconds(kRepublishMessageExpiredTimeout) < now)))
{
break;
}
Expand Down Expand Up @@ -1737,7 +1741,7 @@ private void ProcessTransferredSequenceNumbers(UInt32Collection availableSequenc
// only republish consecutive sequence numbers
// triggers the republish mechanism immediately,
// if event is in the past
var now = DateTime.UtcNow.AddSeconds(-5);
var now = DateTime.UtcNow.AddMilliseconds(-kRepublishMessageTimeout * 2);
uint lastSequenceNumberToRepublish = m_lastSequenceNumberProcessed - 1;
int availableNumbers = availableSequenceNumbers.Count;
int republishMessages = 0;
Expand Down Expand Up @@ -1838,10 +1842,10 @@ private void StartKeepAliveTimer()

Interlocked.Exchange(ref m_lastNotificationTime, DateTime.UtcNow.Ticks);
m_keepAliveInterval = (int)(Math.Min(m_currentPublishingInterval * (m_currentKeepAliveCount + 1), Int32.MaxValue));
if (m_keepAliveInterval < MinKeepAliveTimerInterval)
if (m_keepAliveInterval < kMinKeepAliveTimerInterval)
{
m_keepAliveInterval = (int)(Math.Min(m_publishingInterval * (m_keepAliveCount + 1), Int32.MaxValue));
m_keepAliveInterval = Math.Min(MinKeepAliveTimerInterval, m_keepAliveInterval);
m_keepAliveInterval = Math.Min(kMinKeepAliveTimerInterval, m_keepAliveInterval);
}
#if NET6_0_OR_GREATER
var publishTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(m_keepAliveInterval));
Expand Down Expand Up @@ -1975,7 +1979,7 @@ internal void TraceState(string context)
/// </summary>
private int BeginPublishTimeout()
{
return Math.Max(Math.Min(m_keepAliveInterval * 3, Int32.MaxValue), MinKeepAliveTimerInterval); ;
return Math.Max(Math.Min(m_keepAliveInterval * 3, Int32.MaxValue), kMinKeepAliveTimerInterval); ;
}

/// <summary>
Expand Down Expand Up @@ -2235,16 +2239,28 @@ private async Task OnMessageReceivedAsync(CancellationToken ct)
// check for missing messages.
else if (ii.Next != null && ii.Value.Message == null && !ii.Value.Processed && !ii.Value.Republished)
{
if (ii.Value.Timestamp.AddSeconds(2) < DateTime.UtcNow)
// tolerate if a single request was received out of order
if (ii.Next.Next != null &&
ii.Value.Timestamp.AddMilliseconds(kRepublishMessageTimeout) < DateTime.UtcNow)
{
if (messagesToRepublish == null)
{
messagesToRepublish = new List<IncomingMessage>();
}

messagesToRepublish.Add(ii.Value);
ii.Value.Republished = true;
publishStateChangedMask |= PublishStateChangedMask.Republish;

// only call republish if the sequence number is available
if (m_availableSequenceNumbers?.Contains(ii.Value.SequenceNumber) == true)
{
if (messagesToRepublish == null)
{
messagesToRepublish = new List<IncomingMessage>();
}

messagesToRepublish.Add(ii.Value);
}
else
{
Utils.LogInfo("Skipped to receive RepublishAsync for {0}-{1}-BadMessageNotAvailable", subscriptionId, ii.Value.SequenceNumber);
ii.Value.RepublishStatus = StatusCodes.BadMessageNotAvailable;
}
}
}
#if DEBUG
Expand Down Expand Up @@ -2373,12 +2389,27 @@ private async Task OnMessageReceivedAsync(CancellationToken ct)
// do any re-publishes.
if (messagesToRepublish != null && session != null && subscriptionId != 0)
{
for (int ii = 0; ii < messagesToRepublish.Count; ii++)
int count = messagesToRepublish.Count;
var tasks = new Task<(bool, ServiceResult)>[count];
for (int ii = 0; ii < count; ii++)
{
(bool success, _) = await session.RepublishAsync(subscriptionId, messagesToRepublish[ii].SequenceNumber, ct).ConfigureAwait(false);
if (!success)
tasks[ii] = session.RepublishAsync(subscriptionId, messagesToRepublish[ii].SequenceNumber, ct);
}

await Task.WhenAll(tasks).ConfigureAwait(false);

lock (m_cache)
{
for (int ii = 0; ii < count; ii++)
{
messagesToRepublish[ii].Republished = false;
bool success = false;
ServiceResult serviceResult = StatusCodes.BadMessageNotAvailable;
if (tasks[ii].IsCompleted)
{
(success, serviceResult) = tasks[ii].Result.ToTuple();
}
messagesToRepublish[ii].Republished = success;
messagesToRepublish[ii].RepublishStatus = serviceResult.StatusCode;
}
}
}
Expand Down Expand Up @@ -2705,6 +2736,7 @@ private IncomingMessage FindOrCreateEntry(DateTime utcNow, uint sequenceNumber)
IncomingMessage entry = null;
LinkedListNode<IncomingMessage> node = m_incomingMessages.Last;

Debug.Assert(Monitor.IsEntered(m_cache));
while (node != null)
{
entry = node.Value;
Expand Down Expand Up @@ -2803,6 +2835,7 @@ private class IncomingMessage
public NotificationMessage Message;
public bool Processed;
public bool Republished;
public StatusCode RepublishStatus;
}

private LinkedList<IncomingMessage> m_incomingMessages;
Expand Down
2 changes: 1 addition & 1 deletion Stack/Opc.Ua.Core/Security/Constants/SecurityPolicies.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public static class SecurityPolicies
#region Static Methods
private static bool IsPlatformSupportedUri(string name)
{
if (name.Equals(nameof(Aes256_Sha256_RsaPss)) &&
if (name.Equals(nameof(Aes256_Sha256_RsaPss), StringComparison.Ordinal) &&
!RsaUtils.IsSupportingRSAPssSign.Value)
{
return false;
Expand Down
13 changes: 12 additions & 1 deletion targets.props
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
A build with all custom targets which are not part of a regular build
is scheduled once a week in the DevOps build pipeline.
Uncomment the following lines to test a custom test target
supported values: net462, netstandard2.1, net48, net6.0, net8.0
supported values: net462, netstandard2.0, netstandard2.1, net472, net48, net6.0, net8.0
-->
<!--
<PropertyGroup>
Expand All @@ -34,6 +34,17 @@
<HttpsTargetFrameworks>net462</HttpsTargetFrameworks>
</PropertyGroup>
</When>
<When Condition="'$(CustomTestTarget)' == 'net472'">
<PropertyGroup>
<AnalysisLevel>preview-all</AnalysisLevel>
<AppTargetFrameworks>net472</AppTargetFrameworks>
<AppTargetFramework>net472</AppTargetFramework>
<TestsTargetFrameworks>net472</TestsTargetFrameworks>
<LibTargetFrameworks>net472</LibTargetFrameworks>
<LibxTargetFrameworks>net472</LibxTargetFrameworks>
<HttpsTargetFrameworks>net472</HttpsTargetFrameworks>
</PropertyGroup>
</When>
<When Condition="'$(CustomTestTarget)' == 'netstandard2.0'">
<PropertyGroup>
<DisableECCTests>true</DisableECCTests>
Expand Down
Loading