From a0a752ab10a5090142346c192b28c488bd1666f6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 16 Aug 2019 16:37:04 +0200 Subject: Gracefully shut down HTTP/2 connections on GOAWAY --- src/cowboy_http2.erl | 100 +++++++++++++++++++++++++++++++++++++++------------ 1 file changed, 77 insertions(+), 23 deletions(-) (limited to 'src') diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 3f45670..7c9f070 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -78,7 +78,7 @@ cert :: undefined | binary(), %% HTTP/2 state machine. - http2_init :: sequence | settings | upgrade | complete, + http2_status :: sequence | settings | upgrade | connected | closing, http2_machine :: cow_http2_machine:http2_machine(), %% Currently active HTTP/2 streams. Streams may be initiated either @@ -129,7 +129,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer State = set_timeout(#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, - http2_init=sequence, http2_machine=HTTP2Machine}), + http2_status=sequence, http2_machine=HTTP2Machine}), Transport:send(Socket, Preface), case Buffer of <<>> -> loop(State, Buffer); @@ -149,7 +149,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer State0 = #state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, - http2_init=upgrade, http2_machine=HTTP2Machine}, + http2_status=upgrade, http2_machine=HTTP2Machine}, State1 = headers_frame(State0#state{ http2_machine=HTTP2Machine}, StreamID, Req), %% We assume that the upgrade will be applied. A stream handler @@ -158,7 +158,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer <<"connection">> => <<"Upgrade">>, <<"upgrade">> => <<"h2c">> }, ?MODULE, undefined}), %% @todo undefined or #{}? - State = set_timeout(State2#state{http2_init=sequence}), + State = set_timeout(State2#state{http2_status=sequence}), Transport:send(Socket, Preface), case Buffer of <<>> -> loop(State, Buffer); @@ -181,6 +181,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'}); %% System messages. {'EXIT', Parent, Reason} -> + %% @todo Graceful shutdown here as well? terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'}); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer}); @@ -223,16 +224,16 @@ set_timeout(State=#state{opts=Opts, timer=TimerRef0}) -> %% HTTP/2 protocol parsing. -parse(State=#state{http2_init=sequence}, Data) -> +parse(State=#state{http2_status=sequence}, Data) -> case cow_http2:parse_sequence(Data) of {ok, Rest} -> - parse(State#state{http2_init=settings}, Rest); + parse(State#state{http2_status=settings}, Rest); more -> loop(State, Data); Error = {connection_error, _, _} -> terminate(State, Error) end; -parse(State=#state{http2_machine=HTTP2Machine}, Data) -> +parse(State=#state{http2_status=Status, http2_machine=HTTP2Machine, streams=Streams}, Data) -> MaxFrameSize = cow_http2_machine:get_local_setting(max_frame_size, HTTP2Machine), case cow_http2:parse(Data, MaxFrameSize) of {ok, Frame, Rest} -> @@ -243,6 +244,9 @@ parse(State=#state{http2_machine=HTTP2Machine}, Data) -> parse(reset_stream(State, StreamID, {stream_error, Reason, Human}), Rest); Error = {connection_error, _, _} -> terminate(State, Error); + %% Terminate the connection if we are closing and all streams have completed. + more when Status =:= closing, Streams =:= #{} -> + terminate(State, {stop, normal, 'The connection is going away.'}); more -> loop(State, Data) end. @@ -265,9 +269,8 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> State#state{http2_machine=HTTP2Machine}; {ok, {rst_stream, StreamID, Reason}, HTTP2Machine} -> rst_stream_frame(State#state{http2_machine=HTTP2Machine}, StreamID, Reason); - {ok, Frame={goaway, _StreamID, _Reason, _Data}, HTTP2Machine} -> - terminate(State#state{http2_machine=HTTP2Machine}, - {stop, Frame, 'Client is going away.'}); + {ok, GoAway={goaway, _, _, _}, HTTP2Machine} -> + goaway(State#state{http2_machine=HTTP2Machine}, GoAway); {send, SendData, HTTP2Machine} -> send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData); {error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} -> @@ -277,10 +280,10 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> terminate(State#state{http2_machine=HTTP2Machine}, Error) end. -%% We use this opportunity to mark the HTTP/2 initialization -%% as complete if we were still waiting for a SETTINGS frame. -maybe_ack(State=#state{http2_init=settings}, Frame) -> - maybe_ack(State#state{http2_init=complete}, Frame); +%% We use this opportunity to mark the HTTP/2 status as connected +%% if we were still waiting for a SETTINGS frame. +maybe_ack(State=#state{http2_status=settings}, Frame) -> + maybe_ack(State#state{http2_status=connected}, Frame); maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) -> case Frame of {settings, _} -> Transport:send(Socket, cow_http2:settings_ack()); @@ -549,6 +552,9 @@ commands(State0, StreamID, [{trailers, Trailers}|Tail]) -> %% %% @todo Responses sent as a result of a push_promise request %% must not send push_promise frames themselves. +%% +%% @todo We should not send push_promise frames when we are +%% in the closing http2_status. commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) -> Authority = case {Scheme, Port} of @@ -600,7 +606,7 @@ commands(State, StreamID, [Error = {internal_error, _, _}|_Tail]) -> %% @todo Only reset when the stream still exists. reset_stream(State, StreamID, Error); %% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself. -commands(State=#state{socket=Socket, transport=Transport, http2_init=upgrade}, +commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade}, StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) -> %% @todo This 101 response needs to be passed through stream handlers. Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))), @@ -654,16 +660,24 @@ headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) -> headers_to_list(Headers) -> maps:to_list(Headers). -maybe_send_data(State=#state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0) -> +maybe_send_data(State0=#state{http2_machine=HTTP2Machine0}, StreamID, IsFin, Data0) -> Data = case is_tuple(Data0) of false -> {data, Data0}; true -> Data0 end, case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of {ok, HTTP2Machine} -> - State#state{http2_machine=HTTP2Machine}; + State0#state{http2_machine=HTTP2Machine}; {send, SendData, HTTP2Machine} -> - send_data(State#state{http2_machine=HTTP2Machine}, SendData) + State = #state{http2_status=Status, streams=Streams} + = send_data(State0#state{http2_machine=HTTP2Machine}, SendData), + %% Terminate the connection if we are closing and all streams have completed. + if + Status =:= closing, Streams =:= #{} -> + terminate(State, {stop, normal, 'The connection is going away.'}); + true -> + State + end end. send_data(State, []) -> @@ -702,16 +716,55 @@ send_data_frame(State=#state{socket=Socket, transport=Transport, %% Terminate a stream or the connection. +%% We may have to cancel streams even if we receive multiple +%% GOAWAY frames as the LastStreamID value may be lower than +%% the one previously received. +goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, + http2_status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) + when Status =:= connected; Status =:= closing -> + Streams = goaway_streams(State0, maps:to_list(Streams0), LastStreamID, + {stop, {goaway, Reason}, 'The connection is going away.'}, []), + State = State0#state{streams=maps:from_list(Streams)}, + case Status of + connected -> + Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + no_error, <<>>)), + State#state{http2_status=closing}; + _ -> + State + end; +%% We terminate the connection immediately if it hasn't fully been initialized. +goaway(State, {goaway, _, Reason, _}) -> + terminate(State, {stop, {goaway, Reason}, 'The connection is going away.'}). + +%% Cancel client-initiated streams that are above LastStreamID. +goaway_streams(_, [], _, _, Acc) -> + Acc; +goaway_streams(State, [{StreamID, {_, StreamState}}|Tail], LastStreamID, Reason, Acc) + when StreamID > LastStreamID, (StreamID rem 2) =:= 0 -> + terminate_stream_handler(State, StreamID, Reason, StreamState), + goaway_streams(State, Tail, LastStreamID, Reason, Acc); +goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) -> + goaway_streams(State, Tail, LastStreamID, Reason, [Stream|Acc]). + -spec terminate(#state{}, _) -> no_return(). terminate(undefined, Reason) -> exit({shutdown, Reason}); -terminate(State=#state{socket=Socket, transport=Transport, http2_init=complete, - http2_machine=HTTP2Machine, streams=Streams, children=Children}, Reason) -> +terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, + http2_machine=HTTP2Machine, streams=Streams, children=Children}, Reason) + when Status =:= connected; Status =:= closing -> %% @todo We might want to optionally send the Reason value %% as debug data in the GOAWAY frame here. Perhaps more. - Transport:send(Socket, cow_http2:goaway( - cow_http2_machine:get_last_streamid(HTTP2Machine), - terminate_reason(Reason), <<>>)), + case Status of + connected -> + Transport:send(Socket, cow_http2:goaway( + cow_http2_machine:get_last_streamid(HTTP2Machine), + terminate_reason(Reason), <<>>)); + %% We already sent the GOAWAY frame. + closing -> + ok + end, terminate_all_streams(State, maps:to_list(Streams), Reason), cowboy_children:terminate(Children), Transport:close(Socket), @@ -823,6 +876,7 @@ system_continue(_, _, {State, Buffer}) -> -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). system_terminate(Reason, _, _, {State, _}) -> + %% @todo Graceful shutdown here as well? terminate(State, {stop, {exit, Reason}, 'sys:terminate/2,3 was called.'}). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. -- cgit v1.2.3