aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_http.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2020-10-16 11:33:31 +0200
committerLoïc Hoguin <[email protected]>2020-10-16 11:33:31 +0200
commit356bf47edeb5b78765200e78d9b7a48aa98b97f5 (patch)
tree83c35cbb5e7120bd1d1e0a5693571f8b18c088d7 /src/gun_http.erl
parentf2e8d103dd7827251fa726c42e307e42cef8a3dc (diff)
downloadgun-356bf47edeb5b78765200e78d9b7a48aa98b97f5.tar.gz
gun-356bf47edeb5b78765200e78d9b7a48aa98b97f5.tar.bz2
gun-356bf47edeb5b78765200e78d9b7a48aa98b97f5.zip
Add or fix events inside or related to CONNECT tunnels
Diffstat (limited to 'src/gun_http.erl')
-rw-r--r--src/gun_http.erl81
1 files changed, 54 insertions, 27 deletions
diff --git a/src/gun_http.erl b/src/gun_http.erl
index e2c0f1d..d877068 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -29,7 +29,7 @@
-export([headers/11]).
-export([request/12]).
-export([data/7]).
--export([connect/7]).
+-export([connect/9]).
-export([cancel/5]).
-export([stream_info/2]).
-export([down/1]).
@@ -195,7 +195,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
{nofin, EvHandlerState0};
no_trailers ->
EvHandlerState1 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
{fin, EvHandlerState1}
@@ -218,7 +218,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState,
{nofin, EvHandlerState0};
no_trailers ->
EvHandlerState1 = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
{fin, EvHandlerState1}
@@ -243,9 +243,10 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn,
{_, _} ->
{Trailers, Rest} = cow_http:parse_headers(Data2),
%% @todo We probably want to pass this to gun_content_handler?
- ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers},
+ RealStreamRef = stream_ref(State, StreamRef),
+ ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers},
ResponseEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
},
EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0),
@@ -272,7 +273,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn,
%% We ignore the active command because the stream ended.
[{state, State1}|_] = send_data(Data, State, fin),
EvHandlerState = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
case Conn of
@@ -287,7 +288,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn,
%% We ignore the active command because the stream ended.
[{state, State1}|_] = send_data(Body, State, fin),
EvHandlerState = EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State1, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0),
case Conn of
@@ -396,15 +397,15 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
EvHandler, EvHandlerState0, Version, Status, Headers) ->
In = response_io_from_headers(Method, Version, Status, Headers),
IsFin = case In of head -> fin; _ -> nofin end,
+ RealStreamRef = stream_ref(State, StreamRef),
%% @todo Figure out whether the event should trigger if the stream was cancelled.
{Handlers, EvHandlerState2} = case IsAlive of
false ->
{undefined, EvHandlerState0};
true ->
- ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef),
- IsFin, Status, Headers},
+ ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers},
EvHandlerState1 = EvHandler:response_headers(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
status => Status,
headers => Headers
@@ -413,7 +414,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
fin -> {undefined, EvHandlerState1};
nofin ->
Handlers0 = maps:get(content_handlers, Opts, [gun_data_h]),
- {gun_content_handler:init(ReplyTo, stream_ref(State, StreamRef),
+ {gun_content_handler:init(ReplyTo, RealStreamRef,
Status, Headers, Handlers0), EvHandlerState1}
end
end,
@@ -422,7 +423,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
EvHandlerState2;
fin ->
EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
}, EvHandlerState2)
end,
@@ -509,7 +510,7 @@ close(Reason, State=#http_state{in=body_close,
close_streams(State, Tail, close_reason(Reason)),
_ = send_data(<<>>, State, fin),
EvHandler:response_end(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState);
close(Reason, State=#http_state{streams=Streams}, _, EvHandlerState) ->
@@ -601,8 +602,9 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi
{undefined, _} -> Headers4;
_ -> [{<<"content-length">>, integer_to_binary(iolist_size(Body))}|Headers4]
end,
+ RealStreamRef = stream_ref(State, StreamRef),
RequestEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo,
function => Function,
method => Method,
@@ -618,7 +620,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi
EvHandlerState = case Out of
head ->
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
reply_to => ReplyTo
},
EvHandler:request_end(RequestEndEvent, EvHandlerState2);
@@ -672,7 +674,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
])
end,
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
@@ -686,7 +688,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
if
Length2 =:= 0, IsFin =:= fin ->
RequestEndEvent = #{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
},
EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0),
@@ -702,17 +704,19 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0}
end.
-connect(State, StreamRef, ReplyTo, _, _, _, _)
+connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
when is_list(StreamRef) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{badstate, "The stream is not a tunnel."}},
- State;
-connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _) when Streams =/= [] ->
+ {State, EvHandlerState};
+connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState)
+ when Streams =/= [] ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
- State;
+ {State, EvHandlerState};
connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
- StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0) ->
+ StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0,
+ EvHandler, EvHandlerState0) ->
Host = case Host0 of
Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple);
_ -> Host0
@@ -736,12 +740,29 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version
Headers2
end,
Headers = transform_header_names(State, Headers3),
+ RealStreamRef = stream_ref(State, StreamRef),
+ RequestEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo,
+ function => connect,
+ method => <<"CONNECT">>,
+ authority => Authority,
+ headers => Headers
+ },
+ EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0),
Transport:send(Socket, [
cow_http:request(<<"CONNECT">>, Authority, Version, Headers)
]),
+ EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1),
+ RequestEndEvent = #{
+ stream_ref => RealStreamRef,
+ reply_to => ReplyTo
+ },
+ EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2),
InitialFlow = initial_flow(InitialFlow0, Opts),
- new_stream(State, {connect, StreamRef, Destination}, ReplyTo,
- <<"CONNECT">>, Authority, <<>>, InitialFlow).
+ {new_stream(State, {connect, StreamRef, Destination}, ReplyTo,
+ <<"CONNECT">>, Authority, <<>>, InitialFlow),
+ EvHandlerState}.
%% We can't cancel anything, we can just stop forwarding messages to the owner.
cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
@@ -749,7 +770,7 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) ->
true ->
State = cancel_stream(State0, StreamRef),
EvHandlerState = EvHandler:cancel(#{
- stream_ref => StreamRef,
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
endpoint => local,
reason => cancel
@@ -876,6 +897,11 @@ end_stream(State=#http_state{streams=[_|Tail]}) ->
%% Websocket upgrade.
+ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState)
+ when is_list(StreamRef) ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
+ {badstate, "The stream is not a tunnel."}},
+ {State, EvHandlerState};
ws_upgrade(State=#http_state{version='HTTP/1.0'},
StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
@@ -986,9 +1012,10 @@ ws_handshake_end(Buffer,
self() ! {OK, Socket, Buffer}
end,
%% Inform the user that the upgrade was successful and switch the protocol.
- ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), [<<"websocket">>], Headers},
+ RealStreamRef = stream_ref(State, StreamRef),
+ ReplyTo ! {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers},
{switch_protocol, {ws, #{
- stream_ref => StreamRef,
+ stream_ref => RealStreamRef,
headers => Headers,
extensions => Extensions,
flow => InitialFlow,