Skip to content

Commit

Permalink
Suite-connector get de-authenticated after maintenance (#88)
Browse files Browse the repository at this point in the history
[#87] Suite-connector get de-authenticated after maintenance of the Bosch IoT Suite

Signed-off-by: Hristo Bozhilov <[email protected]>
  • Loading branch information
hristobojilov authored May 19, 2023
1 parent 2c284f0 commit 8ecca4f
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 8 deletions.
18 changes: 17 additions & 1 deletion config/connections.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ import (
"github.com/eclipse/ditto-clients-golang/model"

conn "github.com/eclipse-kanto/suite-connector/connector"

"github.com/eclipse/paho.mqtt.golang/packets"
)

var (
Expand Down Expand Up @@ -139,6 +141,7 @@ func CreateHubConnection(
honoConfig.NoOpStore = nopStore
honoConfig.ConnectRetryInterval = 0

honoConfig.AuthErrRetries = 5
honoConfig.BackoffMultiplier = 2
honoConfig.MinReconnectInterval = time.Minute
honoConfig.MaxReconnectInterval = 4 * time.Minute
Expand All @@ -151,6 +154,10 @@ func CreateHubConnection(
honoConfig.MaxReconnectInterval = time.Duration(max) * time.Second
}

if retries, err := strconv.ParseInt(os.Getenv("HUB_CONNECT_AUTH_ERR_RETRIES"), 0, 64); err == nil {
honoConfig.AuthErrRetries = retries
}

if mul, err := strconv.ParseFloat(os.Getenv("HUB_CONNECT_MUL"), 32); err == nil {
honoConfig.BackoffMultiplier = mul
}
Expand Down Expand Up @@ -408,6 +415,7 @@ func HonoConnect(sigs chan os.Signal,
"client_id": honoClient.ClientID(),
}

var authErrRetries int64
for {
future := honoClient.Connect()

Expand All @@ -419,7 +427,15 @@ func HonoConnect(sigs chan os.Signal,
cause, retry := routing.StatusCause(err)
routing.SendStatus(cause, statusPub, logger)
if !retry {
return err
if errors.Is(err, packets.ErrorRefusedBadUsernameOrPassword) ||
errors.Is(err, packets.ErrorRefusedNotAuthorised) {
authErrRetries++
if authErrRetries == honoClient.AuthErrRetries() {
return err
}
} else {
return err
}
}

} else {
Expand Down
35 changes: 35 additions & 0 deletions config/it_connections_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,10 @@ import (

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill/pubsub/gochannel"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/eclipse/paho.mqtt.golang/packets"
)

func TestLocalConnect(t *testing.T) {
Expand Down Expand Up @@ -91,3 +94,35 @@ func TestDummyHonoConnect(t *testing.T) {
require.NoError(t, config.HonoConnect(nil, statusPub, client, logger))
client.Disconnect()
}

func TestHonoConnectAuthError(t *testing.T) {
defer goleak.VerifyNone(t)

cfg, err := testutil.NewLocalConfig()
require.NoError(t, err)

cfg.Credentials.UserName = "invalid"
cfg.Credentials.Password = "invalid"
cfg.ConnectTimeout = 5 * time.Second
cfg.MinReconnectInterval = 2 * time.Second
cfg.MaxReconnectInterval = 5 * time.Second
cfg.BackoffMultiplier = 2
cfg.AuthErrRetries = 2

logger := testutil.NewLogger("connections", logger.ERROR, t)

client, err := conn.NewMQTTConnection(cfg, watermill.NewShortUUID(), logger)
require.NoError(t, err)

statusPub := gochannel.NewGoChannel(
gochannel.Config{
Persistent: true,
OutputChannelBuffer: int64(10),
},
logger,
)
defer statusPub.Close()

assert.ErrorIs(t, config.HonoConnect(nil, statusPub, client, logger), packets.ErrorRefusedNotAuthorised)
client.Disconnect()
}
6 changes: 6 additions & 0 deletions connector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type Configuration struct {
ConnectRetryInterval time.Duration

ExternalReconnect bool
AuthErrRetries int64
BackoffMultiplier float64
MaxReconnectInterval time.Duration
MinReconnectInterval time.Duration
Expand Down Expand Up @@ -90,6 +91,10 @@ func (c *Configuration) Validate() error {
return errors.New("MinReconnectInterval > MaxReconnectInterval")
}

if c.AuthErrRetries < 1 {
return errors.New("AuthErrRetries < 1")
}

if c.BackoffMultiplier < 1 {
return errors.New("BackoffMultiplier < 1")
}
Expand All @@ -111,6 +116,7 @@ func NewMQTTClientConfig(broker string) (*Configuration, error) {
ConnectRetryInterval: connectRetryInterval,
MinReconnectInterval: maxReconnectInterval,
MaxReconnectInterval: maxReconnectInterval,
AuthErrRetries: 1,
BackoffMultiplier: 2,
}, nil
}
5 changes: 5 additions & 0 deletions connector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestReconnectIntervalValid(t *testing.T) {
config, err := testutil.NewLocalConfig()
require.NoError(t, err)

config.AuthErrRetries = 3
config.BackoffMultiplier = 1.5
config.MinReconnectInterval = 2 * time.Second
config.MaxReconnectInterval = 10 * time.Second
Expand Down Expand Up @@ -67,4 +68,8 @@ func TestReconnectIntervalInvalid(t *testing.T) {
config.MaxReconnectInterval = 10 * time.Second
config.BackoffMultiplier = 0.2
assert.Error(t, config.Validate())

config.BackoffMultiplier = 2.0
config.AuthErrRetries = 0
assert.Error(t, config.Validate())
}
19 changes: 12 additions & 7 deletions connector/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ func (c *MQTTConnection) URL() string {
return c.config.URL
}

// AuthErrRetries returns number of retry attempts on authorization failure
func (c *MQTTConnection) AuthErrRetries() int64 {
return c.config.AuthErrRetries
}

// ClientID returns the connection client ID.
func (c *MQTTConnection) ClientID() string {
return c.clientID
Expand Down Expand Up @@ -504,6 +509,7 @@ func (c *MQTTConnection) externalReconnect(client mqtt.Client) {
b := c.ConnectBackoff()
b.Reset()

var authErrRetries int64
for {
if c.running.IsNotSet() {
return
Expand Down Expand Up @@ -537,14 +543,13 @@ func (c *MQTTConnection) externalReconnect(client mqtt.Client) {
c.logger.Error("Reconnect failed", err, logFields)

//Handle forced connection close by the broker
if errors.Is(err, packets.ErrorRefusedBadUsernameOrPassword) {
if errors.Is(err, packets.ErrorRefusedBadUsernameOrPassword) ||
errors.Is(err, packets.ErrorRefusedNotAuthorised) {
authErrRetries++
c.fireConnectionEvent(false, err)
return
}

if errors.Is(err, packets.ErrorRefusedNotAuthorised) {
c.fireConnectionEvent(false, err)
return
if authErrRetries == c.config.AuthErrRetries {
return
}
}

} else {
Expand Down

0 comments on commit 8ecca4f

Please sign in to comment.