Skip to content

Commit

Permalink
fix: kill distributed task queue session
Browse files Browse the repository at this point in the history
  • Loading branch information
YenchangChan committed Mar 20, 2024
1 parent 1c3b1a2 commit 5aae795
Show file tree
Hide file tree
Showing 5 changed files with 99 additions and 4 deletions.
35 changes: 35 additions & 0 deletions docs/docs.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,41 @@ var doc = `{
}
}
},
"/api/v2/ck/ddl_queue/{clusterName}": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "查询分布式DDL",
"consumes": [
"application/json"
],
"tags": [
"clickhouse"
],
"summary": "查询分布式DDL",
"parameters": [
{
"type": "string",
"default": "test",
"description": "cluster name",
"name": "clusterName",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "{\"code\":\"0000\",\"msg\":\"ok\",\"data\":[{\"startTime\":1609986493,\"queryDuration\":145,\"query\":\"select * from dist_sensor_dt_result_online limit 10000\",\"user\":\"default\",\"queryId\":\"8aa3de08-92c4-4102-a83d-2f5d88569dab\",\"address\":\"::1\",\"threads\":2}]}",
"schema": {
"type": "string"
}
}
}
}
},
"/api/v2/ck/destroy/{clusterName}": {
"put": {
"security": [
Expand Down
35 changes: 35 additions & 0 deletions docs/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -325,6 +325,41 @@
}
}
},
"/api/v2/ck/ddl_queue/{clusterName}": {
"get": {
"security": [
{
"ApiKeyAuth": []
}
],
"description": "查询分布式DDL",
"consumes": [
"application/json"
],
"tags": [
"clickhouse"
],
"summary": "查询分布式DDL",
"parameters": [
{
"type": "string",
"default": "test",
"description": "cluster name",
"name": "clusterName",
"in": "path",
"required": true
}
],
"responses": {
"200": {
"description": "{\"code\":\"0000\",\"msg\":\"ok\",\"data\":[{\"startTime\":1609986493,\"queryDuration\":145,\"query\":\"select * from dist_sensor_dt_result_online limit 10000\",\"user\":\"default\",\"queryId\":\"8aa3de08-92c4-4102-a83d-2f5d88569dab\",\"address\":\"::1\",\"threads\":2}]}",
"schema": {
"type": "string"
}
}
}
}
},
"/api/v2/ck/destroy/{clusterName}": {
"put": {
"security": [
Expand Down
23 changes: 23 additions & 0 deletions docs/swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,29 @@ paths:
summary: 修改集群配置
tags:
- clickhouse
/api/v2/ck/ddl_queue/{clusterName}:
get:
consumes:
- application/json
description: 查询分布式DDL
parameters:
- default: test
description: cluster name
in: path
name: clusterName
required: true
type: string
responses:
"200":
description: '{"code":"0000","msg":"ok","data":[{"startTime":1609986493,"queryDuration":145,"query":"select
* from dist_sensor_dt_result_online limit 10000","user":"default","queryId":"8aa3de08-92c4-4102-a83d-2f5d88569dab","address":"::1","threads":2}]}'
schema:
type: string
security:
- ApiKeyAuth: []
summary: 查询分布式DDL
tags:
- clickhouse
/api/v2/ck/destroy/{clusterName}:
put:
consumes:
Expand Down
2 changes: 1 addition & 1 deletion frontend
8 changes: 5 additions & 3 deletions service/clickhouse/clickhouse_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ func GetCkOpenSessions(conf *model.CKManClickHouseConfig, limit int) ([]*model.C
}

func GetDistibutedDDLQueue(conf *model.CKManClickHouseConfig) ([]*model.CkSessionInfo, error) {
query := fmt.Sprintf("select query_create_time, query, host, initiator_host, entry from cluster('{cluster}', system.distributed_ddl_queue) where cluster = '%s' and status != 'Finished' ORDER BY query_create_time", conf.Cluster)
query := fmt.Sprintf("select DISTINCT query_create_time, query, host, initiator_host, entry from cluster('{cluster}', system.distributed_ddl_queue) where cluster = '%s' and status != 'Finished' ORDER BY query_create_time", conf.Cluster)
log.Logger.Debugf("query:%s", query)
service := NewCkService(conf)
err := service.InitCkService()
Expand Down Expand Up @@ -1264,7 +1264,7 @@ func KillCkOpenSessions(conf *model.CKManClickHouseConfig, host, queryId, typ st
FROM
(
SELECT
(extractAllGroups(value, '(\\w+\\.\\w+) UUID')[1])[1] AS table,
(extractAllGroups(value, 'TABLE (\\w+\\.\\w+) ')[1])[1] AS table,
(extractAllGroups(value, 'initial_query_id: (.*)\n')[1])[1] AS initial_query_id
FROM system.zookeeper
WHERE (path = '/clickhouse/task_queue/ddl/%s') AND (name = '%s')
Expand All @@ -1281,7 +1281,8 @@ func KillCkOpenSessions(conf *model.CKManClickHouseConfig, host, queryId, typ st
log.Logger.Debugf(query)
err = conn.QueryRow(query).Scan(&query_id)
if err == nil {
query = fmt.Sprintf("KILL QUERY WHERE query_id = '%s'", queryId)
query = fmt.Sprintf("KILL QUERY WHERE query_id = '%s'", query_id)
log.Logger.Debugf(query)
err = conn.Exec(query)
if err != nil {
return errors.Wrap(err, "")
Expand All @@ -1297,6 +1298,7 @@ func KillCkOpenSessions(conf *model.CKManClickHouseConfig, host, queryId, typ st
}
if count > 0 {
query = fmt.Sprintf("KILL MUTATION WHERE database = '%s' AND table = '%s'", database, table)
log.Logger.Debugf(query)
err = conn.Exec(query)
if err != nil {
return errors.Wrap(err, "")
Expand Down

0 comments on commit 5aae795

Please sign in to comment.