Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support MongoDB 4.0 startAtOperationTime option #306

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions changestreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ type ChangeStreamOptions struct {

// Collation specifies the way the server should collate returned data.
//TODO Collation *Collation

// Specify StartAtOperationTime to open the cursor at a particular point in time.
// If the specified starting point is in the past, it must be in the time range of the oplog.
StartAtOperationTime bson.MongoTimestamp
}

var errMissingResumeToken = errors.New("resume token missing from result")
Expand Down Expand Up @@ -315,9 +319,14 @@ func constructChangeStreamPipeline(pipeline interface{},
if options.FullDocument != "" {
changeStreamStageOptions["fullDocument"] = options.FullDocument
}
// resumeAfter and startAtOperationTime are mutually exclusive but
// according to the spec validation should not be done at the driver level
if options.ResumeAfter != nil {
changeStreamStageOptions["resumeAfter"] = options.ResumeAfter
}
if options.StartAtOperationTime > 0 {
changeStreamStageOptions["startAtOperationTime"] = options.StartAtOperationTime
}
if domain == changeDomainCluster {
changeStreamStageOptions["allChangesForCluster"] = true
}
Expand Down Expand Up @@ -363,6 +372,7 @@ func (changeStream *ChangeStream) resume() error {
opts := changeStream.options
if changeStream.resumeToken != nil {
opts.ResumeAfter = changeStream.resumeToken
opts.StartAtOperationTime = bson.MongoTimestamp(0)
}
// make a new pipeline containing the resume token.
changeStreamPipeline := constructChangeStreamPipeline(changeStream.pipeline, opts, changeStream.domainType)
Expand Down
64 changes: 64 additions & 0 deletions changestreams_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type changeEvent struct {
Ns evNamespace `bson:"ns"`
DocumentKey M `bson:"documentKey"`
UpdateDescription *updateDesc `bson:"updateDescription,omitempty"`
ClusterTime bson.MongoTimestamp `bson:"clusterTime,omitempty"`
}

type watchable interface {
Expand Down Expand Up @@ -58,6 +59,69 @@ func (s *S) TestStreamsWatch(c *C) {
}
}

func (s *S) TestStreamsStartAtOperationTime(c *C) {
if !s.versionAtLeast(4, 0) {
c.Skip("StartAtOperationTime only works on 4.0+")
}
session, err := mgo.Dial("localhost:40011")
c.Assert(err, IsNil)
defer session.Close()
coll := session.DB("mydb").C("mycoll")

var id bson.ObjectId
var ts bson.MongoTimestamp

// checkEv will check the event is correct
var checkEv = func(ev changeEvent) {
oid := ev.DocumentKey["_id"].(bson.ObjectId)
ots := ev.ClusterTime
c.Assert(oid, Equals, id)
c.Assert(ots > 0, Equals, true)
c.Assert(ots, Equals, ts)
}
var testF = func(w watchable) {
var hasEvent bool
pipeline := []bson.M{}
changeStream, e := w.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500})
c.Assert(e, IsNil)

//insert a new document while the change stream is listening
id = bson.NewObjectId()
e = coll.Insert(M{"_id": id, "a": 1})
c.Assert(e, IsNil)

//read the insert event
changeEv := changeEvent{}
hasEvent = changeStream.Next(&changeEv)
c.Assert(hasEvent, Equals, true)

//capture timestamp of insert event
ts = changeEv.ClusterTime

//verify the insert event
checkEv(changeEv)
e = changeStream.Close()
c.Assert(e, IsNil)

//start another change stream starting at the insert event
changeStream, e = w.Watch(pipeline, mgo.ChangeStreamOptions{StartAtOperationTime: ts, MaxAwaitTimeMS: 1500})
changeEv = changeEvent{}
hasEvent = changeStream.Next(&changeEv)
c.Assert(hasEvent, Equals, true)

//check that we restarted at the insert event
checkEv(changeEv)
e = changeStream.Close()
c.Assert(e, IsNil)
}
//collection level
testF(coll)
//db level
testF(session.DB("mydb"))
//cluster level
testF(session)
}

func (s *S) TestStreamsInsert(c *C) {
if !s.versionAtLeast(3, 6) {
c.Skip("ChangeStreams only work on 3.6+")
Expand Down