aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLoïc Hoguin <[email protected]>2019-10-17 14:23:02 +0200
committerLoïc Hoguin <[email protected]>2019-10-17 14:23:02 +0200
commite6b3e080e920007f27afb0d28710d4fdc2c57ebe (patch)
tree4897575c0e5596bef85eb283f8c659b099b99c89
parent8461c8383cfdc39a50a8c7769fe8130818d5f1f5 (diff)
downloadgun-e6b3e080e920007f27afb0d28710d4fdc2c57ebe.tar.gz
gun-e6b3e080e920007f27afb0d28710d4fdc2c57ebe.tar.bz2
gun-e6b3e080e920007f27afb0d28710d4fdc2c57ebe.zip
Use maps for looking up HTTP/2 streams
This should be much faster than using lists:keyfind and friends. This matters for connections that have a lot of concurrent streams.
-rw-r--r--src/gun_http2.erl143
1 files changed, 82 insertions, 61 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 79d7841..eb6f06c 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -65,7 +65,12 @@
%% Currently active HTTP/2 streams. Streams may be initiated either
%% by the client or by the server through PUSH_PROMISE frames.
- streams = [] :: [#stream{}]
+ %%
+ %% Streams can be found by ID or by Ref. The most common should be
+ %% the idea, that's why the main map has the ID as key. Then we also
+ %% have a Ref->ID index for faster lookup when we only have the Ref.
+ streams = #{} :: #{cow_http2:streamid() => #stream{}},
+ stream_refs = #{} :: #{reference() => cow_http2:streamid()}
}).
check_options(Opts) ->
@@ -162,7 +167,7 @@ parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, strea
{connection_error(State0, Error), EvHandlerState0};
%% If we both received and sent a GOAWAY frame and there are no streams
%% currently running, we can close the connection immediately.
- more when Status =/= connected, Streams =:= [] ->
+ more when Status =/= connected, Streams =:= #{} ->
{[{state, State0#http2_state{buffer=Data, status=closing}}, close], EvHandlerState0};
%% Otherwise we enter the closing state.
more when Status =:= goaway ->
@@ -329,10 +334,9 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) ->
EvHandlerState = EvHandler:response_end(ResponseEvent, EvHandlerState1),
{maybe_delete_stream(State, StreamID, remote, fin), EvHandlerState}.
-rst_stream_frame(State=#http2_state{streams=Streams0},
- StreamID, Reason, EvHandler, EvHandlerState0) ->
- case lists:keytake(StreamID, #stream.id, Streams0) of
- {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} ->
+rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) ->
+ case take_stream(State0, StreamID) of
+ {#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
ReplyTo ! {gun_error, self(), StreamRef,
{stream_error, Reason, 'Stream reset by server.'}},
EvHandlerState = EvHandler:cancel(#{
@@ -341,14 +345,14 @@ rst_stream_frame(State=#http2_state{streams=Streams0},
endpoint => remote,
reason => Reason
}, EvHandlerState0),
- {State#http2_state{streams=Streams}, EvHandlerState};
- false ->
- {State, EvHandlerState0}
+ {State, EvHandlerState};
+ error ->
+ {State0, EvHandlerState0}
end.
%% Pushed streams receive the same initial flow value as the parent stream.
push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
- status=Status, http2_machine=HTTP2Machine0, streams=Streams},
+ status=Status, http2_machine=HTTP2Machine0},
StreamID, PromisedStreamID, Headers, #{
method := Method, scheme := Scheme,
authority := Authority, path := Path},
@@ -375,7 +379,7 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport,
connected ->
NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef,
reply_to=ReplyTo, flow=InitialFlow},
- {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState};
+ {create_stream(State, NewStream), EvHandlerState};
%% We cancel the push_promise immediately when we are shutting down.
_ ->
{ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0),
@@ -406,7 +410,7 @@ update_flow(State, _ReplyTo, StreamRef, Inc) ->
true ->
{state, store_stream(State, Stream#stream{flow=Flow})}
end;
- false ->
+ error ->
[]
end.
@@ -443,9 +447,13 @@ update_window(State=#http2_state{socket=Socket, transport=Transport,
%% GOAWAY frames as the LastStreamID value may be lower than
%% the one previously received.
goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine,
- status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) ->
- Streams = goaway_streams(Streams0, LastStreamID, {goaway, Reason, 'The connection is going away.'}, []),
- State = State0#http2_state{streams=Streams},
+ status=Status, streams=Streams0, stream_refs=Refs}, {goaway, LastStreamID, Reason, _}) ->
+ {Streams, RemovedRefs} = goaway_streams(maps:to_list(Streams0), LastStreamID,
+ {goaway, Reason, 'The connection is going away.'}, [], []),
+ State = State0#http2_state{
+ streams=maps:from_list(Streams),
+ stream_refs=maps:without(RemovedRefs, Refs)
+ },
case Status of
connected ->
Transport:send(Socket, cow_http2:goaway(
@@ -457,14 +465,14 @@ goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTT
end.
%% Cancel server-initiated streams that are above LastStreamID.
-goaway_streams([], _, _, Acc) ->
- Acc;
-goaway_streams([Stream=#stream{id=StreamID}|Tail], LastStreamID, Reason, Acc)
+goaway_streams([], _, _, Acc, RefsAcc) ->
+ {Acc, RefsAcc};
+goaway_streams([{StreamID, Stream=#stream{ref=StreamRef}}|Tail], LastStreamID, Reason, Acc, RefsAcc)
when StreamID > LastStreamID, (StreamID rem 2) =:= 1 ->
close_stream(Stream, Reason),
- goaway_streams(Tail, LastStreamID, Reason, Acc);
-goaway_streams([Stream|Tail], LastStreamID, Reason, Acc) ->
- goaway_streams(Tail, LastStreamID, Reason, [Stream|Acc]).
+ goaway_streams(Tail, LastStreamID, Reason, Acc, [StreamRef|RefsAcc]);
+goaway_streams([StreamWithID|Tail], LastStreamID, Reason, Acc, RefsAcc) ->
+ goaway_streams(Tail, LastStreamID, Reason, [StreamWithID|Acc], RefsAcc).
%% We are already closing, do nothing.
closing(_, #http2_state{status=closing}, _, EvHandlerState) ->
@@ -488,19 +496,16 @@ closing(#http2_state{opts=Opts}) ->
Timeout = maps:get(closing_timeout, Opts, 15000),
{closing, Timeout}.
-close(Reason, #http2_state{streams=Streams}, _, EvHandlerState) ->
- close_streams(Streams, close_reason(Reason)),
+close(Reason0, #http2_state{streams=Streams}, _, EvHandlerState) ->
+ Reason = close_reason(Reason0),
+ _ = maps:fold(fun(_, Stream, _) ->
+ close_stream(Stream, Reason)
+ end, [], Streams),
EvHandlerState.
close_reason(closed) -> closed;
close_reason(Reason) -> {closed, Reason}.
-close_streams([], _) ->
- ok;
-close_streams([Stream|Tail], Reason) ->
- close_stream(Stream, Reason),
- close_streams(Tail, Reason).
-
%% @todo Do we want an event for this?
close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) ->
ReplyTo ! {gun_error, self(), StreamRef, Reason},
@@ -511,9 +516,8 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerSt
{State, EvHandlerState}.
headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
- http2_machine=HTTP2Machine0, streams=Streams},
- StreamRef, ReplyTo, Method, Host, Port, Path, Headers0,
- InitialFlow0, EvHandler, EvHandlerState0) ->
+ http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
+ Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) ->
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
iolist_to_binary(Method), HTTP2Machine0),
{ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0),
@@ -533,13 +537,11 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts,
EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
InitialFlow = initial_flow(InitialFlow0, Opts),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow},
- {State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]},
- EvHandlerState}.
+ {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState}.
request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
- http2_machine=HTTP2Machine0, streams=Streams},
- StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body,
- InitialFlow0, EvHandler, EvHandlerState0) ->
+ http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port,
+ Path, Headers0, Body, InitialFlow0, EvHandler, EvHandlerState0) ->
Headers1 = lists:keystore(<<"content-length">>, 1, Headers0,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
{ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream(
@@ -565,7 +567,7 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts,
EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1),
InitialFlow = initial_flow(InitialFlow0, Opts),
Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow},
- State = State0#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]},
+ State = create_stream(State0#http2_state{http2_machine=HTTP2Machine}, Stream),
case IsFin of
fin ->
RequestEndEvent = #{
@@ -617,7 +619,7 @@ data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin,
{ok, _, _} ->
maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState)
end;
- false ->
+ error ->
{error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState}
end.
@@ -677,15 +679,15 @@ send_data_frame(State=#http2_state{socket=Socket, transport=Transport,
Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)),
State#http2_state{http2_machine=HTTP2Machine}.
-reset_stream(State=#http2_state{socket=Socket, transport=Transport,
- streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) ->
+reset_stream(State0=#http2_state{socket=Socket, transport=Transport},
+ StreamID, StreamError={stream_error, Reason, _}) ->
Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
- case lists:keytake(StreamID, #stream.id, Streams0) of
- {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} ->
+ case take_stream(State0, StreamID) of
+ {#stream{ref=StreamRef, reply_to=ReplyTo}, State} ->
ReplyTo ! {gun_error, self(), StreamRef, StreamError},
- State#http2_state{streams=Streams};
- false ->
- State
+ State;
+ error ->
+ State0
end.
cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0},
@@ -702,7 +704,7 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP
}, EvHandlerState0),
{delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID),
EvHandlerState};
- false ->
+ error ->
{error_stream_not_found(State, StreamRef, ReplyTo),
EvHandlerState0}
end.
@@ -715,17 +717,19 @@ stream_info(State, StreamRef) ->
reply_to => ReplyTo,
state => running
}};
- false ->
+ error ->
{ok, undefined}
end.
-down(#http2_state{streams=Streams}) ->
- [Ref || #stream{ref=Ref} <- Streams].
+down(#http2_state{stream_refs=Refs}) ->
+ maps:keys(Refs).
connection_error(#http2_state{socket=Socket, transport=Transport,
http2_machine=HTTP2Machine, streams=Streams},
{connection_error, Reason, _}) ->
- Pids = lists:usort([ReplyTo || #stream{reply_to=ReplyTo} <- Streams]),
+ Pids = lists:usort(maps:fold(
+ fun(_, #stream{reply_to=ReplyTo}, Acc) -> [ReplyTo|Acc] end,
+ [], Streams)),
_ = [Pid ! {gun_error, self(), Reason} || Pid <- Pids],
Transport:send(Socket, cow_http2:goaway(
cow_http2_machine:get_last_streamid(HTTP2Machine),
@@ -745,17 +749,31 @@ error_stream_not_found(State, StreamRef, ReplyTo) ->
State.
%% Streams.
-%% @todo probably change order of args and have state first? Yes.
get_stream_by_id(#http2_state{streams=Streams}, StreamID) ->
- lists:keyfind(StreamID, #stream.id, Streams).
+ maps:get(StreamID, Streams).
+
+get_stream_by_ref(#http2_state{streams=Streams, stream_refs=Refs}, StreamRef) ->
+ case maps:get(StreamRef, Refs, error) of
+ error -> error;
+ StreamID -> maps:get(StreamID, Streams)
+ end.
-get_stream_by_ref(#http2_state{streams=Streams}, StreamRef) ->
- lists:keyfind(StreamRef, #stream.ref, Streams).
+create_stream(State=#http2_state{streams=Streams, stream_refs=Refs},
+ Stream=#stream{id=StreamID, ref=StreamRef}) ->
+ State#http2_state{
+ streams=Streams#{StreamID => Stream},
+ stream_refs=Refs#{StreamRef => StreamID}
+ }.
-store_stream(State=#http2_state{streams=Streams0}, Stream=#stream{id=StreamID}) ->
- Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream),
- State#http2_state{streams=Streams}.
+store_stream(State=#http2_state{streams=Streams}, Stream=#stream{id=StreamID}) ->
+ State#http2_state{streams=Streams#{StreamID => Stream}}.
+
+take_stream(State=#http2_state{streams=Streams0}, StreamID) ->
+ case maps:take(StreamID, Streams0) of
+ {Stream, Streams} -> {Stream, State#http2_state{streams=Streams}};
+ error -> error
+ end.
maybe_delete_stream(State=#http2_state{http2_machine=HTTP2Machine}, StreamID, local, fin) ->
case cow_http2_machine:get_stream_remote_state(StreamID, HTTP2Machine) of
@@ -772,6 +790,9 @@ maybe_delete_stream(State=#http2_state{http2_machine=HTTP2Machine}, StreamID, re
maybe_delete_stream(State, _, _, _) ->
State.
-delete_stream(State=#http2_state{streams=Streams}, StreamID) ->
- Streams2 = lists:keydelete(StreamID, #stream.id, Streams),
- State#http2_state{streams=Streams2}.
+delete_stream(State=#http2_state{streams=Streams, stream_refs=Refs}, StreamID) ->
+ #{StreamID := #stream{ref=StreamRef}} = Streams,
+ State#http2_state{
+ streams=maps:remove(StreamID, Streams),
+ stream_refs=maps:remove(StreamRef, Refs)
+ }.