diff --git a/config/connections.go b/config/connections.go index 52103e3..f40c82c 100644 --- a/config/connections.go +++ b/config/connections.go @@ -37,6 +37,8 @@ import ( "github.com/eclipse/ditto-clients-golang/model" conn "github.com/eclipse-kanto/suite-connector/connector" + + "github.com/eclipse/paho.mqtt.golang/packets" ) var ( @@ -139,6 +141,7 @@ func CreateHubConnection( honoConfig.NoOpStore = nopStore honoConfig.ConnectRetryInterval = 0 + honoConfig.AuthErrRetries = 5 honoConfig.BackoffMultiplier = 2 honoConfig.MinReconnectInterval = time.Minute honoConfig.MaxReconnectInterval = 4 * time.Minute @@ -151,6 +154,10 @@ func CreateHubConnection( honoConfig.MaxReconnectInterval = time.Duration(max) * time.Second } + if retries, err := strconv.ParseInt(os.Getenv("HUB_CONNECT_AUTH_ERR_RETRIES"), 0, 64); err == nil { + honoConfig.AuthErrRetries = retries + } + if mul, err := strconv.ParseFloat(os.Getenv("HUB_CONNECT_MUL"), 32); err == nil { honoConfig.BackoffMultiplier = mul } @@ -408,6 +415,7 @@ func HonoConnect(sigs chan os.Signal, "client_id": honoClient.ClientID(), } + var authErrRetries int64 for { future := honoClient.Connect() @@ -419,7 +427,15 @@ func HonoConnect(sigs chan os.Signal, cause, retry := routing.StatusCause(err) routing.SendStatus(cause, statusPub, logger) if !retry { - return err + if errors.Is(err, packets.ErrorRefusedBadUsernameOrPassword) || + errors.Is(err, packets.ErrorRefusedNotAuthorised) { + authErrRetries++ + if authErrRetries == honoClient.AuthErrRetries() { + return err + } + } else { + return err + } } } else { diff --git a/config/it_connections_test.go b/config/it_connections_test.go index f2fb549..12a0f7b 100644 --- a/config/it_connections_test.go +++ b/config/it_connections_test.go @@ -31,7 +31,10 @@ import ( "github.com/ThreeDotsLabs/watermill" "github.com/ThreeDotsLabs/watermill/pubsub/gochannel" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/eclipse/paho.mqtt.golang/packets" ) func TestLocalConnect(t *testing.T) { @@ -91,3 +94,35 @@ func TestDummyHonoConnect(t *testing.T) { require.NoError(t, config.HonoConnect(nil, statusPub, client, logger)) client.Disconnect() } + +func TestHonoConnectAuthError(t *testing.T) { + defer goleak.VerifyNone(t) + + cfg, err := testutil.NewLocalConfig() + require.NoError(t, err) + + cfg.Credentials.UserName = "invalid" + cfg.Credentials.Password = "invalid" + cfg.ConnectTimeout = 5 * time.Second + cfg.MinReconnectInterval = 2 * time.Second + cfg.MaxReconnectInterval = 5 * time.Second + cfg.BackoffMultiplier = 2 + cfg.AuthErrRetries = 2 + + logger := testutil.NewLogger("connections", logger.ERROR, t) + + client, err := conn.NewMQTTConnection(cfg, watermill.NewShortUUID(), logger) + require.NoError(t, err) + + statusPub := gochannel.NewGoChannel( + gochannel.Config{ + Persistent: true, + OutputChannelBuffer: int64(10), + }, + logger, + ) + defer statusPub.Close() + + assert.ErrorIs(t, config.HonoConnect(nil, statusPub, client, logger), packets.ErrorRefusedNotAuthorised) + client.Disconnect() +} diff --git a/connector/config.go b/connector/config.go index b53dc3b..3064003 100644 --- a/connector/config.go +++ b/connector/config.go @@ -53,6 +53,7 @@ type Configuration struct { ConnectRetryInterval time.Duration ExternalReconnect bool + AuthErrRetries int64 BackoffMultiplier float64 MaxReconnectInterval time.Duration MinReconnectInterval time.Duration @@ -90,6 +91,10 @@ func (c *Configuration) Validate() error { return errors.New("MinReconnectInterval > MaxReconnectInterval") } + if c.AuthErrRetries < 1 { + return errors.New("AuthErrRetries < 1") + } + if c.BackoffMultiplier < 1 { return errors.New("BackoffMultiplier < 1") } @@ -111,6 +116,7 @@ func NewMQTTClientConfig(broker string) (*Configuration, error) { ConnectRetryInterval: connectRetryInterval, MinReconnectInterval: maxReconnectInterval, MaxReconnectInterval: maxReconnectInterval, + AuthErrRetries: 1, BackoffMultiplier: 2, }, nil } diff --git a/connector/config_test.go b/connector/config_test.go index 3fa5192..c2712a1 100644 --- a/connector/config_test.go +++ b/connector/config_test.go @@ -40,6 +40,7 @@ func TestReconnectIntervalValid(t *testing.T) { config, err := testutil.NewLocalConfig() require.NoError(t, err) + config.AuthErrRetries = 3 config.BackoffMultiplier = 1.5 config.MinReconnectInterval = 2 * time.Second config.MaxReconnectInterval = 10 * time.Second @@ -67,4 +68,8 @@ func TestReconnectIntervalInvalid(t *testing.T) { config.MaxReconnectInterval = 10 * time.Second config.BackoffMultiplier = 0.2 assert.Error(t, config.Validate()) + + config.BackoffMultiplier = 2.0 + config.AuthErrRetries = 0 + assert.Error(t, config.Validate()) } diff --git a/connector/connection.go b/connector/connection.go index 543bd97..37e5d2b 100644 --- a/connector/connection.go +++ b/connector/connection.go @@ -221,6 +221,11 @@ func (c *MQTTConnection) URL() string { return c.config.URL } +// AuthErrRetries returns number of retry attempts on authorization failure +func (c *MQTTConnection) AuthErrRetries() int64 { + return c.config.AuthErrRetries +} + // ClientID returns the connection client ID. func (c *MQTTConnection) ClientID() string { return c.clientID @@ -504,6 +509,7 @@ func (c *MQTTConnection) externalReconnect(client mqtt.Client) { b := c.ConnectBackoff() b.Reset() + var authErrRetries int64 for { if c.running.IsNotSet() { return @@ -537,14 +543,13 @@ func (c *MQTTConnection) externalReconnect(client mqtt.Client) { c.logger.Error("Reconnect failed", err, logFields) //Handle forced connection close by the broker - if errors.Is(err, packets.ErrorRefusedBadUsernameOrPassword) { + if errors.Is(err, packets.ErrorRefusedBadUsernameOrPassword) || + errors.Is(err, packets.ErrorRefusedNotAuthorised) { + authErrRetries++ c.fireConnectionEvent(false, err) - return - } - - if errors.Is(err, packets.ErrorRefusedNotAuthorised) { - c.fireConnectionEvent(false, err) - return + if authErrRetries == c.config.AuthErrRetries { + return + } } } else {