Skip to content

Commit

Permalink
server: pause scheduling immediately (#7809)
Browse files Browse the repository at this point in the history
ref #5839

Signed-off-by: Ryan Leung <[email protected]>
  • Loading branch information
rleungx committed Feb 7, 2024
1 parent ba621f0 commit 3965b4c
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 2 deletions.
7 changes: 6 additions & 1 deletion server/forward.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/tikv/pd/pkg/utils/grpcutil"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/server/cluster"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -249,7 +250,7 @@ func (s *GrpcServer) createRegionHeartbeatSchedulingStream(ctx context.Context,
return forwardStream, forwardCtx, cancelForward, err
}

func forwardRegionHeartbeatToScheduling(forwardStream schedulingpb.Scheduling_RegionHeartbeatClient, server *heartbeatServer, errCh chan error) {
func forwardRegionHeartbeatToScheduling(rc *cluster.RaftCluster, forwardStream schedulingpb.Scheduling_RegionHeartbeatClient, server *heartbeatServer, errCh chan error) {
defer logutil.LogPanic()
defer close(errCh)
for {
Expand All @@ -262,6 +263,10 @@ func forwardRegionHeartbeatToScheduling(forwardStream schedulingpb.Scheduling_Re
errCh <- errors.WithStack(err)
return
}
// TODO: find a better way to halt scheduling immediately.
if rc.GetOpts().IsSchedulingHalted() {
continue
}
// The error types defined for schedulingpb and pdpb are different, so we need to convert them.
var pdpbErr *pdpb.Error
schedulingpbErr := resp.GetHeader().GetError()
Expand Down
2 changes: 1 addition & 1 deletion server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,7 @@ func (s *GrpcServer) RegionHeartbeat(stream pdpb.PD_RegionHeartbeatServer) error
}
lastForwardedSchedulingHost = forwardedSchedulingHost
forwardErrCh = make(chan error, 1)
go forwardRegionHeartbeatToScheduling(forwardSchedulingStream, server, forwardErrCh)
go forwardRegionHeartbeatToScheduling(rc, forwardSchedulingStream, server, forwardErrCh)
}
schedulingpbReq := &schedulingpb.RegionHeartbeatRequest{
Header: &schedulingpb.RequestHeader{
Expand Down

0 comments on commit 3965b4c

Please sign in to comment.