diff options
author | Loïc Hoguin <[email protected]> | 2017-08-08 16:59:33 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2017-08-08 16:59:33 +0200 |
commit | 45ddcd8c67f2fc20f4d44e84b5e7e3faef0a1d58 (patch) | |
tree | 04f08112f1c3426c58e756c0f593a17dafe889ab | |
parent | 4fa7aeb0fd2e68fba267b33538685a7e1e18f4aa (diff) | |
download | cowboy-45ddcd8c67f2fc20f4d44e84b5e7e3faef0a1d58.tar.gz cowboy-45ddcd8c67f2fc20f4d44e84b5e7e3faef0a1d58.tar.bz2 cowboy-45ddcd8c67f2fc20f4d44e84b5e7e3faef0a1d58.zip |
Implement the shutdown timeout for request processes
This should work very similar to normal supervisors,
in particular during the shutdown sequence when the
connection process goes down or switches to Websocket.
Processes that need to enforce the shutdown timeout
will be required to trap exits, just like in a supervisor.
In a vanilla Cowboy, this only matters at connection
shutdown, as Cowboy will otherwise wait for the request
process to be down before stopping the stream.
Tests are currently missing.
-rw-r--r-- | ebin/cowboy.app | 2 | ||||
-rw-r--r-- | src/cowboy_http.erl | 64 | ||||
-rw-r--r-- | src/cowboy_http2.erl | 71 |
3 files changed, 59 insertions, 78 deletions
diff --git a/ebin/cowboy.app b/ebin/cowboy.app index 8394055..f85ccec 100644 --- a/ebin/cowboy.app +++ b/ebin/cowboy.app @@ -1,7 +1,7 @@ {application, 'cowboy', [ {description, "Small, fast, modern HTTP server."}, {vsn, "2.0.0-rc.1"}, - {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_iolists','cowboy_loop','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']}, + {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_iolists','cowboy_loop','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']}, {registered, [cowboy_sup,cowboy_clock]}, {applications, [kernel,stdlib,crypto,cowlib,ranch]}, {mod, {cowboy_app, []}}, diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index a612f20..5ee5ceb 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -115,7 +115,7 @@ streams = [] :: [stream()], %% Children processes created by streams. - children = [] :: [{pid(), cowboy_stream:streamid(), timeout()}] + children = cowboy_children:init() :: cowboy_children:children() }). -include_lib("cowlib/include/cow_inline.hrl"). @@ -162,6 +162,9 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, {Error, Socket, Reason} -> terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'}); %% Timeouts. + {timeout, Ref, {shutdown, Pid}} -> + cowboy_children:shutdown_timeout(Children, Ref, Pid), + loop(State, Buffer); {timeout, TimerRef, Reason} -> timeout(State, Reason); {timeout, _, _} -> @@ -179,14 +182,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, loop(down(State, Pid, Msg), Buffer); %% Calls from supervisor module. {'$gen_call', {From, Tag}, which_children} -> - Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _, _} <- Children], - From ! {Tag, Workers}, + From ! {Tag, cowboy_children:which_children(Children, ?MODULE)}, loop(State, Buffer); {'$gen_call', {From, Tag}, count_children} -> - NbChildren = length(Children), - Counts = [{specs, 1}, {active, NbChildren}, - {supervisors, 0}, {workers, NbChildren}], - From ! {Tag, Counts}, + From ! {Tag, cowboy_children:count_children(Children)}, loop(State, Buffer); {'$gen_call', {From, Tag}, _} -> From ! {Tag, {error, ?MODULE}}, @@ -720,18 +719,16 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state= %% Message handling. -%% @todo There is a difference in behavior between HTTP/1.1 and HTTP/2 -%% when an error or crash occurs after sending a 500 response. In HTTP/2 -%% the error will be printed, in HTTP/1.1 the error will be ignored. -%% This is due to HTTP/1.1 disabling streams differently after both -%% requests and responses have been sent. down(State=#state{children=Children0}, Pid, Msg) -> - case lists:keytake(Pid, 1, Children0) of - {value, {_, undefined, _}, Children} -> + case cowboy_children:down(Children0, Pid) of + %% The stream was terminated already. + {ok, undefined, Children} -> State#state{children=Children}; - {value, {_, StreamID, _}, Children} -> + %% The stream is still running. + {ok, StreamID, Children} -> info(State#state{children=Children}, StreamID, Msg); - false -> + %% The process was unknown. + error -> error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid]), State end. @@ -764,7 +761,8 @@ commands(State, _, []) -> State; %% Supervise a child process. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) -> - commands(State#state{children=[{Pid, StreamID, Shutdown}|Children]}, StreamID, Tail); + commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)}, + StreamID, Tail); %% Error handling. commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) -> commands(stream_reset(State, StreamID, Error), StreamID, Tail); @@ -886,7 +884,9 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor %% stream processes finish but that implies the Websocket module to know about %% them and filter the messages. For now, kill them all and discard all messages %% in the mailbox. - _ = [exit(Pid, kill) || {Pid, _, _} <- Children], + + cowboy_children:terminate(Children), + flush(), %% Everything good, upgrade! _ = commands(State, StreamID, [{inform, 101, Headers}]), @@ -955,12 +955,8 @@ stream_terminate(State0=#state{socket=Socket, transport=Transport, end, stream_call_terminate(StreamID, Reason, StreamState), -%% @todo initiate children shutdown -% Children = stream_terminate_children(Children0, StreamID, []), - Children = [case C of - {Pid, StreamID, Shutdown} -> {Pid, undefined, Shutdown}; - _ -> C - end || C <- Children0], + + Children = cowboy_children:shutdown(Children0, StreamID), %% @todo Skip the body, if any, or drop the connection if too large. @@ -987,15 +983,6 @@ stream_call_terminate(StreamID, Reason, StreamState) -> [StreamID, Reason, StreamState, Class, Reason]) end. -%stream_terminate_children([], _, Acc) -> -% Acc; -%stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) -> -% exit(Pid, kill), -% stream_terminate_children(Tail, StreamID, Acc); -%stream_terminate_children([Child|Tail], StreamID, Acc) -> -% stream_terminate_children(Tail, StreamID, [Child|Acc]). - - %% @todo max_reqs also maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') -> Conns = cow_http_hd:parse_connection(Conn), @@ -1073,9 +1060,8 @@ terminate(undefined, Reason) -> exit({shutdown, Reason}); terminate(#state{streams=Streams, children=Children}, Reason) -> terminate_all_streams(Streams, Reason), - %% @todo Leave them time to terminate. - _ = [exit(Pid, kill) || {Pid, _, _} <- Children], - exit(normal). %% @todo We probably don't want to exit normal on errors. + cowboy_children:terminate(Children), + exit({shutdown, Reason}). terminate_all_streams([], _) -> ok; @@ -1089,9 +1075,9 @@ terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason) -> system_continue(_, _, {State, Buffer}) -> loop(State, Buffer). --spec system_terminate(any(), _, _, _) -> no_return(). -system_terminate(Reason, _, _, _) -> - exit(Reason). +-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). +system_terminate(Reason, _, _, {State, _}) -> + terminate(State, Reason). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. system_code_change(Misc, _, _, _) -> diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 23b75eb..2d0b37c 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -102,7 +102,7 @@ %% Streams can spawn zero or more children which are then managed %% by this module if operating as a supervisor. - children = [] :: [{pid(), cowboy_stream:streamid()}], + children = cowboy_children:init() :: cowboy_children:children(), %% The client starts by sending a sequence of bytes as a preface, %% followed by a potentially empty SETTINGS frame. Then the connection @@ -194,6 +194,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, exit(Reason); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer}); + %% Timeouts. + {timeout, Ref, {shutdown, Pid}} -> + cowboy_children:shutdown_timeout(Children, Ref, Pid), + loop(State, Buffer); {timeout, TRef, preface_timeout} -> case PS of {preface, _, TRef} -> @@ -210,14 +214,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, loop(down(State, Pid, Msg), Buffer); %% Calls from supervisor module. {'$gen_call', {From, Tag}, which_children} -> - Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _} <- Children], - From ! {Tag, Workers}, + From ! {Tag, cowboy_children:which_children(Children, ?MODULE)}, loop(State, Buffer); {'$gen_call', {From, Tag}, count_children} -> - NbChildren = length(Children), - Counts = [{specs, 1}, {active, NbChildren}, - {supervisors, 0}, {workers, NbChildren}], - From ! {Tag, Counts}, + From ! {Tag, cowboy_children:count_children(Children)}, loop(State, Buffer); {'$gen_call', {From, Tag}, _} -> From ! {Tag, {error, ?MODULE}}, @@ -422,11 +422,16 @@ continuation_frame(State, _) -> 'An invalid frame was received while expecting a CONTINUATION frame. (RFC7540 6.2)'}). down(State=#state{children=Children0}, Pid, Msg) -> - case lists:keytake(Pid, 1, Children0) of - {value, {_, StreamID}, Children} -> + case cowboy_children:down(Children0, Pid) of + %% The stream was terminated already. + {ok, undefined, Children} -> + State#state{children=Children}; + %% The stream is still running. + {ok, StreamID, Children} -> info(State#state{children=Children}, StreamID, Msg); - false -> - error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.", [Msg, Pid]), + %% The process was unknown. + error -> + error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid]), State end. @@ -559,8 +564,9 @@ commands(State=#state{socket=Socket, transport=Transport, remote_window=ConnWind Stream#stream{remote_window=StreamWindow + Size}, Tail); %% Supervise a child process. commands(State=#state{children=Children}, Stream=#stream{id=StreamID}, - [{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown - commands(State#state{children=[{Pid, StreamID}|Children]}, Stream, Tail); + [{spawn, Pid, Shutdown}|Tail]) -> + commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)}, + Stream, Tail); %% Error handling. commands(State, Stream=#stream{id=StreamID}, [Error = {internal_error, _, _}|_Tail]) -> %% @todo Do we want to run the commands after an internal_error? @@ -681,7 +687,8 @@ terminate(#state{socket=Socket, transport=Transport, client_streamid=LastStreamI %% @todo We might want to optionally send the Reason value %% as debug data in the GOAWAY frame here. Perhaps more. Transport:send(Socket, cow_http2:goaway(LastStreamID, terminate_reason(Reason), <<>>)), - terminate_all_streams(Streams, Reason, Children), + terminate_all_streams(Streams, Reason), + cowboy_children:terminate(Children), Transport:close(Socket), exit({shutdown, Reason}). @@ -690,15 +697,14 @@ terminate_reason({stop, _, _}) -> no_error; terminate_reason({socket_error, _, _}) -> internal_error; terminate_reason({internal_error, _, _}) -> internal_error. -terminate_all_streams([], _, []) -> +terminate_all_streams([], _) -> ok; %% This stream was already terminated and is now just flushing the data out. Skip it. -terminate_all_streams([#stream{state=flush}|Tail], Reason, Children) -> - terminate_all_streams(Tail, Reason, Children); -terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason, Children0) -> +terminate_all_streams([#stream{state=flush}|Tail], Reason) -> + terminate_all_streams(Tail, Reason); +terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason) -> stream_call_terminate(StreamID, Reason, StreamState), - Children = stream_terminate_children(Children0, StreamID, []), - terminate_all_streams(Tail, Reason, Children). + terminate_all_streams(Tail, Reason). %% Stream functions. @@ -793,26 +799,26 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, {value, #stream{state=StreamState, local=idle}, Streams} when Reason =:= normal -> State1 = info(State, StreamID, {response, 204, #{}, <<>>}), stream_call_terminate(StreamID, Reason, StreamState), - Children = stream_terminate_children(Children0, StreamID, []), + Children = cowboy_children:shutdown(Children0, StreamID), State1#state{streams=Streams, children=Children}; %% When a response was sent but not terminated, we need to close the stream. {value, #stream{state=StreamState, local=nofin, local_buffer_size=0}, Streams} when Reason =:= normal -> Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)), stream_call_terminate(StreamID, Reason, StreamState), - Children = stream_terminate_children(Children0, StreamID, []), + Children = cowboy_children:shutdown(Children0, StreamID), State#state{streams=Streams, children=Children}; %% Unless there is still data in the buffer. We can however reset %% a few fields and set a special local state to avoid confusion. {value, Stream=#stream{state=StreamState, local=nofin}, Streams} -> stream_call_terminate(StreamID, Reason, StreamState), - Children = stream_terminate_children(Children0, StreamID, []), + Children = cowboy_children:shutdown(Children0, StreamID), State#state{streams=[Stream#stream{state=flush, local=flush}|Streams], children=Children}; %% Otherwise we sent an RST_STREAM and/or the stream is already closed. {value, #stream{state=StreamState}, Streams} -> stream_call_terminate(StreamID, Reason, StreamState), - Children = stream_terminate_children(Children0, StreamID, []), + Children = cowboy_children:shutdown(Children0, StreamID), State#state{streams=Streams, children=Children}; false -> %% @todo Unknown stream. Not sure what to do here. Check again once all @@ -829,17 +835,6 @@ stream_call_terminate(StreamID, Reason, StreamState) -> [StreamID, Reason, StreamState, Class, Reason]) end. -stream_terminate_children([], _, Acc) -> - Acc; -stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) -> - %% We unlink and flush the mailbox to avoid receiving a stray message. - unlink(Pid), - receive {'EXIT', Pid, _} -> ok after 0 -> ok end, - exit(Pid, kill), - stream_terminate_children(Tail, StreamID, Acc); -stream_terminate_children([Child|Tail], StreamID, Acc) -> - stream_terminate_children(Tail, StreamID, [Child|Acc]). - %% Headers encode/decode. headers_decode(HeaderBlock, DecodeState0) -> @@ -874,9 +869,9 @@ headers_encode(Headers0, EncodeState) -> system_continue(_, _, {State, Buffer}) -> loop(State, Buffer). --spec system_terminate(any(), _, _, _) -> no_return(). -system_terminate(Reason, _, _, _) -> - exit(Reason). +-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). +system_terminate(Reason, _, _, {State, _}) -> + terminate(State, Reason). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. system_code_change(Misc, _, _, _) -> |