From 9b114b04f66c1f1ce8eccd158c258cc9ab069168 Mon Sep 17 00:00:00 2001 From: tchardin Date: Thu, 28 Oct 2021 06:59:30 -0400 Subject: [PATCH] fix: use an index callback to sync with remote index (#212) --- exchange/exchange.go | 1 + exchange/index.go | 19 ++++++++++++++++--- exchange/options.go | 5 +++-- exchange/replication.go | 10 ++++++---- node/popn.go | 28 +++++++++++++++++----------- 5 files changed, 43 insertions(+), 20 deletions(-) diff --git a/exchange/exchange.go b/exchange/exchange.go index 569e523a..713081e8 100644 --- a/exchange/exchange.go +++ b/exchange/exchange.go @@ -57,6 +57,7 @@ func New(ctx context.Context, h host.Host, ds datastore.Batching, opts Options) // leave a 20% lower bound so we don't evict too frequently WithBounds(opts.Capacity, opts.Capacity-uint64(math.Round(float64(opts.Capacity)*0.2))), WithDeleteFunc(opts.WatchEvictionFunc), + WithSetFunc(opts.WatchAdditionFunc), ) if err != nil { return nil, err diff --git a/exchange/index.go b/exchange/index.go index bf810f21..454c621b 100644 --- a/exchange/index.go +++ b/exchange/index.go @@ -47,8 +47,10 @@ type Index struct { // Lower bound is the size we target when evicting to make room for new content // the interval between ub and lb is to try not evicting after every write once we reach ub lb uint64 + // setFunc is an optional callback called every time a new ref is set. Make sure not to block. + setFunc func(DataRef) // deleteFunc, if not nil, is called after a ref is evicted. Make sure not to block there. - deleteFunc func(cid.Cid) + deleteFunc func(DataRef) emu sync.Mutex // gcSet is a cid Set where we put all the cid that will be evicted when calling the Garbage Collector GC() @@ -105,8 +107,15 @@ func WithBounds(up, lo uint64) IndexOption { } } +// WithSetFunc sets a setFunc callback +func WithSetFunc(fn func(DataRef)) IndexOption { + return func(idx *Index) { + idx.setFunc = fn + } +} + // WithDeleteFunc sets a deleteFunc callback -func WithDeleteFunc(fn func(cid.Cid)) IndexOption { +func WithDeleteFunc(fn func(DataRef)) IndexOption { return func(idx *Index) { idx.deleteFunc = fn } @@ -293,6 +302,10 @@ func (idx *Index) UpdateRef(ref *DataRef) error { // SetRef adds a ref in the index and increments the LFU queue func (idx *Index) SetRef(ref *DataRef) error { + if idx.setFunc != nil { + defer idx.setFunc(*ref) + } + idx.mu.Lock() defer idx.mu.Unlock() k := ref.PayloadCID.String() @@ -462,7 +475,7 @@ func (idx *Index) evict(size uint64) uint64 { evicted += uint64(entry.PayloadSize) idx.size -= uint64(entry.PayloadSize) if idx.deleteFunc != nil { - idx.deleteFunc(entry.PayloadCID) + idx.deleteFunc(*entry) } if evicted >= size { return evicted diff --git a/exchange/options.go b/exchange/options.go index 4361b78b..51e6bfe4 100644 --- a/exchange/options.go +++ b/exchange/options.go @@ -11,7 +11,6 @@ import ( dtfimpl "github.com/filecoin-project/go-data-transfer/impl" dtnet "github.com/filecoin-project/go-data-transfer/network" gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync" - "github.com/ipfs/go-cid" "github.com/ipfs/go-datastore" "github.com/ipfs/go-datastore/namespace" "github.com/ipfs/go-graphsync" @@ -74,7 +73,9 @@ type Options struct { // WatchQueriesFunc is an optional function that will return any queries received as a provider WatchQueriesFunc func(deal.Query) // WatchEvictionFunc is an optional function that will yield the root cid for any evicted content. - WatchEvictionFunc func(cid.Cid) + WatchEvictionFunc func(DataRef) + // WatchAdditionFunc is an optional callback notifying when content is added to the index. + WatchAdditionFunc func(DataRef) } // Everything isn't thoroughly validated so we trust users who provide options know what they're doing diff --git a/exchange/replication.go b/exchange/replication.go index 0c54b32f..575e1a76 100644 --- a/exchange/replication.go +++ b/exchange/replication.go @@ -411,6 +411,12 @@ func (r *Replication) handleRequest(s network.Stream) { log.Debug().Err(err).Msg("error when loading keys") } + if err := utils.MigrateBlocks(ctx, store.Bstore, r.bs); err != nil { + log.Error().Err(err).Msg("error when migrating blocks") + // if we fail migrating the blocks we shouldn't set the ref + return + } + ref := &DataRef{ PayloadCID: req.PayloadCID, PayloadSize: int64(req.Size), @@ -422,10 +428,6 @@ func (r *Replication) handleRequest(s network.Stream) { log.Error().Err(err).Msg("error when setting ref") } - if err := utils.MigrateBlocks(ctx, store.Bstore, r.bs); err != nil { - log.Error().Err(err).Msg("error when migrating blocks") - } - if err := r.ms.Delete(sid); err != nil { log.Error().Err(err).Msg("error when deleting store") } diff --git a/node/popn.go b/node/popn.go index 824b3308..14a6839c 100644 --- a/node/popn.go +++ b/node/popn.go @@ -247,7 +247,9 @@ func New(ctx context.Context, opts Options) (*node, error) { Regions: regions, Capacity: opts.Capacity, ReplInterval: opts.ReplInterval, - WatchEvictionFunc: nd.onDeleteRef, + // every time new content is added/deleted we notify the remote routing service + WatchEvictionFunc: nd.onDeleteRef, + WatchAdditionFunc: nd.onSetRef, } if opts.FilToken != "" { eopts.FilecoinRPCHeader = http.Header{ @@ -630,12 +632,6 @@ func (nd *node) Commit(ctx context.Context, args *CommArgs) { return } - err = nd.remind.Publish(ref.PayloadCID, ref.PayloadSize) - if err != nil { - sendErr(err) - return - } - nd.send(Notify{CommResult: &CommResult{ Size: filecoin.SizeStr(filecoin.NewInt(uint64(ref.PayloadSize))), Ref: ref.PayloadCID.String(), @@ -1094,13 +1090,13 @@ func (nd *node) Import(ctx context.Context, args *ImportArgs) { }) tx.Close() - err = nd.exch.Index().GC() + err = nd.exch.Index().SetRef(ref) if err != nil { sendErr(err) return } - err = nd.remind.Publish(ref.PayloadCID, ref.PayloadSize) + err = nd.exch.Index().GC() if err != nil { sendErr(err) return @@ -1288,11 +1284,21 @@ func (nd *node) connPeers() []peer.ID { // onDeleteRef calls any remote index to delete the related record // we spin up routines not to block eviction execution. -func (nd *node) onDeleteRef(key cid.Cid) { +func (nd *node) onDeleteRef(ref exchange.DataRef) { go func() { - err := nd.remind.Delete(key) + err := nd.remind.Delete(ref.PayloadCID) if err != nil { log.Error().Err(err).Msg("deleting remote record") } }() } + +// onSetRef calls any remote index to publish the related record +func (nd *node) onSetRef(ref exchange.DataRef) { + go func() { + err := nd.remind.Publish(ref.PayloadCID, ref.PayloadSize) + if err != nil { + log.Error().Err(err).Msg("failed to publish to remote index") + } + }() +}