diff options
-rw-r--r-- | src/gun.erl | 10 | ||||
-rw-r--r-- | src/gun_default_event_h.erl | 4 | ||||
-rw-r--r-- | src/gun_event.erl | 21 | ||||
-rw-r--r-- | src/gun_http.erl | 17 | ||||
-rw-r--r-- | src/gun_http2.erl | 35 | ||||
-rw-r--r-- | test/event_SUITE.erl | 40 |
6 files changed, 103 insertions, 24 deletions
diff --git a/src/gun.erl b/src/gun.erl index b55d3a1..9add685 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -973,10 +973,12 @@ connected(info, {connect_protocol, Protocol}, #state{protocol=Protocol}) -> keep_state_and_data; connected(info, {connect_protocol, Protocol}, State=#state{protocol_state=ProtoState}) -> commands([{switch_protocol, Protocol, ProtoState}], State); -connected(cast, {cancel, ReplyTo, StreamRef}, - State=#state{protocol=Protocol, protocol_state=ProtoState}) -> - ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo), - {keep_state, State#state{protocol_state=ProtoState2}}; +connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{ + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, + StreamRef, ReplyTo, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; %% Public Websocket interface. %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. %% An interface would also make sure that HTTP/1.0 can't upgrade. diff --git a/src/gun_default_event_h.erl b/src/gun_default_event_h.erl index 25076ac..213db06 100644 --- a/src/gun_default_event_h.erl +++ b/src/gun_default_event_h.erl @@ -39,6 +39,7 @@ -export([ws_recv_frame_end/2]). -export([ws_send_frame_start/2]). -export([ws_send_frame_end/2]). +-export([cancel/2]). -export([disconnect/2]). -export([terminate/2]). @@ -114,6 +115,9 @@ ws_send_frame_start(_EventData, State) -> ws_send_frame_end(_EventData, State) -> State. +cancel(_EventData, State) -> + State. + disconnect(_EventData, State) -> State. diff --git a/src/gun_event.erl b/src/gun_event.erl index f27d336..c87af58 100644 --- a/src/gun_event.erl +++ b/src/gun_event.erl @@ -236,6 +236,25 @@ -callback ws_send_frame_start(ws_send_frame_event(), State) -> State. -callback ws_send_frame_end(ws_send_frame_event(), State) -> State. +%% cancel. +%% +%% In the case of HTTP/1.1 we cannot actually cancel the stream, +%% we only silence the stream to the user. Further response events +%% may therefore be received and they provide a useful metric as +%% these canceled requests monopolize the connection. +%% +%% For HTTP/2 both the client and the server may cancel streams. +%% Events may still occur for a short time after the cancel. + +-type cancel_event() :: #{ + stream_ref := reference(), + reply_to := pid(), + endpoint := local | remote, + reason := atom() +}. + +-callback cancel(cancel_event(), State) -> State. + %% disconnect. -type disconnect_event() :: #{ @@ -255,5 +274,3 @@ %% @todo origin_changed %% @todo transport_changed -%% @todo cancel_start -%% @todo cancel_end diff --git a/src/gun_http.erl b/src/gun_http.erl index da72527..738b6e9 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -24,7 +24,7 @@ -export([request/11]). -export([data/7]). -export([connect/5]). --export([cancel/3]). +-export([cancel/5]). -export([stream_info/2]). -export([down/1]). -export([ws_upgrade/9]). @@ -605,12 +605,19 @@ connect(State=#http_state{socket=Socket, transport=Transport, version=Version}, new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>). %% We can't cancel anything, we can just stop forwarding messages to the owner. -cancel(State, StreamRef, ReplyTo) -> - case is_stream(State, StreamRef) of +cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> + case is_stream(State0, StreamRef) of true -> - cancel_stream(State, StreamRef); + State = cancel_stream(State0, StreamRef), + EvHandlerState = EvHandler:cancel(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + endpoint => local, + reason => cancel + }, EvHandlerState0), + {State, EvHandlerState}; false -> - error_stream_not_found(State, StreamRef, ReplyTo) + {error_stream_not_found(State0, StreamRef, ReplyTo), EvHandlerState0} end. stream_info(#http_state{streams=Streams}, StreamRef) -> diff --git a/src/gun_http2.erl b/src/gun_http2.erl index d039ecb..78b137b 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -23,7 +23,7 @@ -export([headers/10]). -export([request/11]). -export([data/7]). --export([cancel/3]). +-export([cancel/5]). -export([stream_info/2]). -export([down/1]). @@ -150,8 +150,8 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl trailers_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Trailers, EvHandler, EvHandlerState); {ok, {rst_stream, StreamID, Reason}, HTTP2Machine} -> - {rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Reason), - EvHandlerState}; + rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine}, + StreamID, Reason, EvHandler, EvHandlerState); {ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, HTTP2Machine} -> push_promise_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, PromisedStreamID, Headers, PseudoHeaders, @@ -268,14 +268,21 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> EvHandlerState = EvHandler:response_end(ResponseEvent, EvHandlerState1), {maybe_delete_stream(State, StreamID, remote, fin), EvHandlerState}. -rst_stream_frame(State=#http2_state{streams=Streams0}, StreamID, Reason) -> +rst_stream_frame(State=#http2_state{streams=Streams0}, + StreamID, Reason, EvHandler, EvHandlerState0) -> case lists:keytake(StreamID, #stream.id, Streams0) of {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} -> ReplyTo ! {gun_error, self(), StreamRef, {stream_error, Reason, 'Stream reset by server.'}}, - State#http2_state{streams=Streams}; + EvHandlerState = EvHandler:cancel(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + endpoint => remote, + reason => Reason + }, EvHandlerState0), + {State#http2_state{streams=Streams}, EvHandlerState}; false -> - State + {State, EvHandlerState0} end. push_promise_frame(State=#http2_state{streams=Streams}, @@ -482,15 +489,23 @@ reset_stream(State=#http2_state{socket=Socket, transport=Transport, State end. -cancel(State=#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, StreamRef, ReplyTo) -> +cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, + StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), - delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID); + EvHandlerState = EvHandler:cancel(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + endpoint => local, + reason => cancel + }, EvHandlerState0), + {delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID), + EvHandlerState}; false -> - error_stream_not_found(State, StreamRef, ReplyTo) + {error_stream_not_found(State, StreamRef, ReplyTo), + EvHandlerState0} end. stream_info(State, StreamRef) -> diff --git a/test/event_SUITE.erl b/test/event_SUITE.erl index ace509a..d043a11 100644 --- a/test/event_SUITE.erl +++ b/test/event_SUITE.erl @@ -35,7 +35,7 @@ groups() -> %% We currently do not support Websocket over HTTP/2. WsTests = [T || T <- Tests, lists:sublist(atom_to_list(T), 3) =:= "ws_"], [ - {http, [parallel], Tests -- PushTests}, + {http, [parallel], Tests -- [cancel_remote|PushTests]}, {http2, [parallel], Tests -- [protocol_changed|WsTests]} ]. @@ -629,6 +629,36 @@ do_ws_send_frame(Config, EventName) -> } = do_receive_event(EventName), gun:close(Pid). +cancel(Config) -> + doc("Confirm that the cancel event callback is called when we cancel a stream."), + {ok, Pid, _} = do_gun_open(Config), + {ok, _} = gun:await_up(Pid), + StreamRef = gun:post(Pid, "/stream", []), + gun:cancel(Pid, StreamRef), + ReplyTo = self(), + #{ + stream_ref := StreamRef, + reply_to := ReplyTo, + endpoint := local, + reason := cancel + } = do_receive_event(?FUNCTION_NAME), + gun:close(Pid). + +cancel_remote(Config) -> + doc("Confirm that the cancel event callback is called " + "when the remote endpoint cancels the stream."), + {ok, Pid, _} = do_gun_open(Config), + {ok, _} = gun:await_up(Pid), + StreamRef = gun:post(Pid, "/stream", []), + ReplyTo = self(), + #{ + stream_ref := StreamRef, + reply_to := ReplyTo, + endpoint := remote, + reason := _ + } = do_receive_event(cancel), + gun:close(Pid). + disconnect(Config) -> doc("Confirm that the disconnect event callback is called on disconnect."), {ok, OriginPid, OriginPort} = init_origin(tcp), @@ -639,7 +669,7 @@ disconnect(Config) -> exit(OriginPid, shutdown), #{ reason := closed - } = do_receive_event(disconnect), + } = do_receive_event(?FUNCTION_NAME), gun:close(Pid). terminate(Config) -> @@ -649,7 +679,7 @@ terminate(Config) -> #{ state := _, reason := shutdown - } = do_receive_event(terminate), + } = do_receive_event(?FUNCTION_NAME), ok. %% Internal. @@ -785,6 +815,10 @@ ws_send_frame_end(EventData, Pid) -> Pid ! {?FUNCTION_NAME, EventData}, Pid. +cancel(EventData, Pid) -> + Pid ! {?FUNCTION_NAME, EventData}, + Pid. + disconnect(EventData, Pid) -> Pid ! {?FUNCTION_NAME, EventData}, Pid. |