diff --git a/cdc/owner/changefeed.go b/cdc/owner/changefeed.go index 1713618314b..469bb142beb 100755 --- a/cdc/owner/changefeed.go +++ b/cdc/owner/changefeed.go @@ -36,6 +36,7 @@ import ( "github.com/pingcap/tiflow/pkg/config" cerror "github.com/pingcap/tiflow/pkg/errors" pfilter "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/pdutil" redoCfg "github.com/pingcap/tiflow/pkg/redo" "github.com/pingcap/tiflow/pkg/sink/observer" @@ -56,9 +57,8 @@ type Changefeed interface { // // It can be called in etcd ticks, so it should never be blocked. // Tick Returns: checkpointTs, minTableBarrierTs - Tick(context.Context, *model.ChangeFeedInfo, - *model.ChangeFeedStatus, - map[model.CaptureID]*model.CaptureInfo) (model.Ts, model.Ts) + Tick(context.Context, *orchestrator.ChangefeedReactorState, + map[model.CaptureID]*model.CaptureInfo) // Close closes the changefeed. Close(ctx context.Context) @@ -266,13 +266,12 @@ func newChangefeed4Test( } func (c *changefeed) Tick(ctx context.Context, - cfInfo *model.ChangeFeedInfo, - cfStatus *model.ChangeFeedStatus, + state *orchestrator.ChangefeedReactorState, captures map[model.CaptureID]*model.CaptureInfo, -) (model.Ts, model.Ts) { +) { startTime := time.Now() - c.latestInfo = cfInfo - c.latestStatus = cfStatus + c.latestInfo = state.Info + c.latestStatus = state.Status // Handle all internal warnings. noMoreWarnings := false for !noMoreWarnings { @@ -288,10 +287,10 @@ func (c *changefeed) Tick(ctx context.Context, if err != nil { c.handleErr(ctx, err) } - return 0, 0 + return } - checkpointTs, minTableBarrierTs, err := c.tick(ctx, captures, cfInfo, cfStatus) + checkpointTs, minTableBarrierTs, err := c.tick(ctx, captures, state) // The tick duration is recorded only if changefeed has completed initialization if c.initialized.Load() { @@ -309,7 +308,7 @@ func (c *changefeed) Tick(ctx context.Context, log.Error("changefeed tick failed", zap.Error(err)) c.handleErr(ctx, err) } - return checkpointTs, minTableBarrierTs + updateStatus(state, checkpointTs, minTableBarrierTs) } func (c *changefeed) Throw(ctx context.Context) func(error) { @@ -378,14 +377,16 @@ func (c *changefeed) checkStaleCheckpointTs( // tick returns the checkpointTs and minTableBarrierTs. func (c *changefeed) tick(ctx context.Context, captures map[model.CaptureID]*model.CaptureInfo, - cfInfo *model.ChangeFeedInfo, - cfStatus *model.ChangeFeedStatus, + state *orchestrator.ChangefeedReactorState, ) (model.Ts, model.Ts, error) { - adminJobPending := c.feedStateManager.Tick(c.resolvedTs, cfStatus, cfInfo) - preCheckpointTs := cfInfo.GetCheckpointTs(cfStatus) + adminJobPending := c.feedStateManager.Tick(c.resolvedTs, state) + preCheckpointTs := state.Checkpoint() + + info := state.Info + status := state.Status // checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()` // to ensure all changefeeds, no matter whether they are running or not, will be checked. - if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs, cfInfo); err != nil { + if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs, info); err != nil { return 0, 0, errors.Trace(err) } @@ -402,7 +403,7 @@ func (c *changefeed) tick(ctx context.Context, if !c.initialized.Load() { initialized, err := c.initializer.TryInitialize(ctx, func(ctx context.Context) error { - return c.initialize(ctx, cfInfo, cfStatus) + return c.initialize(ctx, state) }, c.globalVars.ChangefeedThreadPool) if err != nil { @@ -434,7 +435,7 @@ func (c *changefeed) tick(ctx context.Context, return 0, 0, nil } - err = c.handleBarrier(ctx, cfInfo, cfStatus, barrier) + err = c.handleBarrier(ctx, info, status, barrier) if err != nil { return 0, 0, errors.Trace(err) } @@ -482,16 +483,13 @@ func (c *changefeed) tick(ctx context.Context, } } - pdTime := c.upstream.PDClock.CurrentTime() - currentTs := oracle.GetPhysical(pdTime) - // CheckpointCannotProceed implies that not all tables are being replicated normally, // so in that case there is no need to advance the global watermarks. if watermark.CheckpointTs == scheduler.CheckpointCannotProceed { - if cfStatus != nil { + if status != nil { // We should keep the metrics updated even if the scheduler cannot // advance the watermarks for now. - c.updateMetrics(currentTs, cfStatus.CheckpointTs, c.resolvedTs) + c.updateMetrics() } return 0, 0, nil } @@ -508,36 +506,36 @@ func (c *changefeed) tick(ctx context.Context, } // MinTableBarrierTs should never regress - if barrier.MinTableBarrierTs < cfStatus.MinTableBarrierTs { - barrier.MinTableBarrierTs = cfStatus.MinTableBarrierTs + if barrier.MinTableBarrierTs < status.MinTableBarrierTs { + barrier.MinTableBarrierTs = status.MinTableBarrierTs } failpoint.Inject("ChangefeedOwnerDontUpdateCheckpoint", func() { - if c.lastDDLTs != 0 && cfStatus.CheckpointTs >= c.lastDDLTs { + if c.lastDDLTs != 0 && status.CheckpointTs >= c.lastDDLTs { log.Info("owner won't update checkpoint because of failpoint", zap.String("namespace", c.id.Namespace), zap.String("changefeed", c.id.ID), - zap.Uint64("keepCheckpoint", cfStatus.CheckpointTs), + zap.Uint64("keepCheckpoint", status.CheckpointTs), zap.Uint64("skipCheckpoint", watermark.CheckpointTs)) - watermark.CheckpointTs = cfStatus.CheckpointTs + watermark.CheckpointTs = status.CheckpointTs } }) failpoint.Inject("ChangefeedOwnerNotUpdateCheckpoint", func() { - watermark.CheckpointTs = cfStatus.CheckpointTs + watermark.CheckpointTs = status.CheckpointTs }) - c.updateMetrics(currentTs, watermark.CheckpointTs, c.resolvedTs) + c.latestStatus.CheckpointTs = watermark.CheckpointTs + c.updateMetrics() c.tickDownstreamObserver(ctx) return watermark.CheckpointTs, barrier.MinTableBarrierTs, nil } func (c *changefeed) initialize(ctx context.Context, - cfInfo *model.ChangeFeedInfo, - cfStatus *model.ChangeFeedStatus, + state *orchestrator.ChangefeedReactorState, ) (err error) { - if c.initialized.Load() || cfStatus == nil { + if c.initialized.Load() || state.Status == nil { // If `c.latestStatus` is nil it means the changefeed struct is just created, it needs to // 1. use startTs as checkpointTs and resolvedTs, if it's a new created changefeed; or // 2. load checkpointTs and resolvedTs from etcd, if it's an existing changefeed. @@ -566,12 +564,14 @@ LOOP2: } } - checkpointTs := cfStatus.CheckpointTs + status := state.Status + info := state.Info + checkpointTs := status.CheckpointTs if c.resolvedTs == 0 { c.resolvedTs = checkpointTs } - minTableBarrierTs := cfStatus.MinTableBarrierTs + minTableBarrierTs := status.MinTableBarrierTs failpoint.Inject("NewChangefeedNoRetryError", func() { failpoint.Return(cerror.ErrStartTsBeforeGC.GenWithStackByArgs(checkpointTs-300, checkpointTs)) @@ -580,7 +580,7 @@ LOOP2: failpoint.Return(errors.New("failpoint injected retriable error")) }) - if cfInfo.Config.CheckGCSafePoint { + if info.Config.CheckGCSafePoint { // Check TiDB GC safepoint does not exceed the checkpoint. // // We update TTL to 10 minutes, @@ -634,10 +634,10 @@ LOOP2: } c.barriers = newBarriers() - if util.GetOrZero(cfInfo.Config.EnableSyncPoint) { + if util.GetOrZero(info.Config.EnableSyncPoint) { // firstSyncPointStartTs = k * syncPointInterval, // which >= startTs, and choose the minimal k - syncPointInterval := util.GetOrZero(cfInfo.Config.SyncPointInterval) + syncPointInterval := util.GetOrZero(info.Config.SyncPointInterval) k := oracle.GetTimeFromTS(c.resolvedTs).Sub(time.Unix(0, 0)) / syncPointInterval if oracle.GetTimeFromTS(c.resolvedTs).Sub(time.Unix(0, 0))%syncPointInterval != 0 || oracle.ExtractLogical(c.resolvedTs) != 0 { k += 1 @@ -645,15 +645,15 @@ LOOP2: firstSyncPointTs := oracle.GoTimeToTS(time.Unix(0, 0).Add(k * syncPointInterval)) c.barriers.Update(syncPointBarrier, firstSyncPointTs) } - c.barriers.Update(finishBarrier, cfInfo.GetTargetTs()) + c.barriers.Update(finishBarrier, info.GetTargetTs()) - filter, err := pfilter.NewFilter(cfInfo.Config, "") + filter, err := pfilter.NewFilter(info.Config, "") if err != nil { return errors.Trace(err) } c.schema, err = entry.NewSchemaStorage( c.upstream.KVStorage, ddlStartTs, - cfInfo.Config.ForceReplicate, c.id, util.RoleOwner, filter) + info.Config.ForceReplicate, c.id, util.RoleOwner, filter) if err != nil { return errors.Trace(err) } @@ -665,10 +665,10 @@ LOOP2: if err != nil { return errors.Trace(err) } - cfInfo.Config.Sink.TiDBSourceID = sourceID + info.Config.Sink.TiDBSourceID = sourceID log.Info("get sourceID from PD", zap.Uint64("sourceID", sourceID), zap.Stringer("changefeedID", c.id)) - c.ddlSink = c.newSink(c.id, cfInfo, c.Throw(ctx), func(err error) { + c.ddlSink = c.newSink(c.id, info, c.Throw(ctx), func(err error) { select { case <-ctx.Done(): case c.warningCh <- err: @@ -683,13 +683,13 @@ LOOP2: c.Throw(ctx)(c.ddlPuller.Run(cancelCtx)) }() - c.downstreamObserver, err = c.newDownstreamObserver(ctx, c.id, cfInfo.SinkURI, cfInfo.Config) + c.downstreamObserver, err = c.newDownstreamObserver(ctx, c.id, info.SinkURI, info.Config) if err != nil { return err } c.observerLastTick = atomic.NewTime(time.Time{}) - c.redoDDLMgr = redo.NewDDLManager(c.id, cfInfo.Config.Consistent, ddlStartTs) + c.redoDDLMgr = redo.NewDDLManager(c.id, info.Config.Consistent, ddlStartTs) if c.redoDDLMgr.Enabled() { c.wg.Add(1) go func() { @@ -698,7 +698,7 @@ LOOP2: }() } - c.redoMetaMgr = redo.NewMetaManager(c.id, cfInfo.Config.Consistent, checkpointTs) + c.redoMetaMgr = redo.NewMetaManager(c.id, info.Config.Consistent, checkpointTs) if c.redoMetaMgr.Enabled() { c.wg.Add(1) go func() { @@ -713,22 +713,22 @@ LOOP2: c.ddlManager = newDDLManager( c.id, ddlStartTs, - cfStatus.CheckpointTs, + checkpointTs, c.ddlSink, filter, c.ddlPuller, c.schema, c.redoDDLMgr, c.redoMetaMgr, - util.GetOrZero(cfInfo.Config.BDRMode), - cfInfo.Config.Sink.ShouldSendAllBootstrapAtStart(), + util.GetOrZero(info.Config.BDRMode), + info.Config.Sink.ShouldSendAllBootstrapAtStart(), c.Throw(ctx), ) // create scheduler cfg := *c.cfg - cfg.ChangefeedSettings = cfInfo.Config.Scheduler - epoch := cfInfo.Epoch + cfg.ChangefeedSettings = info.Config.Scheduler + epoch := info.Epoch c.scheduler, err = c.newScheduler(ctx, c.id, c.upstream, epoch, &cfg, c.redoMetaMgr, c.globalVars) if err != nil { return errors.Trace(err) @@ -737,7 +737,7 @@ LOOP2: c.initMetrics() c.initialized.Store(true) - c.metricsChangefeedCreateTimeGuage.Set(float64(oracle.GetPhysical(cfInfo.CreateTime))) + c.metricsChangefeedCreateTimeGuage.Set(float64(oracle.GetPhysical(info.CreateTime))) c.metricsChangefeedRestartTimeGauge.Set(float64(oracle.GetPhysical(time.Now()))) log.Info("changefeed initialized", zap.String("namespace", c.id.Namespace), @@ -745,7 +745,7 @@ LOOP2: zap.Uint64("changefeedEpoch", epoch), zap.Uint64("checkpointTs", checkpointTs), zap.Uint64("resolvedTs", c.resolvedTs), - zap.String("info", cfInfo.String())) + zap.String("info", info.String())) return nil } @@ -800,11 +800,7 @@ func (c *changefeed) releaseResources(ctx context.Context) { c.wg.Wait() if c.ddlSink != nil { - canceledCtx, cancel := context.WithCancel(context.Background()) - cancel() - // TODO(dongmen): remove ctx from func ddlSink.close(), it is useless. - // We don't need to wait ddlSink Close, pass a canceled context is ok - if err := c.ddlSink.close(canceledCtx); err != nil { + if err := c.ddlSink.close(); err != nil { log.Warn("owner close ddlSink failed", zap.String("namespace", c.id.Namespace), zap.String("changefeed", c.id.ID), @@ -978,7 +974,11 @@ func (c *changefeed) handleBarrier(ctx context.Context, return nil } -func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs model.Ts) { +func (c *changefeed) updateMetrics() { + pdTime := c.upstream.PDClock.CurrentTime() + currentTs := oracle.GetPhysical(pdTime) + + checkpointTs := c.latestStatus.CheckpointTs phyCkpTs := oracle.ExtractPhysical(checkpointTs) c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCkpTs)) @@ -986,7 +986,7 @@ func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs mod c.metricsChangefeedCheckpointTsLagGauge.Set(checkpointLag) c.metricsChangefeedCheckpointLagDuration.Observe(checkpointLag) - phyRTs := oracle.ExtractPhysical(resolvedTs) + phyRTs := oracle.ExtractPhysical(c.resolvedTs) c.metricsChangefeedResolvedTsGauge.Set(float64(phyRTs)) resolvedLag := float64(currentTs-phyRTs) / 1e3 diff --git a/cdc/owner/changefeed_test.go b/cdc/owner/changefeed_test.go index 4db8a77b1f1..448b88f5543 100644 --- a/cdc/owner/changefeed_test.go +++ b/cdc/owner/changefeed_test.go @@ -148,7 +148,7 @@ func (m *mockDDLSink) getCheckpointTsAndTableNames() (uint64, []*model.TableInfo return m.mu.checkpointTs, m.mu.currentTables } -func (m *mockDDLSink) close(_ context.Context) error { +func (m *mockDDLSink) close() error { m.wg.Wait() return nil } @@ -307,7 +307,7 @@ func TestInitialize(t *testing.T) { // initialize globalvars.EtcdClient = &etcd.CDCEtcdClientImpl{} - cf.Tick(ctx, state.Info, state.Status, captures) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() require.Equal(t, state.Status.CheckpointTs, changefeedInfo.StartTs) } @@ -323,12 +323,12 @@ func TestChangefeedHandleError(t *testing.T) { tester.MustApplyPatches() // initialize - cf.Tick(ctx, state.Info, state.Status, captures) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() cf.errCh <- errors.New("fake error") // handle error - cf.Tick(ctx, state.Info, state.Status, captures) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() require.Equal(t, state.Status.CheckpointTs, changefeedInfo.StartTs) require.Equal(t, state.Info.Error.Message, "fake error") @@ -353,11 +353,11 @@ func TestTrySendBootstrapMeetError(t *testing.T) { // initialize state.Info.Config.Sink.Protocol = util.AddressOf("simple") state.Info.Config.Sink.SendAllBootstrapAtStart = util.AddressOf(true) - cf.Tick(ctx, state.Info, state.Status, captures) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() require.Eventually(t, func() bool { - cf.Tick(ctx, state.Info, state.Status, captures) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() if state.Info.Error != nil { return state.Info.State == model.StatePending @@ -382,11 +382,9 @@ func TestExecDDL(t *testing.T) { cf.upstream.KVStorage = helper.Storage() defer cf.Close(ctx) tickTwoTime := func() { - checkpointTs, minTableBarrierTs := cf.Tick(ctx, state.Info, state.Status, captures) - updateStatus(state, checkpointTs, minTableBarrierTs) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() - checkpointTs, minTableBarrierTs = cf.Tick(ctx, state.Info, state.Status, captures) - updateStatus(state, checkpointTs, minTableBarrierTs) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() } // pre check and initialize @@ -468,14 +466,11 @@ func TestEmitCheckpointTs(t *testing.T) { defer cf.Close(ctx) tickThreeTime := func() { - checkpointTs, minTableBarrierTs := cf.Tick(ctx, state.Info, state.Status, captures) - updateStatus(state, checkpointTs, minTableBarrierTs) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() - checkpointTs, minTableBarrierTs = cf.Tick(ctx, state.Info, state.Status, captures) - updateStatus(state, checkpointTs, minTableBarrierTs) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() - checkpointTs, minTableBarrierTs = cf.Tick(ctx, state.Info, state.Status, captures) - updateStatus(state, checkpointTs, minTableBarrierTs) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() } // pre check and initialize @@ -539,7 +534,7 @@ func TestSyncPoint(t *testing.T) { tester.MustApplyPatches() // initialize - cf.Tick(ctx, state.Info, state.Status, captures) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() mockDDLPuller := cf.ddlManager.ddlPuller.(*mockDDLPuller) @@ -548,8 +543,7 @@ func TestSyncPoint(t *testing.T) { mockDDLPuller.resolvedTs = oracle.GoTimeToTS(oracle.GetTimeFromTS(mockDDLPuller.resolvedTs).Add(5 * time.Second)) // tick 20 times for i := 0; i <= 20; i++ { - checkpointTs, minTableBarrierTs := cf.Tick(ctx, state.Info, state.Status, captures) - updateStatus(state, checkpointTs, minTableBarrierTs) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() } for i := 1; i < len(mockDDLSink.syncPointHis); i++ { @@ -572,15 +566,14 @@ func TestFinished(t *testing.T) { tester.MustApplyPatches() // initialize - cf.Tick(ctx, state.Info, state.Status, captures) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() mockDDLPuller := cf.ddlManager.ddlPuller.(*mockDDLPuller) mockDDLPuller.resolvedTs += 2000 // tick many times to make sure the change feed is stopped for i := 0; i <= 10; i++ { - checkpointTs, minTableBarrierTs := cf.Tick(ctx, state.Info, state.Status, captures) - updateStatus(state, checkpointTs, minTableBarrierTs) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() } fmt.Println("checkpoint ts", state.Status.CheckpointTs) @@ -643,7 +636,7 @@ func testChangefeedReleaseResource( tester.MustApplyPatches() // initialize - cf.Tick(ctx, state.Info, state.Status, captures) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() require.Equal(t, cf.initialized.Load(), expectedInitialized) @@ -661,8 +654,8 @@ func testChangefeedReleaseResource( }) cf.isReleased = false // changefeed tick will release resources - cf.Tick(ctx, state.Info, state.Status, captures) - require.Nil(t, err) + cf.Tick(ctx, state, captures) + require.NoError(t, err) cancel() if state.Info.Config.Consistent.UseFileBackend { @@ -695,7 +688,7 @@ func TestBarrierAdvance(t *testing.T) { MinTableBarrierTs: state.Info.StartTs + 5, } // Do the preflightCheck and initialize the changefeed. - cf.Tick(ctx, state.Info, state.Status, captures) + cf.Tick(ctx, state, captures) tester.MustApplyPatches() if i == 1 { cf.ddlManager.ddlResolvedTs += 10 diff --git a/cdc/owner/ddl_sink.go b/cdc/owner/ddl_sink.go index ecbe8c3444a..538bec8df72 100644 --- a/cdc/owner/ddl_sink.go +++ b/cdc/owner/ddl_sink.go @@ -58,7 +58,7 @@ type DDLSink interface { // It will return after the bootstrap event is sent. emitBootstrap(ctx context.Context, bootstrap *model.DDLEvent) error // close the ddlsink, cancel running goroutine. - close(ctx context.Context) error + close() error } type ddlSinkImpl struct { @@ -406,7 +406,7 @@ func (s *ddlSinkImpl) emitSyncPoint(ctx context.Context, checkpointTs uint64) (e } } -func (s *ddlSinkImpl) close(ctx context.Context) (err error) { +func (s *ddlSinkImpl) close() (err error) { s.cancel() s.wg.Wait() diff --git a/cdc/owner/ddl_sink_test.go b/cdc/owner/ddl_sink_test.go index 4cf6780204a..52ef2abe092 100644 --- a/cdc/owner/ddl_sink_test.go +++ b/cdc/owner/ddl_sink_test.go @@ -82,7 +82,7 @@ func TestCheckpoint(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer func() { cancel() - ddlSink.close(ctx) + ddlSink.close() }() ddlSink.run(ctx) @@ -106,7 +106,7 @@ func TestExecDDLEvents(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer func() { cancel() - ddlSink.close(ctx) + ddlSink.close() }() ddlSink.run(ctx) @@ -150,7 +150,7 @@ func TestExecDDLError(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer func() { cancel() - ddlSink.close(ctx) + ddlSink.close() }() ddlSink.run(ctx) diff --git a/cdc/owner/feed_state_manager.go b/cdc/owner/feed_state_manager.go index 1c2c87cd85c..f7a89380099 100644 --- a/cdc/owner/feed_state_manager.go +++ b/cdc/owner/feed_state_manager.go @@ -22,6 +22,7 @@ import ( "github.com/pingcap/log" "github.com/pingcap/tiflow/cdc/model" cerrors "github.com/pingcap/tiflow/pkg/errors" + "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/upstream" "github.com/pingcap/tiflow/pkg/util" "github.com/tikv/client-go/v2/oracle" @@ -51,7 +52,7 @@ type FeedStateManager interface { // Tick is the main logic of the FeedStateManager, it will be called periodically // resolvedTs is the resolvedTs of the changefeed // returns true if there is a pending admin job, if so changefeed should not run the tick logic - Tick(resolvedTs model.Ts, status *model.ChangeFeedStatus, info *model.ChangeFeedInfo) (adminJobPending bool) + Tick(resolvedTs model.Ts, state *orchestrator.ChangefeedReactorState) (adminJobPending bool) // HandleError is called an error occurs in Changefeed.Tick HandleError(errs ...*model.RunningError) // HandleWarning is called a warning occurs in Changefeed.Tick @@ -141,8 +142,9 @@ func (m *feedStateManager) resetErrRetry() { } func (m *feedStateManager) Tick(resolvedTs model.Ts, - status *model.ChangeFeedStatus, info *model.ChangeFeedInfo, + state *orchestrator.ChangefeedReactorState, ) (adminJobPending bool) { + status := state.Status m.checkAndInitLastRetryCheckpointTs(status) if status != nil { @@ -181,7 +183,7 @@ func (m *feedStateManager) Tick(resolvedTs model.Ts, return } - switch info.State { + switch state.Info.State { case model.StateUnInitialized: m.patchState(model.StateNormal) return diff --git a/cdc/owner/feed_state_manager_test.go b/cdc/owner/feed_state_manager_test.go index 72250a499a6..02c462dff96 100644 --- a/cdc/owner/feed_state_manager_test.go +++ b/cdc/owner/feed_state_manager_test.go @@ -86,7 +86,7 @@ func TestHandleJob(t *testing.T) { return &model.ChangeFeedStatus{}, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) @@ -95,7 +95,7 @@ func TestHandleJob(t *testing.T) { CfID: model.DefaultChangeFeedID("fake-changefeed-id"), Type: model.AdminStop, }) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) @@ -104,7 +104,7 @@ func TestHandleJob(t *testing.T) { CfID: model.DefaultChangeFeedID(changefeedInfo.ID), Type: model.AdminResume, }) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) @@ -113,7 +113,7 @@ func TestHandleJob(t *testing.T) { CfID: model.DefaultChangeFeedID(changefeedInfo.ID), Type: model.AdminStop, }) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) @@ -127,7 +127,7 @@ func TestHandleJob(t *testing.T) { CfID: model.DefaultChangeFeedID(changefeedInfo.ID), Type: model.AdminResume, }) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) require.False(t, manager.ShouldRemoved()) @@ -140,7 +140,7 @@ func TestHandleJob(t *testing.T) { CfID: model.DefaultChangeFeedID(changefeedInfo.ID), Type: model.AdminRemove, }) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) @@ -164,7 +164,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { }) tester.MustApplyPatches() manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) @@ -173,7 +173,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { CfID: model.DefaultChangeFeedID(changefeedInfo.ID), Type: model.AdminStop, }) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) @@ -188,7 +188,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { Type: model.AdminResume, OverwriteCheckpointTs: 100, }) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) require.False(t, manager.ShouldRemoved()) @@ -206,7 +206,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.Equal(t, state.Info.State, model.StateFailed) require.Equal(t, state.Info.AdminJobType, model.AdminStop) @@ -219,7 +219,7 @@ func TestResumeChangefeedWithCheckpointTs(t *testing.T) { Type: model.AdminResume, OverwriteCheckpointTs: 200, }) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) require.False(t, manager.ShouldRemoved()) @@ -245,12 +245,12 @@ func TestMarkFinished(t *testing.T) { }) tester.MustApplyPatches() manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) manager.MarkFinished() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) @@ -280,12 +280,12 @@ func TestCleanUpInfos(t *testing.T) { tester.MustApplyPatches() require.Contains(t, state.TaskPositions, globalVars.CaptureInfo.ID) manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) manager.MarkFinished() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) require.Equal(t, state.Info.State, model.StateFinished) @@ -313,7 +313,7 @@ func TestHandleError(t *testing.T) { tester.MustApplyPatches() manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() intervals := []time.Duration{200, 400, 800, 1600, 1600} @@ -332,19 +332,19 @@ func TestHandleError(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) require.Equal(t, state.Info.State, model.StatePending) require.Equal(t, state.Info.AdminJobType, model.AdminStop) require.Equal(t, state.Status.AdminJobType, model.AdminStop) time.Sleep(d) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() } // no error tick, state should be transferred from pending to warning - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) require.True(t, manager.ShouldRunning()) require.Equal(t, model.StateWarning, state.Info.State) require.Equal(t, model.AdminNone, state.Info.AdminJobType) @@ -358,7 +358,7 @@ func TestHandleError(t *testing.T) { return status, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) state.PatchStatus( @@ -366,7 +366,7 @@ func TestHandleError(t *testing.T) { status.CheckpointTs += 1 return status, true, nil }) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.Equal(t, model.StateNormal, state.Info.State) require.Equal(t, model.AdminNone, state.Info.AdminJobType) @@ -393,7 +393,7 @@ func TestHandleFastFailError(t *testing.T) { }) tester.MustApplyPatches() manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) // test handling fast failed error with non-nil ChangeFeedInfo tester.MustApplyPatches() // test handling fast failed error with nil ChangeFeedInfo @@ -401,7 +401,7 @@ func TestHandleFastFailError(t *testing.T) { state.PatchInfo(func(info *model.ChangeFeedInfo) (*model.ChangeFeedInfo, bool, error) { return nil, true, nil }) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) // When the patches are applied, the callback function of PatchInfo in feedStateManager.HandleError will be called. // At that time, the nil pointer will be checked instead of throwing a panic. See issue #3128 for more detail. tester.MustApplyPatches() @@ -482,7 +482,7 @@ func TestChangefeedStatusNotExist(t *testing.T) { ): "d563bfc0-f406-4f34-bc7d-6dc2e35a44e5", }) manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) require.False(t, manager.ShouldRunning()) require.False(t, manager.ShouldRemoved()) tester.MustApplyPatches() @@ -491,7 +491,7 @@ func TestChangefeedStatusNotExist(t *testing.T) { CfID: model.DefaultChangeFeedID(changefeedConfig.ID), Type: model.AdminRemove, }) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) require.False(t, manager.ShouldRunning()) require.True(t, manager.ShouldRemoved()) tester.MustApplyPatches() @@ -513,7 +513,7 @@ func TestChangefeedNotRetry(t *testing.T) { }) tester.MustApplyPatches() manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) require.True(t, manager.ShouldRunning()) // changefeed in error state but error can be retried @@ -531,7 +531,7 @@ func TestChangefeedNotRetry(t *testing.T) { }, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) require.True(t, manager.ShouldRunning()) state.PatchTaskPosition("test", @@ -548,7 +548,7 @@ func TestChangefeedNotRetry(t *testing.T) { return position, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) require.False(t, manager.ShouldRunning()) state.PatchTaskPosition("test", @@ -564,7 +564,7 @@ func TestChangefeedNotRetry(t *testing.T) { return position, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) // should be false require.False(t, manager.ShouldRunning()) @@ -581,7 +581,7 @@ func TestChangefeedNotRetry(t *testing.T) { return position, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) // should be false require.False(t, manager.ShouldRunning()) } @@ -604,7 +604,7 @@ func TestBackoffStopsUnexpectedly(t *testing.T) { tester.MustApplyPatches() manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() for i := 1; i <= 10; i++ { @@ -632,7 +632,7 @@ func TestBackoffStopsUnexpectedly(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() // If an error occurs, backing off from running the task. require.False(t, manager.ShouldRunning()) @@ -644,7 +644,7 @@ func TestBackoffStopsUnexpectedly(t *testing.T) { // 500ms is the backoff interval, so sleep 500ms and after a manager // tick, the changefeed will turn into normal state time.Sleep(500 * time.Millisecond) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() } } @@ -667,7 +667,7 @@ func TestBackoffNeverStops(t *testing.T) { tester.MustApplyPatches() manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() for i := 1; i <= 30; i++ { @@ -686,7 +686,7 @@ func TestBackoffNeverStops(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) require.Equal(t, model.StatePending, state.Info.State) @@ -695,7 +695,7 @@ func TestBackoffNeverStops(t *testing.T) { // 100ms is the backoff interval, so sleep 100ms and after a manager tick, // the changefeed will turn into normal state time.Sleep(100 * time.Millisecond) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() } } @@ -718,7 +718,7 @@ func TestUpdateChangefeedEpoch(t *testing.T) { tester.MustApplyPatches() manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.Equal(t, state.Info.State, model.StateNormal) require.True(t, manager.ShouldRunning()) @@ -738,7 +738,7 @@ func TestUpdateChangefeedEpoch(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) require.Equal(t, model.StatePending, state.Info.State, i) @@ -775,7 +775,7 @@ func TestHandleWarning(t *testing.T) { tester.MustApplyPatches() manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.Equal(t, model.StateNormal, state.Info.State) require.True(t, manager.ShouldRunning()) @@ -791,7 +791,7 @@ func TestHandleWarning(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) // some patches will be generated when the manager.Tick is called // so we need to apply the patches before we check the state tester.MustApplyPatches() @@ -807,7 +807,7 @@ func TestHandleWarning(t *testing.T) { }, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.Equal(t, model.StateWarning, state.Info.State) require.True(t, manager.ShouldRunning()) @@ -821,7 +821,7 @@ func TestHandleWarning(t *testing.T) { }, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.Equal(t, model.StateNormal, state.Info.State) require.True(t, manager.ShouldRunning()) @@ -838,7 +838,7 @@ func TestHandleWarning(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) // some patches will be generated when the manager.Tick is called // so we need to apply the patches before we check the state tester.MustApplyPatches() @@ -858,7 +858,7 @@ func TestHandleWarning(t *testing.T) { manager.checkpointTsAdvanced = manager. checkpointTsAdvanced.Add(-(manager.changefeedErrorStuckDuration + 1)) // resolveTs = 202 > checkpointTs = 201 - manager.Tick(202, state.Status, state.Info) + manager.Tick(202, state) // some patches will be generated when the manager.Tick is called // so we need to apply the patches before we check the state tester.MustApplyPatches() @@ -888,7 +888,7 @@ func TestErrorAfterWarning(t *testing.T) { tester.MustApplyPatches() manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.Equal(t, model.StateNormal, state.Info.State) require.True(t, manager.ShouldRunning()) @@ -904,7 +904,7 @@ func TestErrorAfterWarning(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) // some patches will be generated when the manager.Tick is called // so we need to apply the patches before we check the state tester.MustApplyPatches() @@ -920,7 +920,7 @@ func TestErrorAfterWarning(t *testing.T) { }, true, nil }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.Equal(t, model.StateWarning, state.Info.State) require.True(t, manager.ShouldRunning()) @@ -939,13 +939,13 @@ func TestErrorAfterWarning(t *testing.T) { }) tester.MustApplyPatches() - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) // some patches will be generated when the manager.Tick is called // so we need to apply the patches before we check the state tester.MustApplyPatches() require.Equal(t, model.StatePending, state.Info.State) require.False(t, manager.ShouldRunning()) - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) // some patches will be generated when the manager.Tick is called // so we need to apply the patches before we check the state @@ -976,7 +976,7 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) { }) tester.MustApplyPatches() - manager.Tick(200, state.Status, state.Info) + manager.Tick(200, state) tester.MustApplyPatches() require.Equal(t, model.StateNormal, state.Info.State) require.True(t, manager.ShouldRunning()) @@ -992,7 +992,7 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(200, state.Status, state.Info) + manager.Tick(200, state) // some patches will be generated when the manager.Tick is called // so we need to apply the patches before we check the state tester.MustApplyPatches() @@ -1017,7 +1017,7 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(200, state.Status, state.Info) + manager.Tick(200, state) tester.MustApplyPatches() require.Equal(t, model.StateWarning, state.Info.State) require.True(t, manager.ShouldRunning()) @@ -1038,7 +1038,7 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(400, state.Status, state.Info) + manager.Tick(400, state) tester.MustApplyPatches() require.Equal(t, model.StateWarning, state.Info.State) require.True(t, manager.ShouldRunning()) @@ -1060,7 +1060,7 @@ func TestHandleWarningWhileAdvanceResolvedTs(t *testing.T) { }}, true, nil }) tester.MustApplyPatches() - manager.Tick(400, state.Status, state.Info) + manager.Tick(400, state) tester.MustApplyPatches() require.Equal(t, model.StateFailed, state.Info.State) require.False(t, manager.ShouldRunning()) @@ -1082,7 +1082,7 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { }) tester.MustApplyPatches() manager.state = state - manager.Tick(0, state.Status, state.Info) + manager.Tick(0, state) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) @@ -1097,7 +1097,7 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { }) tester.MustApplyPatches() time.Sleep(stuckDuration - time.Second) - manager.Tick(100, state.Status, state.Info) + manager.Tick(100, state) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) require.Less(t, manager.changefeedErrorStuckDuration, stuckDuration) @@ -1125,7 +1125,7 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { OverwriteCheckpointTs: 100, }) - manager.Tick(101, state.Status, state.Info) + manager.Tick(101, state) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) require.False(t, manager.ShouldRemoved()) @@ -1145,7 +1145,7 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { tester.MustApplyPatches() time.Sleep(stuckDuration - time.Second) - manager.Tick(200, state.Status, state.Info) + manager.Tick(200, state) tester.MustApplyPatches() require.True(t, manager.ShouldRunning()) require.Equal(t, state.Info.State, model.StateWarning) @@ -1161,7 +1161,7 @@ func TestUpdateChangefeedWithChangefeedErrorStuckDuration(t *testing.T) { tester.MustApplyPatches() time.Sleep(time.Second) - manager.Tick(201, state.Status, state.Info) + manager.Tick(201, state) tester.MustApplyPatches() require.False(t, manager.ShouldRunning()) require.Equal(t, state.Info.State, model.StateFailed) diff --git a/cdc/owner/owner.go b/cdc/owner/owner.go index 51c56142627..e90db5b5064 100644 --- a/cdc/owner/owner.go +++ b/cdc/owner/owner.go @@ -106,7 +106,6 @@ type Owner interface { type ownerImpl struct { changefeeds map[model.ChangeFeedID]*changefeed - captures map[model.CaptureID]*model.CaptureInfo upstreamManager *upstream.Manager ownerJobQueue struct { sync.Mutex @@ -167,7 +166,7 @@ func NewOwner( } // Tick implements the Reactor interface -func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) { +func (o *ownerImpl) Tick(ctx context.Context, rawState orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) { failpoint.Inject("owner-run-with-error", func() { failpoint.Return(nil, errors.New("owner run with injected error")) }) @@ -181,16 +180,16 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt return state, nil } - o.captures = state.Captures o.updateMetrics() + captures := state.Captures // handleJobs() should be called before clusterVersionConsistent(), because // when there are different versions of cdc nodes in the cluster, // the admin job may not be processed all the time. And http api relies on // admin job, which will cause all http api unavailable. - o.handleJobs(stdCtx) + o.handleJobs(ctx, captures) - if !o.clusterVersionConsistent(o.captures) { + if !o.clusterVersionConsistent(captures) { return state, nil } @@ -199,16 +198,12 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt // initializing. // // See more gc doc. - if err = o.updateGCSafepoint(stdCtx, state); err != nil { + if err = o.updateGCSafepoint(ctx, state); err != nil { return nil, errors.Trace(err) } // Tick all changefeeds. for changefeedID, changefeedState := range state.Changefeeds { - // check if we are the changefeed owner to handle this changefeed - if !o.shouldHandleChangefeed(changefeedState) { - continue - } if changefeedState.Info == nil { o.cleanUpChangefeed(changefeedState) if cfReactor, ok := o.changefeeds[changefeedID]; ok { @@ -229,12 +224,10 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt o.changefeeds[changefeedID] = cfReactor } changefeedState.CheckCaptureAlive(o.globalVars.CaptureInfo.ID) - captures := o.getChangefeedCaptures(changefeedState, state) if !preflightCheck(changefeedState, captures) { continue } - checkpointTs, minTableBarrierTs := cfReactor.Tick(stdCtx, changefeedState.Info, changefeedState.Status, captures) - updateStatus(changefeedState, checkpointTs, minTableBarrierTs) + cfReactor.Tick(ctx, changefeedState, captures) } o.changefeedTicked = true @@ -244,7 +237,7 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt if _, exist := state.Changefeeds[changefeedID]; exist { continue } - reactor.Close(stdCtx) + reactor.Close(ctx) delete(o.changefeeds, changefeedID) } } @@ -252,12 +245,12 @@ func (o *ownerImpl) Tick(stdCtx context.Context, rawState orchestrator.ReactorSt // Close and cleanup all changefeeds. if atomic.LoadInt32(&o.closed) != 0 { for _, reactor := range o.changefeeds { - reactor.Close(stdCtx) + reactor.Close(ctx) } return state, cerror.ErrReactorFinished.GenWithStackByArgs() } - if err := o.upstreamManager.Tick(stdCtx, state); err != nil { + if err := o.upstreamManager.Tick(ctx, state); err != nil { return state, errors.Trace(err) } return state, nil @@ -346,18 +339,6 @@ func updateStatus(changefeed *orchestrator.ChangefeedReactorState, }) } -// shouldHandleChangefeed returns whether the owner should handle the changefeed. -func (o *ownerImpl) shouldHandleChangefeed(_ *orchestrator.ChangefeedReactorState) bool { - return true -} - -// getChangefeedCaptures returns the captures to run the changefeed. -func (o *ownerImpl) getChangefeedCaptures(_ *orchestrator.ChangefeedReactorState, - globalStates *orchestrator.GlobalReactorState, -) map[model.CaptureID]*model.CaptureInfo { - return globalStates.Captures -} - // EnqueueJob enqueues an admin job into an internal queue, // and the Owner will handle the job in the next tick // `done` must be buffered to prevent blocking owner. @@ -501,6 +482,7 @@ func (o *ownerImpl) updateMetrics() { ownershipCounter.Add(float64(now.Sub(o.lastTickTime)) / float64(time.Second)) o.lastTickTime = now + // todo: move this to each changefeed individually ? for cfID, cf := range o.changefeeds { if cf.latestInfo != nil { changefeedStatusGauge.WithLabelValues(cfID.Namespace, cfID.ID). @@ -599,7 +581,7 @@ func (o *ownerImpl) handleDrainCaptures(ctx context.Context, query *scheduler.Qu close(done) } -func (o *ownerImpl) handleJobs(ctx context.Context) { +func (o *ownerImpl) handleJobs(ctx context.Context, captures map[model.CaptureID]*model.CaptureInfo) { jobs := o.takeOwnerJobs() for _, job := range jobs { changefeedID := job.ChangefeedID @@ -627,7 +609,7 @@ func (o *ownerImpl) handleJobs(ctx context.Context) { cfReactor.scheduler.Rebalance() } case ownerJobTypeQuery: - job.done <- o.handleQueries(job.query) + job.done <- o.handleQueries(job.query, captures) case ownerJobTypeDebugInfo: // TODO: implement this function } @@ -635,7 +617,7 @@ func (o *ownerImpl) handleJobs(ctx context.Context) { } } -func (o *ownerImpl) handleQueries(query *Query) error { +func (o *ownerImpl) handleQueries(query *Query, captures map[model.CaptureID]*model.CaptureInfo) error { switch query.Tp { case QueryAllChangeFeedSCheckpointTs: ret := make(map[model.ChangeFeedID]uint64) @@ -750,7 +732,7 @@ func (o *ownerImpl) handleQueries(query *Query) error { query.Data = ret case QueryCaptures: var ret []*model.CaptureInfo - for _, captureInfo := range o.captures { + for _, captureInfo := range captures { ret = append(ret, &model.CaptureInfo{ ID: captureInfo.ID, AdvertiseAddr: captureInfo.AdvertiseAddr, @@ -759,7 +741,7 @@ func (o *ownerImpl) handleQueries(query *Query) error { } query.Data = ret case QueryHealth: - query.Data = o.isHealthy() + query.Data = o.isHealthy(captures) case QueryOwner: _, exist := o.changefeeds[query.ChangeFeedID] query.Data = exist @@ -770,14 +752,14 @@ func (o *ownerImpl) handleQueries(query *Query) error { return nil } -func (o *ownerImpl) isHealthy() bool { +func (o *ownerImpl) isHealthy(captures map[model.CaptureID]*model.CaptureInfo) bool { if !o.changefeedTicked { // Owner has not yet tick changefeeds, some changefeeds may be not // initialized. log.Warn("owner is not healthy since changefeeds are not ticked") return false } - if !o.clusterVersionConsistent(o.captures) { + if !o.clusterVersionConsistent(captures) { return false } for _, changefeed := range o.changefeeds { diff --git a/cdc/owner/owner_test.go b/cdc/owner/owner_test.go index 2858ca7408a..42b13d8b891 100644 --- a/cdc/owner/owner_test.go +++ b/cdc/owner/owner_test.go @@ -574,14 +574,14 @@ func TestIsHealthyWithAbnormalChangefeeds(t *testing.T) { query := &Query{Tp: QueryHealth} // no changefeed at the first, should be healthy - err := o.handleQueries(query) + err := o.handleQueries(query, nil) require.NoError(t, err) require.True(t, query.Data.(bool)) // 1 changefeed, state is nil cf := &changefeed{} o.changefeeds[model.ChangeFeedID{ID: "1"}] = cf - err = o.handleQueries(query) + err = o.handleQueries(query, nil) require.NoError(t, err) require.True(t, query.Data.(bool)) @@ -591,7 +591,7 @@ func TestIsHealthyWithAbnormalChangefeeds(t *testing.T) { } cf.latestInfo = state.Info cf.latestStatus = state.Status - err = o.handleQueries(query) + err = o.handleQueries(query, nil) require.NoError(t, err) require.True(t, query.Data.(bool)) @@ -600,7 +600,7 @@ func TestIsHealthyWithAbnormalChangefeeds(t *testing.T) { latestInfo: &model.ChangeFeedInfo{State: model.StateNormal}, scheduler: &healthScheduler{init: true}, } - err = o.handleQueries(query) + err = o.handleQueries(query, nil) require.NoError(t, err) require.True(t, query.Data.(bool)) } @@ -616,13 +616,13 @@ func TestIsHealthy(t *testing.T) { // Unhealthy, changefeeds are not ticked. o.changefeedTicked = false - err := o.handleQueries(query) + err := o.handleQueries(query, nil) require.NoError(t, err) require.False(t, query.Data.(bool)) o.changefeedTicked = true // Unhealthy, cdc cluster version is inconsistent - o.captures = map[model.CaptureID]*model.CaptureInfo{ + captures := map[model.CaptureID]*model.CaptureInfo{ "1": { Version: version.MinTiCDCVersion.String(), }, @@ -630,14 +630,14 @@ func TestIsHealthy(t *testing.T) { Version: version.MaxTiCDCVersion.String(), }, } - err = o.handleQueries(query) + err = o.handleQueries(query, captures) require.NoError(t, err) require.False(t, query.Data.(bool)) // make all captures version consistent. - o.captures["2"].Version = version.MinTiCDCVersion.String() + captures["2"].Version = version.MinTiCDCVersion.String() // Healthy, no changefeed. - err = o.handleQueries(query) + err = o.handleQueries(query, captures) require.NoError(t, err) require.True(t, query.Data.(bool)) @@ -648,20 +648,20 @@ func TestIsHealthy(t *testing.T) { } o.changefeeds[model.ChangeFeedID{ID: "1"}] = cf o.changefeedTicked = true - err = o.handleQueries(query) + err = o.handleQueries(query, captures) require.NoError(t, err) require.False(t, query.Data.(bool)) // Healthy, scheduler is set and return true. cf.scheduler = &healthScheduler{init: true} o.changefeedTicked = true - err = o.handleQueries(query) + err = o.handleQueries(query, captures) require.NoError(t, err) require.True(t, query.Data.(bool)) // Unhealthy, changefeeds are not ticked. o.changefeedTicked = false - err = o.handleQueries(query) + err = o.handleQueries(query, captures) require.NoError(t, err) require.False(t, query.Data.(bool)) @@ -671,7 +671,7 @@ func TestIsHealthy(t *testing.T) { scheduler: &healthScheduler{init: false}, } o.changefeedTicked = true - err = o.handleQueries(query) + err = o.handleQueries(query, captures) require.NoError(t, err) require.False(t, query.Data.(bool)) } diff --git a/cdc/processor/manager.go b/cdc/processor/manager.go index b9f76bd9638..b0315d28fa3 100644 --- a/cdc/processor/manager.go +++ b/cdc/processor/manager.go @@ -38,6 +38,9 @@ type commandTp int const ( commandTpUnknown commandTp = iota commandTpWriteDebugInfo +) + +const ( processorLogsWarnDuration = 1 * time.Second ) @@ -108,13 +111,13 @@ func NewManager( // Tick implements the `orchestrator.State` interface // the `state` parameter is sent by the etcd worker, the `state` must be a snapshot of KVs in etcd // the Tick function of Manager create or remove processor instances according to the specified `state`, or pass the `state` to processor instances -func (m *managerImpl) Tick(stdCtx context.Context, state orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) { +func (m *managerImpl) Tick(ctx context.Context, state orchestrator.ReactorState) (nextState orchestrator.ReactorState, err error) { globalState := state.(*orchestrator.GlobalReactorState) m.handleCommand() var inactiveChangefeedCount int for changefeedID, changefeedState := range globalState.Changefeeds { - if !changefeedState.Active(m.captureInfo.ID) { + if !changefeedState.Active() { inactiveChangefeedCount++ m.closeProcessor(changefeedID) continue @@ -156,7 +159,7 @@ func (m *managerImpl) Tick(stdCtx context.Context, state orchestrator.ReactorSta if createTaskPosition(changefeedState, p.captureInfo) { continue } - err, warning := p.Tick(stdCtx, changefeedState.Info, changefeedState.Status) + err, warning := p.Tick(ctx, changefeedState) if warning != nil { patchProcessorWarning(p.captureInfo, changefeedState, warning) } @@ -176,7 +179,7 @@ func (m *managerImpl) Tick(stdCtx context.Context, state orchestrator.ReactorSta } } - if err := m.upstreamManager.Tick(stdCtx, globalState); err != nil { + if err = m.upstreamManager.Tick(ctx, globalState); err != nil { return state, errors.Trace(err) } return state, nil @@ -279,26 +282,27 @@ func patchProcessorWarning(captureInfo *model.CaptureInfo, func (m *managerImpl) closeProcessor(changefeedID model.ChangeFeedID) { processor, exist := m.processors[changefeedID] - if exist { - startTime := time.Now() - err := processor.Close() - costTime := time.Since(startTime) - if costTime > processorLogsWarnDuration { - log.Warn("processor close took too long", - zap.String("namespace", changefeedID.Namespace), - zap.String("changefeed", changefeedID.ID), - zap.String("capture", m.captureInfo.ID), - zap.Duration("duration", costTime)) - } - m.metricProcessorCloseDuration.Observe(costTime.Seconds()) - if err != nil { - log.Warn("failed to close processor", - zap.String("namespace", changefeedID.Namespace), - zap.String("changefeed", changefeedID.ID), - zap.Error(err)) - } - delete(m.processors, changefeedID) + if !exist { + return + } + startTime := time.Now() + err := processor.Close() + costTime := time.Since(startTime) + if costTime > processorLogsWarnDuration { + log.Warn("processor close took too long", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID), + zap.String("capture", m.captureInfo.ID), + zap.Duration("duration", costTime)) + } + m.metricProcessorCloseDuration.Observe(costTime.Seconds()) + if err != nil { + log.Warn("failed to close processor", + zap.String("namespace", changefeedID.Namespace), + zap.String("changefeed", changefeedID.ID), + zap.Error(err)) } + delete(m.processors, changefeedID) } // Close the manager itself and all processors. diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go index 76bdbfb502f..62f5a92da9a 100644 --- a/cdc/processor/processor.go +++ b/cdc/processor/processor.go @@ -40,6 +40,7 @@ import ( cerror "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/etcd" "github.com/pingcap/tiflow/pkg/filter" + "github.com/pingcap/tiflow/pkg/orchestrator" "github.com/pingcap/tiflow/pkg/pdutil" "github.com/pingcap/tiflow/pkg/retry" "github.com/pingcap/tiflow/pkg/sink" @@ -65,7 +66,7 @@ type Processor interface { // // It can be called in etcd ticks, so it should never be blocked. // Tick Returns: error and warnings. error will be propagated to the owner, and warnings will be record. - Tick(context.Context, *model.ChangeFeedInfo, *model.ChangeFeedStatus) (error, error) + Tick(context.Context, *orchestrator.ChangefeedReactorState) (error, error) // Close closes the processor. Close() error @@ -497,7 +498,7 @@ func isProcessorIgnorableError(err error) bool { // It can be called in etcd ticks, so it should never be blocked. func (p *processor) Tick( ctx context.Context, - info *model.ChangeFeedInfo, status *model.ChangeFeedStatus, + state *orchestrator.ChangefeedReactorState, ) (error, error) { if !p.initialized.Load() { initialized, err := p.initializer.TryInitialize(ctx, p.lazyInit, p.globalVars.ChangefeedThreadPool) @@ -509,8 +510,8 @@ func (p *processor) Tick( } } - p.latestInfo = info - p.latestStatus = status + p.latestInfo = state.Info + p.latestStatus = state.Status // check upstream error first if err := p.upstream.Error(); err != nil { diff --git a/cdc/processor/processor_test.go b/cdc/processor/processor_test.go index b610be1fdf1..0da49795968 100644 --- a/cdc/processor/processor_test.go +++ b/cdc/processor/processor_test.go @@ -237,8 +237,8 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { tester.MustApplyPatches() // no operation - err, _ := p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ := p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() // table-1: `preparing` -> `prepared` -> `replicating` @@ -272,8 +272,8 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { }}..., ) - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() done = p.IsAddTableSpanFinished(span, true) @@ -295,8 +295,8 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.NoError(t, err) require.True(t, ok) - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() // table-1: `prepared` -> `replicating` @@ -305,7 +305,7 @@ func TestTableExecutorAddingTableIndirectly(t *testing.T) { require.Equal(t, tablepb.TableStateReplicating, state) err = p.Close() - require.Nil(t, err) + require.NoError(t, err) require.Nil(t, p.agent) } @@ -327,8 +327,8 @@ func TestTableExecutorAddingTableIndirectlyWithRedoEnabled(t *testing.T) { tester.MustApplyPatches() // no operation - err, _ := p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ := p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() // table-1: `preparing` -> `prepared` -> `replicating` @@ -362,8 +362,8 @@ func TestTableExecutorAddingTableIndirectlyWithRedoEnabled(t *testing.T) { }}..., ) - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() done = p.IsAddTableSpanFinished(span, true) @@ -395,8 +395,8 @@ func TestTableExecutorAddingTableIndirectlyWithRedoEnabled(t *testing.T) { require.Equal(t, model.Ts(60), stats.ResolvedTs) require.Equal(t, model.Ts(50), stats.BarrierTs) - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() // table-1: `prepared` -> `replicating` @@ -405,7 +405,7 @@ func TestTableExecutorAddingTableIndirectlyWithRedoEnabled(t *testing.T) { require.Equal(t, tablepb.TableStateReplicating, state) err = p.Close() - require.Nil(t, err) + require.NoError(t, err) require.Nil(t, p.agent) } @@ -417,14 +417,14 @@ func TestProcessorError(t *testing.T) { // init tick require.Nil(t, p.lazyInit(ctx)) - err, _ := p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ := p.Tick(ctx, changefeed) + require.NoError(t, err) createTaskPosition(changefeed, p.captureInfo) tester.MustApplyPatches() // send a abnormal error p.sinkManager.errors <- cerror.ErrSinkURIInvalid - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) + err, _ = p.Tick(ctx, changefeed) require.Error(t, err) patchProcessorErr(p.captureInfo, changefeed, err) tester.MustApplyPatches() @@ -442,14 +442,14 @@ func TestProcessorError(t *testing.T) { p, tester, changefeed = initProcessor4Test(t, &liveness, false, globalVars, changefeedVars) // init tick require.Nil(t, p.lazyInit(ctx)) - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) createTaskPosition(changefeed, p.captureInfo) tester.MustApplyPatches() // send a normal error p.sinkManager.errors <- context.Canceled - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) + err, _ = p.Tick(ctx, changefeed) patchProcessorErr(p.captureInfo, changefeed, err) tester.MustApplyPatches() require.True(t, cerror.ErrReactorFinished.Equal(errors.Cause(err))) @@ -497,20 +497,20 @@ func TestProcessorClose(t *testing.T) { tester.MustApplyPatches() // Do a no operation tick to lazy init the processor. - err, _ := p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ := p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() // add tables done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 20}, false) - require.Nil(t, err) + require.NoError(t, err) require.True(t, done) done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), tablepb.Checkpoint{CheckpointTs: 30}, false) - require.Nil(t, err) + require.NoError(t, err) require.True(t, done) - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() // push the resolvedTs and checkpointTs @@ -518,8 +518,8 @@ func TestProcessorClose(t *testing.T) { return status, true, nil }) tester.MustApplyPatches() - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() require.Contains(t, changefeed.TaskPositions, p.captureInfo.ID) @@ -537,24 +537,24 @@ func TestProcessorClose(t *testing.T) { tester.MustApplyPatches() // Do a no operation tick to lazy init the processor. - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() // add tables done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 20}, false) - require.Nil(t, err) + require.NoError(t, err) require.True(t, done) done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), tablepb.Checkpoint{CheckpointTs: 30}, false) - require.Nil(t, err) + require.NoError(t, err) require.True(t, done) - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() // send error p.sinkManager.errors <- cerror.ErrSinkURIInvalid - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) + err, _ = p.Tick(ctx, changefeed) require.Error(t, err) patchProcessorErr(p.captureInfo, changefeed, err) tester.MustApplyPatches() @@ -585,16 +585,16 @@ func TestPositionDeleted(t *testing.T) { require.Contains(t, changefeed.TaskPositions, p.captureInfo.ID) // Do a no operation tick to lazy init the processor. - err, _ := p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ := p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() // add table done, err := p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(1), tablepb.Checkpoint{CheckpointTs: 30}, false) - require.Nil(t, err) + require.NoError(t, err) require.True(t, done) done, err = p.AddTableSpan(ctx, spanz.TableIDToComparableSpan(2), tablepb.Checkpoint{CheckpointTs: 40}, false) - require.Nil(t, err) + require.NoError(t, err) require.True(t, done) // some others delete the task position @@ -607,8 +607,8 @@ func TestPositionDeleted(t *testing.T) { // position created again checkChangefeedNormal(changefeed) createTaskPosition(changefeed, p.captureInfo) - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() require.Equal(t, &model.TaskPosition{}, changefeed.TaskPositions[p.captureInfo.ID]) require.Contains(t, changefeed.TaskPositions, p.captureInfo.ID) @@ -633,8 +633,8 @@ func TestSchemaGC(t *testing.T) { updateChangeFeedPosition(t, tester, model.DefaultChangeFeedID("changefeed-id-test"), 50) - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() // GC Ts should be (checkpoint - 1). @@ -658,7 +658,7 @@ func updateChangeFeedPosition(t *testing.T, tester *orchestrator.ReactorStateTes CheckpointTs: checkpointTs, } valueBytes, err := json.Marshal(cfStatus) - require.Nil(t, err) + require.NoError(t, err) tester.MustUpdate(keyStr, valueBytes) } @@ -700,24 +700,24 @@ func TestUpdateBarrierTs(t *testing.T) { require.Contains(t, changefeed.TaskPositions, p.captureInfo.ID) // Do a no operation tick to lazy init the processor. - err, _ := p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ := p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() span := spanz.TableIDToComparableSpan(1) done, err := p.AddTableSpan(ctx, span, tablepb.Checkpoint{CheckpointTs: 5}, false) require.True(t, done) - require.Nil(t, err) - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + require.NoError(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() // Global resolved ts has advanced while schema storage stalls. changefeed.PatchStatus(func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) { return status, true, nil }) - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() p.updateBarrierTs(&schedulepb.Barrier{GlobalBarrierTs: 20, TableBarriers: nil}) status := p.sinkManager.r.GetTableStats(span) @@ -725,8 +725,8 @@ func TestUpdateBarrierTs(t *testing.T) { // Schema storage has advanced too. p.ddlHandler.r.schemaStorage.(*mockSchemaStorage).resolvedTs = 15 - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() p.updateBarrierTs(&schedulepb.Barrier{GlobalBarrierTs: 20, TableBarriers: nil}) status = p.sinkManager.r.GetTableStats(span) @@ -744,13 +744,13 @@ func TestProcessorLiveness(t *testing.T) { // First tick for creating position. require.Nil(t, p.lazyInit(ctx)) - err, _ := p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ := p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() // Second tick for init. - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) // Changing p.liveness affects p.agent liveness. p.liveness.Store(model.LivenessCaptureStopping) @@ -781,17 +781,17 @@ func TestProcessorDostNotStuckInInit(t *testing.T) { require.Nil(t, p.lazyInit(ctx)) // First tick for creating position. - err, _ := p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ := p.Tick(ctx, changefeed) + require.NoError(t, err) tester.MustApplyPatches() // Second tick for init. - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) // TODO(qupeng): third tick for handle a warning. - err, _ = p.Tick(ctx, changefeed.Info, changefeed.Status) - require.Nil(t, err) + err, _ = p.Tick(ctx, changefeed) + require.NoError(t, err) require.Nil(t, p.Close()) tester.MustApplyPatches() diff --git a/cdc/processor/sinkmanager/manager.go b/cdc/processor/sinkmanager/manager.go index 171443fa1ba..13d73d8fb09 100644 --- a/cdc/processor/sinkmanager/manager.go +++ b/cdc/processor/sinkmanager/manager.go @@ -1055,7 +1055,7 @@ func (m *SinkManager) Close() { start := time.Now() m.waitSubroutines() - // NOTE: It's unnecceary to close table sinks before clear sink factory. + // NOTE: It's unnecessary to close table sinks before clear sink factory. m.clearSinkFactory() log.Info("Closed sink manager", diff --git a/cdc/processor/sourcemanager/manager.go b/cdc/processor/sourcemanager/manager.go index 3afb21f26bb..5dce0c75449 100644 --- a/cdc/processor/sourcemanager/manager.go +++ b/cdc/processor/sourcemanager/manager.go @@ -234,22 +234,17 @@ func (m *SourceManager) Close() { zap.String("changefeed", m.changefeedID.ID)) start := time.Now() - - log.Info("All pullers have been closed", - zap.String("namespace", m.changefeedID.Namespace), - zap.String("changefeed", m.changefeedID.ID), - zap.Duration("cost", time.Since(start))) - if err := m.engine.Close(); err != nil { log.Panic("Fail to close sort engine", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), + zap.Duration("duration", time.Since(start)), zap.Error(err)) } log.Info("Closed source manager", zap.String("namespace", m.changefeedID.Namespace), zap.String("changefeed", m.changefeedID.ID), - zap.Duration("cost", time.Since(start))) + zap.Duration("duration", time.Since(start))) } // Add adds events to the engine. It is used for testing. diff --git a/changefeed.toml b/changefeed.toml new file mode 100644 index 00000000000..5a16fb5b9bf --- /dev/null +++ b/changefeed.toml @@ -0,0 +1,8 @@ +[integrity] +integrity-check-level = "correctness" +corruption-handle-level = "error" + +# [sink.kafka-config.large-message-handle] +# # large-message-handle-compression = "snappy" +# large-message-handle-option = "claim-check" +# claim-check-storage-uri = "file:///tmp/kafka-simple-claim-check" diff --git a/pkg/orchestrator/reactor_state.go b/pkg/orchestrator/reactor_state.go index ccbde53e1cb..be8c5506e1e 100644 --- a/pkg/orchestrator/reactor_state.go +++ b/pkg/orchestrator/reactor_state.go @@ -205,6 +205,11 @@ func NewChangefeedReactorState(clusterID string, } } +// Checkpoint return the current checkpoint ts. +func (s *ChangefeedReactorState) Checkpoint() model.Ts { + return s.Info.GetCheckpointTs(s.Status) +} + // GetID returns the changefeed ID. func (s *ChangefeedReactorState) GetID() model.ChangeFeedID { return s.ID @@ -487,7 +492,7 @@ func (s *ChangefeedReactorState) Exist() bool { } // Active return true if the changefeed is ready to be processed -func (s *ChangefeedReactorState) Active(captureID model.CaptureID) bool { +func (s *ChangefeedReactorState) Active() bool { return s.Info != nil && s.Status != nil && s.Status.AdminJobType == model.AdminNone }