From 3b273f41d623074572c74d22161bcd1bdb50b7d2 Mon Sep 17 00:00:00 2001
From: David Cheung
Date: Tue, 17 Sep 2024 03:45:41 +0000
Subject: [PATCH] Trigger all syncers to sync when zones or subnets change.

* When the zones or subnets of a cluster change, all syncers react to this
  change by either creating additional NEGs in the new zone/subnet or
  marking NEGs as Inactive/To-be-deleted.
---
 .../svcneg/v1beta1/zz_generated.openapi.go    |   2 +-
 .../informers/externalversions/generic.go     |   4 +-
 .../ingparams/v1beta1/gcpingressparams.go     |   6 +-
 .../ingparams/v1beta1/interface.go            |   8 +-
 pkg/neg/controller.go                         | 129 ++++++++++++++----
 pkg/neg/manager.go                            |  14 ++
 pkg/neg/types/interfaces.go                   |   2 +
 pkg/neg/utils.go                              |  19 +++
 pkg/neg/utils_test.go                         |  96 +++++++++++++
 9 files changed, 247 insertions(+), 33 deletions(-)

diff --git a/pkg/apis/svcneg/v1beta1/zz_generated.openapi.go b/pkg/apis/svcneg/v1beta1/zz_generated.openapi.go
index d4c5b8c82a..9184adfbd0 100644
--- a/pkg/apis/svcneg/v1beta1/zz_generated.openapi.go
+++ b/pkg/apis/svcneg/v1beta1/zz_generated.openapi.go
@@ -137,7 +137,7 @@ func schema_pkg_apis_svcneg_v1beta1_NegObjectReference(ref common.ReferenceCallb
 			},
 			"state": {
 				SchemaProps: spec.SchemaProps{
-					Description: "Current condition of this network endpoint group.",
+					Description: "Current condition of this network endpoint group. If state is empty, it should be considered the ACTIVE state.",
 					Type:        []string{"string"},
 					Format:      "",
 				},
diff --git a/pkg/ingparams/client/informers/externalversions/generic.go b/pkg/ingparams/client/informers/externalversions/generic.go
index 130ba7137d..85b1a20d5d 100644
--- a/pkg/ingparams/client/informers/externalversions/generic.go
+++ b/pkg/ingparams/client/informers/externalversions/generic.go
@@ -53,8 +53,8 @@ func (f *genericInformer) Lister() cache.GenericLister {
 func (f *sharedInformerFactory) ForResource(resource schema.GroupVersionResource) (GenericInformer, error) {
 	switch resource {
 	// Group=networking.gke.io, Version=v1beta1
-	case v1beta1.SchemeGroupVersion.WithResource("gcpingressparams"):
-		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().GCPIngressParams().Informer()}, nil
+	case v1beta1.SchemeGroupVersion.WithResource("gcpingressparamses"):
+		return &genericInformer{resource: resource.GroupResource(), informer: f.Networking().V1beta1().GCPIngressParamses().Informer()}, nil
 	}

diff --git a/pkg/ingparams/client/informers/externalversions/ingparams/v1beta1/gcpingressparams.go b/pkg/ingparams/client/informers/externalversions/ingparams/v1beta1/gcpingressparams.go
index c1506ca4dd..ef3d37b782 100644
--- a/pkg/ingparams/client/informers/externalversions/ingparams/v1beta1/gcpingressparams.go
+++ b/pkg/ingparams/client/informers/externalversions/ingparams/v1beta1/gcpingressparams.go
@@ -33,7 +33,7 @@ import (
 )

 // GCPIngressParamsInformer provides access to a shared informer and lister for
-// GCPIngressParams.
+// GCPIngressParamses.
type GCPIngressParamsInformer interface { Informer() cache.SharedIndexInformer Lister() v1beta1.GCPIngressParamsLister @@ -61,13 +61,13 @@ func NewFilteredGCPIngressParamsInformer(client versioned.Interface, resyncPerio if tweakListOptions != nil { tweakListOptions(&options) } - return client.NetworkingV1beta1().GCPIngressParams().List(context.TODO(), options) + return client.NetworkingV1beta1().GCPIngressParamses().List(context.TODO(), options) }, WatchFunc: func(options v1.ListOptions) (watch.Interface, error) { if tweakListOptions != nil { tweakListOptions(&options) } - return client.NetworkingV1beta1().GCPIngressParams().Watch(context.TODO(), options) + return client.NetworkingV1beta1().GCPIngressParamses().Watch(context.TODO(), options) }, }, &ingparamsv1beta1.GCPIngressParams{}, diff --git a/pkg/ingparams/client/informers/externalversions/ingparams/v1beta1/interface.go b/pkg/ingparams/client/informers/externalversions/ingparams/v1beta1/interface.go index bdf5c16000..05f6d5a30d 100644 --- a/pkg/ingparams/client/informers/externalversions/ingparams/v1beta1/interface.go +++ b/pkg/ingparams/client/informers/externalversions/ingparams/v1beta1/interface.go @@ -24,8 +24,8 @@ import ( // Interface provides access to all the informers in this group version. type Interface interface { - // GCPIngressParams returns a GCPIngressParamsInformer. - GCPIngressParams() GCPIngressParamsInformer + // GCPIngressParamses returns a GCPIngressParamsInformer. + GCPIngressParamses() GCPIngressParamsInformer } type version struct { @@ -39,7 +39,7 @@ func New(f internalinterfaces.SharedInformerFactory, namespace string, tweakList return &version{factory: f, namespace: namespace, tweakListOptions: tweakListOptions} } -// GCPIngressParams returns a GCPIngressParamsInformer. -func (v *version) GCPIngressParams() GCPIngressParamsInformer { +// GCPIngressParamses returns a GCPIngressParamsInformer. +func (v *version) GCPIngressParamses() GCPIngressParamsInformer { return &gCPIngressParamsInformer{factory: v.factory, tweakListOptions: v.tweakListOptions} } diff --git a/pkg/neg/controller.go b/pkg/neg/controller.go index 6dfe23b96e..ed936a24ee 100644 --- a/pkg/neg/controller.go +++ b/pkg/neg/controller.go @@ -20,6 +20,7 @@ import ( "fmt" "time" + nodetopologyv1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/nodetopology/v1" apiv1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1" v1 "k8s.io/api/networking/v1" @@ -85,6 +86,9 @@ type Controller struct { endpointQueue workqueue.RateLimitingInterface // nodeQueue takes node name as work item. nodeQueue workqueue.RateLimitingInterface + // nodeTopologyQueue acts as an intermeidate queue to trigger sync on all + // syncers on Node Topology resource updates. + nodeTopologyQueue workqueue.RateLimitingInterface // syncTracker tracks the latest time that service and endpoint changes are processed syncTracker utils.TimeTracker @@ -104,6 +108,10 @@ type Controller struct { // gce-regional-external ingresses enableIngressRegionalExternal bool + // enableMultiSubnetClusterPhase1 indicates whether NEG controller should create + // additional NEGs in the non-default subnets. + enableMultiSubnetClusterPhase1 bool + // runL4ForNetLB indicates if the controller can create NEGs for L4 NetLB services. 
runL4ForNetLB bool @@ -217,30 +225,36 @@ func NewController( if gkeNetworkParamSetInformer != nil { gkeNetworkParamSetIndexer = gkeNetworkParamSetInformer.GetIndexer() } + enableMultiSubnetClusterPhase1 := flags.F.EnableMultiSubnetClusterPhase1 + negController := &Controller{ - client: kubeClient, - manager: manager, - gcPeriod: gcPeriod, - recorder: recorder, - zoneGetter: zoneGetter, - namer: namer, - l4Namer: l4Namer, - defaultBackendService: defaultBackendService, - hasSynced: hasSynced, - ingressLister: ingressInformer.GetIndexer(), - serviceLister: serviceInformer.GetIndexer(), - networkResolver: network.NewNetworksResolver(networkIndexer, gkeNetworkParamSetIndexer, cloud, enableMultiNetworking, logger), - serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_service_queue"), - endpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_endpoint_queue"), - nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_node_queue"), - syncTracker: utils.NewTimeTracker(), - reflector: reflector, - syncerMetrics: syncerMetrics, - runL4ForILB: runL4Controller, - enableIngressRegionalExternal: enableIngressRegionalExternal, - runL4ForNetLB: runL4ForNetLB, - stopCh: stopCh, - logger: logger, + client: kubeClient, + manager: manager, + gcPeriod: gcPeriod, + recorder: recorder, + zoneGetter: zoneGetter, + namer: namer, + l4Namer: l4Namer, + defaultBackendService: defaultBackendService, + hasSynced: hasSynced, + ingressLister: ingressInformer.GetIndexer(), + serviceLister: serviceInformer.GetIndexer(), + networkResolver: network.NewNetworksResolver(networkIndexer, gkeNetworkParamSetIndexer, cloud, enableMultiNetworking, logger), + serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_service_queue"), + endpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_endpoint_queue"), + nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_node_queue"), + syncTracker: utils.NewTimeTracker(), + reflector: reflector, + syncerMetrics: syncerMetrics, + runL4ForILB: runL4Controller, + enableIngressRegionalExternal: enableIngressRegionalExternal, + enableMultiSubnetClusterPhase1: enableMultiSubnetClusterPhase1, + runL4ForNetLB: runL4ForNetLB, + stopCh: stopCh, + logger: logger, + } + if enableMultiSubnetClusterPhase1 { + negController.nodeTopologyQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_node_topology_queue") } ingressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ @@ -328,6 +342,31 @@ func NewController( } }, }) + if enableMultiSubnetClusterPhase1 { + nodeTopologyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + crd := obj.(*nodetopologyv1.NodeTopology) + negController.enqueueNodeTopology(crd) + }, + UpdateFunc: func(old, cur interface{}) { + oldCrd := old.(*nodetopologyv1.NodeTopology) + currentCrd := cur.(*nodetopologyv1.NodeTopology) + + var zoneChanged, subnetChanged bool + if isZoneChanged(oldCrd.Status.Zones, currentCrd.Status.Zones) { + logger.Info("Zones in Node Topology CR have changed", "oldZones", oldCrd.Status.Zones, "currentZones", currentCrd.Status.Zones) + zoneChanged = true + } + if isSubnetChanged(oldCrd.Status.Subnets, currentCrd.Status.Subnets) { + logger.Info("Subnets in Node Topology CR have changed", "oldSubnets", oldCrd.Status.Subnets, 
"currentSubnets", currentCrd.Status.Subnets) + subnetChanged = true + } + if zoneChanged || subnetChanged { + negController.enqueueNodeTopology(currentCrd) + } + }, + }) + } if enableAsm { negController.enableASM = enableAsm @@ -352,6 +391,9 @@ func (c *Controller) Run() { go wait.Until(c.serviceWorker, time.Second, c.stopCh) go wait.Until(c.endpointWorker, time.Second, c.stopCh) go wait.Until(c.nodeWorker, time.Second, c.stopCh) + if c.enableMultiSubnetClusterPhase1 { + go wait.Until(c.nodeTopologyWorker, time.Second, c.stopCh) + } go func() { // Wait for gcPeriod to run the first GC // This is to make sure that all services are fully processed before running GC. @@ -381,6 +423,9 @@ func (c *Controller) stop() { c.serviceQueue.ShutDown() c.endpointQueue.ShutDown() c.nodeQueue.ShutDown() + if c.enableMultiSubnetClusterPhase1 { + c.nodeTopologyQueue.ShutDown() + } c.manager.ShutDown() } @@ -536,6 +581,33 @@ func (c *Controller) processService(key string) error { return c.syncNegStatusAnnotation(namespace, name, make(negtypes.PortInfoMap)) } +func (c *Controller) nodeTopologyWorker() { + for { + func() { + key, quit := c.nodeTopologyQueue.Get() + if quit { + return + } + c.processNodeTopology() + // Node Topology CR is a cluster-wide resource, so the key will + // always be the same. + // Done() ensures that if the item is updated while it is being + // process, it will be re-added to the queue for re-processing, + // so we won't miss any updates. + c.nodeTopologyQueue.Done(key) + }() + } +} + +// processNodeTopology signals all syncers to sync +func (c *Controller) processNodeTopology() { + defer func() { + now := c.syncTracker.Track() + metrics.LastSyncTimestamp.Set(float64(now.UTC().UnixNano())) + }() + c.manager.SyncAllSyncers() +} + // mergeIngressPortInfo merges Ingress PortInfo into portInfoMap if the service has Enable Ingress annotation. func (c *Controller) mergeIngressPortInfo(service *apiv1.Service, name types.NamespacedName, portInfoMap negtypes.PortInfoMap, networkInfo *network.NetworkInfo) error { negAnnotation, foundNEGAnnotation, err := annotations.FromService(service).NEGAnnotation() @@ -848,6 +920,17 @@ func (c *Controller) enqueueIngressServices(ing *v1.Ingress) { } } +func (c *Controller) enqueueNodeTopology(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + c.logger.Error(err, "Failed to generate Node Topology key") + metrics.PublishNegControllerErrorCountMetrics(err, true) + return + } + c.logger.V(3).Info("Adding NodeTopology to nodeTopologyQueue for processing", "nodeTopology", key) + c.nodeTopologyQueue.Add(key) +} + func (c *Controller) gc() { if err := c.manager.GC(); err != nil { c.logger.Error(err, "NEG controller garbage collection failed") diff --git a/pkg/neg/manager.go b/pkg/neg/manager.go index 8f4000443c..a8d95a88ce 100644 --- a/pkg/neg/manager.go +++ b/pkg/neg/manager.go @@ -335,6 +335,20 @@ func (manager *syncerManager) SyncNodes() { } } +// SyncAllSyncers signals all syncers to sync. +func (manager *syncerManager) SyncAllSyncers() { + manager.mu.Lock() + defer manager.mu.Unlock() + + for key, syncer := range manager.syncerMap { + if syncer.IsStopped() { + manager.logger.V(1).Info("SyncAllSyncers: Syncer is already stopped; not syncing.", "negSyncerKey", key.String()) + continue + } + syncer.Sync() + } +} + // updateZoneMap updates the existingZoneMap with the latest zones and returns // true if the zones have changed. 
The caller must obtain mu mutex of the // manager before calling this function since it modifies the passed diff --git a/pkg/neg/types/interfaces.go b/pkg/neg/types/interfaces.go index 03f80b98e8..89576e1eb6 100644 --- a/pkg/neg/types/interfaces.go +++ b/pkg/neg/types/interfaces.go @@ -76,6 +76,8 @@ type NegSyncerManager interface { GC() error // ShutDown shuts down the manager ShutDown() + // SyncAllSyncer signals all syncers to sync. This call is asynchronous. + SyncAllSyncers() } type NetworkEndpointsCalculator interface { diff --git a/pkg/neg/utils.go b/pkg/neg/utils.go index 1905c18d4a..6d854dd5c5 100644 --- a/pkg/neg/utils.go +++ b/pkg/neg/utils.go @@ -19,7 +19,9 @@ package neg import ( "fmt" + nodetopologyv1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/nodetopology/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/ingress-gce/pkg/annotations" "k8s.io/ingress-gce/pkg/neg/types" ) @@ -58,3 +60,20 @@ func contains(ss []string, target string) bool { } return false } + +func isZoneChanged(oldZones, newZones []string) bool { + return !sets.NewString(oldZones...).Equal(sets.NewString(newZones...)) +} + +func isSubnetChanged(oldSubnets, newSubnets []nodetopologyv1.SubnetConfig) bool { + oldSubnetSet := sets.NewString() + newSubnetSet := sets.NewString() + for _, oldSubnet := range oldSubnets { + oldSubnetSet.Insert(oldSubnet.SubnetPath) + } + for _, newSubnet := range newSubnets { + newSubnetSet.Insert(newSubnet.SubnetPath) + } + + return !oldSubnetSet.Equal(newSubnetSet) +} diff --git a/pkg/neg/utils_test.go b/pkg/neg/utils_test.go index d739b4aa46..4b765df831 100644 --- a/pkg/neg/utils_test.go +++ b/pkg/neg/utils_test.go @@ -21,6 +21,7 @@ import ( "reflect" "testing" + nodetopologyv1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/nodetopology/v1" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" utilerrors "k8s.io/apimachinery/pkg/util/errors" @@ -231,3 +232,98 @@ func TestNEGServicePorts(t *testing.T) { }) } } + +func TestIsZoneChanged(t *testing.T) { + testCases := []struct { + desc string + oldZones []string + newZones []string + expectChanged bool + }{ + { + desc: "a zone is added", + oldZones: []string{"zone1"}, + newZones: []string{"zone1", "zone2"}, + expectChanged: true, + }, + { + desc: "a zone is deleted", + oldZones: []string{"zone1", "zone2"}, + newZones: []string{"zone1"}, + expectChanged: true, + }, + { + desc: "zones stay unchaged", + oldZones: []string{"zone1", "zone2"}, + newZones: []string{"zone1", "zone2"}, + expectChanged: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + gotChanged := isZoneChanged(tc.oldZones, tc.newZones) + if gotChanged != tc.expectChanged { + t.Errorf("Got zone changed = %v, expected %v", gotChanged, tc.expectChanged) + } + }) + } +} + +func TestIsSubnetChange(t *testing.T) { + defaultSubnet := "default" + defaultSubnetPath := "projects/my-project/regions/us-central1/subnetworks/default" + + additionalSubnet := "add-subnet" + additionalSubnetPath := "add-subnet" + testCases := []struct { + desc string + oldSubnets []nodetopologyv1.SubnetConfig + newSubnets []nodetopologyv1.SubnetConfig + expectChanged bool + }{ + { + desc: "a subnet is added", + oldSubnets: []nodetopologyv1.SubnetConfig{ + {Name: defaultSubnet, SubnetPath: defaultSubnetPath}, + }, + newSubnets: []nodetopologyv1.SubnetConfig{ + {Name: defaultSubnet, SubnetPath: defaultSubnetPath}, + {Name: additionalSubnet, SubnetPath: additionalSubnetPath}, + }, + 
expectChanged: true,
+		},
+		{
+			desc: "a subnet is deleted",
+			oldSubnets: []nodetopologyv1.SubnetConfig{
+				{Name: defaultSubnet, SubnetPath: defaultSubnetPath},
+				{Name: additionalSubnet, SubnetPath: additionalSubnetPath},
+			},
+			newSubnets: []nodetopologyv1.SubnetConfig{
+				{Name: defaultSubnet, SubnetPath: defaultSubnetPath},
+			},
+			expectChanged: true,
+		},
+		{
+			desc: "subnets stay unchanged",
+			oldSubnets: []nodetopologyv1.SubnetConfig{
+				{Name: defaultSubnet, SubnetPath: defaultSubnetPath},
+				{Name: additionalSubnet, SubnetPath: additionalSubnetPath},
+			},
+			newSubnets: []nodetopologyv1.SubnetConfig{
+				{Name: defaultSubnet, SubnetPath: defaultSubnetPath},
+				{Name: additionalSubnet, SubnetPath: additionalSubnetPath},
+			},
+			expectChanged: false,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.desc, func(t *testing.T) {
+			gotChanged := isSubnetChanged(tc.oldSubnets, tc.newSubnets)
+			if gotChanged != tc.expectChanged {
+				t.Errorf("Got subnet changed = %v, expected %v", gotChanged, tc.expectChanged)
+			}
+		})
+	}
+}
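
As a reference, a minimal sketch (not part of the patch, written as if it lived in package neg) of how the new helpers behave: both isZoneChanged and isSubnetChanged compare sets, so reordering zones or subnets alone is not treated as a change, while any addition or removal is. The zone names and subnet paths below are made-up examples.

package neg

import (
	"fmt"

	nodetopologyv1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/nodetopology/v1"
)

func exampleTopologyComparison() {
	// Same zones, different order: not considered a change.
	fmt.Println(isZoneChanged(
		[]string{"us-central1-a", "us-central1-b"},
		[]string{"us-central1-b", "us-central1-a"},
	)) // false

	// A subnet added to the Node Topology status: considered a change, so the
	// controller enqueues the CR and processNodeTopology syncs all syncers.
	oldSubnets := []nodetopologyv1.SubnetConfig{
		{Name: "default", SubnetPath: "projects/proj/regions/us-central1/subnetworks/default"},
	}
	newSubnets := []nodetopologyv1.SubnetConfig{
		{Name: "default", SubnetPath: "projects/proj/regions/us-central1/subnetworks/default"},
		{Name: "extra", SubnetPath: "projects/proj/regions/us-central1/subnetworks/extra"},
	}
	fmt.Println(isSubnetChanged(oldSubnets, newSubnets)) // true
}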
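
Similarly, the updated description in zz_generated.openapi.go says an empty state on a NegObjectReference should be read as ACTIVE. A minimal sketch of that defaulting rule; the package name, helper name, and the "ACTIVE" literal are illustrative and not API defined by this patch.

package svcnegexample

// effectiveNegState is a hypothetical helper showing the documented defaulting
// rule: an empty NegObjectReference state is treated as ACTIVE.
func effectiveNegState(state string) string {
	if state == "" {
		return "ACTIVE"
	}
	return state
}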