Skip to content

Commit

Permalink
Move reset handling into stream process
Browse files Browse the repository at this point in the history
  • Loading branch information
mtrudel committed Jan 13, 2024
1 parent 23d302a commit d27d690
Show file tree
Hide file tree
Showing 7 changed files with 69 additions and 32 deletions.
4 changes: 4 additions & 0 deletions lib/bandit/http2/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -349,4 +349,8 @@ defmodule Bandit.HTTP2.Adapter do
raise "Adapter functions may only be called by the stream owner"
end
end

def send_rst_stream(adapter, error_code) do
GenServer.call(adapter.connection, {:send_rst_stream, adapter.stream_id, error_code})
end
end
21 changes: 12 additions & 9 deletions lib/bandit/http2/connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -460,17 +460,20 @@ defmodule Bandit.HTTP2.Connection do
end
end

@spec stream_terminated(pid(), term(), Socket.t(), t()) :: {:ok, t()} | {:error, term()}
def stream_terminated(pid, reason, socket, connection) do
@spec send_rst_stream(Stream.stream_id(), Errors.error_code(), Socket.t(), t()) :: :ok
def send_rst_stream(stream_id, error_code, socket, connection) do
_ =
%Frame.RstStream{stream_id: stream_id, error_code: error_code}
|> send_frame(socket, connection)

:ok
end

@spec stream_terminated(pid(), term(), t()) :: {:ok, t()} | {:error, term()}
def stream_terminated(pid, reason, connection) do
with {:ok, stream} <- StreamCollection.get_active_stream_by_pid(connection.streams, pid),
{:ok, stream, error_code} <- Stream.stream_terminated(stream, reason),
{:ok, stream} <- Stream.stream_terminated(stream, reason),
{:ok, streams} <- StreamCollection.put_stream(connection.streams, stream) do
_ =
if !is_nil(error_code) do
%Frame.RstStream{stream_id: stream.stream_id, error_code: error_code}
|> send_frame(socket, connection)
end

{:ok, %{connection | streams: streams}}
end
end
Expand Down
7 changes: 6 additions & 1 deletion lib/bandit/http2/handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,13 @@ defmodule Bandit.HTTP2.Handler do
end
end

def handle_call({:send_rst_stream, stream_id, error_code}, _from, {socket, state}) do
Connection.send_rst_stream(stream_id, error_code, socket, state.connection)
{:reply, :ok, {socket, state}, socket.read_timeout}
end

def handle_info({:EXIT, pid, reason}, {socket, state}) do
case Connection.stream_terminated(pid, reason, socket, state.connection) do
case Connection.stream_terminated(pid, reason, state.connection) do
{:ok, connection} ->
{:noreply, {socket, %{state | connection: connection}}, socket.read_timeout}

Expand Down
12 changes: 6 additions & 6 deletions lib/bandit/http2/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -246,18 +246,18 @@ defmodule Bandit.HTTP2.Stream do
:ok
end

@spec stream_terminated(t(), term()) :: {:ok, t(), Errors.error_code() | nil}
@spec stream_terminated(t(), term()) :: {:ok, t()}
def stream_terminated(%__MODULE__{state: :closed} = stream, :normal) do
# In the normal case, stop telemetry is emitted by the stream process to keep the main
# connection process unblocked. In error cases we send from here, however, since there are
# many error cases which never involve the stream process at all
{:ok, %{stream | state: :closed, pid: nil}, nil}
{:ok, %{stream | state: :closed, pid: nil}}
end

def stream_terminated(%__MODULE__{} = stream, {:bandit, reason}) do
Bandit.Telemetry.stop_span(stream.span, %{}, %{error: reason})
Logger.warning("Stream #{stream.stream_id} was killed by bandit (#{reason})")
{:ok, %{stream | state: :closed, pid: nil}, nil}
{:ok, %{stream | state: :closed, pid: nil}}
end

def stream_terminated(%__MODULE__{} = stream, {%StreamError{} = error, _}) do
Expand All @@ -269,12 +269,12 @@ defmodule Bandit.HTTP2.Stream do
})

Logger.warning("Stream #{stream.stream_id} encountered a stream error (#{inspect(error)})")
{:ok, %{stream | state: :closed, pid: nil}, Errors.protocol_error()}
{:ok, %{stream | state: :closed, pid: nil}}
end

def stream_terminated(%__MODULE__{} = stream, :normal) do
Logger.warning("Stream #{stream.stream_id} completed in unexpected state #{stream.state}")
{:ok, %{stream | state: :closed, pid: nil}, Errors.no_error()}
{:ok, %{stream | state: :closed, pid: nil}}
end

def stream_terminated(%__MODULE__{} = stream, reason) do
Expand All @@ -288,6 +288,6 @@ defmodule Bandit.HTTP2.Stream do

Logger.error("Process for stream #{stream.stream_id} crashed with #{inspect(reason)}")

{:ok, %{stream | state: :closed, pid: nil}, Errors.internal_error()}
{:ok, %{stream | state: :closed, pid: nil}}
end
end
34 changes: 31 additions & 3 deletions lib/bandit/http2/stream_process.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ defmodule Bandit.HTTP2.StreamProcess do

use GenServer, restart: :temporary

alias Bandit.HTTP2.{Adapter, Errors, Stream}

# A stream process can be created only once we have an adapter & set of headers. Pass them in
# at creation time to ensure this invariant
@spec start_link(
Expand All @@ -25,7 +27,13 @@ defmodule Bandit.HTTP2.StreamProcess do
Bandit.Telemetry.t()
) :: GenServer.on_start()
def start_link(req, transport_info, headers, plug, span) do
GenServer.start_link(__MODULE__, {req, transport_info, headers, plug, span})
GenServer.start_link(__MODULE__, %{
req: req,
transport_info: transport_info,
headers: headers,
plug: plug,
span: span
})
end

# Let the stream process know that body data has arrived from the client. The other half of this
Expand All @@ -43,11 +51,19 @@ defmodule Bandit.HTTP2.StreamProcess do
@spec recv_rst_stream(pid(), Bandit.HTTP2.Errors.error_code()) :: true
def recv_rst_stream(pid, error_code), do: Process.exit(pid, {:recv_rst_stream, error_code})

@impl GenServer
def init(state) do
{:ok, state, {:continue, :run}}
end

def handle_continue(:run, {req, transport_info, all_headers, plug, span}) do
@impl GenServer
def handle_continue(:run, %{
req: req,
transport_info: transport_info,
headers: all_headers,
plug: plug,
span: span
}) do
req = %{req | owner_pid: self()}

with {:ok, request_target} <- build_request_target(all_headers),
Expand Down Expand Up @@ -77,7 +93,8 @@ defmodule Bandit.HTTP2.StreamProcess do
status: conn.status
})

{:stop, :normal, {req, transport_info, all_headers, plug, span}}
{:stop, :normal,
%{req: req, transport_info: transport_info, headers: all_headers, plug: plug, span: span}}
else
{:error, reason} ->
raise Bandit.HTTP2.Stream.StreamError,
Expand Down Expand Up @@ -194,4 +211,15 @@ defmodule Bandit.HTTP2.StreamProcess do
combined_cookie = Enum.map_join(crumbs, "; ", fn {"cookie", crumb} -> crumb end)
[{"cookie", combined_cookie} | other_headers]
end

@impl GenServer
def terminate(:normal, _state), do: :ok

def terminate({%Stream.StreamError{}, _stacktrace}, state) do
Adapter.send_rst_stream(state.req, Errors.protocol_error())
end

def terminate(_reason, state) do
Adapter.send_rst_stream(state.req, Errors.internal_error())
end
end
10 changes: 10 additions & 0 deletions test/bandit/http2/plug_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,8 @@ defmodule HTTP2PlugTest do

assert {:error, %Mint.HTTPError{reason: {:server_closed_request, :internal_error}}} =
response

Process.sleep(100)
end)

assert errors =~
Expand All @@ -341,6 +343,8 @@ defmodule HTTP2PlugTest do

assert {:error, %Mint.HTTPError{reason: {:server_closed_request, :internal_error}}} =
response

Process.sleep(100)
end)

assert errors =~
Expand All @@ -356,6 +360,8 @@ defmodule HTTP2PlugTest do

assert {:error, %Mint.HTTPError{reason: {:server_closed_request, :internal_error}}} =
response

Process.sleep(100)
end)

assert errors =~
Expand Down Expand Up @@ -405,6 +411,8 @@ defmodule HTTP2PlugTest do

assert {:error, %Mint.HTTPError{reason: {:server_closed_request, :internal_error}}} =
response

Process.sleep(100)
end)

assert errors =~
Expand Down Expand Up @@ -894,6 +902,8 @@ defmodule HTTP2PlugTest do

Req.get(context.req, url: "/raise_error")

Process.sleep(100)

assert Bandit.TelemetryCollector.get_events(collector_pid)
~> [
{[:bandit, :request, :exception], %{monotonic_time: integer()},
Expand Down
13 changes: 0 additions & 13 deletions test/bandit/http2/protocol_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -1565,19 +1565,6 @@ defmodule HTTP2ProtocolTest do
end

describe "RST_STREAM frames" do
@tag capture_log: true
test "sends RST_FRAME with no error if stream task ends without closed stream", context do
socket = SimpleH2Client.setup_connection(context)

# Send headers with end_stream bit cleared
SimpleH2Client.send_simple_headers(socket, 1, :post, "/body_response", context.port)
SimpleH2Client.recv_headers(socket)
SimpleH2Client.recv_body(socket)

assert SimpleH2Client.recv_rst_stream(socket) == {:ok, 1, 0}
assert SimpleH2Client.connection_alive?(socket)
end

@tag capture_log: true
test "sends RST_FRAME with error if stream task crashes", context do
socket = SimpleH2Client.setup_connection(context)
Expand Down

0 comments on commit d27d690

Please sign in to comment.