aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_http2.erl
diff options
context:
space:
mode:
authorAndrei Nesterov <[email protected]>2016-11-16 20:57:11 +0300
committerLoïc Hoguin <[email protected]>2017-05-01 17:31:56 +0200
commit0a181681223aead4043b2437fe493652db6e5f8a (patch)
tree807f4b07b0a1995f7c40af1fde1ad9b330a0ba31 /src/gun_http2.erl
parentfb4bd38ffd2c330cbd677d958477aa909210a0b3 (diff)
downloadgun-0a181681223aead4043b2437fe493652db6e5f8a.tar.gz
gun-0a181681223aead4043b2437fe493652db6e5f8a.tar.bz2
gun-0a181681223aead4043b2437fe493652db6e5f8a.zip
Add support for choosing a process to reply to
Diffstat (limited to 'src/gun_http2.erl')
-rw-r--r--src/gun_http2.erl98
1 files changed, 56 insertions, 42 deletions
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 30ee06e..6073119 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -20,15 +20,16 @@
-export([handle/2]).
-export([close/1]).
-export([keepalive/1]).
--export([request/7]).
-export([request/8]).
--export([data/4]).
--export([cancel/2]).
+-export([request/9]).
+-export([data/5]).
+-export([cancel/3]).
-export([down/1]).
-record(stream, {
id :: non_neg_integer(),
ref :: reference(),
+ reply_to :: pid(),
%% Whether we finished sending data.
local = nofin :: cowboy_stream:fin(),
%% Whether we finished receiving data.
@@ -111,19 +112,18 @@ frame({data, StreamID, IsFin, Data}, State) ->
end;
%% Single HEADERS frame headers block.
frame({headers, StreamID, IsFin, head_fin, HeaderBlock},
- State=#http2_state{owner=Owner, decode_state=DecodeState0, content_handlers=Handlers0}) ->
+ State=#http2_state{decode_state=DecodeState0, content_handlers=Handlers0}) ->
case get_stream_by_id(StreamID, State) of
- Stream = #stream{ref=StreamRef, remote=nofin} ->
+ Stream = #stream{ref=StreamRef, reply_to=ReplyTo, remote=nofin} ->
try cow_hpack:decode(HeaderBlock, DecodeState0) of
{Headers0, DecodeState} ->
case lists:keytake(<<":status">>, 1, Headers0) of
{value, {_, Status}, Headers} ->
- Owner ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers},
- %% @todo Change to ReplyTo.
+ ReplyTo ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers},
Handlers = case IsFin of
fin -> undefined;
nofin ->
- gun_content_handler:init(Owner, StreamRef,
+ gun_content_handler:init(ReplyTo, StreamRef,
Status, Headers, Handlers0)
end,
remote_fin(Stream#stream{handler_state=Handlers},
@@ -133,7 +133,7 @@ frame({headers, StreamID, IsFin, head_fin, HeaderBlock},
'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'})
end
catch _:_ ->
- terminate(State, {connection_error, compression_error,
+ terminate(State, StreamID, {connection_error, compression_error,
'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
end;
_ ->
@@ -177,7 +177,7 @@ frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock},
case get_stream_by_id(PromisedStreamID, State) of
false ->
case get_stream_by_id(StreamID, State) of
- #stream{ref=StreamRef} ->
+ #stream{ref=StreamRef, reply_to=ReplyTo} ->
try cow_hpack:decode(HeaderBlock, DecodeState0) of
{Headers0, DecodeState} ->
{Method, Scheme, Authority, Path, Headers} = try
@@ -191,9 +191,9 @@ frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock},
'Malformed push promise; missing pseudo-header field. (RFC7540 8.1.2.3)'})
end,
NewStreamRef = make_ref(),
- Owner ! {gun_push, self(), StreamRef, NewStreamRef, Method,
+ ReplyTo ! {gun_push, self(), StreamRef, NewStreamRef, Method,
iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers},
- new_stream(PromisedStreamID, NewStreamRef, nofin, fin,
+ new_stream(PromisedStreamID, NewStreamRef, ReplyTo, nofin, fin,
State#http2_state{decode_state=DecodeState})
catch _:_ ->
terminate(State, {connection_error, compression_error,
@@ -216,8 +216,8 @@ frame({ping, Opaque}, State=#http2_state{socket=Socket, transport=Transport}) ->
frame({ping_ack, _Opaque}, State) ->
State;
%% GOAWAY frame.
-frame(Frame={goaway, _, _, _}, State) ->
- terminate(State, {stop, Frame, 'Client is going away.'});
+frame(Frame={goaway, StreamID, _, _}, State) ->
+ terminate(State, StreamID, {stop, Frame, 'Client is going away.'});
%% Connection-wide WINDOW_UPDATE frame.
frame({window_update, _Increment}, State) ->
%% @todo control flow
@@ -227,30 +227,30 @@ frame({window_update, _StreamID, _Increment}, State) ->
%% @todo stream-specific control flow
State;
%% Unexpected CONTINUATION frame.
-frame({continuation, _, _, _}, State) ->
- terminate(State, {connection_error, protocol_error,
+frame({continuation, StreamID, _, _}, State) ->
+ terminate(State, StreamID, {connection_error, protocol_error,
'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}).
parse_status(Status) ->
<< Code:3/binary, _/bits >> = Status,
list_to_integer(binary_to_list(Code)).
-close(#http2_state{owner=Owner, streams=Streams}) ->
- close_streams(Owner, Streams).
+close(#http2_state{streams=Streams}) ->
+ close_streams(Streams).
-close_streams(_, []) ->
+close_streams([]) ->
ok;
-close_streams(Owner, [#stream{ref=StreamRef}|Tail]) ->
- Owner ! {gun_error, self(), StreamRef, {closed,
+close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {closed,
"The connection was lost."}},
- close_streams(Owner, Tail).
+ close_streams(Tail).
keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
Transport:send(Socket, cow_http2:ping(0)),
State.
request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0,
- stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers) ->
+ stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) ->
{HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers),
IsFin = case (false =/= lists:keyfind(<<"content-type">>, 1, Headers))
orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers)) of
@@ -258,12 +258,12 @@ request(State=#http2_state{socket=Socket, transport=Transport, encode_state=Enco
false -> fin
end,
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
- new_stream(StreamID, StreamRef, nofin, IsFin,
+ new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin,
State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}).
%% @todo Handle Body > 16MB. (split it out into many frames)
request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0,
- stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers0, Body) ->
+ stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body) ->
Headers = lists:keystore(<<"content-length">>, 1, Headers0,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
{HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers),
@@ -272,7 +272,7 @@ request(State=#http2_state{socket=Socket, transport=Transport, encode_state=Enco
%% Use the length set by the server instead, if any.
%% @todo Would be better if we didn't have to convert to binary.
send_data(Socket, Transport, StreamID, fin, iolist_to_binary(Body), 16384),
- new_stream(StreamID, StreamRef, nofin, fin,
+ new_stream(StreamID, StreamRef, ReplyTo, nofin, fin,
State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}).
prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) ->
@@ -299,10 +299,11 @@ prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) ->
|Headers1],
cow_hpack:encode(Headers, EncodeState).
-data(State=#http2_state{socket=Socket, transport=Transport}, StreamRef, IsFin, Data) ->
+data(State=#http2_state{socket=Socket, transport=Transport},
+ StreamRef, ReplyTo, IsFin, Data) ->
case get_stream_by_ref(StreamRef, State) of
#stream{local=fin} ->
- error_stream_closed(State, StreamRef);
+ error_stream_closed(State, StreamRef, ReplyTo);
S = #stream{} ->
%% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE.
%% Use the length set by the server instead, if any.
@@ -310,7 +311,7 @@ data(State=#http2_state{socket=Socket, transport=Transport}, StreamRef, IsFin, D
send_data(Socket, Transport, S#stream.id, IsFin, iolist_to_binary(Data), 16384),
local_fin(S, State, IsFin);
false ->
- error_stream_not_found(State, StreamRef)
+ error_stream_not_found(State, StreamRef, ReplyTo)
end.
%% This same function is found in cowboy_http2.
@@ -325,13 +326,13 @@ send_data(Socket, Transport, StreamID, IsFin, Data, Length) ->
end.
cancel(State=#http2_state{socket=Socket, transport=Transport},
- StreamRef) ->
+ StreamRef, ReplyTo) ->
case get_stream_by_ref(StreamRef, State) of
#stream{id=StreamID} ->
Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)),
delete_stream(StreamID, State);
false ->
- error_stream_not_found(State, StreamRef)
+ error_stream_not_found(State, StreamRef, ReplyTo)
end.
%% @todo Add unprocessed streams when GOAWAY handling is done.
@@ -339,18 +340,31 @@ down(#http2_state{streams=Streams}) ->
KilledStreams = [Ref || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
-terminate(#http2_state{owner=Owner}, Reason) ->
- Owner ! {gun_error, self(), Reason},
+terminate(#http2_state{streams=Streams}, Reason) ->
+ %% Because a particular stream is unknown,
+ %% we're sending the error message to all streams.
+ _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams],
%% @todo Send GOAWAY frame.
%% @todo LastGoodStreamID
close.
-stream_reset(State=#http2_state{owner=Owner, socket=Socket, transport=Transport,
+terminate(State, StreamID, Reason) ->
+ case get_stream_by_id(StreamID, State) of
+ #stream{reply_to=ReplyTo} ->
+ ReplyTo ! {gun_error, self(), Reason},
+ %% @todo Send GOAWAY frame.
+ %% @todo LastGoodStreamID
+ close;
+ _ ->
+ terminate(State, Reason)
+ end.
+
+stream_reset(State=#http2_state{socket=Socket, transport=Transport,
streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) ->
Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
case lists:keytake(StreamID, #stream.id, Streams0) of
- {value, #stream{ref=StreamRef}, Streams} ->
- Owner ! {gun_error, self(), StreamRef, StreamError},
+ {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} ->
+ ReplyTo ! {gun_error, self(), StreamRef, StreamError},
State#http2_state{streams=Streams};
false ->
%% @todo Unknown stream. Not sure what to do here. Check again once all
@@ -358,22 +372,22 @@ stream_reset(State=#http2_state{owner=Owner, socket=Socket, transport=Transport,
State
end.
-error_stream_closed(State=#http2_state{owner=Owner}, StreamRef) ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
+error_stream_closed(State, StreamRef, ReplyTo) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"The stream has already been closed."}},
State.
-error_stream_not_found(State=#http2_state{owner=Owner}, StreamRef) ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
+error_stream_not_found(State, StreamRef, ReplyTo) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"The stream cannot be found."}},
State.
%% Streams.
%% @todo probably change order of args and have state first?
-new_stream(StreamID, StreamRef, Remote, Local,
+new_stream(StreamID, StreamRef, ReplyTo, Remote, Local,
State=#http2_state{streams=Streams}) ->
- New = #stream{id=StreamID, ref=StreamRef, remote=Remote, local=Local},
+ New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, remote=Remote, local=Local},
State#http2_state{streams=[New|Streams]}.
get_stream_by_id(StreamID, #http2_state{streams=Streams}) ->