aboutsummaryrefslogblamecommitdiffstats
path: root/src/gun_http.erl
blob: d29ca2d6991a2fe5d7e12b96e308d20280d87e1b (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
                                                             














                                                                           
                  


                       
                     
                     

                    
                        
 


                                                

                                                                           

                                                                                                   



                                                  
                                                   

                                                    
                                                                                                       




                                                           




                                                                     
 


                                         


                                                                 







                                                                              

                                                 


                                                                 

                                                            
                       
                                                         


                                                                          
                                                                          


                                                                                      
                                                



























                                                                                                 
                                                                       
















                                                                             


                                                                       
                                                        

































                                                                                                                 
            
 

                                     






















                                                                         
                                                                             
                                           

                   

              
                                                                              
                                                                           




                                                                                   
                                                                                      



                                                             
                                                                            

                                                                                  

                                                                          
                                                                              
                                                                                 

                                                                      
                                                                  
                                                                                      


                                                             
                                           

                                                         
                                                                                  

                            








                                                                 

                                                                           

                                    
                                   


                                                                                          
                                                                                                         

                                                                                

                                                                                        

                                                          
                                                                   
                                                                           
                                                                                        
                                              
                                                                                
                                                                     
                                                                           
                                          
                                                                               
                                                                                   
                                                                               
                                                                                             



                                                                     






























                                                                               




                                                                        














                                                                                  



                                                                                       





















                                                                 




















































































                                                                                                                                         
%% Copyright (c) 2014-2015, Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(gun_http).

-export([init/4]).
-export([handle/2]).
-export([close/1]).
-export([keepalive/1]).
-export([request/7]).
-export([request/8]).
-export([data/4]).
-export([cancel/2]).
-export([ws_upgrade/7]).

-type opts() :: [{version, cow_http:version()}].
-export_type([opts/0]).

%% What is expected next on the wire, for either direction.
-type io() :: head | {body, non_neg_integer()} | body_close | body_chunked.

%% Identifier of a stream that has a Websocket upgrade pending:
%% stream reference, key, extensions, protocols and Websocket options.
%% This is the tuple built in ws_upgrade/7 and matched in handle_head/2.
-type websocket_info() :: {websocket, reference(), binary(), [binary()], [], map()}.

%% State of one HTTP/1.x connection.
-record(http_state, {
	%% Process that receives the gun_response, gun_data and gun_error messages.
	owner :: pid(),
	%% Socket and transport module used to send requests.
	socket :: inet:socket() | ssl:sslsocket(),
	transport :: module(),
	%% HTTP version used when building requests; set by the version option.
	version = 'HTTP/1.1' :: cow_http:version(),
	%% Whether the connection stays open once the current response ends.
	connection = keepalive :: keepalive | close,
	%% Received data that could not be processed yet.
	buffer = <<>> :: binary(),
	%% Streams in request order: new streams are appended at the end and
	%% incoming data always belongs to the first one.
	streams = [] :: [{reference() | websocket_info(), boolean()}], %% ref + whether stream is alive
	%% What the next incoming data is expected to be.
	in = head :: io(),
	%% State handed to cow_http_te:stream_chunked/2; reset to {0, 0}
	%% every time a response head is parsed.
	in_state :: {non_neg_integer(), non_neg_integer()},
	%% What can be sent next; head means a new request may be sent.
	out = head :: io()
}).

%% Build the initial connection state. The only supported option is
%% {version, Version}; without it the record default is used.
init(Owner, Socket, Transport, [{version, Version}]) ->
	Default = init(Owner, Socket, Transport, []),
	Default#http_state{version=Version};
init(Owner, Socket, Transport, []) ->
	#http_state{owner=Owner, socket=Socket, transport=Transport}.

%% Process data received from the socket on behalf of the stream at the
%% head of the queue.
%%
%% Returns the updated state, or the atom close when the connection must
%% be closed. The clause used depends on the input state (the in field):
%% head while waiting for a status line and headers; body_close,
%% body_chunked or {body, Length} while reading a response body.

%% Stop looping when we got no more data.
handle(<<>>, State) ->
	State;
%% Close when server responds and we don't have any open streams.
handle(_, #http_state{streams=[]}) ->
	close;
%% Wait for the full response headers before trying to parse them.
handle(Data, State=#http_state{in=head, buffer=Buffer}) ->
	Data2 = << Buffer/binary, Data/binary >>,
	%% Look for the end of the headers in everything received so far,
	%% not only in the new data: the marker itself may be split across
	%% two packets, in which case it never appears in a single packet.
	case binary:match(Data2, <<"\r\n\r\n">>) of
		nomatch -> State#http_state{buffer=Data2};
		{_, _} -> handle_head(Data2, State#http_state{buffer= <<>>})
	end;
%% Everything sent to the socket until it closes is part of the response body.
handle(Data, State=#http_state{in=body_close}) ->
	send_data_if_alive(Data, State, nofin),
	State;
%% The body is chunked: decode what we can and buffer the remainder.
handle(Data, State=#http_state{in=body_chunked, in_state=InState,
		buffer=Buffer, connection=Conn}) ->
	Buffer2 = << Buffer/binary, Data/binary >>,
	case cow_http_te:stream_chunked(Buffer2, InState) of
		more ->
			State#http_state{buffer=Buffer2};
		{more, Data2, InState2} ->
			send_data_if_alive(Data2, State, nofin),
			State#http_state{buffer= <<>>, in_state=InState2};
		{more, Data2, Length, InState2} when is_integer(Length) ->
			%% @todo See if we can recv faster than one message at a time.
			send_data_if_alive(Data2, State, nofin),
			State#http_state{buffer= <<>>, in_state=InState2};
		{more, Data2, Rest, InState2} ->
			%% @todo See if we can recv faster than one message at a time.
			send_data_if_alive(Data2, State, nofin),
			State#http_state{buffer=Rest, in_state=InState2};
		{done, _TotalLength, Rest} ->
			%% I suppose it doesn't hurt to append an empty binary.
			send_data_if_alive(<<>>, State, fin),
			case Conn of
				keepalive ->
					%% Rest may already contain the next response.
					handle(Rest, end_stream(State#http_state{buffer= <<>>}));
				close ->
					close
			end;
		{done, Data2, _TotalLength, Rest} ->
			send_data_if_alive(Data2, State, fin),
			case Conn of
				keepalive ->
					handle(Rest, end_stream(State#http_state{buffer= <<>>}));
				close ->
					close
			end
	end;
%% We know the length of the rest of the body.
handle(Data, State=#http_state{in={body, Length}, connection=Conn}) ->
	DataSize = byte_size(Data),
	if
		%% More data coming.
		DataSize < Length ->
			send_data_if_alive(Data, State, nofin),
			State#http_state{in={body, Length - DataSize}};
		%% Stream finished, no rest.
		DataSize =:= Length ->
			send_data_if_alive(Data, State, fin),
			case Conn of
				keepalive -> end_stream(State);
				close -> close
			end;
		%% Stream finished, rest.
		true ->
			<< Body:Length/binary, Rest/bits >> = Data,
			send_data_if_alive(Body, State, fin),
			case Conn of
				keepalive -> handle(Rest, end_stream(State));
				close -> close
			end
	end.

%% Parse a complete response head (status line and headers) found at the
%% start of Data, for the stream at the head of the queue.
%%
%% A 101 response to a pending Websocket upgrade continues with the
%% Websocket handshake. Any other response is forwarded to the owner as
%% a gun_response message when the stream is still alive, and the
%% remaining data is then processed according to the response headers.
%% For those responses the result is the new state, or close when the
%% connection must be closed.
handle_head(Data, State=#http_state{owner=Owner, version=ClientVersion,
		connection=Conn, streams=[{StreamRef, IsAlive}|_]}) ->
	{Version, Status, _, Rest} = cow_http:parse_status_line(Data),
	{Headers, Rest2} = cow_http:parse_headers(Rest),
	case {Status, StreamRef} of
		{101, {websocket, _, WsKey, WsExtensions, WsProtocols, WsOpts}} ->
			ws_handshake(Rest2, State, Headers, WsKey, WsExtensions, WsProtocols, WsOpts);
		_ ->
			In = io_from_headers(Version, Headers),
			%% No body expected means the response is already complete.
			IsFin = case In of head -> fin; _ -> nofin end,
			case IsAlive of
				false ->
					ok;
				true ->
					%% The upgrade was refused: report the response using
					%% the reference the user knows about, not the internal
					%% tuple. The tuple has six elements, as built by
					%% ws_upgrade/7.
					StreamRef2 = case StreamRef of
						{websocket, SR, _, _, _, _} -> SR;
						_ -> StreamRef
					end,
					Owner ! {gun_response, self(), StreamRef2,
						IsFin, Status, Headers},
					ok
			end,
			%% Once either side asked for close or speaks HTTP/1.0,
			%% the connection is not kept alive.
			Conn2 = if
				Conn =:= close -> close;
				Version =:= 'HTTP/1.0' -> close;
				ClientVersion =:= 'HTTP/1.0' -> close;
				true -> conn_from_headers(Headers)
			end,
			%% We always reset in_state even if not chunked.
			if
				IsFin =:= fin, Conn2 =:= close ->
					close;
				IsFin =:= fin ->
					handle(Rest2, end_stream(State#http_state{in=In,
						in_state={0, 0}, connection=Conn2}));
				true ->
					handle(Rest2, State#http_state{in=In, in_state={0, 0}, connection=Conn2})
			end
	end.

%% Forward body data to the owner as a gun_data message, but only when
%% the stream at the head of the queue is still alive. Empty data is
%% never forwarded unless it also marks the end of the body.
%% Always returns ok.
send_data_if_alive(<<>>, _, nofin) ->
	ok;
send_data_if_alive(Payload, State, Fin) ->
	case State of
		#http_state{owner=Pid, streams=[{Ref, true}|_]} ->
			Pid ! {gun_data, self(), Ref, Fin, Payload},
			ok;
		_ ->
			ok
	end.

%% The connection is gone: notify the owner about every stream.
%%
%% When the current body was to be read until the connection closes,
%% the first stream ends normally with an empty final gun_data message;
%% all other alive streams get a gun_error message.
close(State=#http_state{in=In, owner=Owner, streams=Streams}) ->
	case {In, Streams} of
		{body_close, [_|Others]} ->
			send_data_if_alive(<<>>, State, fin),
			close_streams(Owner, Others);
		_ ->
			close_streams(Owner, Streams)
	end.

%% Send a gun_error message to Owner for every stream in the list,
%% skipping those flagged false (no longer alive). Returns ok.
close_streams(Owner, Streams) ->
	lists:foreach(fun
		({_, false}) ->
			ok;
		({StreamRef, _}) ->
			Owner ! {gun_error, self(), StreamRef, {closed,
				"The connection was lost."}}
	end, Streams).

%% We can only keep-alive by sending an empty line in-between streams,
%% so nothing is sent while a request body is still being sent.
%% The state is returned unchanged either way.
keepalive(State) ->
	case State of
		#http_state{socket=Socket, transport=Transport, out=head} ->
			Transport:send(Socket, <<"\r\n">>),
			State;
		_ ->
			State
	end.

%% Send the head of a request given without a body and register its
%% stream. Only possible when a new request may be sent (out is head).
%%
%% A host header is added when the caller did not provide one. The
%% connection and out fields of the returned state are derived from the
%% request headers.
request(State=#http_state{socket=Socket, transport=Transport, version=Version,
		out=head}, StreamRef, Method, Host, Port, Path, Headers) ->
	%% transfer-encoding is removed from HTTP/1.0 requests.
	Sendable = case Version of
		'HTTP/1.1' -> Headers;
		'HTTP/1.0' -> lists:keydelete(<<"transfer-encoding">>, 1, Headers)
	end,
	WithHost = case lists:keyfind(<<"host">>, 1, Headers) of
		false ->
			HostValue = [Host, $:, integer_to_binary(Port)],
			[{<<"host">>, HostValue}|Sendable];
		_ ->
			Sendable
	end,
	%% The list without the added host header is the smallest one.
	Conn = conn_from_headers(Sendable),
	%% @todo This should probably also check for content-type like SPDY.
	Out = io_from_headers(Version, Sendable),
	RequestHead = cow_http:request(Method, Path, Version, WithHost),
	Transport:send(Socket, RequestHead),
	new_stream(State#http_state{connection=Conn, out=Out}, StreamRef).

%% Send a complete request, body included, and register its stream.
%% Only possible when a new request may be sent (out is head).
%%
%% Any content-length or transfer-encoding header given by the caller
%% is dropped: the content-length is computed from Body. A host header
%% is added when the caller did not provide one.
request(State=#http_state{socket=Socket, transport=Transport, version=Version,
		out=head}, StreamRef, Method, Host, Port, Path, Headers, Body) ->
	Stripped = lists:foldl(
		fun(Name, Acc) -> lists:keydelete(Name, 1, Acc) end,
		Headers,
		[<<"transfer-encoding">>, <<"content-length">>]),
	WithHost = case lists:keyfind(<<"host">>, 1, Headers) of
		false ->
			HostValue = [Host, $:, integer_to_binary(Port)],
			[{<<"host">>, HostValue}|Stripped];
		_ ->
			Stripped
	end,
	%% The list without the added host header is the smallest one.
	Conn = conn_from_headers(Stripped),
	ContentLength = {<<"content-length">>, integer_to_list(iolist_size(Body))},
	RequestHead = cow_http:request(Method, Path, Version,
		[ContentLength|WithHost]),
	Transport:send(Socket, [RequestHead, Body]),
	new_stream(State#http_state{connection=Conn}, StreamRef).

%% Send a piece of request body for the given stream.
%%
%% Returns the updated state. When the data cannot be sent because of
%% the current state, a gun_error message is sent to the owner and the
%% state is returned unchanged.
%%
%% NOTE(review): the case on Out below has no clause for body_close,
%% which io_from_headers/2 returns for HTTP/1.0 requests without a
%% content-length. In that situation this function appears to fail with
%% a case_clause error, and the body_chunked HTTP/1.0 clause looks
%% unreachable - confirm the intended behavior.

%% We are expecting a new stream.
data(State=#http_state{out=head}, _, _, _) ->
	error_stream_closed(State);
%% There are no active streams.
data(State=#http_state{streams=[]}, _, _, _) ->
	error_stream_not_found(State);
%% We can only send data on the last created stream.
data(State=#http_state{socket=Socket, transport=Transport, version=Version,
		out=Out, streams=Streams}, StreamRef, IsFin, Data) ->
	case lists:last(Streams) of
		{StreamRef, true} ->
			case Out of
				%% Final piece of a chunked body: terminate the body with
				%% the last chunk and allow a new request to be sent.
				body_chunked when Version =:= 'HTTP/1.1', IsFin =:= fin ->
					case Data of
						<<>> ->
							Transport:send(Socket, cow_http_te:last_chunk());
						_ ->
							Transport:send(Socket, [
								cow_http_te:chunk(Data),
								cow_http_te:last_chunk()
							])
					end,
					State#http_state{out=head};
				body_chunked when Version =:= 'HTTP/1.1' ->
					Transport:send(Socket, cow_http_te:chunk(Data)),
					State;
				%% Body of known length. Data larger than what remains
				%% matches no clause of this case.
				{body, Length} when byte_size(Data) =< Length ->
					Transport:send(Socket, Data),
					Length2 = Length - byte_size(Data),
					%% There is deliberately no catch-all here: fin is only
					%% accepted on the piece that completes the announced
					%% length, and nofin only while data remains. Any other
					%% combination fails with an if_clause error.
					if
						Length2 =:= 0, IsFin =:= fin ->
							State#http_state{out=head};
						Length2 > 0, IsFin =:= nofin ->
							State#http_state{out={body, Length2}}
					end;
				body_chunked -> %% HTTP/1.0
					Transport:send(Socket, Data),
					State
			end;
		%% Either another stream, or a stream that was canceled.
		{_, _} ->
			error_stream_not_found(State)
	end.

%% We can't cancel anything, we can just stop forwarding messages to the owner.
%% An unknown stream reference is reported to the owner instead.
cancel(State, StreamRef) ->
	case is_stream(State, StreamRef) of
		false -> error_stream_not_found(State);
		true -> cancel_stream(State, StreamRef)
	end.

%% Tell the owner that the stream no longer accepts data.
%% The state is returned unchanged.
error_stream_closed(State=#http_state{owner=Owner}) ->
	Reason = {badstate, "The stream has already been closed."},
	Owner ! {gun_error, self(), Reason},
	State.

%% Tell the owner that the stream it referred to is not known.
%% The state is returned unchanged.
error_stream_not_found(State=#http_state{owner=Owner}) ->
	Reason = {badstate, "The stream cannot be found."},
	Owner ! {gun_error, self(), Reason},
	State.

%% Headers information retrieval.

%% Decide from the connection header whether the connection is kept
%% alive. Without the header the answer is keepalive; with it, the
%% keep-alive token must be present, otherwise the answer is close.
conn_from_headers(Headers) ->
	case lists:keyfind(<<"connection">>, 1, Headers) of
		{_, Value} ->
			Tokens = cow_http_hd:parse_connection(Value),
			case lists:member(<<"keep-alive">>, Tokens) of
				false -> close;
				true -> keepalive
			end;
		false ->
			keepalive
	end.

%% Work out from the headers how the body that follows is delimited.
%%
%% Returns head when there is no body, {body, Length} when the length
%% is announced, body_chunked for a chunked body and body_close when
%% the body lasts until the connection closes.
io_from_headers(Version, Headers) ->
	ContentLength = lists:keyfind(<<"content-length">>, 1, Headers),
	io_from_headers(Version, Headers, ContentLength).

%% The third argument is the result of looking up content-length.
io_from_headers(_, _, {_, <<"0">>}) ->
	head;
io_from_headers(_, _, {_, Length}) ->
	{body, cow_http_hd:parse_content_length(Length)};
io_from_headers('HTTP/1.0', _, _) ->
	body_close;
io_from_headers(_, Headers, _) ->
	case lists:keyfind(<<"transfer-encoding">>, 1, Headers) of
		{_, TE} ->
			case cow_http_hd:parse_transfer_encoding(TE) of
				[<<"identity">>] -> body_close;
				[<<"chunked">>] -> body_chunked
			end;
		false ->
			head
	end.

%% Streams.

%% Register a new alive stream at the end of the queue.
new_stream(State=#http_state{streams=Queue}, StreamRef) ->
	Entry = {StreamRef, true},
	State#http_state{streams=lists:append(Queue, [Entry])}.

%% Whether a stream with this reference is in the queue.
is_stream(#http_state{streams=Queue}, StreamRef) ->
	lists:keyfind(StreamRef, 1, Queue) =/= false.

%% Flag the given stream as no longer alive. It stays in the queue, as
%% its response still has to be read from the socket.
cancel_stream(State=#http_state{streams=Queue}, StreamRef) ->
	Flag = fun
		({Ref, _}) when Ref =:= StreamRef -> {Ref, false};
		(Entry) -> Entry
	end,
	State#http_state{streams=[Flag(Entry) || Entry = {_, _} <- Queue]}.

%% Drop the stream at the head of the queue and expect a new response
%% head for the stream that follows it.
end_stream(State=#http_state{streams=[_Finished|Remaining]}) ->
	State#http_state{streams=Remaining, in=head}.

%% Websocket upgrade.

%% Send a Websocket upgrade request and register a stream for it.
%%
%% The stream is identified internally by a websocket tuple carrying
%% the key, extensions, protocols and options needed to validate the
%% response. Headers given by the caller are sent along with the
%% headers required for the upgrade; a host header is added when the
%% caller did not provide one.

%% Ensure version is 1.1.
ws_upgrade(#http_state{version='HTTP/1.0'}, _, _, _, _, _, _) ->
	error; %% @todo
ws_upgrade(State=#http_state{socket=Socket, transport=Transport, out=head},
		StreamRef, Host, Port, Path, Headers, WsOpts) ->
	%% @todo Add option for setting protocol.
	{ExtHeaders, GunExtensions} = case maps:get(compress, WsOpts, false) of
		true -> {[{<<"sec-websocket-extensions">>, <<"permessage-deflate; client_max_window_bits; server_max_window_bits=15">>}],
			[<<"permessage-deflate">>]};
		false -> {[], []}
	end,
	Key = cow_ws:key(),
	%% The caller's headers must be part of the request. Leaving them
	%% out drops them silently and, when the caller provided its own
	%% host header, sends a request without any host header at all.
	Headers2 = [
		{<<"connection">>, <<"upgrade">>},
		{<<"upgrade">>, <<"websocket">>},
		{<<"sec-websocket-version">>, <<"13">>},
		{<<"sec-websocket-key">>, Key}
		|ExtHeaders ++ Headers
	],
	Headers3 = case lists:keymember(<<"host">>, 1, Headers) of
		false -> [{<<"host">>, [Host, $:, integer_to_binary(Port)]}|Headers2];
		true -> Headers2
	end,
	Transport:send(Socket, cow_http:request(<<"GET">>, Path, 'HTTP/1.1', Headers3)),
	new_stream(State#http_state{connection=keepalive, out=head},
		{websocket, StreamRef, Key, GunExtensions, [], WsOpts}).

%% Validate the sec-websocket-accept header of the upgrade response
%% against the key that was sent, then continue with the extensions.
%% Returns close when the header is missing or does not match.
ws_handshake(Buffer, State, Headers, Key, GunExtensions, GunProtocols, Opts) ->
	%% @todo check upgrade, connection
	case lists:keyfind(<<"sec-websocket-accept">>, 1, Headers) of
		{_, Accept} ->
			case cow_ws:encode_key(Key) =:= Accept of
				true ->
					ws_handshake_extensions(Buffer, State, Headers,
						GunExtensions, GunProtocols, Opts);
				false ->
					close
			end;
		false ->
			close
	end.

%% Validate the extensions selected by the server, if any, then
%% continue with the protocols. Returns close when validation fails.
ws_handshake_extensions(Buffer, State, Headers, GunExtensions, GunProtocols, Opts) ->
	case lists:keyfind(<<"sec-websocket-extensions">>, 1, Headers) of
		{_, ExtHd} ->
			Selected = cow_http_hd:parse_sec_websocket_extensions(ExtHd),
			case ws_validate_extensions(Selected, GunExtensions, #{}, Opts) of
				close ->
					close;
				Extensions ->
					ws_handshake_protocols(Buffer, State, Headers,
						Extensions, GunProtocols)
			end;
		false ->
			%% No extension was selected.
			ws_handshake_protocols(Buffer, State, Headers, #{}, GunProtocols)
	end.

%% Check every extension selected by the server against the extensions
%% that were requested. Returns the accumulated extensions when all of
%% them are acceptable, and close otherwise.
ws_validate_extensions([], _, Extensions, _) ->
	Extensions;
ws_validate_extensions([{Name = <<"permessage-deflate">>, Params}|Rest],
		Requested, Extensions, Opts) ->
	case lists:member(Name, Requested) of
		%% Fail the connection if extension was not requested.
		false ->
			close;
		true ->
			case cow_ws:validate_permessage_deflate(Params, Extensions, Opts) of
				error ->
					close;
				{ok, Extensions2} ->
					ws_validate_extensions(Rest, Requested, Extensions2, Opts)
			end
	end;
%% Fail the connection on unknown extension.
ws_validate_extensions(_, _, _, _) ->
	close.

%% @todo Validate protocols.
%% For now only an empty list of requested protocols is accepted, and
%% the handshake always ends with no protocol selected.
ws_handshake_protocols(Buffer, State, _Headers, Extensions, _GunProtocols = []) ->
	ws_handshake_end(Buffer, State, Extensions, []).

%% Finish the handshake: hand the connection over to gun_ws and return
%% what gun_ws:init/5 returns.
ws_handshake_end(Buffer, #http_state{owner=Owner, socket=Socket, transport=Transport}, Extensions, Protocols) ->
	%% Send ourselves the remaining buffer, if any, as a socket data
	%% message using the tag given by the transport.
	if
		Buffer =:= <<>> ->
			ok;
		true ->
			{DataTag, _, _} = Transport:messages(),
			self() ! {DataTag, Socket, Buffer},
			ok
	end,
	gun_ws:init(Owner, Socket, Transport, Extensions, Protocols).