From 6f1a57947e2062ce2f8d1a59068fe751bfc764ef Mon Sep 17 00:00:00 2001 From: kyscott18 <43524469+kyscott18@users.noreply.github.com> Date: Fri, 6 Sep 2024 12:01:48 -0400 Subject: [PATCH] Improve `pruneByBlock` performance using block hashes in query (#1075) * use block hashes for pruneByBlock * reorg test * format block numbers before operator * chore: changeset --- .changeset/tidy-tips-trade.md | 5 +++ packages/core/src/sync-realtime/index.test.ts | 1 + packages/core/src/sync-realtime/index.ts | 11 ++++- packages/core/src/sync-store/index.test.ts | 5 ++- packages/core/src/sync-store/index.ts | 45 +++++++++---------- packages/core/src/sync/index.ts | 2 +- 6 files changed, 41 insertions(+), 28 deletions(-) create mode 100644 .changeset/tidy-tips-trade.md diff --git a/.changeset/tidy-tips-trade.md b/.changeset/tidy-tips-trade.md new file mode 100644 index 000000000..880e39242 --- /dev/null +++ b/.changeset/tidy-tips-trade.md @@ -0,0 +1,5 @@ +--- +"@ponder/core": patch +--- + +Improved performance of reorg handling, that in some cases was leading to query timeouts and stalled indexing. diff --git a/packages/core/src/sync-realtime/index.test.ts b/packages/core/src/sync-realtime/index.test.ts index c8994dbd5..7361ea9ec 100644 --- a/packages/core/src/sync-realtime/index.test.ts +++ b/packages/core/src/sync-realtime/index.test.ts @@ -354,6 +354,7 @@ test("handleReorg() finds common ancestor", async (context) => { expect(onEvent).toHaveBeenCalledWith({ type: "reorg", block: expect.any(Object), + reorgedBlocks: [expect.any(Object), expect.any(Object), expect.any(Object)], }); expect(realtimeSync.localChain).toHaveLength(2); diff --git a/packages/core/src/sync-realtime/index.ts b/packages/core/src/sync-realtime/index.ts index 0b4fd2214..a4fea25b4 100644 --- a/packages/core/src/sync-realtime/index.ts +++ b/packages/core/src/sync-realtime/index.ts @@ -72,6 +72,7 @@ export type RealtimeSyncEvent = | { type: "reorg"; block: LightBlock; + reorgedBlocks: LightBlock[]; }; const ERROR_TIMEOUT = [ @@ -240,6 +241,11 @@ export const createRealtimeSync = ( msg: `Detected forked '${args.network.name}' block at height ${hexToNumber(block.number)}`, }); + // Record blocks that have been removed from the local chain. + const reorgedBlocks = localChain.filter( + (lb) => hexToNumber(lb.number) >= hexToNumber(block.number), + ); + // Prune the local chain of blocks that have been reorged out localChain = localChain.filter( (lb) => hexToNumber(lb.number) < hexToNumber(block.number), @@ -252,7 +258,7 @@ export const createRealtimeSync = ( const parentBlock = getLatestLocalBlock(); if (parentBlock.hash === remoteBlock.parentHash) { - args.onEvent({ type: "reorg", block: parentBlock }); + args.onEvent({ type: "reorg", block: parentBlock, reorgedBlocks }); args.common.logger.warn({ service: "realtime", @@ -269,7 +275,8 @@ export const createRealtimeSync = ( remoteBlock = await _eth_getBlockByHash(args.requestQueue, { hash: remoteBlock.parentHash, }); - localChain.pop(); + // Add tip to `reorgedBlocks` + reorgedBlocks.push(localChain.pop()!); } } diff --git a/packages/core/src/sync-store/index.test.ts b/packages/core/src/sync-store/index.test.ts index 1a3fe9a8c..7fa7270e2 100644 --- a/packages/core/src/sync-store/index.test.ts +++ b/packages/core/src/sync-store/index.test.ts @@ -1018,7 +1018,10 @@ test("pruneByBlock", async (context) => { await syncStore.insertBlocks({ blocks: [rpcData.block3.block], chainId: 1 }); await syncStore.insertBlocks({ blocks: [rpcData.block4.block], chainId: 1 }); - await syncStore.pruneByBlock({ fromBlock: 2, chainId: 1 }); + await syncStore.pruneByBlock({ + blocks: [rpcData.block3.block, rpcData.block4.block], + chainId: 1, + }); const blocks = await database.syncDb .selectFrom("blocks") diff --git a/packages/core/src/sync-store/index.ts b/packages/core/src/sync-store/index.ts index f0afd73bd..bbea9c6f9 100644 --- a/packages/core/src/sync-store/index.ts +++ b/packages/core/src/sync-store/index.ts @@ -21,6 +21,7 @@ import { } from "@/sync/source.js"; import type { CallTrace, Log, TransactionReceipt } from "@/types/eth.js"; import type { + LightBlock, SyncBlock, SyncCallTrace, SyncLog, @@ -115,7 +116,7 @@ export type SyncStore = { chainId: number; }): Promise; pruneByBlock(args: { - fromBlock: number; + blocks: Pick[]; chainId: number; }): Promise; pruneByChain(args: { @@ -1272,30 +1273,26 @@ export const createSyncStore = ({ return result?.result ?? null; }), - pruneByBlock: async ({ fromBlock, chainId }) => + pruneByBlock: async ({ blocks, chainId }) => db.wrap({ method: "pruneByBlock" }, async () => { - await db.transaction().execute(async (tx) => { - await tx - .deleteFrom("logs") - .where("chainId", "=", chainId) - .where("blockNumber", ">", formatBig(sql, fromBlock)) - .execute(); - await tx - .deleteFrom("blocks") - .where("chainId", "=", chainId) - .where("number", ">", formatBig(sql, fromBlock)) - .execute(); - await tx - .deleteFrom("rpcRequestResults") - .where("chainId", "=", chainId) - .where("blockNumber", ">", formatBig(sql, fromBlock)) - .execute(); - await tx - .deleteFrom("callTraces") - .where("chainId", "=", chainId) - .where("blockNumber", ">", formatBig(sql, fromBlock)) - .execute(); - }); + if (blocks.length === 0) return; + + const hashes = blocks.map(({ hash }) => hash); + const numbers = blocks.map(({ number }) => + formatBig(sql, hexToBigInt(number)), + ); + + await db.deleteFrom("blocks").where("hash", "in", hashes).execute(); + await db.deleteFrom("logs").where("blockHash", "in", hashes).execute(); + await db + .deleteFrom("callTraces") + .where("blockHash", "in", hashes) + .execute(); + await db + .deleteFrom("rpcRequestResults") + .where("chainId", "=", chainId) + .where("blockNumber", "in", numbers) + .execute(); }), pruneByChain: async ({ fromBlock, chainId }) => db.wrap({ method: "pruneByChain" }, () => diff --git a/packages/core/src/sync/index.ts b/packages/core/src/sync/index.ts index f3a8e195d..c257d1d4a 100644 --- a/packages/core/src/sync/index.ts +++ b/packages/core/src/sync/index.ts @@ -538,7 +538,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { const checkpoint = getChainsCheckpoint("latest")!; await args.syncStore.pruneByBlock({ - fromBlock: hexToNumber(event.block.number), + blocks: event.reorgedBlocks, chainId: network.chainId, });