aboutsummaryrefslogtreecommitdiffstats
path: root/lib/orber/src/orber_iiop_inproxy.erl
diff options
context:
space:
mode:
authorErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
committerErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
commit84adefa331c4159d432d22840663c38f155cd4c1 (patch)
treebff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/orber/src/orber_iiop_inproxy.erl
downloadotp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz
otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2
otp-84adefa331c4159d432d22840663c38f155cd4c1.zip
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/orber/src/orber_iiop_inproxy.erl')
-rw-r--r--lib/orber/src/orber_iiop_inproxy.erl398
1 files changed, 398 insertions, 0 deletions
diff --git a/lib/orber/src/orber_iiop_inproxy.erl b/lib/orber/src/orber_iiop_inproxy.erl
new file mode 100644
index 0000000000..ede1e0749f
--- /dev/null
+++ b/lib/orber/src/orber_iiop_inproxy.erl
@@ -0,0 +1,398 @@
+%%--------------------------------------------------------------------
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 1997-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+%%
+%%-----------------------------------------------------------------
+%% File: orber_iiop_inproxy.erl
+%%
+%% Description:
+%% This file contains the IIOP "proxy" for incomming connections
+%%
+%%-----------------------------------------------------------------
+-module(orber_iiop_inproxy).
+
+-behaviour(gen_server).
+
+-include_lib("orber/src/orber_iiop.hrl").
+-include_lib("orber/include/corba.hrl").
+
+%%-----------------------------------------------------------------
+%% External exports
+%%-----------------------------------------------------------------
+-export([start/0, start/1]).
+
+%%-----------------------------------------------------------------
+%% Internal exports
+%%-----------------------------------------------------------------
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ code_change/3, terminate/2, post_accept/3, stop/1]).
+
+%%-----------------------------------------------------------------
+%% Macros
+%%-----------------------------------------------------------------
+-define(DEBUG_LEVEL, 7).
+
+-record(state, {stype, socket, db, timeout, max_fragments,
+ max_requests, request_counter = 1, giop_env, peer}).
+
+%%-----------------------------------------------------------------
+%% External interface functions
+%%-----------------------------------------------------------------
+%%-----------------------------------------------------------------
+%% Func: start/0
+%%-----------------------------------------------------------------
+start() ->
+ ignore.
+
+%%-----------------------------------------------------------------
+%% Func: start/1
+%%-----------------------------------------------------------------
+start(Opts) ->
+ gen_server:start_link(orber_iiop_inproxy, Opts, []).
+
+post_accept(Pid, ssl, Socket) ->
+ (catch gen_server:cast(Pid, {post_accept, ssl, Socket})),
+ ok;
+post_accept(_, _, _) ->
+ ok.
+
+%%-----------------------------------------------------------------
+%% Internal interface functions
+%%-----------------------------------------------------------------
+%%-----------------------------------------------------------------
+%% Func: stop/1
+%%-----------------------------------------------------------------
+stop(Pid) ->
+ gen_server:cast(Pid, stop).
+
+%%-----------------------------------------------------------------
+%% Server functions
+%%-----------------------------------------------------------------
+%%-----------------------------------------------------------------
+%% Func: init/1
+%%-----------------------------------------------------------------
+init({connect, Type, Socket, Ref, Options}) ->
+ process_flag(trap_exit, true),
+ Flags = orber_tb:keysearch(flags, Options, orber_env:get_flags()),
+ {Address, Port} = PeerData = orber_socket:peerdata(Type, Socket),
+ {LAddress, LPort} = LocalData = orber_socket:sockdata(Type, Socket),
+ case {?ORB_FLAG_TEST(Flags, ?ORB_ENV_LOCAL_INTERFACE), LPort} of
+ {true, 0} ->
+ orber_tb:info("Unable to lookup the local address and port number.~n"
+ "Closing the incoming connection.", []),
+ ignore;
+ _ ->
+ orber_iiop_net:add_connection(Socket, Type, PeerData, LocalData, Ref),
+ Interceptors =
+ case orber_tb:keysearch(interceptors, Options,
+ orber_env:get_interceptors()) of
+ {native, PIs} ->
+ {native, orber_pi:new_in_connection(PIs, Address, Port,
+ LAddress, LPort), PIs};
+ Other ->
+ Other
+ end,
+ Env =
+ case ?ORB_FLAG_TEST(Flags, ?ORB_ENV_LOCAL_INTERFACE) of
+ true when Type == ssl ->
+ #giop_env{interceptors = Interceptors,
+ flags = Flags, host = [LAddress],
+ iiop_port =
+ orber_tb:keysearch(iiop_port, Options,
+ orber_env:iiop_port()),
+ iiop_ssl_port = LPort,
+ domain = orber:domain(),
+ partial_security = orber:partial_security()};
+ true ->
+ #giop_env{interceptors = Interceptors,
+ flags = Flags, host = [LAddress],
+ iiop_port = LPort,
+ iiop_ssl_port =
+ orber_tb:keysearch(iiop_ssl_port, Options,
+ orber_env:iiop_ssl_port()),
+ domain = orber:domain(),
+ partial_security = orber:partial_security()};
+ false ->
+ case ?ORB_FLAG_TEST(Flags, ?ORB_ENV_ENABLE_NAT) of
+ false ->
+ #giop_env{interceptors = Interceptors,
+ flags = Flags, host = orber:host(),
+ iiop_port = orber:iiop_port(),
+ iiop_ssl_port = orber:iiop_ssl_port(),
+ domain = orber:domain(),
+ partial_security = orber:partial_security()};
+ true ->
+ #giop_env{interceptors = Interceptors,
+ flags = Flags,
+ host =
+ orber_tb:keysearch(nat_ip_address, Options,
+ orber_env:nat_host()),
+ iiop_port =
+ orber_tb:keysearch(nat_iiop_port, Options,
+ orber_env:nat_iiop_port()),
+ iiop_ssl_port =
+ orber_tb:keysearch(nat_iiop_ssl_port, Options,
+ orber_env:nat_iiop_ssl_port()),
+ domain = orber:domain(),
+ partial_security = orber:partial_security()}
+ end
+ end,
+ Timeout = orber_tb:keysearch(iiop_in_connection_timeout, Options,
+ orber_env:iiop_in_connection_timeout()),
+ MaxFrags = orber_tb:keysearch(iiop_max_fragments, Options,
+ orber_env:iiop_max_fragments()),
+ MaxRequests = orber_tb:keysearch(iiop_max_in_requests, Options,
+ orber_env:iiop_max_in_requests()),
+ {ok, #state{stype = Type,
+ socket = Socket,
+ db = ets:new(orber_incoming_requests, [set]),
+ timeout = Timeout,
+ max_fragments = MaxFrags,
+ max_requests = MaxRequests,
+ giop_env = Env, peer = PeerData}, Timeout}
+ end.
+
+
+%%-----------------------------------------------------------------
+%% Func: terminate/2
+%%-----------------------------------------------------------------
+%% We may want to kill all proxies before terminating, but the best
+%% option should be to let the requests complete (especially for one-way
+%% functions it's a better alternative.
+terminate(_Reason, #state{db = IncRequests, giop_env = Env}) ->
+ ets:delete(IncRequests),
+ case Env#giop_env.interceptors of
+ false ->
+ ok;
+ {native, Ref, PIs} ->
+ orber_pi:closed_in_connection(PIs, Ref);
+ {_Type, _PIs} ->
+ ok
+ end.
+
+%%-----------------------------------------------------------------
+%% Func: handle_call/3
+%%-----------------------------------------------------------------
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State};
+handle_call(_, _, State) ->
+ {noreply, State, State#state.timeout}.
+
+%%-----------------------------------------------------------------
+%% Func: handle_cast/2
+%%-----------------------------------------------------------------
+handle_cast({post_accept, Type, Socket}, State) ->
+ Timeout = orber_env:iiop_ssl_accept_timeout(),
+ case catch orber_socket:post_accept(Type, Socket, Timeout) of
+ ok ->
+ {noreply, State};
+ _Failed ->
+ orber_socket:close(Type, Socket),
+ {stop, normal, State}
+ end;
+handle_cast(stop, State) ->
+ {stop, normal, State};
+handle_cast(_, State) ->
+ {noreply, State, State#state.timeout}.
+
+%%-----------------------------------------------------------------
+%% Func: handle_info/2
+%%-----------------------------------------------------------------
+%% Normal invocation
+handle_info({tcp, Socket, Bytes}, State) ->
+ handle_msg(normal, Socket, Bytes, State);
+handle_info({ssl, Socket, Bytes}, State) ->
+ handle_msg(ssl, Socket, Bytes, State);
+%% Errors, closed connection
+handle_info({tcp_closed, _Socket}, State) ->
+ {stop, normal, State};
+handle_info({tcp_error, _Socket, _Reason}, State) ->
+ {stop, normal, State};
+handle_info({ssl_closed, _Socket}, State) ->
+ {stop, normal, State};
+handle_info({ssl_error, _Socket, _Reason}, State) ->
+ {stop, normal, State};
+%% Servant termination.
+handle_info({'EXIT', Pid, normal}, State) ->
+ ets:delete(State#state.db, Pid),
+ {noreply, decrease_counter(State), State#state.timeout};
+handle_info({message_error, _Pid, ReqId}, State) ->
+ ets:delete(State#state.db, ReqId),
+ {noreply, State, State#state.timeout};
+handle_info(timeout, State) ->
+ case ets:info(State#state.db, size) of
+ 0 ->
+ %% No pending requests, close the connection.
+ {stop, normal, State};
+ _Amount ->
+ %% Still pending request, cannot close the connection.
+ {noreply, State, State#state.timeout}
+ end;
+handle_info({reconfigure, Options}, State) ->
+ {noreply, update_state(State, Options), State#state.timeout};
+handle_info(_X,State) ->
+ {noreply, State, State#state.timeout}.
+
+handle_msg(Type, Socket, Bytes, #state{stype = Type, socket = Socket,
+ giop_env = Env} = State) ->
+ case catch cdr_decode:dec_giop_message_header(Bytes) of
+ %% Only when using IIOP-1.2 may the client send this message.
+ %% Introduced in CORBA-2.6
+ #giop_message{message_type = ?GIOP_MSG_CLOSE_CONNECTION,
+ giop_version = {1,2}} ->
+ {stop, normal, State};
+ #giop_message{message_type = ?GIOP_MSG_CLOSE_CONNECTION} ->
+ {noreply, State, State#state.timeout};
+ #giop_message{message_type = ?GIOP_MSG_CANCEL_REQUEST} = GIOPHdr ->
+ ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order,
+ GIOPHdr#giop_message.message),
+ case ets:lookup(State#state.db, ReqId) of
+ [{RId, PPid}] ->
+ ets:delete(State#state.db, RId),
+ PPid ! {self(), cancel_request_header};
+ [] ->
+ send_msg_error(Type, Socket, Bytes,
+ Env#giop_env{version =
+ GIOPHdr#giop_message.giop_version},
+ "No such request id")
+ end,
+ {noreply, State, State#state.timeout};
+ %% A fragment; we must have received a Request or LocateRequest
+ %% with fragment-flag set to true.
+ %% We need to decode the header to get the request-id.
+ #giop_message{message_type = ?GIOP_MSG_FRAGMENT,
+ giop_version = {1,2}} = GIOPHdr ->
+ ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order,
+ GIOPHdr#giop_message.message),
+ case ets:lookup(State#state.db, ReqId) of
+ [{_RId, PPid}] when GIOPHdr#giop_message.fragments == true ->
+ PPid ! {self(), GIOPHdr};
+ [{RId, PPid}] ->
+ ets:delete(State#state.db, RId),
+ PPid ! {self(), GIOPHdr};
+ [] ->
+ send_msg_error(Type, Socket, Bytes,
+ Env#giop_env{version =
+ GIOPHdr#giop_message.giop_version},
+ "No such fragment id")
+ end,
+ {noreply, State, State#state.timeout};
+ %% Must be a Request or LocateRequest which have been fragmented.
+ %% We need to decode the header to get the request-id.
+ #giop_message{fragments = true,
+ giop_version = {1,2}} = GIOPHdr ->
+ ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order,
+ GIOPHdr#giop_message.message),
+ Pid =
+ orber_iiop_inrequest:
+ start_fragment_collector(GIOPHdr, Bytes,
+ Type, Socket,
+ ReqId, self(),
+ State#state.max_fragments,
+ Env#giop_env{version = {1,2},
+ request_id = ReqId}),
+ ets:insert(State#state.db, {Pid, ReqId}),
+ ets:insert(State#state.db, {ReqId, Pid}),
+ {noreply, increase_counter(State), State#state.timeout};
+ GIOPHdr when is_record(GIOPHdr, giop_message) ->
+ Pid = orber_iiop_inrequest:start(GIOPHdr, Bytes, Type, Socket,
+ Env#giop_env{version =
+ GIOPHdr#giop_message.giop_version}),
+ ets:insert(State#state.db, {Pid, undefined}),
+ {noreply, increase_counter(State), State#state.timeout};
+ {'EXIT', message_error} ->
+ send_msg_error(Type, Socket, Bytes,
+ Env#giop_env{version = orber_env:giop_version()},
+ "Unable to decode the GIOP-header"),
+ {noreply, State, State#state.timeout}
+ end;
+handle_msg(Type, _, Bytes, State) ->
+ orber:dbg("[~p] orber_iiop_inproxy:handle_msg(~p);~n"
+ "Received a message from a socket of a different type.~n"
+ "Should be ~p but was ~p.",
+ [?LINE, Bytes, State#state.stype, Type], ?DEBUG_LEVEL),
+ {noreply, State, State#state.timeout}.
+
+send_msg_error(Type, Socket, Data, Env, Msg) ->
+ orber:dbg("[~p] orber_iiop_inproxy:handle_msg(~p); ~p.",
+ [?LINE, Data, Msg], ?DEBUG_LEVEL),
+ Reply = cdr_encode:enc_message_error(Env),
+ orber_socket:write(Type, Socket, Reply).
+
+increase_counter(#state{max_requests = infinity} = State) ->
+ State;
+increase_counter(#state{max_requests = Max,
+ request_counter = Counter} = State) when Max > Counter ->
+ orber_socket:setopts(State#state.stype, State#state.socket, [{active, once}]),
+ State#state{request_counter = Counter + 1};
+increase_counter(State) ->
+ State#state{request_counter = State#state.request_counter + 1}.
+
+decrease_counter(#state{max_requests = infinity} = State) ->
+ State;
+decrease_counter(#state{max_requests = Max,
+ request_counter = Counter} = State) when Max =< Counter ->
+ orber_socket:setopts(State#state.stype, State#state.socket, [{active, once}]),
+ State#state{request_counter = Counter - 1};
+decrease_counter(State) ->
+ State#state{request_counter = State#state.request_counter - 1}.
+
+update_state(#state{giop_env = Env} = State,
+ [{interceptors, false}|Options]) ->
+ update_state(State#state{giop_env =
+ Env#giop_env{interceptors = false}}, Options);
+update_state(#state{giop_env = #giop_env{interceptors = false, host = [SH],
+ iiop_port = SP} = Env,
+ peer = {PH, PP}, stype = normal} = State,
+ [{interceptors, {native, LPIs}}|Options]) ->
+ %% No Interceptor(s). Add the same Ref used by the built in interceptors.
+ update_state(State#state{giop_env =
+ Env#giop_env{interceptors =
+ {native, {PH, PP, SH, SP}, LPIs}}},
+ Options);
+update_state(#state{giop_env = #giop_env{interceptors = false, host = [SH],
+ iiop_ssl_port = SP} = Env,
+ peer = {PH, PP}, stype = ssl} = State,
+ [{interceptors, {native, LPIs}}|Options]) ->
+ %% No Interceptor(s). Add the same Ref used by the built in interceptors.
+ update_state(State#state{giop_env =
+ Env#giop_env{interceptors =
+ {native, {PH, PP, SH, SP}, LPIs}}},
+ Options);
+update_state(#state{giop_env = #giop_env{interceptors = {native, Ref, _}} = Env} =
+ State,
+ [{interceptors, {native, LPIs}}|Options]) ->
+ %% Interceptor(s) already in use. We must use the same Ref as before.
+ update_state(State#state{giop_env =
+ Env#giop_env{interceptors = {native, Ref, LPIs}}},
+ Options);
+update_state(State, [H|T]) ->
+ orber:dbg("[~p] orber_iiop_inproxy:update_state(~p, ~p)~n"
+ "Couldn't change the state.",
+ [?LINE, H, State], ?DEBUG_LEVEL),
+ update_state(State, T);
+update_state(State, []) ->
+ State.
+
+%%-----------------------------------------------------------------
+%% Func: code_change/3
+%%-----------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+