aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_ws.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/gun_ws.erl')
-rw-r--r--src/gun_ws.erl94
1 files changed, 63 insertions, 31 deletions
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index 42cf049..49911dc 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -19,6 +19,7 @@
-export([init/9]).
-export([handle/4]).
-export([update_flow/4]).
+-export([closing/4]).
-export([close/4]).
-export([send/4]).
-export([down/1]).
@@ -38,8 +39,10 @@
stream_ref :: reference(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
+ opts = #{} :: map(), %% @todo
buffer = <<>> :: binary(),
in = head :: head | #payload{} | close,
+ out = head :: head | close,
frag_state = undefined :: cow_ws:frag_state(),
utf8_state = 0 :: cow_ws:utf8_state(),
extensions = #{} :: cow_ws:extensions(),
@@ -53,6 +56,10 @@ check_options(Opts) ->
do_check_options([]) ->
ok;
+do_check_options([{closing_timeout, infinity}|Opts]) ->
+ do_check_options(Opts);
+do_check_options([{closing_timeout, T}|Opts]) when is_integer(T), T > 0 ->
+ do_check_options(Opts);
do_check_options([{compress, B}|Opts]) when B =:= true; B =:= false ->
do_check_options(Opts);
do_check_options([{default_protocol, M}|Opts]) when is_atom(M) ->
@@ -75,12 +82,15 @@ init(Owner, Socket, Transport, StreamRef, Headers, Extensions, InitialFlow, Hand
Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
{ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts),
{switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef,
- socket=Socket, transport=Transport, extensions=Extensions,
+ socket=Socket, transport=Transport, opts=Opts, extensions=Extensions,
flow=InitialFlow, handler=Handler, handler_state=HandlerState}}.
%% Do not handle anything if we received a close frame.
-handle(_, State=#ws_state{in=close}, _, EvHandlerState) ->
- {{state, State}, EvHandlerState};
+%% Initiate or terminate the closing state depending on whether we sent a close yet.
+handle(_, State=#ws_state{in=close, out=close}, _, EvHandlerState) ->
+ {[{state, State}, close], EvHandlerState};
+handle(_, State=#ws_state{in=close}, EvHandler, EvHandlerState) ->
+ closing(normal, State, EvHandler, EvHandlerState);
%% Shortcut for common case when Data is empty after processing a frame.
handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) ->
maybe_active(State, EvHandlerState);
@@ -119,7 +129,7 @@ handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer,
more ->
maybe_active(State#ws_state{buffer=Data2}, EvHandlerState1);
error ->
- close({error, badframe}, State, EvHandler, EvHandlerState1)
+ closing({error, badframe}, State, EvHandler, EvHandlerState1)
end;
handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey,
close_code=CloseCode, unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState,
@@ -143,7 +153,7 @@ handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_ke
len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2},
EvHandlerState);
Error = {error, _Reason} ->
- close(Error, State, EvHandler, EvHandlerState)
+ closing(Error, State, EvHandler, EvHandlerState)
end.
maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) ->
@@ -165,11 +175,11 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
}, EvHandlerState0),
case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of
ping ->
- {{state, State}, EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1),
- handle(Rest, State, EvHandler, EvHandlerState);
+ {[], EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1),
+ handle(Rest, State0, EvHandler, EvHandlerState);
{ping, Payload} ->
- {{state, State}, EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1),
- handle(Rest, State, EvHandler, EvHandlerState);
+ {[], EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1),
+ handle(Rest, State0, EvHandler, EvHandlerState);
pong ->
handle(Rest, State0, EvHandler, EvHandlerState1);
{pong, _} ->
@@ -200,26 +210,30 @@ update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) ->
{active, Flow > 0}
].
-close(Reason, State, EvHandler, EvHandlerState) ->
- case Reason of
- normal ->
- send({close, 1000, <<>>}, State, EvHandler, EvHandlerState);
- owner_down ->
- send({close, 1001, <<>>}, State, EvHandler, EvHandlerState);
- {error, badframe} ->
- send({close, 1002, <<>>}, State, EvHandler, EvHandlerState);
- {error, badencoding} ->
- send({close, 1007, <<>>}, State, EvHandler, EvHandlerState);
- %% Socket errors; do nothing.
- closed ->
- {ok, EvHandlerState};
- {error, _} ->
- {ok, EvHandlerState}
- end.
+%% The user already sent the close frame; do nothing.
+closing(_, State=#ws_state{out=close}, _, EvHandlerState) ->
+ {closing(State), EvHandlerState};
+closing(Reason, State, EvHandler, EvHandlerState) ->
+ Code = case Reason of
+ normal -> 1000;
+ owner_down -> 1001;
+ shutdown -> 1001;
+ {error, badframe} -> 1002;
+ {error, badencoding} -> 1007
+ end,
+ send({close, Code, <<>>}, State, EvHandler, EvHandlerState).
+closing(#ws_state{opts=Opts}) ->
+ Timeout = maps:get(closing_timeout, Opts, 15000),
+ {closing, Timeout}.
+
+close(_, _, _, EvHandlerState) ->
+ EvHandlerState.
+
+%% Send one frame.
send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
- socket=Socket, transport=Transport, extensions=Extensions},
- EvHandler, EvHandlerState0) ->
+ socket=Socket, transport=Transport, in=In, extensions=Extensions},
+ EvHandler, EvHandlerState0) when not is_list(Frame) ->
WsSendFrameEvent = #{
stream_ref => StreamRef,
reply_to => ReplyTo,
@@ -229,10 +243,28 @@ send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
EvHandlerState1 = EvHandler:ws_send_frame_start(WsSendFrameEvent, EvHandlerState0),
Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)),
EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1),
- case Frame of
- close -> {close, EvHandlerState};
- {close, _, _} -> {close, EvHandlerState};
- _ -> {{state, State}, EvHandlerState}
+ if
+ Frame =:= close; element(1, Frame) =:= close ->
+ {[
+ {state, State#ws_state{out=close}},
+ %% We can close immediately if we already received a close frame.
+ case In of
+ close -> close;
+ _ -> closing(State)
+ end
+ ], EvHandlerState};
+ true ->
+ {[], EvHandlerState}
+ end;
+%% Send many frames.
+send([], _, _, EvHandlerState) ->
+ {[], EvHandlerState};
+send([Frame|Tail], State, EvHandler, EvHandlerState0) ->
+ case send(Frame, State, EvHandler, EvHandlerState0) of
+ {[], EvHandlerState} ->
+ send(Tail, State, EvHandler, EvHandlerState);
+ Other ->
+ Other
end.
%% Websocket has no concept of streams.