diff options
Diffstat (limited to 'src/gun_ws.erl')
-rw-r--r-- | src/gun_ws.erl | 45 |
1 files changed, 25 insertions, 20 deletions
diff --git a/src/gun_ws.erl b/src/gun_ws.erl index ba61577..15c4a81 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -25,7 +25,7 @@ -export([closing/4]). -export([close/4]). -export([keepalive/3]). --export([send/4]). +-export([ws_send/5]). -export([down/1]). -record(payload, { @@ -39,7 +39,7 @@ }). -record(ws_state, { - owner :: pid(), + reply_to :: pid(), stream_ref :: reference(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), @@ -79,6 +79,8 @@ do_check_options([Opt={protocols, L}|Opts]) when is_list(L) -> [true] -> do_check_options(Opts); _ -> {error, {options, {ws, Opt}}} end; +do_check_options([{reply_to, P}|Opts]) when is_pid(P) -> + do_check_options(Opts); do_check_options([{silence_pings, B}|Opts]) when B =:= true; B =:= false -> do_check_options(Opts); do_check_options([{user_opts, _}|Opts]) -> @@ -91,10 +93,10 @@ opts_name() -> ws_opts. has_keepalive() -> true. default_keepalive() -> 5000. -init(Owner, Socket, Transport, #{stream_ref := StreamRef, headers := Headers, +init(ReplyTo, Socket, Transport, #{stream_ref := StreamRef, headers := Headers, extensions := Extensions, flow := InitialFlow, handler := Handler, opts := Opts}) -> - {ok, HandlerState} = Handler:init(Owner, StreamRef, Headers, Opts), - {connected_ws_only, #ws_state{owner=Owner, stream_ref=StreamRef, + {ok, HandlerState} = Handler:init(ReplyTo, StreamRef, Headers, Opts), + {connected_ws_only, #ws_state{reply_to=ReplyTo, stream_ref=StreamRef, socket=Socket, transport=Transport, opts=Opts, extensions=Extensions, flow=InitialFlow, handler=Handler, handler_state=HandlerState}}. @@ -107,7 +109,7 @@ handle(_, State=#ws_state{in=close}, EvHandler, EvHandlerState) -> %% Shortcut for common case when Data is empty after processing a frame. handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) -> maybe_active(State, EvHandlerState); -handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer, +handle(Data, State=#ws_state{reply_to=ReplyTo, stream_ref=StreamRef, buffer=Buffer, in=head, frag_state=FragState, extensions=Extensions}, EvHandler, EvHandlerState0) -> %% Send the event only if there was no data in the buffer. @@ -175,7 +177,7 @@ maybe_active(State=#ws_state{flow=Flow}, EvHandlerState) -> {active, Flow > 0} ], EvHandlerState}. -dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, +dispatch(Rest, State0=#ws_state{reply_to=ReplyTo, stream_ref=StreamRef, frag_state=FragState, extensions=Extensions, flow=Flow0, handler=Handler, handler_state=HandlerState0}, Type, Payload, CloseCode, EvHandler, EvHandlerState0) -> @@ -196,10 +198,10 @@ dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, State1 = State0#ws_state{flow=Flow, handler_state=HandlerState}, {State, EvHandlerState} = case Frame of ping -> - {[], EvHandlerState2} = send(pong, State1, EvHandler, EvHandlerState1), + {[], EvHandlerState2} = send(pong, State1, ReplyTo, EvHandler, EvHandlerState1), {State1, EvHandlerState2}; {ping, Payload} -> - {[], EvHandlerState2} = send({pong, Payload}, State1, EvHandler, EvHandlerState1), + {[], EvHandlerState2} = send({pong, Payload}, State1, ReplyTo, EvHandler, EvHandlerState1), {State1, EvHandlerState2}; close -> {State1#ws_state{in=close}, EvHandlerState1}; @@ -226,7 +228,7 @@ update_flow(State=#ws_state{flow=Flow0}, _ReplyTo, _StreamRef, Inc) -> %% The user already sent the close frame; do nothing. closing(_, State=#ws_state{out=close}, _, EvHandlerState) -> {closing(State), EvHandlerState}; -closing(Reason, State, EvHandler, EvHandlerState) -> +closing(Reason, State=#ws_state{reply_to=ReplyTo}, EvHandler, EvHandlerState) -> Code = case Reason of normal -> 1000; owner_down -> 1001; @@ -234,7 +236,7 @@ closing(Reason, State, EvHandler, EvHandlerState) -> {error, badframe} -> 1002; {error, badencoding} -> 1007 end, - send({close, Code, <<>>}, State, EvHandler, EvHandlerState). + send({close, Code, <<>>}, State, ReplyTo, EvHandler, EvHandlerState). closing(#ws_state{opts=Opts}) -> Timeout = maps:get(closing_timeout, Opts, 15000), @@ -243,14 +245,14 @@ closing(#ws_state{opts=Opts}) -> close(_, _, _, EvHandlerState) -> EvHandlerState. -keepalive(State, EvHandler, EvHandlerState0) -> - {[], EvHandlerState} = send(ping, State, EvHandler, EvHandlerState0), +keepalive(State=#ws_state{reply_to=ReplyTo}, EvHandler, EvHandlerState0) -> + {[], EvHandlerState} = send(ping, State, ReplyTo, EvHandler, EvHandlerState0), {State, EvHandlerState}. %% Send one frame. -send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, +send(Frame, State=#ws_state{stream_ref=StreamRef, socket=Socket, transport=Transport, in=In, extensions=Extensions}, - EvHandler, EvHandlerState0) when not is_list(Frame) -> + ReplyTo, EvHandler, EvHandlerState0) -> WsSendFrameEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo, @@ -272,14 +274,17 @@ send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, ], EvHandlerState}; true -> {[], EvHandlerState} - end; + end. + %% Send many frames. -send([], _, _, EvHandlerState) -> +ws_send(Frame, State, ReplyTo, EvHandler, EvHandlerState) when not is_list(Frame) -> + send(Frame, State, ReplyTo, EvHandler, EvHandlerState); +ws_send([], _, _, _, EvHandlerState) -> {[], EvHandlerState}; -send([Frame|Tail], State, EvHandler, EvHandlerState0) -> - case send(Frame, State, EvHandler, EvHandlerState0) of +ws_send([Frame|Tail], State, ReplyTo, EvHandler, EvHandlerState0) -> + case send(Frame, State, ReplyTo, EvHandler, EvHandlerState0) of {[], EvHandlerState} -> - send(Tail, State, EvHandler, EvHandlerState); + ws_send(Tail, State, ReplyTo, EvHandler, EvHandlerState); Other -> Other end. |