From cd7870df151a8686c6d204f9972f9032494c44d4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 4 Dec 2019 11:17:34 +0100 Subject: Experiment with {active,N} --- Makefile | 4 ++-- src/cowboy_http.erl | 34 ++++++++++++++++++++++++---------- 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/Makefile b/Makefile index bd4ffa0..02df0e7 100644 --- a/Makefile +++ b/Makefile @@ -29,10 +29,10 @@ dep_gun = git https://github.com/ninenines/gun master dep_ci.erlang.mk = git https://github.com/ninenines/ci.erlang.mk master DEP_EARLY_PLUGINS = ci.erlang.mk -AUTO_CI_OTP ?= OTP-LATEST-20+ +AUTO_CI_OTP ?= OTP-LATEST-22+ AUTO_CI_HIPE ?= OTP-LATEST # AUTO_CI_ERLLVM ?= OTP-LATEST -AUTO_CI_WINDOWS ?= OTP-LATEST-20+ +AUTO_CI_WINDOWS ?= OTP-LATEST-22+ # Standard targets. diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index f128a44..7cbecef 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -125,6 +125,9 @@ timer = undefined :: undefined | reference(), + %% Whether we are currently receiving data from the socket. + active = false :: boolean(), + %% Identifier for the stream currently being read (or waiting to be received). in_streamid = 1 :: pos_integer(), @@ -189,12 +192,21 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> 'A socket error occurred when retrieving the client TLS certificate.'}) end. +active(State=#state{socket=Socket, transport=Transport}) -> + Transport:setopts(Socket, [{active, 100}]), + State#state{active=true}. + +passive(State=#state{socket=Socket, transport=Transport}) -> + Transport:setopts(Socket, [{active, false}]), + State#state{active=false}. + +before_loop(State=#state{active=true}) -> + loop(State); %% Do not read from the socket unless flow is large enough. before_loop(State=#state{flow=Flow}) when Flow =< 0 -> loop(State); -before_loop(State=#state{socket=Socket, transport=Transport}) -> - Transport:setopts(Socket, [{active, once}]), - loop(State). +before_loop(State) -> + loop(active(State)). loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID, @@ -213,6 +225,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, terminate(State, {socket_error, closed, 'The socket has been closed.'}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'}); + {Passive, Socket} when Passive =:= tcp_passive; Passive =:= ssl_passive -> + loop(active(State)); %% Timeouts. {timeout, Ref, {shutdown, Pid}} -> cowboy_children:shutdown_timeout(Children, Ref, Pid), @@ -939,8 +953,7 @@ commands(State=#state{out_streamid=Current, streams=Streams0}, StreamID, Command Stream#stream{queue=Queue ++ Commands}), State#state{streams=Streams}; %% Read the request body. -commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID, - [{flow, Size}|Tail]) -> +commands(State0=#state{flow=Flow0}, StreamID, [{flow, Size}|Tail]) -> %% We must read *at least* Size of data otherwise functions %% like cowboy_req:read_body/1,2 will wait indefinitely. Flow = if @@ -948,11 +961,11 @@ commands(State=#state{socket=Socket, transport=Transport, flow=Flow0}, StreamID, true -> Flow0 + Size end, %% Reenable active mode if necessary. - _ = if + State = if Flow0 =< 0, Flow > 0 -> - Transport:setopts(Socket, [{active, once}]); + active(State0); true -> - ok + State0 end, commands(State#state{flow=Flow}, StreamID, Tail); %% Error responses are sent only if a response wasn't sent already. @@ -1122,14 +1135,14 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID, [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) -> %% @todo If there's streams opened after this one, fail instead of 101. - State = cancel_timeout(State0), + State1 = cancel_timeout(State0), %% Before we send the 101 response we need to stop receiving data %% from the socket, otherwise the data might be receive before the %% call to flush/0 and we end up inadvertently dropping a packet. %% %% @todo Handle cases where the request came with a body. We need %% to process or skip the body before the upgrade can be completed. - Transport:setopts(Socket, [{active, false}]), + State = passive(State1), %% Send a 101 response if necessary, then terminate the stream. #state{streams=Streams} = case OutState of wait -> info(State, StreamID, {inform, 101, Headers}); @@ -1432,6 +1445,7 @@ terminate_linger_loop(State=#state{socket=Socket, transport=Transport}, TimerRef Messages = Transport:messages(), %% We may already have a message in the mailbox when we do this %% but it's OK because we are shutting down anyway. + %% @todo Use active,N here as well. case Transport:setopts(Socket, [{active, once}]) of ok -> receive -- cgit v1.2.3