From 265ece680c53f77d1685434d0636216c94021497 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 15 Jul 2019 16:18:16 +0200 Subject: Add Websocket frames related events --- src/gun.erl | 36 ++++++----- src/gun_default_event_h.erl | 20 ++++++ src/gun_event.erl | 55 ++++++++++++++-- src/gun_http.erl | 10 +-- src/gun_http2.erl | 6 +- src/gun_ws.erl | 154 +++++++++++++++++++++++++++++--------------- 6 files changed, 200 insertions(+), 81 deletions(-) (limited to 'src') diff --git a/src/gun.erl b/src/gun.erl index 803a5eb..12e4ae6 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -928,9 +928,11 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) -> ReplyTo ! {gun_error, self(), StreamRef, {badstate, "Websocket is only supported over HTTP/1.1."}}, keep_state_and_data; -connected(cast, {ws_send, Owner, Frame}, - State=#state{owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState}) -> - commands(Protocol:send(Frame, ProtoState), State); +connected(cast, {ws_send, Owner, Frame}, State=#state{ + owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {Commands, EvHandlerState} = Protocol:send(Frame, ProtoState, EvHandler, EvHandlerState0), + commands(Commands, State#state{event_handler_state=EvHandlerState}); connected(cast, {ws_send, ReplyTo, _}, _) -> ReplyTo ! {gun_error, self(), {badstate, "Connection needs to be upgraded to Websocket " @@ -947,18 +949,19 @@ handle_common(cast, {shutdown, Owner}, _, #state{owner=Owner}) -> %% @todo Graceful shutdown. stop; %% We stop when the owner is down. -handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, _, #state{ +handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, _, State=#state{ owner=Owner, owner_ref=OwnerRef, socket=Socket, transport=Transport, - protocol=Protocol, protocol_state=ProtoState}) -> - _ = case Protocol of - undefined -> ok; - _ -> Protocol:close(owner_down, ProtoState) + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {_, EvHandlerState} = case Protocol of + undefined -> {ok, EvHandlerState0}; + _ -> Protocol:close(owner_down, ProtoState, EvHandler, EvHandlerState0) end, _ = case Socket of undefined -> ok; _ -> Transport:close(Socket) end, - owner_down(Reason); + owner_down(Reason, State#state{event_handler_state=EvHandlerState}); handle_common({call, From}, _, _, _) -> {keep_state_and_data, {reply, From, {error, bad_call}}}; %% @todo The ReplyTo patch disabled the notowner behavior. @@ -1023,9 +1026,9 @@ disconnect(State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport, protocol=Protocol, protocol_state=ProtoState, event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) -> - Protocol:close(Reason, ProtoState), + {_, EvHandlerState1} = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0), %% @todo Need a special state for orderly shutdown of a connection. - Transport:close(Socket), + _ = Transport:close(Socket), %% We closed the socket, discard any remaining socket events. disconnect_flush(State), %% @todo Stop keepalive timeout, flush message. @@ -1035,7 +1038,7 @@ disconnect(State=#state{owner=Owner, opts=Opts, DisconnectEvent = #{ reason => Reason }, - EvHandlerState = EvHandler:disconnect(DisconnectEvent, EvHandlerState0), + EvHandlerState = EvHandler:disconnect(DisconnectEvent, EvHandlerState1), Retry = maps:get(retry, Opts, 5), case Retry of 0 -> @@ -1087,11 +1090,10 @@ keepalive_cancel(State=#state{keepalive_ref=KeepaliveRef}) -> end, State#state{keepalive_ref=undefined}. --spec owner_down(_) -> stop | {stop, _}. -owner_down(normal) -> stop; -owner_down(shutdown) -> {stop, shutdown}; -owner_down(Shutdown = {shutdown, _}) -> {stop, Shutdown}; -owner_down(Reason) -> {stop, {shutdown, {owner_down, Reason}}}. +owner_down(normal, State) -> {stop, normal, State}; +owner_down(shutdown, State) -> {stop, shutdown, State}; +owner_down(Shutdown = {shutdown, _}, State) -> {stop, Shutdown, State}; +owner_down(Reason, State) -> {stop, {shutdown, {owner_down, Reason}}, State}. terminate(Reason, StateName, #state{event_handler=EvHandler, event_handler_state=EvHandlerState}) -> diff --git a/src/gun_default_event_h.erl b/src/gun_default_event_h.erl index 6ef2e11..de63f17 100644 --- a/src/gun_default_event_h.erl +++ b/src/gun_default_event_h.erl @@ -28,6 +28,11 @@ -export([response_end/2]). -export([ws_upgrade/2]). -export([protocol_changed/2]). +-export([ws_recv_frame_start/2]). +-export([ws_recv_frame_header/2]). +-export([ws_recv_frame_end/2]). +-export([ws_send_frame_start/2]). +-export([ws_send_frame_end/2]). -export([disconnect/2]). -export([terminate/2]). @@ -70,6 +75,21 @@ ws_upgrade(_EventData, State) -> protocol_changed(_EventData, State) -> State. +ws_recv_frame_start(_EventData, State) -> + State. + +ws_recv_frame_header(_EventData, State) -> + State. + +ws_recv_frame_end(_EventData, State) -> + State. + +ws_send_frame_start(_EventData, State) -> + State. + +ws_send_frame_end(_EventData, State) -> + State. + disconnect(_EventData, State) -> State. diff --git a/src/gun_event.erl b/src/gun_event.erl index 9e83cf8..5332568 100644 --- a/src/gun_event.erl +++ b/src/gun_event.erl @@ -140,6 +140,56 @@ -callback protocol_changed(protocol_changed_event(), State) -> State. +%% ws_recv_frame_start. + +-type ws_recv_frame_start_event() :: #{ + stream_ref := reference(), + reply_to := pid(), + frag_state := cow_ws:frag_state(), + extensions := cow_ws:extensions() +}. + +-callback ws_recv_frame_start(ws_recv_frame_start_event(), State) -> State. + +%% ws_recv_frame_header. + +-type ws_recv_frame_header_event() :: #{ + stream_ref := reference(), + reply_to := pid(), + frag_state := cow_ws:frag_state(), + extensions := cow_ws:extensions(), + type := cow_ws:frame_type(), + rsv := cow_ws:rsv(), + len := non_neg_integer(), + mask_key := cow_ws:mask_key() +}. + +-callback ws_recv_frame_header(ws_recv_frame_header_event(), State) -> State. + +%% ws_recv_frame_end. + +-type ws_recv_frame_end_event() :: #{ + stream_ref := reference(), + reply_to := pid(), + extensions := cow_ws:extensions(), + close_code := undefined | cow_ws:close_code(), + payload := binary() +}. + +-callback ws_recv_frame_end(ws_recv_frame_end_event(), State) -> State. + +%% ws_send_frame_start/ws_send_frame_end. + +-type ws_send_frame_event() :: #{ + stream_ref := reference(), + reply_to := pid(), + extensions := cow_ws:extensions(), + frame := gun:ws_frame() +}. + +-callback ws_send_frame_start(ws_send_frame_event(), State) -> State. +-callback ws_send_frame_end(ws_send_frame_event(), State) -> State. + %% disconnect. -type disconnect_event() :: #{ @@ -167,8 +217,3 @@ %% @todo push_promise_end %% @todo cancel_start %% @todo cancel_end -%% @todo ws_frame_read_start -%% @todo ws_frame_read_header -%% @todo ws_frame_read_end -%% @todo ws_frame_write_start -%% @todo ws_frame_write_end diff --git a/src/gun_http.erl b/src/gun_http.erl index 7598eff..da72527 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -18,7 +18,7 @@ -export([name/0]). -export([init/4]). -export([handle/4]). --export([close/2]). +-export([close/4]). -export([keepalive/1]). -export([headers/10]). -export([request/11]). @@ -404,11 +404,11 @@ send_data_if_alive(_, State, _) -> State. %% @todo Use Reason. -close(_, State=#http_state{in=body_close, streams=[_|Tail]}) -> +close(_, State=#http_state{in=body_close, streams=[_|Tail]}, _, EvHandlerState) -> _ = send_data_if_alive(<<>>, State, fin), - close_streams(Tail); -close(_, #http_state{streams=Streams}) -> - close_streams(Streams). + {close_streams(Tail), EvHandlerState}; +close(_, #http_state{streams=Streams}, _, EvHandlerState) -> + {close_streams(Streams), EvHandlerState}. close_streams([]) -> ok; diff --git a/src/gun_http2.erl b/src/gun_http2.erl index a1ba46e..55e7c06 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -18,7 +18,7 @@ -export([name/0]). -export([init/4]). -export([handle/4]). --export([close/2]). +-export([close/4]). -export([keepalive/1]). -export([headers/10]). -export([request/11]). @@ -294,8 +294,8 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) -> end. %% @todo Use Reason. -close(_, #http2_state{streams=Streams}) -> - close_streams(Streams). +close(_, #http2_state{streams=Streams}, _, EvHandlerState) -> + {close_streams(Streams), EvHandlerState}. close_streams([]) -> ok; diff --git a/src/gun_ws.erl b/src/gun_ws.erl index ff54ecd..7acf74e 100644 --- a/src/gun_ws.erl +++ b/src/gun_ws.erl @@ -18,8 +18,8 @@ -export([name/0]). -export([init/8]). -export([handle/4]). --export([close/2]). --export([send/2]). +-export([close/4]). +-export([send/4]). -export([down/1]). -record(payload, { @@ -34,6 +34,7 @@ -record(ws_state, { owner :: pid(), + stream_ref :: reference(), socket :: inet:socket() | ssl:sslsocket(), transport :: module(), buffer = <<>> :: binary(), @@ -69,94 +70,145 @@ name() -> ws. init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) -> Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, HandlerState = Handler:init(Owner, StreamRef, Headers, Opts), - {switch_protocol, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport, - extensions=Extensions, handler=Handler, handler_state=HandlerState}}. - -handle(Data, State, _EvHandler, EvHandlerState) -> - {handle(Data, State), EvHandlerState}. + {switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef, + socket=Socket, transport=Transport, extensions=Extensions, + handler=Handler, handler_state=HandlerState}}. %% Do not handle anything if we received a close frame. -handle(_, State=#ws_state{in=close}) -> - {state, State}; +handle(_, State=#ws_state{in=close}, _, EvHandlerState) -> + {{state, State}, EvHandlerState}; %% Shortcut for common case when Data is empty after processing a frame. -handle(<<>>, State=#ws_state{in=head}) -> - {state, State}; -handle(Data, State=#ws_state{buffer=Buffer, in=head, frag_state=FragState, extensions=Extensions}) -> +handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) -> + {{state, State}, EvHandlerState}; +handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer, + in=head, frag_state=FragState, extensions=Extensions}, + EvHandler, EvHandlerState0) -> + %% Send the event only if there was no data in the buffer. + %% If there is data in the buffer then we already sent the event. + EvHandlerState1 = case Buffer of + <<>> -> + EvHandler:ws_recv_frame_start(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + frag_state => FragState, + extensions => Extensions + }, EvHandlerState0); + _ -> + EvHandlerState0 + end, Data2 = << Buffer/binary, Data/binary >>, case cow_ws:parse_header(Data2, Extensions, FragState) of {Type, FragState2, Rsv, Len, MaskKey, Rest} -> + EvHandlerState = EvHandler:ws_recv_frame_header(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + frag_state => FragState2, + extensions => Extensions, + type => Type, + rsv => Rsv, + len => Len, + mask_key => MaskKey + }, EvHandlerState1), handle(Rest, State#ws_state{buffer= <<>>, - in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey}, frag_state=FragState2}); + in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey}, + frag_state=FragState2}, EvHandler, EvHandlerState); more -> - {state, State#ws_state{buffer=Data2}}; + {{state, State#ws_state{buffer=Data2}}, EvHandlerState1}; error -> - close({error, badframe}, State) + close({error, badframe}, State, EvHandler, EvHandlerState1) end; -handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey, close_code=CloseCode, - unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState, utf8_state=Utf8State, extensions=Extensions}) -> +handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey, + close_code=CloseCode, unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState, + utf8_state=Utf8State, extensions=Extensions}, EvHandler, EvHandlerState) -> case cow_ws:parse_payload(Data, MaskKey, Utf8State, UnmaskedLen, Type, Len, FragState, Extensions, Rsv) of {ok, CloseCode2, Payload, Utf8State2, Rest} -> - dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, << Unmasked/binary, Payload/binary >>, CloseCode2); + dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, + <>, CloseCode2, + EvHandler, EvHandlerState); {ok, Payload, Utf8State2, Rest} -> - dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, << Unmasked/binary, Payload/binary >>, CloseCode); + dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, + <>, CloseCode, + EvHandler, EvHandlerState); {more, CloseCode2, Payload, Utf8State2} -> - {state, State#ws_state{in=In#payload{close_code=CloseCode2, unmasked= << Unmasked/binary, Payload/binary >>, - len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}}; + {{state, State#ws_state{in=In#payload{close_code=CloseCode2, + unmasked= <>, + len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}}, + EvHandlerState}; {more, Payload, Utf8State2} -> - {state, State#ws_state{in=In#payload{unmasked= << Unmasked/binary, Payload/binary >>, - len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}}; + {{state, State#ws_state{in=In#payload{unmasked= <>, + len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}}, + EvHandlerState}; Error = {error, _Reason} -> - close(Error, State) + close(Error, State, EvHandler, EvHandlerState) end. -dispatch(Rest, State0=#ws_state{frag_state=FragState, +dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef, + frag_state=FragState, extensions=Extensions, handler=Handler, handler_state=HandlerState0}, - Type0, Payload0, CloseCode0) -> - case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of + Type, Payload, CloseCode, EvHandler, EvHandlerState0) -> + EvHandlerState1 = EvHandler:ws_recv_frame_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + extensions => Extensions, + close_code => CloseCode, + payload => Payload + }, EvHandlerState0), + case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of ping -> - {state, State} = send(pong, State0), - handle(Rest, State); + {{state, State}, EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1), + handle(Rest, State, EvHandler, EvHandlerState); {ping, Payload} -> - {state, State} = send({pong, Payload}, State0), - handle(Rest, State); + {{state, State}, EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1), + handle(Rest, State, EvHandler, EvHandlerState); pong -> - handle(Rest, State0); + handle(Rest, State0, EvHandler, EvHandlerState1); {pong, _} -> - handle(Rest, State0); + handle(Rest, State0, EvHandler, EvHandlerState1); Frame -> HandlerState = Handler:handle(Frame, HandlerState0), - State = State0#ws_state{handler_state=HandlerState}, - case Frame of - close -> handle(Rest, State#ws_state{in=close}); - {close, _, _} -> handle(Rest, State#ws_state{in=close}); - {fragment, fin, _, _} -> handle(Rest, State#ws_state{frag_state=undefined}); - _ -> handle(Rest, State) - end + State1 = State0#ws_state{handler_state=HandlerState}, + State = case Frame of + close -> State1#ws_state{in=close}; + {close, _, _} -> State1#ws_state{in=close}; + {fragment, fin, _, _} -> State1#ws_state{frag_state=undefined}; + _ -> State1 + end, + handle(Rest, State, EvHandler, EvHandlerState1) end. -close(Reason, State) -> +close(Reason, State, EvHandler, EvHandlerState) -> case Reason of normal -> - send({close, 1000, <<>>}, State); + send({close, 1000, <<>>}, State, EvHandler, EvHandlerState); owner_down -> - send({close, 1001, <<>>}, State); + send({close, 1001, <<>>}, State, EvHandler, EvHandlerState); {error, badframe} -> - send({close, 1002, <<>>}, State); + send({close, 1002, <<>>}, State, EvHandler, EvHandlerState); {error, badencoding} -> - send({close, 1007, <<>>}, State); + send({close, 1007, <<>>}, State, EvHandler, EvHandlerState); %% Socket errors; do nothing. closed -> - ok; + {ok, EvHandlerState}; {error, _} -> - ok + {ok, EvHandlerState} end. -send(Frame, State=#ws_state{socket=Socket, transport=Transport, extensions=Extensions}) -> +send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, + socket=Socket, transport=Transport, extensions=Extensions}, + EvHandler, EvHandlerState0) -> + WsSendFrameEvent = #{ + stream_ref => StreamRef, + reply_to => ReplyTo, + extensions => Extensions, + frame => Frame + }, + EvHandlerState1 = EvHandler:ws_send_frame_start(WsSendFrameEvent, EvHandlerState0), Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)), + EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1), case Frame of - close -> close; - {close, _, _} -> close; - _ -> {state, State} + close -> {close, EvHandlerState}; + {close, _, _} -> {close, EvHandlerState}; + _ -> {{state, State}, EvHandlerState} end. %% Websocket has no concept of streams. -- cgit v1.2.3