aboutsummaryrefslogblamecommitdiffstats
path: root/src/gun.erl
blob: 1659ee3383edd15a469e2d53685c4ebb1515d47b (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
                                                             














                                                                           



                                                                      
              
                  
                  
                       
                  





                      
                    

                 
                 

                  
                  

                     
                     

                   
                   

                  
                  

                 
                 

                     
                     
 


                  




                     






                         


                      
 


                         





                        
                        



                        
                           


                                
 
                                          





                                                         
                  









                                                                               
  
                       
                                                               
 




                                                    

                                                             





                                           







                                                    



                                                                             
                      
                         
  

                           
                       


                                                             
  

                            
                        
                              
  

                             
                  
                     

                                           
  
                          
 
                

                        
                                 
                                                    
                                   

                                                           
                                                
                       

                                                              

                              

                                



              
                                                                   
                                        
                   
                              
 
                                                                           
                                        
                                                                           






                                              





                                                                   
                                                 
                     








                                                                                           

            
                    
           

                                                    

                                                                        
                                                                      
                                                 




                                            






                                                                       



                                                             
                                                                                           










                                                               
                                                                      
                            

                                                                
                                                                 


                                                            






                                                                    
                         

                                




                                                         
                                                          




                                                       

                           







                                             
                                                              
          
                                 








                                                                                 
 
                         


                                                       
                            
                      

                                           


            
                                             

                                                   

                                                        


                                                        



                                                                       
                                          

                                                

                                                     


                                                     



                                                                    
                                           

                                                 

                                                      


                                                      



                                                                     
                                              

                                                    

                                                         


                                                         



                                                                        
                                                       

                                                       

                                                                 


                                                             



                                                                             
                                                      

                                                      

                                                                


                                                            



                                                                            
                                                     

                                                     

                                                               


                                                           



                                                                           
                                                                   
                                            
                                                             

                                                                             
                                                  

                                                             
                                     

                                                                                         
                               

                                                                                   

                  

                  
                                                            



                                                               
















                                                                                  

                         

                              













                                                            
                                    

                                             

                                                                      



                                                                               

                                                                 

                                                                                       

































                                                                       



                                                                     









                                                             
                                                              





                                              
                                                                                       







                                                    
                                                                                      









                                                             







                                          



                                                    

                                                   



                                                        

                                                  





                                                          
                                                    
                                             
                                            
                                             







                                                     

                                                   



                                                        

                                                  


                                                          



                                                    




                                            

                       
                                       



                                                    


                                                          

             
                                                 
                              
                                        
 
                                                            
                                       


                                                                       


                                                                       
                                        


                                                                             
 

                                                                                   


                                                      




                                      
                                                   

                                                   









                                                                   
                                        
                                                     

                                                                              

                               
            
                                           


                                                                                
 
                              

                            
                                                                                                         


                                                             
                                                                                                       
                               
                                                                                          
                                                                          
                                                          
                            
                                                                  

                                                                      
            
                                                                                                 

                                                     
                                                                                                       
                               

                                                                                            
                                                                  
                            
                                                                  

                                                                      

            









                                                                                      








                                                                                                   

                                                                                         
 
                                      
                                 
                                                                                           






                                               
                                                                      
                        
                                       
 

                                                                                  

                        
                                                          

                                                                                 
                                                             

            
                                                          
                                                        

                                       
                                       


                                                         



                                                                    
                                                      
 


                                                                                      
                                                   
                                                    


                                                                 



                                                                  

                                   
                                                   
                                                

                                            
                                                   
                                                
                                                     





                                               


                                                                             
                                                                             
                                                                  
                                                                                       
                                                                      
                                                                             
                                                                  
                                                                                             
                                                                      


                                                                             
                                                               
                                                                 
                                                                      


















                                                                                                    

                                                                                                             

                                                                                      
                                                                      


                                                                                                             


                                                                                                                    
                                                                                                   
                                                                                                                    

                                                                      


                                                   


                                                             
                                           

                                                                                 
                                               


                                                                               
                                    



                                                                               
                                  
                                                              


                                                                                   

                                                                               



                                                                                              
                      

                                                                                  

            










                                                                                   
















                                                                                   











                                                                                            
                                                                                   

                                                                                      
                                                    

                                     


                                                                 
                                                            


                                                                                        

                                                

                                            
                                                
                                                     




                                                                
                                          


                                                                
                                                            


                                                                                        
                                    
                                                                        
                           


                                                               
                                           

                                                                                 
                                                  
                                                                  
                                                                        
                                                                                              
                                       


                                                                                 
 
                                   




                                                       












                                                      
%% Copyright (c) 2013-2018, Loïc Hoguin <[email protected]>
%%
%% Permission to use, copy, modify, and/or distribute this software for any
%% purpose with or without fee is hereby granted, provided that the above
%% copyright notice and this permission notice appear in all copies.
%%
%% THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
%% WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
%% MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
%% ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
%% WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
%% ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
%% OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

-module(gun).

-ifdef(OTP_RELEASE).
-compile({nowarn_deprecated_function, [{erlang, get_stacktrace, 0}]}).
-endif.

%% Connection.
-export([open/2]).
-export([open/3]).
-export([open_unix/2]).
-export([info/1]).
-export([close/1]).
-export([shutdown/1]).

%% Requests.
-export([delete/2]).
-export([delete/3]).
-export([delete/4]).
-export([get/2]).
-export([get/3]).
-export([get/4]).
-export([head/2]).
-export([head/3]).
-export([head/4]).
-export([options/2]).
-export([options/3]).
-export([options/4]).
-export([patch/3]).
-export([patch/4]).
-export([patch/5]).
-export([post/3]).
-export([post/4]).
-export([post/5]).
-export([put/3]).
-export([put/4]).
-export([put/5]).
-export([request/4]).
-export([request/5]).
-export([request/6]).

%% Streaming data.
-export([data/4]).

%% Tunneling.
-export([connect/2]).
-export([connect/3]).
-export([connect/4]).

%% Awaiting gun messages.
-export([await/2]).
-export([await/3]).
-export([await/4]).
-export([await_body/2]).
-export([await_body/3]).
-export([await_body/4]).
-export([await_up/1]).
-export([await_up/2]).
-export([await_up/3]).

%% Flushing gun messages.
-export([flush/1]).

%% Cancelling a stream.
-export([cancel/2]).

%% Websocket.
-export([ws_upgrade/2]).
-export([ws_upgrade/3]).
-export([ws_upgrade/4]).
-export([ws_send/2]).

%% Internals.
-export([start_link/4]).
-export([proc_lib_hack/5]).
-export([system_continue/3]).
-export([system_terminate/4]).
-export([system_code_change/4]).

-type headers() :: [{binary(), iodata()}].

-type ws_close_code() :: 1000..4999.
-type ws_frame() :: close | ping | pong
	| {text | binary | close | ping | pong, iodata()}
	| {close, ws_close_code(), iodata()}.

-type opts() :: #{
	connect_timeout => timeout(),
	http_opts       => http_opts(),
	http2_opts      => http2_opts(),
	protocols       => [http | http2],
	retry           => non_neg_integer(),
	retry_timeout   => pos_integer(),
	trace           => boolean(),
	transport       => tcp | tls | ssl,
	transport_opts  => [gen_tcp:connect_option()] | [ssl:connect_option()],
	ws_opts         => ws_opts()
}.
-export_type([opts/0]).
%% @todo Add an option to disable/enable the notowner behavior.

-type connect_destination() :: #{
	host := inet:hostname() | inet:ip_address(),
	port := inet:port_number(),
	username => iodata(),
	password => iodata(),
	protocol => http | http2, %% @todo Remove in Gun 2.0.
	protocols => [http | http2],
	transport => tcp | tls,
	tls_opts => [ssl:connect_option()],
	tls_handshake_timeout => timeout()
}.
-export_type([connect_destination/0]).

-type intermediary() :: #{
	type := connect,
	host := inet:hostname() | inet:ip_address(),
	port := inet:port_number(),
	transport := tcp | tls,
	protocol := http | http2
}.

%% @todo When/if HTTP/2 CONNECT gets implemented, we will want an option here
%% to indicate that the request must be sent on an existing CONNECT stream.
%% This is of course not required for HTTP/1.1 since the CONNECT takes over
%% the entire connection.
-type req_opts() :: #{
	reply_to => pid()
}.
-export_type([req_opts/0]).

-type http_opts() :: #{
	keepalive             => timeout(),
	transform_header_name => fun((binary()) -> binary()),
	version               => 'HTTP/1.1' | 'HTTP/1.0'
}.
-export_type([http_opts/0]).

-type http2_opts() :: #{
	keepalive => timeout()
}.
-export_type([http2_opts/0]).

%% @todo keepalive
-type ws_opts() :: #{
	compress => boolean(),
	protocols => [{binary(), module()}]
}.
-export_type([ws_opts/0]).

-record(state, {
	parent :: pid(),
	owner :: pid(),
	owner_ref :: reference(),
	host :: inet:hostname() | inet:ip_address(),
	port :: inet:port_number(),
	origin_host :: inet:hostname() | inet:ip_address(),
	origin_port :: inet:port_number(),
	intermediaries = [] :: [intermediary()],
	opts :: opts(),
	keepalive_ref :: undefined | reference(),
	socket :: undefined | inet:socket() | ssl:sslsocket(),
	transport :: module(),
	protocol :: module(),
	protocol_state :: any(),
	last_error :: any()
}).

%% Connection.

-spec open(inet:hostname() | inet:ip_address(), inet:port_number())
	-> {ok, pid()} | {error, any()}.
open(Host, Port) ->
	open(Host, Port, #{}).

-spec open(inet:hostname() | inet:ip_address(), inet:port_number(), opts())
	-> {ok, pid()} | {error, any()}.
open(Host, Port, Opts) when is_list(Host); is_atom(Host); is_tuple(Host) ->
	do_open(Host, Port, Opts).

-spec open_unix(Path::string(), opts())
	-> {ok, pid()} | {error, any()}.
open_unix(SocketPath, Opts) ->
	do_open({local, SocketPath}, 0, Opts).

do_open(Host, Port, Opts0) ->
	%% We accept both ssl and tls but only use tls in the code.
	Opts = case Opts0 of
		#{transport := ssl} -> Opts0#{transport => tls};
		_ -> Opts0
	end,
	case check_options(maps:to_list(Opts)) of
		ok ->
			case supervisor:start_child(gun_sup, [self(), Host, Port, Opts]) of
				OK = {ok, ServerPid} ->
					consider_tracing(ServerPid, Opts),
					OK;
				StartError ->
					StartError
			end;
		CheckError ->
			CheckError
	end.

check_options([]) ->
	ok;
check_options([{connect_timeout, infinity}|Opts]) ->
	check_options(Opts);
check_options([{connect_timeout, T}|Opts]) when is_integer(T), T >= 0 ->
	check_options(Opts);
check_options([{http_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) ->
	case gun_http:check_options(ProtoOpts) of
		ok ->
			check_options(Opts);
		Error ->
			Error
	end;
check_options([{http2_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) ->
	case gun_http2:check_options(ProtoOpts) of
		ok ->
			check_options(Opts);
		Error ->
			Error
	end;
check_options([Opt = {protocols, L}|Opts]) when is_list(L) ->
	Len = length(L),
	case length(lists:usort(L)) of
		Len when Len > 0 ->
			Check = lists:usort([(P =:= http) orelse (P =:= http2) || P <- L]),
			case Check of
				[true] ->
					check_options(Opts);
				_ ->
					{error, {options, Opt}}
			end;
		_ ->
			{error, {options, Opt}}
	end;
check_options([{retry, R}|Opts]) when is_integer(R), R >= 0 ->
	check_options(Opts);
check_options([{retry_timeout, T}|Opts]) when is_integer(T), T >= 0 ->
	check_options(Opts);
check_options([{trace, B}|Opts]) when B =:= true; B =:= false ->
	check_options(Opts);
check_options([{transport, T}|Opts]) when T =:= tcp; T =:= tls ->
	check_options(Opts);
check_options([{transport_opts, L}|Opts]) when is_list(L) ->
	check_options(Opts);
check_options([{ws_opts, ProtoOpts}|Opts]) when is_map(ProtoOpts) ->
	case gun_ws:check_options(ProtoOpts) of
		ok ->
			check_options(Opts);
		Error ->
			Error
	end;
check_options([Opt|_]) ->
	{error, {options, Opt}}.

consider_tracing(ServerPid, #{trace := true}) ->
	dbg:start(),
	dbg:tracer(),
	dbg:tpl(gun, [{'_', [], [{return_trace}]}]),
	dbg:tpl(gun_http, [{'_', [], [{return_trace}]}]),
	dbg:tpl(gun_http2, [{'_', [], [{return_trace}]}]),
	dbg:tpl(gun_ws, [{'_', [], [{return_trace}]}]),
	dbg:p(ServerPid, all);
consider_tracing(_, _) ->
	ok.

-spec info(pid()) -> map().
info(ServerPid) ->
	{_, #state{
		socket=Socket,
		transport=Transport,
		protocol=Protocol,
		origin_host=OriginHost,
		origin_port=OriginPort,
		intermediaries=Intermediaries
	}} = sys:get_state(ServerPid),
	{ok, {SockIP, SockPort}} = Transport:sockname(Socket),
	#{
		socket => Socket,
		transport => Transport:name(),
		protocol => Protocol:name(),
		sock_ip => SockIP,
		sock_port => SockPort,
		origin_host => OriginHost,
		origin_port => OriginPort,
		%% Intermediaries are listed in the order data goes through them.
		intermediaries => lists:reverse(Intermediaries)
	}.

-spec close(pid()) -> ok.
close(ServerPid) ->
	supervisor:terminate_child(gun_sup, ServerPid).

-spec shutdown(pid()) -> ok.
shutdown(ServerPid) ->
	_ = ServerPid ! {shutdown, self()},
	ok.

%% Requests.

-spec delete(pid(), iodata()) -> reference().
delete(ServerPid, Path) ->
	request(ServerPid, <<"DELETE">>, Path, []).

-spec delete(pid(), iodata(), headers()) -> reference().
delete(ServerPid, Path, Headers) ->
	request(ServerPid, <<"DELETE">>, Path, Headers).

-spec delete(pid(), iodata(), headers(), req_opts()) -> reference().
delete(ServerPid, Path, Headers, ReqOpts) ->
	request(ServerPid, <<"DELETE">>, Path, Headers, <<>>, ReqOpts).

-spec get(pid(), iodata()) -> reference().
get(ServerPid, Path) ->
	request(ServerPid, <<"GET">>, Path, []).

-spec get(pid(), iodata(), headers()) -> reference().
get(ServerPid, Path, Headers) ->
	request(ServerPid, <<"GET">>, Path, Headers).

-spec get(pid(), iodata(), headers(), req_opts()) -> reference().
get(ServerPid, Path, Headers, ReqOpts) ->
	request(ServerPid, <<"GET">>, Path, Headers, <<>>, ReqOpts).

-spec head(pid(), iodata()) -> reference().
head(ServerPid, Path) ->
	request(ServerPid, <<"HEAD">>, Path, []).

-spec head(pid(), iodata(), headers()) -> reference().
head(ServerPid, Path, Headers) ->
	request(ServerPid, <<"HEAD">>, Path, Headers).

-spec head(pid(), iodata(), headers(), req_opts()) -> reference().
head(ServerPid, Path, Headers, ReqOpts) ->
	request(ServerPid, <<"HEAD">>, Path, Headers, <<>>, ReqOpts).

-spec options(pid(), iodata()) -> reference().
options(ServerPid, Path) ->
	request(ServerPid, <<"OPTIONS">>, Path, []).

-spec options(pid(), iodata(), headers()) -> reference().
options(ServerPid, Path, Headers) ->
	request(ServerPid, <<"OPTIONS">>, Path, Headers).

-spec options(pid(), iodata(), headers(), req_opts()) -> reference().
options(ServerPid, Path, Headers, ReqOpts) ->
	request(ServerPid, <<"OPTIONS">>, Path, Headers, <<>>, ReqOpts).

-spec patch(pid(), iodata(), headers()) -> reference().
patch(ServerPid, Path, Headers) ->
	request(ServerPid, <<"PATCH">>, Path, Headers).

-spec patch(pid(), iodata(), headers(), iodata()) -> reference().
patch(ServerPid, Path, Headers, Body) ->
	request(ServerPid, <<"PATCH">>, Path, Headers, Body).

-spec patch(pid(), iodata(), headers(), iodata(), req_opts()) -> reference().
patch(ServerPid, Path, Headers, Body, ReqOpts) ->
	request(ServerPid, <<"PATCH">>, Path, Headers, Body, ReqOpts).

-spec post(pid(), iodata(), headers()) -> reference().
post(ServerPid, Path, Headers) ->
	request(ServerPid, <<"POST">>, Path, Headers).

-spec post(pid(), iodata(), headers(), iodata()) -> reference().
post(ServerPid, Path, Headers, Body) ->
	request(ServerPid, <<"POST">>, Path, Headers, Body).

-spec post(pid(), iodata(), headers(), iodata(), req_opts()) -> reference().
post(ServerPid, Path, Headers, Body, ReqOpts) ->
	request(ServerPid, <<"POST">>, Path, Headers, Body, ReqOpts).

-spec put(pid(), iodata(), headers()) -> reference().
put(ServerPid, Path, Headers) ->
	request(ServerPid, <<"PUT">>, Path, Headers).

-spec put(pid(), iodata(), headers(), iodata()) -> reference().
put(ServerPid, Path, Headers, Body) ->
	request(ServerPid, <<"PUT">>, Path, Headers, Body).

-spec put(pid(), iodata(), headers(), iodata(), req_opts()) -> reference().
put(ServerPid, Path, Headers, Body, ReqOpts) ->
	request(ServerPid, <<"PUT">>, Path, Headers, Body, ReqOpts).

-spec request(pid(), iodata(), iodata(), headers()) -> reference().
request(ServerPid, Method, Path, Headers) ->
	request(ServerPid, Method, Path, Headers, <<>>, #{}).

-spec request(pid(), iodata(), iodata(), headers(), iodata()) -> reference().
request(ServerPid, Method, Path, Headers, Body) ->
	request(ServerPid, Method, Path, Headers, Body, #{}).

%% @todo Accept header names as maps.
-spec request(pid(), iodata(), iodata(), headers(), iodata(), req_opts()) -> reference().
request(ServerPid, Method, Path, Headers, Body, ReqOpts) ->
	StreamRef = make_ref(),
	ReplyTo = maps:get(reply_to, ReqOpts, self()),
	_ = ServerPid ! {request, ReplyTo, StreamRef, Method, Path, Headers, Body},
	StreamRef.

%% Streaming data.

-spec data(pid(), reference(), fin | nofin, iodata()) -> ok.
data(ServerPid, StreamRef, IsFin, Data) ->
	_ = ServerPid ! {data, self(), StreamRef, IsFin, Data},
	ok.

%% Tunneling.

-spec connect(pid(), connect_destination()) -> reference().
connect(ServerPid, Destination) ->
	connect(ServerPid, Destination, [], #{}).

-spec connect(pid(), connect_destination(), headers()) -> reference().
connect(ServerPid, Destination, Headers) ->
	connect(ServerPid, Destination, Headers, #{}).

-spec connect(pid(), connect_destination(), headers(), req_opts()) -> reference().
connect(ServerPid, Destination, Headers, ReqOpts) ->
	StreamRef = make_ref(),
	ReplyTo = maps:get(reply_to, ReqOpts, self()),
	_ = ServerPid ! {connect, ReplyTo, StreamRef, Destination, Headers},
	StreamRef.

%% Awaiting gun messages.

%% @todo spec await await_body

await(ServerPid, StreamRef) ->
	MRef = monitor(process, ServerPid),
	Res = await(ServerPid, StreamRef, 5000, MRef),
	demonitor(MRef, [flush]),
	Res.

await(ServerPid, StreamRef, MRef) when is_reference(MRef) ->
	await(ServerPid, StreamRef, 5000, MRef);
await(ServerPid, StreamRef, Timeout) ->
	MRef = monitor(process, ServerPid),
	Res = await(ServerPid, StreamRef, Timeout, MRef),
	demonitor(MRef, [flush]),
	Res.

%% @todo Add gun_upgrade and gun_ws?
await(ServerPid, StreamRef, Timeout, MRef) ->
	receive
		{gun_inform, ServerPid, StreamRef, Status, Headers} ->
			{inform, Status, Headers};
		{gun_response, ServerPid, StreamRef, IsFin, Status, Headers} ->
			{response, IsFin, Status, Headers};
		{gun_data, ServerPid, StreamRef, IsFin, Data} ->
			{data, IsFin, Data};
		{gun_trailers, ServerPid, StreamRef, Trailers} ->
			{trailers, Trailers};
		{gun_push, ServerPid, StreamRef, NewStreamRef, Method, URI, Headers} ->
			{push, NewStreamRef, Method, URI, Headers};
		{gun_error, ServerPid, StreamRef, Reason} ->
			{error, Reason};
		{gun_error, ServerPid, Reason} ->
			{error, Reason};
		{'DOWN', MRef, process, ServerPid, Reason} ->
			{error, Reason}
	after Timeout ->
		{error, timeout}
	end.

await_body(ServerPid, StreamRef) ->
	MRef = monitor(process, ServerPid),
	Res = await_body(ServerPid, StreamRef, 5000, MRef, <<>>),
	demonitor(MRef, [flush]),
	Res.

await_body(ServerPid, StreamRef, MRef) when is_reference(MRef) ->
	await_body(ServerPid, StreamRef, 5000, MRef, <<>>);
await_body(ServerPid, StreamRef, Timeout) ->
	MRef = monitor(process, ServerPid),
	Res = await_body(ServerPid, StreamRef, Timeout, MRef, <<>>),
	demonitor(MRef, [flush]),
	Res.

await_body(ServerPid, StreamRef, Timeout, MRef) ->
	await_body(ServerPid, StreamRef, Timeout, MRef, <<>>).

await_body(ServerPid, StreamRef, Timeout, MRef, Acc) ->
	receive
		{gun_data, ServerPid, StreamRef, nofin, Data} ->
			await_body(ServerPid, StreamRef, Timeout, MRef,
				<< Acc/binary, Data/binary >>);
		{gun_data, ServerPid, StreamRef, fin, Data} ->
			{ok, << Acc/binary, Data/binary >>};
		%% It's OK to return trailers here because the client
		%% specifically requested them.
		{gun_trailers, ServerPid, StreamRef, Trailers} ->
			{ok, Acc, Trailers};
		{gun_error, ServerPid, StreamRef, Reason} ->
			{error, Reason};
		{gun_error, ServerPid, Reason} ->
			{error, Reason};
		{'DOWN', MRef, process, ServerPid, Reason} ->
			{error, Reason}
	after Timeout ->
		{error, timeout}
	end.

-spec await_up(pid()) -> {ok, http | http2} | {error, atom()}.
await_up(ServerPid) ->
	MRef = monitor(process, ServerPid),
	Res = await_up(ServerPid, 5000, MRef),
	demonitor(MRef, [flush]),
	Res.

-spec await_up(pid(), reference() | timeout()) -> {ok, http | http2} | {error, atom()}.
await_up(ServerPid, MRef) when is_reference(MRef) ->
	await_up(ServerPid, 5000, MRef);
await_up(ServerPid, Timeout) ->
	MRef = monitor(process, ServerPid),
	Res = await_up(ServerPid, Timeout, MRef),
	demonitor(MRef, [flush]),
	Res.

-spec await_up(pid(), timeout(), reference()) -> {ok, http | http2} | {error, atom()}.
await_up(ServerPid, Timeout, MRef) ->
	receive
		{gun_up, ServerPid, Protocol} ->
			{ok, Protocol};
		{'DOWN', MRef, process, ServerPid, Reason} ->
			{error, Reason}
	after Timeout ->
		{error, timeout}
	end.

-spec flush(pid() | reference()) -> ok.
flush(ServerPid) when is_pid(ServerPid) ->
	flush_pid(ServerPid);
flush(StreamRef) ->
	flush_ref(StreamRef).

flush_pid(ServerPid) ->
	receive
		{gun_up, ServerPid, _} ->
			flush_pid(ServerPid);
		{gun_down, ServerPid, _, _, _, _} ->
			flush_pid(ServerPid);
		{gun_inform, ServerPid, _, _, _} ->
			flush_pid(ServerPid);
		{gun_response, ServerPid, _, _, _, _} ->
			flush_pid(ServerPid);
		{gun_data, ServerPid, _, _, _} ->
			flush_pid(ServerPid);
		{gun_trailers, ServerPid, _, _} ->
			flush_pid(ServerPid);
		{gun_push, ServerPid, _, _, _, _, _, _} ->
			flush_pid(ServerPid);
		{gun_error, ServerPid, _, _} ->
			flush_pid(ServerPid);
		{gun_error, ServerPid, _} ->
			flush_pid(ServerPid);
		{gun_upgrade, ServerPid, _, _, _} ->
			flush_pid(ServerPid);
		{gun_ws, ServerPid, _, _} ->
			flush_pid(ServerPid);
		{'DOWN', _, process, ServerPid, _} ->
			flush_pid(ServerPid)
	after 0 ->
		ok
	end.

flush_ref(StreamRef) ->
	receive
		{gun_inform, _, StreamRef, _, _} ->
			flush_pid(StreamRef);
		{gun_response, _, StreamRef, _, _, _} ->
			flush_ref(StreamRef);
		{gun_data, _, StreamRef, _, _} ->
			flush_ref(StreamRef);
		{gun_trailers, _, StreamRef, _} ->
			flush_ref(StreamRef);
		{gun_push, _, StreamRef, _, _, _, _, _} ->
			flush_ref(StreamRef);
		{gun_error, _, StreamRef, _} ->
			flush_ref(StreamRef);
		{gun_upgrade, _, StreamRef, _, _} ->
			flush_ref(StreamRef);
		{gun_ws, _, StreamRef, _} ->
			flush_ref(StreamRef)
	after 0 ->
		ok
	end.

%% Cancelling a stream.

-spec cancel(pid(), reference()) -> ok.
cancel(ServerPid, StreamRef) ->
	_ = ServerPid ! {cancel, self(), StreamRef},
	ok.

%% @todo Allow upgrading an HTTP/1.1 connection to HTTP/2.
%% http2_upgrade

%% Websocket.

-spec ws_upgrade(pid(), iodata()) -> reference().
ws_upgrade(ServerPid, Path) ->
	ws_upgrade(ServerPid, Path, []).

-spec ws_upgrade(pid(), iodata(), headers()) -> reference().
ws_upgrade(ServerPid, Path, Headers) ->
	StreamRef = make_ref(),
	_ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers},
	StreamRef.

-spec ws_upgrade(pid(), iodata(), headers(), ws_opts()) -> reference().
ws_upgrade(ServerPid, Path, Headers, Opts) ->
	ok = gun_ws:check_options(Opts),
	StreamRef = make_ref(),
	_ = ServerPid ! {ws_upgrade, self(), StreamRef, Path, Headers, Opts},
	StreamRef.

%% @todo ws_send/2 will need to be deprecated in favor of a variant with StreamRef.
%% But it can be kept for the time being since it can still work for HTTP/1.1.
-spec ws_send(pid(), ws_frame() | [ws_frame()]) -> ok.
ws_send(ServerPid, Frames) ->
	_ = ServerPid ! {ws_send, self(), Frames},
	ok.

%% Internals.

start_link(Owner, Host, Port, Opts) ->
	proc_lib:start_link(?MODULE, proc_lib_hack,
		[self(), Owner, Host, Port, Opts]).

proc_lib_hack(Parent, Owner, Host, Port, Opts) ->
	try
		init(Parent, Owner, Host, Port, Opts)
	catch
		_:normal -> exit(normal);
		_:shutdown -> exit(shutdown);
		_:Reason = {shutdown, _} -> exit(Reason);
		_:Reason -> exit({Reason, erlang:get_stacktrace()})
	end.

init(Parent, Owner, Host, Port, Opts) ->
	ok = proc_lib:init_ack(Parent, {ok, self()}),
	Retry = maps:get(retry, Opts, 5),
	Transport = case maps:get(transport, Opts, default_transport(Port)) of
		tcp -> gun_tcp;
		tls -> gun_tls
	end,
	OwnerRef = monitor(process, Owner),
	transport_connect(#state{parent=Parent, owner=Owner, owner_ref=OwnerRef,
		host=Host, port=Port, origin_host=Host, origin_port=Port,
		opts=Opts, transport=Transport}, Retry).

default_transport(443) -> tls;
default_transport(_) -> tcp.

transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport=gun_tls}, Retries) ->
	TransportOpts = [binary, {active, false}|ensure_alpn(
		maps:get(protocols, Opts, [http2, http]),
		maps:get(transport_opts, Opts, []))],
	case Transport:connect(Host, Port, TransportOpts, maps:get(connect_timeout, Opts, infinity)) of
		{ok, Socket} ->
			{Protocol, ProtoOptsKey} = case ssl:negotiated_protocol(Socket) of
				{ok, <<"h2">>} -> {gun_http2, http2_opts};
				_ -> {gun_http, http_opts}
			end,
			up(State, Socket, Protocol, ProtoOptsKey);
		{error, Reason} ->
			retry(State#state{last_error=Reason}, Retries)
	end;
transport_connect(State=#state{host=Host, port=Port, opts=Opts, transport=Transport}, Retries) ->
	TransportOpts = [binary, {active, false}
		|maps:get(transport_opts, Opts, [])],
	case Transport:connect(Host, Port, TransportOpts, maps:get(connect_timeout, Opts, infinity)) of
		{ok, Socket} ->
			{Protocol, ProtoOptsKey} = case maps:get(protocols, Opts, [http]) of
				[http] -> {gun_http, http_opts};
				[http2] -> {gun_http2, http2_opts}
			end,
			up(State, Socket, Protocol, ProtoOptsKey);
		{error, Reason} ->
			retry(State#state{last_error=Reason}, Retries)
	end.

ensure_alpn(Protocols0, TransportOpts) ->
	Protocols = [case P of
		http -> <<"http/1.1">>;
		http2 -> <<"h2">>
	end || P <- Protocols0],
	[
		{alpn_advertised_protocols, Protocols},
		{client_preferred_next_protocols, {client, Protocols, <<"http/1.1">>}}
	|TransportOpts].

up(State=#state{owner=Owner, opts=Opts, transport=Transport}, Socket, Protocol, ProtoOptsKey) ->
	ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
	ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
	Owner ! {gun_up, self(), Protocol:name()},
	before_loop(State#state{socket=Socket, protocol=Protocol, protocol_state=ProtoState}).

down(State=#state{owner=Owner, opts=Opts, protocol=Protocol, protocol_state=ProtoState}, Reason) ->
	{KilledStreams, UnprocessedStreams} = Protocol:down(ProtoState),
	Owner ! {gun_down, self(), Protocol:name(), Reason, KilledStreams, UnprocessedStreams},
	retry(State#state{socket=undefined, protocol=undefined, protocol_state=undefined,
		last_error=Reason}, maps:get(retry, Opts, 5)).

retry(#state{last_error=Reason}, 0) ->
	exit({shutdown, Reason});
retry(State=#state{keepalive_ref=KeepaliveRef}, Retries) when is_reference(KeepaliveRef) ->
	_ = erlang:cancel_timer(KeepaliveRef),
	%% Flush if we have a keepalive message
	receive
		keepalive -> ok
	after 0 ->
		ok
	end,
	retry_loop(State#state{keepalive_ref=undefined}, Retries - 1);
retry(State, Retries) ->
	retry_loop(State, Retries - 1).

retry_loop(State=#state{parent=Parent, opts=Opts}, Retries) ->
	_ = erlang:send_after(maps:get(retry_timeout, Opts, 5000), self(), retry),
	receive
		retry ->
			transport_connect(State, Retries);
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
				{retry_loop, State, Retries})
	end.

before_loop(State=#state{opts=Opts, protocol=Protocol}) ->
	%% @todo Might not be worth checking every time?
	ProtoOptsKey = case Protocol of
		gun_http -> http_opts;
		gun_http2 -> http2_opts
	end,
	ProtoOpts = maps:get(ProtoOptsKey, Opts, #{}),
	Keepalive = maps:get(keepalive, ProtoOpts, 5000),
	KeepaliveRef = case Keepalive of
		infinity -> undefined;
		_ -> erlang:send_after(Keepalive, self(), keepalive)
	end,
	loop(State#state{keepalive_ref=KeepaliveRef}).

loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef,
		origin_host=Host, origin_port=Port, opts=Opts, socket=Socket,
		transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
	{OK, Closed, Error} = Transport:messages(),
	Transport:setopts(Socket, [{active, once}]),
	receive
		{OK, Socket, Data} ->
			case Protocol:handle(Data, ProtoState) of
				Commands when is_list(Commands) ->
					commands(Commands, State);
				Command ->
					commands([Command], State)
			end;
		{Closed, Socket} ->
			Protocol:close(ProtoState),
			Transport:close(Socket),
			down(State, closed);
		{Error, Socket, Reason} ->
			Protocol:close(ProtoState),
			Transport:close(Socket),
			down(State, {error, Reason});
		{OK, _PreviousSocket, _Data} ->
			loop(State);
		{Closed, _PreviousSocket} ->
			loop(State);
		{Error, _PreviousSocket, _} ->
			loop(State);
		keepalive ->
			ProtoState2 = Protocol:keepalive(ProtoState),
			before_loop(State#state{protocol_state=ProtoState2});
		{request, ReplyTo, StreamRef, Method, Path, Headers, <<>>} ->
			ProtoState2 = Protocol:request(ProtoState,
				StreamRef, ReplyTo, Method, Host, Port, Path, Headers),
			loop(State#state{protocol_state=ProtoState2});
		{request, ReplyTo, StreamRef, Method, Path, Headers, Body} ->
			ProtoState2 = Protocol:request(ProtoState,
				StreamRef, ReplyTo, Method, Host, Port, Path, Headers, Body),
			loop(State#state{protocol_state=ProtoState2});
		%% @todo Do we want to reject ReplyTo if it's not the process
		%% who initiated the connection? For both data and cancel.
		{data, ReplyTo, StreamRef, IsFin, Data} ->
			ProtoState2 = Protocol:data(ProtoState,
				StreamRef, ReplyTo, IsFin, Data),
			loop(State#state{protocol_state=ProtoState2});
		{connect, ReplyTo, StreamRef, Destination0, Headers} ->
			%% The protocol option has been deprecated in favor of the protocols option.
			%% Nobody probably ended up using it, but let's not break the interface.
			Destination1 = case Destination0 of
				#{protocols := _} ->
					Destination0;
				#{protocol := DestProto} ->
					Destination0#{protocols => [DestProto]};
				_ ->
					Destination0
			end,
			Destination = case Destination1 of
				#{transport := tls} ->
					Destination1#{tls_opts => ensure_alpn(
						maps:get(protocols, Destination1, [http]),
						maps:get(tls_opts, Destination1, []))};
				_ ->
					Destination1
			end,
			ProtoState2 = Protocol:connect(ProtoState, StreamRef, ReplyTo, Destination, Headers),
			loop(State#state{protocol_state=ProtoState2});
		{cancel, ReplyTo, StreamRef} ->
			ProtoState2 = Protocol:cancel(ProtoState, StreamRef, ReplyTo),
			loop(State#state{protocol_state=ProtoState2});
		%% @todo Maybe make an interface in the protocol module instead of checking on protocol name.
		%% An interface would also make sure that HTTP/1.0 can't upgrade.
		{ws_upgrade, Owner, StreamRef, Path, Headers} when Protocol =:= gun_http ->
			WsOpts = maps:get(ws_opts, Opts, #{}),
			ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts),
			loop(State#state{protocol_state=ProtoState2});
		{ws_upgrade, Owner, StreamRef, Path, Headers, WsOpts} when Protocol =:= gun_http ->
			ProtoState2 = Protocol:ws_upgrade(ProtoState, StreamRef, Host, Port, Path, Headers, WsOpts),
			loop(State#state{protocol_state=ProtoState2});
			%% @todo can fail if http/1.0
		{shutdown, Owner} ->
			%% @todo Protocol:shutdown?
			ok;
		{'DOWN', OwnerRef, process, Owner, Reason} ->
			Protocol:close(ProtoState),
			Transport:close(Socket),
			owner_gone(Reason);
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
				{loop, State});
		{ws_upgrade, _, StreamRef, _, _} ->
			Owner ! {gun_error, self(), StreamRef, {badstate,
				"Websocket is only supported over HTTP/1.1."}},
			loop(State);
		{ws_upgrade, _, StreamRef, _, _, _} ->
			Owner ! {gun_error, self(), StreamRef, {badstate,
				"Websocket is only supported over HTTP/1.1."}},
			loop(State);
		{ws_send, _, _} ->
			Owner ! {gun_error, self(), {badstate,
				"Connection needs to be upgraded to Websocket "
				"before the gun:ws_send/1 function can be used."}},
			loop(State);
		%% @todo The ReplyTo patch disabled the notowner behavior.
		%% We need to add an option to enforce this behavior if needed.
		Any when is_tuple(Any), is_pid(element(2, Any)) ->
			element(2, Any) ! {gun_error, self(), {notowner,
				"Operations are restricted to the owner of the connection."}},
			loop(State);
		Any ->
			error_logger:error_msg("Unexpected message: ~w~n", [Any]),
			loop(State)
	end.

commands([], State) ->
	loop(State);
commands([close|_], State=#state{socket=Socket, transport=Transport}) ->
	Transport:close(Socket),
	down(State, normal);
commands([Error={error, _}|_], State=#state{socket=Socket, transport=Transport}) ->
	Transport:close(Socket),
	down(State, Error);
commands([{state, ProtoState}|Tail], State) ->
	commands(Tail, State#state{protocol_state=ProtoState});
%% @todo The scheme should probably not be ignored.
%%
%% Order is important: the origin must be changed before
%% the transport and/or protocol in order to keep track
%% of the intermediaries properly.
commands([{origin, _Scheme, Host, Port, Type}|Tail],
		State=#state{transport=Transport, protocol=Protocol,
			origin_host=IntermediateHost, origin_port=IntermediatePort,
			intermediaries=Intermediaries}) ->
	Info = #{
		type => Type,
		host => IntermediateHost,
		port => IntermediatePort,
		transport => Transport:name(),
		protocol => Protocol:name()
	},
	commands(Tail, State#state{origin_host=Host, origin_port=Port,
		intermediaries=[Info|Intermediaries]});
commands([{switch_transport, Transport, Socket}|Tail], State) ->
	commands(Tail, State#state{socket=Socket, transport=Transport});
%% @todo The two loops should be reunified and this clause generalized.
commands([{switch_protocol, Protocol=gun_ws, ProtoState}], State) ->
	ws_loop(State#state{protocol=Protocol, protocol_state=ProtoState});
%% @todo And this state should probably not be ignored.
commands([{switch_protocol, Protocol, _ProtoState0}|Tail],
		State=#state{owner=Owner, opts=Opts, socket=Socket, transport=Transport}) ->
	ProtoOpts = maps:get(http2_opts, Opts, #{}),
	ProtoState = Protocol:init(Owner, Socket, Transport, ProtoOpts),
	commands(Tail, State#state{protocol=Protocol, protocol_state=ProtoState}).

ws_loop(State=#state{parent=Parent, owner=Owner, owner_ref=OwnerRef, socket=Socket,
		transport=Transport, protocol=Protocol, protocol_state=ProtoState}) ->
	{OK, Closed, Error} = Transport:messages(),
	Transport:setopts(Socket, [{active, once}]),
	receive
		{OK, Socket, Data} ->
			case Protocol:handle(Data, ProtoState) of
				close ->
					Transport:close(Socket),
					down(State, normal);
				ProtoState2 ->
					ws_loop(State#state{protocol_state=ProtoState2})
			end;
		{Closed, Socket} ->
			Transport:close(Socket),
			down(State, closed);
		{Error, Socket, Reason} ->
			Transport:close(Socket),
			down(State, {error, Reason});
		%% Ignore any previous HTTP keep-alive.
		keepalive ->
			ws_loop(State);
%		{ws_send, Owner, Frames} when is_list(Frames) ->
%			todo; %% @todo
		{ws_send, Owner, Frame} ->
			case Protocol:send(Frame, ProtoState) of
				close ->
					Transport:close(Socket),
					down(State, normal);
				ProtoState2 ->
					ws_loop(State#state{protocol_state=ProtoState2})
			end;
		{shutdown, Owner} ->
			%% @todo Protocol:shutdown? %% @todo close frame
			ok;
		{'DOWN', OwnerRef, process, Owner, Reason} ->
			Protocol:close(owner_gone, ProtoState),
			Transport:close(Socket),
			owner_gone(Reason);
		{system, From, Request} ->
			sys:handle_system_msg(Request, From, Parent, ?MODULE, [],
				{ws_loop, State});
		Any when is_tuple(Any), is_pid(element(2, Any)) ->
			element(2, Any) ! {gun_error, self(), {notowner,
				"Operations are restricted to the owner of the connection."}},
			ws_loop(State);
		Any ->
			error_logger:error_msg("Unexpected message: ~w~n", [Any])
	end.

-spec owner_gone(_) -> no_return().
owner_gone(normal) -> exit(normal);
owner_gone(shutdown) -> exit(shutdown);
owner_gone(Shutdown = {shutdown, _}) -> exit(Shutdown);
owner_gone(Reason) -> error({owner_gone, Reason}).

system_continue(_, _, {retry_loop, State, Retry}) ->
	retry_loop(State, Retry);
system_continue(_, _, {loop, State}) ->
	loop(State);
system_continue(_, _, {ws_loop, State}) ->
	ws_loop(State).

-spec system_terminate(any(), _, _, _) -> no_return().
system_terminate(Reason, _, _, _) ->
	exit(Reason).

system_code_change(Misc, _, _, _) ->
	{ok, Misc}.