
Commit

fix(core): add detection and recovery for missing mutation events (#7576)

### Description
In very rare cases, the /listen endpoint may drop mutation events. This
PR detects "holes" in the received stream of mutation events and
recovers from the error by restarting the connection.
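
The gist of the detection, as a minimal sketch with hypothetical names (not the
actual implementation): each mutation event carries a `previousRev` and a
`resultRev`, so a hole shows up as an event whose `previousRev` does not match
the revision produced by the event before it.

```ts
// Hypothetical sketch of how a "hole" between mutation events can be detected.
// Each mutation event links to its predecessor via previousRev -> resultRev.
interface MutationEventLike {
  previousRev: string
  resultRev: string
}

// True if `next` can be applied directly on top of the currently known revision.
function appliesInOrder(currentRev: string, next: MutationEventLike): boolean {
  return next.previousRev === currentRev
}

// Split received events into the unbroken chain starting at `currentRev`
// and the remainder that arrived out of order (i.e. there is a hole before them).
function splitAtHole(currentRev: string, events: MutationEventLike[]) {
  const applicable: MutationEventLike[] = []
  let rev = currentRev
  for (const event of events) {
    if (!appliesInOrder(rev, event)) {
      return {applicable, outOfOrder: events.slice(applicable.length)}
    }
    applicable.push(event)
    rev = event.resultRev
  }
  return {applicable, outOfOrder: [] as MutationEventLike[]}
}
```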

### What to review
Are the threshold values sensible? Error detection and recovery is now
triggered by the following thresholds (see the sketch below the list for how
they might combine).
- `DEFAULT_MAX_BUFFER_SIZE=20`: If we see more than 20 mutation events
that can't be applied in order, we treat it as an error.
- `DEFAULT_DEADLINE_MS=30_000`: If 30 seconds pass since we last
received a message that can't be applied in order, we treat it as an
error.
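
For illustration only, a minimal sketch of how these two thresholds might be
combined; everything except the two constants is an assumed name, not the PR's
actual code:

```ts
// Hypothetical sketch of the two recovery thresholds working together.
const DEFAULT_MAX_BUFFER_SIZE = 20
const DEFAULT_DEADLINE_MS = 30_000

interface OutOfOrderState {
  buffer: unknown[] // events received but not yet applicable in order
  lastOutOfOrderAt?: number // when we last received an out-of-order event
}

// Decide whether the buffered, out-of-order events should be treated as an error.
function shouldTreatAsOutOfSync(state: OutOfOrderState, now = Date.now()): boolean {
  if (state.buffer.length === 0) return false
  if (state.buffer.length > DEFAULT_MAX_BUFFER_SIZE) return true
  return state.lastOutOfOrderAt !== undefined && now - state.lastOutOfOrderAt > DEFAULT_DEADLINE_MS
}
```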

### Testing
This PR includes unit tests and has also undergone extensive manual
testing.

### Notes for release

- Fixes an issue that could in rare cases lead to an outdated version of
the document being displayed locally
bjoerge authored Oct 7, 2024
1 parent 7868c0b commit 8195c96
Showing 9 changed files with 628 additions and 17 deletions.
9 changes: 9 additions & 0 deletions dev/test-studio/sanity.config.ts
@@ -220,6 +220,15 @@ export default defineConfig([
plugins: [sharedSettings()],
basePath: '/playground',
},
{
name: 'listener-events',
title: 'Listener events debug',
subtitle: 'Listener events debugging',
projectId: 'ppsg7ml5',
dataset: 'data-loss',
plugins: [sharedSettings()],
basePath: '/listener-events',
},
{
name: 'playground-partial-indexing',
title: 'Test Studio (playground-partial-indexing)',
3 changes: 3 additions & 0 deletions packages/sanity/src/core/store/_legacy/document/debug.ts
@@ -0,0 +1,3 @@
import createDebug from 'debug'

export const debug = createDebug('sanity:document-store')
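
(For reference: the `debug` package enables this namespace via the `DEBUG`
environment variable in Node, or the `localStorage.debug` key in a browser.)

```ts
// Enable the new logger namespace in a browser console:
localStorage.debug = 'sanity:document-store'
// In Node, the equivalent is: DEBUG=sanity:document-store
```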
91 changes: 74 additions & 17 deletions packages/sanity/src/core/store/_legacy/document/getPairListener.ts
@@ -2,16 +2,20 @@
import {type SanityClient} from '@sanity/client'
import {type SanityDocument} from '@sanity/types'
import {groupBy} from 'lodash'
import {defer, type Observable, of as observableOf, of, timer} from 'rxjs'
import {concatMap, map, mergeMap, scan} from 'rxjs/operators'
import {defer, merge, type Observable, of, throwError, timer} from 'rxjs'
import {catchError, concatMap, filter, map, mergeMap, scan, share} from 'rxjs/operators'

import {LISTENER_RESET_DELAY} from '../../../preview/constants'
import {shareReplayLatest} from '../../../preview/utils/shareReplayLatest'
import {debug} from './debug'
import {
type IdPair,
type MutationEvent,
type PendingMutationsEvent,
type ReconnectEvent,
type WelcomeEvent,
} from './types'
import {OutOfSyncError, sequentializeListenerEvents} from './utils/sequentializeListenerEvents'

interface Snapshots {
draft: SanityDocument | null
@@ -28,6 +32,13 @@ export interface InitialSnapshotEvent {
/** @internal */
export interface PairListenerOptions {
tag?: string

/**
* Called when we recover from sync error
* Meant for error tracking / telemetry purposes
* @param error - the {@link OutOfSyncError} recovered from
*/
onSyncErrorRecovery?(error: OutOfSyncError): void
}

/** @internal */
@@ -65,9 +76,10 @@ export function getPairListener(
options: PairListenerOptions = {},
): Observable<ListenerEvent> {
const {publishedId, draftId} = idPair
return defer(
() =>
client.observable.listen(

const sharedEvents = defer(() =>
client.observable
.listen(
`*[_id == $publishedId || _id == $draftId]`,
{
publishedId,
@@ -79,20 +91,35 @@
effectFormat: 'mendoza',
tag: options.tag || 'document.pair-listener',
},
) as Observable<WelcomeEvent | MutationEvent | ReconnectEvent>,
).pipe(
concatMap((event) =>
event.type === 'welcome'
)
.pipe(
//filter((event) => Math.random() < 0.99 || event.type !== 'mutation'),
shareReplayLatest({
predicate: (event) => event.type === 'welcome' || event.type === 'reconnect',
resetOnRefCountZero: () => timer(LISTENER_RESET_DELAY),
}),
),
) as Observable<WelcomeEvent | MutationEvent | ReconnectEvent>

const pairEvents$ = sharedEvents.pipe(
concatMap((event) => {
return event.type === 'welcome'
? fetchInitialDocumentSnapshots().pipe(
concatMap((snapshots) => [
createSnapshotEvent(draftId, snapshots.draft),
createSnapshotEvent(publishedId, snapshots.published),
mergeMap(({draft, published}) => [
createSnapshotEvent(draftId, draft),
createSnapshotEvent(publishedId, published),
]),
)
: observableOf(event),
),
: of(event)
}),
scan(
(acc: {next: ListenerEvent[]; buffer: ListenerEvent[]}, msg) => {
(
acc: {
next: (InitialSnapshotEvent | ListenerEvent)[]
buffer: (InitialSnapshotEvent | ListenerEvent)[]
},
msg,
) => {
// we only care about mutation events
if (!isMutationEvent(msg)) {
return {next: [msg], buffer: []}
@@ -124,9 +151,39 @@
),
// note: this flattens the array, and in the case of an empty array, no event will be pushed downstream
mergeMap((v) => v.next),
concatMap((result) =>
(window as any).SLOW ? timer(10000).pipe(map(() => result)) : of(result),
share(),
)

const draftEvents$ = pairEvents$.pipe(
filter((event) =>
event.type === 'mutation' || event.type === 'snapshot' ? event.documentId === draftId : true,
),
sequentializeListenerEvents(),
)

const publishedEvents$ = pairEvents$.pipe(
filter((event) =>
event.type === 'mutation' || event.type === 'snapshot'
? event.documentId === publishedId
: true,
),
sequentializeListenerEvents(),
)

return merge(draftEvents$, publishedEvents$).pipe(
catchError((err, caught$) => {
if (err instanceof OutOfSyncError) {
debug('Recovering from OutOfSyncError: %s', OutOfSyncError.name)
if (typeof options?.onSyncErrorRecovery === 'function') {
options?.onSyncErrorRecovery(err)
} else {
console.error(err)
}
// this will retry immediately
return caught$
}
return throwError(() => err)
}),
)

function fetchInitialDocumentSnapshots(): Observable<Snapshots> {
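
For context, a rough sketch of how a caller could wire up the new
`onSyncErrorRecovery` option; the call shape below is inferred from this diff
and should be treated as an assumption, not documented public API:

```ts
import {type SanityClient} from '@sanity/client'
import {type Observable} from 'rxjs'

// Assumed call shape, inferred from this diff.
declare function getPairListener(
  client: SanityClient,
  idPair: {draftId: string; publishedId: string},
  options?: {tag?: string; onSyncErrorRecovery?(error: Error): void},
): Observable<unknown>

declare const client: SanityClient

const events$ = getPairListener(
  client,
  {draftId: 'drafts.my-doc', publishedId: 'my-doc'},
  {
    // Invoked after the listener detects missing mutation events and restarts;
    // intended for error tracking / telemetry.
    onSyncErrorRecovery: (error) => console.warn('recovered from sync error', error),
  },
)
```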
2 changes: 2 additions & 0 deletions packages/sanity/src/core/store/_legacy/document/types.ts
@@ -14,6 +14,8 @@ export interface MutationEvent {
mutations: MutationPayload[]
effects: {apply: unknown; revert: unknown}

previousRev: string
resultRev: string
transactionTotalEvents: number
transactionCurrentEvent: number
visibility: 'transaction' | 'query'
@@ -0,0 +1,44 @@
import {describe, expect, it} from '@jest/globals'

import {discardChainTo, toOrderedChains} from '../eventChainUtils'
import {mutationEvent} from './test-utils'

describe(toOrderedChains.name, () => {
it('returns a list of chains', () => {
const events = [
// missing
mutationEvent({previousRev: 'a', resultRev: 'b', mutations: []}),
mutationEvent({previousRev: 'b', resultRev: 'c', mutations: []}),
mutationEvent({previousRev: 'c', resultRev: 'd', mutations: []}),
mutationEvent({previousRev: 'd', resultRev: 'e', mutations: []}),
mutationEvent({previousRev: 'e', resultRev: 'f', mutations: []}),

// mutationEvent({previousRev: 'g', resultRev: 'h', mutations: []}), // missing
mutationEvent({previousRev: 'h', resultRev: 'i', mutations: []}),
mutationEvent({previousRev: 'i', resultRev: 'j', mutations: []}),
mutationEvent({previousRev: 'j', resultRev: 'k', mutations: []}),
mutationEvent({previousRev: 'k', resultRev: 'l', mutations: []}),
mutationEvent({previousRev: 'l', resultRev: 'm', mutations: []}),
]
const [first, second] = toOrderedChains(events)

expect(first.map((ev) => ev.resultRev)).toEqual(['b', 'c', 'd', 'e', 'f'])
expect(second.map((ev) => ev.resultRev)).toEqual(['i', 'j', 'k', 'l', 'm'])
})
})

describe(discardChainTo.name, () => {
it('discards mutation events in the chain up to the provided revision', () => {
const events = [
mutationEvent({previousRev: 'a', resultRev: 'b', mutations: []}),
mutationEvent({previousRev: 'b', resultRev: 'c', mutations: []}),
mutationEvent({previousRev: 'c', resultRev: 'd', mutations: []}),
mutationEvent({previousRev: 'd', resultRev: 'e', mutations: []}),
mutationEvent({previousRev: 'e', resultRev: 'f', mutations: []}),
]
const [discarded, applicable] = discardChainTo(events, 'd')
// Note, it's still in the order received
expect(discarded.map((ev) => ev.resultRev)).toEqual(['b', 'c', 'd'])
expect(applicable.map((ev) => ev.resultRev)).toEqual(['e', 'f'])
})
})
