-
Notifications
You must be signed in to change notification settings - Fork 93
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
in-memory realtime sync #1115
in-memory realtime sync #1115
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking great, couple questions.
) as SyncCallTrace[]; | ||
callTraces = traces | ||
.filter((trace) => trace.type === "call") | ||
.filter((trace) => trace.result !== null) as SyncCallTrace[]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the story behind this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Previously, we were relying on the getEvents
query to filter out traces with error. Now it happens here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makese sense. Though, this means we'll technically be storing different data in the database in historical vs realtime, correct? In realtime, we're filtering before insertion but the opposite in historical.
transactionIndex: eb.ref("excluded.transactionIndex"), | ||
})), | ||
) | ||
.onConflict((oc) => oc.column("hash").doNothing()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the context behind this change?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This previous logic was to handle the case when a reorg occurred. It no longer applies because all data in the sync-store
is finalized.
packages/core/src/sync/index.ts
Outdated
const events: RawEvent[] = unindexedEvents.filter( | ||
({ checkpoint }) => checkpoint <= to, | ||
); | ||
unindexedEvents = unindexedEvents.filter( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I find it a bit confusing that this is called unindexedEvents
, because this service does not actually do the indexing. Am I correct that this is the list of events (across all chains) that the sync service is holding onto, but can't pass up to the runtime because there is another network with a lagging checkpoint? Essentially a buffer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, exactly. Is eventBuffer
more clear?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe bufferedEvents
or pendingEvents
?
packages/core/src/sync/index.ts
Outdated
...args.sources | ||
.filter(({ filter }) => filter.chainId === network.chainId) | ||
.map(({ filter }) => | ||
args.syncStore.insertInterval({ filter, interval }), | ||
), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Really nice that this is now happening in the same spot - not necessary for this PR, but ultimately I think it would be great to use a database transaction here.
packages/core/src/sync/index.ts
Outdated
localSyncContext.get(network)!.unfinalizedEventData = | ||
unfinalizedEventData.filter( | ||
(led) => | ||
hexToNumber(led.block.number) <= hexToNumber(event.block.number), | ||
); | ||
|
||
break; | ||
const reorgedHashes = new Set<Hash>(); | ||
for (const b of event.reorgedBlocks) { | ||
reorgedHashes.add(b.hash); | ||
} | ||
|
||
default: | ||
never(event); | ||
unindexedEvents = unindexedEvents.filter( | ||
(e) => reorgedHashes.has(e.block.hash) === false, | ||
); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm getting a bit confused by the difference between unfinalizedEventData
and unindexedEvents
here. It seems like they contain the same information, why do we need both?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
They do contain the same information, where unindexedEvents
is derived from unfinalizedEventData
. However, when building events, point-in-time child addresses are needed. Also, the lifetime of each of these variables is different.
eventQueue.pause(); | ||
eventQueue.clear(); | ||
promises.push(eventQueue.onIdle()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why'd you remove these?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The sync
module no longer contains a queue. Instead, it is synchronous with sync-realtime
. This is primarily to maintain consistency for factory addresses. Without this change, it would be almost impossible to maintain unfinalizedChildAddresses
and pass the correct value for the correct block height to buildEvents
.
// @ts-ignore | ||
block.transactions = undefined; | ||
|
||
await args.onEvent({ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why did you add the await
here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
]); | ||
|
||
// Add corresponding intervals to the sync-store | ||
// Note: this should happen after so the database doesn't become corrupted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch, I bet this would have bit us.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
The "sync" portion of ponder handling extracting and ordering events is entirely in-memory, removing the need for database queries. Another property is that only finalized data is added to the
sync-store
. This is a combination of #1092 and #1103. Closes #1091.This involves:
getEvents()
)Child address methodology
unfinalizedChildAddresses
andfinalizedChildAddresses
factoryLogsPerBlock
, so thatunfinalizedChildAddresses
can always be recomputedfactoryLogsPerBlock
and recomputeunfinalizedChildAddresses
finalizedChildAddresses
, evict finalized blocks fromfactoryLogsPerBlock
and recomputeunfinalizedChildAddresses
Sync ordering methodology
unindexedEvents
unfinalizedEventData
buildEvents()
and add tounindexedEvents
unindexedEvents
, and pass them to the indexing functionsunindexedEvents
andunfinalizedEventData
unfinalizedEventData
and insert it into sync-storeOther
A metric to track how far away from tip the indexing progress is (known block - indexed block) might be helpful. However, it also may be unintuitive for many consumers when using omnichain ordering as the metric will appear far behind tip quite consistently