From f3e5f3e410b6fd2dbf8cd69a00245131c8f0ff4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 19 May 2017 20:18:00 +0200 Subject: Preliminary h2 flow control support Existing tests pass. A number of things remain to be done. Has only been tested with Gun so far. Feedback welcome! --- ebin/cowboy.app | 2 +- src/cowboy_http2.erl | 215 +++++++++++++++++++++++++++++++++++-------------- src/cowboy_iolists.erl | 71 ++++++++++++++++ src/cowboy_req.erl | 2 +- test/req_SUITE.erl | 4 +- 5 files changed, 231 insertions(+), 63 deletions(-) create mode 100644 src/cowboy_iolists.erl diff --git a/ebin/cowboy.app b/ebin/cowboy.app index a8d4a82..4b5088a 100644 --- a/ebin/cowboy.app +++ b/ebin/cowboy.app @@ -1,7 +1,7 @@ {application, 'cowboy', [ {description, "Small, fast, modern HTTP server."}, {vsn, "2.0.0-pre.8"}, - {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_loop','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']}, + {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_iolists','cowboy_loop','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']}, {registered, [cowboy_sup,cowboy_clock]}, {applications, [kernel,stdlib,crypto,cowlib,ranch]}, {mod, {cowboy_app, []}}, diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index e6e8a8b..85fdeef 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -36,11 +36,20 @@ -record(stream, { id = undefined :: cowboy_stream:streamid(), %% Stream handlers and their state. - state = undefined :: {module(), any()}, + state = undefined :: {module(), any()} | flush, %% Whether we finished sending data. - local = idle :: idle | upgrade | cowboy_stream:fin(), + local = idle :: idle | upgrade | cowboy_stream:fin() | flush, + %% Local flow control window (how much we can send). + local_window :: integer(), + %% Buffered data waiting for the flow control window to increase. + local_buffer = queue:new() :: queue:queue( + {cowboy_stream:fin(), non_neg_integer(), iolist() + | {sendfile, non_neg_integer(), pos_integer(), file:name_all()}}), + local_buffer_size = 0 :: non_neg_integer(), %% Whether we finished receiving data. remote = nofin :: cowboy_stream:fin(), + %% Remote flow control window (how much we accept to receive). + remote_window :: integer(), %% Request body length. body_length = 0 :: non_neg_integer() }). @@ -67,7 +76,7 @@ % header_table_size => 4096, % enable_push => false, %% We are the server. Push is never enabled. % max_concurrent_streams => infinity, -% initial_window_size => 65535, + initial_window_size => 65535, max_frame_size => 16384 % max_header_list_size => infinity } :: map(), @@ -75,7 +84,13 @@ %% We need to be careful there. It's well possible that we send %% two SETTINGS frames before we receive a SETTINGS ack. next_settings = #{} :: undefined | map(), %% @todo perhaps set to undefined by default - remote_settings = #{} :: map(), + remote_settings = #{ + initial_window_size => 65535 + } :: map(), + + %% Connection-wide flow control window. + local_window = 65535 :: integer(), %% How much we can send. + remote_window = 65535 :: integer(), %% How much we accept to receive. %% Stream identifiers. client_streamid = 0 :: non_neg_integer(), @@ -269,17 +284,22 @@ parse_settings_preface(State, _, _, _) -> %% and terminate the stream if this is the end of it. %% DATA frame. -frame(State=#state{streams=Streams}, {data, StreamID, IsFin0, Data}) -> +frame(State=#state{remote_window=ConnWindow, streams=Streams}, + {data, StreamID, IsFin0, Data}) -> case lists:keyfind(StreamID, #stream.id, Streams) of - Stream = #stream{state=StreamState0, remote=nofin, body_length=Len0} -> - Len = Len0 + byte_size(Data), + Stream = #stream{state=StreamState0, remote=nofin, + remote_window=StreamWindow, body_length=Len0} -> + DataLen = byte_size(Data), + Len = Len0 + DataLen, IsFin = case IsFin0 of fin -> {fin, Len}; nofin -> nofin end, try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of {Commands, StreamState} -> - commands(State, Stream#stream{state=StreamState, body_length=Len}, Commands) + commands(State#state{remote_window=ConnWindow - DataLen}, + Stream#stream{state=StreamState, remote_window=StreamWindow - DataLen, + body_length=Len}, Commands) catch Class:Reason -> error_logger:error_msg("Exception occurred in " "cowboy_stream:data(~p, ~p, ~p, ~p) with reason ~p:~p.", @@ -342,13 +362,22 @@ frame(State, {ping_ack, _Opaque}) -> frame(State, Frame={goaway, _, _, _}) -> terminate(State, {stop, Frame, 'Client is going away.'}); %% Connection-wide WINDOW_UPDATE frame. -frame(State, {window_update, _Increment}) -> - %% @todo control flow - State; +frame(State=#state{local_window=ConnWindow}, {window_update, Increment}) -> + send_data(State#state{local_window=ConnWindow + Increment}); %% Stream-specific WINDOW_UPDATE frame. -frame(State, {window_update, _StreamID, _Increment}) -> - %% @todo stream-specific control flow - State; +frame(State0=#state{streams=Streams0}, {window_update, StreamID, Increment}) -> + case lists:keyfind(StreamID, #stream.id, Streams0) of + Stream0 = #stream{local_window=StreamWindow} -> + {State, Stream} = send_data(State0, + Stream0#stream{local_window=StreamWindow + Increment}), + Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream), + State#state{streams=Streams}; + false -> + %% @todo Receiving this frame on a stream in the idle state is an error. + %% WINDOW_UPDATE frames may be received for a short period of time + %% after a stream is closed. They must be ignored. + State0 + end; %% Unexpected CONTINUATION frame. frame(State, {continuation, _, _, _}) -> terminate(State, {connection_error, protocol_error, @@ -419,11 +448,8 @@ commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeSta [{sendfile, fin, O, B, P}|Tail]); _ -> Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), - %% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE. - %% Use the length set by the server instead, if any. - %% @todo Would be better if we didn't have to convert to binary. - send_data(Socket, Transport, StreamID, fin, iolist_to_binary(Body), 16384), - commands(State#state{encode_state=EncodeState}, Stream#stream{local=fin}, Tail) + {State1, Stream1} = send_data(State, Stream#stream{local=nofin}, fin, Body), + commands(State1#state{encode_state=EncodeState}, Stream1, Tail) end; %% @todo response when local!=idle %% Send response headers and initiate chunked encoding. @@ -445,10 +471,9 @@ commands(State=#state{socket=Socket, transport=Transport, encode_state=EncodeSta %% split into multiple calls and flow control should be used to make %% sure we only send as fast as the client can receive and don't block %% anything. -commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=StreamID, local=nofin}, - [{data, IsFin, Data}|Tail]) -> - Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)), - commands(State, Stream#stream{local=IsFin}, Tail); +commands(State0, Stream0=#stream{local=nofin}, [{data, IsFin, Data}|Tail]) -> + {State, Stream} = send_data(State0, Stream0, IsFin, Data), + commands(State, Stream, Tail); %% @todo data when local!=nofin @@ -463,16 +488,10 @@ commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=Str %% to ensure the file is sent in chunks (which would require a better %% flow control at the stream handler level). One thing for sure, the %% implementation necessarily varies between HTTP/1.1 and HTTP/2. -commands(State=#state{socket=Socket, transport=Transport}, Stream=#stream{id=StreamID, local=nofin}, +commands(State0, Stream0=#stream{local=nofin}, [{sendfile, IsFin, Offset, Bytes, Path}|Tail]) -> - %% @todo We currently have a naive implementation without a - %% scheduler to prioritize frames that need to be sent. - %% A future update will need to queue such data frames - %% and only send them when there is nothing currently - %% being sent. We would probably also benefit from doing - %% asynchronous sends. - sendfile(Socket, Transport, StreamID, IsFin, Offset, Bytes, Path, 16384), - commands(State, Stream#stream{local=IsFin}, Tail); + {State, Stream} = send_data(State0, Stream0, IsFin, {sendfile, Offset, Bytes, Path}), + commands(State, Stream, Tail); %% @todo sendfile when local!=nofin %% Send a push promise. %% @@ -500,9 +519,15 @@ commands(State0=#state{socket=Socket, transport=Transport, server_streamid=Promi State = stream_init(State0#state{server_streamid=PromisedStreamID + 2, encode_state=EncodeState}, PromisedStreamID, fin, iolist_to_binary(HeaderBlock)), commands(State, Stream, Tail); -%% @todo Update the flow control state. -commands(State, Stream, [{flow, _Size}|Tail]) -> - commands(State, Stream, Tail); +commands(State=#state{socket=Socket, transport=Transport, remote_window=ConnWindow}, + Stream=#stream{id=StreamID, remote_window=StreamWindow}, + [{flow, Size}|Tail]) -> + Transport:send(Socket, [ + cow_http2:window_update(Size), + cow_http2:window_update(StreamID, Size) + ]), + commands(State#state{remote_window=ConnWindow + Size}, + Stream#stream{remote_window=StreamWindow + Size}, Tail); %% Supervise a child process. commands(State=#state{children=Children}, Stream=#stream{id=StreamID}, [{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown @@ -536,30 +561,89 @@ status(Status) when is_integer(Status) -> status(<< H, T, U, _/bits >>) when H >= $1, H =< $9, T >= $0, T =< $9, U >= $0, U =< $9 -> << H, T, U >>. -%% This same function is found in gun_http2. -send_data(Socket, Transport, StreamID, IsFin, Data, Length) -> - if - Length < byte_size(Data) -> - << Payload:Length/binary, Rest/bits >> = Data, - Transport:send(Socket, cow_http2:data(StreamID, nofin, Payload)), - send_data(Socket, Transport, StreamID, IsFin, Rest, Length); - true -> - Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)) - end. -%% @todo This is currently awfully slow. But at least it's correct. -sendfile(Socket, Transport, StreamID, IsFin, Offset, Bytes, Path, Length) -> - if - Length < Bytes -> - Transport:send(Socket, cow_http2:data_header(StreamID, nofin, Length)), - Transport:sendfile(Socket, Path, Offset, Length), - sendfile(Socket, Transport, StreamID, IsFin, - Offset + Length, Bytes - Length, Path, Length); - true -> + + +%% @todo Should we ever want to implement the PRIORITY mechanism, +%% this would be the place to do it. Right now, we just go over +%% all streams and send what we can until either everything is +%% sent or we run out of space in the window. +send_data(State=#state{streams=Streams}) -> + resume_streams(State, Streams, []). + +%% @todo When streams terminate we need to remove the stream. +resume_streams(State, [], Acc) -> + State#state{streams=lists:reverse(Acc)}; +%% While technically we should never get < 0 here, let's be on the safe side. +resume_streams(State=#state{local_window=ConnWindow}, Streams, Acc) + when ConnWindow =< 0 -> + State#state{streams=lists:reverse(Acc, Streams)}; +%% We rely on send_data/2 to do all the necessary checks about the stream. +resume_streams(State0, [Stream0|Tail], Acc) -> + {State, Stream} = send_data(State0, Stream0), + resume_streams(State, Tail, [Stream|Acc]). + +%% @todo We might want to print an error if local=fin. +%% +%% @todo It's possible that the stream terminates. We must remove it. +send_data(State=#state{local_window=ConnWindow}, + Stream=#stream{local=IsFin, local_window=StreamWindow, local_buffer_size=BufferSize}) + when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 -> + {State, Stream}; +send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize}) -> + %% We know there is an item in the queue. + {{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0), + {State, Stream} = send_data(State0, + Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize}, + IsFin, Data), + send_data(State, Stream). + +%% Send data immediately if we can, buffer otherwise. +%% @todo We might want to print an error if local=fin. +send_data(State=#state{local_window=ConnWindow}, + Stream=#stream{local_window=StreamWindow}, IsFin, Data) + when ConnWindow =< 0; StreamWindow =< 0 -> + {State, queue_data(Stream, IsFin, Data)}; +send_data(State=#state{socket=Socket, transport=Transport, local_window=ConnWindow}, + Stream=#stream{id=StreamID, local_window=StreamWindow}, IsFin, Data) -> + MaxFrameSize = 16384, %% @todo Use the real SETTINGS_MAX_FRAME_SIZE set by the client. + SendSize = min(min(ConnWindow, StreamWindow), MaxFrameSize), + case Data of + {sendfile, Offset, Bytes, Path} when Bytes =< SendSize -> Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)), - Transport:sendfile(Socket, Path, Offset, Bytes) + Transport:sendfile(Socket, Path, Offset, Bytes), + {State#state{local_window=ConnWindow - SendSize}, + Stream#stream{local=IsFin, local_window=StreamWindow - SendSize}}; + {sendfile, Offset, Bytes, Path} -> + Transport:send(Socket, cow_http2:data_header(StreamID, nofin, SendSize)), + Transport:sendfile(Socket, Path, Offset, SendSize), + send_data(State#state{local_window=ConnWindow - SendSize}, + Stream#stream{local_window=StreamWindow - SendSize}, + IsFin, {sendfile, Offset + SendSize, Bytes - SendSize, Path}); + Iolist0 -> + IolistSize = iolist_size(Iolist0), + if + IolistSize =< SendSize -> + Transport:send(Socket, cow_http2:data(StreamID, IsFin, Iolist0)), + {State#state{local_window=ConnWindow - SendSize}, + Stream#stream{local=IsFin, local_window=StreamWindow - SendSize}}; + true -> + {Iolist, More} = cowboy_iolists:split(SendSize, Iolist0), + Transport:send(Socket, cow_http2:data(StreamID, nofin, Iolist)), + send_data(State#state{local_window=ConnWindow - SendSize}, + Stream#stream{local_window=StreamWindow - SendSize}, + IsFin, More) + end end. +queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data) -> + DataSize = case Data of + {sendfile, _, Bytes, _} -> Bytes; + Iolist -> iolist_size(Iolist) + end, + Q = queue:in({IsFin, DataSize, Data}, Q0), + Stream#stream{local_buffer=Q, local_buffer_size=Size0 + DataSize}. + -spec terminate(#state{}, _) -> no_return(). terminate(undefined, Reason) -> exit({shutdown, Reason}); @@ -641,12 +725,17 @@ stream_init(State0=#state{ref=Ref, socket=Socket, transport=Transport, peer=Peer 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) end. -stream_handler_init(State=#state{opts=Opts}, StreamID, RemoteIsFin, LocalIsFin, Req) -> +stream_handler_init(State=#state{opts=Opts, + local_settings=#{initial_window_size := RemoteWindow}, + remote_settings=#{initial_window_size := LocalWindow}}, + StreamID, RemoteIsFin, LocalIsFin, Req) -> try cowboy_stream:init(StreamID, Req, Opts) of {Commands, StreamState} -> commands(State#state{client_streamid=StreamID}, #stream{id=StreamID, state=StreamState, - remote=RemoteIsFin, local=LocalIsFin}, Commands) + remote=RemoteIsFin, local=LocalIsFin, + local_window=LocalWindow, remote_window=RemoteWindow}, + Commands) catch Class:Reason -> error_logger:error_msg("Exception occurred in " "cowboy_stream:init(~p, ~p, ~p) with reason ~p:~p.", @@ -676,12 +765,20 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, Children = stream_terminate_children(Children0, StreamID, []), State1#state{streams=Streams, children=Children}; %% When a response was sent but not terminated, we need to close the stream. - {value, #stream{state=StreamState, local=nofin}, Streams} when Reason =:= normal -> + {value, #stream{state=StreamState, local=nofin, local_buffer_size=0}, Streams} + when Reason =:= normal -> Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)), stream_call_terminate(StreamID, Reason, StreamState), Children = stream_terminate_children(Children0, StreamID, []), State#state{streams=Streams, children=Children}; - %% Otherwise we sent an RST_STREAM and the stream is already closed. + %% Unless there is still data in the buffer. We can however reset + %% a few fields and set a special local state to avoid confusion. + {value, Stream=#stream{state=StreamState, local=nofin}, Streams} -> + stream_call_terminate(StreamID, Reason, StreamState), + Children = stream_terminate_children(Children0, StreamID, []), + State#state{streams=[Stream#stream{state=flush, local=flush}|Streams], + children=Children}; + %% Otherwise we sent an RST_STREAM and/or the stream is already closed. {value, #stream{state=StreamState}, Streams} -> stream_call_terminate(StreamID, Reason, StreamState), Children = stream_terminate_children(Children0, StreamID, []), diff --git a/src/cowboy_iolists.erl b/src/cowboy_iolists.erl new file mode 100644 index 0000000..51e0f33 --- /dev/null +++ b/src/cowboy_iolists.erl @@ -0,0 +1,71 @@ +%% Copyright (c) 2017, Loïc Hoguin +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(cowboy_iolists). + +-export([split/2]). + +-spec split(non_neg_integer(), iodata()) -> {iodata(), iodata()}. +split(N, Iolist) -> + case split(N, Iolist, []) of + {ok, Before, After} -> + {Before, After}; + {more, _, Before} -> + {lists:reverse(Before), <<>>} + end. + +split(0, Rest, Acc) -> + {ok, lists:reverse(Acc), Rest}; +split(N, [], Acc) -> + {more, N, Acc}; +split(N, Binary, Acc) when byte_size(Binary) =< N -> + {ok, lists:reverse([Binary|Acc]), <<>>}; +split(N, Binary, Acc) when is_binary(Binary) -> + << Before:N/binary, After/bits >> = Binary, + {ok, lists:reverse([Before|Acc]), After}; +split(N, [Binary|Tail], Acc) when byte_size(Binary) =< N -> + split(N - byte_size(Binary), Tail, [Binary|Acc]); +split(N, [Binary|Tail], Acc) when is_binary(Binary) -> + << Before:N/binary, After/bits >> = Binary, + {ok, lists:reverse([Before|Acc]), [After|Tail]}; +split(N, [Char|Tail], Acc) when is_integer(Char) -> + split(N - 1, Tail, [Char|Acc]); +split(N, [List|Tail], Acc0) -> + case split(N, List, Acc0) of + {ok, Before, After} -> + {ok, Before, [After|Tail]}; + {more, More, Acc} -> + split(More, Tail, Acc) + end. + +-ifdef(TEST). + +split_test_() -> + Tests = [ + {10, "Hello world!", "Hello worl", "d!"}, + {10, <<"Hello world!">>, "Hello worl", "d!"}, + {10, ["He", [<<"llo">>], $\s, [["world"], <<"!">>]], "Hello worl", "d!"}, + {10, ["Hello "|<<"world!">>], "Hello worl", "d!"}, + {10, "Hello!", "Hello!", ""}, + {10, <<"Hello!">>, "Hello!", ""}, + {10, ["He", [<<"ll">>], $o, [["!"]]], "Hello!", ""}, + {10, ["Hel"|<<"lo!">>], "Hello!", ""} + ], + [{iolist_to_binary(V), fun() -> + {B, A} = split(N, V), + true = iolist_to_binary(RB) =:= iolist_to_binary(B), + true = iolist_to_binary(RA) =:= iolist_to_binary(A) + end} || {N, V, RB, RA} <- Tests]. + +-endif. diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 9417ad1..ffc6e12 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -88,7 +88,7 @@ -export_type([cookie_opts/0]). -type read_body_opts() :: #{ - length => non_neg_integer() | infinity, + length => non_neg_integer(), period => non_neg_integer(), timeout => timeout() }. diff --git a/test/req_SUITE.erl b/test/req_SUITE.erl index fe1b826..a2c040e 100644 --- a/test/req_SUITE.erl +++ b/test/req_SUITE.erl @@ -49,10 +49,10 @@ init_dispatch(Config) -> {"/resp/:key[/:arg]", resp_h, []}, {"/multipart[/:key]", multipart_h, []}, {"/args/:key/:arg[/:default]", echo_h, []}, - {"/crash/:key/period", echo_h, #{length => infinity, period => 1000, crash => true}}, + {"/crash/:key/period", echo_h, #{length => 999999999, period => 1000, crash => true}}, {"/no-opts/:key", echo_h, #{crash => true}}, {"/opts/:key/length", echo_h, #{length => 1000}}, - {"/opts/:key/period", echo_h, #{length => infinity, period => 1000}}, + {"/opts/:key/period", echo_h, #{length => 999999999, period => 1000}}, {"/opts/:key/timeout", echo_h, #{timeout => 1000, crash => true}}, {"/full/:key", echo_h, []}, {"/no/:key", echo_h, []}, -- cgit v1.2.3