From 3614f0545f615fe7ba93bf23b136a22eff45600b Mon Sep 17 00:00:00 2001
From: zhanglei28
Date: Thu, 9 Nov 2023 20:01:57 +0800
Subject: [PATCH] add consistentHashKey load balance

---
 core/constants.go              |   9 +-
 go.mod                         |   2 +
 lb/consistentHashKeyLb.go      | 225 +++++++++++++++++++++++++++++++++
 lb/consistentHashKeyLb_test.go | 220 ++++++++++++++++++++++++++++++++
 lb/lb.go                       |   8 +-
 5 files changed, 460 insertions(+), 4 deletions(-)
 create mode 100644 lb/consistentHashKeyLb.go
 create mode 100644 lb/consistentHashKeyLb_test.go

diff --git a/core/constants.go b/core/constants.go
index aa16fdd0..2876ea27 100644
--- a/core/constants.go
+++ b/core/constants.go
@@ -68,8 +68,13 @@ const (
 	MixGroups            = "mixGroups"
 	MaxContentLength     = "maxContentLength"
 	UnixSockProtocolFlag = "unix://"
-	XForwardedForLower   = "x-forwarded-for" // used as motan default proxy key
-	XForwardedFor        = "X-Forwarded-For"
+)
+
+// attachment keys
+const (
+	XForwardedForLower = "x-forwarded-for" // used as motan default proxy key
+	XForwardedFor      = "X-Forwarded-For"
+	ConsistentHashKey  = "consistentHashKey" // string used to calculate consistent hash
 )
 
 // nodeType
diff --git a/go.mod b/go.mod
index a83fbb4b..989f8b55 100644
--- a/go.mod
+++ b/go.mod
@@ -5,6 +5,8 @@ go 1.11
 require (
 	github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
 	github.com/beberlei/fastcgi-serve v0.0.0-20151230120321-4676005f65b7
+	github.com/buraksezer/consistent v0.10.0
+	github.com/cespare/xxhash/v2 v2.2.0
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/golang/protobuf v1.3.2
 	github.com/juju/ratelimit v1.0.1
diff --git a/lb/consistentHashKeyLb.go b/lb/consistentHashKeyLb.go
new file mode 100644
index 00000000..28767561
--- /dev/null
+++ b/lb/consistentHashKeyLb.go
@@ -0,0 +1,225 @@
+package lb
+
+import (
+	"errors"
+	"github.com/buraksezer/consistent"
+	"github.com/cespare/xxhash/v2"
+	motan "github.com/weibocom/motan-go/core"
+	vlog "github.com/weibocom/motan-go/log"
+	"math/rand"
+	"strconv"
+)
+
+const (
+	// config keys
+	PartSizeKey = "consistentHashKey.partSize"
+	LoadKey     = "consistentHashKey.load"
+	ReplicaKey  = "consistentHashKey.replica"
+
+	// default values
+	DefaultPartSizeAddend = 271
+	DefaultReplica        = 10
+	DefaultMinLoad        = 1.1
+
+	// retry policy used when building the hash ring fails
+	maxBuildRetries = 20  // rebuild attempts made after the first failed build
+	loadRetryStep   = 0.1 // amount added to the load before each rebuild attempt
+)
+
+// BuildConsistentHashFailError is returned by buildConsistent0 when a panic is recovered while building the hash ring.
+var BuildConsistentHashFailError = errors.New("build consistent hash fail")
+
+// ConsistentHashLB is a load balance that picks endpoints by consistent hashing of the
+// request's motan.ConsistentHashKey attachment. Requests without that attachment are
+// balanced randomly.
+type ConsistentHashLB struct {
+	url          *motan.URL
+	endpoints    []motan.EndPoint
+	cHash        *consistent.Consistent // nil until a ring is built and while there are fewer than two endpoints
+	lastPartSize int                    // partition count of the last successful build, 0 before it
+	load         float64                // load used for the ring, 0 until the first build
+	replica      int                    // replication factor used for the ring, 0 until the first build
+}
+
+// member wraps an endpoint so that it can be placed on the hash ring.
+type member struct {
+	Key      string // Key needs to be generated at build time
+	Endpoint motan.EndPoint
+}
+
+// String returns the key that identifies the member on the ring.
+func (m *member) String() string {
+	return m.Key
+}
+
+// hasher hashes ring keys with xxhash.
+type hasher struct{}
+
+// Sum64 returns the 64-bit xxhash of data.
+func (h hasher) Sum64(data []byte) uint64 {
+	return xxhash.Sum64(data)
+}
+
+// OnRefresh replaces the endpoint list and rebuilds the hash ring.
+// With fewer than two endpoints there is nothing to hash over, so the ring is dropped.
+// When the ring cannot be built, the previous endpoints and ring are both kept.
+func (c *ConsistentHashLB) OnRefresh(endpoints []motan.EndPoint) {
+	if len(endpoints) <= 1 {
+		c.endpoints = endpoints
+		c.cHash = nil
+		return
+	}
+	if !c.buildConsistent(endpoints) {
+		vlog.Errorf("ConsistentHashLB OnRefresh failed, endpoints not update. endpoints size:%d\n", len(endpoints))
+		return
+	}
+	c.endpoints = endpoints
+}
+
+// Select returns the endpoint that owns the request's motan.ConsistentHashKey attachment.
+// It returns nil when there are no endpoints and the only endpoint when there is one.
+// When the hash key is empty, no ring is available or the hashed endpoint is unavailable,
+// the endpoint is randomly selected.
+func (c *ConsistentHashLB) Select(request motan.Request) motan.EndPoint {
+	// Read the fields once so that every check below works on the same values.
+	endpoints, cHash := c.endpoints, c.cHash
+	switch len(endpoints) {
+	case 0:
+		return nil
+	case 1:
+		return endpoints[0]
+	}
+	var endpoint motan.EndPoint
+	if key := request.GetAttachment(motan.ConsistentHashKey); key != "" && cHash != nil {
+		// Checked assertion: a key without an owner falls back to random selection instead of panicking.
+		if m, ok := cHash.LocateKey([]byte(key)).(*member); ok {
+			endpoint = m.Endpoint
+		}
+	}
+	if endpoint == nil || !endpoint.IsAvailable() {
+		_, endpoint = SelectOneAtRandom(endpoints)
+	}
+	return endpoint
+}
+
+// SelectArray returns the candidate endpoints for the request.
+// When there are more than MaxSelectArraySize endpoints and the request has a hash key, the
+// available endpoints among the MaxSelectArraySize closest ring members are returned.
+// Otherwise, or when none of them is available, the array starts from a random index.
+// It returns nil when there are no endpoints.
+func (c *ConsistentHashLB) SelectArray(request motan.Request) []motan.EndPoint {
+	// Read the fields once so that every check below works on the same values.
+	endpoints, cHash := c.endpoints, c.cHash
+	if len(endpoints) == 0 {
+		return nil // rand.Intn panics when its argument is 0
+	}
+	if cHash != nil && len(endpoints) > MaxSelectArraySize {
+		if key := request.GetAttachment(motan.ConsistentHashKey); key != "" {
+			members, err := cHash.GetClosestN([]byte(key), MaxSelectArraySize)
+			if err != nil {
+				vlog.Warningf("ConsistentHashLB SelectArray failed, key:%s, err:%v\n", key, err)
+			} else {
+				selected := make([]motan.EndPoint, 0, len(members))
+				for _, m := range members {
+					if ep := m.(*member).Endpoint; ep.IsAvailable() {
+						selected = append(selected, ep)
+					}
+				}
+				if len(selected) > 0 {
+					return selected
+				}
+			}
+		}
+	}
+	return SelectArrayFromIndex(endpoints, rand.Intn(len(endpoints)))
+}
+
+// SetWeight is a no-op: the weight is ignored by this load balance.
+func (c *ConsistentHashLB) SetWeight(weight string) {
+}
+
+// buildConsistent builds a new hash ring for endpoints and stores it in c.
+// Part size, load and replica are read from the URL on the first build and reused afterwards.
+// A failed build is retried with a higher load; false is returned once the retries are used up,
+// in which case the current ring is left untouched.
+func (c *ConsistentHashLB) buildConsistent(endpoints []motan.EndPoint) bool {
+	// Calculate partSize
+	partSize := 0
+	if c.lastPartSize == 0 { // first build
+		partSize = int(c.url.GetIntValue(PartSizeKey, 0))
+	} else { // Try not to change the size when building again
+		partSize = c.lastPartSize
+	}
+	if partSize < len(endpoints)*2 { // Recalculate size when conditions are not met
+		partSize = len(endpoints)*5 + DefaultPartSizeAddend
+	}
+
+	// Calculate load on first build
+	if c.load == 0 {
+		load, err := strconv.ParseFloat(c.url.GetParam(LoadKey, "0"), 64)
+		if err != nil || load < DefaultMinLoad {
+			load = DefaultMinLoad
+		}
+		c.load = load
+	}
+
+	// Calculate replica on first build
+	if c.replica == 0 {
+		replica := int(c.url.GetIntValue(ReplicaKey, 0))
+		if replica <= 0 {
+			if len(endpoints) < 100 {
+				replica = DefaultReplica * 2
+			} else {
+				replica = DefaultReplica
+			}
+		}
+		c.replica = replica
+	}
+	// Build member list
+	members := make([]consistent.Member, len(endpoints))
+	for i := 0; i < len(endpoints); i++ {
+		members[i] = &member{Key: endpoints[i].GetURL().GetAddressStr(), Endpoint: endpoints[i]}
+	}
+	var cHash *consistent.Consistent
+	var err error
+	count := 0
+	load := c.load
+	for {
+		cHash, err = c.buildConsistent0(members, partSize, c.replica, load)
+		if err == nil {
+			break
+		}
+		// Increase load when build failed
+		load += loadRetryStep
+		count++
+		if count > maxBuildRetries { // Give up once the retries are used up
+			vlog.Errorf("build consistent hash ring failed after maximum retries. part size:%d, load:%.2f, replica:%d, member:%d\n", partSize, load, c.replica, len(endpoints))
+			return false
+		}
+	}
+	c.cHash = cHash
+	c.lastPartSize = partSize
+	c.load = load // keep the load that finally worked for later builds
+	vlog.Infof("build consistent hash ring, part size:%d, load:%.2f, replica:%d, member:%d", c.lastPartSize, c.load, c.replica, len(endpoints))
+	return true
+}
+
+// buildConsistent0 builds one hash ring with the given parameters.
+// Any panic raised while building is recovered, logged and reported as
+// BuildConsistentHashFailError; the returned ring is nil in that case.
+func (c *ConsistentHashLB) buildConsistent0(members []consistent.Member, partSize int, replica int, load float64) (cHash *consistent.Consistent, err error) {
+	defer func() {
+		if r := recover(); r != nil {
+			vlog.Warningf("build consistent hash ring failed with part size:%d, load:%.2f, replica:%d, member:%d, err:%v\n", partSize, load, replica, len(members), r)
+			err = BuildConsistentHashFailError
+		}
+	}()
+	cfg := consistent.Config{
+		PartitionCount:    partSize,
+		ReplicationFactor: replica,
+		Load:              load,
+		Hasher:            hasher{},
+	}
+	cHash = consistent.New(members, cfg)
+	return cHash, nil
+}
diff --git a/lb/consistentHashKeyLb_test.go b/lb/consistentHashKeyLb_test.go
new file mode 100644
index 00000000..11737af9
--- /dev/null
+++ b/lb/consistentHashKeyLb_test.go
@@ -0,0 +1,220 @@
+package lb
+
+import (
+	"fmt"
+	"github.com/stretchr/testify/assert"
+	motan "github.com/weibocom/motan-go/core"
+	"github.com/weibocom/motan-go/endpoint"
+	"testing"
+)
+
+func TestConsistentHashLB_OnRefresh(t *testing.T) {
+	lb := &ConsistentHashLB{}
+
+	// first refresh
+	resetLB(lb)
+	checkDefault(t, lb, 10)
+
+	// refresh multiple times
+	sizeArray := []int{2, 8, 12, 51, 100, 123, 500, 1000}
+	for _, s := range sizeArray {
+		if s >= 500 {
+			lb.replica = 10 // To reduce testing costs
+		}
+		checkDefault(t, lb, s)
+	}
+
+	// specified part size, loadFactor, replica
+	resetLB(lb)
+	lb.url.PutParam(PartSizeKey, "100")
+	lb.url.PutParam(LoadKey, "1.35")
+	lb.url.PutParam(ReplicaKey, "5")
+	checkDefault(t, lb, 50)
+	assert.Equal(t, 100, lb.lastPartSize)
+	assert.Equal(t, 1.35, lb.load)
+	assert.Equal(t, 5, lb.replica)
+
+	// inappropriate value specified
+	resetLB(lb)
+	lb.url.PutParam(PartSizeKey, "70") // < 2 * len(eps)
+	lb.url.PutParam(LoadKey, "1.0")    // < 1.1
+	lb.url.PutParam(ReplicaKey, "-5")  // < 0
+	checkDefault(t, lb, 50)
+	assert.NotEqual(t, 70, lb.lastPartSize)
+	assert.NotEqual(t, 1.0, lb.load)
+	assert.NotEqual(t, -5, lb.replica)
+
+	// change part size
+	resetLB(lb)
+	lb.url.PutParam(PartSizeKey, "100")
+	checkDefault(t, lb, 30)
+	assert.Equal(t, 100, lb.lastPartSize)
+	checkDefault(t, lb, 55)
+	assert.NotEqual(t, 100, lb.lastPartSize)
+
+	// change loadFactor
+	resetLB(lb)
+	lb.load = 1.0
+	checkDefault(t, lb, 30)
+}
+
+func TestConsistentHashLB_Select(t *testing.T) {
+	lb := &ConsistentHashLB{}
+	resetLB(lb)
+	req := &motan.MotanRequest{Method: "test"}
+
+	// one endpoint
+	lb.OnRefresh(buildEndpoint(1))
+	checkHashEP(t, lb, req, lb.endpoints[0])
+
+	// use hash key
+	req.SetAttachment(motan.ConsistentHashKey, "234234")
+	checkHashEP(t, lb, req, lb.endpoints[0])
+
+	// repeat select
+	lb.OnRefresh(buildEndpoint(30))
+	keys := []string{"234", "1", "sjide", "Y&^(U23j49", "skd9f0i9*(RK3erp3oi29kf"}
+	for _, k := range keys {
+		checkConsistent(t, lb, k, 20)
+	}
+
+	// not use hash key(random)
+	req.SetAttachment(motan.ConsistentHashKey, "")
+	ep := lb.Select(req)
+	ep2 := lb.Select(req)
+	assert.NotEqual(t, ep, ep2)
+
+	// change part size
+	resetLB(lb)
+	lb.url.PutParam(PartSizeKey, "100")
+	lb.OnRefresh(buildEndpoint(49))
+	key := "sjide123"
+	checkConsistent(t, lb, key, 20)
+	req.SetAttachment(motan.ConsistentHashKey, key)
+	ep = lb.Select(req)
+	lb.OnRefresh(buildEndpoint(51))
+	checkConsistent(t, lb, key, 20)
+	ep2 = lb.Select(req)
+	assert.NotEqual(t, ep, ep2)
+
+	// endpoint unavailable
+	resetLB(lb)
+	lb.OnRefresh(buildEndpoint(30))
+	ep = lb.Select(req)
+	ep.(*lbTestMockEndpoint).isAvail = false
+	ep2 = lb.Select(req)
+	assert.NotEqual(t, ep, ep2)
+}
+
+func TestConsistentHashLB_SelectArray(t *testing.T) {
+	lb := &ConsistentHashLB{}
+	resetLB(lb)
+	req := &motan.MotanRequest{Method: "test"}
+
+	// less endpoint
+	lb.OnRefresh(buildEndpoint(MaxSelectArraySize))
+	eps := lb.SelectArray(req)
+	assert.Equal(t, MaxSelectArraySize, len(eps))
+
+	// use hash key
+	resetLB(lb)
+	lb.OnRefresh(buildEndpoint(30))
+	req.SetAttachment(motan.ConsistentHashKey, "234234")
+	ep := lb.Select(req)
+	eps = lb.SelectArray(req)
+	assert.Equal(t, ep, eps[0])
+	assert.Equal(t, MaxSelectArraySize, len(eps))
+
+	// repeat select
+	var eps2 []motan.EndPoint
+	for i := 0; i < 20; i++ {
+		eps2 = lb.SelectArray(req)
+		assert.Equal(t, eps, eps2)
+	}
+
+	// test random
+	req.SetAttachment(motan.ConsistentHashKey, "")
+	eps = lb.SelectArray(req)
+	eps2 = lb.SelectArray(req)
+	assert.NotEqual(t, eps, eps2)
+}
+
+func TestConsistentMigration(t *testing.T) {
+	lb := &ConsistentHashLB{}
+	resetLB(lb)
+
+	// distribution migration under default configuration
+	// endpoint changes slightly
+	diffDistribution(lb, 49, 50)
+	diffDistribution(lb, 49, 51)
+	diffDistribution(lb, 49, 48)
+	diffDistribution(lb, 49, 47)
+
+	// endpoint changes significantly
+	diffDistribution(lb, 49, 70)
+	diffDistribution(lb, 49, 99)
+	diffDistribution(lb, 49, 31)
+	diffDistribution(lb, 49, 27)
+	diffDistribution(lb, 500, 1000)
+}
+
+func checkDefault(t *testing.T, lb *ConsistentHashLB, epsSize int) {
+	eps := buildEndpoint(epsSize)
+	lb.OnRefresh(eps)
+	assert.True(t, lb.lastPartSize >= len(eps)*2)
+	assert.True(t, lb.load >= DefaultMinLoad)
+	assert.True(t, lb.replica > 0)
+	assert.NotNil(t, lb.cHash)
+	assert.Equal(t, eps, lb.endpoints)
+}
+
+func checkHashEP(t *testing.T, lb *ConsistentHashLB, req motan.Request, expectEP motan.EndPoint) {
+	ep := lb.Select(req)
+	assert.Equal(t, expectEP, ep)
+}
+
+func checkConsistent(t *testing.T, lb *ConsistentHashLB, hashKey string, times int) {
+	req := &motan.MotanRequest{Method: "test"}
+	req.SetAttachment(motan.ConsistentHashKey, hashKey)
+	ep := lb.Select(req)
+	for i := 0; i < times; i++ {
+		assert.Equal(t, ep, lb.Select(req))
+	}
+}
+
+func diffDistribution(lb *ConsistentHashLB, epSize1 int, epSize2 int) int {
+	lb.OnRefresh(buildEndpoint(epSize1))
+	cHash1 := lb.cHash
+	partSize := lb.lastPartSize
+	lb.OnRefresh(buildEndpoint(epSize2))
+	cHash2 := lb.cHash
+	if lb.lastPartSize < partSize {
+		partSize = lb.lastPartSize
+	}
+	diff := 0
+	for i := 0; i < partSize; i++ {
+		if cHash1.GetPartitionOwner(i).String() != cHash2.GetPartitionOwner(i).String() {
+			diff++
+		}
+	}
+	fmt.Printf("=== ep size %d -> %d, diff distribution: %f\n", epSize1, epSize2, float64(diff)/float64(partSize))
+	return diff
+}
+
+func resetLB(lb *ConsistentHashLB) {
+	lb.endpoints = nil
+	lb.lastPartSize = 0
+	lb.load = 0
+	lb.replica = 0
+	lb.cHash = nil
+	lb.url = &motan.URL{Host: "localhost", Port: 0, Protocol: "motan2"}
+}
+
+func buildEndpoint(size int) []motan.EndPoint {
+	endpoints := make([]motan.EndPoint, 0, size)
+	for i := 0; i < size; i++ {
+		url := &motan.URL{Host: "10.10.10." + fmt.Sprintf("%d", i), Port: 8000 + i, Protocol: "motan2"}
+		endpoints = append(endpoints, &lbTestMockEndpoint{MockEndpoint: &endpoint.MockEndpoint{URL: url}, index: i, isAvail: true})
+	}
+	return endpoints
+}
diff --git a/lb/lb.go b/lb/lb.go
index d275ee70..062c5333 100644
--- a/lb/lb.go
+++ b/lb/lb.go
@@ -13,8 +13,9 @@ import (
 
 // ext name
 const (
-	Random     = "random"
-	Roundrobin = "roundrobin"
+	Random            = "random"
+	Roundrobin        = "roundrobin"
+	ConsistentHashKey = "consistentHashKey"
 )
 
 const (
@@ -35,6 +36,9 @@ func RegistDefaultLb(extFactory motan.ExtensionFactory) {
 	extFactory.RegistExtLb(Roundrobin, NewWeightLbFunc(func(url *motan.URL) motan.LoadBalance {
 		return &RoundrobinLB{url: url}
 	}))
+	extFactory.RegistExtLb(ConsistentHashKey, NewWeightLbFunc(func(url *motan.URL) motan.LoadBalance {
+		return &ConsistentHashLB{url: url}
+	}))
 }
 
 // WeightedLbWrapper support multi group weighted LB