From baa28fcb8e7d5dfab92026c0920cb6c9ae72faa2 Mon Sep 17 00:00:00 2001 From: Dom Date: Tue, 20 Feb 2018 10:02:56 +0000 Subject: [PATCH] Merge Development (#111) * Brings in a patch on having flusher not suppress errors. (#81) https://github.com/go-mgo/mgo/pull/360 * Fallback to JSON tags when BSON tag isn't present (#91) * Fallback to JSON tags when BSON tag isn't present Cleanup. * Add test to demonstrate tagging fallback. - Test coverage for tagging test. * socket: only send client metadata once per socket Periodic cluster synchronisation calls isMaster() which currently resends the "client" metadata every call - the spec specifies: isMaster commands issued after the initial connection handshake MUST NOT contain handshake arguments https://github.com/mongodb/specifications/blob/master/source/mongodb-handshake/handshake.rst#connection-handshake This hotfix prevents subsequent isMaster calls from sending the client metadata again - fixes #101 and fixes #103. Thanks to @changwoo-nam @qhenkart @canthefason @jyoon17 for spotting the initial issue, opening tickets, and having the problem debugged with a PoC fix before I even woke up. * Cluster abended test 254 (#100) * Add a test that mongo Server gets their abended reset as necessary. See https://github.com/go-mgo/mgo/issues/254 and https://github.com/go-mgo/mgo/pull/255/files * Include the patch from Issue 255. This brings in a test which fails without the patch, and passes with the patch. Still to be tested, manual tcpkill of a socket. 
* changeStream support (#97) Add $changeStream support * readme: credit @peterdeka and @steve-gray (#110) --- .gitignore | 2 + README.md | 4 + bson/bson.go | 16 +- bson/compatability_test.go | 54 +++++ bson/compatibility.go | 16 ++ changestreams.go | 357 ++++++++++++++++++++++++++++ changestreams_test.go | 464 +++++++++++++++++++++++++++++++++++++ cluster.go | 1 + export_test.go | 13 ++ harness/daemons/.env | 11 +- harness/setup.sh | 2 +- server_test.go | 64 +++++ session.go | 64 ++++- txn/flusher.go | 2 +- 14 files changed, 1055 insertions(+), 15 deletions(-) create mode 100644 .gitignore create mode 100644 bson/compatability_test.go create mode 100644 bson/compatibility.go create mode 100644 changestreams.go create mode 100644 changestreams_test.go create mode 100644 server_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 000000000..9f4fa6d20 --- /dev/null +++ b/.gitignore @@ -0,0 +1,2 @@ +_harness + diff --git a/README.md b/README.md index c605e6bb0..cd5edff49 100644 --- a/README.md +++ b/README.md @@ -34,6 +34,8 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * Minimise socket connection timeouts due to excessive locking ([details](https://github.com/globalsign/mgo/pull/52)) * Natively support X509 client authentication ([details](https://github.com/globalsign/mgo/pull/55)) * Gracefully recover from a temporarily unreachable server ([details](https://github.com/globalsign/mgo/pull/69)) +* Use JSON tags when no explicit BSON tags are set ([details](https://github.com/globalsign/mgo/pull/91)) +* Support [$changeStream](https://docs.mongodb.com/manual/changeStreams/) tailing on 3.6+ ([details](https://github.com/globalsign/mgo/pull/97)) --- @@ -51,6 +53,8 @@ Further PR's (with tests) are welcome, but please maintain backwards compatibili * @jameinel * @gazoon * @mapete94 +* @peterdeka * @Reenjii * @smoya +* @steve-gray * @wgallagher \ No newline at end of file diff --git a/bson/bson.go b/bson/bson.go index 
d960f7a37..31beab191 100644 --- a/bson/bson.go +++ b/bson/bson.go @@ -698,9 +698,21 @@ func getStructInfo(st reflect.Type) (*structInfo, error) { info := fieldInfo{Num: i} tag := field.Tag.Get("bson") - if tag == "" && strings.Index(string(field.Tag), ":") < 0 { - tag = string(field.Tag) + + // Fall-back to JSON struct tag, if feature flag is set. + if tag == "" && useJSONTagFallback { + tag = field.Tag.Get("json") } + + // If there's no bson/json tag available. + if tag == "" { + // If there's no tag, and also no tag: value splits (i.e. no colon) + // then assume the entire tag is the value + if strings.Index(string(field.Tag), ":") < 0 { + tag = string(field.Tag) + } + } + if tag == "-" { continue } diff --git a/bson/compatability_test.go b/bson/compatability_test.go new file mode 100644 index 000000000..743a00e8a --- /dev/null +++ b/bson/compatability_test.go @@ -0,0 +1,54 @@ +package bson_test + +import ( + "github.com/globalsign/mgo/bson" + . "gopkg.in/check.v1" +) + +type mixedTagging struct { + First string + Second string `bson:"second_field"` + Third string `json:"third_field"` + Fourth string `bson:"fourth_field" json:"alternate"` +} + +// TestTaggingFallback checks that tagging fallback can be used/works as expected. +func (s *S) TestTaggingFallback(c *C) { + initial := &mixedTagging{ + First: "One", + Second: "Two", + Third: "Three", + Fourth: "Four", + } + + // Take only testing.T, leave only footprints. + initialState := bson.JSONTagFallbackState() + defer bson.SetJSONTagFallback(initialState) + + // Marshal with the new mode applied. + bson.SetJSONTagFallback(true) + bsonState, errBSON := bson.Marshal(initial) + c.Assert(errBSON, IsNil) + + // Unmarshal into a generic map so that we can pick up the actual field names + // selected. 
+ target := make(map[string]string) + errUnmarshal := bson.Unmarshal(bsonState, target) + c.Assert(errUnmarshal, IsNil) + + // No tag, so standard naming + _, firstExists := target["first"] + c.Assert(firstExists, Equals, true) + + // Just a BSON tag + _, secondExists := target["second_field"] + c.Assert(secondExists, Equals, true) + + // Just a JSON tag + _, thirdExists := target["third_field"] + c.Assert(thirdExists, Equals, true) + + // Should marshal 4th as fourth_field (since we have both tags) + _, fourthExists := target["fourth_field"] + c.Assert(fourthExists, Equals, true) +} diff --git a/bson/compatibility.go b/bson/compatibility.go new file mode 100644 index 000000000..6afecf53c --- /dev/null +++ b/bson/compatibility.go @@ -0,0 +1,16 @@ +package bson + +// Current state of the JSON tag fallback option. +var useJSONTagFallback = false + +// SetJSONTagFallback enables or disables the JSON-tag fallback for structure tagging. When this is enabled, structures +// without BSON tags on a field will fall-back to using the JSON tag (if present). +func SetJSONTagFallback(state bool) { + useJSONTagFallback = state +} + +// JSONTagFallbackState returns the current status of the JSON tag fallback compatibility option. See SetJSONTagFallback +// for more information. 
+func JSONTagFallbackState() bool { + return useJSONTagFallback +} diff --git a/changestreams.go b/changestreams.go new file mode 100644 index 000000000..5c2279c66 --- /dev/null +++ b/changestreams.go @@ -0,0 +1,357 @@ +package mgo + +import ( + "errors" + "fmt" + "reflect" + "sync" + "time" + + "github.com/globalsign/mgo/bson" +) + +type FullDocument string + +const ( + Default = "default" + UpdateLookup = "updateLookup" +) + +type ChangeStream struct { + iter *Iter + isClosed bool + options ChangeStreamOptions + pipeline interface{} + resumeToken *bson.Raw + collection *Collection + readPreference *ReadPreference + err error + m sync.Mutex + sessionCopied bool +} + +type ChangeStreamOptions struct { + + // FullDocument controls the amount of data that the server will return when + // returning a changes document. + FullDocument FullDocument + + // ResumeAfter specifies the logical starting point for the new change stream. + ResumeAfter *bson.Raw + + // MaxAwaitTimeMS specifies the maximum amount of time for the server to wait + // on new documents to satisfy a change stream query. + MaxAwaitTimeMS time.Duration + + // BatchSize specifies the number of documents to return per batch. + BatchSize int + + // Collation specifies the way the server should collate returned data. + //TODO Collation *Collation +} + +var errMissingResumeToken = errors.New("resume token missing from result") + +// Watch constructs a new ChangeStream capable of receiving continuing data +// from the database. +func (coll *Collection) Watch(pipeline interface{}, + options ChangeStreamOptions) (*ChangeStream, error) { + + if pipeline == nil { + pipeline = []bson.M{} + } + + csPipe := constructChangeStreamPipeline(pipeline, options) + pipe := coll.Pipe(&csPipe) + if options.MaxAwaitTimeMS > 0 { + pipe.SetMaxTime(options.MaxAwaitTimeMS) + } + if options.BatchSize > 0 { + pipe.Batch(options.BatchSize) + } + pIter := pipe.Iter() + + // check that there was no issue creating the iterator. 
+ // this will fail immediately with an error from the server if running against + // a standalone. + if err := pIter.Err(); err != nil { + return nil, err + } + + pIter.isChangeStream = true + return &ChangeStream{ + iter: pIter, + collection: coll, + resumeToken: nil, + options: options, + pipeline: pipeline, + }, nil +} + +// Next retrieves the next document from the change stream, blocking if necessary. +// Next returns true if a document was successfully unmarshalled into result, +// and false if an error occurred. When Next returns false, the Err method should +// be called to check what error occurred during iteration. If there were no events +// available (ErrNotFound), the Err method returns nil so the user can retry the invocation. +// +// For example: +// +// pipeline := []bson.M{} +// +// changeStream := collection.Watch(pipeline, ChangeStreamOptions{}) +// for changeStream.Next(&changeDoc) { +// fmt.Printf("Change: %v\n", changeDoc) +// } +// +// if err := changeStream.Close(); err != nil { +// return err +// } +// +// If the pipeline used removes the _id field from the result, Next will error +// because the _id field is needed to resume iteration when an error occurs. +// +func (changeStream *ChangeStream) Next(result interface{}) bool { + // the err field is being constantly overwritten and we don't want the user to + // attempt to read it at this point so we lock. + changeStream.m.Lock() + + defer changeStream.m.Unlock() + + // if we are in a state of error, then don't continue. + if changeStream.err != nil { + return false + } + + if changeStream.isClosed { + changeStream.err = fmt.Errorf("illegal use of a closed ChangeStream") + return false + } + + var err error + + // attempt to fetch the change stream result. 
+ err = changeStream.fetchResultSet(result) + if err == nil { + return true + } + + // if we get no results we return false with no errors so the user can call Next + // again, resuming is not needed as the iterator is simply timed out as no events happened. + // The user will call Timeout in order to understand if this was the case. + if err == ErrNotFound { + return false + } + + // check if the error is resumable + if !isResumableError(err) { + // error is not resumable, give up and return it to the user. + changeStream.err = err + return false + } + + // try to resume. + err = changeStream.resume() + if err != nil { + // we've not been able to successfully resume and should only try once, + // so we give up. + changeStream.err = err + return false + } + + // we've successfully resumed the changestream. + // try to fetch the next result. + err = changeStream.fetchResultSet(result) + if err != nil { + changeStream.err = err + return false + } + + return true +} + +// Err returns nil if no errors happened during iteration, or the actual +// error otherwise. +func (changeStream *ChangeStream) Err() error { + changeStream.m.Lock() + defer changeStream.m.Unlock() + return changeStream.err +} + +// Close kills the server cursor used by the iterator, if any, and returns +// nil if no errors happened during iteration, or the actual error otherwise. +func (changeStream *ChangeStream) Close() error { + changeStream.m.Lock() + defer changeStream.m.Unlock() + changeStream.isClosed = true + err := changeStream.iter.Close() + if err != nil { + changeStream.err = err + } + if changeStream.sessionCopied { + changeStream.iter.session.Close() + changeStream.sessionCopied = false + } + return err +} + +// ResumeToken returns a copy of the current resume token held by the change stream. +// This token should be treated as an opaque token that can be provided to instantiate +// a new change stream. 
+func (changeStream *ChangeStream) ResumeToken() *bson.Raw { + changeStream.m.Lock() + defer changeStream.m.Unlock() + if changeStream.resumeToken == nil { + return nil + } + var tokenCopy = *changeStream.resumeToken + return &tokenCopy +} + +// Timeout returns true if the last call of Next returned false because of an iterator timeout. +func (changeStream *ChangeStream) Timeout() bool { + return changeStream.iter.Timeout() +} + +func constructChangeStreamPipeline(pipeline interface{}, + options ChangeStreamOptions) interface{} { + pipelinev := reflect.ValueOf(pipeline) + + // ensure that the pipeline passed in is a slice. + if pipelinev.Kind() != reflect.Slice { + panic("pipeline argument must be a slice") + } + + // construct the options to be used by the change notification + // pipeline stage. + changeStreamStageOptions := bson.M{} + + if options.FullDocument != "" { + changeStreamStageOptions["fullDocument"] = options.FullDocument + } + if options.ResumeAfter != nil { + changeStreamStageOptions["resumeAfter"] = options.ResumeAfter + } + + changeStreamStage := bson.M{"$changeStream": changeStreamStageOptions} + + pipeOfInterfaces := make([]interface{}, pipelinev.Len()+1) + + // insert the change notification pipeline stage at the beginning of the + // aggregation. + pipeOfInterfaces[0] = changeStreamStage + + // convert the passed in slice to a slice of interfaces. + for i := 0; i < pipelinev.Len(); i++ { + pipeOfInterfaces[1+i] = pipelinev.Index(i).Addr().Interface() + } + var pipelineAsInterface interface{} = pipeOfInterfaces + return pipelineAsInterface +} + +func (changeStream *ChangeStream) resume() error { + // copy the information for the new socket. + + // Thanks to Copy() future uses will acquire a new socket against the newly selected DB. + newSession := changeStream.iter.session.Copy() + + // fetch the cursor from the iterator and use it to run a killCursors + // on the connection. 
+ cursorId := changeStream.iter.op.cursorId + err := runKillCursorsOnSession(newSession, cursorId) + if err != nil { + return err + } + + // change out the old connection to the database with the new connection. + if changeStream.sessionCopied { + changeStream.collection.Database.Session.Close() + } + changeStream.collection.Database.Session = newSession + changeStream.sessionCopied = true + + opts := changeStream.options + if changeStream.resumeToken != nil { + opts.ResumeAfter = changeStream.resumeToken + } + // make a new pipeline containing the resume token. + changeStreamPipeline := constructChangeStreamPipeline(changeStream.pipeline, opts) + + // generate the new iterator with the new connection. + newPipe := changeStream.collection.Pipe(changeStreamPipeline) + changeStream.iter = newPipe.Iter() + if err := changeStream.iter.Err(); err != nil { + return err + } + changeStream.iter.isChangeStream = true + return nil +} + +// fetchResumeToken unmarshals the _id field from the document, returning an error +// if it is unable to. +func (changeStream *ChangeStream) fetchResumeToken(rawResult *bson.Raw) error { + changeStreamResult := struct { + ResumeToken *bson.Raw `bson:"_id,omitempty"` + }{} + + err := rawResult.Unmarshal(&changeStreamResult) + if err != nil { + return err + } + + if changeStreamResult.ResumeToken == nil { + return errMissingResumeToken + } + + changeStream.resumeToken = changeStreamResult.ResumeToken + return nil +} + +func (changeStream *ChangeStream) fetchResultSet(result interface{}) error { + rawResult := bson.Raw{} + + // fetch the next set of documents from the cursor. + gotNext := changeStream.iter.Next(&rawResult) + err := changeStream.iter.Err() + if err != nil { + return err + } + + if !gotNext && err == nil { + // If the iter.Err() method returns nil despite us not getting a next batch, + // it is because iter.Err() silences this case. 
+ return ErrNotFound + } + + // grab the resumeToken from the results + if err := changeStream.fetchResumeToken(&rawResult); err != nil { + return err + } + + // put the raw results into the data structure the user provided. + if err := rawResult.Unmarshal(result); err != nil { + return err + } + return nil +} + +func isResumableError(err error) bool { + _, isQueryError := err.(*QueryError) + // if it is not a database error OR it is a database error, + // but the error is a notMaster error + //and is not a missingResumeToken error (caused by the user provided pipeline) + return (!isQueryError || isNotMasterError(err)) && (err != errMissingResumeToken) +} + +func runKillCursorsOnSession(session *Session, cursorId int64) error { + socket, err := session.acquireSocket(true) + if err != nil { + return err + } + err = socket.Query(&killCursorsOp{[]int64{cursorId}}) + if err != nil { + return err + } + socket.Release() + + return nil +} diff --git a/changestreams_test.go b/changestreams_test.go new file mode 100644 index 000000000..792f5d6ef --- /dev/null +++ b/changestreams_test.go @@ -0,0 +1,464 @@ +package mgo_test + +import ( + mgo "github.com/globalsign/mgo" + "github.com/globalsign/mgo/bson" + . 
"gopkg.in/check.v1" +) + +type updateDesc struct { + UpdatedFields map[string]interface{} `bson:"updatedFields"` + RemovedFields []string `bson:"removedFields"` +} + +type evNamespace struct { + DB string `bson:"db"` + Coll string `bson:"coll"` +} + +type changeEvent struct { + ID interface{} `bson:"_id"` + OperationType string `bson:"operationType"` + FullDocument *bson.Raw `bson:"fullDocument,omitempty"` + Ns evNamespace `bson:"ns"` + DocumentKey M `bson:"documentKey"` + UpdateDescription *updateDesc `bson:"updateDescription,omitempty"` +} + +func (s *S) TestStreamsWatch(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + coll := session.DB("mydb").C("mycoll") + //add a mock document + coll.Insert(M{"a": 0}) + + pipeline := []bson.M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{}) + c.Assert(err, IsNil) + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsInsert(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + err = coll.Insert(M{"a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //insert a new document + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + //get the _id for later check + type A struct { + ID bson.ObjectId `bson:"_id"` + A int `bson:"a"` + } + + //get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + //check event is correct + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id) 
+ c.Assert(ev.OperationType, Equals, "insert") + c.Assert(ev.FullDocument, NotNil) + a := A{} + err = ev.FullDocument.Unmarshal(&a) + c.Assert(err, IsNil) + c.Assert(a.A, Equals, 1) + c.Assert(ev.Ns.DB, Equals, "mydb") + c.Assert(ev.Ns.Coll, Equals, "mycoll") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsNextNoEventTimeout(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //check we timeout correctly on no events + //we should get a false result and no error + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err(), IsNil) + c.Assert(changeStream.Timeout(), Equals, true) + + //test the same with default timeout (MaxTimeMS=1000) + //create the stream + changeStream, err = coll.Watch(pipeline, mgo.ChangeStreamOptions{}) + c.Assert(err, IsNil) + hasEvent = changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err(), IsNil) + c.Assert(changeStream.Timeout(), Equals, true) + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsNextTimeout(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0}) + c.Assert(err, IsNil) + + //create the stream + 
pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //insert a new document to trigger an event + id = bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + + //ensure we get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + //check we timeout correctly on no subsequent events + //we should get a false result and no error + ev = changeEvent{} + hasEvent = changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err(), IsNil) + c.Assert(changeStream.Timeout(), Equals, true) + + //insert a new document to trigger an event + id = bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + + //ensure we get the event + ev = changeEvent{} + hasEvent = changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsDelete(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0}) + c.Assert(err, IsNil) + + //create the changeStream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //delete the document + err = coll.Remove(M{"_id": id}) + c.Assert(err, IsNil) + + //get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + //check event is correct + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id) + c.Assert(ev.OperationType, Equals, "delete") + c.Assert(ev.FullDocument, IsNil) + c.Assert(ev.Ns.DB, Equals, "mydb") + 
c.Assert(ev.Ns.Coll, Equals, "mycoll") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsUpdate(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0, "toremove": 2}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //update document + err = coll.UpdateId(id, M{"$set": M{"a": 1}, "$unset": M{"toremove": ""}}) + c.Assert(err, IsNil) + + //get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + //check event is correct + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id) + c.Assert(ev.OperationType, Equals, "update") + c.Assert(ev.FullDocument, IsNil) + c.Assert(len(ev.UpdateDescription.UpdatedFields), Equals, 1) + c.Assert(len(ev.UpdateDescription.RemovedFields), Equals, 1) + c.Assert(ev.UpdateDescription.UpdatedFields["a"], Equals, 1) + c.Assert(ev.UpdateDescription.RemovedFields[0], Equals, "toremove") + c.Assert(ev.Ns.DB, Equals, "mydb") + c.Assert(ev.Ns.Coll, Equals, "mycoll") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsUpdateFullDocument(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 0, "toremove": "bla"}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{} + changeStream, err := 
coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500, FullDocument: mgo.UpdateLookup}) + c.Assert(err, IsNil) + + //update document + err = coll.UpdateId(id, M{"$set": M{"a": 1}, "$unset": M{"toremove": ""}}) + c.Assert(err, IsNil) + + //get the event + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, true) + + type A struct { + A int `bson:"a"` + ToRemove *string `bson:"toremove"` + } + + //check event is correct + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id) + c.Assert(ev.OperationType, Equals, "update") + c.Assert(len(ev.UpdateDescription.UpdatedFields), Equals, 1) + c.Assert(len(ev.UpdateDescription.RemovedFields), Equals, 1) + c.Assert(ev.UpdateDescription.UpdatedFields["a"], Equals, 1) + c.Assert(ev.UpdateDescription.RemovedFields[0], Equals, "toremove") + + c.Assert(ev.FullDocument, NotNil) + a := A{} + err = ev.FullDocument.Unmarshal(&a) + c.Assert(err, IsNil) + c.Assert(a.A, Equals, 1) + c.Assert(a.ToRemove, IsNil) + c.Assert(ev.Ns.DB, Equals, "mydb") + c.Assert(ev.Ns.Coll, Equals, "mycoll") + + err = changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsUpdateWithPipeline(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add two docs + id1 := bson.NewObjectId() + err = coll.Insert(M{"_id": id1, "a": 1}) + c.Assert(err, IsNil) + id2 := bson.NewObjectId() + err = coll.Insert(M{"_id": id2, "a": 2}) + c.Assert(err, IsNil) + + pipeline1 := []M{M{"$match": M{"documentKey._id": id1}}} + changeStream1, err := coll.Watch(pipeline1, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + pipeline2 := []M{M{"$match": M{"documentKey._id": id2}}} + changeStream2, err := coll.Watch(pipeline2, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //update 
documents + _, err = coll.UpdateAll(M{"_id": M{"$in": []bson.ObjectId{id1, id2}}}, M{"$inc": M{"a": 1}}) + c.Assert(err, IsNil) + + got1 := false + got2 := false + + //check we got the update for id1 (and no other) + for i := 0; i < 2; i++ { + ev := changeEvent{} + hasEvent := changeStream1.Next(&ev) + //we will accept only one event, the one that corresponds to our id1 + c.Assert(got1 && hasEvent, Equals, false) + if hasEvent { + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id1) + got1 = true + } + } + c.Assert(got1, Equals, true) + + //check we got the update for id2 (and no other) + for i := 0; i < 2; i++ { + ev := changeEvent{} + hasEvent := changeStream2.Next(&ev) + //we will accept only one event, the one that corresponds to our id2 + c.Assert(got2 && hasEvent, Equals, false) + if hasEvent { + oid := ev.DocumentKey["_id"].(bson.ObjectId) + c.Assert(oid, Equals, id2) + got2 = true + } + } + c.Assert(got2, Equals, true) + + err = changeStream1.Close() + c.Assert(err, IsNil) + err = changeStream2.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsResumeTokenMissingError(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + err = coll.Insert(M{"a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{{"$project": M{"_id": 0}}} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //insert a new document + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + + //check we get the correct error + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err().Error(), Equals, "resume token missing from result") + + err = 
changeStream.Close() + c.Assert(err, IsNil) +} + +func (s *S) TestStreamsClosedStreamError(c *C) { + if !s.versionAtLeast(3, 6) { + c.Skip("ChangeStreams only work on 3.6+") + } + session, err := mgo.Dial("localhost:40011") + c.Assert(err, IsNil) + defer session.Close() + + coll := session.DB("mydb").C("mycoll") + + //add a mock document in order for the DB to be created + err = coll.Insert(M{"a": 0}) + c.Assert(err, IsNil) + + //create the stream + pipeline := []M{{"$project": M{"_id": 0}}} + changeStream, err := coll.Watch(pipeline, mgo.ChangeStreamOptions{MaxAwaitTimeMS: 1500}) + c.Assert(err, IsNil) + + //insert a new document + id := bson.NewObjectId() + err = coll.Insert(M{"_id": id, "a": 1}) + c.Assert(err, IsNil) + + err = changeStream.Close() + c.Assert(err, IsNil) + + //check we get the correct error + ev := changeEvent{} + hasEvent := changeStream.Next(&ev) + c.Assert(hasEvent, Equals, false) + c.Assert(changeStream.Err().Error(), Equals, "illegal use of a closed ChangeStream") +} diff --git a/cluster.go b/cluster.go index 087da61e5..ac461d5b9 100644 --- a/cluster.go +++ b/cluster.go @@ -682,6 +682,7 @@ func (cluster *mongoCluster) AcquireSocket(mode Mode, slaveOk bool, syncTimeout time.Sleep(100 * time.Millisecond) continue } else { + // We've managed to successfully reconnect to the master, we are no longer abnormally ended server.Lock() server.abended = false server.Unlock() diff --git a/export_test.go b/export_test.go index 690f84d38..998c7a2dd 100644 --- a/export_test.go +++ b/export_test.go @@ -1,6 +1,7 @@ package mgo import ( + "net" "time" ) @@ -31,3 +32,15 @@ func HackSyncSocketTimeout(newTimeout time.Duration) (restore func()) { syncSocketTimeout = newTimeout return } + +func (s *Session) Cluster() *mongoCluster { + return s.cluster() +} + +func (cluster *mongoCluster) Server(addr string) *mongoServer { + tcpaddr, err := net.ResolveTCPAddr("tcp", addr) + if err != nil { + panic(err) + } + return cluster.server(addr, tcpaddr) +} diff --git 
a/harness/daemons/.env b/harness/daemons/.env index 7ba8cf599..70acb5b92 100644 --- a/harness/daemons/.env +++ b/harness/daemons/.env @@ -59,7 +59,16 @@ if versionAtLeast 3 2; then COMMONDOPTS="$(echo "$COMMONDOPTS" | sed '/--nohttpinterface/d')" COMMONCOPTS="$(echo "$COMMONCOPTS" | sed '/--nohttpinterface/d')" - # config server need to be started as replica set + + if versionAtLeast 3 6; then + #In version 3.6 --nojournal is deprecated for replica set members using WiredTiger + COMMONDOPTSNOIP="$(echo "$COMMONDOPTSNOIP" | sed '/--nojournal/d')" + COMMONDOPTS="$(echo "$COMMONDOPTS" | sed '/--nojournal/d')" + COMMONCOPTS="$(echo "$COMMONCOPTS" | sed '/--nojournal/d')" + fi + + # config server need to be started as replica set + CFG1OPTS="--replSet conf1" CFG2OPTS="--replSet conf2" CFG3OPTS="--replSet conf3" diff --git a/harness/setup.sh b/harness/setup.sh index e5db78a78..25ba562ec 100755 --- a/harness/setup.sh +++ b/harness/setup.sh @@ -30,7 +30,7 @@ start() { UP=$(svstat daemons/* | grep ' up ' | grep -v ' [0-3] seconds' | wc -l) echo "$UP processes up..." if [ x$COUNT = x$UP ]; then - echo "Running setup.js with mongo..." + echo "Running init.js with mongo..." mongo --nodb ../harness/mongojs/init.js exit 0 fi diff --git a/server_test.go b/server_test.go new file mode 100644 index 000000000..1d21ef08b --- /dev/null +++ b/server_test.go @@ -0,0 +1,64 @@ +// mgo - MongoDB driver for Go +// +// Copyright (c) 2018 Canonical Ltd +// +// All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this +// list of conditions and the following disclaimer. +// 2. Redistributions in binary form must reproduce the above copyright notice, +// this list of conditions and the following disclaimer in the documentation +// and/or other materials provided with the distribution. 
+// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR +// ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES +// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; +// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND +// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS +// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package mgo_test + +import ( + "time" + + . "gopkg.in/check.v1" + "github.com/globalsign/mgo" +) + +func (s *S) TestServerRecoversFromAbend(c *C) { + session, err := mgo.Dial("localhost:40001") + c.Assert(err, IsNil) + defer session.Close() + // Peek behind the scenes + cluster := session.Cluster() + server := cluster.Server("127.0.0.1:40001") + sock, abended, err := server.AcquireSocket(100, time.Second) + c.Assert(err, IsNil) + c.Assert(sock, NotNil) + sock.Release() + c.Check(abended, Equals, false) + // Forcefully abend this socket + sock.Close() + server.AbendSocket(sock) + // Next acquire notices the connection was abnormally ended + sock, abended, err = server.AcquireSocket(100, time.Second) + c.Assert(err, IsNil) + sock.Release() + c.Check(abended, Equals, true) + // cluster.AcquireSocket should fix the abended problems + sock, err = cluster.AcquireSocket(mgo.Primary, false, time.Minute, time.Second, nil, 100) + c.Assert(err, IsNil) + sock.Release() + sock, abended, err = server.AcquireSocket(100, time.Second) + c.Assert(err, IsNil) + c.Check(abended, Equals, false) + sock.Release() +} diff --git a/session.go b/session.go index b62707c84..561f79487 100644 --- 
a/session.go +++ b/session.go @@ -169,7 +169,9 @@ type Iter struct { timeout time.Duration limit int32 timedout bool - findCmd bool + isFindCmd bool + isChangeStream bool + maxTimeMS int64 } var ( @@ -1117,6 +1119,11 @@ func isAuthError(err error) bool { return ok && e.Code == 13 } +func isNotMasterError(err error) bool { + e, ok := err.(*QueryError) + return ok && strings.Contains(e.Message, "not master") +} + func (db *Database) runUserCmd(cmdName string, user *User) error { cmd := make(bson.D, 0, 16) cmd = append(cmd, bson.DocElem{Name: cmdName, Value: user.Username}) @@ -2423,6 +2430,7 @@ type Pipe struct { pipeline interface{} allowDisk bool batchSize int + maxTimeMS int64 } type pipeCmd struct { @@ -2431,6 +2439,7 @@ type pipeCmd struct { Cursor *pipeCmdCursor `bson:",omitempty"` Explain bool `bson:",omitempty"` AllowDisk bool `bson:"allowDiskUse,omitempty"` + MaxTimeMS int64 `bson:"maxTimeMS,omitempty"` } type pipeCmdCursor struct { @@ -2485,6 +2494,9 @@ func (p *Pipe) Iter() *Iter { AllowDisk: p.allowDisk, Cursor: &pipeCmdCursor{p.batchSize}, } + if p.maxTimeMS > 0 { + cmd.MaxTimeMS = p.maxTimeMS + } err := c.Database.Run(cmd, &result) if e, ok := err.(*QueryError); ok && e.Message == `unrecognized field "cursor` { cmd.Cursor = nil @@ -2495,7 +2507,11 @@ func (p *Pipe) Iter() *Iter { if firstBatch == nil { firstBatch = result.Cursor.FirstBatch } - return c.NewIter(p.session, firstBatch, result.Cursor.Id, err) + it := c.NewIter(p.session, firstBatch, result.Cursor.Id, err) + if p.maxTimeMS > 0 { + it.maxTimeMS = p.maxTimeMS + } + return it } // NewIter returns a newly created iterator with the provided parameters. 
Using @@ -2557,7 +2573,7 @@ func (c *Collection) NewIter(session *Session, firstBatch []bson.Raw, cursorId i } if socket.ServerInfo().MaxWireVersion >= 4 && c.FullName != "admin.$cmd" { - iter.findCmd = true + iter.isFindCmd = true } iter.gotReply.L = &iter.m @@ -2659,6 +2675,13 @@ func (p *Pipe) Batch(n int) *Pipe { return p } +// SetMaxTime sets the maximum amount of time to allow the query to run. +// +func (p *Pipe) SetMaxTime(d time.Duration) *Pipe { + p.maxTimeMS = int64(d / time.Millisecond) + return p +} + // LastError the error status of the preceding write operation on the current connection. // // Relevant documentation: @@ -3801,7 +3824,7 @@ func (q *Query) Iter() *Iter { op.replyFunc = iter.op.replyFunc if prepareFindOp(socket, &op, limit) { - iter.findCmd = true + iter.isFindCmd = true } iter.server = socket.Server() @@ -4015,7 +4038,8 @@ func (iter *Iter) Timeout() bool { // Next returns true if a document was successfully unmarshalled onto result, // and false at the end of the result set or if an error happened. // When Next returns false, the Err method should be called to verify if -// there was an error during iteration. +// there was an error during iteration, and the Timeout method to verify if the +// false return value was caused by a timeout (no available results). // // For example: // @@ -4031,7 +4055,16 @@ func (iter *Iter) Next(result interface{}) bool { iter.m.Lock() iter.timedout = false timeout := time.Time{} + // for a ChangeStream iterator we have to call getMore before the loop otherwise + // we'll always return false + if iter.isChangeStream { + iter.getMore() + } + // check should we expect more data. for iter.err == nil && iter.docData.Len() == 0 && (iter.docsToReceive > 0 || iter.op.cursorId != 0) { + // we should expect more data. + + // If we have yet to receive data, increment the timer until we timeout. 
if iter.docsToReceive == 0 { if iter.timeout >= 0 { if timeout.IsZero() { @@ -4043,6 +4076,13 @@ func (iter *Iter) Next(result interface{}) bool { return false } } + // for a ChangeStream one loop is enough to declare the timeout + if iter.isChangeStream { + iter.timedout = true + iter.m.Unlock() + return false + } + // run a getmore to fetch more data. iter.getMore() if iter.err != nil { break @@ -4050,7 +4090,7 @@ func (iter *Iter) Next(result interface{}) bool { } iter.gotReply.Wait() } - + // We have data from the getMore. // Exhaust available data before reporting any errors. if docData, ok := iter.docData.Pop().([]byte); ok { close := false @@ -4066,6 +4106,7 @@ func (iter *Iter) Next(result interface{}) bool { } } if iter.op.cursorId != 0 && iter.err == nil { + // we still have a live cursor and currently expect data. iter.docsBeforeMore-- if iter.docsBeforeMore == -1 { iter.getMore() @@ -4255,7 +4296,7 @@ func (iter *Iter) getMore() { } } var op interface{} - if iter.findCmd { + if iter.isFindCmd || iter.isChangeStream { op = iter.getMoreCmd() } else { op = &iter.op @@ -4278,6 +4319,9 @@ func (iter *Iter) getMoreCmd() *queryOp { Collection: iter.op.collection[nameDot+1:], BatchSize: iter.op.limit, } + if iter.maxTimeMS > 0 { + getMore.MaxTimeMS = iter.maxTimeMS + } var op queryOp op.collection = iter.op.collection[:nameDot] + ".$cmd" @@ -4882,7 +4926,7 @@ func (iter *Iter) replyFunc() replyFunc { } else { iter.err = ErrNotFound } - } else if iter.findCmd { + } else if iter.isFindCmd { debugf("Iter %p received reply document %d/%d (cursor=%d)", iter, docNum+1, int(op.replyDocs), op.cursorId) var findReply struct { Ok bool @@ -4894,7 +4938,7 @@ func (iter *Iter) replyFunc() replyFunc { iter.err = err } else if !findReply.Ok && findReply.Errmsg != "" { iter.err = &QueryError{Code: findReply.Code, Message: findReply.Errmsg} - } else if len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 { + } else if !iter.isChangeStream &&
len(findReply.Cursor.FirstBatch) == 0 && len(findReply.Cursor.NextBatch) == 0 { iter.err = ErrNotFound } else { batch := findReply.Cursor.FirstBatch @@ -5262,7 +5306,7 @@ func getRFC2253NameString(RDNElements *pkix.RDNSequence) string { var replacer = strings.NewReplacer(",", "\\,", "=", "\\=", "+", "\\+", "<", "\\<", ">", "\\>", ";", "\\;") //The elements in the sequence needs to be reversed when converting them for i := len(*RDNElements) - 1; i >= 0; i-- { - var nameAndValueList = make([]string,len((*RDNElements)[i])) + var nameAndValueList = make([]string, len((*RDNElements)[i])) for j, attribute := range (*RDNElements)[i] { var shortAttributeName = rdnOIDToShortName(attribute.Type) if len(shortAttributeName) <= 0 { diff --git a/txn/flusher.go b/txn/flusher.go index 3d1882d7f..5d1c1bdd8 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -759,7 +759,7 @@ func (f *flusher) checkpoint(t *transaction, revnos []int64) error { f.debugf("Ready to apply %s. Saving revnos %v: LOST RACE", t, debugRevnos) return f.reload(t) } - return nil + return err } func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) error {