aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_spdy.erl
diff options
context:
space:
mode:
authorAndrei Nesterov <[email protected]>2016-11-16 20:57:11 +0300
committerLoïc Hoguin <[email protected]>2017-05-01 17:31:56 +0200
commit0a181681223aead4043b2437fe493652db6e5f8a (patch)
tree807f4b07b0a1995f7c40af1fde1ad9b330a0ba31 /src/gun_spdy.erl
parentfb4bd38ffd2c330cbd677d958477aa909210a0b3 (diff)
downloadgun-0a181681223aead4043b2437fe493652db6e5f8a.tar.gz
gun-0a181681223aead4043b2437fe493652db6e5f8a.tar.bz2
gun-0a181681223aead4043b2437fe493652db6e5f8a.zip
Add support for choosing a process to reply to
Diffstat (limited to 'src/gun_spdy.erl')
-rw-r--r--src/gun_spdy.erl95
1 files changed, 47 insertions, 48 deletions
diff --git a/src/gun_spdy.erl b/src/gun_spdy.erl
index dcd7496..b7306a8 100644
--- a/src/gun_spdy.erl
+++ b/src/gun_spdy.erl
@@ -20,15 +20,16 @@
-export([handle/2]).
-export([close/1]).
-export([keepalive/1]).
--export([request/7]).
-export([request/8]).
--export([data/4]).
--export([cancel/2]).
+-export([request/9]).
+-export([data/5]).
+-export([cancel/3]).
-export([down/1]).
-record(stream, {
id :: non_neg_integer(),
ref :: reference(),
+ reply_to :: pid(),
in :: boolean(), %% true = open
out :: boolean(), %% true = open
version :: binary()
@@ -75,38 +76,36 @@ handle_loop(Data, State=#spdy_state{zinf=Zinf}) ->
State#spdy_state{buffer=Data}
end.
-handle_frame(Rest, State=#spdy_state{owner=Owner,
- socket=Socket, transport=Transport},
+handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport},
{data, StreamID, IsFin, Data}) ->
case get_stream_by_id(StreamID, State) of
#stream{in=false} ->
Transport:send(Socket,
cow_spdy:rst_stream(StreamID, stream_already_closed)),
handle_loop(Rest, delete_stream(StreamID, State));
- S = #stream{ref=StreamRef} when IsFin ->
- Owner ! {gun_data, self(), StreamRef, fin, Data},
+ S = #stream{ref=StreamRef, reply_to=ReplyTo} when IsFin ->
+ ReplyTo ! {gun_data, self(), StreamRef, fin, Data},
handle_loop(Rest, in_fin_stream(S, State));
- #stream{ref=StreamRef} ->
- Owner ! {gun_data, self(), StreamRef, nofin, Data},
+ #stream{ref=StreamRef, reply_to=ReplyTo} ->
+ ReplyTo ! {gun_data, self(), StreamRef, nofin, Data},
handle_loop(Rest, State);
false ->
Transport:send(Socket,
cow_spdy:rst_stream(StreamID, invalid_stream)),
handle_loop(Rest, delete_stream(StreamID, State))
end;
-handle_frame(Rest, State=#spdy_state{owner=Owner,
- socket=Socket, transport=Transport},
+handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport},
{syn_stream, StreamID, AssocToStreamID, IsFin, IsUnidirectional,
_, Method, Scheme, Host, Path, Version, Headers})
when AssocToStreamID =/= 0, IsUnidirectional ->
case get_stream_by_id(StreamID, State) of
false ->
case get_stream_by_id(AssocToStreamID, State) of
- #stream{ref=AssocToStreamRef} ->
+ #stream{ref=AssocToStreamRef, reply_to=ReplyTo} ->
StreamRef = make_ref(),
- Owner ! {gun_push, self(), AssocToStreamRef, StreamRef, Method,
+ ReplyTo ! {gun_push, self(), AssocToStreamRef, StreamRef, Method,
iolist_to_binary([Scheme, <<"://">>, Host, Path]), Headers},
- handle_loop(Rest, new_stream(StreamID, StreamRef,
+ handle_loop(Rest, new_stream(StreamID, StreamRef, ReplyTo,
not IsFin, false, Version, State));
false ->
Transport:send(Socket,
@@ -123,20 +122,19 @@ handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport},
Transport:send(Socket,
cow_spdy:rst_stream(StreamID, protocol_error)),
handle_loop(Rest, State);
-handle_frame(Rest, State=#spdy_state{owner=Owner,
- socket=Socket, transport=Transport},
+handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport},
{syn_reply, StreamID, IsFin, Status, _, Headers}) ->
case get_stream_by_id(StreamID, State) of
#stream{in=false} ->
Transport:send(Socket,
cow_spdy:rst_stream(StreamID, stream_already_closed)),
handle_loop(Rest, delete_stream(StreamID, State));
- S = #stream{ref=StreamRef} when IsFin ->
- Owner ! {gun_response, self(), StreamRef, fin,
+ S = #stream{ref=StreamRef, reply_to=ReplyTo} when IsFin ->
+ ReplyTo ! {gun_response, self(), StreamRef, fin,
parse_status(Status), Headers},
handle_loop(Rest, in_fin_stream(S, State));
- #stream{ref=StreamRef} ->
- Owner ! {gun_response, self(), StreamRef, nofin,
+ #stream{ref=StreamRef, reply_to=ReplyTo} ->
+ ReplyTo ! {gun_response, self(), StreamRef, nofin,
parse_status(Status), Headers},
handle_loop(Rest, State);
false ->
@@ -144,11 +142,10 @@ handle_frame(Rest, State=#spdy_state{owner=Owner,
cow_spdy:rst_stream(StreamID, invalid_stream)),
handle_loop(Rest, delete_stream(StreamID, State))
end;
-handle_frame(Rest, State=#spdy_state{owner=Owner},
- {rst_stream, StreamID, Status}) ->
+handle_frame(Rest, State, {rst_stream, StreamID, Status}) ->
case get_stream_by_id(StreamID, State) of
- #stream{ref=StreamRef} ->
- Owner ! {gun_error, self(), StreamRef, Status},
+ #stream{ref=StreamRef, reply_to=ReplyTo} ->
+ ReplyTo ! {gun_error, self(), StreamRef, Status},
handle_loop(Rest, delete_stream(StreamID, State));
false ->
handle_loop(Rest, State)
@@ -177,10 +174,12 @@ handle_frame(Rest, State, {window_update, StreamID, DeltaWindowSize}) ->
error_logger:error_msg("Ignored WINDOW_UPDATE control frame ~p ~p~n",
[StreamID, DeltaWindowSize]),
handle_loop(Rest, State);
-handle_frame(_, #spdy_state{owner=Owner, socket=Socket, transport=Transport},
+handle_frame(_, #spdy_state{streams=Streams, socket=Socket, transport=Transport},
{error, badprotocol}) ->
- Owner ! {gun_error, self(), {badprotocol,
- "The remote endpoint sent invalid data."}},
+ %% Because a particular stream is unknown,
+ %% we're sending the error message to all streams.
+ Reason = {badprotocol, "The remote endpoint sent invalid data."},
+ _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams],
%% @todo LastGoodStreamID
Transport:send(Socket, cow_spdy:goaway(0, protocol_error)),
close.
@@ -189,15 +188,15 @@ parse_status(Status) ->
<< Code:3/binary, _/bits >> = Status,
list_to_integer(binary_to_list(Code)).
-close(#spdy_state{owner=Owner, streams=Streams}) ->
- close_streams(Owner, Streams).
+close(#spdy_state{streams=Streams}) ->
+ close_streams(Streams).
-close_streams(_, []) ->
+close_streams([]) ->
ok;
-close_streams(Owner, [#stream{ref=StreamRef}|Tail]) ->
- Owner ! {gun_error, self(), StreamRef, {closed,
+close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {closed,
"The connection was lost."}},
- close_streams(Owner, Tail).
+ close_streams(Tail).
keepalive(State=#spdy_state{socket=Socket, transport=Transport,
ping_id=PingID}) ->
@@ -206,20 +205,20 @@ keepalive(State=#spdy_state{socket=Socket, transport=Transport,
%% @todo Always https scheme?
request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef,
- stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers) ->
+ stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) ->
{Host2, Headers2} = prepare_request(Headers, Host, Port),
Out = (false =/= lists:keyfind(<<"content-type">>, 1, Headers2))
orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers2)),
Transport:send(Socket, cow_spdy:syn_stream(Zdef,
StreamID, 0, not Out, false, 0,
Method, <<"https">>, Host2, Path, <<"HTTP/1.1">>, Headers2)),
- new_stream(StreamID, StreamRef, true, Out, <<"HTTP/1.1">>,
+ new_stream(StreamID, StreamRef, ReplyTo, true, Out, <<"HTTP/1.1">>,
State#spdy_state{stream_id=StreamID + 2}).
%% @todo Handle Body > 16MB. (split it out into many frames)
%% @todo Always https scheme?
request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef,
- stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers, Body) ->
+ stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body) ->
{Host2, Headers2} = prepare_request(Headers, Host, Port),
Headers3 = lists:keystore(<<"content-length">>, 1, Headers2,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
@@ -229,7 +228,7 @@ request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef,
Method, <<"https">>, Host2, Path, <<"HTTP/1.1">>, Headers3),
cow_spdy:data(StreamID, true, Body)
]),
- new_stream(StreamID, StreamRef, true, false, <<"HTTP/1.1">>,
+ new_stream(StreamID, StreamRef, ReplyTo, true, false, <<"HTTP/1.1">>,
State#spdy_state{stream_id=StreamID + 2}).
prepare_request(Headers, Host, Port) ->
@@ -243,10 +242,10 @@ prepare_request(Headers, Host, Port) ->
end.
data(State=#spdy_state{socket=Socket, transport=Transport},
- StreamRef, IsFin, Data) ->
+ StreamRef, ReplyTo, IsFin, Data) ->
case get_stream_by_ref(StreamRef, State) of
#stream{out=false} ->
- error_stream_closed(State, StreamRef);
+ error_stream_closed(State, StreamRef, ReplyTo);
S = #stream{} ->
IsFin2 = IsFin =:= fin,
Transport:send(Socket, cow_spdy:data(S#stream.id, IsFin2, Data)),
@@ -256,17 +255,17 @@ data(State=#spdy_state{socket=Socket, transport=Transport},
State
end;
false ->
- error_stream_not_found(State, StreamRef)
+ error_stream_not_found(State, StreamRef, ReplyTo)
end.
cancel(State=#spdy_state{socket=Socket, transport=Transport},
- StreamRef) ->
+ StreamRef, ReplyTo) ->
case get_stream_by_ref(StreamRef, State) of
#stream{id=StreamID} ->
Transport:send(Socket, cow_spdy:rst_stream(StreamID, cancel)),
delete_stream(StreamID, State);
false ->
- error_stream_not_found(State, StreamRef)
+ error_stream_not_found(State, StreamRef, ReplyTo)
end.
%% @todo Add unprocessed streams when GOAWAY handling is done.
@@ -274,22 +273,22 @@ down(#spdy_state{streams=Streams}) ->
KilledStreams = [Ref || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
-error_stream_closed(State=#spdy_state{owner=Owner}, StreamRef) ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
+error_stream_closed(State, StreamRef, ReplyTo) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"The stream has already been closed."}},
State.
-error_stream_not_found(State=#spdy_state{owner=Owner}, StreamRef) ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
+error_stream_not_found(State, StreamRef, ReplyTo) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"The stream cannot be found."}},
State.
%% Streams.
%% @todo probably change order of args and have state first?
-new_stream(StreamID, StreamRef, In, Out, Version,
+new_stream(StreamID, StreamRef, ReplyTo, In, Out, Version,
State=#spdy_state{streams=Streams}) ->
- New = #stream{id=StreamID, ref=StreamRef,
+ New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
in=In, out=Out, version=Version},
State#spdy_state{streams=[New|Streams]}.