Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add cachedimage pulling progress status #402

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/kuik/v1alpha1/cachedimage_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@ type UsedBy struct {
Count int `json:"count,omitempty"`
}

type Progress struct {
Total int64 `json:"total,omitempty"`
Available int64 `json:"available,omitempty"`
}

// CachedImageStatus defines the observed state of CachedImage
type CachedImageStatus struct {
IsCached bool `json:"isCached,omitempty"`
Phase string `json:"phase,omitempty"`
UsedBy UsedBy `json:"usedBy,omitempty"`

Progress Progress `json:"progress,omitempty"`

Digest string `json:"digest,omitempty"`
UpstreamDigest string `json:"upstreamDigest,omitempty"`
UpToDate bool `json:"upToDate,omitempty"`
Expand Down
9 changes: 9 additions & 0 deletions config/crd/bases/kuik.enix.io_cachedimages.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ spec:
type: string
phase:
type: string
progress:
properties:
available:
format: int64
type: integer
total:
format: int64
type: integer
type: object
upToDate:
type: boolean
upstreamDigest:
Expand Down
12 changes: 12 additions & 0 deletions helm/kube-image-keeper/crds/cachedimage-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ spec:
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
- jsonPath: .status.progress.available
name: Downloaded
type: integer
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -93,6 +96,15 @@ spec:
type: boolean
upstreamDigest:
type: string
progress:
type: object
properties:
total:
type: integer
description: Total size of the compressed blob in bytes, including all layers.
available:
type: integer
description: Total downloaded / available size of the compressed blob in bytes, including all layers.
usedBy:
properties:
count:
Expand Down
20 changes: 19 additions & 1 deletion internal/controller/kuik/cachedimage_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/distribution/reference"
"github.com/go-logr/logr"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/remote"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -343,7 +344,24 @@ func (r *CachedImageReconciler) cacheImage(cachedImage *kuikv1alpha1.CachedImage
return err
}

err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures)
lastUpdateTime := time.Now()
lastWriteComplete := int64(0)
onUpdated := func(update v1.Update) {
isCompleted := lastWriteComplete != update.Complete && update.Complete == update.Total

if time.Since(lastUpdateTime).Seconds() >= 5 || isCompleted {
// Update is needed if last update is more than 5 seconds ago, or the current progress indicates the remote writing has just completed.
updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) {
cachedImage.Status.Progress.Total = update.Total
cachedImage.Status.Progress.Available = update.Complete
})

lastUpdateTime = time.Now()
}
lastWriteComplete = update.Complete
}

err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures, onUpdated)

statusErr = updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) {
if err == nil {
Expand Down
15 changes: 12 additions & 3 deletions internal/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,21 @@ func DeleteImage(imageName string) error {
return remote.Delete(digest)
}

func CacheImage(imageName string, desc *remote.Descriptor, architectures []string) error {
func CacheImage(imageName string, desc *remote.Descriptor, architectures []string, callback func(v1.Update)) error {
destRef, err := parseLocalReference(imageName)
if err != nil {
return err
}

progressUpdate := make(chan v1.Update, 100)
go func() {
for update := range progressUpdate {
if callback != nil {
callback(update)
}
}
}()

switch desc.MediaType {
case types.OCIImageIndex, types.DockerManifestList:
index, err := desc.ImageIndex()
Expand All @@ -145,15 +154,15 @@ func CacheImage(imageName string, desc *remote.Descriptor, architectures []strin
return true
})

if err := remote.WriteIndex(destRef, filteredIndex); err != nil {
if err := remote.WriteIndex(destRef, filteredIndex, remote.WithProgress(progressUpdate)); err != nil {
return err
}
default:
image, err := desc.Image()
if err != nil {
return err
}
if err := remote.Write(destRef, image); err != nil {
if err := remote.Write(destRef, image, remote.WithProgress(progressUpdate)); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func Test_CacheImage(t *testing.T) {
desc, err := remote.Get(sourceRef)
g.Expect(err).To(BeNil())

err = CacheImage(imageName, desc, []string{"amd64"})
err = CacheImage(imageName, desc, []string{"amd64"}, nil)
if tt.wantErr != "" {
g.Expect(err).To(BeAssignableToTypeOf(tt.errType))
g.Expect(err).To(MatchError(ContainSubstring(tt.wantErr)))
Expand Down