diff options
author | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
---|---|---|
committer | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
commit | 84adefa331c4159d432d22840663c38f155cd4c1 (patch) | |
tree | bff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/orber/src/orber_iiop_outproxy.erl | |
download | otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2 otp-84adefa331c4159d432d22840663c38f155cd4c1.zip |
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/orber/src/orber_iiop_outproxy.erl')
-rw-r--r-- | lib/orber/src/orber_iiop_outproxy.erl | 530 |
1 files changed, 530 insertions, 0 deletions
diff --git a/lib/orber/src/orber_iiop_outproxy.erl b/lib/orber/src/orber_iiop_outproxy.erl new file mode 100644 index 0000000000..879af8222d --- /dev/null +++ b/lib/orber/src/orber_iiop_outproxy.erl @@ -0,0 +1,530 @@ +%%-------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1999-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%----------------------------------------------------------------- +%% File: orber_iiop_outproxy.erl +%% +%% Description: +%% This file contains the IIOP "proxy" for outgoing connections +%% +%% +%%----------------------------------------------------------------- +-module(orber_iiop_outproxy). + +-behaviour(gen_server). + +-include_lib("orber/src/orber_iiop.hrl"). +-include_lib("orber/include/corba.hrl"). + +%%----------------------------------------------------------------- +%% External exports +%%----------------------------------------------------------------- +-export([start/0, start/1, request/5, cancel/2, cancel/3]). + +%%----------------------------------------------------------------- +%% Internal exports +%%----------------------------------------------------------------- +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + code_change/3, terminate/2, stop/2, stop/1, checkheaders/1]). + +%%----------------------------------------------------------------- +%% Macros/Defines +%%----------------------------------------------------------------- +-define(DEBUG_LEVEL, 7). + +-record(state, {stype, socket, db, timeout, client_timeout, host, port, parent, + error_reason = {'EXCEPTION', #'COMM_FAILURE' + {completion_status=?COMPLETED_MAYBE}}}). + +%%----------------------------------------------------------------- +%% External interface functions +%%----------------------------------------------------------------- +start() -> + ignore. + +start(Opts) -> + gen_server:start_link(orber_iiop_outproxy, Opts, []). + +request(Pid, true, Timeout, Msg, RequestId) -> + %% Why not simply use gen_server:call? We must be able to receive + %% more than one reply (i.e. fragmented messages). + MRef = erlang:monitor(process, Pid), + gen_server:cast(Pid, {request, Timeout, Msg, RequestId, self(), MRef}), + receive + {MRef, Reply} -> + erlang:demonitor(MRef), + receive + {'DOWN', MRef, _, _, _} -> + Reply + after 0 -> + Reply + end; + {'DOWN', MRef, _, Pid, _Reason} when is_pid(Pid) -> + receive + %% Clear EXIT message from queue + {'EXIT', _Pid, _What} -> + corba:raise(#'COMM_FAILURE'{completion_status=?COMPLETED_MAYBE}) + after 0 -> + corba:raise(#'COMM_FAILURE'{completion_status=?COMPLETED_MAYBE}) + end; + {fragmented, GIOPHdr, Bytes, RequestId, MRef} -> + collect_fragments(GIOPHdr, [], Bytes, Pid, RequestId, MRef) + end; +request(Pid, _, _, Msg, _RequestId) -> + %% No response expected + gen_server:cast(Pid, {oneway_request, Msg}). + +cancel(Pid, RequestId) -> + gen_server:cast(Pid, {cancel, RequestId}). + +cancel(Pid, RequestId, MRef) -> + gen_server:cast(Pid, {cancel, RequestId, MRef, self()}). + +%%----------------------------------------------------------------- +%% Internal interface functions +%%----------------------------------------------------------------- +%%----------------------------------------------------------------- +%% Func: stop/2 +%%----------------------------------------------------------------- +stop(Pid, Timeout) -> + gen_server:call(Pid, stop, Timeout). +stop(Pid) -> + gen_server:cast(Pid, stop). + + +%%----------------------------------------------------------------- +%% Server functions +%%----------------------------------------------------------------- +%%----------------------------------------------------------------- +%% Func: init/1 +%%----------------------------------------------------------------- +init({connect, Host, Port, SocketType, SocketOptions, Parent, Key, NewKey}) -> + process_flag(trap_exit, true), + case catch orber_socket:connect(SocketType, Host, Port, SocketOptions) of + {'EXCEPTION', _E} -> + ignore; + %% We used to reply the below but since this would generate a CRASH REPORT + %% if '-boot start_sasl' used. Due to a request to change this behaviour + %% we did. + %% {stop, {'EXCEPTION', E}}; + Socket -> + SockData = orber_socket:sockdata(SocketType, Socket), + orber_iiop_pm:add_connection(Key, NewKey, SockData), + Timeout = orber:iiop_connection_timeout(), + {ok, #state{stype = SocketType, socket = Socket, + db = ets:new(orber_outgoing_requests, [set]), + timeout = Timeout, client_timeout = orber:iiop_timeout(), + host = Host, port = Port, parent = Parent}, Timeout} + end. + +%%----------------------------------------------------------------- +%% Func: terminate/2 +%%----------------------------------------------------------------- +terminate(_Reason, #state{db = OutRequests, error_reason = ER}) -> + %% Kill all proxies and delete table before terminating + notify_clients(OutRequests, ets:first(OutRequests), ER), + ets:delete(OutRequests), + ok. + +notify_clients(_, '$end_of_table', _ER) -> + ok; +notify_clients(OutRequests, Key, ER) -> + case ets:lookup(OutRequests, Key) of + [{_, Pid, TRef, MRef}] -> + cancel_timer(TRef), + Pid ! {MRef, ER}, + notify_clients(OutRequests, ets:next(OutRequests, Key), ER) + end. + +%%----------------------------------------------------------------- +%% Func: handle_call/3 +%%----------------------------------------------------------------- +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; +handle_call(X, From, State) -> + orber:dbg("[~p] orber_iiop_outproxy:handle_call(~p);~n" + "Un-recognized call from ~p", [?LINE, X, From], ?DEBUG_LEVEL), + {noreply, State, State#state.timeout}. + +%%----------------------------------------------------------------- +%% Func: handle_cast/2 +%%----------------------------------------------------------------- +handle_cast({request, Timeout, Msg, RequestId, From, MRef}, + #state{client_timeout = DefaultTimeout} = State) -> + orber_socket:write(State#state.stype, State#state.socket, Msg), + true = ets:insert(State#state.db, {RequestId, From, + start_timer(Timeout, DefaultTimeout, RequestId), + MRef}), + {noreply, State, State#state.timeout}; +handle_cast({oneway_request, Msg}, State) -> + orber_socket:write(State#state.stype, State#state.socket, Msg), + {noreply, State, State#state.timeout}; +handle_cast({cancel, ReqId}, State) -> + case ets:lookup(State#state.db, ReqId) of + [{ReqId, _From, TRef, _MRef}] -> + cancel_timer(TRef), + ets:delete(State#state.db, ReqId), + orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p);~n" + "Request cancelled", [?LINE, State], ?DEBUG_LEVEL), + {noreply, State, State#state.timeout}; + _ -> + {noreply, State, State#state.timeout} + end; +handle_cast({cancel, ReqId, MRef, From}, State) -> + case ets:lookup(State#state.db, ReqId) of + [{ReqId, From, TRef, MRef}] -> + cancel_timer(TRef), + ets:delete(State#state.db, ReqId), + From ! {MRef, ReqId, cancelled}, + orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p); +Request cancelled", [?LINE, State], ?DEBUG_LEVEL), + {noreply, State, State#state.timeout}; + _ -> + From ! {MRef, ReqId, cancelled}, + {noreply, State, State#state.timeout} + end; +handle_cast(stop, State) -> + {stop, normal, State}; +handle_cast(X, State) -> + orber:dbg("[~p] orber_iiop_outproxy:handle_cast(~p); +Un-recognized cast.", [?LINE, X], ?DEBUG_LEVEL), + {noreply, State, State#state.timeout}. + +%%----------------------------------------------------------------- +%% Func: handle_info/2 +%%----------------------------------------------------------------- +handle_info({tcp, _Socket, Bytes}, State) -> + handle_reply(Bytes, State); +handle_info({ssl, _Socket, Bytes}, State) -> + handle_reply(Bytes, State); +handle_info({tcp_closed, _Socket}, State) -> + {stop, normal, State}; +handle_info({ssl_closed, _Socket}, State) -> + {stop, normal, State}; +handle_info({tcp_error, Socket, Reason}, #state{socket = Socket, host = Host, + port = Port} = State) -> + orber:error("[~p] IIOP proxy received the TCP error message: ~p~n" + "The server-side ORB is located at '~p:~p'~n" + "See the gen_tcp/inet documentation for more information.", + [?LINE, Reason, Host, Port], ?DEBUG_LEVEL), + {stop, normal, State}; +handle_info({ssl_error, Socket, Reason}, #state{socket = Socket, host = Host, + port = Port} = State) -> + orber:error("[~p] IIOP proxy received the SSL error message: ~p~n" + "The server-side ORB is located at '~p:~p'~n" + "See the SSL-application documentation for more information.", + [?LINE, Reason, Host, Port], ?DEBUG_LEVEL), + {stop, normal, State}; +handle_info({timeout, _TRef, ReqId}, State) -> + case ets:lookup(State#state.db, ReqId) of + [{ReqId, Pid, _, MRef}] -> + ets:delete(State#state.db, ReqId), + Pid ! {MRef, {'EXCEPTION', #'TIMEOUT'{completion_status=?COMPLETED_MAYBE}}}, + orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p, ~p);~n" + "Request timed out", + [?LINE, State#state.host, State#state.port], ?DEBUG_LEVEL), + {noreply, State, State#state.timeout}; + _ -> + {noreply, State, State#state.timeout} + end; +handle_info(stop, State) -> + {stop, normal, State}; +handle_info(timeout, State) -> + case ets:info(State#state.db, size) of + 0 -> + orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p, ~p);~n" + "Outgoing connection timed out after ~p msec", + [?LINE, State#state.host, State#state.port, + State#state.timeout], ?DEBUG_LEVEL), + {stop, normal, State}; + _Amount -> + %% Still pending request, cannot close the connection. + {noreply, State, State#state.timeout} + end; +handle_info({'EXIT', Parent, Reason}, #state{parent = Parent} = State) -> + orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p);~nParent terminated.", + [?LINE, Reason], ?DEBUG_LEVEL), + {stop, normal, State}; +handle_info({reconfigure, _Options}, State) -> + %% Currently there are no parameters that can be changed. + {noreply, State, State#state.timeout}; +handle_info(X, State) -> + orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p);~nUn-recognized info.", + [?LINE, X], ?DEBUG_LEVEL), + {noreply, State, State#state.timeout}. + + +handle_reply(Bytes, State) -> + %% Check IIOP headers and fetch request id + case catch checkheaders(cdr_decode:dec_giop_message_header(Bytes)) of + {'reply', ReplyHeader, Rest, Len, ByteOrder} -> + case ets:lookup(State#state.db, ReplyHeader#reply_header.request_id) of + [{_, Pid, TRef, MRef}] -> + %% Send reply to the correct request process + cancel_timer(TRef), + Pid ! {MRef, {reply, ReplyHeader, Rest, Len, ByteOrder, Bytes}}, + ets:delete(State#state.db, ReplyHeader#reply_header.request_id), + {noreply, State, State#state.timeout}; + _ -> + {noreply, State, State#state.timeout} + end; + {'locate_reply', LocateReplyHeader, LocateRest, LocateLen, LocateByteOrder} -> + case ets:lookup(State#state.db, + LocateReplyHeader#locate_reply_header.request_id) of + [{_, Pid, TRef, MRef}] -> + %% Send reply to the correct request process + cancel_timer(TRef), + Pid ! {MRef, {locate_reply, LocateReplyHeader, + LocateRest, LocateLen, LocateByteOrder}}, + ets:delete(State#state.db, + LocateReplyHeader#locate_reply_header.request_id), + {noreply, State, State#state.timeout}; + _ -> + {noreply, State, State#state.timeout} + end; + {fragment, GIOPHdr, ReqId, false} -> + %% Last fragment, cancel timer and remove from DB. + case ets:lookup(State#state.db, ReqId) of + [{_, Pid, TRef, MRef}] -> + cancel_timer(TRef), + Pid ! {fragment, GIOPHdr, ReqId, MRef}, + ets:delete(State#state.db, ReqId), + {noreply, State, State#state.timeout}; + _ -> + %% Probably cancelled + {noreply, State, State#state.timeout} + end; + {fragment, GIOPHdr, ReqId, _} -> + %% More fragments expected + case ets:lookup(State#state.db, ReqId) of + [{_, Pid, _, MRef}] -> + Pid ! {fragment, GIOPHdr, ReqId, MRef}, + {noreply, State, State#state.timeout}; + _ -> + %% Probably cancelled + {noreply, State, State#state.timeout} + end; + {fragmented, GIOPHdr, ReqId} -> + %% This the initial message (i.e. a LocateReply or Reply). + case ets:lookup(State#state.db, ReqId) of + [{_, Pid, _TRef, MRef}] -> + Pid ! {fragmented, GIOPHdr, Bytes, ReqId, MRef}, + {noreply, State, State#state.timeout}; + _ -> + {noreply, State, State#state.timeout} + end; + {'EXCEPTION', DecodeException} -> + orber:dbg("[~p] orber_iiop_outproxy:handle_reply(~p); decode exception(~p).", + [?LINE, Bytes, DecodeException], ?DEBUG_LEVEL), + {noreply, State, State#state.timeout}; + {'EXIT', message_error} -> + orber:dbg("[~p] orber_iiop_outproxy:handle_reply(~p); message error.", + [?LINE, Bytes], ?DEBUG_LEVEL), + ME = cdr_encode:enc_message_error(#giop_env{version = + orber:giop_version()}), + orber_socket:write(State#state.stype, State#state.socket, ME), + {noreply, State, State#state.timeout}; + {'EXIT', R} -> + orber:dbg("[~p] orber_iiop_outproxy:handle_reply(~p); got exit(~p)", + [?LINE, Bytes, R], ?DEBUG_LEVEL), + {noreply, State, State#state.timeout}; + close_connection -> + orber:dbg("[~p] orber_iiop_outproxy:handle_reply(); +The Server-side ORB closed the connection.", [?LINE], ?DEBUG_LEVEL), + {stop, normal, State}; + {error, no_reply} -> + {noreply, State, State#state.timeout}; + X -> + orber:dbg("[~p] orber_iiop_outproxy:handle_reply(~p); message error(~p).", + [?LINE, Bytes, X], ?DEBUG_LEVEL), + {noreply, State, State#state.timeout} + end. + + +%%----------------------------------------------------------------- +%% Func: code_change/3 +%%----------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%----------------------------------------------------------------- +%% Internal functions +%%----------------------------------------------------------------- +checkheaders(#giop_message{message_type = ?GIOP_MSG_CLOSE_CONNECTION}) -> + close_connection; +checkheaders(#giop_message{message_type = ?GIOP_MSG_FRAGMENT, + giop_version = {1,2}, + fragments = MoreFrag} = GIOPHdr) -> + %% A fragment; we must have received a Request or LocateRequest + %% with fragment-flag set to true. + %% We need to decode the header to get the request-id. + ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order, + GIOPHdr#giop_message.message), + {fragment, GIOPHdr, ReqId, MoreFrag}; +checkheaders(#giop_message{fragments = true, + giop_version = {1,2}} = GIOPHdr) -> + %% Must be a Reply or LocateReply which have been fragmented. + %% We need to decode the header to get the request-id. + ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order, + GIOPHdr#giop_message.message), + {fragmented, GIOPHdr, ReqId}; +checkheaders(#giop_message{fragments = false, + message_type = ?GIOP_MSG_REPLY} = GIOPHdr) -> + {ReplyHeader, Rest, Len} = + cdr_decode:dec_reply_header(GIOPHdr#giop_message.giop_version, + GIOPHdr#giop_message.message, + ?GIOP_HEADER_SIZE, + GIOPHdr#giop_message.byte_order), + {'reply', ReplyHeader, Rest, Len, GIOPHdr#giop_message.byte_order}; +checkheaders(#giop_message{fragments = false, + message_type = ?GIOP_MSG_LOCATE_REPLY} = GIOPHdr) -> + {LocateReplyHeader, Rest, Len} = + cdr_decode:dec_locate_reply_header(GIOPHdr#giop_message.giop_version, + GIOPHdr#giop_message.message, + ?GIOP_HEADER_SIZE, + GIOPHdr#giop_message.byte_order), + {'locate_reply', LocateReplyHeader, Rest, Len, GIOPHdr#giop_message.byte_order}; +checkheaders(What) -> + orber:dbg("[~p] orber_iiop_outproxy:checkheaders(~p) +Un-recognized GIOP header.", [?LINE, What], ?DEBUG_LEVEL), + {error, no_reply}. + + +cancel_timer(infinity) -> + ok; +cancel_timer(TRef) -> + erlang:cancel_timer(TRef). + +start_timer(infinity, infinity, _) -> + infinity; +start_timer(infinity, Timeout, RequestId) -> + erlang:start_timer(Timeout, self(), RequestId); +start_timer(Timeout, _, RequestId) -> + erlang:start_timer(Timeout, self(), RequestId). + + + +collect_fragments(GIOPHdr1, InBuffer, Bytes, Proxy, RequestId, MRef) -> + receive + %% There are more framents to come; just collect this message and wait for + %% the rest. + {fragment, #giop_message{byte_order = _ByteOrder, + message = Message, + fragments = true} = GIOPHdr2, RequestId, MRef} -> + case catch cdr_decode:dec_message_header(null, GIOPHdr2, Message) of + {_, #fragment_header{}, FragBody, _, _} -> + collect_fragments(GIOPHdr1, [FragBody|InBuffer], + Bytes, Proxy, RequestId, MRef); + Other -> + cancel(Proxy, RequestId, MRef), + clear_queue(Proxy, RequestId, MRef), + orber:dbg("[~p] orber_iiop:collect_fragments(~p)", + [?LINE, Other], ?DEBUG_LEVEL), + corba:raise(#'MARSHAL'{minor=(?ORBER_VMCID bor 18), + completion_status=?COMPLETED_YES}) + end; + %% This is the last fragment. Now we can but together the fragments, decode + %% the reply and send it to the client. + {fragment, #giop_message{byte_order = ByteOrder, + message = Message} = GIOPHdr2, RequestId, MRef} -> + erlang:demonitor(MRef), + receive + {'DOWN', MRef, _, _, _} -> + ok + after 0 -> + ok + end, + case catch cdr_decode:dec_message_header(null, GIOPHdr2, Message) of + {_, #fragment_header{}, FragBody, _, _} -> + %% This buffer is all the fragments concatenated. + Buffer = lists:reverse([FragBody|InBuffer]), + + %% Create a GIOP-message which is exactly as if hadn't been fragmented. + NewGIOP = GIOPHdr1#giop_message + {message = list_to_binary([GIOPHdr1#giop_message.message|Buffer]), + fragments = false}, + case checkheaders(NewGIOP) of + {'reply', ReplyHeader, Rest, Len, ByteOrder} -> + %% We must keep create a copy of all bytes, as if the + %% message wasn't fragmented, to be able handle TypeCode + %% indirection. + {'reply', ReplyHeader, Rest, Len, ByteOrder, + list_to_binary([Bytes|Buffer])}; + {'locate_reply', ReplyHdr, Rest, Len, ByteOrder} -> + {'locate_reply', ReplyHdr, Rest, Len, ByteOrder}; + Error -> + orber:dbg("[~p] orber_iiop:collect_fragments(~p, ~p); +Unable to decode Reply or LocateReply header",[?LINE, NewGIOP, Error], ?DEBUG_LEVEL), + corba:raise(#'MARSHAL'{minor=(?ORBER_VMCID bor 18), + completion_status=?COMPLETED_YES}) + end; + Other -> + orber:dbg("[~p] orber_iiop:collect_fragments(~p);", + [?LINE, Other], ?DEBUG_LEVEL), + corba:raise(#'MARSHAL'{minor=(?ORBER_VMCID bor 18), + completion_status=?COMPLETED_YES}) + end; + {MRef, {'EXCEPTION', E}} -> + orber:dbg("[~p] orber_iiop:collect_fragments(~p);", + [?LINE, E], ?DEBUG_LEVEL), + erlang:demonitor(MRef), + receive + {'DOWN', MRef, _, _, _} -> + corba:raise(E) + after 0 -> + corba:raise(E) + end; + {'DOWN', MRef, _, Proxy, Reason} when is_pid(Proxy) -> + orber:dbg("[~p] orber_iiop:collect_fragments(~p);~n" + "Monitor generated a DOWN message.", + [?LINE, Reason], ?DEBUG_LEVEL), + receive + %% Clear EXIT message from queue + {'EXIT', _Proxy, _What} -> + corba:raise(#'COMM_FAILURE'{completion_status=?COMPLETED_MAYBE}) + after 0 -> + corba:raise(#'COMM_FAILURE'{completion_status=?COMPLETED_MAYBE}) + end + end. + +clear_queue(Proxy, RequestId, MRef) -> + receive + {fragment, _, RequestId, MRef} -> + clear_queue(Proxy, RequestId, MRef); + {MRef, RequestId, cancelled} -> + %% This is the last message that the proxy will send + %% after we've cancelled the request. + erlang:demonitor(MRef), + receive + {'DOWN', MRef, _, _, _} -> + ok + after 0 -> + ok + end; + {'DOWN', MRef, _, Proxy, _Reason} -> + %% The proxy terminated. Clear EXIT message from queue + receive + {'EXIT', Proxy, _What} -> + ok + after 0 -> + ok + end + end. + |