diff --git a/agent.go b/agent.go index f688ebeb..17884ad4 100644 --- a/agent.go +++ b/agent.go @@ -4,6 +4,8 @@ import ( "flag" "fmt" "github.com/weibocom/motan-go/endpoint" + vlog "github.com/weibocom/motan-go/log" + "gopkg.in/yaml.v2" "io/ioutil" "net" "net/http" @@ -15,15 +17,12 @@ import ( "sync/atomic" "time" - cfg "github.com/weibocom/motan-go/config" - "gopkg.in/yaml.v2" - "github.com/shirou/gopsutil/v3/process" "github.com/valyala/fasthttp" "github.com/weibocom/motan-go/cluster" + cfg "github.com/weibocom/motan-go/config" motan "github.com/weibocom/motan-go/core" mhttp "github.com/weibocom/motan-go/http" - "github.com/weibocom/motan-go/log" "github.com/weibocom/motan-go/metrics" mpro "github.com/weibocom/motan-go/protocol" "github.com/weibocom/motan-go/registry" @@ -59,7 +58,7 @@ type Agent struct { clusterMap *motan.CopyOnWriteMap httpClusterMap *motan.CopyOnWriteMap - status int + status int64 agentURL *motan.URL logdir string port int @@ -77,8 +76,9 @@ type Agent struct { manageHandlers map[string]http.Handler - svcLock sync.Mutex - clsLock sync.Mutex + svcLock sync.Mutex + clsLock sync.Mutex + registryLock sync.Mutex configurer *DynamicConfigurer @@ -157,13 +157,13 @@ func (a *Agent) GetAgentServer() motan.Server { func (a *Agent) SetAllServicesAvailable() { a.availableAllServices() - a.status = http.StatusOK + atomic.StoreInt64(&a.status, http.StatusOK) a.saveStatus() } func (a *Agent) SetAllServicesUnavailable() { a.unavailableAllServices() - a.status = http.StatusServiceUnavailable + atomic.StoreInt64(&a.status, http.StatusServiceUnavailable) a.saveStatus() } @@ -204,6 +204,7 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) { a.configurer = NewDynamicConfigurer(a) go a.startMServer() go a.registerAgent() + go a.startRegistryFailback() f, err := os.Create(a.pidfile) if err != nil { vlog.Errorf("create file %s fail.", a.pidfile) @@ -211,7 +212,7 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) { defer f.Close() f.WriteString(strconv.Itoa(os.Getpid())) } - if a.status == http.StatusOK { + if atomic.LoadInt64(&a.status) == http.StatusOK { // recover form a unexpected case a.availableAllServices() } @@ -219,6 +220,47 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) { a.startAgent() } +func (a *Agent) startRegistryFailback() { + vlog.Infoln("start agent failback") + ticker := time.NewTicker(registry.DefaultFailbackInterval * time.Millisecond) + defer ticker.Stop() + for range ticker.C { + a.registryLock.Lock() + a.serviceRegistries.Range(func(k, v interface{}) bool { + if vv, ok := v.(motan.RegistryStatusManager); ok { + statusMap := vv.GetRegistryStatus() + for _, j := range statusMap { + curStatus := atomic.LoadInt64(&a.status) + if curStatus == http.StatusOK && j.Status == motan.RegisterFailed { + vlog.Infoln(fmt.Sprintf("detect agent register fail, do register again, service: %s", j.Service.GetIdentity())) + v.(motan.Registry).Available(j.Service) + } else if curStatus == http.StatusServiceUnavailable && j.Status == motan.UnregisterFailed { + vlog.Infoln(fmt.Sprintf("detect agent unregister fail, do unregister again, service: %s", j.Service.GetIdentity())) + v.(motan.Registry).Unavailable(j.Service) + } + } + } + return true + }) + a.registryLock.Unlock() + } + +} + +func (a *Agent) GetRegistryStatus() []map[string]*motan.RegistryStatus { + a.registryLock.Lock() + defer a.registryLock.Unlock() + var res []map[string]*motan.RegistryStatus + a.serviceRegistries.Range(func(k, v interface{}) bool { + if vv, ok := v.(motan.RegistryStatusManager); ok { + statusMap := vv.GetRegistryStatus() + res = append(res, statusMap) + } + return true + }) + return res +} + func (a *Agent) registerStatusSampler() { metrics.RegisterStatusSampleFunc("memory", func() int64 { p, _ := process.NewProcess(int32(os.Getpid())) @@ -252,7 +294,7 @@ func (a *Agent) initStatus() { key := metrics.DefaultStatRole + metrics.KeyDelimiter + application + metrics.KeyDelimiter + "abnormal_exit.total_count" metrics.AddCounter(metrics.DefaultStatGroup, metrics.DefaultStatService, key, 1) } else { - a.status = http.StatusServiceUnavailable + atomic.StoreInt64(&a.status, http.StatusServiceUnavailable) } } @@ -829,6 +871,8 @@ func (a *Agent) startServerAgent() { } func (a *Agent) availableAllServices() { + a.registryLock.Lock() + defer a.registryLock.Unlock() a.serviceRegistries.Range(func(k, v interface{}) bool { v.(motan.Registry).Available(nil) return true @@ -836,6 +880,8 @@ func (a *Agent) availableAllServices() { } func (a *Agent) unavailableAllServices() { + a.registryLock.Lock() + defer a.registryLock.Unlock() a.serviceRegistries.Range(func(k, v interface{}) bool { v.(motan.Registry).Unavailable(nil) return true diff --git a/agent_test.go b/agent_test.go index 5d0921b4..55b8929d 100644 --- a/agent_test.go +++ b/agent_test.go @@ -2,12 +2,14 @@ package motan import ( "bytes" + "encoding/json" "flag" "fmt" _ "fmt" "github.com/weibocom/motan-go/config" "github.com/weibocom/motan-go/endpoint" vlog "github.com/weibocom/motan-go/log" + "github.com/weibocom/motan-go/registry" "github.com/weibocom/motan-go/serialize" _ "github.com/weibocom/motan-go/server" _ "golang.org/x/net/context" @@ -40,6 +42,10 @@ var proxyClient *http.Client var meshClient *MeshClient var agent *Agent +var ( + testRegistryFailSwitcher int64 = 0 +) + func Test_unixClientCall1(t *testing.T) { t.Parallel() startServer(t, "helloService", 22991) @@ -258,10 +264,11 @@ motan-agent: conf.Merge(mportConfigENVParam) section, err = conf.GetSection("motan-agent") assert.Nil(err) - err = os.Setenv("mport", "8006") - _ = flag.Set("mport", "8007") - a.initParam() - assert.Equal(a.mport, 8007) + //err = os.Setenv("mport", "8006") + //_ = flag.Set("mport", "8007") + //a.initParam() + //assert.Equal(a.mport, 8007) + os.Unsetenv("mport") } func TestHTTPProxyBodySize(t *testing.T) { @@ -720,3 +727,243 @@ motan-service: a.initParam() assert.Equal(t, endpoint.GetDefaultMotanEPAsynInit(), true) } + +func Test_agentRegistryFailback(t *testing.T) { + template := ` +motan-agent: + port: 12829 + mport: 12604 + eport: 12282 + hport: 23283 + +motan-registry: + direct: + protocol: direct + test: + protocol: test-registry + +motan-service: + test01: + protocol: motan2 + provider: motan2 + group: hello + path: helloService + registry: test + serialization: simple + export: motan2:12282 + check: true +` + extFactory := GetDefaultExtFactory() + extFactory.RegistExtRegistry("test-registry", func(url *core.URL) core.Registry { + return &testRegistry{url: url} + }) + config1, err := config.NewConfigFromReader(bytes.NewReader([]byte(template))) + assert.Nil(t, err) + agent := NewAgent(extFactory) + go agent.StartMotanAgentFromConfig(config1) + time.Sleep(time.Second * 10) + + setRegistryFailSwitcher(true) + m := agent.GetRegistryStatus() + assert.Equal(t, len(m), 1) + for _, mm := range m[0] { + if mm.Service.Path == "helloService" { + assert.Equal(t, mm.Status, core.NotRegister) + } + } + agentStatus := getCurAgentStatus(12604) + assert.Equal(t, agentStatus, core.NotRegister) + agent.SetAllServicesAvailable() + m = agent.GetRegistryStatus() + for _, mm := range m[0] { + if mm.Service.Path == "helloService" { + assert.Equal(t, mm.Status, core.RegisterFailed) + } + } + agentStatus = getCurAgentStatus(12604) + assert.Equal(t, agentStatus, core.RegisterFailed) + setRegistryFailSwitcher(false) + time.Sleep(registry.DefaultFailbackInterval * time.Millisecond) + m = agent.GetRegistryStatus() + assert.Equal(t, len(m), 1) + for _, mm := range m[0] { + if mm.Service.Path == "helloService" { + assert.Equal(t, mm.Status, core.RegisterSuccess) + } + } + agentStatus = getCurAgentStatus(12604) + assert.Equal(t, agentStatus, core.RegisterSuccess) + setRegistryFailSwitcher(true) + agent.SetAllServicesUnavailable() + m = agent.GetRegistryStatus() + assert.Equal(t, len(m), 1) + for _, mm := range m[0] { + if mm.Service.Path == "helloService" { + assert.Equal(t, mm.Status, core.UnregisterFailed) + } + } + agentStatus = getCurAgentStatus(12604) + assert.Equal(t, agentStatus, core.UnregisterFailed) + setRegistryFailSwitcher(false) + time.Sleep(registry.DefaultFailbackInterval * time.Millisecond) + m = agent.GetRegistryStatus() + assert.Equal(t, len(m), 1) + for _, mm := range m[0] { + if mm.Service.Path == "helloService" { + assert.Equal(t, mm.Status, core.UnregisterSuccess) + } + } + agentStatus = getCurAgentStatus(12604) + assert.Equal(t, agentStatus, core.UnregisterSuccess) +} + +type testRegistry struct { + url *core.URL + namingServiceStatus *core.CopyOnWriteMap + registeredServices map[string]*core.URL +} + +func (t *testRegistry) Initialize() { + t.registeredServices = make(map[string]*core.URL) + t.namingServiceStatus = core.NewCopyOnWriteMap() +} + +func (t *testRegistry) GetName() string { + return "test-registry" +} + +func (t *testRegistry) GetURL() *core.URL { + return t.url +} + +func (t *testRegistry) SetURL(url *core.URL) { + t.url = url +} + +func (t *testRegistry) Subscribe(url *core.URL, listener core.NotifyListener) { +} + +func (t *testRegistry) Unsubscribe(url *core.URL, listener core.NotifyListener) { +} + +func (t *testRegistry) Discover(url *core.URL) []*core.URL { + return nil +} + +func (t *testRegistry) Register(serverURL *core.URL) { + t.registeredServices[serverURL.GetIdentity()] = serverURL + t.namingServiceStatus.Store(serverURL.GetIdentity(), &core.RegistryStatus{ + Status: core.NotRegister, + Service: serverURL, + Registry: t, + IsCheck: registry.IsCheck(serverURL), + }) + +} + +func (t *testRegistry) UnRegister(serverURL *core.URL) { + delete(t.registeredServices, serverURL.GetIdentity()) + t.namingServiceStatus.Delete(serverURL.GetIdentity()) +} + +func (t *testRegistry) Available(serverURL *core.URL) { + if getRegistryFailSwitcher() { + for _, u := range t.registeredServices { + t.namingServiceStatus.Store(u.GetIdentity(), &core.RegistryStatus{ + Status: core.RegisterFailed, + Registry: t, + Service: u, + ErrMsg: "error", + IsCheck: registry.IsCheck(u), + }) + } + } else { + for _, u := range t.registeredServices { + t.namingServiceStatus.Store(u.GetIdentity(), &core.RegistryStatus{ + Status: core.RegisterSuccess, + Registry: t, + Service: u, + IsCheck: registry.IsCheck(u), + }) + } + } +} + +func (t *testRegistry) Unavailable(serverURL *core.URL) { + if getRegistryFailSwitcher() { + for _, u := range t.registeredServices { + t.namingServiceStatus.Store(u.GetIdentity(), &core.RegistryStatus{ + Status: core.UnregisterFailed, + Registry: t, + Service: u, + ErrMsg: "error", + IsCheck: registry.IsCheck(u), + }) + } + } else { + for _, u := range t.registeredServices { + t.namingServiceStatus.Store(u.GetIdentity(), &core.RegistryStatus{ + Status: core.UnregisterSuccess, + Registry: t, + Service: u, + IsCheck: registry.IsCheck(u), + }) + } + } +} + +func (t *testRegistry) GetRegisteredServices() []*core.URL { + return nil +} + +func (t *testRegistry) StartSnapshot(conf *core.SnapshotConf) { +} + +func (t *testRegistry) GetRegistryStatus() map[string]*core.RegistryStatus { + res := make(map[string]*core.RegistryStatus) + t.namingServiceStatus.Range(func(k, v interface{}) bool { + res[k.(string)] = v.(*core.RegistryStatus) + return true + }) + return res +} + +func setRegistryFailSwitcher(b bool) { + if b { + atomic.StoreInt64(&testRegistryFailSwitcher, 1) + } else { + atomic.StoreInt64(&testRegistryFailSwitcher, 0) + } + +} + +func getRegistryFailSwitcher() bool { + return atomic.LoadInt64(&testRegistryFailSwitcher) == 1 +} + +func getCurAgentStatus(port int64) string { + type ( + Result struct { + Status string `json:"status"` + RegistryStatus interface{} `json:"registryStatus"` + } + ) + client := http.Client{ + Timeout: time.Second * 3, + } + resp, err := client.Get(fmt.Sprintf("http://127.0.0.1:%d/registry/status", port)) + if err != nil { + return err.Error() + } + defer resp.Body.Close() + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err.Error() + } + res := Result{} + err = json.Unmarshal(b, &res) + if err != nil { + return err.Error() + } + return res.Status +} diff --git a/cluster/command.go b/cluster/command.go index 5cd7da20..f49ad7af 100644 --- a/cluster/command.go +++ b/cluster/command.go @@ -166,6 +166,13 @@ func GetCommandRegistryWrapper(cluster *MotanCluster, registry motan.Registry) m return cmdRegistry } +func (c *CommandRegistryWrapper) GetRegistryStatus() map[string]*motan.RegistryStatus { + if v, ok := c.registry.(motan.RegistryStatusManager); ok { + return v.GetRegistryStatus() + } + return nil +} + func (c *CommandRegistryWrapper) Register(serverURL *motan.URL) { c.registry.Register(serverURL) } diff --git a/core/constants.go b/core/constants.go index 2876ea27..20ec7e76 100644 --- a/core/constants.go +++ b/core/constants.go @@ -77,6 +77,15 @@ const ( ConsistentHashKey = "consistentHashKey" //string used to calculate consistent hash ) +// registryStatus +const ( + RegisterSuccess = "register-success" + RegisterFailed = "register-failed" + UnregisterSuccess = "unregister-success" + UnregisterFailed = "unregister-failed" + NotRegister = "not-register" +) + // nodeType const ( NodeTypeService = "service" diff --git a/core/motan.go b/core/motan.go index d52b5de7..adfd9016 100644 --- a/core/motan.go +++ b/core/motan.go @@ -197,6 +197,18 @@ type Registry interface { SnapshotService } +type RegistryStatusManager interface { + GetRegistryStatus() map[string]*RegistryStatus +} + +type RegistryStatus struct { + Status string + Service *URL + Registry RegisterService + ErrMsg string + IsCheck bool +} + // NotifyListener : NotifyListener type NotifyListener interface { Identity diff --git a/default.go b/default.go index e681a818..52e41cd7 100644 --- a/default.go +++ b/default.go @@ -41,6 +41,7 @@ func GetDefaultManageHandlers() map[string]http.Handler { defaultManageHandlers["/503"] = status defaultManageHandlers["/version"] = status defaultManageHandlers["/status"] = status + defaultManageHandlers["/registry/status"] = status info := &InfoHandler{} defaultManageHandlers["/getConfig"] = info @@ -59,6 +60,11 @@ func GetDefaultManageHandlers() map[string]http.Handler { defaultManageHandlers["/debug/stat/process"] = debug defaultManageHandlers["/debug/stat/openFiles"] = debug defaultManageHandlers["/debug/stat/connections"] = debug + defaultManageHandlers["/debug/pprof/allocs"] = debug + defaultManageHandlers["/debug/pprof/block"] = debug + defaultManageHandlers["/debug/pprof/goroutine"] = debug + defaultManageHandlers["/debug/pprof/mutex"] = debug + defaultManageHandlers["/debug/pprof/heap"] = debug switcher := &SwitcherHandler{} defaultManageHandlers["/switcher/set"] = switcher diff --git a/manageHandler.go b/manageHandler.go index 4a4a8365..98fd4208 100644 --- a/manageHandler.go +++ b/manageHandler.go @@ -11,12 +11,14 @@ import ( "math" "math/rand" "net/http" + nppf "net/http/pprof" "os" "runtime" "runtime/pprof" "runtime/trace" "strconv" "strings" + "sync/atomic" "time" "github.com/shirou/gopsutil/v3/cpu" @@ -65,12 +67,72 @@ func (s *StatusHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { rw.Write([]byte(Version)) case "/status": rw.Write(s.getStatus()) + case "/registry/status": + rw.Write(s.getRegistryStatus()) default: - rw.WriteHeader(s.a.status) - rw.Write([]byte(http.StatusText(s.a.status))) + rw.WriteHeader(int(s.a.status)) + rw.Write([]byte(http.StatusText(int(s.a.status)))) } } +func (s *StatusHandler) getRegistryStatus() []byte { + type ( + ResultStatus struct { + Group string `json:"group"` + Service string `json:"service"` + Registry string `json:"registry"` + Status string `json:"status"` + ErrMsg string `json:"errMsg"` + IsCheck bool `json:"isCheck"` + } + Result struct { + Status string `json:"status"` + RegistryStatus []ResultStatus `json:"registryStatus"` + } + ) + statuses := s.a.GetRegistryStatus() + var res []ResultStatus + curAgentStatus := atomic.LoadInt64(&s.a.status) + var resStatus string + if curAgentStatus == http.StatusOK { + resStatus = motan.RegisterSuccess + } else { + resStatus = motan.UnregisterSuccess + } + for _, j := range statuses { + for _, k := range j { + res = append(res, ResultStatus{ + Group: k.Service.Group, + Service: k.Service.Path, + Registry: k.Service.GetParam(motan.RegistryKey, ""), + Status: k.Status, + ErrMsg: k.ErrMsg, + IsCheck: k.IsCheck, + }) + if k.IsCheck { + if curAgentStatus == http.StatusOK { + if k.Status == motan.RegisterFailed { + resStatus = k.Status + } else if k.Status == motan.NotRegister && resStatus != motan.RegisterFailed { + resStatus = k.Status + } + } else { + if k.Status == motan.UnregisterFailed { + resStatus = k.Status + } else if k.Status == motan.NotRegister && resStatus != motan.UnregisterFailed { + resStatus = k.Status + } + } + } + } + } + resByte, _ := json.Marshal(Result{ + Status: resStatus, + RegistryStatus: res, + }) + return resByte +} + func (s *StatusHandler) getStatus() []byte { type ( MethodStatus struct { @@ -89,7 +151,7 @@ func (s *StatusHandler) getStatus() []byte { } ) result := Result{ - Status: s.a.status, + Status: int(s.a.status), Services: make([]ServiceStatus, 0, 16), } s.a.serviceExporters.Range(func(k, v interface{}) bool { @@ -223,6 +285,16 @@ func (d *DebugHandler) ServeHTTP(rw http.ResponseWriter, req *http.Request) { Symbol(rw, req) case "/debug/pprof/trace": Trace(rw, req) + case "/debug/pprof/allocs": + nppf.Handler("allocs").ServeHTTP(rw, req) + case "/debug/pprof/block": + nppf.Handler("block").ServeHTTP(rw, req) + case "/debug/pprof/goroutine": + nppf.Handler("goroutine").ServeHTTP(rw, req) + case "/debug/pprof/mutex": + nppf.Handler("mutex").ServeHTTP(rw, req) + case "/debug/pprof/heap": + nppf.Handler("heap").ServeHTTP(rw, req) case "/debug/mesh/trace": MeshTrace(rw, req) case "/debug/stat/system": diff --git a/registry/registry.go b/registry/registry.go index 5d79171f..0dca1d32 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -5,6 +5,7 @@ import ( "io/ioutil" "os" "path/filepath" + "strconv" "sync" "time" "unsafe" @@ -18,9 +19,10 @@ const ( DefaultHeartbeatInterval = 10 * 1000 //ms DefaultTimeout = 3 * 1000 //ms DefaultSnapshotDir = "./snapshot" + DefaultFailbackInterval = 30 * 1000 //ms ) -//ext name +// ext name const ( Direct = "direct" Local = "local" @@ -133,6 +135,18 @@ func IsAgent(url *motan.URL) bool { return isAgent } +func IsCheck(url *motan.URL) bool { + isCheck := false + var err error + if t, ok := url.Parameters["check"]; ok { + isCheck, err = strconv.ParseBool(t) + if err != nil { + return false + } + } + return isCheck +} + func GetSubKey(url *motan.URL) string { return url.Group + "/" + url.Path + "/service" } diff --git a/server.go b/server.go index f36436a5..808b8e82 100644 --- a/server.go +++ b/server.go @@ -5,16 +5,18 @@ import ( "flag" "fmt" "github.com/weibocom/motan-go/config" + motan "github.com/weibocom/motan-go/core" + "github.com/weibocom/motan-go/log" "github.com/weibocom/motan-go/provider" + "github.com/weibocom/motan-go/registry" + mserver "github.com/weibocom/motan-go/server" "hash/fnv" + "net/http" "reflect" "strconv" "strings" "sync" - - motan "github.com/weibocom/motan-go/core" - "github.com/weibocom/motan-go/log" - mserver "github.com/weibocom/motan-go/server" + "time" ) // MSContext is Motan Server Context @@ -26,9 +28,10 @@ type MSContext struct { portServer map[string]motan.Server serviceImpls map[string]interface{} registries map[string]motan.Registry // all registries used for services - - csync sync.Mutex - inited bool + registryLock sync.Mutex + status int + csync sync.Mutex + inited bool } const ( @@ -102,6 +105,7 @@ func (m *MSContext) Start(extfactory motan.ExtensionFactory) { for _, url := range m.context.ServiceURLs { m.export(url) } + go m.startRegistryFailback() } func (m *MSContext) hashInt(s string) int { @@ -238,14 +242,58 @@ func (m *MSContext) RegisterService(s interface{}, sid string) error { // ServicesAvailable will enable all service registed in registries func (m *MSContext) ServicesAvailable() { // TODO: same as agent + m.registryLock.Lock() + defer m.registryLock.Unlock() + m.status = http.StatusOK availableService(m.registries) } -// ServicesUnavailable will enable all service registed in registries +// ServicesUnavailable will enable all service registered in registries func (m *MSContext) ServicesUnavailable() { + m.registryLock.Lock() + defer m.registryLock.Unlock() + m.status = http.StatusServiceUnavailable unavailableService(m.registries) } +func (m *MSContext) GetRegistryStatus() []map[string]*motan.RegistryStatus { + m.registryLock.Lock() + defer m.registryLock.Unlock() + var res []map[string]*motan.RegistryStatus + for _, v := range m.registries { + if vv, ok := v.(motan.RegistryStatusManager); ok { + res = append(res, vv.GetRegistryStatus()) + } + } + return res +} + +func (m *MSContext) startRegistryFailback() { + vlog.Infoln("start MSContext registry failback") + ticker := time.NewTicker(registry.DefaultFailbackInterval * time.Millisecond) + defer ticker.Stop() + for range ticker.C { + m.registryLock.Lock() + for _, v := range m.registries { + if vv, ok := v.(motan.RegistryStatusManager); ok { + statusMap := vv.GetRegistryStatus() + for _, j := range statusMap { + if m.status == http.StatusOK && j.Status == motan.RegisterFailed { + vlog.Infoln(fmt.Sprintf("detect register fail, do register again, service: %s", j.Service.GetIdentity())) + v.(motan.Registry).Available(j.Service) + } else if m.status == http.StatusServiceUnavailable && j.Status == motan.UnregisterFailed { + vlog.Infoln(fmt.Sprintf("detect unregister fail, do unregister again, service: %s", j.Service.GetIdentity())) + v.(motan.Registry).Unavailable(j.Service) + } + } + } + + } + m.registryLock.Unlock() + } + +} + func canShareChannel(u1 motan.URL, u2 motan.URL) bool { if u1.Protocol != u2.Protocol { return false diff --git a/server_test.go b/server_test.go index b028d4de..3ee7b110 100644 --- a/server_test.go +++ b/server_test.go @@ -6,6 +6,7 @@ import ( assert2 "github.com/stretchr/testify/assert" "github.com/weibocom/motan-go/config" motan "github.com/weibocom/motan-go/core" + "github.com/weibocom/motan-go/registry" "testing" "time" ) @@ -93,6 +94,88 @@ motan-server: request.Attachment = motan.NewStringMap(motan.DefaultAttachmentSize) } +func TestServerRegisterFailBack(t *testing.T) { + assert := assert2.New(t) + cfgText := ` +motan-server: + log_dir: "stdout" + application: "app-golang" # server identify. + +motan-registry: + direct: + protocol: direct + test: + protocol: test-registry + +#conf of services +motan-service: + mytest-motan2: + path: testpath + group: testgroup + protocol: motan2 + registry: test + serialization: simple + ref : "serviceID" + check: true + export: "motan2:14564" +` + conf, err := config.NewConfigFromReader(bytes.NewReader([]byte(cfgText))) + assert.Nil(err) + + extFactory := GetDefaultExtFactory() + extFactory.RegistExtRegistry("test-registry", func(url *motan.URL) motan.Registry { + return &testRegistry{url: url} + }) + mscontext := NewMotanServerContextFromConfig(conf) + err = mscontext.RegisterService(&HelloService{}, "serviceID") + assert.Nil(err) + mscontext.Start(extFactory) + time.Sleep(time.Second * 3) + m := mscontext.GetRegistryStatus() + assert.Equal(len(m), 1) + for _, mm := range m[0] { + if mm.Service.Path == "testpath" { + assert.Equal(mm.Status, motan.NotRegister) + } + } + setRegistryFailSwitcher(true) + mscontext.ServicesAvailable() + m = mscontext.GetRegistryStatus() + assert.Equal(len(m), 1) + for _, mm := range m[0] { + if mm.Service.Path == "testpath" { + assert.Equal(mm.Status, motan.RegisterFailed) + } + } + setRegistryFailSwitcher(false) + time.Sleep(registry.DefaultFailbackInterval * time.Millisecond) + m = mscontext.GetRegistryStatus() + assert.Equal(len(m), 1) + for _, mm := range m[0] { + if mm.Service.Path == "testpath" { + assert.Equal(mm.Status, motan.RegisterSuccess) + } + } + setRegistryFailSwitcher(true) + mscontext.ServicesUnavailable() + m = mscontext.GetRegistryStatus() + assert.Equal(len(m), 1) + for _, mm := range m[0] { + if mm.Service.Path == "testpath" { + assert.Equal(mm.Status, motan.UnregisterFailed) + } + } + setRegistryFailSwitcher(false) + time.Sleep(registry.DefaultFailbackInterval * time.Millisecond) + m = mscontext.GetRegistryStatus() + assert.Equal(len(m), 1) + for _, mm := range m[0] { + if mm.Service.Path == "testpath" { + assert.Equal(mm.Status, motan.UnregisterSuccess) + } + } +} + func TestNewMotanServerContextFromConfig(t *testing.T) { assert := assert2.New(t)