Skip to content

Commit

Permalink
Streaming response body as enumerable (#362)
Browse files Browse the repository at this point in the history
  • Loading branch information
wojtekmach authored May 24, 2024
1 parent 84531dc commit 8fa73e4
Show file tree
Hide file tree
Showing 9 changed files with 161 additions and 39 deletions.
25 changes: 19 additions & 6 deletions lib/req.ex
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,14 @@ defmodule Req do
{:ok, [:done]}
""
Same as above, using enumerable API:
iex> resp = Req.get!("http://httpbin.org/stream/2", into: :self)
iex> resp.body
#Req.Response.Async<...>
iex> Enum.map(resp.body, & &1["id"])
[0, 1]
## Header Names
The HTTP specification requires that header names should be case-insensitive.
Expand Down Expand Up @@ -242,8 +250,13 @@ defmodule Req do
* `collectable` - stream response body into a `t:Collectable.t/0`.
* `:self` - stream response body into the process mailbox. The messages should be parsed using
`Req.parse_message/2`.
* `:self` - stream response body into the current process mailbox.
Received messages should be parsed with `Req.parse_message/2`.
`response.body` is set to opaque data structure `Req.Response.Async` which implements
`Enumerable` that receives and automatically parses messages. See module documentation
for example usage.
Response redirect options ([`redirect`](`Req.Steps.redirect/1`) step):
Expand Down Expand Up @@ -1180,8 +1193,8 @@ defmodule Req do
iex> Req.parse_message(resp, receive do message -> message end)
{:ok, [:done]}
"""
def parse_message(%Req.Response{} = resp, message) do
resp.async.stream_fun.(resp.async.ref, message)
def parse_message(%Req.Response{body: %Req.Response.Async{stream_fun: fun, ref: ref}}, message) do
fun.(ref, message)
end

def parse_message(%Req.Request{} = request, message) do
Expand All @@ -1201,8 +1214,8 @@ defmodule Req do
iex> Req.cancel_async_response(resp)
:ok
"""
def cancel_async_response(%Req.Response{} = response) do
response.async.cancel_fun.(response.async.ref)
def cancel_async_response(%Req.Response{body: %Req.Response.Async{cancel_fun: fun, ref: ref}}) do
fun.(ref)
end

@deprecated "use Req.cancel_async_response(resp)) instead"
Expand Down
5 changes: 0 additions & 5 deletions lib/req/async.ex

This file was deleted.

7 changes: 2 additions & 5 deletions lib/req/response.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,16 @@ defmodule Req.Response do
@type t() :: %__MODULE__{
status: non_neg_integer(),
headers: %{optional(binary()) => [binary()]},
body: binary() | term(),
body: binary() | %Req.Response.Async{} | term(),
trailers: %{optional(binary()) => [binary()]},
private: map()
}

@derive {Inspect, except: [:async]}

defstruct status: 200,
headers: if(Req.MixProject.legacy_headers_as_lists?(), do: [], else: %{}),
body: "",
trailers: %{},
private: %{},
async: nil
private: %{}

@doc """
Returns a new response.
Expand Down
80 changes: 80 additions & 0 deletions lib/req/response_async.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
defmodule Req.Response.Async do
@moduledoc """
Asynchronous response body.
This is the `response.body` when making a request with `into: :self`, that is,
streaming response body chunks to the current process mailbox.
This struct implements the `Enumerale` protocol where each element is a body chunk received
from the current process mailbox. HTTP Trailer fields are ignored.
**Note:** this feature is currently experimental and it may change in future releases.
## Examples
iex> resp = Req.get!("https://reqbin.org/ndjson?delay=1000", into: :self)
iex> resp.body
#Req.Response.Async<...>
iex> Enum.each(resp.body, &IO.puts/1)
# {"id":0}
# {"id":1}
# {"id":2}
:ok
"""

@derive {Inspect, only: []}
defstruct [:pid, :ref, :stream_fun, :cancel_fun]

defimpl Enumerable do
def count(_async), do: {:error, __MODULE__}

def member?(_async, _value), do: {:error, __MODULE__}

def slice(_async), do: {:error, __MODULE__}

def reduce(async, {:halt, acc}, _fun) do
cancel(async)
{:halted, acc}
end

def reduce(async, {:suspend, acc}, fun) do
{:suspended, acc, &reduce(async, &1, fun)}
end

def reduce(async, {:cont, acc}, fun) do
if async.pid != self() do
raise "expected to read body chunk in the process #{inspect(async.pid)} which made the request, got: #{inspect(self())}"
end

receive do
message ->
case async.stream_fun.(async.ref, message) do
{:ok, [data: data]} ->
result =
try do
fun.(data, acc)
rescue
e ->
cancel(async)
reraise e, __STACKTRACE__
end

reduce(async, result, fun)

{:ok, [:done]} ->
{:done, acc}

{:ok, [trailers: _trailers]} ->
reduce(async, {:cont, acc}, fun)

{:error, e} ->
raise e
end
end
end

defp cancel(async) do
async.cancel_fun.(async.ref)
end
end
end
19 changes: 12 additions & 7 deletions lib/req/steps.ex
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,8 @@ defmodule Req.Steps do
end)
end

async = %Req.Async{
async = %Req.Response.Async{
pid: self(),
ref: ref,
stream_fun: &finch_parse_message/2,
cancel_fun: &finch_cancel/1
Expand Down Expand Up @@ -959,14 +960,15 @@ defmodule Req.Steps do
end)
end

async = %Req.Async{
async = %Req.Response.Async{
pid: self(),
ref: ref,
stream_fun: &finch_parse_message/2,
cancel_fun: &finch_cancel/1
}

resp = Req.Response.new(status: status, headers: headers)
resp = put_in(resp.async, async)
resp = put_in(resp.body, async)
{req, resp}
end

Expand Down Expand Up @@ -1008,16 +1010,19 @@ defmodule Req.Steps do
end

defp finch_parse_message(ref, {ref, {:data, data}}) do
{:ok, [{:data, data}]}
{:ok, [data: data]}
end

defp finch_parse_message(ref, {ref, :done}) do
{:ok, [:done]}
end

# TODO: handle remaining possible Finch results
defp finch_parse_message(_ref, _other) do
:unknown
defp finch_parse_message(ref, {ref, {:trailers, trailers}}) do
{:ok, [trailers: trailers]}
end

defp finch_parse_message(ref, {ref, {:error, reason}}) do
{:error, reason}
end

defp finch_cancel(ref) do
Expand Down
6 changes: 5 additions & 1 deletion test/req/httpc_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ defmodule Req.HttpcTest do

require Logger

# TODO
@moduletag :skip

setup do
bypass = Bypass.open()

Expand Down Expand Up @@ -249,7 +252,8 @@ defmodule Req.HttpcTest do
_ -> 200
end

async = %Req.Async{
async = %Req.Response.Async{
pid: self(),
ref: ref,
stream_fun: &httpc_stream/2,
cancel_fun: &httpc_cancel/1
Expand Down
16 changes: 1 addition & 15 deletions test/req/steps_test.exs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Req.StepsTest do
use ExUnit.Case, async: true

import TestHelper, only: [start_server: 1]
require Logger

setup do
Expand Down Expand Up @@ -2211,20 +2211,6 @@ defmodule Req.StepsTest do
end
end

defp start_server(plug) do
options = [
scheme: :http,
port: 0,
plug: fn conn, _ -> plug.(conn) end,
startup_log: false,
http_options: [compress: false]
]

pid = start_supervised!({Bandit, options})
{:ok, {_ip, port}} = ThousandIsland.listener_info(pid)
%{pid: pid, url: "http://localhost:#{port}"}
end

defp start_socket(fun) do
{:ok, listen_socket} = :gen_tcp.listen(0, mode: :binary, active: false)
{:ok, port} = :inet.port(listen_socket)
Expand Down
26 changes: 26 additions & 0 deletions test/req_test.exs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
defmodule ReqTest do
use ExUnit.Case, async: true
import TestHelper, only: [start_server: 1]

doctest Req,
only: [
Expand Down Expand Up @@ -72,4 +73,29 @@ defmodule ReqTest do
req = Req.new(plugins: [foo], foo: 42)
assert req.options.foo == 42
end

test "async enumerable" do
%{url: origin_url} =
start_server(fn conn ->
conn = Plug.Conn.send_chunked(conn, 200)
{:ok, conn} = Plug.Conn.chunk(conn, "foo")
{:ok, conn} = Plug.Conn.chunk(conn, "bar")
{:ok, conn} = Plug.Conn.chunk(conn, "baz")
conn
end)

%{url: proxy_url} =
start_server(fn conn ->
%{status: 200, body: async} = Req.get!(url: origin_url, into: :self)
conn = Plug.Conn.send_chunked(conn, 200)
Enum.reduce(async, conn, &chunk(&2, &1))
end)

assert Req.get!(proxy_url, into: []).body == ~w[foo bar baz]
end

defp chunk(conn, data) do
{:ok, conn} = Plug.Conn.chunk(conn, data)
conn
end
end
16 changes: 16 additions & 0 deletions test/test_helper.exs
Original file line number Diff line number Diff line change
@@ -1,3 +1,19 @@
defmodule TestHelper do
def start_server(plug) do
options = [
scheme: :http,
port: 0,
plug: fn conn, _ -> plug.(conn) end,
startup_log: false,
http_options: [compress: false]
]

pid = ExUnit.Callbacks.start_supervised!({Bandit, options})
{:ok, {_ip, port}} = ThousandIsland.listener_info(pid)
%{pid: pid, url: "http://localhost:#{port}"}
end
end

defmodule EzstdFilter do
# Filter out:
# 17:56:39.116 [debug] Loading library: ~c"/path/to/req/_build/test/lib/ezstd/priv/ezstd_nif"
Expand Down

0 comments on commit 8fa73e4

Please sign in to comment.