From c05878b2d2f75e04d0dcfbbae7ee5f3dd0a5094c Mon Sep 17 00:00:00 2001 From: "J. Emrys Landivar" Date: Tue, 21 Jun 2022 13:32:50 -0500 Subject: [PATCH] feat(influxql): newer influxql (#2689) --- Gopkg.lock | 2 +- batch.go | 2 +- client/v1/client.go | 7 +- client/v1/client_test.go | 4 +- cmd/kapacitor/main.go | 2 +- go.mod | 19 ++- go.sum | 53 +++++-- influxdb_out.go | 2 +- influxql.gen.go | 66 ++++----- influxql.gen.go.tmpl | 18 +-- integrations/streamer_test.go | 8 +- join.go | 2 +- metaclient.go | 2 +- pipeline/barrier.go | 2 +- pipeline/batch.go | 2 +- pipeline/combine.go | 2 +- pipeline/derivative.go | 2 +- pipeline/flatten.go | 2 +- pipeline/http_post.go | 2 +- pipeline/influxdb_out.go | 2 +- pipeline/influxql.gen.go | 34 ++--- pipeline/influxql.gen.go.tmpl | 4 +- pipeline/influxql.go | 224 ++++++++++++++---------------- pipeline/join.go | 2 +- pipeline/k8s_autoscale.go | 2 +- pipeline/sample.go | 2 +- pipeline/shift.go | 2 +- pipeline/state_tracking.go | 2 +- pipeline/stats.go | 2 +- pipeline/stream.go | 2 +- pipeline/swarm_autoscale.go | 2 +- pipeline/udf.go | 2 +- pipeline/window.go | 2 +- query.go | 2 +- query_test.go | 2 + replay.go | 10 +- result.go | 4 +- server/server.go | 25 ++-- server/server_test.go | 83 +++++------ services/httpd/handler.go | 20 +-- services/influxdb/service.go | 2 +- services/influxdb/service_test.go | 2 +- services/replay/service.go | 2 +- services/task_store/util_test.go | 9 ++ task_master.go | 17 ++- tick/ast/json.go | 2 +- tick/ast/node.go | 2 +- tick/stateful/functions.go | 2 +- 48 files changed, 355 insertions(+), 312 deletions(-) diff --git a/Gopkg.lock b/Gopkg.lock index 7ec540378..763e84dc4 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -1114,7 +1114,7 @@ "github.com/influxdata/influxdb", "github.com/influxdata/influxdb/client", "github.com/influxdata/influxdb/client/v2", - "github.com/influxdata/influxdb/influxql", + "github.com/influxdata/influxql", "github.com/influxdata/influxdb/models", "github.com/influxdata/influxdb/services/collectd", "github.com/influxdata/influxdb/services/graphite", diff --git a/batch.go b/batch.go index aa609e637..a618376a5 100644 --- a/batch.go +++ b/batch.go @@ -7,7 +7,7 @@ import ( "time" "github.com/gorhill/cronexpr" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor/edge" "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/influxdb" diff --git a/client/v1/client.go b/client/v1/client.go index 8f09592d9..ec0b80785 100644 --- a/client/v1/client.go +++ b/client/v1/client.go @@ -17,7 +17,8 @@ import ( "strconv" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxql" khttp "github.com/influxdata/kapacitor/http" "github.com/pkg/errors" ) @@ -1283,7 +1284,7 @@ func (c *Client) ListTasks(opt *ListTasksOptions) ([]Task, error) { return r.Tasks, nil } -func (c *Client) TaskOutput(link Link, name string) (*influxql.Result, error) { +func (c *Client) TaskOutput(link Link, name string) (*query.Result, error) { u := *c.url u.Path = path.Join(link.Href, name) @@ -1291,7 +1292,7 @@ func (c *Client) TaskOutput(link Link, name string) (*influxql.Result, error) { if err != nil { return nil, err } - r := &influxql.Result{} + r := &query.Result{} _, err = c.Do(req, r, http.StatusOK) if err != nil { return nil, err diff --git a/client/v1/client_test.go b/client/v1/client_test.go index f13421582..b4f953939 100644 --- a/client/v1/client_test.go +++ b/client/v1/client_test.go @@ -11,8 +11,8 @@ import ( "time" "github.com/google/go-cmp/cmp" - "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/query" "github.com/influxdata/kapacitor/client/v1" ) @@ -1020,7 +1020,7 @@ func Test_TaskOutput(t *testing.T) { if err != nil { t.Fatal(err) } - exp := &influxql.Result{ + exp := &query.Result{ Series: models.Rows{{ Name: "cpu", Columns: []string{"time", "value"}, diff --git a/cmd/kapacitor/main.go b/cmd/kapacitor/main.go index 29e77db98..265e30db0 100644 --- a/cmd/kapacitor/main.go +++ b/cmd/kapacitor/main.go @@ -22,7 +22,7 @@ import ( humanize "github.com/dustin/go-humanize" "github.com/ghodss/yaml" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor/client/v1" "github.com/pkg/errors" ) diff --git a/go.mod b/go.mod index 371f5a644..14a9e2788 100644 --- a/go.mod +++ b/go.mod @@ -25,8 +25,9 @@ require ( github.com/influxdata/flux v0.151.1 github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 github.com/influxdata/influx-cli/v2 v2.0.0-20210526124422-63da8eccbdb7 - github.com/influxdata/influxdb v1.8.4 + github.com/influxdata/influxdb v1.9.6 github.com/influxdata/influxdb/v2 v2.0.1-alpha.10.0.20210507184756-dc72dc3f0c07 + github.com/influxdata/influxql v1.1.1-0.20211004132434-7e7d61973256 github.com/influxdata/pkg-config v0.2.12 github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368 github.com/influxdata/wlog v0.0.0-20160411224016-7c63b0a71ef8 @@ -61,6 +62,7 @@ require ( cloud.google.com/go v0.82.0 // indirect cloud.google.com/go/bigquery v1.8.0 // indirect cloud.google.com/go/bigtable v1.10.1 // indirect + collectd.org v0.3.0 // indirect github.com/AlecAivazis/survey/v2 v2.2.9 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect github.com/Azure/azure-sdk-for-go v52.5.0+incompatible // indirect @@ -96,13 +98,11 @@ require ( github.com/aws/smithy-go v1.3.1 // indirect github.com/benbjohnson/immutable v0.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/bmizerany/pat v0.0.0-20170815010413-6226ea591a40 // indirect github.com/bonitoo-io/go-sql-bigquery v0.3.4-1.4.0 // indirect github.com/cespare/xxhash/v2 v2.1.1 // indirect github.com/cpuguy83/go-md2man/v2 v2.0.0 // indirect github.com/deepmap/oapi-codegen v1.6.0 // indirect github.com/denisenkom/go-mssqldb v0.10.0 // indirect - github.com/dgryski/go-bits v0.0.0-20180113010104-bd8a69a71dc2 // indirect github.com/dimchansky/utfbom v1.1.0 // indirect github.com/docker/go-connections v0.4.0 // indirect github.com/docker/go-units v0.4.0 // indirect @@ -114,6 +114,7 @@ require ( github.com/fatih/color v1.9.0 // indirect github.com/form3tech-oss/jwt-go v3.2.2+incompatible // indirect github.com/geoffgarside/ber v0.0.0-20170306085127-854377f11dfb // indirect + github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 // indirect github.com/go-chi/chi v4.1.0+incompatible // indirect github.com/go-kit/kit v0.10.0 // indirect github.com/go-logfmt/logfmt v0.5.0 // indirect @@ -150,6 +151,7 @@ require ( github.com/imdario/mergo v0.3.9 // indirect github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040 // indirect github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 // indirect + github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6 // indirect github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect @@ -162,9 +164,9 @@ require ( github.com/jpillora/backoff v1.0.0 // indirect github.com/json-iterator/go v1.1.10 // indirect github.com/jstemmer/go-junit-report v0.9.1 // indirect + github.com/jsternberg/zap-logfmt v1.2.0 // indirect github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 // indirect github.com/kevinburke/go-bindata v3.11.0+incompatible // indirect - github.com/kimor79/gollectd v1.0.0 // indirect github.com/klauspost/compress v1.15.0 // indirect github.com/lib/pq v1.2.0 // indirect github.com/mattn/go-colorable v0.1.8 // indirect @@ -180,18 +182,19 @@ require ( github.com/mna/pigeon v1.0.1-0.20180808201053-bb0192cfc2ae // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/mschoch/smat v0.0.0-20160514031455-90eadee771ae // indirect github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f // indirect github.com/onsi/ginkgo v1.14.2 // indirect github.com/onsi/gomega v1.10.3 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.1 // indirect + github.com/philhofer/fwd v1.0.0 // indirect github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/pierrec/lz4/v4 v4.1.14 // indirect github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/prometheus/client_model v0.2.0 // indirect github.com/prometheus/procfs v0.6.0 // indirect - github.com/rakyll/statik v0.1.7 // indirect github.com/russross/blackfriday v1.5.2 // indirect github.com/russross/blackfriday/v2 v2.0.1 // indirect github.com/segmentio/kafka-go v0.3.10 // indirect @@ -201,11 +204,14 @@ require ( github.com/sirupsen/logrus v1.8.1 // indirect github.com/snowflakedb/gosnowflake v1.6.1 // indirect github.com/spf13/pflag v1.0.5 // indirect + github.com/tinylib/msgp v1.1.0 // indirect github.com/uber-go/tally v3.3.17+incompatible // indirect github.com/uber/athenadriver v1.1.13 // indirect github.com/uber/jaeger-lib v2.4.1+incompatible // indirect github.com/vertica/vertica-sql-go v1.1.1 // indirect + github.com/willf/bitset v1.1.9 // indirect github.com/xdg/stringprep v1.0.0 // indirect + github.com/xlab/treeprint v1.0.0 // indirect github.com/xwb1989/sqlparser v0.0.0-20180606152119-120387863bf2 // indirect go.mongodb.org/mongo-driver v1.4.6 // indirect go.opencensus.io v0.23.0 // indirect @@ -215,6 +221,7 @@ require ( golang.org/x/mod v0.4.2 // indirect golang.org/x/net v0.0.0-20220520000938-2e3eb7b945c2 // indirect golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c // indirect + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c // indirect golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e // indirect golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect golang.org/x/text v0.3.7 // indirect @@ -243,8 +250,6 @@ require ( replace gopkg.in/fsnotify.v1 => github.com/fsnotify/fsnotify v1.4.2 -replace github.com/influxdata/influxdb => github.com/influxdata/influxdb v1.1.4 - replace k8s.io/client-go => k8s.io/client-go v0.20.5 replace k8s.io/api => k8s.io/api v0.20.5 diff --git a/go.sum b/go.sum index c25c42e4a..8fab22414 100644 --- a/go.sum +++ b/go.sum @@ -1,11 +1,13 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= cloud.google.com/go v0.38.0/go.mod h1:990N+gfupTy94rShfmMCWGDn0LpTmnzTp2qbd1dvSRU= +cloud.google.com/go v0.43.0/go.mod h1:BOSR3VbTLkk6FDC/TcffxP4NF/FFBGA5ku+jvKOP7pg= cloud.google.com/go v0.44.1/go.mod h1:iSa0KzasP4Uvy3f1mN/7PiObzGgflwredwwASm/v6AU= cloud.google.com/go v0.44.2/go.mod h1:60680Gw3Yr4ikxnPRS/oxxkBccT6SA1yMk63TGekxKY= cloud.google.com/go v0.45.1/go.mod h1:RpBamKRgapWJb87xiFSdk4g1CME7QZg3uwTez+TSTjc= cloud.google.com/go v0.46.3/go.mod h1:a6bKKbmY7er1mI7TEI4lsAkts/mkhTSZK8w33B4RAg0= cloud.google.com/go v0.50.0/go.mod h1:r9sluTvynVuxRIOHXQEHMFffphuXHOMZMycpNR5e6To= +cloud.google.com/go v0.51.0/go.mod h1:hWtGJ6gnXH+KgDv+V0zFGDvpi07n3z8ZNj3T1RW0Gcw= cloud.google.com/go v0.52.0/go.mod h1:pXajvRH/6o3+F9jDHZWQ5PbGhn+o8w9qiu/CffaVdO4= cloud.google.com/go v0.53.0/go.mod h1:fp/UouUEsRkN6ryDKNW/Upv/JBKnv6WDthjR6+vze6M= cloud.google.com/go v0.54.0/go.mod h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bPc= @@ -27,6 +29,7 @@ cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUM cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc= cloud.google.com/go/bigquery v1.8.0 h1:PQcPefKFdaIzjQFbiyOgAqyx8q5djaE7x9Sqe712DPA= cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ= +cloud.google.com/go/bigtable v1.2.0/go.mod h1:JcVAOl45lrTmQfLj7T6TxyMzIN/3FGGcFm+2xVAli2o= cloud.google.com/go/bigtable v1.3.0/go.mod h1:z5EyKrPE8OQmeg4h5MNdKvuSnI9CCT49Ki3f23aBzio= cloud.google.com/go/bigtable v1.10.1 h1:QKcRHeAsraxIlrdCZ3LLobXKBvITqcOEnSbHG2rzL9g= cloud.google.com/go/bigtable v1.10.1/go.mod h1:cyHeKlx6dcZCO0oSQucYdauseD8kIENGuDOJPKMCVg8= @@ -42,6 +45,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0 h1:STgFzyU5/8miMl0//zKh2aQeTyeaUH3WN9bSUiJ09bA= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= +collectd.org v0.3.0 h1:iNBHGw1VvPJxH2B6RiFWFZ+vsjo1lCdRszBeOuwGi00= +collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/AlecAivazis/survey/v2 v2.2.9 h1:LWvJtUswz/W9/zVVXELrmlvdwWcKE60ZAw0FWV9vssk= github.com/AlecAivazis/survey/v2 v2.2.9/go.mod h1:9DYvHgXtiXm6nCn+jXnOXLKbH+Yo9u8fAS/SduGdoPk= @@ -101,6 +106,7 @@ github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBp github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/DATA-DOG/go-sqlmock v1.3.3/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/DATA-DOG/go-sqlmock v1.4.1 h1:ThlnYciV1iM/V0OSF/dtkqWb6xo5qITT1TJBG1MRDJM= github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q5eFN3EC/SaM= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= @@ -157,6 +163,7 @@ github.com/andreyvit/diff v0.0.0-20170406064948-c7f18ee00883/go.mod h1:rCTlJbsFo github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/aokoli/goutils v1.0.1 h1:7fpzNGoJ3VA8qcrm++XEE1QUe0mIwNeLa02Nwq7RDkg= github.com/aokoli/goutils v1.0.1/go.mod h1:SijmP0QR8LtwsmDs8Yii5Z/S4trXFGFC2oO5g9DP+DQ= +github.com/apache/arrow/go/arrow v0.0.0-20191024131854-af6fa24be0db/go.mod h1:VTxUBvSJ3s3eHAg65PNgrsn5BtqCRPdmyXh6rAfdxN0= github.com/apache/arrow/go/arrow v0.0.0-20200601151325-b2287a20f230/go.mod h1:QNYViu/X0HXDHw7m3KXzWSVXIbfUvJqBFe6Gj8/pYA0= github.com/apache/arrow/go/arrow v0.0.0-20200923215132-ac86123a3f01/go.mod h1:QNYViu/X0HXDHw7m3KXzWSVXIbfUvJqBFe6Gj8/pYA0= github.com/apache/arrow/go/arrow v0.0.0-20210722123801-4591d76fce28 h1:6ZRbTsAQWpML1HK8xOpZEAH9JQ/0X6VcjUjmovKcOQA= @@ -267,7 +274,6 @@ github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7 github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= -github.com/cpuguy83/go-md2man v1.0.10 h1:BSKMNlYxDvnunlTymqtgONjNnaRV1sTpcovwwjF22jk= github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE= github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.0 h1:EoUDS0afbrsXAZ9YQ9jdu/mZ2sXgT1/2yyNng4PGlyM= @@ -285,8 +291,6 @@ github.com/deepmap/oapi-codegen v1.6.0 h1:w/d1ntwh91XI0b/8ja7+u5SvA4IFfM0UNNLmiD github.com/deepmap/oapi-codegen v1.6.0/go.mod h1:ryDa9AgbELGeB+YEXE1dR53yAjHwFvE9iAUlWl9Al3M= github.com/denisenkom/go-mssqldb v0.10.0 h1:QykgLZBorFE95+gO3u9esLd0BmbvpWp0/waNNZfHBM8= github.com/denisenkom/go-mssqldb v0.10.0/go.mod h1:xbL0rPBG9cCiLr28tMa8zpbdarY27NDyej4t/EjAShU= -github.com/dgryski/go-bits v0.0.0-20180113010104-bd8a69a71dc2 h1:2+yip7nN/auel0PDwY7SIaTOxQPI2NwdkZkvpgtc3Pk= -github.com/dgryski/go-bits v0.0.0-20180113010104-bd8a69a71dc2/go.mod h1:/9UYwwvZuEgp+mQ4960SHWCU1FS+FgdFX+m5ExFByNs= github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8 h1:akOQj8IVgoeFfBTzGOEQakCYshWD6RNo1M5pivFXt70= github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8/go.mod h1:VMaSuZ+SZcx/wljOQKvp5srsbCiKDEb6K2wC4+PiBmQ= github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8PWV+bWy6jNmig1y/TA+kYO4g3RSRF0IAv0no= @@ -370,9 +374,12 @@ github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/globalsign/mgo v0.0.0-20180905125535-1ca0a4f7cbcb/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= github.com/globalsign/mgo v0.0.0-20181015135952-eeefdecb41b8/go.mod h1:xkRDCp4j0OGD1HRkm4kmhM+pmpv3AKq5SU7GMg4oO/Q= +github.com/glycerine/go-unsnap-stream v0.0.0-20180323001048-9f0cb55181dd/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2 h1:Ujru1hufTHVb++eG6OuNDKMxZnGIvF6o/u8q/8h2+I4= github.com/glycerine/go-unsnap-stream v0.0.0-20181221182339-f9677308dec2/go.mod h1:/20jfyN9Y5QPEAprSgKAUr+glWDY39ZiUEAYOEv5dsE= github.com/glycerine/goconvey v0.0.0-20180728074245-46e3a41ad493/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24= +github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31 h1:gclg6gY70GLy3PbkQ1AERPfmLMMagS60DKF78eWwLn8= +github.com/glycerine/goconvey v0.0.0-20190410193231-58a59202ab31/go.mod h1:Ogl1Tioa0aV7gstGFO7KhffUsb9M4ydbEbbxpcEDc24= github.com/go-chi/chi v4.1.0+incompatible h1:ETj3cggsVIY2Xao5ExCu6YhEh5MD6JTfcBzS37R260w= github.com/go-chi/chi v4.1.0+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs= @@ -477,6 +484,7 @@ github.com/go-openapi/validate v0.19.15/go.mod h1:tbn/fdOwYHgrhPBzidZfJC2MIVvs9G github.com/go-openapi/validate v0.20.1/go.mod h1:b60iJT+xNNLfaQJUqLI7946tYiFEOuE9E4k54HpKcJ0= github.com/go-openapi/validate v0.20.2/go.mod h1:e7OJoKNgd0twXZwIn0A43tHbvIcr/rZIVCbJBpTUoY0= github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-sql-driver/mysql v1.5.0 h1:ozyZYNQW3x3HtqT1jira07DN2PArx2v7/mN66gGcHOs= github.com/go-sql-driver/mysql v1.5.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg= github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk= @@ -737,6 +745,7 @@ github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANyt github.com/influxdata/cron v0.0.0-20191203200038-ded12750aac6/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og= github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe h1:7j4SdN/BvQwN6WoUq7mv0kg5U9NhnFBxPGMafYRKym0= github.com/influxdata/cron v0.0.0-20201006132531-4bb0a200dcbe/go.mod h1:XabtPPW2qsCg0tl+kjaPU+cFS+CjQXEXbT1VJvHT4og= +github.com/influxdata/flux v0.65.1/go.mod h1:J754/zds0vvpfwuq7Gc2wRdVwEodfpCFM7mYlOw2LqY= github.com/influxdata/flux v0.114.1/go.mod h1:dfG4vbsLSehtyx2h75GQM64jiPx5IObAfSaVGYHjDrg= github.com/influxdata/flux v0.151.1 h1:T/T87Q75+I0QM5/xzEYleebwLKNyAUL9sjetnOwV1eE= github.com/influxdata/flux v0.151.1/go.mod h1:qjuLZJvOoMUBcubg+qNrc0pLbG55iRCVNokwq/8q7is= @@ -744,25 +753,30 @@ github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69 h1:WQsmW0f github.com/influxdata/httprouter v1.3.1-0.20191122104820-ee83e2772f69/go.mod h1:pwymjR6SrP3gD3pRj9RJwdl1j5s3doEEV8gS4X9qSzA= github.com/influxdata/influx-cli/v2 v2.0.0-20210526124422-63da8eccbdb7 h1:9ibK8LdGVXx90F31gUpHYzCCwgjR9/WnZz5l/EK2gq0= github.com/influxdata/influx-cli/v2 v2.0.0-20210526124422-63da8eccbdb7/go.mod h1:A+JS4qejFQBmcfJIrYHVGejDcEOlcMVbCz4up86lAQ8= -github.com/influxdata/influxdb v1.1.4 h1:VLf6ewC579GzI+JQCCZdU2Ijeb/bnf6L8u+ptZhKXVM= -github.com/influxdata/influxdb v1.1.4/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY= +github.com/influxdata/influxdb v1.8.4/go.mod h1:JugdFhsvvI8gadxOI6noqNeeBHvWNTbfYGtiAn+2jhI= +github.com/influxdata/influxdb v1.9.6 h1:S9Mdwp501HRUnX2in/hs7DoIyCrcF7asfnNq/v5EvZ8= +github.com/influxdata/influxdb v1.9.6/go.mod h1:6waddyyJKoeLqfmLVrNxoOKxvQT/6t2Zuzdx8QyVcw4= github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040 h1:MBLCfcSsUyFPDJp6T7EoHp/Ph3Jkrm4EuUKLD2rUWHg= github.com/influxdata/influxdb-client-go/v2 v2.3.1-0.20210518120617-5d1fff431040/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8= github.com/influxdata/influxdb/v2 v2.0.1-alpha.10.0.20210507184756-dc72dc3f0c07 h1:ERHPVZofgMpPCS+vfWLOZk7UETeV/iVzsDhkEqkE8tY= github.com/influxdata/influxdb/v2 v2.0.1-alpha.10.0.20210507184756-dc72dc3f0c07/go.mod h1:JUtdw2axzK6sXrmCQ81TcK4yh7O94A3FFrwzm6xLwOI= github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo= -github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6 h1:CFx+pP90q/qg3spoiZjf8donE4WpAdjeJfPOcoNqkWo= github.com/influxdata/influxql v0.0.0-20180925231337-1cbfca8e56b6/go.mod h1:KpVI7okXjK6PRi3Z5B+mtKZli+R1DnZgb3N+tzevNgo= +github.com/influxdata/influxql v1.1.1-0.20200828144457-65d3ef77d385/go.mod h1:gHp9y86a/pxhjJ+zMjNXiQAA197Xk9wLxaz+fGG+kWk= +github.com/influxdata/influxql v1.1.1-0.20211004132434-7e7d61973256 h1:8io3jjCJ0j9NFvq3/m/rMrDiEILpsfOqWDPItUt/078= +github.com/influxdata/influxql v1.1.1-0.20211004132434-7e7d61973256/go.mod h1:gHp9y86a/pxhjJ+zMjNXiQAA197Xk9wLxaz+fGG+kWk= github.com/influxdata/line-protocol v0.0.0-20180522152040-32c6aa80de5e/go.mod h1:4kt73NQhadE3daL3WhR5EJ/J2ocX0PZzwxQ0gXJ7oFE= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839 h1:W9WBk7wlPfJLvMCdtV4zPulc4uCPrlywQOmbFOhgQNU= github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/influxdata/pkg-config v0.2.6/go.mod h1:EMS7Ll0S4qkzDk53XS3Z72/egBsPInt+BeRxb0WeSwk= github.com/influxdata/pkg-config v0.2.7/go.mod h1:EMS7Ll0S4qkzDk53XS3Z72/egBsPInt+BeRxb0WeSwk= -github.com/influxdata/pkg-config v0.2.11 h1:RDlWAvkTARzPRGChq34x179TYlRndq8OU5Ro80E9g3Q= github.com/influxdata/pkg-config v0.2.11/go.mod h1:EMS7Ll0S4qkzDk53XS3Z72/egBsPInt+BeRxb0WeSwk= github.com/influxdata/pkg-config v0.2.12 h1:KQ3Aw8ZEodq0uJwiNwc4d3wR2oGvr+HZ7RLF5rbgezk= github.com/influxdata/pkg-config v0.2.12/go.mod h1:EMS7Ll0S4qkzDk53XS3Z72/egBsPInt+BeRxb0WeSwk= github.com/influxdata/promql/v2 v2.12.0/go.mod h1:fxOPu+DY0bqCTCECchSRtWfc+0X19ybifQhZoQNF5D8= +github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6 h1:UzJnB7VRL4PSkUJHwsyzseGOmrO/r4yA+AuxGJxiZmA= +github.com/influxdata/roaring v0.4.13-0.20180809181101-fc520f41fab6/go.mod h1:bSgUQ7q5ZLSO+bKBGqJiCBGAl+9DxyW63zLTujjUlOE= +github.com/influxdata/tdigest v0.0.0-20181121200506-bf2b5ad3c0a9/go.mod h1:Js0mqiSBE6Ffsg94weZZ2c+v/ciT8QRHFOap7EKDrR0= github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b h1:i44CesU68ZBRvtCjBi3QSosCIKrjmMbYlQMFAwVLds4= github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b/go.mod h1:Z0kXnxzbTC2qrx4NaIzYkE1k66+6oEDQTvL95hQFh5Y= github.com/influxdata/usage-client v0.0.0-20160829180054-6d3895376368 h1:+TUUmaFa4YD1Q+7bH9o5NCHQGPMqZCYJiNW6lIIS9z4= @@ -805,6 +819,7 @@ github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/ github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= +github.com/jsternberg/zap-logfmt v1.0.0/go.mod h1:uvPs/4X51zdkcm5jXl5SYoN+4RK21K8mysFmDaM/h+o= github.com/jsternberg/zap-logfmt v1.2.0 h1:1v+PK4/B48cy8cfQbxL4FmmNZrjnIMr2BsnyEmXqv2o= github.com/jsternberg/zap-logfmt v1.2.0/go.mod h1:kz+1CUmCutPWABnNkOu9hOHKdT2q3TDYCcsFy9hpqb0= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= @@ -823,17 +838,19 @@ github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNU github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= github.com/kevinburke/go-bindata v3.11.0+incompatible h1:RcC+GJNmrBHbGaOpQ9MBD8z22rdzlIm0esDRDkyxd4s= github.com/kevinburke/go-bindata v3.11.0+incompatible/go.mod h1:/pEEZ72flUW2p0yi30bslSp9YqD9pysLxunQDdb2CPM= -github.com/kimor79/gollectd v1.0.0 h1:6APoEersLJ2W0iwWj8C7COlienQ8XTWh8np4w0ggI8k= -github.com/kimor79/gollectd v1.0.0/go.mod h1:lDxzEAixH34FPZ0nBIpjCu2vR3ZdIKZbbnf2rc+b2ao= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.4.0/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.9.5/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/compress v1.13.1/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.15.0 h1:xqfchp4whNFxn5A4XFyyYtitiWI8Hy5EW59jEwcyL6U= github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= +github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg= +github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -1014,6 +1031,7 @@ github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go github.com/opentracing-contrib/go-stdlib v1.0.0/go.mod h1:qtI1ogk+2JhVPIXVc6q+NHziSmy2W5GbdQZFUHADCBU= github.com/opentracing/basictracer-go v1.0.0/go.mod h1:QfBfYuafItcjQuMwinw9GhYKwFXS9KnPs5lxoYwgW74= github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= +github.com/opentracing/opentracing-go v1.0.3-0.20180606204148-bd9c31933947/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o= github.com/opentracing/opentracing-go v1.2.0 h1:uEJPy/1a5RIPAJ0Ov+OIO8OxWu77jEv+1B0VhjKrZUs= github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc= @@ -1025,6 +1043,7 @@ github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIw github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY= github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc= +github.com/paulbellamy/ratecounter v0.2.0/go.mod h1:Hfx1hDpSGoqxkVVpBi/IlYD7kChlfo5C6hzIHwPqfFE= github.com/pborman/uuid v1.2.0/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtPdI/k= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.4.0/go.mod h1:PN7xzY2wHTK0K9p34ErDQMlFxa51Fk0OUruD3k1mMwo= @@ -1032,6 +1051,7 @@ github.com/pelletier/go-toml v1.7.0 h1:7utD74fnzVc/cpcyy8sjrlFr5vYpypUixARcHIMIG github.com/pelletier/go-toml v1.7.0/go.mod h1:vwGMzjaWMwyfHwgIBhI2YUM4fB6nL6lVAvS1LBMMhTE= github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/peterh/liner v1.0.1-0.20180619022028-8c1271fcf47f/go.mod h1:xIteQHvHuaLYG9IFj6mSxM0fCKrs34IrEQUhOYuGPHc= github.com/philhofer/fwd v1.0.0 h1:UbZqGr5Y38ApvM/V/jEljVxwocdweyH+vmYvRPBnbqQ= github.com/philhofer/fwd v1.0.0/go.mod h1:gk3iGcWd9+svBvR0sR+KPcfE+RNWozjowpeBVG3ZVNU= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= @@ -1101,11 +1121,10 @@ github.com/prometheus/prom2json v1.1.0/go.mod h1:v7OY1795b9fEUZgq4UU2+15YjRv0Lfp github.com/prometheus/prometheus v1.8.2-0.20210331101223-3cafc58827d1 h1:qHnjnMuVa8egkjH3KhD5PZxFKKtDchh/T6ygHSv14Fw= github.com/prometheus/prometheus v1.8.2-0.20210331101223-3cafc58827d1/go.mod h1:sf7j/iAbhZahjeC0s3wwMmp5dksrJ/Za1UKdR+j6Hmw= github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= -github.com/rakyll/statik v0.1.7 h1:OF3QCZUuyPxuGEP7B4ypUa7sB/iHtqOTDYZXGM8KOdQ= -github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Unghqrcc= github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 h1:N/ElC8H3+5XpJzTSTfLsJV/mx9Q9g7kxmchpfZyxgzM= github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= +github.com/retailnext/hllpp v1.0.1-0.20180308014038-101a6d2f8b52/go.mod h1:RDpi1RftBQPUCDRw6SmxeaREsAaRKnOclghuzp/WRzc= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.1.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -1130,6 +1149,7 @@ github.com/scaleway/scaleway-sdk-go v1.0.0-beta.7.0.20210223165440-c65ae3540d44/ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= +github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/segmentio/kafka-go v0.3.10 h1:h/1aSu7gWp6DXLmp0csxm8wrYD6rRYyaqclu2aQ/PWo= github.com/segmentio/kafka-go v0.3.10/go.mod h1:8rEphJEczp+yDE/R5vwmaqZgF1wllrl4ioQcNKB8wVA= github.com/serenize/snaker v0.0.0-20161123064335-543781d2b79b h1:B6dcIy62mIVH3xZ+Alc5J4fLDWthEa1RPzK7L7/glTw= @@ -1210,6 +1230,7 @@ github.com/tcnksm/go-input v0.0.0-20180404061846-548a7d7a8ee8/go.mod h1:IlWNj9v/ github.com/testcontainers/testcontainers-go v0.0.0-20190108154635-47c0da630f72/go.mod h1:wt/nMz68+kIO4RoguOZzsdv1B3kTYw+SuIKyJYRQpgE= github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= +github.com/tinylib/msgp v1.0.2/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tinylib/msgp v1.1.0 h1:9fQd+ICuRIu/ue4vxJZu6/LzxN0HwMds2nq/0cFvxHU= github.com/tinylib/msgp v1.1.0/go.mod h1:+d+yLhGm8mzTaHzB+wgMYrodPfmZrzkirds8fDWklFE= github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= @@ -1235,7 +1256,6 @@ github.com/uber/jaeger-lib v2.4.1+incompatible/go.mod h1:ComeNDZlWwrWnDv8aPp0Ba6 github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= -github.com/urfave/cli v1.22.1 h1:+mkCCcOFKPnCmVYVcURKps1Xe+3zP90gSYGNfRkjoIY= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli/v2 v2.3.0 h1:qph92Y649prgesehzOrQjdWyxFOp/QVM+6imKHad91M= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= @@ -1245,6 +1265,7 @@ github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+ github.com/vektah/gqlparser v1.1.2/go.mod h1:1ycwN7Ij5njmMkPPAOaRFY4rET2Enx7IkVv3vaXspKw= github.com/vertica/vertica-sql-go v1.1.1 h1:sZYijzBbvdAbJcl4cYlKjR+Eh/X1hGKzukWuhh8PjvI= github.com/vertica/vertica-sql-go v1.1.1/go.mod h1:fGr44VWdEvL+f+Qt5LkKLOT7GoxaWdoUCnPBU9h6t04= +github.com/willf/bitset v1.1.3/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/willf/bitset v1.1.9 h1:GBtFynGY9ZWZmEC9sWuu41/7VBXPFCOAbCbqTflOg9c= github.com/willf/bitset v1.1.9/go.mod h1:RjeCKbqT1RxIR/KWY6phxZiaY1IyutSBfGjNPySAYV4= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= @@ -1256,6 +1277,7 @@ github.com/xdg/stringprep v0.0.0-20180714160509-73f8eece6fdc/go.mod h1:Jhud4/sHM github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= +github.com/xlab/treeprint v0.0.0-20180616005107-d6fb6747feb6/go.mod h1:ce1O1j6UtZfjr22oyGxGLbauSBp2YVXpARAosm7dHBg= github.com/xlab/treeprint v1.0.0 h1:J0TkWtiuYgtdlrkkrDLISYBQ92M+X5m4LrIIMKrbDTs= github.com/xlab/treeprint v1.0.0/go.mod h1:IoImgRak9i3zJyuxOKUP1v4UZd1tMoKkq/Cimt1uhCg= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= @@ -1270,7 +1292,6 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= -github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/mwc v0.0.4 h1:9dNXNLtUB4lUXoXgyhy3YrKoV0OD7oRiu907YMS0nl0= github.com/zeebo/mwc v0.0.4/go.mod h1:qNHfgp/ZCpQNcJHwKcO5EP3VgaBrW6DPohsK4QfyxxE= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= @@ -1528,6 +1549,7 @@ golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20191220142924-d4481acd189f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200107162124-548cf772de50/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200113162924-86b910548bc1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200122134326-e047566fdf82/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1646,6 +1668,7 @@ golang.org/x/tools v0.0.0-20191130070609-6e064ea0cf2d/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191216173652-a0e659d51361/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= +golang.org/x/tools v0.0.0-20200108203644-89082a384178/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200117161641-43d50277825c/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200122220014-bf1340f18c4a/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= @@ -1686,6 +1709,7 @@ golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1N golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo= +gonum.org/v1/gonum v0.6.0/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU= gonum.org/v1/gonum v0.8.2 h1:CCXrcPKiGGotvnN6jfUsKk4rRqm7q09/YbKb5xCEvtM= gonum.org/v1/gonum v0.8.2/go.mod h1:oe/vMfY3deqTw+1EZJhuvEW2iwGF1bW9wwu7XCu0+v0= gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw= @@ -1733,6 +1757,7 @@ google.golang.org/genproto v0.0.0-20190418145605-e7d98fc518a7/go.mod h1:VzzqZJRn google.golang.org/genproto v0.0.0-20190425155659-357c62f0e4bb/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190502173448-54afdca5d873/go.mod h1:VzzqZJRnGkLBvHegQrXjBqPurQTc5/KpmUdxsrq26oE= google.golang.org/genproto v0.0.0-20190530194941-fb225487d101/go.mod h1:z3L6/3dTEVtUr6QSP8miRzeRqwQOioJ9I66odjN4I7s= +google.golang.org/genproto v0.0.0-20190716160619-c506a9f90610/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190801165951-fa694d86fc64/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20190911173649-1774047e7e51/go.mod h1:IbNlFCBrqXvoKpeg0TB2l7cyZUmoaFKYIwrEpbDKLA8= @@ -1740,6 +1765,7 @@ google.golang.org/genproto v0.0.0-20191108220845-16a3f7862a1a/go.mod h1:n3cpQtvx google.golang.org/genproto v0.0.0-20191115194625-c23dd37a84c9/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20191216164720-4f79533eabd1/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20191230161307-f3c370f40bfb/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= +google.golang.org/genproto v0.0.0-20200108215221-bd8f9a0ef82f/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20200115191322-ca5a22157cba/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20200122232147-0452cf42e150/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc= google.golang.org/genproto v0.0.0-20200204135345-fa8e72b47b90/go.mod h1:GmwEX6Z4W5gMy59cAlVYjN9JhxgbQH6Gn+gFDQe2lzA= @@ -1894,7 +1920,6 @@ k8s.io/apimachinery v0.21.0/go.mod h1:jbreFvJo3ov9rj7eWT7+sYiRx+qZuCYXwWT1bcDswP k8s.io/client-go v0.20.5 h1:dJGtYUvFrFGjQ+GjXEIby0gZWdlAOc0xJBJqY3VyDxA= k8s.io/client-go v0.20.5/go.mod h1:Ee5OOMMYvlH8FCZhDsacjMlCBwetbGZETwo1OA+e6Zw= k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0= -k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE= k8s.io/klog/v2 v2.4.0/go.mod h1:Od+F08eJP+W3HUb4pSrPpgp9DGU4GzlpG/TmITuYh/Y= diff --git a/influxdb_out.go b/influxdb_out.go index abd816c9a..b1fca9d93 100644 --- a/influxdb_out.go +++ b/influxdb_out.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor/edge" "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/influxdb" diff --git a/influxql.gen.go b/influxql.gen.go index 1a111a500..8d8792bf5 100644 --- a/influxql.gen.go +++ b/influxql.gen.go @@ -11,7 +11,7 @@ import ( "reflect" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/query" "github.com/influxdata/kapacitor/edge" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/pipeline" @@ -23,7 +23,7 @@ func convertFloatPoint( field string, isSimpleSelector bool, topBottomInfo *pipeline.TopBottomCallInfo, -) (*influxql.FloatPoint, error) { +) (*query.FloatPoint, error) { value, ok := p.Fields()[field] if !ok { return nil, fmt.Errorf("field %s missing from point cannot aggregate", field) @@ -32,9 +32,9 @@ func convertFloatPoint( if !ok { return nil, fmt.Errorf("field %s has wrong type: got %T exp float64", field, value) } - ap := &influxql.FloatPoint{ + ap := &query.FloatPoint{ Name: name, - Tags: influxql.NewTags(p.Tags()), + Tags: query.NewTags(p.Tags()), Time: p.Time().UnixNano(), Value: typed, } @@ -54,10 +54,10 @@ type floatPointAggregator struct { field string topBottomInfo *pipeline.TopBottomCallInfo isSimpleSelector bool - aggregator influxql.FloatPointAggregator + aggregator query.FloatPointAggregator } -func floatPopulateAuxFieldsAndTags(ap *influxql.FloatPoint, fieldsAndTags []string, fields models.Fields, tags models.Tags) { +func floatPopulateAuxFieldsAndTags(ap *query.FloatPoint, fieldsAndTags []string, fields models.Fields, tags models.Tags) { ap.Aux = make([]interface{}, len(fieldsAndTags)) for i, name := range fieldsAndTags { if f, ok := fields[name]; ok { @@ -79,7 +79,7 @@ func (a *floatPointAggregator) AggregatePoint(name string, p edge.FieldsTagsTime type floatPointEmitter struct { baseReduceContext - emitter influxql.FloatPointEmitter + emitter query.FloatPointEmitter isSimpleSelector bool byName bool } @@ -92,7 +92,7 @@ func (e *floatPointEmitter) EmitPoint() (edge.PointMessage, error) { ap := slice[0] var t time.Time if e.pointTimes { - if ap.Time == influxql.ZeroTime { + if ap.Time == query.ZeroTime { t = e.time } else { t = time.Unix(0, ap.Time).UTC() @@ -138,7 +138,7 @@ func (e *floatPointEmitter) EmitBatch() edge.BufferedBatchMessage { var t time.Time for i, ap := range slice { if e.pointTimes { - if ap.Time == influxql.ZeroTime { + if ap.Time == query.ZeroTime { t = e.time } else { t = time.Unix(0, ap.Time).UTC() @@ -181,7 +181,7 @@ func convertIntegerPoint( field string, isSimpleSelector bool, topBottomInfo *pipeline.TopBottomCallInfo, -) (*influxql.IntegerPoint, error) { +) (*query.IntegerPoint, error) { value, ok := p.Fields()[field] if !ok { return nil, fmt.Errorf("field %s missing from point cannot aggregate", field) @@ -190,9 +190,9 @@ func convertIntegerPoint( if !ok { return nil, fmt.Errorf("field %s has wrong type: got %T exp int64", field, value) } - ap := &influxql.IntegerPoint{ + ap := &query.IntegerPoint{ Name: name, - Tags: influxql.NewTags(p.Tags()), + Tags: query.NewTags(p.Tags()), Time: p.Time().UnixNano(), Value: typed, } @@ -212,10 +212,10 @@ type integerPointAggregator struct { field string topBottomInfo *pipeline.TopBottomCallInfo isSimpleSelector bool - aggregator influxql.IntegerPointAggregator + aggregator query.IntegerPointAggregator } -func integerPopulateAuxFieldsAndTags(ap *influxql.IntegerPoint, fieldsAndTags []string, fields models.Fields, tags models.Tags) { +func integerPopulateAuxFieldsAndTags(ap *query.IntegerPoint, fieldsAndTags []string, fields models.Fields, tags models.Tags) { ap.Aux = make([]interface{}, len(fieldsAndTags)) for i, name := range fieldsAndTags { if f, ok := fields[name]; ok { @@ -237,7 +237,7 @@ func (a *integerPointAggregator) AggregatePoint(name string, p edge.FieldsTagsTi type integerPointEmitter struct { baseReduceContext - emitter influxql.IntegerPointEmitter + emitter query.IntegerPointEmitter isSimpleSelector bool byName bool } @@ -250,7 +250,7 @@ func (e *integerPointEmitter) EmitPoint() (edge.PointMessage, error) { ap := slice[0] var t time.Time if e.pointTimes { - if ap.Time == influxql.ZeroTime { + if ap.Time == query.ZeroTime { t = e.time } else { t = time.Unix(0, ap.Time).UTC() @@ -296,7 +296,7 @@ func (e *integerPointEmitter) EmitBatch() edge.BufferedBatchMessage { var t time.Time for i, ap := range slice { if e.pointTimes { - if ap.Time == influxql.ZeroTime { + if ap.Time == query.ZeroTime { t = e.time } else { t = time.Unix(0, ap.Time).UTC() @@ -339,7 +339,7 @@ func convertStringPoint( field string, isSimpleSelector bool, topBottomInfo *pipeline.TopBottomCallInfo, -) (*influxql.StringPoint, error) { +) (*query.StringPoint, error) { value, ok := p.Fields()[field] if !ok { return nil, fmt.Errorf("field %s missing from point cannot aggregate", field) @@ -348,9 +348,9 @@ func convertStringPoint( if !ok { return nil, fmt.Errorf("field %s has wrong type: got %T exp string", field, value) } - ap := &influxql.StringPoint{ + ap := &query.StringPoint{ Name: name, - Tags: influxql.NewTags(p.Tags()), + Tags: query.NewTags(p.Tags()), Time: p.Time().UnixNano(), Value: typed, } @@ -370,10 +370,10 @@ type stringPointAggregator struct { field string topBottomInfo *pipeline.TopBottomCallInfo isSimpleSelector bool - aggregator influxql.StringPointAggregator + aggregator query.StringPointAggregator } -func stringPopulateAuxFieldsAndTags(ap *influxql.StringPoint, fieldsAndTags []string, fields models.Fields, tags models.Tags) { +func stringPopulateAuxFieldsAndTags(ap *query.StringPoint, fieldsAndTags []string, fields models.Fields, tags models.Tags) { ap.Aux = make([]interface{}, len(fieldsAndTags)) for i, name := range fieldsAndTags { if f, ok := fields[name]; ok { @@ -395,7 +395,7 @@ func (a *stringPointAggregator) AggregatePoint(name string, p edge.FieldsTagsTim type stringPointEmitter struct { baseReduceContext - emitter influxql.StringPointEmitter + emitter query.StringPointEmitter isSimpleSelector bool byName bool } @@ -408,7 +408,7 @@ func (e *stringPointEmitter) EmitPoint() (edge.PointMessage, error) { ap := slice[0] var t time.Time if e.pointTimes { - if ap.Time == influxql.ZeroTime { + if ap.Time == query.ZeroTime { t = e.time } else { t = time.Unix(0, ap.Time).UTC() @@ -454,7 +454,7 @@ func (e *stringPointEmitter) EmitBatch() edge.BufferedBatchMessage { var t time.Time for i, ap := range slice { if e.pointTimes { - if ap.Time == influxql.ZeroTime { + if ap.Time == query.ZeroTime { t = e.time } else { t = time.Unix(0, ap.Time).UTC() @@ -497,7 +497,7 @@ func convertBooleanPoint( field string, isSimpleSelector bool, topBottomInfo *pipeline.TopBottomCallInfo, -) (*influxql.BooleanPoint, error) { +) (*query.BooleanPoint, error) { value, ok := p.Fields()[field] if !ok { return nil, fmt.Errorf("field %s missing from point cannot aggregate", field) @@ -506,9 +506,9 @@ func convertBooleanPoint( if !ok { return nil, fmt.Errorf("field %s has wrong type: got %T exp bool", field, value) } - ap := &influxql.BooleanPoint{ + ap := &query.BooleanPoint{ Name: name, - Tags: influxql.NewTags(p.Tags()), + Tags: query.NewTags(p.Tags()), Time: p.Time().UnixNano(), Value: typed, } @@ -528,10 +528,10 @@ type booleanPointAggregator struct { field string topBottomInfo *pipeline.TopBottomCallInfo isSimpleSelector bool - aggregator influxql.BooleanPointAggregator + aggregator query.BooleanPointAggregator } -func booleanPopulateAuxFieldsAndTags(ap *influxql.BooleanPoint, fieldsAndTags []string, fields models.Fields, tags models.Tags) { +func booleanPopulateAuxFieldsAndTags(ap *query.BooleanPoint, fieldsAndTags []string, fields models.Fields, tags models.Tags) { ap.Aux = make([]interface{}, len(fieldsAndTags)) for i, name := range fieldsAndTags { if f, ok := fields[name]; ok { @@ -553,7 +553,7 @@ func (a *booleanPointAggregator) AggregatePoint(name string, p edge.FieldsTagsTi type booleanPointEmitter struct { baseReduceContext - emitter influxql.BooleanPointEmitter + emitter query.BooleanPointEmitter isSimpleSelector bool byName bool } @@ -566,7 +566,7 @@ func (e *booleanPointEmitter) EmitPoint() (edge.PointMessage, error) { ap := slice[0] var t time.Time if e.pointTimes { - if ap.Time == influxql.ZeroTime { + if ap.Time == query.ZeroTime { t = e.time } else { t = time.Unix(0, ap.Time).UTC() @@ -612,7 +612,7 @@ func (e *booleanPointEmitter) EmitBatch() edge.BufferedBatchMessage { var t time.Time for i, ap := range slice { if e.pointTimes { - if ap.Time == influxql.ZeroTime { + if ap.Time == query.ZeroTime { t = e.time } else { t = time.Unix(0, ap.Time).UTC() diff --git a/influxql.gen.go.tmpl b/influxql.gen.go.tmpl index 413b21015..076eb87c3 100644 --- a/influxql.gen.go.tmpl +++ b/influxql.gen.go.tmpl @@ -6,7 +6,7 @@ import ( "time" "reflect" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/query" "github.com/influxdata/kapacitor/models" "github.com/influxdata/kapacitor/edge" "github.com/influxdata/kapacitor/pipeline" @@ -21,7 +21,7 @@ func convert{{.Name}}Point( field string, isSimpleSelector bool, topBottomInfo *pipeline.TopBottomCallInfo, -) (*influxql.{{.Name}}Point, error) { +) (*query.{{.Name}}Point, error) { value, ok := p.Fields()[field] if !ok { return nil, fmt.Errorf("field %s missing from point cannot aggregate", field) @@ -30,9 +30,9 @@ func convert{{.Name}}Point( if !ok { return nil, fmt.Errorf("field %s has wrong type: got %T exp {{.Type}}", field, value) } - ap := &influxql.{{.Name}}Point{ + ap := &query.{{.Name}}Point{ Name: name, - Tags: influxql.NewTags(p.Tags()), + Tags: query.NewTags(p.Tags()), Time: p.Time().UnixNano(), Value: typed, } @@ -52,10 +52,10 @@ type {{.name}}PointAggregator struct { field string topBottomInfo *pipeline.TopBottomCallInfo isSimpleSelector bool - aggregator influxql.{{.Name}}PointAggregator + aggregator query.{{.Name}}PointAggregator } -func {{.name}}PopulateAuxFieldsAndTags(ap *influxql.{{.Name}}Point, fieldsAndTags []string, fields models.Fields, tags models.Tags) { +func {{.name}}PopulateAuxFieldsAndTags(ap *query.{{.Name}}Point, fieldsAndTags []string, fields models.Fields, tags models.Tags) { ap.Aux = make([]interface{}, len(fieldsAndTags)) for i, name := range fieldsAndTags { if f, ok := fields[name]; ok { @@ -77,7 +77,7 @@ func (a *{{.name}}PointAggregator) AggregatePoint(name string, p edge.FieldsTags type {{.name}}PointEmitter struct { baseReduceContext - emitter influxql.{{.Name}}PointEmitter + emitter query.{{.Name}}PointEmitter isSimpleSelector bool byName bool } @@ -90,7 +90,7 @@ func (e *{{.name}}PointEmitter) EmitPoint() (edge.PointMessage, error) { ap := slice[0] var t time.Time if e.pointTimes { - if ap.Time == influxql.ZeroTime { + if ap.Time == query.ZeroTime { t = e.time } else { t = time.Unix(0, ap.Time).UTC() @@ -136,7 +136,7 @@ func (e *{{.name}}PointEmitter) EmitBatch() edge.BufferedBatchMessage { var t time.Time for i, ap := range slice { if e.pointTimes { - if ap.Time == influxql.ZeroTime { + if ap.Time == query.ZeroTime { t = e.time } else { t = time.Unix(0, ap.Time).UTC() diff --git a/integrations/streamer_test.go b/integrations/streamer_test.go index 0dc4836bd..973b5b75d 100644 --- a/integrations/streamer_test.go +++ b/integrations/streamer_test.go @@ -12545,11 +12545,13 @@ stream t.Errorf("got %v exp %v", len(points), 1) } else { p := points[0] - if p.Name() != "m" { + if string(p.Name()) != "m" { t.Errorf("got %v exp %v", p.Name(), "m") } - if p.Fields()["count"] != int64(10) { - t.Errorf("got %v exp %v", p.Fields()["count"], 10.0) + if fields, err := p.Fields(); fields["count"] != int64(10) { + t.Errorf("got %v exp %v", fields["count"], 10.0) + } else if err != nil { + t.Errorf("expected no error on fields but got %v", err) } if len(p.Tags()) != 1 { t.Errorf("got %v exp %v", len(p.Tags()), 1) diff --git a/join.go b/join.go index 101145403..48d71c868 100644 --- a/join.go +++ b/join.go @@ -5,7 +5,7 @@ import ( "sync" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor/edge" "github.com/influxdata/kapacitor/expvar" "github.com/influxdata/kapacitor/models" diff --git a/metaclient.go b/metaclient.go index 4282f08bd..9e974d31a 100644 --- a/metaclient.go +++ b/metaclient.go @@ -18,7 +18,7 @@ func (m *NoopMetaClient) CreateDatabase(name string) (*meta.DatabaseInfo, error) func (m *NoopMetaClient) CreateDatabaseWithRetentionPolicy(name string, rpi *meta.RetentionPolicySpec) (*meta.DatabaseInfo, error) { return nil, nil } -func (m *NoopMetaClient) CreateRetentionPolicy(database string, rpi *meta.RetentionPolicySpec) (*meta.RetentionPolicyInfo, error) { +func (m *NoopMetaClient) CreateRetentionPolicy(database string, spec *meta.RetentionPolicySpec, makeDefault bool) (*meta.RetentionPolicyInfo, error) { return nil, nil } func (m *NoopMetaClient) Database(name string) *meta.DatabaseInfo { diff --git a/pipeline/barrier.go b/pipeline/barrier.go index ab8b5d712..4cc407927 100644 --- a/pipeline/barrier.go +++ b/pipeline/barrier.go @@ -7,7 +7,7 @@ import ( "reflect" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" ) // A BarrierNode will emit a barrier with the current time, according to the system diff --git a/pipeline/batch.go b/pipeline/batch.go index 4880d779d..59570e008 100644 --- a/pipeline/batch.go +++ b/pipeline/batch.go @@ -7,7 +7,7 @@ import ( "reflect" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" ) // A node that handles creating several child QueryNodes. diff --git a/pipeline/combine.go b/pipeline/combine.go index c96912dd1..a44245818 100644 --- a/pipeline/combine.go +++ b/pipeline/combine.go @@ -6,7 +6,7 @@ import ( "strings" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor/tick/ast" ) diff --git a/pipeline/derivative.go b/pipeline/derivative.go index 02af1ecbb..5f4cf63f9 100644 --- a/pipeline/derivative.go +++ b/pipeline/derivative.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" ) // Compute the derivative of a stream or batch. diff --git a/pipeline/flatten.go b/pipeline/flatten.go index dcc2dbdb0..e3e554651 100644 --- a/pipeline/flatten.go +++ b/pipeline/flatten.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" ) const ( diff --git a/pipeline/http_post.go b/pipeline/http_post.go index ac367fffe..f37b6704e 100644 --- a/pipeline/http_post.go +++ b/pipeline/http_post.go @@ -7,7 +7,7 @@ import ( "strings" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" ) // An HTTPPostNode will take the incoming data stream and POST it to an HTTP endpoint. diff --git a/pipeline/influxdb_out.go b/pipeline/influxdb_out.go index f7b92f616..6309f44ab 100644 --- a/pipeline/influxdb_out.go +++ b/pipeline/influxdb_out.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" ) const DefaultBufferSize = 1000 diff --git a/pipeline/influxql.gen.go b/pipeline/influxql.gen.go index e84d5475f..4a2d0242a 100644 --- a/pipeline/influxql.gen.go +++ b/pipeline/influxql.gen.go @@ -6,41 +6,41 @@ package pipeline -import "github.com/influxdata/influxdb/influxql" +import "github.com/influxdata/influxdb/query" //tick:ignore type ReduceCreater struct { - CreateFloatReducer func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) + CreateFloatReducer func() (query.FloatPointAggregator, query.FloatPointEmitter) - CreateFloatIntegerReducer func() (influxql.FloatPointAggregator, influxql.IntegerPointEmitter) + CreateFloatIntegerReducer func() (query.FloatPointAggregator, query.IntegerPointEmitter) - CreateFloatStringReducer func() (influxql.FloatPointAggregator, influxql.StringPointEmitter) + CreateFloatStringReducer func() (query.FloatPointAggregator, query.StringPointEmitter) - CreateFloatBooleanReducer func() (influxql.FloatPointAggregator, influxql.BooleanPointEmitter) + CreateFloatBooleanReducer func() (query.FloatPointAggregator, query.BooleanPointEmitter) - CreateIntegerFloatReducer func() (influxql.IntegerPointAggregator, influxql.FloatPointEmitter) + CreateIntegerFloatReducer func() (query.IntegerPointAggregator, query.FloatPointEmitter) - CreateIntegerReducer func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) + CreateIntegerReducer func() (query.IntegerPointAggregator, query.IntegerPointEmitter) - CreateIntegerStringReducer func() (influxql.IntegerPointAggregator, influxql.StringPointEmitter) + CreateIntegerStringReducer func() (query.IntegerPointAggregator, query.StringPointEmitter) - CreateIntegerBooleanReducer func() (influxql.IntegerPointAggregator, influxql.BooleanPointEmitter) + CreateIntegerBooleanReducer func() (query.IntegerPointAggregator, query.BooleanPointEmitter) - CreateStringFloatReducer func() (influxql.StringPointAggregator, influxql.FloatPointEmitter) + CreateStringFloatReducer func() (query.StringPointAggregator, query.FloatPointEmitter) - CreateStringIntegerReducer func() (influxql.StringPointAggregator, influxql.IntegerPointEmitter) + CreateStringIntegerReducer func() (query.StringPointAggregator, query.IntegerPointEmitter) - CreateStringReducer func() (influxql.StringPointAggregator, influxql.StringPointEmitter) + CreateStringReducer func() (query.StringPointAggregator, query.StringPointEmitter) - CreateStringBooleanReducer func() (influxql.StringPointAggregator, influxql.BooleanPointEmitter) + CreateStringBooleanReducer func() (query.StringPointAggregator, query.BooleanPointEmitter) - CreateBooleanFloatReducer func() (influxql.BooleanPointAggregator, influxql.FloatPointEmitter) + CreateBooleanFloatReducer func() (query.BooleanPointAggregator, query.FloatPointEmitter) - CreateBooleanIntegerReducer func() (influxql.BooleanPointAggregator, influxql.IntegerPointEmitter) + CreateBooleanIntegerReducer func() (query.BooleanPointAggregator, query.IntegerPointEmitter) - CreateBooleanStringReducer func() (influxql.BooleanPointAggregator, influxql.StringPointEmitter) + CreateBooleanStringReducer func() (query.BooleanPointAggregator, query.StringPointEmitter) - CreateBooleanReducer func() (influxql.BooleanPointAggregator, influxql.BooleanPointEmitter) + CreateBooleanReducer func() (query.BooleanPointAggregator, query.BooleanPointEmitter) TopBottomCallInfo *TopBottomCallInfo IsSimpleSelector bool diff --git a/pipeline/influxql.gen.go.tmpl b/pipeline/influxql.gen.go.tmpl index 5880b872f..4faa06eae 100644 --- a/pipeline/influxql.gen.go.tmpl +++ b/pipeline/influxql.gen.go.tmpl @@ -1,13 +1,13 @@ package pipeline -import "github.com/influxdata/influxdb/influxql" +import "github.com/influxdata/influxdb/query" //tick:ignore type ReduceCreater struct { {{with $types := .}} {{range $a := $types}} {{range $e := $types}} - Create{{$a.Name}}{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}Reducer func() (influxql.{{$a.Name}}PointAggregator, influxql.{{$e.Name}}PointEmitter) + Create{{$a.Name}}{{if ne $a.Name $e.Name}}{{$e.Name}}{{end}}Reducer func() (query.{{$a.Name}}PointAggregator, query.{{$e.Name}}PointEmitter) {{end}}{{end}}{{end}} TopBottomCallInfo *TopBottomCallInfo diff --git a/pipeline/influxql.go b/pipeline/influxql.go index 370607e96..2fea097e7 100644 --- a/pipeline/influxql.go +++ b/pipeline/influxql.go @@ -6,7 +6,8 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/query" + "github.com/influxdata/influxql" ) // tmpl -- go get github.com/benbjohnson/tmpl @@ -48,6 +49,9 @@ type InfluxQLNode struct { // tick:ignore PointTimes bool `tick:"UsePointTimes" json:"usePointTimes"` + //tick:ignore + Reducer Node + // tick:ignore Args []interface{} `json:"args"` } @@ -171,20 +175,20 @@ func (n *InfluxQLNode) UsePointTimes() *InfluxQLNode { // Count the number of points. func (n *chainnode) Count(field string) *InfluxQLNode { i := newInfluxQLNode("count", field, n.Provides(), StreamEdge, ReduceCreater{ - CreateFloatIntegerReducer: func() (influxql.FloatPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewFloatFuncIntegerReducer(influxql.FloatCountReduce, &influxql.IntegerPoint{Value: 0}) + CreateFloatIntegerReducer: func() (query.FloatPointAggregator, query.IntegerPointEmitter) { + fn := query.NewFloatFuncIntegerReducer(query.FloatCountReduce, &query.IntegerPoint{Value: 0}) return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerFuncReducer(influxql.IntegerCountReduce, &influxql.IntegerPoint{Value: 0}) + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerFuncReducer(query.IntegerCountReduce, &query.IntegerPoint{Value: 0}) return fn, fn }, - CreateStringIntegerReducer: func() (influxql.StringPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewStringFuncIntegerReducer(influxql.StringCountReduce, &influxql.IntegerPoint{Value: 0}) + CreateStringIntegerReducer: func() (query.StringPointAggregator, query.IntegerPointEmitter) { + fn := query.NewStringFuncIntegerReducer(query.StringCountReduce, &query.IntegerPoint{Value: 0}) return fn, fn }, - CreateBooleanIntegerReducer: func() (influxql.BooleanPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewBooleanFuncIntegerReducer(influxql.BooleanCountReduce, &influxql.IntegerPoint{Value: 0}) + CreateBooleanIntegerReducer: func() (query.BooleanPointAggregator, query.IntegerPointEmitter) { + fn := query.NewBooleanFuncIntegerReducer(query.BooleanCountReduce, &query.IntegerPoint{Value: 0}) return fn, fn }, IsEmptyOK: true, @@ -196,20 +200,20 @@ func (n *chainnode) Count(field string) *InfluxQLNode { // Produce batch of only the distinct points. func (n *chainnode) Distinct(field string) *InfluxQLNode { i := newInfluxQLNode("distinct", field, n.Provides(), BatchEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatDistinctReducer() + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatDistinctReducer() return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerDistinctReducer() + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerDistinctReducer() return fn, fn }, - CreateStringReducer: func() (influxql.StringPointAggregator, influxql.StringPointEmitter) { - fn := influxql.NewStringDistinctReducer() + CreateStringReducer: func() (query.StringPointAggregator, query.StringPointEmitter) { + fn := query.NewStringDistinctReducer() return fn, fn }, - CreateBooleanReducer: func() (influxql.BooleanPointAggregator, influxql.BooleanPointEmitter) { - fn := influxql.NewBooleanDistinctReducer() + CreateBooleanReducer: func() (query.BooleanPointAggregator, query.BooleanPointEmitter) { + fn := query.NewBooleanDistinctReducer() return fn, fn }, }) @@ -220,12 +224,12 @@ func (n *chainnode) Distinct(field string) *InfluxQLNode { // Compute the mean of the data. func (n *chainnode) Mean(field string) *InfluxQLNode { i := newInfluxQLNode("mean", field, n.Provides(), StreamEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatMeanReducer() + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatMeanReducer() return fn, fn }, - CreateIntegerFloatReducer: func() (influxql.IntegerPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewIntegerMeanReducer() + CreateIntegerFloatReducer: func() (query.IntegerPointAggregator, query.FloatPointEmitter) { + fn := query.NewIntegerMeanReducer() return fn, fn }, }) @@ -237,12 +241,12 @@ func (n *chainnode) Mean(field string) *InfluxQLNode { // if you want the median point use `.percentile(field, 50.0)`. func (n *chainnode) Median(field string) *InfluxQLNode { i := newInfluxQLNode("median", field, n.Provides(), StreamEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatSliceFuncReducer(influxql.FloatMedianReduceSlice) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatSliceFuncReducer(query.FloatMedianReduceSlice) return fn, fn }, - CreateIntegerFloatReducer: func() (influxql.IntegerPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewIntegerSliceFuncFloatReducer(influxql.IntegerMedianReduceSlice) + CreateIntegerFloatReducer: func() (query.IntegerPointAggregator, query.FloatPointEmitter) { + fn := query.NewIntegerSliceFuncFloatReducer(query.IntegerMedianReduceSlice) return fn, fn }, }) @@ -253,12 +257,12 @@ func (n *chainnode) Median(field string) *InfluxQLNode { // Compute the mode of the data. func (n *chainnode) Mode(field string) *InfluxQLNode { i := newInfluxQLNode("mode", field, n.Provides(), StreamEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatSliceFuncReducer(influxql.FloatModeReduceSlice) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatSliceFuncReducer(query.FloatModeReduceSlice) return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerSliceFuncReducer(influxql.IntegerModeReduceSlice) + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerSliceFuncReducer(query.IntegerModeReduceSlice) return fn, fn }, }) @@ -269,12 +273,14 @@ func (n *chainnode) Mode(field string) *InfluxQLNode { // Compute the difference between `min` and `max` points. func (n *chainnode) Spread(field string) *InfluxQLNode { i := newInfluxQLNode("spread", field, n.Provides(), StreamEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatSliceFuncReducer(influxql.FloatSpreadReduceSlice) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + // fn := NewFloatSliceFuncReducer(query.FloatSpreadReduceSlice) + fn := query.NewFloatSpreadReducer() return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerSliceFuncReducer(influxql.IntegerSpreadReduceSlice) + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + //fn := query.NewIntegerSliceFuncReducer(query.IntegerSpreadReduceSlice) + fn := query.NewIntegerSpreadReducer() return fn, fn }, }) @@ -285,12 +291,12 @@ func (n *chainnode) Spread(field string) *InfluxQLNode { // Compute the sum of all values. func (n *chainnode) Sum(field string) *InfluxQLNode { i := newInfluxQLNode("sum", field, n.Provides(), StreamEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatFuncReducer(influxql.FloatSumReduce, &influxql.FloatPoint{Value: 0}) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatFuncReducer(query.FloatSumReduce, &query.FloatPoint{Value: 0}) return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerFuncReducer(influxql.IntegerSumReduce, &influxql.IntegerPoint{Value: 0}) + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerFuncReducer(query.IntegerSumReduce, &query.IntegerPoint{Value: 0}) return fn, fn }, IsEmptyOK: true, @@ -306,20 +312,20 @@ func (n *chainnode) Sum(field string) *InfluxQLNode { // Select the first point. func (n *chainnode) First(field string) *InfluxQLNode { i := newInfluxQLNode("first", field, n.Provides(), StreamEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatFuncReducer(influxql.FloatFirstReduce, nil) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatFuncReducer(query.FloatFirstReduce, nil) return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerFuncReducer(influxql.IntegerFirstReduce, nil) + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerFuncReducer(query.IntegerFirstReduce, nil) return fn, fn }, - CreateStringReducer: func() (influxql.StringPointAggregator, influxql.StringPointEmitter) { - fn := influxql.NewStringFuncReducer(influxql.StringFirstReduce, nil) + CreateStringReducer: func() (query.StringPointAggregator, query.StringPointEmitter) { + fn := query.NewStringFuncReducer(query.StringFirstReduce, nil) return fn, fn }, - CreateBooleanReducer: func() (influxql.BooleanPointAggregator, influxql.BooleanPointEmitter) { - fn := influxql.NewBooleanFuncReducer(influxql.BooleanFirstReduce, nil) + CreateBooleanReducer: func() (query.BooleanPointAggregator, query.BooleanPointEmitter) { + fn := query.NewBooleanFuncReducer(query.BooleanFirstReduce, nil) return fn, fn }, IsSimpleSelector: true, @@ -331,20 +337,20 @@ func (n *chainnode) First(field string) *InfluxQLNode { // Select the last point. func (n *chainnode) Last(field string) *InfluxQLNode { i := newInfluxQLNode("last", field, n.Provides(), StreamEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatFuncReducer(influxql.FloatLastReduce, nil) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatFuncReducer(query.FloatLastReduce, nil) return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerFuncReducer(influxql.IntegerLastReduce, nil) + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerFuncReducer(query.IntegerLastReduce, nil) return fn, fn }, - CreateStringReducer: func() (influxql.StringPointAggregator, influxql.StringPointEmitter) { - fn := influxql.NewStringFuncReducer(influxql.StringLastReduce, nil) + CreateStringReducer: func() (query.StringPointAggregator, query.StringPointEmitter) { + fn := query.NewStringFuncReducer(query.StringLastReduce, nil) return fn, fn }, - CreateBooleanReducer: func() (influxql.BooleanPointAggregator, influxql.BooleanPointEmitter) { - fn := influxql.NewBooleanFuncReducer(influxql.BooleanLastReduce, nil) + CreateBooleanReducer: func() (query.BooleanPointAggregator, query.BooleanPointEmitter) { + fn := query.NewBooleanFuncReducer(query.BooleanLastReduce, nil) return fn, fn }, IsSimpleSelector: true, @@ -356,12 +362,12 @@ func (n *chainnode) Last(field string) *InfluxQLNode { // Select the minimum point. func (n *chainnode) Min(field string) *InfluxQLNode { i := newInfluxQLNode("min", field, n.Provides(), StreamEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatFuncReducer(influxql.FloatMinReduce, nil) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatFuncReducer(query.FloatMinReduce, nil) return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerFuncReducer(influxql.IntegerMinReduce, nil) + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerFuncReducer(query.IntegerMinReduce, nil) return fn, fn }, IsSimpleSelector: true, @@ -373,12 +379,12 @@ func (n *chainnode) Min(field string) *InfluxQLNode { // Select the maximum point. func (n *chainnode) Max(field string) *InfluxQLNode { i := newInfluxQLNode("max", field, n.Provides(), StreamEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatFuncReducer(influxql.FloatMaxReduce, nil) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatFuncReducer(query.FloatMaxReduce, nil) return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerFuncReducer(influxql.IntegerMaxReduce, nil) + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerFuncReducer(query.IntegerMaxReduce, nil) return fn, fn }, IsSimpleSelector: true, @@ -390,12 +396,12 @@ func (n *chainnode) Max(field string) *InfluxQLNode { // Select a point at the given percentile. This is a selector function, no interpolation between points is performed. func (n *chainnode) Percentile(field string, percentile float64) *InfluxQLNode { i := newInfluxQLNode("percentile", field, n.Provides(), StreamEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatSliceFuncReducer(influxql.NewFloatPercentileReduceSliceFunc(percentile)) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatSliceFuncReducer(query.NewFloatPercentileReduceSliceFunc(percentile)) return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerSliceFuncReducer(influxql.NewIntegerPercentileReduceSliceFunc(percentile)) + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerSliceFuncReducer(query.NewIntegerPercentileReduceSliceFunc(percentile)) return fn, fn }, IsSimpleSelector: true, @@ -417,20 +423,12 @@ func (n *chainnode) Top(num int64, field string, fieldsAndTags ...string) *Influ tags[i] = i } i := newInfluxQLNode("top", field, n.Provides(), BatchEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatSliceFuncReducer(influxql.NewFloatTopReduceSliceFunc( - int(num), - tags, - influxql.Interval{}, - )) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatTopReducer(int(num)) return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerSliceFuncReducer(influxql.NewIntegerTopReduceSliceFunc( - int(num), - tags, - influxql.Interval{}, - )) + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerTopReducer(int(num)) return fn, fn }, TopBottomCallInfo: &TopBottomCallInfo{ @@ -452,20 +450,12 @@ func (n *chainnode) Bottom(num int64, field string, fieldsAndTags ...string) *In tags[i] = i } i := newInfluxQLNode("bottom", field, n.Provides(), BatchEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatSliceFuncReducer(influxql.NewFloatBottomReduceSliceFunc( - int(num), - tags, - influxql.Interval{}, - )) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatBottomReducer(int(num)) return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerSliceFuncReducer(influxql.NewIntegerBottomReduceSliceFunc( - int(num), - tags, - influxql.Interval{}, - )) + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerBottomReducer(int(num)) return fn, fn }, TopBottomCallInfo: &TopBottomCallInfo{ @@ -487,12 +477,12 @@ func (n *chainnode) Bottom(num int64, field string, fieldsAndTags ...string) *In // Compute the standard deviation. func (n *chainnode) Stddev(field string) *InfluxQLNode { i := newInfluxQLNode("stddev", field, n.Provides(), StreamEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatSliceFuncReducer(influxql.FloatStddevReduceSlice) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatSliceFuncReducer(query.FloatStddevReduceSlice) return fn, fn }, - CreateIntegerFloatReducer: func() (influxql.IntegerPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewIntegerSliceFuncFloatReducer(influxql.IntegerStddevReduceSlice) + CreateIntegerFloatReducer: func() (query.IntegerPointAggregator, query.FloatPointEmitter) { + fn := query.NewIntegerSliceFuncFloatReducer(query.IntegerStddevReduceSlice) return fn, fn }, }) @@ -503,20 +493,20 @@ func (n *chainnode) Stddev(field string) *InfluxQLNode { // Compute the elapsed time between points func (n *chainnode) Elapsed(field string, unit time.Duration) *InfluxQLNode { i := newInfluxQLNode("elapsed", field, n.Provides(), n.Provides(), ReduceCreater{ - CreateFloatIntegerReducer: func() (influxql.FloatPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewFloatElapsedReducer(influxql.Interval{Duration: unit}) + CreateFloatIntegerReducer: func() (query.FloatPointAggregator, query.IntegerPointEmitter) { + fn := query.NewFloatElapsedReducer(query.Interval{Duration: unit}) return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerElapsedReducer(influxql.Interval{Duration: unit}) + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerElapsedReducer(query.Interval{Duration: unit}) return fn, fn }, - CreateStringIntegerReducer: func() (influxql.StringPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewStringElapsedReducer(influxql.Interval{Duration: unit}) + CreateStringIntegerReducer: func() (query.StringPointAggregator, query.IntegerPointEmitter) { + fn := query.NewStringElapsedReducer(query.Interval{Duration: unit}) return fn, fn }, - CreateBooleanIntegerReducer: func() (influxql.BooleanPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewBooleanElapsedReducer(influxql.Interval{Duration: unit}) + CreateBooleanIntegerReducer: func() (query.BooleanPointAggregator, query.IntegerPointEmitter) { + fn := query.NewBooleanElapsedReducer(query.Interval{Duration: unit}) return fn, fn }, IsStreamTransformation: true, @@ -529,12 +519,12 @@ func (n *chainnode) Elapsed(field string, unit time.Duration) *InfluxQLNode { // Compute the difference between points independent of elapsed time. func (n *chainnode) Difference(field string) *InfluxQLNode { i := newInfluxQLNode("difference", field, n.Provides(), n.Provides(), ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatDifferenceReducer() + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatDifferenceReducer(false) return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerDifferenceReducer() + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerDifferenceReducer(false) return fn, fn }, IsStreamTransformation: true, @@ -547,12 +537,12 @@ func (n *chainnode) Difference(field string) *InfluxQLNode { // No points are emitted until the window is full. func (n *chainnode) MovingAverage(field string, window int64) *InfluxQLNode { i := newInfluxQLNode("movingAverage", field, n.Provides(), n.Provides(), ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatMovingAverageReducer(int(window)) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatMovingAverageReducer(int(window)) return fn, fn }, - CreateIntegerFloatReducer: func() (influxql.IntegerPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewIntegerMovingAverageReducer(int(window)) + CreateIntegerFloatReducer: func() (query.IntegerPointAggregator, query.FloatPointEmitter) { + fn := query.NewIntegerMovingAverageReducer(int(window)) return fn, fn }, IsStreamTransformation: true, @@ -575,12 +565,12 @@ func (n *chainnode) HoltWintersWithFit(field string, h, m int64, interval time.D func (n *chainnode) holtWinters(field string, h, m int64, interval time.Duration, includeFitData bool) *InfluxQLNode { i := newInfluxQLNode("holtWinters", field, n.Provides(), BatchEdge, ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatHoltWintersReducer(int(h), int(m), includeFitData, interval) + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatHoltWintersReducer(int(h), int(m), includeFitData, interval) return fn, fn }, - CreateIntegerFloatReducer: func() (influxql.IntegerPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatHoltWintersReducer(int(h), int(m), includeFitData, interval) + CreateIntegerFloatReducer: func() (query.IntegerPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatHoltWintersReducer(int(h), int(m), includeFitData, interval) return fn, fn }, }) @@ -595,12 +585,12 @@ func (n *chainnode) holtWinters(field string, h, m int64, interval time.Duration // A point is emitted for every point collected. func (n *chainnode) CumulativeSum(field string) *InfluxQLNode { i := newInfluxQLNode("cumulativeSum", field, n.Provides(), n.Provides(), ReduceCreater{ - CreateFloatReducer: func() (influxql.FloatPointAggregator, influxql.FloatPointEmitter) { - fn := influxql.NewFloatCumulativeSumReducer() + CreateFloatReducer: func() (query.FloatPointAggregator, query.FloatPointEmitter) { + fn := query.NewFloatCumulativeSumReducer() return fn, fn }, - CreateIntegerReducer: func() (influxql.IntegerPointAggregator, influxql.IntegerPointEmitter) { - fn := influxql.NewIntegerCumulativeSumReducer() + CreateIntegerReducer: func() (query.IntegerPointAggregator, query.IntegerPointEmitter) { + fn := query.NewIntegerCumulativeSumReducer() return fn, fn }, IsStreamTransformation: true, diff --git a/pipeline/join.go b/pipeline/join.go index 2984e68ab..8ce432a92 100644 --- a/pipeline/join.go +++ b/pipeline/join.go @@ -6,7 +6,7 @@ import ( "strings" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" ) const ( diff --git a/pipeline/k8s_autoscale.go b/pipeline/k8s_autoscale.go index 840442fe6..3fa99ca6e 100644 --- a/pipeline/k8s_autoscale.go +++ b/pipeline/k8s_autoscale.go @@ -6,7 +6,7 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor/services/k8s/client" "github.com/influxdata/kapacitor/tick/ast" ) diff --git a/pipeline/sample.go b/pipeline/sample.go index c18205c11..feed9dd06 100644 --- a/pipeline/sample.go +++ b/pipeline/sample.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" ) // Sample points or batches. diff --git a/pipeline/shift.go b/pipeline/shift.go index 296b4c074..612c698e2 100644 --- a/pipeline/shift.go +++ b/pipeline/shift.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" ) // Shift points and batches in time, this is useful for comparing diff --git a/pipeline/state_tracking.go b/pipeline/state_tracking.go index 34515544e..3b77b7c33 100644 --- a/pipeline/state_tracking.go +++ b/pipeline/state_tracking.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor/tick/ast" ) diff --git a/pipeline/stats.go b/pipeline/stats.go index b4636e0bf..898b29193 100644 --- a/pipeline/stats.go +++ b/pipeline/stats.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" ) // A StatsNode emits internal statistics about the another node at a given interval. diff --git a/pipeline/stream.go b/pipeline/stream.go index d6b958db0..b8013233f 100644 --- a/pipeline/stream.go +++ b/pipeline/stream.go @@ -6,7 +6,7 @@ import ( "reflect" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor/tick/ast" ) diff --git a/pipeline/swarm_autoscale.go b/pipeline/swarm_autoscale.go index 52e846c35..a7a4c02fa 100644 --- a/pipeline/swarm_autoscale.go +++ b/pipeline/swarm_autoscale.go @@ -6,7 +6,7 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor/tick/ast" ) diff --git a/pipeline/udf.go b/pipeline/udf.go index f1225b82a..244707bc8 100644 --- a/pipeline/udf.go +++ b/pipeline/udf.go @@ -8,7 +8,7 @@ import ( "strconv" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor/tick" "github.com/influxdata/kapacitor/udf/agent" ) diff --git a/pipeline/window.go b/pipeline/window.go index 875e3195e..ffd383294 100644 --- a/pipeline/window.go +++ b/pipeline/window.go @@ -6,7 +6,7 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" ) // A `window` node caches data within a moving time range. diff --git a/query.go b/query.go index 8a81e4e22..9b0785c3f 100644 --- a/query.go +++ b/query.go @@ -4,7 +4,7 @@ import ( "fmt" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor/tick/ast" "github.com/pkg/errors" ) diff --git a/query_test.go b/query_test.go index 30a2f88ad..875b12d22 100644 --- a/query_test.go +++ b/query_test.go @@ -15,6 +15,8 @@ func TestQuery_Clone(t *testing.T) { "SELECT mean(usage) FROM telegraf.autogen.cpu WHERE host = 'serverA' AND dc = 'slc'", "SELECT mean(usage) FROM telegraf.autogen.cpu WHERE host = 'serverA' AND dc = 'slc' OR product = 'login'", "SELECT mean(usage) FROM telegraf.autogen.cpu WHERE host = 'serverA' AND (dc = 'slc' OR product = 'login')", + "SELECT * from (SELECT usage FROM telegraf.autogen.cpu)", + "SELECT * INTO telegraf.autogen.cpu FROM (SELECT usage FROM telegraf.autogen.cpu)", } equal := func(q0, q1 *kapacitor.Query) error { diff --git a/replay.go b/replay.go index 53f955e0f..cdc199a5c 100644 --- a/replay.go +++ b/replay.go @@ -95,12 +95,18 @@ func readPointsFromIO(data io.ReadCloser, points chan<- edge.PointMessage, preci return err } mp := mps[0] + + mpfields, err := mp.Fields() + if err != nil { + return err + } + p := edge.NewPointMessage( - mp.Name(), + string(mp.Name()), //TODO(docmerlin): there has to be a better way of handling things so we don't allocate everywhere db, rp, models.Dimensions{}, - models.Fields(mp.Fields()), + models.Fields(mpfields), models.Tags(mp.Tags().Map()), mp.Time().UTC(), ) diff --git a/result.go b/result.go index 5a35ee3e2..4082e1588 100644 --- a/result.go +++ b/result.go @@ -6,11 +6,11 @@ import ( "io/ioutil" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxdb/query" ) // The result from an output. -type Result influxql.Result +type Result query.Result // Unmarshal a Result object from JSON. func ResultFromJSON(in io.Reader) (r Result) { diff --git a/server/server.go b/server/server.go index 23ac1c21d..5c324767a 100644 --- a/server/server.go +++ b/server/server.go @@ -12,11 +12,14 @@ import ( "sync" "time" - "github.com/influxdata/influxdb/influxql" + "go.uber.org/zap" + + "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/services/collectd" "github.com/influxdata/influxdb/services/graphite" "github.com/influxdata/influxdb/services/meta" "github.com/influxdata/influxdb/services/opentsdb" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor" "github.com/influxdata/kapacitor/auth" "github.com/influxdata/kapacitor/command" @@ -893,11 +896,7 @@ func (s *Server) appendCollectdService() error { return nil } srv := collectd.NewService(c) - w, err := s.DiagService.NewStaticLevelHandler("info", "collectd") - if err != nil { - return fmt.Errorf("failed to create static level handler for collectd: %v", err) - } - srv.SetLogOutput(w) + srv.WithLogger(s.DiagService.NewZapLogger(zapcore.InfoLevel)) srv.MetaClient = s.MetaClient srv.PointsWriter = s.TaskMaster @@ -915,11 +914,7 @@ func (s *Server) appendOpenTSDBService() error { if err != nil { return err } - w, err := s.DiagService.NewStaticLevelHandler("info", "opentsdb") - if err != nil { - return fmt.Errorf("failed to create static level handler for opentsdb: %v", err) - } - srv.SetLogOutput(w) + srv.WithLogger(s.DiagService.NewZapLogger(zap.InfoLevel).With(zap.String("service", "opentsdb"))) srv.PointsWriter = s.TaskMaster srv.MetaClient = s.MetaClient @@ -936,11 +931,7 @@ func (s *Server) appendGraphiteServices() error { if err != nil { return errors.Wrap(err, "creating new graphite service") } - w, err := s.DiagService.NewStaticLevelHandler("info", "graphite") - if err != nil { - return fmt.Errorf("failed to create static level handler for graphite: %v", err) - } - srv.SetLogOutput(w) + srv.WithLogger(s.DiagService.NewZapLogger(zap.InfoLevel).With(zap.String("service", "graphite"))) srv.PointsWriter = s.TaskMaster srv.MetaClient = s.MetaClient @@ -1387,6 +1378,6 @@ type Queryexecutor struct{} func (qe *Queryexecutor) Authorize(u *meta.UserInfo, q *influxql.Query, db string) error { return nil } -func (qe *Queryexecutor) ExecuteQuery(q *influxql.Query, db string, chunkSize int) (<-chan *influxql.Result, error) { +func (qe *Queryexecutor) ExecuteQuery(q *influxql.Query, db string, chunkSize int) (<-chan *query.Result, error) { return nil, errors.New("cannot execute queries against Kapacitor") } diff --git a/server/server_test.go b/server/server_test.go index b910fd4e0..c2f4aac39 100644 --- a/server/server_test.go +++ b/server/server_test.go @@ -31,9 +31,10 @@ import ( "github.com/google/go-cmp/cmp" "github.com/influxdata/flux/fluxinit" iclient "github.com/influxdata/influxdb/client/v2" - "github.com/influxdata/influxdb/influxql" imodels "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/toml" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor/alert" "github.com/influxdata/kapacitor/client/v1" "github.com/influxdata/kapacitor/command" @@ -3686,18 +3687,18 @@ test value=1 0000000012 } defer f.Close() type response struct { - ID string `json:"id"` - Message string `json:"message"` - Time time.Time `json:"time"` - Level string `json:"level"` - Data influxql.Result `json:"data"` + ID string `json:"id"` + Message string `json:"message"` + Time time.Time `json:"time"` + Level string `json:"level"` + Data query.Result `json:"data"` } exp := response{ ID: "test-count", Message: "test-count got: 15", Time: time.Date(1970, 1, 1, 0, 0, 10, 0, time.UTC), Level: "CRITICAL", - Data: influxql.Result{ + Data: query.Result{ Series: imodels.Rows{ { Name: "test", @@ -3893,18 +3894,18 @@ test value=1 0000000012 } defer f.Close() type response struct { - ID string `json:"id"` - Message string `json:"message"` - Time time.Time `json:"time"` - Level string `json:"level"` - Data influxql.Result `json:"data"` + ID string `json:"id"` + Message string `json:"message"` + Time time.Time `json:"time"` + Level string `json:"level"` + Data query.Result `json:"data"` } exp := response{ ID: "test-count", Message: "test-count got: 15", Time: time.Date(1970, 1, 1, 0, 0, 10, 0, time.UTC), Level: "CRITICAL", - Data: influxql.Result{ + Data: query.Result{ Series: imodels.Rows{ { Name: "test", @@ -4096,11 +4097,11 @@ func TestServer_RecordReplayBatch(t *testing.T) { } defer f.Close() type response struct { - ID string `json:"id"` - Message string `json:"message"` - Time time.Time `json:"time"` - Level string `json:"level"` - Data influxql.Result `json:"data"` + ID string `json:"id"` + Message string `json:"message"` + Time time.Time `json:"time"` + Level string `json:"level"` + Data query.Result `json:"data"` } exp := []response{ { @@ -4108,7 +4109,7 @@ func TestServer_RecordReplayBatch(t *testing.T) { Message: "test-batch got: 3", Time: time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), Level: "CRITICAL", - Data: influxql.Result{ + Data: query.Result{ Series: imodels.Rows{ { Name: "cpu", @@ -4132,7 +4133,7 @@ func TestServer_RecordReplayBatch(t *testing.T) { Message: "test-batch got: 4", Time: time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), Level: "CRITICAL", - Data: influxql.Result{ + Data: query.Result{ Series: imodels.Rows{ { Name: "cpu", @@ -4309,11 +4310,11 @@ func TestServer_ReplayBatch(t *testing.T) { } defer f.Close() type response struct { - ID string `json:"id"` - Message string `json:"message"` - Time time.Time `json:"time"` - Level string `json:"level"` - Data influxql.Result `json:"data"` + ID string `json:"id"` + Message string `json:"message"` + Time time.Time `json:"time"` + Level string `json:"level"` + Data query.Result `json:"data"` } exp := []response{ { @@ -4321,7 +4322,7 @@ func TestServer_ReplayBatch(t *testing.T) { Message: "test-batch got: 3", Time: time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), Level: "CRITICAL", - Data: influxql.Result{ + Data: query.Result{ Series: imodels.Rows{ { Name: "cpu", @@ -4345,7 +4346,7 @@ func TestServer_ReplayBatch(t *testing.T) { Message: "test-batch got: 4", Time: time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), Level: "CRITICAL", - Data: influxql.Result{ + Data: query.Result{ Series: imodels.Rows{ { Name: "cpu", @@ -4562,11 +4563,11 @@ func TestServer_RecordReplayQuery(t *testing.T) { } defer f.Close() type response struct { - ID string `json:"id"` - Message string `json:"message"` - Time time.Time `json:"time"` - Level string `json:"level"` - Data influxql.Result `json:"data"` + ID string `json:"id"` + Message string `json:"message"` + Time time.Time `json:"time"` + Level string `json:"level"` + Data query.Result `json:"data"` } exp := []response{ { @@ -4574,7 +4575,7 @@ func TestServer_RecordReplayQuery(t *testing.T) { Message: "test-batch got: 3", Time: time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), Level: "CRITICAL", - Data: influxql.Result{ + Data: query.Result{ Series: imodels.Rows{ { Name: "cpu", @@ -4598,7 +4599,7 @@ func TestServer_RecordReplayQuery(t *testing.T) { Message: "test-batch got: 4", Time: time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), Level: "CRITICAL", - Data: influxql.Result{ + Data: query.Result{ Series: imodels.Rows{ { Name: "cpu", @@ -4846,11 +4847,11 @@ func TestServer_ReplayQuery(t *testing.T) { } defer f.Close() type response struct { - ID string `json:"id"` - Message string `json:"message"` - Time time.Time `json:"time"` - Level string `json:"level"` - Data influxql.Result `json:"data"` + ID string `json:"id"` + Message string `json:"message"` + Time time.Time `json:"time"` + Level string `json:"level"` + Data query.Result `json:"data"` } exp := []response{ { @@ -4858,7 +4859,7 @@ func TestServer_ReplayQuery(t *testing.T) { Message: "test-batch got: 3", Time: time.Date(1971, 1, 1, 0, 0, 3, 0, time.UTC), Level: "CRITICAL", - Data: influxql.Result{ + Data: query.Result{ Series: imodels.Rows{ { Name: "cpu", @@ -4882,7 +4883,7 @@ func TestServer_ReplayQuery(t *testing.T) { Message: "test-batch got: 4", Time: time.Date(1971, 1, 1, 0, 0, 4, 0, time.UTC), Level: "CRITICAL", - Data: influxql.Result{ + Data: query.Result{ Series: imodels.Rows{ { Name: "cpu", diff --git a/services/httpd/handler.go b/services/httpd/handler.go index e72d6254b..76f345bab 100644 --- a/services/httpd/handler.go +++ b/services/httpd/handler.go @@ -15,8 +15,8 @@ import ( "github.com/golang-jwt/jwt" "github.com/influxdata/influxdb" - "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxdb/query" "github.com/influxdata/influxdb/uuid" "github.com/influxdata/kapacitor/auth" "github.com/influxdata/kapacitor/client/v1" @@ -412,7 +412,7 @@ func (h *Handler) serve404(w http.ResponseWriter, r *http.Request) { HttpError(w, "Not Found", true, http.StatusNotFound) } -func (h *Handler) writeError(w http.ResponseWriter, result influxql.Result, statusCode int) { +func (h *Handler) writeError(w http.ResponseWriter, result query.Result, statusCode int) { w.WriteHeader(statusCode) w.Write([]byte(result.Err.Error())) w.Write([]byte("\n")) @@ -437,7 +437,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user auth.U if r.Header.Get("Content-encoding") == "gzip" { b, err := gzip.NewReader(r.Body) if err != nil { - h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest) + h.writeError(w, query.Result{Err: err}, http.StatusBadRequest) return } body = b @@ -449,7 +449,7 @@ func (h *Handler) serveWrite(w http.ResponseWriter, r *http.Request, user auth.U if h.writeTrace { h.diag.Error("write handler unabled to read bytes from request body", err) } - h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest) + h.writeError(w, query.Result{Err: err}, http.StatusBadRequest) return } h.statMap.Add(statWriteRequestBytesReceived, int64(len(b))) @@ -474,13 +474,13 @@ func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body [] w.WriteHeader(http.StatusOK) return } - h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest) + h.writeError(w, query.Result{Err: err}, http.StatusBadRequest) return } database := qp.Get("db") if database == "" { - h.writeError(w, influxql.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest) + h.writeError(w, query.Result{Err: fmt.Errorf("database is required")}, http.StatusBadRequest) return } @@ -489,7 +489,7 @@ func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body [] Privilege: auth.WritePrivilege, } if err := user.AuthorizeAction(action); err != nil { - h.writeError(w, influxql.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name(), database)}, http.StatusUnauthorized) + h.writeError(w, query.Result{Err: fmt.Errorf("%q user is not authorized to write to database %q", user.Name(), database)}, http.StatusUnauthorized) return } @@ -501,11 +501,11 @@ func (h *Handler) serveWriteLine(w http.ResponseWriter, r *http.Request, body [] points, ); influxdb.IsClientError(err) { h.statMap.Add(statPointsWrittenFail, int64(len(points))) - h.writeError(w, influxql.Result{Err: err}, http.StatusBadRequest) + h.writeError(w, query.Result{Err: err}, http.StatusBadRequest) return } else if err != nil { h.statMap.Add(statPointsWrittenFail, int64(len(points))) - h.writeError(w, influxql.Result{Err: err}, http.StatusInternalServerError) + h.writeError(w, query.Result{Err: err}, http.StatusInternalServerError) return } @@ -576,7 +576,7 @@ func HttpError(w http.ResponseWriter, err string, pretty bool, code int) { w.Write(b) } -func resultError(w http.ResponseWriter, result influxql.Result, code int) { +func resultError(w http.ResponseWriter, result query.Result, code int) { w.WriteHeader(code) _ = json.NewEncoder(w).Encode(&result) } diff --git a/services/influxdb/service.go b/services/influxdb/service.go index dc45d95cb..b508515b4 100644 --- a/services/influxdb/service.go +++ b/services/influxdb/service.go @@ -19,8 +19,8 @@ import ( "time" "github.com/cenkalti/backoff" - "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxql" khttp "github.com/influxdata/kapacitor/http" "github.com/influxdata/kapacitor/influxdb" "github.com/influxdata/kapacitor/keyvalue" diff --git a/services/influxdb/service_test.go b/services/influxdb/service_test.go index 5a76850ce..f9ad19fe3 100644 --- a/services/influxdb/service_test.go +++ b/services/influxdb/service_test.go @@ -14,8 +14,8 @@ import ( "time" "github.com/influxdata/flux" - "github.com/influxdata/influxdb/influxql" "github.com/influxdata/influxdb/models" + "github.com/influxdata/influxql" influxcli "github.com/influxdata/kapacitor/influxdb" "github.com/influxdata/kapacitor/services/diagnostic" "github.com/influxdata/kapacitor/services/httpd" diff --git a/services/replay/service.go b/services/replay/service.go index 97b2a99cf..8bb964818 100644 --- a/services/replay/service.go +++ b/services/replay/service.go @@ -17,7 +17,7 @@ import ( "strings" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor" kclient "github.com/influxdata/kapacitor/client/v1" "github.com/influxdata/kapacitor/clock" diff --git a/services/task_store/util_test.go b/services/task_store/util_test.go index b903a04b9..5035a463d 100644 --- a/services/task_store/util_test.go +++ b/services/task_store/util_test.go @@ -101,6 +101,15 @@ func TestTaskTypeFromProgram(t *testing.T) { `, taskType: client.BatchTask, }, + { + name: "var batch subquery", + tickscript: `dbrp "telegraf"."autogen" + + var x = batch|query('SELECT * FROM (SELECT * FROM "telegraf"."autogen"."mymeas")') + `, + taskType: client.BatchTask, + }, + { name: "mixed type", tickscript: `dbrp "telegraf"."autogen" diff --git a/task_master.go b/task_master.go index 5fbfd8f32..723cc36fc 100644 --- a/task_master.go +++ b/task_master.go @@ -7,6 +7,8 @@ import ( "sync" "time" + "github.com/influxdata/influxdb/tsdb" + imodels "github.com/influxdata/influxdb/models" "github.com/influxdata/kapacitor/alert" "github.com/influxdata/kapacitor/command" @@ -267,6 +269,10 @@ type TaskMaster struct { wg sync.WaitGroup } +func (tm *TaskMaster) WritePointsPrivileged(ctx tsdb.WriteContext, database, retentionPolicy string, consistencyLevel imodels.ConsistencyLevel, points []imodels.Point) error { + panic("not implemented") // we shouldn't need this. +} + type forkKey struct { Database string RetentionPolicy string @@ -782,17 +788,22 @@ func (tm *TaskMaster) WritePoints(database, retentionPolicy string, consistencyL if retentionPolicy == "" { retentionPolicy = tm.DefaultRetentionPolicy } + for _, mp := range points { + mpFields, err := mp.Fields() + if err != nil { + return err + } p := edge.NewPointMessage( - mp.Name(), + string(mp.Name()), database, retentionPolicy, models.Dimensions{}, - models.Fields(mp.Fields()), + models.Fields(mpFields), models.Tags(mp.Tags().Map()), mp.Time(), ) - err := tm.writePointsIn.CollectPoint(p) + err = tm.writePointsIn.CollectPoint(p) if err != nil { return err } diff --git a/tick/ast/json.go b/tick/ast/json.go index 820ed460f..436c6bf55 100644 --- a/tick/ast/json.go +++ b/tick/ast/json.go @@ -5,7 +5,7 @@ import ( "regexp" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" ) // NodeTypeOf is used by all Node to identify the node duration Marshal and Unmarshal diff --git a/tick/ast/node.go b/tick/ast/node.go index bb996086b..84e8f6d21 100644 --- a/tick/ast/node.go +++ b/tick/ast/node.go @@ -10,7 +10,7 @@ import ( "strings" "time" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" ) // Indent string for formatted TICKscripts diff --git a/tick/stateful/functions.go b/tick/stateful/functions.go index b9b32375f..ddfe90263 100644 --- a/tick/stateful/functions.go +++ b/tick/stateful/functions.go @@ -11,7 +11,7 @@ import ( "time" humanize "github.com/dustin/go-humanize" - "github.com/influxdata/influxdb/influxql" + "github.com/influxdata/influxql" "github.com/influxdata/kapacitor/tick/ast" "github.com/zeebo/mwc" )