diff options
author | Loïc Hoguin <[email protected]> | 2019-07-02 17:28:44 +0200 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2019-07-02 17:29:40 +0200 |
commit | 4a6503186bf3a72880e7c99be76406550aeded96 (patch) | |
tree | 3be68b90bc7813fc87a0ac6167793842fa4557d5 /src | |
parent | 1c03ef37c3b9060db8483e3870771d900e176c97 (diff) | |
download | gun-4a6503186bf3a72880e7c99be76406550aeded96.tar.gz gun-4a6503186bf3a72880e7c99be76406550aeded96.tar.bz2 gun-4a6503186bf3a72880e7c99be76406550aeded96.zip |
Add response_inform/response_headers/response_end events
This covers many scenarios but more need to be added.
Diffstat (limited to 'src')
-rw-r--r-- | src/gun_default_event_h.erl | 12 | ||||
-rw-r--r-- | src/gun_event.erl | 27 | ||||
-rw-r--r-- | src/gun_http.erl | 192 | ||||
-rw-r--r-- | src/gun_http2.erl | 57 |
4 files changed, 207 insertions, 81 deletions
diff --git a/src/gun_default_event_h.erl b/src/gun_default_event_h.erl index a29183d..4156ed7 100644 --- a/src/gun_default_event_h.erl +++ b/src/gun_default_event_h.erl @@ -21,6 +21,9 @@ -export([request_start/2]). -export([request_headers/2]). -export([request_end/2]). +-export([response_inform/2]). +-export([response_headers/2]). +-export([response_end/2]). -export([disconnect/2]). -export([terminate/2]). @@ -42,6 +45,15 @@ request_headers(_EventData, State) -> request_end(_EventData, State) -> State. +response_inform(_EventData, State) -> + State. + +response_headers(_EventData, State) -> + State. + +response_end(_EventData, State) -> + State. + disconnect(_EventData, State) -> State. diff --git a/src/gun_event.erl b/src/gun_event.erl index 56f1a36..8e66400 100644 --- a/src/gun_event.erl +++ b/src/gun_event.erl @@ -68,6 +68,27 @@ -callback request_end(request_end_event(), State) -> State. +%% response_inform/response_headers. + +-type response_headers_event() :: #{ + stream_ref := reference(), + reply_to := pid(), + status := non_neg_integer(), + headers := [{binary(), binary()}] +}. + +-callback response_inform(response_headers_event(), State) -> State. +-callback response_headers(response_headers_event(), State) -> State. + +%% response_end. + +-type response_end_event() :: #{ + stream_ref := reference(), + reply_to := pid() +}. + +-callback response_end(response_end_event(), State) -> State. + %% disconnect. -type disconnect_event() :: #{ @@ -92,10 +113,8 @@ %% @todo origin_changed %% @todo transport_changed %% @todo protocol_changed -%% @todo response_start (call it once per inform + one for the response) -%% @todo response_inform -%% @todo response_headers -%% @todo response_end +%% @todo response_start (needs changes in cow_http2_machine to have an event before decoding headers) +%% @todo response_trailers (same) %% @todo push_promise_start %% @todo push_promise_end %% @todo cancel_start diff --git a/src/gun_http.erl b/src/gun_http.erl index 582106b..7edaf14 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -93,127 +93,158 @@ init(Owner, Socket, Transport, Opts) -> #http_state{owner=Owner, socket=Socket, transport=Transport, version=Version, content_handlers=Handlers, transform_header_name=TransformHeaderName}. -handle(Data, State, _EvHandler, EvHandlerState) -> - {handle(Data, State), EvHandlerState}. - %% Stop looping when we got no more data. -handle(<<>>, State) -> - {state, State}; +handle(<<>>, State, _, EvHandlerState) -> + {{state, State}, EvHandlerState}; %% Close when server responds and we don't have any open streams. -handle(_, #http_state{streams=[]}) -> - close; +handle(_, #http_state{streams=[]}, _, EvHandlerState) -> + {close, EvHandlerState}; %% Wait for the full response headers before trying to parse them. -handle(Data, State=#http_state{in=head, buffer=Buffer}) -> +handle(Data, State=#http_state{in=head, buffer=Buffer}, EvHandler, EvHandlerState) -> Data2 = << Buffer/binary, Data/binary >>, case binary:match(Data2, <<"\r\n\r\n">>) of - nomatch -> {state, State#http_state{buffer=Data2}}; - {_, _} -> handle_head(Data2, State#http_state{buffer= <<>>}) + nomatch -> {{state, State#http_state{buffer=Data2}}, EvHandlerState}; + {_, _} -> handle_head(Data2, State#http_state{buffer= <<>>}, EvHandler, EvHandlerState) end; %% Everything sent to the socket until it closes is part of the response body. -handle(Data, State=#http_state{in=body_close}) -> - {state, send_data_if_alive(Data, State, nofin)}; +handle(Data, State=#http_state{in=body_close}, _, EvHandlerState) -> + {{state, send_data_if_alive(Data, State, nofin)}, EvHandlerState}; %% Chunked transfer-encoding may contain both data and trailers. handle(Data, State=#http_state{in=body_chunked, in_state=InState, - buffer=Buffer, connection=Conn}) -> + buffer=Buffer, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_], + connection=Conn}, EvHandler, EvHandlerState0) -> Buffer2 = << Buffer/binary, Data/binary >>, case cow_http_te:stream_chunked(Buffer2, InState) of more -> - {state, State#http_state{buffer=Buffer2}}; + {{state, State#http_state{buffer=Buffer2}}, EvHandlerState0}; {more, Data2, InState2} -> - {state, send_data_if_alive(Data2, + {{state, send_data_if_alive(Data2, State#http_state{buffer= <<>>, in_state=InState2}, - nofin)}; + nofin)}, EvHandlerState0}; {more, Data2, Length, InState2} when is_integer(Length) -> %% @todo See if we can recv faster than one message at a time. - {state, send_data_if_alive(Data2, + {{state, send_data_if_alive(Data2, State#http_state{buffer= <<>>, in_state=InState2}, - nofin)}; + nofin)}, EvHandlerState0}; {more, Data2, Rest, InState2} -> %% @todo See if we can recv faster than one message at a time. - {state, send_data_if_alive(Data2, + {{state, send_data_if_alive(Data2, State#http_state{buffer=Rest, in_state=InState2}, - nofin)}; + nofin)}, EvHandlerState0}; {done, HasTrailers, Rest} -> - IsFin = case HasTrailers of - trailers -> nofin; - no_trailers -> fin + %% @todo response_end should be called AFTER send_data_if_alive + {IsFin, EvHandlerState} = case HasTrailers of + trailers -> + {nofin, EvHandlerState0}; + no_trailers -> + EvHandlerState1 = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState0), + {fin, EvHandlerState1} end, %% I suppose it doesn't hurt to append an empty binary. State1 = send_data_if_alive(<<>>, State, IsFin), case {HasTrailers, Conn} of {trailers, _} -> - handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}); + handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState); {no_trailers, keepalive} -> - handle(Rest, end_stream(State1#http_state{buffer= <<>>})); + handle(Rest, end_stream(State1#http_state{buffer= <<>>}), EvHandler, EvHandlerState); {no_trailers, close} -> - [{state, end_stream(State1)}, close] + {[{state, end_stream(State1)}, close], EvHandlerState} end; {done, Data2, HasTrailers, Rest} -> - IsFin = case HasTrailers of - trailers -> nofin; - no_trailers -> fin + %% @todo response_end should be called AFTER send_data_if_alive + {IsFin, EvHandlerState} = case HasTrailers of + trailers -> + {nofin, EvHandlerState0}; + no_trailers -> + EvHandlerState1 = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState0), + {fin, EvHandlerState1} end, State1 = send_data_if_alive(Data2, State, IsFin), case {HasTrailers, Conn} of {trailers, _} -> - handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}); + handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState); {no_trailers, keepalive} -> - handle(Rest, end_stream(State1#http_state{buffer= <<>>})); + handle(Rest, end_stream(State1#http_state{buffer= <<>>}), EvHandler, EvHandlerState); {no_trailers, close} -> - [{state, end_stream(State1)}, close] + {[{state, end_stream(State1)}, close], EvHandlerState} end end; handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn, - streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}) -> + streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}, EvHandler, EvHandlerState0) -> Data2 = << Buffer/binary, Data/binary >>, case binary:match(Data2, <<"\r\n\r\n">>) of - nomatch -> {state, State#http_state{buffer=Data2}}; + nomatch -> + {{state, State#http_state{buffer=Data2}}, EvHandlerState0}; {_, _} -> {Trailers, Rest} = cow_http:parse_headers(Data2), %% @todo We probably want to pass this to gun_content_handler? ReplyTo ! {gun_trailers, self(), stream_ref(StreamRef), Trailers}, + EvHandlerState = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState0), case Conn of keepalive -> - handle(Rest, end_stream(State#http_state{buffer= <<>>})); + handle(Rest, end_stream(State#http_state{buffer= <<>>}), EvHandler, EvHandlerState); close -> - [{state, end_stream(State)}, close] + {[{state, end_stream(State)}, close], EvHandlerState} end end; %% We know the length of the rest of the body. -handle(Data, State=#http_state{in={body, Length}, connection=Conn}) -> +handle(Data, State=#http_state{in={body, Length}, connection=Conn, + streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}, + EvHandler, EvHandlerState0) -> DataSize = byte_size(Data), if %% More data coming. DataSize < Length -> - {state, send_data_if_alive(Data, + {{state, send_data_if_alive(Data, State#http_state{in={body, Length - DataSize}}, - nofin)}; + nofin)}, EvHandlerState0}; %% Stream finished, no rest. DataSize =:= Length -> State1 = send_data_if_alive(Data, State, fin), + EvHandlerState = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState0), case Conn of - keepalive -> {state, end_stream(State1)}; - close -> [{state, end_stream(State1)}, close] + keepalive -> {{state, end_stream(State1)}, EvHandlerState}; + close -> {[{state, end_stream(State1)}, close], EvHandlerState} end; %% Stream finished, rest. true -> << Body:Length/binary, Rest/bits >> = Data, State1 = send_data_if_alive(Body, State, fin), + EvHandlerState = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState0), case Conn of - keepalive -> handle(Rest, end_stream(State1)); - close -> [{state, end_stream(State1)}, close] + keepalive -> handle(Rest, end_stream(State1), EvHandler, EvHandlerState); + close -> {[{state, end_stream(State1)}, close], EvHandlerState} end end. handle_head(Data, State=#http_state{socket=Socket, transport=Transport, version=ClientVersion, content_handlers=Handlers0, connection=Conn, streams=[Stream=#stream{ref=StreamRef, reply_to=ReplyTo, - method=Method, is_alive=IsAlive}|Tail]}) -> + method=Method, is_alive=IsAlive}|Tail]}, + EvHandler, EvHandlerState0) -> {Version, Status, _, Rest} = cow_http:parse_status_line(Data), {Headers, Rest2} = cow_http:parse_headers(Rest), case {Status, StreamRef} of {101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} -> - ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts); + %% @todo Websocket's 101 response_inform. + {ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts), + EvHandlerState0}; + %% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup. {_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 -> case IsAlive of false -> @@ -223,6 +254,13 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport, fin, Status, Headers}, ok end, + %% @todo Figure out whether the event should trigger if the stream was cancelled. + EvHandlerState = EvHandler:response_headers(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), %% We expect there to be no additional data after the CONNECT response. <<>> = Rest2, State2 = end_stream(State#http_state{streams=[Stream|Tail]}), @@ -237,9 +275,9 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport, %% In this case the switch_protocol is delayed and is handled by %% a message sent from gun_tls_proxy once the connection is established, %% and handled by the gun module directly. - [{state, State2#http_state{socket=ProxyPid, transport=gun_tls_proxy}}, + {[{state, State2#http_state{socket=ProxyPid, transport=gun_tls_proxy}}, {origin, <<"https">>, NewHost, NewPort, connect}, - {switch_transport, gun_tls_proxy, ProxyPid}]; + {switch_transport, gun_tls_proxy, ProxyPid}], EvHandlerState}; #{transport := tls} -> TLSOpts = maps:get(tls_opts, Destination, []), TLSTimeout = maps:get(tls_handshake_timeout, Destination, infinity), @@ -247,46 +285,68 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport, {ok, TLSSocket} -> case ssl:negotiated_protocol(TLSSocket) of {ok, <<"h2">>} -> - [{origin, <<"https">>, NewHost, NewPort, connect}, + {[{origin, <<"https">>, NewHost, NewPort, connect}, {switch_transport, gun_tls, TLSSocket}, - {switch_protocol, gun_http2, State2}]; + {switch_protocol, gun_http2, State2}], EvHandlerState}; _ -> - [{state, State2#http_state{socket=TLSSocket, transport=gun_tls}}, + {[{state, State2#http_state{socket=TLSSocket, transport=gun_tls}}, {origin, <<"https">>, NewHost, NewPort, connect}, - {switch_transport, gun_tls, TLSSocket}] + {switch_transport, gun_tls, TLSSocket}], EvHandlerState} end; Error -> - Error + {Error, EvHandlerState} end; _ -> case maps:get(protocols, Destination, [http]) of [http] -> - [{state, State2}, - {origin, <<"http">>, NewHost, NewPort, connect}]; + {[{state, State2}, + {origin, <<"http">>, NewHost, NewPort, connect}], EvHandlerState}; [http2] -> - [{origin, <<"http">>, NewHost, NewPort, connect}, - {switch_protocol, gun_http2, State2}] + {[{origin, <<"http">>, NewHost, NewPort, connect}, + {switch_protocol, gun_http2, State2}], EvHandlerState} end end; {_, _} when Status >= 100, Status =< 199 -> ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers}, - handle(Rest2, State); + EvHandlerState = EvHandler:response_inform(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), + handle(Rest2, State, EvHandler, EvHandlerState); _ -> In = response_io_from_headers(Method, Version, Status, Headers), IsFin = case In of head -> fin; _ -> nofin end, - Handlers = case IsAlive of + %% @todo Figure out whether the event should trigger if the stream was cancelled. + {Handlers, EvHandlerState2} = case IsAlive of false -> - undefined; + {undefined, EvHandlerState0}; true -> ReplyTo ! {gun_response, self(), stream_ref(StreamRef), IsFin, Status, Headers}, + EvHandlerState1 = EvHandler:response_headers(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), case IsFin of - fin -> undefined; + fin -> {undefined, EvHandlerState1}; nofin -> - gun_content_handler:init(ReplyTo, stream_ref(StreamRef), - Status, Headers, Handlers0) + {gun_content_handler:init(ReplyTo, stream_ref(StreamRef), + Status, Headers, Handlers0), EvHandlerState1} end end, + EvHandlerState = case IsFin of + nofin -> + EvHandlerState2; + fin -> + EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState2) + end, Conn2 = if Conn =:= close -> close; Version =:= 'HTTP/1.0' -> close; @@ -296,15 +356,17 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport, %% We always reset in_state even if not chunked. if IsFin =:= fin, Conn2 =:= close -> - close; + {close, EvHandlerState}; IsFin =:= fin -> handle(Rest2, end_stream(State#http_state{in=In, in_state={0, 0}, connection=Conn2, - streams=[Stream#stream{handler_state=Handlers}|Tail]})); + streams=[Stream#stream{handler_state=Handlers}|Tail]}), + EvHandler, EvHandlerState); true -> handle(Rest2, State#http_state{in=In, in_state={0, 0}, connection=Conn2, - streams=[Stream#stream{handler_state=Handlers}|Tail]}) + streams=[Stream#stream{handler_state=Handlers}|Tail]}, + EvHandler, EvHandlerState) end end. diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 40afb16..17bedde 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -161,14 +161,21 @@ maybe_ack(State=#http2_state{socket=Socket, transport=Transport}, Frame) -> data_frame(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, StreamID, IsFin, Data, - _EvHandler, EvHandlerState) -> - Stream = #stream{handler_state=Handlers0} = get_stream_by_id(State, StreamID), + EvHandler, EvHandlerState0) -> + Stream = #stream{ref=StreamRef, reply_to=ReplyTo, + handler_state=Handlers0} = get_stream_by_id(State, StreamID), Handlers = gun_content_handler:handle(IsFin, Data, Handlers0), Size = byte_size(Data), - HTTP2Machine = case Size of + {HTTP2Machine, EvHandlerState} = case Size of %% We do not send a WINDOW_UPDATE if the DATA frame was of size 0. + 0 when IsFin =:= fin -> + EvHandlerState1 = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState0), + {HTTP2Machine0, EvHandlerState1}; 0 -> - HTTP2Machine0; + {HTTP2Machine0, EvHandlerState0}; _ -> Transport:send(Socket, cow_http2:window_update(Size)), HTTP2Machine1 = cow_http2_machine:update_window(Size, HTTP2Machine0), @@ -176,9 +183,14 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport, case IsFin of nofin -> Transport:send(Socket, cow_http2:window_update(StreamID, Size)), - cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1); + {cow_http2_machine:update_window(StreamID, Size, HTTP2Machine1), + EvHandlerState0}; fin -> - HTTP2Machine1 + EvHandlerState1 = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState0), + {HTTP2Machine1, EvHandlerState1} end end, {maybe_delete_stream(store_stream(State#http2_state{http2_machine=HTTP2Machine}, @@ -187,29 +199,50 @@ data_frame(State=#http2_state{socket=Socket, transport=Transport, headers_frame(State=#http2_state{content_handlers=Handlers0}, StreamID, IsFin, Headers, PseudoHeaders, _BodyLen, - _EvHandler, EvHandlerState) -> + EvHandler, EvHandlerState0) -> Stream = #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), case PseudoHeaders of #{status := Status} when Status >= 100, Status =< 199 -> ReplyTo ! {gun_inform, self(), StreamRef, Status, Headers}, + EvHandlerState = EvHandler:response_inform(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), {State, EvHandlerState}; #{status := Status} -> ReplyTo ! {gun_response, self(), StreamRef, IsFin, Status, Headers}, - Handlers = case IsFin of - fin -> undefined; + EvHandlerState1 = EvHandler:response_headers(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), + {Handlers, EvHandlerState} = case IsFin of + fin -> + EvHandlerState2 = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState1), + {undefined, EvHandlerState2}; nofin -> - gun_content_handler:init(ReplyTo, StreamRef, - Status, Headers, Handlers0) + {gun_content_handler:init(ReplyTo, StreamRef, + Status, Headers, Handlers0), EvHandlerState1} end, {maybe_delete_stream(store_stream(State, Stream#stream{handler_state=Handlers}), StreamID, remote, IsFin), EvHandlerState} end. -trailers_frame(State, StreamID, Trailers, _EvHandler, EvHandlerState) -> +trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> #stream{ref=StreamRef, reply_to=ReplyTo} = get_stream_by_id(State, StreamID), %% @todo We probably want to pass this to gun_content_handler? ReplyTo ! {gun_trailers, self(), StreamRef, Trailers}, + EvHandlerState = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState0), {maybe_delete_stream(State, StreamID, remote, fin), EvHandlerState}. rst_stream_frame(State=#http2_state{streams=Streams0}, StreamID, Reason) -> |