Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client Republish improvements #2468

Merged
merged 17 commits into from
Feb 27, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Libraries/Opc.Ua.Client/ISession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -943,7 +943,7 @@ ResponseHeader EndBrowseNext(
/// <summary>
/// Sends a republish request.
/// </summary>
bool Republish(uint subscriptionId, uint sequenceNumber);
bool Republish(uint subscriptionId, uint sequenceNumber, out ServiceResult error);

/// <summary>
/// Call the ResendData method on the server for all subscriptions.
Expand All @@ -954,7 +954,7 @@ ResponseHeader EndBrowseNext(
/// <summary>
/// Sends a republish request.
/// </summary>
Task<bool> RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct = default);
Task<(bool, ServiceResult)> RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct = default);

/// <summary>
/// Call the ResendData method on the server for all subscriptions.
Expand Down
57 changes: 40 additions & 17 deletions Libraries/Opc.Ua.Client/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using Microsoft.Extensions.Logging;

namespace Opc.Ua.Client
{
Expand All @@ -51,6 +52,8 @@
private const int kMinPublishRequestCountMax = 100;
private const int kDefaultPublishRequestCount = 1;
private const int kKeepAliveGuardBand = 1000;
private const int kPublishRequestSequenceNumberOutOfOrderThreshold = 10;
private const int kPublishRequestSequenceNumberOutdatedThreshold = 100;

#region Constructors
/// <summary>
Expand Down Expand Up @@ -391,7 +394,7 @@
Utils.SilentDispose(m_nodeCache);
m_nodeCache = null;

IList<Subscription> subscriptions = null;
List<Subscription> subscriptions = null;
lock (SyncRoot)
{
subscriptions = new List<Subscription>(m_subscriptions);
Expand All @@ -402,6 +405,7 @@
{
Utils.SilentDispose(subscription);
}
subscriptions.Clear();
}

base.Dispose(disposing);
Expand Down Expand Up @@ -2416,7 +2420,8 @@
// save session id.
lock (SyncRoot)
{
SessionCreated(sessionId, sessionCookie);
// save session id and cookie in base
base.SessionCreated(sessionId, sessionCookie);
}

Utils.LogInfo("Revised session timeout value: {0}. ", m_sessionTimeout);
Expand Down Expand Up @@ -2527,6 +2532,9 @@

// raise event that session configuration chnaged.
IndicateSessionConfigurationChanged();

// notify session created callback, which was already set in base class only.
SessionCreated(sessionId, sessionCookie);
}
catch (Exception)
{
Expand Down Expand Up @@ -4857,8 +4865,8 @@
#endif
}

uint timeoutHint = (uint)((timeout > 0) ? timeout : 0);
timeoutHint = Math.Min((uint)OperationTimeout / 2, timeoutHint);
uint timeoutHint = (uint)((timeout > 0) ? (uint)timeout : uint.MaxValue);
timeoutHint = Math.Min((uint)(OperationTimeout / 2), timeoutHint);

// send publish request.
var requestHeader = new RequestHeader {
Expand Down Expand Up @@ -4960,11 +4968,14 @@
out acknowledgeResults,
out acknowledgeDiagnosticInfos);

LogLevel logLevel = LogLevel.Warning;
foreach (StatusCode code in acknowledgeResults)
{
if (StatusCode.IsBad(code))
if (StatusCode.IsBad(code) && code != StatusCodes.BadSequenceNumberUnknown)
{
Utils.LogError("Error - Publish call finished. ResultCode={0}; SubscriptionId={1};", code.ToString(), subscriptionId);
Utils.Log(logLevel, "Publish Ack Response. ResultCode={0}; SubscriptionId={1}", code.ToString(), subscriptionId);
// only show the first error
logLevel = LogLevel.Trace;
}
}

Expand Down Expand Up @@ -5102,8 +5113,11 @@
}

/// <inheritdoc/>
public bool Republish(uint subscriptionId, uint sequenceNumber)
public bool Republish(uint subscriptionId, uint sequenceNumber, out ServiceResult error)
{
bool result = true;
error = ServiceResult.Good;

Check warning on line 5119 in Libraries/Opc.Ua.Client/Session.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/Session.cs#L5118-L5119

Added lines #L5118 - L5119 were not covered by tests

// send republish request.
RequestHeader requestHeader = new RequestHeader {
TimeoutHint = (uint)OperationTimeout,
Expand Down Expand Up @@ -5133,13 +5147,13 @@
null,
false,
notificationMessage);

return true;
}
catch (Exception e)
{
return ProcessRepublishResponseError(e, subscriptionId, sequenceNumber);
(result, error) = ProcessRepublishResponseError(e, subscriptionId, sequenceNumber);

Check warning on line 5153 in Libraries/Opc.Ua.Client/Session.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/Session.cs#L5153

Added line #L5153 was not covered by tests
}

return result;

Check warning on line 5156 in Libraries/Opc.Ua.Client/Session.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/Session.cs#L5156

Added line #L5156 was not covered by tests
}

/// <inheritdoc/>
Expand Down Expand Up @@ -5593,14 +5607,15 @@
/// <param name="e">The exception that occurred during the republish operation.</param>
/// <param name="subscriptionId">The subscription Id for which the republish was requested. </param>
/// <param name="sequenceNumber">The sequencenumber for which the republish was requested.</param>
private bool ProcessRepublishResponseError(Exception e, uint subscriptionId, uint sequenceNumber)
private (bool, ServiceResult) ProcessRepublishResponseError(Exception e, uint subscriptionId, uint sequenceNumber)
{

ServiceResult error = new ServiceResult(e);

bool result = true;
switch (error.StatusCode.Code)
{
case StatusCodes.BadSubscriptionIdInvalid:
case StatusCodes.BadMessageNotAvailable:
Utils.LogWarning("Message {0}-{1} no longer available.", subscriptionId, sequenceNumber);
break;
Expand All @@ -5617,7 +5632,7 @@

default:
result = false;
Utils.LogError(e, "Unexpected error sending republish request.");
Utils.LogError("Unexpected error {0} sending republish request.", error);

Check warning on line 5635 in Libraries/Opc.Ua.Client/Session.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/Session.cs#L5635

Added line #L5635 was not covered by tests
break;
}

Expand All @@ -5641,7 +5656,7 @@
}
}

return result;
return (result, error);

Check warning on line 5659 in Libraries/Opc.Ua.Client/Session.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/Session.cs#L5659

Added line #L5659 was not covered by tests
}

/// <summary>
Expand Down Expand Up @@ -5716,6 +5731,7 @@
_ = availableSequenceNumbers?.Remove(notificationMessage.SequenceNumber);
}

// match an acknowledgement to be sent back to the server.
for (int ii = 0; ii < m_acknowledgementsToSend.Count; ii++)
{
SubscriptionAcknowledgement acknowledgement = m_acknowledgementsToSend[ii];
Expand All @@ -5724,26 +5740,33 @@
{
acknowledgementsToSend.Add(acknowledgement);
}
else if (availableSequenceNumbers == null || availableSequenceNumbers.Remove(acknowledgement.SequenceNumber))
else if (availableSequenceNumbers == null ||
availableSequenceNumbers.Remove(acknowledgement.SequenceNumber))
{
acknowledgementsToSend.Add(acknowledgement);
UpdateLatestSequenceNumberToSend(ref latestSequenceNumberToSend, acknowledgement.SequenceNumber);
}
// a publish response may by processed out of order,
// allow for a tolerance until the sequence number is removed.
else if (Math.Abs((int)(acknowledgement.SequenceNumber - latestSequenceNumberToSend)) < kPublishRequestSequenceNumberOutOfOrderThreshold)
{
acknowledgementsToSend.Add(acknowledgement);
}
else
{
Utils.LogWarning("SessionId {0}, SubscriptionId {1}, Sequence number={2} was not received in the available sequence numbers.", SessionId, subscriptionId, acknowledgement.SequenceNumber);
}
}

// Check for sleeping sequence numbers. May have been not acked due to a network glitch.
// Check for outdated sequence numbers. May have been not acked due to a network glitch.
if (latestSequenceNumberToSend != 0 && availableSequenceNumbers?.Count > 0)
{
foreach (var sequenceNumber in availableSequenceNumbers)
{
if ((int)(latestSequenceNumberToSend - sequenceNumber) > 100)
if ((int)(latestSequenceNumberToSend - sequenceNumber) > kPublishRequestSequenceNumberOutdatedThreshold)
{
AddAcknowledgementToSend(acknowledgementsToSend, subscriptionId, sequenceNumber);
Utils.LogWarning("SessionId {0}, SubscriptionId {1}, Sequence number={2} was sleeping, acknowledged.", SessionId, subscriptionId, sequenceNumber);
Utils.LogWarning("SessionId {0}, SubscriptionId {1}, Sequence number={2} was outdated, acknowledged.", SessionId, subscriptionId, sequenceNumber);

Check warning on line 5769 in Libraries/Opc.Ua.Client/Session.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Client/Session.cs#L5769

Added line #L5769 was not covered by tests
}
}
}
Expand Down
12 changes: 8 additions & 4 deletions Libraries/Opc.Ua.Client/SessionAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,8 @@ public async Task OpenAsync(
// save session id.
lock (SyncRoot)
{
SessionCreated(sessionId, sessionCookie);
// save session id and cookie in base
base.SessionCreated(sessionId, sessionCookie);
}

Utils.LogInfo("Revised session timeout value: {0}. ", m_sessionTimeout);
Expand Down Expand Up @@ -294,6 +295,9 @@ public async Task OpenAsync(

// raise event that session configuration chnaged.
IndicateSessionConfigurationChanged();

// call session created callback, which was already set in base class only.
SessionCreated(sessionId, sessionCookie);
}
catch (Exception)
{
Expand Down Expand Up @@ -1498,7 +1502,7 @@ private async Task ReconnectAsync(ITransportWaitingConnection connection, ITrans

try
{
_ = await operation.EndAsync(kReconnectTimeout / 2, true, ct).ConfigureAwait(false);
_ = await operation.EndAsync(kReconnectTimeout, true, ct).ConfigureAwait(false);
}
catch (ServiceResultException)
{
Expand Down Expand Up @@ -1547,7 +1551,7 @@ private async Task ReconnectAsync(ITransportWaitingConnection connection, ITrans
}

/// <inheritdoc/>
public async Task<bool> RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct)
public async Task<(bool, ServiceResult)> RepublishAsync(uint subscriptionId, uint sequenceNumber, CancellationToken ct)
{
// send republish request.
RequestHeader requestHeader = new RequestHeader {
Expand Down Expand Up @@ -1579,7 +1583,7 @@ public async Task<bool> RepublishAsync(uint subscriptionId, uint sequenceNumber,
false,
notificationMessage);

return true;
return (true, ServiceResult.Good);
}
catch (Exception e)
{
Expand Down
Loading
Loading