Skip to content

Commit

Permalink
Alerting: Add MQTT notifications receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-akhmetov committed Jul 31, 2024
1 parent f2ab7c7 commit 92d508e
Show file tree
Hide file tree
Showing 10 changed files with 575 additions and 0 deletions.
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137
github.com/aws/aws-sdk-go v1.50.29
github.com/benbjohnson/clock v1.3.5
github.com/eclipse/paho.mqtt.golang v1.4.3
github.com/go-kit/log v0.2.1
github.com/go-openapi/strfmt v0.22.0
github.com/google/go-cmp v0.6.0
Expand Down Expand Up @@ -47,6 +48,7 @@ require (
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
Expand Down Expand Up @@ -81,6 +83,7 @@ require (
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
github.com/shurcooL/vfsgen v0.0.0-20200824052919-0d455de96546 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/stretchr/objx v0.5.0 // indirect
go.mongodb.org/mongo-driver v1.13.1 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/mod v0.14.0 // indirect
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik=
github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down Expand Up @@ -187,6 +189,8 @@ github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grafana/prometheus-alertmanager v0.25.1-0.20240625192351-66ec17e3aa45 h1:AJKOtDKAOg8XNFnIZSmqqqutoTSxVlRs6vekL2p2KEY=
github.com/grafana/prometheus-alertmanager v0.25.1-0.20240625192351-66ec17e3aa45/go.mod h1:01sXtHoRwI8W324IPAzuxDFOmALqYLCOhvSC2fUHWXc=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
Expand Down Expand Up @@ -342,11 +346,16 @@ github.com/spf13/cast v1.3.1 h1:nFm6S0SMdyzrzcmThSipiEubIDy8WEXKNZ0UOgiRpng=
github.com/spf13/cast v1.3.1/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM=
Expand Down
4 changes: 4 additions & 0 deletions notify/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/grafana/alerting/receivers/googlechat"
"github.com/grafana/alerting/receivers/kafka"
"github.com/grafana/alerting/receivers/line"
"github.com/grafana/alerting/receivers/mqtt"
"github.com/grafana/alerting/receivers/oncall"
"github.com/grafana/alerting/receivers/opsgenie"
"github.com/grafana/alerting/receivers/pagerduty"
Expand Down Expand Up @@ -96,6 +97,9 @@ func BuildReceiverIntegrations(
for i, cfg := range receiver.LineConfigs {
ci(i, cfg.Metadata, line.New(cfg.Settings, cfg.Metadata, tmpl, nw(cfg.Metadata), nl(cfg.Metadata)))
}
for i, cfg := range receiver.MqttConfigs {
ci(i, cfg.Metadata, mqtt.New(cfg.Settings, cfg.Metadata, tmpl, nl(cfg.Metadata), nil))
}
for i, cfg := range receiver.OnCallConfigs {
ci(i, cfg.Metadata, oncall.New(cfg.Settings, cfg.Metadata, tmpl, nw(cfg.Metadata), img, nl(cfg.Metadata), orgID))
}
Expand Down
8 changes: 8 additions & 0 deletions notify/receivers.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/grafana/alerting/receivers/googlechat"
"github.com/grafana/alerting/receivers/kafka"
"github.com/grafana/alerting/receivers/line"
"github.com/grafana/alerting/receivers/mqtt"
"github.com/grafana/alerting/receivers/oncall"
"github.com/grafana/alerting/receivers/opsgenie"
"github.com/grafana/alerting/receivers/pagerduty"
Expand Down Expand Up @@ -190,6 +191,7 @@ type GrafanaReceiverConfig struct {
KafkaConfigs []*NotifierConfig[kafka.Config]
LineConfigs []*NotifierConfig[line.Config]
OpsgenieConfigs []*NotifierConfig[opsgenie.Config]
MqttConfigs []*NotifierConfig[mqtt.Config]
PagerdutyConfigs []*NotifierConfig[pagerduty.Config]
OnCallConfigs []*NotifierConfig[oncall.Config]
PushoverConfigs []*NotifierConfig[pushover.Config]
Expand Down Expand Up @@ -298,6 +300,12 @@ func parseNotifier(ctx context.Context, result *GrafanaReceiverConfig, receiver
return err
}
result.LineConfigs = append(result.LineConfigs, newNotifierConfig(receiver, cfg))
case "mqtt":
cfg, err := mqtt.NewConfig(receiver.Settings, decryptFn)
if err != nil {
return err
}
result.MqttConfigs = append(result.MqttConfigs, newNotifierConfig(receiver, cfg))
case "opsgenie":
cfg, err := opsgenie.NewConfig(receiver.Settings, decryptFn)
if err != nil {
Expand Down
5 changes: 5 additions & 0 deletions notify/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/grafana/alerting/receivers/googlechat"
"github.com/grafana/alerting/receivers/kafka"
"github.com/grafana/alerting/receivers/line"
"github.com/grafana/alerting/receivers/mqtt"
"github.com/grafana/alerting/receivers/opsgenie"
"github.com/grafana/alerting/receivers/pagerduty"
"github.com/grafana/alerting/receivers/pushover"
Expand Down Expand Up @@ -145,6 +146,10 @@ var AllKnownConfigsForTesting = map[string]NotifierConfigTest{
Config: line.FullValidConfigForTesting,
Secrets: line.FullValidSecretsForTesting,
},
"mqtt": {NotifierType: "mqtt",
Config: mqtt.FullValidConfigForTesting,
Secrets: mqtt.FullValidSecretsForTesting,
},
"opsgenie": {NotifierType: "opsgenie",
Config: opsgenie.FullValidConfigForTesting,
Secrets: opsgenie.FullValidSecretsForTesting,
Expand Down
83 changes: 83 additions & 0 deletions receivers/mqtt/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
package mqtt

import (
"encoding/json"
"errors"
"fmt"
"net"
"net/url"
"strconv"
"strings"

"github.com/grafana/alerting/receivers"
"github.com/grafana/alerting/templates"
)

type Config struct {
BrokerURL string `json:"brokerUrl,omitempty" yaml:"brokerUrl,omitempty"`
ClientID string `json:"clientId,omitempty" yaml:"clientId,omitempty"`
Topic string `json:"topic,omitempty" yaml:"topic,omitempty"`
Message string `json:"message,omitempty" yaml:"message,omitempty"`
Username string `json:"username,omitempty" yaml:"username,omitempty"`
Password string `json:"password,omitempty" yaml:"password,omitempty"`
InsecureSkipVerify bool `json:"insecureSkipVerify,omitempty" yaml:"insecureSkipVerify,omitempty"`
}

func NewConfig(jsonData json.RawMessage, decryptFn receivers.DecryptFunc) (Config, error) {
var settings Config
err := json.Unmarshal(jsonData, &settings)
if err != nil {
return Config{}, fmt.Errorf("failed to unmarshal settings: %w", err)
}

if settings.BrokerURL == "" {
return Config{}, errors.New("MQTT broker URL must be specified")
}
if _, err := isValidMqttURL(settings.BrokerURL); err != nil {
return Config{}, fmt.Errorf("Invalid MQTT broker URL: %w", err)
}

if settings.Topic == "" {
return Config{}, errors.New("MQTT topic must be specified")
}

if settings.ClientID == "" {
settings.ClientID = "Grafana"
}

if settings.Message == "" {
settings.Message = templates.DefaultMessageEmbed
}

password := decryptFn("password", settings.Password)
settings.Password = password

return settings, nil
}

func isValidMqttURL(mqttURL string) (bool, error) {
parsedURL, err := url.Parse(mqttURL)
if err != nil {
return false, err
}

if parsedURL.Scheme != "tcp" && parsedURL.Scheme != "ssl" {
return false, errors.New("Invalid scheme, must be 'tcp' or 'ssl'")
}

host := parsedURL.Host
if !strings.Contains(host, ":") {
return false, errors.New("Port must be specified")
}

_, port, err := net.SplitHostPort(host)
if err != nil {
return false, err
}

if portNum, err := strconv.ParseInt(port, 10, 32); err != nil || portNum > 65535 || portNum < 1 {
return false, errors.New("Port must be a valid number between 1 and 65535")
}

return true, nil
}
106 changes: 106 additions & 0 deletions receivers/mqtt/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package mqtt

import (
"encoding/json"
"testing"

"github.com/stretchr/testify/require"

receiversTesting "github.com/grafana/alerting/receivers/testing"
"github.com/grafana/alerting/templates"
)

func TestNewConfig(t *testing.T) {
cases := []struct {
name string
settings string
secureSettings map[string][]byte
expectedConfig Config
expectedInitError string
}{
{
name: "Error if empty",
settings: "",
expectedInitError: `failed to unmarshal settings`,
},
{
name: "Error if broker URL is missing",
settings: `{}`,
expectedInitError: `MQTT broker URL must be specified`,
},
{
name: "Error if topic is missing",
settings: `{ "brokerUrl" : "tcp://localhost:1883" }`,
expectedInitError: `MQTT topic must be specified`,
},
{
name: "Error if the broker URL does not have the scheme",
settings: `{ "brokerUrl" : "localhost" }`,
expectedInitError: `Invalid MQTT broker URL: Invalid scheme, must be 'tcp' or 'ssl'`,
},
{
name: "Error if the broker URL has invalid scheme",
settings: `{ "brokerUrl" : "http://localhost" }`,
expectedInitError: `Invalid MQTT broker URL: Invalid scheme, must be 'tcp' or 'ssl'`,
},
{
name: "Error if the broker URL does not have the port",
settings: `{ "brokerUrl" : "tcp://localhost" }`,
expectedInitError: `Invalid MQTT broker URL: Port must be specified`,
},
{
name: "Error if the broker URL port is invalid",
settings: `{ "brokerUrl" : "tcp://localhost:100000" }`,
expectedInitError: `Invalid MQTT broker URL: Port must be a valid number between 1 and 65535`,
},
{
name: "Minimal valid configuration",
settings: `{ "brokerUrl" : "tcp://localhost:1883", "topic": "grafana/alerts"}`,
expectedConfig: Config{
Message: templates.DefaultMessageEmbed,
BrokerURL: "tcp://localhost:1883",
Topic: "grafana/alerts",
ClientID: "Grafana",
},
},
{
name: "Configuration with insecureSkipVerify",
settings: `{ "brokerUrl" : "tcp://localhost:1883", "topic": "grafana/alerts", "insecureSkipVerify": true}`,
expectedConfig: Config{
Message: templates.DefaultMessageEmbed,
BrokerURL: "tcp://localhost:1883",
Topic: "grafana/alerts",
ClientID: "Grafana",
InsecureSkipVerify: true,
},
},
{
name: "Minimal valid configuration with secrets",
settings: `{ "brokerUrl" : "tcp://localhost:1883", "topic": "grafana/alerts", "username": "grafana"}`,
secureSettings: map[string][]byte{
"password": []byte("testpasswd"),
},
expectedConfig: Config{
Message: templates.DefaultMessageEmbed,
BrokerURL: "tcp://localhost:1883",
Topic: "grafana/alerts",
ClientID: "Grafana",
Username: "grafana",
Password: "testpasswd",
},
},
}

for _, c := range cases {
t.Run(c.name, func(t *testing.T) {
actual, err := NewConfig(json.RawMessage(c.settings), receiversTesting.DecryptForTesting(c.secureSettings))

if c.expectedInitError != "" {
require.ErrorContains(t, err, c.expectedInitError)
return
}
require.NoError(t, err)
require.Equal(t, c.expectedConfig, actual)
})
}
}
Loading

0 comments on commit 92d508e

Please sign in to comment.