From 3d1e3c9e6e779bca839b86717adb2487b4a1c2c6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Wed, 24 Jul 2019 16:22:26 +0200 Subject: Add the cancel event for local/remote stream cancellation --- src/gun.erl | 10 ++++++---- src/gun_default_event_h.erl | 4 ++++ src/gun_event.erl | 21 +++++++++++++++++++-- src/gun_http.erl | 17 ++++++++++++----- src/gun_http2.erl | 35 +++++++++++++++++++++++++---------- 5 files changed, 66 insertions(+), 21 deletions(-) (limited to 'src') diff --git a/src/gun.erl b/src/gun.erl index b55d3a1..9add685 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -973,10 +973,12 @@ connected(info, {connect_protocol, Protocol}, #state{protocol=Protocol}) -> keep_state_and_data; connected(info, {connect_protocol, Protocol}, State=#state{protocol_state=ProtoState}) -> commands([{switch_protocol, Protocol, ProtoState}], State); -connected(cast, {cancel, ReplyTo, StreamRef}, - State=#state{protocol=Protocol, protocol_state=ProtoState}) -> - ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo), - {keep_state, State#state{protocol_state=ProtoState2}}; +connected(cast, {cancel, ReplyTo, StreamRef}, State=#state{ + protocol=Protocol, protocol_state=ProtoState, + event_handler=EvHandler, event_handler_state=EvHandlerState0}) -> + {ProtoState2, EvHandlerState} = Protocol:cancel(ProtoState, + StreamRef, ReplyTo, EvHandler, EvHandlerState0), + {keep_state, State#state{protocol_state=ProtoState2, event_handler_state=EvHandlerState}}; %% Public Websocket interface. %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. %% An interface would also make sure that HTTP/1.0 can't upgrade. diff --git a/src/gun_default_event_h.erl b/src/gun_default_event_h.erl index 25076ac..213db06 100644 --- a/src/gun_default_event_h.erl +++ b/src/gun_default_event_h.erl @@ -39,6 +39,7 @@ -export([ws_recv_frame_end/2]). -export([ws_send_frame_start/2]). -export([ws_send_frame_end/2]). +-export([cancel/2]). -export([disconnect/2]). -export([terminate/2]). @@ -114,6 +115,9 @@ ws_send_frame_start(_EventData, State) -> ws_send_frame_end(_EventData, State) -> State. +cancel(_EventData, State) -> + State. + disconnect(_EventData, State) -> State. diff --git a/src/gun_event.erl b/src/gun_event.erl index f27d336..c87af58 100644 --- a/src/gun_event.erl +++ b/src/gun_event.erl @@ -236,6 +236,25 @@ -callback ws_send_frame_start(ws_send_frame_event(), State) -> State. -callback ws_send_frame_end(ws_send_frame_event(), State) -> State. +%% cancel. +%% +%% In the case of HTTP/1.1 we cannot actually cancel the stream, +%% we only silence the stream to the user. Further response events +%% may therefore be received and they provide a useful metric as +%% these canceled requests monopolize the connection. +%% +%% For HTTP/2 both the client and the server may cancel streams. +%% Events may still occur for a short time after the cancel. + +-type cancel_event() :: #{ + stream_ref := reference(), + reply_to := pid(), + endpoint := local | remote, + reason := atom() +}. + +-callback cancel(cancel_event(), State) -> State. + %% disconnect. -type disconnect_event() :: #{ @@ -255,5 +274,3 @@ %% @todo origin_changed %% @todo transport_changed -%% @todo cancel_start -%% @todo cancel_end diff --git a/src/gun_http.erl b/src/gun_http.erl index da72527..738b6e9 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -24,7 +24,7 @@ -export([request/11]). -export([data/7]). -export([connect/5]). --export([cancel/3]). +-export([cancel/5]). -export([stream_info/2]). -export([down/1]). -export([ws_upgrade/9]). @@ -605,12 +605,19 @@ connect(State=#http_state{socket=Socket, transport=Transport, version=Version}, new_stream(State, {connect, StreamRef, Destination}, ReplyTo, <<"CONNECT">>). %% We can't cancel anything, we can just stop forwarding messages to the owner. -cancel(State, StreamRef, ReplyTo) -> - case is_stream(State, StreamRef) of +cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> + case is_stream(State0, StreamRef) of true -> - cancel_stream(State, StreamRef); + State = cancel_stream(State0, StreamRef), + EvHandlerState = EvHandler:cancel(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + endpoint => local, + reason => cancel + }, EvHandlerState0), + {State, EvHandlerState}; false -> - error_stream_not_found(State, StreamRef, ReplyTo) + {error_stream_not_found(State0, StreamRef, ReplyTo), EvHandlerState0} end. stream_info(#http_state{streams=Streams}, StreamRef) -> diff --git a/src/gun_http2.erl b/src/gun_http2.erl index d039ecb..78b137b 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -23,7 +23,7 @@ -export([headers/10]). -export([request/11]). -export([data/7]). --export([cancel/3]). +-export([cancel/5]). -export([stream_info/2]). -export([down/1]). @@ -150,8 +150,8 @@ frame(State=#http2_state{http2_machine=HTTP2Machine0}, Frame, EvHandler, EvHandl trailers_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Trailers, EvHandler, EvHandlerState); {ok, {rst_stream, StreamID, Reason}, HTTP2Machine} -> - {rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, Reason), - EvHandlerState}; + rst_stream_frame(State#http2_state{http2_machine=HTTP2Machine}, + StreamID, Reason, EvHandler, EvHandlerState); {ok, {push_promise, StreamID, PromisedStreamID, Headers, PseudoHeaders}, HTTP2Machine} -> push_promise_frame(State#http2_state{http2_machine=HTTP2Machine}, StreamID, PromisedStreamID, Headers, PseudoHeaders, @@ -268,14 +268,21 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> EvHandlerState = EvHandler:response_end(ResponseEvent, EvHandlerState1), {maybe_delete_stream(State, StreamID, remote, fin), EvHandlerState}. -rst_stream_frame(State=#http2_state{streams=Streams0}, StreamID, Reason) -> +rst_stream_frame(State=#http2_state{streams=Streams0}, + StreamID, Reason, EvHandler, EvHandlerState0) -> case lists:keytake(StreamID, #stream.id, Streams0) of {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} -> ReplyTo ! {gun_error, self(), StreamRef, {stream_error, Reason, 'Stream reset by server.'}}, - State#http2_state{streams=Streams}; + EvHandlerState = EvHandler:cancel(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + endpoint => remote, + reason => Reason + }, EvHandlerState0), + {State#http2_state{streams=Streams}, EvHandlerState}; false -> - State + {State, EvHandlerState0} end. push_promise_frame(State=#http2_state{streams=Streams}, @@ -482,15 +489,23 @@ reset_stream(State=#http2_state{socket=Socket, transport=Transport, State end. -cancel(State=#http2_state{socket=Socket, transport=Transport, - http2_machine=HTTP2Machine0}, StreamRef, ReplyTo) -> +cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, + StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> case get_stream_by_ref(State, StreamRef) of #stream{id=StreamID} -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(StreamID, HTTP2Machine0), Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), - delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID); + EvHandlerState = EvHandler:cancel(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + endpoint => local, + reason => cancel + }, EvHandlerState0), + {delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID), + EvHandlerState}; false -> - error_stream_not_found(State, StreamRef, ReplyTo) + {error_stream_not_found(State, StreamRef, ReplyTo), + EvHandlerState0} end. stream_info(State, StreamRef) -> -- cgit v1.2.3