diff options
Diffstat (limited to 'src/gun_http2.erl')
-rw-r--r-- | src/gun_http2.erl | 143 |
1 files changed, 82 insertions, 61 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 79d7841..eb6f06c 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -65,7 +65,12 @@ %% Currently active HTTP/2 streams. Streams may be initiated either %% by the client or by the server through PUSH_PROMISE frames. - streams = [] :: [#stream{}] + %% + %% Streams can be found by ID or by Ref. The most common should be + %% the idea, that's why the main map has the ID as key. Then we also + %% have a Ref->ID index for faster lookup when we only have the Ref. + streams = #{} :: #{cow_http2:streamid() => #stream{}}, + stream_refs = #{} :: #{reference() => cow_http2:streamid()} }). check_options(Opts) -> @@ -162,7 +167,7 @@ parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, strea {connection_error(State0, Error), EvHandlerState0}; %% If we both received and sent a GOAWAY frame and there are no streams %% currently running, we can close the connection immediately. - more when Status =/= connected, Streams =:= [] -> + more when Status =/= connected, Streams =:= #{} -> {[{state, State0#http2_state{buffer=Data, status=closing}}, close], EvHandlerState0}; %% Otherwise we enter the closing state. more when Status =:= goaway -> @@ -329,10 +334,9 @@ trailers_frame(State, StreamID, Trailers, EvHandler, EvHandlerState0) -> EvHandlerState = EvHandler:response_end(ResponseEvent, EvHandlerState1), {maybe_delete_stream(State, StreamID, remote, fin), EvHandlerState}. -rst_stream_frame(State=#http2_state{streams=Streams0}, - StreamID, Reason, EvHandler, EvHandlerState0) -> - case lists:keytake(StreamID, #stream.id, Streams0) of - {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} -> +rst_stream_frame(State0, StreamID, Reason, EvHandler, EvHandlerState0) -> + case take_stream(State0, StreamID) of + {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> ReplyTo ! {gun_error, self(), StreamRef, {stream_error, Reason, 'Stream reset by server.'}}, EvHandlerState = EvHandler:cancel(#{ @@ -341,14 +345,14 @@ rst_stream_frame(State=#http2_state{streams=Streams0}, endpoint => remote, reason => Reason }, EvHandlerState0), - {State#http2_state{streams=Streams}, EvHandlerState}; - false -> - {State, EvHandlerState0} + {State, EvHandlerState}; + error -> + {State0, EvHandlerState0} end. %% Pushed streams receive the same initial flow value as the parent stream. push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, - status=Status, http2_machine=HTTP2Machine0, streams=Streams}, + status=Status, http2_machine=HTTP2Machine0}, StreamID, PromisedStreamID, Headers, #{ method := Method, scheme := Scheme, authority := Authority, path := Path}, @@ -375,7 +379,7 @@ push_promise_frame(State=#http2_state{socket=Socket, transport=Transport, connected -> NewStream = #stream{id=PromisedStreamID, ref=PromisedStreamRef, reply_to=ReplyTo, flow=InitialFlow}, - {State#http2_state{streams=[NewStream|Streams]}, EvHandlerState}; + {create_stream(State, NewStream), EvHandlerState}; %% We cancel the push_promise immediately when we are shutting down. _ -> {ok, HTTP2Machine} = cow_http2_machine:reset_stream(PromisedStreamID, HTTP2Machine0), @@ -406,7 +410,7 @@ update_flow(State, _ReplyTo, StreamRef, Inc) -> true -> {state, store_stream(State, Stream#stream{flow=Flow})} end; - false -> + error -> [] end. @@ -443,9 +447,13 @@ update_window(State=#http2_state{socket=Socket, transport=Transport, %% GOAWAY frames as the LastStreamID value may be lower than %% the one previously received. goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, - status=Status, streams=Streams0}, {goaway, LastStreamID, Reason, _}) -> - Streams = goaway_streams(Streams0, LastStreamID, {goaway, Reason, 'The connection is going away.'}, []), - State = State0#http2_state{streams=Streams}, + status=Status, streams=Streams0, stream_refs=Refs}, {goaway, LastStreamID, Reason, _}) -> + {Streams, RemovedRefs} = goaway_streams(maps:to_list(Streams0), LastStreamID, + {goaway, Reason, 'The connection is going away.'}, [], []), + State = State0#http2_state{ + streams=maps:from_list(Streams), + stream_refs=maps:without(RemovedRefs, Refs) + }, case Status of connected -> Transport:send(Socket, cow_http2:goaway( @@ -457,14 +465,14 @@ goaway(State0=#http2_state{socket=Socket, transport=Transport, http2_machine=HTT end. %% Cancel server-initiated streams that are above LastStreamID. -goaway_streams([], _, _, Acc) -> - Acc; -goaway_streams([Stream=#stream{id=StreamID}|Tail], LastStreamID, Reason, Acc) +goaway_streams([], _, _, Acc, RefsAcc) -> + {Acc, RefsAcc}; +goaway_streams([{StreamID, Stream=#stream{ref=StreamRef}}|Tail], LastStreamID, Reason, Acc, RefsAcc) when StreamID > LastStreamID, (StreamID rem 2) =:= 1 -> close_stream(Stream, Reason), - goaway_streams(Tail, LastStreamID, Reason, Acc); -goaway_streams([Stream|Tail], LastStreamID, Reason, Acc) -> - goaway_streams(Tail, LastStreamID, Reason, [Stream|Acc]). + goaway_streams(Tail, LastStreamID, Reason, Acc, [StreamRef|RefsAcc]); +goaway_streams([StreamWithID|Tail], LastStreamID, Reason, Acc, RefsAcc) -> + goaway_streams(Tail, LastStreamID, Reason, [StreamWithID|Acc], RefsAcc). %% We are already closing, do nothing. closing(_, #http2_state{status=closing}, _, EvHandlerState) -> @@ -488,19 +496,16 @@ closing(#http2_state{opts=Opts}) -> Timeout = maps:get(closing_timeout, Opts, 15000), {closing, Timeout}. -close(Reason, #http2_state{streams=Streams}, _, EvHandlerState) -> - close_streams(Streams, close_reason(Reason)), +close(Reason0, #http2_state{streams=Streams}, _, EvHandlerState) -> + Reason = close_reason(Reason0), + _ = maps:fold(fun(_, Stream, _) -> + close_stream(Stream, Reason) + end, [], Streams), EvHandlerState. close_reason(closed) -> closed; close_reason(Reason) -> {closed, Reason}. -close_streams([], _) -> - ok; -close_streams([Stream|Tail], Reason) -> - close_stream(Stream, Reason), - close_streams(Tail, Reason). - %% @todo Do we want an event for this? close_stream(#stream{ref=StreamRef, reply_to=ReplyTo}, Reason) -> ReplyTo ! {gun_error, self(), StreamRef, Reason}, @@ -511,9 +516,8 @@ keepalive(State=#http2_state{socket=Socket, transport=Transport}, _, EvHandlerSt {State, EvHandlerState}. headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, - http2_machine=HTTP2Machine0, streams=Streams}, - StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, - InitialFlow0, EvHandler, EvHandlerState0) -> + http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, + Path, Headers0, InitialFlow0, EvHandler, EvHandlerState0) -> {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( iolist_to_binary(Method), HTTP2Machine0), {ok, PseudoHeaders, Headers} = prepare_headers(State, Method, Host, Port, Path, Headers0), @@ -533,13 +537,11 @@ headers(State=#http2_state{socket=Socket, transport=Transport, opts=Opts, EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow}, - {State#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}, - EvHandlerState}. + {create_stream(State#http2_state{http2_machine=HTTP2Machine}, Stream), EvHandlerState}. request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, - http2_machine=HTTP2Machine0, streams=Streams}, - StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body, - InitialFlow0, EvHandler, EvHandlerState0) -> + http2_machine=HTTP2Machine0}, StreamRef, ReplyTo, Method, Host, Port, + Path, Headers0, Body, InitialFlow0, EvHandler, EvHandlerState0) -> Headers1 = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), {ok, StreamID, HTTP2Machine1} = cow_http2_machine:init_stream( @@ -565,7 +567,7 @@ request(State0=#http2_state{socket=Socket, transport=Transport, opts=Opts, EvHandlerState = EvHandler:request_headers(RequestEvent, EvHandlerState1), InitialFlow = initial_flow(InitialFlow0, Opts), Stream = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, flow=InitialFlow}, - State = State0#http2_state{http2_machine=HTTP2Machine, streams=[Stream|Streams]}, + State = create_stream(State0#http2_state{http2_machine=HTTP2Machine}, Stream), case IsFin of fin -> RequestEndEvent = #{ @@ -617,7 +619,7 @@ data(State=#http2_state{http2_machine=HTTP2Machine}, StreamRef, ReplyTo, IsFin, {ok, _, _} -> maybe_send_data(State, StreamID, IsFin, Data, EvHandler, EvHandlerState) end; - false -> + error -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState} end. @@ -677,15 +679,15 @@ send_data_frame(State=#http2_state{socket=Socket, transport=Transport, Transport:send(Socket, cow_http2:headers(StreamID, fin, HeaderBlock)), State#http2_state{http2_machine=HTTP2Machine}. -reset_stream(State=#http2_state{socket=Socket, transport=Transport, - streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) -> +reset_stream(State0=#http2_state{socket=Socket, transport=Transport}, + StreamID, StreamError={stream_error, Reason, _}) -> Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), - case lists:keytake(StreamID, #stream.id, Streams0) of - {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} -> + case take_stream(State0, StreamID) of + {#stream{ref=StreamRef, reply_to=ReplyTo}, State} -> ReplyTo ! {gun_error, self(), StreamRef, StreamError}, - State#http2_state{streams=Streams}; - false -> - State + State; + error -> + State0 end. cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine0}, @@ -702,7 +704,7 @@ cancel(State=#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP }, EvHandlerState0), {delete_stream(State#http2_state{http2_machine=HTTP2Machine}, StreamID), EvHandlerState}; - false -> + error -> {error_stream_not_found(State, StreamRef, ReplyTo), EvHandlerState0} end. @@ -715,17 +717,19 @@ stream_info(State, StreamRef) -> reply_to => ReplyTo, state => running }}; - false -> + error -> {ok, undefined} end. -down(#http2_state{streams=Streams}) -> - [Ref || #stream{ref=Ref} <- Streams]. +down(#http2_state{stream_refs=Refs}) -> + maps:keys(Refs). connection_error(#http2_state{socket=Socket, transport=Transport, http2_machine=HTTP2Machine, streams=Streams}, {connection_error, Reason, _}) -> - Pids = lists:usort([ReplyTo || #stream{reply_to=ReplyTo} <- Streams]), + Pids = lists:usort(maps:fold( + fun(_, #stream{reply_to=ReplyTo}, Acc) -> [ReplyTo|Acc] end, + [], Streams)), _ = [Pid ! {gun_error, self(), Reason} || Pid <- Pids], Transport:send(Socket, cow_http2:goaway( cow_http2_machine:get_last_streamid(HTTP2Machine), @@ -745,17 +749,31 @@ error_stream_not_found(State, StreamRef, ReplyTo) -> State. %% Streams. -%% @todo probably change order of args and have state first? Yes. get_stream_by_id(#http2_state{streams=Streams}, StreamID) -> - lists:keyfind(StreamID, #stream.id, Streams). + maps:get(StreamID, Streams). + +get_stream_by_ref(#http2_state{streams=Streams, stream_refs=Refs}, StreamRef) -> + case maps:get(StreamRef, Refs, error) of + error -> error; + StreamID -> maps:get(StreamID, Streams) + end. -get_stream_by_ref(#http2_state{streams=Streams}, StreamRef) -> - lists:keyfind(StreamRef, #stream.ref, Streams). +create_stream(State=#http2_state{streams=Streams, stream_refs=Refs}, + Stream=#stream{id=StreamID, ref=StreamRef}) -> + State#http2_state{ + streams=Streams#{StreamID => Stream}, + stream_refs=Refs#{StreamRef => StreamID} + }. -store_stream(State=#http2_state{streams=Streams0}, Stream=#stream{id=StreamID}) -> - Streams = lists:keyreplace(StreamID, #stream.id, Streams0, Stream), - State#http2_state{streams=Streams}. +store_stream(State=#http2_state{streams=Streams}, Stream=#stream{id=StreamID}) -> + State#http2_state{streams=Streams#{StreamID => Stream}}. + +take_stream(State=#http2_state{streams=Streams0}, StreamID) -> + case maps:take(StreamID, Streams0) of + {Stream, Streams} -> {Stream, State#http2_state{streams=Streams}}; + error -> error + end. maybe_delete_stream(State=#http2_state{http2_machine=HTTP2Machine}, StreamID, local, fin) -> case cow_http2_machine:get_stream_remote_state(StreamID, HTTP2Machine) of @@ -772,6 +790,9 @@ maybe_delete_stream(State=#http2_state{http2_machine=HTTP2Machine}, StreamID, re maybe_delete_stream(State, _, _, _) -> State. -delete_stream(State=#http2_state{streams=Streams}, StreamID) -> - Streams2 = lists:keydelete(StreamID, #stream.id, Streams), - State#http2_state{streams=Streams2}. +delete_stream(State=#http2_state{streams=Streams, stream_refs=Refs}, StreamID) -> + #{StreamID := #stream{ref=StreamRef}} = Streams, + State#http2_state{ + streams=maps:remove(StreamID, Streams), + stream_refs=maps:remove(StreamRef, Refs) + }. |