Skip to content

Commit

Permalink
Track state in stream process
Browse files Browse the repository at this point in the history
  • Loading branch information
mtrudel committed Jan 23, 2024
1 parent 067a670 commit 30537fc
Show file tree
Hide file tree
Showing 6 changed files with 252 additions and 149 deletions.
61 changes: 34 additions & 27 deletions lib/bandit/http2/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,21 @@ defmodule Bandit.HTTP2.Adapter do

defstruct stream_transport: nil,
owner_pid: nil,
end_stream: false,
recv_window_size: 65_535,
method: nil,
content_encoding: nil,
status: nil,
metrics: nil,
opts: nil

@typedoc "A struct for backing a Plug.Conn.Adapter"
@type t :: %__MODULE__{
stream_transport: Bandit.HTTP2.StreamTransport.t(),
owner_pid: pid() | nil,
end_stream: boolean(),
recv_window_size: non_neg_integer(),
method: Plug.Conn.method() | nil,
content_encoding: String.t() | nil,
status: Plug.Conn.status() | nil,
metrics: map(),
opts: keyword()
}
Expand All @@ -45,8 +45,6 @@ defmodule Bandit.HTTP2.Adapter do
end

@impl Plug.Conn.Adapter
def read_req_body(%__MODULE__{end_stream: true}, _opts), do: raise(Bandit.BodyAlreadyReadError)

def read_req_body(%__MODULE__{} = adapter, opts) do
validate_calling_process!(adapter)
length = Keyword.get(opts, :length, 8_000_000)
Expand All @@ -56,15 +54,14 @@ defmodule Bandit.HTTP2.Adapter do
adapter.metrics
|> Map.put_new_lazy(:req_body_start_time, &Bandit.Telemetry.monotonic_time/0)

case Bandit.HTTP2.StreamTransport.read_body(adapter.stream_transport, length, timeout) do
case Bandit.HTTP2.StreamTransport.recv_body(adapter.stream_transport, length, timeout) do
{:ok, body, stream_transport} ->
metrics =
metrics
|> Map.update(:req_body_bytes, byte_size(body), &(&1 + byte_size(body)))
|> Map.put(:req_body_end_time, Bandit.Telemetry.monotonic_time())

{:ok, body,
%{adapter | stream_transport: stream_transport, metrics: metrics, end_stream: true}}
{:ok, body, %{adapter | stream_transport: stream_transport, metrics: metrics}}

{:more, body, stream_transport} ->
metrics =
Expand Down Expand Up @@ -118,9 +115,10 @@ defmodule Bandit.HTTP2.Adapter do
headers = if compress, do: [{"vary", "accept-encoding"} | headers], else: headers
body_bytes = IO.iodata_length(body)
headers = Bandit.Headers.add_content_length(headers, body_bytes, status)
adapter = %{adapter | status: status}

adapter =
if body_bytes == 0 || !send_resp_body?(adapter, status) do
if body_bytes == 0 || !send_resp_body?(adapter) do
adapter
|> send_headers(status, headers, true)
else
Expand All @@ -144,9 +142,11 @@ defmodule Bandit.HTTP2.Adapter do
length = if length == :all, do: size - offset, else: length
headers = Bandit.Headers.add_content_length(headers, length, status)

adapter = %{adapter | status: status}

adapter =
cond do
!send_resp_body?(adapter, status) ->
!send_resp_body?(adapter) ->
send_headers(adapter, status, headers, true)

offset + length == size && offset == 0 ->
Expand Down Expand Up @@ -187,7 +187,9 @@ defmodule Bandit.HTTP2.Adapter do
def send_chunked(%__MODULE__{} = adapter, status, headers) do
validate_calling_process!(adapter)

if send_resp_body?(adapter, status) do
adapter = %{adapter | status: status}

if send_resp_body?(adapter) do
{:ok, nil, send_headers(adapter, status, headers, false)}
else
{:ok, nil, send_headers(adapter, status, headers, true)}
Expand All @@ -201,18 +203,19 @@ defmodule Bandit.HTTP2.Adapter do
# details) and closing the stream here carves closest to the underlying HTTP/1.1 behaviour
# (RFC9112§7.1). The whole notion of chunked encoding is moot in HTTP/2 anyway (RFC9113§8.1)
# so this entire section of the API is a bit slanty regardless.
#
# Moreover, if the caller is chunking out on a HEAD, 204 or 304 response, the underlying
# stream will have been closed in send_chunked/3 above, and so this call will return an
# `{:error, :not_owner}` error here (which we ignore, but it's still kinda odd)

validate_calling_process!(adapter)

byte_size = chunk |> IO.iodata_length()
adapter = send_data(adapter, chunk, byte_size == 0)
if send_resp_body?(adapter) do
byte_size = chunk |> IO.iodata_length()
adapter = send_data(adapter, chunk, byte_size == 0)

if byte_size == 0 do
metrics = Map.put(adapter.metrics, :resp_end_time, Bandit.Telemetry.monotonic_time())
{:ok, nil, %{adapter | metrics: metrics}}
if byte_size == 0 do
metrics = Map.put(adapter.metrics, :resp_end_time, Bandit.Telemetry.monotonic_time())
{:ok, nil, %{adapter | metrics: metrics}}
else
{:ok, nil, adapter}
end
else
{:ok, nil, adapter}
end
Expand All @@ -224,7 +227,10 @@ defmodule Bandit.HTTP2.Adapter do
headers = split_cookies(headers)
headers = [{":status", to_string(status)} | headers]

Bandit.HTTP2.StreamTransport.send_headers(adapter.stream_transport, headers, false)
stream_transport =
Bandit.HTTP2.StreamTransport.send_headers(adapter.stream_transport, headers, false)

{:ok, %{adapter | stream_transport: stream_transport}}
end

@impl Plug.Conn.Adapter
Expand All @@ -239,10 +245,10 @@ defmodule Bandit.HTTP2.Adapter do
@impl Plug.Conn.Adapter
def get_http_protocol(%__MODULE__{}), do: :"HTTP/2"

defp send_resp_body?(%{method: "HEAD"}, _status), do: false
defp send_resp_body?(_req, 204), do: false
defp send_resp_body?(_req, 304), do: false
defp send_resp_body?(_req, _status), do: true
defp send_resp_body?(%{method: "HEAD"}), do: false
defp send_resp_body?(%{status: 204}), do: false
defp send_resp_body?(%{status: 304}), do: false
defp send_resp_body?(_req), do: true

defp send_headers(adapter, status, headers, end_stream) do
metrics =
Expand All @@ -261,13 +267,14 @@ defmodule Bandit.HTTP2.Adapter do

headers = [{":status", to_string(status)} | headers]

Bandit.HTTP2.StreamTransport.send_headers(adapter.stream_transport, headers, end_stream)
stream_transport =
Bandit.HTTP2.StreamTransport.send_headers(adapter.stream_transport, headers, end_stream)

%{adapter | metrics: metrics}
%{adapter | stream_transport: stream_transport, metrics: metrics}
end

defp send_data(adapter, data, end_stream) do
{stream_transport, bytes_sent} =
{bytes_sent, stream_transport} =
Bandit.HTTP2.StreamTransport.send_data(adapter.stream_transport, data, end_stream)

metrics =
Expand Down
10 changes: 2 additions & 8 deletions lib/bandit/http2/errors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,10 @@ defmodule Bandit.HTTP2.Errors do
end

defmodule StreamError do
defexception message: nil,
method: nil,
request_target: nil,
error_code: error_codes[:protocol_error]
defexception [:message, :method, :request_target, :error_code]
end

defmodule ConnectionError do
defexception message: nil,
method: nil,
request_target: nil,
error_code: error_codes[:protocol_error]
defexception [:message, :method, :request_target, :error_code]
end
end
7 changes: 3 additions & 4 deletions lib/bandit/http2/stream_process.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,9 @@ defmodule Bandit.HTTP2.StreamProcess do
@spec recv_end_of_stream(pid()) :: :ok | :noconnect | :nosuspend
def recv_end_of_stream(pid), do: send(pid, :end_stream)

# Let the stream process know that the client has reset the stream. This will terminate the
# stream's handling process
# Let the stream process know that the client has reset the stream
@spec recv_rst_stream(pid(), Errors.error_code()) :: true
def recv_rst_stream(pid, error_code), do: Process.exit(pid, {:recv_rst_stream, error_code})
def recv_rst_stream(pid, error_code), do: send(pid, {:rst_stream, error_code})

@impl GenServer
def init({stream_transport, plug, opts, connection_span}) do
Expand Down Expand Up @@ -123,7 +122,7 @@ defmodule Bandit.HTTP2.StreamProcess do
)
end

def terminate({exception, stacktrace}, state) when is_exception(exception) do
def terminate({exception, stacktrace}, state) do
Bandit.Telemetry.span_exception(state.span, :exit, exception, stacktrace)
StreamTransport.send_rst_stream(state.stream_transport, Errors.internal_error())
end
Expand Down
Loading

0 comments on commit 30537fc

Please sign in to comment.