diff --git a/src/KubeOps/Operator/Caching/IResourceCache.cs b/src/KubeOps/Operator/Caching/IResourceCache.cs index 24a68345..59b10060 100644 --- a/src/KubeOps/Operator/Caching/IResourceCache.cs +++ b/src/KubeOps/Operator/Caching/IResourceCache.cs @@ -10,6 +10,8 @@ internal interface IResourceCache TEntity Upsert(TEntity resource, out CacheComparisonResult result); + bool Exists(TEntity resource); + void Fill(IEnumerable resources); void Remove(TEntity resource); diff --git a/src/KubeOps/Operator/Caching/ResourceCache{TEntity}.cs b/src/KubeOps/Operator/Caching/ResourceCache{TEntity}.cs index a52908d1..d823a543 100644 --- a/src/KubeOps/Operator/Caching/ResourceCache{TEntity}.cs +++ b/src/KubeOps/Operator/Caching/ResourceCache{TEntity}.cs @@ -74,6 +74,8 @@ public void Clear() _metrics.CachedItemsSummary.Observe(_cache.Count); } + public bool Exists(TEntity resource) => _cache.ContainsKey(resource.Metadata.Uid); + private CacheComparisonResult CompareCache(TEntity resource) { if (!Exists(resource)) @@ -101,8 +103,6 @@ private CacheComparisonResult CompareCache(TEntity resource) return CacheComparisonResult.Other; } - private bool Exists(TEntity resource) => _cache.ContainsKey(resource.Metadata.Uid); - private void Remove(string resourceUid) { _cache.TryRemove(resourceUid, out _); diff --git a/src/KubeOps/Operator/Controller/EventQueue.cs b/src/KubeOps/Operator/Controller/EventQueue.cs index 8582b0a5..e3e16689 100644 --- a/src/KubeOps/Operator/Controller/EventQueue.cs +++ b/src/KubeOps/Operator/Controller/EventQueue.cs @@ -49,8 +49,10 @@ public EventQueue( .GroupBy(e => e.Resource.Uid()) .Select( group => group - .Select(ProcessDelay) - .Switch()) + .GroupBy(e => e.Type) + .Select(typedGroup => typedGroup + .Select(ProcessDelay).Switch()) + .Merge()) .Merge() .Select(UpdateResourceData) .Merge() diff --git a/src/KubeOps/Operator/Kubernetes/ResourceWatcher{TEntity}.cs b/src/KubeOps/Operator/Kubernetes/ResourceWatcher{TEntity}.cs index d34ea15d..cc489887 100644 --- a/src/KubeOps/Operator/Kubernetes/ResourceWatcher{TEntity}.cs +++ b/src/KubeOps/Operator/Kubernetes/ResourceWatcher{TEntity}.cs @@ -8,6 +8,7 @@ using k8s.Models; using KubeOps.KubernetesClient; +using KubeOps.Operator.Caching; using KubeOps.Operator.DevOps; namespace KubeOps.Operator.Kubernetes; @@ -21,6 +22,7 @@ internal class ResourceWatcher : IDisposable, IResourceWatcher private readonly IKubernetesClient _client; private readonly ILogger> _logger; private readonly IResourceWatcherMetrics _metrics; + private readonly IResourceCache _resourceCache; private readonly OperatorSettings _settings; private readonly Subject _reconnectHandler = new(); private readonly IDisposable _reconnectSubscription; @@ -34,11 +36,13 @@ public ResourceWatcher( IKubernetesClient client, ILogger> logger, IResourceWatcherMetrics metrics, + IResourceCache resourceCache, OperatorSettings settings) { _client = client; _logger = logger; _metrics = metrics; + _resourceCache = resourceCache; _settings = settings; _reconnectSubscription = _reconnectHandler @@ -131,6 +135,16 @@ private void OnWatcherEvent(WatchEventType type, TEntity resource) _metrics.WatchedEvents.Inc(); + if (_resourceCache.Exists(resource) && type == WatchEventType.Added) + { + _logger.LogTrace( + @"The resource ""{kind}/{name}"" binded to the watcher event already exist in cache. Skipping ""{watchEvent}"" event", + resource.Kind, + resource.Name(), + type); + return; + } + switch (type) { case WatchEventType.Added: diff --git a/tests/KubeOps.Test/Operator/Kubernetes/ResourceWatcher{TEntity}.Test.cs b/tests/KubeOps.Test/Operator/Kubernetes/ResourceWatcher{TEntity}.Test.cs index ac791dee..4e221590 100644 --- a/tests/KubeOps.Test/Operator/Kubernetes/ResourceWatcher{TEntity}.Test.cs +++ b/tests/KubeOps.Test/Operator/Kubernetes/ResourceWatcher{TEntity}.Test.cs @@ -9,8 +9,11 @@ using KubeOps.KubernetesClient; using KubeOps.Operator; +using KubeOps.Operator.Caching; using KubeOps.Operator.DevOps; using KubeOps.Operator.Kubernetes; +using KubeOps.Test.TestEntities; +using KubeOps.TestOperator.Entities; using Microsoft.Extensions.Logging.Abstractions; using Microsoft.Reactive.Testing; @@ -35,6 +38,10 @@ public class TestResource : IKubernetesObject private readonly Mock _client = new(); private readonly Mock> _metrics = new(); + private readonly Mock> _resourceCacheMock; + + public ResourceWatcherTest() => + _resourceCacheMock = new Mock>(MockBehavior.Strict); [Fact] public async Task Should_Restart_Watcher_On_Exception() @@ -50,6 +57,7 @@ public async Task Should_Restart_Watcher_On_Exception() _client.Object, new NullLogger>(), _metrics.Object, + _resourceCacheMock.Object, settings); var testScheduler = new TestScheduler(); @@ -105,6 +113,7 @@ public async Task Should_Not_Throw_Overflow_Exception() _client.Object, new NullLogger>(), _metrics.Object, + _resourceCacheMock.Object, settings); var testScheduler = new TestScheduler(); @@ -139,7 +148,12 @@ public async Task Should_Not_Dispose_Reconnect_Subject_Or_Throw_Exception_After_ _metrics.Setup(c => c.Running).Returns(Mock.Of()); _metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of()); - using var resourceWatcher = new ResourceWatcher(_client.Object, new NullLogger>(), _metrics.Object, settings); + using var resourceWatcher = new ResourceWatcher( + _client.Object, + new NullLogger>(), + _metrics.Object, + _resourceCacheMock.Object, + settings); await resourceWatcher.StartAsync(); @@ -166,7 +180,12 @@ public async Task Should_Not_Restart_On_Serialization_Exception() _metrics.Setup(c => c.Running).Returns(Mock.Of()); _metrics.Setup(c => c.WatcherExceptions).Returns(Mock.Of()); - using var resourceWatcher = new ResourceWatcher(_client.Object, new NullLogger>(), _metrics.Object, settings); + using var resourceWatcher = new ResourceWatcher( + _client.Object, + new NullLogger>(), + _metrics.Object, + _resourceCacheMock.Object, + settings); await resourceWatcher.StartAsync(); @@ -188,7 +207,12 @@ public async Task Should_Be_Restarted_After_TaskCanceledException_IOException() SetupResourceWatcherMetrics(); - using var resourceWatcher = new ResourceWatcher(_client.Object, new NullLogger>(), _metrics.Object, settings); + using var resourceWatcher = new ResourceWatcher( + _client.Object, + new NullLogger>(), + _metrics.Object, + _resourceCacheMock.Object, + settings); await resourceWatcher.StartAsync(); @@ -212,6 +236,9 @@ public async Task Should_Publish_On_Watcher_Event() Action onWatcherEvent = null!; + _resourceCacheMock.Setup(c => c.Exists(It.IsAny())) + .Returns(false); + _client.Setup( c => c.Watch( It.IsAny(), @@ -234,6 +261,7 @@ public async Task Should_Publish_On_Watcher_Event() _client.Object, new NullLogger>(), _metrics.Object, + _resourceCacheMock.Object, settings); var watchEvents = resourceWatcher.WatchEvents.Replay(1); @@ -273,7 +301,12 @@ public async Task Should_Restart_Watcher_On_Close() _metrics.Setup(c => c.Running).Returns(Mock.Of()); _metrics.Setup(c => c.WatcherClosed).Returns(Mock.Of()); - using var resourceWatcher = new ResourceWatcher(_client.Object, new NullLogger>(), _metrics.Object, settings); + using var resourceWatcher = new ResourceWatcher( + _client.Object, + new NullLogger>(), + _metrics.Object, + _resourceCacheMock.Object, + settings); await resourceWatcher.StartAsync();