Skip to content

Commit

Permalink
feat: Flux batch queries in TICKscripts (#2550)
Browse files Browse the repository at this point in the history
* feat: Flux batch queries in TICKscripts

* chore: remove unused code

* fix: split generic parser and kapacitor specific pieces

* chore: remove unnecessary commented out import

* chore: cleanup

* chore: more cleanup

* fix: HTTPClient.QueryFluxResponse should actually do what it is supposed to

* fix: ResultToBufferedBatches should be able to handle time.Time type

Co-authored-by: Sam Arnold <[email protected]>
  • Loading branch information
docmerlin and lesam authored Jun 4, 2021
1 parent 478a7d8 commit a6a0c27
Show file tree
Hide file tree
Showing 26 changed files with 1,185 additions and 64 deletions.
293 changes: 277 additions & 16 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,11 +66,20 @@ func (n *BatchNode) Wait() error {
func (n *BatchNode) DBRPs() ([]DBRP, error) {
var dbrps []DBRP
for _, b := range n.children {
d, err := b.(*QueryNode).DBRPs()
if err != nil {
return nil, err
switch b := b.(type) {
case *QueryNode:
if b != nil {
d, err := b.DBRPs()
if err != nil {
return nil, err
}
dbrps = append(dbrps, d...)
}
case *FluxQueryNode:
// flux queries don't really have DBRPs
default:
panic("BatchNode shouldn't be followed by anything except QueryNode or QueryFluxNode")
}
dbrps = append(dbrps, d...)
}
return dbrps, nil
}
Expand All @@ -81,35 +90,66 @@ func (n *BatchNode) Count() int {

// Start starts every child query node (InfluxQL or flux) of this batch node.
func (n *BatchNode) Start() {
	for _, b := range n.children {
		switch b := b.(type) {
		case *QueryNode:
			b.Start()
		case *FluxQueryNode:
			b.Start()
		default:
			panic("BatchNode shouldn't be followed by anything except QueryNode or QueryFluxNode")
		}
	}
}

// Abort aborts every child query node (InfluxQL or flux) of this batch node.
func (n *BatchNode) Abort() {
	for _, b := range n.children {
		switch b := b.(type) {
		case *QueryNode:
			b.Abort()
		case *FluxQueryNode:
			b.Abort()
		default:
			panic("BatchNode shouldn't be followed by anything except QueryNode or QueryFluxNode")
		}
	}
}

// BatchQueries describes the set of queries one batch child node will run
// over a time range, in either InfluxQL or Flux form.
type BatchQueries struct {
	Queries            []*Query     // InfluxQL queries (populated for *QueryNode children)
	FluxQueries        []*QueryFlux // Flux queries (populated for *FluxQueryNode children)
	Cluster            string       // named InfluxDB cluster the queries target
	GroupByMeasurement bool         // InfluxQL only; left unset for flux queries
}

func (n *BatchNode) Queries(start, stop time.Time) ([]BatchQueries, error) {
queries := make([]BatchQueries, len(n.children))
for i, b := range n.children {
qn := b.(*QueryNode)
qs, err := qn.Queries(start, stop)
if err != nil {
return nil, err
}
queries[i] = BatchQueries{
Queries: qs,
Cluster: qn.Cluster(),
GroupByMeasurement: qn.GroupByMeasurement(),
for i, qn := range n.children {
switch qn := qn.(type) {
case *QueryNode:
qs, err := qn.Queries(start, stop)
if err != nil {
return nil, err
}
queries[i] = BatchQueries{
Queries: qs,
Cluster: qn.Cluster(),
GroupByMeasurement: qn.GroupByMeasurement(),
}

case *FluxQueryNode:
qs, err := qn.Queries(start, stop)
if err != nil {
return nil, err
}
queries[i] = BatchQueries{
FluxQueries: qs,
Cluster: qn.Cluster(),
}

default:
panic("BatchNode shouldn't be followed by anything except QueryNode or QueryFluxNode")
}

}
return queries, nil
}
Expand Down Expand Up @@ -522,3 +562,224 @@ func (c *cronTicker) Stop() {
// Next reports the first cron-scheduled instant strictly after now,
// as computed by the parsed cron expression.
func (c *cronTicker) Next(now time.Time) time.Time {
	next := c.expr.Next(now)
	return next
}

// FluxQueryNode is a node for making flux queries.
type FluxQueryNode struct {
	node                             // embedded base node (timer, diag, statMap, ins/outs, runF/stopF)
	b      *pipeline.QueryFluxNode   // pipeline definition this node executes
	query  *QueryFlux                // compiled flux query, cloned per scheduled run
	ticker ticker                    // schedule source: time-based ('every') or cron
	queryMu sync.Mutex               // guards queryErr
	queryErr chan error              // receives the result of doQuery; created in Start
	closing chan struct{}            // closed by stopBatch to end the query loop cleanly
	aborting chan struct{}           // closed by Abort to cancel query and run loops

	batchesQueried *expvar.Int // count of batches received from queries (set in doQuery)
	pointsQueried  *expvar.Int // count of points across those batches (set in doQuery)
	byName         bool        // passed to edge.ResultToBufferedBatches; never set here — presumably defaults to false, TODO confirm
}

// newQueryFluxNode creates a FluxQueryNode from its pipeline definition.
// It compiles the flux query and configures the run schedule from either
// the 'every' duration or the 'cron' expression — exactly one must be set.
func newQueryFluxNode(et *ExecutingTask, n *pipeline.QueryFluxNode, d NodeDiagnostic) (*FluxQueryNode, error) {
	bn := &FluxQueryNode{
		node:     node{Node: n, et: et, diag: d},
		b:        n,
		closing:  make(chan struct{}),
		aborting: make(chan struct{}),
	}
	bn.node.runF = bn.runBatch
	bn.node.stopF = bn.stopBatch

	// Create query
	q, err := NewQueryFlux(n.QueryStr, n.Org, n.OrgID)
	if err != nil {
		return nil, err
	}
	bn.query = q

	// Determine schedule.
	if n.Every != 0 && n.Cron != "" {
		return nil, errors.New("must not set both 'every' and 'cron' properties")
	}
	switch {
	case n.Every > 0:
		bn.ticker = newTimeTicker(n.Every, n.AlignFlag)
	case n.Cron != "":
		ticker, err := newCronTicker(n.Cron)
		if err != nil {
			return nil, err
		}
		bn.ticker = ticker
	case n.Every < 0:
		return nil, errors.New("'every' duration must be non-negative")
	default:
		return nil, errors.New("must define one of 'every' or 'cron'")
	}

	return bn, nil
}

// Start launches the query loop in a background goroutine, collecting
// batches onto this node's in-edge. The loop's eventual error (or nil)
// is delivered on n.queryErr, which runBatch waits on.
func (n *FluxQueryNode) Start() {
	n.queryMu.Lock()
	defer n.queryMu.Unlock()
	// Buffered so the query goroutine can always deliver its result
	// and exit, even if no one is waiting on the channel.
	n.queryErr = make(chan error, 1)
	go func() {
		n.queryErr <- n.doQuery(n.ins[0])
	}()
}

// Abort cancels the query and run loops by closing the aborting channel.
// It must be called at most once; a second call would panic on the
// double close.
func (n *FluxQueryNode) Abort() {
	close(n.aborting)
}

// Cluster returns the name of the InfluxDB cluster this node queries,
// as configured on the pipeline node.
func (n *FluxQueryNode) Cluster() string {
	return n.b.Cluster
}

// Queries returns the flux queries that the schedule would produce
// between start and stop. A zero stop means "up to now". Generation
// halts once a tick's offset-adjusted stop time would land in the future.
func (n *FluxQueryNode) Queries(start, stop time.Time) ([]*QueryFlux, error) {
	now := time.Now()
	if stop.IsZero() {
		stop = now
	}
	queries := make([]*QueryFlux, 0)
	// Crons are sensitive to timezones, so walk the schedule in local time.
	for t := n.ticker.Next(start.Local()); !t.IsZero() && !t.After(stop); t = n.ticker.Next(t) {
		// Stop once the offset-adjusted query stop would be in the future.
		if t.Add(-1 * n.b.Offset).After(now) {
			break
		}
		q, err := n.query.Clone()
		if err != nil {
			return nil, err
		}
		q.Now = now
		queries = append(queries, q)
	}
	return queries, nil
}

// doQuery runs the flux query against InfluxDB on every ticker tick and
// collects the resulting batches onto the given edge. It returns nil when
// the node is stopped (closing), or an error when aborted, when InfluxDB
// is not configured, or when a batch cannot be collected. Query execution
// errors are logged and do not terminate the loop.
func (n *FluxQueryNode) doQuery(in edge.Edge) (err error) {
	defer in.Close()
	n.batchesQueried = &expvar.Int{}
	n.pointsQueried = &expvar.Int{}

	n.statMap.Set(statsBatchesQueried, n.batchesQueried)
	n.statMap.Set(statsPointsQueried, n.pointsQueried)

	if n.et.tm.InfluxDBService == nil {
		return errors.New("InfluxDB not configured, cannot query InfluxDB for batch query")
	}

	con, err := n.et.tm.InfluxDBService.NewNamedClient(n.b.Cluster)
	if err != nil {
		return errors.Wrap(err, "failed to get InfluxDB client")
	}
	tickC := n.ticker.Start()
	for {
		select {
		case <-n.closing:
			return nil
		case <-n.aborting:
			return errors.New("batch doQuery aborted")
		case now := <-tickC:
			n.timer.Start()
			// Update times for query
			n.query.Now = now.Add(-1 * n.b.Offset) //SetStartTime(stop.Add(-1 * n.b.Period))
			n.diag.StartingBatchQuery(n.query.stmt)

			// Execute query
			resp, err := con.QueryFluxResponse(influxdb.FluxQuery{
				Query: n.query.stmt,
				Org:   n.query.org,
				OrgID: n.query.orgID,
				Now:   n.query.Now,
			})
			if err != nil {
				// break only exits the select: a failed query is logged
				// and the loop waits for the next tick.
				n.diag.Error("error executing query", err)
				n.timer.Stop()
				break
			}
			//Collect batches
			for _, res := range resp.Results {
				batches, err := edge.ResultToBufferedBatches(res, n.byName)
				if err != nil {
					// Skip only the unparseable result; other results in
					// the response are still collected.
					n.diag.Error("failed to understand query result", err)
					continue
				}
				for _, bch := range batches {
					// If the query produced no begin time for this batch,
					// fall back to the tick time.
					if bch.Begin().Time().IsZero() {
						bch.Begin().SetTime(now)
					}
					n.batchesQueried.Add(1)
					n.pointsQueried.Add(int64(len(bch.Points())))

					// Pause the timer while blocked on downstream collection
					// so only query/processing time is measured.
					n.timer.Pause()
					if err := in.Collect(bch); err != nil {
						return err
					}
					n.timer.Resume()
				}
			}
			n.timer.Stop()
		}
	}
}

// runBatch is the node's run function: it forwards every message emitted
// on the in-edge to all out-edges until the edge is drained, then reports
// the first error from either the forwarding loop or the query goroutine
// started by Start. The query error takes precedence. Aborting cancels
// both waits. The []byte snapshot argument is unused.
func (n *FluxQueryNode) runBatch([]byte) error {
	errC := make(chan error, 1)
	go func() {
		// Convert a panic in the forwarding loop into an error so the
		// select below always receives something.
		defer func() {
			err := recover()
			if err != nil {
				errC <- fmt.Errorf("%v", err)
			}
		}()
		for bt, ok := n.ins[0].Emit(); ok; bt, ok = n.ins[0].Emit() {
			for _, child := range n.outs {
				err := child.Collect(bt)
				if err != nil {
					errC <- err
					return
				}
			}
		}
		errC <- nil
	}()
	// Wait for the query goroutine's result, but only if Start was
	// actually called (queryErr is nil otherwise, e.g. during a replay).
	var queryErr error
	n.queryMu.Lock()
	if n.queryErr != nil {
		n.queryMu.Unlock()
		select {
		case queryErr = <-n.queryErr:
		case <-n.aborting:
			queryErr = errors.New("batch queryErr aborted")
		}
	} else {
		n.queryMu.Unlock()
	}

	// Wait for the forwarding goroutine (or an abort).
	var err error
	select {
	case err = <-errC:
	case <-n.aborting:
		err = errors.New("batch run aborted")
	}
	if queryErr != nil {
		return queryErr
	}
	return err
}

// stopBatch is the node's stop function: it halts the schedule (if one
// was ever created) and signals the query loop to exit cleanly.
func (n *FluxQueryNode) stopBatch() {
	if t := n.ticker; t != nil {
		t.Stop()
	}
	close(n.closing)
}
14 changes: 6 additions & 8 deletions cmd/kapacitor/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,12 @@ var (
rbId = recordBatchFlags.String("recording-id", "", "The ID to give to this recording. If not set an random ID is chosen.")

recordQueryFlags = flag.NewFlagSet("record-query", flag.ExitOnError)
rqQuery = recordQueryFlags.String("query", "", "The query to record.")
rqType = recordQueryFlags.String("type", "", "The type of the recording to save (stream|batch).")
rqCluster = recordQueryFlags.String("cluster", "", "Optional named InfluxDB cluster from configuration.")
rqNowait = recordQueryFlags.Bool("no-wait", false, "Do not wait for the recording to finish.")
rqId = recordQueryFlags.String("recording-id", "", "The ID to give to this recording. If not set an random ID is chosen.")
// TODO: queryFlux recording
rqQuery = recordQueryFlags.String("query", "", "The query to record.")
rqType = recordQueryFlags.String("type", "", "The type of the recording to save (stream|batch).")
rqCluster = recordQueryFlags.String("cluster", "", "Optional named InfluxDB cluster from configuration.")
rqNowait = recordQueryFlags.Bool("no-wait", false, "Do not wait for the recording to finish.")
rqId = recordQueryFlags.String("recording-id", "", "The ID to give to this recording. If not set an random ID is chosen.")
)

func recordUsage() {
Expand Down Expand Up @@ -539,7 +540,6 @@ func doRecord(args []string) error {
return fmt.Errorf("Unknown record type %q, expected 'stream', 'batch' or 'query'", args[0])
}
if noWait {
fmt.Println(recording.ID)
return nil
}
for recording.Status == client.Running {
Expand Down Expand Up @@ -1041,7 +1041,6 @@ func doReplay(args []string) error {
return err
}
if *rnowait {
fmt.Println(replay.ID)
return nil
}
for replay.Status == client.Running {
Expand Down Expand Up @@ -1225,7 +1224,6 @@ func doReplayLive(args []string) error {
return fmt.Errorf("Unknown replay-live type %q, expected 'batch' or 'query'", args[0])
}
if noWait {
fmt.Println(replay.ID)
return nil
}
for replay.Status == client.Running {
Expand Down
1 change: 1 addition & 0 deletions edge/edge.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ func (e *channelEdge) Collect(m Message) error {
}

func (e *channelEdge) Emit() (m Message, ok bool) {
// locked here
select {
case m, ok = <-e.messages:
case <-e.aborting:
Expand Down
Loading

0 comments on commit a6a0c27

Please sign in to comment.