From bd1958487f4e28e4f21ee328535e1d54ea08264d Mon Sep 17 00:00:00 2001 From: Souyama Debnath Date: Sun, 16 Jul 2023 00:46:32 +0530 Subject: [PATCH] fix: static check issues Signed-off-by: Souyama Debnath --- pkg/scalers/kafka_x_scaler.go | 100 +++++++++++++---------------- pkg/scalers/kafka_x_scaler_test.go | 4 +- 2 files changed, 48 insertions(+), 56 deletions(-) diff --git a/pkg/scalers/kafka_x_scaler.go b/pkg/scalers/kafka_x_scaler.go index d1b779f33bc..3e974ed7a04 100644 --- a/pkg/scalers/kafka_x_scaler.go +++ b/pkg/scalers/kafka_x_scaler.go @@ -10,7 +10,6 @@ import ( sigv4 "github.com/aws/aws-sdk-go/aws/signer/v4" "github.com/go-logr/logr" - kedautil "github.com/kedacore/keda/v2/pkg/util" "github.com/segmentio/kafka-go" "github.com/segmentio/kafka-go/sasl" "github.com/segmentio/kafka-go/sasl/aws_msk_iam" @@ -18,6 +17,8 @@ import ( "github.com/segmentio/kafka-go/sasl/scram" v2 "k8s.io/api/autoscaling/v2" "k8s.io/metrics/pkg/apis/external_metrics" + + kedautil "github.com/kedacore/keda/v2/pkg/util" ) type kafkaXScaler struct { @@ -53,7 +54,6 @@ type kafkaXMetadata struct { awsEndpoint string awsAuthorization awsAuthorizationMetadata - // TLS enableTLS bool cert string @@ -164,46 +164,43 @@ func parseKafkaXAuthParams(config *ScalerConfig, meta *kafkaXMetadata) error { if saslAuthType != "" { saslAuthType = strings.TrimSpace(saslAuthType) - mode := kafkaSaslType(saslAuthType) - - if mode == KafkaSASLTypeMskIam { + switch mode := kafkaSaslType(saslAuthType); mode { + case KafkaSASLTypeMskIam: + meta.saslType = mode + if val, ok := config.TriggerMetadata["awsEndpoint"]; ok { + meta.awsEndpoint = val + } if !meta.enableTLS { return errors.New("TLS is required for MSK") } - if val, ok := config.TriggerMetadata["awsRegion"]; ok && val != "" { meta.awsRegion = val } else { - return fmt.Errorf("no awsRegion given") + return errors.New("no awsRegion given") } - - if val, ok := config.TriggerMetadata["awsEndpoint"]; ok { - meta.awsEndpoint = val - } - auth, err := getAwsAuthorization(config.AuthParams, config.TriggerMetadata, config.ResolvedEnv) if err != nil { return err } meta.awsAuthorization = auth - meta.saslType = mode - } else if mode == KafkaSASLTypePlaintext || mode == KafkaSASLTypeSCRAMSHA256 || mode == KafkaSASLTypeSCRAMSHA512 { - if config.AuthParams["username"] == "" { + case KafkaSASLTypePlaintext: + fallthrough + case KafkaSASLTypeSCRAMSHA256: + fallthrough + case KafkaSASLTypeSCRAMSHA512: + if val, ok := config.AuthParams["username"]; ok { + meta.username = strings.TrimSpace(val) + } else { return errors.New("no username given") } - meta.username = strings.TrimSpace(config.AuthParams["username"]) - - if config.AuthParams["password"] == "" { + if val, ok := config.AuthParams["password"]; ok { + meta.password = strings.TrimSpace(val) + } else { return errors.New("no password given") } - meta.password = strings.TrimSpace(config.AuthParams["password"]) - meta.saslType = mode - } else if mode == KafkaSASLTypeOAuthbearer { - // TODO: implement - return fmt.Errorf("SASL/OAUTHBEARER is not implemented yet") - } else { - return fmt.Errorf("err SASL mode %s given", mode) - } + case KafkaSASLTypeOAuthbearer: + return errors.New("SASL/OAUTHBEARER is not implemented yet") + } } return nil @@ -328,9 +325,8 @@ func parseKafkaXMetadata(config *ScalerConfig, logger logr.Logger) (kafkaXMetada } func getKafkaXClient(metadata kafkaXMetadata, logger logr.Logger) (*kafka.Client, error) { - - var saslMechanism sasl.Mechanism = nil - var tlsConfig *tls.Config = nil + var saslMechanism sasl.Mechanism + var tlsConfig *tls.Config var err error logger.V(4).Info(fmt.Sprintf("Kafka SASL type %s", metadata.saslType)) @@ -341,32 +337,32 @@ func getKafkaXClient(metadata kafkaXMetadata, logger logr.Logger) (*kafka.Client } } - if metadata.saslType == KafkaSASLTypePlaintext { + switch metadata.saslType { + case KafkaSASLTypePlaintext: saslMechanism = plain.Mechanism{ Username: metadata.username, Password: metadata.password, } - } else if metadata.saslType == KafkaSASLTypeSCRAMSHA256 { + case KafkaSASLTypeSCRAMSHA256: saslMechanism, err = scram.Mechanism(scram.SHA256, metadata.username, metadata.password) if err != nil { return nil, err } - } else if metadata.saslType == KafkaSASLTypeSCRAMSHA512 { + case KafkaSASLTypeSCRAMSHA512: saslMechanism, err = scram.Mechanism(scram.SHA512, metadata.username, metadata.password) if err != nil { return nil, err } - } else if metadata.saslType == KafkaSASLTypeOAuthbearer { - // TODO: implement - return nil, fmt.Errorf("SASL/OAUTHBEARER is not implemented yet") - } else if metadata.saslType == KafkaSASLTypeMskIam { + case KafkaSASLTypeOAuthbearer: + return nil, errors.New("SASL/OAUTHBEARER is not implemented yet") + case KafkaSASLTypeMskIam: _, config := getAwsConfig(metadata.awsRegion, metadata.awsEndpoint, metadata.awsAuthorization) - + saslMechanism = &aws_msk_iam.Mechanism{ - Signer: sigv4.NewSigner(config.Credentials), - Region: metadata.awsRegion, + Signer: sigv4.NewSigner(config.Credentials), + Region: metadata.awsRegion, } } @@ -424,22 +420,21 @@ func (s *kafkaXScaler) getTopicPartitions() (map[string][]int, error) { result[topic.Name] = partitions } return result, nil - } else { - result := make(map[string][]int) - for _, topic := range metadata.Topics { - partitions := make([]int, 0) - if kedautil.Contains(s.metadata.topic, topic.Name) { - for _, partition := range topic.Partitions { - if (len(s.metadata.partitionLimitation) == 0) || - (len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) { - partitions = append(partitions, partition.ID) - } + } + result := make(map[string][]int) + for _, topic := range metadata.Topics { + partitions := make([]int, 0) + if kedautil.Contains(s.metadata.topic, topic.Name) { + for _, partition := range topic.Partitions { + if (len(s.metadata.partitionLimitation) == 0) || + (len(s.metadata.partitionLimitation) > 0 && kedautil.Contains(s.metadata.partitionLimitation, int32(partition.ID))) { + partitions = append(partitions, partition.ID) } } - result[topic.Name] = partitions } - return result, nil + result[topic.Name] = partitions } + return result, nil } func (s *kafkaXScaler) getConsumerOffsets(topicPartitions map[string][]int) (map[string]map[int]int64, error) { @@ -535,8 +530,6 @@ func (s *kafkaXScaler) Close(context.Context) error { if transport != nil { transport.CloseIdleConnections() } - //s.client = nil - //s.transport = nil return nil } @@ -568,7 +561,6 @@ type kafkaXProducerOffsetResult struct { err error } - // getConsumerAndProducerOffsets returns (consumerOffsets, producerOffsets, error) func (s *kafkaXScaler) getConsumerAndProducerOffsets(topicPartitions map[string][]int) (map[string]map[int]int64, map[string]map[int]int64, error) { consumerChan := make(chan kafkaXConsumerOffsetResult, 1) diff --git a/pkg/scalers/kafka_x_scaler_test.go b/pkg/scalers/kafka_x_scaler_test.go index 4d2a1e73fab..8f28e1f8865 100644 --- a/pkg/scalers/kafka_x_scaler_test.go +++ b/pkg/scalers/kafka_x_scaler_test.go @@ -73,7 +73,7 @@ var parseKafkaXMetadataTestDataset = []parseKafkaXMetadataTestData{ // success, no limitation {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "partitionLimitation": ""}, false, 1, []string{"foobar:9092"}, "my-group", []string{}, nil, offsetResetPolicy("latest"), false, false}, // TODO: remove failure, version not supported?? - //{map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false}, + {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "version": "1.2.3.4"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false}, // failure, lagThreshold is negative value {map[string]string{"bootstrapServers": "foobar:9092", "consumerGroup": "my-group", "topic": "my-topics", "lagThreshold": "-1"}, true, 1, []string{"foobar:9092"}, "my-group", []string{"my-topics"}, nil, offsetResetPolicy("latest"), false, false}, // failure, lagThreshold is 0 @@ -334,7 +334,7 @@ func TestKafkaXAuthParams(t *testing.T) { t.Errorf("Test case: %#v. Expected error but got success", id) } if !testData.isError { - if testData.metadata["tls"] == "true" && !meta.enableTLS { + if testData.metadata["tls"] == stringTrue && !meta.enableTLS { t.Errorf("Test case: %#v. Expected tls to be set to %#v but got %#v\n", id, testData.metadata["tls"], meta.enableTLS) } if meta.enableTLS {