From 6d5710c509548dfe0965d3c279b119978a3fc19d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 7 May 2018 18:11:11 +0200 Subject: Import the HTTP/2 send_data function from Cowboy Few changes were required so that's pretty good. It will be split off in a separate common module at a later time. --- src/gun_http2.erl | 205 ++++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 152 insertions(+), 53 deletions(-) diff --git a/src/gun_http2.erl b/src/gun_http2.erl index cc57f65..28b64eb 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -34,6 +34,11 @@ local = nofin :: cowboy_stream:fin(), %% Local flow control window (how much we can send). local_window :: integer(), + %% Buffered data waiting for the flow control window to increase. + local_buffer = queue:new() :: queue:queue( + {fin | nofin, non_neg_integer(), iolist()}), + local_buffer_size = 0 :: non_neg_integer(), + local_trailers = undefined :: undefined | cowboy:http_headers(), %% Whether we finished receiving data. remote = nofin :: cowboy_stream:fin(), %% Remote flow control window (how much we accept to receive). @@ -46,6 +51,7 @@ owner :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), + opts = #{} :: map(), %% @todo content_handlers :: gun_content_handler:opt(), buffer = <<>> :: binary(), @@ -91,7 +97,7 @@ name() -> http2. init(Owner, Socket, Transport, Opts) -> Handlers = maps:get(content_handlers, Opts, [gun_data]), State = #http2_state{owner=Owner, socket=Socket, - transport=Transport, content_handlers=Handlers}, + transport=Transport, opts=Opts, content_handlers=Handlers}, #http2_state{local_settings=Settings} = State, %% Send the HTTP/2 preface. Transport:send(Socket, [ @@ -211,7 +217,7 @@ frame(settings_ack, State) -> %% @todo =#http2_state{next_settings=_NextSettings %% PUSH_PROMISE frame. %% @todo Continuation. frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock}, - State=#http2_state{decode_state=DecodeState0}) -> + State=#http2_state{streams=Streams, decode_state=DecodeState0}) -> case get_stream_by_id(PromisedStreamID, State) of false -> case get_stream_by_id(StreamID, State) of @@ -231,8 +237,9 @@ frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock}, NewStreamRef = make_ref(), ReplyTo ! {gun_push, self(), StreamRef, NewStreamRef, Method, iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers}, - new_stream(PromisedStreamID, NewStreamRef, ReplyTo, nofin, fin, - State#http2_state{decode_state=DecodeState}) + NewStream = new_stream(PromisedStreamID, NewStreamRef, ReplyTo, + nofin, fin, State), + State#http2_state{streams=[NewStream|Streams], decode_state=DecodeState} catch _:_ -> terminate(State, {connection_error, compression_error, 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) @@ -319,7 +326,8 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> State. request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, - stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) -> + streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo, + Method, Host, Port, Path, Headers) -> {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers), IsFin = case (false =/= lists:keyfind(<<"content-type">>, 1, Headers)) orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers)) of @@ -327,22 +335,20 @@ request(State=#http2_state{socket=Socket, transport=Transport, encode_state=Enco false -> fin end, Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin, - State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}). + Stream = new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin, State), + State#http2_state{streams=[Stream|Streams], stream_id=StreamID + 2, encode_state=EncodeState}. %% @todo Handle Body > 16MB. (split it out into many frames) -request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, - stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body) -> +request(State0=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, + streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo, + Method, Host, Port, Path, Headers0, Body) -> Headers = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers), Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), - %% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE. - %% Use the length set by the server instead, if any. - %% @todo Would be better if we didn't have to convert to binary. - send_data(Socket, Transport, StreamID, fin, iolist_to_binary(Body), 16384), - new_stream(StreamID, StreamRef, ReplyTo, nofin, fin, - State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}). + Stream0 = new_stream(StreamID, StreamRef, ReplyTo, nofin, nofin, State0), + {State, Stream} = send_data(State0, Stream0, fin, Body), + State#http2_state{streams=[Stream|Streams], stream_id=StreamID + 2, encode_state=EncodeState}. prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) -> Authority = case lists:keyfind(<<"host">>, 1, Headers0) of @@ -368,35 +374,133 @@ prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) -> |Headers1], cow_hpack:encode(Headers, EncodeState). -data(State=#http2_state{socket=Socket, transport=Transport}, - StreamRef, ReplyTo, IsFin, Data) -> - case get_stream_by_ref(StreamRef, State) of +data(State0, StreamRef, ReplyTo, IsFin, Data) -> + case get_stream_by_ref(StreamRef, State0) of #stream{local=fin} -> - error_stream_closed(State, StreamRef, ReplyTo); - S = #stream{} -> - %% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE. - %% Use the length set by the server instead, if any. - %% @todo Would be better if we didn't have to convert to binary. - send_data(Socket, Transport, S#stream.id, IsFin, iolist_to_binary(Data), 16384), - local_fin(S, State, IsFin); + error_stream_closed(State0, StreamRef, ReplyTo); + Stream0 = #stream{} -> + {State, Stream} = send_data(State0, Stream0, IsFin, Data), + maybe_delete_stream(State, Stream); false -> - error_stream_not_found(State, StreamRef, ReplyTo) + error_stream_not_found(State0, StreamRef, ReplyTo) end. -send_data(State) -> State. -send_data(State, Stream) -> {State, Stream}. - -%% This same function is found in cowboy_http2. -send_data(Socket, Transport, StreamID, IsFin, Data, Length) -> - if - Length < byte_size(Data) -> - << Payload:Length/binary, Rest/bits >> = Data, - Transport:send(Socket, cow_http2:data(StreamID, nofin, Payload)), - send_data(Socket, Transport, StreamID, IsFin, Rest, Length); - true -> - Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)) +%% @todo Should we ever want to implement the PRIORITY mechanism, +%% this would be the place to do it. Right now, we just go over +%% all streams and send what we can until either everything is +%% sent or we run out of space in the window. +send_data(State=#http2_state{streams=Streams}) -> + resume_streams(State, Streams, []). + +%% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update +%% the local stream windows for all active streams and perhaps +%% resume sending data. +%update_streams_local_window(State=#http2_state{streams=Streams0}, Increment) -> +% Streams = [ +% S#stream{local_window=StreamWindow + Increment} +% || S=#stream{local_window=StreamWindow} <- Streams0], +% resume_streams(State, Streams, []). + +%% When we receive an ack to a SETTINGS frame we sent we need to update +%% the remote stream windows for all active streams. +%update_streams_remote_window(State=#http2_state{streams=Streams0}, Increment) -> +% Streams = [ +% S#stream{remote_window=StreamWindow + Increment} +% || S=#stream{remote_window=StreamWindow} <- Streams0], +% State#http2_state{streams=Streams}. + +resume_streams(State, [], Acc) -> + State#http2_state{streams=lists:reverse(Acc)}; +%% While technically we should never get < 0 here, let's be on the safe side. +resume_streams(State=#http2_state{local_window=ConnWindow}, Streams, Acc) + when ConnWindow =< 0 -> + State#http2_state{streams=lists:reverse(Acc, Streams)}; +%% We rely on send_data/2 to do all the necessary checks about the stream. +resume_streams(State0, [Stream0|Tail], Acc) -> + {State1, Stream} = send_data(State0, Stream0), + resume_streams(State1, Tail, [Stream|Acc]). + +send_data(State, Stream=#stream{local=Local, local_buffer_size=0, local_trailers=Trailers}) + when (Trailers =/= undefined) andalso ((Local =:= idle) orelse (Local =:= nofin)) -> + send_trailers(State, Stream#stream{local_trailers=undefined}, Trailers); +%% @todo It's possible that the stream terminates. We must remove it. +send_data(State=#http2_state{local_window=ConnWindow}, + Stream=#stream{local=IsFin, local_window=StreamWindow, local_buffer_size=BufferSize}) + when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 -> + {State, Stream}; +send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize}) -> + %% We know there is an item in the queue. + {{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0), + {State, Stream} = send_data(State0, + Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize}, + IsFin, Data, in_r), + send_data(State, Stream). + +send_data(State, Stream, IsFin, Data) -> + send_data(State, Stream, IsFin, Data, in). + +%% We can send trailers immediately if the queue is empty, otherwise we queue. +%% We always send trailer frames even if the window is empty. +send_data(State, Stream=#stream{local_buffer_size=0}, fin, {trailers, Trailers}, _) -> + send_trailers(State, Stream, Trailers); +send_data(State, Stream, fin, {trailers, Trailers}, _) -> + {State, Stream#stream{local_trailers=Trailers}}; +%% Send data immediately if we can, buffer otherwise. +send_data(State=#http2_state{local_window=ConnWindow}, + Stream=#stream{local_window=StreamWindow}, IsFin, Data, In) + when ConnWindow =< 0; StreamWindow =< 0 -> + {State, queue_data(Stream, IsFin, Data, In)}; +send_data(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, + remote_settings=RemoteSettings, local_window=ConnWindow}, + Stream=#stream{id=StreamID, local_window=StreamWindow}, IsFin, Data, In) -> + RemoteMaxFrameSize = maps:get(max_frame_size, RemoteSettings, 16384), + ConfiguredMaxFrameSize = maps:get(max_frame_size_sent, Opts, infinity), + MaxSendSize = min( + min(ConnWindow, StreamWindow), + min(RemoteMaxFrameSize, ConfiguredMaxFrameSize) + ), + case Data of + {sendfile, Offset, Bytes, Path} when Bytes =< MaxSendSize -> + Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)), + Transport:sendfile(Socket, Path, Offset, Bytes), + {State#http2_state{local_window=ConnWindow - Bytes}, + Stream#stream{local=IsFin, local_window=StreamWindow - Bytes}}; + {sendfile, Offset, Bytes, Path} -> + Transport:send(Socket, cow_http2:data_header(StreamID, nofin, MaxSendSize)), + Transport:sendfile(Socket, Path, Offset, MaxSendSize), + send_data(State#http2_state{local_window=ConnWindow - MaxSendSize}, + Stream#stream{local_window=StreamWindow - MaxSendSize}, + IsFin, {sendfile, Offset + MaxSendSize, Bytes - MaxSendSize, Path}, In); + Iolist0 -> + IolistSize = iolist_size(Iolist0), + if + IolistSize =< MaxSendSize -> + Transport:send(Socket, cow_http2:data(StreamID, IsFin, Iolist0)), + {State#http2_state{local_window=ConnWindow - IolistSize}, + Stream#stream{local=IsFin, local_window=StreamWindow - IolistSize}}; + true -> + {Iolist, More} = cow_iolists:split(MaxSendSize, Iolist0), + Transport:send(Socket, cow_http2:data(StreamID, nofin, Iolist)), + send_data(State#http2_state{local_window=ConnWindow - MaxSendSize}, + Stream#stream{local_window=StreamWindow - MaxSendSize}, + IsFin, More, In) + end end. +send_trailers(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0}, + Stream=#stream{id=StreamID}, Trailers) -> + {HeaderBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0), + Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), + {State#http2_state{encode_state=EncodeState}, Stream#stream{local=fin}}. + +queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) -> + DataSize = case Data of + {sendfile, _, Bytes, _} -> Bytes; + Iolist -> iolist_size(Iolist) + end, + Q = queue:In({IsFin, DataSize, Data}, Q0), + Stream#stream{local_buffer=Q, local_buffer_size=Size0 + DataSize}. + cancel(State=#http2_state{socket=Socket, transport=Transport}, StreamRef, ReplyTo) -> case get_stream_by_ref(StreamRef, State) of @@ -457,14 +561,12 @@ error_stream_not_found(State, StreamRef, ReplyTo) -> %% Streams. %% @todo probably change order of args and have state first? -new_stream(StreamID, StreamRef, ReplyTo, Remote, Local, - State=#http2_state{streams=Streams, - local_settings=#{initial_window_size := RemoteWindow}, - remote_settings=#{initial_window_size := LocalWindow}}) -> - New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, +new_stream(StreamID, StreamRef, ReplyTo, Remote, Local, #http2_state{ + local_settings=#{initial_window_size := RemoteWindow}, + remote_settings=#{initial_window_size := LocalWindow}}) -> + #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, remote=Remote, remote_window=RemoteWindow, - local=Local, local_window=LocalWindow}, - State#http2_state{streams=[New|Streams]}. + local=Local, local_window=LocalWindow}. get_stream_by_id(StreamID, #http2_state{streams=Streams}) -> lists:keyfind(StreamID, #stream.id, Streams). @@ -485,11 +587,8 @@ remote_fin(S, State=#http2_state{streams=Streams}, IsFin) -> S#stream{remote=IsFin}), State#http2_state{streams=Streams2}. -local_fin(_, State, nofin) -> - State; -local_fin(S=#stream{remote=fin}, State, fin) -> - delete_stream(S#stream.id, State); -local_fin(S, State=#http2_state{streams=Streams}, IsFin) -> - Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams, - S#stream{local=IsFin}), - State#http2_state{streams=Streams2}. +maybe_delete_stream(State, Stream=#stream{local=fin, remote=fin}) -> + delete_stream(Stream#stream.id, State); +maybe_delete_stream(State=#http2_state{streams=Streams}, Stream) -> + State#http2_state{streams= + lists:keyreplace(Stream#stream.id, #stream.id, Streams, Stream)}. -- cgit v1.2.3