From db0d6f8d254f2cc01bd458dc41969e0b96991cc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 4 Dec 2019 11:17:34 +0100 Subject: Use active,N This reduces the number of times we need to ask for more packets, and as a result we get a fairly large boost in performance, especially with HTTP/1.1. Unfortunately this makes Cowboy require at least Erlang/OTP 21.3+ because the ssl application did not have active,N. For simplicity the version required will be Erlang/OTP 22+. In addition this change improves hibernate handling in cowboy_websocket. Hibernate will now work for HTTP/2 transport as well, and stray or unrelated messages will no longer cancel hibernate (the process will handle the message and go back into hibernation). Thanks go to Stressgrid for benchmarking an early version of this commit: https://stressgrid.com/blog/cowboy_performance_part_2/ --- src/cowboy_http.erl | 117 +++++++++++++++++++++++++++++++---------------- src/cowboy_http2.erl | 17 ++++++- src/cowboy_stream_h.erl | 1 + src/cowboy_websocket.erl | 100 +++++++++++++++++++++++++++++++--------- 4 files changed, 173 insertions(+), 62 deletions(-) (limited to 'src') diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 90e3203..ad2ef75 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -21,6 +21,7 @@ -export([system_code_change/4]). -type opts() :: #{ + active_n => pos_integer(), chunked => boolean(), compress_buffering => boolean(), compress_threshold => non_neg_integer(), @@ -121,6 +122,9 @@ timer = undefined :: undefined | reference(), + %% Whether we are currently receiving data from the socket. + active = true :: boolean(), + %% Identifier for the stream currently being read (or waiting to be received). in_streamid = 1 :: pos_integer(), @@ -173,7 +177,8 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, last_streamid=maps:get(max_keepalive, Opts, 100)}, - before_loop(set_timeout(State, request_timeout)); + setopts_active(State), + loop(set_timeout(State, request_timeout)); {{error, Reason}, _, _} -> terminate(undefined, {socket_error, Reason, 'A socket error occurred when retrieving the peer name.'}); @@ -185,12 +190,29 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> 'A socket error occurred when retrieving the client TLS certificate.'}) end. -%% Do not read from the socket unless flow is large enough. -before_loop(State=#state{flow=Flow}) when Flow =< 0 -> - loop(State); -before_loop(State=#state{socket=Socket, transport=Transport}) -> - Transport:setopts(Socket, [{active, once}]), - loop(State). +setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> + N = maps:get(active_n, Opts, 100), + Transport:setopts(Socket, [{active, N}]). + +active(State) -> + setopts_active(State), + State#state{active=true}. + +passive(State=#state{socket=Socket, transport=Transport}) -> + Transport:setopts(Socket, [{active, false}]), + Messages = Transport:messages(), + flush_passive(Socket, Messages), + State#state{active=false}. + +flush_passive(Socket, Messages) -> + receive + {Passive, Socket} when Passive =:= element(4, Messages); + %% Hardcoded for compatibility with Ranch 1.x. + Passive =:= tcp_passive; Passive =:= ssl_passive -> + flush_passive(Socket, Messages) + after 0 -> + ok + end. loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID, @@ -201,7 +223,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, %% Discard data coming in after the last request %% we want to process was received fully. {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID -> - before_loop(State); + loop(State); %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> parse(<< Buffer/binary, Data/binary >>, State); @@ -209,6 +231,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, terminate(State, {socket_error, closed, 'The socket has been closed.'}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'}); + {Passive, Socket} when Passive =:= element(4, Messages); + %% Hardcoded for compatibility with Ranch 1.x. + Passive =:= tcp_passive; Passive =:= ssl_passive -> + setopts_active(State), + loop(State); %% Timeouts. {timeout, Ref, {shutdown, Pid}} -> cowboy_children:shutdown_timeout(Children, Ref, Pid), @@ -297,12 +324,12 @@ timeout(State, idle_timeout) -> 'Connection idle longer than configuration allows.'}). parse(<<>>, State) -> - before_loop(State#state{buffer= <<>>}); + loop(State#state{buffer= <<>>}); %% Do not process requests that come in after the last request %% and discard the buffer if any to save memory. parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{}, last_streamid=LastStreamID}) when InStreamID > LastStreamID -> - before_loop(State#state{buffer= <<>>}); + loop(State#state{buffer= <<>>}); parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) -> after_parse(parse_request(Buffer, State, EmptyLines)); parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) -> @@ -364,17 +391,26 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer %% No corresponding stream. We must skip the body of the previous request %% in order to process the next one. after_parse({data, _, IsFin, _, State}) -> - before_loop(set_timeout(State, case IsFin of + loop(set_timeout(State, case IsFin of fin -> request_timeout; nofin -> idle_timeout end)); after_parse({more, State}) -> - before_loop(set_timeout(State, idle_timeout)). + loop(set_timeout(State, idle_timeout)). update_flow(fin, _, State) -> + %% This function is only called after parsing, therefore we + %% are expecting to be in active mode already. State#state{flow=infinity}; -update_flow(nofin, Data, State=#state{flow=Flow0}) -> - State#state{flow=Flow0 - byte_size(Data)}. +update_flow(nofin, Data, State0=#state{flow=Flow0}) -> + Flow = Flow0 - byte_size(Data), + State = State0#state{flow=Flow}, + if + Flow0 > 0, Flow =< 0 -> + passive(State); + true -> + State + end. %% Request-line. @@ -935,8 +971,7 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command Stream#stream{queue=Queue ++ Commands}), State#state{streams=Streams}; %% Read the request body. -commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID, - [{flow, Size}|Tail]) -> +commands(State0=#state{flow=Flow0}, StreamID, [{flow, Size}|Tail]) -> %% We must read *at least* Size of data otherwise functions %% like cowboy_req:read_body/1,2 will wait indefinitely. Flow = if @@ -944,11 +979,11 @@ commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID, true -> Flow0 + Size end, %% Reenable active mode if necessary. - _ = if + State = if Flow0 =< 0, Flow > 0 -> - Transport:setopts(Socket, [{active, once}]); + active(State0); true -> - ok + State0 end, commands(State#state{flow=Flow}, StreamID, Tail); %% Error responses are sent only if a response wasn't sent already. @@ -1118,14 +1153,14 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID, [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) -> %% @todo If there's streams opened after this one, fail instead of 101. - State = cancel_timeout(State0), + State1 = cancel_timeout(State0), %% Before we send the 101 response we need to stop receiving data %% from the socket, otherwise the data might be receive before the %% call to flush/0 and we end up inadvertently dropping a packet. %% %% @todo Handle cases where the request came with a body. We need %% to process or skip the body before the upgrade can be completed. - Transport:setopts(Socket, [{active, false}]), + State = passive(State1), %% Send a 101 response if necessary, then terminate the stream. #state{streams=Streams} = case OutState of wait -> info(State, StreamID, {inform, 101, Headers}); @@ -1415,37 +1450,41 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) -> 0 -> ok; infinity -> - terminate_linger_loop(State, undefined); + terminate_linger_before_loop(State, undefined, Transport:messages()); Timeout -> TimerRef = erlang:start_timer(Timeout, self(), linger_timeout), - terminate_linger_loop(State, TimerRef) + terminate_linger_before_loop(State, TimerRef, Transport:messages()) end; {error, _} -> ok end. -terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) -> - Messages = Transport:messages(), - %% We may already have a message in the mailbox when we do this +terminate_linger_before_loop(State, TimerRef, Messages) -> + %% We may already be in active mode when we do this %% but it's OK because we are shutting down anyway. - case Transport:setopts(Socket, [{active, once}]) of + case setopts_active(State) of ok -> - receive - {OK, Socket, _} when OK =:= element(1, Messages) -> - terminate_linger_loop(State, TimerRef); - {Closed, Socket} when Closed =:= element(2, Messages) -> - ok; - {Error, Socket, _} when Error =:= element(3, Messages) -> - ok; - {timeout, TimerRef, linger_timeout} -> - ok; - _ -> - terminate_linger_loop(State, TimerRef) - end; + terminate_linger_loop(State, TimerRef, Messages); {error, _} -> ok end. +terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) -> + receive + {OK, Socket, _} when OK =:= element(1, Messages) -> + terminate_linger_loop(State, TimerRef, Messages); + {Closed, Socket} when Closed =:= element(2, Messages) -> + ok; + {Error, Socket, _} when Error =:= element(3, Messages) -> + ok; + {Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive -> + terminate_linger_before_loop(State, TimerRef, Messages); + {timeout, TimerRef, linger_timeout} -> + ok; + _ -> + terminate_linger_loop(State, TimerRef, Messages) + end. + %% System callbacks. -spec system_continue(_, _, #state{}) -> ok. diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index c8a4c7a..03ec9f8 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -23,6 +23,7 @@ -export([system_code_change/4]). -type opts() :: #{ + active_n => pos_integer(), compress_buffering => boolean(), compress_threshold => non_neg_integer(), connection_type => worker | supervisor, @@ -163,6 +164,7 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer opts=Opts, peer=Peer, sock=Sock, cert=Cert, http2_status=sequence, http2_machine=HTTP2Machine})), Transport:send(Socket, Preface), + setopts_active(State), case Buffer of <<>> -> loop(State, Buffer); _ -> parse(State, Buffer) @@ -204,15 +206,21 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer }, ?MODULE, undefined}), %% @todo undefined or #{}? State = set_timeout(init_rate_limiting(State2#state{http2_status=sequence})), Transport:send(Socket, Preface), + setopts_active(State), case Buffer of <<>> -> loop(State, Buffer); _ -> parse(State, Buffer) end. +%% Because HTTP/2 has flow control and Cowboy has other rate limiting +%% mechanisms implemented, a very large active_n value should be fine, +%% as long as the stream handlers do their work in a timely manner. +setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> + N = maps:get(active_n, Opts, 100), + Transport:setopts(Socket, [{active, N}]). + loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, timer=TimerRef, children=Children}, Buffer) -> - %% @todo This should only be called when data was read. - Transport:setopts(Socket, [{active, once}]), Messages = Transport:messages(), InactivityTimeout = maps:get(inactivity_timeout, Opts, 300000), receive @@ -223,6 +231,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, terminate(State, {socket_error, closed, 'The socket has been closed.'}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'}); + {Passive, Socket} when Passive =:= element(4, Messages); + %% Hardcoded for compatibility with Ranch 1.x. + Passive =:= tcp_passive; Passive =:= ssl_passive -> + setopts_active(State), + loop(State, Buffer); %% System messages. {'EXIT', Parent, Reason} -> %% @todo Graceful shutdown here as well? diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index 2a50d6a..9f42acc 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -92,6 +92,7 @@ data(StreamID, IsFin, Data, State=#state{read_body_pid=Pid, read_body_ref=Ref, send_request_body(Pid, Ref, IsFin, BodyLen, Data), do_data(StreamID, IsFin, Data, [{flow, byte_size(Data)}], State#state{ read_body_ref=undefined, + %% @todo This is wrong, it's missing byte_size(Data). body_length=BodyLen }); %% Stream is waiting for data but we didn't receive enough to send yet. diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index b3600be..e7d8f31 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -66,6 +66,7 @@ -optional_callbacks([terminate/3]). -type opts() :: #{ + active_n => pos_integer(), compress => boolean(), deflate_opts => cow_ws:deflate_opts(), idle_timeout => timeout(), @@ -85,7 +86,8 @@ handler :: module(), key = undefined :: undefined | binary(), timeout_ref = undefined :: undefined | reference(), - messages = undefined :: undefined | {atom(), atom(), atom()}, + messages = undefined :: undefined | {atom(), atom(), atom()} + | {atom(), atom(), atom(), atom()}, hibernate = false :: boolean(), frag_state = undefined :: cow_ws:frag_state(), frag_buffer = <<>> :: binary(), @@ -300,28 +302,71 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, %% we still want to process that data if any. case erlang:function_exported(Handler, websocket_init, 1) of true -> handler_call(State, HandlerState, #ps_header{buffer=Buffer}, - websocket_init, undefined, fun parse_header/3); - false -> parse_header(State, HandlerState, #ps_header{buffer=Buffer}) + websocket_init, undefined, fun after_init/3); + false -> after_init(State, HandlerState, #ps_header{buffer=Buffer}) end. -before_loop(State=#state{active=false}, HandlerState, ParseState) -> - loop(State, HandlerState, ParseState); -%% @todo We probably shouldn't do the setopts if we have not received a socket message. -%% @todo We need to hibernate when HTTP/2 is used too. -before_loop(State=#state{socket=Stream={Pid, _}, transport=undefined}, - HandlerState, ParseState) -> +after_init(State=#state{active=true}, HandlerState, ParseState) -> + %% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2. + %% We must do this only after calling websocket_init/1 (if any) + %% to give the handler a chance to disable active mode immediately. + setopts_active(State), + maybe_read_body(State), + parse_header(State, HandlerState, ParseState); +after_init(State, HandlerState, ParseState) -> + parse_header(State, HandlerState, ParseState). + +%% We have two ways of reading the body for Websocket. For HTTP/1.1 +%% we have full control of the socket and can therefore use active,N. +%% For HTTP/2 we are just a stream, and are instead using read_body +%% (automatic mode). Technically HTTP/2 will only go passive after +%% receiving the next data message, while HTTP/1.1 goes passive +%% immediately but there might still be data to be processed in +%% the message queue. + +setopts_active(#state{transport=undefined}) -> + ok; +setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> + N = maps:get(active_n, Opts, 100), + Transport:setopts(Socket, [{active, N}]). + +maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) -> %% @todo Keep Ref around. ReadBodyRef = make_ref(), Pid ! {Stream, {read_body, self(), ReadBodyRef, auto, infinity}}, - loop(State, HandlerState, ParseState); -before_loop(State=#state{socket=Socket, transport=Transport, hibernate=true}, - HandlerState, ParseState) -> - Transport:setopts(Socket, [{active, once}]), + ok; +maybe_read_body(_) -> + ok. + +active(State) -> + setopts_active(State), + maybe_read_body(State), + State#state{active=true}. + +passive(State=#state{transport=undefined}) -> + %% Unfortunately we cannot currently cancel read_body. + %% But that's OK, we will just stop reading the body + %% after the next message. + State#state{active=false}; +passive(State=#state{socket=Socket, transport=Transport, messages=Messages}) -> + Transport:setopts(Socket, [{active, false}]), + flush_passive(Socket, Messages), + State#state{active=false}. + +flush_passive(Socket, Messages) -> + receive + {Passive, Socket} when Passive =:= element(4, Messages); + %% Hardcoded for compatibility with Ranch 1.x. + Passive =:= tcp_passive; Passive =:= ssl_passive -> + flush_passive(Socket, Messages) + after 0 -> + ok + end. + +before_loop(State=#state{hibernate=true}, HandlerState, ParseState) -> proc_lib:hibernate(?MODULE, loop, [State#state{hibernate=false}, HandlerState, ParseState]); -before_loop(State=#state{socket=Socket, transport=Transport}, - HandlerState, ParseState) -> - Transport:setopts(Socket, [{active, once}]), +before_loop(State, HandlerState, ParseState) -> loop(State, HandlerState, ParseState). -spec loop_timeout(#state{}) -> #state{}. @@ -350,22 +395,28 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages, terminate(State, HandlerState, {error, closed}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> terminate(State, HandlerState, {error, Reason}); + {Passive, Socket} when Passive =:= element(4, Messages); + %% Hardcoded for compatibility with Ranch 1.x. + Passive =:= tcp_passive; Passive =:= ssl_passive -> + setopts_active(State), + loop(State, HandlerState, ParseState); %% Body reading messages. (HTTP/2) {request_body, _Ref, nofin, Data} -> + maybe_read_body(State), State2 = loop_timeout(State), parse(State2, HandlerState, ParseState, Data); %% @todo We need to handle this case as if it was an {error, closed} %% but not before we finish processing frames. We probably should have %% a check in before_loop to let us stop looping if a flag is set. {request_body, _Ref, fin, _, Data} -> + maybe_read_body(State), State2 = loop_timeout(State), parse(State2, HandlerState, ParseState, Data); %% Timeouts. {timeout, TRef, ?MODULE} -> websocket_close(State, HandlerState, timeout); {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> - %% @todo This should call before_loop. - loop(State, HandlerState, ParseState); + before_loop(State, HandlerState, ParseState); %% System messages. {'EXIT', Parent, Reason} -> %% @todo We should exit gracefully. @@ -376,8 +427,7 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages, %% Calls from supervisor module. {'$gen_call', From, Call} -> cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE), - %% @todo This should call before_loop. - loop(State, HandlerState, ParseState); + before_loop(State, HandlerState, ParseState); Message -> handler_call(State, HandlerState, ParseState, websocket_info, Message, fun before_loop/3) @@ -531,7 +581,15 @@ commands([], State, []) -> commands([], State, Data) -> Result = transport_send(State, nofin, lists:reverse(Data)), {Result, State}; -commands([{active, Active}|Tail], State, Data) when is_boolean(Active) -> +commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_boolean(Active) -> + State = if + Active, not Active0 -> + active(State0); + Active0, not Active -> + passive(State0); + true -> + State0 + end, commands(Tail, State#state{active=Active}, Data); commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) -> commands(Tail, State#state{deflate=Deflate}, Data); -- cgit v1.2.3