aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_ws.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/gun_ws.erl')
-rw-r--r--src/gun_ws.erl54
1 files changed, 39 insertions, 15 deletions
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index 7acf74e..42cf049 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -16,8 +16,9 @@
-export([check_options/1]).
-export([name/0]).
--export([init/8]).
+-export([init/9]).
-export([handle/4]).
+-export([update_flow/4]).
-export([close/4]).
-export([send/4]).
-export([down/1]).
@@ -42,6 +43,7 @@
frag_state = undefined :: cow_ws:frag_state(),
utf8_state = 0 :: cow_ws:utf8_state(),
extensions = #{} :: cow_ws:extensions(),
+ flow :: integer() | infinity,
handler :: module(),
handler_state :: any()
}).
@@ -55,6 +57,8 @@ do_check_options([{compress, B}|Opts]) when B =:= true; B =:= false ->
do_check_options(Opts);
do_check_options([{default_protocol, M}|Opts]) when is_atom(M) ->
do_check_options(Opts);
+do_check_options([{flow, InitialFlow}|Opts]) when is_integer(InitialFlow), InitialFlow > 0 ->
+ do_check_options(Opts);
do_check_options([Opt={protocols, L}|Opts]) when is_list(L) ->
case lists:usort(lists:flatten([[is_binary(B), is_atom(M)] || {B, M} <- L])) of
[true] -> do_check_options(Opts);
@@ -67,19 +71,19 @@ do_check_options([Opt|_]) ->
name() -> ws.
-init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) ->
+init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Handler, Opts) ->
Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
- HandlerState = Handler:init(Owner, StreamRef, Headers, Opts),
+ {ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts),
{switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef,
socket=Socket, transport=Transport, extensions=Extensions,
- handler=Handler, handler_state=HandlerState}}.
+ flow=InitialFlow, handler=Handler, handler_state=HandlerState}}.
%% Do not handle anything if we received a close frame.
handle(_, State=#ws_state{in=close}, _, EvHandlerState) ->
{{state, State}, EvHandlerState};
%% Shortcut for common case when Data is empty after processing a frame.
handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) ->
- {{state, State}, EvHandlerState};
+ maybe_active(State, EvHandlerState);
handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer,
in=head, frag_state=FragState, extensions=Extensions},
EvHandler, EvHandlerState0) ->
@@ -113,7 +117,7 @@ handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer,
in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey},
frag_state=FragState2}, EvHandler, EvHandlerState);
more ->
- {{state, State#ws_state{buffer=Data2}}, EvHandlerState1};
+ maybe_active(State#ws_state{buffer=Data2}, EvHandlerState1);
error ->
close({error, badframe}, State, EvHandler, EvHandlerState1)
end;
@@ -130,20 +134,26 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke
<<Unmasked/binary, Payload/binary>>, CloseCode,
EvHandler, EvHandlerState);
{more, CloseCode2, Payload, Utf8State2} ->
- {{state, State#ws_state{in=In#payload{close_code=CloseCode2,
+ maybe_active(State#ws_state{in=In#payload{close_code=CloseCode2,
unmasked= <<Unmasked/binary, Payload/binary>>,
- len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}},
- EvHandlerState};
+ len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2},
+ EvHandlerState);
{more, Payload, Utf8State2} ->
- {{state, State#ws_state{in=In#payload{unmasked= <<Unmasked/binary, Payload/binary>>,
- len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}},
- EvHandlerState};
+ maybe_active(State#ws_state{in=In#payload{unmasked= <<Unmasked/binary, Payload/binary>>,
+ len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2},
+ EvHandlerState);
Error = {error, _Reason} ->
close(Error, State, EvHandler, EvHandlerState)
end.
+maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) ->
+ {[
+ {state, State},
+ {active, Flow > 0}
+ ], EvHandlerState}.
+
dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
- frag_state=FragState, extensions=Extensions,
+ frag_state=FragState, extensions=Extensions, flow=Flow0,
handler=Handler, handler_state=HandlerState0},
Type, Payload, CloseCode, EvHandler, EvHandlerState0) ->
EvHandlerState1 = EvHandler:ws_recv_frame_end(#{
@@ -165,8 +175,12 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
{pong, _} ->
handle(Rest, State0, EvHandler, EvHandlerState1);
Frame ->
- HandlerState = Handler:handle(Frame, HandlerState0),
- State1 = State0#ws_state{handler_state=HandlerState},
+ {ok, Dec, HandlerState} = Handler:handle(Frame, HandlerState0),
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 - Dec
+ end,
+ State1 = State0#ws_state{flow=Flow, handler_state=HandlerState},
State = case Frame of
close -> State1#ws_state{in=close};
{close, _, _} -> State1#ws_state{in=close};
@@ -176,6 +190,16 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
handle(Rest, State, EvHandler, EvHandlerState1)
end.
+update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) ->
+ Flow = case Flow0 of
+ infinity -> infinity;
+ _ -> Flow0 + Inc
+ end,
+ [
+ {state, State#ws_state{flow=Flow}},
+ {active, Flow > 0}
+ ].
+
close(Reason, State, EvHandler, EvHandlerState) ->
case Reason of
normal ->