Skip to content

Commit

Permalink
Update (#316)
Browse files Browse the repository at this point in the history
after notify, offline endpoints will be destroyed asynchronously after 2s delay
  • Loading branch information
Hoofffman authored Nov 10, 2023
1 parent 9f8bdae commit 890edb9
Show file tree
Hide file tree
Showing 10 changed files with 571 additions and 27 deletions.
68 changes: 57 additions & 11 deletions agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"flag"
"fmt"
"github.com/weibocom/motan-go/endpoint"
vlog "github.com/weibocom/motan-go/log"
"gopkg.in/yaml.v2"
"io/ioutil"
"net"
"net/http"
Expand All @@ -15,15 +17,12 @@ import (
"sync/atomic"
"time"

cfg "github.com/weibocom/motan-go/config"
"gopkg.in/yaml.v2"

"github.com/shirou/gopsutil/v3/process"
"github.com/valyala/fasthttp"
"github.com/weibocom/motan-go/cluster"
cfg "github.com/weibocom/motan-go/config"
motan "github.com/weibocom/motan-go/core"
mhttp "github.com/weibocom/motan-go/http"
"github.com/weibocom/motan-go/log"
"github.com/weibocom/motan-go/metrics"
mpro "github.com/weibocom/motan-go/protocol"
"github.com/weibocom/motan-go/registry"
Expand Down Expand Up @@ -59,7 +58,7 @@ type Agent struct {

clusterMap *motan.CopyOnWriteMap
httpClusterMap *motan.CopyOnWriteMap
status int
status int64
agentURL *motan.URL
logdir string
port int
Expand All @@ -77,8 +76,9 @@ type Agent struct {

manageHandlers map[string]http.Handler

svcLock sync.Mutex
clsLock sync.Mutex
svcLock sync.Mutex
clsLock sync.Mutex
registryLock sync.Mutex

configurer *DynamicConfigurer

Expand Down Expand Up @@ -157,13 +157,13 @@ func (a *Agent) GetAgentServer() motan.Server {

func (a *Agent) SetAllServicesAvailable() {
a.availableAllServices()
a.status = http.StatusOK
atomic.StoreInt64(&a.status, http.StatusOK)
a.saveStatus()
}

func (a *Agent) SetAllServicesUnavailable() {
a.unavailableAllServices()
a.status = http.StatusServiceUnavailable
atomic.StoreInt64(&a.status, http.StatusServiceUnavailable)
a.saveStatus()
}

Expand Down Expand Up @@ -204,21 +204,63 @@ func (a *Agent) StartMotanAgentFromConfig(config *cfg.Config) {
a.configurer = NewDynamicConfigurer(a)
go a.startMServer()
go a.registerAgent()
go a.startRegistryFailback()
f, err := os.Create(a.pidfile)
if err != nil {
vlog.Errorf("create file %s fail.", a.pidfile)
} else {
defer f.Close()
f.WriteString(strconv.Itoa(os.Getpid()))
}
if a.status == http.StatusOK {
if atomic.LoadInt64(&a.status) == http.StatusOK {
// recover form a unexpected case
a.availableAllServices()
}
vlog.Infoln("Motan agent is starting...")
a.startAgent()
}

func (a *Agent) startRegistryFailback() {
vlog.Infoln("start agent failback")
ticker := time.NewTicker(registry.DefaultFailbackInterval * time.Millisecond)
defer ticker.Stop()
for range ticker.C {
a.registryLock.Lock()
a.serviceRegistries.Range(func(k, v interface{}) bool {
if vv, ok := v.(motan.RegistryStatusManager); ok {
statusMap := vv.GetRegistryStatus()
for _, j := range statusMap {
curStatus := atomic.LoadInt64(&a.status)
if curStatus == http.StatusOK && j.Status == motan.RegisterFailed {
vlog.Infoln(fmt.Sprintf("detect agent register fail, do register again, service: %s", j.Service.GetIdentity()))
v.(motan.Registry).Available(j.Service)
} else if curStatus == http.StatusServiceUnavailable && j.Status == motan.UnregisterFailed {
vlog.Infoln(fmt.Sprintf("detect agent unregister fail, do unregister again, service: %s", j.Service.GetIdentity()))
v.(motan.Registry).Unavailable(j.Service)
}
}
}
return true
})
a.registryLock.Unlock()
}

}

func (a *Agent) GetRegistryStatus() []map[string]*motan.RegistryStatus {
a.registryLock.Lock()
defer a.registryLock.Unlock()
var res []map[string]*motan.RegistryStatus
a.serviceRegistries.Range(func(k, v interface{}) bool {
if vv, ok := v.(motan.RegistryStatusManager); ok {
statusMap := vv.GetRegistryStatus()
res = append(res, statusMap)
}
return true
})
return res
}

func (a *Agent) registerStatusSampler() {
metrics.RegisterStatusSampleFunc("memory", func() int64 {
p, _ := process.NewProcess(int32(os.Getpid()))
Expand Down Expand Up @@ -252,7 +294,7 @@ func (a *Agent) initStatus() {
key := metrics.DefaultStatRole + metrics.KeyDelimiter + application + metrics.KeyDelimiter + "abnormal_exit.total_count"
metrics.AddCounter(metrics.DefaultStatGroup, metrics.DefaultStatService, key, 1)
} else {
a.status = http.StatusServiceUnavailable
atomic.StoreInt64(&a.status, http.StatusServiceUnavailable)
}
}

Expand Down Expand Up @@ -829,13 +871,17 @@ func (a *Agent) startServerAgent() {
}

func (a *Agent) availableAllServices() {
a.registryLock.Lock()
defer a.registryLock.Unlock()
a.serviceRegistries.Range(func(k, v interface{}) bool {
v.(motan.Registry).Available(nil)
return true
})
}

func (a *Agent) unavailableAllServices() {
a.registryLock.Lock()
defer a.registryLock.Unlock()
a.serviceRegistries.Range(func(k, v interface{}) bool {
v.(motan.Registry).Unavailable(nil)
return true
Expand Down
Loading

0 comments on commit 890edb9

Please sign in to comment.