Skip to content

Commit

Permalink
adjust method
Browse files Browse the repository at this point in the history
  • Loading branch information
3AceShowHand committed Sep 26, 2024
1 parent 0b89c56 commit 4560889
Show file tree
Hide file tree
Showing 7 changed files with 116 additions and 108 deletions.
99 changes: 51 additions & 48 deletions cdc/owner/changefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package owner
import (
"context"
"fmt"
"github.com/pingcap/tiflow/pkg/orchestrator"
"math"
"strings"
"sync"
Expand Down Expand Up @@ -56,9 +57,8 @@ type Changefeed interface {
//
// It can be called in etcd ticks, so it should never be blocked.
// Tick no longer returns checkpointTs and minTableBarrierTs; the implementation
// hands them to updateStatus together with the given state.
Tick(context.Context, *model.ChangeFeedInfo,
*model.ChangeFeedStatus,
map[model.CaptureID]*model.CaptureInfo) (model.Ts, model.Ts)
Tick(context.Context, *orchestrator.ChangefeedReactorState,
map[model.CaptureID]*model.CaptureInfo)

// Close closes the changefeed.
Close(ctx context.Context)
Expand Down Expand Up @@ -266,13 +266,12 @@ func newChangefeed4Test(
}

func (c *changefeed) Tick(ctx context.Context,
cfInfo *model.ChangeFeedInfo,
cfStatus *model.ChangeFeedStatus,
state *orchestrator.ChangefeedReactorState,
captures map[model.CaptureID]*model.CaptureInfo,
) (model.Ts, model.Ts) {
) {
startTime := time.Now()
c.latestInfo = cfInfo
c.latestStatus = cfStatus
c.latestInfo = state.Info
c.latestStatus = state.Status
// Handle all internal warnings.
noMoreWarnings := false
for !noMoreWarnings {
Expand All @@ -288,10 +287,10 @@ func (c *changefeed) Tick(ctx context.Context,
if err != nil {
c.handleErr(ctx, err)
}
return 0, 0
return
}

checkpointTs, minTableBarrierTs, err := c.tick(ctx, captures, cfInfo, cfStatus)
checkpointTs, minTableBarrierTs, err := c.tick(ctx, captures, state)

// The tick duration is recorded only if changefeed has completed initialization
if c.initialized.Load() {
Expand All @@ -309,7 +308,8 @@ func (c *changefeed) Tick(ctx context.Context,
log.Error("changefeed tick failed", zap.Error(err))
c.handleErr(ctx, err)
}
return checkpointTs, minTableBarrierTs
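// checkpointTs and minTableBarrierTs are no longer returned to the caller
// (previously: `return checkpointTs, minTableBarrierTs`); they are passed to
// updateStatus together with state instead.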
updateStatus(state, checkpointTs, minTableBarrierTs)
}

func (c *changefeed) Throw(ctx context.Context) func(error) {
Expand Down Expand Up @@ -378,14 +378,16 @@ func (c *changefeed) checkStaleCheckpointTs(
// tick returns the checkpointTs and minTableBarrierTs.
func (c *changefeed) tick(ctx context.Context,
captures map[model.CaptureID]*model.CaptureInfo,
cfInfo *model.ChangeFeedInfo,
cfStatus *model.ChangeFeedStatus,
state *orchestrator.ChangefeedReactorState,
) (model.Ts, model.Ts, error) {
adminJobPending := c.feedStateManager.Tick(c.resolvedTs, cfStatus, cfInfo)
preCheckpointTs := cfInfo.GetCheckpointTs(cfStatus)
adminJobPending := c.feedStateManager.Tick(c.resolvedTs, state)
preCheckpointTs := state.Checkpoint()

info := state.Info
status := state.Status
// checkStaleCheckpointTs must be called before `feedStateManager.ShouldRunning()`
// to ensure all changefeeds, no matter whether they are running or not, will be checked.
if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs, cfInfo); err != nil {
if err := c.checkStaleCheckpointTs(ctx, preCheckpointTs, info); err != nil {
return 0, 0, errors.Trace(err)
}

Expand All @@ -402,7 +404,7 @@ func (c *changefeed) tick(ctx context.Context,
if !c.initialized.Load() {
initialized, err := c.initializer.TryInitialize(ctx,
func(ctx context.Context) error {
return c.initialize(ctx, cfInfo, cfStatus)
return c.initialize(ctx, state)
},
c.globalVars.ChangefeedThreadPool)
if err != nil {
Expand Down Expand Up @@ -434,7 +436,7 @@ func (c *changefeed) tick(ctx context.Context,
return 0, 0, nil
}

err = c.handleBarrier(ctx, cfInfo, cfStatus, barrier)
err = c.handleBarrier(ctx, info, status, barrier)
if err != nil {
return 0, 0, errors.Trace(err)
}
Expand Down Expand Up @@ -485,7 +487,7 @@ func (c *changefeed) tick(ctx context.Context,
// CheckpointCannotProceed implies that not all tables are being replicated normally,
// so in that case there is no need to advance the global watermarks.
if watermark.CheckpointTs == scheduler.CheckpointCannotProceed {
if cfStatus != nil {
if status != nil {
// We should keep the metrics updated even if the scheduler cannot
// advance the watermarks for now.
c.updateMetrics()
Expand All @@ -505,23 +507,23 @@ func (c *changefeed) tick(ctx context.Context,
}

// MinTableBarrierTs should never regress
if barrier.MinTableBarrierTs < cfStatus.MinTableBarrierTs {
barrier.MinTableBarrierTs = cfStatus.MinTableBarrierTs
if barrier.MinTableBarrierTs < status.MinTableBarrierTs {
barrier.MinTableBarrierTs = status.MinTableBarrierTs
}

failpoint.Inject("ChangefeedOwnerDontUpdateCheckpoint", func() {
if c.lastDDLTs != 0 && cfStatus.CheckpointTs >= c.lastDDLTs {
if c.lastDDLTs != 0 && status.CheckpointTs >= c.lastDDLTs {
log.Info("owner won't update checkpoint because of failpoint",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Uint64("keepCheckpoint", cfStatus.CheckpointTs),
zap.Uint64("keepCheckpoint", status.CheckpointTs),
zap.Uint64("skipCheckpoint", watermark.CheckpointTs))
watermark.CheckpointTs = cfStatus.CheckpointTs
watermark.CheckpointTs = status.CheckpointTs
}
})

failpoint.Inject("ChangefeedOwnerNotUpdateCheckpoint", func() {
watermark.CheckpointTs = cfStatus.CheckpointTs
watermark.CheckpointTs = status.CheckpointTs
})

c.latestStatus.CheckpointTs = watermark.CheckpointTs
Expand All @@ -532,10 +534,9 @@ func (c *changefeed) tick(ctx context.Context,
}

func (c *changefeed) initialize(ctx context.Context,
cfInfo *model.ChangeFeedInfo,
cfStatus *model.ChangeFeedStatus,
state *orchestrator.ChangefeedReactorState,
) (err error) {
if c.initialized.Load() || cfStatus == nil {
if c.initialized.Load() || state.Status == nil {
// If `c.latestStatus` is nil, the changefeed struct has just been created, and it needs to
// 1. use startTs as checkpointTs and resolvedTs, if it's a newly created changefeed; or
// 2. load checkpointTs and resolvedTs from etcd, if it's an existing changefeed.
Expand Down Expand Up @@ -564,12 +565,14 @@ LOOP2:
}
}

checkpointTs := cfStatus.CheckpointTs
status := state.Status
info := state.Info
checkpointTs := status.CheckpointTs
if c.resolvedTs == 0 {
c.resolvedTs = checkpointTs
}

minTableBarrierTs := cfStatus.MinTableBarrierTs
minTableBarrierTs := status.MinTableBarrierTs

failpoint.Inject("NewChangefeedNoRetryError", func() {
failpoint.Return(cerror.ErrStartTsBeforeGC.GenWithStackByArgs(checkpointTs-300, checkpointTs))
Expand All @@ -578,7 +581,7 @@ LOOP2:
failpoint.Return(errors.New("failpoint injected retriable error"))
})

if cfInfo.Config.CheckGCSafePoint {
if info.Config.CheckGCSafePoint {
// Check TiDB GC safepoint does not exceed the checkpoint.
//
// We update TTL to 10 minutes,
Expand Down Expand Up @@ -632,26 +635,26 @@ LOOP2:
}

c.barriers = newBarriers()
if util.GetOrZero(cfInfo.Config.EnableSyncPoint) {
if util.GetOrZero(info.Config.EnableSyncPoint) {
// firstSyncPointStartTs = k * syncPointInterval,
// which >= startTs, and choose the minimal k
syncPointInterval := util.GetOrZero(cfInfo.Config.SyncPointInterval)
syncPointInterval := util.GetOrZero(info.Config.SyncPointInterval)
k := oracle.GetTimeFromTS(c.resolvedTs).Sub(time.Unix(0, 0)) / syncPointInterval
if oracle.GetTimeFromTS(c.resolvedTs).Sub(time.Unix(0, 0))%syncPointInterval != 0 || oracle.ExtractLogical(c.resolvedTs) != 0 {
k += 1
}
firstSyncPointTs := oracle.GoTimeToTS(time.Unix(0, 0).Add(k * syncPointInterval))
c.barriers.Update(syncPointBarrier, firstSyncPointTs)
}
c.barriers.Update(finishBarrier, cfInfo.GetTargetTs())
c.barriers.Update(finishBarrier, info.GetTargetTs())

filter, err := pfilter.NewFilter(cfInfo.Config, "")
filter, err := pfilter.NewFilter(info.Config, "")
if err != nil {
return errors.Trace(err)
}
c.schema, err = entry.NewSchemaStorage(
c.upstream.KVStorage, ddlStartTs,
cfInfo.Config.ForceReplicate, c.id, util.RoleOwner, filter)
info.Config.ForceReplicate, c.id, util.RoleOwner, filter)
if err != nil {
return errors.Trace(err)
}
Expand All @@ -663,10 +666,10 @@ LOOP2:
if err != nil {
return errors.Trace(err)
}
cfInfo.Config.Sink.TiDBSourceID = sourceID
info.Config.Sink.TiDBSourceID = sourceID
log.Info("get sourceID from PD", zap.Uint64("sourceID", sourceID), zap.Stringer("changefeedID", c.id))

c.ddlSink = c.newSink(c.id, cfInfo, c.Throw(ctx), func(err error) {
c.ddlSink = c.newSink(c.id, info, c.Throw(ctx), func(err error) {
select {
case <-ctx.Done():
case c.warningCh <- err:
Expand All @@ -681,13 +684,13 @@ LOOP2:
c.Throw(ctx)(c.ddlPuller.Run(cancelCtx))
}()

c.downstreamObserver, err = c.newDownstreamObserver(ctx, c.id, cfInfo.SinkURI, cfInfo.Config)
c.downstreamObserver, err = c.newDownstreamObserver(ctx, c.id, info.SinkURI, info.Config)
if err != nil {
return err
}
c.observerLastTick = atomic.NewTime(time.Time{})

c.redoDDLMgr = redo.NewDDLManager(c.id, cfInfo.Config.Consistent, ddlStartTs)
c.redoDDLMgr = redo.NewDDLManager(c.id, info.Config.Consistent, ddlStartTs)
if c.redoDDLMgr.Enabled() {
c.wg.Add(1)
go func() {
Expand All @@ -696,7 +699,7 @@ LOOP2:
}()
}

c.redoMetaMgr = redo.NewMetaManager(c.id, cfInfo.Config.Consistent, checkpointTs)
c.redoMetaMgr = redo.NewMetaManager(c.id, info.Config.Consistent, checkpointTs)
if c.redoMetaMgr.Enabled() {
c.wg.Add(1)
go func() {
Expand All @@ -711,22 +714,22 @@ LOOP2:
c.ddlManager = newDDLManager(
c.id,
ddlStartTs,
cfStatus.CheckpointTs,
checkpointTs,
c.ddlSink,
filter,
c.ddlPuller,
c.schema,
c.redoDDLMgr,
c.redoMetaMgr,
util.GetOrZero(cfInfo.Config.BDRMode),
cfInfo.Config.Sink.ShouldSendAllBootstrapAtStart(),
util.GetOrZero(info.Config.BDRMode),
info.Config.Sink.ShouldSendAllBootstrapAtStart(),
c.Throw(ctx),
)

// create scheduler
cfg := *c.cfg
cfg.ChangefeedSettings = cfInfo.Config.Scheduler
epoch := cfInfo.Epoch
cfg.ChangefeedSettings = info.Config.Scheduler
epoch := info.Epoch
c.scheduler, err = c.newScheduler(ctx, c.id, c.upstream, epoch, &cfg, c.redoMetaMgr, c.globalVars)
if err != nil {
return errors.Trace(err)
Expand All @@ -735,15 +738,15 @@ LOOP2:
c.initMetrics()

c.initialized.Store(true)
c.metricsChangefeedCreateTimeGuage.Set(float64(oracle.GetPhysical(cfInfo.CreateTime)))
c.metricsChangefeedCreateTimeGuage.Set(float64(oracle.GetPhysical(info.CreateTime)))
c.metricsChangefeedRestartTimeGauge.Set(float64(oracle.GetPhysical(time.Now())))
log.Info("changefeed initialized",
zap.String("namespace", c.id.Namespace),
zap.String("changefeed", c.id.ID),
zap.Uint64("changefeedEpoch", epoch),
zap.Uint64("checkpointTs", checkpointTs),
zap.Uint64("resolvedTs", c.resolvedTs),
zap.String("info", cfInfo.String()))
zap.String("info", info.String()))

return nil
}
Expand Down
Loading

0 comments on commit 4560889

Please sign in to comment.