fix nodes informer not updated
alexei-led committed May 22, 2023
1 parent f61cb18 commit ad6d056
Showing 4 changed files with 97 additions and 64 deletions.
7 changes: 5 additions & 2 deletions cmd/main.go
@@ -35,13 +35,16 @@ var (
 func runController(ctx context.Context, cluster string, log *logrus.Entry, clientset *kubernetes.Clientset, uploader firehose.Uploader) error {
     // load nodes
     nodesInformer := controller.NewNodesInformer()
-    loaded := nodesInformer.Load(ctx, cluster, clientset)
+    loaded, err := nodesInformer.Load(ctx, log, cluster, clientset)
+    if err != nil {
+        return errors.Wrap(err, "loading nodes")
+    }
     // wait for nodes to be loaded
     <-loaded
 
     // create controller and run it
     scanner := controller.New(log, clientset, uploader, nodesInformer)
-    err := scanner.Run(ctx)
+    err = scanner.Run(ctx)
     if err != nil {
         return errors.Wrap(err, "running scanner controller")
     }
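The new Load signature surfaces setup failures to the caller instead of panicking inside the informer. A minimal caller sketch (a hypothetical helper, assuming only the signatures shown above) that additionally guards the wait on the readiness channel with the context, so startup cannot block forever if the informer never syncs:

package agent

import (
    "context"

    "github.com/doitintl/eks-lens-agent/internal/controller"
    "github.com/pkg/errors"
    "github.com/sirupsen/logrus"
    "k8s.io/client-go/kubernetes"
)

// waitForNodes is a hypothetical helper: it loads the nodes informer and
// blocks until its first sync, but gives up if the context is cancelled.
func waitForNodes(ctx context.Context, log *logrus.Entry, cluster string, clientset *kubernetes.Clientset, informer controller.NodesInformer) error {
    loaded, err := informer.Load(ctx, log, cluster, clientset)
    if err != nil {
        return errors.Wrap(err, "loading nodes")
    }
    select {
    case <-loaded:
        return nil // nodes map is ready
    case <-ctx.Done():
        return errors.Wrap(ctx.Err(), "waiting for nodes informer")
    }
}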
79 changes: 40 additions & 39 deletions internal/controller/controller.go
@@ -17,9 +17,10 @@ import (
 )
 
 const (
-    syncPeriod      = 15 * time.Minute
-    syncPeriodDebug = 5 * time.Minute
-    developModeKey  contextKey = "develop-mode"
+    podCacheSyncPeriod            = 5 * time.Minute
+    syncPeriod                    = 15 * time.Minute
+    syncPeriodDebug               = 5 * time.Minute
+    developModeKey     contextKey = "develop-mode"
 )
 
 type contextKey string
@@ -64,7 +65,7 @@ func (s *scanner) DeletePod(obj interface{}) {
     // get the node info from the cache
     node, ok := s.nodeInformer.GetNode(pod.Spec.NodeName)
     if !ok {
-        s.log.Warnf("getting node %s from cache", pod.Spec.NodeName)
+        s.log.Warnf("failed to get node %s from cache", pod.Spec.NodeName)
     }
     // convert PodInfo to usage record
     now := time.Now()
@@ -89,7 +90,7 @@ func (s *scanner) Run(ctx context.Context) error {
             DisableChunking: true,
         },
         &v1.Pod{},
-        syncPeriod,
+        podCacheSyncPeriod,
         cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
     )
@@ -116,44 +117,44 @@ func (s *scanner) Run(ctx context.Context) error {
         tick = syncPeriodDebug
     }
 
+    // upload function
+    upload := func() {
+        // get the list of pods from the cache
+        pods := podInformer.GetStore().List()
+        // convert PodInfo to usage record
+        now := time.Now()
+        beginTime := now.Add(-syncPeriod)
+        records := make([]*usage.PodInfo, 0, len(pods))
+        for _, obj := range pods {
+            pod := obj.(*v1.Pod)
+            // get the node info from the cache
+            node, ok := s.nodeInformer.GetNode(pod.Spec.NodeName)
+            if !ok {
+                s.log.Warnf("getting node %s from cache", pod.Spec.NodeName)
+            }
+            record := usage.GetPodInfo(s.log, pod, beginTime, now, node)
+            records = append(records, record)
+        }
+        // add deleted pods and clear the list if any
+        if len(s.deletedPods) > 0 {
+            s.log.WithField("count", len(s.deletedPods)).Debug("adding deleted pods to the pod records")
+            records = append(records, s.deletedPods...)
+            s.deletedPods = make([]*usage.PodInfo, 0)
+        }
+        // upload the records to EKS Lens
+        s.log.WithField("count", len(records)).Debug("uploading pod records to EKS Lens")
+        err = s.uploader.Upload(ctx, records)
+        if err != nil {
+            s.log.WithError(err).Error("uploading pods records to EKS Lens")
+        }
+    }
+    // upload first time
+    upload()
+
     // get pod list from the cache every "syncPeriod/DevelopMode" minutes
     ticker := time.NewTicker(tick)
     defer ticker.Stop()
     for {
-        upload := func() {
-            // get the list of pods from the cache
-            pods := podInformer.GetStore().List()
-            // convert PodInfo to usage record
-            now := time.Now()
-            beginTime := now.Add(-syncPeriod)
-            records := make([]*usage.PodInfo, 0, len(pods))
-            for _, obj := range pods {
-                pod := obj.(*v1.Pod)
-                // get the node info from the cache
-                node, ok := s.nodeInformer.GetNode(pod.Spec.NodeName)
-                if !ok {
-                    s.log.Warnf("getting node %s from cache", pod.Spec.NodeName)
-                }
-                record := usage.GetPodInfo(s.log, pod, beginTime, now, node)
-                records = append(records, record)
-            }
-            // add deleted pods and clear the list if any
-            if len(s.deletedPods) > 0 {
-                s.log.WithField("count", len(s.deletedPods)).Debug("adding deleted pods to the pod records")
-                records = append(records, s.deletedPods...)
-                s.deletedPods = make([]*usage.PodInfo, 0)
-            }
-            // upload the records to EKS Lens
-            s.log.WithField("count", len(records)).Debug("uploading pod records to EKS Lens")
-            err = s.uploader.Upload(ctx, records)
-            if err != nil {
-                s.log.WithError(err).Error("uploading pods records to EKS Lens")
-            }
-        }
-
-        // upload first time
-        upload()
-
         select {
         case <-ctx.Done():
             return nil
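The core change above hoists the upload closure out of the for loop, so it is defined once and the initial upload runs exactly once before the ticker starts, instead of being rebuilt and re-run at the top of every iteration. The general shape of the fixed loop, as a standalone sketch (names are illustrative, not from the repository):

package main

import (
    "context"
    "fmt"
    "time"
)

// runPeriodic runs work once up front, then on every tick until ctx is done.
func runPeriodic(ctx context.Context, interval time.Duration, work func()) {
    work() // initial run happens exactly once, outside the loop

    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            work()
        }
    }
}

func main() {
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    defer cancel()
    runPeriodic(ctx, 300*time.Millisecond, func() { fmt.Println("upload") })
}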
72 changes: 50 additions & 22 deletions internal/controller/nodes.go
@@ -2,11 +2,12 @@ package controller

 import (
     "context"
-    "log"
     "sync"
     "time"
 
     "github.com/doitintl/eks-lens-agent/internal/usage"
+    "github.com/pkg/errors"
+    "github.com/sirupsen/logrus"
     v1 "k8s.io/api/core/v1"
     metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
     "k8s.io/apimachinery/pkg/runtime"
@@ -15,8 +16,18 @@ import (
"k8s.io/client-go/tools/cache"
)

const (
delayDelete = 1 * time.Minute
nodeCacheSyncPeriod = 5 * time.Minute
)

var (
// ErrCacheSync is returned when the cache fails to sync
ErrCacheSync = errors.New("failed to sync cache")
)

type NodesInformer interface {
Load(ctx context.Context, cluster string, clientset kubernetes.Interface) chan bool
Load(ctx context.Context, log *logrus.Entry, cluster string, clientset kubernetes.Interface) (chan bool, error)
GetNode(nodeName string) (*usage.NodeInfo, bool)
}

@@ -39,9 +50,11 @@ func (n *NodesMap) GetNode(nodeName string) (*usage.NodeInfo, bool) {
 }
 
 // Load loads the NodesMap with the current nodes in the cluster and returns a channel to signal when the map is loaded
-func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernetes.Interface) chan bool {
+//
+//nolint:funlen
+func (n *NodesMap) Load(ctx context.Context, log *logrus.Entry, cluster string, clientset kubernetes.Interface) (chan bool, error) {
     // Create a new Node informer
-    nodeInformer := cache.NewSharedIndexInformer(
+    nodeInformer := cache.NewSharedInformer(
         &cache.ListWatch{
             ListFunc: func(options metav1.ListOptions) (object runtime.Object, err error) {
                 return clientset.CoreV1().Nodes().List(context.Background(), options) //nolint:wrapcheck
@@ -51,20 +64,19 @@ func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernetes.Interface) chan bool {
             },
         },
         &v1.Node{},
-        0, // resyncPeriod
-        cache.Indexers{},
+        nodeCacheSyncPeriod,
     )
 
     // create stopper channel
     stopper := make(chan struct{})
-    defer close(stopper)
 
     // Start the Node informer
     go nodeInformer.Run(stopper)
 
     // Wait for the Node informer to sync
+    log.Debug("waiting for node informer to sync")
     if !cache.WaitForCacheSync(make(chan struct{}), nodeInformer.HasSynced) {
-        log.Panicf("Error syncing node informer cache")
+        return nil, ErrCacheSync
     }
 
     // Process Node add and delete events
@@ -74,6 +86,7 @@ func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernetes.Interface) chan bool {
             nodeInfo := usage.NodeInfoFromNode(cluster, node)
             n.mu.Lock()
             defer n.mu.Unlock()
+            log.WithField("node", node.Name).Debug("adding node to map")
             n.data[node.Name] = nodeInfo
         },
         DeleteFunc: func(obj interface{}) {
@@ -84,11 +97,18 @@ func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernetes.Interface) chan bool {
             }
             n.mu.Lock()
             defer n.mu.Unlock()
-            delete(n.data, node.Name)
+            // non-blocking delete from the map after delayDelete (one minute)
+            go func() {
+                time.Sleep(delayDelete)
+                n.mu.Lock()
+                defer n.mu.Unlock()
+                log.WithField("node", node.Name).Debug("deleting node from map after delay")
+                delete(n.data, node.Name)
+            }()
         },
     })
     if err != nil {
-        log.Panicf("Error adding event handler to node informer: %v", err)
+        return nil, errors.Wrap(err, "failed to add event handler to node informer")
     }
 
     // Create a channel to signal when the map is loaded
@@ -97,10 +117,6 @@ func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernetes.Interface) chan bool {
     // Update the Node map periodically
     previousResourceVersion := "0" // the resource version of the nodes at the last sync
     go func() {
-        ticker := time.NewTicker(syncPeriod)
-        // abort if the context is cancelled
-        defer ticker.Stop()
-
         // refresh the nodes map and send to the loaded channel (if not already sent)
         refresh := func() {
             n.mu.Lock()
@@ -112,9 +128,14 @@ func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernetes.Interface) chan bool {
             return
         }
 
-        // If the nodes have been updated, update the nodes map
+        // clear the nodes map
+        log.Debug("refreshing nodes map with latest nodes")
+        n.data = make(map[string]usage.NodeInfo)
+
+        // update the nodes map
         for _, obj := range nodeInformer.GetStore().List() {
             node := obj.(*v1.Node)
+            log.WithField("node", node.Name).Debug("adding node to map")
             n.data[node.Name] = usage.NodeInfoFromNode(cluster, node)
         }

@@ -133,14 +154,21 @@ func (n *NodesMap) Load(ctx context.Context, cluster string, clientset kubernetes.Interface) chan bool {
         // refresh the nodes map once before starting the ticker
         refresh()
 
-        // loop until the context is cancelled
-        select {
-        case <-ctx.Done():
-            return
-        case <-ticker.C:
-            refresh()
-        }
+        // refresh the nodes map periodically
+        ticker := time.NewTicker(nodeCacheSyncPeriod)
+        defer ticker.Stop()
+        for {
+            // loop until the context is cancelled
+            select {
+            case <-ctx.Done():
+                // context is cancelled, close the stopper channel to stop the informer
+                close(stopper)
+                return
+            case <-ticker.C:
+                refresh()
+            }
+        }
     }()
 
-    return loaded
+    return loaded, nil
 }
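Two patterns in nodes.go carry the fix: the map is rebuilt from the informer store on every refresh, and a deleted node lingers for delayDelete so that pod events arriving shortly after the node is gone can still resolve its metadata. The delayed-delete pattern in isolation, as a sketch with hypothetical names (string values stand in for *usage.NodeInfo):

package nodecache

import (
    "sync"
    "time"
)

type nodeCache struct {
    mu   sync.RWMutex
    data map[string]string
}

// deleteAfter removes a key only once the grace period has elapsed,
// so late readers can still resolve recently deleted nodes.
func (c *nodeCache) deleteAfter(name string, grace time.Duration) {
    go func() {
        time.Sleep(grace)
        c.mu.Lock()
        defer c.mu.Unlock()
        delete(c.data, name)
    }()
}

// get returns the cached value, if it is still (or again) present.
func (c *nodeCache) get(name string) (string, bool) {
    c.mu.RLock()
    defer c.mu.RUnlock()
    v, ok := c.data[name]
    return v, ok
}

Note that neither this sketch nor the commit cancels the pending delete if a node re-registers within the window; the periodic refresh repopulates the map on the next sync, which bounds the staleness.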
3 changes: 2 additions & 1 deletion internal/controller/nodes_test.go
@@ -6,6 +6,7 @@ import (
"time"

"github.com/doitintl/eks-lens-agent/internal/usage"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -78,7 +79,7 @@ func TestNodesInformerLoad(t *testing.T) {
     // Load the nodes using the fake clientset
     ctx, cancel := context.WithTimeout(context.Background(), 50*time.Second)
     defer cancel()
-    loaded := nodesInformer.Load(ctx, "test-cluster", clientset)
+    loaded, _ := nodesInformer.Load(ctx, logrus.NewEntry(logrus.New()), "test-cluster", clientset)
 
     // Check if the nodes are loaded
     select {
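For reference, a compact test sketch driving the new signature end to end with the fake clientset (it assumes NewNodesInformer constructs the NodesMap implementation; the real test body is elided above):

package controller

import (
    "context"
    "testing"
    "time"

    "github.com/sirupsen/logrus"
    "github.com/stretchr/testify/assert"
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/client-go/kubernetes/fake"
)

func TestLoadSignalsLoaded(t *testing.T) {
    node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node-1"}}
    clientset := fake.NewSimpleClientset(node)

    ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
    defer cancel()

    informer := NewNodesInformer()
    loaded, err := informer.Load(ctx, logrus.NewEntry(logrus.New()), "test-cluster", clientset)
    if err != nil {
        t.Fatalf("unexpected error loading nodes: %v", err)
    }

    select {
    case <-loaded:
        // first sync observed
    case <-ctx.Done():
        t.Fatal("timed out waiting for the nodes informer to load")
    }

    _, ok := informer.GetNode("node-1")
    assert.True(t, ok, "node-1 should be in the nodes map")
}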
