Skip to content

Commit

Permalink
Fix race condition when multiple nodes reconcile S3 snapshots
Browse files Browse the repository at this point in the history
Don't delete S3 etcdsnapshotfiles if they are missing from S3 but less than a minute old; it's possible that the other node just finished uploading the snapshot and the object key has not yet become visible.

Signed-off-by: Brad Davidson <[email protected]>
  • Loading branch information
brandond committed Oct 7, 2024
1 parent 38d13e0 commit 0826ebc
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 17 deletions.
25 changes: 19 additions & 6 deletions pkg/etcd/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ func (c *Client) SnapshotRetention(ctx context.Context, retention int, prefix st
logrus.Infof("Removing S3 snapshot: s3://%s/%s", c.etcdS3.Bucket, df.Key)

key := path.Base(df.Key)
if err := c.DeleteSnapshot(ctx, key); err != nil {
if err := c.DeleteSnapshot(ctx, key); err != nil && !snapshot.IsNotExist(err) {
return deleted, err
}
deleted = append(deleted, key)
Expand All @@ -431,14 +431,27 @@ func (c *Client) DeleteSnapshot(ctx context.Context, key string) error {
defer cancel()

key = path.Join(c.etcdS3.Folder, key)
err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, key, minio.RemoveObjectOptions{})
if err == nil || snapshot.IsNotExist(err) {
metadataKey := path.Join(path.Dir(key), snapshot.MetadataDir, path.Base(key))
if merr := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, metadataKey, minio.RemoveObjectOptions{}); merr != nil && !snapshot.IsNotExist(merr) {
err = merr
_, err := c.mc.StatObject(ctx, c.etcdS3.Bucket, key, minio.StatObjectOptions{})
if err == nil {
if err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, key, minio.RemoveObjectOptions{}); err != nil {
return err
}
}

// check for and try to delete the metadata regardless of whether or not the
// snapshot existed, just to ensure that things are cleaned up in the case of
// ephemeral errors. Metadata delete errors are only exposed if the object
// exists and fails to delete.
metadataKey := path.Join(path.Dir(key), snapshot.MetadataDir, path.Base(key))
_, merr := c.mc.StatObject(ctx, c.etcdS3.Bucket, metadataKey, minio.StatObjectOptions{})
if merr == nil {
if err := c.mc.RemoveObject(ctx, c.etcdS3.Bucket, metadataKey, minio.RemoveObjectOptions{}); err != nil {
return err
}
}

// return error from snapshot StatObject call, so that callers can determine
// if the object was actually deleted or not by checking for a NotFound error.
return err
}

Expand Down
43 changes: 32 additions & 11 deletions pkg/etcd/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"runtime"
"slices"
"sort"
"strconv"
"strings"
Expand Down Expand Up @@ -41,6 +42,7 @@ import (

const (
errorTTL = 24 * time.Hour
s3ReconcileTTL = time.Minute
snapshotListPageSize = 20
)

Expand Down Expand Up @@ -363,7 +365,7 @@ func (e *ETCD) Snapshot(ctx context.Context) (*managed.SnapshotResult, error) {
}
}

return res, e.ReconcileSnapshotData(ctx)
return res, e.reconcileSnapshotData(ctx, res)
}

// listLocalSnapshots provides a list of the currently stored
Expand Down Expand Up @@ -464,7 +466,7 @@ func (e *ETCD) PruneSnapshots(ctx context.Context) (*managed.SnapshotResult, err
res.Deleted = append(res.Deleted, deleted...)
}
}
return res, e.ReconcileSnapshotData(ctx)
return res, e.reconcileSnapshotData(ctx, res)
}

// ListSnapshots returns a list of snapshots. Local snapshots are always listed,
Expand Down Expand Up @@ -555,7 +557,7 @@ func (e *ETCD) DeleteSnapshots(ctx context.Context, snapshots []string) (*manage
}
}

return res, e.ReconcileSnapshotData(ctx)
return res, e.reconcileSnapshotData(ctx, res)
}

func (e *ETCD) deleteSnapshot(snapshotPath string) error {
Expand Down Expand Up @@ -647,9 +649,17 @@ func (e *ETCD) emitEvent(esf *k3s.ETCDSnapshotFile) {
}

// ReconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources.
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to list S3 snapshots
// and reconcile snapshots from S3.
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to
// list S3 snapshots and reconcile snapshots from S3.
func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
return e.reconcileSnapshotData(ctx, nil)
}

// reconcileSnapshotData reconciles snapshot data in the ETCDSnapshotFile resources.
// It will reconcile snapshot data from disk locally always, and if S3 is enabled, will attempt to
// list S3 snapshots and reconcile snapshots from S3. Any snapshots listed in the Deleted field of
// the provided SnapshotResult are deleted, even if they are within a retention window.
func (e *ETCD) reconcileSnapshotData(ctx context.Context, res *managed.SnapshotResult) error {
// make sure the core.Factory is initialized. There can
// be a race between this and core code startup.
for e.config.Runtime.Core == nil {
Expand Down Expand Up @@ -726,6 +736,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
snapshots := e.config.Runtime.K3s.K3s().V1().ETCDSnapshotFile()
snapshotPager := pager.New(pager.SimplePageFunc(func(opts metav1.ListOptions) (k8sruntime.Object, error) { return snapshots.List(opts) }))
snapshotPager.PageSize = snapshotListPageSize
now := time.Now().Round(time.Second)

// List all snapshots matching the selector
// If a snapshot from Kubernetes was found on disk/s3, it is in sync and we can remove it from the map to sync.
Expand All @@ -742,10 +753,20 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
// exists in both and names match, don't need to sync
delete(snapshotFiles, sfKey)
} else {
// doesn't exist on disk - if it's an error that hasn't expired yet, leave it, otherwise remove it
if esf.Status.Error != nil && esf.Status.Error.Time != nil {
// doesn't exist on disk/s3
if res != nil && slices.Contains(res.Deleted, esf.Spec.SnapshotName) {
// snapshot has been intentionally deleted, skip checking for expiration
} else if esf.Status.Error != nil && esf.Status.Error.Time != nil {
expires := esf.Status.Error.Time.Add(errorTTL)
if time.Now().Before(expires) {
if now.Before(expires) {
// it's an error that hasn't expired yet, leave it
return nil
}
} else if esf.Spec.S3 != nil {
expires := esf.ObjectMeta.CreationTimestamp.Add(s3ReconcileTTL)
if now.Before(expires) {
// it's an s3 snapshot that's only just been created, leave it to prevent a race condition
// when multiple nodes are uploading snapshots at the same time.
return nil
}
}
Expand All @@ -754,6 +775,7 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
} else {
logrus.Debugf("Key %s not found in snapshotFile list", sfKey)
}
// otherwise remove it
logrus.Infof("Deleting ETCDSnapshotFile for %s", esf.Spec.SnapshotName)
if err := snapshots.Delete(esf.Name, &metav1.DeleteOptions{}); err != nil {
logrus.Errorf("Failed to delete ETCDSnapshotFile: %v", err)
Expand Down Expand Up @@ -817,18 +839,17 @@ func (e *ETCD) ReconcileSnapshotData(ctx context.Context) error {
}

// Update our Node object to note the timestamp of the snapshot storages that have been reconciled
now := time.Now().Round(time.Second).Format(time.RFC3339)
patch := []map[string]string{
{
"op": "add",
"value": now,
"value": now.Format(time.RFC3339),
"path": "/metadata/annotations/" + strings.ReplaceAll(annotationLocalReconciled, "/", "~1"),
},
}
if e.config.EtcdS3 != nil {
patch = append(patch, map[string]string{
"op": "add",
"value": now,
"value": now.Format(time.RFC3339),
"path": "/metadata/annotations/" + strings.ReplaceAll(annotationS3Reconciled, "/", "~1"),
})
}
Expand Down

0 comments on commit 0826ebc

Please sign in to comment.