Skip to content

Commit

Permalink
feat(events): compare-amt option for lotus-shed indexes inspect-events
Browse files Browse the repository at this point in the history
Instead of relying just on entry counts, compare the regenerated AMT root using
just what we have in the db with the message receipt event root. This should
tell us precisely that we have what we should or not.

Ref: #12570
  • Loading branch information
rvagg committed Oct 17, 2024
1 parent d516f3a commit c4a83ec
Show file tree
Hide file tree
Showing 2 changed files with 241 additions and 41 deletions.
3 changes: 1 addition & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,13 @@
- Add ChainSafe operated Calibration archival node to the bootstrap list ([filecoin-project/lotus#12517](https://github.com/filecoin-project/lotus/pull/12517))
- Fix hotloop in F3 pariticpation API ([filecoin-project/lotus#12575](https://github.com/filecoin-project/lotus/pull/12575))
- `lotus chain head` now supports a `--height` flag to print just the epoch number of the current chain head ([filecoin-project/lotus#12609](https://github.com/filecoin-project/lotus/pull/12609))
- `lotus-shed indexes inspect-indexes` now performs a comprehensive comparison of the event index data for each message by comparing the AMT root CID from the message receipt with the root of a reconstructed AMT. Previously `inspect-indexes` simply compared event counts, comparing AMT roots confirms all the event data is byte-perfect. ([filecoin-project/lotus#12570](https://github.com/filecoin-project/lotus/pull/12570))

## Bug Fixes
- Fix a bug in the `lotus-shed indexes backfill-events` command that may result in either duplicate events being backfilled where there are existing events (such an operation *should* be idempotent) or events erroneously having duplicate `logIndex` values when queried via ETH APIs. ([filecoin-project/lotus#12567](https://github.com/filecoin-project/lotus/pull/12567))
- Event APIs (Eth events and actor events) should only return reverted events if client queries by specific block hash / tipset. Eth and actor event subscription APIs should always return reverted events to enable accurate observation of real-time changes. ([filecoin-project/lotus#12585](https://github.com/filecoin-project/lotus/pull/12585))
- Add logic to check if the miner's owner address is delegated (f4 address). If it is delegated, the `lotus-shed sectors termination-estimate` command now sends the termination state call using the worker ID. This fix resolves the issue where termination-estimate did not function correctly for miners with delegated owner addresses. ([filecoin-project/lotus#12569](https://github.com/filecoin-project/lotus/pull/12569))

## Improvements

## Deps

# UNRELEASED Node v1.30.0
Expand Down
279 changes: 240 additions & 39 deletions cmd/lotus-shed/indexes.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,21 @@ import (
"strings"
"time"

"github.com/ipfs/go-cid"
cbor "github.com/ipfs/go-ipld-cbor"
"github.com/mitchellh/go-homedir"
"github.com/urfave/cli/v2"
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
amt4 "github.com/filecoin-project/go-amt-ipld/v4"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/exitcode"

lapi "github.com/filecoin-project/lotus/api"
bstore "github.com/filecoin-project/lotus/blockstore"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/ethtypes"
lcli "github.com/filecoin-project/lotus/cli"
Expand All @@ -34,6 +39,10 @@ const (
insertEntry = `INSERT OR IGNORE INTO event_entry(event_id, indexed, flags, key, codec, value) VALUES(?, ?, ?, ?, ?, ?)`
upsertEventsSeen = `INSERT INTO events_seen(height, tipset_key_cid, reverted) VALUES(?, ?, false) ON CONFLICT(height, tipset_key_cid) DO UPDATE SET reverted=false`
tipsetSeen = `SELECT height,reverted FROM events_seen WHERE tipset_key_cid=?`

// these queries are used to extract just the information used to reconstruct the event AMT from the database
selectEventIdAndEmitter = `SELECT id, emitter_addr FROM event WHERE tipset_key_cid=? and message_cid=? ORDER BY event_index ASC`
selectEventEntries = `SELECT flags, key, codec, value FROM event_entry WHERE event_id=? ORDER BY _rowid_ ASC`
)

func withCategory(cat string, cmd *cli.Command) *cli.Command {
Expand Down Expand Up @@ -483,87 +492,145 @@ var inspectEventsCmd = &cli.Command{
if err != nil {
return err
}
stmtSelectEventIdAndEmitter, err := db.Prepare(selectEventIdAndEmitter)
if err != nil {
return err
}
stmtSelectEventEntries, err := db.Prepare(selectEventEntries)
if err != nil {
return err
}

processHeight := func(ctx context.Context, ts *types.TipSet, receipts []*types.MessageReceipt) error {
tsKeyCid, err := ts.Key().Cid()
processHeight := func(ctx context.Context, messages []lapi.Message, receipts []*types.MessageReceipt) error {
tsKeyCid, err := currTs.Key().Cid()
if err != nil {
return fmt.Errorf("failed to get tipset key cid: %w", err)
return xerrors.Errorf("failed to get tipset key cid: %w", err)
}

var expectEvents int
var expectEntries int
var problems []string
var hasEvents bool

for _, receipt := range receipts {
if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil {
continue
checkEventAndEntryCounts := func() error {
// compare by counting events, using ChainGetEvents to load the events from the chain
expectEvents, expectEntries, err := chainEventAndEntryCountsAt(ctx, currTs, receipts, api)
if err != nil {
return err
}
events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot)
if expectEvents > 0 {
hasEvents = true
}

actualEvents, actualEntries, err := dbEventAndEntryCountsAt(currTs, stmtEventCount, stmtEntryCount)
if err != nil {
return fmt.Errorf("failed to load events for tipset %s: %w", currTs, err)
return err
}
expectEvents += len(events)
for _, event := range events {
expectEntries += len(event.Entries)

if actualEvents != expectEvents {
problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents))
}
if actualEntries != expectEntries {
problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries))
}

return nil
}

var problems []string
// Compare the AMT roots: we reconstruct the event AMT from the database data we have and
// compare it with the on-chain AMT root from the receipt. If it's the same CID then we have
// exactly the same event data. Any variation, in number of events, and even a single byte
// in event data, will be considered a mismatch.

// cache for address -> actorID because it's typical for tipsets to generate many events for
// the same actors so we can try and avoid too many StateLookupID calls
addrIdCache := make(map[address.Address]abi.ActorID)

eventIndex := 0
for msgIndex, receipt := range receipts {
if receipt.EventsRoot == nil {
continue
}

amtRoot, has, problem, err := amtRootForEvents(
ctx,
api,
tsKeyCid,
prevTs.Key(),
stmtSelectEventIdAndEmitter,
stmtSelectEventEntries,
messages[msgIndex],
addrIdCache,
)
if err != nil {
return err
}
if has && !hasEvents {
hasEvents = true
}

if problem != "" {
problems = append(problems, problem)
} else if amtRoot != *receipt.EventsRoot {
problems = append(problems, fmt.Sprintf("events root mismatch for event %d", eventIndex))
// also provide more information about the mismatch
if err := checkEventAndEntryCounts(); err != nil {
return err
}
}

eventIndex++
}

var seenHeight int
var seenReverted int
if err := stmtTipsetSeen.QueryRow(tsKeyCid.Bytes()).Scan(&seenHeight, &seenReverted); err != nil {
if err == sql.ErrNoRows {
if expectEvents > 0 {
if hasEvents {
problems = append(problems, "not in events_seen table")
} else {
problems = append(problems, "zero-event epoch not in events_seen table")
}
} else {
return fmt.Errorf("failed to check if tipset is seen: %w", err)
return xerrors.Errorf("failed to check if tipset is seen: %w", err)
}
} else {
if seenHeight != int(ts.Height()) {
if seenHeight != int(currTs.Height()) {
problems = append(problems, fmt.Sprintf("events_seen height mismatch (%d)", seenHeight))
}
if seenReverted != 0 {
problems = append(problems, "events_seen marked as reverted")
}
}

var actualEvents int
if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil {
return fmt.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
}
var actualEntries int
if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil {
return fmt.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
}

if actualEvents != expectEvents {
problems = append(problems, fmt.Sprintf("expected %d events, got %d", expectEvents, actualEvents))
}
if actualEntries != expectEntries {
problems = append(problems, fmt.Sprintf("expected %d entries, got %d", expectEntries, actualEntries))
}

if len(problems) > 0 {
_, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", ts.Height(), tsKeyCid, problems)
_, _ = fmt.Fprintf(cctx.App.Writer, "✗ Epoch %d (%s): %s\n", currTs.Height(), tsKeyCid, strings.Join(problems, ", "))
} else if logGood {
_, _ = fmt.Fprintf(cctx.App.Writer, "✓ Epoch %d (%s)\n", ts.Height(), tsKeyCid)
_, _ = fmt.Fprintf(cctx.App.Writer, "✓ Epoch %d (%s)\n", currTs.Height(), tsKeyCid)
}

return nil
}

for i := 0; ctx.Err() == nil && i < epochs; i++ {
// get receipts for the parent of the previous tipset (which will be currTs)
receipts, err := api.ChainGetParentReceipts(ctx, prevTs.Blocks()[0].Cid())
// get receipts and messages for the parent of the previous tipset (which will be currTs)

blockCid := prevTs.Blocks()[0].Cid()

messages, err := api.ChainGetParentMessages(ctx, blockCid)
if err != nil {
_, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent messages for epoch %d (checked %d epochs)", prevTs.Height(), i)
break
}
receipts, err := api.ChainGetParentReceipts(ctx, blockCid)
if err != nil {
_, _ = fmt.Fprintf(cctx.App.ErrWriter, "Missing parent receipts for epoch %d (checked %d epochs)", prevTs.Height(), i)
break
}

err = processHeight(ctx, currTs, receipts)
if len(messages) != len(receipts) {
return fmt.Errorf("mismatched in message and receipt count: %d != %d", len(messages), len(receipts))
}

err = processHeight(ctx, messages, receipts)
if err != nil {
return err
}
Expand All @@ -572,14 +639,148 @@ var inspectEventsCmd = &cli.Command{
prevTs = currTs
currTs, err = api.ChainGetTipSet(ctx, currTs.Parents())
if err != nil {
return fmt.Errorf("failed to load tipset %s: %w", currTs, err)
return xerrors.Errorf("failed to load tipset %s: %w", currTs, err)
}
}

return nil
},
}

// amtRootForEvents generates the events AMT root CID for a given message's events, and returns
// whether the message has events, a string describing any non-fatal problem encountered,
// and a fatal error if one occurred.
func amtRootForEvents(
ctx context.Context,
api lapi.FullNode,
tsKeyCid cid.Cid,
prevTsKey types.TipSetKey,
stmtSelectEventIdAndEmitter, stmtSelectEventEntries *sql.Stmt,
message lapi.Message,
addrIdCache map[address.Address]abi.ActorID,
) (cid.Cid, bool, string, error) {

events := make([]cbg.CBORMarshaler, 0)

rows, err := stmtSelectEventIdAndEmitter.Query(tsKeyCid.Bytes(), message.Cid.Bytes())
if err != nil {
return cid.Undef, false, "", xerrors.Errorf("failed to query events: %w", err)
}
defer func() {
_ = rows.Close()
}()

for rows.Next() {
var eventId int
var emitterAddr []byte
if err := rows.Scan(&eventId, &emitterAddr); err != nil {
return cid.Undef, false, "", xerrors.Errorf("failed to scan row: %w", err)
}

addr, err := address.NewFromBytes(emitterAddr)
if err != nil {
return cid.Undef, false, "", xerrors.Errorf("failed to parse address: %w", err)
}
var actorId abi.ActorID
if id, ok := addrIdCache[addr]; ok {
actorId = id
} else {
if addr.Protocol() != address.ID {
// use the previous tipset (height+1) to do an address lookup because the actor
// may have been created in the current tipset (i.e. deferred execution means the
// changed state isn't available until the next epoch)
idAddr, err := api.StateLookupID(ctx, addr, prevTsKey)
if err != nil {
// TODO: fix this? we should be able to resolve all addresses
return cid.Undef, false, fmt.Sprintf("failed to resolve address (%s), could not compare amt", addr.String()), nil
}
addr = idAddr
}
id, err := address.IDFromAddress(addr)
if err != nil {
return cid.Undef, false, "", xerrors.Errorf("failed to get ID from address: %w", err)
}
actorId = abi.ActorID(id)
addrIdCache[addr] = actorId
}

event := types.Event{
Emitter: actorId,
Entries: make([]types.EventEntry, 0),
}

rows2, err := stmtSelectEventEntries.Query(eventId)
if err != nil {
return cid.Undef, false, "", xerrors.Errorf("failed to query event entries: %w", err)
}
defer func() {
_ = rows2.Close()
}()

for rows2.Next() {
var flags []byte
var key string
var codec uint64
var value []byte
if err := rows2.Scan(&flags, &key, &codec, &value); err != nil {
return cid.Undef, false, "", xerrors.Errorf("failed to scan row: %w", err)
}
entry := types.EventEntry{
Flags: flags[0],
Key: key,
Codec: codec,
Value: value,
}
event.Entries = append(event.Entries, entry)
}

events = append(events, &event)
}

// construct the AMT from our slice to an in-memory IPLD store just so we can get the root,
// we don't need the blocks themselves
root, err := amt4.FromArray(ctx, cbor.NewCborStore(bstore.NewMemory()), events, amt4.UseTreeBitWidth(types.EventAMTBitwidth))
if err != nil {
return cid.Undef, false, "", xerrors.Errorf("failed to create AMT: %w", err)
}
return root, len(events) > 0, "", nil
}

func chainEventAndEntryCountsAt(ctx context.Context, ts *types.TipSet, receipts []*types.MessageReceipt, api lapi.FullNode) (int, int, error) {
var expectEvents int
var expectEntries int
for _, receipt := range receipts {
if receipt.ExitCode != exitcode.Ok || receipt.EventsRoot == nil {
continue
}
events, err := api.ChainGetEvents(ctx, *receipt.EventsRoot)
if err != nil {
return 0, 0, xerrors.Errorf("failed to load events for tipset %s: %w", ts, err)
}
expectEvents += len(events)
for _, event := range events {
expectEntries += len(event.Entries)
}
}
return expectEvents, expectEntries, nil
}

func dbEventAndEntryCountsAt(ts *types.TipSet, stmtEventCount, stmtEntryCount *sql.Stmt) (int, int, error) {
tsKeyCid, err := ts.Key().Cid()
if err != nil {
return 0, 0, xerrors.Errorf("failed to get tipset key cid: %w", err)
}
var actualEvents int
if err := stmtEventCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEvents); err != nil {
return 0, 0, xerrors.Errorf("failed to count events for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
}
var actualEntries int
if err := stmtEntryCount.QueryRow(tsKeyCid.Bytes()).Scan(&actualEntries); err != nil {
return 0, 0, xerrors.Errorf("failed to count entries for epoch %d (tsk CID %s): %w", ts.Height(), tsKeyCid, err)
}
return actualEvents, actualEntries, nil
}

var backfillMsgIndexCmd = &cli.Command{
Name: "backfill-msgindex",
Usage: "Backfill the msgindex.db for a number of epochs starting from a specified height",
Expand Down

0 comments on commit c4a83ec

Please sign in to comment.