*: fix region stats check (#7748)
close #7728

Signed-off-by: Cabinfever_B <[email protected]>
CabinfeverB committed Feb 6, 2024
1 parent 0419004 commit cce1464
Showing 10 changed files with 136 additions and 31 deletions.
2 changes: 1 addition & 1 deletion pkg/cluster/cluster.go
@@ -55,7 +55,7 @@ func HandleOverlaps(c Cluster, overlaps []*core.RegionInfo) {
 }
 
 // Collect collects the cluster information.
-func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats, isNew, isPrepared bool) {
+func Collect(c Cluster, region *core.RegionInfo, stores []*core.StoreInfo, hasRegionStats bool) {
     if hasRegionStats {
         c.GetRegionStats().Observe(region, stores)
     }
20 changes: 10 additions & 10 deletions pkg/core/region.go
@@ -98,6 +98,12 @@ func (r *RegionInfo) LoadedFromStorage() bool {
     return r.source == Storage
 }
 
+// LoadedFromSync means this region's meta info was loaded from the region syncer.
+// Only used for tests.
+func (r *RegionInfo) LoadedFromSync() bool {
+    return r.source == Sync
+}
+
 // NewRegionInfo creates RegionInfo with region's meta and leader peer.
 func NewRegionInfo(region *metapb.Region, leader *metapb.Peer, opts ...RegionCreateOption) *RegionInfo {
     regionInfo := &RegionInfo{
@@ -705,7 +711,7 @@ func (r *RegionInfo) isRegionRecreated() bool {
 
 // RegionGuideFunc is a function that determines which follow-up operations need to be performed based on the origin
 // and new region information.
-type RegionGuideFunc func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool)
+type RegionGuideFunc func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool)
 
 // GenerateRegionGuideFunc is used to generate a RegionGuideFunc. Control the log output by specifying the log function.
 // nil means do not print the log.
@@ -718,19 +724,15 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
     }
     // Save to storage if meta is updated.
     // Save to cache if meta or leader is updated, or contains any down/pending peer.
-    // Mark isNew if the region in cache does not have leader.
-    return func(region, origin *RegionInfo) (isNew, saveKV, saveCache, needSync bool) {
+    return func(region, origin *RegionInfo) (saveKV, saveCache, needSync bool) {
         if origin == nil {
             if log.GetLevel() <= zap.DebugLevel {
                 debug("insert new region",
                     zap.Uint64("region-id", region.GetID()),
                     logutil.ZapRedactStringer("meta-region", RegionToHexMeta(region.GetMeta())))
             }
-            saveKV, saveCache, isNew = true, true, true
+            saveKV, saveCache = true, true
         } else {
-            if origin.LoadedFromStorage() {
-                isNew = true
-            }
             r := region.GetRegionEpoch()
             o := origin.GetRegionEpoch()
             if r.GetVersion() > o.GetVersion() {
@@ -756,9 +758,7 @@ func GenerateRegionGuideFunc(enableLog bool) RegionGuideFunc {
                 saveKV, saveCache = true, true
             }
             if region.GetLeader().GetId() != origin.GetLeader().GetId() {
-                if origin.GetLeader().GetId() == 0 {
-                    isNew = true
-                } else if log.GetLevel() <= zap.InfoLevel {
+                if origin.GetLeader().GetId() != 0 && log.GetLevel() <= zap.InfoLevel {
                     info("leader changed",
                         zap.Uint64("region-id", region.GetID()),
                         zap.Uint64("from", origin.GetLeader().GetStoreId()),
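
Note: RegionGuideFunc now reports only (saveKV, saveCache, needSync); whether a region is "new" to the statistics is decided later by RegionStatistics.isObserved (see pkg/statistics/region_collection.go below). A minimal, self-contained sketch of the new decision flow, using simplified stand-in types rather than pd's real RegionInfo; the leader-change branch is abridged from the full implementation:

package main

import "fmt"

// Simplified stand-ins for pd's RegionInfo and region epoch; names are illustrative only.
type epoch struct{ version, confVer uint64 }

type region struct {
    id     uint64
    epoch  epoch
    leader uint64 // leader peer ID; 0 means no leader known
}

// guide mirrors the shape of the new RegionGuideFunc: it returns only
// (saveKV, saveCache, needSync), with no isNew flag.
func guide(cur, origin *region) (saveKV, saveCache, needSync bool) {
    if origin == nil {
        // First time this region is seen: persist and cache it.
        return true, true, false
    }
    if cur.epoch.version > origin.epoch.version || cur.epoch.confVer > origin.epoch.confVer {
        // A split/merge or membership change bumped the epoch.
        saveKV, saveCache = true, true
    }
    if cur.leader != origin.leader {
        // Leader changed: refresh the cache and sync to followers
        // (assumed from the full pd implementation, which this sketch abridges).
        saveCache, needSync = true, true
    }
    return
}

func main() {
    origin := &region{id: 10, epoch: epoch{version: 1, confVer: 1}, leader: 101}
    cur := &region{id: 10, epoch: epoch{version: 2, confVer: 1}, leader: 102}
    saveKV, saveCache, needSync := guide(cur, origin)
    fmt.Println(saveKV, saveCache, needSync) // true true true
}
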
2 changes: 1 addition & 1 deletion pkg/core/region_test.go
@@ -363,7 +363,7 @@ func TestNeedSync(t *testing.T) {
     for _, testCase := range testCases {
         regionA := region.Clone(testCase.optionsA...)
         regionB := region.Clone(testCase.optionsB...)
-        _, _, _, needSync := RegionGuide(regionA, regionB)
+        _, _, needSync := RegionGuide(regionA, regionB)
         re.Equal(testCase.needSync, needSync)
     }
 }
7 changes: 3 additions & 4 deletions pkg/mcs/scheduling/server/cluster.go
@@ -557,9 +557,8 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
     hasRegionStats := c.regionStats != nil
     // Save to storage if meta is updated, except for flashback.
     // Save to cache if meta or leader is updated, or contains any down/pending peer.
-    // Mark isNew if the region in cache does not have leader.
-    isNew, _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
-    if !saveCache && !isNew {
+    _, saveCache, _ := core.GenerateRegionGuideFunc(true)(region, origin)
+    if !saveCache {
         // Due to some config changes need to update the region stats as well,
         // so we do some extra checks here.
         if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
@@ -581,7 +580,7 @@ func (c *Cluster) processRegionHeartbeat(region *core.RegionInfo) error {
         cluster.HandleOverlaps(c, overlaps)
     }
 
-    cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, c.IsPrepared())
+    cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats)
     return nil
 }
2 changes: 1 addition & 1 deletion pkg/mcs/scheduling/server/grpc_service.go
@@ -158,7 +158,7 @@ func (s *Service) RegionHeartbeat(stream schedulingpb.Scheduling_RegionHeartbeat
         s.hbStreams.BindStream(storeID, server)
         lastBind = time.Now()
     }
-    region := core.RegionFromHeartbeat(request, core.SetSource(core.Heartbeat))
+    region := core.RegionFromHeartbeat(request)
     err = c.HandleRegionHeartbeat(region)
     if err != nil {
         // TODO: if we need to send the error back to API server.
25 changes: 20 additions & 5 deletions pkg/statistics/region_collection.go
@@ -33,6 +33,8 @@ type RegionInfoProvider interface {
 // RegionStatisticType represents the type of the region's status.
 type RegionStatisticType uint32
 
+const emptyStatistic = RegionStatisticType(0)
+
 // region status type
 const (
     MissPeer RegionStatisticType = 1 << iota
@@ -148,6 +150,9 @@ func (r *RegionStatistics) deleteEntry(deleteIndex RegionStatisticType, regionID
 // due to some special state types.
 func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
     regionID := region.GetID()
+    if !r.isObserved(regionID) {
+        return true
+    }
     if r.IsRegionStatsType(regionID, OversizedRegion) !=
         region.IsOversized(int64(r.conf.GetRegionMaxSize()), int64(r.conf.GetRegionMaxKeys())) {
         return true
@@ -156,6 +161,14 @@ func (r *RegionStatistics) RegionStatsNeedUpdate(region *core.RegionInfo) bool {
         region.NeedMerge(int64(r.conf.GetMaxMergeRegionSize()), int64(r.conf.GetMaxMergeRegionKeys()))
 }
 
+// isObserved returns whether the region has been observed, i.e. whether PD has received a heartbeat for this region.
+func (r *RegionStatistics) isObserved(id uint64) bool {
+    r.RLock()
+    defer r.RUnlock()
+    _, ok := r.index[id]
+    return ok
+}
+
 // Observe records the current regions' status.
 func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.StoreInfo) {
     r.Lock()
@@ -164,7 +177,6 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
         desiredReplicas = r.conf.GetMaxReplicas()
         desiredVoters   = desiredReplicas
         peerTypeIndex   RegionStatisticType
-        deleteIndex     RegionStatisticType
     )
     // Check if the region meets count requirements of its rules.
     if r.conf.IsPlacementRulesEnabled() {
@@ -240,10 +252,10 @@ func (r *RegionStatistics) Observe(region *core.RegionInfo, stores []*core.Store
         }
     }
     // Remove the info if any of the conditions are not met any more.
-    if oldIndex, ok := r.index[regionID]; ok {
-        deleteIndex = oldIndex &^ peerTypeIndex
+    if oldIndex, ok := r.index[regionID]; ok && oldIndex > emptyStatistic {
+        deleteIndex := oldIndex &^ peerTypeIndex
+        r.deleteEntry(deleteIndex, regionID)
     }
-    r.deleteEntry(deleteIndex, regionID)
     r.index[regionID] = peerTypeIndex
 }
 
@@ -252,7 +264,10 @@ func (r *RegionStatistics) ClearDefunctRegion(regionID uint64) {
     r.Lock()
    defer r.Unlock()
     if oldIndex, ok := r.index[regionID]; ok {
-        r.deleteEntry(oldIndex, regionID)
+        delete(r.index, regionID)
+        if oldIndex > emptyStatistic {
+            r.deleteEntry(oldIndex, regionID)
+        }
     }
 }
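
RegionStatisticType is a bit set built with 1 << iota, and oldIndex &^ peerTypeIndex uses Go's AND NOT operator to clear exactly the statuses that no longer apply; the new emptyStatistic guard skips the delete when no flags were set. A runnable sketch of that bookkeeping, including the new not-yet-observed check; the miniature regionStatistics below is an illustration, not pd's actual type:

package main

import "fmt"

type RegionStatisticType uint32

const emptyStatistic = RegionStatisticType(0)

// A few of pd's status flags, each occupying one bit.
const (
    MissPeer RegionStatisticType = 1 << iota
    ExtraPeer
    OversizedRegion
    LearnerPeer
)

// regionStatistics is a miniature model: index maps region ID to its current flags.
type regionStatistics struct {
    index map[uint64]RegionStatisticType
}

// isObserved reports whether a heartbeat for the region has ever been recorded.
// RegionStatsNeedUpdate returns true immediately for unobserved regions, which
// is what repopulates the stats on a newly elected PD leader.
func (r *regionStatistics) isObserved(id uint64) bool {
    _, ok := r.index[id]
    return ok
}

func (r *regionStatistics) observe(id uint64, newFlags RegionStatisticType) {
    if oldFlags, ok := r.index[id]; ok && oldFlags > emptyStatistic {
        // Bit-clear: keep only the flags set in oldFlags but not in newFlags.
        deleted := oldFlags &^ newFlags
        fmt.Printf("region %d: clearing flags %04b\n", id, deleted)
    }
    r.index[id] = newFlags
}

func main() {
    r := &regionStatistics{index: map[uint64]RegionStatisticType{}}
    fmt.Println(r.isObserved(10)) // false, so stats need an update

    r.observe(10, MissPeer|LearnerPeer)
    fmt.Println(r.isObserved(10)) // true

    // The learner was promoted and a peer added: only OversizedRegion remains.
    r.observe(10, OversizedRegion) // clears MissPeer and LearnerPeer
}
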
2 changes: 1 addition & 1 deletion pkg/syncer/client.go
@@ -205,7 +205,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
                 log.Debug("region is stale", zap.Stringer("origin", origin.GetMeta()), errs.ZapError(err))
                 continue
             }
-            _, saveKV, _, _ := regionGuide(region, origin)
+            saveKV, _, _ := regionGuide(region, origin)
             overlaps := bc.PutRegion(region)
 
             if hasBuckets {
2 changes: 2 additions & 0 deletions server/api/region_test.go
@@ -166,6 +166,8 @@ func (suite *regionTestSuite) TestRegionCheck() {
     histKeys := []*histItem{{Start: 1000, End: 1999, Count: 1}}
     re.Equal(histKeys, r7)
 
+    // ref https://github.com/tikv/pd/issues/3558: change the size so the region passes `NeedUpdate` and is observed again.
+    r = r.Clone(core.SetApproximateKeys(0))
     mustPutStore(re, suite.svr, 2, metapb.StoreState_Offline, metapb.NodeState_Removing, []*metapb.StoreLabel{})
     mustRegionHeartbeat(re, suite.svr, r)
     url = fmt.Sprintf("%s/regions/check/%s", suite.urlPrefix, "offline-peer")
11 changes: 3 additions & 8 deletions server/cluster/cluster.go
@@ -1004,9 +1004,8 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
     hasRegionStats := c.regionStats != nil
     // Save to storage if meta is updated, except for flashback.
     // Save to cache if meta or leader is updated, or contains any down/pending peer.
-    // Mark isNew if the region in cache does not have leader.
-    isNew, saveKV, saveCache, needSync := regionGuide(region, origin)
-    if !saveKV && !saveCache && !isNew {
+    saveKV, saveCache, needSync := regionGuide(region, origin)
+    if !saveKV && !saveCache {
         // Due to some config changes need to update the region stats as well,
         // so we do some extra checks here.
         if hasRegionStats && c.regionStats.RegionStatsNeedUpdate(region) {
@@ -1037,11 +1036,7 @@ func (c *RaftCluster) processRegionHeartbeat(region *core.RegionInfo) error {
         regionUpdateCacheEventCounter.Inc()
     }
 
-    isPrepared := true
-    if !c.IsServiceIndependent(mcsutils.SchedulingServiceName) {
-        isPrepared = c.IsPrepared()
-    }
-    cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats, isNew, isPrepared)
+    cluster.Collect(c, region, c.GetRegionStores(region), hasRegionStats)
 
     if c.storage != nil {
         // If there are concurrent heartbeats from the same region, the last write will win even if
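
Both heartbeat paths now share the same fast path: when regionGuide reports nothing to save, the heartbeat can still refresh region statistics if RegionStatsNeedUpdate says so, which is what repopulates stats on a newly elected PD leader. A condensed, self-contained sketch of that control flow; the heartbeat struct and its values are illustrative stand-ins, not pd's API:

package main

import "fmt"

// Stub inputs standing in for regionGuide and RegionStatistics.
type heartbeat struct {
    saveKV, saveCache bool // from regionGuide
    hasRegionStats    bool // regionStats != nil
    statsNeedUpdate   bool // RegionStatsNeedUpdate: true if unobserved or size/merge state changed
}

// process mirrors the fast path in processRegionHeartbeat: if neither storage
// nor cache needs updating, the heartbeat may still trigger a stats-only
// Observe, e.g. for the first heartbeat a new PD leader sees for this region.
func process(hb heartbeat) string {
    if !hb.saveKV && !hb.saveCache {
        if hb.hasRegionStats && hb.statsNeedUpdate {
            return "observe stats, then return"
        }
        return "no-op"
    }
    return "full update: cache, stats, and possibly storage/sync"
}

func main() {
    // A repeated heartbeat after a PD failover: nothing changed, but the new
    // leader has never observed this region, so its stats are refreshed.
    fmt.Println(process(heartbeat{hasRegionStats: true, statsNeedUpdate: true}))
    // A completely unchanged heartbeat on a long-lived leader.
    fmt.Println(process(heartbeat{hasRegionStats: true}))
}
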
94 changes: 94 additions & 0 deletions tests/server/cluster/cluster_test.go
@@ -38,6 +38,7 @@ import (
     sc "github.com/tikv/pd/pkg/schedule/config"
     "github.com/tikv/pd/pkg/schedule/operator"
     "github.com/tikv/pd/pkg/schedule/schedulers"
+    "github.com/tikv/pd/pkg/statistics"
     "github.com/tikv/pd/pkg/storage"
     "github.com/tikv/pd/pkg/syncer"
     "github.com/tikv/pd/pkg/tso"
@@ -181,6 +182,99 @@ func TestDamagedRegion(t *testing.T) {
     re.Equal(uint64(1), rc.GetOperatorController().OperatorCount(operator.OpAdmin))
 }
 
+func TestRegionStatistics(t *testing.T) {
+    re := require.New(t)
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    tc, err := tests.NewTestCluster(ctx, 2)
+    defer tc.Destroy()
+    re.NoError(err)
+
+    err = tc.RunInitialServers()
+    re.NoError(err)
+
+    leaderName := tc.WaitLeader()
+    leaderServer := tc.GetLeaderServer()
+    grpcPDClient := testutil.MustNewGrpcClient(re, leaderServer.GetAddr())
+    clusterID := leaderServer.GetClusterID()
+    bootstrapCluster(re, clusterID, grpcPDClient)
+    rc := leaderServer.GetRaftCluster()
+
+    region := &metapb.Region{
+        Id:       10,
+        StartKey: []byte("abc"),
+        EndKey:   []byte("xyz"),
+        Peers: []*metapb.Peer{
+            {Id: 101, StoreId: 1},
+            {Id: 102, StoreId: 2},
+            {Id: 103, StoreId: 3},
+            {Id: 104, StoreId: 4, Role: metapb.PeerRole_Learner},
+        },
+    }
+
+    // Put the region.
+    regionInfo := core.NewRegionInfo(region, region.Peers[0], core.SetApproximateSize(0))
+    err = tc.HandleRegionHeartbeat(regionInfo)
+    re.NoError(err)
+    regions := rc.GetRegionStatsByType(statistics.LearnerPeer)
+    re.Len(regions, 1)
+
+    // Wait for the region to be synced to the follower PD.
+    time.Sleep(1000 * time.Millisecond)
+
+    leaderServer.ResignLeader()
+    newLeaderName := tc.WaitLeader()
+    re.NotEqual(newLeaderName, leaderName)
+    leaderServer = tc.GetLeaderServer()
+    rc = leaderServer.GetRaftCluster()
+    r := rc.GetRegion(region.Id)
+    re.NotNil(r)
+    re.True(r.LoadedFromSync())
+    regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+    re.Empty(regions)
+    err = tc.HandleRegionHeartbeat(regionInfo)
+    re.NoError(err)
+    regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+    re.Len(regions, 1)
+
+    leaderServer.ResignLeader()
+    newLeaderName = tc.WaitLeader()
+    re.Equal(newLeaderName, leaderName)
+    leaderServer = tc.GetLeaderServer()
+    rc = leaderServer.GetRaftCluster()
+    re.NotNil(r)
+    re.True(r.LoadedFromStorage() || r.LoadedFromSync())
+    regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+    re.Empty(regions)
+    regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30))
+    err = tc.HandleRegionHeartbeat(regionInfo)
+    re.NoError(err)
+    rc = leaderServer.GetRaftCluster()
+    r = rc.GetRegion(region.Id)
+    re.NotNil(r)
+    re.False(r.LoadedFromStorage() && r.LoadedFromSync())
+
+    leaderServer.ResignLeader()
+    newLeaderName = tc.WaitLeader()
+    re.NotEqual(newLeaderName, leaderName)
+    leaderServer.ResignLeader()
+    newLeaderName = tc.WaitLeader()
+    re.Equal(newLeaderName, leaderName)
+    leaderServer = tc.GetLeaderServer()
+    rc = leaderServer.GetRaftCluster()
+    r = rc.GetRegion(region.Id)
+    re.NotNil(r)
+    re.False(r.LoadedFromStorage() && r.LoadedFromSync())
+    regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+    re.Empty(regions)
+
+    regionInfo = regionInfo.Clone(core.SetSource(core.Heartbeat), core.SetApproximateSize(30))
+    err = tc.HandleRegionHeartbeat(regionInfo)
+    re.NoError(err)
+    regions = rc.GetRegionStatsByType(statistics.LearnerPeer)
+    re.Len(regions, 1)
+}
+
 func TestStaleRegion(t *testing.T) {
     re := require.New(t)
     ctx, cancel := context.WithCancel(context.Background())
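
The assertions above hinge on the region's source: a freshly elected leader holds regions loaded from storage or from the region syncer, and only a fresh heartbeat moves the source to Heartbeat. A compact model of those transitions; the constant names follow the diff, while regionInfo and onHeartbeat are illustrative stand-ins:

package main

import "fmt"

// RegionSource mirrors the source values referenced in the diff:
// Storage, Sync, and Heartbeat.
type RegionSource int

const (
    Storage RegionSource = iota // loaded from local storage at startup
    Sync                        // received from the leader via the region syncer
    Heartbeat                   // reported directly by a TiKV heartbeat
)

type regionInfo struct{ source RegionSource }

func (r *regionInfo) LoadedFromStorage() bool { return r.source == Storage }
func (r *regionInfo) LoadedFromSync() bool    { return r.source == Sync }

// onHeartbeat models what a real heartbeat does: whatever the region's
// provenance was, it is now backed by fresh data from the store.
func (r *regionInfo) onHeartbeat() { r.source = Heartbeat }

func main() {
    // A follower promoted to leader holds regions it got from the old leader.
    r := &regionInfo{source: Sync}
    fmt.Println(r.LoadedFromSync()) // true: stats were never observed here

    r.onHeartbeat()
    fmt.Println(r.LoadedFromStorage() || r.LoadedFromSync()) // false
}
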
