Skip to content

Commit

Permalink
feat(events): compare-amt option for lotus-shed indexes inspect-events
Browse files Browse the repository at this point in the history
Instead of relying just on entry counts, regenerate the event AMT root using
only what we have in the db and compare it with the event root from the message
receipt. This should tell us precisely whether or not we have what we should.

Ref: #12570
  • Loading branch information
rvagg committed Oct 11, 2024
1 parent 4586794 commit eccb10d
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 38 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
* Implement [FIP-0081](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0081.md) and its migration for NV24. Initial pledge collateral will now be calculated using a 70% / 30% split between "simple" and "baseline" in the initial consensus pledge contribution to collateral calculation. The change in this calculation will begin at NV24 activation and ramp up from the current split of 100% / 0% to the eventual 70% / 30% over the course of a year so as to minimise impact on existing operations. ([filecoin-project/lotus#12526](https://github.com/filecoin-project/lotus/pull/12526))
* Update to F3 0.4.0 ([filecoin-project/lotus#12547](https://github.com/filecoin-project/lotus/pull/12547)). This includes additional performance enhancements and bug fixes.
* [Ticket-based F3 participation API](https://github.com/filecoin-project/lotus/pull/12531): This update introduces a new design where participation tickets grant a temporary lease, allowing storage providers to sign as part of a single GPBFT instance at any given point in time. This design ensures that tickets are checked for validity and issuer alignment, handling errors robustly in order to avoid self-equivocation during GPBFT instances.
* `lotus-shed indexes inspect-events` now has a `--compare-amt` flag that will do a comprehensive comparison of the event index data for each message by comparing the AMT root CID from the message receipt with the root of a reconstructed AMT. ([filecoin-project/lotus#12570](https://github.com/filecoin-project/lotus/pull/12570))

## Improvements

Expand Down
230 changes: 192 additions & 38 deletions cmd/lotus-shed/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,21 @@ import (
"strings"
"time"

"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/exitcode"

lapi "github.com/filecoin-project/lotus/api"
bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
lcli "github.com/filecoin-project/lotus/cli"
Expand All @@ -34,6 +39,10 @@ const (
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false`
tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?`

// these queries are used to extract just the information used to reconstruct the event AMT from the database
selectEventIdAndEmitter = `SELECT id, emitter_addr FROM event WHERE tipset_key_cid=? and message_cid=? ORDER BY event_index ASC`
selectEventEntries = `SELECT flags, key, codec, value FROM event_entry WHERE event_id=? ORDER BY _rowid_ ASC`
)

func withCategory(cat string, cmd *cli.Command) *cli.Command {
Expand Down Expand Up @@ -415,6 +424,11 @@ var inspectEventsCmd = &cli.Command{
Usage: "log tipsets that have no detected problems",
Value: false,
},
&cli.BoolFlag{
Name: "compare-amt",
Usage: "compare the reconstructed event AMT with the on-chain AMT",
Value: false,
},
},
Action: func(cctx *cli.Context) error {
srv, err := lcli.GetFullNodeServices(cctx)
Expand All @@ -440,6 +454,7 @@ var inspectEventsCmd = &cli.Command{
}

logGood := cctx.Bool("log-good")
compareAmt := cctx.Bool("compare-amt")

// advance currTs by one epoch and maintain prevTs as the previous tipset (this allows us to easily use the ChainGetParentMessages/Receipt API)
prevTs := currTs
Expand Down Expand Up @@ -483,43 +498,186 @@ var inspectEventsCmd = &cli.Command{
if err != nil {
return err
}
stmtSelectEventIdAndEmitter, err := db.Prepare(selectEventIdAndEmitter)
if err != nil {
return err
}
stmtSelectEventEntries, err := db.Prepare(selectEventEntries)
if err != nil {
return err
}

processHeight := func(ctx context.Context, ts *types.TipSet, receipts []*types.MessageReceipt) error {
processHeight := func(ctx context.Context, ts *types.TipSet, messages []lapi.Message, receipts []*types.MessageReceipt) error {
tsKeyCid, err := ts.Key().Cid()
if err != nil {
return fmt.Errorf("failed to get tipset key cid: %w", err)
return xerrors.Errorf("failed to get tipset key cid: %w", err)
}

var expectEvents int
var expectEntries int
var problems []string
var hasEvents bool

for _, receipt := range receipts {
if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil {
continue
if !compareAmt {
// compare by counting events, using ChainGetEvents to load the events from the chain
var expectEvents int
var expectEntries int

for _, receipt := range receipts {
if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil {
continue
}
events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot)
if err != nil {
return xerrors.Errorf("failed to load events for tipset %s: %w", currTs, err)
}
if len(events) > 0 {
hasEvents = true
}
expectEvents += len(events)
for _, event := range events {
expectEntries += len(event.Entries)
}
}
events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot)
if err != nil {
return fmt.Errorf("failed to load events for tipset %s: %w", currTs, err)

var actualEvents int
if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil {
return xerrors.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
}
expectEvents += len(events)
for _, event := range events {
expectEntries += len(event.Entries)
var actualEntries int
if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil {
return xerrors.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
}
}

var problems []string
if actualEvents != expectEvents {
problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents))
}
if actualEntries != expectEntries {
problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries))
}
} else { // compareAmt
addrIdCache := make(map[address.Address]abi.ActorID)
eventIndex := 0
for msgIndex, receipt := range receipts {
if receipt.EventsRoot == nil {
continue
}

amtRoot, problem, err := func() (cid.Cid, string, error) {
events := make([]cbg.CBORMarshaler, 0)

rows, err := stmtSelectEventIdAndEmitter.Query(tsKeyCid.Bytes(), messages[msgIndex].Cid.Bytes())
if err != nil {
return cid.Undef, "", xerrors.Errorf("failed to query events: %w", err)
}
defer func() {
_ = rows.Close()
}()

for rows.Next() {
var eventId int
var emitterAddr []byte
if err := rows.Scan(&eventId, &emitterAddr); err != nil {
return cid.Undef, "", xerrors.Errorf("failed to scan row: %w", err)
}

addr, err := address.NewFromBytes(emitterAddr)
if err != nil {
return cid.Undef, "", xerrors.Errorf("failed to parse address: %w", err)
}
var actorId abi.ActorID
if id, ok := addrIdCache[addr]; ok {
actorId = id
} else {
if addr.Protocol() != address.ID {
idAddr, err := api.StateLookupID(ctx, addr, ts.Key())
if err != nil {
// TODO: fix this? we should be able to resolve all addresses
return cid.Undef, fmt.Sprintf("failed to resolve address (%s), could not compare amt", addr.String()), nil
}
addr = idAddr
}
id, err := address.IDFromAddress(addr)
if err != nil {
return cid.Undef, "", xerrors.Errorf("failed to get ID from address: %w", err)
}
actorId = abi.ActorID(id)
addrIdCache[addr] = actorId
}

event := types.Event{
Emitter: actorId,
Entries: make([]types.EventEntry, 0),
}

rows2, err := stmtSelectEventEntries.Query(eventId)
if err != nil {
return cid.Undef, "", xerrors.Errorf("failed to query event entries: %w", err)
}
defer func() {
_ = rows2.Close()
}()

for rows2.Next() {
var flags []byte
var key string
var codec uint64
var value []byte
if err := rows2.Scan(&flags, &key, &codec, &value); err != nil {
return cid.Undef, "", xerrors.Errorf("failed to scan row: %w", err)
}
entry := types.EventEntry{
Flags: flags[0],
Key: key,
Codec: codec,
Value: value,
}
event.Entries = append(event.Entries, entry)
}

events = append(events, &event)
}

getEvents, err := api.ChainGetEvents(ctx, *receipt.EventsRoot)
if err != nil {
return cid.Undef, "", xerrors.Errorf("failed to load events: %w", err)
}
if len(getEvents) != len(events) {
return cid.Undef, fmt.Sprintf("event count mismatch for message %d (%d != %d)", msgIndex, len(getEvents), len(events)), nil
}
if len(events) > 0 {
hasEvents = true
}

root, err := amt4.FromArray(ctx, cbor.NewCborStore(bstore.NewMemory()), events, amt4.UseTreeBitWidth(types.EventAMTBitwidth))
if err != nil {
return cid.Undef, "", xerrors.Errorf("failed to create AMT: %w", err)
}
return root, "", nil
}()
if err != nil {
return err
}

if problem != "" {
problems = append(problems, problem)
} else if amtRoot != *receipt.EventsRoot {
problems = append(problems, fmt.Sprintf("events root mismatch for event %d", eventIndex))
}

eventIndex++
}
}

var seenHeight int
var seenReverted int
if err := stmtTipsetSeen.QueryRow(tsKeyCid.Bytes()).Scan(&seenHeight, &seenReverted); err != nil {
if err == sql.ErrNoRows {
if expectEvents > 0 {
if hasEvents {
problems = append(problems, "not in events_seen table")
} else {
problems = append(problems, "zero-event epoch not in events_seen table")
}
} else {
return fmt.Errorf("failed to check if tipset is seen: %w", err)
return xerrors.Errorf("failed to check if tipset is seen: %w", err)
}
} else {
if seenHeight != int(ts.Height()) {
Expand All @@ -530,24 +688,8 @@ var inspectEventsCmd = &cli.Command{
}
}

var actualEvents int
if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil {
return fmt.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
}
var actualEntries int
if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil {
return fmt.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
}

if actualEvents != expectEvents {
problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents))
}
if actualEntries != expectEntries {
problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries))
}

if len(problems) > 0 {
_, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", ts.Height(), tsKeyCid, problems)
_, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", ts.Height(), tsKeyCid, strings.Join(problems, ", "))
} else if logGood {
_, _ = fmt.Fprintf(cctx.App.Writer, "✓ Epoch %d (%s)\n", ts.Height(), tsKeyCid)
}
Expand All @@ -556,14 +698,26 @@ var inspectEventsCmd = &cli.Command{
}

for i := 0; ctx.Err() == nil && i < epochs; i++ {
// get receipts for the parent of the previous tipset (which will be currTs)
receipts, err := api.ChainGetParentReceipts(ctx, prevTs.Blocks()[0].Cid())
// get receipts and messages for the parent of the previous tipset (which will be currTs)

blockCid := prevTs.Blocks()[0].Cid()

messages, err := api.ChainGetParentMessages(ctx, blockCid)
if err != nil {
_, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent messages for epoch %d (checked %d epochs)", prevTs.Height(), i)
break
}
receipts, err := api.ChainGetParentReceipts(ctx, blockCid)
if err != nil {
_, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent receipts for epoch %d (checked %d epochs)", prevTs.Height(), i)
break
}

err = processHeight(ctx, currTs, receipts)
if len(messages) != len(receipts) {
return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(messages), len(receipts))
}

err = processHeight(ctx, currTs, messages, receipts)
if err != nil {
return err
}
Expand All @@ -572,7 +726,7 @@ var inspectEventsCmd = &cli.Command{
prevTs = currTs
currTs, err = api.ChainGetTipSet(ctx, currTs.Parents())
if err != nil {
return fmt.Errorf("failed to load tipset %s: %w", currTs, err)
return xerrors.Errorf("failed to load tipset %s: %w", currTs, err)
}
}

Expand Down

0 comments on commit eccb10d

Please sign in to comment.