From 93f6ab6aebcdb09a1bd585e476905ea668fbcb50 Mon Sep 17 00:00:00 2001
From: Ling Jin <7138436+3AceShowHand@users.noreply.github.com>
Date: Wed, 25 Sep 2024 15:37:14 +0800
Subject: [PATCH] kvclient(ticdc): add table id to kv client logs (#11622)

close pingcap/tiflow#11621
---
 cdc/kv/region_state.go         |  2 +-
 cdc/kv/shared_client.go        | 45 ++++++++++++++-------
 cdc/kv/shared_region_worker.go | 10 ++---
 cdc/kv/shared_stream.go        | 73 +++++++++++++++-------------------
 cdc/processor/processor.go     | 20 +++++-----
 cdc/processor/tablepb/table.go | 14 +++----
 6 files changed, 84 insertions(+), 80 deletions(-)

diff --git a/cdc/kv/region_state.go b/cdc/kv/region_state.go
index 8b662337beb..42c46cf53dd 100644
--- a/cdc/kv/region_state.go
+++ b/cdc/kv/region_state.go
@@ -45,7 +45,7 @@ type regionInfo struct {
 	lockedRangeState *regionlock.LockedRangeState
 }
 
-func (s regionInfo) isStoped() bool {
+func (s regionInfo) isStopped() bool {
 	// lockedRange only nil when the region's subscribedTable is stopped.
 	return s.lockedRangeState == nil
 }
diff --git a/cdc/kv/shared_client.go b/cdc/kv/shared_client.go
index 186c000df85..789fbfa6f30 100644
--- a/cdc/kv/shared_client.go
+++ b/cdc/kv/shared_client.go
@@ -288,7 +288,6 @@ func (s *SharedClient) Subscribe(subID SubscriptionID, span tablepb.Span, startT
 	s.totalSpans.Lock()
 	s.totalSpans.v[subID] = rt
 	s.totalSpans.Unlock()
-
 	s.rangeTaskCh.In() <- rangeTask{span: span, subscribedTable: rt}
 	log.Info("event feed subscribes table success",
 		zap.String("namespace", s.changefeed.Namespace),
@@ -306,13 +305,17 @@ func (s *SharedClient) Unsubscribe(subID SubscriptionID) {
 	s.totalSpans.Unlock()
 	if rt != nil {
 		s.setTableStopped(rt)
+		log.Info("event feed unsubscribes table",
+			zap.String("namespace", s.changefeed.Namespace),
+			zap.String("changefeed", s.changefeed.ID),
+			zap.Any("subscriptionID", rt.subscriptionID),
+			zap.String("span", rt.span.String()))
+		return
 	}
-
-	log.Info("event feed unsubscribes table",
+	log.Warn("event feed unsubscribes table, but not found",
 		zap.String("namespace", s.changefeed.Namespace),
 		zap.String("changefeed", s.changefeed.ID),
-		zap.Any("subscriptionID", rt.subscriptionID),
-		zap.Bool("exists", rt != nil))
+		zap.Any("subscriptionID", subID))
 }
 
 // ResolveLock is a function. If outsider subscribers find a span resolved timestamp is
@@ -382,7 +385,8 @@ func (s *SharedClient) setTableStopped(rt *subscribedTable) {
 	log.Info("event feed starts to stop table",
 		zap.String("namespace", s.changefeed.Namespace),
 		zap.String("changefeed", s.changefeed.ID),
-		zap.Any("subscriptionID", rt.subscriptionID))
+		zap.Any("subscriptionID", rt.subscriptionID),
+		zap.Int64("tableID", rt.span.TableID))
 
 	// Set stopped to true so we can stop handling region events from the table.
 	// Then send a special singleRegionInfo to regionRouter to deregister the table
@@ -399,7 +403,8 @@ func (s *SharedClient) onTableDrained(rt *subscribedTable) {
 	log.Info("event feed stop table is finished",
 		zap.String("namespace", s.changefeed.Namespace),
 		zap.String("changefeed", s.changefeed.ID),
-		zap.Any("subscriptionID", rt.subscriptionID))
+		zap.Any("subscriptionID", rt.subscriptionID),
+		zap.Int64("tableID", rt.span.TableID))
 
 	s.totalSpans.Lock()
 	defer s.totalSpans.Unlock()
@@ -418,7 +423,7 @@ func (s *SharedClient) handleRegions(ctx context.Context, eg *errgroup.Group) er
 		case <-ctx.Done():
 			return errors.Trace(ctx.Err())
 		case region := <-s.regionCh.Out():
-			if region.isStoped() {
+			if region.isStopped() {
 				for _, rs := range s.stores {
 					s.broadcastRequest(rs, region)
 				}
@@ -441,6 +446,7 @@ func (s *SharedClient) handleRegions(ctx context.Context, eg *errgroup.Group) er
 				zap.Uint64("streamID", stream.streamID),
 				zap.Any("subscriptionID", region.subscribedTable.subscriptionID),
 				zap.Uint64("regionID", region.verID.GetID()),
+				zap.String("span", region.span.String()),
 				zap.Uint64("storeID", store.storeID),
 				zap.String("addr", store.storeAddr))
 		}
@@ -554,7 +560,7 @@ func (s *SharedClient) divideSpanAndScheduleRegionRequests(
 				zap.String("namespace", s.changefeed.Namespace),
 				zap.String("changefeed", s.changefeed.ID),
 				zap.Any("subscriptionID", subscribedTable.subscriptionID),
-				zap.Any("span", nextSpan),
+				zap.String("span", nextSpan.String()),
 				zap.Error(err))
 			backoffBeforeLoad = true
 			continue
@@ -572,7 +578,7 @@ func (s *SharedClient) divideSpanAndScheduleRegionRequests(
 				zap.String("namespace", s.changefeed.Namespace),
 				zap.String("changefeed", s.changefeed.ID),
 				zap.Any("subscriptionID", subscribedTable.subscriptionID),
-				zap.Any("span", nextSpan))
+				zap.String("span", nextSpan.String()))
 			backoffBeforeLoad = true
 			continue
 		}
@@ -590,7 +596,8 @@ func (s *SharedClient) divideSpanAndScheduleRegionRequests(
 			log.Panic("event feed check spans intersect shouldn't fail",
 				zap.String("namespace", s.changefeed.Namespace),
 				zap.String("changefeed", s.changefeed.ID),
-				zap.Any("subscriptionID", subscribedTable.subscriptionID))
+				zap.Any("subscriptionID", subscribedTable.subscriptionID),
+				zap.String("span", nextSpan.String()))
 		}
 
 		verID := tikv.NewRegionVerID(regionMeta.Id, regionMeta.RegionEpoch.ConfVer, regionMeta.RegionEpoch.Version)
@@ -676,6 +683,8 @@ func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInf
 			zap.String("namespace", s.changefeed.Namespace),
 			zap.String("changefeed", s.changefeed.ID),
 			zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID),
+			zap.Uint64("regionID", errInfo.verID.GetID()),
+			zap.Int64("tableID", errInfo.span.TableID),
 			zap.Stringer("error", innerErr))
 
 		if notLeader := innerErr.GetNotLeader(); notLeader != nil {
@@ -720,6 +729,8 @@ func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInf
 			zap.String("namespace", s.changefeed.Namespace),
 			zap.String("changefeed", s.changefeed.ID),
 			zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID),
+			zap.Uint64("regionID", errInfo.verID.GetID()),
+			zap.Int64("tableID", errInfo.span.TableID),
 			zap.Stringer("error", innerErr))
 		metricFeedUnknownErrorCounter.Inc()
 		s.scheduleRegionRequest(ctx, errInfo.regionInfo)
@@ -740,6 +751,8 @@ func (s *SharedClient) doHandleError(ctx context.Context, errInfo regionErrorInf
 			zap.String("namespace", s.changefeed.Namespace),
 			zap.String("changefeed", s.changefeed.ID),
 			zap.Any("subscriptionID", errInfo.subscribedTable.subscriptionID),
+			zap.Uint64("regionID", errInfo.verID.GetID()),
+			zap.Int64("tableID", errInfo.span.TableID),
 			zap.Error(err))
 		return err
 	}
@@ -816,7 +829,7 @@ func (s *SharedClient) logSlowRegions(ctx context.Context) error {
 
 		currTime := s.pdClock.CurrentTime()
 		s.totalSpans.RLock()
-		slowInitializeRegion := 0
+		var slowInitializeRegionCount int
 		for subscriptionID, rt := range s.totalSpans.v {
 			attr := rt.rangeLock.IterAll(nil)
 			ckptTime := oracle.GetTimeFromTS(attr.SlowestRegion.ResolvedTs)
@@ -826,20 +839,23 @@
 						zap.String("namespace", s.changefeed.Namespace),
 						zap.String("changefeed", s.changefeed.ID),
 						zap.Any("subscriptionID", subscriptionID),
+						zap.Int64("tableID", rt.span.TableID),
 						zap.Any("slowRegion", attr.SlowestRegion))
 				}
 			} else if currTime.Sub(attr.SlowestRegion.Created) > 10*time.Minute {
-				slowInitializeRegion += 1
+				slowInitializeRegionCount += 1
 				log.Info("event feed initializes a region too slow",
 					zap.String("namespace", s.changefeed.Namespace),
 					zap.String("changefeed", s.changefeed.ID),
 					zap.Any("subscriptionID", subscriptionID),
+					zap.Int64("tableID", rt.span.TableID),
 					zap.Any("slowRegion", attr.SlowestRegion))
 			} else if currTime.Sub(ckptTime) > 10*time.Minute {
 				log.Info("event feed finds a uninitialized slow region",
 					zap.String("namespace", s.changefeed.Namespace),
 					zap.String("changefeed", s.changefeed.ID),
 					zap.Any("subscriptionID", subscriptionID),
+					zap.Int64("tableID", rt.span.TableID),
 					zap.Any("slowRegion", attr.SlowestRegion))
 			}
 			if len(attr.UnLockedRanges) > 0 {
@@ -847,11 +863,12 @@
 					zap.String("namespace", s.changefeed.Namespace),
 					zap.String("changefeed", s.changefeed.ID),
 					zap.Any("subscriptionID", subscriptionID),
+					zap.Int64("tableID", rt.span.TableID),
 					zap.Any("holes", attr.UnLockedRanges))
 			}
 		}
 		s.totalSpans.RUnlock()
-		s.metrics.slowInitializeRegion.Set(float64(slowInitializeRegion))
+		s.metrics.slowInitializeRegion.Set(float64(slowInitializeRegionCount))
 	}
 }
 
diff --git a/cdc/kv/shared_region_worker.go b/cdc/kv/shared_region_worker.go
index 032a5d8d995..f00b131d3d7 100644
--- a/cdc/kv/shared_region_worker.go
+++ b/cdc/kv/shared_region_worker.go
@@ -175,6 +175,7 @@ func (w *sharedRegionWorker) handleSingleRegionError(state *regionFeedState, str
 		zap.Uint64("streamID", stream.streamID),
 		zap.Any("subscriptionID", state.getRegionID()),
 		zap.Uint64("regionID", state.region.verID.GetID()),
+		zap.Int64("tableID", state.region.span.TableID),
 		zap.Bool("reschedule", stepsToRemoved),
 		zap.Error(err))
 }
@@ -228,12 +229,6 @@ func (w *sharedRegionWorker) handleEventEntry(ctx context.Context, x *cdcpb.Even
 		}
 	}
 	tableID := state.region.subscribedTable.span.TableID
-	log.Debug("region worker get an Event",
-		zap.String("namespace", w.changefeed.Namespace),
-		zap.String("changefeed", w.changefeed.ID),
-		zap.Any("subscriptionID", state.region.subscribedTable.subscriptionID),
-		zap.Int64("tableID", tableID),
-		zap.Int("rows", len(x.Entries.GetEntries())))
 	return handleEventEntry(x, startTs, state, w.metrics, emit, w.changefeed, tableID, w.client.logRegionDetails)
 }
 
@@ -250,7 +245,7 @@ func handleEventEntry(
 	regionID, regionSpan, _ := state.getRegionMeta()
 	for _, entry := range x.Entries.GetEntries() {
 		// NOTE: from TiKV 7.0.0, entries are already filtered out in TiKV side.
-		// We can remove the check in future.
+		// We can remove the check in the future.
 		comparableKey := spanz.ToComparableKey(entry.GetKey())
 		if entry.Type != cdcpb.Event_INITIALIZED &&
 			!spanz.KeyInSpan(comparableKey, regionSpan) {
@@ -266,6 +261,7 @@ func handleEventEntry(
 				zap.String("changefeed", changefeed.ID),
 				zap.Int64("tableID", tableID),
 				zap.Uint64("regionID", regionID),
+				zap.Int64("tableID", state.region.span.TableID),
 				zap.Uint64("requestID", state.requestID),
 				zap.Stringer("span", &state.region.span))
 
diff --git a/cdc/kv/shared_stream.go b/cdc/kv/shared_stream.go
index 42d8e718292..7a9b1204f89 100644
--- a/cdc/kv/shared_stream.go
+++ b/cdc/kv/shared_stream.go
@@ -76,7 +76,7 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
 			case <-ctx.Done():
 				return ctx.Err()
 			case region := <-stream.requests.Out():
-				if !region.isStoped() {
+				if !region.isStopped() {
 					stream.preFetchForConnecting = new(regionInfo)
 					*stream.preFetchForConnecting = region
 					return nil
@@ -104,7 +104,7 @@ func newStream(ctx context.Context, c *SharedClient, g *errgroup.Group, r *reque
 			// Why we need to re-schedule pending regions? This because the store can
 			// fail forever, and all regions are scheduled to other stores.
 			for _, region := range stream.clearPendingRegions() {
-				if region.isStoped() {
+				if region.isStopped() {
 					// It means it's a special task for stopping the table.
 					continue
 				}
@@ -254,30 +254,6 @@ func (s *requestedStream) receive(
 }
 
 func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *requestedStore) (err error) {
-	doSend := func(cc *sharedconn.ConnAndClient, req *cdcpb.ChangeDataRequest, subscriptionID SubscriptionID) error {
-		if err := cc.Client().Send(req); err != nil {
-			log.Warn("event feed send request to grpc stream failed",
-				zap.String("namespace", c.changefeed.Namespace),
-				zap.String("changefeed", c.changefeed.ID),
-				zap.Uint64("streamID", s.streamID),
-				zap.Any("subscriptionID", subscriptionID),
-				zap.Uint64("regionID", req.RegionId),
-				zap.Uint64("storeID", rs.storeID),
-				zap.String("addr", rs.storeAddr),
-				zap.Error(err))
-			return errors.Trace(err)
-		}
-		log.Debug("event feed send request to grpc stream success",
-			zap.String("namespace", c.changefeed.Namespace),
-			zap.String("changefeed", c.changefeed.ID),
-			zap.Uint64("streamID", s.streamID),
-			zap.Any("subscriptionID", subscriptionID),
-			zap.Uint64("regionID", req.RegionId),
-			zap.Uint64("storeID", rs.storeID),
-			zap.String("addr", rs.storeAddr))
-		return nil
-	}
-
 	fetchMoreReq := func() (regionInfo, error) {
 		waitReqTicker := time.NewTicker(60 * time.Second)
 		defer waitReqTicker.Stop()
@@ -329,23 +305,24 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
 	s.preFetchForConnecting = nil
 	for {
 		subscriptionID := region.subscribedTable.subscriptionID
-		log.Debug("event feed gets a singleRegionInfo",
-			zap.String("namespace", c.changefeed.Namespace),
-			zap.String("changefeed", c.changefeed.ID),
-			zap.Uint64("streamID", s.streamID),
-			zap.Any("subscriptionID", subscriptionID),
-			zap.Uint64("regionID", region.verID.GetID()),
-			zap.Uint64("storeID", rs.storeID),
-			zap.String("addr", rs.storeAddr))
 		// It means it's a special task for stopping the table.
-		if region.isStoped() {
+		if region.isStopped() {
 			if s.multiplexing != nil {
 				req := &cdcpb.ChangeDataRequest{
 					RequestId: uint64(subscriptionID),
 					Request:   &cdcpb.ChangeDataRequest_Deregister_{},
 				}
-				if err = doSend(s.multiplexing, req, subscriptionID); err != nil {
-					return err
+				if err = s.multiplexing.Client().Send(req); err != nil {
+					log.Warn("event feed send deregister request to grpc stream failed",
+						zap.String("namespace", c.changefeed.Namespace),
+						zap.String("changefeed", c.changefeed.ID),
+						zap.Uint64("streamID", s.streamID),
+						zap.Any("subscriptionID", subscriptionID),
+						zap.Int64("tableID", region.span.TableID),
+						zap.Uint64("regionID", req.RegionId),
+						zap.Uint64("storeID", rs.storeID),
+						zap.String("addr", rs.storeAddr),
+						zap.Error(err))
 				}
 			} else if cc := tableExclusives[subscriptionID]; cc != nil {
 				delete(tableExclusives, subscriptionID)
@@ -385,8 +362,17 @@ func (s *requestedStream) send(ctx context.Context, c *SharedClient, rs *request
 		} else if cc, err = getTableExclusiveConn(subscriptionID); err != nil {
 			return err
 		}
-		if err = doSend(cc, c.createRegionRequest(region), subscriptionID); err != nil {
-			return err
+		if err = cc.Client().Send(c.createRegionRequest(region)); err != nil {
+			log.Warn("event feed send request to grpc stream failed",
+				zap.String("namespace", c.changefeed.Namespace),
+				zap.String("changefeed", c.changefeed.ID),
+				zap.Uint64("streamID", s.streamID),
+				zap.Any("subscriptionID", subscriptionID),
+				zap.Uint64("regionID", region.verID.GetID()),
+				zap.Int64("tableID", region.span.TableID),
+				zap.Uint64("storeID", rs.storeID),
+				zap.String("addr", rs.storeAddr),
+				zap.Error(err))
 		}
 	}
 
@@ -483,14 +469,19 @@ func (s *requestedStream) sendRegionChangeEvents(
 		state := s.getState(subscriptionID, regionID)
 		switch x := event.Event.(type) {
 		case *cdcpb.Event_Error:
-			s.logRegionDetails("event feed receives a region error",
+			fields := []zap.Field{
 				zap.String("namespace", c.changefeed.Namespace),
 				zap.String("changefeed", c.changefeed.ID),
 				zap.Uint64("streamID", s.streamID),
 				zap.Any("subscriptionID", subscriptionID),
 				zap.Uint64("regionID", event.RegionId),
 				zap.Bool("stateIsNil", state == nil),
-				zap.Any("error", x.Error))
+				zap.Any("error", x.Error),
+			}
+			if state != nil {
+				fields = append(fields, zap.Int64("tableID", state.region.span.TableID))
+			}
+			s.logRegionDetails("event feed receives a region error", fields...)
 		}
 
 		if state != nil {
diff --git a/cdc/processor/processor.go b/cdc/processor/processor.go
index 76bdbfb502f..dd0aede484b 100644
--- a/cdc/processor/processor.go
+++ b/cdc/processor/processor.go
@@ -599,13 +599,13 @@ func isMysqlCompatibleBackend(sinkURIStr string) (bool, error) {
 }
 
 // lazyInitImpl create Filter, SchemaStorage, Mounter instances at the first tick.
-func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
+func (p *processor) lazyInitImpl(_ context.Context) (err error) {
 	if p.initialized.Load() {
 		return nil
 	}
 	// Here we use a separated context for sub-components, so we can custom the
 	// order of stopping all sub-components when closing the processor.
-	prcCtx := context.Background()
+	ctx := context.Background()
 
 	var tz *time.Location
 	// todo: get the timezone from the global config or the changefeed config?
@@ -622,21 +622,21 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
 		return errors.Trace(err)
 	}
 
-	if err = p.initDDLHandler(prcCtx); err != nil {
+	if err = p.initDDLHandler(ctx); err != nil {
 		return err
 	}
 	p.ddlHandler.name = "ddlHandler"
 	p.ddlHandler.changefeedID = p.changefeedID
-	p.ddlHandler.spawn(prcCtx)
+	p.ddlHandler.spawn(ctx)
 
 	p.mg.r = entry.NewMounterGroup(p.ddlHandler.r.schemaStorage,
 		cfConfig.Mounter.WorkerNum,
 		p.filter, tz, p.changefeedID, cfConfig.Integrity)
 	p.mg.name = "MounterGroup"
 	p.mg.changefeedID = p.changefeedID
-	p.mg.spawn(prcCtx)
+	p.mg.spawn(ctx)
 
-	sourceID, err := pdutil.GetSourceID(prcCtx, p.upstream.PDClient)
+	sourceID, err := pdutil.GetSourceID(ctx, p.upstream.PDClient)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -646,7 +646,7 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
 	p.redo.r = redo.NewDMLManager(p.changefeedID, cfConfig.Consistent)
 	p.redo.name = "RedoManager"
 	p.redo.changefeedID = p.changefeedID
-	p.redo.spawn(prcCtx)
+	p.redo.spawn(ctx)
 
 	sortEngine, err := p.globalVars.SortEngineFactory.Create(p.changefeedID)
 	log.Info("Processor creates sort engine",
@@ -668,18 +668,18 @@ func (p *processor) lazyInitImpl(etcdCtx context.Context) (err error) {
 		isMysqlBackend)
 	p.sourceManager.name = "SourceManager"
 	p.sourceManager.changefeedID = p.changefeedID
-	p.sourceManager.spawn(prcCtx)
+	p.sourceManager.spawn(ctx)
 
 	p.sinkManager.r = sinkmanager.New(
 		p.changefeedID, p.latestInfo.SinkURI, cfConfig, p.upstream,
 		p.ddlHandler.r.schemaStorage, p.redo.r, p.sourceManager.r, isMysqlBackend)
 	p.sinkManager.name = "SinkManager"
 	p.sinkManager.changefeedID = p.changefeedID
-	p.sinkManager.spawn(prcCtx)
+	p.sinkManager.spawn(ctx)
 
 	// Bind them so that sourceManager can notify sinkManager.r.
 	p.sourceManager.r.OnResolve(p.sinkManager.r.UpdateReceivedSorterResolvedTs)
-	p.agent, err = p.newAgent(prcCtx, p.liveness, p.changefeedEpoch, p.cfg, p.ownerCaptureInfoClient)
+	p.agent, err = p.newAgent(ctx, p.liveness, p.changefeedEpoch, p.cfg, p.ownerCaptureInfoClient)
 	if err != nil {
 		return err
 	}
diff --git a/cdc/processor/tablepb/table.go b/cdc/processor/tablepb/table.go
index 23d10bafcde..66010ce6edb 100644
--- a/cdc/processor/tablepb/table.go
+++ b/cdc/processor/tablepb/table.go
@@ -72,30 +72,30 @@ func (k Key) MarshalJSON() ([]byte, error) {
 }
 
 var (
-	_ encoding.TextMarshaler = Span{}
+	_ encoding.TextMarshaler = &Span{}
 	_ encoding.TextMarshaler = (*Span)(nil)
 )
 
 // MarshalText implements encoding.TextMarshaler (used in proto.CompactTextString).
 // It is helpful to format span in log.
-func (s Span) MarshalText() ([]byte, error) {
+func (s *Span) MarshalText() ([]byte, error) {
 	return []byte(s.String()), nil
 }
 
 func (s *Span) String() string {
-	length := len("{table_id:,start_key:,end_key:}")
+	length := len("{tableID:, startKey:, endKey:}")
 	length += 8 // for TableID
 	length += len(s.StartKey) + len(s.EndKey)
-	b := strings.Builder{}
+	var b strings.Builder
 	b.Grow(length)
-	b.Write([]byte("{table_id:"))
+	b.Write([]byte("{tableID:"))
 	b.Write([]byte(strconv.Itoa(int(s.TableID))))
 	if len(s.StartKey) > 0 {
-		b.Write([]byte(",start_key:"))
+		b.Write([]byte(", startKey:"))
 		b.Write([]byte(s.StartKey.String()))
 	}
 	if len(s.EndKey) > 0 {
-		b.Write([]byte(",end_key:"))
+		b.Write([]byte(", endKey:"))
 		b.Write([]byte(s.EndKey.String()))
 	}
 	b.Write([]byte("}"))
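The table.go hunk above switches the span's log format from "{table_id:,start_key:,end_key:}" to "{tableID:, startKey:, endKey:}". Below is a minimal standalone sketch (not part of the patch) that mirrors that String() logic, just to show what the new format looks like in a log line; the span struct and the hex keys are simplified stand-ins for the real tablepb.Span and tablepb.Key, which render keys through their own String() method.

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// span is a simplified stand-in for tablepb.Span, used only to illustrate
// the log-friendly format produced by the patched String() method.
type span struct {
	TableID  int64
	StartKey string // assumed to be already hex-encoded, unlike the real tablepb.Key
	EndKey   string
}

func (s *span) String() string {
	// Pre-size the builder roughly, mirroring the patched implementation.
	length := len("{tableID:, startKey:, endKey:}")
	length += 8 // rough room for the TableID digits
	length += len(s.StartKey) + len(s.EndKey)

	var b strings.Builder
	b.Grow(length)
	b.WriteString("{tableID:")
	b.WriteString(strconv.FormatInt(s.TableID, 10))
	if len(s.StartKey) > 0 {
		b.WriteString(", startKey:")
		b.WriteString(s.StartKey)
	}
	if len(s.EndKey) > 0 {
		b.WriteString(", endKey:")
		b.WriteString(s.EndKey)
	}
	b.WriteString("}")
	return b.String()
}

func main() {
	s := &span{TableID: 104, StartKey: "7480000000000000ff68", EndKey: "7480000000000000ff69"}
	// Prints: {tableID:104, startKey:7480000000000000ff68, endKey:7480000000000000ff69}
	fmt.Println(s.String())
}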