aboutsummaryrefslogblamecommitdiffstats
path: root/src/gun.erl
blob: 8f056aaab444fc63f29f18d012a22963b3d07cb1 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
                                                             















                                                                           
                  





















                      


                  







                         


                         







                        


                          


                        


                                
 
                                          
                                          





                                                         

                                        




                                        
                










                                                  
                                                                                  
                               



              

                                               


                             

                                                       










                                                                                    

                                              
                                                             
                        
                                                          
                        
                                                              






                                                            
                         


                                                       
                            
                      

                                           


            
                                             

                                                   

                                                        


                                                        
                                          

                                                

                                                     


                                                     
                                           

                                                 

                                                      


                                                      
                                              

                                                    

                                                         


                                                         
                                                       

                                                       

                                                                 


                                                             
                                                      

                                                      

                                                                


                                                            
                                                     

                                                     

                                                               


                                                           
                                                                   



                                                                            

                                                                             




                                                                                  

                  
                                                            



                                                               




































































                                                                               





































                                                          

                       
                                       





                                                    
                                        

                                        

                                                   



                                                            


                                                      

           






                                                     














                                                    




                                                            

                                                                             





                                                                         
                              





                                                                                 

                                                   







                                                                                     
                            
                                                                                        




                                                                                   

                                                                  



                                                   


                                                            
                            
                                                                                        





                                                                                   
                             
                   
                    






                                                                                 
                                                             









                                                                 
                                                    


                                                                 
                                        






                                                                                                 
                                                   



                                                                                     
                                                   













                                                                                     
















                                                                                          
                                               


                                                     
                                                                  
                                                                        


                                                                                              
                                                              


                                                                         
                                                              






















                                                                                      
                                  








                                                                                 
                                                  
                                                                  
                                                                        
                                                                                              
                                       


                                                                                 













                                                      
%% Copyright (c) 2013-2014, Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(gun).

%% Connection.
-export([open/2]).
-export([open/3]).
-export([close/1]).
-export([shutdown/1]).

%% Requests.
-export([delete/2]).
-export([delete/3]).
-export([get/2]).
-export([get/3]).
-export([head/2]).
-export([head/3]).
-export([options/2]).
-export([options/3]).
-export([patch/3]).
-export([patch/4]).
-export([post/3]).
-export([post/4]).
-export([put/3]).
-export([put/4]).
-export([request/4]).
-export([request/5]).

%% Streaming data.
-export([data/4]).

%% Awaiting gun messages.
-export([await/2]).
-export([await/3]).
-export([await/4]).
-export([await_body/2]).
-export([await_body/3]).
-export([await_body/4]).

%% Flushing gun messages.
-export([flush/1]).

%% Cancelling a stream.
-export([cancel/2]).

%% Websocket.
-export([ws_upgrade/2]).
-export([ws_upgrade/3]).
-export([ws_send/2]).

%% Debug.
-export([dbg_send_raw/2]).

%% Internals.
-export([start_link/4]).
-export([init/5]).
-export([system_continue/3]).
-export([system_terminate/4]).
-export([system_code_change/4]).

-type conn_type() :: ssl | tcp | tcp_spdy.
-type headers() :: [{binary(), iodata()}].

-type ws_close_code() :: 1000..4999.
-type ws_frame() :: close | ping | pong
	| {text | binary | close | ping | pong, iodata()}
	| {close, ws_close_code(), iodata()}.

-type opts() :: [{http, gun_http:opts()}
	| {keepalive, pos_integer()}
	| {retry, non_neg_integer()}
	| {retry_timeout, pos_integer()}
	| {type, conn_type()}].
-export_type([opts/0]).

-record(state, {
	parent :: pid(),
	owner :: pid(),
	host :: inet:hostname(),
	port :: inet:port_number(),
	keepalive :: pos_integer(),
	type :: conn_type(),
	retry :: non_neg_integer(),
	retry_timeout :: pos_integer(),
	socket :: inet:socket() | ssl:sslsocket(),
	transport :: module(),
	protocol :: module(),
	proto_opts :: gun_http:opts(), %% @todo Make a tuple with SPDY and WS too.
	protocol_state :: any()
}).

%% Connection.

-spec open(inet:hostname(), inet:port_number())
	-> {ok, pid()} | {error, any()}.
open(Host, Port) ->
	open(Host, Port, []).

-spec open(inet:hostname(), inet:port_number(), opts())
	-> {ok, pid()} | {error, any()}.
open(Host, Port, Opts) ->
	case open_opts(Opts) of
		ok ->
			supervisor:start_child(gun_sup, [self(), Host, Port, Opts]);
		Error ->
			Error
	end.

%% @private
open_opts([]) ->
	ok;
open_opts([{http, O}|Opts]) when is_list(O) ->
	open_opts(Opts);
open_opts([{keepalive, K}|Opts]) when is_integer(K), K > 0 ->
	open_opts(Opts);
open_opts([{retry, R}|Opts]) when is_integer(R), R >= 0 ->
	open_opts(Opts);
open_opts([{retry_timeout, T}|Opts]) when is_integer(T) > 0 ->
	open_opts(Opts);
open_opts([{type, T}|Opts])
		when T =:= tcp; T =:= tcp_spdy; T =:= ssl ->
	open_opts(Opts);
open_opts([Opt|_]) ->
	{error, {options, Opt}}.

-spec close(pid()) -> ok.
close(ServerPid) ->
	supervisor:terminate_child(gun_sup, ServerPid).

-spec shutdown(pid()) -> ok.
shutdown(ServerPid) ->
	_ = ServerPid ! {shutdown, self()},
	ok.

%% Requests.

-spec delete(pid(), iodata()) -> reference().
delete(ServerPid, Path) ->
	request(ServerPid, <<"DELETE">>, Path, []).

-spec delete(pid(), iodata(), headers()) -> reference().
delete(ServerPid, Path, Headers) ->
	request(ServerPid, <<"DELETE">>, Path, Headers).

-spec get(pid(), iodata()) -> reference().
get(ServerPid, Path) ->
	request(ServerPid, <<"GET">>, Path, []).

-spec get(pid(), iodata(), headers()) -> reference().
get(ServerPid, Path, Headers) ->
	request(ServerPid, <<"GET">>, Path, Headers).

-spec head(pid(), iodata()) -> reference().
head(ServerPid, Path) ->
	request(ServerPid, <<"HEAD">>, Path, []).

-spec head(pid(), iodata(), headers()) -> reference().
head(ServerPid, Path, Headers) ->
	request(ServerPid, <<"HEAD">>, Path, Headers).

-spec options(pid(), iodata()) -> reference().
options(ServerPid, Path) ->
	request(ServerPid, <<"OPTIONS">>, Path, []).

-spec options(pid(), iodata(), headers()) -> reference().
options(ServerPid, Path, Headers) ->
	request(ServerPid, <<"OPTIONS">>, Path, Headers).

-spec patch(pid(), iodata(), headers()) -> reference().
patch(ServerPid, Path, Headers) ->
	request(ServerPid, <<"PATCH">>, Path, Headers).

-spec patch(pid(), iodata(), headers(), iodata()) -> reference().
patch(ServerPid, Path, Headers, Body) ->
	request(ServerPid, <<"PATCH">>, Path, Headers, Body).

-spec post(pid(), iodata(), headers()) -> reference().
post(ServerPid, Path, Headers) ->
	request(ServerPid, <<"POST">>, Path, Headers).

-spec post(pid(), iodata(), headers(), iodata()) -> reference().
post(ServerPid, Path, Headers, Body) ->
	request(ServerPid, <<"POST">>, Path, Headers, Body).

-spec put(pid(), iodata(), headers()) -> reference().
put(ServerPid, Path, Headers) ->
	request(ServerPid, <<"PUT">>, Path, Headers).

-spec put(pid(), iodata(), headers(), iodata()) -> reference().
put(ServerPid, Path, Headers, Body) ->
	request(ServerPid, <<"PUT">>, Path, Headers, Body).

-spec request(pid(), iodata(), iodata(), headers()) -> reference().
request(ServerPid, Method, Path, Headers) ->
	StreamRef = make_ref(),
	_ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers},
	StreamRef.

-spec request(pid(), iodata(), iodata(), headers(), iodata()) -> reference().
request(ServerPid, Method, Path, Headers, Body) ->
	StreamRef = make_ref(),
	_ = ServerPid ! {request, self(), StreamRef, Method, Path, Headers, Body},
	StreamRef.

%% Streaming data.

-spec data(pid(), reference(), fin | nofin, iodata()) -> ok.
data(ServerPid, StreamRef, IsFin, Data) ->
	_ = ServerPid ! {data, self(), StreamRef, IsFin, Data},
	ok.

%% Awaiting gun messages.

await(ServerPid, StreamRef) ->
	MRef = monitor(process, ServerPid),
	Res = await(ServerPid, StreamRef, 5000, MRef),
	demonitor(MRef, [flush]),
	Res.

await(ServerPid, StreamRef, MRef) when is_reference(MRef) ->
	await(ServerPid, StreamRef, 5000, MRef);
await(ServerPid, StreamRef, Timeout) ->
	MRef = monitor(process, ServerPid),
	Res = await(ServerPid, StreamRef, Timeout, MRef),
	demonitor(MRef, [flush]),
	Res.

await(ServerPid, StreamRef, Timeout, MRef) ->
	receive
		{gun_response, ServerPid, StreamRef, IsFin, Status, Headers} ->
			{response, IsFin, Status, Headers};
		{gun_data, ServerPid, StreamRef, IsFin, Data} ->
			{data, IsFin, Data};
		{gun_push, ServerPid, StreamRef, AssocToStreamRef,
				Method, Host, Path, Headers} ->
			{push, AssocToStreamRef, Method, Host, Path, Headers};
		{gun_error, ServerPid, StreamRef, Reason} ->
			{error, Reason};
		{gun_error, ServerPid, Reason} ->
			{error, Reason};
		{'DOWN', MRef, process, ServerPid, Reason} ->
			{error, Reason}
	after Timeout ->
		{error, timeout}
	end.

await_body(ServerPid, StreamRef) ->
	MRef = monitor(process, ServerPid),
	Res = await_body(ServerPid, StreamRef, 5000, MRef, <<>>),
	demonitor(MRef, [flush]),
	Res.

await_body(ServerPid, StreamRef, MRef) when is_reference(MRef) ->
	await_body(ServerPid, StreamRef, 5000, MRef, <<>>);
await_body(ServerPid, StreamRef, Timeout) ->
	MRef = monitor(process, ServerPid),
	Res = await_body(ServerPid, StreamRef, Timeout, MRef, <<>>),
	demonitor(MRef, [flush]),
	Res.

await_body(ServerPid, StreamRef, Timeout, MRef) ->
	await_body(ServerPid, StreamRef, Timeout, MRef, <<>>).

await_body(ServerPid, StreamRef, Timeout, MRef, Acc) ->
	receive
		{gun_data, ServerPid, StreamRef, nofin, Data} ->
			await_body(ServerPid, StreamRef, Timeout, MRef,
				<< Acc/binary, Data/binary >>);
		{gun_data, ServerPid, StreamRef, fin, Data} ->
			{ok, << Acc/binary, Data/binary >>};
		{gun_error, ServerPid, StreamRef, Reason} ->
			{error, Reason};
		{gun_error, ServerPid, Reason} ->
			{error, Reason};
		{'DOWN', MRef, process, ServerPid, Reason} ->
			{error, Reason}
	after Timeout ->
		{error, timeout}
	end.

-spec flush(pid() | reference()) -> ok.
flush(ServerPid) when is_pid(ServerPid) ->
	flush_pid(ServerPid);
flush(StreamRef) ->
	flush_ref(StreamRef).

flush_pid(ServerPid) ->
	receive
		{gun_response, ServerPid, _, _, _, _} ->
			flush_pid(ServerPid);
		{gun_data, ServerPid, _, _, _} ->
			flush_pid(ServerPid);
		{gun_push, ServerPid, _, _, _, _, _, _} ->
			flush_pid(ServerPid);
		{gun_error, ServerPid, _, _} ->
			flush_pid(ServerPid);
		{gun_error, ServerPid, _} ->
			flush_pid(ServerPid);
		{'DOWN', _, process, ServerPid, _} ->
			flush_pid(ServerPid)
	after 0 ->
		ok
	end.

flush_ref(StreamRef) ->
	receive
		{gun_response, _, StreamRef, _, _, _} ->
			flush_ref(StreamRef);
		{gun_data, _, StreamRef, _, _} ->
			flush_ref(StreamRef);
		{gun_push, _, StreamRef, _, _, _, _, _} ->
			flush_ref(StreamRef);
		{gun_error, _, StreamRef, _} ->
			flush_ref(StreamRef)
	after 0 ->
		ok
	end.

%% Cancelling a stream.

-spec cancel(pid(), reference()) -> ok.
cancel(ServerPid, StreamRef) ->
	_ = ServerPid ! {cancel, self(), StreamRef},
	ok.

%% Websocket.

-spec ws_upgrade(pid(), iodata()) -> ok.
ws_upgrade(ServerPid, Path) ->
	ws_upgrade(ServerPid, Path, []).

-spec ws_upgrade(pid(), iodata(), headers()) -> ok.
ws_upgrade(ServerPid, Path, Headers) ->
	_ = ServerPid ! {ws_upgrade, self(), Path, Headers},
	ok.

-spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok.
ws_send(ServerPid, Frames) ->
	_ = ServerPid ! {ws_send, self(), Frames},
	ok.

%% Debug.

-spec dbg_send_raw(pid(), iodata()) -> ok.
dbg_send_raw(ServerPid, Data) ->
	_ = ServerPid ! {dbg_send_raw, self(), Data},
	ok.

%% Internals.

start_link(Owner, Host, Port, Opts) ->
	proc_lib:start_link(?MODULE, init,
		[self(), Owner, Host, Port, Opts]).

%% @doc Faster alternative to proplists:get_value/3.
%% @private
get_value(Key, Opts, Default) ->
	case lists:keyfind(Key, 1, Opts) of
		{_, Value} -> Value;
		_ -> Default
	end.

init(Parent, Owner, Host, Port, Opts) ->
	ok = proc_lib:init_ack(Parent, {ok, self()}),
	HTTPOpts = get_value(http, Opts, []),
	Keepalive = get_value(keepalive, Opts, 5000),
	Retry = get_value(retry, Opts, 5),
	RetryTimeout = get_value(retry_timeout, Opts, 5000),
	%% Default to TCP if port 80 is given, otherwise SSL.
	Type = get_value(type, Opts, if Port =:= 80 -> tcp; true -> ssl end),
	connect(#state{parent=Parent, owner=Owner, host=Host, port=Port,
		keepalive=Keepalive, type=Type, retry=Retry,
		proto_opts=HTTPOpts, retry_timeout=RetryTimeout}, Retry).

connect(State=#state{owner=Owner, host=Host, port=Port, type=ssl,
		proto_opts=HTTPOpts}, Retries) ->
	Transport = ranch_ssl,
	%% R15 support.
	HasNPN = erlang:function_exported(ssl, negotiated_next_protocol, 1),
	Opts = [binary, {active, false}
		|[{client_preferred_next_protocols,
			{client, [<<"spdy/3">>, <<"http/1.1">>], <<"http/1.1">>}}
			|| HasNPN]],
	case Transport:connect(Host, Port, Opts) of
		{ok, Socket} ->
			{Protocol, ProtoOpts} = case HasNPN of
				false ->
					{gun_http, HTTPOpts};
				true ->
					case ssl:negotiated_next_protocol(Socket) of
						{ok, <<"spdy/3">>} -> {gun_spdy, []};
						_ -> {gun_http, HTTPOpts}
					end
			end,
			ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
			before_loop(State#state{socket=Socket, transport=Transport,
				protocol=Protocol, protocol_state=ProtoState});
		{error, _} ->
			retry_loop(State, Retries - 1)
	end;
connect(State=#state{owner=Owner, host=Host, port=Port, type=Type,
		proto_opts=HTTPOpts}, Retries) ->
	Transport = ranch_tcp,
	Opts = [binary, {active, false}],
	case Transport:connect(Host, Port, Opts) of
		{ok, Socket} ->
			{Protocol, ProtoOpts} = case Type of
				tcp_spdy -> {gun_spdy, []};
				tcp -> {gun_http, HTTPOpts}
			end,
			ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
			before_loop(State#state{socket=Socket, transport=Transport,
				protocol=Protocol, protocol_state=ProtoState});
		{error, _} ->
			retry_loop(State, Retries - 1)
	end.

%% Too many retries, give up.
retry_loop(_, 0) ->
	error(gone);
retry_loop(State=#state{parent=Parent, retry_timeout=RetryTimeout}, Retries) ->
	_ = erlang:send_after(RetryTimeout, self(), retry),
	receive
		retry ->
			connect(State, Retries);
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
				{retry_loop, State, Retries})
	end.

before_loop(State=#state{keepalive=Keepalive}) ->
	_ = erlang:send_after(Keepalive, self(), keepalive),
	loop(State).

loop(State=#state{parent=Parent, owner=Owner, host=Host,
		retry=Retry, socket=Socket, transport=Transport,
		protocol=Protocol, protocol_state=ProtoState}) ->
	{OK, Closed, Error} = Transport:messages(),
	Transport:setopts(Socket, [{active, once}]),
	receive
		{OK, Socket, Data} ->
			case Protocol:handle(Data, ProtoState) of
				close ->
					Transport:close(Socket),
					retry_loop(State#state{socket=undefined,
						transport=undefined, protocol=undefined}, Retry);
				ProtoState2 ->
					loop(State#state{protocol_state=ProtoState2})
			end;
		{Closed, Socket} ->
			Protocol:close(ProtoState),
			Transport:close(Socket),
			retry_loop(State#state{socket=undefined, transport=undefined,
				protocol=undefined}, Retry);
		{Error, Socket, _} ->
			Protocol:close(ProtoState),
			Transport:close(Socket),
			retry_loop(State#state{socket=undefined, transport=undefined,
				protocol=undefined}, Retry);
		keepalive ->
			ProtoState2 = Protocol:keepalive(ProtoState),
			before_loop(State#state{protocol_state=ProtoState2});
		{request, Owner, StreamRef, Method, Path, Headers} ->
			ProtoState2 = Protocol:request(ProtoState,
				StreamRef, Method, Host, Path, Headers),
			loop(State#state{protocol_state=ProtoState2});
		{request, Owner, StreamRef, Method, Path, Headers, Body} ->
			ProtoState2 = Protocol:request(ProtoState,
				StreamRef, Method, Host, Path, Headers, Body),
			loop(State#state{protocol_state=ProtoState2});
		{data, Owner, StreamRef, IsFin, Data} ->
			ProtoState2 = Protocol:data(ProtoState,
				StreamRef, IsFin, Data),
			loop(State#state{protocol_state=ProtoState2});
		{cancel, Owner, StreamRef} ->
			ProtoState2 = Protocol:cancel(ProtoState, StreamRef),
			loop(State#state{protocol_state=ProtoState2});
		{ws_upgrade, Owner, Path, Headers} when Protocol =/= gun_spdy ->
			%% @todo
			ProtoState2 = Protocol:ws_upgrade(ProtoState,
				Path, Headers),
			ws_loop(State#state{protocol=gun_ws, protocol_state=ProtoState2});
		{shutdown, Owner} ->
			%% @todo Protocol:shutdown?
			ok;
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
				{loop, State});
		{dbg_send_raw, Owner, Data} ->
			Transport:send(Socket, Data),
			loop(State);
		Any when is_tuple(Any), is_pid(element(2, Any)) ->
			element(2, Any) ! {gun_error, self(), {notowner,
				"Operations are restricted to the owner of the connection."}},
			loop(State);
		{ws_upgrade, _, _, _} ->
			Owner ! {gun_error, self(), {badstate,
				"Websocket over SPDY isn't supported."}},
			loop(State);
		{ws_send, _, _} ->
			Owner ! {gun_error, self(), {badstate,
				"Connection needs to be upgraded to Websocket "
				"before the gun:ws_send/1 function can be used."}},
			loop(State);
		Any ->
			error_logger:error_msg("Unexpected message: ~w~n", [Any])
	end.

ws_loop(State=#state{parent=Parent, owner=Owner, retry=Retry, socket=Socket,
		transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
	{OK, Closed, Error} = Transport:messages(),
	ok = Transport:setopts(Socket, [{active, once}]),
	receive
		{OK, Socket, Data} ->
			ProtoState2 = Protocol:handle(ProtoState, Data),
			ws_loop(State#state{protocol_state=ProtoState2});
		{Closed, Socket} ->
			Transport:close(Socket),
			retry_loop(State#state{socket=undefined, transport=undefined,
				protocol=undefined}, Retry);
		{Error, Socket, _} ->
			Transport:close(Socket),
			retry_loop(State#state{socket=undefined, transport=undefined,
				protocol=undefined}, Retry);
		%% @todo keepalive
		{ws_send, Owner, Frames} when is_list(Frames) ->
			todo; %% @todo
		{ws_send, Owner, Frame} ->
			{todo, Frame}; %% @todo
		{shutdown, Owner} ->
			%% @todo Protocol:shutdown?
			ok;
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
				{ws_loop, State});
		Any when is_tuple(Any), is_pid(element(2, Any)) ->
			element(2, Any) ! {gun_error, self(), {notowner,
				"Operations are restricted to the owner of the connection."}},
			ws_loop(State);
		Any ->
			error_logger:error_msg("Unexpected message: ~w~n", [Any])
	end.

system_continue(_, _, {retry_loop, State, Retry}) ->
	retry_loop(State, Retry);
system_continue(_, _, {loop, State}) ->
	loop(State);
system_continue(_, _, {ws_loop, State}) ->
	ws_loop(State).

-spec system_terminate(any(), _, _, _) -> no_return().
system_terminate(Reason, _, _, _) ->
	exit(Reason).

system_code_change(Misc, _, _, _) ->
	{ok, Misc}.