Skip to content

Commit

Permalink
Make heavy loaded optimization configurable
Browse files Browse the repository at this point in the history
Scylla Go Driver has a capability to avoid sending requests to an
overloaded shard, instead sending the request on a different connection
(at the same node).

This change makes it possible to customize the parameters used to
determine when this behavior would kick in.
  • Loading branch information
dkropachev committed Jul 6, 2024
1 parent ed9f13a commit c894f1f
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 30 deletions.
75 changes: 58 additions & 17 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package gocql
import (
"context"
"errors"
"fmt"
"net"
"time"
)
Expand Down Expand Up @@ -91,6 +92,24 @@ type ClusterConfig struct {
// Default: 128 for older CQL versions
MaxRequestsPerConn int

// Threshold for the number of inflight requests per connection
// after which the connection is considered as heavy loaded
// Default: 512
HeavyLoadedConnectionThreshold int

// When a connection is considered as heavy loaded, the driver
// could switch to the least loaded connection for the same node.
// The switch will happen if the other connection is at least
// HeavyLoadedSwitchConnectionPercentage percentage less busy
// (in terms of inflight requests).
//
// For the default value of 20%, if the heavy loaded connection
// has 100 inflight requests, the switch will happen only if the
// least busy connection has less than 80 inflight requests.
//
// Default: 20%
HeavyLoadedSwitchConnectionPercentage int

// Default consistency level.
// Default: Quorum
Consistency Consistency
Expand Down Expand Up @@ -275,23 +294,25 @@ type Dialer interface {
// the same host, and will not mark the node being down or up from events.
func NewCluster(hosts ...string) *ClusterConfig {
cfg := &ClusterConfig{
Hosts: hosts,
CQLVersion: "3.0.0",
Timeout: 11 * time.Second,
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
PageSize: 5000,
DefaultTimestamp: true,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
SocketKeepalive: 15 * time.Second,
WriteCoalesceWaitTime: 200 * time.Microsecond,
Hosts: hosts,
CQLVersion: "3.0.0",
Timeout: 11 * time.Second,
ConnectTimeout: 11 * time.Second,
Port: 9042,
NumConns: 2,
Consistency: Quorum,
MaxPreparedStmts: defaultMaxPreparedStmts,
MaxRoutingKeyInfo: 1000,
PageSize: 5000,
DefaultTimestamp: true,
MaxWaitSchemaAgreement: 60 * time.Second,
ReconnectInterval: 60 * time.Second,
ConvictionPolicy: &SimpleConvictionPolicy{},
ReconnectionPolicy: &ConstantReconnectionPolicy{MaxRetries: 3, Interval: 1 * time.Second},
SocketKeepalive: 15 * time.Second,
WriteCoalesceWaitTime: 200 * time.Microsecond,
HeavyLoadedConnectionThreshold: 512,
HeavyLoadedSwitchConnectionPercentage: 20,
}

return cfg
Expand Down Expand Up @@ -329,6 +350,26 @@ func (cfg *ClusterConfig) filterHost(host *HostInfo) bool {
return !(cfg.HostFilter == nil || cfg.HostFilter.Accept(host))
}

func (cfg *ClusterConfig) Validate() error {
if len(cfg.Hosts) == 0 {
return ErrNoHosts
}

if cfg.Authenticator != nil && cfg.AuthProvider != nil {
return errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
}

if cfg.HeavyLoadedSwitchConnectionPercentage > 100 || cfg.HeavyLoadedSwitchConnectionPercentage < 0 {
return fmt.Errorf("HeavyLoadedSwitchConnectionPercentage must be between 0 and 100, got %d", cfg.HeavyLoadedSwitchConnectionPercentage)
}

if cfg.HeavyLoadedConnectionThreshold < 0 {
return fmt.Errorf("HeavyLoadedConnectionThreshold must be greater than or equal to 0, got %d", cfg.HeavyLoadedConnectionThreshold)
}

return nil
}

var (
ErrNoHosts = errors.New("no hosts provided")
ErrNoConnectionsStarted = errors.New("no connections were made when creating the session")
Expand Down
4 changes: 4 additions & 0 deletions conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -1615,6 +1615,10 @@ func (c *Conn) AvailableStreams() int {
return c.streams.Available()
}

func (c *Conn) StreamsInUse() int {
return c.streams.InUse()
}

func (c *Conn) UseKeyspace(keyspace string) error {
q := &writeQueryFrame{statement: `USE "` + keyspace + `"`}
q.params.consistency = c.session.cons
Expand Down
7 changes: 3 additions & 4 deletions scylla.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,15 +413,14 @@ func (p *scyllaConnPicker) maybeReplaceWithLessBusyConnection(c *Conn) *Conn {
return c
}
alternative := p.leastBusyConn()
if alternative == nil || alternative.AvailableStreams()*120 > c.AvailableStreams()*100 {
return c
} else {
if alternative != nil && alternative.StreamsInUse()*100 >= c.StreamsInUse()*(100-c.session.cfg.HeavyLoadedSwitchConnectionPercentage) {
return alternative
}
return c
}

func isHeavyLoaded(c *Conn) bool {
return c.streams.NumStreams/2 > c.AvailableStreams()
return c.StreamsInUse() > c.session.cfg.HeavyLoadedConnectionThreshold
}

func (p *scyllaConnPicker) leastBusyConn() *Conn {
Expand Down
11 changes: 2 additions & 9 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,9 @@ func addrsToHosts(addrs []string, defaultPort int, logger StdLogger) ([]*HostInf

// NewSession wraps an existing Node.
func NewSession(cfg ClusterConfig) (*Session, error) {
// Check that hosts in the ClusterConfig is not empty
if len(cfg.Hosts) < 1 {
return nil, ErrNoHosts
if err := cfg.Validate(); err != nil {
return nil, fmt.Errorf("gocql: unable to create session: cluster config validation failed: %v", err)
}

// Check that either Authenticator is set or AuthProvider, not both
if cfg.Authenticator != nil && cfg.AuthProvider != nil {
return nil, errors.New("Can't use both Authenticator and AuthProvider in cluster config.")
}

// TODO: we should take a context in here at some point
ctx, cancel := context.WithCancel(context.TODO())

Expand Down

0 comments on commit c894f1f

Please sign in to comment.