From 0a181681223aead4043b2437fe493652db6e5f8a Mon Sep 17 00:00:00 2001 From: Andrei Nesterov Date: Wed, 16 Nov 2016 20:57:11 +0300 Subject: Add support for choosing a process to reply to --- src/gun_http2.erl | 98 +++++++++++++++++++++++++++++++------------------------ 1 file changed, 56 insertions(+), 42 deletions(-) (limited to 'src/gun_http2.erl') diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 30ee06e..6073119 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -20,15 +20,16 @@ -export([handle/2]). -export([close/1]). -export([keepalive/1]). --export([request/7]). -export([request/8]). --export([data/4]). --export([cancel/2]). +-export([request/9]). +-export([data/5]). +-export([cancel/3]). -export([down/1]). -record(stream, { id :: non_neg_integer(), ref :: reference(), + reply_to :: pid(), %% Whether we finished sending data. local = nofin :: cowboy_stream:fin(), %% Whether we finished receiving data. @@ -111,19 +112,18 @@ frame({data, StreamID, IsFin, Data}, State) -> end; %% Single HEADERS frame headers block. frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, - State=#http2_state{owner=Owner, decode_state=DecodeState0, content_handlers=Handlers0}) -> + State=#http2_state{decode_state=DecodeState0, content_handlers=Handlers0}) -> case get_stream_by_id(StreamID, State) of - Stream = #stream{ref=StreamRef, remote=nofin} -> + Stream = #stream{ref=StreamRef, reply_to=ReplyTo, remote=nofin} -> try cow_hpack:decode(HeaderBlock, DecodeState0) of {Headers0, DecodeState} -> case lists:keytake(<<":status">>, 1, Headers0) of {value, {_, Status}, Headers} -> - Owner ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers}, - %% @todo Change to ReplyTo. + ReplyTo ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers}, Handlers = case IsFin of fin -> undefined; nofin -> - gun_content_handler:init(Owner, StreamRef, + gun_content_handler:init(ReplyTo, StreamRef, Status, Headers, Handlers0) end, remote_fin(Stream#stream{handler_state=Handlers}, @@ -133,7 +133,7 @@ frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, 'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'}) end catch _:_ -> - terminate(State, {connection_error, compression_error, + terminate(State, StreamID, {connection_error, compression_error, 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) end; _ -> @@ -177,7 +177,7 @@ frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock}, case get_stream_by_id(PromisedStreamID, State) of false -> case get_stream_by_id(StreamID, State) of - #stream{ref=StreamRef} -> + #stream{ref=StreamRef, reply_to=ReplyTo} -> try cow_hpack:decode(HeaderBlock, DecodeState0) of {Headers0, DecodeState} -> {Method, Scheme, Authority, Path, Headers} = try @@ -191,9 +191,9 @@ frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock}, 'Malformed push promise; missing pseudo-header field. (RFC7540 8.1.2.3)'}) end, NewStreamRef = make_ref(), - Owner ! {gun_push, self(), StreamRef, NewStreamRef, Method, + ReplyTo ! {gun_push, self(), StreamRef, NewStreamRef, Method, iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers}, - new_stream(PromisedStreamID, NewStreamRef, nofin, fin, + new_stream(PromisedStreamID, NewStreamRef, ReplyTo, nofin, fin, State#http2_state{decode_state=DecodeState}) catch _:_ -> terminate(State, {connection_error, compression_error, @@ -216,8 +216,8 @@ frame({ping, Opaque}, State=#http2_state{socket=Socket, transport=Transport}) -> frame({ping_ack, _Opaque}, State) -> State; %% GOAWAY frame. -frame(Frame={goaway, _, _, _}, State) -> - terminate(State, {stop, Frame, 'Client is going away.'}); +frame(Frame={goaway, StreamID, _, _}, State) -> + terminate(State, StreamID, {stop, Frame, 'Client is going away.'}); %% Connection-wide WINDOW_UPDATE frame. frame({window_update, _Increment}, State) -> %% @todo control flow @@ -227,30 +227,30 @@ frame({window_update, _StreamID, _Increment}, State) -> %% @todo stream-specific control flow State; %% Unexpected CONTINUATION frame. -frame({continuation, _, _, _}, State) -> - terminate(State, {connection_error, protocol_error, +frame({continuation, StreamID, _, _}, State) -> + terminate(State, StreamID, {connection_error, protocol_error, 'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}). parse_status(Status) -> << Code:3/binary, _/bits >> = Status, list_to_integer(binary_to_list(Code)). -close(#http2_state{owner=Owner, streams=Streams}) -> - close_streams(Owner, Streams). +close(#http2_state{streams=Streams}) -> + close_streams(Streams). -close_streams(_, []) -> +close_streams([]) -> ok; -close_streams(Owner, [#stream{ref=StreamRef}|Tail]) -> - Owner ! {gun_error, self(), StreamRef, {closed, +close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> + ReplyTo ! {gun_error, self(), StreamRef, {closed, "The connection was lost."}}, - close_streams(Owner, Tail). + close_streams(Tail). keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> Transport:send(Socket, cow_http2:ping(0)), State. request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, - stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers) -> + stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) -> {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers), IsFin = case (false =/= lists:keyfind(<<"content-type">>, 1, Headers)) orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers)) of @@ -258,12 +258,12 @@ request(State=#http2_state{socket=Socket, transport=Transport, encode_state=Enco false -> fin end, Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - new_stream(StreamID, StreamRef, nofin, IsFin, + new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin, State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}). %% @todo Handle Body > 16MB. (split it out into many frames) request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, - stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers0, Body) -> + stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body) -> Headers = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers), @@ -272,7 +272,7 @@ request(State=#http2_state{socket=Socket, transport=Transport, encode_state=Enco %% Use the length set by the server instead, if any. %% @todo Would be better if we didn't have to convert to binary. send_data(Socket, Transport, StreamID, fin, iolist_to_binary(Body), 16384), - new_stream(StreamID, StreamRef, nofin, fin, + new_stream(StreamID, StreamRef, ReplyTo, nofin, fin, State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}). prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) -> @@ -299,10 +299,11 @@ prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) -> |Headers1], cow_hpack:encode(Headers, EncodeState). -data(State=#http2_state{socket=Socket, transport=Transport}, StreamRef, IsFin, Data) -> +data(State=#http2_state{socket=Socket, transport=Transport}, + StreamRef, ReplyTo, IsFin, Data) -> case get_stream_by_ref(StreamRef, State) of #stream{local=fin} -> - error_stream_closed(State, StreamRef); + error_stream_closed(State, StreamRef, ReplyTo); S = #stream{} -> %% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE. %% Use the length set by the server instead, if any. @@ -310,7 +311,7 @@ data(State=#http2_state{socket=Socket, transport=Transport}, StreamRef, IsFin, D send_data(Socket, Transport, S#stream.id, IsFin, iolist_to_binary(Data), 16384), local_fin(S, State, IsFin); false -> - error_stream_not_found(State, StreamRef) + error_stream_not_found(State, StreamRef, ReplyTo) end. %% This same function is found in cowboy_http2. @@ -325,13 +326,13 @@ send_data(Socket, Transport, StreamID, IsFin, Data, Length) -> end. cancel(State=#http2_state{socket=Socket, transport=Transport}, - StreamRef) -> + StreamRef, ReplyTo) -> case get_stream_by_ref(StreamRef, State) of #stream{id=StreamID} -> Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), delete_stream(StreamID, State); false -> - error_stream_not_found(State, StreamRef) + error_stream_not_found(State, StreamRef, ReplyTo) end. %% @todo Add unprocessed streams when GOAWAY handling is done. @@ -339,18 +340,31 @@ down(#http2_state{streams=Streams}) -> KilledStreams = [Ref || #stream{ref=Ref} <- Streams], {KilledStreams, []}. -terminate(#http2_state{owner=Owner}, Reason) -> - Owner ! {gun_error, self(), Reason}, +terminate(#http2_state{streams=Streams}, Reason) -> + %% Because a particular stream is unknown, + %% we're sending the error message to all streams. + _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams], %% @todo Send GOAWAY frame. %% @todo LastGoodStreamID close. -stream_reset(State=#http2_state{owner=Owner, socket=Socket, transport=Transport, +terminate(State, StreamID, Reason) -> + case get_stream_by_id(StreamID, State) of + #stream{reply_to=ReplyTo} -> + ReplyTo ! {gun_error, self(), Reason}, + %% @todo Send GOAWAY frame. + %% @todo LastGoodStreamID + close; + _ -> + terminate(State, Reason) + end. + +stream_reset(State=#http2_state{socket=Socket, transport=Transport, streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) -> Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), case lists:keytake(StreamID, #stream.id, Streams0) of - {value, #stream{ref=StreamRef}, Streams} -> - Owner ! {gun_error, self(), StreamRef, StreamError}, + {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} -> + ReplyTo ! {gun_error, self(), StreamRef, StreamError}, State#http2_state{streams=Streams}; false -> %% @todo Unknown stream. Not sure what to do here. Check again once all @@ -358,22 +372,22 @@ stream_reset(State=#http2_state{owner=Owner, socket=Socket, transport=Transport, State end. -error_stream_closed(State=#http2_state{owner=Owner}, StreamRef) -> - Owner ! {gun_error, self(), StreamRef, {badstate, +error_stream_closed(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream has already been closed."}}, State. -error_stream_not_found(State=#http2_state{owner=Owner}, StreamRef) -> - Owner ! {gun_error, self(), StreamRef, {badstate, +error_stream_not_found(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream cannot be found."}}, State. %% Streams. %% @todo probably change order of args and have state first? -new_stream(StreamID, StreamRef, Remote, Local, +new_stream(StreamID, StreamRef, ReplyTo, Remote, Local, State=#http2_state{streams=Streams}) -> - New = #stream{id=StreamID, ref=StreamRef, remote=Remote, local=Local}, + New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, remote=Remote, local=Local}, State#http2_state{streams=[New|Streams]}. get_stream_by_id(StreamID, #http2_state{streams=Streams}) -> -- cgit v1.2.3