From 45ddcd8c67f2fc20f4d44e84b5e7e3faef0a1d58 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Tue, 8 Aug 2017 16:59:33 +0200 Subject: Implement the shutdown timeout for request processes This should work very similar to normal supervisors, in particular during the shutdown sequence when the connection process goes down or switches to Websocket. Processes that need to enforce the shutdown timeout will be required to trap exits, just like in a supervisor. In a vanilla Cowboy, this only matters at connection shutdown, as Cowboy will otherwise wait for the request process to be down before stopping the stream. Tests are currently missing. --- src/cowboy_http.erl | 64 +++++++++++++++++++++-------------------------------- 1 file changed, 25 insertions(+), 39 deletions(-) (limited to 'src/cowboy_http.erl') diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index a612f20..5ee5ceb 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -115,7 +115,7 @@ streams = [] :: [stream()], %% Children processes created by streams. - children = [] :: [{pid(), cowboy_stream:streamid(), timeout()}] + children = cowboy_children:init() :: cowboy_children:children() }). -include_lib("cowlib/include/cow_inline.hrl"). @@ -162,6 +162,9 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, {Error, Socket, Reason} -> terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'}); %% Timeouts. + {timeout, Ref, {shutdown, Pid}} -> + cowboy_children:shutdown_timeout(Children, Ref, Pid), + loop(State, Buffer); {timeout, TimerRef, Reason} -> timeout(State, Reason); {timeout, _, _} -> @@ -179,14 +182,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, loop(down(State, Pid, Msg), Buffer); %% Calls from supervisor module. {'$gen_call', {From, Tag}, which_children} -> - Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _, _} <- Children], - From ! {Tag, Workers}, + From ! {Tag, cowboy_children:which_children(Children, ?MODULE)}, loop(State, Buffer); {'$gen_call', {From, Tag}, count_children} -> - NbChildren = length(Children), - Counts = [{specs, 1}, {active, NbChildren}, - {supervisors, 0}, {workers, NbChildren}], - From ! {Tag, Counts}, + From ! {Tag, cowboy_children:count_children(Children)}, loop(State, Buffer); {'$gen_call', {From, Tag}, _} -> From ! {Tag, {error, ?MODULE}}, @@ -720,18 +719,16 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state= %% Message handling. -%% @todo There is a difference in behavior between HTTP/1.1 and HTTP/2 -%% when an error or crash occurs after sending a 500 response. In HTTP/2 -%% the error will be printed, in HTTP/1.1 the error will be ignored. -%% This is due to HTTP/1.1 disabling streams differently after both -%% requests and responses have been sent. down(State=#state{children=Children0}, Pid, Msg) -> - case lists:keytake(Pid, 1, Children0) of - {value, {_, undefined, _}, Children} -> + case cowboy_children:down(Children0, Pid) of + %% The stream was terminated already. + {ok, undefined, Children} -> State#state{children=Children}; - {value, {_, StreamID, _}, Children} -> + %% The stream is still running. + {ok, StreamID, Children} -> info(State#state{children=Children}, StreamID, Msg); - false -> + %% The process was unknown. + error -> error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid]), State end. @@ -764,7 +761,8 @@ commands(State, _, []) -> State; %% Supervise a child process. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) -> - commands(State#state{children=[{Pid, StreamID, Shutdown}|Children]}, StreamID, Tail); + commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)}, + StreamID, Tail); %% Error handling. commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) -> commands(stream_reset(State, StreamID, Error), StreamID, Tail); @@ -886,7 +884,9 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor %% stream processes finish but that implies the Websocket module to know about %% them and filter the messages. For now, kill them all and discard all messages %% in the mailbox. - _ = [exit(Pid, kill) || {Pid, _, _} <- Children], + + cowboy_children:terminate(Children), + flush(), %% Everything good, upgrade! _ = commands(State, StreamID, [{inform, 101, Headers}]), @@ -955,12 +955,8 @@ stream_terminate(State0=#state{socket=Socket, transport=Transport, end, stream_call_terminate(StreamID, Reason, StreamState), -%% @todo initiate children shutdown -% Children = stream_terminate_children(Children0, StreamID, []), - Children = [case C of - {Pid, StreamID, Shutdown} -> {Pid, undefined, Shutdown}; - _ -> C - end || C <- Children0], + + Children = cowboy_children:shutdown(Children0, StreamID), %% @todo Skip the body, if any, or drop the connection if too large. @@ -987,15 +983,6 @@ stream_call_terminate(StreamID, Reason, StreamState) -> [StreamID, Reason, StreamState, Class, Reason]) end. -%stream_terminate_children([], _, Acc) -> -% Acc; -%stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) -> -% exit(Pid, kill), -% stream_terminate_children(Tail, StreamID, Acc); -%stream_terminate_children([Child|Tail], StreamID, Acc) -> -% stream_terminate_children(Tail, StreamID, [Child|Acc]). - - %% @todo max_reqs also maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') -> Conns = cow_http_hd:parse_connection(Conn), @@ -1073,9 +1060,8 @@ terminate(undefined, Reason) -> exit({shutdown, Reason}); terminate(#state{streams=Streams, children=Children}, Reason) -> terminate_all_streams(Streams, Reason), - %% @todo Leave them time to terminate. - _ = [exit(Pid, kill) || {Pid, _, _} <- Children], - exit(normal). %% @todo We probably don't want to exit normal on errors. + cowboy_children:terminate(Children), + exit({shutdown, Reason}). terminate_all_streams([], _) -> ok; @@ -1089,9 +1075,9 @@ terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason) -> system_continue(_, _, {State, Buffer}) -> loop(State, Buffer). --spec system_terminate(any(), _, _, _) -> no_return(). -system_terminate(Reason, _, _, _) -> - exit(Reason). +-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). +system_terminate(Reason, _, _, {State, _}) -> + terminate(State, Reason). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. system_code_change(Misc, _, _, _) -> -- cgit v1.2.3