Skip to content

Commit

Permalink
Add support to MaxStalenessSeconds in ReadPreference
Browse files Browse the repository at this point in the history
  • Loading branch information
wpjunior committed Oct 7, 2018
1 parent 1a9c7a9 commit 39ae9df
Show file tree
Hide file tree
Showing 3 changed files with 63 additions and 6 deletions.
46 changes: 46 additions & 0 deletions session.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,11 @@ const (
// false: Initiate the connection without TLS/SSL.
// The default value is false.
//
// maxStalenessSeconds=<seconds>
//
// specify a maximum replication lag, or “staleness” in seconds, for reads from secondaries, minimum value allowed is 90.
// Works on MongoDB 3.4+
//
// Relevant documentation:
//
// http://docs.mongodb.org/manual/reference/connection-string/
Expand Down Expand Up @@ -353,6 +358,7 @@ func ParseURL(url string) (*DialInfo, error) {
var readPreferenceTagSets []bson.D
minPoolSize := 0
maxIdleTimeMS := 0
maxStalenessSeconds := 0
safe := Safe{}
for _, opt := range uinfo.options {
switch opt.key {
Expand Down Expand Up @@ -390,6 +396,17 @@ func ParseURL(url string) (*DialInfo, error) {
if err != nil {
return nil, errors.New("bad value for maxPoolSize: " + opt.value)
}
case "maxStalenessSeconds":
maxStalenessSeconds, err = strconv.Atoi(opt.value)

if err != nil {
return nil, errors.New("bad value for maxStalenessSeconds: " + opt.value)
}

if maxStalenessSeconds > 0 && maxStalenessSeconds < 90 {
return nil, errors.New("maxStalenessSeconds too low " + opt.value + ", must be >= 90 seconds")
}

case "appName":
if len(opt.value) > 128 {
return nil, errors.New("appName too long, must be < 128 bytes: " + opt.value)
Expand Down Expand Up @@ -455,6 +472,10 @@ func ParseURL(url string) (*DialInfo, error) {
return nil, errors.New("readPreferenceTagSet may not be specified when readPreference is primary")
}

if readPreferenceMode == Primary && maxStalenessSeconds > 0 {
return nil, errors.New("maxStalenessSeconds may not be specified when readPreference is primary")
}

info := DialInfo{
Addrs: uinfo.addrs,
Direct: direct,
Expand All @@ -469,6 +490,8 @@ func ParseURL(url string) (*DialInfo, error) {
ReadPreference: &ReadPreference{
Mode: readPreferenceMode,
TagSets: readPreferenceTagSets,

MaxStalenessSeconds: maxStalenessSeconds,
},
Safe: safe,
ReplicaSetName: setName,
Expand Down Expand Up @@ -607,6 +630,8 @@ func (i *DialInfo) Copy() *DialInfo {
if i.ReadPreference != nil {
readPreference = &ReadPreference{
Mode: i.ReadPreference.Mode,

MaxStalenessSeconds: i.ReadPreference.MaxStalenessSeconds,
}
readPreference.TagSets = make([]bson.D, len(i.ReadPreference.TagSets))
copy(readPreference.TagSets, i.ReadPreference.TagSets)
Expand Down Expand Up @@ -679,6 +704,9 @@ type ReadPreference struct {
// Mode determines the consistency of results. See Session.SetMode.
Mode Mode

// MaxStalenessSeconds specify a maximum replication lag, or “staleness” in seconds, for reads from secondaries.
MaxStalenessSeconds int

// TagSets indicates which servers are allowed to be used. See Session.SelectServers.
TagSets []bson.D
}
Expand Down Expand Up @@ -768,6 +796,7 @@ func DialWithInfo(dialInfo *DialInfo) (*Session, error) {
if info.ReadPreference != nil {
session.SelectServers(info.ReadPreference.TagSets...)
session.SetMode(info.ReadPreference.Mode, true)
session.SetMaxStalenessSeconds(info.ReadPreference.MaxStalenessSeconds)
} else {
session.SetMode(Strong, true)
}
Expand Down Expand Up @@ -2190,6 +2219,23 @@ func (s *Session) SetPoolTimeout(timeout time.Duration) {
s.m.Unlock()
}

// SetMaxStalenessSeconds set the maximum of seconds of replication lag from secondaries
// Works on MongoDB 3.4+
//
// Relevant documentation:
//
// https://docs.mongodb.com/manual/core/read-preference/#maxstalenessseconds
//
func (s *Session) SetMaxStalenessSeconds(seconds int) error {
s.m.Lock()
defer s.m.Unlock()
if seconds > 0 && seconds < 90 {
return errors.New("SetMaxStalenessSeconds: minimum of seconds is 90")
}
s.queryConfig.op.maxStalenessSeconds = seconds
return nil
}

// SetBypassValidation sets whether the server should bypass the registered
// validation expressions executed when documents are inserted or modified,
// in the interest of preserving invariants in the collection being modified.
Expand Down
15 changes: 10 additions & 5 deletions session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,28 +167,33 @@ func (s *S) TestURLReadPreference(c *C) {
type test struct {
url string
mode mgo.Mode

maxStalenessSeconds int
}

tests := []test{
{"localhost:40001?readPreference=primary", mgo.Primary},
{"localhost:40001?readPreference=primaryPreferred", mgo.PrimaryPreferred},
{"localhost:40001?readPreference=secondary", mgo.Secondary},
{"localhost:40001?readPreference=secondaryPreferred", mgo.SecondaryPreferred},
{"localhost:40001?readPreference=nearest", mgo.Nearest},
{"localhost:40001?readPreference=primary", mgo.Primary, 0},
{"localhost:40001?readPreference=primaryPreferred", mgo.PrimaryPreferred, 0},
{"localhost:40001?readPreference=secondary", mgo.Secondary, 0},
{"localhost:40001?readPreference=secondaryPreferred", mgo.SecondaryPreferred, 0},
{"localhost:40001?readPreference=secondary&maxStalenessSeconds=110", mgo.Secondary, 110},
{"localhost:40001?readPreference=nearest", mgo.Nearest, 0},
}

for _, test := range tests {
info, err := mgo.ParseURL(test.url)
c.Assert(err, IsNil)
c.Assert(info.ReadPreference, NotNil)
c.Assert(info.ReadPreference.Mode, Equals, test.mode)
c.Assert(info.ReadPreference.MaxStalenessSeconds, Equals, test.maxStalenessSeconds)
}
}

func (s *S) TestURLInvalidReadPreference(c *C) {
urls := []string{
"localhost:40001?readPreference=foo",
"localhost:40001?readPreference=primarypreferred",
"localhost:40001?readPreference=primary&maxStalenessSeconds=90",
}
for _, url := range urls {
_, err := mgo.ParseURL(url)
Expand Down
8 changes: 7 additions & 1 deletion socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ type queryOp struct {
hasOptions bool
flags queryOpFlags
readConcern string

maxStalenessSeconds int
}

type queryWrapper struct {
Expand Down Expand Up @@ -120,11 +122,15 @@ func (op *queryOp) finalQuery(socket *mongoSocket) interface{} {
panic(fmt.Sprintf("unsupported read mode: %d", op.mode))
}
op.hasOptions = true
op.options.ReadPreference = make(bson.D, 0, 2)
op.options.ReadPreference = make(bson.D, 0, 3)
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "mode", Value: modeName})
if len(op.serverTags) > 0 {
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "tags", Value: op.serverTags})
}

if op.maxStalenessSeconds > 0 {
op.options.ReadPreference = append(op.options.ReadPreference, bson.DocElem{Name: "maxStalenessSeconds", Value: op.maxStalenessSeconds})
}
}
if op.hasOptions {
if op.query == nil {
Expand Down

0 comments on commit 39ae9df

Please sign in to comment.