aboutsummaryrefslogtreecommitdiffstats
path: root/src/cowboy_http.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/cowboy_http.erl')
-rw-r--r--src/cowboy_http.erl475
1 files changed, 302 insertions, 173 deletions
diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl
index c9bceed..10eb519 100644
--- a/src/cowboy_http.erl
+++ b/src/cowboy_http.erl
@@ -1,4 +1,4 @@
-%% Copyright (c) 2016-2017, Loïc Hoguin <[email protected]>
+%% Copyright (c) Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
@@ -12,9 +12,12 @@
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+%% @todo Worth renaming to cowboy_http1.
+%% @todo Change use of cow_http to cow_http1 where appropriate.
-module(cowboy_http).
-export([init/6]).
+-export([loop/1]).
-export([system_continue/3]).
-export([system_terminate/4]).
@@ -22,11 +25,16 @@
-type opts() :: #{
active_n => pos_integer(),
+ alpn_default_protocol => http | http2,
chunked => boolean(),
compress_buffering => boolean(),
compress_threshold => non_neg_integer(),
connection_type => worker | supervisor,
+ dynamic_buffer => false | {pos_integer(), pos_integer()},
+ dynamic_buffer_initial_average => non_neg_integer(),
+ dynamic_buffer_initial_size => pos_integer(),
env => cowboy_middleware:env(),
+ hibernate => boolean(),
http10_keepalive => boolean(),
idle_timeout => timeout(),
inactivity_timeout => timeout(),
@@ -45,8 +53,10 @@
metrics_req_filter => fun((cowboy_req:req()) -> map()),
metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()),
middlewares => [module()],
+ protocols => [http | http2],
proxy_header => boolean(),
request_timeout => timeout(),
+ reset_idle_timeout_on_send => boolean(),
sendfile => boolean(),
shutdown_timeout => timeout(),
stream_handlers => [module()],
@@ -134,6 +144,10 @@
%% Flow requested for the current stream.
flow = infinity :: non_neg_integer() | infinity,
+ %% Dynamic buffer moving average and current buffer size.
+ dynamic_buffer_size :: pos_integer() | false,
+ dynamic_buffer_moving_average :: non_neg_integer(),
+
%% Identifier for the stream currently being written.
%% Note that out_streamid =< in_streamid.
out_streamid = 1 :: pos_integer(),
@@ -157,9 +171,11 @@
-spec init(pid(), ranch:ref(), inet:socket(), module(),
ranch_proxy_header:proxy_info(), cowboy:opts()) -> ok.
init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
- Peer0 = Transport:peername(Socket),
- Sock0 = Transport:sockname(Socket),
- Cert1 = case Transport:name() of
+ {ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket),
+ 'A socket error occurred when retrieving the peer name.'),
+ {ok, Sock} = maybe_socket_error(undefined, Transport:sockname(Socket),
+ 'A socket error occurred when retrieving the sock name.'),
+ CertResult = case Transport:name() of
ssl ->
case ssl:peercert(Socket) of
{error, no_peercert} ->
@@ -170,36 +186,33 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) ->
_ ->
{ok, undefined}
end,
- case {Peer0, Sock0, Cert1} of
- {{ok, Peer}, {ok, Sock}, {ok, Cert}} ->
- State = #state{
- parent=Parent, ref=Ref, socket=Socket,
- transport=Transport, proxy_header=ProxyHeader, opts=Opts,
- peer=Peer, sock=Sock, cert=Cert,
- last_streamid=maps:get(max_keepalive, Opts, 1000)},
- setopts_active(State),
- loop(set_timeout(State, request_timeout));
- {{error, Reason}, _, _} ->
- terminate(undefined, {socket_error, Reason,
- 'A socket error occurred when retrieving the peer name.'});
- {_, {error, Reason}, _} ->
- terminate(undefined, {socket_error, Reason,
- 'A socket error occurred when retrieving the sock name.'});
- {_, _, {error, Reason}} ->
- terminate(undefined, {socket_error, Reason,
- 'A socket error occurred when retrieving the client TLS certificate.'})
- end.
+ {ok, Cert} = maybe_socket_error(undefined, CertResult,
+ 'A socket error occurred when retrieving the client TLS certificate.'),
+ State = #state{
+ parent=Parent, ref=Ref, socket=Socket,
+ transport=Transport, proxy_header=ProxyHeader, opts=Opts,
+ peer=Peer, sock=Sock, cert=Cert,
+ dynamic_buffer_size=init_dynamic_buffer_size(Opts),
+ dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0),
+ last_streamid=maps:get(max_keepalive, Opts, 1000)},
+ safe_setopts_active(State),
+ before_loop(set_timeout(State, request_timeout)).
+
+-include("cowboy_dynamic_buffer.hrl").
setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) ->
- N = maps:get(active_n, Opts, 100),
+ N = maps:get(active_n, Opts, 1),
Transport:setopts(Socket, [{active, N}]).
+safe_setopts_active(State) ->
+ ok = maybe_socket_error(State, setopts_active(State)).
+
active(State) ->
- setopts_active(State),
+ safe_setopts_active(State),
State#state{active=true}.
passive(State=#state{socket=Socket, transport=Transport}) ->
- Transport:setopts(Socket, [{active, false}]),
+ ok = maybe_socket_error(State, Transport:setopts(Socket, [{active, false}])),
Messages = Transport:messages(),
flush_passive(Socket, Messages),
State#state{active=false}.
@@ -214,6 +227,13 @@ flush_passive(Socket, Messages) ->
ok
end.
+before_loop(State=#state{opts=#{hibernate := true}}) ->
+ proc_lib:hibernate(?MODULE, loop, [State]);
+before_loop(State) ->
+ loop(State).
+
+-spec loop(#state{}) -> ok.
+
loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID,
last_streamid=LastStreamID}) ->
@@ -222,11 +242,13 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
receive
%% Discard data coming in after the last request
%% we want to process was received fully.
- {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
- loop(State);
+ {OK, Socket, Data} when OK =:= element(1, Messages), InStreamID > LastStreamID ->
+ State1 = maybe_resize_buffer(State, Data),
+ before_loop(State1);
%% Socket messages.
{OK, Socket, Data} when OK =:= element(1, Messages) ->
- parse(<< Buffer/binary, Data/binary >>, State);
+ State1 = maybe_resize_buffer(State, Data),
+ parse(<< Buffer/binary, Data/binary >>, State1);
{Closed, Socket} when Closed =:= element(2, Messages) ->
terminate(State, {socket_error, closed, 'The socket has been closed.'});
{Error, Socket, Reason} when Error =:= element(3, Messages) ->
@@ -234,45 +256,60 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts,
{Passive, Socket} when Passive =:= element(4, Messages);
%% Hardcoded for compatibility with Ranch 1.x.
Passive =:= tcp_passive; Passive =:= ssl_passive ->
- setopts_active(State),
- loop(State);
+ safe_setopts_active(State),
+ before_loop(State);
%% Timeouts.
{timeout, Ref, {shutdown, Pid}} ->
cowboy_children:shutdown_timeout(Children, Ref, Pid),
- loop(State);
+ before_loop(State);
{timeout, TimerRef, Reason} ->
timeout(State, Reason);
{timeout, _, _} ->
- loop(State);
+ before_loop(State);
%% System messages.
{'EXIT', Parent, shutdown} ->
Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'},
- loop(initiate_closing(State, Reason));
+ before_loop(initiate_closing(State, Reason));
{'EXIT', Parent, Reason} ->
terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'});
{system, From, Request} ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State);
%% Messages pertaining to a stream.
{{Pid, StreamID}, Msg} when Pid =:= self() ->
- loop(info(State, StreamID, Msg));
+ before_loop(info(State, StreamID, Msg));
%% Exit signal from children.
Msg = {'EXIT', Pid, _} ->
- loop(down(State, Pid, Msg));
+ before_loop(down(State, Pid, Msg));
%% Calls from supervisor module.
{'$gen_call', From, Call} ->
cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE),
- loop(State);
+ before_loop(State);
%% Unknown messages.
Msg ->
cowboy:log(warning, "Received stray message ~p.~n", [Msg], Opts),
- loop(State)
+ before_loop(State)
after InactivityTimeout ->
terminate(State, {internal_error, timeout, 'No message or data received before timeout.'})
end.
-%% We do not set request_timeout if there are active streams.
-set_timeout(State=#state{streams=[_|_]}, request_timeout) ->
- State;
+%% For HTTP/1.1 we have two types of timeouts: the request_timeout
+%% is used when there is no currently ongoing request. This means
+%% that we are not currently sending or receiving data and that
+%% the next data to be received will be a new request. The
+%% request_timeout is set once when we no longer have ongoing
+%% requests, and runs until the full set of request headers
+%% is received. It is not reset.
+%%
+%% After that point we use the idle_timeout. We continue using
+%% the idle_timeout if pipelined requests come in: we are doing
+%% work and just want to ensure the socket is not half-closed.
+%% We continue using the idle_timeout up until there is no
+%% ongoing request. This includes requests that were processed
+%% and for which we only want to skip the body. Once the body
+%% has been read fully we can go back to request_timeout. The
+%% idle_timeout is reset every time we receive data and,
+%% optionally, every time we send data.
+
%% We do not set request_timeout if we are skipping a body.
set_timeout(State=#state{in_state=#ps_body{}}, request_timeout) ->
State;
@@ -282,6 +319,7 @@ set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout)
when element(1, InState) =/= ps_body ->
State;
%% Otherwise we can set the timeout.
+%% @todo Don't do this so often, use a strategy similar to Websocket/H2 if possible.
set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) ->
State = cancel_timeout(State0),
Default = case Name of
@@ -299,6 +337,14 @@ set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) ->
end,
State#state{timer=TimerRef}.
+maybe_reset_idle_timeout(State=#state{opts=Opts}) ->
+ case maps:get(reset_idle_timeout_on_send, Opts, false) of
+ true ->
+ set_timeout(State, idle_timeout);
+ false ->
+ State
+ end.
+
cancel_timeout(State=#state{timer=TimerRef}) ->
ok = case TimerRef of
undefined ->
@@ -306,7 +352,7 @@ cancel_timeout(State=#state{timer=TimerRef}) ->
_ ->
%% Do a synchronous cancel and remove the message if any
%% to avoid receiving stray messages.
- _ = erlang:cancel_timer(TimerRef),
+ _ = erlang:cancel_timer(TimerRef, [{async, false}, {info, false}]),
receive
{timeout, TimerRef, _} -> ok
after 0 ->
@@ -327,12 +373,12 @@ timeout(State, idle_timeout) ->
'Connection idle longer than configuration allows.'}).
parse(<<>>, State) ->
- loop(State#state{buffer= <<>>});
+ before_loop(State#state{buffer= <<>>});
%% Do not process requests that come in after the last request
%% and discard the buffer if any to save memory.
parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{},
last_streamid=LastStreamID}) when InStreamID > LastStreamID ->
- loop(State#state{buffer= <<>>});
+ before_loop(State#state{buffer= <<>>});
parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) ->
after_parse(parse_request(Buffer, State, EmptyLines));
parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) ->
@@ -355,16 +401,27 @@ after_parse({request, Req=#{streamid := StreamID, method := Method,
TE = maps:get(<<"te">>, Headers, undefined),
Streams = [#stream{id=StreamID, state=StreamState,
method=Method, version=Version, te=TE}|Streams0],
- State1 = case maybe_req_close(State0, Headers, Version) of
- close -> State0#state{streams=Streams, last_streamid=StreamID, flow=Flow};
- keepalive -> State0#state{streams=Streams, flow=Flow}
+ State1 = State0#state{streams=Streams, flow=Flow},
+ State2 = case maybe_req_close(State1, Headers, Version) of
+ close ->
+ State1#state{last_streamid=StreamID};
+ keepalive ->
+ State1;
+ bad_connection_header ->
+ error_terminate(400, State1, {connection_error, protocol_error,
+ 'The Connection header is invalid. (RFC7230 6.1)'})
end,
- State = set_timeout(State1, idle_timeout),
+ State = set_timeout(State2, idle_timeout),
parse(Buffer, commands(State, StreamID, Commands))
catch Class:Exception:Stacktrace ->
cowboy:log(cowboy_stream:make_error_log(init,
[StreamID, Req, Opts],
Class, Exception, Stacktrace), Opts),
+ %% We do not reset the idle timeout on send here
+ %% because an error occurred in the application. While we
+ %% are keeping the connection open for further requests we
+ %% do not want to keep the connection up too long if no
+ %% additional requests come in.
early_error(500, State0, {internal_error, {Class, Exception},
'Unhandled exception in cowboy_stream:init/3.'}, Req),
parse(Buffer, State0)
@@ -377,10 +434,7 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer
{Commands, StreamState} ->
Streams = lists:keyreplace(StreamID, #stream.id, Streams0,
Stream#stream{state=StreamState}),
- State1 = set_timeout(State0, case IsFin of
- fin -> request_timeout;
- nofin -> idle_timeout
- end),
+ State1 = set_timeout(State0, idle_timeout),
State = update_flow(IsFin, Data, State1#state{streams=Streams}),
parse(Buffer, commands(State, StreamID, Commands))
catch Class:Exception:Stacktrace ->
@@ -393,13 +447,13 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer
end;
%% No corresponding stream. We must skip the body of the previous request
%% in order to process the next one.
-after_parse({data, _, IsFin, _, State}) ->
- loop(set_timeout(State, case IsFin of
+after_parse({data, _, IsFin, _, State=#state{buffer=Buffer}}) ->
+ parse(Buffer, set_timeout(State, case IsFin of
fin -> request_timeout;
nofin -> idle_timeout
end));
after_parse({more, State}) ->
- loop(set_timeout(State, idle_timeout)).
+ before_loop(set_timeout(State, idle_timeout)).
update_flow(fin, _, State) ->
%% This function is only called after parsing, therefore we
@@ -459,8 +513,13 @@ parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLine
'The TRACE method is currently not implemented. (RFC7231 4.3.8)'});
%% Accept direct HTTP/2 only at the beginning of the connection.
<< "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 ->
- %% @todo Might be worth throwing to get a clean stacktrace.
- http2_upgrade(State, Buffer);
+ case lists:member(http2, maps:get(protocols, Opts, [http2, http])) of
+ true ->
+ http2_upgrade(State, Buffer);
+ false ->
+ error_terminate(501, State, {connection_error, no_error,
+ 'Prior knowledge upgrade to HTTP/2 is disabled by configuration.'})
+ end;
_ ->
parse_method(Buffer, State, <<>>,
maps:get(max_method_length, Opts, 32))
@@ -748,41 +807,44 @@ default_port(_) -> 80.
%% End of request parsing.
request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock, cert=Cert,
- proxy_header=ProxyHeader, in_streamid=StreamID, in_state=
+ opts=Opts, proxy_header=ProxyHeader, in_streamid=StreamID, in_state=
PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}},
- Headers0, Host, Port) ->
+ Headers, Host, Port) ->
Scheme = case Transport:secure() of
true -> <<"https">>;
false -> <<"http">>
end,
- {Headers, HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers0 of
+ {HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers of
+ #{<<"transfer-encoding">> := _, <<"content-length">> := _} ->
+ error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers}},
+ {stream_error, protocol_error,
+ 'The request had both transfer-encoding and content-length headers. (RFC7230 3.3.3)'});
#{<<"transfer-encoding">> := TransferEncoding0} ->
try cow_http_hd:parse_transfer_encoding(TransferEncoding0) of
[<<"chunked">>] ->
- {maps:remove(<<"content-length">>, Headers0),
- true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
+ {true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}};
_ ->
- error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
+ error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers}},
{stream_error, protocol_error,
'Cowboy only supports transfer-encoding: chunked. (RFC7230 3.3.1)'})
catch _:_ ->
- error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
+ error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers}},
{stream_error, protocol_error,
'The transfer-encoding header is invalid. (RFC7230 3.3.1)'})
end;
#{<<"content-length">> := <<"0">>} ->
- {Headers0, false, 0, undefined, undefined};
+ {false, 0, undefined, undefined};
#{<<"content-length">> := BinLength} ->
Length = try
cow_http_hd:parse_content_length(BinLength)
catch _:_ ->
- error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}},
+ error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers}},
{stream_error, protocol_error,
'The content-length header is invalid. (RFC7230 3.3.2)'})
end,
- {Headers0, true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
+ {true, Length, fun cow_http_te:stream_identity/2, {0, Length}};
_ ->
- {Headers0, false, 0, undefined, undefined}
+ {false, 0, undefined, undefined}
end,
Req0 = #{
ref => Ref,
@@ -809,7 +871,7 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock
undefined -> Req0;
_ -> Req0#{proxy_header => ProxyHeader}
end,
- case is_http2_upgrade(Headers, Version) of
+ case is_http2_upgrade(Headers, Version, Opts) of
false ->
State = case HasBody of
true ->
@@ -831,12 +893,13 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock
%% HTTP/2 upgrade.
-%% @todo We must not upgrade to h2c over a TLS connection.
is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
- <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') ->
+ <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1', Opts) ->
Conns = cow_http_hd:parse_connection(Conn),
- case {lists:member(<<"upgrade">>, Conns), lists:member(<<"http2-settings">>, Conns)} of
- {true, true} ->
+ case lists:member(<<"upgrade">>, Conns)
+ andalso lists:member(<<"http2-settings">>, Conns)
+ andalso lists:member(http2, maps:get(protocols, Opts, [http2, http])) of
+ true ->
Protocols = cow_http_hd:parse_upgrade(Upgrade),
case lists:member(<<"h2c">>, Protocols) of
true ->
@@ -847,17 +910,17 @@ is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade,
_ ->
false
end;
-is_http2_upgrade(_, _) ->
+is_http2_upgrade(_, _, _) ->
false.
%% Prior knowledge upgrade, without an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
- proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
+ proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer) ->
case Transport:secure() of
false ->
_ = cancel_timeout(State),
- cowboy_http2:init(Parent, Ref, Socket, Transport,
- ProxyHeader, Opts, Peer, Sock, Cert, Buffer);
+ cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader,
+ opts_for_upgrade(State), Peer, Sock, Cert, Buffer);
true ->
error_terminate(400, State, {connection_error, protocol_error,
'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
@@ -865,22 +928,37 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran
%% Upgrade via an HTTP/1.1 request.
http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport,
- proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert},
+ proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert},
Buffer, HTTP2Settings, Req) ->
- %% @todo
- %% However if the client sent a body, we need to read the body in full
- %% and if we can't do that, return a 413 response. Some options are in order.
- %% Always half-closed stream coming from this side.
- try cow_http_hd:parse_http2_settings(HTTP2Settings) of
- Settings ->
- _ = cancel_timeout(State),
- cowboy_http2:init(Parent, Ref, Socket, Transport,
- ProxyHeader, Opts, Peer, Sock, Cert, Buffer, Settings, Req)
- catch _:_ ->
- error_terminate(400, State, {connection_error, protocol_error,
- 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
+ case Transport:secure() of
+ false ->
+ %% @todo
+ %% However if the client sent a body, we need to read the body in full
+ %% and if we can't do that, return a 413 response. Some options are in order.
+ %% Always half-closed stream coming from this side.
+ try cow_http_hd:parse_http2_settings(HTTP2Settings) of
+ Settings ->
+ _ = cancel_timeout(State),
+ cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader,
+ opts_for_upgrade(State), Peer, Sock, Cert, Buffer, Settings, Req)
+ catch _:_ ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'})
+ end;
+ true ->
+ error_terminate(400, State, {connection_error, protocol_error,
+ 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'})
end.
+opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=false}) ->
+ Opts;
+opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=Size,
+ dynamic_buffer_moving_average=MovingAvg}) ->
+ Opts#{
+ dynamic_buffer_initial_average => MovingAvg,
+ dynamic_buffer_initial_size => Size
+ }.
+
%% Request body parsing.
parse_body(Buffer, State=#state{in_streamid=StreamID, in_state=
@@ -953,6 +1031,11 @@ info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) ->
end.
%% Commands.
+%%
+%% The order in which the commands are given matters. Cowboy may
+%% stop processing commands after the 'stop' command or when an
+%% error occurred, such as a socket error. Critical commands such
+%% as 'spawn' should always be given first.
commands(State, _, []) ->
State;
@@ -1006,19 +1089,20 @@ commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID,
commands(State, StreamID, [{error_response, _, _, _}|Tail]) ->
commands(State, StreamID, Tail);
%% Send an informational response.
-commands(State=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams},
+commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams},
StreamID, [{inform, StatusCode, Headers}|Tail]) ->
%% @todo I'm pretty sure the last stream in the list is the one we want
%% considering all others are queued.
#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
_ = case Version of
'HTTP/1.1' ->
- Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1',
- headers_to_list(Headers)));
+ ok = maybe_socket_error(State0, Transport:send(Socket,
+ cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))));
%% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2)
'HTTP/1.0' ->
ok
end,
+ State = maybe_reset_idle_timeout(State0),
commands(State, StreamID, Tail);
%% Send a full response.
%%
@@ -1031,17 +1115,18 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea
%% considering all others are queued.
#stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams),
{State1, Headers} = connection(State0, Headers0, StreamID, Version),
- State = State1#state{out_state=done},
+ State2 = State1#state{out_state=done},
%% @todo Ensure content-length is set. 204 must never have content-length set.
Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)),
%% @todo 204 and 304 responses must not include a response body. (RFC7230 3.3.1, RFC7230 3.3.2)
case Body of
{sendfile, _, _, _} ->
- Transport:send(Socket, Response),
- sendfile(State, Body);
+ ok = maybe_socket_error(State2, Transport:send(Socket, Response)),
+ sendfile(State2, Body);
_ ->
- Transport:send(Socket, [Response, Body])
+ ok = maybe_socket_error(State2, Transport:send(Socket, [Response, Body]))
end,
+ State = maybe_reset_idle_timeout(State2),
commands(State, StreamID, Tail);
%% Send response headers and initiate chunked encoding or streaming.
commands(State0=#state{socket=Socket, transport=Transport,
@@ -1078,8 +1163,10 @@ commands(State0=#state{socket=Socket, transport=Transport,
trailers -> Headers1;
_ -> maps:remove(<<"trailer">>, Headers1)
end,
- {State, Headers} = connection(State1, Headers2, StreamID, Version),
- Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))),
+ {State2, Headers} = connection(State1, Headers2, StreamID, Version),
+ ok = maybe_socket_error(State2, Transport:send(Socket,
+ cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)))),
+ State = maybe_reset_idle_timeout(State2),
commands(State, StreamID, Tail);
%% Send a response body chunk.
%% @todo We need to kill the stream if it tries to send data before headers.
@@ -1098,27 +1185,33 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out
Stream0=#stream{method= <<"HEAD">>} ->
Stream0;
Stream0 when Size =:= 0, IsFin =:= fin, OutState =:= chunked ->
- Transport:send(Socket, <<"0\r\n\r\n">>),
+ ok = maybe_socket_error(State0,
+ Transport:send(Socket, <<"0\r\n\r\n">>)),
Stream0;
Stream0 when Size =:= 0 ->
Stream0;
Stream0 when is_tuple(Data), OutState =:= chunked ->
- Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>]),
+ ok = maybe_socket_error(State0,
+ Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>])),
sendfile(State0, Data),
- Transport:send(Socket,
- case IsFin of
- fin -> <<"\r\n0\r\n\r\n">>;
- nofin -> <<"\r\n">>
- end),
+ ok = maybe_socket_error(State0,
+ Transport:send(Socket,
+ case IsFin of
+ fin -> <<"\r\n0\r\n\r\n">>;
+ nofin -> <<"\r\n">>
+ end)
+ ),
Stream0;
Stream0 when OutState =:= chunked ->
- Transport:send(Socket, [
- integer_to_binary(Size, 16), <<"\r\n">>, Data,
- case IsFin of
- fin -> <<"\r\n0\r\n\r\n">>;
- nofin -> <<"\r\n">>
- end
- ]),
+ ok = maybe_socket_error(State0,
+ Transport:send(Socket, [
+ integer_to_binary(Size, 16), <<"\r\n">>, Data,
+ case IsFin of
+ fin -> <<"\r\n0\r\n\r\n">>;
+ nofin -> <<"\r\n">>
+ end
+ ])
+ ),
Stream0;
Stream0 when OutState =:= streaming ->
#stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize} = Stream0,
@@ -1130,34 +1223,39 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out
is_tuple(Data) ->
sendfile(State0, Data);
true ->
- Transport:send(Socket, Data)
+ ok = maybe_socket_error(State0, Transport:send(Socket, Data))
end,
Stream0#stream{local_sent_size=SentSize}
end,
- State = case IsFin of
+ State1 = case IsFin of
fin -> State0#state{out_state=done};
nofin -> State0
end,
+ State = maybe_reset_idle_timeout(State1),
Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream),
commands(State#state{streams=Streams}, StreamID, Tail);
-commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState},
+commands(State0=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState},
StreamID, [{trailers, Trailers}|Tail]) ->
case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of
trailers ->
- Transport:send(Socket, [
- <<"0\r\n">>,
- cow_http:headers(maps:to_list(Trailers)),
- <<"\r\n">>
- ]);
+ ok = maybe_socket_error(State0,
+ Transport:send(Socket, [
+ <<"0\r\n">>,
+ cow_http:headers(maps:to_list(Trailers)),
+ <<"\r\n">>
+ ])
+ );
no_trailers ->
- Transport:send(Socket, <<"0\r\n\r\n">>);
+ ok = maybe_socket_error(State0,
+ Transport:send(Socket, <<"0\r\n\r\n">>));
not_chunked ->
ok
end,
- commands(State#state{out_state=done}, StreamID, Tail);
+ State = maybe_reset_idle_timeout(State0#state{out_state=done}),
+ commands(State, StreamID, Tail);
%% Protocol takeover.
commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport,
- out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID,
+ out_state=OutState, buffer=Buffer, children=Children}, StreamID,
[{switch_protocol, Headers, Protocol, InitialState}|_Tail]) ->
%% @todo If there's streams opened after this one, fail instead of 101.
State1 = cancel_timeout(State0),
@@ -1174,28 +1272,26 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor
_ -> State
end,
#stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams),
- %% @todo We need to shutdown processes here first.
stream_call_terminate(StreamID, switch_protocol, StreamState, State),
%% Terminate children processes and flush any remaining messages from the mailbox.
cowboy_children:terminate(Children),
flush(Parent),
- Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState);
+ %% Turn off the trap_exit process flag
+ %% since this process will no longer be a supervisor.
+ process_flag(trap_exit, false),
+ Protocol:takeover(Parent, Ref, Socket, Transport,
+ opts_for_upgrade(State), Buffer, InitialState);
%% Set options dynamically.
-commands(State0=#state{overriden_opts=Opts},
- StreamID, [{set_options, SetOpts}|Tail]) ->
- State1 = case SetOpts of
- #{idle_timeout := IdleTimeout} ->
- set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
+commands(State0, StreamID, [{set_options, SetOpts}|Tail]) ->
+ State = maps:fold(fun
+ (chunked, Chunked, StateF=#state{overriden_opts=Opts}) ->
+ StateF#state{overriden_opts=Opts#{chunked => Chunked}};
+ (idle_timeout, IdleTimeout, StateF=#state{overriden_opts=Opts}) ->
+ set_timeout(StateF#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}},
idle_timeout);
- _ ->
- State0
- end,
- State = case SetOpts of
- #{chunked := Chunked} ->
- State1#state{overriden_opts=Opts#{chunked => Chunked}};
- _ ->
- State1
- end,
+ (_, _, StateF) ->
+ StateF
+ end, State0, SetOpts),
commands(State, StreamID, Tail);
%% Stream shutdown.
commands(State, StreamID, [stop|Tail]) ->
@@ -1238,10 +1334,12 @@ sendfile(State=#state{socket=Socket, transport=Transport, opts=Opts},
{sendfile, Offset, Bytes, Path}) ->
try
%% When sendfile is disabled we explicitly use the fallback.
- _ = case maps:get(sendfile, Opts, true) of
- true -> Transport:sendfile(Socket, Path, Offset, Bytes);
- false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
- end,
+ {ok, _} = maybe_socket_error(State,
+ case maps:get(sendfile, Opts, true) of
+ true -> Transport:sendfile(Socket, Path, Offset, Bytes);
+ false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, [])
+ end
+ ),
ok
catch _:_ ->
terminate(State, {socket_error, sendfile_crash,
@@ -1312,20 +1410,24 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta
end.
stream_next(State0=#state{opts=Opts, active=Active, out_streamid=OutStreamID, streams=Streams}) ->
+ %% Enable active mode again if it was disabled.
+ State1 = case Active of
+ true -> State0;
+ false -> active(State0)
+ end,
NextOutStreamID = OutStreamID + 1,
case lists:keyfind(NextOutStreamID, #stream.id, Streams) of
false ->
- State0#state{out_streamid=NextOutStreamID, out_state=wait};
+ State = State1#state{out_streamid=NextOutStreamID, out_state=wait},
+ %% There are no streams remaining. We therefore can
+ %% and want to switch back to the request_timeout.
+ set_timeout(State, request_timeout);
#stream{queue=Commands} ->
- State = case Active of
- true -> State0;
- false -> active(State0)
- end,
%% @todo Remove queue from the stream.
%% We set the flow to the initial flow size even though
%% we might have sent some data through already due to pipelining.
Flow = maps:get(initial_stream_flow_size, Opts, 65535),
- commands(State#state{flow=Flow, out_streamid=NextOutStreamID, out_state=wait},
+ commands(State1#state{flow=Flow, out_streamid=NextOutStreamID, out_state=wait},
NextOutStreamID, Commands)
end.
@@ -1341,17 +1443,23 @@ stream_call_terminate(StreamID, Reason, StreamState, #state{opts=Opts}) ->
maybe_req_close(#state{opts=#{http10_keepalive := false}}, _, 'HTTP/1.0') ->
close;
maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') ->
- Conns = cow_http_hd:parse_connection(Conn),
- case lists:member(<<"keep-alive">>, Conns) of
- true -> keepalive;
- false -> close
+ try cow_http_hd:parse_connection(Conn) of
+ Conns ->
+ case lists:member(<<"keep-alive">>, Conns) of
+ true -> keepalive;
+ false -> close
+ end
+ catch _:_ ->
+ bad_connection_header
end;
maybe_req_close(_, _, 'HTTP/1.0') ->
close;
maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') ->
- case connection_hd_is_close(Conn) of
+ try connection_hd_is_close(Conn) of
true -> close;
false -> keepalive
+ catch _:_ ->
+ bad_connection_header
end;
maybe_req_close(_, _, _) ->
keepalive.
@@ -1420,37 +1528,55 @@ error_terminate(StatusCode, State=#state{ref=Ref, peer=Peer, in_state=StreamStat
early_error(StatusCode, State, Reason, PartialReq) ->
early_error(StatusCode, State, Reason, PartialReq, #{}).
-early_error(StatusCode0, #state{socket=Socket, transport=Transport,
+early_error(StatusCode0, State=#state{socket=Socket, transport=Transport,
opts=Opts, in_streamid=StreamID}, Reason, PartialReq, RespHeaders0) ->
RespHeaders1 = RespHeaders0#{<<"content-length">> => <<"0">>},
Resp = {response, StatusCode0, RespHeaders1, <<>>},
try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of
{response, StatusCode, RespHeaders, RespBody} ->
- Transport:send(Socket, [
- cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)),
- %% @todo We shouldn't send the body when the method is HEAD.
- %% @todo Technically we allow the sendfile tuple.
- RespBody
- ])
+ ok = maybe_socket_error(State,
+ Transport:send(Socket, [
+ cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)),
+ %% @todo We shouldn't send the body when the method is HEAD.
+ %% @todo Technically we allow the sendfile tuple.
+ RespBody
+ ])
+ )
catch Class:Exception:Stacktrace ->
cowboy:log(cowboy_stream:make_error_log(early_error,
[StreamID, Reason, PartialReq, Resp, Opts],
Class, Exception, Stacktrace), Opts),
%% We still need to send an error response, so send what we initially
%% wanted to send. It's better than nothing.
- Transport:send(Socket, cow_http:response(StatusCode0,
- 'HTTP/1.1', maps:to_list(RespHeaders1)))
- end,
- ok.
+ ok = maybe_socket_error(State,
+ Transport:send(Socket, cow_http:response(StatusCode0,
+ 'HTTP/1.1', maps:to_list(RespHeaders1)))
+ )
+ end.
initiate_closing(State=#state{streams=[]}, Reason) ->
terminate(State, Reason);
-initiate_closing(State=#state{streams=[_Stream|Streams],
+initiate_closing(State=#state{streams=Streams,
out_streamid=OutStreamID}, Reason) ->
- terminate_all_streams(State, Streams, Reason),
- State#state{last_streamid=OutStreamID}.
-
--spec terminate(_, _) -> no_return().
+ {value, LastStream, TerminatedStreams}
+ = lists:keytake(OutStreamID, #stream.id, Streams),
+ terminate_all_streams(State, TerminatedStreams, Reason),
+ State#state{streams=[LastStream], last_streamid=OutStreamID}.
+
+%% Function replicated in cowboy_http2.
+maybe_socket_error(State, {error, closed}) ->
+ terminate(State, {socket_error, closed, 'The socket has been closed.'});
+maybe_socket_error(State, Reason) ->
+ maybe_socket_error(State, Reason, 'An error has occurred on the socket.').
+
+maybe_socket_error(_, Result = ok, _) ->
+ Result;
+maybe_socket_error(_, Result = {ok, _}, _) ->
+ Result;
+maybe_socket_error(State, {error, Reason}, Human) ->
+ terminate(State, {socket_error, Reason, Human}).
+
+-spec terminate(#state{} | undefined, _) -> no_return().
terminate(undefined, Reason) ->
exit({shutdown, Reason});
terminate(State=#state{streams=Streams, children=Children}, Reason) ->
@@ -1484,6 +1610,9 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) ->
terminate_linger_before_loop(State, TimerRef, Messages) ->
%% We may already be in active mode when we do this
%% but it's OK because we are shutting down anyway.
+ %%
+ %% We specially handle the socket error to terminate
+ %% when an error occurs.
case setopts_active(State) of
ok ->
terminate_linger_loop(State, TimerRef, Messages);
@@ -1511,12 +1640,12 @@ terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) ->
-spec system_continue(_, _, #state{}) -> ok.
system_continue(_, _, State) ->
- loop(State).
+ before_loop(State).
-spec system_terminate(any(), _, _, #state{}) -> no_return().
system_terminate(Reason0, _, _, State) ->
Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'},
- loop(initiate_closing(State, Reason)).
+ before_loop(initiate_closing(State, Reason)).
-spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}.
system_code_change(Misc, _, _, _) ->