diff --git a/.dialyzer_ignore b/.dialyzer_ignore index a13cca82..2dbafa78 100644 --- a/.dialyzer_ignore +++ b/.dialyzer_ignore @@ -1,5 +1,5 @@ lib/mint/tunnel_proxy.ex:49 -lib/mint/http1.ex:915 +lib/mint/http1.ex:927 lib/mint/unsafe_proxy.ex:173 lib/mint/unsafe_proxy.ex:198 test/support diff --git a/lib/mint/core/conn.ex b/lib/mint/core/conn.ex index 12af3b2c..11ae1b6a 100644 --- a/lib/mint/core/conn.ex +++ b/lib/mint/core/conn.ex @@ -62,4 +62,6 @@ defmodule Mint.Core.Conn do @callback put_proxy_headers(conn(), Mint.Types.headers()) :: conn() @callback put_log(conn(), boolean()) :: conn() + + @callback request_body_window(conn(), Types.request_ref()) :: non_neg_integer() | :infinity end diff --git a/lib/mint/http.ex b/lib/mint/http.ex index 71c1c6ff..bb37b22d 100644 --- a/lib/mint/http.ex +++ b/lib/mint/http.ex @@ -623,6 +623,10 @@ defmodule Mint.HTTP do This function always returns an updated connection to be stored over the old connection. + When streaming a body of arbitrary size, use `request_body_window/2` to learn + how many bytes you can send right now without violating HTTP/2 flow control, + then split your body accordingly before passing each chunk to this function. + For information about transfer encoding and content length in HTTP/1, see `Mint.HTTP1.stream_request_body/3`. @@ -1065,6 +1069,75 @@ defmodule Mint.HTTP do @impl true def put_proxy_headers(conn, headers), do: conn_apply(conn, :put_proxy_headers, [conn, headers]) + @doc """ + Returns the request body flow-control window for the streaming request + identified by `request_ref`. + + The semantics differ by protocol: + + * In HTTP/2, returns `min(connection_window, stream_window)` — the maximum + number of body bytes that can be sent right now without violating flow + control. Exceeding this value in a single `DATA` frame would close the + connection with a `FLOW_CONTROL_ERROR`. See `Mint.HTTP2.get_window_size/2` + for the underlying primitives. + + * In HTTP/1, returns `:infinity`. HTTP/1 has no application-level + flow-control mechanism: any amount of body data is protocol-valid. + + The value returned reflects only the protocol-level flow-control + constraint. It does not account for the operating-system socket send + buffer: under either protocol, `stream_request_body/3` can still block + when that buffer fills up. To bound this behavior, configure + `send_timeout` on the socket via `:transport_opts` when establishing the + connection (see `Mint.HTTP.connect/4`). + + Raises `ArgumentError` if `request_ref` is not associated with an active + streaming request. + + ## Examples + + Streaming a binary body in chunks that respect the protocol window: + + defp stream_body(conn, ref, "") do + Mint.HTTP.stream_request_body(conn, ref, :eof) + end + + defp stream_body(conn, ref, body) do + conn + |> Mint.HTTP.request_body_window(ref) + |> send_body_chunk(conn, ref, body) + end + + defp send_body_chunk(0, conn, ref, body) do + with {:ok, conn} <- wait(conn, ref) do + stream_body(conn, ref, body) + end + end + + defp send_body_chunk(window, conn, ref, body) do + chunk_size = min(window, byte_size(body)) + <> = body + + with {:ok, conn} <- Mint.HTTP.stream_request_body(conn, ref, chunk) do + stream_body(conn, ref, rest) + end + end + + defp wait(conn, ref) do + # Wait for the server to refill the request body window with a + # WINDOW_UPDATE frame. The concrete implementation depends on the + # socket mode and other context. + end + + Note that `min(:infinity, n) == n` thanks to Erlang term ordering, so the + same loop works on HTTP/1 (each iteration sends the entire remaining body in + a single chunk) and on HTTP/2 (each iteration sends at most the current + flow-control window). + """ + @doc since: "1.8.0" + @impl true + def request_body_window(conn, ref), do: conn_apply(conn, :request_body_window, [conn, ref]) + ## Helpers defp conn_apply(%UnsafeProxy{}, fun, args), do: apply(UnsafeProxy, fun, args) diff --git a/lib/mint/http1.ex b/lib/mint/http1.ex index 941e12d0..63c35f43 100644 --- a/lib/mint/http1.ex +++ b/lib/mint/http1.ex @@ -667,6 +667,18 @@ defmodule Mint.HTTP1 do %{conn | proxy_headers: headers} end + @doc """ + See `Mint.HTTP.request_body_window/2`. + """ + @doc since: "1.8.0" + @impl true + def request_body_window(%__MODULE__{streaming_request: %{ref: ref}}, ref), do: :infinity + + def request_body_window(%__MODULE__{}, ref) do + raise ArgumentError, + "request with request reference #{inspect(ref)} was not found or is not streaming a body" + end + ## Helpers defp decode(:status, %{request: request} = conn, data, responses) do diff --git a/lib/mint/http2.ex b/lib/mint/http2.ex index d1b562d0..6f923c4b 100644 --- a/lib/mint/http2.ex +++ b/lib/mint/http2.ex @@ -1039,6 +1039,15 @@ defmodule Mint.HTTP2 do %{conn | proxy_headers: headers} end + @doc """ + See `Mint.HTTP.request_body_window/2`. + """ + @doc since: "1.8.0" + @impl true + def request_body_window(%__MODULE__{} = conn, ref) do + min(get_window_size(conn, :connection), get_window_size(conn, {:request, ref})) + end + ## Helpers defp handle_closed(conn) do diff --git a/lib/mint/unsafe_proxy.ex b/lib/mint/unsafe_proxy.ex index b6318ffa..5c33a551 100644 --- a/lib/mint/unsafe_proxy.ex +++ b/lib/mint/unsafe_proxy.ex @@ -199,4 +199,9 @@ defmodule Mint.UnsafeProxy do def put_proxy_headers(%__MODULE__{}, _headers) do raise "invalid function for proxy unsafe proxy connections" end + + @impl true + def request_body_window(%__MODULE__{module: module, state: state}, ref) do + module.request_body_window(state, ref) + end end diff --git a/test/http_test.exs b/test/http_test.exs index 70e307c2..28b93988 100644 --- a/test/http_test.exs +++ b/test/http_test.exs @@ -1,4 +1,27 @@ defmodule Mint.HTTPTest do use ExUnit.Case, async: true doctest Mint.HTTP + + alias Mint.{HTTP, HTTP1.TestServer} + + setup do + {:ok, port, server_ref} = TestServer.start() + assert {:ok, conn} = HTTP.connect(:http, "localhost", port) + assert_receive {^server_ref, server_socket} + + [conn: conn, server_socket: server_socket] + end + + describe "request_body_window/2" do + test "returns :infinity for an HTTP/1 streaming request", %{conn: conn} do + {:ok, conn, ref} = HTTP.request(conn, "GET", "/", [], :stream) + assert HTTP.request_body_window(conn, ref) == :infinity + end + + test "raises ArgumentError for an unknown request ref", %{conn: conn} do + assert_raise ArgumentError, fn -> + HTTP.request_body_window(conn, make_ref()) + end + end + end end diff --git a/test/mint/http1/conn_test.exs b/test/mint/http1/conn_test.exs index e354be14..37cf4262 100644 --- a/test/mint/http1/conn_test.exs +++ b/test/mint/http1/conn_test.exs @@ -1151,6 +1151,19 @@ defmodule Mint.HTTP1Test do {:ok, conn, responses} end + describe "request_body_window/2" do + test "returns :infinity for an active streaming request", %{conn: conn} do + {:ok, conn, ref} = HTTP1.request(conn, "GET", "/", [], :stream) + assert HTTP1.request_body_window(conn, ref) == :infinity + end + + test "raises if no request is currently streaming a body", %{conn: conn} do + assert_raise ArgumentError, ~r/was not found or is not streaming a body/, fn -> + HTTP1.request_body_window(conn, make_ref()) + end + end + end + @mint_user_agent "mint/#{Mix.Project.config()[:version]}" defp mint_user_agent, do: @mint_user_agent end diff --git a/test/mint/http2/conn_test.exs b/test/mint/http2/conn_test.exs index 27215664..f9143eca 100644 --- a/test/mint/http2/conn_test.exs +++ b/test/mint/http2/conn_test.exs @@ -1772,6 +1772,82 @@ defmodule Mint.HTTP2Test do HTTP2.get_window_size(conn, {:request, make_ref()}) end end + + test "request_body_window/2 returns the minimum of connection and request window sizes", + %{conn: conn} do + {conn, ref} = open_request(conn, :stream) + + send_window = HTTP2.request_body_window(conn, ref) + conn_window = HTTP2.get_window_size(conn, :connection) + request_window = HTTP2.get_window_size(conn, {:request, ref}) + + assert send_window == min(conn_window, request_window) + end + + test "request_body_window/2 decreases after streaming body data", %{conn: conn} do + {conn, ref} = open_request(conn, :stream) + + initial_send_window = HTTP2.request_body_window(conn, ref) + assert initial_send_window > 0 + + body_chunk = "hello" + {:ok, conn} = HTTP2.stream_request_body(conn, ref, body_chunk) + + assert HTTP2.request_body_window(conn, ref) == initial_send_window - byte_size(body_chunk) + end + + test "request_body_window/2 raises if the request is not found", %{conn: conn} do + assert_raise ArgumentError, ~r/request with request reference .+ was not found/, fn -> + HTTP2.request_body_window(conn, make_ref()) + end + end + + @tag server_settings: [initial_window_size: 5] + test "streaming a body larger than the window using request_body_window/2 in a loop", + %{conn: conn} do + {conn, ref} = open_request(conn, :stream) + + assert_recv_frames [headers(stream_id: stream_id)] + + body = "0123456789ABCDE" + + # First chunk: window is 5, so we send 5 bytes. + assert HTTP2.request_body_window(conn, ref) == 5 + <> = body + {:ok, conn} = HTTP2.stream_request_body(conn, ref, chunk1) + + assert HTTP2.request_body_window(conn, ref) == 0 + + assert_recv_frames [data(stream_id: ^stream_id, data: ^chunk1, flags: flags1)] + assert flags1 == set_flags(:data, []) + + # Server replenishes the stream window so we can send more. + {:ok, conn, []} = + stream_frames(conn, [window_update(stream_id: stream_id, window_size_increment: 5)]) + + assert HTTP2.request_body_window(conn, ref) == 5 + <> = rest1 + {:ok, conn} = HTTP2.stream_request_body(conn, ref, chunk2) + + assert_recv_frames [data(stream_id: ^stream_id, data: ^chunk2)] + + # Final replenishment for the remaining bytes plus :eof. + {:ok, conn, []} = + stream_frames(conn, [ + window_update(stream_id: stream_id, window_size_increment: byte_size(rest2)) + ]) + + assert HTTP2.request_body_window(conn, ref) == byte_size(rest2) + {:ok, conn} = HTTP2.stream_request_body(conn, ref, rest2) + {:ok, _conn} = HTTP2.stream_request_body(conn, ref, :eof) + + assert_recv_frames [ + data(stream_id: ^stream_id, data: ^rest2), + data(stream_id: ^stream_id, data: "", flags: end_flags) + ] + + assert end_flags == set_flags(:data, [:end_stream]) + end end describe "settings" do