%%-------------------------------------------------------------------- %% %% %CopyrightBegin% %% %% Copyright Ericsson AB 1997-2009. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. %% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. %% %% %CopyrightEnd% %% %% %%----------------------------------------------------------------- %% File: orber_iiop_inproxy.erl %% %% Description: %% This file contains the IIOP "proxy" for incomming connections %% %%----------------------------------------------------------------- -module(orber_iiop_inproxy). -behaviour(gen_server). -include_lib("orber/src/orber_iiop.hrl"). -include_lib("orber/include/corba.hrl"). %%----------------------------------------------------------------- %% External exports %%----------------------------------------------------------------- -export([start/0, start/1]). %%----------------------------------------------------------------- %% Internal exports %%----------------------------------------------------------------- -export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2, post_accept/3, stop/1]). %%----------------------------------------------------------------- %% Macros %%----------------------------------------------------------------- -define(DEBUG_LEVEL, 7). -record(state, {stype, socket, db, timeout, max_fragments, max_requests, request_counter = 1, giop_env, peer}). %%----------------------------------------------------------------- %% External interface functions %%----------------------------------------------------------------- %%----------------------------------------------------------------- %% Func: start/0 %%----------------------------------------------------------------- start() -> ignore. %%----------------------------------------------------------------- %% Func: start/1 %%----------------------------------------------------------------- start(Opts) -> gen_server:start_link(orber_iiop_inproxy, Opts, []). post_accept(Pid, ssl, Socket) -> (catch gen_server:cast(Pid, {post_accept, ssl, Socket})), ok; post_accept(_, _, _) -> ok. %%----------------------------------------------------------------- %% Internal interface functions %%----------------------------------------------------------------- %%----------------------------------------------------------------- %% Func: stop/1 %%----------------------------------------------------------------- stop(Pid) -> gen_server:cast(Pid, stop). %%----------------------------------------------------------------- %% Server functions %%----------------------------------------------------------------- %%----------------------------------------------------------------- %% Func: init/1 %%----------------------------------------------------------------- init({connect, Type, Socket, Ref, Options}) -> process_flag(trap_exit, true), Flags = orber_tb:keysearch(flags, Options, orber_env:get_flags()), {Address, Port} = PeerData = orber_socket:peerdata(Type, Socket), {LAddress, LPort} = LocalData = orber_socket:sockdata(Type, Socket), case {?ORB_FLAG_TEST(Flags, ?ORB_ENV_LOCAL_INTERFACE), LPort} of {true, 0} -> orber_tb:info("Unable to lookup the local address and port number.~n" "Closing the incoming connection.", []), ignore; _ -> orber_iiop_net:add_connection(Socket, Type, PeerData, LocalData, Ref), Interceptors = case orber_tb:keysearch(interceptors, Options, orber_env:get_interceptors()) of {native, PIs} -> {native, orber_pi:new_in_connection(PIs, Address, Port, LAddress, LPort), PIs}; Other -> Other end, Env = case ?ORB_FLAG_TEST(Flags, ?ORB_ENV_LOCAL_INTERFACE) of true when Type == ssl -> #giop_env{interceptors = Interceptors, flags = Flags, host = [LAddress], iiop_port = orber_tb:keysearch(iiop_port, Options, orber_env:iiop_port()), iiop_ssl_port = LPort, domain = orber:domain(), partial_security = orber:partial_security()}; true -> #giop_env{interceptors = Interceptors, flags = Flags, host = [LAddress], iiop_port = LPort, iiop_ssl_port = orber_tb:keysearch(iiop_ssl_port, Options, orber_env:iiop_ssl_port()), domain = orber:domain(), partial_security = orber:partial_security()}; false -> case ?ORB_FLAG_TEST(Flags, ?ORB_ENV_ENABLE_NAT) of false -> #giop_env{interceptors = Interceptors, flags = Flags, host = orber:host(), iiop_port = orber:iiop_port(), iiop_ssl_port = orber:iiop_ssl_port(), domain = orber:domain(), partial_security = orber:partial_security()}; true -> #giop_env{interceptors = Interceptors, flags = Flags, host = orber_tb:keysearch(nat_ip_address, Options, orber_env:nat_host()), iiop_port = orber_tb:keysearch(nat_iiop_port, Options, orber_env:nat_iiop_port()), iiop_ssl_port = orber_tb:keysearch(nat_iiop_ssl_port, Options, orber_env:nat_iiop_ssl_port()), domain = orber:domain(), partial_security = orber:partial_security()} end end, Timeout = orber_tb:keysearch(iiop_in_connection_timeout, Options, orber_env:iiop_in_connection_timeout()), MaxFrags = orber_tb:keysearch(iiop_max_fragments, Options, orber_env:iiop_max_fragments()), MaxRequests = orber_tb:keysearch(iiop_max_in_requests, Options, orber_env:iiop_max_in_requests()), {ok, #state{stype = Type, socket = Socket, db = ets:new(orber_incoming_requests, [set]), timeout = Timeout, max_fragments = MaxFrags, max_requests = MaxRequests, giop_env = Env, peer = PeerData}, Timeout} end. %%----------------------------------------------------------------- %% Func: terminate/2 %%----------------------------------------------------------------- %% We may want to kill all proxies before terminating, but the best %% option should be to let the requests complete (especially for one-way %% functions it's a better alternative. terminate(_Reason, #state{db = IncRequests, giop_env = Env}) -> ets:delete(IncRequests), case Env#giop_env.interceptors of false -> ok; {native, Ref, PIs} -> orber_pi:closed_in_connection(PIs, Ref); {_Type, _PIs} -> ok end. %%----------------------------------------------------------------- %% Func: handle_call/3 %%----------------------------------------------------------------- handle_call(stop, _From, State) -> {stop, normal, ok, State}; handle_call(_, _, State) -> {noreply, State, State#state.timeout}. %%----------------------------------------------------------------- %% Func: handle_cast/2 %%----------------------------------------------------------------- handle_cast({post_accept, Type, Socket}, State) -> Timeout = orber_env:iiop_ssl_accept_timeout(), case catch orber_socket:post_accept(Type, Socket, Timeout) of ok -> {noreply, State}; _Failed -> orber_socket:close(Type, Socket), {stop, normal, State} end; handle_cast(stop, State) -> {stop, normal, State}; handle_cast(_, State) -> {noreply, State, State#state.timeout}. %%----------------------------------------------------------------- %% Func: handle_info/2 %%----------------------------------------------------------------- %% Normal invocation handle_info({tcp, Socket, Bytes}, State) -> handle_msg(normal, Socket, Bytes, State); handle_info({ssl, Socket, Bytes}, State) -> handle_msg(ssl, Socket, Bytes, State); %% Errors, closed connection handle_info({tcp_closed, _Socket}, State) -> {stop, normal, State}; handle_info({tcp_error, _Socket, _Reason}, State) -> {stop, normal, State}; handle_info({ssl_closed, _Socket}, State) -> {stop, normal, State}; handle_info({ssl_error, _Socket, _Reason}, State) -> {stop, normal, State}; %% Servant termination. handle_info({'EXIT', Pid, normal}, State) -> ets:delete(State#state.db, Pid), {noreply, decrease_counter(State), State#state.timeout}; handle_info({message_error, _Pid, ReqId}, State) -> ets:delete(State#state.db, ReqId), {noreply, State, State#state.timeout}; handle_info(timeout, State) -> case ets:info(State#state.db, size) of 0 -> %% No pending requests, close the connection. {stop, normal, State}; _Amount -> %% Still pending request, cannot close the connection. {noreply, State, State#state.timeout} end; handle_info({reconfigure, Options}, State) -> {noreply, update_state(State, Options), State#state.timeout}; handle_info(_X,State) -> {noreply, State, State#state.timeout}. handle_msg(Type, Socket, Bytes, #state{stype = Type, socket = Socket, giop_env = Env} = State) -> case catch cdr_decode:dec_giop_message_header(Bytes) of %% Only when using IIOP-1.2 may the client send this message. %% Introduced in CORBA-2.6 #giop_message{message_type = ?GIOP_MSG_CLOSE_CONNECTION, giop_version = {1,2}} -> {stop, normal, State}; #giop_message{message_type = ?GIOP_MSG_CLOSE_CONNECTION} -> {noreply, State, State#state.timeout}; #giop_message{message_type = ?GIOP_MSG_CANCEL_REQUEST} = GIOPHdr -> ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order, GIOPHdr#giop_message.message), case ets:lookup(State#state.db, ReqId) of [{RId, PPid}] -> ets:delete(State#state.db, RId), PPid ! {self(), cancel_request_header}; [] -> send_msg_error(Type, Socket, Bytes, Env#giop_env{version = GIOPHdr#giop_message.giop_version}, "No such request id") end, {noreply, State, State#state.timeout}; %% A fragment; we must have received a Request or LocateRequest %% with fragment-flag set to true. %% We need to decode the header to get the request-id. #giop_message{message_type = ?GIOP_MSG_FRAGMENT, giop_version = {1,2}} = GIOPHdr -> ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order, GIOPHdr#giop_message.message), case ets:lookup(State#state.db, ReqId) of [{_RId, PPid}] when GIOPHdr#giop_message.fragments == true -> PPid ! {self(), GIOPHdr}; [{RId, PPid}] -> ets:delete(State#state.db, RId), PPid ! {self(), GIOPHdr}; [] -> send_msg_error(Type, Socket, Bytes, Env#giop_env{version = GIOPHdr#giop_message.giop_version}, "No such fragment id") end, {noreply, State, State#state.timeout}; %% Must be a Request or LocateRequest which have been fragmented. %% We need to decode the header to get the request-id. #giop_message{fragments = true, giop_version = {1,2}} = GIOPHdr -> ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order, GIOPHdr#giop_message.message), Pid = orber_iiop_inrequest: start_fragment_collector(GIOPHdr, Bytes, Type, Socket, ReqId, self(), State#state.max_fragments, Env#giop_env{version = {1,2}, request_id = ReqId}), ets:insert(State#state.db, {Pid, ReqId}), ets:insert(State#state.db, {ReqId, Pid}), {noreply, increase_counter(State), State#state.timeout}; GIOPHdr when is_record(GIOPHdr, giop_message) -> Pid = orber_iiop_inrequest:start(GIOPHdr, Bytes, Type, Socket, Env#giop_env{version = GIOPHdr#giop_message.giop_version}), ets:insert(State#state.db, {Pid, undefined}), {noreply, increase_counter(State), State#state.timeout}; {'EXIT', message_error} -> send_msg_error(Type, Socket, Bytes, Env#giop_env{version = orber_env:giop_version()}, "Unable to decode the GIOP-header"), {noreply, State, State#state.timeout} end; handle_msg(Type, _, Bytes, State) -> orber:dbg("[~p] orber_iiop_inproxy:handle_msg(~p);~n" "Received a message from a socket of a different type.~n" "Should be ~p but was ~p.", [?LINE, Bytes, State#state.stype, Type], ?DEBUG_LEVEL), {noreply, State, State#state.timeout}. send_msg_error(Type, Socket, Data, Env, Msg) -> orber:dbg("[~p] orber_iiop_inproxy:handle_msg(~p); ~p.", [?LINE, Data, Msg], ?DEBUG_LEVEL), Reply = cdr_encode:enc_message_error(Env), orber_socket:write(Type, Socket, Reply). increase_counter(#state{max_requests = infinity} = State) -> State; increase_counter(#state{max_requests = Max, request_counter = Counter} = State) when Max > Counter -> orber_socket:setopts(State#state.stype, State#state.socket, [{active, once}]), State#state{request_counter = Counter + 1}; increase_counter(State) -> State#state{request_counter = State#state.request_counter + 1}. decrease_counter(#state{max_requests = infinity} = State) -> State; decrease_counter(#state{max_requests = Max, request_counter = Counter} = State) when Max =< Counter -> orber_socket:setopts(State#state.stype, State#state.socket, [{active, once}]), State#state{request_counter = Counter - 1}; decrease_counter(State) -> State#state{request_counter = State#state.request_counter - 1}. update_state(#state{giop_env = Env} = State, [{interceptors, false}|Options]) -> update_state(State#state{giop_env = Env#giop_env{interceptors = false}}, Options); update_state(#state{giop_env = #giop_env{interceptors = false, host = [SH], iiop_port = SP} = Env, peer = {PH, PP}, stype = normal} = State, [{interceptors, {native, LPIs}}|Options]) -> %% No Interceptor(s). Add the same Ref used by the built in interceptors. update_state(State#state{giop_env = Env#giop_env{interceptors = {native, {PH, PP, SH, SP}, LPIs}}}, Options); update_state(#state{giop_env = #giop_env{interceptors = false, host = [SH], iiop_ssl_port = SP} = Env, peer = {PH, PP}, stype = ssl} = State, [{interceptors, {native, LPIs}}|Options]) -> %% No Interceptor(s). Add the same Ref used by the built in interceptors. update_state(State#state{giop_env = Env#giop_env{interceptors = {native, {PH, PP, SH, SP}, LPIs}}}, Options); update_state(#state{giop_env = #giop_env{interceptors = {native, Ref, _}} = Env} = State, [{interceptors, {native, LPIs}}|Options]) -> %% Interceptor(s) already in use. We must use the same Ref as before. update_state(State#state{giop_env = Env#giop_env{interceptors = {native, Ref, LPIs}}}, Options); update_state(State, [H|T]) -> orber:dbg("[~p] orber_iiop_inproxy:update_state(~p, ~p)~n" "Couldn't change the state.", [?LINE, H, State], ?DEBUG_LEVEL), update_state(State, T); update_state(State, []) -> State. %%----------------------------------------------------------------- %% Func: code_change/3 %%----------------------------------------------------------------- code_change(_OldVsn, State, _Extra) -> {ok, State}.