aboutsummaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-07-02 17:28:44 +0200
committerLoïc Hoguin <[email protected]>2019-07-02 17:29:40 +0200
commit4a6503186bf3a72880e7c99be76406550aeded96 (patch)
tree3be68b90bc7813fc87a0ac6167793842fa4557d5 /src
parent1c03ef37c3b9060db8483e3870771d900e176c97 (diff)
downloadgun-4a6503186bf3a72880e7c99be76406550aeded96.tar.gz
gun-4a6503186bf3a72880e7c99be76406550aeded96.tar.bz2
gun-4a6503186bf3a72880e7c99be76406550aeded96.zip
Add response_inform/response_headers/response_end events
This covers many scenarios but more need to be added.
Diffstat (limited to 'src')
-rw-r--r--src/gun_default_event_h.erl12
-rw-r--r--src/gun_event.erl27
-rw-r--r--src/gun_http.erl192
-rw-r--r--src/gun_http2.erl57
4 files changed, 207 insertions, 81 deletions
diff --git a/src/gun_default_event_h.erl b/src/gun_default_event_h.erl
index a29183d..4156ed7 100644
--- a/src/gun_default_event_h.erl
+++ b/src/gun_default_event_h.erl
@@ -21,6 +21,9 @@
-export([request_start/2]).
-export([request_headers/2]).
-export([request_end/2]).
+-export([response_inform/2]).
+-export([response_headers/2]).
+-export([response_end/2]).
-export([disconnect/2]).
-export([terminate/2]).
@@ -42,6 +45,15 @@ request_headers(_EventData, State) ->
request_end(_EventData, State) ->
State.
+response_inform(_EventData, State) ->
+ State.
+
+response_headers(_EventData, State) ->
+ State.
+
+response_end(_EventData, State) ->
+ State.
+
disconnect(_EventData, State) ->
State.
diff --git a/src/gun_event.erl b/src/gun_event.erl
index 56f1a36..8e66400 100644
--- a/src/gun_event.erl
+++ b/src/gun_event.erl
@@ -68,6 +68,27 @@
-callback request_end(request_end_event(), State) -> State.
+%% response_inform/response_headers.
+
+-type response_headers_event() :: #{
+ stream_ref := reference(),
+ reply_to := pid(),
+ status := non_neg_integer(),
+ headers := [{binary(), binary()}]
+}.
+
+-callback response_inform(response_headers_event(), State) -> State.
+-callback response_headers(response_headers_event(), State) -> State.
+
+%% response_end.
+
+-type response_end_event() :: #{
+ stream_ref := reference(),
+ reply_to := pid()
+}.
+
+-callback response_end(response_end_event(), State) -> State.
+
%% disconnect.
-type disconnect_event() :: #{
@@ -92,10 +113,8 @@
%% @todo origin_changed
%% @todo transport_changed
%% @todo protocol_changed
-%% @todo response_start (call it once per inform + one for the response)
-%% @todo response_inform
-%% @todo response_headers
-%% @todo response_end
+%% @todo response_start (needs changes in cow_http2_machine to have an event before decoding headers)
+%% @todo response_trailers (same)
%% @todo push_promise_start
%% @todo push_promise_end
%% @todo cancel_start
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 582106b..7edaf14 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -93,127 +93,158 @@ init(Owner, Socket, Transport, Opts) ->
#http_state{owner=Owner, socket=Socket, transport=Transport, version=Version,
content_handlers=Handlers, transform_header_name=TransformHeaderName}.
-handle(Data, State, _EvHandler, EvHandlerState) ->
- {handle(Data, State), EvHandlerState}.
-
%% Stop looping when we got no more data.
-handle(<<>>, State) ->
- {state, State};
+handle(<<>>, State, _, EvHandlerState) ->
+ {{state, State}, EvHandlerState};
%% Close when server responds and we don't have any open streams.
-handle(_, #http_state{streams=[]}) ->
- close;
+handle(_, #http_state{streams=[]}, _, EvHandlerState) ->
+ {close, EvHandlerState};
%% Wait for the full response headers before trying to parse them.
-handle(Data, State=#http_state{in=head, buffer=Buffer}) ->
+handle(Data, State=#http_state{in=head, buffer=Buffer}, EvHandler, EvHandlerState) ->
Data2 = << Buffer/binary, Data/binary >>,
case binary:match(Data2, <<"\r\n\r\n">>) of
- nomatch -> {state, State#http_state{buffer=Data2}};
- {_, _} -> handle_head(Data2, State#http_state{buffer= <<>>})
+ nomatch -> {{state, State#http_state{buffer=Data2}}, EvHandlerState};
+ {_, _} -> handle_head(Data2, State#http_state{buffer= <<>>}, EvHandler, EvHandlerState)
end;
%% Everything sent to the socket until it closes is part of the response body.
-handle(Data, State=#http_state{in=body_close}) ->
- {state, send_data_if_alive(Data, State, nofin)};
+handle(Data, State=#http_state{in=body_close}, _, EvHandlerState) ->
+ {{state, send_data_if_alive(Data, State, nofin)}, EvHandlerState};
%% Chunked transfer-encoding may contain both data and trailers.
handle(Data, State=#http_state{in=body_chunked, in_state=InState,
- buffer=Buffer, connection=Conn}) ->
+ buffer=Buffer, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_],
+ connection=Conn}, EvHandler, EvHandlerState0) ->
Buffer2 = << Buffer/binary, Data/binary >>,
case cow_http_te:stream_chunked(Buffer2, InState) of
more ->
- {state, State#http_state{buffer=Buffer2}};
+ {{state, State#http_state{buffer=Buffer2}}, EvHandlerState0};
{more, Data2, InState2} ->
- {state, send_data_if_alive(Data2,
+ {{state, send_data_if_alive(Data2,
State#http_state{buffer= <<>>, in_state=InState2},
- nofin)};
+ nofin)}, EvHandlerState0};
{more, Data2, Length, InState2} when is_integer(Length) ->
%% @todo See if we can recv faster than one message at a time.
- {state, send_data_if_alive(Data2,
+ {{state, send_data_if_alive(Data2,
State#http_state{buffer= <<>>, in_state=InState2},
- nofin)};
+ nofin)}, EvHandlerState0};
{more, Data2, Rest, InState2} ->
%% @todo See if we can recv faster than one message at a time.
- {state, send_data_if_alive(Data2,
+ {{state, send_data_if_alive(Data2,
State#http_state{buffer=Rest, in_state=InState2},
- nofin)};
+ nofin)}, EvHandlerState0};
{done, HasTrailers, Rest} ->
- IsFin = case HasTrailers of
- trailers -> nofin;
- no_trailers -> fin
+ %% @todo response_end should be called AFTER send_data_if_alive
+ {IsFin, EvHandlerState} = case HasTrailers of
+ trailers ->
+ {nofin, EvHandlerState0};
+ no_trailers ->
+ EvHandlerState1 = EvHandler:response_end(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ }, EvHandlerState0),
+ {fin, EvHandlerState1}
end,
%% I suppose it doesn't hurt to append an empty binary.
State1 = send_data_if_alive(<<>>, State, IsFin),
case {HasTrailers, Conn} of
{trailers, _} ->
- handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer});
+ handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState);
{no_trailers, keepalive} ->
- handle(Rest, end_stream(State1#http_state{buffer= <<>>}));
+ handle(Rest, end_stream(State1#http_state{buffer= <<>>}), EvHandler, EvHandlerState);
{no_trailers, close} ->
- [{state, end_stream(State1)}, close]
+ {[{state, end_stream(State1)}, close], EvHandlerState}
end;
{done, Data2, HasTrailers, Rest} ->
- IsFin = case HasTrailers of
- trailers -> nofin;
- no_trailers -> fin
+ %% @todo response_end should be called AFTER send_data_if_alive
+ {IsFin, EvHandlerState} = case HasTrailers of
+ trailers ->
+ {nofin, EvHandlerState0};
+ no_trailers ->
+ EvHandlerState1 = EvHandler:response_end(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ }, EvHandlerState0),
+ {fin, EvHandlerState1}
end,
State1 = send_data_if_alive(Data2, State, IsFin),
case {HasTrailers, Conn} of
{trailers, _} ->
- handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer});
+ handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState);
{no_trailers, keepalive} ->
- handle(Rest, end_stream(State1#http_state{buffer= <<>>}));
+ handle(Rest, end_stream(State1#http_state{buffer= <<>>}), EvHandler, EvHandlerState);
{no_trailers, close} ->
- [{state, end_stream(State1)}, close]
+ {[{state, end_stream(State1)}, close], EvHandlerState}
end
end;
handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn,
- streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}) ->
+ streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}, EvHandler, EvHandlerState0) ->
Data2 = << Buffer/binary, Data/binary >>,
case binary:match(Data2, <<"\r\n\r\n">>) of
- nomatch -> {state, State#http_state{buffer=Data2}};
+ nomatch ->
+ {{state, State#http_state{buffer=Data2}}, EvHandlerState0};
{_, _} ->
{Trailers, Rest} = cow_http:parse_headers(Data2),
%% @todo We probably want to pass this to gun_content_handler?
ReplyTo ! {gun_trailers, self(), stream_ref(StreamRef), Trailers},
+ EvHandlerState = EvHandler:response_end(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ }, EvHandlerState0),
case Conn of
keepalive ->
- handle(Rest, end_stream(State#http_state{buffer= <<>>}));
+ handle(Rest, end_stream(State#http_state{buffer= <<>>}), EvHandler, EvHandlerState);
close ->
- [{state, end_stream(State)}, close]
+ {[{state, end_stream(State)}, close], EvHandlerState}
end
end;
%% We know the length of the rest of the body.
-handle(Data, State=#http_state{in={body, Length}, connection=Conn}) ->
+handle(Data, State=#http_state{in={body, Length}, connection=Conn,
+ streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]},
+ EvHandler, EvHandlerState0) ->
DataSize = byte_size(Data),
if
%% More data coming.
DataSize < Length ->
- {state, send_data_if_alive(Data,
+ {{state, send_data_if_alive(Data,
State#http_state{in={body, Length - DataSize}},
- nofin)};
+ nofin)}, EvHandlerState0};
%% Stream finished, no rest.
DataSize =:= Length ->
State1 = send_data_if_alive(Data, State, fin),
+ EvHandlerState = EvHandler:response_end(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ }, EvHandlerState0),
case Conn of
- keepalive -> {state, end_stream(State1)};
- close -> [{state, end_stream(State1)}, close]
+ keepalive -> {{state, end_stream(State1)}, EvHandlerState};
+ close -> {[{state, end_stream(State1)}, close], EvHandlerState}
end;
%% Stream finished, rest.
true ->
<< Body:Length/binary, Rest/bits >> = Data,
State1 = send_data_if_alive(Body, State, fin),
+ EvHandlerState = EvHandler:response_end(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ }, EvHandlerState0),
case Conn of
- keepalive -> handle(Rest, end_stream(State1));
- close -> [{state, end_stream(State1)}, close]
+ keepalive -> handle(Rest, end_stream(State1), EvHandler, EvHandlerState);
+ close -> {[{state, end_stream(State1)}, close], EvHandlerState}
end
end.
handle_head(Data, State=#http_state{socket=Socket, transport=Transport,
version=ClientVersion, content_handlers=Handlers0, connection=Conn,
streams=[Stream=#stream{ref=StreamRef, reply_to=ReplyTo,
- method=Method, is_alive=IsAlive}|Tail]}) ->
+ method=Method, is_alive=IsAlive}|Tail]},
+ EvHandler, EvHandlerState0) ->
{Version, Status, _, Rest} = cow_http:parse_status_line(Data),
{Headers, Rest2} = cow_http:parse_headers(Rest),
case {Status, StreamRef} of
{101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} ->
- ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts);
+ %% @todo Websocket's 101 response_inform.
+ {ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts),
+ EvHandlerState0};
+ %% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup.
{_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 ->
case IsAlive of
false ->
@@ -223,6 +254,13 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport,
fin, Status, Headers},
ok
end,
+ %% @todo Figure out whether the event should trigger if the stream was cancelled.
+ EvHandlerState = EvHandler:response_headers(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ status => Status,
+ headers => Headers
+ }, EvHandlerState0),
%% We expect there to be no additional data after the CONNECT response.
<<>> = Rest2,
State2 = end_stream(State#http_state{streams=[Stream|Tail]}),
@@ -237,9 +275,9 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport,
%% In this case the switch_protocol is delayed and is handled by
%% a message sent from gun_tls_proxy once the connection is established,
%% and handled by the gun module directly.
- [{state, State2#http_state{socket=ProxyPid, transport=gun_tls_proxy}},
+ {[{state, State2#http_state{socket=ProxyPid, transport=gun_tls_proxy}},
{origin, <<"https">>, NewHost, NewPort, connect},
- {switch_transport, gun_tls_proxy, ProxyPid}];
+ {switch_transport, gun_tls_proxy, ProxyPid}], EvHandlerState};
#{transport := tls} ->
TLSOpts = maps:get(tls_opts, Destination, []),
TLSTimeout = maps:get(tls_handshake_timeout, Destination, infinity),
@@ -247,46 +285,68 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport,
{ok, TLSSocket} ->
case ssl:negotiated_protocol(TLSSocket) of
{ok, <<"h2">>} ->
- [{origin, <<"https">>, NewHost, NewPort, connect},
+ {[{origin, <<"https">>, NewHost, NewPort, connect},
{switch_transport, gun_tls, TLSSocket},
- {switch_protocol, gun_http2, State2}];
+ {switch_protocol, gun_http2, State2}], EvHandlerState};
_ ->
- [{state, State2#http_state{socket=TLSSocket, transport=gun_tls}},
+ {[{state, State2#http_state{socket=TLSSocket, transport=gun_tls}},
{origin, <<"https">>, NewHost, NewPort, connect},
- {switch_transport, gun_tls, TLSSocket}]
+ {switch_transport, gun_tls, TLSSocket}], EvHandlerState}
end;
Error ->
- Error
+ {Error, EvHandlerState}
end;
_ ->
case maps:get(protocols, Destination, [http]) of
[http] ->
- [{state, State2},
- {origin, <<"http">>, NewHost, NewPort, connect}];
+ {[{state, State2},
+ {origin, <<"http">>, NewHost, NewPort, connect}], EvHandlerState};
[http2] ->
- [{origin, <<"http">>, NewHost, NewPort, connect},
- {switch_protocol, gun_http2, State2}]
+ {[{origin, <<"http">>, NewHost, NewPort, connect},
+ {switch_protocol, gun_http2, State2}], EvHandlerState}
end
end;
{_, _} when Status >= 100, Status =< 199 ->
ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers},
- handle(Rest2, State);
+ EvHandlerState = EvHandler:response_inform(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ status => Status,
+ headers => Headers
+ }, EvHandlerState0),
+ handle(Rest2, State, EvHandler, EvHandlerState);
_ ->
In = response_io_from_headers(Method, Version, Status, Headers),
IsFin = case In of head -> fin; _ -> nofin end,
- Handlers = case IsAlive of
+ %% @todo Figure out whether the event should trigger if the stream was cancelled.
+ {Handlers, EvHandlerState2} = case IsAlive of
false ->
- undefined;
+ {undefined, EvHandlerState0};
true ->
ReplyTo ! {gun_response, self(), stream_ref(StreamRef),
IsFin, Status, Headers},
+ EvHandlerState1 = EvHandler:response_headers(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ status => Status,
+ headers => Headers
+ }, EvHandlerState0),
case IsFin of
- fin -> undefined;
+ fin -> {undefined, EvHandlerState1};
nofin ->
- gun_content_handler:init(ReplyTo, stream_ref(StreamRef),
- Status, Headers, Handlers0)
+ {gun_content_handler:init(ReplyTo, stream_ref(StreamRef),
+ Status, Headers, Handlers0), EvHandlerState1}
end
end,
+ EvHandlerState = case IsFin of
+ nofin ->
+ EvHandlerState2;
+ fin ->
+ EvHandler:response_end(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ }, EvHandlerState2)
+ end,
Conn2 = if
Conn =:= close -> close;
Version =:= 'HTTP/1.0' -> close;
@@ -296,15 +356,17 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport,
%% We always reset in_state even if not chunked.
if
IsFin =:= fin, Conn2 =:= close ->
- close;
+ {close, EvHandlerState};
IsFin =:= fin ->
handle(Rest2, end_stream(State#http_state{in=In,
in_state={0, 0}, connection=Conn2,
- streams=[Stream#stream{handler_state=Handlers}|Tail]}));
+ streams=[Stream#stream{handler_state=Handlers}|Tail]}),
+ EvHandler, EvHandlerState);
true ->
handle(Rest2, State#http_state{in=In,
in_state={0, 0}, connection=Conn2,
- streams=[Stream#stream{handler_state=Handlers}|Tail]})
+ streams=[Stream#stream{handler_state=Handlers}|Tail]},
+ EvHandler, EvHandlerState)
end
end.
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 40afb16..17bedde 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -161,14 +161,21 @@ maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) ->
data_frame(State=#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine0}, StreamID, IsFin, Data,
- _EvHandler, EvHandlerState) ->
- Stream = #stream{handler_state=Handlers0} = get_stream_by_id(State, StreamID),
+ EvHandler, EvHandlerState0) ->
+ Stream = #stream{ref=StreamRef, reply_to=ReplyTo,
+ handler_state=Handlers0} = get_stream_by_id(State, StreamID),
Handlers = gun_content_handler:handle(IsFin, Data, Handlers0),
Size = byte_size(Data),
- HTTP2Machine = case Size of
+ {HTTP2Machine, EvHandlerState} = case Size of
%% We do not send a WINDOW_UPDATE if the DATA frame was of size 0.
+ 0 when IsFin =:= fin ->
+ EvHandlerState1 = EvHandler:response_end(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ }, EvHandlerState0),
+ {HTTP2Machine0, EvHandlerState1};
0 ->
- HTTP2Machine0;
+ {HTTP2Machine0, EvHandlerState0};
_ ->
Transport:send(Socket, cow_http2:window_update(Size)),
HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0),
@@ -176,9 +183,14 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport,
case IsFin of
nofin ->
Transport:send(Socket, cow_http2:window_update(StreamID, Size)),
- cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1);
+ {cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1),
+ EvHandlerState0};
fin ->
- HTTP2Machine1
+ EvHandlerState1 = EvHandler:response_end(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ }, EvHandlerState0),
+ {HTTP2Machine1, EvHandlerState1}
end
end,
{maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine},
@@ -187,29 +199,50 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport,
headers_frame(State=#http2_state{content_handlers=Handlers0},
StreamID, IsFin, Headers, PseudoHeaders, _BodyLen,
- _EvHandler, EvHandlerState) ->
+ EvHandler, EvHandlerState0) ->
Stream = #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
case PseudoHeaders of
#{status := Status} when Status >= 100, Status =< 199 ->
ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers},
+ EvHandlerState = EvHandler:response_inform(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ status => Status,
+ headers => Headers
+ }, EvHandlerState0),
{State, EvHandlerState};
#{status := Status} ->
ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers},
- Handlers = case IsFin of
- fin -> undefined;
+ EvHandlerState1 = EvHandler:response_headers(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo,
+ status => Status,
+ headers => Headers
+ }, EvHandlerState0),
+ {Handlers, EvHandlerState} = case IsFin of
+ fin ->
+ EvHandlerState2 = EvHandler:response_end(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ }, EvHandlerState1),
+ {undefined, EvHandlerState2};
nofin ->
- gun_content_handler:init(ReplyTo, StreamRef,
- Status, Headers, Handlers0)
+ {gun_content_handler:init(ReplyTo, StreamRef,
+ Status, Headers, Handlers0), EvHandlerState1}
end,
{maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}),
StreamID, remote, IsFin),
EvHandlerState}
end.
-trailers_frame(State, StreamID, Trailers, _EvHandler, EvHandlerState) ->
+trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->
#stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID),
%% @todo We probably want to pass this to gun_content_handler?
ReplyTo ! {gun_trailers, self(), StreamRef, Trailers},
+ EvHandlerState = EvHandler:response_end(#{
+ stream_ref => StreamRef,
+ reply_to => ReplyTo
+ }, EvHandlerState0),
{maybe_delete_stream(State, StreamID, remote, fin), EvHandlerState}.
rst_stream_frame(State=#http2_state{streams=Streams0}, StreamID, Reason) ->