diff options
Diffstat (limited to 'src/gun_http.erl')
-rw-r--r-- | src/gun_http.erl | 76 |
1 files changed, 45 insertions, 31 deletions
diff --git a/src/gun_http.erl b/src/gun_http.erl index aae47cf..982df82 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -72,6 +72,11 @@ version = 'HTTP/1.1' :: cow_http:version(), connection = keepalive :: keepalive | close, buffer = <<>> :: binary(), + + %% Base stream ref, defined when the protocol runs + %% inside an HTTP/2 CONNECT stream. + base_stream_ref = undefined :: undefined | reference() | [reference()], + streams = [] :: [#stream{}], in = head :: io(), in_state = {0, 0} :: {non_neg_integer(), non_neg_integer()}, @@ -114,9 +119,10 @@ has_keepalive() -> true. default_keepalive() -> infinity. init(_ReplyTo, Socket, Transport, Opts) -> + BaseStreamRef = maps:get(stream_ref, Opts, undefined), Version = maps:get(version, Opts, 'HTTP/1.1'), {connected, #http_state{socket=Socket, transport=Transport, - opts=Opts, version=Version}}. + opts=Opts, version=Version, base_stream_ref=BaseStreamRef}}. switch_transport(Transport, Socket, State) -> State#http_state{socket=Socket, transport=Transport}. @@ -149,7 +155,7 @@ handle(Data, State=#http_state{in=head, buffer=Buffer, EvHandlerState = case Buffer of <<>> -> EvHandler:response_start(#{ - stream_ref => stream_ref(StreamRef), + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo }, EvHandlerState0); _ -> @@ -237,7 +243,7 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn, {_, _} -> {Trailers, Rest} = cow_http:parse_headers(Data2), %% @todo We probably want to pass this to gun_content_handler? - ReplyTo ! {gun_trailers, self(), stream_ref(StreamRef), Trailers}, + ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers}, ResponseEvent = #{ stream_ref => StreamRef, reply_to => ReplyTo @@ -310,7 +316,7 @@ handle_connect(Rest, State=#http_state{ %% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup. _ = case Stream of #stream{is_alive=false} -> ok; - _ -> ReplyTo ! {gun_response, self(), StreamRef, fin, Status, Headers} + _ -> ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), fin, Status, Headers} end, %% @todo Figure out whether the event should trigger if the stream was cancelled. EvHandlerState1 = EvHandler:response_headers(#{ @@ -351,7 +357,7 @@ handle_inform(Rest, State=#http_state{ streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]}, EvHandler, EvHandlerState0, Version, Status, Headers) -> EvHandlerState = EvHandler:response_inform(#{ - stream_ref => stream_ref(StreamRef), + stream_ref => stream_ref(State, StreamRef), reply_to => ReplyTo, status => Status, headers => Headers @@ -367,16 +373,16 @@ handle_inform(Rest, State=#http_state{ %% @todo We shouldn't ignore Rest. {_, Upgrade0} = lists:keyfind(<<"upgrade">>, 1, Headers), Upgrade = cow_http_hd:parse_upgrade(Upgrade0), - ReplyTo ! {gun_upgrade, self(), StreamRef, Upgrade, Headers}, + ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), Upgrade, Headers}, {handle_ret({switch_protocol, raw, ReplyTo}, State), EvHandlerState0} catch _:_ -> %% When the Upgrade header is missing or invalid we treat %% the response as any other informational response. - ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers}, + ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}, handle(Rest, State, EvHandler, EvHandlerState) end; _ -> - ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers}, + ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers}, handle(Rest, State, EvHandler, EvHandlerState) end. @@ -390,7 +396,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec false -> {undefined, EvHandlerState0}; true -> - ReplyTo ! {gun_response, self(), stream_ref(StreamRef), + ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), IsFin, Status, Headers}, EvHandlerState1 = EvHandler:response_headers(#{ stream_ref => StreamRef, @@ -402,7 +408,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec fin -> {undefined, EvHandlerState1}; nofin -> Handlers0 = maps:get(content_handlers, Opts, [gun_data_h]), - {gun_content_handler:init(ReplyTo, stream_ref(StreamRef), + {gun_content_handler:init(ReplyTo, stream_ref(State, StreamRef), Status, Headers, Handlers0), EvHandlerState1} end end, @@ -437,10 +443,6 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec EvHandler, EvHandlerState) end. -stream_ref({connect, StreamRef, _}) -> StreamRef; -stream_ref(#websocket{ref=StreamRef}) -> StreamRef; -stream_ref(StreamRef) -> StreamRef. - %% The state must be first in order to retrieve it when the stream ended. send_data(<<>>, State, nofin) -> [{state, State}, {active, true}]; @@ -485,7 +487,7 @@ closing(_, #http_state{streams=[]}, _, EvHandlerState) -> %% Otherwise we set connection: close (even if the header was not sent) %% and close any pipelined streams, only keeping the active stream. closing(Reason, State=#http_state{streams=[LastStream|Tail]}, _, EvHandlerState) -> - close_streams(Tail, {closing, Reason}), + close_streams(State, Tail, {closing, Reason}), {[ {state, State#http_state{connection=close, streams=[LastStream]}}, closing(State) @@ -499,27 +501,27 @@ close(Reason, State=#http_state{in=body_close, streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]}, EvHandler, EvHandlerState) -> %% We may have more than one stream in case we somehow close abruptly. - close_streams(Tail, close_reason(Reason)), + close_streams(State, Tail, close_reason(Reason)), _ = send_data(<<>>, State, fin), EvHandler:response_end(#{ stream_ref => StreamRef, reply_to => ReplyTo }, EvHandlerState); -close(Reason, #http_state{streams=Streams}, _, EvHandlerState) -> - close_streams(Streams, close_reason(Reason)), +close(Reason, State=#http_state{streams=Streams}, _, EvHandlerState) -> + close_streams(State, Streams, close_reason(Reason)), EvHandlerState. close_reason(closed) -> closed; close_reason(Reason) -> {closed, Reason}. %% @todo Do we want an event for this? -close_streams([], _) -> +close_streams(_, [], _) -> ok; -close_streams([#stream{is_alive=false}|Tail], Reason) -> - close_streams(Tail, Reason); -close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) -> - ReplyTo ! {gun_error, self(), StreamRef, Reason}, - close_streams(Tail, Reason). +close_streams(State, [#stream{is_alive=false}|Tail], Reason) -> + close_streams(State, Tail, Reason); +close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) -> + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason}, + close_streams(State, Tail, Reason). %% We don't send a keep-alive when a CONNECT request was initiated. keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) -> @@ -684,7 +686,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, end. connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _) when Streams =/= [] -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "CONNECT can only be used with HTTP/1.1 when no other streams are active."}}, State; connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version}, @@ -758,12 +760,12 @@ down(#http_state{streams=Streams}) -> end || #stream{ref=Ref} <- Streams]. error_stream_closed(State, StreamRef, ReplyTo) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream has already been closed."}}, State. error_stream_not_found(State, StreamRef, ReplyTo) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "The stream cannot be found."}}, State. @@ -816,6 +818,18 @@ response_io_from_headers(_, Version, _Status, Headers) -> %% Streams. +stream_ref(#http_state{base_stream_ref=undefined}, StreamRef) -> + stream_ref(StreamRef); +stream_ref(#http_state{base_stream_ref=BaseStreamRef}, StreamRef) + when is_reference(BaseStreamRef) -> + [BaseStreamRef, stream_ref(StreamRef)]; +stream_ref(#http_state{base_stream_ref=BaseStreamRef}, StreamRef) -> + BaseStreamRef ++ [stream_ref(StreamRef)]. + +stream_ref({connect, StreamRef, _}) -> StreamRef; +stream_ref(#websocket{ref=StreamRef}) -> StreamRef; +stream_ref(StreamRef) -> StreamRef. + new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method, Authority, Path, InitialFlow) -> State#http_state{streams=Streams @@ -840,9 +854,9 @@ end_stream(State=#http_state{streams=[_|Tail]}) -> %% Websocket upgrade. -ws_upgrade(#http_state{version='HTTP/1.0'}, +ws_upgrade(State=#http_state{version='HTTP/1.0'}, StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) -> - ReplyTo ! {gun_error, self(), StreamRef, {badstate, + ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate, "Websocket cannot be used over an HTTP/1.0 connection."}}, {[], EvHandlerState}; ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo, @@ -939,7 +953,7 @@ ws_handshake_protocols(Buffer, State, Ws=#websocket{opts=Opts}, Headers, Extensi %% We know that the most recent stream is the Websocket one. ws_handshake_end(Buffer, - #http_state{socket=Socket, transport=Transport, streams=[#stream{flow=InitialFlow}|_]}, + State=#http_state{socket=Socket, transport=Transport, streams=[#stream{flow=InitialFlow}|_]}, #websocket{ref=StreamRef, reply_to=ReplyTo, opts=Opts}, Headers, Extensions, Handler) -> %% Send ourselves the remaining buffer, if any. _ = case Buffer of @@ -950,7 +964,7 @@ ws_handshake_end(Buffer, self() ! {OK, Socket, Buffer} end, %% Inform the user that the upgrade was successful and switch the protocol. - ReplyTo ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers}, + ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), [<<"websocket">>], Headers}, {switch_protocol, {ws, #{ stream_ref => StreamRef, headers => Headers, |