aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_http2.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2017-08-08 16:59:33 +0200
committerLoïc Hoguin <[email protected]>2017-08-08 16:59:33 +0200
commit45ddcd8c67f2fc20f4d44e84b5e7e3faef0a1d58 (patch)
tree04f08112f1c3426c58e756c0f593a17dafe889ab /src/cowboy_http2.erl
parent4fa7aeb0fd2e68fba267b33538685a7e1e18f4aa (diff)
downloadcowboy-45ddcd8c67f2fc20f4d44e84b5e7e3faef0a1d58.tar.gz
cowboy-45ddcd8c67f2fc20f4d44e84b5e7e3faef0a1d58.tar.bz2
cowboy-45ddcd8c67f2fc20f4d44e84b5e7e3faef0a1d58.zip
Implement the shutdown timeout for request processes
This should work very similar to normal supervisors, in particular during the shutdown sequence when the connection process goes down or switches to Websocket. Processes that need to enforce the shutdown timeout will be required to trap exits, just like in a supervisor. In a vanilla Cowboy, this only matters at connection shutdown, as Cowboy will otherwise wait for the request process to be down before stopping the stream. Tests are currently missing.
Diffstat (limited to 'src/cowboy_http2.erl')
-rw-r--r--src/cowboy_http2.erl71
1 files changed, 33 insertions, 38 deletions
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl
index 23b75eb..2d0b37c 100644
--- a/src/cowboy_http2.erl
+++ b/src/cowboy_http2.erl
@@ -102,7 +102,7 @@
%% Streams can spawn zero or more children which are then managed
%% by this module if operating as a supervisor.
- children = [] :: [{pid(), cowboy_stream:streamid()}],
+ children = cowboy_children:init() :: cowboy_children:children(),
%% The client starts by sending a sequence of bytes as a preface,
%% followed by a potentially empty SETTINGS frame. Then the connection
@@ -194,6 +194,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
exit(Reason);
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer});
+ %% Timeouts.
+ {timeout, Ref, {shutdown, Pid}} ->
+ cowboy_children:shutdown_timeout(Children, Ref, Pid),
+ loop(State, Buffer);
{timeout, TRef, preface_timeout} ->
case PS of
{preface, _, TRef} ->
@@ -210,14 +214,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport,
loop(down(State, Pid, Msg), Buffer);
%% Calls from supervisor module.
{'$gen_call', {From, Tag}, which_children} ->
- Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _} <- Children],
- From ! {Tag, Workers},
+ From ! {Tag, cowboy_children:which_children(Children, ?MODULE)},
loop(State, Buffer);
{'$gen_call', {From, Tag}, count_children} ->
- NbChildren = length(Children),
- Counts = [{specs, 1}, {active, NbChildren},
- {supervisors, 0}, {workers, NbChildren}],
- From ! {Tag, Counts},
+ From ! {Tag, cowboy_children:count_children(Children)},
loop(State, Buffer);
{'$gen_call', {From, Tag}, _} ->
From ! {Tag, {error, ?MODULE}},
@@ -422,11 +422,16 @@ continuation_frame(State, _) ->
'An invalid frame was received while expecting a CONTINUATION frame. (RFC7540 6.2)'}).
down(State=#state{children=Children0}, Pid, Msg) ->
- case lists:keytake(Pid, 1, Children0) of
- {value, {_, StreamID}, Children} ->
+ case cowboy_children:down(Children0, Pid) of
+ %% The stream was terminated already.
+ {ok, undefined, Children} ->
+ State#state{children=Children};
+ %% The stream is still running.
+ {ok, StreamID, Children} ->
info(State#state{children=Children}, StreamID, Msg);
- false ->
- error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.", [Msg, Pid]),
+ %% The process was unknown.
+ error ->
+ error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid]),
State
end.
@@ -559,8 +564,9 @@ commands(State=#state{socket=Socket, transport=Transport, remote_window=ConnWind
Stream#stream{remote_window=StreamWindow + Size}, Tail);
%% Supervise a child process.
commands(State=#state{children=Children}, Stream=#stream{id=StreamID},
- [{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown
- commands(State#state{children=[{Pid, StreamID}|Children]}, Stream, Tail);
+ [{spawn, Pid, Shutdown}|Tail]) ->
+ commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)},
+ Stream, Tail);
%% Error handling.
commands(State, Stream=#stream{id=StreamID}, [Error = {internal_error, _, _}|_Tail]) ->
%% @todo Do we want to run the commands after an internal_error?
@@ -681,7 +687,8 @@ terminate(#state{socket=Socket, transport=Transport, client_streamid=LastStreamI
%% @todo We might want to optionally send the Reason value
%% as debug data in the GOAWAY frame here. Perhaps more.
Transport:send(Socket, cow_http2:goaway(LastStreamID, terminate_reason(Reason), <<>>)),
- terminate_all_streams(Streams, Reason, Children),
+ terminate_all_streams(Streams, Reason),
+ cowboy_children:terminate(Children),
Transport:close(Socket),
exit({shutdown, Reason}).
@@ -690,15 +697,14 @@ terminate_reason({stop, _, _}) -> no_error;
terminate_reason({socket_error, _, _}) -> internal_error;
terminate_reason({internal_error, _, _}) -> internal_error.
-terminate_all_streams([], _, []) ->
+terminate_all_streams([], _) ->
ok;
%% This stream was already terminated and is now just flushing the data out. Skip it.
-terminate_all_streams([#stream{state=flush}|Tail], Reason, Children) ->
- terminate_all_streams(Tail, Reason, Children);
-terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason, Children0) ->
+terminate_all_streams([#stream{state=flush}|Tail], Reason) ->
+ terminate_all_streams(Tail, Reason);
+terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason) ->
stream_call_terminate(StreamID, Reason, StreamState),
- Children = stream_terminate_children(Children0, StreamID, []),
- terminate_all_streams(Tail, Reason, Children).
+ terminate_all_streams(Tail, Reason).
%% Stream functions.
@@ -793,26 +799,26 @@ stream_terminate(State=#state{socket=Socket, transport=Transport,
{value, #stream{state=StreamState, local=idle}, Streams} when Reason =:= normal ->
State1 = info(State, StreamID, {response, 204, #{}, <<>>}),
stream_call_terminate(StreamID, Reason, StreamState),
- Children = stream_terminate_children(Children0, StreamID, []),
+ Children = cowboy_children:shutdown(Children0, StreamID),
State1#state{streams=Streams, children=Children};
%% When a response was sent but not terminated, we need to close the stream.
{value, #stream{state=StreamState, local=nofin, local_buffer_size=0}, Streams}
when Reason =:= normal ->
Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)),
stream_call_terminate(StreamID, Reason, StreamState),
- Children = stream_terminate_children(Children0, StreamID, []),
+ Children = cowboy_children:shutdown(Children0, StreamID),
State#state{streams=Streams, children=Children};
%% Unless there is still data in the buffer. We can however reset
%% a few fields and set a special local state to avoid confusion.
{value, Stream=#stream{state=StreamState, local=nofin}, Streams} ->
stream_call_terminate(StreamID, Reason, StreamState),
- Children = stream_terminate_children(Children0, StreamID, []),
+ Children = cowboy_children:shutdown(Children0, StreamID),
State#state{streams=[Stream#stream{state=flush, local=flush}|Streams],
children=Children};
%% Otherwise we sent an RST_STREAM and/or the stream is already closed.
{value, #stream{state=StreamState}, Streams} ->
stream_call_terminate(StreamID, Reason, StreamState),
- Children = stream_terminate_children(Children0, StreamID, []),
+ Children = cowboy_children:shutdown(Children0, StreamID),
State#state{streams=Streams, children=Children};
false ->
%% @todo Unknown stream. Not sure what to do here. Check again once all
@@ -829,17 +835,6 @@ stream_call_terminate(StreamID, Reason, StreamState) ->
[StreamID, Reason, StreamState, Class, Reason])
end.
-stream_terminate_children([], _, Acc) ->
- Acc;
-stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) ->
- %% We unlink and flush the mailbox to avoid receiving a stray message.
- unlink(Pid),
- receive {'EXIT', Pid, _} -> ok after 0 -> ok end,
- exit(Pid, kill),
- stream_terminate_children(Tail, StreamID, Acc);
-stream_terminate_children([Child|Tail], StreamID, Acc) ->
- stream_terminate_children(Tail, StreamID, [Child|Acc]).
-
%% Headers encode/decode.
headers_decode(HeaderBlock, DecodeState0) ->
@@ -874,9 +869,9 @@ headers_encode(Headers0, EncodeState) ->
system_continue(_, _, {State, Buffer}) ->
loop(State, Buffer).
--spec system_terminate(any(), _, _, _) -> no_return().
-system_terminate(Reason, _, _, _) ->
- exit(Reason).
+-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return().
+system_terminate(Reason, _, _, {State, _}) ->
+ terminate(State, Reason).
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
system_code_change(Misc, _, _, _) ->