Skip to content

Commit

Permalink
Proper fix, including on Insert.
Browse files Browse the repository at this point in the history
It turns out that I missed we had just reread the document as part of
the 'add txn-insert' or 'add txn-remove' to the document. That gives us
an updated txn-queue that needs to be handled. Since we are now cleaning
directly on the txn-queue instead of the cached queue docs, go ahead and
do it on both Insert and Remove.
  • Loading branch information
jameinel committed Nov 19, 2018
1 parent 8141feb commit 4485ffa
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 19 deletions.
44 changes: 28 additions & 16 deletions txn/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -708,7 +708,7 @@ func (f *flusher) abortOrReload(t *transaction, revnos []int64, pull map[bson.Ob
}
seen[dkey] = true

pullAll, _ := tokensToPull(f.queue[dkey], pull, "")
pullAll := tokensToPull(f.queue[dkey], pull, "")
if len(pullAll) == 0 {
continue
}
Expand Down Expand Up @@ -803,7 +803,7 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err
qdoc[1].Value = bson.D{{Name: "$exists", Value: false}}
}

pullAll, pulledQueue := tokensToPull(dqueue, pull, tt)
pullAll := tokensToPull(dqueue, pull, tt)

var d bson.D
var outcome string
Expand Down Expand Up @@ -857,12 +857,13 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err
updated := false
if !hasToken(stash.Queue, tt) {
var set, unset bson.D
cleanedQueue := cleanQueue(info.Queue, pull, tt)
if revno == 0 {
// Missing revno in stash means -1.
set = bson.D{{Name: "txn-queue", Value: pulledQueue}}
set = bson.D{{Name: "txn-queue", Value: cleanedQueue}}
unset = bson.D{{Name: "n", Value: 1}, {Name: "txn-revno", Value: 1}}
} else {
set = bson.D{{Name: "txn-queue", Value: pulledQueue}, {Name: "txn-revno", Value: newRevno}}
set = bson.D{{Name: "txn-queue", Value: cleanedQueue}, {Name: "txn-revno", Value: newRevno}}
unset = bson.D{{Name: "n", Value: 1}}
}
qdoc := bson.D{{Name: "_id", Value: dkey}, {Name: "n", Value: nonce}}
Expand Down Expand Up @@ -898,13 +899,11 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err
var info txnInfo
if _, err = f.sc.Find(qdoc).Apply(change, &info); err == nil {
f.debugf("Stash for document %v has revno %d and queue: %v", dkey, info.Revno, info.Queue)
// Note(jam): 2018-11-19 for now, we aren't bothering to go through txn-queue and $pullAll the
// tokens that have been marked. As long as *one* of Insert&Remove cleanup the queue, it won't get
// out of hand (you can't Insert again unless you've Removed).
cleanedQueue := cleanQueue(info.Queue, pull, tt)
d = setInDoc(d, bson.D{
{Name: "_id", Value: op.Id},
{Name: "txn-revno", Value: newRevno},
{Name: "txn-queue", Value: info.Queue},
{Name: "txn-queue", Value: cleanedQueue},
})
// Unlikely yet unfortunate race in here if this gets seriously
// delayed. If someone inserts+removes meanwhile, this will
Expand Down Expand Up @@ -989,24 +988,37 @@ func (f *flusher) apply(t *transaction, pull map[bson.ObjectId]*transaction) err
return nil
}

func tokensToPull(dqueue []tokenAndId, pull map[bson.ObjectId]*transaction, dontPull token) ([]token, []token) {
var pullAll []token
var pulledQueue []token
func tokensToPull(dqueue []tokenAndId, pull map[bson.ObjectId]*transaction, dontPull token) []token {
var result []token
for j := len(dqueue) - 1; j >= 0; j-- {
dtt := dqueue[j]
if dtt.tt == dontPull {
pulledQueue = append(pulledQueue, dtt.tt)
continue
}
if _, ok := pull[dtt.Id()]; ok {
// It was handled before and this is a leftover invalid
// nonce in the queue. Cherry-pick it out.
pullAll = append(pullAll, dtt.tt)
} else {
pulledQueue = append(pulledQueue, dtt.tt)
result = append(result, dtt.tt)
}
}
return result
}

// cleanQueue takes an existing queue and removes all the items in pull from it, generating a new txn-queue.
func cleanQueue(queue []token, pull map[bson.ObjectId]*transaction, dontPull token) ([]token) {
cleaned := make([]token, 0, len(queue))
for _, tt := range queue {
if tt == dontPull {
cleaned = append(cleaned, tt)
continue
}
txnId := tt.id()
if _, ok := pull[txnId]; !ok {
// shouldn't be pulled, so add it to the cleaned queue
cleaned = append(cleaned, tt)
}
}
return pullAll, pulledQueue
return cleaned
}

func objToDoc(obj interface{}) (d bson.D, err error) {
Expand Down
5 changes: 2 additions & 3 deletions txn/txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1098,7 +1098,6 @@ func (s *S) TestTxnQueueAddAndRemove(c *C) {
c.Assert(err, IsNil)
err = s.accounts.FindId(0).One(&qdoc)
c.Assert(err, IsNil)
// Remove prunes the txn-queue but Insert currently does not, so we end up with the last Remove
// and the last Insert in the document.
c.Check(len(qdoc.Queue), Equals, 2)
// Both Remove and Insert should prune all the completed transactions
c.Check(len(qdoc.Queue), Equals, 1)
}

0 comments on commit 4485ffa

Please sign in to comment.