From 57686b21ed003f90641947785b2d114b884747ab Mon Sep 17 00:00:00 2001 From: aDisplayName Date: Tue, 27 Aug 2024 10:10:50 -0500 Subject: [PATCH 1/3] feat: Add progress update via cachedimage CRD status --- api/kuik/v1alpha1/cachedimage_types.go | 7 +++++ .../crd/bases/kuik.enix.io_cachedimages.yaml | 9 +++++++ .../crds/cachedimage-crd.yaml | 12 +++++++++ .../controller/kuik/cachedimage_controller.go | 27 ++++++++++++++++++- internal/registry/registry.go | 15 ++++++++--- internal/registry/registry_test.go | 2 +- 6 files changed, 67 insertions(+), 5 deletions(-) diff --git a/api/kuik/v1alpha1/cachedimage_types.go b/api/kuik/v1alpha1/cachedimage_types.go index 9ddf41ef..0832925d 100644 --- a/api/kuik/v1alpha1/cachedimage_types.go +++ b/api/kuik/v1alpha1/cachedimage_types.go @@ -26,12 +26,19 @@ type UsedBy struct { Count int `json:"count,omitempty"` } +type Progress struct { + Total int64 `json:"total,omitempty"` + Available int64 `json:"available,omitempty"` +} + // CachedImageStatus defines the observed state of CachedImage type CachedImageStatus struct { IsCached bool `json:"isCached,omitempty"` Phase string `json:"phase,omitempty"` UsedBy UsedBy `json:"usedBy,omitempty"` + Progress Progress `json:"progress,omitempty"` + Digest string `json:"digest,omitempty"` UpstreamDigest string `json:"upstreamDigest,omitempty"` UpToDate bool `json:"upToDate,omitempty"` diff --git a/config/crd/bases/kuik.enix.io_cachedimages.yaml b/config/crd/bases/kuik.enix.io_cachedimages.yaml index efbe1f37..0dbff7f0 100644 --- a/config/crd/bases/kuik.enix.io_cachedimages.yaml +++ b/config/crd/bases/kuik.enix.io_cachedimages.yaml @@ -90,6 +90,15 @@ spec: type: string phase: type: string + progress: + properties: + available: + format: int64 + type: integer + total: + format: int64 + type: integer + type: object upToDate: type: boolean upstreamDigest: diff --git a/helm/kube-image-keeper/crds/cachedimage-crd.yaml b/helm/kube-image-keeper/crds/cachedimage-crd.yaml index cb9a882e..456a0611 100644 --- a/helm/kube-image-keeper/crds/cachedimage-crd.yaml +++ b/helm/kube-image-keeper/crds/cachedimage-crd.yaml @@ -34,6 +34,9 @@ spec: - jsonPath: .metadata.creationTimestamp name: Age type: date + - jsonPath: .status.progress.available + name: Downloaded + type: integer name: v1alpha1 schema: openAPIV3Schema: @@ -93,6 +96,15 @@ spec: type: boolean upstreamDigest: type: string + progress: + type: object + properties: + total: + type: integer + description: Total size of the compressed blob in bytes, including all layers. + available: + type: integer + description: Total downloaded / available size of the compressed blob in bytes, including all layers. usedBy: properties: count: diff --git a/internal/controller/kuik/cachedimage_controller.go b/internal/controller/kuik/cachedimage_controller.go index 8aa44153..1816346b 100644 --- a/internal/controller/kuik/cachedimage_controller.go +++ b/internal/controller/kuik/cachedimage_controller.go @@ -9,6 +9,7 @@ import ( "github.com/distribution/reference" "github.com/go-logr/logr" + v1 "github.com/google/go-containerregistry/pkg/v1" "github.com/google/go-containerregistry/pkg/v1/remote" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/errors" @@ -343,7 +344,31 @@ func (r *CachedImageReconciler) cacheImage(cachedImage *kuikv1alpha1.CachedImage return err } - err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures) + lastUpdateTime := time.Now() + lastWriteComplete := int64(0) + onUpdated := func(update v1.Update) { + needUpdate := false + if lastWriteComplete != update.Complete && update.Complete == update.Total { + // Update is needed whenever the writing complmetes. + needUpdate = true + } + + if time.Since(lastUpdateTime).Seconds() >= 5 { + // Update is needed if last update is more than 5 seconds ago + needUpdate = true + } + if needUpdate { + updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) { + cachedImage.Status.Progress.Total = update.Total + cachedImage.Status.Progress.Available = update.Complete + }) + + lastUpdateTime = time.Now() + } + lastWriteComplete = update.Complete + } + + err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures, onUpdated) statusErr = updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) { if err == nil { diff --git a/internal/registry/registry.go b/internal/registry/registry.go index 3c187601..5932f6fb 100644 --- a/internal/registry/registry.go +++ b/internal/registry/registry.go @@ -123,12 +123,21 @@ func DeleteImage(imageName string) error { return remote.Delete(digest) } -func CacheImage(imageName string, desc *remote.Descriptor, architectures []string) error { +func CacheImage(imageName string, desc *remote.Descriptor, architectures []string, callback func(v1.Update)) error { destRef, err := parseLocalReference(imageName) if err != nil { return err } + progressUpdate := make(chan v1.Update, 100) + go func() { + for update := range progressUpdate { + if callback != nil { + callback(update) + } + } + }() + switch desc.MediaType { case types.OCIImageIndex, types.DockerManifestList: index, err := desc.ImageIndex() @@ -145,7 +154,7 @@ func CacheImage(imageName string, desc *remote.Descriptor, architectures []strin return true }) - if err := remote.WriteIndex(destRef, filteredIndex); err != nil { + if err := remote.WriteIndex(destRef, filteredIndex, remote.WithProgress(progressUpdate)); err != nil { return err } default: @@ -153,7 +162,7 @@ func CacheImage(imageName string, desc *remote.Descriptor, architectures []strin if err != nil { return err } - if err := remote.Write(destRef, image); err != nil { + if err := remote.Write(destRef, image, remote.WithProgress(progressUpdate)); err != nil { return err } } diff --git a/internal/registry/registry_test.go b/internal/registry/registry_test.go index a7635388..fdce494d 100644 --- a/internal/registry/registry_test.go +++ b/internal/registry/registry_test.go @@ -320,7 +320,7 @@ func Test_CacheImage(t *testing.T) { desc, err := remote.Get(sourceRef) g.Expect(err).To(BeNil()) - err = CacheImage(imageName, desc, []string{"amd64"}) + err = CacheImage(imageName, desc, []string{"amd64"}, nil) if tt.wantErr != "" { g.Expect(err).To(BeAssignableToTypeOf(tt.errType)) g.Expect(err).To(MatchError(ContainSubstring(tt.wantErr))) From cb35fab915e400737d66babcb02ee768bcbf0360 Mon Sep 17 00:00:00 2001 From: aDisplayName Date: Wed, 28 Aug 2024 13:48:31 -0500 Subject: [PATCH 2/3] fix: Fix typo. Consolidate update condition clause --- internal/controller/kuik/cachedimage_controller.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/internal/controller/kuik/cachedimage_controller.go b/internal/controller/kuik/cachedimage_controller.go index 1816346b..2b0e76d8 100644 --- a/internal/controller/kuik/cachedimage_controller.go +++ b/internal/controller/kuik/cachedimage_controller.go @@ -348,16 +348,11 @@ func (r *CachedImageReconciler) cacheImage(cachedImage *kuikv1alpha1.CachedImage lastWriteComplete := int64(0) onUpdated := func(update v1.Update) { needUpdate := false - if lastWriteComplete != update.Complete && update.Complete == update.Total { - // Update is needed whenever the writing complmetes. - needUpdate = true - } + + isCompleted := lastWriteComplete != update.Complete && update.Complete == update.Total - if time.Since(lastUpdateTime).Seconds() >= 5 { - // Update is needed if last update is more than 5 seconds ago - needUpdate = true - } - if needUpdate { + if time.Since(lastUpdateTime).Seconds() >= 5 || isCompleted { + // Update is needed if last update is more than 5 seconds ago, or the current progress indicates the remote writing has just completed. updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) { cachedImage.Status.Progress.Total = update.Total cachedImage.Status.Progress.Available = update.Complete From 61be8a5d5569d839370555d95e07c24a88b3eab3 Mon Sep 17 00:00:00 2001 From: aDisplayName Date: Thu, 29 Aug 2024 09:46:16 -0500 Subject: [PATCH 3/3] fix: build error --- internal/controller/kuik/cachedimage_controller.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/internal/controller/kuik/cachedimage_controller.go b/internal/controller/kuik/cachedimage_controller.go index 2b0e76d8..cfcd132c 100644 --- a/internal/controller/kuik/cachedimage_controller.go +++ b/internal/controller/kuik/cachedimage_controller.go @@ -347,8 +347,6 @@ func (r *CachedImageReconciler) cacheImage(cachedImage *kuikv1alpha1.CachedImage lastUpdateTime := time.Now() lastWriteComplete := int64(0) onUpdated := func(update v1.Update) { - needUpdate := false - isCompleted := lastWriteComplete != update.Complete && update.Complete == update.Total if time.Since(lastUpdateTime).Seconds() >= 5 || isCompleted {