From a6a0c277ec3e4b00751a5a71945338cfe2d4a353 Mon Sep 17 00:00:00 2001 From: "J. Emrys Landivar" Date: Thu, 3 Jun 2021 19:47:29 -0500 Subject: [PATCH] feat: Flux batch queries in TICKscripts (#2550) * feat: Flux batch queries in TICKscripts * chore: remove unused code * fix: split generic parser and kapacitor specific pieces * chore: remove unnecessary commented out import * chore: cleanup * chore: more cleanup * fix: HTTPClient.QueryFluxResponse should actually do what it is supposed to * fix: ResultToBufferedBatches should be able to handle time.Time type Co-authored-by: Sam Arnold --- batch.go | 293 +++++++++++++++++- cmd/kapacitor/main.go | 14 +- edge/edge.go | 1 + edge/messages.go | 22 +- fluxquery.go | 35 +++ influxdb/client.go | 16 +- influxdb/csv.go | 161 ++++++++++ influxdb/csv_test.go | 151 +++++++++ influxdb/event_parser.go | 163 ++++++++++ integrations/batcher_test.go | 49 ++- integrations/helpers_test.go | 8 + .../testdata/TestBatch_FluxQuery.0.brpl | 12 + pipeline/batch.go | 156 +++++++++- pipeline/json.go | 15 +- pipeline/tick/ast.go | 2 + pipeline/tick/batch_test.go | 16 + pipeline/tick/queryflux.go | 34 ++ pipeline/tick/queryflux_test.go | 38 +++ pipeline/tick/tick_test.go | 7 + query_test.go | 1 + services/ec2/client/client.go | 4 - services/influxdb/service_test.go | 44 ++- services/task_store/service.go | 1 - task.go | 3 +- task_master.go | 1 - tools.go | 2 + 26 files changed, 1185 insertions(+), 64 deletions(-) create mode 100644 fluxquery.go create mode 100644 influxdb/csv.go create mode 100644 influxdb/csv_test.go create mode 100644 influxdb/event_parser.go create mode 100644 integrations/testdata/TestBatch_FluxQuery.0.brpl create mode 100644 pipeline/tick/queryflux.go create mode 100644 pipeline/tick/queryflux_test.go diff --git a/batch.go b/batch.go index 967a9d22c..aa609e637 100644 --- a/batch.go +++ b/batch.go @@ -66,11 +66,20 @@ func (n *BatchNode) Wait() error { func (n *BatchNode) DBRPs() ([]DBRP, error) { var dbrps []DBRP for _, b := range n.children { - d, err := b.(*QueryNode).DBRPs() - if err != nil { - return nil, err + switch b := b.(type) { + case *QueryNode: + if b != nil { + d, err := b.DBRPs() + if err != nil { + return nil, err + } + dbrps = append(dbrps, d...) + } + case *FluxQueryNode: + // flux queries don't really have DBRPs + default: + panic("BatchNode shouldn't be followed by anything except QueryNode or QueryFluxNode") } - dbrps = append(dbrps, d...) } return dbrps, nil } @@ -81,35 +90,66 @@ func (n *BatchNode) Count() int { func (n *BatchNode) Start() { for _, b := range n.children { - b.(*QueryNode).Start() + switch b := b.(type) { + case *QueryNode: + b.Start() + case *FluxQueryNode: + b.Start() + default: + panic("BatchNode shouldn't be followed by anything except QueryNode or QueryFluxNode") + } } } func (n *BatchNode) Abort() { for _, b := range n.children { - b.(*QueryNode).Abort() + switch b := b.(type) { + case *QueryNode: + b.Abort() + case *FluxQueryNode: + b.Abort() + default: + panic("BatchNode shouldn't be followed by anything except QueryNode or QueryFluxNode") + } } } type BatchQueries struct { Queries []*Query + FluxQueries []*QueryFlux Cluster string GroupByMeasurement bool } func (n *BatchNode) Queries(start, stop time.Time) ([]BatchQueries, error) { queries := make([]BatchQueries, len(n.children)) - for i, b := range n.children { - qn := b.(*QueryNode) - qs, err := qn.Queries(start, stop) - if err != nil { - return nil, err - } - queries[i] = BatchQueries{ - Queries: qs, - Cluster: qn.Cluster(), - GroupByMeasurement: qn.GroupByMeasurement(), + for i, qn := range n.children { + switch qn := qn.(type) { + case *QueryNode: + qs, err := qn.Queries(start, stop) + if err != nil { + return nil, err + } + queries[i] = BatchQueries{ + Queries: qs, + Cluster: qn.Cluster(), + GroupByMeasurement: qn.GroupByMeasurement(), + } + + case *FluxQueryNode: + qs, err := qn.Queries(start, stop) + if err != nil { + return nil, err + } + queries[i] = BatchQueries{ + FluxQueries: qs, + Cluster: qn.Cluster(), + } + + default: + panic("BatchNode shouldn't be followed by anything except QueryNode or QueryFluxNode") } + } return queries, nil } @@ -522,3 +562,224 @@ func (c *cronTicker) Stop() { func (c *cronTicker) Next(now time.Time) time.Time { return c.expr.Next(now) } + +// FluxQueryNode is a node for making flux queries +type FluxQueryNode struct { + node + b *pipeline.QueryFluxNode + query *QueryFlux + ticker ticker + queryMu sync.Mutex + queryErr chan error + closing chan struct{} + aborting chan struct{} + + batchesQueried *expvar.Int + pointsQueried *expvar.Int + byName bool +} + +func newQueryFluxNode(et *ExecutingTask, n *pipeline.QueryFluxNode, d NodeDiagnostic) (*FluxQueryNode, error) { + bn := &FluxQueryNode{ + node: node{Node: n, et: et, diag: d}, + b: n, + closing: make(chan struct{}), + aborting: make(chan struct{}), + } + bn.node.runF = bn.runBatch + bn.node.stopF = bn.stopBatch + + // Create query + q, err := NewQueryFlux(n.QueryStr, n.Org, n.OrgID) + if err != nil { + return nil, err + } + bn.query = q + // Determine schedule + if n.Every != 0 && n.Cron != "" { + return nil, errors.New("must not set both 'every' and 'cron' properties") + } + switch { + case n.Every > 0: + bn.ticker = newTimeTicker(n.Every, n.AlignFlag) + case n.Cron != "": + var err error + bn.ticker, err = newCronTicker(n.Cron) + if err != nil { + return nil, err + } + case n.Every < 0: + return nil, errors.New("'every' duration must must non-negative") + default: + return nil, errors.New("must define one of 'every' or 'cron'") + } + + return bn, nil +} + +func (n *FluxQueryNode) Start() { + n.queryMu.Lock() + defer n.queryMu.Unlock() + n.queryErr = make(chan error, 1) + go func() { + n.queryErr <- n.doQuery(n.ins[0]) + }() +} + +func (n *FluxQueryNode) Abort() { + close(n.aborting) +} + +func (n *FluxQueryNode) Cluster() string { + return n.b.Cluster +} + +func (n *FluxQueryNode) Queries(start, stop time.Time) ([]*QueryFlux, error) { + now := time.Now() + if stop.IsZero() { + stop = now + } + // Crons are sensitive to timezones. + // Make sure we are using local time. + current := start.Local() + queries := make([]*QueryFlux, 0) + for { + current = n.ticker.Next(current) + if current.IsZero() || current.After(stop) { + break + } + qstop := current.Add(-1 * n.b.Offset) + if qstop.After(now) { + break + } + + q, err := n.query.Clone() + if err != nil { + return nil, err + } + q.Now = now + queries = append(queries, q) + } + return queries, nil +} + +// Query InfluxDB and collect batches on batch collector. +func (n *FluxQueryNode) doQuery(in edge.Edge) (err error) { + defer in.Close() + n.batchesQueried = &expvar.Int{} + n.pointsQueried = &expvar.Int{} + + n.statMap.Set(statsBatchesQueried, n.batchesQueried) + n.statMap.Set(statsPointsQueried, n.pointsQueried) + + if n.et.tm.InfluxDBService == nil { + return errors.New("InfluxDB not configured, cannot query InfluxDB for batch query") + } + + con, err := n.et.tm.InfluxDBService.NewNamedClient(n.b.Cluster) + if err != nil { + return errors.Wrap(err, "failed to get InfluxDB client") + } + tickC := n.ticker.Start() + for { + select { + case <-n.closing: + return nil + case <-n.aborting: + return errors.New("batch doQuery aborted") + case now := <-tickC: + n.timer.Start() + // Update times for query + n.query.Now = now.Add(-1 * n.b.Offset) //SetStartTime(stop.Add(-1 * n.b.Period)) + n.diag.StartingBatchQuery(n.query.stmt) + + // Execute query + resp, err := con.QueryFluxResponse(influxdb.FluxQuery{ + Query: n.query.stmt, + Org: n.query.org, + OrgID: n.query.orgID, + Now: n.query.Now, + }) + if err != nil { + n.diag.Error("error executing query", err) + n.timer.Stop() + break + } + //Collect batches + for _, res := range resp.Results { + batches, err := edge.ResultToBufferedBatches(res, n.byName) + if err != nil { + n.diag.Error("failed to understand query result", err) + continue + } + for _, bch := range batches { + // Set stop time based off query bounds + if bch.Begin().Time().IsZero() { + bch.Begin().SetTime(now) + } + n.batchesQueried.Add(1) + n.pointsQueried.Add(int64(len(bch.Points()))) + + n.timer.Pause() + if err := in.Collect(bch); err != nil { + return err + } + n.timer.Resume() + } + } + n.timer.Stop() + } + } +} + +func (n *FluxQueryNode) runBatch([]byte) error { + errC := make(chan error, 1) + go func() { + defer func() { + err := recover() + if err != nil { + errC <- fmt.Errorf("%v", err) + } + }() + for bt, ok := n.ins[0].Emit(); ok; bt, ok = n.ins[0].Emit() { + for _, child := range n.outs { + err := child.Collect(bt) + if err != nil { + errC <- err + return + } + } + } + errC <- nil + }() + var queryErr error + n.queryMu.Lock() + if n.queryErr != nil { + n.queryMu.Unlock() + select { + case queryErr = <-n.queryErr: + case <-n.aborting: + queryErr = errors.New("batch queryErr aborted") + } + } else { + n.queryMu.Unlock() + } + + var err error + select { + case err = <-errC: + case <-n.aborting: + err = errors.New("batch run aborted") + } + if queryErr != nil { + return queryErr + } + return err +} + +func (n *FluxQueryNode) stopBatch() { + if n.ticker != nil { + n.ticker.Stop() + } + close(n.closing) +} diff --git a/cmd/kapacitor/main.go b/cmd/kapacitor/main.go index 071fd8e06..d3bd68d78 100644 --- a/cmd/kapacitor/main.go +++ b/cmd/kapacitor/main.go @@ -348,11 +348,12 @@ var ( rbId = recordBatchFlags.String("recording-id", "", "The ID to give to this recording. If not set an random ID is chosen.") recordQueryFlags = flag.NewFlagSet("record-query", flag.ExitOnError) - rqQuery = recordQueryFlags.String("query", "", "The query to record.") - rqType = recordQueryFlags.String("type", "", "The type of the recording to save (stream|batch).") - rqCluster = recordQueryFlags.String("cluster", "", "Optional named InfluxDB cluster from configuration.") - rqNowait = recordQueryFlags.Bool("no-wait", false, "Do not wait for the recording to finish.") - rqId = recordQueryFlags.String("recording-id", "", "The ID to give to this recording. If not set an random ID is chosen.") + // TODO: queryFlux recording + rqQuery = recordQueryFlags.String("query", "", "The query to record.") + rqType = recordQueryFlags.String("type", "", "The type of the recording to save (stream|batch).") + rqCluster = recordQueryFlags.String("cluster", "", "Optional named InfluxDB cluster from configuration.") + rqNowait = recordQueryFlags.Bool("no-wait", false, "Do not wait for the recording to finish.") + rqId = recordQueryFlags.String("recording-id", "", "The ID to give to this recording. If not set an random ID is chosen.") ) func recordUsage() { @@ -539,7 +540,6 @@ func doRecord(args []string) error { return fmt.Errorf("Unknown record type %q, expected 'stream', 'batch' or 'query'", args[0]) } if noWait { - fmt.Println(recording.ID) return nil } for recording.Status == client.Running { @@ -1041,7 +1041,6 @@ func doReplay(args []string) error { return err } if *rnowait { - fmt.Println(replay.ID) return nil } for replay.Status == client.Running { @@ -1225,7 +1224,6 @@ func doReplayLive(args []string) error { return fmt.Errorf("Unknown replay-live type %q, expected 'batch' or 'query'", args[0]) } if noWait { - fmt.Println(replay.ID) return nil } for replay.Status == client.Running { diff --git a/edge/edge.go b/edge/edge.go index 837785319..7df5de905 100644 --- a/edge/edge.go +++ b/edge/edge.go @@ -64,6 +64,7 @@ func (e *channelEdge) Collect(m Message) error { } func (e *channelEdge) Emit() (m Message, ok bool) { + // locked here select { case m, ok = <-e.messages: case <-e.aborting: diff --git a/edge/messages.go b/edge/messages.go index 8f0c79d43..85e261c0a 100644 --- a/edge/messages.go +++ b/edge/messages.go @@ -819,17 +819,21 @@ func ResultToBufferedBatches(res influxdb.Result, groupByName bool) ([]BufferedB var t time.Time for i, c := range series.Columns { if c == "time" { - tStr, ok := v[i].(string) - if !ok { - return nil, fmt.Errorf("unexpected time value: %v", v[i]) - } - var err error - t, err = time.Parse(time.RFC3339Nano, tStr) - if err != nil { - t, err = time.Parse(time.RFC3339, tStr) + //tStr, ok := v[i].(string) + switch ts := v[i].(type) { + case string: + var err error + t, err = time.Parse(time.RFC3339Nano, ts) if err != nil { - return nil, fmt.Errorf("unexpected time format: %v", err) + t, err = time.Parse(time.RFC3339, ts) + if err != nil { + return nil, fmt.Errorf("unexpected time format: %v", err) + } } + case time.Time: + t = ts + default: + return nil, fmt.Errorf("unexpected time value: %v", v[i]) } } else { value := v[i] diff --git a/fluxquery.go b/fluxquery.go new file mode 100644 index 000000000..50399a190 --- /dev/null +++ b/fluxquery.go @@ -0,0 +1,35 @@ +package kapacitor + +import ( + "sync" + "time" +) + +type QueryFlux struct { + org string + orgID string + stmt string + queryMu sync.Mutex + Now time.Time +} + +func NewQueryFlux(queryString, org, orgID string) (*QueryFlux, error) { + return &QueryFlux{org: org, orgID: orgID, stmt: queryString}, nil +} + +// Deep clone this query +func (q *QueryFlux) Clone() (*QueryFlux, error) { + q.queryMu.Lock() + defer q.queryMu.Unlock() + return &QueryFlux{ + stmt: q.stmt, + org: q.org, + orgID: q.orgID, + Now: q.Now, + }, nil + +} + +func (q *QueryFlux) String() string { + return q.stmt +} diff --git a/influxdb/client.go b/influxdb/client.go index 600be698b..9435277e3 100644 --- a/influxdb/client.go +++ b/influxdb/client.go @@ -45,6 +45,12 @@ type Client interface { // The response is checked for an error and the is returned // if it exists QueryFlux(q FluxQuery) (flux.ResultIterator, error) + + // QueryFlux is for querying Influxdb with the Flux language + // The response is checked for an error and the is returned + // if it exists. Unlike QueryFlux, this returns a *Response + // object. + QueryFluxResponse(q FluxQuery) (*Response, error) } type ClientUpdater interface { @@ -618,11 +624,17 @@ func (c *HTTPClient) QueryFluxResponse(q FluxQuery) (*Response, error) { if err != nil { return nil, err } - _, err = c.doFlux(req, http.StatusOK) + reader, err := c.doFlux(req, http.StatusOK) + if err != nil { + return nil, err + } + + resp, err := NewFluxQueryResponse(reader) if err != nil { return nil, err } - panic("flux kapacitor response parsing not implemented") + return resp, nil + } func (c *HTTPClient) QueryFlux(q FluxQuery) (flux.ResultIterator, error) { diff --git a/influxdb/csv.go b/influxdb/csv.go new file mode 100644 index 000000000..2cce13e8f --- /dev/null +++ b/influxdb/csv.go @@ -0,0 +1,161 @@ +package influxdb + +import ( + "fmt" + "io" + "strconv" + "time" + + imodels "github.com/influxdata/influxdb/models" + "github.com/influxdata/kapacitor/models" + "github.com/pkg/errors" +) + +func NewFluxQueryResponse(r io.Reader) (*Response, error) { + builder := responseBuilder{} + err := NewFluxCSVEventParser(r, &builder).Parse() + if err != nil { + return nil, err + } + if builder.Err != nil { + return nil, builder.Err + } + return &Response{Results: []Result{{Series: builder.buf}}}, nil +} + +// queryCSVResult is the result of a flux query in CSV format +// it assumes a csv dialect with +// Annotations: []string{"datatype", "group"}, +// Delimiter: ",", +// Header: true, +type responseBuilder struct { + colNames []string + colNamesMap map[string]int + tags []int + measurementCol int + fields []int + defaultVals []string + Err error + buf []imodels.Row + seriesBuf *imodels.Row +} + +func (q *responseBuilder) TableStart(meta FluxTableMetaData, firstRow []string) { + if q.Err != nil { + return + } + q.seriesBuf = &imodels.Row{} + tags := make(models.Tags, len(firstRow)) + // add the tags from the row + for _, i := range q.tags { + tags[q.colNames[i]] = firstRow[i] + } + // add the column names from the row + for _, i := range q.fields { + q.seriesBuf.Columns = append(q.seriesBuf.Columns, q.colNames[i]) + } + q.seriesBuf.Tags = tags + if i, ok := q.colNamesMap["_measurement"]; ok { + q.seriesBuf.Name = firstRow[i] + } +} + +func (q *responseBuilder) TableEnd() { + if q.Err != nil { + return + } + if q.seriesBuf != nil { + q.buf = append(q.buf, *q.seriesBuf) + } + q.seriesBuf = nil +} + +func (q *responseBuilder) Error(err string) { + if q.Err != nil { + return + } + q.Err = errors.New("flux query error: " + err) +} + +func (q *responseBuilder) GroupStart(names []string, types []string, groups []bool) { + if q.Err != nil { + return + } + q.colNames = q.colNames[:0] + // replace "_time" that flux uses with "time" + for i := range names { + if names[i] == "_time" { + q.colNames = append(q.colNames, "time") + } else { + q.colNames = append(q.colNames, names[i]) + } + } + q.colNamesMap = make(map[string]int, len(q.colNames)) + q.tags = q.tags[:0] + q.fields = q.fields[:0] + for i := range q.colNames { + cn := q.colNames[i] + q.colNamesMap[cn] = i + switch cn { + case "_measurement", "_start", "_stop": + default: // its a tag or a field + if !groups[i] { // its a fields + q.fields = append(q.fields, i) + } else { + q.tags = append(q.tags, i) + } + } + } +} + +func (q *responseBuilder) DataRow(meta FluxTableMetaData, row []string) { + if q.Err != nil { + return + } + // Add the field values for the row + values := make([]interface{}, 0, len(q.fields)) + for _, i := range q.fields { + var err error + val, err := q.convert(meta.DataTypes[i], row[i]) + if err != nil { + q.Err = err + return + } + values = append(values, val) + } + q.seriesBuf.Values = append(q.seriesBuf.Values, values) +} + +// if it is part of the group key and not a time value named either _time, _start, or _stop + +func (q *responseBuilder) convert(dataType, value string) (interface{}, error) { + const ( + stringDatatype = "string" + timeDatatype = "dateTime" + floatDatatype = "double" + boolDatatype = "boolean" + intDatatype = "long" + uintDatatype = "unsignedLong" + timeDataTypeWithFmt = "dateTime:RFC3339" + ) + s := value + switch dataType { + case stringDatatype: + return s, nil + case timeDatatype, timeDataTypeWithFmt: + return time.Parse(time.RFC3339, s) + case floatDatatype: + return strconv.ParseFloat(s, 64) + case boolDatatype: + if s == "false" { + return false, nil + } + return true, nil + case intDatatype: + return strconv.ParseInt(s, 10, 64) + case uintDatatype: + return strconv.ParseUint(s, 10, 64) + default: + return nil, fmt.Errorf("%s has unknown data type %s", s, dataType) + } +} diff --git a/influxdb/csv_test.go b/influxdb/csv_test.go new file mode 100644 index 000000000..ad54b8c49 --- /dev/null +++ b/influxdb/csv_test.go @@ -0,0 +1,151 @@ +package influxdb + +import ( + "bytes" + "io/ioutil" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + imodels "github.com/influxdata/influxdb/models" +) + +func mustParseTime(s string) time.Time { + ts, err := time.Parse(time.RFC3339, s) + if err != nil { + panic(err) + } + return ts +} +func Test_FluxCSV(t *testing.T) { + x := ioutil.NopCloser(bytes.NewBufferString(` +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string +#group,false,false,true,true,false,false,true,true +,result,table,_start,_stop,_time,_value,_field,_measurement +,_result,0,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:14.5346632Z,45,counter,boltdb_reads_total +,_result,0,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:24.536408365Z,47,counter,boltdb_reads_total +,_result,0,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:34.536392986Z,49,counter,boltdb_reads_total +,_result,1,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:14.5346632Z,24,counter,boltdb_writes_total +,_result,1,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:24.536408365Z,24,counter,boltdb_writes_total +,_result,1,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:34.536392986Z,24,counter,boltdb_writes_total + +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string,string +#group,false,false,true,true,false,false,true,true,true +,result,table,_start,_stop,_time,_value,_field,_measurement,version +,_result,10,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:14.5346632Z,1,gauge,go_info,go1.15.2 +,_result,10,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:24.536408365Z,1,gauge,go_info,go1.15.2 +,_result,10,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:34.536392986Z,1,gauge,go_info,go1.15.2 + +#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string +#group,false,false,true,true,false,false,true,true +,result,table,_start,_stop,_time,_value,_field,_measurement +,_result,11,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:14.5346632Z,26004408,gauge,go_memstats_alloc_bytes +,_result,11,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:24.536408365Z,27478576,gauge,go_memstats_alloc_bytes +,_result,11,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:34.536392986Z,28957096,gauge,go_memstats_alloc_bytes +,_result,11,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:44.534859397Z,30440472,gauge,go_memstats_alloc_bytes +,_result,12,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:14.5346632Z,137883344,counter,go_memstats_alloc_bytes_total +,_result,12,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:24.536408365Z,139357512,counter,go_memstats_alloc_bytes_total +,_result,12,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:34.536392986Z,140836032,counter,go_memstats_alloc_bytes_total +,_result,12,2021-04-22T16:11:09.536898692Z,2021-04-22T16:13:09.536898692Z,2021-04-22T16:11:44.534859397Z,142319408,counter,go_memstats_alloc_bytes_total + +`)) + + res, err := NewFluxQueryResponse(x) + if err != nil { + t.Fatal(err) + } + expected := Response{ + Results: []Result{ + { + Series: []imodels.Row{ + { + Name: "boltdb_reads_total", + Tags: map[string]string{"_field": "counter"}, + Columns: []string{"time", "_value"}, + Values: [][]interface{}{ + {mustParseTime("2021-04-22T16:11:14.5346632Z"), float64(45)}, + {mustParseTime("2021-04-22T16:11:24.536408365Z"), float64(47)}, + {mustParseTime("2021-04-22T16:11:34.536392986Z"), float64(49)}, + }, + }, + { + Name: "boltdb_writes_total", + Tags: map[string]string{"_field": "counter"}, + Columns: []string{"time", "_value"}, + Values: [][]interface{}{ + {mustParseTime("2021-04-22T16:11:14.5346632Z"), float64(24)}, + {mustParseTime("2021-04-22T16:11:24.536408365Z"), float64(24)}, + {mustParseTime("2021-04-22T16:11:34.536392986Z"), float64(24)}, + }, + }, + { + Name: "go_info", + Tags: map[string]string{"_field": "gauge", "version": "go1.15.2"}, + Columns: []string{"time", "_value"}, + Values: [][]interface{}{ + {mustParseTime("2021-04-22T16:11:14.5346632Z"), float64(1)}, + {mustParseTime("2021-04-22T16:11:24.536408365Z"), float64(1)}, + {mustParseTime("2021-04-22T16:11:34.536392986Z"), float64(1)}, + }, + }, + { + Name: "go_memstats_alloc_bytes", + Tags: map[string]string{"_field": "gauge"}, + Columns: []string{"time", "_value"}, + Values: [][]interface{}{ + {mustParseTime("2021-04-22T16:11:14.5346632Z"), float64(2.6004408e+07)}, + {mustParseTime("2021-04-22T16:11:24.536408365Z"), float64(2.7478576e+07)}, + {mustParseTime("2021-04-22T16:11:34.536392986Z"), float64(2.8957096e+07)}, + {mustParseTime("2021-04-22T16:11:44.534859397Z"), float64(3.0440472e+07)}, + }, + }, + { + Name: "go_memstats_alloc_bytes_total", + Tags: map[string]string{"_field": "counter"}, + Columns: []string{"time", "_value"}, + Values: [][]interface{}{ + {mustParseTime("2021-04-22T16:11:14.5346632Z"), float64(137883344)}, + {mustParseTime("2021-04-22T16:11:24.536408365Z"), float64(139357512)}, + {mustParseTime("2021-04-22T16:11:34.536392986Z"), float64(140836032)}, + {mustParseTime("2021-04-22T16:11:44.534859397Z"), float64(142319408)}, + }, + }, + }, + }, + }, + } + if !cmp.Equal(res, &expected) { + t.Fatal(cmp.Diff(res, &expected)) + } +} +func Test_FluxCSV_Empty(t *testing.T) { + data := ioutil.NopCloser(bytes.NewBufferString(`#datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,string,string,double +#group,false,false,true,true,false,true,true,false +#default,_result,0,2018-04-17T00:00:00Z,2018-04-17T00:05:00Z,,cpu,A, +,result,table,_start,_stop,_time,_measurement,host,_value`)) + + res, err := NewFluxQueryResponse(data) + if err != nil { + t.Fatal(err) + } + + expected := Response{Results: []Result{{}}} + if !cmp.Equal(res, &expected) { + t.Fatal(cmp.Diff(res, &expected)) + } +} + +func Test_FluxCSV_Error(t *testing.T) { + data := ioutil.NopCloser(bytes.NewBufferString(`#datatype,string,string +#group,true,true +#default,, +,error,reference +,here is an error,`)) + + _, err := NewFluxQueryResponse(data) + exp := "flux query error: here is an error" + if err.Error() != exp { + t.Fatalf("Expected error '%s', but got '%s'", exp, err.Error()) + } + +} diff --git a/influxdb/event_parser.go b/influxdb/event_parser.go new file mode 100644 index 000000000..27093063c --- /dev/null +++ b/influxdb/event_parser.go @@ -0,0 +1,163 @@ +package influxdb + +import ( + "encoding/csv" + "io" + + "github.com/pkg/errors" +) + +type FluxTableMetaData struct { + TableNumber string + DataTypes []string + ColumnNames []string + Groups []bool +} + +type FluxCSVEventHandler interface { + // Error event represents a ',error' table in the flux result, which is special + Error(err string) + + // GroupStart gives the metadata for a new group of tables + GroupStart(names []string, types []string, groups []bool) + + // TableStart marks the start of a table + TableStart(meta FluxTableMetaData, firstRow []string) + + // DataRow is called for each regular data row + DataRow(meta FluxTableMetaData, row []string) + + // TableEnd marks the end of a table + TableEnd() +} + +// FluxCSVEventParser is an event-based parser for flux csv +// it assumes a csv dialect with +// Annotations: []string{"datatype", "group"}, +// Delimiter: ",", +// Header: true, +type FluxCSVEventParser struct { + handler FluxCSVEventHandler + csvReader *csv.Reader + row []string + colNames []string + dataTypes []string + group []bool + tableNum string + sawTableStart bool +} + +func NewFluxCSVEventParser(r io.Reader, handler FluxCSVEventHandler) *FluxCSVEventParser { + q := &FluxCSVEventParser{ + handler: handler, + csvReader: csv.NewReader(r), + tableNum: "0", + } + q.csvReader.FieldsPerRecord = -1 + q.csvReader.ReuseRecord = true + return q +} + +const skipNonDataFluxCols = 3 + +func (q *FluxCSVEventParser) Parse() error { + const ( + stateOtherRow = iota + stateNameRow + stateFirstDataRow + ) + state := stateOtherRow +readRow: + var err error + q.row, err = q.csvReader.Read() + if err != nil { + if err == io.EOF { + if q.sawTableStart { + q.handler.TableEnd() + } + return nil + } + return errors.Wrap(err, "unexpected error while finding next table") + } + + // check and process error tables + if len(q.row) > 2 && q.row[1] == "error" { + // note the call to Read invalidates the data in q.row but not the size + row, err := q.csvReader.Read() + if err != nil || len(row) != len(q.row) { + if err == io.EOF { + return errors.Wrap(err, "unexpected EOF in query tables") + } else if err == nil && len(row) != len(q.row) { + return errors.Wrap(err, "invalid query data") + } + return errors.Wrap(err, "failed to read error value") + } + q.handler.Error(row[1]) + return nil + } + + // skip short rows, this is so we can skip blank lines + if len(q.row) < skipNonDataFluxCols { + goto readRow + } + // processes based on first column + switch q.row[0] { + case "": + if len(q.row) <= 5 { + return errors.New("Unexpectedly few columns") + } + if state == stateNameRow { + newNames := q.row[skipNonDataFluxCols:] + if cap(q.colNames) < len(newNames) { + q.colNames = make([]string, len(newNames)) + } else { + q.colNames = q.colNames[:len(newNames)] + } + copy(q.colNames, newNames) + q.handler.GroupStart(q.colNames, q.dataTypes, q.group) + state = stateFirstDataRow + goto readRow + } + if q.tableNum != q.row[2] { // we have moved on to a new table + state = stateFirstDataRow + q.tableNum = q.row[2] + } + meta := FluxTableMetaData{ + TableNumber: q.tableNum, + DataTypes: q.dataTypes, + ColumnNames: q.colNames, + Groups: q.group, + } + if state == stateFirstDataRow { + if q.sawTableStart { + q.handler.TableEnd() + } + q.sawTableStart = true + q.handler.TableStart(meta, q.row[skipNonDataFluxCols:]) + q.handler.DataRow(meta, q.row[skipNonDataFluxCols:]) + state = stateOtherRow + goto readRow + } + q.handler.DataRow(meta, q.row[skipNonDataFluxCols:]) + goto readRow + case "#datatype": + newTypes := q.row[skipNonDataFluxCols:] + if cap(q.dataTypes) < len(newTypes) { + q.dataTypes = make([]string, len(newTypes)) + } else { + q.dataTypes = q.dataTypes[:len(newTypes)] + } + copy(q.dataTypes, newTypes) + goto readRow + case "#group": + q.group = q.group[:0] + for _, x := range q.row[skipNonDataFluxCols:] { + q.group = append(q.group, x == "true") + } + state = stateNameRow + goto readRow + default: + // if the first column isn't empty, and it isn't #datatype or #group or data row and we don't need it + goto readRow + } +} diff --git a/integrations/batcher_test.go b/integrations/batcher_test.go index 2e2c77dc6..a1cf93c36 100644 --- a/integrations/batcher_test.go +++ b/integrations/batcher_test.go @@ -19,6 +19,48 @@ import ( "github.com/influxdata/wlog" ) +func TestBatch_Flux(t *testing.T) { + testCases := []struct { + script string + name string + expected models.Result + }{ + { + name: "test1", + script: `batch|queryFlux('from(bucket:"example-bucket") +|> range(start:-1h) +|> filter(fn:(r) => +r._measurement == "cpu" and +r.cpu == "cpu-total" +) +|> aggregateWindow(every: 1m, fn: mean)') + .every(1s) +|httpOut('TestBatch_FluxQuery') +`, + expected: models.Result{ + Series: models.Rows{ + { + Name: "yeas", + Tags: map[string]string{"vote": "should we orange juice"}, + Columns: []string{"time", "value"}, + Values: [][]interface{}{ + {mustParseTime("1971-01-01T00:00:00Z"), "yea"}, + {mustParseTime("1971-01-01T00:00:02Z"), "nay"}, + {mustParseTime("1971-01-01T00:00:04Z"), "yea"}, + {mustParseTime("1971-01-01T00:00:05Z"), "yea"}, + {mustParseTime("1971-01-01T00:00:06Z"), "nay"}, + {mustParseTime("1971-01-01T00:00:08Z"), "yea"}, + }, + }, + }, + }, + }, + } + for _, tc := range testCases { + testBatcherWithOutput(t, "TestBatch_FluxQuery", tc.script, 21*time.Second, tc.expected, false) + } +} + func TestBatch_InvalidQuery(t *testing.T) { // Create a new execution env @@ -58,13 +100,6 @@ func TestBatch_InvalidQuery(t *testing.T) { } } -//{"name":"packets","points":[ -// {"fields":{"value":"bad"},"time":"2015-10-18T00:00:00Z"}, -// {"fields":{"value":"good"},"time":"2015-10-18T00:00:02Z"}, -// {"fields":{"value":"good"},"time":"2015-10-18T00:00:04Z"}, -// {"fields":{"value2":"good"},"time":"2015-10-18T00:00:05Z"}, -// {"fields":{"value":"bad"},"time":"2015-10-18T00:00:06Z"}, -// {"fields":{"value":"good"},"time":"2015-10-18T00:00:08Z"}]} func TestBatch_ChangeDetect(t *testing.T) { var script = ` diff --git a/integrations/helpers_test.go b/integrations/helpers_test.go index f3f016dab..9402b6157 100644 --- a/integrations/helpers_test.go +++ b/integrations/helpers_test.go @@ -20,6 +20,14 @@ import ( "github.com/influxdata/kapacitor/uuid" ) +func mustParseTime(s string) time.Time { + ts, err := time.Parse(time.RFC3339, s) + if err != nil { + panic(err) + } + return ts +} + func newHTTPDService() *httpd.Service { // create API server config := httpd.NewConfig() diff --git a/integrations/testdata/TestBatch_FluxQuery.0.brpl b/integrations/testdata/TestBatch_FluxQuery.0.brpl new file mode 100644 index 000000000..c0ab98f79 --- /dev/null +++ b/integrations/testdata/TestBatch_FluxQuery.0.brpl @@ -0,0 +1,12 @@ +{ + "name":"yeas", + "tags": {"vote": "should we orange juice"}, + "points":[ + {"fields":{"value":"yea"},"time":"2015-10-18T00:00:00Z"}, + {"fields":{"value":"nay"},"time":"2015-10-18T00:00:02Z"}, + {"fields":{"value":"yea"},"time":"2015-10-18T00:00:04Z"}, + {"fields":{"value":"yea"},"time":"2015-10-18T00:00:05Z"}, + {"fields":{"value":"nay"},"time":"2015-10-18T00:00:06Z"}, + {"fields":{"value":"yea"},"time":"2015-10-18T00:00:08Z"} + ] +} diff --git a/pipeline/batch.go b/pipeline/batch.go index 024f131ca..4880d779d 100644 --- a/pipeline/batch.go +++ b/pipeline/batch.go @@ -30,6 +30,7 @@ import ( // * batches_queried -- number of batches returned from queries // * points_queried -- total number of points in batches // + type BatchNode struct { node } @@ -94,6 +95,14 @@ func (b *BatchNode) Query(q string) *QueryNode { return n } +// The flux script to execute. +func (b *BatchNode) QueryFlux(q string) *QueryFluxNode { + n := newQueryFluxNode() + n.QueryStr = q + b.linkChild(n) + return n +} + // Do not add the source batch node to the dot output // since its not really an edge. // tick:ignore @@ -335,7 +344,7 @@ func (n *QueryNode) GroupByMeasurement() *QueryNode { return n } -// Align start and stop times for quiries with even boundaries of the QueryNode.Every property. +// Align start and stop times for queries with even boundaries of the QueryNode.Every property. // Does not apply if using the QueryNode.Cron property. // tick:property func (b *QueryNode) Align() *QueryNode { @@ -349,3 +358,148 @@ func (b *QueryNode) AlignGroup() *QueryNode { b.AlignGroupFlag = true return b } + +// A QueryFluxNode defines a source and a schedule for +// processing batch data. The data is queried from +// an InfluxDB database and then passed into the data pipeline. +// +// Example: +// batch +// |fluxQuery(''' +// from(bucket: "example-bucket") +// |> range(start: -1m) +// |> filter(fn: (r) => +// r._measurement == "example-measurement" and +// r._field == "example-field" +// ) +// ''') +// .period(1m) +// .every(20s) +// ... +// +// In the above example InfluxDB is queried every 20 seconds; the window of time returned +type QueryFluxNode struct { + chainnode `json:"-"` + + // The query text + //tick:ignore + QueryStr string `json:"queryStr"` + + // The period or length of time that will be queried from InfluxDB + Period time.Duration `json:"period"` + + // How often to query InfluxDB. + // + // The Every property is mutually exclusive with the Cron property. + Every time.Duration `json:"every"` + + // Align start and end times with the Every value + // Does not apply if Cron is used. + // tick:ignore + AlignFlag bool `tick:"Align" json:"align"` + + // Define a schedule using a cron syntax. + // + // The specific cron implementation is documented here: + // https://github.com/gorhill/cronexpr#implementation + // + // The Cron property is mutually exclusive with the Every property. + Cron string `json:"cron"` + + // How far back in time to query from the current time + // + // For example an Offest of 2 hours and an Every of 5m, + // Kapacitor will query InfluxDB every 5 minutes for the window of data 2 hours ago. + // + // This applies to Cron schedules as well. If the cron specifies to run every Sunday at + // 1 AM and the Offset is 1 hour. Then at 1 AM on Sunday the data from 12 AM will be queried. + Offset time.Duration `json:"offset"` + + // The name of a configured InfluxDB cluster. + // If empty the default cluster will be used. + Cluster string `json:"cluster"` + + // The influxdb 2x organization for flux + // if empty the default org will be used + Org string `json:"org"` + + // The influxdb 2x organization-id for flux + // if empty the default orgid will be used + OrgID string `json:"orgid"` +} + +func newQueryFluxNode() *QueryFluxNode { + return &QueryFluxNode{ + chainnode: newBasicChainNode("queryFlux", BatchEdge, BatchEdge), + } +} + +// MarshalJSON converts QueryNode to JSON +// tick:ignore +func (n *QueryFluxNode) MarshalJSON() ([]byte, error) { + type Alias QueryFluxNode + var raw = &struct { + TypeOf + *Alias + Period string `json:"period"` + Every string `json:"every"` + Offset string `json:"offset"` + }{ + TypeOf: TypeOf{ + Type: "queryFlux", + ID: n.ID(), + }, + Alias: (*Alias)(n), + Period: influxql.FormatDuration(n.Period), + Every: influxql.FormatDuration(n.Every), + Offset: influxql.FormatDuration(n.Offset), + } + return json.Marshal(raw) +} + +// UnmarshalJSON converts JSON to an QueryNode +// tick:ignore +func (n *QueryFluxNode) UnmarshalJSON(data []byte) error { + type Alias QueryFluxNode + var raw = &struct { + TypeOf + *Alias + Every string `json:"every"` + Offset string `json:"offset"` + }{ + Alias: (*Alias)(n), + } + err := json.Unmarshal(data, raw) + if err != nil { + return err + } + if raw.Type != "queryFlux" { + return fmt.Errorf("error unmarshaling node %d of type %s as QueryNode", raw.ID, raw.Type) + } + + n.Every, err = influxql.ParseDuration(raw.Every) + if err != nil { + return err + } + + n.Offset, err = influxql.ParseDuration(raw.Offset) + if err != nil { + return err + } + + n.setID(raw.ID) + return nil +} + +//tick:ignore +func (n *QueryFluxNode) ChainMethods() map[string]reflect.Value { + return map[string]reflect.Value{} +} + +// Align start and stop times for queries with even boundaries of the QueryNode.Every property. +// Does not apply if using the QueryNode.Cron property. +// tick:property +func (n *QueryFluxNode) Align() *QueryFluxNode { + n.AlignFlag = true + return n +} diff --git a/pipeline/json.go b/pipeline/json.go index 4b069cbef..ff21c1bea 100644 --- a/pipeline/json.go +++ b/pipeline/json.go @@ -173,8 +173,9 @@ func init() { // Filters modify the source and produce a specific data stream sourceFilters = map[string]func([]byte, Node) (Node, error){ - "from": unmarshalFrom, - "query": unmarshalQuery, + "from": unmarshalFrom, + "query": unmarshalQuery, + "queryFlux": unmarshalQueryFlux, } // Add default construction of chain nodes @@ -409,6 +410,16 @@ func unmarshalQuery(data []byte, source Node) (Node, error) { return child, err } +func unmarshalQueryFlux(data []byte, source Node) (Node, error) { + batch, ok := source.(*BatchNode) + if !ok { + return nil, fmt.Errorf("parent of query node must be a BatchNode but is %T", source) + } + child := batch.QueryFlux("") + err := json.Unmarshal(data, child) + return child, err +} + func unmarshalWhere(data []byte, parents []Node, typ TypeOf) (Node, error) { if len(parents) != 1 { return nil, fmt.Errorf("expected one parent for node %d but found %d", typ.ID, len(parents)) diff --git a/pipeline/tick/ast.go b/pipeline/tick/ast.go index 44a4d4823..071ce8183 100644 --- a/pipeline/tick/ast.go +++ b/pipeline/tick/ast.go @@ -126,6 +126,8 @@ func (a *AST) Create(n pipeline.Node, parents []ast.Node) (ast.Node, error) { return NewLog(parents).Build(node) case *pipeline.QueryNode: return NewQuery(parents).Build(node) + case *pipeline.QueryFluxNode: + return NewQueryFlux(parents).Build(node) case *pipeline.SampleNode: return NewSample(parents).Build(node) case *pipeline.ShiftNode: diff --git a/pipeline/tick/batch_test.go b/pipeline/tick/batch_test.go index db063e606..c0e4e093f 100644 --- a/pipeline/tick/batch_test.go +++ b/pipeline/tick/batch_test.go @@ -19,3 +19,19 @@ func TestBatch(t *testing.T) { t.Log(got) // print is helpful to get the correct format. } } + +func TestBatchFlux(t *testing.T) { + pipe, _, _ := BatchQueryFlux(`from(bucket: "example")`) + got, err := PipelineTick(pipe) + if err != nil { + t.Fatalf("Unexpected error building pipeline %v", err) + } + + want := `batch + |queryFlux('from(bucket: "example")') +` + if got != want { + t.Errorf("TestBatch = %v, want %v", got, want) + t.Log(got) // print is helpful to get the correct format. + } +} diff --git a/pipeline/tick/queryflux.go b/pipeline/tick/queryflux.go new file mode 100644 index 000000000..dedcaff6c --- /dev/null +++ b/pipeline/tick/queryflux.go @@ -0,0 +1,34 @@ +package tick + +import ( + "github.com/influxdata/kapacitor/pipeline" + "github.com/influxdata/kapacitor/tick/ast" +) + +// QueryFluxNode converts the QueryFluxNode pipeline node into the TICKScript AST +type QueryFluxNode struct { + Function +} + +// NewQueryFlux creates a QueryNode function builder +func NewQueryFlux(parents []ast.Node) *QueryFluxNode { + return &QueryFluxNode{ + Function{ + Parents: parents, + }, + } +} + +// Build creates a QueryNode ast.Node +func (n *QueryFluxNode) Build(q *pipeline.QueryFluxNode) (ast.Node, error) { + n.Pipe("queryFlux", q.QueryStr). + Dot("period", q.Period). + Dot("every", q.Every). + DotIf("align", q.AlignFlag). + Dot("cron", q.Cron). + Dot("offset", q.Offset). + Dot("cluster", q.Cluster). + Dot("orgID", q.OrgID). + Dot("org", q.Org) + return n.prev, n.err +} diff --git a/pipeline/tick/queryflux_test.go b/pipeline/tick/queryflux_test.go new file mode 100644 index 000000000..ac6a6c975 --- /dev/null +++ b/pipeline/tick/queryflux_test.go @@ -0,0 +1,38 @@ +package tick_test + +import ( + "testing" + "time" +) + +func TestQueryFlux(t *testing.T) { + pipe, _, query := BatchQueryFlux(`from(bucket:"example-bucket") +|> range(start:-1h) +|> filter(fn:(r) => +r._measurement == "cpu" and +r.cpu == "cpu-total" +) +|> aggregateWindow(every: 1m, fn: mean)`) + + query.Period = time.Minute + query.Every = 30 * time.Second + query.AlignFlag = true + query.Offset = time.Hour + query.Cluster = "mycluster" + + want := `batch + |queryFlux('from(bucket:"example-bucket") +|> range(start:-1h) +|> filter(fn:(r) => +r._measurement == "cpu" and +r.cpu == "cpu-total" +) +|> aggregateWindow(every: 1m, fn: mean)') + .period(1m) + .every(30s) + .align() + .offset(1h) + .cluster('mycluster') +` + PipelineTickTestHelper(t, pipe, want) +} diff --git a/pipeline/tick/tick_test.go b/pipeline/tick/tick_test.go index e4b344a96..984dd214c 100644 --- a/pipeline/tick/tick_test.go +++ b/pipeline/tick/tick_test.go @@ -97,6 +97,13 @@ func BatchQuery(q string) (pipe *pipeline.Pipeline, batch *pipeline.BatchNode, q return pipe, batch, query } +func BatchQueryFlux(q string) (pipe *pipeline.Pipeline, batch *pipeline.BatchNode, query *pipeline.QueryFluxNode) { + batch = &pipeline.BatchNode{} + pipe = pipeline.CreatePipelineSources(batch) + query = batch.QueryFlux(q) + return pipe, batch, query +} + func PipelineTick(pipe *pipeline.Pipeline) (string, error) { ast := tick.AST{} err := ast.Build(pipe) diff --git a/query_test.go b/query_test.go index 9ac819f61..30a2f88ad 100644 --- a/query_test.go +++ b/query_test.go @@ -139,6 +139,7 @@ func TestQuery_Clone(t *testing.T) { } } } + func TestQuery_IsGroupedByTime(t *testing.T) { q, err := kapacitor.NewQuery("SELECT usage FROM telegraf.autogen.cpu") if err != nil { diff --git a/services/ec2/client/client.go b/services/ec2/client/client.go index a2165db2d..ac06e1902 100644 --- a/services/ec2/client/client.go +++ b/services/ec2/client/client.go @@ -1,9 +1,6 @@ package client import ( - "fmt" - //"log" - "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/aws/credentials" @@ -35,7 +32,6 @@ func New(c Config) (Client, error) { Credentials: credentials.NewStaticCredentials(c.AccessKey, c.SecretKey, ""), }) if err != nil { - fmt.Println(err) return nil, err } svc := autoscaling.New(sess) diff --git a/services/influxdb/service_test.go b/services/influxdb/service_test.go index b5cb0eec7..87b9bdc0a 100644 --- a/services/influxdb/service_test.go +++ b/services/influxdb/service_test.go @@ -1238,20 +1238,17 @@ func (c *clientCreator) Create(config influxcli.Config) (influxcli.ClientUpdater } type influxDBClient struct { - clusterName string - PingFunc func(ctx context.Context) (time.Duration, string, error) - WriteFunc func(bp influxcli.BatchPoints) error - QueryFunc func(clusterName string, q influxcli.Query) (*influxcli.Response, error) - UpdateFunc func(influxcli.Config) error + clusterName string + PingFunc func(ctx context.Context) (time.Duration, string, error) + WriteFunc func(bp influxcli.BatchPoints) error + QueryFunc func(clusterName string, q influxcli.Query) (*influxcli.Response, error) + FluxQueryResponseFunc func(clusterName string, q influxcli.FluxQuery) (*influxcli.Response, error) + FluxQueryFunc func(clusterName string, q influxcli.FluxQuery) (flux.ResultIterator, error) + UpdateFunc func(influxcli.Config) error + WriteV2Func func(clusterName string, w influxcli.FluxWrite) error } -func (c influxDBClient) WriteV2(w influxcli.FluxWrite) error { - panic("not implemented") -} - -func (c influxDBClient) QueryFlux(q influxcli.FluxQuery) (flux.ResultIterator, error) { - panic("not implemented") -} +var _ influxcli.ClientUpdater = influxDBClient{} func (c influxDBClient) Close() error { return nil @@ -1263,12 +1260,21 @@ func (c influxDBClient) Ping(ctx context.Context) (time.Duration, string, error) } return 0, "testversion", nil } + func (c influxDBClient) Write(bp influxcli.BatchPoints) error { if c.WriteFunc != nil { return c.WriteFunc(bp) } return nil } + +func (c influxDBClient) WriteV2(w influxcli.FluxWrite) error { + if c.WriteV2Func != nil { + return c.WriteV2Func(c.clusterName, w) + } + return nil +} + func (c influxDBClient) Query(q influxcli.Query) (*influxcli.Response, error) { if c.QueryFunc != nil { return c.QueryFunc(c.clusterName, q) @@ -1276,6 +1282,20 @@ func (c influxDBClient) Query(q influxcli.Query) (*influxcli.Response, error) { return &influxcli.Response{}, nil } +func (c influxDBClient) QueryFlux(q influxcli.FluxQuery) (flux.ResultIterator, error) { + if c.FluxQueryFunc != nil { + return c.FluxQueryFunc(c.clusterName, q) + } + return nil, nil +} + +func (c influxDBClient) QueryFluxResponse(q influxcli.FluxQuery) (*influxcli.Response, error) { + if c.FluxQueryResponseFunc != nil { + return c.FluxQueryResponseFunc(c.clusterName, q) + } + return &influxcli.Response{}, nil +} + func (c influxDBClient) Update(config influxcli.Config) error { if c.UpdateFunc != nil { return c.UpdateFunc(config) diff --git a/services/task_store/service.go b/services/task_store/service.go index a89ca8f59..c91849123 100644 --- a/services/task_store/service.go +++ b/services/task_store/service.go @@ -2001,7 +2001,6 @@ func (ts *Service) startTask(task Task) error { ts.saveLastError(t.ID, err.Error()) return err } - // Start batching if t.Type == kapacitor.BatchTask { err := et.StartBatching() diff --git a/task.go b/task.go index 70c2f1554..e5962bb36 100644 --- a/task.go +++ b/task.go @@ -253,7 +253,6 @@ func (et *ExecutingTask) StartBatching() error { if et.Task.Type != BatchTask { return ErrWrongTaskType } - batcher := et.source.(*BatchNode) err := et.checkDBRPs(batcher) @@ -455,6 +454,8 @@ func (et *ExecutingTask) createNode(p pipeline.Node, d NodeDiagnostic) (n Node, n, err = newBatchNode(et, t, d) case *pipeline.QueryNode: n, err = newQueryNode(et, t, d) + case *pipeline.QueryFluxNode: + n, err = newQueryFluxNode(et, t, d) case *pipeline.WindowNode: n, err = newWindowNode(et, t, d) case *pipeline.HTTPOutNode: diff --git a/task_master.go b/task_master.go index 694bda9f1..5fbfd8f32 100644 --- a/task_master.go +++ b/task_master.go @@ -598,7 +598,6 @@ func (tm *TaskMaster) stopTask(id string) (err error) { case BatchTask: delete(tm.batches, id) } - err = et.stop() if err != nil { tm.diag.StoppedTaskWithError(id, err) diff --git a/tools.go b/tools.go index cff26dff0..5e335cbd7 100644 --- a/tools.go +++ b/tools.go @@ -8,6 +8,8 @@ package kapacitor import ( _ "github.com/benbjohnson/tmpl" _ "github.com/golang/protobuf/protoc-gen-go" + + // so we can use the rust dependencies of flux _ "github.com/influxdata/pkg-config" _ "github.com/mailru/easyjson/easyjson" )