diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/cowboy_http.erl | 17 | ||||
-rw-r--r-- | src/cowboy_http2.erl | 130 |
2 files changed, 115 insertions, 32 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 89ba9d8..c9bceed 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -245,6 +245,9 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, {timeout, _, _} -> loop(State); %% System messages. + {'EXIT', Parent, shutdown} -> + Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'}, + loop(initiate_closing(State, Reason)); {'EXIT', Parent, Reason} -> terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'}); {system, From, Request} -> @@ -1440,6 +1443,13 @@ early_error(StatusCode0, #state{socket=Socket, transport=Transport, end, ok. +initiate_closing(State=#state{streams=[]}, Reason) -> + terminate(State, Reason); +initiate_closing(State=#state{streams=[_Stream|Streams], + out_streamid=OutStreamID}, Reason) -> + terminate_all_streams(State, Streams, Reason), + State#state{last_streamid=OutStreamID}. + -spec terminate(_, _) -> no_return(). terminate(undefined, Reason) -> exit({shutdown, Reason}); @@ -1503,9 +1513,10 @@ terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) -> system_continue(_, _, State) -> loop(State). --spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). -system_terminate(Reason, _, _, State) -> - terminate(State, {stop, {exit, Reason}, 'sys:terminate/2,3 was called.'}). +-spec system_terminate(any(), _, _, #state{}) -> no_return(). +system_terminate(Reason0, _, _, State) -> + Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'}, + loop(initiate_closing(State, Reason)). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. system_code_change(Misc, _, _, _) -> diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 8dc8c3b..ad9fa7a 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -31,6 +31,8 @@ connection_window_update_threshold => 0..16#7fffffff, enable_connect_protocol => boolean(), env => cowboy_middleware:env(), + goaway_initial_timeout => timeout(), + goaway_complete_timeout => timeout(), idle_timeout => timeout(), inactivity_timeout => timeout(), initial_connection_window_size => 65535..16#7fffffff, @@ -88,7 +90,7 @@ proxy_header :: undefined | ranch_proxy_header:proxy_info(), opts = #{} :: opts(), - %% Timer for idle_timeout. + %% Timer for idle_timeout; also used for goaway timers. timer = undefined :: undefined | reference(), %% Remote address and port for the connection. @@ -101,7 +103,7 @@ cert :: undefined | binary(), %% HTTP/2 state machine. - http2_status :: sequence | settings | upgrade | connected | closing, + http2_status :: sequence | settings | upgrade | connected | closing_initiated | closing, http2_machine :: cow_http2_machine:http2_machine(), %% HTTP/2 frame rate flood protection. @@ -160,7 +162,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> binary() | undefined, binary()) -> ok. init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) -> {ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts), - State = set_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket, + State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, http2_status=sequence, http2_machine=HTTP2Machine})), @@ -205,7 +207,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer <<"connection">> => <<"Upgrade">>, <<"upgrade">> => <<"h2c">> }, ?MODULE, undefined}), %% @todo undefined or #{}? - State = set_timeout(init_rate_limiting(State2#state{http2_status=sequence})), + State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence})), Transport:send(Socket, Preface), setopts_active(State), case Buffer of @@ -227,9 +229,13 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, receive %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(set_timeout(State), << Buffer/binary, Data/binary >>); + parse(set_idle_timeout(State), << Buffer/binary, Data/binary >>); {Closed, Socket} when Closed =:= element(2, Messages) -> - terminate(State, {socket_error, closed, 'The socket has been closed.'}); + Reason = case State#state.http2_status of + closing -> {stop, closed, 'The client is going away.'}; + _ -> {socket_error, closed, 'The socket has been closed.'} + end, + terminate(State, Reason); {Error, Socket, Reason} when Error =:= element(3, Messages) -> terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'}); {Passive, Socket} when Passive =:= element(4, Messages); @@ -238,8 +244,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, setopts_active(State), loop(State, Buffer); %% System messages. + {'EXIT', Parent, shutdown} -> + Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'}, + loop(initiate_closing(State, Reason), Buffer); {'EXIT', Parent, Reason} -> - %% @todo Graceful shutdown here as well? terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'}); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer}); @@ -252,6 +260,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, loop(State, Buffer); {timeout, TRef, {cow_http2_machine, Name}} -> loop(timeout(State, Name, TRef), Buffer); + {timeout, TimerRef, {goaway_initial_timeout, Reason}} -> + loop(closing(State, Reason), Buffer); + {timeout, TimerRef, {goaway_complete_timeout, Reason}} -> + terminate(State, {stop, stop_reason(Reason), + 'Graceful shutdown timed out.'}); %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> loop(info(State, StreamID, Msg), Buffer); @@ -269,14 +282,21 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, terminate(State, {internal_error, timeout, 'No message or data received before timeout.'}) end. -set_timeout(State=#state{opts=Opts, timer=TimerRef0}) -> +set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef}) + when Status =:= closing_initiated orelse Status =:= closing, + TimerRef =/= undefined -> + State; +set_idle_timeout(State=#state{opts=Opts}) -> + set_timeout(State, maps:get(idle_timeout, Opts, 60000), idle_timeout). + +set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) -> ok = case TimerRef0 of undefined -> ok; _ -> erlang:cancel_timer(TimerRef0, [{async, true}, {info, false}]) end, - TimerRef = case maps:get(idle_timeout, Opts, 60000) of + TimerRef = case Timeout of infinity -> undefined; - Timeout -> erlang:start_timer(Timeout, self(), idle_timeout) + Timeout -> erlang:start_timer(Timeout, self(), Message) end, State#state{timer=TimerRef}. @@ -567,18 +587,24 @@ timeout(State=#state{http2_machine=HTTP2Machine0}, Name, TRef) -> %% Erlang messages. -down(State=#state{opts=Opts, children=Children0}, Pid, Msg) -> - case cowboy_children:down(Children0, Pid) of +down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) -> + State = case cowboy_children:down(Children0, Pid) of %% The stream was terminated already. {ok, undefined, Children} -> - State#state{children=Children}; + State0#state{children=Children}; %% The stream is still running. {ok, StreamID, Children} -> - info(State#state{children=Children}, StreamID, Msg); + info(State0#state{children=Children}, StreamID, Msg); %% The process was unknown. error -> cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid], Opts), + State0 + end, + if + State#state.http2_status =:= closing, State#state.streams =:= #{} -> + terminate(State, {stop, normal, 'The connection is going away.'}); + true -> State end. @@ -909,19 +935,21 @@ stream_alarm(State, StreamID, Name, Value) -> %% We may have to cancel streams even if we receive multiple %% GOAWAY frames as the LastStreamID value may be lower than %% the one previously received. -goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, +goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0, http2_status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) - when Status =:= connected; Status =:= closing -> + when Status =:= connected; Status =:= closing_initiated; Status =:= closing -> Streams = goaway_streams(State0, maps:to_list(Streams0), LastStreamID, {stop, {goaway, Reason}, 'The connection is going away.'}, []), State = State0#state{streams=maps:from_list(Streams)}, - case Status of - connected -> + if + Status =:= connected; Status =:= closing_initiated -> + {OurLastStreamID, HTTP2Machine} = + cow_http2_machine:set_last_streamid(HTTP2Machine0), Transport:send(Socket, cow_http2:goaway( - cow_http2_machine:get_last_streamid(HTTP2Machine), - no_error, <<>>)), - State#state{http2_status=closing}; - _ -> + OurLastStreamID, no_error, <<>>)), + State#state{http2_status=closing, + http2_machine=HTTP2Machine}; + true -> State end; %% We terminate the connection immediately if it hasn't fully been initialized. @@ -938,21 +966,65 @@ goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamI goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) -> goaway_streams(State, Tail, LastStreamID, Reason, [Stream|Acc]). +%% A server that is attempting to gracefully shut down a connection SHOULD send +%% an initial GOAWAY frame with the last stream identifier set to 2^31-1 and a +%% NO_ERROR code. This signals to the client that a shutdown is imminent and +%% that initiating further requests is prohibited. After allowing time for any +%% in-flight stream creation (at least one round-trip time), the server can send +%% another GOAWAY frame with an updated last stream identifier. This ensures +%% that a connection can be cleanly shut down without losing requests. +-spec initiate_closing(#state{}, _) -> #state{}. +initiate_closing(State=#state{http2_status=connected, socket=Socket, + transport=Transport, opts=Opts}, Reason) -> + Transport:send(Socket, cow_http2:goaway(16#7fffffff, no_error, <<>>)), + Timeout = maps:get(goaway_initial_timeout, Opts, 1000), + Message = {goaway_initial_timeout, Reason}, + set_timeout(State#state{http2_status=closing_initiated}, Timeout, Message); +initiate_closing(State=#state{http2_status=Status}, _Reason) + when Status =:= closing_initiated; Status =:= closing -> + %% This happens if sys:terminate/2,3 is called twice or if the supervisor + %% tells us to shutdown after sys:terminate/2,3 is called or vice versa. + State; +initiate_closing(State, Reason) -> + terminate(State, {stop, stop_reason(Reason), 'The connection is going away.'}). + +%% Switch to 'closing' state and stop accepting new streams. +-spec closing(#state{}, Reason :: term()) -> #state{}. +closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} -> + terminate(State, Reason); +closing(State=#state{http2_status=closing_initiated, + http2_machine=HTTP2Machine0, socket=Socket, transport=Transport}, + Reason) -> + %% Stop accepting new streams. + {LastStreamID, HTTP2Machine} = + cow_http2_machine:set_last_streamid(HTTP2Machine0), + Transport:send(Socket, cow_http2:goaway(LastStreamID, no_error, <<>>)), + closing(State#state{http2_status=closing, http2_machine=HTTP2Machine}, Reason); +closing(State=#state{http2_status=closing, opts=Opts}, Reason) -> + %% If client sent GOAWAY, we may already be in 'closing' but without the + %% goaway complete timeout set. + Timeout = maps:get(goaway_complete_timeout, Opts, 3000), + Message = {goaway_complete_timeout, Reason}, + set_timeout(State, Timeout, Message). + +stop_reason({stop, Reason, _}) -> Reason; +stop_reason(Reason) -> Reason. + -spec terminate(#state{}, _) -> no_return(). terminate(undefined, Reason) -> exit({shutdown, Reason}); terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, http2_machine=HTTP2Machine, streams=Streams, children=Children}, Reason) - when Status =:= connected; Status =:= closing -> + when Status =:= connected; Status =:= closing_initiated; Status =:= closing -> %% @todo We might want to optionally send the Reason value %% as debug data in the GOAWAY frame here. Perhaps more. - case Status of - connected -> + if + Status =:= connected; Status =:= closing_initiated -> Transport:send(Socket, cow_http2:goaway( cow_http2_machine:get_last_streamid(HTTP2Machine), terminate_reason(Reason), <<>>)); %% We already sent the GOAWAY frame. - closing -> + Status =:= closing -> ok end, terminate_all_streams(State, maps:to_list(Streams), Reason), @@ -1134,9 +1206,9 @@ system_continue(_, _, {State, Buffer}) -> loop(State, Buffer). -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). -system_terminate(Reason, _, _, {State, _}) -> - %% @todo Graceful shutdown here as well? - terminate(State, {stop, {exit, Reason}, 'sys:terminate/2,3 was called.'}). +system_terminate(Reason0, _, _, {State, Buffer}) -> + Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'}, + loop(initiate_closing(State, Reason), Buffer). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. system_code_change(Misc, _, _, _) -> |