Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add cachedimage pulling progress status #402

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions api/kuik/v1alpha1/cachedimage_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,19 @@
Count int `json:"count,omitempty"`
}

type Progress struct {
Total int64 `json:"total,omitempty"`
Available int64 `json:"available,omitempty"`
}

// CachedImageStatus defines the observed state of CachedImage
type CachedImageStatus struct {
IsCached bool `json:"isCached,omitempty"`
Phase string `json:"phase,omitempty"`
UsedBy UsedBy `json:"usedBy,omitempty"`

Progress Progress `json:"progress,omitempty"`

Digest string `json:"digest,omitempty"`
UpstreamDigest string `json:"upstreamDigest,omitempty"`
UpToDate bool `json:"upToDate,omitempty"`
Expand Down Expand Up @@ -71,5 +78,5 @@
}

func init() {
SchemeBuilder.Register(&CachedImage{}, &CachedImageList{})

Check failure on line 81 in api/kuik/v1alpha1/cachedimage_types.go

View workflow job for this annotation

GitHub Actions / Static Analysis

cannot use &CachedImage{} (value of type *CachedImage) as "k8s.io/apimachinery/pkg/runtime".Object value in argument to SchemeBuilder.Register: *CachedImage does not implement "k8s.io/apimachinery/pkg/runtime".Object (missing method DeepCopyObject)

Check failure on line 81 in api/kuik/v1alpha1/cachedimage_types.go

View workflow job for this annotation

GitHub Actions / Static Analysis

cannot use &CachedImageList{} (value of type *CachedImageList) as "k8s.io/apimachinery/pkg/runtime".Object value in argument to SchemeBuilder.Register: *CachedImageList does not implement "k8s.io/apimachinery/pkg/runtime".Object (missing method DeepCopyObject)

Check failure on line 81 in api/kuik/v1alpha1/cachedimage_types.go

View workflow job for this annotation

GitHub Actions / Static Analysis

cannot use &CachedImage{} (value of type *CachedImage) as "k8s.io/apimachinery/pkg/runtime".Object value in argument to SchemeBuilder.Register: *CachedImage does not implement "k8s.io/apimachinery/pkg/runtime".Object (missing method DeepCopyObject)
}
9 changes: 9 additions & 0 deletions config/crd/bases/kuik.enix.io_cachedimages.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,15 @@ spec:
type: string
phase:
type: string
progress:
properties:
available:
format: int64
type: integer
total:
format: int64
type: integer
type: object
upToDate:
type: boolean
upstreamDigest:
Expand Down
12 changes: 12 additions & 0 deletions helm/kube-image-keeper/crds/cachedimage-crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ spec:
- jsonPath: .metadata.creationTimestamp
name: Age
type: date
- jsonPath: .status.progress.available
name: Downloaded
type: integer
name: v1alpha1
schema:
openAPIV3Schema:
Expand Down Expand Up @@ -93,6 +96,15 @@ spec:
type: boolean
upstreamDigest:
type: string
progress:
type: object
properties:
total:
type: integer
description: Total size of the compressed blob in bytes, including all layers.
available:
type: integer
description: Total downloaded / available size of the compressed blob in bytes, including all layers.
usedBy:
properties:
count:
Expand Down
27 changes: 26 additions & 1 deletion internal/controller/kuik/cachedimage_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/distribution/reference"
"github.com/go-logr/logr"
v1 "github.com/google/go-containerregistry/pkg/v1"
"github.com/google/go-containerregistry/pkg/v1/remote"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -343,7 +344,31 @@ func (r *CachedImageReconciler) cacheImage(cachedImage *kuikv1alpha1.CachedImage
return err
}

err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures)
lastUpdateTime := time.Now()
lastWriteComplete := int64(0)
onUpdated := func(update v1.Update) {
needUpdate := false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the update, but now the code isn't compiling...

Error: internal/controller/kuik/cachedimage_controller.go:350:3: needUpdate declared and not used

if lastWriteComplete != update.Complete && update.Complete == update.Total {
// Update is needed whenever the writing complmetes.
aDisplayName marked this conversation as resolved.
Show resolved Hide resolved
needUpdate = true
}

if time.Since(lastUpdateTime).Seconds() >= 5 {
// Update is needed if last update is more than 5 seconds ago
needUpdate = true
}
if needUpdate {
aDisplayName marked this conversation as resolved.
Show resolved Hide resolved
updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) {
cachedImage.Status.Progress.Total = update.Total
cachedImage.Status.Progress.Available = update.Complete
})

lastUpdateTime = time.Now()
}
lastWriteComplete = update.Complete
}

err = registry.CacheImage(cachedImage.Spec.SourceImage, desc, r.Architectures, onUpdated)

statusErr = updateStatus(r.Client, cachedImage, desc, func(status *kuikv1alpha1.CachedImageStatus) {
if err == nil {
Expand Down
15 changes: 12 additions & 3 deletions internal/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,21 @@ func DeleteImage(imageName string) error {
return remote.Delete(digest)
}

func CacheImage(imageName string, desc *remote.Descriptor, architectures []string) error {
func CacheImage(imageName string, desc *remote.Descriptor, architectures []string, callback func(v1.Update)) error {
destRef, err := parseLocalReference(imageName)
if err != nil {
return err
}

progressUpdate := make(chan v1.Update, 100)
go func() {
for update := range progressUpdate {
if callback != nil {
callback(update)
}
}
}()

switch desc.MediaType {
case types.OCIImageIndex, types.DockerManifestList:
index, err := desc.ImageIndex()
Expand All @@ -145,15 +154,15 @@ func CacheImage(imageName string, desc *remote.Descriptor, architectures []strin
return true
})

if err := remote.WriteIndex(destRef, filteredIndex); err != nil {
if err := remote.WriteIndex(destRef, filteredIndex, remote.WithProgress(progressUpdate)); err != nil {
return err
}
default:
image, err := desc.Image()
if err != nil {
return err
}
if err := remote.Write(destRef, image); err != nil {
if err := remote.Write(destRef, image, remote.WithProgress(progressUpdate)); err != nil {
return err
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/registry/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func Test_CacheImage(t *testing.T) {
desc, err := remote.Get(sourceRef)
g.Expect(err).To(BeNil())

err = CacheImage(imageName, desc, []string{"amd64"})
err = CacheImage(imageName, desc, []string{"amd64"}, nil)
if tt.wantErr != "" {
g.Expect(err).To(BeAssignableToTypeOf(tt.errType))
g.Expect(err).To(MatchError(ContainSubstring(tt.wantErr)))
Expand Down
Loading