diff options
Diffstat (limited to 'src')
30 files changed, 3193 insertions, 470 deletions
diff --git a/src/cowboy.erl b/src/cowboy.erl index c4be25b..6a5634e 100644 --- a/src/cowboy.erl +++ b/src/cowboy.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -16,13 +16,19 @@ -export([start_clear/3]). -export([start_tls/3]). +-export([start_quic/3]). -export([stop_listener/1]). +-export([get_env/2]). +-export([get_env/3]). -export([set_env/3]). %% Internal. -export([log/2]). -export([log/4]). +%% Don't warn about the bad quicer specs. +-dialyzer([{nowarn_function, start_quic/3}]). + -type opts() :: cowboy_http:opts() | cowboy_http2:opts(). -export_type([opts/0]). @@ -42,35 +48,156 @@ -spec start_clear(ranch:ref(), ranch:opts(), opts()) -> {ok, pid()} | {error, any()}. + start_clear(Ref, TransOpts0, ProtoOpts0) -> TransOpts1 = ranch:normalize_opts(TransOpts0), - {TransOpts, ConnectionType} = ensure_connection_type(TransOpts1), - ProtoOpts = ProtoOpts0#{connection_type => ConnectionType}, + {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0), + {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2), + ProtoOpts = ProtoOpts0#{ + connection_type => ConnectionType, + dynamic_buffer => DynamicBuffer + }, ranch:start_listener(Ref, ranch_tcp, TransOpts, cowboy_clear, ProtoOpts). -spec start_tls(ranch:ref(), ranch:opts(), opts()) -> {ok, pid()} | {error, any()}. + start_tls(Ref, TransOpts0, ProtoOpts0) -> TransOpts1 = ranch:normalize_opts(TransOpts0), - SocketOpts = maps:get(socket_opts, TransOpts1, []), - TransOpts2 = TransOpts1#{socket_opts => [ - {next_protocols_advertised, [<<"h2">>, <<"http/1.1">>]}, - {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]} - |SocketOpts]}, - {TransOpts, ConnectionType} = ensure_connection_type(TransOpts2), - ProtoOpts = ProtoOpts0#{connection_type => ConnectionType}, + {TransOpts2, DynamicBuffer} = ensure_dynamic_buffer(TransOpts1, ProtoOpts0), + TransOpts3 = ensure_alpn(TransOpts2), + {TransOpts, ConnectionType} = ensure_connection_type(TransOpts3), + ProtoOpts = ProtoOpts0#{ + connection_type => ConnectionType, + dynamic_buffer => DynamicBuffer + }, ranch:start_listener(Ref, ranch_ssl, TransOpts, cowboy_tls, ProtoOpts). +%% @todo Experimental function to start a barebone QUIC listener. +%% This will need to be reworked to be closer to Ranch +%% listeners and provide equivalent features. +%% +%% @todo Better type for transport options. Might require fixing quicer types. + +-spec start_quic(ranch:ref(), #{socket_opts => [{atom(), _}]}, cowboy_http3:opts()) + -> {ok, pid()}. + +%% @todo Implement dynamic_buffer for HTTP/3 if/when it applies. +start_quic(Ref, TransOpts, ProtoOpts) -> + {ok, _} = application:ensure_all_started(quicer), + Parent = self(), + SocketOpts0 = maps:get(socket_opts, TransOpts, []), + {Port, SocketOpts2} = case lists:keytake(port, 1, SocketOpts0) of + {value, {port, Port0}, SocketOpts1} -> + {Port0, SocketOpts1}; + false -> + {port_0(), SocketOpts0} + end, + SocketOpts = [ + {alpn, ["h3"]}, %% @todo Why not binary? + %% We only need 3 for control and QPACK enc/dec, + %% but we need more for WebTransport. %% @todo Use 3 if WT is disabled. + {peer_unidi_stream_count, 100}, + {peer_bidi_stream_count, 100}, + %% For WebTransport. + %% @todo We probably don't want it enabled if WT isn't used. + {datagram_send_enabled, 1}, + {datagram_receive_enabled, 1} + |SocketOpts2], + _ListenerPid = spawn(fun() -> + {ok, Listener} = quicer:listen(Port, SocketOpts), + Parent ! {ok, Listener}, + _AcceptorPid = [spawn(fun AcceptLoop() -> + {ok, Conn} = quicer:accept(Listener, []), + Pid = spawn(fun() -> + receive go -> ok end, + %% We have to do the handshake after handing control of + %% the connection otherwise streams may come in before + %% the controlling process is changed and messages will + %% not be sent to the correct process. + {ok, Conn} = quicer:handshake(Conn), + process_flag(trap_exit, true), %% @todo Only if supervisor though. + try cowboy_http3:init(Parent, Ref, Conn, ProtoOpts) + catch + exit:{shutdown,_} -> ok; + C:E:S -> log(error, "CRASH ~p:~p:~p", [C,E,S], ProtoOpts) + end + end), + ok = quicer:controlling_process(Conn, Pid), + Pid ! go, + AcceptLoop() + end) || _ <- lists:seq(1, 20)], + %% Listener process must not terminate. + receive after infinity -> ok end + end), + receive + {ok, Listener} -> + {ok, Listener} + end. + +%% Select a random UDP port using gen_udp because quicer +%% does not provide equivalent functionality. Taken from +%% quicer test suites. +port_0() -> + {ok, Socket} = gen_udp:open(0, [{reuseaddr, true}]), + {ok, {_, Port}} = inet:sockname(Socket), + gen_udp:close(Socket), + case os:type() of + {unix, darwin} -> + %% Apparently macOS doesn't free the port immediately. + timer:sleep(500); + _ -> + ok + end, + Port. + +ensure_alpn(TransOpts) -> + SocketOpts = maps:get(socket_opts, TransOpts, []), + TransOpts#{socket_opts => [ + {alpn_preferred_protocols, [<<"h2">>, <<"http/1.1">>]} + |SocketOpts]}. + ensure_connection_type(TransOpts=#{connection_type := ConnectionType}) -> {TransOpts, ConnectionType}; ensure_connection_type(TransOpts) -> {TransOpts#{connection_type => supervisor}, supervisor}. +%% Dynamic buffer was set; accept transport options as-is. +%% Note that initial 'buffer' size may be lower than dynamic buffer allows. +ensure_dynamic_buffer(TransOpts, #{dynamic_buffer := DynamicBuffer}) -> + {TransOpts, DynamicBuffer}; +%% Dynamic buffer was not set; define default dynamic buffer +%% only if 'buffer' size was not configured. In that case we +%% set the 'buffer' size to the lowest value. +ensure_dynamic_buffer(TransOpts=#{socket_opts := SocketOpts}, _) -> + case proplists:get_value(buffer, SocketOpts, undefined) of + undefined -> + {TransOpts#{socket_opts => [{buffer, 1024}|SocketOpts]}, {1024, 131072}}; + _ -> + {TransOpts, false} + end. + -spec stop_listener(ranch:ref()) -> ok | {error, not_found}. + stop_listener(Ref) -> ranch:stop_listener(Ref). +-spec get_env(ranch:ref(), atom()) -> ok. + +get_env(Ref, Name) -> + Opts = ranch:get_protocol_options(Ref), + Env = maps:get(env, Opts, #{}), + maps:get(Name, Env). + +-spec get_env(ranch:ref(), atom(), any()) -> ok. + +get_env(Ref, Name, Default) -> + Opts = ranch:get_protocol_options(Ref), + Env = maps:get(env, Opts, #{}), + maps:get(Name, Env, Default). + -spec set_env(ranch:ref(), atom(), any()) -> ok. + set_env(Ref, Name, Value) -> Opts = ranch:get_protocol_options(Ref), Env = maps:get(env, Opts, #{}), @@ -80,10 +207,12 @@ set_env(Ref, Name, Value) -> %% Internal. -spec log({log, logger:level(), io:format(), list()}, opts()) -> ok. + log({log, Level, Format, Args}, Opts) -> log(Level, Format, Args, Opts). -spec log(logger:level(), io:format(), list(), opts()) -> ok. + log(Level, Format, Args, #{logger := Logger}) when Logger =/= error_logger -> _ = Logger:Level(Format, Args), diff --git a/src/cowboy_app.erl b/src/cowboy_app.erl index 74cba41..e58e1f6 100644 --- a/src/cowboy_app.erl +++ b/src/cowboy_app.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_bstr.erl b/src/cowboy_bstr.erl index d8041e4..d0e7301 100644 --- a/src/cowboy_bstr.erl +++ b/src/cowboy_bstr.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_children.erl b/src/cowboy_children.erl index 05d39fb..2e00c37 100644 --- a/src/cowboy_children.erl +++ b/src/cowboy_children.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_clear.erl b/src/cowboy_clear.erl index 4f3a234..845fdc1 100644 --- a/src/cowboy_clear.erl +++ b/src/cowboy_clear.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2016-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -33,19 +33,9 @@ start_link(Ref, Transport, Opts) -> -spec connection_process(pid(), ranch:ref(), module(), cowboy:opts()) -> ok. connection_process(Parent, Ref, Transport, Opts) -> - ProxyInfo = case maps:get(proxy_header, Opts, false) of - true -> - {ok, ProxyInfo0} = ranch:recv_proxy_header(Ref, 1000), - ProxyInfo0; - false -> - undefined - end, + ProxyInfo = get_proxy_info(Ref, Opts), {ok, Socket} = ranch:handshake(Ref), %% Use cowboy_http2 directly only when 'http' is missing. - %% Otherwise switch to cowboy_http2 from cowboy_http. - %% - %% @todo Extend this option to cowboy_tls and allow disabling - %% the switch to cowboy_http2 in cowboy_http. Also document it. Protocol = case maps:get(protocols, Opts, [http2, http]) of [http2] -> cowboy_http2; [_|_] -> cowboy_http @@ -58,3 +48,11 @@ init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, Protocol) -> supervisor -> process_flag(trap_exit, true) end, Protocol:init(Parent, Ref, Socket, Transport, ProxyInfo, Opts). + +get_proxy_info(Ref, #{proxy_header := true}) -> + case ranch:recv_proxy_header(Ref, 1000) of + {ok, ProxyInfo} -> ProxyInfo; + {error, closed} -> exit({shutdown, closed}) + end; +get_proxy_info(_, _) -> + undefined. diff --git a/src/cowboy_clock.erl b/src/cowboy_clock.erl index 28f8a1b..b6e39f4 100644 --- a/src/cowboy_clock.erl +++ b/src/cowboy_clock.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -93,7 +93,7 @@ handle_cast(_Msg, State) -> -spec handle_info(any(), State) -> {noreply, State} when State::#state{}. handle_info(update, #state{universaltime=Prev, rfc1123=B1, tref=TRef0}) -> %% Cancel the timer in case an external process sent an update message. - _ = erlang:cancel_timer(TRef0), + _ = erlang:cancel_timer(TRef0, [{async, true}, {info, false}]), T = erlang:universaltime(), B2 = update_rfc1123(B1, Prev, T), ets:insert(?MODULE, {rfc1123, B2}), diff --git a/src/cowboy_compress_h.erl b/src/cowboy_compress_h.erl index 374cb6a..785eb0d 100644 --- a/src/cowboy_compress_h.erl +++ b/src/cowboy_compress_h.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -96,11 +96,14 @@ check_req(Req) -> %% Do not compress responses that contain the content-encoding header. check_resp_headers(#{<<"content-encoding">> := _}, State) -> State#state{compress=undefined}; +%% Do not compress responses that contain the etag header. +check_resp_headers(#{<<"etag">> := _}, State) -> + State#state{compress=undefined}; check_resp_headers(_, State) -> State. fold(Commands, State=#state{compress=undefined}) -> - {Commands, State}; + fold_vary_only(Commands, State, []); fold(Commands, State) -> fold(Commands, State, []). @@ -108,32 +111,32 @@ fold([], State, Acc) -> {lists:reverse(Acc), State}; %% We do not compress full sendfile bodies. fold([Response={response, _, _, {sendfile, _, _, _}}|Tail], State, Acc) -> - fold(Tail, State, [Response|Acc]); + fold(Tail, State, [vary_response(Response)|Acc]); %% We compress full responses directly, unless they are lower than %% the configured threshold or we find we are not able to by looking at the headers. fold([Response0={response, _, Headers, Body}|Tail], State0=#state{threshold=CompressThreshold}, Acc) -> case check_resp_headers(Headers, State0) of State=#state{compress=undefined} -> - fold(Tail, State, [Response0|Acc]); + fold(Tail, State, [vary_response(Response0)|Acc]); State1 -> BodyLength = iolist_size(Body), if BodyLength =< CompressThreshold -> - fold(Tail, State1, [Response0|Acc]); + fold(Tail, State1, [vary_response(Response0)|Acc]); true -> {Response, State} = gzip_response(Response0, State1), - fold(Tail, State, [Response|Acc]) + fold(Tail, State, [vary_response(Response)|Acc]) end end; %% Check headers and initiate compression... fold([Response0={headers, _, Headers}|Tail], State0, Acc) -> case check_resp_headers(Headers, State0) of State=#state{compress=undefined} -> - fold(Tail, State, [Response0|Acc]); + fold(Tail, State, [vary_headers(Response0)|Acc]); State1 -> {Response, State} = gzip_headers(Response0, State1), - fold(Tail, State, [Response|Acc]) + fold(Tail, State, [vary_headers(Response)|Acc]) end; %% then compress each data commands individually. fold([Data0={data, _, _}|Tail], State0=#state{compress=gzip}, Acc) -> @@ -161,6 +164,15 @@ fold([SetOptions={set_options, Opts}|Tail], State=#state{ fold([Command|Tail], State, Acc) -> fold(Tail, State, [Command|Acc]). +fold_vary_only([], State, Acc) -> + {lists:reverse(Acc), State}; +fold_vary_only([Response={response, _, _, _}|Tail], State, Acc) -> + fold_vary_only(Tail, State, [vary_response(Response)|Acc]); +fold_vary_only([Response={headers, _, _}|Tail], State, Acc) -> + fold_vary_only(Tail, State, [vary_headers(Response)|Acc]); +fold_vary_only([Command|Tail], State, Acc) -> + fold_vary_only(Tail, State, [Command|Acc]). + buffering_to_zflush(true) -> none; buffering_to_zflush(false) -> sync. @@ -180,10 +192,10 @@ gzip_response({response, Status, Headers, Body}, State) -> after zlib:close(Z) end, - {{response, Status, vary(Headers#{ + {{response, Status, Headers#{ <<"content-length">> => integer_to_binary(iolist_size(GzBody)), <<"content-encoding">> => <<"gzip">> - }), GzBody}, State}. + }, GzBody}, State}. gzip_headers({headers, Status, Headers0}, State) -> Z = zlib:open(), @@ -191,9 +203,15 @@ gzip_headers({headers, Status, Headers0}, State) -> %% @todo It might be good to allow them to be configured? zlib:deflateInit(Z, default, deflated, 31, 8, default), Headers = maps:remove(<<"content-length">>, Headers0), - {{headers, Status, vary(Headers#{ + {{headers, Status, Headers#{ <<"content-encoding">> => <<"gzip">> - })}, State#state{deflate=Z}}. + }}, State#state{deflate=Z}}. + +vary_response({response, Status, Headers, Body}) -> + {response, Status, vary(Headers), Body}. + +vary_headers({headers, Status, Headers}) -> + {headers, Status, vary(Headers)}. %% We must add content-encoding to vary if it's not already there. vary(Headers=#{<<"vary">> := Vary}) -> diff --git a/src/cowboy_constraints.erl b/src/cowboy_constraints.erl index 6509c4b..84ff249 100644 --- a/src/cowboy_constraints.erl +++ b/src/cowboy_constraints.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2014-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_decompress_h.erl b/src/cowboy_decompress_h.erl new file mode 100644 index 0000000..84283e5 --- /dev/null +++ b/src/cowboy_decompress_h.erl @@ -0,0 +1,240 @@ +%% Copyright (c) jdamanalo <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +-module(cowboy_decompress_h). +-behavior(cowboy_stream). + +-export([init/3]). +-export([data/4]). +-export([info/3]). +-export([terminate/3]). +-export([early_error/5]). + +-record(state, { + next :: any(), + enabled = true :: boolean(), + ratio_limit :: non_neg_integer() | undefined, + compress = undefined :: undefined | gzip, + inflate = undefined :: undefined | zlib:zstream(), + is_reading = false :: boolean(), + + %% We use a list of binaries to avoid doing unnecessary + %% memory allocations when inflating. We convert to binary + %% when we propagate the data. The data must be reversed + %% before converting to binary or inflating: this is done + %% via the buffer_to_binary/buffer_to_iovec functions. + read_body_buffer = [] :: [binary()], + read_body_is_fin = nofin :: nofin | {fin, non_neg_integer()} +}). + +-spec init(cowboy_stream:streamid(), cowboy_req:req(), cowboy:opts()) + -> {cowboy_stream:commands(), #state{}}. +init(StreamID, Req0, Opts) -> + Enabled = maps:get(decompress_enabled, Opts, true), + RatioLimit = maps:get(decompress_ratio_limit, Opts, 20), + {Req, State} = check_and_update_req(Req0), + Inflate = case State#state.compress of + undefined -> + undefined; + gzip -> + Z = zlib:open(), + zlib:inflateInit(Z, 31), + Z + end, + {Commands, Next} = cowboy_stream:init(StreamID, Req, Opts), + fold(Commands, State#state{next=Next, enabled=Enabled, + ratio_limit=RatioLimit, inflate=Inflate}). + +-spec data(cowboy_stream:streamid(), cowboy_stream:fin(), cowboy_req:resp_body(), State) + -> {cowboy_stream:commands(), State} when State::#state{}. +data(StreamID, IsFin, Data, State=#state{next=Next0, inflate=undefined}) -> + {Commands, Next} = cowboy_stream:data(StreamID, IsFin, Data, Next0), + fold(Commands, State#state{next=Next, read_body_is_fin=IsFin}); +data(StreamID, IsFin, Data, State=#state{next=Next0, enabled=false, read_body_buffer=Buffer}) -> + {Commands, Next} = cowboy_stream:data(StreamID, IsFin, + buffer_to_binary([Data|Buffer]), Next0), + fold(Commands, State#state{next=Next, read_body_is_fin=IsFin}); +data(StreamID, IsFin, Data0, State0=#state{next=Next0, ratio_limit=RatioLimit, + inflate=Z, is_reading=true, read_body_buffer=Buffer}) -> + Data = buffer_to_iovec([Data0|Buffer]), + Limit = iolist_size(Data) * RatioLimit, + case cow_deflate:inflate(Z, Data, Limit) of + {error, ErrorType} -> + zlib:close(Z), + Status = case ErrorType of + data_error -> 400; + size_error -> 413 + end, + Commands = [ + {error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>}, + stop + ], + fold(Commands, State0#state{inflate=undefined, read_body_buffer=[]}); + {ok, Inflated} -> + State = case IsFin of + nofin -> + State0; + fin -> + zlib:close(Z), + State0#state{inflate=undefined} + end, + {Commands, Next} = cowboy_stream:data(StreamID, IsFin, Inflated, Next0), + fold(Commands, State#state{next=Next, read_body_buffer=[], + read_body_is_fin=IsFin}) + end; +data(_, IsFin, Data, State=#state{read_body_buffer=Buffer}) -> + {[], State#state{read_body_buffer=[Data|Buffer], read_body_is_fin=IsFin}}. + +-spec info(cowboy_stream:streamid(), any(), State) + -> {cowboy_stream:commands(), State} when State::#state{}. +info(StreamID, Info, State=#state{next=Next0, inflate=undefined}) -> + {Commands, Next} = cowboy_stream:info(StreamID, Info, Next0), + fold(Commands, State#state{next=Next}); +info(StreamID, Info={CommandTag, _, _, _, _}, State=#state{next=Next0, read_body_is_fin=IsFin}) + when CommandTag =:= read_body; CommandTag =:= read_body_timeout -> + {Commands0, Next1} = cowboy_stream:info(StreamID, Info, Next0), + {Commands, Next} = data(StreamID, IsFin, <<>>, State#state{next=Next1, is_reading=true}), + fold(Commands ++ Commands0, Next); +info(StreamID, Info={set_options, Opts}, State0=#state{next=Next0, + enabled=Enabled0, ratio_limit=RatioLimit0, is_reading=IsReading}) -> + Enabled = maps:get(decompress_enabled, Opts, Enabled0), + RatioLimit = maps:get(decompress_ratio_limit, Opts, RatioLimit0), + {Commands, Next} = cowboy_stream:info(StreamID, Info, Next0), + %% We can't change the enabled setting after we start reading, + %% otherwise the data becomes garbage. Changing the setting + %% is not treated as an error, it is just ignored. + State = case IsReading of + true -> State0; + false -> State0#state{enabled=Enabled} + end, + fold(Commands, State#state{next=Next, ratio_limit=RatioLimit}); +info(StreamID, Info, State=#state{next=Next0}) -> + {Commands, Next} = cowboy_stream:info(StreamID, Info, Next0), + fold(Commands, State#state{next=Next}). + +-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), #state{}) -> any(). +terminate(StreamID, Reason, #state{next=Next, inflate=Z}) -> + case Z of + undefined -> ok; + _ -> zlib:close(Z) + end, + cowboy_stream:terminate(StreamID, Reason, Next). + +-spec early_error(cowboy_stream:streamid(), cowboy_stream:reason(), + cowboy_stream:partial_req(), Resp, cowboy:opts()) -> Resp + when Resp::cowboy_stream:resp_command(). +early_error(StreamID, Reason, PartialReq, Resp, Opts) -> + cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts). + +%% Internal. + +%% Check whether the request needs content decoding, and if it does +%% whether it fits our criteria for decoding. We also update the +%% Req to indicate whether content was decoded. +%% +%% We always set the content_decoded value in the Req because it +%% indicates whether content decoding was attempted. +%% +%% A malformed content-encoding header results in no decoding. +check_and_update_req(Req=#{headers := Headers}) -> + ContentDecoded = maps:get(content_decoded, Req, []), + try cowboy_req:parse_header(<<"content-encoding">>, Req) of + %% We only automatically decompress when gzip is the only + %% encoding used. Since it's the only encoding used, we + %% can remove the header entirely before passing the Req + %% forward. + [<<"gzip">>] -> + {Req#{ + headers => maps:remove(<<"content-encoding">>, Headers), + content_decoded => [<<"gzip">>|ContentDecoded] + }, #state{compress=gzip}}; + _ -> + {Req#{content_decoded => ContentDecoded}, + #state{compress=undefined}} + catch _:_ -> + {Req#{content_decoded => ContentDecoded}, + #state{compress=undefined}} + end. + +buffer_to_iovec(Buffer) -> + lists:reverse(Buffer). + +buffer_to_binary(Buffer) -> + iolist_to_binary(lists:reverse(Buffer)). + +fold(Commands, State) -> + fold(Commands, State, []). + +fold([], State, Acc) -> + {lists:reverse(Acc), State}; +fold([{response, Status, Headers0, Body}|Tail], State=#state{enabled=true}, Acc) -> + Headers = add_accept_encoding(Headers0), + fold(Tail, State, [{response, Status, Headers, Body}|Acc]); +fold([{headers, Status, Headers0} | Tail], State=#state{enabled=true}, Acc) -> + Headers = add_accept_encoding(Headers0), + fold(Tail, State, [{headers, Status, Headers}|Acc]); +fold([Command|Tail], State, Acc) -> + fold(Tail, State, [Command|Acc]). + +add_accept_encoding(Headers=#{<<"accept-encoding">> := AcceptEncoding}) -> + try cow_http_hd:parse_accept_encoding(iolist_to_binary(AcceptEncoding)) of + List -> + case lists:keyfind(<<"gzip">>, 1, List) of + %% gzip is excluded but this handler is enabled; we replace. + {_, 0} -> + Replaced = lists:keyreplace(<<"gzip">>, 1, List, {<<"gzip">>, 1000}), + Codings = build_accept_encoding(Replaced), + Headers#{<<"accept-encoding">> => Codings}; + {_, _} -> + Headers; + false -> + case lists:keyfind(<<"*">>, 1, List) of + %% Others are excluded along with gzip; we add. + {_, 0} -> + WithGzip = [{<<"gzip">>, 1000} | List], + Codings = build_accept_encoding(WithGzip), + Headers#{<<"accept-encoding">> => Codings}; + {_, _} -> + Headers; + false -> + Headers#{<<"accept-encoding">> => [AcceptEncoding, <<", gzip">>]} + end + end + catch _:_ -> + %% The accept-encoding header is invalid. Probably empty. We replace it with ours. + Headers#{<<"accept-encoding">> => <<"gzip">>} + end; +add_accept_encoding(Headers) -> + Headers#{<<"accept-encoding">> => <<"gzip">>}. + +%% @todo From cowlib, maybe expose? +qvalue_to_iodata(0) -> <<"0">>; +qvalue_to_iodata(Q) when Q < 10 -> [<<"0.00">>, integer_to_binary(Q)]; +qvalue_to_iodata(Q) when Q < 100 -> [<<"0.0">>, integer_to_binary(Q)]; +qvalue_to_iodata(Q) when Q < 1000 -> [<<"0.">>, integer_to_binary(Q)]; +qvalue_to_iodata(1000) -> <<"1">>. + +%% @todo Should be added to Cowlib. +build_accept_encoding([{ContentCoding, Q}|Tail]) -> + Weight = iolist_to_binary(qvalue_to_iodata(Q)), + Acc = <<ContentCoding/binary, ";q=", Weight/binary>>, + do_build_accept_encoding(Tail, Acc). + +do_build_accept_encoding([{ContentCoding, Q}|Tail], Acc0) -> + Weight = iolist_to_binary(qvalue_to_iodata(Q)), + Acc = <<Acc0/binary, ", ", ContentCoding/binary, ";q=", Weight/binary>>, + do_build_accept_encoding(Tail, Acc); +do_build_accept_encoding([], Acc) -> + Acc. diff --git a/src/cowboy_dynamic_buffer.hrl b/src/cowboy_dynamic_buffer.hrl new file mode 100644 index 0000000..4d05e50 --- /dev/null +++ b/src/cowboy_dynamic_buffer.hrl @@ -0,0 +1,80 @@ +%% Copyright (c) Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% These functions are common to cowboy_http, cowboy_http2 and +%% cowboy_websocket. It requires the options and the state +%% to use the same field names. + +%% Experiments have shown that the size of the 'buffer' can greatly +%% impact performance: a buffer too small leads to more messages +%% being handled and typically more binary appends; and a buffer +%% too large results in inefficient use of memory which in turn +%% reduces the throughput, presumably because large binary appends +%% are not as efficient as smaller ones, and because while the +%% buffer gets allocated only when there is data, the allocated +%% size remains until the binary is GC and so under-use hurts. +%% +%% The performance of a given 'buffer' size will also depend on +%% how the client is sending data, and on the protocol. For example, +%% HTTP/1.1 doesn't need a very large 'buffer' size for reading +%% request headers, but it does need one for reading large request +%% bodies. At the same time, HTTP/2 performs best reading large +%% request bodies when the 'buffer' size is about half that of +%% HTTP/1.1. +%% +%% It therefore becomes important to resize the buffer dynamically +%% depending on what is currently going on. We do this based on +%% the size of data packets we received from the transport. We +%% maintain a moving average and when that moving average is +%% 90% of the current 'buffer' size, we double the 'buffer' size. +%% When things slow down and the moving average falls below +%% 40% of the current 'buffer' size, we halve the 'buffer' size. +%% +%% To calculate the moving average we do (MovAvg + DataLen) div 2. +%% This means that the moving average will change very quickly when +%% DataLen increases or decreases rapidly. That's OK, we want to +%% be reactive, but also setting the buffer size is a pretty fast +%% operation. The formula could be changed to the following if it +%% became a problem: (MovAvg * N + DataLen) div (N + 1). +%% +%% Note that this works best when active,N uses low values of N. +%% We don't want to accumulate too much data because we resize +%% the buffer. + +init_dynamic_buffer_size(#{dynamic_buffer_initial_size := DynamicBuffer}) -> + DynamicBuffer; +init_dynamic_buffer_size(#{dynamic_buffer := {LowDynamicBuffer, _}}) -> + LowDynamicBuffer; +init_dynamic_buffer_size(_) -> + false. + +maybe_resize_buffer(State=#state{dynamic_buffer_size=false}, _) -> + State; +maybe_resize_buffer(State=#state{transport=Transport, socket=Socket, + opts=#{dynamic_buffer := {LowDynamicBuffer, HighDynamicBuffer}}, + dynamic_buffer_size=BufferSize0, dynamic_buffer_moving_average=MovingAvg0}, Data) -> + DataLen = byte_size(Data), + MovingAvg = (MovingAvg0 + DataLen) div 2, + if + BufferSize0 < HighDynamicBuffer andalso MovingAvg > BufferSize0 * 0.9 -> + BufferSize = min(BufferSize0 * 2, HighDynamicBuffer), + ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])), + State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize}; + BufferSize0 > LowDynamicBuffer andalso MovingAvg < BufferSize0 * 0.4 -> + BufferSize = max(BufferSize0 div 2, LowDynamicBuffer), + ok = maybe_socket_error(State, Transport:setopts(Socket, [{buffer, BufferSize}])), + State#state{dynamic_buffer_moving_average=MovingAvg, dynamic_buffer_size=BufferSize}; + true -> + State#state{dynamic_buffer_moving_average=MovingAvg} + end. diff --git a/src/cowboy_handler.erl b/src/cowboy_handler.erl index c0f7ff7..1989512 100644 --- a/src/cowboy_handler.erl +++ b/src/cowboy_handler.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_http.erl b/src/cowboy_http.erl index c9bceed..10eb519 100644 --- a/src/cowboy_http.erl +++ b/src/cowboy_http.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2016-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -12,9 +12,12 @@ %% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF %% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +%% @todo Worth renaming to cowboy_http1. +%% @todo Change use of cow_http to cow_http1 where appropriate. -module(cowboy_http). -export([init/6]). +-export([loop/1]). -export([system_continue/3]). -export([system_terminate/4]). @@ -22,11 +25,16 @@ -type opts() :: #{ active_n => pos_integer(), + alpn_default_protocol => http | http2, chunked => boolean(), compress_buffering => boolean(), compress_threshold => non_neg_integer(), connection_type => worker | supervisor, + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), env => cowboy_middleware:env(), + hibernate => boolean(), http10_keepalive => boolean(), idle_timeout => timeout(), inactivity_timeout => timeout(), @@ -45,8 +53,10 @@ metrics_req_filter => fun((cowboy_req:req()) -> map()), metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()), middlewares => [module()], + protocols => [http | http2], proxy_header => boolean(), request_timeout => timeout(), + reset_idle_timeout_on_send => boolean(), sendfile => boolean(), shutdown_timeout => timeout(), stream_handlers => [module()], @@ -134,6 +144,10 @@ %% Flow requested for the current stream. flow = infinity :: non_neg_integer() | infinity, + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size :: pos_integer() | false, + dynamic_buffer_moving_average :: non_neg_integer(), + %% Identifier for the stream currently being written. %% Note that out_streamid =< in_streamid. out_streamid = 1 :: pos_integer(), @@ -157,9 +171,11 @@ -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info(), cowboy:opts()) -> ok. init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> - Peer0 = Transport:peername(Socket), - Sock0 = Transport:sockname(Socket), - Cert1 = case Transport:name() of + {ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket), + 'A socket error occurred when retrieving the peer name.'), + {ok, Sock} = maybe_socket_error(undefined, Transport:sockname(Socket), + 'A socket error occurred when retrieving the sock name.'), + CertResult = case Transport:name() of ssl -> case ssl:peercert(Socket) of {error, no_peercert} -> @@ -170,36 +186,33 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> _ -> {ok, undefined} end, - case {Peer0, Sock0, Cert1} of - {{ok, Peer}, {ok, Sock}, {ok, Cert}} -> - State = #state{ - parent=Parent, ref=Ref, socket=Socket, - transport=Transport, proxy_header=ProxyHeader, opts=Opts, - peer=Peer, sock=Sock, cert=Cert, - last_streamid=maps:get(max_keepalive, Opts, 1000)}, - setopts_active(State), - loop(set_timeout(State, request_timeout)); - {{error, Reason}, _, _} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the peer name.'}); - {_, {error, Reason}, _} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the sock name.'}); - {_, _, {error, Reason}} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the client TLS certificate.'}) - end. + {ok, Cert} = maybe_socket_error(undefined, CertResult, + 'A socket error occurred when retrieving the client TLS certificate.'), + State = #state{ + parent=Parent, ref=Ref, socket=Socket, + transport=Transport, proxy_header=ProxyHeader, opts=Opts, + peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=init_dynamic_buffer_size(Opts), + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), + last_streamid=maps:get(max_keepalive, Opts, 1000)}, + safe_setopts_active(State), + before_loop(set_timeout(State, request_timeout)). + +-include("cowboy_dynamic_buffer.hrl"). setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). +safe_setopts_active(State) -> + ok = maybe_socket_error(State, setopts_active(State)). + active(State) -> - setopts_active(State), + safe_setopts_active(State), State#state{active=true}. passive(State=#state{socket=Socket, transport=Transport}) -> - Transport:setopts(Socket, [{active, false}]), + ok = maybe_socket_error(State, Transport:setopts(Socket, [{active, false}])), Messages = Transport:messages(), flush_passive(Socket, Messages), State#state{active=false}. @@ -214,6 +227,13 @@ flush_passive(Socket, Messages) -> ok end. +before_loop(State=#state{opts=#{hibernate := true}}) -> + proc_lib:hibernate(?MODULE, loop, [State]); +before_loop(State) -> + loop(State). + +-spec loop(#state{}) -> ok. + loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, buffer=Buffer, timer=TimerRef, children=Children, in_streamid=InStreamID, last_streamid=LastStreamID}) -> @@ -222,11 +242,13 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, receive %% Discard data coming in after the last request %% we want to process was received fully. - {OK, Socket, _} when OK =:= element(1, Messages), InStreamID > LastStreamID -> - loop(State); + {OK, Socket, Data} when OK =:= element(1, Messages), InStreamID > LastStreamID -> + State1 = maybe_resize_buffer(State, Data), + before_loop(State1); %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(<< Buffer/binary, Data/binary >>, State); + State1 = maybe_resize_buffer(State, Data), + parse(<< Buffer/binary, Data/binary >>, State1); {Closed, Socket} when Closed =:= element(2, Messages) -> terminate(State, {socket_error, closed, 'The socket has been closed.'}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> @@ -234,45 +256,60 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, {Passive, Socket} when Passive =:= element(4, Messages); %% Hardcoded for compatibility with Ranch 1.x. Passive =:= tcp_passive; Passive =:= ssl_passive -> - setopts_active(State), - loop(State); + safe_setopts_active(State), + before_loop(State); %% Timeouts. {timeout, Ref, {shutdown, Pid}} -> cowboy_children:shutdown_timeout(Children, Ref, Pid), - loop(State); + before_loop(State); {timeout, TimerRef, Reason} -> timeout(State, Reason); {timeout, _, _} -> - loop(State); + before_loop(State); %% System messages. {'EXIT', Parent, shutdown} -> Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'}, - loop(initiate_closing(State, Reason)); + before_loop(initiate_closing(State, Reason)); {'EXIT', Parent, Reason} -> terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'}); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], State); %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> - loop(info(State, StreamID, Msg)); + before_loop(info(State, StreamID, Msg)); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> - loop(down(State, Pid, Msg)); + before_loop(down(State, Pid, Msg)); %% Calls from supervisor module. {'$gen_call', From, Call} -> cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE), - loop(State); + before_loop(State); %% Unknown messages. Msg -> cowboy:log(warning, "Received stray message ~p.~n", [Msg], Opts), - loop(State) + before_loop(State) after InactivityTimeout -> terminate(State, {internal_error, timeout, 'No message or data received before timeout.'}) end. -%% We do not set request_timeout if there are active streams. -set_timeout(State=#state{streams=[_|_]}, request_timeout) -> - State; +%% For HTTP/1.1 we have two types of timeouts: the request_timeout +%% is used when there is no currently ongoing request. This means +%% that we are not currently sending or receiving data and that +%% the next data to be received will be a new request. The +%% request_timeout is set once when we no longer have ongoing +%% requests, and runs until the full set of request headers +%% is received. It is not reset. +%% +%% After that point we use the idle_timeout. We continue using +%% the idle_timeout if pipelined requests come in: we are doing +%% work and just want to ensure the socket is not half-closed. +%% We continue using the idle_timeout up until there is no +%% ongoing request. This includes requests that were processed +%% and for which we only want to skip the body. Once the body +%% has been read fully we can go back to request_timeout. The +%% idle_timeout is reset every time we receive data and, +%% optionally, every time we send data. + %% We do not set request_timeout if we are skipping a body. set_timeout(State=#state{in_state=#ps_body{}}, request_timeout) -> State; @@ -282,6 +319,7 @@ set_timeout(State=#state{streams=[], in_state=InState}, idle_timeout) when element(1, InState) =/= ps_body -> State; %% Otherwise we can set the timeout. +%% @todo Don't do this so often, use a strategy similar to Websocket/H2 if possible. set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) -> State = cancel_timeout(State0), Default = case Name of @@ -299,6 +337,14 @@ set_timeout(State0=#state{opts=Opts, overriden_opts=Override}, Name) -> end, State#state{timer=TimerRef}. +maybe_reset_idle_timeout(State=#state{opts=Opts}) -> + case maps:get(reset_idle_timeout_on_send, Opts, false) of + true -> + set_timeout(State, idle_timeout); + false -> + State + end. + cancel_timeout(State=#state{timer=TimerRef}) -> ok = case TimerRef of undefined -> @@ -306,7 +352,7 @@ cancel_timeout(State=#state{timer=TimerRef}) -> _ -> %% Do a synchronous cancel and remove the message if any %% to avoid receiving stray messages. - _ = erlang:cancel_timer(TimerRef), + _ = erlang:cancel_timer(TimerRef, [{async, false}, {info, false}]), receive {timeout, TimerRef, _} -> ok after 0 -> @@ -327,12 +373,12 @@ timeout(State, idle_timeout) -> 'Connection idle longer than configuration allows.'}). parse(<<>>, State) -> - loop(State#state{buffer= <<>>}); + before_loop(State#state{buffer= <<>>}); %% Do not process requests that come in after the last request %% and discard the buffer if any to save memory. parse(_, State=#state{in_streamid=InStreamID, in_state=#ps_request_line{}, last_streamid=LastStreamID}) when InStreamID > LastStreamID -> - loop(State#state{buffer= <<>>}); + before_loop(State#state{buffer= <<>>}); parse(Buffer, State=#state{in_state=#ps_request_line{empty_lines=EmptyLines}}) -> after_parse(parse_request(Buffer, State, EmptyLines)); parse(Buffer, State=#state{in_state=PS=#ps_header{headers=Headers, name=undefined}}) -> @@ -355,16 +401,27 @@ after_parse({request, Req=#{streamid := StreamID, method := Method, TE = maps:get(<<"te">>, Headers, undefined), Streams = [#stream{id=StreamID, state=StreamState, method=Method, version=Version, te=TE}|Streams0], - State1 = case maybe_req_close(State0, Headers, Version) of - close -> State0#state{streams=Streams, last_streamid=StreamID, flow=Flow}; - keepalive -> State0#state{streams=Streams, flow=Flow} + State1 = State0#state{streams=Streams, flow=Flow}, + State2 = case maybe_req_close(State1, Headers, Version) of + close -> + State1#state{last_streamid=StreamID}; + keepalive -> + State1; + bad_connection_header -> + error_terminate(400, State1, {connection_error, protocol_error, + 'The Connection header is invalid. (RFC7230 6.1)'}) end, - State = set_timeout(State1, idle_timeout), + State = set_timeout(State2, idle_timeout), parse(Buffer, commands(State, StreamID, Commands)) catch Class:Exception:Stacktrace -> cowboy:log(cowboy_stream:make_error_log(init, [StreamID, Req, Opts], Class, Exception, Stacktrace), Opts), + %% We do not reset the idle timeout on send here + %% because an error occurred in the application. While we + %% are keeping the connection open for further requests we + %% do not want to keep the connection up too long if no + %% additional requests come in. early_error(500, State0, {internal_error, {Class, Exception}, 'Unhandled exception in cowboy_stream:init/3.'}, Req), parse(Buffer, State0) @@ -377,10 +434,7 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer {Commands, StreamState} -> Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream#stream{state=StreamState}), - State1 = set_timeout(State0, case IsFin of - fin -> request_timeout; - nofin -> idle_timeout - end), + State1 = set_timeout(State0, idle_timeout), State = update_flow(IsFin, Data, State1#state{streams=Streams}), parse(Buffer, commands(State, StreamID, Commands)) catch Class:Exception:Stacktrace -> @@ -393,13 +447,13 @@ after_parse({data, StreamID, IsFin, Data, State0=#state{opts=Opts, buffer=Buffer end; %% No corresponding stream. We must skip the body of the previous request %% in order to process the next one. -after_parse({data, _, IsFin, _, State}) -> - loop(set_timeout(State, case IsFin of +after_parse({data, _, IsFin, _, State=#state{buffer=Buffer}}) -> + parse(Buffer, set_timeout(State, case IsFin of fin -> request_timeout; nofin -> idle_timeout end)); after_parse({more, State}) -> - loop(set_timeout(State, idle_timeout)). + before_loop(set_timeout(State, idle_timeout)). update_flow(fin, _, State) -> %% This function is only called after parsing, therefore we @@ -459,8 +513,13 @@ parse_request(Buffer, State=#state{opts=Opts, in_streamid=InStreamID}, EmptyLine 'The TRACE method is currently not implemented. (RFC7231 4.3.8)'}); %% Accept direct HTTP/2 only at the beginning of the connection. << "PRI * HTTP/2.0\r\n", _/bits >> when InStreamID =:= 1 -> - %% @todo Might be worth throwing to get a clean stacktrace. - http2_upgrade(State, Buffer); + case lists:member(http2, maps:get(protocols, Opts, [http2, http])) of + true -> + http2_upgrade(State, Buffer); + false -> + error_terminate(501, State, {connection_error, no_error, + 'Prior knowledge upgrade to HTTP/2 is disabled by configuration.'}) + end; _ -> parse_method(Buffer, State, <<>>, maps:get(max_method_length, Opts, 32)) @@ -748,41 +807,44 @@ default_port(_) -> 80. %% End of request parsing. request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock, cert=Cert, - proxy_header=ProxyHeader, in_streamid=StreamID, in_state= + opts=Opts, proxy_header=ProxyHeader, in_streamid=StreamID, in_state= PS=#ps_header{method=Method, path=Path, qs=Qs, version=Version}}, - Headers0, Host, Port) -> + Headers, Host, Port) -> Scheme = case Transport:secure() of true -> <<"https">>; false -> <<"http">> end, - {Headers, HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers0 of + {HasBody, BodyLength, TDecodeFun, TDecodeState} = case Headers of + #{<<"transfer-encoding">> := _, <<"content-length">> := _} -> + error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers}}, + {stream_error, protocol_error, + 'The request had both transfer-encoding and content-length headers. (RFC7230 3.3.3)'}); #{<<"transfer-encoding">> := TransferEncoding0} -> try cow_http_hd:parse_transfer_encoding(TransferEncoding0) of [<<"chunked">>] -> - {maps:remove(<<"content-length">>, Headers0), - true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}}; + {true, undefined, fun cow_http_te:stream_chunked/2, {0, 0}}; _ -> - error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}}, + error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers}}, {stream_error, protocol_error, 'Cowboy only supports transfer-encoding: chunked. (RFC7230 3.3.1)'}) catch _:_ -> - error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}}, + error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers}}, {stream_error, protocol_error, 'The transfer-encoding header is invalid. (RFC7230 3.3.1)'}) end; #{<<"content-length">> := <<"0">>} -> - {Headers0, false, 0, undefined, undefined}; + {false, 0, undefined, undefined}; #{<<"content-length">> := BinLength} -> Length = try cow_http_hd:parse_content_length(BinLength) catch _:_ -> - error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers0}}, + error_terminate(400, State0#state{in_state=PS#ps_header{headers=Headers}}, {stream_error, protocol_error, 'The content-length header is invalid. (RFC7230 3.3.2)'}) end, - {Headers0, true, Length, fun cow_http_te:stream_identity/2, {0, Length}}; + {true, Length, fun cow_http_te:stream_identity/2, {0, Length}}; _ -> - {Headers0, false, 0, undefined, undefined} + {false, 0, undefined, undefined} end, Req0 = #{ ref => Ref, @@ -809,7 +871,7 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock undefined -> Req0; _ -> Req0#{proxy_header => ProxyHeader} end, - case is_http2_upgrade(Headers, Version) of + case is_http2_upgrade(Headers, Version, Opts) of false -> State = case HasBody of true -> @@ -831,12 +893,13 @@ request(Buffer, State0=#state{ref=Ref, transport=Transport, peer=Peer, sock=Sock %% HTTP/2 upgrade. -%% @todo We must not upgrade to h2c over a TLS connection. is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade, - <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1') -> + <<"http2-settings">> := HTTP2Settings}, 'HTTP/1.1', Opts) -> Conns = cow_http_hd:parse_connection(Conn), - case {lists:member(<<"upgrade">>, Conns), lists:member(<<"http2-settings">>, Conns)} of - {true, true} -> + case lists:member(<<"upgrade">>, Conns) + andalso lists:member(<<"http2-settings">>, Conns) + andalso lists:member(http2, maps:get(protocols, Opts, [http2, http])) of + true -> Protocols = cow_http_hd:parse_upgrade(Upgrade), case lists:member(<<"h2c">>, Protocols) of true -> @@ -847,17 +910,17 @@ is_http2_upgrade(#{<<"connection">> := Conn, <<"upgrade">> := Upgrade, _ -> false end; -is_http2_upgrade(_, _) -> +is_http2_upgrade(_, _, _) -> false. %% Prior knowledge upgrade, without an HTTP/1.1 request. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, Buffer) -> + proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer) -> case Transport:secure() of false -> _ = cancel_timeout(State), - cowboy_http2:init(Parent, Ref, Socket, Transport, - ProxyHeader, Opts, Peer, Sock, Cert, Buffer); + cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader, + opts_for_upgrade(State), Peer, Sock, Cert, Buffer); true -> error_terminate(400, State, {connection_error, protocol_error, 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'}) @@ -865,22 +928,37 @@ http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Tran %% Upgrade via an HTTP/1.1 request. http2_upgrade(State=#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert}, + proxy_header=ProxyHeader, peer=Peer, sock=Sock, cert=Cert}, Buffer, HTTP2Settings, Req) -> - %% @todo - %% However if the client sent a body, we need to read the body in full - %% and if we can't do that, return a 413 response. Some options are in order. - %% Always half-closed stream coming from this side. - try cow_http_hd:parse_http2_settings(HTTP2Settings) of - Settings -> - _ = cancel_timeout(State), - cowboy_http2:init(Parent, Ref, Socket, Transport, - ProxyHeader, Opts, Peer, Sock, Cert, Buffer, Settings, Req) - catch _:_ -> - error_terminate(400, State, {connection_error, protocol_error, - 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'}) + case Transport:secure() of + false -> + %% @todo + %% However if the client sent a body, we need to read the body in full + %% and if we can't do that, return a 413 response. Some options are in order. + %% Always half-closed stream coming from this side. + try cow_http_hd:parse_http2_settings(HTTP2Settings) of + Settings -> + _ = cancel_timeout(State), + cowboy_http2:init(Parent, Ref, Socket, Transport, ProxyHeader, + opts_for_upgrade(State), Peer, Sock, Cert, Buffer, Settings, Req) + catch _:_ -> + error_terminate(400, State, {connection_error, protocol_error, + 'The HTTP2-Settings header must contain a base64 SETTINGS payload. (RFC7540 3.2, RFC7540 3.2.1)'}) + end; + true -> + error_terminate(400, State, {connection_error, protocol_error, + 'Clients that support HTTP/2 over TLS MUST use ALPN. (RFC7540 3.4)'}) end. +opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=false}) -> + Opts; +opts_for_upgrade(#state{opts=Opts, dynamic_buffer_size=Size, + dynamic_buffer_moving_average=MovingAvg}) -> + Opts#{ + dynamic_buffer_initial_average => MovingAvg, + dynamic_buffer_initial_size => Size + }. + %% Request body parsing. parse_body(Buffer, State=#state{in_streamid=StreamID, in_state= @@ -953,6 +1031,11 @@ info(State=#state{opts=Opts, streams=Streams0}, StreamID, Msg) -> end. %% Commands. +%% +%% The order in which the commands are given matters. Cowboy may +%% stop processing commands after the 'stop' command or when an +%% error occurred, such as a socket error. Critical commands such +%% as 'spawn' should always be given first. commands(State, _, []) -> State; @@ -1006,19 +1089,20 @@ commands(State=#state{out_state=wait, out_streamid=StreamID}, StreamID, commands(State, StreamID, [{error_response, _, _, _}|Tail]) -> commands(State, StreamID, Tail); %% Send an informational response. -commands(State=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, +commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, streams=Streams}, StreamID, [{inform, StatusCode, Headers}|Tail]) -> %% @todo I'm pretty sure the last stream in the list is the one we want %% considering all others are queued. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams), _ = case Version of 'HTTP/1.1' -> - Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', - headers_to_list(Headers))); + ok = maybe_socket_error(State0, Transport:send(Socket, + cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)))); %% Do not send informational responses to HTTP/1.0 clients. (RFC7231 6.2) 'HTTP/1.0' -> ok end, + State = maybe_reset_idle_timeout(State0), commands(State, StreamID, Tail); %% Send a full response. %% @@ -1031,17 +1115,18 @@ commands(State0=#state{socket=Socket, transport=Transport, out_state=wait, strea %% considering all others are queued. #stream{version=Version} = lists:keyfind(StreamID, #stream.id, Streams), {State1, Headers} = connection(State0, Headers0, StreamID, Version), - State = State1#state{out_state=done}, + State2 = State1#state{out_state=done}, %% @todo Ensure content-length is set. 204 must never have content-length set. Response = cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)), %% @todo 204 and 304 responses must not include a response body. (RFC7230 3.3.1, RFC7230 3.3.2) case Body of {sendfile, _, _, _} -> - Transport:send(Socket, Response), - sendfile(State, Body); + ok = maybe_socket_error(State2, Transport:send(Socket, Response)), + sendfile(State2, Body); _ -> - Transport:send(Socket, [Response, Body]) + ok = maybe_socket_error(State2, Transport:send(Socket, [Response, Body])) end, + State = maybe_reset_idle_timeout(State2), commands(State, StreamID, Tail); %% Send response headers and initiate chunked encoding or streaming. commands(State0=#state{socket=Socket, transport=Transport, @@ -1078,8 +1163,10 @@ commands(State0=#state{socket=Socket, transport=Transport, trailers -> Headers1; _ -> maps:remove(<<"trailer">>, Headers1) end, - {State, Headers} = connection(State1, Headers2, StreamID, Version), - Transport:send(Socket, cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers))), + {State2, Headers} = connection(State1, Headers2, StreamID, Version), + ok = maybe_socket_error(State2, Transport:send(Socket, + cow_http:response(StatusCode, 'HTTP/1.1', headers_to_list(Headers)))), + State = maybe_reset_idle_timeout(State2), commands(State, StreamID, Tail); %% Send a response body chunk. %% @todo We need to kill the stream if it tries to send data before headers. @@ -1098,27 +1185,33 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out Stream0=#stream{method= <<"HEAD">>} -> Stream0; Stream0 when Size =:= 0, IsFin =:= fin, OutState =:= chunked -> - Transport:send(Socket, <<"0\r\n\r\n">>), + ok = maybe_socket_error(State0, + Transport:send(Socket, <<"0\r\n\r\n">>)), Stream0; Stream0 when Size =:= 0 -> Stream0; Stream0 when is_tuple(Data), OutState =:= chunked -> - Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>]), + ok = maybe_socket_error(State0, + Transport:send(Socket, [integer_to_binary(Size, 16), <<"\r\n">>])), sendfile(State0, Data), - Transport:send(Socket, - case IsFin of - fin -> <<"\r\n0\r\n\r\n">>; - nofin -> <<"\r\n">> - end), + ok = maybe_socket_error(State0, + Transport:send(Socket, + case IsFin of + fin -> <<"\r\n0\r\n\r\n">>; + nofin -> <<"\r\n">> + end) + ), Stream0; Stream0 when OutState =:= chunked -> - Transport:send(Socket, [ - integer_to_binary(Size, 16), <<"\r\n">>, Data, - case IsFin of - fin -> <<"\r\n0\r\n\r\n">>; - nofin -> <<"\r\n">> - end - ]), + ok = maybe_socket_error(State0, + Transport:send(Socket, [ + integer_to_binary(Size, 16), <<"\r\n">>, Data, + case IsFin of + fin -> <<"\r\n0\r\n\r\n">>; + nofin -> <<"\r\n">> + end + ]) + ), Stream0; Stream0 when OutState =:= streaming -> #stream{local_sent_size=SentSize0, local_expected_size=ExpectedSize} = Stream0, @@ -1130,34 +1223,39 @@ commands(State0=#state{socket=Socket, transport=Transport, streams=Streams0, out is_tuple(Data) -> sendfile(State0, Data); true -> - Transport:send(Socket, Data) + ok = maybe_socket_error(State0, Transport:send(Socket, Data)) end, Stream0#stream{local_sent_size=SentSize} end, - State = case IsFin of + State1 = case IsFin of fin -> State0#state{out_state=done}; nofin -> State0 end, + State = maybe_reset_idle_timeout(State1), Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream), commands(State#state{streams=Streams}, StreamID, Tail); -commands(State=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState}, +commands(State0=#state{socket=Socket, transport=Transport, streams=Streams, out_state=OutState}, StreamID, [{trailers, Trailers}|Tail]) -> case stream_te(OutState, lists:keyfind(StreamID, #stream.id, Streams)) of trailers -> - Transport:send(Socket, [ - <<"0\r\n">>, - cow_http:headers(maps:to_list(Trailers)), - <<"\r\n">> - ]); + ok = maybe_socket_error(State0, + Transport:send(Socket, [ + <<"0\r\n">>, + cow_http:headers(maps:to_list(Trailers)), + <<"\r\n">> + ]) + ); no_trailers -> - Transport:send(Socket, <<"0\r\n\r\n">>); + ok = maybe_socket_error(State0, + Transport:send(Socket, <<"0\r\n\r\n">>)); not_chunked -> ok end, - commands(State#state{out_state=done}, StreamID, Tail); + State = maybe_reset_idle_timeout(State0#state{out_state=done}), + commands(State, StreamID, Tail); %% Protocol takeover. commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transport, - out_state=OutState, opts=Opts, buffer=Buffer, children=Children}, StreamID, + out_state=OutState, buffer=Buffer, children=Children}, StreamID, [{switch_protocol, Headers, Protocol, InitialState}|_Tail]) -> %% @todo If there's streams opened after this one, fail instead of 101. State1 = cancel_timeout(State0), @@ -1174,28 +1272,26 @@ commands(State0=#state{ref=Ref, parent=Parent, socket=Socket, transport=Transpor _ -> State end, #stream{state=StreamState} = lists:keyfind(StreamID, #stream.id, Streams), - %% @todo We need to shutdown processes here first. stream_call_terminate(StreamID, switch_protocol, StreamState, State), %% Terminate children processes and flush any remaining messages from the mailbox. cowboy_children:terminate(Children), flush(Parent), - Protocol:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, InitialState); + %% Turn off the trap_exit process flag + %% since this process will no longer be a supervisor. + process_flag(trap_exit, false), + Protocol:takeover(Parent, Ref, Socket, Transport, + opts_for_upgrade(State), Buffer, InitialState); %% Set options dynamically. -commands(State0=#state{overriden_opts=Opts}, - StreamID, [{set_options, SetOpts}|Tail]) -> - State1 = case SetOpts of - #{idle_timeout := IdleTimeout} -> - set_timeout(State0#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}}, +commands(State0, StreamID, [{set_options, SetOpts}|Tail]) -> + State = maps:fold(fun + (chunked, Chunked, StateF=#state{overriden_opts=Opts}) -> + StateF#state{overriden_opts=Opts#{chunked => Chunked}}; + (idle_timeout, IdleTimeout, StateF=#state{overriden_opts=Opts}) -> + set_timeout(StateF#state{overriden_opts=Opts#{idle_timeout => IdleTimeout}}, idle_timeout); - _ -> - State0 - end, - State = case SetOpts of - #{chunked := Chunked} -> - State1#state{overriden_opts=Opts#{chunked => Chunked}}; - _ -> - State1 - end, + (_, _, StateF) -> + StateF + end, State0, SetOpts), commands(State, StreamID, Tail); %% Stream shutdown. commands(State, StreamID, [stop|Tail]) -> @@ -1238,10 +1334,12 @@ sendfile(State=#state{socket=Socket, transport=Transport, opts=Opts}, {sendfile, Offset, Bytes, Path}) -> try %% When sendfile is disabled we explicitly use the fallback. - _ = case maps:get(sendfile, Opts, true) of - true -> Transport:sendfile(Socket, Path, Offset, Bytes); - false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) - end, + {ok, _} = maybe_socket_error(State, + case maps:get(sendfile, Opts, true) of + true -> Transport:sendfile(Socket, Path, Offset, Bytes); + false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) + end + ), ok catch _:_ -> terminate(State, {socket_error, sendfile_crash, @@ -1312,20 +1410,24 @@ stream_terminate(State0=#state{opts=Opts, in_streamid=InStreamID, in_state=InSta end. stream_next(State0=#state{opts=Opts, active=Active, out_streamid=OutStreamID, streams=Streams}) -> + %% Enable active mode again if it was disabled. + State1 = case Active of + true -> State0; + false -> active(State0) + end, NextOutStreamID = OutStreamID + 1, case lists:keyfind(NextOutStreamID, #stream.id, Streams) of false -> - State0#state{out_streamid=NextOutStreamID, out_state=wait}; + State = State1#state{out_streamid=NextOutStreamID, out_state=wait}, + %% There are no streams remaining. We therefore can + %% and want to switch back to the request_timeout. + set_timeout(State, request_timeout); #stream{queue=Commands} -> - State = case Active of - true -> State0; - false -> active(State0) - end, %% @todo Remove queue from the stream. %% We set the flow to the initial flow size even though %% we might have sent some data through already due to pipelining. Flow = maps:get(initial_stream_flow_size, Opts, 65535), - commands(State#state{flow=Flow, out_streamid=NextOutStreamID, out_state=wait}, + commands(State1#state{flow=Flow, out_streamid=NextOutStreamID, out_state=wait}, NextOutStreamID, Commands) end. @@ -1341,17 +1443,23 @@ stream_call_terminate(StreamID, Reason, StreamState, #state{opts=Opts}) -> maybe_req_close(#state{opts=#{http10_keepalive := false}}, _, 'HTTP/1.0') -> close; maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.0') -> - Conns = cow_http_hd:parse_connection(Conn), - case lists:member(<<"keep-alive">>, Conns) of - true -> keepalive; - false -> close + try cow_http_hd:parse_connection(Conn) of + Conns -> + case lists:member(<<"keep-alive">>, Conns) of + true -> keepalive; + false -> close + end + catch _:_ -> + bad_connection_header end; maybe_req_close(_, _, 'HTTP/1.0') -> close; maybe_req_close(_, #{<<"connection">> := Conn}, 'HTTP/1.1') -> - case connection_hd_is_close(Conn) of + try connection_hd_is_close(Conn) of true -> close; false -> keepalive + catch _:_ -> + bad_connection_header end; maybe_req_close(_, _, _) -> keepalive. @@ -1420,37 +1528,55 @@ error_terminate(StatusCode, State=#state{ref=Ref, peer=Peer, in_state=StreamStat early_error(StatusCode, State, Reason, PartialReq) -> early_error(StatusCode, State, Reason, PartialReq, #{}). -early_error(StatusCode0, #state{socket=Socket, transport=Transport, +early_error(StatusCode0, State=#state{socket=Socket, transport=Transport, opts=Opts, in_streamid=StreamID}, Reason, PartialReq, RespHeaders0) -> RespHeaders1 = RespHeaders0#{<<"content-length">> => <<"0">>}, Resp = {response, StatusCode0, RespHeaders1, <<>>}, try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of {response, StatusCode, RespHeaders, RespBody} -> - Transport:send(Socket, [ - cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)), - %% @todo We shouldn't send the body when the method is HEAD. - %% @todo Technically we allow the sendfile tuple. - RespBody - ]) + ok = maybe_socket_error(State, + Transport:send(Socket, [ + cow_http:response(StatusCode, 'HTTP/1.1', maps:to_list(RespHeaders)), + %% @todo We shouldn't send the body when the method is HEAD. + %% @todo Technically we allow the sendfile tuple. + RespBody + ]) + ) catch Class:Exception:Stacktrace -> cowboy:log(cowboy_stream:make_error_log(early_error, [StreamID, Reason, PartialReq, Resp, Opts], Class, Exception, Stacktrace), Opts), %% We still need to send an error response, so send what we initially %% wanted to send. It's better than nothing. - Transport:send(Socket, cow_http:response(StatusCode0, - 'HTTP/1.1', maps:to_list(RespHeaders1))) - end, - ok. + ok = maybe_socket_error(State, + Transport:send(Socket, cow_http:response(StatusCode0, + 'HTTP/1.1', maps:to_list(RespHeaders1))) + ) + end. initiate_closing(State=#state{streams=[]}, Reason) -> terminate(State, Reason); -initiate_closing(State=#state{streams=[_Stream|Streams], +initiate_closing(State=#state{streams=Streams, out_streamid=OutStreamID}, Reason) -> - terminate_all_streams(State, Streams, Reason), - State#state{last_streamid=OutStreamID}. - --spec terminate(_, _) -> no_return(). + {value, LastStream, TerminatedStreams} + = lists:keytake(OutStreamID, #stream.id, Streams), + terminate_all_streams(State, TerminatedStreams, Reason), + State#state{streams=[LastStream], last_streamid=OutStreamID}. + +%% Function replicated in cowboy_http2. +maybe_socket_error(State, {error, closed}) -> + terminate(State, {socket_error, closed, 'The socket has been closed.'}); +maybe_socket_error(State, Reason) -> + maybe_socket_error(State, Reason, 'An error has occurred on the socket.'). + +maybe_socket_error(_, Result = ok, _) -> + Result; +maybe_socket_error(_, Result = {ok, _}, _) -> + Result; +maybe_socket_error(State, {error, Reason}, Human) -> + terminate(State, {socket_error, Reason, Human}). + +-spec terminate(#state{} | undefined, _) -> no_return(). terminate(undefined, Reason) -> exit({shutdown, Reason}); terminate(State=#state{streams=Streams, children=Children}, Reason) -> @@ -1484,6 +1610,9 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) -> terminate_linger_before_loop(State, TimerRef, Messages) -> %% We may already be in active mode when we do this %% but it's OK because we are shutting down anyway. + %% + %% We specially handle the socket error to terminate + %% when an error occurs. case setopts_active(State) of ok -> terminate_linger_loop(State, TimerRef, Messages); @@ -1511,12 +1640,12 @@ terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) -> -spec system_continue(_, _, #state{}) -> ok. system_continue(_, _, State) -> - loop(State). + before_loop(State). -spec system_terminate(any(), _, _, #state{}) -> no_return(). system_terminate(Reason0, _, _, State) -> Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'}, - loop(initiate_closing(State, Reason)). + before_loop(initiate_closing(State, Reason)). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. system_code_change(Misc, _, _, _) -> diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 7440d91..0d22fa1 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2015-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -17,6 +17,7 @@ -export([init/6]). -export([init/10]). -export([init/12]). +-export([loop/2]). -export([system_continue/3]). -export([system_terminate/4]). @@ -24,15 +25,20 @@ -type opts() :: #{ active_n => pos_integer(), + alpn_default_protocol => http | http2, compress_buffering => boolean(), compress_threshold => non_neg_integer(), connection_type => worker | supervisor, connection_window_margin_size => 0..16#7fffffff, connection_window_update_threshold => 0..16#7fffffff, + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), enable_connect_protocol => boolean(), env => cowboy_middleware:env(), goaway_initial_timeout => timeout(), goaway_complete_timeout => timeout(), + hibernate => boolean(), idle_timeout => timeout(), inactivity_timeout => timeout(), initial_connection_window_size => 65535..16#7fffffff, @@ -44,10 +50,12 @@ max_connection_window_size => 0..16#7fffffff, max_decode_table_size => non_neg_integer(), max_encode_table_size => non_neg_integer(), + max_fragmented_header_block_size => 16384..16#7fffffff, max_frame_size_received => 16384..16777215, max_frame_size_sent => 16384..16777215 | infinity, max_received_frame_rate => {pos_integer(), timeout()}, max_reset_stream_rate => {pos_integer(), timeout()}, + max_cancel_stream_rate => {pos_integer(), timeout()}, max_stream_buffer_size => non_neg_integer(), max_stream_window_size => 0..16#7fffffff, metrics_callback => cowboy_metrics_h:metrics_callback(), @@ -55,7 +63,9 @@ metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()), middlewares => [module()], preface_timeout => timeout(), + protocols => [http | http2], proxy_header => boolean(), + reset_idle_timeout_on_send => boolean(), sendfile => boolean(), settings_timeout => timeout(), shutdown_timeout => timeout(), @@ -82,6 +92,14 @@ state :: {module, any()} }). +%% We don't want to reset the idle timeout too often, +%% so we don't reset it on data. Instead we reset the +%% number of ticks we have observed. We divide the +%% timeout value by a value and that value becomes +%% the number of ticks at which point we can drop +%% the connection. This value is the number of ticks. +-define(IDLE_TIMEOUT_TICKS, 10). + -record(state, { parent = undefined :: pid(), ref :: ranch:ref(), @@ -92,6 +110,7 @@ %% Timer for idle_timeout; also used for goaway timers. timer = undefined :: undefined | reference(), + idle_timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS, %% Remote address and port for the connection. peer = undefined :: {inet:ip_address(), inet:port_number()}, @@ -114,9 +133,17 @@ reset_rate_num :: undefined | pos_integer(), reset_rate_time :: undefined | integer(), + %% HTTP/2 rapid reset attack protection. + cancel_rate_num :: undefined | pos_integer(), + cancel_rate_time :: undefined | integer(), + %% Flow requested for all streams. flow = 0 :: non_neg_integer(), + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size :: pos_integer() | false, + dynamic_buffer_moving_average :: non_neg_integer(), + %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. streams = #{} :: #{cow_http2:streamid() => #stream{}}, @@ -127,11 +154,14 @@ }). -spec init(pid(), ranch:ref(), inet:socket(), module(), - ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> ok. + ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> - Peer0 = Transport:peername(Socket), - Sock0 = Transport:sockname(Socket), - Cert1 = case Transport:name() of + {ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket), + 'A socket error occurred when retrieving the peer name.'), + {ok, Sock} = maybe_socket_error(undefined, Transport:sockname(Socket), + 'A socket error occurred when retrieving the sock name.'), + CertResult = case Transport:name() of ssl -> case ssl:peercert(Socket) of {error, no_peercert} -> @@ -142,40 +172,37 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> _ -> {ok, undefined} end, - case {Peer0, Sock0, Cert1} of - {{ok, Peer}, {ok, Sock}, {ok, Cert}} -> - init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, <<>>); - {{error, Reason}, _, _} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the peer name.'}); - {_, {error, Reason}, _} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the sock name.'}); - {_, _, {error, Reason}} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the client TLS certificate.'}) - end. + {ok, Cert} = maybe_socket_error(undefined, CertResult, + 'A socket error occurred when retrieving the client TLS certificate.'), + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, <<>>). -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts(), {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()}, - binary() | undefined, binary()) -> ok. + binary() | undefined, binary()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) -> + DynamicBuffer = init_dynamic_buffer_size(Opts), {ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts), + %% Send the preface before doing all the init in case we get a socket error. + ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)), State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, - http2_status=sequence, http2_machine=HTTP2Machine})), - Transport:send(Socket, Preface), - setopts_active(State), + dynamic_buffer_size=DynamicBuffer, + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), + http2_status=sequence, http2_machine=HTTP2Machine}), 0), + safe_setopts_active(State), case Buffer of - <<>> -> loop(State, Buffer); + <<>> -> before_loop(State, Buffer); _ -> parse(State, Buffer) end. -init_rate_limiting(State) -> +init_rate_limiting(State0) -> CurrentTime = erlang:monotonic_time(millisecond), - init_reset_rate_limiting(init_frame_rate_limiting(State, CurrentTime), CurrentTime). + State1 = init_frame_rate_limiting(State0, CurrentTime), + State2 = init_reset_rate_limiting(State1, CurrentTime), + init_cancel_rate_limiting(State2, CurrentTime). init_frame_rate_limiting(State=#state{opts=Opts}, CurrentTime) -> {FrameRateNum, FrameRatePeriod} = maps:get(max_received_frame_rate, Opts, {10000, 10000}), @@ -189,6 +216,12 @@ init_reset_rate_limiting(State=#state{opts=Opts}, CurrentTime) -> reset_rate_num=ResetRateNum, reset_rate_time=add_period(CurrentTime, ResetRatePeriod) }. +init_cancel_rate_limiting(State=#state{opts=Opts}, CurrentTime) -> + {CancelRateNum, CancelRatePeriod} = maps:get(max_cancel_stream_rate, Opts, {500, 10000}), + State#state{ + cancel_rate_num=CancelRateNum, cancel_rate_time=add_period(CurrentTime, CancelRatePeriod) + }. + add_period(_, infinity) -> infinity; add_period(Time, Period) -> Time + Period. @@ -196,15 +229,19 @@ add_period(Time, Period) -> Time + Period. -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts(), {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()}, - binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok. + binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer, _Settings, Req=#{method := Method}) -> + DynamicBuffer = init_dynamic_buffer_size(Opts), {ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts), {ok, StreamID, HTTP2Machine} = cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0), State0 = #state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=DynamicBuffer, + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), http2_status=upgrade, http2_machine=HTTP2Machine}, State1 = headers_frame(State0#state{ http2_machine=HTTP2Machine}, StreamID, Req), @@ -214,21 +251,36 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer <<"connection">> => <<"Upgrade">>, <<"upgrade">> => <<"h2c">> }, ?MODULE, undefined}), %% @todo undefined or #{}? - State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence})), - Transport:send(Socket, Preface), - setopts_active(State), + State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence}), 0), + %% In the case of HTTP/1.1 Upgrade we cannot send the Preface + %% until we send the 101 response. + ok = maybe_socket_error(State, Transport:send(Socket, Preface)), + safe_setopts_active(State), case Buffer of - <<>> -> loop(State, Buffer); + <<>> -> before_loop(State, Buffer); _ -> parse(State, Buffer) end. +-include("cowboy_dynamic_buffer.hrl"). + %% Because HTTP/2 has flow control and Cowboy has other rate limiting %% mechanisms implemented, a very large active_n value should be fine, %% as long as the stream handlers do their work in a timely manner. +%% However large active_n values reduce the impact of dynamic_buffer. setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). +safe_setopts_active(State) -> + ok = maybe_socket_error(State, setopts_active(State)). + +before_loop(State=#state{opts=#{hibernate := true}}, Buffer) -> + proc_lib:hibernate(?MODULE, loop, [State, Buffer]); +before_loop(State, Buffer) -> + loop(State, Buffer). + +-spec loop(#state{}, binary()) -> no_return(). + loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, timer=TimerRef, children=Children}, Buffer) -> Messages = Transport:messages(), @@ -236,7 +288,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, receive %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(set_idle_timeout(State), << Buffer/binary, Data/binary >>); + State1 = maybe_resize_buffer(State, Data), + parse(State1#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>); {Closed, Socket} when Closed =:= element(2, Messages) -> Reason = case State#state.http2_status of closing -> {stop, closed, 'The client is going away.'}; @@ -248,53 +301,64 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, {Passive, Socket} when Passive =:= element(4, Messages); %% Hardcoded for compatibility with Ranch 1.x. Passive =:= tcp_passive; Passive =:= ssl_passive -> - setopts_active(State), - loop(State, Buffer); + safe_setopts_active(State), + before_loop(State, Buffer); %% System messages. {'EXIT', Parent, shutdown} -> Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'}, - loop(initiate_closing(State, Reason), Buffer); + before_loop(initiate_closing(State, Reason), Buffer); {'EXIT', Parent, Reason} -> terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'}); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer}); %% Timeouts. {timeout, TimerRef, idle_timeout} -> - terminate(State, {stop, timeout, - 'Connection idle longer than configuration allows.'}); + tick_idle_timeout(State, Buffer); {timeout, Ref, {shutdown, Pid}} -> cowboy_children:shutdown_timeout(Children, Ref, Pid), - loop(State, Buffer); + before_loop(State, Buffer); {timeout, TRef, {cow_http2_machine, Name}} -> - loop(timeout(State, Name, TRef), Buffer); + before_loop(timeout(State, Name, TRef), Buffer); {timeout, TimerRef, {goaway_initial_timeout, Reason}} -> - loop(closing(State, Reason), Buffer); + before_loop(closing(State, Reason), Buffer); {timeout, TimerRef, {goaway_complete_timeout, Reason}} -> terminate(State, {stop, stop_reason(Reason), 'Graceful shutdown timed out.'}); %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> - loop(info(State, StreamID, Msg), Buffer); + before_loop(info(State, StreamID, Msg), Buffer); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> - loop(down(State, Pid, Msg), Buffer); + before_loop(down(State, Pid, Msg), Buffer); %% Calls from supervisor module. {'$gen_call', From, Call} -> cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE), - loop(State, Buffer); + before_loop(State, Buffer); Msg -> cowboy:log(warning, "Received stray message ~p.", [Msg], Opts), - loop(State, Buffer) + before_loop(State, Buffer) after InactivityTimeout -> terminate(State, {internal_error, timeout, 'No message or data received before timeout.'}) end. -set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef}) +tick_idle_timeout(State=#state{idle_timeout_num=?IDLE_TIMEOUT_TICKS}, _) -> + terminate(State, {stop, timeout, + 'Connection idle longer than configuration allows.'}); +tick_idle_timeout(State=#state{idle_timeout_num=TimeoutNum}, Buffer) -> + before_loop(set_idle_timeout(State, TimeoutNum + 1), Buffer). + +set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef}, _) when Status =:= closing_initiated orelse Status =:= closing, TimerRef =/= undefined -> State; -set_idle_timeout(State=#state{opts=Opts}) -> - set_timeout(State, maps:get(idle_timeout, Opts, 60000), idle_timeout). +set_idle_timeout(State=#state{opts=Opts}, TimeoutNum) -> + case maps:get(idle_timeout, Opts, 60000) of + infinity -> + State#state{timer=undefined}; + Timeout -> + set_timeout(State#state{idle_timeout_num=TimeoutNum}, + Timeout div ?IDLE_TIMEOUT_TICKS, idle_timeout) + end. set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) -> ok = case TimerRef0 of @@ -307,6 +371,14 @@ set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) -> end, State#state{timer=TimerRef}. +maybe_reset_idle_timeout(State=#state{opts=Opts}) -> + case maps:get(reset_idle_timeout_on_send, Opts, false) of + true -> + State#state{idle_timeout_num=0}; + false -> + State + end. + %% HTTP/2 protocol parsing. parse(State=#state{http2_status=sequence}, Data) -> @@ -314,7 +386,7 @@ parse(State=#state{http2_status=sequence}, Data) -> {ok, Rest} -> parse(State#state{http2_status=settings}, Rest); more -> - loop(State, Data); + before_loop(State, Data); Error = {connection_error, _, _} -> terminate(State, Error) end; @@ -333,7 +405,7 @@ parse(State=#state{http2_status=Status, http2_machine=HTTP2Machine, streams=Stre more when Status =:= closing, Streams =:= #{} -> terminate(State, {stop, normal, 'The connection is going away.'}); more -> - loop(State, Data) + before_loop(State, Data) end. %% Frame rate flood protection. @@ -383,10 +455,11 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> goaway(State#state{http2_machine=HTTP2Machine}, GoAway); {send, SendData, HTTP2Machine} -> %% We may need to send an alarm for each of the streams sending data. - lists:foldl( + State1 = lists:foldl( fun({StreamID, _, _}, S) -> maybe_send_data_alarm(S, HTTP2Machine0, StreamID) end, send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData, []), - SendData); + SendData), + maybe_reset_idle_timeout(State1); {error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} -> reset_stream(State#state{http2_machine=HTTP2Machine}, StreamID, {stream_error, Reason, Human}); @@ -398,15 +471,20 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> %% if we were still waiting for a SETTINGS frame. maybe_ack(State=#state{http2_status=settings}, Frame) -> maybe_ack(State#state{http2_status=connected}, Frame); +%% We do not reset the idle timeout on send here because we are +%% sending data as a consequence of receiving data, which means +%% we already resetted the idle timeout. maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) -> case Frame of - {settings, _} -> Transport:send(Socket, cow_http2:settings_ack()); - {ping, Opaque} -> Transport:send(Socket, cow_http2:ping_ack(Opaque)); + {settings, _} -> + ok = maybe_socket_error(State, Transport:send(Socket, cow_http2:settings_ack())); + {ping, Opaque} -> + ok = maybe_socket_error(State, Transport:send(Socket, cow_http2:ping_ack(Opaque))); _ -> ok end, State. -data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) -> +data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFin, Data) -> case Streams of #{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of @@ -415,11 +493,26 @@ data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin %% We may receive more data than we requested. We ensure %% that the flow value doesn't go lower than 0. Size = byte_size(Data), - State = update_window(State0#state{flow=max(0, Flow - Size), + Flow = max(0, Flow0 - Size), + %% We would normally update the window when changing the flow + %% value. But because we are running commands, which themselves + %% may update the window, and we want to avoid updating the + %% window twice in a row, we first run the commands and then + %% only update the window a flow command was executed. We know + %% that it was because the flow value changed in the state. + State1 = State0#state{flow=Flow, streams=Streams#{StreamID => Stream#stream{ flow=max(0, StreamFlow - Size), state=StreamState}}}, - StreamID), - commands(State, StreamID, Commands) + State = commands(State1, StreamID, Commands), + case State of + %% No flow command was executed. We must update the window + %% because we changed the flow value earlier. + #state{flow=Flow} -> + update_window(State, StreamID); + %% Otherwise the window was updated already. + _ -> + State + end catch Class:Exception:Stacktrace -> cowboy:log(cowboy_stream:make_error_log(data, [StreamID, IsFin, Data, StreamState0], @@ -568,11 +661,27 @@ rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, R {#stream{state=StreamState}, Streams} -> terminate_stream_handler(State, StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), - State#state{streams=Streams, children=Children}; + cancel_rate_limit(State#state{streams=Streams, children=Children}); error -> State end. +cancel_rate_limit(State0=#state{cancel_rate_num=Num0, cancel_rate_time=Time}) -> + case Num0 - 1 of + 0 -> + CurrentTime = erlang:monotonic_time(millisecond), + if + CurrentTime < Time -> + terminate(State0, {connection_error, enhance_your_calm, + 'Stream cancel rate larger than configuration allows. Flood? (CVE-2023-44487)'}); + true -> + %% When the option has a period of infinity we cannot reach this clause. + init_cancel_rate_limiting(State0, CurrentTime) + end; + Num -> + State0#state{cancel_rate_num=Num} + end. + ignored_frame(State=#state{http2_machine=HTTP2Machine0}) -> case cow_http2_machine:ignored_frame(HTTP2Machine0) of {ok, HTTP2Machine} -> @@ -657,23 +766,37 @@ commands(State=#state{http2_machine=HTTP2Machine}, StreamID, end; %% Send an informational response. commands(State0, StreamID, [{inform, StatusCode, Headers}|Tail]) -> - State = send_headers(State0, StreamID, idle, StatusCode, Headers), + State1 = send_headers(State0, StreamID, idle, StatusCode, Headers), + State = maybe_reset_idle_timeout(State1), commands(State, StreamID, Tail); %% Send response headers. commands(State0, StreamID, [{response, StatusCode, Headers, Body}|Tail]) -> - State = send_response(State0, StreamID, StatusCode, Headers, Body), + State1 = send_response(State0, StreamID, StatusCode, Headers, Body), + State = maybe_reset_idle_timeout(State1), commands(State, StreamID, Tail); %% Send response headers. commands(State0, StreamID, [{headers, StatusCode, Headers}|Tail]) -> - State = send_headers(State0, StreamID, nofin, StatusCode, Headers), + State1 = send_headers(State0, StreamID, nofin, StatusCode, Headers), + State = maybe_reset_idle_timeout(State1), commands(State, StreamID, Tail); %% Send a response body chunk. commands(State0, StreamID, [{data, IsFin, Data}|Tail]) -> - State = maybe_send_data(State0, StreamID, IsFin, Data, []), + State = case maybe_send_data(State0, StreamID, IsFin, Data, []) of + {data_sent, State1} -> + maybe_reset_idle_timeout(State1); + {no_data_sent, State1} -> + State1 + end, commands(State, StreamID, Tail); %% Send trailers. commands(State0, StreamID, [{trailers, Trailers}|Tail]) -> - State = maybe_send_data(State0, StreamID, fin, {trailers, maps:to_list(Trailers)}, []), + State = case maybe_send_data(State0, StreamID, fin, + {trailers, maps:to_list(Trailers)}, []) of + {data_sent, State1} -> + maybe_reset_idle_timeout(State1); + {no_data_sent, State1} -> + State1 + end, commands(State, StreamID, Tail); %% Send a push promise. %% @@ -705,10 +828,11 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP2Machine0, PseudoHeaders, Headers) of {ok, PromisedStreamID, HeaderBlock, HTTP2Machine} -> - Transport:send(Socket, cow_http2:push_promise( - StreamID, PromisedStreamID, HeaderBlock)), - headers_frame(State0#state{http2_machine=HTTP2Machine}, - PromisedStreamID, fin, Headers, PseudoHeaders, 0); + State1 = State0#state{http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State1, Transport:send(Socket, + cow_http2:push_promise(StreamID, PromisedStreamID, HeaderBlock))), + State2 = maybe_reset_idle_timeout(State1), + headers_frame(State2, PromisedStreamID, fin, Headers, PseudoHeaders, 0); {error, no_push} -> State0 end, @@ -731,10 +855,14 @@ commands(State, StreamID, [Error = {internal_error, _, _}|_Tail]) -> %% @todo Only reset when the stream still exists. reset_stream(State, StreamID, Error); %% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself. +%% +%% We do not need to reset the idle timeout on send because it +%% hasn't been set yet. This is called from init/12. commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade}, StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) -> %% @todo This 101 response needs to be passed through stream handlers. - Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))), + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers)))), commands(State, StreamID, Tail); %% Use a different protocol within the stream (CONNECT :protocol). %% @todo Make sure we error out when the feature is disabled. @@ -755,22 +883,32 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) -> %% Tentatively update the window after the flow was updated. -update_window(State=#state{socket=Socket, transport=Transport, +update_window(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) -> - #{StreamID := #stream{flow=StreamFlow}} = Streams, {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of ok -> {<<>>, HTTP2Machine0}; {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1} end, - {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of - ok -> {<<>>, HTTP2Machine2}; - {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} + {Data2, HTTP2Machine} = case Streams of + #{StreamID := #stream{flow=StreamFlow}} -> + case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of + ok -> + {<<>>, HTTP2Machine2}; + {ok, Increment2, HTTP2Machine3} -> + {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} + end; + _ -> + %% Don't update the stream's window if it stopped. + {<<>>, HTTP2Machine2} end, + State = State0#state{http2_machine=HTTP2Machine}, case {Data1, Data2} of - {<<>>, <<>>} -> ok; - _ -> Transport:send(Socket, [Data1, Data2]) - end, - State#state{http2_machine=HTTP2Machine}. + {<<>>, <<>>} -> + State; + _ -> + ok = maybe_socket_error(State, Transport:send(Socket, [Data1, Data2])), + maybe_reset_idle_timeout(State) + end. %% Send the response, trailers or data. @@ -790,18 +928,21 @@ send_response(State0=#state{http2_machine=HTTP2Machine0}, StreamID, StatusCode, = cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, nofin, #{status => cow_http:status_to_integer(StatusCode)}, headers_to_list(Headers)), - maybe_send_data(State0#state{http2_machine=HTTP2Machine}, StreamID, fin, Body, - [cow_http2:headers(StreamID, nofin, HeaderBlock)]) + {_, State} = maybe_send_data(State0#state{http2_machine=HTTP2Machine}, + StreamID, fin, Body, [cow_http2:headers(StreamID, nofin, HeaderBlock)]), + State end. -send_headers(State=#state{socket=Socket, transport=Transport, +send_headers(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, IsFin0, StatusCode, Headers) -> {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, IsFin0, #{status => cow_http:status_to_integer(StatusCode)}, headers_to_list(Headers)), - Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - State#state{http2_machine=HTTP2Machine}. + State = State0#state{http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:headers(StreamID, IsFin, HeaderBlock))), + State. %% The set-cookie header is special; we can only send one cookie per header. headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) -> @@ -818,13 +959,18 @@ maybe_send_data(State0=#state{socket=Socket, transport=Transport, end, case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of {ok, HTTP2Machine} -> + State1 = State0#state{http2_machine=HTTP2Machine}, %% If we have prefix data (like a HEADERS frame) we need to send it %% even if we do not send any DATA frames. - case Prefix of - [] -> ok; - _ -> Transport:send(Socket, Prefix) + WasDataSent = case Prefix of + [] -> + no_data_sent; + _ -> + ok = maybe_socket_error(State1, Transport:send(Socket, Prefix)), + data_sent end, - maybe_send_data_alarm(State0#state{http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID); + State = maybe_send_data_alarm(State1, HTTP2Machine0, StreamID), + {WasDataSent, State}; {send, SendData, HTTP2Machine} -> State = #state{http2_status=Status, streams=Streams} = send_data(State0#state{http2_machine=HTTP2Machine}, SendData, Prefix), @@ -833,7 +979,7 @@ maybe_send_data(State0=#state{socket=Socket, transport=Transport, Status =:= closing, Streams =:= #{} -> terminate(State, {stop, normal, 'The connection is going away.'}); true -> - maybe_send_data_alarm(State, HTTP2Machine0, StreamID) + {data_sent, maybe_send_data_alarm(State, HTTP2Machine0, StreamID)} end end. @@ -842,12 +988,15 @@ send_data(State0=#state{socket=Socket, transport=Transport, opts=Opts}, SendData _ = [case Data of {sendfile, Offset, Bytes, Path} -> %% When sendfile is disabled we explicitly use the fallback. - _ = case maps:get(sendfile, Opts, true) of - true -> Transport:sendfile(Socket, Path, Offset, Bytes); - false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) - end; + {ok, _} = maybe_socket_error(State, + case maps:get(sendfile, Opts, true) of + true -> Transport:sendfile(Socket, Path, Offset, Bytes); + false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) + end + ), + ok; _ -> - Transport:send(Socket, Data) + ok = maybe_socket_error(State, Transport:send(Socket, Data)) end || Data <- Acc], send_data_terminate(State, SendData). @@ -946,22 +1095,26 @@ stream_alarm(State, StreamID, Name, Value) -> %% We may have to cancel streams even if we receive multiple %% GOAWAY frames as the LastStreamID value may be lower than %% the one previously received. +%% +%% We do not reset the idle timeout on send here. We already +%% disabled it if we initiated shutdown; and we already reset +%% it if the client sent a GOAWAY frame. goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0, http2_status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) when Status =:= connected; Status =:= closing_initiated; Status =:= closing -> Streams = goaway_streams(State0, maps:to_list(Streams0), LastStreamID, {stop, {goaway, Reason}, 'The connection is going away.'}, []), - State = State0#state{streams=maps:from_list(Streams)}, + State1 = State0#state{streams=maps:from_list(Streams)}, if Status =:= connected; Status =:= closing_initiated -> {OurLastStreamID, HTTP2Machine} = cow_http2_machine:set_last_streamid(HTTP2Machine0), - Transport:send(Socket, cow_http2:goaway( - OurLastStreamID, no_error, <<>>)), - State#state{http2_status=closing, - http2_machine=HTTP2Machine}; + State = State1#state{http2_status=closing, http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:goaway(OurLastStreamID, no_error, <<>>))), + State; true -> - State + State1 end; %% We terminate the connection immediately if it hasn't fully been initialized. goaway(State, {goaway, _, Reason, _}) -> @@ -984,10 +1137,13 @@ goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) -> %% in-flight stream creation (at least one round-trip time), the server can send %% another GOAWAY frame with an updated last stream identifier. This ensures %% that a connection can be cleanly shut down without losing requests. + -spec initiate_closing(#state{}, _) -> #state{}. + initiate_closing(State=#state{http2_status=connected, socket=Socket, transport=Transport, opts=Opts}, Reason) -> - Transport:send(Socket, cow_http2:goaway(16#7fffffff, no_error, <<>>)), + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:goaway(16#7fffffff, no_error, <<>>))), Timeout = maps:get(goaway_initial_timeout, Opts, 1000), Message = {goaway_initial_timeout, Reason}, set_timeout(State#state{http2_status=closing_initiated}, Timeout, Message); @@ -1000,17 +1156,21 @@ initiate_closing(State, Reason) -> terminate(State, {stop, stop_reason(Reason), 'The connection is going away.'}). %% Switch to 'closing' state and stop accepting new streams. + -spec closing(#state{}, Reason :: term()) -> #state{}. + closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} -> terminate(State, Reason); -closing(State=#state{http2_status=closing_initiated, +closing(State0=#state{http2_status=closing_initiated, http2_machine=HTTP2Machine0, socket=Socket, transport=Transport}, Reason) -> %% Stop accepting new streams. {LastStreamID, HTTP2Machine} = cow_http2_machine:set_last_streamid(HTTP2Machine0), - Transport:send(Socket, cow_http2:goaway(LastStreamID, no_error, <<>>)), - closing(State#state{http2_status=closing, http2_machine=HTTP2Machine}, Reason); + State = State0#state{http2_status=closing, http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:goaway(LastStreamID, no_error, <<>>))), + closing(State, Reason); closing(State=#state{http2_status=closing, opts=Opts}, Reason) -> %% If client sent GOAWAY, we may already be in 'closing' but without the %% goaway complete timeout set. @@ -1021,7 +1181,21 @@ closing(State=#state{http2_status=closing, opts=Opts}, Reason) -> stop_reason({stop, Reason, _}) -> Reason; stop_reason(Reason) -> Reason. --spec terminate(#state{}, _) -> no_return(). +%% Function copied from cowboy_http. +maybe_socket_error(State, {error, closed}) -> + terminate(State, {socket_error, closed, 'The socket has been closed.'}); +maybe_socket_error(State, Reason) -> + maybe_socket_error(State, Reason, 'An error has occurred on the socket.'). + +maybe_socket_error(_, Result = ok, _) -> + Result; +maybe_socket_error(_, Result = {ok, _}, _) -> + Result; +maybe_socket_error(State, {error, Reason}, Human) -> + terminate(State, {socket_error, Reason, Human}). + +-spec terminate(#state{} | undefined, _) -> no_return(). + terminate(undefined, Reason) -> exit({shutdown, Reason}); terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, @@ -1031,7 +1205,8 @@ terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, %% as debug data in the GOAWAY frame here. Perhaps more. if Status =:= connected; Status =:= closing_initiated -> - Transport:send(Socket, cow_http2:goaway( + %% We are terminating so it's OK if we can't send the GOAWAY anymore. + _ = Transport:send(Socket, cow_http2:goaway( cow_http2_machine:get_last_streamid(HTTP2Machine), terminate_reason(Reason), <<>>)); %% We already sent the GOAWAY frame. @@ -1040,10 +1215,11 @@ terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, end, terminate_all_streams(State, maps:to_list(Streams), Reason), cowboy_children:terminate(Children), + %% @todo Don't linger on connection errors. terminate_linger(State), exit({shutdown, Reason}); -terminate(#state{socket=Socket, transport=Transport}, Reason) -> - Transport:close(Socket), +%% We are not fully connected so we can just terminate the connection. +terminate(_State, Reason) -> exit({shutdown, Reason}). terminate_reason({connection_error, Reason, _}) -> Reason; @@ -1077,6 +1253,9 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) -> terminate_linger_before_loop(State, TimerRef, Messages) -> %% We may already be in active mode when we do this %% but it's OK because we are shutting down anyway. + %% + %% We specially handle the socket error to terminate + %% when an error occurs. case setopts_active(State) of ok -> terminate_linger_loop(State, TimerRef, Messages); @@ -1101,13 +1280,18 @@ terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) -> end. %% @todo Don't send an RST_STREAM if one was already sent. +%% +%% When resetting the stream we are technically sending data +%% on the socket. However due to implementation complexities +%% we do not attempt to reset the idle timeout on send. reset_stream(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, Error) -> Reason = case Error of {internal_error, _, _} -> internal_error; {stream_error, Reason0, _} -> Reason0 end, - Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), + ok = maybe_socket_error(State0, Transport:send(Socket, + cow_http2:rst_stream(StreamID, Reason))), State1 = case cow_http2_machine:reset_stream(StreamID, HTTP2Machine0) of {ok, HTTP2Machine} -> terminate_stream(State0#state{http2_machine=HTTP2Machine}, StreamID, Error); @@ -1179,7 +1363,8 @@ terminate_stream(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID) -> State = case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine0) of {ok, fin, _} -> - Transport:send(Socket, cow_http2:rst_stream(StreamID, no_error)), + ok = maybe_socket_error(State0, Transport:send(Socket, + cow_http2:rst_stream(StreamID, no_error))), {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), State0#state{http2_machine=HTTP2Machine}; {error, closed} -> @@ -1211,15 +1396,18 @@ terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) -> %% System callbacks. --spec system_continue(_, _, {#state{}, binary()}) -> ok. +-spec system_continue(_, _, {#state{}, binary()}) -> no_return(). + system_continue(_, _, {State, Buffer}) -> - loop(State, Buffer). + before_loop(State, Buffer). -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). + system_terminate(Reason0, _, _, {State, Buffer}) -> Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'}, - loop(initiate_closing(State, Reason), Buffer). + before_loop(initiate_closing(State, Reason), Buffer). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. + system_code_change(Misc, _, _, _) -> {ok, Misc}. diff --git a/src/cowboy_http3.erl b/src/cowboy_http3.erl new file mode 100644 index 0000000..9aa6be5 --- /dev/null +++ b/src/cowboy_http3.erl @@ -0,0 +1,1253 @@ +%% Copyright (c) Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% A key difference between cowboy_http2 and cowboy_http3 +%% is that HTTP/3 streams are QUIC streams and therefore +%% much of the connection state is handled outside of +%% Cowboy. + +-module(cowboy_http3). + +-export([init/4]). + +%% Temporary callback to do sendfile over QUIC. +-export([send/2]). + +%% @todo Graceful shutdown? Linger? Timeouts? Frame rates? PROXY header? +-type opts() :: #{ + compress_buffering => boolean(), + compress_threshold => non_neg_integer(), + connection_type => worker | supervisor, + enable_connect_protocol => boolean(), + env => cowboy_middleware:env(), + logger => module(), + max_decode_blocked_streams => 0..16#3fffffffffffffff, + max_decode_table_size => 0..16#3fffffffffffffff, + max_encode_blocked_streams => 0..16#3fffffffffffffff, + max_encode_table_size => 0..16#3fffffffffffffff, + max_ignored_frame_size_received => non_neg_integer() | infinity, + metrics_callback => cowboy_metrics_h:metrics_callback(), + metrics_req_filter => fun((cowboy_req:req()) -> map()), + metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()), + middlewares => [module()], + shutdown_timeout => timeout(), + stream_handlers => [module()], + tracer_callback => cowboy_tracer_h:tracer_callback(), + tracer_flags => [atom()], + tracer_match_specs => cowboy_tracer_h:tracer_match_specs(), + %% Open ended because configured stream handlers might add options. + _ => _ +}. +-export_type([opts/0]). + +%% HTTP/3 or WebTransport stream. +%% +%% WebTransport sessions involve one bidirectional CONNECT stream +%% that must stay open (and can be used for signaling using the +%% Capsule Protocol) and an application-defined number of +%% unidirectional and bidirectional streams, as well as datagrams. +%% +%% WebTransport sessions run in the CONNECT request process and +%% all events related to the session is sent there as a message. +%% The pid of the process is kept in the state. +-record(stream, { + id :: cow_http3:stream_id(), + + %% Whether the stream is currently in a special state. + status :: header | {unidi, control | encoder | decoder} + | normal | {data | ignore, non_neg_integer()} | stopping + | {webtransport_session, normal | {ignore, non_neg_integer()}} + | {webtransport_stream, cow_http3:stream_id()}, + + %% Stream buffer. + buffer = <<>> :: binary(), + + %% Stream state. + state = undefined :: undefined | {module(), any()} +}). + +-record(state, { + parent :: pid(), + ref :: ranch:ref(), + conn :: cowboy_quicer:quicer_connection_handle(), + opts = #{} :: opts(), + + %% Remote address and port for the connection. + peer = undefined :: {inet:ip_address(), inet:port_number()}, + + %% Local address and port for the connection. + sock = undefined :: {inet:ip_address(), inet:port_number()}, + + %% Client certificate. + cert :: undefined | binary(), + + %% HTTP/3 state machine. + http3_machine :: cow_http3_machine:http3_machine(), + + %% Specially handled local unidi streams. + local_control_id = undefined :: undefined | cow_http3:stream_id(), + local_encoder_id = undefined :: undefined | cow_http3:stream_id(), + local_decoder_id = undefined :: undefined | cow_http3:stream_id(), + + %% Bidirectional streams used for requests and responses, + %% as well as unidirectional streams initiated by the client. + streams = #{} :: #{cow_http3:stream_id() => #stream{}}, + + %% Lingering streams that were recently reset. We may receive + %% pending data or messages for these streams a short while + %% after they have been reset. + lingering_streams = [] :: [non_neg_integer()], + + %% Streams can spawn zero or more children which are then managed + %% by this module if operating as a supervisor. + children = cowboy_children:init() :: cowboy_children:children() +}). + +-spec init(pid(), ranch:ref(), cowboy_quicer:quicer_connection_handle(), opts()) + -> no_return(). + +init(Parent, Ref, Conn, Opts) -> + {ok, SettingsBin, HTTP3Machine0} = cow_http3_machine:init(server, Opts), + %% Immediately open a control, encoder and decoder stream. + %% @todo An endpoint MAY avoid creating an encoder stream if it will not be used (for example, if its encoder does not wish to use the dynamic table or if the maximum size of the dynamic table permitted by the peer is zero). + %% @todo An endpoint MAY avoid creating a decoder stream if its decoder sets the maximum capacity of the dynamic table to zero. + {ok, ControlID} = maybe_socket_error(undefined, + cowboy_quicer:start_unidi_stream(Conn, [<<0>>, SettingsBin]), + 'A socket error occurred when opening the control stream.'), + {ok, EncoderID} = maybe_socket_error(undefined, + cowboy_quicer:start_unidi_stream(Conn, <<2>>), + 'A socket error occurred when opening the encoder stream.'), + {ok, DecoderID} = maybe_socket_error(undefined, + cowboy_quicer:start_unidi_stream(Conn, <<3>>), + 'A socket error occurred when opening the encoder stream.'), + %% Set the control, encoder and decoder streams in the machine. + HTTP3Machine = cow_http3_machine:init_unidi_local_streams( + ControlID, EncoderID, DecoderID, HTTP3Machine0), + %% Get the peername/sockname/cert. + {ok, Peer} = maybe_socket_error(undefined, cowboy_quicer:peername(Conn), + 'A socket error occurred when retrieving the peer name.'), + {ok, Sock} = maybe_socket_error(undefined, cowboy_quicer:sockname(Conn), + 'A socket error occurred when retrieving the sock name.'), + CertResult = case cowboy_quicer:peercert(Conn) of + {error, no_peercert} -> + {ok, undefined}; + Cert0 -> + Cert0 + end, + {ok, Cert} = maybe_socket_error(undefined, CertResult, + 'A socket error occurred when retrieving the client TLS certificate.'), + %% Quick! Let's go! + loop(#state{parent=Parent, ref=Ref, conn=Conn, + opts=Opts, peer=Peer, sock=Sock, cert=Cert, + http3_machine=HTTP3Machine, local_control_id=ControlID, + local_encoder_id=EncoderID, local_decoder_id=DecoderID}). + +loop(State0=#state{opts=Opts, children=Children}) -> + receive + Msg when element(1, Msg) =:= quic -> + handle_quic_msg(State0, Msg); + %% Timeouts. + {timeout, Ref, {shutdown, Pid}} -> + cowboy_children:shutdown_timeout(Children, Ref, Pid), + loop(State0); + %% Messages pertaining to a stream. + {{Pid, StreamID}, Msg} when Pid =:= self() -> + loop(info(State0, StreamID, Msg)); + %% WebTransport commands. + {'$webtransport_commands', SessionID, Commands} -> + loop(webtransport_commands(State0, SessionID, Commands)); + %% Exit signal from children. + Msg = {'EXIT', Pid, _} -> + loop(down(State0, Pid, Msg)); + Msg -> + cowboy:log(warning, "Received stray message ~p.", [Msg], Opts), + loop(State0) + end. + +handle_quic_msg(State0=#state{opts=Opts}, Msg) -> + case cowboy_quicer:handle(Msg) of + {data, StreamID, IsFin, Data} -> + parse(State0, StreamID, Data, IsFin); + {datagram, Data} -> + parse_datagram(State0, Data); + {stream_started, StreamID, StreamType} -> + State = stream_new_remote(State0, StreamID, StreamType), + loop(State); + {stream_closed, StreamID, ErrorCode} -> + State = stream_closed(State0, StreamID, ErrorCode), + loop(State); + {peer_send_shutdown, StreamID} -> + State = stream_peer_send_shutdown(State0, StreamID), + loop(State); + closed -> + %% @todo Different error reason if graceful? + Reason = {socket_error, closed, 'The socket has been closed.'}, + terminate(State0, Reason); + ok -> + loop(State0); + unknown -> + cowboy:log(warning, "Received unknown QUIC message ~p.", [Msg], Opts), + loop(State0); + {socket_error, Reason} -> + terminate(State0, {socket_error, Reason, + 'An error has occurred on the socket.'}) + end. + +parse(State=#state{opts=Opts}, StreamID, Data, IsFin) -> + case stream_get(State, StreamID) of + Stream=#stream{buffer= <<>>} -> + parse1(State, Stream, Data, IsFin); + Stream=#stream{buffer=Buffer} -> + Stream1 = Stream#stream{buffer= <<>>}, + parse1(stream_store(State, Stream1), + Stream1, <<Buffer/binary, Data/binary>>, IsFin); + %% Pending data for a stream that has been reset. Ignore. + error -> + case is_lingering_stream(State, StreamID) of + true -> + ok; + false -> + %% We avoid logging the data as it could be quite large. + cowboy:log(warning, "Received data for unknown stream ~p.", + [StreamID], Opts) + end, + loop(State) + end. + +parse1(State, Stream=#stream{status=header}, Data, IsFin) -> + parse_unidirectional_stream_header(State, Stream, Data, IsFin); +parse1(State=#state{http3_machine=HTTP3Machine0}, + #stream{status={unidi, Type}, id=StreamID}, Data, IsFin) + when Type =:= encoder; Type =:= decoder -> + case cow_http3_machine:unidi_data(Data, IsFin, StreamID, HTTP3Machine0) of + {ok, Instrs, HTTP3Machine} -> + loop(send_instructions(State#state{http3_machine=HTTP3Machine}, Instrs)); + {error, Error={connection_error, _, _}, HTTP3Machine} -> + terminate(State#state{http3_machine=HTTP3Machine}, Error) + end; +%% @todo Handle when IsFin = fin which must terminate the WT session. +parse1(State=#state{conn=Conn}, Stream=#stream{id=SessionID, status= + {webtransport_session, normal}}, Data, IsFin) -> + case cow_capsule:parse(Data) of + {ok, wt_drain_session, Rest} -> + webtransport_event(State, SessionID, close_initiated), + parse1(State, Stream, Rest, IsFin); + {ok, {wt_close_session, AppCode, AppMsg}, Rest} -> + %% This event will be handled specially and lead + %% to the termination of the session process. + webtransport_event(State, SessionID, {closed, AppCode, AppMsg}), + %% Shutdown the CONNECT stream immediately. + cowboy_quicer:shutdown_stream(Conn, SessionID), + %% @todo Will we receive a {stream_closed,...} after that? + %% If any data is received past that point this is an error. + %% @todo Don't crash, error out properly. + <<>> = Rest, + loop(webtransport_terminate_session(State, Stream)); + more -> + loop(stream_store(State, Stream#stream{buffer=Data})); + %% Ignore unhandled/unknown capsules. + %% @todo Do this when cow_capsule includes some. +% {ok, _, Rest} -> +% parse1(State, Stream, Rest, IsFin); +% {ok, Rest} -> +% parse1(State, Stream, Rest, IsFin); + %% @todo Make the max length configurable? + {skip, Len} when Len =< 8192 -> + loop(stream_store(State, Stream#stream{ + status={webtransport_session, {ignore, Len}}})); + {skip, Len} -> + %% @todo What should be done on capsule error? + error({todo, capsule_too_long, Len}); + error -> + %% @todo What should be done on capsule error? + error({todo, capsule_error, Data}) + end; +parse1(State, Stream=#stream{status= + {webtransport_session, {ignore, Len}}}, Data, IsFin) -> + case Data of + <<_:Len/unit:8, Rest/bits>> -> + parse1(State, Stream#stream{status={webtransport_session, normal}}, Rest, IsFin); + _ -> + loop(stream_store(State, Stream#stream{ + status={webtransport_session, {ignore, Len - byte_size(Data)}}})) + end; +parse1(State, #stream{id=StreamID, status={webtransport_stream, SessionID}}, Data, IsFin) -> + webtransport_event(State, SessionID, {stream_data, StreamID, IsFin, Data}), + %% No need to store the stream again, WT streams don't get changed here. + loop(State); +parse1(State, Stream=#stream{status={data, Len}, id=StreamID}, Data, IsFin) -> + DataLen = byte_size(Data), + if + DataLen < Len -> + %% We don't have the full frame but this is the end of the + %% data we have. So FrameIsFin is equivalent to IsFin here. + loop(frame(State, Stream#stream{status={data, Len - DataLen}}, {data, Data}, IsFin)); + true -> + <<Data1:Len/binary, Rest/bits>> = Data, + FrameIsFin = is_fin(IsFin, Rest), + parse(frame(State, Stream#stream{status=normal}, {data, Data1}, FrameIsFin), + StreamID, Rest, IsFin) + end; +parse1(State, Stream=#stream{status={ignore, Len}, id=StreamID}, Data, IsFin) -> + DataLen = byte_size(Data), + if + DataLen < Len -> + loop(stream_store(State, Stream#stream{status={ignore, Len - DataLen}})); + true -> + <<_:Len/binary, Rest/bits>> = Data, + parse(stream_store(State, Stream#stream{status=normal}), + StreamID, Rest, IsFin) + end; +%% @todo Clause that discards receiving data for stopping streams. +%% We may receive a few more frames after we abort receiving. +parse1(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Data, IsFin) -> + case cow_http3:parse(Data) of + {ok, Frame, Rest} -> + FrameIsFin = is_fin(IsFin, Rest), + parse(frame(State, Stream, Frame, FrameIsFin), StreamID, Rest, IsFin); + %% The WebTransport stream header is not a real frame. + {webtransport_stream_header, SessionID, Rest} -> + become_webtransport_stream(State, Stream, bidi, SessionID, Rest, IsFin); + {more, Frame = {data, _}, Len} -> + %% We're at the end of the data so FrameIsFin is equivalent to IsFin. + case IsFin of + nofin -> + %% The stream will be stored at the end of processing commands. + loop(frame(State, Stream#stream{status={data, Len}}, Frame, nofin)); + fin -> + terminate(State, {connection_error, h3_frame_error, + 'Last frame on stream was truncated. (RFC9114 7.1)'}) + end; + {more, ignore, Len} -> + %% @todo This setting should be tested. + %% + %% While the default value doesn't warrant doing a streaming ignore + %% (and could work just fine with the 'more' clause), this value + %% is configurable and users may want to set it large. + MaxIgnoredLen = maps:get(max_ignored_frame_size_received, Opts, 16384), + %% We're at the end of the data so FrameIsFin is equivalent to IsFin. + case IsFin of + nofin when Len < MaxIgnoredLen -> + %% We are not processing commands so we must store the stream. + %% We also call ignored_frame here; we will not need to call + %% it again when ignoring the rest of the data. + Stream1 = Stream#stream{status={ignore, Len}}, + State1 = ignored_frame(State, Stream1), + loop(stream_store(State1, Stream1)); + nofin -> + terminate(State, {connection_error, h3_excessive_load, + 'Ignored frame larger than limit. (RFC9114 10.5)'}); + fin -> + terminate(State, {connection_error, h3_frame_error, + 'Last frame on stream was truncated. (RFC9114 7.1)'}) + end; + {ignore, Rest} -> + parse(ignored_frame(State, Stream), StreamID, Rest, IsFin); + Error = {connection_error, _, _} -> + terminate(State, Error); + more when Data =:= <<>> -> + %% The buffer was already reset to <<>>. + loop(stream_store(State, Stream)); + more -> + %% We're at the end of the data so FrameIsFin is equivalent to IsFin. + case IsFin of + nofin -> + loop(stream_store(State, Stream#stream{buffer=Data})); + fin -> + terminate(State, {connection_error, h3_frame_error, + 'Last frame on stream was truncated. (RFC9114 7.1)'}) + end + end. + +%% We may receive multiple frames in a single QUIC packet. +%% The FIN flag applies to the QUIC packet, not to the frame. +%% We must therefore only consider the frame to have a FIN +%% flag if there's no data remaining to be read. +is_fin(fin, <<>>) -> fin; +is_fin(_, _) -> nofin. + +parse_unidirectional_stream_header(State0=#state{http3_machine=HTTP3Machine0}, + Stream0=#stream{id=StreamID}, Data, IsFin) -> + case cow_http3:parse_unidi_stream_header(Data) of + {ok, Type, Rest} when Type =:= control; Type =:= encoder; Type =:= decoder -> + case cow_http3_machine:set_unidi_remote_stream_type( + StreamID, Type, HTTP3Machine0) of + {ok, HTTP3Machine} -> + State = State0#state{http3_machine=HTTP3Machine}, + Stream = Stream0#stream{status={unidi, Type}}, + parse(stream_store(State, Stream), StreamID, Rest, IsFin); + {error, Error={connection_error, _, _}, HTTP3Machine} -> + terminate(State0#state{http3_machine=HTTP3Machine}, Error) + end; + %% @todo Perhaps do this in cow_http3_machine directly. + {ok, push, _} -> + terminate(State0, {connection_error, h3_stream_creation_error, + 'Only servers can push. (RFC9114 6.2.2)'}); + {ok, {webtransport, SessionID}, Rest} -> + become_webtransport_stream(State0, Stream0, unidi, SessionID, Rest, IsFin); + %% Unknown stream types must be ignored. We choose to abort the + %% stream instead of reading and discarding the incoming data. + {undefined, _} -> + loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error)); + %% Very unlikely to happen but WebTransport headers may be fragmented + %% as they are more than one byte. The fin flag in this case is an error, + %% but because it happens in WebTransport application data (the Session ID) + %% we only reset the impacted stream and not the entire connection. + more when IsFin =:= fin -> + loop(stream_abort_receive(State0, Stream0, h3_stream_creation_error)); + more -> + loop(stream_store(State0, Stream0#stream{buffer=Data})) + end. + +frame(State=#state{http3_machine=HTTP3Machine0}, + Stream=#stream{id=StreamID}, Frame, IsFin) -> + case cow_http3_machine:frame(Frame, IsFin, StreamID, HTTP3Machine0) of + {ok, HTTP3Machine} -> + State#state{http3_machine=HTTP3Machine}; + {ok, {data, Data}, HTTP3Machine} -> + data_frame(State#state{http3_machine=HTTP3Machine}, Stream, IsFin, Data); + {ok, {headers, Headers, PseudoHeaders, BodyLen}, Instrs, HTTP3Machine} -> + headers_frame(send_instructions(State#state{http3_machine=HTTP3Machine}, Instrs), + Stream, IsFin, Headers, PseudoHeaders, BodyLen); + {ok, {trailers, _Trailers}, Instrs, HTTP3Machine} -> + %% @todo Propagate trailers. + send_instructions(State#state{http3_machine=HTTP3Machine}, Instrs); + {ok, GoAway={goaway, _}, HTTP3Machine} -> + goaway(State#state{http3_machine=HTTP3Machine}, GoAway); + {error, Error={stream_error, _Reason, _Human}, Instrs, HTTP3Machine} -> + State1 = send_instructions(State#state{http3_machine=HTTP3Machine}, Instrs), + reset_stream(State1, Stream, Error); + {error, Error={connection_error, _, _}, HTTP3Machine} -> + terminate(State#state{http3_machine=HTTP3Machine}, Error) + end. + +data_frame(State=#state{opts=Opts}, + Stream=#stream{id=StreamID, state=StreamState0}, IsFin, Data) -> + try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of + {Commands, StreamState} -> + commands(State, Stream#stream{state=StreamState}, Commands) + catch Class:Exception:Stacktrace -> + cowboy:log(cowboy_stream:make_error_log(data, + [StreamID, IsFin, Data, StreamState0], + Class, Exception, Stacktrace), Opts), + reset_stream(State, Stream, {internal_error, {Class, Exception}, + 'Unhandled exception in cowboy_stream:data/4.'}) + end. + +headers_frame(State, Stream, IsFin, Headers, + PseudoHeaders=#{method := <<"CONNECT">>}, _) + when map_size(PseudoHeaders) =:= 2 -> + early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501, + 'The CONNECT method is currently not implemented. (RFC7231 4.3.6)'); +headers_frame(State, Stream, IsFin, Headers, + PseudoHeaders=#{method := <<"TRACE">>}, _) -> + early_error(State, Stream, IsFin, Headers, PseudoHeaders, 501, + 'The TRACE method is currently not implemented. (RFC9114 4.4, RFC7231 4.3.8)'); +headers_frame(State, Stream, IsFin, Headers, PseudoHeaders=#{authority := Authority}, BodyLen) -> + headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority); +headers_frame(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen) -> + case lists:keyfind(<<"host">>, 1, Headers) of + {_, Authority} -> + headers_frame_parse_host(State, Stream, IsFin, Headers, PseudoHeaders, BodyLen, Authority); + _ -> + reset_stream(State, Stream, {stream_error, h3_message_error, + 'Requests translated from HTTP/1.1 must include a host header. (RFC7540 8.1.2.3, RFC7230 5.4)'}) + end. + +headers_frame_parse_host(State=#state{ref=Ref, peer=Peer, sock=Sock, cert=Cert}, + Stream=#stream{id=StreamID}, IsFin, Headers, + PseudoHeaders=#{method := Method, scheme := Scheme, path := PathWithQs}, + BodyLen, Authority) -> + try cow_http_hd:parse_host(Authority) of + {Host, Port0} -> + Port = ensure_port(Scheme, Port0), + try cow_http:parse_fullpath(PathWithQs) of + {<<>>, _} -> + reset_stream(State, Stream, {stream_error, h3_message_error, + 'The path component must not be empty. (RFC7540 8.1.2.3)'}); + {Path, Qs} -> + Req0 = #{ + ref => Ref, + pid => self(), + streamid => StreamID, + peer => Peer, + sock => Sock, + cert => Cert, + method => Method, + scheme => Scheme, + host => Host, + port => Port, + path => Path, + qs => Qs, + version => 'HTTP/3', + headers => headers_to_map(Headers, #{}), + has_body => IsFin =:= nofin, + body_length => BodyLen + }, + %% We add the protocol information for extended CONNECTs. + Req = case PseudoHeaders of + #{protocol := Protocol} -> Req0#{protocol => Protocol}; + _ -> Req0 + end, + headers_frame(State, Stream, Req) + catch _:_ -> + reset_stream(State, Stream, {stream_error, h3_message_error, + 'The :path pseudo-header is invalid. (RFC7540 8.1.2.3)'}) + end + catch _:_ -> + reset_stream(State, Stream, {stream_error, h3_message_error, + 'The :authority pseudo-header is invalid. (RFC7540 8.1.2.3)'}) + end. + +%% @todo Copied from cowboy_http2. +%% @todo How to handle "http"? +ensure_port(<<"http">>, undefined) -> 80; +ensure_port(<<"https">>, undefined) -> 443; +ensure_port(_, Port) -> Port. + +%% @todo Copied from cowboy_http2. +%% This function is necessary to properly handle duplicate headers +%% and the special-case cookie header. +headers_to_map([], Acc) -> + Acc; +headers_to_map([{Name, Value}|Tail], Acc0) -> + Acc = case Acc0 of + %% The cookie header does not use proper HTTP header lists. + #{Name := Value0} when Name =:= <<"cookie">> -> + Acc0#{Name => << Value0/binary, "; ", Value/binary >>}; + #{Name := Value0} -> + Acc0#{Name => << Value0/binary, ", ", Value/binary >>}; + _ -> + Acc0#{Name => Value} + end, + headers_to_map(Tail, Acc). + +%% @todo WebTransport CONNECT requests must have extra checks on settings. +%% @todo We may also need to defer them if we didn't get settings. +headers_frame(State=#state{opts=Opts}, Stream=#stream{id=StreamID}, Req) -> + try cowboy_stream:init(StreamID, Req, Opts) of + {Commands, StreamState} -> + commands(State, Stream#stream{state=StreamState}, Commands) + catch Class:Exception:Stacktrace -> + cowboy:log(cowboy_stream:make_error_log(init, + [StreamID, Req, Opts], + Class, Exception, Stacktrace), Opts), + reset_stream(State, Stream, {internal_error, {Class, Exception}, + 'Unhandled exception in cowboy_stream:init/3.'}) + end. + +early_error(State0=#state{ref=Ref, opts=Opts, peer=Peer}, + Stream=#stream{id=StreamID}, _IsFin, Headers, #{method := Method}, + StatusCode0, HumanReadable) -> + %% We automatically terminate the stream but it is not an error + %% per se (at least not in the first implementation). + Reason = {stream_error, h3_no_error, HumanReadable}, + %% The partial Req is minimal for now. We only have one case + %% where it can be called (when a method is completely disabled). + PartialReq = #{ + ref => Ref, + peer => Peer, + method => Method, + headers => headers_to_map(Headers, #{}) + }, + Resp = {response, StatusCode0, RespHeaders0=#{<<"content-length">> => <<"0">>}, <<>>}, + try cowboy_stream:early_error(StreamID, Reason, PartialReq, Resp, Opts) of + {response, StatusCode, RespHeaders, RespBody} -> + send_response(State0, Stream, StatusCode, RespHeaders, RespBody) + catch Class:Exception:Stacktrace -> + cowboy:log(cowboy_stream:make_error_log(early_error, + [StreamID, Reason, PartialReq, Resp, Opts], + Class, Exception, Stacktrace), Opts), + %% We still need to send an error response, so send what we initially + %% wanted to send. It's better than nothing. + send_headers(State0, Stream, fin, StatusCode0, RespHeaders0) + end. + +%% Datagrams. + +parse_datagram(State, Data0) -> + {SessionID, Data} = cow_http3:parse_datagram(Data0), + case stream_get(State, SessionID) of + #stream{status={webtransport_session, _}} -> + webtransport_event(State, SessionID, {datagram, Data}), + loop(State); + _ -> + error(todo) %% @todo Might be a future WT session or an error. + end. + +%% Erlang messages. + +down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) -> + State = case cowboy_children:down(Children0, Pid) of + %% The stream was terminated already. + {ok, undefined, Children} -> + State0#state{children=Children}; + %% The stream is still running. + {ok, StreamID, Children} -> + info(State0#state{children=Children}, StreamID, Msg); + %% The process was unknown. + error -> + cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n", + [Msg, Pid], Opts), + State0 + end, + if +%% @todo +% State#state.http2_status =:= closing, State#state.streams =:= #{} -> +% terminate(State, {stop, normal, 'The connection is going away.'}); + true -> + State + end. + +info(State=#state{opts=Opts, http3_machine=_HTTP3Machine}, StreamID, Msg) -> + case stream_get(State, StreamID) of + Stream=#stream{state=StreamState0} -> + try cowboy_stream:info(StreamID, Msg, StreamState0) of + {Commands, StreamState} -> + commands(State, Stream#stream{state=StreamState}, Commands) + catch Class:Exception:Stacktrace -> + cowboy:log(cowboy_stream:make_error_log(info, + [StreamID, Msg, StreamState0], + Class, Exception, Stacktrace), Opts), + reset_stream(State, Stream, {internal_error, {Class, Exception}, + 'Unhandled exception in cowboy_stream:info/3.'}) + end; + error -> + case is_lingering_stream(State, StreamID) of + true -> + ok; + false -> + cowboy:log(warning, "Received message ~p for unknown stream ~p.", + [Msg, StreamID], Opts) + end, + State + end. + +%% Stream handler commands. + +commands(State, Stream, []) -> + stream_store(State, Stream); +%% Error responses are sent only if a response wasn't sent already. +commands(State=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID}, + [{error_response, StatusCode, Headers, Body}|Tail]) -> + case cow_http3_machine:get_bidi_stream_local_state(StreamID, HTTP3Machine) of + {ok, idle} -> + commands(State, Stream, [{response, StatusCode, Headers, Body}|Tail]); + _ -> + commands(State, Stream, Tail) + end; +%% Send an informational response. +commands(State0, Stream, [{inform, StatusCode, Headers}|Tail]) -> + State = send_headers(State0, Stream, idle, StatusCode, Headers), + commands(State, Stream, Tail); +%% Send response headers. +commands(State0, Stream, [{response, StatusCode, Headers, Body}|Tail]) -> + State = send_response(State0, Stream, StatusCode, Headers, Body), + commands(State, Stream, Tail); +%% Send response headers. +commands(State0, Stream, [{headers, StatusCode, Headers}|Tail]) -> + State = send_headers(State0, Stream, nofin, StatusCode, Headers), + commands(State, Stream, Tail); +%%% Send a response body chunk. +commands(State0=#state{conn=Conn}, Stream=#stream{id=StreamID}, [{data, IsFin, Data}|Tail]) -> + _ = case Data of + {sendfile, Offset, Bytes, Path} -> + %% Temporary solution to do sendfile over QUIC. + {ok, _} = ranch_transport:sendfile(?MODULE, {Conn, StreamID}, + Path, Offset, Bytes, []), + ok = maybe_socket_error(State0, + cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), IsFin)); + _ -> + ok = maybe_socket_error(State0, + cowboy_quicer:send(Conn, StreamID, cow_http3:data(Data), IsFin)) + end, + State = maybe_send_is_fin(State0, Stream, IsFin), + commands(State, Stream, Tail); +%%% Send trailers. +commands(State0=#state{conn=Conn, http3_machine=HTTP3Machine0}, + Stream=#stream{id=StreamID}, [{trailers, Trailers}|Tail]) -> + State = case cow_http3_machine:prepare_trailers( + StreamID, HTTP3Machine0, maps:to_list(Trailers)) of + {trailers, HeaderBlock, Instrs, HTTP3Machine} -> + State1 = send_instructions(State0#state{http3_machine=HTTP3Machine}, Instrs), + ok = maybe_socket_error(State1, + cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock), fin)), + State1; + {no_trailers, HTTP3Machine} -> + ok = maybe_socket_error(State0, + cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), fin)), + State0#state{http3_machine=HTTP3Machine} + end, + commands(State, Stream, Tail); +%% Send a push promise. +%% +%% @todo Responses sent as a result of a push_promise request +%% must not send push_promise frames themselves. +%% +%% @todo We should not send push_promise frames when we are +%% in the closing http2_status. +%commands(State0=#state{socket=Socket, transport=Transport, http3_machine=HTTP3Machine0}, +% Stream, [{push, Method, Scheme, Host, Port, Path, Qs, Headers0}|Tail]) -> +% Authority = case {Scheme, Port} of +% {<<"http">>, 80} -> Host; +% {<<"https">>, 443} -> Host; +% _ -> iolist_to_binary([Host, $:, integer_to_binary(Port)]) +% end, +% PathWithQs = iolist_to_binary(case Qs of +% <<>> -> Path; +% _ -> [Path, $?, Qs] +% end), +% PseudoHeaders = #{ +% method => Method, +% scheme => Scheme, +% authority => Authority, +% path => PathWithQs +% }, +% %% We need to make sure the header value is binary before we can +% %% create the Req object, as it expects them to be flat. +% Headers = maps:to_list(maps:map(fun(_, V) -> iolist_to_binary(V) end, Headers0)), +% %% @todo +% State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP3Machine0, +% PseudoHeaders, Headers) of +% {ok, PromisedStreamID, HeaderBlock, HTTP3Machine} -> +% Transport:send(Socket, cow_http2:push_promise( +% StreamID, PromisedStreamID, HeaderBlock)), +% headers_frame(State0#state{http3_machine=HTTP2Machine}, +% PromisedStreamID, fin, Headers, PseudoHeaders, 0); +% {error, no_push} -> +% State0 +% end, +% commands(State, Stream, Tail); +%%% Read the request body. +%commands(State0=#state{flow=Flow, streams=Streams}, Stream, [{flow, Size}|Tail]) -> +commands(State, Stream, [{flow, _Size}|Tail]) -> + %% @todo We should tell the QUIC stream to increase its window size. +% #{StreamID := Stream=#stream{flow=StreamFlow}} = Streams, +% State = update_window(State0#state{flow=Flow + Size, +% streams=Streams#{StreamID => Stream#stream{flow=StreamFlow + Size}}}, +% StreamID), + commands(State, Stream, Tail); +%% Supervise a child process. +commands(State=#state{children=Children}, Stream=#stream{id=StreamID}, + [{spawn, Pid, Shutdown}|Tail]) -> + commands(State#state{children=cowboy_children:up(Children, Pid, StreamID, Shutdown)}, + Stream, Tail); +%% Error handling. +commands(State, Stream, [Error = {internal_error, _, _}|_Tail]) -> + %% @todo Do we want to run the commands after an internal_error? + %% @todo Do we even allow commands after? + %% @todo Only reset when the stream still exists. + reset_stream(State, Stream, Error); +%% Use a different protocol within the stream (CONNECT :protocol). +%% @todo Make sure we error out when the feature is disabled. +commands(State0, Stream0=#stream{id=StreamID}, + [{switch_protocol, Headers, cowboy_webtransport, WTState=#{}}|Tail]) -> + State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), + #state{http3_machine=HTTP3Machine0} = State, + Stream1 = #stream{state=StreamState} = stream_get(State, StreamID), + %% The stream becomes a WT session at that point. It is the + %% parent stream of all streams in this WT session. The + %% cowboy_stream state is kept because it will be needed + %% to terminate the stream properly. + HTTP3Machine = cow_http3_machine:become_webtransport_session(StreamID, HTTP3Machine0), + Stream = Stream1#stream{ + status={webtransport_session, normal}, + state={cowboy_webtransport, WTState#{stream_state => StreamState}} + }, + %% @todo We must propagate the buffer to capsule handling if any. + commands(State#state{http3_machine=HTTP3Machine}, Stream, Tail); +commands(State0, Stream0=#stream{id=StreamID}, + [{switch_protocol, Headers, _Mod, _ModState}|Tail]) -> + State = info(stream_store(State0, Stream0), StreamID, {headers, 200, Headers}), + Stream = stream_get(State, StreamID), + commands(State, Stream, Tail); +%% Set options dynamically. +commands(State, Stream, [{set_options, _Opts}|Tail]) -> + commands(State, Stream, Tail); +commands(State, Stream, [stop|_Tail]) -> + %% @todo Do we want to run the commands after a stop? + %% @todo Do we even allow commands after? + stop_stream(State, Stream); +%% Log event. +commands(State=#state{opts=Opts}, Stream, [Log={log, _, _, _}|Tail]) -> + cowboy:log(Log, Opts), + commands(State, Stream, Tail). + +send_response(State0=#state{conn=Conn, http3_machine=HTTP3Machine0}, + Stream=#stream{id=StreamID}, StatusCode, Headers, Body) -> + Size = case Body of + {sendfile, _, Bytes0, _} -> Bytes0; + _ -> iolist_size(Body) + end, + case Size of + 0 -> + State = send_headers(State0, Stream, fin, StatusCode, Headers), + maybe_send_is_fin(State, Stream, fin); + _ -> + %% @todo Add a test for HEAD to make sure we don't send the body when + %% returning {response...} from a stream handler (or {headers...} then {data...}). + {ok, _IsFin, HeaderBlock, Instrs, HTTP3Machine} + = cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, nofin, + #{status => cow_http:status_to_integer(StatusCode)}, + headers_to_list(Headers)), + State = send_instructions(State0#state{http3_machine=HTTP3Machine}, Instrs), + %% @todo It might be better to do async sends. + _ = case Body of + {sendfile, Offset, Bytes, Path} -> + ok = maybe_socket_error(State, + cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock))), + %% Temporary solution to do sendfile over QUIC. + {ok, _} = maybe_socket_error(State, + ranch_transport:sendfile(?MODULE, {Conn, StreamID}, + Path, Offset, Bytes, [])), + ok = maybe_socket_error(State, + cowboy_quicer:send(Conn, StreamID, cow_http3:data(<<>>), fin)); + _ -> + ok = maybe_socket_error(State, + cowboy_quicer:send(Conn, StreamID, [ + cow_http3:headers(HeaderBlock), + cow_http3:data(Body) + ], fin)) + end, + maybe_send_is_fin(State, Stream, fin) + end. + +maybe_send_is_fin(State=#state{http3_machine=HTTP3Machine0}, + Stream=#stream{id=StreamID}, fin) -> + HTTP3Machine = cow_http3_machine:close_bidi_stream_for_sending(StreamID, HTTP3Machine0), + maybe_terminate_stream(State#state{http3_machine=HTTP3Machine}, Stream); +maybe_send_is_fin(State, _, _) -> + State. + +%% Temporary callback to do sendfile over QUIC. +-spec send({cowboy_quicer:quicer_connection_handle(), cow_http3:stream_id()}, + iodata()) -> ok | {error, any()}. + +send({Conn, StreamID}, IoData) -> + cowboy_quicer:send(Conn, StreamID, cow_http3:data(IoData)). + +send_headers(State0=#state{conn=Conn, http3_machine=HTTP3Machine0}, + #stream{id=StreamID}, IsFin0, StatusCode, Headers) -> + {ok, IsFin, HeaderBlock, Instrs, HTTP3Machine} + = cow_http3_machine:prepare_headers(StreamID, HTTP3Machine0, IsFin0, + #{status => cow_http:status_to_integer(StatusCode)}, + headers_to_list(Headers)), + State = send_instructions(State0#state{http3_machine=HTTP3Machine}, Instrs), + ok = maybe_socket_error(State, + cowboy_quicer:send(Conn, StreamID, cow_http3:headers(HeaderBlock), IsFin)), + State. + +%% The set-cookie header is special; we can only send one cookie per header. +headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) -> + Headers = maps:to_list(maps:remove(<<"set-cookie">>, Headers0)), + Headers ++ [{<<"set-cookie">>, Value} || Value <- SetCookies]; +headers_to_list(Headers) -> + maps:to_list(Headers). + +%% @todo We would open unidi streams here if we only open on-demand. +%% No instructions. +send_instructions(State, undefined) -> + State; +%% Decoder instructions. +send_instructions(State=#state{conn=Conn, local_decoder_id=DecoderID}, + {decoder_instructions, DecData}) -> + ok = maybe_socket_error(State, + cowboy_quicer:send(Conn, DecoderID, DecData)), + State; +%% Encoder instructions. +send_instructions(State=#state{conn=Conn, local_encoder_id=EncoderID}, + {encoder_instructions, EncData}) -> + ok = maybe_socket_error(State, + cowboy_quicer:send(Conn, EncoderID, EncData)), + State. + +%% We mark the stream as being a WebTransport stream +%% and then continue parsing the data as a WebTransport +%% stream. This function is common for incoming unidi +%% and bidi streams. +become_webtransport_stream(State0=#state{http3_machine=HTTP3Machine0}, + Stream0=#stream{id=StreamID}, StreamType, SessionID, Rest, IsFin) -> + case cow_http3_machine:become_webtransport_stream(StreamID, SessionID, HTTP3Machine0) of + {ok, HTTP3Machine} -> + State = State0#state{http3_machine=HTTP3Machine}, + Stream = Stream0#stream{status={webtransport_stream, SessionID}}, + webtransport_event(State, SessionID, {stream_open, StreamID, StreamType}), + %% We don't need to parse the remaining data if there isn't any. + case {Rest, IsFin} of + {<<>>, nofin} -> loop(stream_store(State, Stream)); + _ -> parse(stream_store(State, Stream), StreamID, Rest, IsFin) + end + %% @todo Error conditions. + end. + +webtransport_event(State, SessionID, Event) -> + #stream{ + status={webtransport_session, _}, + state={cowboy_webtransport, #{session_pid := SessionPid}} + } = stream_get(State, SessionID), + SessionPid ! {'$webtransport_event', SessionID, Event}, + ok. + +webtransport_commands(State, SessionID, Commands) -> + case stream_get(State, SessionID) of + Session = #stream{status={webtransport_session, _}} -> + wt_commands(State, Session, Commands); + %% The stream has been terminated, ignore pending commands. + error -> + State + end. + +wt_commands(State, _, []) -> + State; +wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID}, + [{open_stream, OpenStreamRef, StreamType, InitialData}|Tail]) -> + %% Because opening the stream involves sending a short header + %% we necessarily write data. The InitialData variable allows + %% providing additional data to be sent in the same packet. + StartF = case StreamType of + bidi -> start_bidi_stream; + unidi -> start_unidi_stream + end, + Header = cow_http3:webtransport_stream_header(SessionID, StreamType), + case cowboy_quicer:StartF(Conn, [Header, InitialData]) of + {ok, StreamID} -> + %% @todo Pass Session directly? + webtransport_event(State0, SessionID, + {opened_stream_id, OpenStreamRef, StreamID}), + State = stream_new_local(State0, StreamID, StreamType, + {webtransport_stream, SessionID}), + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State, Session, [{close_stream, StreamID, Code}|Tail]) -> + %% @todo Check that StreamID belongs to Session. + error({todo, State, Session, [{close_stream, StreamID, Code}|Tail]}); +wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, + [{send, datagram, Data}|Tail]) -> + case cowboy_quicer:send_datagram(Conn, cow_http3:datagram(SessionID, Data)) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, Data}|Tail]) -> + %% @todo Check that StreamID belongs to Session. + case cowboy_quicer:send(Conn, StreamID, Data, nofin) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State=#state{conn=Conn}, Session, [{send, StreamID, IsFin, Data}|Tail]) -> + %% @todo Check that StreamID belongs to Session. + case cowboy_quicer:send(Conn, StreamID, Data, IsFin) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State=#state{conn=Conn}, Session=#stream{id=SessionID}, [initiate_close|Tail]) -> + %% We must send a WT_DRAIN_SESSION capsule on the CONNECT stream. + Capsule = cow_capsule:wt_drain_session(), + case cowboy_quicer:send(Conn, SessionID, Capsule, nofin) of + ok -> + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end; +wt_commands(State0=#state{conn=Conn}, Session=#stream{id=SessionID}, [Cmd|Tail]) + when Cmd =:= close; element(1, Cmd) =:= close -> + %% We must send a WT_CLOSE_SESSION capsule on the CONNECT stream. + {AppCode, AppMsg} = case Cmd of + close -> {0, <<>>}; + {close, AppCode0} -> {AppCode0, <<>>}; + {close, AppCode0, AppMsg0} -> {AppCode0, AppMsg0} + end, + Capsule = cow_capsule:wt_close_session(AppCode, AppMsg), + case cowboy_quicer:send(Conn, SessionID, Capsule, fin) of + ok -> + State = webtransport_terminate_session(State0, Session), + %% @todo Because the handler is in a separate process + %% we must wait for it to stop and eventually + %% kill the process if it takes too long. + %% @todo We may need to fully close the CONNECT stream (if remote doesn't reset it). + wt_commands(State, Session, Tail) + %% @todo Handle errors. + end. + +webtransport_terminate_session(State=#state{conn=Conn, http3_machine=HTTP3Machine0, + streams=Streams0, lingering_streams=Lingering0}, #stream{id=SessionID}) -> + %% Reset/abort the WT streams. + Streams = maps:filtermap(fun + (_, #stream{id=StreamID, status={webtransport_session, _}}) + when StreamID =:= SessionID -> + %% We remove the session stream but do the shutdown outside this function. + false; + (StreamID, #stream{status={webtransport_stream, StreamSessionID}}) + when StreamSessionID =:= SessionID -> + cowboy_quicer:shutdown_stream(Conn, StreamID, + both, cow_http3:error_to_code(wt_session_gone)), + false; + (_, _) -> + true + end, Streams0), + %% Keep the streams in lingering state. + %% We only keep up to 100 streams in this state. @todo Make it configurable? + Terminated = maps:keys(Streams0) -- maps:keys(Streams), + Lingering = lists:sublist(Terminated ++ Lingering0, 100), + %% Update the HTTP3 state machine. + HTTP3Machine = cow_http3_machine:close_webtransport_session(SessionID, HTTP3Machine0), + State#state{ + http3_machine=HTTP3Machine, + streams=Streams, + lingering_streams=Lingering + }. + +stream_peer_send_shutdown(State=#state{conn=Conn}, StreamID) -> + case stream_get(State, StreamID) of + %% Cleanly terminating the CONNECT stream is equivalent + %% to an application error code of 0 and empty message. + Stream = #stream{status={webtransport_session, _}} -> + webtransport_event(State, StreamID, {closed, 0, <<>>}), + %% Shutdown the CONNECT stream fully. + cowboy_quicer:shutdown_stream(Conn, StreamID), + webtransport_terminate_session(State, Stream); + _ -> + State + end. + +reset_stream(State0=#state{conn=Conn, http3_machine=HTTP3Machine0}, + Stream=#stream{id=StreamID}, Error) -> + Reason = case Error of + {internal_error, _, _} -> h3_internal_error; + {stream_error, Reason0, _} -> Reason0 + end, + %% @todo Do we want to close both sides? + %% @todo Should we close the send side if the receive side was already closed? + cowboy_quicer:shutdown_stream(Conn, StreamID, + both, cow_http3:error_to_code(Reason)), + State1 = case cow_http3_machine:reset_stream(StreamID, HTTP3Machine0) of + {ok, HTTP3Machine} -> + terminate_stream(State0#state{http3_machine=HTTP3Machine}, Stream, Error); + {error, not_found} -> + terminate_stream(State0, Stream, Error) + end, +%% @todo +% case reset_rate(State1) of +% {ok, State} -> +% State; +% error -> +% terminate(State1, {connection_error, enhance_your_calm, +% 'Stream reset rate larger than configuration allows. Flood? (CVE-2019-9514)'}) +% end. + State1. + +stop_stream(State0=#state{http3_machine=HTTP3Machine}, Stream=#stream{id=StreamID}) -> + %% We abort reading when stopping the stream but only + %% if the client was not finished sending data. + %% We mark the stream as 'stopping' either way. + State = case cow_http3_machine:get_bidi_stream_remote_state(StreamID, HTTP3Machine) of + {ok, fin} -> + stream_store(State0, Stream#stream{status=stopping}); + {error, not_found} -> + stream_store(State0, Stream#stream{status=stopping}); + _ -> + stream_abort_receive(State0, Stream, h3_no_error) + end, + %% Then we may need to send a response or terminate it + %% if the stream handler did not do so already. + case cow_http3_machine:get_bidi_stream_local_state(StreamID, HTTP3Machine) of + %% When the stream terminates normally (without resetting the stream) + %% and no response was sent, we need to send a proper response back to the client. + {ok, idle} -> + info(State, StreamID, {response, 204, #{}, <<>>}); + %% When a response was sent but not terminated, we need to close the stream. + %% We send a final DATA frame to complete the stream. + {ok, nofin} -> + info(State, StreamID, {data, fin, <<>>}); + %% When a response was sent fully we can terminate the stream, + %% regardless of the stream being in half-closed or closed state. + _ -> + terminate_stream(State, Stream, normal) + end. + +maybe_terminate_stream(State, Stream=#stream{status=stopping}) -> + terminate_stream(State, Stream, normal); +%% The Stream will be stored in the State at the end of commands processing. +maybe_terminate_stream(State, _) -> + State. + +terminate_stream(State=#state{streams=Streams0, children=Children0}, + #stream{id=StreamID, state=StreamState}, Reason) -> + Streams = maps:remove(StreamID, Streams0), + terminate_stream_handler(State, StreamID, Reason, StreamState), + Children = cowboy_children:shutdown(Children0, StreamID), + stream_linger(State#state{streams=Streams, children=Children}, StreamID). + +terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) -> + try + cowboy_stream:terminate(StreamID, Reason, StreamState) + catch Class:Exception:Stacktrace -> + cowboy:log(cowboy_stream:make_error_log(terminate, + [StreamID, Reason, StreamState], + Class, Exception, Stacktrace), Opts) + end. + +ignored_frame(State=#state{http3_machine=HTTP3Machine0}, #stream{id=StreamID}) -> + case cow_http3_machine:ignored_frame(StreamID, HTTP3Machine0) of + {ok, HTTP3Machine} -> + State#state{http3_machine=HTTP3Machine}; + {error, Error={connection_error, _, _}, HTTP3Machine} -> + terminate(State#state{http3_machine=HTTP3Machine}, Error) + end. + +stream_abort_receive(State=#state{conn=Conn}, Stream=#stream{id=StreamID}, Reason) -> + cowboy_quicer:shutdown_stream(Conn, StreamID, + receiving, cow_http3:error_to_code(Reason)), + stream_store(State, Stream#stream{status=stopping}). + +%% @todo Graceful connection shutdown. +%% We terminate the connection immediately if it hasn't fully been initialized. +-spec goaway(#state{}, {goaway, _}) -> no_return(). +goaway(State, {goaway, _}) -> + terminate(State, {stop, goaway, 'The connection is going away.'}). + +%% Function copied from cowboy_http. +maybe_socket_error(State, {error, closed}) -> + terminate(State, {socket_error, closed, 'The socket has been closed.'}); +maybe_socket_error(State, Reason) -> + maybe_socket_error(State, Reason, 'An error has occurred on the socket.'). + +maybe_socket_error(_, Result = ok, _) -> + Result; +maybe_socket_error(_, Result = {ok, _}, _) -> + Result; +maybe_socket_error(State, {error, Reason}, Human) -> + terminate(State, {socket_error, Reason, Human}). + +-spec terminate(#state{} | undefined, _) -> no_return(). +terminate(undefined, Reason) -> + exit({shutdown, Reason}); +terminate(State=#state{conn=Conn, %http3_status=Status, + %http3_machine=HTTP3Machine, + streams=Streams, children=Children}, Reason) -> +% if +% Status =:= connected; Status =:= closing_initiated -> +%% @todo +% %% We are terminating so it's OK if we can't send the GOAWAY anymore. +% _ = cowboy_quicer:send(Conn, ControlID, cow_http3:goaway( +% cow_http3_machine:get_last_streamid(HTTP3Machine))), + %% We already sent the GOAWAY frame. +% Status =:= closing -> +% ok +% end, + terminate_all_streams(State, maps:to_list(Streams), Reason), + cowboy_children:terminate(Children), +% terminate_linger(State), + _ = cowboy_quicer:shutdown(Conn, cow_http3:error_to_code(terminate_reason(Reason))), + exit({shutdown, Reason}). + +terminate_reason({connection_error, Reason, _}) -> Reason; +terminate_reason({stop, _, _}) -> h3_no_error; +terminate_reason({socket_error, _, _}) -> h3_internal_error. +%terminate_reason({internal_error, _, _}) -> internal_error. + +terminate_all_streams(_, [], _) -> + ok; +terminate_all_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], Reason) -> + terminate_stream_handler(State, StreamID, Reason, StreamState), + terminate_all_streams(State, Tail, Reason). + +stream_get(#state{streams=Streams}, StreamID) -> + maps:get(StreamID, Streams, error). + +stream_new_local(State, StreamID, StreamType, Status) -> + stream_new(State, StreamID, StreamType, unidi_local, Status). + +stream_new_remote(State, StreamID, StreamType) -> + Status = case StreamType of + unidi -> header; + bidi -> normal + end, + stream_new(State, StreamID, StreamType, unidi_remote, Status). + +stream_new(State=#state{http3_machine=HTTP3Machine0, streams=Streams}, + StreamID, StreamType, UnidiType, Status) -> + {HTTP3Machine, Status} = case StreamType of + unidi -> + {cow_http3_machine:init_unidi_stream(StreamID, UnidiType, HTTP3Machine0), + Status}; + bidi -> + {cow_http3_machine:init_bidi_stream(StreamID, HTTP3Machine0), + Status} + end, + Stream = #stream{id=StreamID, status=Status}, + State#state{http3_machine=HTTP3Machine, streams=Streams#{StreamID => Stream}}. + +%% Stream closed message for a local (write-only) unidi stream. +stream_closed(State=#state{local_control_id=StreamID}, StreamID, _) -> + stream_closed1(State, StreamID); +stream_closed(State=#state{local_encoder_id=StreamID}, StreamID, _) -> + stream_closed1(State, StreamID); +stream_closed(State=#state{local_decoder_id=StreamID}, StreamID, _) -> + stream_closed1(State, StreamID); +stream_closed(State=#state{opts=Opts, + streams=Streams0, children=Children0}, StreamID, ErrorCode) -> + case maps:take(StreamID, Streams0) of + %% In the WT session's case, streams will be + %% removed in webtransport_terminate_session. + {Stream=#stream{status={webtransport_session, _}}, _} -> + webtransport_event(State, StreamID, closed_abruptly), + webtransport_terminate_session(State, Stream); + {#stream{state=undefined}, Streams} -> + %% Unidi stream has no handler/children. + stream_closed1(State#state{streams=Streams}, StreamID); + %% We only stop bidi streams if the stream was closed with an error + %% or the stream was already in the process of stopping. + {#stream{status=Status, state=StreamState}, Streams} + when Status =:= stopping; ErrorCode =/= 0 -> + terminate_stream_handler(State, StreamID, closed, StreamState), + Children = cowboy_children:shutdown(Children0, StreamID), + stream_closed1(State#state{streams=Streams, children=Children}, StreamID); + %% Don't remove a stream that terminated properly but + %% has chosen to remain up (custom stream handlers). + {_, _} -> + stream_closed1(State, StreamID); + %% Stream closed message for a stream that has been reset. Ignore. + error -> + case is_lingering_stream(State, StreamID) of + true -> + ok; + false -> + %% We avoid logging the data as it could be quite large. + cowboy:log(warning, "Received stream_closed for unknown stream ~p. ~p ~p", + [StreamID, self(), Streams0], Opts) + end, + State + end. + +stream_closed1(State=#state{http3_machine=HTTP3Machine0}, StreamID) -> + case cow_http3_machine:close_stream(StreamID, HTTP3Machine0) of + {ok, HTTP3Machine} -> + State#state{http3_machine=HTTP3Machine}; + {error, Error={connection_error, _, _}, HTTP3Machine} -> + terminate(State#state{http3_machine=HTTP3Machine}, Error) + end. + +stream_store(State=#state{streams=Streams}, Stream=#stream{id=StreamID}) -> + State#state{streams=Streams#{StreamID => Stream}}. + +stream_linger(State=#state{lingering_streams=Lingering0}, StreamID) -> + %% We only keep up to 100 streams in this state. @todo Make it configurable? + Lingering = [StreamID|lists:sublist(Lingering0, 100 - 1)], + State#state{lingering_streams=Lingering}. + +is_lingering_stream(#state{lingering_streams=Lingering}, StreamID) -> + lists:member(StreamID, Lingering). diff --git a/src/cowboy_loop.erl b/src/cowboy_loop.erl index 21eb96e..629d06e 100644 --- a/src/cowboy_loop.erl +++ b/src/cowboy_loop.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -17,12 +17,15 @@ -export([upgrade/4]). -export([upgrade/5]). --export([loop/4]). +-export([loop/5]). -export([system_continue/3]). -export([system_terminate/4]). -export([system_code_change/4]). +%% From gen_server. +-define(is_timeout(X), ((X) =:= infinity orelse (is_integer(X) andalso (X) >= 0))). + -callback init(Req, any()) -> {ok | module(), Req, any()} | {module(), Req, any(), any()} @@ -41,40 +44,46 @@ -> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). upgrade(Req, Env, Handler, HandlerState) -> - loop(Req, Env, Handler, HandlerState). + loop(Req, Env, Handler, HandlerState, infinity). --spec upgrade(Req, Env, module(), any(), hibernate) +-spec upgrade(Req, Env, module(), any(), hibernate | timeout()) -> {suspend, ?MODULE, loop, [any()]} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). upgrade(Req, Env, Handler, HandlerState, hibernate) -> - suspend(Req, Env, Handler, HandlerState). + suspend(Req, Env, Handler, HandlerState); +upgrade(Req, Env, Handler, HandlerState, Timeout) when ?is_timeout(Timeout) -> + loop(Req, Env, Handler, HandlerState, Timeout). --spec loop(Req, Env, module(), any()) +-spec loop(Req, Env, module(), any(), timeout()) -> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). %% @todo Handle system messages. -loop(Req=#{pid := Parent}, Env, Handler, HandlerState) -> +loop(Req=#{pid := Parent}, Env, Handler, HandlerState, Timeout) -> receive %% System messages. {'EXIT', Parent, Reason} -> terminate(Req, Env, Handler, HandlerState, Reason); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], - {Req, Env, Handler, HandlerState}); + {Req, Env, Handler, HandlerState, Timeout}); %% Calls from supervisor module. {'$gen_call', From, Call} -> cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE), - loop(Req, Env, Handler, HandlerState); + loop(Req, Env, Handler, HandlerState, Timeout); Message -> - call(Req, Env, Handler, HandlerState, Message) + call(Req, Env, Handler, HandlerState, Timeout, Message) + after Timeout -> + call(Req, Env, Handler, HandlerState, Timeout, timeout) end. -call(Req0, Env, Handler, HandlerState0, Message) -> +call(Req0, Env, Handler, HandlerState0, Timeout, Message) -> try Handler:info(Message, Req0, HandlerState0) of {ok, Req, HandlerState} -> - loop(Req, Env, Handler, HandlerState); + loop(Req, Env, Handler, HandlerState, Timeout); {ok, Req, HandlerState, hibernate} -> suspend(Req, Env, Handler, HandlerState); + {ok, Req, HandlerState, NewTimeout} when ?is_timeout(NewTimeout) -> + loop(Req, Env, Handler, HandlerState, NewTimeout); {stop, Req, HandlerState} -> terminate(Req, Env, Handler, HandlerState, stop) catch Class:Reason:Stacktrace -> @@ -83,7 +92,7 @@ call(Req0, Env, Handler, HandlerState0, Message) -> end. suspend(Req, Env, Handler, HandlerState) -> - {suspend, ?MODULE, loop, [Req, Env, Handler, HandlerState]}. + {suspend, ?MODULE, loop, [Req, Env, Handler, HandlerState, infinity]}. terminate(Req, Env, Handler, HandlerState, Reason) -> Result = cowboy_handler:terminate(Reason, Req, HandlerState, Handler), @@ -91,15 +100,15 @@ terminate(Req, Env, Handler, HandlerState, Reason) -> %% System callbacks. --spec system_continue(_, _, {Req, Env, module(), any()}) +-spec system_continue(_, _, {Req, Env, module(), any(), timeout()}) -> {ok, Req, Env} | {suspend, ?MODULE, loop, [any()]} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). -system_continue(_, _, {Req, Env, Handler, HandlerState}) -> - loop(Req, Env, Handler, HandlerState). +system_continue(_, _, {Req, Env, Handler, HandlerState, Timeout}) -> + loop(Req, Env, Handler, HandlerState, Timeout). --spec system_terminate(any(), _, _, {Req, Env, module(), any()}) +-spec system_terminate(any(), _, _, {Req, Env, module(), any(), timeout()}) -> {ok, Req, Env} when Req::cowboy_req:req(), Env::cowboy_middleware:env(). -system_terminate(Reason, _, _, {Req, Env, Handler, HandlerState}) -> +system_terminate(Reason, _, _, {Req, Env, Handler, HandlerState, _}) -> terminate(Req, Env, Handler, HandlerState, Reason). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} diff --git a/src/cowboy_metrics_h.erl b/src/cowboy_metrics_h.erl index 4107aac..67bf1a6 100644 --- a/src/cowboy_metrics_h.erl +++ b/src/cowboy_metrics_h.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_middleware.erl b/src/cowboy_middleware.erl index 9a739f1..97c1498 100644 --- a/src/cowboy_middleware.erl +++ b/src/cowboy_middleware.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2013-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_quicer.erl b/src/cowboy_quicer.erl new file mode 100644 index 0000000..aa52fae --- /dev/null +++ b/src/cowboy_quicer.erl @@ -0,0 +1,283 @@ +%% Copyright (c) Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% QUIC transport using the emqx/quicer NIF. + +-module(cowboy_quicer). + +%% Connection. +-export([peername/1]). +-export([sockname/1]). +-export([peercert/1]). +-export([shutdown/2]). + +%% Streams. +-export([start_bidi_stream/2]). +-export([start_unidi_stream/2]). +-export([send/3]). +-export([send/4]). +-export([send_datagram/2]). +-export([shutdown_stream/2]). +-export([shutdown_stream/4]). + +%% Messages. +-export([handle/1]). + +-ifndef(COWBOY_QUICER). + +-spec peername(_) -> no_return(). +peername(_) -> no_quicer(). + +-spec sockname(_) -> no_return(). +sockname(_) -> no_quicer(). + +-spec peercert(_) -> no_return(). +peercert(_) -> no_quicer(). + +-spec shutdown(_, _) -> no_return(). +shutdown(_, _) -> no_quicer(). + +-spec start_bidi_stream(_, _) -> no_return(). +start_bidi_stream(_, _) -> no_quicer(). + +-spec start_unidi_stream(_, _) -> no_return(). +start_unidi_stream(_, _) -> no_quicer(). + +-spec send(_, _, _) -> no_return(). +send(_, _, _) -> no_quicer(). + +-spec send(_, _, _, _) -> no_return(). +send(_, _, _, _) -> no_quicer(). + +-spec send_datagram(_, _) -> no_return(). +send_datagram(_, _) -> no_quicer(). + +-spec shutdown_stream(_, _) -> no_return(). +shutdown_stream(_, _) -> no_quicer(). + +-spec shutdown_stream(_, _, _, _) -> no_return(). +shutdown_stream(_, _, _, _) -> no_quicer(). + +-spec handle(_) -> no_return(). +handle(_) -> no_quicer(). + +no_quicer() -> + error({no_quicer, + "Cowboy must be compiled with environment variable COWBOY_QUICER=1 " + "or with compilation flag -D COWBOY_QUICER=1 in order to enable " + "QUIC support using the emqx/quic NIF"}). + +-else. + +%% @todo Make quicer export these types. +-type quicer_connection_handle() :: reference(). +-export_type([quicer_connection_handle/0]). + +-type quicer_app_errno() :: non_neg_integer(). + +-include_lib("quicer/include/quicer.hrl"). + +%% Connection. + +-spec peername(quicer_connection_handle()) + -> {ok, {inet:ip_address(), inet:port_number()}} + | {error, any()}. + +peername(Conn) -> + quicer:peername(Conn). + +-spec sockname(quicer_connection_handle()) + -> {ok, {inet:ip_address(), inet:port_number()}} + | {error, any()}. + +sockname(Conn) -> + quicer:sockname(Conn). + +-spec peercert(quicer_connection_handle()) + -> {ok, public_key:der_encoded()} + | {error, any()}. + +peercert(Conn) -> + quicer_nif:peercert(Conn). + +-spec shutdown(quicer_connection_handle(), quicer_app_errno()) + -> ok | {error, any()}. + +shutdown(Conn, ErrorCode) -> + quicer:shutdown_connection(Conn, + ?QUIC_CONNECTION_SHUTDOWN_FLAG_NONE, + ErrorCode). + +%% Streams. + +-spec start_bidi_stream(quicer_connection_handle(), iodata()) + -> {ok, cow_http3:stream_id()} + | {error, any()}. + +start_bidi_stream(Conn, InitialData) -> + start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_NONE). + +-spec start_unidi_stream(quicer_connection_handle(), iodata()) + -> {ok, cow_http3:stream_id()} + | {error, any()}. + +start_unidi_stream(Conn, InitialData) -> + start_stream(Conn, InitialData, ?QUIC_STREAM_OPEN_FLAG_UNIDIRECTIONAL). + +start_stream(Conn, InitialData, OpenFlag) -> + case quicer:start_stream(Conn, #{ + active => true, + open_flag => OpenFlag}) of + {ok, StreamRef} -> + case quicer:send(StreamRef, InitialData) of + {ok, _} -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + put({quicer_stream, StreamID}, StreamRef), + {ok, StreamID}; + Error -> + Error + end; + {error, Reason1, Reason2} -> + {error, {Reason1, Reason2}}; + Error -> + Error + end. + +-spec send(quicer_connection_handle(), cow_http3:stream_id(), iodata()) + -> ok | {error, any()}. + +send(Conn, StreamID, Data) -> + send(Conn, StreamID, Data, nofin). + +-spec send(quicer_connection_handle(), cow_http3:stream_id(), iodata(), cow_http:fin()) + -> ok | {error, any()}. + +send(_Conn, StreamID, Data, IsFin) -> + StreamRef = get({quicer_stream, StreamID}), + Size = iolist_size(Data), + case quicer:send(StreamRef, Data, send_flag(IsFin)) of + {ok, Size} -> + ok; + {error, Reason1, Reason2} -> + {error, {Reason1, Reason2}}; + Error -> + Error + end. + +send_flag(nofin) -> ?QUIC_SEND_FLAG_NONE; +send_flag(fin) -> ?QUIC_SEND_FLAG_FIN. + +-spec send_datagram(quicer_connection_handle(), iodata()) + -> ok | {error, any()}. + +send_datagram(Conn, Data) -> + %% @todo Fix/ignore the Dialyzer error instead of doing this. + DataBin = iolist_to_binary(Data), + Size = byte_size(DataBin), + case quicer:send_dgram(Conn, DataBin) of + {ok, Size} -> + ok; + %% @todo Handle error cases. + Error -> + Error + end. + +-spec shutdown_stream(quicer_connection_handle(), cow_http3:stream_id()) + -> ok. + +shutdown_stream(_Conn, StreamID) -> + StreamRef = get({quicer_stream, StreamID}), + _ = quicer:shutdown_stream(StreamRef), + ok. + +-spec shutdown_stream(quicer_connection_handle(), + cow_http3:stream_id(), both | receiving, quicer_app_errno()) + -> ok. + +shutdown_stream(_Conn, StreamID, Dir, ErrorCode) -> + StreamRef = get({quicer_stream, StreamID}), + _ = quicer:shutdown_stream(StreamRef, shutdown_flag(Dir), ErrorCode, infinity), + ok. + +%% @todo Are these flags correct for what we want? +shutdown_flag(both) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT; +shutdown_flag(receiving) -> ?QUIC_STREAM_SHUTDOWN_FLAG_ABORT_RECEIVE. + +%% Messages. + +%% @todo Probably should have the Conn given as argument too? +-spec handle({quic, _, _, _}) + -> {data, cow_http3:stream_id(), cow_http:fin(), binary()} + | {datagram, binary()} + | {stream_started, cow_http3:stream_id(), unidi | bidi} + | {stream_closed, cow_http3:stream_id(), quicer_app_errno()} + | closed + | {peer_send_shutdown, cow_http3:stream_id()} + | ok + | unknown + | {socket_error, any()}. + +handle({quic, Data, StreamRef, #{flags := Flags}}) when is_binary(Data) -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + IsFin = case Flags band ?QUIC_RECEIVE_FLAG_FIN of + ?QUIC_RECEIVE_FLAG_FIN -> fin; + _ -> nofin + end, + {data, StreamID, IsFin, Data}; +%% @todo Match on Conn. +handle({quic, Data, _Conn, Flags}) when is_binary(Data), is_integer(Flags) -> + {datagram, Data}; +%% QUIC_CONNECTION_EVENT_PEER_STREAM_STARTED. +handle({quic, new_stream, StreamRef, #{flags := Flags}}) -> + case quicer:setopt(StreamRef, active, true) of + ok -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + put({quicer_stream, StreamID}, StreamRef), + StreamType = case quicer:is_unidirectional(Flags) of + true -> unidi; + false -> bidi + end, + {stream_started, StreamID, StreamType}; + {error, Reason} -> + {socket_error, Reason} + end; +%% QUIC_STREAM_EVENT_SHUTDOWN_COMPLETE. +handle({quic, stream_closed, StreamRef, #{error := ErrorCode}}) -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + {stream_closed, StreamID, ErrorCode}; +%% QUIC_CONNECTION_EVENT_SHUTDOWN_COMPLETE. +handle({quic, closed, Conn, _Flags}) -> + _ = quicer:close_connection(Conn), + closed; +%% The following events are currently ignored either because +%% I do not know what they do or because we do not need to +%% take action. +handle({quic, streams_available, _Conn, _Props}) -> + ok; +handle({quic, dgram_state_changed, _Conn, _Props}) -> + ok; +%% QUIC_CONNECTION_EVENT_SHUTDOWN_INITIATED_BY_TRANSPORT +handle({quic, transport_shutdown, _Conn, _Flags}) -> + ok; +handle({quic, peer_send_shutdown, StreamRef, undefined}) -> + {ok, StreamID} = quicer:get_stream_id(StreamRef), + {peer_send_shutdown, StreamID}; +handle({quic, send_shutdown_complete, _StreamRef, _IsGraceful}) -> + ok; +handle({quic, shutdown, _Conn, success}) -> + ok; +handle(_Msg) -> + unknown. + +-endif. diff --git a/src/cowboy_req.erl b/src/cowboy_req.erl index 90c5a3a..550054e 100644 --- a/src/cowboy_req.erl +++ b/src/cowboy_req.erl @@ -1,5 +1,5 @@ -%% Copyright (c) 2011-2017, Loïc Hoguin <[email protected]> -%% Copyright (c) 2011, Anthony Ramine <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> +%% Copyright (c) Anthony Ramine <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -445,6 +445,7 @@ parse_header_fun(<<"sec-websocket-protocol">>) -> fun cow_http_hd:parse_sec_webs parse_header_fun(<<"sec-websocket-version">>) -> fun cow_http_hd:parse_sec_websocket_version_req/1; parse_header_fun(<<"trailer">>) -> fun cow_http_hd:parse_trailer/1; parse_header_fun(<<"upgrade">>) -> fun cow_http_hd:parse_upgrade/1; +parse_header_fun(<<"wt-available-protocols">>) -> fun cow_http_hd:parse_wt_available_protocols/1; parse_header_fun(<<"x-forwarded-for">>) -> fun cow_http_hd:parse_x_forwarded_for/1. parse_header(Name, Req, Default, ParseFun) -> @@ -462,7 +463,7 @@ filter_cookies(Names0, Req=#{headers := Headers}) -> case header(<<"cookie">>, Req) of undefined -> Req; Value0 -> - Cookies0 = binary:split(Value0, <<$;>>), + Cookies0 = binary:split(Value0, <<$;>>, [global]), Cookies = lists:filter(fun(Cookie) -> lists:member(cookie_name(Cookie), Names) end, Cookies0), @@ -521,7 +522,11 @@ read_body(Req=#{has_read_body := true}, _) -> read_body(Req, Opts) -> Length = maps:get(length, Opts, 8000000), Period = maps:get(period, Opts, 15000), - Timeout = maps:get(timeout, Opts, Period + 1000), + DefaultTimeout = case Period of + infinity -> infinity; %% infinity + 1000 = infinity. + _ -> Period + 1000 + end, + Timeout = maps:get(timeout, Opts, DefaultTimeout), Ref = make_ref(), cast({read_body, self(), Ref, Length, Period}, Req), receive @@ -710,22 +715,43 @@ set_resp_cookie(Name, Value, Req, Opts) -> RespCookies = maps:get(resp_cookies, Req, #{}), Req#{resp_cookies => RespCookies#{Name => Cookie}}. -%% @todo We could add has_resp_cookie and delete_resp_cookie now. +%% @todo We could add has_resp_cookie and unset_resp_cookie now. -spec set_resp_header(binary(), iodata(), Req) -> Req when Req::req(). +set_resp_header(<<"set-cookie">>, _, _) -> + exit({response_error, invalid_header, + 'Response cookies must be set using cowboy_req:set_resp_cookie/3,4.'}); set_resp_header(Name, Value, Req=#{resp_headers := RespHeaders}) -> Req#{resp_headers => RespHeaders#{Name => Value}}; set_resp_header(Name,Value, Req) -> Req#{resp_headers => #{Name => Value}}. --spec set_resp_headers(cowboy:http_headers(), Req) +-spec set_resp_headers(cowboy:http_headers() | [{binary(), iodata()}], Req) -> Req when Req::req(). +set_resp_headers(Headers, Req) when is_list(Headers) -> + set_resp_headers_list(Headers, Req, #{}); +set_resp_headers(#{<<"set-cookie">> := _}, _) -> + exit({response_error, invalid_header, + 'Response cookies must be set using cowboy_req:set_resp_cookie/3,4.'}); set_resp_headers(Headers, Req=#{resp_headers := RespHeaders}) -> Req#{resp_headers => maps:merge(RespHeaders, Headers)}; set_resp_headers(Headers, Req) -> Req#{resp_headers => Headers}. +set_resp_headers_list([], Req, Acc) -> + set_resp_headers(Acc, Req); +set_resp_headers_list([{<<"set-cookie">>, _}|_], _, _) -> + exit({response_error, invalid_header, + 'Response cookies must be set using cowboy_req:set_resp_cookie/3,4.'}); +set_resp_headers_list([{Name, Value}|Tail], Req, Acc) -> + case Acc of + #{Name := ValueAcc} -> + set_resp_headers_list(Tail, Req, Acc#{Name => [ValueAcc, <<", ">>, Value]}); + _ -> + set_resp_headers_list(Tail, Req, Acc#{Name => Value}) + end. + -spec resp_header(binary(), req()) -> binary() | undefined. resp_header(Name, Req) -> resp_header(Name, Req, undefined). @@ -775,7 +801,11 @@ inform(Status, Req) -> -spec inform(cowboy:http_status(), cowboy:http_headers(), req()) -> ok. inform(_, _, #{has_sent_resp := _}) -> - error(function_clause); %% @todo Better error message. + exit({response_error, response_already_sent, + 'The final response has already been sent.'}); +inform(_, #{<<"set-cookie">> := _}, _) -> + exit({response_error, invalid_header, + 'Response cookies must be set using cowboy_req:set_resp_cookie/3,4.'}); inform(Status, Headers, Req) when is_integer(Status); is_binary(Status) -> cast({inform, Status, Headers}, Req). @@ -793,7 +823,11 @@ reply(Status, Headers, Req) -> -spec reply(cowboy:http_status(), cowboy:http_headers(), resp_body(), Req) -> Req when Req::req(). reply(_, _, _, #{has_sent_resp := _}) -> - error(function_clause); %% @todo Better error message. + exit({response_error, response_already_sent, + 'The final response has already been sent.'}); +reply(_, #{<<"set-cookie">> := _}, _, _) -> + exit({response_error, invalid_header, + 'Response cookies must be set using cowboy_req:set_resp_cookie/3,4.'}); reply(Status, Headers, {sendfile, _, 0, _}, Req) when is_integer(Status); is_binary(Status) -> do_reply(Status, Headers#{ @@ -809,20 +843,26 @@ reply(Status, Headers, SendFile = {sendfile, _, Len, _}, Req) %% Neither status code must include a response body. (RFC7230 3.3) reply(Status, Headers, Body, Req) when Status =:= 204; Status =:= 304 -> - 0 = iolist_size(Body), - do_reply(Status, Headers, Body, Req); + do_reply_ensure_no_body(Status, Headers, Body, Req); reply(Status = <<"204",_/bits>>, Headers, Body, Req) -> - 0 = iolist_size(Body), - do_reply(Status, Headers, Body, Req); + do_reply_ensure_no_body(Status, Headers, Body, Req); reply(Status = <<"304",_/bits>>, Headers, Body, Req) -> - 0 = iolist_size(Body), - do_reply(Status, Headers, Body, Req); + do_reply_ensure_no_body(Status, Headers, Body, Req); reply(Status, Headers, Body, Req) when is_integer(Status); is_binary(Status) -> do_reply(Status, Headers#{ <<"content-length">> => integer_to_binary(iolist_size(Body)) }, Body, Req). +do_reply_ensure_no_body(Status, Headers, Body, Req) -> + case iolist_size(Body) of + 0 -> + do_reply(Status, Headers, Body, Req); + _ -> + exit({response_error, payload_too_large, + '204 and 304 responses must not include a body. (RFC7230 3.3)'}) + end. + %% Don't send any body for HEAD responses. While the protocol code is %% supposed to enforce this rule, we prefer to avoid copying too much %% data around if we can avoid it. @@ -843,16 +883,19 @@ stream_reply(Status, Req) -> -spec stream_reply(cowboy:http_status(), cowboy:http_headers(), Req) -> Req when Req::req(). stream_reply(_, _, #{has_sent_resp := _}) -> - error(function_clause); + exit({response_error, response_already_sent, + 'The final response has already been sent.'}); +stream_reply(_, #{<<"set-cookie">> := _}, _) -> + exit({response_error, invalid_header, + 'Response cookies must be set using cowboy_req:set_resp_cookie/3,4.'}); %% 204 and 304 responses must NOT send a body. We therefore %% transform the call to a full response and expect the user %% to NOT call stream_body/3 afterwards. (RFC7230 3.3) -stream_reply(Status = 204, Headers=#{}, Req) -> +stream_reply(Status, Headers=#{}, Req) + when Status =:= 204; Status =:= 304 -> reply(Status, Headers, <<>>, Req); stream_reply(Status = <<"204",_/bits>>, Headers=#{}, Req) -> reply(Status, Headers, <<>>, Req); -stream_reply(Status = 304, Headers=#{}, Req) -> - reply(Status, Headers, <<>>, Req); stream_reply(Status = <<"304",_/bits>>, Headers=#{}, Req) -> reply(Status, Headers, <<>>, Req); stream_reply(Status, Headers=#{}, Req) when is_integer(Status); is_binary(Status) -> @@ -896,6 +939,9 @@ stream_events(Events, IsFin, Req=#{has_sent_resp := headers}) -> stream_body({data, self(), IsFin, cow_sse:events(Events)}, Req). -spec stream_trailers(cowboy:http_headers(), req()) -> ok. +stream_trailers(#{<<"set-cookie">> := _}, _) -> + exit({response_error, invalid_header, + 'Response cookies must be set using cowboy_req:set_resp_cookie/3,4.'}); stream_trailers(Trailers, Req=#{has_sent_resp := headers}) -> cast({trailers, Trailers}, Req). @@ -907,6 +953,9 @@ push(Path, Headers, Req) -> %% @todo Path, Headers, Opts, everything should be in proper binary, %% or normalized when creating the Req object. -spec push(iodata(), cowboy:http_headers(), req(), push_opts()) -> ok. +push(_, _, #{has_sent_resp := _}, _) -> + exit({response_error, response_already_sent, + 'The final response has already been sent.'}); push(Path, Headers, Req=#{scheme := Scheme0, host := Host0, port := Port0}, Opts) -> Method = maps:get(method, Opts, <<"GET">>), Scheme = maps:get(scheme, Opts, Scheme0), @@ -991,7 +1040,12 @@ filter([], Map, Errors) -> _ -> {error, Errors} end; filter([{Key, Constraints}|Tail], Map, Errors) -> - filter_constraints(Tail, Map, Errors, Key, maps:get(Key, Map), Constraints); + case maps:find(Key, Map) of + {ok, Value} -> + filter_constraints(Tail, Map, Errors, Key, Value, Constraints); + error -> + filter(Tail, Map, Errors#{Key => required}) + end; filter([{Key, Constraints, Default}|Tail], Map, Errors) -> case maps:find(Key, Map) of {ok, Value} -> diff --git a/src/cowboy_rest.erl b/src/cowboy_rest.erl index 7d0fe80..9f30fcf 100644 --- a/src/cowboy_rest.erl +++ b/src/cowboy_rest.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -97,7 +97,7 @@ -optional_callbacks([forbidden/2]). -callback generate_etag(Req, State) - -> {binary() | {weak | strong, binary()}, Req, State} + -> {binary() | {weak | strong, binary()} | undefined, Req, State} when Req::cowboy_req:req(), State::any(). -optional_callbacks([generate_etag/2]). @@ -246,9 +246,6 @@ handler :: atom(), handler_state :: any(), - %% Allowed methods. Only used for OPTIONS requests. - allowed_methods :: [binary()] | undefined, - %% Media type. content_types_p = [] :: [{binary() | {binary(), binary(), [{binary(), binary()}] | '*'}, @@ -307,17 +304,17 @@ known_methods(Req, State=#state{method=Method}) -> Method =:= <<"POST">>; Method =:= <<"PUT">>; Method =:= <<"PATCH">>; Method =:= <<"DELETE">>; Method =:= <<"OPTIONS">> -> - next(Req, State, fun uri_too_long/2); + uri_too_long(Req, State); no_call -> - next(Req, State, 501); + respond(Req, State, 501); {stop, Req2, State2} -> terminate(Req2, State2); {Switch, Req2, State2} when element(1, Switch) =:= switch_handler -> switch_handler(Switch, Req2, State2); {List, Req2, State2} -> case lists:member(Method, List) of - true -> next(Req2, State2, fun uri_too_long/2); - false -> next(Req2, State2, 501) + true -> uri_too_long(Req2, State2); + false -> respond(Req2, State2, 501) end end. @@ -327,39 +324,26 @@ uri_too_long(Req, State) -> %% allowed_methods/2 should return a list of binary methods. allowed_methods(Req, State=#state{method=Method}) -> case call(Req, State, allowed_methods) of - no_call when Method =:= <<"HEAD">>; Method =:= <<"GET">> -> - next(Req, State, fun malformed_request/2); - no_call when Method =:= <<"OPTIONS">> -> - next(Req, State#state{allowed_methods= - [<<"HEAD">>, <<"GET">>, <<"OPTIONS">>]}, - fun malformed_request/2); + no_call when Method =:= <<"HEAD">>; Method =:= <<"GET">>; Method =:= <<"OPTIONS">> -> + Req2 = cowboy_req:set_resp_header(<<"allow">>, <<"HEAD, GET, OPTIONS">>, Req), + malformed_request(Req2, State); no_call -> - method_not_allowed(Req, State, - [<<"HEAD">>, <<"GET">>, <<"OPTIONS">>]); + Req2 = cowboy_req:set_resp_header(<<"allow">>, <<"HEAD, GET, OPTIONS">>, Req), + respond(Req2, State, 405); {stop, Req2, State2} -> terminate(Req2, State2); {Switch, Req2, State2} when element(1, Switch) =:= switch_handler -> switch_handler(Switch, Req2, State2); {List, Req2, State2} -> + Req3 = cowboy_req:set_resp_header(<<"allow">>, cow_http_hd:allow(List), Req2), case lists:member(Method, List) of - true when Method =:= <<"OPTIONS">> -> - next(Req2, State2#state{allowed_methods=List}, - fun malformed_request/2); true -> - next(Req2, State2, fun malformed_request/2); + malformed_request(Req3, State2); false -> - method_not_allowed(Req2, State2, List) + respond(Req3, State2, 405) end end. -method_not_allowed(Req, State, []) -> - Req2 = cowboy_req:set_resp_header(<<"allow">>, <<>>, Req), - respond(Req2, State, 405); -method_not_allowed(Req, State, Methods) -> - << ", ", Allow/binary >> = << << ", ", M/binary >> || M <- Methods >>, - Req2 = cowboy_req:set_resp_header(<<"allow">>, Allow, Req), - respond(Req2, State, 405). - malformed_request(Req, State) -> expect(Req, State, malformed_request, false, fun is_authorized/2, 400). @@ -413,16 +397,10 @@ valid_entity_length(Req, State) -> %% If you need to add additional headers to the response at this point, %% you should do it directly in the options/2 call using set_resp_headers. -options(Req, State=#state{allowed_methods=Methods, method= <<"OPTIONS">>}) -> +options(Req, State=#state{method= <<"OPTIONS">>}) -> case call(Req, State, options) of - no_call when Methods =:= [] -> - Req2 = cowboy_req:set_resp_header(<<"allow">>, <<>>, Req), - respond(Req2, State, 200); no_call -> - << ", ", Allow/binary >> - = << << ", ", M/binary >> || M <- Methods >>, - Req2 = cowboy_req:set_resp_header(<<"allow">>, Allow, Req), - respond(Req2, State, 200); + respond(Req, State, 200); {stop, Req2, State2} -> terminate(Req2, State2); {Switch, Req2, State2} when element(1, Switch) =:= switch_handler -> @@ -471,7 +449,7 @@ content_types_provided(Req, State) -> {[], Req2, State2} -> not_acceptable(Req2, State2); {CTP, Req2, State2} -> - CTP2 = [normalize_content_types(P) || P <- CTP], + CTP2 = [normalize_content_types(P, provide) || P <- CTP], State3 = State2#state{content_types_p=CTP2}, try cowboy_req:parse_header(<<"accept">>, Req2) of undefined -> @@ -491,10 +469,14 @@ content_types_provided(Req, State) -> end end. -normalize_content_types({ContentType, Callback}) +normalize_content_types({ContentType, Callback}, _) when is_binary(ContentType) -> {cow_http_hd:parse_content_type(ContentType), Callback}; -normalize_content_types(Normalized) -> +normalize_content_types(Normalized = {{Type, SubType, _}, _}, _) + when is_binary(Type), is_binary(SubType) -> + Normalized; +%% Wildcard for content_types_accepted. +normalize_content_types(Normalized = {'*', _}, accept) -> Normalized. prioritize_accept(Accept) -> @@ -1059,7 +1041,7 @@ accept_resource(Req, State) -> {Switch, Req2, State2} when element(1, Switch) =:= switch_handler -> switch_handler(Switch, Req2, State2); {CTA, Req2, State2} -> - CTA2 = [normalize_content_types(P) || P <- CTA], + CTA2 = [normalize_content_types(P, accept) || P <- CTA], try cowboy_req:parse_header(<<"content-type">>, Req2) of %% We do not match against the boundary parameter for multipart. {Type = <<"multipart">>, SubType, Params} -> @@ -1099,9 +1081,9 @@ process_content_type(Req, State=#state{method=Method, exists=Exists}, Fun) -> {Switch, Req2, State2} when element(1, Switch) =:= switch_handler -> switch_handler(Switch, Req2, State2); {true, Req2, State2} when Exists -> - next(Req2, State2, fun has_resp_body/2); + has_resp_body(Req2, State2); {true, Req2, State2} -> - next(Req2, State2, fun maybe_created/2); + maybe_created(Req2, State2); {false, Req2, State2} -> respond(Req2, State2, 400); {{created, ResURL}, Req2, State2} when Method =:= <<"POST">> -> @@ -1196,6 +1178,7 @@ if_range(Req=#{headers := #{<<"if-range">> := _, <<"range">> := _}}, if_range(Req, State) -> range(Req, State). +%% @todo This can probably be moved to if_range directly. range(Req, State=#state{ranges_a=[]}) -> set_resp_body(Req, State); range(Req, State) -> @@ -1527,6 +1510,12 @@ generate_etag(Req, State=#state{etag=undefined}) -> case unsafe_call(Req, State, generate_etag) of no_call -> {undefined, Req, State#state{etag=no_call}}; + %% We allow the callback to return 'undefined' + %% to allow conditionally generating etags. We + %% handle 'undefined' the same as if the function + %% was not exported. + {undefined, Req2, State2} -> + {undefined, Req2, State2#state{etag=no_call}}; {Etag, Req2, State2} when is_binary(Etag) -> Etag2 = cow_http_hd:parse_etag(Etag), {Etag2, Req2, State2#state{etag=Etag2}}; @@ -1633,5 +1622,6 @@ error_terminate(Req, #state{handler=Handler, handler_state=HandlerState}, Class, erlang:raise(Class, Reason, Stacktrace). terminate(Req, #state{handler=Handler, handler_state=HandlerState}) -> + %% @todo I don't think the result is used anywhere? Result = cowboy_handler:terminate(normal, Req, HandlerState, Handler), {ok, Req, Result}. diff --git a/src/cowboy_router.erl b/src/cowboy_router.erl index 0b7fe41..393d82d 100644 --- a/src/cowboy_router.erl +++ b/src/cowboy_router.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_static.erl b/src/cowboy_static.erl index b0cf146..ce34b01 100644 --- a/src/cowboy_static.erl +++ b/src/cowboy_static.erl @@ -1,5 +1,5 @@ -%% Copyright (c) 2013-2017, Loïc Hoguin <[email protected]> -%% Copyright (c) 2011, Magnus Klaar <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> +%% Copyright (c) Magnus Klaar <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -29,7 +29,7 @@ -type extra_charset() :: {charset, module(), function()} | {charset, binary()}. -type extra_etag() :: {etag, module(), function()} | {etag, false}. -type extra_mimetypes() :: {mimetypes, module(), function()} - | {mimetypes, binary() | {binary(), binary(), [{binary(), binary()}]}}. + | {mimetypes, binary() | {binary(), binary(), '*' | [{binary(), binary()}]}}. -type extra() :: [extra_charset() | extra_etag() | extra_mimetypes()]. -type opts() :: {file | dir, string() | binary()} | {file | dir, string() | binary(), extra()} @@ -332,7 +332,7 @@ forbidden(Req, State) -> %% Detect the mimetype of the file. -spec content_types_provided(Req, State) - -> {[{binary(), get_file}], Req, State} + -> {[{binary() | {binary(), binary(), '*' | [{binary(), binary()}]}, get_file}], Req, State} when State::state(). content_types_provided(Req, State={Path, _, Extra}) when is_list(Extra) -> case lists:keyfind(mimetypes, 1, Extra) of @@ -347,7 +347,7 @@ content_types_provided(Req, State={Path, _, Extra}) when is_list(Extra) -> %% Detect the charset of the file. -spec charsets_provided(Req, State) - -> {[binary()], Req, State} + -> {[binary()], Req, State} | no_call when State::state(). charsets_provided(Req, State={Path, _, Extra}) -> case lists:keyfind(charset, 1, Extra) of @@ -381,7 +381,7 @@ resource_exists(Req, State) -> %% Generate an etag for the file. -spec generate_etag(Req, State) - -> {{strong | weak, binary()}, Req, State} + -> {{strong | weak, binary() | undefined}, Req, State} when State::state(). generate_etag(Req, State={Path, {_, #file_info{size=Size, mtime=Mtime}}, Extra}) -> @@ -408,7 +408,7 @@ last_modified(Req, State={_, {_, #file_info{mtime=Modified}}, _}) -> %% Stream the file. -spec get_file(Req, State) - -> {{sendfile, 0, non_neg_integer(), binary()}, Req, State} + -> {{sendfile, 0, non_neg_integer(), binary()} | binary(), Req, State} when State::state(). get_file(Req, State={Path, {direct, #file_info{size=Size}}, _}) -> {{sendfile, 0, Size, Path}, Req, State}; diff --git a/src/cowboy_stream.erl b/src/cowboy_stream.erl index 2dad6d0..6680bdc 100644 --- a/src/cowboy_stream.erl +++ b/src/cowboy_stream.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2015-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -49,6 +49,7 @@ -type reason() :: normal | switch_protocol | {internal_error, timeout | {error | exit | throw, any()}, human_reason()} | {socket_error, closed | atom(), human_reason()} + %% @todo Or cow_http3:error(). | {stream_error, cow_http2:error(), human_reason()} | {connection_error, cow_http2:error(), human_reason()} | {stop, cow_http2:frame() | {exit, any()}, human_reason()}. diff --git a/src/cowboy_stream_h.erl b/src/cowboy_stream_h.erl index f516f3d..3c3c084 100644 --- a/src/cowboy_stream_h.erl +++ b/src/cowboy_stream_h.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2016-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -138,7 +138,7 @@ info(StreamID, Info={'EXIT', Pid, {{request_error, Reason, _HumanReadable}, _}}, {error_response, Status, #{<<"content-length">> => <<"0">>}, <<>>}, stop ], State); -info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, pid=Pid}) -> +info(StreamID, Exit={'EXIT', Pid, Reason}, State=#state{ref=Ref, pid=Pid}) -> Commands0 = [{internal_error, Exit, 'Stream process crashed.'}], Commands = case Reason of normal -> Commands0; @@ -146,11 +146,15 @@ info(StreamID, Exit={'EXIT', Pid, {Reason, Stacktrace}}, State=#state{ref=Ref, p {shutdown, _} -> Commands0; _ -> [{log, error, "Ranch listener ~p, connection process ~p, stream ~p " - "had its request process ~p exit with reason " - "~999999p and stacktrace ~999999p~n", - [Ref, self(), StreamID, Pid, Reason, Stacktrace]} + "had its request process ~p exit with reason ~0p~n", + [Ref, self(), StreamID, Pid, Reason]} |Commands0] end, + %% @todo We are trying to send a 500 response before resetting + %% the stream. But due to the way the RESET_STREAM frame + %% works in QUIC the data may be lost. The problem is + %% known and a draft RFC exists at + %% https://www.ietf.org/id/draft-ietf-quic-reliable-stream-reset-03.html do_info(StreamID, Exit, [ {error_response, 500, #{<<"content-length">> => <<"0">>}, <<>>} |Commands], State); diff --git a/src/cowboy_sub_protocol.erl b/src/cowboy_sub_protocol.erl index 6714289..1f24d00 100644 --- a/src/cowboy_sub_protocol.erl +++ b/src/cowboy_sub_protocol.erl @@ -1,5 +1,5 @@ -%% Copyright (c) 2013-2017, Loïc Hoguin <[email protected]> -%% Copyright (c) 2013, James Fish <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> +%% Copyright (c) James Fish <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_sup.erl b/src/cowboy_sup.erl index d3ac3b0..224ef7d 100644 --- a/src/cowboy_sup.erl +++ b/src/cowboy_sup.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_tls.erl b/src/cowboy_tls.erl index c049ecb..6d0dcd3 100644 --- a/src/cowboy_tls.erl +++ b/src/cowboy_tls.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2015-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -33,19 +33,17 @@ start_link(Ref, Transport, Opts) -> -spec connection_process(pid(), ranch:ref(), module(), cowboy:opts()) -> ok. connection_process(Parent, Ref, Transport, Opts) -> - ProxyInfo = case maps:get(proxy_header, Opts, false) of - true -> - {ok, ProxyInfo0} = ranch:recv_proxy_header(Ref, 1000), - ProxyInfo0; - false -> - undefined - end, + ProxyInfo = get_proxy_info(Ref, Opts), {ok, Socket} = ranch:handshake(Ref), case ssl:negotiated_protocol(Socket) of {ok, <<"h2">>} -> init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, cowboy_http2); _ -> %% http/1.1 or no protocol negotiated. - init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, cowboy_http) + Protocol = case maps:get(alpn_default_protocol, Opts, http) of + http -> cowboy_http; + http2 -> cowboy_http2 + end, + init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, Protocol) end. init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, Protocol) -> @@ -54,3 +52,11 @@ init(Parent, Ref, Socket, Transport, ProxyInfo, Opts, Protocol) -> supervisor -> process_flag(trap_exit, true) end, Protocol:init(Parent, Ref, Socket, Transport, ProxyInfo, Opts). + +get_proxy_info(Ref, #{proxy_header := true}) -> + case ranch:recv_proxy_header(Ref, 1000) of + {ok, ProxyInfo} -> ProxyInfo; + {error, closed} -> exit({shutdown, closed}) + end; +get_proxy_info(_, _) -> + undefined. diff --git a/src/cowboy_tracer_h.erl b/src/cowboy_tracer_h.erl index 9a19ae1..91a431b 100644 --- a/src/cowboy_tracer_h.erl +++ b/src/cowboy_tracer_h.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index e7d8f31..cb30c3f 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -69,6 +69,9 @@ active_n => pos_integer(), compress => boolean(), deflate_opts => cow_ws:deflate_opts(), + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), idle_timeout => timeout(), max_frame_size => non_neg_integer() | infinity, req_filter => fun((cowboy_req:req()) -> map()), @@ -76,6 +79,14 @@ }. -export_type([opts/0]). +%% We don't want to reset the idle timeout too often, +%% so we don't reset it on data. Instead we reset the +%% number of ticks we have observed. We divide the +%% timeout value by a value and that value becomes +%% the number of ticks at which point we can drop +%% the connection. This value is the number of ticks. +-define(IDLE_TIMEOUT_TICKS, 10). + -record(state, { parent :: undefined | pid(), ref :: ranch:ref(), @@ -86,8 +97,14 @@ handler :: module(), key = undefined :: undefined | binary(), timeout_ref = undefined :: undefined | reference(), + timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS, messages = undefined :: undefined | {atom(), atom(), atom()} | {atom(), atom(), atom(), atom()}, + + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size = false :: pos_integer() | false, + dynamic_buffer_moving_average = 0 :: non_neg_integer(), + hibernate = false :: boolean(), frag_state = undefined :: cow_ws:frag_state(), frag_buffer = <<>> :: binary(), @@ -103,7 +120,8 @@ %% is trying to upgrade to the Websocket protocol. -spec is_upgrade_request(cowboy_req:req()) -> boolean(). -is_upgrade_request(#{version := 'HTTP/2', method := <<"CONNECT">>, protocol := Protocol}) -> +is_upgrade_request(#{version := Version, method := <<"CONNECT">>, protocol := Protocol}) + when Version =:= 'HTTP/2'; Version =:= 'HTTP/3' -> <<"websocket">> =:= cowboy_bstr:to_lower(Protocol); is_upgrade_request(Req=#{version := 'HTTP/1.1', method := <<"GET">>}) -> ConnTokens = cowboy_req:parse_header(<<"connection">>, Req, []), @@ -148,13 +166,13 @@ upgrade(Req0=#{version := Version}, Env, Handler, HandlerState, Opts) -> <<"connection">> => <<"upgrade">>, <<"upgrade">> => <<"websocket">> }, Req0), Env}; - %% Use a generic 400 error for HTTP/2. + %% Use 501 Not Implemented for HTTP/2 and HTTP/3 as recommended + %% by RFC9220 3 (WebSockets Upgrade over HTTP/3). {error, upgrade_required} -> - {ok, cowboy_req:reply(400, Req0), Env} + {ok, cowboy_req:reply(501, Req0), Env} catch _:_ -> %% @todo Probably log something here? %% @todo Test that we can have 2 /ws 400 status code in a row on the same connection. - %% @todo Does this even work? {ok, cowboy_req:reply(400, Req0), Env} end. @@ -260,7 +278,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, %% @todo We don't want date and server headers. Headers = cowboy_req:response_headers(#{}, Req), Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}}, - takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>, + takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>, {State, HandlerState}). %% Connection process. @@ -285,17 +303,24 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, -spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()}, module() | undefined, any(), binary(), {#state{}, any()}) -> no_return(). -takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, - {State0=#state{handler=Handler}, HandlerState}) -> - %% @todo We should have an option to disable this behavior. - ranch:remove_connection(Ref), +takeover(Parent, Ref, Socket, Transport, Opts, Buffer, + {State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) -> + case Req of + #{version := 'HTTP/3'} -> ok; + %% @todo We should have an option to disable this behavior. + _ -> ranch:remove_connection(Ref) + end, Messages = case Transport of undefined -> undefined; _ -> Transport:messages() end, - State = loop_timeout(State0#state{parent=Parent, + State = set_idle_timeout(State0#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - key=undefined, messages=Messages}), + opts=WsOpts#{dynamic_buffer => maps:get(dynamic_buffer, Opts, false)}, + key=undefined, messages=Messages, + %% Dynamic buffer only applies to HTTP/1.1 Websocket. + dynamic_buffer_size=init_dynamic_buffer_size(Opts), + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0)}, 0), %% We call parse_header/3 immediately because there might be %% some data in the buffer that was sent along with the handshake. %% While it is not allowed by the protocol to send frames immediately, @@ -306,6 +331,12 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, false -> after_init(State, HandlerState, #ps_header{buffer=Buffer}) end. +-include("cowboy_dynamic_buffer.hrl"). + +%% @todo Implement early socket error detection. +maybe_socket_error(_, _) -> + ok. + after_init(State=#state{active=true}, HandlerState, ParseState) -> %% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2. %% We must do this only after calling websocket_init/1 (if any) @@ -327,7 +358,7 @@ after_init(State, HandlerState, ParseState) -> setopts_active(#state{transport=undefined}) -> ok; setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) -> @@ -369,28 +400,41 @@ before_loop(State=#state{hibernate=true}, HandlerState, ParseState) -> before_loop(State, HandlerState, ParseState) -> loop(State, HandlerState, ParseState). --spec loop_timeout(#state{}) -> #state{}. -loop_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}) -> +-spec set_idle_timeout(#state{}, 0..?IDLE_TIMEOUT_TICKS) -> #state{}. + +%% @todo Do we really need this for HTTP/2? +set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) -> + %% Most of the time we don't need to cancel the timer since it + %% will have triggered already. But this call is harmless so + %% it is kept to simplify the code as we do need to cancel when + %% options are changed dynamically. _ = case PrevRef of undefined -> ignore; - PrevRef -> erlang:cancel_timer(PrevRef) + PrevRef -> erlang:cancel_timer(PrevRef, [{async, true}, {info, false}]) end, case maps:get(idle_timeout, Opts, 60000) of infinity -> - State#state{timeout_ref=undefined}; + State#state{timeout_ref=undefined, timeout_num=TimeoutNum}; Timeout -> - TRef = erlang:start_timer(Timeout, self(), ?MODULE), - State#state{timeout_ref=TRef} + TRef = erlang:start_timer(Timeout div ?IDLE_TIMEOUT_TICKS, self(), ?MODULE), + State#state{timeout_ref=TRef, timeout_num=TimeoutNum} end. +-define(reset_idle_timeout(State), State#state{timeout_num=0}). + +tick_idle_timeout(State=#state{timeout_num=?IDLE_TIMEOUT_TICKS}, HandlerState, _) -> + websocket_close(State, HandlerState, timeout); +tick_idle_timeout(State=#state{timeout_num=TimeoutNum}, HandlerState, ParseState) -> + before_loop(set_idle_timeout(State, TimeoutNum + 1), HandlerState, ParseState). + -spec loop(#state{}, any(), parse_state()) -> no_return(). loop(State=#state{parent=Parent, socket=Socket, messages=Messages, timeout_ref=TRef}, HandlerState, ParseState) -> receive %% Socket messages. (HTTP/1.1) {OK, Socket, Data} when OK =:= element(1, Messages) -> - State2 = loop_timeout(State), - parse(State2, HandlerState, ParseState, Data); + State1 = maybe_resize_buffer(State, Data), + parse(?reset_idle_timeout(State1), HandlerState, ParseState, Data); {Closed, Socket} when Closed =:= element(2, Messages) -> terminate(State, HandlerState, {error, closed}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> @@ -403,18 +447,16 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages, %% Body reading messages. (HTTP/2) {request_body, _Ref, nofin, Data} -> maybe_read_body(State), - State2 = loop_timeout(State), - parse(State2, HandlerState, ParseState, Data); + parse(?reset_idle_timeout(State), HandlerState, ParseState, Data); %% @todo We need to handle this case as if it was an {error, closed} %% but not before we finish processing frames. We probably should have %% a check in before_loop to let us stop looping if a flag is set. {request_body, _Ref, fin, _, Data} -> maybe_read_body(State), - State2 = loop_timeout(State), - parse(State2, HandlerState, ParseState, Data); + parse(?reset_idle_timeout(State), HandlerState, ParseState, Data); %% Timeouts. {timeout, TRef, ?MODULE} -> - websocket_close(State, HandlerState, timeout); + tick_idle_timeout(State, HandlerState, ParseState); {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> before_loop(State, HandlerState, ParseState); %% System messages. @@ -458,12 +500,16 @@ parse_header(State=#state{opts=Opts, frag_state=FragState, extensions=Extensions websocket_close(State, HandlerState, {error, badframe}) end. -parse_payload(State=#state{frag_state=FragState, utf8_state=Incomplete, extensions=Extensions}, +parse_payload(State=#state{opts=Opts, frag_state=FragState, utf8_state=Incomplete, extensions=Extensions}, HandlerState, ParseState=#ps_payload{ type=Type, len=Len, mask_key=MaskKey, rsv=Rsv, unmasked=Unmasked, unmasked_len=UnmaskedLen}, Data) -> + MaxFrameSize = case maps:get(max_frame_size, Opts, infinity) of + infinity -> infinity; + MaxFrameSize0 -> MaxFrameSize0 - UnmaskedLen + end, case cow_ws:parse_payload(Data, MaskKey, Incomplete, UnmaskedLen, - Type, Len, FragState, Extensions, Rsv) of + Type, Len, FragState, Extensions#{max_inflate_size => MaxFrameSize}, Rsv) of {ok, CloseCode, Payload, Utf8State, Rest} -> dispatch_frame(State#state{utf8_state=Utf8State}, HandlerState, ParseState#ps_payload{unmasked= <<Unmasked/binary, Payload/binary>>, @@ -593,13 +639,16 @@ commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_b commands(Tail, State#state{active=Active}, Data); commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) -> commands(Tail, State#state{deflate=Deflate}, Data); -commands([{set_options, SetOpts}|Tail], State0=#state{opts=Opts}, Data) -> - State = case SetOpts of - #{idle_timeout := IdleTimeout} -> - loop_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}}); - _ -> - State0 - end, +commands([{set_options, SetOpts}|Tail], State0, Data) -> + State = maps:fold(fun + (idle_timeout, IdleTimeout, StateF=#state{opts=Opts}) -> + %% We reset the number of ticks when changing the idle_timeout option. + set_idle_timeout(StateF#state{opts=Opts#{idle_timeout => IdleTimeout}}, 0); + (max_frame_size, MaxFrameSize, StateF=#state{opts=Opts}) -> + StateF#state{opts=Opts#{max_frame_size => MaxFrameSize}}; + (_, _, StateF) -> + StateF + end, State0, SetOpts), commands(Tail, State, Data); commands([{shutdown_reason, ShutdownReason}|Tail], State, Data) -> commands(Tail, State#state{shutdown_reason=ShutdownReason}, Data); diff --git a/src/cowboy_webtransport.erl b/src/cowboy_webtransport.erl new file mode 100644 index 0000000..8c8ca39 --- /dev/null +++ b/src/cowboy_webtransport.erl @@ -0,0 +1,292 @@ +%% Copyright (c) Loïc Hoguin <[email protected]> +%% +%% Permission to use, copy, modify, and/or distribute this software for any +%% purpose with or without fee is hereby granted, provided that the above +%% copyright notice and this permission notice appear in all copies. +%% +%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + +%% @todo To enable WebTransport the following options need to be set: +%% +%% QUIC: +%% - max_datagram_frame_size > 0 +%% +%% HTTP/3: +%% - SETTINGS_H3_DATAGRAM = 1 +%% - SETTINGS_ENABLE_CONNECT_PROTOCOL = 1 +%% - SETTINGS_WT_MAX_SESSIONS >= 1 + +%% Cowboy supports versions 07 through 13 of the WebTransport drafts. +%% Cowboy also has some compatibility with version 02. +%% +%% WebTransport CONNECT requests go through cowboy_stream as normal +%% and then an upgrade/switch_protocol is issued (just like Websocket). +%% After that point none of the events go through cowboy_stream except +%% the final terminate event. The request process becomes the process +%% handling all events in the WebTransport session. +%% +%% WebTransport sessions can be ended via a command, via a crash or +%% exit, via the closing of the connection (client or server inititated), +%% via the client ending the session (mirroring the command) or via +%% the client terminating the CONNECT stream. +-module(cowboy_webtransport). + +-export([upgrade/4]). +-export([upgrade/5]). + +%% cowboy_stream. +-export([info/3]). +-export([terminate/3]). + +-type stream_type() :: unidi | bidi. +-type open_stream_ref() :: any(). + +-type event() :: + {stream_open, cow_http3:stream_id(), stream_type()} | + {opened_stream_id, open_stream_ref(), cow_http3:stream_id()} | + {stream_data, cow_http3:stream_id(), cow_http:fin(), binary()} | + {datagram, binary()} | + close_initiated. + +-type commands() :: [ + {open_stream, open_stream_ref(), stream_type(), iodata()} | + {close_stream, cow_http3:stream_id(), cow_http3:wt_app_error_code()} | + {send, cow_http3:stream_id() | datagram, iodata()} | + initiate_close | + close | + {close, cow_http3:wt_app_error_code()} | + {close, cow_http3:wt_app_error_code(), iodata()} +]. +-export_type([commands/0]). + +-type call_result(State) :: {commands(), State} | {commands(), State, hibernate}. + +-callback init(Req, any()) + -> {ok | module(), Req, any()} + | {module(), Req, any(), any()} + when Req::cowboy_req:req(). + +-callback webtransport_init(State) + -> call_result(State) when State::any(). +-optional_callbacks([webtransport_init/1]). + +-callback webtransport_handle(event(), State) + -> call_result(State) when State::any(). +-optional_callbacks([webtransport_handle/2]). + +-callback webtransport_info(any(), State) + -> call_result(State) when State::any(). +-optional_callbacks([webtransport_info/2]). + +-callback terminate(any(), cowboy_req:req(), any()) -> ok. +-optional_callbacks([terminate/3]). + +-type opts() :: #{ + req_filter => fun((cowboy_req:req()) -> map()) +}. +-export_type([opts/0]). + +-record(state, { + id :: cow_http3:stream_id(), + parent :: pid(), + opts = #{} :: opts(), + handler :: module(), + hibernate = false :: boolean(), + req = #{} :: map() +}). + +%% This function mirrors a similar function for Websocket. + +-spec is_upgrade_request(cowboy_req:req()) -> boolean(). + +is_upgrade_request(#{version := Version, method := <<"CONNECT">>, protocol := Protocol}) + when Version =:= 'HTTP/3' -> + %% @todo scheme MUST BE "https" + <<"webtransport">> =:= cowboy_bstr:to_lower(Protocol); + +is_upgrade_request(_) -> + false. + +%% Stream process. + +-spec upgrade(Req, Env, module(), any()) + -> {ok, Req, Env} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). + +upgrade(Req, Env, Handler, HandlerState) -> + upgrade(Req, Env, Handler, HandlerState, #{}). + +-spec upgrade(Req, Env, module(), any(), opts()) + -> {ok, Req, Env} + when Req::cowboy_req:req(), Env::cowboy_middleware:env(). + +%% @todo Immediately crash if a response has already been sent. +upgrade(Req=#{version := 'HTTP/3', pid := Pid, streamid := StreamID}, Env, Handler, HandlerState, Opts) -> + FilteredReq = case maps:get(req_filter, Opts, undefined) of + undefined -> maps:with([method, version, scheme, host, port, path, qs, peer], Req); + FilterFun -> FilterFun(Req) + end, + State = #state{id=StreamID, parent=Pid, opts=Opts, handler=Handler, req=FilteredReq}, + %% @todo Must ensure the relevant settings are enabled (QUIC and H3). + %% Either we check them BEFORE, or we check them when the handler + %% is OK to initiate a webtransport session. Probably need to + %% check them BEFORE as we need to become (takeover) the webtransport process + %% after we are done with the upgrade. Maybe in cow_http3_machine but + %% it doesn't have QUIC settings currently (max_datagram_size). + case is_upgrade_request(Req) of + true -> + Headers = cowboy_req:response_headers(#{}, Req), + Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, + #{session_pid => self()}}}, + webtransport_init(State, HandlerState); + %% Use 501 Not Implemented to mirror the recommendation in + %% by RFC9220 3 (WebSockets Upgrade over HTTP/3). + false -> + %% @todo I don't think terminate will be called. + {ok, cowboy_req:reply(501, Req), Env} + end. + +webtransport_init(State=#state{handler=Handler}, HandlerState) -> + case erlang:function_exported(Handler, webtransport_init, 1) of + true -> handler_call(State, HandlerState, webtransport_init, undefined); + false -> before_loop(State, HandlerState) + end. + +before_loop(State=#state{hibernate=true}, HandlerState) -> + proc_lib:hibernate(?MODULE, loop, [State#state{hibernate=false}, HandlerState]); +before_loop(State, HandlerState) -> + loop(State, HandlerState). + +-spec loop(#state{}, any()) -> no_return(). + +loop(State=#state{id=SessionID, parent=Parent}, HandlerState) -> + receive + {'$webtransport_event', SessionID, Event={closed, _, _}} -> + terminate_proc(State, HandlerState, Event); + {'$webtransport_event', SessionID, Event=closed_abruptly} -> + terminate_proc(State, HandlerState, Event); + {'$webtransport_event', SessionID, Event} -> + handler_call(State, HandlerState, webtransport_handle, Event); + %% Timeouts. +%% @todo idle_timeout +% {timeout, TRef, ?MODULE} -> +% tick_idle_timeout(State, HandlerState, ParseState); +% {timeout, OlderTRef, ?MODULE} when is_reference(OlderTRef) -> +% before_loop(State, HandlerState, ParseState); + %% System messages. + {'EXIT', Parent, Reason} -> + %% @todo We should exit gracefully. + exit(Reason); + {system, From, Request} -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, [], + {State, HandlerState}); + %% Calls from supervisor module. + {'$gen_call', From, Call} -> + cowboy_children:handle_supervisor_call(Call, From, [], ?MODULE), + before_loop(State, HandlerState); + Message -> + handler_call(State, HandlerState, webtransport_info, Message) + end. + +handler_call(State=#state{handler=Handler}, HandlerState, Callback, Message) -> + try case Callback of + webtransport_init -> Handler:webtransport_init(HandlerState); + _ -> Handler:Callback(Message, HandlerState) + end of + {Commands, HandlerState2} when is_list(Commands) -> + handler_call_result(State, HandlerState2, Commands); + {Commands, HandlerState2, hibernate} when is_list(Commands) -> + handler_call_result(State#state{hibernate=true}, HandlerState2, Commands) + catch Class:Reason:Stacktrace -> + %% @todo Do we need to send a close? Let cowboy_http3 detect and handle it? + handler_terminate(State, HandlerState, {crash, Class, Reason}), + erlang:raise(Class, Reason, Stacktrace) + end. + +handler_call_result(State0, HandlerState, Commands) -> + case commands(Commands, State0, ok, []) of + {ok, State} -> + before_loop(State, HandlerState); + {stop, State} -> + terminate_proc(State, HandlerState, stop) + end. + +%% We accumulate the commands that must be sent to the connection process +%% because we want to send everything into one message. Other commands are +%% processed immediately. + +commands([], State, Res, []) -> + {Res, State}; +commands([], State=#state{id=SessionID, parent=Pid}, Res, Commands) -> + Pid ! {'$webtransport_commands', SessionID, lists:reverse(Commands)}, + {Res, State}; +%% {open_stream, OpenStreamRef, StreamType, InitialData}. +commands([Command={open_stream, _, _, _}|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% {close_stream, StreamID, Code}. +commands([Command={close_stream, _, _}|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% @todo We must reject send to a remote unidi stream. +%% {send, StreamID | datagram, Data}. +commands([Command={send, _, _}|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% {send, StreamID, IsFin, Data}. +commands([Command={send, _, _, _}|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% initiate_close - DRAIN_WT_SESSION +commands([Command=initiate_close|Tail], State, Res, Acc) -> + commands(Tail, State, Res, [Command|Acc]); +%% close | {close, Code} | {close, Code, Msg} - CLOSE_WT_SESSION +%% @todo At this point the handler must not issue stream or send commands. +commands([Command=close|Tail], State, _, Acc) -> + commands(Tail, State, stop, [Command|Acc]); +commands([Command={close, _}|Tail], State, _, Acc) -> + commands(Tail, State, stop, [Command|Acc]); +commands([Command={close, _, _}|Tail], State, _, Acc) -> + commands(Tail, State, stop, [Command|Acc]). +%% @todo A set_options command could be useful to increase the number of allowed streams +%% or other forms of flow control. Alternatively a flow command. Or both. +%% @todo A shutdown_reason command could be useful for the same reasons as Websocekt. + +-spec terminate_proc(_, _, _) -> no_return(). + +terminate_proc(State, HandlerState, Reason) -> + handler_terminate(State, HandlerState, Reason), + %% @todo This is what should be done if shutdown_reason gets implemented. +% case Shutdown of +% normal -> exit(normal); +% _ -> exit({shutdown, Shutdown}) +% end. + exit(normal). + +handler_terminate(#state{handler=Handler, req=Req}, HandlerState, Reason) -> + cowboy_handler:terminate(Reason, Req, HandlerState, Handler). + +%% cowboy_stream callbacks. +%% +%% We shortcut stream handlers but still need to process some events +%% such as process exiting or termination. We implement the relevant +%% callbacks here. Note that as far as WebTransport is concerned, +%% receiving stream data here would be an error therefore the data +%% callback is not implemented. +%% +%% @todo Better type than map() for the cowboy_stream state. + +-spec info(cowboy_stream:streamid(), any(), State) + -> {cowboy_stream:commands(), State} when State::map(). + +info(StreamID, Msg, WTState=#{stream_state := StreamState0}) -> + {Commands, StreamState} = cowboy_stream:info(StreamID, Msg, StreamState0), + {Commands, WTState#{stream_state => StreamState}}. + +-spec terminate(cowboy_stream:streamid(), cowboy_stream:reason(), map()) + -> any(). + +terminate(StreamID, Reason, #{stream_state := StreamState}) -> + cowboy_stream:terminate(StreamID, Reason, StreamState). |