diff --git a/.changeset/strong-bags-yawn.md b/.changeset/strong-bags-yawn.md new file mode 100644 index 000000000..354247bfb --- /dev/null +++ b/.changeset/strong-bags-yawn.md @@ -0,0 +1,5 @@ +--- +"@ponder/core": patch +--- + +Added validations for inconsistent RPC responses. diff --git a/packages/core/src/sync-historical/index.test.ts b/packages/core/src/sync-historical/index.test.ts index fcd3d4c75..9ddfcfd34 100644 --- a/packages/core/src/sync-historical/index.test.ts +++ b/packages/core/src/sync-historical/index.test.ts @@ -61,6 +61,7 @@ test("createHistoricalSync()", async (context) => { sources: [context.sources[0]], syncStore, requestQueue: await getRequestQueue(context.requestQueues[0]), + onFatalError: () => {}, }); expect(historicalSync).toBeDefined(); @@ -77,6 +78,7 @@ test("sync() with log filter", async (context) => { sources: [context.sources[0]], syncStore, requestQueue: await getRequestQueue(context.requestQueues[0]), + onFatalError: () => {}, }); await historicalSync.sync([0, 5]); @@ -106,6 +108,7 @@ test("sync() with log filter and transaction receipts", async (context) => { sources: [context.sources[0]], syncStore, requestQueue: await getRequestQueue(context.requestQueues[0]), + onFatalError: () => {}, }); await historicalSync.sync([0, 5]); @@ -140,6 +143,7 @@ test("sync() with block filter", async (context) => { sources: [context.sources[4]], syncStore, requestQueue: await getRequestQueue(context.requestQueues[0]), + onFatalError: () => {}, }); await historicalSync.sync([0, 5]); @@ -170,6 +174,7 @@ test("sync() with log factory", async (context) => { sources: [context.sources[1]], syncStore, requestQueue: await getRequestQueue(context.requestQueues[0]), + onFatalError: () => {}, }); await historicalSync.sync([0, 5]); @@ -197,6 +202,7 @@ test("sync() with trace filter", async (context) => { sources: [context.sources[3]], syncStore, requestQueue: await getRequestQueue(context.requestQueues[0]), + onFatalError: () => {}, }); await historicalSync.sync([0, 5]); @@ -227,6 +233,7 @@ test("sync() with many filters", async (context) => { sources: context.sources, syncStore, requestQueue: await getRequestQueue(context.requestQueues[0]), + onFatalError: () => {}, }); await historicalSync.sync([0, 5]); @@ -252,6 +259,7 @@ test("sync() with cache hit", async (context) => { sources: [context.sources[0]], syncStore, requestQueue: await getRequestQueue(context.requestQueues[0]), + onFatalError: () => {}, }); await historicalSync.sync([0, 5]); @@ -265,6 +273,7 @@ test("sync() with cache hit", async (context) => { sources: [context.sources[0]], syncStore, requestQueue: await getRequestQueue(context.requestQueues[0]), + onFatalError: () => {}, }); await historicalSync.sync([0, 5]); @@ -289,6 +298,7 @@ test("syncBlock() with cache", async (context) => { ], syncStore, requestQueue: await getRequestQueue(context.requestQueues[0]), + onFatalError: () => {}, }); const spy = vi.spyOn(context.requestQueues[0], "request"); @@ -320,6 +330,7 @@ test("syncAddress() handles many addresses", async (context) => { sources: [context.sources[1]], syncStore, requestQueue: await getRequestQueue(context.requestQueues[0]), + onFatalError: () => {}, }); await historicalSync.sync([0, 10 + 5 + 2]); diff --git a/packages/core/src/sync-historical/index.ts b/packages/core/src/sync-historical/index.ts index 46c0d2685..b5caaef76 100644 --- a/packages/core/src/sync-historical/index.ts +++ b/packages/core/src/sync-historical/index.ts @@ -25,7 +25,6 @@ import { _eth_getTransactionReceipt, _trace_filter, } from "@/utils/rpc.js"; -import { dedupe } from "@ponder/common"; import { getLogsRetryHelper } from "@ponder/utils"; import { type Address, @@ -52,6 +51,7 @@ type CreateHistoricalSyncParameters = { syncStore: SyncStore; network: Network; requestQueue: RequestQueue; + onFatalError: (error: Error) => void; }; export const createHistoricalSync = async ( @@ -226,6 +226,27 @@ export const createHistoricalSync = async ( logs.map((log) => syncBlock(hexToBigInt(log.blockNumber))), ); + // Validate that logs point to the valid transaction hash in the block + for (let i = 0; i < logs.length; i++) { + const log = logs[i]!; + const block = blocks[i]!; + + if (block.hash !== log.blockHash) { + throw new Error( + `Detected inconsistent RPC responses. Log with block hash ${log.blockHash} does not match block ${block.hash}.`, + ); + } + + if ( + block.transactions.find((t) => t.hash === log.transactionHash) === + undefined + ) { + throw new Error( + `Detected inconsistent RPC responses. Log with transaction hash ${log.transactionHash} not found in block ${block.hash}.`, + ); + } + } + const transactionHashes = new Set(logs.map((l) => l.transactionHash)); for (const hash of transactionHashes) { transactionsCache.add(hash); @@ -243,7 +264,7 @@ export const createHistoricalSync = async ( if (filter.includeTransactionReceipts) { const transactionReceipts = await Promise.all( - [...transactionHashes].map((hash) => + Array.from(transactionHashes).map((hash) => _eth_getTransactionReceipt(args.requestQueue, { hash }), ), ); @@ -303,9 +324,36 @@ export const createHistoricalSync = async ( if (isKilled) return; + const blocks = await Promise.all( + callTraces.map((trace) => syncBlock(hexToBigInt(trace.blockNumber))), + ); + + const transactionHashes = new Set(callTraces.map((t) => t.transactionHash)); + + // Validate that traces point to the valid transaction hash in the block + for (let i = 0; i < callTraces.length; i++) { + const callTrace = callTraces[i]!; + const block = blocks[i]!; + + if (block.hash !== callTrace.blockHash) { + throw new Error( + `Detected inconsistent RPC responses. Call trace with block hash ${callTrace.blockHash} does not match block ${block.hash}.`, + ); + } + + if ( + block.transactions.find((t) => t.hash === callTrace.transactionHash) === + undefined + ) { + throw new Error( + `Detected inconsistent RPC responses. Call trace with transaction hash ${callTrace.transactionHash} not found in block ${block.hash}.`, + ); + } + } + // Request transactionReceipts to check for reverted transactions. const transactionReceipts = await Promise.all( - dedupe(callTraces.map((t) => t.transactionHash)).map((hash) => + Array.from(transactionHashes).map((hash) => _eth_getTransactionReceipt(args.requestQueue, { hash, }), @@ -325,12 +373,10 @@ export const createHistoricalSync = async ( if (isKilled) return; - const blocks = await Promise.all( - callTraces.map((trace) => syncBlock(hexToBigInt(trace.blockNumber))), - ); - - for (const { transactionHash } of callTraces) { - transactionsCache.add(transactionHash); + for (const hash of transactionHashes) { + if (revertedTransactions.has(hash) === false) { + transactionsCache.add(hash); + } } if (isKilled) return; @@ -453,35 +499,49 @@ export const createHistoricalSync = async ( // Request last block of interval const blockPromise = syncBlock(BigInt(interval[1])); - // sync required intervals, account for chunk sizes - await Promise.all( - requiredIntervals.map(async (interval) => { - if (source.type === "contract") { - const filter = source.filter; - switch (filter.type) { - case "log": { - await syncLogFilter(filter, interval); - break; + try { + // sync required intervals, account for chunk sizes + await Promise.all( + requiredIntervals.map(async (interval) => { + if (source.type === "contract") { + const filter = source.filter; + switch (filter.type) { + case "log": { + await syncLogFilter(filter, interval); + break; + } + + case "callTrace": + await Promise.all( + getChunks({ interval, maxChunkSize: 10 }).map( + async (interval) => { + await syncTraceFilter(filter, interval); + }, + ), + ); + break; + + default: + never(filter); } - - case "callTrace": - await Promise.all( - getChunks({ interval, maxChunkSize: 10 }).map( - async (interval) => { - await syncTraceFilter(filter, interval); - }, - ), - ); - break; - - default: - never(filter); + } else { + await syncBlockFilter(source.filter, interval); } - } else { - await syncBlockFilter(source.filter, interval); - } - }), - ); + }), + ); + } catch (_error) { + const error = _error as Error; + + args.common.logger.error({ + service: "sync", + msg: `Fatal error: Unable to sync '${args.network.name}' from ${interval[0]} to ${interval[1]}.`, + error, + }); + + args.onFatalError(error); + + return; + } if (isKilled) return; diff --git a/packages/core/src/sync-realtime/index.ts b/packages/core/src/sync-realtime/index.ts index 97e67e727..404ce4fcb 100644 --- a/packages/core/src/sync-realtime/index.ts +++ b/packages/core/src/sync-realtime/index.ts @@ -485,9 +485,18 @@ export const createRealtimeSync = ( // Protect against RPCs returning empty logs. Known to happen near chain tip. if (block.logsBloom !== zeroLogsBloom && logs.length === 0) { throw new Error( - `Detected invalid '${args.network.name}' eth_getLogs response.`, + "Detected invalid eth_getLogs response. `block.logsBloom` is not empty but zero logs were returned.", ); } + + // Check that logs refer to the correct block + for (const log of logs) { + if (log.blockHash !== block.hash) { + throw new Error( + "Detected invalid eth_getLogs response. `log.blockHash` does not match requested block hash.", + ); + } + } } if ( @@ -516,7 +525,7 @@ export const createRealtimeSync = ( // Use the fact that any transaction produces a trace. if (block.transactions.length !== 0 && traces.length === 0) { throw new Error( - `Detected invalid '${args.network.name}' trace_block response.`, + "Detected invalid trace_block response. `block.transactions` is not empty but zero traces were returned.", ); } @@ -529,7 +538,7 @@ export const createRealtimeSync = ( for (const trace of callTraces) { if (trace.blockHash !== block.hash) { throw new Error( - `Received call trace with block hash '${trace.blockHash}' that does not match current head block '${block.hash}'`, + "Detected invalid trace_block response. `trace.blockHash` does not match requested block hash.", ); } } @@ -601,6 +610,18 @@ export const createRealtimeSync = ( requiredTransactions.has(hash), ); + // Validate that filtered logs/callTraces point to valid transaction in the block + const blockTransactionsHashes = new Set( + block.transactions.map((t) => t.hash), + ); + for (const hash of Array.from(requiredTransactions)) { + if (blockTransactionsHashes.has(hash) === false) { + throw new Error( + `Detected inconsistent RPC responses. Transaction with hash ${hash} is missing in \`block.transactions\`.`, + ); + } + } + //////// // Transaction Receipts //////// diff --git a/packages/core/src/sync/index.ts b/packages/core/src/sync/index.ts index 26357f5e0..7c681f65d 100644 --- a/packages/core/src/sync/index.ts +++ b/packages/core/src/sync/index.ts @@ -213,6 +213,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { syncStore: args.syncStore, requestQueue, network, + onFatalError: args.onFatalError, }); const realtimeSync = createRealtimeSync({ common: args.common,