Skip to content

Commit

Permalink
feat: collect MV_ACTIVE_DISTRIBUTED_TRANSACTIONS
Browse files Browse the repository at this point in the history
- add exception patterns when collecting the process list
  • Loading branch information
levin-kitty committed Sep 19, 2024
1 parent 8c72ec7 commit 39d8fdd
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 28 deletions.
26 changes: 14 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,20 @@ For systemd integration, refer to the example service files under the deploy/ fo

## Flags

| flag | description | default |
|-----------------------------------------|------------------------------------------------------|-------------------------------|
| collect.slow_query | Collect slow query metrics | false |
| collect.slow_query.threshold | Slow query threshold in seconds | 10 |
| collect.slow_query.log_path | Path to slow query log | "" (logs only to the console) |
| collect.replication_status | Collect replication status metrics | false |
| collect.data_disk_usage | Collect disk usage per database | false |
| collect.data_disk_usage.scrape_interval | Collect interval of disk usage per database | 30 |
| net.listen_address | Address to listen on for web interface and telemetry | 0.0.0.0:9105 |
| log.log_path | Log path | "" (logs only to the console) |
| log.level | Log level (info, warn, error, fatal, panic) | info |
| debug.pprof | Enable pprof | false |
| flag | description | default |
|--------------------------------------------|------------------------------------------------------|-------------------------------|
| collect.slow_query | Collect slow query metrics | false |
| collect.slow_query.threshold | Slow query threshold in seconds | 10 |
| collect.slow_query.log_path | Path to slow query log | "" (logs only to the console) |
| collect.slow_query.exception.hosts | Hosts to exclude from slow query metrics | "" |
| collect.slow_query.exception.info.patterns | Query patterns to exclude from slow query metrics | "" |
| collect.replication_status | Collect replication status metrics | false |
| collect.data_disk_usage | Collect disk usage per database | false |
| collect.data_disk_usage.scrape_interval | Collection interval of disk usage per database | 30 |
| net.listen_address | Address to listen on for web interface and telemetry | 0.0.0.0:9105 |
| log.log_path | Log path | "" (logs only to the console) |
| log.level | Log level (info, warn, error, fatal, panic) | info |
| debug.pprof | Enable pprof | false |

## License

Expand Down
114 changes: 114 additions & 0 deletions collector/active_transaction.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package collector

import (
"github.com/jmoiron/sqlx"
"github.com/prometheus/client_golang/prometheus"
"singlestore_exporter/log"
"strings"
)

// ActiveDistributedTransactionsView is the row shape scanned from
// infoSchemaActiveDistributedTransactionsViewExistsQuery. Scrape only checks
// how many such rows come back; the field values themselves are not read.
type ActiveDistributedTransactionsView struct {
// TableSchema receives the TABLE_SCHEMA column.
TableSchema string `db:"TABLE_SCHEMA"`
// TableName receives the TABLE_NAME column.
TableName string `db:"TABLE_NAME"`
}

// ActiveDistributedTransactions is one aggregated row produced by
// infoSchemaActiveDistributedTransactionsQuery, i.e. one row per
// PARTITION_NAME group.
type ActiveDistributedTransactions struct {
// PartitionName receives the PARTITION_NAME column, which the query
// produces as NVL(DATABASE_NAME, '').
PartitionName string `db:"PARTITION_NAME"`
// Count receives COUNT(*) for the group.
Count int `db:"COUNT"`
// TimeMax receives the largest number of seconds between a transaction's
// REAL_CLOCK_BEGIN_TIME_STAMP and NOW() within the group.
TimeMax int `db:"TIME_MAX"`
// RowLocksTotal receives SUM(ROW_LOCKS) for the group.
RowLocksTotal int `db:"ROW_LOCKS_TOTAL"`
// Database has no db tag and is not selected by the query; Scrape fills it
// in from PartitionName by dropping everything from the last "_" onward.
Database string
}

const (
// activeTransaction is the subsystem part handed to prometheus.BuildFQName
// for every metric descriptor declared in this file.
activeTransaction = "active_transaction"

// infoSchemaActiveDistributedTransactionsViewExistsQuery looks up
// MV_ACTIVE_DISTRIBUTED_TRANSACTIONS in information_schema.TABLES. Scrape
// uses an empty result as the signal to skip collection.
// NOTE(review): LIKE treats "_" as a single-character wildcard, so this
// match is slightly looser than an equality comparison would be.
infoSchemaActiveDistributedTransactionsViewExistsQuery = `SELECT TABLE_SCHEMA, TABLE_NAME
FROM information_schema.TABLES
WHERE TABLE_SCHEMA = 'information_schema' AND TABLE_NAME LIKE 'MV_ACTIVE_DISTRIBUTED_TRANSACTIONS'`

// infoSchemaActiveDistributedTransactionsQuery aggregates the view into one
// row per PARTITION_NAME: the number of transactions, the longest elapsed
// time in seconds since a transaction's begin timestamp (a NULL timestamp
// counts as "now", i.e. zero seconds), and the sum of ROW_LOCKS. Rows whose
// PARTITION_NAME is empty are filtered out.
// NOTE(review): PARTITION_NAME in WHERE / GROUP BY may resolve either to the
// SELECT alias (NVL(DATABASE_NAME, '')) or to a column of the view itself —
// confirm against the server's name-resolution rules.
infoSchemaActiveDistributedTransactionsQuery = `SELECT
NVL(DATABASE_NAME, '') AS PARTITION_NAME,
COUNT(*) AS COUNT,
MAX(TIMESTAMPDIFF(SECOND, NVL(REAL_CLOCK_BEGIN_TIME_STAMP, NOW()), NOW()) ) AS TIME_MAX,
SUM(ROW_LOCKS) AS ROW_LOCKS_TOTAL
FROM information_schema.MV_ACTIVE_DISTRIBUTED_TRANSACTIONS
WHERE PARTITION_NAME != ''
GROUP BY PARTITION_NAME;`
)

// Metric descriptors emitted by ScrapeActiveTransactions.Scrape. All three
// share the same label set: "database" and "partition_name".
var (
// activeTransactionCountDesc describes the gauge carrying the COUNT column.
activeTransactionCountDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, activeTransaction, "count"),
"The number of active transactions per partition",
[]string{"database", "partition_name"},
nil,
)

// activeTransactionTimeMaxDesc describes the gauge carrying the TIME_MAX
// column.
activeTransactionTimeMaxDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, activeTransaction, "time_max"),
"The max time in seconds since the transaction started",
[]string{"database", "partition_name"},
nil,
)

// activeTransactionRowLocksTotalDesc describes the gauge carrying the
// ROW_LOCKS_TOTAL column.
// NOTE(review): the help text says "per database", but the value is emitted
// once per partition_name label like the other two metrics — confirm which
// wording is intended.
activeTransactionRowLocksTotalDesc = prometheus.NewDesc(
prometheus.BuildFQName(namespace, activeTransaction, "row_locks_total"),
"The count of row locks per database",
[]string{"database", "partition_name"},
nil,
)
)

// ScrapeActiveTransactions is the stateless scraper that reports aggregated
// figures from information_schema.MV_ACTIVE_DISTRIBUTED_TRANSACTIONS.
type ScrapeActiveTransactions struct{}

// Help returns a one-line, human-readable description of this scraper.
func (s *ScrapeActiveTransactions) Help() string {
return "Collect active transactions"
}

// Scrape queries information_schema.MV_ACTIVE_DISTRIBUTED_TRANSACTIONS and
// sends three gauges per returned row on ch: the number of active
// transactions, the longest elapsed time in seconds, and the total number of
// row locks. Every metric is labelled with the derived database name and the
// partition name.
//
// Scrape does nothing when db is nil or when the view is not listed in
// information_schema.TABLES. If either query fails, the error is logged and
// the scrape ends without emitting metrics.
func (s *ScrapeActiveTransactions) Scrape(db *sqlx.DB, ch chan<- prometheus.Metric) {
	if db == nil {
		return
	}

	// Probe for the view before querying it, so that servers which do not
	// expose it are skipped quietly instead of logging a query error on
	// every scrape.
	views := make([]ActiveDistributedTransactionsView, 0)
	if err := db.Select(&views, infoSchemaActiveDistributedTransactionsViewExistsQuery); err != nil {
		log.ErrorLogger.Errorf("checking existence view query failed: query=%s error=%v", infoSchemaActiveDistributedTransactionsViewExistsQuery, err)
		// Without a successful probe we cannot tell whether the view exists;
		// stop here, as every other failed query in this package does.
		return
	}
	if len(views) == 0 {
		return
	}

	rows := make([]ActiveDistributedTransactions, 0)
	if err := db.Select(&rows, infoSchemaActiveDistributedTransactionsQuery); err != nil {
		log.ErrorLogger.Errorf("scraping query failed: query=%s error=%v", infoSchemaActiveDistributedTransactionsQuery, err)
		return
	}

	for i := range rows {
		tx := &rows[i]

		// Derive the database label by dropping the part after the last "_";
		// a name without "_" is used unchanged.
		// NOTE(review): assumes partition names look like <database>_<suffix>;
		// a database name that itself contains "_" but carries no suffix
		// would be truncated — confirm against real view contents.
		tx.Database = tx.PartitionName
		if sep := strings.LastIndex(tx.PartitionName, "_"); sep != -1 {
			tx.Database = tx.PartitionName[:sep]
		}

		ch <- prometheus.MustNewConstMetric(
			activeTransactionCountDesc, prometheus.GaugeValue, float64(tx.Count),
			tx.Database,
			tx.PartitionName,
		)

		ch <- prometheus.MustNewConstMetric(
			activeTransactionTimeMaxDesc, prometheus.GaugeValue, float64(tx.TimeMax),
			tx.Database,
			tx.PartitionName,
		)

		ch <- prometheus.MustNewConstMetric(
			activeTransactionRowLocksTotalDesc, prometheus.GaugeValue, float64(tx.RowLocksTotal),
			tx.Database,
			tx.PartitionName,
		)
	}
}
8 changes: 7 additions & 1 deletion collector/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,9 @@ func New(
flagSlowQueryThreshold int,
flagReplicationStatus bool,
flagDataDiskUsage bool,
flagActiveTransactionPtr bool,
slowQueryExceptionHosts []string,
slowQueryExceptionInfoPatterns []string,
) *Exporter {
scrapers := []Scraper{
&ScrapeNodes{},
Expand All @@ -54,11 +57,14 @@ func New(
&ScrapeCachedBlobs{},
)
if flagSlowQuery {
scrapers = append(scrapers, &ScrapeProcessList{Threshold: flagSlowQueryThreshold})
scrapers = append(scrapers, NewScrapeProcessList(flagSlowQueryThreshold, slowQueryExceptionHosts, slowQueryExceptionInfoPatterns))
}
if flagReplicationStatus {
scrapers = append(scrapers, &ScrapeReplicationStatus{})
}
if flagActiveTransactionPtr {
scrapers = append(scrapers, &ScrapeActiveTransactions{})
}
}
if flagDataDiskUsage {
scrapers = append(scrapers, &ScrapeDataDiskUsage{})
Expand Down
41 changes: 29 additions & 12 deletions collector/processlist.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
)

var systemUsers = map[string]bool{
"kadba": true,
"kamon": true,
"distributed": true,
}

Expand Down Expand Up @@ -42,9 +40,7 @@ const (
process = "process"

// length of INFO is limited to 1000 characters to avoid memory overflow
infoSchemaProcessListQuery = `
SELECT ID, USER, HOST, DB, COMMAND, TIME, STATE, LEFT(INFO, 1000) AS INFO, RPC_INFO, PLAN_ID, TRANSACTION_STATE, ROW_LOCKS_HELD, PARTITION_LOCKS_HELD, EPOCH, LWPID, RESOURCE_POOL, STMT_VERSION, REASON_FOR_QUEUEING,
DATE_SUB(now(), INTERVAL time SECOND) AS SUBMITTED_TIME
infoSchemaProcessListQuery = `SELECT ID, USER, HOST, DB, COMMAND, TIME, STATE, LEFT(INFO, 1000) AS INFO, RPC_INFO, PLAN_ID, TRANSACTION_STATE, ROW_LOCKS_HELD, PARTITION_LOCKS_HELD, EPOCH, LWPID, RESOURCE_POOL, STMT_VERSION, REASON_FOR_QUEUEING, DATE_SUB(now(), INTERVAL time SECOND) AS SUBMITTED_TIME
FROM information_schema.PROCESSLIST`
)

Expand All @@ -66,6 +62,27 @@ var (

// ScrapeProcessList is the scraper for information_schema.PROCESSLIST.
// Instances are built with NewScrapeProcessList, which prepares Query.
type ScrapeProcessList struct {
// Threshold is the slow-query threshold supplied to NewScrapeProcessList.
// NOTE(review): the README documents this flag as seconds — confirm it is
// compared against the TIME column in Scrape.
Threshold int
// Query is the SQL text that Scrape passes to db.Select: the base process
// list query plus any exclusion conditions added by the constructor.
Query string
}

// NewScrapeProcessList returns a ScrapeProcessList whose Query is the base
// process list query, optionally followed by a WHERE clause that filters rows
// out.
//
// Each entry in exceptionHosts adds "HOST NOT LIKE '<host>:%'" and each entry
// in exceptionInfoPatterns adds "NVL(INFO, '') NOT LIKE '%<pattern>%'"; all
// conditions are combined with AND, hosts first. With both slices empty the
// base query is used unchanged.
//
// The values are spliced into the SQL text verbatim, so they must come from
// trusted configuration and must not contain single quotes.
func NewScrapeProcessList(threshold int, exceptionHosts []string, exceptionInfoPatterns []string) *ScrapeProcessList {
	scraper := &ScrapeProcessList{
		Threshold: threshold,
		Query:     infoSchemaProcessListQuery,
	}

	// The first condition opens the WHERE clause; every later one is chained
	// onto it with AND.
	joiner := "\nWHERE "
	addCondition := func(condition string) {
		scraper.Query += joiner + condition
		joiner = " AND "
	}

	for _, h := range exceptionHosts {
		addCondition("HOST NOT LIKE '" + h + ":%'")
	}
	for _, p := range exceptionInfoPatterns {
		addCondition("NVL(INFO, '') NOT LIKE '%" + p + "%'")
	}

	return scraper
}

func (s *ScrapeProcessList) Help() string {
Expand All @@ -78,12 +95,12 @@ func (s *ScrapeProcessList) Scrape(db *sqlx.DB, ch chan<- prometheus.Metric) {
}

processList := make([]Process, 0)
if err := db.Select(&processList, infoSchemaProcessListQuery); err != nil {
log.ErrorLogger.Errorf("scraping query failed: query=%s error=%v", infoSchemaProcessListQuery, err)
if err := db.Select(&processList, s.Query); err != nil {
log.ErrorLogger.Errorf("scraping query failed: query=%s error=%v", s.Query, err)
return
}

max := make(map[string]int)
maxTime := make(map[string]int)
counter := make(map[string]int)
for _, process := range processList {
if _, exists := systemUsers[process.User]; exists {
Expand All @@ -94,10 +111,10 @@ func (s *ScrapeProcessList) Scrape(db *sqlx.DB, ch chan<- prometheus.Metric) {
continue
}

if m, exists := max[process.User]; !exists {
max[process.User] = process.Time
if m, exists := maxTime[process.User]; !exists {
maxTime[process.User] = process.Time
} else if process.Time > m {
max[process.User] = process.Time
maxTime[process.User] = process.Time
}

counter[process.User]++
Expand All @@ -116,7 +133,7 @@ func (s *ScrapeProcessList) Scrape(db *sqlx.DB, ch chan<- prometheus.Metric) {
}).Info("slow query detected")
}

for user, maxTime := range max {
for user, maxTime := range maxTime {
ch <- prometheus.MustNewConstMetric(
processListTimeMaxDesc, prometheus.GaugeValue, float64(maxTime),
user,
Expand Down
58 changes: 58 additions & 0 deletions collector/processlist_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package collector

import (
"github.com/stretchr/testify/assert"
"testing"
)

// TestNewScrapeProcessList checks that NewScrapeProcessList stores the
// threshold unchanged and appends the expected WHERE clause for each
// combination of host and INFO-pattern exclusions. Each case runs as a named
// subtest so that a failure identifies the combination that broke.
func TestNewScrapeProcessList(t *testing.T) {
	// Literal copy of the expected SELECT text. It deliberately does not
	// reference the production constant, so an accidental edit to that
	// constant is still caught here.
	const baseQuery = `SELECT ID, USER, HOST, DB, COMMAND, TIME, STATE, LEFT(INFO, 1000) AS INFO, RPC_INFO, PLAN_ID, TRANSACTION_STATE, ROW_LOCKS_HELD, PARTITION_LOCKS_HELD, EPOCH, LWPID, RESOURCE_POOL, STMT_VERSION, REASON_FOR_QUEUEING, DATE_SUB(now(), INTERVAL time SECOND) AS SUBMITTED_TIME
FROM information_schema.PROCESSLIST`

	tt := []struct {
		name          string
		threshold     int
		hosts         []string
		infoPatterns  []string
		expectedQuery string
	}{
		{
			name:          "no exceptions",
			threshold:     1,
			hosts:         []string{},
			infoPatterns:  []string{},
			expectedQuery: baseQuery,
		},
		{
			name:          "hosts only",
			threshold:     2,
			hosts:         []string{"host1", "host2"},
			infoPatterns:  []string{},
			expectedQuery: baseQuery + "\nWHERE HOST NOT LIKE 'host1:%' AND HOST NOT LIKE 'host2:%'",
		},
		{
			name:          "info patterns only",
			threshold:     3,
			hosts:         []string{},
			infoPatterns:  []string{"pattern1", "pattern2"},
			expectedQuery: baseQuery + "\nWHERE NVL(INFO, '') NOT LIKE '%pattern1%' AND NVL(INFO, '') NOT LIKE '%pattern2%'",
		},
		{
			name:          "hosts and info patterns",
			threshold:     4,
			hosts:         []string{"host1", "host2"},
			infoPatterns:  []string{"pattern1", "pattern2"},
			expectedQuery: baseQuery + "\nWHERE HOST NOT LIKE 'host1:%' AND HOST NOT LIKE 'host2:%' AND NVL(INFO, '') NOT LIKE '%pattern1%' AND NVL(INFO, '') NOT LIKE '%pattern2%'",
		},
	}

	for _, tc := range tt {
		t.Run(tc.name, func(t *testing.T) {
			scraper := NewScrapeProcessList(tc.threshold, tc.hosts, tc.infoPatterns)
			// The constructor must pass the threshold through untouched.
			assert.Equal(t, tc.threshold, scraper.Threshold)
			assert.Equal(t, tc.expectedQuery, scraper.Query)
		})
	}
}
3 changes: 3 additions & 0 deletions deploy/singlestore_exporter.service.aggregator
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ ExecStart=/opt/exporters/bin/singlestore_exporter \
--collect.slow_query=true \
--collect.slow_query.threshold=10 \
--collect.slow_query.log_path=/opt/exporters/logs/singlestore_exporter_slow_query.log \
--collect.slow_query.exception.hosts=localhost \
--collect.slow_query.exception.info.patterns=FOREGROUND \
--collect.active_transaction=true \
--collect.replication_status=false

[Install]
Expand Down
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/jmoiron/sqlx v1.3.5
github.com/prometheus/client_golang v1.19.0
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.8.4
github.com/x-cray/logrus-prefixed-formatter v0.5.2
gopkg.in/natefinch/lumberjack.v2 v2.2.1
)
Expand All @@ -15,18 +16,21 @@ require (
filippo.io/edwards25519 v1.1.0 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fsnotify/fsnotify v1.6.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mgutz/ansi v0.0.0-20200706080929-d51e80ef957d // indirect
github.com/onsi/ginkgo v1.16.5 // indirect
github.com/onsi/gomega v1.28.0 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.52.3 // indirect
github.com/prometheus/procfs v0.13.0 // indirect
github.com/stretchr/testify v1.8.4 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/term v0.19.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand All @@ -30,6 +31,10 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jmoiron/sqlx v1.3.5 h1:vFFPA71p1o5gAeqtEAwLU4dnX2napprKtHr7PYIcN3g=
github.com/jmoiron/sqlx v1.3.5/go.mod h1:nRVWtLre0KfCLJvgxzCsLVMogSvQ1zNJtpYr2Ccp0mQ=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/lib/pq v1.2.0 h1:LXpIM/LZ5xGFhOpXAQUIMM1HdyqzVYM13zNdjCEEcA0=
github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
Expand Down Expand Up @@ -62,6 +67,8 @@ github.com/prometheus/common v0.52.3 h1:5f8uj6ZwHSscOGNdIQg6OiZv/ybiK2CO2q2drVZA
github.com/prometheus/common v0.52.3/go.mod h1:BrxBKv3FWBIGXw89Mg1AeBq7FSyRzXWI3l3e7W3RN5U=
github.com/prometheus/procfs v0.13.0 h1:GqzLlQyfsPbaEHaQkO7tbDlriv/4o5Hudv6OXHGKX7o=
github.com/prometheus/procfs v0.13.0/go.mod h1:cd4PFCR54QLnGKPaKGA6l+cfuNXtht43ZKY6tow0Y1g=
github.com/rogpeppe/go-internal v1.10.0 h1:TMyTOH3F/DB16zRVcYyreMH6GnZZrwQVAoYjRBZyWFQ=
github.com/rogpeppe/go-internal v1.10.0/go.mod h1:UQnix2H7Ngw/k4C5ijL5+65zddjncjaFoBhdsK/akog=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down Expand Up @@ -125,6 +132,8 @@ google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/natefinch/lumberjack.v2 v2.2.1 h1:bBRl1b0OH9s/DuPhuXpNl+VtCaJXFZ5/uEFST95x9zc=
gopkg.in/natefinch/lumberjack.v2 v2.2.1/go.mod h1:YD8tP3GAjkrDg1eZH7EGmyESg/lsYskCTPBJVb9jqSc=
Expand Down
Loading

0 comments on commit 39d8fdd

Please sign in to comment.