Skip to content

Commit

Permalink
feat: Add progress update via cachedimage CRD status
Browse files Browse the repository at this point in the history
  • Loading branch information
aDisplayName committed Aug 27, 2024
1 parent 8c437be commit 57686b2
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 5 deletions.
7 changes: 7 additions & 0 deletions api/kuik/v1alpha1/cachedimage_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@ type UsedBy struct {
Count int `json:"count,omitempty"`
}

type Progress struct {
Total int64 `json:"total,omitempty"`
Available int64 `json:"available,omitempty"`
}

// CachedImageStatus defines the observed state of CachedImage
type CachedImageStatus struct {
IsCached bool `json:"isCached,omitempty"`
Phase string `json:"phase,omitempty"`
UsedBy UsedBy `json:"usedBy,omitempty"`

Progress Progress `json:"progress,omitempty"`

Digest string `json:"digest,omitempty"`
UpstreamDigest string `json:"upstreamDigest,omitempty"`
UpToDate bool `json:"upToDate,omitempty"`
Expand Down
9 changes: 9 additions & 0 deletions config/crd/bases/kuik.enix.io_cachedimages.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ spec:
type: string
phase:
type: string
progress:
properties:
available:
format: int64
type: integer
total:
format: int64
type: integer
type: object
upToDate:
type: boolean
upstreamDigest:
Expand Down
12 changes: 12 additions & 0 deletions helm/kube-image-keeper/crds/cachedimage-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ spec:
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
- jsonPath: .status.progress.available
name: Downloaded
type: integer
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -93,6 +96,15 @@ spec:
type: boolean
upstreamDigest:
type: string
progress:
type: object
properties:
total:
type: integer
description: Total size of the compressed blob in bytes, including all layers.
available:
type: integer
description: Total downloaded / available size of the compressed blob in bytes, including all layers.
usedBy:
properties:
count:
Expand Down
27 changes: 26 additions & 1 deletion internal/controller/kuik/cachedimage_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/distribution/reference"
"github.com/go-logr/logr"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/remote"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -343,7 +344,31 @@ func (r *CachedImageReconciler) cacheImage(cachedImage *kuikv1alpha1.CachedImage
return err
}

err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures)
lastUpdateTime := time.Now()
lastWriteComplete := int64(0)
onUpdated := func(update v1.Update) {
needUpdate := false
if lastWriteComplete != update.Complete && update.Complete == update.Total {
// Update is needed whenever the writing complmetes.
needUpdate = true
}

if time.Since(lastUpdateTime).Seconds() >= 5 {
// Update is needed if last update is more than 5 seconds ago
needUpdate = true
}
if needUpdate {
updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) {
cachedImage.Status.Progress.Total = update.Total
cachedImage.Status.Progress.Available = update.Complete
})

lastUpdateTime = time.Now()
}
lastWriteComplete = update.Complete
}

err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures, onUpdated)

statusErr = updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) {
if err == nil {
Expand Down
15 changes: 12 additions & 3 deletions internal/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,21 @@ func DeleteImage(imageName string) error {
return remote.Delete(digest)
}

func CacheImage(imageName string, desc *remote.Descriptor, architectures []string) error {
func CacheImage(imageName string, desc *remote.Descriptor, architectures []string, callback func(v1.Update)) error {
destRef, err := parseLocalReference(imageName)
if err != nil {
return err
}

progressUpdate := make(chan v1.Update, 100)
go func() {
for update := range progressUpdate {
if callback != nil {
callback(update)
}
}
}()

switch desc.MediaType {
case types.OCIImageIndex, types.DockerManifestList:
index, err := desc.ImageIndex()
Expand All @@ -145,15 +154,15 @@ func CacheImage(imageName string, desc *remote.Descriptor, architectures []strin
return true
})

if err := remote.WriteIndex(destRef, filteredIndex); err != nil {
if err := remote.WriteIndex(destRef, filteredIndex, remote.WithProgress(progressUpdate)); err != nil {
return err
}
default:
image, err := desc.Image()
if err != nil {
return err
}
if err := remote.Write(destRef, image); err != nil {
if err := remote.Write(destRef, image, remote.WithProgress(progressUpdate)); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func Test_CacheImage(t *testing.T) {
desc, err := remote.Get(sourceRef)
g.Expect(err).To(BeNil())

err = CacheImage(imageName, desc, []string{"amd64"})
err = CacheImage(imageName, desc, []string{"amd64"}, nil)
if tt.wantErr != "" {
g.Expect(err).To(BeAssignableToTypeOf(tt.errType))
g.Expect(err).To(MatchError(ContainSubstring(tt.wantErr)))
Expand Down

0 comments on commit 57686b2

Please sign in to comment.