%%--------------------------------------------------------------------
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1997-2009. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
%%-----------------------------------------------------------------
%% File: orber_iiop_inproxy.erl
%%
%% Description:
%% This file contains the IIOP "proxy" for incomming connections
%%
%%-----------------------------------------------------------------
-module(orber_iiop_inproxy).
-behaviour(gen_server).
-include_lib("orber/src/orber_iiop.hrl").
-include_lib("orber/include/corba.hrl").
%%-----------------------------------------------------------------
%% External exports
%%-----------------------------------------------------------------
-export([start/0, start/1]).
%%-----------------------------------------------------------------
%% Internal exports
%%-----------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
code_change/3, terminate/2, post_accept/3, stop/1]).
%%-----------------------------------------------------------------
%% Macros
%%-----------------------------------------------------------------
-define(DEBUG_LEVEL, 7).
-record(state, {stype, socket, db, timeout, max_fragments,
max_requests, request_counter = 1, giop_env, peer}).
%%-----------------------------------------------------------------
%% External interface functions
%%-----------------------------------------------------------------
%%-----------------------------------------------------------------
%% Func: start/0
%%-----------------------------------------------------------------
start() ->
ignore.
%%-----------------------------------------------------------------
%% Func: start/1
%%-----------------------------------------------------------------
start(Opts) ->
gen_server:start_link(orber_iiop_inproxy, Opts, []).
post_accept(Pid, ssl, Socket) ->
(catch gen_server:cast(Pid, {post_accept, ssl, Socket})),
ok;
post_accept(_, _, _) ->
ok.
%%-----------------------------------------------------------------
%% Internal interface functions
%%-----------------------------------------------------------------
%%-----------------------------------------------------------------
%% Func: stop/1
%%-----------------------------------------------------------------
stop(Pid) ->
gen_server:cast(Pid, stop).
%%-----------------------------------------------------------------
%% Server functions
%%-----------------------------------------------------------------
%%-----------------------------------------------------------------
%% Func: init/1
%%-----------------------------------------------------------------
init({connect, Type, Socket, Ref, Options}) ->
process_flag(trap_exit, true),
Flags = orber_tb:keysearch(flags, Options, orber_env:get_flags()),
{Address, Port} = PeerData = orber_socket:peerdata(Type, Socket),
{LAddress, LPort} = LocalData = orber_socket:sockdata(Type, Socket),
case {?ORB_FLAG_TEST(Flags, ?ORB_ENV_LOCAL_INTERFACE), LPort} of
{true, 0} ->
orber_tb:info("Unable to lookup the local address and port number.~n"
"Closing the incoming connection.", []),
ignore;
_ ->
orber_iiop_net:add_connection(Socket, Type, PeerData, LocalData, Ref),
Interceptors =
case orber_tb:keysearch(interceptors, Options,
orber_env:get_interceptors()) of
{native, PIs} ->
{native, orber_pi:new_in_connection(PIs, Address, Port,
LAddress, LPort), PIs};
Other ->
Other
end,
Env =
case ?ORB_FLAG_TEST(Flags, ?ORB_ENV_LOCAL_INTERFACE) of
true when Type == ssl ->
#giop_env{interceptors = Interceptors,
flags = Flags, host = [LAddress],
iiop_port =
orber_tb:keysearch(iiop_port, Options,
orber_env:iiop_port()),
iiop_ssl_port = LPort,
domain = orber:domain(),
partial_security = orber:partial_security()};
true ->
#giop_env{interceptors = Interceptors,
flags = Flags, host = [LAddress],
iiop_port = LPort,
iiop_ssl_port =
orber_tb:keysearch(iiop_ssl_port, Options,
orber_env:iiop_ssl_port()),
domain = orber:domain(),
partial_security = orber:partial_security()};
false ->
case ?ORB_FLAG_TEST(Flags, ?ORB_ENV_ENABLE_NAT) of
false ->
#giop_env{interceptors = Interceptors,
flags = Flags, host = orber:host(),
iiop_port = orber:iiop_port(),
iiop_ssl_port = orber:iiop_ssl_port(),
domain = orber:domain(),
partial_security = orber:partial_security()};
true ->
#giop_env{interceptors = Interceptors,
flags = Flags,
host =
orber_tb:keysearch(nat_ip_address, Options,
orber_env:nat_host()),
iiop_port =
orber_tb:keysearch(nat_iiop_port, Options,
orber_env:nat_iiop_port()),
iiop_ssl_port =
orber_tb:keysearch(nat_iiop_ssl_port, Options,
orber_env:nat_iiop_ssl_port()),
domain = orber:domain(),
partial_security = orber:partial_security()}
end
end,
Timeout = orber_tb:keysearch(iiop_in_connection_timeout, Options,
orber_env:iiop_in_connection_timeout()),
MaxFrags = orber_tb:keysearch(iiop_max_fragments, Options,
orber_env:iiop_max_fragments()),
MaxRequests = orber_tb:keysearch(iiop_max_in_requests, Options,
orber_env:iiop_max_in_requests()),
{ok, #state{stype = Type,
socket = Socket,
db = ets:new(orber_incoming_requests, [set]),
timeout = Timeout,
max_fragments = MaxFrags,
max_requests = MaxRequests,
giop_env = Env, peer = PeerData}, Timeout}
end.
%%-----------------------------------------------------------------
%% Func: terminate/2
%%-----------------------------------------------------------------
%% We may want to kill all proxies before terminating, but the best
%% option should be to let the requests complete (especially for one-way
%% functions it's a better alternative.
terminate(_Reason, #state{db = IncRequests, giop_env = Env}) ->
ets:delete(IncRequests),
case Env#giop_env.interceptors of
false ->
ok;
{native, Ref, PIs} ->
orber_pi:closed_in_connection(PIs, Ref);
{_Type, _PIs} ->
ok
end.
%%-----------------------------------------------------------------
%% Func: handle_call/3
%%-----------------------------------------------------------------
handle_call(stop, _From, State) ->
{stop, normal, ok, State};
handle_call(_, _, State) ->
{noreply, State, State#state.timeout}.
%%-----------------------------------------------------------------
%% Func: handle_cast/2
%%-----------------------------------------------------------------
handle_cast({post_accept, Type, Socket}, State) ->
Timeout = orber_env:iiop_ssl_accept_timeout(),
case catch orber_socket:post_accept(Type, Socket, Timeout) of
ok ->
{noreply, State};
_Failed ->
orber_socket:close(Type, Socket),
{stop, normal, State}
end;
handle_cast(stop, State) ->
{stop, normal, State};
handle_cast(_, State) ->
{noreply, State, State#state.timeout}.
%%-----------------------------------------------------------------
%% Func: handle_info/2
%%-----------------------------------------------------------------
%% Normal invocation
handle_info({tcp, Socket, Bytes}, State) ->
handle_msg(normal, Socket, Bytes, State);
handle_info({ssl, Socket, Bytes}, State) ->
handle_msg(ssl, Socket, Bytes, State);
%% Errors, closed connection
handle_info({tcp_closed, _Socket}, State) ->
{stop, normal, State};
handle_info({tcp_error, _Socket, _Reason}, State) ->
{stop, normal, State};
handle_info({ssl_closed, _Socket}, State) ->
{stop, normal, State};
handle_info({ssl_error, _Socket, _Reason}, State) ->
{stop, normal, State};
%% Servant termination.
handle_info({'EXIT', Pid, normal}, State) ->
ets:delete(State#state.db, Pid),
{noreply, decrease_counter(State), State#state.timeout};
handle_info({message_error, _Pid, ReqId}, State) ->
ets:delete(State#state.db, ReqId),
{noreply, State, State#state.timeout};
handle_info(timeout, State) ->
case ets:info(State#state.db, size) of
0 ->
%% No pending requests, close the connection.
{stop, normal, State};
_Amount ->
%% Still pending request, cannot close the connection.
{noreply, State, State#state.timeout}
end;
handle_info({reconfigure, Options}, State) ->
{noreply, update_state(State, Options), State#state.timeout};
handle_info(_X,State) ->
{noreply, State, State#state.timeout}.
handle_msg(Type, Socket, Bytes, #state{stype = Type, socket = Socket,
giop_env = Env} = State) ->
case catch cdr_decode:dec_giop_message_header(Bytes) of
%% Only when using IIOP-1.2 may the client send this message.
%% Introduced in CORBA-2.6
#giop_message{message_type = ?GIOP_MSG_CLOSE_CONNECTION,
giop_version = {1,2}} ->
{stop, normal, State};
#giop_message{message_type = ?GIOP_MSG_CLOSE_CONNECTION} ->
{noreply, State, State#state.timeout};
#giop_message{message_type = ?GIOP_MSG_CANCEL_REQUEST} = GIOPHdr ->
ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order,
GIOPHdr#giop_message.message),
case ets:lookup(State#state.db, ReqId) of
[{RId, PPid}] ->
ets:delete(State#state.db, RId),
PPid ! {self(), cancel_request_header};
[] ->
send_msg_error(Type, Socket, Bytes,
Env#giop_env{version =
GIOPHdr#giop_message.giop_version},
"No such request id")
end,
{noreply, State, State#state.timeout};
%% A fragment; we must have received a Request or LocateRequest
%% with fragment-flag set to true.
%% We need to decode the header to get the request-id.
#giop_message{message_type = ?GIOP_MSG_FRAGMENT,
giop_version = {1,2}} = GIOPHdr ->
ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order,
GIOPHdr#giop_message.message),
case ets:lookup(State#state.db, ReqId) of
[{_RId, PPid}] when GIOPHdr#giop_message.fragments == true ->
PPid ! {self(), GIOPHdr};
[{RId, PPid}] ->
ets:delete(State#state.db, RId),
PPid ! {self(), GIOPHdr};
[] ->
send_msg_error(Type, Socket, Bytes,
Env#giop_env{version =
GIOPHdr#giop_message.giop_version},
"No such fragment id")
end,
{noreply, State, State#state.timeout};
%% Must be a Request or LocateRequest which have been fragmented.
%% We need to decode the header to get the request-id.
#giop_message{fragments = true,
giop_version = {1,2}} = GIOPHdr ->
ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order,
GIOPHdr#giop_message.message),
Pid =
orber_iiop_inrequest:
start_fragment_collector(GIOPHdr, Bytes,
Type, Socket,
ReqId, self(),
State#state.max_fragments,
Env#giop_env{version = {1,2},
request_id = ReqId}),
ets:insert(State#state.db, {Pid, ReqId}),
ets:insert(State#state.db, {ReqId, Pid}),
{noreply, increase_counter(State), State#state.timeout};
GIOPHdr when is_record(GIOPHdr, giop_message) ->
Pid = orber_iiop_inrequest:start(GIOPHdr, Bytes, Type, Socket,
Env#giop_env{version =
GIOPHdr#giop_message.giop_version}),
ets:insert(State#state.db, {Pid, undefined}),
{noreply, increase_counter(State), State#state.timeout};
{'EXIT', message_error} ->
send_msg_error(Type, Socket, Bytes,
Env#giop_env{version = orber_env:giop_version()},
"Unable to decode the GIOP-header"),
{noreply, State, State#state.timeout}
end;
handle_msg(Type, _, Bytes, State) ->
orber:dbg("[~p] orber_iiop_inproxy:handle_msg(~p);~n"
"Received a message from a socket of a different type.~n"
"Should be ~p but was ~p.",
[?LINE, Bytes, State#state.stype, Type], ?DEBUG_LEVEL),
{noreply, State, State#state.timeout}.
send_msg_error(Type, Socket, Data, Env, Msg) ->
orber:dbg("[~p] orber_iiop_inproxy:handle_msg(~p); ~p.",
[?LINE, Data, Msg], ?DEBUG_LEVEL),
Reply = cdr_encode:enc_message_error(Env),
orber_socket:write(Type, Socket, Reply).
increase_counter(#state{max_requests = infinity} = State) ->
State;
increase_counter(#state{max_requests = Max,
request_counter = Counter} = State) when Max > Counter ->
orber_socket:setopts(State#state.stype, State#state.socket, [{active, once}]),
State#state{request_counter = Counter + 1};
increase_counter(State) ->
State#state{request_counter = State#state.request_counter + 1}.
decrease_counter(#state{max_requests = infinity} = State) ->
State;
decrease_counter(#state{max_requests = Max,
request_counter = Counter} = State) when Max =< Counter ->
orber_socket:setopts(State#state.stype, State#state.socket, [{active, once}]),
State#state{request_counter = Counter - 1};
decrease_counter(State) ->
State#state{request_counter = State#state.request_counter - 1}.
update_state(#state{giop_env = Env} = State,
[{interceptors, false}|Options]) ->
update_state(State#state{giop_env =
Env#giop_env{interceptors = false}}, Options);
update_state(#state{giop_env = #giop_env{interceptors = false, host = [SH],
iiop_port = SP} = Env,
peer = {PH, PP}, stype = normal} = State,
[{interceptors, {native, LPIs}}|Options]) ->
%% No Interceptor(s). Add the same Ref used by the built in interceptors.
update_state(State#state{giop_env =
Env#giop_env{interceptors =
{native, {PH, PP, SH, SP}, LPIs}}},
Options);
update_state(#state{giop_env = #giop_env{interceptors = false, host = [SH],
iiop_ssl_port = SP} = Env,
peer = {PH, PP}, stype = ssl} = State,
[{interceptors, {native, LPIs}}|Options]) ->
%% No Interceptor(s). Add the same Ref used by the built in interceptors.
update_state(State#state{giop_env =
Env#giop_env{interceptors =
{native, {PH, PP, SH, SP}, LPIs}}},
Options);
update_state(#state{giop_env = #giop_env{interceptors = {native, Ref, _}} = Env} =
State,
[{interceptors, {native, LPIs}}|Options]) ->
%% Interceptor(s) already in use. We must use the same Ref as before.
update_state(State#state{giop_env =
Env#giop_env{interceptors = {native, Ref, LPIs}}},
Options);
update_state(State, [H|T]) ->
orber:dbg("[~p] orber_iiop_inproxy:update_state(~p, ~p)~n"
"Couldn't change the state.",
[?LINE, H, State], ?DEBUG_LEVEL),
update_state(State, T);
update_state(State, []) ->
State.
%%-----------------------------------------------------------------
%% Func: code_change/3
%%-----------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.