diff options
Diffstat (limited to 'src/gun_ws.erl')
-rw-r--r-- | src/gun_ws.erl | 94 |
1 files changed, 63 insertions, 31 deletions
diff --git a/src/gun_ws.erl b/src/gun_ws.erl index 42cf049..49911dc 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -19,6 +19,7 @@ -export([init/9]). -export([handle/4]). -export([update_flow/4]). +-export([closing/4]). -export([close/4]). -export([send/4]). -export([down/1]). @@ -38,8 +39,10 @@ stream_ref :: reference(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), + opts = #{} :: map(), %% @todo buffer = <<>> :: binary(), in = head :: head | #payload{} | close, + out = head :: head | close, frag_state = undefined :: cow_ws:frag_state(), utf8_state = 0 :: cow_ws:utf8_state(), extensions = #{} :: cow_ws:extensions(), @@ -53,6 +56,10 @@ check_options(Opts) -> do_check_options([]) -> ok; +do_check_options([{closing_timeout, infinity}|Opts]) -> + do_check_options(Opts); +do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 -> + do_check_options(Opts); do_check_options([{compress, B}|Opts]) when B =:= true; B =:= false -> do_check_options(Opts); do_check_options([{default_protocol, M}|Opts]) when is_atom(M) -> @@ -75,12 +82,15 @@ init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Hand Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, {ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts), {switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef, - socket=Socket, transport=Transport, extensions=Extensions, + socket=Socket, transport=Transport, opts=Opts, extensions=Extensions, flow=InitialFlow, handler=Handler, handler_state=HandlerState}}. %% Do not handle anything if we received a close frame. -handle(_, State=#ws_state{in=close}, _, EvHandlerState) -> - {{state, State}, EvHandlerState}; +%% Initiate or terminate the closing state depending on whether we sent a close yet. +handle(_, State=#ws_state{in=close, out=close}, _, EvHandlerState) -> + {[{state, State}, close], EvHandlerState}; +handle(_, State=#ws_state{in=close}, EvHandler, EvHandlerState) -> + closing(normal, State, EvHandler, EvHandlerState); %% Shortcut for common case when Data is empty after processing a frame. handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) -> maybe_active(State, EvHandlerState); @@ -119,7 +129,7 @@ handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer, more -> maybe_active(State#ws_state{buffer=Data2}, EvHandlerState1); error -> - close({error, badframe}, State, EvHandler, EvHandlerState1) + closing({error, badframe}, State, EvHandler, EvHandlerState1) end; handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey, close_code=CloseCode, unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState, @@ -143,7 +153,7 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}, EvHandlerState); Error = {error, _Reason} -> - close(Error, State, EvHandler, EvHandlerState) + closing(Error, State, EvHandler, EvHandlerState) end. maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) -> @@ -165,11 +175,11 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, }, EvHandlerState0), case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of ping -> - {{state, State}, EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1), - handle(Rest, State, EvHandler, EvHandlerState); + {[], EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1), + handle(Rest, State0, EvHandler, EvHandlerState); {ping, Payload} -> - {{state, State}, EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1), - handle(Rest, State, EvHandler, EvHandlerState); + {[], EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1), + handle(Rest, State0, EvHandler, EvHandlerState); pong -> handle(Rest, State0, EvHandler, EvHandlerState1); {pong, _} -> @@ -200,26 +210,30 @@ update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) -> {active, Flow > 0} ]. -close(Reason, State, EvHandler, EvHandlerState) -> - case Reason of - normal -> - send({close, 1000, <<>>}, State, EvHandler, EvHandlerState); - owner_down -> - send({close, 1001, <<>>}, State, EvHandler, EvHandlerState); - {error, badframe} -> - send({close, 1002, <<>>}, State, EvHandler, EvHandlerState); - {error, badencoding} -> - send({close, 1007, <<>>}, State, EvHandler, EvHandlerState); - %% Socket errors; do nothing. - closed -> - {ok, EvHandlerState}; - {error, _} -> - {ok, EvHandlerState} - end. +%% The user already sent the close frame; do nothing. +closing(_, State=#ws_state{out=close}, _, EvHandlerState) -> + {closing(State), EvHandlerState}; +closing(Reason, State, EvHandler, EvHandlerState) -> + Code = case Reason of + normal -> 1000; + owner_down -> 1001; + shutdown -> 1001; + {error, badframe} -> 1002; + {error, badencoding} -> 1007 + end, + send({close, Code, <<>>}, State, EvHandler, EvHandlerState). +closing(#ws_state{opts=Opts}) -> + Timeout = maps:get(closing_timeout, Opts, 15000), + {closing, Timeout}. + +close(_, _, _, EvHandlerState) -> + EvHandlerState. + +%% Send one frame. send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, - socket=Socket, transport=Transport, extensions=Extensions}, - EvHandler, EvHandlerState0) -> + socket=Socket, transport=Transport, in=In, extensions=Extensions}, + EvHandler, EvHandlerState0) when not is_list(Frame) -> WsSendFrameEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -229,10 +243,28 @@ send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, EvHandlerState1 = EvHandler:ws_send_frame_start(WsSendFrameEvent, EvHandlerState0), Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)), EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1), - case Frame of - close -> {close, EvHandlerState}; - {close, _, _} -> {close, EvHandlerState}; - _ -> {{state, State}, EvHandlerState} + if + Frame =:= close; element(1, Frame) =:= close -> + {[ + {state, State#ws_state{out=close}}, + %% We can close immediately if we already received a close frame. + case In of + close -> close; + _ -> closing(State) + end + ], EvHandlerState}; + true -> + {[], EvHandlerState} + end; +%% Send many frames. +send([], _, _, EvHandlerState) -> + {[], EvHandlerState}; +send([Frame|Tail], State, EvHandler, EvHandlerState0) -> + case send(Frame, State, EvHandler, EvHandlerState0) of + {[], EvHandlerState} -> + send(Tail, State, EvHandler, EvHandlerState); + Other -> + Other end. %% Websocket has no concept of streams. |