Skip to content

Commit

Permalink
refactor(core): add reusable rxSwr operator (#7562)
Browse files Browse the repository at this point in the history
### Description

This PR introduces a new utility function `createSWR` for implementing
stale-while-revalidate caching in RxJS observables. The `createSwr`
function lets you create an operator function that can be used with a
cache key for storing the last emitted value. Once an observable with
the same cache key is subscribed to, it will emit the cached value (if
any) immediately while waiting for the fresh value to arrive.

This PR also refactors the `listenSearchQuery` function to use this new
SWR implementation, replacing the previous LRU cache approach.

### What to review

- The new `rxSwr.ts` file in the `core/util` directory, which contains
the `createSWR` function and related types.
- The changes in `listenSearchQuery.ts`, particularly the removal of the
`memoLRU` function and the integration of the new `swr` utility.

To add SWR to any observable:

```typescript
import {createSWR} from 'sanity'

const swr = createSWR<ObservableValueType>({maxSize: 100})

// In your observable pipeline:
someObservable$.pipe(
  swr('<some cacheKey identifying the observable>'),
  map(({fromCache, value}) => {
    // Handle cached and fresh data
    // `fromCache` is true if `value` was read from cache, otherwise false.
  })
)
```

### Testing

Included some basic unit tests for rxSwr

### Notes for release

n/a internal
  • Loading branch information
bjoerge committed Oct 7, 2024
1 parent 4910662 commit f7ca105
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 40 deletions.
55 changes: 55 additions & 0 deletions packages/sanity/src/core/util/__tests__/rxSwr.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import {describe, expect, it} from '@jest/globals'
import {lastValueFrom, timer} from 'rxjs'
import {map, toArray} from 'rxjs/operators'

import {createSWR} from '../rxSwr'

describe('rxSwr', () => {
it('should cache the last known value and emit sync', async () => {
const swr = createSWR({maxSize: 1})

const observable = timer(100).pipe(
map(() => 'value!'),
swr('someKey'),
toArray(),
)

expect(await lastValueFrom(observable)).toEqual([{fromCache: false, value: 'value!'}])

// Second subscription, now with warm cache
expect(await lastValueFrom(observable)).toEqual([
{fromCache: true, value: 'value!'},
{fromCache: false, value: 'value!'},
])
})

it('should discard old cache keys when exceeding maxSize', async () => {
const swr = createSWR({maxSize: 1})

const observable1 = timer(100).pipe(
map(() => 'observable1!'),
swr('key1'),
toArray(),
)

expect(await lastValueFrom(observable1)).toEqual([{fromCache: false, value: 'observable1!'}])

// Second subscription, now with warm cache
expect(await lastValueFrom(observable1)).toEqual([
{fromCache: true, value: 'observable1!'},
{fromCache: false, value: 'observable1!'},
])

const observable2 = timer(100).pipe(
map(() => 'observable2!'),
swr('key2'),
toArray(),
)

// Subscribing to observable2 should purge the key of observable1
expect(await lastValueFrom(observable2)).toEqual([{fromCache: false, value: 'observable2!'}])

// re-subscribing to the first should now not have a cache
expect(await lastValueFrom(observable1)).toEqual([{fromCache: false, value: 'observable1!'}])
})
})
1 change: 1 addition & 0 deletions packages/sanity/src/core/util/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ export * from './isString'
export * from './isTruthy'
export * from './PartialExcept'
export * from './resizeObserver'
export * from './rxSwr'
export * from './schemaUtils'
export * from './searchUtils'
export * from './supportsTouch'
Expand Down
70 changes: 70 additions & 0 deletions packages/sanity/src/core/util/rxSwr.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import QuickLRU from 'quick-lru'
import {concat, defer, EMPTY, map, type Observable, of, type OperatorFunction} from 'rxjs'
import {tap} from 'rxjs/operators'

/**
* The interface that any caching layer must implement
* @internal
*/
interface SWRCache<T> {
/**
* Note: This will throw if key does not exist. Always check for existence with `has` before calling
*/
get(key: string): T
has(key: string): boolean
set(key: string, value: T): void
delete(key: string): void
}

const createSWRCache = createLRUCache

/**
*
* Create an SWR (Stale While Revalidate) rxjs operator that will store the latest value in a cache and emit the last know value upon observable subscription
* @param options - Options
* @internal
*/
export function createSWR<T>(options: {maxSize: number}) {
const cache = createSWRCache<T>(options)
return function rxSwr(key: string): OperatorFunction<T, {fromCache: boolean; value: T}> {
return (input$: Observable<T>) => {
return concat(
defer(() => (cache.has(key) ? of({fromCache: true, value: cache.get(key)}) : EMPTY)),
input$.pipe(
tap((result) => cache.set(key, result)),
map((value) => ({
fromCache: false,
value: value,
})),
),
)
}
}
}

/**
* For now, the only cache layer implemented is an in-memory LRU.
* @param options - LRU options
* @internal
*/
function createLRUCache<T>(options: {maxSize: number}): SWRCache<T> {
const lru = new QuickLRU<string, {value: T}>(options)
return {
get(key: string) {
const entry = lru.get(key)
if (!entry) {
throw new Error(`Key not found in LRU cache: ${key}`)
}
return entry.value
},
set(key: string, value: T) {
lru.set(key, {value})
},
delete(key: string) {
lru.delete(key)
},
has(key: string) {
return lru.has(key)
},
}
}
Original file line number Diff line number Diff line change
@@ -1,29 +1,29 @@
import {type SanityClient} from '@sanity/client'
import QuickLRU from 'quick-lru'
import {
asyncScheduler,
defer,
EMPTY,
map,
merge,
mergeMap,
type Observable,
of,
type OperatorFunction,
partition,
pipe,
share,
take,
throttleTime,
throwError,
timer,
} from 'rxjs'
import {tap} from 'rxjs/operators'
import {exhaustMapWithTrailing} from 'rxjs-exhaustmap-with-trailing'
import {createSearch, getSearchableTypes, type SanityDocumentLike, type Schema} from 'sanity'
import {
createSearch,
createSWR,
getSearchableTypes,
type SanityDocumentLike,
type Schema,
} from 'sanity'

import {getExtendedProjection} from '../../structureBuilder/util/getExtendedProjection'
import {ENABLE_LRU_MEMO} from './constants'
import {type SortOrder} from './types'

interface ListenQueryOptions {
Expand All @@ -44,6 +44,8 @@ export interface SearchQueryResult {
documents: SanityDocumentLike[]
}

const swr = createSWR<SanityDocumentLike[]>({maxSize: 100})

export function listenSearchQuery(options: ListenQueryOptions): Observable<SearchQueryResult> {
const {
client,
Expand Down Expand Up @@ -91,7 +93,7 @@ export function listenSearchQuery(options: ListenQueryOptions): Observable<Searc

const [welcome$, mutationAndReconnect$] = partition(events$, (ev) => ev.type === 'welcome')

const memoKey = JSON.stringify({filter, limit, params, searchQuery, sort, staticTypeNames})
const swrKey = JSON.stringify({filter, limit, params, searchQuery, sort, staticTypeNames})

return merge(
welcome$.pipe(take(1)),
Expand Down Expand Up @@ -157,37 +159,7 @@ export function listenSearchQuery(options: ListenQueryOptions): Observable<Searc
}),
)
}),
ENABLE_LRU_MEMO
? pipe(
memoLRU(memoKey, lru),
map((memo) => ({
fromCache: memo.type === 'memo',
documents: memo.value,
})),
)
: map((documents) => ({
fromCache: false,
documents,
})),
swr(swrKey),
map(({fromCache, value}) => ({fromCache, documents: value})),
)
}

const lru = new QuickLRU<string, SanityDocumentLike[]>({maxSize: 100})
function memoLRU<T>(
memoKey: string,
cache: QuickLRU<string, T>,
): OperatorFunction<T, {type: 'memo'; value: T} | {type: 'value'; value: T}> {
return (input$: Observable<T>) =>
merge(
defer(() =>
cache.has(memoKey) ? of({type: 'memo' as const, value: cache.get(memoKey)!}) : EMPTY,
),
input$.pipe(
tap((result) => cache.set(memoKey, result)),
map((value) => ({
type: 'value' as const,
value: value,
})),
),
)
}

0 comments on commit f7ca105

Please sign in to comment.