%% Copyright (c) 2014-2015, Loïc Hoguin
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(gun_http).

-export([check_options/1]).
-export([name/0]).
-export([init/4]).
-export([handle/2]).
-export([close/1]).
-export([keepalive/1]).
-export([request/7]).
-export([request/8]).
-export([data/4]).
-export([cancel/2]).
-export([down/1]).
-export([ws_upgrade/7]).

%% Position inside the message currently being read or written:
%% expecting a head, a body with a known number of remaining bytes,
%% a body that runs until the connection closes, or a chunked body.
-type io() :: head | {body, non_neg_integer()} | body_close | body_chunked.

%% Stream reference used for a pending Websocket upgrade. It wraps the
%% caller's reference together with the data needed to finish the handshake.
-type websocket_info() :: {websocket, reference(), binary(), [binary()], [], gun:ws_opts()}. %% key, extensions, protocols, options

-record(http_state, {
    %% Process that receives the gun_response/gun_data/gun_error messages.
    owner :: pid(),
    socket :: inet:socket() | ssl:sslsocket(),
    %% Module used to send on the socket (Transport:send/2).
    transport :: module(),
    version = 'HTTP/1.1' :: cow_http:version(),
    connection = keepalive :: keepalive | close,
    %% Incoming data that could not be processed yet.
    buffer = <<>> :: binary(),
    %% Stream reference, request method and whether the stream is alive.
    %% New streams are appended; responses are matched against the head.
    streams = [] :: [{reference() | websocket_info(), binary(), boolean()}],
    %% Read-side position.
    in = head :: io(),
    %% State threaded through cow_http_te:stream_chunked/2; reset to {0, 0}
    %% whenever a response head has been parsed.
    in_state :: {non_neg_integer(), non_neg_integer()},
    %% Write-side position.
    out = head :: io()
}).

%% Validate the HTTP options map.
%% Returns ok, or {error, {options, {http, Opt}}} for the first entry
%% rejected by do_check_options/1.
check_options(Opts) ->
    do_check_options(maps:to_list(Opts)).
%% Validate the options one at a time. Only `keepalive' (positive integer)
%% and `version' ('HTTP/1.1' or 'HTTP/1.0') are accepted; the first entry
%% that is anything else is reported as an error.
do_check_options([]) ->
    ok;
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
    do_check_options(Opts);
do_check_options([{version, V}|Opts]) when V =:= 'HTTP/1.1'; V =:= 'HTTP/1.0' ->
    do_check_options(Opts);
do_check_options([Opt|_]) ->
    {error, {options, {http, Opt}}}.

%% Name of this protocol.
name() -> http.

%% Build the initial protocol state. The HTTP version defaults to
%% 'HTTP/1.1' when the `version' option is not given.
init(Owner, Socket, Transport, Opts) ->
    Version = maps:get(version, Opts, 'HTTP/1.1'),
    #http_state{owner=Owner, socket=Socket, transport=Transport, version=Version}.

%% Process data received from the socket.
%% Returns the updated state, or `close' when the connection must be closed.

%% Stop looping when we got no more data.
handle(<<>>, State) ->
    State;
%% Close when server responds and we don't have any open streams.
handle(_, #http_state{streams=[]}) ->
    close;
%% Wait for the full response headers before trying to parse them.
handle(Data, State=#http_state{in=head, buffer=Buffer}) ->
    Data2 = << Buffer/binary, Data/binary >>,
    %% Search the accumulated data, not just the new packet: the
    %% terminating empty line may be split across two packets.
    case binary:match(Data2, <<"\r\n\r\n">>) of
        nomatch -> State#http_state{buffer=Data2};
        {_, _} -> handle_head(Data2, State#http_state{buffer= <<>>})
    end;
%% Everything sent to the socket until it closes is part of the response body.
handle(Data, State=#http_state{in=body_close}) ->
    send_data_if_alive(Data, State, nofin),
    State;
handle(Data, State=#http_state{in=body_chunked, in_state=InState,
        buffer=Buffer, connection=Conn}) ->
    Buffer2 = << Buffer/binary, Data/binary >>,
    case cow_http_te:stream_chunked(Buffer2, InState) of
        more ->
            State#http_state{buffer=Buffer2};
        {more, Data2, InState2} ->
            send_data_if_alive(Data2, State, nofin),
            State#http_state{buffer= <<>>, in_state=InState2};
        {more, Data2, Length, InState2} when is_integer(Length) ->
            %% @todo See if we can recv faster than one message at a time.
            send_data_if_alive(Data2, State, nofin),
            State#http_state{buffer= <<>>, in_state=InState2};
        {more, Data2, Rest, InState2} ->
            %% @todo See if we can recv faster than one message at a time.
            send_data_if_alive(Data2, State, nofin),
            State#http_state{buffer=Rest, in_state=InState2};
        {done, _TotalLength, Rest} ->
            %% I suppose it doesn't hurt to append an empty binary.
            send_data_if_alive(<<>>, State, fin),
            case Conn of
                keepalive ->
                    handle(Rest, end_stream(State#http_state{buffer= <<>>}));
                close ->
                    close
            end;
        {done, Data2, _TotalLength, Rest} ->
            send_data_if_alive(Data2, State, fin),
            case Conn of
                keepalive ->
                    handle(Rest, end_stream(State#http_state{buffer= <<>>}));
                close ->
                    close
            end
    end;
%% We know the length of the rest of the body.
handle(Data, State=#http_state{in={body, Length}, connection=Conn}) ->
    DataSize = byte_size(Data),
    if
        %% More data coming.
        DataSize < Length ->
            send_data_if_alive(Data, State, nofin),
            State#http_state{in={body, Length - DataSize}};
        %% Stream finished, no rest.
        DataSize =:= Length ->
            send_data_if_alive(Data, State, fin),
            case Conn of
                keepalive -> end_stream(State);
                close -> close
            end;
        %% Stream finished, rest.
        true ->
            << Body:Length/binary, Rest/bits >> = Data,
            send_data_if_alive(Body, State, fin),
            case Conn of
                keepalive -> handle(Rest, end_stream(State));
                close -> close
            end
    end.

%% Parse a complete response head for the oldest stream, notify the owner
%% (when the stream is still alive) and continue with whatever follows the
%% headers. A 101 response to a pending Websocket upgrade is handed over to
%% the Websocket handshake instead.
handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion,
        connection=Conn, streams=[{StreamRef, Method, IsAlive}|_]}) ->
    {Version, Status, _, Rest} = cow_http:parse_status_line(Data),
    {Headers, Rest2} = cow_http:parse_headers(Rest),
    case {Status, StreamRef} of
        {101, {websocket, _, WsKey, WsExtensions, WsProtocols, WsOpts}} ->
            ws_handshake(Rest2, State, Headers, WsKey, WsExtensions, WsProtocols, WsOpts);
        _ ->
            In = response_io_from_headers(Method, Version, Status, Headers),
            IsFin = case In of head -> fin; _ -> nofin end,
            case IsAlive of
                false ->
                    ok;
                true ->
                    Owner ! {gun_response, self(), stream_ref(StreamRef),
                        IsFin, Status, Headers},
                    ok
            end,
            %% Once either side requires the connection to close, it stays so.
            Conn2 = if
                Conn =:= close -> close;
                Version =:= 'HTTP/1.0' -> close;
                ClientVersion =:= 'HTTP/1.0' -> close;
                true -> conn_from_headers(Version, Headers)
            end,
            %% We always reset in_state even if not chunked.
            if
                IsFin =:= fin, Conn2 =:= close ->
                    close;
                IsFin =:= fin ->
                    handle(Rest2, end_stream(State#http_state{in=In,
                        in_state={0, 0}, connection=Conn2}));
                true ->
                    handle(Rest2, State#http_state{in=In,
                        in_state={0, 0}, connection=Conn2})
            end
    end.

%% Return the reference known to the owner: the internal Websocket upgrade
%% tuple wraps the caller's reference and must never be sent as-is.
stream_ref({websocket, StreamRef, _, _, _, _}) -> StreamRef;
stream_ref(StreamRef) -> StreamRef.

%% Forward body data for the oldest stream to the owner, unless the stream
%% was cancelled. Empty non-final data is never forwarded.
send_data_if_alive(<<>>, _, nofin) ->
    ok;
%% @todo What if we receive data when the HEAD method was used?
send_data_if_alive(Data, #http_state{owner=Owner,
        streams=[{StreamRef, _, true}|_]}, IsFin) ->
    Owner ! {gun_data, self(), stream_ref(StreamRef), IsFin, Data},
    ok;
send_data_if_alive(_, _, _) ->
    ok.

%% The connection went away. A body that was delimited by connection close
%% is now complete; every other alive stream gets an error.
close(State=#http_state{in=body_close, owner=Owner, streams=[_|Tail]}) ->
    send_data_if_alive(<<>>, State, fin),
    close_streams(Owner, Tail);
close(#http_state{owner=Owner, streams=Streams}) ->
    close_streams(Owner, Streams).

%% Send a gun_error message for every stream that is still alive.
close_streams(_, []) ->
    ok;
close_streams(Owner, [{_, _, false}|Tail]) ->
    close_streams(Owner, Tail);
close_streams(Owner, [{StreamRef, _, _}|Tail]) ->
    Owner ! {gun_error, self(), stream_ref(StreamRef),
        {closed, "The connection was lost."}},
    close_streams(Owner, Tail).

%% We can only keep-alive by sending an empty line in-between streams.
keepalive(State=#http_state{socket=Socket, transport=Transport, out=head}) ->
    Transport:send(Socket, <<"\r\n">>),
    State;
keepalive(State) ->
    State.

%% Send a request head without a body. Whether a body will follow through
%% data/4 is derived from the request headers (see request_io_from_headers/1).
%% A host header is added unless the caller provided one, and any
%% caller-provided transfer-encoding header is dropped.
request(State=#http_state{socket=Socket, transport=Transport, version=Version,
        out=head}, StreamRef, Method, Host, Port, Path, Headers) ->
    Headers2 = lists:keydelete(<<"transfer-encoding">>, 1, Headers),
    Headers3 = case lists:keymember(<<"host">>, 1, Headers) of
        false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2];
        true -> Headers2
    end,
    %% We use Headers2 because this is the smallest list.
    Conn = conn_from_headers(Version, Headers2),
    Out = request_io_from_headers(Headers2),
    Headers4 = case Out of
        body_chunked -> [{<<"transfer-encoding">>, <<"chunked">>}|Headers3];
        _ -> Headers3
    end,
    Transport:send(Socket, cow_http:request(Method, Path, Version, Headers4)),
    new_stream(State#http_state{connection=Conn, out=Out}, StreamRef, Method).
%% Send a request with its complete body. The content-length header is
%% computed from the body; any caller-provided content-length or
%% transfer-encoding header is dropped. A host header is added unless the
%% caller provided one.
request(State=#http_state{socket=Socket, transport=Transport, version=Version,
        out=head}, StreamRef, Method, Host, Port, Path, Headers, Body) ->
    Headers2 = lists:keydelete(<<"content-length">>, 1,
        lists:keydelete(<<"transfer-encoding">>, 1, Headers)),
    Headers3 = case lists:keymember(<<"host">>, 1, Headers) of
        false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2];
        true -> Headers2
    end,
    %% We use Headers2 because this is the smallest list.
    Conn = conn_from_headers(Version, Headers2),
    Transport:send(Socket, [
        cow_http:request(Method, Path, Version, [
            {<<"content-length">>, integer_to_binary(iolist_size(Body))}
        |Headers3]),
        Body]),
    new_stream(State#http_state{connection=Conn}, StreamRef, Method).

%% Send a piece of request body for the given stream.
%% Data is iodata; IsFin is `fin' for the last piece and `nofin' otherwise.
%% Returns the updated state. When the stream cannot accept data, a
%% gun_error message is sent to the owner and the state is left unchanged.

%% We are expecting a new stream.
data(State=#http_state{out=head}, StreamRef, _, _) ->
    error_stream_closed(State, StreamRef);
%% There are no active streams.
data(State=#http_state{streams=[]}, StreamRef, _, _) ->
    error_stream_not_found(State, StreamRef);
%% We can only send data on the last created stream.
data(State=#http_state{socket=Socket, transport=Transport, version=Version,
        out=Out, streams=Streams}, StreamRef, IsFin, Data) ->
    case lists:last(Streams) of
        {StreamRef, _, true} ->
            %% Data may be an iolist, so byte_size/1 cannot be used on it.
            DataLength = iolist_size(Data),
            case Out of
                body_chunked when Version =:= 'HTTP/1.1', IsFin =:= fin ->
                    case DataLength of
                        0 ->
                            Transport:send(Socket, cow_http_te:last_chunk());
                        _ ->
                            Transport:send(Socket, [
                                cow_http_te:chunk(Data),
                                cow_http_te:last_chunk()
                            ])
                    end,
                    State#http_state{out=head};
                body_chunked when Version =:= 'HTTP/1.1', DataLength =:= 0 ->
                    %% A zero-length chunk marks the end of a chunked body,
                    %% so empty non-final data must not be put on the wire.
                    State;
                body_chunked when Version =:= 'HTTP/1.1' ->
                    Transport:send(Socket, cow_http_te:chunk(Data)),
                    State;
                {body, Length} when DataLength =< Length ->
                    Transport:send(Socket, Data),
                    Length2 = Length - DataLength,
                    %% Only consistent combinations are handled: the body is
                    %% complete exactly when the caller says so. Any other
                    %% combination raises if_clause.
                    if
                        Length2 =:= 0, IsFin =:= fin ->
                            State#http_state{out=head};
                        Length2 > 0, IsFin =:= nofin ->
                            State#http_state{out={body, Length2}}
                    end;
                body_chunked -> %% HTTP/1.0
                    Transport:send(Socket, Data),
                    State
            end;
        _ ->
            error_stream_not_found(State, StreamRef)
    end.
%% We can't cancel anything, we can just stop forwarding messages to the owner.
cancel(State, StreamRef) ->
    case is_stream(State, StreamRef) of
        true -> cancel_stream(State, StreamRef);
        false -> error_stream_not_found(State, StreamRef)
    end.

%% HTTP does not provide any way to figure out what streams are unprocessed.
%% All streams are therefore reported as killed, using the references
%% known to the caller (Websocket upgrade tuples are unwrapped).
down(#http_state{streams=Streams}) ->
    KilledStreams = [case Ref of
        {websocket, Ref2, _, _, _, _} -> Ref2;
        _ -> Ref
    end || {Ref, _, _} <- Streams],
    {KilledStreams, []}.

%% Tell the owner that the stream no longer accepts data. State is unchanged.
error_stream_closed(State=#http_state{owner=Owner}, StreamRef) ->
    Owner ! {gun_error, self(), StreamRef, {badstate,
        "The stream has already been closed."}},
    State.

%% Tell the owner that the stream is unknown. State is unchanged.
error_stream_not_found(State=#http_state{owner=Owner}, StreamRef) ->
    Owner ! {gun_error, self(), StreamRef, {badstate,
        "The stream cannot be found."}},
    State.

%% Headers information retrieval.

%% Decide whether the connection persists after the current message,
%% following RFC 7230 section 6.3:
%%  - the "close" option always closes the connection;
%%  - otherwise HTTP/1.1 connections persist;
%%  - otherwise HTTP/1.0 connections persist only with "keep-alive".
%% Returns `keepalive' or `close'.
conn_from_headers(Version, Headers) ->
    case lists:keyfind(<<"connection">>, 1, Headers) of
        false when Version =:= 'HTTP/1.0' ->
            close;
        false ->
            keepalive;
        {_, ConnHd} ->
            ConnList = cow_http_hd:parse_connection(ConnHd),
            case lists:member(<<"close">>, ConnList) of
                true ->
                    close;
                false when Version =:= 'HTTP/1.0' ->
                    case lists:member(<<"keep-alive">>, ConnList) of
                        true -> keepalive;
                        false -> close
                    end;
                false ->
                    %% Options such as "upgrade" do not end the connection.
                    keepalive
            end
    end.

%% Decide what follows the request head on the write side:
%%  - content-length of 0: nothing (`head');
%%  - any other content-length: a body of that length;
%%  - no content-length but a content-type: a chunked body;
%%  - neither: nothing.
request_io_from_headers(Headers) ->
    case lists:keyfind(<<"content-length">>, 1, Headers) of
        {_, <<"0">>} ->
            head;
        {_, Length} ->
            {body, cow_http_hd:parse_content_length(Length)};
        _ ->
            case lists:keymember(<<"content-type">>, 1, Headers) of
                true -> body_chunked;
                false -> head
            end
    end.
%% Decide what follows the response head on the read side.
%% Responses to HEAD requests and 204/304 responses never have a body
%% (RFC 7230 section 3.3.3). Otherwise a content-length gives the body
%% length, HTTP/1.0 responses without one run until the connection closes,
%% and HTTP/1.1 responses are chunked or run until close depending on the
%% transfer-encoding header.
response_io_from_headers(<<"HEAD">>, _, _, _) ->
    head;
response_io_from_headers(_, _, 204, _) ->
    head;
response_io_from_headers(_, _, 304, _) ->
    head;
response_io_from_headers(_, Version, _Status, Headers) ->
    case lists:keyfind(<<"content-length">>, 1, Headers) of
        {_, <<"0">>} ->
            head;
        {_, Length} ->
            {body, cow_http_hd:parse_content_length(Length)};
        _ when Version =:= 'HTTP/1.0' ->
            body_close;
        _ ->
            case lists:keyfind(<<"transfer-encoding">>, 1, Headers) of
                false ->
                    body_close;
                {_, TE} ->
                    case cow_http_hd:parse_transfer_encoding(TE) of
                        [<<"chunked">>] -> body_chunked;
                        [<<"identity">>] -> body_close
                    end
            end
    end.

%% Streams.

%% Append a new alive stream. Responses are matched against the head of the
%% list, so streams are kept in request order.
new_stream(State=#http_state{streams=Streams}, StreamRef, Method) ->
    State#http_state{streams=Streams
        ++ [{StreamRef, iolist_to_binary(Method), true}]}.

%% Whether a stream with this exact reference is currently tracked.
is_stream(#http_state{streams=Streams}, StreamRef) ->
    lists:keymember(StreamRef, 1, Streams).

%% Mark the stream as no longer alive so that nothing more is forwarded to
%% the owner for it. The entry keeps its position and its method: the
%% response still has to be read from the socket and matched to it.
cancel_stream(State=#http_state{streams=Streams}, StreamRef) ->
    Streams2 = [case Ref of
        StreamRef -> {Ref, Method, false};
        _ -> Tuple
    end || Tuple = {Ref, Method, _} <- Streams],
    State#http_state{streams=Streams2}.

%% Drop the oldest stream and go back to expecting a response head.
end_stream(State=#http_state{streams=[_|Tail]}) ->
    State#http_state{in=head, streams=Tail}.

%% Websocket upgrade.

%% Ensure version is 1.1.
ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) ->
    error; %% @todo
%% Send the Websocket upgrade request and register it as a new stream.
%% The stream reference is wrapped in a `websocket' tuple holding the key
%% and the requested extensions, which are needed to check the response.
ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head},
        StreamRef, Host, Port, Path, Headers, WsOpts) ->
    %% Only offer permessage-deflate when compression was asked for.
    {ExtHeaders, OfferedExts} = case maps:get(compress, WsOpts, false) of
        false ->
            {Headers, []};
        true ->
            Offer = {<<"sec-websocket-extensions">>,
                <<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>},
            {[Offer|Headers], [<<"permessage-deflate">>]}
    end,
    WsKey = cow_ws:key(),
    UpgradeHeaders = [
        {<<"connection">>, <<"upgrade">>},
        {<<"upgrade">>, <<"websocket">>},
        {<<"sec-websocket-version">>, <<"13">>},
        {<<"sec-websocket-key">>, WsKey}
    |ExtHeaders],
    IsSecure = Transport:secure(),
    %% A host header given by the caller always wins.
    ReqHeaders = case lists:keymember(<<"host">>, 1, Headers) of
        false -> [{<<"host">>, ws_upgrade_host(Host, Port, IsSecure)}|UpgradeHeaders];
        true -> UpgradeHeaders
    end,
    Request = cow_http:request(<<"GET">>, Path, 'HTTP/1.1', ReqHeaders),
    Transport:send(Socket, Request),
    WsRef = {websocket, StreamRef, WsKey, OfferedExts, [], WsOpts},
    new_stream(State#http_state{connection=keepalive, out=head}, WsRef, <<"GET">>).

%% Value of the host header: the port is left out when it is the default
%% one for the transport (80 when not secure, 443 when secure).
ws_upgrade_host(Host, 80, IsSecure) when not IsSecure ->
    Host;
ws_upgrade_host(Host, 443, IsSecure) when IsSecure ->
    Host;
ws_upgrade_host(Host, Port, _) ->
    [Host, $:, integer_to_binary(Port)].

%% Check the sec-websocket-accept header of the 101 response against the
%% key we sent. Returns `close' when it is missing or does not match.
ws_handshake(Buffer, State, Headers, Key, GunExtensions, GunProtocols, Opts) ->
    %% @todo check upgrade, connection
    case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of
        {_, Accept} ->
            Expected = cow_ws:encode_key(Key),
            if
                Accept =:= Expected ->
                    ws_handshake_extensions(Buffer, State, Headers,
                        GunExtensions, GunProtocols, Opts);
                true ->
                    close
            end;
        false ->
            close
    end.

%% Validate the extensions selected by the server, if any, then move on to
%% the protocols. Returns `close' when validation fails.
ws_handshake_extensions(Buffer, State, Headers, GunExtensions, GunProtocols, Opts) ->
    Negotiated = case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of
        false ->
            #{};
        {_, ExtHd} ->
            ServerExts = cow_http_hd:parse_sec_websocket_extensions(ExtHd),
            ws_validate_extensions(ServerExts, GunExtensions, #{}, Opts)
    end,
    case Negotiated of
        close -> close;
        _ -> ws_handshake_protocols(Buffer, State, Headers, Negotiated, GunProtocols)
    end.
%% Check every extension selected by the server against what we offered.
%% Returns the accumulated extension settings, or `close' when the
%% connection must be failed.
ws_validate_extensions([], _, Acc, _) ->
    Acc;
ws_validate_extensions([{<<"permessage-deflate">>, Params}|Tail], GunExts, Acc, Opts) ->
    case lists:member(<<"permessage-deflate">>, GunExts) of
        %% Fail the connection if extension was not requested.
        false -> close;
        true -> ws_validate_deflate(Params, Tail, GunExts, Acc, Opts)
    end;
%% Fail the connection on unknown extension.
ws_validate_extensions(_, _, _, _) ->
    close.

%% Validate the permessage-deflate parameters, then carry on with the
%% remaining extensions.
ws_validate_deflate(Params, Tail, GunExts, Acc, Opts) ->
    case cow_ws:validate_permessage_deflate(Params, Acc, Opts) of
        error -> close;
        {ok, Acc2} -> ws_validate_extensions(Tail, GunExts, Acc2, Opts)
    end.

%% @todo Validate protocols.
%% Only an empty list of requested protocols is supported for now.
ws_handshake_protocols(Buffer, State, Headers, Extensions, []) ->
    ws_handshake_end(Buffer, State, Headers, Extensions, []).

%% Finish the handshake by switching over to the Websocket protocol.
ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket,
        transport=Transport}, Headers, Extensions, Protocols) ->
    ws_requeue_buffer(Buffer, Socket, Transport),
    gun_ws:init(Owner, Socket, Transport, Headers, Extensions, Protocols).

%% Send ourselves the remaining buffer, if any, using the transport's own
%% data message tag.
ws_requeue_buffer(<<>>, _, _) ->
    ok;
ws_requeue_buffer(Buffer, Socket, Transport) ->
    {DataTag, _, _} = Transport:messages(),
    self() ! {DataTag, Socket, Buffer}.