Skip to content

Commit

Permalink
Merge pull request #2233 from loft-sh/thomaskosiewski/etcd-client-fixes
Browse files Browse the repository at this point in the history
fix: Only get PartialObjectMetadata in mappings GC
  • Loading branch information
cbron authored Oct 18, 2024
2 parents ce8c161 + a8ae8c5 commit 630cfa6
Show file tree
Hide file tree
Showing 15 changed files with 274 additions and 96 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ jobs:
- id: set-paths-matrix
run: |
set -x
sudo apt-get install -y jq yq
sudo apt-get install -y jq
paths=$(ls -d ./test/e2e*)
echo "matrix=$(printf '%s\n' "${paths}" | jq -R . | jq -cs .)" >> "$GITHUB_OUTPUT"
outputs:
Expand Down Expand Up @@ -268,6 +268,9 @@ jobs:
run: |
sudo apt-get install -y sed
- name: Install yq@v4
run: go install github.com/mikefarah/yq/v4@latest

- name: create vcluster with current cli
run: |
chmod +x ./vcluster-current
Expand Down
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ linters:
- asasalint
- asciicheck
- bidichk
- copyloopvar
- decorder
- dupl
- durationcheck
- errcheck
- errname
- errorlint
- exhaustive
- exportloopref
- ginkgolinter
- gocheckcompilerdirectives
- gofmt
Expand Down
2 changes: 1 addition & 1 deletion .goreleaser.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ signs:
artifacts: checksum

snapshot:
name_template: "{{ incpatch .Version }}-next"
version_template: "{{ incpatch .Version }}-next"

changelog:
use: github
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile.release
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
ARG KINE_VERSION="v0.13.1"
FROM rancher/kine:${KINE_VERSION} as kine
FROM rancher/kine:${KINE_VERSION} AS kine

# Build the manager binary
FROM alpine:3.20 as builder
FROM alpine:3.20 AS builder

WORKDIR /vcluster-dev

Expand Down
22 changes: 22 additions & 0 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,25 @@ gen-license-report:
go-licenses save --save_path=./licenses --ignore github.com/loft-sh ./...

cp -r ./licenses ./cmd/vclusterctl/cmd/credits

build-dev-image tag="":
TELEMETRY_PRIVATE_KEY="" goreleaser build --snapshot --clean

cp dist/vcluster_linux_$(go env GOARCH | sed s/amd64/amd64_v1/g)/vcluster ./vcluster
docker build -t vcluster:dev-{{tag}} -f Dockerfile.release --build-arg TARGETARCH=$(uname -m) --build-arg TARGETOS=linux .
rm ./vcluster

run-conformance k8s_version="1.31.1" mode="conformance-lite" tag="conf": (build-dev-image tag)
minikube start --kubernetes-version {{k8s_version}} --nodes=2
minikube addons enable metrics-server
minikube image load vcluster:dev-{{tag}}

vcluster create vcluster -n vcluster -f vcluster.yaml

sonobuoy run --mode={{mode}} --level=debug

conformance-status:
sonobuoy status

conformance-logs:
sonobuoy logs
2 changes: 1 addition & 1 deletion cmd/vcluster/cmd/debug/etcd/keys.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func ExecuteKeys(ctx context.Context, options *KeysOptions) error {
}

// create new etcd backend & list mappings
keyValues, err := etcdClient.List(ctx, options.Prefix, 0)
keyValues, err := etcdClient.List(ctx, options.Prefix)
if err != nil {
return err
}
Expand Down
7 changes: 4 additions & 3 deletions hack/schema/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@ import (
"gopkg.in/yaml.v3"
)

const OutFile = "chart/values.schema.json"
const ValuesOutFile = "chart/values.yaml"
const (
OutFile = "chart/values.schema.json"
ValuesOutFile = "chart/values.yaml"
)
const (
defsPrefix = "#/$defs/"
externalConfigName = "ExternalConfig"
Expand Down Expand Up @@ -100,7 +102,6 @@ func addPlatformSchema(toSchema *jsonschema.Schema) error {
}

for pair := platformConfigSchema.Properties.Oldest(); pair != nil; pair = pair.Next() {
pair := pair
platformNode.Properties.AddPairs(*pair)
}

Expand Down
2 changes: 0 additions & 2 deletions pkg/cli/add_vcluster_helm.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ func AddVClusterHelm(
var addErrors []error
log.Debugf("trying to add %d vCluster instances to platform", len(vClusters))
for _, vCluster := range vClusters {
vCluster := vCluster
log.Infof("adding %s vCluster to platform", vCluster.Name)
err := addVClusterHelm(ctx, options, globalFlags, vCluster.Name, &vCluster, kubeClient, log)
if err != nil {
Expand Down Expand Up @@ -131,7 +130,6 @@ func addVClusterHelm(

return !lifecycle.IsPaused(vCluster), nil
})

if err != nil {
return fmt.Errorf("error waiting for vCluster to wake up %w", err)
}
Expand Down
62 changes: 23 additions & 39 deletions pkg/etcd/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,17 @@ import (
type Value struct {
Key []byte
Data []byte
Revision int64
Modified int64
}

var (
ErrNotFound = errors.New("etcdwrapper: key not found")
)
var ErrNotFound = errors.New("etcdwrapper: key not found")

type Client interface {
List(ctx context.Context, key string, rev int) ([]Value, error)
Watch(ctx context.Context, key string, rev int) clientv3.WatchChan
List(ctx context.Context, key string) ([]Value, error)
Watch(ctx context.Context, key string) clientv3.WatchChan
Get(ctx context.Context, key string) (Value, error)
Put(ctx context.Context, key string, value []byte) error
Create(ctx context.Context, key string, value []byte) error
Update(ctx context.Context, key string, revision int64, value []byte) error
Delete(ctx context.Context, key string, revision int64) error
Compact(ctx context.Context, revision int64) (int64, error)
Delete(ctx context.Context, key string) error
Close() error
}

Expand Down Expand Up @@ -102,12 +97,12 @@ func New(ctx context.Context, certificates *Certificates, endpoints ...string) (
}, nil
}

func (c *client) Watch(ctx context.Context, key string, rev int) clientv3.WatchChan {
return c.c.Watch(ctx, key, clientv3.WithPrefix(), clientv3.WithRev(int64(rev)))
func (c *client) Watch(ctx context.Context, key string) clientv3.WatchChan {
return c.c.Watch(ctx, key, clientv3.WithPrefix(), clientv3.WithPrevKV(), clientv3.WithProgressNotify())
}

func (c *client) List(ctx context.Context, key string, rev int) ([]Value, error) {
resp, err := c.c.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithRev(int64(rev)))
func (c *client) List(ctx context.Context, key string) ([]Value, error) {
resp, err := c.c.Get(ctx, key, clientv3.WithPrefix(), clientv3.WithRev(int64(0)))
if err != nil {
return nil, err
}
Expand All @@ -117,7 +112,7 @@ func (c *client) List(ctx context.Context, key string, rev int) ([]Value, error)
vals = append(vals, Value{
Key: kv.Key,
Data: kv.Value,
Revision: kv.ModRevision,
Modified: kv.ModRevision,
})
}

Expand All @@ -134,7 +129,7 @@ func (c *client) Get(ctx context.Context, key string) (Value, error) {
return Value{
Key: resp.Kvs[0].Key,
Data: resp.Kvs[0].Value,
Revision: resp.Kvs[0].ModRevision,
Modified: resp.Kvs[0].ModRevision,
}, nil
}

Expand All @@ -146,10 +141,10 @@ func (c *client) Put(ctx context.Context, key string, value []byte) error {
if err != nil && !errors.Is(err, ErrNotFound) {
return err
}
if val.Revision == 0 {
if val.Modified == 0 {
return c.Create(ctx, key, value)
}
return c.Update(ctx, key, val.Revision, value)
return c.Update(ctx, key, val.Modified, value)
}

func (c *client) Create(ctx context.Context, key string, value []byte) error {
Expand All @@ -161,8 +156,9 @@ func (c *client) Create(ctx context.Context, key string, value []byte) error {
return err
}
if !resp.Succeeded {
return fmt.Errorf("key exists")
return errors.New("key exists")
}

return nil
}

Expand All @@ -178,30 +174,18 @@ func (c *client) Update(ctx context.Context, key string, revision int64, value [
if !resp.Succeeded {
return fmt.Errorf("revision %d doesnt match", revision)
}
return nil
}

func (c *client) Delete(ctx context.Context, key string, revision int64) error {
resp, err := c.c.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", revision)).
Then(clientv3.OpDelete(key)).
Else(clientv3.OpGet(key)).
Commit()
if err != nil {
return err
}
if !resp.Succeeded {
return fmt.Errorf("revision %d doesnt match", revision)
}
return nil
}

func (c *client) Compact(ctx context.Context, revision int64) (int64, error) {
resp, err := c.c.Compact(ctx, revision)
if resp != nil {
return resp.Header.GetRevision(), err
}
return 0, err
func (c *client) Delete(ctx context.Context, key string) error {
_, err := c.c.Txn(ctx).
Then(
clientv3.OpGet(key),
clientv3.OpDelete(key),
).
Commit()
return err
}

func (c *client) Close() error {
Expand Down
2 changes: 1 addition & 1 deletion pkg/mappings/generic/recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ func RecordMapping(ctx *synccontext.SyncContext, pName, vName types.NamespacedNa
}

// record the reference
err := ctx.Mappings.Store().AddReference(ctx, synccontext.NameMapping{
err := ctx.Mappings.Store().AddReferenceAndSave(ctx, synccontext.NameMapping{
GroupVersionKind: gvk,

HostName: pName,
Expand Down
5 changes: 2 additions & 3 deletions pkg/mappings/store/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@ type Backend interface {
}

type BackendWatchResponse struct {
Err error
Events []*BackendWatchEvent

Err error
}

type BackendWatchEvent struct {
Type BackendWatchEventType
Mapping *Mapping
Type BackendWatchEventType
}

type BackendWatchEventType string
Expand Down
44 changes: 32 additions & 12 deletions pkg/mappings/store/etcd_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,17 @@ type etcdBackend struct {
}

func (m *etcdBackend) List(ctx context.Context) ([]*Mapping, error) {
mappings, err := m.etcdClient.List(ctx, mappingsPrefix, 0)
mappings, err := m.etcdClient.List(ctx, mappingsPrefix)
if err != nil {
return nil, fmt.Errorf("list mappings")
return nil, fmt.Errorf("etcd backend: list mappings: %w", err)
}

retMappings := make([]*Mapping, 0, len(mappings))
for _, kv := range mappings {
retMapping := &Mapping{}
err = json.Unmarshal(kv.Data, retMapping)
if err != nil {
return nil, fmt.Errorf("parse mapping %s: %w", string(kv.Key), err)
return nil, fmt.Errorf("etcd backend: parse mapping %s: %w", string(kv.Key), err)
}

retMappings = append(retMappings, retMapping)
Expand All @@ -46,32 +46,52 @@ func (m *etcdBackend) List(ctx context.Context) ([]*Mapping, error) {

func (m *etcdBackend) Watch(ctx context.Context) <-chan BackendWatchResponse {
responseChan := make(chan BackendWatchResponse)
watchChan := m.etcdClient.Watch(ctx, mappingsPrefix, 0)
watchChan := m.etcdClient.Watch(ctx, mappingsPrefix)
go func() {
defer close(responseChan)

for event := range watchChan {
if event.Canceled {
switch {
case event.Canceled:
responseChan <- BackendWatchResponse{
Err: event.Err(),
}
} else if len(event.Events) > 0 {
case event.IsProgressNotify():
klog.FromContext(ctx).V(1).Info("received progress notify from etcd")
case len(event.Events) > 0:
retEvents := make([]*BackendWatchEvent, 0, len(event.Events))
for _, singleEvent := range event.Events {
var eventType BackendWatchEventType
if singleEvent.Type == mvccpb.PUT {
switch singleEvent.Type {
case mvccpb.PUT:
eventType = BackendWatchEventTypeUpdate
} else if singleEvent.Type == mvccpb.DELETE {
case mvccpb.DELETE:
eventType = BackendWatchEventTypeDelete
} else {
default:
continue
}

// parse mapping
retMapping := &Mapping{}
err := json.Unmarshal(singleEvent.Kv.Value, retMapping)

value := singleEvent.Kv.Value
if len(value) == 0 && singleEvent.Type == mvccpb.DELETE && singleEvent.PrevKv != nil {
value = singleEvent.PrevKv.Value
}

err := json.Unmarshal(value, retMapping)
if err != nil {
klog.FromContext(ctx).Info("Error decoding event", "key", string(singleEvent.Kv.Key), "error", err.Error())
klog.FromContext(ctx).Info(
"etcd backend: Error decoding event",
"key", string(singleEvent.Kv.Key),
"singleEventValue", string(singleEvent.Kv.Value),
"eventType", eventType,
"error", err.Error(),
)
// FIXME(ThomasK33): This leads to mapping leaks. Etcd might have
// already compacted the previous version. Thus we would never
// receive any information of the mapping that was deleted apart from its keys.
// And because there is no mapping, we are omitting deleting it from the mapping stores.
continue
}

Expand Down Expand Up @@ -101,7 +121,7 @@ func (m *etcdBackend) Save(ctx context.Context, mapping *Mapping) error {
}

func (m *etcdBackend) Delete(ctx context.Context, mapping *Mapping) error {
return m.etcdClient.Delete(ctx, mappingToKey(mapping), 0)
return m.etcdClient.Delete(ctx, mappingToKey(mapping))
}

func mappingToKey(mapping *Mapping) string {
Expand Down
Loading

0 comments on commit 630cfa6

Please sign in to comment.