diff --git a/cloud/pkg/edgecontroller/constants/service.go b/cloud/pkg/edgecontroller/constants/service.go index 12c8c30dce1..b35405d868e 100644 --- a/cloud/pkg/edgecontroller/constants/service.go +++ b/cloud/pkg/edgecontroller/constants/service.go @@ -29,4 +29,11 @@ const ( EdgeNodeLabel = "node-role.kubernetes.io/edge" // node ready status Ready = "Ready" + + // default filter configmap name + FilterConfig = "filter-config" + // default filter namespace item name + FilterPodNamespaces = "filterPodNamespaces" + // default filter pod name prefix + FilterPodNamePrefixs = "filterPodNamePrefixs" ) diff --git a/cloud/pkg/edgecontroller/controller/downstream.go b/cloud/pkg/edgecontroller/controller/downstream.go index 2f52e2e46c8..539776fed1e 100644 --- a/cloud/pkg/edgecontroller/controller/downstream.go +++ b/cloud/pkg/edgecontroller/controller/downstream.go @@ -2,6 +2,7 @@ package controller import ( "context" + "time" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -48,6 +49,8 @@ type DownstreamController struct { lc *manager.LocationCache podLister clientgov1.PodLister + + config *v1alpha1.EdgeController } func (dc *DownstreamController) syncPod() { @@ -65,6 +68,9 @@ func (dc *DownstreamController) syncPod() { if !dc.lc.IsEdgeNode(pod.Spec.NodeName) { continue } + if !util.IsEdgePod(dc.config, pod) { + continue + } resource, err := messagelayer.BuildResource(pod.Spec.NodeName, pod.Namespace, model.ResourceTypePod, pod.Name) if err != nil { klog.Warningf("built message resource failed with error: %s", err) @@ -431,6 +437,37 @@ func (dc *DownstreamController) initLocating() error { return nil } +func (dc *DownstreamController) setFilterConfig() { + setFunc := func() { + filterConfig, err := dc.kubeClient.CoreV1().ConfigMaps(v1.NamespaceDefault).Get(context.Background(), constants.FilterConfig, metav1.GetOptions{}) + if err != nil { + klog.Warning("Edgecontroller get filter config failed: %v", err) + return + } + // if configmap has been set, use configmap. otherwise local config will be used + namespaces := filterConfig.Data[constants.FilterPodNamespaces] + if namespaces != "" { + dc.config.FilterPodNamespaces = namespaces + } + namePrefixs := filterConfig.Data[constants.FilterPodNamePrefixs] + if namePrefixs != "" { + dc.config.FilterPodNamePrefixs = namePrefixs + } + } + setFunc() + + ticker := time.NewTicker(time.Minute) + for { + select { + case <-beehiveContext.Done(): + klog.Warning("Stop edgecontroller init filter used pod, secret and configmap config loop") + return + case <-ticker.C: + setFunc() + } + } +} + // NewDownstreamController create a DownstreamController from config func NewDownstreamController(config *v1alpha1.EdgeController, k8sInformerFactory k8sinformers.SharedInformerFactory, keInformerFactory informers.KubeEdgeCustomInformer, crdInformerFactory crdinformers.SharedInformerFactory) (*DownstreamController, error) { @@ -488,10 +525,12 @@ func NewDownstreamController(config *v1alpha1.EdgeController, k8sInformerFactory podLister: podInformer.Lister(), rulesManager: rulesManager, ruleEndpointsManager: ruleEndpointsManager, + config: config, } if err := dc.initLocating(); err != nil { return nil, err } + go dc.setFilterConfig() return dc, nil } diff --git a/cloud/pkg/edgecontroller/util/util.go b/cloud/pkg/edgecontroller/util/util.go new file mode 100644 index 00000000000..3d2cfc1cb89 --- /dev/null +++ b/cloud/pkg/edgecontroller/util/util.go @@ -0,0 +1,56 @@ +package util + +import ( + "strings" + + v1 "k8s.io/api/core/v1" + + "github.com/kubeedge/kubeedge/pkg/apis/componentconfig/cloudcore/v1alpha1" +) + +func IsEdgePod(config *v1alpha1.EdgeController, pod *v1.Pod) bool { + var namespaces []string + var podNamePrefixs []string + + if len(config.FilterPodNamespaces) != 0 { + namespaces = strings.Split(config.FilterPodNamespaces, ",") + } + + if len(config.FilterPodNamePrefixs) != 0 { + podNamePrefixs = strings.Split(config.FilterPodNamePrefixs, ",") + } + + if InArray(namespaces, pod.Namespace) { + return false + } + + if MatchPrefixs(podNamePrefixs, pod.Name) { + return false + } + + return true +} + +func InArray(arr []string, target string) bool { + if len(arr) == 0 { + return false + } + for _, elem := range arr { + if elem == target { + return true + } + } + return false +} + +func MatchPrefixs(arr []string, target string) bool { + if len(arr) == 0 { + return false + } + for _, elem := range arr { + if strings.HasPrefix(target, elem) { + return true + } + } + return false +} diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go index 1ccc625a76a..81d51bd0c7e 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/default.go @@ -40,9 +40,9 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig { BindAddress: "127.0.0.1:9001", EnableProfiling: false, Prometheus: Prometheus{ - Server: "127.0.0.1:9091", + Server: "127.0.0.1:9091", IntervalS: 10, - Job: constants.DefaultJobName, + Job: constants.DefaultJobName, }, }, }, @@ -55,7 +55,7 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig { CloudHub: &CloudHub{ Enable: true, KeepaliveInterval: 30, - NodeLimit: constants.DefaultNodeLimit, // TODO: tune NodeLimit + NodeLimit: constants.DefaultNodeLimit, // TODO: tune NodeLimit TLSCAFile: constants.DefaultCAFile, TLSCAKeyFile: constants.DefaultCAKeyFile, TLSCertFile: constants.DefaultCertFile, @@ -69,7 +69,7 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig { Enable: false, Address: "0.0.0.0", Port: 10001, - MaxIncomingStreams: 10000, // TODO: tune MaxIncomingStreams + MaxIncomingStreams: 10000, // TODO: tune MaxIncomingStreams }, UnixSocket: &CloudHubUnixSocket{ Enable: true, @@ -87,15 +87,17 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig { }, }, EdgeController: &EdgeController{ - Enable: true, - NodeUpdateFrequency: 10, - Buffer: getDefaultEdgeControllerBuffer(constants.DefaultNodeLimit), - Load: getDefaultEdgeControllerLoad(constants.DefaultNodeLimit), + Enable: true, + NodeUpdateFrequency: 10, + Buffer: getDefaultEdgeControllerBuffer(constants.DefaultNodeLimit), + Load: getDefaultEdgeControllerLoad(constants.DefaultNodeLimit), + FilterPodNamespaces: "", + FilterPodNamePrefixs: "", ReportNodeConnectionStatusConfig: &ReportNodeConnectionStatusConfig{ - Schema: "http", - Address: "127.0.0.1", - Port: 8088, - ReportPath: constants.DefaultNodeConnectionReportPath, + Schema: "http", + Address: "127.0.0.1", + Port: 8088, + ReportPath: constants.DefaultNodeConnectionReportPath, }, }, DeviceController: &DeviceController{ @@ -139,7 +141,7 @@ func NewDefaultCloudCoreConfig() *CloudCoreConfig { Router: &Router{ Enable: false, Address: "0.0.0.0", - Port: 9443, // TODO: Need to config the Router Port + Port: 9443, // TODO: Need to config the Router Port RestTimeout: 60, }, IptablesManager: &IptablesManager{ @@ -202,7 +204,7 @@ func getDefaultEdgeControllerBuffer(nodeLimit int32) *EdgeControllerBuffer { CreateLease: 1024 + nodeLimit, QueryLease: constants.DefaultQueryLeaseBuffer, ServiceAccountToken: constants.DefaultServiceAccountTokenBuffer, - ReportNode: constants.DefaultReportNodeConnectionStatusBuffer, + ReportNode: constants.DefaultReportNodeConnectionStatusBuffer, } } diff --git a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go index 4fa2422e29c..619bbf16cdb 100644 --- a/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go +++ b/pkg/apis/componentconfig/cloudcore/v1alpha1/types.go @@ -64,13 +64,13 @@ type MonitorServer struct { type Prometheus struct { // Server is the IP address and port for the metric data to push, // defaulting to 127.0.0.1:9091 (set to 0.0.0.0 for all interfaces) - Server string `json:"server,omitempty"` + Server string `json:"server,omitempty"` // IntervalS is the time interval for pushing data to the prometheus service, // defaulting to 10 IntervalS int `json:"interval_s,omitempty"` // Job is the name of push job, // defaulting to connected_node_count - Job string `json:"job,omitempty"` + Job string `json:"job,omitempty"` } // KubeAPIConfig indicates the configuration for interacting with k8s server @@ -233,16 +233,19 @@ type EdgeController struct { Buffer *EdgeControllerBuffer `json:"buffer,omitempty"` // Load indicates EdgeController load Load *EdgeControllerLoad `json:"load,omitempty"` + // filter Pod Namespace + FilterPodNamespaces string `json:"filterPodNamespaces,omitempty"` + // filter Pod name prefix + FilterPodNamePrefixs string `json:"filterPodNamePrefixs,omitempty"` // ReportNodeConnectionStatusConfig indicates report node connection status url ReportNodeConnectionStatusConfig *ReportNodeConnectionStatusConfig `json:"ReportNodeConnectionStatusConfig"` } - type ReportNodeConnectionStatusConfig struct { - Schema string `json:"schema,omitempty"` - Address string `json:"address,omitempty"` - Port uint32 `json:"port,omitempty"` - ReportPath string `json:"reportPath,omitempty"` + Schema string `json:"schema,omitempty"` + Address string `json:"address,omitempty"` + Port uint32 `json:"port,omitempty"` + ReportPath string `json:"reportPath,omitempty"` } // EdgeControllerBuffer indicates the EdgeController buffer