diff --git a/go.mod b/go.mod index 217a0bd83..23db72d1c 100644 --- a/go.mod +++ b/go.mod @@ -6,8 +6,8 @@ require ( github.com/DiSiqueira/GoTree v1.0.1-0.20180907134536-53a8e837f295 github.com/benlaurie/objecthash v0.0.0-20180202135721-d1e3d6079fc1 github.com/fatih/color v1.13.0 - github.com/flyteorg/flyteidl v1.5.13 - github.com/flyteorg/flyteplugins v1.1.30 + github.com/flyteorg/flyteidl v1.5.21 + github.com/flyteorg/flyteplugins v1.1.31 github.com/flyteorg/flytestdlib v1.0.24 github.com/ghodss/yaml v1.0.0 github.com/go-redis/redis v6.15.7+incompatible diff --git a/go.sum b/go.sum index e5f974662..e05948e55 100644 --- a/go.sum +++ b/go.sum @@ -242,10 +242,10 @@ github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5Kwzbycv github.com/fatih/color v1.13.0 h1:8LOYc1KYPPmyKMuN8QV2DNRWNbLo6LZ0iLs8+mlH53w= github.com/fatih/color v1.13.0/go.mod h1:kLAiJbzzSOZDVNGyDpeOxJ47H46qBXwg5ILebYFFOfk= github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= -github.com/flyteorg/flyteidl v1.5.13 h1:IQ2Cw+u36ew3BPyRDAcHdzc/GyNEOXOxhKy9jbS4hbo= -github.com/flyteorg/flyteidl v1.5.13/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= -github.com/flyteorg/flyteplugins v1.1.30 h1:AVqS6Eb9Nr9Z3Mb3CtP04ffAVS9LMx5Q1Z7AyFFk/e0= -github.com/flyteorg/flyteplugins v1.1.30/go.mod h1:FujFQdL/f9r1HvFR81JCiNYusDy9F0lExhyoyMHXXbg= +github.com/flyteorg/flyteidl v1.5.21 h1:zP1byUlNFqstTe7Io1DiiNgNf+mZAVmGZM04oIUA5kU= +github.com/flyteorg/flyteidl v1.5.21/go.mod h1:EtE/muM2lHHgBabjYcxqe9TWeJSL0kXwbI0RgVwI4Og= +github.com/flyteorg/flyteplugins v1.1.31 h1:9LHEWq6I4/hh4BeSk7qKwgeaBSyedD8V5se54v77yYo= +github.com/flyteorg/flyteplugins v1.1.31/go.mod h1:FujFQdL/f9r1HvFR81JCiNYusDy9F0lExhyoyMHXXbg= github.com/flyteorg/flytestdlib v1.0.24 h1:jDvymcjlsTRCwOtxPapro0WZBe3isTz+T3Tiq+mZUuk= github.com/flyteorg/flytestdlib v1.0.24/go.mod h1:6nXa5g00qFIsgdvQ7jKQMJmDniqO0hG6Z5X5olfduqQ= github.com/flyteorg/stow v0.3.7 h1:Cx7j8/Ux6+toD5hp5fy++927V+yAcAttDeQAlUD/864= diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index ebdf52a58..7ee7961d4 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -323,7 +323,7 @@ func getAdminClient(ctx context.Context) (client service.AdminServiceClient, sig } // New returns a new FlyteWorkflow controller -func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Interface, flytepropellerClientset clientset.Interface, +func New(ctx context.Context, cfg *config.Config, kubeClientset kubernetes.Interface, flytepropellerClientset clientset.Interface, flyteworkflowInformerFactory informers.SharedInformerFactory, informerFactory k8sInformers.SharedInformerFactory, kubeClient executors.Client, scope promutils.Scope) (*Controller, error) { @@ -354,13 +354,13 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter if err != nil { return nil, errors.Wrapf(err, "Failed to create EventSink [%v], error %v", events.GetConfig(ctx).Type, err) } - gc, err := NewGarbageCollector(cfg, scope, clock.RealClock{}, kubeclientset.CoreV1().Namespaces(), flytepropellerClientset.FlyteworkflowV1alpha1()) + gc, err := NewGarbageCollector(cfg, scope, clock.RealClock{}, kubeClientset.CoreV1().Namespaces(), flytepropellerClientset.FlyteworkflowV1alpha1()) if err != nil { logger.Errorf(ctx, "failed to initialize GC for workflows") return nil, errors.Wrapf(err, "failed to initialize WF GC") } - eventRecorder, err := utils.NewK8sEventRecorder(ctx, kubeclientset, controllerAgentName, cfg.PublishK8sEvents) + eventRecorder, err := utils.NewK8sEventRecorder(ctx, kubeClientset, controllerAgentName, cfg.PublishK8sEvents) if err != nil { logger.Errorf(ctx, "failed to event recorder %v", err) return nil, errors.Wrapf(err, "failed to initialize resource lock.") @@ -372,7 +372,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter numWorkers: cfg.Workers, } - lock, err := leader.NewResourceLock(kubeclientset.CoreV1(), kubeclientset.CoordinationV1(), eventRecorder, cfg.LeaderElection) + lock, err := leader.NewResourceLock(kubeClientset.CoreV1(), kubeClientset.CoordinationV1(), eventRecorder, cfg.LeaderElection) if err != nil { logger.Errorf(ctx, "failed to initialize resource lock.") return nil, errors.Wrapf(err, "failed to initialize resource lock.") @@ -440,7 +440,7 @@ func New(ctx context.Context, cfg *config.Config, kubeclientset kubernetes.Inter recoveryClient := recovery.NewClient(adminClient) nodeHandlerFactory, err := factory.NewHandlerFactory(ctx, launchPlanActor, launchPlanActor, - kubeClient, catalogClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, scope) + kubeClient, kubeClientset, catalogClient, recoveryClient, &cfg.EventConfig, cfg.ClusterID, signalClient, scope) if err != nil { return nil, errors.Wrapf(err, "failed to create node handler factory") } diff --git a/pkg/controller/executors/dag_structure.go b/pkg/controller/executors/dag_structure.go index cd0f1be3c..0927cefb7 100644 --- a/pkg/controller/executors/dag_structure.go +++ b/pkg/controller/executors/dag_structure.go @@ -6,6 +6,8 @@ import ( "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) +//go:generate mockery -name DAGStructure -name DAGStructureWithStartNode -case=underscore + // An interface that captures the Directed Acyclic Graph structure in which the nodes are connected. // If NodeLookup and DAGStructure are used together a traversal can be implemented. type DAGStructure interface { diff --git a/pkg/controller/executors/execution_context.go b/pkg/controller/executors/execution_context.go index 53c0bbbd1..3de9b2b92 100644 --- a/pkg/controller/executors/execution_context.go +++ b/pkg/controller/executors/execution_context.go @@ -4,6 +4,8 @@ import ( "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) +// go:generate mockery -case=underscore + type TaskDetailsGetter interface { GetTask(id v1alpha1.TaskID) (v1alpha1.ExecutableTask, error) } diff --git a/pkg/controller/executors/kube.go b/pkg/controller/executors/kube.go index a37e7a216..b7312df6c 100644 --- a/pkg/controller/executors/kube.go +++ b/pkg/controller/executors/kube.go @@ -13,6 +13,8 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" ) +//go:generate mockery -name Client -case=underscore + // Client is a friendlier controller-runtime client that gets passed to executors type Client interface { // GetClient returns a client configured with the Config diff --git a/pkg/controller/executors/node_lookup.go b/pkg/controller/executors/node_lookup.go index 66fc9bddf..d37ac87a4 100644 --- a/pkg/controller/executors/node_lookup.go +++ b/pkg/controller/executors/node_lookup.go @@ -6,6 +6,8 @@ import ( "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) +//go:generate mockery -name NodeLookup -case=underscore + // NodeLookup provides a structure that enables looking up all nodes within the current execution hierarchy/context. // NOTE: execution hierarchy may change the nodes available, this is because when a SubWorkflow is being executed, only // the nodes within the subworkflow are visible diff --git a/pkg/controller/executors/workflow.go b/pkg/controller/executors/workflow.go index 59bc1efe3..aadc1d8be 100644 --- a/pkg/controller/executors/workflow.go +++ b/pkg/controller/executors/workflow.go @@ -6,6 +6,8 @@ import ( "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" ) +//go:generate mockery -name Workflow -case=underscore + type Workflow interface { Initialize(ctx context.Context) error HandleFlyteWorkflow(ctx context.Context, w *v1alpha1.FlyteWorkflow) error diff --git a/pkg/controller/nodes/factory/handler_factory.go b/pkg/controller/nodes/factory/handler_factory.go index 9ec00da7a..d5408a01d 100644 --- a/pkg/controller/nodes/factory/handler_factory.go +++ b/pkg/controller/nodes/factory/handler_factory.go @@ -3,6 +3,8 @@ package factory import ( "context" + "k8s.io/client-go/kubernetes" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog" @@ -33,6 +35,7 @@ type handlerFactory struct { workflowLauncher launchplan.Executor launchPlanReader launchplan.Reader kubeClient executors.Client + kubeClientset kubernetes.Interface catalogClient catalog.Client recoveryClient recovery.Client eventConfig *config.EventConfig @@ -50,7 +53,7 @@ func (f *handlerFactory) GetHandler(kind v1alpha1.NodeKind) (interfaces.NodeHand } func (f *handlerFactory) Setup(ctx context.Context, executor interfaces.Node, setup interfaces.SetupContext) error { - t, err := task.New(ctx, f.kubeClient, f.catalogClient, f.eventConfig, f.clusterID, f.scope) + t, err := task.New(ctx, f.kubeClient, f.kubeClientset, f.catalogClient, f.eventConfig, f.clusterID, f.scope) if err != nil { return err } @@ -79,13 +82,14 @@ func (f *handlerFactory) Setup(ctx context.Context, executor interfaces.Node, se } func NewHandlerFactory(ctx context.Context, workflowLauncher launchplan.Executor, launchPlanReader launchplan.Reader, - kubeClient executors.Client, catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig, + kubeClient executors.Client, kubeClientset kubernetes.Interface, catalogClient catalog.Client, recoveryClient recovery.Client, eventConfig *config.EventConfig, clusterID string, signalClient service.SignalServiceClient, scope promutils.Scope) (interfaces.HandlerFactory, error) { return &handlerFactory{ workflowLauncher: workflowLauncher, launchPlanReader: launchPlanReader, kubeClient: kubeClient, + kubeClientset: kubeClientset, catalogClient: catalogClient, recoveryClient: recoveryClient, eventConfig: eventConfig, diff --git a/pkg/controller/nodes/task/handler.go b/pkg/controller/nodes/task/handler.go index d2d2107db..5c48441c0 100644 --- a/pkg/controller/nodes/task/handler.go +++ b/pkg/controller/nodes/task/handler.go @@ -6,6 +6,8 @@ import ( "runtime/debug" "time" + "k8s.io/client-go/kubernetes" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" @@ -195,6 +197,7 @@ type Handler struct { metrics *metrics pluginRegistry PluginRegistryIface kubeClient pluginCore.KubeClient + kubeClientset kubernetes.Interface secretManager pluginCore.SecretManager resourceManager resourcemanager.BaseResourceManager cfg *config.Config @@ -229,7 +232,7 @@ func (t *Handler) Setup(ctx context.Context, sCtx interfaces.SetupContext) error // Create the resource negotiator here // and then convert it to proxies later and pass them to plugins - enabledPlugins, defaultForTaskTypes, err := WranglePluginsAndGenerateFinalList(ctx, &t.cfg.TaskPlugins, t.pluginRegistry) + enabledPlugins, defaultForTaskTypes, err := WranglePluginsAndGenerateFinalList(ctx, &t.cfg.TaskPlugins, t.pluginRegistry, t.kubeClientset) if err != nil { logger.Errorf(ctx, "Failed to finalize enabled plugins. Error: %s", err) return err @@ -840,7 +843,8 @@ func (t Handler) Finalize(ctx context.Context, nCtx interfaces.NodeExecutionCont }() } -func New(ctx context.Context, kubeClient executors.Client, client catalog.Client, eventConfig *controllerConfig.EventConfig, clusterID string, scope promutils.Scope) (*Handler, error) { +func New(ctx context.Context, kubeClient executors.Client, kubeClientset kubernetes.Interface, client catalog.Client, + eventConfig *controllerConfig.EventConfig, clusterID string, scope promutils.Scope) (*Handler, error) { // TODO New should take a pointer async, err := catalog.NewAsyncClient(client, *catalog.GetConfig(), scope.NewSubScope("async_catalog")) if err != nil { @@ -866,6 +870,7 @@ func New(ctx context.Context, kubeClient executors.Client, client catalog.Client }, pluginScope: scope.NewSubScope("plugin"), kubeClient: kubeClient, + kubeClientset: kubeClientset, catalog: client, asyncCatalog: async, resourceManager: nil, diff --git a/pkg/controller/nodes/task/handler_test.go b/pkg/controller/nodes/task/handler_test.go index 38506c4c7..3278aaa46 100644 --- a/pkg/controller/nodes/task/handler_test.go +++ b/pkg/controller/nodes/task/handler_test.go @@ -6,42 +6,34 @@ import ( "fmt" "testing" - "github.com/flyteorg/flyteidl/clients/go/coreutils" "github.com/golang/protobuf/proto" - eventsErr "github.com/flyteorg/flytepropeller/events/errors" - - pluginK8sMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s/mocks" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + v1 "k8s.io/api/core/v1" + v12 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + k8sfake "k8s.io/client-go/kubernetes/fake" + "github.com/flyteorg/flyteidl/clients/go/coreutils" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" - "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" - - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" - - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager" - - "github.com/flyteorg/flytestdlib/contextutils" - "github.com/flyteorg/flytestdlib/promutils/labeled" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery" pluginCatalogMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/catalog/mocks" pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" pluginCoreMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" ioMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io/mocks" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" pluginK8s "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" - controllerConfig "github.com/flyteorg/flytepropeller/pkg/controller/config" - "github.com/flyteorg/flytestdlib/promutils" - "github.com/flyteorg/flytestdlib/storage" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - v1 "k8s.io/api/core/v1" - v12 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" + pluginK8sMocks "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s/mocks" + eventsErr "github.com/flyteorg/flytepropeller/events/errors" + "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" flyteMocks "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1/mocks" + controllerConfig "github.com/flyteorg/flytepropeller/pkg/controller/config" "github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces" @@ -49,7 +41,13 @@ import ( "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/codex" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/fakeplugins" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager" rmConfig "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/resourcemanager/config" + + "github.com/flyteorg/flytestdlib/contextutils" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/promutils/labeled" + "github.com/flyteorg/flytestdlib/storage" ) var eventConfig = &controllerConfig.EventConfig{ @@ -242,12 +240,13 @@ func Test_task_Setup(t *testing.T) { t.Run(tt.name, func(t *testing.T) { sCtx := &nodeMocks.SetupContext{} fakeKubeClient := mocks.NewFakeKubeClient() + mockClientset := k8sfake.NewSimpleClientset() sCtx.On("KubeClient").Return(fakeKubeClient) sCtx.On("OwnerKind").Return("test") sCtx.On("EnqueueOwner").Return(pluginCore.EnqueueOwner(func(name types.NamespacedName) error { return nil })) sCtx.On("MetricsScope").Return(promutils.NewTestScope()) - tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope()) + tk, err := New(context.TODO(), fakeKubeClient, mockClientset, &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope()) tk.cfg.TaskPlugins.EnabledPlugins = tt.enabledPlugins tk.cfg.TaskPlugins.DefaultForTaskTypes = tt.defaultForTaskTypes assert.NoError(t, err) @@ -1226,7 +1225,8 @@ func Test_task_Finalize(t *testing.T) { catalog := &pluginCatalogMocks.Client{} m := tt.fields.defaultPluginCallback() - tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), catalog, eventConfig, testClusterID, promutils.NewTestScope()) + mockClientset := k8sfake.NewSimpleClientset() + tk, err := New(context.TODO(), mocks.NewFakeKubeClient(), mockClientset, catalog, eventConfig, testClusterID, promutils.NewTestScope()) assert.NoError(t, err) tk.defaultPlugin = m tk.resourceManager = noopRm @@ -1245,7 +1245,8 @@ func Test_task_Finalize(t *testing.T) { } func TestNew(t *testing.T) { - got, err := New(context.TODO(), mocks.NewFakeKubeClient(), &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope()) + mockClientset := k8sfake.NewSimpleClientset() + got, err := New(context.TODO(), mocks.NewFakeKubeClient(), mockClientset, &pluginCatalogMocks.Client{}, eventConfig, testClusterID, promutils.NewTestScope()) assert.NoError(t, err) assert.NotNil(t, got) assert.NotNil(t, got.defaultPlugins) diff --git a/pkg/controller/nodes/task/k8s/event_watcher.go b/pkg/controller/nodes/task/k8s/event_watcher.go new file mode 100644 index 000000000..5b5b11890 --- /dev/null +++ b/pkg/controller/nodes/task/k8s/event_watcher.go @@ -0,0 +1,124 @@ +package k8s + +import ( + "context" + "sort" + "sync" + "time" + + eventsv1 "k8s.io/api/events/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + "k8s.io/client-go/informers" + informerEventsv1 "k8s.io/client-go/informers/events/v1" + "k8s.io/client-go/kubernetes" + + "github.com/flyteorg/flytestdlib/logger" +) + +type EventWatcher interface { + List(objectNsName types.NamespacedName, createdAfter time.Time) []*EventInfo +} + +// eventWatcher is a simple wrapper around the informer that keeps track of outstanding object events. +// Event lifetime is controlled by kube-apiserver (see --event-ttl flag) and defaults to one hour. As a result, +// the cache size is bounded by the number of event objects created in the last hour (or otherwise configured ttl). +// Note that cardinality of per object events is relatively low (10s), while they may occur repeatedly. For example +// the ImagePullBackOff event may continue to fire, but this is only backed by a single event. +type eventWatcher struct { + informer informerEventsv1.EventInformer + objectCache sync.Map +} + +type objectEvents struct { + mu sync.RWMutex + eventInfos map[types.NamespacedName]*EventInfo +} + +// EventInfo stores detail about the event and the timestamp of the first occurrence. +// All other fields are thrown away to conserve space. +type EventInfo struct { + Reason string + Note string + CreatedAt time.Time + RecordedAt time.Time +} + +func (e *eventWatcher) OnAdd(obj interface{}) { + event := obj.(*eventsv1.Event) + objectNsName := types.NamespacedName{Namespace: event.Regarding.Namespace, Name: event.Regarding.Name} + eventNsName := types.NamespacedName{Namespace: event.Namespace, Name: event.Name} + v, _ := e.objectCache.LoadOrStore(objectNsName, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{}, + }) + objEvents := v.(*objectEvents) + objEvents.mu.Lock() + defer objEvents.mu.Unlock() + objEvents.eventInfos[eventNsName] = &EventInfo{ + Reason: event.Reason, + Note: event.Note, + CreatedAt: event.CreationTimestamp.Time, + RecordedAt: time.Now(), + } +} + +func (e *eventWatcher) OnUpdate(_, newObj interface{}) { + // Dropping event updates since we only care about the creation +} + +func (e *eventWatcher) OnDelete(obj interface{}) { + event := obj.(*eventsv1.Event) + objectNsName := types.NamespacedName{Namespace: event.Regarding.Namespace, Name: event.Regarding.Name} + eventNsName := types.NamespacedName{Namespace: event.Namespace, Name: event.Name} + v, _ := e.objectCache.LoadOrStore(objectNsName, &objectEvents{}) + objEvents := v.(*objectEvents) + objEvents.mu.Lock() + defer objEvents.mu.Unlock() + delete(objEvents.eventInfos, eventNsName) + if len(objEvents.eventInfos) == 0 { + e.objectCache.Delete(objectNsName) + } +} + +// List returns all events for the given object that were created after the given time, sorted by creation time. +func (e *eventWatcher) List(objectNsName types.NamespacedName, createdAfter time.Time) []*EventInfo { + v, _ := e.objectCache.Load(objectNsName) + if v == nil { + return []*EventInfo{} + } + objEvents := v.(*objectEvents) + objEvents.mu.RLock() + defer objEvents.mu.RUnlock() + // This logic assumes that cardinality of events per object is relatively low, so iterating over them to find + // recent ones and sorting the results is not too expensive. + result := make([]*EventInfo, 0, len(objEvents.eventInfos)) + for _, eventInfo := range objEvents.eventInfos { + if eventInfo.CreatedAt.After(createdAfter) { + result = append(result, eventInfo) + } + } + sort.SliceStable(result, func(i, j int) bool { + return result[i].CreatedAt.Before(result[j].CreatedAt) || + (result[i].CreatedAt.Equal(result[j].CreatedAt) && result[i].RecordedAt.Before(result[j].RecordedAt)) + }) + return result +} + +func NewEventWatcher(ctx context.Context, gvk schema.GroupVersionKind, kubeClientset kubernetes.Interface) (EventWatcher, error) { + objectSelector := func(opts *metav1.ListOptions) { + opts.FieldSelector = fields.OneTermEqualSelector("regarding.kind", gvk.Kind).String() + } + eventInformer := informers.NewSharedInformerFactoryWithOptions( + kubeClientset, 0, informers.WithTweakListOptions(objectSelector)).Events().V1().Events() + watcher := &eventWatcher{ + informer: eventInformer, + } + eventInformer.Informer().AddEventHandler(watcher) + + go eventInformer.Informer().Run(ctx.Done()) + logger.Debugf(ctx, "Started informer for [%s] events", gvk.Kind) + + return watcher, nil +} diff --git a/pkg/controller/nodes/task/k8s/event_watcher_test.go b/pkg/controller/nodes/task/k8s/event_watcher_test.go new file mode 100644 index 000000000..9247a3e72 --- /dev/null +++ b/pkg/controller/nodes/task/k8s/event_watcher_test.go @@ -0,0 +1,192 @@ +package k8s + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + + corev1 "k8s.io/api/core/v1" + eventsv1 "k8s.io/api/events/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" +) + +func TestEventWatcher_OnAdd(t *testing.T) { + now := time.Now() + ew := eventWatcher{} + ew.objectCache.Store(types.NamespacedName{Namespace: "ns1", Name: "name1"}, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{ + {Namespace: "eventns1", Name: "eventname1"}: {CreatedAt: now.Add(-time.Minute)}, + }, + }) + ew.objectCache.Store(types.NamespacedName{Namespace: "ns2", Name: "name2"}, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{}, + }) + + t.Run("existing event", func(t *testing.T) { + ew.OnAdd(&eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns1", + Name: "eventname1", + CreationTimestamp: metav1.NewTime(now), + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns1", + Name: "name1", + }, + }) + + v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns1", Name: "name1"}) + assert.NotNil(t, v) + objEvents := v.(*objectEvents) + assert.Len(t, objEvents.eventInfos, 1) + assert.Equal(t, now, objEvents.eventInfos[types.NamespacedName{Namespace: "eventns1", Name: "eventname1"}].CreatedAt) + }) + + t.Run("new event", func(t *testing.T) { + ew.OnAdd(&eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns2", + Name: "eventname2", + CreationTimestamp: metav1.NewTime(now), + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns2", + Name: "name2", + }, + }) + + v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns2", Name: "name2"}) + assert.NotNil(t, v) + objEvents := v.(*objectEvents) + assert.Len(t, objEvents.eventInfos, 1) + assert.Equal(t, now, objEvents.eventInfos[types.NamespacedName{Namespace: "eventns2", Name: "eventname2"}].CreatedAt) + }) + + t.Run("new object", func(t *testing.T) { + ew.OnAdd(&eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns3", + Name: "eventname3", + CreationTimestamp: metav1.NewTime(now), + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns3", + Name: "name3", + }, + }) + + v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns3", Name: "name3"}) + assert.NotNil(t, v) + objEvents := v.(*objectEvents) + assert.Len(t, objEvents.eventInfos, 1) + assert.Equal(t, now, objEvents.eventInfos[types.NamespacedName{Namespace: "eventns3", Name: "eventname3"}].CreatedAt) + }) +} + +func TestEventWatcher_OnDelete(t *testing.T) { + now := time.Now() + ew := eventWatcher{} + ew.objectCache.Store(types.NamespacedName{Namespace: "ns1", Name: "name1"}, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{ + {Namespace: "eventns1", Name: "eventname1"}: {CreatedAt: now.Add(-time.Minute)}, + }, + }) + ew.objectCache.Store(types.NamespacedName{Namespace: "ns2", Name: "name2"}, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{}, + }) + + t.Run("existing event", func(t *testing.T) { + ew.OnDelete(&eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns1", + Name: "eventname1", + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns1", + Name: "name1", + }, + }) + + v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns1", Name: "name1"}) + assert.Nil(t, v) + }) + + t.Run("missing event", func(t *testing.T) { + ew.OnDelete(&eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns2", + Name: "eventname2", + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns2", + Name: "name2", + }, + }) + + v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns2", Name: "name2"}) + assert.Nil(t, v) + }) + + t.Run("missing object", func(t *testing.T) { + ew.OnDelete(&eventsv1.Event{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: "eventns3", + Name: "eventname3", + }, + Regarding: corev1.ObjectReference{ + Namespace: "ns3", + Name: "name3", + }, + }) + + v, _ := ew.objectCache.Load(types.NamespacedName{Namespace: "ns3", Name: "name3"}) + assert.Nil(t, v) + }) +} + +func TestEventWatcher_List(t *testing.T) { + now := time.Now() + ew := eventWatcher{} + ew.objectCache.Store(types.NamespacedName{Namespace: "ns1", Name: "name1"}, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{ + {Namespace: "eventns1", Name: "eventname1"}: {CreatedAt: now}, + {Namespace: "eventns2", Name: "eventname2"}: {CreatedAt: now.Add(-time.Hour)}, + }, + }) + ew.objectCache.Store(types.NamespacedName{Namespace: "ns2", Name: "name2"}, &objectEvents{ + eventInfos: map[types.NamespacedName]*EventInfo{}, + }) + + t.Run("all events", func(t *testing.T) { + result := ew.List(types.NamespacedName{Namespace: "ns1", Name: "name1"}, time.Time{}) + + assert.Len(t, result, 2) + }) + + t.Run("recent events", func(t *testing.T) { + result := ew.List(types.NamespacedName{Namespace: "ns1", Name: "name1"}, now.Add(-time.Minute)) + + assert.Len(t, result, 1) + }) + + t.Run("no events", func(t *testing.T) { + result := ew.List(types.NamespacedName{Namespace: "ns2", Name: "name2"}, time.Time{}) + + assert.Len(t, result, 0) + }) + + t.Run("no object", func(t *testing.T) { + result := ew.List(types.NamespacedName{Namespace: "ns3", Name: "name3"}, time.Time{}) + + assert.Len(t, result, 0) + }) + + t.Run("sorted", func(t *testing.T) { + result := ew.List(types.NamespacedName{Namespace: "ns1", Name: "name1"}, time.Time{}) + + assert.Equal(t, now.Add(-time.Hour), result[0].CreatedAt) + assert.Equal(t, now, result[1].CreatedAt) + }) +} diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index a3397354e..1b281f66d 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -6,26 +6,7 @@ import ( "reflect" "time" - "github.com/flyteorg/flyteplugins/go/tasks/errors" - pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" - - compiler "github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" - nodeTaskConfig "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config" - - "github.com/flyteorg/flytestdlib/contextutils" - stdErrors "github.com/flyteorg/flytestdlib/errors" - "github.com/flyteorg/flytestdlib/logger" - "github.com/flyteorg/flytestdlib/promutils" - "github.com/flyteorg/flytestdlib/promutils/labeled" - "golang.org/x/time/rate" - v1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -33,16 +14,31 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/validation" - + "k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" + + "github.com/flyteorg/flyteplugins/go/tasks/errors" + pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" + pluginsUtils "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/utils" + compiler "github.com/flyteorg/flytepropeller/pkg/compiler/transformers/k8s" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" + nodeTaskConfig "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config" + "github.com/flyteorg/flytestdlib/contextutils" + stdErrors "github.com/flyteorg/flytestdlib/errors" + "github.com/flyteorg/flytestdlib/logger" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/promutils/labeled" ) const finalizer = "flyte/flytek8s" @@ -58,8 +54,9 @@ const ( ) type PluginState struct { - Phase PluginPhase - K8sPluginState k8s.PluginState + Phase PluginPhase + K8sPluginState k8s.PluginState + LastEventUpdate time.Time } type PluginMetrics struct { @@ -84,7 +81,7 @@ func newPluginMetrics(s promutils.Scope) PluginMetrics { } } -func IsK8sObjectNotExists(err error) bool { +func isK8sObjectNotExists(err error) bool { return k8serrors.IsNotFound(err) || k8serrors.IsGone(err) || k8serrors.IsResourceExpired(err) } @@ -99,12 +96,13 @@ type PluginManager struct { // Per namespace-resource backOffController *backoff.Controller resourceLevelMonitor *ResourceLevelMonitor + eventWatcher EventWatcher } -func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) { +func (e *PluginManager) addObjectMetadata(taskCtx pluginsCore.TaskExecutionMetadata, o client.Object, cfg *config.K8sPluginConfig) { o.SetNamespace(taskCtx.GetNamespace()) - o.SetAnnotations(utils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), utils.CopyMap(taskCtx.GetAnnotations()))) - o.SetLabels(utils.UnionMaps(cfg.DefaultLabels, o.GetLabels(), utils.CopyMap(taskCtx.GetLabels()))) + o.SetAnnotations(pluginsUtils.UnionMaps(cfg.DefaultAnnotations, o.GetAnnotations(), pluginsUtils.CopyMap(taskCtx.GetAnnotations()))) + o.SetLabels(pluginsUtils.UnionMaps(cfg.DefaultLabels, o.GetLabels(), pluginsUtils.CopyMap(taskCtx.GetLabels()))) o.SetName(taskCtx.GetTaskExecutionID().GetGeneratedName()) if !e.plugin.GetProperties().DisableInjectOwnerReferences { @@ -117,7 +115,7 @@ func (e *PluginManager) AddObjectMetadata(taskCtx pluginsCore.TaskExecutionMetad } if errs := validation.IsDNS1123Subdomain(o.GetName()); len(errs) > 0 { - o.SetName(utils.ConvertToDNS1123SubdomainCompatibleString(o.GetName())) + o.SetName(pluginsUtils.ConvertToDNS1123SubdomainCompatibleString(o.GetName())) } } @@ -184,7 +182,7 @@ func (e *PluginManager) getPodEffectiveResourceLimits(ctx context.Context, pod * return podRequestedResources } -func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) { +func (e *PluginManager) launchResource(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) { tmpl, err := tCtx.TaskReader().Read(ctx) if err != nil { return pluginsCore.Transition{}, err @@ -202,7 +200,7 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas return pluginsCore.UnknownTransition, err } - e.AddObjectMetadata(k8sTaskCtxMetadata, o, config.GetK8sPluginConfig()) + e.addObjectMetadata(k8sTaskCtxMetadata, o, config.GetK8sPluginConfig()) logger.Infof(ctx, "Creating Object: Type:[%v], Object:[%v/%v]", o.GetObjectKind().GroupVersionKind(), o.GetNamespace(), o.GetName()) key := backoff.ComposeResourceKey(o) @@ -251,18 +249,21 @@ func (e *PluginManager) LaunchResource(ctx context.Context, tCtx pluginsCore.Tas return pluginsCore.DoTransition(pluginsCore.PhaseInfoQueued(time.Now(), pluginsCore.DefaultPhaseVersion, "task submitted to K8s")), nil } -func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, k8sPluginState *k8s.PluginState) (pluginsCore.Transition, error) { +func (e *PluginManager) getResource(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (client.Object, error) { o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata()) if err != nil { logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err) - return pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("BadTaskDefinition", fmt.Sprintf("Failed to build resource, caused by: %s", err.Error()), nil)), nil + return nil, err } + e.addObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) + return o, nil +} - e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) +func (e *PluginManager) checkResourcePhase(ctx context.Context, tCtx pluginsCore.TaskExecutionContext, o client.Object, k8sPluginState *k8s.PluginState) (pluginsCore.Transition, error) { nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} // Attempt to get resource from informer cache, if not found, retrieve it from API server. if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { - if IsK8sObjectNotExists(err) { + if isK8sObjectNotExists(err) { // This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a // Pod does not exist error. This should be retried using the retry policy logger.Warningf(ctx, "Failed to find the Resource with name: %v. Error: %v", nsName, err) @@ -315,10 +316,11 @@ func (e *PluginManager) CheckResourcePhase(ctx context.Context, tCtx pluginsCore func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (pluginsCore.Transition, error) { // read phase state - ps := PluginState{} - if v, err := tCtx.PluginStateReader().Get(&ps); err != nil { + pluginState := PluginState{} + if v, err := tCtx.PluginStateReader().Get(&pluginState); err != nil { if v != pluginStateVersion { - return pluginsCore.DoTransition(pluginsCore.PhaseInfoRetryableFailure(errors.CorruptedPluginState, fmt.Sprintf("plugin state version mismatch expected [%d] got [%d]", pluginStateVersion, v), nil)), nil + return pluginsCore.DoTransition(pluginsCore.PhaseInfoRetryableFailure(errors.CorruptedPluginState, + fmt.Sprintf("plugin state version mismatch expected [%d] got [%d]", pluginStateVersion, v), nil)), nil } return pluginsCore.UnknownTransition, errors.Wrapf(errors.CorruptedPluginState, err, "Failed to read unmarshal custom state") } @@ -326,56 +328,75 @@ func (e PluginManager) Handle(ctx context.Context, tCtx pluginsCore.TaskExecutio // evaluate plugin var err error var transition pluginsCore.Transition - pluginPhase := ps.Phase - if ps.Phase == PluginPhaseNotStarted { - transition, err = e.LaunchResource(ctx, tCtx) + var o client.Object + pluginPhase := pluginState.Phase + if pluginState.Phase == PluginPhaseNotStarted { + transition, err = e.launchResource(ctx, tCtx) if err == nil && transition.Info().Phase() == pluginsCore.PhaseQueued { pluginPhase = PluginPhaseStarted } } else { - transition, err = e.CheckResourcePhase(ctx, tCtx, &ps.K8sPluginState) + o, err = e.getResource(ctx, tCtx) + if err != nil { + transition, err = pluginsCore.DoTransition(pluginsCore.PhaseInfoFailure("BadTaskDefinition", + fmt.Sprintf("Failed to build resource, caused by: %s", err.Error()), nil)), nil + } else { + transition, err = e.checkResourcePhase(ctx, tCtx, o, &pluginState.K8sPluginState) + } } if err != nil { return transition, err } - // persist any changes in phase state - k8sPluginState := ps.K8sPluginState - if ps.Phase != pluginPhase || k8sPluginState.Phase != transition.Info().Phase() || - k8sPluginState.PhaseVersion != transition.Info().Version() || k8sPluginState.Reason != transition.Info().Reason() { - - newPluginState := PluginState{ - Phase: pluginPhase, - K8sPluginState: k8s.PluginState{ - Phase: transition.Info().Phase(), - PhaseVersion: transition.Info().Version(), - Reason: transition.Info().Reason(), - }, + // Add events since last update + version := transition.Info().Version() + lastEventUpdate := pluginState.LastEventUpdate + if e.eventWatcher != nil && o != nil { + nsName := k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} + recentEvents := e.eventWatcher.List(nsName, lastEventUpdate) + if len(recentEvents) > 0 { + taskInfo := transition.Info().Info() + taskInfo.AdditionalReasons = make([]pluginsCore.ReasonInfo, 0, len(recentEvents)) + for _, event := range recentEvents { + taskInfo.AdditionalReasons = append(taskInfo.AdditionalReasons, + pluginsCore.ReasonInfo{Reason: event.Note, OccurredAt: &event.CreatedAt}) + lastEventUpdate = event.CreatedAt + } + // Bump the version to ensure newly added events are picked up + version++ } + } + // persist any changes in phase state + newPluginState := PluginState{ + Phase: pluginPhase, + K8sPluginState: k8s.PluginState{ + Phase: transition.Info().Phase(), + PhaseVersion: version, + Reason: transition.Info().Reason(), + }, + LastEventUpdate: lastEventUpdate, + } + if pluginState != newPluginState { if err := tCtx.PluginStateWriter().Put(pluginStateVersion, &newPluginState); err != nil { return pluginsCore.UnknownTransition, err } } - return transition, err + return transition, nil } func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) error { logger.Infof(ctx, "KillTask invoked. We will attempt to delete object [%v].", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()) - o, err := e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata()) + o, err := e.getResource(ctx, tCtx) if err != nil { - // This will recurrent, so we will skip further finalize - logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v, when finalizing.", - tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err) + logger.Errorf(ctx, "%v", err) return nil } - e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) - deleteResource := true abortOverride, hasAbortOverride := e.plugin.(k8s.PluginAbortOverride) @@ -407,7 +428,7 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution } } - if err != nil && !IsK8sObjectNotExists(err) { + if err != nil && !isK8sObjectNotExists(err) { logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", resourceToFinalize.GetNamespace(), resourceToFinalize.GetName(), err) return err @@ -416,11 +437,11 @@ func (e PluginManager) Abort(ctx context.Context, tCtx pluginsCore.TaskExecution return nil } -func (e *PluginManager) ClearFinalizers(ctx context.Context, o client.Object) error { +func (e *PluginManager) clearFinalizers(ctx context.Context, o client.Object) error { if len(o.GetFinalizers()) > 0 { o.SetFinalizers([]string{}) err := e.kubeClient.GetClient().Update(ctx, o) - if err != nil && !IsK8sObjectNotExists(err) { + if err != nil && !isK8sObjectNotExists(err) { logger.Warningf(ctx, "Failed to clear finalizers for Resource with name: %v/%v. Error: %v", o.GetNamespace(), o.GetName(), err) return err @@ -434,25 +455,21 @@ func (e *PluginManager) ClearFinalizers(ctx context.Context, o client.Object) er func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecutionContext) (err error) { errs := stdErrors.ErrorCollection{} - var o client.Object var nsName k8stypes.NamespacedName cfg := config.GetK8sPluginConfig() - o, err = e.plugin.BuildIdentityResource(ctx, tCtx.TaskExecutionMetadata()) + o, err := e.getResource(ctx, tCtx) if err != nil { - // This will recurrent, so we will skip further finalize - logger.Errorf(ctx, "Failed to build the Resource with name: %v. Error: %v, when finalizing.", tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), err) + logger.Errorf(ctx, "%v", err) return nil } - - e.AddObjectMetadata(tCtx.TaskExecutionMetadata(), o, config.GetK8sPluginConfig()) nsName = k8stypes.NamespacedName{Namespace: o.GetNamespace(), Name: o.GetName()} // Attempt to cleanup finalizers so that the object may be deleted/garbage collected. We try to clear them for all // objects, regardless of whether or not InjectFinalizer is configured to handle all cases where InjectFinalizer is // enabled/disabled during object execution. if err := e.kubeClient.GetClient().Get(ctx, nsName, o); err != nil { - if IsK8sObjectNotExists(err) { + if isK8sObjectNotExists(err) { return nil } // This happens sometimes because a node gets removed and K8s deletes the pod. This will result in a @@ -464,7 +481,7 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu // This must happen after sending admin event. It's safe against partial failures because if the event failed, we will // simply retry in the next round. If the event succeeded but this failed, we will try again the next round to send // the same event (idempotent) and then come here again... - err = e.ClearFinalizers(ctx, o) + err = e.clearFinalizers(ctx, o) if err != nil { errs.Append(err) } @@ -473,7 +490,7 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu if cfg.DeleteResourceOnFinalize && !e.plugin.GetProperties().DisableDeleteResourceOnFinalize { // Attempt to delete resource, if not found, return success. if err := e.kubeClient.GetClient().Delete(ctx, o); err != nil { - if IsK8sObjectNotExists(err) { + if isK8sObjectNotExists(err) { return errs.ErrorOrDefault() } @@ -488,9 +505,9 @@ func (e *PluginManager) Finalize(ctx context.Context, tCtx pluginsCore.TaskExecu } func NewPluginManagerWithBackOff(ctx context.Context, iCtx pluginsCore.SetupContext, entry k8s.PluginEntry, backOffController *backoff.Controller, - monitorIndex *ResourceMonitorIndex) (*PluginManager, error) { + monitorIndex *ResourceMonitorIndex, kubeClientset kubernetes.Interface) (*PluginManager, error) { - mgr, err := NewPluginManager(ctx, iCtx, entry, monitorIndex) + mgr, err := NewPluginManager(ctx, iCtx, entry, monitorIndex, kubeClientset) if err == nil { mgr.backOffController = backOffController } @@ -499,7 +516,7 @@ func NewPluginManagerWithBackOff(ctx context.Context, iCtx pluginsCore.SetupCont // Creates a K8s generic task executor. This provides an easier way to build task executors that create K8s resources. func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry k8s.PluginEntry, - monitorIndex *ResourceMonitorIndex) (*PluginManager, error) { + monitorIndex *ResourceMonitorIndex, kubeClientset kubernetes.Interface) (*PluginManager, error) { if iCtx.EnqueueOwner() == nil { return nil, errors.Errorf(errors.PluginInitializationFailed, "Failed to initialize plugin, enqueue Owner cannot be nil or empty.") @@ -617,16 +634,25 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry return nil, err } - // Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on gvk, err := getPluginGvk(entry.ResourceToWatch) if err != nil { return nil, err } - sharedInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch) + + var eventWatcher EventWatcher + if config.GetK8sPluginConfig().SendObjectEvents { + eventWatcher, err = NewEventWatcher(ctx, gvk, kubeClientset) + if err != nil { + return nil, err + } + } + + // Construct the collector that will emit a gauge indicating current levels of the resource that this K8s plugin operates on + pluginInformer, err := getPluginSharedInformer(ctx, kubeClient, entry.ResourceToWatch) if err != nil { return nil, err } - rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, sharedInformer, gvk) + rm := monitorIndex.GetOrCreateResourceLevelMonitor(ctx, metricsScope, pluginInformer, gvk) // Start the poller and gauge emitter rm.RunCollectorOnce(ctx) @@ -637,6 +663,7 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry metrics: newPluginMetrics(metricsScope), kubeClient: kubeClient, resourceLevelMonitor: rm, + eventWatcher: eventWatcher, }, nil } diff --git a/pkg/controller/nodes/task/k8s/plugin_manager_test.go b/pkg/controller/nodes/task/k8s/plugin_manager_test.go index 7b78c6810..49f59942d 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager_test.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager_test.go @@ -7,37 +7,32 @@ import ( "reflect" "testing" - "k8s.io/client-go/kubernetes/scheme" - - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" - "github.com/flyteorg/flytestdlib/contextutils" - "github.com/flyteorg/flytestdlib/promutils/labeled" - - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" - "github.com/flyteorg/flytestdlib/promutils" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + v1 "k8s.io/api/core/v1" k8serrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/resource" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" + k8stypes "k8s.io/apimachinery/pkg/types" + k8sfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/fake" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" pluginsCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" pluginsCoreMock "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core/mocks" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" pluginsk8sMock "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s/mocks" - - "github.com/stretchr/testify/mock" - v1 "k8s.io/api/core/v1" - v12 "k8s.io/apimachinery/pkg/apis/meta/v1" - - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" - "github.com/stretchr/testify/assert" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - k8stypes "k8s.io/apimachinery/pkg/types" - "sigs.k8s.io/controller-runtime/pkg/client/fake" - "github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" + "github.com/flyteorg/flytestdlib/contextutils" + "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/promutils/labeled" ) type extendedFakeClient struct { @@ -135,17 +130,19 @@ func (p *pluginWithAbortOverride) OnAbort(ctx context.Context, tCtx pluginsCore. func ExampleNewPluginManager() { sCtx := &pluginsCoreMock.SetupContext{} fakeKubeClient := mocks.NewFakeKubeClient() + mockClientset := k8sfake.NewSimpleClientset() sCtx.On("KubeClient").Return(fakeKubeClient) sCtx.On("OwnerKind").Return("test") sCtx.On("EnqueueOwner").Return(pluginsCore.EnqueueOwner(func(name k8stypes.NamespacedName) error { return nil })) sCtx.On("MetricsScope").Return(promutils.NewTestScope()) ctx := context.TODO() + exec, err := NewPluginManager(ctx, sCtx, k8s.PluginEntry{ ID: "SampleHandler", RegisteredTaskTypes: []pluginsCore.TaskType{"container"}, ResourceToWatch: &v1.Pod{}, Plugin: k8sSampleHandler{}, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) if err == nil { fmt.Printf("Created executor: %v\n", exec.GetID()) } else { @@ -203,7 +200,7 @@ func getMockTaskExecutionMetadata() pluginsCore.TaskExecutionMetadata { taskExecutionMetadata.On("GetNamespace").Return("ns") taskExecutionMetadata.On("GetAnnotations").Return(map[string]string{"aKey": "aVal"}) taskExecutionMetadata.On("GetLabels").Return(map[string]string{"lKey": "lVal"}) - taskExecutionMetadata.On("GetOwnerReference").Return(v12.OwnerReference{Name: "x"}) + taskExecutionMetadata.On("GetOwnerReference").Return(metav1.OwnerReference{Name: "x"}) id := &pluginsCoreMock.TaskExecutionID{} id.On("GetGeneratedName").Return("test") @@ -217,7 +214,7 @@ func getMockTaskExecutionMetadataCustom( ns string, annotations map[string]string, labels map[string]string, - ownerRef v12.OwnerReference) pluginsCore.TaskExecutionMetadata { + ownerRef metav1.OwnerReference) pluginsCore.TaskExecutionMetadata { taskExecutionMetadata := &pluginsCoreMock.TaskExecutionMetadata{} taskExecutionMetadata.On("GetNamespace").Return(ns) taskExecutionMetadata.On("GetAnnotations").Return(annotations) @@ -251,26 +248,24 @@ func buildPluginWithAbortOverride(ctx context.Context, tctx pluginsCore.TaskExec pluginResource := &v1.Pod{} mockResourceHandler := new(pluginWithAbortOverride) - mockResourceHandler.On( "OnAbort", ctx, tctx, pluginResource, ).Return(abortBehavior, nil) - mockResourceHandler.On( "BuildIdentityResource", ctx, tctx.TaskExecutionMetadata(), ).Return(pluginResource, nil) - mockResourceHandler.On("GetProperties").Return(k8s.PluginProperties{}) mockClient := extendedFakeClient{ Client: client, } + mockClientset := k8sfake.NewSimpleClientset() return NewPluginManager(ctx, dummySetupContext(mockClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: pluginResource, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) } func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { @@ -285,11 +280,12 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil) fakeClient := fake.NewClientBuilder().WithRuntimeObjects().Build() + mockClientset := k8sfake.NewSimpleClientset() pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) transition, err := pluginManager.Handle(ctx, tCtx) @@ -300,7 +296,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { assert.Equal(t, pluginsCore.PhaseQueued, transitionInfo.Phase()) createdPod := &v1.Pod{} - pluginManager.AddObjectMetadata(tCtx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) + pluginManager.addObjectMetadata(tCtx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) assert.NoError(t, fakeClient.Get(ctx, k8stypes.NamespacedName{Namespace: tCtx.TaskExecutionMetadata().GetNamespace(), Name: tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName()}, createdPod)) assert.Equal(t, tCtx.TaskExecutionMetadata().GetTaskExecutionID().GetGeneratedName(), createdPod.Name) @@ -314,15 +310,16 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) mockResourceHandler.OnBuildResourceMatch(mock.Anything, mock.Anything).Return(&v1.Pod{}, nil) fakeClient := fake.NewClientBuilder().WithRuntimeObjects().Build() + mockClientset := k8sfake.NewSimpleClientset() pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) createdPod := &v1.Pod{} - pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) + pluginManager.addObjectMetadata(tctx.TaskExecutionMetadata(), createdPod, &config.K8sPluginConfig{}) assert.NoError(t, fakeClient.Create(ctx, createdPod)) transition, err := pluginManager.Handle(ctx, tctx) @@ -348,12 +345,13 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { Client: fake.NewClientBuilder().WithRuntimeObjects().Build(), CreateError: k8serrors.NewForbidden(schema.GroupResource{}, "", errors.New("exceeded quota")), } + mockClientset := k8sfake.NewSimpleClientset() pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) createdPod := &v1.Pod{} @@ -382,12 +380,13 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { Client: fake.NewClientBuilder().WithRuntimeObjects().Build(), CreateError: k8serrors.NewForbidden(schema.GroupResource{}, "", errors.New("auth error")), } + mockClientset := k8sfake.NewSimpleClientset() pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) createdPod := &v1.Pod{} @@ -434,13 +433,14 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { "exceeded quota: project-quota, requested: limits.memory=3Gi, "+ "used: limits.memory=7976Gi, limited: limits.memory=8000Gi")), } + mockClientset := k8sfake.NewSimpleClientset() backOffController := backoff.NewController(ctx) pluginManager, err := NewPluginManagerWithBackOff(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, backOffController, NewResourceMonitorIndex()) + }, backOffController, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) transition, err := pluginManager.Handle(ctx, tctx) @@ -451,7 +451,7 @@ func TestK8sTaskExecutor_Handle_LaunchResource(t *testing.T) { // Build a reference resource that is supposed to be identical to the resource built by pluginManager referenceResource, err := mockResourceHandler.BuildResource(ctx, tctx) assert.NoError(t, err) - pluginManager.AddObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig()) + pluginManager.addObjectMetadata(tctx.TaskExecutionMetadata(), referenceResource, config.GetK8sPluginConfig()) refKey := backoff.ComposeResourceKey(referenceResource) podBackOffHandler, found := backOffController.GetBackOffHandler(refKey) assert.True(t, found) @@ -463,7 +463,7 @@ func TestPluginManager_Abort(t *testing.T) { ctx := context.TODO() tm := getMockTaskExecutionMetadata() res := &v1.Pod{ - ObjectMeta: v12.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: tm.GetTaskExecutionID().GetGeneratedName(), Namespace: tm.GetNamespace(), }, @@ -473,6 +473,7 @@ func TestPluginManager_Abort(t *testing.T) { // common setup code tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme, res)} + mockClientset := k8sfake.NewSimpleClientset() // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} @@ -483,7 +484,7 @@ func TestPluginManager_Abort(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NotNil(t, res) assert.NoError(t, err) @@ -498,6 +499,7 @@ func TestPluginManager_Abort(t *testing.T) { // common setup code tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) fc := extendedFakeClient{Client: fake.NewFakeClientWithScheme(scheme.Scheme)} + mockClientset := k8sfake.NewSimpleClientset() // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) @@ -507,7 +509,7 @@ func TestPluginManager_Abort(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NotNil(t, res) assert.NoError(t, err) @@ -583,7 +585,7 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { ctx := context.TODO() tm := getMockTaskExecutionMetadata() res := &v1.Pod{ - ObjectMeta: v12.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: tm.GetTaskExecutionID().GetGeneratedName(), Namespace: tm.GetNamespace(), }, @@ -683,6 +685,7 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { // common setup code tctx := getMockTaskContext(PluginPhaseStarted, PluginPhaseStarted) fc := tt.args.fakeClient() + mockClientset := k8sfake.NewSimpleClientset() // common setup code mockResourceHandler := &pluginsk8sMock.Plugin{} mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) @@ -692,7 +695,7 @@ func TestPluginManager_Handle_CheckResourceStatus(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NotNil(t, res) assert.NoError(t, err) @@ -720,7 +723,7 @@ func TestPluginManager_Handle_PluginState(t *testing.T) { ctx := context.TODO() tm := getMockTaskExecutionMetadata() res := &v1.Pod{ - ObjectMeta: v12.ObjectMeta{ + ObjectMeta: metav1.ObjectMeta{ Name: tm.GetTaskExecutionID().GetGeneratedName(), Namespace: tm.GetNamespace(), }, @@ -839,6 +842,7 @@ func TestPluginManager_Handle_PluginState(t *testing.T) { tCtx.OnOutputWriter().Return(&dummyOutputWriter{}) fc := extendedFakeClient{Client: fake.NewFakeClient(res)} + mockClientset := k8sfake.NewSimpleClientset() mockResourceHandler := &pluginsk8sMock.Plugin{} mockResourceHandler.OnGetProperties().Return(k8s.PluginProperties{}) @@ -850,7 +854,7 @@ func TestPluginManager_Handle_PluginState(t *testing.T) { ID: "x", ResourceToWatch: &v1.Pod{}, Plugin: mockResourceHandler, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) // handle plugin @@ -877,6 +881,7 @@ func TestPluginManager_CustomKubeClient(t *testing.T) { fakeClient := fake.NewClientBuilder().Build() newFakeClient := &pluginsCoreMock.KubeClient{} newFakeClient.On("GetCache").Return(&mocks.FakeInformers{}) + mockClientset := k8sfake.NewSimpleClientset() pluginManager, err := NewPluginManager(ctx, dummySetupContext(fakeClient), k8s.PluginEntry{ ID: "x", ResourceToWatch: &v1.Pod{}, @@ -884,7 +889,7 @@ func TestPluginManager_CustomKubeClient(t *testing.T) { CustomKubeClient: func(ctx context.Context) (pluginsCore.KubeClient, error) { return newFakeClient, nil }, - }, NewResourceMonitorIndex()) + }, NewResourceMonitorIndex(), mockClientset) assert.NoError(t, err) assert.Equal(t, newFakeClient, pluginManager.kubeClient) @@ -893,7 +898,7 @@ func TestPluginManager_CustomKubeClient(t *testing.T) { func TestPluginManager_AddObjectMetadata(t *testing.T) { genName := "gen-name" ns := "ns" - or := v12.OwnerReference{} + or := metav1.OwnerReference{} l := map[string]string{"l1": "lv1"} a := map[string]string{"aKey": "aVal"} tm := getMockTaskExecutionMetadataCustom(genName, ns, a, l, or) @@ -905,9 +910,9 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { p := pluginsk8sMock.Plugin{} p.OnGetProperties().Return(k8s.PluginProperties{}) pluginManager := PluginManager{plugin: &p} - pluginManager.AddObjectMetadata(tm, o, cfg) + pluginManager.addObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) - assert.Equal(t, []v12.OwnerReference{or}, o.GetOwnerReferences()) + assert.Equal(t, []metav1.OwnerReference{or}, o.GetOwnerReferences()) assert.Equal(t, ns, o.GetNamespace()) assert.Equal(t, map[string]string{ "cluster-autoscaler.kubernetes.io/safe-to-evict": "false", @@ -922,7 +927,7 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { p.OnGetProperties().Return(k8s.PluginProperties{DisableInjectOwnerReferences: true}) pluginManager := PluginManager{plugin: &p} o := &v1.Pod{} - pluginManager.AddObjectMetadata(tm, o, cfg) + pluginManager.addObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) // empty OwnerReference since we are ignoring assert.Equal(t, 0, len(o.GetOwnerReferences())) @@ -942,7 +947,7 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { // enable finalizer injection cfg.InjectFinalizer = true o := &v1.Pod{} - pluginManager.AddObjectMetadata(tm, o, cfg) + pluginManager.addObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) // empty OwnerReference since we are ignoring assert.Equal(t, 1, len(o.GetOwnerReferences())) @@ -962,7 +967,7 @@ func TestPluginManager_AddObjectMetadata(t *testing.T) { // disable finalizer injection cfg.InjectFinalizer = false o := &v1.Pod{} - pluginManager.AddObjectMetadata(tm, o, cfg) + pluginManager.addObjectMetadata(tm, o, cfg) assert.Equal(t, genName, o.GetName()) // empty OwnerReference since we are ignoring assert.Equal(t, 1, len(o.GetOwnerReferences())) diff --git a/pkg/controller/nodes/task/plugin_config.go b/pkg/controller/nodes/task/plugin_config.go index ebca2a449..ff4c1f26f 100644 --- a/pkg/controller/nodes/task/plugin_config.go +++ b/pkg/controller/nodes/task/plugin_config.go @@ -6,21 +6,22 @@ import ( "strings" "sync" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" + "k8s.io/client-go/kubernetes" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" - "github.com/flyteorg/flytestdlib/logger" - "github.com/flyteorg/flyteplugins/go/tasks/plugins/webapi/agent" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/backoff" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/k8s" + "github.com/flyteorg/flytestdlib/logger" ) const AgentServiceKey = "agent-service" var once sync.Once -func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface) (enabledPlugins []core.PluginEntry, defaultForTaskTypes map[pluginID][]taskType, err error) { +func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPluginConfig, pr PluginRegistryIface, + kubeClientset kubernetes.Interface) (enabledPlugins []core.PluginEntry, defaultForTaskTypes map[pluginID][]taskType, err error) { if cfg == nil { return nil, nil, fmt.Errorf("unable to initialize plugin list, cfg is a required argument") } @@ -68,7 +69,7 @@ func WranglePluginsAndGenerateFinalList(ctx context.Context, cfg *config.TaskPlu ID: id, RegisteredTaskTypes: kpe.RegisteredTaskTypes, LoadPlugin: func(ctx context.Context, iCtx core.SetupContext) (plugin core.Plugin, e error) { - return k8s.NewPluginManagerWithBackOff(ctx, iCtx, kpe, backOffController, monitorIndex) + return k8s.NewPluginManagerWithBackOff(ctx, iCtx, kpe, backOffController, monitorIndex, kubeClientset) }, IsDefault: kpe.IsDefault, } diff --git a/pkg/controller/nodes/task/plugin_config_test.go b/pkg/controller/nodes/task/plugin_config_test.go index 2dbf9f31c..bcd4c9972 100644 --- a/pkg/controller/nodes/task/plugin_config_test.go +++ b/pkg/controller/nodes/task/plugin_config_test.go @@ -6,14 +6,14 @@ import ( "testing" "time" - config2 "github.com/flyteorg/flytestdlib/config" - - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" - "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" "github.com/magiconair/properties/assert" "k8s.io/apimachinery/pkg/util/sets" + k8sfake "k8s.io/client-go/kubernetes/fake" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" + "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/k8s" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/config" + flytestdlibConfig "github.com/flyteorg/flytestdlib/config" ) func TestWranglePluginsAndGenerateFinalList(t *testing.T) { @@ -47,13 +47,13 @@ func TestWranglePluginsAndGenerateFinalList(t *testing.T) { args args want want }{ - {"config-no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer}}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{}}, - {"no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: nil}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{}}, + {"config-no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer}}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: flytestdlibConfig.Duration{Duration: time.Second * 0}}}, want{}}, + {"no-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: nil}, backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: flytestdlibConfig.Duration{Duration: time.Second * 0}}}, want{}}, {"no-config-no-plugins", args{}, want{err: true}}, {"no-config-plugins", args{corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{err: true}}, {"empty-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin)}, want{final: sets.NewString(k8sContainer, k8sOther, coreOther, coreContainer)}}, - {"config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer, k8sOther}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, - {"case-differs-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{strings.ToUpper(coreContainer), strings.ToUpper(k8sOther)}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: config2.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, + {"config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{coreContainer, k8sOther}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: flytestdlibConfig.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, + {"case-differs-config-plugins", args{cfg: &config.TaskPluginConfig{EnabledPlugins: []string{strings.ToUpper(coreContainer), strings.ToUpper(k8sOther)}, DefaultForTaskTypes: map[string]string{"container": coreContainer}}, corePlugins: cpe(coreContainerPlugin, coreOtherPlugin), k8sPlugins: kpe(k8sContainerPlugin, k8sOtherPlugin), backOffCfg: &config.BackOffConfig{BaseSecond: 0, MaxDuration: flytestdlibConfig.Duration{Duration: time.Second * 0}}}, want{final: sets.NewString(k8sOther, coreContainer)}}, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { @@ -61,7 +61,8 @@ func TestWranglePluginsAndGenerateFinalList(t *testing.T) { core: tt.args.corePlugins, k8s: tt.args.k8sPlugins, } - got, _, err := WranglePluginsAndGenerateFinalList(context.TODO(), tt.args.cfg, pr) + mockClientset := k8sfake.NewSimpleClientset() + got, _, err := WranglePluginsAndGenerateFinalList(context.TODO(), tt.args.cfg, pr, mockClientset) if (err != nil) != tt.want.err { t.Errorf("WranglePluginsAndGenerateFinalList() error = %v, wantErr %v", err, tt.want.err) return diff --git a/pkg/controller/nodes/task/transformer.go b/pkg/controller/nodes/task/transformer.go index 21edca723..e671b75fa 100644 --- a/pkg/controller/nodes/task/transformer.go +++ b/pkg/controller/nodes/task/transformer.go @@ -3,9 +3,12 @@ package task import ( "time" + "google.golang.org/protobuf/types/known/timestamppb" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" + flytek8sConfig "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/flytek8s/config" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/io" "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" "github.com/flyteorg/flytepropeller/pkg/controller/config" @@ -13,9 +16,6 @@ import ( "github.com/flyteorg/flytepropeller/pkg/controller/nodes/common" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces" - - "github.com/golang/protobuf/ptypes" - timestamppb "github.com/golang/protobuf/ptypes/timestamp" ) // This is used by flyteadmin to indicate that map tasks now report subtask metadata individually. @@ -88,24 +88,16 @@ type ToTaskExecutionEventInputs struct { func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutionEvent, error) { // Transitions to a new phase - var err error var occurredAt *timestamppb.Timestamp if i := input.Info.Info(); i != nil && i.OccurredAt != nil { - occurredAt, err = ptypes.TimestampProto(*i.OccurredAt) + occurredAt = timestamppb.New(*i.OccurredAt) } else { - occurredAt, err = ptypes.TimestampProto(input.OccurredAt) - } - - if err != nil { - return nil, err + occurredAt = timestamppb.New(input.OccurredAt) } - reportedAt := ptypes.TimestampNow() + reportedAt := timestamppb.Now() if i := input.Info.Info(); i != nil && i.ReportedAt != nil { - occurredAt, err = ptypes.TimestampProto(*i.ReportedAt) - if err != nil { - return nil, err - } + occurredAt = timestamppb.New(*i.ReportedAt) } taskExecID := input.TaskExecContext.TaskExecutionMetadata().GetTaskExecutionID().GetID() @@ -136,6 +128,19 @@ func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutio } } + var reasons []*event.EventReason + if len(input.Info.Reason()) > 0 { + reasons = append(reasons, &event.EventReason{ + Reason: input.Info.Reason(), + OccurredAt: occurredAt, + }) + } + for _, reasonInfo := range input.Info.Info().AdditionalReasons { + reasons = append(reasons, &event.EventReason{ + Reason: reasonInfo.Reason, + OccurredAt: timestamppb.New(*reasonInfo.OccurredAt), + }) + } tev := &event.TaskExecutionEvent{ TaskId: taskExecID.TaskId, ParentNodeExecutionId: nodeExecutionID, @@ -145,11 +150,16 @@ func ToTaskExecutionEvent(input ToTaskExecutionEventInputs) (*event.TaskExecutio ProducerId: input.ClusterID, OccurredAt: occurredAt, TaskType: input.TaskType, - Reason: input.Info.Reason(), + Reasons: reasons, Metadata: metadata, EventVersion: taskExecutionEventVersion, ReportedAt: reportedAt, } + if !flytek8sConfig.GetK8sPluginConfig().SendObjectEvents { + // For back compat with old versions of flyteadmin, populate the deprecated reason field. + // Setting SendObjectEvents to true assumes that flyteadmin is using the new reasons field. + tev.Reason = input.Info.Reason() + } if input.Info.Phase().IsSuccess() && input.OutputWriter != nil { if input.OutputWriter.GetOutputPath() != "" { diff --git a/pkg/controller/workflow/executor_test.go b/pkg/controller/workflow/executor_test.go index 6633b11b4..6de4d297c 100644 --- a/pkg/controller/workflow/executor_test.go +++ b/pkg/controller/workflow/executor_test.go @@ -9,58 +9,52 @@ import ( "testing" "time" - "k8s.io/apimachinery/pkg/util/rand" - - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" - - "github.com/flyteorg/flyteidl/clients/go/coreutils" - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" - - "github.com/flyteorg/flytestdlib/contextutils" - "github.com/flyteorg/flytestdlib/promutils/labeled" + "github.com/golang/protobuf/proto" + "github.com/golang/protobuf/ptypes" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" - + "k8s.io/apimachinery/pkg/util/rand" "k8s.io/apimachinery/pkg/util/sets" + k8sfake "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/tools/record" + "github.com/flyteorg/flyteidl/clients/go/coreutils" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery" pluginCore "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/core" "github.com/flyteorg/flyteplugins/go/tasks/pluginmachinery/ioutils" - "github.com/flyteorg/flytestdlib/logger" - "github.com/flyteorg/flytepropeller/events" eventsErr "github.com/flyteorg/flytepropeller/events/errors" eventMocks "github.com/flyteorg/flytepropeller/events/mocks" - mocks2 "github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks" + "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" + "github.com/flyteorg/flytepropeller/pkg/controller/config" + executorMocks "github.com/flyteorg/flytepropeller/pkg/controller/executors/mocks" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/catalog" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/factory" + gateMocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/gate/mocks" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/handler" nodemocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/interfaces/mocks" + recoveryMocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery/mocks" + "github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan" "github.com/flyteorg/flytepropeller/pkg/controller/nodes/task/fakeplugins" - wfErrors "github.com/flyteorg/flytepropeller/pkg/controller/workflow/errors" - - "github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core" + "github.com/flyteorg/flytestdlib/contextutils" + "github.com/flyteorg/flytestdlib/logger" "github.com/flyteorg/flytestdlib/promutils" + "github.com/flyteorg/flytestdlib/promutils/labeled" "github.com/flyteorg/flytestdlib/storage" "github.com/flyteorg/flytestdlib/yamlutils" - "github.com/golang/protobuf/proto" - "github.com/golang/protobuf/ptypes" - "github.com/stretchr/testify/assert" - "k8s.io/client-go/tools/record" - - "github.com/flyteorg/flytepropeller/pkg/apis/flyteworkflow/v1alpha1" - "github.com/flyteorg/flytepropeller/pkg/controller/config" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes" - gatemocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/gate/mocks" - recoveryMocks "github.com/flyteorg/flytepropeller/pkg/controller/nodes/recovery/mocks" - "github.com/flyteorg/flytepropeller/pkg/controller/nodes/subworkflow/launchplan" ) var ( testScope = promutils.NewScope("test_wfexec") - fakeKubeClient = mocks2.NewFakeKubeClient() - signalClient = &gatemocks.SignalServiceClient{} + fakeKubeClient = executorMocks.NewFakeKubeClient() + mockClientset = k8sfake.NewSimpleClientset() + signalClient = &gateMocks.SignalServiceClient{} ) const ( @@ -248,7 +242,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Error(t *testing.T) { recoveryClient := &recoveryMocks.Client{} adminClient := launchplan.NewFailFastLaunchPlanExecutor() - handlerFactory, err := factory.NewHandlerFactory(ctx, adminClient, adminClient, fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, promutils.NewTestScope()) + handlerFactory, err := factory.NewHandlerFactory(ctx, adminClient, adminClient, fakeKubeClient, mockClientset, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, promutils.NewTestScope()) assert.NoError(t, err) nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient, @@ -331,7 +325,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow(t *testing.T) { recoveryClient := &recoveryMocks.Client{} adminClient := launchplan.NewFailFastLaunchPlanExecutor() - handlerFactory, err := factory.NewHandlerFactory(ctx, adminClient, adminClient, fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, promutils.NewTestScope()) + handlerFactory, err := factory.NewHandlerFactory(ctx, adminClient, adminClient, fakeKubeClient, mockClientset, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, promutils.NewTestScope()) assert.NoError(t, err) nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient, @@ -607,7 +601,7 @@ func TestWorkflowExecutor_HandleFlyteWorkflow_Events(t *testing.T) { adminClient := launchplan.NewFailFastLaunchPlanExecutor() recoveryClient := &recoveryMocks.Client{} - handlerFactory, err := factory.NewHandlerFactory(ctx, adminClient, adminClient, fakeKubeClient, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, promutils.NewTestScope()) + handlerFactory, err := factory.NewHandlerFactory(ctx, adminClient, adminClient, fakeKubeClient, mockClientset, catalogClient, recoveryClient, eventConfig, testClusterID, signalClient, promutils.NewTestScope()) assert.NoError(t, err) nodeExec, err := nodes.NewExecutor(ctx, config.GetConfig().NodeConfig, store, enqueueWorkflow, eventSink, adminClient, adminClient,