From 265ece680c53f77d1685434d0636216c94021497 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 15 Jul 2019 16:18:16 +0200 Subject: Add Websocket frames related events --- src/gun_ws.erl | 154 ++++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 103 insertions(+), 51 deletions(-) (limited to 'src/gun_ws.erl') diff --git a/src/gun_ws.erl b/src/gun_ws.erl index ff54ecd..7acf74e 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -18,8 +18,8 @@ -export([name/0]). -export([init/8]). -export([handle/4]). --export([close/2]). --export([send/2]). +-export([close/4]). +-export([send/4]). -export([down/1]). -record(payload, { @@ -34,6 +34,7 @@ -record(ws_state, { owner :: pid(), + stream_ref :: reference(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), buffer = <<>> :: binary(), @@ -69,94 +70,145 @@ name() -> ws. init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) -> Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, HandlerState = Handler:init(Owner, StreamRef, Headers, Opts), - {switch_protocol, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport, - extensions=Extensions, handler=Handler, handler_state=HandlerState}}. - -handle(Data, State, _EvHandler, EvHandlerState) -> - {handle(Data, State), EvHandlerState}. + {switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef, + socket=Socket, transport=Transport, extensions=Extensions, + handler=Handler, handler_state=HandlerState}}. %% Do not handle anything if we received a close frame. -handle(_, State=#ws_state{in=close}) -> - {state, State}; +handle(_, State=#ws_state{in=close}, _, EvHandlerState) -> + {{state, State}, EvHandlerState}; %% Shortcut for common case when Data is empty after processing a frame. -handle(<<>>, State=#ws_state{in=head}) -> - {state, State}; -handle(Data, State=#ws_state{buffer=Buffer, in=head, frag_state=FragState, extensions=Extensions}) -> +handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) -> + {{state, State}, EvHandlerState}; +handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer, + in=head, frag_state=FragState, extensions=Extensions}, + EvHandler, EvHandlerState0) -> + %% Send the event only if there was no data in the buffer. + %% If there is data in the buffer then we already sent the event. + EvHandlerState1 = case Buffer of + <<>> -> + EvHandler:ws_recv_frame_start(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + frag_state => FragState, + extensions => Extensions + }, EvHandlerState0); + _ -> + EvHandlerState0 + end, Data2 = << Buffer/binary, Data/binary >>, case cow_ws:parse_header(Data2, Extensions, FragState) of {Type, FragState2, Rsv, Len, MaskKey, Rest} -> + EvHandlerState = EvHandler:ws_recv_frame_header(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + frag_state => FragState2, + extensions => Extensions, + type => Type, + rsv => Rsv, + len => Len, + mask_key => MaskKey + }, EvHandlerState1), handle(Rest, State#ws_state{buffer= <<>>, - in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey}, frag_state=FragState2}); + in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey}, + frag_state=FragState2}, EvHandler, EvHandlerState); more -> - {state, State#ws_state{buffer=Data2}}; + {{state, State#ws_state{buffer=Data2}}, EvHandlerState1}; error -> - close({error, badframe}, State) + close({error, badframe}, State, EvHandler, EvHandlerState1) end; -handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey, close_code=CloseCode, - unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState, utf8_state=Utf8State, extensions=Extensions}) -> +handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey, + close_code=CloseCode, unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState, + utf8_state=Utf8State, extensions=Extensions}, EvHandler, EvHandlerState) -> case cow_ws:parse_payload(Data, MaskKey, Utf8State, UnmaskedLen, Type, Len, FragState, Extensions, Rsv) of {ok, CloseCode2, Payload, Utf8State2, Rest} -> - dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, << Unmasked/binary, Payload/binary >>, CloseCode2); + dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, + <>, CloseCode2, + EvHandler, EvHandlerState); {ok, Payload, Utf8State2, Rest} -> - dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, << Unmasked/binary, Payload/binary >>, CloseCode); + dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, + <>, CloseCode, + EvHandler, EvHandlerState); {more, CloseCode2, Payload, Utf8State2} -> - {state, State#ws_state{in=In#payload{close_code=CloseCode2, unmasked= << Unmasked/binary, Payload/binary >>, - len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}}; + {{state, State#ws_state{in=In#payload{close_code=CloseCode2, + unmasked= <>, + len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}}, + EvHandlerState}; {more, Payload, Utf8State2} -> - {state, State#ws_state{in=In#payload{unmasked= << Unmasked/binary, Payload/binary >>, - len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}}; + {{state, State#ws_state{in=In#payload{unmasked= <>, + len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}}, + EvHandlerState}; Error = {error, _Reason} -> - close(Error, State) + close(Error, State, EvHandler, EvHandlerState) end. -dispatch(Rest, State0=#ws_state{frag_state=FragState, +dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, + frag_state=FragState, extensions=Extensions, handler=Handler, handler_state=HandlerState0}, - Type0, Payload0, CloseCode0) -> - case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of + Type, Payload, CloseCode, EvHandler, EvHandlerState0) -> + EvHandlerState1 = EvHandler:ws_recv_frame_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + extensions => Extensions, + close_code => CloseCode, + payload => Payload + }, EvHandlerState0), + case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of ping -> - {state, State} = send(pong, State0), - handle(Rest, State); + {{state, State}, EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1), + handle(Rest, State, EvHandler, EvHandlerState); {ping, Payload} -> - {state, State} = send({pong, Payload}, State0), - handle(Rest, State); + {{state, State}, EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1), + handle(Rest, State, EvHandler, EvHandlerState); pong -> - handle(Rest, State0); + handle(Rest, State0, EvHandler, EvHandlerState1); {pong, _} -> - handle(Rest, State0); + handle(Rest, State0, EvHandler, EvHandlerState1); Frame -> HandlerState = Handler:handle(Frame, HandlerState0), - State = State0#ws_state{handler_state=HandlerState}, - case Frame of - close -> handle(Rest, State#ws_state{in=close}); - {close, _, _} -> handle(Rest, State#ws_state{in=close}); - {fragment, fin, _, _} -> handle(Rest, State#ws_state{frag_state=undefined}); - _ -> handle(Rest, State) - end + State1 = State0#ws_state{handler_state=HandlerState}, + State = case Frame of + close -> State1#ws_state{in=close}; + {close, _, _} -> State1#ws_state{in=close}; + {fragment, fin, _, _} -> State1#ws_state{frag_state=undefined}; + _ -> State1 + end, + handle(Rest, State, EvHandler, EvHandlerState1) end. -close(Reason, State) -> +close(Reason, State, EvHandler, EvHandlerState) -> case Reason of normal -> - send({close, 1000, <<>>}, State); + send({close, 1000, <<>>}, State, EvHandler, EvHandlerState); owner_down -> - send({close, 1001, <<>>}, State); + send({close, 1001, <<>>}, State, EvHandler, EvHandlerState); {error, badframe} -> - send({close, 1002, <<>>}, State); + send({close, 1002, <<>>}, State, EvHandler, EvHandlerState); {error, badencoding} -> - send({close, 1007, <<>>}, State); + send({close, 1007, <<>>}, State, EvHandler, EvHandlerState); %% Socket errors; do nothing. closed -> - ok; + {ok, EvHandlerState}; {error, _} -> - ok + {ok, EvHandlerState} end. -send(Frame, State=#ws_state{socket=Socket, transport=Transport, extensions=Extensions}) -> +send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, + socket=Socket, transport=Transport, extensions=Extensions}, + EvHandler, EvHandlerState0) -> + WsSendFrameEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo, + extensions => Extensions, + frame => Frame + }, + EvHandlerState1 = EvHandler:ws_send_frame_start(WsSendFrameEvent, EvHandlerState0), Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)), + EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1), case Frame of - close -> close; - {close, _, _} -> close; - _ -> {state, State} + close -> {close, EvHandlerState}; + {close, _, _} -> {close, EvHandlerState}; + _ -> {{state, State}, EvHandlerState} end. %% Websocket has no concept of streams. -- cgit v1.2.3