Skip to content

Commit

Permalink
Misc scaling (#1284)
Browse files Browse the repository at this point in the history
* limit backsearch to 1 day instead of 3

* lower like count threshold

* bump to 6

* disable like count check

* disable with friends

* preemptively cache last commit

* inline list mutes

* actor service

* label cache

* placehodler on popular with friends

* bulk sequence

* no limit but chunk

* bump chunk to 5k

* try 10k

* fix notify

* tweaking

* syntax

* one more fix

* increase backfill allowance

* full refresh label cache

* limit 1 on mute list

* reserve aclu handle

* clean up testing with label cache

* note on with-friends

* rm defer from label cache

* label cache error handling

* rm branch build
  • Loading branch information
dholms authored Jul 5, 2023
1 parent bb2848e commit 4d1f8d3
Show file tree
Hide file tree
Showing 55 changed files with 414 additions and 169 deletions.
4 changes: 4 additions & 0 deletions packages/dev-env/src/bsky.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,10 @@ export class TestBsky {
return new AtpAgent({ service: this.url })
}

async processAll() {
await this.ctx.backgroundQueue.processAll()
}

async close() {
await this.server.destroy()
}
Expand Down
4 changes: 4 additions & 0 deletions packages/dev-env/src/network-no-appview.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ export class TestNetworkNoAppView {
return fg
}

async processAll() {
await this.pds.processAll()
}

async close() {
await Promise.all(this.feedGens.map((fg) => fg.close()))
await this.pds.close()
Expand Down
6 changes: 2 additions & 4 deletions packages/dev-env/src/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@ export class TestNetwork extends TestNetworkNoAppView {
}

async processFullSubscription(timeout = 5000) {
if (!this.bsky) return
const sub = this.bsky.sub
if (!sub) return
const { db } = this.pds.ctx.db
Expand All @@ -76,10 +75,9 @@ export class TestNetwork extends TestNetworkNoAppView {
}

async processAll(timeout?: number) {
await this.pds.ctx.backgroundQueue.processAll()
if (!this.bsky) return
await this.pds.processAll()
await this.processFullSubscription(timeout)
await this.bsky.ctx.backgroundQueue.processAll()
await this.bsky.processAll()
}

async serviceHeaders(did: string, aud?: string) {
Expand Down
8 changes: 8 additions & 0 deletions packages/dev-env/src/pds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ export class TestPds {
})

await server.start()

// we refresh label cache by hand in `processAll` instead of on a timer
server.ctx.labelCache.stop()
return new TestPds(url, port, server)
}

Expand Down Expand Up @@ -123,6 +126,11 @@ export class TestPds {
}
}

async processAll() {
await this.ctx.backgroundQueue.processAll()
await this.ctx.labelCache.fullRefresh()
}

async close() {
await this.server.destroy()
}
Expand Down
1 change: 1 addition & 0 deletions packages/identifier/src/reserved.ts
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,7 @@ const famousAccounts = [
// reserving some large twitter accounts (top 100 by followers according to wikidata dump)
'10ronaldinho',
'3gerardpique',
'aclu',
'adele',
'akshaykumar',
'aliaa08',
Expand Down
2 changes: 1 addition & 1 deletion packages/pds/src/app-view/api/app/bsky/util/feed.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ export class FeedKeyset extends TimeCidKeyset<FeedRow> {
}

// For users with sparse feeds, avoid scanning more than one week for a single page
export const getFeedDateThreshold = (from: string | undefined, days = 3) => {
export const getFeedDateThreshold = (from: string | undefined, days = 1) => {
const timelineDateThreshold = from ? new Date(from) : new Date()
timelineDateThreshold.setDate(timelineDateThreshold.getDate() - days)
return timelineDateThreshold.toISOString()
Expand Down
13 changes: 9 additions & 4 deletions packages/pds/src/app-view/services/actor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,20 @@ import { DidHandle } from '../../../db/tables/did-handle'
import { notSoftDeletedClause } from '../../../db/util'
import { ActorViews } from './views'
import { ImageUriBuilder } from '../../../image/uri'
import { LabelCache } from '../../../label-cache'

export class ActorService {
constructor(public db: Database, public imgUriBuilder: ImageUriBuilder) {}
constructor(
public db: Database,
public imgUriBuilder: ImageUriBuilder,
public labelCache: LabelCache,
) {}

static creator(imgUriBuilder: ImageUriBuilder) {
return (db: Database) => new ActorService(db, imgUriBuilder)
static creator(imgUriBuilder: ImageUriBuilder, labelCache: LabelCache) {
return (db: Database) => new ActorService(db, imgUriBuilder, labelCache)
}

views = new ActorViews(this.db, this.imgUriBuilder)
views = new ActorViews(this.db, this.imgUriBuilder, this.labelCache)

async getActor(
handleOrDid: string,
Expand Down
103 changes: 55 additions & 48 deletions packages/pds/src/app-view/services/actor/views.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,19 @@ import { DidHandle } from '../../../db/tables/did-handle'
import Database from '../../../db'
import { ImageUriBuilder } from '../../../image/uri'
import { LabelService } from '../label'
import { ListViewBasic } from '../../../lexicon/types/app/bsky/graph/defs'
import { GraphService } from '../graph'
import { LabelCache } from '../../../label-cache'

export class ActorViews {
constructor(private db: Database, private imgUriBuilder: ImageUriBuilder) {}
constructor(
private db: Database,
private imgUriBuilder: ImageUriBuilder,
private labelCache: LabelCache,
) {}

services = {
label: LabelService.creator(),
label: LabelService.creator(this.labelCache)(this.db),
graph: GraphService.creator(this.imgUriBuilder)(this.db),
}

profileDetailed(
Expand Down Expand Up @@ -82,18 +88,30 @@ export class ActorViews {
.where('mutedByDid', '=', viewer)
.select('did')
.as('requesterMuted'),
this.db.db
.selectFrom('list_item')
.innerJoin('list_mute', 'list_mute.listUri', 'list_item.listUri')
.where('list_mute.mutedByDid', '=', viewer)
.whereRef('list_item.subjectDid', '=', ref('did_handle.did'))
.select('list_item.listUri')
.limit(1)
.as('requesterMutedByList'),
])

const [profileInfos, labels, listMutes] = await Promise.all([
const [profileInfos, labels] = await Promise.all([
profileInfosQb.execute(),
this.services.label(this.db).getLabelsForSubjects(dids),
this.getListMutes(dids, viewer),
this.services.label.getLabelsForSubjects(dids),
])

const profileInfoByDid = profileInfos.reduce((acc, info) => {
return Object.assign(acc, { [info.did]: info })
}, {} as Record<string, ArrayEl<typeof profileInfos>>)

const listUris: string[] = profileInfos
.map((a) => a.requesterMutedByList)
.filter((list) => !!list)
const listViews = await this.services.graph.getListViews(listUris, viewer)

const views = results.map((result) => {
const profileInfo = profileInfoByDid[result.did]
const avatar = profileInfo?.avatarCid
Expand All @@ -114,8 +132,14 @@ export class ActorViews {
postsCount: profileInfo?.postsCount || 0,
indexedAt: profileInfo?.indexedAt || undefined,
viewer: {
muted: !!profileInfo?.requesterMuted || !!listMutes[result.did],
mutedByList: listMutes[result.did],
muted:
!!profileInfo?.requesterMuted ||
!!profileInfo?.requesterMutedByList,
mutedByList: profileInfo.requesterMutedByList
? this.services.graph.formatListViewBasic(
listViews[profileInfo.requesterMutedByList],
)
: undefined,
blockedBy: !!profileInfo.requesterBlockedBy,
blocking: profileInfo.requesterBlocking || undefined,
following: profileInfo?.requesterFollowing || undefined,
Expand Down Expand Up @@ -181,18 +205,30 @@ export class ActorViews {
.where('mutedByDid', '=', viewer)
.select('did')
.as('requesterMuted'),
this.db.db
.selectFrom('list_item')
.innerJoin('list_mute', 'list_mute.listUri', 'list_item.listUri')
.where('list_mute.mutedByDid', '=', viewer)
.whereRef('list_item.subjectDid', '=', ref('did_handle.did'))
.select('list_item.listUri')
.limit(1)
.as('requesterMutedByList'),
])

const [profileInfos, labels, listMutes] = await Promise.all([
const [profileInfos, labels] = await Promise.all([
profileInfosQb.execute(),
this.services.label(this.db).getLabelsForSubjects(dids),
this.getListMutes(dids, viewer),
this.services.label.getLabelsForSubjects(dids),
])

const profileInfoByDid = profileInfos.reduce((acc, info) => {
return Object.assign(acc, { [info.did]: info })
}, {} as Record<string, ArrayEl<typeof profileInfos>>)

const listUris: string[] = profileInfos
.map((a) => a.requesterMutedByList)
.filter((list) => !!list)
const listViews = await this.services.graph.getListViews(listUris, viewer)

const views = results.map((result) => {
const profileInfo = profileInfoByDid[result.did]
const avatar = profileInfo?.avatarCid
Expand All @@ -206,8 +242,14 @@ export class ActorViews {
avatar,
indexedAt: profileInfo?.indexedAt || undefined,
viewer: {
muted: !!profileInfo?.requesterMuted || !!listMutes[result.did],
mutedByList: listMutes[result.did],
muted:
!!profileInfo?.requesterMuted ||
!!profileInfo?.requesterMutedByList,
mutedByList: profileInfo.requesterMutedByList
? this.services.graph.formatListViewBasic(
listViews[profileInfo.requesterMutedByList],
)
: undefined,
blockedBy: !!profileInfo.requesterBlockedBy,
blocking: profileInfo.requesterBlocking || undefined,
following: profileInfo?.requesterFollowing || undefined,
Expand Down Expand Up @@ -245,41 +287,6 @@ export class ActorViews {

return Array.isArray(result) ? views : views[0]
}

async getListMutes(
subjects: string[],
mutedBy: string,
): Promise<Record<string, ListViewBasic>> {
if (subjects.length < 1) return {}
const res = await this.db.db
.selectFrom('list_item')
.innerJoin('list_mute', 'list_mute.listUri', 'list_item.listUri')
.innerJoin('list', 'list.uri', 'list_item.listUri')
.where('list_mute.mutedByDid', '=', mutedBy)
.where('list_item.subjectDid', 'in', subjects)
.selectAll('list')
.select('list_item.subjectDid as subjectDid')
.execute()
return res.reduce(
(acc, cur) => ({
...acc,
[cur.subjectDid]: {
uri: cur.uri,
cid: cur.cid,
name: cur.name,
purpose: cur.purpose,
avatar: cur.avatarCid
? this.imgUriBuilder.getCommonSignedUri('avatar', cur.avatarCid)
: undefined,
viewer: {
muted: true,
},
indexedAt: cur.indexedAt,
},
}),
{} as Record<string, ListViewBasic>,
)
}
}

type ActorResult = DidHandle
Expand Down
43 changes: 33 additions & 10 deletions packages/pds/src/app-view/services/feed/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { sql } from 'kysely'
import { AtUri } from '@atproto/uri'
import { dedupeStrs } from '@atproto/common'
import { cborToLexRecord } from '@atproto/repo'
import Database from '../../../db'
import { countAll, notSoftDeletedClause } from '../../../db/util'
import { ImageUriBuilder } from '../../../image/uri'
Expand Down Expand Up @@ -28,21 +29,25 @@ import { LabelService, Labels } from '../label'
import { ActorService } from '../actor'
import { GraphService } from '../graph'
import { FeedViews } from './views'
import { cborToLexRecord } from '@atproto/repo'
import { LabelCache } from '../../../label-cache'

export * from './types'

export class FeedService {
constructor(public db: Database, public imgUriBuilder: ImageUriBuilder) {}
constructor(
public db: Database,
public imgUriBuilder: ImageUriBuilder,
public labelCache: LabelCache,
) {}

static creator(imgUriBuilder: ImageUriBuilder) {
return (db: Database) => new FeedService(db, imgUriBuilder)
static creator(imgUriBuilder: ImageUriBuilder, labelCache: LabelCache) {
return (db: Database) => new FeedService(db, imgUriBuilder, labelCache)
}

views = new FeedViews(this.db, this.imgUriBuilder)
services = {
label: LabelService.creator()(this.db),
actor: ActorService.creator(this.imgUriBuilder)(this.db),
label: LabelService.creator(this.labelCache)(this.db),
actor: ActorService.creator(this.imgUriBuilder, this.labelCache)(this.db),
graph: GraphService.creator(this.imgUriBuilder)(this.db),
}

Expand Down Expand Up @@ -114,7 +119,7 @@ export class FeedService {
if (dids.length < 1) return {}
const { ref } = this.db.db.dynamic
const { skipLabels = false, includeSoftDeleted = false } = opts ?? {}
const [actors, labels, listMutes] = await Promise.all([
const [actors, labels] = await Promise.all([
this.db.db
.selectFrom('did_handle')
.where('did_handle.did', 'in', dids)
Expand Down Expand Up @@ -160,11 +165,25 @@ export class FeedService {
.where('mutedByDid', '=', requester)
.select('did')
.as('requesterMuted'),
this.db.db
.selectFrom('list_item')
.innerJoin('list_mute', 'list_mute.listUri', 'list_item.listUri')
.where('list_mute.mutedByDid', '=', requester)
.whereRef('list_item.subjectDid', '=', ref('did_handle.did'))
.select('list_item.listUri')
.limit(1)
.as('requesterMutedByList'),
])
.execute(),
this.services.label.getLabelsForSubjects(skipLabels ? [] : dids),
this.services.actor.views.getListMutes(dids, requester),
])
const listUris: string[] = actors
.map((a) => a.requesterMutedByList)
.filter((list) => !!list)
const listViews = await this.services.graph.getListViews(
listUris,
requester,
)
return actors.reduce((acc, cur) => {
const actorLabels = labels[cur.did] ?? []
return {
Expand All @@ -177,8 +196,12 @@ export class FeedService {
? this.imgUriBuilder.getCommonSignedUri('avatar', cur.avatarCid)
: undefined,
viewer: {
muted: !!cur?.requesterMuted || !!listMutes[cur.did],
mutedByList: listMutes[cur.did],
muted: !!cur?.requesterMuted || !!cur?.requesterMutedByList,
mutedByList: cur.requesterMutedByList
? this.services.graph.formatListViewBasic(
listViews[cur.requesterMutedByList],
)
: undefined,
blockedBy: !!cur?.requesterBlockedBy,
blocking: cur?.requesterBlocking || undefined,
following: cur?.requesterFollowing || undefined,
Expand Down
16 changes: 16 additions & 0 deletions packages/pds/src/app-view/services/graph/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,22 @@ export class GraphService {
},
}
}

formatListViewBasic(list: ListInfo) {
return {
uri: list.uri,
cid: list.cid,
name: list.name,
purpose: list.purpose,
avatar: list.avatarCid
? this.imgUriBuilder.getCommonSignedUri('avatar', list.avatarCid)
: undefined,
indexedAt: list.indexedAt,
viewer: {
muted: !!list.viewerMuted,
},
}
}
}

type ListInfo = List & {
Expand Down
Loading

0 comments on commit 4d1f8d3

Please sign in to comment.