From c0aa74e2df724891857f4623146b22faa8dcc63e Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Wed, 2 Aug 2023 17:31:02 -0700 Subject: [PATCH 1/9] wip: Add k8s events to task phase updates Signed-off-by: Andrew Dye --- go.mod | 2 + pkg/controller/nodes/task/handler.go | 1 + .../nodes/task/k8s/event_watcher.go | 117 +++++++++ .../nodes/task/k8s/event_watcher_test.go | 239 ++++++++++++++++++ .../nodes/task/k8s/plugin_manager.go | 51 +++- 5 files changed, 400 insertions(+), 10 deletions(-) create mode 100644 pkg/controller/nodes/task/k8s/event_watcher.go create mode 100644 pkg/controller/nodes/task/k8s/event_watcher_test.go diff --git a/go.mod b/go.mod index c35782dc3..adc598be8 100644 --- a/go.mod +++ b/go.mod @@ -148,3 +148,5 @@ require ( replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.11-0.20230614183933-d56d4d37bf34 + +replace github.com/flyteorg/flyteplugins => /Users/andrew/dev/flyteorg/flyteplugins diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index c3b8833fa..88e3b00f9 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -711,6 +711,7 @@ func (t Handler) Handle(ctx context.Context, nCtx interfaces.NodeExecutionContex if err != nil { return handler.UnknownTransition, err } + logger.Infof(ctx, "Recording buffered event [%s]", evInfo.Reason) if err := nCtx.EventsRecorder().RecordTaskEvent(ctx, evInfo, t.eventConfig); err != nil { logger.Errorf(ctx, "Event recording failed for Plugin [%s], eventPhase [%s], error :%s", p.GetID(), evInfo.Phase.String(), err.Error()) // Check for idempotency diff --git a/pkg/controller/nodes/task/k8s/event_watcher.go b/pkg/controller/nodes/task/k8s/event_watcher.go new file mode 100644 index 000000000..60fd3d93e --- /dev/null +++ b/pkg/controller/nodes/task/k8s/event_watcher.go @@ 
-0,0 +1,117 @@ +package k8s + +import ( + "context" + "sort" + "time" + + "github.com/flyteorg/flyteplugins/go/tasks/errors" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + "github.com/flyteorg/flytepropeller/pkg/utils" + "github.com/flyteorg/flytestdlib/logger" + eventsv1 "k8s.io/api/events/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" + informerEventsv1 "k8s.io/client-go/informers/events/v1" +) + +type EventWatcher interface { + List(objectNsName types.NamespacedName, createdAfter time.Time) []*eventsv1.Event +} + +type eventWatcher struct { + ctx context.Context + informer informerEventsv1.EventInformer + objectEvents map[types.NamespacedName]eventSet +} + +type eventSet map[types.NamespacedName]*eventsv1.Event + +func (e *eventWatcher) OnAdd(obj interface{}) { + event := obj.(*eventsv1.Event) + objectNsName := types.NamespacedName{Namespace: event.Regarding.Namespace, Name: event.Regarding.Name} + eventNsName := types.NamespacedName{Namespace: event.Namespace, Name: event.Name} + events, ok := e.objectEvents[objectNsName] + if !ok { + events = make(eventSet) + e.objectEvents[objectNsName] = events + } + if _, ok := events[eventNsName]; ok { + logger.Warnf(e.ctx, "Event add [%s/%s] received for object [%s/%s] that already exists in the cache", + event.Namespace, event.Name, event.Regarding.Namespace, event.Regarding.Name) + } + events[eventNsName] = event +} + +func (e *eventWatcher) OnUpdate(_, newObj interface{}) { + event := newObj.(*eventsv1.Event) + objectNsName := types.NamespacedName{Namespace: event.Regarding.Namespace, Name: event.Regarding.Name} + eventNsName := types.NamespacedName{Namespace: event.Namespace, Name: event.Name} + events, ok := e.objectEvents[objectNsName] + if !ok { + logger.Warnf(e.ctx, "Event update [%s/%s] received for object [%s/%s] that does not exist in the cache", + 
event.Namespace, event.Name, event.Regarding.Namespace, event.Regarding.Name) + events = make(eventSet) + e.objectEvents[objectNsName] = events + } + events[eventNsName] = event +} + +func (e *eventWatcher) OnDelete(obj interface{}) { + event := obj.(*eventsv1.Event) + objectNsName := types.NamespacedName{Namespace: event.Regarding.Namespace, Name: event.Regarding.Name} + eventNsName := types.NamespacedName{Namespace: event.Namespace, Name: event.Name} + events, ok := e.objectEvents[objectNsName] + if !ok { + logger.Warnf(e.ctx, "Event delete [%s/%s] received for object [%s/%s] that does not exist in the cache", + event.Namespace, event.Name, event.Regarding.Namespace, event.Regarding.Name) + return + } + delete(events, eventNsName) + if len(events) == 0 { + delete(e.objectEvents, objectNsName) + } +} + +// List returns all events for the given object that were created after the given time, sorted by creation time. +func (e *eventWatcher) List(objectNsName types.NamespacedName, createdAfter time.Time) []*eventsv1.Event { + events, ok := e.objectEvents[objectNsName] + if !ok { + return []*eventsv1.Event{} + } + result := make([]*eventsv1.Event, 0, len(events)) + for _, event := range events { + if event.CreationTimestamp.Time.After(createdAfter) { + result = append(result, event) + } + } + sort.SliceStable(result, func(i, j int) bool { + return result[i].CreationTimestamp.Time.Before(result[j].CreationTimestamp.Time) + }) + return result +} + +func NewEventWatcher(ctx context.Context, gvk schema.GroupVersionKind) (EventWatcher, error) { + kubeClient, _, err := utils.GetKubeConfig(ctx, config.GetConfig()) + if err != nil { + return nil, errors.Wrapf(errors.PluginInitializationFailed, err, "failed to get kube client") + } + objectSelector := func(opts *metav1.ListOptions) { + opts.FieldSelector = fields.OneTermEqualSelector("regarding.kind", gvk.Kind).String() + } + eventInformer := informers.NewSharedInformerFactoryWithOptions( + kubeClient, 0, 
informers.WithTweakListOptions(objectSelector)).Events().V1().Events() + watcher := &eventWatcher{ + informer: eventInformer, + objectEvents: make(map[types.NamespacedName]eventSet), + } + eventInformer.Informer().AddEventHandler(watcher) + + go eventInformer.Informer().Run(ctx.Done()) + logger.Debugf(ctx, "Started informer for [%s] events", gvk.Kind) + + return watcher, nil +} diff --git a/pkg/controller/nodes/task/k8s/event_watcher_test.go b/pkg/controller/nodes/task/k8s/event_watcher_test.go new file mode 100644 index 000000000..5fcb00ffe --- /dev/null +++ b/pkg/controller/nodes/task/k8s/event_watcher_test.go @@ -0,0 +1,239 @@ +package k8s + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + + corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +func TestEventWatcher_OnAdd(t *testing.T) { + ctx := context.Background() + now := time.Now() + ew := eventWatcher{ctx: ctx, objectEvents: map[types.NamespacedName]eventSet{ + {Namespace: "ns1", Name: "name1"}: { + {Namespace: "eventns1", Name: "eventname1"}: &eventsv1.Event{EventTime: metav1.NewMicroTime(now.Add(-time.Minute))}, + }, + {Namespace: "ns2", Name: "name2"}: {}, + }} + + t.Run("existing event", func(t *testing.T) { + ew.OnAdd(&eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns1", + Name: "eventname1", + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns1", + Name: "name1", + }, + EventTime: metav1.NewMicroTime(now), + }) + + assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns1", Name: "name1"}], 1) + assert.Equal(t, now, ew.objectEvents[types.NamespacedName{Namespace: "ns1", Name: "name1"}][types.NamespacedName{Namespace: "eventns1", Name: "eventname1"}].EventTime.Time) + }) + + t.Run("new event", func(t *testing.T) { + ew.OnAdd(&eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns2", + Name: "eventname2", + }, + 
Regarding: corev1.ObjectReference{ + Namespace: "ns2", + Name: "name2", + }, + EventTime: metav1.NewMicroTime(now), + }) + + assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns2", Name: "name2"}], 1) + assert.Equal(t, now, ew.objectEvents[types.NamespacedName{Namespace: "ns2", Name: "name2"}][types.NamespacedName{Namespace: "eventns2", Name: "eventname2"}].EventTime.Time) + }) + + t.Run("new object", func(t *testing.T) { + ew.OnAdd(&eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns3", + Name: "eventname3", + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns3", + Name: "name3", + }, + EventTime: metav1.NewMicroTime(now), + }) + + assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns3", Name: "name3"}], 1) + assert.Equal(t, now, ew.objectEvents[types.NamespacedName{Namespace: "ns3", Name: "name3"}][types.NamespacedName{Namespace: "eventns3", Name: "eventname3"}].EventTime.Time) + }) +} + +func TestEventWatcher_OnUpdate(t *testing.T) { + ctx := context.Background() + now := time.Now() + ew := eventWatcher{ctx: ctx, objectEvents: map[types.NamespacedName]eventSet{ + {Namespace: "ns1", Name: "name1"}: { + {Namespace: "eventns1", Name: "eventname1"}: &eventsv1.Event{EventTime: metav1.NewMicroTime(now.Add(-time.Minute))}, + }, + {Namespace: "ns2", Name: "name2"}: {}, + }} + + t.Run("existing event", func(t *testing.T) { + ew.OnUpdate(&eventsv1.Event{}, &eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns1", + Name: "eventname1", + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns1", + Name: "name1", + }, + EventTime: metav1.NewMicroTime(now), + }) + + assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns1", Name: "name1"}], 1) + assert.Equal(t, now, ew.objectEvents[types.NamespacedName{Namespace: "ns1", Name: "name1"}][types.NamespacedName{Namespace: "eventns1", Name: "eventname1"}].EventTime.Time) + }) + + t.Run("new event", func(t *testing.T) { + ew.OnUpdate(&eventsv1.Event{}, 
&eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns2", + Name: "eventname2", + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns2", + Name: "name2", + }, + EventTime: metav1.NewMicroTime(now), + }) + + assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns2", Name: "name2"}], 1) + assert.Equal(t, now, ew.objectEvents[types.NamespacedName{Namespace: "ns2", Name: "name2"}][types.NamespacedName{Namespace: "eventns2", Name: "eventname2"}].EventTime.Time) + }) + + t.Run("new object", func(t *testing.T) { + ew.OnUpdate(&eventsv1.Event{}, &eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns3", + Name: "eventname3", + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns3", + Name: "name3", + }, + EventTime: metav1.NewMicroTime(now), + }) + + assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns3", Name: "name3"}], 1) + assert.Equal(t, now, ew.objectEvents[types.NamespacedName{Namespace: "ns3", Name: "name3"}][types.NamespacedName{Namespace: "eventns3", Name: "eventname3"}].EventTime.Time) + }) +} + +func TestEventWatcher_OnDelete(t *testing.T) { + ctx := context.Background() + now := time.Now() + ew := eventWatcher{ctx: ctx, objectEvents: map[types.NamespacedName]eventSet{ + {Namespace: "ns1", Name: "name1"}: { + {Namespace: "eventns1", Name: "eventname1"}: &eventsv1.Event{}, + }, + {Namespace: "ns2", Name: "name2"}: {}, + }} + + t.Run("existing event", func(t *testing.T) { + ew.OnDelete(&eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns1", + Name: "eventname1", + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns1", + Name: "name1", + }, + }) + + assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns1", Name: "name1"}], 0) + }) + + t.Run("missing event", func(t *testing.T) { + ew.OnDelete(&eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns2", + Name: "eventname2", + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns2", + Name: "name2", + 
}, + EventTime: metav1.NewMicroTime(now), + }) + + assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns2", Name: "name2"}], 0) + }) + + t.Run("missing object", func(t *testing.T) { + ew.OnDelete(&eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns3", + Name: "eventname3", + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns3", + Name: "name3", + }, + EventTime: metav1.NewMicroTime(now), + }) + + assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns3", Name: "name3"}], 0) + }) +} + +func TestEventWatcher_List(t *testing.T) { + ctx := context.Background() + now := time.Now() + ew := eventWatcher{ctx: ctx, objectEvents: map[types.NamespacedName]eventSet{ + {Namespace: "ns1", Name: "name1"}: { + {Namespace: "eventns1", Name: "eventname1"}: &eventsv1.Event{ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.NewTime(now)}}, + {Namespace: "eventns2", Name: "eventname2"}: &eventsv1.Event{ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.NewTime(now.Add(-time.Hour))}}, + }, + {Namespace: "ns2", Name: "name2"}: {}, + }} + + t.Run("all events", func(t *testing.T) { + result := ew.List(types.NamespacedName{Namespace: "ns1", Name: "name1"}, time.Time{}) + + assert.Len(t, result, 2) + }) + + t.Run("recent events", func(t *testing.T) { + result := ew.List(types.NamespacedName{Namespace: "ns1", Name: "name1"}, now.Add(-time.Minute)) + + assert.Len(t, result, 1) + }) + + t.Run("no events", func(t *testing.T) { + result := ew.List(types.NamespacedName{Namespace: "ns2", Name: "name2"}, time.Time{}) + + assert.Len(t, result, 0) + }) + + t.Run("no object", func(t *testing.T) { + result := ew.List(types.NamespacedName{Namespace: "ns3", Name: "name3"}, time.Time{}) + + assert.Len(t, result, 0) + }) + + t.Run("sorted", func(t *testing.T) { + result := ew.List(types.NamespacedName{Namespace: "ns1", Name: "name1"}, time.Time{}) + + assert.Equal(t, metav1.NewTime(now.Add(-time.Hour)), result[0].CreationTimestamp) + assert.Equal(t, 
metav1.NewTime(now), result[1].CreationTimestamp) + }) +} diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index a3397354e..4cd941a7e 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -12,7 +12,7 @@ import ( "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" + pluginsUtils "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" compiler "github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" @@ -99,12 +99,13 @@ type PluginManager struct { // Per namespace-resource backOffController *backoff.Controller resourceLevelMonitor *ResourceLevelMonitor + eventWatcher EventWatcher } func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) { o.SetNamespace(taskCtx.GetNamespace()) - o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations()))) - o.SetLabels(utils.UnionMaps(cfg.DefaultLabels, o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()))) + o.SetAnnotations(pluginsUtils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), pluginsUtils.CopyMap(taskCtx.GetAnnotations()))) + o.SetLabels(pluginsUtils.UnionMaps(cfg.DefaultLabels, o.GetLabels(), pluginsUtils.CopyMap(taskCtx.GetLabels()))) o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName()) if !e.plugin.GetProperties().DisableInjectOwnerReferences { @@ -117,7 +118,7 @@ func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetad } if errs := validation.IsDNS1123Subdomain(o.GetName()); len(errs) > 0 { - 
o.SetName(utils.ConvertToDNS1123SubdomainCompatibleString(o.GetName())) + o.SetName(pluginsUtils.ConvertToDNS1123SubdomainCompatibleString(o.GetName())) } } @@ -284,6 +285,23 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore return pluginsCore.UnknownTransition, err } + // Add events since last update + recentEvents := e.eventWatcher.List(nsName, k8sPluginState.LastEventUpdate) + version := p.Version() + lastEventUpdate := k8sPluginState.LastEventUpdate + for _, event := range recentEvents { + logger.Infof(ctx, "Observed event [%s:%s] for object [%s]", event.Reason, event.Note, nsName.String()) + tCtx.EventsRecorder().RecordRaw(ctx, pluginsCore.PhaseInfoWithTaskInfo( + p.Phase(), version, event.Note, &pluginsCore.TaskInfo{OccurredAt: &event.CreationTimestamp.Time}, + )) + version += 1 + lastEventUpdate = event.CreationTimestamp.Time + // TODO: figure out how to convey last update time + } + p = p.WithVersion(version) + // TODO: what if no TaskInfo? + p.Info().LastEventUpdate = &lastEventUpdate + if p.Phase() == pluginsCore.PhaseSuccess { var opReader io.OutputReader if pCtx.ow == nil { @@ -353,13 +371,16 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio Reason: transition.Info().Reason(), }, } + if transition.Info().Info() != nil && transition.Info().Info().LastEventUpdate != nil { + newPluginState.K8sPluginState.LastEventUpdate = *transition.Info().Info().LastEventUpdate + } if err := tCtx.PluginStateWriter().Put(pluginStateVersion, &newPluginState); err != nil { return pluginsCore.UnknownTransition, err } } - return transition, err + return transition, nil } func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) error { @@ -551,8 +572,13 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry droppedUpdateCount := labeled.NewCounter("informer_update_dropped", "Update events from informer that have the same resource version", 
metricsScope) genericCount := labeled.NewCounter("informer_generic", "Generic events from informer", metricsScope) + gvk, err := getPluginGvk(entry.ResourceToWatch) + if err != nil { + return nil, err + } + enqueueOwner := iCtx.EnqueueOwner() - err := src.Start( + err = src.Start( ctx, // Handlers handler.Funcs{ @@ -617,16 +643,20 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry return nil, err } - // Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on - gvk, err := getPluginGvk(entry.ResourceToWatch) + // TODO: make configurable and/or fail open + // TODO: cache subset of event info we need? + // TODO: move logic to plugins or keep here? if plugins, where do we init? + eventWatcher, err := NewEventWatcher(ctx, gvk) if err != nil { return nil, err } - sharedInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch) + + // Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on + pluginInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch) if err != nil { return nil, err } - rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, sharedInformer, gvk) + rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, pluginInformer, gvk) // Start the poller and gauge emitter rm.RunCollectorOnce(ctx) @@ -637,6 +667,7 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry metrics: newPluginMetrics(metricsScope), kubeClient: kubeClient, resourceLevelMonitor: rm, + eventWatcher: eventWatcher, }, nil } From 29adee7664c5b2775c9a92d977d0a66e1ecbdc81 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Mon, 11 Sep 2023 14:01:58 -0700 Subject: [PATCH 2/9] Refactor clientset, pluginstate Signed-off-by: Andrew Dye --- pkg/controller/controller.go | 10 +- pkg/controller/executors/dag_structure.go | 2 + 
pkg/controller/executors/execution_context.go | 2 + pkg/controller/executors/kube.go | 2 + pkg/controller/executors/node_lookup.go | 2 + pkg/controller/executors/workflow.go | 2 + .../nodes/factory/handler_factory.go | 8 +- pkg/controller/nodes/task/handler.go | 9 +- pkg/controller/nodes/task/handler_test.go | 58 +++--- .../nodes/task/k8s/event_watcher.go | 15 +- .../nodes/task/k8s/plugin_manager.go | 181 +++++++++--------- .../nodes/task/k8s/plugin_manager_test.go | 105 +++++----- pkg/controller/nodes/task/plugin_config.go | 11 +- .../nodes/task/plugin_config_test.go | 19 +- pkg/controller/workflow/executor_test.go | 58 +++--- 15 files changed, 248 insertions(+), 236 deletions(-) diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 57dcb9ba6..5c30bcf33 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -323,7 +323,7 @@ func getAdminClient(ctx context.Context) (client service.AdminServiceClient, sig } // New returns a new FlyteWorkflow controller -func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Interface, flytepropellerClientset clientset.Interface, +func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Interface, flytepropellerClientset clientset.Interface, flyteworkflowInformerFactory informers.SharedInformerFactory, informerFactory k8sInformers.SharedInformerFactory, kubeClient executors.Client, scope promutils.Scope) (*Controller, error) { @@ -354,13 +354,13 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter if err != nil { return nil, errors.Wrapf(err, "Failed to create EventSink [%v], error %v", events.GetConfig(ctx).Type, err) } - gc, err := NewGarbageCollector(cfg, scope, clock.RealClock{}, kubeclientset.CoreV1().Namespaces(), flytepropellerClientset.FlyteworkflowV1alpha1()) + gc, err := NewGarbageCollector(cfg, scope, clock.RealClock{}, kubeClientset.CoreV1().Namespaces(), flytepropellerClientset.FlyteworkflowV1alpha1()) 
if err != nil { logger.Errorf(ctx, "failed to initialize GC for workflows") return nil, errors.Wrapf(err, "failed to initialize WF GC") } - eventRecorder, err := utils.NewK8sEventRecorder(ctx, kubeclientset, controllerAgentName, cfg.PublishK8sEvents) + eventRecorder, err := utils.NewK8sEventRecorder(ctx, kubeClientset, controllerAgentName, cfg.PublishK8sEvents) if err != nil { logger.Errorf(ctx, "failed to event recorder %v", err) return nil, errors.Wrapf(err, "failed to initialize resource lock.") @@ -372,7 +372,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter numWorkers: cfg.Workers, } - lock, err := leader.NewResourceLock(kubeclientset.CoreV1(), kubeclientset.CoordinationV1(), eventRecorder, cfg.LeaderElection) + lock, err := leader.NewResourceLock(kubeClientset.CoreV1(), kubeClientset.CoordinationV1(), eventRecorder, cfg.LeaderElection) if err != nil { logger.Errorf(ctx, "failed to initialize resource lock.") return nil, errors.Wrapf(err, "failed to initialize resource lock.") @@ -440,7 +440,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter recoveryClient := recovery.NewClient(adminClient) nodeHandlerFactory, err := factory.NewHandlerFactory(ctx, launchPlanActor, launchPlanActor, - kubeClient, catalogClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, scope) + kubeClient, kubeClientset, catalogClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, scope) if err != nil { return nil, errors.Wrapf(err, "failed to create node handler factory") } diff --git a/pkg/controller/executors/dag_structure.go b/pkg/controller/executors/dag_structure.go index cd0f1be3c..b62b46b33 100644 --- a/pkg/controller/executors/dag_structure.go +++ b/pkg/controller/executors/dag_structure.go @@ -6,6 +6,8 @@ import ( "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) +//go:generate mockery -case=underscore + // An interface that captures the Directed Acyclic 
Graph structure in which the nodes are connected. // If NodeLookup and DAGStructure are used together a traversal can be implemented. type DAGStructure interface { diff --git a/pkg/controller/executors/execution_context.go b/pkg/controller/executors/execution_context.go index 53c0bbbd1..3de9b2b92 100644 --- a/pkg/controller/executors/execution_context.go +++ b/pkg/controller/executors/execution_context.go @@ -4,6 +4,8 @@ import ( "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) +//go:generate mockery -case=underscore + type TaskDetailsGetter interface { GetTask(id v1alpha1.TaskID) (v1alpha1.ExecutableTask, error) } diff --git a/pkg/controller/executors/kube.go b/pkg/controller/executors/kube.go index a37e7a216..325adb5a3 100644 --- a/pkg/controller/executors/kube.go +++ b/pkg/controller/executors/kube.go @@ -13,6 +13,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +//go:generate mockery -case=underscore + // Client is a friendlier controller-runtime client that gets passed to executors type Client interface { // GetClient returns a client configured with the Config diff --git a/pkg/controller/executors/node_lookup.go b/pkg/controller/executors/node_lookup.go index 66fc9bddf..c14f13b1d 100644 --- a/pkg/controller/executors/node_lookup.go +++ b/pkg/controller/executors/node_lookup.go @@ -6,6 +6,8 @@ import ( "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) +//go:generate mockery -case=underscore + // NodeLookup provides a structure that enables looking up all nodes within the current execution hierarchy/context. 
// NOTE: execution hierarchy may change the nodes available, this is because when a SubWorkflow is being executed, only // the nodes within the subworkflow are visible diff --git a/pkg/controller/executors/workflow.go b/pkg/controller/executors/workflow.go index 59bc1efe3..9af39d938 100644 --- a/pkg/controller/executors/workflow.go +++ b/pkg/controller/executors/workflow.go @@ -6,6 +6,8 @@ import ( "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) +//go:generate mockery -case=underscore + type Workflow interface { Initialize(ctx context.Context) error HandleFlyteWorkflow(ctx context.Context, w *v1alpha1.FlyteWorkflow) error diff --git a/pkg/controller/nodes/factory/handler_factory.go b/pkg/controller/nodes/factory/handler_factory.go index 9ec00da7a..d5408a01d 100644 --- a/pkg/controller/nodes/factory/handler_factory.go +++ b/pkg/controller/nodes/factory/handler_factory.go @@ -3,6 +3,8 @@ package factory import ( "context" + "k8s.io/client-go/kubernetes" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog" @@ -33,6 +35,7 @@ type handlerFactory struct { workflowLauncher launchplan.Executor launchPlanReader launchplan.Reader kubeClient executors.Client + kubeClientset kubernetes.Interface catalogClient catalog.Client recoveryClient recovery.Client eventConfig *config.EventConfig @@ -50,7 +53,7 @@ func (f *handlerFactory) GetHandler(kind v1alpha1.NodeKind) (interfaces.NodeHand } func (f *handlerFactory) Setup(ctx context.Context, executor interfaces.Node, setup interfaces.SetupContext) error { - t, err := task.New(ctx, f.kubeClient, f.catalogClient, f.eventConfig, f.clusterID, f.scope) + t, err := task.New(ctx, f.kubeClient, f.kubeClientset, f.catalogClient, f.eventConfig, f.clusterID, f.scope) if err != nil { return err } @@ -79,13 +82,14 @@ func (f *handlerFactory) Setup(ctx context.Context, executor interfaces.Node, se } func NewHandlerFactory(ctx context.Context, 
workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader, - kubeClient executors.Client, catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig, + kubeClient executors.Client, kubeClientset kubernetes.Interface, catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig, clusterID string, signalClient service.SignalServiceClient, scope promutils.Scope) (interfaces.HandlerFactory, error) { return &handlerFactory{ workflowLauncher: workflowLauncher, launchPlanReader: launchPlanReader, kubeClient: kubeClient, + kubeClientset: kubeClientset, catalogClient: catalogClient, recoveryClient: recoveryClient, eventConfig: eventConfig, diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index 88e3b00f9..e857d2c43 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -6,6 +6,8 @@ import ( "runtime/debug" "time" + "k8s.io/client-go/kubernetes" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" @@ -205,6 +207,7 @@ type Handler struct { metrics *metrics pluginRegistry PluginRegistryIface kubeClient pluginCore.KubeClient + kubeClientset kubernetes.Interface secretManager pluginCore.SecretManager resourceManager resourcemanager.BaseResourceManager cfg *config.Config @@ -239,7 +242,7 @@ func (t *Handler) Setup(ctx context.Context, sCtx interfaces.SetupContext) error // Create the resource negotiator here // and then convert it to proxies later and pass them to plugins - enabledPlugins, defaultForTaskTypes, err := WranglePluginsAndGenerateFinalList(ctx, &t.cfg.TaskPlugins, t.pluginRegistry) + enabledPlugins, defaultForTaskTypes, err := WranglePluginsAndGenerateFinalList(ctx, &t.cfg.TaskPlugins, t.pluginRegistry, t.kubeClientset) if err != nil { logger.Errorf(ctx, "Failed to finalize enabled plugins. 
Error: %s", err) return err @@ -874,7 +877,8 @@ func (t Handler) Finalize(ctx context.Context, nCtx interfaces.NodeExecutionCont }() } -func New(ctx context.Context, kubeClient executors.Client, client catalog.Client, eventConfig *controllerConfig.EventConfig, clusterID string, scope promutils.Scope) (*Handler, error) { +func New(ctx context.Context, kubeClient executors.Client, kubeClientset kubernetes.Interface, client catalog.Client, + eventConfig *controllerConfig.EventConfig, clusterID string, scope promutils.Scope) (*Handler, error) { // TODO New should take a pointer async, err := catalog.NewAsyncClient(client, *catalog.GetConfig(), scope.NewSubScope("async_catalog")) if err != nil { @@ -910,6 +914,7 @@ func New(ctx context.Context, kubeClient executors.Client, client catalog.Client }, pluginScope: scope.NewSubScope("plugin"), kubeClient: kubeClient, + kubeClientset: kubeClientset, catalog: client, asyncCatalog: async, resourceManager: nil, diff --git a/pkg/controller/nodes/task/handler_test.go b/pkg/controller/nodes/task/handler_test.go index 0a009cb65..e4f8286fe 100644 --- a/pkg/controller/nodes/task/handler_test.go +++ b/pkg/controller/nodes/task/handler_test.go @@ -6,44 +6,35 @@ import ( "fmt" "testing" - "github.com/flyteorg/flyteidl/clients/go/coreutils" "github.com/golang/protobuf/proto" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + v1 "k8s.io/api/core/v1" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + k8sfake "k8s.io/client-go/kubernetes/fake" - eventsErr "github.com/flyteorg/flytepropeller/events/errors" - - pluginK8sMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s/mocks" - + "github.com/flyteorg/flyteidl/clients/go/coreutils" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" - "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" - - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog" - 
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" - - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager" - - "github.com/flyteorg/flytestdlib/contextutils" - "github.com/flyteorg/flytestdlib/promutils/labeled" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/datacatalog" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog" pluginCatalogMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog/mocks" pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" pluginCoreMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" ioMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" pluginK8s "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" - controllerConfig "github.com/flyteorg/flytepropeller/pkg/controller/config" - "github.com/flyteorg/flytestdlib/promutils" - "github.com/flyteorg/flytestdlib/storage" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - v1 "k8s.io/api/core/v1" - v12 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + pluginK8sMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s/mocks" + eventsErr "github.com/flyteorg/flytepropeller/events/errors" + "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" flyteMocks "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" + controllerConfig "github.com/flyteorg/flytepropeller/pkg/controller/config" "github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" 
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces" @@ -51,7 +42,13 @@ import ( "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/codex" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/fakeplugins" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager" rmConfig "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager/config" + + "github.com/flyteorg/flytestdlib/contextutils" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/promutils/labeled" + "github.com/flyteorg/flytestdlib/storage" ) var eventConfig = &controllerConfig.EventConfig{ @@ -244,12 +241,13 @@ func Test_task_Setup(t *testing.T) { t.Run(tt.name, func(t *testing.T) { sCtx := &nodeMocks.SetupContext{} fakeKubeClient := mocks.NewFakeKubeClient() + mockClientset := k8sfake.NewSimpleClientset() sCtx.On("KubeClient").Return(fakeKubeClient) sCtx.On("OwnerKind").Return("test") sCtx.On("EnqueueOwner").Return(pluginCore.EnqueueOwner(func(name types.NamespacedName) error { return nil })) sCtx.On("MetricsScope").Return(promutils.NewTestScope()) - tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope()) + tk, err := New(context.TODO(), fakeKubeClient, mockClientset, &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope()) tk.cfg.TaskPlugins.EnabledPlugins = tt.enabledPlugins tk.cfg.TaskPlugins.DefaultForTaskTypes = tt.defaultForTaskTypes assert.NoError(t, err) @@ -962,6 +960,7 @@ func Test_task_Handle_Catalog(t *testing.T) { ev := &fakeBufferedEventRecorder{} nCtx := createNodeContext(ev, "test", state, tt.args.catalogSkip) c := &pluginCatalogMocks.Client{} + mockClientset := k8sfake.NewSimpleClientset() if tt.args.catalogFetch { or := &ioMocks.OutputReader{} or.OnDeckExistsMatch(mock.Anything).Return(true, nil) @@ 
-980,7 +979,7 @@ func Test_task_Handle_Catalog(t *testing.T) { c.OnPutMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, nil), nil) c.OnUpdateMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, nil), nil) } - tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), c, eventConfig, testClusterID, promutils.NewTestScope()) + tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), mockClientset, c, eventConfig, testClusterID, promutils.NewTestScope()) assert.NoError(t, err) tk.defaultPlugins = map[pluginCore.TaskType]pluginCore.Plugin{ "test": fakeplugins.NewPhaseBasedPlugin(), @@ -1221,6 +1220,7 @@ func Test_task_Handle_Reservation(t *testing.T) { nr := &nodeMocks.NodeStateReader{} st := bytes.NewBuffer([]byte{}) cod := codex.GobStateCodec{} + mockClientset := k8sfake.NewSimpleClientset() assert.NoError(t, cod.Encode(&fakeplugins.NextPhaseState{ Phase: pluginCore.PhaseSuccess, OutputExists: true, @@ -1241,7 +1241,7 @@ func Test_task_Handle_Reservation(t *testing.T) { c.OnPutMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, nil), nil) c.OnUpdateMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(catalog.NewStatus(core.CatalogCacheStatus_CACHE_POPULATED, nil), nil) c.OnGetOrExtendReservationMatch(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(&datacatalog.Reservation{OwnerId: tt.args.ownerID}, nil) - tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), c, eventConfig, testClusterID, promutils.NewTestScope()) + tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), mockClientset, c, eventConfig, testClusterID, promutils.NewTestScope()) assert.NoError(t, err) tk.defaultPlugins = map[pluginCore.TaskType]pluginCore.Plugin{ "test": fakeplugins.NewPhaseBasedPlugin(), @@ -1761,6 +1761,7 
@@ func Test_task_Finalize(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { nCtx := createNodeContext(tt.args.releaseReservation) + mockClientset := k8sfake.NewSimpleClientset() catalog := &pluginCatalogMocks.Client{} if tt.args.releaseReservationError { @@ -1770,7 +1771,7 @@ func Test_task_Finalize(t *testing.T) { } m := tt.fields.defaultPluginCallback() - tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), catalog, eventConfig, testClusterID, promutils.NewTestScope()) + tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), mockClientset, catalog, eventConfig, testClusterID, promutils.NewTestScope()) assert.NoError(t, err) tk.defaultPlugin = m tk.resourceManager = noopRm @@ -1789,7 +1790,8 @@ func Test_task_Finalize(t *testing.T) { } func TestNew(t *testing.T) { - got, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope()) + mockClientset := k8sfake.NewSimpleClientset() + got, err := New(context.TODO(), mocks.NewFakeKubeClient(), mockClientset, &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope()) assert.NoError(t, err) assert.NotNil(t, got) assert.NotNil(t, got.defaultPlugins) diff --git a/pkg/controller/nodes/task/k8s/event_watcher.go b/pkg/controller/nodes/task/k8s/event_watcher.go index 60fd3d93e..a6b157ee1 100644 --- a/pkg/controller/nodes/task/k8s/event_watcher.go +++ b/pkg/controller/nodes/task/k8s/event_watcher.go @@ -5,10 +5,6 @@ import ( "sort" "time" - "github.com/flyteorg/flyteplugins/go/tasks/errors" - "github.com/flyteorg/flytepropeller/pkg/controller/config" - "github.com/flyteorg/flytepropeller/pkg/utils" - "github.com/flyteorg/flytestdlib/logger" eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" @@ -16,6 +12,9 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" informerEventsv1 "k8s.io/client-go/informers/events/v1" 
+ "k8s.io/client-go/kubernetes" + + "github.com/flyteorg/flytestdlib/logger" ) type EventWatcher interface { @@ -94,16 +93,12 @@ func (e *eventWatcher) List(objectNsName types.NamespacedName, createdAfter time return result } -func NewEventWatcher(ctx context.Context, gvk schema.GroupVersionKind) (EventWatcher, error) { - kubeClient, _, err := utils.GetKubeConfig(ctx, config.GetConfig()) - if err != nil { - return nil, errors.Wrapf(errors.PluginInitializationFailed, err, "failed to get kube client") - } +func NewEventWatcher(ctx context.Context, gvk schema.GroupVersionKind, kubeClientset kubernetes.Interface) (EventWatcher, error) { objectSelector := func(opts *metav1.ListOptions) { opts.FieldSelector = fields.OneTermEqualSelector("regarding.kind", gvk.Kind).String() } eventInformer := informers.NewSharedInformerFactoryWithOptions( - kubeClient, 0, informers.WithTweakListOptions(objectSelector)).Events().V1().Events() + kubeClientset, 0, informers.WithTweakListOptions(objectSelector)).Events().V1().Events() watcher := &eventWatcher{ informer: eventInformer, objectEvents: make(map[types.NamespacedName]eventSet), diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 4cd941a7e..e5e8ecbae 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -6,26 +6,7 @@ import ( "reflect" "time" - "github.com/flyteorg/flyteplugins/go/tasks/errors" - pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" - pluginsUtils "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" - - compiler "github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s" - 
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" - nodeTaskConfig "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config" - - "github.com/flyteorg/flytestdlib/contextutils" - stdErrors "github.com/flyteorg/flytestdlib/errors" - "github.com/flyteorg/flytestdlib/logger" - "github.com/flyteorg/flytestdlib/promutils" - "github.com/flyteorg/flytestdlib/promutils/labeled" - "golang.org/x/time/rate" - v1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,16 +14,31 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation" - + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/flyteorg/flyteplugins/go/tasks/errors" + pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" + pluginsUtils "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" + compiler "github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" + nodeTaskConfig "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config" + "github.com/flyteorg/flytestdlib/contextutils" + stdErrors "github.com/flyteorg/flytestdlib/errors" + "github.com/flyteorg/flytestdlib/logger" + "github.com/flyteorg/flytestdlib/promutils" + 
"github.com/flyteorg/flytestdlib/promutils/labeled" ) const finalizer = "flyte/flytek8s" @@ -58,8 +54,9 @@ const ( ) type PluginState struct { - Phase PluginPhase - K8sPluginState k8s.PluginState + Phase PluginPhase + K8sPluginState k8s.PluginState + LastEventUpdate time.Time } type PluginMetrics struct { @@ -84,7 +81,7 @@ func newPluginMetrics(s promutils.Scope) PluginMetrics { } } -func IsK8sObjectNotExists(err error) bool { +func isK8sObjectNotExists(err error) bool { return k8serrors.IsNotFound(err) || k8serrors.IsGone(err) || k8serrors.IsResourceExpired(err) } @@ -102,7 +99,7 @@ type PluginManager struct { eventWatcher EventWatcher } -func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) { +func (e *PluginManager) addObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) { o.SetNamespace(taskCtx.GetNamespace()) o.SetAnnotations(pluginsUtils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), pluginsUtils.CopyMap(taskCtx.GetAnnotations()))) o.SetLabels(pluginsUtils.UnionMaps(cfg.DefaultLabels, o.GetLabels(), pluginsUtils.CopyMap(taskCtx.GetLabels()))) @@ -185,7 +182,7 @@ func (e *PluginManager) getPodEffectiveResourceLimits(ctx context.Context, pod * return podRequestedResources } -func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) { +func (e *PluginManager) launchResource(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) { tmpl, err := tCtx.TaskReader().Read(ctx) if err != nil { return pluginsCore.Transition{}, err @@ -203,7 +200,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas return pluginsCore.UnknownTransition, err } - e.AddObjectMetadata(k8sTaskCtxMetadata, o, config.GetK8sPluginConfig()) + e.addObjectMetadata(k8sTaskCtxMetadata, o, config.GetK8sPluginConfig()) 
logger.Infof(ctx, "Creating Object: Type:[%v], Object:[%v/%v]", o.GetObjectKind().GroupVersionKind(), o.GetNamespace(), o.GetName()) key := backoff.ComposeResourceKey(o) @@ -252,18 +249,21 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas return pluginsCore.DoTransition(pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "task submitted to K8s")), nil } -func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, k8sPluginState *k8s.PluginState) (pluginsCore.Transition, error) { +func (e *PluginManager) getResource(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (client.Object, error) { o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata()) if err != nil { logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err) - return pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("BadTaskDefinition", fmt.Sprintf("Failed to build resource, caused by: %s", err.Error()), nil)), nil + return nil, err } + e.addObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + return o, nil +} - e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) +func (e *PluginManager) checkResourcePhase(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, o client.Object, k8sPluginState *k8s.PluginState) (pluginsCore.Transition, error) { nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} // Attempt to get resource from informer cache, if not found, retrieve it from API server. if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { - if IsK8sObjectNotExists(err) { + if isK8sObjectNotExists(err) { // This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a // Pod does not exist error. 
This should be retried using the retry policy logger.Warningf(ctx, "Failed to find the Resource with name: %v. Error: %v", nsName, err) @@ -285,23 +285,6 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore return pluginsCore.UnknownTransition, err } - // Add events since last update - recentEvents := e.eventWatcher.List(nsName, k8sPluginState.LastEventUpdate) - version := p.Version() - lastEventUpdate := k8sPluginState.LastEventUpdate - for _, event := range recentEvents { - logger.Infof(ctx, "Observed event [%s:%s] for object [%s]", event.Reason, event.Note, nsName.String()) - tCtx.EventsRecorder().RecordRaw(ctx, pluginsCore.PhaseInfoWithTaskInfo( - p.Phase(), version, event.Note, &pluginsCore.TaskInfo{OccurredAt: &event.CreationTimestamp.Time}, - )) - version += 1 - lastEventUpdate = event.CreationTimestamp.Time - // TODO: figure out how to convey last update time - } - p = p.WithVersion(version) - // TODO: what if no TaskInfo? - p.Info().LastEventUpdate = &lastEventUpdate - if p.Phase() == pluginsCore.PhaseSuccess { var opReader io.OutputReader if pCtx.ow == nil { @@ -333,10 +316,11 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) { // read phase state - ps := PluginState{} - if v, err := tCtx.PluginStateReader().Get(&ps); err != nil { + pluginState := PluginState{} + if v, err := tCtx.PluginStateReader().Get(&pluginState); err != nil { if v != pluginStateVersion { - return pluginsCore.DoTransition(pluginsCore.PhaseInfoRetryableFailure(errors.CorruptedPluginState, fmt.Sprintf("plugin state version mismatch expected [%d] got [%d]", pluginStateVersion, v), nil)), nil + return pluginsCore.DoTransition(pluginsCore.PhaseInfoRetryableFailure(errors.CorruptedPluginState, + fmt.Sprintf("plugin state version mismatch expected [%d] got [%d]", pluginStateVersion, v), nil)), nil } return 
pluginsCore.UnknownTransition, errors.Wrapf(errors.CorruptedPluginState, err, "Failed to read unmarshal custom state") } @@ -344,37 +328,57 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio // evaluate plugin var err error var transition pluginsCore.Transition - pluginPhase := ps.Phase - if ps.Phase == PluginPhaseNotStarted { - transition, err = e.LaunchResource(ctx, tCtx) + var o client.Object + pluginPhase := pluginState.Phase + if pluginState.Phase == PluginPhaseNotStarted { + transition, err = e.launchResource(ctx, tCtx) if err == nil && transition.Info().Phase() == pluginsCore.PhaseQueued { pluginPhase = PluginPhaseStarted } } else { - transition, err = e.CheckResourcePhase(ctx, tCtx, &ps.K8sPluginState) + o, err = e.getResource(ctx, tCtx) + if err != nil { + transition, err = pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("BadTaskDefinition", + fmt.Sprintf("Failed to build resource, caused by: %s", err.Error()), nil)), nil + } else { + transition, err = e.checkResourcePhase(ctx, tCtx, o, &pluginState.K8sPluginState) + } } if err != nil { return transition, err } - // persist any changes in phase state - k8sPluginState := ps.K8sPluginState - if ps.Phase != pluginPhase || k8sPluginState.Phase != transition.Info().Phase() || - k8sPluginState.PhaseVersion != transition.Info().Version() || k8sPluginState.Reason != transition.Info().Reason() { - - newPluginState := PluginState{ - Phase: pluginPhase, - K8sPluginState: k8s.PluginState{ - Phase: transition.Info().Phase(), - PhaseVersion: transition.Info().Version(), - Reason: transition.Info().Reason(), - }, - } - if transition.Info().Info() != nil && transition.Info().Info().LastEventUpdate != nil { - newPluginState.K8sPluginState.LastEventUpdate = *transition.Info().Info().LastEventUpdate + // Add events since last update + version := transition.Info().Version() + lastEventUpdate := pluginState.LastEventUpdate + if o != nil { + nsName := k8stypes.NamespacedName{Namespace: 
o.GetNamespace(), Name: o.GetName()} + recentEvents := e.eventWatcher.List(nsName, lastEventUpdate) + for _, event := range recentEvents { + logger.Infof(ctx, "Observed event [%s:%s] for object [%s]", event.Reason, event.Note, nsName.String()) + tCtx.EventsRecorder().RecordRaw(ctx, pluginsCore.PhaseInfoWithTaskInfo( + transition.Info().Phase(), + version, + event.Note, + &pluginsCore.TaskInfo{OccurredAt: &event.CreationTimestamp.Time}, + )) + version += 1 + lastEventUpdate = event.CreationTimestamp.Time } + } + // persist any changes in phase state + newPluginState := PluginState{ + Phase: pluginPhase, + K8sPluginState: k8s.PluginState{ + Phase: transition.Info().Phase(), + PhaseVersion: version, + Reason: transition.Info().Reason(), + }, + LastEventUpdate: lastEventUpdate, + } + if pluginState != newPluginState { if err := tCtx.PluginStateWriter().Put(pluginStateVersion, &newPluginState); err != nil { return pluginsCore.UnknownTransition, err } @@ -387,16 +391,12 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution logger.Infof(ctx, "KillTask invoked. We will attempt to delete object [%v].", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()) - o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata()) + o, err := e.getResource(ctx, tCtx) if err != nil { - // This will recurrent, so we will skip further finalize - logger.Errorf(ctx, "Failed to build the Resource with name: %v. 
Error: %v, when finalizing.", - tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err) + logger.Errorf(ctx, "%v", err) return nil } - e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) - deleteResource := true abortOverride, hasAbortOverride := e.plugin.(k8s.PluginAbortOverride) @@ -428,7 +428,7 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution } } - if err != nil && !IsK8sObjectNotExists(err) { + if err != nil && !isK8sObjectNotExists(err) { logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", resourceToFinalize.GetNamespace(), resourceToFinalize.GetName(), err) return err @@ -437,11 +437,11 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution return nil } -func (e *PluginManager) ClearFinalizers(ctx context.Context, o client.Object) error { +func (e *PluginManager) clearFinalizers(ctx context.Context, o client.Object) error { if len(o.GetFinalizers()) > 0 { o.SetFinalizers([]string{}) err := e.kubeClient.GetClient().Update(ctx, o) - if err != nil && !IsK8sObjectNotExists(err) { + if err != nil && !isK8sObjectNotExists(err) { logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", o.GetNamespace(), o.GetName(), err) return err @@ -455,25 +455,21 @@ func (e *PluginManager) ClearFinalizers(ctx context.Context, o client.Object) er func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (err error) { errs := stdErrors.ErrorCollection{} - var o client.Object var nsName k8stypes.NamespacedName cfg := config.GetK8sPluginConfig() - o, err = e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata()) + o, err := e.getResource(ctx, tCtx) if err != nil { - // This will recurrent, so we will skip further finalize - logger.Errorf(ctx, "Failed to build the Resource with name: %v. 
Error: %v, when finalizing.", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err) + logger.Errorf(ctx, "%v", err) return nil } - - e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) nsName = k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} // Attempt to cleanup finalizers so that the object may be deleted/garbage collected. We try to clear them for all // objects, regardless of whether or not InjectFinalizer is configured to handle all cases where InjectFinalizer is // enabled/disabled during object execution. if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { - if IsK8sObjectNotExists(err) { + if isK8sObjectNotExists(err) { return nil } // This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a @@ -485,7 +481,7 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu // This must happen after sending admin event. It's safe against partial failures because if the event failed, we will // simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send // the same event (idempotent) and then come here again... - err = e.ClearFinalizers(ctx, o) + err = e.clearFinalizers(ctx, o) if err != nil { errs.Append(err) } @@ -494,7 +490,7 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu if cfg.DeleteResourceOnFinalize && !e.plugin.GetProperties().DisableDeleteResourceOnFinalize { // Attempt to delete resource, if not found, return success. 
if err := e.kubeClient.GetClient().Delete(ctx, o); err != nil { - if IsK8sObjectNotExists(err) { + if isK8sObjectNotExists(err) { return errs.ErrorOrDefault() } @@ -509,9 +505,9 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu } func NewPluginManagerWithBackOff(ctx context.Context, iCtx pluginsCore.SetupContext, entry k8s.PluginEntry, backOffController *backoff.Controller, - monitorIndex *ResourceMonitorIndex) (*PluginManager, error) { + monitorIndex *ResourceMonitorIndex, kubeClientset kubernetes.Interface) (*PluginManager, error) { - mgr, err := NewPluginManager(ctx, iCtx, entry, monitorIndex) + mgr, err := NewPluginManager(ctx, iCtx, entry, monitorIndex, kubeClientset) if err == nil { mgr.backOffController = backOffController } @@ -520,7 +516,7 @@ func NewPluginManagerWithBackOff(ctx context.Context, iCtx pluginsCore.SetupCont // Creates a K8s generic task executor. This provides an easier way to build task executors that create K8s resources. func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry k8s.PluginEntry, - monitorIndex *ResourceMonitorIndex) (*PluginManager, error) { + monitorIndex *ResourceMonitorIndex, kubeClientset kubernetes.Interface) (*PluginManager, error) { if iCtx.EnqueueOwner() == nil { return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize plugin, enqueue Owner cannot be nil or empty.") @@ -645,8 +641,7 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry // TODO: make configurable and/or fail open // TODO: cache subset of event info we need? - // TODO: move logic to plugins or keep here? if plugins, where do we init? 
- eventWatcher, err := NewEventWatcher(ctx, gvk) + eventWatcher, err := NewEventWatcher(ctx, gvk, kubeClientset) if err != nil { return nil, err } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 7b78c6810..49f59942d 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -7,37 +7,32 @@ import ( "reflect" "testing" - "k8s.io/client-go/kubernetes/scheme" - - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" - "github.com/flyteorg/flytestdlib/contextutils" - "github.com/flyteorg/flytestdlib/promutils/labeled" - - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" - "github.com/flyteorg/flytestdlib/promutils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + v1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + k8stypes "k8s.io/apimachinery/pkg/types" + k8sfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" pluginsCoreMock "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" pluginsk8sMock 
"github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s/mocks" - - "github.com/stretchr/testify/mock" - v1 "k8s.io/api/core/v1" - v12 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/stretchr/testify/assert" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8stypes "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" + "github.com/flyteorg/flytestdlib/contextutils" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/promutils/labeled" ) type extendedFakeClient struct { @@ -135,17 +130,19 @@ func (p *pluginWithAbortOverride) OnAbort(ctx context.Context, tCtx pluginsCore. func ExampleNewPluginManager() { sCtx := &pluginsCoreMock.SetupContext{} fakeKubeClient := mocks.NewFakeKubeClient() + mockClientset := k8sfake.NewSimpleClientset() sCtx.On("KubeClient").Return(fakeKubeClient) sCtx.On("OwnerKind").Return("test") sCtx.On("EnqueueOwner").Return(pluginsCore.EnqueueOwner(func(name k8stypes.NamespacedName) error { return nil })) sCtx.On("MetricsScope").Return(promutils.NewTestScope()) ctx := context.TODO() + exec, err := NewPluginManager(ctx, sCtx, k8s.PluginEntry{ ID: "SampleHandler", RegisteredTaskTypes: []pluginsCore.TaskType{"container"}, ResourceToWatch: &v1.Pod{}, Plugin: k8sSampleHandler{}, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) if err == nil { fmt.Printf("Created executor: %v\n", exec.GetID()) } else { @@ -203,7 +200,7 @@ func getMockTaskExecutionMetadata() pluginsCore.TaskExecutionMetadata { taskExecutionMetadata.On("GetNamespace").Return("ns") taskExecutionMetadata.On("GetAnnotations").Return(map[string]string{"aKey": "aVal"}) taskExecutionMetadata.On("GetLabels").Return(map[string]string{"lKey": "lVal"}) - 
taskExecutionMetadata.On("GetOwnerReference").Return(v12.OwnerReference{Name: "x"}) + taskExecutionMetadata.On("GetOwnerReference").Return(metav1.OwnerReference{Name: "x"}) id := &pluginsCoreMock.TaskExecutionID{} id.On("GetGeneratedName").Return("test") @@ -217,7 +214,7 @@ func getMockTaskExecutionMetadataCustom( ns string, annotations map[string]string, labels map[string]string, - ownerRef v12.OwnerReference) pluginsCore.TaskExecutionMetadata { + ownerRef metav1.OwnerReference) pluginsCore.TaskExecutionMetadata { taskExecutionMetadata := &pluginsCoreMock.TaskExecutionMetadata{} taskExecutionMetadata.On("GetNamespace").Return(ns) taskExecutionMetadata.On("GetAnnotations").Return(annotations) @@ -251,26 +248,24 @@ func buildPluginWithAbortOverride(ctx context.Context, tctx pluginsCore.TaskExec pluginResource := &v1.Pod{} mockResourceHandler := new(pluginWithAbortOverride) - mockResourceHandler.On( "OnAbort", ctx, tctx, pluginResource, ).Return(abortBehavior, nil) - mockResourceHandler.On( "BuildIdentityResource", ctx, tctx.TaskExecutionMetadata(), ).Return(pluginResource, nil) - mockResourceHandler.On("GetProperties").Return(k8s.PluginProperties{}) mockClient := extendedFakeClient{ Client: client, } + mockClientset := k8sfake.NewSimpleClientset() return NewPluginManager(ctx, dummySetupContext(mockClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: pluginResource, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) } func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { @@ -285,11 +280,12 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil) fakeClient := fake.NewClientBuilder().WithRuntimeObjects().Build() + mockClientset := k8sfake.NewSimpleClientset() pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), 
k8s.PluginEntry{ ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) transition, err := pluginManager.Handle(ctx, tCtx) @@ -300,7 +296,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { assert.Equal(t, pluginsCore.PhaseQueued, transitionInfo.Phase()) createdPod := &v1.Pod{} - pluginManager.AddObjectMetadata(tCtx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) + pluginManager.addObjectMetadata(tCtx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) assert.NoError(t, fakeClient.Get(ctx, k8stypes.NamespacedName{Namespace: tCtx.TaskExecutionMetadata().GetNamespace(), Name: tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()}, createdPod)) assert.Equal(t, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), createdPod.Name) @@ -314,15 +310,16 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil) fakeClient := fake.NewClientBuilder().WithRuntimeObjects().Build() + mockClientset := k8sfake.NewSimpleClientset() pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) createdPod := &v1.Pod{} - pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) + pluginManager.addObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) assert.NoError(t, fakeClient.Create(ctx, createdPod)) transition, err := pluginManager.Handle(ctx, tctx) @@ -348,12 +345,13 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { Client: 
fake.NewClientBuilder().WithRuntimeObjects().Build(), CreateError: k8serrors.NewForbidden(schema.GroupResource{}, "", errors.New("exceeded quota")), } + mockClientset := k8sfake.NewSimpleClientset() pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) createdPod := &v1.Pod{} @@ -382,12 +380,13 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { Client: fake.NewClientBuilder().WithRuntimeObjects().Build(), CreateError: k8serrors.NewForbidden(schema.GroupResource{}, "", errors.New("auth error")), } + mockClientset := k8sfake.NewSimpleClientset() pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) createdPod := &v1.Pod{} @@ -434,13 +433,14 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { "exceeded quota: project-quota, requested: limits.memory=3Gi, "+ "used: limits.memory=7976Gi, limited: limits.memory=8000Gi")), } + mockClientset := k8sfake.NewSimpleClientset() backOffController := backoff.NewController(ctx) pluginManager, err := NewPluginManagerWithBackOff(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, backOffController, NewResourceMonitorIndex()) + }, backOffController, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) transition, err := pluginManager.Handle(ctx, tctx) @@ -451,7 +451,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { // Build a reference resource that is supposed to be identical to the resource built by pluginManager referenceResource, err := mockResourceHandler.BuildResource(ctx, tctx) assert.NoError(t, 
err) - pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig()) + pluginManager.addObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig()) refKey := backoff.ComposeResourceKey(referenceResource) podBackOffHandler, found := backOffController.GetBackOffHandler(refKey) assert.True(t, found) @@ -463,7 +463,7 @@ func TestPluginManager_Abort(t *testing.T) { ctx := context.TODO() tm := getMockTaskExecutionMetadata() res := &v1.Pod{ - ObjectMeta: v12.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: tm.GetTaskExecutionID().GetGeneratedName(), Namespace: tm.GetNamespace(), }, @@ -473,6 +473,7 @@ func TestPluginManager_Abort(t *testing.T) { // common setup code tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme, res)} + mockClientset := k8sfake.NewSimpleClientset() // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} @@ -483,7 +484,7 @@ func TestPluginManager_Abort(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NotNil(t, res) assert.NoError(t, err) @@ -498,6 +499,7 @@ func TestPluginManager_Abort(t *testing.T) { // common setup code tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme)} + mockClientset := k8sfake.NewSimpleClientset() // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) @@ -507,7 +509,7 @@ func TestPluginManager_Abort(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NotNil(t, res) assert.NoError(t, err) @@ -583,7 +585,7 @@ func 
TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { ctx := context.TODO() tm := getMockTaskExecutionMetadata() res := &v1.Pod{ - ObjectMeta: v12.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: tm.GetTaskExecutionID().GetGeneratedName(), Namespace: tm.GetNamespace(), }, @@ -683,6 +685,7 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { // common setup code tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) fc := tt.args.fakeClient() + mockClientset := k8sfake.NewSimpleClientset() // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) @@ -692,7 +695,7 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NotNil(t, res) assert.NoError(t, err) @@ -720,7 +723,7 @@ func TestPluginManager_Handle_PluginState(t *testing.T) { ctx := context.TODO() tm := getMockTaskExecutionMetadata() res := &v1.Pod{ - ObjectMeta: v12.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: tm.GetTaskExecutionID().GetGeneratedName(), Namespace: tm.GetNamespace(), }, @@ -839,6 +842,7 @@ func TestPluginManager_Handle_PluginState(t *testing.T) { tCtx.OnOutputWriter().Return(&dummyOutputWriter{}) fc := extendedFakeClient{Client: fake.NewFakeClient(res)} + mockClientset := k8sfake.NewSimpleClientset() mockResourceHandler := &pluginsk8sMock.Plugin{} mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) @@ -850,7 +854,7 @@ func TestPluginManager_Handle_PluginState(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) // handle plugin @@ -877,6 +881,7 @@ func TestPluginManager_CustomKubeClient(t *testing.T) { fakeClient := fake.NewClientBuilder().Build() newFakeClient := 
&pluginsCoreMock.KubeClient{} newFakeClient.On("GetCache").Return(&mocks.FakeInformers{}) + mockClientset := k8sfake.NewSimpleClientset() pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: &v1.Pod{}, @@ -884,7 +889,7 @@ func TestPluginManager_CustomKubeClient(t *testing.T) { CustomKubeClient: func(ctx context.Context) (pluginsCore.KubeClient, error) { return newFakeClient, nil }, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) assert.Equal(t, newFakeClient, pluginManager.kubeClient) @@ -893,7 +898,7 @@ func TestPluginManager_CustomKubeClient(t *testing.T) { func TestPluginManager_AddObjectMetadata(t *testing.T) { genName := "gen-name" ns := "ns" - or := v12.OwnerReference{} + or := metav1.OwnerReference{} l := map[string]string{"l1": "lv1"} a := map[string]string{"aKey": "aVal"} tm := getMockTaskExecutionMetadataCustom(genName, ns, a, l, or) @@ -905,9 +910,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { p := pluginsk8sMock.Plugin{} p.OnGetProperties().Return(k8s.PluginProperties{}) pluginManager := PluginManager{plugin: &p} - pluginManager.AddObjectMetadata(tm, o, cfg) + pluginManager.addObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) - assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences()) + assert.Equal(t, []metav1.OwnerReference{or}, o.GetOwnerReferences()) assert.Equal(t, ns, o.GetNamespace()) assert.Equal(t, map[string]string{ "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", @@ -922,7 +927,7 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectOwnerReferences: true}) pluginManager := PluginManager{plugin: &p} o := &v1.Pod{} - pluginManager.AddObjectMetadata(tm, o, cfg) + pluginManager.addObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) // empty OwnerReference since we are ignoring assert.Equal(t, 0, 
len(o.GetOwnerReferences())) @@ -942,7 +947,7 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { // enable finalizer injection cfg.InjectFinalizer = true o := &v1.Pod{} - pluginManager.AddObjectMetadata(tm, o, cfg) + pluginManager.addObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) // empty OwnerReference since we are ignoring assert.Equal(t, 1, len(o.GetOwnerReferences())) @@ -962,7 +967,7 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { // disable finalizer injection cfg.InjectFinalizer = false o := &v1.Pod{} - pluginManager.AddObjectMetadata(tm, o, cfg) + pluginManager.addObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) // empty OwnerReference since we are ignoring assert.Equal(t, 1, len(o.GetOwnerReferences())) diff --git a/pkg/controller/nodes/task/plugin_config.go b/pkg/controller/nodes/task/plugin_config.go index 07f6d3a7a..88d1102d9 100644 --- a/pkg/controller/nodes/task/plugin_config.go +++ b/pkg/controller/nodes/task/plugin_config.go @@ -6,19 +6,20 @@ import ( "strings" "sync" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" + "k8s.io/client-go/kubernetes" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" - "github.com/flyteorg/flytestdlib/logger" - "github.com/flyteorg/flyteplugins/go/tasks/plugins/webapi/agent" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/k8s" + "github.com/flyteorg/flytestdlib/logger" ) var once sync.Once -func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface) (enabledPlugins []core.PluginEntry, defaultForTaskTypes map[pluginID][]taskType, err error) { +func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface, + kubeClientset kubernetes.Interface) (enabledPlugins 
[]core.PluginEntry, defaultForTaskTypes map[pluginID][]taskType, err error) { if cfg == nil { return nil, nil, fmt.Errorf("unable to initialize plugin list, cfg is a required argument") } @@ -65,7 +66,7 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu ID: id, RegisteredTaskTypes: kpe.RegisteredTaskTypes, LoadPlugin: func(ctx context.Context, iCtx core.SetupContext) (plugin core.Plugin, e error) { - return k8s.NewPluginManagerWithBackOff(ctx, iCtx, kpe, backOffController, monitorIndex) + return k8s.NewPluginManagerWithBackOff(ctx, iCtx, kpe, backOffController, monitorIndex, kubeClientset) }, IsDefault: kpe.IsDefault, } diff --git a/pkg/controller/nodes/task/plugin_config_test.go b/pkg/controller/nodes/task/plugin_config_test.go index 2dbf9f31c..bcd4c9972 100644 --- a/pkg/controller/nodes/task/plugin_config_test.go +++ b/pkg/controller/nodes/task/plugin_config_test.go @@ -6,14 +6,14 @@ import ( "testing" "time" - config2 "github.com/flyteorg/flytestdlib/config" - - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" "github.com/magiconair/properties/assert" "k8s.io/apimachinery/pkg/util/sets" + k8sfake "k8s.io/client-go/kubernetes/fake" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config" + flytestdlibConfig "github.com/flyteorg/flytestdlib/config" ) func TestWranglePluginsAndGenerateFinalList(t *testing.T) { @@ -47,13 +47,13 @@ func TestWranglePluginsAndGenerateFinalList(t *testing.T) { args args want want }{ - {"config-no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer}}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{}}, - {"no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: nil}, 
backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{}}, + {"config-no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer}}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: flytestdlibConfig.Duration{Duration: time.Second * 0}}}, want{}}, + {"no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: nil}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: flytestdlibConfig.Duration{Duration: time.Second * 0}}}, want{}}, {"no-config-no-plugins", args{}, want{err: true}}, {"no-config-plugins", args{corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{err: true}}, {"empty-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{final: sets.NewString(k8sContainer, k8sOther, coreOther, coreContainer)}}, - {"config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer, k8sOther}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, - {"case-differs-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{strings.ToUpper(coreContainer), strings.ToUpper(k8sOther)}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, + {"config-plugins", 
args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer, k8sOther}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: flytestdlibConfig.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, + {"case-differs-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{strings.ToUpper(coreContainer), strings.ToUpper(k8sOther)}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: flytestdlibConfig.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -61,7 +61,8 @@ func TestWranglePluginsAndGenerateFinalList(t *testing.T) { core: tt.args.corePlugins, k8s: tt.args.k8sPlugins, } - got, _, err := WranglePluginsAndGenerateFinalList(context.TODO(), tt.args.cfg, pr) + mockClientset := k8sfake.NewSimpleClientset() + got, _, err := WranglePluginsAndGenerateFinalList(context.TODO(), tt.args.cfg, pr, mockClientset) if (err != nil) != tt.want.err { t.Errorf("WranglePluginsAndGenerateFinalList() error = %v, wantErr %v", err, tt.want.err) return diff --git a/pkg/controller/workflow/executor_test.go b/pkg/controller/workflow/executor_test.go index ddd553dd8..13e2d7862 100644 --- a/pkg/controller/workflow/executor_test.go +++ b/pkg/controller/workflow/executor_test.go @@ -9,58 +9,52 @@ import ( "testing" "time" - "k8s.io/apimachinery/pkg/util/rand" - - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - - "github.com/flyteorg/flyteidl/clients/go/coreutils" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" - - 
"github.com/flyteorg/flytestdlib/contextutils" - "github.com/flyteorg/flytestdlib/promutils/labeled" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - + "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" + k8sfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" + "github.com/flyteorg/flyteidl/clients/go/coreutils" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery" pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" - "github.com/flyteorg/flytestdlib/logger" - "github.com/flyteorg/flytepropeller/events" eventsErr "github.com/flyteorg/flytepropeller/events/errors" eventMocks "github.com/flyteorg/flytepropeller/events/mocks" - mocks2 "github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks" + "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + executorMocks "github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/factory" + gateMocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/gate/mocks" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" nodemocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces/mocks" + recoveryMocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery/mocks" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/catalog" 
"github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/fakeplugins" - wfErrors "github.com/flyteorg/flytepropeller/pkg/controller/workflow/errors" - - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytestdlib/contextutils" + "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/promutils/labeled" "github.com/flyteorg/flytestdlib/storage" "github.com/flyteorg/flytestdlib/yamlutils" - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" - "github.com/stretchr/testify/assert" - "k8s.io/client-go/tools/record" - - "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" - "github.com/flyteorg/flytepropeller/pkg/controller/config" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes" - gatemocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/gate/mocks" - recoveryMocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery/mocks" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan" ) var ( testScope = promutils.NewScope("test_wfexec") - fakeKubeClient = mocks2.NewFakeKubeClient() - signalClient = &gatemocks.SignalServiceClient{} + fakeKubeClient = executorMocks.NewFakeKubeClient() + mockClientset = k8sfake.NewSimpleClientset() + signalClient = &gateMocks.SignalServiceClient{} ) const ( @@ -248,7 +242,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Error(t *testing.T) { recoveryClient := &recoveryMocks.Client{} adminClient := launchplan.NewFailFastLaunchPlanExecutor() - handlerFactory, err := factory.NewHandlerFactory(ctx, adminClient, adminClient, fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, promutils.NewTestScope()) + handlerFactory, err := factory.NewHandlerFactory(ctx, adminClient, adminClient, fakeKubeClient, mockClientset, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, promutils.NewTestScope()) 
assert.NoError(t, err) nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient, @@ -331,7 +325,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow(t *testing.T) { recoveryClient := &recoveryMocks.Client{} adminClient := launchplan.NewFailFastLaunchPlanExecutor() - handlerFactory, err := factory.NewHandlerFactory(ctx, adminClient, adminClient, fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, promutils.NewTestScope()) + handlerFactory, err := factory.NewHandlerFactory(ctx, adminClient, adminClient, fakeKubeClient, mockClientset, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, promutils.NewTestScope()) assert.NoError(t, err) nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient, @@ -607,7 +601,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Events(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() recoveryClient := &recoveryMocks.Client{} - handlerFactory, err := factory.NewHandlerFactory(ctx, adminClient, adminClient, fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, promutils.NewTestScope()) + handlerFactory, err := factory.NewHandlerFactory(ctx, adminClient, adminClient, fakeKubeClient, mockClientset, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, promutils.NewTestScope()) assert.NoError(t, err) nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient, From 1a368779ecdd66b747800042b3af18d6831b88e1 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Wed, 20 Sep 2023 15:46:00 -0700 Subject: [PATCH 3/9] Refactor to use batched events Signed-off-by: Andrew Dye --- go.mod | 3 +- .../nodes/task/k8s/event_watcher.go | 97 ++++++----- .../nodes/task/k8s/event_watcher_test.go | 159 ++++++------------ 
.../nodes/task/k8s/plugin_manager.go | 31 ++-- pkg/controller/nodes/task/transformer.go | 42 +++-- 5 files changed, 152 insertions(+), 180 deletions(-) diff --git a/go.mod b/go.mod index adc598be8..48665d26a 100644 --- a/go.mod +++ b/go.mod @@ -147,6 +147,7 @@ require ( replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d -replace github.com/flyteorg/flyteidl => github.com/flyteorg/flyteidl v1.5.11-0.20230614183933-d56d4d37bf34 +// TODO: update version references once dependent PRs are merged +replace github.com/flyteorg/flyteidl => /Users/andrew/dev/flyteorg/flyteidl replace github.com/flyteorg/flyteplugins => /Users/andrew/dev/flyteorg/flyteplugins diff --git a/pkg/controller/nodes/task/k8s/event_watcher.go b/pkg/controller/nodes/task/k8s/event_watcher.go index a6b157ee1..18280951c 100644 --- a/pkg/controller/nodes/task/k8s/event_watcher.go +++ b/pkg/controller/nodes/task/k8s/event_watcher.go @@ -3,6 +3,7 @@ package k8s import ( "context" "sort" + "sync" "time" eventsv1 "k8s.io/api/events/v1" @@ -18,77 +19,84 @@ import ( ) type EventWatcher interface { - List(objectNsName types.NamespacedName, createdAfter time.Time) []*eventsv1.Event + List(objectNsName types.NamespacedName, createdAfter time.Time) []*EventInfo } type eventWatcher struct { - ctx context.Context - informer informerEventsv1.EventInformer - objectEvents map[types.NamespacedName]eventSet + informer informerEventsv1.EventInformer + objectCache sync.Map } -type eventSet map[types.NamespacedName]*eventsv1.Event +type objectEvents struct { + mu sync.RWMutex + eventInfos map[types.NamespacedName]*EventInfo +} + +// EventInfo stores detail about the event and the timestamp of the first occurrence. +// All other fields are thrown away to conserve space. 
+type EventInfo struct { + Reason string + Note string + CreatedAt time.Time + RecordedAt time.Time +} func (e *eventWatcher) OnAdd(obj interface{}) { event := obj.(*eventsv1.Event) objectNsName := types.NamespacedName{Namespace: event.Regarding.Namespace, Name: event.Regarding.Name} eventNsName := types.NamespacedName{Namespace: event.Namespace, Name: event.Name} - events, ok := e.objectEvents[objectNsName] - if !ok { - events = make(eventSet) - e.objectEvents[objectNsName] = events - } - if _, ok := events[eventNsName]; ok { - logger.Warnf(e.ctx, "Event add [%s/%s] received for object [%s/%s] that already exists in the cache", - event.Namespace, event.Name, event.Regarding.Namespace, event.Regarding.Name) + v, _ := e.objectCache.LoadOrStore(objectNsName, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{}, + }) + objEvents := v.(*objectEvents) + objEvents.mu.Lock() + defer objEvents.mu.Unlock() + objEvents.eventInfos[eventNsName] = &EventInfo{ + Reason: event.Reason, + Note: event.Note, + CreatedAt: event.CreationTimestamp.Time, + RecordedAt: time.Now(), } - events[eventNsName] = event } func (e *eventWatcher) OnUpdate(_, newObj interface{}) { - event := newObj.(*eventsv1.Event) - objectNsName := types.NamespacedName{Namespace: event.Regarding.Namespace, Name: event.Regarding.Name} - eventNsName := types.NamespacedName{Namespace: event.Namespace, Name: event.Name} - events, ok := e.objectEvents[objectNsName] - if !ok { - logger.Warn(e.ctx, "Event update [%s/%s] received for object [%s/%s] that does not exist in the cache", - event.Namespace, event.Name, event.Regarding.Namespace, event.Regarding.Name) - events = make(eventSet) - e.objectEvents[objectNsName] = events - } - events[eventNsName] = event + // Dropping event updates since we only care about the creation } func (e *eventWatcher) OnDelete(obj interface{}) { event := obj.(*eventsv1.Event) objectNsName := types.NamespacedName{Namespace: event.Regarding.Namespace, Name: event.Regarding.Name} 
eventNsName := types.NamespacedName{Namespace: event.Namespace, Name: event.Name} - events, ok := e.objectEvents[objectNsName] - if !ok { - logger.Warn(e.ctx, "Event delete [%s/%s] received for object [%s/%s] that does not exist in the cache", - event.Namespace, event.Name, event.Regarding.Namespace, event.Regarding.Name) - return - } - delete(events, eventNsName) - if len(events) == 0 { - delete(e.objectEvents, objectNsName) + v, _ := e.objectCache.LoadOrStore(objectNsName, &objectEvents{}) + objEvents := v.(*objectEvents) + objEvents.mu.Lock() + defer objEvents.mu.Unlock() + delete(objEvents.eventInfos, eventNsName) + if len(objEvents.eventInfos) == 0 { + e.objectCache.Delete(objectNsName) } } // List returns all events for the given object that were created after the given time, sorted by creation time. -func (e *eventWatcher) List(objectNsName types.NamespacedName, createdAfter time.Time) []*eventsv1.Event { - events, ok := e.objectEvents[objectNsName] - if !ok { - return []*eventsv1.Event{} +func (e *eventWatcher) List(objectNsName types.NamespacedName, createdAfter time.Time) []*EventInfo { + v, _ := e.objectCache.Load(objectNsName) + if v == nil { + return []*EventInfo{} } - result := make([]*eventsv1.Event, 0, len(events)) - for _, event := range events { - if event.CreationTimestamp.Time.After(createdAfter) { - result = append(result, event) + objEvents := v.(*objectEvents) + objEvents.mu.RLock() + defer objEvents.mu.RUnlock() + // This logic assumes that cardinality of events per object is relatively low, so iterating over them to find + // recent ones and sorting the results is not too expensive. 
+ result := make([]*EventInfo, 0, len(objEvents.eventInfos)) + for _, eventInfo := range objEvents.eventInfos { + if eventInfo.CreatedAt.After(createdAfter) { + result = append(result, eventInfo) } } sort.SliceStable(result, func(i, j int) bool { - return result[i].CreationTimestamp.Time.Before(result[j].CreationTimestamp.Time) + return result[i].CreatedAt.Before(result[j].CreatedAt) || + (result[i].CreatedAt.Equal(result[j].CreatedAt) && result[i].RecordedAt.Before(result[j].RecordedAt)) }) return result } @@ -100,8 +108,7 @@ func NewEventWatcher(ctx context.Context, gvk schema.GroupVersionKind, kubeClien eventInformer := informers.NewSharedInformerFactoryWithOptions( kubeClientset, 0, informers.WithTweakListOptions(objectSelector)).Events().V1().Events() watcher := &eventWatcher{ - informer: eventInformer, - objectEvents: make(map[types.NamespacedName]eventSet), + informer: eventInformer, } eventInformer.Informer().AddEventHandler(watcher) diff --git a/pkg/controller/nodes/task/k8s/event_watcher_test.go b/pkg/controller/nodes/task/k8s/event_watcher_test.go index 5fcb00ffe..9247a3e72 100644 --- a/pkg/controller/nodes/task/k8s/event_watcher_test.go +++ b/pkg/controller/nodes/task/k8s/event_watcher_test.go @@ -1,7 +1,6 @@ package k8s import ( - "context" "testing" "time" @@ -14,138 +13,89 @@ import ( ) func TestEventWatcher_OnAdd(t *testing.T) { - ctx := context.Background() now := time.Now() - ew := eventWatcher{ctx: ctx, objectEvents: map[types.NamespacedName]eventSet{ - {Namespace: "ns1", Name: "name1"}: { - {Namespace: "eventns1", Name: "eventname1"}: &eventsv1.Event{EventTime: metav1.NewMicroTime(now.Add(-time.Minute))}, + ew := eventWatcher{} + ew.objectCache.Store(types.NamespacedName{Namespace: "ns1", Name: "name1"}, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{ + {Namespace: "eventns1", Name: "eventname1"}: {CreatedAt: now.Add(-time.Minute)}, }, - {Namespace: "ns2", Name: "name2"}: {}, - }} + }) + 
ew.objectCache.Store(types.NamespacedName{Namespace: "ns2", Name: "name2"}, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{}, + }) t.Run("existing event", func(t *testing.T) { ew.OnAdd(&eventsv1.Event{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "eventns1", - Name: "eventname1", + Namespace: "eventns1", + Name: "eventname1", + CreationTimestamp: metav1.NewTime(now), }, Regarding: corev1.ObjectReference{ Namespace: "ns1", Name: "name1", }, - EventTime: metav1.NewMicroTime(now), }) - assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns1", Name: "name1"}], 1) - assert.Equal(t, now, ew.objectEvents[types.NamespacedName{Namespace: "ns1", Name: "name1"}][types.NamespacedName{Namespace: "eventns1", Name: "eventname1"}].EventTime.Time) + v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns1", Name: "name1"}) + assert.NotNil(t, v) + objEvents := v.(*objectEvents) + assert.Len(t, objEvents.eventInfos, 1) + assert.Equal(t, now, objEvents.eventInfos[types.NamespacedName{Namespace: "eventns1", Name: "eventname1"}].CreatedAt) }) t.Run("new event", func(t *testing.T) { ew.OnAdd(&eventsv1.Event{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "eventns2", - Name: "eventname2", + Namespace: "eventns2", + Name: "eventname2", + CreationTimestamp: metav1.NewTime(now), }, Regarding: corev1.ObjectReference{ Namespace: "ns2", Name: "name2", }, - EventTime: metav1.NewMicroTime(now), }) - assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns2", Name: "name2"}], 1) - assert.Equal(t, now, ew.objectEvents[types.NamespacedName{Namespace: "ns2", Name: "name2"}][types.NamespacedName{Namespace: "eventns2", Name: "eventname2"}].EventTime.Time) + v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns2", Name: "name2"}) + assert.NotNil(t, v) + objEvents := v.(*objectEvents) + assert.Len(t, objEvents.eventInfos, 1) + assert.Equal(t, now, objEvents.eventInfos[types.NamespacedName{Namespace: "eventns2", Name: "eventname2"}].CreatedAt) }) 
t.Run("new object", func(t *testing.T) { ew.OnAdd(&eventsv1.Event{ ObjectMeta: metav1.ObjectMeta{ - Namespace: "eventns3", - Name: "eventname3", + Namespace: "eventns3", + Name: "eventname3", + CreationTimestamp: metav1.NewTime(now), }, Regarding: corev1.ObjectReference{ Namespace: "ns3", Name: "name3", }, - EventTime: metav1.NewMicroTime(now), }) - assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns3", Name: "name3"}], 1) - assert.Equal(t, now, ew.objectEvents[types.NamespacedName{Namespace: "ns3", Name: "name3"}][types.NamespacedName{Namespace: "eventns3", Name: "eventname3"}].EventTime.Time) + v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns3", Name: "name3"}) + assert.NotNil(t, v) + objEvents := v.(*objectEvents) + assert.Len(t, objEvents.eventInfos, 1) + assert.Equal(t, now, objEvents.eventInfos[types.NamespacedName{Namespace: "eventns3", Name: "eventname3"}].CreatedAt) }) } -func TestEventWatcher_OnUpdate(t *testing.T) { - ctx := context.Background() +func TestEventWatcher_OnDelete(t *testing.T) { now := time.Now() - ew := eventWatcher{ctx: ctx, objectEvents: map[types.NamespacedName]eventSet{ - {Namespace: "ns1", Name: "name1"}: { - {Namespace: "eventns1", Name: "eventname1"}: &eventsv1.Event{EventTime: metav1.NewMicroTime(now.Add(-time.Minute))}, + ew := eventWatcher{} + ew.objectCache.Store(types.NamespacedName{Namespace: "ns1", Name: "name1"}, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{ + {Namespace: "eventns1", Name: "eventname1"}: {CreatedAt: now.Add(-time.Minute)}, }, - {Namespace: "ns2", Name: "name2"}: {}, - }} - - t.Run("existing event", func(t *testing.T) { - ew.OnUpdate(&eventsv1.Event{}, &eventsv1.Event{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "eventns1", - Name: "eventname1", - }, - Regarding: corev1.ObjectReference{ - Namespace: "ns1", - Name: "name1", - }, - EventTime: metav1.NewMicroTime(now), - }) - - assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns1", Name: "name1"}], 
1) - assert.Equal(t, now, ew.objectEvents[types.NamespacedName{Namespace: "ns1", Name: "name1"}][types.NamespacedName{Namespace: "eventns1", Name: "eventname1"}].EventTime.Time) }) - - t.Run("new event", func(t *testing.T) { - ew.OnUpdate(&eventsv1.Event{}, &eventsv1.Event{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "eventns2", - Name: "eventname2", - }, - Regarding: corev1.ObjectReference{ - Namespace: "ns2", - Name: "name2", - }, - EventTime: metav1.NewMicroTime(now), - }) - - assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns2", Name: "name2"}], 1) - assert.Equal(t, now, ew.objectEvents[types.NamespacedName{Namespace: "ns2", Name: "name2"}][types.NamespacedName{Namespace: "eventns2", Name: "eventname2"}].EventTime.Time) - }) - - t.Run("new object", func(t *testing.T) { - ew.OnUpdate(&eventsv1.Event{}, &eventsv1.Event{ - ObjectMeta: metav1.ObjectMeta{ - Namespace: "eventns3", - Name: "eventname3", - }, - Regarding: corev1.ObjectReference{ - Namespace: "ns3", - Name: "name3", - }, - EventTime: metav1.NewMicroTime(now), - }) - - assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns3", Name: "name3"}], 1) - assert.Equal(t, now, ew.objectEvents[types.NamespacedName{Namespace: "ns3", Name: "name3"}][types.NamespacedName{Namespace: "eventns3", Name: "eventname3"}].EventTime.Time) + ew.objectCache.Store(types.NamespacedName{Namespace: "ns2", Name: "name2"}, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{}, }) -} - -func TestEventWatcher_OnDelete(t *testing.T) { - ctx := context.Background() - now := time.Now() - ew := eventWatcher{ctx: ctx, objectEvents: map[types.NamespacedName]eventSet{ - {Namespace: "ns1", Name: "name1"}: { - {Namespace: "eventns1", Name: "eventname1"}: &eventsv1.Event{}, - }, - {Namespace: "ns2", Name: "name2"}: {}, - }} t.Run("existing event", func(t *testing.T) { ew.OnDelete(&eventsv1.Event{ @@ -159,7 +109,8 @@ func TestEventWatcher_OnDelete(t *testing.T) { }, }) - assert.Len(t, 
ew.objectEvents[types.NamespacedName{Namespace: "ns1", Name: "name1"}], 0) + v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns1", Name: "name1"}) + assert.Nil(t, v) }) t.Run("missing event", func(t *testing.T) { @@ -172,10 +123,10 @@ func TestEventWatcher_OnDelete(t *testing.T) { Namespace: "ns2", Name: "name2", }, - EventTime: metav1.NewMicroTime(now), }) - assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns2", Name: "name2"}], 0) + v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns2", Name: "name2"}) + assert.Nil(t, v) }) t.Run("missing object", func(t *testing.T) { @@ -188,23 +139,25 @@ func TestEventWatcher_OnDelete(t *testing.T) { Namespace: "ns3", Name: "name3", }, - EventTime: metav1.NewMicroTime(now), }) - assert.Len(t, ew.objectEvents[types.NamespacedName{Namespace: "ns3", Name: "name3"}], 0) + v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns3", Name: "name3"}) + assert.Nil(t, v) }) } func TestEventWatcher_List(t *testing.T) { - ctx := context.Background() now := time.Now() - ew := eventWatcher{ctx: ctx, objectEvents: map[types.NamespacedName]eventSet{ - {Namespace: "ns1", Name: "name1"}: { - {Namespace: "eventns1", Name: "eventname1"}: &eventsv1.Event{ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.NewTime(now)}}, - {Namespace: "eventns2", Name: "eventname2"}: &eventsv1.Event{ObjectMeta: metav1.ObjectMeta{CreationTimestamp: metav1.NewTime(now.Add(-time.Hour))}}, + ew := eventWatcher{} + ew.objectCache.Store(types.NamespacedName{Namespace: "ns1", Name: "name1"}, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{ + {Namespace: "eventns1", Name: "eventname1"}: {CreatedAt: now}, + {Namespace: "eventns2", Name: "eventname2"}: {CreatedAt: now.Add(-time.Hour)}, }, - {Namespace: "ns2", Name: "name2"}: {}, - }} + }) + ew.objectCache.Store(types.NamespacedName{Namespace: "ns2", Name: "name2"}, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{}, + }) t.Run("all events", 
func(t *testing.T) { result := ew.List(types.NamespacedName{Namespace: "ns1", Name: "name1"}, time.Time{}) @@ -233,7 +186,7 @@ func TestEventWatcher_List(t *testing.T) { t.Run("sorted", func(t *testing.T) { result := ew.List(types.NamespacedName{Namespace: "ns1", Name: "name1"}, time.Time{}) - assert.Equal(t, metav1.NewTime(now.Add(-time.Hour)), result[0].CreationTimestamp) - assert.Equal(t, metav1.NewTime(now), result[1].CreationTimestamp) + assert.Equal(t, now.Add(-time.Hour), result[0].CreatedAt) + assert.Equal(t, now, result[1].CreatedAt) }) } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index e5e8ecbae..b5dca2219 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -352,19 +352,19 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio // Add events since last update version := transition.Info().Version() lastEventUpdate := pluginState.LastEventUpdate - if o != nil { + if e.eventWatcher != nil && o != nil { nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} recentEvents := e.eventWatcher.List(nsName, lastEventUpdate) - for _, event := range recentEvents { - logger.Infof(ctx, "Observed event [%s:%s] for object [%s]", event.Reason, event.Note, nsName.String()) - tCtx.EventsRecorder().RecordRaw(ctx, pluginsCore.PhaseInfoWithTaskInfo( - transition.Info().Phase(), - version, - event.Note, - &pluginsCore.TaskInfo{OccurredAt: &event.CreationTimestamp.Time}, - )) + if len(recentEvents) > 0 { + taskInfo := transition.Info().Info() + taskInfo.AdditionalReasons = make([]pluginsCore.ReasonInfo, 0, len(recentEvents)) + for _, event := range recentEvents { + taskInfo.AdditionalReasons = append(taskInfo.AdditionalReasons, + pluginsCore.ReasonInfo{Reason: event.Note, OccurredAt: &event.CreatedAt}) + lastEventUpdate = event.CreatedAt + } + // Bump the version to ensure newly added events 
are picked up version += 1 - lastEventUpdate = event.CreationTimestamp.Time } } @@ -639,11 +639,12 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry return nil, err } - // TODO: make configurable and/or fail open - // TODO: cache subset of event info we need? - eventWatcher, err := NewEventWatcher(ctx, gvk, kubeClientset) - if err != nil { - return nil, err + var eventWatcher EventWatcher + if config.GetK8sPluginConfig().SendObjectEvents { + eventWatcher, err = NewEventWatcher(ctx, gvk, kubeClientset) + if err != nil { + return nil, err + } } // Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on diff --git a/pkg/controller/nodes/task/transformer.go b/pkg/controller/nodes/task/transformer.go index 21edca723..9bd1b6cca 100644 --- a/pkg/controller/nodes/task/transformer.go +++ b/pkg/controller/nodes/task/transformer.go @@ -3,9 +3,12 @@ package task import ( "time" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" + flytek8sConfig "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flytepropeller/pkg/controller/config" @@ -13,9 +16,6 @@ import ( "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces" - - "github.com/golang/protobuf/ptypes" - timestamppb "github.com/golang/protobuf/ptypes/timestamp" ) // This is used by flyteadmin to indicate that map tasks now report subtask metadata individually. 
@@ -88,24 +88,16 @@ type ToTaskExecutionEventInputs struct { func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutionEvent, error) { // Transitions to a new phase - var err error var occurredAt *timestamppb.Timestamp if i := input.Info.Info(); i != nil && i.OccurredAt != nil { - occurredAt, err = ptypes.TimestampProto(*i.OccurredAt) + occurredAt = timestamppb.New(*i.OccurredAt) } else { - occurredAt, err = ptypes.TimestampProto(input.OccurredAt) - } - - if err != nil { - return nil, err + occurredAt = timestamppb.New(input.OccurredAt) } - reportedAt := ptypes.TimestampNow() + reportedAt := timestamppb.Now() if i := input.Info.Info(); i != nil && i.ReportedAt != nil { - occurredAt, err = ptypes.TimestampProto(*i.ReportedAt) - if err != nil { - return nil, err - } + occurredAt = timestamppb.New(*i.ReportedAt) } taskExecID := input.TaskExecContext.TaskExecutionMetadata().GetTaskExecutionID().GetID() @@ -136,6 +128,19 @@ func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutio } } + var reasons []*event.BatchedReason + if len(input.Info.Reason()) > 0 { + reasons = append(reasons, &event.BatchedReason{ + Reason: input.Info.Reason(), + OccurredAt: occurredAt, + }) + } + for _, reasonInfo := range input.Info.Info().AdditionalReasons { + reasons = append(reasons, &event.BatchedReason{ + Reason: reasonInfo.Reason, + OccurredAt: timestamppb.New(*reasonInfo.OccurredAt), + }) + } tev := &event.TaskExecutionEvent{ TaskId: taskExecID.TaskId, ParentNodeExecutionId: nodeExecutionID, @@ -145,11 +150,16 @@ func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutio ProducerId: input.ClusterID, OccurredAt: occurredAt, TaskType: input.TaskType, - Reason: input.Info.Reason(), + Reasons: reasons, Metadata: metadata, EventVersion: taskExecutionEventVersion, ReportedAt: reportedAt, } + if !flytek8sConfig.GetK8sPluginConfig().SendObjectEvents { + // For back compat with old versions of flyteadmin, populate the deprecated 
reason field. + // Setting SendObjectEvents to true assumes that flyteadmin is using the new reasons field. + tev.Reason = input.Info.Reason() + } if input.Info.Phase().IsSuccess() && input.OutputWriter != nil { if input.OutputWriter.GetOutputPath() != "" { From 5b7d7f755b0ab9690b20ebd9103d69f9f3265279 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Wed, 20 Sep 2023 23:29:37 -0700 Subject: [PATCH 4/9] go.mod updates Signed-off-by: Andrew Dye --- go.mod | 10 +++++----- go.sum | 16 ++++++++-------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/go.mod b/go.mod index 48665d26a..4be9e7270 100644 --- a/go.mod +++ b/go.mod @@ -6,9 +6,9 @@ require ( github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.13.0 - github.com/flyteorg/flyteidl v1.5.10 + github.com/flyteorg/flyteidl v1.5.13 github.com/flyteorg/flyteplugins v1.1.8 - github.com/flyteorg/flytestdlib v1.0.20 + github.com/flyteorg/flytestdlib v1.0.24 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible github.com/go-test/deep v1.0.7 @@ -74,7 +74,7 @@ require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/emicklei/go-restful/v3 v3.9.0 // indirect github.com/evanphx/json-patch v4.12.0+incompatible // indirect - github.com/flyteorg/stow v0.3.6 // indirect + github.com/flyteorg/stow v0.3.7 // indirect github.com/fsnotify/fsnotify v1.5.1 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect @@ -148,6 +148,6 @@ require ( replace github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d // TODO: update version references once dependent PRs are merged -replace github.com/flyteorg/flyteidl => /Users/andrew/dev/flyteorg/flyteidl +replace github.com/flyteorg/flyteidl => github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792 
-replace github.com/flyteorg/flyteplugins => /Users/andrew/dev/flyteorg/flyteplugins +replace github.com/flyteorg/flyteplugins => github.com/andrewwdye/flyteplugins v0.0.0-20230921062216-736c1e51c35b diff --git a/go.sum b/go.sum index 3fa0a9b27..016389b47 100644 --- a/go.sum +++ b/go.sum @@ -114,6 +114,10 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792 h1:mkSxCviDJa5cg8obcdJJ0RhB5TE7v/y2pBhinafwvQc= +github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= +github.com/andrewwdye/flyteplugins v0.0.0-20230921062216-736c1e51c35b h1:q1VZXL+lTD608oJu7JBZGZg/dF7JKjTeZtRK59+uzYU= +github.com/andrewwdye/flyteplugins v0.0.0-20230921062216-736c1e51c35b/go.mod h1:FujFQdL/f9r1HvFR81JCiNYusDy9F0lExhyoyMHXXbg= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= @@ -242,14 +246,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.5.11-0.20230614183933-d56d4d37bf34 
h1:Gj5UKqJU+ozeTeYAvDWHiF4HSVufHW1W1ecymFfbbis= -github.com/flyteorg/flyteidl v1.5.11-0.20230614183933-d56d4d37bf34/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= -github.com/flyteorg/flyteplugins v1.1.8 h1:UVYdqDdcIqz2JIso+m3MsaPSsTZJZyZQ6Eg7nhX9r/Y= -github.com/flyteorg/flyteplugins v1.1.8/go.mod h1:sRxeatEOHq1b9bTxTRNcwoIkVTAVN9dTz8toXkfcz2E= -github.com/flyteorg/flytestdlib v1.0.20 h1:BrCQMlpdrFAPlADFJvCyn7gm+37df9WGYqLEB1mOlCQ= -github.com/flyteorg/flytestdlib v1.0.20/go.mod h1:v3ua7HfHDXXTCrAt2yZERGKCuilP5Rh+L8TdAbfVcBg= -github.com/flyteorg/stow v0.3.6 h1:jt50ciM14qhKBaIrB+ppXXY+SXB59FNREFgTJqCyqIk= -github.com/flyteorg/stow v0.3.6/go.mod h1:5dfBitPM004dwaZdoVylVjxFT4GWAgI0ghAndhNUzCo= +github.com/flyteorg/flytestdlib v1.0.24 h1:jDvymcjlsTRCwOtxPapro0WZBe3isTz+T3Tiq+mZUuk= +github.com/flyteorg/flytestdlib v1.0.24/go.mod h1:6nXa5g00qFIsgdvQ7jKQMJmDniqO0hG6Z5X5olfduqQ= +github.com/flyteorg/stow v0.3.7 h1:Cx7j8/Ux6+toD5hp5fy++927V+yAcAttDeQAlUD/864= +github.com/flyteorg/stow v0.3.7/go.mod h1:5dfBitPM004dwaZdoVylVjxFT4GWAgI0ghAndhNUzCo= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= From 117d7bc70d06b26680923b540e1c1268af66fd34 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Mon, 25 Sep 2023 12:27:48 -0700 Subject: [PATCH 5/9] Linits Signed-off-by: Andrew Dye --- pkg/controller/executors/dag_structure.go | 2 +- pkg/controller/executors/kube.go | 2 +- pkg/controller/executors/node_lookup.go | 2 +- pkg/controller/executors/workflow.go | 2 +- pkg/controller/nodes/task/k8s/plugin_manager.go | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/controller/executors/dag_structure.go b/pkg/controller/executors/dag_structure.go index b62b46b33..0927cefb7 100644 --- 
a/pkg/controller/executors/dag_structure.go +++ b/pkg/controller/executors/dag_structure.go @@ -6,7 +6,7 @@ import ( "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) -//go:generate mockery -case=underscore +//go:generate mockery -name DAGStructure -name DAGStructureWithStartNode -case=underscore // An interface that captures the Directed Acyclic Graph structure in which the nodes are connected. // If NodeLookup and DAGStructure are used together a traversal can be implemented. diff --git a/pkg/controller/executors/kube.go b/pkg/controller/executors/kube.go index 325adb5a3..b7312df6c 100644 --- a/pkg/controller/executors/kube.go +++ b/pkg/controller/executors/kube.go @@ -13,7 +13,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) -//go:generate mockery -case=underscore +//go:generate mockery -name Client -case=underscore // Client is a friendlier controller-runtime client that gets passed to executors type Client interface { diff --git a/pkg/controller/executors/node_lookup.go b/pkg/controller/executors/node_lookup.go index c14f13b1d..d37ac87a4 100644 --- a/pkg/controller/executors/node_lookup.go +++ b/pkg/controller/executors/node_lookup.go @@ -6,7 +6,7 @@ import ( "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) -//go:generate mockery -case=underscore +//go:generate mockery -name NodeLookup -case=underscore // NodeLookup provides a structure that enables looking up all nodes within the current execution hierarchy/context. 
// NOTE: execution hierarchy may change the nodes available, this is because when a SubWorkflow is being executed, only diff --git a/pkg/controller/executors/workflow.go b/pkg/controller/executors/workflow.go index 9af39d938..aadc1d8be 100644 --- a/pkg/controller/executors/workflow.go +++ b/pkg/controller/executors/workflow.go @@ -6,7 +6,7 @@ import ( "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) -//go:generate mockery -case=underscore +//go:generate mockery -name Workflow -case=underscore type Workflow interface { Initialize(ctx context.Context) error diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index b5dca2219..6a2b958af 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -364,7 +364,7 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio lastEventUpdate = event.CreatedAt } // Bump the version to ensure newly added events are picked up - version += 1 + version++ } } From e602095c5422f3421e370c6670363f636e29c518 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Wed, 27 Sep 2023 15:10:16 -0700 Subject: [PATCH 6/9] Update flyteidl and flyteplugins versions Signed-off-by: Andrew Dye --- go.mod | 9 ++------- go.sum | 12 ++++-------- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/go.mod b/go.mod index 6baa69a49..23db72d1c 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.13.0 - github.com/flyteorg/flyteidl v1.5.13 - github.com/flyteorg/flyteplugins v1.1.29 + github.com/flyteorg/flyteidl v1.5.21 + github.com/flyteorg/flyteplugins v1.1.31 github.com/flyteorg/flytestdlib v1.0.24 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible @@ -146,8 +146,3 @@ require ( ) replace 
github.com/aws/amazon-sagemaker-operator-for-k8s => github.com/aws/amazon-sagemaker-operator-for-k8s v1.0.1-0.20210303003444-0fb33b1fd49d - -// TODO: update version references once dependent PRs are merged -replace github.com/flyteorg/flyteidl => github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792 - -replace github.com/flyteorg/flyteplugins => github.com/andrewwdye/flyteplugins v0.0.0-20230921062216-736c1e51c35b diff --git a/go.sum b/go.sum index cc9def686..e05948e55 100644 --- a/go.sum +++ b/go.sum @@ -114,10 +114,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792 h1:mkSxCviDJa5cg8obcdJJ0RhB5TE7v/y2pBhinafwvQc= -github.com/andrewwdye/flyteidl v0.0.0-20230921054948-85b9c706f792/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= -github.com/andrewwdye/flyteplugins v0.0.0-20230921062216-736c1e51c35b h1:q1VZXL+lTD608oJu7JBZGZg/dF7JKjTeZtRK59+uzYU= -github.com/andrewwdye/flyteplugins v0.0.0-20230921062216-736c1e51c35b/go.mod h1:FujFQdL/f9r1HvFR81JCiNYusDy9F0lExhyoyMHXXbg= github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo29Kk6CurOXKm700vrz8f0KW0JNfpkRJY/8= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e/go.mod h1:F7bn7fEU90QkQ3tnmaTx3LTKLEDqnwWODIYppRQ5hnY= @@ -246,10 +242,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color 
v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.5.13 h1:IQ2Cw+u36ew3BPyRDAcHdzc/GyNEOXOxhKy9jbS4hbo= -github.com/flyteorg/flyteidl v1.5.13/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= -github.com/flyteorg/flyteplugins v1.1.29 h1:75I045EgfwNcyYzDhNwWye80+nyMmWmiEW3G+kOvOxc= -github.com/flyteorg/flyteplugins v1.1.29/go.mod h1:FujFQdL/f9r1HvFR81JCiNYusDy9F0lExhyoyMHXXbg= +github.com/flyteorg/flyteidl v1.5.21 h1:zP1byUlNFqstTe7Io1DiiNgNf+mZAVmGZM04oIUA5kU= +github.com/flyteorg/flyteidl v1.5.21/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= +github.com/flyteorg/flyteplugins v1.1.31 h1:9LHEWq6I4/hh4BeSk7qKwgeaBSyedD8V5se54v77yYo= +github.com/flyteorg/flyteplugins v1.1.31/go.mod h1:FujFQdL/f9r1HvFR81JCiNYusDy9F0lExhyoyMHXXbg= github.com/flyteorg/flytestdlib v1.0.24 h1:jDvymcjlsTRCwOtxPapro0WZBe3isTz+T3Tiq+mZUuk= github.com/flyteorg/flytestdlib v1.0.24/go.mod h1:6nXa5g00qFIsgdvQ7jKQMJmDniqO0hG6Z5X5olfduqQ= github.com/flyteorg/stow v0.3.7 h1:Cx7j8/Ux6+toD5hp5fy++927V+yAcAttDeQAlUD/864= From 28ac3ddf8359ef30fd9d98aecc711fa23f7af519 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Wed, 27 Sep 2023 15:21:33 -0700 Subject: [PATCH 7/9] Update to EventReason Signed-off-by: Andrew Dye --- pkg/controller/nodes/task/plugin_config.go | 2 +- pkg/controller/nodes/task/transformer.go | 6 +++--- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/controller/nodes/task/plugin_config.go b/pkg/controller/nodes/task/plugin_config.go index e49ffa664..ff4c1f26f 100644 --- a/pkg/controller/nodes/task/plugin_config.go +++ b/pkg/controller/nodes/task/plugin_config.go @@ -28,7 +28,7 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu // Register the GRPC plugin after the config is loaded pluginsConfigMeta, err := cfg.GetEnabledPlugins() - once.Do(func() { agent.RegisterAgentPlugin() }) + 
once.Do(func() { agent.RegisterAgentPlugin(pluginsConfigMeta.AllDefaultForTaskTypes[AgentServiceKey]) }) if err != nil { return nil, nil, err diff --git a/pkg/controller/nodes/task/transformer.go b/pkg/controller/nodes/task/transformer.go index 9bd1b6cca..e671b75fa 100644 --- a/pkg/controller/nodes/task/transformer.go +++ b/pkg/controller/nodes/task/transformer.go @@ -128,15 +128,15 @@ func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutio } } - var reasons []*event.BatchedReason + var reasons []*event.EventReason if len(input.Info.Reason()) > 0 { - reasons = append(reasons, &event.BatchedReason{ + reasons = append(reasons, &event.EventReason{ Reason: input.Info.Reason(), OccurredAt: occurredAt, }) } for _, reasonInfo := range input.Info.Info().AdditionalReasons { - reasons = append(reasons, &event.BatchedReason{ + reasons = append(reasons, &event.EventReason{ Reason: reasonInfo.Reason, OccurredAt: timestamppb.New(*reasonInfo.OccurredAt), }) From 48ca09d80fe3f16c5ee88a7609712094ac2215a1 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Wed, 27 Sep 2023 16:04:59 -0700 Subject: [PATCH 8/9] Comments Signed-off-by: Andrew Dye --- pkg/controller/nodes/task/handler.go | 1 - pkg/controller/nodes/task/k8s/event_watcher.go | 14 ++++++++++---- pkg/controller/nodes/task/k8s/plugin_manager.go | 14 +++++++------- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index fc928fc99..5c48441c0 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -613,7 +613,6 @@ func (t Handler) Handle(ctx context.Context, nCtx interfaces.NodeExecutionContex if err != nil { return handler.UnknownTransition, err } - logger.Infof(ctx, "Recording buffered event [%s]", evInfo.Reason) if err := nCtx.EventsRecorder().RecordTaskEvent(ctx, evInfo, t.eventConfig); err != nil { logger.Errorf(ctx, "Event recording failed for Plugin [%s], eventPhase [%s], 
error :%s", p.GetID(), evInfo.Phase.String(), err.Error()) // Check for idempotency diff --git a/pkg/controller/nodes/task/k8s/event_watcher.go b/pkg/controller/nodes/task/k8s/event_watcher.go index 18280951c..6e7c40ac7 100644 --- a/pkg/controller/nodes/task/k8s/event_watcher.go +++ b/pkg/controller/nodes/task/k8s/event_watcher.go @@ -9,7 +9,7 @@ import ( eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" informerEventsv1 "k8s.io/client-go/informers/events/v1" @@ -22,6 +22,11 @@ type EventWatcher interface { List(objectNsName types.NamespacedName, createdAfter time.Time) []*EventInfo } +// eventWatcher is a simple wrapper around the informer that keeps track of outstanding object events. +// Event lifetime is controlled by kube-apiserver (see --event-ttl flag) and defaults to one hour. As a result, +// the cache size is bounded by the number of event objects created in the last hour (or otherwise configured ttl). +// Note that cardinality of per object events is relatively low (10s), while they may occur repeatedly. For example +// the ImagePullBackOff event may continue to fire, but this is only backed by a single event. 
type eventWatcher struct { informer informerEventsv1.EventInformer objectCache sync.Map @@ -101,9 +106,10 @@ func (e *eventWatcher) List(objectNsName types.NamespacedName, createdAfter time return result } -func NewEventWatcher(ctx context.Context, gvk schema.GroupVersionKind, kubeClientset kubernetes.Interface) (EventWatcher, error) { +func NewEventWatcher(ctx context.Context, obj runtime.Object, kubeClientset kubernetes.Interface) (EventWatcher, error) { + kind := obj.GetObjectKind().GroupVersionKind().Kind objectSelector := func(opts *metav1.ListOptions) { - opts.FieldSelector = fields.OneTermEqualSelector("regarding.kind", gvk.Kind).String() + opts.FieldSelector = fields.OneTermEqualSelector("regarding.kind", kind).String() } eventInformer := informers.NewSharedInformerFactoryWithOptions( kubeClientset, 0, informers.WithTweakListOptions(objectSelector)).Events().V1().Events() @@ -113,7 +119,7 @@ func NewEventWatcher(ctx context.Context, gvk schema.GroupVersionKind, kubeClien eventInformer.Informer().AddEventHandler(watcher) go eventInformer.Informer().Run(ctx.Done()) - logger.Debugf(ctx, "Started informer for [%s] events", gvk.Kind) + logger.Debugf(ctx, "Started informer for [%s] events", kind) return watcher, nil } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 6a2b958af..4ff036d50 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -568,13 +568,8 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry droppedUpdateCount := labeled.NewCounter("informer_update_dropped", "Update events from informer that have the same resource version", metricsScope) genericCount := labeled.NewCounter("informer_generic", "Generic events from informer", metricsScope) - gvk, err := getPluginGvk(entry.ResourceToWatch) - if err != nil { - return nil, err - } - enqueueOwner := iCtx.EnqueueOwner() - err = src.Start( + err := 
src.Start( ctx, // Handlers handler.Funcs{ @@ -641,13 +636,18 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry var eventWatcher EventWatcher if config.GetK8sPluginConfig().SendObjectEvents { - eventWatcher, err = NewEventWatcher(ctx, gvk, kubeClientset) + eventWatcher, err = NewEventWatcher(ctx, entry.ResourceToWatch, kubeClientset) if err != nil { return nil, err } } // Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on + gvk, err := getPluginGvk(entry.ResourceToWatch) + if err != nil { + return nil, err + } + pluginInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch) if err != nil { return nil, err From fe453ebc4b4bbb15d0328b78aab03fcc752029e4 Mon Sep 17 00:00:00 2001 From: Andrew Dye Date: Fri, 29 Sep 2023 09:58:42 -0700 Subject: [PATCH 9/9] Readd GroupVersionKind changes Signed-off-by: Andrew Dye --- pkg/controller/nodes/task/k8s/event_watcher.go | 9 ++++----- pkg/controller/nodes/task/k8s/plugin_manager.go | 12 ++++++------ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/pkg/controller/nodes/task/k8s/event_watcher.go b/pkg/controller/nodes/task/k8s/event_watcher.go index 6e7c40ac7..5b5b11890 100644 --- a/pkg/controller/nodes/task/k8s/event_watcher.go +++ b/pkg/controller/nodes/task/k8s/event_watcher.go @@ -9,7 +9,7 @@ import ( eventsv1 "k8s.io/api/events/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/fields" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" informerEventsv1 "k8s.io/client-go/informers/events/v1" @@ -106,10 +106,9 @@ func (e *eventWatcher) List(objectNsName types.NamespacedName, createdAfter time return result } -func NewEventWatcher(ctx context.Context, obj runtime.Object, kubeClientset kubernetes.Interface) (EventWatcher, error) { - kind := 
obj.GetObjectKind().GroupVersionKind().Kind +func NewEventWatcher(ctx context.Context, gvk schema.GroupVersionKind, kubeClientset kubernetes.Interface) (EventWatcher, error) { objectSelector := func(opts *metav1.ListOptions) { - opts.FieldSelector = fields.OneTermEqualSelector("regarding.kind", kind).String() + opts.FieldSelector = fields.OneTermEqualSelector("regarding.kind", gvk.Kind).String() } eventInformer := informers.NewSharedInformerFactoryWithOptions( kubeClientset, 0, informers.WithTweakListOptions(objectSelector)).Events().V1().Events() @@ -119,7 +118,7 @@ func NewEventWatcher(ctx context.Context, obj runtime.Object, kubeClientset kube eventInformer.Informer().AddEventHandler(watcher) go eventInformer.Informer().Run(ctx.Done()) - logger.Debugf(ctx, "Started informer for [%s] events", kind) + logger.Debugf(ctx, "Started informer for [%s] events", gvk.Kind) return watcher, nil } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 4ff036d50..1b281f66d 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -634,20 +634,20 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry return nil, err } + gvk, err := getPluginGvk(entry.ResourceToWatch) + if err != nil { + return nil, err + } + var eventWatcher EventWatcher if config.GetK8sPluginConfig().SendObjectEvents { - eventWatcher, err = NewEventWatcher(ctx, entry.ResourceToWatch, kubeClientset) + eventWatcher, err = NewEventWatcher(ctx, gvk, kubeClientset) if err != nil { return nil, err } } // Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on - gvk, err := getPluginGvk(entry.ResourceToWatch) - if err != nil { - return nil, err - } - pluginInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch) if err != nil { return nil, err