aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/gun_http2.erl205
1 files changed, 152 insertions, 53 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index cc57f65..28b64eb 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -34,6 +34,11 @@
local = nofin :: cowboy_stream:fin(),
%% Local flow control window (how much we can send).
local_window :: integer(),
+ %% Buffered data waiting for the flow control window to increase.
+ local_buffer = queue:new() :: queue:queue(
+ {fin | nofin, non_neg_integer(), iolist()}),
+ local_buffer_size = 0 :: non_neg_integer(),
+ local_trailers = undefined :: undefined | cowboy:http_headers(),
%% Whether we finished receiving data.
remote = nofin :: cowboy_stream:fin(),
%% Remote flow control window (how much we accept to receive).
@@ -46,6 +51,7 @@
owner :: pid(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
+ opts = #{} :: map(), %% @todo
content_handlers :: gun_content_handler:opt(),
buffer = <<>> :: binary(),
@@ -91,7 +97,7 @@ name() -> http2.
init(Owner, Socket, Transport, Opts) ->
Handlers = maps:get(content_handlers, Opts, [gun_data]),
State = #http2_state{owner=Owner, socket=Socket,
- transport=Transport, content_handlers=Handlers},
+ transport=Transport, opts=Opts, content_handlers=Handlers},
#http2_state{local_settings=Settings} = State,
%% Send the HTTP/2 preface.
Transport:send(Socket, [
@@ -211,7 +217,7 @@ frame(settings_ack, State) -> %% @todo =#http2_state{next_settings=_NextSettings
%% PUSH_PROMISE frame.
%% @todo Continuation.
frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock},
- State=#http2_state{decode_state=DecodeState0}) ->
+ State=#http2_state{streams=Streams, decode_state=DecodeState0}) ->
case get_stream_by_id(PromisedStreamID, State) of
false ->
case get_stream_by_id(StreamID, State) of
@@ -231,8 +237,9 @@ frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock},
NewStreamRef = make_ref(),
ReplyTo ! {gun_push, self(), StreamRef, NewStreamRef, Method,
iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers},
- new_stream(PromisedStreamID, NewStreamRef, ReplyTo, nofin, fin,
- State#http2_state{decode_state=DecodeState})
+ NewStream = new_stream(PromisedStreamID, NewStreamRef, ReplyTo,
+ nofin, fin, State),
+ State#http2_state{streams=[NewStream|Streams], decode_state=DecodeState}
catch _:_ ->
terminate(State, {connection_error, compression_error,
'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
@@ -319,7 +326,8 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
State.
request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0,
- stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) ->
+ streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo,
+ Method, Host, Port, Path, Headers) ->
{HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers),
IsFin = case (false =/= lists:keyfind(<<"content-type">>, 1, Headers))
orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers)) of
@@ -327,22 +335,20 @@ request(State=#http2_state{socket=Socket, transport=Transport, encode_state=Enco
false -> fin
end,
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
- new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin,
- State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}).
+ Stream = new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin, State),
+ State#http2_state{streams=[Stream|Streams], stream_id=StreamID + 2, encode_state=EncodeState}.
%% @todo Handle Body > 16MB. (split it out into many frames)
-request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0,
- stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body) ->
+request(State0=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0,
+ streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo,
+ Method, Host, Port, Path, Headers0, Body) ->
Headers = lists:keystore(<<"content-length">>, 1, Headers0,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
{HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers),
Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)),
- %% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE.
- %% Use the length set by the server instead, if any.
- %% @todo Would be better if we didn't have to convert to binary.
- send_data(Socket, Transport, StreamID, fin, iolist_to_binary(Body), 16384),
- new_stream(StreamID, StreamRef, ReplyTo, nofin, fin,
- State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}).
+ Stream0 = new_stream(StreamID, StreamRef, ReplyTo, nofin, nofin, State0),
+ {State, Stream} = send_data(State0, Stream0, fin, Body),
+ State#http2_state{streams=[Stream|Streams], stream_id=StreamID + 2, encode_state=EncodeState}.
prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) ->
Authority = case lists:keyfind(<<"host">>, 1, Headers0) of
@@ -368,35 +374,133 @@ prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) ->
|Headers1],
cow_hpack:encode(Headers, EncodeState).
-data(State=#http2_state{socket=Socket, transport=Transport},
- StreamRef, ReplyTo, IsFin, Data) ->
- case get_stream_by_ref(StreamRef, State) of
+data(State0, StreamRef, ReplyTo, IsFin, Data) ->
+ case get_stream_by_ref(StreamRef, State0) of
#stream{local=fin} ->
- error_stream_closed(State, StreamRef, ReplyTo);
- S = #stream{} ->
- %% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE.
- %% Use the length set by the server instead, if any.
- %% @todo Would be better if we didn't have to convert to binary.
- send_data(Socket, Transport, S#stream.id, IsFin, iolist_to_binary(Data), 16384),
- local_fin(S, State, IsFin);
+ error_stream_closed(State0, StreamRef, ReplyTo);
+ Stream0 = #stream{} ->
+ {State, Stream} = send_data(State0, Stream0, IsFin, Data),
+ maybe_delete_stream(State, Stream);
false ->
- error_stream_not_found(State, StreamRef, ReplyTo)
+ error_stream_not_found(State0, StreamRef, ReplyTo)
end.
-send_data(State) -> State.
-send_data(State, Stream) -> {State, Stream}.
-
-%% This same function is found in cowboy_http2.
-send_data(Socket, Transport, StreamID, IsFin, Data, Length) ->
- if
- Length < byte_size(Data) ->
- << Payload:Length/binary, Rest/bits >> = Data,
- Transport:send(Socket, cow_http2:data(StreamID, nofin, Payload)),
- send_data(Socket, Transport, StreamID, IsFin, Rest, Length);
- true ->
- Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data))
+%% @todo Should we ever want to implement the PRIORITY mechanism,
+%% this would be the place to do it. Right now, we just go over
+%% all streams and send what we can until either everything is
+%% sent or we run out of space in the window.
+send_data(State=#http2_state{streams=Streams}) ->
+ resume_streams(State, Streams, []).
+
+%% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update
+%% the local stream windows for all active streams and perhaps
+%% resume sending data.
+%update_streams_local_window(State=#http2_state{streams=Streams0}, Increment) ->
+% Streams = [
+% S#stream{local_window=StreamWindow + Increment}
+% || S=#stream{local_window=StreamWindow} <- Streams0],
+% resume_streams(State, Streams, []).
+
+%% When we receive an ack to a SETTINGS frame we sent we need to update
+%% the remote stream windows for all active streams.
+%update_streams_remote_window(State=#http2_state{streams=Streams0}, Increment) ->
+% Streams = [
+% S#stream{remote_window=StreamWindow + Increment}
+% || S=#stream{remote_window=StreamWindow} <- Streams0],
+% State#http2_state{streams=Streams}.
+
+resume_streams(State, [], Acc) ->
+ State#http2_state{streams=lists:reverse(Acc)};
+%% While technically we should never get < 0 here, let's be on the safe side.
+resume_streams(State=#http2_state{local_window=ConnWindow}, Streams, Acc)
+ when ConnWindow =< 0 ->
+ State#http2_state{streams=lists:reverse(Acc, Streams)};
+%% We rely on send_data/2 to do all the necessary checks about the stream.
+resume_streams(State0, [Stream0|Tail], Acc) ->
+ {State1, Stream} = send_data(State0, Stream0),
+ resume_streams(State1, Tail, [Stream|Acc]).
+
+send_data(State, Stream=#stream{local=Local, local_buffer_size=0, local_trailers=Trailers})
+ when (Trailers =/= undefined) andalso ((Local =:= idle) orelse (Local =:= nofin)) ->
+ send_trailers(State, Stream#stream{local_trailers=undefined}, Trailers);
+%% @todo It's possible that the stream terminates. We must remove it.
+send_data(State=#http2_state{local_window=ConnWindow},
+ Stream=#stream{local=IsFin, local_window=StreamWindow, local_buffer_size=BufferSize})
+ when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 ->
+ {State, Stream};
+send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize}) ->
+ %% We know there is an item in the queue.
+ {{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0),
+ {State, Stream} = send_data(State0,
+ Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize},
+ IsFin, Data, in_r),
+ send_data(State, Stream).
+
+send_data(State, Stream, IsFin, Data) ->
+ send_data(State, Stream, IsFin, Data, in).
+
+%% We can send trailers immediately if the queue is empty, otherwise we queue.
+%% We always send trailer frames even if the window is empty.
+send_data(State, Stream=#stream{local_buffer_size=0}, fin, {trailers, Trailers}, _) ->
+ send_trailers(State, Stream, Trailers);
+send_data(State, Stream, fin, {trailers, Trailers}, _) ->
+ {State, Stream#stream{local_trailers=Trailers}};
+%% Send data immediately if we can, buffer otherwise.
+send_data(State=#http2_state{local_window=ConnWindow},
+ Stream=#stream{local_window=StreamWindow}, IsFin, Data, In)
+ when ConnWindow =< 0; StreamWindow =< 0 ->
+ {State, queue_data(Stream, IsFin, Data, In)};
+send_data(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
+ remote_settings=RemoteSettings, local_window=ConnWindow},
+ Stream=#stream{id=StreamID, local_window=StreamWindow}, IsFin, Data, In) ->
+ RemoteMaxFrameSize = maps:get(max_frame_size, RemoteSettings, 16384),
+ ConfiguredMaxFrameSize = maps:get(max_frame_size_sent, Opts, infinity),
+ MaxSendSize = min(
+ min(ConnWindow, StreamWindow),
+ min(RemoteMaxFrameSize, ConfiguredMaxFrameSize)
+ ),
+ case Data of
+ {sendfile, Offset, Bytes, Path} when Bytes =< MaxSendSize ->
+ Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)),
+ Transport:sendfile(Socket, Path, Offset, Bytes),
+ {State#http2_state{local_window=ConnWindow - Bytes},
+ Stream#stream{local=IsFin, local_window=StreamWindow - Bytes}};
+ {sendfile, Offset, Bytes, Path} ->
+ Transport:send(Socket, cow_http2:data_header(StreamID, nofin, MaxSendSize)),
+ Transport:sendfile(Socket, Path, Offset, MaxSendSize),
+ send_data(State#http2_state{local_window=ConnWindow - MaxSendSize},
+ Stream#stream{local_window=StreamWindow - MaxSendSize},
+ IsFin, {sendfile, Offset + MaxSendSize, Bytes - MaxSendSize, Path}, In);
+ Iolist0 ->
+ IolistSize = iolist_size(Iolist0),
+ if
+ IolistSize =< MaxSendSize ->
+ Transport:send(Socket, cow_http2:data(StreamID, IsFin, Iolist0)),
+ {State#http2_state{local_window=ConnWindow - IolistSize},
+ Stream#stream{local=IsFin, local_window=StreamWindow - IolistSize}};
+ true ->
+ {Iolist, More} = cow_iolists:split(MaxSendSize, Iolist0),
+ Transport:send(Socket, cow_http2:data(StreamID, nofin, Iolist)),
+ send_data(State#http2_state{local_window=ConnWindow - MaxSendSize},
+ Stream#stream{local_window=StreamWindow - MaxSendSize},
+ IsFin, More, In)
+ end
end.
+send_trailers(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0},
+ Stream=#stream{id=StreamID}, Trailers) ->
+ {HeaderBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0),
+ Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
+ {State#http2_state{encode_state=EncodeState}, Stream#stream{local=fin}}.
+
+queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) ->
+ DataSize = case Data of
+ {sendfile, _, Bytes, _} -> Bytes;
+ Iolist -> iolist_size(Iolist)
+ end,
+ Q = queue:In({IsFin, DataSize, Data}, Q0),
+ Stream#stream{local_buffer=Q, local_buffer_size=Size0 + DataSize}.
+
cancel(State=#http2_state{socket=Socket, transport=Transport},
StreamRef, ReplyTo) ->
case get_stream_by_ref(StreamRef, State) of
@@ -457,14 +561,12 @@ error_stream_not_found(State, StreamRef, ReplyTo) ->
%% Streams.
%% @todo probably change order of args and have state first?
-new_stream(StreamID, StreamRef, ReplyTo, Remote, Local,
- State=#http2_state{streams=Streams,
- local_settings=#{initial_window_size := RemoteWindow},
- remote_settings=#{initial_window_size := LocalWindow}}) ->
- New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
+new_stream(StreamID, StreamRef, ReplyTo, Remote, Local, #http2_state{
+ local_settings=#{initial_window_size := RemoteWindow},
+ remote_settings=#{initial_window_size := LocalWindow}}) ->
+ #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
remote=Remote, remote_window=RemoteWindow,
- local=Local, local_window=LocalWindow},
- State#http2_state{streams=[New|Streams]}.
+ local=Local, local_window=LocalWindow}.
get_stream_by_id(StreamID, #http2_state{streams=Streams}) ->
lists:keyfind(StreamID, #stream.id, Streams).
@@ -485,11 +587,8 @@ remote_fin(S, State=#http2_state{streams=Streams}, IsFin) ->
S#stream{remote=IsFin}),
State#http2_state{streams=Streams2}.
-local_fin(_, State, nofin) ->
- State;
-local_fin(S=#stream{remote=fin}, State, fin) ->
- delete_stream(S#stream.id, State);
-local_fin(S, State=#http2_state{streams=Streams}, IsFin) ->
- Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams,
- S#stream{local=IsFin}),
- State#http2_state{streams=Streams2}.
+maybe_delete_stream(State, Stream=#stream{local=fin, remote=fin}) ->
+ delete_stream(Stream#stream.id, State);
+maybe_delete_stream(State=#http2_state{streams=Streams}, Stream) ->
+ State#http2_state{streams=
+ lists:keyreplace(Stream#stream.id, #stream.id, Streams, Stream)}.