diff --git a/lexicons/app/bsky/feed/defs.json b/lexicons/app/bsky/feed/defs.json index 7a9fcf5e68f..10f2812ce24 100644 --- a/lexicons/app/bsky/feed/defs.json +++ b/lexicons/app/bsky/feed/defs.json @@ -30,7 +30,8 @@ "labels": { "type": "array", "items": { "type": "ref", "ref": "com.atproto.label.defs#label" } - } + }, + "threadgate": { "type": "ref", "ref": "#threadgateView" } } }, "viewerState": { @@ -86,7 +87,8 @@ "type": "union", "refs": ["#threadViewPost", "#notFoundPost", "#blockedPost"] } - } + }, + "viewer": { "type": "ref", "ref": "#viewerThreadState" } } }, "notFoundPost": { @@ -114,6 +116,12 @@ "viewer": { "type": "ref", "ref": "app.bsky.actor.defs#viewerState" } } }, + "viewerThreadState": { + "type": "object", + "properties": { + "canReply": { "type": "boolean" } + } + }, "generatorView": { "type": "object", "required": ["uri", "cid", "did", "creator", "displayName", "indexedAt"], @@ -158,6 +166,18 @@ "properties": { "repost": { "type": "string", "format": "at-uri" } } + }, + "threadgateView": { + "type": "object", + "properties": { + "uri": { "type": "string", "format": "at-uri" }, + "cid": { "type": "string", "format": "cid" }, + "record": { "type": "unknown" }, + "lists": { + "type": "array", + "items": { "type": "ref", "ref": "app.bsky.graph.defs#listViewBasic" } + } + } } } } diff --git a/lexicons/app/bsky/feed/threadgate.json b/lexicons/app/bsky/feed/threadgate.json new file mode 100644 index 00000000000..aa2262174d1 --- /dev/null +++ b/lexicons/app/bsky/feed/threadgate.json @@ -0,0 +1,43 @@ +{ + "lexicon": 1, + "id": "app.bsky.feed.threadgate", + "defs": { + "main": { + "type": "record", + "key": "tid", + "description": "Defines interaction gating rules for a thread. The rkey of the threadgate record should match the rkey of the thread's root post.", + "record": { + "type": "object", + "required": ["post", "createdAt"], + "properties": { + "post": { "type": "string", "format": "at-uri" }, + "allow": { + "type": "array", + "maxLength": 5, + "items": { + "type": "union", + "refs": ["#mentionRule", "#followingRule", "#listRule"] + } + }, + "createdAt": { "type": "string", "format": "datetime" } + } + } + }, + "mentionRule": { + "type": "object", + "description": "Allow replies from actors mentioned in your post." + }, + "followingRule": { + "type": "object", + "description": "Allow replies from actors you follow." + }, + "listRule": { + "type": "object", + "description": "Allow replies from actors on a list.", + "required": ["list"], + "properties": { + "list": { "type": "string", "format": "at-uri" } + } + } + } +} diff --git a/packages/api/src/client/index.ts b/packages/api/src/client/index.ts index 8a32bef870a..d72fe659e50 100644 --- a/packages/api/src/client/index.ts +++ b/packages/api/src/client/index.ts @@ -103,6 +103,7 @@ import * as AppBskyFeedGetTimeline from './types/app/bsky/feed/getTimeline' import * as AppBskyFeedLike from './types/app/bsky/feed/like' import * as AppBskyFeedPost from './types/app/bsky/feed/post' import * as AppBskyFeedRepost from './types/app/bsky/feed/repost' +import * as AppBskyFeedThreadgate from './types/app/bsky/feed/threadgate' import * as AppBskyGraphBlock from './types/app/bsky/graph/block' import * as AppBskyGraphDefs from './types/app/bsky/graph/defs' import * as AppBskyGraphFollow from './types/app/bsky/graph/follow' @@ -228,6 +229,7 @@ export * as AppBskyFeedGetTimeline from './types/app/bsky/feed/getTimeline' export * as AppBskyFeedLike from './types/app/bsky/feed/like' export * as AppBskyFeedPost from './types/app/bsky/feed/post' export * as AppBskyFeedRepost from './types/app/bsky/feed/repost' +export * as AppBskyFeedThreadgate from './types/app/bsky/feed/threadgate' export * as AppBskyGraphBlock from './types/app/bsky/graph/block' export * as AppBskyGraphDefs from './types/app/bsky/graph/defs' export * as AppBskyGraphFollow from './types/app/bsky/graph/follow' @@ -1204,6 +1206,7 @@ export class FeedNS { like: LikeRecord post: PostRecord repost: RepostRecord + threadgate: ThreadgateRecord constructor(service: AtpServiceClient) { this._service = service @@ -1211,6 +1214,7 @@ export class FeedNS { this.like = new LikeRecord(service) this.post = new PostRecord(service) this.repost = new RepostRecord(service) + this.threadgate = new ThreadgateRecord(service) } describeFeedGenerator( @@ -1623,6 +1627,71 @@ export class RepostRecord { } } +export class ThreadgateRecord { + _service: AtpServiceClient + + constructor(service: AtpServiceClient) { + this._service = service + } + + async list( + params: Omit, + ): Promise<{ + cursor?: string + records: { uri: string; value: AppBskyFeedThreadgate.Record }[] + }> { + const res = await this._service.xrpc.call('com.atproto.repo.listRecords', { + collection: 'app.bsky.feed.threadgate', + ...params, + }) + return res.data + } + + async get( + params: Omit, + ): Promise<{ + uri: string + cid: string + value: AppBskyFeedThreadgate.Record + }> { + const res = await this._service.xrpc.call('com.atproto.repo.getRecord', { + collection: 'app.bsky.feed.threadgate', + ...params, + }) + return res.data + } + + async create( + params: Omit< + ComAtprotoRepoCreateRecord.InputSchema, + 'collection' | 'record' + >, + record: AppBskyFeedThreadgate.Record, + headers?: Record, + ): Promise<{ uri: string; cid: string }> { + record.$type = 'app.bsky.feed.threadgate' + const res = await this._service.xrpc.call( + 'com.atproto.repo.createRecord', + undefined, + { collection: 'app.bsky.feed.threadgate', ...params, record }, + { encoding: 'application/json', headers }, + ) + return res.data + } + + async delete( + params: Omit, + headers?: Record, + ): Promise { + await this._service.xrpc.call( + 'com.atproto.repo.deleteRecord', + undefined, + { collection: 'app.bsky.feed.threadgate', ...params }, + { headers }, + ) + } +} + export class GraphNS { _service: AtpServiceClient block: BlockRecord diff --git a/packages/api/src/client/lexicons.ts b/packages/api/src/client/lexicons.ts index f1fd2519d74..2ca983aec4b 100644 --- a/packages/api/src/client/lexicons.ts +++ b/packages/api/src/client/lexicons.ts @@ -4390,6 +4390,10 @@ export const schemaDict = { ref: 'lex:com.atproto.label.defs#label', }, }, + threadgate: { + type: 'ref', + ref: 'lex:app.bsky.feed.defs#threadgateView', + }, }, }, viewerState: { @@ -4486,6 +4490,10 @@ export const schemaDict = { ], }, }, + viewer: { + type: 'ref', + ref: 'lex:app.bsky.feed.defs#viewerThreadState', + }, }, }, notFoundPost: { @@ -4534,6 +4542,14 @@ export const schemaDict = { }, }, }, + viewerThreadState: { + type: 'object', + properties: { + canReply: { + type: 'boolean', + }, + }, + }, generatorView: { type: 'object', required: ['uri', 'cid', 'did', 'creator', 'displayName', 'indexedAt'], @@ -4619,6 +4635,29 @@ export const schemaDict = { }, }, }, + threadgateView: { + type: 'object', + properties: { + uri: { + type: 'string', + format: 'at-uri', + }, + cid: { + type: 'string', + format: 'cid', + }, + record: { + type: 'unknown', + }, + lists: { + type: 'array', + items: { + type: 'ref', + ref: 'lex:app.bsky.graph.defs#listViewBasic', + }, + }, + }, + }, }, }, AppBskyFeedDescribeFeedGenerator: { @@ -5615,6 +5654,63 @@ export const schemaDict = { }, }, }, + AppBskyFeedThreadgate: { + lexicon: 1, + id: 'app.bsky.feed.threadgate', + defs: { + main: { + type: 'record', + key: 'tid', + description: + "Defines interaction gating rules for a thread. The rkey of the threadgate record should match the rkey of the thread's root post.", + record: { + type: 'object', + required: ['post', 'createdAt'], + properties: { + post: { + type: 'string', + format: 'at-uri', + }, + allow: { + type: 'array', + maxLength: 5, + items: { + type: 'union', + refs: [ + 'lex:app.bsky.feed.threadgate#mentionRule', + 'lex:app.bsky.feed.threadgate#followingRule', + 'lex:app.bsky.feed.threadgate#listRule', + ], + }, + }, + createdAt: { + type: 'string', + format: 'datetime', + }, + }, + }, + }, + mentionRule: { + type: 'object', + description: 'Allow replies from actors mentioned in your post.', + }, + followingRule: { + type: 'object', + description: 'Allow replies from actors you follow.', + }, + listRule: { + type: 'object', + description: 'Allow replies from actors on a list.', + required: ['list'], + properties: { + list: { + type: 'string', + format: 'at-uri', + }, + }, + }, + }, + }, AppBskyGraphBlock: { lexicon: 1, id: 'app.bsky.graph.block', @@ -6932,6 +7028,7 @@ export const ids = { AppBskyFeedLike: 'app.bsky.feed.like', AppBskyFeedPost: 'app.bsky.feed.post', AppBskyFeedRepost: 'app.bsky.feed.repost', + AppBskyFeedThreadgate: 'app.bsky.feed.threadgate', AppBskyGraphBlock: 'app.bsky.graph.block', AppBskyGraphDefs: 'app.bsky.graph.defs', AppBskyGraphFollow: 'app.bsky.graph.follow', diff --git a/packages/api/src/client/types/app/bsky/feed/defs.ts b/packages/api/src/client/types/app/bsky/feed/defs.ts index 1270dab250b..944fd34b072 100644 --- a/packages/api/src/client/types/app/bsky/feed/defs.ts +++ b/packages/api/src/client/types/app/bsky/feed/defs.ts @@ -12,6 +12,7 @@ import * as AppBskyEmbedRecord from '../embed/record' import * as AppBskyEmbedRecordWithMedia from '../embed/recordWithMedia' import * as ComAtprotoLabelDefs from '../../../com/atproto/label/defs' import * as AppBskyRichtextFacet from '../richtext/facet' +import * as AppBskyGraphDefs from '../graph/defs' export interface PostView { uri: string @@ -30,6 +31,7 @@ export interface PostView { indexedAt: string viewer?: ViewerState labels?: ComAtprotoLabelDefs.Label[] + threadgate?: ThreadgateView [k: string]: unknown } @@ -135,6 +137,7 @@ export interface ThreadViewPost { | BlockedPost | { $type: string; [k: string]: unknown } )[] + viewer?: ViewerThreadState [k: string]: unknown } @@ -205,6 +208,23 @@ export function validateBlockedAuthor(v: unknown): ValidationResult { return lexicons.validate('app.bsky.feed.defs#blockedAuthor', v) } +export interface ViewerThreadState { + canReply?: boolean + [k: string]: unknown +} + +export function isViewerThreadState(v: unknown): v is ViewerThreadState { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.defs#viewerThreadState' + ) +} + +export function validateViewerThreadState(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.defs#viewerThreadState', v) +} + export interface GeneratorView { uri: string cid: string @@ -283,3 +303,23 @@ export function isSkeletonReasonRepost(v: unknown): v is SkeletonReasonRepost { export function validateSkeletonReasonRepost(v: unknown): ValidationResult { return lexicons.validate('app.bsky.feed.defs#skeletonReasonRepost', v) } + +export interface ThreadgateView { + uri?: string + cid?: string + record?: {} + lists?: AppBskyGraphDefs.ListViewBasic[] + [k: string]: unknown +} + +export function isThreadgateView(v: unknown): v is ThreadgateView { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.defs#threadgateView' + ) +} + +export function validateThreadgateView(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.defs#threadgateView', v) +} diff --git a/packages/api/src/client/types/app/bsky/feed/threadgate.ts b/packages/api/src/client/types/app/bsky/feed/threadgate.ts new file mode 100644 index 00000000000..20e0a62eb3a --- /dev/null +++ b/packages/api/src/client/types/app/bsky/feed/threadgate.ts @@ -0,0 +1,80 @@ +/** + * GENERATED CODE - DO NOT MODIFY + */ +import { ValidationResult, BlobRef } from '@atproto/lexicon' +import { isObj, hasProp } from '../../../../util' +import { lexicons } from '../../../../lexicons' +import { CID } from 'multiformats/cid' + +export interface Record { + post: string + allow?: ( + | MentionRule + | FollowingRule + | ListRule + | { $type: string; [k: string]: unknown } + )[] + createdAt: string + [k: string]: unknown +} + +export function isRecord(v: unknown): v is Record { + return ( + isObj(v) && + hasProp(v, '$type') && + (v.$type === 'app.bsky.feed.threadgate#main' || + v.$type === 'app.bsky.feed.threadgate') + ) +} + +export function validateRecord(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.threadgate#main', v) +} + +/** Allow replies from actors mentioned in your post. */ +export interface MentionRule {} + +export function isMentionRule(v: unknown): v is MentionRule { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.threadgate#mentionRule' + ) +} + +export function validateMentionRule(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.threadgate#mentionRule', v) +} + +/** Allow replies from actors you follow. */ +export interface FollowingRule {} + +export function isFollowingRule(v: unknown): v is FollowingRule { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.threadgate#followingRule' + ) +} + +export function validateFollowingRule(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.threadgate#followingRule', v) +} + +/** Allow replies from actors on a list. */ +export interface ListRule { + list: string + [k: string]: unknown +} + +export function isListRule(v: unknown): v is ListRule { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.threadgate#listRule' + ) +} + +export function validateListRule(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.threadgate#listRule', v) +} diff --git a/packages/bsky/src/api/app/bsky/feed/getPostThread.ts b/packages/bsky/src/api/app/bsky/feed/getPostThread.ts index 2d10ff98006..9292f0d5d99 100644 --- a/packages/bsky/src/api/app/bsky/feed/getPostThread.ts +++ b/packages/bsky/src/api/app/bsky/feed/getPostThread.ts @@ -1,26 +1,33 @@ import { InvalidRequestError } from '@atproto/xrpc-server' +import { AtUri } from '@atproto/syntax' import { Server } from '../../../../lexicon' import { BlockedPost, NotFoundPost, ThreadViewPost, isNotFoundPost, + isThreadViewPost, } from '../../../../lexicon/types/app/bsky/feed/defs' +import { Record as PostRecord } from '../../../../lexicon/types/app/bsky/feed/post' +import { Record as ThreadgateRecord } from '../../../../lexicon/types/app/bsky/feed/threadgate' import { QueryParams } from '../../../../lexicon/types/app/bsky/feed/getPostThread' import AppContext from '../../../../context' import { FeedService, FeedRow, FeedHydrationState, + PostInfo, } from '../../../../services/feed' import { getAncestorsAndSelfQb, getDescendentsQb, } from '../../../../services/util/post' import { Database } from '../../../../db' +import DatabaseSchema from '../../../../db/database-schema' import { setRepoRev } from '../../../util' -import { createPipeline, noRules } from '../../../../pipeline' import { ActorInfoMap, ActorService } from '../../../../services/actor' +import { violatesThreadGate } from '../../../../services/feed/util' +import { createPipeline, noRules } from '../../../../pipeline' export default function (server: Server, ctx: AppContext) { const getPostThread = createPipeline( @@ -73,7 +80,21 @@ const hydration = async (state: SkeletonState, ctx: Context) => { } = state const relevant = getRelevantIds(threadData) const hydrated = await feedService.feedHydration({ ...relevant, viewer }) - return { ...state, ...hydrated } + // check root reply interaction rules + const anchorPostUri = threadData.post.postUri + const rootUri = threadData.post.replyRoot || anchorPostUri + const anchor = hydrated.posts[anchorPostUri] + const root = hydrated.posts[rootUri] + const gate = hydrated.threadgates[rootUri]?.record + const viewerCanReply = await checkViewerCanReply( + ctx.db.db, + anchor ?? null, + viewer, + new AtUri(rootUri).host, + (root?.record ?? null) as PostRecord | null, + gate ?? null, + ) + return { ...state, ...hydrated, viewerCanReply } } const presentation = (state: HydrationState, ctx: Context) => { @@ -89,6 +110,9 @@ const presentation = (state: HydrationState, ctx: Context) => { // @TODO technically this could be returned as a NotFoundPost based on lexicon throw new InvalidRequestError(`Post not found: ${params.uri}`, 'NotFound') } + if (isThreadViewPost(thread) && params.viewer) { + thread.viewer = { canReply: state.viewerCanReply } + } return { thread } } @@ -99,17 +123,27 @@ const composeThread = ( ctx: Context, ) => { const { feedService } = ctx - const { posts, embeds, blocks, labels } = state + const { posts, threadgates, embeds, blocks, labels, lists } = state const post = feedService.views.formatPostView( threadData.post.postUri, actors, posts, + threadgates, embeds, labels, + lists, ) - if (!post || blocks[post.uri]?.reply) { + // replies that are invalid due to reply-gating: + // a. may appear as the anchor post, but without any parent or replies. + // b. may not appear anywhere else in the thread. + const isAnchorPost = state.threadData.post.uri === threadData.post.postUri + const info = posts[threadData.post.postUri] + const badReply = !!info?.invalidReplyRoot || !!info?.violatesThreadGate + const omitBadReply = !isAnchorPost && badReply + + if (!post || blocks[post.uri]?.reply || omitBadReply) { return { $type: 'app.bsky.feed.defs#notFoundPost', uri: threadData.post.postUri, @@ -135,7 +169,7 @@ const composeThread = ( } let parent - if (threadData.parent) { + if (threadData.parent && !badReply) { if (threadData.parent instanceof ParentNotFoundError) { parent = { $type: 'app.bsky.feed.defs#notFoundPost', @@ -148,7 +182,7 @@ const composeThread = ( } let replies: (ThreadViewPost | NotFoundPost | BlockedPost)[] | undefined - if (threadData.replies) { + if (threadData.replies && !badReply) { replies = threadData.replies.flatMap((reply) => { const thread = composeThread(reply, actors, state, ctx) // e.g. don't bother including #postNotFound reply placeholders for takedowns. either way matches api contract. @@ -184,6 +218,10 @@ const getRelevantIds = ( } dids.add(thread.post.postAuthorDid) uris.add(thread.post.postUri) + if (thread.post.replyRoot) { + // ensure root is included for checking interactions + uris.add(thread.post.replyRoot) + } return { dids, uris } } @@ -265,6 +303,26 @@ const getChildrenData = ( })) } +const checkViewerCanReply = async ( + db: DatabaseSchema, + anchor: PostInfo | null, + viewer: string | null, + owner: string, + root: PostRecord | null, + threadgate: ThreadgateRecord | null, +) => { + if (!viewer) return false + if (anchor?.invalidReplyRoot || anchor?.violatesThreadGate) return false + const viewerViolatesThreadGate = await violatesThreadGate( + db, + viewer, + owner, + root, + threadgate, + ) + return !viewerViolatesThreadGate +} + class ParentNotFoundError extends Error { constructor(public uri: string) { super(`Parent not found: ${uri}`) @@ -290,4 +348,7 @@ type SkeletonState = { threadData: PostThread } -type HydrationState = SkeletonState & FeedHydrationState +type HydrationState = SkeletonState & + FeedHydrationState & { + viewerCanReply: boolean + } diff --git a/packages/bsky/src/api/app/bsky/feed/getPosts.ts b/packages/bsky/src/api/app/bsky/feed/getPosts.ts index fc35b203034..90268e5f161 100644 --- a/packages/bsky/src/api/app/bsky/feed/getPosts.ts +++ b/packages/bsky/src/api/app/bsky/feed/getPosts.ts @@ -72,8 +72,10 @@ const presentation = (state: HydrationState, ctx: Context) => { uri, actors, state.posts, + state.threadgates, state.embeds, state.labels, + state.lists, ) return postView ?? SKIP }) diff --git a/packages/bsky/src/db/database-schema.ts b/packages/bsky/src/db/database-schema.ts index adb8c088207..e43ade819e6 100644 --- a/packages/bsky/src/db/database-schema.ts +++ b/packages/bsky/src/db/database-schema.ts @@ -6,6 +6,7 @@ import * as post from './tables/post' import * as postEmbed from './tables/post-embed' import * as postAgg from './tables/post-agg' import * as repost from './tables/repost' +import * as threadGate from './tables/thread-gate' import * as feedItem from './tables/feed-item' import * as follow from './tables/follow' import * as like from './tables/like' @@ -38,6 +39,7 @@ export type DatabaseSchemaType = duplicateRecord.PartialDB & postEmbed.PartialDB & postAgg.PartialDB & repost.PartialDB & + threadGate.PartialDB & feedItem.PartialDB & follow.PartialDB & like.PartialDB & diff --git a/packages/bsky/src/db/migrations/20230906T222220386Z-thread-gating.ts b/packages/bsky/src/db/migrations/20230906T222220386Z-thread-gating.ts new file mode 100644 index 00000000000..42296aaccf9 --- /dev/null +++ b/packages/bsky/src/db/migrations/20230906T222220386Z-thread-gating.ts @@ -0,0 +1,31 @@ +import { Kysely } from 'kysely' + +export async function up(db: Kysely): Promise { + await db.schema + .createTable('thread_gate') + .addColumn('uri', 'varchar', (col) => col.primaryKey()) + .addColumn('cid', 'varchar', (col) => col.notNull()) + .addColumn('creator', 'varchar', (col) => col.notNull()) + .addColumn('postUri', 'varchar', (col) => col.notNull().unique()) + .addColumn('createdAt', 'varchar', (col) => col.notNull()) + .addColumn('indexedAt', 'varchar', (col) => col.notNull()) + .execute() + await db.schema + .alterTable('post') + .addColumn('invalidReplyRoot', 'boolean', (col) => + col.notNull().defaultTo(false), + ) + .execute() + await db.schema + .alterTable('post') + .addColumn('violatesThreadGate', 'boolean', (col) => + col.notNull().defaultTo(false), + ) + .execute() +} + +export async function down(db: Kysely): Promise { + await db.schema.dropTable('thread_gate').execute() + await db.schema.alterTable('post').dropColumn('invalidReplyRoot').execute() + await db.schema.alterTable('post').dropColumn('violatesThreadGate').execute() +} diff --git a/packages/bsky/src/db/migrations/index.ts b/packages/bsky/src/db/migrations/index.ts index 505f7c84909..bf18d8dd15b 100644 --- a/packages/bsky/src/db/migrations/index.ts +++ b/packages/bsky/src/db/migrations/index.ts @@ -27,3 +27,4 @@ export * as _20230810T203349843Z from './20230810T203349843Z-action-duration' export * as _20230817T195936007Z from './20230817T195936007Z-native-notifications' export * as _20230830T205507322Z from './20230830T205507322Z-suggested-feeds' export * as _20230904T211011773Z from './20230904T211011773Z-block-lists' +export * as _20230906T222220386Z from './20230906T222220386Z-thread-gating' diff --git a/packages/bsky/src/db/tables/post.ts b/packages/bsky/src/db/tables/post.ts index f878b8dab33..d70a75912a5 100644 --- a/packages/bsky/src/db/tables/post.ts +++ b/packages/bsky/src/db/tables/post.ts @@ -1,4 +1,4 @@ -import { GeneratedAlways } from 'kysely' +import { Generated, GeneratedAlways } from 'kysely' export const tableName = 'post' @@ -12,6 +12,8 @@ export interface Post { replyParent: string | null replyParentCid: string | null langs: string[] | null + invalidReplyRoot: Generated + violatesThreadGate: Generated createdAt: string indexedAt: string sortAt: GeneratedAlways diff --git a/packages/bsky/src/db/tables/thread-gate.ts b/packages/bsky/src/db/tables/thread-gate.ts new file mode 100644 index 00000000000..327ee7e41c6 --- /dev/null +++ b/packages/bsky/src/db/tables/thread-gate.ts @@ -0,0 +1,12 @@ +const tableName = 'thread_gate' + +export interface ThreadGate { + uri: string + cid: string + creator: string + postUri: string + createdAt: string + indexedAt: string +} + +export type PartialDB = { [tableName]: ThreadGate } diff --git a/packages/bsky/src/lexicon/lexicons.ts b/packages/bsky/src/lexicon/lexicons.ts index f1fd2519d74..2ca983aec4b 100644 --- a/packages/bsky/src/lexicon/lexicons.ts +++ b/packages/bsky/src/lexicon/lexicons.ts @@ -4390,6 +4390,10 @@ export const schemaDict = { ref: 'lex:com.atproto.label.defs#label', }, }, + threadgate: { + type: 'ref', + ref: 'lex:app.bsky.feed.defs#threadgateView', + }, }, }, viewerState: { @@ -4486,6 +4490,10 @@ export const schemaDict = { ], }, }, + viewer: { + type: 'ref', + ref: 'lex:app.bsky.feed.defs#viewerThreadState', + }, }, }, notFoundPost: { @@ -4534,6 +4542,14 @@ export const schemaDict = { }, }, }, + viewerThreadState: { + type: 'object', + properties: { + canReply: { + type: 'boolean', + }, + }, + }, generatorView: { type: 'object', required: ['uri', 'cid', 'did', 'creator', 'displayName', 'indexedAt'], @@ -4619,6 +4635,29 @@ export const schemaDict = { }, }, }, + threadgateView: { + type: 'object', + properties: { + uri: { + type: 'string', + format: 'at-uri', + }, + cid: { + type: 'string', + format: 'cid', + }, + record: { + type: 'unknown', + }, + lists: { + type: 'array', + items: { + type: 'ref', + ref: 'lex:app.bsky.graph.defs#listViewBasic', + }, + }, + }, + }, }, }, AppBskyFeedDescribeFeedGenerator: { @@ -5615,6 +5654,63 @@ export const schemaDict = { }, }, }, + AppBskyFeedThreadgate: { + lexicon: 1, + id: 'app.bsky.feed.threadgate', + defs: { + main: { + type: 'record', + key: 'tid', + description: + "Defines interaction gating rules for a thread. The rkey of the threadgate record should match the rkey of the thread's root post.", + record: { + type: 'object', + required: ['post', 'createdAt'], + properties: { + post: { + type: 'string', + format: 'at-uri', + }, + allow: { + type: 'array', + maxLength: 5, + items: { + type: 'union', + refs: [ + 'lex:app.bsky.feed.threadgate#mentionRule', + 'lex:app.bsky.feed.threadgate#followingRule', + 'lex:app.bsky.feed.threadgate#listRule', + ], + }, + }, + createdAt: { + type: 'string', + format: 'datetime', + }, + }, + }, + }, + mentionRule: { + type: 'object', + description: 'Allow replies from actors mentioned in your post.', + }, + followingRule: { + type: 'object', + description: 'Allow replies from actors you follow.', + }, + listRule: { + type: 'object', + description: 'Allow replies from actors on a list.', + required: ['list'], + properties: { + list: { + type: 'string', + format: 'at-uri', + }, + }, + }, + }, + }, AppBskyGraphBlock: { lexicon: 1, id: 'app.bsky.graph.block', @@ -6932,6 +7028,7 @@ export const ids = { AppBskyFeedLike: 'app.bsky.feed.like', AppBskyFeedPost: 'app.bsky.feed.post', AppBskyFeedRepost: 'app.bsky.feed.repost', + AppBskyFeedThreadgate: 'app.bsky.feed.threadgate', AppBskyGraphBlock: 'app.bsky.graph.block', AppBskyGraphDefs: 'app.bsky.graph.defs', AppBskyGraphFollow: 'app.bsky.graph.follow', diff --git a/packages/bsky/src/lexicon/types/app/bsky/feed/defs.ts b/packages/bsky/src/lexicon/types/app/bsky/feed/defs.ts index 463445fbd49..08d34d88ebb 100644 --- a/packages/bsky/src/lexicon/types/app/bsky/feed/defs.ts +++ b/packages/bsky/src/lexicon/types/app/bsky/feed/defs.ts @@ -12,6 +12,7 @@ import * as AppBskyEmbedRecord from '../embed/record' import * as AppBskyEmbedRecordWithMedia from '../embed/recordWithMedia' import * as ComAtprotoLabelDefs from '../../../com/atproto/label/defs' import * as AppBskyRichtextFacet from '../richtext/facet' +import * as AppBskyGraphDefs from '../graph/defs' export interface PostView { uri: string @@ -30,6 +31,7 @@ export interface PostView { indexedAt: string viewer?: ViewerState labels?: ComAtprotoLabelDefs.Label[] + threadgate?: ThreadgateView [k: string]: unknown } @@ -135,6 +137,7 @@ export interface ThreadViewPost { | BlockedPost | { $type: string; [k: string]: unknown } )[] + viewer?: ViewerThreadState [k: string]: unknown } @@ -205,6 +208,23 @@ export function validateBlockedAuthor(v: unknown): ValidationResult { return lexicons.validate('app.bsky.feed.defs#blockedAuthor', v) } +export interface ViewerThreadState { + canReply?: boolean + [k: string]: unknown +} + +export function isViewerThreadState(v: unknown): v is ViewerThreadState { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.defs#viewerThreadState' + ) +} + +export function validateViewerThreadState(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.defs#viewerThreadState', v) +} + export interface GeneratorView { uri: string cid: string @@ -283,3 +303,23 @@ export function isSkeletonReasonRepost(v: unknown): v is SkeletonReasonRepost { export function validateSkeletonReasonRepost(v: unknown): ValidationResult { return lexicons.validate('app.bsky.feed.defs#skeletonReasonRepost', v) } + +export interface ThreadgateView { + uri?: string + cid?: string + record?: {} + lists?: AppBskyGraphDefs.ListViewBasic[] + [k: string]: unknown +} + +export function isThreadgateView(v: unknown): v is ThreadgateView { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.defs#threadgateView' + ) +} + +export function validateThreadgateView(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.defs#threadgateView', v) +} diff --git a/packages/bsky/src/lexicon/types/app/bsky/feed/threadgate.ts b/packages/bsky/src/lexicon/types/app/bsky/feed/threadgate.ts new file mode 100644 index 00000000000..51f9f8e9af1 --- /dev/null +++ b/packages/bsky/src/lexicon/types/app/bsky/feed/threadgate.ts @@ -0,0 +1,80 @@ +/** + * GENERATED CODE - DO NOT MODIFY + */ +import { ValidationResult, BlobRef } from '@atproto/lexicon' +import { lexicons } from '../../../../lexicons' +import { isObj, hasProp } from '../../../../util' +import { CID } from 'multiformats/cid' + +export interface Record { + post: string + allow?: ( + | MentionRule + | FollowingRule + | ListRule + | { $type: string; [k: string]: unknown } + )[] + createdAt: string + [k: string]: unknown +} + +export function isRecord(v: unknown): v is Record { + return ( + isObj(v) && + hasProp(v, '$type') && + (v.$type === 'app.bsky.feed.threadgate#main' || + v.$type === 'app.bsky.feed.threadgate') + ) +} + +export function validateRecord(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.threadgate#main', v) +} + +/** Allow replies from actors mentioned in your post. */ +export interface MentionRule {} + +export function isMentionRule(v: unknown): v is MentionRule { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.threadgate#mentionRule' + ) +} + +export function validateMentionRule(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.threadgate#mentionRule', v) +} + +/** Allow replies from actors you follow. */ +export interface FollowingRule {} + +export function isFollowingRule(v: unknown): v is FollowingRule { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.threadgate#followingRule' + ) +} + +export function validateFollowingRule(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.threadgate#followingRule', v) +} + +/** Allow replies from actors on a list. */ +export interface ListRule { + list: string + [k: string]: unknown +} + +export function isListRule(v: unknown): v is ListRule { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.threadgate#listRule' + ) +} + +export function validateListRule(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.threadgate#listRule', v) +} diff --git a/packages/bsky/src/services/feed/index.ts b/packages/bsky/src/services/feed/index.ts index db32f1971bc..e5ee2d1c8db 100644 --- a/packages/bsky/src/services/feed/index.ts +++ b/packages/bsky/src/services/feed/index.ts @@ -9,6 +9,10 @@ import { Record as PostRecord, isRecord as isPostRecord, } from '../../lexicon/types/app/bsky/feed/post' +import { + Record as ThreadgateRecord, + isListRule, +} from '../../lexicon/types/app/bsky/feed/threadgate' import { isMain as isEmbedImages } from '../../lexicon/types/app/bsky/embed/images' import { isMain as isEmbedExternal } from '../../lexicon/types/app/bsky/embed/external' import { @@ -27,12 +31,19 @@ import { RecordEmbedViewRecord, PostBlocksMap, FeedHydrationState, + ThreadgateInfoMap, } from './types' import { LabelService } from '../label' import { ActorService } from '../actor' -import { BlockAndMuteState, GraphService, RelationshipPair } from '../graph' +import { + BlockAndMuteState, + GraphService, + ListInfoMap, + RelationshipPair, +} from '../graph' import { FeedViews } from './views' import { LabelCache } from '../../label-cache' +import { threadgateToPostUri, postToThreadgateUri } from './util' export * from './types' @@ -130,6 +141,8 @@ export class FeedService { 'post.cid as cid', 'post.creator as creator', 'post.sortAt as indexedAt', + 'post.invalidReplyRoot as invalidReplyRoot', + 'post.violatesThreadGate as violatesThreadGate', 'record.json as recordJson', 'post_agg.likeCount as likeCount', 'post_agg.repostCount as repostCount', @@ -152,11 +165,8 @@ export class FeedService { .execute() return posts.reduce((acc, cur) => { const { recordJson, ...post } = cur - const info: PostInfo = { - ...post, - record: jsonStringToLex(recordJson) as Record, - viewer, - } + const record = jsonStringToLex(recordJson) as PostRecord + const info: PostInfo = { ...post, record, viewer } return Object.assign(acc, { [post.uri]: info }) }, {} as PostInfoMap) } @@ -216,31 +226,35 @@ export class FeedService { depth = 0, ): Promise { const { viewer, dids, uris } = refs - const [posts, labels, bam] = await Promise.all([ + const [posts, threadgates, labels, bam] = await Promise.all([ this.getPostInfos(Array.from(uris), viewer), + this.threadgatesByPostUri(Array.from(uris)), this.services.label.getLabelsForSubjects([...uris, ...dids]), this.services.graph.getBlockAndMuteState( viewer ? [...dids].map((did) => [viewer, did]) : [], ), ]) + // profileState for labels and bam handled above, profileHydration() shouldn't fetch additional - const [profileState, blocks] = await Promise.all([ + const [profileState, blocks, lists] = await Promise.all([ this.services.actor.views.profileHydration( Array.from(dids), { viewer }, { bam, labels }, ), this.blocksForPosts(posts, bam), + this.listsForThreadgates(threadgates, viewer), ]) const embeds = await this.embedsForPosts(posts, blocks, viewer, depth) return { posts, + threadgates, blocks, embeds, labels, // includes info for profiles bam, // includes info for profiles profiles: profileState.profiles, - lists: profileState.lists, + lists: Object.assign(lists, profileState.lists), } } @@ -398,8 +412,10 @@ export class FeedService { uri, actorInfos, feedState.posts, + feedState.threadgates, feedState.embeds, feedState.labels, + feedState.lists, ) recordEmbedViews[uri] = this.views.getRecordEmbedView( uri, @@ -416,6 +432,39 @@ export class FeedService { } return recordEmbedViews } + + async threadgatesByPostUri(postUris: string[]): Promise { + const gates = postUris.length + ? await this.db.db + .selectFrom('record') + .where('uri', 'in', postUris.map(postToThreadgateUri)) + .select(['uri', 'cid', 'json']) + .execute() + : [] + const gatesByPostUri = gates.reduce((acc, gate) => { + const record = jsonStringToLex(gate.json) as ThreadgateRecord + const postUri = threadgateToPostUri(gate.uri) + if (record.post !== postUri) return acc // invalid, skip + acc[postUri] = { uri: gate.uri, cid: gate.cid, record } + return acc + }, {} as ThreadgateInfoMap) + return gatesByPostUri + } + + listsForThreadgates( + threadgates: ThreadgateInfoMap, + viewer: string | null, + ): Promise { + const listsUris = new Set() + Object.values(threadgates).forEach((gate) => { + gate?.record.allow?.forEach((rule) => { + if (isListRule(rule)) { + listsUris.add(rule.list) + } + }) + }) + return this.services.graph.getListViews([...listsUris], viewer) + } } const postRecordsFromInfos = ( diff --git a/packages/bsky/src/services/feed/types.ts b/packages/bsky/src/services/feed/types.ts index 894ee0a564f..8d4bd67f6bb 100644 --- a/packages/bsky/src/services/feed/types.ts +++ b/packages/bsky/src/services/feed/types.ts @@ -1,4 +1,5 @@ import { Selectable } from 'kysely' +import { Record as ThreadgateRecord } from '../../lexicon/types/app/bsky/feed/threadgate' import { View as ImagesEmbedView } from '../../lexicon/types/app/bsky/embed/images' import { View as ExternalEmbedView } from '../../lexicon/types/app/bsky/embed/external' import { @@ -41,6 +42,8 @@ export type PostInfo = { replyCount: number | null requesterRepost: string | null requesterLike: string | null + invalidReplyRoot: boolean + violatesThreadGate: boolean viewer: string | null } @@ -50,6 +53,16 @@ export type PostBlocksMap = { [uri: string]: { reply?: boolean; embed?: boolean } } +export type ThreadgateInfo = { + uri: string + cid: string + record: ThreadgateRecord +} + +export type ThreadgateInfoMap = { + [postUri: string]: ThreadgateInfo +} + export type FeedGenInfo = Selectable & { likeCount: number viewer?: { @@ -86,6 +99,7 @@ export type RecordEmbedViewRecordMap = { [uri: string]: RecordEmbedViewRecord } export type FeedHydrationState = ProfileHydrationState & { posts: PostInfoMap + threadgates: ThreadgateInfoMap embeds: PostEmbedViews labels: Labels blocks: PostBlocksMap diff --git a/packages/bsky/src/services/feed/util.ts b/packages/bsky/src/services/feed/util.ts new file mode 100644 index 00000000000..b2e2ce8d92d --- /dev/null +++ b/packages/bsky/src/services/feed/util.ts @@ -0,0 +1,112 @@ +import { sql } from 'kysely' +import { AtUri } from '@atproto/syntax' +import { + Record as PostRecord, + ReplyRef, +} from '../../lexicon/types/app/bsky/feed/post' +import { + Record as GateRecord, + isFollowingRule, + isListRule, + isMentionRule, +} from '../../lexicon/types/app/bsky/feed/threadgate' +import { isMention } from '../../lexicon/types/app/bsky/richtext/facet' +import { valuesList } from '../../db/util' +import DatabaseSchema from '../../db/database-schema' +import { ids } from '../../lexicon/lexicons' + +export const invalidReplyRoot = ( + reply: ReplyRef, + parent: { + record: PostRecord + invalidReplyRoot: boolean | null + }, +) => { + const replyRoot = reply.root.uri + const replyParent = reply.parent.uri + // if parent is not a valid reply, transitively this is not a valid one either + if (parent.invalidReplyRoot) { + return true + } + // replying to root post: ensure the root looks correct + if (replyParent === replyRoot) { + return !!parent.record.reply + } + // replying to a reply: ensure the parent is a reply for the same root post + return parent.record.reply?.root.uri !== replyRoot +} + +export const violatesThreadGate = async ( + db: DatabaseSchema, + did: string, + owner: string, + root: PostRecord | null, + gate: GateRecord | null, +) => { + if (did === owner) return false + if (!gate?.allow) return false + + const allowMentions = gate.allow.find(isMentionRule) + const allowFollowing = gate.allow.find(isFollowingRule) + const allowListUris = gate.allow?.filter(isListRule).map((item) => item.list) + + // check mentions first since it's quick and synchronous + if (allowMentions) { + const isMentioned = root?.facets?.some((facet) => { + return facet.features.some((item) => isMention(item) && item.did === did) + }) + if (isMentioned) { + return false + } + } + + // check follows and list containment + if (!allowFollowing && !allowListUris.length) { + return true + } + const { ref } = db.dynamic + const nullResult = sql`${null}` + const check = await db + .selectFrom(valuesList([did]).as(sql`subject (did)`)) + .select([ + allowFollowing + ? db + .selectFrom('follow') + .where('creator', '=', owner) + .whereRef('subjectDid', '=', ref('subject.did')) + .select('creator') + .as('isFollowed') + : nullResult.as('isFollowed'), + allowListUris.length + ? db + .selectFrom('list_item') + .where('list_item.listUri', 'in', allowListUris) + .whereRef('list_item.subjectDid', '=', ref('subject.did')) + .limit(1) + .select('listUri') + .as('isInList') + : nullResult.as('isInList'), + ]) + .executeTakeFirst() + + if (allowFollowing && check?.isFollowed) { + return false + } + if (allowListUris.length && check?.isInList) { + return false + } + + return true +} + +export const postToThreadgateUri = (postUri: string) => { + const gateUri = new AtUri(postUri) + gateUri.collection = ids.AppBskyFeedThreadgate + return gateUri.toString() +} + +export const threadgateToPostUri = (gateUri: string) => { + const postUri = new AtUri(gateUri) + postUri.collection = ids.AppBskyFeedPost + return postUri.toString() +} diff --git a/packages/bsky/src/services/feed/views.ts b/packages/bsky/src/services/feed/views.ts index 439e68f3d1f..dc5878db6cd 100644 --- a/packages/bsky/src/services/feed/views.ts +++ b/packages/bsky/src/services/feed/views.ts @@ -1,9 +1,11 @@ +import { mapDefined } from '@atproto/common' import { Database } from '../../db' import { FeedViewPost, GeneratorView, PostView, } from '../../lexicon/types/app/bsky/feed/defs' +import { isListRule } from '../../lexicon/types/app/bsky/feed/threadgate' import { Main as EmbedImages, isMain as isEmbedImages, @@ -29,11 +31,14 @@ import { RecordEmbedViewRecord, PostBlocksMap, FeedHydrationState, + ThreadgateInfoMap, + ThreadgateInfo, } from './types' import { Labels, getSelfLabels } from '../label' import { ImageUriBuilder } from '../../image/uri' import { LabelCache } from '../../label-cache' import { ActorInfoMap, ActorService } from '../actor' +import { ListInfoMap, GraphService } from '../graph' export class FeedViews { constructor( @@ -48,6 +53,7 @@ export class FeedViews { services = { actor: ActorService.creator(this.imgUriBuilder, this.labelCache)(this.db), + graph: GraphService.creator(this.imgUriBuilder)(this.db), } formatFeedGeneratorView( @@ -90,7 +96,8 @@ export class FeedViews { usePostViewUnion?: boolean }, ): FeedViewPost[] { - const { posts, profiles, blocks, embeds, labels } = state + const { posts, threadgates, profiles, blocks, embeds, labels, lists } = + state const actors = this.services.actor.views.profileBasicPresentation( Object.keys(profiles), state, @@ -98,12 +105,15 @@ export class FeedViews { ) const feed: FeedViewPost[] = [] for (const item of items) { + const info = posts[item.postUri] const post = this.formatPostView( item.postUri, actors, posts, + threadgates, embeds, labels, + lists, ) // skip over not found & blocked posts if (!post || blocks[post.uri]?.reply) { @@ -123,13 +133,21 @@ export class FeedViews { } } } - if (item.replyParent && item.replyRoot) { + // posts that violate reply-gating may appear in feeds, but without any thread context + if ( + item.replyParent && + item.replyRoot && + !info?.invalidReplyRoot && + !info?.violatesThreadGate + ) { const replyParent = this.formatMaybePostView( item.replyParent, actors, posts, + threadgates, embeds, labels, + lists, blocks, opts, ) @@ -137,8 +155,10 @@ export class FeedViews { item.replyRoot, actors, posts, + threadgates, embeds, labels, + lists, blocks, opts, ) @@ -158,10 +178,13 @@ export class FeedViews { uri: string, actors: ActorInfoMap, posts: PostInfoMap, + threadgates: ThreadgateInfoMap, embeds: PostEmbedViews, labels: Labels, + lists: ListInfoMap, ): PostView | undefined { const post = posts[uri] + const gate = threadgates[uri] const author = actors[post?.creator] if (!post || !author) return undefined const postLabels = labels[uri] ?? [] @@ -187,6 +210,10 @@ export class FeedViews { } : undefined, labels: [...postLabels, ...postSelfLabels], + threadgate: + !post.record.reply && gate + ? this.formatThreadgate(gate, lists) + : undefined, } } @@ -194,14 +221,24 @@ export class FeedViews { uri: string, actors: ActorInfoMap, posts: PostInfoMap, + threadgates: ThreadgateInfoMap, embeds: PostEmbedViews, labels: Labels, + lists: ListInfoMap, blocks: PostBlocksMap, opts?: { usePostViewUnion?: boolean }, ): MaybePostView | undefined { - const post = this.formatPostView(uri, actors, posts, embeds, labels) + const post = this.formatPostView( + uri, + actors, + posts, + threadgates, + embeds, + labels, + lists, + ) if (!post) { if (!opts?.usePostViewUnion) return return this.notFoundPost(uri) @@ -342,4 +379,18 @@ export class FeedViews { media: mediaEmbed, } } + + formatThreadgate(gate: ThreadgateInfo, lists: ListInfoMap) { + return { + uri: gate.uri, + cid: gate.cid, + record: gate.record, + lists: mapDefined(gate.record.allow ?? [], (rule) => { + if (!isListRule(rule)) return + const list = lists[rule.list] + if (!list) return + return this.services.graph.formatListViewBasic(list) + }), + } + } } diff --git a/packages/bsky/src/services/indexing/index.ts b/packages/bsky/src/services/indexing/index.ts index 05e591c92c4..03dce203f36 100644 --- a/packages/bsky/src/services/indexing/index.ts +++ b/packages/bsky/src/services/indexing/index.ts @@ -14,6 +14,7 @@ import { DAY, HOUR } from '@atproto/common' import { ValidationError } from '@atproto/lexicon' import { PrimaryDatabase } from '../../db' import * as Post from './plugins/post' +import * as Threadgate from './plugins/thread-gate' import * as Like from './plugins/like' import * as Repost from './plugins/repost' import * as Follow from './plugins/follow' @@ -34,6 +35,7 @@ import { Actor } from '../../db/tables/actor' export class IndexingService { records: { post: Post.PluginType + threadGate: Threadgate.PluginType like: Like.PluginType repost: Repost.PluginType follow: Follow.PluginType @@ -54,6 +56,7 @@ export class IndexingService { ) { this.records = { post: Post.makePlugin(this.db, backgroundQueue, notifServer), + threadGate: Threadgate.makePlugin(this.db, backgroundQueue, notifServer), like: Like.makePlugin(this.db, backgroundQueue, notifServer), repost: Repost.makePlugin(this.db, backgroundQueue, notifServer), follow: Follow.makePlugin(this.db, backgroundQueue, notifServer), @@ -360,6 +363,10 @@ export class IndexingService { .where('post_embed_record.postUri', 'in', postByUser) .execute() await this.db.db.deleteFrom('post').where('creator', '=', did).execute() + await this.db.db + .deleteFrom('thread_gate') + .where('creator', '=', did) + .execute() // notifications await this.db.db .deleteFrom('notification') diff --git a/packages/bsky/src/services/indexing/plugins/post.ts b/packages/bsky/src/services/indexing/plugins/post.ts index 7ce431fdcd8..f57bc10179b 100644 --- a/packages/bsky/src/services/indexing/plugins/post.ts +++ b/packages/bsky/src/services/indexing/plugins/post.ts @@ -1,7 +1,12 @@ import { Insertable, Selectable, sql } from 'kysely' import { CID } from 'multiformats/cid' import { AtUri } from '@atproto/syntax' -import { Record as PostRecord } from '../../../lexicon/types/app/bsky/feed/post' +import { jsonStringToLex } from '@atproto/lexicon' +import { + Record as PostRecord, + ReplyRef, +} from '../../../lexicon/types/app/bsky/feed/post' +import { Record as GateRecord } from '../../../lexicon/types/app/bsky/feed/threadgate' import { isMain as isEmbedImage } from '../../../lexicon/types/app/bsky/embed/images' import { isMain as isEmbedExternal } from '../../../lexicon/types/app/bsky/embed/external' import { isMain as isEmbedRecord } from '../../../lexicon/types/app/bsky/embed/record' @@ -20,6 +25,8 @@ import { countAll, excluded } from '../../../db/util' import { BackgroundQueue } from '../../../background' import { getAncestorsAndSelfQb, getDescendentsQb } from '../../util/post' import { NotificationServer } from '../../../notifications' +import * as feedutil from '../../feed/util' +import { postToThreadgateUri } from '../../feed/util' type Notif = Insertable type Post = Selectable @@ -96,6 +103,21 @@ const insertFn = async ( return null // Post already indexed } + if (obj.reply) { + const { invalidReplyRoot, violatesThreadGate } = await validateReply( + db, + uri.host, + obj.reply, + ) + if (invalidReplyRoot || violatesThreadGate) { + await db + .updateTable('post') + .where('uri', '=', post.uri) + .set({ invalidReplyRoot, violatesThreadGate }) + .executeTakeFirst() + } + } + const facets = (obj.facets || []) .flatMap((facet) => facet.features) .flatMap((feature) => { @@ -381,3 +403,58 @@ function separateEmbeds(embed: PostRecord['embed']) { } return [embed] } + +async function validateReply( + db: DatabaseSchema, + creator: string, + reply: ReplyRef, +) { + const replyRefs = await getReplyRefs(db, reply) + // check reply + const invalidReplyRoot = + !replyRefs.parent || feedutil.invalidReplyRoot(reply, replyRefs.parent) + // check interaction + const violatesThreadGate = await feedutil.violatesThreadGate( + db, + creator, + new AtUri(reply.root.uri).host, + replyRefs.root?.record ?? null, + replyRefs.gate?.record ?? null, + ) + return { + invalidReplyRoot, + violatesThreadGate, + } +} + +async function getReplyRefs(db: DatabaseSchema, reply: ReplyRef) { + const replyRoot = reply.root.uri + const replyParent = reply.parent.uri + const replyGate = postToThreadgateUri(replyRoot) + const results = await db + .selectFrom('record') + .where('record.uri', 'in', [replyRoot, replyGate, replyParent]) + .leftJoin('post', 'post.uri', 'record.uri') + .selectAll('post') + .select(['record.uri', 'json']) + .execute() + const root = results.find((ref) => ref.uri === replyRoot) + const parent = results.find((ref) => ref.uri === replyParent) + const gate = results.find((ref) => ref.uri === replyGate) + return { + root: root && { + uri: root.uri, + invalidReplyRoot: root.invalidReplyRoot, + record: jsonStringToLex(root.json) as PostRecord, + }, + parent: parent && { + uri: parent.uri, + invalidReplyRoot: parent.invalidReplyRoot, + record: jsonStringToLex(parent.json) as PostRecord, + }, + gate: gate && { + uri: gate.uri, + record: jsonStringToLex(gate.json) as GateRecord, + }, + } +} diff --git a/packages/bsky/src/services/indexing/plugins/thread-gate.ts b/packages/bsky/src/services/indexing/plugins/thread-gate.ts new file mode 100644 index 00000000000..fb0928f2459 --- /dev/null +++ b/packages/bsky/src/services/indexing/plugins/thread-gate.ts @@ -0,0 +1,95 @@ +import { AtUri } from '@atproto/syntax' +import { InvalidRequestError } from '@atproto/xrpc-server' +import { CID } from 'multiformats/cid' +import * as Threadgate from '../../../lexicon/types/app/bsky/feed/threadgate' +import * as lex from '../../../lexicon/lexicons' +import { DatabaseSchema, DatabaseSchemaType } from '../../../db/database-schema' +import RecordProcessor from '../processor' +import { toSimplifiedISOSafe } from '../util' +import { PrimaryDatabase } from '../../../db' +import { BackgroundQueue } from '../../../background' +import { NotificationServer } from '../../../notifications' + +const lexId = lex.ids.AppBskyFeedThreadgate +type IndexedGate = DatabaseSchemaType['thread_gate'] + +const insertFn = async ( + db: DatabaseSchema, + uri: AtUri, + cid: CID, + obj: Threadgate.Record, + timestamp: string, +): Promise => { + const postUri = new AtUri(obj.post) + if (postUri.host !== uri.host || postUri.rkey !== uri.rkey) { + throw new InvalidRequestError( + 'Creator and rkey of thread gate does not match its post', + ) + } + const inserted = await db + .insertInto('thread_gate') + .values({ + uri: uri.toString(), + cid: cid.toString(), + creator: uri.host, + postUri: obj.post, + createdAt: toSimplifiedISOSafe(obj.createdAt), + indexedAt: timestamp, + }) + .onConflict((oc) => oc.doNothing()) + .returningAll() + .executeTakeFirst() + return inserted || null +} + +const findDuplicate = async ( + db: DatabaseSchema, + _uri: AtUri, + obj: Threadgate.Record, +): Promise => { + const found = await db + .selectFrom('thread_gate') + .where('postUri', '=', obj.post) + .selectAll() + .executeTakeFirst() + return found ? new AtUri(found.uri) : null +} + +const notifsForInsert = () => { + return [] +} + +const deleteFn = async ( + db: DatabaseSchema, + uri: AtUri, +): Promise => { + const deleted = await db + .deleteFrom('thread_gate') + .where('uri', '=', uri.toString()) + .returningAll() + .executeTakeFirst() + return deleted || null +} + +const notifsForDelete = () => { + return { notifs: [], toDelete: [] } +} + +export type PluginType = RecordProcessor + +export const makePlugin = ( + db: PrimaryDatabase, + backgroundQueue: BackgroundQueue, + notifServer?: NotificationServer, +): PluginType => { + return new RecordProcessor(db, backgroundQueue, notifServer, { + lexId, + insertFn, + findDuplicate, + deleteFn, + notifsForInsert, + notifsForDelete, + }) +} + +export default makePlugin diff --git a/packages/bsky/tests/__snapshots__/indexing.test.ts.snap b/packages/bsky/tests/__snapshots__/indexing.test.ts.snap index 9abbe8a3f64..f7ccc4e688a 100644 --- a/packages/bsky/tests/__snapshots__/indexing.test.ts.snap +++ b/packages/bsky/tests/__snapshots__/indexing.test.ts.snap @@ -518,6 +518,9 @@ Object { "viewer": Object {}, }, "replies": Array [], + "viewer": Object { + "canReply": true, + }, }, } `; @@ -584,6 +587,9 @@ Object { "viewer": Object {}, }, "replies": Array [], + "viewer": Object { + "canReply": true, + }, }, } `; diff --git a/packages/bsky/tests/_util.ts b/packages/bsky/tests/_util.ts index 4f08af9e0f6..8d39a0f9c2c 100644 --- a/packages/bsky/tests/_util.ts +++ b/packages/bsky/tests/_util.ts @@ -180,6 +180,7 @@ export const stripViewerFromPost = (postUnknown: unknown): PostView => { // @NOTE mutates export const stripViewerFromThread = (thread: T): T => { if (!isThreadViewPost(thread)) return thread + delete thread.viewer thread.post = stripViewerFromPost(thread.post) if (isThreadViewPost(thread.parent)) { thread.parent = stripViewerFromThread(thread.parent) diff --git a/packages/bsky/tests/views/__snapshots__/block-lists.test.ts.snap b/packages/bsky/tests/views/__snapshots__/block-lists.test.ts.snap index fae6e7f4fa9..009095947c2 100644 --- a/packages/bsky/tests/views/__snapshots__/block-lists.test.ts.snap +++ b/packages/bsky/tests/views/__snapshots__/block-lists.test.ts.snap @@ -126,6 +126,9 @@ Object { "uri": "record(0)", "viewer": Object {}, }, + "viewer": Object { + "canReply": true, + }, }, } `; @@ -201,6 +204,9 @@ Object { "viewer": Object {}, }, "replies": Array [], + "viewer": Object { + "canReply": true, + }, }, } `; @@ -282,6 +288,9 @@ Object { "uri": "record(7)", }, ], + "viewer": Object { + "canReply": true, + }, }, } `; diff --git a/packages/bsky/tests/views/__snapshots__/blocks.test.ts.snap b/packages/bsky/tests/views/__snapshots__/blocks.test.ts.snap index 086f6e10d4d..5ee901c65d8 100644 --- a/packages/bsky/tests/views/__snapshots__/blocks.test.ts.snap +++ b/packages/bsky/tests/views/__snapshots__/blocks.test.ts.snap @@ -126,6 +126,9 @@ Object { "uri": "record(0)", "viewer": Object {}, }, + "viewer": Object { + "canReply": true, + }, }, } `; @@ -201,6 +204,9 @@ Object { "viewer": Object {}, }, "replies": Array [], + "viewer": Object { + "canReply": true, + }, }, } `; @@ -353,6 +359,9 @@ Object { }, }, ], + "viewer": Object { + "canReply": true, + }, }, } `; diff --git a/packages/bsky/tests/views/__snapshots__/mute-lists.test.ts.snap b/packages/bsky/tests/views/__snapshots__/mute-lists.test.ts.snap index 0a081f91292..2585a96ec42 100644 --- a/packages/bsky/tests/views/__snapshots__/mute-lists.test.ts.snap +++ b/packages/bsky/tests/views/__snapshots__/mute-lists.test.ts.snap @@ -292,6 +292,9 @@ Object { }, }, ], + "viewer": Object { + "canReply": true, + }, } `; diff --git a/packages/bsky/tests/views/__snapshots__/mutes.test.ts.snap b/packages/bsky/tests/views/__snapshots__/mutes.test.ts.snap index d6836a810a5..fb0eb1fc5d1 100644 --- a/packages/bsky/tests/views/__snapshots__/mutes.test.ts.snap +++ b/packages/bsky/tests/views/__snapshots__/mutes.test.ts.snap @@ -269,5 +269,8 @@ Object { }, }, ], + "viewer": Object { + "canReply": true, + }, } `; diff --git a/packages/bsky/tests/views/__snapshots__/thread.test.ts.snap b/packages/bsky/tests/views/__snapshots__/thread.test.ts.snap index 5886ed56019..4cdd3555805 100644 --- a/packages/bsky/tests/views/__snapshots__/thread.test.ts.snap +++ b/packages/bsky/tests/views/__snapshots__/thread.test.ts.snap @@ -195,6 +195,9 @@ Object { }, }, "replies": Array [], + "viewer": Object { + "canReply": true, + }, } `; @@ -434,6 +437,9 @@ Object { ], }, ], + "viewer": Object { + "canReply": true, + }, } `; @@ -609,6 +615,9 @@ Object { }, }, ], + "viewer": Object { + "canReply": true, + }, } `; @@ -762,6 +771,9 @@ Object { ], }, ], + "viewer": Object { + "canReply": true, + }, } `; @@ -814,6 +826,9 @@ Object { "viewer": Object {}, }, "replies": Array [], + "viewer": Object { + "canReply": true, + }, } `; @@ -881,6 +896,9 @@ Object { "viewer": Object {}, }, "replies": Array [], + "viewer": Object { + "canReply": true, + }, } `; @@ -950,6 +968,9 @@ Object { }, }, "replies": Array [], + "viewer": Object { + "canReply": true, + }, } `; @@ -1019,6 +1040,9 @@ Object { }, }, "replies": Array [], + "viewer": Object { + "canReply": true, + }, } `; @@ -1219,6 +1243,9 @@ Object { ], }, ], + "viewer": Object { + "canReply": true, + }, } `; @@ -1357,5 +1384,8 @@ Object { "replies": Array [], }, ], + "viewer": Object { + "canReply": true, + }, } `; diff --git a/packages/bsky/tests/views/__snapshots__/threadgating.test.ts.snap b/packages/bsky/tests/views/__snapshots__/threadgating.test.ts.snap new file mode 100644 index 00000000000..1545c19e9f4 --- /dev/null +++ b/packages/bsky/tests/views/__snapshots__/threadgating.test.ts.snap @@ -0,0 +1,164 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`views with thread gating applies gate after root post is deleted. 1`] = `undefined`; + +exports[`views with thread gating applies gate for empty rules. 1`] = ` +Object { + "cid": "cids(0)", + "lists": Array [], + "record": Object { + "$type": "app.bsky.feed.threadgate", + "allow": Array [], + "createdAt": "1970-01-01T00:00:00.000Z", + "post": "record(1)", + }, + "uri": "record(0)", +} +`; + +exports[`views with thread gating applies gate for following rule. 1`] = ` +Object { + "cid": "cids(0)", + "lists": Array [], + "record": Object { + "$type": "app.bsky.feed.threadgate", + "allow": Array [ + Object { + "$type": "app.bsky.feed.threadgate#followingRule", + }, + ], + "createdAt": "1970-01-01T00:00:00.000Z", + "post": "record(1)", + }, + "uri": "record(0)", +} +`; + +exports[`views with thread gating applies gate for list rule. 1`] = ` +Object { + "cid": "cids(0)", + "lists": Array [ + Object { + "cid": "cids(1)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "name": "list a", + "purpose": "app.bsky.graph.defs#modlist", + "uri": "record(2)", + "viewer": Object { + "muted": false, + }, + }, + Object { + "cid": "cids(2)", + "indexedAt": "1970-01-01T00:00:00.000Z", + "name": "list b", + "purpose": "app.bsky.graph.defs#modlist", + "uri": "record(3)", + "viewer": Object { + "muted": false, + }, + }, + ], + "record": Object { + "$type": "app.bsky.feed.threadgate", + "allow": Array [ + Object { + "$type": "app.bsky.feed.threadgate#listRule", + "list": "record(2)", + }, + Object { + "$type": "app.bsky.feed.threadgate#listRule", + "list": "record(3)", + }, + ], + "createdAt": "1970-01-01T00:00:00.000Z", + "post": "record(1)", + }, + "uri": "record(0)", +} +`; + +exports[`views with thread gating applies gate for mention rule. 1`] = ` +Object { + "cid": "cids(0)", + "lists": Array [], + "record": Object { + "$type": "app.bsky.feed.threadgate", + "allow": Array [ + Object { + "$type": "app.bsky.feed.threadgate#mentionRule", + }, + ], + "createdAt": "1970-01-01T00:00:00.000Z", + "post": "record(1)", + }, + "uri": "record(0)", +} +`; + +exports[`views with thread gating applies gate for missing rules, takes no action. 1`] = ` +Object { + "cid": "cids(0)", + "lists": Array [], + "record": Object { + "$type": "app.bsky.feed.threadgate", + "createdAt": "1970-01-01T00:00:00.000Z", + "post": "record(1)", + }, + "uri": "record(0)", +} +`; + +exports[`views with thread gating applies gate for multiple rules. 1`] = ` +Object { + "cid": "cids(0)", + "lists": Array [], + "record": Object { + "$type": "app.bsky.feed.threadgate", + "allow": Array [ + Object { + "$type": "app.bsky.feed.threadgate#mentionRule", + }, + Object { + "$type": "app.bsky.feed.threadgate#followingRule", + }, + ], + "createdAt": "1970-01-01T00:00:00.000Z", + "post": "record(1)", + }, + "uri": "record(0)", +} +`; + +exports[`views with thread gating applies gate for unknown list rule. 1`] = ` +Object { + "cid": "cids(0)", + "lists": Array [], + "record": Object { + "$type": "app.bsky.feed.threadgate", + "allow": Array [ + Object { + "$type": "app.bsky.feed.threadgate#listRule", + "list": "record(1)", + }, + ], + "createdAt": "1970-01-01T00:00:00.000Z", + "post": "record(1)", + }, + "uri": "record(0)", +} +`; + +exports[`views with thread gating does not apply gate to original poster. 1`] = ` +Object { + "cid": "cids(0)", + "lists": Array [], + "record": Object { + "$type": "app.bsky.feed.threadgate", + "allow": Array [], + "createdAt": "1970-01-01T00:00:00.000Z", + "post": "record(1)", + }, + "uri": "record(0)", +} +`; diff --git a/packages/bsky/tests/views/threadgating.test.ts b/packages/bsky/tests/views/threadgating.test.ts new file mode 100644 index 00000000000..7d29addfcf5 --- /dev/null +++ b/packages/bsky/tests/views/threadgating.test.ts @@ -0,0 +1,571 @@ +import assert from 'assert' +import AtpAgent from '@atproto/api' +import { TestNetwork } from '@atproto/dev-env' +import { + isNotFoundPost, + isThreadViewPost, +} from '../../src/lexicon/types/app/bsky/feed/defs' +import { SeedClient } from '../seeds/client' +import basicSeed from '../seeds/basic' +import { forSnapshot } from '../_util' + +describe('views with thread gating', () => { + let network: TestNetwork + let agent: AtpAgent + let pdsAgent: AtpAgent + let sc: SeedClient + + beforeAll(async () => { + network = await TestNetwork.create({ + dbPostgresSchema: 'bsky_views_thread_gating', + }) + agent = network.bsky.getClient() + pdsAgent = network.pds.getClient() + sc = new SeedClient(pdsAgent) + await basicSeed(sc) + await network.processAll() + }) + + afterAll(async () => { + await network.close() + }) + + it('applies gate for empty rules.', async () => { + const post = await sc.post(sc.dids.carol, 'empty rules') + await pdsAgent.api.app.bsky.feed.threadgate.create( + { repo: sc.dids.carol, rkey: post.ref.uri.rkey }, + { post: post.ref.uriStr, createdAt: iso(), allow: [] }, + sc.getHeaders(sc.dids.carol), + ) + await sc.reply(sc.dids.alice, post.ref, post.ref, 'empty rules reply') + await network.processAll() + const { + data: { thread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.alice) }, + ) + assert(isThreadViewPost(thread)) + expect(forSnapshot(thread.post.threadgate)).toMatchSnapshot() + expect(thread.viewer).toEqual({ canReply: false }) + expect(thread.replies?.length).toEqual(0) + }) + + it('applies gate for mention rule.', async () => { + const post = await sc.post( + sc.dids.carol, + 'mention rules @carol.test @dan.test', + [ + { + index: { byteStart: 14, byteEnd: 25 }, + features: [ + { $type: 'app.bsky.richtext.facet#mention', did: sc.dids.carol }, + ], + }, + { + index: { byteStart: 26, byteEnd: 35 }, + features: [ + { $type: 'app.bsky.richtext.facet#mention', did: sc.dids.dan }, + ], + }, + ], + ) + await pdsAgent.api.app.bsky.feed.threadgate.create( + { repo: sc.dids.carol, rkey: post.ref.uri.rkey }, + { + post: post.ref.uriStr, + createdAt: iso(), + allow: [{ $type: 'app.bsky.feed.threadgate#mentionRule' }], + }, + sc.getHeaders(sc.dids.carol), + ) + await sc.reply( + sc.dids.alice, + post.ref, + post.ref, + 'mention rule reply disallow', + ) + const danReply = await sc.reply( + sc.dids.dan, + post.ref, + post.ref, + 'mention rule reply allow', + ) + await network.processAll() + const { + data: { thread: aliceThread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.alice) }, + ) + assert(isThreadViewPost(aliceThread)) + expect(aliceThread.viewer).toEqual({ canReply: false }) + const { + data: { thread: danThread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.dan) }, + ) + assert(isThreadViewPost(danThread)) + expect(forSnapshot(danThread.post.threadgate)).toMatchSnapshot() + expect(danThread.viewer).toEqual({ canReply: true }) + const [reply, ...otherReplies] = danThread.replies ?? [] + assert(isThreadViewPost(reply)) + expect(otherReplies.length).toEqual(0) + expect(reply.post.uri).toEqual(danReply.ref.uriStr) + }) + + it('applies gate for following rule.', async () => { + const post = await sc.post(sc.dids.carol, 'following rule') + await pdsAgent.api.app.bsky.feed.threadgate.create( + { repo: sc.dids.carol, rkey: post.ref.uri.rkey }, + { + post: post.ref.uriStr, + createdAt: iso(), + allow: [{ $type: 'app.bsky.feed.threadgate#followingRule' }], + }, + sc.getHeaders(sc.dids.carol), + ) + // carol only follows alice + await sc.reply( + sc.dids.dan, + post.ref, + post.ref, + 'following rule reply disallow', + ) + const aliceReply = await sc.reply( + sc.dids.alice, + post.ref, + post.ref, + 'following rule reply allow', + ) + await network.processAll() + const { + data: { thread: danThread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.dan) }, + ) + assert(isThreadViewPost(danThread)) + expect(danThread.viewer).toEqual({ canReply: false }) + const { + data: { thread: aliceThread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.alice) }, + ) + assert(isThreadViewPost(aliceThread)) + expect(forSnapshot(aliceThread.post.threadgate)).toMatchSnapshot() + expect(aliceThread.viewer).toEqual({ canReply: true }) + const [reply, ...otherReplies] = aliceThread.replies ?? [] + assert(isThreadViewPost(reply)) + expect(otherReplies.length).toEqual(0) + expect(reply.post.uri).toEqual(aliceReply.ref.uriStr) + }) + + it('applies gate for list rule.', async () => { + const post = await sc.post(sc.dids.carol, 'following rule') + // setup lists to allow alice and dan + const listA = await pdsAgent.api.app.bsky.graph.list.create( + { repo: sc.dids.carol }, + { + name: 'list a', + purpose: 'app.bsky.graph.defs#modlist', + createdAt: iso(), + }, + sc.getHeaders(sc.dids.carol), + ) + await pdsAgent.api.app.bsky.graph.listitem.create( + { repo: sc.dids.carol }, + { + list: listA.uri, + subject: sc.dids.alice, + createdAt: iso(), + }, + sc.getHeaders(sc.dids.carol), + ) + const listB = await pdsAgent.api.app.bsky.graph.list.create( + { repo: sc.dids.carol }, + { + name: 'list b', + purpose: 'app.bsky.graph.defs#modlist', + createdAt: iso(), + }, + sc.getHeaders(sc.dids.carol), + ) + await pdsAgent.api.app.bsky.graph.listitem.create( + { repo: sc.dids.carol }, + { + list: listB.uri, + subject: sc.dids.dan, + createdAt: iso(), + }, + sc.getHeaders(sc.dids.carol), + ) + await pdsAgent.api.app.bsky.feed.threadgate.create( + { repo: sc.dids.carol, rkey: post.ref.uri.rkey }, + { + post: post.ref.uriStr, + createdAt: iso(), + allow: [ + { $type: 'app.bsky.feed.threadgate#listRule', list: listA.uri }, + { $type: 'app.bsky.feed.threadgate#listRule', list: listB.uri }, + ], + }, + sc.getHeaders(sc.dids.carol), + ) + // + await sc.reply(sc.dids.bob, post.ref, post.ref, 'list rule reply disallow') + const aliceReply = await sc.reply( + sc.dids.alice, + post.ref, + post.ref, + 'list rule reply allow (list a)', + ) + const danReply = await sc.reply( + sc.dids.dan, + post.ref, + post.ref, + 'list rule reply allow (list b)', + ) + await network.processAll() + const { + data: { thread: bobThread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.bob) }, + ) + assert(isThreadViewPost(bobThread)) + expect(bobThread.viewer).toEqual({ canReply: false }) + const { + data: { thread: aliceThread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.alice) }, + ) + assert(isThreadViewPost(aliceThread)) + expect(aliceThread.viewer).toEqual({ canReply: true }) + const { + data: { thread: danThread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.dan) }, + ) + assert(isThreadViewPost(danThread)) + expect(forSnapshot(danThread.post.threadgate)).toMatchSnapshot() + expect(danThread.viewer).toEqual({ canReply: true }) + const [reply1, reply2, ...otherReplies] = aliceThread.replies ?? [] + assert(isThreadViewPost(reply1)) + assert(isThreadViewPost(reply2)) + expect(otherReplies.length).toEqual(0) + expect(reply1.post.uri).toEqual(danReply.ref.uriStr) + expect(reply2.post.uri).toEqual(aliceReply.ref.uriStr) + }) + + it('applies gate for unknown list rule.', async () => { + const post = await sc.post(sc.dids.carol, 'unknown list rules') + await pdsAgent.api.app.bsky.feed.threadgate.create( + { repo: sc.dids.carol, rkey: post.ref.uri.rkey }, + { + post: post.ref.uriStr, + createdAt: iso(), + allow: [ + { + $type: 'app.bsky.feed.threadgate#listRule', + list: post.ref.uriStr, // bad list link, references a post + }, + ], + }, + sc.getHeaders(sc.dids.carol), + ) + await sc.reply( + sc.dids.alice, + post.ref, + post.ref, + 'unknown list rules reply', + ) + await network.processAll() + const { + data: { thread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.alice) }, + ) + assert(isThreadViewPost(thread)) + expect(forSnapshot(thread.post.threadgate)).toMatchSnapshot() + expect(thread.viewer).toEqual({ canReply: false }) + expect(thread.replies?.length).toEqual(0) + }) + + it('applies gate for multiple rules.', async () => { + const post = await sc.post(sc.dids.carol, 'multi rules @dan.test', [ + { + index: { byteStart: 12, byteEnd: 21 }, + features: [ + { $type: 'app.bsky.richtext.facet#mention', did: sc.dids.dan }, + ], + }, + ]) + await pdsAgent.api.app.bsky.feed.threadgate.create( + { repo: sc.dids.carol, rkey: post.ref.uri.rkey }, + { + post: post.ref.uriStr, + createdAt: iso(), + allow: [ + { $type: 'app.bsky.feed.threadgate#mentionRule' }, + { $type: 'app.bsky.feed.threadgate#followingRule' }, + ], + }, + sc.getHeaders(sc.dids.carol), + ) + // carol only follows alice, and the post mentions dan. + await sc.reply(sc.dids.bob, post.ref, post.ref, 'multi rule reply disallow') + const aliceReply = await sc.reply( + sc.dids.alice, + post.ref, + post.ref, + 'multi rule reply allow (following)', + ) + const danReply = await sc.reply( + sc.dids.dan, + post.ref, + post.ref, + 'multi rule reply allow (mention)', + ) + await network.processAll() + const { + data: { thread: bobThread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.bob) }, + ) + assert(isThreadViewPost(bobThread)) + expect(bobThread.viewer).toEqual({ canReply: false }) + const { + data: { thread: aliceThread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.alice) }, + ) + assert(isThreadViewPost(aliceThread)) + expect(aliceThread.viewer).toEqual({ canReply: true }) + const { + data: { thread: danThread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.dan) }, + ) + assert(isThreadViewPost(danThread)) + expect(forSnapshot(danThread.post.threadgate)).toMatchSnapshot() + expect(danThread.viewer).toEqual({ canReply: true }) + const [reply1, reply2, ...otherReplies] = aliceThread.replies ?? [] + assert(isThreadViewPost(reply1)) + assert(isThreadViewPost(reply2)) + expect(otherReplies.length).toEqual(0) + expect(reply1.post.uri).toEqual(danReply.ref.uriStr) + expect(reply2.post.uri).toEqual(aliceReply.ref.uriStr) + }) + + it('applies gate for missing rules, takes no action.', async () => { + const post = await sc.post(sc.dids.carol, 'missing rules') + await pdsAgent.api.app.bsky.feed.threadgate.create( + { repo: sc.dids.carol, rkey: post.ref.uri.rkey }, + { post: post.ref.uriStr, createdAt: iso() }, + sc.getHeaders(sc.dids.carol), + ) + const aliceReply = await sc.reply( + sc.dids.alice, + post.ref, + post.ref, + 'missing rules reply', + ) + await network.processAll() + const { + data: { thread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.alice) }, + ) + assert(isThreadViewPost(thread)) + expect(forSnapshot(thread.post.threadgate)).toMatchSnapshot() + expect(thread.viewer).toEqual({ canReply: true }) + const [reply, ...otherReplies] = thread.replies ?? [] + assert(isThreadViewPost(reply)) + expect(otherReplies.length).toEqual(0) + expect(reply.post.uri).toEqual(aliceReply.ref.uriStr) + }) + + it('applies gate after root post is deleted.', async () => { + // @NOTE also covers rule application more than one level deep + const post = await sc.post(sc.dids.carol, 'following rule w/ post deletion') + await pdsAgent.api.app.bsky.feed.threadgate.create( + { repo: sc.dids.carol, rkey: post.ref.uri.rkey }, + { + post: post.ref.uriStr, + createdAt: iso(), + allow: [{ $type: 'app.bsky.feed.threadgate#followingRule' }], + }, + sc.getHeaders(sc.dids.carol), + ) + // carol only follows alice + const orphanedReply = await sc.reply( + sc.dids.alice, + post.ref, + post.ref, + 'following rule reply allow', + ) + await pdsAgent.api.app.bsky.feed.post.delete( + { repo: sc.dids.carol, rkey: post.ref.uri.rkey }, + sc.getHeaders(sc.dids.carol), + ) + await network.processAll() + await sc.reply( + sc.dids.dan, + post.ref, + orphanedReply.ref, + 'following rule reply disallow', + ) + const aliceReply = await sc.reply( + sc.dids.alice, + post.ref, + orphanedReply.ref, + 'following rule reply allow', + ) + await network.processAll() + const { + data: { thread: danThread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: orphanedReply.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.dan) }, + ) + assert(isThreadViewPost(danThread)) + expect(danThread.viewer).toEqual({ canReply: false }) + const { + data: { thread: aliceThread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: orphanedReply.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.alice) }, + ) + assert(isThreadViewPost(aliceThread)) + assert( + isNotFoundPost(aliceThread.parent) && + aliceThread.parent.uri === post.ref.uriStr, + ) + expect(aliceThread.post.threadgate).toMatchSnapshot() + expect(aliceThread.viewer).toEqual({ canReply: true }) + const [reply, ...otherReplies] = aliceThread.replies ?? [] + assert(isThreadViewPost(reply)) + expect(otherReplies.length).toEqual(0) + expect(reply.post.uri).toEqual(aliceReply.ref.uriStr) + }) + + it('does not apply gate to original poster.', async () => { + const post = await sc.post(sc.dids.carol, 'empty rules') + await pdsAgent.api.app.bsky.feed.threadgate.create( + { repo: sc.dids.carol, rkey: post.ref.uri.rkey }, + { post: post.ref.uriStr, createdAt: iso(), allow: [] }, + sc.getHeaders(sc.dids.carol), + ) + const selfReply = await sc.reply( + sc.dids.carol, + post.ref, + post.ref, + 'empty rules reply allow', + ) + await network.processAll() + const { + data: { thread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: post.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.carol) }, + ) + assert(isThreadViewPost(thread)) + expect(forSnapshot(thread.post.threadgate)).toMatchSnapshot() + expect(thread.viewer).toEqual({ canReply: true }) + const [reply, ...otherReplies] = thread.replies ?? [] + assert(isThreadViewPost(reply)) + expect(otherReplies.length).toEqual(0) + expect(reply.post.uri).toEqual(selfReply.ref.uriStr) + }) + + it('displays gated posts in feed and thread anchor without reply context.', async () => { + const post = await sc.post(sc.dids.carol, 'following rule') + await pdsAgent.api.app.bsky.feed.threadgate.create( + { repo: sc.dids.carol, rkey: post.ref.uri.rkey }, + { + post: post.ref.uriStr, + createdAt: iso(), + allow: [{ $type: 'app.bsky.feed.threadgate#followingRule' }], + }, + sc.getHeaders(sc.dids.carol), + ) + // carol only follows alice + const badReply = await sc.reply( + sc.dids.dan, + post.ref, + post.ref, + 'following rule reply disallow', + ) + // going to ensure this one doesn't appear in badReply's thread + await sc.reply(sc.dids.alice, post.ref, badReply.ref, 'reply to disallowed') + await network.processAll() + // check thread view + const { + data: { thread }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: badReply.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.alice) }, + ) + assert(isThreadViewPost(thread)) + expect(thread.viewer).toEqual({ canReply: false }) // nobody can reply to this, not even alice. + expect(thread.replies).toBeUndefined() + expect(thread.parent).toBeUndefined() + expect(thread.post.threadgate).toBeUndefined() + // check feed view + const { + data: { feed }, + } = await agent.api.app.bsky.feed.getAuthorFeed( + { actor: sc.dids.dan }, + { headers: await network.serviceHeaders(sc.dids.alice) }, + ) + const [feedItem] = feed + expect(feedItem.post.uri).toEqual(badReply.ref.uriStr) + expect(feedItem.post.threadgate).toBeUndefined() + expect(feedItem.reply).toBeUndefined() + }) + + it('does not apply gate unless it matches post rkey.', async () => { + const postA = await sc.post(sc.dids.carol, 'ungated a') + const postB = await sc.post(sc.dids.carol, 'ungated b') + await pdsAgent.api.app.bsky.feed.threadgate.create( + { repo: sc.dids.carol, rkey: postA.ref.uri.rkey }, + { post: postB.ref.uriStr, createdAt: iso(), allow: [] }, + sc.getHeaders(sc.dids.carol), + ) + await sc.reply(sc.dids.alice, postA.ref, postA.ref, 'ungated reply') + await sc.reply(sc.dids.alice, postB.ref, postB.ref, 'ungated reply') + await network.processAll() + const { + data: { thread: threadA }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: postA.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.alice) }, + ) + assert(isThreadViewPost(threadA)) + expect(threadA.post.threadgate).toBeUndefined() + expect(threadA.viewer).toEqual({ canReply: true }) + expect(threadA.replies?.length).toEqual(1) + const { + data: { thread: threadB }, + } = await agent.api.app.bsky.feed.getPostThread( + { uri: postB.ref.uriStr }, + { headers: await network.serviceHeaders(sc.dids.alice) }, + ) + assert(isThreadViewPost(threadB)) + expect(threadB.post.threadgate).toBeUndefined() + expect(threadB.viewer).toEqual({ canReply: true }) + expect(threadB.replies?.length).toEqual(1) + }) +}) + +const iso = (date = new Date()) => date.toISOString() diff --git a/packages/pds/src/lexicon/lexicons.ts b/packages/pds/src/lexicon/lexicons.ts index f1fd2519d74..2ca983aec4b 100644 --- a/packages/pds/src/lexicon/lexicons.ts +++ b/packages/pds/src/lexicon/lexicons.ts @@ -4390,6 +4390,10 @@ export const schemaDict = { ref: 'lex:com.atproto.label.defs#label', }, }, + threadgate: { + type: 'ref', + ref: 'lex:app.bsky.feed.defs#threadgateView', + }, }, }, viewerState: { @@ -4486,6 +4490,10 @@ export const schemaDict = { ], }, }, + viewer: { + type: 'ref', + ref: 'lex:app.bsky.feed.defs#viewerThreadState', + }, }, }, notFoundPost: { @@ -4534,6 +4542,14 @@ export const schemaDict = { }, }, }, + viewerThreadState: { + type: 'object', + properties: { + canReply: { + type: 'boolean', + }, + }, + }, generatorView: { type: 'object', required: ['uri', 'cid', 'did', 'creator', 'displayName', 'indexedAt'], @@ -4619,6 +4635,29 @@ export const schemaDict = { }, }, }, + threadgateView: { + type: 'object', + properties: { + uri: { + type: 'string', + format: 'at-uri', + }, + cid: { + type: 'string', + format: 'cid', + }, + record: { + type: 'unknown', + }, + lists: { + type: 'array', + items: { + type: 'ref', + ref: 'lex:app.bsky.graph.defs#listViewBasic', + }, + }, + }, + }, }, }, AppBskyFeedDescribeFeedGenerator: { @@ -5615,6 +5654,63 @@ export const schemaDict = { }, }, }, + AppBskyFeedThreadgate: { + lexicon: 1, + id: 'app.bsky.feed.threadgate', + defs: { + main: { + type: 'record', + key: 'tid', + description: + "Defines interaction gating rules for a thread. The rkey of the threadgate record should match the rkey of the thread's root post.", + record: { + type: 'object', + required: ['post', 'createdAt'], + properties: { + post: { + type: 'string', + format: 'at-uri', + }, + allow: { + type: 'array', + maxLength: 5, + items: { + type: 'union', + refs: [ + 'lex:app.bsky.feed.threadgate#mentionRule', + 'lex:app.bsky.feed.threadgate#followingRule', + 'lex:app.bsky.feed.threadgate#listRule', + ], + }, + }, + createdAt: { + type: 'string', + format: 'datetime', + }, + }, + }, + }, + mentionRule: { + type: 'object', + description: 'Allow replies from actors mentioned in your post.', + }, + followingRule: { + type: 'object', + description: 'Allow replies from actors you follow.', + }, + listRule: { + type: 'object', + description: 'Allow replies from actors on a list.', + required: ['list'], + properties: { + list: { + type: 'string', + format: 'at-uri', + }, + }, + }, + }, + }, AppBskyGraphBlock: { lexicon: 1, id: 'app.bsky.graph.block', @@ -6932,6 +7028,7 @@ export const ids = { AppBskyFeedLike: 'app.bsky.feed.like', AppBskyFeedPost: 'app.bsky.feed.post', AppBskyFeedRepost: 'app.bsky.feed.repost', + AppBskyFeedThreadgate: 'app.bsky.feed.threadgate', AppBskyGraphBlock: 'app.bsky.graph.block', AppBskyGraphDefs: 'app.bsky.graph.defs', AppBskyGraphFollow: 'app.bsky.graph.follow', diff --git a/packages/pds/src/lexicon/types/app/bsky/feed/defs.ts b/packages/pds/src/lexicon/types/app/bsky/feed/defs.ts index 463445fbd49..08d34d88ebb 100644 --- a/packages/pds/src/lexicon/types/app/bsky/feed/defs.ts +++ b/packages/pds/src/lexicon/types/app/bsky/feed/defs.ts @@ -12,6 +12,7 @@ import * as AppBskyEmbedRecord from '../embed/record' import * as AppBskyEmbedRecordWithMedia from '../embed/recordWithMedia' import * as ComAtprotoLabelDefs from '../../../com/atproto/label/defs' import * as AppBskyRichtextFacet from '../richtext/facet' +import * as AppBskyGraphDefs from '../graph/defs' export interface PostView { uri: string @@ -30,6 +31,7 @@ export interface PostView { indexedAt: string viewer?: ViewerState labels?: ComAtprotoLabelDefs.Label[] + threadgate?: ThreadgateView [k: string]: unknown } @@ -135,6 +137,7 @@ export interface ThreadViewPost { | BlockedPost | { $type: string; [k: string]: unknown } )[] + viewer?: ViewerThreadState [k: string]: unknown } @@ -205,6 +208,23 @@ export function validateBlockedAuthor(v: unknown): ValidationResult { return lexicons.validate('app.bsky.feed.defs#blockedAuthor', v) } +export interface ViewerThreadState { + canReply?: boolean + [k: string]: unknown +} + +export function isViewerThreadState(v: unknown): v is ViewerThreadState { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.defs#viewerThreadState' + ) +} + +export function validateViewerThreadState(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.defs#viewerThreadState', v) +} + export interface GeneratorView { uri: string cid: string @@ -283,3 +303,23 @@ export function isSkeletonReasonRepost(v: unknown): v is SkeletonReasonRepost { export function validateSkeletonReasonRepost(v: unknown): ValidationResult { return lexicons.validate('app.bsky.feed.defs#skeletonReasonRepost', v) } + +export interface ThreadgateView { + uri?: string + cid?: string + record?: {} + lists?: AppBskyGraphDefs.ListViewBasic[] + [k: string]: unknown +} + +export function isThreadgateView(v: unknown): v is ThreadgateView { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.defs#threadgateView' + ) +} + +export function validateThreadgateView(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.defs#threadgateView', v) +} diff --git a/packages/pds/src/lexicon/types/app/bsky/feed/threadgate.ts b/packages/pds/src/lexicon/types/app/bsky/feed/threadgate.ts new file mode 100644 index 00000000000..51f9f8e9af1 --- /dev/null +++ b/packages/pds/src/lexicon/types/app/bsky/feed/threadgate.ts @@ -0,0 +1,80 @@ +/** + * GENERATED CODE - DO NOT MODIFY + */ +import { ValidationResult, BlobRef } from '@atproto/lexicon' +import { lexicons } from '../../../../lexicons' +import { isObj, hasProp } from '../../../../util' +import { CID } from 'multiformats/cid' + +export interface Record { + post: string + allow?: ( + | MentionRule + | FollowingRule + | ListRule + | { $type: string; [k: string]: unknown } + )[] + createdAt: string + [k: string]: unknown +} + +export function isRecord(v: unknown): v is Record { + return ( + isObj(v) && + hasProp(v, '$type') && + (v.$type === 'app.bsky.feed.threadgate#main' || + v.$type === 'app.bsky.feed.threadgate') + ) +} + +export function validateRecord(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.threadgate#main', v) +} + +/** Allow replies from actors mentioned in your post. */ +export interface MentionRule {} + +export function isMentionRule(v: unknown): v is MentionRule { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.threadgate#mentionRule' + ) +} + +export function validateMentionRule(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.threadgate#mentionRule', v) +} + +/** Allow replies from actors you follow. */ +export interface FollowingRule {} + +export function isFollowingRule(v: unknown): v is FollowingRule { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.threadgate#followingRule' + ) +} + +export function validateFollowingRule(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.threadgate#followingRule', v) +} + +/** Allow replies from actors on a list. */ +export interface ListRule { + list: string + [k: string]: unknown +} + +export function isListRule(v: unknown): v is ListRule { + return ( + isObj(v) && + hasProp(v, '$type') && + v.$type === 'app.bsky.feed.threadgate#listRule' + ) +} + +export function validateListRule(v: unknown): ValidationResult { + return lexicons.validate('app.bsky.feed.threadgate#listRule', v) +} diff --git a/packages/pds/src/repo/prepare.ts b/packages/pds/src/repo/prepare.ts index 60fbe2d81cd..581701f1f01 100644 --- a/packages/pds/src/repo/prepare.ts +++ b/packages/pds/src/repo/prepare.ts @@ -1,6 +1,6 @@ import { CID } from 'multiformats/cid' import { AtUri } from '@atproto/syntax' -import { TID, dataToCborBlock } from '@atproto/common' +import { MINUTE, TID, dataToCborBlock } from '@atproto/common' import { LexiconDefNotFoundError, RepoRecord, @@ -155,14 +155,20 @@ export const prepareCreate = async (opts: { if (validate) { assertValidRecord(record) } - if (collection === lex.ids.AppBskyFeedPost && opts.rkey) { - // @TODO temporary + + const nextRkey = TID.next() + if ( + collection === lex.ids.AppBskyFeedPost && + opts.rkey && + !rkeyIsInWindow(nextRkey, new TID(opts.rkey)) + ) { + // @TODO temporary. allowing a window supports creation of post and gate records at the same time. throw new InvalidRequestError( - 'Custom rkeys for post records are not currently supported.', + 'Custom rkeys for post records should be near the present.', ) } - const rkey = opts.rkey || TID.nextStr() + const rkey = opts.rkey || nextRkey.toString() assertNoExplicitSlurs(rkey, record) return { action: WriteOpAction.Create, @@ -298,3 +304,9 @@ function assertNoExplicitSlurs(rkey: string, record: RepoRecord) { throw new InvalidRecordError('Unacceptable slur in record') } } + +// ensures two rkeys are not far apart +function rkeyIsInWindow(rkey1: TID, rkey2: TID) { + const ms = Math.abs(rkey1.timestamp() - rkey2.timestamp()) / 1000 + return ms < 10 * MINUTE +}