Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ClientRequest): support 100 continue flow #599

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 21 additions & 10 deletions src/interceptors/ClientRequest/MockHttpSocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import { Readable } from 'node:stream'
import { invariant } from 'outvariant'
import { INTERNAL_REQUEST_ID_HEADER_NAME } from '../../Interceptor'
import { MockSocket } from '../Socket/MockSocket'
import type { NormalizedSocketWriteArgs } from '../Socket/utils/normalizeSocketWriteArgs'
import { type NormalizedSocketWriteArgs } from '../Socket/utils/normalizeSocketWriteArgs'
import { isPropertyAccessible } from '../../utils/isPropertyAccessible'
import { baseUrlFromConnectionOptions } from '../Socket/utils/baseUrlFromConnectionOptions'
import { parseRawHeaders } from '../Socket/utils/parseRawHeaders'
import {
createResponse,
createServerErrorResponse,
RESPONSE_STATUS_CODES_WITHOUT_BODY,
isResponseWithoutBody,
} from '../../utils/responseUtils'
import { createRequestId } from '../../createRequestId'
import { getRawFetchHeaders } from './utils/recordRawHeaders'
Expand Down Expand Up @@ -128,7 +129,9 @@ export class MockHttpSocket extends MockSocket {
const emitEvent = super.emit.bind(this, event as any, ...args)

if (this.responseListenersPromise) {
this.responseListenersPromise.finally(emitEvent)
this.responseListenersPromise.finally(() => {
emitEvent()
})
return this.listenerCount(event) > 0
}

Expand Down Expand Up @@ -517,13 +520,20 @@ export class MockHttpSocket extends MockSocket {
'Failed to write to a request stream: stream does not exist'
)

console.log(
'REQ BODY!',
chunk.toString('utf8'),
this.request?.headers.has('expect')
)
this.requestStream.push(chunk)
}

private onRequestEnd(): void {
// console.log('[MHS] REQ END', this.writableFinished)

// Request end can be called for requests without body.
if (this.requestStream) {
this.requestStream.push(null)
this.requestStream.push('\r\n')
}
}

Expand All @@ -537,22 +547,23 @@ export class MockHttpSocket extends MockSocket {
statusText
) => {
const headers = parseRawHeaders(rawHeaders)
const canHaveBody = !RESPONSE_STATUS_CODES_WITHOUT_BODY.has(status)

// Similarly, create a new stream for each response.
if (canHaveBody) {
this.responseStream = new Readable({ read() {} })
}
this.responseStream = isResponseWithoutBody(status)
? undefined
: new Readable({ read() {} })

const response = new Response(
const response = createResponse(
/**
* @note The Fetch API response instance exposed to the consumer
* is created over the response stream of the HTTP parser. It is NOT
* related to the Socket instance. This way, you can read response body
* in response listener while the Socket instance delays the emission
* of "end" and other events until those response listeners are finished.
*/
canHaveBody ? (Readable.toWeb(this.responseStream!) as any) : null,
this.responseStream
? (Readable.toWeb(this.responseStream) as ReadableStream<any>)
: null,
{
status,
statusText,
Expand Down
29 changes: 29 additions & 0 deletions src/utils/responseUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,35 @@ export function createServerErrorResponse(body: unknown): Response {
)
}

/**
* Creates a Fetch API `Response` instance.
* Unlike the `Response` constructor, this function supports
* non-configurable status codes (e.g. 101).
*/
export function createResponse(
bodyInit?: BodyInit | null,
init?: ResponseInit
): Response {
const status = init?.status || 200
const isAllowedStatus = status >= 200
const body = isResponseWithoutBody(status) ? null : bodyInit

const response = new Response(body, {
...init,
status: isAllowedStatus ? status : 428,
})

if (!isAllowedStatus) {
Object.defineProperty(response, 'status', {
value: status,
enumerable: true,
writable: false,
})
}

return response
}

export type ResponseError = Response & { type: 'error' }

/**
Expand Down
103 changes: 103 additions & 0 deletions test/modules/http/compliance/http-request-continue.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// @vitest-environment node
import { vi, it, expect, beforeAll, afterEach, afterAll } from 'vitest'
import http from 'http'
import { HttpServer } from '@open-draft/test-server/http'
import { ClientRequestInterceptor } from '../../../../src/interceptors/ClientRequest'
import { waitForClientRequest } from '../../../helpers'

const interceptor = new ClientRequestInterceptor()

const httpServer = new HttpServer((app) => {
app.post('/resource', (req, res) => {
req.on('data', (chunk) =>
console.log('[server] req data:', chunk.toString())
)
console.log('!!![server] added req.on(data)')

req.pipe(res)
})
})

beforeAll(async () => {
interceptor.apply()
await httpServer.listen()
})

afterEach(() => {
interceptor.removeAllListeners()
})

afterAll(async () => {
interceptor.dispose()
await httpServer.close()
})

it('emits "continue" event for a request with "100-continue" expect header', async () => {
interceptor
.on('request', ({ request }) => {
console.log('[*] request', request.method, request.url)
})
.on('response', ({ request, response }) => {
console.log('[*] response', response.status, request.method, request.url)
})

const request = http.request(httpServer.http.url('/resource'), {
method: 'POST',
headers: {
expect: '100-continue',
},
})

const continueListener = vi.fn()
request.on('continue', continueListener)
request.on('continue', () => {
console.log('REQ CONTINUE')
console.log('REQ END')

console.log('!!!! writing request...')
request.end('hello')
})
request.on('finish', () => console.log('REQ FINISH'))
request.on('response', () => console.log('REQ RESPONSE'))

request.on('socket', (socket) => {
socket.write = new Proxy(socket.write, {
apply(target, thisArg, args) {
console.log('SOCKET WRITE', args[0].toString())
return Reflect.apply(target, thisArg, args)
},
})
socket.push = new Proxy(socket.push, {
apply(target, thisArg, args) {
console.log('SOCKET PUSH', args[0].toString())
return Reflect.apply(target, thisArg, args)
},
})
socket.emit = new Proxy(socket.emit, {
apply(target, thisArg, args) {
console.log(
'SOCKET EMIT',
args[0] === 'data' ? ['data', args[1].toString()] : args
)
return Reflect.apply(target, thisArg, args)
},
})

socket.on('connect', () => console.log('SOCKET CONNECT'))
// socket.on('data', (chunk) =>
// console.log('SOCKET DATA:\n', chunk.toString())
// )
socket.on('finish', () => console.log('SOCKET FINISH'))
socket.on('close', () => console.log('SOCKET CLOSE'))
socket.on('error', () => console.log('SOCKET ERROR!!!'))
socket.on('end', () => console.log('SOCKET END'))
})

const { res, text } = await waitForClientRequest(request)

expect(res.statusCode).toBe(200)
await expect(text()).resolves.toBe('hello')
expect(continueListener).toHaveBeenCalledOnce()
})

it.todo('emits "continue" event for a ')
Loading