Skip to content

Commit

Permalink
tools: support UpdateGCSafePoint and ReportMinResolvedTS for api and …
Browse files Browse the repository at this point in the history
…heartbeat Bench (#7732)

ref #7703

Signed-off-by: Cabinfever_B <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
CabinfeverB and ti-chi-bot[bot] committed Feb 6, 2024
1 parent b2f40b6 commit ba621f0
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 9 deletions.
65 changes: 59 additions & 6 deletions tools/pd-api-bench/cases/cases.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (
"context"
"fmt"
"math/rand"
"strconv"
"time"

"github.com/pingcap/log"
pd "github.com/tikv/pd/client"
Expand Down Expand Up @@ -142,12 +144,14 @@ type GRPCCraeteFn func() GRPCCase

// GRPCCaseFnMap is the map for all gRPC case creation function.
var GRPCCaseFnMap = map[string]GRPCCraeteFn{
"GetRegion": newGetRegion(),
"GetRegionEnableFollower": newGetRegionEnableFollower(),
"GetStore": newGetStore(),
"GetStores": newGetStores(),
"ScanRegions": newScanRegions(),
"Tso": newTso(),
"GetRegion": newGetRegion(),
"GetRegionEnableFollower": newGetRegionEnableFollower(),
"GetStore": newGetStore(),
"GetStores": newGetStores(),
"ScanRegions": newScanRegions(),
"Tso": newTso(),
"UpdateGCSafePoint": newUpdateGCSafePoint(),
"UpdateServiceGCSafePoint": newUpdateServiceGCSafePoint(),
}

// HTTPCase is the interface for all HTTP cases.
Expand Down Expand Up @@ -227,6 +231,55 @@ func (c *regionsStats) Do(ctx context.Context, cli pdHttp.Client) error {
return nil
}

type updateGCSafePoint struct {
*baseCase
}

func newUpdateGCSafePoint() func() GRPCCase {
return func() GRPCCase {
return &updateGCSafePoint{
baseCase: &baseCase{
name: "UpdateGCSafePoint",
cfg: newConfig(),
},
}
}
}

func (c *updateGCSafePoint) Unary(ctx context.Context, cli pd.Client) error {
s := time.Now().Unix()
_, err := cli.UpdateGCSafePoint(ctx, uint64(s))
if err != nil {
return err
}
return nil
}

type updateServiceGCSafePoint struct {
*baseCase
}

func newUpdateServiceGCSafePoint() func() GRPCCase {
return func() GRPCCase {
return &updateServiceGCSafePoint{
baseCase: &baseCase{
name: "UpdateServiceGCSafePoint",
cfg: newConfig(),
},
}
}
}

func (c *updateServiceGCSafePoint) Unary(ctx context.Context, cli pd.Client) error {
s := time.Now().Unix()
id := rand.Int63n(100) + 1
_, err := cli.UpdateServiceGCSafePoint(ctx, strconv.FormatInt(id, 10), id, uint64(s))
if err != nil {
return err
}
return nil
}

type getRegion struct {
*baseCase
}
Expand Down
32 changes: 29 additions & 3 deletions tools/pd-heartbeat-bench/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ func (rs *Regions) update(cfg *config.Config, options *config.Options) {
rs.awakenRegions.Store(awakenRegions)
}

func createHeartbeatStream(ctx context.Context, cfg *config.Config) pdpb.PD_RegionHeartbeatClient {
func createHeartbeatStream(ctx context.Context, cfg *config.Config) (pdpb.PDClient, pdpb.PD_RegionHeartbeatClient) {
cli, err := newClient(ctx, cfg)
if err != nil {
log.Fatal("create client error", zap.Error(err))
Expand All @@ -332,7 +332,7 @@ func createHeartbeatStream(ctx context.Context, cfg *config.Config) pdpb.PD_Regi
stream.Recv()
}
}()
return stream
return cli, stream
}

func (rs *Regions) handleRegionHeartbeat(wg *sync.WaitGroup, stream pdpb.PD_RegionHeartbeatClient, storeID uint64, rep report.Report) {
Expand Down Expand Up @@ -507,14 +507,20 @@ func main() {
bootstrap(ctx, cli)
putStores(ctx, cfg, cli, stores)
log.Info("finish put stores")
clis := make(map[uint64]pdpb.PDClient, cfg.StoreCount)
httpCli := pdHttp.NewClient("tools-heartbeat-bench", []string{cfg.PDAddr}, pdHttp.WithTLSConfig(loadTLSConfig(cfg)))
go deleteOperators(ctx, httpCli)
streams := make(map[uint64]pdpb.PD_RegionHeartbeatClient, cfg.StoreCount)
for i := 1; i <= cfg.StoreCount; i++ {
streams[uint64(i)] = createHeartbeatStream(ctx, cfg)
clis[uint64(i)], streams[uint64(i)] = createHeartbeatStream(ctx, cfg)
}
header := &pdpb.RequestHeader{
ClusterId: clusterID,
}
var heartbeatTicker = time.NewTicker(regionReportInterval * time.Second)
defer heartbeatTicker.Stop()
var resolvedTSTicker = time.NewTicker(time.Second)
defer resolvedTSTicker.Stop()
for {
select {
case <-heartbeatTicker.C:
Expand Down Expand Up @@ -547,6 +553,26 @@ func main() {
log.Info("store heartbeat stats", zap.String("max", fmt.Sprintf("%.4fs", since)))
regions.update(cfg, options)
go stores.update(regions) // update stores in background, unusually region heartbeat is slower than store update.
case <-resolvedTSTicker.C:
wg := &sync.WaitGroup{}
for i := 1; i <= cfg.StoreCount; i++ {
id := uint64(i)
wg.Add(1)
go func(wg *sync.WaitGroup, id uint64) {
defer wg.Done()
cli := clis[id]
_, err := cli.ReportMinResolvedTS(ctx, &pdpb.ReportMinResolvedTsRequest{
Header: header,
StoreId: id,
MinResolvedTs: uint64(time.Now().Unix()),
})
if err != nil {
log.Error("send resolved TS error", zap.Uint64("store-id", id), zap.Error(err))
return
}
}(wg, id)
}
wg.Wait()
case <-ctx.Done():
log.Info("got signal to exit")
switch sig {
Expand Down

0 comments on commit ba621f0

Please sign in to comment.