aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAndrei Nesterov <[email protected]>2016-11-16 20:57:11 +0300
committerLoïc Hoguin <[email protected]>2017-05-01 17:31:56 +0200
commit0a181681223aead4043b2437fe493652db6e5f8a (patch)
tree807f4b07b0a1995f7c40af1fde1ad9b330a0ba31
parentfb4bd38ffd2c330cbd677d958477aa909210a0b3 (diff)
downloadgun-0a181681223aead4043b2437fe493652db6e5f8a.tar.gz
gun-0a181681223aead4043b2437fe493652db6e5f8a.tar.bz2
gun-0a181681223aead4043b2437fe493652db6e5f8a.zip
Add support for choosing a process to reply to
-rw-r--r--doc/src/guide/http.asciidoc21
-rw-r--r--doc/src/guide/protocols.asciidoc2
-rw-r--r--doc/src/guide/websocket.asciidoc4
-rw-r--r--doc/src/manual/gun.asciidoc81
-rw-r--r--src/gun.erl70
-rw-r--r--src/gun_http.erl81
-rw-r--r--src/gun_http2.erl98
-rw-r--r--src/gun_spdy.erl95
-rw-r--r--test/gun_SUITE.erl26
9 files changed, 314 insertions, 164 deletions
diff --git a/doc/src/guide/http.asciidoc b/doc/src/guide/http.asciidoc
index 465a4c5..e856fb1 100644
--- a/doc/src/guide/http.asciidoc
+++ b/doc/src/guide/http.asciidoc
@@ -45,7 +45,7 @@ handling of responses will be explained further on.
==== GET and HEAD
-Use `gun:get/{2,3}` to request a resource.
+Use `gun:get/{2,3,4}` to request a resource.
.GET "/organizations/ninenines"
@@ -64,7 +64,7 @@ Note that the list of headers has the field name as a binary.
The field value is iodata, which is either a binary or an
iolist.
-Use `gun:head/{2,3}` if you don't need the response body.
+Use `gun:head/{2,3,4}` if you don't need the response body.
.HEAD "/organizations/ninenines"
@@ -101,7 +101,7 @@ desirable. The request body of a PATCH method may be a partial
representation or a list of instructions on how to update the
resource.
-The `gun:post/4`, `gun:put/4` and `gun:patch/4` functions
+The `gun:post/{4,5}`, `gun:put/{4,5}` and `gun:patch/{4,5}` functions
take a body as their fourth argument. These functions do
not require any body-specific header to be set, although
it is always recommended to set the content-type header.
@@ -171,7 +171,7 @@ do_sendfile(ConnPid, StreamRef, IoDevice) ->
==== DELETE
-Use `gun:delete/{2,3}` to delete a resource.
+Use `gun:delete/{2,3,4}` to delete a resource.
.DELETE "/organizations/ninenines"
@@ -211,7 +211,7 @@ StreamRef = gun:options(ConnPid, "*").
==== Requests with an arbitrary method
-The `gun:request/{4,5}` function can be used to send requests
+The `gun:request/{4,5,6}` function can be used to send requests
with a configurable method name. It is mostly useful when you
need a method that Gun does not understand natively.
@@ -360,3 +360,14 @@ gun:flush(ConnPid).
[source,erlang]
gun:flush(StreamRef).
+
+=== Redirecting responses to a different process
+
+Gun allows you to specify which process will handle responses
+to a request via the `reply_to` request option.
+
+.GET "/organizations/ninenines" to a different process
+
+[source,erlang]
+StreamRef = gun:get(ConnPid, "/organizations/ninenines", [],
+ #{reply_to => Pid}).
diff --git a/doc/src/guide/protocols.asciidoc b/doc/src/guide/protocols.asciidoc
index 2180c5b..5e3b273 100644
--- a/doc/src/guide/protocols.asciidoc
+++ b/doc/src/guide/protocols.asciidoc
@@ -10,7 +10,7 @@ sends a request, the server sends back a response.
Gun provides convenience functions for performing GET, HEAD,
OPTIONS, POST, PATCH, PUT, and DELETE requests. All these
-functions are aliases of `gun:request/{4,5}` for each respective
+functions are aliases of `gun:request/{4,5,6}` for each respective
methods. Gun also provides a `gun:data/4` function for streaming
the request body.
diff --git a/doc/src/guide/websocket.asciidoc b/doc/src/guide/websocket.asciidoc
index 4869a2e..f99dea7 100644
--- a/doc/src/guide/websocket.asciidoc
+++ b/doc/src/guide/websocket.asciidoc
@@ -64,6 +64,10 @@ after 1000 ->
exit(timeout)
end.
+Note that you shouldn't use the `reply_to` request option
+for connections you plan to upgrade, because only the
+owner of the connection will receive messages about it.
+
=== Sending data
Once the Websocket upgrade has completed successfully, you no
diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc
index 83f119a..7b9ddd0 100644
--- a/doc/src/manual/gun.asciidoc
+++ b/doc/src/manual/gun.asciidoc
@@ -64,6 +64,15 @@ keepalive => pos_integer()::
version => 'HTTP/1.1' | 'HTTP/1.0'::
HTTP version to use. Defaults to 'HTTP/1.1'.
+=== req_opts() = map()
+
+Configuration for a particular request.
+
+The following keys are defined:
+
+reply_to => pid()::
+ The pid of a process that is responsible for the response handling.
+
=== spdy_opts() = map()
Configuration for the SPDY protocol.
@@ -241,41 +250,56 @@ Gracefully close the connection.
A monitor can be used to be notified when the connection is
effectively closed.
-=== delete(ConnPid, Path) -> delete(ConnPid, Path, [])
+=== delete(ConnPid, Path) -> delete(ConnPid, Path, [], #{})
+
+Alias of `gun:delete/4`.
-Alias of `gun:delete/3`.
+=== delete(ConnPid, Path, Headers) -> delete(ConnPid, Path, Headers, #{})
-=== delete(ConnPid, Path, Headers) -> StreamRef
+Alias of `gun:delete/4`.
+
+=== delete(ConnPid, Path, Headers, ReqOpts) -> StreamRef
ConnPid = pid():: The pid of the Gun connection process.
Path = iodata():: Path to the resource.
Headers = [{binary(), iodata()}]:: Additional request headers.
+ReqOpts = req_opts():: Request options.
StreamRef = reference():: Identifier of the stream for this request.
Delete a resource.
-=== get(ConnPid, Path) -> get(ConnPid, Path, [])
+=== get(ConnPid, Path) -> get(ConnPid, Path, [], #{})
+
+Alias of `gun:get/4`.
+
+=== get(ConnPid, Path, Headers) -> get(ConnPid, Path, Headers, #{})
-Alias of `gun:get/3`.
+Alias of `gun:get/4`.
-=== get(ConnPid, Path, Headers) -> StreamRef
+=== get(ConnPid, Path, Headers, ReqOpts) -> StreamRef
ConnPid = pid():: The pid of the Gun connection process.
Path = iodata():: Path to the resource.
Headers = [{binary(), iodata()}]:: Additional request headers.
+ReqOpts = req_opts():: Request options.
StreamRef = reference():: Identifier of the stream for this request.
Get a resource.
-=== head(ConnPid, Path) -> head(ConnPid, Path, [])
+=== head(ConnPid, Path) -> head(ConnPid, Path, [], #{})
+
+Alias of `gun:head/4`.
+
+=== head(ConnPid, Path, Headers) -> head(ConnPid, Path, Headers, #{})
-Alias of `gun:head/3`.
+Alias of `gun:head/4`.
-=== head(ConnPid, Path, Headers) -> StreamRef
+=== head(ConnPid, Path, Headers, ReqOpts) -> StreamRef
ConnPid = pid():: The pid of the Gun connection process.
Path = iodata():: Path to the resource.
Headers = [{binary(), iodata()}]:: Additional request headers.
+ReqOpts = req_opts():: Request options.
StreamRef = reference():: Identifier of the stream for this request.
Get headers of a resource.
@@ -288,15 +312,20 @@ While servers should send the same headers they would if the
request was a GET, like `content-length`, it is not always
the case and differences may exist.
-=== options(ConnPid, Path) -> options(ConnPid, Path, [])
+=== options(ConnPid, Path) -> options(ConnPid, Path, [], #{})
-Alias of `gun:options/3`.
+Alias of `gun:options/4`.
-=== options(ConnPid, Path, Headers) -> StreamRef
+=== options(ConnPid, Path, Headers) -> options(ConnPid, Path, Headers, #{})
+
+Alias of `gun:options/4`.
+
+=== options(ConnPid, Path, Headers, ReqOpts) -> StreamRef
ConnPid = pid():: The pid of the Gun connection process.
Path = iodata():: Path to the resource.
Headers = [{binary(), iodata()}]:: Additional request headers.
+ReqOpts = req_opts():: Request options.
StreamRef = reference():: Identifier of the stream for this request.
Obtain information about the capabilities of the server or of a resource.
@@ -324,12 +353,17 @@ with instructions on how to update the resource.
You can use the `gun:data/4` function to send the body, if any.
-=== patch(ConnPid, Path, Headers, Body) -> StreamRef
+=== patch(ConnPid, Path, Headers, Body) -> patch(ConnPid, Path, Headers, Body, #{})
+
+Alias of `gun:patch/5`.
+
+=== patch(ConnPid, Path, Headers, Body, ReqOpts) -> StreamRef
ConnPid = pid():: The pid of the Gun connection process.
Path = iodata():: Path to the resource.
Headers = [{binary(), iodata()}]:: Additional request headers.
Body = iodata():: Body of the request.
+ReqOpts = req_opts():: Request options.
StreamRef = reference():: Identifier of the stream for this request.
Request that a set of changes be applied to the resource.
@@ -366,12 +400,17 @@ located at a different URI.
You can use the `gun:data/4` function to send the body, if any.
-=== post(ConnPid, Path, Headers, Body) -> StreamRef
+=== post(ConnPid, Path, Headers, Body) -> post(ConnPid, Path, Headers, Body, #{})
+
+Alias of `gun:post/5`.
+
+=== post(ConnPid, Path, Headers, Body, ReqOpts) -> StreamRef
ConnPid = pid():: The pid of the Gun connection process.
Path = iodata():: Path to the resource.
Headers = [{binary(), iodata()}]:: Additional request headers.
Body = iodata():: Body of the request.
+ReqOpts = req_opts():: Request options.
StreamRef = reference():: Identifier of the stream for this request.
Process the enclosed representation according to the resource's own semantics.
@@ -407,12 +446,17 @@ highly recommended to set both when possible.
You can use the `gun:data/4` function to send the body, if any.
-=== put(ConnPid, Path, Headers, Body) -> StreamRef
+=== put(ConnPid, Path, Headers, Body) -> put(ConnPid, Path, Headers, Body, #{})
+
+Alias of `gun:put/5`.
+
+=== put(ConnPid, Path, Headers, Body, ReqOpts) -> StreamRef
ConnPid = pid():: The pid of the Gun connection process.
Path = iodata():: Path to the resource.
Headers = [{binary(), iodata()}]:: Additional request headers.
Body = iodata():: Body of the request.
+ReqOpts = req_opts():: Request options.
StreamRef = reference():: Identifier of the stream for this request.
Create or replace a resource.
@@ -447,13 +491,18 @@ highly recommended to set both when possible.
You can use the `gun:data/4` function to send the body, if any.
-=== request(ConnPid, Method, Path, Headers, Body) -> StreamRef
+=== request(ConnPid, Method, Path, Headers, Body) -> request(ConnPid, Method, Path, Headers, Body, #{})
+
+Alias of `gun:request/6`.
+
+=== request(ConnPid, Method, Path, Headers, Body, ReqOpts) -> StreamRef
ConnPid = pid():: The pid of the Gun connection process.
Method = iodata():: Request method.
Path = iodata():: Path of the resource.
Headers = [{binary(), iodata()}]:: Additional request headers.
Body = iodata():: Body of the request.
+ReqOpts = req_opts():: Request options.
StreamRef = reference():: Identifier of the stream for this request.
Perform the given request.
diff --git a/src/gun.erl b/src/gun.erl
index 16397c9..8267460 100644
--- a/src/gun.erl
+++ b/src/gun.erl
@@ -23,20 +23,28 @@
%% Requests.
-export([delete/2]).
-export([delete/3]).
+-export([delete/4]).
-export([get/2]).
-export([get/3]).
+-export([get/4]).
-export([head/2]).
-export([head/3]).
+-export([head/4]).
-export([options/2]).
-export([options/3]).
+-export([options/4]).
-export([patch/3]).
-export([patch/4]).
+-export([patch/5]).
-export([post/3]).
-export([post/4]).
+-export([post/5]).
-export([put/3]).
-export([put/4]).
+-export([put/5]).
-export([request/4]).
-export([request/5]).
+-export([request/6]).
%% Streaming data.
-export([data/4]).
@@ -84,6 +92,9 @@
-type opts() :: map().
-export_type([opts/0]).
+-type req_opts() :: map().
+-export_type([req_opts/0]).
+
-type http_opts() :: map().
-export_type([http_opts/0]).
@@ -226,6 +237,10 @@ delete(ServerPid, Path) ->
delete(ServerPid, Path, Headers) ->
request(ServerPid, <<"DELETE">>, Path, Headers).
+-spec delete(pid(), iodata(), headers(), req_opts()) -> reference().
+delete(ServerPid, Path, Headers, ReqOpts) ->
+ request(ServerPid, <<"DELETE">>, Path, Headers, <<>>, ReqOpts).
+
-spec get(pid(), iodata()) -> reference().
get(ServerPid, Path) ->
request(ServerPid, <<"GET">>, Path, []).
@@ -234,6 +249,10 @@ get(ServerPid, Path) ->
get(ServerPid, Path, Headers) ->
request(ServerPid, <<"GET">>, Path, Headers).
+-spec get(pid(), iodata(), headers(), req_opts()) -> reference().
+get(ServerPid, Path, Headers, ReqOpts) ->
+ request(ServerPid, <<"GET">>, Path, Headers, <<>>, ReqOpts).
+
-spec head(pid(), iodata()) -> reference().
head(ServerPid, Path) ->
request(ServerPid, <<"HEAD">>, Path, []).
@@ -242,6 +261,10 @@ head(ServerPid, Path) ->
head(ServerPid, Path, Headers) ->
request(ServerPid, <<"HEAD">>, Path, Headers).
+-spec head(pid(), iodata(), headers(), req_opts()) -> reference().
+head(ServerPid, Path, Headers, ReqOpts) ->
+ request(ServerPid, <<"HEAD">>, Path, Headers, <<>>, ReqOpts).
+
-spec options(pid(), iodata()) -> reference().
options(ServerPid, Path) ->
request(ServerPid, <<"OPTIONS">>, Path, []).
@@ -250,6 +273,10 @@ options(ServerPid, Path) ->
options(ServerPid, Path, Headers) ->
request(ServerPid, <<"OPTIONS">>, Path, Headers).
+-spec options(pid(), iodata(), headers(), req_opts()) -> reference().
+options(ServerPid, Path, Headers, ReqOpts) ->
+ request(ServerPid, <<"OPTIONS">>, Path, Headers, <<>>, ReqOpts).
+
-spec patch(pid(), iodata(), headers()) -> reference().
patch(ServerPid, Path, Headers) ->
request(ServerPid, <<"PATCH">>, Path, Headers).
@@ -258,6 +285,10 @@ patch(ServerPid, Path, Headers) ->
patch(ServerPid, Path, Headers, Body) ->
request(ServerPid, <<"PATCH">>, Path, Headers, Body).
+-spec patch(pid(), iodata(), headers(), iodata(), req_opts()) -> reference().
+patch(ServerPid, Path, Headers, Body, ReqOpts) ->
+ request(ServerPid, <<"PATCH">>, Path, Headers, Body, ReqOpts).
+
-spec post(pid(), iodata(), headers()) -> reference().
post(ServerPid, Path, Headers) ->
request(ServerPid, <<"POST">>, Path, Headers).
@@ -266,6 +297,10 @@ post(ServerPid, Path, Headers) ->
post(ServerPid, Path, Headers, Body) ->
request(ServerPid, <<"POST">>, Path, Headers, Body).
+-spec post(pid(), iodata(), headers(), iodata(), req_opts()) -> reference().
+post(ServerPid, Path, Headers, Body, ReqOpts) ->
+ request(ServerPid, <<"POST">>, Path, Headers, Body, ReqOpts).
+
-spec put(pid(), iodata(), headers()) -> reference().
put(ServerPid, Path, Headers) ->
request(ServerPid, <<"PUT">>, Path, Headers).
@@ -274,16 +309,23 @@ put(ServerPid, Path, Headers) ->
put(ServerPid, Path, Headers, Body) ->
request(ServerPid, <<"PUT">>, Path, Headers, Body).
+-spec put(pid(), iodata(), headers(), iodata(), req_opts()) -> reference().
+put(ServerPid, Path, Headers, Body, ReqOpts) ->
+ request(ServerPid, <<"PUT">>, Path, Headers, Body, ReqOpts).
+
-spec request(pid(), iodata(), iodata(), headers()) -> reference().
request(ServerPid, Method, Path, Headers) ->
- StreamRef = make_ref(),
- _ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers},
- StreamRef.
+ request(ServerPid, Method, Path, Headers, <<>>, #{}).
-spec request(pid(), iodata(), iodata(), headers(), iodata()) -> reference().
request(ServerPid, Method, Path, Headers, Body) ->
+ request(ServerPid, Method, Path, Headers, Body, #{}).
+
+-spec request(pid(), iodata(), iodata(), headers(), iodata(), req_opts()) -> reference().
+request(ServerPid, Method, Path, Headers, Body, ReqOpts) ->
StreamRef = make_ref(),
- _ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers, Body},
+ ReplyTo = maps:get(reply_to, ReqOpts, self()),
+ _ = ServerPid ! {request, ReplyTo, StreamRef, Method, Path, Headers, Body},
StreamRef.
%% Streaming data.
@@ -611,20 +653,22 @@ loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, por
keepalive ->
ProtoState2 = Protocol:keepalive(ProtoState),
before_loop(State#state{protocol_state=ProtoState2});
- {request, Owner, StreamRef, Method, Path, Headers} ->
+ {request, ReplyTo, StreamRef, Method, Path, Headers, <<>>} ->
ProtoState2 = Protocol:request(ProtoState,
- StreamRef, Method, Host, Port, Path, Headers),
+ StreamRef, ReplyTo, Method, Host, Port, Path, Headers),
loop(State#state{protocol_state=ProtoState2});
- {request, Owner, StreamRef, Method, Path, Headers, Body} ->
+ {request, ReplyTo, StreamRef, Method, Path, Headers, Body} ->
ProtoState2 = Protocol:request(ProtoState,
- StreamRef, Method, Host, Port, Path, Headers, Body),
+ StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body),
loop(State#state{protocol_state=ProtoState2});
- {data, Owner, StreamRef, IsFin, Data} ->
+ %% @todo Do we want to reject ReplyTo if it's not the process
+ %% who initiated the connection? For both data and cancel.
+ {data, ReplyTo, StreamRef, IsFin, Data} ->
ProtoState2 = Protocol:data(ProtoState,
- StreamRef, IsFin, Data),
+ StreamRef, ReplyTo, IsFin, Data),
loop(State#state{protocol_state=ProtoState2});
- {cancel, Owner, StreamRef} ->
- ProtoState2 = Protocol:cancel(ProtoState, StreamRef),
+ {cancel, ReplyTo, StreamRef} ->
+ ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo),
loop(State#state{protocol_state=ProtoState2});
%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
%% An interface would also make sure that HTTP/1.0 can't upgrade.
@@ -662,6 +706,8 @@ loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, por
"Connection needs to be upgraded to Websocket "
"before the gun:ws_send/1 function can be used."}},
loop(State);
+ %% @todo The ReplyTo patch disabled the notowner behavior.
+ %% We need to add an option to enforce this behavior if needed.
Any when is_tuple(Any), is_pid(element(2, Any)) ->
element(2, Any) ! {gun_error, self(), {notowner,
"Operations are restricted to the owner of the connection."}},
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 8ebd42e..2388182 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -20,10 +20,10 @@
-export([handle/2]).
-export([close/1]).
-export([keepalive/1]).
--export([request/7]).
-export([request/8]).
--export([data/4]).
--export([cancel/2]).
+-export([request/9]).
+-export([data/5]).
+-export([cancel/3]).
-export([down/1]).
-export([ws_upgrade/7]).
@@ -34,6 +34,7 @@
-record(stream, {
ref :: reference() | websocket_info(),
+ reply_to :: pid(),
method :: binary(),
is_alive :: boolean(),
handler_state :: undefined | gun_content_handler:state()
@@ -47,7 +48,6 @@
content_handlers :: gun_content_handler:opt(),
connection = keepalive :: keepalive | close,
buffer = <<>> :: binary(),
- %% Stream reference, request method and whether the stream is alive.
streams = [] :: [#stream{}],
in = head :: io(),
in_state = {0, 0} :: {non_neg_integer(), non_neg_integer()},
@@ -159,9 +159,10 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) ->
end
end.
-handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion,
+handle_head(Data, State=#http_state{version=ClientVersion,
content_handlers=Handlers0, connection=Conn,
- streams=[Stream=#stream{ref=StreamRef, method=Method, is_alive=IsAlive}|Tail]}) ->
+ streams=[Stream=#stream{ref=StreamRef, reply_to=ReplyTo,
+ method=Method, is_alive=IsAlive}|Tail]}) ->
{Version, Status, _, Rest} = cow_http:parse_status_line(Data),
{Headers, Rest2} = cow_http:parse_headers(Rest),
case {Status, StreamRef} of
@@ -178,13 +179,12 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion,
{websocket, SR, _, _, _} -> SR;
_ -> StreamRef
end,
- Owner ! {gun_response, self(), StreamRef2,
+ ReplyTo ! {gun_response, self(), StreamRef2,
IsFin, Status, Headers},
- %% @todo Change to ReplyTo.
case IsFin of
fin -> undefined;
nofin ->
- gun_content_handler:init(Owner, StreamRef,
+ gun_content_handler:init(ReplyTo, StreamRef,
Status, Headers, Handlers0)
end
end,
@@ -219,20 +219,20 @@ send_data_if_alive(Data, State=#http_state{streams=[Stream=#stream{
send_data_if_alive(_, State, _) ->
State.
-close(State=#http_state{in=body_close, owner=Owner, streams=[_|Tail]}) ->
+close(State=#http_state{in=body_close, streams=[_|Tail]}) ->
_ = send_data_if_alive(<<>>, State, fin),
- close_streams(Owner, Tail);
-close(#http_state{owner=Owner, streams=Streams}) ->
- close_streams(Owner, Streams).
+ close_streams(Tail);
+close(#http_state{streams=Streams}) ->
+ close_streams(Streams).
-close_streams(_, []) ->
+close_streams([]) ->
ok;
-close_streams(Owner, [#stream{is_alive=false}|Tail]) ->
- close_streams(Owner, Tail);
-close_streams(Owner, [#stream{ref=StreamRef}|Tail]) ->
- Owner ! {gun_error, self(), StreamRef, {closed,
+close_streams([#stream{is_alive=false}|Tail]) ->
+ close_streams(Tail);
+close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {closed,
"The connection was lost."}},
- close_streams(Owner, Tail).
+ close_streams(Tail).
%% We can only keep-alive by sending an empty line in-between streams.
keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) ->
@@ -242,7 +242,7 @@ keepalive(State) ->
State.
request(State=#http_state{socket=Socket, transport=Transport, version=Version,
- out=head}, StreamRef, Method, Host, Port, Path, Headers) ->
+ out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) ->
Headers2 = lists:keydelete(<<"transfer-encoding">>, 1, Headers),
Headers3 = case lists:keymember(<<"host">>, 1, Headers) of
false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2];
@@ -256,10 +256,10 @@ request(State=#http_state{socket=Socket, transport=Transport, version=Version,
_ -> Headers3
end,
Transport:send(Socket, cow_http:request(Method, Path, Version, Headers4)),
- new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, Method).
+ new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method).
request(State=#http_state{socket=Socket, transport=Transport, version=Version,
- out=head}, StreamRef, Method, Host, Port, Path, Headers, Body) ->
+ out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body) ->
Headers2 = lists:keydelete(<<"content-length">>, 1,
lists:keydelete(<<"transfer-encoding">>, 1, Headers)),
Headers3 = case lists:keymember(<<"host">>, 1, Headers) of
@@ -273,17 +273,17 @@ request(State=#http_state{socket=Socket, transport=Transport, version=Version,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}
|Headers3]),
Body]),
- new_stream(State#http_state{connection=Conn}, StreamRef, Method).
+ new_stream(State#http_state{connection=Conn}, StreamRef, ReplyTo, Method).
%% We are expecting a new stream.
-data(State=#http_state{out=head}, StreamRef, _, _) ->
- error_stream_closed(State, StreamRef);
+data(State=#http_state{out=head}, StreamRef, ReplyTo, _, _) ->
+ error_stream_closed(State, StreamRef, ReplyTo);
%% There are no active streams.
-data(State=#http_state{streams=[]}, StreamRef, _, _) ->
- error_stream_not_found(State, StreamRef);
+data(State=#http_state{streams=[]}, StreamRef, ReplyTo, _, _) ->
+ error_stream_not_found(State, StreamRef, ReplyTo);
%% We can only send data on the last created stream.
data(State=#http_state{socket=Socket, transport=Transport, version=Version,
- out=Out, streams=Streams}, StreamRef, IsFin, Data) ->
+ out=Out, streams=Streams}, StreamRef, ReplyTo, IsFin, Data) ->
case lists:last(Streams) of
#stream{ref=StreamRef, is_alive=true} ->
DataLength = iolist_size(Data),
@@ -316,16 +316,16 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
State
end;
_ ->
- error_stream_not_found(State, StreamRef)
+ error_stream_not_found(State, StreamRef, ReplyTo)
end.
%% We can't cancel anything, we can just stop forwarding messages to the owner.
-cancel(State, StreamRef) ->
+cancel(State, StreamRef, ReplyTo) ->
case is_stream(State, StreamRef) of
true ->
cancel_stream(State, StreamRef);
false ->
- error_stream_not_found(State, StreamRef)
+ error_stream_not_found(State, StreamRef, ReplyTo)
end.
%% HTTP does not provide any way to figure out what streams are unprocessed.
@@ -336,13 +336,13 @@ down(#http_state{streams=Streams}) ->
end || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
-error_stream_closed(State=#http_state{owner=Owner}, StreamRef) ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
+error_stream_closed(State, StreamRef, ReplyTo) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"The stream has already been closed."}},
State.
-error_stream_not_found(State=#http_state{owner=Owner}, StreamRef) ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
+error_stream_not_found(State, StreamRef, ReplyTo) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"The stream cannot be found."}},
State.
@@ -401,12 +401,13 @@ response_io_from_headers(_, Version, _Status, Headers) ->
%% Streams.
-new_stream(State=#http_state{streams=Streams}, StreamRef, Method) ->
+new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method) ->
State#http_state{streams=Streams
- ++ [#stream{ref=StreamRef, method=iolist_to_binary(Method), is_alive=true}]}.
+ ++ [#stream{ref=StreamRef, reply_to=ReplyTo,
+ method=iolist_to_binary(Method), is_alive=true}]}.
is_stream(#http_state{streams=Streams}, StreamRef) ->
- lists:keymember(StreamRef, 1, Streams).
+ lists:keymember(StreamRef, #stream.ref, Streams).
cancel_stream(State=#http_state{streams=Streams}, StreamRef) ->
Streams2 = [case Ref of
@@ -425,7 +426,7 @@ end_stream(State=#http_state{streams=[_|Tail]}) ->
%% Ensure version is 1.1.
ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) ->
error; %% @todo
-ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head},
+ws_upgrade(State=#http_state{socket=Socket, transport=Transport, owner=Owner, out=head},
StreamRef, Host, Port, Path, Headers0, WsOpts) ->
{Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of
true -> {[{<<"sec-websocket-extensions">>,
@@ -457,7 +458,7 @@ ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head},
end,
Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers)),
new_stream(State#http_state{connection=keepalive, out=head},
- {websocket, StreamRef, Key, GunExtensions, WsOpts}, <<"GET">>).
+ {websocket, StreamRef, Key, GunExtensions, WsOpts}, Owner, <<"GET">>).
ws_handshake(Buffer, State, Headers, Key, GunExtensions, Opts) ->
%% @todo check upgrade, connection
diff --git a/src/gun_http2.erl b/src/gun_http2.erl
index 30ee06e..6073119 100644
--- a/src/gun_http2.erl
+++ b/src/gun_http2.erl
@@ -20,15 +20,16 @@
-export([handle/2]).
-export([close/1]).
-export([keepalive/1]).
--export([request/7]).
-export([request/8]).
--export([data/4]).
--export([cancel/2]).
+-export([request/9]).
+-export([data/5]).
+-export([cancel/3]).
-export([down/1]).
-record(stream, {
id :: non_neg_integer(),
ref :: reference(),
+ reply_to :: pid(),
%% Whether we finished sending data.
local = nofin :: cowboy_stream:fin(),
%% Whether we finished receiving data.
@@ -111,19 +112,18 @@ frame({data, StreamID, IsFin, Data}, State) ->
end;
%% Single HEADERS frame headers block.
frame({headers, StreamID, IsFin, head_fin, HeaderBlock},
- State=#http2_state{owner=Owner, decode_state=DecodeState0, content_handlers=Handlers0}) ->
+ State=#http2_state{decode_state=DecodeState0, content_handlers=Handlers0}) ->
case get_stream_by_id(StreamID, State) of
- Stream = #stream{ref=StreamRef, remote=nofin} ->
+ Stream = #stream{ref=StreamRef, reply_to=ReplyTo, remote=nofin} ->
try cow_hpack:decode(HeaderBlock, DecodeState0) of
{Headers0, DecodeState} ->
case lists:keytake(<<":status">>, 1, Headers0) of
{value, {_, Status}, Headers} ->
- Owner ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers},
- %% @todo Change to ReplyTo.
+ ReplyTo ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers},
Handlers = case IsFin of
fin -> undefined;
nofin ->
- gun_content_handler:init(Owner, StreamRef,
+ gun_content_handler:init(ReplyTo, StreamRef,
Status, Headers, Handlers0)
end,
remote_fin(Stream#stream{handler_state=Handlers},
@@ -133,7 +133,7 @@ frame({headers, StreamID, IsFin, head_fin, HeaderBlock},
'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'})
end
catch _:_ ->
- terminate(State, {connection_error, compression_error,
+ terminate(State, StreamID, {connection_error, compression_error,
'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'})
end;
_ ->
@@ -177,7 +177,7 @@ frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock},
case get_stream_by_id(PromisedStreamID, State) of
false ->
case get_stream_by_id(StreamID, State) of
- #stream{ref=StreamRef} ->
+ #stream{ref=StreamRef, reply_to=ReplyTo} ->
try cow_hpack:decode(HeaderBlock, DecodeState0) of
{Headers0, DecodeState} ->
{Method, Scheme, Authority, Path, Headers} = try
@@ -191,9 +191,9 @@ frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock},
'Malformed push promise; missing pseudo-header field. (RFC7540 8.1.2.3)'})
end,
NewStreamRef = make_ref(),
- Owner ! {gun_push, self(), StreamRef, NewStreamRef, Method,
+ ReplyTo ! {gun_push, self(), StreamRef, NewStreamRef, Method,
iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers},
- new_stream(PromisedStreamID, NewStreamRef, nofin, fin,
+ new_stream(PromisedStreamID, NewStreamRef, ReplyTo, nofin, fin,
State#http2_state{decode_state=DecodeState})
catch _:_ ->
terminate(State, {connection_error, compression_error,
@@ -216,8 +216,8 @@ frame({ping, Opaque}, State=#http2_state{socket=Socket, transport=Transport}) ->
frame({ping_ack, _Opaque}, State) ->
State;
%% GOAWAY frame.
-frame(Frame={goaway, _, _, _}, State) ->
- terminate(State, {stop, Frame, 'Client is going away.'});
+frame(Frame={goaway, StreamID, _, _}, State) ->
+ terminate(State, StreamID, {stop, Frame, 'Client is going away.'});
%% Connection-wide WINDOW_UPDATE frame.
frame({window_update, _Increment}, State) ->
%% @todo control flow
@@ -227,30 +227,30 @@ frame({window_update, _StreamID, _Increment}, State) ->
%% @todo stream-specific control flow
State;
%% Unexpected CONTINUATION frame.
-frame({continuation, _, _, _}, State) ->
- terminate(State, {connection_error, protocol_error,
+frame({continuation, StreamID, _, _}, State) ->
+ terminate(State, StreamID, {connection_error, protocol_error,
'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}).
parse_status(Status) ->
<< Code:3/binary, _/bits >> = Status,
list_to_integer(binary_to_list(Code)).
-close(#http2_state{owner=Owner, streams=Streams}) ->
- close_streams(Owner, Streams).
+close(#http2_state{streams=Streams}) ->
+ close_streams(Streams).
-close_streams(_, []) ->
+close_streams([]) ->
ok;
-close_streams(Owner, [#stream{ref=StreamRef}|Tail]) ->
- Owner ! {gun_error, self(), StreamRef, {closed,
+close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {closed,
"The connection was lost."}},
- close_streams(Owner, Tail).
+ close_streams(Tail).
keepalive(State=#http2_state{socket=Socket, transport=Transport}) ->
Transport:send(Socket, cow_http2:ping(0)),
State.
request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0,
- stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers) ->
+ stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) ->
{HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers),
IsFin = case (false =/= lists:keyfind(<<"content-type">>, 1, Headers))
orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers)) of
@@ -258,12 +258,12 @@ request(State=#http2_state{socket=Socket, transport=Transport, encode_state=Enco
false -> fin
end,
Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)),
- new_stream(StreamID, StreamRef, nofin, IsFin,
+ new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin,
State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}).
%% @todo Handle Body > 16MB. (split it out into many frames)
request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0,
- stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers0, Body) ->
+ stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body) ->
Headers = lists:keystore(<<"content-length">>, 1, Headers0,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
{HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers),
@@ -272,7 +272,7 @@ request(State=#http2_state{socket=Socket, transport=Transport, encode_state=Enco
%% Use the length set by the server instead, if any.
%% @todo Would be better if we didn't have to convert to binary.
send_data(Socket, Transport, StreamID, fin, iolist_to_binary(Body), 16384),
- new_stream(StreamID, StreamRef, nofin, fin,
+ new_stream(StreamID, StreamRef, ReplyTo, nofin, fin,
State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}).
prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) ->
@@ -299,10 +299,11 @@ prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) ->
|Headers1],
cow_hpack:encode(Headers, EncodeState).
-data(State=#http2_state{socket=Socket, transport=Transport}, StreamRef, IsFin, Data) ->
+data(State=#http2_state{socket=Socket, transport=Transport},
+ StreamRef, ReplyTo, IsFin, Data) ->
case get_stream_by_ref(StreamRef, State) of
#stream{local=fin} ->
- error_stream_closed(State, StreamRef);
+ error_stream_closed(State, StreamRef, ReplyTo);
S = #stream{} ->
%% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE.
%% Use the length set by the server instead, if any.
@@ -310,7 +311,7 @@ data(State=#http2_state{socket=Socket, transport=Transport}, StreamRef, IsFin, D
send_data(Socket, Transport, S#stream.id, IsFin, iolist_to_binary(Data), 16384),
local_fin(S, State, IsFin);
false ->
- error_stream_not_found(State, StreamRef)
+ error_stream_not_found(State, StreamRef, ReplyTo)
end.
%% This same function is found in cowboy_http2.
@@ -325,13 +326,13 @@ send_data(Socket, Transport, StreamID, IsFin, Data, Length) ->
end.
cancel(State=#http2_state{socket=Socket, transport=Transport},
- StreamRef) ->
+ StreamRef, ReplyTo) ->
case get_stream_by_ref(StreamRef, State) of
#stream{id=StreamID} ->
Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)),
delete_stream(StreamID, State);
false ->
- error_stream_not_found(State, StreamRef)
+ error_stream_not_found(State, StreamRef, ReplyTo)
end.
%% @todo Add unprocessed streams when GOAWAY handling is done.
@@ -339,18 +340,31 @@ down(#http2_state{streams=Streams}) ->
KilledStreams = [Ref || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
-terminate(#http2_state{owner=Owner}, Reason) ->
- Owner ! {gun_error, self(), Reason},
+terminate(#http2_state{streams=Streams}, Reason) ->
+ %% Because a particular stream is unknown,
+ %% we're sending the error message to all streams.
+ _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams],
%% @todo Send GOAWAY frame.
%% @todo LastGoodStreamID
close.
-stream_reset(State=#http2_state{owner=Owner, socket=Socket, transport=Transport,
+terminate(State, StreamID, Reason) ->
+ case get_stream_by_id(StreamID, State) of
+ #stream{reply_to=ReplyTo} ->
+ ReplyTo ! {gun_error, self(), Reason},
+ %% @todo Send GOAWAY frame.
+ %% @todo LastGoodStreamID
+ close;
+ _ ->
+ terminate(State, Reason)
+ end.
+
+stream_reset(State=#http2_state{socket=Socket, transport=Transport,
streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) ->
Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)),
case lists:keytake(StreamID, #stream.id, Streams0) of
- {value, #stream{ref=StreamRef}, Streams} ->
- Owner ! {gun_error, self(), StreamRef, StreamError},
+ {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} ->
+ ReplyTo ! {gun_error, self(), StreamRef, StreamError},
State#http2_state{streams=Streams};
false ->
%% @todo Unknown stream. Not sure what to do here. Check again once all
@@ -358,22 +372,22 @@ stream_reset(State=#http2_state{owner=Owner, socket=Socket, transport=Transport,
State
end.
-error_stream_closed(State=#http2_state{owner=Owner}, StreamRef) ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
+error_stream_closed(State, StreamRef, ReplyTo) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"The stream has already been closed."}},
State.
-error_stream_not_found(State=#http2_state{owner=Owner}, StreamRef) ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
+error_stream_not_found(State, StreamRef, ReplyTo) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"The stream cannot be found."}},
State.
%% Streams.
%% @todo probably change order of args and have state first?
-new_stream(StreamID, StreamRef, Remote, Local,
+new_stream(StreamID, StreamRef, ReplyTo, Remote, Local,
State=#http2_state{streams=Streams}) ->
- New = #stream{id=StreamID, ref=StreamRef, remote=Remote, local=Local},
+ New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, remote=Remote, local=Local},
State#http2_state{streams=[New|Streams]}.
get_stream_by_id(StreamID, #http2_state{streams=Streams}) ->
diff --git a/src/gun_spdy.erl b/src/gun_spdy.erl
index dcd7496..b7306a8 100644
--- a/src/gun_spdy.erl
+++ b/src/gun_spdy.erl
@@ -20,15 +20,16 @@
-export([handle/2]).
-export([close/1]).
-export([keepalive/1]).
--export([request/7]).
-export([request/8]).
--export([data/4]).
--export([cancel/2]).
+-export([request/9]).
+-export([data/5]).
+-export([cancel/3]).
-export([down/1]).
-record(stream, {
id :: non_neg_integer(),
ref :: reference(),
+ reply_to :: pid(),
in :: boolean(), %% true = open
out :: boolean(), %% true = open
version :: binary()
@@ -75,38 +76,36 @@ handle_loop(Data, State=#spdy_state{zinf=Zinf}) ->
State#spdy_state{buffer=Data}
end.
-handle_frame(Rest, State=#spdy_state{owner=Owner,
- socket=Socket, transport=Transport},
+handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport},
{data, StreamID, IsFin, Data}) ->
case get_stream_by_id(StreamID, State) of
#stream{in=false} ->
Transport:send(Socket,
cow_spdy:rst_stream(StreamID, stream_already_closed)),
handle_loop(Rest, delete_stream(StreamID, State));
- S = #stream{ref=StreamRef} when IsFin ->
- Owner ! {gun_data, self(), StreamRef, fin, Data},
+ S = #stream{ref=StreamRef, reply_to=ReplyTo} when IsFin ->
+ ReplyTo ! {gun_data, self(), StreamRef, fin, Data},
handle_loop(Rest, in_fin_stream(S, State));
- #stream{ref=StreamRef} ->
- Owner ! {gun_data, self(), StreamRef, nofin, Data},
+ #stream{ref=StreamRef, reply_to=ReplyTo} ->
+ ReplyTo ! {gun_data, self(), StreamRef, nofin, Data},
handle_loop(Rest, State);
false ->
Transport:send(Socket,
cow_spdy:rst_stream(StreamID, invalid_stream)),
handle_loop(Rest, delete_stream(StreamID, State))
end;
-handle_frame(Rest, State=#spdy_state{owner=Owner,
- socket=Socket, transport=Transport},
+handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport},
{syn_stream, StreamID, AssocToStreamID, IsFin, IsUnidirectional,
_, Method, Scheme, Host, Path, Version, Headers})
when AssocToStreamID =/= 0, IsUnidirectional ->
case get_stream_by_id(StreamID, State) of
false ->
case get_stream_by_id(AssocToStreamID, State) of
- #stream{ref=AssocToStreamRef} ->
+ #stream{ref=AssocToStreamRef, reply_to=ReplyTo} ->
StreamRef = make_ref(),
- Owner ! {gun_push, self(), AssocToStreamRef, StreamRef, Method,
+ ReplyTo ! {gun_push, self(), AssocToStreamRef, StreamRef, Method,
iolist_to_binary([Scheme, <<"://">>, Host, Path]), Headers},
- handle_loop(Rest, new_stream(StreamID, StreamRef,
+ handle_loop(Rest, new_stream(StreamID, StreamRef, ReplyTo,
not IsFin, false, Version, State));
false ->
Transport:send(Socket,
@@ -123,20 +122,19 @@ handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport},
Transport:send(Socket,
cow_spdy:rst_stream(StreamID, protocol_error)),
handle_loop(Rest, State);
-handle_frame(Rest, State=#spdy_state{owner=Owner,
- socket=Socket, transport=Transport},
+handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport},
{syn_reply, StreamID, IsFin, Status, _, Headers}) ->
case get_stream_by_id(StreamID, State) of
#stream{in=false} ->
Transport:send(Socket,
cow_spdy:rst_stream(StreamID, stream_already_closed)),
handle_loop(Rest, delete_stream(StreamID, State));
- S = #stream{ref=StreamRef} when IsFin ->
- Owner ! {gun_response, self(), StreamRef, fin,
+ S = #stream{ref=StreamRef, reply_to=ReplyTo} when IsFin ->
+ ReplyTo ! {gun_response, self(), StreamRef, fin,
parse_status(Status), Headers},
handle_loop(Rest, in_fin_stream(S, State));
- #stream{ref=StreamRef} ->
- Owner ! {gun_response, self(), StreamRef, nofin,
+ #stream{ref=StreamRef, reply_to=ReplyTo} ->
+ ReplyTo ! {gun_response, self(), StreamRef, nofin,
parse_status(Status), Headers},
handle_loop(Rest, State);
false ->
@@ -144,11 +142,10 @@ handle_frame(Rest, State=#spdy_state{owner=Owner,
cow_spdy:rst_stream(StreamID, invalid_stream)),
handle_loop(Rest, delete_stream(StreamID, State))
end;
-handle_frame(Rest, State=#spdy_state{owner=Owner},
- {rst_stream, StreamID, Status}) ->
+handle_frame(Rest, State, {rst_stream, StreamID, Status}) ->
case get_stream_by_id(StreamID, State) of
- #stream{ref=StreamRef} ->
- Owner ! {gun_error, self(), StreamRef, Status},
+ #stream{ref=StreamRef, reply_to=ReplyTo} ->
+ ReplyTo ! {gun_error, self(), StreamRef, Status},
handle_loop(Rest, delete_stream(StreamID, State));
false ->
handle_loop(Rest, State)
@@ -177,10 +174,12 @@ handle_frame(Rest, State, {window_update, StreamID, DeltaWindowSize}) ->
error_logger:error_msg("Ignored WINDOW_UPDATE control frame ~p ~p~n",
[StreamID, DeltaWindowSize]),
handle_loop(Rest, State);
-handle_frame(_, #spdy_state{owner=Owner, socket=Socket, transport=Transport},
+handle_frame(_, #spdy_state{streams=Streams, socket=Socket, transport=Transport},
{error, badprotocol}) ->
- Owner ! {gun_error, self(), {badprotocol,
- "The remote endpoint sent invalid data."}},
+ %% Because a particular stream is unknown,
+ %% we're sending the error message to all streams.
+ Reason = {badprotocol, "The remote endpoint sent invalid data."},
+ _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams],
%% @todo LastGoodStreamID
Transport:send(Socket, cow_spdy:goaway(0, protocol_error)),
close.
@@ -189,15 +188,15 @@ parse_status(Status) ->
<< Code:3/binary, _/bits >> = Status,
list_to_integer(binary_to_list(Code)).
-close(#spdy_state{owner=Owner, streams=Streams}) ->
- close_streams(Owner, Streams).
+close(#spdy_state{streams=Streams}) ->
+ close_streams(Streams).
-close_streams(_, []) ->
+close_streams([]) ->
ok;
-close_streams(Owner, [#stream{ref=StreamRef}|Tail]) ->
- Owner ! {gun_error, self(), StreamRef, {closed,
+close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {closed,
"The connection was lost."}},
- close_streams(Owner, Tail).
+ close_streams(Tail).
keepalive(State=#spdy_state{socket=Socket, transport=Transport,
ping_id=PingID}) ->
@@ -206,20 +205,20 @@ keepalive(State=#spdy_state{socket=Socket, transport=Transport,
%% @todo Always https scheme?
request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef,
- stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers) ->
+ stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) ->
{Host2, Headers2} = prepare_request(Headers, Host, Port),
Out = (false =/= lists:keyfind(<<"content-type">>, 1, Headers2))
orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers2)),
Transport:send(Socket, cow_spdy:syn_stream(Zdef,
StreamID, 0, not Out, false, 0,
Method, <<"https">>, Host2, Path, <<"HTTP/1.1">>, Headers2)),
- new_stream(StreamID, StreamRef, true, Out, <<"HTTP/1.1">>,
+ new_stream(StreamID, StreamRef, ReplyTo, true, Out, <<"HTTP/1.1">>,
State#spdy_state{stream_id=StreamID + 2}).
%% @todo Handle Body > 16MB. (split it out into many frames)
%% @todo Always https scheme?
request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef,
- stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers, Body) ->
+ stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body) ->
{Host2, Headers2} = prepare_request(Headers, Host, Port),
Headers3 = lists:keystore(<<"content-length">>, 1, Headers2,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
@@ -229,7 +228,7 @@ request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef,
Method, <<"https">>, Host2, Path, <<"HTTP/1.1">>, Headers3),
cow_spdy:data(StreamID, true, Body)
]),
- new_stream(StreamID, StreamRef, true, false, <<"HTTP/1.1">>,
+ new_stream(StreamID, StreamRef, ReplyTo, true, false, <<"HTTP/1.1">>,
State#spdy_state{stream_id=StreamID + 2}).
prepare_request(Headers, Host, Port) ->
@@ -243,10 +242,10 @@ prepare_request(Headers, Host, Port) ->
end.
data(State=#spdy_state{socket=Socket, transport=Transport},
- StreamRef, IsFin, Data) ->
+ StreamRef, ReplyTo, IsFin, Data) ->
case get_stream_by_ref(StreamRef, State) of
#stream{out=false} ->
- error_stream_closed(State, StreamRef);
+ error_stream_closed(State, StreamRef, ReplyTo);
S = #stream{} ->
IsFin2 = IsFin =:= fin,
Transport:send(Socket, cow_spdy:data(S#stream.id, IsFin2, Data)),
@@ -256,17 +255,17 @@ data(State=#spdy_state{socket=Socket, transport=Transport},
State
end;
false ->
- error_stream_not_found(State, StreamRef)
+ error_stream_not_found(State, StreamRef, ReplyTo)
end.
cancel(State=#spdy_state{socket=Socket, transport=Transport},
- StreamRef) ->
+ StreamRef, ReplyTo) ->
case get_stream_by_ref(StreamRef, State) of
#stream{id=StreamID} ->
Transport:send(Socket, cow_spdy:rst_stream(StreamID, cancel)),
delete_stream(StreamID, State);
false ->
- error_stream_not_found(State, StreamRef)
+ error_stream_not_found(State, StreamRef, ReplyTo)
end.
%% @todo Add unprocessed streams when GOAWAY handling is done.
@@ -274,22 +273,22 @@ down(#spdy_state{streams=Streams}) ->
KilledStreams = [Ref || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
-error_stream_closed(State=#spdy_state{owner=Owner}, StreamRef) ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
+error_stream_closed(State, StreamRef, ReplyTo) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"The stream has already been closed."}},
State.
-error_stream_not_found(State=#spdy_state{owner=Owner}, StreamRef) ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
+error_stream_not_found(State, StreamRef, ReplyTo) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"The stream cannot be found."}},
State.
%% Streams.
%% @todo probably change order of args and have state first?
-new_stream(StreamID, StreamRef, In, Out, Version,
+new_stream(StreamID, StreamRef, ReplyTo, In, Out, Version,
State=#spdy_state{streams=Streams}) ->
- New = #stream{id=StreamID, ref=StreamRef,
+ New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
in=In, out=Out, version=Version},
State#spdy_state{streams=[New|Streams]}.
diff --git a/test/gun_SUITE.erl b/test/gun_SUITE.erl
index 64b8885..171c874 100644
--- a/test/gun_SUITE.erl
+++ b/test/gun_SUITE.erl
@@ -75,6 +75,32 @@ gone_reason(_) ->
error(timeout)
end.
+reply_to(_) ->
+ doc("The reply_to option allows using a separate process for requests."),
+ do_reply_to(http),
+ do_reply_to(http2).
+
+do_reply_to(Protocol) ->
+ Self = self(),
+ {ok, Pid} = gun:open("google.com", 443, #{protocols => [Protocol]}),
+ {ok, Protocol} = gun:await_up(Pid),
+ ReplyTo = spawn(fun() ->
+ receive Ref ->
+ Response = gun:await(Pid, Ref),
+ Self ! Response
+ after 1000 ->
+ error(timeout)
+ end
+ end),
+ Ref = gun:get(Pid, "/", [{<<"host">>, <<"google.com">>}], #{reply_to => ReplyTo}),
+ ReplyTo ! Ref,
+ receive
+ {response, _, _, _} ->
+ ok
+ after 1000 ->
+ error(timeout)
+ end.
+
retry_0(_) ->
doc("Ensure Gun gives up immediately with retry=0."),
{ok, Pid} = gun:open("localhost", 12345, #{retry => 0, retry_timeout => 500}),