Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cordinator(ticdc): Fix Puller Resolved TS Lag Calculation and Deprecate current_ts Field in Stats #11624

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions cdc/processor/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,11 +386,9 @@ func (p *processor) getStatsFromSourceManagerAndSinkManager(
span tablepb.Span, sinkStats sinkmanager.TableStats,
) tablepb.Stats {
pullerStats := p.sourceManager.r.GetTablePullerStats(span)
now := p.upstream.PDClock.CurrentTime()

stats := tablepb.Stats{
RegionCount: pullerStats.RegionCount,
CurrentTs: oracle.ComposeTS(oracle.GetPhysical(now), 0),
BarrierTs: sinkStats.BarrierTs,
StageCheckpoints: map[string]tablepb.Checkpoint{
"puller-ingress": {
Expand Down
2 changes: 1 addition & 1 deletion cdc/processor/tablepb/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ message Stats {
// Number of captured regions.
uint64 region_count = 1;
// The current timestamp from the table's point of view.
uint64 current_ts = 2 [(gogoproto.casttype) = "Ts"];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we remove this field?

Copy link
Contributor Author

@wlwilliamx wlwilliamx Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Currently, we cannot directly remove the current_ts field from the protobuf definition. Although this message is used solely for communication within the TiCDC cluster, there is a risk of incompatibility during rolling upgrades. In such scenarios, different versions of TiCDC (coordinator and processor) may coexist, and removing the current_ts field could lead to communication errors between nodes running different versions.

However, the field is being deprecated and is no longer used for any critical functionality. Once we are confident that it is not in use across any versions, we can plan to safely remove it in a future release without breaking compatibility.

uint64 current_ts = 2 [(gogoproto.casttype) = "Ts", deprecated = true]; // Deprecated: Do not use this field.
// Checkponits at each stage.
map<string, Checkpoint> stage_checkpoints = 3 [(gogoproto.nullable) = false];
// The barrier timestamp of the table.
Expand Down
8 changes: 7 additions & 1 deletion cdc/scheduler/internal/v3/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,13 @@
}
c.lastCollectTime = now

pdTime := now
// only nil in unit test
if c.pdClock != nil {
pdTime = c.pdClock.CurrentTime()
}

Check warning on line 456 in cdc/scheduler/internal/v3/coordinator.go

View check run for this annotation

Codecov / codecov/patch

cdc/scheduler/internal/v3/coordinator.go#L455-L456

Added lines #L455 - L456 were not covered by tests

c.schedulerM.CollectMetrics()
c.replicationM.CollectMetrics()
c.replicationM.CollectMetrics(pdTime)
c.captureM.CollectMetrics()
}
12 changes: 5 additions & 7 deletions cdc/scheduler/internal/v3/replication/replication_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,7 +773,7 @@
}

// CollectMetrics collects metrics.
func (r *Manager) CollectMetrics() {
func (r *Manager) CollectMetrics(currentPDTime time.Time) {

Check warning on line 776 in cdc/scheduler/internal/v3/replication/replication_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/scheduler/internal/v3/replication/replication_manager.go#L776

Added line #L776 was not covered by tests
cf := r.changefeedID
tableGauge.
WithLabelValues(cf.Namespace, cf.ID).Set(float64(r.spans.Len()))
Expand All @@ -790,13 +790,12 @@
WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyRTs))

// Slow table latency metrics.
phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
for stage, checkpoint := range table.Stats.StageCheckpoints {
// Checkpoint ts
phyCkpTs := oracle.ExtractPhysical(checkpoint.CheckpointTs)
slowestTableStageCheckpointTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyCkpTs))
checkpointLag := float64(phyCurrentTs-phyCkpTs) / 1e3
checkpointLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.CheckpointTs)).Seconds()

Check warning on line 798 in cdc/scheduler/internal/v3/replication/replication_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/scheduler/internal/v3/replication/replication_manager.go#L798

Added line #L798 was not covered by tests
slowestTableStageCheckpointTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(checkpointLag)
slowestTableStageCheckpointTsLagHistogramVec.
Expand All @@ -805,7 +804,7 @@
phyRTs := oracle.ExtractPhysical(checkpoint.ResolvedTs)
slowestTableStageResolvedTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyRTs))
resolvedTsLag := float64(phyCurrentTs-phyRTs) / 1e3
resolvedTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(checkpoint.ResolvedTs)).Seconds()

Check warning on line 807 in cdc/scheduler/internal/v3/replication/replication_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/scheduler/internal/v3/replication/replication_manager.go#L807

Added line #L807 was not covered by tests
slowestTableStageResolvedTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(resolvedTsLag)
slowestTableStageResolvedTsLagHistogramVec.
Expand All @@ -816,7 +815,7 @@
phyBTs := oracle.ExtractPhysical(table.Stats.BarrierTs)
slowestTableStageResolvedTsGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(float64(phyBTs))
barrierTsLag := float64(phyCurrentTs-phyBTs) / 1e3
barrierTsLag := currentPDTime.Sub(oracle.GetTimeFromTS(table.Stats.BarrierTs)).Seconds()

Check warning on line 818 in cdc/scheduler/internal/v3/replication/replication_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/scheduler/internal/v3/replication/replication_manager.go#L818

Added line #L818 was not covered by tests
slowestTableStageResolvedTsLagGaugeVec.
WithLabelValues(cf.Namespace, cf.ID, stage).Set(barrierTsLag)
slowestTableStageResolvedTsLagHistogramVec.
Expand Down Expand Up @@ -867,8 +866,7 @@
phyCkptTs := oracle.ExtractPhysical(pullerCkpt.ResolvedTs)
slowestTablePullerResolvedTs.WithLabelValues(cf.Namespace, cf.ID).Set(float64(phyCkptTs))

phyCurrentTs := oracle.ExtractPhysical(table.Stats.CurrentTs)
lag := float64(phyCurrentTs-phyCkptTs) / 1e3
lag := currentPDTime.Sub(oracle.GetTimeFromTS(pullerCkpt.ResolvedTs)).Seconds()

Check warning on line 869 in cdc/scheduler/internal/v3/replication/replication_manager.go

View check run for this annotation

Codecov / codecov/patch

cdc/scheduler/internal/v3/replication/replication_manager.go#L869

Added line #L869 was not covered by tests
slowestTablePullerResolvedTsLag.WithLabelValues(cf.Namespace, cf.ID).Set(lag)
}
}
Expand Down
Loading