aboutsummaryrefslogblamecommitdiffstats
path: root/lib/dialyzer/test/r9c_SUITE_data/src/inets/httpc_manager.erl
blob: 29659ce1cea1cdfa5e741ffc636c59cacf624b3c (plain) (tree)
1
2
3
4
5
6
7
8
9
10




                                                                        
  



                                                                         
  


                                                                     
  





















































































                                                                            
 



                                     
 










































                                                                              
                                                                

                                                                                                  
                                                                      

































                                                                          
                        

























































































































































                                                                                                    
         
























































































































                                                                                
                         




























































                                                                                   
                    













                                                                                    
%% ``The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved via the world wide web at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% The Initial Developer of the Original Code is Mobile Arts AB
%% Portions created by Mobile Arts are Copyright 2002, Mobile Arts AB
%% All Rights Reserved.''
%%
%%
%% Created : 18 Dec 2001 by Johan Blom <[email protected]>
%%

-module(httpc_manager).

-behaviour(gen_server).

-include("http.hrl").

-define(HMACALL, ?MODULE).
-define(HMANAME, ?MODULE).

%%--------------------------------------------------------------------
%% External exports
-export([start_link/0,start/0,
	 request/1,cancel_request/1,
	 next_request/2,
	 register_socket/3,
	 abort_session/3,close_session/2,close_session/3
	]).

%% Debugging only
-export([status/0]).

%% gen_server callbacks
-export([init/1,handle_call/3,handle_cast/2,handle_info/2,terminate/2,
	 code_change/3]).

%%% address_db - ets() Contains mappings from a tuple {Host,Port} to a tuple
%%%  {LastSID,OpenSessions,ets()} where
%%%      LastSid is the last allocated session id,
%%%      OpenSessions is the number of currently open sessions and
%%%      ets() contains mappings from Session Id to #session{}.
%%%
%%% Note:
%%% - Only persistent connections are stored in address_db
%%% - When automatically redirecting, multiple requests are performed.
-record(state,{
	  address_db,   % ets()
	  reqid         % int() Next Request id to use (identifies request).
	 }).

%%====================================================================
%% External functions
%%====================================================================
%%--------------------------------------------------------------------
%% Function: start_link/0
%% Description: Starts the server
%%--------------------------------------------------------------------
%% Makes sure the manager is running: delegates to ensure_started/0 and
%% returns whatever it returns.
start() ->
    Result=ensure_started(),
    Result.

%% Starts the manager as a gen_server linked to the caller and registers it
%% locally under ?HMACALL, using ?HMANAME as the callback module.
%% init/1 receives [] and no gen_server options are passed.
start_link() ->
    ServerName={local,?HMACALL},
    InitArgs=[],
    Options=[],
    gen_server:start_link(ServerName, ?HMANAME, InitArgs, Options).


%% Find available session process and store in address_db. If no
%% available, start new handler process.
%% Hands the request Req over to the manager (starting it first if needed).
%% The tag computed by http_lib:connection_close/1 from the request headers
%% selects between the persistent ({request,false,_}) and non-persistent
%% ({request,true,_}) handle_call clauses. Blocks without timeout.
request(Req) ->
    ensure_started(),
    Msg={request,http_lib:connection_close(Req#request.headers),Req},
    gen_server:call(?HMACALL,Msg,infinity).

%% Asks the manager to cancel the request identified by ReqId.
%% Sends the 2-tuple {cancel_request,ReqId} and waits without timeout.
cancel_request(ReqId) ->
    Msg={cancel_request,ReqId},
    gen_server:call(?HMACALL,Msg,infinity).


%%% Close Session
%% Closes session Sid registered under address Addr; any requests still in
%% its pipeline are aborted by the manager. Blocks without timeout.
close_session(Addr,Sid) ->
    Msg={close_session,Addr,Sid},
    gen_server:call(?HMACALL,Msg,infinity).
%% Same as close_session/2, but Req is aborted together with the requests
%% still queued on the session. Blocks without timeout.
close_session(Req,Addr,Sid) ->
    Msg={close_session,Req,Addr,Sid},
    gen_server:call(?HMACALL,Msg,infinity).

%% Aborts session Sid under address Addr; every queued request is answered
%% with {error,Msg} by the manager. Blocks without timeout.
abort_session(Addr,Sid,Msg) ->
    Call={abort_session,Addr,Sid,Msg},
    gen_server:call(?HMACALL,Call,infinity).


%%%  Pick next in request que
%% Fetches the next pending request for session Sid under address Addr.
%% The manager answers with the request, no_more_requests, or an
%% {error,Reason} tuple. Blocks without timeout.
next_request(Addr,Sid) ->
    Msg={next_request,Addr,Sid},
    gen_server:call(?HMACALL,Msg,infinity).

%%% Session handler has succeeded in setting up a new session, now register
%%% the socket
%% Asynchronously tells the manager which Socket belongs to session Sid
%% under address Addr. Fire-and-forget: always returns ok.
register_socket(Addr,Sid,Socket) ->
    Msg={register_socket,Addr,Sid,Socket},
    gen_server:cast(?HMACALL,Msg).


%%% Debugging
%% Debug aid: asks the manager (asynchronously) to print its address and
%% session tables with io:format. Always returns ok.
status() ->
    Msg=status,
    gen_server:cast(?HMACALL,Msg).


%%--------------------------------------------------------------------
%% Function: init/1
%% Description: Initiates the server
%% Returns: {ok, State}          |
%%          {ok, State, Timeout} |
%%          ignore               |
%%          {stop, Reason}
%%--------------------------------------------------------------------
%% gen_server callback. Traps exits (handler processes are spawn_link'ed to
%% the manager, so their exits arrive as messages in handle_info/2), creates
%% the private address_db table and starts request numbering at 0.
init([]) ->
    process_flag(trap_exit, true),
    AddressDb=ets:new(address_db,[private]),
    InitialState=#state{reqid=0,address_db=AddressDb},
    {ok,InitialState}.


%%--------------------------------------------------------------------
%% Function: handle_call/3
%% Description: Handling call messages
%% Returns: {reply, Reply, State}          |
%%          {reply, Reply, State, Timeout} |
%%          {noreply, State}               |
%%          {noreply, State, Timeout}      |
%%          {stop, Reason, Reply, State}   | (terminate/2 is called)
%%          {stop, Reason, State}            (terminate/2 is called)
%%--------------------------------------------------------------------
%%% Note:
%%% - We may have multiple non-persistent connections, each will be handled in
%%%   separate processes, thus don't add such connections to address_db
%%% Synchronous requests understood by the manager:
%%%   {request,false,Req}          - persistent connection: reuse a registered
%%%                                  session for Req#request.address or set up
%%%                                  a new one (bounded by max_sessions).
%%%   {request,true,Req}           - client asked for "close": always use a
%%%                                  fresh session not stored in address_db.
%%%   {cancel_request,ReqId}       - currently a no-op that replies ok.
%%%   {next_request,Addr,Sid}      - pop the oldest queued request.
%%%   {close_session,Addr,Sid}     - remove the session, abort queued requests.
%%%   {close_session,Req,Addr,Sid} - as above, additionally aborting Req.
%%%   {abort_session,Addr,Sid,Msg} - remove the session, answering every
%%%                                  queued request with {error,Msg}.
%%% An address_db entry has the shape {Addr,{LastSid,OpenSessions,STab}}.
handle_call({request,false,Req},_From,State) ->
    case ets:lookup(State#state.address_db,Req#request.address) of
        [] ->
            %% First session towards this address: needs its own session
            %% table, keyed on the session id (record position 2).
            STab=ets:new(session_db,[private,{keypos,2},set]),
            case persistent_new_session_request(0,Req,STab,State) of
                {Reply,LastSid,State2} ->
                    ets:insert(State2#state.address_db,
                               {Req#request.address,{LastSid,1,STab}}),
                    {reply,Reply,State2};
                {ErrorReply,State2} ->
                    %% The table was never registered in address_db, so
                    %% nothing else can reach it; delete it rather than
                    %% leaking it for the lifetime of the manager.
                    ets:delete(STab),
                    {reply,ErrorReply,State2}
            end;
        [{_,{LastSid,OpenS,STab}}] ->
            case lookup_session_entry(STab) of
                {ok,Session} ->
                    old_session_request(Session,Req,STab,State);
                need_new_session when OpenS<(Req#request.settings)#client_settings.max_sessions ->
                    case persistent_new_session_request(LastSid,Req,
                                                        STab,State) of
                        {Reply,LastSid2,State2} ->
                            ets:insert(State2#state.address_db,
                                       {Req#request.address,
                                        {LastSid2,OpenS+1,STab}}),
                            {reply,Reply,State2};
                        {ErrorReply,State2} ->
                            {reply,ErrorReply,State2}
                    end;
                need_new_session ->
                    {reply,{error,too_many_sessions},State}
            end
    end;
handle_call({request,true,Req},_From,State) ->
    {Reply,State2}=not_persistent_new_session_request(Req,State),
    {reply,Reply,State2};
%% cancel_request/1 sends the 2-tuple {cancel_request,ReqId}; the clause must
%% match that shape, otherwise every cancel crashes the manager with
%% function_clause.
handle_call({cancel_request,_ReqId},_From,State) ->
%% FIXME Should be possible to scan through all requests made, but perhaps
%% better to give some more hints (such as Addr etc)
    Reply=ok,
    {reply,Reply,State};
handle_call({next_request,Addr,Sid},_From,State) ->
    case ets:lookup(State#state.address_db,Addr) of
        [] ->
            {reply,{error,no_connection},State};
        [{_,{_,_,STab}}] ->
            case ets:lookup(STab,Sid) of
                [] ->
                    {reply,{error,session_not_registered},State};
                [S=#session{pipeline=[],quelength=QueLen}] ->
                    %% Nothing queued: a quelength of 1 is reset to 0 so the
                    %% session shows up as idle.
                    if
                        QueLen==1 ->
                            ets:insert(STab,S#session{quelength=0});
                        true ->
                            ok
                    end,
                    {reply,no_more_requests,State};
                [S=#session{pipeline=Que}] ->
                    %% New requests are consed onto the head of the pipeline,
                    %% so the oldest one is the last element.
                    [Req|RevQue]=lists:reverse(Que),
                    ets:insert(STab,S#session{pipeline=lists:reverse(RevQue),
                                              quelength=S#session.quelength-1}),
                    {reply,Req,State}
            end
    end;
handle_call({close_session,Addr,Sid},_From,State) ->
    case ets:lookup(State#state.address_db,Addr) of
        [] ->
            {reply,{error,no_connection},State};
        [{_,{LastSid,OpenS,STab}}] ->
            case ets:lookup(STab,Sid) of
                [#session{pipeline=Que}] ->
                    R=handle_close_session(lists:reverse(Que),STab,Sid,State),
                    ets:insert(State#state.address_db,
                               {Addr,{LastSid,OpenS-1,STab}}),
                    {reply,R,State};
                [] ->
                    {reply,{error,session_not_registered},State}
            end
    end;
handle_call({close_session,Req,Addr,Sid},_From,State) ->
    case ets:lookup(State#state.address_db,Addr) of
        [] ->
            {reply,{error,no_connection},State};
        [{_,{LastSid,OpenS,STab}}] ->
            case ets:lookup(STab,Sid) of
                [#session{pipeline=Que}] ->
                    R=handle_close_session([Req|lists:reverse(Que)],
                                           STab,Sid,State),
                    ets:insert(State#state.address_db,
                               {Addr,{LastSid,OpenS-1,STab}}),
                    {reply,R,State};
                [] ->
                    {reply,{error,session_not_registered},State}
            end
    end;
handle_call({abort_session,Addr,Sid,Msg},_From,State) ->
    case ets:lookup(State#state.address_db,Addr) of
        [] ->
            {reply,{error,no_connection},State};
        [{_,{LastSid,OpenS,STab}}] ->
            case ets:lookup(STab,Sid) of
                [#session{pipeline=Que}] ->
                    R=abort_request_que(Que,{error,Msg}),
                    ets:delete(STab,Sid),
                    ets:insert(State#state.address_db,
                               {Addr,{LastSid,OpenS-1,STab}}),
                    {reply,R,State};
                [] ->
                    {reply,{error,session_not_registered},State}
            end
    end.


%%--------------------------------------------------------------------
%% Function: handle_cast/2
%% Description: Handling cast messages
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%--------------------------------------------------------------------
%% Asynchronous requests:
%%   status                            - dump all tables to stdout (debugging)
%%   {register_socket,Addr,Sid,Socket} - store Socket in the session record;
%%                                       silently ignored when the address or
%%                                       session is not registered.
%% The state is never modified; all clauses return {noreply,State}.
handle_cast(status, State) ->
    io:format("Status:~n"),
    Entries=ets:tab2list(State#state.address_db),
    print_all(lists:sort(Entries)),
    {noreply, State};
handle_cast({register_socket,Addr,Sid,Socket},State) ->
    case ets:lookup(State#state.address_db,Addr) of
        [{_,{_,_,STab}}] ->
            case ets:lookup(STab,Sid) of
                [Session] ->
                    ets:insert(STab,Session#session{socket=Socket});
                [] ->
                    ok
            end;
        [] ->
            ok
    end,
    {noreply,State}.

%% Prints one line per address_db entry followed by that address' sessions
%% (see print_all2/1) in ascending session id order. Returns ok.
print_all([{Addr,{LastSid,OpenSessions,STab}}|Rest]) ->
    io:format(" Address:~p LastSid=~p OpenSessions=~p~n",[Addr,LastSid,OpenSessions]),
    %% A guard that fails (for whatever reason) counts as "not smaller".
    ById=fun(A,B) when A#session.id<B#session.id -> true;
            (_,_) -> false
         end,
    Sessions=ets:tab2list(STab),
    print_all2(lists:sort(ById,Sessions)),
    print_all(Rest);
print_all([]) ->
    ok.

%% Prints id, client-close flag, socket and pipeline of every session in
%% the list, in list order. Returns ok.
print_all2(Sessions) ->
    lists:foreach(
      fun(Session) ->
              io:format("   Session:~p~n",[Session#session.id]),
              io:format("     Client close:~p~n",[Session#session.clientclose]),
              io:format("     Socket:~p~n",[Session#session.socket]),
              io:format("     Pipe: length=~p Que=~p~n",
                        [Session#session.quelength,Session#session.pipeline])
      end,
      Sessions).

%%--------------------------------------------------------------------
%% Function: handle_info/2
%% Description: Handling all non call/cast messages
%% Returns: {noreply, State}          |
%%          {noreply, State, Timeout} |
%%          {stop, Reason, State}            (terminate/2 is called)
%%--------------------------------------------------------------------
%% Handles raw messages. Normal exits of linked processes are dropped
%% silently (exits are trapped, see init/1); anything else is reported on
%% stdout. The state is left untouched in both cases.
handle_info({'EXIT',_Linked,normal}, State) ->
    {noreply, State};
handle_info(Unexpected, State) ->
    io:format("ERROR httpc_manager:handle_info ~p~n",[Unexpected]),
    {noreply, State}.

%%--------------------------------------------------------------------
%% Function: terminate/2
%% Description: Shutdown the server
%% Returns: any (ignored by gen_server)
%%--------------------------------------------------------------------
%% gen_server callback: deletes the address_db table on shutdown.
%% The return value is that of ets:delete/1.
terminate(_Why, State) ->
    AddressDb=State#state.address_db,
    ets:delete(AddressDb).

%%--------------------------------------------------------------------
%% Func: code_change/3
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState}
%%--------------------------------------------------------------------
%% gen_server callback: no state conversion is needed on code upgrade, so
%% the current state is handed back unchanged.
code_change(_FromVsn, Current, _UpgradeData) ->
    {ok, Current}.

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

%%% From RFC 2616, Section 8.1.4
%%%   A client, server, or proxy MAY close the transport connection at any
%%%   time. For example, a client might have started to send a new request
%%%   at the same time that the server has decided to close the "idle"
%%%   connection. From the server's point of view, the connection is being
%%%   closed while it was idle, but from the client's point of view, a
%%%   request is in progress.
%%%
%%%   This means that clients, servers, and proxies MUST be able to recover
%%%   from asynchronous close events. Client software SHOULD reopen the
%%%   transport connection and retransmit the aborted sequence of requests
%%%   without user interaction so long as the request sequence is
%%%   idempotent (see section 9.1.2). Non-idempotent methods or sequences
%%%
%%% FIXME
%%% Note:
%%% - If this happen (server close because of idle) there can't be any requests
%%%   in the que.
%%% - This is the main function for closing of sessions
%% Removes session Sid from STab and aborts every request in Que with
%% {error,aborted_request}.
%% Returns the result of ets:delete/2 when Que is empty, otherwise the
%% result of abort_request_que/2.
handle_close_session(Que,STab,Sid,_State) ->
    Deleted=ets:delete(STab,Sid),
    case Que of
        [] ->
            Deleted;
        _ ->
            abort_request_que(Que,{error,aborted_request})
    end.


%%% From RFC 2616, Section 8.1.2.2
%%%   Clients which assume persistent connections and pipeline immediately
%%%   after connection establishment SHOULD be prepared to retry their
%%%   connection if the first pipelined attempt fails. If a client does
%%%   such a retry, it MUST NOT pipeline before it knows the connection is
%%%   persistent. Clients MUST also be prepared to resend their requests if
%%%   the server closes the connection before sending all of the
%%%   corresponding responses.
%%% FIXME! I'm currently not checking if this is the first attempt on the session
%%% FIXME! Pipeline size must be dynamically variable (e.g. 0 if resend, 2 else)
%%% The que contains requests that have been sent ok previously, but the session
%%% was closed prematurely when reading the response.
%%% Try setup a new session and resend these requests.
%%% Note:
%%% - This MUST be a persistent session
% handle_closed_pipelined_session_que([],_State) ->
%     ok;
% handle_closed_pipelined_session_que(_Que,_State) ->
%     ok.


%%% From RFC 2616, Section 8.2.4
%%%   If an HTTP/1.1 client sends a request which includes a request body,
%%%   but which does not include an Expect request-header field with the
%%%   "100-continue" expectation, and if the client is not directly
%%%   connected to an HTTP/1.1 origin server, and if the client sees the
%%%   connection close before receiving any status from the server, the
%%%   client SHOULD retry the request.  If the client does retry this
%%%   request, it MAY use the following "binary exponential backoff"
%%%   algorithm to be assured of obtaining a reliable response:
%%%   ...
%%% FIXME! I'm currently not checking if a "Expect: 100-continue" has been sent.
% handle_remotely_closed_session_que([],_State) ->
%     ok;
% handle_remotely_closed_session_que(_Que,_State) ->
% %    resend_que(Que,Socket),
%     ok.

%%% Resend all requests in the request que
% resend_que([],_) ->
%     ok;
% resend_que([Req|Que],Socket) ->
%     case catch httpc_handler:http_request(Req,Socket) of
% 	ok ->
% 	    resend_que(Que,Socket);
% 	{error,Reason} ->
% 	    {error,Reason}
%     end.


%%% From RFC 2616,
%%% Section 8.1.2.2:
%%%   Clients SHOULD NOT pipeline requests using non-idempotent methods or
%%%   non-idempotent sequences of methods (see section 9.1.2). Otherwise, a
%%%   premature termination of the transport connection could lead to
%%%   indeterminate results. A client wishing to send a non-idempotent
%%%   request SHOULD wait to send that request until it has received the
%%%   response status for the previous request.
%%% Section 9.1.2:
%%%   Methods can also have the property of "idempotence" in that (aside
%%%   from error or expiration issues) the side-effects of N > 0 identical
%%%   requests is the same as for a single request. The methods GET, HEAD,
%%%   PUT and DELETE share this property. Also, the methods OPTIONS and
%%%   TRACE SHOULD NOT have side effects, and so are inherently idempotent.
%%%
%%% Note that POST and CONNECT are not idempotent methods.
%%%
%%% Tries to find an open, free session in STab. Such a session has a
%%% quelength less than its own max_quelength and a registered socket.
%%% Don't care about non-standard, user defined methods.
%%%
%%% Returns {ok,Session} or need_new_session where
%%%  Session is the session that may be used
%%% Selects a session in STab that can accept one more request.
%%%
%%% Candidates are the sessions whose quelength is below their own
%%% max_quelength and whose socket is a port (i.e. a socket has been
%%% registered and not reset to undefined after a failed send).
%%% Among the candidates, a session with quelength 0 (idle) is preferred;
%%% if there is none, the first candidate returned by ets:select/2 is used.
%%%
%%% Returns {ok,Session} or need_new_session.
lookup_session_entry(STab) ->
    MS=[{#session{quelength='$1',max_quelength='$2',
                  id='_',clientclose='_',socket='$3',scheme='_',pipeline='_'},
         [{'<','$1','$2'},{is_port,'$3'}],
         ['$_']}],
    case ets:select(STab,MS) of
        [] ->
            need_new_session;
        SessionList -> % Now check if any of these has an empty pipeline.
            %% Search on the quelength field. Tuple position 2 is the
            %% session id (the table's keypos), so a literal 2 here would
            %% look for "session id 0" instead of "idle session".
            case lists:keysearch(0,#session.quelength,SessionList) of
                {value,Session} ->
                    {ok,Session};
                false ->
                    {ok,hd(SessionList)}
            end
    end.


%%% Returns a tuple {reply,Reply,State} (a complete gen_server reply) where
%%%  Reply is the response sent back to the application
%%%
%%% Note:
%%% - An {error,einval} from a send should sometimes rather be {error,closed}
%%% - Don't close the session from here, let httpc_handler take care of that.
%old_session_request(Session,Req,STab,State)
%  when (Req#request.settings)#client_settings.max_quelength==0 ->
%    Session1=Session#session{pipeline=[Req]},
%    ets:insert(STab,Session1),
%    {reply,{ok,ReqId},State#state{reqid=ReqId+1}};
%% Sends Req on the socket of an already registered Session.
%% Returns a ready-made gen_server reply tuple {reply,Reply,NewState} where
%% Reply is {ok,ReqId} or {error,Reason}; the request id counter is
%% advanced in both cases.
%% On success the request is pushed onto the head of the session pipeline.
%% On a send error the session's socket is reset to undefined, which keeps
%% lookup_session_entry/1 (is_port guard) from selecting it again.
old_session_request(Session,Req,STab,State) ->
    ReqId=State#state.reqid,
    NextState=State#state{reqid=ReqId+1},
    Tagged=Req#request{id=ReqId},
    SendResult=(catch httpc_handler:http_request(Tagged,Session#session.socket)),
    case SendResult of
        ok ->
            Queued=[Tagged|Session#session.pipeline],
            Length=Session#session.quelength+1,
            ets:insert(STab,Session#session{pipeline=Queued,quelength=Length}),
            {reply,{ok,ReqId},NextState};
        {error,Reason} ->
            ets:insert(STab,Session#session{socket=undefined}),
            {reply,{error,Reason},NextState}
    end.

%%% Returns a tuple {Reply,Sid,State} where
%%%  Reply is the response sent back to the application, and
%%%  Sid is the last used Session Id
%%% (or {Reply,State} if the session could not be set up)
%% Sets up a new persistent session for Req, using Sid as its session id,
%% and stores the session record in STab.
%% Returns {{ok,ReqId},NewSid,NewState} on success and
%% {{error,Reason},NewState} otherwise; the request id counter is advanced
%% in both cases.
persistent_new_session_request(Sid,Req,STab,State) ->
    ReqId=State#state.reqid,
    NextState=State#state{reqid=ReqId+1},
    case setup_new_session(Req#request{id=ReqId},false,Sid) of
        {NewSid,Session} ->
            ets:insert(STab,Session),
            {{ok,ReqId},NewSid,NextState};
        {error,Reason} ->
            {{error,Reason},NextState}
    end.

%%% Returns a tuple {Reply,State} where
%%%  Reply is the response sent back to the application
%% Sets up a one-shot (client "close") session for Req. Nothing is stored
%% in any session table.
%% Returns {{ok,ReqId},NewState} or {{error,Reason},NewState}; the request
%% id counter is advanced in both cases.
not_persistent_new_session_request(Req,State) ->
    ReqId=State#state.reqid,
    NextState=State#state{reqid=ReqId+1},
    Reply=case setup_new_session(Req#request{id=ReqId},true,undefined) of
              ok ->
                  {ok,ReqId};
              {error,Reason} ->
                  {error,Reason}
          end,
    {Reply,NextState}.

%%% As there are no sessions available, setup a new session and send the request
%%% on it.
%% Builds a #session{} for Req and spawns a linked httpc_handler process
%% (httpc_handler:init_connection/2) that is given the request and session.
%% Returns {Sid+1,Session} for a persistent session (ClientClose==false)
%% and ok for a non-persistent one (ClientClose==true).
setup_new_session(Req,ClientClose,Sid) ->
    Session=#session{id=Sid,
                     clientclose=ClientClose,
                     scheme=Req#request.scheme,
                     max_quelength=(Req#request.settings)#client_settings.max_quelength},
    spawn_link(httpc_handler,init_connection,[Req,Session]),
    case ClientClose of
        true ->
            ok;
        false ->
            NextSid=Sid+1,
            {NextSid,Session}
    end.


%%% ----------------------------------------------------------------------------
%%% Abort all requests in the request que.
%% Notifies the originator of each request that it has been aborted by
%% casting {Ref,Id,Msg} to the request's `from' process.
%% Accepts either a list of #request{} records or a single record.
%% Returns ok.
abort_request_que([],_Msg) ->
    ok;
abort_request_que([#request{}=Req|Rest],Msg) ->
    abort_request_que(Req,Msg),
    abort_request_que(Rest,Msg);
abort_request_que(#request{from=Client,ref=Ref,id=ReqId},Msg) ->
    Notification={Ref,ReqId,Msg},
    gen_server:cast(Client,Notification).


%%% --------------------------------
%            C={httpc_manager,{?MODULE,start_link,[]},permanent,1000,
%	       worker,[?MODULE]},
%            supervisor:start_child(inets_sup, C),
%% Starts the manager with start_link/0 unless something is already
%% registered under ?HMANAME.
%% Returns ok when already registered, otherwise the result of start_link/0.
ensure_started() ->
    start_unless_registered(whereis(?HMANAME)).

%% Dispatches on the result of whereis/1.
start_unless_registered(undefined) ->
    start_link();
start_unless_registered(_Registered) ->
    ok.


%%% ============================================================================
%%% This is deprecated code, to be removed

% format_time() ->
%     {_,_,MicroSecs}=TS=now(),
%     {{Y,Mon,D},{H,M,S}}=calendar:now_to_universal_time(TS),
%     lists:flatten(io_lib:format("~4.4.0w-~2.2.0w-~2.2.0w,~2.2.0w:~2.2.0w:~6.3.0f",
% 				[Y,Mon,D,H,M,S+(MicroSecs/1000000)])).