From a14f0e164ed902ffe629e098beede7052fc8ba68 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Fri, 9 Oct 2020 17:49:05 +0200 Subject: [PATCH 01/14] feat: bigpanda integration --- alert.go | 27 ++ integrations/streamer_test.go | 70 +++++ pipeline/alert.go | 22 ++ .../bigpanda/bigpandatest/bigpandatest.go | 58 ++++ services/bigpanda/config.go | 34 +++ services/bigpanda/service.go | 262 ++++++++++++++++++ services/diagnostic/handlers.go | 27 ++ services/diagnostic/service.go | 6 + task_master.go | 7 + 9 files changed, 513 insertions(+) create mode 100644 services/bigpanda/bigpandatest/bigpandatest.go create mode 100644 services/bigpanda/config.go create mode 100644 services/bigpanda/service.go diff --git a/alert.go b/alert.go index 8b60b4f12..5831fb5d9 100644 --- a/alert.go +++ b/alert.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/influxdata/kapacitor/services/bigpanda" html "html/template" "os" "sync" @@ -486,6 +487,18 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a } an.handlers = append(an.handlers, h) } + + for _, s := range n.BigPandaHandlers { + c := bigpanda.HandlerConfig{ + AppKey: s.AppKey, + } + h, err := et.tm.BigPandaService.Handler(c, ctx...) + if err != nil { + return nil, errors.Wrap(err, "failed to create BigPanda handler") + } + an.handlers = append(an.handlers, h) + } + for _, t := range n.TeamsHandlers { c := teams.HandlerConfig{ ChannelURL: t.ChannelURL, @@ -543,6 +556,20 @@ func newAlertNode(et *ExecutingTask, n *pipeline.AlertNode, d NodeDiagnostic) (a n.IsStateChangesOnly = true } + if len(n.BigPandaHandlers) == 0 && (et.tm.BigPandaService != nil && et.tm.BigPandaService.Global()) { + h, err := et.tm.BigPandaService.Handler(bigpanda.HandlerConfig{}, ctx...) + if err != nil { + return nil, errors.Wrap(err, "failed to create BigPanda handler") + } + an.handlers = append(an.handlers, h) + } + // If BigPanda has been configured with state changes only set it. + if et.tm.BigPandaService != nil && + et.tm.BigPandaService.Global() && + et.tm.BigPandaService.StateChangesOnly() { + n.IsStateChangesOnly = true + } + // Parse level expressions an.levels = make([]stateful.Expression, alert.Critical+1) an.scopePools = make([]stateful.ScopePool, alert.Critical+1) diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index eb25a3d93..31d7647c0 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -41,6 +41,8 @@ import ( "github.com/influxdata/kapacitor/services/alert/alerttest" "github.com/influxdata/kapacitor/services/alerta" "github.com/influxdata/kapacitor/services/alerta/alertatest" + "github.com/influxdata/kapacitor/services/bigpanda" + "github.com/influxdata/kapacitor/services/bigpanda/bigpandatest" "github.com/influxdata/kapacitor/services/diagnostic" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/hipchat/hipchattest" @@ -9229,6 +9231,74 @@ stream } } +func TestStream_AlertBigPanda(t *testing.T) { + ts := bigpandatest.NewServer() + defer ts.Close() + + var script = ` +stream + |from() + .measurement('cpu') + .where(lambda: "host" == 'serverA') + .groupBy('host') + |window() + .period(10s) + .every(10s) + |count('value') + |alert() + .id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}') + .info(lambda: "count" > 6.0) + .warn(lambda: "count" > 7.0) + .crit(lambda: "count" > 8.0) + .bigPanda() + .AppKey('111111') + .bigPanda() + .AppKey('222222') +` + tmInit := func(tm *kapacitor.TaskMaster) { + + c := bigpanda.NewConfig() + c.Enabled = true + c.AppKey = "1111111" + c.Token = "testtoken1231234" + + d := diagService.NewBigPandaHandler().WithContext(keyvalue.KV("test", "111")) + sl, err := bigpanda.NewService(c, d) + if err != nil { + t.Error(err) + } + + tm.BigPandaService = sl + } + + testStreamerNoOutput(t, "TestStream_Alert", script, 13*time.Second, tmInit) + + exp := []interface{}{ + bigpandatest.Request{ + URL: "/data/v2/alerts", + PostData: bigpandatest.PostData{ + Description: "kapacitor/cpu/serverA is CRITICAL", + }, + }, + bigpandatest.Request{ + URL: "/data/v2/alerts", + PostData: bigpandatest.PostData{ + Description: "kapacitor/cpu/serverA is CRITICAL", + }, + }, + } + + ts.Close() + var got []interface{} + for _, g := range ts.Requests() { + got = append(got, g) + } + + if err := compareListIgnoreOrder(got, exp, nil); err != nil { + t.Error(err) + } +} + func TestStream_AlertPushover(t *testing.T) { ts := pushovertest.NewServer() defer ts.Close() diff --git a/pipeline/alert.go b/pipeline/alert.go index ce50dbeb8..b494d4db2 100644 --- a/pipeline/alert.go +++ b/pipeline/alert.go @@ -357,6 +357,10 @@ type AlertNodeData struct { // tick:ignore DiscordHandlers []*DiscordHandler `tick:"Discord" json:"discord"` + // Send alert to BigPanda + // tick:ignore + BigPandaHandlers []*BigPandaHandler `tick:"BigPanda" json:"bigPanda"` + // Send alert to Telegram. // tick:ignore TelegramHandlers []*TelegramHandler `tick:"Telegram" json:"telegram"` @@ -1615,6 +1619,24 @@ type DiscordHandler struct { EmbedTitle string `json:"embedTitle"` } +//TODO documentation +// Send the alert to BigPanda +func (n *AlertNodeData) BigPanda() *BigPandaHandler { + bigPanda := &BigPandaHandler{ + AlertNodeData: n, + } + n.BigPandaHandlers = append(n.BigPandaHandlers, bigPanda) + return bigPanda +} + +// tick:embedded:AlertNode.Discord +type BigPandaHandler struct { + *AlertNodeData `json:"-"` + // Application id + // If empty uses the default config + AppKey string `json:"app-key"` +} + // Send the alert to Telegram. // For step-by-step instructions on setting up Kapacitor with Telegram, see the [Event Handler Setup Guide](https://docs.influxdata.com//kapacitor/latest/guides/event-handler-setup/#telegram-setup). // To allow Kapacitor to post to Telegram, diff --git a/services/bigpanda/bigpandatest/bigpandatest.go b/services/bigpanda/bigpandatest/bigpandatest.go new file mode 100644 index 000000000..ee76ac9b2 --- /dev/null +++ b/services/bigpanda/bigpandatest/bigpandatest.go @@ -0,0 +1,58 @@ +package bigpandatest + +import ( + "encoding/json" + "net/http" + "net/http/httptest" + "sync" +) + +type Server struct { + mu sync.Mutex + ts *httptest.Server + URL string + requests []Request + closed bool +} + +func NewServer() *Server { + s := new(Server) + ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + pr := Request{ + URL: r.URL.String(), + } + dec := json.NewDecoder(r.Body) + dec.Decode(&pr.PostData) + s.mu.Lock() + s.requests = append(s.requests, pr) + s.mu.Unlock() + + })) + s.ts = ts + s.URL = ts.URL + return s +} + +func (s *Server) Requests() []Request { + s.mu.Lock() + defer s.mu.Unlock() + return s.requests +} + +func (s *Server) Close() { + if s.closed { + return + } + s.closed = true + s.ts.Close() +} + +type Request struct { + URL string + PostData PostData +} + +// PostData is the default struct to send an element through to PagerDuty +type PostData struct { + Description string `json:"description"` +} diff --git a/services/bigpanda/config.go b/services/bigpanda/config.go new file mode 100644 index 000000000..fe870a995 --- /dev/null +++ b/services/bigpanda/config.go @@ -0,0 +1,34 @@ +package bigpanda + +import ( + "github.com/pkg/errors" +) + +type Config struct { + // Whether BigPanda integration is enabled. + Enabled bool `toml:"enabled" override:"enabled"` + + // Whether all alerts should automatically post to Teams. + Global bool `toml:"global" override:"global"` + + //Each integration must have an App Key in BigPanda to identify it as a unique source. + AppKey string `toml:"app-key" override:"app-key"` + + //Each integration must have an App Key in BigPanda to identify it as a unique source. + Token string `toml:"token" override:"token"` + + // Whether all alerts should automatically use stateChangesOnly mode. + // Only applies if global is also set. + StateChangesOnly bool `toml:"state-changes-only" override:"state-changes-only"` +} + +func NewConfig() Config { + return Config{} +} + +func (c Config) Validate() error { + if c.Enabled && c.AppKey == "" { + return errors.New("must specify BigPanda AppKey") + } + return nil +} diff --git a/services/bigpanda/service.go b/services/bigpanda/service.go new file mode 100644 index 000000000..1ee1dcd23 --- /dev/null +++ b/services/bigpanda/service.go @@ -0,0 +1,262 @@ +package bigpanda + +import ( + "bytes" + "encoding/json" + "fmt" + "github.com/influxdata/kapacitor/models" + "io" + "io/ioutil" + "net/http" + "net/url" + "sync/atomic" + "time" + + "github.com/influxdata/kapacitor/alert" + "github.com/influxdata/kapacitor/keyvalue" + "github.com/pkg/errors" +) + +type Diagnostic interface { + WithContext(ctx ...keyvalue.T) Diagnostic + Error(msg string, err error) +} + +type Service struct { + configValue atomic.Value + diag Diagnostic +} + +func NewService(c Config, d Diagnostic) (*Service, error) { + s := &Service{ + diag: d, + } + s.configValue.Store(c) + return s, nil +} + +func (s *Service) Open() error { + return nil +} + +func (s *Service) Close() error { + return nil +} + +func (s *Service) config() Config { + return s.configValue.Load().(Config) +} + +func (s *Service) Update(newConfig []interface{}) error { + if l := len(newConfig); l != 1 { + return fmt.Errorf("expected only one new config object, got %d", l) + } + if c, ok := newConfig[0].(Config); !ok { + return fmt.Errorf("expected config object to be of type %T, got %T", c, newConfig[0]) + } else { + s.configValue.Store(c) + } + return nil +} + +func (s *Service) Global() bool { + return s.config().Global +} + +func (s *Service) StateChangesOnly() bool { + return s.config().StateChangesOnly +} + +type testOptions struct { + AlertTopic string `json:"alert_topic"` + AlertID string `json:"alert_id"` + Message string `json:"message"` + Level alert.Level `json:"level"` + Data alert.EventData `json:"event_data"` + Timestamp time.Time `json:"timestamp"` +} + +func (s *Service) TestOptions() interface{} { + layout := "2006-01-02T15:04:05.000Z" + str := "2014-11-12T11:45:26.371Z" + t, _ := time.Parse(layout, str) + + return &testOptions{ + AlertTopic: "test kapacitor alert topic", + AlertID: "foo/bar/bat", + Message: "test teams message", + Level: alert.Critical, + Data: alert.EventData{ + Name: "testBugPanda", + Tags: make(map[string]string), + Fields: make(map[string]interface{}), + Result: models.Result{}, + }, + Timestamp: t, + } + +} + +func (s *Service) Test(options interface{}) error { + o, ok := options.(*testOptions) + if !ok { + return fmt.Errorf("unexpected options type %T", options) + } + //c := s.config() + return s.Alert(o.AlertTopic, o.AlertID, o.Message, o.Level, o.Timestamp, o.Data) +} + +func (s *Service) Alert(alertTopic, alertID, message string, level alert.Level, timestamp time.Time, data alert.EventData) error { + url, post, err := s.preparePost(alertTopic, alertID, message, level, timestamp, data) + if err != nil { + return err + } + + resp, err := http.Post(url, "application/json", post) + if err != nil { + return err + } + defer resp.Body.Close() + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + type response struct { + Error string `json:"error"` + } + r := &response{Error: fmt.Sprintf("failed to understand Teams response. code: %d content: %s", resp.StatusCode, string(body))} + b := bytes.NewReader(body) + dec := json.NewDecoder(b) + dec.Decode(r) + return errors.New(r.Error) + } + return nil +} + +// BPAlert +// See https://docs.bigpanda.io/reference#alerts +/* + +curl -X POST -H "Content-Type: application/json" \ + -H "Authorization: Bearer " \ + https://api.bigpanda.io/data/v2/alerts \ + -d '{ "app_key": "", "status": "critical", "host": "production-database-1", "check": "CPU overloaded" }' + +{ + "app_key": "123", + "status": "critical", + "host": "production-database-1", + "timestamp": 1402302570, + "check": "CPU overloaded", + "description": "CPU is above upper limit (70%)", + "cluster": "production-databases", + "my_unique_attribute": "my_unique_value" +} + + statuses: ok, critical, warning, acknowledged + + "primary_property": "application", + "secondary_property": "host" +*/ +type BPAlert struct { + AppKey string `json:"app_key"` + Status string `json:"status"` + Host string `json:"host"` + Timestamp string `json:"timestamp"` + Check string `json:"check"` + Description string `json:"description"` + Cluster string `json:"cluster"` + Attributes map[string]interface{} `json:"-"` +} + +// append additional properties `Attributes` as extra attributes +func (o BPAlert) MarshalJSON() ([]byte, error) { + type Object_ BPAlert + b, err := json.Marshal(Object_(o)) + if err != nil { + return nil, err + } + if o.Attributes == nil || len(o.Attributes) == 0 { + return b, nil + } + m, err := json.Marshal(o.Attributes) + if err != nil { + return nil, err + } + if len(b) == 2 { + return m, nil + } else { + b[len(b)-1] = ',' + return append(b, m[1:]...), nil + } +} + +func (s *Service) preparePost(alertTopic, alertID, message string, level alert.Level, timestamp time.Time, data alert.EventData) (string, io.Reader, error) { + c := s.config() + if !c.Enabled { + return "", nil, errors.New("service is not enabled") + } + + var status string + switch level { + case alert.Warning: + status = "warning" + case alert.Critical: + status = "critical" + case alert.Info: + status = "ok" + default: + status = "critical" + } + + payload := &BPAlert{} + payload.Description = message + payload.Timestamp = timestamp.Format("2006-01-02T15:04:05.000000000Z07:00") + payload.Status = status + + postBytes, err := payload.MarshalJSON() + if err != nil { + return "", nil, errors.Wrap(err, "error marshaling card struct") + } + + post := bytes.NewBuffer(postBytes) + alertUrl, err := url.Parse("https://api.bigpanda.io/data/v2/alerts") + if err != nil { + return "", nil, err + } + + return alertUrl.String(), post, nil +} + +// HandlerConfig defines the high-level struct required to connect to BigPanda +type HandlerConfig struct { + AppKey string `mapstructure:"app-key"` +} + +type handler struct { + s *Service + c HandlerConfig + diag Diagnostic +} + +func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, error) { + return &handler{ + s: s, + c: c, + diag: s.diag.WithContext(ctx...), + }, nil +} + +func (h *handler) Handle(event alert.Event) { + if err := h.s.Alert( + event.Topic, + event.State.ID, + event.State.Message, + event.State.Level, + event.State.Time, + event.Data, + ); err != nil { + h.diag.Error("failed to send event to BigPanda", err) + } +} diff --git a/services/diagnostic/handlers.go b/services/diagnostic/handlers.go index da39965e9..f9209dd3f 100644 --- a/services/diagnostic/handlers.go +++ b/services/diagnostic/handlers.go @@ -17,6 +17,7 @@ import ( "github.com/influxdata/kapacitor/models" alertservice "github.com/influxdata/kapacitor/services/alert" "github.com/influxdata/kapacitor/services/alerta" + "github.com/influxdata/kapacitor/services/bigpanda" "github.com/influxdata/kapacitor/services/discord" "github.com/influxdata/kapacitor/services/ec2" "github.com/influxdata/kapacitor/services/hipchat" @@ -627,6 +628,32 @@ func (h *DiscordHandler) WithContext(ctx ...keyvalue.T) discord.Diagnostic { } } +// BigPanda Handler + +type BigPandaHandler struct { + l Logger +} + +func (h *BigPandaHandler) InsecureSkipVerify() { + h.l.Info("service is configured to skip ssl verification") +} + +func (h *BigPandaHandler) Error(msg string, err error) { + h.l.Error(msg, Error(err)) +} + +func (h *BigPandaHandler) TemplateError(err error, kv keyvalue.T) { + h.l.Error("failed to evaluate BigPanda template", Error(err), String(kv.Key, kv.Value)) +} + +func (h *BigPandaHandler) WithContext(ctx ...keyvalue.T) bigpanda.Diagnostic { + fields := logFieldsFromContext(ctx) + + return &BigPandaHandler{ + l: h.l.With(fields...), + } +} + // Storage Handler type StorageHandler struct { diff --git a/services/diagnostic/service.go b/services/diagnostic/service.go index 318e7a9a7..e009fc338 100644 --- a/services/diagnostic/service.go +++ b/services/diagnostic/service.go @@ -154,6 +154,12 @@ func (s *Service) NewDiscordHandler() *DiscordHandler { } } +func (s *Service) NewBigPandaHandler() *BigPandaHandler { + return &BigPandaHandler{ + l: s.Logger.With(String("service", "bigpanda")), + } +} + func (s *Service) NewTaskStoreHandler() *TaskStoreHandler { return &TaskStoreHandler{ l: s.Logger.With(String("service", "task_store")), diff --git a/task_master.go b/task_master.go index 5d723eeea..0cf873026 100644 --- a/task_master.go +++ b/task_master.go @@ -3,6 +3,7 @@ package kapacitor import ( "errors" "fmt" + "github.com/influxdata/kapacitor/services/bigpanda" "log" "sync" "time" @@ -157,6 +158,11 @@ type TaskMaster struct { StateChangesOnly() bool Handler(discord.HandlerConfig, ...keyvalue.T) (alert.Handler, error) } + BigPandaService interface { + Global() bool + StateChangesOnly() bool + Handler(bigpanda.HandlerConfig, ...keyvalue.T) (alert.Handler, error) + } SlackService interface { Global() bool StateChangesOnly() bool @@ -299,6 +305,7 @@ func (tm *TaskMaster) New(id string) *TaskMaster { n.PushoverService = tm.PushoverService n.HTTPPostService = tm.HTTPPostService n.DiscordService = tm.DiscordService + n.BigPandaService = tm.BigPandaService n.SlackService = tm.SlackService n.TelegramService = tm.TelegramService n.SNMPTrapService = tm.SNMPTrapService From da8bdd24387a0466ad8c9c991476ca0587d3bf92 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Mon, 12 Oct 2020 16:48:09 +0200 Subject: [PATCH 02/14] feat: bigpanda integration --- pipeline/alert.go | 2 +- pipeline/alert_test.go | 1 + pipeline/json_test.go | 1 + server/config.go | 6 +++++ server/server.go | 20 +++++++++++++++ services/alert/service.go | 12 ++++++--- services/bigpanda/config.go | 14 +++++++++- services/bigpanda/service.go | 50 ++++++++++++++++++++++++++++-------- 8 files changed, 89 insertions(+), 17 deletions(-) diff --git a/pipeline/alert.go b/pipeline/alert.go index b494d4db2..7e8c5c817 100644 --- a/pipeline/alert.go +++ b/pipeline/alert.go @@ -1629,7 +1629,7 @@ func (n *AlertNodeData) BigPanda() *BigPandaHandler { return bigPanda } -// tick:embedded:AlertNode.Discord +// tick:embedded:AlertNode.BigPanda type BigPandaHandler struct { *AlertNodeData `json:"-"` // Application id diff --git a/pipeline/alert_test.go b/pipeline/alert_test.go index ea9d350e0..fba170cab 100644 --- a/pipeline/alert_test.go +++ b/pipeline/alert_test.go @@ -73,6 +73,7 @@ func TestAlertNode_MarshalJSON(t *testing.T) { "sensu": null, "slack": null, "discord": null, + "bigPanda": null, "telegram": null, "hipChat": null, "alerta": null, diff --git a/pipeline/json_test.go b/pipeline/json_test.go index f2ef9c302..27c8de655 100644 --- a/pipeline/json_test.go +++ b/pipeline/json_test.go @@ -252,6 +252,7 @@ func TestPipeline_MarshalJSON(t *testing.T) { "sensu": null, "slack": null, "discord": null, + "bigPanda": null, "telegram": null, "hipChat": null, "alerta": null, diff --git a/server/config.go b/server/config.go index a3b093a63..8aeb3bbc5 100644 --- a/server/config.go +++ b/server/config.go @@ -3,6 +3,7 @@ package server import ( "encoding" "fmt" + "github.com/influxdata/kapacitor/services/bigpanda" "os" "os/user" "path/filepath" @@ -89,6 +90,7 @@ type Config struct { // Alert handlers Alerta alerta.Config `toml:"alerta" override:"alerta"` + BigPanda bigpanda.Config `toml:"bigpanda" override:"bigpanda"` Discord discord.Configs `toml:"discord" override:"discord,element-key=workspace"` HipChat hipchat.Config `toml:"hipchat" override:"hipchat"` Kafka kafka.Configs `toml:"kafka" override:"kafka,element-key=id"` @@ -161,6 +163,7 @@ func NewConfig() *Config { c.OpenTSDB = opentsdb.NewConfig() c.Alerta = alerta.NewConfig() + c.BigPanda = bigpanda.NewConfig() c.Discord = discord.Configs{discord.NewDefaultConfig()} c.HipChat = hipchat.NewConfig() c.Kafka = kafka.Configs{kafka.NewConfig()} @@ -281,6 +284,9 @@ func (c *Config) Validate() error { if err := c.Alerta.Validate(); err != nil { return errors.Wrap(err, "alerta") } + if err := c.BigPanda.Validate(); err != nil { + return errors.Wrap(err, "bigpanda") + } if err := c.Discord.Validate(); err != nil { return err } diff --git a/server/server.go b/server/server.go index cb334d879..374f63757 100644 --- a/server/server.go +++ b/server/server.go @@ -4,6 +4,7 @@ package server import ( "crypto/tls" "fmt" + "github.com/influxdata/kapacitor/services/bigpanda" "io/ioutil" "os" "path/filepath" @@ -243,6 +244,9 @@ func New(c *Config, buildInfo BuildInfo, diagService *diagnostic.Service) (*Serv // Append Alert integration services s.appendAlertaService() + if err := s.appendBigPandaService(); err != nil { + return nil, errors.Wrap(err, "bigpanda service") + } if err := s.appendDiscordService(); err != nil { return nil, errors.Wrap(err, "discord service") } @@ -776,6 +780,22 @@ func (s *Server) appendAlertaService() { s.AppendService("alerta", srv) } +func (s *Server) appendBigPandaService() error { + c := s.config.BigPanda + d := s.DiagService.NewBigPandaHandler() + srv, err := bigpanda.NewService(c, d) + if err != nil { + return err + } + + s.TaskMaster.BigPandaService = srv + s.AlertService.BigPandaService = srv + + s.SetDynamicService("bigpanda", srv) + s.AppendService("bigpanda", srv) + return nil +} + func (s *Server) appendDiscordService() error { c := s.config.Discord d := s.DiagService.NewDiscordHandler() diff --git a/services/alert/service.go b/services/alert/service.go index 6c50cc043..31435a1e3 100644 --- a/services/alert/service.go +++ b/services/alert/service.go @@ -14,6 +14,7 @@ import ( "github.com/influxdata/kapacitor/keyvalue" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/services/alerta" + "github.com/influxdata/kapacitor/services/bigpanda" "github.com/influxdata/kapacitor/services/discord" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httpd" @@ -87,6 +88,9 @@ type Service struct { DefaultHandlerConfig() alerta.HandlerConfig Handler(alerta.HandlerConfig, ...keyvalue.T) (alert.Handler, error) } + BigPandaService interface { + Handler(bigpanda.HandlerConfig, ...keyvalue.T) (alert.Handler, error) + } HipChatService interface { Handler(hipchat.HandlerConfig, ...keyvalue.T) alert.Handler } @@ -767,17 +771,17 @@ func (s *Service) createHandlerFromSpec(spec HandlerSpec) (handler, error) { keyvalue.KV("topic", spec.Topic), } switch spec.Kind { - case "aggregate": - c := newDefaultAggregateHandlerConfig(s.EventCollector) + case "bigpanda": + c := bigpanda.HandlerConfig{} err = decodeOptions(spec.Options, &c) if err != nil { return handler{}, err } - handlerDiag := s.diag.WithHandlerContext(ctx...) - h, err = NewAggregateHandler(c, handlerDiag) + h, err = s.BigPandaService.Handler(c, ctx...) if err != nil { return handler{}, err } + h = newExternalHandler(h) case "alerta": c := s.AlertaService.DefaultHandlerConfig() err = decodeOptions(spec.Options, &c) diff --git a/services/bigpanda/config.go b/services/bigpanda/config.go index fe870a995..72bcbba6b 100644 --- a/services/bigpanda/config.go +++ b/services/bigpanda/config.go @@ -4,6 +4,10 @@ import ( "github.com/pkg/errors" ) +const ( + defaultBigPandaAlertApi = "https://api.bigpanda.io/data/v2/alerts" +) + type Config struct { // Whether BigPanda integration is enabled. Enabled bool `toml:"enabled" override:"enabled"` @@ -20,10 +24,18 @@ type Config struct { // Whether all alerts should automatically use stateChangesOnly mode. // Only applies if global is also set. StateChangesOnly bool `toml:"state-changes-only" override:"state-changes-only"` + + // Whether to skip the tls verification of the alerta host + InsecureSkipVerify bool `toml:"insecure-skip-verify" override:"insecure-skip-verify"` + + //Optional alert api URL, if not specified https://api.bigpanda.io/data/v2/alerts is used + Url string `toml: "url" override: "url"` } func NewConfig() Config { - return Config{} + return Config{ + Url: defaultBigPandaAlertApi, + } } func (c Config) Validate() error { diff --git a/services/bigpanda/service.go b/services/bigpanda/service.go index 1ee1dcd23..76f2b162e 100644 --- a/services/bigpanda/service.go +++ b/services/bigpanda/service.go @@ -2,10 +2,11 @@ package bigpanda import ( "bytes" + "crypto/tls" "encoding/json" "fmt" + khttp "github.com/influxdata/kapacitor/http" "github.com/influxdata/kapacitor/models" - "io" "io/ioutil" "net/http" "net/url" @@ -17,6 +18,10 @@ import ( "github.com/pkg/errors" ) +const ( + defaultTokenPrefix = "Bearer" +) + type Diagnostic interface { WithContext(ctx ...keyvalue.T) Diagnostic Error(msg string, err error) @@ -24,6 +29,7 @@ type Diagnostic interface { type Service struct { configValue atomic.Value + clientValue atomic.Value diag Diagnostic } @@ -32,6 +38,10 @@ func NewService(c Config, d Diagnostic) (*Service, error) { diag: d, } s.configValue.Store(c) + s.clientValue.Store(&http.Client{ + Transport: khttp.NewDefaultTransportWithTLS(&tls.Config{InsecureSkipVerify: c.InsecureSkipVerify}), + }) + return s, nil } @@ -55,6 +65,9 @@ func (s *Service) Update(newConfig []interface{}) error { return fmt.Errorf("expected config object to be of type %T, got %T", c, newConfig[0]) } else { s.configValue.Store(c) + s.clientValue.Store(&http.Client{ + Transport: khttp.NewDefaultTransportWithTLS(&tls.Config{InsecureSkipVerify: c.InsecureSkipVerify}), + }) } return nil } @@ -107,17 +120,21 @@ func (s *Service) Test(options interface{}) error { } func (s *Service) Alert(alertTopic, alertID, message string, level alert.Level, timestamp time.Time, data alert.EventData) error { - url, post, err := s.preparePost(alertTopic, alertID, message, level, timestamp, data) + + req, err := s.preparePost(alertTopic, alertID, message, level, timestamp, data) + if err != nil { return err } - resp, err := http.Post(url, "application/json", post) + client := s.clientValue.Load().(*http.Client) + resp, err := client.Do(req) if err != nil { return err } defer resp.Body.Close() - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNoContent { + + if resp.StatusCode != http.StatusCreated { body, err := ioutil.ReadAll(resp.Body) if err != nil { return err @@ -125,12 +142,13 @@ func (s *Service) Alert(alertTopic, alertID, message string, level alert.Level, type response struct { Error string `json:"error"` } - r := &response{Error: fmt.Sprintf("failed to understand Teams response. code: %d content: %s", resp.StatusCode, string(body))} + r := &response{Error: fmt.Sprintf("failed to understand BigPanda response. code: %d content: %s", resp.StatusCode, string(body))} b := bytes.NewReader(body) dec := json.NewDecoder(b) dec.Decode(r) return errors.New(r.Error) } + return nil } @@ -192,14 +210,16 @@ func (o BPAlert) MarshalJSON() ([]byte, error) { } } -func (s *Service) preparePost(alertTopic, alertID, message string, level alert.Level, timestamp time.Time, data alert.EventData) (string, io.Reader, error) { +func (s *Service) preparePost(alertTopic, alertID, message string, level alert.Level, timestamp time.Time, data alert.EventData) (*http.Request, error) { c := s.config() if !c.Enabled { - return "", nil, errors.New("service is not enabled") + return nil, errors.New("service is not enabled") } var status string switch level { + case alert.OK: + status = "ok" case alert.Warning: status = "warning" case alert.Critical: @@ -214,24 +234,32 @@ func (s *Service) preparePost(alertTopic, alertID, message string, level alert.L payload.Description = message payload.Timestamp = timestamp.Format("2006-01-02T15:04:05.000000000Z07:00") payload.Status = status + payload.AppKey = c.AppKey postBytes, err := payload.MarshalJSON() if err != nil { - return "", nil, errors.Wrap(err, "error marshaling card struct") + return nil, errors.Wrap(err, "error marshaling card struct") } post := bytes.NewBuffer(postBytes) - alertUrl, err := url.Parse("https://api.bigpanda.io/data/v2/alerts") + alertUrl, err := url.Parse(c.Url) if err != nil { - return "", nil, err + return nil, err } - return alertUrl.String(), post, nil + req, err := http.NewRequest("POST", alertUrl.String(), post) + req.Header.Add("Authorization", defaultTokenPrefix+" "+c.Token) + req.Header.Add("Content-Type", "application/json") + if err != nil { + return nil, err + } + return req, nil } // HandlerConfig defines the high-level struct required to connect to BigPanda type HandlerConfig struct { AppKey string `mapstructure:"app-key"` + ApiUrl string `mapstructure:"api-url"` } type handler struct { From 836664b8fd89ee2b9d2a0e7284b1e0e0b01d9431 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Tue, 13 Oct 2020 16:42:36 +0200 Subject: [PATCH 03/14] feat: bigpanda integration test cleanup --- integrations/streamer_test.go | 30 ++++++-- pipeline/alert.go | 51 ++++++++++++- .../bigpanda/bigpandatest/bigpandatest.go | 8 ++ services/bigpanda/service.go | 76 +++++++------------ 4 files changed, 109 insertions(+), 56 deletions(-) diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 31d7647c0..878684868 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -9254,13 +9254,15 @@ stream .AppKey('111111') .bigPanda() .AppKey('222222') + .bigPanda() ` tmInit := func(tm *kapacitor.TaskMaster) { c := bigpanda.NewConfig() c.Enabled = true - c.AppKey = "1111111" + c.AppKey = "XXXXXXX" c.Token = "testtoken1231234" + c.Url = ts.URL + "/test/bigpanda/url" d := diagService.NewBigPandaHandler().WithContext(keyvalue.KV("test", "111")) sl, err := bigpanda.NewService(c, d) @@ -9275,15 +9277,33 @@ stream exp := []interface{}{ bigpandatest.Request{ - URL: "/data/v2/alerts", + URL: "/test/bigpanda/url", PostData: bigpandatest.PostData{ - Description: "kapacitor/cpu/serverA is CRITICAL", + Check: "kapacitor/cpu/serverA is CRITICAL", + AppKey: "111111", + Status: "critical", + Host: "serverA", + Timestamp: "1971-01-01T00:00:10.000000000Z", }, }, bigpandatest.Request{ - URL: "/data/v2/alerts", + URL: "/test/bigpanda/url", PostData: bigpandatest.PostData{ - Description: "kapacitor/cpu/serverA is CRITICAL", + Check: "kapacitor/cpu/serverA is CRITICAL", + AppKey: "222222", + Status: "critical", + Host: "serverA", + Timestamp: "1971-01-01T00:00:10.000000000Z", + }, + }, + bigpandatest.Request{ + URL: "/test/bigpanda/url", + PostData: bigpandatest.PostData{ + Check: "kapacitor/cpu/serverA is CRITICAL", + AppKey: "XXXXXXX", + Status: "critical", + Host: "serverA", + Timestamp: "1971-01-01T00:00:10.000000000Z", }, }, } diff --git a/pipeline/alert.go b/pipeline/alert.go index 7e8c5c817..24606b0f3 100644 --- a/pipeline/alert.go +++ b/pipeline/alert.go @@ -1619,8 +1619,55 @@ type DiscordHandler struct { EmbedTitle string `json:"embedTitle"` } -//TODO documentation -// Send the alert to BigPanda +// To allow Kapacitor to post to BigPanda, +// follow this guide https://docs.bigpanda.io/docs/api-key-management +// and create a new api key +// in the 'bigpanda' configuration section. +// +// Example: +// [bigpanda] +// enabled = true +// app-key = "my-app-key" +// token = "your-api-key" +// +// In order to not post a message every alert interval +// use AlertNode.StateChangesOnly so that only events +// where the alert changed state are posted to the channel. +// +// Example: +// stream +// |alert() +// .bigPanda() +// +// Send alerts with default app key +// +// Example: +// stream +// |alert() +// .bigPanda() +// .appKey('my-application') +// +// send alerts with custom appKey +// +// If the 'bigpanda' section in the configuration has the option: global = true +// then all alerts are sent to BigpPanda without the need to explicitly state it +// in the TICKscript. +// +// Example: +// [bigpanda] +// enabled = true +// default = true +// app-key = examplecorp +// global = true +// state-changes-only = true +// +// Example: +// stream +// |alert() +// +// Send alert to BigPanda. +// tick:property + func (n *AlertNodeData) BigPanda() *BigPandaHandler { bigPanda := &BigPandaHandler{ AlertNodeData: n, diff --git a/services/bigpanda/bigpandatest/bigpandatest.go b/services/bigpanda/bigpandatest/bigpandatest.go index ee76ac9b2..6e0112e5d 100644 --- a/services/bigpanda/bigpandatest/bigpandatest.go +++ b/services/bigpanda/bigpandatest/bigpandatest.go @@ -26,6 +26,7 @@ func NewServer() *Server { s.mu.Lock() s.requests = append(s.requests, pr) s.mu.Unlock() + w.WriteHeader(http.StatusCreated) })) s.ts = ts @@ -53,6 +54,13 @@ type Request struct { } // PostData is the default struct to send an element through to PagerDuty + type PostData struct { + AppKey string `json:"app_key"` + Status string `json:"status"` + Host string `json:"host"` + Timestamp string `json:"timestamp"` + Check string `json:"check"` Description string `json:"description"` + Cluster string `json:"cluster"` } diff --git a/services/bigpanda/service.go b/services/bigpanda/service.go index 76f2b162e..6888eb884 100644 --- a/services/bigpanda/service.go +++ b/services/bigpanda/service.go @@ -81,6 +81,7 @@ func (s *Service) StateChangesOnly() bool { } type testOptions struct { + AppKey string `json:"app_key"` AlertTopic string `json:"alert_topic"` AlertID string `json:"alert_id"` Message string `json:"message"` @@ -116,12 +117,12 @@ func (s *Service) Test(options interface{}) error { return fmt.Errorf("unexpected options type %T", options) } //c := s.config() - return s.Alert(o.AlertTopic, o.AlertID, o.Message, o.Level, o.Timestamp, o.Data) + return s.Alert(o.AppKey, o.AlertTopic, o.AlertID, o.Message, o.Level, o.Timestamp, o.Data) } -func (s *Service) Alert(alertTopic, alertID, message string, level alert.Level, timestamp time.Time, data alert.EventData) error { +func (s *Service) Alert(appKey, alertTopic, alertID, message string, level alert.Level, timestamp time.Time, data alert.EventData) error { - req, err := s.preparePost(alertTopic, alertID, message, level, timestamp, data) + req, err := s.preparePost(appKey, alertTopic, alertID, message, level, timestamp, data) if err != nil { return err @@ -152,7 +153,7 @@ func (s *Service) Alert(alertTopic, alertID, message string, level alert.Level, return nil } -// BPAlert +// BigPanda alert // See https://docs.bigpanda.io/reference#alerts /* @@ -177,40 +178,7 @@ curl -X POST -H "Content-Type: application/json" \ "primary_property": "application", "secondary_property": "host" */ -type BPAlert struct { - AppKey string `json:"app_key"` - Status string `json:"status"` - Host string `json:"host"` - Timestamp string `json:"timestamp"` - Check string `json:"check"` - Description string `json:"description"` - Cluster string `json:"cluster"` - Attributes map[string]interface{} `json:"-"` -} - -// append additional properties `Attributes` as extra attributes -func (o BPAlert) MarshalJSON() ([]byte, error) { - type Object_ BPAlert - b, err := json.Marshal(Object_(o)) - if err != nil { - return nil, err - } - if o.Attributes == nil || len(o.Attributes) == 0 { - return b, nil - } - m, err := json.Marshal(o.Attributes) - if err != nil { - return nil, err - } - if len(b) == 2 { - return m, nil - } else { - b[len(b)-1] = ',' - return append(b, m[1:]...), nil - } -} - -func (s *Service) preparePost(alertTopic, alertID, message string, level alert.Level, timestamp time.Time, data alert.EventData) (*http.Request, error) { +func (s *Service) preparePost(appKey, alertTopic, alertID, message string, level alert.Level, timestamp time.Time, data alert.EventData) (*http.Request, error) { c := s.config() if !c.Enabled { return nil, errors.New("service is not enabled") @@ -230,24 +198,34 @@ func (s *Service) preparePost(alertTopic, alertID, message string, level alert.L status = "critical" } - payload := &BPAlert{} - payload.Description = message - payload.Timestamp = timestamp.Format("2006-01-02T15:04:05.000000000Z07:00") - payload.Status = status - payload.AppKey = c.AppKey + bpData := make(map[string]interface{}) + bpData["check"] = message + bpData["timestamp"] = timestamp.Format("2006-01-02T15:04:05.000000000Z07:00") + bpData["status"] = status - postBytes, err := payload.MarshalJSON() - if err != nil { - return nil, errors.Wrap(err, "error marshaling card struct") + if appKey == "" { + appKey = c.AppKey + } + bpData["app_key"] = appKey + + if len(data.Tags) > 0 { + for k, v := range data.Tags { + bpData[k] = v + } + } + + var post bytes.Buffer + enc := json.NewEncoder(&post) + if err := enc.Encode(bpData); err != nil { + return nil, err } - post := bytes.NewBuffer(postBytes) alertUrl, err := url.Parse(c.Url) if err != nil { return nil, err } - req, err := http.NewRequest("POST", alertUrl.String(), post) + req, err := http.NewRequest("POST", alertUrl.String(), &post) req.Header.Add("Authorization", defaultTokenPrefix+" "+c.Token) req.Header.Add("Content-Type", "application/json") if err != nil { @@ -259,7 +237,6 @@ func (s *Service) preparePost(alertTopic, alertID, message string, level alert.L // HandlerConfig defines the high-level struct required to connect to BigPanda type HandlerConfig struct { AppKey string `mapstructure:"app-key"` - ApiUrl string `mapstructure:"api-url"` } type handler struct { @@ -278,6 +255,7 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, er func (h *handler) Handle(event alert.Event) { if err := h.s.Alert( + h.c.AppKey, event.Topic, event.State.ID, event.State.Message, From deef3b479c811639ae93cf244654c35feffc618a Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Tue, 13 Oct 2020 21:23:27 +0200 Subject: [PATCH 04/14] feat: import fix --- alert.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alert.go b/alert.go index 5831fb5d9..8734417c7 100644 --- a/alert.go +++ b/alert.go @@ -4,7 +4,6 @@ import ( "bytes" "encoding/json" "fmt" - "github.com/influxdata/kapacitor/services/bigpanda" html "html/template" "os" "sync" @@ -18,6 +17,7 @@ import ( "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" alertservice "github.com/influxdata/kapacitor/services/alert" + "github.com/influxdata/kapacitor/services/bigpanda" "github.com/influxdata/kapacitor/services/discord" "github.com/influxdata/kapacitor/services/hipchat" "github.com/influxdata/kapacitor/services/httppost" From b5456b158b383982de2206d5fe0bde70aa949f9c Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Wed, 14 Oct 2020 00:03:17 +0200 Subject: [PATCH 05/14] feat: import fix --- server/config.go | 2 +- server/server.go | 2 +- task_master.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/server/config.go b/server/config.go index 8aeb3bbc5..336bbb3ca 100644 --- a/server/config.go +++ b/server/config.go @@ -3,7 +3,6 @@ package server import ( "encoding" "fmt" - "github.com/influxdata/kapacitor/services/bigpanda" "os" "os/user" "path/filepath" @@ -16,6 +15,7 @@ import ( "github.com/influxdata/kapacitor/services/alert" "github.com/influxdata/kapacitor/services/alerta" "github.com/influxdata/kapacitor/services/azure" + "github.com/influxdata/kapacitor/services/bigpanda" "github.com/influxdata/kapacitor/services/config" "github.com/influxdata/kapacitor/services/consul" "github.com/influxdata/kapacitor/services/deadman" diff --git a/server/server.go b/server/server.go index 374f63757..82194039b 100644 --- a/server/server.go +++ b/server/server.go @@ -4,7 +4,6 @@ package server import ( "crypto/tls" "fmt" - "github.com/influxdata/kapacitor/services/bigpanda" "io/ioutil" "os" "path/filepath" @@ -26,6 +25,7 @@ import ( "github.com/influxdata/kapacitor/services/alert" "github.com/influxdata/kapacitor/services/alerta" "github.com/influxdata/kapacitor/services/azure" + "github.com/influxdata/kapacitor/services/bigpanda" "github.com/influxdata/kapacitor/services/config" "github.com/influxdata/kapacitor/services/consul" "github.com/influxdata/kapacitor/services/deadman" diff --git a/task_master.go b/task_master.go index 0cf873026..58039888f 100644 --- a/task_master.go +++ b/task_master.go @@ -3,7 +3,6 @@ package kapacitor import ( "errors" "fmt" - "github.com/influxdata/kapacitor/services/bigpanda" "log" "sync" "time" @@ -20,6 +19,7 @@ import ( "github.com/influxdata/kapacitor/server/vars" alertservice "github.com/influxdata/kapacitor/services/alert" "github.com/influxdata/kapacitor/services/alerta" + "github.com/influxdata/kapacitor/services/bigpanda" "github.com/influxdata/kapacitor/services/discord" ec2 "github.com/influxdata/kapacitor/services/ec2/client" "github.com/influxdata/kapacitor/services/hipchat" From cdabe58ad960ad6ac35c7877541d7cee009c61ee Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Wed, 14 Oct 2020 00:13:02 +0200 Subject: [PATCH 06/14] feat: fixing whitespaces in toml syntax --- services/bigpanda/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/bigpanda/config.go b/services/bigpanda/config.go index 72bcbba6b..5c14337cb 100644 --- a/services/bigpanda/config.go +++ b/services/bigpanda/config.go @@ -29,7 +29,7 @@ type Config struct { InsecureSkipVerify bool `toml:"insecure-skip-verify" override:"insecure-skip-verify"` //Optional alert api URL, if not specified https://api.bigpanda.io/data/v2/alerts is used - Url string `toml: "url" override: "url"` + Url string `toml:"url" override:"url"` } func NewConfig() Config { From 5676e0130d631076697c558e5cbf3ce5246c3a7a Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Wed, 14 Oct 2020 15:51:53 +0200 Subject: [PATCH 07/14] feat: rolled back removed aggregate handler --- services/alert/service.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/services/alert/service.go b/services/alert/service.go index 31435a1e3..a9458e355 100644 --- a/services/alert/service.go +++ b/services/alert/service.go @@ -771,17 +771,17 @@ func (s *Service) createHandlerFromSpec(spec HandlerSpec) (handler, error) { keyvalue.KV("topic", spec.Topic), } switch spec.Kind { - case "bigpanda": - c := bigpanda.HandlerConfig{} + case "aggregate": + c := newDefaultAggregateHandlerConfig(s.EventCollector) err = decodeOptions(spec.Options, &c) if err != nil { return handler{}, err } - h, err = s.BigPandaService.Handler(c, ctx...) + handlerDiag := s.diag.WithHandlerContext(ctx...) + h, err = NewAggregateHandler(c, handlerDiag) if err != nil { return handler{}, err } - h = newExternalHandler(h) case "alerta": c := s.AlertaService.DefaultHandlerConfig() err = decodeOptions(spec.Options, &c) @@ -793,6 +793,17 @@ func (s *Service) createHandlerFromSpec(spec HandlerSpec) (handler, error) { return handler{}, err } h = newExternalHandler(h) + case "bigpanda": + c := bigpanda.HandlerConfig{} + err = decodeOptions(spec.Options, &c) + if err != nil { + return handler{}, err + } + h, err = s.BigPandaService.Handler(c, ctx...) + if err != nil { + return handler{}, err + } + h = newExternalHandler(h) case "discord": c := discord.HandlerConfig{} err = decodeOptions(spec.Options, &c) From 26f88400b536e603424a8472ecaadcc960abf17e Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Wed, 14 Oct 2020 17:58:22 +0200 Subject: [PATCH 08/14] tests: fixing tests --- server/server_test.go | 57 ++++++++++++++++++- .../bigpanda/bigpandatest/bigpandatest.go | 3 +- services/bigpanda/service.go | 35 +++++------- 3 files changed, 70 insertions(+), 25 deletions(-) diff --git a/server/server_test.go b/server/server_test.go index d6fa4cf93..85c7a64dd 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -22,8 +22,6 @@ import ( "testing" "time" - "github.com/influxdata/kapacitor/services/discord/discordtest" - "github.com/davecgh/go-spew/spew" "github.com/dgrijalva/jwt-go" "github.com/google/go-cmp/cmp" @@ -39,6 +37,8 @@ import ( "github.com/influxdata/kapacitor/server" "github.com/influxdata/kapacitor/services/alert/alerttest" "github.com/influxdata/kapacitor/services/alerta/alertatest" + "github.com/influxdata/kapacitor/services/bigpanda/bigpandatest" + "github.com/influxdata/kapacitor/services/discord/discordtest" "github.com/influxdata/kapacitor/services/hipchat/hipchattest" "github.com/influxdata/kapacitor/services/httppost" "github.com/influxdata/kapacitor/services/httppost/httpposttest" @@ -8850,6 +8850,28 @@ func TestServer_ListServiceTests(t *testing.T) { "id": "", }, }, + { + Link: client.Link{Relation: client.Self, Href: "/kapacitor/v1/service-tests/bigpanda"}, + Name: "bigpanda", + Options: client.ServiceTestOptions{ + "app_key": "my-app-key-123456", + "level": "CRITICAL", + "message": "test bigpanda message", + "timestamp": "1970-01-01T00:00:01Z", + "event_data": map[string]interface{}{ + "Fields": map[string]interface{}{}, + "Result": map[string]interface{}{ + "series": interface{}(nil), + }, + "Name": "testBigPanda", + "TaskName": "", + "Group": "", + "Tags": map[string]interface{}{}, + "Recoverable": false, + "Category": "", + }, + }, + }, { Link: client.Link{Relation: "self", Href: "/kapacitor/v1/service-tests/consul"}, Name: "consul", @@ -9795,6 +9817,37 @@ func TestServer_AlertHandlers(t *testing.T) { return nil }, }, + { + handler: client.TopicHandler{ + Kind: "bigpanda", + Options: map[string]interface{}{ + "app-key": "my-app-key-123456", + }, + }, + setup: func(c *server.Config, ha *client.TopicHandler) (context.Context, error) { + ts := bigpandatest.NewServer() + ctxt := context.WithValue(nil, "server", ts) + + c.BigPanda.Enabled = true + c.BigPanda.Url = ts.URL + "/test/bigpanda/alert" + return ctxt, nil + }, + result: func(ctxt context.Context) error { + ts := ctxt.Value("server").(*bigpandatest.Server) + ts.Close() + got := ts.Requests() + exp := []bigpandatest.Request{{ + URL: "/test/bigpanda/alert", + PostData: bigpandatest.PostData{ + AppKey: "my-app-key-123456", + }, + }} + if !reflect.DeepEqual(exp, got) { + return fmt.Errorf("unexpected bigpanda request:\nexp\n%+v\ngot\n%+v\n", exp, got) + } + return nil + }, + }, { handler: client.TopicHandler{ Kind: "discord", diff --git a/services/bigpanda/bigpandatest/bigpandatest.go b/services/bigpanda/bigpandatest/bigpandatest.go index 6e0112e5d..ac43bd362 100644 --- a/services/bigpanda/bigpandatest/bigpandatest.go +++ b/services/bigpanda/bigpandatest/bigpandatest.go @@ -53,8 +53,7 @@ type Request struct { PostData PostData } -// PostData is the default struct to send an element through to PagerDuty - +// PostData is the default struct to send an element through to BigPanda type PostData struct { AppKey string `json:"app_key"` Status string `json:"status"` diff --git a/services/bigpanda/service.go b/services/bigpanda/service.go index 6888eb884..ec6cda3b9 100644 --- a/services/bigpanda/service.go +++ b/services/bigpanda/service.go @@ -81,27 +81,22 @@ func (s *Service) StateChangesOnly() bool { } type testOptions struct { - AppKey string `json:"app_key"` - AlertTopic string `json:"alert_topic"` - AlertID string `json:"alert_id"` - Message string `json:"message"` - Level alert.Level `json:"level"` - Data alert.EventData `json:"event_data"` - Timestamp time.Time `json:"timestamp"` + AppKey string `json:"app_key"` + Message string `json:"message"` + Level alert.Level `json:"level"` + Data alert.EventData `json:"event_data"` + Timestamp time.Time `json:"timestamp"` } func (s *Service) TestOptions() interface{} { - layout := "2006-01-02T15:04:05.000Z" - str := "2014-11-12T11:45:26.371Z" - t, _ := time.Parse(layout, str) + t, _ := time.Parse(time.RFC3339, "1970-01-01T00:00:01Z") return &testOptions{ - AlertTopic: "test kapacitor alert topic", - AlertID: "foo/bar/bat", - Message: "test teams message", - Level: alert.Critical, + AppKey: "my-app-key-123456", + Message: "test bigpanda message", + Level: alert.Critical, Data: alert.EventData{ - Name: "testBugPanda", + Name: "testBigPanda", Tags: make(map[string]string), Fields: make(map[string]interface{}), Result: models.Result{}, @@ -117,12 +112,12 @@ func (s *Service) Test(options interface{}) error { return fmt.Errorf("unexpected options type %T", options) } //c := s.config() - return s.Alert(o.AppKey, o.AlertTopic, o.AlertID, o.Message, o.Level, o.Timestamp, o.Data) + return s.Alert(o.AppKey, o.Message, o.Level, o.Timestamp, o.Data) } -func (s *Service) Alert(appKey, alertTopic, alertID, message string, level alert.Level, timestamp time.Time, data alert.EventData) error { +func (s *Service) Alert(appKey, message string, level alert.Level, timestamp time.Time, data alert.EventData) error { - req, err := s.preparePost(appKey, alertTopic, alertID, message, level, timestamp, data) + req, err := s.preparePost(appKey, message, level, timestamp, data) if err != nil { return err @@ -178,7 +173,7 @@ curl -X POST -H "Content-Type: application/json" \ "primary_property": "application", "secondary_property": "host" */ -func (s *Service) preparePost(appKey, alertTopic, alertID, message string, level alert.Level, timestamp time.Time, data alert.EventData) (*http.Request, error) { +func (s *Service) preparePost(appKey, message string, level alert.Level, timestamp time.Time, data alert.EventData) (*http.Request, error) { c := s.config() if !c.Enabled { return nil, errors.New("service is not enabled") @@ -256,8 +251,6 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, er func (h *handler) Handle(event alert.Event) { if err := h.s.Alert( h.c.AppKey, - event.Topic, - event.State.ID, event.State.Message, event.State.Level, event.State.Time, From 51864cad04031b34477b08ffe14509a41a64cbef Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Wed, 14 Oct 2020 18:40:39 +0200 Subject: [PATCH 09/14] tests: rename URL, redact token, fix tests --- integrations/streamer_test.go | 2 +- server/server_test.go | 4 +++- services/bigpanda/config.go | 12 ++++++++---- services/bigpanda/service.go | 2 +- 4 files changed, 13 insertions(+), 7 deletions(-) diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 878684868..2a62a5a24 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -9262,7 +9262,7 @@ stream c.Enabled = true c.AppKey = "XXXXXXX" c.Token = "testtoken1231234" - c.Url = ts.URL + "/test/bigpanda/url" + c.URL = ts.URL + "/test/bigpanda/url" d := diagService.NewBigPandaHandler().WithContext(keyvalue.KV("test", "111")) sl, err := bigpanda.NewService(c, d) diff --git a/server/server_test.go b/server/server_test.go index 85c7a64dd..097c2d561 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -9829,7 +9829,9 @@ func TestServer_AlertHandlers(t *testing.T) { ctxt := context.WithValue(nil, "server", ts) c.BigPanda.Enabled = true - c.BigPanda.Url = ts.URL + "/test/bigpanda/alert" + c.BigPanda.Token = "my-token-123" + c.BigPanda.AppKey = "my-app-key" + c.BigPanda.URL = ts.URL + "/test/bigpanda/alert" return ctxt, nil }, result: func(ctxt context.Context) error { diff --git a/services/bigpanda/config.go b/services/bigpanda/config.go index 5c14337cb..450cb3240 100644 --- a/services/bigpanda/config.go +++ b/services/bigpanda/config.go @@ -19,7 +19,7 @@ type Config struct { AppKey string `toml:"app-key" override:"app-key"` //Each integration must have an App Key in BigPanda to identify it as a unique source. - Token string `toml:"token" override:"token"` + Token string `toml:"token" override:"token,redact"` // Whether all alerts should automatically use stateChangesOnly mode. // Only applies if global is also set. @@ -29,18 +29,22 @@ type Config struct { InsecureSkipVerify bool `toml:"insecure-skip-verify" override:"insecure-skip-verify"` //Optional alert api URL, if not specified https://api.bigpanda.io/data/v2/alerts is used - Url string `toml:"url" override:"url"` + URL string `toml:"url" override:"url"` } func NewConfig() Config { return Config{ - Url: defaultBigPandaAlertApi, + URL: defaultBigPandaAlertApi, } } func (c Config) Validate() error { if c.Enabled && c.AppKey == "" { - return errors.New("must specify BigPanda AppKey") + return errors.New("must specify BigPanda app-key") } + if c.Enabled && c.Token == "" { + return errors.New("must specify BigPanda token") + } + return nil } diff --git a/services/bigpanda/service.go b/services/bigpanda/service.go index ec6cda3b9..306d23d42 100644 --- a/services/bigpanda/service.go +++ b/services/bigpanda/service.go @@ -215,7 +215,7 @@ func (s *Service) preparePost(appKey, message string, level alert.Level, timesta return nil, err } - alertUrl, err := url.Parse(c.Url) + alertUrl, err := url.Parse(c.URL) if err != nil { return nil, err } From ef4bb669be33e6f36cbc5f382729c4c86747917e Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Wed, 14 Oct 2020 19:12:17 +0200 Subject: [PATCH 10/14] tests: fix tests --- server/server_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/server/server_test.go b/server/server_test.go index 097c2d561..3ae678b1c 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -9841,7 +9841,10 @@ func TestServer_AlertHandlers(t *testing.T) { exp := []bigpandatest.Request{{ URL: "/test/bigpanda/alert", PostData: bigpandatest.PostData{ - AppKey: "my-app-key-123456", + AppKey: "my-app-key-123456", + Status: "critical", + Timestamp: "1970-01-01T00:00:00.000000000Z", + Check: "message", }, }} if !reflect.DeepEqual(exp, got) { From 9197d1897cd3cbf8ee8a566eaf60801acbe406f6 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Wed, 14 Oct 2020 20:13:54 +0200 Subject: [PATCH 11/14] feat: URL format validation --- services/bigpanda/config.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/services/bigpanda/config.go b/services/bigpanda/config.go index 450cb3240..92f31e2fc 100644 --- a/services/bigpanda/config.go +++ b/services/bigpanda/config.go @@ -2,6 +2,7 @@ package bigpanda import ( "github.com/pkg/errors" + "net/url" ) const ( @@ -42,6 +43,9 @@ func (c Config) Validate() error { if c.Enabled && c.AppKey == "" { return errors.New("must specify BigPanda app-key") } + if _, err := url.Parse(c.URL); err != nil { + return errors.Wrapf(err, "invalid url %q", c.URL) + } if c.Enabled && c.Token == "" { return errors.New("must specify BigPanda token") } From 4aa0a44d1ab0eddfd0f67f283ce71bbda8b73611 Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Thu, 12 Nov 2020 15:21:28 +0100 Subject: [PATCH 12/14] feat: convert timestamp to unix format --- integrations/streamer_test.go | 6 +++--- server/server_test.go | 2 +- services/bigpanda/bigpandatest/bigpandatest.go | 2 +- services/bigpanda/service.go | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 2a62a5a24..a4bbf74e2 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -9283,7 +9283,7 @@ stream AppKey: "111111", Status: "critical", Host: "serverA", - Timestamp: "1971-01-01T00:00:10.000000000Z", + Timestamp: 31536010, }, }, bigpandatest.Request{ @@ -9293,7 +9293,7 @@ stream AppKey: "222222", Status: "critical", Host: "serverA", - Timestamp: "1971-01-01T00:00:10.000000000Z", + Timestamp: 31536010, }, }, bigpandatest.Request{ @@ -9303,7 +9303,7 @@ stream AppKey: "XXXXXXX", Status: "critical", Host: "serverA", - Timestamp: "1971-01-01T00:00:10.000000000Z", + Timestamp: 31536010, }, }, } diff --git a/server/server_test.go b/server/server_test.go index 3ae678b1c..f85ef57d4 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -9843,7 +9843,7 @@ func TestServer_AlertHandlers(t *testing.T) { PostData: bigpandatest.PostData{ AppKey: "my-app-key-123456", Status: "critical", - Timestamp: "1970-01-01T00:00:00.000000000Z", + Timestamp: 0, Check: "message", }, }} diff --git a/services/bigpanda/bigpandatest/bigpandatest.go b/services/bigpanda/bigpandatest/bigpandatest.go index ac43bd362..9bfd4da26 100644 --- a/services/bigpanda/bigpandatest/bigpandatest.go +++ b/services/bigpanda/bigpandatest/bigpandatest.go @@ -58,7 +58,7 @@ type PostData struct { AppKey string `json:"app_key"` Status string `json:"status"` Host string `json:"host"` - Timestamp string `json:"timestamp"` + Timestamp int64 `json:"timestamp"` Check string `json:"check"` Description string `json:"description"` Cluster string `json:"cluster"` diff --git a/services/bigpanda/service.go b/services/bigpanda/service.go index 306d23d42..f1d8f32e8 100644 --- a/services/bigpanda/service.go +++ b/services/bigpanda/service.go @@ -195,7 +195,7 @@ func (s *Service) preparePost(appKey, message string, level alert.Level, timesta bpData := make(map[string]interface{}) bpData["check"] = message - bpData["timestamp"] = timestamp.Format("2006-01-02T15:04:05.000000000Z07:00") + bpData["timestamp"] = timestamp.Unix() bpData["status"] = status if appKey == "" { From 39a1f7849a143271fffc25c71d7aaf281354cf2d Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Mon, 16 Nov 2020 15:42:07 +0100 Subject: [PATCH 13/14] feat: added event details, task, use event ID as the check name --- integrations/streamer_test.go | 41 ++++++++++++------- server/server_test.go | 11 +++-- .../bigpanda/bigpandatest/bigpandatest.go | 2 + services/bigpanda/service.go | 32 ++++++++++++--- 4 files changed, 61 insertions(+), 25 deletions(-) diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index a4bbf74e2..0299a533f 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -9247,6 +9247,8 @@ stream |count('value') |alert() .id('kapacitor/{{ .Name }}/{{ index .Tags "host" }}') + .message('kapacitor/{{ .Name }}/{{ index .Tags "host" }} is {{ .Level }} @{{.Time}}') + .details('https://example.org/link') .info(lambda: "count" > 6.0) .warn(lambda: "count" > 7.0) .crit(lambda: "count" > 8.0) @@ -9279,31 +9281,40 @@ stream bigpandatest.Request{ URL: "/test/bigpanda/url", PostData: bigpandatest.PostData{ - Check: "kapacitor/cpu/serverA is CRITICAL", - AppKey: "111111", - Status: "critical", - Host: "serverA", - Timestamp: 31536010, + Check: "kapacitor/cpu/serverA", + Description: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC", + AppKey: "111111", + Status: "critical", + Host: "serverA", + Timestamp: 31536010, + Task: "TestStream_Alert:cpu", + Details: "https://example.org/link", }, }, bigpandatest.Request{ URL: "/test/bigpanda/url", PostData: bigpandatest.PostData{ - Check: "kapacitor/cpu/serverA is CRITICAL", - AppKey: "222222", - Status: "critical", - Host: "serverA", - Timestamp: 31536010, + Check: "kapacitor/cpu/serverA", + Description: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC", + AppKey: "222222", + Status: "critical", + Host: "serverA", + Timestamp: 31536010, + Task: "TestStream_Alert:cpu", + Details: "https://example.org/link", }, }, bigpandatest.Request{ URL: "/test/bigpanda/url", PostData: bigpandatest.PostData{ - Check: "kapacitor/cpu/serverA is CRITICAL", - AppKey: "XXXXXXX", - Status: "critical", - Host: "serverA", - Timestamp: 31536010, + Check: "kapacitor/cpu/serverA", + Description: "kapacitor/cpu/serverA is CRITICAL @1971-01-01 00:00:10 +0000 UTC", + AppKey: "XXXXXXX", + Status: "critical", + Host: "serverA", + Timestamp: 31536010, + Task: "TestStream_Alert:cpu", + Details: "https://example.org/link", }, }, } diff --git a/server/server_test.go b/server/server_test.go index f85ef57d4..279940aa2 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -9841,10 +9841,13 @@ func TestServer_AlertHandlers(t *testing.T) { exp := []bigpandatest.Request{{ URL: "/test/bigpanda/alert", PostData: bigpandatest.PostData{ - AppKey: "my-app-key-123456", - Status: "critical", - Timestamp: 0, - Check: "message", + AppKey: "my-app-key-123456", + Check: "id", + Status: "critical", + Timestamp: 0, + Task: "testAlertHandlers:alert", + Description: "message", + Details: "details", }, }} if !reflect.DeepEqual(exp, got) { diff --git a/services/bigpanda/bigpandatest/bigpandatest.go b/services/bigpanda/bigpandatest/bigpandatest.go index 9bfd4da26..1ce50358d 100644 --- a/services/bigpanda/bigpandatest/bigpandatest.go +++ b/services/bigpanda/bigpandatest/bigpandatest.go @@ -62,4 +62,6 @@ type PostData struct { Check string `json:"check"` Description string `json:"description"` Cluster string `json:"cluster"` + Task string `json:"task"` + Details string `json:"details"` } diff --git a/services/bigpanda/service.go b/services/bigpanda/service.go index f1d8f32e8..9a0146f65 100644 --- a/services/bigpanda/service.go +++ b/services/bigpanda/service.go @@ -7,9 +7,11 @@ import ( "fmt" khttp "github.com/influxdata/kapacitor/http" "github.com/influxdata/kapacitor/models" + "html" "io/ioutil" "net/http" "net/url" + "strings" "sync/atomic" "time" @@ -112,12 +114,11 @@ func (s *Service) Test(options interface{}) error { return fmt.Errorf("unexpected options type %T", options) } //c := s.config() - return s.Alert(o.AppKey, o.Message, o.Level, o.Timestamp, o.Data) + return s.Alert(o.AppKey, "", o.Message, "", o.Level, o.Timestamp, o.Data) } -func (s *Service) Alert(appKey, message string, level alert.Level, timestamp time.Time, data alert.EventData) error { - - req, err := s.preparePost(appKey, message, level, timestamp, data) +func (s *Service) Alert(appKey, id string, message string, details string, level alert.Level, timestamp time.Time, data alert.EventData) error { + req, err := s.preparePost(appKey, id, message, details, level, timestamp, data) if err != nil { return err @@ -173,7 +174,7 @@ curl -X POST -H "Content-Type: application/json" \ "primary_property": "application", "secondary_property": "host" */ -func (s *Service) preparePost(appKey, message string, level alert.Level, timestamp time.Time, data alert.EventData) (*http.Request, error) { +func (s *Service) preparePost(appKey, id string, message string, details string, level alert.Level, timestamp time.Time, data alert.EventData) (*http.Request, error) { c := s.config() if !c.Enabled { return nil, errors.New("service is not enabled") @@ -194,7 +195,24 @@ func (s *Service) preparePost(appKey, message string, level alert.Level, timesta } bpData := make(map[string]interface{}) - bpData["check"] = message + + if message != "" { + bpData["description"] = message + } + + //ignore default details containing full json event + if details != "" { + unescapeString := html.UnescapeString(details) + if !strings.HasPrefix(unescapeString, "{") { + bpData["details"] = unescapeString + } + } + + if id != "" { + bpData["check"] = id + } + + bpData["task"] = fmt.Sprintf("%s:%s", data.TaskName, data.Name) bpData["timestamp"] = timestamp.Unix() bpData["status"] = status @@ -251,7 +269,9 @@ func (s *Service) Handler(c HandlerConfig, ctx ...keyvalue.T) (alert.Handler, er func (h *handler) Handle(event alert.Event) { if err := h.s.Alert( h.c.AppKey, + event.State.ID, event.State.Message, + event.State.Details, event.State.Level, event.State.Time, event.Data, From 79dfe4c336119e7bea89f6120cddebac08a3bc3f Mon Sep 17 00:00:00 2001 From: Robert Hajek Date: Fri, 20 Nov 2020 14:18:01 +0100 Subject: [PATCH 14/14] feat: code cleanup --- services/bigpanda/service.go | 1 - 1 file changed, 1 deletion(-) diff --git a/services/bigpanda/service.go b/services/bigpanda/service.go index 9a0146f65..4c3923b48 100644 --- a/services/bigpanda/service.go +++ b/services/bigpanda/service.go @@ -113,7 +113,6 @@ func (s *Service) Test(options interface{}) error { if !ok { return fmt.Errorf("unexpected options type %T", options) } - //c := s.config() return s.Alert(o.AppKey, "", o.Message, "", o.Level, o.Timestamp, o.Data) }