diff options
Diffstat (limited to 'src/gun_http.erl')
-rw-r--r-- | src/gun_http.erl | 81 |
1 files changed, 41 insertions, 40 deletions
diff --git a/src/gun_http.erl b/src/gun_http.erl index 8ebd42e..2388182 100644 --- a/src/gun_http.erl +++ b/src/gun_http.erl @@ -20,10 +20,10 @@ -export([handle/2]). -export([close/1]). -export([keepalive/1]). --export([request/7]). -export([request/8]). --export([data/4]). --export([cancel/2]). +-export([request/9]). +-export([data/5]). +-export([cancel/3]). -export([down/1]). -export([ws_upgrade/7]). @@ -34,6 +34,7 @@ -record(stream, { ref :: reference() | websocket_info(), + reply_to :: pid(), method :: binary(), is_alive :: boolean(), handler_state :: undefined | gun_content_handler:state() @@ -47,7 +48,6 @@ content_handlers :: gun_content_handler:opt(), connection = keepalive :: keepalive | close, buffer = <<>> :: binary(), - %% Stream reference, request method and whether the stream is alive. streams = [] :: [#stream{}], in = head :: io(), in_state = {0, 0} :: {non_neg_integer(), non_neg_integer()}, @@ -159,9 +159,10 @@ handle(Data, State=#http_state{in={body, Length}, connection=Conn}) -> end end. -handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion, +handle_head(Data, State=#http_state{version=ClientVersion, content_handlers=Handlers0, connection=Conn, - streams=[Stream=#stream{ref=StreamRef, method=Method, is_alive=IsAlive}|Tail]}) -> + streams=[Stream=#stream{ref=StreamRef, reply_to=ReplyTo, + method=Method, is_alive=IsAlive}|Tail]}) -> {Version, Status, _, Rest} = cow_http:parse_status_line(Data), {Headers, Rest2} = cow_http:parse_headers(Rest), case {Status, StreamRef} of @@ -178,13 +179,12 @@ handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion, {websocket, SR, _, _, _} -> SR; _ -> StreamRef end, - Owner ! {gun_response, self(), StreamRef2, + ReplyTo ! {gun_response, self(), StreamRef2, IsFin, Status, Headers}, - %% @todo Change to ReplyTo. case IsFin of fin -> undefined; nofin -> - gun_content_handler:init(Owner, StreamRef, + gun_content_handler:init(ReplyTo, StreamRef, Status, Headers, Handlers0) end end, @@ -219,20 +219,20 @@ send_data_if_alive(Data, State=#http_state{streams=[Stream=#stream{ send_data_if_alive(_, State, _) -> State. -close(State=#http_state{in=body_close, owner=Owner, streams=[_|Tail]}) -> +close(State=#http_state{in=body_close, streams=[_|Tail]}) -> _ = send_data_if_alive(<<>>, State, fin), - close_streams(Owner, Tail); -close(#http_state{owner=Owner, streams=Streams}) -> - close_streams(Owner, Streams). + close_streams(Tail); +close(#http_state{streams=Streams}) -> + close_streams(Streams). -close_streams(_, []) -> +close_streams([]) -> ok; -close_streams(Owner, [#stream{is_alive=false}|Tail]) -> - close_streams(Owner, Tail); -close_streams(Owner, [#stream{ref=StreamRef}|Tail]) -> - Owner ! {gun_error, self(), StreamRef, {closed, +close_streams([#stream{is_alive=false}|Tail]) -> + close_streams(Tail); +close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) -> + ReplyTo ! {gun_error, self(), StreamRef, {closed, "The connection was lost."}}, - close_streams(Owner, Tail). + close_streams(Tail). %% We can only keep-alive by sending an empty line in-between streams. keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) -> @@ -242,7 +242,7 @@ keepalive(State) -> State. request(State=#http_state{socket=Socket, transport=Transport, version=Version, - out=head}, StreamRef, Method, Host, Port, Path, Headers) -> + out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) -> Headers2 = lists:keydelete(<<"transfer-encoding">>, 1, Headers), Headers3 = case lists:keymember(<<"host">>, 1, Headers) of false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2]; @@ -256,10 +256,10 @@ request(State=#http_state{socket=Socket, transport=Transport, version=Version, _ -> Headers3 end, Transport:send(Socket, cow_http:request(Method, Path, Version, Headers4)), - new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, Method). + new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, ReplyTo, Method). request(State=#http_state{socket=Socket, transport=Transport, version=Version, - out=head}, StreamRef, Method, Host, Port, Path, Headers, Body) -> + out=head}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body) -> Headers2 = lists:keydelete(<<"content-length">>, 1, lists:keydelete(<<"transfer-encoding">>, 1, Headers)), Headers3 = case lists:keymember(<<"host">>, 1, Headers) of @@ -273,17 +273,17 @@ request(State=#http_state{socket=Socket, transport=Transport, version=Version, {<<"content-length">>, integer_to_binary(iolist_size(Body))} |Headers3]), Body]), - new_stream(State#http_state{connection=Conn}, StreamRef, Method). + new_stream(State#http_state{connection=Conn}, StreamRef, ReplyTo, Method). %% We are expecting a new stream. -data(State=#http_state{out=head}, StreamRef, _, _) -> - error_stream_closed(State, StreamRef); +data(State=#http_state{out=head}, StreamRef, ReplyTo, _, _) -> + error_stream_closed(State, StreamRef, ReplyTo); %% There are no active streams. -data(State=#http_state{streams=[]}, StreamRef, _, _) -> - error_stream_not_found(State, StreamRef); +data(State=#http_state{streams=[]}, StreamRef, ReplyTo, _, _) -> + error_stream_not_found(State, StreamRef, ReplyTo); %% We can only send data on the last created stream. data(State=#http_state{socket=Socket, transport=Transport, version=Version, - out=Out, streams=Streams}, StreamRef, IsFin, Data) -> + out=Out, streams=Streams}, StreamRef, ReplyTo, IsFin, Data) -> case lists:last(Streams) of #stream{ref=StreamRef, is_alive=true} -> DataLength = iolist_size(Data), @@ -316,16 +316,16 @@ data(State=#http_state{socket=Socket, transport=Transport, version=Version, State end; _ -> - error_stream_not_found(State, StreamRef) + error_stream_not_found(State, StreamRef, ReplyTo) end. %% We can't cancel anything, we can just stop forwarding messages to the owner. -cancel(State, StreamRef) -> +cancel(State, StreamRef, ReplyTo) -> case is_stream(State, StreamRef) of true -> cancel_stream(State, StreamRef); false -> - error_stream_not_found(State, StreamRef) + error_stream_not_found(State, StreamRef, ReplyTo) end. %% HTTP does not provide any way to figure out what streams are unprocessed. @@ -336,13 +336,13 @@ down(#http_state{streams=Streams}) -> end || #stream{ref=Ref} <- Streams], {KilledStreams, []}. -error_stream_closed(State=#http_state{owner=Owner}, StreamRef) -> - Owner ! {gun_error, self(), StreamRef, {badstate, +error_stream_closed(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream has already been closed."}}, State. -error_stream_not_found(State=#http_state{owner=Owner}, StreamRef) -> - Owner ! {gun_error, self(), StreamRef, {badstate, +error_stream_not_found(State, StreamRef, ReplyTo) -> + ReplyTo ! {gun_error, self(), StreamRef, {badstate, "The stream cannot be found."}}, State. @@ -401,12 +401,13 @@ response_io_from_headers(_, Version, _Status, Headers) -> %% Streams. -new_stream(State=#http_state{streams=Streams}, StreamRef, Method) -> +new_stream(State=#http_state{streams=Streams}, StreamRef, ReplyTo, Method) -> State#http_state{streams=Streams - ++ [#stream{ref=StreamRef, method=iolist_to_binary(Method), is_alive=true}]}. + ++ [#stream{ref=StreamRef, reply_to=ReplyTo, + method=iolist_to_binary(Method), is_alive=true}]}. is_stream(#http_state{streams=Streams}, StreamRef) -> - lists:keymember(StreamRef, 1, Streams). + lists:keymember(StreamRef, #stream.ref, Streams). cancel_stream(State=#http_state{streams=Streams}, StreamRef) -> Streams2 = [case Ref of @@ -425,7 +426,7 @@ end_stream(State=#http_state{streams=[_|Tail]}) -> %% Ensure version is 1.1. ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) -> error; %% @todo -ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head}, +ws_upgrade(State=#http_state{socket=Socket, transport=Transport, owner=Owner, out=head}, StreamRef, Host, Port, Path, Headers0, WsOpts) -> {Headers1, GunExtensions} = case maps:get(compress, WsOpts, false) of true -> {[{<<"sec-websocket-extensions">>, @@ -457,7 +458,7 @@ ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head}, end, Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers)), new_stream(State#http_state{connection=keepalive, out=head}, - {websocket, StreamRef, Key, GunExtensions, WsOpts}, <<"GET">>). + {websocket, StreamRef, Key, GunExtensions, WsOpts}, Owner, <<"GET">>). ws_handshake(Buffer, State, Headers, Key, GunExtensions, Opts) -> %% @todo check upgrade, connection |