Skip to content

Commit

Permalink
in-memory realtime sync (#1115)
Browse files Browse the repository at this point in the history
* start in-memory realtime sync

* fix realtime log

* notes

* no queue for onRealtimeSyncEvent

* pass factory information to buildEvents

* cleanup

* backpressure log

* remove reorged events

* on conflict do nothing

* fix reorg

* filter out reverted call traces

* fix finalize interval

* fix reorg handling

* filter call traces with function selector

* cleanup

* fix test

* nits

* chore: changeset

* cleanup
  • Loading branch information
kyscott18 authored Oct 10, 2024
1 parent 6d45c52 commit 1a7d1f8
Show file tree
Hide file tree
Showing 16 changed files with 1,143 additions and 432 deletions.
5 changes: 5 additions & 0 deletions .changeset/large-crabs-listen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@ponder/core": patch
---

Improved performance of realtime event processing.
22 changes: 11 additions & 11 deletions packages/core/src/_test/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ export const getRawRPCData = async () => {
block: blocks[0],
transactions: [],
transactionReceipts: [],
traces: [
callTraces: [
{
action: {
from: ALICE,
Expand Down Expand Up @@ -293,7 +293,7 @@ export const getRawRPCData = async () => {
transactionReceipts: transactionReceipts.filter(
(tr) => tr?.blockNumber === blocks[1]?.number,
),
traces: [
callTraces: [
{
action: {
callType: "call",
Expand Down Expand Up @@ -363,7 +363,7 @@ export const getRawRPCData = async () => {
transactionReceipts: transactionReceipts.filter(
(tr) => tr?.blockNumber === blocks[2]?.number,
),
traces: [
callTraces: [
{
action: {
callType: "call",
Expand Down Expand Up @@ -403,7 +403,7 @@ export const getRawRPCData = async () => {
transactionReceipts: transactionReceipts.filter(
(tr) => tr?.blockNumber === blocks[3]?.number,
),
traces: [
callTraces: [
{
action: {
callType: "call",
Expand Down Expand Up @@ -441,43 +441,43 @@ export const getRawRPCData = async () => {
block: blocks[4]!,
transactions: [],
transactionReceipts: [],
traces: [],
callTraces: [],
},
} as unknown as {
block1: {
logs: [];
block: SyncBlock;
transactions: [];
transactionReceipts: [];
traces: [SyncCreateTrace, SyncCreateTrace];
callTraces: [SyncCreateTrace, SyncCreateTrace];
};
block2: {
logs: [SyncLog, SyncLog];
block: SyncBlock;
transactions: [SyncTransaction, SyncTransaction];
transactionReceipts: [SyncTransactionReceipt, SyncTransactionReceipt];
traces: [SyncCallTrace, SyncCallTrace];
callTraces: [SyncCallTrace, SyncCallTrace];
};
block3: {
logs: [SyncLog];
block: SyncBlock;
transactions: [SyncTransaction];
transactionReceipts: [SyncTransactionReceipt];
traces: [SyncCallTrace];
callTraces: [SyncCallTrace];
};
block4: {
logs: [SyncLog];
block: SyncBlock;
transactions: [SyncTransaction];
transactionReceipts: [SyncTransactionReceipt];
traces: [SyncCallTrace];
callTraces: [SyncCallTrace];
};
block5: {
logs: [];
block: SyncBlock;
transactions: [];
transactionReceipts: [];
traces: [];
callTraces: [];
};
};
};
Expand Down Expand Up @@ -592,7 +592,7 @@ export const getEventsTrace = async (

return [
{
trace: rpcData.block3.traces[0],
trace: rpcData.block3.callTraces[0],
block: rpcData.block3.block,
transaction: rpcData.block3.transactions[0]!,
transactionReceipt: rpcData.block3.transactionReceipts[0]!,
Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/bin/utils/run.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ export async function run({

indexingService.updateIndexingStore({ indexingStore, schema });

sync.startRealtime();
await sync.startRealtime();

await metadataStore.setStatus(sync.getStatus());

Expand Down
8 changes: 4 additions & 4 deletions packages/core/src/build/configAndIndexingFunctions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type { Options } from "@/common/options.js";
import type { CallTraceFilter, LogFactory, LogFilter } from "@/sync/source.js";
import {
http,
type Address,
getEventSelector,
getFunctionSelector,
parseAbiItem,
Expand Down Expand Up @@ -399,8 +400,7 @@ test("buildConfigAndIndexingFunctions() validates address empty string", async (
a: {
network: "mainnet",
abi: [event0],
// @ts-expect-error
address: "",
address: "" as Address,
},
},
}) as Config;
Expand All @@ -426,8 +426,8 @@ test("buildConfigAndIndexingFunctions() validates address prefix", async () => {
a: {
network: "mainnet",
abi: [event0],
// @ts-expect-error
address: "0b0000000000000000000000000000000000000001",

address: "0b0000000000000000000000000000000000000001" as Address,
},
},
}) as Config;
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/sync-historical/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ const getRequestQueue = async (requestQueue: RequestQueue) => {
request: (request: any) => {
if (request.method === "trace_filter") {
let traces = [
...rpcData.block2.traces,
...rpcData.block3.traces,
...rpcData.block4.traces,
...rpcData.block2.callTraces,
...rpcData.block3.callTraces,
...rpcData.block4.callTraces,
];

if (request.params[0].fromBlock !== undefined) {
Expand Down
6 changes: 3 additions & 3 deletions packages/core/src/sync-realtime/filter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,21 +70,21 @@ test("isCallTraceFilterMatched", async (context) => {
let isMatched = isCallTraceFilterMatched({
filter: context.sources[3].filter,
block: rpcData.block3.block,
callTrace: rpcData.block3.traces[0],
callTrace: rpcData.block3.callTraces[0],
});
expect(isMatched).toBe(true);

isMatched = isCallTraceFilterMatched({
filter: context.sources[2].filter,
block: rpcData.block3.block,
callTrace: rpcData.block3.traces[0],
callTrace: rpcData.block3.callTraces[0],
});
expect(isMatched).toBe(true);

isMatched = isCallTraceFilterMatched({
filter: context.sources[3].filter,
block: rpcData.block2.block,
callTrace: rpcData.block2.traces[0],
callTrace: rpcData.block2.callTraces[0],
});
expect(isMatched).toBe(false);
});
Expand Down
Loading

0 comments on commit 1a7d1f8

Please sign in to comment.