diff options
-rw-r--r-- | ebin/cowboy.app | 2 | ||||
-rw-r--r-- | src/cowboy_http.erl | 64 | ||||
-rw-r--r-- | src/cowboy_http2.erl | 71 |
3 files changed, 59 insertions, 78 deletions
diff --git a/ebin/cowboy.app b/ebin/cowboy.app index 8394055..f85ccec 100644 --- a/ebin/cowboy.app +++ b/ebin/cowboy.app @@ -1,7 +1,7 @@ {application, 'cowboy', [ {description, "Small, fast, modern HTTP server."}, {vsn, "2.0.0-rc.1"}, - {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_iolists','cowboy_loop','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']}, + {modules, ['cowboy','cowboy_app','cowboy_bstr','cowboy_children','cowboy_clear','cowboy_clock','cowboy_compress_h','cowboy_constraints','cowboy_handler','cowboy_http','cowboy_http2','cowboy_iolists','cowboy_loop','cowboy_middleware','cowboy_req','cowboy_rest','cowboy_router','cowboy_static','cowboy_stream','cowboy_stream_h','cowboy_sub_protocol','cowboy_sup','cowboy_tls','cowboy_websocket']}, {registered, [cowboy_sup,cowboy_clock]}, {applications, [kernel,stdlib,crypto,cowlib,ranch]}, {mod, {cowboy_app, []}}, diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index a612f20..5ee5ceb 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -115,7 +115,7 @@ streams = [] :: [stream()], %% Children processes created by streams. - children = [] :: [{pid(), cowboy_stream:streamid(), timeout()}] + children = cowboy_children:init() :: cowboy_children:children() }). -include_lib("cowlib/include/cow_inline.hrl"). @@ -162,6 +162,9 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, {Error, Socket, Reason} -> terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'}); %% Timeouts. + {timeout, Ref, {shutdown, Pid}} -> + cowboy_children:shutdown_timeout(Children, Ref, Pid), + loop(State, Buffer); {timeout, TimerRef, Reason} -> timeout(State, Reason); {timeout, _, _} -> @@ -179,14 +182,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, loop(down(State, Pid, Msg), Buffer); %% Calls from supervisor module. {'$gen_call', {From, Tag}, which_children} -> - Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _, _} <- Children], - From ! {Tag, Workers}, + From ! {Tag, cowboy_children:which_children(Children, ?MODULE)}, loop(State, Buffer); {'$gen_call', {From, Tag}, count_children} -> - NbChildren = length(Children), - Counts = [{specs, 1}, {active, NbChildren}, - {supervisors, 0}, {workers, NbChildren}], - From ! {Tag, Counts}, + From ! {Tag, cowboy_children:count_children(Children)}, loop(State, Buffer); {'$gen_call', {From, Tag}, _} -> From ! {Tag, {error, ?MODULE}}, @@ -720,18 +719,16 @@ parse_body(Buffer, State=#state{in_streamid=StreamID, in_state= %% Message handling. -%% @todo There is a difference in behavior between HTTP/1.1 and HTTP/2 -%% when an error or crash occurs after sending a 500 response. In HTTP/2 -%% the error will be printed, in HTTP/1.1 the error will be ignored. -%% This is due to HTTP/1.1 disabling streams differently after both -%% requests and responses have been sent. down(State=#state{children=Children0}, Pid, Msg) -> - case lists:keytake(Pid, 1, Children0) of - {value, {_, undefined, _}, Children} -> + case cowboy_children:down(Children0, Pid) of + %% The stream was terminated already. + {ok, undefined, Children} -> State#state{children=Children}; - {value, {_, StreamID, _}, Children} -> + %% The stream is still running. + {ok, StreamID, Children} -> info(State#state{children=Children}, StreamID, Msg); - false -> + %% The process was unknown. + error -> error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid]), State end. @@ -764,7 +761,8 @@ commands(State, _, []) -> State; %% Supervise a child process. commands(State=#state{children=Children}, StreamID, [{spawn, Pid, Shutdown}|Tail]) -> - commands(State#state{children=[{Pid, StreamID, Shutdown}|Children]}, StreamID, Tail); + commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)}, + StreamID, Tail); %% Error handling. commands(State, StreamID, [Error = {internal_error, _, _}|Tail]) -> commands(stream_reset(State, StreamID, Error), StreamID, Tail); @@ -886,7 +884,9 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor %% stream processes finish but that implies the Websocket module to know about %% them and filter the messages. For now, kill them all and discard all messages %% in the mailbox. - _ = [exit(Pid, kill) || {Pid, _, _} <- Children], + + cowboy_children:terminate(Children), + flush(), %% Everything good, upgrade! _ = commands(State, StreamID, [{inform, 101, Headers}]), @@ -955,12 +955,8 @@ stream_terminate(State0=#state{socket=Socket, transport=Transport, end, stream_call_terminate(StreamID, Reason, StreamState), -%% @todo initiate children shutdown -% Children = stream_terminate_children(Children0, StreamID, []), - Children = [case C of - {Pid, StreamID, Shutdown} -> {Pid, undefined, Shutdown}; - _ -> C - end || C <- Children0], + + Children = cowboy_children:shutdown(Children0, StreamID), %% @todo Skip the body, if any, or drop the connection if too large. @@ -987,15 +983,6 @@ stream_call_terminate(StreamID, Reason, StreamState) -> [StreamID, Reason, StreamState, Class, Reason]) end. -%stream_terminate_children([], _, Acc) -> -% Acc; -%stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) -> -% exit(Pid, kill), -% stream_terminate_children(Tail, StreamID, Acc); -%stream_terminate_children([Child|Tail], StreamID, Acc) -> -% stream_terminate_children(Tail, StreamID, [Child|Acc]). - - %% @todo max_reqs also maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') -> Conns = cow_http_hd:parse_connection(Conn), @@ -1073,9 +1060,8 @@ terminate(undefined, Reason) -> exit({shutdown, Reason}); terminate(#state{streams=Streams, children=Children}, Reason) -> terminate_all_streams(Streams, Reason), - %% @todo Leave them time to terminate. - _ = [exit(Pid, kill) || {Pid, _, _} <- Children], - exit(normal). %% @todo We probably don't want to exit normal on errors. + cowboy_children:terminate(Children), + exit({shutdown, Reason}). terminate_all_streams([], _) -> ok; @@ -1089,9 +1075,9 @@ terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason) -> system_continue(_, _, {State, Buffer}) -> loop(State, Buffer). --spec system_terminate(any(), _, _, _) -> no_return(). -system_terminate(Reason, _, _, _) -> - exit(Reason). +-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). +system_terminate(Reason, _, _, {State, _}) -> + terminate(State, Reason). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. system_code_change(Misc, _, _, _) -> diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 23b75eb..2d0b37c 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -102,7 +102,7 @@ %% Streams can spawn zero or more children which are then managed %% by this module if operating as a supervisor. - children = [] :: [{pid(), cowboy_stream:streamid()}], + children = cowboy_children:init() :: cowboy_children:children(), %% The client starts by sending a sequence of bytes as a preface, %% followed by a potentially empty SETTINGS frame. Then the connection @@ -194,6 +194,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, exit(Reason); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer}); + %% Timeouts. + {timeout, Ref, {shutdown, Pid}} -> + cowboy_children:shutdown_timeout(Children, Ref, Pid), + loop(State, Buffer); {timeout, TRef, preface_timeout} -> case PS of {preface, _, TRef} -> @@ -210,14 +214,10 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, loop(down(State, Pid, Msg), Buffer); %% Calls from supervisor module. {'$gen_call', {From, Tag}, which_children} -> - Workers = [{?MODULE, Pid, worker, [?MODULE]} || {Pid, _} <- Children], - From ! {Tag, Workers}, + From ! {Tag, cowboy_children:which_children(Children, ?MODULE)}, loop(State, Buffer); {'$gen_call', {From, Tag}, count_children} -> - NbChildren = length(Children), - Counts = [{specs, 1}, {active, NbChildren}, - {supervisors, 0}, {workers, NbChildren}], - From ! {Tag, Counts}, + From ! {Tag, cowboy_children:count_children(Children)}, loop(State, Buffer); {'$gen_call', {From, Tag}, _} -> From ! {Tag, {error, ?MODULE}}, @@ -422,11 +422,16 @@ continuation_frame(State, _) -> 'An invalid frame was received while expecting a CONTINUATION frame. (RFC7540 6.2)'}). down(State=#state{children=Children0}, Pid, Msg) -> - case lists:keytake(Pid, 1, Children0) of - {value, {_, StreamID}, Children} -> + case cowboy_children:down(Children0, Pid) of + %% The stream was terminated already. + {ok, undefined, Children} -> + State#state{children=Children}; + %% The stream is still running. + {ok, StreamID, Children} -> info(State#state{children=Children}, StreamID, Msg); - false -> - error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.", [Msg, Pid]), + %% The process was unknown. + error -> + error_logger:error_msg("Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid]), State end. @@ -559,8 +564,9 @@ commands(State=#state{socket=Socket, transport=Transport, remote_window=ConnWind Stream#stream{remote_window=StreamWindow + Size}, Tail); %% Supervise a child process. commands(State=#state{children=Children}, Stream=#stream{id=StreamID}, - [{spawn, Pid, _Shutdown}|Tail]) -> %% @todo Shutdown - commands(State#state{children=[{Pid, StreamID}|Children]}, Stream, Tail); + [{spawn, Pid, Shutdown}|Tail]) -> + commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)}, + Stream, Tail); %% Error handling. commands(State, Stream=#stream{id=StreamID}, [Error = {internal_error, _, _}|_Tail]) -> %% @todo Do we want to run the commands after an internal_error? @@ -681,7 +687,8 @@ terminate(#state{socket=Socket, transport=Transport, client_streamid=LastStreamI %% @todo We might want to optionally send the Reason value %% as debug data in the GOAWAY frame here. Perhaps more. Transport:send(Socket, cow_http2:goaway(LastStreamID, terminate_reason(Reason), <<>>)), - terminate_all_streams(Streams, Reason, Children), + terminate_all_streams(Streams, Reason), + cowboy_children:terminate(Children), Transport:close(Socket), exit({shutdown, Reason}). @@ -690,15 +697,14 @@ terminate_reason({stop, _, _}) -> no_error; terminate_reason({socket_error, _, _}) -> internal_error; terminate_reason({internal_error, _, _}) -> internal_error. -terminate_all_streams([], _, []) -> +terminate_all_streams([], _) -> ok; %% This stream was already terminated and is now just flushing the data out. Skip it. -terminate_all_streams([#stream{state=flush}|Tail], Reason, Children) -> - terminate_all_streams(Tail, Reason, Children); -terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason, Children0) -> +terminate_all_streams([#stream{state=flush}|Tail], Reason) -> + terminate_all_streams(Tail, Reason); +terminate_all_streams([#stream{id=StreamID, state=StreamState}|Tail], Reason) -> stream_call_terminate(StreamID, Reason, StreamState), - Children = stream_terminate_children(Children0, StreamID, []), - terminate_all_streams(Tail, Reason, Children). + terminate_all_streams(Tail, Reason). %% Stream functions. @@ -793,26 +799,26 @@ stream_terminate(State=#state{socket=Socket, transport=Transport, {value, #stream{state=StreamState, local=idle}, Streams} when Reason =:= normal -> State1 = info(State, StreamID, {response, 204, #{}, <<>>}), stream_call_terminate(StreamID, Reason, StreamState), - Children = stream_terminate_children(Children0, StreamID, []), + Children = cowboy_children:shutdown(Children0, StreamID), State1#state{streams=Streams, children=Children}; %% When a response was sent but not terminated, we need to close the stream. {value, #stream{state=StreamState, local=nofin, local_buffer_size=0}, Streams} when Reason =:= normal -> Transport:send(Socket, cow_http2:data(StreamID, fin, <<>>)), stream_call_terminate(StreamID, Reason, StreamState), - Children = stream_terminate_children(Children0, StreamID, []), + Children = cowboy_children:shutdown(Children0, StreamID), State#state{streams=Streams, children=Children}; %% Unless there is still data in the buffer. We can however reset %% a few fields and set a special local state to avoid confusion. {value, Stream=#stream{state=StreamState, local=nofin}, Streams} -> stream_call_terminate(StreamID, Reason, StreamState), - Children = stream_terminate_children(Children0, StreamID, []), + Children = cowboy_children:shutdown(Children0, StreamID), State#state{streams=[Stream#stream{state=flush, local=flush}|Streams], children=Children}; %% Otherwise we sent an RST_STREAM and/or the stream is already closed. {value, #stream{state=StreamState}, Streams} -> stream_call_terminate(StreamID, Reason, StreamState), - Children = stream_terminate_children(Children0, StreamID, []), + Children = cowboy_children:shutdown(Children0, StreamID), State#state{streams=Streams, children=Children}; false -> %% @todo Unknown stream. Not sure what to do here. Check again once all @@ -829,17 +835,6 @@ stream_call_terminate(StreamID, Reason, StreamState) -> [StreamID, Reason, StreamState, Class, Reason]) end. -stream_terminate_children([], _, Acc) -> - Acc; -stream_terminate_children([{Pid, StreamID}|Tail], StreamID, Acc) -> - %% We unlink and flush the mailbox to avoid receiving a stray message. - unlink(Pid), - receive {'EXIT', Pid, _} -> ok after 0 -> ok end, - exit(Pid, kill), - stream_terminate_children(Tail, StreamID, Acc); -stream_terminate_children([Child|Tail], StreamID, Acc) -> - stream_terminate_children(Tail, StreamID, [Child|Acc]). - %% Headers encode/decode. headers_decode(HeaderBlock, DecodeState0) -> @@ -874,9 +869,9 @@ headers_encode(Headers0, EncodeState) -> system_continue(_, _, {State, Buffer}) -> loop(State, Buffer). --spec system_terminate(any(), _, _, _) -> no_return(). -system_terminate(Reason, _, _, _) -> - exit(Reason). +-spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). +system_terminate(Reason, _, _, {State, _}) -> + terminate(State, Reason). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. system_code_change(Misc, _, _, _) -> |