Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

reconciler: fix deletion wait and updater retry logic #380

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 23 additions & 24 deletions pkg/reconciler/internal/updater/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,41 +84,40 @@ func (u *Updater) Apply(ctx context.Context, obj *unstructured.Unstructured) err

backoff := retry.DefaultRetry

st := statusFor(obj)
needsStatusUpdate := false
for _, f := range u.updateStatusFuncs {
needsStatusUpdate = f(st) || needsStatusUpdate
}

// Always update the status first. During uninstall, if
// we remove the finalizer, updating the status will fail
// because the object and its status will be garbage-collected.
if err := retryOnRetryableUpdateError(backoff, func() error {
st := statusFor(obj)
needsStatusUpdate := false
for _, f := range u.updateStatusFuncs {
needsStatusUpdate = f(st) || needsStatusUpdate
if needsStatusUpdate {
uSt, err := runtime.DefaultUnstructuredConverter.ToUnstructured(st)
if err != nil {
return err
}
if needsStatusUpdate {
uSt, err := runtime.DefaultUnstructuredConverter.ToUnstructured(st)
if err != nil {
return err
}
obj.Object["status"] = uSt
obj.Object["status"] = uSt

if err := retryOnRetryableUpdateError(backoff, func() error {
return u.client.Status().Update(ctx, obj)
}); err != nil {
return err
}
return nil
}); err != nil {
return err
}

if err := retryOnRetryableUpdateError(backoff, func() error {
needsUpdate := false
for _, f := range u.updateFuncs {
needsUpdate = f(obj) || needsUpdate
}
if needsUpdate {
needsUpdate := false
for _, f := range u.updateFuncs {
needsUpdate = f(obj) || needsUpdate
}
if needsUpdate {
if err := retryOnRetryableUpdateError(backoff, func() error {
return u.client.Update(ctx, obj)
}); err != nil {
return err
}
return nil
}); err != nil {
return err
}

return nil
}

Expand Down
36 changes: 26 additions & 10 deletions pkg/reconciler/internal/updater/updater_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,19 @@ package updater

import (
"context"
"errors"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"helm.sh/helm/v3/pkg/release"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/types"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"

"github.com/operator-framework/helm-operator-plugins/pkg/reconciler/internal/conditions"
)
Expand All @@ -36,14 +39,15 @@ const testFinalizer = "testFinalizer"

var _ = Describe("Updater", func() {
var (
client client.Client
u Updater
obj *unstructured.Unstructured
cl client.Client
u Updater
obj *unstructured.Unstructured
interceptorFuncs interceptor.Funcs
)

BeforeEach(func() {
client = fake.NewClientBuilder().Build()
u = New(client)
JustBeforeEach(func() {
cl = fake.NewClientBuilder().WithInterceptorFuncs(interceptorFuncs).Build()
u = New(cl)
obj = &unstructured.Unstructured{Object: map[string]interface{}{
"apiVersion": "apps/v1",
"kind": "Deployment",
Expand All @@ -53,12 +57,12 @@ var _ = Describe("Updater", func() {
},
"spec": map[string]interface{}{},
}}
Expect(client.Create(context.TODO(), obj)).To(Succeed())
Expect(cl.Create(context.TODO(), obj)).To(Succeed())
})

When("the object does not exist", func() {
It("should fail", func() {
Expect(client.Delete(context.TODO(), obj)).To(Succeed())
Expect(cl.Delete(context.TODO(), obj)).To(Succeed())
u.Update(func(u *unstructured.Unstructured) bool {
u.SetAnnotations(map[string]string{"foo": "bar"})
return true
Expand All @@ -70,6 +74,18 @@ var _ = Describe("Updater", func() {
})

When("an update is a change", func() {
var updateCallCount int

BeforeEach(func() {
// On the first update of (status) subresource, return an error. After that do what is expected.
interceptorFuncs.SubResourceUpdate = func(ctx context.Context, interceptorClient client.Client, subResourceName string, obj client.Object, opts ...client.SubResourceUpdateOption) error {
updateCallCount++
if updateCallCount == 1 {
return errors.New("transient error")
}
return interceptorClient.SubResource(subResourceName).Update(ctx, obj, opts...)
}
})
It("should apply an update function", func() {
u.Update(func(u *unstructured.Unstructured) bool {
u.SetAnnotations(map[string]string{"foo": "bar"})
Expand All @@ -78,7 +94,7 @@ var _ = Describe("Updater", func() {
resourceVersion := obj.GetResourceVersion()

Expect(u.Apply(context.TODO(), obj)).To(Succeed())
Expect(client.Get(context.TODO(), types.NamespacedName{Namespace: "testNamespace", Name: "testDeployment"}, obj)).To(Succeed())
Expect(cl.Get(context.TODO(), types.NamespacedName{Namespace: "testNamespace", Name: "testDeployment"}, obj)).To(Succeed())
Expect(obj.GetAnnotations()["foo"]).To(Equal("bar"))
Expect(obj.GetResourceVersion()).NotTo(Equal(resourceVersion))
})
Expand All @@ -88,7 +104,7 @@ var _ = Describe("Updater", func() {
resourceVersion := obj.GetResourceVersion()

Expect(u.Apply(context.TODO(), obj)).To(Succeed())
Expect(client.Get(context.TODO(), types.NamespacedName{Namespace: "testNamespace", Name: "testDeployment"}, obj)).To(Succeed())
Expect(cl.Get(context.TODO(), types.NamespacedName{Namespace: "testNamespace", Name: "testDeployment"}, obj)).To(Succeed())
Expect((obj.Object["status"].(map[string]interface{}))["conditions"]).To(HaveLen(1))
Expect(obj.GetResourceVersion()).NotTo(Equal(resourceVersion))
})
Expand Down
3 changes: 3 additions & 0 deletions pkg/reconciler/internal/values/values.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"os"
"time"

"helm.sh/helm/v3/pkg/chartutil"
"helm.sh/helm/v3/pkg/strvals"
Expand All @@ -28,6 +29,8 @@ import (
"github.com/operator-framework/helm-operator-plugins/pkg/values"
)

var DefaultWaitForDeletionTimeout = 30 * time.Second

var DefaultMaxReleaseHistory = 10

var DefaultMapper = values.MapperFunc(func(v chartutil.Values) chartutil.Values { return v })
Expand Down
34 changes: 28 additions & 6 deletions pkg/reconciler/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import (
"time"

"github.com/go-logr/logr"
sdkhandler "github.com/operator-framework/operator-lib/handler"
errs "github.com/pkg/errors"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
Expand All @@ -46,6 +45,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/source"

sdkhandler "github.com/operator-framework/operator-lib/handler"

"github.com/operator-framework/helm-operator-plugins/internal/sdk/controllerutil"
"github.com/operator-framework/helm-operator-plugins/pkg/annotation"
helmclient "github.com/operator-framework/helm-operator-plugins/pkg/client"
Expand Down Expand Up @@ -78,6 +79,7 @@ type Reconciler struct {
skipDependentWatches bool
maxConcurrentReconciles int
reconcilePeriod time.Duration
waitForDeletionTimeout time.Duration
maxReleaseHistory *int
skipPrimaryGVKSchemeRegistration bool
controllerSetupFuncs []ControllerSetupFunc
Expand Down Expand Up @@ -332,6 +334,20 @@ func WithMaxConcurrentReconciles(max int) Option {
}
}

// WithWaitForDeletionTimeout is an Option that configures how long to wait for the client to
// report that the primary resource has been deleted. If the primary resource is not deleted
// within this time, the reconciler will return an error and retry reconciliation. By default,
// the reconciler will wait for 30s. This function requires positive values.
func WithWaitForDeletionTimeout(timeout time.Duration) Option {
return func(r *Reconciler) error {
if timeout <= 0 {
return errors.New("wait for deletion timeout must be a positive value")
}
r.waitForDeletionTimeout = timeout
return nil
}
}

// WithReconcilePeriod is an Option that configures the reconcile period of the
// controller. This will cause the controller to reconcile CRs at least once
// every period. By default, the reconcile period is set to 0, which means no
Expand Down Expand Up @@ -727,13 +743,15 @@ func (r *Reconciler) handleDeletion(ctx context.Context, actionClient helmclient
}

// Since the client is hitting a cache, waiting for the
// deletion here will guarantee that the next reconciliation
// deletion here will help ensure that the next reconciliation
// will see that the CR has been deleted and that there's
// nothing left to do.
if err := controllerutil.WaitForDeletion(ctx, r.client, obj); err != nil {
return err
}
return nil
//
// If the CR is not deleted within the timeout, the next reconciliation
// will attempt to uninstall the release again.
timeoutCtx, timeoutCancel := context.WithTimeout(ctx, r.waitForDeletionTimeout)
defer timeoutCancel()
return controllerutil.WaitForDeletion(timeoutCtx, r.client, obj)
}

func (r *Reconciler) getReleaseState(client helmclient.ActionInterface, obj metav1.Object, vals map[string]interface{}) (*release.Release, helmReleaseState, error) {
Expand Down Expand Up @@ -939,6 +957,10 @@ func (r *Reconciler) addDefaults(mgr ctrl.Manager, controllerName string) error
r.valueMapper = internalvalues.DefaultMapper
}

if r.waitForDeletionTimeout == 0 {
r.waitForDeletionTimeout = internalvalues.DefaultWaitForDeletionTimeout
}

if r.maxReleaseHistory == nil {
r.maxReleaseHistory = &internalvalues.DefaultMaxReleaseHistory
}
Expand Down
68 changes: 66 additions & 2 deletions pkg/reconciler/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@ import (
"strconv"
"time"

"github.com/go-logr/logr"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
. "github.com/onsi/gomega/gstruct"
sdkhandler "github.com/operator-framework/operator-lib/handler"

"github.com/go-logr/logr"
"helm.sh/helm/v3/pkg/action"
"helm.sh/helm/v3/pkg/chart"
"helm.sh/helm/v3/pkg/chartutil"
Expand All @@ -46,6 +46,7 @@ import (
"k8s.io/client-go/tools/record"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
"sigs.k8s.io/controller-runtime/pkg/client/interceptor"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/log/zap"
"sigs.k8s.io/controller-runtime/pkg/manager"
Expand All @@ -54,6 +55,8 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"
"sigs.k8s.io/yaml"

sdkhandler "github.com/operator-framework/operator-lib/handler"

"github.com/operator-framework/helm-operator-plugins/internal/sdk/controllerutil"
"github.com/operator-framework/helm-operator-plugins/pkg/annotation"
helmclient "github.com/operator-framework/helm-operator-plugins/pkg/client"
Expand Down Expand Up @@ -205,6 +208,18 @@ var _ = Describe("Reconciler", func() {
Expect(WithMaxConcurrentReconciles(-1)(r)).NotTo(Succeed())
})
})
_ = Describe("WithWaitForDeletionTimeout", func() {
It("should set the reconciler wait for deletion timeout", func() {
Expect(WithWaitForDeletionTimeout(time.Second)(r)).To(Succeed())
Expect(r.waitForDeletionTimeout).To(Equal(time.Second))
})
It("should fail if value is zero", func() {
Expect(WithWaitForDeletionTimeout(0)(r)).NotTo(Succeed())
})
It("should fail if value is negative", func() {
Expect(WithWaitForDeletionTimeout(-time.Second)(r)).NotTo(Succeed())
})
})
_ = Describe("WithReconcilePeriod", func() {
It("should set the reconciler reconcile period", func() {
Expect(WithReconcilePeriod(0)(r)).To(Succeed())
Expand Down Expand Up @@ -686,6 +701,55 @@ var _ = Describe("Reconciler", func() {
})
})
})
When("cache contains stale CR that has actually been deleted", func() {
// This test simulates what we expect to happen when we time out waiting for a CR that we
// deleted to be removed from the cache.
It("ignores not found errors and returns successfully", func() {
By("deleting the CR and then setting a finalizer on the stale CR", func() {
// We _actually_ remove the CR from the API server, but we'll make a fake client
// that returns the stale CR.
Expect(mgr.GetClient().Delete(ctx, obj)).To(Succeed())
Eventually(func() error {
return mgr.GetAPIReader().Get(ctx, objKey, obj)
}).Should(WithTransform(apierrors.IsNotFound, BeTrue()))

// We set the finalizer on the stale CR to simulate the typical state of the CR from a
// prior reconcile run that timed out waiting for the CR to be removed from the cache.
obj.SetFinalizers([]string{uninstallFinalizer})
})

By("configuring a client that returns the stale CR", func() {
// Make a client that returns the stale CR, but sends writes to the real client.
cl := fake.NewClientBuilder().WithObjects(obj).WithInterceptorFuncs(interceptor.Funcs{
Create: func(ctx context.Context, _ client.WithWatch, fakeObj client.Object, opts ...client.CreateOption) error {
return mgr.GetClient().Create(ctx, fakeObj, opts...)
},
Delete: func(ctx context.Context, _ client.WithWatch, fakeObj client.Object, opts ...client.DeleteOption) error {
return mgr.GetClient().Delete(ctx, fakeObj, opts...)
},
DeleteAllOf: func(ctx context.Context, _ client.WithWatch, fakeObj client.Object, opts ...client.DeleteAllOfOption) error {
return mgr.GetClient().DeleteAllOf(ctx, fakeObj, opts...)
},
Update: func(ctx context.Context, _ client.WithWatch, fakeObj client.Object, opts ...client.UpdateOption) error {
return mgr.GetClient().Update(ctx, fakeObj, opts...)
},
Patch: func(ctx context.Context, _ client.WithWatch, fakeObj client.Object, patch client.Patch, opts ...client.PatchOption) error {
return mgr.GetClient().Patch(ctx, fakeObj, patch, opts...)
},
SubResource: func(_ client.WithWatch, subresource string) client.SubResourceClient {
return mgr.GetClient().SubResource(subresource)
},
}).WithStatusSubresource(obj).Build()
r.client = cl
})

By("successfully ignoring not found errors and returning a nil error", func() {
res, err := r.Reconcile(ctx, req)
Expect(res).To(Equal(reconcile.Result{}))
Expect(err).ToNot(HaveOccurred())
})
})
})
When("CR is deleted, release is not present, but uninstall finalizer exists", func() {
It("removes the finalizer", func() {
By("adding the uninstall finalizer and deleting the CR", func() {
Expand Down