From 0a181681223aead4043b2437fe493652db6e5f8a Mon Sep 17 00:00:00 2001 From: Andrei Nesterov Date: Wed, 16 Nov 2016 20:57:11 +0300 Subject: Add support for choosing a process to reply to --- doc/src/guide/http.asciidoc | 21 +++++++-- doc/src/guide/protocols.asciidoc | 2 +- doc/src/guide/websocket.asciidoc | 4 ++ doc/src/manual/gun.asciidoc | 81 ++++++++++++++++++++++++++------- src/gun.erl | 70 +++++++++++++++++++++++----- src/gun_http.erl | 81 +++++++++++++++++---------------- src/gun_http2.erl | 98 +++++++++++++++++++++++----------------- src/gun_spdy.erl | 95 +++++++++++++++++++------------------- test/gun_SUITE.erl | 26 +++++++++++ 9 files changed, 314 insertions(+), 164 deletions(-) diff --git a/doc/src/guide/http.asciidoc b/doc/src/guide/http.asciidoc index 465a4c5..e856fb1 100644 --- a/doc/src/guide/http.asciidoc +++ b/doc/src/guide/http.asciidoc @@ -45,7 +45,7 @@ handling of responses will be explained further on. ==== GET and HEAD -Use `gun:get/{2,3}` to request a resource. +Use `gun:get/{2,3,4}` to request a resource. .GET "/organizations/ninenines" @@ -64,7 +64,7 @@ Note that the list of headers has the field name as a binary. The field value is iodata, which is either a binary or an iolist. -Use `gun:head/{2,3}` if you don't need the response body. +Use `gun:head/{2,3,4}` if you don't need the response body. .HEAD "/organizations/ninenines" @@ -101,7 +101,7 @@ desirable. The request body of a PATCH method may be a partial representation or a list of instructions on how to update the resource. -The `gun:post/4`, `gun:put/4` and `gun:patch/4` functions +The `gun:post/{4,5}`, `gun:put/{4,5}` and `gun:patch/{4,5}` functions take a body as their fourth argument. These functions do not require any body-specific header to be set, although it is always recommended to set the content-type header. @@ -171,7 +171,7 @@ do_sendfile(ConnPid, StreamRef, IoDevice) -> ==== DELETE -Use `gun:delete/{2,3}` to delete a resource. +Use `gun:delete/{2,3,4}` to delete a resource. .DELETE "/organizations/ninenines" @@ -211,7 +211,7 @@ StreamRef = gun:options(ConnPid, "*"). ==== Requests with an arbitrary method -The `gun:request/{4,5}` function can be used to send requests +The `gun:request/{4,5,6}` function can be used to send requests with a configurable method name. It is mostly useful when you need a method that Gun does not understand natively. @@ -360,3 +360,14 @@ gun:flush(ConnPid). [source,erlang] gun:flush(StreamRef). + +=== Redirecting responses to a different process + +Gun allows you to specify which process will handle responses +to a request via the `reply_to` request option. + +.GET "/organizations/ninenines" to a different process + +[source,erlang] +StreamRef = gun:get(ConnPid, "/organizations/ninenines", [], + #{reply_to => Pid}). diff --git a/doc/src/guide/protocols.asciidoc b/doc/src/guide/protocols.asciidoc index 2180c5b..5e3b273 100644 --- a/doc/src/guide/protocols.asciidoc +++ b/doc/src/guide/protocols.asciidoc @@ -10,7 +10,7 @@ sends a request, the server sends back a response. Gun provides convenience functions for performing GET, HEAD, OPTIONS, POST, PATCH, PUT, and DELETE requests. All these -functions are aliases of `gun:request/{4,5}` for each respective +functions are aliases of `gun:request/{4,5,6}` for each respective methods. Gun also provides a `gun:data/4` function for streaming the request body. diff --git a/doc/src/guide/websocket.asciidoc b/doc/src/guide/websocket.asciidoc index 4869a2e..f99dea7 100644 --- a/doc/src/guide/websocket.asciidoc +++ b/doc/src/guide/websocket.asciidoc @@ -64,6 +64,10 @@ after 1000 -> exit(timeout) end. +Note that you shouldn't use the `reply_to` request option +for connections you plan to upgrade, because only the +owner of the connection will receive messages about it. + === Sending data Once the Websocket upgrade has completed successfully, you no diff --git a/doc/src/manual/gun.asciidoc b/doc/src/manual/gun.asciidoc index 83f119a..7b9ddd0 100644 --- a/doc/src/manual/gun.asciidoc +++ b/doc/src/manual/gun.asciidoc @@ -64,6 +64,15 @@ keepalive => pos_integer():: version => 'HTTP/1.1' | 'HTTP/1.0':: HTTP version to use. Defaults to 'HTTP/1.1'. +=== req_opts() = map() + +Configuration for a particular request. + +The following keys are defined: + +reply_to => pid():: + The pid of a process that is responsible for the response handling. + === spdy_opts() = map() Configuration for the SPDY protocol. @@ -241,41 +250,56 @@ Gracefully close the connection. A monitor can be used to be notified when the connection is effectively closed. -=== delete(ConnPid, Path) -> delete(ConnPid, Path, []) +=== delete(ConnPid, Path) -> delete(ConnPid, Path, [], #{}) + +Alias of `gun:delete/4`. -Alias of `gun:delete/3`. +=== delete(ConnPid, Path, Headers) -> delete(ConnPid, Path, Headers, #{}) -=== delete(ConnPid, Path, Headers) -> StreamRef +Alias of `gun:delete/4`. + +=== delete(ConnPid, Path, Headers, ReqOpts) -> StreamRef ConnPid = pid():: The pid of the Gun connection process. Path = iodata():: Path to the resource. Headers = [{binary(), iodata()}]:: Additional request headers. +ReqOpts = req_opts():: Request options. StreamRef = reference():: Identifier of the stream for this request. Delete a resource. -=== get(ConnPid, Path) -> get(ConnPid, Path, []) +=== get(ConnPid, Path) -> get(ConnPid, Path, [], #{}) + +Alias of `gun:get/4`. + +=== get(ConnPid, Path, Headers) -> get(ConnPid, Path, Headers, #{}) -Alias of `gun:get/3`. +Alias of `gun:get/4`. -=== get(ConnPid, Path, Headers) -> StreamRef +=== get(ConnPid, Path, Headers, ReqOpts) -> StreamRef ConnPid = pid():: The pid of the Gun connection process. Path = iodata():: Path to the resource. Headers = [{binary(), iodata()}]:: Additional request headers. +ReqOpts = req_opts():: Request options. StreamRef = reference():: Identifier of the stream for this request. Get a resource. -=== head(ConnPid, Path) -> head(ConnPid, Path, []) +=== head(ConnPid, Path) -> head(ConnPid, Path, [], #{}) + +Alias of `gun:head/4`. + +=== head(ConnPid, Path, Headers) -> head(ConnPid, Path, Headers, #{}) -Alias of `gun:head/3`. +Alias of `gun:head/4`. -=== head(ConnPid, Path, Headers) -> StreamRef +=== head(ConnPid, Path, Headers, ReqOpts) -> StreamRef ConnPid = pid():: The pid of the Gun connection process. Path = iodata():: Path to the resource. Headers = [{binary(), iodata()}]:: Additional request headers. +ReqOpts = req_opts():: Request options. StreamRef = reference():: Identifier of the stream for this request. Get headers of a resource. @@ -288,15 +312,20 @@ While servers should send the same headers they would if the request was a GET, like `content-length`, it is not always the case and differences may exist. -=== options(ConnPid, Path) -> options(ConnPid, Path, []) +=== options(ConnPid, Path) -> options(ConnPid, Path, [], #{}) -Alias of `gun:options/3`. +Alias of `gun:options/4`. -=== options(ConnPid, Path, Headers) -> StreamRef +=== options(ConnPid, Path, Headers) -> options(ConnPid, Path, Headers, #{}) + +Alias of `gun:options/4`. + +=== options(ConnPid, Path, Headers, ReqOpts) -> StreamRef ConnPid = pid():: The pid of the Gun connection process. Path = iodata():: Path to the resource. Headers = [{binary(), iodata()}]:: Additional request headers. +ReqOpts = req_opts():: Request options. StreamRef = reference():: Identifier of the stream for this request. Obtain information about the capabilities of the server or of a resource. @@ -324,12 +353,17 @@ with instructions on how to update the resource. You can use the `gun:data/4` function to send the body, if any. -=== patch(ConnPid, Path, Headers, Body) -> StreamRef +=== patch(ConnPid, Path, Headers, Body) -> patch(ConnPid, Path, Headers, Body, #{}) + +Alias of `gun:patch/5`. + +=== patch(ConnPid, Path, Headers, Body, ReqOpts) -> StreamRef ConnPid = pid():: The pid of the Gun connection process. Path = iodata():: Path to the resource. Headers = [{binary(), iodata()}]:: Additional request headers. Body = iodata():: Body of the request. +ReqOpts = req_opts():: Request options. StreamRef = reference():: Identifier of the stream for this request. Request that a set of changes be applied to the resource. @@ -366,12 +400,17 @@ located at a different URI. You can use the `gun:data/4` function to send the body, if any. -=== post(ConnPid, Path, Headers, Body) -> StreamRef +=== post(ConnPid, Path, Headers, Body) -> post(ConnPid, Path, Headers, Body, #{}) + +Alias of `gun:post/5`. + +=== post(ConnPid, Path, Headers, Body, ReqOpts) -> StreamRef ConnPid = pid():: The pid of the Gun connection process. Path = iodata():: Path to the resource. Headers = [{binary(), iodata()}]:: Additional request headers. Body = iodata():: Body of the request. +ReqOpts = req_opts():: Request options. StreamRef = reference():: Identifier of the stream for this request. Process the enclosed representation according to the resource's own semantics. @@ -407,12 +446,17 @@ highly recommended to set both when possible. You can use the `gun:data/4` function to send the body, if any. -=== put(ConnPid, Path, Headers, Body) -> StreamRef +=== put(ConnPid, Path, Headers, Body) -> put(ConnPid, Path, Headers, Body, #{}) + +Alias of `gun:put/5`. + +=== put(ConnPid, Path, Headers, Body, ReqOpts) -> StreamRef ConnPid = pid():: The pid of the Gun connection process. Path = iodata():: Path to the resource. Headers = [{binary(), iodata()}]:: Additional request headers. Body = iodata():: Body of the request. +ReqOpts = req_opts():: Request options. StreamRef = reference():: Identifier of the stream for this request. Create or replace a resource. @@ -447,13 +491,18 @@ highly recommended to set both when possible. You can use the `gun:data/4` function to send the body, if any. -=== request(ConnPid, Method, Path, Headers, Body) -> StreamRef +=== request(ConnPid, Method, Path, Headers, Body) -> request(ConnPid, Method, Path, Headers, Body, #{}) + +Alias of `gun:request/6`. + +=== request(ConnPid, Method, Path, Headers, Body, ReqOpts) -> StreamRef ConnPid = pid():: The pid of the Gun connection process. Method = iodata():: Request method. Path = iodata():: Path of the resource. Headers = [{binary(), iodata()}]:: Additional request headers. Body = iodata():: Body of the request. +ReqOpts = req_opts():: Request options. StreamRef = reference():: Identifier of the stream for this request. Perform the given request. diff --git a/src/gun.erl b/src/gun.erl index 16397c9..8267460 100644 --- a/src/gun.erl +++ b/src/gun.erl @@ -23,20 +23,28 @@ %% Requests. -export([delete/2]). -export([delete/3]). +-export([delete/4]). -export([get/2]). -export([get/3]). +-export([get/4]). -export([head/2]). -export([head/3]). +-export([head/4]). -export([options/2]). -export([options/3]). +-export([options/4]). -export([patch/3]). -export([patch/4]). +-export([patch/5]). -export([post/3]). -export([post/4]). +-export([post/5]). -export([put/3]). -export([put/4]). +-export([put/5]). -export([request/4]). -export([request/5]). +-export([request/6]). %% Streaming data. -export([data/4]). @@ -84,6 +92,9 @@ -type opts() :: map(). -export_type([opts/0]). +-type req_opts() :: map(). +-export_type([req_opts/0]). + -type http_opts() :: map(). -export_type([http_opts/0]). @@ -226,6 +237,10 @@ delete(ServerPid, Path) -> delete(ServerPid, Path, Headers) -> request(ServerPid, <<"DELETE">>, Path, Headers). +-spec delete(pid(), iodata(), headers(), req_opts()) -> reference(). +delete(ServerPid, Path, Headers, ReqOpts) -> + request(ServerPid, <<"DELETE">>, Path, Headers, <<>>, ReqOpts). + -spec get(pid(), iodata()) -> reference(). get(ServerPid, Path) -> request(ServerPid, <<"GET">>, Path, []). @@ -234,6 +249,10 @@ get(ServerPid, Path) -> get(ServerPid, Path, Headers) -> request(ServerPid, <<"GET">>, Path, Headers). +-spec get(pid(), iodata(), headers(), req_opts()) -> reference(). +get(ServerPid, Path, Headers, ReqOpts) -> + request(ServerPid, <<"GET">>, Path, Headers, <<>>, ReqOpts). + -spec head(pid(), iodata()) -> reference(). head(ServerPid, Path) -> request(ServerPid, <<"HEAD">>, Path, []). @@ -242,6 +261,10 @@ head(ServerPid, Path) -> head(ServerPid, Path, Headers) -> request(ServerPid, <<"HEAD">>, Path, Headers). +-spec head(pid(), iodata(), headers(), req_opts()) -> reference(). +head(ServerPid, Path, Headers, ReqOpts) -> + request(ServerPid, <<"HEAD">>, Path, Headers, <<>>, ReqOpts). + -spec options(pid(), iodata()) -> reference(). options(ServerPid, Path) -> request(ServerPid, <<"OPTIONS">>, Path, []). @@ -250,6 +273,10 @@ options(ServerPid, Path) -> options(ServerPid, Path, Headers) -> request(ServerPid, <<"OPTIONS">>, Path, Headers). +-spec options(pid(), iodata(), headers(), req_opts()) -> reference(). +options(ServerPid, Path, Headers, ReqOpts) -> + request(ServerPid, <<"OPTIONS">>, Path, Headers, <<>>, ReqOpts). + -spec patch(pid(), iodata(), headers()) -> reference(). patch(ServerPid, Path, Headers) -> request(ServerPid, <<"PATCH">>, Path, Headers). @@ -258,6 +285,10 @@ patch(ServerPid, Path, Headers) -> patch(ServerPid, Path, Headers, Body) -> request(ServerPid, <<"PATCH">>, Path, Headers, Body). +-spec patch(pid(), iodata(), headers(), iodata(), req_opts()) -> reference(). +patch(ServerPid, Path, Headers, Body, ReqOpts) -> + request(ServerPid, <<"PATCH">>, Path, Headers, Body, ReqOpts). + -spec post(pid(), iodata(), headers()) -> reference(). post(ServerPid, Path, Headers) -> request(ServerPid, <<"POST">>, Path, Headers). @@ -266,6 +297,10 @@ post(ServerPid, Path, Headers) -> post(ServerPid, Path, Headers, Body) -> request(ServerPid, <<"POST">>, Path, Headers, Body). +-spec post(pid(), iodata(), headers(), iodata(), req_opts()) -> reference(). +post(ServerPid, Path, Headers, Body, ReqOpts) -> + request(ServerPid, <<"POST">>, Path, Headers, Body, ReqOpts). + -spec put(pid(), iodata(), headers()) -> reference(). put(ServerPid, Path, Headers) -> request(ServerPid, <<"PUT">>, Path, Headers). @@ -274,16 +309,23 @@ put(ServerPid, Path, Headers) -> put(ServerPid, Path, Headers, Body) -> request(ServerPid, <<"PUT">>, Path, Headers, Body). +-spec put(pid(), iodata(), headers(), iodata(), req_opts()) -> reference(). +put(ServerPid, Path, Headers, Body, ReqOpts) -> + request(ServerPid, <<"PUT">>, Path, Headers, Body, ReqOpts). + -spec request(pid(), iodata(), iodata(), headers()) -> reference(). request(ServerPid, Method, Path, Headers) -> - StreamRef = make_ref(), - _ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers}, - StreamRef. + request(ServerPid, Method, Path, Headers, <<>>, #{}). -spec request(pid(), iodata(), iodata(), headers(), iodata()) -> reference(). request(ServerPid, Method, Path, Headers, Body) -> + request(ServerPid, Method, Path, Headers, Body, #{}). + +-spec request(pid(), iodata(), iodata(), headers(), iodata(), req_opts()) -> reference(). +request(ServerPid, Method, Path, Headers, Body, ReqOpts) -> StreamRef = make_ref(), - _ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers, Body}, + ReplyTo = maps:get(reply_to, ReqOpts, self()), + _ = ServerPid ! {request, ReplyTo, StreamRef, Method, Path, Headers, Body}, StreamRef. %% Streaming data. @@ -611,20 +653,22 @@ loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, por keepalive -> ProtoState2 = Protocol:keepalive(ProtoState), before_loop(State#state{protocol_state=ProtoState2}); - {request, Owner, StreamRef, Method, Path, Headers} -> + {request, ReplyTo, StreamRef, Method, Path, Headers, <<>>} -> ProtoState2 = Protocol:request(ProtoState, - StreamRef, Method, Host, Port, Path, Headers), + StreamRef, ReplyTo, Method, Host, Port, Path, Headers), loop(State#state{protocol_state=ProtoState2}); - {request, Owner, StreamRef, Method, Path, Headers, Body} -> + {request, ReplyTo, StreamRef, Method, Path, Headers, Body} -> ProtoState2 = Protocol:request(ProtoState, - StreamRef, Method, Host, Port, Path, Headers, Body), + StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body), loop(State#state{protocol_state=ProtoState2}); - {data, Owner, StreamRef, IsFin, Data} -> + %% @todo Do we want to reject ReplyTo if it's not the process + %% who initiated the connection? For both data and cancel. + {data, ReplyTo, StreamRef, IsFin, Data} -> ProtoState2 = Protocol:data(ProtoState, - StreamRef, IsFin, Data), + StreamRef, ReplyTo, IsFin, Data), loop(State#state{protocol_state=ProtoState2}); - {cancel, Owner, StreamRef} -> - ProtoState2 = Protocol:cancel(ProtoState, StreamRef), + {cancel, ReplyTo, StreamRef} -> + ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo), loop(State#state{protocol_state=ProtoState2}); %% @todo Maybe make an interface in the protocol module instead of checking on protocol name. %% An interface would also make sure that HTTP/1.0 can't upgrade. @@ -662,6 +706,8 @@ loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, host=Host, por "Connection needs to be upgraded to Websocket " "before the gun:ws_send/1 function can be used."}}, loop(State); + %% @todo The ReplyTo patch disabled the notowner behavior. + %% We need to add an option to enforce this behavior if needed. Any when is_tuple(Any), is_pid(element(2, Any)) -> element(2, Any) ! {gun_error, self(), {notowner, "Operations are restricted to the owner of the connection."}}, diff --git a/src/gun_http.erl b/src/gun_http.erl index 8ebd42e..2388182 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -20,10 +20,10 @@ -export([handle/2]). -export([close/1]). -export([keepalive/1]). --export([request/7]). -export([request/8]). --export([data/4]). --export([cancel/2]). +-export([request/9]). +-export([data/5]). +-export([cancel/3]). -export([down/1]). -export([ws_upgrade/7]). @@ -34,6 +34,7 @@ -record(stream, { ref :: reference() | websocket_info(), + reply_to :: pid(), method :: binary(), is_alive :: boolean(), handler_state :: undefined | gun_content_handler:state() @@ -47,7 +48,6 @@ content_handlers :: gun_content_handler:opt(), connection = keepalive :: keepalive | close, buffer = <<>> :: binary(), - %% Stream reference, request method and whether the stream is alive. streams = [] :: [#stream{}], in = head :: io(), in_state = {0, 0} :: {non_neg_integer(), non_neg_integer()}, @@ -159,9 +159,10 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) -> end end. -handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion, +handle_head(Data, State=#http_state{version=ClientVersion, content_handlers=Handlers0, connection=Conn, - streams=[Stream=#stream{ref=StreamRef, method=Method, is_alive=IsAlive}|Tail]}) -> + streams=[Stream=#stream{ref=StreamRef, reply_to=ReplyTo, + method=Method, is_alive=IsAlive}|Tail]}) -> {Version, Status, _, Rest} = cow_http:parse_status_line(Data), {Headers, Rest2} = cow_http:parse_headers(Rest), case {Status, StreamRef} of @@ -178,13 +179,12 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion, {websocket, SR, _, _, _} -> SR; _ -> StreamRef end, - Owner ! {gun_response, self(), StreamRef2, + ReplyTo ! {gun_response, self(), StreamRef2, IsFin, Status, Headers}, - %% @todo Change to ReplyTo. case IsFin of fin -> undefined; nofin -> - gun_content_handler:init(Owner, StreamRef, + gun_content_handler:init(ReplyTo, StreamRef, Status, Headers, Handlers0) end end, @@ -219,20 +219,20 @@ send_data_if_alive(Data, State=#http_state{streams=[Stream=#stream{ send_data_if_alive(_, State, _) -> State. -close(State=#http_state{in=body_close, owner=Owner, streams=[_|Tail]}) -> +close(State=#http_state{in=body_close, streams=[_|Tail]}) -> _ = send_data_if_alive(<<>>, State, fin), - close_streams(Owner, Tail); -close(#http_state{owner=Owner, streams=Streams}) -> - close_streams(Owner, Streams). + close_streams(Tail); +close(#http_state{streams=Streams}) -> + close_streams(Streams). -close_streams(_, []) -> +close_streams([]) -> ok; -close_streams(Owner, [#stream{is_alive=false}|Tail]) -> - close_streams(Owner, Tail); -close_streams(Owner, [#stream{ref=StreamRef}|Tail]) -> - Owner ! {gun_error, self(), StreamRef, {closed, +close_streams([#stream{is_alive=false}|Tail]) -> + close_streams(Tail); +close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> + ReplyTo ! {gun_error, self(), StreamRef, {closed, "The connection was lost."}}, - close_streams(Owner, Tail). + close_streams(Tail). %% We can only keep-alive by sending an empty line in-between streams. keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) -> @@ -242,7 +242,7 @@ keepalive(State) -> State. request(State=#http_state{socket=Socket, transport=Transport, version=Version, - out=head}, StreamRef, Method, Host, Port, Path, Headers) -> + out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) -> Headers2 = lists:keydelete(<<"transfer-encoding">>, 1, Headers), Headers3 = case lists:keymember(<<"host">>, 1, Headers) of false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2]; @@ -256,10 +256,10 @@ request(State=#http_state{socket=Socket, transport=Transport, version=Version, _ -> Headers3 end, Transport:send(Socket, cow_http:request(Method, Path, Version, Headers4)), - new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, Method). + new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method). request(State=#http_state{socket=Socket, transport=Transport, version=Version, - out=head}, StreamRef, Method, Host, Port, Path, Headers, Body) -> + out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body) -> Headers2 = lists:keydelete(<<"content-length">>, 1, lists:keydelete(<<"transfer-encoding">>, 1, Headers)), Headers3 = case lists:keymember(<<"host">>, 1, Headers) of @@ -273,17 +273,17 @@ request(State=#http_state{socket=Socket, transport=Transport, version=Version, {<<"content-length">>, integer_to_binary(iolist_size(Body))} |Headers3]), Body]), - new_stream(State#http_state{connection=Conn}, StreamRef, Method). + new_stream(State#http_state{connection=Conn}, StreamRef, ReplyTo, Method). %% We are expecting a new stream. -data(State=#http_state{out=head}, StreamRef, _, _) -> - error_stream_closed(State, StreamRef); +data(State=#http_state{out=head}, StreamRef, ReplyTo, _, _) -> + error_stream_closed(State, StreamRef, ReplyTo); %% There are no active streams. -data(State=#http_state{streams=[]}, StreamRef, _, _) -> - error_stream_not_found(State, StreamRef); +data(State=#http_state{streams=[]}, StreamRef, ReplyTo, _, _) -> + error_stream_not_found(State, StreamRef, ReplyTo); %% We can only send data on the last created stream. data(State=#http_state{socket=Socket, transport=Transport, version=Version, - out=Out, streams=Streams}, StreamRef, IsFin, Data) -> + out=Out, streams=Streams}, StreamRef, ReplyTo, IsFin, Data) -> case lists:last(Streams) of #stream{ref=StreamRef, is_alive=true} -> DataLength = iolist_size(Data), @@ -316,16 +316,16 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, State end; _ -> - error_stream_not_found(State, StreamRef) + error_stream_not_found(State, StreamRef, ReplyTo) end. %% We can't cancel anything, we can just stop forwarding messages to the owner. -cancel(State, StreamRef) -> +cancel(State, StreamRef, ReplyTo) -> case is_stream(State, StreamRef) of true -> cancel_stream(State, StreamRef); false -> - error_stream_not_found(State, StreamRef) + error_stream_not_found(State, StreamRef, ReplyTo) end. %% HTTP does not provide any way to figure out what streams are unprocessed. @@ -336,13 +336,13 @@ down(#http_state{streams=Streams}) -> end || #stream{ref=Ref} <- Streams], {KilledStreams, []}. -error_stream_closed(State=#http_state{owner=Owner}, StreamRef) -> - Owner ! {gun_error, self(), StreamRef, {badstate, +error_stream_closed(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream has already been closed."}}, State. -error_stream_not_found(State=#http_state{owner=Owner}, StreamRef) -> - Owner ! {gun_error, self(), StreamRef, {badstate, +error_stream_not_found(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream cannot be found."}}, State. @@ -401,12 +401,13 @@ response_io_from_headers(_, Version, _Status, Headers) -> %% Streams. -new_stream(State=#http_state{streams=Streams}, StreamRef, Method) -> +new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method) -> State#http_state{streams=Streams - ++ [#stream{ref=StreamRef, method=iolist_to_binary(Method), is_alive=true}]}. + ++ [#stream{ref=StreamRef, reply_to=ReplyTo, + method=iolist_to_binary(Method), is_alive=true}]}. is_stream(#http_state{streams=Streams}, StreamRef) -> - lists:keymember(StreamRef, 1, Streams). + lists:keymember(StreamRef, #stream.ref, Streams). cancel_stream(State=#http_state{streams=Streams}, StreamRef) -> Streams2 = [case Ref of @@ -425,7 +426,7 @@ end_stream(State=#http_state{streams=[_|Tail]}) -> %% Ensure version is 1.1. ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) -> error; %% @todo -ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head}, +ws_upgrade(State=#http_state{socket=Socket, transport=Transport, owner=Owner, out=head}, StreamRef, Host, Port, Path, Headers0, WsOpts) -> {Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of true -> {[{<<"sec-websocket-extensions">>, @@ -457,7 +458,7 @@ ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head}, end, Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers)), new_stream(State#http_state{connection=keepalive, out=head}, - {websocket, StreamRef, Key, GunExtensions, WsOpts}, <<"GET">>). + {websocket, StreamRef, Key, GunExtensions, WsOpts}, Owner, <<"GET">>). ws_handshake(Buffer, State, Headers, Key, GunExtensions, Opts) -> %% @todo check upgrade, connection diff --git a/src/gun_http2.erl b/src/gun_http2.erl index 30ee06e..6073119 100644 --- a/src/gun_http2.erl +++ b/src/gun_http2.erl @@ -20,15 +20,16 @@ -export([handle/2]). -export([close/1]). -export([keepalive/1]). --export([request/7]). -export([request/8]). --export([data/4]). --export([cancel/2]). +-export([request/9]). +-export([data/5]). +-export([cancel/3]). -export([down/1]). -record(stream, { id :: non_neg_integer(), ref :: reference(), + reply_to :: pid(), %% Whether we finished sending data. local = nofin :: cowboy_stream:fin(), %% Whether we finished receiving data. @@ -111,19 +112,18 @@ frame({data, StreamID, IsFin, Data}, State) -> end; %% Single HEADERS frame headers block. frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, - State=#http2_state{owner=Owner, decode_state=DecodeState0, content_handlers=Handlers0}) -> + State=#http2_state{decode_state=DecodeState0, content_handlers=Handlers0}) -> case get_stream_by_id(StreamID, State) of - Stream = #stream{ref=StreamRef, remote=nofin} -> + Stream = #stream{ref=StreamRef, reply_to=ReplyTo, remote=nofin} -> try cow_hpack:decode(HeaderBlock, DecodeState0) of {Headers0, DecodeState} -> case lists:keytake(<<":status">>, 1, Headers0) of {value, {_, Status}, Headers} -> - Owner ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers}, - %% @todo Change to ReplyTo. + ReplyTo ! {gun_response, self(), StreamRef, IsFin, parse_status(Status), Headers}, Handlers = case IsFin of fin -> undefined; nofin -> - gun_content_handler:init(Owner, StreamRef, + gun_content_handler:init(ReplyTo, StreamRef, Status, Headers, Handlers0) end, remote_fin(Stream#stream{handler_state=Handlers}, @@ -133,7 +133,7 @@ frame({headers, StreamID, IsFin, head_fin, HeaderBlock}, 'Malformed response; missing :status in HEADERS frame. (RFC7540 8.1.2.4)'}) end catch _:_ -> - terminate(State, {connection_error, compression_error, + terminate(State, StreamID, {connection_error, compression_error, 'Error while trying to decode HPACK-encoded header block. (RFC7540 4.3)'}) end; _ -> @@ -177,7 +177,7 @@ frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock}, case get_stream_by_id(PromisedStreamID, State) of false -> case get_stream_by_id(StreamID, State) of - #stream{ref=StreamRef} -> + #stream{ref=StreamRef, reply_to=ReplyTo} -> try cow_hpack:decode(HeaderBlock, DecodeState0) of {Headers0, DecodeState} -> {Method, Scheme, Authority, Path, Headers} = try @@ -191,9 +191,9 @@ frame({push_promise, StreamID, head_fin, PromisedStreamID, HeaderBlock}, 'Malformed push promise; missing pseudo-header field. (RFC7540 8.1.2.3)'}) end, NewStreamRef = make_ref(), - Owner ! {gun_push, self(), StreamRef, NewStreamRef, Method, + ReplyTo ! {gun_push, self(), StreamRef, NewStreamRef, Method, iolist_to_binary([Scheme, <<"://">>, Authority, Path]), Headers}, - new_stream(PromisedStreamID, NewStreamRef, nofin, fin, + new_stream(PromisedStreamID, NewStreamRef, ReplyTo, nofin, fin, State#http2_state{decode_state=DecodeState}) catch _:_ -> terminate(State, {connection_error, compression_error, @@ -216,8 +216,8 @@ frame({ping, Opaque}, State=#http2_state{socket=Socket, transport=Transport}) -> frame({ping_ack, _Opaque}, State) -> State; %% GOAWAY frame. -frame(Frame={goaway, _, _, _}, State) -> - terminate(State, {stop, Frame, 'Client is going away.'}); +frame(Frame={goaway, StreamID, _, _}, State) -> + terminate(State, StreamID, {stop, Frame, 'Client is going away.'}); %% Connection-wide WINDOW_UPDATE frame. frame({window_update, _Increment}, State) -> %% @todo control flow @@ -227,30 +227,30 @@ frame({window_update, _StreamID, _Increment}, State) -> %% @todo stream-specific control flow State; %% Unexpected CONTINUATION frame. -frame({continuation, _, _, _}, State) -> - terminate(State, {connection_error, protocol_error, +frame({continuation, StreamID, _, _}, State) -> + terminate(State, StreamID, {connection_error, protocol_error, 'CONTINUATION frames MUST be preceded by a HEADERS frame. (RFC7540 6.10)'}). parse_status(Status) -> << Code:3/binary, _/bits >> = Status, list_to_integer(binary_to_list(Code)). -close(#http2_state{owner=Owner, streams=Streams}) -> - close_streams(Owner, Streams). +close(#http2_state{streams=Streams}) -> + close_streams(Streams). -close_streams(_, []) -> +close_streams([]) -> ok; -close_streams(Owner, [#stream{ref=StreamRef}|Tail]) -> - Owner ! {gun_error, self(), StreamRef, {closed, +close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> + ReplyTo ! {gun_error, self(), StreamRef, {closed, "The connection was lost."}}, - close_streams(Owner, Tail). + close_streams(Tail). keepalive(State=#http2_state{socket=Socket, transport=Transport}) -> Transport:send(Socket, cow_http2:ping(0)), State. request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, - stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers) -> + stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) -> {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers), IsFin = case (false =/= lists:keyfind(<<"content-type">>, 1, Headers)) orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers)) of @@ -258,12 +258,12 @@ request(State=#http2_state{socket=Socket, transport=Transport, encode_state=Enco false -> fin end, Transport:send(Socket, cow_http2:headers(StreamID, IsFin, HeaderBlock)), - new_stream(StreamID, StreamRef, nofin, IsFin, + new_stream(StreamID, StreamRef, ReplyTo, nofin, IsFin, State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}). %% @todo Handle Body > 16MB. (split it out into many frames) request(State=#http2_state{socket=Socket, transport=Transport, encode_state=EncodeState0, - stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers0, Body) -> + stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers0, Body) -> Headers = lists:keystore(<<"content-length">>, 1, Headers0, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), {HeaderBlock, EncodeState} = prepare_headers(EncodeState0, Transport, Method, Host, Port, Path, Headers), @@ -272,7 +272,7 @@ request(State=#http2_state{socket=Socket, transport=Transport, encode_state=Enco %% Use the length set by the server instead, if any. %% @todo Would be better if we didn't have to convert to binary. send_data(Socket, Transport, StreamID, fin, iolist_to_binary(Body), 16384), - new_stream(StreamID, StreamRef, nofin, fin, + new_stream(StreamID, StreamRef, ReplyTo, nofin, fin, State#http2_state{stream_id=StreamID + 2, encode_state=EncodeState}). prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) -> @@ -299,10 +299,11 @@ prepare_headers(EncodeState, Transport, Method, Host0, Port, Path, Headers0) -> |Headers1], cow_hpack:encode(Headers, EncodeState). -data(State=#http2_state{socket=Socket, transport=Transport}, StreamRef, IsFin, Data) -> +data(State=#http2_state{socket=Socket, transport=Transport}, + StreamRef, ReplyTo, IsFin, Data) -> case get_stream_by_ref(StreamRef, State) of #stream{local=fin} -> - error_stream_closed(State, StreamRef); + error_stream_closed(State, StreamRef, ReplyTo); S = #stream{} -> %% @todo 16384 is the default SETTINGS_MAX_FRAME_SIZE. %% Use the length set by the server instead, if any. @@ -310,7 +311,7 @@ data(State=#http2_state{socket=Socket, transport=Transport}, StreamRef, IsFin, D send_data(Socket, Transport, S#stream.id, IsFin, iolist_to_binary(Data), 16384), local_fin(S, State, IsFin); false -> - error_stream_not_found(State, StreamRef) + error_stream_not_found(State, StreamRef, ReplyTo) end. %% This same function is found in cowboy_http2. @@ -325,13 +326,13 @@ send_data(Socket, Transport, StreamID, IsFin, Data, Length) -> end. cancel(State=#http2_state{socket=Socket, transport=Transport}, - StreamRef) -> + StreamRef, ReplyTo) -> case get_stream_by_ref(StreamRef, State) of #stream{id=StreamID} -> Transport:send(Socket, cow_http2:rst_stream(StreamID, cancel)), delete_stream(StreamID, State); false -> - error_stream_not_found(State, StreamRef) + error_stream_not_found(State, StreamRef, ReplyTo) end. %% @todo Add unprocessed streams when GOAWAY handling is done. @@ -339,18 +340,31 @@ down(#http2_state{streams=Streams}) -> KilledStreams = [Ref || #stream{ref=Ref} <- Streams], {KilledStreams, []}. -terminate(#http2_state{owner=Owner}, Reason) -> - Owner ! {gun_error, self(), Reason}, +terminate(#http2_state{streams=Streams}, Reason) -> + %% Because a particular stream is unknown, + %% we're sending the error message to all streams. + _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams], %% @todo Send GOAWAY frame. %% @todo LastGoodStreamID close. -stream_reset(State=#http2_state{owner=Owner, socket=Socket, transport=Transport, +terminate(State, StreamID, Reason) -> + case get_stream_by_id(StreamID, State) of + #stream{reply_to=ReplyTo} -> + ReplyTo ! {gun_error, self(), Reason}, + %% @todo Send GOAWAY frame. + %% @todo LastGoodStreamID + close; + _ -> + terminate(State, Reason) + end. + +stream_reset(State=#http2_state{socket=Socket, transport=Transport, streams=Streams0}, StreamID, StreamError={stream_error, Reason, _}) -> Transport:send(Socket, cow_http2:rst_stream(StreamID, Reason)), case lists:keytake(StreamID, #stream.id, Streams0) of - {value, #stream{ref=StreamRef}, Streams} -> - Owner ! {gun_error, self(), StreamRef, StreamError}, + {value, #stream{ref=StreamRef, reply_to=ReplyTo}, Streams} -> + ReplyTo ! {gun_error, self(), StreamRef, StreamError}, State#http2_state{streams=Streams}; false -> %% @todo Unknown stream. Not sure what to do here. Check again once all @@ -358,22 +372,22 @@ stream_reset(State=#http2_state{owner=Owner, socket=Socket, transport=Transport, State end. -error_stream_closed(State=#http2_state{owner=Owner}, StreamRef) -> - Owner ! {gun_error, self(), StreamRef, {badstate, +error_stream_closed(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream has already been closed."}}, State. -error_stream_not_found(State=#http2_state{owner=Owner}, StreamRef) -> - Owner ! {gun_error, self(), StreamRef, {badstate, +error_stream_not_found(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream cannot be found."}}, State. %% Streams. %% @todo probably change order of args and have state first? -new_stream(StreamID, StreamRef, Remote, Local, +new_stream(StreamID, StreamRef, ReplyTo, Remote, Local, State=#http2_state{streams=Streams}) -> - New = #stream{id=StreamID, ref=StreamRef, remote=Remote, local=Local}, + New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, remote=Remote, local=Local}, State#http2_state{streams=[New|Streams]}. get_stream_by_id(StreamID, #http2_state{streams=Streams}) -> diff --git a/src/gun_spdy.erl b/src/gun_spdy.erl index dcd7496..b7306a8 100644 --- a/src/gun_spdy.erl +++ b/src/gun_spdy.erl @@ -20,15 +20,16 @@ -export([handle/2]). -export([close/1]). -export([keepalive/1]). --export([request/7]). -export([request/8]). --export([data/4]). --export([cancel/2]). +-export([request/9]). +-export([data/5]). +-export([cancel/3]). -export([down/1]). -record(stream, { id :: non_neg_integer(), ref :: reference(), + reply_to :: pid(), in :: boolean(), %% true = open out :: boolean(), %% true = open version :: binary() @@ -75,38 +76,36 @@ handle_loop(Data, State=#spdy_state{zinf=Zinf}) -> State#spdy_state{buffer=Data} end. -handle_frame(Rest, State=#spdy_state{owner=Owner, - socket=Socket, transport=Transport}, +handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport}, {data, StreamID, IsFin, Data}) -> case get_stream_by_id(StreamID, State) of #stream{in=false} -> Transport:send(Socket, cow_spdy:rst_stream(StreamID, stream_already_closed)), handle_loop(Rest, delete_stream(StreamID, State)); - S = #stream{ref=StreamRef} when IsFin -> - Owner ! {gun_data, self(), StreamRef, fin, Data}, + S = #stream{ref=StreamRef, reply_to=ReplyTo} when IsFin -> + ReplyTo ! {gun_data, self(), StreamRef, fin, Data}, handle_loop(Rest, in_fin_stream(S, State)); - #stream{ref=StreamRef} -> - Owner ! {gun_data, self(), StreamRef, nofin, Data}, + #stream{ref=StreamRef, reply_to=ReplyTo} -> + ReplyTo ! {gun_data, self(), StreamRef, nofin, Data}, handle_loop(Rest, State); false -> Transport:send(Socket, cow_spdy:rst_stream(StreamID, invalid_stream)), handle_loop(Rest, delete_stream(StreamID, State)) end; -handle_frame(Rest, State=#spdy_state{owner=Owner, - socket=Socket, transport=Transport}, +handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport}, {syn_stream, StreamID, AssocToStreamID, IsFin, IsUnidirectional, _, Method, Scheme, Host, Path, Version, Headers}) when AssocToStreamID =/= 0, IsUnidirectional -> case get_stream_by_id(StreamID, State) of false -> case get_stream_by_id(AssocToStreamID, State) of - #stream{ref=AssocToStreamRef} -> + #stream{ref=AssocToStreamRef, reply_to=ReplyTo} -> StreamRef = make_ref(), - Owner ! {gun_push, self(), AssocToStreamRef, StreamRef, Method, + ReplyTo ! {gun_push, self(), AssocToStreamRef, StreamRef, Method, iolist_to_binary([Scheme, <<"://">>, Host, Path]), Headers}, - handle_loop(Rest, new_stream(StreamID, StreamRef, + handle_loop(Rest, new_stream(StreamID, StreamRef, ReplyTo, not IsFin, false, Version, State)); false -> Transport:send(Socket, @@ -123,20 +122,19 @@ handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport}, Transport:send(Socket, cow_spdy:rst_stream(StreamID, protocol_error)), handle_loop(Rest, State); -handle_frame(Rest, State=#spdy_state{owner=Owner, - socket=Socket, transport=Transport}, +handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport}, {syn_reply, StreamID, IsFin, Status, _, Headers}) -> case get_stream_by_id(StreamID, State) of #stream{in=false} -> Transport:send(Socket, cow_spdy:rst_stream(StreamID, stream_already_closed)), handle_loop(Rest, delete_stream(StreamID, State)); - S = #stream{ref=StreamRef} when IsFin -> - Owner ! {gun_response, self(), StreamRef, fin, + S = #stream{ref=StreamRef, reply_to=ReplyTo} when IsFin -> + ReplyTo ! {gun_response, self(), StreamRef, fin, parse_status(Status), Headers}, handle_loop(Rest, in_fin_stream(S, State)); - #stream{ref=StreamRef} -> - Owner ! {gun_response, self(), StreamRef, nofin, + #stream{ref=StreamRef, reply_to=ReplyTo} -> + ReplyTo ! {gun_response, self(), StreamRef, nofin, parse_status(Status), Headers}, handle_loop(Rest, State); false -> @@ -144,11 +142,10 @@ handle_frame(Rest, State=#spdy_state{owner=Owner, cow_spdy:rst_stream(StreamID, invalid_stream)), handle_loop(Rest, delete_stream(StreamID, State)) end; -handle_frame(Rest, State=#spdy_state{owner=Owner}, - {rst_stream, StreamID, Status}) -> +handle_frame(Rest, State, {rst_stream, StreamID, Status}) -> case get_stream_by_id(StreamID, State) of - #stream{ref=StreamRef} -> - Owner ! {gun_error, self(), StreamRef, Status}, + #stream{ref=StreamRef, reply_to=ReplyTo} -> + ReplyTo ! {gun_error, self(), StreamRef, Status}, handle_loop(Rest, delete_stream(StreamID, State)); false -> handle_loop(Rest, State) @@ -177,10 +174,12 @@ handle_frame(Rest, State, {window_update, StreamID, DeltaWindowSize}) -> error_logger:error_msg("Ignored WINDOW_UPDATE control frame ~p ~p~n", [StreamID, DeltaWindowSize]), handle_loop(Rest, State); -handle_frame(_, #spdy_state{owner=Owner, socket=Socket, transport=Transport}, +handle_frame(_, #spdy_state{streams=Streams, socket=Socket, transport=Transport}, {error, badprotocol}) -> - Owner ! {gun_error, self(), {badprotocol, - "The remote endpoint sent invalid data."}}, + %% Because a particular stream is unknown, + %% we're sending the error message to all streams. + Reason = {badprotocol, "The remote endpoint sent invalid data."}, + _ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams], %% @todo LastGoodStreamID Transport:send(Socket, cow_spdy:goaway(0, protocol_error)), close. @@ -189,15 +188,15 @@ parse_status(Status) -> << Code:3/binary, _/bits >> = Status, list_to_integer(binary_to_list(Code)). -close(#spdy_state{owner=Owner, streams=Streams}) -> - close_streams(Owner, Streams). +close(#spdy_state{streams=Streams}) -> + close_streams(Streams). -close_streams(_, []) -> +close_streams([]) -> ok; -close_streams(Owner, [#stream{ref=StreamRef}|Tail]) -> - Owner ! {gun_error, self(), StreamRef, {closed, +close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> + ReplyTo ! {gun_error, self(), StreamRef, {closed, "The connection was lost."}}, - close_streams(Owner, Tail). + close_streams(Tail). keepalive(State=#spdy_state{socket=Socket, transport=Transport, ping_id=PingID}) -> @@ -206,20 +205,20 @@ keepalive(State=#spdy_state{socket=Socket, transport=Transport, %% @todo Always https scheme? request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef, - stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers) -> + stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) -> {Host2, Headers2} = prepare_request(Headers, Host, Port), Out = (false =/= lists:keyfind(<<"content-type">>, 1, Headers2)) orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers2)), Transport:send(Socket, cow_spdy:syn_stream(Zdef, StreamID, 0, not Out, false, 0, Method, <<"https">>, Host2, Path, <<"HTTP/1.1">>, Headers2)), - new_stream(StreamID, StreamRef, true, Out, <<"HTTP/1.1">>, + new_stream(StreamID, StreamRef, ReplyTo, true, Out, <<"HTTP/1.1">>, State#spdy_state{stream_id=StreamID + 2}). %% @todo Handle Body > 16MB. (split it out into many frames) %% @todo Always https scheme? request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef, - stream_id=StreamID}, StreamRef, Method, Host, Port, Path, Headers, Body) -> + stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body) -> {Host2, Headers2} = prepare_request(Headers, Host, Port), Headers3 = lists:keystore(<<"content-length">>, 1, Headers2, {<<"content-length">>, integer_to_binary(iolist_size(Body))}), @@ -229,7 +228,7 @@ request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef, Method, <<"https">>, Host2, Path, <<"HTTP/1.1">>, Headers3), cow_spdy:data(StreamID, true, Body) ]), - new_stream(StreamID, StreamRef, true, false, <<"HTTP/1.1">>, + new_stream(StreamID, StreamRef, ReplyTo, true, false, <<"HTTP/1.1">>, State#spdy_state{stream_id=StreamID + 2}). prepare_request(Headers, Host, Port) -> @@ -243,10 +242,10 @@ prepare_request(Headers, Host, Port) -> end. data(State=#spdy_state{socket=Socket, transport=Transport}, - StreamRef, IsFin, Data) -> + StreamRef, ReplyTo, IsFin, Data) -> case get_stream_by_ref(StreamRef, State) of #stream{out=false} -> - error_stream_closed(State, StreamRef); + error_stream_closed(State, StreamRef, ReplyTo); S = #stream{} -> IsFin2 = IsFin =:= fin, Transport:send(Socket, cow_spdy:data(S#stream.id, IsFin2, Data)), @@ -256,17 +255,17 @@ data(State=#spdy_state{socket=Socket, transport=Transport}, State end; false -> - error_stream_not_found(State, StreamRef) + error_stream_not_found(State, StreamRef, ReplyTo) end. cancel(State=#spdy_state{socket=Socket, transport=Transport}, - StreamRef) -> + StreamRef, ReplyTo) -> case get_stream_by_ref(StreamRef, State) of #stream{id=StreamID} -> Transport:send(Socket, cow_spdy:rst_stream(StreamID, cancel)), delete_stream(StreamID, State); false -> - error_stream_not_found(State, StreamRef) + error_stream_not_found(State, StreamRef, ReplyTo) end. %% @todo Add unprocessed streams when GOAWAY handling is done. @@ -274,22 +273,22 @@ down(#spdy_state{streams=Streams}) -> KilledStreams = [Ref || #stream{ref=Ref} <- Streams], {KilledStreams, []}. -error_stream_closed(State=#spdy_state{owner=Owner}, StreamRef) -> - Owner ! {gun_error, self(), StreamRef, {badstate, +error_stream_closed(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream has already been closed."}}, State. -error_stream_not_found(State=#spdy_state{owner=Owner}, StreamRef) -> - Owner ! {gun_error, self(), StreamRef, {badstate, +error_stream_not_found(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream cannot be found."}}, State. %% Streams. %% @todo probably change order of args and have state first? -new_stream(StreamID, StreamRef, In, Out, Version, +new_stream(StreamID, StreamRef, ReplyTo, In, Out, Version, State=#spdy_state{streams=Streams}) -> - New = #stream{id=StreamID, ref=StreamRef, + New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo, in=In, out=Out, version=Version}, State#spdy_state{streams=[New|Streams]}. diff --git a/test/gun_SUITE.erl b/test/gun_SUITE.erl index 64b8885..171c874 100644 --- a/test/gun_SUITE.erl +++ b/test/gun_SUITE.erl @@ -75,6 +75,32 @@ gone_reason(_) -> error(timeout) end. +reply_to(_) -> + doc("The reply_to option allows using a separate process for requests."), + do_reply_to(http), + do_reply_to(http2). + +do_reply_to(Protocol) -> + Self = self(), + {ok, Pid} = gun:open("google.com", 443, #{protocols => [Protocol]}), + {ok, Protocol} = gun:await_up(Pid), + ReplyTo = spawn(fun() -> + receive Ref -> + Response = gun:await(Pid, Ref), + Self ! Response + after 1000 -> + error(timeout) + end + end), + Ref = gun:get(Pid, "/", [{<<"host">>, <<"google.com">>}], #{reply_to => ReplyTo}), + ReplyTo ! Ref, + receive + {response, _, _, _} -> + ok + after 1000 -> + error(timeout) + end. + retry_0(_) -> doc("Ensure Gun gives up immediately with retry=0."), {ok, Pid} = gun:open("localhost", 12345, #{retry => 0, retry_timeout => 500}), -- cgit v1.2.3