From 5aae795529ee644451d1f385f6655963c6605b5f Mon Sep 17 00:00:00 2001 From: YenchangChan Date: Wed, 20 Mar 2024 16:56:30 +0800 Subject: [PATCH] fix: kill distributed task queue session --- docs/docs.go | 35 ++++++++++++++++++++++++ docs/swagger.json | 35 ++++++++++++++++++++++++ docs/swagger.yaml | 23 ++++++++++++++++ frontend | 2 +- service/clickhouse/clickhouse_service.go | 8 ++++-- 5 files changed, 99 insertions(+), 4 deletions(-) diff --git a/docs/docs.go b/docs/docs.go index 27cecf48..e73c7055 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -341,6 +341,41 @@ var doc = `{ } } }, + "/api/v2/ck/ddl_queue/{clusterName}": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "查询分布式DDL", + "consumes": [ + "application/json" + ], + "tags": [ + "clickhouse" + ], + "summary": "查询分布式DDL", + "parameters": [ + { + "type": "string", + "default": "test", + "description": "cluster name", + "name": "clusterName", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "{\"code\":\"0000\",\"msg\":\"ok\",\"data\":[{\"startTime\":1609986493,\"queryDuration\":145,\"query\":\"select * from dist_sensor_dt_result_online limit 10000\",\"user\":\"default\",\"queryId\":\"8aa3de08-92c4-4102-a83d-2f5d88569dab\",\"address\":\"::1\",\"threads\":2}]}", + "schema": { + "type": "string" + } + } + } + } + }, "/api/v2/ck/destroy/{clusterName}": { "put": { "security": [ diff --git a/docs/swagger.json b/docs/swagger.json index 86e25f39..5f9c50ce 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -325,6 +325,41 @@ } } }, + "/api/v2/ck/ddl_queue/{clusterName}": { + "get": { + "security": [ + { + "ApiKeyAuth": [] + } + ], + "description": "查询分布式DDL", + "consumes": [ + "application/json" + ], + "tags": [ + "clickhouse" + ], + "summary": "查询分布式DDL", + "parameters": [ + { + "type": "string", + "default": "test", + "description": "cluster name", + "name": "clusterName", + "in": "path", + "required": true + } + ], + "responses": { + "200": { + "description": "{\"code\":\"0000\",\"msg\":\"ok\",\"data\":[{\"startTime\":1609986493,\"queryDuration\":145,\"query\":\"select * from dist_sensor_dt_result_online limit 10000\",\"user\":\"default\",\"queryId\":\"8aa3de08-92c4-4102-a83d-2f5d88569dab\",\"address\":\"::1\",\"threads\":2}]}", + "schema": { + "type": "string" + } + } + } + } + }, "/api/v2/ck/destroy/{clusterName}": { "put": { "security": [ diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 2565d5ae..a225bd34 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -853,6 +853,29 @@ paths: summary: 修改集群配置 tags: - clickhouse + /api/v2/ck/ddl_queue/{clusterName}: + get: + consumes: + - application/json + description: 查询分布式DDL + parameters: + - default: test + description: cluster name + in: path + name: clusterName + required: true + type: string + responses: + "200": + description: '{"code":"0000","msg":"ok","data":[{"startTime":1609986493,"queryDuration":145,"query":"select + * from dist_sensor_dt_result_online limit 10000","user":"default","queryId":"8aa3de08-92c4-4102-a83d-2f5d88569dab","address":"::1","threads":2}]}' + schema: + type: string + security: + - ApiKeyAuth: [] + summary: 查询分布式DDL + tags: + - clickhouse /api/v2/ck/destroy/{clusterName}: put: consumes: diff --git a/frontend b/frontend index 487a89bc..09cf79e3 160000 --- a/frontend +++ b/frontend @@ -1 +1 @@ -Subproject commit 487a89bc37a57312ae0ec6b539d6cd843cac5df0 +Subproject commit 09cf79e36d3eddfb26762bcd8f2e2eb352d75722 diff --git a/service/clickhouse/clickhouse_service.go b/service/clickhouse/clickhouse_service.go index 4f6cdc45..a942c3a9 100644 --- a/service/clickhouse/clickhouse_service.go +++ b/service/clickhouse/clickhouse_service.go @@ -1219,7 +1219,7 @@ func GetCkOpenSessions(conf *model.CKManClickHouseConfig, limit int) ([]*model.C } func GetDistibutedDDLQueue(conf *model.CKManClickHouseConfig) ([]*model.CkSessionInfo, error) { - query := fmt.Sprintf("select query_create_time, query, host, initiator_host, entry from cluster('{cluster}', system.distributed_ddl_queue) where cluster = '%s' and status != 'Finished' ORDER BY query_create_time", conf.Cluster) + query := fmt.Sprintf("select DISTINCT query_create_time, query, host, initiator_host, entry from cluster('{cluster}', system.distributed_ddl_queue) where cluster = '%s' and status != 'Finished' ORDER BY query_create_time", conf.Cluster) log.Logger.Debugf("query:%s", query) service := NewCkService(conf) err := service.InitCkService() @@ -1264,7 +1264,7 @@ func KillCkOpenSessions(conf *model.CKManClickHouseConfig, host, queryId, typ st FROM ( SELECT - (extractAllGroups(value, '(\\w+\\.\\w+) UUID')[1])[1] AS table, + (extractAllGroups(value, 'TABLE (\\w+\\.\\w+) ')[1])[1] AS table, (extractAllGroups(value, 'initial_query_id: (.*)\n')[1])[1] AS initial_query_id FROM system.zookeeper WHERE (path = '/clickhouse/task_queue/ddl/%s') AND (name = '%s') @@ -1281,7 +1281,8 @@ func KillCkOpenSessions(conf *model.CKManClickHouseConfig, host, queryId, typ st log.Logger.Debugf(query) err = conn.QueryRow(query).Scan(&query_id) if err == nil { - query = fmt.Sprintf("KILL QUERY WHERE query_id = '%s'", queryId) + query = fmt.Sprintf("KILL QUERY WHERE query_id = '%s'", query_id) + log.Logger.Debugf(query) err = conn.Exec(query) if err != nil { return errors.Wrap(err, "") @@ -1297,6 +1298,7 @@ func KillCkOpenSessions(conf *model.CKManClickHouseConfig, host, queryId, typ st } if count > 0 { query = fmt.Sprintf("KILL MUTATION WHERE database = '%s' AND table = '%s'", database, table) + log.Logger.Debugf(query) err = conn.Exec(query) if err != nil { return errors.Wrap(err, "")