From 3965b4c32a202ffe566ebbe7292b4dd40e44d63f Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Wed, 7 Feb 2024 14:19:14 +0800 Subject: [PATCH] server: pause scheduling immediately (#7809) ref tikv/pd#5839 Signed-off-by: Ryan Leung --- server/forward.go | 7 ++++++- server/grpc_service.go | 2 +- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/server/forward.go b/server/forward.go index 82a145e47b7..e478f4869c9 100644 --- a/server/forward.go +++ b/server/forward.go @@ -32,6 +32,7 @@ import ( "github.com/tikv/pd/pkg/utils/grpcutil" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" + "github.com/tikv/pd/server/cluster" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" @@ -249,7 +250,7 @@ func (s *GrpcServer) createRegionHeartbeatSchedulingStream(ctx context.Context, return forwardStream, forwardCtx, cancelForward, err } -func forwardRegionHeartbeatToScheduling(forwardStream schedulingpb.Scheduling_RegionHeartbeatClient, server *heartbeatServer, errCh chan error) { +func forwardRegionHeartbeatToScheduling(rc *cluster.RaftCluster, forwardStream schedulingpb.Scheduling_RegionHeartbeatClient, server *heartbeatServer, errCh chan error) { defer logutil.LogPanic() defer close(errCh) for { @@ -262,6 +263,10 @@ func forwardRegionHeartbeatToScheduling(forwardStream schedulingpb.Scheduling_Re errCh <- errors.WithStack(err) return } + // TODO: find a better way to halt scheduling immediately. + if rc.GetOpts().IsSchedulingHalted() { + continue + } // The error types defined for schedulingpb and pdpb are different, so we need to convert them. var pdpbErr *pdpb.Error schedulingpbErr := resp.GetHeader().GetError() diff --git a/server/grpc_service.go b/server/grpc_service.go index 38796fc7ff5..08272da5cf1 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1347,7 +1347,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error } lastForwardedSchedulingHost = forwardedSchedulingHost forwardErrCh = make(chan error, 1) - go forwardRegionHeartbeatToScheduling(forwardSchedulingStream, server, forwardErrCh) + go forwardRegionHeartbeatToScheduling(rc, forwardSchedulingStream, server, forwardErrCh) } schedulingpbReq := &schedulingpb.RegionHeartbeatRequest{ Header: &schedulingpb.RequestHeader{