From e39bc464725886297a5088db11cba58209e71e8d Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Fri, 19 Jan 2024 08:11:43 +0100 Subject: [PATCH 1/8] improve republish processing --- Libraries/Opc.Ua.Client/ISession.cs | 4 +- Libraries/Opc.Ua.Client/Session.cs | 47 ++++++++++---- Libraries/Opc.Ua.Client/SessionAsync.cs | 12 ++-- Libraries/Opc.Ua.Client/Subscription.cs | 72 +++++++++++++++------ Libraries/Opc.Ua.Client/TraceableSession.cs | 6 +- 5 files changed, 99 insertions(+), 42 deletions(-) diff --git a/Libraries/Opc.Ua.Client/ISession.cs b/Libraries/Opc.Ua.Client/ISession.cs index e38cafef4e..971384ecb8 100644 --- a/Libraries/Opc.Ua.Client/ISession.cs +++ b/Libraries/Opc.Ua.Client/ISession.cs @@ -943,7 +943,7 @@ ResponseHeader EndBrowseNext( /// /// Sends a republish request. /// - bool Republish(uint subscriptionId, uint sequenceNumber); + bool Republish(uint subscriptionId, uint sequenceNumber, out ServiceResult error); /// /// Call the ResendData method on the server for all subscriptions. @@ -954,7 +954,7 @@ ResponseHeader EndBrowseNext( /// /// Sends a republish request. /// - Task RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct = default); + Task<(bool, ServiceResult)> RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct = default); /// /// Call the ResendData method on the server for all subscriptions. diff --git a/Libraries/Opc.Ua.Client/Session.cs b/Libraries/Opc.Ua.Client/Session.cs index 47a9dc89f1..a74178c6c9 100644 --- a/Libraries/Opc.Ua.Client/Session.cs +++ b/Libraries/Opc.Ua.Client/Session.cs @@ -51,6 +51,8 @@ public partial class Session : SessionClientBatched, ISession private const int kMinPublishRequestCountMax = 100; private const int kDefaultPublishRequestCount = 1; private const int kKeepAliveGuardBand = 1000; + private const int kPublishRequestSequenceNumberOutOfOrderThreshold = 10; + private const int kPublishRequestSequenceNumberOutdatedThreshold = 100; #region Constructors /// @@ -391,7 +393,7 @@ protected override void Dispose(bool disposing) Utils.SilentDispose(m_nodeCache); m_nodeCache = null; - IList subscriptions = null; + List subscriptions = null; lock (SyncRoot) { subscriptions = new List(m_subscriptions); @@ -402,6 +404,7 @@ protected override void Dispose(bool disposing) { Utils.SilentDispose(subscription); } + subscriptions.Clear(); } base.Dispose(disposing); @@ -2416,7 +2419,8 @@ public void Open( // save session id. lock (SyncRoot) { - SessionCreated(sessionId, sessionCookie); + // save session id and cookie in base + base.SessionCreated(sessionId, sessionCookie); } Utils.LogInfo("Revised session timeout value: {0}. ", m_sessionTimeout); @@ -2527,6 +2531,9 @@ public void Open( // raise event that session configuration chnaged. IndicateSessionConfigurationChanged(); + + // notify session created callback, which was already set in base class only. + SessionCreated(sessionId, sessionCookie); } catch (Exception) { @@ -4857,8 +4864,8 @@ public IAsyncResult BeginPublish(int timeout) #endif } - uint timeoutHint = (uint)((timeout > 0) ? timeout : 0); - timeoutHint = Math.Min((uint)OperationTimeout / 2, timeoutHint); + uint timeoutHint = (uint)((timeout > 0) ? (uint)timeout : uint.MaxValue); + timeoutHint = Math.Min((uint)(OperationTimeout / 2), timeoutHint); // send publish request. var requestHeader = new RequestHeader { @@ -5102,8 +5109,11 @@ private void OnPublishComplete(IAsyncResult result) } /// - public bool Republish(uint subscriptionId, uint sequenceNumber) + public bool Republish(uint subscriptionId, uint sequenceNumber, out ServiceResult error) { + bool result = true; + error = ServiceResult.Good; + // send republish request. RequestHeader requestHeader = new RequestHeader { TimeoutHint = (uint)OperationTimeout, @@ -5133,13 +5143,13 @@ public bool Republish(uint subscriptionId, uint sequenceNumber) null, false, notificationMessage); - - return true; } catch (Exception e) { - return ProcessRepublishResponseError(e, subscriptionId, sequenceNumber); + (result, error) = ProcessRepublishResponseError(e, subscriptionId, sequenceNumber); } + + return result; } /// @@ -5593,7 +5603,7 @@ ITransportChannel transportChannel /// The exception that occurred during the republish operation. /// The subscription Id for which the republish was requested. /// The sequencenumber for which the republish was requested. - private bool ProcessRepublishResponseError(Exception e, uint subscriptionId, uint sequenceNumber) + private (bool, ServiceResult) ProcessRepublishResponseError(Exception e, uint subscriptionId, uint sequenceNumber) { ServiceResult error = new ServiceResult(e); @@ -5601,6 +5611,7 @@ private bool ProcessRepublishResponseError(Exception e, uint subscriptionId, uin bool result = true; switch (error.StatusCode.Code) { + case StatusCodes.BadSubscriptionIdInvalid: case StatusCodes.BadMessageNotAvailable: Utils.LogWarning("Message {0}-{1} no longer available.", subscriptionId, sequenceNumber); break; @@ -5617,7 +5628,7 @@ private bool ProcessRepublishResponseError(Exception e, uint subscriptionId, uin default: result = false; - Utils.LogError(e, "Unexpected error sending republish request."); + Utils.LogError("Unexpected error {0} sending republish request.", error); break; } @@ -5641,7 +5652,7 @@ private bool ProcessRepublishResponseError(Exception e, uint subscriptionId, uin } } - return result; + return (result, error); } /// @@ -5716,6 +5727,7 @@ private void ProcessPublishResponse( _ = availableSequenceNumbers?.Remove(notificationMessage.SequenceNumber); } + // match an acknowledgement to be sent back to the server. for (int ii = 0; ii < m_acknowledgementsToSend.Count; ii++) { SubscriptionAcknowledgement acknowledgement = m_acknowledgementsToSend[ii]; @@ -5724,23 +5736,30 @@ private void ProcessPublishResponse( { acknowledgementsToSend.Add(acknowledgement); } - else if (availableSequenceNumbers == null || availableSequenceNumbers.Remove(acknowledgement.SequenceNumber)) + else if (availableSequenceNumbers == null || + availableSequenceNumbers.Remove(acknowledgement.SequenceNumber)) { acknowledgementsToSend.Add(acknowledgement); UpdateLatestSequenceNumberToSend(ref latestSequenceNumberToSend, acknowledgement.SequenceNumber); } + // a publish response may by processed out of order, + // allow for a tolerance until the sequence number is removed. + else if (Math.Abs((int)(acknowledgement.SequenceNumber - latestSequenceNumberToSend)) < kPublishRequestSequenceNumberOutOfOrderThreshold) + { + acknowledgementsToSend.Add(acknowledgement); + } else { Utils.LogWarning("SessionId {0}, SubscriptionId {1}, Sequence number={2} was not received in the available sequence numbers.", SessionId, subscriptionId, acknowledgement.SequenceNumber); } } - // Check for sleeping sequence numbers. May have been not acked due to a network glitch. + // Check for outdated sequence numbers. May have been not acked due to a network glitch. if (latestSequenceNumberToSend != 0 && availableSequenceNumbers?.Count > 0) { foreach (var sequenceNumber in availableSequenceNumbers) { - if ((int)(latestSequenceNumberToSend - sequenceNumber) > 100) + if ((int)(latestSequenceNumberToSend - sequenceNumber) > kPublishRequestSequenceNumberOutdatedThreshold) { AddAcknowledgementToSend(acknowledgementsToSend, subscriptionId, sequenceNumber); Utils.LogWarning("SessionId {0}, SubscriptionId {1}, Sequence number={2} was sleeping, acknowledged.", SessionId, subscriptionId, sequenceNumber); diff --git a/Libraries/Opc.Ua.Client/SessionAsync.cs b/Libraries/Opc.Ua.Client/SessionAsync.cs index 6a87625f9f..30236e26f8 100644 --- a/Libraries/Opc.Ua.Client/SessionAsync.cs +++ b/Libraries/Opc.Ua.Client/SessionAsync.cs @@ -184,7 +184,8 @@ public async Task OpenAsync( // save session id. lock (SyncRoot) { - SessionCreated(sessionId, sessionCookie); + // save session id and cookie in base + base.SessionCreated(sessionId, sessionCookie); } Utils.LogInfo("Revised session timeout value: {0}. ", m_sessionTimeout); @@ -294,6 +295,9 @@ public async Task OpenAsync( // raise event that session configuration chnaged. IndicateSessionConfigurationChanged(); + + // call session created callback, which was already set in base class only. + SessionCreated(sessionId, sessionCookie); } catch (Exception) { @@ -1498,7 +1502,7 @@ private async Task ReconnectAsync(ITransportWaitingConnection connection, ITrans try { - _ = await operation.EndAsync(kReconnectTimeout / 2, true, ct).ConfigureAwait(false); + _ = await operation.EndAsync(kReconnectTimeout, true, ct).ConfigureAwait(false); } catch (ServiceResultException) { @@ -1547,7 +1551,7 @@ private async Task ReconnectAsync(ITransportWaitingConnection connection, ITrans } /// - public async Task RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct) + public async Task<(bool, ServiceResult)> RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct) { // send republish request. RequestHeader requestHeader = new RequestHeader { @@ -1579,7 +1583,7 @@ public async Task RepublishAsync(uint subscriptionId, uint sequenceNumber, false, notificationMessage); - return true; + return (true, ServiceResult.Good); } catch (Exception e) { diff --git a/Libraries/Opc.Ua.Client/Subscription.cs b/Libraries/Opc.Ua.Client/Subscription.cs index 25e9e0b2ba..51e6cd126e 100644 --- a/Libraries/Opc.Ua.Client/Subscription.cs +++ b/Libraries/Opc.Ua.Client/Subscription.cs @@ -29,6 +29,7 @@ using System; using System.Collections.Generic; +using System.Diagnostics; using System.Linq; using System.Runtime.Serialization; using System.Threading; @@ -43,8 +44,10 @@ namespace Opc.Ua.Client [DataContract(Namespace = Namespaces.OpcUaXsd)] public partial class Subscription : IDisposable, ICloneable { - const int MinKeepAliveTimerInterval = 1000; - const int KeepAliveTimerMargin = 1000; + const int kMinKeepAliveTimerInterval = 1000; + const int kKeepAliveTimerMargin = 1000; + const int kRepublishMessageTimeout = 2000; + const int kRepublishMessageExpiredTimeout = 10000; #region Constructors /// @@ -797,7 +800,7 @@ public bool PublishingStopped get { TimeSpan timeSinceLastNotification = TimeSpan.FromTicks(DateTime.UtcNow.Ticks - Interlocked.Read(ref m_lastNotificationTime)); - if (timeSinceLastNotification.TotalMilliseconds > m_keepAliveInterval + KeepAliveTimerMargin) + if (timeSinceLastNotification.TotalMilliseconds > m_keepAliveInterval + kKeepAliveTimerMargin) { return true; } @@ -1475,8 +1478,9 @@ public void SaveMessageInCache( entry = node.Value; LinkedListNode next = node.Next; - // can only pull off processed or expired messages. - if (!entry.Processed && !(entry.Republished && entry.Timestamp.AddSeconds(10) < now)) + // can only pull off processed or expired or missing messages. + if (!entry.Processed && + !(entry.Republished && (entry.RepublishStatus != StatusCodes.Good || entry.Timestamp.AddMilliseconds(kRepublishMessageExpiredTimeout) < now))) { break; } @@ -1737,7 +1741,7 @@ private void ProcessTransferredSequenceNumbers(UInt32Collection availableSequenc // only republish consecutive sequence numbers // triggers the republish mechanism immediately, // if event is in the past - var now = DateTime.UtcNow.AddSeconds(-5); + var now = DateTime.UtcNow.AddMilliseconds(-kRepublishMessageTimeout*2); uint lastSequenceNumberToRepublish = m_lastSequenceNumberProcessed - 1; int availableNumbers = availableSequenceNumbers.Count; int republishMessages = 0; @@ -1838,10 +1842,10 @@ private void StartKeepAliveTimer() Interlocked.Exchange(ref m_lastNotificationTime, DateTime.UtcNow.Ticks); m_keepAliveInterval = (int)(Math.Min(m_currentPublishingInterval * (m_currentKeepAliveCount + 1), Int32.MaxValue)); - if (m_keepAliveInterval < MinKeepAliveTimerInterval) + if (m_keepAliveInterval < kMinKeepAliveTimerInterval) { m_keepAliveInterval = (int)(Math.Min(m_publishingInterval * (m_keepAliveCount + 1), Int32.MaxValue)); - m_keepAliveInterval = Math.Min(MinKeepAliveTimerInterval, m_keepAliveInterval); + m_keepAliveInterval = Math.Min(kMinKeepAliveTimerInterval, m_keepAliveInterval); } #if NET6_0_OR_GREATER var publishTimer = new PeriodicTimer(TimeSpan.FromMilliseconds(m_keepAliveInterval)); @@ -1975,7 +1979,7 @@ internal void TraceState(string context) /// private int BeginPublishTimeout() { - return Math.Max(Math.Min(m_keepAliveInterval * 3, Int32.MaxValue), MinKeepAliveTimerInterval); ; + return Math.Max(Math.Min(m_keepAliveInterval * 3, Int32.MaxValue), kMinKeepAliveTimerInterval); ; } /// @@ -2235,16 +2239,28 @@ private async Task OnMessageReceivedAsync(CancellationToken ct) // check for missing messages. else if (ii.Next != null && ii.Value.Message == null && !ii.Value.Processed && !ii.Value.Republished) { - if (ii.Value.Timestamp.AddSeconds(2) < DateTime.UtcNow) + // tolerate if a single request was received out of order + if (ii.Next.Next != null && + ii.Value.Timestamp.AddMilliseconds(kRepublishMessageTimeout) < DateTime.UtcNow) { - if (messagesToRepublish == null) - { - messagesToRepublish = new List(); - } - - messagesToRepublish.Add(ii.Value); ii.Value.Republished = true; publishStateChangedMask |= PublishStateChangedMask.Republish; + + // only call republish if the sequence number is available + if (m_availableSequenceNumbers?.Contains(ii.Value.SequenceNumber) == true) + { + if (messagesToRepublish == null) + { + messagesToRepublish = new List(); + } + + messagesToRepublish.Add(ii.Value); + } + else + { + Utils.LogInfo("Skipped to receive RepublishAsync for {0}-{1}-BadMessageNotAvailable", subscriptionId, ii.Value.SequenceNumber); + ii.Value.RepublishStatus = StatusCodes.BadMessageNotAvailable; + } } } #if DEBUG @@ -2373,11 +2389,27 @@ private async Task OnMessageReceivedAsync(CancellationToken ct) // do any re-publishes. if (messagesToRepublish != null && session != null && subscriptionId != 0) { - for (int ii = 0; ii < messagesToRepublish.Count; ii++) + int count = messagesToRepublish.Count; + var tasks = new Task<(bool, ServiceResult)>[count]; + for (int ii = 0; ii < count; ii++) + { + tasks[ii] = session.RepublishAsync(subscriptionId, messagesToRepublish[ii].SequenceNumber, ct); + } + + await Task.WhenAll(tasks).ConfigureAwait(false); + + lock (m_cache) { - if (!await session.RepublishAsync(subscriptionId, messagesToRepublish[ii].SequenceNumber, ct).ConfigureAwait(false)) + for (int ii = 0; ii < count; ii++) { - messagesToRepublish[ii].Republished = false; + bool success = false; + ServiceResult serviceResult = StatusCodes.BadMessageNotAvailable; + if (tasks[ii].IsCompleted) + { + (success, serviceResult) = tasks[ii].Result.ToTuple(); + } + messagesToRepublish[ii].Republished = success; + messagesToRepublish[ii].RepublishStatus = serviceResult.StatusCode; } } } @@ -2699,6 +2731,7 @@ private IncomingMessage FindOrCreateEntry(DateTime utcNow, uint sequenceNumber) IncomingMessage entry = null; LinkedListNode node = m_incomingMessages.Last; + Debug.Assert(Monitor.IsEntered(m_cache)); while (node != null) { entry = node.Value; @@ -2797,6 +2830,7 @@ private class IncomingMessage public NotificationMessage Message; public bool Processed; public bool Republished; + public StatusCode RepublishStatus; } private LinkedList m_incomingMessages; diff --git a/Libraries/Opc.Ua.Client/TraceableSession.cs b/Libraries/Opc.Ua.Client/TraceableSession.cs index 2d970ed550..359e41dbfa 100644 --- a/Libraries/Opc.Ua.Client/TraceableSession.cs +++ b/Libraries/Opc.Ua.Client/TraceableSession.cs @@ -938,16 +938,16 @@ public void StartPublishing(int timeout, bool fullQueue) } /// - public bool Republish(uint subscriptionId, uint sequenceNumber) + public bool Republish(uint subscriptionId, uint sequenceNumber, out ServiceResult error) { using (Activity activity = ActivitySource.StartActivity()) { - return m_session.Republish(subscriptionId, sequenceNumber); + return m_session.Republish(subscriptionId, sequenceNumber, out error); } } /// - public async Task RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct = default) + public async Task<(bool, ServiceResult)> RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct = default) { using (Activity activity = ActivitySource.StartActivity()) { From 6f450f66a08c139e3d1d3a9624d0ec139c07a96d Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Fri, 19 Jan 2024 08:26:44 +0100 Subject: [PATCH 2/8] reduce ack error output --- Libraries/Opc.Ua.Client/Session.cs | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/Libraries/Opc.Ua.Client/Session.cs b/Libraries/Opc.Ua.Client/Session.cs index a74178c6c9..cb1e41faf1 100644 --- a/Libraries/Opc.Ua.Client/Session.cs +++ b/Libraries/Opc.Ua.Client/Session.cs @@ -39,6 +39,7 @@ using System.Threading; using System.Threading.Tasks; using System.Xml; +using Microsoft.Extensions.Logging; namespace Opc.Ua.Client { @@ -4967,11 +4968,14 @@ private void OnPublishComplete(IAsyncResult result) out acknowledgeResults, out acknowledgeDiagnosticInfos); + LogLevel logLevel = LogLevel.Warning; foreach (StatusCode code in acknowledgeResults) { - if (StatusCode.IsBad(code)) + if (StatusCode.IsBad(code) && code != StatusCodes.BadSequenceNumberUnknown) { - Utils.LogError("Error - Publish call finished. ResultCode={0}; SubscriptionId={1};", code.ToString(), subscriptionId); + Utils.Log(logLevel, "Publish Ack Response. ResultCode={0}; SubscriptionId={1}", code.ToString(), subscriptionId); + // only show the first error + logLevel = LogLevel.Trace; } } @@ -5762,7 +5766,7 @@ private void ProcessPublishResponse( if ((int)(latestSequenceNumberToSend - sequenceNumber) > kPublishRequestSequenceNumberOutdatedThreshold) { AddAcknowledgementToSend(acknowledgementsToSend, subscriptionId, sequenceNumber); - Utils.LogWarning("SessionId {0}, SubscriptionId {1}, Sequence number={2} was sleeping, acknowledged.", SessionId, subscriptionId, sequenceNumber); + Utils.LogWarning("SessionId {0}, SubscriptionId {1}, Sequence number={2} was outdated, acknowledged.", SessionId, subscriptionId, sequenceNumber); } } } From b0fa88f506993c0a7e0bbf3c52901484ebf64b5c Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Wed, 14 Feb 2024 07:24:05 +0100 Subject: [PATCH 3/8] changes --- .../Security/Constants/SecurityPolicies.cs | 2 +- .../Stack/Tcp/UaSCBinaryChannel.cs | 19 +++---- .../Stack/Tcp/UaSCBinaryClientChannel.cs | 52 ++++++++----------- 3 files changed, 32 insertions(+), 41 deletions(-) diff --git a/Stack/Opc.Ua.Core/Security/Constants/SecurityPolicies.cs b/Stack/Opc.Ua.Core/Security/Constants/SecurityPolicies.cs index 4b673890f3..db0c315296 100644 --- a/Stack/Opc.Ua.Core/Security/Constants/SecurityPolicies.cs +++ b/Stack/Opc.Ua.Core/Security/Constants/SecurityPolicies.cs @@ -68,7 +68,7 @@ public static class SecurityPolicies #region Static Methods private static bool IsPlatformSupportedUri(string name) { - if (name.Equals(nameof(Aes256_Sha256_RsaPss)) && + if (name.Equals(nameof(Aes256_Sha256_RsaPss), StringComparison.Ordinal) && !RsaUtils.IsSupportingRSAPssSign.Value) { return false; diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs index d0c22f6c02..e179fe402c 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs @@ -339,23 +339,20 @@ public virtual bool ChannelFull /// public virtual void OnMessageReceived(IMessageSocket source, ArraySegment message) { - lock (DataLock) + try { - try - { - uint messageType = BitConverter.ToUInt32(message.Array, message.Offset); + uint messageType = BitConverter.ToUInt32(message.Array, message.Offset); - if (!HandleIncomingMessage(messageType, message)) - { - BufferManager.ReturnBuffer(message.Array, "OnMessageReceived"); - } - } - catch (Exception e) + if (!HandleIncomingMessage(messageType, message)) { - HandleMessageProcessingError(e, StatusCodes.BadTcpInternalError, "An error occurred receiving a message."); BufferManager.ReturnBuffer(message.Array, "OnMessageReceived"); } } + catch (Exception e) + { + HandleMessageProcessingError(e, StatusCodes.BadTcpInternalError, "An error occurred receiving a message."); + BufferManager.ReturnBuffer(message.Array, "OnMessageReceived"); + } } #region Incoming Message Support Functions diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs index 41f9ccf186..08f8bdc8ef 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs @@ -11,6 +11,7 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. */ using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Security.Cryptography.X509Certificates; @@ -79,7 +80,7 @@ public UaSCUaBinaryClientChannel( ClientCertificateChain = clientCertificateChain; } - m_requests = new Dictionary(); + m_requests = new ConcurrentDictionary(); m_lastRequestId = 0; m_ConnectCallback = new EventHandler(OnConnectComplete); m_startHandshake = new TimerCallback(OnScheduledHandshake); @@ -120,7 +121,6 @@ public IAsyncResult BeginConnect(Uri url, int timeout, AsyncCallback callback, o if (url == null) throw new ArgumentNullException(nameof(url)); if (timeout <= 0) throw new ArgumentException("Timeout must be greater than zero.", nameof(timeout)); - Task task; lock (DataLock) { if (State != TcpChannelState.Closed) @@ -155,12 +155,7 @@ public IAsyncResult BeginConnect(Uri url, int timeout, AsyncCallback callback, o else { Socket = m_socketFactory.Create(this, BufferManager, Quotas.MaxBufferSize); - task = Task.Run(async () => { - using (var cts = new CancellationTokenSource(timeout)) - { - await (Socket?.BeginConnect(m_via, m_ConnectCallback, operation, cts.Token) ?? Task.FromResult(false)).ConfigureAwait(false); - } - }); + Socket.BeginConnect(m_via, m_ConnectCallback, operation); } } return m_handshakeOperation; @@ -773,17 +768,17 @@ protected override void HandleWriteComplete(BufferCollection buffers, object sta /// True if the function takes ownership of the buffer. protected override bool HandleIncomingMessage(uint messageType, ArraySegment messageChunk) { - lock (DataLock) + // process a response. + if (TcpMessageType.IsType(messageType, TcpMessageType.Message)) { - // process a response. - if (TcpMessageType.IsType(messageType, TcpMessageType.Message)) - { - //Utils.LogTrace("ChannelId {0}: ProcessResponseMessage", ChannelId); - return ProcessResponseMessage(messageType, messageChunk); - } + //Utils.LogTrace("ChannelId {0}: ProcessResponseMessage", ChannelId); + return ProcessResponseMessage(messageType, messageChunk); + } + lock (DataLock) + { // check for acknowledge. - else if (messageType == TcpMessageType.Acknowledge) + if (messageType == TcpMessageType.Acknowledge) { //Utils.LogTrace("ChannelId {0}: ProcessAcknowledgeMessage", ChannelId); return ProcessAcknowledgeMessage(messageChunk); @@ -874,7 +869,6 @@ private void OnScheduledHandshake(object state) { Utils.LogInfo("ChannelId {0}: Scheduled Handshake Starting: TokenId={1}", ChannelId, CurrentToken?.TokenId); - Task task; lock (DataLock) { // check if renewing a token. @@ -930,10 +924,7 @@ private void OnScheduledHandshake(object state) State = TcpChannelState.Connecting; Socket = m_socketFactory.Create(this, BufferManager, Quotas.MaxBufferSize); - task = Task.Run(async () => - await (Socket?.BeginConnect( - m_via, m_ConnectCallback, m_handshakeOperation, - CancellationToken.None) ?? Task.FromResult(false)).ConfigureAwait(false)); + Socket.BeginConnect(m_via, m_ConnectCallback, m_handshakeOperation); } } } @@ -1244,7 +1235,10 @@ private WriteOperation BeginOperation(int timeout, AsyncCallback callback, objec { WriteOperation operation = new WriteOperation(timeout, callback, state); operation.RequestId = Utils.IncrementIdentifier(ref m_lastRequestId); - m_requests.Add(operation.RequestId, operation); + if (!m_requests.TryAdd(operation.RequestId, operation)) + { + throw new ServiceResultException(StatusCodes.BadUnexpectedError, "Could not add operation to list of pending operations."); + } return operation; } @@ -1258,14 +1252,14 @@ private void OperationCompleted(WriteOperation operation) return; } - lock (DataLock) + if (m_handshakeOperation == operation) { - if (m_handshakeOperation == operation) - { - m_handshakeOperation = null; - } + m_handshakeOperation = null; + } - m_requests.Remove(operation.RequestId); + if (!m_requests.TryRemove(operation.RequestId, out _)) + { + throw new ServiceResultException(StatusCodes.BadUnexpectedError, "Could not remove operation from list of pending operations."); } } @@ -1552,7 +1546,7 @@ private bool ProcessResponseMessage(uint messageType, ArraySegment message private Uri m_url; private Uri m_via; private long m_lastRequestId; - private Dictionary m_requests; + private ConcurrentDictionary m_requests; private WriteOperation m_handshakeOperation; private ChannelToken m_requestedToken; private Timer m_handshakeTimer; From 060f1312977e722c12c524d6f43616b82affd3aa Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Wed, 14 Feb 2024 08:12:55 +0100 Subject: [PATCH 4/8] asenumare --- Libraries/Opc.Ua.Client/Subscription.cs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Libraries/Opc.Ua.Client/Subscription.cs b/Libraries/Opc.Ua.Client/Subscription.cs index 51e6cd126e..3969ed3245 100644 --- a/Libraries/Opc.Ua.Client/Subscription.cs +++ b/Libraries/Opc.Ua.Client/Subscription.cs @@ -509,7 +509,7 @@ public IEnumerable MonitoredItems { lock (m_cache) { - return new List(m_monitoredItems.Values); + return m_monitoredItems.Values.AsEnumerable(); } } } @@ -1741,7 +1741,7 @@ private void ProcessTransferredSequenceNumbers(UInt32Collection availableSequenc // only republish consecutive sequence numbers // triggers the republish mechanism immediately, // if event is in the past - var now = DateTime.UtcNow.AddMilliseconds(-kRepublishMessageTimeout*2); + var now = DateTime.UtcNow.AddMilliseconds(-kRepublishMessageTimeout * 2); uint lastSequenceNumberToRepublish = m_lastSequenceNumberProcessed - 1; int availableNumbers = availableSequenceNumbers.Count; int republishMessages = 0; From 31e12945d269e509817543c7679976bed6f37b39 Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Tue, 20 Feb 2024 09:59:30 +0100 Subject: [PATCH 5/8] revert change for other PR --- .../Stack/Tcp/UaSCBinaryChannel.cs | 19 ++++++---- .../Stack/Tcp/UaSCBinaryClientChannel.cs | 38 +++++++++---------- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs index d60c803822..9423d0ab4c 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryChannel.cs @@ -339,20 +339,23 @@ public virtual bool ChannelFull /// public virtual void OnMessageReceived(IMessageSocket source, ArraySegment message) { - try + lock (DataLock) { - uint messageType = BitConverter.ToUInt32(message.Array, message.Offset); + try + { + uint messageType = BitConverter.ToUInt32(message.Array, message.Offset); - if (!HandleIncomingMessage(messageType, message)) + if (!HandleIncomingMessage(messageType, message)) + { + BufferManager.ReturnBuffer(message.Array, "OnMessageReceived"); + } + } + catch (Exception e) { + HandleMessageProcessingError(e, StatusCodes.BadTcpInternalError, "An error occurred receiving a message."); BufferManager.ReturnBuffer(message.Array, "OnMessageReceived"); } } - catch (Exception e) - { - HandleMessageProcessingError(e, StatusCodes.BadTcpInternalError, "An error occurred receiving a message."); - BufferManager.ReturnBuffer(message.Array, "OnMessageReceived"); - } } #region Incoming Message Support Functions diff --git a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs index f74b3e953d..f151b19cb9 100644 --- a/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs +++ b/Stack/Opc.Ua.Core/Stack/Tcp/UaSCBinaryClientChannel.cs @@ -11,7 +11,6 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. */ using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Security.Cryptography.X509Certificates; @@ -80,7 +79,7 @@ public UaSCUaBinaryClientChannel( ClientCertificateChain = clientCertificateChain; } - m_requests = new ConcurrentDictionary(); + m_requests = new Dictionary(); m_lastRequestId = 0; m_ConnectCallback = new EventHandler(OnConnectComplete); m_startHandshake = new TimerCallback(OnScheduledHandshake); @@ -768,17 +767,17 @@ protected override void HandleWriteComplete(BufferCollection buffers, object sta /// True if the function takes ownership of the buffer. protected override bool HandleIncomingMessage(uint messageType, ArraySegment messageChunk) { - // process a response. - if (TcpMessageType.IsType(messageType, TcpMessageType.Message)) - { - //Utils.LogTrace("ChannelId {0}: ProcessResponseMessage", ChannelId); - return ProcessResponseMessage(messageType, messageChunk); - } - lock (DataLock) { + // process a response. + if (TcpMessageType.IsType(messageType, TcpMessageType.Message)) + { + //Utils.LogTrace("ChannelId {0}: ProcessResponseMessage", ChannelId); + return ProcessResponseMessage(messageType, messageChunk); + } + // check for acknowledge. - if (messageType == TcpMessageType.Acknowledge) + else if (messageType == TcpMessageType.Acknowledge) { //Utils.LogTrace("ChannelId {0}: ProcessAcknowledgeMessage", ChannelId); return ProcessAcknowledgeMessage(messageChunk); @@ -1235,10 +1234,7 @@ private WriteOperation BeginOperation(int timeout, AsyncCallback callback, objec { WriteOperation operation = new WriteOperation(timeout, callback, state); operation.RequestId = Utils.IncrementIdentifier(ref m_lastRequestId); - if (!m_requests.TryAdd(operation.RequestId, operation)) - { - throw new ServiceResultException(StatusCodes.BadUnexpectedError, "Could not add operation to list of pending operations."); - } + m_requests.Add(operation.RequestId, operation); return operation; } @@ -1252,14 +1248,14 @@ private void OperationCompleted(WriteOperation operation) return; } - if (m_handshakeOperation == operation) + lock (DataLock) { - m_handshakeOperation = null; - } + if (m_handshakeOperation == operation) + { + m_handshakeOperation = null; + } - if (!m_requests.TryRemove(operation.RequestId, out _)) - { - throw new ServiceResultException(StatusCodes.BadUnexpectedError, "Could not remove operation from list of pending operations."); + m_requests.Remove(operation.RequestId); } } @@ -1546,7 +1542,7 @@ private bool ProcessResponseMessage(uint messageType, ArraySegment message private Uri m_url; private Uri m_via; private long m_lastRequestId; - private ConcurrentDictionary m_requests; + private Dictionary m_requests; private WriteOperation m_handshakeOperation; private ChannelToken m_requestedToken; private Timer m_handshakeTimer; From 3cb601405cbab933b649b80c5bd3667ae02aa41e Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Mon, 26 Feb 2024 17:02:06 +0100 Subject: [PATCH 6/8] targtes --- targets.props | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/targets.props b/targets.props index 4f38acb57f..7d41ba1c25 100644 --- a/targets.props +++ b/targets.props @@ -12,11 +12,11 @@ A build with all custom targets which are not part of a regular build is scheduled once a week in the DevOps build pipeline. Uncomment the following lines to test a custom test target - supported values: net462, netstandard2.0, netstandard2.1, net48, net6.0, net8.0 + supported values: net462, netstandard2.0, netstandard2.1, net472, net48, net6.0, net8.0 --> @@ -34,6 +34,17 @@ net462 + + + preview-all + net472 + net472 + net472 + net472 + net472 + net472 + + true @@ -91,17 +102,18 @@ - - + + + preview-all net6.0;net48 net6.0 net48;net6.0 - net48;netstandard2.0;netstandard2.1;net6.0;net8.0 + net48;netstandard2.1;net6.0;net8.0 net48;netstandard2.1;net6.0;net8.0 - net48;netstandard2.0;netcoreapp3.1;net6.0;net8.0 + net48;netcoreapp3.1;net6.0;net8.0 @@ -111,9 +123,9 @@ netcoreapp3.1;net48 netcoreapp3.1 net48;netcoreapp3.1 - net48;netstandard2.0;netstandard2.1 + net48;netstandard2.1 net48;netstandard2.1 - net48;netstandard2.0;netcoreapp3.1 + net48;netcoreapp3.1 From bc3f9d459ca0e42484686b6cce5564f41de37a2e Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Mon, 26 Feb 2024 23:34:38 +0100 Subject: [PATCH 7/8] polish --- Libraries/Opc.Ua.Client/Session.cs | 2 +- Libraries/Opc.Ua.Client/SessionAsync.cs | 2 +- Libraries/Opc.Ua.Client/Subscription.cs | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/Libraries/Opc.Ua.Client/Session.cs b/Libraries/Opc.Ua.Client/Session.cs index 76e6e9128d..f64b63841d 100644 --- a/Libraries/Opc.Ua.Client/Session.cs +++ b/Libraries/Opc.Ua.Client/Session.cs @@ -5668,7 +5668,7 @@ ITransportChannel transportChannel default: result = false; - Utils.LogError("Unexpected error {0} sending republish request.", error); + Utils.LogError(e, "Unexpected error sending republish request."); break; } diff --git a/Libraries/Opc.Ua.Client/SessionAsync.cs b/Libraries/Opc.Ua.Client/SessionAsync.cs index 30236e26f8..c660624a63 100644 --- a/Libraries/Opc.Ua.Client/SessionAsync.cs +++ b/Libraries/Opc.Ua.Client/SessionAsync.cs @@ -1502,7 +1502,7 @@ private async Task ReconnectAsync(ITransportWaitingConnection connection, ITrans try { - _ = await operation.EndAsync(kReconnectTimeout, true, ct).ConfigureAwait(false); + _ = await operation.EndAsync(kReconnectTimeout / 2, true, ct).ConfigureAwait(false); } catch (ServiceResultException) { diff --git a/Libraries/Opc.Ua.Client/Subscription.cs b/Libraries/Opc.Ua.Client/Subscription.cs index f61e978ef1..80c8096866 100644 --- a/Libraries/Opc.Ua.Client/Subscription.cs +++ b/Libraries/Opc.Ua.Client/Subscription.cs @@ -46,7 +46,7 @@ public partial class Subscription : IDisposable, ICloneable { const int kMinKeepAliveTimerInterval = 1000; const int kKeepAliveTimerMargin = 1000; - const int kRepublishMessageTimeout = 2000; + const int kRepublishMessageTimeout = 2500; const int kRepublishMessageExpiredTimeout = 10000; #region Constructors @@ -1979,7 +1979,7 @@ internal void TraceState(string context) /// private int BeginPublishTimeout() { - return Math.Max(Math.Min(m_keepAliveInterval * 3, Int32.MaxValue), kMinKeepAliveTimerInterval); ; + return Math.Max(Math.Min(m_keepAliveInterval * 3, Int32.MaxValue), kMinKeepAliveTimerInterval); } /// From 84316fe2089a0ffd25378b2f17595e68125309b5 Mon Sep 17 00:00:00 2001 From: Martin Regen Date: Mon, 26 Feb 2024 23:37:21 +0100 Subject: [PATCH 8/8] revert asenumerable --- Libraries/Opc.Ua.Client/Subscription.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Libraries/Opc.Ua.Client/Subscription.cs b/Libraries/Opc.Ua.Client/Subscription.cs index 80c8096866..46cf56c77d 100644 --- a/Libraries/Opc.Ua.Client/Subscription.cs +++ b/Libraries/Opc.Ua.Client/Subscription.cs @@ -509,7 +509,7 @@ public IEnumerable MonitoredItems { lock (m_cache) { - return m_monitoredItems.Values.AsEnumerable(); + return new List(m_monitoredItems.Values); } } }