From 052c90281389db3b85b0511b227148cd0f1849c8 Mon Sep 17 00:00:00 2001
From: YenchangChan
Date: Fri, 15 Mar 2024 17:11:25 +0800
Subject: [PATCH 1/6] fix: use move partition instead of insert into

---
 service/clickhouse/rebalance.go | 15 ++++-----------
 1 file changed, 4 insertions(+), 11 deletions(-)

diff --git a/service/clickhouse/rebalance.go b/service/clickhouse/rebalance.go
index 5eb5d586..e24ddb0a 100644
--- a/service/clickhouse/rebalance.go
+++ b/service/clickhouse/rebalance.go
@@ -426,9 +426,9 @@ func (r *CKRebalance) CreateTemporaryTable() error {
 func (r *CKRebalance) CheckCounts(tableName string) error {
     var query string
     if strings.Contains(r.Engine, "Replacing") {
-        query = fmt.Sprintf("SELECT count() FROM (SELECT DISTINCT %s FROM cluster('%s', '%s.%s') FINAL)", strings.Join(r.SortingKey, ","), r.Cluster, r.Database, r.Table)
+        query = fmt.Sprintf("SELECT count() FROM (SELECT DISTINCT %s FROM cluster('%s', '%s.%s') FINAL)", strings.Join(r.SortingKey, ","), r.Cluster, r.Database, tableName)
     } else {
-        query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, r.Table)
+        query = fmt.Sprintf("SELECT count() FROM cluster('%s', '%s.%s')", r.Cluster, r.Database, tableName)
     }
     log.Logger.Debugf("query: %s", query)
     conn := common.GetConnection(r.Hosts[0])
@@ -463,13 +463,7 @@ func (r *CKRebalance) InsertPlan() error {
         _ = common.Pool.Submit(func() {
             defer wg.Done()
             conn := common.GetConnection(host)
-            query := fmt.Sprintf("TRUNCATE TABLE `%s`.`%s`", r.Database, r.Table)
-            log.Logger.Debugf("[%s]%s", host, query)
-            if err := conn.Exec(query); err != nil {
-                lastError = errors.Wrap(err, host)
-                return
-            }
-            query = fmt.Sprintf(`SELECT distinct partition_id FROM cluster('%s', 'system.parts') WHERE database = '%s' AND table = '%s' AND active=1 ORDER BY partition_id`, r.Cluster, r.Database, r.TmpTable)
+            query := fmt.Sprintf(`SELECT distinct partition_id FROM cluster('%s', 'system.parts') WHERE database = '%s' AND table = '%s' AND active=1 ORDER BY partition_id DESC`, r.Cluster, r.Database, r.TmpTable)
             log.Logger.Debugf("[%s]%s", host, query)
             rows, err := conn.Query(query)
             if err != nil {
@@ -535,8 +529,7 @@ func (r *CKRebalance) MoveBackup() error {
         log.Logger.Debugf("host:[%s], partitions: %v", host, partitions)

         for idx, partition := range partitions {
-            query = fmt.Sprintf("INSERT INTO `%s`.`%s` SELECT * FROM `%s`.`%s` WHERE _partition_id = '%s' SETTINGS insert_deduplicate=false,max_execution_time=0,max_insert_threads=8",
-                r.Database, r.TmpTable, r.Database, r.Table, partition)
+            query = fmt.Sprintf("ALTER TABLE `%s`.`%s` MOVE PARTITION ID '%s' TO TABLE `%s`.`%s`", r.Database, r.Table, partition, r.Database, r.TmpTable)
             log.Logger.Debugf("[%s](%d/%d) %s", host, idx+1, len(partitions), query)
             if err = conn.Exec(query); err != nil {
                 lastError = errors.Wrap(err, host)

From 7517874c6f3465feb942a7c358f33319f1a67301 Mon Sep 17 00:00:00 2001
From: YenchangChan
Date: Tue, 19 Mar 2024 09:47:19 +0800
Subject: [PATCH 2/6] fix: verify logic cluster when import cluster

---
 service/clickhouse/clickhouse_service.go | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/service/clickhouse/clickhouse_service.go b/service/clickhouse/clickhouse_service.go
index 7bb57835..5076b1f4 100644
--- a/service/clickhouse/clickhouse_service.go
+++ b/service/clickhouse/clickhouse_service.go
@@ -199,6 +199,18 @@ func GetCkClusterConfig(conf *model.CKManClickHouseConfig) (string, error) {
         }
     }

+    if conf.LogicCluster != nil {
+        query := fmt.Sprintf("SELECT count() FROM system.clusters WHERE cluster = '%s'", *conf.LogicCluster)
+        value, err = service.QueryInfo(query)
+        if err != nil {
+            return model.E_DATA_SELECT_FAILED, err
+        }
+        c := value[1][0].(uint64)
+        if c == 0 {
+            return model.E_RECORD_NOT_FOUND, fmt.Errorf("logic cluster %s not exist", *conf.LogicCluster)
+        }
+    }
+
     value, err = service.QueryInfo("SELECT version()")
     if err != nil {
         return model.E_DATA_SELECT_FAILED, err

From 37d8026c1a60abb7b94e68363d66e0b95cdf6767 Mon Sep 17 00:00:00 2001
From: YenchangChan
Date: Tue, 19 Mar 2024 10:45:09 +0800
Subject: [PATCH 3/6] perf: change distributed_ddl_task_timeout 15 => 60

---
 ckconfig/users.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ckconfig/users.go b/ckconfig/users.go
index 93720695..2c55208d 100644
--- a/ckconfig/users.go
+++ b/ckconfig/users.go
@@ -81,7 +81,7 @@ func profiles(userProfiles []model.Profile, info HostInfo) map[string]interface{
     defaultProfile["max_query_size"] = 1073741824
     defaultProfile["distributed_aggregation_memory_efficient"] = 1
     defaultProfile["joined_subquery_requires_alias"] = 0
-    defaultProfile["distributed_ddl_task_timeout"] = 15
+    defaultProfile["distributed_ddl_task_timeout"] = 60
     defaultProfile["allow_drop_detached"] = 1
     defaultProfile["use_uncompressed_cache"] = 0
     defaultProfile["max_execution_time"] = 3600 // 1 hour

From f4741f77792c6bf4d750377d4411ec4a965c7764 Mon Sep 17 00:00:00 2001
From: YenchangChan
Date: Tue, 19 Mar 2024 15:41:22 +0800
Subject: [PATCH 4/6] feat: add distributed_ddl_queue api

---
 common/ck.go                             |  2 +
 common/ck_cli.go                         | 29 ++++++++
 controller/clickhouse.go                 | 45 +++++++++++-
 model/ck_session.go                      |  2 +-
 router/v1.go                             |  1 +
 router/v2.go                             |  1 +
 service/clickhouse/clickhouse_service.go | 92 ++++++++++++++++++++++--
 7 files changed, 164 insertions(+), 8 deletions(-)

diff --git a/common/ck.go b/common/ck.go
index 4771a124..11dbb2d9 100644
--- a/common/ck.go
+++ b/common/ck.go
@@ -8,6 +8,7 @@ import (
     "encoding/hex"
     "encoding/json"
     "fmt"
+    "net"
     "regexp"
     "strconv"
     "strings"
@@ -92,6 +93,7 @@ func ConnectClickHouse(host string, database string, opt model.ConnetOption) (*C
         }
     }
     conn := Conn{
+        addr:     net.JoinHostPort(host, fmt.Sprint(opt.Port)),
         protocol: opt.Protocol,
         ctx:      context.Background(),
     }
diff --git a/common/ck_cli.go b/common/ck_cli.go
index c77ea45a..03cf11bc 100644
--- a/common/ck_cli.go
+++ b/common/ck_cli.go
@@ -24,6 +24,20 @@ func (c *ColumnType) ScanType() reflect.Type {
     }
 }

+type Row struct {
+    proto clickhouse.Protocol
+    r1    *sql.Row
+    r2    driver.Row
+}
+
+func (r *Row) Scan(dest ...any) error {
+    if r.proto == clickhouse.HTTP {
+        return r.r1.Scan(dest...)
+    } else {
+        return r.r2.Scan(dest...)
+    }
+}
+
 type Rows struct {
     protocol clickhouse.Protocol
     rs1      *sql.Rows
@@ -87,6 +101,7 @@ func (r *Rows) ColumnTypes() ([]*ColumnType, error) {
 }

 type Conn struct {
+    addr     string
     protocol clickhouse.Protocol
     c        driver.Conn
     db       *sql.DB
@@ -95,6 +110,7 @@ type Conn struct {

 func (c *Conn) Query(query string, args ...any) (*Rows, error) {
     var rs Rows
+    //log.Logger.Debugf("[%s]%s", c.addr, query)
     rs.protocol = c.protocol
     if c.protocol == clickhouse.HTTP {
         rows, err := c.db.Query(query, args...)
@@ -114,7 +130,20 @@
     return &rs, nil
 }

+func (c *Conn) QueryRow(query string, args ...any) *Row {
+    var row Row
+    //log.Logger.Debugf("[%s]%s", c.addr, query)
+    row.proto = c.protocol
+    if c.protocol == clickhouse.HTTP {
+        row.r1 = c.db.QueryRow(query, args...)
+    } else {
+        row.r2 = c.c.QueryRow(c.ctx, query, args...)
+    }
+    return &row
+}
+
 func (c *Conn) Exec(query string, args ...any) error {
+    //log.Logger.Debugf("[%s]%s", c.addr, query)
     if c.protocol == clickhouse.HTTP {
         _, err := c.db.Exec(query, args...)
         return err
diff --git a/controller/clickhouse.go b/controller/clickhouse.go
index 7c4c10d8..c3adfdf2 100644
--- a/controller/clickhouse.go
+++ b/controller/clickhouse.go
@@ -2043,7 +2043,8 @@ func (controller *ClickHouseController) GetOpenSessions(c *gin.Context) {
 func (controller *ClickHouseController) KillOpenSessions(c *gin.Context) {
     clusterName := c.Param(ClickHouseClusterPath)
     host := c.Query("host")
-    queryId := c.Query("query_id")
+    queryId := c.Query("queryId")
+    typ := c.Query("type")

     conf, err := repository.Ps.GetClusterbyName(clusterName)
     if err != nil {
@@ -2051,7 +2052,7 @@
         return
     }

-    err = clickhouse.KillCkOpenSessions(&conf, host, queryId)
+    err = clickhouse.KillCkOpenSessions(&conf, host, queryId, typ)
     if err != nil {
         controller.wrapfunc(c, model.E_DATA_UPDATE_FAILED, err)
         return
@@ -2121,6 +2122,46 @@
     controller.wrapfunc(c, model.E_SUCCESS, sessions)
 }

+// @Summary 查询分布式DDL
+// @Description 查询分布式DDL
+// @version 1.0
+// @Security ApiKeyAuth
+// @Tags clickhouse
+// @Accept json
+// @Param clusterName path string true "cluster name" default(test)
+// @Failure 200 {string} json "{"code":"5800","msg":"集群不存在","data":""}"
+// @Failure 200 {string} json "{"code":"5804","msg":"数据查询失败","data":""}"
+// @Success 200 {string} json "{"code":"0000","msg":"ok","data":[{"startTime":1609986493,"queryDuration":145,"query":"select * from dist_sensor_dt_result_online limit 10000","user":"default","queryId":"8aa3de08-92c4-4102-a83d-2f5d88569dab","address":"::1","threads":2}]}"
+// @Router /api/v2/ck/ddl_queue/{clusterName} [get]
+func (controller *ClickHouseController) GetDistDDLQueue(c *gin.Context) {
+    clusterName := c.Param(ClickHouseClusterPath)
+    conf, err := repository.Ps.GetClusterbyName(clusterName)
+    if err != nil {
+        controller.wrapfunc(c, model.E_RECORD_NOT_FOUND, fmt.Sprintf("cluster %s does not exist", clusterName))
+        return
+    }
+
+    var gotError bool
+    ddlQueue, err := clickhouse.GetDistibutedDDLQueue(&conf)
+    if err != nil {
+        gotError = true
+        err = common.ClikHouseExceptionDecode(err)
+        var exception *client.Exception
+        if errors.As(err, &exception) {
+            if exception.Code == 60 {
+                // we do not return error when system.query_log is not exist
+                gotError = false
+            }
+        }
+    }
+    if gotError {
+        controller.wrapfunc(c, model.E_DATA_SELECT_FAILED, err)
+        return
+    }
+
+    controller.wrapfunc(c, model.E_SUCCESS, ddlQueue)
+}
+
 // @Summary Ping集群是否健康
 // @Description 探测集群是否可以正常对外提供服务
 // @version 1.0
diff --git a/model/ck_session.go b/model/ck_session.go
index f045405e..d46da2cf 100644
--- a/model/ck_session.go
+++ b/model/ck_session.go
@@ -8,7 +8,7 @@ type CkSessionInfo struct {
     QueryId       string `json:"queryId"`
     Address       string `json:"address"`
     Threads       int    `json:"threads"`
-    Host          string `json:"host"` //sql running in which node
+    Host          string `json:"host"`    //sql running in which node
 }

 type SessionCond struct {
diff --git a/router/v1.go b/router/v1.go
index 213ed064..55b9a7a6 100644
--- a/router/v1.go
+++ b/router/v1.go
@@ -111,6 +111,7 @@ func InitRouterV1(groupV1 *gin.RouterGroup, config *config.CKManConfig, signal c
     groupV1.GET(fmt.Sprintf("/ck/open_sessions/:%s", controller.ClickHouseClusterPath), ckController.GetOpenSessions)
     groupV1.PUT(fmt.Sprintf("/ck/open_sessions/:%s", controller.ClickHouseClusterPath), ckController.KillOpenSessions)
     groupV1.GET(fmt.Sprintf("/ck/slow_sessions/:%s", controller.ClickHouseClusterPath), ckController.GetSlowSessions)
+    groupV1.GET(fmt.Sprintf("/ck/ddl_queue/:%s", controller.ClickHouseClusterPath), ckController.GetDistDDLQueue)
     groupV1.POST(fmt.Sprintf("/ck/node/:%s", controller.ClickHouseClusterPath), ckController.AddNode)
     groupV1.DELETE(fmt.Sprintf("/ck/node/:%s", controller.ClickHouseClusterPath), ckController.DeleteNode)
     groupV1.PUT(fmt.Sprintf("/ck/node/start/:%s", controller.ClickHouseClusterPath), ckController.StartNode)
diff --git a/router/v2.go b/router/v2.go
index 58fb7e46..0701b43f 100644
--- a/router/v2.go
+++ b/router/v2.go
@@ -111,6 +111,7 @@ func InitRouterV2(groupV2 *gin.RouterGroup, config *config.CKManConfig, signal c
     groupV2.GET(fmt.Sprintf("/ck/open-sessions/:%s", controller.ClickHouseClusterPath), ckController.GetOpenSessions)
     groupV2.PUT(fmt.Sprintf("/ck/open-sessions/:%s", controller.ClickHouseClusterPath), ckController.KillOpenSessions)
     groupV2.GET(fmt.Sprintf("/ck/slow-sessions/:%s", controller.ClickHouseClusterPath), ckController.GetSlowSessions)
+    groupV2.GET(fmt.Sprintf("/ck/ddl_queue/:%s", controller.ClickHouseClusterPath), ckController.GetDistDDLQueue)
     groupV2.POST(fmt.Sprintf("/ck/node/:%s", controller.ClickHouseClusterPath), ckController.AddNode)
     groupV2.DELETE(fmt.Sprintf("/ck/node/:%s", controller.ClickHouseClusterPath), ckController.DeleteNode)
     groupV2.PUT(fmt.Sprintf("/ck/node/start/:%s", controller.ClickHouseClusterPath), ckController.StartNode)
diff --git a/service/clickhouse/clickhouse_service.go b/service/clickhouse/clickhouse_service.go
index 5076b1f4..4f6cdc45 100644
--- a/service/clickhouse/clickhouse_service.go
+++ b/service/clickhouse/clickhouse_service.go
@@ -1218,15 +1218,97 @@ func GetCkOpenSessions(conf *model.CKManClickHouseConfig, limit int) ([]*model.C
     return getCkSessions(conf, limit, query)
 }

-func KillCkOpenSessions(conf *model.CKManClickHouseConfig, host, queryId string) error {
+func GetDistibutedDDLQueue(conf *model.CKManClickHouseConfig) ([]*model.CkSessionInfo, error) {
+    query := fmt.Sprintf("select query_create_time, query, host, initiator_host, entry from cluster('{cluster}', system.distributed_ddl_queue) where cluster = '%s' and status != 'Finished' ORDER BY query_create_time", conf.Cluster)
+    log.Logger.Debugf("query:%s", query)
+    service := NewCkService(conf)
+    err := service.InitCkService()
+    if err != nil {
+        return nil, err
+    }
+
+    value, err := service.QueryInfo(query)
+    if err != nil {
+        return nil, err
+    }
+    var sessions []*model.CkSessionInfo
+    if len(value) > 1 {
+        sessions = make([]*model.CkSessionInfo, len(value)-1)
+        for i := 1; i < len(value); i++ {
+            var session model.CkSessionInfo
+            startTime := value[i][0].(time.Time)
+            session.StartTime = startTime.Unix()
+            session.QueryDuration = uint64(time.Since(startTime).Milliseconds())
+            session.Query = value[i][1].(string)
+            session.Host = value[i][2].(string)
+            session.Address = value[i][3].(string)
+            session.QueryId = value[i][4].(string)
+
+            sessions[i-1] = &session
+        }
+    } else {
+        sessions = make([]*model.CkSessionInfo, 0)
+    }
+    return sessions, nil
+}
+func KillCkOpenSessions(conf *model.CKManClickHouseConfig, host, queryId, typ string) error {
     conn, err := common.ConnectClickHouse(host, model.ClickHouseDefaultDB, conf.GetConnOption())
     if err != nil {
         return err
     }
-    query := fmt.Sprintf("KILL QUERY WHERE query_id = '%s'", queryId)
-    err = conn.Exec(query)
-    if err != nil {
-        return errors.Wrap(err, "")
+    if typ == "queue" {
+        query := fmt.Sprintf(`SELECT
+    splitByChar('.', table)[1] AS database,
+    splitByChar('.', table)[2] AS tbl,
+    initial_query_id
+FROM
+(
+    SELECT
+        (extractAllGroups(value, '(\\w+\\.\\w+) UUID')[1])[1] AS table,
+        (extractAllGroups(value, 'initial_query_id: (.*)\n')[1])[1] AS initial_query_id
+    FROM system.zookeeper
+    WHERE (path = '/clickhouse/task_queue/ddl/%s') AND (name = '%s')
+)`, conf.Cluster, queryId)
+        var database, table, initial_query_id string
+        log.Logger.Debugf(query)
+        err := conn.QueryRow(query).Scan(&database, &table, &initial_query_id)
+        if err != nil {
+            return errors.Wrap(err, "")
+        }
+        log.Logger.Debugf("database: %s, table: %s, initial_query_id: %s", database, table, initial_query_id)
+        query = fmt.Sprintf("select query_id from system.processes where initial_query_id = '%s'", initial_query_id)
+        var query_id string
+        log.Logger.Debugf(query)
+        err = conn.QueryRow(query).Scan(&query_id)
+        if err == nil {
+            query = fmt.Sprintf("KILL QUERY WHERE query_id = '%s'", queryId)
+            err = conn.Exec(query)
+            if err != nil {
+                return errors.Wrap(err, "")
+            }
+        } else {
+            // kill mutation
+            query = fmt.Sprintf("select count() from system.mutations where is_done = 0 and database = '%s' and table = '%s'", database, table)
+            log.Logger.Debugf(query)
+            var count uint64
+            err = conn.QueryRow(query).Scan(&count)
+            if err != nil {
+                return errors.Wrap(err, "")
+            }
+            if count > 0 {
+                query = fmt.Sprintf("KILL MUTATION WHERE database = '%s' AND table = '%s'", database, table)
+                err = conn.Exec(query)
+                if err != nil {
+                    return errors.Wrap(err, "")
+                }
+            }
+        }
+    } else {
+        query := fmt.Sprintf("KILL QUERY WHERE query_id = '%s'", queryId)
+        err = conn.Exec(query)
+        if err != nil {
+            return errors.Wrap(err, "")
+        }
     }
     return nil
 }

From 1c3b1a26dfe5a2fa1c8a5a257c5503add22e1a81 Mon Sep 17 00:00:00 2001
From: YenchangChan
Date: Wed, 20 Mar 2024 15:38:21 +0800
Subject: [PATCH 5/6] feat: update frontend

---
 frontend | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/frontend b/frontend
index 3e7e95cc..487a89bc 160000
--- a/frontend
+++ b/frontend
@@ -1 +1 @@
-Subproject commit 3e7e95cc8b07dfd967150a48a8b8c8090fc7dd48
+Subproject commit 487a89bc37a57312ae0ec6b539d6cd843cac5df0

From 5aae795529ee644451d1f385f6655963c6605b5f Mon Sep 17 00:00:00 2001
From: YenchangChan
Date: Wed, 20 Mar 2024 16:56:30 +0800
Subject: [PATCH 6/6] fix: kill distributed task queue session

---
 docs/docs.go                             | 35 ++++++++++++++++++++++++
 docs/swagger.json                        | 35 ++++++++++++++++++++++++
 docs/swagger.yaml                        | 23 ++++++++++++++++
 frontend                                 |  2 +-
 service/clickhouse/clickhouse_service.go |  8 ++++--
 5 files changed, 99 insertions(+), 4 deletions(-)

diff --git a/docs/docs.go b/docs/docs.go
index 27cecf48..e73c7055 100644
--- a/docs/docs.go
+++ b/docs/docs.go
@@ -341,6 +341,41 @@ var doc = `{
                 }
             }
         },
+        "/api/v2/ck/ddl_queue/{clusterName}": {
+            "get": {
+                "security": [
+                    {
+                        "ApiKeyAuth": []
+                    }
+                ],
+                "description": "查询分布式DDL",
+                "consumes": [
+                    "application/json"
+                ],
+                "tags": [
+                    "clickhouse"
+                ],
+                "summary": "查询分布式DDL",
+                "parameters": [
+                    {
+                        "type": "string",
+                        "default": "test",
+                        "description": "cluster name",
+                        "name": "clusterName",
+                        "in": "path",
+                        "required": true
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "description": "{\"code\":\"0000\",\"msg\":\"ok\",\"data\":[{\"startTime\":1609986493,\"queryDuration\":145,\"query\":\"select * from dist_sensor_dt_result_online limit 10000\",\"user\":\"default\",\"queryId\":\"8aa3de08-92c4-4102-a83d-2f5d88569dab\",\"address\":\"::1\",\"threads\":2}]}",
+                        "schema": {
+                            "type": "string"
+                        }
+                    }
+                }
+            }
+        },
         "/api/v2/ck/destroy/{clusterName}": {
             "put": {
                 "security": [
diff --git a/docs/swagger.json b/docs/swagger.json
index 86e25f39..5f9c50ce 100644
--- a/docs/swagger.json
+++ b/docs/swagger.json
@@ -325,6 +325,41 @@
                 }
             }
         },
+        "/api/v2/ck/ddl_queue/{clusterName}": {
+            "get": {
+                "security": [
+                    {
+                        "ApiKeyAuth": []
+                    }
+                ],
+                "description": "查询分布式DDL",
+                "consumes": [
+                    "application/json"
+                ],
+                "tags": [
+                    "clickhouse"
+                ],
+                "summary": "查询分布式DDL",
+                "parameters": [
+                    {
+                        "type": "string",
+                        "default": "test",
+                        "description": "cluster name",
+                        "name": "clusterName",
+                        "in": "path",
+                        "required": true
+                    }
+                ],
+                "responses": {
+                    "200": {
+                        "description": "{\"code\":\"0000\",\"msg\":\"ok\",\"data\":[{\"startTime\":1609986493,\"queryDuration\":145,\"query\":\"select * from dist_sensor_dt_result_online limit 10000\",\"user\":\"default\",\"queryId\":\"8aa3de08-92c4-4102-a83d-2f5d88569dab\",\"address\":\"::1\",\"threads\":2}]}",
+                        "schema": {
+                            "type": "string"
+                        }
+                    }
+                }
+            }
+        },
         "/api/v2/ck/destroy/{clusterName}": {
             "put": {
                 "security": [
diff --git a/docs/swagger.yaml b/docs/swagger.yaml
index 2565d5ae..a225bd34 100644
--- a/docs/swagger.yaml
+++ b/docs/swagger.yaml
@@ -853,6 +853,29 @@ paths:
       summary: 修改集群配置
      tags:
      - clickhouse
+  /api/v2/ck/ddl_queue/{clusterName}:
+    get:
+      consumes:
+      - application/json
+      description: 查询分布式DDL
+      parameters:
+      - default: test
+        description: cluster name
+        in: path
+        name: clusterName
+        required: true
+        type: string
+      responses:
+        "200":
+          description: '{"code":"0000","msg":"ok","data":[{"startTime":1609986493,"queryDuration":145,"query":"select
+            * from dist_sensor_dt_result_online limit 10000","user":"default","queryId":"8aa3de08-92c4-4102-a83d-2f5d88569dab","address":"::1","threads":2}]}'
+          schema:
+            type: string
+      security:
+      - ApiKeyAuth: []
+      summary: 查询分布式DDL
+      tags:
+      - clickhouse
   /api/v2/ck/destroy/{clusterName}:
     put:
       consumes:
diff --git a/frontend b/frontend
index 487a89bc..09cf79e3 160000
--- a/frontend
+++ b/frontend
@@ -1 +1 @@
-Subproject commit 487a89bc37a57312ae0ec6b539d6cd843cac5df0
+Subproject commit 09cf79e36d3eddfb26762bcd8f2e2eb352d75722
diff --git a/service/clickhouse/clickhouse_service.go b/service/clickhouse/clickhouse_service.go
index 4f6cdc45..a942c3a9 100644
--- a/service/clickhouse/clickhouse_service.go
+++ b/service/clickhouse/clickhouse_service.go
@@ -1219,7 +1219,7 @@ func GetCkOpenSessions(conf *model.CKManClickHouseConfig, limit int) ([]*model.C
 }

 func GetDistibutedDDLQueue(conf *model.CKManClickHouseConfig) ([]*model.CkSessionInfo, error) {
-    query := fmt.Sprintf("select query_create_time, query, host, initiator_host, entry from cluster('{cluster}', system.distributed_ddl_queue) where cluster = '%s' and status != 'Finished' ORDER BY query_create_time", conf.Cluster)
+    query := fmt.Sprintf("select DISTINCT query_create_time, query, host, initiator_host, entry from cluster('{cluster}', system.distributed_ddl_queue) where cluster = '%s' and status != 'Finished' ORDER BY query_create_time", conf.Cluster)
     log.Logger.Debugf("query:%s", query)
     service := NewCkService(conf)
     err := service.InitCkService()
@@ -1264,7 +1264,7 @@ func KillCkOpenSessions(conf *model.CKManClickHouseConfig, host, queryId, typ st
 FROM
 (
     SELECT
-        (extractAllGroups(value, '(\\w+\\.\\w+) UUID')[1])[1] AS table,
+        (extractAllGroups(value, 'TABLE (\\w+\\.\\w+) ')[1])[1] AS table,
         (extractAllGroups(value, 'initial_query_id: (.*)\n')[1])[1] AS initial_query_id
     FROM system.zookeeper
     WHERE (path = '/clickhouse/task_queue/ddl/%s') AND (name = '%s')
 )`, conf.Cluster, queryId)
@@ -1281,7 +1281,8 @@ func KillCkOpenSessions(conf *model.CKManClickHouseConfig, host, queryId, typ st
         log.Logger.Debugf(query)
         err = conn.QueryRow(query).Scan(&query_id)
         if err == nil {
-            query = fmt.Sprintf("KILL QUERY WHERE query_id = '%s'", queryId)
+            query = fmt.Sprintf("KILL QUERY WHERE query_id = '%s'", query_id)
+            log.Logger.Debugf(query)
             err = conn.Exec(query)
             if err != nil {
                 return errors.Wrap(err, "")
@@ -1297,6 +1298,7 @@ func KillCkOpenSessions(conf *model.CKManClickHouseConfig, host, queryId, typ st
             }
             if count > 0 {
                 query = fmt.Sprintf("KILL MUTATION WHERE database = '%s' AND table = '%s'", database, table)
+                log.Logger.Debugf(query)
                 err = conn.Exec(query)
                 if err != nil {
                     return errors.Wrap(err, "")
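
---

A minimal usage sketch for the API surface added in this series. It is not part of the patches: the ckman address is a placeholder and authentication (the ApiKeyAuth token) is omitted; only the routes, query parameters, and JSON field names are taken from the changes above (router/v1.go, KillOpenSessions, model.CkSessionInfo, and the swagger docs). The sketch lists unfinished distributed DDL entries via GET /api/v1/ck/ddl_queue/{clusterName}, then asks ckman to kill the first one via PUT /api/v1/ck/open_sessions/{clusterName} with the queryId and type=queue parameters that KillOpenSessions now reads.

package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
)

// envelope mirrors the {"code":"...","msg":"...","data":[...]} wrapper shown in the swagger docs above.
type envelope struct {
	Code string     `json:"code"`
	Msg  string     `json:"msg"`
	Data []ddlEntry `json:"data"`
}

// ddlEntry keeps only the CkSessionInfo fields this sketch needs (json tags match model/ck_session.go).
type ddlEntry struct {
	QueryId string `json:"queryId"`
	Query   string `json:"query"`
	Host    string `json:"host"`
}

func main() {
	base := "http://127.0.0.1:8808" // placeholder ckman address; adjust to your deployment
	cluster := "test"

	// 1. List unfinished distributed DDL entries: GET /api/v1/ck/ddl_queue/{clusterName}.
	resp, err := http.Get(fmt.Sprintf("%s/api/v1/ck/ddl_queue/%s", base, cluster))
	if err != nil {
		panic(err)
	}
	body, _ := io.ReadAll(resp.Body)
	resp.Body.Close()

	var env envelope
	if err := json.Unmarshal(body, &env); err != nil {
		panic(err)
	}
	for _, e := range env.Data {
		fmt.Printf("pending DDL %s on %s: %s\n", e.QueryId, e.Host, e.Query)
	}
	if len(env.Data) == 0 {
		return
	}

	// 2. Kill the first stuck entry: PUT /api/v1/ck/open_sessions/{clusterName}?host=...&queryId=...&type=queue.
	// type=queue routes KillCkOpenSessions into the new ZooKeeper lookup / KILL MUTATION branch.
	e := env.Data[0]
	q := url.Values{}
	q.Set("host", e.Host)
	q.Set("queryId", e.QueryId)
	q.Set("type", "queue")
	req, _ := http.NewRequest(http.MethodPut,
		fmt.Sprintf("%s/api/v1/ck/open_sessions/%s?%s", base, cluster, q.Encode()), nil)
	kill, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer kill.Body.Close()
	fmt.Println("kill request status:", kill.Status)
}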