Skip to content

Commit

Permalink
Trigger all syncers to sync when zones or subnets changed.
Browse files Browse the repository at this point in the history
* When the zones or subnets of a cluster change, all syncers react to this
  change by either creating additional NEGs in the new zone/subnet, or marking
  NEGs as Inactive/To-be-deleted.
  • Loading branch information
sawsa307 committed Oct 1, 2024
1 parent e9f83bd commit 3b273f4
Show file tree
Hide file tree
Showing 9 changed files with 247 additions and 33 deletions.
2 changes: 1 addition & 1 deletion pkg/apis/svcneg/v1beta1/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions pkg/ingparams/client/informers/externalversions/generic.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

129 changes: 106 additions & 23 deletions pkg/neg/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"time"

nodetopologyv1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/nodetopology/v1"
apiv1 "k8s.io/api/core/v1"
discovery "k8s.io/api/discovery/v1"
v1 "k8s.io/api/networking/v1"
Expand Down Expand Up @@ -85,6 +86,9 @@ type Controller struct {
endpointQueue workqueue.RateLimitingInterface
// nodeQueue takes node name as work item.
nodeQueue workqueue.RateLimitingInterface
// nodeTopologyQueue acts as an intermediate queue to trigger sync on all
// syncers on Node Topology resource updates.
nodeTopologyQueue workqueue.RateLimitingInterface

// syncTracker tracks the latest time that service and endpoint changes are processed
syncTracker utils.TimeTracker
Expand All @@ -104,6 +108,10 @@ type Controller struct {
// gce-regional-external ingresses
enableIngressRegionalExternal bool

// enableMultiSubnetClusterPhase1 indicates whether NEG controller should create
// additional NEGs in the non-default subnets.
enableMultiSubnetClusterPhase1 bool

// runL4ForNetLB indicates if the controller can create NEGs for L4 NetLB services.
runL4ForNetLB bool

Expand Down Expand Up @@ -217,30 +225,36 @@ func NewController(
if gkeNetworkParamSetInformer != nil {
gkeNetworkParamSetIndexer = gkeNetworkParamSetInformer.GetIndexer()
}
enableMultiSubnetClusterPhase1 := flags.F.EnableMultiSubnetClusterPhase1

negController := &Controller{
client: kubeClient,
manager: manager,
gcPeriod: gcPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
namer: namer,
l4Namer: l4Namer,
defaultBackendService: defaultBackendService,
hasSynced: hasSynced,
ingressLister: ingressInformer.GetIndexer(),
serviceLister: serviceInformer.GetIndexer(),
networkResolver: network.NewNetworksResolver(networkIndexer, gkeNetworkParamSetIndexer, cloud, enableMultiNetworking, logger),
serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_service_queue"),
endpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_endpoint_queue"),
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_node_queue"),
syncTracker: utils.NewTimeTracker(),
reflector: reflector,
syncerMetrics: syncerMetrics,
runL4ForILB: runL4Controller,
enableIngressRegionalExternal: enableIngressRegionalExternal,
runL4ForNetLB: runL4ForNetLB,
stopCh: stopCh,
logger: logger,
client: kubeClient,
manager: manager,
gcPeriod: gcPeriod,
recorder: recorder,
zoneGetter: zoneGetter,
namer: namer,
l4Namer: l4Namer,
defaultBackendService: defaultBackendService,
hasSynced: hasSynced,
ingressLister: ingressInformer.GetIndexer(),
serviceLister: serviceInformer.GetIndexer(),
networkResolver: network.NewNetworksResolver(networkIndexer, gkeNetworkParamSetIndexer, cloud, enableMultiNetworking, logger),
serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_service_queue"),
endpointQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_endpoint_queue"),
nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_node_queue"),
syncTracker: utils.NewTimeTracker(),
reflector: reflector,
syncerMetrics: syncerMetrics,
runL4ForILB: runL4Controller,
enableIngressRegionalExternal: enableIngressRegionalExternal,
enableMultiSubnetClusterPhase1: enableMultiSubnetClusterPhase1,
runL4ForNetLB: runL4ForNetLB,
stopCh: stopCh,
logger: logger,
}
if enableMultiSubnetClusterPhase1 {
negController.nodeTopologyQueue = workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "neg_node_topology_queue")
}

ingressInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
Expand Down Expand Up @@ -328,6 +342,31 @@ func NewController(
}
},
})
if enableMultiSubnetClusterPhase1 {
nodeTopologyInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
crd := obj.(*nodetopologyv1.NodeTopology)
negController.enqueueNodeTopology(crd)
},
UpdateFunc: func(old, cur interface{}) {
oldCrd := old.(*nodetopologyv1.NodeTopology)
currentCrd := cur.(*nodetopologyv1.NodeTopology)

var zoneChanged, subnetChanged bool
if isZoneChanged(oldCrd.Status.Zones, currentCrd.Status.Zones) {
logger.Info("Zones in Node Topology CR have changed", "oldZones", oldCrd.Status.Zones, "currentZones", currentCrd.Status.Zones)
zoneChanged = true
}
if isSubnetChanged(oldCrd.Status.Subnets, currentCrd.Status.Subnets) {
logger.Info("Subnets in Node Topology CR have changed", "oldSubnets", oldCrd.Status.Subnets, "currentSubnets", currentCrd.Status.Subnets)
subnetChanged = true
}
if zoneChanged || subnetChanged {
negController.enqueueNodeTopology(currentCrd)
}
},
})
}

if enableAsm {
negController.enableASM = enableAsm
Expand All @@ -352,6 +391,9 @@ func (c *Controller) Run() {
go wait.Until(c.serviceWorker, time.Second, c.stopCh)
go wait.Until(c.endpointWorker, time.Second, c.stopCh)
go wait.Until(c.nodeWorker, time.Second, c.stopCh)
if c.enableMultiSubnetClusterPhase1 {
go wait.Until(c.nodeTopologyWorker, time.Second, c.stopCh)
}
go func() {
// Wait for gcPeriod to run the first GC
// This is to make sure that all services are fully processed before running GC.
Expand Down Expand Up @@ -381,6 +423,9 @@ func (c *Controller) stop() {
c.serviceQueue.ShutDown()
c.endpointQueue.ShutDown()
c.nodeQueue.ShutDown()
if c.enableMultiSubnetClusterPhase1 {
c.nodeTopologyQueue.ShutDown()
}
c.manager.ShutDown()
}

Expand Down Expand Up @@ -536,6 +581,33 @@ func (c *Controller) processService(key string) error {
return c.syncNegStatusAnnotation(namespace, name, make(negtypes.PortInfoMap))
}

// nodeTopologyWorker drains nodeTopologyQueue, triggering a sync on all
// syncers for each dequeued item, until the queue is shut down.
func (c *Controller) nodeTopologyWorker() {
	for {
		key, quit := c.nodeTopologyQueue.Get()
		if quit {
			// The queue has been shut down (controller is stopping).
			// Return so the worker stops instead of busy-looping on
			// Get() against a closed queue. (The previous closure-based
			// form only returned from the inner func, so the enclosing
			// for loop kept spinning after shutdown.)
			return
		}
		c.processNodeTopology()
		// Node Topology CR is a cluster-wide resource, so the key will
		// always be the same.
		// Done() ensures that if the item is updated while it is being
		// processed, it will be re-added to the queue for re-processing,
		// so we won't miss any updates.
		c.nodeTopologyQueue.Done(key)
	}
}

// processNodeTopology signals all syncers to sync in response to a Node
// Topology change, and records when that signal was issued.
func (c *Controller) processNodeTopology() {
	// Track the processing time and export it via the last-sync metric,
	// even if signaling the syncers panics.
	defer func() {
		metrics.LastSyncTimestamp.Set(float64(c.syncTracker.Track().UTC().UnixNano()))
	}()
	c.manager.SyncAllSyncers()
}

// mergeIngressPortInfo merges Ingress PortInfo into portInfoMap if the service has Enable Ingress annotation.
func (c *Controller) mergeIngressPortInfo(service *apiv1.Service, name types.NamespacedName, portInfoMap negtypes.PortInfoMap, networkInfo *network.NetworkInfo) error {
negAnnotation, foundNEGAnnotation, err := annotations.FromService(service).NEGAnnotation()
Expand Down Expand Up @@ -848,6 +920,17 @@ func (c *Controller) enqueueIngressServices(ing *v1.Ingress) {
}
}

// enqueueNodeTopology derives a workqueue key for the given Node Topology
// object and adds it to nodeTopologyQueue for the worker to process.
func (c *Controller) enqueueNodeTopology(obj interface{}) {
	topologyKey, keyErr := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
	if keyErr != nil {
		// Without a key we cannot enqueue; record the failure and drop the event.
		c.logger.Error(keyErr, "Failed to generate Node Topology key")
		metrics.PublishNegControllerErrorCountMetrics(keyErr, true)
		return
	}
	c.logger.V(3).Info("Adding NodeTopology to nodeTopologyQueue for processing", "nodeTopology", topologyKey)
	c.nodeTopologyQueue.Add(topologyKey)
}

func (c *Controller) gc() {
if err := c.manager.GC(); err != nil {
c.logger.Error(err, "NEG controller garbage collection failed")
Expand Down
14 changes: 14 additions & 0 deletions pkg/neg/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,20 @@ func (manager *syncerManager) SyncNodes() {
}
}

// SyncAllSyncers signals every registered syncer to sync, skipping syncers
// that have already been stopped.
func (manager *syncerManager) SyncAllSyncers() {
	manager.mu.Lock()
	defer manager.mu.Unlock()

	for syncerKey, s := range manager.syncerMap {
		if !s.IsStopped() {
			s.Sync()
			continue
		}
		manager.logger.V(1).Info("SyncAllSyncers: Syncer is already stopped; not syncing.", "negSyncerKey", syncerKey.String())
	}
}

// updateZoneMap updates the existingZoneMap with the latest zones and returns
// true if the zones have changed. The caller must obtain mu mutex of the
// manager before calling this function since it modifies the passed
Expand Down
2 changes: 2 additions & 0 deletions pkg/neg/types/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ type NegSyncerManager interface {
GC() error
// ShutDown shuts down the manager
ShutDown()
// SyncAllSyncers signals all syncers to sync. This call is asynchronous.
SyncAllSyncers()
}

type NetworkEndpointsCalculator interface {
Expand Down
19 changes: 19 additions & 0 deletions pkg/neg/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ package neg
import (
"fmt"

nodetopologyv1 "github.com/GoogleCloudPlatform/gke-networking-api/apis/nodetopology/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/ingress-gce/pkg/annotations"
"k8s.io/ingress-gce/pkg/neg/types"
)
Expand Down Expand Up @@ -58,3 +60,20 @@ func contains(ss []string, target string) bool {
}
return false
}

// isZoneChanged reports whether oldZones and newZones differ as sets:
// ordering and duplicates are ignored.
func isZoneChanged(oldZones, newZones []string) bool {
	toSet := func(zones []string) map[string]bool {
		set := make(map[string]bool, len(zones))
		for _, zone := range zones {
			set[zone] = true
		}
		return set
	}
	oldSet, newSet := toSet(oldZones), toSet(newZones)
	if len(oldSet) != len(newSet) {
		return true
	}
	for zone := range oldSet {
		if !newSet[zone] {
			return true
		}
	}
	return false
}

// isSubnetChanged reports whether the sets of subnet paths in oldSubnets and
// newSubnets differ; ordering and duplicate entries are ignored.
func isSubnetChanged(oldSubnets, newSubnets []nodetopologyv1.SubnetConfig) bool {
	// pathSet collects the SubnetPath of every subnet config into a string set.
	pathSet := func(subnets []nodetopologyv1.SubnetConfig) sets.String {
		paths := sets.NewString()
		for _, subnet := range subnets {
			paths.Insert(subnet.SubnetPath)
		}
		return paths
	}
	return !pathSet(oldSubnets).Equal(pathSet(newSubnets))
}
Loading

0 comments on commit 3b273f4

Please sign in to comment.