Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Server] Improve publishing operation to avoid discarding values in edge cases #2763

Draft
wants to merge 10 commits into
base: develop/main374
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ private void IncrementSampleTime()
/// <summary>
/// Called by the subscription to publish any notification.
/// </summary>
public bool Publish(OperationContext context, Queue<MonitoredItemNotification> notifications, Queue<DiagnosticInfo> diagnostics)
public bool Publish(OperationContext context, Queue<MonitoredItemNotification> notifications, Queue<DiagnosticInfo> diagnostics, uint maxNotificationsPerPublish)
{
lock (m_lock)
{
Expand All @@ -715,23 +715,22 @@ public bool Publish(OperationContext context, Queue<MonitoredItemNotification> n
IncrementSampleTime();
}

// update publish flag.
m_readyToPublish = false;
m_readyToTrigger = false;

// check if queuing is enabled.
if (m_queue != null && (!m_resendData || m_queue.ItemsInQueue != 0))
{
DataValue value = null;
ServiceResult error = null;

while (m_queue.Publish(out value, out error))
uint notificationCount = 0;

while (notificationCount < maxNotificationsPerPublish && m_queue.Publish(out value, out error))
{
Publish(context, value, error, notifications, diagnostics);
notificationCount++;

if (m_resendData)
{
m_readyToPublish = m_queue.ItemsInQueue > 0;
break;
}
}
Expand All @@ -741,10 +740,14 @@ public bool Publish(OperationContext context, Queue<MonitoredItemNotification> n
Publish(context, m_lastValue, m_lastError, notifications, diagnostics);
}

bool moreValuesToPublish = m_queue?.ItemsInQueue > 0;

// update flags
m_readyToPublish = moreValuesToPublish;
m_readyToTrigger = moreValuesToPublish;
m_resendData = false;

return true;
return moreValuesToPublish;
}
}

Expand Down
5 changes: 3 additions & 2 deletions Libraries/Opc.Ua.Server/Subscription/IMonitoredItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ public interface IEventMonitoredItem : IMonitoredItem
/// Publishes all available event notifications.
/// </summary>
/// <returns>True if the caller should re-queue the item for publishing after the next interval elaspses.</returns>
bool Publish(OperationContext context, Queue<EventFieldList> notifications);
bool Publish(OperationContext context, Queue<EventFieldList> notifications, uint maxNotificationsPerPublish);

/// <summary>
/// Modifies the attributes for monitored item.
Expand Down Expand Up @@ -212,7 +212,8 @@ public interface IDataChangeMonitoredItem : IMonitoredItem
bool Publish(
OperationContext context,
Queue<MonitoredItemNotification> notifications,
Queue<DiagnosticInfo> diagnostics);
Queue<DiagnosticInfo> diagnostics,
uint maxNotificationsPerPublish);
}

/// <summary>
Expand Down
76 changes: 45 additions & 31 deletions Libraries/Opc.Ua.Server/Subscription/MonitoredItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Threading;
using System.Xml;

namespace Opc.Ua.Server
Expand Down Expand Up @@ -1127,7 +1126,7 @@ private void IncrementSampleTime()
/// <summary>
/// Publishes all available event notifications.
/// </summary>
public virtual bool Publish(OperationContext context, Queue<EventFieldList> notifications)
public virtual bool Publish(OperationContext context, Queue<EventFieldList> notifications, uint maxNotificationsPerPublish)
{
if (context == null) throw new ArgumentNullException(nameof(context));
if (notifications == null) throw new ArgumentNullException(nameof(notifications));
Expand All @@ -1149,6 +1148,7 @@ public virtual bool Publish(OperationContext context, Queue<EventFieldList> noti
// go to the next sampling interval.
IncrementSampleTime();

bool moreValuesToPublish = false;
// publish events.
if (m_events != null)
{
Expand Down Expand Up @@ -1179,55 +1179,67 @@ public virtual bool Publish(OperationContext context, Queue<EventFieldList> noti

// fetch the event fields.
overflowEvent = GetEventFields(
new FilterContext(m_server.NamespaceUris, m_server.TypeTree, Session.PreferredLocales),
new FilterContext(m_server.NamespaceUris, m_server.TypeTree, Session?.PreferredLocales),
m_filterToUse as EventFilter,
e);
}


int notificationCount = 0;

// place event at the beginning of the queue.
if (overflowEvent != null && m_discardOldest)
{
notifications.Enqueue(overflowEvent);
notificationCount++;
}

for (int ii = 0; ii < m_events.Count; ii++)
int eventsToRemove = 0;
foreach (EventFieldList fields in m_events)
{
EventFieldList fields = (EventFieldList)m_events[ii];

//stop publishing if maxNotificationsPerPublish is reached
if (notificationCount >= maxNotificationsPerPublish)
romanett marked this conversation as resolved.
Show resolved Hide resolved
{
break;
}
// apply any diagnostic masks.
for (int jj = 0; jj < fields.EventFields.Count; jj++)
{
object value = fields.EventFields[jj].Value;

StatusResult result = value as StatusResult;

if (result != null)
{
result.ApplyDiagnosticMasks(context.DiagnosticsMask, context.StringTable);
}
(fields.EventFields[jj].Value as StatusResult)?.ApplyDiagnosticMasks(context.DiagnosticsMask, context.StringTable);
}

notifications.Enqueue(m_events[ii]);
notifications.Enqueue(fields);
notificationCount++;
eventsToRemove++;
}

m_events.Clear();
m_events.RemoveRange(0, eventsToRemove);

moreValuesToPublish = m_events?.Count > 0;

// place event at the end of the queue.
// place overflow event at the end of the queue if there is still space in the publish.
if (overflowEvent != null && !m_discardOldest)
romanett marked this conversation as resolved.
Show resolved Hide resolved
{
notifications.Enqueue(overflowEvent);
if (notificationCount < maxNotificationsPerPublish)
{
notifications.Enqueue(overflowEvent);
}
else
{
moreValuesToPublish = true;
}
}

Utils.LogTrace(Utils.TraceMasks.OperationDetail, "MONITORED ITEM: Publish(QueueSize={0})", notifications.Count);
}

// reset state variables.
m_overflow = false;
m_readyToPublish = false;
m_readyToTrigger = false;
m_overflow = m_overflow && moreValuesToPublish && !m_discardOldest;
m_readyToPublish = moreValuesToPublish;
m_readyToTrigger = moreValuesToPublish;
m_triggered = false;

return false;
return moreValuesToPublish;
}
}

Expand All @@ -1237,7 +1249,8 @@ public virtual bool Publish(OperationContext context, Queue<EventFieldList> noti
public virtual bool Publish(
OperationContext context,
Queue<MonitoredItemNotification> notifications,
Queue<DiagnosticInfo> diagnostics)
Queue<DiagnosticInfo> diagnostics,
uint maxNotificationsPerPublish)
{
if (context == null) throw new ArgumentNullException(nameof(context));
if (notifications == null) throw new ArgumentNullException(nameof(notifications));
Expand Down Expand Up @@ -1279,21 +1292,19 @@ public virtual bool Publish(

IncrementSampleTime();
}

m_readyToPublish = false;

// check if queueing enabled.
if (m_queue != null && (!m_resendData || m_queue.ItemsInQueue != 0))
{
DataValue value = null;
ServiceResult error = null;
uint notificationCount = 0;

while (m_queue.Publish(out value, out error))
while (notificationCount < maxNotificationsPerPublish && m_queue.Publish(out value, out error))
{
Publish(context, notifications, diagnostics, value, error);
notificationCount++;
if (m_resendData)
{
m_readyToPublish = m_queue.ItemsInQueue > 0;
break;
}
}
Expand All @@ -1306,13 +1317,16 @@ public virtual bool Publish(
Publish(context, notifications, diagnostics, m_lastValue, m_lastError);
}

// reset state variables.
bool moreValuesToPublish = m_queue?.ItemsInQueue > 0;

// reset state variables.
m_overflow = false;
m_readyToTrigger = false;
m_readyToPublish = moreValuesToPublish;
m_readyToTrigger = moreValuesToPublish;
m_resendData = false;
m_triggered = false;

return false;
return moreValuesToPublish;
}
}

Expand Down
27 changes: 22 additions & 5 deletions Libraries/Opc.Ua.Server/Subscription/Subscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -847,23 +847,33 @@
// check for monitored items that are ready to publish.
LinkedListNode<IMonitoredItem> current = m_itemsToPublish.First;

//Limit the amount of values a monitored item publishes at once
uint maxNotificationsPerMonitoredItem = m_maxNotificationsPerPublish == 0 ? uint.MaxValue : m_maxNotificationsPerPublish * 3;

while (current != null)
{
LinkedListNode<IMonitoredItem> next = current.Next;
IMonitoredItem monitoredItem = current.Value;
bool hasMoreValuesToPublish;

if ((monitoredItem.MonitoredItemType & MonitoredItemTypeMask.DataChange) != 0)
{
((IDataChangeMonitoredItem)monitoredItem).Publish(context, datachanges, datachangeDiagnostics);
hasMoreValuesToPublish = ((IDataChangeMonitoredItem)monitoredItem).Publish(context, datachanges, datachangeDiagnostics, maxNotificationsPerMonitoredItem);
}
else
{
((IEventMonitoredItem)monitoredItem).Publish(context, events);
hasMoreValuesToPublish = ((IEventMonitoredItem)monitoredItem).Publish(context, events, maxNotificationsPerMonitoredItem);

Check warning on line 865 in Libraries/Opc.Ua.Server/Subscription/Subscription.cs

View check run for this annotation

Codecov / codecov/patch

Libraries/Opc.Ua.Server/Subscription/Subscription.cs#L865

Added line #L865 was not covered by tests
}

// add back to list to check.
m_itemsToPublish.Remove(current);
m_itemsToCheck.AddLast(current);
// if item has more values to publish leave it at the front of the list
// to execute publish in next cycle, no checking needed
// if no more values to publish are left add it to m_itemsToCheck
// to check status on next publish cylce
if (!hasMoreValuesToPublish)
{
m_itemsToPublish.Remove(current);
romanett marked this conversation as resolved.
Show resolved Hide resolved
m_itemsToCheck.AddLast(current);
}

// check there are enough notifications for a message.
if (m_maxNotificationsPerPublish > 0 && events.Count + datachanges.Count > m_maxNotificationsPerPublish)
Expand All @@ -888,6 +898,13 @@
m_diagnostics.EventNotificationsCount += (uint)(eventCount - events.Count);
m_diagnostics.NotificationsCount += (uint)notificationCount;
}

//stop fetching messages from MIs when message queue is full to avoid discards
// use m_maxMessageCount - 2 to put remaining values into the last allowed message (each MI is allowed to publish 3 up to messages at once)
if (messages.Count >= m_maxMessageCount - 2)
{
break;
}
}

current = next;
Expand Down
47 changes: 47 additions & 0 deletions Tests/Opc.Ua.Server.Tests/CommonTestWorkers.cs
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,10 @@ public static void SubscriptionTest(
QueueSize = queueSize
}
});

//add event item
itemsToCreate.Add(CreateEventMonitoredItem(queueSize, ref handleCounter));

response = services.CreateMonitoredItems(requestHeader, id, TimestampsToReturn.Neither, itemsToCreate,
out MonitoredItemCreateResultCollection itemCreateResults, out DiagnosticInfoCollection diagnosticInfos);
ServerFixtureUtils.ValidateResponse(response, itemCreateResults, itemsToCreate);
Expand Down Expand Up @@ -682,6 +686,49 @@ int samplingInterval
ServerFixtureUtils.ValidateResponse(response, itemCreateResults, itemsToCreate);
ServerFixtureUtils.ValidateDiagnosticInfos(diagnosticInfos, itemsToCreate, response.StringTable);
}

private static MonitoredItemCreateRequest CreateEventMonitoredItem(uint queueSize, ref uint handleCounter)
{
var whereClause = new ContentFilter();

whereClause.Push(FilterOperator.Equals, new FilterOperand[] {
new SimpleAttributeOperand() {
AttributeId = Attributes.Value,
TypeDefinitionId = ObjectTypeIds.BaseEventType,
BrowsePath = new QualifiedNameCollection(new QualifiedName[] { "EventType" })
},
new LiteralOperand {
Value = new Variant(new NodeId(ObjectTypeIds.BaseEventType))
}
});

var mi = new MonitoredItemCreateRequest() {
ItemToMonitor = new ReadValueId() {
AttributeId = Attributes.EventNotifier,
NodeId = ObjectIds.Server
},
MonitoringMode = MonitoringMode.Reporting,
RequestedParameters = new MonitoringParameters() {
ClientHandle = ++handleCounter,
SamplingInterval = -1,
Filter = new ExtensionObject(
new EventFilter {
SelectClauses = new SimpleAttributeOperandCollection(
new SimpleAttributeOperand[] {
new SimpleAttributeOperand{
AttributeId = Attributes.Value,
TypeDefinitionId = ObjectTypeIds.BaseEventType,
BrowsePath = new QualifiedNameCollection(new QualifiedName[] { BrowseNames.Message})
}
}),
WhereClause = whereClause,
}),
DiscardOldest = true,
QueueSize = queueSize
}
};
return mi;
}
#endregion

}
Expand Down
Loading
Loading