aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/gun_http2.erl61
-rw-r--r--test/rfc7540_SUITE.erl30
2 files changed, 68 insertions, 23 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 0774e46..d8853e7 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -1004,7 +1004,43 @@ headers(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
{[], CookieStore0, EvHandlerState0}
end.
-request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
+request(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, Method, Host, Port,
+ Path, Headers, Body, InitialFlow, CookieStore, EvHandler, EvHandlerState)
+ when is_reference(StreamRef) ->
+ case cow_http2_machine:is_remote_concurrency_limit_reached(HTTP2Machine) of
+ true ->
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef),
+ {stream_error, too_many_streams,
+ 'Maximum concurrency limit has been reached.'}}),
+ {[], CookieStore, EvHandlerState};
+ false ->
+ request1(State, StreamRef, ReplyTo, Method, Host, Port,
+ Path, Headers, Body, InitialFlow, CookieStore,
+ EvHandler, EvHandlerState)
+ end;
+%% Tunneled request.
+request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
+ Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
+ case get_stream_by_ref(State, StreamRef) of
+ %% @todo We should send an error to the user if the stream isn't ready.
+ Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
+ origin_host := OriginHost, origin_port := OriginPort}}} ->
+ {Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef,
+ ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
+ InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
+ {ResCommands, EvHandlerState} = tunnel_commands(Commands,
+ Stream, State, EvHandler, EvHandlerState1),
+ {ResCommands, CookieStore, EvHandlerState};
+ #stream{tunnel=undefined} ->
+ gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
+ "The stream is not a tunnel."}}),
+ {[], CookieStore0, EvHandlerState0};
+ error ->
+ error_stream_not_found(State, StreamRef, ReplyTo),
+ {[], CookieStore0, EvHandlerState0}
+ end.
+
+request1(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
Path, Headers0, Body, InitialFlow0, CookieStore0, EvHandler, EvHandlerState0)
when is_reference(StreamRef) ->
@@ -1019,7 +1055,7 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
RequestEvent = #{
stream_ref => RealStreamRef,
reply_to => ReplyTo,
- function => ?FUNCTION_NAME,
+ function => request,
method => Method,
authority => Authority,
path => Path,
@@ -1053,27 +1089,6 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
end;
Error={error, _} ->
{Error, CookieStore, EvHandlerState1}
- end;
-%% Tunneled request.
-request(State, RealStreamRef=[StreamRef|_], ReplyTo, Method, _Host, _Port,
- Path, Headers, Body, InitialFlow, CookieStore0, EvHandler, EvHandlerState0) ->
- case get_stream_by_ref(State, StreamRef) of
- %% @todo We should send an error to the user if the stream isn't ready.
- Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0, info=#{
- origin_host := OriginHost, origin_port := OriginPort}}} ->
- {Commands, CookieStore, EvHandlerState1} = Proto:request(ProtoState0, RealStreamRef,
- ReplyTo, Method, OriginHost, OriginPort, Path, Headers, Body,
- InitialFlow, CookieStore0, EvHandler, EvHandlerState0),
- {ResCommands, EvHandlerState} = tunnel_commands(Commands,
- Stream, State, EvHandler, EvHandlerState1),
- {ResCommands, CookieStore, EvHandlerState};
- #stream{tunnel=undefined} ->
- gun:reply(ReplyTo, {gun_error, self(), stream_ref(State, StreamRef), {badstate,
- "The stream is not a tunnel."}}),
- {[], CookieStore0, EvHandlerState0};
- error ->
- error_stream_not_found(State, StreamRef, ReplyTo),
- {[], CookieStore0, EvHandlerState0}
end.
initial_flow(infinity, #{flow := InitialFlow}) -> InitialFlow;
diff --git a/test/rfc7540_SUITE.erl b/test/rfc7540_SUITE.erl
index 6ed3314..6ac06d5 100644
--- a/test/rfc7540_SUITE.erl
+++ b/test/rfc7540_SUITE.erl
@@ -377,6 +377,36 @@ lingering_data_counts_toward_connection_window(_) ->
timer:sleep(300),
gun:close(ConnPid).
+max_concurrent_streams(_) ->
+ doc("The SETTINGS_MAX_CONCURRENT_STREAMS setting can be used to "
+ "restrict the number of concurrent streams. (RFC7540 5.1.2, RFC7540 6.5.2)"),
+ Ref = make_ref(),
+ Routes = [{'_', [{"/delayed", delayed_hello_h, 1000}]}],
+ ProtoOpts = #{
+ env => #{dispatch => cowboy_router:compile(Routes)},
+ tcp => #{protocols => [http2]},
+ max_concurrent_streams => 1
+ },
+ [{ref, _}, {port, Port}] = gun_test:init_cowboy_tcp(Ref, ProtoOpts, []),
+ try
+ {ok, ConnPid} = gun:open("localhost", Port, #{
+ protocols => [http2],
+ http2_opts => #{notify_settings_changed => true}
+ }),
+ {ok, http2} = gun:await_up(ConnPid),
+ %% Wait for SETTINGS_MAX_CONCURRENT_STREAMS to be received by Gun.
+ receive {gun_notify, ConnPid, settings_changed, _} -> ok after 5000 -> error(timeout) end,
+ StreamRef1 = gun:get(ConnPid, "/delayed"),
+ StreamRef2 = gun:get(ConnPid, "/delayed"),
+ {error, {stream_error, Reason}} = gun:await(ConnPid, StreamRef2),
+ {stream_error, too_many_streams, _Human} = Reason,
+ {response, nofin, 200, _} = gun:await(ConnPid, StreamRef1),
+ {ok, _} = gun:await_body(ConnPid, StreamRef1),
+ gun:close(ConnPid)
+ after
+ cowboy:stop_listener(Ref)
+ end.
+
headers_priority_flag(_) ->
doc("HEADERS frames may include a PRIORITY flag indicating "
"that stream dependency information is attached. (RFC7540 6.2)"),