diff options
author | Andrei Nesterov <[email protected]> | 2016-11-16 20:57:11 +0300 |
---|---|---|
committer | Loïc Hoguin <[email protected]> | 2017-05-01 17:31:56 +0200 |
commit | 0a181681223aead4043b2437fe493652db6e5f8a (patch) | |
tree | 807f4b07b0a1995f7c40af1fde1ad9b330a0ba31 /src/gun_spdy.erl | |
parent | fb4bd38ffd2c330cbd677d958477aa909210a0b3 (diff) | |
download | gun-0a181681223aead4043b2437fe493652db6e5f8a.tar.gz gun-0a181681223aead4043b2437fe493652db6e5f8a.tar.bz2 gun-0a181681223aead4043b2437fe493652db6e5f8a.zip |
Add support for choosing a process to reply to
Diffstat (limited to 'src/gun_spdy.erl')
-rw-r--r-- | src/gun_spdy.erl | 95 |
1 files changed, 47 insertions, 48 deletions
diff --git a/src/gun_spdy.erl b/src/gun_spdy.erl index dcd7496..b7306a8 100644 --- a/src/gun_spdy.erl +++ b/src/gun_spdy.erl @@ -20,15 +20,16 @@ -export([handle/2]). -export([close/1]). -export([keepalive/1]). --export([request/7]). -export([request/8]). --export([data/4]). --export([cancel/2]). +-export([request/9]). +-export([data/5]). +-export([cancel/3]). -export([down/1]). -record(stream, { id :: non_neg_integer(), ref :: reference(), + reply_to :: pid(), in :: boolean(), %% true = open out :: boolean(), %% true = open version :: binary() @@ -75,38 +76,36 @@ handle_loop(Data, State=#spdy_state{zinf=Zinf}) -> State#spdy_state{buffer=Data} end. -handle_frame(Rest, State=#spdy_state{owner=Owner, - socket=Socket, transport=Transport}, +handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport}, {data, StreamID, IsFin, Data}) -> case get_stream_by_id(StreamID, State) of #stream{in=false} -> Transport:send(Socket, cow_spdy:rst_stream(StreamID, stream_already_closed)), handle_loop(Rest, delete_stream(StreamID, State)); - S = #stream{ref=StreamRef} when IsFin -> - Owner ! {gun_data, self(), StreamRef, fin, Data}, + S = #stream{ref=StreamRef, reply_to=ReplyTo} when IsFin -> + ReplyTo ! {gun_data, self(), StreamRef, fin, Data}, handle_loop(Rest, in_fin_stream(S, State)); - #stream{ref=StreamRef} -> - Owner ! {gun_data, self(), StreamRef, nofin, Data}, + #stream{ref=StreamRef, reply_to=ReplyTo} -> + ReplyTo ! {gun_data, self(), StreamRef, nofin, Data}, handle_loop(Rest, State); false -> Transport:send(Socket, cow_spdy:rst_stream(StreamID, invalid_stream)), handle_loop(Rest, delete_stream(StreamID, State)) end; -handle_frame(Rest, State=#spdy_state{owner=Owner, - socket=Socket, transport=Transport}, +handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport}, {syn_stream, StreamID, AssocToStreamID, IsFin, IsUnidirectional, _, Method, Scheme, Host, Path, Version, Headers}) when AssocToStreamID =/= 0, IsUnidirectional -> case get_stream_by_id(StreamID, State) of false -> case get_stream_by_id(AssocToStreamID, State) of - #stream{ref=AssocToStreamRef} -> + #stream{ref=AssocToStreamRef, reply_to=ReplyTo} -> StreamRef = make_ref(), - Owner ! {gun_push, self(), AssocToStreamRef, StreamRef, Method, + ReplyTo ! {gun_push, self(), AssocToStreamRef, StreamRef, Method, iolist_to_binary([Scheme, <<"://">>, Host, Path]), Headers}, - handle_loop(Rest, new_stream(StreamID, StreamRef, + handle_loop(Rest, new_stream(StreamID, StreamRef, ReplyTo, not IsFin, false, Version, State)); false -> Transport:send(Socket, @@ -123,20 +122,19 @@ handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport}, Transport:send(Socket, cow_spdy:rst_stream(StreamID, protocol_error)), handle_loop(Rest, State); -handle_frame(Rest, State=#spdy_state{owner=Owner, - socket=Socket, transport=Transport}, +handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport}, {syn_reply, StreamID, IsFin, Status, _, Headers}) -> case get_stream_by_id(StreamID, State) of #stream{in=false} -> Transport:send(Socket, cow_spdy:rst_stream(StreamID, stream_already_closed)), handle_loop(Rest, delete_stream(StreamID, State)); - S = #stream{ref=StreamRef} when IsFin -> - Owner ! {gun_response, self(), StreamRef, fin, + S = #stream{ref=StreamRef, reply_to=ReplyTo} when IsFin -> + ReplyTo ! {gun_response, self(), StreamRef, fin, parse_status(Status), Headers}, handle_loop(Rest, in_fin_stream(S, State)); - #stream{ref=StreamRef} -> - Owner ! {gun_response, self(), StreamRef, nofin, + #stream{ref=StreamRef, reply_to=ReplyTo} -> + ReplyTo ! {gun_response, self(), StreamRef, nofin, parse_status(Status), Headers}, handle_loop(Rest, State); false -> @@ -144,11 +142,10 @@ handle_frame(Rest, State=#spdy_state{owner=Owner, cow_spdy:rst_stream(StreamID, invalid_stream)), handle_loop(Rest, delete_stream(StreamID, State)) end; -handle_frame(Rest, State=#spdy_state{owner=Owner}, - {rst_stream, StreamID, Status}) -> +handle_frame(Rest, State, {rst_stream, StreamID, Status}) -> case get_stream_by_id(StreamID, State) of - #stream{ref=StreamRef} -> - Owner ! {gun_error, self(), StreamRef, Status}, + #stream{ref=StreamRef, reply_to=ReplyTo} -> + ReplyTo ! {gun_error, self(), StreamRef, Status}, handle_loop(Rest, delete_stream(StreamID, State)); false -> handle_loop(Rest, State) @@ -177,10 +174,12 @@ handle_frame(Rest, State, {window_update, StreamID, DeltaWindowSize}) -> error_logger:error_msg("Ignored WINDOW_UPDATE control frame ~p ~p~n", [StreamID, DeltaWindowSize]), handle_loop(Rest, State); -handle_frame(_, #spdy_state{owner=Owner, socket=Socket, transport=Transport}, +handle_frame(_, #spdy_state{streams=Streams, socket=Socket, transport=Transport}, {error, badprotocol}) -> - Owner ! {gun_error, self(), {badprotocol, - "The remote endpoint sent invalid data."}}, + %% Because a particular stream is unknown, + %% we're sending the error message to all streams. + Reason = {badprotocol, "The remote endpoint sent invalid data."}, + _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams], %% @todo LastGoodStreamID Transport:send(Socket, cow_spdy:goaway(0, protocol_error)), close. @@ -189,15 +188,15 @@ parse_status(Status) -> << Code:3/binary, _/bits >> = Status, list_to_integer(binary_to_list(Code)). -close(#spdy_state{owner=Owner, streams=Streams}) -> - close_streams(Owner, Streams). +close(#spdy_state{streams=Streams}) -> + close_streams(Streams). -close_streams(_, []) -> +close_streams([]) -> ok; -close_streams(Owner, [#stream{ref=StreamRef}|Tail]) -> - Owner ! {gun_error, self(), StreamRef, {closed, +close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> + ReplyTo ! {gun_error, self(), StreamRef, {closed, "The connection was lost."}}, - close_streams(Owner, Tail). + close_streams(Tail). keepalive(State=#spdy_state{socket=Socket, transport=Transport, ping_id=PingID}) -> @@ -206,20 +205,20 @@ keepalive(State=#spdy_state{socket=Socket, transport=Transport, %% @todo Always https scheme? request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef, - stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers) -> + stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) -> {Host2, Headers2} = prepare_request(Headers, Host, Port), Out = (false =/= lists:keyfind(<<"content-type">>, 1, Headers2)) orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers2)), Transport:send(Socket, cow_spdy:syn_stream(Zdef, StreamID, 0, not Out, false, 0, Method, <<"https">>, Host2, Path, <<"HTTP/1.1">>, Headers2)), - new_stream(StreamID, StreamRef, true, Out, <<"HTTP/1.1">>, + new_stream(StreamID, StreamRef, ReplyTo, true, Out, <<"HTTP/1.1">>, State#spdy_state{stream_id=StreamID + 2}). %% @todo Handle Body > 16MB. (split it out into many frames) %% @todo Always https scheme? request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef, - stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers, Body) -> + stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body) -> {Host2, Headers2} = prepare_request(Headers, Host, Port), Headers3 = lists:keystore(<<"content-length">>, 1, Headers2, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), @@ -229,7 +228,7 @@ request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef, Method, <<"https">>, Host2, Path, <<"HTTP/1.1">>, Headers3), cow_spdy:data(StreamID, true, Body) ]), - new_stream(StreamID, StreamRef, true, false, <<"HTTP/1.1">>, + new_stream(StreamID, StreamRef, ReplyTo, true, false, <<"HTTP/1.1">>, State#spdy_state{stream_id=StreamID + 2}). prepare_request(Headers, Host, Port) -> @@ -243,10 +242,10 @@ prepare_request(Headers, Host, Port) -> end. data(State=#spdy_state{socket=Socket, transport=Transport}, - StreamRef, IsFin, Data) -> + StreamRef, ReplyTo, IsFin, Data) -> case get_stream_by_ref(StreamRef, State) of #stream{out=false} -> - error_stream_closed(State, StreamRef); + error_stream_closed(State, StreamRef, ReplyTo); S = #stream{} -> IsFin2 = IsFin =:= fin, Transport:send(Socket, cow_spdy:data(S#stream.id, IsFin2, Data)), @@ -256,17 +255,17 @@ data(State=#spdy_state{socket=Socket, transport=Transport}, State end; false -> - error_stream_not_found(State, StreamRef) + error_stream_not_found(State, StreamRef, ReplyTo) end. cancel(State=#spdy_state{socket=Socket, transport=Transport}, - StreamRef) -> + StreamRef, ReplyTo) -> case get_stream_by_ref(StreamRef, State) of #stream{id=StreamID} -> Transport:send(Socket, cow_spdy:rst_stream(StreamID, cancel)), delete_stream(StreamID, State); false -> - error_stream_not_found(State, StreamRef) + error_stream_not_found(State, StreamRef, ReplyTo) end. %% @todo Add unprocessed streams when GOAWAY handling is done. @@ -274,22 +273,22 @@ down(#spdy_state{streams=Streams}) -> KilledStreams = [Ref || #stream{ref=Ref} <- Streams], {KilledStreams, []}. -error_stream_closed(State=#spdy_state{owner=Owner}, StreamRef) -> - Owner ! {gun_error, self(), StreamRef, {badstate, +error_stream_closed(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream has already been closed."}}, State. -error_stream_not_found(State=#spdy_state{owner=Owner}, StreamRef) -> - Owner ! {gun_error, self(), StreamRef, {badstate, +error_stream_not_found(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream cannot be found."}}, State. %% Streams. %% @todo probably change order of args and have state first? -new_stream(StreamID, StreamRef, In, Out, Version, +new_stream(StreamID, StreamRef, ReplyTo, In, Out, Version, State=#spdy_state{streams=Streams}) -> - New = #stream{id=StreamID, ref=StreamRef, + New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, in=In, out=Out, version=Version}, State#spdy_state{streams=[New|Streams]}. |