diff options
Diffstat (limited to 'src/gun_http2.erl')
-rw-r--r-- | src/gun_http2.erl | 144 |
1 files changed, 115 insertions, 29 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 3b3b79b..5942037 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -19,6 +19,7 @@ -export([init/4]). -export([handle/4]). -export([update_flow/4]). +-export([closing/4]). -export([close/4]). -export([keepalive/1]). -export([headers/11]). @@ -53,6 +54,10 @@ content_handlers :: gun_content_handler:opt(), buffer = <<>> :: binary(), + %% Current status of the connection. We use this to ensure we are + %% not sending the GOAWAY frame more than once. + status = connected :: connected | goaway | closing, + %% HTTP/2 state machine. http2_machine :: cow_http2_machine:http2_machine(), @@ -66,6 +71,10 @@ check_options(Opts) -> do_check_options([]) -> ok; +do_check_options([{closing_timeout, infinity}|Opts]) -> + do_check_options(Opts); +do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 -> + do_check_options(Opts); do_check_options([Opt={content_handlers, Handlers}|Opts]) -> case gun_content_handler:check_option(Handlers) of ok -> do_check_options(Opts); @@ -99,7 +108,8 @@ handle(Data, State=#http2_state{buffer=Buffer}, EvHandler, EvHandlerState) -> parse(<< Buffer/binary, Data/binary >>, State#http2_state{buffer= <<>>}, EvHandler, EvHandlerState). -parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandlerState0) -> +parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, streams=Streams}, + EvHandler, EvHandlerState0) -> MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine), case cow_http2:parse(Data, MaxFrameSize) of {ok, Frame, Rest} -> @@ -116,7 +126,14 @@ parse(Data, State0=#http2_state{http2_machine=HTTP2Machine}, EvHandler, EvHandle parse(Rest, reset_stream(State0, StreamID, {stream_error, Reason, Human}), EvHandler, EvHandlerState0); Error = {connection_error, _, _} -> - {terminate(State0, Error), EvHandlerState0}; + {connection_error(State0, Error), EvHandlerState0}; + %% If we both received and sent a GOAWAY frame and there are no streams + %% currently running, we can close the connection immediately. + more when Status =/= connected, Streams =:= [] -> + {[{state, State0#http2_state{buffer=Data, status=closing}}, close], EvHandlerState0}; + %% Otherwise we enter the closing state. + more when Status =:= goaway -> + {[{state, State0#http2_state{buffer=Data, status=closing}}, closing(State0)], EvHandlerState0}; more -> {{state, State0#http2_state{buffer=Data}}, EvHandlerState0} end. @@ -169,9 +186,8 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl push_promise_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, PromisedStreamID, Headers, PseudoHeaders, EvHandler, EvHandlerState); - {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} -> - {terminate(State#http2_state{http2_machine=HTTP2Machine}, - {stop, Frame, 'Server is going away.'}), + {ok, GoAway={goaway, _, _, _}, HTTP2Machine} -> + {goaway(State#http2_state{http2_machine=HTTP2Machine}, GoAway), EvHandlerState}; {send, SendData, HTTP2Machine} -> send_data(maybe_ack(State#http2_state{http2_machine=HTTP2Machine}, Frame), SendData, @@ -181,7 +197,7 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl StreamID, {stream_error, Reason, Human}), EvHandlerState}; {error, Error={connection_error, _, _}, HTTP2Machine} -> - {terminate(State#http2_state{http2_machine=HTTP2Machine}, Error), + {connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error), EvHandlerState} end. @@ -318,7 +334,8 @@ rst_stream_frame(State=#http2_state{streams=Streams0}, end. %% Pushed streams receive the same initial flow value as the parent stream. -push_promise_frame(State=#http2_state{streams=Streams}, +push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, + status=Status, http2_machine=HTTP2Machine0, streams=Streams}, StreamID, PromisedStreamID, Headers, #{ method := Method, scheme := Scheme, authority := Authority, path := Path}, @@ -326,25 +343,39 @@ push_promise_frame(State=#http2_state{streams=Streams}, #stream{ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow} = get_stream_by_id(State, StreamID), PromisedStreamRef = make_ref(), URI = iolist_to_binary([Scheme, <<"://">>, Authority, Path]), - ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers}, - EvHandlerState = EvHandler:push_promise_end(#{ + PushPromiseEvent0 = #{ stream_ref => StreamRef, reply_to => ReplyTo, - promised_stream_ref => PromisedStreamRef, method => Method, uri => URI, headers => Headers - }, EvHandlerState0), - NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, - reply_to=ReplyTo, flow=InitialFlow}, - {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}. + }, + PushPromiseEvent = case Status of + connected -> + ReplyTo ! {gun_push, self(), StreamRef, PromisedStreamRef, Method, URI, Headers}, + PushPromiseEvent0#{promised_stream_ref => PromisedStreamRef}; + _ -> + PushPromiseEvent0 + end, + EvHandlerState = EvHandler:push_promise_end(PushPromiseEvent, EvHandlerState0), + case Status of + connected -> + NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, + reply_to=ReplyTo, flow=InitialFlow}, + {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}; + %% We cancel the push_promise immediately when we are shutting down. + _ -> + {ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0), + Transport:send(Socket, cow_http2:rst_stream(PromisedStreamID, cancel)), + {State#http2_state{http2_machine=HTTP2Machine}, EvHandlerState} + end. ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> case cow_http2_machine:ignored_frame(HTTP2Machine0) of {ok, HTTP2Machine} -> State#http2_state{http2_machine=HTTP2Machine}; {error, Error={connection_error, _, _}, HTTP2Machine} -> - terminate(State#http2_state{http2_machine=HTTP2Machine}, Error) + connection_error(State#http2_state{http2_machine=HTTP2Machine}, Error) end. update_flow(State=#http2_state{socket=Socket, transport=Transport, @@ -369,16 +400,72 @@ update_flow(State=#http2_state{socket=Socket, transport=Transport, [] end. -%% @todo Use Reason. -close(_, #http2_state{streams=Streams}, _, EvHandlerState) -> - {close_streams(Streams), EvHandlerState}. +%% We may have to cancel streams even if we receive multiple +%% GOAWAY frames as the LastStreamID value may be lower than +%% the one previously received. +goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, + status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) -> + Streams = goaway_streams(Streams0, LastStreamID, {goaway, Reason, 'The connection is going away.'}, []), + State = State0#http2_state{streams=Streams}, + case Status of + connected -> + Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + no_error, <<>>)), + State#http2_state{status=goaway}; + _ -> + State + end. + +%% Cancel server-initiated streams that are above LastStreamID. +goaway_streams([], _, _, Acc) -> + Acc; +goaway_streams([Stream=#stream{id=StreamID}|Tail], LastStreamID, Reason, Acc) + when StreamID > LastStreamID, (StreamID rem 2) =:= 1 -> + close_stream(Stream, Reason), + goaway_streams(Tail, LastStreamID, Reason, Acc); +goaway_streams([Stream|Tail], LastStreamID, Reason, Acc) -> + goaway_streams(Tail, LastStreamID, Reason, [Stream|Acc]). + +%% We are already closing, do nothing. +closing(_, #http2_state{status=closing}, _, EvHandlerState) -> + {[], EvHandlerState}; +closing(Reason0, State=#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine}, _, EvHandlerState) -> + Reason = case Reason0 of + normal -> no_error; + owner_down -> no_error; + _ -> internal_error + end, + Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + Reason, <<>>)), + {[ + {state, State#http2_state{status=closing}}, + closing(State) + ], EvHandlerState}. -close_streams([]) -> +closing(#http2_state{opts=Opts}) -> + Timeout = maps:get(closing_timeout, Opts, 15000), + {closing, Timeout}. + +close(Reason, #http2_state{streams=Streams}, _, EvHandlerState) -> + close_streams(Streams, close_reason(Reason)), + EvHandlerState. + +close_reason(closed) -> closed; +close_reason(Reason) -> {closed, Reason}. + +close_streams([], _) -> ok; -close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> - ReplyTo ! {gun_error, self(), StreamRef, {closed, - "The connection was lost."}}, - close_streams(Tail). +close_streams([Stream|Tail], Reason) -> + close_stream(Stream, Reason), + close_streams(Tail, Reason). + +%% @todo Do we want an event for this? +close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> + ReplyTo ! {gun_error, self(), StreamRef, Reason}, + ok. keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> Transport:send(Socket, cow_http2:ping(0)), @@ -429,6 +516,7 @@ request(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, headers => Headers }, EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), + %% @todo We should not send an empty DATA frame on empty bodies. {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers( StreamID, HTTP2Machine1, nofin, PseudoHeaders, Headers), Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), @@ -586,8 +674,9 @@ down(#http2_state{streams=Streams}) -> KilledStreams = [Ref || #stream{ref=Ref} <- Streams], {KilledStreams, []}. -terminate(#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine, streams=Streams}, Reason) -> +connection_error(#http2_state{socket=Socket, transport=Transport, + http2_machine=HTTP2Machine, streams=Streams}, + {connection_error, Reason, _}) -> %% The connection is going away either at the request of the server, %% or because an error occurred in the protocol. Inform the streams. %% @todo We should not send duplicate messages to processes. @@ -597,12 +686,9 @@ terminate(#http2_state{socket=Socket, transport=Transport, _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams], Transport:send(Socket, cow_http2:goaway( cow_http2_machine:get_last_streamid(HTTP2Machine), - terminate_reason(Reason), <<>>)), + Reason, <<>>)), close. -terminate_reason({connection_error, Reason, _}) -> Reason; -terminate_reason({stop, _, _}) -> no_error. - %% Stream functions. error_stream_closed(State, StreamRef, ReplyTo) -> |