diff --git a/CHANGELOG.md b/CHANGELOG.md index 7fa4de71bd..83347a961a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,7 @@ - Reduce size of embedded genesis CAR files by removing WASM actor blocks and compressing with zstd. This reduces the `lotus` binary size by approximately 10 MiB. ([filecoin-project/lotus#12439](https://github.com/filecoin-project/lotus/pull/12439)) - Add ChainSafe operated Calibration archival node to the bootstrap list ([filecoin-project/lotus#12517](https://github.com/filecoin-project/lotus/pull/12517)) - Fix hotloop in F3 pariticpation API ([filecoin-project/lotus#12575](https://github.com/filecoin-project/lotus/pull/12575)) +- `lotus-shed indexes inspect-indexes` now performs a comprehensive comparison of the event index data for each message by comparing the AMT root CID from the message receipt with the root of a reconstructed AMT. Previously `inspect-indexes` simply compared event counts, comparing AMT roots confirms all the event data is byte-perfect. ([filecoin-project/lotus#12570](https://github.com/filecoin-project/lotus/pull/12570)) ## Bug Fixes - Fix a bug in the `lotus-shed indexes backfill-events` command that may result in either duplicate events being backfilled where there are existing events (such an operation *should* be idempotent) or events erroneously having duplicate `logIndex` values when queried via ETH APIs. ([filecoin-project/lotus#12567](https://github.com/filecoin-project/lotus/pull/12567)) @@ -14,8 +15,6 @@ ## Improvements -## Improvements - ## Deps # UNRELEASED Node v1.30.0 diff --git a/cmd/lotus-shed/indexes.go b/cmd/lotus-shed/indexes.go index 4d8e76e138..9b9181b4be 100644 --- a/cmd/lotus-shed/indexes.go +++ b/cmd/lotus-shed/indexes.go @@ -10,16 +10,21 @@ import ( "strings" "time" + "github.com/ipfs/go-cid" + cbor "github.com/ipfs/go-ipld-cbor" "github.com/mitchellh/go-homedir" "github.com/urfave/cli/v2" + cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + amt4 "github.com/filecoin-project/go-amt-ipld/v4" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/exitcode" lapi "github.com/filecoin-project/lotus/api" + bstore "github.com/filecoin-project/lotus/blockstore" "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/chain/types/ethtypes" lcli "github.com/filecoin-project/lotus/cli" @@ -34,6 +39,10 @@ const ( insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)` upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false` tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?` + + // these queries are used to extract just the information used to reconstruct the event AMT from the database + selectEventIdAndEmitter = `SELECT id, emitter_addr FROM event WHERE tipset_key_cid=? and message_cid=? ORDER BY event_index ASC` + selectEventEntries = `SELECT flags, key, codec, value FROM event_entry WHERE event_id=? ORDER BY _rowid_ ASC` ) func withCategory(cat string, cmd *cli.Command) *cli.Command { @@ -483,46 +492,108 @@ var inspectEventsCmd = &cli.Command{ if err != nil { return err } + stmtSelectEventIdAndEmitter, err := db.Prepare(selectEventIdAndEmitter) + if err != nil { + return err + } + stmtSelectEventEntries, err := db.Prepare(selectEventEntries) + if err != nil { + return err + } - processHeight := func(ctx context.Context, ts *types.TipSet, receipts []*types.MessageReceipt) error { - tsKeyCid, err := ts.Key().Cid() + processHeight := func(ctx context.Context, messages []lapi.Message, receipts []*types.MessageReceipt) error { + tsKeyCid, err := currTs.Key().Cid() if err != nil { - return fmt.Errorf("failed to get tipset key cid: %w", err) + return xerrors.Errorf("failed to get tipset key cid: %w", err) } - var expectEvents int - var expectEntries int + var problems []string + var hasEvents bool - for _, receipt := range receipts { - if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil { - continue + checkEventAndEntryCounts := func() error { + // compare by counting events, using ChainGetEvents to load the events from the chain + expectEvents, expectEntries, err := chainEventAndEntryCountsAt(ctx, currTs, receipts, api) + if err != nil { + return err } - events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot) + if expectEvents > 0 { + hasEvents = true + } + + actualEvents, actualEntries, err := dbEventAndEntryCountsAt(currTs, stmtEventCount, stmtEntryCount) if err != nil { - return fmt.Errorf("failed to load events for tipset %s: %w", currTs, err) + return err } - expectEvents += len(events) - for _, event := range events { - expectEntries += len(event.Entries) + + if actualEvents != expectEvents { + problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents)) + } + if actualEntries != expectEntries { + problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries)) } + + return nil } - var problems []string + // Compare the AMT roots: we reconstruct the event AMT from the database data we have and + // compare it with the on-chain AMT root from the receipt. If it's the same CID then we have + // exactly the same event data. Any variation, in number of events, and even a single byte + // in event data, will be considered a mismatch. + + // cache for address -> actorID because it's typical for tipsets to generate many events for + // the same actors so we can try and avoid too many StateLookupID calls + addrIdCache := make(map[address.Address]abi.ActorID) + + eventIndex := 0 + for msgIndex, receipt := range receipts { + if receipt.EventsRoot == nil { + continue + } + + amtRoot, has, problem, err := amtRootForEvents( + ctx, + api, + tsKeyCid, + prevTs.Key(), + stmtSelectEventIdAndEmitter, + stmtSelectEventEntries, + messages[msgIndex], + addrIdCache, + ) + if err != nil { + return err + } + if has && !hasEvents { + hasEvents = true + } + + if problem != "" { + problems = append(problems, problem) + } else if amtRoot != *receipt.EventsRoot { + problems = append(problems, fmt.Sprintf("events root mismatch for event %d", eventIndex)) + // also provide more information about the mismatch + if err := checkEventAndEntryCounts(); err != nil { + return err + } + } + + eventIndex++ + } var seenHeight int var seenReverted int if err := stmtTipsetSeen.QueryRow(tsKeyCid.Bytes()).Scan(&seenHeight, &seenReverted); err != nil { if err == sql.ErrNoRows { - if expectEvents > 0 { + if hasEvents { problems = append(problems, "not in events_seen table") } else { problems = append(problems, "zero-event epoch not in events_seen table") } } else { - return fmt.Errorf("failed to check if tipset is seen: %w", err) + return xerrors.Errorf("failed to check if tipset is seen: %w", err) } } else { - if seenHeight != int(ts.Height()) { + if seenHeight != int(currTs.Height()) { problems = append(problems, fmt.Sprintf("events_seen height mismatch (%d)", seenHeight)) } if seenReverted != 0 { @@ -530,40 +601,36 @@ var inspectEventsCmd = &cli.Command{ } } - var actualEvents int - if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil { - return fmt.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) - } - var actualEntries int - if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil { - return fmt.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) - } - - if actualEvents != expectEvents { - problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents)) - } - if actualEntries != expectEntries { - problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries)) - } - if len(problems) > 0 { - _, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", ts.Height(), tsKeyCid, problems) + _, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", currTs.Height(), tsKeyCid, strings.Join(problems, ", ")) } else if logGood { - _, _ = fmt.Fprintf(cctx.App.Writer, "✓ Epoch %d (%s)\n", ts.Height(), tsKeyCid) + _, _ = fmt.Fprintf(cctx.App.Writer, "✓ Epoch %d (%s)\n", currTs.Height(), tsKeyCid) } return nil } for i := 0; ctx.Err() == nil && i < epochs; i++ { - // get receipts for the parent of the previous tipset (which will be currTs) - receipts, err := api.ChainGetParentReceipts(ctx, prevTs.Blocks()[0].Cid()) + // get receipts and messages for the parent of the previous tipset (which will be currTs) + + blockCid := prevTs.Blocks()[0].Cid() + + messages, err := api.ChainGetParentMessages(ctx, blockCid) + if err != nil { + _, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent messages for epoch %d (checked %d epochs)", prevTs.Height(), i) + break + } + receipts, err := api.ChainGetParentReceipts(ctx, blockCid) if err != nil { _, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent receipts for epoch %d (checked %d epochs)", prevTs.Height(), i) break } - err = processHeight(ctx, currTs, receipts) + if len(messages) != len(receipts) { + return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(messages), len(receipts)) + } + + err = processHeight(ctx, messages, receipts) if err != nil { return err } @@ -572,7 +639,7 @@ var inspectEventsCmd = &cli.Command{ prevTs = currTs currTs, err = api.ChainGetTipSet(ctx, currTs.Parents()) if err != nil { - return fmt.Errorf("failed to load tipset %s: %w", currTs, err) + return xerrors.Errorf("failed to load tipset %s: %w", currTs, err) } } @@ -580,6 +647,140 @@ var inspectEventsCmd = &cli.Command{ }, } +// amtRootForEvents generates the events AMT root CID for a given message's events, and returns +// whether the message has events, a string describing any non-fatal problem encountered, +// and a fatal error if one occurred. +func amtRootForEvents( + ctx context.Context, + api lapi.FullNode, + tsKeyCid cid.Cid, + prevTsKey types.TipSetKey, + stmtSelectEventIdAndEmitter, stmtSelectEventEntries *sql.Stmt, + message lapi.Message, + addrIdCache map[address.Address]abi.ActorID, +) (cid.Cid, bool, string, error) { + + events := make([]cbg.CBORMarshaler, 0) + + rows, err := stmtSelectEventIdAndEmitter.Query(tsKeyCid.Bytes(), message.Cid.Bytes()) + if err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to query events: %w", err) + } + defer func() { + _ = rows.Close() + }() + + for rows.Next() { + var eventId int + var emitterAddr []byte + if err := rows.Scan(&eventId, &emitterAddr); err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to scan row: %w", err) + } + + addr, err := address.NewFromBytes(emitterAddr) + if err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to parse address: %w", err) + } + var actorId abi.ActorID + if id, ok := addrIdCache[addr]; ok { + actorId = id + } else { + if addr.Protocol() != address.ID { + // use the previous tipset (height+1) to do an address lookup because the actor + // may have been created in the current tipset (i.e. deferred execution means the + // changed state isn't available until the next epoch) + idAddr, err := api.StateLookupID(ctx, addr, prevTsKey) + if err != nil { + // TODO: fix this? we should be able to resolve all addresses + return cid.Undef, false, fmt.Sprintf("failed to resolve address (%s), could not compare amt", addr.String()), nil + } + addr = idAddr + } + id, err := address.IDFromAddress(addr) + if err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to get ID from address: %w", err) + } + actorId = abi.ActorID(id) + addrIdCache[addr] = actorId + } + + event := types.Event{ + Emitter: actorId, + Entries: make([]types.EventEntry, 0), + } + + rows2, err := stmtSelectEventEntries.Query(eventId) + if err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to query event entries: %w", err) + } + defer func() { + _ = rows2.Close() + }() + + for rows2.Next() { + var flags []byte + var key string + var codec uint64 + var value []byte + if err := rows2.Scan(&flags, &key, &codec, &value); err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to scan row: %w", err) + } + entry := types.EventEntry{ + Flags: flags[0], + Key: key, + Codec: codec, + Value: value, + } + event.Entries = append(event.Entries, entry) + } + + events = append(events, &event) + } + + // construct the AMT from our slice to an in-memory IPLD store just so we can get the root, + // we don't need the blocks themselves + root, err := amt4.FromArray(ctx, cbor.NewCborStore(bstore.NewMemory()), events, amt4.UseTreeBitWidth(types.EventAMTBitwidth)) + if err != nil { + return cid.Undef, false, "", xerrors.Errorf("failed to create AMT: %w", err) + } + return root, len(events) > 0, "", nil +} + +func chainEventAndEntryCountsAt(ctx context.Context, ts *types.TipSet, receipts []*types.MessageReceipt, api lapi.FullNode) (int, int, error) { + var expectEvents int + var expectEntries int + for _, receipt := range receipts { + if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil { + continue + } + events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot) + if err != nil { + return 0, 0, xerrors.Errorf("failed to load events for tipset %s: %w", ts, err) + } + expectEvents += len(events) + for _, event := range events { + expectEntries += len(event.Entries) + } + } + return expectEvents, expectEntries, nil +} + +func dbEventAndEntryCountsAt(ts *types.TipSet, stmtEventCount, stmtEntryCount *sql.Stmt) (int, int, error) { + tsKeyCid, err := ts.Key().Cid() + if err != nil { + return 0, 0, xerrors.Errorf("failed to get tipset key cid: %w", err) + } + var actualEvents int + if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil { + return 0, 0, xerrors.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) + } + var actualEntries int + if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil { + return 0, 0, xerrors.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err) + } + return actualEvents, actualEntries, nil +} + var backfillMsgIndexCmd = &cli.Command{ Name: "backfill-msgindex", Usage: "Backfill the msgindex.db for a number of epochs starting from a specified height",