Skip to content

Commit

Permalink
Improve pruneByBlock performance using block hashes in query (#1075)
Browse files Browse the repository at this point in the history
* use block hashes for pruneByBlock

* reorg test

* format block numbers before operator

* chore: changeset
  • Loading branch information
kyscott18 authored Sep 6, 2024
1 parent 0f6213f commit 6f1a579
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 28 deletions.
5 changes: 5 additions & 0 deletions .changeset/tidy-tips-trade.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@ponder/core": patch
---

Improved performance of reorg handling, that in some cases was leading to query timeouts and stalled indexing.
1 change: 1 addition & 0 deletions packages/core/src/sync-realtime/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ test("handleReorg() finds common ancestor", async (context) => {
expect(onEvent).toHaveBeenCalledWith({
type: "reorg",
block: expect.any(Object),
reorgedBlocks: [expect.any(Object), expect.any(Object), expect.any(Object)],
});

expect(realtimeSync.localChain).toHaveLength(2);
Expand Down
11 changes: 9 additions & 2 deletions packages/core/src/sync-realtime/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ export type RealtimeSyncEvent =
| {
type: "reorg";
block: LightBlock;
reorgedBlocks: LightBlock[];
};

const ERROR_TIMEOUT = [
Expand Down Expand Up @@ -240,6 +241,11 @@ export const createRealtimeSync = (
msg: `Detected forked '${args.network.name}' block at height ${hexToNumber(block.number)}`,
});

// Record blocks that have been removed from the local chain.
const reorgedBlocks = localChain.filter(
(lb) => hexToNumber(lb.number) >= hexToNumber(block.number),
);

// Prune the local chain of blocks that have been reorged out
localChain = localChain.filter(
(lb) => hexToNumber(lb.number) < hexToNumber(block.number),
Expand All @@ -252,7 +258,7 @@ export const createRealtimeSync = (
const parentBlock = getLatestLocalBlock();

if (parentBlock.hash === remoteBlock.parentHash) {
args.onEvent({ type: "reorg", block: parentBlock });
args.onEvent({ type: "reorg", block: parentBlock, reorgedBlocks });

args.common.logger.warn({
service: "realtime",
Expand All @@ -269,7 +275,8 @@ export const createRealtimeSync = (
remoteBlock = await _eth_getBlockByHash(args.requestQueue, {
hash: remoteBlock.parentHash,
});
localChain.pop();
// Add tip to `reorgedBlocks`
reorgedBlocks.push(localChain.pop()!);
}
}

Expand Down
5 changes: 4 additions & 1 deletion packages/core/src/sync-store/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1018,7 +1018,10 @@ test("pruneByBlock", async (context) => {
await syncStore.insertBlocks({ blocks: [rpcData.block3.block], chainId: 1 });
await syncStore.insertBlocks({ blocks: [rpcData.block4.block], chainId: 1 });

await syncStore.pruneByBlock({ fromBlock: 2, chainId: 1 });
await syncStore.pruneByBlock({
blocks: [rpcData.block3.block, rpcData.block4.block],
chainId: 1,
});

const blocks = await database.syncDb
.selectFrom("blocks")
Expand Down
45 changes: 21 additions & 24 deletions packages/core/src/sync-store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
} from "@/sync/source.js";
import type { CallTrace, Log, TransactionReceipt } from "@/types/eth.js";
import type {
LightBlock,
SyncBlock,
SyncCallTrace,
SyncLog,
Expand Down Expand Up @@ -115,7 +116,7 @@ export type SyncStore = {
chainId: number;
}): Promise<string | null>;
pruneByBlock(args: {
fromBlock: number;
blocks: Pick<LightBlock, "hash" | "number">[];
chainId: number;
}): Promise<void>;
pruneByChain(args: {
Expand Down Expand Up @@ -1272,30 +1273,26 @@ export const createSyncStore = ({

return result?.result ?? null;
}),
pruneByBlock: async ({ fromBlock, chainId }) =>
pruneByBlock: async ({ blocks, chainId }) =>
db.wrap({ method: "pruneByBlock" }, async () => {
await db.transaction().execute(async (tx) => {
await tx
.deleteFrom("logs")
.where("chainId", "=", chainId)
.where("blockNumber", ">", formatBig(sql, fromBlock))
.execute();
await tx
.deleteFrom("blocks")
.where("chainId", "=", chainId)
.where("number", ">", formatBig(sql, fromBlock))
.execute();
await tx
.deleteFrom("rpcRequestResults")
.where("chainId", "=", chainId)
.where("blockNumber", ">", formatBig(sql, fromBlock))
.execute();
await tx
.deleteFrom("callTraces")
.where("chainId", "=", chainId)
.where("blockNumber", ">", formatBig(sql, fromBlock))
.execute();
});
if (blocks.length === 0) return;

const hashes = blocks.map(({ hash }) => hash);
const numbers = blocks.map(({ number }) =>
formatBig(sql, hexToBigInt(number)),
);

await db.deleteFrom("blocks").where("hash", "in", hashes).execute();
await db.deleteFrom("logs").where("blockHash", "in", hashes).execute();
await db
.deleteFrom("callTraces")
.where("blockHash", "in", hashes)
.execute();
await db
.deleteFrom("rpcRequestResults")
.where("chainId", "=", chainId)
.where("blockNumber", "in", numbers)
.execute();
}),
pruneByChain: async ({ fromBlock, chainId }) =>
db.wrap({ method: "pruneByChain" }, () =>
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
const checkpoint = getChainsCheckpoint("latest")!;

await args.syncStore.pruneByBlock({
fromBlock: hexToNumber(event.block.number),
blocks: event.reorgedBlocks,
chainId: network.chainId,
});

Expand Down

0 comments on commit 6f1a579

Please sign in to comment.