Skip to content

Commit

Permalink
Merge branch 'main' into feature/support-access-token-postgres-scaler
Browse files Browse the repository at this point in the history
  • Loading branch information
Ferdinanddb authored Jul 27, 2024
2 parents 21b8e3c + d4fcc84 commit 1a17069
Show file tree
Hide file tree
Showing 6 changed files with 78 additions and 23 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/static-analysis-semgrep.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ name: "Semgrep"

on:
push:
branches: [ "main" ]
branches: ["main"]
pull_request_target: {}

concurrency:
Expand Down Expand Up @@ -30,7 +30,7 @@ jobs:
apk add github-cli
gh pr checkout ${{ github.event.number }}
- run: semgrep ci --sarif --output=semgrep.sarif
- run: semgrep ci --exclude=test --exclude=test --sarif --output=semgrep.sarif
env:
SEMGREP_APP_TOKEN: ${{ secrets.SEMGREP_APP_TOKEN }}

Expand Down
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ Here is an overview of all new **experimental** features:
### Improvements

- **Cassandra Scaler**: Add TLS support for cassandra scaler ([#5802](https://github.com/kedacore/keda/issues/5802))
- **GCP Pub/Sub**: Add optional valueIfNull to allow a default scaling value and prevent errors when GCP metric returns no value. ([#5896](https://github.com/kedacore/keda/issues/5896))
- **GCP Scalers**: Added custom time horizon in GCP scalers ([#5778](https://github.com/kedacore/keda/issues/5778))
- **GitHub Scaler**: Fixed pagination, fetching repository list ([#5738](https://github.com/kedacore/keda/issues/5738))
- **IBM MQ Scaler**: Add TLS support for IBM MQ scaler ([#5974](https://github.com/kedacore/keda/issues/5974))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **MYSQL Scaler**: Add support to fetch username from env ([#5883](https://github.com/kedacore/keda/issues/5883))
- **Postgres Scaler**: Add support for access token authentication to an Azure Postgres Flexible Server ([#5823](https://github.com/kedacore/keda/issues/5823))
Expand Down
7 changes: 5 additions & 2 deletions pkg/scalers/gcp/gcp_stackdriver_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (s StackDriverClient) GetMetrics(
//
// MQL provides a more expressive query language than
// the current filtering options of GetMetrics
func (s StackDriverClient) QueryMetrics(ctx context.Context, projectID, query string) (float64, error) {
func (s StackDriverClient) QueryMetrics(ctx context.Context, projectID, query string, valueIfNull *float64) (float64, error) {
req := &monitoringpb.QueryTimeSeriesRequest{
Query: query,
PageSize: 1,
Expand All @@ -303,7 +303,10 @@ func (s StackDriverClient) QueryMetrics(ctx context.Context, projectID, query st
resp, err := it.Next()

if err == iterator.Done {
return value, fmt.Errorf("could not find stackdriver metric with query %s", req.Query)
if valueIfNull == nil {
return value, fmt.Errorf("could not find stackdriver metric with query %s", req.Query)
}
return *valueIfNull, nil
}

if err != nil {
Expand Down
11 changes: 10 additions & 1 deletion pkg/scalers/gcp_pubsub_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type pubsubMetadata struct {
triggerIndex int
aggregation string
timeHorizon string
valueIfNull *float64
}

// NewPubSubScaler creates a new pubsubScaler
Expand Down Expand Up @@ -179,6 +180,14 @@ func parsePubSubMetadata(config *scalersconfig.ScalerConfig, logger logr.Logger)
}
}

if val, ok := config.TriggerMetadata["valueIfNull"]; ok && val != "" {
valueIfNull, err := strconv.ParseFloat(val, 64)
if err != nil {
return nil, fmt.Errorf("valueIfNull parsing error %w", err)
}
meta.valueIfNull = &valueIfNull
}

meta.aggregation = config.TriggerMetadata["aggregation"]

meta.timeHorizon = config.TriggerMetadata["timeHorizon"]
Expand Down Expand Up @@ -291,7 +300,7 @@ func (s *pubsubScaler) getMetrics(ctx context.Context, metricType string) (float

// Pubsub metrics are collected every 60 seconds so no need to aggregate them.
// See: https://cloud.google.com/monitoring/api/metrics_gcp#gcp-pubsub
return s.client.QueryMetrics(ctx, projectID, query)
return s.client.QueryMetrics(ctx, projectID, query, s.metadata.valueIfNull)
}

func getResourceData(s *pubsubScaler) (string, string) {
Expand Down
70 changes: 53 additions & 17 deletions pkg/scalers/ibmmq_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type IBMMQScaler struct {
metricType v2.MetricTargetType
metadata *IBMMQMetadata
defaultHTTPTimeout time.Duration
httpClient *http.Client
logger logr.Logger
}

Expand All @@ -45,6 +46,13 @@ type IBMMQMetadata struct {
activationQueueDepth int64
tlsDisabled bool
triggerIndex int

// TLS
ca string
cert string
key string
keyPassword string
unsafeSsl bool
}

// CommandResponse Full structured response from MQ admin REST query
Expand Down Expand Up @@ -75,16 +83,31 @@ func NewIBMMQScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
return nil, fmt.Errorf("error parsing IBM MQ metadata: %w", err)
}

httpClient := kedautil.CreateHTTPClient(config.GlobalHTTPTimeout, meta.tlsDisabled)

// Configure TLS if cert and key are specified
if meta.cert != "" && meta.key != "" {
tlsConfig, err := kedautil.NewTLSConfigWithPassword(meta.cert, meta.key, meta.keyPassword, meta.ca, meta.unsafeSsl)
if err != nil {
return nil, err
}
httpClient.Transport = kedautil.CreateHTTPTransportWithTLSConfig(tlsConfig)
}

return &IBMMQScaler{
metricType: metricType,
metadata: meta,
defaultHTTPTimeout: config.GlobalHTTPTimeout,
httpClient: httpClient,
logger: InitializeLogger(config, "ibm_mq_scaler"),
}, nil
}

// Close closes and returns nil
func (s *IBMMQScaler) Close(context.Context) error {
if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
}
return nil
}

Expand Down Expand Up @@ -144,24 +167,38 @@ func parseIBMMQMetadata(config *scalersconfig.ScalerConfig) (*IBMMQMetadata, err
fmt.Println("No tls setting defined - setting default")
meta.tlsDisabled = defaultTLSDisabled
}
val, ok := config.AuthParams["username"]
switch {
case ok && val != "":

if val, ok := config.AuthParams["username"]; ok && val != "" {
meta.username = val
case config.TriggerMetadata["usernameFromEnv"] != "":
meta.username = config.ResolvedEnv[config.TriggerMetadata["usernameFromEnv"]]
default:
} else if val, ok := config.TriggerMetadata["usernameFromEnv"]; ok && val != "" {
meta.username = config.ResolvedEnv[val]
} else {
return nil, fmt.Errorf("no username given")
}
pwdValue, booleanValue := config.AuthParams["password"] // booleanValue reports whether the type assertion succeeded or not
switch {
case booleanValue && pwdValue != "":
meta.password = pwdValue
case config.TriggerMetadata["passwordFromEnv"] != "":
meta.password = config.ResolvedEnv[config.TriggerMetadata["passwordFromEnv"]]
default:

if val, ok := config.AuthParams["password"]; ok && val != "" {
meta.password = val
} else if val, ok := config.TriggerMetadata["passwordFromEnv"]; ok && val != "" {
meta.password = config.ResolvedEnv[val]
} else {
return nil, fmt.Errorf("no password given")
}

// TLS config (optional)
meta.ca = config.AuthParams["ca"]
meta.cert = config.AuthParams["cert"]
meta.key = config.AuthParams["key"]
meta.keyPassword = config.AuthParams["keyPassword"]

meta.unsafeSsl = false
if val, ok := config.TriggerMetadata["unsafeSsl"]; ok {
boolVal, err := strconv.ParseBool(val)
if err != nil {
return nil, fmt.Errorf("failed to parse unsafeSsl value. Must be either true or false")
}
meta.unsafeSsl = boolVal
}

meta.triggerIndex = config.TriggerIndex
return &meta, nil
}
Expand All @@ -178,19 +215,18 @@ func (s *IBMMQScaler) getQueueDepthViaHTTP(ctx context.Context) (int64, error) {
}
req.Header.Set("ibm-mq-rest-csrf-token", "value")
req.Header.Set("Content-Type", "application/json")
req.SetBasicAuth(s.metadata.username, s.metadata.password)

client := kedautil.CreateHTTPClient(s.defaultHTTPTimeout, s.metadata.tlsDisabled)
req.SetBasicAuth(s.metadata.username, s.metadata.password)

resp, err := client.Do(req)
resp, err := s.httpClient.Do(req)
if err != nil {
return 0, fmt.Errorf("failed to contact MQ via REST: %w", err)
}
defer resp.Body.Close()

body, err := io.ReadAll(resp.Body)
if err != nil {
return 0, fmt.Errorf("failed to ready body of request: %w", err)
return 0, fmt.Errorf("failed to read body of request: %w", err)
}

var response CommandResponse
Expand Down
7 changes: 6 additions & 1 deletion pkg/scalers/ibmmq_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,12 +63,16 @@ var testIBMMQMetadata = []parseIBMMQMetadataTestData{
{map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Invalid URL
{map[string]string{"host": testInvalidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Properly formed authParams
// Properly formed authParams Basic Auth
{map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123"}},
// Properly formed authParams Basic Auth and TLS
{map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, false, map[string]string{"username": "testUsername", "password": "Pass123", "ca": "cavalue", "cert": "certvalue", "key": "keyvalue"}},
// No username provided
{map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"password": "Pass123"}},
// No password provided
{map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10"}, true, map[string]string{"username": "testUsername"}},
// Wrong input unsafeSsl
{map[string]string{"host": testValidMQQueueURL, "queueManager": "testQueueManager", "queueName": "testQueue", "queueDepth": "10", "unsafeSsl": "random"}, true, map[string]string{"username": "testUsername", "password": "Pass123"}},
}

// Test MQ Connection metadata is parsed correctly
Expand Down Expand Up @@ -216,6 +220,7 @@ func TestIBMMQScalerGetQueueDepthViaHTTP(t *testing.T) {
metadata: &IBMMQMetadata{
host: server.URL,
},
httpClient: server.Client(),
}

value, err := scaler.getQueueDepthViaHTTP(context.Background())
Expand Down

0 comments on commit 1a17069

Please sign in to comment.