From 40d092de8cc7000b96141b6ac2a61b196f3c0ec2 Mon Sep 17 00:00:00 2001 From: Kyle Scott Date: Mon, 7 Oct 2024 16:45:14 -0400 Subject: [PATCH 1/6] protect against sync cache moving too far ahead --- packages/core/src/sync/index.ts | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/core/src/sync/index.ts b/packages/core/src/sync/index.ts index 41ccca1b4..f05268d1a 100644 --- a/packages/core/src/sync/index.ts +++ b/packages/core/src/sync/index.ts @@ -427,7 +427,11 @@ export const createSync = async (args: CreateSyncParameters): Promise => { getOmnichainCheckpoint("end")! < getOmnichainCheckpoint("finalized")! ? getOmnichainCheckpoint("end")! : getOmnichainCheckpoint("finalized")!; - const to = getOmnichainCheckpoint("current") ?? end; + const to = + getOmnichainCheckpoint("current") && + getOmnichainCheckpoint("current")! < end + ? getOmnichainCheckpoint("current")! + : end; /* * Extract events with `syncStore.getEvents()`, paginating to From 98cb7da893b9d94ba5f37b3d5ef29615e3ac324e Mon Sep 17 00:00:00 2001 From: Kyle Scott Date: Thu, 10 Oct 2024 10:00:49 -0400 Subject: [PATCH 2/6] fix cached block calculation --- packages/core/src/sync/index.ts | 30 +++++++++++++++++++----------- 1 file changed, 19 insertions(+), 11 deletions(-) diff --git a/packages/core/src/sync/index.ts b/packages/core/src/sync/index.ts index c9ae1165b..6a54a350e 100644 --- a/packages/core/src/sync/index.ts +++ b/packages/core/src/sync/index.ts @@ -216,10 +216,19 @@ export const createSync = async (args: CreateSyncParameters): Promise => { }), onFatalError: args.onFatalError, }); + + const { start, end, finalized } = await syncDiagnostic({ + common: args.common, + sources, + requestQueue, + network, + }); + const cached = await getCachedBlock({ sources, requestQueue, historicalSync, + syncProgress: { finalized }, }); // Update "ponder_sync_block" metric @@ -231,12 +240,9 @@ export const createSync = async (args: CreateSyncParameters): Promise => { } const syncProgress: SyncProgress = { - ...(await syncDiagnostic({ - common: args.common, - sources, - requestQueue, - network, - })), + start, + end, + finalized, cached, current: cached, }; @@ -428,11 +434,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { getOmnichainCheckpoint("end")! < getOmnichainCheckpoint("finalized")! ? getOmnichainCheckpoint("end")! : getOmnichainCheckpoint("finalized")!; - const to = - getOmnichainCheckpoint("current") && - getOmnichainCheckpoint("current")! < end - ? getOmnichainCheckpoint("current")! - : end; + const to = getOmnichainCheckpoint("current") ?? end; /* * Extract events with `syncStore.getEvents()`, paginating to @@ -864,10 +866,12 @@ export const getCachedBlock = ({ sources, requestQueue, historicalSync, + syncProgress, }: { sources: Source[]; requestQueue: RequestQueue; historicalSync: HistoricalSync; + syncProgress: Pick; }): Promise | undefined => { const latestCompletedBlocks = sources.map(({ filter }) => { const requiredInterval = [ @@ -903,6 +907,10 @@ export const getCachedBlock = ({ block !== undefined || sources[i]!.filter.fromBlock > minCompletedBlock, ) ) { + if (minCompletedBlock > hexToNumber(syncProgress.finalized.number)) { + return Promise.resolve(syncProgress.finalized as SyncBlock); + } + return _eth_getBlockByNumber(requestQueue, { blockNumber: minCompletedBlock, }); From bb569c674ca992a4ae147c971b3f80c42ca59178 Mon Sep 17 00:00:00 2001 From: Kyle Scott Date: Thu, 10 Oct 2024 11:50:11 -0400 Subject: [PATCH 3/6] . --- packages/core/src/sync/index.ts | 82 ++++++++++++++++++--------------- 1 file changed, 46 insertions(+), 36 deletions(-) diff --git a/packages/core/src/sync/index.ts b/packages/core/src/sync/index.ts index 6a54a350e..6842849c5 100644 --- a/packages/core/src/sync/index.ts +++ b/packages/core/src/sync/index.ts @@ -14,7 +14,6 @@ import type { SyncStore } from "@/sync-store/index.js"; import type { LightBlock, SyncBlock } from "@/types/sync.js"; import { type Checkpoint, - checkpointMin, decodeCheckpoint, encodeCheckpoint, maxCheckpoint, @@ -77,7 +76,7 @@ export type Status = { export type SyncProgress = { start: SyncBlock | LightBlock; end: SyncBlock | LightBlock | undefined; - cached: SyncBlock | undefined; + cached: SyncBlock | LightBlock | undefined; current: SyncBlock | LightBlock | undefined; finalized: SyncBlock | LightBlock; }; @@ -107,7 +106,7 @@ export const blockToCheckpoint = ( * Returns true if all filters have a defined end block and the current * sync progress has reached the final end block. */ -export const isSyncComplete = (syncProgress: SyncProgress) => { +export const isSyncEnd = (syncProgress: SyncProgress) => { if (syncProgress.end === undefined || syncProgress.current === undefined) { return false; } @@ -118,8 +117,22 @@ export const isSyncComplete = (syncProgress: SyncProgress) => { ); }; +/** Returns true if sync progress has reached the finalized block. */ +export const isSyncFinalized = (syncProgress: SyncProgress) => { + if (syncProgress.current === undefined) { + return false; + } + + return ( + hexToNumber(syncProgress.current.number) >= + hexToNumber(syncProgress.finalized.number) + ); +}; + /** Returns the closest-to-tip block that is part of the historical sync. */ -export const getHistoricalLast = (syncProgress: SyncProgress) => { +export const getHistoricalLast = ( + syncProgress: Pick, +) => { return syncProgress.end === undefined ? syncProgress.finalized : hexToNumber(syncProgress.end.number) > @@ -128,6 +141,16 @@ export const getHistoricalLast = (syncProgress: SyncProgress) => { : syncProgress.end; }; +/** Compute the minimum checkpoint, filtering out undefined */ +export const min = (...checkpoints: (string | undefined)[]) => { + return checkpoints.reduce((acc, cur) => { + if (cur === undefined) return acc; + if (acc === undefined) return cur; + if (acc < cur) return acc; + return cur; + })!; +}; + /** Returns the checkpoint for a given block tag. */ export const getChainCheckpoint = ({ syncProgress, @@ -142,7 +165,7 @@ export const getChainCheckpoint = ({ return undefined; } - if (tag === "current" && isSyncComplete(syncProgress)) { + if (tag === "current" && isSyncEnd(syncProgress)) { return undefined; } @@ -228,7 +251,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { sources, requestQueue, historicalSync, - syncProgress: { finalized }, + syncProgress: { end, finalized }, }); // Update "ponder_sync_block" metric @@ -304,11 +327,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { return undefined; } - return encodeCheckpoint( - checkpointMin( - ...checkpoints.map((c) => (c ? decodeCheckpoint(c) : maxCheckpoint)), - ), - ); + return min(...checkpoints); }; const updateHistoricalStatus = ({ @@ -422,19 +441,12 @@ export const createSync = async (args: CreateSyncParameters): Promise => { continue; } - /** - * Calculate the mininum "current" checkpoint, falling back to `end` if - * all networks have completed. - * - * `end`: If every network has an `endBlock` and it's less than - * `finalized`, use that. Otherwise, use `finalized` - */ - const end = - getOmnichainCheckpoint("end") !== undefined && - getOmnichainCheckpoint("end")! < getOmnichainCheckpoint("finalized")! - ? getOmnichainCheckpoint("end")! - : getOmnichainCheckpoint("finalized")!; - const to = getOmnichainCheckpoint("current") ?? end; + // Calculate the mininum "current" checkpoint, limited by "finalized" and "end" + const to = min( + getOmnichainCheckpoint("end"), + getOmnichainCheckpoint("finalized"), + getOmnichainCheckpoint("current"), + ); /* * Extract events with `syncStore.getEvents()`, paginating to @@ -511,7 +523,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { const allHistoricalSyncExhaustive = Array.from( localSyncContext.values(), ).every(({ syncProgress }) => { - if (isSyncComplete(syncProgress)) return true; + if (isSyncEnd(syncProgress)) return true; // Determine if `finalized` block is considered "stale" const staleSeconds = (Date.now() - latestFinalizedFetch) / 1_000; @@ -703,7 +715,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { * The realtime service can be killed if `endBlock` is * defined has become finalized. */ - if (isSyncComplete(syncProgress)) { + if (isSyncEnd(syncProgress)) { args.common.metrics.ponder_sync_is_realtime.set( { network: network.name }, 0, @@ -761,7 +773,7 @@ export const createSync = async (args: CreateSyncParameters): Promise => { }; status[network.name]!.ready = true; - if (isSyncComplete(syncProgress)) { + if (isSyncEnd(syncProgress)) { args.common.metrics.ponder_sync_is_complete.set( { network: network.name }, 1, @@ -871,8 +883,8 @@ export const getCachedBlock = ({ sources: Source[]; requestQueue: RequestQueue; historicalSync: HistoricalSync; - syncProgress: Pick; -}): Promise | undefined => { + syncProgress: Pick; +}): Promise | undefined => { const latestCompletedBlocks = sources.map(({ filter }) => { const requiredInterval = [ filter.fromBlock, @@ -907,8 +919,10 @@ export const getCachedBlock = ({ block !== undefined || sources[i]!.filter.fromBlock > minCompletedBlock, ) ) { - if (minCompletedBlock > hexToNumber(syncProgress.finalized.number)) { - return Promise.resolve(syncProgress.finalized as SyncBlock); + if ( + minCompletedBlock > hexToNumber(getHistoricalLast(syncProgress).number) + ) { + return Promise.resolve(getHistoricalLast(syncProgress)); } return _eth_getBlockByNumber(requestQueue, { @@ -1113,11 +1127,7 @@ export async function* localHistoricalSyncGenerator({ yield; - if ( - isSyncComplete(syncProgress) || - hexToNumber(syncProgress.finalized.number) === - hexToNumber(syncProgress.current.number) - ) { + if (isSyncEnd(syncProgress) || isSyncFinalized(syncProgress)) { return; } } From 0372964ff25ba5cf88fe5b4a3d185367f3be2532 Mon Sep 17 00:00:00 2001 From: Kyle Scott Date: Sat, 12 Oct 2024 09:59:28 -0400 Subject: [PATCH 4/6] fix async generator --- packages/core/src/utils/generators.test.ts | 42 ++++++++++++++++++++++ packages/core/src/utils/generators.ts | 2 +- 2 files changed, 43 insertions(+), 1 deletion(-) diff --git a/packages/core/src/utils/generators.test.ts b/packages/core/src/utils/generators.test.ts index f925b96b2..a0db1a79d 100644 --- a/packages/core/src/utils/generators.test.ts +++ b/packages/core/src/utils/generators.test.ts @@ -36,3 +36,45 @@ test("mergeAsyncGenerators", async () => { expect(results).toStrictEqual([1, 2, 3, 4]); }); + +test("mergeAsyncGenerators results", async () => { + const p1 = promiseWithResolvers(); + const p2 = promiseWithResolvers(); + const p3 = promiseWithResolvers(); + const p4 = promiseWithResolvers(); + + async function* generator1() { + yield await p1.promise; + yield await p2.promise; + } + + async function* generator2() { + yield await p3.promise; + yield await p4.promise; + } + + const results: number[] = []; + const generator = mergeAsyncGenerators([generator1(), generator2()]); + + const lock = promiseWithResolvers(); + + (async () => { + for await (const result of generator) { + await lock.promise; + results.push(result); + } + })(); + + p1.resolve(1); + p2.resolve(2); + await new Promise((res) => setTimeout(res)); + p3.resolve(3); + p4.resolve(4); + await new Promise((res) => setTimeout(res)); + + lock.resolve(); + + await new Promise((res) => setTimeout(res)); + + expect(results).toStrictEqual([1, 2, 3, 4]); +}); diff --git a/packages/core/src/utils/generators.ts b/packages/core/src/utils/generators.ts index 952d14ac9..dd4ee6145 100644 --- a/packages/core/src/utils/generators.ts +++ b/packages/core/src/utils/generators.ts @@ -16,7 +16,7 @@ export async function* mergeAsyncGenerators( pwr.resolve(); }); - while (count > 0) { + while (count > 0 || results.length > 0) { if (results.length > 0) { yield results.shift()!; } else { From 31104df8132ab93b44eec5b3f7c39e82e13765f0 Mon Sep 17 00:00:00 2001 From: Kyle Scott Date: Sat, 12 Oct 2024 10:18:28 -0400 Subject: [PATCH 5/6] revert cached block calculations --- packages/core/src/sync/index.ts | 9 --------- 1 file changed, 9 deletions(-) diff --git a/packages/core/src/sync/index.ts b/packages/core/src/sync/index.ts index ea5745b01..021da59e1 100644 --- a/packages/core/src/sync/index.ts +++ b/packages/core/src/sync/index.ts @@ -265,7 +265,6 @@ export const createSync = async (args: CreateSyncParameters): Promise => { sources, requestQueue, historicalSync, - syncProgress: { end, finalized }, }); // Update "ponder_sync_block" metric @@ -963,12 +962,10 @@ export const getCachedBlock = ({ sources, requestQueue, historicalSync, - syncProgress, }: { sources: Source[]; requestQueue: RequestQueue; historicalSync: HistoricalSync; - syncProgress: Pick; }): Promise | undefined => { const latestCompletedBlocks = sources.map(({ filter }) => { const requiredInterval = [ @@ -1004,12 +1001,6 @@ export const getCachedBlock = ({ block !== undefined || sources[i]!.filter.fromBlock > minCompletedBlock, ) ) { - if ( - minCompletedBlock > hexToNumber(getHistoricalLast(syncProgress).number) - ) { - return Promise.resolve(getHistoricalLast(syncProgress)); - } - return _eth_getBlockByNumber(requestQueue, { blockNumber: minCompletedBlock, }); From d9bf4c5af18a4d4566ff445fd6789a0dc2e8f627 Mon Sep 17 00:00:00 2001 From: Kyle Scott Date: Sat, 12 Oct 2024 10:24:10 -0400 Subject: [PATCH 6/6] chore: changeset --- .changeset/slow-jeans-nail.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/slow-jeans-nail.md diff --git a/.changeset/slow-jeans-nail.md b/.changeset/slow-jeans-nail.md new file mode 100644 index 000000000..6ff2fc876 --- /dev/null +++ b/.changeset/slow-jeans-nail.md @@ -0,0 +1,5 @@ +--- +"@ponder/core": patch +--- + +Fixed a bug introduced in v0.6 that caused events to be skipped near the end of the historical backfill. This bug did not affect the sync cache and does not require the app to be resynced.