%% Copyright (c) 2016-2019, Loïc Hoguin %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above %% copyright notice and this permission notice appear in all copies. %% %% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES %% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF %% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR %% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES %% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -module(gun_http2). -export([check_options/1]). -export([name/0]). -export([init/4]). -export([handle/4]). -export([update_flow/4]). -export([close/4]). -export([keepalive/1]). -export([headers/11]). -export([request/12]). -export([data/7]). -export([cancel/5]). -export([stream_info/2]). -export([down/1]). -record(stream, { id = undefined :: cow_http2:streamid(), %% Reference used by the user of Gun to refer to this stream. ref :: reference(), %% Process to send messages to. reply_to :: pid(), %% Flow control. flow :: integer() | infinity, flow_window = 0 :: non_neg_integer(), %% Content handlers state. handler_state :: undefined | gun_content_handler:state() }). -record(http2_state, { owner :: pid(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), opts = #{} :: map(), %% @todo content_handlers :: gun_content_handler:opt(), buffer = <<>> :: binary(), %% HTTP/2 state machine. http2_machine :: cow_http2_machine:http2_machine(), %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. streams = [] :: [#stream{}] }). check_options(Opts) -> do_check_options(maps:to_list(Opts)). do_check_options([]) -> ok; do_check_options([Opt={content_handlers, Handlers}|Opts]) -> case gun_content_handler:check_option(Handlers) of ok -> do_check_options(Opts); error -> {error, {options, {http2, Opt}}} end; do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 -> do_check_options(Opts); do_check_options([{keepalive, infinity}|Opts]) -> do_check_options(Opts); do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 -> do_check_options(Opts); %% @todo Add all http2_machine options. do_check_options([{max_frame_size_received, _}|Opts]) -> do_check_options(Opts); do_check_options([Opt|_]) -> {error, {options, {http2, Opt}}}. name() -> http2. init(Owner, Socket, Transport, Opts) -> {ok, Preface, HTTP2Machine} = cow_http2_machine:init(client, Opts), Handlers = maps:get(content_handlers, Opts, [gun_data_h]), %% @todo Better validate the preface being received. State = #http2_state{owner=Owner, socket=Socket, transport=Transport, opts=Opts, content_handlers=Handlers, http2_machine=HTTP2Machine}, Transport:send(Socket, Preface), State. handle(Data, State=#http2_state{buffer=Buffer}, EvHandler, EvHandlerState) -> parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}, EvHandler, EvHandlerState). parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandlerState0) -> MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine), case cow_http2:parse(Data, MaxFrameSize) of {ok, Frame, Rest} -> case frame(State0, Frame, EvHandler, EvHandlerState0) of Close = {close, _} -> Close; {State, EvHandlerState} -> parse(Rest, State, EvHandler, EvHandlerState) end; {ignore, Rest} -> case ignored_frame(State0) of close -> {close, EvHandlerState0}; State -> parse(Rest, State, EvHandler, EvHandlerState0) end; {stream_error, StreamID, Reason, Human, Rest} -> parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}), EvHandler, EvHandlerState0); Error = {connection_error, _, _} -> {terminate(State0, Error), EvHandlerState0}; more -> {{state, State0#http2_state{buffer=Data}}, EvHandlerState0} end. %% Frames received. frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandlerState0) -> EvHandlerState = if element(1, Frame) =:= headers; element(1, Frame) =:= push_promise -> EvStreamID = element(2, Frame), case cow_http2_machine:get_stream_remote_state(EvStreamID, HTTP2Machine0) of {ok, idle} -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, EvStreamID), EvCallback = case element(1, Frame) of headers -> response_start; push_promise -> push_promise_start end, EvHandler:EvCallback(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState0); %% Trailers or invalid header frame. _ -> EvHandlerState0 end; true -> EvHandlerState0 end, case cow_http2_machine:frame(Frame, HTTP2Machine0) of {ok, HTTP2Machine} -> {maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), EvHandlerState}; {ok, {data, StreamID, IsFin, Data}, HTTP2Machine} -> data_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Data, EvHandler, EvHandlerState); {ok, {lingering_data, _StreamID, DataLen}, HTTP2Machine} -> {lingering_data_frame(State#http2_state{http2_machine=HTTP2Machine}, DataLen), EvHandlerState}; {ok, {headers, StreamID, IsFin, Headers, PseudoHeaders, BodyLen}, HTTP2Machine} -> headers_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, IsFin, Headers, PseudoHeaders, BodyLen, EvHandler, EvHandlerState); {ok, {trailers, StreamID, Trailers}, HTTP2Machine} -> trailers_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Trailers, EvHandler, EvHandlerState); {ok, {rst_stream, StreamID, Reason}, HTTP2Machine} -> rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Reason, EvHandler, EvHandlerState); {ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, HTTP2Machine} -> push_promise_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, PromisedStreamID, Headers, PseudoHeaders, EvHandler, EvHandlerState); {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} -> {terminate(State#http2_state{http2_machine=HTTP2Machine}, {stop, Frame, 'Server is going away.'}), EvHandlerState}; {send, SendData, HTTP2Machine} -> send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData, EvHandler, EvHandlerState); {error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} -> {reset_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID, {stream_error, Reason, Human}), EvHandlerState}; {error, Error={connection_error, _, _}, HTTP2Machine} -> {terminate(State#http2_state{http2_machine=HTTP2Machine}, Error), EvHandlerState} end. maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> case Frame of {settings, _} -> Transport:send(Socket, cow_http2:settings_ack()); {ping, Opaque} -> Transport:send(Socket, cow_http2:ping_ack(Opaque)); _ -> ok end, State. lingering_data_frame(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, DataLen) -> Transport:send(Socket, cow_http2:window_update(DataLen)), HTTP2Machine1 = cow_http2_machine:update_window(DataLen, HTTP2Machine0), State#http2_state{http2_machine=HTTP2Machine1}. data_frame(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, IsFin, Data, EvHandler, EvHandlerState0) -> Stream = #stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, flow_window=FlowWindow0, handler_state=Handlers0} = get_stream_by_id(State, StreamID), {ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0), Flow = case Flow0 of infinity -> infinity; _ -> Flow0 - Dec end, Size = byte_size(Data), FlowWindow = if IsFin =:= nofin, Flow =< 0 -> FlowWindow0 + Size; true -> FlowWindow0 end, {HTTP2Machine, EvHandlerState} = case Size of %% We do not send a WINDOW_UPDATE if the DATA frame was of size 0. 0 when IsFin =:= fin -> EvHandlerState1 = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState0), {HTTP2Machine0, EvHandlerState1}; 0 -> {HTTP2Machine0, EvHandlerState0}; _ -> Transport:send(Socket, cow_http2:window_update(Size)), HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0), %% We do not send a stream WINDOW_UPDATE when the flow control kicks in %% (it'll be sent when the flow recovers) or for the last DATA frame. case IsFin of nofin when Flow =< 0 -> {HTTP2Machine1, EvHandlerState0}; nofin -> Transport:send(Socket, cow_http2:window_update(StreamID, Size)), {cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1), EvHandlerState0}; fin -> EvHandlerState1 = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState0), {HTTP2Machine1, EvHandlerState1} end end, {maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream#stream{flow=Flow, flow_window=FlowWindow, handler_state=Handlers}), StreamID, remote, IsFin), EvHandlerState}. headers_frame(State=#http2_state{content_handlers=Handlers0}, StreamID, IsFin, Headers, PseudoHeaders, _BodyLen, EvHandler, EvHandlerState0) -> Stream = #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), case PseudoHeaders of #{status := Status} when Status >= 100, Status =< 199 -> ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers}, EvHandlerState = EvHandler:response_inform(#{ stream_ref => StreamRef, reply_to => ReplyTo, status => Status, headers => Headers }, EvHandlerState0), {State, EvHandlerState}; #{status := Status} -> ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, EvHandlerState1 = EvHandler:response_headers(#{ stream_ref => StreamRef, reply_to => ReplyTo, status => Status, headers => Headers }, EvHandlerState0), {Handlers, EvHandlerState} = case IsFin of fin -> EvHandlerState2 = EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState1), {undefined, EvHandlerState2}; nofin -> {gun_content_handler:init(ReplyTo, StreamRef, Status, Headers, Handlers0), EvHandlerState1} end, {maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin), EvHandlerState} end. trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), %% @todo We probably want to pass this to gun_content_handler? ReplyTo ! {gun_trailers, self(), StreamRef, Trailers}, ResponseEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0), EvHandlerState = EvHandler:response_end(ResponseEvent, EvHandlerState1), {maybe_delete_stream(State, StreamID, remote, fin), EvHandlerState}. rst_stream_frame(State=#http2_state{streams=Streams0}, StreamID, Reason, EvHandler, EvHandlerState0) -> case lists:keytake(StreamID, #stream.id, Streams0) of {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} -> ReplyTo ! {gun_error, self(), StreamRef, {stream_error, Reason, 'Stream reset by server.'}}, EvHandlerState = EvHandler:cancel(#{ stream_ref => StreamRef, reply_to => ReplyTo, endpoint => remote, reason => Reason }, EvHandlerState0), {State#http2_state{streams=Streams}, EvHandlerState}; false -> {State, EvHandlerState0} end. %% Pushed streams receive the same initial flow value as the parent stream. push_promise_frame(State=#http2_state{streams=Streams}, StreamID, PromisedStreamID, Headers, #{ method := Method, scheme := Scheme, authority := Authority, path := Path}, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID), PromisedStreamRef = make_ref(), URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]), ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers}, EvHandlerState = EvHandler:push_promise_end(#{ stream_ref => StreamRef, reply_to => ReplyTo, promised_stream_ref => PromisedStreamRef, method => Method, uri => URI, headers => Headers }, EvHandlerState0), NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, reply_to=ReplyTo, flow=InitialFlow}, {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}. ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> case cow_http2_machine:ignored_frame(HTTP2Machine0) of {ok, HTTP2Machine} -> State#http2_state{http2_machine=HTTP2Machine}; {error, Error={connection_error, _, _}, HTTP2Machine} -> terminate(State#http2_state{http2_machine=HTTP2Machine}, Error) end. update_flow(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, _ReplyTo, StreamRef, Inc) -> case get_stream_by_ref(State, StreamRef) of Stream=#stream{id=StreamID, flow=Flow0, flow_window=FlowWindow} -> Flow = case Flow0 of infinity -> infinity; _ -> Flow0 + Inc end, if %% Flow is active again, update the window. Flow0 =< 0, Flow > 0 -> Transport:send(Socket, cow_http2:window_update(StreamID, FlowWindow)), HTTP2Machine = cow_http2_machine:update_window(StreamID, FlowWindow, HTTP2Machine0), {state, store_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream#stream{flow=Flow, flow_window=0})}; true -> {state, store_stream(State, Stream#stream{flow=Flow})} end; false -> [] end. %% @todo Use Reason. close(_, #http2_state{streams=Streams}, _, EvHandlerState) -> {close_streams(Streams), EvHandlerState}. close_streams([]) -> ok; close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> ReplyTo ! {gun_error, self(), StreamRef, {closed, "The connection was lost."}}, close_streams(Tail). keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> Transport:send(Socket, cow_http2:ping(0)), State. headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0, streams=Streams}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) -> {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0), RequestEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo, function => ?FUNCTION_NAME, method => Method, authority => maps:get(authority, PseudoHeaders), path => Path, headers => Headers }, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow}, {State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}, EvHandlerState}. request(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, http2_machine=HTTP2Machine0, streams=Streams}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body, InitialFlow0, EvHandler, EvHandlerState0) -> Headers1 = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers1), RequestEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo, function => ?FUNCTION_NAME, method => Method, authority => maps:get(authority, PseudoHeaders), path => Path, headers => Headers }, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow}, maybe_send_data(State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}, StreamID, fin, Body, EvHandler, EvHandlerState). initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow; initial_flow(InitialFlow, _) -> InitialFlow. prepare_headers(#http2_state{transport=Transport}, Method, Host0, Port, Path, Headers0) -> Authority = case lists:keyfind(<<"host">>, 1, Headers0) of {_, Host} -> Host; _ -> gun_http:host_header(Transport, Host0, Port) end, %% @todo We also must remove any header found in the connection header. Headers = lists:keydelete(<<"host">>, 1, lists:keydelete(<<"connection">>, 1, lists:keydelete(<<"keep-alive">>, 1, lists:keydelete(<<"proxy-connection">>, 1, lists:keydelete(<<"transfer-encoding">>, 1, lists:keydelete(<<"upgrade">>, 1, Headers0)))))), PseudoHeaders = #{ method => Method, scheme => case Transport of gun_tls -> <<"https">>; gun_tls_proxy -> <<"https">>; gun_tcp -> <<"http">> end, authority => Authority, path => Path }, {ok, PseudoHeaders, Headers}. data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, Data, EvHandler, EvHandlerState) -> case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of {ok, fin, _} -> {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState}; {ok, _, fin} -> {error_stream_closed(State, StreamRef, ReplyTo), EvHandlerState}; {ok, _, _} -> maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState) end; false -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState} end. maybe_send_data(State=#http2_state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0, EvHandler, EvHandlerState) -> Data = case is_tuple(Data0) of false -> {data, Data0}; true -> Data0 end, case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of {ok, HTTP2Machine} -> {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState}; {send, SendData, HTTP2Machine} -> send_data(State#http2_state{http2_machine=HTTP2Machine}, SendData, EvHandler, EvHandlerState) end. send_data(State, [], _, EvHandlerState) -> {State, EvHandlerState}; send_data(State0, [{StreamID, IsFin, SendData}|Tail], EvHandler, EvHandlerState0) -> {State, EvHandlerState} = send_data(State0, StreamID, IsFin, SendData, EvHandler, EvHandlerState0), send_data(State, Tail, EvHandler, EvHandlerState). send_data(State0, StreamID, IsFin, [Data], EvHandler, EvHandlerState0) -> State = send_data_frame(State0, StreamID, IsFin, Data), EvHandlerState = case IsFin of nofin -> EvHandlerState0; fin -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), RequestEndEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandler:request_end(RequestEndEvent, EvHandlerState0) end, {maybe_delete_stream(State, StreamID, local, IsFin), EvHandlerState}; send_data(State0, StreamID, IsFin, [Data|Tail], EvHandler, EvHandlerState) -> State = send_data_frame(State0, StreamID, nofin, Data), send_data(State, StreamID, IsFin, Tail, EvHandler, EvHandlerState). send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, StreamID, IsFin, {data, Data}) -> Transport:send(Socket, cow_http2:data(StreamID, IsFin, Data)), State; %% @todo Uncomment this once sendfile is supported. %send_data_frame(State=#http2_state{socket=Socket, transport=Transport}, % StreamID, IsFin, {sendfile, Offset, Bytes, Path}) -> % Transport:send(Socket, cow_http2:data_header(StreamID, IsFin, Bytes)), % Transport:sendfile(Socket, Path, Offset, Bytes), % State; %% The stream is terminated in cow_http2_machine:prepare_trailers. send_data_frame(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, nofin, {trailers, Trailers}) -> {ok, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_trailers(StreamID, HTTP2Machine0, Trailers), Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), State#http2_state{http2_machine=HTTP2Machine}. reset_stream(State=#http2_state{socket=Socket, transport=Transport, streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) -> Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), case lists:keytake(StreamID, #stream.id, Streams0) of {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} -> ReplyTo ! {gun_error, self(), StreamRef, StreamError}, State#http2_state{streams=Streams}; false -> State end. cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), EvHandlerState = EvHandler:cancel(#{ stream_ref => StreamRef, reply_to => ReplyTo, endpoint => local, reason => cancel }, EvHandlerState0), {delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID), EvHandlerState}; false -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} end. stream_info(State, StreamRef) -> case get_stream_by_ref(State, StreamRef) of #stream{reply_to=ReplyTo} -> {ok, #{ ref => StreamRef, reply_to => ReplyTo, state => running }}; false -> {ok, undefined} end. %% @todo Add unprocessed streams when GOAWAY handling is done. down(#http2_state{streams=Streams}) -> KilledStreams = [Ref || #stream{ref=Ref} <- Streams], {KilledStreams, []}. terminate(#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, streams=Streams}, Reason) -> %% The connection is going away either at the request of the server, %% or because an error occurred in the protocol. Inform the streams. %% @todo We should not send duplicate messages to processes. %% @todo We should probably also inform the owner process. %% @todo Somehow streams aren't removed on receiving a response. _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams], Transport:send(Socket, cow_http2:goaway( cow_http2_machine:get_last_streamid(HTTP2Machine), terminate_reason(Reason), <<>>)), close. terminate_reason({connection_error, Reason, _}) -> Reason; terminate_reason({stop, _, _}) -> no_error. %% Stream functions. error_stream_closed(State, StreamRef, ReplyTo) -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream has already been closed."}}, State. error_stream_not_found(State, StreamRef, ReplyTo) -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream cannot be found."}}, State. %% Streams. %% @todo probably change order of args and have state first? Yes. get_stream_by_id(#http2_state{streams=Streams}, StreamID) -> lists:keyfind(StreamID, #stream.id, Streams). get_stream_by_ref(#http2_state{streams=Streams}, StreamRef) -> lists:keyfind(StreamRef, #stream.ref, Streams). store_stream(State=#http2_state{streams=Streams0}, Stream=#stream{id=StreamID}) -> Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream), State#http2_state{streams=Streams}. maybe_delete_stream(State=#http2_state{http2_machine=HTTP2Machine}, StreamID, local, fin) -> case cow_http2_machine:get_stream_remote_state(StreamID, HTTP2Machine) of {ok, fin} -> delete_stream(State, StreamID); {error, closed} -> delete_stream(State, StreamID); _ -> State end; maybe_delete_stream(State=#http2_state{http2_machine=HTTP2Machine}, StreamID, remote, fin) -> case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine) of {ok, fin, _} -> delete_stream(State, StreamID); {error, closed} -> delete_stream(State, StreamID); _ -> State end; maybe_delete_stream(State, _, _, _) -> State. delete_stream(State=#http2_state{streams=Streams}, StreamID) -> Streams2 = lists:keydelete(StreamID, #stream.id, Streams), State#http2_state{streams=Streams2}.