aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-07-15 16:18:16 +0200
committerLoïc Hoguin <[email protected]>2019-07-15 16:18:16 +0200
commit265ece680c53f77d1685434d0636216c94021497 (patch)
treec50428fe30c4af079f568a76dd5e091f2a2fe395 /src
parentc7138443995ebd56f061b85e5ee0aebb5c04a00e (diff)
downloadgun-265ece680c53f77d1685434d0636216c94021497.tar.gz
gun-265ece680c53f77d1685434d0636216c94021497.tar.bz2
gun-265ece680c53f77d1685434d0636216c94021497.zip
Add Websocket frames related events
Diffstat (limited to 'src')
-rw-r--r--src/gun.erl36
-rw-r--r--src/gun_default_event_h.erl20
-rw-r--r--src/gun_event.erl55
-rw-r--r--src/gun_http.erl10
-rw-r--r--src/gun_http2.erl6
-rw-r--r--src/gun_ws.erl154
6 files changed, 200 insertions, 81 deletions
diff --git a/src/gun.erl b/src/gun.erl
index 803a5eb..12e4ae6 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -928,9 +928,11 @@ connected(cast, {ws_upgrade, ReplyTo, StreamRef, _, _, _}, _) ->
ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"Websocket is only supported over HTTP/1.1."}},
keep_state_and_data;
-connected(cast, {ws_send, Owner, Frame},
- State=#state{owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState}) ->
- commands(Protocol:send(Frame, ProtoState), State);
+connected(cast, {ws_send, Owner, Frame}, State=#state{
+ owner=Owner, protocol=Protocol=gun_ws, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {Commands, EvHandlerState} = Protocol:send(Frame, ProtoState, EvHandler, EvHandlerState0),
+ commands(Commands, State#state{event_handler_state=EvHandlerState});
connected(cast, {ws_send, ReplyTo, _}, _) ->
ReplyTo ! {gun_error, self(), {badstate,
"Connection needs to be upgraded to Websocket "
@@ -947,18 +949,19 @@ handle_common(cast, {shutdown, Owner}, _, #state{owner=Owner}) ->
%% @todo Graceful shutdown.
stop;
%% We stop when the owner is down.
-handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, _, #state{
+handle_common(info, {'DOWN', OwnerRef, process, Owner, Reason}, _, State=#state{
owner=Owner, owner_ref=OwnerRef, socket=Socket, transport=Transport,
- protocol=Protocol, protocol_state=ProtoState}) ->
- _ = case Protocol of
- undefined -> ok;
- _ -> Protocol:close(owner_down, ProtoState)
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {_, EvHandlerState} = case Protocol of
+ undefined -> {ok, EvHandlerState0};
+ _ -> Protocol:close(owner_down, ProtoState, EvHandler, EvHandlerState0)
end,
_ = case Socket of
undefined -> ok;
_ -> Transport:close(Socket)
end,
- owner_down(Reason);
+ owner_down(Reason, State#state{event_handler_state=EvHandlerState});
handle_common({call, From}, _, _, _) ->
{keep_state_and_data, {reply, From, {error, bad_call}}};
%% @todo The ReplyTo patch disabled the notowner behavior.
@@ -1023,9 +1026,9 @@ disconnect(State=#state{owner=Owner, opts=Opts,
socket=Socket, transport=Transport,
protocol=Protocol, protocol_state=ProtoState,
event_handler=EvHandler, event_handler_state=EvHandlerState0}, Reason) ->
- Protocol:close(Reason, ProtoState),
+ {_, EvHandlerState1} = Protocol:close(Reason, ProtoState, EvHandler, EvHandlerState0),
%% @todo Need a special state for orderly shutdown of a connection.
- Transport:close(Socket),
+ _ = Transport:close(Socket),
%% We closed the socket, discard any remaining socket events.
disconnect_flush(State),
%% @todo Stop keepalive timeout, flush message.
@@ -1035,7 +1038,7 @@ disconnect(State=#state{owner=Owner, opts=Opts,
DisconnectEvent = #{
reason => Reason
},
- EvHandlerState = EvHandler:disconnect(DisconnectEvent, EvHandlerState0),
+ EvHandlerState = EvHandler:disconnect(DisconnectEvent, EvHandlerState1),
Retry = maps:get(retry, Opts, 5),
case Retry of
0 ->
@@ -1087,11 +1090,10 @@ keepalive_cancel(State=#state{keepalive_ref=KeepaliveRef}) ->
end,
State#state{keepalive_ref=undefined}.
--spec owner_down(_) -> stop | {stop, _}.
-owner_down(normal) -> stop;
-owner_down(shutdown) -> {stop, shutdown};
-owner_down(Shutdown = {shutdown, _}) -> {stop, Shutdown};
-owner_down(Reason) -> {stop, {shutdown, {owner_down, Reason}}}.
+owner_down(normal, State) -> {stop, normal, State};
+owner_down(shutdown, State) -> {stop, shutdown, State};
+owner_down(Shutdown = {shutdown, _}, State) -> {stop, Shutdown, State};
+owner_down(Reason, State) -> {stop, {shutdown, {owner_down, Reason}}, State}.
terminate(Reason, StateName, #state{event_handler=EvHandler,
event_handler_state=EvHandlerState}) ->
diff --git a/src/gun_default_event_h.erl b/src/gun_default_event_h.erl
index 6ef2e11..de63f17 100644
--- a/src/gun_default_event_h.erl
+++ b/src/gun_default_event_h.erl
@@ -28,6 +28,11 @@
-export([response_end/2]).
-export([ws_upgrade/2]).
-export([protocol_changed/2]).
+-export([ws_recv_frame_start/2]).
+-export([ws_recv_frame_header/2]).
+-export([ws_recv_frame_end/2]).
+-export([ws_send_frame_start/2]).
+-export([ws_send_frame_end/2]).
-export([disconnect/2]).
-export([terminate/2]).
@@ -70,6 +75,21 @@ ws_upgrade(_EventData, State) ->
protocol_changed(_EventData, State) ->
State.
+ws_recv_frame_start(_EventData, State) ->
+ State.
+
+ws_recv_frame_header(_EventData, State) ->
+ State.
+
+ws_recv_frame_end(_EventData, State) ->
+ State.
+
+ws_send_frame_start(_EventData, State) ->
+ State.
+
+ws_send_frame_end(_EventData, State) ->
+ State.
+
disconnect(_EventData, State) ->
State.
diff --git a/src/gun_event.erl b/src/gun_event.erl
index 9e83cf8..5332568 100644
--- a/src/gun_event.erl
+++ b/src/gun_event.erl
@@ -140,6 +140,56 @@
-callback protocol_changed(protocol_changed_event(), State) -> State.
+%% ws_recv_frame_start.
+
+-type ws_recv_frame_start_event() :: #{
+ stream_ref := reference(),
+ reply_to := pid(),
+ frag_state := cow_ws:frag_state(),
+ extensions := cow_ws:extensions()
+}.
+
+-callback ws_recv_frame_start(ws_recv_frame_start_event(), State) -> State.
+
+%% ws_recv_frame_header.
+
+-type ws_recv_frame_header_event() :: #{
+ stream_ref := reference(),
+ reply_to := pid(),
+ frag_state := cow_ws:frag_state(),
+ extensions := cow_ws:extensions(),
+ type := cow_ws:frame_type(),
+ rsv := cow_ws:rsv(),
+ len := non_neg_integer(),
+ mask_key := cow_ws:mask_key()
+}.
+
+-callback ws_recv_frame_header(ws_recv_frame_header_event(), State) -> State.
+
+%% ws_recv_frame_end.
+
+-type ws_recv_frame_end_event() :: #{
+ stream_ref := reference(),
+ reply_to := pid(),
+ extensions := cow_ws:extensions(),
+ close_code := undefined | cow_ws:close_code(),
+ payload := binary()
+}.
+
+-callback ws_recv_frame_end(ws_recv_frame_end_event(), State) -> State.
+
+%% ws_send_frame_start/ws_send_frame_end.
+
+-type ws_send_frame_event() :: #{
+ stream_ref := reference(),
+ reply_to := pid(),
+ extensions := cow_ws:extensions(),
+ frame := gun:ws_frame()
+}.
+
+-callback ws_send_frame_start(ws_send_frame_event(), State) -> State.
+-callback ws_send_frame_end(ws_send_frame_event(), State) -> State.
+
%% disconnect.
-type disconnect_event() :: #{
@@ -167,8 +217,3 @@
%% @todo push_promise_end
%% @todo cancel_start
%% @todo cancel_end
-%% @todo ws_frame_read_start
-%% @todo ws_frame_read_header
-%% @todo ws_frame_read_end
-%% @todo ws_frame_write_start
-%% @todo ws_frame_write_end
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 7598eff..da72527 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -18,7 +18,7 @@
-export([name/0]).
-export([init/4]).
-export([handle/4]).
--export([close/2]).
+-export([close/4]).
-export([keepalive/1]).
-export([headers/10]).
-export([request/11]).
@@ -404,11 +404,11 @@ send_data_if_alive(_, State, _) ->
State.
%% @todo Use Reason.
-close(_, State=#http_state{in=body_close, streams=[_|Tail]}) ->
+close(_, State=#http_state{in=body_close, streams=[_|Tail]}, _, EvHandlerState) ->
_ = send_data_if_alive(<<>>, State, fin),
- close_streams(Tail);
-close(_, #http_state{streams=Streams}) ->
- close_streams(Streams).
+ {close_streams(Tail), EvHandlerState};
+close(_, #http_state{streams=Streams}, _, EvHandlerState) ->
+ {close_streams(Streams), EvHandlerState}.
close_streams([]) ->
ok;
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index a1ba46e..55e7c06 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -18,7 +18,7 @@
-export([name/0]).
-export([init/4]).
-export([handle/4]).
--export([close/2]).
+-export([close/4]).
-export([keepalive/1]).
-export([headers/10]).
-export([request/11]).
@@ -294,8 +294,8 @@ ignored_frame(State=#http2_state{http2_machine=HTTP2Machine0}) ->
end.
%% @todo Use Reason.
-close(_, #http2_state{streams=Streams}) ->
- close_streams(Streams).
+close(_, #http2_state{streams=Streams}, _, EvHandlerState) ->
+ {close_streams(Streams), EvHandlerState}.
close_streams([]) ->
ok;
diff --git a/src/gun_ws.erl b/src/gun_ws.erl
index ff54ecd..7acf74e 100644
--- a/src/gun_ws.erl
+++ b/src/gun_ws.erl
@@ -18,8 +18,8 @@
-export([name/0]).
-export([init/8]).
-export([handle/4]).
--export([close/2]).
--export([send/2]).
+-export([close/4]).
+-export([send/4]).
-export([down/1]).
-record(payload, {
@@ -34,6 +34,7 @@
-record(ws_state, {
owner :: pid(),
+ stream_ref :: reference(),
socket :: inet:socket() | ssl:sslsocket(),
transport :: module(),
buffer = <<>> :: binary(),
@@ -69,94 +70,145 @@ name() -> ws.
init(Owner, Socket, Transport, StreamRef, Headers, Extensions, Handler, Opts) ->
Owner ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
HandlerState = Handler:init(Owner, StreamRef, Headers, Opts),
- {switch_protocol, ?MODULE, #ws_state{owner=Owner, socket=Socket, transport=Transport,
- extensions=Extensions, handler=Handler, handler_state=HandlerState}}.
-
-handle(Data, State, _EvHandler, EvHandlerState) ->
- {handle(Data, State), EvHandlerState}.
+ {switch_protocol, ?MODULE, #ws_state{owner=Owner, stream_ref=StreamRef,
+ socket=Socket, transport=Transport, extensions=Extensions,
+ handler=Handler, handler_state=HandlerState}}.
%% Do not handle anything if we received a close frame.
-handle(_, State=#ws_state{in=close}) ->
- {state, State};
+handle(_, State=#ws_state{in=close}, _, EvHandlerState) ->
+ {{state, State}, EvHandlerState};
%% Shortcut for common case when Data is empty after processing a frame.
-handle(<<>>, State=#ws_state{in=head}) ->
- {state, State};
-handle(Data, State=#ws_state{buffer=Buffer, in=head, frag_state=FragState, extensions=Extensions}) ->
+handle(<<>>, State=#ws_state{in=head}, _, EvHandlerState) ->
+ {{state, State}, EvHandlerState};
+handle(Data, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef, buffer=Buffer,
+ in=head, frag_state=FragState, extensions=Extensions},
+ EvHandler, EvHandlerState0) ->
+ %% Send the event only if there was no data in the buffer.
+ %% If there is data in the buffer then we already sent the event.
+ EvHandlerState1 = case Buffer of
+ <<>> ->
+ EvHandler:ws_recv_frame_start(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ frag_state => FragState,
+ extensions => Extensions
+ }, EvHandlerState0);
+ _ ->
+ EvHandlerState0
+ end,
Data2 = << Buffer/binary, Data/binary >>,
case cow_ws:parse_header(Data2, Extensions, FragState) of
{Type, FragState2, Rsv, Len, MaskKey, Rest} ->
+ EvHandlerState = EvHandler:ws_recv_frame_header(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ frag_state => FragState2,
+ extensions => Extensions,
+ type => Type,
+ rsv => Rsv,
+ len => Len,
+ mask_key => MaskKey
+ }, EvHandlerState1),
handle(Rest, State#ws_state{buffer= <<>>,
- in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey}, frag_state=FragState2});
+ in=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey},
+ frag_state=FragState2}, EvHandler, EvHandlerState);
more ->
- {state, State#ws_state{buffer=Data2}};
+ {{state, State#ws_state{buffer=Data2}}, EvHandlerState1};
error ->
- close({error, badframe}, State)
+ close({error, badframe}, State, EvHandler, EvHandlerState1)
end;
-handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey, close_code=CloseCode,
- unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState, utf8_state=Utf8State, extensions=Extensions}) ->
+handle(Data, State=#ws_state{in=In=#payload{type=Type, rsv=Rsv, len=Len, mask_key=MaskKey,
+ close_code=CloseCode, unmasked=Unmasked, unmasked_len=UnmaskedLen}, frag_state=FragState,
+ utf8_state=Utf8State, extensions=Extensions}, EvHandler, EvHandlerState) ->
case cow_ws:parse_payload(Data, MaskKey, Utf8State, UnmaskedLen, Type, Len, FragState, Extensions, Rsv) of
{ok, CloseCode2, Payload, Utf8State2, Rest} ->
- dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, << Unmasked/binary, Payload/binary >>, CloseCode2);
+ dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type,
+ <<Unmasked/binary, Payload/binary>>, CloseCode2,
+ EvHandler, EvHandlerState);
{ok, Payload, Utf8State2, Rest} ->
- dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type, << Unmasked/binary, Payload/binary >>, CloseCode);
+ dispatch(Rest, State#ws_state{in=head, utf8_state=Utf8State2}, Type,
+ <<Unmasked/binary, Payload/binary>>, CloseCode,
+ EvHandler, EvHandlerState);
{more, CloseCode2, Payload, Utf8State2} ->
- {state, State#ws_state{in=In#payload{close_code=CloseCode2, unmasked= << Unmasked/binary, Payload/binary >>,
- len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}};
+ {{state, State#ws_state{in=In#payload{close_code=CloseCode2,
+ unmasked= <<Unmasked/binary, Payload/binary>>,
+ len=Len - byte_size(Data), unmasked_len=2 + byte_size(Data)}, utf8_state=Utf8State2}},
+ EvHandlerState};
{more, Payload, Utf8State2} ->
- {state, State#ws_state{in=In#payload{unmasked= << Unmasked/binary, Payload/binary >>,
- len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}};
+ {{state, State#ws_state{in=In#payload{unmasked= <<Unmasked/binary, Payload/binary>>,
+ len=Len - byte_size(Data), unmasked_len=UnmaskedLen + byte_size(Data)}, utf8_state=Utf8State2}},
+ EvHandlerState};
Error = {error, _Reason} ->
- close(Error, State)
+ close(Error, State, EvHandler, EvHandlerState)
end.
-dispatch(Rest, State0=#ws_state{frag_state=FragState,
+dispatch(Rest, State0=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
+ frag_state=FragState, extensions=Extensions,
handler=Handler, handler_state=HandlerState0},
- Type0, Payload0, CloseCode0) ->
- case cow_ws:make_frame(Type0, Payload0, CloseCode0, FragState) of
+ Type, Payload, CloseCode, EvHandler, EvHandlerState0) ->
+ EvHandlerState1 = EvHandler:ws_recv_frame_end(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ extensions => Extensions,
+ close_code => CloseCode,
+ payload => Payload
+ }, EvHandlerState0),
+ case cow_ws:make_frame(Type, Payload, CloseCode, FragState) of
ping ->
- {state, State} = send(pong, State0),
- handle(Rest, State);
+ {{state, State}, EvHandlerState} = send(pong, State0, EvHandler, EvHandlerState1),
+ handle(Rest, State, EvHandler, EvHandlerState);
{ping, Payload} ->
- {state, State} = send({pong, Payload}, State0),
- handle(Rest, State);
+ {{state, State}, EvHandlerState} = send({pong, Payload}, State0, EvHandler, EvHandlerState1),
+ handle(Rest, State, EvHandler, EvHandlerState);
pong ->
- handle(Rest, State0);
+ handle(Rest, State0, EvHandler, EvHandlerState1);
{pong, _} ->
- handle(Rest, State0);
+ handle(Rest, State0, EvHandler, EvHandlerState1);
Frame ->
HandlerState = Handler:handle(Frame, HandlerState0),
- State = State0#ws_state{handler_state=HandlerState},
- case Frame of
- close -> handle(Rest, State#ws_state{in=close});
- {close, _, _} -> handle(Rest, State#ws_state{in=close});
- {fragment, fin, _, _} -> handle(Rest, State#ws_state{frag_state=undefined});
- _ -> handle(Rest, State)
- end
+ State1 = State0#ws_state{handler_state=HandlerState},
+ State = case Frame of
+ close -> State1#ws_state{in=close};
+ {close, _, _} -> State1#ws_state{in=close};
+ {fragment, fin, _, _} -> State1#ws_state{frag_state=undefined};
+ _ -> State1
+ end,
+ handle(Rest, State, EvHandler, EvHandlerState1)
end.
-close(Reason, State) ->
+close(Reason, State, EvHandler, EvHandlerState) ->
case Reason of
normal ->
- send({close, 1000, <<>>}, State);
+ send({close, 1000, <<>>}, State, EvHandler, EvHandlerState);
owner_down ->
- send({close, 1001, <<>>}, State);
+ send({close, 1001, <<>>}, State, EvHandler, EvHandlerState);
{error, badframe} ->
- send({close, 1002, <<>>}, State);
+ send({close, 1002, <<>>}, State, EvHandler, EvHandlerState);
{error, badencoding} ->
- send({close, 1007, <<>>}, State);
+ send({close, 1007, <<>>}, State, EvHandler, EvHandlerState);
%% Socket errors; do nothing.
closed ->
- ok;
+ {ok, EvHandlerState};
{error, _} ->
- ok
+ {ok, EvHandlerState}
end.
-send(Frame, State=#ws_state{socket=Socket, transport=Transport, extensions=Extensions}) ->
+send(Frame, State=#ws_state{owner=ReplyTo, stream_ref=StreamRef,
+ socket=Socket, transport=Transport, extensions=Extensions},
+ EvHandler, EvHandlerState0) ->
+ WsSendFrameEvent = #{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ extensions => Extensions,
+ frame => Frame
+ },
+ EvHandlerState1 = EvHandler:ws_send_frame_start(WsSendFrameEvent, EvHandlerState0),
Transport:send(Socket, cow_ws:masked_frame(Frame, Extensions)),
+ EvHandlerState = EvHandler:ws_send_frame_end(WsSendFrameEvent, EvHandlerState1),
case Frame of
- close -> close;
- {close, _, _} -> close;
- _ -> {state, State}
+ close -> {close, EvHandlerState};
+ {close, _, _} -> {close, EvHandlerState};
+ _ -> {{state, State}, EvHandlerState}
end.
%% Websocket has no concept of streams.