Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add synchronous jobs #10

Merged
merged 6 commits into from
Jan 26, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ language: go

go:
- 1.12
- 1.13
- tip

env:
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
0.3.0
0.4.0
18 changes: 16 additions & 2 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"io/ioutil"
"net/url"
"os"
"sync"
"time"
Expand All @@ -13,6 +12,20 @@ import (
"gopkg.in/yaml.v2"
)

var (
	// failedScrapes tracks, per (connection, query) label set, whether the most
	// recent scrape attempt failed: Set(1.0) on failure, Set(0.0) on success
	// (see the call sites in job.go and query.go). It is a gauge, not a counter,
	// because it reports last-scrape state rather than a running total.
	//
	// NOTE(review): the label order here (driver, host, database, user, sql_job,
	// query) must match every WithLabelValues call — keep them in sync.
	failedScrapes = prometheus.NewGaugeVec(
		prometheus.GaugeOpts{
			Name: "sql_exporter_last_scrape_failed",
			Help: "Failed scrapes",
		},
		[]string{"driver", "host", "database", "user", "sql_job", "query"},
	)
)

// init registers the failedScrapes gauge vector with the default Prometheus
// registry so it is exposed on the metrics endpoint. MustRegister panics if
// the same collector is registered twice, so this must run exactly once.
func init() {
	prometheus.MustRegister(failedScrapes)
}

// Read attempts to parse the given config and return a file
// object
func Read(path string) (File, error) {
Expand Down Expand Up @@ -55,7 +68,7 @@ type Job struct {

type connection struct {
conn *sqlx.DB
url *url.URL
url string
driver string
host string
database string
Expand All @@ -68,6 +81,7 @@ type Query struct {
log log.Logger
desc *prometheus.Desc
metrics map[*connection][]prometheus.Metric
jobName string
Name string `yaml:"name"` // the prometheus metric name
Help string `yaml:"help"` // the prometheus metric help text
Labels []string `yaml:"labels"` // expose these columns as labels per gauge
Expand Down
10 changes: 0 additions & 10 deletions config.yml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -109,16 +109,6 @@ jobs:
, idx_blks_read::float
, idx_blks_hit::float
FROM pg_statio_user_tables;
queries:
pg_statio_user_tables: |
SELECT
schemaname::text
, relname::text
, heap_blks_read::float
, heap_blks_hit::float
, idx_blks_read::float
, idx_blks_hit::float
FROM pg_statio_user_tables;
- name: "athena"
interval: '5m'
connections:
Expand Down
35 changes: 30 additions & 5 deletions job.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
_ "github.com/denisenkom/go-mssqldb" // register the MS-SQL driver
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
_ "github.com/go-sql-driver/mysql" // register the MySQL driver
"github.com/go-sql-driver/mysql" // register the MySQL driver
"github.com/jmoiron/sqlx"
_ "github.com/lib/pq" // register the PostgreSQL driver
"github.com/prometheus/client_golang/prometheus"
Expand All @@ -35,6 +35,7 @@ func (j *Job) Init(logger log.Logger, queries map[string]string) error {
continue
}
q.log = log.With(j.log, "query", q.Name)
q.jobName = j.Name
if q.Query == "" && q.QueryRef != "" {
if qry, found := queries[q.QueryRef]; found {
q.Query = qry
Expand Down Expand Up @@ -86,6 +87,23 @@ func (j *Job) Run() {
// parse the connection URLs and create an connection object for each
if len(j.conns) < len(j.Connections) {
for _, conn := range j.Connections {
// MySQL DSNs do not parse cleanly as URLs as of Go 1.12.8+
if strings.HasPrefix(conn, "mysql://") {
config, err := mysql.ParseDSN(strings.TrimPrefix(conn, "mysql://"))
if err != nil {
level.Error(j.log).Log("msg", "Failed to parse MySQL DSN", "url", conn, "err", err)
}

j.conns = append(j.conns, &connection{
conn: nil,
url: conn,
driver: "mysql",
host: config.Addr,
database: config.DBName,
user: config.User,
})
continue
}
u, err := url.Parse(conn)
if err != nil {
level.Error(j.log).Log("msg", "Failed to parse URL", "url", conn, "err", err)
Expand All @@ -99,7 +117,7 @@ func (j *Job) Run() {
// remember them
newConn := &connection{
conn: nil,
url: u,
url: conn,
driver: u.Scheme,
host: u.Host,
database: strings.TrimPrefix(u.Path, "/"),
Expand Down Expand Up @@ -142,6 +160,7 @@ func (j *Job) runOnceConnection(conn *connection, done chan int) {
// connect to DB if not connected already
if err := conn.connect(j); err != nil {
level.Warn(j.log).Log("msg", "Failed to connect", "err", err)
j.markFailed(conn)
return
}

Expand All @@ -165,6 +184,12 @@ func (j *Job) runOnceConnection(conn *connection, done chan int) {
}
}

// markFailed flags every query of this job as failed for the given connection
// by setting the sql_exporter_last_scrape_failed gauge to 1. Used when the
// connection itself cannot be established, so no individual query ran.
func (j *Job) markFailed(conn *connection) {
	for i := range j.Queries {
		query := j.Queries[i]
		failedScrapes.
			WithLabelValues(conn.driver, conn.host, conn.database, conn.user, query.jobName, query.Name).
			Set(1.0)
	}
}

func (j *Job) runOnce() error {
doneChan := make(chan int, len(j.conns))

Expand All @@ -190,14 +215,14 @@ func (c *connection) connect(job *Job) error {
if c.conn != nil {
return nil
}
dsn := c.url.String()
switch c.url.Scheme {
dsn := c.url
switch c.driver {
case "mysql":
dsn = strings.TrimPrefix(dsn, "mysql://")
case "clickhouse":
dsn = "tcp://" + strings.TrimPrefix(dsn, "clickhouse://")
}
conn, err := sqlx.Connect(c.url.Scheme, dsn)
conn, err := sqlx.Connect(c.driver, dsn)
if err != nil {
return err
}
Expand Down
8 changes: 4 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ func main() {

// init logger
logger := log.NewJSONLogger(os.Stdout)
logger = log.With(logger,
"ts", log.DefaultTimestampUTC,
"caller", log.DefaultCaller,
)
// set the allowed log level filter
switch strings.ToLower(os.Getenv("LOGLEVEL")) {
case "debug":
Expand All @@ -52,6 +48,10 @@ func main() {
default:
logger = level.NewFilter(logger, level.AllowAll())
}
logger = log.With(logger,
"ts", log.DefaultTimestampUTC,
"caller", log.DefaultCaller,
)

logger.Log("msg", "Starting sql_exporter", "version_info", version.Info(), "build_context", version.BuildContext())

Expand Down
4 changes: 4 additions & 0 deletions query.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func (q *Query) Run(conn *connection) error {
// execute query
rows, err := conn.conn.Queryx(q.Query)
if err != nil {
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
return err
}
defer rows.Close()
Expand All @@ -37,15 +38,18 @@ func (q *Query) Run(conn *connection) error {
err := rows.MapScan(res)
if err != nil {
level.Error(q.log).Log("msg", "Failed to scan", "err", err, "host", conn.host, "db", conn.database)
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
continue
}
m, err := q.updateMetrics(conn, res)
if err != nil {
level.Error(q.log).Log("msg", "Failed to update metrics", "err", err, "host", conn.host, "db", conn.database)
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(1.0)
continue
}
metrics = append(metrics, m...)
updated++
failedScrapes.WithLabelValues(conn.driver, conn.host, conn.database, conn.user, q.jobName, q.Name).Set(0.0)
}

if updated < 1 {
Expand Down