Skip to content
This repository has been archived by the owner on Nov 22, 2023. It is now read-only.

Commit

Permalink
feat: import car and dispatch result (#210)
Browse files Browse the repository at this point in the history
  • Loading branch information
tchardin authored Oct 27, 2021
1 parent 079d14e commit 0cdd05e
Show file tree
Hide file tree
Showing 9 changed files with 464 additions and 15 deletions.
1 change: 0 additions & 1 deletion cmd/pop/cli/commit.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
var commArgs struct {
cacheOnly bool
cacheRF int
storageRF int
}

var commCmd = &ffcli.Command{
Expand Down
59 changes: 59 additions & 0 deletions cmd/pop/cli/import.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package cli

import (
"context"
"errors"
"flag"
"fmt"
"strings"

"github.com/myelnet/pop/node"
"github.com/peterbourgon/ff/v3/ffcli"
)

var importArgs struct {
cacheRF int
}

var importCmd = &ffcli.Command{
Name: "import",
ShortUsage: "import <path>",
ShortHelp: "Import a CAR file to the blockstore",
LongHelp: strings.TrimSpace(`
The 'pop import <path-to-car>' directly imports archived DAGs to the blockstore.
`),
Exec: runImport,
FlagSet: (func() *flag.FlagSet {
fs := flag.NewFlagSet("import", flag.ExitOnError)
fs.IntVar(&importArgs.cacheRF, "cache-rf", 0, "number of providers to replicate the content to")
return fs
})(),
}

func runImport(ctx context.Context, args []string) error {
c, cc, ctx, cancel := connect(ctx)
defer cancel()

irc := make(chan *node.ImportResult, 1)
cc.SetNotifyCallback(func(n node.Notify) {
if ir := n.ImportResult; ir != nil {
irc <- ir
}
})
go receive(ctx, cc, c)

cc.Import(&node.ImportArgs{Path: args[0], CacheRF: importArgs.cacheRF})
select {
case ir := <-irc:
if ir.Err != "" {
return errors.New(ir.Err)
}

fmt.Println("Imported car with roots", ir.Roots)
return nil
case <-ctx.Done():
return ctx.Err()
}
}
7 changes: 7 additions & 0 deletions exchange/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,13 @@ func WithRoot(r cid.Cid) TxOption {
}
}

// WithSize allows overriding the size manually if we know it ahead of time
func WithSize(size int64) TxOption {
return func(tx *Tx) {
tx.size = size
}
}

// WithTriage allows a transaction to manually prompt for external confirmation before executing an offer
func WithTriage() TxOption {
return func(tx *Tx) {
Expand Down
7 changes: 3 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ require (
github.com/ipfs/go-bitswap v0.3.2 // indirect
github.com/ipfs/go-block-format v0.0.3
github.com/ipfs/go-blockservice v0.1.4
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-cid v0.0.8-0.20210716091050-de6c03deae1c
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ds-badger v0.2.6
github.com/ipfs/go-graphsync v0.9.1
Expand All @@ -42,10 +42,10 @@ require (
github.com/ipfs/go-merkledag v0.3.2
github.com/ipfs/go-path v0.0.9
github.com/ipfs/go-unixfs v0.2.4
github.com/ipld/go-car/v2 v2.1.0
github.com/ipld/go-codec-dagpb v1.3.0
github.com/ipld/go-ipld-prime v0.12.0
github.com/jpillora/backoff v1.0.0
github.com/koding/websocketproxy v0.0.0-20181220232114-7ed82d81a28c
github.com/libp2p/go-eventbus v0.2.1
github.com/libp2p/go-libp2p v0.13.0
github.com/libp2p/go-libp2p-blankhost v0.2.0
Expand All @@ -54,7 +54,6 @@ require (
github.com/libp2p/go-libp2p-kad-dht v0.11.1
github.com/libp2p/go-libp2p-noise v0.1.2 // indirect
github.com/libp2p/go-libp2p-peer v0.2.0
github.com/libp2p/go-libp2p-peerstore v0.2.6
github.com/libp2p/go-libp2p-pubsub v0.4.1
github.com/libp2p/go-libp2p-swarm v0.4.0
github.com/libp2p/go-libp2p-testing v0.4.0
Expand All @@ -68,7 +67,7 @@ require (
github.com/onsi/ginkgo v1.16.1 // indirect
github.com/onsi/gomega v1.11.0 // indirect
github.com/peterbourgon/ff/v3 v3.0.0
github.com/rs/zerolog v1.20.0
github.com/rs/zerolog v1.21.0
github.com/soheilhy/cmux v0.1.5
github.com/stretchr/testify v1.7.0
github.com/tchardin/go-libp2p-blankhost v0.2.1-0.20210408134851-9396bc83e200
Expand Down
259 changes: 249 additions & 10 deletions go.sum

Large diffs are not rendered by default.

Binary file added internal/testutil/testcar-v1.car
Binary file not shown.
23 changes: 23 additions & 0 deletions node/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ type ListArgs struct {
Page int // potential pagination as the amount may be very large
}

// ImportArgs provides the path to the car file
type ImportArgs struct {
Path string
CacheRF int
}

// PayArgs provides params for controlling a payment channel
type PayArgs struct {
ChAddr string
Expand All @@ -91,6 +97,7 @@ type Command struct {
Commit *CommArgs
Get *GetArgs
List *ListArgs
Import *ImportArgs
PaySubmit *PayArgs
PayList *PayArgs
PayTrack *PayArgs
Expand Down Expand Up @@ -169,6 +176,13 @@ type ListResult struct {
Err string
}

// ImportResult returns the resulting index entries from the CAR file
type ImportResult struct {
Roots []string
Caches []string
Err string
}

// PayResult returns the result of submitted vouchers
type PayResult struct {
SettlingIn string
Expand All @@ -186,6 +200,7 @@ type Notify struct {
CommResult *CommResult
GetResult *GetResult
ListResult *ListResult
ImportResult *ImportResult
PayResult *PayResult
}

Expand Down Expand Up @@ -257,6 +272,10 @@ func (cs *CommandServer) GotMsg(ctx context.Context, cmd *Command) error {
go cs.n.List(ctx, c)
return nil
}
if c := cmd.Import; c != nil {
go cs.n.Import(ctx, c)
return nil
}
if c := cmd.PaySubmit; c != nil {
go cs.n.PaySubmit(ctx, c)
return nil
Expand Down Expand Up @@ -371,6 +390,10 @@ func (cc *CommandClient) List(args *ListArgs) {
cc.send(Command{List: args})
}

func (cc *CommandClient) Import(args *ImportArgs) {
cc.send(Command{Import: args})
}

func (cc *CommandClient) PaySubmit(args *PayArgs) {
cc.send(Command{PaySubmit: args})
}
Expand Down
30 changes: 30 additions & 0 deletions node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,36 @@ func TestList(t *testing.T) {
}
}

func TestImport(t *testing.T) {
ctx := context.Background()
mn := mocknet.New(ctx)

cn := newTestNode(ctx, mn, t)

var nds []*node
nds = append(nds, newTestNode(ctx, mn, t))
nds = append(nds, newTestNode(ctx, mn, t))

require.NoError(t, mn.LinkAll())
require.NoError(t, mn.ConnectAllButSelf())

res := make(chan *ImportResult, 3)
cn.notify = func(n Notify) {
require.Equal(t, n.ImportResult.Err, "")
res <- n.ImportResult
}
cn.Import(ctx, &ImportArgs{Path: "../internal/testutil/testcar-v1.car", CacheRF: 2})

i := 0
for i < 2 {
<-res
i++
}

r := <-res
require.Equal(t, []string{"QmRtuDzjipZnWUgjAHgaatG5sPEJuyCUV41xpFYZ6DtFJr"}, r.Roots)
}

// Commit 2 different files into a single transaction and then retrieve (Get)
// the files individually with 2 separate operations. Both Get operations are on the
// same transaction (ref) and based on the same root CID but retrieve 2 different files.
Expand Down
93 changes: 93 additions & 0 deletions node/popn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/docker/go-units"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
Expand All @@ -32,6 +33,7 @@ import (
"github.com/ipfs/go-unixfs/importer/balanced"
"github.com/ipfs/go-unixfs/importer/helpers"
"github.com/ipfs/go-unixfs/importer/trickle"
"github.com/ipld/go-car/v2"
"github.com/libp2p/go-libp2p"
connmgr "github.com/libp2p/go-libp2p-connmgr"
"github.com/libp2p/go-libp2p-core/host"
Expand Down Expand Up @@ -1020,6 +1022,97 @@ func (nd *node) List(ctx context.Context, args *ListArgs) {
}
}

// Import blocks from a CAR file
func (nd *node) Import(ctx context.Context, args *ImportArgs) {
sendErr := func(err error) {
nd.send(Notify{
ImportResult: &ImportResult{
Err: err.Error(),
}})
}

f, err := os.Open(args.Path)
if err != nil {
sendErr(err)
return
}
info, err := f.Stat()
if err != nil {
sendErr(err)
return
}

br, err := car.NewBlockReader(f)
if err != nil {
sendErr(err)
return
}

if len(br.Roots) != 1 {
sendErr(fmt.Errorf("unexpected number of roots, exepected 1 got %d", len(br.Roots)))
return
}

blks := []blocks.Block{}

for {
block, err := br.Next()
if err != nil && err != io.EOF {
sendErr(err)
return
} else if block == nil {
break
}
blks = append(blks, block)
}

root := br.Roots[0]
tx := nd.exch.Tx(ctx, exchange.WithRoot(root), exchange.WithSize(info.Size()))
tx.SetCacheRF(args.CacheRF)

err = tx.Store().Bstore.PutMany(blks)
if err != nil {
sendErr(err)
return
}

err = tx.Commit()
if err != nil {
sendErr(err)
return
}

ref := tx.Ref()
tx.WatchDispatch(func(r exchange.PRecord) {
nd.send(Notify{
ImportResult: &ImportResult{
Caches: []string{
r.Provider.String(),
},
},
})
})
tx.Close()

err = nd.exch.Index().GC()
if err != nil {
sendErr(err)
return
}

err = nd.remind.Publish(ref.PayloadCID, ref.PayloadSize)
if err != nil {
sendErr(err)
return
}

nd.send(Notify{
ImportResult: &ImportResult{
Roots: []string{ref.PayloadCID.String()},
},
})
}

// DAGParams is a set of features used to chunk and format files into DAGs
type DAGParams struct {
chunker chunk.Splitter
Expand Down

0 comments on commit 0cdd05e

Please sign in to comment.