Skip to content

Commit

Permalink
Merge pull request #2414 from rhajek/feat/bigpanda
Browse files Browse the repository at this point in the history
Feat/Big Panda Event Handler
  • Loading branch information
docmerlin authored Nov 20, 2020
2 parents 9bedd7e + 79dfe4c commit 20aa35d
Show file tree
Hide file tree
Showing 15 changed files with 744 additions and 2 deletions.
27 changes: 27 additions & 0 deletions alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/influxdata/kapacitor/models"
"github.com/influxdata/kapacitor/pipeline"
alertservice "github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/bigpanda"
"github.com/influxdata/kapacitor/services/discord"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/httppost"
Expand Down Expand Up @@ -486,6 +487,18 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
}
an.handlers = append(an.handlers, h)
}

for _, s := range n.BigPandaHandlers {
c := bigpanda.HandlerConfig{
AppKey: s.AppKey,
}
h, err := et.tm.BigPandaService.Handler(c, ctx...)
if err != nil {
return nil, errors.Wrap(err, "failed to create BigPanda handler")
}
an.handlers = append(an.handlers, h)
}

for _, t := range n.TeamsHandlers {
c := teams.HandlerConfig{
ChannelURL: t.ChannelURL,
Expand Down Expand Up @@ -544,6 +557,20 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a
n.IsStateChangesOnly = true
}

if len(n.BigPandaHandlers) == 0 && (et.tm.BigPandaService != nil && et.tm.BigPandaService.Global()) {
h, err := et.tm.BigPandaService.Handler(bigpanda.HandlerConfig{}, ctx...)
if err != nil {
return nil, errors.Wrap(err, "failed to create BigPanda handler")
}
an.handlers = append(an.handlers, h)
}
// If BigPanda has been configured with state changes only set it.
if et.tm.BigPandaService != nil &&
et.tm.BigPandaService.Global() &&
et.tm.BigPandaService.StateChangesOnly() {
n.IsStateChangesOnly = true
}

// Parse level expressions
an.levels = make([]stateful.Expression, alert.Critical+1)
an.scopePools = make([]stateful.ScopePool, alert.Critical+1)
Expand Down
101 changes: 101 additions & 0 deletions integrations/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ import (
"github.com/influxdata/kapacitor/services/alert/alerttest"
"github.com/influxdata/kapacitor/services/alerta"
"github.com/influxdata/kapacitor/services/alerta/alertatest"
"github.com/influxdata/kapacitor/services/bigpanda"
"github.com/influxdata/kapacitor/services/bigpanda/bigpandatest"
"github.com/influxdata/kapacitor/services/diagnostic"
"github.com/influxdata/kapacitor/services/hipchat"
"github.com/influxdata/kapacitor/services/hipchat/hipchattest"
Expand Down Expand Up @@ -9229,6 +9231,105 @@ stream
}
}

func TestStream_AlertBigPanda(t *testing.T) {
ts := bigpandatest.NewServer()
defer ts.Close()

var script = `
stream
|from()
.measurement('cpu')
.where(lambda: "host" == 'serverA')
.groupBy('host')
|window()
.period(10s)
.every(10s)
|count('value')
|alert()
.id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}')
.message('kapacitor/{{ .Name }}/{{ index .Tags "host" }} is {{ .Level }} @{{.Time}}')
.details('https://example.org/link')
.info(lambda: "count" > 6.0)
.warn(lambda: "count" > 7.0)
.crit(lambda: "count" > 8.0)
.bigPanda()
.AppKey('111111')
.bigPanda()
.AppKey('222222')
.bigPanda()
`
tmInit := func(tm *kapacitor.TaskMaster) {

c := bigpanda.NewConfig()
c.Enabled = true
c.AppKey = "XXXXXXX"
c.Token = "testtoken1231234"
c.URL = ts.URL + "/test/bigpanda/url"

d := diagService.NewBigPandaHandler().WithContext(keyvalue.KV("test", "111"))
sl, err := bigpanda.NewService(c, d)
if err != nil {
t.Error(err)
}

tm.BigPandaService = sl
}

testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, tmInit)

exp := []interface{}{
bigpandatest.Request{
URL: "/test/bigpanda/url",
PostData: bigpandatest.PostData{
Check: "kapacitor/cpu/serverA",
Description: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
AppKey: "111111",
Status: "critical",
Host: "serverA",
Timestamp: 31536010,
Task: "TestStream_Alert:cpu",
Details: "https://example.org/link",
},
},
bigpandatest.Request{
URL: "/test/bigpanda/url",
PostData: bigpandatest.PostData{
Check: "kapacitor/cpu/serverA",
Description: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
AppKey: "222222",
Status: "critical",
Host: "serverA",
Timestamp: 31536010,
Task: "TestStream_Alert:cpu",
Details: "https://example.org/link",
},
},
bigpandatest.Request{
URL: "/test/bigpanda/url",
PostData: bigpandatest.PostData{
Check: "kapacitor/cpu/serverA",
Description: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC",
AppKey: "XXXXXXX",
Status: "critical",
Host: "serverA",
Timestamp: 31536010,
Task: "TestStream_Alert:cpu",
Details: "https://example.org/link",
},
},
}

ts.Close()
var got []interface{}
for _, g := range ts.Requests() {
got = append(got, g)
}

if err := compareListIgnoreOrder(got, exp, nil); err != nil {
t.Error(err)
}
}

func TestStream_AlertPushover(t *testing.T) {
ts := pushovertest.NewServer()
defer ts.Close()
Expand Down
69 changes: 69 additions & 0 deletions pipeline/alert.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,10 @@ type AlertNodeData struct {
// tick:ignore
DiscordHandlers []*DiscordHandler `tick:"Discord" json:"discord"`

// Send alert to BigPanda
// tick:ignore
BigPandaHandlers []*BigPandaHandler `tick:"BigPanda" json:"bigPanda"`

// Send alert to Telegram.
// tick:ignore
TelegramHandlers []*TelegramHandler `tick:"Telegram" json:"telegram"`
Expand Down Expand Up @@ -1615,6 +1619,71 @@ type DiscordHandler struct {
EmbedTitle string `json:"embedTitle"`
}

// To allow Kapacitor to post to BigPanda,
// follow this guide https://docs.bigpanda.io/docs/api-key-management
// and create a new api key
// in the 'bigpanda' configuration section.
//
// Example:
// [bigpanda]
// enabled = true
// app-key = "my-app-key"
// token = "your-api-key"
//
// In order to not post a message every alert interval
// use AlertNode.StateChangesOnly so that only events
// where the alert changed state are posted to the channel.
//
// Example:
// stream
// |alert()
// .bigPanda()
//
// Send alerts with default app key
//
// Example:
// stream
// |alert()
// .bigPanda()
// .appKey('my-application')
//
// send alerts with custom appKey
//
// If the 'bigpanda' section in the configuration has the option: global = true
// then all alerts are sent to BigpPanda without the need to explicitly state it
// in the TICKscript.
//
// Example:
// [bigpanda]
// enabled = true
// default = true
// app-key = examplecorp
// global = true
// state-changes-only = true
//
// Example:
// stream
// |alert()
//
// Send alert to BigPanda.
// tick:property

func (n *AlertNodeData) BigPanda() *BigPandaHandler {
bigPanda := &BigPandaHandler{
AlertNodeData: n,
}
n.BigPandaHandlers = append(n.BigPandaHandlers, bigPanda)
return bigPanda
}

// tick:embedded:AlertNode.BigPanda
type BigPandaHandler struct {
*AlertNodeData `json:"-"`
// Application id
// If empty uses the default config
AppKey string `json:"app-key"`
}

// Send the alert to Telegram.
// For step-by-step instructions on setting up Kapacitor with Telegram, see the [Event Handler Setup Guide](https://docs.influxdata.com//kapacitor/latest/guides/event-handler-setup/#telegram-setup).
// To allow Kapacitor to post to Telegram,
Expand Down
1 change: 1 addition & 0 deletions pipeline/alert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ func TestAlertNode_MarshalJSON(t *testing.T) {
"sensu": null,
"slack": null,
"discord": null,
"bigPanda": null,
"telegram": null,
"hipChat": null,
"alerta": null,
Expand Down
1 change: 1 addition & 0 deletions pipeline/json_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ func TestPipeline_MarshalJSON(t *testing.T) {
"sensu": null,
"slack": null,
"discord": null,
"bigPanda": null,
"telegram": null,
"hipChat": null,
"alerta": null,
Expand Down
6 changes: 6 additions & 0 deletions server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/alerta"
"github.com/influxdata/kapacitor/services/azure"
"github.com/influxdata/kapacitor/services/bigpanda"
"github.com/influxdata/kapacitor/services/config"
"github.com/influxdata/kapacitor/services/consul"
"github.com/influxdata/kapacitor/services/deadman"
Expand Down Expand Up @@ -89,6 +90,7 @@ type Config struct {

// Alert handlers
Alerta alerta.Config `toml:"alerta" override:"alerta"`
BigPanda bigpanda.Config `toml:"bigpanda" override:"bigpanda"`
Discord discord.Configs `toml:"discord" override:"discord,element-key=workspace"`
HipChat hipchat.Config `toml:"hipchat" override:"hipchat"`
Kafka kafka.Configs `toml:"kafka" override:"kafka,element-key=id"`
Expand Down Expand Up @@ -161,6 +163,7 @@ func NewConfig() *Config {
c.OpenTSDB = opentsdb.NewConfig()

c.Alerta = alerta.NewConfig()
c.BigPanda = bigpanda.NewConfig()
c.Discord = discord.Configs{discord.NewDefaultConfig()}
c.HipChat = hipchat.NewConfig()
c.Kafka = kafka.Configs{kafka.NewConfig()}
Expand Down Expand Up @@ -281,6 +284,9 @@ func (c *Config) Validate() error {
if err := c.Alerta.Validate(); err != nil {
return errors.Wrap(err, "alerta")
}
if err := c.BigPanda.Validate(); err != nil {
return errors.Wrap(err, "bigpanda")
}
if err := c.Discord.Validate(); err != nil {
return err
}
Expand Down
20 changes: 20 additions & 0 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/influxdata/kapacitor/services/alert"
"github.com/influxdata/kapacitor/services/alerta"
"github.com/influxdata/kapacitor/services/azure"
"github.com/influxdata/kapacitor/services/bigpanda"
"github.com/influxdata/kapacitor/services/config"
"github.com/influxdata/kapacitor/services/consul"
"github.com/influxdata/kapacitor/services/deadman"
Expand Down Expand Up @@ -243,6 +244,9 @@ func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Serv

// Append Alert integration services
s.appendAlertaService()
if err := s.appendBigPandaService(); err != nil {
return nil, errors.Wrap(err, "bigpanda service")
}
if err := s.appendDiscordService(); err != nil {
return nil, errors.Wrap(err, "discord service")
}
Expand Down Expand Up @@ -776,6 +780,22 @@ func (s *Server) appendAlertaService() {
s.AppendService("alerta", srv)
}

func (s *Server) appendBigPandaService() error {
c := s.config.BigPanda
d := s.DiagService.NewBigPandaHandler()
srv, err := bigpanda.NewService(c, d)
if err != nil {
return err
}

s.TaskMaster.BigPandaService = srv
s.AlertService.BigPandaService = srv

s.SetDynamicService("bigpanda", srv)
s.AppendService("bigpanda", srv)
return nil
}

func (s *Server) appendDiscordService() error {
c := s.config.Discord
d := s.DiagService.NewDiscordHandler()
Expand Down
Loading

0 comments on commit 20aa35d

Please sign in to comment.