diff options
Diffstat (limited to 'src/cowboy_websocket.erl')
-rw-r--r-- | src/cowboy_websocket.erl | 60 |
1 files changed, 43 insertions, 17 deletions
diff --git a/src/cowboy_websocket.erl b/src/cowboy_websocket.erl index 577de47..cb30c3f 100644 --- a/src/cowboy_websocket.erl +++ b/src/cowboy_websocket.erl @@ -1,4 +1,4 @@ -%% Copyright (c) 2011-2024, Loïc Hoguin <[email protected]> +%% Copyright (c) Loïc Hoguin <[email protected]> %% %% Permission to use, copy, modify, and/or distribute this software for any %% purpose with or without fee is hereby granted, provided that the above @@ -69,6 +69,9 @@ active_n => pos_integer(), compress => boolean(), deflate_opts => cow_ws:deflate_opts(), + dynamic_buffer => false | {pos_integer(), pos_integer()}, + dynamic_buffer_initial_average => non_neg_integer(), + dynamic_buffer_initial_size => pos_integer(), idle_timeout => timeout(), max_frame_size => non_neg_integer() | infinity, req_filter => fun((cowboy_req:req()) -> map()), @@ -97,6 +100,11 @@ timeout_num = 0 :: 0..?IDLE_TIMEOUT_TICKS, messages = undefined :: undefined | {atom(), atom(), atom()} | {atom(), atom(), atom(), atom()}, + + %% Dynamic buffer moving average and current buffer size. + dynamic_buffer_size = false :: pos_integer() | false, + dynamic_buffer_moving_average = 0 :: non_neg_integer(), + hibernate = false :: boolean(), frag_state = undefined :: cow_ws:frag_state(), frag_buffer = <<>> :: binary(), @@ -270,7 +278,7 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, %% @todo We don't want date and server headers. Headers = cowboy_req:response_headers(#{}, Req), Pid ! {{Pid, StreamID}, {switch_protocol, Headers, ?MODULE, {State, HandlerState}}}, - takeover(Pid, Ref, {Pid, StreamID}, undefined, undefined, <<>>, + takeover(Pid, Ref, {Pid, StreamID}, undefined, #{}, <<>>, {State, HandlerState}). %% Connection process. @@ -295,8 +303,8 @@ websocket_handshake(State, Req=#{ref := Ref, pid := Pid, streamid := StreamID}, -spec takeover(pid(), ranch:ref(), inet:socket() | {pid(), cowboy_stream:streamid()}, module() | undefined, any(), binary(), {#state{}, any()}) -> no_return(). -takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, - {State0=#state{handler=Handler, req=Req}, HandlerState}) -> +takeover(Parent, Ref, Socket, Transport, Opts, Buffer, + {State0=#state{opts=WsOpts, handler=Handler, req=Req}, HandlerState}) -> case Req of #{version := 'HTTP/3'} -> ok; %% @todo We should have an option to disable this behavior. @@ -308,7 +316,11 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, end, State = set_idle_timeout(State0#state{parent=Parent, ref=Ref, socket=Socket, transport=Transport, - key=undefined, messages=Messages}, 0), + opts=WsOpts#{dynamic_buffer => maps:get(dynamic_buffer, Opts, false)}, + key=undefined, messages=Messages, + %% Dynamic buffer only applies to HTTP/1.1 Websocket. + dynamic_buffer_size=init_dynamic_buffer_size(Opts), + dynamic_buffer_moving_average=maps:get(dynamic_buffer_initial_average, Opts, 0)}, 0), %% We call parse_header/3 immediately because there might be %% some data in the buffer that was sent along with the handshake. %% While it is not allowed by the protocol to send frames immediately, @@ -319,6 +331,12 @@ takeover(Parent, Ref, Socket, Transport, _Opts, Buffer, false -> after_init(State, HandlerState, #ps_header{buffer=Buffer}) end. +-include("cowboy_dynamic_buffer.hrl"). + +%% @todo Implement early socket error detection. +maybe_socket_error(_, _) -> + ok. + after_init(State=#state{active=true}, HandlerState, ParseState) -> %% Enable active,N for HTTP/1.1, and auto read_body for HTTP/2. %% We must do this only after calling websocket_init/1 (if any) @@ -340,7 +358,7 @@ after_init(State, HandlerState, ParseState) -> setopts_active(#state{transport=undefined}) -> ok; setopts_active(#state{socket=Socket, transport=Transport, opts=Opts}) -> - N = maps:get(active_n, Opts, 100), + N = maps:get(active_n, Opts, 1), Transport:setopts(Socket, [{active, N}]). maybe_read_body(#state{socket=Stream={Pid, _}, transport=undefined, active=true}) -> @@ -384,6 +402,7 @@ before_loop(State, HandlerState, ParseState) -> -spec set_idle_timeout(#state{}, 0..?IDLE_TIMEOUT_TICKS) -> #state{}. +%% @todo Do we really need this for HTTP/2? set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) -> %% Most of the time we don't need to cancel the timer since it %% will have triggered already. But this call is harmless so @@ -391,7 +410,7 @@ set_idle_timeout(State=#state{opts=Opts, timeout_ref=PrevRef}, TimeoutNum) -> %% options are changed dynamically. _ = case PrevRef of undefined -> ignore; - PrevRef -> erlang:cancel_timer(PrevRef) + PrevRef -> erlang:cancel_timer(PrevRef, [{async, true}, {info, false}]) end, case maps:get(idle_timeout, Opts, 60000) of infinity -> @@ -414,7 +433,8 @@ loop(State=#state{parent=Parent, socket=Socket, messages=Messages, receive %% Socket messages. (HTTP/1.1) {OK, Socket, Data} when OK =:= element(1, Messages) -> - parse(?reset_idle_timeout(State), HandlerState, ParseState, Data); + State1 = maybe_resize_buffer(State, Data), + parse(?reset_idle_timeout(State1), HandlerState, ParseState, Data); {Closed, Socket} when Closed =:= element(2, Messages) -> terminate(State, HandlerState, {error, closed}); {Error, Socket, Reason} when Error =:= element(3, Messages) -> @@ -480,12 +500,16 @@ parse_header(State=#state{opts=Opts, frag_state=FragState, extensions=Extensions websocket_close(State, HandlerState, {error, badframe}) end. -parse_payload(State=#state{frag_state=FragState, utf8_state=Incomplete, extensions=Extensions}, +parse_payload(State=#state{opts=Opts, frag_state=FragState, utf8_state=Incomplete, extensions=Extensions}, HandlerState, ParseState=#ps_payload{ type=Type, len=Len, mask_key=MaskKey, rsv=Rsv, unmasked=Unmasked, unmasked_len=UnmaskedLen}, Data) -> + MaxFrameSize = case maps:get(max_frame_size, Opts, infinity) of + infinity -> infinity; + MaxFrameSize0 -> MaxFrameSize0 - UnmaskedLen + end, case cow_ws:parse_payload(Data, MaskKey, Incomplete, UnmaskedLen, - Type, Len, FragState, Extensions, Rsv) of + Type, Len, FragState, Extensions#{max_inflate_size => MaxFrameSize}, Rsv) of {ok, CloseCode, Payload, Utf8State, Rest} -> dispatch_frame(State#state{utf8_state=Utf8State}, HandlerState, ParseState#ps_payload{unmasked= <<Unmasked/binary, Payload/binary>>, @@ -615,14 +639,16 @@ commands([{active, Active}|Tail], State0=#state{active=Active0}, Data) when is_b commands(Tail, State#state{active=Active}, Data); commands([{deflate, Deflate}|Tail], State, Data) when is_boolean(Deflate) -> commands(Tail, State#state{deflate=Deflate}, Data); -commands([{set_options, SetOpts}|Tail], State0=#state{opts=Opts}, Data) -> - State = case SetOpts of - #{idle_timeout := IdleTimeout} -> +commands([{set_options, SetOpts}|Tail], State0, Data) -> + State = maps:fold(fun + (idle_timeout, IdleTimeout, StateF=#state{opts=Opts}) -> %% We reset the number of ticks when changing the idle_timeout option. - set_idle_timeout(State0#state{opts=Opts#{idle_timeout => IdleTimeout}}, 0); - _ -> - State0 - end, + set_idle_timeout(StateF#state{opts=Opts#{idle_timeout => IdleTimeout}}, 0); + (max_frame_size, MaxFrameSize, StateF=#state{opts=Opts}) -> + StateF#state{opts=Opts#{max_frame_size => MaxFrameSize}}; + (_, _, StateF) -> + StateF + end, State0, SetOpts), commands(Tail, State, Data); commands([{shutdown_reason, ShutdownReason}|Tail], State, Data) -> commands(Tail, State#state{shutdown_reason=ShutdownReason}, Data); |