Skip to content
This repository has been archived by the owner on Nov 22, 2023. It is now read-only.

Commit

Permalink
fix: use an index callback to sync with remote index (#212)
Browse files Browse the repository at this point in the history
  • Loading branch information
tchardin authored Oct 28, 2021
1 parent 0be7876 commit 9b114b0
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 20 deletions.
1 change: 1 addition & 0 deletions exchange/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ func New(ctx context.Context, h host.Host, ds datastore.Batching, opts Options)
// leave a 20% lower bound so we don't evict too frequently
WithBounds(opts.Capacity, opts.Capacity-uint64(math.Round(float64(opts.Capacity)*0.2))),
WithDeleteFunc(opts.WatchEvictionFunc),
WithSetFunc(opts.WatchAdditionFunc),
)
if err != nil {
return nil, err
Expand Down
19 changes: 16 additions & 3 deletions exchange/index.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,10 @@ type Index struct {
// Lower bound is the size we target when evicting to make room for new content
// the interval between ub and lb is to try not evicting after every write once we reach ub
lb uint64
// setFunc is an optional callback called every time a new ref is set. Make sure not to block.
setFunc func(DataRef)
// deleteFunc, if not nil, is called after a ref is evicted. Make sure not to block there.
deleteFunc func(cid.Cid)
deleteFunc func(DataRef)

emu sync.Mutex
// gcSet is a cid Set where we put all the cid that will be evicted when calling the Garbage Collector GC()
Expand Down Expand Up @@ -105,8 +107,15 @@ func WithBounds(up, lo uint64) IndexOption {
}
}

// WithSetFunc sets a setFunc callback
func WithSetFunc(fn func(DataRef)) IndexOption {
return func(idx *Index) {
idx.setFunc = fn
}
}

// WithDeleteFunc sets a deleteFunc callback
func WithDeleteFunc(fn func(cid.Cid)) IndexOption {
func WithDeleteFunc(fn func(DataRef)) IndexOption {
return func(idx *Index) {
idx.deleteFunc = fn
}
Expand Down Expand Up @@ -293,6 +302,10 @@ func (idx *Index) UpdateRef(ref *DataRef) error {

// SetRef adds a ref in the index and increments the LFU queue
func (idx *Index) SetRef(ref *DataRef) error {
if idx.setFunc != nil {
defer idx.setFunc(*ref)
}

idx.mu.Lock()
defer idx.mu.Unlock()
k := ref.PayloadCID.String()
Expand Down Expand Up @@ -462,7 +475,7 @@ func (idx *Index) evict(size uint64) uint64 {
evicted += uint64(entry.PayloadSize)
idx.size -= uint64(entry.PayloadSize)
if idx.deleteFunc != nil {
idx.deleteFunc(entry.PayloadCID)
idx.deleteFunc(*entry)
}
if evicted >= size {
return evicted
Expand Down
5 changes: 3 additions & 2 deletions exchange/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
dtfimpl "github.com/filecoin-project/go-data-transfer/impl"
dtnet "github.com/filecoin-project/go-data-transfer/network"
gstransport "github.com/filecoin-project/go-data-transfer/transport/graphsync"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
"github.com/ipfs/go-datastore/namespace"
"github.com/ipfs/go-graphsync"
Expand Down Expand Up @@ -74,7 +73,9 @@ type Options struct {
// WatchQueriesFunc is an optional function that will return any queries received as a provider
WatchQueriesFunc func(deal.Query)
// WatchEvictionFunc is an optional function that will yield the root cid for any evicted content.
WatchEvictionFunc func(cid.Cid)
WatchEvictionFunc func(DataRef)
// WatchAdditionFunc is an optional callback notifying when content is added to the index.
WatchAdditionFunc func(DataRef)
}

// Everything isn't thoroughly validated so we trust users who provide options know what they're doing
Expand Down
10 changes: 6 additions & 4 deletions exchange/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,12 @@ func (r *Replication) handleRequest(s network.Stream) {
log.Debug().Err(err).Msg("error when loading keys")
}

if err := utils.MigrateBlocks(ctx, store.Bstore, r.bs); err != nil {
log.Error().Err(err).Msg("error when migrating blocks")
// if we fail migrating the blocks we shouldn't set the ref
return
}

ref := &DataRef{
PayloadCID: req.PayloadCID,
PayloadSize: int64(req.Size),
Expand All @@ -422,10 +428,6 @@ func (r *Replication) handleRequest(s network.Stream) {
log.Error().Err(err).Msg("error when setting ref")
}

if err := utils.MigrateBlocks(ctx, store.Bstore, r.bs); err != nil {
log.Error().Err(err).Msg("error when migrating blocks")
}

if err := r.ms.Delete(sid); err != nil {
log.Error().Err(err).Msg("error when deleting store")
}
Expand Down
28 changes: 17 additions & 11 deletions node/popn.go
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,9 @@ func New(ctx context.Context, opts Options) (*node, error) {
Regions: regions,
Capacity: opts.Capacity,
ReplInterval: opts.ReplInterval,
WatchEvictionFunc: nd.onDeleteRef,
// every time new content is added/deleted we notify the remote routing service
WatchEvictionFunc: nd.onDeleteRef,
WatchAdditionFunc: nd.onSetRef,
}
if opts.FilToken != "" {
eopts.FilecoinRPCHeader = http.Header{
Expand Down Expand Up @@ -630,12 +632,6 @@ func (nd *node) Commit(ctx context.Context, args *CommArgs) {
return
}

err = nd.remind.Publish(ref.PayloadCID, ref.PayloadSize)
if err != nil {
sendErr(err)
return
}

nd.send(Notify{CommResult: &CommResult{
Size: filecoin.SizeStr(filecoin.NewInt(uint64(ref.PayloadSize))),
Ref: ref.PayloadCID.String(),
Expand Down Expand Up @@ -1094,13 +1090,13 @@ func (nd *node) Import(ctx context.Context, args *ImportArgs) {
})
tx.Close()

err = nd.exch.Index().GC()
err = nd.exch.Index().SetRef(ref)
if err != nil {
sendErr(err)
return
}

err = nd.remind.Publish(ref.PayloadCID, ref.PayloadSize)
err = nd.exch.Index().GC()
if err != nil {
sendErr(err)
return
Expand Down Expand Up @@ -1288,11 +1284,21 @@ func (nd *node) connPeers() []peer.ID {

// onDeleteRef calls any remote index to delete the related record
// we spin up routines not to block eviction execution.
func (nd *node) onDeleteRef(key cid.Cid) {
func (nd *node) onDeleteRef(ref exchange.DataRef) {
go func() {
err := nd.remind.Delete(key)
err := nd.remind.Delete(ref.PayloadCID)
if err != nil {
log.Error().Err(err).Msg("deleting remote record")
}
}()
}

// onSetRef calls any remote index to publish the related record
func (nd *node) onSetRef(ref exchange.DataRef) {
go func() {
err := nd.remind.Publish(ref.PayloadCID, ref.PayloadSize)
if err != nil {
log.Error().Err(err).Msg("failed to publish to remote index")
}
}()
}

0 comments on commit 9b114b0

Please sign in to comment.