diff --git a/Libraries/Opc.Ua.Client/Session.cs b/Libraries/Opc.Ua.Client/Session.cs index 87e8a03e6..f64b63841 100644 --- a/Libraries/Opc.Ua.Client/Session.cs +++ b/Libraries/Opc.Ua.Client/Session.cs @@ -5767,6 +5767,7 @@ private void ProcessPublishResponse( _ = availableSequenceNumbers?.Remove(notificationMessage.SequenceNumber); } + // match an acknowledgement to be sent back to the server. for (int ii = 0; ii < m_acknowledgementsToSend.Count; ii++) { SubscriptionAcknowledgement acknowledgement = m_acknowledgementsToSend[ii]; @@ -5775,7 +5776,8 @@ private void ProcessPublishResponse( { acknowledgementsToSend.Add(acknowledgement); } - else if (availableSequenceNumbers == null || availableSequenceNumbers.Remove(acknowledgement.SequenceNumber)) + else if (availableSequenceNumbers == null || + availableSequenceNumbers.Remove(acknowledgement.SequenceNumber)) { acknowledgementsToSend.Add(acknowledgement); UpdateLatestSequenceNumberToSend(ref latestSequenceNumberToSend, acknowledgement.SequenceNumber); diff --git a/Libraries/Opc.Ua.Client/Subscription.cs b/Libraries/Opc.Ua.Client/Subscription.cs index 114e80a3e..46cf56c77 100644 --- a/Libraries/Opc.Ua.Client/Subscription.cs +++ b/Libraries/Opc.Ua.Client/Subscription.cs @@ -29,6 +29,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Runtime.Serialization; using System.Threading; @@ -43,8 +44,10 @@ namespace Opc.Ua.Client [DataContract(Namespace = Namespaces.OpcUaXsd)] public partial class Subscription : IDisposable, ICloneable { - const int MinKeepAliveTimerInterval = 1000; - const int KeepAliveTimerMargin = 1000; + const int kMinKeepAliveTimerInterval = 1000; + const int kKeepAliveTimerMargin = 1000; + const int kRepublishMessageTimeout = 2500; + const int kRepublishMessageExpiredTimeout = 10000; #region Constructors /// @@ -797,7 +800,7 @@ public bool PublishingStopped get { TimeSpan timeSinceLastNotification = TimeSpan.FromTicks(DateTime.UtcNow.Ticks - Interlocked.Read(ref m_lastNotificationTime)); - if (timeSinceLastNotification.TotalMilliseconds > m_keepAliveInterval + KeepAliveTimerMargin) + if (timeSinceLastNotification.TotalMilliseconds > m_keepAliveInterval + kKeepAliveTimerMargin) { return true; } @@ -1475,8 +1478,9 @@ public void SaveMessageInCache( entry = node.Value; LinkedListNode next = node.Next; - // can only pull off processed or expired messages. - if (!entry.Processed && !(entry.Republished && entry.Timestamp.AddSeconds(10) < now)) + // can only pull off processed or expired or missing messages. + if (!entry.Processed && + !(entry.Republished && (entry.RepublishStatus != StatusCodes.Good || entry.Timestamp.AddMilliseconds(kRepublishMessageExpiredTimeout) < now))) { break; } @@ -1737,7 +1741,7 @@ private void ProcessTransferredSequenceNumbers(UInt32Collection availableSequenc // only republish consecutive sequence numbers // triggers the republish mechanism immediately, // if event is in the past - var now = DateTime.UtcNow.AddSeconds(-5); + var now = DateTime.UtcNow.AddMilliseconds(-kRepublishMessageTimeout * 2); uint lastSequenceNumberToRepublish = m_lastSequenceNumberProcessed - 1; int availableNumbers = availableSequenceNumbers.Count; int republishMessages = 0; @@ -1838,10 +1842,10 @@ private void StartKeepAliveTimer() Interlocked.Exchange(ref m_lastNotificationTime, DateTime.UtcNow.Ticks); m_keepAliveInterval = (int)(Math.Min(m_currentPublishingInterval * (m_currentKeepAliveCount + 1), Int32.MaxValue)); - if (m_keepAliveInterval < MinKeepAliveTimerInterval) + if (m_keepAliveInterval < kMinKeepAliveTimerInterval) { m_keepAliveInterval = (int)(Math.Min(m_publishingInterval * (m_keepAliveCount + 1), Int32.MaxValue)); - m_keepAliveInterval = Math.Min(MinKeepAliveTimerInterval, m_keepAliveInterval); + m_keepAliveInterval = Math.Min(kMinKeepAliveTimerInterval, m_keepAliveInterval); } #if NET6_0_OR_GREATER var publishTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(m_keepAliveInterval)); @@ -1975,7 +1979,7 @@ internal void TraceState(string context) /// private int BeginPublishTimeout() { - return Math.Max(Math.Min(m_keepAliveInterval * 3, Int32.MaxValue), MinKeepAliveTimerInterval); ; + return Math.Max(Math.Min(m_keepAliveInterval * 3, Int32.MaxValue), kMinKeepAliveTimerInterval); } /// @@ -2235,16 +2239,28 @@ private async Task OnMessageReceivedAsync(CancellationToken ct) // check for missing messages. else if (ii.Next != null && ii.Value.Message == null && !ii.Value.Processed && !ii.Value.Republished) { - if (ii.Value.Timestamp.AddSeconds(2) < DateTime.UtcNow) + // tolerate if a single request was received out of order + if (ii.Next.Next != null && + ii.Value.Timestamp.AddMilliseconds(kRepublishMessageTimeout) < DateTime.UtcNow) { - if (messagesToRepublish == null) - { - messagesToRepublish = new List(); - } - - messagesToRepublish.Add(ii.Value); ii.Value.Republished = true; publishStateChangedMask |= PublishStateChangedMask.Republish; + + // only call republish if the sequence number is available + if (m_availableSequenceNumbers?.Contains(ii.Value.SequenceNumber) == true) + { + if (messagesToRepublish == null) + { + messagesToRepublish = new List(); + } + + messagesToRepublish.Add(ii.Value); + } + else + { + Utils.LogInfo("Skipped to receive RepublishAsync for {0}-{1}-BadMessageNotAvailable", subscriptionId, ii.Value.SequenceNumber); + ii.Value.RepublishStatus = StatusCodes.BadMessageNotAvailable; + } } } #if DEBUG @@ -2373,12 +2389,27 @@ private async Task OnMessageReceivedAsync(CancellationToken ct) // do any re-publishes. if (messagesToRepublish != null && session != null && subscriptionId != 0) { - for (int ii = 0; ii < messagesToRepublish.Count; ii++) + int count = messagesToRepublish.Count; + var tasks = new Task<(bool, ServiceResult)>[count]; + for (int ii = 0; ii < count; ii++) + { + tasks[ii] = session.RepublishAsync(subscriptionId, messagesToRepublish[ii].SequenceNumber, ct); + } + + await Task.WhenAll(tasks).ConfigureAwait(false); + + lock (m_cache) { - (bool success, _) = await session.RepublishAsync(subscriptionId, messagesToRepublish[ii].SequenceNumber, ct).ConfigureAwait(false); - if (!success) + for (int ii = 0; ii < count; ii++) { - messagesToRepublish[ii].Republished = false; + bool success = false; + ServiceResult serviceResult = StatusCodes.BadMessageNotAvailable; + if (tasks[ii].IsCompleted) + { + (success, serviceResult) = tasks[ii].Result.ToTuple(); + } + messagesToRepublish[ii].Republished = success; + messagesToRepublish[ii].RepublishStatus = serviceResult.StatusCode; } } } @@ -2705,6 +2736,7 @@ private IncomingMessage FindOrCreateEntry(DateTime utcNow, uint sequenceNumber) IncomingMessage entry = null; LinkedListNode node = m_incomingMessages.Last; + Debug.Assert(Monitor.IsEntered(m_cache)); while (node != null) { entry = node.Value; @@ -2803,6 +2835,7 @@ private class IncomingMessage public NotificationMessage Message; public bool Processed; public bool Republished; + public StatusCode RepublishStatus; } private LinkedList m_incomingMessages; diff --git a/Stack/Opc.Ua.Core/Security/Constants/SecurityPolicies.cs b/Stack/Opc.Ua.Core/Security/Constants/SecurityPolicies.cs index f3a028645..df4adb345 100644 --- a/Stack/Opc.Ua.Core/Security/Constants/SecurityPolicies.cs +++ b/Stack/Opc.Ua.Core/Security/Constants/SecurityPolicies.cs @@ -68,7 +68,7 @@ public static class SecurityPolicies #region Static Methods private static bool IsPlatformSupportedUri(string name) { - if (name.Equals(nameof(Aes256_Sha256_RsaPss)) && + if (name.Equals(nameof(Aes256_Sha256_RsaPss), StringComparison.Ordinal) && !RsaUtils.IsSupportingRSAPssSign.Value) { return false; diff --git a/targets.props b/targets.props index 371c8abf3..7d41ba1c2 100644 --- a/targets.props +++ b/targets.props @@ -12,7 +12,7 @@ A build with all custom targets which are not part of a regular build is scheduled once a week in the DevOps build pipeline. Uncomment the following lines to test a custom test target - supported values: net462, netstandard2.1, net48, net6.0, net8.0 + supported values: net462, netstandard2.0, netstandard2.1, net472, net48, net6.0, net8.0 -->