Skip to content

Commit

Permalink
update: (#314)
Browse files Browse the repository at this point in the history
after notify, offline endpoints will be destroyed asynchronously after 2s delay
  • Loading branch information
Hoofffman authored Jul 6, 2023
1 parent f409348 commit 6b17456
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 8 deletions.
10 changes: 7 additions & 3 deletions cluster/motanCluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,9 +191,13 @@ func (m *MotanCluster) Notify(registryURL *motan.URL, urls []*motan.URL) {
m.registryRefers[registryURL.GetIdentity()] = endpoints
}
m.refresh()
for _, ep := range endpointMap {
ep.Destroy()
}
go func() {
defer motan.HandlePanic(nil)
time.Sleep(time.Second * 2)
for _, ep := range endpointMap {
ep.Destroy()
}
}()
}

// remove rule protocol && set weight
Expand Down
25 changes: 23 additions & 2 deletions cluster/motanCluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/weibocom/motan-go/registry"
"os"
"testing"
"time"

motan "github.com/weibocom/motan-go/core"
"github.com/weibocom/motan-go/ha"
Expand Down Expand Up @@ -72,7 +73,27 @@ func TestNotify(t *testing.T) {
if len(cluster.Refers) == 0 {
t.Fatalf("cluster notify-refers size not correct. expect :2, refers size:%d", len(cluster.Refers))
}

urls = append(urls, &motan.URL{Host: "127.0.0.1", Port: 8001, Protocol: "test"})
var destroyEndpoint motan.EndPoint
for _, j := range cluster.Refers {
if j.GetURL().Port == 8002 {
destroyEndpoint = j
}
}
if destroyEndpoint == nil {
t.Fatalf("cluster endpoint is nil")
}
if !destroyEndpoint.IsAvailable() {
t.Fatalf("cluster endpoint should be not available")
}
cluster.Notify(RegistryURL, urls)
time.Sleep(time.Second * 3)
if len(cluster.Refers) != 1 {
t.Fatalf("cluster notify-refers size not correct. expect :2, refers size:%d", len(cluster.Refers))
}
if destroyEndpoint.IsAvailable() {
t.Fatalf("cluster endpoint should not be available")
}
}

func TestCall(t *testing.T) {
Expand Down Expand Up @@ -123,7 +144,7 @@ func TestParseRegistryFromEnv(t *testing.T) {
}
}

//-------------test struct--------------------
// -------------test struct--------------------
func getCustomExt() motan.ExtensionFactory {
ext := &motan.DefaultExtensionFactory{}
ext.Initialize()
Expand Down
16 changes: 14 additions & 2 deletions core/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"errors"
"fmt"
"sync/atomic"
"time"
)

Expand Down Expand Up @@ -116,6 +117,7 @@ func (t *TestProvider) GetPath() string {
type TestEndPoint struct {
URL *URL
ProcessTime int64
available atomic.Value
}

func (t *TestEndPoint) GetURL() *URL {
Expand All @@ -137,10 +139,20 @@ func (t *TestEndPoint) Call(request Request) Response {
}

func (t *TestEndPoint) IsAvailable() bool {
return true
return t.available.Load().(bool)
}

func (t *TestEndPoint) Initialize() {
t.SetAvailable(true)
}

func (t *TestEndPoint) Destroy() {}
func (t *TestEndPoint) Destroy() {
t.SetAvailable(false)
}

func (t *TestEndPoint) SetAvailable(a bool) {
t.available.Store(a)
}

func (t *TestEndPoint) SetProxy(proxy bool) {}

Expand Down
4 changes: 3 additions & 1 deletion ha/backupRequestHA_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ func TestBackupRequestHA_Call3(t *testing.T) {
}

func getEP(processTime int64) motan.EndPoint {
fep := &motan.FilterEndPoint{Caller: &motan.TestEndPoint{ProcessTime: processTime}}
caller := &motan.TestEndPoint{ProcessTime: processTime}
motan.Initialize(caller)
fep := &motan.FilterEndPoint{Caller: caller}
mf := &filter.MetricsFilter{}
mf.SetContext(&motan.Context{Config: config.NewConfig()})
mf.SetNext(motan.GetLastEndPointFilter())
Expand Down

0 comments on commit 6b17456

Please sign in to comment.