changefeed(ticdc): no need to use info and status as the changefeed tick arguments #11629

Open · wants to merge 14 commits into base: master
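What changes: Changefeed.Tick previously took the changefeed's *model.ChangeFeedInfo and *model.ChangeFeedStatus as separate arguments and returned (checkpointTs, minTableBarrierTs) for the owner to persist. It now receives the whole *orchestrator.ChangefeedReactorState and writes the watermarks back into that state itself, via the updateStatus helper this PR calls on success. A minimal before/after sketch of a call site; the owner-side lines are paraphrased for illustration and are not part of this diff:

    // Before: the caller unpacked the state and had to persist the returned watermarks.
    checkpointTs, minTableBarrierTs := cf.Tick(ctx, state.Info, state.Status, captures)
    // ... caller patches state.Status with checkpointTs / minTableBarrierTs ...

    // After: the caller hands over the reactor state and Tick updates it in place.
    cf.Tick(ctx, state, captures)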
cdc/owner/changefeed.go: 120 changes (60 additions & 60 deletions)
@@ -36,6 +36,7 @@ import (
     "github.com/pingcap/tiflow/pkg/config"
     cerror "github.com/pingcap/tiflow/pkg/errors"
     pfilter "github.com/pingcap/tiflow/pkg/filter"
+    "github.com/pingcap/tiflow/pkg/orchestrator"
     "github.com/pingcap/tiflow/pkg/pdutil"
     redoCfg "github.com/pingcap/tiflow/pkg/redo"
     "github.com/pingcap/tiflow/pkg/sink/observer"
@@ -56,9 +57,8 @@ type Changefeed interface {
     //
     // It can be called in etcd ticks, so it should never be blocked.
     // Tick Returns: checkpointTs, minTableBarrierTs
-    Tick(context.Context, *model.ChangeFeedInfo,
-        *model.ChangeFeedStatus,
-        map[model.CaptureID]*model.CaptureInfo) (model.Ts, model.Ts)
+    Tick(context.Context, *orchestrator.ChangefeedReactorState,
+        map[model.CaptureID]*model.CaptureInfo)
 
     // Close closes the changefeed.
     Close(ctx context.Context)
@@ -266,13 +266,12 @@ func newChangefeed4Test(
 }
 
 func (c *changefeed) Tick(ctx context.Context,
-    cfInfo *model.ChangeFeedInfo,
-    cfStatus *model.ChangeFeedStatus,
+    state *orchestrator.ChangefeedReactorState,
     captures map[model.CaptureID]*model.CaptureInfo,
-) (model.Ts, model.Ts) {
+) {
     startTime := time.Now()
-    c.latestInfo = cfInfo
-    c.latestStatus = cfStatus
+    c.latestInfo = state.Info
+    c.latestStatus = state.Status
     // Handle all internal warnings.
     noMoreWarnings := false
     for !noMoreWarnings {
@@ -288,10 +287,10 @@ func (c *changefeed) Tick(ctx context.Context,
         if err != nil {
             c.handleErr(ctx, err)
         }
-        return 0, 0
+        return
     }
 
-    checkpointTs, minTableBarrierTs, err := c.tick(ctx, captures, cfInfo, cfStatus)
+    checkpointTs, minTableBarrierTs, err := c.tick(ctx, captures, state)
 
     // The tick duration is recorded only if changefeed has completed initialization
     if c.initialized.Load() {
@@ -309,7 +308,7 @@ func (c *changefeed) Tick(ctx context.Context,
         log.Error("changefeed tick failed", zap.Error(err))
         c.handleErr(ctx, err)
     }
-    return checkpointTs, minTableBarrierTs
+    updateStatus(state, checkpointTs, minTableBarrierTs)
 }
 
 func (c *changefeed) Throw(ctx context.Context) func(error) {
@@ -378,14 +377,16 @@ func (c *changefeed) checkStaleCheckpointTs(
 // tick returns the checkpointTs and minTableBarrierTs.
 func (c *changefeed) tick(ctx context.Context,
     captures map[model.CaptureID]*model.CaptureInfo,
-    cfInfo *model.ChangeFeedInfo,
-    cfStatus *model.ChangeFeedStatus,
+    state *orchestrator.ChangefeedReactorState,
 ) (model.Ts, model.Ts, error) {
-    adminJobPending := c.feedStateManager.Tick(c.resolvedTs, cfStatus, cfInfo)
-    preCheckpointTs := cfInfo.GetCheckpointTs(cfStatus)
+    adminJobPending := c.feedStateManager.Tick(c.resolvedTs, state)
+    preCheckpointTs := state.Checkpoint()
 
+    info := state.Info
+    status := state.Status
+
     // checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
     // to ensure all changefeeds, no matter whether they are running or not, will be checked.
-    if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs, cfInfo); err != nil {
+    if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs, info); err != nil {
         return 0, 0, errors.Trace(err)
     }
@@ -402,7 +403,7 @@ func (c *changefeed) tick(ctx context.Context,
     if !c.initialized.Load() {
         initialized, err := c.initializer.TryInitialize(ctx,
             func(ctx context.Context) error {
-                return c.initialize(ctx, cfInfo, cfStatus)
+                return c.initialize(ctx, state)
             },
             c.globalVars.ChangefeedThreadPool)
         if err != nil {
@@ -434,7 +435,7 @@ func (c *changefeed) tick(ctx context.Context,
         return 0, 0, nil
     }
 
-    err = c.handleBarrier(ctx, cfInfo, cfStatus, barrier)
+    err = c.handleBarrier(ctx, info, status, barrier)
     if err != nil {
         return 0, 0, errors.Trace(err)
     }
@@ -482,16 +483,13 @@ func (c *changefeed) tick(ctx context.Context,
         }
     }
 
-    pdTime := c.upstream.PDClock.CurrentTime()
-    currentTs := oracle.GetPhysical(pdTime)
-
     // CheckpointCannotProceed implies that not all tables are being replicated normally,
     // so in that case there is no need to advance the global watermarks.
     if watermark.CheckpointTs == scheduler.CheckpointCannotProceed {
-        if cfStatus != nil {
+        if status != nil {
             // We should keep the metrics updated even if the scheduler cannot
             // advance the watermarks for now.
-            c.updateMetrics(currentTs, cfStatus.CheckpointTs, c.resolvedTs)
+            c.updateMetrics()
         }
         return 0, 0, nil
     }
@@ -508,36 +506,36 @@ func (c *changefeed) tick(ctx context.Context,
     }
 
     // MinTableBarrierTs should never regress
-    if barrier.MinTableBarrierTs < cfStatus.MinTableBarrierTs {
-        barrier.MinTableBarrierTs = cfStatus.MinTableBarrierTs
+    if barrier.MinTableBarrierTs < status.MinTableBarrierTs {
+        barrier.MinTableBarrierTs = status.MinTableBarrierTs
     }
 
     failpoint.Inject("ChangefeedOwnerDontUpdateCheckpoint", func() {
-        if c.lastDDLTs != 0 && cfStatus.CheckpointTs >= c.lastDDLTs {
+        if c.lastDDLTs != 0 && status.CheckpointTs >= c.lastDDLTs {
             log.Info("owner won't update checkpoint because of failpoint",
                 zap.String("namespace", c.id.Namespace),
                 zap.String("changefeed", c.id.ID),
-                zap.Uint64("keepCheckpoint", cfStatus.CheckpointTs),
+                zap.Uint64("keepCheckpoint", status.CheckpointTs),
                 zap.Uint64("skipCheckpoint", watermark.CheckpointTs))
-            watermark.CheckpointTs = cfStatus.CheckpointTs
+            watermark.CheckpointTs = status.CheckpointTs
         }
     })
 
     failpoint.Inject("ChangefeedOwnerNotUpdateCheckpoint", func() {
-        watermark.CheckpointTs = cfStatus.CheckpointTs
+        watermark.CheckpointTs = status.CheckpointTs
     })
 
-    c.updateMetrics(currentTs, watermark.CheckpointTs, c.resolvedTs)
+    c.latestStatus.CheckpointTs = watermark.CheckpointTs
+    c.updateMetrics()
     c.tickDownstreamObserver(ctx)
 
     return watermark.CheckpointTs, barrier.MinTableBarrierTs, nil
 }
 
 func (c *changefeed) initialize(ctx context.Context,
-    cfInfo *model.ChangeFeedInfo,
-    cfStatus *model.ChangeFeedStatus,
+    state *orchestrator.ChangefeedReactorState,
 ) (err error) {
-    if c.initialized.Load() || cfStatus == nil {
+    if c.initialized.Load() || state.Status == nil {
         // If `c.latestStatus` is nil it means the changefeed struct is just created, it needs to
         // 1. use startTs as checkpointTs and resolvedTs, if it's a new created changefeed; or
         // 2. load checkpointTs and resolvedTs from etcd, if it's an existing changefeed.
@@ -566,12 +564,14 @@ LOOP2:
         }
     }
 
-    checkpointTs := cfStatus.CheckpointTs
+    status := state.Status
+    info := state.Info
+    checkpointTs := status.CheckpointTs
     if c.resolvedTs == 0 {
         c.resolvedTs = checkpointTs
     }
 
-    minTableBarrierTs := cfStatus.MinTableBarrierTs
+    minTableBarrierTs := status.MinTableBarrierTs
 
     failpoint.Inject("NewChangefeedNoRetryError", func() {
         failpoint.Return(cerror.ErrStartTsBeforeGC.GenWithStackByArgs(checkpointTs-300, checkpointTs))
@@ -580,7 +580,7 @@ LOOP2:
         failpoint.Return(errors.New("failpoint injected retriable error"))
     })
 
-    if cfInfo.Config.CheckGCSafePoint {
+    if info.Config.CheckGCSafePoint {
         // Check TiDB GC safepoint does not exceed the checkpoint.
         //
         // We update TTL to 10 minutes,
@@ -634,26 +634,26 @@ LOOP2:
     }
 
     c.barriers = newBarriers()
-    if util.GetOrZero(cfInfo.Config.EnableSyncPoint) {
+    if util.GetOrZero(info.Config.EnableSyncPoint) {
         // firstSyncPointStartTs = k * syncPointInterval,
         // which >= startTs, and choose the minimal k
-        syncPointInterval := util.GetOrZero(cfInfo.Config.SyncPointInterval)
+        syncPointInterval := util.GetOrZero(info.Config.SyncPointInterval)
         k := oracle.GetTimeFromTS(c.resolvedTs).Sub(time.Unix(0, 0)) / syncPointInterval
         if oracle.GetTimeFromTS(c.resolvedTs).Sub(time.Unix(0, 0))%syncPointInterval != 0 || oracle.ExtractLogical(c.resolvedTs) != 0 {
             k += 1
         }
         firstSyncPointTs := oracle.GoTimeToTS(time.Unix(0, 0).Add(k * syncPointInterval))
         c.barriers.Update(syncPointBarrier, firstSyncPointTs)
     }
-    c.barriers.Update(finishBarrier, cfInfo.GetTargetTs())
+    c.barriers.Update(finishBarrier, info.GetTargetTs())
 
-    filter, err := pfilter.NewFilter(cfInfo.Config, "")
+    filter, err := pfilter.NewFilter(info.Config, "")
     if err != nil {
         return errors.Trace(err)
     }
     c.schema, err = entry.NewSchemaStorage(
         c.upstream.KVStorage, ddlStartTs,
-        cfInfo.Config.ForceReplicate, c.id, util.RoleOwner, filter)
+        info.Config.ForceReplicate, c.id, util.RoleOwner, filter)
     if err != nil {
         return errors.Trace(err)
     }
@@ -665,10 +665,10 @@ LOOP2:
     if err != nil {
         return errors.Trace(err)
     }
-    cfInfo.Config.Sink.TiDBSourceID = sourceID
+    info.Config.Sink.TiDBSourceID = sourceID
     log.Info("get sourceID from PD", zap.Uint64("sourceID", sourceID), zap.Stringer("changefeedID", c.id))
 
-    c.ddlSink = c.newSink(c.id, cfInfo, c.Throw(ctx), func(err error) {
+    c.ddlSink = c.newSink(c.id, info, c.Throw(ctx), func(err error) {
         select {
         case <-ctx.Done():
         case c.warningCh <- err:
@@ -683,13 +683,13 @@ LOOP2:
         c.Throw(ctx)(c.ddlPuller.Run(cancelCtx))
     }()
 
-    c.downstreamObserver, err = c.newDownstreamObserver(ctx, c.id, cfInfo.SinkURI, cfInfo.Config)
+    c.downstreamObserver, err = c.newDownstreamObserver(ctx, c.id, info.SinkURI, info.Config)
     if err != nil {
         return err
     }
    c.observerLastTick = atomic.NewTime(time.Time{})
 
-    c.redoDDLMgr = redo.NewDDLManager(c.id, cfInfo.Config.Consistent, ddlStartTs)
+    c.redoDDLMgr = redo.NewDDLManager(c.id, info.Config.Consistent, ddlStartTs)
     if c.redoDDLMgr.Enabled() {
         c.wg.Add(1)
         go func() {
@@ -698,7 +698,7 @@ LOOP2:
         }()
     }
 
-    c.redoMetaMgr = redo.NewMetaManager(c.id, cfInfo.Config.Consistent, checkpointTs)
+    c.redoMetaMgr = redo.NewMetaManager(c.id, info.Config.Consistent, checkpointTs)
     if c.redoMetaMgr.Enabled() {
         c.wg.Add(1)
         go func() {
@@ -713,22 +713,22 @@ LOOP2:
     c.ddlManager = newDDLManager(
         c.id,
         ddlStartTs,
-        cfStatus.CheckpointTs,
+        checkpointTs,
         c.ddlSink,
         filter,
         c.ddlPuller,
         c.schema,
         c.redoDDLMgr,
         c.redoMetaMgr,
-        util.GetOrZero(cfInfo.Config.BDRMode),
-        cfInfo.Config.Sink.ShouldSendAllBootstrapAtStart(),
+        util.GetOrZero(info.Config.BDRMode),
+        info.Config.Sink.ShouldSendAllBootstrapAtStart(),
         c.Throw(ctx),
     )
 
     // create scheduler
     cfg := *c.cfg
-    cfg.ChangefeedSettings = cfInfo.Config.Scheduler
-    epoch := cfInfo.Epoch
+    cfg.ChangefeedSettings = info.Config.Scheduler
+    epoch := info.Epoch
     c.scheduler, err = c.newScheduler(ctx, c.id, c.upstream, epoch, &cfg, c.redoMetaMgr, c.globalVars)
     if err != nil {
         return errors.Trace(err)
@@ -737,15 +737,15 @@ LOOP2:
     c.initMetrics()
 
     c.initialized.Store(true)
-    c.metricsChangefeedCreateTimeGuage.Set(float64(oracle.GetPhysical(cfInfo.CreateTime)))
+    c.metricsChangefeedCreateTimeGuage.Set(float64(oracle.GetPhysical(info.CreateTime)))
     c.metricsChangefeedRestartTimeGauge.Set(float64(oracle.GetPhysical(time.Now())))
     log.Info("changefeed initialized",
         zap.String("namespace", c.id.Namespace),
         zap.String("changefeed", c.id.ID),
         zap.Uint64("changefeedEpoch", epoch),
         zap.Uint64("checkpointTs", checkpointTs),
         zap.Uint64("resolvedTs", c.resolvedTs),
-        zap.String("info", cfInfo.String()))
+        zap.String("info", info.String()))
 
     return nil
 }
@@ -800,11 +800,7 @@ func (c *changefeed) releaseResources(ctx context.Context) {
     c.wg.Wait()
 
     if c.ddlSink != nil {
-        canceledCtx, cancel := context.WithCancel(context.Background())
-        cancel()
-        // TODO(dongmen): remove ctx from func ddlSink.close(), it is useless.
-        // We don't need to wait ddlSink Close, pass a canceled context is ok
-        if err := c.ddlSink.close(canceledCtx); err != nil {
+        if err := c.ddlSink.close(); err != nil {
             log.Warn("owner close ddlSink failed",
                 zap.String("namespace", c.id.Namespace),
                 zap.String("changefeed", c.id.ID),
@@ -978,15 +974,19 @@ func (c *changefeed) handleBarrier(ctx context.Context,
     return nil
 }
 
-func (c *changefeed) updateMetrics(currentTs int64, checkpointTs, resolvedTs model.Ts) {
+func (c *changefeed) updateMetrics() {
+    pdTime := c.upstream.PDClock.CurrentTime()
+    currentTs := oracle.GetPhysical(pdTime)
+
+    checkpointTs := c.latestStatus.CheckpointTs
     phyCkpTs := oracle.ExtractPhysical(checkpointTs)
     c.metricsChangefeedCheckpointTsGauge.Set(float64(phyCkpTs))
 
     checkpointLag := float64(currentTs-phyCkpTs) / 1e3
     c.metricsChangefeedCheckpointTsLagGauge.Set(checkpointLag)
     c.metricsChangefeedCheckpointLagDuration.Observe(checkpointLag)
 
-    phyRTs := oracle.ExtractPhysical(resolvedTs)
+    phyRTs := oracle.ExtractPhysical(c.resolvedTs)
     c.metricsChangefeedResolvedTsGauge.Set(float64(phyRTs))
 
     resolvedLag := float64(currentTs-phyRTs) / 1e3
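Note on updateStatus: the helper that Tick now calls on success is defined elsewhere in this PR, outside this file's diff. A hypothetical sketch of its shape, assuming it patches the reactor state the same way the owner previously persisted the returned watermarks (the PatchStatus usage and the zero-value guard are assumptions, not taken from this diff):

    // updateStatus (sketch): persist the new watermarks by patching the
    // changefeed's reactor state, skipping the write when nothing changed.
    func updateStatus(state *orchestrator.ChangefeedReactorState,
        checkpointTs, minTableBarrierTs model.Ts,
    ) {
        if checkpointTs == 0 || minTableBarrierTs == 0 {
            return // scheduler could not advance this tick; keep the old status
        }
        state.PatchStatus(
            func(status *model.ChangeFeedStatus) (*model.ChangeFeedStatus, bool, error) {
                changed := false
                if status.CheckpointTs != checkpointTs {
                    status.CheckpointTs = checkpointTs
                    changed = true
                }
                if status.MinTableBarrierTs != minTableBarrierTs {
                    status.MinTableBarrierTs = minTableBarrierTs
                    changed = true
                }
                return status, changed, nil
            })
    }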