Skip to content

Commit

Permalink
Make the initial synchronization of a sharing faster (#4147)
Browse files Browse the repository at this point in the history
  • Loading branch information
nono authored Oct 10, 2023
2 parents 10923c7 + 22eb86f commit 843f8d4
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 93 deletions.
17 changes: 15 additions & 2 deletions model/sharing/replicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ const MaxRetries = 5
// (each next retry will wait 4 times longer than its previous retry)
const InitialBackoffPeriod = 1 * time.Minute

// BatchSize is the maximal number of documents mainpulated at once by the
// BatchSize is the maximal number of documents manipulated at once by the
// replicator
const BatchSize = 100
const BatchSize = 400

// ReplicateMsg is used for jobs on the share-replicate worker.
type ReplicateMsg struct {
Expand Down Expand Up @@ -143,6 +143,19 @@ func (s *Sharing) retryWorker(inst *instance.Instance, worker string, errors int
}
}

func (s *Sharing) InitialReplication(inst *instance.Instance, m *Member) error {
for i := 0; i < 1000; i++ {
pending, err := s.ReplicateTo(inst, m, true)
if err != nil {
return err
}
if !pending {
return nil
}
}
return ErrInternalServerError
}

// ReplicateTo starts a replicator on this sharing to the given member.
// http://docs.couchdb.org/en/stable/replication/protocol.html
// https://github.com/pouchdb/pouchdb/blob/master/packages/node_modules/pouchdb-replication/src/replicate.js
Expand Down
16 changes: 8 additions & 8 deletions model/sharing/replicator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestReplicator(t *testing.T) {
assert.Equal(t, feed.Seq, seq3)
})

t.Run("InitialCopy", func(t *testing.T) {
t.Run("InitialIndex", func(t *testing.T) {
// Start with an empty io.cozy.shared database
_ = couchdb.DeleteDB(inst, consts.Shared)
if err := couchdb.CreateDB(inst, consts.Shared); err != nil {
Expand All @@ -102,7 +102,7 @@ func TestReplicator(t *testing.T) {
Values: []string{settingsDocID},
Local: true,
})
assert.NoError(t, s.InitialCopy(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
assert.NoError(t, s.InitialIndex(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
nbShared := 0
assertNbSharedRef(t, inst, nbShared)

Expand All @@ -114,7 +114,7 @@ func TestReplicator(t *testing.T) {
DocType: testDoctype,
Values: []string{oneID},
})
assert.NoError(t, s.InitialCopy(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
assert.NoError(t, s.InitialIndex(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
nbShared++
assertNbSharedRef(t, inst, nbShared)
oneRef := getSharedRef(t, inst, testDoctype, oneID)
Expand All @@ -135,7 +135,7 @@ func TestReplicator(t *testing.T) {
Selector: "foo",
Values: []string{"bar"},
})
assert.NoError(t, s.InitialCopy(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
assert.NoError(t, s.InitialIndex(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
nbShared += len(twoIDs)
assertNbSharedRef(t, inst, nbShared)
for _, id := range twoIDs {
Expand All @@ -160,7 +160,7 @@ func TestReplicator(t *testing.T) {
Selector: "foo",
Values: []string{"qux", "quux", "quuux"},
})
assert.NoError(t, s.InitialCopy(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
assert.NoError(t, s.InitialIndex(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1))
nbShared += len(threeIDs)
assertNbSharedRef(t, inst, nbShared)
for _, id := range threeIDs {
Expand All @@ -172,7 +172,7 @@ func TestReplicator(t *testing.T) {

// Another member accepts the sharing
for r, rule := range s.Rules {
assert.NoError(t, s.InitialCopy(inst, rule, r))
assert.NoError(t, s.InitialIndex(inst, rule, r))
}
assertNbSharedRef(t, inst, nbShared)

Expand All @@ -189,7 +189,7 @@ func TestReplicator(t *testing.T) {

// A third member accepts the sharing
for r, rule := range s.Rules {
assert.NoError(t, s.InitialCopy(inst, rule, r))
assert.NoError(t, s.InitialIndex(inst, rule, r))
}
nbShared++
assertNbSharedRef(t, inst, nbShared)
Expand All @@ -212,7 +212,7 @@ func TestReplicator(t *testing.T) {
Selector: "foo",
Values: []string{"qux", "quux", "quuux"},
})
assert.NoError(t, s2.InitialCopy(inst, s2.Rules[len(s2.Rules)-1], len(s2.Rules)-1))
assert.NoError(t, s2.InitialIndex(inst, s2.Rules[len(s2.Rules)-1], len(s2.Rules)-1))
assertNbSharedRef(t, inst, nbShared)
for _, id := range threeIDs {
threeRef := getSharedRef(t, inst, testDoctype, id)
Expand Down
16 changes: 8 additions & 8 deletions model/sharing/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func (s *Sharing) Setup(inst *instance.Instance, m *Member) {
}
if s.Triggers.ReplicateID == "" {
for i, rule := range s.Rules {
if err := s.InitialCopy(inst, rule, i); err != nil {
if err := s.InitialIndex(inst, rule, i); err != nil {
inst.Logger().Warnf("Error on initial copy for %s (%s): %s", rule.Title, s.SID, err)
}
}
Expand All @@ -119,14 +119,14 @@ func (s *Sharing) Setup(inst *instance.Instance, m *Member) {
inst.Logger().WithNamespace("sharing").
Warnf("Error on setup replicate trigger (%s): %s", s.SID, err)
}
if pending, err := s.ReplicateTo(inst, m, true); err != nil {
if err := s.InitialReplication(inst, m); err != nil {
inst.Logger().WithNamespace("sharing").
Warnf("Error on initial replication (%s): %s", s.SID, err)
s.retryWorker(inst, "share-replicate", 0)
} else {
if pending {
s.pushJob(inst, "share-replicate")
if s.FirstFilesRule() != nil {
s.retryWorker(inst, "share-upload", 1) // 1, so that it will start after share-replicate
}
} else {
if s.FirstFilesRule() == nil {
return
}
Expand All @@ -141,7 +141,7 @@ func (s *Sharing) Setup(inst *instance.Instance, m *Member) {
}
}

go s.NotifyRecipients(inst, m)
s.NotifyRecipients(inst, m)
}

// AddTrackTriggers creates the share-track triggers for each rule of the
Expand Down Expand Up @@ -208,9 +208,9 @@ func (s *Sharing) AddReplicateTrigger(inst *instance.Instance) error {
return couchdb.UpdateDoc(inst, s)
}

// InitialCopy lists the shared documents and put a reference in the
// InitialIndex lists the shared documents and put a reference in the
// io.cozy.shared database
func (s *Sharing) InitialCopy(inst *instance.Instance, rule Rule, r int) error {
func (s *Sharing) InitialIndex(inst *instance.Instance, rule Rule, r int) error {
if rule.Local || len(rule.Values) == 0 {
return nil
}
Expand Down
164 changes: 89 additions & 75 deletions model/sharing/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sharing

import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
Expand All @@ -18,8 +19,8 @@ import (
"github.com/cozy/cozy-stack/pkg/consts"
"github.com/cozy/cozy-stack/pkg/couchdb"
"github.com/cozy/cozy-stack/pkg/realtime"
multierror "github.com/hashicorp/go-multierror"
"github.com/labstack/echo/v4"
"golang.org/x/sync/errgroup"
)

// UploadMsg is used for jobs on the share-upload worker.
Expand Down Expand Up @@ -52,25 +53,27 @@ func (s *Sharing) Upload(inst *instance.Instance, errors int) error {
}

lastTry := errors+1 == MaxRetries
for i := 0; i < BatchSize; i++ {
if len(members) == 0 {
break
}
m := members[0]
members = members[1:]
more, err := s.UploadTo(inst, m, lastTry)
if err != nil {
errm = multierror.Append(errm, err)
}
if more {
members = append(members, m)
}
done := true
g, _ := errgroup.WithContext(context.Background())
for i := range members {
m := members[i]
g.Go(func() error {
more, err := s.UploadBatchTo(inst, m, lastTry)
if err != nil {
return err
}
if more {
done = false
}
return nil
})
}
err := g.Wait()

if errm != nil {
if err != nil {
s.retryWorker(inst, "share-upload", errors)
inst.Logger().WithNamespace("upload").Infof("errm=%s\n", errm)
} else if len(members) > 0 {
inst.Logger().WithNamespace("upload").Infof("err=%s\n", err)
} else if !done {
s.pushJob(inst, "share-upload")
}
return errm
Expand All @@ -84,14 +87,12 @@ func (s *Sharing) InitialUpload(inst *instance.Instance, m *Member) error {
}
defer mu.Unlock()

for i := 0; i < BatchSize; i++ {
more, err := s.UploadTo(inst, m, false)
if err != nil {
return err
}
if !more {
return s.sendInitialEndNotif(inst, m)
}
more, err := s.UploadBatchTo(inst, m, false)
if err != nil {
return err
}
if !more {
return s.sendInitialEndNotif(inst, m)
}

s.pushJob(inst, "share-upload")
Expand Down Expand Up @@ -126,9 +127,9 @@ func (s *Sharing) sendInitialEndNotif(inst *instance.Instance, m *Member) error
return nil
}

// UploadTo uploads one file to the given member. It returns false if there
// are no more files to upload to this member currently.
func (s *Sharing) UploadTo(inst *instance.Instance, m *Member, lastTry bool) (bool, error) {
// UploadBatchTo uploads a batch of files to the given member. It returns false
// if there are no more files to upload to this member currently.
func (s *Sharing) UploadBatchTo(inst *instance.Instance, m *Member, lastTry bool) (bool, error) {
if m.Instance == "" {
return false, ErrInvalidURL
}
Expand All @@ -143,58 +144,71 @@ func (s *Sharing) UploadTo(inst *instance.Instance, m *Member, lastTry bool) (bo
}
inst.Logger().WithNamespace("upload").Debugf("lastSeq = %s", lastSeq)

file, ruleIndex, seq, err := s.findNextFileToUpload(inst, lastSeq)
if errors.Is(err, ErrInternalServerError) {
// Retrying is useless in this case, let's skip this file
if seq != lastSeq {
_ = s.UpdateLastSequenceNumber(inst, m, "upload", seq)
}
return false, nil
batch := &batchUpload{
Sharing: s,
Instance: inst,
Seq: lastSeq,
}
if err != nil {
return false, err
}
if file == nil {
if seq != lastSeq {
err = s.UpdateLastSequenceNumber(inst, m, "upload", seq)
defer func() {
if batch.Seq != lastSeq {
_ = s.UpdateLastSequenceNumber(inst, m, "upload", batch.Seq)
}
return false, err
}
}()

if err = s.uploadFile(inst, m, file, ruleIndex); err != nil {
if lastTry {
_ = s.UpdateLastSequenceNumber(inst, m, "upload", seq)
for i := 0; i < BatchSize; i++ {
file, ruleIndex, err := batch.findNextFileToUpload()
if err != nil {
return false, err
}
if file == nil {
return false, nil
}
if err = s.uploadFile(inst, m, file, ruleIndex); err != nil {
return false, err
}
return false, err
}
return true, nil
}

type batchUpload struct {
Sharing *Sharing
Instance *instance.Instance
Seq string

return true, s.UpdateLastSequenceNumber(inst, m, "upload", seq)
// changes is used to batch calls to the changes feed and improves
// performances.
changes []couchdb.Change
}

// findNextFileToUpload uses the changes feed to find the next file that needs
// to be uploaded. It returns a file document if there is one file to upload,
// and the sequence number where it is in the changes feed.
func (s *Sharing) findNextFileToUpload(inst *instance.Instance, since string) (map[string]interface{}, int, string, error) {
// and the index of the sharing rule that applies to this file.
func (b *batchUpload) findNextFileToUpload() (map[string]interface{}, int, error) {
for {
response, err := couchdb.GetChanges(inst, &couchdb.ChangesRequest{
DocType: consts.Shared,
IncludeDocs: true,
Since: since,
Limit: 1,
})
if err != nil {
return nil, 0, since, err
}
since = response.LastSeq
if len(response.Results) == 0 {
break
seq := b.Seq
if len(b.changes) == 0 {
response, err := couchdb.GetChanges(b.Instance, &couchdb.ChangesRequest{
DocType: consts.Shared,
IncludeDocs: true,
Since: seq,
Limit: BatchSize,
})
if err != nil {
return nil, 0, err
}
if len(response.Results) == 0 {
return nil, 0, nil
}
b.changes = response.Results
}
r := response.Results[0]
infos, ok := r.Doc.Get("infos").(map[string]interface{})
change := b.changes[0]
b.changes = b.changes[1:]
b.Seq = change.Seq
infos, ok := change.Doc.Get("infos").(map[string]interface{})
if !ok {
continue
}
info, ok := infos[s.SID].(map[string]interface{})
info, ok := infos[b.Sharing.SID].(map[string]interface{})
if !ok {
continue
}
Expand All @@ -208,30 +222,30 @@ func (s *Sharing) findNextFileToUpload(inst *instance.Instance, since string) (m
if !ok {
continue
}
rev := extractLastRevision(r.Doc)
rev := extractLastRevision(change.Doc)
if rev == "" {
continue
}
docID := strings.SplitN(r.DocID, "/", 2)[1]
docID := strings.SplitN(change.DocID, "/", 2)[1]
ir := couchdb.IDRev{ID: docID, Rev: rev}
query := []couchdb.IDRev{ir}
results, err := couchdb.BulkGetDocs(inst, consts.Files, query)
results, err := couchdb.BulkGetDocs(b.Instance, consts.Files, query)
if err != nil {
return nil, 0, since, err
b.Seq = seq
return nil, 0, err
}
if len(results) == 0 {
inst.Logger().WithNamespace("upload").
b.Instance.Logger().WithNamespace("upload").
Warnf("missing results for bulk get %v", query)
return nil, 0, since, ErrInternalServerError
return nil, 0, ErrInternalServerError
}
if results[0]["_deleted"] == true {
inst.Logger().WithNamespace("upload").
b.Instance.Logger().WithNamespace("upload").
Warnf("cannot upload _deleted file %v", results[0])
return nil, 0, since, ErrInternalServerError
return nil, 0, ErrInternalServerError
}
return results[0], int(idx), since, nil
return results[0], int(idx), nil
}
return nil, 0, since, nil
}

// uploadFile uploads one file to the given member. It first try to just send
Expand Down

0 comments on commit 843f8d4

Please sign in to comment.