From 84adefa331c4159d432d22840663c38f155cd4c1 Mon Sep 17 00:00:00 2001 From: Erlang/OTP Date: Fri, 20 Nov 2009 14:54:40 +0000 Subject: The R13B03 release. --- lib/orber/src/orber_iiop_inproxy.erl | 398 +++++++++++++++++++++++++++++++++++ 1 file changed, 398 insertions(+) create mode 100644 lib/orber/src/orber_iiop_inproxy.erl (limited to 'lib/orber/src/orber_iiop_inproxy.erl') diff --git a/lib/orber/src/orber_iiop_inproxy.erl b/lib/orber/src/orber_iiop_inproxy.erl new file mode 100644 index 0000000000..ede1e0749f --- /dev/null +++ b/lib/orber/src/orber_iiop_inproxy.erl @@ -0,0 +1,398 @@ +%%-------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%----------------------------------------------------------------- +%% File: orber_iiop_inproxy.erl +%% +%% Description: +%% This file contains the IIOP "proxy" for incomming connections +%% +%%----------------------------------------------------------------- +-module(orber_iiop_inproxy). + +-behaviour(gen_server). + +-include_lib("orber/src/orber_iiop.hrl"). +-include_lib("orber/include/corba.hrl"). + +%%----------------------------------------------------------------- +%% External exports +%%----------------------------------------------------------------- +-export([start/0, start/1]). + +%%----------------------------------------------------------------- +%% Internal exports +%%----------------------------------------------------------------- +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + code_change/3, terminate/2, post_accept/3, stop/1]). + +%%----------------------------------------------------------------- +%% Macros +%%----------------------------------------------------------------- +-define(DEBUG_LEVEL, 7). + +-record(state, {stype, socket, db, timeout, max_fragments, + max_requests, request_counter = 1, giop_env, peer}). + +%%----------------------------------------------------------------- +%% External interface functions +%%----------------------------------------------------------------- +%%----------------------------------------------------------------- +%% Func: start/0 +%%----------------------------------------------------------------- +start() -> + ignore. + +%%----------------------------------------------------------------- +%% Func: start/1 +%%----------------------------------------------------------------- +start(Opts) -> + gen_server:start_link(orber_iiop_inproxy, Opts, []). + +post_accept(Pid, ssl, Socket) -> + (catch gen_server:cast(Pid, {post_accept, ssl, Socket})), + ok; +post_accept(_, _, _) -> + ok. + +%%----------------------------------------------------------------- +%% Internal interface functions +%%----------------------------------------------------------------- +%%----------------------------------------------------------------- +%% Func: stop/1 +%%----------------------------------------------------------------- +stop(Pid) -> + gen_server:cast(Pid, stop). + +%%----------------------------------------------------------------- +%% Server functions +%%----------------------------------------------------------------- +%%----------------------------------------------------------------- +%% Func: init/1 +%%----------------------------------------------------------------- +init({connect, Type, Socket, Ref, Options}) -> + process_flag(trap_exit, true), + Flags = orber_tb:keysearch(flags, Options, orber_env:get_flags()), + {Address, Port} = PeerData = orber_socket:peerdata(Type, Socket), + {LAddress, LPort} = LocalData = orber_socket:sockdata(Type, Socket), + case {?ORB_FLAG_TEST(Flags, ?ORB_ENV_LOCAL_INTERFACE), LPort} of + {true, 0} -> + orber_tb:info("Unable to lookup the local address and port number.~n" + "Closing the incoming connection.", []), + ignore; + _ -> + orber_iiop_net:add_connection(Socket, Type, PeerData, LocalData, Ref), + Interceptors = + case orber_tb:keysearch(interceptors, Options, + orber_env:get_interceptors()) of + {native, PIs} -> + {native, orber_pi:new_in_connection(PIs, Address, Port, + LAddress, LPort), PIs}; + Other -> + Other + end, + Env = + case ?ORB_FLAG_TEST(Flags, ?ORB_ENV_LOCAL_INTERFACE) of + true when Type == ssl -> + #giop_env{interceptors = Interceptors, + flags = Flags, host = [LAddress], + iiop_port = + orber_tb:keysearch(iiop_port, Options, + orber_env:iiop_port()), + iiop_ssl_port = LPort, + domain = orber:domain(), + partial_security = orber:partial_security()}; + true -> + #giop_env{interceptors = Interceptors, + flags = Flags, host = [LAddress], + iiop_port = LPort, + iiop_ssl_port = + orber_tb:keysearch(iiop_ssl_port, Options, + orber_env:iiop_ssl_port()), + domain = orber:domain(), + partial_security = orber:partial_security()}; + false -> + case ?ORB_FLAG_TEST(Flags, ?ORB_ENV_ENABLE_NAT) of + false -> + #giop_env{interceptors = Interceptors, + flags = Flags, host = orber:host(), + iiop_port = orber:iiop_port(), + iiop_ssl_port = orber:iiop_ssl_port(), + domain = orber:domain(), + partial_security = orber:partial_security()}; + true -> + #giop_env{interceptors = Interceptors, + flags = Flags, + host = + orber_tb:keysearch(nat_ip_address, Options, + orber_env:nat_host()), + iiop_port = + orber_tb:keysearch(nat_iiop_port, Options, + orber_env:nat_iiop_port()), + iiop_ssl_port = + orber_tb:keysearch(nat_iiop_ssl_port, Options, + orber_env:nat_iiop_ssl_port()), + domain = orber:domain(), + partial_security = orber:partial_security()} + end + end, + Timeout = orber_tb:keysearch(iiop_in_connection_timeout, Options, + orber_env:iiop_in_connection_timeout()), + MaxFrags = orber_tb:keysearch(iiop_max_fragments, Options, + orber_env:iiop_max_fragments()), + MaxRequests = orber_tb:keysearch(iiop_max_in_requests, Options, + orber_env:iiop_max_in_requests()), + {ok, #state{stype = Type, + socket = Socket, + db = ets:new(orber_incoming_requests, [set]), + timeout = Timeout, + max_fragments = MaxFrags, + max_requests = MaxRequests, + giop_env = Env, peer = PeerData}, Timeout} + end. + + +%%----------------------------------------------------------------- +%% Func: terminate/2 +%%----------------------------------------------------------------- +%% We may want to kill all proxies before terminating, but the best +%% option should be to let the requests complete (especially for one-way +%% functions it's a better alternative. +terminate(_Reason, #state{db = IncRequests, giop_env = Env}) -> + ets:delete(IncRequests), + case Env#giop_env.interceptors of + false -> + ok; + {native, Ref, PIs} -> + orber_pi:closed_in_connection(PIs, Ref); + {_Type, _PIs} -> + ok + end. + +%%----------------------------------------------------------------- +%% Func: handle_call/3 +%%----------------------------------------------------------------- +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; +handle_call(_, _, State) -> + {noreply, State, State#state.timeout}. + +%%----------------------------------------------------------------- +%% Func: handle_cast/2 +%%----------------------------------------------------------------- +handle_cast({post_accept, Type, Socket}, State) -> + Timeout = orber_env:iiop_ssl_accept_timeout(), + case catch orber_socket:post_accept(Type, Socket, Timeout) of + ok -> + {noreply, State}; + _Failed -> + orber_socket:close(Type, Socket), + {stop, normal, State} + end; +handle_cast(stop, State) -> + {stop, normal, State}; +handle_cast(_, State) -> + {noreply, State, State#state.timeout}. + +%%----------------------------------------------------------------- +%% Func: handle_info/2 +%%----------------------------------------------------------------- +%% Normal invocation +handle_info({tcp, Socket, Bytes}, State) -> + handle_msg(normal, Socket, Bytes, State); +handle_info({ssl, Socket, Bytes}, State) -> + handle_msg(ssl, Socket, Bytes, State); +%% Errors, closed connection +handle_info({tcp_closed, _Socket}, State) -> + {stop, normal, State}; +handle_info({tcp_error, _Socket, _Reason}, State) -> + {stop, normal, State}; +handle_info({ssl_closed, _Socket}, State) -> + {stop, normal, State}; +handle_info({ssl_error, _Socket, _Reason}, State) -> + {stop, normal, State}; +%% Servant termination. +handle_info({'EXIT', Pid, normal}, State) -> + ets:delete(State#state.db, Pid), + {noreply, decrease_counter(State), State#state.timeout}; +handle_info({message_error, _Pid, ReqId}, State) -> + ets:delete(State#state.db, ReqId), + {noreply, State, State#state.timeout}; +handle_info(timeout, State) -> + case ets:info(State#state.db, size) of + 0 -> + %% No pending requests, close the connection. + {stop, normal, State}; + _Amount -> + %% Still pending request, cannot close the connection. + {noreply, State, State#state.timeout} + end; +handle_info({reconfigure, Options}, State) -> + {noreply, update_state(State, Options), State#state.timeout}; +handle_info(_X,State) -> + {noreply, State, State#state.timeout}. + +handle_msg(Type, Socket, Bytes, #state{stype = Type, socket = Socket, + giop_env = Env} = State) -> + case catch cdr_decode:dec_giop_message_header(Bytes) of + %% Only when using IIOP-1.2 may the client send this message. + %% Introduced in CORBA-2.6 + #giop_message{message_type = ?GIOP_MSG_CLOSE_CONNECTION, + giop_version = {1,2}} -> + {stop, normal, State}; + #giop_message{message_type = ?GIOP_MSG_CLOSE_CONNECTION} -> + {noreply, State, State#state.timeout}; + #giop_message{message_type = ?GIOP_MSG_CANCEL_REQUEST} = GIOPHdr -> + ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order, + GIOPHdr#giop_message.message), + case ets:lookup(State#state.db, ReqId) of + [{RId, PPid}] -> + ets:delete(State#state.db, RId), + PPid ! {self(), cancel_request_header}; + [] -> + send_msg_error(Type, Socket, Bytes, + Env#giop_env{version = + GIOPHdr#giop_message.giop_version}, + "No such request id") + end, + {noreply, State, State#state.timeout}; + %% A fragment; we must have received a Request or LocateRequest + %% with fragment-flag set to true. + %% We need to decode the header to get the request-id. + #giop_message{message_type = ?GIOP_MSG_FRAGMENT, + giop_version = {1,2}} = GIOPHdr -> + ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order, + GIOPHdr#giop_message.message), + case ets:lookup(State#state.db, ReqId) of + [{_RId, PPid}] when GIOPHdr#giop_message.fragments == true -> + PPid ! {self(), GIOPHdr}; + [{RId, PPid}] -> + ets:delete(State#state.db, RId), + PPid ! {self(), GIOPHdr}; + [] -> + send_msg_error(Type, Socket, Bytes, + Env#giop_env{version = + GIOPHdr#giop_message.giop_version}, + "No such fragment id") + end, + {noreply, State, State#state.timeout}; + %% Must be a Request or LocateRequest which have been fragmented. + %% We need to decode the header to get the request-id. + #giop_message{fragments = true, + giop_version = {1,2}} = GIOPHdr -> + ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order, + GIOPHdr#giop_message.message), + Pid = + orber_iiop_inrequest: + start_fragment_collector(GIOPHdr, Bytes, + Type, Socket, + ReqId, self(), + State#state.max_fragments, + Env#giop_env{version = {1,2}, + request_id = ReqId}), + ets:insert(State#state.db, {Pid, ReqId}), + ets:insert(State#state.db, {ReqId, Pid}), + {noreply, increase_counter(State), State#state.timeout}; + GIOPHdr when is_record(GIOPHdr, giop_message) -> + Pid = orber_iiop_inrequest:start(GIOPHdr, Bytes, Type, Socket, + Env#giop_env{version = + GIOPHdr#giop_message.giop_version}), + ets:insert(State#state.db, {Pid, undefined}), + {noreply, increase_counter(State), State#state.timeout}; + {'EXIT', message_error} -> + send_msg_error(Type, Socket, Bytes, + Env#giop_env{version = orber_env:giop_version()}, + "Unable to decode the GIOP-header"), + {noreply, State, State#state.timeout} + end; +handle_msg(Type, _, Bytes, State) -> + orber:dbg("[~p] orber_iiop_inproxy:handle_msg(~p);~n" + "Received a message from a socket of a different type.~n" + "Should be ~p but was ~p.", + [?LINE, Bytes, State#state.stype, Type], ?DEBUG_LEVEL), + {noreply, State, State#state.timeout}. + +send_msg_error(Type, Socket, Data, Env, Msg) -> + orber:dbg("[~p] orber_iiop_inproxy:handle_msg(~p); ~p.", + [?LINE, Data, Msg], ?DEBUG_LEVEL), + Reply = cdr_encode:enc_message_error(Env), + orber_socket:write(Type, Socket, Reply). + +increase_counter(#state{max_requests = infinity} = State) -> + State; +increase_counter(#state{max_requests = Max, + request_counter = Counter} = State) when Max > Counter -> + orber_socket:setopts(State#state.stype, State#state.socket, [{active, once}]), + State#state{request_counter = Counter + 1}; +increase_counter(State) -> + State#state{request_counter = State#state.request_counter + 1}. + +decrease_counter(#state{max_requests = infinity} = State) -> + State; +decrease_counter(#state{max_requests = Max, + request_counter = Counter} = State) when Max =< Counter -> + orber_socket:setopts(State#state.stype, State#state.socket, [{active, once}]), + State#state{request_counter = Counter - 1}; +decrease_counter(State) -> + State#state{request_counter = State#state.request_counter - 1}. + +update_state(#state{giop_env = Env} = State, + [{interceptors, false}|Options]) -> + update_state(State#state{giop_env = + Env#giop_env{interceptors = false}}, Options); +update_state(#state{giop_env = #giop_env{interceptors = false, host = [SH], + iiop_port = SP} = Env, + peer = {PH, PP}, stype = normal} = State, + [{interceptors, {native, LPIs}}|Options]) -> + %% No Interceptor(s). Add the same Ref used by the built in interceptors. + update_state(State#state{giop_env = + Env#giop_env{interceptors = + {native, {PH, PP, SH, SP}, LPIs}}}, + Options); +update_state(#state{giop_env = #giop_env{interceptors = false, host = [SH], + iiop_ssl_port = SP} = Env, + peer = {PH, PP}, stype = ssl} = State, + [{interceptors, {native, LPIs}}|Options]) -> + %% No Interceptor(s). Add the same Ref used by the built in interceptors. + update_state(State#state{giop_env = + Env#giop_env{interceptors = + {native, {PH, PP, SH, SP}, LPIs}}}, + Options); +update_state(#state{giop_env = #giop_env{interceptors = {native, Ref, _}} = Env} = + State, + [{interceptors, {native, LPIs}}|Options]) -> + %% Interceptor(s) already in use. We must use the same Ref as before. + update_state(State#state{giop_env = + Env#giop_env{interceptors = {native, Ref, LPIs}}}, + Options); +update_state(State, [H|T]) -> + orber:dbg("[~p] orber_iiop_inproxy:update_state(~p, ~p)~n" + "Couldn't change the state.", + [?LINE, H, State], ?DEBUG_LEVEL), + update_state(State, T); +update_state(State, []) -> + State. + +%%----------------------------------------------------------------- +%% Func: code_change/3 +%%----------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + -- cgit v1.2.3