diff options
Diffstat (limited to 'src/cowboy_http.erl')
-rw-r--r-- | src/cowboy_http.erl | 117 |
1 files changed, 78 insertions, 39 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index 90e3203..ad2ef75 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -21,6 +21,7 @@ -export([system_code_change/4]). -type opts() :: #{ + active_n => pos_integer(), chunked => boolean(), compress_buffering => boolean(), compress_threshold => non_neg_integer(), @@ -121,6 +122,9 @@ timer = undefined :: undefined | reference(), + %% Whether we are currently receiving data from the socket. + active = true :: boolean(), + %% Identifier for the stream currently being read (or waiting to be received). in_streamid = 1 :: pos_integer(), @@ -173,7 +177,8 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, last_streamid=maps:get(max_keepalive, Opts, 100)}, - before_loop(set_timeout(State, request_timeout)); + setopts_active(State), + loop(set_timeout(State, request_timeout)); {{error, Reason}, _, _} -> terminate(undefined, {socket_error, Reason, 'A socket error occurred when retrieving the peer name.'}); @@ -185,12 +190,29 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> 'A socket error occurred when retrieving the client TLS certificate.'}) end. -%% Do not read from the socket unless flow is large enough. -before_loop(State=#state{flow=Flow}) when Flow =< 0 -> - loop(State); -before_loop(State=#state{socket=Socket, transport=Transport}) -> - Transport:setopts(Socket, [{active, once}]), - loop(State). +setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> + N = maps:get(active_n, Opts, 100), + Transport:setopts(Socket, [{active, N}]). + +active(State) -> + setopts_active(State), + State#state{active=true}. + +passive(State=#state{socket=Socket, transport=Transport}) -> + Transport:setopts(Socket, [{active, false}]), + Messages = Transport:messages(), + flush_passive(Socket, Messages), + State#state{active=false}. + +flush_passive(Socket, Messages) -> + receive + {Passive, Socket} when Passive =:= element(4, Messages); + %% Hardcoded for compatibility with Ranch 1.x. + Passive =:= tcp_passive; Passive =:= ssl_passive -> + flush_passive(Socket, Messages) + after 0 -> + ok + end. loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID, @@ -201,7 +223,7 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, %% Discard data coming in after the last request %% we want to process was received fully. {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID -> - before_loop(State); + loop(State); %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> parse(<< Buffer/binary, Data/binary >>, State); @@ -209,6 +231,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, terminate(State, {socket_error, closed, 'The socket has been closed.'}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'}); + {Passive, Socket} when Passive =:= element(4, Messages); + %% Hardcoded for compatibility with Ranch 1.x. + Passive =:= tcp_passive; Passive =:= ssl_passive -> + setopts_active(State), + loop(State); %% Timeouts. {timeout, Ref, {shutdown, Pid}} -> cowboy_children:shutdown_timeout(Children, Ref, Pid), @@ -297,12 +324,12 @@ timeout(State, idle_timeout) -> 'Connection idle longer than configuration allows.'}). parse(<<>>, State) -> - before_loop(State#state{buffer= <<>>}); + loop(State#state{buffer= <<>>}); %% Do not process requests that come in after the last request %% and discard the buffer if any to save memory. parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{}, last_streamid=LastStreamID}) when InStreamID > LastStreamID -> - before_loop(State#state{buffer= <<>>}); + loop(State#state{buffer= <<>>}); parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) -> after_parse(parse_request(Buffer, State, EmptyLines)); parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) -> @@ -364,17 +391,26 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer %% No corresponding stream. We must skip the body of the previous request %% in order to process the next one. after_parse({data, _, IsFin, _, State}) -> - before_loop(set_timeout(State, case IsFin of + loop(set_timeout(State, case IsFin of fin -> request_timeout; nofin -> idle_timeout end)); after_parse({more, State}) -> - before_loop(set_timeout(State, idle_timeout)). + loop(set_timeout(State, idle_timeout)). update_flow(fin, _, State) -> + %% This function is only called after parsing, therefore we + %% are expecting to be in active mode already. State#state{flow=infinity}; -update_flow(nofin, Data, State=#state{flow=Flow0}) -> - State#state{flow=Flow0 - byte_size(Data)}. +update_flow(nofin, Data, State0=#state{flow=Flow0}) -> + Flow = Flow0 - byte_size(Data), + State = State0#state{flow=Flow}, + if + Flow0 > 0, Flow =< 0 -> + passive(State); + true -> + State + end. %% Request-line. @@ -935,8 +971,7 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command Stream#stream{queue=Queue ++ Commands}), State#state{streams=Streams}; %% Read the request body. -commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID, - [{flow, Size}|Tail]) -> +commands(State0=#state{flow=Flow0}, StreamID, [{flow, Size}|Tail]) -> %% We must read *at least* Size of data otherwise functions %% like cowboy_req:read_body/1,2 will wait indefinitely. Flow = if @@ -944,11 +979,11 @@ commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID, true -> Flow0 + Size end, %% Reenable active mode if necessary. - _ = if + State = if Flow0 =< 0, Flow > 0 -> - Transport:setopts(Socket, [{active, once}]); + active(State0); true -> - ok + State0 end, commands(State#state{flow=Flow}, StreamID, Tail); %% Error responses are sent only if a response wasn't sent already. @@ -1118,14 +1153,14 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID, [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) -> %% @todo If there's streams opened after this one, fail instead of 101. - State = cancel_timeout(State0), + State1 = cancel_timeout(State0), %% Before we send the 101 response we need to stop receiving data %% from the socket, otherwise the data might be receive before the %% call to flush/0 and we end up inadvertently dropping a packet. %% %% @todo Handle cases where the request came with a body. We need %% to process or skip the body before the upgrade can be completed. - Transport:setopts(Socket, [{active, false}]), + State = passive(State1), %% Send a 101 response if necessary, then terminate the stream. #state{streams=Streams} = case OutState of wait -> info(State, StreamID, {inform, 101, Headers}); @@ -1415,37 +1450,41 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) -> 0 -> ok; infinity -> - terminate_linger_loop(State, undefined); + terminate_linger_before_loop(State, undefined, Transport:messages()); Timeout -> TimerRef = erlang:start_timer(Timeout, self(), linger_timeout), - terminate_linger_loop(State, TimerRef) + terminate_linger_before_loop(State, TimerRef, Transport:messages()) end; {error, _} -> ok end. -terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef) -> - Messages = Transport:messages(), - %% We may already have a message in the mailbox when we do this +terminate_linger_before_loop(State, TimerRef, Messages) -> + %% We may already be in active mode when we do this %% but it's OK because we are shutting down anyway. - case Transport:setopts(Socket, [{active, once}]) of + case setopts_active(State) of ok -> - receive - {OK, Socket, _} when OK =:= element(1, Messages) -> - terminate_linger_loop(State, TimerRef); - {Closed, Socket} when Closed =:= element(2, Messages) -> - ok; - {Error, Socket, _} when Error =:= element(3, Messages) -> - ok; - {timeout, TimerRef, linger_timeout} -> - ok; - _ -> - terminate_linger_loop(State, TimerRef) - end; + terminate_linger_loop(State, TimerRef, Messages); {error, _} -> ok end. +terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) -> + receive + {OK, Socket, _} when OK =:= element(1, Messages) -> + terminate_linger_loop(State, TimerRef, Messages); + {Closed, Socket} when Closed =:= element(2, Messages) -> + ok; + {Error, Socket, _} when Error =:= element(3, Messages) -> + ok; + {Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive -> + terminate_linger_before_loop(State, TimerRef, Messages); + {timeout, TimerRef, linger_timeout} -> + ok; + _ -> + terminate_linger_loop(State, TimerRef, Messages) + end. + %% System callbacks. -spec system_continue(_, _, #state{}) -> ok. |