Skip to content

Commit

Permalink
feat(operator): add better error handling and reconnection logic
Browse files (browse the repository at this point in the history)
  • Loading branch information
Christoph Bühler committed Oct 6, 2023
1 parent 003950a commit 62db9ad
Show file tree
Hide file tree
Showing 8 changed files with 27 additions and 131 deletions.
28 changes: 0 additions & 28 deletions _old/src/KubeOps/Operator/Errors/BackoffStrategies.cs

This file was deleted.

18 changes: 0 additions & 18 deletions _old/src/KubeOps/Operator/Errors/CrdConversionException.cs

This file was deleted.

19 changes: 0 additions & 19 deletions _old/src/KubeOps/Operator/Errors/CrdPropertyTypeException.cs

This file was deleted.

31 changes: 0 additions & 31 deletions examples/Operator/Controller/V1SecondEntityController.cs

This file was deleted.

11 changes: 0 additions & 11 deletions examples/Operator/Entities/V1SecondEntity.cs

This file was deleted.

13 changes: 0 additions & 13 deletions examples/Operator/Finalizer/FinalizerTwo.cs

This file was deleted.

1 change: 0 additions & 1 deletion examples/Operator/todos.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
todo:
- error handling
- web: webhooks
- docs
37 changes: 27 additions & 10 deletions src/KubeOps.Operator/Watcher/ResourceWatcher{TEntity}.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ internal class ResourceWatcher<TEntity> : IHostedService
private readonly ConcurrentDictionary<string, long> _entityCache = new();
private readonly Lazy<List<FinalizerRegistration>> _finalizers;
private bool _stopped;
private uint _watcherReconnectRetries;

private Watcher<TEntity>? _watcher;

Expand Down Expand Up @@ -81,21 +82,14 @@ private void WatchResource()
}
}

_logger.LogDebug("""Create watcher for entity of type "{type}".""", typeof(TEntity));
_watcher = _client.Watch(OnEvent, OnError, OnClosed, @namespace: _settings.Namespace);
}

// Disposes the current watcher instance, if one exists (null-conditional call on _watcher).
private void StopWatching()
{
_watcher?.Dispose();
}

// Handler for the "closed" notification of the watch; it is handed to _client.Watch as the close callback.
// NOTE(review): this text comes from a rendered diff without +/- markers, so removed and added
// lines may be interleaved in this method, and another OnClosed definition appears further down.
// Confirm which lines belong to the current version of the file.
private void OnClosed()
{
_logger.LogDebug("The server closed the connection.");
// Call WatchResource again unless the _stopped flag is set.
if (!_stopped)
{
WatchResource();
}
// Clear the stored watcher reference.
_watcher = null;
}

private async void OnEntityRequeue(object? sender, (string Name, string? Namespace) queued)
Expand All @@ -116,7 +110,7 @@ private async void OnEntityRequeue(object? sender, (string Name, string? Namespa
await ReconcileModification(entity);
}

private void OnError(Exception e)
private async void OnError(Exception e)
{
switch (e)
{
Expand All @@ -138,11 +132,34 @@ e.InnerException is EndOfStreamException &&
}

_logger.LogError(e, """There was an error while watching the resource "{resource}".""", typeof(TEntity));
StopWatching();
_watcherReconnectRetries++;

var delay = TimeSpan
.FromSeconds(Math.Pow(2, Math.Clamp(_watcherReconnectRetries, 0, 5)))
.Add(TimeSpan.FromMilliseconds(new Random().Next(0, 1000)));
_logger.LogWarning(
"There were {retries} errors / retries in the watcher. Wait {seconds}s before next attempt to connect.",
_watcherReconnectRetries,
delay.TotalSeconds);
await Task.Delay(delay);

WatchResource();
}

/// <summary>
/// Close callback for the watcher. Logs the closure and calls
/// <c>WatchResource</c> again, but only when the watcher has not been
/// stopped and no reconnect retries are currently counted.
/// </summary>
private void OnClosed()
{
    _logger.LogDebug("The watcher was closed.");

    // A non-zero retry counter means the error handler has recorded failures;
    // it calls WatchResource itself after its delay, so do not start a second watch here.
    if (_stopped || _watcherReconnectRetries != 0)
    {
        return;
    }

    WatchResource();
}

private async void OnEvent(WatchEventType type, TEntity entity)
{
_watcherReconnectRetries = 0;

_logger.LogTrace(
"""Received watch event "{eventType}" for "{kind}/{name}".""",
type,
Expand Down

0 comments on commit 62db9ad

Please sign in to comment.