Skip to content

Commit

Permalink
Factor last bits of h2 into something compatible with where we want t…
Browse files Browse the repository at this point in the history
…o take http1

Define HTTPTransport behaviour to model the two transport implementations
  • Loading branch information
mtrudel committed Feb 8, 2024
1 parent 1808a25 commit 589f7a0
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 55 deletions.
90 changes: 36 additions & 54 deletions lib/bandit/http2/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -117,15 +117,15 @@ defmodule Bandit.HTTP2.Adapter do
headers = if compress, do: [{"vary", "accept-encoding"} | headers], else: headers
body_bytes = IO.iodata_length(body)
headers = Bandit.Headers.add_content_length(headers, body_bytes, status)
adapter = %{adapter | status: status}

# Optimization to send end_stream on the header response and avoid a data frame
adapter =
if body_bytes == 0 || !send_resp_body?(adapter) do
if body_bytes == 0 do
adapter
|> send_headers(status, headers, true)
|> send_headers(status, headers, :no_body)
else
adapter
|> send_headers(status, headers, false)
|> send_headers(status, headers, :raw)
|> send_data(body, true)
end

Expand All @@ -140,60 +140,36 @@ defmodule Bandit.HTTP2.Adapter do
@impl Plug.Conn.Adapter
def send_file(%__MODULE__{} = adapter, status, headers, path, offset, length) do
validate_calling_process!(adapter)

start_time = Bandit.Telemetry.monotonic_time()
%File.Stat{type: :regular, size: size} = File.stat!(path)
length = if length == :all, do: size - offset, else: length
headers = Bandit.Headers.add_content_length(headers, length, status)
adapter = %{adapter | status: status}

adapter =
cond do
!send_resp_body?(adapter) ->
send_headers(adapter, status, headers, true)

offset + length == size && offset == 0 ->
adapter = send_headers(adapter, status, headers, false)

path
|> File.stream!([], 2048)
|> Enum.reduce(adapter, fn chunk, adapter -> send_data(adapter, chunk, false) end)
|> send_data(<<>>, true)

offset + length <= size ->
case :file.open(path, [:raw, :binary]) do
{:ok, fd} ->
try do
with {:ok, data} <- :file.pread(fd, offset, length) do
adapter
|> send_headers(status, headers, false)
|> send_data(data, true)
end
after
:file.close(fd)
end

{:error, reason} ->
{:error, reason}
end

true ->
raise "Cannot read #{length} bytes starting at #{offset} as #{path} is only #{size} octets in length"
end
if offset + length <= size do
headers = Bandit.Headers.add_content_length(headers, length, status)
adapter = send_headers(adapter, status, headers, :raw)

metrics = Map.put(adapter.metrics, :resp_end_time, Bandit.Telemetry.monotonic_time())
{:ok, nil, %{adapter | metrics: metrics}}
{stream, bytes_actually_written} =
if send_resp_body?(adapter),
do: {Bandit.HTTP2.Stream.sendfile(adapter.stream, path, offset, length), length},
else: {adapter.stream, 0}

metrics =
adapter.metrics
|> Map.put(:resp_body_bytes, bytes_actually_written)
|> Map.put(:resp_start_time, start_time)
|> Map.put(:resp_end_time, Bandit.Telemetry.monotonic_time())

{:ok, nil, %{adapter | stream: stream, metrics: metrics}}
else
raise "Cannot read #{length} bytes starting at #{offset} as #{path} is only #{size} octets in length"
end
end

@impl Plug.Conn.Adapter
def send_chunked(%__MODULE__{} = adapter, status, headers) do
validate_calling_process!(adapter)

adapter = %{adapter | status: status}

if send_resp_body?(adapter) do
{:ok, nil, send_headers(adapter, status, headers, false)}
else
{:ok, nil, send_headers(adapter, status, headers, true)}
end
{:ok, nil, send_headers(adapter, status, headers, :chunk_encoded)}
end

@impl Plug.Conn.Adapter
Expand Down Expand Up @@ -224,7 +200,7 @@ defmodule Bandit.HTTP2.Adapter do
@impl Plug.Conn.Adapter
def inform(adapter, status, headers) do
validate_calling_process!(adapter)
stream = Bandit.HTTP2.Stream.send_headers(adapter.stream, status, headers, false)
stream = Bandit.HTTP2.Stream.send_headers(adapter.stream, status, headers, :inform)
{:ok, %{adapter | stream: stream}}
end

Expand All @@ -238,14 +214,14 @@ defmodule Bandit.HTTP2.Adapter do
def get_peer_data(req), do: Bandit.TransportInfo.peer_data(req.stream.transport_info)

@impl Plug.Conn.Adapter
def get_http_protocol(%__MODULE__{}), do: :"HTTP/2"
def get_http_protocol(%__MODULE__{} = req), do: Bandit.HTTP2.Stream.version(req.stream)

defp send_resp_body?(%{method: "HEAD"}), do: false
defp send_resp_body?(%{status: 204}), do: false
defp send_resp_body?(%{status: 304}), do: false
defp send_resp_body?(_req), do: true

defp send_headers(adapter, status, headers, end_stream) do
defp send_headers(adapter, status, headers, body_disposition) do
metrics =
adapter.metrics
|> Map.put_new_lazy(:resp_start_time, &Bandit.Telemetry.monotonic_time/0)
Expand All @@ -258,12 +234,18 @@ defmodule Bandit.HTTP2.Adapter do
headers
end

stream = Bandit.HTTP2.Stream.send_headers(adapter.stream, status, headers, end_stream)
adapter = %{adapter | status: status}
body_disposition = if send_resp_body?(adapter), do: body_disposition, else: :no_body
stream = Bandit.HTTP2.Stream.send_headers(adapter.stream, status, headers, body_disposition)
%{adapter | stream: stream, metrics: metrics}
end

defp send_data(adapter, data, end_stream) do
stream = Bandit.HTTP2.Stream.send_data(adapter.stream, data, end_stream)
stream =
if send_resp_body?(adapter),
do: Bandit.HTTP2.Stream.send_data(adapter.stream, data, end_stream),
else: adapter.stream

bytes_sent = IO.iodata_length(data)
metrics = adapter.metrics |> Map.update(:resp_body_bytes, bytes_sent, &(&1 + bytes_sent))
%{adapter | stream: stream, metrics: metrics}
Expand Down
35 changes: 34 additions & 1 deletion lib/bandit/http2/stream.ex
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ defmodule Bandit.HTTP2.Stream do
# functions here, with the luxury of a nicely unwound stack and a process that is guaranteed to
# be terminated as soon as these functions are called

@behaviour Bandit.HTTPTransport

require Integer
require Logger

Expand Down Expand Up @@ -105,6 +107,10 @@ defmodule Bandit.HTTP2.Stream do

# Stream API - Receiving

@impl Bandit.HTTPTransport
def version(%__MODULE__{}), do: :"HTTP/2"

@impl Bandit.HTTPTransport
def read_headers(%__MODULE__{state: :idle} = stream) do
case do_recv(stream, stream.read_timeout) do
{:headers, headers, stream} ->
Expand Down Expand Up @@ -239,6 +245,7 @@ defmodule Bandit.HTTP2.Stream do
[{"cookie", combined_cookie} | other_headers]
end

@impl Bandit.HTTPTransport
def read_data(stream, opts) do
max_bytes = Keyword.get(opts, :length, 8_000_000)
timeout = Keyword.get(opts, :read_timeout, 15_000)
Expand Down Expand Up @@ -395,8 +402,15 @@ defmodule Bandit.HTTP2.Stream do

# Stream API - Sending

def send_headers(%__MODULE__{state: state} = stream, status, headers, end_stream)
@impl Bandit.HTTPTransport
def send_headers(%__MODULE__{state: state} = stream, status, headers, body_disposition)
when state in [:open, :remote_closed] do
# We need to map body_disposition into the state model of HTTP/2. This turns out to be really
# easy, since HTTP/2 only has one way to send data. The only bit we need from the disposition
# is whether there will be any data forthcoming (ie: whether or not to end the stream). That
# will possibly walk us to a different state per RFC9113§5.1, as determined by the tail call
# to set_state_on_send_end_stream/2
end_stream = body_disposition == :no_body
headers = [{":status", to_string(status)} | split_cookies(headers)]
do_send(stream, {:send_headers, headers, end_stream})
set_state_on_send_end_stream(stream, end_stream)
Expand All @@ -414,6 +428,7 @@ defmodule Bandit.HTTP2.Stream do
end)
end

@impl Bandit.HTTPTransport
def send_data(%__MODULE__{state: state} = stream, data, end_stream)
when state in [:open, :remote_closed] do
stream =
Expand Down Expand Up @@ -449,6 +464,23 @@ defmodule Bandit.HTTP2.Stream do
end
end

@impl Bandit.HTTPTransport
def sendfile(%__MODULE__{} = stream, path, offset, length) do
case :file.open(path, [:raw, :binary]) do
{:ok, fd} ->
try do
with {:ok, data} <- :file.pread(fd, offset, length) do
send_data(stream, data, true)
end
after
:file.close(fd)
end

{:error, reason} ->
{:error, reason}
end
end

defp split_data(data, desired_length) do
data_length = IO.iodata_length(data)

Expand All @@ -470,6 +502,7 @@ defmodule Bandit.HTTP2.Stream do

# Closing off the stream upon completion or error

@impl Bandit.HTTPTransport
def ensure_completed(%__MODULE__{state: :closed} = stream), do: stream

def ensure_completed(%__MODULE__{state: :local_closed} = stream) do
Expand Down
31 changes: 31 additions & 0 deletions lib/bandit/http_transport.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule Bandit.HTTPTransport do
@moduledoc false
# A behaviour implemented by the lower level transports (HTTP/1 and HTTP/2) to encapsulate the
# low-level mechanics needed to complete an HTTP request/response cycle. Implementations of this
# behaviour should be broadly concerned with the protocol-specific aspects of a connection, and
# can rely on higher-level code taking care of shared HTTP semantics

@type transport :: Bandit.HTTP2.Stream.t()

@typedoc "How the response body is to be delivered"
@type body_disposition :: :raw | :chunk_encoded | :no_body | :inform

@callback version(transport()) :: Plug.Conn.Adapter.http_protocol()

@callback read_headers(transport()) ::
{:ok, Plug.Conn.method(), Bandit.Pipeline.request_target(), Plug.Conn.headers(),
transport()}

@callback read_data(transport(), opts :: keyword()) ::
{:ok, iodata(), transport()} | {:more, iodata(), transport()}

@callback send_headers(transport(), Plug.Conn.status(), Plug.Conn.headers(), body_disposition()) ::
transport()

@callback send_data(transport(), data :: iodata(), end_request :: boolean()) :: transport()

@callback sendfile(transport(), Path.t(), offset :: integer(), length :: integer() | :all) ::
transport()

@callback ensure_completed(transport()) :: transport()
end

0 comments on commit 589f7a0

Please sign in to comment.