diff options
Diffstat (limited to 'src/cowboy_http2.erl')
-rw-r--r-- | src/cowboy_http2.erl | 410 |
1 files changed, 299 insertions, 111 deletions
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 7440d91..0d22fa1 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2015-2017, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -17,6 +17,7 @@ -export([init/6]). -export([init/10]). -export([init/12]). +-export([loop/2]). -export([system_continue/3]). -export([system_terminate/4]). @@ -24,15 +25,20 @@ -type opts() :: #{ active_n => pos_integer(), + alpn_default_protocol => http | http2, compress_buffering => boolean(), compress_threshold => non_neg_integer(), connection_type => worker | supervisor, connection_window_margin_size => 0..16#7fffffff, connection_window_update_threshold => 0..16#7fffffff, + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), enable_connect_protocol => boolean(), env => cowboy_middleware:env(), goaway_initial_timeout => timeout(), goaway_complete_timeout => timeout(), + hibernate => boolean(), idle_timeout => timeout(), inactivity_timeout => timeout(), initial_connection_window_size => 65535..16#7fffffff, @@ -44,10 +50,12 @@ max_connection_window_size => 0..16#7fffffff, max_decode_table_size => non_neg_integer(), max_encode_table_size => non_neg_integer(), + max_fragmented_header_block_size => 16384..16#7fffffff, max_frame_size_received => 16384..16777215, max_frame_size_sent => 16384..16777215 | infinity, max_received_frame_rate => {pos_integer(), timeout()}, max_reset_stream_rate => {pos_integer(), timeout()}, + max_cancel_stream_rate => {pos_integer(), timeout()}, max_stream_buffer_size => non_neg_integer(), max_stream_window_size => 0..16#7fffffff, metrics_callback => cowboy_metrics_h:metrics_callback(), @@ -55,7 +63,9 @@ metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()), middlewares => [module()], preface_timeout => timeout(), + protocols => [http | http2], proxy_header => boolean(), + reset_idle_timeout_on_send => boolean(), sendfile => boolean(), settings_timeout => timeout(), shutdown_timeout => timeout(), @@ -82,6 +92,14 @@ state :: {module, any()} }). +%% We don't want to reset the idle timeout too often, +%% so we don't reset it on data. Instead we reset the +%% number of ticks we have observed. We divide the +%% timeout value by a value and that value becomes +%% the number of ticks at which point we can drop +%% the connection. This value is the number of ticks. +-define(IDLE_TIMEOUT_TICKS, 10). + -record(state, { parent = undefined :: pid(), ref :: ranch:ref(), @@ -92,6 +110,7 @@ %% Timer for idle_timeout; also used for goaway timers. timer = undefined :: undefined | reference(), + idle_timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS, %% Remote address and port for the connection. peer = undefined :: {inet:ip_address(), inet:port_number()}, @@ -114,9 +133,17 @@ reset_rate_num :: undefined | pos_integer(), reset_rate_time :: undefined | integer(), + %% HTTP/2 rapid reset attack protection. + cancel_rate_num :: undefined | pos_integer(), + cancel_rate_time :: undefined | integer(), + %% Flow requested for all streams. flow = 0 :: non_neg_integer(), + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size :: pos_integer() | false, + dynamic_buffer_moving_average :: non_neg_integer(), + %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. streams = #{} :: #{cow_http2:streamid() => #stream{}}, @@ -127,11 +154,14 @@ }). -spec init(pid(), ranch:ref(), inet:socket(), module(), - ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> ok. + ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> - Peer0 = Transport:peername(Socket), - Sock0 = Transport:sockname(Socket), - Cert1 = case Transport:name() of + {ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket), + 'A socket error occurred when retrieving the peer name.'), + {ok, Sock} = maybe_socket_error(undefined, Transport:sockname(Socket), + 'A socket error occurred when retrieving the sock name.'), + CertResult = case Transport:name() of ssl -> case ssl:peercert(Socket) of {error, no_peercert} -> @@ -142,40 +172,37 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> _ -> {ok, undefined} end, - case {Peer0, Sock0, Cert1} of - {{ok, Peer}, {ok, Sock}, {ok, Cert}} -> - init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, <<>>); - {{error, Reason}, _, _} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the peer name.'}); - {_, {error, Reason}, _} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the sock name.'}); - {_, _, {error, Reason}} -> - terminate(undefined, {socket_error, Reason, - 'A socket error occurred when retrieving the client TLS certificate.'}) - end. + {ok, Cert} = maybe_socket_error(undefined, CertResult, + 'A socket error occurred when retrieving the client TLS certificate.'), + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, <<>>). -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts(), {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()}, - binary() | undefined, binary()) -> ok. + binary() | undefined, binary()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) -> + DynamicBuffer = init_dynamic_buffer_size(Opts), {ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts), + %% Send the preface before doing all the init in case we get a socket error. + ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)), State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, - http2_status=sequence, http2_machine=HTTP2Machine})), - Transport:send(Socket, Preface), - setopts_active(State), + dynamic_buffer_size=DynamicBuffer, + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), + http2_status=sequence, http2_machine=HTTP2Machine}), 0), + safe_setopts_active(State), case Buffer of - <<>> -> loop(State, Buffer); + <<>> -> before_loop(State, Buffer); _ -> parse(State, Buffer) end. -init_rate_limiting(State) -> +init_rate_limiting(State0) -> CurrentTime = erlang:monotonic_time(millisecond), - init_reset_rate_limiting(init_frame_rate_limiting(State, CurrentTime), CurrentTime). + State1 = init_frame_rate_limiting(State0, CurrentTime), + State2 = init_reset_rate_limiting(State1, CurrentTime), + init_cancel_rate_limiting(State2, CurrentTime). init_frame_rate_limiting(State=#state{opts=Opts}, CurrentTime) -> {FrameRateNum, FrameRatePeriod} = maps:get(max_received_frame_rate, Opts, {10000, 10000}), @@ -189,6 +216,12 @@ init_reset_rate_limiting(State=#state{opts=Opts}, CurrentTime) -> reset_rate_num=ResetRateNum, reset_rate_time=add_period(CurrentTime, ResetRatePeriod) }. +init_cancel_rate_limiting(State=#state{opts=Opts}, CurrentTime) -> + {CancelRateNum, CancelRatePeriod} = maps:get(max_cancel_stream_rate, Opts, {500, 10000}), + State#state{ + cancel_rate_num=CancelRateNum, cancel_rate_time=add_period(CurrentTime, CancelRatePeriod) + }. + add_period(_, infinity) -> infinity; add_period(Time, Period) -> Time + Period. @@ -196,15 +229,19 @@ add_period(Time, Period) -> Time + Period. -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts(), {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()}, - binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok. + binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer, _Settings, Req=#{method := Method}) -> + DynamicBuffer = init_dynamic_buffer_size(Opts), {ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts), {ok, StreamID, HTTP2Machine} = cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0), State0 = #state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=DynamicBuffer, + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), http2_status=upgrade, http2_machine=HTTP2Machine}, State1 = headers_frame(State0#state{ http2_machine=HTTP2Machine}, StreamID, Req), @@ -214,21 +251,36 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer <<"connection">> => <<"Upgrade">>, <<"upgrade">> => <<"h2c">> }, ?MODULE, undefined}), %% @todo undefined or #{}? - State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence})), - Transport:send(Socket, Preface), - setopts_active(State), + State = set_idle_timeout(init_rate_limiting(State2#state{http2_status=sequence}), 0), + %% In the case of HTTP/1.1 Upgrade we cannot send the Preface + %% until we send the 101 response. + ok = maybe_socket_error(State, Transport:send(Socket, Preface)), + safe_setopts_active(State), case Buffer of - <<>> -> loop(State, Buffer); + <<>> -> before_loop(State, Buffer); _ -> parse(State, Buffer) end. +-include("cowboy_dynamic_buffer.hrl"). + %% Because HTTP/2 has flow control and Cowboy has other rate limiting %% mechanisms implemented, a very large active_n value should be fine, %% as long as the stream handlers do their work in a timely manner. +%% However large active_n values reduce the impact of dynamic_buffer. setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). +safe_setopts_active(State) -> + ok = maybe_socket_error(State, setopts_active(State)). + +before_loop(State=#state{opts=#{hibernate := true}}, Buffer) -> + proc_lib:hibernate(?MODULE, loop, [State, Buffer]); +before_loop(State, Buffer) -> + loop(State, Buffer). + +-spec loop(#state{}, binary()) -> no_return(). + loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, timer=TimerRef, children=Children}, Buffer) -> Messages = Transport:messages(), @@ -236,7 +288,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, receive %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(set_idle_timeout(State), << Buffer/binary, Data/binary >>); + State1 = maybe_resize_buffer(State, Data), + parse(State1#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>); {Closed, Socket} when Closed =:= element(2, Messages) -> Reason = case State#state.http2_status of closing -> {stop, closed, 'The client is going away.'}; @@ -248,53 +301,64 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, {Passive, Socket} when Passive =:= element(4, Messages); %% Hardcoded for compatibility with Ranch 1.x. Passive =:= tcp_passive; Passive =:= ssl_passive -> - setopts_active(State), - loop(State, Buffer); + safe_setopts_active(State), + before_loop(State, Buffer); %% System messages. {'EXIT', Parent, shutdown} -> Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'}, - loop(initiate_closing(State, Reason), Buffer); + before_loop(initiate_closing(State, Reason), Buffer); {'EXIT', Parent, Reason} -> terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'}); {system, From, Request} -> sys:handle_system_msg(Request, From, Parent, ?MODULE, [], {State, Buffer}); %% Timeouts. {timeout, TimerRef, idle_timeout} -> - terminate(State, {stop, timeout, - 'Connection idle longer than configuration allows.'}); + tick_idle_timeout(State, Buffer); {timeout, Ref, {shutdown, Pid}} -> cowboy_children:shutdown_timeout(Children, Ref, Pid), - loop(State, Buffer); + before_loop(State, Buffer); {timeout, TRef, {cow_http2_machine, Name}} -> - loop(timeout(State, Name, TRef), Buffer); + before_loop(timeout(State, Name, TRef), Buffer); {timeout, TimerRef, {goaway_initial_timeout, Reason}} -> - loop(closing(State, Reason), Buffer); + before_loop(closing(State, Reason), Buffer); {timeout, TimerRef, {goaway_complete_timeout, Reason}} -> terminate(State, {stop, stop_reason(Reason), 'Graceful shutdown timed out.'}); %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> - loop(info(State, StreamID, Msg), Buffer); + before_loop(info(State, StreamID, Msg), Buffer); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> - loop(down(State, Pid, Msg), Buffer); + before_loop(down(State, Pid, Msg), Buffer); %% Calls from supervisor module. {'$gen_call', From, Call} -> cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE), - loop(State, Buffer); + before_loop(State, Buffer); Msg -> cowboy:log(warning, "Received stray message ~p.", [Msg], Opts), - loop(State, Buffer) + before_loop(State, Buffer) after InactivityTimeout -> terminate(State, {internal_error, timeout, 'No message or data received before timeout.'}) end. -set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef}) +tick_idle_timeout(State=#state{idle_timeout_num=?IDLE_TIMEOUT_TICKS}, _) -> + terminate(State, {stop, timeout, + 'Connection idle longer than configuration allows.'}); +tick_idle_timeout(State=#state{idle_timeout_num=TimeoutNum}, Buffer) -> + before_loop(set_idle_timeout(State, TimeoutNum + 1), Buffer). + +set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef}, _) when Status =:= closing_initiated orelse Status =:= closing, TimerRef =/= undefined -> State; -set_idle_timeout(State=#state{opts=Opts}) -> - set_timeout(State, maps:get(idle_timeout, Opts, 60000), idle_timeout). +set_idle_timeout(State=#state{opts=Opts}, TimeoutNum) -> + case maps:get(idle_timeout, Opts, 60000) of + infinity -> + State#state{timer=undefined}; + Timeout -> + set_timeout(State#state{idle_timeout_num=TimeoutNum}, + Timeout div ?IDLE_TIMEOUT_TICKS, idle_timeout) + end. set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) -> ok = case TimerRef0 of @@ -307,6 +371,14 @@ set_timeout(State=#state{timer=TimerRef0}, Timeout, Message) -> end, State#state{timer=TimerRef}. +maybe_reset_idle_timeout(State=#state{opts=Opts}) -> + case maps:get(reset_idle_timeout_on_send, Opts, false) of + true -> + State#state{idle_timeout_num=0}; + false -> + State + end. + %% HTTP/2 protocol parsing. parse(State=#state{http2_status=sequence}, Data) -> @@ -314,7 +386,7 @@ parse(State=#state{http2_status=sequence}, Data) -> {ok, Rest} -> parse(State#state{http2_status=settings}, Rest); more -> - loop(State, Data); + before_loop(State, Data); Error = {connection_error, _, _} -> terminate(State, Error) end; @@ -333,7 +405,7 @@ parse(State=#state{http2_status=Status, http2_machine=HTTP2Machine, streams=Stre more when Status =:= closing, Streams =:= #{} -> terminate(State, {stop, normal, 'The connection is going away.'}); more -> - loop(State, Data) + before_loop(State, Data) end. %% Frame rate flood protection. @@ -383,10 +455,11 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> goaway(State#state{http2_machine=HTTP2Machine}, GoAway); {send, SendData, HTTP2Machine} -> %% We may need to send an alarm for each of the streams sending data. - lists:foldl( + State1 = lists:foldl( fun({StreamID, _, _}, S) -> maybe_send_data_alarm(S, HTTP2Machine0, StreamID) end, send_data(maybe_ack(State#state{http2_machine=HTTP2Machine}, Frame), SendData, []), - SendData); + SendData), + maybe_reset_idle_timeout(State1); {error, {stream_error, StreamID, Reason, Human}, HTTP2Machine} -> reset_stream(State#state{http2_machine=HTTP2Machine}, StreamID, {stream_error, Reason, Human}); @@ -398,15 +471,20 @@ frame(State=#state{http2_machine=HTTP2Machine0}, Frame) -> %% if we were still waiting for a SETTINGS frame. maybe_ack(State=#state{http2_status=settings}, Frame) -> maybe_ack(State#state{http2_status=connected}, Frame); +%% We do not reset the idle timeout on send here because we are +%% sending data as a consequence of receiving data, which means +%% we already resetted the idle timeout. maybe_ack(State=#state{socket=Socket, transport=Transport}, Frame) -> case Frame of - {settings, _} -> Transport:send(Socket, cow_http2:settings_ack()); - {ping, Opaque} -> Transport:send(Socket, cow_http2:ping_ack(Opaque)); + {settings, _} -> + ok = maybe_socket_error(State, Transport:send(Socket, cow_http2:settings_ack())); + {ping, Opaque} -> + ok = maybe_socket_error(State, Transport:send(Socket, cow_http2:ping_ack(Opaque))); _ -> ok end, State. -data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin, Data) -> +data_frame(State0=#state{opts=Opts, flow=Flow0, streams=Streams}, StreamID, IsFin, Data) -> case Streams of #{StreamID := Stream=#stream{status=running, flow=StreamFlow, state=StreamState0}} -> try cowboy_stream:data(StreamID, IsFin, Data, StreamState0) of @@ -415,11 +493,26 @@ data_frame(State0=#state{opts=Opts, flow=Flow, streams=Streams}, StreamID, IsFin %% We may receive more data than we requested. We ensure %% that the flow value doesn't go lower than 0. Size = byte_size(Data), - State = update_window(State0#state{flow=max(0, Flow - Size), + Flow = max(0, Flow0 - Size), + %% We would normally update the window when changing the flow + %% value. But because we are running commands, which themselves + %% may update the window, and we want to avoid updating the + %% window twice in a row, we first run the commands and then + %% only update the window a flow command was executed. We know + %% that it was because the flow value changed in the state. + State1 = State0#state{flow=Flow, streams=Streams#{StreamID => Stream#stream{ flow=max(0, StreamFlow - Size), state=StreamState}}}, - StreamID), - commands(State, StreamID, Commands) + State = commands(State1, StreamID, Commands), + case State of + %% No flow command was executed. We must update the window + %% because we changed the flow value earlier. + #state{flow=Flow} -> + update_window(State, StreamID); + %% Otherwise the window was updated already. + _ -> + State + end catch Class:Exception:Stacktrace -> cowboy:log(cowboy_stream:make_error_log(data, [StreamID, IsFin, Data, StreamState0], @@ -568,11 +661,27 @@ rst_stream_frame(State=#state{streams=Streams0, children=Children0}, StreamID, R {#stream{state=StreamState}, Streams} -> terminate_stream_handler(State, StreamID, Reason, StreamState), Children = cowboy_children:shutdown(Children0, StreamID), - State#state{streams=Streams, children=Children}; + cancel_rate_limit(State#state{streams=Streams, children=Children}); error -> State end. +cancel_rate_limit(State0=#state{cancel_rate_num=Num0, cancel_rate_time=Time}) -> + case Num0 - 1 of + 0 -> + CurrentTime = erlang:monotonic_time(millisecond), + if + CurrentTime < Time -> + terminate(State0, {connection_error, enhance_your_calm, + 'Stream cancel rate larger than configuration allows. Flood? (CVE-2023-44487)'}); + true -> + %% When the option has a period of infinity we cannot reach this clause. + init_cancel_rate_limiting(State0, CurrentTime) + end; + Num -> + State0#state{cancel_rate_num=Num} + end. + ignored_frame(State=#state{http2_machine=HTTP2Machine0}) -> case cow_http2_machine:ignored_frame(HTTP2Machine0) of {ok, HTTP2Machine} -> @@ -657,23 +766,37 @@ commands(State=#state{http2_machine=HTTP2Machine}, StreamID, end; %% Send an informational response. commands(State0, StreamID, [{inform, StatusCode, Headers}|Tail]) -> - State = send_headers(State0, StreamID, idle, StatusCode, Headers), + State1 = send_headers(State0, StreamID, idle, StatusCode, Headers), + State = maybe_reset_idle_timeout(State1), commands(State, StreamID, Tail); %% Send response headers. commands(State0, StreamID, [{response, StatusCode, Headers, Body}|Tail]) -> - State = send_response(State0, StreamID, StatusCode, Headers, Body), + State1 = send_response(State0, StreamID, StatusCode, Headers, Body), + State = maybe_reset_idle_timeout(State1), commands(State, StreamID, Tail); %% Send response headers. commands(State0, StreamID, [{headers, StatusCode, Headers}|Tail]) -> - State = send_headers(State0, StreamID, nofin, StatusCode, Headers), + State1 = send_headers(State0, StreamID, nofin, StatusCode, Headers), + State = maybe_reset_idle_timeout(State1), commands(State, StreamID, Tail); %% Send a response body chunk. commands(State0, StreamID, [{data, IsFin, Data}|Tail]) -> - State = maybe_send_data(State0, StreamID, IsFin, Data, []), + State = case maybe_send_data(State0, StreamID, IsFin, Data, []) of + {data_sent, State1} -> + maybe_reset_idle_timeout(State1); + {no_data_sent, State1} -> + State1 + end, commands(State, StreamID, Tail); %% Send trailers. commands(State0, StreamID, [{trailers, Trailers}|Tail]) -> - State = maybe_send_data(State0, StreamID, fin, {trailers, maps:to_list(Trailers)}, []), + State = case maybe_send_data(State0, StreamID, fin, + {trailers, maps:to_list(Trailers)}, []) of + {data_sent, State1} -> + maybe_reset_idle_timeout(State1); + {no_data_sent, State1} -> + State1 + end, commands(State, StreamID, Tail); %% Send a push promise. %% @@ -705,10 +828,11 @@ commands(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Ma State = case cow_http2_machine:prepare_push_promise(StreamID, HTTP2Machine0, PseudoHeaders, Headers) of {ok, PromisedStreamID, HeaderBlock, HTTP2Machine} -> - Transport:send(Socket, cow_http2:push_promise( - StreamID, PromisedStreamID, HeaderBlock)), - headers_frame(State0#state{http2_machine=HTTP2Machine}, - PromisedStreamID, fin, Headers, PseudoHeaders, 0); + State1 = State0#state{http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State1, Transport:send(Socket, + cow_http2:push_promise(StreamID, PromisedStreamID, HeaderBlock))), + State2 = maybe_reset_idle_timeout(State1), + headers_frame(State2, PromisedStreamID, fin, Headers, PseudoHeaders, 0); {error, no_push} -> State0 end, @@ -731,10 +855,14 @@ commands(State, StreamID, [Error = {internal_error, _, _}|_Tail]) -> %% @todo Only reset when the stream still exists. reset_stream(State, StreamID, Error); %% Upgrade to HTTP/2. This is triggered by cowboy_http2 itself. +%% +%% We do not need to reset the idle timeout on send because it +%% hasn't been set yet. This is called from init/12. commands(State=#state{socket=Socket, transport=Transport, http2_status=upgrade}, StreamID, [{switch_protocol, Headers, ?MODULE, _}|Tail]) -> %% @todo This 101 response needs to be passed through stream handlers. - Transport:send(Socket, cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers))), + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http:response(101, 'HTTP/1.1', maps:to_list(Headers)))), commands(State, StreamID, Tail); %% Use a different protocol within the stream (CONNECT :protocol). %% @todo Make sure we error out when the feature is disabled. @@ -755,22 +883,32 @@ commands(State=#state{opts=Opts}, StreamID, [Log={log, _, _, _}|Tail]) -> %% Tentatively update the window after the flow was updated. -update_window(State=#state{socket=Socket, transport=Transport, +update_window(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0, flow=Flow, streams=Streams}, StreamID) -> - #{StreamID := #stream{flow=StreamFlow}} = Streams, {Data1, HTTP2Machine2} = case cow_http2_machine:ensure_window(Flow, HTTP2Machine0) of ok -> {<<>>, HTTP2Machine0}; {ok, Increment1, HTTP2Machine1} -> {cow_http2:window_update(Increment1), HTTP2Machine1} end, - {Data2, HTTP2Machine} = case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of - ok -> {<<>>, HTTP2Machine2}; - {ok, Increment2, HTTP2Machine3} -> {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} + {Data2, HTTP2Machine} = case Streams of + #{StreamID := #stream{flow=StreamFlow}} -> + case cow_http2_machine:ensure_window(StreamID, StreamFlow, HTTP2Machine2) of + ok -> + {<<>>, HTTP2Machine2}; + {ok, Increment2, HTTP2Machine3} -> + {cow_http2:window_update(StreamID, Increment2), HTTP2Machine3} + end; + _ -> + %% Don't update the stream's window if it stopped. + {<<>>, HTTP2Machine2} end, + State = State0#state{http2_machine=HTTP2Machine}, case {Data1, Data2} of - {<<>>, <<>>} -> ok; - _ -> Transport:send(Socket, [Data1, Data2]) - end, - State#state{http2_machine=HTTP2Machine}. + {<<>>, <<>>} -> + State; + _ -> + ok = maybe_socket_error(State, Transport:send(Socket, [Data1, Data2])), + maybe_reset_idle_timeout(State) + end. %% Send the response, trailers or data. @@ -790,18 +928,21 @@ send_response(State0=#state{http2_machine=HTTP2Machine0}, StreamID, StatusCode, = cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, nofin, #{status => cow_http:status_to_integer(StatusCode)}, headers_to_list(Headers)), - maybe_send_data(State0#state{http2_machine=HTTP2Machine}, StreamID, fin, Body, - [cow_http2:headers(StreamID, nofin, HeaderBlock)]) + {_, State} = maybe_send_data(State0#state{http2_machine=HTTP2Machine}, + StreamID, fin, Body, [cow_http2:headers(StreamID, nofin, HeaderBlock)]), + State end. -send_headers(State=#state{socket=Socket, transport=Transport, +send_headers(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, IsFin0, StatusCode, Headers) -> {ok, IsFin, HeaderBlock, HTTP2Machine} = cow_http2_machine:prepare_headers(StreamID, HTTP2Machine0, IsFin0, #{status => cow_http:status_to_integer(StatusCode)}, headers_to_list(Headers)), - Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - State#state{http2_machine=HTTP2Machine}. + State = State0#state{http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:headers(StreamID, IsFin, HeaderBlock))), + State. %% The set-cookie header is special; we can only send one cookie per header. headers_to_list(Headers0=#{<<"set-cookie">> := SetCookies}) -> @@ -818,13 +959,18 @@ maybe_send_data(State0=#state{socket=Socket, transport=Transport, end, case cow_http2_machine:send_or_queue_data(StreamID, HTTP2Machine0, IsFin, Data) of {ok, HTTP2Machine} -> + State1 = State0#state{http2_machine=HTTP2Machine}, %% If we have prefix data (like a HEADERS frame) we need to send it %% even if we do not send any DATA frames. - case Prefix of - [] -> ok; - _ -> Transport:send(Socket, Prefix) + WasDataSent = case Prefix of + [] -> + no_data_sent; + _ -> + ok = maybe_socket_error(State1, Transport:send(Socket, Prefix)), + data_sent end, - maybe_send_data_alarm(State0#state{http2_machine=HTTP2Machine}, HTTP2Machine0, StreamID); + State = maybe_send_data_alarm(State1, HTTP2Machine0, StreamID), + {WasDataSent, State}; {send, SendData, HTTP2Machine} -> State = #state{http2_status=Status, streams=Streams} = send_data(State0#state{http2_machine=HTTP2Machine}, SendData, Prefix), @@ -833,7 +979,7 @@ maybe_send_data(State0=#state{socket=Socket, transport=Transport, Status =:= closing, Streams =:= #{} -> terminate(State, {stop, normal, 'The connection is going away.'}); true -> - maybe_send_data_alarm(State, HTTP2Machine0, StreamID) + {data_sent, maybe_send_data_alarm(State, HTTP2Machine0, StreamID)} end end. @@ -842,12 +988,15 @@ send_data(State0=#state{socket=Socket, transport=Transport, opts=Opts}, SendData _ = [case Data of {sendfile, Offset, Bytes, Path} -> %% When sendfile is disabled we explicitly use the fallback. - _ = case maps:get(sendfile, Opts, true) of - true -> Transport:sendfile(Socket, Path, Offset, Bytes); - false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) - end; + {ok, _} = maybe_socket_error(State, + case maps:get(sendfile, Opts, true) of + true -> Transport:sendfile(Socket, Path, Offset, Bytes); + false -> ranch_transport:sendfile(Transport, Socket, Path, Offset, Bytes, []) + end + ), + ok; _ -> - Transport:send(Socket, Data) + ok = maybe_socket_error(State, Transport:send(Socket, Data)) end || Data <- Acc], send_data_terminate(State, SendData). @@ -946,22 +1095,26 @@ stream_alarm(State, StreamID, Name, Value) -> %% We may have to cancel streams even if we receive multiple %% GOAWAY frames as the LastStreamID value may be lower than %% the one previously received. +%% +%% We do not reset the idle timeout on send here. We already +%% disabled it if we initiated shutdown; and we already reset +%% it if the client sent a GOAWAY frame. goaway(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0, http2_status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) when Status =:= connected; Status =:= closing_initiated; Status =:= closing -> Streams = goaway_streams(State0, maps:to_list(Streams0), LastStreamID, {stop, {goaway, Reason}, 'The connection is going away.'}, []), - State = State0#state{streams=maps:from_list(Streams)}, + State1 = State0#state{streams=maps:from_list(Streams)}, if Status =:= connected; Status =:= closing_initiated -> {OurLastStreamID, HTTP2Machine} = cow_http2_machine:set_last_streamid(HTTP2Machine0), - Transport:send(Socket, cow_http2:goaway( - OurLastStreamID, no_error, <<>>)), - State#state{http2_status=closing, - http2_machine=HTTP2Machine}; + State = State1#state{http2_status=closing, http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:goaway(OurLastStreamID, no_error, <<>>))), + State; true -> - State + State1 end; %% We terminate the connection immediately if it hasn't fully been initialized. goaway(State, {goaway, _, Reason, _}) -> @@ -984,10 +1137,13 @@ goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) -> %% in-flight stream creation (at least one round-trip time), the server can send %% another GOAWAY frame with an updated last stream identifier. This ensures %% that a connection can be cleanly shut down without losing requests. + -spec initiate_closing(#state{}, _) -> #state{}. + initiate_closing(State=#state{http2_status=connected, socket=Socket, transport=Transport, opts=Opts}, Reason) -> - Transport:send(Socket, cow_http2:goaway(16#7fffffff, no_error, <<>>)), + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:goaway(16#7fffffff, no_error, <<>>))), Timeout = maps:get(goaway_initial_timeout, Opts, 1000), Message = {goaway_initial_timeout, Reason}, set_timeout(State#state{http2_status=closing_initiated}, Timeout, Message); @@ -1000,17 +1156,21 @@ initiate_closing(State, Reason) -> terminate(State, {stop, stop_reason(Reason), 'The connection is going away.'}). %% Switch to 'closing' state and stop accepting new streams. + -spec closing(#state{}, Reason :: term()) -> #state{}. + closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} -> terminate(State, Reason); -closing(State=#state{http2_status=closing_initiated, +closing(State0=#state{http2_status=closing_initiated, http2_machine=HTTP2Machine0, socket=Socket, transport=Transport}, Reason) -> %% Stop accepting new streams. {LastStreamID, HTTP2Machine} = cow_http2_machine:set_last_streamid(HTTP2Machine0), - Transport:send(Socket, cow_http2:goaway(LastStreamID, no_error, <<>>)), - closing(State#state{http2_status=closing, http2_machine=HTTP2Machine}, Reason); + State = State0#state{http2_status=closing, http2_machine=HTTP2Machine}, + ok = maybe_socket_error(State, Transport:send(Socket, + cow_http2:goaway(LastStreamID, no_error, <<>>))), + closing(State, Reason); closing(State=#state{http2_status=closing, opts=Opts}, Reason) -> %% If client sent GOAWAY, we may already be in 'closing' but without the %% goaway complete timeout set. @@ -1021,7 +1181,21 @@ closing(State=#state{http2_status=closing, opts=Opts}, Reason) -> stop_reason({stop, Reason, _}) -> Reason; stop_reason(Reason) -> Reason. --spec terminate(#state{}, _) -> no_return(). +%% Function copied from cowboy_http. +maybe_socket_error(State, {error, closed}) -> + terminate(State, {socket_error, closed, 'The socket has been closed.'}); +maybe_socket_error(State, Reason) -> + maybe_socket_error(State, Reason, 'An error has occurred on the socket.'). + +maybe_socket_error(_, Result = ok, _) -> + Result; +maybe_socket_error(_, Result = {ok, _}, _) -> + Result; +maybe_socket_error(State, {error, Reason}, Human) -> + terminate(State, {socket_error, Reason, Human}). + +-spec terminate(#state{} | undefined, _) -> no_return(). + terminate(undefined, Reason) -> exit({shutdown, Reason}); terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, @@ -1031,7 +1205,8 @@ terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, %% as debug data in the GOAWAY frame here. Perhaps more. if Status =:= connected; Status =:= closing_initiated -> - Transport:send(Socket, cow_http2:goaway( + %% We are terminating so it's OK if we can't send the GOAWAY anymore. + _ = Transport:send(Socket, cow_http2:goaway( cow_http2_machine:get_last_streamid(HTTP2Machine), terminate_reason(Reason), <<>>)); %% We already sent the GOAWAY frame. @@ -1040,10 +1215,11 @@ terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, end, terminate_all_streams(State, maps:to_list(Streams), Reason), cowboy_children:terminate(Children), + %% @todo Don't linger on connection errors. terminate_linger(State), exit({shutdown, Reason}); -terminate(#state{socket=Socket, transport=Transport}, Reason) -> - Transport:close(Socket), +%% We are not fully connected so we can just terminate the connection. +terminate(_State, Reason) -> exit({shutdown, Reason}). terminate_reason({connection_error, Reason, _}) -> Reason; @@ -1077,6 +1253,9 @@ terminate_linger(State=#state{socket=Socket, transport=Transport, opts=Opts}) -> terminate_linger_before_loop(State, TimerRef, Messages) -> %% We may already be in active mode when we do this %% but it's OK because we are shutting down anyway. + %% + %% We specially handle the socket error to terminate + %% when an error occurs. case setopts_active(State) of ok -> terminate_linger_loop(State, TimerRef, Messages); @@ -1101,13 +1280,18 @@ terminate_linger_loop(State=#state{socket=Socket}, TimerRef, Messages) -> end. %% @todo Don't send an RST_STREAM if one was already sent. +%% +%% When resetting the stream we are technically sending data +%% on the socket. However due to implementation complexities +%% we do not attempt to reset the idle timeout on send. reset_stream(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, Error) -> Reason = case Error of {internal_error, _, _} -> internal_error; {stream_error, Reason0, _} -> Reason0 end, - Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), + ok = maybe_socket_error(State0, Transport:send(Socket, + cow_http2:rst_stream(StreamID, Reason))), State1 = case cow_http2_machine:reset_stream(StreamID, HTTP2Machine0) of {ok, HTTP2Machine} -> terminate_stream(State0#state{http2_machine=HTTP2Machine}, StreamID, Error); @@ -1179,7 +1363,8 @@ terminate_stream(State0=#state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID) -> State = case cow_http2_machine:get_stream_local_state(StreamID, HTTP2Machine0) of {ok, fin, _} -> - Transport:send(Socket, cow_http2:rst_stream(StreamID, no_error)), + ok = maybe_socket_error(State0, Transport:send(Socket, + cow_http2:rst_stream(StreamID, no_error))), {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), State0#state{http2_machine=HTTP2Machine}; {error, closed} -> @@ -1211,15 +1396,18 @@ terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) -> %% System callbacks. --spec system_continue(_, _, {#state{}, binary()}) -> ok. +-spec system_continue(_, _, {#state{}, binary()}) -> no_return(). + system_continue(_, _, {State, Buffer}) -> - loop(State, Buffer). + before_loop(State, Buffer). -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). + system_terminate(Reason0, _, _, {State, Buffer}) -> Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'}, - loop(initiate_closing(State, Reason), Buffer). + before_loop(initiate_closing(State, Reason), Buffer). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. + system_code_change(Misc, _, _, _) -> {ok, Misc}. |