diff --git a/txn/flusher.go b/txn/flusher.go index 2a71b16a9..adc7ebff0 100644 --- a/txn/flusher.go +++ b/txn/flusher.go @@ -708,7 +708,7 @@ func (f *flusher) abortOrReload(t *transaction, revnos []int64, pull map[bson.Ob } seen[dkey] = true - pullAll, _ := tokensToPull(f.queue[dkey], pull, "") + pullAll := tokensToPull(f.queue[dkey], pull, "") if len(pullAll) == 0 { continue } @@ -803,7 +803,7 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err qdoc[1].Value = bson.D{{Name: "$exists", Value: false}} } - pullAll, pulledQueue := tokensToPull(dqueue, pull, tt) + pullAll := tokensToPull(dqueue, pull, tt) var d bson.D var outcome string @@ -857,12 +857,13 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err updated := false if !hasToken(stash.Queue, tt) { var set, unset bson.D + cleanedQueue := cleanQueue(info.Queue, pull, tt) if revno == 0 { // Missing revno in stash means -1. - set = bson.D{{Name: "txn-queue", Value: pulledQueue}} + set = bson.D{{Name: "txn-queue", Value: cleanedQueue}} unset = bson.D{{Name: "n", Value: 1}, {Name: "txn-revno", Value: 1}} } else { - set = bson.D{{Name: "txn-queue", Value: pulledQueue}, {Name: "txn-revno", Value: newRevno}} + set = bson.D{{Name: "txn-queue", Value: cleanedQueue}, {Name: "txn-revno", Value: newRevno}} unset = bson.D{{Name: "n", Value: 1}} } qdoc := bson.D{{Name: "_id", Value: dkey}, {Name: "n", Value: nonce}} @@ -898,13 +899,11 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err var info txnInfo if _, err = f.sc.Find(qdoc).Apply(change, &info); err == nil { f.debugf("Stash for document %v has revno %d and queue: %v", dkey, info.Revno, info.Queue) - // Note(jam): 2018-11-19 for now, we aren't bothering to go through txn-queue and $pullAll the - // tokens that have been marked. As long as *one* of Insert&Remove cleanup the queue, it won't get - // out of hand (you can't Insert again unless you've Removed). + cleanedQueue := cleanQueue(info.Queue, pull, tt) d = setInDoc(d, bson.D{ {Name: "_id", Value: op.Id}, {Name: "txn-revno", Value: newRevno}, - {Name: "txn-queue", Value: info.Queue}, + {Name: "txn-queue", Value: cleanedQueue}, }) // Unlikely yet unfortunate race in here if this gets seriously // delayed. If someone inserts+removes meanwhile, this will @@ -989,24 +988,37 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err return nil } -func tokensToPull(dqueue []tokenAndId, pull map[bson.ObjectId]*transaction, dontPull token) ([]token, []token) { - var pullAll []token - var pulledQueue []token +func tokensToPull(dqueue []tokenAndId, pull map[bson.ObjectId]*transaction, dontPull token) []token { + var result []token for j := len(dqueue) - 1; j >= 0; j-- { dtt := dqueue[j] if dtt.tt == dontPull { - pulledQueue = append(pulledQueue, dtt.tt) continue } if _, ok := pull[dtt.Id()]; ok { // It was handled before and this is a leftover invalid // nonce in the queue. Cherry-pick it out. - pullAll = append(pullAll, dtt.tt) - } else { - pulledQueue = append(pulledQueue, dtt.tt) + result = append(result, dtt.tt) + } + } + return result +} + +// cleanQueue takes an existing queue and removes all the items in pull from it, generating a new txn-queue. +func cleanQueue(queue []token, pull map[bson.ObjectId]*transaction, dontPull token) ([]token) { + cleaned := make([]token, 0, len(queue)) + for _, tt := range queue { + if tt == dontPull { + cleaned = append(cleaned, tt) + continue + } + txnId := tt.id() + if _, ok := pull[txnId]; !ok { + // shouldn't be pulled, so add it to the cleaned queue + cleaned = append(cleaned, tt) } } - return pullAll, pulledQueue + return cleaned } func objToDoc(obj interface{}) (d bson.D, err error) { diff --git a/txn/txn_test.go b/txn/txn_test.go index 1c25302b4..6f92a0ba7 100644 --- a/txn/txn_test.go +++ b/txn/txn_test.go @@ -1098,7 +1098,6 @@ func (s *S) TestTxnQueueAddAndRemove(c *C) { c.Assert(err, IsNil) err = s.accounts.FindId(0).One(&qdoc) c.Assert(err, IsNil) - // Remove prunes the txn-queue but Insert currently does not, so we end up with the last Remove - // and the last Insert in the document. - c.Check(len(qdoc.Queue), Equals, 2) + // Both Remove and Insert should prune all the completed transactions + c.Check(len(qdoc.Queue), Equals, 1) }