From c2323adfc4418c530702584fdab02af6c77ff1e1 Mon Sep 17 00:00:00 2001 From: Rod Vagg Date: Thu, 10 Oct 2024 15:50:38 +1100 Subject: [PATCH] feat(events): compare-amt option for lotus-shed indexes inspect-events Instead of relying just on entry counts, compare the regenerated AMT root using just what we have in the db with the message receipt event root. This should tell us precisely that we have what we should or not. Ref: https://github.com/filecoin-project/lotus/issues/12570 --- CHANGELOG.md | 1 + cmd/lotus-shed/indexes.go | 213 +++++++++++++++++++++++++++++++------- 2 files changed, 176 insertions(+), 38 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ab7a81ef3..6c8dbce435 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ * Implement [FIP-0081](https://github.com/filecoin-project/FIPs/blob/master/FIPS/fip-0081.md) and its migration for NV24. Initial pledge collateral will now be calculated using a 70% / 30% split between "simple" and "baseline" in the initial consensus pledge contribution to collateral calculation. The change in this calculation will begin at NV24 activation and ramp up from the current split of 100% / 0% to the eventual 70% / 30% over the course of a year so as to minimise impact on existing operations. ([filecoin-project/lotus#12526](https://github.com/filecoin-project/lotus/pull/12526) * Update to F3 0.4.0 ([filecoin-project/lotus#12547](https://github.com/filecoin-project/lotus/pull/12547)). This includes additional performance enhancements and bug fixes. * [Ticket-based F3 participation API](https://github.com/filecoin-project/lotus/pull/12531): This update introduces a new design where participation tickets grant a temporary lease, allowing storage providers to sign as part of a single GPBFT instance at any given point in time. This design ensures that tickets are checked for validity and issuer alignment, handling errors robustly in order to avoid self-equivocation during GPBFT instances. 
+* `lotus-shed indexes inspect-events` now has a `--compare-amt` flag that will do a comprehensive comparison of the event index data for each message by comparing the AMT root CID from the message receipt with the root of a reconstructed AMT. ([filecoin-project/lotus#12570](https://github.com/filecoin-project/lotus/pull/12570)) ## Improvements diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 4d8e76e138..1b6667688b 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -10,16 +10,21 @@ import ( "strings" "time" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" + cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + amt4 "github.com/filecoin-project/go-amt-ipld/v4" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/exitcode" lapi "github.com/filecoin-project/lotus/api" + bstore "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/ethtypes" lcli "github.com/filecoin-project/lotus/cli" @@ -34,6 +39,10 @@ const ( insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?` + + // these queries are used to extract just the information used to reconstruct the event AMT from the database + selectEventIdAndEmitter = `SELECT id, emitter_addr FROM event WHERE tipset_key_cid=? and message_cid=? ORDER BY event_index ASC` + selectEventEntries = `SELECT flags, key, codec, value FROM event_entry WHERE event_id=? 
ORDER BY _rowid_ ASC` ) func withCategory(cat string, cmd *cli.Command) *cli.Command { @@ -415,6 +424,11 @@ var inspectEventsCmd = &cli.Command{ Usage: "log tipsets that have no detected problems", Value: false, }, + &cli.BoolFlag{ + Name: "compare-amt", + Usage: "compare the reconstructed event AMT with the on-chain AMT", + Value: false, + }, }, Action: func(cctx *cli.Context) error { srv, err := lcli.GetFullNodeServices(cctx) @@ -440,6 +454,7 @@ var inspectEventsCmd = &cli.Command{ } logGood := cctx.Bool("log-good") + compareAmt := cctx.Bool("compare-amt") // advance currTs by one epoch and maintain prevTs as the previous tipset (this allows us to easily use the ChainGetParentMessages/Receipt API) prevTs := currTs @@ -483,43 +498,169 @@ var inspectEventsCmd = &cli.Command{ if err != nil { return err } + stmtSelectEventIdAndEmitter, err := db.Prepare(selectEventIdAndEmitter) + if err != nil { + return err + } + stmtSelectEventEntries, err := db.Prepare(selectEventEntries) + if err != nil { + return err + } - processHeight := func(ctx context.Context, ts *types.TipSet, receipts []*types.MessageReceipt) error { + processHeight := func(ctx context.Context, ts *types.TipSet, messages []lapi.Message, receipts []*types.MessageReceipt) error { tsKeyCid, err := ts.Key().Cid() if err != nil { - return fmt.Errorf("failed to get tipset key cid: %w", err) + return xerrors.Errorf("failed to get tipset key cid: %w", err) } - var expectEvents int - var expectEntries int + var problems []string + var hasEvents bool - for _, receipt := range receipts { - if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil { - continue + if !compareAmt { + // comapre by counting events, using ChainGetEvents to load the events from the chain + var expectEvents int + var expectEntries int + + for _, receipt := range receipts { + if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil { + continue + } + events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot) + if err != nil { + 
return xerrors.Errorf("failed to load events for tipset %s: %w", currTs, err) + } + if len(events) > 0 { + hasEvents = true + } + expectEvents += len(events) + for _, event := range events { + expectEntries += len(event.Entries) + } } - events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot) - if err != nil { - return fmt.Errorf("failed to load events for tipset %s: %w", currTs, err) + + var actualEvents int + if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil { + return xerrors.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) } - expectEvents += len(events) - for _, event := range events { - expectEntries += len(event.Entries) + var actualEntries int + if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil { + return xerrors.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) } - } - var problems []string + if actualEvents != expectEvents { + problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents)) + } + if actualEntries != expectEntries { + problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries)) + } + } else { // compareAmt + eventIndex := 0 + for msgIndex, receipt := range receipts { + if receipt.EventsRoot == nil { + continue + } + + amtRoot, err := func() (cid.Cid, error) { + events := make([]cbg.CBORMarshaler, 0) + + rows, err := stmtSelectEventIdAndEmitter.Query(tsKeyCid.Bytes(), messages[msgIndex].Cid.Bytes()) + if err != nil { + return cid.Undef, xerrors.Errorf("failed to query events: %w", err) + } + defer func() { + _ = rows.Close() + }() + + for rows.Next() { + var eventId int + var emitterAddr []byte + if err := rows.Scan(&eventId, &emitterAddr); err != nil { + return cid.Undef, xerrors.Errorf("failed to scan row: %w", err) + } + + addr, err := address.NewFromBytes(emitterAddr) + if err != nil { + return cid.Undef, 
xerrors.Errorf("failed to parse address: %w", err) + } + idAddr, err := api.StateLookupID(ctx, addr, ts.Key()) + if err != nil { + return cid.Undef, xerrors.Errorf("failed to resolve address: %w", err) + } + id, err := address.IDFromAddress(idAddr) + if err != nil { + return cid.Undef, xerrors.Errorf("failed to get ID from address: %w", err) + } + + event := types.Event{ + Emitter: abi.ActorID(id), + Entries: make([]types.EventEntry, 0), + } + + rows2, err := stmtSelectEventEntries.Query(eventId) + if err != nil { + return cid.Undef, xerrors.Errorf("failed to query event entries: %w", err) + } + defer func() { + _ = rows2.Close() + }() + + for rows2.Next() { + var flags []byte + var key string + var codec uint64 + var value []byte + if err := rows2.Scan(&flags, &key, &codec, &value); err != nil { + return cid.Undef, xerrors.Errorf("failed to scan row: %w", err) + } + entry := types.EventEntry{ + Flags: flags[0], + Key: key, + Codec: codec, + Value: value, + } + event.Entries = append(event.Entries, entry) + } + + events = append(events, &event) + } + + getEvents, err := api.ChainGetEvents(ctx, *receipt.EventsRoot) + if err != nil { + return cid.Undef, xerrors.Errorf("failed to load events: %w", err) + } + if len(getEvents) != len(events) { + // this will show up in the mismatched CID, but is interesting for debugging + problems = append(problems, fmt.Sprintf("event count mismatch for message %d (%d != %d)", msgIndex, len(getEvents), len(events))) + } + if len(events) > 0 { + hasEvents = true + } + + return amt4.FromArray(ctx, cbor.NewCborStore(bstore.NewMemory()), events, amt4.UseTreeBitWidth(types.EventAMTBitwidth)) + }() + if err != nil { + return err + } + + if amtRoot != *receipt.EventsRoot { + problems = append(problems, fmt.Sprintf("events root mismatch for event %d", eventIndex)) + } + + eventIndex++ + } + } var seenHeight int var seenReverted int if err := stmtTipsetSeen.QueryRow(tsKeyCid.Bytes()).Scan(&seenHeight, &seenReverted); err != nil { if err == 
sql.ErrNoRows { - if expectEvents > 0 { + if hasEvents { problems = append(problems, "not in events_seen table") } else { problems = append(problems, "zero-event epoch not in events_seen table") } } else { - return fmt.Errorf("failed to check if tipset is seen: %w", err) + return xerrors.Errorf("failed to check if tipset is seen: %w", err) } } else { if seenHeight != int(ts.Height()) { @@ -530,24 +671,8 @@ var inspectEventsCmd = &cli.Command{ } } - var actualEvents int - if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil { - return fmt.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) - } - var actualEntries int - if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil { - return fmt.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) - } - - if actualEvents != expectEvents { - problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents)) - } - if actualEntries != expectEntries { - problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries)) - } - if len(problems) > 0 { - _, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", ts.Height(), tsKeyCid, problems) + _, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", ts.Height(), tsKeyCid, strings.Join(problems, ", ")) } else if logGood { _, _ = fmt.Fprintf(cctx.App.Writer, "✓ Epoch %d (%s)\n", ts.Height(), tsKeyCid) } @@ -556,14 +681,26 @@ var inspectEventsCmd = &cli.Command{ } for i := 0; ctx.Err() == nil && i < epochs; i++ { - // get receipts for the parent of the previous tipset (which will be currTs) - receipts, err := api.ChainGetParentReceipts(ctx, prevTs.Blocks()[0].Cid()) + // get receipts and messages for the parent of the previous tipset (which will be currTs) + + blockCid := prevTs.Blocks()[0].Cid() + + messages, err := api.ChainGetParentMessages(ctx, blockCid) + if err != nil 
{ + _, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent messages for epoch %d (checked %d epochs)", prevTs.Height(), i) + break + } + receipts, err := api.ChainGetParentReceipts(ctx, blockCid) if err != nil { _, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent receipts for epoch %d (checked %d epochs)", prevTs.Height(), i) break } - err = processHeight(ctx, currTs, receipts) + if len(messages) != len(receipts) { + return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(messages), len(receipts)) + } + + err = processHeight(ctx, currTs, messages, receipts) if err != nil { return err } @@ -572,7 +709,7 @@ var inspectEventsCmd = &cli.Command{ prevTs = currTs currTs, err = api.ChainGetTipSet(ctx, currTs.Parents()) if err != nil { - return fmt.Errorf("failed to load tipset %s: %w", currTs, err) + return xerrors.Errorf("failed to load tipset %s: %w", currTs, err) } }