Skip to content

Commit

Permalink
fix: Merge Watch & Local events. Switch by uid & type grouping. (#616)
Browse files Browse the repository at this point in the history
Watcher and Local events are now merged to produce a sequence containing
all captured events instead of just the sequence that was last produced.
Events will still be switch but only at group level, grouping by
resource Uid and sub-grouping by event type, therefore Keeping always
last event for each type of each resources, preventing events
accumulation.

Fixes #585 #579
  • Loading branch information
Crespalvaro authored Sep 29, 2023
1 parent 1602b1c commit d6031c6
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 8 deletions.
2 changes: 2 additions & 0 deletions src/KubeOps/Operator/Caching/IResourceCache.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ internal interface IResourceCache<TEntity>

TEntity Upsert(TEntity resource, out CacheComparisonResult result);

bool Exists(TEntity resource);

void Fill(IEnumerable<TEntity> resources);

void Remove(TEntity resource);
Expand Down
4 changes: 2 additions & 2 deletions src/KubeOps/Operator/Caching/ResourceCache{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ public void Clear()
_metrics.CachedItemsSummary.Observe(_cache.Count);
}

public bool Exists(TEntity resource) => _cache.ContainsKey(resource.Metadata.Uid);

private CacheComparisonResult CompareCache(TEntity resource)
{
if (!Exists(resource))
Expand Down Expand Up @@ -101,8 +103,6 @@ private CacheComparisonResult CompareCache(TEntity resource)
return CacheComparisonResult.Other;
}

private bool Exists(TEntity resource) => _cache.ContainsKey(resource.Metadata.Uid);

private void Remove(string resourceUid)
{
_cache.TryRemove(resourceUid, out _);
Expand Down
6 changes: 4 additions & 2 deletions src/KubeOps/Operator/Controller/EventQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,10 @@ public EventQueue(
.GroupBy(e => e.Resource.Uid())
.Select(
group => group
.Select(ProcessDelay)
.Switch())
.GroupBy(e => e.Type)
.Select(typedGroup => typedGroup
.Select(ProcessDelay).Switch())
.Merge())
.Merge()
.Select(UpdateResourceData)
.Merge()
Expand Down
14 changes: 14 additions & 0 deletions src/KubeOps/Operator/Kubernetes/ResourceWatcher{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
using k8s.Models;

using KubeOps.KubernetesClient;
using KubeOps.Operator.Caching;
using KubeOps.Operator.DevOps;

namespace KubeOps.Operator.Kubernetes;
Expand All @@ -21,6 +22,7 @@ internal class ResourceWatcher<TEntity> : IDisposable, IResourceWatcher<TEntity>
private readonly IKubernetesClient _client;
private readonly ILogger<ResourceWatcher<TEntity>> _logger;
private readonly IResourceWatcherMetrics<TEntity> _metrics;
private readonly IResourceCache<TEntity> _resourceCache;
private readonly OperatorSettings _settings;
private readonly Subject<TimeSpan> _reconnectHandler = new();
private readonly IDisposable _reconnectSubscription;
Expand All @@ -34,11 +36,13 @@ public ResourceWatcher(
IKubernetesClient client,
ILogger<ResourceWatcher<TEntity>> logger,
IResourceWatcherMetrics<TEntity> metrics,
IResourceCache<TEntity> resourceCache,
OperatorSettings settings)
{
_client = client;
_logger = logger;
_metrics = metrics;
_resourceCache = resourceCache;
_settings = settings;
_reconnectSubscription =
_reconnectHandler
Expand Down Expand Up @@ -131,6 +135,16 @@ private void OnWatcherEvent(WatchEventType type, TEntity resource)

_metrics.WatchedEvents.Inc();

if (_resourceCache.Exists(resource) && type == WatchEventType.Added)
{
_logger.LogTrace(
@"The resource ""{kind}/{name}"" binded to the watcher event already exist in cache. Skipping ""{watchEvent}"" event",
resource.Kind,
resource.Name(),
type);
return;
}

switch (type)
{
case WatchEventType.Added:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,11 @@

using KubeOps.KubernetesClient;
using KubeOps.Operator;
using KubeOps.Operator.Caching;
using KubeOps.Operator.DevOps;
using KubeOps.Operator.Kubernetes;
using KubeOps.Test.TestEntities;
using KubeOps.TestOperator.Entities;

using Microsoft.Extensions.Logging.Abstractions;
using Microsoft.Reactive.Testing;
Expand All @@ -35,6 +38,10 @@ public class TestResource : IKubernetesObject<V1ObjectMeta>

private readonly Mock<IKubernetesClient> _client = new();
private readonly Mock<IResourceWatcherMetrics<TestResource>> _metrics = new();
private readonly Mock<IResourceCache<TestResource>> _resourceCacheMock;

public ResourceWatcherTest() =>
_resourceCacheMock = new Mock<IResourceCache<TestResource>>(MockBehavior.Strict);

[Fact]
public async Task Should_Restart_Watcher_On_Exception()
Expand All @@ -50,6 +57,7 @@ public async Task Should_Restart_Watcher_On_Exception()
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

var testScheduler = new TestScheduler();
Expand Down Expand Up @@ -105,6 +113,7 @@ public async Task Should_Not_Throw_Overflow_Exception()
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

var testScheduler = new TestScheduler();
Expand Down Expand Up @@ -139,7 +148,12 @@ public async Task Should_Not_Dispose_Reconnect_Subject_Or_Throw_Exception_After_
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());

using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
using var resourceWatcher = new ResourceWatcher<TestResource>(
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

await resourceWatcher.StartAsync();

Expand All @@ -166,7 +180,12 @@ public async Task Should_Not_Restart_On_Serialization_Exception()
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
_metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of<ICounter>());

using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
using var resourceWatcher = new ResourceWatcher<TestResource>(
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

await resourceWatcher.StartAsync();

Expand All @@ -188,7 +207,12 @@ public async Task Should_Be_Restarted_After_TaskCanceledException_IOException()

SetupResourceWatcherMetrics();

using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
using var resourceWatcher = new ResourceWatcher<TestResource>(
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

await resourceWatcher.StartAsync();

Expand All @@ -212,6 +236,9 @@ public async Task Should_Publish_On_Watcher_Event()

Action<WatchEventType, TestResource> onWatcherEvent = null!;

_resourceCacheMock.Setup(c => c.Exists(It.IsAny<TestResource>()))
.Returns(false);

_client.Setup(
c => c.Watch(
It.IsAny<TimeSpan>(),
Expand All @@ -234,6 +261,7 @@ public async Task Should_Publish_On_Watcher_Event()
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

var watchEvents = resourceWatcher.WatchEvents.Replay(1);
Expand Down Expand Up @@ -273,7 +301,12 @@ public async Task Should_Restart_Watcher_On_Close()
_metrics.Setup(c => c.Running).Returns(Mock.Of<IGauge>());
_metrics.Setup(c => c.WatcherClosed).Returns(Mock.Of<ICounter>());

using var resourceWatcher = new ResourceWatcher<TestResource>(_client.Object, new NullLogger<ResourceWatcher<TestResource>>(), _metrics.Object, settings);
using var resourceWatcher = new ResourceWatcher<TestResource>(
_client.Object,
new NullLogger<ResourceWatcher<TestResource>>(),
_metrics.Object,
_resourceCacheMock.Object,
settings);

await resourceWatcher.StartAsync();

Expand Down

0 comments on commit d6031c6

Please sign in to comment.