Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Selenium Grid scaler #6169

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,9 @@ Here is an overview of all new **experimental** features:
- **Grafana dashboard**: Fix dashboard to handle wildcard scaledObject variables ([#6214](https://github.com/kedacore/keda/issues/6214))
- **Kafka**: Fix logic to scale to zero on invalid offset even with earliest offsetResetPolicy ([#5689](https://github.com/kedacore/keda/issues/5689))
- **RabbitMQ Scaler**: Add connection name for AMQP ([#5958](https://github.com/kedacore/keda/issues/5958))
- **Selenium Scaler**: Add Support for Username and Password Authentication ([#6144](https://github.com/kedacore/keda/issues/6144))
- **Selenium Grid Scaler**: Add optional auth parameters `username`, `password`, `authType`, `accessToken` to configure a secure GraphQL endpoint ([#6144](https://github.com/kedacore/keda/issues/6144))
- **Selenium Grid Scaler**: Add parameter `nodeMaxSessions` to configure scaler sync with `--max-sessions` capacity in the Node ([#6080](https://github.com/kedacore/keda/issues/6080))
- **Selenium Grid Scaler**: Improve logic based on node stereotypes, node sessions and queue requests capabilities ([#6080](https://github.com/kedacore/keda/issues/6080))
- TODO ([#XXX](https://github.com/kedacore/keda/issues/XXX))

### Fixes
Expand Down
254 changes: 195 additions & 59 deletions pkg/scalers/selenium_grid_scaler.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"errors"
"fmt"
"io"
"math"
"net/http"
"strings"

Expand All @@ -29,55 +28,86 @@ type seleniumGridScaler struct {
type seleniumGridScalerMetadata struct {
triggerIndex int

URL string `keda:"name=url, order=triggerMetadata;authParams"`
URL string `keda:"name=url, order=authParams;triggerMetadata"`
AuthType string `keda:"name=authType, order=authParams;resolvedEnv, optional"`
Username string `keda:"name=username, order=authParams;resolvedEnv, optional"`
Password string `keda:"name=password, order=authParams;resolvedEnv, optional"`
AccessToken string `keda:"name=accessToken, order=authParams;resolvedEnv, optional"`
BrowserName string `keda:"name=browserName, order=triggerMetadata"`
SessionBrowserName string `keda:"name=sessionBrowserName, order=triggerMetadata, optional"`
ActivationThreshold int64 `keda:"name=activationThreshold, order=triggerMetadata, optional"`
BrowserVersion string `keda:"name=browserVersion, order=triggerMetadata, optional, default=latest"`
UnsafeSsl bool `keda:"name=unsafeSsl, order=triggerMetadata, optional, default=false"`
PlatformName string `keda:"name=platformName, order=triggerMetadata, optional, default=linux"`

// auth
Username string `keda:"name=username, order=authParams;resolvedEnv;triggerMetadata, optional"`
Password string `keda:"name=password, order=authParams;resolvedEnv;triggerMetadata, optional"`
NodeMaxSessions int `keda:"name=nodeMaxSessions, order=triggerMetadata, optional, default=1"`

TargetValue int64
}

type seleniumResponse struct {
Data data `json:"data"`
type SeleniumResponse struct {
Data Data `json:"data"`
}

type Data struct {
Grid Grid `json:"grid"`
NodesInfo NodesInfo `json:"nodesInfo"`
SessionsInfo SessionsInfo `json:"sessionsInfo"`
}

type Grid struct {
SessionCount int `json:"sessionCount"`
MaxSession int `json:"maxSession"`
TotalSlots int `json:"totalSlots"`
}

type NodesInfo struct {
Nodes Nodes `json:"nodes"`
}

type data struct {
Grid grid `json:"grid"`
SessionsInfo sessionsInfo `json:"sessionsInfo"`
type SessionsInfo struct {
SessionQueueRequests []string `json:"sessionQueueRequests"`
}

type grid struct {
MaxSession int `json:"maxSession"`
NodeCount int `json:"nodeCount"`
type Nodes []struct {
ID string `json:"id"`
Status string `json:"status"`
SessionCount int `json:"sessionCount"`
MaxSession int `json:"maxSession"`
SlotCount int `json:"slotCount"`
Stereotypes string `json:"stereotypes"`
Sessions Sessions `json:"sessions"`
}

type sessionsInfo struct {
SessionQueueRequests []string `json:"sessionQueueRequests"`
Sessions []seleniumSession `json:"sessions"`
type ReservedNodes struct {
ID string `json:"id"`
MaxSession int `json:"maxSession"`
SlotCount int `json:"slotCount"`
}

type seleniumSession struct {
type Sessions []struct {
ID string `json:"id"`
Capabilities string `json:"capabilities"`
NodeID string `json:"nodeId"`
Slot Slot `json:"slot"`
}

type Slot struct {
ID string `json:"id"`
Stereotype string `json:"stereotype"`
}

type capability struct {
type Capability struct {
BrowserName string `json:"browserName"`
BrowserVersion string `json:"browserVersion"`
PlatformName string `json:"platformName"`
}

type Stereotypes []struct {
Slots int `json:"slots"`
Stereotype Capability `json:"stereotype"`
}

const (
DefaultBrowserVersion string = "latest"
DefaultPlatformName string = "linux"
)

func NewSeleniumGridScaler(config *scalersconfig.ScalerConfig) (Scaler, error) {
Expand Down Expand Up @@ -121,7 +151,7 @@ func parseSeleniumGridScalerMetadata(config *scalersconfig.ScalerConfig) (*selen
return meta, nil
}

// No cleanup required for selenium grid scaler
// No cleanup required for Selenium Grid scaler
func (s *seleniumGridScaler) Close(context.Context) error {
if s.httpClient != nil {
s.httpClient.CloseIdleConnections()
Expand Down Expand Up @@ -156,7 +186,7 @@ func (s *seleniumGridScaler) GetMetricSpecForScaling(context.Context) []v2.Metri

func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.Logger) (int64, error) {
body, err := json.Marshal(map[string]string{
"query": "{ grid { maxSession, nodeCount }, sessionsInfo { sessionQueueRequests, sessions { id, capabilities, nodeId } } }",
"query": "{ grid { sessionCount, maxSession, totalSlots }, nodesInfo { nodes { id, status, sessionCount, maxSession, slotCount, stereotypes, sessions { id, capabilities, slot { id, stereotype } } } }, sessionsInfo { sessionQueueRequests } }",
})

if err != nil {
Expand All @@ -168,8 +198,11 @@ func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.L
return -1, err
}

// Add HTTP Auth
req.SetBasicAuth(s.metadata.Username, s.metadata.Password)
if (s.metadata.AuthType == "" || strings.EqualFold(s.metadata.AuthType, "Basic")) && s.metadata.Username != "" && s.metadata.Password != "" {
req.SetBasicAuth(s.metadata.Username, s.metadata.Password)
} else if !strings.EqualFold(s.metadata.AuthType, "Basic") && s.metadata.AccessToken != "" {
req.Header.Set("Authorization", fmt.Sprintf("%s %s", s.metadata.AuthType, s.metadata.AccessToken))
}

res, err := s.httpClient.Do(req)
if err != nil {
Expand All @@ -186,62 +219,165 @@ func (s *seleniumGridScaler) getSessionsCount(ctx context.Context, logger logr.L
if err != nil {
return -1, err
}
v, err := getCountFromSeleniumResponse(b, s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.SessionBrowserName, s.metadata.PlatformName, logger)
v, err := getCountFromSeleniumResponse(b, s.metadata.BrowserName, s.metadata.BrowserVersion, s.metadata.SessionBrowserName, s.metadata.PlatformName, s.metadata.NodeMaxSessions, logger)
if err != nil {
return -1, err
}
return v, nil
}

func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) (int64, error) {
func countMatchingSlotsStereotypes(stereotypes Stereotypes, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) int {
var matchingSlots int
for _, stereotype := range stereotypes {
if checkCapabilitiesMatch(stereotype.Stereotype, request, browserName, browserVersion, sessionBrowserName, platformName) {
matchingSlots += stereotype.Slots
}
}
return matchingSlots
}

func countMatchingSessions(sessions Sessions, request Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string, logger logr.Logger) int {
var matchingSessions int
for _, session := range sessions {
var capability = Capability{}
if err := json.Unmarshal([]byte(session.Capabilities), &capability); err == nil {
if checkCapabilitiesMatch(capability, request, browserName, browserVersion, sessionBrowserName, platformName) {
matchingSessions++
}
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling session capabilities: %s", err))
}
}
return matchingSessions
}

func checkCapabilitiesMatch(capability Capability, requestCapability Capability, browserName string, browserVersion string, sessionBrowserName string, platformName string) bool {
// Ensure the logic should be aligned with DefaultSlotMatcher in Selenium Grid - SeleniumHQ/selenium/java/src/org/openqa/selenium/grid/data/DefaultSlotMatcher.java
// A browserName matches when one of the following conditions is met:
// 1. `browserName` in capability matches with `browserName` or `sessionBrowserName` in scaler metadata
// 2. `browserName` in request capability is empty or not provided
var browserNameMatches = strings.EqualFold(capability.BrowserName, browserName) || strings.EqualFold(capability.BrowserName, sessionBrowserName) ||
requestCapability.BrowserName == ""
// A browserVersion matches when one of the following conditions is met:
// 1. `browserVersion` in request capability is empty or not provided or `stable`
// 2. `browserVersion` in capability matches with prefix of the scaler metadata `browserVersion`
// 3. `browserVersion` in scaler metadata is `latest`
var browserVersionMatches = requestCapability.BrowserVersion == "" || requestCapability.BrowserVersion == "stable" ||
strings.HasPrefix(capability.BrowserVersion, browserVersion) || browserVersion == DefaultBrowserVersion
// A platformName matches when one of the following conditions is met:
// 1. `platformName` in request capability is empty or not provided
// 2. `platformName` in capability is empty or not provided
// 3. `platformName` in capability matches with the scaler metadata `platformName`
// 4. `platformName` in scaler metadata is empty or not provided
var platformNameMatches = requestCapability.PlatformName == "" || capability.PlatformName == "" ||
strings.EqualFold(capability.PlatformName, platformName) || platformName == ""
return browserNameMatches && browserVersionMatches && platformNameMatches
}

func checkNodeReservedSlots(reservedNodes []ReservedNodes, nodeID string, availableSlots int) int {
for _, reservedNode := range reservedNodes {
if strings.EqualFold(reservedNode.ID, nodeID) {
return reservedNode.SlotCount
}
}
return availableSlots
}

func updateOrAddReservedNode(reservedNodes []ReservedNodes, nodeID string, slotCount int, maxSession int) []ReservedNodes {
for i, reservedNode := range reservedNodes {
if strings.EqualFold(reservedNode.ID, nodeID) {
// Update remaining available slots for the reserved node
reservedNodes[i].SlotCount = slotCount
return reservedNodes
}
}
// Add new reserved node if not found
return append(reservedNodes, ReservedNodes{ID: nodeID, SlotCount: slotCount, MaxSession: maxSession})
}

func getCountFromSeleniumResponse(b []byte, browserName string, browserVersion string, sessionBrowserName string, platformName string, nodeMaxSessions int, logger logr.Logger) (int64, error) {
// The returned count of the number of new Nodes will be scaled up
var count int64
var seleniumResponse = seleniumResponse{}
// Track number of available slots of existing Nodes in the Grid can be reserved for the matched requests
var availableSlots int
// Track number of matched requests in the sessions queue will be served by this scaler
var queueSlots int

var seleniumResponse = SeleniumResponse{}
if err := json.Unmarshal(b, &seleniumResponse); err != nil {
return 0, err
}

var sessionQueueRequests = seleniumResponse.Data.SessionsInfo.SessionQueueRequests
for _, sessionQueueRequest := range sessionQueueRequests {
var capability = capability{}
if err := json.Unmarshal([]byte(sessionQueueRequest), &capability); err == nil {
if capability.BrowserName == browserName {
var platformNameMatches = capability.PlatformName == "" || strings.EqualFold(capability.PlatformName, platformName)
if strings.HasPrefix(capability.BrowserVersion, browserVersion) && platformNameMatches {
count++
} else if len(strings.TrimSpace(capability.BrowserVersion)) == 0 && browserVersion == DefaultBrowserVersion && platformNameMatches {
count++
}
var nodes = seleniumResponse.Data.NodesInfo.Nodes
// Track list of existing Nodes that have available slots for the matched requests
var reservedNodes []ReservedNodes
// Track list of new Nodes will be scaled up with number of available slots following scaler parameter `nodeMaxSessions`
var newRequestNodes []ReservedNodes
for requestIndex, sessionQueueRequest := range sessionQueueRequests {
var isRequestMatched bool
var requestCapability = Capability{}
if err := json.Unmarshal([]byte(sessionQueueRequest), &requestCapability); err == nil {
if checkCapabilitiesMatch(requestCapability, requestCapability, browserName, browserVersion, sessionBrowserName, platformName) {
queueSlots++
isRequestMatched = true
}
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling session queue requests: %s", err))
logger.Error(err, fmt.Sprintf("Error when unmarshaling sessionQueueRequest capability: %s", err))
}
}

var sessions = seleniumResponse.Data.SessionsInfo.Sessions
for _, session := range sessions {
var capability = capability{}
if err := json.Unmarshal([]byte(session.Capabilities), &capability); err == nil {
var platformNameMatches = capability.PlatformName == "" || strings.EqualFold(capability.PlatformName, platformName)
if capability.BrowserName == sessionBrowserName {
if strings.HasPrefix(capability.BrowserVersion, browserVersion) && platformNameMatches {
count++
} else if browserVersion == DefaultBrowserVersion && platformNameMatches {
count++
// Skip the request if the capability does not match the scaler parameters
if !isRequestMatched {
continue
}

var isRequestReserved bool
// Check if the matched request can be assigned to available slots of existing Nodes in the Grid
for _, node := range nodes {
// Check if node is UP and has available slots (maxSession > sessionCount)
if strings.EqualFold(node.Status, "UP") && checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount) > 0 {
var stereotypes = Stereotypes{}
var availableSlotsMatch int
if err := json.Unmarshal([]byte(node.Stereotypes), &stereotypes); err == nil {
// Count available slots that match the request capability and scaler metadata
availableSlotsMatch += countMatchingSlotsStereotypes(stereotypes, requestCapability, browserName, browserVersion, sessionBrowserName, platformName)
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling node stereotypes: %s", err))
}
// Count ongoing sessions that match the request capability and scaler metadata
var currentSessionsMatch = countMatchingSessions(node.Sessions, requestCapability, browserName, browserVersion, sessionBrowserName, platformName, logger)
// Count remaining available slots can be reserved for this request
var availableSlotsCanBeReserved = checkNodeReservedSlots(reservedNodes, node.ID, node.MaxSession-node.SessionCount)
// Reserve one available slot for the request if available slots match is greater than current sessions match
if availableSlotsMatch > currentSessionsMatch {
availableSlots++
reservedNodes = updateOrAddReservedNode(reservedNodes, node.ID, availableSlotsCanBeReserved-1, node.MaxSession)
isRequestReserved = true
break
}
}
} else {
logger.Error(err, fmt.Sprintf("Error when unmarshaling sessions info: %s", err))
}
// Check if the matched request can be assigned to available slots of new Nodes will be scaled up, since the scaler parameter `nodeMaxSessions` can be greater than 1
if !isRequestReserved {
for _, newRequestNode := range newRequestNodes {
if newRequestNode.SlotCount > 0 {
newRequestNodes = updateOrAddReservedNode(newRequestNodes, newRequestNode.ID, newRequestNode.SlotCount-1, nodeMaxSessions)
isRequestReserved = true
break
}
}
}
// Check if a new Node should be scaled up to reserve for the matched request
if !isRequestReserved {
newRequestNodes = updateOrAddReservedNode(newRequestNodes, string(rune(requestIndex)), nodeMaxSessions-1, nodeMaxSessions)
}
}

var gridMaxSession = int64(seleniumResponse.Data.Grid.MaxSession)
var gridNodeCount = int64(seleniumResponse.Data.Grid.NodeCount)

if gridMaxSession > 0 && gridNodeCount > 0 {
// Get count, convert count to next highest int64
var floatCount = float64(count) / (float64(gridMaxSession) / float64(gridNodeCount))
count = int64(math.Ceil(floatCount))
if queueSlots > availableSlots {
count = int64(len(newRequestNodes))
} else {
count = 0
}

return count, nil
}
Loading
Loading