From db0d6f8d254f2cc01bd458dc41969e0b96991cc3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 4 Dec 2019 11:17:34 +0100 Subject: Use active,N This reduces the number of times we need to ask for more packets, and as a result we get a fairly large boost in performance, especially with HTTP/1.1. Unfortunately this makes Cowboy require at least Erlang/OTP 21.3+ because the ssl application did not have active,N. For simplicity the version required will be Erlang/OTP 22+. In addition this change improves hibernate handling in cowboy_websocket. Hibernate will now work for HTTP/2 transport as well, and stray or unrelated messages will no longer cancel hibernate (the process will handle the message and go back into hibernation). Thanks go to Stressgrid for benchmarking an early version of this commit: https://stressgrid.com/blog/cowboy_performance_part_2/ --- src/cowboy_http.erl | 117 ++++++++++++++++++++++++++++++++++------------------ 1 file changed, 78 insertions(+), 39 deletions(-) (limited to 'src/cowboy_http.erl') diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 90e3203..ad2ef75 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -21,6 +21,7 @@ -export([system_code_change/4]). -type opts() :: #{ + active_n => pos_integer(), chunked => boolean(), compress_buffering => boolean(), compress_threshold => non_neg_integer(), @@ -121,6 +122,9 @@ timer = undefined :: undefined | reference(), + %% Whether we are currently receiving data from the socket. + active = true :: boolean(), + %% Identifier for the stream currently being read (or waiting to be received). in_streamid = 1 :: pos_integer(), @@ -173,7 +177,8 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, last_streamid=maps:get(max_keepalive, Opts, 100)}, - before_loop(set_timeout(State, request_timeout)); + setopts_active(State), + loop(set_timeout(State, request_timeout)); {{error, Reason}, _, _} -> terminate(undefined, {socket_error, Reason, 'A socket error occurred when retrieving the peer name.'}); @@ -185,12 +190,29 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> 'A socket error occurred when retrieving the client TLS certificate.'}) end. -%% Do not read from the socket unless flow is large enough. -before_loop(State=#state{flow=Flow}) when Flow =< 0 -> - loop(State); -before_loop(State=#state{socket=Socket, transport=Transport}) -> - Transport:setopts(Socket, [{active, once}]), - loop(State). +setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> + N = maps:get(active_n, Opts, 100), + Transport:setopts(Socket, [{active, N}]). + +active(State) -> + setopts_active(State), + State#state{active=true}. + +passive(State=#state{socket=Socket, transport=Transport}) -> + Transport:setopts(Socket, [{active, false}]), + Messages = Transport:messages(), + flush_passive(Socket, Messages), + State#state{active=false}. + +flush_passive(Socket, Messages) -> + receive + {Passive, Socket} when Passive =:= element(4, Messages); + %% Hardcoded for compatibility with Ranch 1.x. + Passive =:= tcp_passive; Passive =:= ssl_passive -> + flush_passive(Socket, Messages) + after 0 -> + ok + end. loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID, @@ -201,7 +223,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, %% Discard data coming in after the last request %% we want to process was received fully. {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID -> - before_loop(State); + loop(State); %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> parse(<< Buffer/binary, Data/binary >>, State); @@ -209,6 +231,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, terminate(State, {socket_error, closed, 'The socket has been closed.'}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'}); + {Passive, Socket} when Passive =:= element(4, Messages); + %% Hardcoded for compatibility with Ranch 1.x. + Passive =:= tcp_passive; Passive =:= ssl_passive -> + setopts_active(State), + loop(State); %% Timeouts. {timeout, Ref, {shutdown, Pid}} -> cowboy_children:shutdown_timeout(Children, Ref, Pid), @@ -297,12 +324,12 @@ timeout(State, idle_timeout) -> 'Connection idle longer than configuration allows.'}). parse(<<>>, State) -> - before_loop(State#state{buffer= <<>>}); + loop(State#state{buffer= <<>>}); %% Do not process requests that come in after the last request %% and discard the buffer if any to save memory. parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{}, last_streamid=LastStreamID}) when InStreamID > LastStreamID -> - before_loop(State#state{buffer= <<>>}); + loop(State#state{buffer= <<>>}); parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) -> after_parse(parse_request(Buffer, State, EmptyLines)); parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) -> @@ -364,17 +391,26 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer %% No corresponding stream. We must skip the body of the previous request %% in order to process the next one. after_parse({data, _, IsFin, _, State}) -> - before_loop(set_timeout(State, case IsFin of + loop(set_timeout(State, case IsFin of fin -> request_timeout; nofin -> idle_timeout end)); after_parse({more, State}) -> - before_loop(set_timeout(State, idle_timeout)). + loop(set_timeout(State, idle_timeout)). update_flow(fin, _, State) -> + %% This function is only called after parsing, therefore we + %% are expecting to be in active mode already. State#state{flow=infinity}; -update_flow(nofin, Data, State=#state{flow=Flow0}) -> - State#state{flow=Flow0 - byte_size(Data)}. +update_flow(nofin, Data, State0=#state{flow=Flow0}) -> + Flow = Flow0 - byte_size(Data), + State = State0#state{flow=Flow}, + if + Flow0 > 0, Flow =< 0 -> + passive(State); + true -> + State + end. %% Request-line. @@ -935,8 +971,7 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command Stream#stream{queue=Queue ++ Commands}), State#state{streams=Streams}; %% Read the request body. -commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID, - [{flow, Size}|Tail]) -> +commands(State0=#state{flow=Flow0}, StreamID, [{flow, Size}|Tail]) -> %% We must read *at least* Size of data otherwise functions %% like cowboy_req:read_body/1,2 will wait indefinitely. Flow = if @@ -944,11 +979,11 @@ commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID, true -> Flow0 + Size end, %% Reenable active mode if necessary. - _ = if + State = if Flow0 =< 0, Flow > 0 -> - Transport:setopts(Socket, [{active, once}]); + active(State0); true -> - ok + State0 end, commands(State#state{flow=Flow}, StreamID, Tail); %% Error responses are sent only if a response wasn't sent already. @@ -1118,14 +1153,14 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID, [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) -> %% @todo If there's streams opened after this one, fail instead of 101. - State = cancel_timeout(State0), + State1 = cancel_timeout(State0), %% Before we send the 101 response we need to stop receiving data %% from the socket, otherwise the data might be receive before the %% call to flush/0 and we end up inadvertently dropping a packet. %% %% @todo Handle cases where the request came with a body. We need %% to process or skip the body before the upgrade can be completed. - Transport:setopts(Socket, [{active, false}]), + State = passive(State1), %% Send a 101 response if necessary, then terminate the stream. #state{streams=Streams} = case OutState of wait -> info(State, StreamID, {inform, 101, Headers}); @@ -1415,37 +1450,41 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) -> 0 -> ok; infinity -> - terminate_linger_loop(State, undefined); + terminate_linger_before_loop(State, undefined, Transport:messages()); Timeout -> TimerRef = erlang:start_timer(Timeout, self(), linger_timeout), - terminate_linger_loop(State, TimerRef) + terminate_linger_before_loop(State, TimerRef, Transport:messages()) end; {error, _} -> ok end. -terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) -> - Messages = Transport:messages(), - %% We may already have a message in the mailbox when we do this +terminate_linger_before_loop(State, TimerRef, Messages) -> + %% We may already be in active mode when we do this %% but it's OK because we are shutting down anyway. - case Transport:setopts(Socket, [{active, once}]) of + case setopts_active(State) of ok -> - receive - {OK, Socket, _} when OK =:= element(1, Messages) -> - terminate_linger_loop(State, TimerRef); - {Closed, Socket} when Closed =:= element(2, Messages) -> - ok; - {Error, Socket, _} when Error =:= element(3, Messages) -> - ok; - {timeout, TimerRef, linger_timeout} -> - ok; - _ -> - terminate_linger_loop(State, TimerRef) - end; + terminate_linger_loop(State, TimerRef, Messages); {error, _} -> ok end. +terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) -> + receive + {OK, Socket, _} when OK =:= element(1, Messages) -> + terminate_linger_loop(State, TimerRef, Messages); + {Closed, Socket} when Closed =:= element(2, Messages) -> + ok; + {Error, Socket, _} when Error =:= element(3, Messages) -> + ok; + {Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive -> + terminate_linger_before_loop(State, TimerRef, Messages); + {timeout, TimerRef, linger_timeout} -> + ok; + _ -> + terminate_linger_loop(State, TimerRef, Messages) + end. + %% System callbacks. -spec system_continue(_, _, #state{}) -> ok. -- cgit v1.2.3