diff options
Diffstat (limited to 'src/cowboy_http2.erl')
-rw-r--r-- | src/cowboy_http2.erl | 441 |
1 files changed, 326 insertions, 115 deletions
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 8dc8c3b..2e73d5f 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2015-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) 2015-2024, Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -31,6 +31,8 @@ connection_window_update_threshold => 0..16#7fffffff, enable_connect_protocol => boolean(), env => cowboy_middleware:env(), + goaway_initial_timeout => timeout(), + goaway_complete_timeout => timeout(), idle_timeout => timeout(), inactivity_timeout => timeout(), initial_connection_window_size => 65535..16#7fffffff, @@ -42,10 +44,12 @@ max_connection_window_size => 0..16#7fffffff, max_decode_table_size => non_neg_integer(), max_encode_table_size => non_neg_integer(), + max_fragmented_header_block_size => 16384..16#7fffffff, max_frame_size_received => 16384..16777215, max_frame_size_sent => 16384..16777215 | infinity, max_received_frame_rate => {pos_integer(), timeout()}, max_reset_stream_rate => {pos_integer(), timeout()}, + max_cancel_stream_rate => {pos_integer(), timeout()}, max_stream_buffer_size => non_neg_integer(), max_stream_window_size => 0..16#7fffffff, metrics_callback => cowboy_metrics_h:metrics_callback(), @@ -54,6 +58,7 @@ middlewares => [module()], preface_timeout => timeout(), proxy_header => boolean(), + reset_idle_timeout_on_send => boolean(), sendfile => boolean(), settings_timeout => timeout(), shutdown_timeout => timeout(), @@ -88,7 +93,7 @@ proxy_header :: undefined | ranch_proxy_header:proxy_info(), opts = #{} :: opts(), - %% Timer for idle_timeout. + %% Timer for idle_timeout; also used for goaway timers. timer = undefined :: undefined | reference(), %% Remote address and port for the connection. @@ -101,7 +106,7 @@ cert :: undefined | binary(), %% HTTP/2 state machine. - http2_status :: sequence | settings | upgrade | connected | closing, + http2_status :: sequence | settings | upgrade | connected | closing_initiated | closing, http2_machine :: cow_http2_machine:http2_machine(), %% HTTP/2 frame rate flood protection. @@ -112,6 +117,10 @@ reset_rate_num :: undefined | pos_integer(), reset_rate_time :: undefined | integer(), + %% HTTP/2 rapid reset attack protection. + cancel_rate_num :: undefined | pos_integer(), + cancel_rate_time :: undefined | integer(), + %% Flow requested for all streams. flow = 0 :: non_neg_integer(), @@ -127,9 +136,11 @@ -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> ok. init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> - Peer0 = Transport:peername(Socket), - Sock0 = Transport:sockname(Socket), - Cert1 = case Transport:name() of + {ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket), + 'A socket error occurred when retrieving the peer name.'), + {ok, Sock} = maybe_socket_error(undefined, Transport:sockname(Socket), + 'A socket error occurred when retrieving the sock name.'), + CertResult = case Transport:name() of ssl -> case ssl:peercert(Socket) of {error, no_peercert} -> @@ -140,19 +151,9 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> _ -> {ok, undefined} end, - case {Peer0, Sock0, Cert1} of - {{ok, Peer}, {ok, Sock}, {ok, Cert}} -> - init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, <<>>); - {{error, Reason}, _, _} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the peer name.'}); - {_, {error, Reason}, _} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the sock name.'}); - {_, _, {error, Reason}} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the client TLS certificate.'}) - end. + {ok, Cert} = maybe_socket_error(undefined, CertResult, + 'A socket error occurred when retrieving the client TLS certificate.'), + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, <<>>). -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts(), @@ -160,26 +161,42 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> binary() | undefined, binary()) -> ok. init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) -> {ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts), - State = set_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket, + %% Send the preface before doing all the init in case we get a socket error. + ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)), + State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, http2_status=sequence, http2_machine=HTTP2Machine})), - Transport:send(Socket, Preface), - setopts_active(State), + safe_setopts_active(State), case Buffer of <<>> -> loop(State, Buffer); _ -> parse(State, Buffer) end. -init_rate_limiting(State=#state{opts=Opts}) -> +init_rate_limiting(State0) -> + CurrentTime = erlang:monotonic_time(millisecond), + State1 = init_frame_rate_limiting(State0, CurrentTime), + State2 = init_reset_rate_limiting(State1, CurrentTime), + init_cancel_rate_limiting(State2, CurrentTime). + +init_frame_rate_limiting(State=#state{opts=Opts}, CurrentTime) -> {FrameRateNum, FrameRatePeriod} = maps:get(max_received_frame_rate, Opts, {10000, 10000}), + State#state{ + frame_rate_num=FrameRateNum, frame_rate_time=add_period(CurrentTime, FrameRatePeriod) + }. + +init_reset_rate_limiting(State=#state{opts=Opts}, CurrentTime) -> {ResetRateNum, ResetRatePeriod} = maps:get(max_reset_stream_rate, Opts, {10, 10000}), - CurrentTime = erlang:monotonic_time(millisecond), State#state{ - frame_rate_num=FrameRateNum, frame_rate_time=add_period(CurrentTime, FrameRatePeriod), reset_rate_num=ResetRateNum, reset_rate_time=add_period(CurrentTime, ResetRatePeriod) }. +init_cancel_rate_limiting(State=#state{opts=Opts}, CurrentTime) -> + {CancelRateNum, CancelRatePeriod} = maps:get(max_cancel_stream_rate, Opts, {500, 10000}), + State#state{ + cancel_rate_num=CancelRateNum, cancel_rate_time=add_period(CurrentTime, CancelRatePeriod) + }. + add_period(_, infinity) -> infinity; add_period(Time, Period) -> Time + Period. @@ -205,9 +222,11 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer <<"connection">> => <<"Upgrade">>, <<"upgrade">> => <<"h2c">> }, ?MODULE, undefined}), %% @todo undefined or #{}? - State = set_timeout(init_rate_limiting(State2#state{http2_status=sequence})), - Transport:send(Socket, Preface), - setopts_active(State), + State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence})), + %% In the case of HTTP/1.1 Upgrade we cannot send the Preface + %% until we send the 101 response. + ok = maybe_socket_error(State, Transport:send(Socket, Preface)), + safe_setopts_active(State), case Buffer of <<>> -> loop(State, Buffer); _ -> parse(State, Buffer) @@ -220,6 +239,9 @@ setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> N = maps:get(active_n, Opts, 100), Transport:setopts(Socket, [{active, N}]). +safe_setopts_active(State) -> + ok = maybe_socket_error(State, setopts_active(State)). + loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, timer=TimerRef, children=Children}, Buffer) -> Messages = Transport:messages(), @@ -227,19 +249,25 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, receive %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(set_timeout(State), << Buffer/binary, Data/binary >>); + parse(set_idle_timeout(State), << Buffer/binary, Data/binary >>); {Closed, Socket} when Closed =:= element(2, Messages) -> - terminate(State, {socket_error, closed, 'The socket has been closed.'}); + Reason = case State#state.http2_status of + closing -> {stop, closed, 'The client is going away.'}; + _ -> {socket_error, closed, 'The socket has been closed.'} + end, + terminate(State, Reason); {Error, Socket, Reason} when Error =:= element(3, Messages) -> terminate(State, {socket_error, Reason, 'An error has occurred on the socket.'}); {Passive, Socket} when Passive =:= element(4, Messages); %% Hardcoded for compatibility with Ranch 1.x. Passive =:= tcp_passive; Passive =:= ssl_passive -> - setopts_active(State), + safe_setopts_active(State), loop(State, Buffer); %% System messages. + {'EXIT', Parent, shutdown} -> + Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'}, + loop(initiate_closing(State, Reason), Buffer); {'EXIT', Parent, Reason} -> - %% @todo Graceful shutdown here as well? terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'}); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer}); @@ -252,6 +280,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, loop(State, Buffer); {timeout, TRef, {cow_http2_machine, Name}} -> loop(timeout(State, Name, TRef), Buffer); + {timeout, TimerRef, {goaway_initial_timeout, Reason}} -> + loop(closing(State, Reason), Buffer); + {timeout, TimerRef, {goaway_complete_timeout, Reason}} -> + terminate(State, {stop, stop_reason(Reason), + 'Graceful shutdown timed out.'}); %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> loop(info(State, StreamID, Msg), Buffer); @@ -269,17 +302,32 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, terminate(State, {internal_error, timeout, 'No message or data received before timeout.'}) end. -set_timeout(State=#state{opts=Opts, timer=TimerRef0}) -> +set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef}) + when Status =:= closing_initiated orelse Status =:= closing, + TimerRef =/= undefined -> + State; +set_idle_timeout(State=#state{opts=Opts}) -> + set_timeout(State, maps:get(idle_timeout, Opts, 60000), idle_timeout). + +set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) -> ok = case TimerRef0 of undefined -> ok; _ -> erlang:cancel_timer(TimerRef0, [{async, true}, {info, false}]) end, - TimerRef = case maps:get(idle_timeout, Opts, 60000) of + TimerRef = case Timeout of infinity -> undefined; - Timeout -> erlang:start_timer(Timeout, self(), idle_timeout) + Timeout -> erlang:start_timer(Timeout, self(), Message) end, State#state{timer=TimerRef}. +maybe_reset_idle_timeout(State=#state{opts=Opts}) -> + case maps:get(reset_idle_timeout_on_send, Opts, false) of + true -> + set_idle_timeout(State); + false -> + State + end. + %% HTTP/2 protocol parsing. parse(State=#state{http2_status=sequence}, Data) -> @@ -311,7 +359,7 @@ parse(State=#state{http2_status=Status, http2_machine=HTTP2Machine, streams=Stre %% Frame rate flood protection. -frame_rate(State0=#state{opts=Opts, frame_rate_num=Num0, frame_rate_time=Time}, Frame) -> +frame_rate(State0=#state{frame_rate_num=Num0, frame_rate_time=Time}, Frame) -> {Result, State} = case Num0 - 1 of 0 -> CurrentTime = erlang:monotonic_time(millisecond), @@ -320,8 +368,7 @@ frame_rate(State0=#state{opts=Opts, frame_rate_num=Num0, frame_rate_time=Time}, {error, State0}; true -> %% When the option has a period of infinity we cannot reach this clause. - {Num, Period} = maps:get(max_received_frame_rate, Opts, {1000, 10000}), - {ok, State0#state{frame_rate_num=Num, frame_rate_time=CurrentTime + Period}} + {ok, init_frame_rate_limiting(State0, CurrentTime)} end; Num -> {ok, State0#state{frame_rate_num=Num}} @@ -357,10 +404,11 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> goaway(State#state{http2_machine=HTTP2Machine}, GoAway); {send, SendData, HTTP2Machine} -> %% We may need to send an alarm for each of the streams sending data. - lists:foldl( + State1 = lists:foldl( fun({StreamID, _, _}, S) -> maybe_send_data_alarm(S, HTTP2Machine0, StreamID) end, send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData, []), - SendData); + SendData), + maybe_reset_idle_timeout(State1); {error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} -> reset_stream(State#state{http2_machine=HTTP2Machine}, StreamID, {stream_error, Reason, Human}); @@ -372,15 +420,20 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> %% if we were still waiting for a SETTINGS frame. maybe_ack(State=#state{http2_status=settings}, Frame) -> maybe_ack(State#state{http2_status=connected}, Frame); +%% We do not reset the idle timeout on send here because we are +%% sending data as a consequence of receiving data, which means +%% we already resetted the idle timeout. maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) -> case Frame of - {settings, _} -> Transport:send(Socket, cow_http2:settings_ack()); - {ping, Opaque} -> Transport:send(Socket, cow_http2:ping_ack(Opaque)); + {settings, _} -> + ok = maybe_socket_error(State, Transport:send(Socket, cow_http2:settings_ack())); + {ping, Opaque} -> + ok = maybe_socket_error(State, Transport:send(Socket, cow_http2:ping_ack(Opaque))); _ -> ok end, State. -data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) -> +data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFin, Data) -> case Streams of #{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of @@ -389,11 +442,26 @@ data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin %% We may receive more data than we requested. We ensure %% that the flow value doesn't go lower than 0. Size = byte_size(Data), - State = update_window(State0#state{flow=max(0, Flow - Size), + Flow = max(0, Flow0 - Size), + %% We would normally update the window when changing the flow + %% value. But because we are running commands, which themselves + %% may update the window, and we want to avoid updating the + %% window twice in a row, we first run the commands and then + %% only update the window a flow command was executed. We know + %% that it was because the flow value changed in the state. + State1 = State0#state{flow=Flow, streams=Streams#{StreamID => Stream#stream{ flow=max(0, StreamFlow - Size), state=StreamState}}}, - StreamID), - commands(State, StreamID, Commands) + State = commands(State1, StreamID, Commands), + case State of + %% No flow command was executed. We must update the window + %% because we changed the flow value earlier. + #state{flow=Flow} -> + update_window(State, StreamID); + %% Otherwise the window was updated already. + _ -> + State + end catch Class:Exception:Stacktrace -> cowboy:log(cowboy_stream:make_error_log(data, [StreamID, IsFin, Data, StreamState0], @@ -542,11 +610,27 @@ rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, R {#stream{state=StreamState}, Streams} -> terminate_stream_handler(State, StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), - State#state{streams=Streams, children=Children}; + cancel_rate_limit(State#state{streams=Streams, children=Children}); error -> State end. +cancel_rate_limit(State0=#state{cancel_rate_num=Num0, cancel_rate_time=Time}) -> + case Num0 - 1 of + 0 -> + CurrentTime = erlang:monotonic_time(millisecond), + if + CurrentTime < Time -> + terminate(State0, {connection_error, enhance_your_calm, + 'Stream cancel rate larger than configuration allows. Flood? (CVE-2023-44487)'}); + true -> + %% When the option has a period of infinity we cannot reach this clause. + init_cancel_rate_limiting(State0, CurrentTime) + end; + Num -> + State0#state{cancel_rate_num=Num} + end. + ignored_frame(State=#state{http2_machine=HTTP2Machine0}) -> case cow_http2_machine:ignored_frame(HTTP2Machine0) of {ok, HTTP2Machine} -> @@ -567,18 +651,24 @@ timeout(State=#state{http2_machine=HTTP2Machine0}, Name, TRef) -> %% Erlang messages. -down(State=#state{opts=Opts, children=Children0}, Pid, Msg) -> - case cowboy_children:down(Children0, Pid) of +down(State0=#state{opts=Opts, children=Children0}, Pid, Msg) -> + State = case cowboy_children:down(Children0, Pid) of %% The stream was terminated already. {ok, undefined, Children} -> - State#state{children=Children}; + State0#state{children=Children}; %% The stream is still running. {ok, StreamID, Children} -> - info(State#state{children=Children}, StreamID, Msg); + info(State0#state{children=Children}, StreamID, Msg); %% The process was unknown. error -> cowboy:log(warning, "Received EXIT signal ~p for unknown process ~p.~n", [Msg, Pid], Opts), + State0 + end, + if + State#state.http2_status =:= closing, State#state.streams =:= #{} -> + terminate(State, {stop, normal, 'The connection is going away.'}); + true -> State end. @@ -625,23 +715,37 @@ commands(State=#state{http2_machine=HTTP2Machine}, StreamID, end; %% Send an informational response. commands(State0, StreamID, [{inform, StatusCode, Headers}|Tail]) -> - State = send_headers(State0, StreamID, idle, StatusCode, Headers), + State1 = send_headers(State0, StreamID, idle, StatusCode, Headers), + State = maybe_reset_idle_timeout(State1), commands(State, StreamID, Tail); %% Send response headers. commands(State0, StreamID, [{response, StatusCode, Headers, Body}|Tail]) -> - State = send_response(State0, StreamID, StatusCode, Headers, Body), + State1 = send_response(State0, StreamID, StatusCode, Headers, Body), + State = maybe_reset_idle_timeout(State1), commands(State, StreamID, Tail); %% Send response headers. commands(State0, StreamID, [{headers, StatusCode, Headers}|Tail]) -> - State = send_headers(State0, StreamID, nofin, StatusCode, Headers), + State1 = send_headers(State0, StreamID, nofin, StatusCode, Headers), + State = maybe_reset_idle_timeout(State1), commands(State, StreamID, Tail); %% Send a response body chunk. commands(State0, StreamID, [{data, IsFin, Data}|Tail]) -> - State = maybe_send_data(State0, StreamID, IsFin, Data, []), + State = case maybe_send_data(State0, StreamID, IsFin, Data, []) of + {data_sent, State1} -> + maybe_reset_idle_timeout(State1); + {no_data_sent, State1} -> + State1 + end, commands(State, StreamID, Tail); %% Send trailers. commands(State0, StreamID, [{trailers, Trailers}|Tail]) -> - State = maybe_send_data(State0, StreamID, fin, {trailers, maps:to_list(Trailers)}, []), + State = case maybe_send_data(State0, StreamID, fin, + {trailers, maps:to_list(Trailers)}, []) of + {data_sent, State1} -> + maybe_reset_idle_timeout(State1); + {no_data_sent, State1} -> + State1 + end, commands(State, StreamID, Tail); %% Send a push promise. %% @@ -673,10 +777,11 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP2Machine0, PseudoHeaders, Headers) of {ok, PromisedStreamID, HeaderBlock, HTTP2Machine} -> - Transport:send(Socket, cow_http2:push_promise( - StreamID, PromisedStreamID, HeaderBlock)), - headers_frame(State0#state{http2_machine=HTTP2Machine}, - PromisedStreamID, fin, Headers, PseudoHeaders, 0); + State1 = State0#state{http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State1, Transport:send(Socket, + cow_http2:push_promise(StreamID, PromisedStreamID, HeaderBlock))), + State2 = maybe_reset_idle_timeout(State1), + headers_frame(State2, PromisedStreamID, fin, Headers, PseudoHeaders, 0); {error, no_push} -> State0 end, @@ -699,10 +804,14 @@ commands(State, StreamID, [Error = {internal_error, _, _}|_Tail]) -> %% @todo Only reset when the stream still exists. reset_stream(State, StreamID, Error); %% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself. +%% +%% We do not need to reset the idle timeout on send because it +%% hasn't been set yet. This is called from init/12. commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade}, StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) -> %% @todo This 101 response needs to be passed through stream handlers. - Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))), + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers)))), commands(State, StreamID, Tail); %% Use a different protocol within the stream (CONNECT :protocol). %% @todo Make sure we error out when the feature is disabled. @@ -723,22 +832,32 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) -> %% Tentatively update the window after the flow was updated. -update_window(State=#state{socket=Socket, transport=Transport, +update_window(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) -> - #{StreamID := #stream{flow=StreamFlow}} = Streams, {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of ok -> {<<>>, HTTP2Machine0}; {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1} end, - {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of - ok -> {<<>>, HTTP2Machine2}; - {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} + {Data2, HTTP2Machine} = case Streams of + #{StreamID := #stream{flow=StreamFlow}} -> + case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of + ok -> + {<<>>, HTTP2Machine2}; + {ok, Increment2, HTTP2Machine3} -> + {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} + end; + _ -> + %% Don't update the stream's window if it stopped. + {<<>>, HTTP2Machine2} end, + State = State0#state{http2_machine=HTTP2Machine}, case {Data1, Data2} of - {<<>>, <<>>} -> ok; - _ -> Transport:send(Socket, [Data1, Data2]) - end, - State#state{http2_machine=HTTP2Machine}. + {<<>>, <<>>} -> + State; + _ -> + ok = maybe_socket_error(State, Transport:send(Socket, [Data1, Data2])), + maybe_reset_idle_timeout(State) + end. %% Send the response, trailers or data. @@ -758,18 +877,21 @@ send_response(State0=#state{http2_machine=HTTP2Machine0}, StreamID, StatusCode, = cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, nofin, #{status => cow_http:status_to_integer(StatusCode)}, headers_to_list(Headers)), - maybe_send_data(State0#state{http2_machine=HTTP2Machine}, StreamID, fin, Body, - [cow_http2:headers(StreamID, nofin, HeaderBlock)]) + {_, State} = maybe_send_data(State0#state{http2_machine=HTTP2Machine}, + StreamID, fin, Body, [cow_http2:headers(StreamID, nofin, HeaderBlock)]), + State end. -send_headers(State=#state{socket=Socket, transport=Transport, +send_headers(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, IsFin0, StatusCode, Headers) -> {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, IsFin0, #{status => cow_http:status_to_integer(StatusCode)}, headers_to_list(Headers)), - Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - State#state{http2_machine=HTTP2Machine}. + State = State0#state{http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:headers(StreamID, IsFin, HeaderBlock))), + State. %% The set-cookie header is special; we can only send one cookie per header. headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) -> @@ -786,13 +908,18 @@ maybe_send_data(State0=#state{socket=Socket, transport=Transport, end, case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of {ok, HTTP2Machine} -> + State1 = State0#state{http2_machine=HTTP2Machine}, %% If we have prefix data (like a HEADERS frame) we need to send it %% even if we do not send any DATA frames. - case Prefix of - [] -> ok; - _ -> Transport:send(Socket, Prefix) + WasDataSent = case Prefix of + [] -> + no_data_sent; + _ -> + ok = maybe_socket_error(State1, Transport:send(Socket, Prefix)), + data_sent end, - maybe_send_data_alarm(State0#state{http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID); + State = maybe_send_data_alarm(State1, HTTP2Machine0, StreamID), + {WasDataSent, State}; {send, SendData, HTTP2Machine} -> State = #state{http2_status=Status, streams=Streams} = send_data(State0#state{http2_machine=HTTP2Machine}, SendData, Prefix), @@ -801,7 +928,7 @@ maybe_send_data(State0=#state{socket=Socket, transport=Transport, Status =:= closing, Streams =:= #{} -> terminate(State, {stop, normal, 'The connection is going away.'}); true -> - maybe_send_data_alarm(State, HTTP2Machine0, StreamID) + {data_sent, maybe_send_data_alarm(State, HTTP2Machine0, StreamID)} end end. @@ -810,14 +937,23 @@ send_data(State0=#state{socket=Socket, transport=Transport, opts=Opts}, SendData _ = [case Data of {sendfile, Offset, Bytes, Path} -> %% When sendfile is disabled we explicitly use the fallback. - _ = case maps:get(sendfile, Opts, true) of - true -> Transport:sendfile(Socket, Path, Offset, Bytes); - false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) - end; + {ok, _} = maybe_socket_error(State, + case maps:get(sendfile, Opts, true) of + true -> Transport:sendfile(Socket, Path, Offset, Bytes); + false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) + end + ), + ok; _ -> - Transport:send(Socket, Data) + ok = maybe_socket_error(State, Transport:send(Socket, Data)) end || Data <- Acc], - State. + send_data_terminate(State, SendData). + +send_data_terminate(State, []) -> + State; +send_data_terminate(State0, [{StreamID, IsFin, _}|Tail]) -> + State = maybe_terminate_stream(State0, StreamID, IsFin), + send_data_terminate(State, Tail). prepare_data(State, [], Acc, []) -> {lists:reverse(Acc), State}; @@ -827,8 +963,7 @@ prepare_data(State0, [{StreamID, IsFin, SendData}|Tail], Acc0, Buffer0) -> {Acc, Buffer, State} = prepare_data(State0, StreamID, IsFin, SendData, Acc0, Buffer0), prepare_data(State, Tail, Acc, Buffer). -prepare_data(State0, StreamID, IsFin, [], Acc, Buffer) -> - State = maybe_terminate_stream(State0, StreamID, IsFin), +prepare_data(State, _, _, [], Acc, Buffer) -> {Acc, Buffer, State}; prepare_data(State0, StreamID, IsFin, [FrameData|Tail], Acc, Buffer) -> FrameIsFin = case Tail of @@ -909,20 +1044,26 @@ stream_alarm(State, StreamID, Name, Value) -> %% We may have to cancel streams even if we receive multiple %% GOAWAY frames as the LastStreamID value may be lower than %% the one previously received. -goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, +%% +%% We do not reset the idle timeout on send here. We already +%% disabled it if we initiated shutdown; and we already reset +%% it if the client sent a GOAWAY frame. +goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0, http2_status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) - when Status =:= connected; Status =:= closing -> + when Status =:= connected; Status =:= closing_initiated; Status =:= closing -> Streams = goaway_streams(State0, maps:to_list(Streams0), LastStreamID, {stop, {goaway, Reason}, 'The connection is going away.'}, []), - State = State0#state{streams=maps:from_list(Streams)}, - case Status of - connected -> - Transport:send(Socket, cow_http2:goaway( - cow_http2_machine:get_last_streamid(HTTP2Machine), - no_error, <<>>)), - State#state{http2_status=closing}; - _ -> - State + State1 = State0#state{streams=maps:from_list(Streams)}, + if + Status =:= connected; Status =:= closing_initiated -> + {OurLastStreamID, HTTP2Machine} = + cow_http2_machine:set_last_streamid(HTTP2Machine0), + State = State1#state{http2_status=closing, http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:goaway(OurLastStreamID, no_error, <<>>))), + State; + true -> + State1 end; %% We terminate the connection immediately if it hasn't fully been initialized. goaway(State, {goaway, _, Reason, _}) -> @@ -938,29 +1079,91 @@ goaway_streams(State, [{StreamID, #stream{state=StreamState}}|Tail], LastStreamI goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) -> goaway_streams(State, Tail, LastStreamID, Reason, [Stream|Acc]). --spec terminate(#state{}, _) -> no_return(). +%% A server that is attempting to gracefully shut down a connection SHOULD send +%% an initial GOAWAY frame with the last stream identifier set to 2^31-1 and a +%% NO_ERROR code. This signals to the client that a shutdown is imminent and +%% that initiating further requests is prohibited. After allowing time for any +%% in-flight stream creation (at least one round-trip time), the server can send +%% another GOAWAY frame with an updated last stream identifier. This ensures +%% that a connection can be cleanly shut down without losing requests. +-spec initiate_closing(#state{}, _) -> #state{}. +initiate_closing(State=#state{http2_status=connected, socket=Socket, + transport=Transport, opts=Opts}, Reason) -> + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:goaway(16#7fffffff, no_error, <<>>))), + Timeout = maps:get(goaway_initial_timeout, Opts, 1000), + Message = {goaway_initial_timeout, Reason}, + set_timeout(State#state{http2_status=closing_initiated}, Timeout, Message); +initiate_closing(State=#state{http2_status=Status}, _Reason) + when Status =:= closing_initiated; Status =:= closing -> + %% This happens if sys:terminate/2,3 is called twice or if the supervisor + %% tells us to shutdown after sys:terminate/2,3 is called or vice versa. + State; +initiate_closing(State, Reason) -> + terminate(State, {stop, stop_reason(Reason), 'The connection is going away.'}). + +%% Switch to 'closing' state and stop accepting new streams. +-spec closing(#state{}, Reason :: term()) -> #state{}. +closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} -> + terminate(State, Reason); +closing(State0=#state{http2_status=closing_initiated, + http2_machine=HTTP2Machine0, socket=Socket, transport=Transport}, + Reason) -> + %% Stop accepting new streams. + {LastStreamID, HTTP2Machine} = + cow_http2_machine:set_last_streamid(HTTP2Machine0), + State = State0#state{http2_status=closing, http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:goaway(LastStreamID, no_error, <<>>))), + closing(State, Reason); +closing(State=#state{http2_status=closing, opts=Opts}, Reason) -> + %% If client sent GOAWAY, we may already be in 'closing' but without the + %% goaway complete timeout set. + Timeout = maps:get(goaway_complete_timeout, Opts, 3000), + Message = {goaway_complete_timeout, Reason}, + set_timeout(State, Timeout, Message). + +stop_reason({stop, Reason, _}) -> Reason; +stop_reason(Reason) -> Reason. + +%% Function copied from cowboy_http. +maybe_socket_error(State, {error, closed}) -> + terminate(State, {socket_error, closed, 'The socket has been closed.'}); +maybe_socket_error(State, Reason) -> + maybe_socket_error(State, Reason, 'An error has occurred on the socket.'). + +maybe_socket_error(_, Result = ok, _) -> + Result; +maybe_socket_error(_, Result = {ok, _}, _) -> + Result; +maybe_socket_error(State, {error, Reason}, Human) -> + terminate(State, {socket_error, Reason, Human}). + +-spec terminate(#state{} | undefined, _) -> no_return(). terminate(undefined, Reason) -> exit({shutdown, Reason}); terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, http2_machine=HTTP2Machine, streams=Streams, children=Children}, Reason) - when Status =:= connected; Status =:= closing -> + when Status =:= connected; Status =:= closing_initiated; Status =:= closing -> %% @todo We might want to optionally send the Reason value %% as debug data in the GOAWAY frame here. Perhaps more. - case Status of - connected -> - Transport:send(Socket, cow_http2:goaway( + if + Status =:= connected; Status =:= closing_initiated -> + %% We are terminating so it's OK if we can't send the GOAWAY anymore. + _ = Transport:send(Socket, cow_http2:goaway( cow_http2_machine:get_last_streamid(HTTP2Machine), terminate_reason(Reason), <<>>)); %% We already sent the GOAWAY frame. - closing -> + Status =:= closing -> ok end, terminate_all_streams(State, maps:to_list(Streams), Reason), cowboy_children:terminate(Children), + %% @todo Don't linger on connection errors. terminate_linger(State), exit({shutdown, Reason}); -terminate(#state{socket=Socket, transport=Transport}, Reason) -> - Transport:close(Socket), +%% We are not fully connected so we can just terminate the connection. +terminate(_State, Reason) -> exit({shutdown, Reason}). terminate_reason({connection_error, Reason, _}) -> Reason; @@ -994,6 +1197,9 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) -> terminate_linger_before_loop(State, TimerRef, Messages) -> %% We may already be in active mode when we do this %% but it's OK because we are shutting down anyway. + %% + %% We specially handle the socket error to terminate + %% when an error occurs. case setopts_active(State) of ok -> terminate_linger_loop(State, TimerRef, Messages); @@ -1018,13 +1224,18 @@ terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) -> end. %% @todo Don't send an RST_STREAM if one was already sent. +%% +%% When resetting the stream we are technically sending data +%% on the socket. However due to implementation complexities +%% we do not attempt to reset the idle timeout on send. reset_stream(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, Error) -> Reason = case Error of {internal_error, _, _} -> internal_error; {stream_error, Reason0, _} -> Reason0 end, - Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), + ok = maybe_socket_error(State0, Transport:send(Socket, + cow_http2:rst_stream(StreamID, Reason))), State1 = case cow_http2_machine:reset_stream(StreamID, HTTP2Machine0) of {ok, HTTP2Machine} -> terminate_stream(State0#state{http2_machine=HTTP2Machine}, StreamID, Error); @@ -1039,7 +1250,7 @@ reset_stream(State0=#state{socket=Socket, transport=Transport, 'Stream reset rate larger than configuration allows. Flood? (CVE-2019-9514)'}) end. -reset_rate(State0=#state{opts=Opts, reset_rate_num=Num0, reset_rate_time=Time}) -> +reset_rate(State0=#state{reset_rate_num=Num0, reset_rate_time=Time}) -> case Num0 - 1 of 0 -> CurrentTime = erlang:monotonic_time(millisecond), @@ -1048,8 +1259,7 @@ reset_rate(State0=#state{opts=Opts, reset_rate_num=Num0, reset_rate_time=Time}) error; true -> %% When the option has a period of infinity we cannot reach this clause. - {Num, Period} = maps:get(max_reset_stream_rate, Opts, {10, 10000}), - {ok, State0#state{reset_rate_num=Num, reset_rate_time=CurrentTime + Period}} + {ok, init_reset_rate_limiting(State0, CurrentTime)} end; Num -> {ok, State0#state{reset_rate_num=Num}} @@ -1097,7 +1307,8 @@ terminate_stream(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID) -> State = case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine0) of {ok, fin, _} -> - Transport:send(Socket, cow_http2:rst_stream(StreamID, no_error)), + ok = maybe_socket_error(State0, Transport:send(Socket, + cow_http2:rst_stream(StreamID, no_error))), {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), State0#state{http2_machine=HTTP2Machine}; {error, closed} -> @@ -1134,9 +1345,9 @@ system_continue(_, _, {State, Buffer}) -> loop(State, Buffer). -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). -system_terminate(Reason, _, _, {State, _}) -> - %% @todo Graceful shutdown here as well? - terminate(State, {stop, {exit, Reason}, 'sys:terminate/2,3 was called.'}). +system_terminate(Reason0, _, _, {State, Buffer}) -> + Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'}, + loop(initiate_closing(State, Reason), Buffer). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. system_code_change(Misc, _, _, _) -> |