Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: reduce rand NewSource #8675

Merged
merged 2 commits into from
Sep 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 52 additions & 47 deletions pkg/schedule/checker/replica_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

import (
"fmt"
"math/rand"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
"github.com/pingcap/log"
Expand Down Expand Up @@ -45,6 +47,7 @@
cluster sche.CheckerCluster
conf config.CheckerConfigProvider
pendingProcessedRegions *cache.TTLUint64
r *rand.Rand
}

// NewReplicaChecker creates a replica checker.
Expand All @@ -53,6 +56,7 @@
cluster: cluster,
conf: conf,
pendingProcessedRegions: pendingProcessedRegions,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

Expand All @@ -67,40 +71,40 @@
}

// Check verifies a region's replicas, creating an operator.Operator if needed.
func (r *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
func (c *ReplicaChecker) Check(region *core.RegionInfo) *operator.Operator {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it seem to contain the change from another pr?

Copy link
Member Author

@rleungx rleungx Sep 29, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this change is needed since we now use `r` as the field's name.

replicaCheckerCounter.Inc()
if r.IsPaused() {
if c.IsPaused() {
replicaCheckerPausedCounter.Inc()
return nil
}
if op := r.checkDownPeer(region); op != nil {
if op := c.checkDownPeer(region); op != nil {
replicaCheckerNewOpCounter.Inc()
op.SetPriorityLevel(constant.High)
return op
}
if op := r.checkOfflinePeer(region); op != nil {
if op := c.checkOfflinePeer(region); op != nil {
replicaCheckerNewOpCounter.Inc()
op.SetPriorityLevel(constant.High)
return op
}
if op := r.checkMakeUpReplica(region); op != nil {
if op := c.checkMakeUpReplica(region); op != nil {
replicaCheckerNewOpCounter.Inc()
op.SetPriorityLevel(constant.High)
return op
}
if op := r.checkRemoveExtraReplica(region); op != nil {
if op := c.checkRemoveExtraReplica(region); op != nil {
replicaCheckerNewOpCounter.Inc()
return op
}
if op := r.checkLocationReplacement(region); op != nil {
if op := c.checkLocationReplacement(region); op != nil {
replicaCheckerNewOpCounter.Inc()
return op
}
return nil
}

func (r *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsRemoveDownReplicaEnabled() {
func (c *ReplicaChecker) checkDownPeer(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsRemoveDownReplicaEnabled() {
return nil
}

Expand All @@ -110,22 +114,22 @@
continue
}
storeID := peer.GetStoreId()
store := r.cluster.GetStore(storeID)
store := c.cluster.GetStore(storeID)
if store == nil {
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return nil
}
// Only consider the state of the Store, not `stats.DownSeconds`.
if store.DownTime() < r.conf.GetMaxStoreDownTime() {
if store.DownTime() < c.conf.GetMaxStoreDownTime() {
continue
}
return r.fixPeer(region, storeID, downStatus)
return c.fixPeer(region, storeID, downStatus)
}
return nil
}

func (r *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsReplaceOfflineReplicaEnabled() {
func (c *ReplicaChecker) checkOfflinePeer(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsReplaceOfflineReplicaEnabled() {
return nil
}

Expand All @@ -136,7 +140,7 @@

for _, peer := range region.GetPeers() {
storeID := peer.GetStoreId()
store := r.cluster.GetStore(storeID)
store := c.cluster.GetStore(storeID)
if store == nil {
log.Warn("lost the store, maybe you are recovering the PD cluster", zap.Uint64("store-id", storeID))
return nil
Expand All @@ -145,71 +149,71 @@
continue
}

return r.fixPeer(region, storeID, offlineStatus)
return c.fixPeer(region, storeID, offlineStatus)
}

return nil
}

func (r *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsMakeUpReplicaEnabled() {
func (c *ReplicaChecker) checkMakeUpReplica(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsMakeUpReplicaEnabled() {
return nil
}
if len(region.GetPeers()) >= r.conf.GetMaxReplicas() {
if len(region.GetPeers()) >= c.conf.GetMaxReplicas() {
return nil
}
log.Debug("region has fewer than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
regionStores := r.cluster.GetRegionStores(region)
target, filterByTempState := r.strategy(region).SelectStoreToAdd(regionStores)
regionStores := c.cluster.GetRegionStores(region)
target, filterByTempState := c.strategy(c.r, region).SelectStoreToAdd(regionStores)
if target == 0 {
log.Debug("no store to add replica", zap.Uint64("region-id", region.GetID()))
replicaCheckerNoTargetStoreCounter.Inc()
if filterByTempState {
r.pendingProcessedRegions.Put(region.GetID(), nil)
c.pendingProcessedRegions.Put(region.GetID(), nil)

Check warning on line 172 in pkg/schedule/checker/replica_checker.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/checker/replica_checker.go#L172

Added line #L172 was not covered by tests
}
return nil
}
newPeer := &metapb.Peer{StoreId: target}
op, err := operator.CreateAddPeerOperator("make-up-replica", r.cluster, region, newPeer, operator.OpReplica)
op, err := operator.CreateAddPeerOperator("make-up-replica", c.cluster, region, newPeer, operator.OpReplica)
if err != nil {
log.Debug("create make-up-replica operator fail", errs.ZapError(err))
return nil
}
return op
}

func (r *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsRemoveExtraReplicaEnabled() {
func (c *ReplicaChecker) checkRemoveExtraReplica(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsRemoveExtraReplicaEnabled() {
return nil
}
// when adding a learner peer, the number of peers will exceed max replicas for a while,
// so just compare the number of voters to avoid too many cancel add operator logs.
if len(region.GetVoters()) <= r.conf.GetMaxReplicas() {
if len(region.GetVoters()) <= c.conf.GetMaxReplicas() {
return nil
}
log.Debug("region has more than max replicas", zap.Uint64("region-id", region.GetID()), zap.Int("peers", len(region.GetPeers())))
regionStores := r.cluster.GetRegionStores(region)
old := r.strategy(region).SelectStoreToRemove(regionStores)
regionStores := c.cluster.GetRegionStores(region)
old := c.strategy(c.r, region).SelectStoreToRemove(regionStores)
if old == 0 {
replicaCheckerNoWorstPeerCounter.Inc()
r.pendingProcessedRegions.Put(region.GetID(), nil)
c.pendingProcessedRegions.Put(region.GetID(), nil)

Check warning on line 199 in pkg/schedule/checker/replica_checker.go

View check run for this annotation

Codecov / codecov/patch

pkg/schedule/checker/replica_checker.go#L199

Added line #L199 was not covered by tests
return nil
}
op, err := operator.CreateRemovePeerOperator("remove-extra-replica", r.cluster, operator.OpReplica, region, old)
op, err := operator.CreateRemovePeerOperator("remove-extra-replica", c.cluster, operator.OpReplica, region, old)
if err != nil {
replicaCheckerCreateOpFailedCounter.Inc()
return nil
}
return op
}

func (r *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
if !r.conf.IsLocationReplacementEnabled() {
func (c *ReplicaChecker) checkLocationReplacement(region *core.RegionInfo) *operator.Operator {
if !c.conf.IsLocationReplacementEnabled() {
return nil
}

strategy := r.strategy(region)
regionStores := r.cluster.GetRegionStores(region)
strategy := c.strategy(c.r, region)
regionStores := c.cluster.GetRegionStores(region)
oldStore := strategy.SelectStoreToRemove(regionStores)
if oldStore == 0 {
replicaCheckerAllRightCounter.Inc()
Expand All @@ -223,19 +227,19 @@
}

newPeer := &metapb.Peer{StoreId: newStore}
op, err := operator.CreateMovePeerOperator("move-to-better-location", r.cluster, region, operator.OpReplica, oldStore, newPeer)
op, err := operator.CreateMovePeerOperator("move-to-better-location", c.cluster, region, operator.OpReplica, oldStore, newPeer)
if err != nil {
replicaCheckerCreateOpFailedCounter.Inc()
return nil
}
return op
}

func (r *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
func (c *ReplicaChecker) fixPeer(region *core.RegionInfo, storeID uint64, status string) *operator.Operator {
// Check the number of replicas first.
if len(region.GetVoters()) > r.conf.GetMaxReplicas() {
if len(region.GetVoters()) > c.conf.GetMaxReplicas() {
removeExtra := fmt.Sprintf("remove-extra-%s-replica", status)
op, err := operator.CreateRemovePeerOperator(removeExtra, r.cluster, operator.OpReplica, region, storeID)
op, err := operator.CreateRemovePeerOperator(removeExtra, c.cluster, operator.OpReplica, region, storeID)
if err != nil {
if status == offlineStatus {
replicaCheckerRemoveExtraOfflineFailedCounter.Inc()
Expand All @@ -247,8 +251,8 @@
return op
}

regionStores := r.cluster.GetRegionStores(region)
target, filterByTempState := r.strategy(region).SelectStoreToFix(regionStores, storeID)
regionStores := c.cluster.GetRegionStores(region)
target, filterByTempState := c.strategy(c.r, region).SelectStoreToFix(regionStores, storeID)
if target == 0 {
if status == offlineStatus {
replicaCheckerNoStoreOfflineCounter.Inc()
Expand All @@ -257,13 +261,13 @@
}
log.Debug("no best store to add replica", zap.Uint64("region-id", region.GetID()))
if filterByTempState {
r.pendingProcessedRegions.Put(region.GetID(), nil)
c.pendingProcessedRegions.Put(region.GetID(), nil)
}
return nil
}
newPeer := &metapb.Peer{StoreId: target}
replace := fmt.Sprintf("replace-%s-replica", status)
op, err := operator.CreateMovePeerOperator(replace, r.cluster, region, operator.OpReplica, storeID, newPeer)
op, err := operator.CreateMovePeerOperator(replace, c.cluster, region, operator.OpReplica, storeID, newPeer)
if err != nil {
if status == offlineStatus {
replicaCheckerReplaceOfflineFailedCounter.Inc()
Expand All @@ -275,12 +279,13 @@
return op
}

func (r *ReplicaChecker) strategy(region *core.RegionInfo) *ReplicaStrategy {
func (c *ReplicaChecker) strategy(r *rand.Rand, region *core.RegionInfo) *ReplicaStrategy {
return &ReplicaStrategy{
checkerName: r.Name(),
cluster: r.cluster,
locationLabels: r.conf.GetLocationLabels(),
isolationLevel: r.conf.GetIsolationLevel(),
checkerName: c.Name(),
cluster: c.cluster,
locationLabels: c.conf.GetLocationLabels(),
isolationLevel: c.conf.GetIsolationLevel(),
region: region,
r: r,
}
}
7 changes: 5 additions & 2 deletions pkg/schedule/checker/replica_strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package checker

import (
"math/rand"

"github.com/pingcap/log"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/core/constant"
Expand All @@ -26,6 +28,7 @@ import (
// ReplicaStrategy collects some utilities to manipulate region peers. It
// exists to allow replica_checker and rule_checker to reuse common logic.
type ReplicaStrategy struct {
r *rand.Rand
checkerName string // replica-checker / rule-checker
cluster sche.CheckerCluster
locationLabels []string
Expand Down Expand Up @@ -76,7 +79,7 @@ func (s *ReplicaStrategy) SelectStoreToAdd(coLocationStores []*core.StoreInfo, e

isolationComparer := filter.IsolationComparer(s.locationLabels, coLocationStores)
strictStateFilter := &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, AllowFastFailover: s.fastFailover, OperatorLevel: level}
targetCandidate := filter.NewCandidates(s.cluster.GetStores()).
targetCandidate := filter.NewCandidates(s.r, s.cluster.GetStores()).
FilterTarget(s.cluster.GetCheckerConfig(), nil, nil, filters...).
KeepTheTopStores(isolationComparer, false) // greater isolation score is better
if targetCandidate.Len() == 0 {
Expand Down Expand Up @@ -143,7 +146,7 @@ func (s *ReplicaStrategy) SelectStoreToRemove(coLocationStores []*core.StoreInfo
if s.fastFailover {
level = constant.Urgent
}
source := filter.NewCandidates(coLocationStores).
source := filter.NewCandidates(s.r, coLocationStores).
FilterSource(s.cluster.GetCheckerConfig(), nil, nil, &filter.StoreStateFilter{ActionScope: s.checkerName, MoveRegion: true, OperatorLevel: level}).
KeepTheTopStores(isolationComparer, true).
PickTheTopStore(filter.RegionScoreComparer(s.cluster.GetCheckerConfig()), false)
Expand Down
12 changes: 8 additions & 4 deletions pkg/schedule/checker/rule_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"
"errors"
"math"
"math/rand"
"time"

"github.com/pingcap/kvproto/pkg/metapb"
Expand Down Expand Up @@ -56,6 +57,7 @@ type RuleChecker struct {
pendingList cache.Cache
switchWitnessCache *cache.TTLUint64
record *recorder
r *rand.Rand
}

// NewRuleChecker creates a checker instance.
Expand All @@ -67,6 +69,7 @@ func NewRuleChecker(ctx context.Context, cluster sche.CheckerCluster, ruleManage
pendingList: cache.NewDefaultCache(maxPendingListLen),
switchWitnessCache: cache.NewIDTTL(ctx, time.Minute, cluster.GetCheckerConfig().GetSwitchWitnessInterval()),
record: newRecord(),
r: rand.New(rand.NewSource(time.Now().UnixNano())),
}
}

Expand Down Expand Up @@ -201,7 +204,7 @@ func (c *RuleChecker) addRulePeer(region *core.RegionInfo, fit *placement.Region
ruleStores := c.getRuleFitStores(rf)
isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
// If the peer to be added is a witness, since no snapshot is needed, we also reuse the fast failover logic.
store, filterByTempState := c.strategy(region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
store, filterByTempState := c.strategy(c.r, region, rf.Rule, isWitness).SelectStoreToAdd(ruleStores)
if store == 0 {
ruleCheckerNoStoreAddCounter.Inc()
c.handleFilterState(region, filterByTempState)
Expand Down Expand Up @@ -252,7 +255,7 @@ func (c *RuleChecker) replaceUnexpectedRulePeer(region *core.RegionInfo, rf *pla
fastFailover = false
}
ruleStores := c.getRuleFitStores(rf)
store, filterByTempState := c.strategy(region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
store, filterByTempState := c.strategy(c.r, region, rf.Rule, fastFailover).SelectStoreToFix(ruleStores, peer.GetStoreId())
if store == 0 {
ruleCheckerNoStoreReplaceCounter.Inc()
c.handleFilterState(region, filterByTempState)
Expand Down Expand Up @@ -393,7 +396,7 @@ func (c *RuleChecker) fixBetterLocation(region *core.RegionInfo, rf *placement.R

isWitness := rf.Rule.IsWitness && c.isWitnessEnabled()
// If the peer to be moved is a witness, since no snapshot is needed, we also reuse the fast failover logic.
strategy := c.strategy(region, rf.Rule, isWitness)
strategy := c.strategy(c.r, region, rf.Rule, isWitness)
ruleStores := c.getRuleFitStores(rf)
oldStore := strategy.SelectStoreToRemove(ruleStores)
if oldStore == 0 {
Expand Down Expand Up @@ -618,7 +621,7 @@ func (c *RuleChecker) hasAvailableWitness(region *core.RegionInfo, peer *metapb.
return nil, false
}

func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
func (c *RuleChecker) strategy(r *rand.Rand, region *core.RegionInfo, rule *placement.Rule, fastFailover bool) *ReplicaStrategy {
return &ReplicaStrategy{
checkerName: c.Name(),
cluster: c.cluster,
Expand All @@ -627,6 +630,7 @@ func (c *RuleChecker) strategy(region *core.RegionInfo, rule *placement.Rule, fa
region: region,
extraFilters: []filter.Filter{filter.NewLabelConstraintFilter(c.Name(), rule.LabelConstraints)},
fastFailover: fastFailover,
r: r,
}
}

Expand Down
5 changes: 2 additions & 3 deletions pkg/schedule/filter/candidates.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ package filter
import (
"math/rand"
"sort"
"time"

"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/schedule/config"
Expand All @@ -32,8 +31,8 @@ type StoreCandidates struct {
}

// NewCandidates creates StoreCandidates with store list.
func NewCandidates(stores []*core.StoreInfo) *StoreCandidates {
return &StoreCandidates{r: rand.New(rand.NewSource(time.Now().UnixNano())), Stores: stores}
func NewCandidates(r *rand.Rand, stores []*core.StoreInfo) *StoreCandidates {
return &StoreCandidates{r: r, Stores: stores}
}

// FilterSource keeps stores that can pass all source filters.
Expand Down
Loading