Make heavy loaded optimization configurable
The Scylla Go Driver can avoid sending requests to an overloaded shard by
sending them on a different connection to the same node.

This change makes the parameters that determine when this behavior kicks in
configurable.
dkropachev committed Aug 20, 2024
1 parent 00277be commit 014cb35
Showing 6 changed files with 146 additions and 33 deletions.
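
Below is a minimal configuration sketch of the two new knobs. This is a hedged example: the import path, host address, and values are placeholder assumptions, not part of this commit.

package main

import "github.com/gocql/gocql"

func main() {
	cluster := gocql.NewCluster("192.168.1.1") // placeholder host
	// Treat a connection as heavily loaded once it has more than 256
	// in-flight requests (the driver default is 512).
	cluster.HeavyLoadedConnectionThreshold = 256
	// Switch to another connection to the same node only if it is at
	// least 30% less busy (the driver default is 20%).
	cluster.HeavyLoadedSwitchConnectionPercentage = 30

	session, err := cluster.CreateSession()
	if err != nil {
		panic(err)
	}
	defer session.Close()
}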
81 changes: 61 additions & 20 deletions cluster.go
@@ -7,6 +7,7 @@ package gocql
import (
"context"
"errors"
"fmt"
"net"
"time"
)
@@ -93,6 +94,24 @@ type ClusterConfig struct {
// Default: 128 for older CQL versions
MaxRequestsPerConn int

// Threshold for the number of in-flight requests per connection
// above which the connection is considered heavily loaded.
// Default: 512
HeavyLoadedConnectionThreshold int

// When a connection is considered heavily loaded, the driver may switch
// to the least busy connection to the same node. The switch happens only
// if that connection is at least HeavyLoadedSwitchConnectionPercentage
// percent less busy (in terms of in-flight requests).
//
// With the default value of 20%, if the heavily loaded connection has
// 100 in-flight requests, the switch happens only if the least busy
// connection has at most 80 in-flight requests.
//
// Default: 20%
HeavyLoadedSwitchConnectionPercentage int

// Default consistency level.
// Default: Quorum
Consistency Consistency
@@ -288,26 +307,28 @@ type Dialer interface {
// the same host, and will not mark the node being down or up from events.
func NewCluster(hosts ...string) *ClusterConfig {
cfg := &ClusterConfig{
Hosts: hosts,
CQLVersion: "3.0.0",
Timeout: 11 * time.Second,
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
PageSize: 5000,
DefaultTimestamp: true,
DriverName: defaultDriverName,
DriverVersion: defaultDriverVersion,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
SocketKeepalive: 15 * time.Second,
WriteCoalesceWaitTime: 200 * time.Microsecond,
MetadataSchemaRequestTimeout: 60 * time.Second,
Hosts: hosts,
CQLVersion: "3.0.0",
Timeout: 11 * time.Second,
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
PageSize: 5000,
DefaultTimestamp: true,
DriverName: defaultDriverName,
DriverVersion: defaultDriverVersion,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
SocketKeepalive: 15 * time.Second,
WriteCoalesceWaitTime: 200 * time.Microsecond,
MetadataSchemaRequestTimeout: 60 * time.Second,
HeavyLoadedConnectionThreshold: 512,
HeavyLoadedSwitchConnectionPercentage: 20,
}

return cfg
@@ -345,6 +366,26 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
}

func (cfg *ClusterConfig) Validate() error {
if len(cfg.Hosts) == 0 {
return ErrNoHosts
}

if cfg.Authenticator != nil && cfg.AuthProvider != nil {
return errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
}

if cfg.HeavyLoadedSwitchConnectionPercentage > 100 || cfg.HeavyLoadedSwitchConnectionPercentage < 0 {
return fmt.Errorf("HeavyLoadedSwitchConnectionPercentage must be between 0 and 100, got %d", cfg.HeavyLoadedSwitchConnectionPercentage)
}

if cfg.HeavyLoadedConnectionThreshold < 0 {
return fmt.Errorf("HeavyLoadedConnectionThreshold must be greater than or equal to 0, got %d", cfg.HeavyLoadedConnectionThreshold)
}

return nil
}

var (
ErrNoHosts = errors.New("no hosts provided")
ErrNoConnectionsStarted = errors.New("no connections were made when creating the session")
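As a quick illustration of the new Validate method, an out-of-range percentage is rejected before any connection attempt. This is a sketch; the host and value are placeholders.

package main

import (
	"log"

	"github.com/gocql/gocql"
)

func main() {
	cfg := gocql.NewCluster("192.168.1.1")
	cfg.HeavyLoadedSwitchConnectionPercentage = 150 // out of the 0-100 range
	if err := cfg.Validate(); err != nil {
		// HeavyLoadedSwitchConnectionPercentage must be between 0 and 100, got 150
		log.Fatal(err)
	}
}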
4 changes: 4 additions & 0 deletions conn.go
@@ -1615,6 +1615,10 @@ func (c *Conn) AvailableStreams() int {
return c.streams.Available()
}

func (c *Conn) StreamsInUse() int {
return c.streams.InUse()
}

func (c *Conn) UseKeyspace(keyspace string) error {
q := &writeQueryFrame{statement: `USE "` + keyspace + `"`}
q.params.consistency = c.session.cons
5 changes: 5 additions & 0 deletions internal/streams/streams.go
@@ -144,3 +144,8 @@ func (s *IDGenerator) Available() int {
func (s *IDGenerator) InUse() int {
return int(atomic.LoadInt32(&s.inuseStreams))
}

// SetStreamsInUse sets the in-use streams counter; intended for testing only
func SetStreamsInUse(s *IDGenerator, val int32) {
atomic.StoreInt32(&s.inuseStreams, val)
}
7 changes: 3 additions & 4 deletions scylla.go
@@ -416,15 +416,14 @@ func (p *scyllaConnPicker) maybeReplaceWithLessBusyConnection(c *Conn) *Conn {
return c
}
alternative := p.leastBusyConn()
if alternative == nil || alternative.AvailableStreams()*120 > c.AvailableStreams()*100 {
return c
} else {
if alternative != nil && alternative.StreamsInUse()*100 <= c.StreamsInUse()*(100-c.session.cfg.HeavyLoadedSwitchConnectionPercentage) {
return alternative
}
return c
}

func isHeavyLoaded(c *Conn) bool {
return c.streams.NumStreams/2 > c.AvailableStreams()
return c.StreamsInUse() > c.session.cfg.HeavyLoadedConnectionThreshold
}

func (p *scyllaConnPicker) leastBusyConn() *Conn {
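To make the integer arithmetic in maybeReplaceWithLessBusyConnection concrete, here is a tiny standalone sketch; shouldSwitch is a hypothetical helper mirroring the comparison above, not part of the driver.

package main

import "fmt"

// shouldSwitch reports whether the alternative connection is at least
// switchPercentage percent less busy than the current one, using the same
// integer arithmetic as the picker.
func shouldSwitch(currentInUse, alternativeInUse, switchPercentage int) bool {
	return alternativeInUse*100 <= currentInUse*(100-switchPercentage)
}

func main() {
	fmt.Println(shouldSwitch(100, 80, 20)) // true: 8000 <= 8000, so the switch happens
	fmt.Println(shouldSwitch(100, 81, 20)) // false: 8100 > 8000, so the current connection is kept
}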
71 changes: 71 additions & 0 deletions scylla_test.go
@@ -99,6 +99,76 @@ func TestScyllaConnPickerHammerPickNilToken(t *testing.T) {
wg.Wait()
}

func TestScyllaConnPicker(t *testing.T) {
t.Parallel()

t.Run("maybeReplaceWithLessBusyConnection", func(t *testing.T) {

cfg := ClusterConfig{
HeavyLoadedSwitchConnectionPercentage: 30,
HeavyLoadedConnectionThreshold: 100,
}

tcases := []struct {
name string
streamsInUse [3]int32
expected int
}{
{
name: "all connections below threshold",
streamsInUse: [3]int32{99, 98, 97},
expected: 0,
},
{
name: "all connections in threshold, but none is switchable",
streamsInUse: [3]int32{110, 109, 108},
expected: 0,
},
{
name: "all connections in threshold, one is below threshold",
streamsInUse: [3]int32{110, 109, 70},
expected: 2,
},
{
name: "all connections in threshold, one is above threshold, but below switchable percentage",
streamsInUse: [3]int32{210, 130, 209},
expected: 1,
},
}

for _, tcase := range tcases {
t.Run(tcase.name, func(t *testing.T) {
s := scyllaConnPicker{
nrShards: 4,
msbIgnore: 12,
}

conns := [3]*Conn{
mockConn(0),
mockConn(1),
mockConn(2),
}

for _, conn := range conns {
conn.session.cfg = cfg
s.Put(conn)
}

for id, inUse := range tcase.streamsInUse {
streams.SetStreamsInUse(conns[id].streams, inUse)
}

expectedConn := conns[tcase.expected]

c := s.maybeReplaceWithLessBusyConnection(conns[0])
if c != expectedConn {
t.Errorf("expected connection from shard %d, got %d", expectedConn.scyllaSupported.shard, c.scyllaSupported.shard)
}
})
}
})
}

func TestScyllaConnPickerRemove(t *testing.T) {
t.Parallel()

@@ -135,6 +205,7 @@ func mockConn(shard int) *Conn {
partitioner: "org.apache.cassandra.dht.Murmur3Partitioner",
shardingAlgorithm: "biased-token-round-robin",
},
session: &Session{},
}
}

11 changes: 2 additions & 9 deletions session.go
@@ -119,16 +119,9 @@ func addrsToHosts(addrs []string, defaultPort int, logger StdLogger) ([]*HostInf

// NewSession wraps an existing Node.
func NewSession(cfg ClusterConfig) (*Session, error) {
// Check that hosts in the ClusterConfig is not empty
if len(cfg.Hosts) < 1 {
return nil, ErrNoHosts
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("gocql: unable to create session: cluster config validation failed: %v", err)
}

// Check that either Authenticator is set or AuthProvider, not both
if cfg.Authenticator != nil && cfg.AuthProvider != nil {
return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
}

// TODO: we should take a context in here at some point
ctx, cancel := context.WithCancel(context.TODO())

