Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat/Weighted lb and Meta info #394

Merged
merged 13 commits into from
Apr 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ jobs:
testing:
strategy:
matrix:
go-version: [1.12.x,1.13.x,1.14.x,1.15.x,1.16.x,1.17.x,1.18.x,1.19.x,1.20.x,1.21.x]
go-version: [1.16.x,1.17.x,1.18.x,1.19.x,1.20.x,1.21.x]
platform: [ubuntu-latest]
runs-on: ${{ matrix.platform }}
steps:
Expand All @@ -45,10 +45,10 @@ jobs:
name: codecov
runs-on: ubuntu-latest
steps:
- name: Set up Go 1.15
- name: Set up Go 1.16
uses: actions/setup-go@v3
with:
go-version: 1.15.x
go-version: 1.16.x
id: go

- name: Checkout code
Expand Down
20 changes: 19 additions & 1 deletion agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"github.com/weibocom/motan-go/endpoint"
vlog "github.com/weibocom/motan-go/log"
"github.com/weibocom/motan-go/meta"
"github.com/weibocom/motan-go/provider"
"gopkg.in/yaml.v2"
"io/ioutil"
"net"
Expand Down Expand Up @@ -193,6 +195,8 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) {
return
}
fmt.Println("init agent context success.")
// initialize meta package
meta.Initialize(a.Context)
a.initParam()
a.SetSanpshotConf()
a.initAgentURL()
Expand Down Expand Up @@ -948,7 +952,8 @@ func (a *Agent) doExportService(url *motan.URL) {
}

type serverAgentMessageHandler struct {
providers *motan.CopyOnWriteMap
providers *motan.CopyOnWriteMap
frameworkProviders *motan.CopyOnWriteMap
}

func (sa *serverAgentMessageHandler) GetName() string {
Expand All @@ -973,6 +978,12 @@ func (sa *serverAgentMessageHandler) GetRuntimeInfo() map[string]interface{} {

func (sa *serverAgentMessageHandler) Initialize() {
sa.providers = motan.NewCopyOnWriteMap()
sa.frameworkProviders = motan.NewCopyOnWriteMap()
sa.initFrameworkServiceProvider()
}

func (sa *serverAgentMessageHandler) initFrameworkServiceProvider() {
sa.frameworkProviders.Store(meta.MetaServiceName, &provider.MetaProvider{})
}

func getServiceKey(group, path string) string {
Expand All @@ -990,6 +1001,13 @@ func (sa *serverAgentMessageHandler) Call(request motan.Request) (res motan.Resp
group = request.GetAttachment(motan.GroupKey)
}
serviceKey := getServiceKey(group, request.GetServiceName())
if mfs := request.GetAttachment(mpro.MFrameworkService); mfs != "" {
if fp, ok := sa.frameworkProviders.Load(request.GetServiceName()); ok {
return fp.(motan.Provider).Call(request)
}
//throw specific exception to avoid triggering forced fusing on the client side。
return motan.BuildExceptionResponse(request.GetRequestID(), &motan.Exception{ErrCode: 501, ErrMsg: motan.ServiceNotSupport, ErrType: motan.ServiceException})
}
if p := sa.providers.LoadOrNil(serviceKey); p != nil {
p := p.(motan.Provider)
res = p.Call(request)
Expand Down
4 changes: 4 additions & 0 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,3 +173,7 @@ func (m *MCContext) GetRefer(service string) interface{} {
// TODO 对client的封装,可以根据idl自动生成代码时支持
return nil
}

func (m *MCContext) GetContext() *motan.Context {
return m.context
}
3 changes: 3 additions & 0 deletions cluster/motanCluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,9 @@ func (m *MotanCluster) Destroy() {
vlog.Infof("destroy endpoint %s .", e.GetURL().GetIdentity())
e.Destroy()
}
if d, ok := m.LoadBalance.(motan.Destroyable); ok {
d.Destroy()
}
m.closed = true
}
}
Expand Down
9 changes: 7 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,12 +127,17 @@ func (c *Config) Int64(key string) (int64, error) {

// String returns the string value for a given key.
func (c *Config) String(key string) string {
return c.GetStringWithDefault(key, "")
}

// String returns the string value for a given key.
func (c *Config) GetStringWithDefault(key string, def string) string {
if value, err := c.getData(key); err != nil {
return ""
return def
} else if vv, ok := value.(string); ok {
return vv
}
return ""
return def
}

// GetSection returns map for the given key
Expand Down
15 changes: 15 additions & 0 deletions core/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,21 @@ const (
DefaultReferVersion = "1.0"
)

// meta info
const (
DefaultMetaPrefix = "META_"
EnvMetaPrefixKey = "envMetaPrefix"
URLRegisterMeta = "registerMeta"
DefaultRegisterMeta = true
MetaCacheExpireSecondKey = "metaCacheExpireSecond"
DynamicMetaKey = "dynamicMeta"
DefaultDynamicMeta = true
WeightRefreshPeriodSecondKey = "weightRefreshPeriodSecond"
WeightMetaSuffixKey = "WEIGHT"
ServiceNotSupport = "service not support"
)


//----------- runtime -------------

const (
Expand Down
1 change: 0 additions & 1 deletion core/globalContext.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,7 +438,6 @@ func (c *Context) basicConfToURLs(section string) map[string]*URL {
if len(finalFilters) > 0 {
newURL.PutParam(FilterKey, c.FilterSetToStr(finalFilters))
}

newURLs[key] = newURL
}
return newURLs
Expand Down
6 changes: 6 additions & 0 deletions core/motan.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,12 @@ type LoadBalance interface {
SetWeight(weight string)
}

// WeightLoadBalance : weight loadBalance for cluster
type WeightLoadBalance interface {
LoadBalance
NotifyWeightChange()
}

// DiscoverService : discover service for cluster
type DiscoverService interface {
Subscribe(url *URL, listener NotifyListener)
Expand Down
8 changes: 5 additions & 3 deletions core/url.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,9 +336,11 @@ func (u *URL) CanServe(other *URL) bool {
vlog.Errorf("can not serve path, err : p1:%s, p2:%s", u.Path, other.Path)
return false
}
if !IsSame(u.Parameters, other.Parameters, SerializationKey, "") {
vlog.Errorf("can not serve serialization, err : s1:%s, s2:%s", u.Parameters[SerializationKey], other.Parameters[SerializationKey])
return false
if u.Protocol != "motan2" {
if !IsSame(u.Parameters, other.Parameters, SerializationKey, "") {
vlog.Errorf("can not serve serialization, err : s1:%s, s2:%s", u.Parameters[SerializationKey], other.Parameters[SerializationKey])
return false
}
}
// compatible with old version: 0.1
if !(IsSame(u.Parameters, other.Parameters, VersionKey, "0.1") || IsSame(u.Parameters, other.Parameters, VersionKey, DefaultReferVersion)) {
Expand Down
25 changes: 25 additions & 0 deletions core/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,24 @@ func SliceShuffle(slice []string) []string {
return slice
}

func EndpointShuffle(slice []EndPoint) []EndPoint {
rayzhang0603 marked this conversation as resolved.
Show resolved Hide resolved
for i := 0; i < len(slice); i++ {
a := rand.Intn(len(slice))
b := rand.Intn(len(slice))
slice[a], slice[b] = slice[b], slice[a]
}
return slice
}

func ByteSliceShuffle(slice []byte) []byte {
for i := 0; i < len(slice); i++ {
a := rand.Intn(len(slice))
b := rand.Intn(len(slice))
slice[a], slice[b] = slice[b], slice[a]
}
return slice
}

func FirstUpper(s string) string {
r := []rune(s)

Expand Down Expand Up @@ -286,3 +304,10 @@ func ClearDirectEnvRegistry() {
directRpc = nil
initDirectEnv = sync.Once{}
}

func GetNonNegative(originValue int64) int64 {
if originValue > 0 {
return originValue
}
return 0x7fffffffffffffff & originValue
}
6 changes: 6 additions & 0 deletions default.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,12 @@ func GetDefaultManageHandlers() map[string]http.Handler {
defaultManageHandlers["/registry/list"] = dynamicConfigurer
defaultManageHandlers["/registry/info"] = dynamicConfigurer

metaInfo := &MetaInfo{}
defaultManageHandlers["/meta/update"] = metaInfo
defaultManageHandlers["/meta/delete"] = metaInfo
defaultManageHandlers["/meta/get"] = metaInfo
defaultManageHandlers["/meta/getAll"] = metaInfo

hotReload := &HotReload{}
defaultManageHandlers["/reload/clusters"] = hotReload

Expand Down
91 changes: 91 additions & 0 deletions endpoint/mockDynamicEndpoint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package endpoint

import (
motan "github.com/weibocom/motan-go/core"
mpro "github.com/weibocom/motan-go/protocol"
"strconv"
"sync"
"sync/atomic"
)

type MockDynamicEndpoint struct {
URL *motan.URL
available bool
DynamicWeight int64
StaticWeight int64
Count int64
dynamicMeta sync.Map
}

func (m *MockDynamicEndpoint) GetName() string {
return "mockEndpoint"
}

func (m *MockDynamicEndpoint) GetURL() *motan.URL {
return m.URL
}

func (m *MockDynamicEndpoint) SetURL(url *motan.URL) {
m.URL = url
}

func (m *MockDynamicEndpoint) IsAvailable() bool {
return m.available
}

func (m *MockDynamicEndpoint) SetAvailable(a bool) {
m.available = a
}

func (m *MockDynamicEndpoint) SetProxy(proxy bool) {}

func (m *MockDynamicEndpoint) SetSerialization(s motan.Serialization) {}

func (m *MockDynamicEndpoint) Call(request motan.Request) motan.Response {
if isMetaServiceRequest(request) {
resMap := make(map[string]string)
m.dynamicMeta.Range(func(key, value interface{}) bool {
resMap[key.(string)] = value.(string)
return true
})
atomic.AddInt64(&m.Count, 1)
return &motan.MotanResponse{ProcessTime: 1, Value: resMap}
}
atomic.AddInt64(&m.Count, 1)
return &motan.MotanResponse{ProcessTime: 1, Value: "ok"}
}

func (m *MockDynamicEndpoint) Destroy() {}

func (m *MockDynamicEndpoint) GetRuntimeInfo() map[string]interface{} {
return make(map[string]interface{})
}

func (m *MockDynamicEndpoint) SetWeight(isDynamic bool, weight int64) {
if isDynamic {
m.DynamicWeight = weight
m.dynamicMeta.Store(motan.DefaultMetaPrefix+motan.WeightMetaSuffixKey, strconv.Itoa(int(weight)))
} else {
m.StaticWeight = weight
m.URL.PutParam(motan.DefaultMetaPrefix+motan.WeightMetaSuffixKey, strconv.Itoa(int(weight)))
}
}

func NewMockDynamicEndpoint(url *motan.URL) *MockDynamicEndpoint {
return &MockDynamicEndpoint{
URL: url,
available: true,
}
}

func NewMockDynamicEndpointWithWeight(url *motan.URL, staticWeight int64) *MockDynamicEndpoint {
res := NewMockDynamicEndpoint(url)
res.StaticWeight = staticWeight
res.URL.PutParam(motan.DefaultMetaPrefix+motan.WeightMetaSuffixKey, strconv.Itoa(int(staticWeight)))
return res
}

func isMetaServiceRequest(request motan.Request) bool {
return request != nil && "com.weibo.api.motan.runtime.meta.MetaService" == request.GetServiceName() &&
"getDynamicMeta" == request.GetMethod() && "y" == request.GetAttachment(mpro.MFrameworkService)
}
2 changes: 1 addition & 1 deletion endpoint/motanCommonEndpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,7 +620,7 @@ func (s *Stream) RemoveFromChannel() bool {

// Call send request to the server.
//
// about return: exception in response will record error count, err will not.
// about return: exception in response will record error Count, err will not.
func (c *Channel) Call(req motan.Request, deadline time.Duration, rc *motan.RPCContext) (motan.Response, error) {
stream, err := c.newStream(req, rc, deadline)
if err != nil {
Expand Down
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
module github.com/weibocom/motan-go

go 1.11
go 1.16

require (
github.com/afex/hystrix-go v0.0.0-20180502004556-fa1af6a1f4f5
Expand All @@ -16,15 +16,16 @@ require (
github.com/mitchellh/mapstructure v1.1.2
github.com/opentracing/opentracing-go v1.0.2
github.com/panjf2000/ants/v2 v2.9.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a
github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/shirou/gopsutil/v3 v3.21.9
github.com/smartystreets/goconvey v1.6.4 // indirect
github.com/stretchr/testify v1.8.2
github.com/stretchr/testify v1.8.4
github.com/valyala/fasthttp v1.2.0
github.com/weibreeze/breeze-go v0.1.1
go.uber.org/atomic v1.4.0 // indirect
go.uber.org/atomic v1.4.0
go.uber.org/multierr v1.1.0 // indirect
go.uber.org/zap v1.10.0
golang.org/x/net v0.0.0-20201224014010-6772e930b67b
Expand Down
Loading
Loading