Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GODRIVER-2689 (POC) Simplify the "readpref" API #1733

Draft
wants to merge 7 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions event/description.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (

"go.mongodb.org/mongo-driver/v2/bson"
"go.mongodb.org/mongo-driver/v2/mongo/address"
"go.mongodb.org/mongo-driver/v2/tag"
"go.mongodb.org/mongo-driver/v2/mongo/readpref"
)

// ServerDescription contains information about a node in a cluster. This is
Expand Down Expand Up @@ -43,7 +43,7 @@ type ServerDescription struct {
SessionTimeoutMinutes *int64
SetName string
SetVersion uint32
Tags tag.Set
Tags readpref.TagSet
TopologyVersionProcessID bson.ObjectID
TopologyVersionCounter int64
}
Expand Down
4 changes: 2 additions & 2 deletions internal/driverutil/description.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"go.mongodb.org/mongo-driver/v2/internal/handshake"
"go.mongodb.org/mongo-driver/v2/internal/ptrutil"
"go.mongodb.org/mongo-driver/v2/mongo/address"
"go.mongodb.org/mongo-driver/v2/tag"
"go.mongodb.org/mongo-driver/v2/mongo/readpref"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
)

Expand Down Expand Up @@ -421,7 +421,7 @@ func NewServerDescription(addr address.Address, response bson.Raw) description.S
desc.LastError = err
return desc
}
desc.Tags = tag.NewTagSetFromMap(m)
desc.Tags = readpref.NewTagSetFromMap(m)
case "topologyVersion":
doc, ok := element.Value().DocumentOK()
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion internal/integration/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestDatabase(t *testing.T) {
// layer, which should add a top-level $readPreference field to the command.

runCmdOpts := options.RunCmd().
SetReadPreference(readpref.SecondaryPreferred())
SetReadPreference(&readpref.ReadPref{Mode: readpref.SecondaryPreferredMode})
err := mt.DB.RunCommand(context.Background(), bson.D{{handshake.LegacyHello, 1}}, runCmdOpts).Err()
assert.Nil(mt, err, "RunCommand error: %v", err)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ func runSeedlistDiscoveryPingTest(mt *mtest.T, clientOpts *options.ClientOptions
defer cancel()

// Ping the server.
err = client.Ping(pingCtx, readpref.Nearest())
err = client.Ping(pingCtx, &readpref.ReadPref{Mode: readpref.NearestMode})
assert.Nil(mt, err, "Ping error: %v", err)
}

Expand Down
8 changes: 4 additions & 4 deletions internal/integration/json_helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,13 +420,13 @@ func readPrefFromString(s string) *readpref.ReadPref {
case "primary":
return readpref.Primary()
case "primarypreferred":
return readpref.PrimaryPreferred()
return &readpref.ReadPref{Mode: readpref.PrimaryPreferredMode}
case "secondary":
return readpref.Secondary()
return &readpref.ReadPref{Mode: readpref.SecondaryMode}
case "secondarypreferred":
return readpref.SecondaryPreferred()
return &readpref.ReadPref{Mode: readpref.SecondaryPreferredMode}
case "nearest":
return readpref.Nearest()
return &readpref.ReadPref{Mode: readpref.NearestMode}
}
return readpref.Primary()
}
Expand Down
2 changes: 1 addition & 1 deletion internal/integration/mtest/mongotest.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ var (
// PrimaryRp is the primary read preference.
PrimaryRp = readpref.Primary()
// SecondaryRp is the secondary read preference.
SecondaryRp = readpref.Secondary()
SecondaryRp = &readpref.ReadPref{Mode: readpref.SecondaryMode}
// LocalRc is the local read concern
LocalRc = readconcern.Local()
// MajorityRc is the majority read concern
Expand Down
19 changes: 10 additions & 9 deletions internal/integration/unified/common_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.mongodb.org/mongo-driver/v2/mongo/readconcern"
"go.mongodb.org/mongo-driver/v2/mongo/readpref"
"go.mongodb.org/mongo-driver/v2/mongo/writeconcern"
"go.mongodb.org/mongo-driver/v2/tag"
)

// This file defines helper types to convert BSON documents to ReadConcern, WriteConcern, and ReadPref objects.
Expand Down Expand Up @@ -70,32 +69,34 @@ func (rp *ReadPreference) ToReadPrefOption() (*readpref.ReadPref, error) {
return nil, fmt.Errorf("invalid read preference mode %q", rp.Mode)
}

var rpOptions []readpref.Option
rpOpts := readpref.Options()

if rp.TagSets != nil {
// Each item in the TagSets slice is a document that represents one set.
sets := make([]tag.Set, 0, len(rp.TagSets))
sets := make([]readpref.TagSet, 0, len(rp.TagSets))
for _, rawSet := range rp.TagSets {
parsed := make(tag.Set, 0, len(rawSet))
parsed := make(readpref.TagSet, 0, len(rawSet))
for k, v := range rawSet {
parsed = append(parsed, tag.Tag{Name: k, Value: v})
parsed = append(parsed, readpref.Tag{Name: k, Value: v})
}
sets = append(sets, parsed)
}

rpOptions = append(rpOptions, readpref.WithTagSets(sets...))
rpOpts.SetTagSets(sets)
}
if rp.MaxStalenessSeconds != nil {
maxStaleness := time.Duration(*rp.MaxStalenessSeconds) * time.Second
rpOptions = append(rpOptions, readpref.WithMaxStaleness(maxStaleness))
rpOpts.SetMaxStaleness(maxStaleness)
}
if rp.Hedge != nil {
if len(rp.Hedge) > 1 {
return nil, fmt.Errorf("invalid read preference hedge document: length cannot be greater than 1")
}
if enabled, ok := rp.Hedge["enabled"]; ok {
rpOptions = append(rpOptions, readpref.WithHedgeEnabled(enabled.(bool)))
hedgeEnabled := enabled.(bool)
rpOpts.SetHedgeEnabled(hedgeEnabled)
}
}

return readpref.New(mode, rpOptions...)
return readpref.New(mode, rpOpts)
}
21 changes: 10 additions & 11 deletions internal/serverselector/server_selector.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"time"

"go.mongodb.org/mongo-driver/v2/mongo/readpref"
"go.mongodb.org/mongo-driver/v2/tag"
"go.mongodb.org/mongo-driver/v2/x/mongo/driver/description"
)

Expand Down Expand Up @@ -190,12 +189,12 @@ func (ssf Func) SelectServer(
}

func verifyMaxStaleness(rp *readpref.ReadPref, topo description.Topology) error {
maxStaleness, set := rp.MaxStaleness()
if !set {
maxStaleness := rp.MaxStaleness()
if maxStaleness == nil {
return nil
}

if maxStaleness < 90*time.Second {
if *maxStaleness < 90*time.Second {
return fmt.Errorf("max staleness (%s) must be greater than or equal to 90s", maxStaleness)
}

Expand All @@ -208,7 +207,7 @@ func verifyMaxStaleness(rp *readpref.ReadPref, topo description.Topology) error
s := topo.Servers[0]
idleWritePeriod := 10 * time.Second

if maxStaleness < s.HeartbeatInterval+idleWritePeriod {
if *maxStaleness < s.HeartbeatInterval+idleWritePeriod {
return fmt.Errorf(
"max staleness (%s) must be greater than or equal to the heartbeat interval (%s) plus idle write period (%s)",
maxStaleness, s.HeartbeatInterval, idleWritePeriod,
Expand Down Expand Up @@ -242,7 +241,7 @@ func selectSecondaries(rp *readpref.ReadPref, candidates []description.Server) [
if len(secondaries) == 0 {
return secondaries
}
if maxStaleness, set := rp.MaxStaleness(); set {
if maxStaleness := rp.MaxStaleness(); maxStaleness != nil {
primaries := selectByKind(candidates, description.ServerKindRSPrimary)
if len(primaries) == 0 {
baseTime := secondaries[0].LastWriteTime
Expand All @@ -255,7 +254,7 @@ func selectSecondaries(rp *readpref.ReadPref, candidates []description.Server) [
var selected []description.Server
for _, secondary := range secondaries {
estimatedStaleness := baseTime.Sub(secondary.LastWriteTime) + secondary.HeartbeatInterval
if estimatedStaleness <= maxStaleness {
if estimatedStaleness <= *maxStaleness {
selected = append(selected, secondary)
}
}
Expand All @@ -269,7 +268,7 @@ func selectSecondaries(rp *readpref.ReadPref, candidates []description.Server) [
for _, secondary := range secondaries {
estimatedStaleness := secondary.LastUpdateTime.Sub(secondary.LastWriteTime) -
primary.LastUpdateTime.Sub(primary.LastWriteTime) + secondary.HeartbeatInterval
if estimatedStaleness <= maxStaleness {
if estimatedStaleness <= *maxStaleness {
selected = append(selected, secondary)
}
}
Expand All @@ -279,7 +278,7 @@ func selectSecondaries(rp *readpref.ReadPref, candidates []description.Server) [
return secondaries
}

func selectByTagSet(candidates []description.Server, tagSets []tag.Set) []description.Server {
func selectByTagSet(candidates []description.Server, tagSets []readpref.TagSet) []description.Server {
if len(tagSets) == 0 {
return candidates
}
Expand Down Expand Up @@ -327,7 +326,7 @@ func selectForReplicaSet(
}
}

switch rp.Mode() {
switch rp.Mode {
case readpref.PrimaryMode:
return selectByKind(candidates, description.ServerKindRSPrimary), nil
case readpref.PrimaryPreferredMode:
Expand Down Expand Up @@ -355,5 +354,5 @@ func selectForReplicaSet(
return selectByTagSet(selected, rp.TagSets()), nil
}

return nil, fmt.Errorf("unsupported mode: %d", rp.Mode())
return nil, fmt.Errorf("unsupported mode: %d", rp.Mode)
}
Loading
Loading