aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_http.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-12-04 11:17:34 +0100
committerLoïc Hoguin <[email protected]>2020-01-06 12:58:14 +0100
commitdb0d6f8d254f2cc01bd458dc41969e0b96991cc3 (patch)
tree5d77236bc703223fcb36e45cbf3de72de1763d50 /src/cowboy_http.erl
parent592029070dea7c1f7b85d465e250ef6842e1a46b (diff)
downloadcowboy-db0d6f8d254f2cc01bd458dc41969e0b96991cc3.tar.gz
cowboy-db0d6f8d254f2cc01bd458dc41969e0b96991cc3.tar.bz2
cowboy-db0d6f8d254f2cc01bd458dc41969e0b96991cc3.zip
Use active,N
This reduces the number of times we need to ask for more packets, and as a result we get a fairly large boost in performance, especially with HTTP/1.1. Unfortunately this makes Cowboy require at least Erlang/OTP 21.3+ because the ssl application did not have active,N. For simplicity the version required will be Erlang/OTP 22+. In addition this change improves hibernate handling in cowboy_websocket. Hibernate will now work for HTTP/2 transport as well, and stray or unrelated messages will no longer cancel hibernate (the process will handle the message and go back into hibernation). Thanks go to Stressgrid for benchmarking an early version of this commit: https://stressgrid.com/blog/cowboy_performance_part_2/
Diffstat (limited to 'src/cowboy_http.erl')
-rw-r--r--src/cowboy_http.erl117
1 files changed, 78 insertions, 39 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index 90e3203..ad2ef75 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -21,6 +21,7 @@
-export([system_code_change/4]).
-type opts() :: #{
+ active_n => pos_integer(),
chunked => boolean(),
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
@@ -121,6 +122,9 @@
timer = undefined :: undefined | reference(),
+ %% Whether we are currently receiving data from the socket.
+ active = true :: boolean(),
+
%% Identifier for the stream currently being read (or waiting to be received).
in_streamid = 1 :: pos_integer(),
@@ -173,7 +177,8 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
transport=Transport, proxy_header=ProxyHeader, opts=Opts,
peer=Peer, sock=Sock, cert=Cert,
last_streamid=maps:get(max_keepalive, Opts, 100)},
- before_loop(set_timeout(State, request_timeout));
+ setopts_active(State),
+ loop(set_timeout(State, request_timeout));
{{error, Reason}, _, _} ->
terminate(undefined, {socket_error, Reason,
'A socket error occurred when retrieving the peer name.'});
@@ -185,12 +190,29 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
'A socket error occurred when retrieving the client TLS certificate.'})
end.
-%% Do not read from the socket unless flow is large enough.
-before_loop(State=#state{flow=Flow}) when Flow =< 0 ->
- loop(State);
-before_loop(State=#state{socket=Socket, transport=Transport}) ->
- Transport:setopts(Socket, [{active, once}]),
- loop(State).
+setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
+ N = maps:get(active_n, Opts, 100),
+ Transport:setopts(Socket, [{active, N}]).
+
+active(State) ->
+ setopts_active(State),
+ State#state{active=true}.
+
+passive(State=#state{socket=Socket, transport=Transport}) ->
+ Transport:setopts(Socket, [{active, false}]),
+ Messages = Transport:messages(),
+ flush_passive(Socket, Messages),
+ State#state{active=false}.
+
+flush_passive(Socket, Messages) ->
+ receive
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ flush_passive(Socket, Messages)
+ after 0 ->
+ ok
+ end.
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
@@ -201,7 +223,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
%% Discard data coming in after the last request
%% we want to process was received fully.
{OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
- before_loop(State);
+ loop(State);
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
parse(<< Buffer/binary, Data/binary >>, State);
@@ -209,6 +231,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'});
+ {Passive, Socket} when Passive =:= element(4, Messages);
+ %% Hardcoded for compatibility with Ranch 1.x.
+ Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ setopts_active(State),
+ loop(State);
%% Timeouts.
{timeout, Ref, {shutdown, Pid}} ->
cowboy_children:shutdown_timeout(Children, Ref, Pid),
@@ -297,12 +324,12 @@ timeout(State, idle_timeout) ->
'Connection idle longer than configuration allows.'}).
parse(<<>>, State) ->
- before_loop(State#state{buffer= <<>>});
+ loop(State#state{buffer= <<>>});
%% Do not process requests that come in after the last request
%% and discard the buffer if any to save memory.
parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
- before_loop(State#state{buffer= <<>>});
+ loop(State#state{buffer= <<>>});
parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
after_parse(parse_request(Buffer, State, EmptyLines));
parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
@@ -364,17 +391,26 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer
%% No corresponding stream. We must skip the body of the previous request
%% in order to process the next one.
after_parse({data, _, IsFin, _, State}) ->
- before_loop(set_timeout(State, case IsFin of
+ loop(set_timeout(State, case IsFin of
fin -> request_timeout;
nofin -> idle_timeout
end));
after_parse({more, State}) ->
- before_loop(set_timeout(State, idle_timeout)).
+ loop(set_timeout(State, idle_timeout)).
update_flow(fin, _, State) ->
+ %% This function is only called after parsing, therefore we
+ %% are expecting to be in active mode already.
State#state{flow=infinity};
-update_flow(nofin, Data, State=#state{flow=Flow0}) ->
- State#state{flow=Flow0 - byte_size(Data)}.
+update_flow(nofin, Data, State0=#state{flow=Flow0}) ->
+ Flow = Flow0 - byte_size(Data),
+ State = State0#state{flow=Flow},
+ if
+ Flow0 > 0, Flow =< 0 ->
+ passive(State);
+ true ->
+ State
+ end.
%% Request-line.
@@ -935,8 +971,7 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command
Stream#stream{queue=Queue ++ Commands}),
State#state{streams=Streams};
%% Read the request body.
-commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
- [{flow, Size}|Tail]) ->
+commands(State0=#state{flow=Flow0}, StreamID, [{flow, Size}|Tail]) ->
%% We must read *at least* Size of data otherwise functions
%% like cowboy_req:read_body/1,2 will wait indefinitely.
Flow = if
@@ -944,11 +979,11 @@ commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID,
true -> Flow0 + Size
end,
%% Reenable active mode if necessary.
- _ = if
+ State = if
Flow0 =< 0, Flow > 0 ->
- Transport:setopts(Socket, [{active, once}]);
+ active(State0);
true ->
- ok
+ State0
end,
commands(State#state{flow=Flow}, StreamID, Tail);
%% Error responses are sent only if a response wasn't sent already.
@@ -1118,14 +1153,14 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
%% @todo If there's streams opened after this one, fail instead of 101.
- State = cancel_timeout(State0),
+ State1 = cancel_timeout(State0),
%% Before we send the 101 response we need to stop receiving data
%% from the socket, otherwise the data might be receive before the
%% call to flush/0 and we end up inadvertently dropping a packet.
%%
%% @todo Handle cases where the request came with a body. We need
%% to process or skip the body before the upgrade can be completed.
- Transport:setopts(Socket, [{active, false}]),
+ State = passive(State1),
%% Send a 101 response if necessary, then terminate the stream.
#state{streams=Streams} = case OutState of
wait -> info(State, StreamID, {inform, 101, Headers});
@@ -1415,37 +1450,41 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
0 ->
ok;
infinity ->
- terminate_linger_loop(State, undefined);
+ terminate_linger_before_loop(State, undefined, Transport:messages());
Timeout ->
TimerRef = erlang:start_timer(Timeout, self(), linger_timeout),
- terminate_linger_loop(State, TimerRef)
+ terminate_linger_before_loop(State, TimerRef, Transport:messages())
end;
{error, _} ->
ok
end.
-terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) ->
- Messages = Transport:messages(),
- %% We may already have a message in the mailbox when we do this
+terminate_linger_before_loop(State, TimerRef, Messages) ->
+ %% We may already be in active mode when we do this
%% but it's OK because we are shutting down anyway.
- case Transport:setopts(Socket, [{active, once}]) of
+ case setopts_active(State) of
ok ->
- receive
- {OK, Socket, _} when OK =:= element(1, Messages) ->
- terminate_linger_loop(State, TimerRef);
- {Closed, Socket} when Closed =:= element(2, Messages) ->
- ok;
- {Error, Socket, _} when Error =:= element(3, Messages) ->
- ok;
- {timeout, TimerRef, linger_timeout} ->
- ok;
- _ ->
- terminate_linger_loop(State, TimerRef)
- end;
+ terminate_linger_loop(State, TimerRef, Messages);
{error, _} ->
ok
end.
+terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
+ receive
+ {OK, Socket, _} when OK =:= element(1, Messages) ->
+ terminate_linger_loop(State, TimerRef, Messages);
+ {Closed, Socket} when Closed =:= element(2, Messages) ->
+ ok;
+ {Error, Socket, _} when Error =:= element(3, Messages) ->
+ ok;
+ {Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive ->
+ terminate_linger_before_loop(State, TimerRef, Messages);
+ {timeout, TimerRef, linger_timeout} ->
+ ok;
+ _ ->
+ terminate_linger_loop(State, TimerRef, Messages)
+ end.
+
%% System callbacks.
-spec system_continue(_, _, #state{}) -> ok.