diff --git a/packages/core/src/infrastructure/bull/bull.types.ts b/packages/core/src/infrastructure/bull/bull.types.ts index 763d4c7d..ceac0bec 100644 --- a/packages/core/src/infrastructure/bull/bull.types.ts +++ b/packages/core/src/infrastructure/bull/bull.types.ts @@ -486,6 +486,22 @@ export const SlackBullJob = z.discriminatedUnion('name', [ id: true, }), }), + z.object({ + name: z.literal('slack.profile_picture.changed'), + data: z.object({ + profilePicture: Student.shape.profilePicture, + slackId: Student.shape.slackId.unwrap(), + }), + }), + z.object({ + name: z.literal('slack.question.answer.private'), + data: z.object({ + channelId: z.string().trim().min(1), + question: z.string().trim().min(1), + threadId: z.string().trim().min(1), + userId: z.string().trim().min(1), + }), + }), z.object({ name: z.literal('slack.reaction.add'), data: SlackReaction.pick({ @@ -504,13 +520,6 @@ export const SlackBullJob = z.discriminatedUnion('name', [ userId: true, }), }), - z.object({ - name: z.literal('slack.profile_picture.changed'), - data: z.object({ - profilePicture: Student.shape.profilePicture, - slackId: Student.shape.slackId.unwrap(), - }), - }), z.object({ name: z.literal('slack.thread.sync_embedding'), data: z.object({ diff --git a/packages/core/src/modules/mixpanel.ts b/packages/core/src/modules/mixpanel.ts index dd615af6..953e86a6 100644 --- a/packages/core/src/modules/mixpanel.ts +++ b/packages/core/src/modules/mixpanel.ts @@ -53,6 +53,12 @@ export type MixpanelEvent = { | 'Resources'; }; + 'Public Question Answered': { + '# of Threads Found': number; + Question: string; + Where: 'DM'; + }; + 'Resource Added': undefined; 'Resource Link Copied': undefined; 'Resource Tag Added': undefined; @@ -66,7 +72,7 @@ export type TrackInput = { event: Event; properties: MixpanelEvent[Event]; request?: Request; - user: string; + user?: string; }; export function track({ @@ -83,8 +89,8 @@ export function track({ if (!request) { mixpanel.track(event, { ...properties, + ...(user && { distinct_id: user }), Application: application, - distinct_id: user, }); return; @@ -99,6 +105,7 @@ export function track({ mixpanel.track(event, { ...properties, + ...(user && { distinct_id: user }), Application: application, $browser: result.browser.name, $browser_version: result.browser.version, @@ -106,7 +113,6 @@ export function track({ $referrer: referrer, $os: result.os.name, $os_version: result.os.version, - distinct_id: user, ip, }); } diff --git a/packages/core/src/modules/notification/use-cases/send-slack-notification.ts b/packages/core/src/modules/notification/use-cases/send-slack-notification.ts index 4fc20f4f..e3a93fce 100644 --- a/packages/core/src/modules/notification/use-cases/send-slack-notification.ts +++ b/packages/core/src/modules/notification/use-cases/send-slack-notification.ts @@ -27,9 +27,11 @@ export async function sendSlackNotification(input: SendNotificationInput) { const channel = input.channel || ENV.INTERNAL_SLACK_NOTIFICATIONS_CHANNEL_ID; - await client.chat.postMessage({ + const { ts } = await client.chat.postMessage({ channel, text: input.message, thread_ts: input.threadId, }); + + return ts; } diff --git a/packages/core/src/modules/slack/slack.ts b/packages/core/src/modules/slack/slack.ts index ddb770b4..2b3b4f14 100644 --- a/packages/core/src/modules/slack/slack.ts +++ b/packages/core/src/modules/slack/slack.ts @@ -11,9 +11,16 @@ import { rerankDocuments, } from '@/modules/ai/ai'; import { track } from '@/modules/mixpanel'; +import { sendSlackNotification } from '@/modules/notification/use-cases/send-slack-notification'; import { getPineconeIndex } from '@/modules/pinecone'; import { fail, type Result, success } from '@/shared/utils/core.utils'; +// Constants + +const BLANK_LINE = '\n\n'; + +// Core + type AnswerChatbotQuestionInput = { /** * The ID of the channel where the message was sent. This should be the @@ -102,17 +109,26 @@ export async function answerChatbotQuestion({ workspace: 'regular', }); - const answerResult = await getAnswerFromSlackHistory(text); + const threadsResult = await getMostRelevantThreads(text, { + threshold: 0.5, + topK: 5, + }); + + if (!threadsResult.ok) { + return threadsResult; + } + + const threads = threadsResult.data; + + const answerResult = await getAnswerFromSlackHistory(text, threads); if (!answerResult.ok) { throw new Error(answerResult.error); } - const answerWithReferences = addThreadReferences(answerResult.data); - job('notification.slack.send', { channel: channelId, - message: answerWithReferences, + message: answerResult.data, threadId: id, workspace: 'regular', }); @@ -120,6 +136,121 @@ export async function answerChatbotQuestion({ // TODO: Delete the loading message after the answer is sent. } +type AnswerPublicQuestionInPrivateInput = { + /** + * The ID of the channel where the question was asked (ie: public channel). + */ + channelId: string; + + /** + * The text of the question that was asked. + */ + question: string; + + /** + * The ID of the message in which the question was asked. This should be a + * top-level message (ie: start of a thread). + */ + threadId: string; + + /** + * The ID of the Slack user who asked the question. + */ + userId: string; +}; + +/** + * Answers a question asked in a public Slack message in a private DM to the + * user who asked the question. + * + * This uses the underlying `getAnswerFromSlackHistory` function to answer + * the question, so this is a full RAG implementation. + * + * @param input - The message (public question) to answer. + * @returns The result of the answer. + */ +export async function answerPublicQuestionInPrivate({ + channelId, + question, + threadId, + userId, +}: AnswerPublicQuestionInPrivateInput) { + const questionResult = await isQuestion(question); + + if (!questionResult.ok) { + return questionResult; + } + + // If the question is not actually a question, then we can't answer it and + // we should gracefully exit. + if (!questionResult.data) { + return success({}); + } + + const threadsResult = await getMostRelevantThreads(question, { + exclude: [threadId], // Don't include the thread where question was asked. + threshold: 0.95, // High threshold for high confidence. + topK: 5, + }); + + if (!threadsResult.ok) { + return threadsResult; + } + + const threads = threadsResult.data; + + // If we can't find any relevant threads, then we should gracefully exit + // instead of asking the LLM to answer the question. + if (!threads.length) { + return success({}); + } + + const answerResult = await getAnswerFromSlackHistory(question, threads); + + if (!answerResult.ok) { + return answerResult; + } + + const message = [ + `I saw your question in <#${channelId}>:`, + `>${question}`, + "I'll respond in this thread shortly! 🧵", + ].join(BLANK_LINE); + + // We're doing this synchronously so that we can get the ID of the message + // that was just sent, which is needed in order to "reply" to the thread. + const notificationTs = await sendSlackNotification({ + channel: userId, // Sending a DM, not responding in public thread. + message, + workspace: 'regular', + }); + + job( + 'notification.slack.send', + { + channel: userId, + message: answerResult.data, + threadId: notificationTs, + workspace: 'regular', + }, + { + delay: 1000 * 2, // Give the impression that we're "thinking"... + } + ); + + track({ + application: 'Slack', + event: 'Public Question Answered', + properties: { + '# of Threads Found': threads.length, + Question: question, + Where: 'DM', + }, + }); + + return success({}); +} + type AnswerPublicQuestionInput = { /** * The ID of the channel where the message is located. This is typically @@ -232,6 +363,7 @@ export async function answerPublicQuestion({ const threadsResult = await getMostRelevantThreads(text, { exclude: [threadId], + threshold: 0.98, topK: 5, }); @@ -246,19 +378,15 @@ export async function answerPublicQuestion({ return threadsResult; } - const threads = threadsResult.data - .filter((thread) => { - return thread.score >= 0.98; - }) - .map((thread, i) => { - const date = dayjs(thread.createdAt) - .tz('America/Los_Angeles') - .format("MMM. 'YY"); + const threads = threadsResult.data.map((thread, i) => { + const date = dayjs(thread.createdAt) + .tz('America/Los_Angeles') + .format("MMM. 'YY"); - const uri = `https://colorstack-family.slack.com/archives/${thread.channelId}/p${thread.id}`; + const uri = `https://colorstack-family.slack.com/archives/${thread.channelId}/p${thread.id}`; - return `• <${uri}|*Thread #${i + 1}*> [${date}]`; - }); + return `• <${uri}|*Thread #${i + 1}*> [${date}]`; + }); if (!threads.length) { job('notification.slack.ephemeral.send', { @@ -298,23 +426,6 @@ export async function answerPublicQuestion({ return success({}); } -/** - * Removes all references and replace them with an actual - * Slack message link and the display text (ie: `[1]`, `[2]`, etc). - * - * - * @param text - The text to add thread references to. - * @returns The text with thread references added. - * - * @todo Replace the Slack workspace URL with an environment variable. - */ -function addThreadReferences(text: string): string { - return text.replace( - /(.*?):(.*?):(.*?)<\/thread>/g, - `` - ); -} - /** * Determines if the given text is a question. * @@ -360,20 +471,14 @@ async function isQuestion(question: string): Promise> { * to an LLM with additional instructions for answering. * * @param question - The question to ask. + * @param threads - The most relevant threads to the question. * @returns The answer to the question. */ async function getAnswerFromSlackHistory( - question: string + question: string, + threads: RelevantThread[] ): Promise> { - const threadsResult = await getMostRelevantThreads(question, { - topK: 5, - }); - - if (!threadsResult.ok) { - return threadsResult; - } - - const threads = threadsResult.data.map((thread) => { + const formattedThreads = threads.map((thread) => { const parts = [ '[Relevance Score]: ' + thread.score, '[Timestamp]: ' + thread.createdAt, @@ -389,7 +494,7 @@ async function getAnswerFromSlackHistory( const userPrompt = [ 'Please answer the following question based on the Slack context provided:', `${question}`, - `${threads.join('\n\n')}`, + `${formattedThreads.join('\n\n')}`, ].join('\n'); const systemPrompt = dedent` @@ -481,7 +586,27 @@ async function getAnswerFromSlackHistory( return fail(completionResult); } - return success(completionResult.data); + const answer = completionResult.data; + const answerWithReferences = addThreadReferences(answer); + + return success(answerWithReferences); +} + +/** + * Removes all references and replace them with an actual + * Slack message link and the display text (ie: `[1]`, `[2]`, etc). + * + * + * @param text - The text to add thread references to. + * @returns The text with thread references added. + * + * @todo Replace the Slack workspace URL with an environment variable. + */ +function addThreadReferences(text: string): string { + return text.replace( + /(.*?):(.*?):(.*?)<\/thread>/g, + `` + ); } type GetMostRelevantThreadsOptions = { @@ -493,6 +618,18 @@ type GetMostRelevantThreadsOptions = { */ exclude?: string[]; + /** + * The minimum relevance score to include in the results. This is useful if + * we want to filter out threads that are too low of a relevance score. + * + * Must be between 0 and 1. + * + * @example 0.5 + * @example 0.95 + * @example 0.98 + */ + threshold?: number; + /** * The maximum number of threads to return. Note that this refers to the final * number of threads AFTER reranking, not the initial vector database @@ -602,12 +739,16 @@ async function getMostRelevantThreads( return rerankingResult; } - const threads = rerankingResult.data.map((document) => { - return { - ...messages[document.index], - score: document.relevance_score, - }; - }); + const threads = rerankingResult.data + .map((document) => { + return { + ...messages[document.index], + score: document.relevance_score, + }; + }) + .filter((document) => { + return options.threshold ? document.score >= options.threshold : true; + }); return success(threads); } diff --git a/packages/core/src/modules/slack/slack.worker.ts b/packages/core/src/modules/slack/slack.worker.ts index 25b86228..b651704c 100644 --- a/packages/core/src/modules/slack/slack.worker.ts +++ b/packages/core/src/modules/slack/slack.worker.ts @@ -6,6 +6,7 @@ import { onSlackUserInvited } from '@/modules/slack/events/slack-user-invited'; import { answerChatbotQuestion, answerPublicQuestion, + answerPublicQuestionInPrivate, syncThreadToPinecone, } from '@/modules/slack/slack'; import { updateBirthdatesFromSlack } from '@/modules/slack/use-cases/update-birthdates-from-slack'; @@ -83,6 +84,15 @@ export const slackWorker = registerWorker( .with({ name: 'slack.profile_picture.changed' }, async ({ data }) => { return onSlackProfilePictureChanged(data); }) + .with({ name: 'slack.question.answer.private' }, async ({ data }) => { + const result = await answerPublicQuestionInPrivate(data); + + if (!result.ok) { + throw new Error(result.error); + } + + return result.data; + }) .with({ name: 'slack.reaction.add' }, async ({ data }) => { return addSlackReaction(data); }) diff --git a/packages/core/src/modules/slack/use-cases/add-slack-message.ts b/packages/core/src/modules/slack/use-cases/add-slack-message.ts index 3b0918a3..852d8b0e 100644 --- a/packages/core/src/modules/slack/use-cases/add-slack-message.ts +++ b/packages/core/src/modules/slack/use-cases/add-slack-message.ts @@ -2,6 +2,7 @@ import { db } from '@oyster/db'; import { type GetBullJobData } from '@/infrastructure/bull/bull.types'; import { job } from '@/infrastructure/bull/use-cases/job'; +import { redis } from '@/infrastructure/redis'; import { ErrorWithContext } from '@/shared/errors'; import { retryWithBackoff } from '@/shared/utils/core.utils'; import { getSlackMessage } from '../services/slack-message.service'; @@ -50,6 +51,23 @@ export async function addSlackMessage( action: 'add', threadId: data.threadId || data.id, }); + + // We track channels that are "auto-reply" channels in Redis. If a message is + // sent to one of those channels, we should attempt to answer the question + // using AI in private (DM). + const isAutoReplyChannel = await redis.sismember( + 'slack:auto_reply_channels', + data.channelId + ); + + if (isAutoReplyChannel && !data.threadId) { + job('slack.question.answer.private', { + channelId: data.channelId, + question: data.text as string, + threadId: data.id, + userId: data.userId, + }); + } } async function ensureThreadExistsIfNecessary(