Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: skipped events near end of historical range #1160

Merged
merged 9 commits into from
Oct 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/slow-jeans-nail.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@ponder/core": patch
---

Fixed a bug introduced in v0.6 that caused events to be skipped near the end of the historical backfill. This bug did not affect the sync cache and does not require the app to be resynced.
89 changes: 51 additions & 38 deletions packages/core/src/sync/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import type { SyncStore } from "@/sync-store/index.js";
import type { LightBlock, SyncBlock } from "@/types/sync.js";
import {
type Checkpoint,
checkpointMin,
decodeCheckpoint,
encodeCheckpoint,
maxCheckpoint,
Expand Down Expand Up @@ -83,7 +82,7 @@ export type Status = {
export type SyncProgress = {
start: SyncBlock | LightBlock;
end: SyncBlock | LightBlock | undefined;
cached: SyncBlock | undefined;
cached: SyncBlock | LightBlock | undefined;
current: SyncBlock | LightBlock | undefined;
finalized: SyncBlock | LightBlock;
};
Expand Down Expand Up @@ -118,7 +117,7 @@ export const blockToCheckpoint = (
* Returns true if all filters have a defined end block and the current
* sync progress has reached the final end block.
*/
export const isSyncComplete = (syncProgress: SyncProgress) => {
export const isSyncEnd = (syncProgress: SyncProgress) => {
if (syncProgress.end === undefined || syncProgress.current === undefined) {
return false;
}
Expand All @@ -129,8 +128,22 @@ export const isSyncComplete = (syncProgress: SyncProgress) => {
);
};

/** Returns true if sync progress has reached the finalized block. */
export const isSyncFinalized = (syncProgress: SyncProgress) => {
if (syncProgress.current === undefined) {
return false;
}

return (
hexToNumber(syncProgress.current.number) >=
hexToNumber(syncProgress.finalized.number)
);
};

/** Returns the closest-to-tip block that is part of the historical sync. */
export const getHistoricalLast = (syncProgress: SyncProgress) => {
export const getHistoricalLast = (
syncProgress: Pick<SyncProgress, "finalized" | "end">,
) => {
return syncProgress.end === undefined
? syncProgress.finalized
: hexToNumber(syncProgress.end.number) >
Expand All @@ -139,6 +152,16 @@ export const getHistoricalLast = (syncProgress: SyncProgress) => {
: syncProgress.end;
};

/** Compute the minimum checkpoint, filtering out undefined */
export const min = (...checkpoints: (string | undefined)[]) => {
return checkpoints.reduce((acc, cur) => {
if (cur === undefined) return acc;
if (acc === undefined) return cur;
if (acc < cur) return acc;
return cur;
})!;
};

/** Returns the checkpoint for a given block tag. */
export const getChainCheckpoint = ({
syncProgress,
Expand All @@ -153,7 +176,7 @@ export const getChainCheckpoint = ({
return undefined;
}

if (tag === "current" && isSyncComplete(syncProgress)) {
if (tag === "current" && isSyncEnd(syncProgress)) {
return undefined;
}

Expand Down Expand Up @@ -231,6 +254,14 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
}),
onFatalError: args.onFatalError,
});

const { start, end, finalized } = await syncDiagnostic({
common: args.common,
sources,
requestQueue,
network,
});

const cached = await getCachedBlock({
sources,
requestQueue,
Expand All @@ -246,12 +277,9 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
}

const syncProgress: SyncProgress = {
...(await syncDiagnostic({
common: args.common,
sources,
requestQueue,
network,
})),
start,
end,
finalized,
cached,
current: cached,
};
Expand Down Expand Up @@ -314,11 +342,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
return undefined;
}

return encodeCheckpoint(
checkpointMin(
...checkpoints.map((c) => (c ? decodeCheckpoint(c) : maxCheckpoint)),
),
);
return min(...checkpoints);
};

const updateHistoricalStatus = ({
Expand Down Expand Up @@ -432,19 +456,12 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
continue;
}

/**
* Calculate the mininum "current" checkpoint, falling back to `end` if
* all networks have completed.
*
* `end`: If every network has an `endBlock` and it's less than
* `finalized`, use that. Otherwise, use `finalized`
*/
const end =
getOmnichainCheckpoint("end") !== undefined &&
getOmnichainCheckpoint("end")! < getOmnichainCheckpoint("finalized")!
? getOmnichainCheckpoint("end")!
: getOmnichainCheckpoint("finalized")!;
const to = getOmnichainCheckpoint("current") ?? end;
// Calculate the mininum "current" checkpoint, limited by "finalized" and "end"
const to = min(
getOmnichainCheckpoint("end"),
getOmnichainCheckpoint("finalized"),
getOmnichainCheckpoint("current"),
);

/*
* Extract events with `syncStore.getEvents()`, paginating to
Expand Down Expand Up @@ -521,7 +538,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
const allHistoricalSyncExhaustive = Array.from(
localSyncContext.values(),
).every(({ syncProgress }) => {
if (isSyncComplete(syncProgress)) return true;
if (isSyncEnd(syncProgress)) return true;

// Determine if `finalized` block is considered "stale"
const staleSeconds = (Date.now() - latestFinalizedFetch) / 1_000;
Expand Down Expand Up @@ -757,7 +774,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
* The realtime service can be killed if `endBlock` is
* defined has become finalized.
*/
if (isSyncComplete(syncProgress)) {
if (isSyncEnd(syncProgress)) {
args.common.metrics.ponder_sync_is_realtime.set(
{ network: network.name },
0,
Expand Down Expand Up @@ -829,7 +846,7 @@ export const createSync = async (args: CreateSyncParameters): Promise<Sync> => {
};
status[network.name]!.ready = true;

if (isSyncComplete(syncProgress)) {
if (isSyncEnd(syncProgress)) {
args.common.metrics.ponder_sync_is_complete.set(
{ network: network.name },
1,
Expand Down Expand Up @@ -950,7 +967,7 @@ export const getCachedBlock = ({
sources: Source[];
requestQueue: RequestQueue;
historicalSync: HistoricalSync;
}): Promise<SyncBlock> | undefined => {
}): Promise<SyncBlock | LightBlock> | undefined => {
const latestCompletedBlocks = sources.map(({ filter }) => {
const requiredInterval = [
filter.fromBlock,
Expand Down Expand Up @@ -1187,11 +1204,7 @@ export async function* localHistoricalSyncGenerator({

yield;

if (
isSyncComplete(syncProgress) ||
hexToNumber(syncProgress.finalized.number) ===
hexToNumber(syncProgress.current.number)
) {
if (isSyncEnd(syncProgress) || isSyncFinalized(syncProgress)) {
return;
}
}
Expand Down
42 changes: 42 additions & 0 deletions packages/core/src/utils/generators.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,45 @@ test("mergeAsyncGenerators", async () => {

expect(results).toStrictEqual([1, 2, 3, 4]);
});

test("mergeAsyncGenerators results", async () => {
const p1 = promiseWithResolvers<number>();
const p2 = promiseWithResolvers<number>();
const p3 = promiseWithResolvers<number>();
const p4 = promiseWithResolvers<number>();

async function* generator1() {
yield await p1.promise;
yield await p2.promise;
}

async function* generator2() {
yield await p3.promise;
yield await p4.promise;
}

const results: number[] = [];
const generator = mergeAsyncGenerators([generator1(), generator2()]);

const lock = promiseWithResolvers<void>();

(async () => {
for await (const result of generator) {
await lock.promise;
results.push(result);
}
})();

p1.resolve(1);
p2.resolve(2);
await new Promise((res) => setTimeout(res));
p3.resolve(3);
p4.resolve(4);
await new Promise((res) => setTimeout(res));

lock.resolve();

await new Promise((res) => setTimeout(res));

expect(results).toStrictEqual([1, 2, 3, 4]);
});
2 changes: 1 addition & 1 deletion packages/core/src/utils/generators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ export async function* mergeAsyncGenerators<T>(
pwr.resolve();
});

while (count > 0) {
while (count > 0 || results.length > 0) {
if (results.length > 0) {
yield results.shift()!;
} else {
Expand Down
Loading