From 8dd39857bc4b7312515d7ffdb89c5130a607f4ca Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 9 Dec 2019 15:22:25 +0100 Subject: WIP Use a map for streams in cow_http2_machine Still has some maps:from_list/to_list for now. --- src/cow_http2_machine.erl | 46 +++++++++++++++++++++------------------------- 1 file changed, 21 insertions(+), 25 deletions(-) diff --git a/src/cow_http2_machine.erl b/src/cow_http2_machine.erl index 30654d0..405e2d2 100644 --- a/src/cow_http2_machine.erl +++ b/src/cow_http2_machine.erl @@ -146,7 +146,7 @@ %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. - streams = [] :: [stream()], + streams = #{} :: #{cow_http2:streamid() => stream()}, %% HTTP/2 streams that have recently been reset locally. %% We are expected to keep receiving additional frames after @@ -519,7 +519,7 @@ headers_enforce_concurrency_limit(Frame=#headers{id=StreamID}, MaxConcurrentStreams = maps:get(max_concurrent_streams, LocalSettings, infinity), %% Using < is correct because this new stream is not included %% in the Streams variable yet and so we'll end up with +1 stream. - case length(Streams) < MaxConcurrentStreams of + case map_size(Streams) < MaxConcurrentStreams of true -> headers_pseudo_headers(Frame, State, Type, Stream, Headers); false -> @@ -803,7 +803,7 @@ rst_stream_frame({rst_stream, StreamID, _}, State=#http2_machine{mode=Mode, State}; rst_stream_frame({rst_stream, StreamID, Reason}, State=#http2_machine{ streams=Streams0, remote_lingering_streams=Lingering0}) -> - Streams = lists:keydelete(StreamID, #stream.id, Streams0), + Streams = maps:remove(StreamID, Streams0), %% We only keep up to 10 streams in this state. @todo Make it configurable? Lingering = [StreamID|lists:sublist(Lingering0, 10 - 1)], {ok, {rst_stream, StreamID, Reason}, @@ -839,9 +839,9 @@ settings_frame(_F, State) -> %% the local stream windows for all active streams and perhaps %% resume sending data. streams_update_local_window(State=#http2_machine{streams=Streams0}, Increment) -> - Streams = [ + Streams = maps:map(fun(_, S=#stream{local_window=StreamWindow}) -> S#stream{local_window=StreamWindow + Increment} - || S=#stream{local_window=StreamWindow} <- Streams0], + end, Streams0), State#http2_machine{streams=Streams}. %% Ack for a previously sent SETTINGS frame. @@ -869,9 +869,9 @@ settings_ack_frame(State0=#http2_machine{settings_timer=TRef, %% When we receive an ack to a SETTINGS frame we sent we need to update %% the remote stream windows for all active streams. streams_update_remote_window(State=#http2_machine{streams=Streams0}, Increment) -> - Streams = [ + Streams = maps:map(fun(_, S=#stream{remote_window=StreamWindow}) -> S#stream{remote_window=StreamWindow + Increment} - || S=#stream{remote_window=StreamWindow} <- Streams0], + end, Streams0), State#http2_machine{streams=Streams}. %% PUSH_PROMISE frame. @@ -1213,11 +1213,11 @@ send_or_queue_data(StreamID, State0=#http2_machine{opts=Opts, local_window=ConnW %% all streams and send what we can until either everything is %% sent or we run out of space in the window. send_data(State0=#http2_machine{streams=Streams0}) -> - case send_data_for_all_streams(Streams0, State0, [], []) of + case send_data_for_all_streams(maps:to_list(Streams0), State0, [], []) of {ok, Streams, State, []} -> - {ok, State#http2_machine{streams=Streams}}; + {ok, State#http2_machine{streams=maps:from_list(Streams)}}; {ok, Streams, State, Send} -> - {send, Send, State#http2_machine{streams=Streams}} + {send, Send, State#http2_machine{streams=maps:from_list(Streams)}} end. send_data_for_all_streams([], State, Acc, Send) -> @@ -1227,16 +1227,16 @@ send_data_for_all_streams(Tail, State=#http2_machine{local_window=ConnWindow}, A when ConnWindow =< 0 -> {ok, lists:reverse(Acc, Tail), State, Send}; %% We rely on send_data_for_one_stream/3 to do all the necessary checks about the stream. -send_data_for_all_streams([Stream0|Tail], State0, Acc, Send) -> +send_data_for_all_streams([{StreamID, Stream0}|Tail], State0, Acc, Send) -> case send_data_for_one_stream(Stream0, State0, []) of {ok, Stream, State, []} -> - send_data_for_all_streams(Tail, State, [Stream|Acc], Send); + send_data_for_all_streams(Tail, State, [{StreamID, Stream}|Acc], Send); %% We need to remove the stream here because we do not use stream_store/2. {ok, #stream{id=StreamID, local=fin, remote=fin}, State, SendData} -> send_data_for_all_streams(Tail, State, Acc, [{StreamID, fin, SendData}|Send]); {ok, Stream=#stream{id=StreamID, local=IsFin}, State, SendData} -> - send_data_for_all_streams(Tail, State, [Stream|Acc], + send_data_for_all_streams(Tail, State, [{StreamID, Stream}|Acc], [{StreamID, IsFin, SendData}|Send]) end. @@ -1460,10 +1460,10 @@ update_window(StreamID, Size, State) -spec reset_stream(cow_http2:streamid(), State) -> {ok, State} | {error, not_found} when State::http2_machine(). reset_stream(StreamID, State=#http2_machine{streams=Streams0}) -> - case lists:keytake(StreamID, #stream.id, Streams0) of - {value, _, Streams} -> + case maps:take(StreamID, #stream.id, Streams0) of + {_, Streams} -> {ok, stream_linger(StreamID, State#http2_machine{streams=Streams})}; - false -> + error -> {error, not_found} end. @@ -1471,7 +1471,7 @@ reset_stream(StreamID, State=#http2_machine{streams=Streams0}) -> -spec get_connection_local_buffer_size(http2_machine()) -> non_neg_integer(). get_connection_local_buffer_size(#http2_machine{streams=Streams}) -> - lists:foldl(fun(#stream{local_buffer_size=Size}, Acc) -> + maps:fold(fun(_, #stream{local_buffer_size=Size}, Acc) -> Acc + Size end, 0, Streams). @@ -1564,19 +1564,15 @@ is_lingering_stream(StreamID, #http2_machine{ %% Stream-related functions. stream_get(StreamID, #http2_machine{streams=Streams}) -> - case lists:keyfind(StreamID, #stream.id, Streams) of - false -> undefined; - Stream -> Stream - end. + maps:get(StreamID, Streams, undefined). stream_store(#stream{id=StreamID, local=fin, remote=fin}, State=#http2_machine{streams=Streams0}) -> - Streams = lists:keydelete(StreamID, #stream.id, Streams0), + Streams = maps:remove(StreamID, Streams0), State#http2_machine{streams=Streams}; stream_store(Stream=#stream{id=StreamID}, - State=#http2_machine{streams=Streams0}) -> - Streams = lists:keystore(StreamID, #stream.id, Streams0, Stream), - State#http2_machine{streams=Streams}. + State=#http2_machine{streams=Streams}) -> + State#http2_machine{streams=Streams#{StreamID => Stream}}. %% @todo Don't send an RST_STREAM if one was already sent. stream_reset(StreamID, State, Reason, HumanReadable) -> -- cgit v1.2.3