Plug.Conn.read_body fails inside separate process #215

Closed
danschultzer opened this issue Aug 20, 2023 · 5 comments · Fixed by #217

Comments

@danschultzer
Contributor

Hi @mtrudel!

With #101 we removed the requirement that Plug.Conn function calls happen in the same process for HTTP/2. Unfortunately, I've found that this introduced an issue when calling Plug.Conn.read_body on the HTTP/2 protocol from within another process (so your gut feeling in #101 was correct) 😞

This is the test case I've been testing with in test/bandit/http2/plug_test:

  test "reading request body from other process", context do
    response = Req.post!(context.req, url: "/other_process_body_read", body: "OK")

    assert response.status == 200
  end

  def other_process_body_read(conn) do
    fn ->
      {:ok, body, conn} = read_body(conn)
      assert body == "OK"
      conn |> send_resp(200, body)
    end
    |> Task.async()
    |> Task.await()
  end

This will fail because the code assumes that Plug.Conn.read_req_body/2 is called from the very same process as the stream, i.e. the process whose mailbox receives the :data and :end_stream messages:

receive do
  {:data, data} ->
    acc = [data | acc]
    remaining_length = remaining_length - byte_size(data)

    if remaining_length >= 0 do
      do_read_req_body(adapter, timeout, remaining_length, acc)
    else
      bytes_read = IO.iodata_length(acc)

      metrics =
        adapter.metrics
        |> Map.update(:req_body_bytes, bytes_read, &(&1 + bytes_read))

      {:more, wrap_req_body(acc), %{adapter | metrics: metrics}}
    end

  :end_stream ->
    bytes_read = IO.iodata_length(acc)

    metrics =
      adapter.metrics
      |> Map.update(:req_body_bytes, bytes_read, &(&1 + bytes_read))
      |> Map.put(:req_body_end_time, Bandit.Telemetry.monotonic_time())

    {:ok, wrap_req_body(acc), %{adapter | end_stream: true, metrics: metrics}}
after
  timeout ->
    bytes_read = IO.iodata_length(acc)

    metrics =
      adapter.metrics
      |> Map.update(:req_body_bytes, bytes_read, &(&1 + bytes_read))

    {:more, wrap_req_body(acc), %{adapter | metrics: metrics}}
end
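This is easy to reproduce outside of Bandit. A minimal, self-contained sketch (the module name is hypothetical, not Bandit code) of why the receive loop never fires in a Task: receive only reads the mailbox of the process that calls it, so data sent to the stream process is invisible to the spawned Task.

```elixir
# Hypothetical demo (not Bandit code): the connection process delivers
# {:data, ...} to the stream process's mailbox, but a `receive` inside a
# Task only reads the Task's own mailbox, so the message is never seen.
defmodule MailboxDemo do
  def run do
    stream = self()

    task =
      Task.async(fn ->
        # Inspects the Task's mailbox, not the stream's.
        receive do
          {:data, data} -> {:ok, data}
        after
          100 -> :timeout
        end
      end)

    # Simulates the connection process delivering body data to the
    # stream process (self/0) rather than to the Task.
    send(stream, {:data, "OK"})

    Task.await(task)
  end
end

MailboxDemo.run()
# => :timeout -- {:data, "OK"} sits unread in the stream process's mailbox
```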

This issue doesn't happen with either Bandit.HTTP1 or Plug.Cowboy because they have encapsulated the message handling to the process that will receive them:

defp read(socket, to_read, already_read, read_size, read_timeout) do
  with {:ok, chunk} <- ThousandIsland.Socket.recv(socket, min(to_read, read_size), read_timeout) do
    remaining_bytes = to_read - byte_size(chunk)

    if remaining_bytes > 0 do
      read(socket, remaining_bytes, [already_read | chunk], read_size, read_timeout)
    else
      {:ok, [already_read | chunk]}
    end
  end
end

https://github.com/elixir-plug/plug_cowboy/blob/3960916cf06b94934d77f144a94dc1edc7a7801c/lib/plug/cowboy/conn.ex#L77-L82

I was working on a PR with a potential solution, moving the responsibility for listening to the stream messages into the stream process. Ideally, Bandit.HTTP2.Adapter.read_req_body would call Bandit.HTTP2.StreamTask (or Bandit.HTTP2.Stream) with a pid and let the stream wait on the messages. But as I worked on it, it became clear that it requires more significant changes: the adapter needs to carry the stream pid, Bandit.Pipeline.run needs to listen for a :done message, and/or Bandit.HTTP2.StreamTask needs to encapsulate the plug call so it can listen for messages.
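For illustration, here is a rough sketch of that direction, with module and message names that are purely hypothetical (not Bandit's actual API): the stream process owns the mailbox that receives body frames, and it replies to a read request issued from any other process.

```elixir
# Hypothetical sketch: the stream process owns the mailbox and serves
# read requests from any caller process. Names are illustrative only.
defmodule StreamReader do
  # Runs as the stream process: accumulates {:data, ...} frames and
  # answers a pending reader once the stream has ended.
  def loop(acc \\ [], pending \\ nil) do
    receive do
      {:data, data} ->
        loop([acc | data], pending)

      :end_stream when pending != nil ->
        send(pending, {:body, IO.iodata_to_binary(acc)})

      :end_stream ->
        await_reader(acc)

      {:read_body, from} ->
        loop(acc, from)
    end
  end

  defp await_reader(acc) do
    receive do
      {:read_body, from} -> send(from, {:body, IO.iodata_to_binary(acc)})
    end
  end

  # Callable from any process, e.g. a Task running the plug.
  def read_body(stream_pid, timeout \\ 5_000) do
    send(stream_pid, {:read_body, self()})

    receive do
      {:body, body} -> {:ok, body}
    after
      timeout -> {:error, :timeout}
    end
  end
end

# The body read now works regardless of which process calls read_body/2:
stream = spawn(fn -> StreamReader.loop() end)
send(stream, {:data, "OK"})
send(stream, :end_stream)
Task.async(fn -> StreamReader.read_body(stream) end) |> Task.await()
# => {:ok, "OK"}
```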

I was also thinking that maybe I could handle it by changing how recv_data works.

What do you think would be the best approach here?

We could revert the changes from #101 and just require that all Plug.Conn function calls happen within the same PID. As discussed, it's ambiguous where these function calls come from, and there are edge cases (like mine) where the conn is used in a separate process, but if handling this otherwise is too much of a lift we can just revert the changes, and I could probably find a way to encapsulate the Bandit HTTP/2 adapter in TestServer.

If we end up reverting #101, then sorry for causing headaches here!

@mtrudel
Owner

mtrudel commented Aug 23, 2023

I've actually got a pretty extensive workup planned post 1.0 for HTTP/2 that will involve factoring stream workers into GenServers (prep work in order to land #81). This is a bit of a wrinkle that I hadn't considered in that work; without having thought it completely through, I think reverting #101 probably makes the most sense.

@danschultzer
Contributor Author

danschultzer commented Aug 26, 2023

Yeah, the revert is good; it'll keep things consistent. I'll try to rework TestServer to handle Bandit HTTP/2.

Looking forward to the refactor of the stream workers! It makes sense to me that the HTTP/2 message handling should be encapsulated in a process separate from the plug call/interaction.

@danschultzer
Contributor Author

@mtrudel if you are curious, I solved this in TestServer by encapsulating the Bandit HTTP2 adapter.

@mtrudel
Owner

mtrudel commented Aug 26, 2023

> Looking forward to the refactor of the stream workers! It makes sense to me that the HTTP/2 message handling should be encapsulated in a process separate from the plug call/interaction.

That's actually how it works today (notes here). The change I'm having to make is to move the individual request handlers (which correspond 1:1 with active HTTP/2 streams & also Plug calls) from Task to GenServer, so that they have the relevant ancestry when an HTTP/2 connection gets upgraded to a WebSocket (as defined in RFC 8441).
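To make the Task-to-GenServer move concrete, a hedged sketch (module and state shape are hypothetical, not Bandit's actual design): unlike a bare Task, a GenServer started under the connection's supervision tree carries the process ancestry that an RFC 8441 WebSocket upgrade can rely on.

```elixir
# Hypothetical per-stream GenServer sketch; names are illustrative only.
defmodule StreamWorker do
  use GenServer

  def start_link(args), do: GenServer.start_link(__MODULE__, args)

  @impl true
  def init({conn_pid, stream_id}) do
    {:ok, %{conn_pid: conn_pid, stream_id: stream_id, buffer: []}}
  end

  # Body frames forwarded by the connection process land here instead of
  # being consumed by a raw `receive` in an unrelated process.
  @impl true
  def handle_info({:data, data}, state) do
    {:noreply, %{state | buffer: [state.buffer | data]}}
  end
end
```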

@mtrudel
Owner

mtrudel commented Jan 12, 2024

Heads up that as part of #286, I'm planning on rejigging owner PID read enforcement by:

  • Adding an owner PID field to the req structure, and enforcing that it's equal to self() whenever you attempt to call any functions on adapter that block on receive calls; and,
  • Removing the PID enforcement from within the parent connection process
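A hedged sketch of what that owner-PID check could look like (struct, field, and function names are hypothetical, not Bandit's actual internals):

```elixir
# Hypothetical owner-PID enforcement sketch; names are illustrative only.
defmodule OwnerCheck do
  defstruct [:owner_pid]

  # Blocking reads must run in the owner process, since `receive` can
  # only consume messages sent to self().
  def read_req_body(%__MODULE__{owner_pid: owner}, _opts) do
    if owner == self() do
      do_blocking_receive()
    else
      raise "read_req_body/2 must be called from the stream's owner process"
    end
  end

  defp do_blocking_receive do
    receive do
      {:data, data} -> {:ok, data}
    after
      0 -> {:more, ""}
    end
  end
end
```

Calling read_req_body/2 with owner_pid: self() proceeds to the blocking receive, while calling it from another process raises immediately instead of hanging until the timeout.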

This is being done to slim down the connection process as part of moving the H2 stack to a GenServer pattern and moving as much stream logic as possible into the stream process and/or adapter modules. I'm aiming for a better division of responsibility by the end of the PR, whereby the connection process knows almost nothing about the internal state of individual streams and functions more like a switchboard than anything else. The jump from there to an RFC 8441 implementation is much smaller.
