From f35a8e2488f6aecf254a9046216079e4b2daefb8 Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Tue, 3 Oct 2023 15:18:41 +0200 Subject: [PATCH 1/4] Rename Sharing.InitialCopy to Sharing.InitialIndex --- model/sharing/replicator_test.go | 16 ++++++++-------- model/sharing/setup.go | 6 +++--- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/model/sharing/replicator_test.go b/model/sharing/replicator_test.go index d7a4c6006ef..683aaf7cfd7 100644 --- a/model/sharing/replicator_test.go +++ b/model/sharing/replicator_test.go @@ -77,7 +77,7 @@ func TestReplicator(t *testing.T) { assert.Equal(t, feed.Seq, seq3) }) - t.Run("InitialCopy", func(t *testing.T) { + t.Run("InitialIndex", func(t *testing.T) { // Start with an empty io.cozy.shared database _ = couchdb.DeleteDB(inst, consts.Shared) if err := couchdb.CreateDB(inst, consts.Shared); err != nil { @@ -102,7 +102,7 @@ func TestReplicator(t *testing.T) { Values: []string{settingsDocID}, Local: true, }) - assert.NoError(t, s.InitialCopy(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1)) + assert.NoError(t, s.InitialIndex(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1)) nbShared := 0 assertNbSharedRef(t, inst, nbShared) @@ -114,7 +114,7 @@ func TestReplicator(t *testing.T) { DocType: testDoctype, Values: []string{oneID}, }) - assert.NoError(t, s.InitialCopy(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1)) + assert.NoError(t, s.InitialIndex(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1)) nbShared++ assertNbSharedRef(t, inst, nbShared) oneRef := getSharedRef(t, inst, testDoctype, oneID) @@ -135,7 +135,7 @@ func TestReplicator(t *testing.T) { Selector: "foo", Values: []string{"bar"}, }) - assert.NoError(t, s.InitialCopy(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1)) + assert.NoError(t, s.InitialIndex(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1)) nbShared += len(twoIDs) assertNbSharedRef(t, inst, nbShared) for _, id := range twoIDs { @@ -160,7 +160,7 @@ func TestReplicator(t *testing.T) { Selector: "foo", Values: []string{"qux", "quux", "quuux"}, }) - assert.NoError(t, s.InitialCopy(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1)) + assert.NoError(t, s.InitialIndex(inst, s.Rules[len(s.Rules)-1], len(s.Rules)-1)) nbShared += len(threeIDs) assertNbSharedRef(t, inst, nbShared) for _, id := range threeIDs { @@ -172,7 +172,7 @@ func TestReplicator(t *testing.T) { // Another member accepts the sharing for r, rule := range s.Rules { - assert.NoError(t, s.InitialCopy(inst, rule, r)) + assert.NoError(t, s.InitialIndex(inst, rule, r)) } assertNbSharedRef(t, inst, nbShared) @@ -189,7 +189,7 @@ func TestReplicator(t *testing.T) { // A third member accepts the sharing for r, rule := range s.Rules { - assert.NoError(t, s.InitialCopy(inst, rule, r)) + assert.NoError(t, s.InitialIndex(inst, rule, r)) } nbShared++ assertNbSharedRef(t, inst, nbShared) @@ -212,7 +212,7 @@ func TestReplicator(t *testing.T) { Selector: "foo", Values: []string{"qux", "quux", "quuux"}, }) - assert.NoError(t, s2.InitialCopy(inst, s2.Rules[len(s2.Rules)-1], len(s2.Rules)-1)) + assert.NoError(t, s2.InitialIndex(inst, s2.Rules[len(s2.Rules)-1], len(s2.Rules)-1)) assertNbSharedRef(t, inst, nbShared) for _, id := range threeIDs { threeRef := getSharedRef(t, inst, testDoctype, id) diff --git a/model/sharing/setup.go b/model/sharing/setup.go index 5451baea913..02cf4c56fe8 100644 --- a/model/sharing/setup.go +++ b/model/sharing/setup.go @@ -110,7 +110,7 @@ func (s *Sharing) Setup(inst *instance.Instance, m *Member) { } if s.Triggers.ReplicateID == "" { for i, rule := range s.Rules { - if err := s.InitialCopy(inst, rule, i); err != nil { + if err := s.InitialIndex(inst, rule, i); err != nil { inst.Logger().Warnf("Error on initial copy for %s (%s): %s", rule.Title, s.SID, err) } } @@ -208,9 +208,9 @@ func (s *Sharing) AddReplicateTrigger(inst *instance.Instance) error { return couchdb.UpdateDoc(inst, s) } -// InitialCopy lists the shared documents and put a reference in the +// InitialIndex lists the shared documents and put a reference in the // io.cozy.shared database -func (s *Sharing) InitialCopy(inst *instance.Instance, rule Rule, r int) error { +func (s *Sharing) InitialIndex(inst *instance.Instance, rule Rule, r int) error { if rule.Local || len(rule.Values) == 0 { return nil } From 9e08696d62285fa7d08b9a86dc5233d722a05cb2 Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Tue, 3 Oct 2023 15:24:07 +0200 Subject: [PATCH 2/4] Increase the batch size to improve sharing throughput --- model/sharing/replicator.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/model/sharing/replicator.go b/model/sharing/replicator.go index 14fb8c835ef..842ceafee1d 100644 --- a/model/sharing/replicator.go +++ b/model/sharing/replicator.go @@ -32,9 +32,9 @@ const MaxRetries = 5 // (each next retry will wait 4 times longer than its previous retry) const InitialBackoffPeriod = 1 * time.Minute -// BatchSize is the maximal number of documents mainpulated at once by the +// BatchSize is the maximal number of documents manipulated at once by the // replicator -const BatchSize = 100 +const BatchSize = 400 // ReplicateMsg is used for jobs on the share-replicate worker. type ReplicateMsg struct { From abc1b744900e3b602603bf3ce68caa48daf90962 Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Tue, 3 Oct 2023 15:48:34 +0200 Subject: [PATCH 3/4] Make share-upload work in parallel for each member --- model/sharing/upload.go | 40 +++++++++++++++++++++++----------------- 1 file changed, 23 insertions(+), 17 deletions(-) diff --git a/model/sharing/upload.go b/model/sharing/upload.go index 6cd8de7ecec..1e2e908970e 100644 --- a/model/sharing/upload.go +++ b/model/sharing/upload.go @@ -2,6 +2,7 @@ package sharing import ( "bytes" + "context" "encoding/json" "errors" "fmt" @@ -18,8 +19,8 @@ import ( "github.com/cozy/cozy-stack/pkg/consts" "github.com/cozy/cozy-stack/pkg/couchdb" "github.com/cozy/cozy-stack/pkg/realtime" - multierror "github.com/hashicorp/go-multierror" "github.com/labstack/echo/v4" + "golang.org/x/sync/errgroup" ) // UploadMsg is used for jobs on the share-upload worker. @@ -52,25 +53,30 @@ func (s *Sharing) Upload(inst *instance.Instance, errors int) error { } lastTry := errors+1 == MaxRetries - for i := 0; i < BatchSize; i++ { - if len(members) == 0 { - break - } - m := members[0] - members = members[1:] - more, err := s.UploadTo(inst, m, lastTry) - if err != nil { - errm = multierror.Append(errm, err) - } - if more { - members = append(members, m) - } + done := true + g, _ := errgroup.WithContext(context.Background()) + for i := range members { + m := members[i] + g.Go(func() error { + for i := 0; i < BatchSize; i++ { + more, err := s.UploadTo(inst, m, lastTry) + if err != nil { + return err + } + if !more { + return nil + } + } + done = false + return nil + }) } + err := g.Wait() - if errm != nil { + if err != nil { s.retryWorker(inst, "share-upload", errors) - inst.Logger().WithNamespace("upload").Infof("errm=%s\n", errm) - } else if len(members) > 0 { + inst.Logger().WithNamespace("upload").Infof("err=%s\n", err) + } else if !done { s.pushJob(inst, "share-upload") } return errm From 22eb86feb7504c98642b090b50b19a71eb699185 Mon Sep 17 00:00:00 2001 From: Bruno Michel Date: Mon, 9 Oct 2023 15:57:54 +0200 Subject: [PATCH 4/4] Make the initial replication of a sharing faster When a user accepts to synchronize a sharing to their Cozy, the stack starts a replication of the io.cozy.files, and uploads the files inside the sharing. We can make this process faster by batching calls to the changes feed. It is particulary useful when there are lots of documents in the io.cozy.shared database, from other sharings. --- model/sharing/replicator.go | 13 ++++ model/sharing/setup.go | 10 +-- model/sharing/upload.go | 142 +++++++++++++++++++----------------- 3 files changed, 93 insertions(+), 72 deletions(-) diff --git a/model/sharing/replicator.go b/model/sharing/replicator.go index 842ceafee1d..798ea56dcf1 100644 --- a/model/sharing/replicator.go +++ b/model/sharing/replicator.go @@ -143,6 +143,19 @@ func (s *Sharing) retryWorker(inst *instance.Instance, worker string, errors int } } +func (s *Sharing) InitialReplication(inst *instance.Instance, m *Member) error { + for i := 0; i < 1000; i++ { + pending, err := s.ReplicateTo(inst, m, true) + if err != nil { + return err + } + if !pending { + return nil + } + } + return ErrInternalServerError +} + // ReplicateTo starts a replicator on this sharing to the given member. // http://docs.couchdb.org/en/stable/replication/protocol.html // https://github.com/pouchdb/pouchdb/blob/master/packages/node_modules/pouchdb-replication/src/replicate.js diff --git a/model/sharing/setup.go b/model/sharing/setup.go index 02cf4c56fe8..93dde07371f 100644 --- a/model/sharing/setup.go +++ b/model/sharing/setup.go @@ -119,14 +119,14 @@ func (s *Sharing) Setup(inst *instance.Instance, m *Member) { inst.Logger().WithNamespace("sharing"). Warnf("Error on setup replicate trigger (%s): %s", s.SID, err) } - if pending, err := s.ReplicateTo(inst, m, true); err != nil { + if err := s.InitialReplication(inst, m); err != nil { inst.Logger().WithNamespace("sharing"). Warnf("Error on initial replication (%s): %s", s.SID, err) s.retryWorker(inst, "share-replicate", 0) - } else { - if pending { - s.pushJob(inst, "share-replicate") + if s.FirstFilesRule() != nil { + s.retryWorker(inst, "share-upload", 1) // 1, so that it will start after share-replicate } + } else { if s.FirstFilesRule() == nil { return } @@ -141,7 +141,7 @@ func (s *Sharing) Setup(inst *instance.Instance, m *Member) { } } - go s.NotifyRecipients(inst, m) + s.NotifyRecipients(inst, m) } // AddTrackTriggers creates the share-track triggers for each rule of the diff --git a/model/sharing/upload.go b/model/sharing/upload.go index 1e2e908970e..c17dee27abb 100644 --- a/model/sharing/upload.go +++ b/model/sharing/upload.go @@ -58,16 +58,13 @@ func (s *Sharing) Upload(inst *instance.Instance, errors int) error { for i := range members { m := members[i] g.Go(func() error { - for i := 0; i < BatchSize; i++ { - more, err := s.UploadTo(inst, m, lastTry) - if err != nil { - return err - } - if !more { - return nil - } + more, err := s.UploadBatchTo(inst, m, lastTry) + if err != nil { + return err + } + if more { + done = false } - done = false return nil }) } @@ -90,14 +87,12 @@ func (s *Sharing) InitialUpload(inst *instance.Instance, m *Member) error { } defer mu.Unlock() - for i := 0; i < BatchSize; i++ { - more, err := s.UploadTo(inst, m, false) - if err != nil { - return err - } - if !more { - return s.sendInitialEndNotif(inst, m) - } + more, err := s.UploadBatchTo(inst, m, false) + if err != nil { + return err + } + if !more { + return s.sendInitialEndNotif(inst, m) } s.pushJob(inst, "share-upload") @@ -132,9 +127,9 @@ func (s *Sharing) sendInitialEndNotif(inst *instance.Instance, m *Member) error return nil } -// UploadTo uploads one file to the given member. It returns false if there -// are no more files to upload to this member currently. -func (s *Sharing) UploadTo(inst *instance.Instance, m *Member, lastTry bool) (bool, error) { +// UploadBatchTo uploads a batch of files to the given member. It returns false +// if there are no more files to upload to this member currently. +func (s *Sharing) UploadBatchTo(inst *instance.Instance, m *Member, lastTry bool) (bool, error) { if m.Instance == "" { return false, ErrInvalidURL } @@ -149,58 +144,71 @@ func (s *Sharing) UploadTo(inst *instance.Instance, m *Member, lastTry bool) (bo } inst.Logger().WithNamespace("upload").Debugf("lastSeq = %s", lastSeq) - file, ruleIndex, seq, err := s.findNextFileToUpload(inst, lastSeq) - if errors.Is(err, ErrInternalServerError) { - // Retrying is useless in this case, let's skip this file - if seq != lastSeq { - _ = s.UpdateLastSequenceNumber(inst, m, "upload", seq) - } - return false, nil + batch := &batchUpload{ + Sharing: s, + Instance: inst, + Seq: lastSeq, } - if err != nil { - return false, err - } - if file == nil { - if seq != lastSeq { - err = s.UpdateLastSequenceNumber(inst, m, "upload", seq) + defer func() { + if batch.Seq != lastSeq { + _ = s.UpdateLastSequenceNumber(inst, m, "upload", batch.Seq) } - return false, err - } + }() - if err = s.uploadFile(inst, m, file, ruleIndex); err != nil { - if lastTry { - _ = s.UpdateLastSequenceNumber(inst, m, "upload", seq) + for i := 0; i < BatchSize; i++ { + file, ruleIndex, err := batch.findNextFileToUpload() + if err != nil { + return false, err + } + if file == nil { + return false, nil + } + if err = s.uploadFile(inst, m, file, ruleIndex); err != nil { + return false, err } - return false, err } + return true, nil +} + +type batchUpload struct { + Sharing *Sharing + Instance *instance.Instance + Seq string - return true, s.UpdateLastSequenceNumber(inst, m, "upload", seq) + // changes is used to batch calls to the changes feed and improves + // performances. + changes []couchdb.Change } // findNextFileToUpload uses the changes feed to find the next file that needs // to be uploaded. It returns a file document if there is one file to upload, -// and the sequence number where it is in the changes feed. -func (s *Sharing) findNextFileToUpload(inst *instance.Instance, since string) (map[string]interface{}, int, string, error) { +// and the index of the sharing rule that applies to this file. +func (b *batchUpload) findNextFileToUpload() (map[string]interface{}, int, error) { for { - response, err := couchdb.GetChanges(inst, &couchdb.ChangesRequest{ - DocType: consts.Shared, - IncludeDocs: true, - Since: since, - Limit: 1, - }) - if err != nil { - return nil, 0, since, err - } - since = response.LastSeq - if len(response.Results) == 0 { - break + seq := b.Seq + if len(b.changes) == 0 { + response, err := couchdb.GetChanges(b.Instance, &couchdb.ChangesRequest{ + DocType: consts.Shared, + IncludeDocs: true, + Since: seq, + Limit: BatchSize, + }) + if err != nil { + return nil, 0, err + } + if len(response.Results) == 0 { + return nil, 0, nil + } + b.changes = response.Results } - r := response.Results[0] - infos, ok := r.Doc.Get("infos").(map[string]interface{}) + change := b.changes[0] + b.changes = b.changes[1:] + b.Seq = change.Seq + infos, ok := change.Doc.Get("infos").(map[string]interface{}) if !ok { continue } - info, ok := infos[s.SID].(map[string]interface{}) + info, ok := infos[b.Sharing.SID].(map[string]interface{}) if !ok { continue } @@ -214,30 +222,30 @@ func (s *Sharing) findNextFileToUpload(inst *instance.Instance, since string) (m if !ok { continue } - rev := extractLastRevision(r.Doc) + rev := extractLastRevision(change.Doc) if rev == "" { continue } - docID := strings.SplitN(r.DocID, "/", 2)[1] + docID := strings.SplitN(change.DocID, "/", 2)[1] ir := couchdb.IDRev{ID: docID, Rev: rev} query := []couchdb.IDRev{ir} - results, err := couchdb.BulkGetDocs(inst, consts.Files, query) + results, err := couchdb.BulkGetDocs(b.Instance, consts.Files, query) if err != nil { - return nil, 0, since, err + b.Seq = seq + return nil, 0, err } if len(results) == 0 { - inst.Logger().WithNamespace("upload"). + b.Instance.Logger().WithNamespace("upload"). Warnf("missing results for bulk get %v", query) - return nil, 0, since, ErrInternalServerError + return nil, 0, ErrInternalServerError } if results[0]["_deleted"] == true { - inst.Logger().WithNamespace("upload"). + b.Instance.Logger().WithNamespace("upload"). Warnf("cannot upload _deleted file %v", results[0]) - return nil, 0, since, ErrInternalServerError + return nil, 0, ErrInternalServerError } - return results[0], int(idx), since, nil + return results[0], int(idx), nil } - return nil, 0, since, nil } // uploadFile uploads one file to the given member. It first try to just send