aboutsummaryrefslogtreecommitdiffstats
path: root/src/gun_http.erl
diff options
context:
space:
mode:
authorAndrei Nesterov <[email protected]>2016-11-16 20:57:11 +0300
committerLoïc Hoguin <[email protected]>2017-05-01 17:31:56 +0200
commit0a181681223aead4043b2437fe493652db6e5f8a (patch)
tree807f4b07b0a1995f7c40af1fde1ad9b330a0ba31 /src/gun_http.erl
parentfb4bd38ffd2c330cbd677d958477aa909210a0b3 (diff)
downloadgun-0a181681223aead4043b2437fe493652db6e5f8a.tar.gz
gun-0a181681223aead4043b2437fe493652db6e5f8a.tar.bz2
gun-0a181681223aead4043b2437fe493652db6e5f8a.zip
Add support for choosing a process to reply to
Diffstat (limited to 'src/gun_http.erl')
-rw-r--r--src/gun_http.erl81
1 files changed, 41 insertions, 40 deletions
diff --git a/src/gun_http.erl b/src/gun_http.erl
index 8ebd42e..2388182 100644
--- a/src/gun_http.erl
+++ b/src/gun_http.erl
@@ -20,10 +20,10 @@
-export([handle/2]).
-export([close/1]).
-export([keepalive/1]).
--export([request/7]).
-export([request/8]).
--export([data/4]).
--export([cancel/2]).
+-export([request/9]).
+-export([data/5]).
+-export([cancel/3]).
-export([down/1]).
-export([ws_upgrade/7]).
@@ -34,6 +34,7 @@
-record(stream, {
ref :: reference() | websocket_info(),
+ reply_to :: pid(),
method :: binary(),
is_alive :: boolean(),
handler_state :: undefined | gun_content_handler:state()
@@ -47,7 +48,6 @@
content_handlers :: gun_content_handler:opt(),
connection = keepalive :: keepalive | close,
buffer = <<>> :: binary(),
- %% Stream reference, request method and whether the stream is alive.
streams = [] :: [#stream{}],
in = head :: io(),
in_state = {0, 0} :: {non_neg_integer(), non_neg_integer()},
@@ -159,9 +159,10 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) ->
end
end.
-handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion,
+handle_head(Data, State=#http_state{version=ClientVersion,
content_handlers=Handlers0, connection=Conn,
- streams=[Stream=#stream{ref=StreamRef, method=Method, is_alive=IsAlive}|Tail]}) ->
+ streams=[Stream=#stream{ref=StreamRef, reply_to=ReplyTo,
+ method=Method, is_alive=IsAlive}|Tail]}) ->
{Version, Status, _, Rest} = cow_http:parse_status_line(Data),
{Headers, Rest2} = cow_http:parse_headers(Rest),
case {Status, StreamRef} of
@@ -178,13 +179,12 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion,
{websocket, SR, _, _, _} -> SR;
_ -> StreamRef
end,
- Owner ! {gun_response, self(), StreamRef2,
+ ReplyTo ! {gun_response, self(), StreamRef2,
IsFin, Status, Headers},
- %% @todo Change to ReplyTo.
case IsFin of
fin -> undefined;
nofin ->
- gun_content_handler:init(Owner, StreamRef,
+ gun_content_handler:init(ReplyTo, StreamRef,
Status, Headers, Handlers0)
end
end,
@@ -219,20 +219,20 @@ send_data_if_alive(Data, State=#http_state{streams=[Stream=#stream{
send_data_if_alive(_, State, _) ->
State.
-close(State=#http_state{in=body_close, owner=Owner, streams=[_|Tail]}) ->
+close(State=#http_state{in=body_close, streams=[_|Tail]}) ->
_ = send_data_if_alive(<<>>, State, fin),
- close_streams(Owner, Tail);
-close(#http_state{owner=Owner, streams=Streams}) ->
- close_streams(Owner, Streams).
+ close_streams(Tail);
+close(#http_state{streams=Streams}) ->
+ close_streams(Streams).
-close_streams(_, []) ->
+close_streams([]) ->
ok;
-close_streams(Owner, [#stream{is_alive=false}|Tail]) ->
- close_streams(Owner, Tail);
-close_streams(Owner, [#stream{ref=StreamRef}|Tail]) ->
- Owner ! {gun_error, self(), StreamRef, {closed,
+close_streams([#stream{is_alive=false}|Tail]) ->
+ close_streams(Tail);
+close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {closed,
"The connection was lost."}},
- close_streams(Owner, Tail).
+ close_streams(Tail).
%% We can only keep-alive by sending an empty line in-between streams.
keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) ->
@@ -242,7 +242,7 @@ keepalive(State) ->
State.
request(State=#http_state{socket=Socket, transport=Transport, version=Version,
- out=head}, StreamRef, Method, Host, Port, Path, Headers) ->
+ out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) ->
Headers2 = lists:keydelete(<<"transfer-encoding">>, 1, Headers),
Headers3 = case lists:keymember(<<"host">>, 1, Headers) of
false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2];
@@ -256,10 +256,10 @@ request(State=#http_state{socket=Socket, transport=Transport, version=Version,
_ -> Headers3
end,
Transport:send(Socket, cow_http:request(Method, Path, Version, Headers4)),
- new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, Method).
+ new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method).
request(State=#http_state{socket=Socket, transport=Transport, version=Version,
- out=head}, StreamRef, Method, Host, Port, Path, Headers, Body) ->
+ out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body) ->
Headers2 = lists:keydelete(<<"content-length">>, 1,
lists:keydelete(<<"transfer-encoding">>, 1, Headers)),
Headers3 = case lists:keymember(<<"host">>, 1, Headers) of
@@ -273,17 +273,17 @@ request(State=#http_state{socket=Socket, transport=Transport, version=Version,
{<<"content-length">>, integer_to_binary(iolist_size(Body))}
|Headers3]),
Body]),
- new_stream(State#http_state{connection=Conn}, StreamRef, Method).
+ new_stream(State#http_state{connection=Conn}, StreamRef, ReplyTo, Method).
%% We are expecting a new stream.
-data(State=#http_state{out=head}, StreamRef, _, _) ->
- error_stream_closed(State, StreamRef);
+data(State=#http_state{out=head}, StreamRef, ReplyTo, _, _) ->
+ error_stream_closed(State, StreamRef, ReplyTo);
%% There are no active streams.
-data(State=#http_state{streams=[]}, StreamRef, _, _) ->
- error_stream_not_found(State, StreamRef);
+data(State=#http_state{streams=[]}, StreamRef, ReplyTo, _, _) ->
+ error_stream_not_found(State, StreamRef, ReplyTo);
%% We can only send data on the last created stream.
data(State=#http_state{socket=Socket, transport=Transport, version=Version,
- out=Out, streams=Streams}, StreamRef, IsFin, Data) ->
+ out=Out, streams=Streams}, StreamRef, ReplyTo, IsFin, Data) ->
case lists:last(Streams) of
#stream{ref=StreamRef, is_alive=true} ->
DataLength = iolist_size(Data),
@@ -316,16 +316,16 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version,
State
end;
_ ->
- error_stream_not_found(State, StreamRef)
+ error_stream_not_found(State, StreamRef, ReplyTo)
end.
%% We can't cancel anything, we can just stop forwarding messages to the owner.
-cancel(State, StreamRef) ->
+cancel(State, StreamRef, ReplyTo) ->
case is_stream(State, StreamRef) of
true ->
cancel_stream(State, StreamRef);
false ->
- error_stream_not_found(State, StreamRef)
+ error_stream_not_found(State, StreamRef, ReplyTo)
end.
%% HTTP does not provide any way to figure out what streams are unprocessed.
@@ -336,13 +336,13 @@ down(#http_state{streams=Streams}) ->
end || #stream{ref=Ref} <- Streams],
{KilledStreams, []}.
-error_stream_closed(State=#http_state{owner=Owner}, StreamRef) ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
+error_stream_closed(State, StreamRef, ReplyTo) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"The stream has already been closed."}},
State.
-error_stream_not_found(State=#http_state{owner=Owner}, StreamRef) ->
- Owner ! {gun_error, self(), StreamRef, {badstate,
+error_stream_not_found(State, StreamRef, ReplyTo) ->
+ ReplyTo ! {gun_error, self(), StreamRef, {badstate,
"The stream cannot be found."}},
State.
@@ -401,12 +401,13 @@ response_io_from_headers(_, Version, _Status, Headers) ->
%% Streams.
-new_stream(State=#http_state{streams=Streams}, StreamRef, Method) ->
+new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method) ->
State#http_state{streams=Streams
- ++ [#stream{ref=StreamRef, method=iolist_to_binary(Method), is_alive=true}]}.
+ ++ [#stream{ref=StreamRef, reply_to=ReplyTo,
+ method=iolist_to_binary(Method), is_alive=true}]}.
is_stream(#http_state{streams=Streams}, StreamRef) ->
- lists:keymember(StreamRef, 1, Streams).
+ lists:keymember(StreamRef, #stream.ref, Streams).
cancel_stream(State=#http_state{streams=Streams}, StreamRef) ->
Streams2 = [case Ref of
@@ -425,7 +426,7 @@ end_stream(State=#http_state{streams=[_|Tail]}) ->
%% Ensure version is 1.1.
ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) ->
error; %% @todo
-ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head},
+ws_upgrade(State=#http_state{socket=Socket, transport=Transport, owner=Owner, out=head},
StreamRef, Host, Port, Path, Headers0, WsOpts) ->
{Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of
true -> {[{<<"sec-websocket-extensions">>,
@@ -457,7 +458,7 @@ ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head},
end,
Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers)),
new_stream(State#http_state{connection=keepalive, out=head},
- {websocket, StreamRef, Key, GunExtensions, WsOpts}, <<"GET">>).
+ {websocket, StreamRef, Key, GunExtensions, WsOpts}, Owner, <<"GET">>).
ws_handshake(Buffer, State, Headers, Key, GunExtensions, Opts) ->
%% @todo check upgrade, connection