aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_http.erl
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2020-07-23 17:22:21 +0200
committerLoïc Hoguin <[email protected]>2020-09-21 15:51:56 +0200
commit323bd167fd33f322ab8747e398e54a8a36f5b753 (patch)
tree8105c23a63f76eb204ea32f4b29e933a18976b5e /src/gun_http.erl
parentf8272a1e8d5fbf3b8021479d142a2002846fe062 (diff)
downloadgun-323bd167fd33f322ab8747e398e54a8a36f5b753.tar.gz
gun-323bd167fd33f322ab8747e398e54a8a36f5b753.tar.bz2
gun-323bd167fd33f322ab8747e398e54a8a36f5b753.zip
Add the base_stream_ref to gun_http/gun_http2
Diffstat (limited to 'src/gun_http.erl')
-rw-r--r--src/gun_http.erl76
1 files changed, 45 insertions, 31 deletions
diff --git a/src/gun_http.erl b/src/gun_http.erl
index aae47cf..982df82 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -72,6 +72,11 @@
version = 'HTTP/1.1' :: cow_http:version(),
connection = keepalive :: keepalive | close,
buffer = <<>> :: binary(),
+
+ %% Base stream ref, defined when the protocol runs
+ %% inside an HTTP/2 CONNECT stream.
+ base_stream_ref = undefined :: undefined | reference() | [reference()],
+
streams = [] :: [#stream{}],
in = head :: io(),
in_state = {0, 0} :: {non_neg_integer(), non_neg_integer()},
@@ -114,9 +119,10 @@ has_keepalive() -> true.
default_keepalive() -> infinity.
init(_ReplyTo, Socket, Transport, Opts) ->
+ BaseStreamRef = maps:get(stream_ref, Opts, undefined),
Version = maps:get(version, Opts, 'HTTP/1.1'),
{connected, #http_state{socket=Socket, transport=Transport,
- opts=Opts, version=Version}}.
+ opts=Opts, version=Version, base_stream_ref=BaseStreamRef}}.
switch_transport(Transport, Socket, State) ->
State#http_state{socket=Socket, transport=Transport}.
@@ -149,7 +155,7 @@ handle(Data, State=#http_state{in=head, buffer=Buffer,
EvHandlerState = case Buffer of
<<>> ->
EvHandler:response_start(#{
- stream_ref => stream_ref(StreamRef),
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo
}, EvHandlerState0);
_ ->
@@ -237,7 +243,7 @@ handle(Data, State=#http_state{in=body_trailer, buffer=Buffer, connection=Conn,
{_, _} ->
{Trailers, Rest} = cow_http:parse_headers(Data2),
%% @todo We probably want to pass this to gun_content_handler?
- ReplyTo ! {gun_trailers, self(), stream_ref(StreamRef), Trailers},
+ ReplyTo ! {gun_trailers, self(), stream_ref(State, StreamRef), Trailers},
ResponseEvent = #{
stream_ref => StreamRef,
reply_to => ReplyTo
@@ -310,7 +316,7 @@ handle_connect(Rest, State=#http_state{
%% @todo If the stream is cancelled we probably shouldn't finish the CONNECT setup.
_ = case Stream of
#stream{is_alive=false} -> ok;
- _ -> ReplyTo ! {gun_response, self(), StreamRef, fin, Status, Headers}
+ _ -> ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef), fin, Status, Headers}
end,
%% @todo Figure out whether the event should trigger if the stream was cancelled.
EvHandlerState1 = EvHandler:response_headers(#{
@@ -351,7 +357,7 @@ handle_inform(Rest, State=#http_state{
streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|_]},
EvHandler, EvHandlerState0, Version, Status, Headers) ->
EvHandlerState = EvHandler:response_inform(#{
- stream_ref => stream_ref(StreamRef),
+ stream_ref => stream_ref(State, StreamRef),
reply_to => ReplyTo,
status => Status,
headers => Headers
@@ -367,16 +373,16 @@ handle_inform(Rest, State=#http_state{
%% @todo We shouldn't ignore Rest.
{_, Upgrade0} = lists:keyfind(<<"upgrade">>, 1, Headers),
Upgrade = cow_http_hd:parse_upgrade(Upgrade0),
- ReplyTo ! {gun_upgrade, self(), StreamRef, Upgrade, Headers},
+ ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), Upgrade, Headers},
{handle_ret({switch_protocol, raw, ReplyTo}, State), EvHandlerState0}
catch _:_ ->
%% When the Upgrade header is missing or invalid we treat
%% the response as any other informational response.
- ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers},
+ ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers},
handle(Rest, State, EvHandler, EvHandlerState)
end;
_ ->
- ReplyTo ! {gun_inform, self(), stream_ref(StreamRef), Status, Headers},
+ ReplyTo ! {gun_inform, self(), stream_ref(State, StreamRef), Status, Headers},
handle(Rest, State, EvHandler, EvHandlerState)
end.
@@ -390,7 +396,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
false ->
{undefined, EvHandlerState0};
true ->
- ReplyTo ! {gun_response, self(), stream_ref(StreamRef),
+ ReplyTo ! {gun_response, self(), stream_ref(State, StreamRef),
IsFin, Status, Headers},
EvHandlerState1 = EvHandler:response_headers(#{
stream_ref => StreamRef,
@@ -402,7 +408,7 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
fin -> {undefined, EvHandlerState1};
nofin ->
Handlers0 = maps:get(content_handlers, Opts, [gun_data_h]),
- {gun_content_handler:init(ReplyTo, stream_ref(StreamRef),
+ {gun_content_handler:init(ReplyTo, stream_ref(State, StreamRef),
Status, Headers, Handlers0), EvHandlerState1}
end
end,
@@ -437,10 +443,6 @@ handle_response(Rest, State=#http_state{version=ClientVersion, opts=Opts, connec
EvHandler, EvHandlerState)
end.
-stream_ref({connect, StreamRef, _}) -> StreamRef;
-stream_ref(#websocket{ref=StreamRef}) -> StreamRef;
-stream_ref(StreamRef) -> StreamRef.
-
%% The state must be first in order to retrieve it when the stream ended.
send_data(<<>>, State, nofin) ->
[{state, State}, {active, true}];
@@ -485,7 +487,7 @@ closing(_, #http_state{streams=[]}, _, EvHandlerState) ->
%% Otherwise we set connection: close (even if the header was not sent)
%% and close any pipelined streams, only keeping the active stream.
closing(Reason, State=#http_state{streams=[LastStream|Tail]}, _, EvHandlerState) ->
- close_streams(Tail, {closing, Reason}),
+ close_streams(State, Tail, {closing, Reason}),
{[
{state, State#http_state{connection=close, streams=[LastStream]}},
closing(State)
@@ -499,27 +501,27 @@ close(Reason, State=#http_state{in=body_close,
streams=[#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]},
EvHandler, EvHandlerState) ->
%% We may have more than one stream in case we somehow close abruptly.
- close_streams(Tail, close_reason(Reason)),
+ close_streams(State, Tail, close_reason(Reason)),
_ = send_data(<<>>, State, fin),
EvHandler:response_end(#{
stream_ref => StreamRef,
reply_to => ReplyTo
}, EvHandlerState);
-close(Reason, #http_state{streams=Streams}, _, EvHandlerState) ->
- close_streams(Streams, close_reason(Reason)),
+close(Reason, State=#http_state{streams=Streams}, _, EvHandlerState) ->
+ close_streams(State, Streams, close_reason(Reason)),
EvHandlerState.
close_reason(closed) -> closed;
close_reason(Reason) -> {closed, Reason}.
%% @todo Do we want an event for this?
-close_streams([], _) ->
+close_streams(_, [], _) ->
ok;
-close_streams([#stream{is_alive=false}|Tail], Reason) ->
- close_streams(Tail, Reason);
-close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
- ReplyTo ! {gun_error, self(), StreamRef, Reason},
- close_streams(Tail, Reason).
+close_streams(State, [#stream{is_alive=false}|Tail], Reason) ->
+ close_streams(State, Tail, Reason);
+close_streams(State, [#stream{ref=StreamRef, reply_to=ReplyTo}|Tail], Reason) ->
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), Reason},
+ close_streams(State, Tail, Reason).
%% We don't send a keep-alive when a CONNECT request was initiated.
keepalive(State=#http_state{streams=[#stream{ref={connect, _, _}}]}, _, EvHandlerState) ->
@@ -684,7 +686,7 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
end.
connect(State=#http_state{streams=Streams}, StreamRef, ReplyTo, _, _, _, _) when Streams =/= [] ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"CONNECT can only be used with HTTP/1.1 when no other streams are active."}},
State;
connect(State=#http_state{socket=Socket, transport=Transport, opts=Opts, version=Version},
@@ -758,12 +760,12 @@ down(#http_state{streams=Streams}) ->
end || #stream{ref=Ref} <- Streams].
error_stream_closed(State, StreamRef, ReplyTo) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream has already been closed."}},
State.
error_stream_not_found(State, StreamRef, ReplyTo) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"The stream cannot be found."}},
State.
@@ -816,6 +818,18 @@ response_io_from_headers(_, Version, _Status, Headers) ->
%% Streams.
+stream_ref(#http_state{base_stream_ref=undefined}, StreamRef) ->
+ stream_ref(StreamRef);
+stream_ref(#http_state{base_stream_ref=BaseStreamRef}, StreamRef)
+ when is_reference(BaseStreamRef) ->
+ [BaseStreamRef, stream_ref(StreamRef)];
+stream_ref(#http_state{base_stream_ref=BaseStreamRef}, StreamRef) ->
+ BaseStreamRef ++ [stream_ref(StreamRef)].
+
+stream_ref({connect, StreamRef, _}) -> StreamRef;
+stream_ref(#websocket{ref=StreamRef}) -> StreamRef;
+stream_ref(StreamRef) -> StreamRef.
+
new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo,
Method, Authority, Path, InitialFlow) ->
State#http_state{streams=Streams
@@ -840,9 +854,9 @@ end_stream(State=#http_state{streams=[_|Tail]}) ->
%% Websocket upgrade.
-ws_upgrade(#http_state{version='HTTP/1.0'},
+ws_upgrade(State=#http_state{version='HTTP/1.0'},
StreamRef, ReplyTo, _, _, _, _, _, _, EvHandlerState) ->
- ReplyTo ! {gun_error, self(), StreamRef, {badstate,
+ ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef), {badstate,
"Websocket cannot be used over an HTTP/1.0 connection."}},
{[], EvHandlerState};
ws_upgrade(State=#http_state{out=head}, StreamRef, ReplyTo,
@@ -939,7 +953,7 @@ ws_handshake_protocols(Buffer, State, Ws=#websocket{opts=Opts}, Headers, Extensi
%% We know that the most recent stream is the Websocket one.
ws_handshake_end(Buffer,
- #http_state{socket=Socket, transport=Transport, streams=[#stream{flow=InitialFlow}|_]},
+ State=#http_state{socket=Socket, transport=Transport, streams=[#stream{flow=InitialFlow}|_]},
#websocket{ref=StreamRef, reply_to=ReplyTo, opts=Opts}, Headers, Extensions, Handler) ->
%% Send ourselves the remaining buffer, if any.
_ = case Buffer of
@@ -950,7 +964,7 @@ ws_handshake_end(Buffer,
self() ! {OK, Socket, Buffer}
end,
%% Inform the user that the upgrade was successful and switch the protocol.
- ReplyTo ! {gun_upgrade, self(), StreamRef, [<<"websocket">>], Headers},
+ ReplyTo ! {gun_upgrade, self(), stream_ref(State, StreamRef), [<<"websocket">>], Headers},
{switch_protocol, {ws, #{
stream_ref => StreamRef,
headers => Headers,