From 4a6503186bf3a72880e7c99be76406550aeded96 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Tue, 2 Jul 2019 17:28:44 +0200 Subject: Add response_inform/response_headers/response_end events This covers many scenarios but more need to be added. --- src/gun_http.erl | 192 ++++++++++++++++++++++++++++++++++++------------------- 1 file changed, 127 insertions(+), 65 deletions(-) (limited to 'src/gun_http.erl') diff --git a/src/gun_http.erl b/src/gun_http.erl index 582106b..7edaf14 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -93,127 +93,158 @@ init(Owner, Socket, Transport, Opts) -> #http_state{owner=Owner, socket=Socket, transport=Transport, version=Version, content_handlers=Handlers, transform_header_name=TransformHeaderName}. -handle(Data, State, _EvHandler, EvHandlerState) -> - {handle(Data, State), EvHandlerState}. - %% Stop looping when we got no more data. -handle(<<>>, State) -> - {state, State}; +handle(<<>>, State, _, EvHandlerState) -> + {{state, State}, EvHandlerState}; %% Close when server responds and we don't have any open streams. -handle(_, #http_state{streams=[]}) -> - close; +handle(_, #http_state{streams=[]}, _, EvHandlerState) -> + {close, EvHandlerState}; %% Wait for the full response headers before trying to parse them. -handle(Data, State=#http_state{in=head, buffer=Buffer}) -> +handle(Data, State=#http_state{in=head, buffer=Buffer}, EvHandler, EvHandlerState) -> Data2 = << Buffer/binary, Data/binary >>, case binary:match(Data2, <<"\r\n\r\n">>) of - nomatch -> {state, State#http_state{buffer=Data2}}; - {_, _} -> handle_head(Data2, State#http_state{buffer= <<>>}) + nomatch -> {{state, State#http_state{buffer=Data2}}, EvHandlerState}; + {_, _} -> handle_head(Data2, State#http_state{buffer= <<>>}, EvHandler, EvHandlerState) end; %% Everything sent to the socket until it closes is part of the response body. -handle(Data, State=#http_state{in=body_close}) -> - {state, send_data_if_alive(Data, State, nofin)}; +handle(Data, State=#http_state{in=body_close}, _, EvHandlerState) -> + {{state, send_data_if_alive(Data, State, nofin)}, EvHandlerState}; %% Chunked transfer-encoding may contain both data and trailers. handle(Data, State=#http_state{in=body_chunked, in_state=InState, - buffer=Buffer, connection=Conn}) -> + buffer=Buffer, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_], + connection=Conn}, EvHandler, EvHandlerState0) -> Buffer2 = << Buffer/binary, Data/binary >>, case cow_http_te:stream_chunked(Buffer2, InState) of more -> - {state, State#http_state{buffer=Buffer2}}; + {{state, State#http_state{buffer=Buffer2}}, EvHandlerState0}; {more, Data2, InState2} -> - {state, send_data_if_alive(Data2, + {{state, send_data_if_alive(Data2, State#http_state{buffer= <<>>, in_state=InState2}, - nofin)}; + nofin)}, EvHandlerState0}; {more, Data2, Length, InState2} when is_integer(Length) -> %% @todo See if we can recv faster than one message at a time. - {state, send_data_if_alive(Data2, + {{state, send_data_if_alive(Data2, State#http_state{buffer= <<>>, in_state=InState2}, - nofin)}; + nofin)}, EvHandlerState0}; {more, Data2, Rest, InState2} -> %% @todo See if we can recv faster than one message at a time. - {state, send_data_if_alive(Data2, + {{state, send_data_if_alive(Data2, State#http_state{buffer=Rest, in_state=InState2}, - nofin)}; + nofin)}, EvHandlerState0}; {done, HasTrailers, Rest} -> - IsFin = case HasTrailers of - trailers -> nofin; - no_trailers -> fin + %% @todo response_end should be called AFTER send_data_if_alive + {IsFin, EvHandlerState} = case HasTrailers of + trailers -> + {nofin, EvHandlerState0}; + no_trailers -> + EvHandlerState1 = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState0), + {fin, EvHandlerState1} end, %% I suppose it doesn't hurt to append an empty binary. State1 = send_data_if_alive(<<>>, State, IsFin), case {HasTrailers, Conn} of {trailers, _} -> - handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}); + handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState); {no_trailers, keepalive} -> - handle(Rest, end_stream(State1#http_state{buffer= <<>>})); + handle(Rest, end_stream(State1#http_state{buffer= <<>>}), EvHandler, EvHandlerState); {no_trailers, close} -> - [{state, end_stream(State1)}, close] + {[{state, end_stream(State1)}, close], EvHandlerState} end; {done, Data2, HasTrailers, Rest} -> - IsFin = case HasTrailers of - trailers -> nofin; - no_trailers -> fin + %% @todo response_end should be called AFTER send_data_if_alive + {IsFin, EvHandlerState} = case HasTrailers of + trailers -> + {nofin, EvHandlerState0}; + no_trailers -> + EvHandlerState1 = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState0), + {fin, EvHandlerState1} end, State1 = send_data_if_alive(Data2, State, IsFin), case {HasTrailers, Conn} of {trailers, _} -> - handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}); + handle(Rest, State1#http_state{buffer = <<>>, in=body_trailer}, EvHandler, EvHandlerState); {no_trailers, keepalive} -> - handle(Rest, end_stream(State1#http_state{buffer= <<>>})); + handle(Rest, end_stream(State1#http_state{buffer= <<>>}), EvHandler, EvHandlerState); {no_trailers, close} -> - [{state, end_stream(State1)}, close] + {[{state, end_stream(State1)}, close], EvHandlerState} end end; handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn, - streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}) -> + streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}, EvHandler, EvHandlerState0) -> Data2 = << Buffer/binary, Data/binary >>, case binary:match(Data2, <<"\r\n\r\n">>) of - nomatch -> {state, State#http_state{buffer=Data2}}; + nomatch -> + {{state, State#http_state{buffer=Data2}}, EvHandlerState0}; {_, _} -> {Trailers, Rest} = cow_http:parse_headers(Data2), %% @todo We probably want to pass this to gun_content_handler? ReplyTo ! {gun_trailers, self(), stream_ref(StreamRef), Trailers}, + EvHandlerState = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState0), case Conn of keepalive -> - handle(Rest, end_stream(State#http_state{buffer= <<>>})); + handle(Rest, end_stream(State#http_state{buffer= <<>>}), EvHandler, EvHandlerState); close -> - [{state, end_stream(State)}, close] + {[{state, end_stream(State)}, close], EvHandlerState} end end; %% We know the length of the rest of the body. -handle(Data, State=#http_state{in={body, Length}, connection=Conn}) -> +handle(Data, State=#http_state{in={body, Length}, connection=Conn, + streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}, + EvHandler, EvHandlerState0) -> DataSize = byte_size(Data), if %% More data coming. DataSize < Length -> - {state, send_data_if_alive(Data, + {{state, send_data_if_alive(Data, State#http_state{in={body, Length - DataSize}}, - nofin)}; + nofin)}, EvHandlerState0}; %% Stream finished, no rest. DataSize =:= Length -> State1 = send_data_if_alive(Data, State, fin), + EvHandlerState = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState0), case Conn of - keepalive -> {state, end_stream(State1)}; - close -> [{state, end_stream(State1)}, close] + keepalive -> {{state, end_stream(State1)}, EvHandlerState}; + close -> {[{state, end_stream(State1)}, close], EvHandlerState} end; %% Stream finished, rest. true -> << Body:Length/binary, Rest/bits >> = Data, State1 = send_data_if_alive(Body, State, fin), + EvHandlerState = EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState0), case Conn of - keepalive -> handle(Rest, end_stream(State1)); - close -> [{state, end_stream(State1)}, close] + keepalive -> handle(Rest, end_stream(State1), EvHandler, EvHandlerState); + close -> {[{state, end_stream(State1)}, close], EvHandlerState} end end. handle_head(Data, State=#http_state{socket=Socket, transport=Transport, version=ClientVersion, content_handlers=Handlers0, connection=Conn, streams=[Stream=#stream{ref=StreamRef, reply_to=ReplyTo, - method=Method, is_alive=IsAlive}|Tail]}) -> + method=Method, is_alive=IsAlive}|Tail]}, + EvHandler, EvHandlerState0) -> {Version, Status, _, Rest} = cow_http:parse_status_line(Data), {Headers, Rest2} = cow_http:parse_headers(Rest), case {Status, StreamRef} of {101, {websocket, RealStreamRef, WsKey, WsExtensions, WsOpts}} -> - ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts); + %% @todo Websocket's 101 response_inform. + {ws_handshake(Rest2, State, RealStreamRef, Headers, WsKey, WsExtensions, WsOpts), + EvHandlerState0}; + %% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup. {_, {connect, RealStreamRef, Destination}} when Status >= 200, Status < 300 -> case IsAlive of false -> @@ -223,6 +254,13 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport, fin, Status, Headers}, ok end, + %% @todo Figure out whether the event should trigger if the stream was cancelled. + EvHandlerState = EvHandler:response_headers(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), %% We expect there to be no additional data after the CONNECT response. <<>> = Rest2, State2 = end_stream(State#http_state{streams=[Stream|Tail]}), @@ -237,9 +275,9 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport, %% In this case the switch_protocol is delayed and is handled by %% a message sent from gun_tls_proxy once the connection is established, %% and handled by the gun module directly. - [{state, State2#http_state{socket=ProxyPid, transport=gun_tls_proxy}}, + {[{state, State2#http_state{socket=ProxyPid, transport=gun_tls_proxy}}, {origin, <<"https">>, NewHost, NewPort, connect}, - {switch_transport, gun_tls_proxy, ProxyPid}]; + {switch_transport, gun_tls_proxy, ProxyPid}], EvHandlerState}; #{transport := tls} -> TLSOpts = maps:get(tls_opts, Destination, []), TLSTimeout = maps:get(tls_handshake_timeout, Destination, infinity), @@ -247,46 +285,68 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport, {ok, TLSSocket} -> case ssl:negotiated_protocol(TLSSocket) of {ok, <<"h2">>} -> - [{origin, <<"https">>, NewHost, NewPort, connect}, + {[{origin, <<"https">>, NewHost, NewPort, connect}, {switch_transport, gun_tls, TLSSocket}, - {switch_protocol, gun_http2, State2}]; + {switch_protocol, gun_http2, State2}], EvHandlerState}; _ -> - [{state, State2#http_state{socket=TLSSocket, transport=gun_tls}}, + {[{state, State2#http_state{socket=TLSSocket, transport=gun_tls}}, {origin, <<"https">>, NewHost, NewPort, connect}, - {switch_transport, gun_tls, TLSSocket}] + {switch_transport, gun_tls, TLSSocket}], EvHandlerState} end; Error -> - Error + {Error, EvHandlerState} end; _ -> case maps:get(protocols, Destination, [http]) of [http] -> - [{state, State2}, - {origin, <<"http">>, NewHost, NewPort, connect}]; + {[{state, State2}, + {origin, <<"http">>, NewHost, NewPort, connect}], EvHandlerState}; [http2] -> - [{origin, <<"http">>, NewHost, NewPort, connect}, - {switch_protocol, gun_http2, State2}] + {[{origin, <<"http">>, NewHost, NewPort, connect}, + {switch_protocol, gun_http2, State2}], EvHandlerState} end end; {_, _} when Status >= 100, Status =< 199 -> ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers}, - handle(Rest2, State); + EvHandlerState = EvHandler:response_inform(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), + handle(Rest2, State, EvHandler, EvHandlerState); _ -> In = response_io_from_headers(Method, Version, Status, Headers), IsFin = case In of head -> fin; _ -> nofin end, - Handlers = case IsAlive of + %% @todo Figure out whether the event should trigger if the stream was cancelled. + {Handlers, EvHandlerState2} = case IsAlive of false -> - undefined; + {undefined, EvHandlerState0}; true -> ReplyTo ! {gun_response, self(), stream_ref(StreamRef), IsFin, Status, Headers}, + EvHandlerState1 = EvHandler:response_headers(#{ + stream_ref => StreamRef, + reply_to => ReplyTo, + status => Status, + headers => Headers + }, EvHandlerState0), case IsFin of - fin -> undefined; + fin -> {undefined, EvHandlerState1}; nofin -> - gun_content_handler:init(ReplyTo, stream_ref(StreamRef), - Status, Headers, Handlers0) + {gun_content_handler:init(ReplyTo, stream_ref(StreamRef), + Status, Headers, Handlers0), EvHandlerState1} end end, + EvHandlerState = case IsFin of + nofin -> + EvHandlerState2; + fin -> + EvHandler:response_end(#{ + stream_ref => StreamRef, + reply_to => ReplyTo + }, EvHandlerState2) + end, Conn2 = if Conn =:= close -> close; Version =:= 'HTTP/1.0' -> close; @@ -296,15 +356,17 @@ handle_head(Data, State=#http_state{socket=Socket, transport=Transport, %% We always reset in_state even if not chunked. if IsFin =:= fin, Conn2 =:= close -> - close; + {close, EvHandlerState}; IsFin =:= fin -> handle(Rest2, end_stream(State#http_state{in=In, in_state={0, 0}, connection=Conn2, - streams=[Stream#stream{handler_state=Handlers}|Tail]})); + streams=[Stream#stream{handler_state=Handlers}|Tail]}), + EvHandler, EvHandlerState); true -> handle(Rest2, State#http_state{in=In, in_state={0, 0}, connection=Conn2, - streams=[Stream#stream{handler_state=Handlers}|Tail]}) + streams=[Stream#stream{handler_state=Handlers}|Tail]}, + EvHandler, EvHandlerState) end end. -- cgit v1.2.3