diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 7f511c5f..ba52da95 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -19,7 +19,7 @@ jobs: testing: strategy: matrix: - go-version: [1.12.x,1.13.x,1.14.x,1.15.x,1.16.x,1.17.x,1.18.x,1.19.x,1.20.x,1.21.x] + go-version: [1.16.x,1.17.x,1.18.x,1.19.x,1.20.x,1.21.x] platform: [ubuntu-latest] runs-on: ${{ matrix.platform }} steps: @@ -45,10 +45,10 @@ jobs: name: codecov runs-on: ubuntu-latest steps: - - name: Set up Go 1.15 + - name: Set up Go 1.16 uses: actions/setup-go@v3 with: - go-version: 1.15.x + go-version: 1.16.x id: go - name: Checkout code diff --git a/agent.go b/agent.go index a45d8abb..5d70e816 100644 --- a/agent.go +++ b/agent.go @@ -5,6 +5,8 @@ import ( "fmt" "github.com/weibocom/motan-go/endpoint" vlog "github.com/weibocom/motan-go/log" + "github.com/weibocom/motan-go/meta" + "github.com/weibocom/motan-go/provider" "gopkg.in/yaml.v2" "io/ioutil" "net" @@ -193,6 +195,8 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) { return } fmt.Println("init agent context success.") + // initialize meta package + meta.Initialize(a.Context) a.initParam() a.SetSanpshotConf() a.initAgentURL() @@ -948,7 +952,8 @@ func (a *Agent) doExportService(url *motan.URL) { } type serverAgentMessageHandler struct { - providers *motan.CopyOnWriteMap + providers *motan.CopyOnWriteMap + frameworkProviders *motan.CopyOnWriteMap } func (sa *serverAgentMessageHandler) GetName() string { @@ -973,6 +978,12 @@ func (sa *serverAgentMessageHandler) GetRuntimeInfo() map[string]interface{} { func (sa *serverAgentMessageHandler) Initialize() { sa.providers = motan.NewCopyOnWriteMap() + sa.frameworkProviders = motan.NewCopyOnWriteMap() + sa.initFrameworkServiceProvider() +} + +func (sa *serverAgentMessageHandler) initFrameworkServiceProvider() { + sa.frameworkProviders.Store(meta.MetaServiceName, &provider.MetaProvider{}) } func getServiceKey(group, path string) string { @@ -990,6 +1001,13 @@ func (sa *serverAgentMessageHandler) Call(request motan.Request) (res motan.Resp group = request.GetAttachment(motan.GroupKey) } serviceKey := getServiceKey(group, request.GetServiceName()) + if mfs := request.GetAttachment(mpro.MFrameworkService); mfs != "" { + if fp, ok := sa.frameworkProviders.Load(request.GetServiceName()); ok { + return fp.(motan.Provider).Call(request) + } + //throw specific exception to avoid triggering forced fusing on the client side。 + return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 501, ErrMsg: motan.ServiceNotSupport, ErrType: motan.ServiceException}) + } if p := sa.providers.LoadOrNil(serviceKey); p != nil { p := p.(motan.Provider) res = p.Call(request) diff --git a/client.go b/client.go index 48aa8931..e2000de9 100644 --- a/client.go +++ b/client.go @@ -173,3 +173,7 @@ func (m *MCContext) GetRefer(service string) interface{} { // TODO 对client的封装,可以根据idl自动生成代码时支持 return nil } + +func (m *MCContext) GetContext() *motan.Context { + return m.context +} diff --git a/cluster/motanCluster.go b/cluster/motanCluster.go index f925cb38..98b17779 100644 --- a/cluster/motanCluster.go +++ b/cluster/motanCluster.go @@ -268,6 +268,9 @@ func (m *MotanCluster) Destroy() { vlog.Infof("destroy endpoint %s .", e.GetURL().GetIdentity()) e.Destroy() } + if d, ok := m.LoadBalance.(motan.Destroyable); ok { + d.Destroy() + } m.closed = true } } diff --git a/config/config.go b/config/config.go index d7b0675b..b9789ef5 100644 --- a/config/config.go +++ b/config/config.go @@ -127,12 +127,17 @@ func (c *Config) Int64(key string) (int64, error) { // String returns the string value for a given key. func (c *Config) String(key string) string { + return c.GetStringWithDefault(key, "") +} + +// String returns the string value for a given key. +func (c *Config) GetStringWithDefault(key string, def string) string { if value, err := c.getData(key); err != nil { - return "" + return def } else if vv, ok := value.(string); ok { return vv } - return "" + return def } // GetSection returns map for the given key diff --git a/core/constants.go b/core/constants.go index c01b25a2..8f9e4f3c 100644 --- a/core/constants.go +++ b/core/constants.go @@ -149,6 +149,21 @@ const ( DefaultReferVersion = "1.0" ) +// meta info +const ( + DefaultMetaPrefix = "META_" + EnvMetaPrefixKey = "envMetaPrefix" + URLRegisterMeta = "registerMeta" + DefaultRegisterMeta = true + MetaCacheExpireSecondKey = "metaCacheExpireSecond" + DynamicMetaKey = "dynamicMeta" + DefaultDynamicMeta = true + WeightRefreshPeriodSecondKey = "weightRefreshPeriodSecond" + WeightMetaSuffixKey = "WEIGHT" + ServiceNotSupport = "service not support" +) + + //----------- runtime ------------- const ( diff --git a/core/globalContext.go b/core/globalContext.go index d9c32b0a..1c1a287a 100644 --- a/core/globalContext.go +++ b/core/globalContext.go @@ -438,7 +438,6 @@ func (c *Context) basicConfToURLs(section string) map[string]*URL { if len(finalFilters) > 0 { newURL.PutParam(FilterKey, c.FilterSetToStr(finalFilters)) } - newURLs[key] = newURL } return newURLs diff --git a/core/motan.go b/core/motan.go index 59f64191..03e764d8 100644 --- a/core/motan.go +++ b/core/motan.go @@ -162,6 +162,12 @@ type LoadBalance interface { SetWeight(weight string) } +// WeightLoadBalance : weight loadBalance for cluster +type WeightLoadBalance interface { + LoadBalance + NotifyWeightChange() +} + // DiscoverService : discover service for cluster type DiscoverService interface { Subscribe(url *URL, listener NotifyListener) diff --git a/core/url.go b/core/url.go index 11ac3be6..1a3b4e27 100644 --- a/core/url.go +++ b/core/url.go @@ -336,9 +336,11 @@ func (u *URL) CanServe(other *URL) bool { vlog.Errorf("can not serve path, err : p1:%s, p2:%s", u.Path, other.Path) return false } - if !IsSame(u.Parameters, other.Parameters, SerializationKey, "") { - vlog.Errorf("can not serve serialization, err : s1:%s, s2:%s", u.Parameters[SerializationKey], other.Parameters[SerializationKey]) - return false + if u.Protocol != "motan2" { + if !IsSame(u.Parameters, other.Parameters, SerializationKey, "") { + vlog.Errorf("can not serve serialization, err : s1:%s, s2:%s", u.Parameters[SerializationKey], other.Parameters[SerializationKey]) + return false + } } // compatible with old version: 0.1 if !(IsSame(u.Parameters, other.Parameters, VersionKey, "0.1") || IsSame(u.Parameters, other.Parameters, VersionKey, DefaultReferVersion)) { diff --git a/core/util.go b/core/util.go index 0438ca69..0ba6f4c3 100644 --- a/core/util.go +++ b/core/util.go @@ -100,6 +100,24 @@ func SliceShuffle(slice []string) []string { return slice } +func EndpointShuffle(slice []EndPoint) []EndPoint { + for i := 0; i < len(slice); i++ { + a := rand.Intn(len(slice)) + b := rand.Intn(len(slice)) + slice[a], slice[b] = slice[b], slice[a] + } + return slice +} + +func ByteSliceShuffle(slice []byte) []byte { + for i := 0; i < len(slice); i++ { + a := rand.Intn(len(slice)) + b := rand.Intn(len(slice)) + slice[a], slice[b] = slice[b], slice[a] + } + return slice +} + func FirstUpper(s string) string { r := []rune(s) @@ -286,3 +304,10 @@ func ClearDirectEnvRegistry() { directRpc = nil initDirectEnv = sync.Once{} } + +func GetNonNegative(originValue int64) int64 { + if originValue > 0 { + return originValue + } + return 0x7fffffffffffffff & originValue +} diff --git a/default.go b/default.go index 71b0a729..ef77faf7 100644 --- a/default.go +++ b/default.go @@ -82,6 +82,12 @@ func GetDefaultManageHandlers() map[string]http.Handler { defaultManageHandlers["/registry/list"] = dynamicConfigurer defaultManageHandlers["/registry/info"] = dynamicConfigurer + metaInfo := &MetaInfo{} + defaultManageHandlers["/meta/update"] = metaInfo + defaultManageHandlers["/meta/delete"] = metaInfo + defaultManageHandlers["/meta/get"] = metaInfo + defaultManageHandlers["/meta/getAll"] = metaInfo + hotReload := &HotReload{} defaultManageHandlers["/reload/clusters"] = hotReload diff --git a/endpoint/mockDynamicEndpoint.go b/endpoint/mockDynamicEndpoint.go new file mode 100644 index 00000000..4361f450 --- /dev/null +++ b/endpoint/mockDynamicEndpoint.go @@ -0,0 +1,91 @@ +package endpoint + +import ( + motan "github.com/weibocom/motan-go/core" + mpro "github.com/weibocom/motan-go/protocol" + "strconv" + "sync" + "sync/atomic" +) + +type MockDynamicEndpoint struct { + URL *motan.URL + available bool + DynamicWeight int64 + StaticWeight int64 + Count int64 + dynamicMeta sync.Map +} + +func (m *MockDynamicEndpoint) GetName() string { + return "mockEndpoint" +} + +func (m *MockDynamicEndpoint) GetURL() *motan.URL { + return m.URL +} + +func (m *MockDynamicEndpoint) SetURL(url *motan.URL) { + m.URL = url +} + +func (m *MockDynamicEndpoint) IsAvailable() bool { + return m.available +} + +func (m *MockDynamicEndpoint) SetAvailable(a bool) { + m.available = a +} + +func (m *MockDynamicEndpoint) SetProxy(proxy bool) {} + +func (m *MockDynamicEndpoint) SetSerialization(s motan.Serialization) {} + +func (m *MockDynamicEndpoint) Call(request motan.Request) motan.Response { + if isMetaServiceRequest(request) { + resMap := make(map[string]string) + m.dynamicMeta.Range(func(key, value interface{}) bool { + resMap[key.(string)] = value.(string) + return true + }) + atomic.AddInt64(&m.Count, 1) + return &motan.MotanResponse{ProcessTime: 1, Value: resMap} + } + atomic.AddInt64(&m.Count, 1) + return &motan.MotanResponse{ProcessTime: 1, Value: "ok"} +} + +func (m *MockDynamicEndpoint) Destroy() {} + +func (m *MockDynamicEndpoint) GetRuntimeInfo() map[string]interface{} { + return make(map[string]interface{}) +} + +func (m *MockDynamicEndpoint) SetWeight(isDynamic bool, weight int64) { + if isDynamic { + m.DynamicWeight = weight + m.dynamicMeta.Store(motan.DefaultMetaPrefix+motan.WeightMetaSuffixKey, strconv.Itoa(int(weight))) + } else { + m.StaticWeight = weight + m.URL.PutParam(motan.DefaultMetaPrefix+motan.WeightMetaSuffixKey, strconv.Itoa(int(weight))) + } +} + +func NewMockDynamicEndpoint(url *motan.URL) *MockDynamicEndpoint { + return &MockDynamicEndpoint{ + URL: url, + available: true, + } +} + +func NewMockDynamicEndpointWithWeight(url *motan.URL, staticWeight int64) *MockDynamicEndpoint { + res := NewMockDynamicEndpoint(url) + res.StaticWeight = staticWeight + res.URL.PutParam(motan.DefaultMetaPrefix+motan.WeightMetaSuffixKey, strconv.Itoa(int(staticWeight))) + return res +} + +func isMetaServiceRequest(request motan.Request) bool { + return request != nil && "com.weibo.api.motan.runtime.meta.MetaService" == request.GetServiceName() && + "getDynamicMeta" == request.GetMethod() && "y" == request.GetAttachment(mpro.MFrameworkService) +} diff --git a/endpoint/motanCommonEndpoint.go b/endpoint/motanCommonEndpoint.go index ab04cf9e..8f6a87a0 100644 --- a/endpoint/motanCommonEndpoint.go +++ b/endpoint/motanCommonEndpoint.go @@ -620,7 +620,7 @@ func (s *Stream) RemoveFromChannel() bool { // Call send request to the server. // -// about return: exception in response will record error count, err will not. +// about return: exception in response will record error Count, err will not. func (c *Channel) Call(req motan.Request, deadline time.Duration, rc *motan.RPCContext) (motan.Response, error) { stream, err := c.newStream(req, rc, deadline) if err != nil { diff --git a/go.mod b/go.mod index e7623234..c2ab501c 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/weibocom/motan-go -go 1.11 +go 1.16 require ( github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5 @@ -16,15 +16,16 @@ require ( github.com/mitchellh/mapstructure v1.1.2 github.com/opentracing/opentracing-go v1.0.2 github.com/panjf2000/ants/v2 v2.9.0 + github.com/patrickmn/go-cache v2.1.0+incompatible github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec github.com/shirou/gopsutil/v3 v3.21.9 github.com/smartystreets/goconvey v1.6.4 // indirect - github.com/stretchr/testify v1.8.2 + github.com/stretchr/testify v1.8.4 github.com/valyala/fasthttp v1.2.0 github.com/weibreeze/breeze-go v0.1.1 - go.uber.org/atomic v1.4.0 // indirect + go.uber.org/atomic v1.4.0 go.uber.org/multierr v1.1.0 // indirect go.uber.org/zap v1.10.0 golang.org/x/net v0.0.0-20201224014010-6772e930b67b diff --git a/lb/lb.go b/lb/lb.go index 062c5333..48a59161 100644 --- a/lb/lb.go +++ b/lb/lb.go @@ -16,6 +16,7 @@ const ( Random = "random" Roundrobin = "roundrobin" ConsistentHashKey = "consistentHashKey" + WeightRoundRobin = "wrr" ) const ( @@ -39,6 +40,9 @@ func RegistDefaultLb(extFactory motan.ExtensionFactory) { extFactory.RegistExtLb(ConsistentHashKey, NewWeightLbFunc(func(url *motan.URL) motan.LoadBalance { return &ConsistentHashLB{url: url} })) + extFactory.RegistExtLb(WeightRoundRobin, NewWeightLbFunc(func(url *motan.URL) motan.LoadBalance { + return NewWeightRondRobinLb(url) + })) } // WeightedLbWrapper support multi group weighted LB @@ -55,6 +59,25 @@ func NewWeightLbFunc(newLb motan.NewLbFunc) motan.NewLbFunc { } } +func (w *WeightedLbWrapper) Destroy() { + destroyInnerRefers(w.refers) +} + +func destroyInnerRefers(refers innerRefers) { + if v, ok := refers.(*singleGroupRefers); ok { + if vlb, ok := v.lb.(motan.Destroyable); ok { + vlb.Destroy() + } + } + if v, ok := refers.(*weightedRefers); ok { + for _, lb := range v.groupLb { + if vlb, ok := lb.(motan.Destroyable); ok { + vlb.Destroy() + } + } + } +} + func (w *WeightedLbWrapper) OnRefresh(endpoints []motan.EndPoint) { if w.weightString == "" { //not weighted lb vlog.Infof("WeightedLbWrapper: %s - OnRefresh:not have weight", w.url.GetIdentity()) @@ -130,7 +153,9 @@ func (w *WeightedLbWrapper) OnRefresh(endpoints []motan.EndPoint) { } wr.weightRing = motan.SliceShuffle(ring) wr.ringSize = len(wr.weightRing) + oldRefers := w.refers w.refers = wr + destroyInnerRefers(oldRefers) vlog.Infof("WeightedLbWrapper: %s - OnRefresh: weight:%s", w.url.GetIdentity(), w.weightString) } @@ -140,7 +165,9 @@ func (w *WeightedLbWrapper) onRefreshSingleGroup(endpoints []motan.EndPoint) { } else { lb := w.newLb(w.url) lb.OnRefresh(endpoints) + oldRefers := w.refers w.refers = &singleGroupRefers{lb: lb} + destroyInnerRefers(oldRefers) } } diff --git a/lb/weightRoundRobinLb.go b/lb/weightRoundRobinLb.go new file mode 100644 index 00000000..9558344c --- /dev/null +++ b/lb/weightRoundRobinLb.go @@ -0,0 +1,308 @@ +package lb + +import ( + motan "github.com/weibocom/motan-go/core" + vlog "github.com/weibocom/motan-go/log" + "math/rand" + "sync" + "sync/atomic" +) + +type WeightRoundRobinLB struct { + selector Selector + mutex sync.RWMutex + refresher *WeightedEpRefresher +} + +func NewWeightRondRobinLb(url *motan.URL) *WeightRoundRobinLB { + lb := &WeightRoundRobinLB{} + lb.refresher = NewWeightEpRefresher(url, lb) + return lb +} + +func (r *WeightRoundRobinLB) OnRefresh(endpoints []motan.EndPoint) { + r.refresher.RefreshWeightedHolders(endpoints) +} + +func (r *WeightRoundRobinLB) Select(request motan.Request) motan.EndPoint { + selector := r.getSelector() + if selector == nil { + return nil + } + return selector.(Selector).DoSelect(request) +} + +func (r *WeightRoundRobinLB) SelectArray(request motan.Request) []motan.EndPoint { + // cannot use select array, return nil + return nil +} + +func (r *WeightRoundRobinLB) SetWeight(weight string) {} + +func (r *WeightRoundRobinLB) Destroy() { + r.refresher.Destroy() +} + +func (r *WeightRoundRobinLB) getSelector() Selector { + r.mutex.RLock() + defer r.mutex.RUnlock() + return r.selector +} + +func (r *WeightRoundRobinLB) setSelector(s Selector) { + r.mutex.Lock() + defer r.mutex.Unlock() + r.selector = s +} + +func (r *WeightRoundRobinLB) NotifyWeightChange() { + var tempHolders []*WeightedEpHolder + h := r.refresher.weightedEpHolders.Load() + if h == nil { + // fast stop + return + } + tempHolders = h.([]*WeightedEpHolder) + weights := make([]int, len(tempHolders)) + haveSameWeight := true + totalWeight := 0 + for i := 0; i < len(tempHolders); i++ { + weights[i] = int(tempHolders[i].getWeight()) + totalWeight += weights[i] + if weights[i] != weights[0] { + haveSameWeight = false + } + } + // if all eps have the same weight, then use RoundRobinSelector + if haveSameWeight { // use RoundRobinLoadBalance + selector := r.getSelector() + if selector != nil { + if v, ok := selector.(*roundRobinSelector); ok { // reuse the RoundRobinSelector + v.refresh(tempHolders) + return + } + } + // new RoundRobinLoadBalance + r.setSelector(newRoundRobinSelector(tempHolders)) + vlog.Infoln("WeightRoundRobinLoadBalance use RoundRobinSelector. url:" + r.getURLLogInfo()) + return + } + // find the GCD and divide the weights + gcd := findGcd(weights) + if gcd > 1 { + totalWeight = 0 // recalculate totalWeight + for i := 0; i < len(weights); i++ { + weights[i] /= gcd + totalWeight += weights[i] + } + } + // Check whether it is suitable to use WeightedRingSelector + if len(weights) <= wrMaxEpSize && totalWeight <= wrMaxTotalWeight { + r.setSelector(newWeightedRingSelector(tempHolders, totalWeight, weights)) + vlog.Infoln("WeightRoundRobinLoadBalance use WeightedRingSelector. url:" + r.getURLLogInfo()) + return + } + r.setSelector(newSlidingWindowWeightedRoundRobinSelector(tempHolders, weights)) + vlog.Infoln("WeightRoundRobinLoadBalance use SlidingWindowWeightedRoundRobinSelector. url:" + r.getURLLogInfo()) +} + +func (r *WeightRoundRobinLB) getURLLogInfo() string { + url := r.refresher.url + if url == nil { + holders := r.refresher.weightedEpHolders.Load() + if v, ok := holders.([]*WeightedEpHolder); ok { + if len(v) > 0 { + url = v[0].ep.GetURL() + } + } + } + if url == nil { + return "" + } + return url.GetIdentity() +} + +type Selector interface { + DoSelect(request motan.Request) motan.EndPoint +} + +type roundRobinSelector struct { + weightHolders atomic.Value + idx int64 +} + +func newRoundRobinSelector(holders []*WeightedEpHolder) *roundRobinSelector { + res := &roundRobinSelector{} + res.weightHolders.Store(holders) + return res +} + +func (r *roundRobinSelector) DoSelect(request motan.Request) motan.EndPoint { + temp := r.weightHolders.Load() + if temp == nil { + return nil + } + tempHolders := temp.([]*WeightedEpHolder) + ep := tempHolders[int(motan.GetNonNegative(atomic.AddInt64(&r.idx, 1)))%len(tempHolders)].ep + if ep.IsAvailable() { + return ep + } + // if the ep is not available, loop section from random position + start := rand.Intn(len(tempHolders)) + for i := 0; i < len(tempHolders); i++ { + ep = tempHolders[(i+start)%len(tempHolders)].ep + if ep.IsAvailable() { + return ep + } + } + return nil +} + +func (r *roundRobinSelector) refresh(holders []*WeightedEpHolder) { + r.weightHolders.Store(holders) +} + +const ( + wrMaxEpSize = 256 + wrMaxTotalWeight = 256 * 20 +) + +type weightedRingSelector struct { + weightHolders []*WeightedEpHolder + idx int64 + weights []int + weightRing []byte +} + +func newWeightedRingSelector(holders []*WeightedEpHolder, totalWeight int, weights []int) *weightedRingSelector { + wrs := &weightedRingSelector{ + weightHolders: holders, + weightRing: make([]byte, totalWeight), + weights: weights, + } + wrs.initWeightRing() + return wrs +} + +func (r *weightedRingSelector) initWeightRing() { + ringIndex := 0 + for i := 0; i < len(r.weights); i++ { + for j := 0; j < r.weights[i]; j++ { + r.weightRing[ringIndex] = byte(i) + ringIndex++ + } + } + if ringIndex != len(r.weightRing) { + vlog.Warningf("WeightedRingSelector initWeightRing with wrong totalWeight. expect:%d, actual: %d", len(r.weightRing), ringIndex) + } + r.weightRing = motan.ByteSliceShuffle(r.weightRing) + +} + +func (r *weightedRingSelector) DoSelect(request motan.Request) motan.EndPoint { + ep := r.weightHolders[r.getHolderIndex(int(motan.GetNonNegative(atomic.AddInt64(&r.idx, 1))))].ep + if ep.IsAvailable() { + return ep + } + // If the ep is not available, loop selection from random position + start := rand.Intn(len(r.weightRing)) + for i := 0; i < len(r.weightRing); i++ { + // byte could indicate 0~255 + ep = r.weightHolders[r.getHolderIndex(start+i)].getEp() + if ep.IsAvailable() { + return ep + } + } + return nil +} + +func (r *weightedRingSelector) getHolderIndex(ringIndex int) int { + holderIndex := int(r.weightRing[ringIndex%len(r.weightRing)]) + return holderIndex +} + +const ( + swwrDefaultWindowSize = 50 +) + +type slidingWindowWeightedRoundRobinSelector struct { + idx int64 + windowSize int + items []*selectorItem +} + +func newSlidingWindowWeightedRoundRobinSelector(holders []*WeightedEpHolder, weights []int) *slidingWindowWeightedRoundRobinSelector { + windowSize := len(weights) + if windowSize > swwrDefaultWindowSize { + windowSize = swwrDefaultWindowSize + // The window size cannot be divided by the number of referers, which ensures that the starting position + // of the window will gradually change during sliding + for len(weights)%windowSize == 0 { + windowSize-- + } + } + items := make([]*selectorItem, 0, len(holders)) + for i := 0; i < len(weights); i++ { + items = append(items, newSelectorItem(holders[i].getEp(), weights[i])) + } + return &slidingWindowWeightedRoundRobinSelector{ + items: items, + windowSize: windowSize, + } +} + +func (r *slidingWindowWeightedRoundRobinSelector) DoSelect(request motan.Request) motan.EndPoint { + windowStartIndex := motan.GetNonNegative(atomic.AddInt64(&r.idx, int64(r.windowSize))) + totalWeight := 0 + var sMaxWeight int64 = 0 + maxWeightIndex := 0 + // Use SWRR(https://github.com/nginx/nginx/commit/52327e0627f49dbda1e8db695e63a4b0af4448b1) to select referer from the current window. + // In order to reduce costs, do not limit concurrency in the entire selection process, + // and only use atomic updates for the current weight. + // Since concurrent threads will execute Select in different windows, + // the problem of instantaneous requests increase on one node due to concurrency will not be serious. + // And because the referers used on different client sides are shuffled, + // the impact of high instantaneous concurrent selection on the server side will be further reduced. + for i := 0; i < r.windowSize; i++ { + idx := (int(windowStartIndex) + i) % len(r.items) + item := r.items[idx] + if item.ep.IsAvailable() { + currentWeight := atomic.AddInt64(&item.currentWeight, int64(item.weight)) + totalWeight += item.weight + if currentWeight > sMaxWeight { + sMaxWeight = currentWeight + maxWeightIndex = idx + } + } + } + if sMaxWeight > 0 { + item := r.items[maxWeightIndex] + atomic.AddInt64(&item.currentWeight, int64(-totalWeight)) + if item.ep.IsAvailable() { + return item.ep + } + } + // If no suitable node is selected or the node is unavailable, + // then select an available referer from a random index + var idx = int(windowStartIndex) + rand.Intn(r.windowSize) + for i := 1; i < len(r.items); i++ { + item := r.items[(idx+i)%len(r.items)] + if item.ep.IsAvailable() { + return item.ep + } + } + return nil +} + +type selectorItem struct { + ep motan.EndPoint + weight int + currentWeight int64 +} + +func newSelectorItem(ep motan.EndPoint, weight int) *selectorItem { + return &selectorItem{ + weight: weight, + ep: ep, + } +} diff --git a/lb/weightRoundRobinLb_test.go b/lb/weightRoundRobinLb_test.go new file mode 100644 index 00000000..64093acb --- /dev/null +++ b/lb/weightRoundRobinLb_test.go @@ -0,0 +1,358 @@ +package lb + +import ( + "fmt" + "github.com/stretchr/testify/assert" + "github.com/weibocom/motan-go/core" + "github.com/weibocom/motan-go/endpoint" + "github.com/weibocom/motan-go/meta" + "math" + "math/rand" + "sync/atomic" + "testing" + "time" +) + +func TestDynamicStaticWeight(t *testing.T) { + url := &core.URL{ + Protocol: "motan2", + Host: "127.0.0.1", + Port: 8080, + Path: "mockService", + } + url.PutParam(core.DynamicMetaKey, "false") + // test dynamic weight config + lb := NewWeightRondRobinLb(url) + assert.False(t, lb.refresher.supportDynamicWeight) + lb.Destroy() + + url.PutParam(core.DynamicMetaKey, "true") + lb = NewWeightRondRobinLb(url) + assert.True(t, lb.refresher.supportDynamicWeight) + + meta.ClearMetaCache() + var staticWeight int64 = 9 + eps := buildTestDynamicEps(10, true, staticWeight, url) + eps = core.EndpointShuffle(eps) + lb.OnRefresh(eps) + _, ok := lb.getSelector().(*roundRobinSelector) + assert.True(t, ok) + for _, j := range lb.refresher.weightedEpHolders.Load().([]*WeightedEpHolder) { + // test static weight + assert.Equal(t, j.staticWeight, staticWeight) + assert.Equal(t, j.dynamicWeight, int64(0)) + assert.Equal(t, j.getWeight(), staticWeight) + } + + // test dynamic wight change + lb.refresher.weightedEpHolders.Load().([]*WeightedEpHolder)[3].ep.(*endpoint.MockDynamicEndpoint).SetWeight(true, 22) + meta.ClearMetaCache() + time.Sleep(time.Second * 5) + //assert.Equal(t, int64(22), lb.refresher.weightedEpHolders.Load().([]*WeightedEpHolder)[3].dynamicWeight) + //assert.Equal(t, int64(22), lb.refresher.weightedEpHolders.Load().([]*WeightedEpHolder)[3].getWeight()) + _, ok = lb.getSelector().(*weightedRingSelector) + assert.True(t, ok) + // test close refresh task + lb.Destroy() + time.Sleep(time.Second * 5) + assert.True(t, lb.refresher.isDestroyed.Load()) +} + +func TestGetEpWeight(t *testing.T) { + url := &core.URL{ + Protocol: "motan2", + Host: "127.0.0.1", + Port: 8080, + Path: "mockService", + } + ep := endpoint.NewMockDynamicEndpoint(url) + type test struct { + expectWeight int64 + fromDynamic bool + defaultWeight int64 + setWeight bool + setWeightDynamic bool + setWeightValue int64 + } + testSet := []test{ + // test static weight + {expectWeight: 11, fromDynamic: false, defaultWeight: 11, setWeight: false}, + {expectWeight: 5, fromDynamic: false, defaultWeight: 5, setWeight: false}, + {expectWeight: 8, fromDynamic: false, defaultWeight: 11, setWeight: true, setWeightDynamic: false, setWeightValue: 8}, + // test dynamic weight + {expectWeight: 0, fromDynamic: true, defaultWeight: 0, setWeight: false}, + {expectWeight: 5, fromDynamic: true, defaultWeight: 5, setWeight: false}, + {expectWeight: 15, fromDynamic: true, defaultWeight: 11, setWeight: true, setWeightDynamic: true, setWeightValue: 15}, + // test abnormal weight + {expectWeight: MinEpWeight, fromDynamic: false, defaultWeight: 11, setWeight: true, setWeightDynamic: false, setWeightValue: -8}, + {expectWeight: MaxEpWeight, fromDynamic: false, defaultWeight: 11, setWeight: true, setWeightDynamic: false, setWeightValue: 501}, + {expectWeight: MinEpWeight, fromDynamic: true, defaultWeight: 11, setWeight: true, setWeightDynamic: true, setWeightValue: -1}, + {expectWeight: MaxEpWeight, fromDynamic: true, defaultWeight: 11, setWeight: true, setWeightDynamic: true, setWeightValue: 666}, + } + + for _, j := range testSet { + if j.setWeight { + ep.SetWeight(j.setWeightDynamic, j.setWeightValue) + } + w, e := getEpWeight(ep, j.fromDynamic, j.defaultWeight) + assert.Nil(t, e) + assert.Equal(t, j.expectWeight, w) + meta.ClearMetaCache() + } +} + +func TestNotifyWeightChange(t *testing.T) { + type test struct { + size int + sameWeight bool + maxWeight int + selector string + } + testSet := []test{ + // test RR + {size: 20, sameWeight: true, maxWeight: 8, selector: "roundRobinSelector"}, + {size: 20, sameWeight: true, maxWeight: 0, selector: "roundRobinSelector"}, + {size: 20, sameWeight: true, maxWeight: 101, selector: "roundRobinSelector"}, + {size: 20, sameWeight: true, maxWeight: -10, selector: "roundRobinSelector"}, //abnormal weight + //// test WR + {size: 20, sameWeight: false, maxWeight: 8, selector: "weightedRingSelector"}, + {size: wrMaxEpSize, sameWeight: false, maxWeight: 8, selector: "weightedRingSelector"}, + {size: wrMaxEpSize, sameWeight: false, maxWeight: wrMaxTotalWeight / wrMaxEpSize, selector: "weightedRingSelector"}, + // test SWWRR + {size: wrMaxEpSize + 1, sameWeight: false, maxWeight: 6, selector: "slidingWindowWeightedRoundRobinSelector"}, + {size: 40, sameWeight: false, maxWeight: 800, selector: "slidingWindowWeightedRoundRobinSelector"}, + } + url := &core.URL{ + Protocol: "motan2", + Host: "127.0.0.1", + Port: 8080, + Path: "mockService", + } + url.PutParam(core.DynamicMetaKey, "false") + lb := NewWeightRondRobinLb(url) + for _, j := range testSet { + eps := buildTestDynamicEps(j.size, j.sameWeight, int64(j.maxWeight), url) + eps = core.EndpointShuffle(eps) + lb.OnRefresh(eps) + var ok bool + switch j.selector { + case "roundRobinSelector": + _, ok = lb.getSelector().(*roundRobinSelector) + case "weightedRingSelector": + _, ok = lb.getSelector().(*weightedRingSelector) + case "slidingWindowWeightedRoundRobinSelector": + _, ok = lb.getSelector().(*slidingWindowWeightedRoundRobinSelector) + } + assert.True(t, ok) + meta.ClearMetaCache() + } + lb.Destroy() +} + +func TestRoundRobinSelector(t *testing.T) { + url := &core.URL{ + Protocol: "motan2", + Host: "127.0.0.1", + Port: 8080, + Path: "mockService", + } + url.PutParam(core.DynamicMetaKey, "false") + lb := NewWeightRondRobinLb(url) + round := 100 + // small size + checkRR(t, lb, 20, 8, round, 1, 1, 0, url) + // large size + checkRR(t, lb, 500, 8, round, 1, 1, 0, url) + // some nodes are unavailable + maxRatio := 0.4 + avgRatio := 0.1 + round = 200 + checkRR(t, lb, 20, 8, round, float64(round)*maxRatio, float64(round)*avgRatio, 2, url) + checkRR(t, lb, 100, 8, round, float64(round)*maxRatio, float64(round)*avgRatio, 10, url) + + maxRatio = 0.7 + checkRR(t, lb, 300, 8, round, float64(round)*maxRatio, float64(round)*avgRatio, 50, url) + lb.Destroy() +} + +func checkRR(t *testing.T, lb *WeightRoundRobinLB, size int, initialMaxWeight int64, + round int, expectMaxDelta float64, expectAvgDelta float64, unavailableSize int, url *core.URL) { + eps := buildTestDynamicEpsWithUnavailable(size, true, initialMaxWeight, true, unavailableSize, url) + rand.Seed(time.Now().UnixNano()) + eps = core.EndpointShuffle(eps) + lb.OnRefresh(eps) + _, ok := lb.getSelector().(*roundRobinSelector) + assert.True(t, ok) + processCheck(t, lb, "RR", eps, round, expectMaxDelta, expectAvgDelta, unavailableSize) +} + +func TestWeightRingSelector(t *testing.T) { + url := &core.URL{ + Protocol: "motan2", + Host: "127.0.0.1", + Port: 8080, + Path: "mockService", + } + url.PutParam(core.DynamicMetaKey, "false") + lb := NewWeightRondRobinLb(url) + round := 100 + // small size + checkKWR(t, lb, 51, 49, round, 1, 1, 0, url) + // max node size of WR + checkKWR(t, lb, 256, 15, round, 1, 1, 0, url) + + // same nodes are unavailable + maxRatio := 0.4 + avgRatio := 0.1 + checkKWR(t, lb, 46, 75, round, float64(round)*maxRatio, float64(round)*avgRatio, 5, url) + checkKWR(t, lb, 231, 31, round, float64(round)*maxRatio, float64(round)*avgRatio, 35, url) + maxRatio = 0.6 + //checkKWR(t, lb, 211, 31, round, float64(round)*maxRatio, float64(round)*avgRatio, 45, url) + lb.Destroy() +} + +func checkKWR(t *testing.T, lb *WeightRoundRobinLB, size int, initialMaxWeight int64, + round int, expectMaxDelta float64, expectAvgDelta float64, unavailableSize int, url *core.URL) { + eps := buildTestDynamicEpsWithUnavailable(size, false, initialMaxWeight, true, unavailableSize, url) + rand.Seed(time.Now().UnixNano()) + eps = core.EndpointShuffle(eps) + lb.OnRefresh(eps) + _, ok := lb.getSelector().(*weightedRingSelector) + assert.True(t, ok) + processCheck(t, lb, "WR", eps, round, expectMaxDelta, expectAvgDelta, unavailableSize) +} + +func TestSlidingWindowWeightedRoundRobinSelector(t *testing.T) { + url := &core.URL{ + Protocol: "motan2", + Host: "127.0.0.1", + Port: 8080, + Path: "mockService", + } + url.PutParam(core.DynamicMetaKey, "false") + lb := NewWeightRondRobinLb(url) + round := 100 + // equals default window size, the accuracy is higher than sliding window + size := swwrDefaultWindowSize + checkSWWRR(t, lb, size, int64(wrMaxTotalWeight*3/size), round, 2, 1, 0, url) + // less than default window size + size = swwrDefaultWindowSize - 9 + checkSWWRR(t, lb, size, int64(wrMaxTotalWeight*3/size), round, 2, 1, 0, url) + + // greater than default window size + // sliding windows will reduce the accuracy of WRR, so the threshold should be appropriately increased + maxRatio := 0.5 + avgRatio := 0.1 + round = 200 + size = 270 + checkSWWRR(t, lb, size, 45, round, float64(round)*maxRatio, float64(round)*avgRatio, 0, url) + + // some nodes are unavailable + size = 260 + checkSWWRR(t, lb, size, int64(wrMaxTotalWeight*3/size), round, float64(round)*maxRatio, float64(round)*avgRatio, 10, url) + size = 399 + checkSWWRR(t, lb, size, 67, round, float64(round)*maxRatio, float64(round)*avgRatio, 40, url) + lb.Destroy() +} + +func checkSWWRR(t *testing.T, lb *WeightRoundRobinLB, size int, initialMaxWeight int64, + round int, expectMaxDelta float64, expectAvgDelta float64, unavailableSize int, url *core.URL) { + eps := buildTestDynamicEpsWithUnavailable(size, false, initialMaxWeight, true, unavailableSize, url) + rand.Seed(time.Now().UnixNano()) + eps = core.EndpointShuffle(eps) + lb.OnRefresh(eps) + _, ok := lb.getSelector().(*slidingWindowWeightedRoundRobinSelector) + assert.True(t, ok) + processCheck(t, lb, "SWWRR", eps, round, expectMaxDelta, expectAvgDelta, unavailableSize) +} + +func processCheck(t *testing.T, lb *WeightRoundRobinLB, typ string, eps []core.EndPoint, round int, + expectMaxDelta float64, expectAvgDelta float64, unavailableSize int) { + var totalWeight int64 = 0 + for _, ep := range eps { + if !ep.IsAvailable() { + continue + } + totalWeight += ep.(*endpoint.MockDynamicEndpoint).StaticWeight + } + for i := 0; i < int(totalWeight)*round; i++ { + lb.Select(nil).Call(nil) + } + var maxDelta float64 = 0.0 + var totalDelta float64 = 0.0 + unavailableCount := 0 + for _, ep := range eps { + if !ep.IsAvailable() { + unavailableCount++ + } else { + mep := ep.(*endpoint.MockDynamicEndpoint) + ratio := float64(atomic.LoadInt64(&mep.Count)) / float64(mep.StaticWeight) + delta := math.Abs(ratio - float64(round)) + if delta > maxDelta { + maxDelta = delta + } + totalDelta += delta + if delta > expectMaxDelta { + fmt.Printf("%s: count=%d, staticWeight=%d, ratio=%.2f, delta=%.2f\n", typ, atomic.LoadInt64(&mep.Count), mep.StaticWeight, ratio, delta) + } + assert.True(t, delta <= expectMaxDelta) // check max delta + } + } + // avg delta + avgDelta := totalDelta / float64(len(eps)-unavailableSize) + assert.True(t, avgDelta-float64(round) < expectAvgDelta) + fmt.Printf("%s: avgDeltaPercent=%.2f%%, maxDeltaPercent=%.2f%%, avgDelta=%.2f, maxDelta=%.2f\n", typ, avgDelta*float64(100)/float64(round), maxDelta*float64(100)/float64(round), avgDelta, maxDelta) + if unavailableSize > 0 { + assert.Equal(t, unavailableSize, unavailableCount) + } +} + +func buildTestDynamicEps(size int, sameStaticWeight bool, maxWeight int64, url *core.URL) []core.EndPoint { + return buildTestDynamicEpsWithUnavailable(size, sameStaticWeight, maxWeight, false, 0, url) +} + +func buildTestDynamicEpsWithUnavailable(size int, sameStaticWeight bool, maxWeight int64, adjust bool, unavailableSize int, url *core.URL) []core.EndPoint { + return buildTestEps(size, sameStaticWeight, maxWeight, adjust, unavailableSize, "", url) +} + +func buildTestEps(size int, sameStaticWeight bool, maxWeight int64, adjust bool, unavailableSize int, group string, url *core.URL) []core.EndPoint { + rand.Seed(time.Now().UnixNano()) + var res []core.EndPoint + for i := 0; i < size; i++ { + weight := maxWeight + if !sameStaticWeight { + weight = int64(rand.Float64() * float64(maxWeight)) + } + if adjust { + weight = doAdjust(weight) + } + curUrl := &core.URL{ + Protocol: "motan2", + Host: "127.0.0.1", + Port: 8080 + i, + Path: "mockService", + } + if url.GetParam(core.DynamicMetaKey, "") == "false" { + curUrl.PutParam(core.DynamicMetaKey, "false") + } + ep := endpoint.NewMockDynamicEndpointWithWeight(curUrl, weight) + if i < unavailableSize { + ep.SetAvailable(false) + } + if group != "" { + ep.URL.PutParam(core.GroupKey, group) + } + res = append(res, ep) + } + return res +} + +func doAdjust(w int64) int64 { + if w < MinEpWeight { + return MinEpWeight + } else if w > MaxEpWeight { + return MaxEpWeight + } else { + return w + } +} diff --git a/lb/weightedEpRefresher.go b/lb/weightedEpRefresher.go new file mode 100644 index 00000000..5633046a --- /dev/null +++ b/lb/weightedEpRefresher.go @@ -0,0 +1,219 @@ +package lb + +import ( + "errors" + motan "github.com/weibocom/motan-go/core" + vlog "github.com/weibocom/motan-go/log" + "github.com/weibocom/motan-go/meta" + "go.uber.org/atomic" + "golang.org/x/net/context" + "strconv" + "sync" + "time" +) + +const ( + defaultEpWeight = 10 + MinEpWeight = 1 + MaxEpWeight = 500 //protective restriction + defaultWeightRefreshPeriodSecond = 3 +) + +// WeightedEpRefresher is held by load balancer who needs dynamic endpoint weights +type WeightedEpRefresher struct { + url *motan.URL + refreshCanceler context.CancelFunc + supportDynamicWeight bool + weightedEpHolders atomic.Value + weightLB motan.WeightLoadBalance + isDestroyed atomic.Bool + mutex sync.Mutex +} + +func NewWeightEpRefresher(url *motan.URL, lb motan.WeightLoadBalance) *WeightedEpRefresher { + refreshPeriod := url.GetIntValue(motan.WeightRefreshPeriodSecondKey, defaultWeightRefreshPeriodSecond) + refreshCtx, refreshCancel := context.WithCancel(context.Background()) + wer := &WeightedEpRefresher{ + url: url, + supportDynamicWeight: url.GetBoolValue(motan.DynamicMetaKey, motan.DefaultDynamicMeta), + refreshCanceler: refreshCancel, + weightLB: lb, + } + go wer.doRefresh(refreshCtx, refreshPeriod) + return wer +} + +func (w *WeightedEpRefresher) Destroy() { + w.refreshCanceler() +} + +func (w *WeightedEpRefresher) notifyWeightChange() { + w.mutex.Lock() + defer w.mutex.Unlock() + vlog.Infoln("weight has changed, url: " + w.url.GetIdentity()) + w.weightLB.NotifyWeightChange() +} + +func (w *WeightedEpRefresher) RefreshWeightedHolders(eps []motan.EndPoint) { + var newHolder []*WeightedEpHolder + var oldHolder []*WeightedEpHolder + if t := w.weightedEpHolders.Load(); t != nil { + oldHolder = t.([]*WeightedEpHolder) + } + allHolder := make([]*WeightedEpHolder, 0, len(eps)) + for _, ep := range eps { + var holder *WeightedEpHolder + if oldHolder != nil { + // Check whether referer can be reused + for _, tempHolder := range oldHolder { + if ep == tempHolder.getEp() { + holder = tempHolder + break + } + } + } + if holder == nil { + staticWeight, _ := getEpWeight(ep, false, defaultEpWeight) + holder = BuildWeightedEpHolder(ep, staticWeight) + newHolder = append(newHolder, holder) + } + allHolder = append(allHolder, holder) + } + // only refresh new holders' dynamic weight + if len(newHolder) != 0 { + refreshDynamicWeight(newHolder, 15*1000) + } + w.weightedEpHolders.Store(allHolder) + w.notifyWeightChange() +} + +func refreshDynamicWeight(holders []*WeightedEpHolder, taskTimeout int64) bool { + needNotify := atomic.NewBool(false) + wg := sync.WaitGroup{} + wg.Add(len(holders)) + for _, h := range holders { + if h.supportDynamicWeight { + go func(holder *WeightedEpHolder) { + defer wg.Done() + oldWeight := holder.dynamicWeight + var err error + dw, err := getEpWeight(holder.getEp(), true, 0) + if err != nil { + if errors.Is(err, meta.ServiceNotSupportError) { + holder.supportDynamicWeight = false + } else { + vlog.Warningf("refresh dynamic weight fail! url:%s, error: %s\n", holder.getEp().GetURL().GetIdentity(), err.Error()) + } + return + } + holder.dynamicWeight = dw + if oldWeight != holder.dynamicWeight { + needNotify.Store(true) + } + }(h) + } else { + wg.Done() + } + } + // just wait certain amount of time + timer := time.NewTimer(time.Millisecond * time.Duration(taskTimeout)) + finishChan := make(chan struct{}) + defer close(finishChan) + go func() { + wg.Wait() + finishChan <- struct{}{} + }() + select { + case <-timer.C: + case <-finishChan: + timer.Stop() + } + return needNotify.Load() +} + +func getEpWeight(ep motan.EndPoint, fromDynamicMeta bool, defaultWeight int64) (int64, error) { + var metaMap map[string]string + var err error + if fromDynamicMeta { + metaMap, err = meta.GetEpDynamicMeta(ep) + } else { + metaMap = meta.GetEpStaticMeta(ep) + } + if err != nil { + return defaultWeight, err + } + weightStr := meta.GetMetaValue(metaMap, motan.WeightMetaSuffixKey) + return adjustWeight(ep, weightStr, defaultWeight), nil +} + +func adjustWeight(ep motan.EndPoint, weight string, defaultWeight int64) int64 { + res := defaultWeight + if weight != "" { + temp, err := strconv.ParseInt(weight, 10, 64) + if err != nil { + vlog.Warningf("WeightedRefererHolder parse weight fail. %s, use default weight %d, org weight: %s, err: %s\n", ep.GetURL().GetIdentity(), defaultWeight, weight, err.Error()) + return defaultWeight + } + if temp < MinEpWeight { + temp = MinEpWeight + } else if temp > MaxEpWeight { + temp = MaxEpWeight + } + res = temp + } + return res +} + +func (w *WeightedEpRefresher) doRefresh(ctx context.Context, refreshPeriod int64) { + ticker := time.NewTicker(time.Second * time.Duration(refreshPeriod)) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + w.isDestroyed.Store(true) + return + case <-ticker.C: + if w.supportDynamicWeight { + w.refreshHolderDynamicWeightTask() + } + } + } +} + +func (w *WeightedEpRefresher) refreshHolderDynamicWeightTask() { + // The reference of holders might be changed during the loop + // Only refresh historical holders + var tempLoaders []*WeightedEpHolder + if t := w.weightedEpHolders.Load(); t != nil { + tempLoaders = t.([]*WeightedEpHolder) + } + if refreshDynamicWeight(tempLoaders, 30*1000) { + w.notifyWeightChange() + } +} + +type WeightedEpHolder struct { + ep motan.EndPoint + staticWeight int64 + supportDynamicWeight bool + dynamicWeight int64 +} + +func BuildWeightedEpHolder(ep motan.EndPoint, staticWeight int64) *WeightedEpHolder { + return &WeightedEpHolder{ + ep: ep, + staticWeight: staticWeight, + supportDynamicWeight: ep.GetURL().GetBoolValue(motan.DynamicMetaKey, motan.DefaultDynamicMeta), + } +} + +func (w *WeightedEpHolder) getWeight() int64 { + if w.dynamicWeight > 0 { + return w.dynamicWeight + } + return w.staticWeight +} + +func (w *WeightedEpHolder) getEp() motan.EndPoint { + return w.ep +} diff --git a/manageHandler.go b/manageHandler.go index b60385af..8cd84f0c 100644 --- a/manageHandler.go +++ b/manageHandler.go @@ -5,6 +5,7 @@ import ( "bytes" "encoding/json" "fmt" + "github.com/weibocom/motan-go/meta" mserver "github.com/weibocom/motan-go/server" "html/template" "io" @@ -742,6 +743,36 @@ func setLogStatus(jsonEncoder *json.Encoder, logType, available string) { } } +type MetaInfo struct { + agent *Agent +} + +func (m *MetaInfo) SetAgent(agent *Agent) { + m.agent = agent +} + +func (m *MetaInfo) ServeHTTP(w http.ResponseWriter, r *http.Request) { + switch r.URL.Path { + case "/meta/update": + if r.Form == nil { + r.ParseMultipartForm(32 << 20) + } + for k, v := range r.Form { + if len(v) > 0 { + meta.PutDynamicMeta(k, v[0]) + } + } + JSONSuccess(w, "ok") + case "/meta/delete": + meta.RemoveDynamicMeta(r.FormValue("key")) + JSONSuccess(w, "ok") + case "/meta/get": + JSONSuccess(w, map[string]string{r.FormValue("key"): meta.GetDynamicMeta()[r.FormValue("key")]}) + case "/meta/getAll": + JSONSuccess(w, map[string]map[string]string{"meta": meta.GetDynamicMeta()}) + } +} + type HotReload struct { agent *Agent } @@ -870,6 +901,41 @@ func (h *RuntimeHandler) addInfos(info map[string]interface{}, key string, resul } } +// JSONSuccess return success JSON data +func JSONSuccess(w http.ResponseWriter, data interface{}) { + JSON(w, "", "ok", data) +} + +// JSONError return Error JSON data +func JSONError(w http.ResponseWriter, msg string) { + JSON(w, msg, "fail", nil) +} + +func JSON(w http.ResponseWriter, errMsg string, result string, data interface{}) { + if errMsg != "" { + d := struct { + Result string `json:"result"` + Error string `json:"error"` + }{ + Result: result, + Error: errMsg, + } + bs, _ := json.Marshal(d) + w.Write(bs) + } else { + d := struct { + Result string `json:"result"` + Data interface{} `json:"data"` + }{ + Result: result, + Data: data, + } + bs, _ := json.Marshal(d) + w.Write(bs) + } + +} + //------------ below code is copied from net/http/pprof ------------- // Cmdline responds with the running program's diff --git a/meta/meta.go b/meta/meta.go new file mode 100644 index 00000000..254babcf --- /dev/null +++ b/meta/meta.go @@ -0,0 +1,188 @@ +package meta + +import ( + "errors" + "github.com/patrickmn/go-cache" + "github.com/weibocom/motan-go/core" + "github.com/weibocom/motan-go/endpoint" + vlog "github.com/weibocom/motan-go/log" + mpro "github.com/weibocom/motan-go/protocol" + "os" + "strconv" + "strings" + "sync" + "time" +) + +const ( + defaultCacheExpireSecond = 3 + notSupportCacheExpireSecond = 30 +) + +var ( + dynamicMeta = core.NewStringMap(30) + envMeta = make(map[string]string) + envPrefix = core.DefaultMetaPrefix + metaCache = cache.New(time.Second*time.Duration(defaultCacheExpireSecond), 30*time.Second) + notSupportCache = cache.New(time.Second*time.Duration(notSupportCacheExpireSecond), 30*time.Second) + ServiceNotSupportError = errors.New(core.ServiceNotSupport) + notSupportSerializer = map[string]bool{ + "protobuf": true, + "grpc-pb": true, + "grpc-pb-json": true, + } + supportProtocols = map[string]bool{ + "motan": true, + "motan2": true, + "motanV1Compatible": true, + } + once = sync.Once{} +) + +func Initialize(ctx *core.Context) { + once.Do(func() { + expireSecond := defaultCacheExpireSecond + if ctx != nil && ctx.Config != nil { + envPrefix = ctx.Config.GetStringWithDefault(core.EnvMetaPrefixKey, core.DefaultMetaPrefix) + expireSecondStr := ctx.Config.GetStringWithDefault(core.MetaCacheExpireSecondKey, "") + if expireSecondStr != "" { + tempCacheExpireSecond, err := strconv.Atoi(expireSecondStr) + if err == nil && tempCacheExpireSecond > 0 { + expireSecond = tempCacheExpireSecond + } + } + } + vlog.Infof("meta cache expire time : %d(s)\n", expireSecond) + metaCache = cache.New(time.Second*time.Duration(expireSecond), 30*time.Second) + vlog.Infof("using meta prefix : %s\n", envPrefix) + // load meta info from env variable + for _, env := range os.Environ() { + if strings.HasPrefix(env, envPrefix) { + kv := strings.Split(env, "=") + envMeta[kv[0]] = kv[1] + } + } + }) +} + +func GetEnvMeta() map[string]string { + return envMeta +} + +func PutDynamicMeta(key, value string) { + dynamicMeta.Store(key, value) +} + +func RemoveDynamicMeta(key string) { + dynamicMeta.Delete(key) +} + +func GetDynamicMeta() map[string]string { + return dynamicMeta.RawMap() +} + +func GetMergedMeta() map[string]string { + mergedMap := make(map[string]string) + for k, v := range envMeta { + mergedMap[k] = v + } + for k, v := range dynamicMeta.RawMap() { + mergedMap[k] = v + } + return mergedMap +} + +func GetMetaValue(meta map[string]string, keySuffix string) string { + var res string + if meta != nil { + if v, ok := meta[envPrefix+keySuffix]; ok { + res = v + } else { + res = meta[core.DefaultMetaPrefix+keySuffix] + } + } + return res +} + +func GetEpDynamicMeta(endpoint core.EndPoint) (map[string]string, error) { + cacheKey := getCacheKey(endpoint.GetURL()) + if v, ok := metaCache.Get(cacheKey); ok { + return v.(map[string]string), nil + } + res, err := getRemoteDynamicMeta(cacheKey, endpoint) + if err != nil { + return nil, err + } + metaCache.Set(cacheKey, res, cache.DefaultExpiration) + return res, nil +} + +// GetEpStaticMeta get remote static meta information from referer url attachments. +// the static meta is init at server start from env. +func GetEpStaticMeta(endpoint core.EndPoint) map[string]string { + res := make(map[string]string) + url := endpoint.GetURL() + if url != nil { + for k, v := range url.Parameters { + if strings.HasPrefix(k, core.DefaultMetaPrefix) || strings.HasPrefix(k, envPrefix) { + res[k] = v + } + } + } + return res +} + +func getRemoteDynamicMeta(cacheKey string, endpoint core.EndPoint) (map[string]string, error) { + if _, ok := notSupportCache.Get(cacheKey); ok || !isSupport(cacheKey, endpoint.GetURL()) { + return nil, ServiceNotSupportError + } + if !endpoint.IsAvailable() { + return nil, errors.New("endpoint unavailable") + } + resp := endpoint.Call(getMetaServiceRequest()) + if resp.GetException() != nil { + if resp.GetException().ErrMsg == core.ServiceNotSupport { + notSupportCache.Set(cacheKey, true, cache.DefaultExpiration) + return nil, ServiceNotSupportError + } + return nil, errors.New(resp.GetException().ErrMsg) + } + reply := make(map[string]string) + err := resp.ProcessDeserializable(&reply) + if err != nil { + return nil, err + } + return resp.GetValue().(map[string]string), nil +} + +func getMetaServiceRequest() core.Request { + req := &core.MotanRequest{ + RequestID: endpoint.GenerateRequestID(), + ServiceName: MetaServiceName, + Method: MetaMethodName, + Attachment: core.NewStringMap(core.DefaultAttachmentSize), + Arguments: []interface{}{}, + } + req.SetAttachment(mpro.MFrameworkService, "y") + return req +} + +func getCacheKey(url *core.URL) string { + return url.Host + ":" + url.GetPortStr() +} + +func isSupport(cacheKey string, url *core.URL) bool { + // check dynamicMeta config, protocol and serializer + if url.GetBoolValue(core.DynamicMetaKey, core.DefaultDynamicMeta) && + !notSupportSerializer[url.GetStringParamsWithDefault(core.SerializationKey, "")] && + supportProtocols[url.Protocol] { + return true + } + notSupportCache.Set(cacheKey, true, cache.DefaultExpiration) + return false +} + +func ClearMetaCache() { + metaCache.Flush() + notSupportCache.Flush() +} diff --git a/meta/service.go b/meta/service.go new file mode 100644 index 00000000..326a7843 --- /dev/null +++ b/meta/service.go @@ -0,0 +1,13 @@ +package meta + +const ( + MetaServiceName = "com.weibo.api.motan.runtime.meta.MetaService" + MetaMethodName = "getDynamicMeta" +) + +type MetaService struct { +} + +func (s *MetaService) getDynamicMeta() map[string]string { + return GetDynamicMeta() +} diff --git a/protocol/motanProtocol.go b/protocol/motanProtocol.go index ea9321ec..789db413 100644 --- a/protocol/motanProtocol.go +++ b/protocol/motanProtocol.go @@ -41,18 +41,19 @@ const ( ) const ( - MPath = "M_p" - MMethod = "M_m" - MException = "M_e" - MProcessTime = "M_pt" - MMethodDesc = "M_md" - MGroup = "M_g" - MProxyProtocol = "M_pp" - MVersion = "M_v" - MModule = "M_mdu" - MSource = "M_s" - MRequestID = "M_rid" - MTimeout = "M_tmo" + MPath = "M_p" + MMethod = "M_m" + MException = "M_e" + MProcessTime = "M_pt" + MMethodDesc = "M_md" + MGroup = "M_g" + MProxyProtocol = "M_pp" + MVersion = "M_v" + MModule = "M_mdu" + MSource = "M_s" + MRequestID = "M_rid" + MTimeout = "M_tmo" + MFrameworkService = "M_fws" ) type Header struct { diff --git a/provider/metaProvider.go b/provider/metaProvider.go new file mode 100644 index 00000000..bde124b8 --- /dev/null +++ b/provider/metaProvider.go @@ -0,0 +1,50 @@ +package provider + +import ( + motan "github.com/weibocom/motan-go/core" + "github.com/weibocom/motan-go/meta" +) + +type MetaProvider struct{} + +func (m *MetaProvider) Initialize() {} + +func (m *MetaProvider) Call(request motan.Request) motan.Response { + resp := &motan.MotanResponse{ + RequestID: request.GetRequestID(), + Value: meta.GetDynamicMeta(), + } + return resp +} + +func (m *MetaProvider) GetPath() string { + return "com.weibo.api.motan.runtime.meta.MetaService" +} + +func (m *MetaProvider) SetService(s interface{}) {} + +func (m *MetaProvider) SetContext(context *motan.Context) {} + +func (m *MetaProvider) GetName() string { + return "metaProvider" +} + +func (m *MetaProvider) GetURL() *motan.URL { + return nil +} + +func (m *MetaProvider) SetURL(url *motan.URL) {} + +func (m *MetaProvider) SetSerialization(s motan.Serialization) {} + +func (m *MetaProvider) SetProxy(proxy bool) {} + +func (m *MetaProvider) Destroy() {} + +func (m *MetaProvider) IsAvailable() bool { + return true +} + +func (m *MetaProvider) GetRuntimeInfo() map[string]interface{} { + return make(map[string]interface{}) +} diff --git a/server/server.go b/server/server.go index 3206b517..cae13134 100644 --- a/server/server.go +++ b/server/server.go @@ -3,6 +3,9 @@ package server import ( "errors" "fmt" + "github.com/weibocom/motan-go/meta" + mpro "github.com/weibocom/motan-go/protocol" + "github.com/weibocom/motan-go/provider" "sync" motan "github.com/weibocom/motan-go/core" @@ -74,6 +77,12 @@ func (d *DefaultExporter) Export(server motan.Server, extFactory motan.Extension d.extFactory = extFactory d.server = server d.url = d.provider.GetURL() + // add server side meta info to the url, so these meta info can be passed to the client side through the registration mechanism. + if d.url.GetBoolValue(motan.URLRegisterMeta, motan.DefaultRegisterMeta) { + for k, v := range meta.GetEnvMeta() { + d.url.PutParam(k, v) + } + } d.url.PutParam(motan.NodeTypeKey, motan.NodeTypeService) // node type must be service in export regs, ok := d.url.Parameters[motan.RegistryKey] if !ok { @@ -152,7 +161,8 @@ func (d *DefaultExporter) SetURL(url *motan.URL) { } type DefaultMessageHandler struct { - providers map[string]motan.Provider + providers map[string]motan.Provider + frameworkProviders map[string]motan.Provider } func (d *DefaultMessageHandler) GetName() string { @@ -172,6 +182,12 @@ func (d *DefaultMessageHandler) GetRuntimeInfo() map[string]interface{} { func (d *DefaultMessageHandler) Initialize() { d.providers = make(map[string]motan.Provider) + d.frameworkProviders = make(map[string]motan.Provider) + d.initFrameworkServiceProvider() +} + +func (d *DefaultMessageHandler) initFrameworkServiceProvider() { + d.frameworkProviders[meta.MetaServiceName] = &provider.MetaProvider{} } func (d *DefaultMessageHandler) AddProvider(p motan.Provider) error { @@ -195,6 +211,13 @@ func (d *DefaultMessageHandler) Call(request motan.Request) (res motan.Response) res = motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 500, ErrMsg: "provider call panic", ErrType: motan.ServiceException}) vlog.Errorf("provider call panic. req:%s", motan.GetReqInfo(request)) }) + if mfs := request.GetAttachment(mpro.MFrameworkService); mfs != "" { + if fp, ok := d.frameworkProviders[request.GetServiceName()]; ok { + return fp.(motan.Provider).Call(request) + } + //throw specific exception to avoid triggering forced fusing on the client side。 + return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 501, ErrMsg: motan.ServiceNotSupport, ErrType: motan.ServiceException}) + } p := d.providers[request.GetServiceName()] if p != nil { res = p.Call(request)