Skip to content

Commit

Permalink
Pulsar Scaler: fix msgBacklogThreshold field being named wrongly as…
Browse files Browse the repository at this point in the history
… `msgBacklog` (#4736)

Co-authored-by: Jorge Turrado Ferrero <[email protected]>
  • Loading branch information
dttung2905 and JorTurFer authored Jun 30, 2023
1 parent 470e822 commit a57d00a
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 14 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ To learn more about active deprecations, we recommend checking [GitHub Discussio

### Fixes

- TODO ([#XXX](https://github.com/kedacore/keda/issue/XXX))
- **Pulsar Scaler**: Fix `msgBacklogThreshold` field being named wrongly as `msgBacklog` ([#4681](https://github.com/kedacore/keda/issues/4681))

### Deprecations

Expand Down
10 changes: 5 additions & 5 deletions pkg/scalers/authentication/authentication_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,15 @@ import "time"
type Type string

const (
// APIKeyAuthType is a auth type using an API key
// APIKeyAuthType is an auth type using an API key
APIKeyAuthType Type = "apiKey"
// BasicAuthType is a auth type using basic auth
// BasicAuthType is an auth type using basic auth
BasicAuthType Type = "basic"
// TLSAuthType is a auth type using TLS
// TLSAuthType is an auth type using TLS
TLSAuthType Type = "tls"
// BearerAuthType is a auth type using a bearer token
// BearerAuthType is an auth type using a bearer token
BearerAuthType Type = "bearer"
// CustomAuthType is a auth type using a custom header
// CustomAuthType is an auth type using a custom header
CustomAuthType Type = "custom"
)

Expand Down
18 changes: 15 additions & 3 deletions pkg/scalers/pulsar_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ type pulsarStats struct {

// NewPulsarScaler creates a new PulsarScaler
func NewPulsarScaler(config *ScalerConfig) (Scaler, error) {
pulsarMetadata, err := parsePulsarMetadata(config)
logger := InitializeLogger(config, "pulsar_scaler")
pulsarMetadata, err := parsePulsarMetadata(config, logger)
if err != nil {
return nil, fmt.Errorf("error parsing pulsar metadata: %w", err)
}
Expand Down Expand Up @@ -125,11 +126,11 @@ func NewPulsarScaler(config *ScalerConfig) (Scaler, error) {
return &pulsarScaler{
client: client,
metadata: pulsarMetadata,
logger: InitializeLogger(config, "pulsar_scaler"),
logger: logger,
}, nil
}

func parsePulsarMetadata(config *ScalerConfig) (pulsarMetadata, error) {
func parsePulsarMetadata(config *ScalerConfig, logger logr.Logger) (pulsarMetadata, error) {
meta := pulsarMetadata{}
switch {
case config.TriggerMetadata["adminURLFromEnv"] != "":
Expand Down Expand Up @@ -178,13 +179,24 @@ func parsePulsarMetadata(config *ScalerConfig) (pulsarMetadata, error) {

meta.msgBacklogThreshold = defaultMsgBacklogThreshold

// FIXME: msgBacklog support DEPRECATED to be removed in v2.14
fmt.Println(config.TriggerMetadata)
if val, ok := config.TriggerMetadata[msgBacklogMetricName]; ok {
logger.V(1).Info("\"msgBacklog\" is deprecated and will be removed in v2.14, please use \"msgBacklogThreshold\" instead")
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("error parsing %s: %w", msgBacklogMetricName, err)
}
meta.msgBacklogThreshold = t
} else if val, ok := config.TriggerMetadata["msgBacklogThreshold"]; ok {
t, err := strconv.ParseInt(val, 10, 64)
if err != nil {
return meta, fmt.Errorf("error parsing %s: %w", msgBacklogMetricName, err)
}
meta.msgBacklogThreshold = t
}
// END FIXME

// For backwards compatibility, we need to map "tls: enable" to
if tls, ok := config.TriggerMetadata["tls"]; ok {
if tls == enable && (config.AuthParams["cert"] != "" || config.AuthParams["key"] != "") {
Expand Down
38 changes: 34 additions & 4 deletions pkg/scalers/pulsar_scaler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package scalers
import (
"context"
"fmt"
"strconv"
"strings"
"testing"

Expand Down Expand Up @@ -60,6 +61,12 @@ var parsePulsarMetadataTestDataset = []parsePulsarMetadataTestData{
{map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"},
{map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, false, false, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"},
{map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "isPartitionedTopic": "true", "subscription": "sub1"}, false, false, true, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"},
// test metric msgBacklogThreshold
{map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "isPartitionedTopic": "true", "subscription": "sub1", "msgBacklogThreshold": "5"}, false, false, true, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"},
// FIXME: msgBacklog support DEPRECATED to be removed in v2.14
// test metric msgBacklog
{map[string]string{"adminURL": "http://127.0.0.1:8080", "topic": "persistent://public/default/my-topic", "isPartitionedTopic": "true", "subscription": "sub1", "msgBacklog": "5"}, false, false, true, "http://127.0.0.1:8080", "persistent://public/default/my-topic", "sub1"},
// END FIXME

// tls
{map[string]string{"adminURL": "https://localhost:8443", "tls": "enable", "cert": "certdata", "key": "keydata", "ca": "cadata", "topic": "persistent://public/default/my-topic", "subscription": "sub1"}, false, true, false, "https://localhost:8443", "persistent://public/default/my-topic", "sub1"},
Expand Down Expand Up @@ -87,7 +94,8 @@ var pulsarMetricIdentifiers = []pulsarMetricIdentifier{

func TestParsePulsarMetadata(t *testing.T) {
for _, testData := range parsePulsarMetadataTestDataset {
meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validPulsarWithAuthParams})
logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validPulsarWithAuthParams}, "test_pulsar_scaler")
meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: validPulsarWithAuthParams}, logger)

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
Expand Down Expand Up @@ -120,12 +128,32 @@ func TestParsePulsarMetadata(t *testing.T) {
t.Errorf("Expected subscription %s but got %s\n", testData.subscription, meta.subscription)
}

var testDataMsgBacklogThreshold int64
// FIXME: msgBacklog support DEPRECATED to be removed in v2.14
if val, ok := testData.metadata["msgBacklog"]; ok {
testDataMsgBacklogThreshold, err = strconv.ParseInt(val, 10, 64)
if err != nil {
t.Errorf("error parseing msgBacklog: %v", err)
}
// END FiXME
} else if val, ok := testData.metadata["msgBacklogThreshold"]; ok {
testDataMsgBacklogThreshold, err = strconv.ParseInt(val, 10, 64)
if err != nil {
t.Errorf("error parseing msgBacklogThreshold: %v", err)
}
} else {
testDataMsgBacklogThreshold = defaultMsgBacklogThreshold
}
if meta.msgBacklogThreshold != testDataMsgBacklogThreshold && testDataMsgBacklogThreshold != defaultMsgBacklogThreshold {
t.Errorf("Expected msgBacklogThreshold %s but got %d\n", testData.metadata["msgBacklogThreshold"], meta.msgBacklogThreshold)
}

authParams := validPulsarWithoutAuthParams
if k, ok := testData.metadata["tls"]; ok && k == "enable" {
authParams = validPulsarWithAuthParams
}

meta, err = parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: authParams})
meta, err = parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.metadata, AuthParams: authParams}, logger)

if err != nil && !testData.isError {
t.Error("Expected success but got error", err)
Expand All @@ -150,7 +178,8 @@ func TestParsePulsarMetadata(t *testing.T) {

func TestPulsarAuthParams(t *testing.T) {
for _, testData := range parsePulsarMetadataTestAuthTLSDataset {
meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams})
logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, "test_pulsar_scaler")
meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.triggerMetadata, AuthParams: testData.authParams}, logger)

if err != nil && !testData.isError {
t.Error("Expected success but got error", testData.authParams, err)
Expand Down Expand Up @@ -204,7 +233,8 @@ func TestPulsarAuthParams(t *testing.T) {

func TestPulsarGetMetricSpecForScaling(t *testing.T) {
for _, testData := range pulsarMetricIdentifiers {
meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validWithAuthParams})
logger := InitializeLogger(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validWithAuthParams}, "test_pulsar_scaler")
meta, err := parsePulsarMetadata(&ScalerConfig{TriggerMetadata: testData.metadataTestData.metadata, AuthParams: validWithAuthParams}, logger)
if err != nil {
if testData.metadataTestData.isError {
continue
Expand Down
2 changes: 1 addition & 1 deletion pkg/scalers/scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ func InitializeLogger(config *ScalerConfig, scalerName string) logr.Logger {
return logf.Log.WithName(scalerName).WithValues("type", config.ScalableObjectType, "namespace", config.ScalableObjectNamespace, "name", config.ScalableObjectName)
}

// GetMetricTargetType helps getting the metric target type of the scaler
// GetMetricTargetType helps get the metric target type of the scaler
func GetMetricTargetType(config *ScalerConfig) (v2.MetricTargetType, error) {
switch config.MetricType {
case v2.UtilizationMetricType:
Expand Down

0 comments on commit a57d00a

Please sign in to comment.