diff options
Diffstat (limited to 'src/cowboy_http2.erl')
| -rw-r--r-- | src/cowboy_http2.erl | 85 |
1 files changed, 62 insertions, 23 deletions
diff --git a/src/cowboy_http2.erl b/src/cowboy_http2.erl index 0e110cd..0d22fa1 100644 --- a/src/cowboy_http2.erl +++ b/src/cowboy_http2.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2015-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -17,6 +17,7 @@ -export([init/6]). -export([init/10]). -export([init/12]). +-export([loop/2]). -export([system_continue/3]). -export([system_terminate/4]). @@ -24,15 +25,20 @@ -type opts() :: #{ active_n => pos_integer(), + alpn_default_protocol => http | http2, compress_buffering => boolean(), compress_threshold => non_neg_integer(), connection_type => worker | supervisor, connection_window_margin_size => 0..16#7fffffff, connection_window_update_threshold => 0..16#7fffffff, + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), enable_connect_protocol => boolean(), env => cowboy_middleware:env(), goaway_initial_timeout => timeout(), goaway_complete_timeout => timeout(), + hibernate => boolean(), idle_timeout => timeout(), inactivity_timeout => timeout(), initial_connection_window_size => 65535..16#7fffffff, @@ -57,6 +63,7 @@ metrics_resp_headers_filter => fun((cowboy:http_headers()) -> cowboy:http_headers()), middlewares => [module()], preface_timeout => timeout(), + protocols => [http | http2], proxy_header => boolean(), reset_idle_timeout_on_send => boolean(), sendfile => boolean(), @@ -133,6 +140,10 @@ %% Flow requested for all streams. flow = 0 :: non_neg_integer(), + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size :: pos_integer() | false, + dynamic_buffer_moving_average :: non_neg_integer(), + %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. streams = #{} :: #{cow_http2:streamid() => #stream{}}, @@ -143,7 +154,8 @@ }). -spec init(pid(), ranch:ref(), inet:socket(), module(), - ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> ok. + ranch_proxy_header:proxy_info() | undefined, cowboy:opts()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> {ok, Peer} = maybe_socket_error(undefined, Transport:peername(Socket), 'A socket error occurred when retrieving the peer name.'), @@ -167,18 +179,22 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts) -> -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts(), {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()}, - binary() | undefined, binary()) -> ok. + binary() | undefined, binary()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer) -> + DynamicBuffer = init_dynamic_buffer_size(Opts), {ok, Preface, HTTP2Machine} = cow_http2_machine:init(server, Opts), %% Send the preface before doing all the init in case we get a socket error. ok = maybe_socket_error(undefined, Transport:send(Socket, Preface)), State = set_idle_timeout(init_rate_limiting(#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=DynamicBuffer, + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), http2_status=sequence, http2_machine=HTTP2Machine}), 0), safe_setopts_active(State), case Buffer of - <<>> -> loop(State, Buffer); + <<>> -> before_loop(State, Buffer); _ -> parse(State, Buffer) end. @@ -213,15 +229,19 @@ add_period(Time, Period) -> Time + Period. -spec init(pid(), ranch:ref(), inet:socket(), module(), ranch_proxy_header:proxy_info() | undefined, cowboy:opts(), {inet:ip_address(), inet:port_number()}, {inet:ip_address(), inet:port_number()}, - binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> ok. + binary() | undefined, binary(), map() | undefined, cowboy_req:req()) -> no_return(). + init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer, _Settings, Req=#{method := Method}) -> + DynamicBuffer = init_dynamic_buffer_size(Opts), {ok, Preface, HTTP2Machine0} = cow_http2_machine:init(server, Opts), {ok, StreamID, HTTP2Machine} = cow_http2_machine:init_upgrade_stream(Method, HTTP2Machine0), State0 = #state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, proxy_header=ProxyHeader, opts=Opts, peer=Peer, sock=Sock, cert=Cert, + dynamic_buffer_size=DynamicBuffer, + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0), http2_status=upgrade, http2_machine=HTTP2Machine}, State1 = headers_frame(State0#state{ http2_machine=HTTP2Machine}, StreamID, Req), @@ -237,20 +257,30 @@ init(Parent, Ref, Socket, Transport, ProxyHeader, Opts, Peer, Sock, Cert, Buffer ok = maybe_socket_error(State, Transport:send(Socket, Preface)), safe_setopts_active(State), case Buffer of - <<>> -> loop(State, Buffer); + <<>> -> before_loop(State, Buffer); _ -> parse(State, Buffer) end. +-include("cowboy_dynamic_buffer.hrl"). + %% Because HTTP/2 has flow control and Cowboy has other rate limiting %% mechanisms implemented, a very large active_n value should be fine, %% as long as the stream handlers do their work in a timely manner. +%% However large active_n values reduce the impact of dynamic_buffer. setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). safe_setopts_active(State) -> ok = maybe_socket_error(State, setopts_active(State)). +before_loop(State=#state{opts=#{hibernate := true}}, Buffer) -> + proc_lib:hibernate(?MODULE, loop, [State, Buffer]); +before_loop(State, Buffer) -> + loop(State, Buffer). + +-spec loop(#state{}, binary()) -> no_return(). + loop(State=#state{parent=Parent, socket=Socket, transport=Transport, opts=Opts, timer=TimerRef, children=Children}, Buffer) -> Messages = Transport:messages(), @@ -258,7 +288,8 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, receive %% Socket messages. {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(State#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>); + State1 = maybe_resize_buffer(State, Data), + parse(State1#state{idle_timeout_num=0}, << Buffer/binary, Data/binary >>); {Closed, Socket} when Closed =:= element(2, Messages) -> Reason = case State#state.http2_status of closing -> {stop, closed, 'The client is going away.'}; @@ -271,11 +302,11 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, %% Hardcoded for compatibility with Ranch 1.x. Passive =:= tcp_passive; Passive =:= ssl_passive -> safe_setopts_active(State), - loop(State, Buffer); + before_loop(State, Buffer); %% System messages. {'EXIT', Parent, shutdown} -> Reason = {stop, {exit, shutdown}, 'Parent process requested shutdown.'}, - loop(initiate_closing(State, Reason), Buffer); + before_loop(initiate_closing(State, Reason), Buffer); {'EXIT', Parent, Reason} -> terminate(State, {stop, {exit, Reason}, 'Parent process terminated.'}); {system, From, Request} -> @@ -285,27 +316,27 @@ loop(State=#state{parent=Parent, socket=Socket, transport=Transport, tick_idle_timeout(State, Buffer); {timeout, Ref, {shutdown, Pid}} -> cowboy_children:shutdown_timeout(Children, Ref, Pid), - loop(State, Buffer); + before_loop(State, Buffer); {timeout, TRef, {cow_http2_machine, Name}} -> - loop(timeout(State, Name, TRef), Buffer); + before_loop(timeout(State, Name, TRef), Buffer); {timeout, TimerRef, {goaway_initial_timeout, Reason}} -> - loop(closing(State, Reason), Buffer); + before_loop(closing(State, Reason), Buffer); {timeout, TimerRef, {goaway_complete_timeout, Reason}} -> terminate(State, {stop, stop_reason(Reason), 'Graceful shutdown timed out.'}); %% Messages pertaining to a stream. {{Pid, StreamID}, Msg} when Pid =:= self() -> - loop(info(State, StreamID, Msg), Buffer); + before_loop(info(State, StreamID, Msg), Buffer); %% Exit signal from children. Msg = {'EXIT', Pid, _} -> - loop(down(State, Pid, Msg), Buffer); + before_loop(down(State, Pid, Msg), Buffer); %% Calls from supervisor module. {'$gen_call', From, Call} -> cowboy_children:handle_supervisor_call(Call, From, Children, ?MODULE), - loop(State, Buffer); + before_loop(State, Buffer); Msg -> cowboy:log(warning, "Received stray message ~p.", [Msg], Opts), - loop(State, Buffer) + before_loop(State, Buffer) after InactivityTimeout -> terminate(State, {internal_error, timeout, 'No message or data received before timeout.'}) end. @@ -314,7 +345,7 @@ tick_idle_timeout(State=#state{idle_timeout_num=?IDLE_TIMEOUT_TICKS}, _) -> terminate(State, {stop, timeout, 'Connection idle longer than configuration allows.'}); tick_idle_timeout(State=#state{idle_timeout_num=TimeoutNum}, Buffer) -> - loop(set_idle_timeout(State, TimeoutNum + 1), Buffer). + before_loop(set_idle_timeout(State, TimeoutNum + 1), Buffer). set_idle_timeout(State=#state{http2_status=Status, timer=TimerRef}, _) when Status =:= closing_initiated orelse Status =:= closing, @@ -355,7 +386,7 @@ parse(State=#state{http2_status=sequence}, Data) -> {ok, Rest} -> parse(State#state{http2_status=settings}, Rest); more -> - loop(State, Data); + before_loop(State, Data); Error = {connection_error, _, _} -> terminate(State, Error) end; @@ -374,7 +405,7 @@ parse(State=#state{http2_status=Status, http2_machine=HTTP2Machine, streams=Stre more when Status =:= closing, Streams =:= #{} -> terminate(State, {stop, normal, 'The connection is going away.'}); more -> - loop(State, Data) + before_loop(State, Data) end. %% Frame rate flood protection. @@ -1106,7 +1137,9 @@ goaway_streams(State, [Stream|Tail], LastStreamID, Reason, Acc) -> %% in-flight stream creation (at least one round-trip time), the server can send %% another GOAWAY frame with an updated last stream identifier. This ensures %% that a connection can be cleanly shut down without losing requests. + -spec initiate_closing(#state{}, _) -> #state{}. + initiate_closing(State=#state{http2_status=connected, socket=Socket, transport=Transport, opts=Opts}, Reason) -> ok = maybe_socket_error(State, Transport:send(Socket, @@ -1123,7 +1156,9 @@ initiate_closing(State, Reason) -> terminate(State, {stop, stop_reason(Reason), 'The connection is going away.'}). %% Switch to 'closing' state and stop accepting new streams. + -spec closing(#state{}, Reason :: term()) -> #state{}. + closing(State=#state{streams=Streams}, Reason) when Streams =:= #{} -> terminate(State, Reason); closing(State0=#state{http2_status=closing_initiated, @@ -1160,6 +1195,7 @@ maybe_socket_error(State, {error, Reason}, Human) -> terminate(State, {socket_error, Reason, Human}). -spec terminate(#state{} | undefined, _) -> no_return(). + terminate(undefined, Reason) -> exit({shutdown, Reason}); terminate(State=#state{socket=Socket, transport=Transport, http2_status=Status, @@ -1360,15 +1396,18 @@ terminate_stream_handler(#state{opts=Opts}, StreamID, Reason, StreamState) -> %% System callbacks. --spec system_continue(_, _, {#state{}, binary()}) -> ok. +-spec system_continue(_, _, {#state{}, binary()}) -> no_return(). + system_continue(_, _, {State, Buffer}) -> - loop(State, Buffer). + before_loop(State, Buffer). -spec system_terminate(any(), _, _, {#state{}, binary()}) -> no_return(). + system_terminate(Reason0, _, _, {State, Buffer}) -> Reason = {stop, {exit, Reason0}, 'sys:terminate/2,3 was called.'}, - loop(initiate_closing(State, Reason), Buffer). + before_loop(initiate_closing(State, Reason), Buffer). -spec system_code_change(Misc, _, _, _) -> {ok, Misc} when Misc::{#state{}, binary()}. + system_code_change(Misc, _, _, _) -> {ok, Misc}. |
