diff --git a/lightning/pkg/importer/chunk_process.go b/lightning/pkg/importer/chunk_process.go index b196e59ca92c2..2b7d584d5237a 100644 --- a/lightning/pkg/importer/chunk_process.go +++ b/lightning/pkg/importer/chunk_process.go @@ -177,9 +177,10 @@ func (cr *chunkProcessor) process( // Create the encoder. kvEncoder, err := rc.encBuilder.NewEncoder(ctx, &encode.EncodingConfig{ SessionOptions: encode.SessionOptions{ - SQLMode: rc.cfg.TiDB.SQLMode, - Timestamp: cr.chunk.Timestamp, - SysVars: rc.sysVars, + SQLMode: rc.cfg.TiDB.SQLMode, + Timestamp: cr.chunk.Timestamp, + SysVars: rc.sysVars, + LogicalImportPrepStmt: rc.cfg.TikvImporter.LogicalImportPrepStmt, // use chunk.PrevRowIDMax as the auto random seed, so it can stay the same value after recover from checkpoint. AutoRandomSeed: cr.chunk.Chunk.PrevRowIDMax, }, @@ -262,9 +263,10 @@ func (cr *chunkProcessor) encodeLoop( originalTableEncoder, err = rc.encBuilder.NewEncoder(ctx, &encode.EncodingConfig{ SessionOptions: encode.SessionOptions{ - SQLMode: rc.cfg.TiDB.SQLMode, - Timestamp: cr.chunk.Timestamp, - SysVars: rc.sysVars, + SQLMode: rc.cfg.TiDB.SQLMode, + Timestamp: cr.chunk.Timestamp, + SysVars: rc.sysVars, + LogicalImportPrepStmt: rc.cfg.TikvImporter.LogicalImportPrepStmt, // use chunk.PrevRowIDMax as the auto random seed, so it can stay the same value after recover from checkpoint. AutoRandomSeed: cr.chunk.Chunk.PrevRowIDMax, }, diff --git a/lightning/tests/lightning_tidb_duplicate_data/error.toml b/lightning/tests/lightning_tidb_duplicate_data/error.toml index ae201b501ccbb..eba965ae934a0 100644 --- a/lightning/tests/lightning_tidb_duplicate_data/error.toml +++ b/lightning/tests/lightning_tidb_duplicate_data/error.toml @@ -2,3 +2,4 @@ backend = "tidb" on-duplicate = "error" logical-import-batch-rows = 1 +logical-import-prep-stmt = true diff --git a/lightning/tests/lightning_tidb_duplicate_data/ignore.toml b/lightning/tests/lightning_tidb_duplicate_data/ignore.toml index 1cfd526c1acb7..f029f41d46434 100644 --- a/lightning/tests/lightning_tidb_duplicate_data/ignore.toml +++ b/lightning/tests/lightning_tidb_duplicate_data/ignore.toml @@ -2,3 +2,4 @@ backend = "tidb" on-duplicate = "ignore" logical-import-batch-rows = 1 +logical-import-prep-stmt = true diff --git a/lightning/tests/lightning_tidb_duplicate_data/replace.toml b/lightning/tests/lightning_tidb_duplicate_data/replace.toml index 5e3b7b9993d63..88b81f6518fd4 100644 --- a/lightning/tests/lightning_tidb_duplicate_data/replace.toml +++ b/lightning/tests/lightning_tidb_duplicate_data/replace.toml @@ -2,3 +2,4 @@ backend = "tidb" on-duplicate = "replace" logical-import-batch-rows = 1 +logical-import-prep-stmt = true diff --git a/lightning/tidb-lightning.toml b/lightning/tidb-lightning.toml index 3a7de55bf758e..df7c8842e7f59 100644 --- a/lightning/tidb-lightning.toml +++ b/lightning/tidb-lightning.toml @@ -166,6 +166,8 @@ addr = "127.0.0.1:8287" # the rows will be split in a way to respect both settings. # This value may be decreased to reduce the stress on the cluster due to large transaction. #logical-import-batch-rows = 65536 +# logical-import-prep-stmt controls whether to use prepared statements in logical mode (TiDB backend). +#logical-import-prep-stmt = false [mydumper] # block size of file reading diff --git a/pkg/lightning/backend/encode/encode.go b/pkg/lightning/backend/encode/encode.go index 73fe990032465..4ce2ed8716115 100644 --- a/pkg/lightning/backend/encode/encode.go +++ b/pkg/lightning/backend/encode/encode.go @@ -52,9 +52,10 @@ type Encoder interface { // SessionOptions is the initial configuration of the session. type SessionOptions struct { - SQLMode mysql.SQLMode - Timestamp int64 - SysVars map[string]string + SQLMode mysql.SQLMode + Timestamp int64 + SysVars map[string]string + LogicalImportPrepStmt bool // a seed used for tableKvEncoder's auto random bits value AutoRandomSeed int64 // IndexID is used by the dupeDetector. Only the key range with the specified index ID is scanned. diff --git a/pkg/lightning/backend/tidb/BUILD.bazel b/pkg/lightning/backend/tidb/BUILD.bazel index 3c84f18714a0a..74ab3e1586816 100644 --- a/pkg/lightning/backend/tidb/BUILD.bazel +++ b/pkg/lightning/backend/tidb/BUILD.bazel @@ -22,6 +22,8 @@ go_library( "//pkg/table", "//pkg/types", "//pkg/util/dbutil", + "//pkg/util/hack", + "//pkg/util/kvcache", "//pkg/util/redact", "@com_github_go_sql_driver_mysql//:mysql", "@com_github_google_uuid//:uuid", @@ -37,7 +39,7 @@ go_test( timeout = "short", srcs = ["tidb_test.go"], flaky = True, - shard_count = 15, + shard_count = 17, deps = [ ":tidb", "//pkg/errno", @@ -60,5 +62,6 @@ go_test( "@com_github_go_sql_driver_mysql//:mysql", "@com_github_stretchr_testify//require", "@org_uber_go_atomic//:atomic", + "@org_uber_go_zap//:zap", ], ) diff --git a/pkg/lightning/backend/tidb/tidb.go b/pkg/lightning/backend/tidb/tidb.go index a19e15b9a1a5e..caf837523df90 100644 --- a/pkg/lightning/backend/tidb/tidb.go +++ b/pkg/lightning/backend/tidb/tidb.go @@ -21,6 +21,7 @@ import ( "fmt" "strconv" "strings" + "sync" "time" gmysql "github.com/go-sql-driver/mysql" @@ -43,6 +44,8 @@ import ( "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/dbutil" + "github.com/pingcap/tidb/pkg/util/hack" + "github.com/pingcap/tidb/pkg/util/kvcache" "github.com/pingcap/tidb/pkg/util/redact" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -56,12 +59,16 @@ var extraHandleTableColumn = &table.Column{ const ( writeRowsMaxRetryTimes = 3 + // To limit memory usage for prepared statements. + prepStmtCacheSize uint = 100 ) type tidbRow struct { - insertStmt string - path string - offset int64 + insertStmt string + preparedInsertStmt string + values []any + path string + offset int64 } var emptyTiDBRow = tidbRow{ @@ -91,8 +98,9 @@ type tidbEncoder struct { // the there are enough columns. columnCnt int // data file path - path string - logger log.Logger + path string + logger log.Logger + prepStmt bool } type encodingBuilder struct{} @@ -106,10 +114,11 @@ func NewEncodingBuilder() encode.EncodingBuilder { // It implements the `backend.EncodingBuilder` interface. func (*encodingBuilder) NewEncoder(_ context.Context, config *encode.EncodingConfig) (encode.Encoder, error) { return &tidbEncoder{ - mode: config.SQLMode, - tbl: config.Table, - path: config.Path, - logger: config.Logger, + mode: config.SQLMode, + tbl: config.Table, + path: config.Path, + logger: config.Logger, + prepStmt: config.LogicalImportPrepStmt, }, nil } @@ -288,6 +297,16 @@ func (*targetInfoGetter) CheckRequirements(ctx context.Context, _ *backend.Check return nil } +// stmtKey defines key for stmtCache. +type stmtKey struct { + query string +} + +// Hash implements SimpleLRUCache.Key. +func (k *stmtKey) Hash() []byte { + return hack.Slice(k.query) +} + type tidbBackend struct { db *sql.DB conflictCfg config.Conflict @@ -301,6 +320,9 @@ type tidbBackend struct { // affecting the cluster too much. maxChunkSize uint64 maxChunkRows int + // implement stmtCache to improve performance + stmtCache *kvcache.SimpleLRUCache + stmtCacheMutex sync.RWMutex } var _ backend.Backend = (*tidbBackend)(nil) @@ -334,13 +356,23 @@ func NewTiDBBackend( log.FromContext(ctx).Warn("unsupported conflict strategy for TiDB backend, overwrite with `error`") onDuplicate = config.ErrorOnDup } + var stmtCache *kvcache.SimpleLRUCache + if cfg.TikvImporter.LogicalImportPrepStmt { + stmtCache = kvcache.NewSimpleLRUCache(prepStmtCacheSize, 0, 0) + stmtCache.SetOnEvict(func(_ kvcache.Key, value kvcache.Value) { + stmt := value.(*sql.Stmt) + stmt.Close() + }) + } return &tidbBackend{ - db: db, - conflictCfg: conflict, - onDuplicate: onDuplicate, - errorMgr: errorMgr, - maxChunkSize: uint64(cfg.TikvImporter.LogicalImportBatchSize), - maxChunkRows: cfg.TikvImporter.LogicalImportBatchRows, + db: db, + conflictCfg: conflict, + onDuplicate: onDuplicate, + errorMgr: errorMgr, + maxChunkSize: uint64(cfg.TikvImporter.LogicalImportBatchSize), + maxChunkRows: cfg.TikvImporter.LogicalImportBatchRows, + stmtCache: stmtCache, + stmtCacheMutex: sync.RWMutex{}, } } @@ -556,9 +588,15 @@ func (enc *tidbEncoder) Encode(row []types.Datum, _ int64, columnPermutation []i return emptyTiDBRow, errors.Errorf("column count mismatch, at most %d but got %d", len(enc.columnIdx), len(row)) } - var encoded strings.Builder + var encoded, preparedInsertStmt strings.Builder + var values []any encoded.Grow(8 * len(row)) encoded.WriteByte('(') + if enc.prepStmt { + preparedInsertStmt.Grow(2 * len(row)) + preparedInsertStmt.WriteByte('(') + values = make([]any, 0, len(row)) + } cnt := 0 for i, field := range row { if enc.columnIdx[i] < 0 { @@ -566,6 +604,9 @@ func (enc *tidbEncoder) Encode(row []types.Datum, _ int64, columnPermutation []i } if cnt > 0 { encoded.WriteByte(',') + if enc.prepStmt { + preparedInsertStmt.WriteByte(',') + } } datum := field if err := enc.appendSQL(&encoded, &datum, getColumnByIndex(cols, enc.columnIdx[i])); err != nil { @@ -576,13 +617,23 @@ func (enc *tidbEncoder) Encode(row []types.Datum, _ int64, columnPermutation []i ) return nil, err } + if enc.prepStmt { + preparedInsertStmt.WriteByte('?') + values = append(values, datum.GetValue()) + } cnt++ } encoded.WriteByte(')') + if enc.prepStmt { + preparedInsertStmt.WriteByte(')') + } + return tidbRow{ - insertStmt: encoded.String(), - path: enc.path, - offset: offset, + insertStmt: encoded.String(), + preparedInsertStmt: preparedInsertStmt.String(), + values: values, + path: enc.path, + offset: offset, }, nil } @@ -665,8 +716,9 @@ rowLoop: } type stmtTask struct { - rows tidbRows - stmt string + rows tidbRows + stmt string + values []any } // WriteBatchRowsToDB write rows in batch mode, which will insert multiple rows like this: @@ -679,14 +731,23 @@ func (be *tidbBackend) WriteBatchRowsToDB(ctx context.Context, tableName string, } // Note: we are not going to do interpolation (prepared statements) to avoid // complication arise from data length overflow of BIT and BINARY columns + var values []any + if be.stmtCache != nil && len(rows) > 0 { + values = make([]any, 0, len(rows[0].values)*len(rows)) + } stmtTasks := make([]stmtTask, 1) for i, row := range rows { if i != 0 { insertStmt.WriteByte(',') } - insertStmt.WriteString(row.insertStmt) + if be.stmtCache != nil { + insertStmt.WriteString(row.preparedInsertStmt) + values = append(values, row.values...) + } else { + insertStmt.WriteString(row.insertStmt) + } } - stmtTasks[0] = stmtTask{rows, insertStmt.String()} + stmtTasks[0] = stmtTask{rows, insertStmt.String(), values} return be.execStmts(ctx, stmtTasks, tableName, true) } @@ -715,8 +776,12 @@ func (be *tidbBackend) WriteRowsToDB(ctx context.Context, tableName string, colu for _, row := range rows { var finalInsertStmt strings.Builder finalInsertStmt.WriteString(is) - finalInsertStmt.WriteString(row.insertStmt) - stmtTasks = append(stmtTasks, stmtTask{[]tidbRow{row}, finalInsertStmt.String()}) + if be.stmtCache != nil { + finalInsertStmt.WriteString(row.preparedInsertStmt) + } else { + finalInsertStmt.WriteString(row.insertStmt) + } + stmtTasks = append(stmtTasks, stmtTask{[]tidbRow{row}, finalInsertStmt.String(), row.values}) } return be.execStmts(ctx, stmtTasks, tableName, false) } @@ -754,8 +819,34 @@ stmtLoop: err error ) for i := 0; i < writeRowsMaxRetryTimes; i++ { - stmt := stmtTask.stmt - result, err = be.db.ExecContext(ctx, stmt) + query := stmtTask.stmt + if be.stmtCache != nil { + var prepStmt *sql.Stmt + key := &stmtKey{query: query} + be.stmtCacheMutex.RLock() + stmt, ok := be.stmtCache.Get(key) + be.stmtCacheMutex.RUnlock() + if ok { + prepStmt = stmt.(*sql.Stmt) + } else if stmt, err := be.db.PrepareContext(ctx, query); err == nil { + be.stmtCacheMutex.Lock() + // check again if the key is already in the cache + // to avoid override existing stmt without closing it + if cachedStmt, ok := be.stmtCache.Get(key); !ok { + prepStmt = stmt + be.stmtCache.Put(key, stmt) + } else { + prepStmt = cachedStmt.(*sql.Stmt) + stmt.Close() + } + be.stmtCacheMutex.Unlock() + } else { + return errors.Trace(err) + } + result, err = prepStmt.ExecContext(ctx, stmtTask.values...) + } else { + result, err = be.db.ExecContext(ctx, query) + } if err == nil { affected, err2 := result.RowsAffected() if err2 != nil { @@ -776,7 +867,7 @@ stmtLoop: if !common.IsContextCanceledError(err) { log.FromContext(ctx).Error("execute statement failed", - zap.Array("rows", stmtTask.rows), zap.String("stmt", redact.Value(stmt)), zap.Error(err)) + zap.Array("rows", stmtTask.rows), zap.String("stmt", redact.Value(query)), zap.Error(err)) } // It's batch mode, just return the error. Caller will fall back to row-by-row mode. if batch { diff --git a/pkg/lightning/backend/tidb/tidb_test.go b/pkg/lightning/backend/tidb/tidb_test.go index d32d3dfe45213..1ee7dd0024660 100644 --- a/pkg/lightning/backend/tidb/tidb_test.go +++ b/pkg/lightning/backend/tidb/tidb_test.go @@ -43,6 +43,7 @@ import ( "github.com/pingcap/tidb/pkg/types" "github.com/stretchr/testify/require" "go.uber.org/atomic" + "go.uber.org/zap" ) type mysqlSuite struct { @@ -812,9 +813,10 @@ func encodeRowsTiDB(t *testing.T, encBuilder encode.EncodingBuilder, tbl table.T } for _, rc := range rowCases { encoder, err := encBuilder.NewEncoder(context.Background(), &encode.EncodingConfig{ - Path: rc.path, - Table: tbl, - Logger: log.L(), + SessionOptions: encode.SessionOptions{LogicalImportPrepStmt: true}, + Path: rc.path, + Table: tbl, + Logger: log.L(), }) require.NoError(t, err) row, err := encoder.Encode(rc.row, rc.rowID, rc.colMapping, rc.offset) @@ -922,3 +924,139 @@ func TestLogicalImportBatch(t *testing.T) { err = writer.AppendRows(ctx, []string{"a"}, dataRows) require.NoError(t, err) } + +// TestLogicalImportBatch tests that each INSERT statement is limited by both +// logical-import-batch-size and logical-import-batch-rows configurations. Here +// we ensure each INSERT statement has up to 5 rows *and* ~30 bytes of values. +// In this test, we enable the `logical-import-prep-stmt` option, +// to verify prepared once and executed multiple times. +func TestLogicalImportBatchPrepStmt(t *testing.T) { + s := createMysqlSuite(t) + defer s.TearDownTest(t) + + query1 := "\\QINSERT INTO `foo`.`bar`(`a`) VALUES(?),(?),(?),(?),(?)\\E" + query2 := "\\QINSERT INTO `foo`.`bar`(`a`) VALUES(?),(?),(?),(?)\\E" + query3 := "\\QINSERT INTO `foo`.`bar`(`a`) VALUES(?)\\E" + + // Expect the query to be prepared + stmt1 := s.mockDB.ExpectPrepare(query1) + // Set the expectation for the query execution with specific arguments + stmt1.ExpectExec(). + WithArgs(1, 2, 4, 8, 16). + WillReturnResult(sqlmock.NewResult(1, 5)) // Simulate an insert with 5 rows affected + stmt1.ExpectExec(). + //WithArgs(32, 64, 128, 256, 512). + WithArgs(sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg(), sqlmock.AnyArg()). + WillReturnResult(sqlmock.NewResult(1, 5)) // Simulate an insert with 5 rows affected + // Expect the query to be prepared + stmt2 := s.mockDB.ExpectPrepare(query2) + // Set the expectation for the query execution with specific arguments + stmt2.ExpectExec(). + WithArgs(1024, 2048, 4096, 8192). + WillReturnResult(sqlmock.NewResult(1, 4)) // Simulate an insert with 4 rows affected + stmt2.ExpectExec(). + WithArgs(16384, 32768, 65536, 131072). + WillReturnResult(sqlmock.NewResult(1, 4)) // Simulate an insert with 4 rows affected + // Expect the query to be prepared + stmt3 := s.mockDB.ExpectPrepare(query3) + // Set the expectation for the query execution with specific arguments + stmt3.ExpectExec(). + WithArgs(262144). + WillReturnResult(sqlmock.NewResult(1, 1)) // Simulate an insert with 1 rows affected + + ctx := context.Background() + logger := log.L() + + cfg := config.NewConfig() + cfg.Conflict.Strategy = config.ErrorOnDup + cfg.TikvImporter.LogicalImportBatchSize = 30 + cfg.TikvImporter.LogicalImportBatchRows = 5 + cfg.TikvImporter.LogicalImportPrepStmt = true + ignoreBackend := tidb.NewTiDBBackend(ctx, s.dbHandle, cfg, errormanager.New(nil, cfg, logger)) + encBuilder := tidb.NewEncodingBuilder() + // add logicalImportPrepStmt to the encoding config + encoder, err := encBuilder.NewEncoder(context.Background(), &encode.EncodingConfig{ + SessionOptions: encode.SessionOptions{LogicalImportPrepStmt: true}, + Path: "1.csv", + Table: s.tbl, + Logger: log.L(), + }) + require.NoError(t, err) + + dataRows := encBuilder.MakeEmptyRows() + dataChecksum := verification.MakeKVChecksum(0, 0, 0) + indexRows := encBuilder.MakeEmptyRows() + indexChecksum := verification.MakeKVChecksum(0, 0, 0) + for i := int64(0); i < 19; i++ { // encode rows 1, 2, 4, 8, ..., 262144. + row, err := encoder.Encode( + []types.Datum{types.NewIntDatum(1 << i)}, + i, + []int{0, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1, -1}, + 8*i, + ) + require.NoError(t, err) + row.ClassifyAndAppend(&dataRows, &dataChecksum, &indexRows, &indexChecksum) + } + logger.Error("dataRows", zap.Any("dataRows", dataRows)) + + engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) + require.NoError(t, err) + writerCfg := &backend.LocalWriterConfig{} + writerCfg.TiDB.TableName = "`foo`.`bar`" + writer, err := engine.LocalWriter(ctx, writerCfg) + require.NoError(t, err) + err = writer.AppendRows(ctx, []string{"a"}, dataRows) + require.NoError(t, err) +} + +// TestWriteRowsRecordOneErrorPrepStmt tests that when LogicalImportPrepStmt is true and the batch insert fails, +// it will fallback to a single row insert, +// the error will be recorded in tidb_lightning_errors.conflict_records. +func TestWriteRowsRecordOneErrorPrepStmt(t *testing.T) { + dupErr := &gmysql.MySQLError{Number: errno.ErrDupEntry, Message: "Duplicate entry '2' for key 'PRIMARY'"} + s := createMysqlSuite(t) + defer s.TearDownTest(t) + // First, batch insert, fail and rollback. + query1 := "\\QINSERT INTO `foo`.`bar`(`a`) VALUES(?),(?),(?),(?),(?)\\E" + query2 := "\\QINSERT INTO `foo`.`bar`(`a`) VALUES(?)\\E" + // Expect any INSERT statement to be prepared + stmt1 := s.mockDB.ExpectPrepare(query1) + // Expect the batch query execution to fail + stmt1.ExpectExec(). + WithArgs(1, 2, 3, 4, 5). + WillReturnError(dupErr) + // Expect single-row inserts + stmt2 := s.mockDB.ExpectPrepare(query2) + stmt2.ExpectExec(). + WithArgs(1). + WillReturnResult(sqlmock.NewResult(1, 1)) + stmt2.ExpectExec(). + WithArgs(2). + WillReturnError(dupErr) + s.mockDB. + ExpectExec("INSERT INTO `tidb_lightning_errors`\\.conflict_records.*"). + WithArgs(sqlmock.AnyArg(), "`foo`.`bar`", "8.csv", int64(0), dupErr.Error(), 0, "(2)"). + WillReturnResult(driver.ResultNoRows) + + cfg := config.NewConfig() + cfg.Conflict.Strategy = config.ErrorOnDup + cfg.Conflict.Threshold = 0 + cfg.Conflict.MaxRecordRows = 0 + cfg.App.TaskInfoSchemaName = "tidb_lightning_errors" + cfg.TikvImporter.LogicalImportPrepStmt = true + ignoreBackend := tidb.NewTiDBBackend(context.Background(), s.dbHandle, cfg, errormanager.New(s.dbHandle, cfg, log.L())) + encBuilder := tidb.NewEncodingBuilder() + dataRows := encodeRowsTiDB(t, encBuilder, s.tbl) + ctx := context.Background() + engine, err := backend.MakeEngineManager(ignoreBackend).OpenEngine(ctx, &backend.EngineConfig{}, "`foo`.`bar`", 1) + require.NoError(t, err) + writerCfg := &backend.LocalWriterConfig{} + writerCfg.TiDB.TableName = "`foo`.`bar`" + writer, err := engine.LocalWriter(ctx, writerCfg) + require.NoError(t, err) + err = writer.AppendRows(ctx, []string{"a"}, dataRows) + require.ErrorContains(t, err, "Duplicate entry '2' for key 'PRIMARY'") + st, err := writer.Close(ctx) + require.NoError(t, err) + require.Nil(t, st) +} diff --git a/pkg/lightning/common/util.go b/pkg/lightning/common/util.go index 1dfcfc9f5170f..7017e79a646c2 100644 --- a/pkg/lightning/common/util.go +++ b/pkg/lightning/common/util.go @@ -26,6 +26,7 @@ import ( "net" "net/http" "os" + "runtime" "strconv" "strings" "syscall" @@ -142,6 +143,11 @@ func (param *MySQLConnectParam) Connect() (*sql.DB, error) { if err != nil { return nil, errors.Trace(err) } + // The actual number of alive connections is controlled by the region concurrency + // The setting is required to avoid frequent connection creation and close + if db != nil { + db.SetMaxIdleConns(runtime.GOMAXPROCS(0)) + } return db, nil } diff --git a/pkg/lightning/config/BUILD.bazel b/pkg/lightning/config/BUILD.bazel index 459bc46dda9fa..22a0ac21dcf02 100644 --- a/pkg/lightning/config/BUILD.bazel +++ b/pkg/lightning/config/BUILD.bazel @@ -42,7 +42,7 @@ go_test( ], embed = [":config"], flaky = True, - shard_count = 49, + shard_count = 50, deps = [ "@com_github_burntsushi_toml//:toml", "@com_github_stretchr_testify//require", diff --git a/pkg/lightning/config/config.go b/pkg/lightning/config/config.go index f7e07f256a09e..24a9c127530f3 100644 --- a/pkg/lightning/config/config.go +++ b/pkg/lightning/config/config.go @@ -78,6 +78,7 @@ const ( DefaultRegionSplitBatchSize = 4096 defaultLogicalImportBatchSize = 96 * units.KiB defaultLogicalImportBatchRows = 65536 + defaultLogicalImportPrepStmt = false // defaultMetaSchemaName is the default database name used to store lightning metadata defaultMetaSchemaName = "lightning_metadata" @@ -1101,6 +1102,7 @@ type TikvImporter struct { StoreWriteBWLimit ByteSize `toml:"store-write-bwlimit" json:"store-write-bwlimit"` LogicalImportBatchSize ByteSize `toml:"logical-import-batch-size" json:"logical-import-batch-size"` LogicalImportBatchRows int `toml:"logical-import-batch-rows" json:"logical-import-batch-rows"` + LogicalImportPrepStmt bool `toml:"logical-import-prep-stmt" json:"logical-import-prep-stmt"` // default is PausePDSchedulerScopeTable to compatible with previous version(>= 6.1) PausePDSchedulerScope PausePDSchedulerScope `toml:"pause-pd-scheduler-scope" json:"pause-pd-scheduler-scope"` @@ -1497,6 +1499,7 @@ func NewConfig() *Config { BlockSize: 16 * 1024, LogicalImportBatchSize: ByteSize(defaultLogicalImportBatchSize), LogicalImportBatchRows: defaultLogicalImportBatchRows, + LogicalImportPrepStmt: defaultLogicalImportPrepStmt, }, PostRestore: PostRestore{ Checksum: OpLevelRequired, diff --git a/pkg/lightning/config/config_test.go b/pkg/lightning/config/config_test.go index 0f3e2f86237bd..b858bfdf790cf 100644 --- a/pkg/lightning/config/config_test.go +++ b/pkg/lightning/config/config_test.go @@ -1039,6 +1039,20 @@ func TestAdjustDiskQuota(t *testing.T) { require.Equal(t, int64(0), int64(cfg.TikvImporter.DiskQuota)) } +func TestAdjustLogicalImportPrepStmt(t *testing.T) { + cfg := NewConfig() + assignMinimalLegalValue(cfg) + ctx := context.Background() + + cfg.TikvImporter.Backend = BackendTiDB + require.NoError(t, cfg.Adjust(ctx)) + require.Equal(t, false, cfg.TikvImporter.LogicalImportPrepStmt) + + cfg.TikvImporter.LogicalImportPrepStmt = true + require.NoError(t, cfg.Adjust(ctx)) + require.Equal(t, true, cfg.TikvImporter.LogicalImportPrepStmt) +} + func TestAdjustConflictStrategy(t *testing.T) { cfg := NewConfig() assignMinimalLegalValue(cfg)