Skip to content

Commit

Permalink
client: enabling TSO Follower Proxy with TSO service should fail (#7833)
Browse files Browse the repository at this point in the history
ref #5836

Since the TSO service does not support the TSO Follower Proxy, enabling it in this case should fail.

Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Feb 21, 2024
1 parent 46fd313 commit d6d9fea
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 5 deletions.
3 changes: 3 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,9 @@ func (c *client) UpdateOption(option DynamicOption, value any) error {
return err
}
case EnableTSOFollowerProxy:
if c.getServiceMode() != pdpb.ServiceMode_PD_SVC_MODE {
return errors.New("[pd] tso follower proxy is only supported in PD service mode")
}
enable, ok := value.(bool)
if !ok {
return errors.New("[pd] invalid value type for EnableTSOFollowerProxy option, it should be bool")
Expand Down
11 changes: 10 additions & 1 deletion client/tso_dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -372,6 +372,9 @@ func (c *tsoClient) handleDispatcher(
return
case <-c.option.enableTSOFollowerProxyCh:
enableTSOFollowerProxy := c.option.getEnableTSOFollowerProxy()
log.Info("[tso] tso follower proxy status changed",
zap.String("dc-location", dc),
zap.Bool("enable", enableTSOFollowerProxy))
if enableTSOFollowerProxy && updateTicker.C == nil {
// Because the TSO Follower Proxy is enabled,
// the periodic check needs to be performed.
Expand Down Expand Up @@ -701,7 +704,11 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s
}
// GC the stale one.
connectionCtxs.Range(func(addr, cc any) bool {
if _, ok := tsoStreamBuilders[addr.(string)]; !ok {
addrStr := addr.(string)
if _, ok := tsoStreamBuilders[addrStr]; !ok {
log.Info("[tso] remove the stale tso stream",
zap.String("dc", dc),
zap.String("addr", addrStr))
cc.(*tsoConnectionContext).cancel()
connectionCtxs.Delete(addr)
}
Expand All @@ -712,6 +719,8 @@ func (c *tsoClient) tryConnectToTSOWithProxy(dispatcherCtx context.Context, dc s
if _, ok = connectionCtxs.Load(addr); ok {
continue
}
log.Info("[tso] try to create tso stream",
zap.String("dc", dc), zap.String("addr", addr))
cctx, cancel := context.WithCancel(dispatcherCtx)
// Do not proxy the leader client.
if addr != leaderAddr {
Expand Down
27 changes: 27 additions & 0 deletions tests/integrations/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"reflect"
"sort"
"strconv"
"strings"
"sync"
"testing"
"time"
Expand All @@ -39,6 +40,7 @@ import (
"github.com/tikv/pd/client/retry"
"github.com/tikv/pd/pkg/core"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/mock/mockid"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/tso"
Expand All @@ -49,6 +51,7 @@ import (
"github.com/tikv/pd/server"
"github.com/tikv/pd/server/config"
"github.com/tikv/pd/tests"
"github.com/tikv/pd/tests/integrations/mcs"
"go.etcd.io/etcd/clientv3"
"go.uber.org/goleak"
)
Expand Down Expand Up @@ -319,6 +322,30 @@ func TestTSOFollowerProxy(t *testing.T) {
wg.Wait()
}

func TestTSOFollowerProxyWithTSOService(t *testing.T) {
re := require.New(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
cluster, err := tests.NewTestAPICluster(ctx, 1)
re.NoError(err)
defer cluster.Destroy()
err = cluster.RunInitialServers()
re.NoError(err)
leaderName := cluster.WaitLeader()
pdLeaderServer := cluster.GetServer(leaderName)
re.NoError(pdLeaderServer.BootstrapCluster())
backendEndpoints := pdLeaderServer.GetAddr()
tsoCluster, err := tests.NewTestTSOCluster(ctx, 2, backendEndpoints)
re.NoError(err)
defer tsoCluster.Destroy()
cli := mcs.SetupClientWithKeyspaceID(ctx, re, utils.DefaultKeyspaceID, strings.Split(backendEndpoints, ","))
re.NotNil(cli)
defer cli.Close()
// TSO service does not support the follower proxy, so enabling it should fail.
err = cli.UpdateOption(pd.EnableTSOFollowerProxy, true)
re.Error(err)
}

// TestUnavailableTimeAfterLeaderIsReady is used to test https://github.com/tikv/pd/issues/5207
func TestUnavailableTimeAfterLeaderIsReady(t *testing.T) {
re := require.New(t)
Expand Down
4 changes: 0 additions & 4 deletions tests/integrations/tso/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,10 +269,6 @@ func (suite *tsoClientTestSuite) TestDiscoverTSOServiceWithLegacyPath() {
// TestGetMinTS tests the correctness of GetMinTS.
func (suite *tsoClientTestSuite) TestGetMinTS() {
re := suite.Require()
if !suite.legacy {
suite.waitForAllKeyspaceGroupsInServing(re)
}

var wg sync.WaitGroup
wg.Add(tsoRequestConcurrencyNumber * len(suite.clients))
for i := 0; i < tsoRequestConcurrencyNumber; i++ {
Expand Down

0 comments on commit d6d9fea

Please sign in to comment.