aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/gun.erl10
-rw-r--r--src/gun_default_event_h.erl4
-rw-r--r--src/gun_event.erl21
-rw-r--r--src/gun_http.erl17
-rw-r--r--src/gun_http2.erl35
-rw-r--r--test/event_SUITE.erl40
6 files changed, 103 insertions, 24 deletions
diff --git a/src/gun.erl b/src/gun.erl
index b55d3a1..9add685 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -973,10 +973,12 @@ connected(info, {connect_protocol, Protocol}, #state{protocol=Protocol}) ->
keep_state_and_data;
connected(info, {connect_protocol, Protocol}, State=#state{protocol_state=ProtoState}) ->
commands([{switch_protocol, Protocol, ProtoState}], State);
-connected(cast, {cancel, ReplyTo, StreamRef},
- State=#state{protocol=Protocol, protocol_state=ProtoState}) ->
- ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo),
- {keep_state, State#state{protocol_state=ProtoState2}};
+connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{
+ protocol=Protocol, protocol_state=ProtoState,
+ event_handler=EvHandler, event_handler_state=EvHandlerState0}) ->
+ {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState,
+ StreamRef, ReplyTo, EvHandler, EvHandlerState0),
+ {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}};
%% Public Websocket interface.
%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
%% An interface would also make sure that HTTP/1.0 can't upgrade.
diff --git a/src/gun_default_event_h.erl b/src/gun_default_event_h.erl
index 25076ac..213db06 100644
--- a/src/gun_default_event_h.erl
+++ b/src/gun_default_event_h.erl
@@ -39,6 +39,7 @@
-export([ws_recv_frame_end/2]).
-export([ws_send_frame_start/2]).
-export([ws_send_frame_end/2]).
+-export([cancel/2]).
-export([disconnect/2]).
-export([terminate/2]).
@@ -114,6 +115,9 @@ ws_send_frame_start(_EventData, State) ->
ws_send_frame_end(_EventData, State) ->
State.
+cancel(_EventData, State) ->
+ State.
+
disconnect(_EventData, State) ->
State.
diff --git a/src/gun_event.erl b/src/gun_event.erl
index f27d336..c87af58 100644
--- a/src/gun_event.erl
+++ b/src/gun_event.erl
@@ -236,6 +236,25 @@
-callback ws_send_frame_start(ws_send_frame_event(), State) -> State.
-callback ws_send_frame_end(ws_send_frame_event(), State) -> State.
+%% cancel.
+%%
+%% In the case of HTTP/1.1 we cannot actually cancel the stream,
+%% we only silence the stream to the user. Further response events
+%% may therefore be received and they provide a useful metric as
+%% these canceled requests monopolize the connection.
+%%
+%% For HTTP/2 both the client and the server may cancel streams.
+%% Events may still occur for a short time after the cancel.
+
+-type cancel_event() :: #{
+ stream_ref := reference(),
+ reply_to := pid(),
+ endpoint := local | remote,
+ reason := atom()
+}.
+
+-callback cancel(cancel_event(), State) -> State.
+
%% disconnect.
-type disconnect_event() :: #{
@@ -255,5 +274,3 @@
%% @todo origin_changed
%% @todo transport_changed
-%% @todo cancel_start
-%% @todo cancel_end
diff --git a/src/gun_http.erl b/src/gun_http.erl
index da72527..738b6e9 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -24,7 +24,7 @@
-export([request/11]).
-export([data/7]).
-export([connect/5]).
--export([cancel/3]).
+-export([cancel/5]).
-export([stream_info/2]).
-export([down/1]).
-export([ws_upgrade/9]).
@@ -605,12 +605,19 @@ connect(State=#http_state{socket=Socket, transport=Transport, version=Version},
new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>).
%% We can't cancel anything, we can just stop forwarding messages to the owner.
-cancel(State, StreamRef, ReplyTo) ->
- case is_stream(State, StreamRef) of
+cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
+ case is_stream(State0, StreamRef) of
true ->
- cancel_stream(State, StreamRef);
+ State = cancel_stream(State0, StreamRef),
+ EvHandlerState = EvHandler:cancel(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ endpoint => local,
+ reason => cancel
+ }, EvHandlerState0),
+ {State, EvHandlerState};
false ->
- error_stream_not_found(State, StreamRef, ReplyTo)
+ {error_stream_not_found(State0, StreamRef, ReplyTo), EvHandlerState0}
end.
stream_info(#http_state{streams=Streams}, StreamRef) ->
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index d039ecb..78b137b 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -23,7 +23,7 @@
-export([headers/10]).
-export([request/11]).
-export([data/7]).
--export([cancel/3]).
+-export([cancel/5]).
-export([stream_info/2]).
-export([down/1]).
@@ -150,8 +150,8 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl
trailers_frame(State#http2_state{http2_machine=HTTP2Machine},
StreamID, Trailers, EvHandler, EvHandlerState);
{ok, {rst_stream, StreamID, Reason}, HTTP2Machine} ->
- {rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Reason),
- EvHandlerState};
+ rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine},
+ StreamID, Reason, EvHandler, EvHandlerState);
{ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, HTTP2Machine} ->
push_promise_frame(State#http2_state{http2_machine=HTTP2Machine},
StreamID, PromisedStreamID, Headers, PseudoHeaders,
@@ -268,14 +268,21 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->
EvHandlerState = EvHandler:response_end(ResponseEvent, EvHandlerState1),
{maybe_delete_stream(State, StreamID, remote, fin), EvHandlerState}.
-rst_stream_frame(State=#http2_state{streams=Streams0}, StreamID, Reason) ->
+rst_stream_frame(State=#http2_state{streams=Streams0},
+ StreamID, Reason, EvHandler, EvHandlerState0) ->
case lists:keytake(StreamID, #stream.id, Streams0) of
{value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} ->
ReplyTo ! {gun_error, self(), StreamRef,
{stream_error, Reason, 'Stream reset by server.'}},
- State#http2_state{streams=Streams};
+ EvHandlerState = EvHandler:cancel(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ endpoint => remote,
+ reason => Reason
+ }, EvHandlerState0),
+ {State#http2_state{streams=Streams}, EvHandlerState};
false ->
- State
+ {State, EvHandlerState0}
end.
push_promise_frame(State=#http2_state{streams=Streams},
@@ -482,15 +489,23 @@ reset_stream(State=#http2_state{socket=Socket, transport=Transport,
State
end.
-cancel(State=#http2_state{socket=Socket, transport=Transport,
- http2_machine=HTTP2Machine0}, StreamRef, ReplyTo) ->
+cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
+ StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
case get_stream_by_ref(State, StreamRef) of
#stream{id=StreamID} ->
{ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0),
Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)),
- delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID);
+ EvHandlerState = EvHandler:cancel(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ endpoint => local,
+ reason => cancel
+ }, EvHandlerState0),
+ {delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID),
+ EvHandlerState};
false ->
- error_stream_not_found(State, StreamRef, ReplyTo)
+ {error_stream_not_found(State, StreamRef, ReplyTo),
+ EvHandlerState0}
end.
stream_info(State, StreamRef) ->
diff --git a/test/event_SUITE.erl b/test/event_SUITE.erl
index ace509a..d043a11 100644
--- a/test/event_SUITE.erl
+++ b/test/event_SUITE.erl
@@ -35,7 +35,7 @@ groups() ->
%% We currently do not support Websocket over HTTP/2.
WsTests = [T || T <- Tests, lists:sublist(atom_to_list(T), 3) =:= "ws_"],
[
- {http, [parallel], Tests -- PushTests},
+ {http, [parallel], Tests -- [cancel_remote|PushTests]},
{http2, [parallel], Tests -- [protocol_changed|WsTests]}
].
@@ -629,6 +629,36 @@ do_ws_send_frame(Config, EventName) ->
} = do_receive_event(EventName),
gun:close(Pid).
+cancel(Config) ->
+ doc("Confirm that the cancel event callback is called when we cancel a stream."),
+ {ok, Pid, _} = do_gun_open(Config),
+ {ok, _} = gun:await_up(Pid),
+ StreamRef = gun:post(Pid, "/stream", []),
+ gun:cancel(Pid, StreamRef),
+ ReplyTo = self(),
+ #{
+ stream_ref := StreamRef,
+ reply_to := ReplyTo,
+ endpoint := local,
+ reason := cancel
+ } = do_receive_event(?FUNCTION_NAME),
+ gun:close(Pid).
+
+cancel_remote(Config) ->
+ doc("Confirm that the cancel event callback is called "
+ "when the remote endpoint cancels the stream."),
+ {ok, Pid, _} = do_gun_open(Config),
+ {ok, _} = gun:await_up(Pid),
+ StreamRef = gun:post(Pid, "/stream", []),
+ ReplyTo = self(),
+ #{
+ stream_ref := StreamRef,
+ reply_to := ReplyTo,
+ endpoint := remote,
+ reason := _
+ } = do_receive_event(cancel),
+ gun:close(Pid).
+
disconnect(Config) ->
doc("Confirm that the disconnect event callback is called on disconnect."),
{ok, OriginPid, OriginPort} = init_origin(tcp),
@@ -639,7 +669,7 @@ disconnect(Config) ->
exit(OriginPid, shutdown),
#{
reason := closed
- } = do_receive_event(disconnect),
+ } = do_receive_event(?FUNCTION_NAME),
gun:close(Pid).
terminate(Config) ->
@@ -649,7 +679,7 @@ terminate(Config) ->
#{
state := _,
reason := shutdown
- } = do_receive_event(terminate),
+ } = do_receive_event(?FUNCTION_NAME),
ok.
%% Internal.
@@ -785,6 +815,10 @@ ws_send_frame_end(EventData, Pid) ->
Pid ! {?FUNCTION_NAME, EventData},
Pid.
+cancel(EventData, Pid) ->
+ Pid ! {?FUNCTION_NAME, EventData},
+ Pid.
+
disconnect(EventData, Pid) ->
Pid ! {?FUNCTION_NAME, EventData},
Pid.