aboutsummaryrefslogblamecommitdiffstats
path: root/src/gun_spdy.erl
blob: b7306a87d37f2e2dd0fda84cc1b70e703062f257 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
                                                             














                                                                           
                           
                  
                  
                    
                   
                       
                     


                     
                  



                                
                          
















                                                  
                      
                                             
 

                       




                                                                    

               
                                        











                                                                            

                                                     

            
                                                                         





                                                                                      

                                                                           
                                                                   

                                                                             





                                                                               
                                                                         
                                                                                
                                                                 
                                                               

                                                 
                                                                        
                                                                                  
                                                               
                                                                                                         
                                                                                                            
                                                                                                  





                                                                                                      




                                                                              




                                                                         
                                                                         





                                                                                      

                                                                          
                                                               
                                                                   

                                                                          
                                                               
                                                 




                                                                               
                                                            
                                                 

                                                                         



























                                                                             
                                                                                 
                                        



                                                                                            

                                                                   

              



                                              

                                      
 
                    
           

                                                                 
                                             
                            





                                                               
                             
                                                                        
                                                                                              


                                                                                    

                                                        
                                                                             
                                                                           

                                                          
                                                            
                             
                                                                        
                                                                                                    


                                                                              


                                                     
                                                                                    

                                                   
                                                                             

                                                          









                                                                                 
                                                           
                                                   

                                                   
                                                                       








                                                                                         
                                                                         


                                                             
                                      




                                                                                      
                                                                         

            




                                                              

                                                           


                                                        

                                                           



                                                
                                                            
 
                                                          
                                                      
                                                                   



                                                           
                                                     

                                                             
                                                       

















                                                                     
%% Copyright (c) 2013-2015, Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(gun_spdy).

-export([check_options/1]).
-export([name/0]).
-export([init/4]).
-export([handle/2]).
-export([close/1]).
-export([keepalive/1]).
-export([request/8]).
-export([request/9]).
-export([data/5]).
-export([cancel/3]).
-export([down/1]).

-record(stream, {
	id :: non_neg_integer(),
	ref :: reference(),
	reply_to :: pid(),
	in :: boolean(), %% true = open
	out :: boolean(), %% true = open
	version :: binary()
}).

-record(spdy_state, {
	owner :: pid(),
	socket :: inet:socket() | ssl:sslsocket(),
	transport :: module(),
	buffer = <<>> :: binary(),
	zdef :: zlib:zstream(),
	zinf :: zlib:zstream(),
	streams = [] :: [#stream{}],
	stream_id = 1 :: non_neg_integer(),
	ping_id = 1 :: non_neg_integer()
}).

check_options(Opts) ->
	do_check_options(maps:to_list(Opts)).

do_check_options([]) ->
	ok;
do_check_options([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
	do_check_options(Opts);
do_check_options([Opt|_]) ->
	{error, {options, {spdy, Opt}}}.

name() -> spdy.

init(Owner, Socket, Transport, _Opts) ->
	#spdy_state{owner=Owner, socket=Socket, transport=Transport,
		zdef=cow_spdy:deflate_init(), zinf=cow_spdy:inflate_init()}.

handle(Data, State=#spdy_state{buffer=Buffer}) ->
	handle_loop(<< Buffer/binary, Data/binary >>,
		State#spdy_state{buffer= <<>>}).

handle_loop(Data, State=#spdy_state{zinf=Zinf}) ->
	case cow_spdy:split(Data) of
		{true, Frame, Rest} ->
			P = cow_spdy:parse(Frame, Zinf),
			handle_frame(Rest, State, P);
		false ->
			State#spdy_state{buffer=Data}
	end.

handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport},
		{data, StreamID, IsFin, Data}) ->
	case get_stream_by_id(StreamID, State) of
		#stream{in=false} ->
			Transport:send(Socket,
				cow_spdy:rst_stream(StreamID, stream_already_closed)),
			handle_loop(Rest, delete_stream(StreamID, State));
		S = #stream{ref=StreamRef, reply_to=ReplyTo} when IsFin ->
			ReplyTo ! {gun_data, self(), StreamRef, fin, Data},
			handle_loop(Rest, in_fin_stream(S, State));
		#stream{ref=StreamRef, reply_to=ReplyTo} ->
			ReplyTo ! {gun_data, self(), StreamRef, nofin, Data},
			handle_loop(Rest, State);
		false ->
			Transport:send(Socket,
				cow_spdy:rst_stream(StreamID, invalid_stream)),
			handle_loop(Rest, delete_stream(StreamID, State))
	end;
handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport},
		{syn_stream, StreamID, AssocToStreamID, IsFin, IsUnidirectional,
		_, Method, Scheme, Host, Path, Version, Headers})
		when AssocToStreamID =/= 0, IsUnidirectional ->
	case get_stream_by_id(StreamID, State) of
		false ->
			case get_stream_by_id(AssocToStreamID, State) of
				#stream{ref=AssocToStreamRef, reply_to=ReplyTo} ->
					StreamRef = make_ref(),
					ReplyTo ! {gun_push, self(), AssocToStreamRef, StreamRef, Method,
						iolist_to_binary([Scheme, <<"://">>, Host, Path]), Headers},
					handle_loop(Rest, new_stream(StreamID, StreamRef, ReplyTo,
						not IsFin, false, Version, State));
				false ->
					Transport:send(Socket,
						cow_spdy:rst_stream(AssocToStreamID, invalid_stream)),
					handle_loop(Rest, State)
			end;
		#stream{} ->
			Transport:send(Socket,
				cow_spdy:rst_stream(StreamID, stream_in_use)),
			handle_loop(Rest, State)
	end;
handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport},
		{syn_stream, StreamID, _, _, _, _, _, _, _, _, _, _}) ->
	Transport:send(Socket,
		cow_spdy:rst_stream(StreamID, protocol_error)),
	handle_loop(Rest, State);
handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport},
		{syn_reply, StreamID, IsFin, Status, _, Headers}) ->
	case get_stream_by_id(StreamID, State) of
		#stream{in=false} ->
			Transport:send(Socket,
				cow_spdy:rst_stream(StreamID, stream_already_closed)),
			handle_loop(Rest, delete_stream(StreamID, State));
		S = #stream{ref=StreamRef, reply_to=ReplyTo} when IsFin ->
			ReplyTo ! {gun_response, self(), StreamRef, fin,
				parse_status(Status), Headers},
			handle_loop(Rest, in_fin_stream(S, State));
		#stream{ref=StreamRef, reply_to=ReplyTo} ->
			ReplyTo ! {gun_response, self(), StreamRef, nofin,
				parse_status(Status), Headers},
			handle_loop(Rest, State);
		false ->
			Transport:send(Socket,
				cow_spdy:rst_stream(StreamID, invalid_stream)),
			handle_loop(Rest, delete_stream(StreamID, State))
	end;
handle_frame(Rest, State, {rst_stream, StreamID, Status}) ->
	case get_stream_by_id(StreamID, State) of
		#stream{ref=StreamRef, reply_to=ReplyTo} ->
			ReplyTo ! {gun_error, self(), StreamRef, Status},
			handle_loop(Rest, delete_stream(StreamID, State));
		false ->
			handle_loop(Rest, State)
	end;
handle_frame(Rest, State, {settings, ClearSettings, Settings}) ->
	error_logger:error_msg("Ignored SETTINGS control frame ~p ~p~n",
		[ClearSettings, Settings]),
	handle_loop(Rest, State);
%% Server PING.
handle_frame(Rest, State=#spdy_state{socket=Socket, transport=Transport},
		{ping, PingID}) when PingID rem 2 =:= 0 ->
	Transport:send(Socket, cow_spdy:ping(PingID)),
	handle_loop(Rest, State);
%% Client PING.
handle_frame(Rest, State, {ping, _}) ->
	handle_loop(Rest, State);
handle_frame(Rest, State, {goaway, LastGoodStreamID, Status}) ->
	error_logger:error_msg("Ignored GOAWAY control frame ~p ~p~n",
		[LastGoodStreamID, Status]),
	handle_loop(Rest, State);
handle_frame(Rest, State, {headers, StreamID, IsFin, Headers}) ->
	error_logger:error_msg("Ignored HEADERS control frame ~p ~p ~p~n",
		[StreamID, IsFin, Headers]),
	handle_loop(Rest, State);
handle_frame(Rest, State, {window_update, StreamID, DeltaWindowSize}) ->
	error_logger:error_msg("Ignored WINDOW_UPDATE control frame ~p ~p~n",
		[StreamID, DeltaWindowSize]),
	handle_loop(Rest, State);
handle_frame(_, #spdy_state{streams=Streams, socket=Socket, transport=Transport},
		{error, badprotocol}) ->
	%% Because a particular stream is unknown,
	%% we're sending the error message to all streams.
	Reason = {badprotocol, "The remote endpoint sent invalid data."},
	_ = [ReplyTo ! {gun_error, self(), Reason} || #stream{reply_to=ReplyTo} <- Streams],
	%% @todo LastGoodStreamID
	Transport:send(Socket, cow_spdy:goaway(0, protocol_error)),
	close.

parse_status(Status) ->
	<< Code:3/binary, _/bits >> = Status,
	list_to_integer(binary_to_list(Code)).

close(#spdy_state{streams=Streams}) ->
	close_streams(Streams).

close_streams([]) ->
	ok;
close_streams([#stream{ref=StreamRef, reply_to=ReplyTo}|Tail]) ->
	ReplyTo ! {gun_error, self(), StreamRef, {closed,
		"The connection was lost."}},
	close_streams(Tail).

keepalive(State=#spdy_state{socket=Socket, transport=Transport,
		ping_id=PingID}) ->
	Transport:send(Socket, cow_spdy:ping(PingID)),
	State#spdy_state{ping_id=PingID + 2}.

%% @todo Always https scheme?
request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef,
		stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers) ->
	{Host2, Headers2} = prepare_request(Headers, Host, Port),
	Out = (false =/= lists:keyfind(<<"content-type">>, 1, Headers2))
		orelse (false =/= lists:keyfind(<<"content-length">>, 1, Headers2)),
	Transport:send(Socket, cow_spdy:syn_stream(Zdef,
		StreamID, 0, not Out, false, 0,
		Method, <<"https">>, Host2, Path, <<"HTTP/1.1">>, Headers2)),
	new_stream(StreamID, StreamRef, ReplyTo, true, Out, <<"HTTP/1.1">>,
		State#spdy_state{stream_id=StreamID + 2}).

%% @todo Handle Body > 16MB. (split it out into many frames)
%% @todo Always https scheme?
request(State=#spdy_state{socket=Socket, transport=Transport, zdef=Zdef,
		stream_id=StreamID}, StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body) ->
	{Host2, Headers2} = prepare_request(Headers, Host, Port),
	Headers3 = lists:keystore(<<"content-length">>, 1, Headers2,
		{<<"content-length">>, integer_to_binary(iolist_size(Body))}),
	Transport:send(Socket, [
		cow_spdy:syn_stream(Zdef,
			StreamID, 0, false, false, 0,
			Method, <<"https">>, Host2, Path, <<"HTTP/1.1">>, Headers3),
		cow_spdy:data(StreamID, true, Body)
	]),
	new_stream(StreamID, StreamRef, ReplyTo, true, false, <<"HTTP/1.1">>,
		State#spdy_state{stream_id=StreamID + 2}).

prepare_request(Headers, Host, Port) ->
	Headers2 = lists:keydelete(<<"keep-alive">>, 1,
		lists:keydelete(<<"proxy-connection">>, 1,
			lists:keydelete(<<"transfer-encoding">>, 1,
				lists:keydelete(<<"connection">>, 1, Headers)))),
	case lists:keytake(<<"host">>, 1, Headers2) of
		false -> {[Host, $:, integer_to_binary(Port)], Headers2};
		{value, {_, Host1}, Headers3} -> {Host1, Headers3}
	end.

data(State=#spdy_state{socket=Socket, transport=Transport},
		StreamRef, ReplyTo, IsFin, Data) ->
	case get_stream_by_ref(StreamRef, State) of
		#stream{out=false} ->
			error_stream_closed(State, StreamRef, ReplyTo);
		S = #stream{} ->
			IsFin2 = IsFin =:= fin,
			Transport:send(Socket, cow_spdy:data(S#stream.id, IsFin2, Data)),
			if IsFin2 ->
				out_fin_stream(S, State);
			true ->
				State
			end;
		false ->
			error_stream_not_found(State, StreamRef, ReplyTo)
	end.

cancel(State=#spdy_state{socket=Socket, transport=Transport},
		StreamRef, ReplyTo) ->
	case get_stream_by_ref(StreamRef, State) of
		#stream{id=StreamID} ->
			Transport:send(Socket, cow_spdy:rst_stream(StreamID, cancel)),
			delete_stream(StreamID, State);
		false ->
			error_stream_not_found(State, StreamRef, ReplyTo)
	end.

%% @todo Add unprocessed streams when GOAWAY handling is done.
down(#spdy_state{streams=Streams}) ->
	KilledStreams = [Ref || #stream{ref=Ref} <- Streams],
	{KilledStreams, []}.

error_stream_closed(State, StreamRef, ReplyTo) ->
	ReplyTo ! {gun_error, self(), StreamRef, {badstate,
		"The stream has already been closed."}},
	State.

error_stream_not_found(State, StreamRef, ReplyTo) ->
	ReplyTo ! {gun_error, self(), StreamRef, {badstate,
		"The stream cannot be found."}},
	State.

%% Streams.
%% @todo probably change order of args and have state first?

new_stream(StreamID, StreamRef, ReplyTo, In, Out, Version,
		State=#spdy_state{streams=Streams}) ->
	New = #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo,
		in=In, out=Out, version=Version},
	State#spdy_state{streams=[New|Streams]}.

get_stream_by_id(StreamID, #spdy_state{streams=Streams}) ->
	lists:keyfind(StreamID, #stream.id, Streams).

get_stream_by_ref(StreamRef, #spdy_state{streams=Streams}) ->
	lists:keyfind(StreamRef, #stream.ref, Streams).

delete_stream(StreamID, State=#spdy_state{streams=Streams}) ->
	Streams2 = lists:keydelete(StreamID, #stream.id, Streams),
	State#spdy_state{streams=Streams2}.

in_fin_stream(S=#stream{out=false}, State) ->
	delete_stream(S#stream.id, State);
in_fin_stream(S, State=#spdy_state{streams=Streams}) ->
	Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams,
		S#stream{in=false}),
	State#spdy_state{streams=Streams2}.

out_fin_stream(S=#stream{in=false}, State) ->
	delete_stream(S#stream.id, State);
out_fin_stream(S, State=#spdy_state{streams=Streams}) ->
	Streams2 = lists:keyreplace(S#stream.id, #stream.id, Streams,
		S#stream{out=false}),
	State#spdy_state{streams=Streams2}.