Skip to content

Commit

Permalink
reconciler: fix deletion wait and updater retry logic
Browse files Browse the repository at this point in the history
Signed-off-by: Joe Lanford <[email protected]>
  • Loading branch information
joelanford committed Aug 22, 2024
1 parent 016690c commit b6d580d
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 42 deletions.
47 changes: 23 additions & 24 deletions pkg/reconciler/internal/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,41 +84,40 @@ func (u *Updater) Apply(ctx context.Context, obj *unstructured.Unstructured) err

backoff := retry.DefaultRetry

st := statusFor(obj)
needsStatusUpdate := false
for _, f := range u.updateStatusFuncs {
needsStatusUpdate = f(st) || needsStatusUpdate
}

// Always update the status first. During uninstall, if
// we remove the finalizer, updating the status will fail
// because the object and its status will be garbage-collected.
if err := retryOnRetryableUpdateError(backoff, func() error {
st := statusFor(obj)
needsStatusUpdate := false
for _, f := range u.updateStatusFuncs {
needsStatusUpdate = f(st) || needsStatusUpdate
if needsStatusUpdate {
uSt, err := runtime.DefaultUnstructuredConverter.ToUnstructured(st)
if err != nil {
return err
}
if needsStatusUpdate {
uSt, err := runtime.DefaultUnstructuredConverter.ToUnstructured(st)
if err != nil {
return err
}
obj.Object["status"] = uSt
obj.Object["status"] = uSt

if err := retryOnRetryableUpdateError(backoff, func() error {
return u.client.Status().Update(ctx, obj)
}); err != nil {
return err
}
return nil
}); err != nil {
return err
}

if err := retryOnRetryableUpdateError(backoff, func() error {
needsUpdate := false
for _, f := range u.updateFuncs {
needsUpdate = f(obj) || needsUpdate
}
if needsUpdate {
needsUpdate := false
for _, f := range u.updateFuncs {
needsUpdate = f(obj) || needsUpdate
}
if needsUpdate {
if err := retryOnRetryableUpdateError(backoff, func() error {
return u.client.Update(ctx, obj)
}); err != nil {
return err
}
return nil
}); err != nil {
return err
}

return nil
}

Expand Down
36 changes: 26 additions & 10 deletions pkg/reconciler/internal/updater/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@ package updater

import (
"context"
"errors"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"helm.sh/helm/v3/pkg/release"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

"github.com/operator-framework/helm-operator-plugins/pkg/reconciler/internal/conditions"
)
Expand All @@ -36,14 +39,15 @@ const testFinalizer = "testFinalizer"

var _ = Describe("Updater", func() {
var (
client client.Client
u Updater
obj *unstructured.Unstructured
cl client.Client
u Updater
obj *unstructured.Unstructured
interceptorFuncs interceptor.Funcs
)

BeforeEach(func() {
client = fake.NewClientBuilder().Build()
u = New(client)
JustBeforeEach(func() {
cl = fake.NewClientBuilder().WithInterceptorFuncs(interceptorFuncs).Build()
u = New(cl)
obj = &unstructured.Unstructured{Object: map[string]interface{}{
"apiVersion": "apps/v1",
"kind": "Deployment",
Expand All @@ -53,12 +57,12 @@ var _ = Describe("Updater", func() {
},
"spec": map[string]interface{}{},
}}
Expect(client.Create(context.TODO(), obj)).To(Succeed())
Expect(cl.Create(context.TODO(), obj)).To(Succeed())
})

When("the object does not exist", func() {
It("should fail", func() {
Expect(client.Delete(context.TODO(), obj)).To(Succeed())
Expect(cl.Delete(context.TODO(), obj)).To(Succeed())
u.Update(func(u *unstructured.Unstructured) bool {
u.SetAnnotations(map[string]string{"foo": "bar"})
return true
Expand All @@ -70,6 +74,18 @@ var _ = Describe("Updater", func() {
})

When("an update is a change", func() {
var updateCallCount int

BeforeEach(func() {
// On the first update of (status) subresource, return an error. After that do what is expected.
interceptorFuncs.SubResourceUpdate = func(ctx context.Context, interceptorClient client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error {
updateCallCount++
if updateCallCount == 1 {
return errors.New("transient error")
}
return interceptorClient.SubResource(subResourceName).Update(ctx, obj, opts...)
}
})
It("should apply an update function", func() {
u.Update(func(u *unstructured.Unstructured) bool {
u.SetAnnotations(map[string]string{"foo": "bar"})
Expand All @@ -78,7 +94,7 @@ var _ = Describe("Updater", func() {
resourceVersion := obj.GetResourceVersion()

Expect(u.Apply(context.TODO(), obj)).To(Succeed())
Expect(client.Get(context.TODO(), types.NamespacedName{Namespace: "testNamespace", Name: "testDeployment"}, obj)).To(Succeed())
Expect(cl.Get(context.TODO(), types.NamespacedName{Namespace: "testNamespace", Name: "testDeployment"}, obj)).To(Succeed())
Expect(obj.GetAnnotations()["foo"]).To(Equal("bar"))
Expect(obj.GetResourceVersion()).NotTo(Equal(resourceVersion))
})
Expand All @@ -88,7 +104,7 @@ var _ = Describe("Updater", func() {
resourceVersion := obj.GetResourceVersion()

Expect(u.Apply(context.TODO(), obj)).To(Succeed())
Expect(client.Get(context.TODO(), types.NamespacedName{Namespace: "testNamespace", Name: "testDeployment"}, obj)).To(Succeed())
Expect(cl.Get(context.TODO(), types.NamespacedName{Namespace: "testNamespace", Name: "testDeployment"}, obj)).To(Succeed())
Expect((obj.Object["status"].(map[string]interface{}))["conditions"]).To(HaveLen(1))
Expect(obj.GetResourceVersion()).NotTo(Equal(resourceVersion))
})
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/internal/values/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,11 @@ import (
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

"github.com/operator-framework/helm-operator-plugins/pkg/values"
"time"
)

var DefaultWaitForDeletionTimeout = 30 * time.Second

var DefaultMaxReleaseHistory = 10

var DefaultMapper = values.MapperFunc(func(v chartutil.Values) chartutil.Values { return v })
Expand Down
34 changes: 28 additions & 6 deletions pkg/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"time"

"github.com/go-logr/logr"
sdkhandler "github.com/operator-framework/operator-lib/handler"
errs "github.com/pkg/errors"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
Expand All @@ -46,6 +45,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

sdkhandler "github.com/operator-framework/operator-lib/handler"

"github.com/operator-framework/helm-operator-plugins/internal/sdk/controllerutil"
"github.com/operator-framework/helm-operator-plugins/pkg/annotation"
helmclient "github.com/operator-framework/helm-operator-plugins/pkg/client"
Expand Down Expand Up @@ -78,6 +79,7 @@ type Reconciler struct {
skipDependentWatches bool
maxConcurrentReconciles int
reconcilePeriod time.Duration
waitForDeletionTimeout time.Duration
maxReleaseHistory *int
skipPrimaryGVKSchemeRegistration bool
controllerSetupFuncs []ControllerSetupFunc
Expand Down Expand Up @@ -332,6 +334,20 @@ func WithMaxConcurrentReconciles(max int) Option {
}
}

// WithWaitForDeletionTimeout is an Option that configures how long to wait for the client to
// report that the primary resource has been deleted. If the primary resource is not deleted
// within this time, the reconciler will return an error and retry reconciliation. By default,
// the reconciler will wait for 30s. This function requires positive values.
func WithWaitForDeletionTimeout(timeout time.Duration) Option {
return func(r *Reconciler) error {
if timeout <= 0 {
return errors.New("wait for deletion timeout must be a positive value")
}
r.waitForDeletionTimeout = timeout
return nil
}
}

// WithReconcilePeriod is an Option that configures the reconcile period of the
// controller. This will cause the controller to reconcile CRs at least once
// every period. By default, the reconcile period is set to 0, which means no
Expand Down Expand Up @@ -727,13 +743,15 @@ func (r *Reconciler) handleDeletion(ctx context.Context, actionClient helmclient
}

// Since the client is hitting a cache, waiting for the
// deletion here will guarantee that the next reconciliation
// deletion here will help ensure that the next reconciliation
// will see that the CR has been deleted and that there's
// nothing left to do.
if err := controllerutil.WaitForDeletion(ctx, r.client, obj); err != nil {
return err
}
return nil
//
// If the CR is not deleted within the timeout, the next reconciliation
// will attempt to uninstall the release again.
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, r.waitForDeletionTimeout)
defer timeoutCancel()
return controllerutil.WaitForDeletion(timeoutCtx, r.client, obj)
}

func (r *Reconciler) getReleaseState(client helmclient.ActionInterface, obj metav1.Object, vals map[string]interface{}) (*release.Release, helmReleaseState, error) {
Expand Down Expand Up @@ -939,6 +957,10 @@ func (r *Reconciler) addDefaults(mgr ctrl.Manager, controllerName string) error
r.valueMapper = internalvalues.DefaultMapper
}

if r.waitForDeletionTimeout == 0 {
r.waitForDeletionTimeout = internalvalues.DefaultWaitForDeletionTimeout
}

if r.maxReleaseHistory == nil {
r.maxReleaseHistory = &internalvalues.DefaultMaxReleaseHistory
}
Expand Down
68 changes: 66 additions & 2 deletions pkg/reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"strconv"
"time"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
sdkhandler "github.com/operator-framework/operator-lib/handler"

"github.com/go-logr/logr"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/chartutil"
Expand All @@ -46,6 +46,7 @@ import (
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -54,6 +55,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/yaml"

sdkhandler "github.com/operator-framework/operator-lib/handler"

"github.com/operator-framework/helm-operator-plugins/internal/sdk/controllerutil"
"github.com/operator-framework/helm-operator-plugins/pkg/annotation"
helmclient "github.com/operator-framework/helm-operator-plugins/pkg/client"
Expand Down Expand Up @@ -205,6 +208,18 @@ var _ = Describe("Reconciler", func() {
Expect(WithMaxConcurrentReconciles(-1)(r)).NotTo(Succeed())
})
})
_ = Describe("WithWaitForDeletionTimeout", func() {
It("should set the reconciler wait for deletion timeout", func() {
Expect(WithWaitForDeletionTimeout(time.Second)(r)).To(Succeed())
Expect(r.waitForDeletionTimeout).To(Equal(time.Second))
})
It("should fail if value is zero", func() {
Expect(WithWaitForDeletionTimeout(0)(r)).NotTo(Succeed())
})
It("should fail if value is negative", func() {
Expect(WithWaitForDeletionTimeout(-time.Second)(r)).NotTo(Succeed())
})
})
_ = Describe("WithReconcilePeriod", func() {
It("should set the reconciler reconcile period", func() {
Expect(WithReconcilePeriod(0)(r)).To(Succeed())
Expand Down Expand Up @@ -686,6 +701,55 @@ var _ = Describe("Reconciler", func() {
})
})
})
When("cache contains stale CR that has actually been deleted", func() {
// This test simulates what we expect to happen when we time out waiting for a CR that we
// deleted to be removed from the cache.
It("ignores not found errors and returns successfully", func() {
By("deleting the CR and then setting a finalizer on the stale CR", func() {
// We _actually_ remove the CR from the API server, but we'll make a fake client
// that returns the stale CR.
Expect(mgr.GetClient().Delete(ctx, obj)).To(Succeed())
Eventually(func() error {
return mgr.GetAPIReader().Get(ctx, objKey, obj)
}).Should(WithTransform(apierrors.IsNotFound, BeTrue()))

// We set the finalizer on the stale CR to simulate the typical state of the CR from a
// prior reconcile run that timed out waiting for the CR to be removed from the cache.
obj.SetFinalizers([]string{uninstallFinalizer})
})

By("configuring a client that returns the stale CR", func() {
// Make a client that returns the stale CR, but sends writes to the real client.
cl := fake.NewClientBuilder().WithObjects(obj).WithInterceptorFuncs(interceptor.Funcs{
Create: func(ctx context.Context, _ client.WithWatch, fakeObj client.Object, opts ...client.CreateOption) error {
return mgr.GetClient().Create(ctx, fakeObj, opts...)
},
Delete: func(ctx context.Context, _ client.WithWatch, fakeObj client.Object, opts ...client.DeleteOption) error {
return mgr.GetClient().Delete(ctx, fakeObj, opts...)
},
DeleteAllOf: func(ctx context.Context, _ client.WithWatch, fakeObj client.Object, opts ...client.DeleteAllOfOption) error {
return mgr.GetClient().DeleteAllOf(ctx, fakeObj, opts...)
},
Update: func(ctx context.Context, _ client.WithWatch, fakeObj client.Object, opts ...client.UpdateOption) error {
return mgr.GetClient().Update(ctx, fakeObj, opts...)
},
Patch: func(ctx context.Context, _ client.WithWatch, fakeObj client.Object, patch client.Patch, opts ...client.PatchOption) error {
return mgr.GetClient().Patch(ctx, fakeObj, patch, opts...)
},
SubResource: func(_ client.WithWatch, subresource string) client.SubResourceClient {
return mgr.GetClient().SubResource(subresource)
},
}).WithStatusSubresource(obj).Build()
r.client = cl
})

By("successfully ignoring not found errors and returning a nil error", func() {
res, err := r.Reconcile(ctx, req)
Expect(res).To(Equal(reconcile.Result{}))
Expect(err).ToNot(HaveOccurred())
})
})
})
When("CR is deleted, release is not present, but uninstall finalizer exists", func() {
It("removes the finalizer", func() {
By("adding the uninstall finalizer and deleting the CR", func() {
Expand Down

0 comments on commit b6d580d

Please sign in to comment.