Skip to content

Commit

Permalink
feat: implement Reconcile - ability to change upstream list on the fly
Browse files Browse the repository at this point in the history
This is going to be used in Sfyra to provide dynamic load balancer for
the control plane.

Signed-off-by: Andrey Smirnov <[email protected]>
  • Loading branch information
smira authored and talos-bot committed Sep 9, 2020
1 parent 8b1dfa6 commit da8e987
Show file tree
Hide file tree
Showing 6 changed files with 255 additions and 4 deletions.
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2020-09-01T20:34:11Z by kres ee90a80-dirty.
# Generated on 2020-09-09T15:36:30Z by kres 7e146df-dirty.

# common variables

Expand Down Expand Up @@ -34,7 +34,7 @@ COMMON_ARGS += --build-arg=USERNAME=$(USERNAME)
COMMON_ARGS += --build-arg=TOOLCHAIN=$(TOOLCHAIN)
COMMON_ARGS += --build-arg=GOFUMPT_VERSION=$(GOFUMPT_VERSION)
COMMON_ARGS += --build-arg=TESTPKGS=$(TESTPKGS)
TOOLCHAIN ?= docker.io/golang:1.14-alpine
TOOLCHAIN ?= docker.io/golang:1.15-alpine

# help menu

Expand Down
4 changes: 2 additions & 2 deletions hack/git-chglog/config.yaml
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# THIS FILE WAS AUTOMATICALLY GENERATED, PLEASE DO NOT EDIT.
#
# Generated on 2020-09-01T20:34:11Z by kres ee90a80-dirty.
# Generated on 2020-09-09T15:36:30Z by kres 7e146df-dirty.

style: github
template: CHANGELOG.tpl.md
info:
title: CHANGELOG
repository_url: https://github.com/talos-systems/talos
repository_url: https://github.com/talos-systems/go-loadbalancer
options:
commits:
# filters:
Expand Down
30 changes: 30 additions & 0 deletions loadbalancer/loadbalancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package loadbalancer

import (
"context"
"fmt"
"log"
"net"

Expand All @@ -25,6 +26,8 @@ import (
// Usage: call Run() to start lb and wait for shutdown, call Close() to shutdown lb.
type TCP struct {
tcpproxy.Proxy

routes map[string]*upstream.List
}

type lbUpstream string
Expand Down Expand Up @@ -76,6 +79,10 @@ func (target *lbTarget) HandleConn(conn net.Conn) {
// TCP automatically does background health checks for the upstreams and picks only healthy
// ones. Healthcheck is simple Dial attempt.
func (t *TCP) AddRoute(ipPort string, upstreamAddrs []string, options ...upstream.ListOption) error {
if t.routes == nil {
t.routes = make(map[string]*upstream.List)
}

upstreams := make([]upstream.Backend, len(upstreamAddrs))
for i := range upstreams {
upstreams[i] = lbUpstream(upstreamAddrs[i])
Expand All @@ -86,7 +93,30 @@ func (t *TCP) AddRoute(ipPort string, upstreamAddrs []string, options ...upstrea
return err
}

t.routes[ipPort] = list

t.Proxy.AddRoute(ipPort, &lbTarget{list: list})

return nil
}

// ReconcileRoute updates the list of upstreamAddrs for the specified route (ipPort).
func (t *TCP) ReconcileRoute(ipPort string, upstreamAddrs []string) error {
if t.routes == nil {
t.routes = make(map[string]*upstream.List)
}

list := t.routes[ipPort]
if list == nil {
return fmt.Errorf("handler not registered for %q", ipPort)
}

upstreams := make([]upstream.Backend, len(upstreamAddrs))
for i := range upstreams {
upstreams[i] = lbUpstream(upstreamAddrs[i])
}

list.Reconcile(upstreams)

return nil
}
96 changes: 96 additions & 0 deletions loadbalancer/loadbalancer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,102 @@ type TCPSuite struct {
suite.Suite
}

func (suite *TCPSuite) TestReconcile() {
const (
upstreamCount = 5
pivot = 2
)

upstreams := make([]mockUpstream, upstreamCount)
for i := range upstreams {
upstreams[i].identity = strconv.Itoa(i)
suite.Require().NoError(upstreams[i].Start())
}

upstreamAddrs := make([]string, len(upstreams))
for i := range upstreamAddrs {
upstreamAddrs[i] = upstreams[i].addr
}

listenAddr, err := findListenAddress()
suite.Require().NoError(err)

lb := &loadbalancer.TCP{}
suite.Require().NoError(lb.AddRoute(
listenAddr,
upstreamAddrs[:pivot],
upstream.WithLowHighScores(-3, 3),
upstream.WithInitialScore(1),
upstream.WithScoreDeltas(-1, 1),
upstream.WithHealthcheckInterval(time.Second),
upstream.WithHealthcheckTimeout(100*time.Millisecond),
))

suite.Require().NoError(lb.Start())

var wg sync.WaitGroup

wg.Add(1)

go func() {
defer wg.Done()

lb.Wait() //nolint: errcheck
}()

for i := 0; i < 5*pivot; i++ {
c, err := net.Dial("tcp", listenAddr)
suite.Require().NoError(err)

id, err := ioutil.ReadAll(c)
suite.Require().NoError(err)

// load balancer should go round-robin across all the upstreams [0:pivot]
suite.Assert().Equal([]byte(strconv.Itoa(i%pivot)), id)

suite.Require().NoError(c.Close())
}

// reconcile the list
suite.Require().NoError(lb.ReconcileRoute(listenAddr, upstreamAddrs[pivot:]))

// bring down pre-pivot upstreams
for i := 0; i < pivot; i++ {
upstreams[i].Close()
}

upstreamsUsed := map[int64]int{}

for i := 0; i < 10*(upstreamCount-pivot); i++ {
c, err := net.Dial("tcp", listenAddr)
suite.Require().NoError(err)

id, err := ioutil.ReadAll(c)
suite.Require().NoError(err)

// load balancer should go round-robin across all the upstreams [pivot:]
no, err := strconv.ParseInt(string(id), 10, 32)
suite.Require().NoError(err)

suite.Assert().Less(no, int64(upstreamCount))
suite.Assert().GreaterOrEqual(no, int64(pivot))
upstreamsUsed[no]++

suite.Require().NoError(c.Close())
}

for _, count := range upstreamsUsed {
suite.Assert().Equal(10, count)
}

suite.Require().NoError(lb.Close())
wg.Wait()

for i := range upstreams {
upstreams[i].Close()
}
}

func (suite *TCPSuite) TestBalancer() {
const (
upstreamCount = 5
Expand Down
33 changes: 33 additions & 0 deletions upstream/upstream.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,39 @@ func NewList(upstreams []Backend, options ...ListOption) (*List, error) {
return list, nil
}

// Reconcile the list of backends with passed list.
//
// Any new backends are added with initial score, score is untouched
// for backends which haven't changed their score.
func (list *List) Reconcile(upstreams []Backend) {
newUpstreams := make(map[Backend]struct{}, len(upstreams))

for _, upstream := range upstreams {
newUpstreams[upstream] = struct{}{}
}

list.mu.Lock()
defer list.mu.Unlock()

for i := 0; i < len(list.nodes); i++ {
if _, exists := newUpstreams[list.nodes[i].backend]; exists {
delete(newUpstreams, list.nodes[i].backend)

continue
}

list.nodes = append(list.nodes[:i], list.nodes[i+1:]...)
i--
}

for upstream := range newUpstreams {
list.nodes = append(list.nodes, node{
backend: upstream,
score: list.initialScore,
})
}
}

// Shutdown stops healthchecks.
func (list *List) Shutdown() {
list.healthCtxCancel()
Expand Down
92 changes: 92 additions & 0 deletions upstream/upstream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,98 @@ func (suite *ListSuite) TestHealthcheck() {
}))
}

func (suite *ListSuite) TestReconcile() {
l, err := upstream.NewList(
[]upstream.Backend{
mockBackend("one"),
mockBackend("two"),
mockBackend("three"),
},
upstream.WithLowHighScores(-3, 3),
upstream.WithInitialScore(1),
upstream.WithScoreDeltas(-1, 1),
upstream.WithHealthcheckInterval(time.Hour),
)
suite.Require().NoError(err)

defer l.Shutdown()

backend, err := l.Pick()
suite.Assert().Equal(mockBackend("one"), backend)
suite.Assert().NoError(err)

l.Reconcile([]upstream.Backend{
mockBackend("one"),
mockBackend("two"),
mockBackend("three"),
})

backend, err = l.Pick()
suite.Assert().Equal(mockBackend("two"), backend)
suite.Assert().NoError(err)

l.Reconcile([]upstream.Backend{
mockBackend("one"),
mockBackend("two"),
mockBackend("four"),
})

backend, err = l.Pick()
suite.Assert().Equal(mockBackend("four"), backend)
suite.Assert().NoError(err)

l.Reconcile([]upstream.Backend{
mockBackend("five"),
mockBackend("six"),
mockBackend("four"),
})

backend, err = l.Pick()
suite.Assert().Equal(mockBackend("four"), backend)
suite.Assert().NoError(err)

backend, err = l.Pick()
suite.Assert().Equal(mockBackend("five"), backend)
suite.Assert().NoError(err)

backend, err = l.Pick()
suite.Assert().Equal(mockBackend("six"), backend)
suite.Assert().NoError(err)

l.Down(mockBackend("four")) // score == 2
l.Down(mockBackend("four")) // score == 1
l.Down(mockBackend("four")) // score == 0
l.Down(mockBackend("four")) // score == -1

backend, err = l.Pick()
suite.Assert().Equal(mockBackend("five"), backend)
suite.Assert().NoError(err)

backend, err = l.Pick()
suite.Assert().Equal(mockBackend("six"), backend)
suite.Assert().NoError(err)

l.Reconcile([]upstream.Backend{
mockBackend("five"),
mockBackend("six"),
mockBackend("four"),
})

backend, err = l.Pick()
suite.Assert().Equal(mockBackend("five"), backend)
suite.Assert().NoError(err)

backend, err = l.Pick()
suite.Assert().Equal(mockBackend("six"), backend)
suite.Assert().NoError(err)

l.Reconcile(nil)

backend, err = l.Pick()
suite.Assert().Nil(backend)
suite.Assert().EqualError(err, "no upstreams available")
}

func TestListSuite(t *testing.T) {
suite.Run(t, new(ListSuite))
}

0 comments on commit da8e987

Please sign in to comment.