diff --git a/.changeset/slow-jeans-nail.md b/.changeset/slow-jeans-nail.md new file mode 100644 index 000000000..6ff2fc876 --- /dev/null +++ b/.changeset/slow-jeans-nail.md @@ -0,0 +1,5 @@ +--- +"@ponder/core": patch +--- + +Fixed a bug introduced in v0.6 that caused events to be skipped near the end of the historical backfill. This bug did not affect the sync cache and does not require the app to be resynced. diff --git a/packages/core/src/sync/index.ts b/packages/core/src/sync/index.ts index 7c681f65d..a82fd1b5c 100644 --- a/packages/core/src/sync/index.ts +++ b/packages/core/src/sync/index.ts @@ -15,7 +15,6 @@ import type { SyncStore } from "@/sync-store/index.js"; import type { LightBlock, SyncBlock } from "@/types/sync.js"; import { type Checkpoint, - checkpointMin, decodeCheckpoint, encodeCheckpoint, maxCheckpoint, @@ -83,7 +82,7 @@ export type Status = { export type SyncProgress = { start: SyncBlock | LightBlock; end: SyncBlock | LightBlock | undefined; - cached: SyncBlock | undefined; + cached: SyncBlock | LightBlock | undefined; current: SyncBlock | LightBlock | undefined; finalized: SyncBlock | LightBlock; }; @@ -118,7 +117,7 @@ export const blockToCheckpoint = ( * Returns true if all filters have a defined end block and the current * sync progress has reached the final end block. */ -export const isSyncComplete = (syncProgress: SyncProgress) => { +export const isSyncEnd = (syncProgress: SyncProgress) => { if (syncProgress.end === undefined || syncProgress.current === undefined) { return false; } @@ -129,8 +128,22 @@ export const isSyncComplete = (syncProgress: SyncProgress) => { ); }; +/** Returns true if sync progress has reached the finalized block. */ +export const isSyncFinalized = (syncProgress: SyncProgress) => { + if (syncProgress.current === undefined) { + return false; + } + + return ( + hexToNumber(syncProgress.current.number) >= + hexToNumber(syncProgress.finalized.number) + ); +}; + /** Returns the closest-to-tip block that is part of the historical sync. */ -export const getHistoricalLast = (syncProgress: SyncProgress) => { +export const getHistoricalLast = ( + syncProgress: Pick, +) => { return syncProgress.end === undefined ? syncProgress.finalized : hexToNumber(syncProgress.end.number) > @@ -139,6 +152,16 @@ export const getHistoricalLast = (syncProgress: SyncProgress) => { : syncProgress.end; }; +/** Compute the minimum checkpoint, filtering out undefined */ +export const min = (...checkpoints: (string | undefined)[]) => { + return checkpoints.reduce((acc, cur) => { + if (cur === undefined) return acc; + if (acc === undefined) return cur; + if (acc < cur) return acc; + return cur; + })!; +}; + /** Returns the checkpoint for a given block tag. */ export const getChainCheckpoint = ({ syncProgress, @@ -153,7 +176,7 @@ export const getChainCheckpoint = ({ return undefined; } - if (tag === "current" && isSyncComplete(syncProgress)) { + if (tag === "current" && isSyncEnd(syncProgress)) { return undefined; } @@ -231,6 +254,14 @@ export const createSync = async (args: CreateSyncParameters): Promise => { }), onFatalError: args.onFatalError, }); + + const { start, end, finalized } = await syncDiagnostic({ + common: args.common, + sources, + requestQueue, + network, + }); + const cached = await getCachedBlock({ sources, requestQueue, @@ -246,12 +277,9 @@ export const createSync = async (args: CreateSyncParameters): Promise => { } const syncProgress: SyncProgress = { - ...(await syncDiagnostic({ - common: args.common, - sources, - requestQueue, - network, - })), + start, + end, + finalized, cached, current: cached, }; @@ -314,11 +342,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { return undefined; } - return encodeCheckpoint( - checkpointMin( - ...checkpoints.map((c) => (c ? decodeCheckpoint(c) : maxCheckpoint)), - ), - ); + return min(...checkpoints); }; const updateHistoricalStatus = ({ @@ -432,19 +456,12 @@ export const createSync = async (args: CreateSyncParameters): Promise => { continue; } - /** - * Calculate the mininum "current" checkpoint, falling back to `end` if - * all networks have completed. - * - * `end`: If every network has an `endBlock` and it's less than - * `finalized`, use that. Otherwise, use `finalized` - */ - const end = - getOmnichainCheckpoint("end") !== undefined && - getOmnichainCheckpoint("end")! < getOmnichainCheckpoint("finalized")! - ? getOmnichainCheckpoint("end")! - : getOmnichainCheckpoint("finalized")!; - const to = getOmnichainCheckpoint("current") ?? end; + // Calculate the mininum "current" checkpoint, limited by "finalized" and "end" + const to = min( + getOmnichainCheckpoint("end"), + getOmnichainCheckpoint("finalized"), + getOmnichainCheckpoint("current"), + ); /* * Extract events with `syncStore.getEvents()`, paginating to @@ -521,7 +538,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { const allHistoricalSyncExhaustive = Array.from( localSyncContext.values(), ).every(({ syncProgress }) => { - if (isSyncComplete(syncProgress)) return true; + if (isSyncEnd(syncProgress)) return true; // Determine if `finalized` block is considered "stale" const staleSeconds = (Date.now() - latestFinalizedFetch) / 1_000; @@ -757,7 +774,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { * The realtime service can be killed if `endBlock` is * defined has become finalized. */ - if (isSyncComplete(syncProgress)) { + if (isSyncEnd(syncProgress)) { args.common.metrics.ponder_sync_is_realtime.set( { network: network.name }, 0, @@ -829,7 +846,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { }; status[network.name]!.ready = true; - if (isSyncComplete(syncProgress)) { + if (isSyncEnd(syncProgress)) { args.common.metrics.ponder_sync_is_complete.set( { network: network.name }, 1, @@ -950,7 +967,7 @@ export const getCachedBlock = ({ sources: Source[]; requestQueue: RequestQueue; historicalSync: HistoricalSync; -}): Promise | undefined => { +}): Promise | undefined => { const latestCompletedBlocks = sources.map(({ filter }) => { const requiredInterval = [ filter.fromBlock, @@ -1187,11 +1204,7 @@ export async function* localHistoricalSyncGenerator({ yield; - if ( - isSyncComplete(syncProgress) || - hexToNumber(syncProgress.finalized.number) === - hexToNumber(syncProgress.current.number) - ) { + if (isSyncEnd(syncProgress) || isSyncFinalized(syncProgress)) { return; } } diff --git a/packages/core/src/utils/generators.test.ts b/packages/core/src/utils/generators.test.ts index f925b96b2..a0db1a79d 100644 --- a/packages/core/src/utils/generators.test.ts +++ b/packages/core/src/utils/generators.test.ts @@ -36,3 +36,45 @@ test("mergeAsyncGenerators", async () => { expect(results).toStrictEqual([1, 2, 3, 4]); }); + +test("mergeAsyncGenerators results", async () => { + const p1 = promiseWithResolvers(); + const p2 = promiseWithResolvers(); + const p3 = promiseWithResolvers(); + const p4 = promiseWithResolvers(); + + async function* generator1() { + yield await p1.promise; + yield await p2.promise; + } + + async function* generator2() { + yield await p3.promise; + yield await p4.promise; + } + + const results: number[] = []; + const generator = mergeAsyncGenerators([generator1(), generator2()]); + + const lock = promiseWithResolvers(); + + (async () => { + for await (const result of generator) { + await lock.promise; + results.push(result); + } + })(); + + p1.resolve(1); + p2.resolve(2); + await new Promise((res) => setTimeout(res)); + p3.resolve(3); + p4.resolve(4); + await new Promise((res) => setTimeout(res)); + + lock.resolve(); + + await new Promise((res) => setTimeout(res)); + + expect(results).toStrictEqual([1, 2, 3, 4]); +}); diff --git a/packages/core/src/utils/generators.ts b/packages/core/src/utils/generators.ts index 952d14ac9..dd4ee6145 100644 --- a/packages/core/src/utils/generators.ts +++ b/packages/core/src/utils/generators.ts @@ -16,7 +16,7 @@ export async function* mergeAsyncGenerators( pwr.resolve(); }); - while (count > 0) { + while (count > 0 || results.length > 0) { if (results.length > 0) { yield results.shift()!; } else {