Skip to content

Commit

Permalink
Tidy repo sub usage of indexing service (#1296)
Browse files Browse the repository at this point in the history
* use single indexing service in repo sub

* tidy
  • Loading branch information
devinivy authored Jul 7, 2023
1 parent 0d3a555 commit d7f8741
Showing 1 changed file with 15 additions and 16 deletions.
31 changes: 15 additions & 16 deletions packages/bsky/src/subscription/repo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,16 @@ import {
def,
Commit,
} from '@atproto/repo'
import { ValidationError } from '@atproto/lexicon'
import { OutputSchema as Message } from '../lexicon/types/com/atproto/sync/subscribeRepos'
import * as message from '../lexicon/types/com/atproto/sync/subscribeRepos'
import { ids, lexicons } from '../lexicon/lexicons'
import Database from '../db'
import AppContext from '../context'
import { Leader } from '../db/leader'
import { IndexingService } from '../services/indexing'
import { subLogger } from '../logger'
import { ConsecutiveList, LatestQueue, PartitionedQueue } from './util'
import { ValidationError } from '@atproto/lexicon'

const METHOD = ids.ComAtprotoSyncSubscribeRepos
export const REPO_SUB_ID = 1000
Expand All @@ -31,6 +32,7 @@ export class RepoSubscription {
destroyed = false
lastSeq: number | undefined
lastCursor: number | undefined
indexingSvc: IndexingService

constructor(
public ctx: AppContext,
Expand All @@ -39,6 +41,7 @@ export class RepoSubscription {
public concurrency = Infinity,
) {
this.repoQueue = new PartitionedQueue({ concurrency })
this.indexingSvc = ctx.services.indexing(ctx.db)
}

async run() {
Expand Down Expand Up @@ -136,31 +139,29 @@ export class RepoSubscription {
}

private async handleCommit(msg: message.Commit) {
const { db, services } = this.ctx
const indexingService = services.indexing(db)
const indexRecords = async () => {
const { root, rootCid, ops } = await getOps(msg)
if (msg.tooBig) {
await indexingService.indexRepo(msg.repo, rootCid.toString())
await indexingService.setCommitLastSeen(root, msg)
await this.indexingSvc.indexRepo(msg.repo, rootCid.toString())
await this.indexingSvc.setCommitLastSeen(root, msg)
return
}
if (msg.rebase) {
const needsReindex = await indexingService.checkCommitNeedsIndexing(
const needsReindex = await this.indexingSvc.checkCommitNeedsIndexing(
root,
)
if (needsReindex) {
await indexingService.indexRepo(msg.repo, rootCid.toString())
await this.indexingSvc.indexRepo(msg.repo, rootCid.toString())
}
await indexingService.setCommitLastSeen(root, msg)
await this.indexingSvc.setCommitLastSeen(root, msg)
return
}
for (const op of ops) {
if (op.action === WriteOpAction.Delete) {
await indexingService.deleteRecord(op.uri)
await this.indexingSvc.deleteRecord(op.uri)
} else {
try {
await indexingService.indexRecord(
await this.indexingSvc.indexRecord(
op.uri,
op.cid,
op.record,
Expand Down Expand Up @@ -193,23 +194,21 @@ export class RepoSubscription {
}
}
}
await indexingService.setCommitLastSeen(root, msg)
await this.indexingSvc.setCommitLastSeen(root, msg)
}
const results = await Promise.allSettled([
indexRecords(),
indexingService.indexHandle(msg.repo, msg.time),
this.indexingSvc.indexHandle(msg.repo, msg.time),
])
handleAllSettledErrors(results)
}

private async handleUpdateHandle(msg: message.Handle) {
const { db, services } = this.ctx
await services.indexing(db).indexHandle(msg.did, msg.time, true)
await this.indexingSvc.indexHandle(msg.did, msg.time, true)
}

private async handleTombstone(msg: message.Tombstone) {
const { db, services } = this.ctx
await services.indexing(db).tombstoneActor(msg.did)
await this.indexingSvc.tombstoneActor(msg.did)
}

private async handleCursor(seq: number) {
Expand Down

0 comments on commit d7f8741

Please sign in to comment.