diff options
Diffstat (limited to 'src/gun_http.erl')
-rw-r--r-- | src/gun_http.erl | 81 |
1 files changed, 54 insertions, 27 deletions
diff --git a/src/gun_http.erl b/src/gun_http.erl index e2c0f1d..d877068 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -29,7 +29,7 @@ -export([headers/11]). -export([request/12]). -export([data/7]). --export([connect/7]). +-export([connect/9]). -export([cancel/5]). -export([stream_info/2]). -export([down/1]). @@ -195,7 +195,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, {nofin, EvHandlerState0}; no_trailers -> EvHandlerState1 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0), {fin, EvHandlerState1} @@ -218,7 +218,7 @@ handle(Data, State=#http_state{in=body_chunked, in_state=InState, {nofin, EvHandlerState0}; no_trailers -> EvHandlerState1 = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0), {fin, EvHandlerState1} @@ -243,9 +243,10 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn, {_, _} -> {Trailers, Rest} = cow_http:parse_headers(Data2), %% @todo We probably want to pass this to gun_content_handler? - ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers}, + RealStreamRef = stream_ref(State, StreamRef), + ReplyTo ! {gun_trailers, self(), RealStreamRef, Trailers}, ResponseEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandlerState1 = EvHandler:response_trailers(ResponseEvent#{headers => Trailers}, EvHandlerState0), @@ -272,7 +273,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn, %% We ignore the active command because the stream ended. [{state, State1}|_] = send_data(Data, State, fin), EvHandlerState = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0), case Conn of @@ -287,7 +288,7 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn, %% We ignore the active command because the stream ended. [{state, State1}|_] = send_data(Body, State, fin), EvHandlerState = EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State1, StreamRef), reply_to => ReplyTo }, EvHandlerState0), case Conn of @@ -396,15 +397,15 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec EvHandler, EvHandlerState0, Version, Status, Headers) -> In = response_io_from_headers(Method, Version, Status, Headers), IsFin = case In of head -> fin; _ -> nofin end, + RealStreamRef = stream_ref(State, StreamRef), %% @todo Figure out whether the event should trigger if the stream was cancelled. {Handlers, EvHandlerState2} = case IsAlive of false -> {undefined, EvHandlerState0}; true -> - ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), - IsFin, Status, Headers}, + ReplyTo ! {gun_response, self(), RealStreamRef, IsFin, Status, Headers}, EvHandlerState1 = EvHandler:response_headers(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, status => Status, headers => Headers @@ -413,7 +414,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec fin -> {undefined, EvHandlerState1}; nofin -> Handlers0 = maps:get(content_handlers, Opts, [gun_data_h]), - {gun_content_handler:init(ReplyTo, stream_ref(State, StreamRef), + {gun_content_handler:init(ReplyTo, RealStreamRef, Status, Headers, Handlers0), EvHandlerState1} end end, @@ -422,7 +423,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec EvHandlerState2; fin -> EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandlerState2) end, @@ -509,7 +510,7 @@ close(Reason, State=#http_state{in=body_close, close_streams(State, Tail, close_reason(Reason)), _ = send_data(<<>>, State, fin), EvHandler:response_end(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState); close(Reason, State=#http_state{streams=Streams}, _, EvHandlerState) -> @@ -601,8 +602,9 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi {undefined, _} -> Headers4; _ -> [{<<"content-length">>, integer_to_binary(iolist_size(Body))}|Headers4] end, + RealStreamRef = stream_ref(State, StreamRef), RequestEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo, function => Function, method => Method, @@ -618,7 +620,7 @@ send_request(State=#http_state{socket=Socket, transport=Transport, version=Versi EvHandlerState = case Out of head -> RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, reply_to => ReplyTo }, EvHandler:request_end(RequestEndEvent, EvHandlerState2); @@ -672,7 +674,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, ]) end, RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0), @@ -686,7 +688,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, if Length2 =:= 0, IsFin =:= fin -> RequestEndEvent = #{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState0), @@ -702,17 +704,19 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} end. -connect(State, StreamRef, ReplyTo, _, _, _, _) +connect(State, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) when is_list(StreamRef) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream is not a tunnel."}}, - State; -connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _) when Streams =/= [] -> + {State, EvHandlerState}; +connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _, _, EvHandlerState) + when Streams =/= [] -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, - State; + {State, EvHandlerState}; connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version}, - StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0) -> + StreamRef, ReplyTo, Destination=#{host := Host0}, _TunnelInfo, Headers0, InitialFlow0, + EvHandler, EvHandlerState0) -> Host = case Host0 of Tuple when is_tuple(Tuple) -> inet:ntoa(Tuple); _ -> Host0 @@ -736,12 +740,29 @@ connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version Headers2 end, Headers = transform_header_names(State, Headers3), + RealStreamRef = stream_ref(State, StreamRef), + RequestEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo, + function => connect, + method => <<"CONNECT">>, + authority => Authority, + headers => Headers + }, + EvHandlerState1 = EvHandler:request_start(RequestEvent, EvHandlerState0), Transport:send(Socket, [ cow_http:request(<<"CONNECT">>, Authority, Version, Headers) ]), + EvHandlerState2 = EvHandler:request_headers(RequestEvent, EvHandlerState1), + RequestEndEvent = #{ + stream_ref => RealStreamRef, + reply_to => ReplyTo + }, + EvHandlerState = EvHandler:request_end(RequestEndEvent, EvHandlerState2), InitialFlow = initial_flow(InitialFlow0, Opts), - new_stream(State, {connect, StreamRef, Destination}, ReplyTo, - <<"CONNECT">>, Authority, <<>>, InitialFlow). + {new_stream(State, {connect, StreamRef, Destination}, ReplyTo, + <<"CONNECT">>, Authority, <<>>, InitialFlow), + EvHandlerState}. %% We can't cancel anything, we can just stop forwarding messages to the owner. cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> @@ -749,7 +770,7 @@ cancel(State0, StreamRef, ReplyTo, EvHandler, EvHandlerState0) -> true -> State = cancel_stream(State0, StreamRef), EvHandlerState = EvHandler:cancel(#{ - stream_ref => StreamRef, + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, endpoint => local, reason => cancel @@ -876,6 +897,11 @@ end_stream(State=#http_state{streams=[_|Tail]}) -> %% Websocket upgrade. +ws_upgrade(State, StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) + when is_list(StreamRef) -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), + {badstate, "The stream is not a tunnel."}}, + {State, EvHandlerState}; ws_upgrade(State=#http_state{version='HTTP/1.0'}, StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) -> ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, @@ -986,9 +1012,10 @@ ws_handshake_end(Buffer, self() ! {OK, Socket, Buffer} end, %% Inform the user that the upgrade was successful and switch the protocol. - ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), [<<"websocket">>], Headers}, + RealStreamRef = stream_ref(State, StreamRef), + ReplyTo ! {gun_upgrade, self(), RealStreamRef, [<<"websocket">>], Headers}, {switch_protocol, {ws, #{ - stream_ref => StreamRef, + stream_ref => RealStreamRef, headers => Headers, extensions => Extensions, flow => InitialFlow, |