From 4098d9890173f4d6c6512f2d8994eebbf12b5e13 Mon Sep 17 00:00:00 2001
From: Matthieu Sieben
Date: Tue, 1 Oct 2024 10:43:15 +0200
Subject: [PATCH] Default to unencoded responses (#2834)

* Allow defaulting to unencoded responses when proxying client requests that
  do not specify accept-encoding

* fix content-encoding negotiation
---
 .changeset/dirty-foxes-bow.md | 5 +
 .changeset/happy-islands-wash.md | 5 +
 packages/common-web/src/util.ts | 35 ++-
 packages/common/src/streams.ts | 25 +-
 packages/pds/src/config/config.ts | 10 +
 packages/pds/src/config/env.ts | 2 +
 packages/pds/src/pipethrough.ts | 223 ++++++++++--------
 .../tests/proxied/read-after-write.test.ts | 77 ++++++
 8 files changed, 274 insertions(+), 108 deletions(-)
 create mode 100644 .changeset/dirty-foxes-bow.md
 create mode 100644 .changeset/happy-islands-wash.md

diff --git a/.changeset/dirty-foxes-bow.md b/.changeset/dirty-foxes-bow.md
new file mode 100644
index 00000000000..9c4899b1143
--- /dev/null
+++ b/.changeset/dirty-foxes-bow.md
@@ -0,0 +1,5 @@
+---
+"@atproto/pds": patch
+---
+
+Allow defaulting to unencoded responses when proxying client requests that do not specify accept-encoding

diff --git a/.changeset/happy-islands-wash.md b/.changeset/happy-islands-wash.md
new file mode 100644
index 00000000000..88b68b345b4
--- /dev/null
+++ b/.changeset/happy-islands-wash.md
@@ -0,0 +1,5 @@
+---
+"@atproto/common": patch
+---
+
+Allow contentEncoding to be an array for consistency with typing of headers

diff --git a/packages/common-web/src/util.ts b/packages/common-web/src/util.ts
index 2887896d634..3082b045b78 100644
--- a/packages/common-web/src/util.ts
+++ b/packages/common-web/src/util.ts
@@ -9,19 +9,34 @@ export const noUndefinedVals = <V>(
   return obj as Record<string, V>
 }
 
+/**
+ * Returns a shallow copy of the object without the specified keys. If the input
+ * is nullish, it returns the input.
+ */
 export function omit<
-  T extends undefined | Record<string, unknown>,
+  T extends undefined | null | Record<string, unknown>,
   K extends keyof NonNullable<T>,
->(obj: T, keys: readonly K[]): T extends undefined ? undefined : Omit<T, K>
+>(
+  object: T,
+  rejectedKeys: readonly K[],
+): T extends undefined ? undefined : T extends null ? null : Omit<NonNullable<T>, K>
 export function omit(
-  obj: Record<string, unknown>,
-  keys: readonly string[],
-): undefined | Record<string, unknown> {
-  if (!obj) return obj
-
-  return Object.fromEntries(
-    Object.entries(obj).filter((entry) => !keys.includes(entry[0])),
-  )
+  src: undefined | null | Record<string, unknown>,
+  rejectedKeys: readonly string[],
+): undefined | null | Record<string, unknown> {
+  // Hot path
+
+  if (!src) return src
+
+  const dst: Record<string, unknown> = {}
+  const srcKeys = Object.keys(src)
+  for (let i = 0; i < srcKeys.length; i++) {
+    const key = srcKeys[i]
+    if (!rejectedKeys.includes(key)) {
+      dst[key] = src[key]
+    }
+  }
+  return dst
 }
 
 export const jitter = (maxMs: number) => {
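For reference, the widened `omit` signature behaves as follows. This is a usage sketch, not part of the patch: the `session` object and its keys are hypothetical, and it assumes the util is re-exported from the package root as the rest of the monorepo does.

    import { omit } from '@atproto/common-web'

    const session = { did: 'did:example:alice', handle: 'alice.test', secret: 'hunter2' }

    omit(session, ['secret']) // => { did: 'did:example:alice', handle: 'alice.test' }
    omit(undefined, ['secret']) // => undefined (nullish inputs pass through)
    omit(null, ['secret']) // => null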
diff --git a/packages/common/src/streams.ts b/packages/common/src/streams.ts
index d5154fd6c29..c5400f4a2ba 100644
--- a/packages/common/src/streams.ts
+++ b/packages/common/src/streams.ts
@@ -88,15 +88,15 @@ export class MaxSizeChecker extends Transform {
 
 export function decodeStream(
   stream: Readable,
-  contentEncoding?: string,
+  contentEncoding?: string | string[],
 ): Readable
 export function decodeStream(
   stream: AsyncIterable<Uint8Array>,
-  contentEncoding?: string,
+  contentEncoding?: string | string[],
 ): AsyncIterable<Uint8Array> | Readable
 export function decodeStream(
   stream: Readable | AsyncIterable<Uint8Array>,
-  contentEncoding?: string,
+  contentEncoding?: string | string[],
 ): Readable | AsyncIterable<Uint8Array> {
   const decoders = createDecoders(contentEncoding)
   if (decoders.length === 0) return stream
@@ -106,14 +106,23 @@ export function decodeStream(
 /**
  * Create a series of decoding streams based on the content-encoding header. The
  * resulting streams should be piped together to decode the content.
+ *
+ * @see {@link https://datatracker.ietf.org/doc/html/rfc9110#section-8.4.1}
  */
-export function createDecoders(contentEncoding?: string): Duplex[] {
+export function createDecoders(contentEncoding?: string | string[]): Duplex[] {
   const decoders: Duplex[] = []
 
-  if (contentEncoding) {
-    const encodings = contentEncoding.split(',')
+  if (contentEncoding?.length) {
+    const encodings: string[] = Array.isArray(contentEncoding)
+      ? contentEncoding.flatMap(commaSplit)
+      : contentEncoding.split(',')
     for (const encoding of encodings) {
       const normalizedEncoding = normalizeEncoding(encoding)
+
+      // @NOTE
+      // > The default (identity) encoding [...] is used only in the
+      // > Accept-Encoding header, and SHOULD NOT be used in the
+      // > Content-Encoding header.
       if (normalizedEncoding === 'identity') continue
 
       decoders.push(createDecoder(normalizedEncoding))
     }
   }
 
@@ -123,6 +132,10 @@ export function createDecoders(contentEncoding?: string | string[]): Duplex[] {
   return decoders.reverse()
 }
 
+function commaSplit(header: string): string[] {
+  return header.split(',')
+}
+
 function normalizeEncoding(encoding: string) {
   // https://www.rfc-editor.org/rfc/rfc7231#section-3.1.2.1
   // > All content-coding values are case-insensitive...
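With the widened `contentEncoding` parameter, `decodeStream` now accepts the header exactly as Node.js exposes it, including the array form. A minimal sketch (assuming the stream helpers are re-exported from the package root):

    import { Readable } from 'node:stream'
    import { createGzip } from 'node:zlib'
    import { decodeStream } from '@atproto/common'

    // A gzip-encoded source, as an upstream response body might be.
    const compressed = Readable.from(['hello world']).pipe(createGzip())

    // String and array forms are equivalent: values are comma-split, "identity"
    // entries are skipped, and the decoders are applied in reverse order.
    const decoded = decodeStream(compressed, ['identity', 'gzip'])
    // equivalent: decodeStream(compressed, 'identity, gzip')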
diff --git a/packages/pds/src/config/config.ts b/packages/pds/src/config/config.ts
index d31eadfcb52..2d901f58ec6 100644
--- a/packages/pds/src/config/config.ts
+++ b/packages/pds/src/config/config.ts
@@ -246,6 +246,7 @@ export const envToCfg = (env: ServerEnvironment): ServerConfig => {
     headersTimeout: env.proxyHeadersTimeout ?? 10e3,
     bodyTimeout: env.proxyBodyTimeout ?? 30e3,
     maxResponseSize: env.proxyMaxResponseSize ?? 10 * 1024 * 1024, // 10mb
+    preferCompressed: env.proxyPreferCompressed ?? false,
   }
 
   const oauthCfg: ServerConfig['oauth'] = entrywayCfg
@@ -413,6 +414,15 @@ export type ProxyConfig = {
   headersTimeout: number
   bodyTimeout: number
   maxResponseSize: number
+
+  /**
+   * When proxying requests that might get intercepted (for read-after-write),
+   * we negotiate the encoding based on the client's preferences. We will,
+   * however, use our own weights in order to better control whether the PDS
+   * will need to perform content decoding. This setting allows compressed
+   * content to be preferred over uncompressed content.
+   */
+  preferCompressed: boolean
 }
 
 export type OAuthConfig = {

diff --git a/packages/pds/src/config/env.ts b/packages/pds/src/config/env.ts
index feca40720bc..0c44e1fdc8d 100644
--- a/packages/pds/src/config/env.ts
+++ b/packages/pds/src/config/env.ts
@@ -128,6 +128,7 @@ export const readEnv = (): ServerEnvironment => {
     proxyHeadersTimeout: envInt('PDS_PROXY_HEADERS_TIMEOUT'),
     proxyBodyTimeout: envInt('PDS_PROXY_BODY_TIMEOUT'),
     proxyMaxResponseSize: envInt('PDS_PROXY_MAX_RESPONSE_SIZE'),
+    proxyPreferCompressed: envBool('PDS_PROXY_PREFER_COMPRESSED'),
   }
 }
 
@@ -253,4 +254,5 @@ export type ServerEnvironment = {
   proxyHeadersTimeout?: number
   proxyBodyTimeout?: number
   proxyMaxResponseSize?: number
+  proxyPreferCompressed?: boolean
 }
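Operators who would rather receive compressed payloads from upstream (and let the PDS decode locally when a response must be inspected) can set the new variable (illustrative value):

    PDS_PROXY_PREFER_COMPRESSED=true

For a client advertising `gzip, deflate, br`, the upstream accept-encoding built from the ACCEPT_ENCODING_COMPRESSED table below is `gzip;q=1,deflate;q=0.9,br;q=0.8,identity;q=0.1`; with the flag unset (the new default) it is `identity;q=1,gzip;q=0.3,deflate;q=0.2,br;q=0.1`.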
diff --git a/packages/pds/src/pipethrough.ts b/packages/pds/src/pipethrough.ts
index 528a35b8ecb..285320d22c6 100644
--- a/packages/pds/src/pipethrough.ts
+++ b/packages/pds/src/pipethrough.ts
@@ -60,7 +60,7 @@ export const proxyHandler = (ctx: AppContext): CatchallHandler => {
       const { url: origin, did: aud } = await parseProxyInfo(ctx, req, lxm)
 
       const headers: IncomingHttpHeaders = {
-        'accept-encoding': req.headers['accept-encoding'],
+        'accept-encoding': req.headers['accept-encoding'] || 'identity',
         'accept-language': req.headers['accept-language'],
         'atproto-accept-labelers': req.headers['atproto-accept-labelers'],
         'x-bsky-topics': req.headers['x-bsky-topics'],
@@ -102,6 +102,20 @@
   }
 }
 
+const ACCEPT_ENCODING_COMPRESSED = [
+  ['gzip', { q: 1.0 }],
+  ['deflate', { q: 0.9 }],
+  ['br', { q: 0.8 }],
+  ['identity', { q: 0.1 }],
+] as const satisfies Accept[]
+
+const ACCEPT_ENCODING_UNCOMPRESSED = [
+  ['identity', { q: 1.0 }],
+  ['gzip', { q: 0.3 }],
+  ['deflate', { q: 0.2 }],
+  ['br', { q: 0.1 }],
+] as const satisfies Accept[]
+
 export type PipethroughOptions = {
   /**
    * Specify the issuer (requester) for service auth. If not provided, no
@@ -122,28 +136,17 @@ export type PipethroughOptions = {
   lxm?: string
 }
 
-// List of content encodings that are supported by the PDS. Because proxying
-// occurs between data centers, where connectivity is supposedly stable & good,
-// and because payloads are small, we prefer encoding that are fast (gzip,
-// deflate, identity) over heavier encodings (Brotli). Upstream servers should
-// be configured to prefer any encoding over identity in case of big,
-// uncompressed payloads.
-const SUPPORTED_ENCODINGS = [
-  ['gzip', { q: '1.0' }],
-  ['deflate', { q: '0.9' }],
-  ['identity', { q: '0.3' }],
-  ['br', { q: '0.1' }],
-] as const satisfies Accept[]
-
 export async function pipethrough(
   ctx: AppContext,
   req: express.Request,
   options?: PipethroughOptions,
-): Promise<{
-  stream: Readable
-  headers: Record<string, string>
-  encoding: string
-}> {
+): Promise<
+  HandlerPipeThroughStream & {
+    stream: Readable
+    headers: Record<string, string>
+    encoding: string
+  }
+> {
   if (req.method !== 'GET' && req.method !== 'HEAD') {
     // pipethrough() is used from within xrpcServer handlers, which means that
    // the request body either has been parsed or is a readable stream that has
@@ -160,32 +163,31 @@ export async function pipethrough(
 
   const { url: origin, did: aud } = await parseProxyInfo(ctx, req, lxm)
 
-  // Because we sometimes need to interpret the response (e.g. during
-  // read-after-write, through asPipeThroughBuffer()), we need to ask the
-  // upstream server for an encoding that both the requester and the PDS can
-  // understand.
-  const acceptEncoding = negotiateAccept(
-    req.headers['accept-encoding'],
-    SUPPORTED_ENCODINGS,
-  )
-
-  const headers: IncomingHttpHeaders = {
-    'accept-language': req.headers['accept-language'],
-    'atproto-accept-labelers': req.headers['atproto-accept-labelers'],
-    'x-bsky-topics': req.headers['x-bsky-topics'],
-
-    'accept-encoding': `${formatAccepted(acceptEncoding)}, *;q=0`, // Reject anything else (q=0)
-
-    authorization: options?.iss
-      ? `Bearer ${await ctx.serviceAuthJwt(options.iss, options.aud ?? aud, options.lxm ?? lxm)}`
-      : undefined,
-  }
-
   const dispatchOptions: Dispatcher.RequestOptions = {
     origin,
     method: req.method,
     path: req.originalUrl,
-    headers,
+    headers: {
+      'accept-language': req.headers['accept-language'],
+      'atproto-accept-labelers': req.headers['atproto-accept-labelers'],
+      'x-bsky-topics': req.headers['x-bsky-topics'],
+
+      // Because we sometimes need to interpret the response (e.g. during
+      // read-after-write, through asPipeThroughBuffer()), we need to ask the
+      // upstream server for an encoding that both the requester and the PDS can
+      // understand. Since we might have to do the decoding ourselves, we will
+      // use our own preferences (and weights) to negotiate the encoding.
+      'accept-encoding': negotiateContentEncoding(
+        req.headers['accept-encoding'],
+        ctx.cfg.proxy.preferCompressed
+          ? ACCEPT_ENCODING_COMPRESSED
+          : ACCEPT_ENCODING_UNCOMPRESSED,
+      ),
+
+      authorization: options?.iss
+        ? `Bearer ${await ctx.serviceAuthJwt(options.iss, options.aud ?? aud, options.lxm ?? lxm)}`
+        : undefined,
+    },
 
     // Use a high water mark to buffer more data while performing async
     // operations before this stream is consumed. This is especially useful
     highWaterMark: 2 * 65536, // twice the default (64KiB)
   }
 
-  const upstream = await pipethroughRequest(ctx, dispatchOptions)
+  const { headers, body } = await pipethroughRequest(ctx, dispatchOptions)
 
   return {
-    stream: upstream.body,
-    headers: Object.fromEntries(responseHeaders(upstream.headers)),
-    encoding:
-      safeString(upstream.headers['content-type']) ?? 'application/json',
-  } satisfies HandlerPipeThroughStream
+    encoding: safeString(headers['content-type']) ?? 'application/json',
+    headers: Object.fromEntries(responseHeaders(headers)),
+    stream: body,
+  }
 }
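Because the negotiated encoding is always one the PDS can decode, an intercepting read-after-write handler can safely buffer and decode the returned stream. A hedged sketch of that consumption path (`bodyToBuffer` is hypothetical; `asPipeThroughBuffer()` plays this role in the actual code base, per the comment above):

    import { Readable } from 'node:stream'
    import { buffer } from 'node:stream/consumers'
    import { decodeStream } from '@atproto/common'

    async function bodyToBuffer(
      stream: Readable,
      headers: Record<string, string>,
    ): Promise<Buffer> {
      // Undo whatever content-encoding was negotiated with the upstream
      // before interpreting the payload.
      return buffer(decodeStream(stream, headers['content-encoding']))
    }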
 
 // Request setup/formatting
 // -------------------
 
@@ -367,80 +368,118 @@
 // Request parsing/forwarding
 // -------------------
 
-type Accept = [name: string, flags: Record<string, string>]
+type AcceptFlags = { q: number }
+type Accept = [name: string, flags: AcceptFlags]
+
+// accept-encoding defaults to "identity" with lowest priority
+const ACCEPT_ENC_DEFAULT = ['identity', { q: 0.001 }] as const satisfies Accept
+const ACCEPT_FORBID_STAR = ['*', { q: 0 }] as const satisfies Accept
 
-function negotiateAccept(
+function negotiateContentEncoding(
   acceptHeader: undefined | string | string[],
-  supported: readonly Accept[],
-): readonly Accept[] {
-  // Optimization: if no accept-encoding header is present, skip negotiation
-  if (!acceptHeader?.length) {
-    return supported
+  preferences: readonly Accept[],
+): string {
+  const acceptMap = Object.fromEntries<undefined | AcceptFlags>(
+    parseAcceptEncoding(acceptHeader),
+  )
+
+  // Make sure the default (identity) is covered by the preferences
+  if (!preferences.some(coversIdentityAccept)) {
+    preferences = [...preferences, ACCEPT_ENC_DEFAULT]
   }
 
-  const acceptNames = extractAcceptedNames(acceptHeader)
-  const common = acceptNames.includes('*')
-    ? supported
-    : supported.filter(nameIncludedIn, acceptNames)
+  const common = preferences.filter(([name]) => {
+    const acceptQ = (acceptMap[name] ?? acceptMap['*'])?.q
+    // Per HTTP/1.1, "identity" is always acceptable unless explicitly rejected
+    if (name === 'identity') {
+      return acceptQ == null || acceptQ > 0
+    } else {
+      return acceptQ != null && acceptQ > 0
+    }
+  })
+
+  // Since "identity" was present in the preferences, a missing "identity" in
+  // the common array means that the client explicitly rejected it. Reflect
+  // that rejection by explicitly forbidding every other encoding ("*;q=0").
+  if (!common.some(coversIdentityAccept)) {
+    common.push(ACCEPT_FORBID_STAR)
+  }
 
-  // There must be at least one common encoding with a non-zero q value
-  if (!common.some(isNotRejected)) {
+  // If no common encodings are acceptable, throw a 406 Not Acceptable error
+  if (!common.some(isAllowedAccept)) {
     throw new XRPCServerError(
       ResponseType.NotAcceptable,
       'this service does not support any of the requested encodings',
     )
   }
 
-  return common
+  return formatAcceptHeader(common as [Accept, ...Accept[]])
 }
 
-function formatAccepted(accept: readonly Accept[]): string {
-  return accept.map(formatEncodingDev).join(', ')
+function coversIdentityAccept([name]: Accept): boolean {
+  return name === 'identity' || name === '*'
 }
 
-function formatEncodingDev([enc, flags]: Accept): string {
-  let ret = enc
-  for (const name in flags) ret += `;${name}=${flags[name]}`
-  return ret
+function isAllowedAccept([, flags]: Accept): boolean {
+  return flags.q > 0
 }
 
-function nameIncludedIn(this: readonly string[], accept: Accept): boolean {
-  return this.includes(accept[0])
+/**
+ * @see {@link https://developer.mozilla.org/en-US/docs/Glossary/Quality_values}
+ */
+function formatAcceptHeader(accept: readonly [Accept, ...Accept[]]): string {
+  return accept.map(formatAcceptPart).join(',')
 }
 
-function isNotRejected(accept: Accept): boolean {
-  return accept[1]['q'] !== '0'
+function formatAcceptPart([name, flags]: Accept): string {
+  return `${name};q=${flags.q}`
 }
 
-function extractAcceptedNames(
-  acceptHeader: undefined | string | string[],
-): string[] {
-  if (!acceptHeader?.length) {
-    return ['*']
-  }
+function parseAcceptEncoding(
  acceptEncodings: undefined | string | string[],
+): Accept[] {
+  if (!acceptEncodings?.length) return []
 
-  return Array.isArray(acceptHeader)
-    ? acceptHeader.flatMap(extractAcceptedNames)
-    : acceptHeader.split(',').map(extractAcceptedName).filter(isNonNullable)
+  return Array.isArray(acceptEncodings)
+    ? acceptEncodings.flatMap(parseAcceptEncoding)
+    : acceptEncodings.split(',').map(parseAcceptEncodingDefinition)
 }
 
-function extractAcceptedName(def: string): string | undefined {
-  // No need to fully parse since we only care about allowed values
-  const parts = def.split(';')
-  if (parts.some(isQzero)) return undefined
-  return parts[0].trim()
-}
+function parseAcceptEncodingDefinition(def: string): Accept {
+  const { length, 0: encoding, 1: params } = def.trim().split(';', 3)
 
-function isQzero(def: string): boolean {
-  return def.trim() === 'q=0'
-}
+  if (length > 2) {
+    throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`)
+  }
+
+  if (!encoding || encoding.includes('=')) {
+    throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`)
+  }
+
+  const flags = { q: 1 }
+  if (length === 2) {
+    const { length, 0: key, 1: value } = params.split('=', 3)
+    if (length !== 2) {
+      throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`)
+    }
+
+    if (key === 'q' || key === 'Q') {
+      const q = parseFloat(value)
+      if (q === 0 || (Number.isFinite(q) && q <= 1 && q >= 0.001)) {
+        flags.q = q
+      } else {
+        throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`)
+      }
+    } else {
+      throw new InvalidRequestError(`Invalid accept-encoding: "${def}"`)
+    }
+  }
 
-function isNonNullable<T>(val: T): val is NonNullable<T> {
-  return val != null
+  return [encoding.toLowerCase(), flags]
 }
 
 export function isJsonContentType(contentType?: string): boolean | undefined {
-  if (contentType == null) return undefined
+  if (!contentType) return undefined
   return /application\/(?:\w+\+)?json/i.test(contentType)
 }
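A worked example of the new negotiation, with preferCompressed enabled and an illustrative client header:

    // client:      accept-encoding: br, gzip;q=0.5, *;q=0
    // parsed:      [['br', { q: 1 }], ['gzip', { q: 0.5 }], ['*', { q: 0 }]]
    // preferences: ACCEPT_ENCODING_COMPRESSED
    //   gzip     -> client q = 0.5 (> 0)      -> kept, with our weight (1)
    //   deflate  -> falls back to '*' (q = 0) -> dropped
    //   br       -> client q = 1 (> 0)        -> kept, with our weight (0.8)
    //   identity -> falls back to '*' (q = 0) -> explicitly rejected, dropped
    // result:      accept-encoding: gzip;q=1,br;q=0.8,*;q=0

Since identity was rejected, the trailing `*;q=0` forbids the upstream from picking anything outside the common set. A malformed definition such as `;q=1` is rejected by parseAcceptEncodingDefinition with a 400, and a header whose common set contains no q > 0 entry yields the 406 above.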
diff --git a/packages/pds/tests/proxied/read-after-write.test.ts b/packages/pds/tests/proxied/read-after-write.test.ts
index 2f3f94da3ed..23888ae4fc8 100644
--- a/packages/pds/tests/proxied/read-after-write.test.ts
+++ b/packages/pds/tests/proxied/read-after-write.test.ts
@@ -1,6 +1,7 @@
 import util from 'node:util'
 import assert from 'node:assert'
 import { AtpAgent } from '@atproto/api'
+import { request } from 'undici'
 import { TestNetwork, SeedClient, RecordRef } from '@atproto/dev-env'
 import basicSeed from '../seeds/basic'
 import { ThreadViewPost } from '../../src/lexicon/types/app/bsky/feed/defs'
@@ -266,4 +267,80 @@ describe('proxy read after write', () => {
     const parsed = parseInt(lag)
     expect(parsed > 0).toBe(true)
   })
+
+  it('negotiates encoding', async () => {
+    const identity = await agent.api.app.bsky.feed.getTimeline(
+      {},
+      { headers: { ...sc.getHeaders(alice), 'accept-encoding': 'identity' } },
+    )
+    expect(identity.headers['content-encoding']).toBeUndefined()
+
+    const gzip = await agent.api.app.bsky.feed.getTimeline(
+      {},
+      {
+        headers: { ...sc.getHeaders(alice), 'accept-encoding': 'gzip, *;q=0' },
+      },
+    )
+    expect(gzip.headers['content-encoding']).toBe('gzip')
+  })
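The gzip assertion above holds because `gzip, *;q=0` leaves gzip as the only acceptable encoding: identity inherits q=0 from the wildcard, so under the default uncompressed preferences the negotiated upstream header is `gzip;q=0.3,*;q=0` and the upstream has to compress.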
+
+  it('defaults to identity encoding', async () => {
+    // Not using the "agent" because "fetch()" will add "accept-encoding: gzip,
+    // deflate" if no "accept-encoding" header is provided
+    const res = await request(
+      new URL(`/xrpc/app.bsky.feed.getTimeline`, agent.dispatchUrl),
+      {
+        headers: { ...sc.getHeaders(alice) },
+      },
+    )
+    expect(res.statusCode).toBe(200)
+    expect(res.headers['content-encoding']).toBeUndefined()
+  })
+
+  it('falls back to identity encoding', async () => {
+    const invalid = await agent.api.app.bsky.feed.getTimeline(
+      {},
+      { headers: { ...sc.getHeaders(alice), 'accept-encoding': 'invalid' } },
+    )
+
+    expect(invalid.headers['content-encoding']).toBeUndefined()
+  })
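Note the asymmetry the next two tests pin down: a lone unknown token such as `invalid` parses fine and simply matches no preference, leaving identity acceptable (negotiation yields `identity;q=1`, so the response comes back unencoded), whereas `invalid, *;q=0` also rejects identity through the wildcard, emptying the common set and producing a 406.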
+
+  it('errors when failing to negotiate encoding', async () => {
+    await expect(
+      agent.api.app.bsky.feed.getTimeline(
+        {},
+        {
+          headers: {
+            ...sc.getHeaders(alice),
+            'accept-encoding': 'invalid, *;q=0',
+          },
+        },
+      ),
+    ).rejects.toThrow(
+      expect.objectContaining({
+        status: 406,
+        message: 'this service does not support any of the requested encodings',
+      }),
+    )
+  })
+
+  it('errors on invalid content-encoding format', async () => {
+    await expect(
+      agent.api.app.bsky.feed.getTimeline(
+        {},
+        {
+          headers: {
+            ...sc.getHeaders(alice),
+            'accept-encoding': ';q=1',
+          },
+        },
+      ),
+    ).rejects.toThrow(
+      expect.objectContaining({
+        status: 400,
+        message: 'Invalid accept-encoding: ";q=1"',
+      }),
+    )
+  })
 })
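End to end, the defaulting behaviour this patch introduces looks as follows for a client that sends no accept-encoding header at all (a sketch of the pipethrough() path, with PDS_PROXY_PREFER_COMPRESSED unset):

    client   -> PDS:      GET /xrpc/app.bsky.feed.getTimeline  (no accept-encoding)
    PDS      -> upstream: accept-encoding: identity;q=1
    upstream -> PDS:      200 OK  (no content-encoding)
    PDS      -> client:   200 OK  (no content-encoding)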