Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: configurable burst and QPS in k8s client #279

Merged
merged 2 commits into from
Apr 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 12 additions & 1 deletion cmd/x509-certificate-exporter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

_ "go.uber.org/automaxprocs"
"k8s.io/client-go/util/flowcontrol"

"github.com/enix/x509-certificate-exporter/v3/internal"
getopt "github.com/pborman/getopt/v2"
Expand Down Expand Up @@ -38,6 +39,9 @@ func main() {
maxCacheDuration := durationFlag(0)
getopt.FlagLong(&maxCacheDuration, "max-cache-duration", 0, "maximum cache duration for kube secrets. cache is per namespace and randomized to avoid massive requests.")

rateLimitQPS := getopt.IntLong("kube-api-rate-limit-qps", 0, 0, "Kubernetes API request rate limit")
rateLimitBurst := getopt.IntLong("kube-api-rate-limit-burst", 0, 0, "Kubernetes API request burst")

files := stringArrayFlag{}
getopt.FlagLong(&files, "watch-file", 'f', "watch one or more x509 certificate file")

Expand Down Expand Up @@ -130,7 +134,14 @@ func main() {
configpath = defaultKubeConfig
}

err := exporter.ConnectToKubernetesCluster(configpath)
// Set rate limiter only if both QPS and burst are set
var rateLimiter flowcontrol.RateLimiter
if *rateLimitQPS > 0 && *rateLimitBurst > 0 {
log.Infof("setting Kubernetes API rate limiter to %d QPS and %d burst", *rateLimitQPS, *rateLimitBurst)
rateLimiter = flowcontrol.NewTokenBucketRateLimiter(float32(*rateLimitQPS), *rateLimitBurst)
}

err := exporter.ConnectToKubernetesCluster(configpath, rateLimiter)
if err != nil {
log.Fatal(err)
}
Expand Down
5 changes: 4 additions & 1 deletion deploy/charts/x509-certificate-exporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ hostPathsExporter:
| secretsExporter.priorityClassName | string | `""` | PriorityClassName for Pods of the TLS Secrets exporter |
| secretsExporter.podExtraLabels | object | `{}` | Additional labels added to Pods of the TLS Secrets exporter |
| secretsExporter.podAnnotations | object | `{}` | Annotations added to Pods of the TLS Secrets exporter |
| secretsExporter.podSecurityContext | object | `{}` | PodSecurityContext for Pods of the TLS Secrets exporter |
| secretsExporter.podSecurityContext | object | check `values.yaml` | PodSecurityContext for Pods of the TLS Secrets exporter |
| secretsExporter.securityContext | object | check `values.yaml` | SecurityContext for containers of the TLS Secrets exporter |
| secretsExporter.extraVolumes | list | `[]` | Additionnal volumes added to Pods of the TLS Secrets exporter (combined with global `extraVolumes`) |
| secretsExporter.extraVolumeMounts | list | `[]` | Additionnal volume mounts added to Pod containers of the TLS Secrets exporter (combined with global `extraVolumeMounts`) |
Expand All @@ -390,6 +390,9 @@ hostPathsExporter:
| secretsExporter.excludeLabels | list | `[]` | Exclude TLS Secrets having these labels. Items can be keys such as `my-label` or also require a value with syntax `my-label=my-value`. |
| secretsExporter.cache.enabled | bool | `true` | Enable caching of Kubernetes objects to prevent scraping timeouts |
| secretsExporter.cache.maxDuration | int | `300` | Maximum time an object can stay in cache unrefreshed (seconds) - it will be at least half of that |
| secretsExporter.kubeApiRateLimits.enabled | bool | `false` | Should requests to the Kubernetes API server be rate-limited |
| secretsExporter.kubeApiRateLimits.queriesPerSecond | int | `5` | Maximum rate of queries sent to the API server (per second) |
| secretsExporter.kubeApiRateLimits.burstQueries | int | `10` | Burst bucket size for queries sent to the API server |
| secretsExporter.env | list | `[]` | Additional environment variables for container |
| hostPathsExporter.debugMode | bool | `false` | Should debug messages be produced by hostPath exporters (default for all hostPathsExporter.daemonSets) |
| hostPathsExporter.restartPolicy | string | `"Always"` | restartPolicy for Pods of hostPath exporters (default for all hostPathsExporter.daemonSets) |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,12 @@ spec:
{{- else }}
- --max-cache-duration=0
{{- end }}
{{- with .Values.secretsExporter.kubeApiRateLimits }}
{{- if .enabled }}
- --kube-api-rate-limit-qps={{ int .queriesPerSecond }}
- --kube-api-rate-limit-burst={{ int .burstQueries }}
{{- end }}
{{- end }}
{{- if .Values.exposePerCertificateErrorMetrics }}
- --expose-per-cert-error-metrics
{{- end }}
Expand Down
9 changes: 9 additions & 0 deletions deploy/charts/x509-certificate-exporter/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ secretsExporter:
# -- Annotations added to Pods of the TLS Secrets exporter
podAnnotations: {}
# -- PodSecurityContext for Pods of the TLS Secrets exporter
# @default -- check `values.yaml`
podSecurityContext:
runAsNonRoot: true
# -- SecurityContext for containers of the TLS Secrets exporter
Expand Down Expand Up @@ -128,6 +129,14 @@ secretsExporter:
# -- Maximum time an object can stay in cache unrefreshed (seconds) - it will be at least half of that
maxDuration: 300

kubeApiRateLimits:
# -- Should requests to the Kubernetes API server be rate-limited
enabled: false
# -- Maximum rate of queries sent to the API server (per second)
queriesPerSecond: 5
# -- Burst bucket size for queries sent to the API server
burstQueries: 10

# -- Additional environment variables for container
env: []
# - name: GOMAXPROCS
Expand Down
11 changes: 8 additions & 3 deletions internal/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,14 @@ import (
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/flowcontrol"
)

// ConnectToKubernetesCluster : Try connect to a cluster from inside if path is empty,
// otherwise try loading the kubeconfig at path "path"
func (exporter *Exporter) ConnectToKubernetesCluster(path string) error {
func (exporter *Exporter) ConnectToKubernetesCluster(path string, rateLimiter flowcontrol.RateLimiter) error {
var err error
exporter.kubeClient, err = connectToKubernetesCluster(path, false)
exporter.kubeClient, err = connectToKubernetesCluster(path, false, rateLimiter)
return err
}

Expand Down Expand Up @@ -232,7 +233,7 @@ func (exporter *Exporter) shrinkSecret(secret v1.Secret) v1.Secret {
return secret
}

func connectToKubernetesCluster(kubeconfigPath string, insecure bool) (*kubernetes.Clientset, error) {
func connectToKubernetesCluster(kubeconfigPath string, insecure bool, rateLimiter flowcontrol.RateLimiter) (*kubernetes.Clientset, error) {
config, err := parseKubeConfig(kubeconfigPath)
if err != nil {
return nil, err
Expand All @@ -243,6 +244,10 @@ func connectToKubernetesCluster(kubeconfigPath string, insecure bool) (*kubernet
config.TLSClientConfig.CAData = nil
}

if rateLimiter != nil {
config.RateLimiter = rateLimiter
}

return getKubeClient(config)
}

Expand Down
10 changes: 5 additions & 5 deletions internal/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestMain(m *testing.M) {
panic(err)
}

sharedKubeClient, err = connectToKubernetesCluster("kubeconfig", true)
sharedKubeClient, err = connectToKubernetesCluster("kubeconfig", true, nil)
if err != nil {
panic(err)
}
Expand Down Expand Up @@ -286,7 +286,7 @@ func TestKubeMetricLabels(t *testing.T) {
}

func TestKubeNamespaceListFailure(t *testing.T) {
kubeClient, err := connectToKubernetesCluster("kubeconfig.x509-certificate-exporter", true)
kubeClient, err := connectToKubernetesCluster("kubeconfig.x509-certificate-exporter", true, nil)
if err != nil {
panic(err)
}
Expand All @@ -301,7 +301,7 @@ func TestKubeNamespaceListFailure(t *testing.T) {
}

func TestKubeSecretsListFailure(t *testing.T) {
kubeClient, err := connectToKubernetesCluster("kubeconfig.x509-certificate-exporter-list", true)
kubeClient, err := connectToKubernetesCluster("kubeconfig.x509-certificate-exporter-list", true, nil)
if err != nil {
panic(err)
}
Expand All @@ -316,7 +316,7 @@ func TestKubeSecretsListFailure(t *testing.T) {
}

func TestKubeInvalidConfig(t *testing.T) {
_, err := connectToKubernetesCluster("../test/kubeconfig-corrupted.yml", true)
_, err := connectToKubernetesCluster("../test/kubeconfig-corrupted.yml", true, nil)
assert.NotNil(t, err)
}

Expand Down Expand Up @@ -370,7 +370,7 @@ func TestKubeEmptyStringKey(t *testing.T) {

func TestKubeConnectionFromInsideFailure(t *testing.T) {
e := &Exporter{}
err := e.ConnectToKubernetesCluster("")
err := e.ConnectToKubernetesCluster("", nil)
assert.NotNil(t, err)
}

Expand Down
Loading