aboutsummaryrefslogblamecommitdiffstats
path: root/lib/inets/src/http_client/httpc_handler.erl
blob: 25f9b0777f9c34c593a20a1bd325ac0ba194a692 (plain) (tree)
1
2
3
4
5
6
7
8
9
10

                   


                                                        




                                                                      
  



                                                                         
  













                                                                      

                                                     




















                                                                          
                                                                                  

































                                                                            






                                                  







































































































                                                                                          

                                                                       
  









                                                                      

                                                                           
                                  
                                            





                                               









                                                                         












































                                                                           



                                                                           


                                      
                                                                




                                                                         
 
                                       

                                                                
                                                                        
                                                        
                                                                          
                                  




                                                                          
                                                 
                                                                   

                                                                   
                            
                                                                   
                                                       
                                                    







                                                                          







                                                                        

                          
                                                                


                                                     







                                                                         
    





                                                                           

                                                        



                                                                
                       



                                                                         



                                                                        

                                                                
                                                                             
                                   




                                                                          
                                                 
                                                                      

                                                                     


                                                                  
                                                    







                                                                          




                                                                     
                


                                                                


                                                    
 


















                                                                       






                                                                     
                                                           
                          
                   







                                                                  
                                                           


                                                              





















                                                                          






                                                        









                                                                 

                                                                          



                                                                    






                                                                    

                                                  

                                                                       








                                                                                


                                                                            
                                         



                                                                               







                                                     



                                           
















                                                                              
 






































                                                                            
                                                           
                                              

                                                                




                                                                         
                                        

                                                                              
                                                                 


                                                                   
                                                     





                                                      
                                  



                                                                    
                                    









































                                                                       



                                                               
























                                                                        








                                                                 




                                                           
                                                      
 





                                                               


















                                                                        
                                     
       
                                                                       

                                                                     
                                                                            

                                        

                                                       















































































                                                                                  







                                                                      
                                                                        


                                                        

                                                              
                                                                            

                                                                
                                                    
                                  
                                                                 

                                                                      


                                                                          
                                                                










                                                                            


                                                                  




                                                                             

                




                                                                     






                                                                    
                                                              
            
                                                                    





                                                                           


                                                                             

                                                                    
                                                                   
                                                           
                                                                    













                                                                            
                                                               
                                                                  






                                                                  







                                                                    



                                                        



                                                                        


                                                              















                                                                             
                                                                              


                                                                   


                                                                          














                                                                               




                                                                 





                                                 








                                                            



                                                                       
                                                    









                                                                            






                                                            


                                                                  
                                                              
                                                                      


                                                                
                                                           

                                                            
                                                                
 



                                                              

                                                              


                                                                         

                                                                       


                                                                          
                                                                      



                                                           
                                                          












                                                                             
                                                                  





                                                                        
                                                  

                          
                                                          

                                         
                                                        

                                 

                                                
                                                  







                                                                              

                                           

                                              



                                                                  

                                    

                                                                     
                                                                




                                                                  

                                                     
                                                    

                                                      



                                                                
                                                                 




                                                                           
                                                                    






                                                                            


                                                                    
                               


                                                              
                                                            

                                                             















                                                                       










                                                                         

                                             
                                                

























                                                                             


                                                                        
                        
                                                                    

                                                                            

                                                                        
                              


                                                              
                                                            

                                                             

                                
                                                    









                                                                      






                                                            







                                                                     


                   
















                                                                     
























                                                                            





                                                  











                                                                       


                                                                         




                                       

                                                                 


































                                                                          
                                               
                          





                                                                               





















































































                                                                            

                                                         

                                      

                                                       











                                                                

                                                        




































































































































                                                                                 

















                                                               
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2002-2010. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%

-module(httpc_handler).

-behaviour(gen_server).

-include("httpc_internal.hrl").
-include("http_internal.hrl").


%%--------------------------------------------------------------------
%% Internal Application API
-export([start_link/2, connect_and_send/2, 
	 send/2, cancel/2, stream/3, stream_next/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
	 terminate/2, code_change/3]).

-record(timers, 
	{
	  request_timers = [], % [ref()]
	  queue_timer          % ref()
	 }).

-record(state, 
	{
	  request,                   % #request{}
	  session,                   % #tcp_session{} 
	  status_line,               % {Version, StatusCode, ReasonPharse}
	  headers,                   % #http_response_h{}
	  body,                      % binary()
	  mfa,                       % {Moduel, Function, Args}
	  pipeline = queue:new(),    % queue() 
	  keep_alive = queue:new(),  % queue() 
	  status,   % undefined | new | pipeline | keep_alive | close | ssl_tunnel
	  canceled = [],	     % [RequestId]
	  max_header_size = nolimit, % nolimit | integer() 
	  max_body_size = nolimit,   % nolimit | integer()
	  options,                   % #options{}
	  timers = #timers{},        % #timers{}
	  profile_name,              % atom() - id of httpc_manager process.
	  once                       % send | undefined 
	 }).


%%====================================================================
%% External functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link(Request, Options, ProfileName) -> {ok, Pid}
%%
%%	Request = #request{}
%%	Options =  #options{} 
%%      ProfileName = atom() - id of httpc manager process
%%
%% Description: Starts a http-request handler process. Intended to be
%% called by the httpc profile supervisor or the http manager process
%% if the client is started stand alone form inets.
%%
%% Note: Uses proc_lib and gen_server:enter_loop so that waiting
%% for gen_tcp:connect to timeout in init/1 will not
%% block the httpc manager process in odd cases such as trying to call
%% a server that does not exist. (See OTP-6735) The only API function
%% sending messages to the handler process that can be called before
%% init has compleated is cancel and that is not a problem! (Send and
%% stream will not be called before the first request has been sent and
%% the reply or part of it has arrived.)
%%--------------------------------------------------------------------
%%--------------------------------------------------------------------

start_link(Options, ProfileName) ->
    Args = [Options, ProfileName], 
    gen_server:start_link(?MODULE, Args, []).

connect_and_send(Request, HandlerPid) ->
    call({connect_and_send, Request}, HandlerPid).


%%--------------------------------------------------------------------
%% Function: send(Request, Pid) -> ok 
%%	Request = #request{}
%%      Pid = pid() - the pid of the http-request handler process.
%%
%% Description: Uses this handlers session to send a request. Intended
%% to be called by the httpc manager process.
%%--------------------------------------------------------------------
send(Request, Pid) ->
    call(Request, Pid, 5000).


%%--------------------------------------------------------------------
%% Function: cancel(RequestId, Pid) -> ok
%%	RequestId = ref()
%%      Pid = pid() -  the pid of the http-request handler process.
%%
%% Description: Cancels a request. Intended to be called by the httpc
%% manager process.
%%--------------------------------------------------------------------
cancel(RequestId, Pid) ->
    cast({cancel, RequestId}, Pid).


%%--------------------------------------------------------------------
%% Function: stream_next(Pid) -> ok
%%      Pid = pid() -  the pid of the http-request handler process.
%%
%% Description: Works as inets:setopts(active, once) but for
%% body chunks sent to the user.
%%--------------------------------------------------------------------
stream_next(Pid) ->
    cast(stream_next, Pid).


%%--------------------------------------------------------------------
%% Function: stream(BodyPart, Request, Code) -> _
%%	BodyPart = binary()
%%      Request = #request{}
%%      Code = integer()
%%
%% Description: Stream the HTTP body to the caller process (client) 
%%              or to a file. Note that the data that has been stream
%%              does not have to be saved. (We do not want to use up
%%              memory in vain.)
%%--------------------------------------------------------------------
%% Request should not be streamed
stream(BodyPart, Request = #request{stream = none}, _) ->
    ?hcrt("stream - none", [{body_part, BodyPart}]),
    {BodyPart, Request};

%% Stream to caller
stream(BodyPart, Request = #request{stream = Self}, Code) 
  when ((Code =:= 200) orelse  (Code =:= 206)) andalso 
       ((Self =:= self) orelse (Self =:= {self, once})) ->
    ?hcrt("stream - self", [{stream, Self}, {code, Code}, {body_part, BodyPart}]),
    httpc_response:send(Request#request.from, 
			{Request#request.id, stream, BodyPart}),
    {<<>>, Request};

stream(BodyPart, Request = #request{stream = Self}, 404) 
  when (Self =:= self) orelse (Self =:= {self, once}) ->
    ?hcrt("stream - self with 404", [{stream, Self}, {body_part, BodyPart}]),
    httpc_response:send(Request#request.from,
                       {Request#request.id, stream, BodyPart}),
    {<<>>, Request};

%% Stream to file
%% This has been moved to start_stream/3
%% We keep this for backward compatibillity...
stream(BodyPart, Request = #request{stream = Filename}, Code)
  when ((Code =:= 200) orelse (Code =:= 206)) andalso is_list(Filename) -> 
    ?hcrt("stream - filename", [{stream, Filename}, {code, Code}, {body_part, BodyPart}]),
    case file:open(Filename, [write, raw, append, delayed_write]) of
	{ok, Fd} ->
	    ?hcrt("stream - file open ok", [{fd, Fd}]),
	    stream(BodyPart, Request#request{stream = Fd}, 200);
	{error, Reason} ->
	    exit({stream_to_file_failed, Reason})
    end;

%% Stream to file
stream(BodyPart, Request = #request{stream = Fd}, Code)  
  when ((Code =:= 200) orelse (Code =:= 206)) -> 
    ?hcrt("stream to file", [{stream, Fd}, {code, Code}, {body_part, BodyPart}]),
    case file:write(Fd, BodyPart) of
	ok ->
	    {<<>>, Request};
	{error, Reason} ->
	    exit({stream_to_file_failed, Reason})
    end;

stream(BodyPart, Request,_) -> % only 200 and 206 responses can be streamed
    ?hcrt("stream - ignore", [{request, Request}, {body_part, BodyPart}]),
    {BodyPart, Request}.


%%====================================================================
%% Server functions
%%====================================================================

%%--------------------------------------------------------------------
%% Function: init([Options, ProfileName]) -> {ok, State} | 
%%                       {ok, State, Timeout} | ignore | {stop, Reason}
%%
%%	Options =  #options{} 
%%      ProfileName = atom() - id of httpc manager process
%%
%% Description: Initiates the httpc_handler process 
%%
%% Note: The init function may not fail, that will kill the
%% httpc_manager process. We could make the httpc_manager more comlex
%% but we do not want that so errors will be handled by the process
%% sending an init_error message to itself.
%%--------------------------------------------------------------------
init([Options, ProfileName]) ->
    ?hcrv("init - starting", [{options, Options}, {profile, ProfileName}]),
    process_flag(trap_exit, true),
    handle_verbose(Options#options.verbose),
    State = #state{status       = undefined,
		   options      = Options, 
		   profile_name = ProfileName},
    ?hcrd("init - started", []),
    {ok, State}.


%%--------------------------------------------------------------------
%% Function: handle_call(Request, From, State) -> {reply, Reply, State} |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%% Description: Handling call messages
%%--------------------------------------------------------------------


%% This is the first request, the reason the proc was started
handle_call({connect_and_send, #request{address = Address0,
					scheme  = Scheme} = Request}, 
	    _From, 
	    #state{options = #options{proxy = Proxy}, 
		   status  = undefined,
		   session = undefined} = State) ->
    ?hcrv("connect and send", [{address0, Address0}, {proxy, Proxy}]),
    Address = handle_proxy(Address0, Proxy),
    if
	((Address =/= Address0) andalso (Scheme  =:= https)) ->
	    %% This is what we should do if and when ssl supports 
	    %% "socket upgrading"
	    %%send_ssl_tunnel_request(Address, Request,
	    %%		    #state{options = Options,
	    %%		   status = ssl_tunnel});
	    Reason = https_through_proxy_is_not_currently_supported,
	    Error  = {error, Reason},
	    {stop, Error, Error, State};
	true ->
	    case connect_and_send_first_request(Address, Request, State) of
		{ok, NewState} ->
		    {reply, ok, NewState};
		{stop, Error, NewState} ->
		    {stop, Error, Error, NewState}
	    end
    end;
	
handle_call(Request, _, 
	    #state{status  = Status,
		   session = #tcp_session{socket = Socket,
					  type   = pipeline} = Session,
		   timers  = Timers,
		   options = Options,
		   profile_name = ProfileName} = State) 
  when Status =/= undefined ->

    ?hcrv("new request", [{request,      Request}, 
			  {profile,      ProfileName}, 
			  {status,       Status}, 
			  {session_type, pipeline}, 
			  {timers,       Timers}]),

    Address = handle_proxy(Request#request.address, Options#options.proxy),

    case httpc_request:send(Address, Request, Socket) of
        ok ->

	    ?hcrd("request sent", []),

	    %% Activate the request time out for the new request
	    NewState = 
		activate_request_timeout(State#state{request = Request}),

	    ClientClose = 
		httpc_request:is_client_closing(Request#request.headers),

            case State#state.request of
                #request{} -> %% Old request not yet finished
		    ?hcrd("old request still not finished", []),
		    %% Make sure to use the new value of timers in state
		    NewTimers   = NewState#state.timers,
                    NewPipeline = queue:in(Request, State#state.pipeline),
		    NewSession  = 
			Session#tcp_session{queue_length = 
					    %% Queue + current
					    queue:len(NewPipeline) + 1,
					    client_close = ClientClose},
		    httpc_manager:insert_session(NewSession, ProfileName),
		    ?hcrd("session updated", []),
                    {reply, ok, State#state{pipeline = NewPipeline,
					    session  = NewSession,
					    timers   = NewTimers}};
		undefined ->
		    %% Note: tcp-message receiving has already been
		    %% activated by handle_pipeline/2. 
		    ?hcrd("no current request", []),
		    cancel_timer(Timers#timers.queue_timer, 
				 timeout_queue),
		    NewSession = 
			Session#tcp_session{queue_length = 1,
					    client_close = ClientClose},
		    httpc_manager:insert_session(NewSession, ProfileName),
		    Relaxed = 
			(Request#request.settings)#http_options.relaxed, 
		    MFA = {httpc_response, parse, 
			   [State#state.max_header_size, Relaxed]}, 
		    NewTimers = Timers#timers{queue_timer = undefined}, 
		    ?hcrd("session created", []),
		    {reply, ok, NewState#state{request = Request,
					       session = NewSession,
					       mfa     = MFA,
					       timers  = NewTimers}}
	    end;
	{error, Reason} ->
	    ?hcri("failed sending request", [{reason, Reason}]),
	    {reply, {pipeline_failed, Reason}, State}
    end;

handle_call(Request, _, 
	    #state{status  = Status,
		   session = #tcp_session{socket = Socket,
					  type   = keep_alive} = Session,
		   timers  = Timers,
		   options = Options,
		   profile_name = ProfileName} = State) 
  when Status =/= undefined ->
    
    ?hcrv("new request", [{request,      Request}, 
			  {profile,      ProfileName}, 
			  {status,       Status}, 
			  {session_type, keep_alive}]),

    Address = handle_proxy(Request#request.address, Options#options.proxy),
    case httpc_request:send(Address, Request, Socket) of
	ok ->

	    ?hcrd("request sent", []),

	    %% Activate the request time out for the new request
	    NewState = 
		activate_request_timeout(State#state{request = Request}),

	    ClientClose = 
		httpc_request:is_client_closing(Request#request.headers),

	    case State#state.request of
		#request{} -> %% Old request not yet finished
		    %% Make sure to use the new value of timers in state
		    ?hcrd("old request still not finished", []),
		    NewTimers    = NewState#state.timers,
                    NewKeepAlive = queue:in(Request, State#state.keep_alive),
		    NewSession   = 
			Session#tcp_session{queue_length = 
					    %% Queue + current
					    queue:len(NewKeepAlive) + 1,
					    client_close = ClientClose},
		    httpc_manager:insert_session(NewSession, ProfileName),
		    ?hcrd("session updated", []),
                    {reply, ok, State#state{keep_alive = NewKeepAlive,
					    session    = NewSession,
					    timers     = NewTimers}};
		undefined ->
		    %% Note: tcp-message reciving has already been
		    %% activated by handle_pipeline/2. 
		    ?hcrd("no current request", []),
		    cancel_timer(Timers#timers.queue_timer, 
				 timeout_queue),
		    NewSession = 
			Session#tcp_session{queue_length = 1,
					    client_close = ClientClose},
		    httpc_manager:insert_session(NewSession, ProfileName),
		    Relaxed = 
			(Request#request.settings)#http_options.relaxed,
		    MFA = {httpc_response, parse,
			   [State#state.max_header_size, Relaxed]}, 
		    {reply, ok, NewState#state{request = Request,
					       session = NewSession, 
					       mfa     = MFA}}
	    end;

	{error, Reason} ->
	    ?hcri("failed sending request", [{reason, Reason}]),
	    {reply, {request_failed, Reason}, State}
    end.


%%--------------------------------------------------------------------
%% Function: handle_cast(Msg, State) -> {noreply, State} |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%% Description: Handling cast messages
%%--------------------------------------------------------------------

%% When the request in process has been canceled the handler process is
%% stopped and the pipelined requests will be reissued or remaining
%% requests will be sent on a new connection. This is is
%% based on the assumption that it is proably cheaper to reissue the
%% requests than to wait for a potentiall large response that we then
%% only throw away. This of course is not always true maybe we could
%% do something smarter here?! If the request canceled is not
%% the one handled right now the same effect will take place in
%% handle_pipeline/2 when the canceled request is on turn, 
%% handle_keep_alive_queue/2 on the other hand will just skip the
%% request as if it was never issued as in this case the request will
%% not have been sent. 
handle_cast({cancel, RequestId}, 
	    #state{request      = #request{id = RequestId} = Request,
		   profile_name = ProfileName,
		   canceled     = Canceled} = State) ->
    ?hcrv("cancel current request", [{request_id, RequestId}, 
				     {profile,    ProfileName},
				     {canceled,   Canceled}]),
    httpc_manager:request_canceled(RequestId, ProfileName),
    ?hcrv("canceled", []),
    {stop, normal, 
     State#state{canceled = [RequestId | Canceled],
		 request  = Request#request{from = answer_sent}}};
handle_cast({cancel, RequestId}, 
	    #state{profile_name = ProfileName,
		   canceled     = Canceled} = State) ->
    ?hcrv("cancel", [{request_id, RequestId}, 
		     {profile, ProfileName},
		     {canceled,   Canceled}]),
    httpc_manager:request_canceled(RequestId, ProfileName),
    ?hcrv("canceled", []),
    {noreply, State#state{canceled = [RequestId | Canceled]}};

handle_cast(stream_next, #state{session = Session} = State) ->
    http_transport:setopts(socket_type(Session#tcp_session.scheme), 
			   Session#tcp_session.socket, [{active, once}]), 
    {noreply, State#state{once = once}}.


%%--------------------------------------------------------------------
%% Function: handle_info(Info, State) -> {noreply, State} |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%% Description: Handling all non call/cast messages
%%--------------------------------------------------------------------
handle_info({Proto, _Socket, Data}, 
	    #state{mfa = {Module, Function, Args} = MFA, 
		   request = #request{method = Method, 
				      stream = Stream} = Request, 
		   session = Session, 
		   status_line = StatusLine} = State) 
  when (Proto =:= tcp) orelse 
       (Proto =:= ssl) orelse 
       (Proto =:= httpc_handler) ->

    ?hcri("received data", [{proto,       Proto}, 
			    {data,        Data}, 
			    {mfa,         MFA}, 
			    {method,      Method}, 
			    {stream,      Stream}, 
			    {session,     Session}, 
			    {status_line, StatusLine}]),

    FinalResult = 
	try Module:Function([Data | Args]) of
	    {ok, Result} ->
		?hcrd("data processed - ok", [{result, Result}]),
		handle_http_msg(Result, State); 
	    {_, whole_body, _} when Method =:= head ->
		?hcrd("data processed - whole body", []),
		handle_response(State#state{body = <<>>}); 
	    {Module, whole_body, [Body, Length]} ->
		?hcrd("data processed - whole body", 
		      [{module, Module}, {body, Body}, {length, Length}]),
		{_, Code, _} = StatusLine,
		{NewBody, NewRequest} = stream(Body, Request, Code),
		%% When we stream we will not keep the already
		%% streamed data, that would be a waste of memory.
		NewLength = 
		    case Stream of
			none ->   
			    Length;
			_ ->
			    Length - size(Body)			    
		    end,
		
		NewState = next_body_chunk(State),
		NewMFA   = {Module, whole_body, [NewBody, NewLength]}, 
		{noreply, NewState#state{mfa     = NewMFA,
					 request = NewRequest}};
	    NewMFA ->
		?hcrd("data processed", [{new_mfa, NewMFA}]),
		http_transport:setopts(socket_type(Session#tcp_session.scheme), 
				       Session#tcp_session.socket, 
				       [{active, once}]),
		{noreply, State#state{mfa = NewMFA}}
	catch
	    exit:_ ->
		ClientReason = {could_not_parse_as_http, Data}, 
		ClientErrMsg = httpc_response:error(Request, ClientReason),
		NewState     = answer_request(Request, ClientErrMsg, State),
		{stop, normal, NewState};
	    error:_ ->    
		ClientReason = {could_not_parse_as_http, Data}, 
		ClientErrMsg = httpc_response:error(Request, ClientReason),
		NewState     = answer_request(Request, ClientErrMsg, State),   
		{stop, normal, NewState}
	
	end,
    ?hcri("data processed", [{result, FinalResult}]),
    FinalResult;


handle_info({Proto, Socket, Data}, 
	    #state{mfa          = MFA, 
		   request      = Request, 
		   session      = Session, 
		   status       = Status,
		   status_line  = StatusLine, 
		   profile_name = Profile} = State) 
  when (Proto =:= tcp) orelse 
       (Proto =:= ssl) orelse 
       (Proto =:= httpc_handler) ->

    error_logger:warning_msg("Received unexpected ~p data on ~p"
			     "~n   Data:       ~p"
			     "~n   MFA:        ~p"
			     "~n   Request:    ~p"
			     "~n   Session:    ~p"
			     "~n   Status:     ~p"
			     "~n   StatusLine: ~p"
			     "~n   Profile:    ~p"
			     "~n", 
			     [Proto, Socket, Data, MFA, 
			      Request, Session, Status, StatusLine, Profile]),

    {noreply, State};


%% The Server may close the connection to indicate that the
%% whole body is now sent instead of sending an length
%% indicator.
handle_info({tcp_closed, _}, State = #state{mfa = {_, whole_body, Args}}) ->
    handle_response(State#state{body = hd(Args)}); 
handle_info({ssl_closed, _}, State = #state{mfa = {_, whole_body, Args}}) ->
    handle_response(State#state{body = hd(Args)}); 

%%% Server closes idle pipeline
handle_info({tcp_closed, _}, State = #state{request = undefined}) ->
    {stop, normal, State};
handle_info({ssl_closed, _}, State = #state{request = undefined}) ->
    {stop, normal, State};

%%% Error cases
handle_info({tcp_closed, _}, #state{session = Session0} = State) ->
    Socket  = Session0#tcp_session.socket,
    Session = Session0#tcp_session{socket = {remote_close, Socket}},
    %% {stop, session_remotly_closed, State};
    {stop, normal, State#state{session = Session}};
handle_info({ssl_closed, _}, #state{session = Session0} = State) ->
    Socket  = Session0#tcp_session.socket,
    Session = Session0#tcp_session{socket = {remote_close, Socket}},
    %% {stop, session_remotly_closed, State};
    {stop, normal, State#state{session = Session}};
handle_info({tcp_error, _, _} = Reason, State) ->
    {stop, Reason, State};
handle_info({ssl_error, _, _} = Reason, State) ->
    {stop, Reason, State};

%% Timeouts
%% Internally, to a request handling process, a request timeout is
%% seen as a canceled request.
handle_info({timeout, RequestId}, 
	    #state{request  = #request{id = RequestId} = Request,
		   canceled = Canceled} = State) ->
    ?hcri("timeout of current request", [{id, RequestId}]),
    httpc_response:send(Request#request.from, 
			httpc_response:error(Request, timeout)),
    ?hcrv("response (timeout) sent - now terminate", []),
    {stop, normal, 
     State#state{request  = Request#request{from = answer_sent},
		 canceled = [RequestId | Canceled]}};

handle_info({timeout, RequestId}, #state{canceled = Canceled} = State) ->
    ?hcri("timeout", [{id, RequestId}]),
    Filter = 
	fun(#request{id = Id, from = From} = Request) when Id =:= RequestId ->
		?hcrv("found request", [{id, Id}, {from, From}]),
		%% Notify the owner
		Response = httpc_response:error(Request, timeout), 
		httpc_response:send(From, Response),
		?hcrv("response (timeout) sent", []),
		[Request#request{from = answer_sent}];
	   (_) ->
		true
	end,
    case State#state.status of
	pipeline ->
	    ?hcrd("pipeline", []),
	    Pipeline = queue:filter(Filter, State#state.pipeline),
	    {noreply, State#state{canceled = [RequestId | Canceled],
				  pipeline = Pipeline}};
	keep_alive ->
	    ?hcrd("keep_alive", []),
	    KeepAlive = queue:filter(Filter, State#state.keep_alive),
	    {noreply, State#state{canceled   = [RequestId | Canceled],
				  keep_alive = KeepAlive}}
    end;

handle_info(timeout_queue, State = #state{request = undefined}) ->
    {stop, normal, State};

%% Timing was such as the pipeline_timout was not canceled!
handle_info(timeout_queue, #state{timers = Timers} = State) ->
    {noreply, State#state{timers = 
			  Timers#timers{queue_timer = undefined}}};

%% Setting up the connection to the server somehow failed. 
handle_info({init_error, _, ClientErrMsg},
	    State = #state{request = Request}) ->
    NewState = answer_request(Request, ClientErrMsg, State),
    {stop, normal, NewState};


%%% httpc_manager process dies. 
handle_info({'EXIT', _, _}, State = #state{request = undefined}) ->
    {stop, normal, State};
%%Try to finish the current request anyway,
%% there is a fairly high probability that it can be done successfully.
%% Then close the connection, hopefully a new manager is started that
%% can retry requests in the pipeline.
handle_info({'EXIT', _, _}, State) ->
    {noreply, State#state{status = close}}.
    

%%--------------------------------------------------------------------
%% Function: terminate(Reason, State) -> _  (ignored by gen_server)
%% Description: Shutdown the httpc_handler
%%--------------------------------------------------------------------

%% Init error there is no socket to be closed.
terminate(normal, #state{session = undefined}) ->
    ok;  

%% Init error sending, no session information has been setup but
%% there is a socket that needs closing.
terminate(normal, 
	  #state{request = Request,
		 session = #tcp_session{id     = undefined,
					socket = Socket}}) ->  
    http_transport:close(socket_type(Request), Socket);

%% Socket closed remotely
terminate(normal, 
	  #state{session = #tcp_session{socket = {remote_close, Socket},
					id     = Id}, 
		 profile_name = ProfileName,
		 request = Request,
		 timers  = Timers,
		 pipeline = Pipeline}) ->  
    %% Clobber session
    (catch httpc_manager:delete_session(Id, ProfileName)),

    %% Cancel timers
    #timers{request_timers = ReqTmrs, queue_timer = QTmr} = Timers, 
    cancel_timer(QTmr, timeout_queue),
    lists:foreach(fun({_, Timer}) -> cancel_timer(Timer, timeout) end, 
		  ReqTmrs),

    %% Maybe deliver answers to requests
    deliver_answers([Request | queue:to_list(Pipeline)]),

    %% And, just in case, close our side (**really** overkill)
    http_transport:close(socket_type(Request), Socket);

terminate(_, #state{session      = #tcp_session{id     = Id,
						socket = Socket, 
						scheme = Scheme},
		    request      = undefined,
		    profile_name = ProfileName,
		    timers       = Timers,
		    pipeline     = Pipeline,
		    keep_alive   = KeepAlive} = State) -> 
    (catch httpc_manager:delete_session(Id, ProfileName)),

    maybe_retry_queue(Pipeline, State),
    maybe_retry_queue(KeepAlive, State),

    cancel_timer(Timers#timers.queue_timer, timeout_queue),
    http_transport:close(socket_type(Scheme), Socket);

terminate(Reason, #state{request = undefined}) -> 
    ?hcrt("terminate", [{reason, Reason}]),
    ok;

terminate(Reason, #state{request = Request} = State) -> 
    ?hcrd("terminate", [{reason, Reason}, {request, Request}]),
    NewState = maybe_send_answer(Request, 
				 httpc_response:error(Request, Reason), 
				 State),
    terminate(Reason, NewState#state{request = undefined}).

maybe_retry_queue(Q, State) ->
    case queue:is_empty(Q) of 
	false ->
	    retry_pipeline(queue:to_list(Q), State);
	true ->
	    ok
    end.
    
maybe_send_answer(#request{from = answer_sent}, _Reason, State) ->
    State;
maybe_send_answer(Request, Answer, State) ->
    answer_request(Request, Answer, State).

deliver_answers([]) ->
    ?hcrd("deliver answer done", []),
    ok;
deliver_answers([#request{id = Id, from = From} = Request | Requests]) 
  when is_pid(From) ->
    Response = httpc_response:error(Request, socket_closed_remotely),
    ?hcrd("deliver answer", [{id, Id}, {from, From}, {response, Response}]),
    httpc_response:send(From, Response),
    deliver_answers(Requests);
deliver_answers([Request|Requests]) ->
    ?hcrd("skip deliver answer", [{request, Request}]),
    deliver_answers(Requests).


%%--------------------------------------------------------------------
%% Func: code_change(_OldVsn, State, Extra) -> {ok, NewState}
%% Purpose: Convert process state when code is changed
%%--------------------------------------------------------------------
code_change(_, #state{request = Request, pipeline = Queue} = State, 
	    [{from, '5.0.1'}, {to, '5.0.2'}]) ->
    Settings = new_http_options(Request#request.settings),
    NewRequest = Request#request{settings = Settings},
    NewQueue = new_queue(Queue, fun new_http_options/1),
    {ok, State#state{request = NewRequest, pipeline = NewQueue}};

code_change(_, #state{request = Request, pipeline = Queue} = State, 
	    [{from, '5.0.2'}, {to, '5.0.1'}]) ->
    Settings = old_http_options(Request#request.settings),
    NewRequest = Request#request{settings = Settings},
    NewQueue = new_queue(Queue, fun old_http_options/1),
    {ok, State#state{request = NewRequest, pipeline = NewQueue}};

code_change(_, State, _) ->
    {ok, State}.

new_http_options({http_options, TimeOut, AutoRedirect, SslOpts,
		  Auth, Relaxed}) ->
    {http_options, "HTTP/1.1", TimeOut, AutoRedirect, SslOpts,
     Auth, Relaxed}.

old_http_options({http_options, _, TimeOut, AutoRedirect,
		  SslOpts, Auth, Relaxed}) ->
    {http_options, TimeOut, AutoRedirect, SslOpts, Auth, Relaxed}.

new_queue(Queue, Fun) ->
    List = queue:to_list(Queue),
    NewList = 
	lists:map(fun(Request) ->
			  Settings = 
			      Fun(Request#request.settings),
			  Request#request{settings = Settings}
		  end, List),
    queue:from_list(NewList).
    
%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

connect(SocketType, ToAddress, #options{ipfamily = IpFamily,
					ip       = FromAddress,
					port     = FromPort}, Timeout) ->
    Opts1 = 
	case FromPort of
	    default ->
		[];
	    _ ->
		[{port, FromPort}]
	end,
    Opts2 = 
	case FromAddress of
	    default ->
		Opts1;
	    _ ->
		[{ip, FromAddress} | Opts1]
	end,
    case IpFamily of
	inet6fb4 ->
	    Opts3 = [inet6 | Opts2],
	    case http_transport:connect(SocketType, ToAddress, Opts3, Timeout) of
		{error, Reason} when ((Reason =:= nxdomain) orelse 
				      (Reason =:= eafnosupport)) -> 
		    Opts4 = [inet | Opts2], 
		    http_transport:connect(SocketType, ToAddress, Opts4, Timeout);
		Other ->
		    Other
	    end;
	_ ->
	    Opts3 = [IpFamily | Opts2], 
	    http_transport:connect(SocketType, ToAddress, Opts3, Timeout)
    end.
		
connect_and_send_first_request(Address, 
			       #request{settings = Settings,
					headers  = Headers,
					address  = OrigAddress,
					scheme   = Scheme} = Request, 
			       #state{options = Options} = State) ->

    ?hcrd("connect", 
	  [{address, Address}, {request, Request}, {options, Options}]),

    SocketType  = socket_type(Request),
    ConnTimeout = Settings#http_options.connect_timeout,
    case connect(SocketType, Address, Options, ConnTimeout) of
	{ok, Socket} ->
	    ?hcrd("connected - now send first request", [{socket, Socket}]),
	    case httpc_request:send(Address, Request, Socket) of
		ok ->
		    ?hcrd("first request sent", []),
		    ClientClose = 
			httpc_request:is_client_closing(Headers),
		    SessionType = httpc_manager:session_type(Options),
		    Session =
			#tcp_session{id           = {OrigAddress, self()},
				     scheme       = Scheme,
				     socket       = Socket,
				     client_close = ClientClose,
				     type         = SessionType},
		    TmpState = 
			State#state{request = Request, 
				    session = Session, 
				    mfa     = init_mfa(Request, State),
				    status_line = init_status_line(Request),
				    headers = undefined,
				    body    = undefined,
				    status  = new},
		    ?hcrt("activate socket", []),
		    activate_once(Session),
		    NewState = activate_request_timeout(TmpState),
		    {ok, NewState};

		{error, Reason} -> 
		    ?hcrv("failed sending request", [{reason, Reason}]),
		    Error = {error, {send_failed, 
				     httpc_response:error(Request, Reason)}},
		    {stop, Error, State#state{request = Request}}
	    end;

	{error, Reason} -> 
	    ?hcri("connect failed", [{reason, Reason}]),
	    Error = {error, {connect_failed, 
			     httpc_response:error(Request, Reason)}},
	    {stop, Error, State#state{request = Request}}
    end.

handle_http_msg({Version, StatusCode, ReasonPharse, Headers, Body}, 
		State = #state{request = Request}) ->
    ?hcrt("handle_http_msg", [{body, Body}]),
    case Headers#http_response_h.'content-type' of
        "multipart/byteranges" ++ _Param ->
            exit({not_yet_implemented, multypart_nyteranges});
        _ ->
	    StatusLine       = {Version, StatusCode, ReasonPharse}, 
	    {ok, NewRequest} = start_stream(StatusLine, Headers, Request), 
            handle_http_body(Body, 
			     State#state{request     = NewRequest,
					 status_line = StatusLine, 
					 headers     = Headers})
    end;
handle_http_msg({ChunkedHeaders, Body}, #state{headers = Headers} = State) ->
    ?hcrt("handle_http_msg", 
	  [{chunked_headers, ChunkedHeaders}, {body, Body}]),
    NewHeaders = http_chunk:handle_headers(Headers, ChunkedHeaders),
    handle_response(State#state{headers = NewHeaders, body = Body});
handle_http_msg(Body, #state{status_line = {_,Code, _}} = State) ->
    ?hcrt("handle_http_msg", [{body, Body}, {code, Code}]),
    {NewBody, NewRequest} = stream(Body, State#state.request, Code),
    handle_response(State#state{body = NewBody, request = NewRequest}).

handle_http_body(<<>>, State = #state{status_line = {_,304, _}}) ->
    ?hcrt("handle_http_body - 304", []),
    handle_response(State#state{body = <<>>});

handle_http_body(<<>>, State = #state{status_line = {_,204, _}}) ->
    ?hcrt("handle_http_body - 204", []),
    handle_response(State#state{body = <<>>});

handle_http_body(<<>>, State = #state{request = #request{method = head}}) ->
    ?hcrt("handle_http_body - head", []),
    handle_response(State#state{body = <<>>});

handle_http_body(Body, State = #state{headers       = Headers, 
				      max_body_size = MaxBodySize,
				      status_line   = {_,Code, _},
				      request       = Request}) ->
    ?hcrt("handle_http_body", 
	  [{headers,       Headers}, 
	   {body,          Body}, 
	   {max_body_size, MaxBodySize}, 
	   {code,          Code}]),
    TransferEnc = Headers#http_response_h.'transfer-encoding',
    case case_insensitive_header(TransferEnc) of
        "chunked" ->
	    ?hcrt("handle_http_body - chunked", []),
	    case http_chunk:decode(Body, State#state.max_body_size, 
				   State#state.max_header_size, 
				   {Code, Request}) of
		{Module, Function, Args} ->
		    ?hcrt("handle_http_body - new mfa", 
			  [{module,   Module}, 
			   {function, Function}, 
			   {args,     Args}]),
		    NewState = next_body_chunk(State),
		    {noreply, NewState#state{mfa = 
					     {Module, Function, Args}}};
		{ok, {ChunkedHeaders, NewBody}} ->
		    ?hcrt("handle_http_body - new body", 
			  [{chunked_headers, ChunkedHeaders}, 
			   {new_body,        NewBody}]),
		    NewHeaders = http_chunk:handle_headers(Headers, 
							   ChunkedHeaders),
		    handle_response(State#state{headers = NewHeaders, 
						body    = NewBody})
	    end;
        Encoding when is_list(Encoding) ->
	    ?hcrt("handle_http_body - encoding", [{encoding, Encoding}]),
	    NewState = answer_request(Request, 
				      httpc_response:error(Request, 
							   unknown_encoding),
				      State),
	    {stop, normal, NewState};
        _ ->
	    ?hcrt("handle_http_body - other", []),
            Length =
                list_to_integer(Headers#http_response_h.'content-length'),
            case ((Length =< MaxBodySize) orelse (MaxBodySize =:= nolimit)) of
                true ->
                    case httpc_response:whole_body(Body, Length) of
                        {ok, Body} ->
			    {NewBody, NewRequest} = 
				stream(Body, Request, Code),
			    handle_response(State#state{body    = NewBody,
							request = NewRequest});
                        MFA ->
			    NewState = next_body_chunk(State),   
			    {noreply, NewState#state{mfa = MFA}}
		    end;
                false ->
		    NewState = 
			answer_request(Request,
				       httpc_response:error(Request, 
							    body_too_big),
				       State),
                    {stop, normal, NewState}
            end
    end.

handle_response(#state{status = new} = State) ->
    ?hcrd("handle response - status = new", []),
    handle_response(try_to_enable_pipeline_or_keep_alive(State));

handle_response(#state{request      = Request,
		       status       = Status,
		       session      = Session, 
		       status_line  = StatusLine,
		       headers      = Headers, 
		       body         = Body,
		       options      = Options,
		       profile_name = ProfileName} = State) 
  when Status =/= new ->
    
    ?hcrd("handle response", [{profile,     ProfileName},
			      {status,      Status},
			      {request,     Request},
			      {session,     Session}, 
			      {status_line, StatusLine}]),

    handle_cookies(Headers, Request, Options, ProfileName),
    case httpc_response:result({StatusLine, Headers, Body}, Request) of
	%% 100-continue
	continue -> 
	    ?hcrd("handle response - continue", []),
	    %% Send request body
	    {_, RequestBody} = Request#request.content,
	    http_transport:send(socket_type(Session#tcp_session.scheme), 
					    Session#tcp_session.socket, 
				RequestBody),
	    %% Wait for next response
	    http_transport:setopts(socket_type(Session#tcp_session.scheme), 
				   Session#tcp_session.socket, 
				   [{active, once}]),
	    Relaxed = (Request#request.settings)#http_options.relaxed,
	    MFA = {httpc_response, parse,
		   [State#state.max_header_size, Relaxed]}, 
	    {noreply, State#state{mfa         = MFA,
				  status_line = undefined,
				  headers     = undefined,
				  body        = undefined}};

	%% Ignore unexpected 100-continue response and receive the
	%% actual response that the server will send right away. 
	{ignore, Data} -> 
	    ?hcrd("handle response - ignore", [{data, Data}]),
	    Relaxed = (Request#request.settings)#http_options.relaxed,
	    MFA     = {httpc_response, parse,
		       [State#state.max_header_size, Relaxed]}, 
	    NewState = State#state{mfa         = MFA, 
				   status_line = undefined,
				   headers     = undefined,
				   body        = undefined},
	    handle_info({httpc_handler, dummy, Data}, NewState);

	%% On a redirect or retry the current request becomes 
	%% obsolete and the manager will create a new request 
	%% with the same id as the current.
	{redirect, NewRequest, Data} ->
	    ?hcrt("handle response - redirect", 
		  [{new_request, NewRequest}, {data, Data}]), 
	    ok = httpc_manager:redirect_request(NewRequest, ProfileName),
	    handle_queue(State#state{request = undefined}, Data);
	{retry, TimeNewRequest, Data} ->
	    ?hcrt("handle response - retry", 
		  [{time_new_request, TimeNewRequest}, {data, Data}]), 
	    ok = httpc_manager:retry_request(TimeNewRequest, ProfileName),
	    handle_queue(State#state{request = undefined}, Data);
	{ok, Msg, Data} ->
	    ?hcrd("handle response - ok", [{msg, Msg}, {data, Data}]),
	    end_stream(StatusLine, Request),
	    NewState = answer_request(Request, Msg, State),
	    handle_queue(NewState, Data); 
	{stop, Msg} ->
	    ?hcrd("handle response - stop", [{msg, Msg}]),
	    end_stream(StatusLine, Request),
	    NewState = answer_request(Request, Msg, State),
	    {stop, normal, NewState}
    end.

handle_cookies(_,_, #options{cookies = disabled}, _) ->
    ok;
%% User wants to verify the cookies before they are stored,
%% so the user will have to call a store command.
handle_cookies(_,_, #options{cookies = verify}, _) ->
    ok;
handle_cookies(Headers, Request, #options{cookies = enabled}, ProfileName) ->
    {Host, _ } = Request#request.address,
    Cookies = httpc_cookie:cookies(Headers#http_response_h.other, 
				  Request#request.path, Host),
    httpc_manager:store_cookies(Cookies, Request#request.address,
				ProfileName).

%% This request could not be pipelined or used as sequential keept alive
%% queue
handle_queue(#state{status = close} = State, _) ->
    {stop, normal, State};

handle_queue(#state{status = keep_alive} = State, Data) ->
    handle_keep_alive_queue(State, Data);

handle_queue(#state{status = pipeline} = State, Data) ->
    handle_pipeline(State, Data).

handle_pipeline(#state{status       = pipeline, 
		       session      = Session,
		       profile_name = ProfileName,
		       options      = #options{pipeline_timeout = TimeOut}} = 
		State, 
		Data) ->

    ?hcrd("handle pipeline", [{profile, ProfileName}, 
			      {session, Session},
			      {timeout, TimeOut}]),

    case queue:out(State#state.pipeline) of
	{empty, _} ->
	    ?hcrd("epmty pipeline queue", []),
	    
	    %% The server may choose too teminate an idle pipeline
	    %% in this case we want to receive the close message
	    %% at once and not when trying to pipeline the next
	    %% request.
	    activate_once(Session), 

	    %% If a pipeline that has been idle for some time is not
	    %% closed by the server, the client may want to close it.
	    NewState   = activate_queue_timeout(TimeOut, State),
	    NewSession = Session#tcp_session{queue_length = 0},
	    httpc_manager:insert_session(NewSession, ProfileName),
	    %% Note mfa will be initilized when a new request 
	    %% arrives.
	    {noreply, 
	     NewState#state{request     = undefined, 
			    mfa         = undefined,
			    status_line = undefined,
			    headers     = undefined,
			    body        = undefined}};
	{{value, NextRequest}, Pipeline} ->    
	    case lists:member(NextRequest#request.id, 
			      State#state.canceled) of		
		true ->
		    ?hcrv("next request had been cancelled", []),
		    %% See comment for handle_cast({cancel, RequestId})
		    {stop, normal, 
		     State#state{request = 
				 NextRequest#request{from = answer_sent}}};
		false ->
		    ?hcrv("next request", [{request, NextRequest}]),
		    NewSession = 
			Session#tcp_session{queue_length =
					    %% Queue + current
					    queue:len(Pipeline) + 1},
		    httpc_manager:insert_session(NewSession, ProfileName),
		    Relaxed = 
			(NextRequest#request.settings)#http_options.relaxed,
		    MFA = {httpc_response, 
			   parse,
			   [State#state.max_header_size, Relaxed]}, 
		    NewState = 
			State#state{pipeline    = Pipeline,
				    request     = NextRequest,
				    mfa         = MFA,
				    status_line = undefined,
				    headers     = undefined,
				    body        = undefined},
		    case Data of
			<<>> ->
			    http_transport:setopts(
			      socket_type(Session#tcp_session.scheme), 
			      Session#tcp_session.socket, 
			      [{active, once}]),
			    {noreply, NewState};
			_ ->
			    %% If we already received some bytes of
			    %% the next response
			    handle_info({httpc_handler, dummy, Data}, 
					NewState) 
		    end
	    end
    end.

handle_keep_alive_queue(
  #state{status       = keep_alive,
	 session      = Session,
	 profile_name = ProfileName,
	 options      = #options{keep_alive_timeout = TimeOut}} = State, 
  Data) ->

    ?hcrd("handle keep_alive", [{profile, ProfileName}, 
				{session, Session},
				{timeout, TimeOut}]),

    case queue:out(State#state.keep_alive) of
	{empty, _} ->
	    ?hcrd("epmty keep_alive queue", []),
	    %% The server may choose too terminate an idle keep_alive session
	    %% in this case we want to receive the close message
	    %% at once and not when trying to send the next
	    %% request.
	    http_transport:setopts(socket_type(Session#tcp_session.scheme), 
				   Session#tcp_session.socket, 
				   [{active, once}]),
	    %% If a keep_alive session has been idle for some time is not
	    %% closed by the server, the client may want to close it.
	    NewState = activate_queue_timeout(TimeOut, State),
	    NewSession = Session#tcp_session{queue_length = 0},
	    httpc_manager:insert_session(NewSession, ProfileName),
	    %% Note mfa will be initilized when a new request 
	    %% arrives.
	    {noreply, 
	     NewState#state{request = undefined, 
			    mfa = undefined,
			    status_line = undefined,
			    headers = undefined,
			    body = undefined
			   } 
	    };
	{{value, NextRequest}, KeepAlive} ->    
	    case lists:member(NextRequest#request.id, 
			      State#state.canceled) of		
		true ->
		    ?hcrv("next request has already been canceled", []),
		    handle_keep_alive_queue(
		      State#state{keep_alive = KeepAlive}, Data);
		false ->
		    ?hcrv("next request", [{request, NextRequest}]),
		    Relaxed = 
			(NextRequest#request.settings)#http_options.relaxed,
		    MFA     = {httpc_response, parse, 
			       [State#state.max_header_size, Relaxed]}, 
		    NewState =
			State#state{request     = NextRequest,
				    keep_alive  = KeepAlive,
				    mfa         = MFA,
				    status_line = undefined,
				    headers     = undefined,
				    body        = undefined},
		    case Data of
			<<>> ->
			    activate_once(Session), 
			    {noreply, NewState};
			_ ->
			    %% If we already received some bytes of
			    %% the next response
			    handle_info({httpc_handler, dummy, Data}, 
					NewState) 
		    end
	    end
    end.


case_insensitive_header(Str) when is_list(Str) ->
    http_util:to_lower(Str);
%% Might be undefined if server does not send such a header 
case_insensitive_header(Str) ->
    Str.

activate_once(#tcp_session{scheme = Scheme, socket = Socket}) ->
    SocketType = socket_type(Scheme),
    http_transport:setopts(SocketType, Socket, [{active, once}]).

activate_request_timeout(
  #state{request = #request{timer = undefined} = Request} = State) ->
    Timeout = (Request#request.settings)#http_options.timeout,
    case Timeout of
	infinity ->
	    State;
	_ ->
	    ReqId = Request#request.id, 
	    ?hcrt("activate request timer", 
		  [{request_id,    ReqId}, 
		   {time_consumed, t() - Request#request.started},
		   {timeout,       Timeout}]),
	    Msg       = {timeout, ReqId}, 
	    Ref       = erlang:send_after(Timeout, self(), Msg), 
	    Request2  = Request#request{timer = Ref}, 
	    ReqTimers = [{Request#request.id, Ref} |
			 (State#state.timers)#timers.request_timers],
	    Timers    = #timers{request_timers = ReqTimers}, 
	    State#state{request = Request2, timers = Timers}
    end;

%% Timer is already running! This is the case for a redirect or retry
activate_request_timeout(State) ->
    State.

activate_queue_timeout(infinity, State) ->
    State;
activate_queue_timeout(Time, State) ->
    Ref = erlang:send_after(Time, self(), timeout_queue),
    State#state{timers = #timers{queue_timer = Ref}}.


is_pipeline_enabled_client(#tcp_session{type = pipeline}) ->
    true;
is_pipeline_enabled_client(_) ->
    false.

is_keep_alive_enabled_server("HTTP/1." ++ N, _) when (hd(N) >= $1) ->
    true;
is_keep_alive_enabled_server("HTTP/1.0", 
			     #http_response_h{connection = "keep-alive"}) ->
    true;
is_keep_alive_enabled_server(_,_) ->
    false.

is_keep_alive_connection(Headers, Session) ->
    (not ((Session#tcp_session.client_close) or  
	  httpc_response:is_server_closing(Headers))).

try_to_enable_pipeline_or_keep_alive(
  #state{session      = Session, 
	 request      = #request{method = Method},
	 status_line  = {Version, _, _},
	 headers      = Headers,
	 profile_name = ProfileName} = State) ->
    case (is_keep_alive_enabled_server(Version, Headers) andalso 
	  is_keep_alive_connection(Headers, Session)) of
	true ->
	    case (is_pipeline_enabled_client(Session) andalso 
		  httpc_request:is_idempotent(Method)) of
		true ->
		    httpc_manager:insert_session(Session, ProfileName),
		    State#state{status = pipeline};
		false ->
		    httpc_manager:insert_session(Session, ProfileName),
		    %% Make sure type is keep_alive in session
		    %% as it in this case might be pipeline
		    NewSession = Session#tcp_session{type = keep_alive}, 
		    State#state{status  = keep_alive,
				session = NewSession}
	    end;
	false ->
	    State#state{status = close}
    end.

answer_request(Request, Msg, #state{timers = Timers} = State) -> 
    ?hcrt("answer request", [{request, Request}, {msg, Msg}]),
    httpc_response:send(Request#request.from, Msg),
    RequestTimers = Timers#timers.request_timers,
    TimerRef =
	proplists:get_value(Request#request.id, RequestTimers, undefined),
    Timer = {Request#request.id, TimerRef},
    cancel_timer(TimerRef, {timeout, Request#request.id}),
    State#state{request = Request#request{from = answer_sent},
		timers = 
		Timers#timers{request_timers =
			      lists:delete(Timer, RequestTimers)}}.
cancel_timer(undefined, _) ->
    ok;
cancel_timer(Timer, TimeoutMsg) ->
    erlang:cancel_timer(Timer),
    receive 
	TimeoutMsg ->
	    ok
    after 0 ->
	    ok
    end.

retry_pipeline([], _) ->
    ok;

%% Skip requests when the answer has already been sent
retry_pipeline([#request{from = answer_sent}|PipeLine], State) ->
    retry_pipeline(PipeLine, State);

retry_pipeline([Request | PipeLine],  
	      #state{timers       = Timers, 
		     profile_name = ProfileName} = State) ->
    NewState =
	case (catch httpc_manager:retry_request(Request, ProfileName)) of
	    ok ->
		RequestTimers = Timers#timers.request_timers,
		ReqId    = Request#request.id, 
		TimerRef =
		    proplists:get_value(ReqId, RequestTimers, undefined),
		cancel_timer(TimerRef, {timeout, ReqId}),
		NewReqsTimers = lists:delete({ReqId, TimerRef}, RequestTimers),
		NewTimers     = Timers#timers{request_timers = NewReqsTimers},
		State#state{timers = NewTimers};

	    Error ->
		answer_request(Request#request.from,
			       httpc_response:error(Request, Error), State) 
	end,
    retry_pipeline(PipeLine, NewState).

%%% Check to see if the given {Host,Port} tuple is in the NoProxyList
%%% Returns an eventually updated {Host,Port} tuple, with the proxy address
handle_proxy(HostPort = {Host, _Port}, {Proxy, NoProxy}) ->
    case Proxy of
	undefined ->
	    HostPort;
	Proxy ->
	    case is_no_proxy_dest(Host, NoProxy) of
		true ->
		    HostPort;
		false ->
		    Proxy
	    end
    end.

is_no_proxy_dest(_, []) ->
    false;
is_no_proxy_dest(Host, [ "*." ++ NoProxyDomain | NoProxyDests]) ->    
    
    case is_no_proxy_dest_domain(Host, NoProxyDomain) of
	true ->
	    true;
	false ->
	    is_no_proxy_dest(Host, NoProxyDests)
    end;

is_no_proxy_dest(Host, [NoProxyDest | NoProxyDests]) ->
    IsNoProxyDest = case http_util:is_hostname(NoProxyDest) of
			true ->
			    fun is_no_proxy_host_name/2;
			false ->
			    fun is_no_proxy_dest_address/2
		    end,
    
    case IsNoProxyDest(Host, NoProxyDest) of
	true ->
	    true;
	false ->
	    is_no_proxy_dest(Host, NoProxyDests)
    end.

is_no_proxy_host_name(Host, Host) ->
    true;
is_no_proxy_host_name(_,_) ->
    false.

is_no_proxy_dest_domain(Dest, DomainPart) ->
    lists:suffix(DomainPart, Dest).

is_no_proxy_dest_address(Dest, Dest) ->
    true;
is_no_proxy_dest_address(Dest, AddressPart) ->
    lists:prefix(AddressPart, Dest).

init_mfa(#request{settings = Settings}, State) ->
    case Settings#http_options.version of
	"HTTP/0.9" ->
	    {httpc_response, whole_body, [<<>>, -1]};
	_ ->
	    Relaxed = Settings#http_options.relaxed,
	    {httpc_response, parse, [State#state.max_header_size, Relaxed]}
    end.

init_status_line(#request{settings = Settings}) ->
    case Settings#http_options.version of
	"HTTP/0.9" ->
	    {"HTTP/0.9", 200, "OK"};
	_ ->
	    undefined
    end.

socket_type(#request{scheme = http}) ->
    ip_comm;
socket_type(#request{scheme = https, settings = Settings}) ->
    {ssl, Settings#http_options.ssl};
socket_type(http) ->
    ip_comm;
socket_type(https) ->
    {ssl, []}. %% Dummy value ok for ex setops that does not use this value

start_stream({_Version, _Code, _ReasonPhrase}, _Headers, 
	     #request{stream = none} = Request) ->
    ?hcrt("start stream - none", []), 
    {ok, Request};
start_stream({_Version, Code, _ReasonPhrase}, Headers, 
	     #request{stream = self} = Request) 
  when (Code =:= 200) orelse (Code =:= 206) ->
    ?hcrt("start stream - self", [{code, Code}]), 
    Msg = httpc_response:stream_start(Headers, Request, ignore),
    httpc_response:send(Request#request.from, Msg),
    {ok, Request};
start_stream({_Version, Code, _ReasonPhrase}, Headers, 
	     #request{stream = {self, once}} = Request) 
  when (Code =:= 200) orelse (Code =:= 206) ->
    ?hcrt("start stream - self:once", [{code, Code}]), 
    Msg = httpc_response:stream_start(Headers, Request, self()),
    httpc_response:send(Request#request.from, Msg),
    {ok, Request};    
start_stream({_Version, Code, _ReasonPhrase}, _Headers, 
	     #request{stream = Filename} = Request)
  when ((Code =:= 200) orelse (Code =:= 206)) andalso is_list(Filename) ->
    ?hcrt("start stream", [{code, Code}, {filename, Filename}]),
    case file:open(Filename, [write, raw, append, delayed_write]) of
        {ok, Fd} ->
            ?hcri("start stream - file open ok", [{fd, Fd}]),
            {ok, Request#request{stream = Fd}};
        {error, Reason} ->
            exit({stream_to_file_failed, Reason})
    end;
start_stream(_StatusLine, _Headers, Request) ->
    ?hcrt("start stream - no op", []),
    {ok, Request}.


%% Note the end stream message is handled by httpc_response and will
%% be sent by answer_request
end_stream(_, #request{stream = none}) ->
    ?hcrt("end stream - none", []), 
    ok;
end_stream(_, #request{stream = self}) ->
    ?hcrt("end stream - self", []), 
    ok;
end_stream(_, #request{stream = {self, once}}) ->
    ?hcrt("end stream - self:once", []), 
    ok;
end_stream({_,200,_}, #request{stream = Fd}) ->
    ?hcrt("end stream - 200", [{stream, Fd}]), 
    case file:close(Fd) of 
	ok ->
	    ok;
	{error, enospc} -> % Could be due to delayed_write
	    file:close(Fd)
    end;
end_stream({_,206,_}, #request{stream = Fd}) ->
    ?hcrt("end stream - 206", [{stream, Fd}]), 
    case file:close(Fd) of
       ok ->
           ok;
       {error, enospc} -> % Could be due to delayed_write
           file:close(Fd)
    end;
end_stream(SL, R) ->
    ?hcrt("end stream", [{status_line, SL}, {request, R}]), 
    ok.


next_body_chunk(#state{request = #request{stream = {self, once}}, 
		       once = once, session = Session} = State) ->
    http_transport:setopts(socket_type(Session#tcp_session.scheme), 
			   Session#tcp_session.socket, 
			   [{active, once}]),
    State#state{once = inactive};
next_body_chunk(#state{request = #request{stream = {self, once}}, 
		       once = inactive} = State) ->
    State; %% Wait for user to call stream_next
next_body_chunk(#state{session = Session} = State) ->
    http_transport:setopts(socket_type(Session#tcp_session.scheme), 
			   Session#tcp_session.socket, 
			   [{active, once}]),
    State.

handle_verbose(verbose) ->
    dbg:p(self(), [r]);
handle_verbose(debug) ->
    dbg:p(self(), [call]),
    dbg:tp(?MODULE, [{'_', [], [{return_trace}]}]);
handle_verbose(trace) ->
    dbg:p(self(), [call]),
    dbg:tpl(?MODULE, [{'_', [], [{return_trace}]}]);
handle_verbose(_) ->
    ok.    

%%% Normaly I do not comment out code, I throw it away. But this might
%%% actually be used one day if ssl is improved.
%% send_ssl_tunnel_request(Address, Request = #request{address = {Host, Port}}, 
%% 			State) ->
%%     %% A ssl tunnel request is a special http request that looks like
%%     %% CONNECT host:port HTTP/1.1
%%     SslTunnelRequest = #request{method = connect, scheme = http,
%% 				headers = 
%% 				#http_request_h{
%% 				  host = Host, 
%% 				  address = Address, 
%% 				  path = Host ++ ":",
%% 				  pquery = integer_to_list(Port),
%% 				  other = [{ "Proxy-Connection", "keep-alive"}]},
%%     Ipv6 = (State#state.options)#options.ipv6,
%%     SocketType = socket_type(SslTunnelRequest),
%%     case http_transport:connect(SocketType, 
%%                                 SslTunnelRequest#request.address, Ipv6) of
%% 	{ok, Socket} ->
%% 	    case httpc_request:send(Address, SslTunnelRequest, Socket) of
%% 		ok ->
%% 		    Session = #tcp_session{id = 
%% 					   {SslTunnelRequest#request.address,
%% 					    self()},
%% 					   scheme = 
%% 					   SslTunnelRequest#request.scheme,
%% 					   socket = Socket},
%% 		    NewState = State#state{mfa = 
%% 					   {httpc_response, parse,
%% 					    [State#state.max_header_size]},
%% 					   request = Request,
%% 					   session = Session},
%% 		    http_transport:setopts(socket_type(
%%                                           SslTunnelRequest#request.scheme), 
%% 					   Socket, 
%% 					   [{active, once}]),
%% 		    {ok, NewState};
%% 		{error, Reason} -> 
%% 		    self() ! {init_error, error_sending, 
%% 			      httpc_response:error(Request, Reason)},
%% 		    {ok, State#state{request = Request,
%% 				     session = #tcp_session{socket = 
%% 							    Socket}}}
%% 	    end;
%% 	{error, Reason} ->
%% 	    self() ! {init_error, error_connecting, 
%% 		      httpc_response:error(Request, Reason)},
%% 	    {ok, State#state{request = Request}}
%%     end.

%% d(F) ->
%%    d(F, []).

%% d(F, A) -> 
%%     d(get(dbg), F, A).

%% d(true, F, A) ->
%%     io:format(user, "~w:~w:" ++ F ++ "~n", [self(), ?MODULE | A]);
%% d(_, _, _) ->
%%     ok.


call(Msg, Pid) ->
    Timeout = infinity,
    call(Msg, Pid, Timeout).
call(Msg, Pid, Timeout) ->
    gen_server:call(Pid, Msg, Timeout).

cast(Msg, Pid) ->
    gen_server:cast(Pid, Msg).


%% to(To, Start) when is_integer(Start) andalso (Start >= 0) ->
%%     http_util:timeout(To, Start);
%% to(To, _Start) ->
%%     http_util:timeout(To, t()).

t() ->
    http_util:timestamp().