aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2017-05-19 18:13:30 +0200
committerLoïc Hoguin <[email protected]>2017-05-19 18:13:30 +0200
commit9bba03430c69eb756079ea925995e5a94a52776a (patch)
tree5900a7d8b160b705bd4e1c9955f12152792df32d
parent8e623296f4fa735e430cedeb57d816dfcca7883b (diff)
downloadgun-9bba03430c69eb756079ea925995e5a94a52776a.tar.gz
gun-9bba03430c69eb756079ea925995e5a94a52776a.tar.bz2
gun-9bba03430c69eb756079ea925995e5a94a52776a.zip
Add partial support for h2 flow control
Specifically we send WINDOW_UPDATE frames in order to receive more data, but we do not respect the flow control window when we are sending request bodies.
-rw-r--r--src/gun_http2.erl100
1 files changed, 79 insertions, 21 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 2195037..4e108ae 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -32,8 +32,12 @@
reply_to :: pid(),
%% Whether we finished sending data.
local = nofin :: cowboy_stream:fin(),
+ %% Local flow control window (how much we can send).
+ local_window :: integer(),
%% Whether we finished receiving data.
remote = nofin :: cowboy_stream:fin(),
+ %% Remote flow control window (how much we accept to receive).
+ remote_window :: integer(),
%% Content handlers state.
handler_state :: undefined | gun_content_handler:state()
}).
@@ -45,7 +49,17 @@
content_handlers :: gun_content_handler:opt(),
buffer = <<>> :: binary(),
- %% @todo local_settings, next_settings, remote_settings
+ local_settings = #{
+ initial_window_size => 65535,
+ max_frame_size => 16384
+ } :: map(),
+ remote_settings = #{
+ initial_window_size => 65535
+ } :: map(),
+
+ %% Connection-wide flow control window.
+ local_window = 65535 :: integer(), %% How much we can send.
+ remote_window = 65535 :: integer(), %% How much we accept to receive.
streams = [] :: [#stream{}],
stream_id = 1 :: non_neg_integer(),
@@ -73,14 +87,16 @@ do_check_options([Opt|_]) ->
name() -> http2.
init(Owner, Socket, Transport, Opts) ->
+ Handlers = maps:get(content_handlers, Opts, [gun_data]),
+ State = #http2_state{owner=Owner, socket=Socket,
+ transport=Transport, content_handlers=Handlers},
+ #http2_state{local_settings=Settings} = State,
%% Send the HTTP/2 preface.
Transport:send(Socket, [
<< "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>,
- cow_http2:settings(#{}) %% @todo Settings.
+ cow_http2:settings(Settings)
]),
- Handlers = maps:get(content_handlers, Opts, [gun_data]),
- #http2_state{owner=Owner, socket=Socket, transport=Transport,
- content_handlers=Handlers}.
+ State.
handle(Data, State=#http2_state{buffer=Buffer}) ->
parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}).
@@ -100,14 +116,18 @@ parse(Data0, State=#http2_state{buffer=Buffer}) ->
end.
%% DATA frame.
-frame({data, StreamID, IsFin, Data}, State) ->
- case get_stream_by_id(StreamID, State) of
- Stream = #stream{remote=nofin, handler_state=Handlers0} ->
+frame({data, StreamID, IsFin, Data}, State0=#http2_state{remote_window=ConnWindow}) ->
+ case get_stream_by_id(StreamID, State0) of
+ Stream0 = #stream{remote=nofin, remote_window=StreamWindow, handler_state=Handlers0} ->
Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
- remote_fin(Stream#stream{handler_state=Handlers}, State, IsFin);
+ {Stream, State} = send_window_update(
+ Stream0#stream{remote_window=StreamWindow - byte_size(Data),
+ handler_state=Handlers},
+ State0#http2_state{remote_window=ConnWindow - byte_size(Data)}),
+ remote_fin(Stream, State, IsFin);
_ ->
%% @todo protocol_error if not existing
- stream_reset(State, StreamID, {stream_error, stream_closed,
+ stream_reset(State0, StreamID, {stream_error, stream_closed,
'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'})
end;
%% Single HEADERS frame headers block.
@@ -162,10 +182,10 @@ frame({headers, StreamID, IsFin, head_fin, HeaderBlock},
frame({rst_stream, StreamID, Reason}, State) ->
stream_reset(State, StreamID, {stream_error, Reason, 'Stream reset by server.'});
%% SETTINGS frame.
-frame({settings, _Settings}, State=#http2_state{socket=Socket, transport=Transport}) ->
- %% @todo Apply SETTINGS.
+frame({settings, Settings}, State=#http2_state{socket=Socket, transport=Transport,
+ remote_settings=Settings0}) ->
Transport:send(Socket, cow_http2:settings_ack()),
- State;
+ State#http2_state{remote_settings=maps:merge(Settings0, Settings)};
%% Ack for a previously sent SETTINGS frame.
frame(settings_ack, State) -> %% @todo =#http2_state{next_settings=_NextSettings}) ->
%% @todo Apply SETTINGS that require synchronization.
@@ -219,18 +239,49 @@ frame({ping_ack, _Opaque}, State) ->
frame(Frame={goaway, StreamID, _, _}, State) ->
terminate(State, StreamID, {stop, Frame, 'Client is going away.'});
%% Connection-wide WINDOW_UPDATE frame.
-frame({window_update, _Increment}, State) ->
- %% @todo control flow
- State;
+frame({window_update, Increment}, State=#http2_state{local_window=ConnWindow}) ->
+ send_data(State#http2_state{local_window=ConnWindow + Increment});
%% Stream-specific WINDOW_UPDATE frame.
-frame({window_update, _StreamID, _Increment}, State) ->
- %% @todo stream-specific control flow
- State;
+frame({window_update, StreamID, Increment}, State0=#http2_state{streams=Streams0}) ->
+ case lists:keyfind(StreamID, #stream.id, Streams0) of
+ Stream0 = #stream{local_window=StreamWindow} ->
+ {State, Stream} = send_data(State0,
+ Stream0#stream{local_window=StreamWindow + Increment}),
+ Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream),
+ State#http2_state{streams=Streams};
+ false ->
+ %% @todo Receiving this frame on a stream in the idle state is an error.
+ %% WINDOW_UPDATE frames may be received for a short period of time
+ %% after a stream is closed. They must be ignored.
+ State0
+ end;
%% Unexpected CONTINUATION frame.
frame({continuation, StreamID, _, _}, State) ->
terminate(State, StreamID, {connection_error, protocol_error,
'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}).
+send_window_update(Stream=#stream{id=StreamID, remote_window=StreamWindow0},
+ State=#http2_state{socket=Socket, transport=Transport, remote_window=ConnWindow0}) ->
+ %% @todo We should make the windows configurable.
+ MinConnWindow = 8000000,
+ MinStreamWindow = 1000000,
+ ConnWindow = if
+ ConnWindow0 =< MinConnWindow ->
+ Transport:send(Socket, cow_http2:window_update(MinConnWindow)),
+ ConnWindow0 + MinConnWindow;
+ true ->
+ ConnWindow0
+ end,
+ StreamWindow = if
+ StreamWindow0 =< MinStreamWindow ->
+ Transport:send(Socket, cow_http2:window_update(StreamID, MinStreamWindow)),
+ StreamWindow0 + MinStreamWindow;
+ true ->
+ StreamWindow0
+ end,
+ {Stream#stream{remote_window=StreamWindow},
+ State#http2_state{remote_window=ConnWindow}}.
+
parse_status(Status) ->
<< Code:3/binary, _/bits >> = Status,
list_to_integer(binary_to_list(Code)).
@@ -314,6 +365,9 @@ data(State=#http2_state{socket=Socket, transport=Transport},
error_stream_not_found(State, StreamRef, ReplyTo)
end.
+send_data(State) -> State.
+send_data(State, Stream) -> {State, Stream}.
+
%% This same function is found in cowboy_http2.
send_data(Socket, Transport, StreamID, IsFin, Data, Length) ->
if
@@ -386,8 +440,12 @@ error_stream_not_found(State, StreamRef, ReplyTo) ->
%% @todo probably change order of args and have state first?
new_stream(StreamID, StreamRef, ReplyTo, Remote, Local,
- State=#http2_state{streams=Streams}) ->
- New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, remote=Remote, local=Local},
+ State=#http2_state{streams=Streams,
+ local_settings=#{initial_window_size := RemoteWindow},
+ remote_settings=#{initial_window_size := LocalWindow}}) ->
+ New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
+ remote=Remote, remote_window=RemoteWindow,
+ local=Local, local_window=LocalWindow},
State#http2_state{streams=[New|Streams]}.
get_stream_by_id(StreamID, #http2_state{streams=Streams}) ->