%% Copyright (c) 2016, Loïc Hoguin %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above %% copyright notice and this permission notice appear in all copies. %% %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -module(gun_http2). -export([check_options/1]). -export([name/0]). -export([init/4]). -export([handle/2]). -export([close/1]). -export([keepalive/1]). -export([request/8]). -export([request/9]). -export([data/5]). -export([cancel/3]). -export([down/1]). -record(stream, { id :: non_neg_integer(), ref :: reference(), reply_to :: pid(), %% Whether we finished sending data. local = nofin :: cowboy_stream:fin(), %% Local flow control window (how much we can send). local_window :: integer(), %% Buffered data waiting for the flow control window to increase. local_buffer = queue:new() :: queue:queue( {fin | nofin, non_neg_integer(), iolist()}), local_buffer_size = 0 :: non_neg_integer(), local_trailers = undefined :: undefined | cowboy:http_headers(), %% Whether we finished receiving data. remote = nofin :: cowboy_stream:fin(), %% Remote flow control window (how much we accept to receive). remote_window :: integer(), %% Content handlers state. handler_state :: undefined | gun_content_handler:state() }). -record(http2_state, { owner :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), opts = #{} :: map(), %% @todo content_handlers :: gun_content_handler:opt(), buffer = <<>> :: binary(), local_settings = #{ initial_window_size => 65535, max_frame_size => 16384 } :: map(), remote_settings = #{ initial_window_size => 65535 } :: map(), %% Connection-wide flow control window. local_window = 65535 :: integer(), %% How much we can send. remote_window = 65535 :: integer(), %% How much we accept to receive. streams = [] :: [#stream{}], stream_id = 1 :: non_neg_integer(), %% HPACK decoding and encoding state. decode_state = cow_hpack:init() :: cow_hpack:state(), encode_state = cow_hpack:init() :: cow_hpack:state() }). check_options(Opts) -> do_check_options(maps:to_list(Opts)). do_check_options([]) -> ok; do_check_options([Opt={content_handlers, Handlers}|Opts]) -> case gun_content_handler:check_option(Handlers) of ok -> do_check_options(Opts); error -> {error, {options, {http, Opt}}} end; do_check_options([{keepalive, infinity}|Opts]) -> do_check_options(Opts); do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> do_check_options(Opts); %% @todo max_frame_size_sent do_check_options([Opt|_]) -> {error, {options, {http2, Opt}}}. name() -> http2. init(Owner, Socket, Transport, Opts) -> Handlers = maps:get(content_handlers, Opts, [gun_data_h]), State = #http2_state{owner=Owner, socket=Socket, transport=Transport, opts=Opts, content_handlers=Handlers}, #http2_state{local_settings=Settings} = State, %% Send the HTTP/2 preface. Transport:send(Socket, [ << "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n">>, cow_http2:settings(Settings) ]), State. handle(Data, State=#http2_state{buffer=Buffer}) -> parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}). parse(Data0, State0=#http2_state{buffer=Buffer}) -> %% @todo Parse states: Preface. Continuation. Data = << Buffer/binary, Data0/binary >>, case cow_http2:parse(Data) of {ok, Frame, Rest} -> case frame(Frame, State0) of close -> close; State1 -> parse(Rest, State1) end; {stream_error, StreamID, Reason, Human, Rest} -> parse(Rest, stream_reset(State0, StreamID, {stream_error, Reason, Human})); Error = {connection_error, _, _} -> terminate(State0, Error); more -> State0#http2_state{buffer=Data} end. %% DATA frame. frame({data, StreamID, IsFin, Data}, State0=#http2_state{remote_window=ConnWindow}) -> case get_stream_by_id(StreamID, State0) of Stream0 = #stream{remote=nofin, remote_window=StreamWindow, handler_state=Handlers0} -> Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), {Stream, State} = send_window_update( Stream0#stream{remote_window=StreamWindow - byte_size(Data), handler_state=Handlers}, State0#http2_state{remote_window=ConnWindow - byte_size(Data)}), remote_fin(Stream, State, IsFin); _ -> %% @todo protocol_error if not existing stream_reset(State0, StreamID, {stream_error, stream_closed, 'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'}) end; %% Single HEADERS frame headers block. frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, State=#http2_state{decode_state=DecodeState0, content_handlers=Handlers0}) -> case get_stream_by_id(StreamID, State) of Stream = #stream{ref=StreamRef, reply_to=ReplyTo, remote=nofin} -> try cow_hpack:decode(HeaderBlock, DecodeState0) of {Headers0, DecodeState} -> case lists:keytake(<<":status">>, 1, Headers0) of {value, {_, Status}, Headers} -> IntStatus = parse_status(Status), if IntStatus >= 100, IntStatus =< 199 -> ReplyTo ! {gun_inform, self(), StreamRef, IntStatus, Headers}, State#http2_state{decode_state=DecodeState}; true -> ReplyTo ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers}, Handlers = case IsFin of fin -> undefined; nofin -> gun_content_handler:init(ReplyTo, StreamRef, Status, Headers, Handlers0) end, remote_fin(Stream#stream{handler_state=Handlers}, State#http2_state{decode_state=DecodeState}, IsFin) end; %% @todo For now we assume that it's a trailer if there's no :status. %% A better state machine is needed to distinguish between that and errors. false -> %% @todo We probably want to pass this to gun_content_handler? ReplyTo ! {gun_trailers, self(), StreamRef, Headers0}, remote_fin(Stream, State#http2_state{decode_state=DecodeState}, fin) %% false -> %% stream_reset(State, StreamID, {stream_error, protocol_error, %% 'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'}) end catch _:_ -> terminate(State, StreamID, {connection_error, compression_error, 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) end; _ -> stream_reset(State, StreamID, {stream_error, stream_closed, 'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'}) end; %% @todo HEADERS frame starting a headers block. Enter continuation mode. %frame(State, {headers, StreamID, IsFin, head_nofin, HeaderBlockFragment}) -> % State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}}; %% @todo Single HEADERS frame headers block with priority. %frame(State, {headers, StreamID, IsFin, head_fin, % _IsExclusive, _DepStreamID, _Weight, HeaderBlock}) -> % %% @todo Handle priority. % stream_init(State, StreamID, IsFin, HeaderBlock); %% @todo HEADERS frame starting a headers block. Enter continuation mode. %frame(State, {headers, StreamID, IsFin, head_nofin, % _IsExclusive, _DepStreamID, _Weight, HeaderBlockFragment}) -> % %% @todo Handle priority. % State#http2_state{parse_state={continuation, StreamID, IsFin, HeaderBlockFragment}}; %% @todo PRIORITY frame. %frame(State, {priority, _StreamID, _IsExclusive, _DepStreamID, _Weight}) -> % %% @todo Validate StreamID? % %% @todo Handle priority. % State; %% @todo RST_STREAM frame. frame({rst_stream, StreamID, Reason}, State) -> stream_reset(State, StreamID, {stream_error, Reason, 'Stream reset by server.'}); %% SETTINGS frame. frame({settings, Settings}, State=#http2_state{socket=Socket, transport=Transport, remote_settings=Settings0}) -> Transport:send(Socket, cow_http2:settings_ack()), State#http2_state{remote_settings=maps:merge(Settings0, Settings)}; %% Ack for a previously sent SETTINGS frame. frame(settings_ack, State) -> %% @todo =#http2_state{next_settings=_NextSettings}) -> %% @todo Apply SETTINGS that require synchronization. State; %% PUSH_PROMISE frame. %% @todo Continuation. frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock}, State=#http2_state{streams=Streams, decode_state=DecodeState0}) -> case get_stream_by_id(PromisedStreamID, State) of false -> case get_stream_by_id(StreamID, State) of #stream{ref=StreamRef, reply_to=ReplyTo} -> try cow_hpack:decode(HeaderBlock, DecodeState0) of {Headers0, DecodeState} -> {Method, Scheme, Authority, Path, Headers} = try {value, {_, Method0}, Headers1} = lists:keytake(<<":method">>, 1, Headers0), {value, {_, Scheme0}, Headers2} = lists:keytake(<<":scheme">>, 1, Headers1), {value, {_, Authority0}, Headers3} = lists:keytake(<<":authority">>, 1, Headers2), {value, {_, Path0}, Headers4} = lists:keytake(<<":path">>, 1, Headers3), {Method0, Scheme0, Authority0, Path0, Headers4} catch error:badmatch -> stream_reset(State, StreamID, {stream_error, protocol_error, 'Malformed push promise; missing pseudo-header field. (RFC7540 8.1.2.3)'}) end, NewStreamRef = make_ref(), ReplyTo ! {gun_push, self(), StreamRef, NewStreamRef, Method, iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers}, NewStream = new_stream(PromisedStreamID, NewStreamRef, ReplyTo, nofin, fin, State), State#http2_state{streams=[NewStream|Streams], decode_state=DecodeState} catch _:_ -> terminate(State, {connection_error, compression_error, 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) end; _ -> stream_reset(State, StreamID, {stream_error, stream_closed, 'DATA frame received for a closed or non-existent stream. (RFC7540 6.1)'}) end; _ -> stream_reset(State, StreamID, {stream_error, todo, ''}) end; %% PING frame. frame({ping, Opaque}, State=#http2_state{socket=Socket, transport=Transport}) -> Transport:send(Socket, cow_http2:ping_ack(Opaque)), State; %% Ack for a previously sent PING frame. %% %% @todo Might want to check contents but probably a waste of time. frame({ping_ack, _Opaque}, State) -> State; %% GOAWAY frame. frame(Frame={goaway, StreamID, _, _}, State) -> terminate(State, StreamID, {stop, Frame, 'Client is going away.'}); %% Connection-wide WINDOW_UPDATE frame. frame({window_update, Increment}, State=#http2_state{local_window=ConnWindow}) -> send_data(State#http2_state{local_window=ConnWindow + Increment}); %% Stream-specific WINDOW_UPDATE frame. frame({window_update, StreamID, Increment}, State0=#http2_state{streams=Streams0}) -> case lists:keyfind(StreamID, #stream.id, Streams0) of Stream0 = #stream{local_window=StreamWindow} -> {State, Stream} = send_data(State0, Stream0#stream{local_window=StreamWindow + Increment}), Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream), State#http2_state{streams=Streams}; false -> %% @todo Receiving this frame on a stream in the idle state is an error. %% WINDOW_UPDATE frames may be received for a short period of time %% after a stream is closed. They must be ignored. State0 end; %% Unexpected CONTINUATION frame. frame({continuation, StreamID, _, _}, State) -> terminate(State, StreamID, {connection_error, protocol_error, 'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}). send_window_update(Stream=#stream{id=StreamID, remote_window=StreamWindow0}, State=#http2_state{socket=Socket, transport=Transport, remote_window=ConnWindow0}) -> %% @todo We should make the windows configurable. MinConnWindow = 8000000, MinStreamWindow = 1000000, ConnWindow = if ConnWindow0 =< MinConnWindow -> Transport:send(Socket, cow_http2:window_update(MinConnWindow)), ConnWindow0 + MinConnWindow; true -> ConnWindow0 end, StreamWindow = if StreamWindow0 =< MinStreamWindow -> Transport:send(Socket, cow_http2:window_update(StreamID, MinStreamWindow)), StreamWindow0 + MinStreamWindow; true -> StreamWindow0 end, {Stream#stream{remote_window=StreamWindow}, State#http2_state{remote_window=ConnWindow}}. parse_status(Status) -> << Code:3/binary, _/bits >> = Status, list_to_integer(binary_to_list(Code)). close(#http2_state{streams=Streams}) -> close_streams(Streams). close_streams([]) -> ok; close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> ReplyTo ! {gun_error, self(), StreamRef, {closed, "The connection was lost."}}, close_streams(Tail). keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> Transport:send(Socket, cow_http2:ping(0)), State. request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) -> {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers), IsFin = case (false =/= lists:keyfind(<<"content-type">>, 1, Headers)) orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers)) of true -> nofin; false -> fin end, Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), Stream = new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin, State), State#http2_state{streams=[Stream|Streams], stream_id=StreamID + 2, encode_state=EncodeState}. %% @todo Handle Body > 16MB. (split it out into many frames) request(State0=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, streams=Streams, stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body) -> Headers = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers), Transport:send(Socket, cow_http2:headers(StreamID, nofin, HeaderBlock)), Stream0 = new_stream(StreamID, StreamRef, ReplyTo, nofin, nofin, State0), {State, Stream} = send_data(State0, Stream0, fin, Body), State#http2_state{streams=[Stream|Streams], stream_id=StreamID + 2, encode_state=EncodeState}. prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) -> Host2 = case Host0 of Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); _ -> Host0 end, Authority = case lists:keyfind(<<"host">>, 1, Headers0) of {_, Host} -> Host; _ -> [Host2, $:, integer_to_binary(Port)] end, %% @todo We also must remove any header found in the connection header. Headers1 = lists:keydelete(<<"host">>, 1, lists:keydelete(<<"connection">>, 1, lists:keydelete(<<"keep-alive">>, 1, lists:keydelete(<<"proxy-connection">>, 1, lists:keydelete(<<"transfer-encoding">>, 1, lists:keydelete(<<"upgrade">>, 1, Headers0)))))), Headers = [ {<<":method">>, Method}, {<<":scheme">>, case Transport of gun_tls -> <<"https">>; gun_tcp -> <<"http">> end}, {<<":authority">>, Authority}, {<<":path">>, Path} |Headers1], cow_hpack:encode(Headers, EncodeState). data(State0, StreamRef, ReplyTo, IsFin, Data) -> case get_stream_by_ref(StreamRef, State0) of #stream{local=fin} -> error_stream_closed(State0, StreamRef, ReplyTo); Stream0 = #stream{} -> {State, Stream} = send_data(State0, Stream0, IsFin, Data), maybe_delete_stream(State, Stream); false -> error_stream_not_found(State0, StreamRef, ReplyTo) end. %% @todo Should we ever want to implement the PRIORITY mechanism, %% this would be the place to do it. Right now, we just go over %% all streams and send what we can until either everything is %% sent or we run out of space in the window. send_data(State=#http2_state{streams=Streams}) -> resume_streams(State, Streams, []). %% When SETTINGS_INITIAL_WINDOW_SIZE changes we need to update %% the local stream windows for all active streams and perhaps %% resume sending data. %update_streams_local_window(State=#http2_state{streams=Streams0}, Increment) -> % Streams = [ % S#stream{local_window=StreamWindow + Increment} % || S=#stream{local_window=StreamWindow} <- Streams0], % resume_streams(State, Streams, []). %% When we receive an ack to a SETTINGS frame we sent we need to update %% the remote stream windows for all active streams. %update_streams_remote_window(State=#http2_state{streams=Streams0}, Increment) -> % Streams = [ % S#stream{remote_window=StreamWindow + Increment} % || S=#stream{remote_window=StreamWindow} <- Streams0], % State#http2_state{streams=Streams}. resume_streams(State, [], Acc) -> State#http2_state{streams=lists:reverse(Acc)}; %% While technically we should never get < 0 here, let's be on the safe side. resume_streams(State=#http2_state{local_window=ConnWindow}, Streams, Acc) when ConnWindow =< 0 -> State#http2_state{streams=lists:reverse(Acc, Streams)}; %% We rely on send_data/2 to do all the necessary checks about the stream. resume_streams(State0, [Stream0|Tail], Acc) -> {State1, Stream} = send_data(State0, Stream0), resume_streams(State1, Tail, [Stream|Acc]). send_data(State, Stream=#stream{local=Local, local_buffer_size=0, local_trailers=Trailers}) when (Trailers =/= undefined) andalso ((Local =:= idle) orelse (Local =:= nofin)) -> send_trailers(State, Stream#stream{local_trailers=undefined}, Trailers); %% @todo It's possible that the stream terminates. We must remove it. send_data(State=#http2_state{local_window=ConnWindow}, Stream=#stream{local=IsFin, local_window=StreamWindow, local_buffer_size=BufferSize}) when ConnWindow =< 0; IsFin =:= fin; StreamWindow =< 0; BufferSize =:= 0 -> {State, Stream}; send_data(State0, Stream0=#stream{local_buffer=Q0, local_buffer_size=BufferSize}) -> %% We know there is an item in the queue. {{value, {IsFin, DataSize, Data}}, Q} = queue:out(Q0), {State, Stream} = send_data(State0, Stream0#stream{local_buffer=Q, local_buffer_size=BufferSize - DataSize}, IsFin, Data, in_r), send_data(State, Stream). send_data(State, Stream, IsFin, Data) -> send_data(State, Stream, IsFin, Data, in). %% We can send trailers immediately if the queue is empty, otherwise we queue. %% We always send trailer frames even if the window is empty. send_data(State, Stream=#stream{local_buffer_size=0}, fin, {trailers, Trailers}, _) -> send_trailers(State, Stream, Trailers); send_data(State, Stream, fin, {trailers, Trailers}, _) -> {State, Stream#stream{local_trailers=Trailers}}; %% Send data immediately if we can, buffer otherwise. send_data(State=#http2_state{local_window=ConnWindow}, Stream=#stream{local_window=StreamWindow}, IsFin, Data, In) when ConnWindow =< 0; StreamWindow =< 0 -> {State, queue_data(Stream, IsFin, Data, In)}; send_data(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, remote_settings=RemoteSettings, local_window=ConnWindow}, Stream=#stream{id=StreamID, local_window=StreamWindow}, IsFin, Data, In) -> RemoteMaxFrameSize = maps:get(max_frame_size, RemoteSettings, 16384), ConfiguredMaxFrameSize = maps:get(max_frame_size_sent, Opts, infinity), MaxSendSize = min( min(ConnWindow, StreamWindow), min(RemoteMaxFrameSize, ConfiguredMaxFrameSize) ), case Data of % {sendfile, Offset, Bytes, Path} when Bytes =< MaxSendSize -> % Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)), % Transport:sendfile(Socket, Path, Offset, Bytes), % {State#http2_state{local_window=ConnWindow - Bytes}, % Stream#stream{local=IsFin, local_window=StreamWindow - Bytes}}; % {sendfile, Offset, Bytes, Path} -> % Transport:send(Socket, cow_http2:data_header(StreamID, nofin, MaxSendSize)), % Transport:sendfile(Socket, Path, Offset, MaxSendSize), % send_data(State#http2_state{local_window=ConnWindow - MaxSendSize}, % Stream#stream{local_window=StreamWindow - MaxSendSize}, % IsFin, {sendfile, Offset + MaxSendSize, Bytes - MaxSendSize, Path}, In); Iolist0 -> IolistSize = iolist_size(Iolist0), if IolistSize =< MaxSendSize -> Transport:send(Socket, cow_http2:data(StreamID, IsFin, Iolist0)), {State#http2_state{local_window=ConnWindow - IolistSize}, Stream#stream{local=IsFin, local_window=StreamWindow - IolistSize}}; true -> {Iolist, More} = cow_iolists:split(MaxSendSize, Iolist0), Transport:send(Socket, cow_http2:data(StreamID, nofin, Iolist)), send_data(State#http2_state{local_window=ConnWindow - MaxSendSize}, Stream#stream{local_window=StreamWindow - MaxSendSize}, IsFin, More, In) end end. send_trailers(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0}, Stream=#stream{id=StreamID}, Trailers) -> {HeaderBlock, EncodeState} = cow_hpack:encode(Trailers, EncodeState0), Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), {State#http2_state{encode_state=EncodeState}, Stream#stream{local=fin}}. queue_data(Stream=#stream{local_buffer=Q0, local_buffer_size=Size0}, IsFin, Data, In) -> DataSize = case Data of % {sendfile, _, Bytes, _} -> Bytes; Iolist -> iolist_size(Iolist) end, Q = queue:In({IsFin, DataSize, Data}, Q0), Stream#stream{local_buffer=Q, local_buffer_size=Size0 + DataSize}. cancel(State=#http2_state{socket=Socket, transport=Transport}, StreamRef, ReplyTo) -> case get_stream_by_ref(StreamRef, State) of #stream{id=StreamID} -> Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), delete_stream(StreamID, State); false -> error_stream_not_found(State, StreamRef, ReplyTo) end. %% @todo Add unprocessed streams when GOAWAY handling is done. down(#http2_state{streams=Streams}) -> KilledStreams = [Ref || #stream{ref=Ref} <- Streams], {KilledStreams, []}. terminate(#http2_state{streams=Streams}, Reason) -> %% Because a particular stream is unknown, %% we're sending the error message to all streams. %% @todo We should not send duplicate messages to processes. %% @todo We should probably also inform the owner process. _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams], %% @todo Send GOAWAY frame. %% @todo LastGoodStreamID close. terminate(State, StreamID, Reason) -> case get_stream_by_id(StreamID, State) of #stream{reply_to=ReplyTo} -> ReplyTo ! {gun_error, self(), Reason}, %% @todo Send GOAWAY frame. %% @todo LastGoodStreamID close; _ -> terminate(State, Reason) end. stream_reset(State=#http2_state{socket=Socket, transport=Transport, streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) -> Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), case lists:keytake(StreamID, #stream.id, Streams0) of {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} -> ReplyTo ! {gun_error, self(), StreamRef, StreamError}, State#http2_state{streams=Streams}; false -> %% @todo Unknown stream. Not sure what to do here. Check again once all %% terminate calls have been written. State end. error_stream_closed(State, StreamRef, ReplyTo) -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream has already been closed."}}, State. error_stream_not_found(State, StreamRef, ReplyTo) -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream cannot be found."}}, State. %% Streams. %% @todo probably change order of args and have state first? new_stream(StreamID, StreamRef, ReplyTo, Remote, Local, #http2_state{ local_settings=#{initial_window_size := RemoteWindow}, remote_settings=#{initial_window_size := LocalWindow}}) -> #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, remote=Remote, remote_window=RemoteWindow, local=Local, local_window=LocalWindow}. get_stream_by_id(StreamID, #http2_state{streams=Streams}) -> lists:keyfind(StreamID, #stream.id, Streams). get_stream_by_ref(StreamRef, #http2_state{streams=Streams}) -> lists:keyfind(StreamRef, #stream.ref, Streams). delete_stream(StreamID, State=#http2_state{streams=Streams}) -> Streams2 = lists:keydelete(StreamID, #stream.id, Streams), State#http2_state{streams=Streams2}. remote_fin(S=#stream{local=fin}, State, fin) -> delete_stream(S#stream.id, State); %% We always replace the stream in the state because %% the content handler state has changed. remote_fin(S, State=#http2_state{streams=Streams}, IsFin) -> Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams, S#stream{remote=IsFin}), State#http2_state{streams=Streams2}. maybe_delete_stream(State, Stream=#stream{local=fin, remote=fin}) -> delete_stream(Stream#stream.id, State); maybe_delete_stream(State=#http2_state{streams=Streams}, Stream) -> State#http2_state{streams= lists:keyreplace(Stream#stream.id, #stream.id, Streams, Stream)}.