From 611f9a9b78cab4005892e13dffb7a2c8e44580ee Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Fri, 2 Aug 2019 14:30:08 +0200 Subject: Add flow control Flow control is disabled by default. The initial flow value must be set to enable it (either for the entire connection or on a per-request basis). Flow applies to all HTTP streams as well as Websocket. HTTP/2 pushed streams receive the same value as their originating stream. --- src/gun_ws.erl | 54 +++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 39 insertions(+), 15 deletions(-) (limited to 'src/gun_ws.erl') diff --git a/src/gun_ws.erl b/src/gun_ws.erl index 7acf74e..42cf049 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -16,8 +16,9 @@ -export([check_options/1]). -export([name/0]). --export([init/8]). +-export([init/9]). -export([handle/4]). +-export([update_flow/4]). -export([close/4]). -export([send/4]). -export([down/1]). @@ -42,6 +43,7 @@ frag_state = undefined :: cow_ws:frag_state(), utf8_state = 0 :: cow_ws:utf8_state(), extensions = #{} :: cow_ws:extensions(), + flow :: integer() | infinity, handler :: module(), handler_state :: any() }). @@ -55,6 +57,8 @@ do_check_options([{compress, B}|Opts]) when B =:= true; B =:= false -> do_check_options(Opts); do_check_options([{default_protocol, M}|Opts]) when is_atom(M) -> do_check_options(Opts); +do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 -> + do_check_options(Opts); do_check_options([Opt={protocols, L}|Opts]) when is_list(L) -> case lists:usort(lists:flatten([[is_binary(B), is_atom(M)] || {B, M} <- L])) of [true] -> do_check_options(Opts); @@ -67,19 +71,19 @@ do_check_options([Opt|_]) -> name() -> ws. -init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) -> +init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Handler, Opts) -> Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, - HandlerState = Handler:init(Owner, StreamRef, Headers, Opts), + {ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts), {switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef, socket=Socket, transport=Transport, extensions=Extensions, - handler=Handler, handler_state=HandlerState}}. + flow=InitialFlow, handler=Handler, handler_state=HandlerState}}. %% Do not handle anything if we received a close frame. handle(_, State=#ws_state{in=close}, _, EvHandlerState) -> {{state, State}, EvHandlerState}; %% Shortcut for common case when Data is empty after processing a frame. handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) -> - {{state, State}, EvHandlerState}; + maybe_active(State, EvHandlerState); handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer, in=head, frag_state=FragState, extensions=Extensions}, EvHandler, EvHandlerState0) -> @@ -113,7 +117,7 @@ handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer, in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey}, frag_state=FragState2}, EvHandler, EvHandlerState); more -> - {{state, State#ws_state{buffer=Data2}}, EvHandlerState1}; + maybe_active(State#ws_state{buffer=Data2}, EvHandlerState1); error -> close({error, badframe}, State, EvHandler, EvHandlerState1) end; @@ -130,20 +134,26 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke <>, CloseCode, EvHandler, EvHandlerState); {more, CloseCode2, Payload, Utf8State2} -> - {{state, State#ws_state{in=In#payload{close_code=CloseCode2, + maybe_active(State#ws_state{in=In#payload{close_code=CloseCode2, unmasked= <>, - len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}}, - EvHandlerState}; + len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}, + EvHandlerState); {more, Payload, Utf8State2} -> - {{state, State#ws_state{in=In#payload{unmasked= <>, - len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}}, - EvHandlerState}; + maybe_active(State#ws_state{in=In#payload{unmasked= <>, + len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}, + EvHandlerState); Error = {error, _Reason} -> close(Error, State, EvHandler, EvHandlerState) end. +maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) -> + {[ + {state, State}, + {active, Flow > 0} + ], EvHandlerState}. + dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, - frag_state=FragState, extensions=Extensions, + frag_state=FragState, extensions=Extensions, flow=Flow0, handler=Handler, handler_state=HandlerState0}, Type, Payload, CloseCode, EvHandler, EvHandlerState0) -> EvHandlerState1 = EvHandler:ws_recv_frame_end(#{ @@ -165,8 +175,12 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, {pong, _} -> handle(Rest, State0, EvHandler, EvHandlerState1); Frame -> - HandlerState = Handler:handle(Frame, HandlerState0), - State1 = State0#ws_state{handler_state=HandlerState}, + {ok, Dec, HandlerState} = Handler:handle(Frame, HandlerState0), + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 - Dec + end, + State1 = State0#ws_state{flow=Flow, handler_state=HandlerState}, State = case Frame of close -> State1#ws_state{in=close}; {close, _, _} -> State1#ws_state{in=close}; @@ -176,6 +190,16 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, handle(Rest, State, EvHandler, EvHandlerState1) end. +update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) -> + Flow = case Flow0 of + infinity -> infinity; + _ -> Flow0 + Inc + end, + [ + {state, State#ws_state{flow=Flow}}, + {active, Flow > 0} + ]. + close(Reason, State, EvHandler, EvHandlerState) -> case Reason of normal -> -- cgit v1.2.3