diff --git a/packages/dev-env/src/bsky.ts b/packages/dev-env/src/bsky.ts index 75206fe768c..da3010adf22 100644 --- a/packages/dev-env/src/bsky.ts +++ b/packages/dev-env/src/bsky.ts @@ -86,6 +86,10 @@ export class TestBsky { return new AtpAgent({ service: this.url }) } + async processAll() { + await this.ctx.backgroundQueue.processAll() + } + async close() { await this.server.destroy() } diff --git a/packages/dev-env/src/network-no-appview.ts b/packages/dev-env/src/network-no-appview.ts index f5244eff2db..24a0b72fb59 100644 --- a/packages/dev-env/src/network-no-appview.ts +++ b/packages/dev-env/src/network-no-appview.ts @@ -37,6 +37,10 @@ export class TestNetworkNoAppView { return fg } + async processAll() { + await this.pds.processAll() + } + async close() { await Promise.all(this.feedGens.map((fg) => fg.close())) await this.pds.close() diff --git a/packages/dev-env/src/network.ts b/packages/dev-env/src/network.ts index 5d589335346..1cce66cfde9 100644 --- a/packages/dev-env/src/network.ts +++ b/packages/dev-env/src/network.ts @@ -54,7 +54,6 @@ export class TestNetwork extends TestNetworkNoAppView { } async processFullSubscription(timeout = 5000) { - if (!this.bsky) return const sub = this.bsky.sub if (!sub) return const { db } = this.pds.ctx.db @@ -76,10 +75,9 @@ export class TestNetwork extends TestNetworkNoAppView { } async processAll(timeout?: number) { - await this.pds.ctx.backgroundQueue.processAll() - if (!this.bsky) return + await this.pds.processAll() await this.processFullSubscription(timeout) - await this.bsky.ctx.backgroundQueue.processAll() + await this.bsky.processAll() } async serviceHeaders(did: string, aud?: string) { diff --git a/packages/dev-env/src/pds.ts b/packages/dev-env/src/pds.ts index ee0c307f141..28b6727c4d8 100644 --- a/packages/dev-env/src/pds.ts +++ b/packages/dev-env/src/pds.ts @@ -96,6 +96,9 @@ export class TestPds { }) await server.start() + + // we refresh label cache by hand in `processAll` instead of on a timer + server.ctx.labelCache.stop() return new TestPds(url, port, server) } @@ -123,6 +126,11 @@ export class TestPds { } } + async processAll() { + await this.ctx.backgroundQueue.processAll() + await this.ctx.labelCache.fullRefresh() + } + async close() { await this.server.destroy() } diff --git a/packages/identifier/src/reserved.ts b/packages/identifier/src/reserved.ts index 83180160e61..c49c85f5378 100644 --- a/packages/identifier/src/reserved.ts +++ b/packages/identifier/src/reserved.ts @@ -864,6 +864,7 @@ const famousAccounts = [ // reserving some large twitter accounts (top 100 by followers according to wikidata dump) '10ronaldinho', '3gerardpique', + 'aclu', 'adele', 'akshaykumar', 'aliaa08', diff --git a/packages/pds/src/app-view/api/app/bsky/util/feed.ts b/packages/pds/src/app-view/api/app/bsky/util/feed.ts index 606580b3046..9d9d0323b95 100644 --- a/packages/pds/src/app-view/api/app/bsky/util/feed.ts +++ b/packages/pds/src/app-view/api/app/bsky/util/feed.ts @@ -12,7 +12,7 @@ export class FeedKeyset extends TimeCidKeyset { } // For users with sparse feeds, avoid scanning more than one week for a single page -export const getFeedDateThreshold = (from: string | undefined, days = 3) => { +export const getFeedDateThreshold = (from: string | undefined, days = 1) => { const timelineDateThreshold = from ? new Date(from) : new Date() timelineDateThreshold.setDate(timelineDateThreshold.getDate() - days) return timelineDateThreshold.toISOString() diff --git a/packages/pds/src/app-view/services/actor/index.ts b/packages/pds/src/app-view/services/actor/index.ts index 8ed28e4691b..e6c6fd6e756 100644 --- a/packages/pds/src/app-view/services/actor/index.ts +++ b/packages/pds/src/app-view/services/actor/index.ts @@ -3,15 +3,20 @@ import { DidHandle } from '../../../db/tables/did-handle' import { notSoftDeletedClause } from '../../../db/util' import { ActorViews } from './views' import { ImageUriBuilder } from '../../../image/uri' +import { LabelCache } from '../../../label-cache' export class ActorService { - constructor(public db: Database, public imgUriBuilder: ImageUriBuilder) {} + constructor( + public db: Database, + public imgUriBuilder: ImageUriBuilder, + public labelCache: LabelCache, + ) {} - static creator(imgUriBuilder: ImageUriBuilder) { - return (db: Database) => new ActorService(db, imgUriBuilder) + static creator(imgUriBuilder: ImageUriBuilder, labelCache: LabelCache) { + return (db: Database) => new ActorService(db, imgUriBuilder, labelCache) } - views = new ActorViews(this.db, this.imgUriBuilder) + views = new ActorViews(this.db, this.imgUriBuilder, this.labelCache) async getActor( handleOrDid: string, diff --git a/packages/pds/src/app-view/services/actor/views.ts b/packages/pds/src/app-view/services/actor/views.ts index 50fd021a78c..b0f652ef901 100644 --- a/packages/pds/src/app-view/services/actor/views.ts +++ b/packages/pds/src/app-view/services/actor/views.ts @@ -8,13 +8,19 @@ import { DidHandle } from '../../../db/tables/did-handle' import Database from '../../../db' import { ImageUriBuilder } from '../../../image/uri' import { LabelService } from '../label' -import { ListViewBasic } from '../../../lexicon/types/app/bsky/graph/defs' +import { GraphService } from '../graph' +import { LabelCache } from '../../../label-cache' export class ActorViews { - constructor(private db: Database, private imgUriBuilder: ImageUriBuilder) {} + constructor( + private db: Database, + private imgUriBuilder: ImageUriBuilder, + private labelCache: LabelCache, + ) {} services = { - label: LabelService.creator(), + label: LabelService.creator(this.labelCache)(this.db), + graph: GraphService.creator(this.imgUriBuilder)(this.db), } profileDetailed( @@ -82,18 +88,30 @@ export class ActorViews { .where('mutedByDid', '=', viewer) .select('did') .as('requesterMuted'), + this.db.db + .selectFrom('list_item') + .innerJoin('list_mute', 'list_mute.listUri', 'list_item.listUri') + .where('list_mute.mutedByDid', '=', viewer) + .whereRef('list_item.subjectDid', '=', ref('did_handle.did')) + .select('list_item.listUri') + .limit(1) + .as('requesterMutedByList'), ]) - const [profileInfos, labels, listMutes] = await Promise.all([ + const [profileInfos, labels] = await Promise.all([ profileInfosQb.execute(), - this.services.label(this.db).getLabelsForSubjects(dids), - this.getListMutes(dids, viewer), + this.services.label.getLabelsForSubjects(dids), ]) const profileInfoByDid = profileInfos.reduce((acc, info) => { return Object.assign(acc, { [info.did]: info }) }, {} as Record>) + const listUris: string[] = profileInfos + .map((a) => a.requesterMutedByList) + .filter((list) => !!list) + const listViews = await this.services.graph.getListViews(listUris, viewer) + const views = results.map((result) => { const profileInfo = profileInfoByDid[result.did] const avatar = profileInfo?.avatarCid @@ -114,8 +132,14 @@ export class ActorViews { postsCount: profileInfo?.postsCount || 0, indexedAt: profileInfo?.indexedAt || undefined, viewer: { - muted: !!profileInfo?.requesterMuted || !!listMutes[result.did], - mutedByList: listMutes[result.did], + muted: + !!profileInfo?.requesterMuted || + !!profileInfo?.requesterMutedByList, + mutedByList: profileInfo.requesterMutedByList + ? this.services.graph.formatListViewBasic( + listViews[profileInfo.requesterMutedByList], + ) + : undefined, blockedBy: !!profileInfo.requesterBlockedBy, blocking: profileInfo.requesterBlocking || undefined, following: profileInfo?.requesterFollowing || undefined, @@ -181,18 +205,30 @@ export class ActorViews { .where('mutedByDid', '=', viewer) .select('did') .as('requesterMuted'), + this.db.db + .selectFrom('list_item') + .innerJoin('list_mute', 'list_mute.listUri', 'list_item.listUri') + .where('list_mute.mutedByDid', '=', viewer) + .whereRef('list_item.subjectDid', '=', ref('did_handle.did')) + .select('list_item.listUri') + .limit(1) + .as('requesterMutedByList'), ]) - const [profileInfos, labels, listMutes] = await Promise.all([ + const [profileInfos, labels] = await Promise.all([ profileInfosQb.execute(), - this.services.label(this.db).getLabelsForSubjects(dids), - this.getListMutes(dids, viewer), + this.services.label.getLabelsForSubjects(dids), ]) const profileInfoByDid = profileInfos.reduce((acc, info) => { return Object.assign(acc, { [info.did]: info }) }, {} as Record>) + const listUris: string[] = profileInfos + .map((a) => a.requesterMutedByList) + .filter((list) => !!list) + const listViews = await this.services.graph.getListViews(listUris, viewer) + const views = results.map((result) => { const profileInfo = profileInfoByDid[result.did] const avatar = profileInfo?.avatarCid @@ -206,8 +242,14 @@ export class ActorViews { avatar, indexedAt: profileInfo?.indexedAt || undefined, viewer: { - muted: !!profileInfo?.requesterMuted || !!listMutes[result.did], - mutedByList: listMutes[result.did], + muted: + !!profileInfo?.requesterMuted || + !!profileInfo?.requesterMutedByList, + mutedByList: profileInfo.requesterMutedByList + ? this.services.graph.formatListViewBasic( + listViews[profileInfo.requesterMutedByList], + ) + : undefined, blockedBy: !!profileInfo.requesterBlockedBy, blocking: profileInfo.requesterBlocking || undefined, following: profileInfo?.requesterFollowing || undefined, @@ -245,41 +287,6 @@ export class ActorViews { return Array.isArray(result) ? views : views[0] } - - async getListMutes( - subjects: string[], - mutedBy: string, - ): Promise> { - if (subjects.length < 1) return {} - const res = await this.db.db - .selectFrom('list_item') - .innerJoin('list_mute', 'list_mute.listUri', 'list_item.listUri') - .innerJoin('list', 'list.uri', 'list_item.listUri') - .where('list_mute.mutedByDid', '=', mutedBy) - .where('list_item.subjectDid', 'in', subjects) - .selectAll('list') - .select('list_item.subjectDid as subjectDid') - .execute() - return res.reduce( - (acc, cur) => ({ - ...acc, - [cur.subjectDid]: { - uri: cur.uri, - cid: cur.cid, - name: cur.name, - purpose: cur.purpose, - avatar: cur.avatarCid - ? this.imgUriBuilder.getCommonSignedUri('avatar', cur.avatarCid) - : undefined, - viewer: { - muted: true, - }, - indexedAt: cur.indexedAt, - }, - }), - {} as Record, - ) - } } type ActorResult = DidHandle diff --git a/packages/pds/src/app-view/services/feed/index.ts b/packages/pds/src/app-view/services/feed/index.ts index 326588efe36..732ba6623b3 100644 --- a/packages/pds/src/app-view/services/feed/index.ts +++ b/packages/pds/src/app-view/services/feed/index.ts @@ -1,6 +1,7 @@ import { sql } from 'kysely' import { AtUri } from '@atproto/uri' import { dedupeStrs } from '@atproto/common' +import { cborToLexRecord } from '@atproto/repo' import Database from '../../../db' import { countAll, notSoftDeletedClause } from '../../../db/util' import { ImageUriBuilder } from '../../../image/uri' @@ -28,21 +29,25 @@ import { LabelService, Labels } from '../label' import { ActorService } from '../actor' import { GraphService } from '../graph' import { FeedViews } from './views' -import { cborToLexRecord } from '@atproto/repo' +import { LabelCache } from '../../../label-cache' export * from './types' export class FeedService { - constructor(public db: Database, public imgUriBuilder: ImageUriBuilder) {} + constructor( + public db: Database, + public imgUriBuilder: ImageUriBuilder, + public labelCache: LabelCache, + ) {} - static creator(imgUriBuilder: ImageUriBuilder) { - return (db: Database) => new FeedService(db, imgUriBuilder) + static creator(imgUriBuilder: ImageUriBuilder, labelCache: LabelCache) { + return (db: Database) => new FeedService(db, imgUriBuilder, labelCache) } views = new FeedViews(this.db, this.imgUriBuilder) services = { - label: LabelService.creator()(this.db), - actor: ActorService.creator(this.imgUriBuilder)(this.db), + label: LabelService.creator(this.labelCache)(this.db), + actor: ActorService.creator(this.imgUriBuilder, this.labelCache)(this.db), graph: GraphService.creator(this.imgUriBuilder)(this.db), } @@ -114,7 +119,7 @@ export class FeedService { if (dids.length < 1) return {} const { ref } = this.db.db.dynamic const { skipLabels = false, includeSoftDeleted = false } = opts ?? {} - const [actors, labels, listMutes] = await Promise.all([ + const [actors, labels] = await Promise.all([ this.db.db .selectFrom('did_handle') .where('did_handle.did', 'in', dids) @@ -160,11 +165,25 @@ export class FeedService { .where('mutedByDid', '=', requester) .select('did') .as('requesterMuted'), + this.db.db + .selectFrom('list_item') + .innerJoin('list_mute', 'list_mute.listUri', 'list_item.listUri') + .where('list_mute.mutedByDid', '=', requester) + .whereRef('list_item.subjectDid', '=', ref('did_handle.did')) + .select('list_item.listUri') + .limit(1) + .as('requesterMutedByList'), ]) .execute(), this.services.label.getLabelsForSubjects(skipLabels ? [] : dids), - this.services.actor.views.getListMutes(dids, requester), ]) + const listUris: string[] = actors + .map((a) => a.requesterMutedByList) + .filter((list) => !!list) + const listViews = await this.services.graph.getListViews( + listUris, + requester, + ) return actors.reduce((acc, cur) => { const actorLabels = labels[cur.did] ?? [] return { @@ -177,8 +196,12 @@ export class FeedService { ? this.imgUriBuilder.getCommonSignedUri('avatar', cur.avatarCid) : undefined, viewer: { - muted: !!cur?.requesterMuted || !!listMutes[cur.did], - mutedByList: listMutes[cur.did], + muted: !!cur?.requesterMuted || !!cur?.requesterMutedByList, + mutedByList: cur.requesterMutedByList + ? this.services.graph.formatListViewBasic( + listViews[cur.requesterMutedByList], + ) + : undefined, blockedBy: !!cur?.requesterBlockedBy, blocking: cur?.requesterBlocking || undefined, following: cur?.requesterFollowing || undefined, diff --git a/packages/pds/src/app-view/services/graph/index.ts b/packages/pds/src/app-view/services/graph/index.ts index bded56aeed4..94f7c58f54d 100644 --- a/packages/pds/src/app-view/services/graph/index.ts +++ b/packages/pds/src/app-view/services/graph/index.ts @@ -132,6 +132,22 @@ export class GraphService { }, } } + + formatListViewBasic(list: ListInfo) { + return { + uri: list.uri, + cid: list.cid, + name: list.name, + purpose: list.purpose, + avatar: list.avatarCid + ? this.imgUriBuilder.getCommonSignedUri('avatar', list.avatarCid) + : undefined, + indexedAt: list.indexedAt, + viewer: { + muted: !!list.viewerMuted, + }, + } + } } type ListInfo = List & { diff --git a/packages/pds/src/app-view/services/label/index.ts b/packages/pds/src/app-view/services/label/index.ts index 9de4670c16e..aa00ca29f95 100644 --- a/packages/pds/src/app-view/services/label/index.ts +++ b/packages/pds/src/app-view/services/label/index.ts @@ -3,14 +3,15 @@ import Database from '../../../db' import { Label } from '../../../lexicon/types/com/atproto/label/defs' import { ids } from '../../../lexicon/lexicons' import { sql } from 'kysely' +import { LabelCache } from '../../../label-cache' export type Labels = Record export class LabelService { - constructor(public db: Database) {} + constructor(public db: Database, public cache: LabelCache) {} - static creator() { - return (db: Database) => new LabelService(db) + static creator(cache: LabelCache) { + return (db: Database) => new LabelService(db, cache) } async formatAndCreate( @@ -63,14 +64,17 @@ export class LabelService { async getLabelsForUris( subjects: string[], includeNeg?: boolean, + skipCache?: boolean, ): Promise { if (subjects.length < 1) return {} - const res = await this.db.db - .selectFrom('label') - .where('label.uri', 'in', subjects) - .if(!includeNeg, (qb) => qb.where('neg', '=', 0)) - .selectAll() - .execute() + const res = skipCache + ? await this.db.db + .selectFrom('label') + .where('label.uri', 'in', subjects) + .if(!includeNeg, (qb) => qb.where('neg', '=', 0)) + .selectAll() + .execute() + : this.cache.forSubjects(subjects, includeNeg) return res.reduce((acc, cur) => { acc[cur.uri] ??= [] acc[cur.uri].push({ @@ -86,6 +90,7 @@ export class LabelService { async getLabelsForSubjects( subjects: string[], includeNeg?: boolean, + skipCache?: boolean, ): Promise { if (subjects.length < 1) return {} const expandedSubjects = subjects.flatMap((subject) => { @@ -97,7 +102,11 @@ export class LabelService { } return subject }) - const labels = await this.getLabelsForUris(expandedSubjects, includeNeg) + const labels = await this.getLabelsForUris( + expandedSubjects, + includeNeg, + skipCache, + ) return Object.keys(labels).reduce((acc, cur) => { const uri = cur.startsWith('at://') ? new AtUri(cur) : null if ( @@ -116,8 +125,12 @@ export class LabelService { }, {} as Labels) } - async getLabels(subject: string, includeNeg?: boolean): Promise { - const labels = await this.getLabelsForUris([subject], includeNeg) + async getLabels( + subject: string, + includeNeg?: boolean, + skipCache?: boolean, + ): Promise { + const labels = await this.getLabelsForUris([subject], includeNeg, skipCache) return labels[subject] ?? [] } diff --git a/packages/pds/src/context.ts b/packages/pds/src/context.ts index 1e3ce8167d4..093edb1ba84 100644 --- a/packages/pds/src/context.ts +++ b/packages/pds/src/context.ts @@ -18,6 +18,7 @@ import { BackgroundQueue } from './event-stream/background-queue' import DidSqlCache from './did-cache' import { MountedAlgos } from './feed-gen/types' import { Crawlers } from './crawlers' +import { LabelCache } from './label-cache' export class AppContext { private _appviewAgent: AtpAgent | null @@ -39,6 +40,7 @@ export class AppContext { sequencer: Sequencer sequencerLeader: SequencerLeader labeler: Labeler + labelCache: LabelCache backgroundQueue: BackgroundQueue crawlers: Crawlers algos: MountedAlgos @@ -131,6 +133,10 @@ export class AppContext { return this.opts.labeler } + get labelCache(): LabelCache { + return this.opts.labelCache + } + get backgroundQueue(): BackgroundQueue { return this.opts.backgroundQueue } diff --git a/packages/pds/src/feed-gen/with-friends.ts b/packages/pds/src/feed-gen/with-friends.ts index a4a40364563..682387a5e61 100644 --- a/packages/pds/src/feed-gen/with-friends.ts +++ b/packages/pds/src/feed-gen/with-friends.ts @@ -12,39 +12,56 @@ const handler: AlgoHandler = async ( params: SkeletonParams, requester: string, ): Promise => { - const { cursor, limit = 50 } = params - const accountService = ctx.services.account(ctx.db) - const feedService = ctx.services.appView.feed(ctx.db) - const graphService = ctx.services.appView.graph(ctx.db) + // Temporary change to only return a post notifying users that the feed is down + return { + feedItems: [ + { + type: 'post', + uri: 'at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jzinucnmbi2c', + cid: 'bafyreifmtn55tubbv7tefrq277nzfy4zu7ioithky276aho5ehb6w3nu6q', + postUri: + 'at://did:plc:z72i7hdynmk6r22z27h6tvur/app.bsky.feed.post/3jzinucnmbi2c', + postAuthorDid: 'did:plc:z72i7hdynmk6r22z27h6tvur', + originatorDid: 'did:plc:z72i7hdynmk6r22z27h6tvur', + replyParent: null, + replyRoot: null, + sortAt: '2023-07-01T23:04:27.853Z', + }, + ], + } + // const { cursor, limit = 50 } = params + // const accountService = ctx.services.account(ctx.db) + // const feedService = ctx.services.appView.feed(ctx.db) + // const graphService = ctx.services.appView.graph(ctx.db) - const { ref } = ctx.db.db.dynamic + // const { ref } = ctx.db.db.dynamic - const keyset = new FeedKeyset(ref('post.indexedAt'), ref('post.cid')) - const sortFrom = keyset.unpack(cursor)?.primary + // const keyset = new FeedKeyset(ref('post.indexedAt'), ref('post.cid')) + // const sortFrom = keyset.unpack(cursor)?.primary - let postsQb = feedService - .selectPostQb() - .innerJoin('post_agg', 'post_agg.uri', 'post.uri') - .where('post_agg.likeCount', '>=', 5) - .whereExists((qb) => - qb - .selectFrom('follow') - .where('follow.creator', '=', requester) - .whereRef('follow.subjectDid', '=', 'post.creator'), - ) - .where((qb) => - accountService.whereNotMuted(qb, requester, [ref('post.creator')]), - ) - .whereNotExists(graphService.blockQb(requester, [ref('post.creator')])) - .where('post.indexedAt', '>', getFeedDateThreshold(sortFrom)) + // let postsQb = feedService + // .selectPostQb() + // // .innerJoin('post_agg', 'post_agg.uri', 'post.uri') + // // .where('post_agg.likeCount', '>=', 6) + // .whereExists((qb) => + // qb + // .selectFrom('follow') + // .where('follow.creator', '=', requester) + // .whereRef('follow.subjectDid', '=', 'post.creator'), + // ) + // .where((qb) => + // accountService.whereNotMuted(qb, requester, [ref('post.creator')]), + // ) + // .whereNotExists(graphService.blockQb(requester, [ref('post.creator')])) + // .where('post.indexedAt', '>', getFeedDateThreshold(sortFrom)) - postsQb = paginate(postsQb, { limit, cursor, keyset, tryIndex: true }) + // postsQb = paginate(postsQb, { limit, cursor, keyset, tryIndex: true }) - const feedItems = await postsQb.execute() - return { - feedItems, - cursor: keyset.packFromResult(feedItems), - } + // const feedItems = await postsQb.execute() + // return { + // feedItems, + // cursor: keyset.packFromResult(feedItems), + // } } export default handler diff --git a/packages/pds/src/index.ts b/packages/pds/src/index.ts index f77b654a069..3449425f63b 100644 --- a/packages/pds/src/index.ts +++ b/packages/pds/src/index.ts @@ -41,6 +41,7 @@ import { BackgroundQueue } from './event-stream/background-queue' import DidSqlCache from './did-cache' import { MountedAlgos } from './feed-gen/types' import { Crawlers } from './crawlers' +import { LabelCache } from './label-cache' export type { ServerConfigValues } from './config' export { ServerConfig } from './config' @@ -176,6 +177,8 @@ export class PDS { }) } + const labelCache = new LabelCache(db) + const services = createServices({ repoSigningKey, messageDispatcher, @@ -183,6 +186,7 @@ export class PDS { imgUriBuilder, imgInvalidator, labeler, + labelCache, backgroundQueue, crawlers, }) @@ -200,6 +204,7 @@ export class PDS { sequencer, sequencerLeader, labeler, + labelCache, services, mailer, imgUriBuilder, @@ -266,6 +271,7 @@ export class PDS { this.ctx.sequencerLeader.run() await this.ctx.sequencer.start() await this.ctx.db.startListeningToChannels() + this.ctx.labelCache.start() const server = this.app.listen(this.ctx.cfg.port) this.server = server this.server.keepAliveTimeout = 90000 @@ -275,6 +281,7 @@ export class PDS { } async destroy(): Promise { + this.ctx.labelCache.stop() await this.ctx.sequencerLeader.destroy() await this.terminator?.terminate() await this.ctx.backgroundQueue.destroy() diff --git a/packages/pds/src/label-cache.ts b/packages/pds/src/label-cache.ts new file mode 100644 index 00000000000..e4f23daa599 --- /dev/null +++ b/packages/pds/src/label-cache.ts @@ -0,0 +1,90 @@ +import { wait } from '@atproto/common' +import Database from './db' +import { Label } from './db/tables/label' +import { labelerLogger as log } from './logger' + +export class LabelCache { + bySubject: Record = {} + latestLabel = '' + refreshes = 0 + + destroyed = false + + constructor(public db: Database) {} + + start() { + this.poll() + } + + async fullRefresh() { + const allLabels = await this.db.db.selectFrom('label').selectAll().execute() + this.wipeCache() + this.processLabels(allLabels) + } + + async partialRefresh() { + const labels = await this.db.db + .selectFrom('label') + .selectAll() + .where('cts', '>', this.latestLabel) + .execute() + this.processLabels(labels) + } + + async poll() { + try { + if (this.destroyed) return + if (this.refreshes >= 120) { + await this.fullRefresh() + this.refreshes = 0 + } else { + await this.partialRefresh() + this.refreshes++ + } + } catch (err) { + log.error( + { err, latestLabel: this.latestLabel, refreshes: this.refreshes }, + 'label cache failed to refresh', + ) + } + await wait(500) + this.poll() + } + + processLabels(labels: Label[]) { + for (const label of labels) { + if (label.cts > this.latestLabel) { + this.latestLabel = label.cts + } + this.bySubject[label.uri] ??= [] + this.bySubject[label.uri].push(label) + } + } + + wipeCache() { + this.bySubject = {} + } + + stop() { + this.destroyed = true + } + + forSubject(subject: string, includeNeg = false): Label[] { + const labels = this.bySubject[subject] ?? [] + return includeNeg ? labels : labels.filter((l) => l.neg === 0) + } + + forSubjects(subjects: string[], includeNeg?: boolean): Label[] { + let labels: Label[] = [] + const alreadyAdded = new Set() + for (const subject of subjects) { + if (alreadyAdded.has(subject)) { + continue + } + const subLabels = this.forSubject(subject, includeNeg) + labels = [...labels, ...subLabels] + alreadyAdded.add(subject) + } + return labels + } +} diff --git a/packages/pds/src/sequencer/outbox.ts b/packages/pds/src/sequencer/outbox.ts index 335cd33cc3c..f0f62421a3d 100644 --- a/packages/pds/src/sequencer/outbox.ts +++ b/packages/pds/src/sequencer/outbox.ts @@ -108,14 +108,14 @@ export class Outbox { const evts = await this.sequencer.requestSeqRange({ earliestTime: backfillTime, earliestSeq: this.lastSeen > -1 ? this.lastSeen : backfillCursor, - limit: 10, + limit: 100, }) for (const evt of evts) { yield evt } // if we're within 50 of the sequencer, we call it good & switch to cutover const seqCursor = this.sequencer.lastSeen ?? -1 - if (seqCursor - this.lastSeen < 10) break + if (seqCursor - this.lastSeen < 100) break if (evts.length < 1) break } } diff --git a/packages/pds/src/sequencer/sequencer-leader.ts b/packages/pds/src/sequencer/sequencer-leader.ts index c951fda0d25..32964243f2e 100644 --- a/packages/pds/src/sequencer/sequencer-leader.ts +++ b/packages/pds/src/sequencer/sequencer-leader.ts @@ -1,5 +1,5 @@ import { DisconnectError } from '@atproto/xrpc-server' -import { jitter, wait } from '@atproto/common' +import { chunkArray, jitter, wait } from '@atproto/common' import { Leader } from '../db/leader' import { seqLogger as log } from '../logger' import Database from '../db' @@ -112,12 +112,20 @@ export class SequencerLeader { async sequenceOutgoing() { const unsequenced = await this.getUnsequenced() - for (const row of unsequenced) { - await this.db.db - .updateTable('repo_seq') - .set({ seq: this.nextSeqVal() }) - .where('id', '=', row.id) - .execute() + const chunks = chunkArray(unsequenced, 2000) + for (const chunk of chunks) { + await this.db.transaction(async (dbTxn) => { + await Promise.all( + chunk.map(async (row) => { + await dbTxn.db + .updateTable('repo_seq') + .set({ seq: this.nextSeqVal() }) + .where('id', '=', row.id) + .execute() + await this.db.notify('outgoing_repo_seq') + }), + ) + }) await this.db.notify('outgoing_repo_seq') } } diff --git a/packages/pds/src/sequencer/sequencer.ts b/packages/pds/src/sequencer/sequencer.ts index 4ea3b4ac836..61bcfa0efa7 100644 --- a/packages/pds/src/sequencer/sequencer.ts +++ b/packages/pds/src/sequencer/sequencer.ts @@ -126,7 +126,7 @@ export class Sequencer extends (EventEmitter as new () => SequencerEmitter) { try { const evts = await this.requestSeqRange({ earliestSeq: this.lastSeen, - limit: 10, + limit: 50, }) if (evts.length > 0) { this.emit('events', evts) diff --git a/packages/pds/src/services/index.ts b/packages/pds/src/services/index.ts index dd23198a8fa..f89fd917082 100644 --- a/packages/pds/src/services/index.ts +++ b/packages/pds/src/services/index.ts @@ -17,6 +17,7 @@ import { Labeler } from '../labeler' import { LabelService } from '../app-view/services/label' import { BackgroundQueue } from '../event-stream/background-queue' import { Crawlers } from '../crawlers' +import { LabelCache } from '../label-cache' export function createServices(resources: { repoSigningKey: crypto.Keypair @@ -25,6 +26,7 @@ export function createServices(resources: { imgUriBuilder: ImageUriBuilder imgInvalidator: ImageInvalidator labeler: Labeler + labelCache: LabelCache backgroundQueue: BackgroundQueue crawlers: Crawlers }): Services { @@ -35,6 +37,7 @@ export function createServices(resources: { imgUriBuilder, imgInvalidator, labeler, + labelCache, backgroundQueue, crawlers, } = resources @@ -57,11 +60,11 @@ export function createServices(resources: { imgInvalidator, ), appView: { - actor: ActorService.creator(imgUriBuilder), + actor: ActorService.creator(imgUriBuilder, labelCache), graph: GraphService.creator(imgUriBuilder), - feed: FeedService.creator(imgUriBuilder), + feed: FeedService.creator(imgUriBuilder, labelCache), indexing: IndexingService.creator(backgroundQueue), - label: LabelService.creator(), + label: LabelService.creator(labelCache), }, } } diff --git a/packages/pds/src/services/repo/index.ts b/packages/pds/src/services/repo/index.ts index c41e01b4c18..7dc1dac4252 100644 --- a/packages/pds/src/services/repo/index.ts +++ b/packages/pds/src/services/repo/index.ts @@ -132,10 +132,12 @@ export class RepoService { toWrite: { did: string; writes: PreparedWrite[]; swapCommitCid?: CID }, times: number, timeout = 100, + prevStorage?: SqlRepoStorage, ) { this.db.assertNotTransaction() const { did, writes, swapCommitCid } = toWrite - const storage = new SqlRepoStorage(this.db, did) + // we may have some useful cached blocks in the storage, so re-use the previous instance + const storage = prevStorage ?? new SqlRepoStorage(this.db, did) const commit = await this.formatCommit(storage, did, writes, swapCommitCid) try { await this.serviceTx(async (srvcTx) => @@ -147,7 +149,7 @@ export class RepoService { throw err } await wait(timeout) - return this.processWrites(toWrite, times - 1, timeout) + return this.processWrites(toWrite, times - 1, timeout, storage) } else { throw err } @@ -169,6 +171,8 @@ export class RepoService { if (swapCommit && !currRoot.equals(swapCommit)) { throw new BadCommitSwapError(currRoot) } + // cache last commit since there's likely overlap + await storage.cacheCommit(currRoot) const recordTxn = this.services.record(this.db) for (const write of writes) { const { action, uri, swapCid } = write diff --git a/packages/pds/src/sql-repo-storage.ts b/packages/pds/src/sql-repo-storage.ts index 3002ebe6ecf..0b9928c0371 100644 --- a/packages/pds/src/sql-repo-storage.ts +++ b/packages/pds/src/sql-repo-storage.ts @@ -42,6 +42,24 @@ export class SqlRepoStorage extends RepoStorage { return CID.parse(res.root) } + // proactively cache all blocks from a particular commit (to prevent multiple roundtrips) + async cacheCommit(cid: CID): Promise { + const res = await this.db.db + .selectFrom('repo_commit_block') + .innerJoin('ipld_block', (join) => + join + .onRef('ipld_block.cid', '=', 'repo_commit_block.block') + .onRef('ipld_block.creator', '=', 'repo_commit_block.creator'), + ) + .where('repo_commit_block.creator', '=', this.did) + .where('repo_commit_block.commit', '=', cid.toString()) + .select(['ipld_block.cid', 'ipld_block.content']) + .execute() + for (const row of res) { + this.cache.set(CID.parse(row.cid), row.content) + } + } + async getBytes(cid: CID): Promise { const cached = this.cache.get(cid) if (cached) return cached diff --git a/packages/pds/tests/_util.ts b/packages/pds/tests/_util.ts index 225b3a85a1d..115d62a4a9f 100644 --- a/packages/pds/tests/_util.ts +++ b/packages/pds/tests/_util.ts @@ -25,6 +25,7 @@ export type TestServerInfo = { url: string ctx: AppContext close: CloseFn + processAll: () => Promise } export type TestServerOpts = { @@ -154,6 +155,9 @@ export const runTestServer = async ( const pdsServer = await pds.start() const pdsPort = (pdsServer.address() as AddressInfo).port + // we refresh label cache by hand in `processAll` instead of on a timer + pds.ctx.labelCache.stop() + return { url: `http://localhost:${pdsPort}`, ctx: pds.ctx, @@ -161,6 +165,10 @@ export const runTestServer = async ( await pds.destroy() await plcServer.destroy() }, + processAll: async () => { + await pds.ctx.backgroundQueue.processAll() + await pds.ctx.labelCache.fullRefresh() + }, } } diff --git a/packages/pds/tests/account-deletion.test.ts b/packages/pds/tests/account-deletion.test.ts index 00004d78d7e..6720b52aee4 100644 --- a/packages/pds/tests/account-deletion.test.ts +++ b/packages/pds/tests/account-deletion.test.ts @@ -146,7 +146,7 @@ describe('account deletion', () => { did: carol.did, password: carol.password, }) - await server.ctx.backgroundQueue.processAll() // Finish background hard-deletions + await server.processAll() // Finish background hard-deletions }) it('no longer lets the user log in', async () => { diff --git a/packages/pds/tests/algos/hot-classic.test.ts b/packages/pds/tests/algos/hot-classic.test.ts index 180c4c90b92..1d5804f689a 100644 --- a/packages/pds/tests/algos/hot-classic.test.ts +++ b/packages/pds/tests/algos/hot-classic.test.ts @@ -35,7 +35,7 @@ describe('algo hot-classic', () => { alice = sc.dids.alice bob = sc.dids.bob - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { @@ -63,7 +63,7 @@ describe('algo hot-classic', () => { await sc.like(sc.dids[name], two.ref) await sc.like(sc.dids[name], three.ref) } - await server.ctx.backgroundQueue.processAll() + await server.processAll() const res = await agent.api.app.bsky.feed.getFeed( { feed: feedUri }, diff --git a/packages/pds/tests/algos/whats-hot.test.ts b/packages/pds/tests/algos/whats-hot.test.ts index d26c73dcf62..08d9f1a1b82 100644 --- a/packages/pds/tests/algos/whats-hot.test.ts +++ b/packages/pds/tests/algos/whats-hot.test.ts @@ -38,7 +38,7 @@ describe('algo whats-hot', () => { alice = sc.dids.alice bob = sc.dids.bob carol = sc.dids.carol - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { @@ -76,7 +76,7 @@ describe('algo whats-hot', () => { await sc.like(sc.dids[name], five.ref) } } - await server.ctx.backgroundQueue.processAll() + await server.processAll() // move the 3rd post 5 hours into the past to check gravity await server.ctx.db.db diff --git a/packages/pds/tests/algos/with-friends.test.ts b/packages/pds/tests/algos/with-friends.test.ts index 510274242cd..7e6e79357d9 100644 --- a/packages/pds/tests/algos/with-friends.test.ts +++ b/packages/pds/tests/algos/with-friends.test.ts @@ -40,7 +40,7 @@ describe.skip('algo with friends', () => { carol = sc.dids.carol dan = sc.dids.dan - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { diff --git a/packages/pds/tests/blob-deletes.test.ts b/packages/pds/tests/blob-deletes.test.ts index bf902b57169..aa8122423ba 100644 --- a/packages/pds/tests/blob-deletes.test.ts +++ b/packages/pds/tests/blob-deletes.test.ts @@ -58,7 +58,7 @@ describe('blob deletes', () => { ) const post = await sc.post(alice, 'test', undefined, [img]) await sc.deletePost(alice, post.ref.uri) - await server.ctx.backgroundQueue.processAll() + await server.processAll() const dbBlobs = await getDbBlobsForDid(alice) expect(dbBlobs.length).toBe(0) @@ -80,7 +80,7 @@ describe('blob deletes', () => { ) await updateProfile(sc, alice, img.image, img.image) await updateProfile(sc, alice, img2.image, img2.image) - await server.ctx.backgroundQueue.processAll() + await server.processAll() const dbBlobs = await getDbBlobsForDid(alice) expect(dbBlobs.length).toBe(1) @@ -109,7 +109,7 @@ describe('blob deletes', () => { ) await updateProfile(sc, alice, img.image, img.image) await updateProfile(sc, alice, img.image, img2.image) - await server.ctx.backgroundQueue.processAll() + await server.processAll() const dbBlobs = await getDbBlobsForDid(alice) expect(dbBlobs.length).toBe(2) @@ -160,7 +160,7 @@ describe('blob deletes', () => { }, { encoding: 'application/json', headers: sc.getHeaders(alice) }, ) - await server.ctx.backgroundQueue.processAll() + await server.processAll() const dbBlobs = await getDbBlobsForDid(alice) expect(dbBlobs.length).toBe(1) diff --git a/packages/pds/tests/event-stream/sync.test.ts b/packages/pds/tests/event-stream/sync.test.ts index cf42f86223c..a596b7240ce 100644 --- a/packages/pds/tests/event-stream/sync.test.ts +++ b/packages/pds/tests/event-stream/sync.test.ts @@ -68,6 +68,7 @@ describe('sync', () => { services.repo(dbTxn).indexWrites(writes, now), ) } + await server.processAll() // Check indexed timeline const aliceTL = await agent.api.app.bsky.feed.getTimeline( {}, diff --git a/packages/pds/tests/feed-generation.test.ts b/packages/pds/tests/feed-generation.test.ts index e5c9ab8e9c2..caa1f38423c 100644 --- a/packages/pds/tests/feed-generation.test.ts +++ b/packages/pds/tests/feed-generation.test.ts @@ -38,7 +38,7 @@ describe('feed generation', () => { agent = network.pds.getClient() sc = new SeedClient(agent) await basicSeed(sc) - await network.pds.ctx.backgroundQueue.processAll() + await network.processAll() alice = sc.dids.alice const allUri = AtUri.make(alice, 'app.bsky.feed.generator', 'all') const feedUriBadPagination = AtUri.make( diff --git a/packages/pds/tests/indexing.test.ts b/packages/pds/tests/indexing.test.ts index 7402d7e6aa9..4aede170617 100644 --- a/packages/pds/tests/indexing.test.ts +++ b/packages/pds/tests/indexing.test.ts @@ -82,7 +82,7 @@ describe('indexing', () => { await services .repo(db) .processWrites({ did: sc.dids.alice, writes: [createRecord] }, 1) - await server.ctx.backgroundQueue.processAll() + await server.processAll() const getAfterCreate = await agent.api.app.bsky.feed.getPostThread( { uri: uri.toString() }, @@ -95,7 +95,7 @@ describe('indexing', () => { await services .repo(db) .processWrites({ did: sc.dids.alice, writes: [updateRecord] }, 1) - await server.ctx.backgroundQueue.processAll() + await server.processAll() const getAfterUpdate = await agent.api.app.bsky.feed.getPostThread( { uri: uri.toString() }, @@ -108,7 +108,7 @@ describe('indexing', () => { await services .repo(db) .processWrites({ did: sc.dids.alice, writes: [deleteRecord] }, 1) - await server.ctx.backgroundQueue.processAll() + await server.processAll() const getAfterDelete = agent.api.app.bsky.feed.getPostThread( { uri: uri.toString() }, @@ -157,7 +157,7 @@ describe('indexing', () => { await services .repo(db) .processWrites({ did: sc.dids.dan, writes: [createRecord] }, 1) - await server.ctx.backgroundQueue.processAll() + await server.processAll() const getAfterCreate = await agent.api.app.bsky.actor.getProfile( { actor: sc.dids.dan }, @@ -170,7 +170,7 @@ describe('indexing', () => { await services .repo(db) .processWrites({ did: sc.dids.dan, writes: [updateRecord] }, 1) - await server.ctx.backgroundQueue.processAll() + await server.processAll() const getAfterUpdate = await agent.api.app.bsky.actor.getProfile( { actor: sc.dids.dan }, @@ -183,7 +183,7 @@ describe('indexing', () => { await services .repo(db) .processWrites({ did: sc.dids.dan, writes: [deleteRecord] }, 1) - await server.ctx.backgroundQueue.processAll() + await server.processAll() const getAfterDelete = await agent.api.app.bsky.actor.getProfile( { actor: sc.dids.dan }, diff --git a/packages/pds/tests/labeler/labeler.test.ts b/packages/pds/tests/labeler/labeler.test.ts index ea943dcf46d..2525732357a 100644 --- a/packages/pds/tests/labeler/labeler.test.ts +++ b/packages/pds/tests/labeler/labeler.test.ts @@ -1,6 +1,6 @@ import { AtUri, BlobRef } from '@atproto/api' import stream from 'stream' -import { runTestServer, CloseFn } from '../_util' +import { runTestServer, CloseFn, TestServerInfo } from '../_util' import { Labeler } from '../../src/labeler' import { AppContext, Database } from '../../src' import { BlobStore, cidForRecord } from '@atproto/repo' @@ -11,6 +11,7 @@ import { LabelService } from '../../src/app-view/services/label' import { BackgroundQueue } from '../../src/event-stream/background-queue' describe('labeler', () => { + let server: TestServerInfo let close: CloseFn let labeler: Labeler let labelSrvc: LabelService @@ -21,7 +22,7 @@ describe('labeler', () => { let goodBlob: BlobRef beforeAll(async () => { - const server = await runTestServer({ + server = await runTestServer({ dbPostgresSchema: 'views_author_feed', }) close = server.close @@ -63,6 +64,7 @@ describe('labeler', () => { const uri = postUri() labeler.processRecord(uri, post) await labeler.processAll() + await server.processAll() const labels = await labelSrvc.getLabels(uri.toString()) expect(labels.length).toBe(1) expect(labels[0]).toMatchObject({ @@ -100,6 +102,7 @@ describe('labeler', () => { const uri = postUri() labeler.processRecord(uri, post) await labeler.processAll() + await server.processAll() const dbLabels = await labelSrvc.getLabels(uri.toString()) const labels = dbLabels.map((row) => row.val).sort() expect(labels).toEqual( @@ -119,6 +122,7 @@ describe('labeler', () => { cts: new Date().toISOString(), }) .execute() + await server.processAll() const labels = await labelSrvc.getLabelsForProfile('did:example:alice') // 4 from earlier & then just added one diff --git a/packages/pds/tests/migrations/blob-creator.test.ts b/packages/pds/tests/migrations/blob-creator.test.ts index 8d6843a1136..defe3bde6f4 100644 --- a/packages/pds/tests/migrations/blob-creator.test.ts +++ b/packages/pds/tests/migrations/blob-creator.test.ts @@ -4,7 +4,7 @@ import { cidForCbor, TID } from '@atproto/common' import { Kysely } from 'kysely' import { AtUri } from '@atproto/uri' -describe('blob creator migration', () => { +describe.skip('blob creator migration', () => { let db: Database let rawDb: Kysely diff --git a/packages/pds/tests/migrations/indexed-at-on-record.test.ts b/packages/pds/tests/migrations/indexed-at-on-record.test.ts index d2a3f8f4f8f..02147816203 100644 --- a/packages/pds/tests/migrations/indexed-at-on-record.test.ts +++ b/packages/pds/tests/migrations/indexed-at-on-record.test.ts @@ -4,7 +4,7 @@ import { dataToCborBlock, TID } from '@atproto/common' import { AtUri } from '@atproto/uri' import { Kysely } from 'kysely' -describe('indexedAt on record migration', () => { +describe.skip('indexedAt on record migration', () => { let db: Database let rawDb: Kysely diff --git a/packages/pds/tests/migrations/post-hierarchy.test.ts b/packages/pds/tests/migrations/post-hierarchy.test.ts index 145be7bb16c..3e4a0a4ff3d 100644 --- a/packages/pds/tests/migrations/post-hierarchy.test.ts +++ b/packages/pds/tests/migrations/post-hierarchy.test.ts @@ -5,7 +5,7 @@ import usersSeed from '../seeds/users' import threadSeed, { walk, item, Item } from '../seeds/thread' import { CloseFn, runTestServer } from '../_util' -describe('post hierarchy migration', () => { +describe.skip('post hierarchy migration', () => { let db: Database let close: CloseFn let sc: SeedClient diff --git a/packages/pds/tests/migrations/repo-sync-data-pt2.test.ts b/packages/pds/tests/migrations/repo-sync-data-pt2.test.ts index ad046836ed6..91971fb7043 100644 --- a/packages/pds/tests/migrations/repo-sync-data-pt2.test.ts +++ b/packages/pds/tests/migrations/repo-sync-data-pt2.test.ts @@ -5,7 +5,7 @@ import { CloseFn, runTestServer } from '../_util' import { SeedClient } from '../seeds/client' import basicSeed from '../seeds/basic' -describe('repo sync data migration', () => { +describe.skip('repo sync data migration', () => { let db: Database let rawDb: Kysely let close: CloseFn diff --git a/packages/pds/tests/migrations/repo-sync-data.test.ts b/packages/pds/tests/migrations/repo-sync-data.test.ts index 4b56aede9db..2fd17c199eb 100644 --- a/packages/pds/tests/migrations/repo-sync-data.test.ts +++ b/packages/pds/tests/migrations/repo-sync-data.test.ts @@ -5,7 +5,7 @@ import { TID } from '@atproto/common' import { Kysely } from 'kysely' import { CID } from 'multiformats/cid' -describe('repo sync data migration', () => { +describe.skip('repo sync data migration', () => { let db: Database let rawDb: Kysely let memoryStore: MemoryBlockstore diff --git a/packages/pds/tests/migrations/user-partitioned-cids.test.ts b/packages/pds/tests/migrations/user-partitioned-cids.test.ts index 048ccd9dacf..e6eff220445 100644 --- a/packages/pds/tests/migrations/user-partitioned-cids.test.ts +++ b/packages/pds/tests/migrations/user-partitioned-cids.test.ts @@ -5,7 +5,7 @@ import { Kysely } from 'kysely' import { Block } from 'multiformats/block' import * as uint8arrays from 'uint8arrays' -describe('user partitioned cids migration', () => { +describe.skip('user partitioned cids migration', () => { let db: Database let rawDb: Kysely diff --git a/packages/pds/tests/migrations/user-table-did-pkey.test.ts b/packages/pds/tests/migrations/user-table-did-pkey.test.ts index bda821a1539..881907e71b8 100644 --- a/packages/pds/tests/migrations/user-table-did-pkey.test.ts +++ b/packages/pds/tests/migrations/user-table-did-pkey.test.ts @@ -2,7 +2,7 @@ import { Database } from '../../src' import { randomStr } from '@atproto/crypto' import { Kysely } from 'kysely' -describe('user table did pkey migration', () => { +describe.skip('user table did pkey migration', () => { let db: Database let rawDb: Kysely diff --git a/packages/pds/tests/moderation.test.ts b/packages/pds/tests/moderation.test.ts index caf645b3c9d..19bcbee1ad3 100644 --- a/packages/pds/tests/moderation.test.ts +++ b/packages/pds/tests/moderation.test.ts @@ -292,7 +292,7 @@ describe('moderation', () => { password: 'password', token: deletionToken, }) - await server.ctx.backgroundQueue.processAll() + await server.processAll() // Take action on deleted content const { data: action } = await agent.api.com.atproto.admin.takeModerationAction( diff --git a/packages/pds/tests/proxied/feedgen.test.ts b/packages/pds/tests/proxied/feedgen.test.ts index 2914813750d..44a89e0b34c 100644 --- a/packages/pds/tests/proxied/feedgen.test.ts +++ b/packages/pds/tests/proxied/feedgen.test.ts @@ -36,7 +36,6 @@ describe('feedgen proxy view', () => { sc.getHeaders(sc.dids.alice), ) await network.processAll() - await network.bsky.ctx.backgroundQueue.processAll() // mock getFeedGenerator() for use by pds's getFeed since we don't have a proper feedGenDid or feed publisher FeedNS.prototype.getFeedGenerator = async function (params, opts) { if (params?.feed === feedUri.toString()) { diff --git a/packages/pds/tests/views/actor-search.test.ts b/packages/pds/tests/views/actor-search.test.ts index 5cb778db4a7..f778f1387c4 100644 --- a/packages/pds/tests/views/actor-search.test.ts +++ b/packages/pds/tests/views/actor-search.test.ts @@ -28,7 +28,7 @@ describe('pds user search views', () => { sc = new SeedClient(agent) await usersBulkSeed(sc) headers = sc.getHeaders(Object.values(sc.dids)[0]) - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { diff --git a/packages/pds/tests/views/author-feed.test.ts b/packages/pds/tests/views/author-feed.test.ts index 68cdd0bc48b..64246f0fcf2 100644 --- a/packages/pds/tests/views/author-feed.test.ts +++ b/packages/pds/tests/views/author-feed.test.ts @@ -33,7 +33,7 @@ describe('pds author feed views', () => { bob = sc.dids.bob carol = sc.dids.carol dan = sc.dids.dan - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { diff --git a/packages/pds/tests/views/blocks.test.ts b/packages/pds/tests/views/blocks.test.ts index 4d72e639b1d..a3815a6a668 100644 --- a/packages/pds/tests/views/blocks.test.ts +++ b/packages/pds/tests/views/blocks.test.ts @@ -43,7 +43,7 @@ describe('pds views with blocking', () => { sc.posts[dan][0].ref, 'alice replies to dan', ) - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { diff --git a/packages/pds/tests/views/follows.test.ts b/packages/pds/tests/views/follows.test.ts index 982aa2c0d29..606b85f9d48 100644 --- a/packages/pds/tests/views/follows.test.ts +++ b/packages/pds/tests/views/follows.test.ts @@ -27,7 +27,7 @@ describe('pds follow views', () => { sc = new SeedClient(agent) await followsSeed(sc) alice = sc.dids.alice - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { diff --git a/packages/pds/tests/views/likes.test.ts b/packages/pds/tests/views/likes.test.ts index 10bef798aba..d77251d8b25 100644 --- a/packages/pds/tests/views/likes.test.ts +++ b/packages/pds/tests/views/likes.test.ts @@ -28,7 +28,7 @@ describe('pds like views', () => { await likesSeed(sc) alice = sc.dids.alice bob = sc.dids.bob - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { diff --git a/packages/pds/tests/views/mute-lists.test.ts b/packages/pds/tests/views/mute-lists.test.ts index 0cf15770617..64692a81b24 100644 --- a/packages/pds/tests/views/mute-lists.test.ts +++ b/packages/pds/tests/views/mute-lists.test.ts @@ -30,7 +30,7 @@ describe('pds views with mutes from mute lists', () => { // add follows to ensure mutes work even w follows await sc.follow(carol, dan) await sc.follow(dan, carol) - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { diff --git a/packages/pds/tests/views/mutes.test.ts b/packages/pds/tests/views/mutes.test.ts index a3a17eff595..ac32d43ab6f 100644 --- a/packages/pds/tests/views/mutes.test.ts +++ b/packages/pds/tests/views/mutes.test.ts @@ -32,7 +32,7 @@ describe('mute views', () => { { headers: sc.getHeaders(silas), encoding: 'application/json' }, ) } - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { diff --git a/packages/pds/tests/views/notifications.test.ts b/packages/pds/tests/views/notifications.test.ts index 67b00331e3c..b5d9907140c 100644 --- a/packages/pds/tests/views/notifications.test.ts +++ b/packages/pds/tests/views/notifications.test.ts @@ -33,7 +33,7 @@ describe('pds notification views', () => { sc = new SeedClient(agent) await basicSeed(sc) alice = sc.dids.alice - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { @@ -75,7 +75,7 @@ describe('pds notification views', () => { sc.replies[alice][0].ref, 'indeed', ) - await server.ctx.backgroundQueue.processAll() + await server.processAll() const notifCountAlice = await agent.api.app.bsky.notification.getUnreadCount( diff --git a/packages/pds/tests/views/popular.test.ts b/packages/pds/tests/views/popular.test.ts index 1456f4cbc33..ca21ee31f4f 100644 --- a/packages/pds/tests/views/popular.test.ts +++ b/packages/pds/tests/views/popular.test.ts @@ -54,7 +54,7 @@ describe('popular views', () => { alice = sc.dids.alice bob = sc.dids.bob - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { @@ -82,7 +82,7 @@ describe('popular views', () => { await sc.like(sc.dids[name], two.ref) await sc.like(sc.dids[name], three.ref) } - await server.ctx.backgroundQueue.processAll() + await server.processAll() const res = await agent.api.app.bsky.unspecced.getPopular( {}, diff --git a/packages/pds/tests/views/posts.test.ts b/packages/pds/tests/views/posts.test.ts index 55b2312cf12..7bf6e6919cf 100644 --- a/packages/pds/tests/views/posts.test.ts +++ b/packages/pds/tests/views/posts.test.ts @@ -15,7 +15,7 @@ describe('pds posts views', () => { agent = new AtpAgent({ service: server.url }) sc = new SeedClient(agent) await basicSeed(sc) - await server.ctx.labeler.processAll() + await server.processAll() }) afterAll(async () => { diff --git a/packages/pds/tests/views/profile.test.ts b/packages/pds/tests/views/profile.test.ts index 1f68946811d..b3f5520a8b9 100644 --- a/packages/pds/tests/views/profile.test.ts +++ b/packages/pds/tests/views/profile.test.ts @@ -27,7 +27,7 @@ describe('pds profile views', () => { alice = sc.dids.alice bob = sc.dids.bob dan = sc.dids.dan - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { diff --git a/packages/pds/tests/views/reposts.test.ts b/packages/pds/tests/views/reposts.test.ts index 5e4eb6e0b37..1c61215e3e5 100644 --- a/packages/pds/tests/views/reposts.test.ts +++ b/packages/pds/tests/views/reposts.test.ts @@ -22,7 +22,7 @@ describe('pds repost views', () => { await repostsSeed(sc) alice = sc.dids.alice bob = sc.dids.bob - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { diff --git a/packages/pds/tests/views/suggestions.test.ts b/packages/pds/tests/views/suggestions.test.ts index 1c075fe17a8..4bec6dc3dd2 100644 --- a/packages/pds/tests/views/suggestions.test.ts +++ b/packages/pds/tests/views/suggestions.test.ts @@ -16,7 +16,7 @@ describe('pds user search views', () => { agent = new AtpAgent({ service: server.url }) sc = new SeedClient(agent) await basicSeed(sc) - await server.ctx.backgroundQueue.processAll() + await server.processAll() const suggestions = [ { did: sc.dids.bob, order: 1 }, diff --git a/packages/pds/tests/views/thread.test.ts b/packages/pds/tests/views/thread.test.ts index 0f4fa68b35e..8838ea20e44 100644 --- a/packages/pds/tests/views/thread.test.ts +++ b/packages/pds/tests/views/thread.test.ts @@ -33,6 +33,7 @@ describe('pds thread views', () => { agent = new AtpAgent({ service: server.url }) sc = new SeedClient(agent) await basicSeed(sc) + await server.processAll() alice = sc.dids.alice bob = sc.dids.bob carol = sc.dids.carol @@ -41,7 +42,7 @@ describe('pds thread views', () => { beforeAll(async () => { // Add a repost of a reply so that we can confirm viewer state in the thread await sc.repost(bob, sc.replies[alice][0].ref) - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => { @@ -140,7 +141,7 @@ describe('pds thread views', () => { 'Reply reply', ) indexes.aliceReplyReply = sc.replies[alice].length - 1 - await server.ctx.backgroundQueue.processAll() + await server.processAll() const thread1 = await agent.api.app.bsky.feed.getPostThread( { uri: sc.posts[alice][indexes.aliceRoot].ref.uriStr }, @@ -149,7 +150,7 @@ describe('pds thread views', () => { expect(forSnapshot(thread1.data.thread)).toMatchSnapshot() await sc.deletePost(bob, sc.replies[bob][indexes.bobReply].ref.uri) - await server.ctx.backgroundQueue.processAll() + await server.processAll() const thread2 = await agent.api.app.bsky.feed.getPostThread( { uri: sc.posts[alice][indexes.aliceRoot].ref.uriStr }, diff --git a/packages/pds/tests/views/timeline.test.ts b/packages/pds/tests/views/timeline.test.ts index dc0d10a79cb..b55a96e691d 100644 --- a/packages/pds/tests/views/timeline.test.ts +++ b/packages/pds/tests/views/timeline.test.ts @@ -55,7 +55,7 @@ describe('timeline views', () => { labelPostB.cidStr, { create: ['kind'] }, ) - await server.ctx.backgroundQueue.processAll() + await server.processAll() }) afterAll(async () => {