%%--------------------------------------------------------------------
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1999-2009. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
%%-----------------------------------------------------------------
%% File: orber_iiop_outproxy.erl
%%
%% Description:
%%    This file contains the IIOP "proxy" for outgoing connections
%%
%%
%%-----------------------------------------------------------------
-module(orber_iiop_outproxy).

-behaviour(gen_server).

-include_lib("orber/src/orber_iiop.hrl").
-include_lib("orber/include/corba.hrl").

%%-----------------------------------------------------------------
%% External exports
%%-----------------------------------------------------------------
-export([start/0, start/1, request/5, cancel/2, cancel/3]).

%%-----------------------------------------------------------------
%% Internal exports
%%-----------------------------------------------------------------
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
         code_change/3, terminate/2, stop/2, stop/1, checkheaders/1]).

%%-----------------------------------------------------------------
%% Macros/Defines
%%-----------------------------------------------------------------
%% Passed as the last argument of every orber:dbg/3 and orber:error/3
%% call in this module.
-define(DEBUG_LEVEL, 7).

%% Server state of one outgoing connection.
%%   stype          - socket type given to init/1; first argument of
%%                    every orber_socket call.
%%   socket         - the value returned by orber_socket:connect/4.
%%   db             - ETS set of pending requests, one
%%                    {RequestId, ClientPid, TimerRef, MonitorRef} each.
%%   timeout        - orber:iiop_connection_timeout(); returned as the
%%                    gen_server timeout from every callback.
%%   client_timeout - orber:iiop_timeout(); the per-request timeout used
%%                    when the caller passes 'infinity'.
%%   host, port     - peer address given to init/1; used in log output.
%%   parent         - pid whose 'EXIT' message stops the proxy.
%%   error_reason   - term sent to every pending client by terminate/2.
-record(state, {stype, socket, db, timeout, client_timeout, host, port, parent,
                error_reason = {'EXCEPTION', #'COMM_FAILURE'
                                {completion_status=?COMPLETED_MAYBE}}}).
%%-----------------------------------------------------------------
%% External interface functions
%%-----------------------------------------------------------------
%%-----------------------------------------------------------------
%% Func: start/0
%% Returns: ignore (a proxy cannot be started without options).
%%-----------------------------------------------------------------
start() ->
    ignore.

%%-----------------------------------------------------------------
%% Func: start/1
%% Args: Opts - the term handed to init/1, i.e.
%%       {connect, Host, Port, SocketType, SocketOptions,
%%        Parent, Key, NewKey}
%% Returns: the result of gen_server:start_link/3; the proxy is
%%          linked to the calling process.
%%-----------------------------------------------------------------
start(Opts) ->
    gen_server:start_link(orber_iiop_outproxy, Opts, []).

%%-----------------------------------------------------------------
%% Func: request/5
%% Args: Pid       - the proxy process
%%       Response  - 'true' if a reply is expected; any other value
%%                   sends Msg as a oneway request
%%       Timeout   - request timeout handed to the proxy
%%       Msg       - the encoded message to write to the socket
%%       RequestId - id used by the proxy to match the reply
%% Returns: When a reply is expected: whatever the proxy delivers
%%          tagged with the monitor reference (a reply/locate_reply
%%          tuple or an {'EXCEPTION', E} tuple), or the result of
%%          collect_fragments/6 for fragmented replies.
%%          Otherwise the result of gen_server:cast/2.
%% Raises: COMM_FAILURE (via corba:raise/1) if the proxy terminates
%%         before a reply has been delivered.
%%-----------------------------------------------------------------
request(Pid, true, Timeout, Msg, RequestId) ->
    %% Why not simply use gen_server:call? We must be able to receive
    %% more than one reply (i.e. fragmented messages).
    MRef = erlang:monitor(process, Pid),
    gen_server:cast(Pid, {request, Timeout, Msg, RequestId, self(), MRef}),
    receive
        {MRef, Reply} ->
            %% 'flush' also removes a 'DOWN' message that may already
            %% have been delivered for this monitor.
            erlang:demonitor(MRef, [flush]),
            Reply;
        {'DOWN', MRef, _, Pid, _Reason} when is_pid(Pid) ->
            %% Clear the proxy's EXIT message (if any) from the queue.
            %% Only the proxy's own EXIT is removed so that EXIT
            %% messages from unrelated linked processes are preserved.
            receive
                {'EXIT', Pid, _What} ->
                    ok
            after 0 ->
                    ok
            end,
            corba:raise(#'COMM_FAILURE'{completion_status=?COMPLETED_MAYBE});
        {fragmented, GIOPHdr, Bytes, RequestId, MRef} ->
            collect_fragments(GIOPHdr, [], Bytes, Pid, RequestId, MRef)
    end;
request(Pid, _, _, Msg, _RequestId) ->
    %% No response expected
    gen_server:cast(Pid, {oneway_request, Msg}).

%%-----------------------------------------------------------------
%% Func: cancel/2
%% Description: Asks the proxy to forget RequestId. Nothing is sent
%%              back to the caller.
%%-----------------------------------------------------------------
cancel(Pid, RequestId) ->
    gen_server:cast(Pid, {cancel, RequestId}).

%%-----------------------------------------------------------------
%% Func: cancel/3
%% Description: Asks the proxy to forget RequestId. The proxy answers
%%              the calling process with {MRef, RequestId, cancelled}.
%%-----------------------------------------------------------------
cancel(Pid, RequestId, MRef) ->
    gen_server:cast(Pid, {cancel, RequestId, MRef, self()}).

%%-----------------------------------------------------------------
%% Internal interface functions
%%-----------------------------------------------------------------
%%-----------------------------------------------------------------
%% Func: stop/2
%% Description: Synchronous stop; waits at most Timeout for the
%%              proxy to acknowledge.
%%-----------------------------------------------------------------
stop(Pid, Timeout) ->
    gen_server:call(Pid, stop, Timeout).

%%-----------------------------------------------------------------
%% Func: stop/1
%% Description: Asynchronous stop.
%%-----------------------------------------------------------------
stop(Pid) ->
    gen_server:cast(Pid, stop).
%%-----------------------------------------------------------------
%% Server functions
%%-----------------------------------------------------------------
%%-----------------------------------------------------------------
%% Func: init/1
%% Description: Connects to Host:Port, registers the connection with
%%              orber_iiop_pm and creates the table of pending
%%              requests.
%% Returns: {ok, State, Timeout} | ignore
%%-----------------------------------------------------------------
init({connect, Host, Port, SocketType, SocketOptions, Parent, Key, NewKey}) ->
    process_flag(trap_exit, true),
    case catch orber_socket:connect(SocketType, Host, Port, SocketOptions) of
        {'EXCEPTION', _E} ->
            %% 'ignore' rather than {stop, Reason}: stopping here would
            %% produce a CRASH REPORT when running with
            %% '-boot start_sasl'.
            ignore;
        Socket ->
            SockData = orber_socket:sockdata(SocketType, Socket),
            orber_iiop_pm:add_connection(Key, NewKey, SockData),
            ConnTimeout = orber:iiop_connection_timeout(),
            Pending = ets:new(orber_outgoing_requests, [set]),
            ClientTimeout = orber:iiop_timeout(),
            State = #state{stype = SocketType,
                           socket = Socket,
                           db = Pending,
                           timeout = ConnTimeout,
                           client_timeout = ClientTimeout,
                           host = Host,
                           port = Port,
                           parent = Parent},
            {ok, State, ConnTimeout}
    end.

%%-----------------------------------------------------------------
%% Func: terminate/2
%% Description: Sends the stored error reason to every client with a
%%              pending request, then deletes the request table.
%%-----------------------------------------------------------------
terminate(_Reason, #state{db = Pending, error_reason = ErrorReason}) ->
    notify_clients(Pending, ets:first(Pending), ErrorReason),
    ets:delete(Pending),
    ok.

%% Walks the request table key by key; for each entry the request
%% timer is cancelled and the client is sent {MRef, ErrorReason}.
notify_clients(_Pending, '$end_of_table', _ErrorReason) ->
    ok;
notify_clients(Pending, ReqId, ErrorReason) ->
    [{_, Client, TRef, MRef}] = ets:lookup(Pending, ReqId),
    cancel_timer(TRef),
    Client ! {MRef, ErrorReason},
    notify_clients(Pending, ets:next(Pending, ReqId), ErrorReason).
%%-----------------------------------------------------------------
%% Func: handle_call/3
%% Description: Only 'stop' is answered. Any other call is logged
%%              and left without a reply.
%%-----------------------------------------------------------------
handle_call(stop, _From, State) ->
    {stop, normal, ok, State};
handle_call(X, From, State) ->
    orber:dbg("[~p] orber_iiop_outproxy:handle_call(~p);~n"
              "Un-recognized call from ~p", [?LINE, X, From], ?DEBUG_LEVEL),
    {noreply, State, State#state.timeout}.

%%-----------------------------------------------------------------
%% Func: handle_cast/2
%% Description:
%%   {request, ...}        - write the message and remember the
%%                           request as {RequestId, From, TRef, MRef}.
%%   {oneway_request, Msg} - write the message; nothing is stored.
%%   {cancel, ReqId}       - forget the request silently.
%%   {cancel, ReqId, MRef, From}
%%                         - forget the request and always answer
%%                           From with {MRef, ReqId, cancelled}.
%%   stop                  - terminate normally.
%%-----------------------------------------------------------------
handle_cast({request, Timeout, Msg, RequestId, From, MRef},
            #state{client_timeout = DefaultTimeout} = State) ->
    orber_socket:write(State#state.stype, State#state.socket, Msg),
    true = ets:insert(State#state.db,
                      {RequestId, From,
                       start_timer(Timeout, DefaultTimeout, RequestId),
                       MRef}),
    {noreply, State, State#state.timeout};
handle_cast({oneway_request, Msg}, State) ->
    orber_socket:write(State#state.stype, State#state.socket, Msg),
    {noreply, State, State#state.timeout};
handle_cast({cancel, ReqId}, State) ->
    case ets:lookup(State#state.db, ReqId) of
        [{ReqId, _From, TRef, _MRef}] ->
            cancel_timer(TRef),
            ets:delete(State#state.db, ReqId),
            orber:dbg("[~p] orber_iiop_outproxy:handle_cast(~p);~n"
                      "Request cancelled", [?LINE, State], ?DEBUG_LEVEL),
            {noreply, State, State#state.timeout};
        _ ->
            {noreply, State, State#state.timeout}
    end;
handle_cast({cancel, ReqId, MRef, From}, State) ->
    case ets:lookup(State#state.db, ReqId) of
        [{ReqId, From, TRef, MRef}] ->
            cancel_timer(TRef),
            ets:delete(State#state.db, ReqId),
            From ! {MRef, ReqId, cancelled},
            orber:dbg("[~p] orber_iiop_outproxy:handle_cast(~p); Request cancelled",
                      [?LINE, State], ?DEBUG_LEVEL),
            {noreply, State, State#state.timeout};
        _ ->
            %% Unknown (e.g. already completed) request; the client
            %% still waits for the acknowledgement.
            From ! {MRef, ReqId, cancelled},
            {noreply, State, State#state.timeout}
    end;
handle_cast(stop, State) ->
    {stop, normal, State};
handle_cast(X, State) ->
    orber:dbg("[~p] orber_iiop_outproxy:handle_cast(~p); Un-recognized cast.",
              [?LINE, X], ?DEBUG_LEVEL),
    {noreply, State, State#state.timeout}.

%%-----------------------------------------------------------------
%% Func: handle_info/2
%% Description: Handles socket data/close/error messages, request
%%              timers ({timeout, TRef, ReqId}), the gen_server idle
%%              timeout ('timeout'), 'stop', the parent's EXIT and
%%              reconfigure requests.
%%-----------------------------------------------------------------
handle_info({tcp, _Socket, Bytes}, State) ->
    handle_reply(Bytes, State);
handle_info({ssl, _Socket, Bytes}, State) ->
    handle_reply(Bytes, State);
handle_info({tcp_closed, _Socket}, State) ->
    {stop, normal, State};
handle_info({ssl_closed, _Socket}, State) ->
    {stop, normal, State};
handle_info({tcp_error, Socket, Reason},
            #state{socket = Socket, host = Host, port = Port} = State) ->
    orber:error("[~p] IIOP proxy received the TCP error message: ~p~n"
                "The server-side ORB is located at '~p:~p'~n"
                "See the gen_tcp/inet documentation for more information.",
                [?LINE, Reason, Host, Port], ?DEBUG_LEVEL),
    {stop, normal, State};
handle_info({ssl_error, Socket, Reason},
            #state{socket = Socket, host = Host, port = Port} = State) ->
    orber:error("[~p] IIOP proxy received the SSL error message: ~p~n"
                "The server-side ORB is located at '~p:~p'~n"
                "See the SSL-application documentation for more information.",
                [?LINE, Reason, Host, Port], ?DEBUG_LEVEL),
    {stop, normal, State};
handle_info({timeout, _TRef, ReqId}, State) ->
    %% A request timer fired. The request may already have been
    %% completed or cancelled, in which case there is nothing to do.
    case ets:lookup(State#state.db, ReqId) of
        [{ReqId, Pid, _, MRef}] ->
            ets:delete(State#state.db, ReqId),
            Pid ! {MRef, {'EXCEPTION', #'TIMEOUT'{completion_status=?COMPLETED_MAYBE}}},
            orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p, ~p);~n"
                      "Request timed out",
                      [?LINE, State#state.host, State#state.port], ?DEBUG_LEVEL),
            {noreply, State, State#state.timeout};
        _ ->
            {noreply, State, State#state.timeout}
    end;
handle_info(stop, State) ->
    {stop, normal, State};
handle_info(timeout, State) ->
    %% Idle timeout; close the connection only if nothing is pending.
    case ets:info(State#state.db, size) of
        0 ->
            orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p, ~p);~n"
                      "Outgoing connection timed out after ~p msec",
                      [?LINE, State#state.host, State#state.port,
                       State#state.timeout], ?DEBUG_LEVEL),
            {stop, normal, State};
        _Amount ->
            %% Still pending request, cannot close the connection.
            {noreply, State, State#state.timeout}
    end;
handle_info({'EXIT', Parent, Reason}, #state{parent = Parent} = State) ->
    orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p);~nParent terminated.",
              [?LINE, Reason], ?DEBUG_LEVEL),
    {stop, normal, State};
handle_info({reconfigure, _Options}, State) ->
    %% Currently there are no parameters that can be changed.
    {noreply, State, State#state.timeout};
handle_info(X, State) ->
    orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p);~nUn-recognized info.",
              [?LINE, X], ?DEBUG_LEVEL),
    {noreply, State, State#state.timeout}.

%% Decodes the GIOP header of an incoming message and forwards the
%% message to the client process that owns the request id. Messages
%% for unknown request ids are dropped silently.
handle_reply(Bytes, State) ->
    %% Check IIOP headers and fetch request id
    case catch checkheaders(cdr_decode:dec_giop_message_header(Bytes)) of
        {'reply', ReplyHeader, Rest, Len, ByteOrder} ->
            case ets:lookup(State#state.db, ReplyHeader#reply_header.request_id) of
                [{_, Pid, TRef, MRef}] ->
                    %% Send reply to the correct request process
                    cancel_timer(TRef),
                    Pid ! {MRef, {reply, ReplyHeader, Rest, Len, ByteOrder, Bytes}},
                    ets:delete(State#state.db, ReplyHeader#reply_header.request_id),
                    {noreply, State, State#state.timeout};
                _ ->
                    {noreply, State, State#state.timeout}
            end;
        {'locate_reply', LocateReplyHeader, LocateRest, LocateLen, LocateByteOrder} ->
            case ets:lookup(State#state.db,
                            LocateReplyHeader#locate_reply_header.request_id) of
                [{_, Pid, TRef, MRef}] ->
                    %% Send reply to the correct request process
                    cancel_timer(TRef),
                    Pid ! {MRef, {locate_reply, LocateReplyHeader,
                                  LocateRest, LocateLen, LocateByteOrder}},
                    ets:delete(State#state.db,
                               LocateReplyHeader#locate_reply_header.request_id),
                    {noreply, State, State#state.timeout};
                _ ->
                    {noreply, State, State#state.timeout}
            end;
        {fragment, GIOPHdr, ReqId, false} ->
            %% Last fragment, cancel timer and remove from DB.
            case ets:lookup(State#state.db, ReqId) of
                [{_, Pid, TRef, MRef}] ->
                    cancel_timer(TRef),
                    Pid ! {fragment, GIOPHdr, ReqId, MRef},
                    ets:delete(State#state.db, ReqId),
                    {noreply, State, State#state.timeout};
                _ ->
                    %% Probably cancelled
                    {noreply, State, State#state.timeout}
            end;
        {fragment, GIOPHdr, ReqId, _} ->
            %% More fragments expected
            case ets:lookup(State#state.db, ReqId) of
                [{_, Pid, _, MRef}] ->
                    Pid ! {fragment, GIOPHdr, ReqId, MRef},
                    {noreply, State, State#state.timeout};
                _ ->
                    %% Probably cancelled
                    {noreply, State, State#state.timeout}
            end;
        {fragmented, GIOPHdr, ReqId} ->
            %% This is the initial message (i.e. a LocateReply or Reply).
            case ets:lookup(State#state.db, ReqId) of
                [{_, Pid, _TRef, MRef}] ->
                    Pid ! {fragmented, GIOPHdr, Bytes, ReqId, MRef},
                    {noreply, State, State#state.timeout};
                _ ->
                    {noreply, State, State#state.timeout}
            end;
        {'EXCEPTION', DecodeException} ->
            orber:dbg("[~p] orber_iiop_outproxy:handle_reply(~p); decode exception(~p).",
                      [?LINE, Bytes, DecodeException], ?DEBUG_LEVEL),
            {noreply, State, State#state.timeout};
        {'EXIT', message_error} ->
            orber:dbg("[~p] orber_iiop_outproxy:handle_reply(~p); message error.",
                      [?LINE, Bytes], ?DEBUG_LEVEL),
            %% Tell the peer that its message could not be understood.
            ME = cdr_encode:enc_message_error(#giop_env{version = orber:giop_version()}),
            orber_socket:write(State#state.stype, State#state.socket, ME),
            {noreply, State, State#state.timeout};
        {'EXIT', R} ->
            orber:dbg("[~p] orber_iiop_outproxy:handle_reply(~p); got exit(~p)",
                      [?LINE, Bytes, R], ?DEBUG_LEVEL),
            {noreply, State, State#state.timeout};
        close_connection ->
            orber:dbg("[~p] orber_iiop_outproxy:handle_reply(); The Server-side ORB closed the connection.",
                      [?LINE], ?DEBUG_LEVEL),
            {stop, normal, State};
        {error, no_reply} ->
            {noreply, State, State#state.timeout};
        X ->
            orber:dbg("[~p] orber_iiop_outproxy:handle_reply(~p); message error(~p).",
                      [?LINE, Bytes, X], ?DEBUG_LEVEL),
            {noreply, State, State#state.timeout}
    end.

%%-----------------------------------------------------------------
%% Func: code_change/3
%%-----------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%-----------------------------------------------------------------
%% Internal functions
%%-----------------------------------------------------------------
%%-----------------------------------------------------------------
%% Func: checkheaders/1
%% Args: a #giop_message{} record
%% Returns: close_connection |
%%          {fragment, GIOPHdr, ReqId, MoreFragments} |
%%          {fragmented, GIOPHdr, ReqId} |
%%          {reply, ReplyHeader, Rest, Len, ByteOrder} |
%%          {locate_reply, LocateReplyHeader, Rest, Len, ByteOrder} |
%%          {error, no_reply}
%%-----------------------------------------------------------------
checkheaders(#giop_message{message_type = ?GIOP_MSG_CLOSE_CONNECTION}) ->
    close_connection;
checkheaders(#giop_message{message_type = ?GIOP_MSG_FRAGMENT,
                           giop_version = {1,2},
                           fragments = MoreFrag} = GIOPHdr) ->
    %% A fragment; we must have received a Request or LocateRequest
    %% with fragment-flag set to true.
    %% We need to decode the header to get the request-id.
    ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order,
                                       GIOPHdr#giop_message.message),
    {fragment, GIOPHdr, ReqId, MoreFrag};
checkheaders(#giop_message{fragments = true,
                           giop_version = {1,2}} = GIOPHdr) ->
    %% Must be a Reply or LocateReply which have been fragmented.
    %% We need to decode the header to get the request-id.
    ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order,
                                       GIOPHdr#giop_message.message),
    {fragmented, GIOPHdr, ReqId};
checkheaders(#giop_message{fragments = false,
                           message_type = ?GIOP_MSG_REPLY} = GIOPHdr) ->
    {ReplyHeader, Rest, Len} =
        cdr_decode:dec_reply_header(GIOPHdr#giop_message.giop_version,
                                    GIOPHdr#giop_message.message,
                                    ?GIOP_HEADER_SIZE,
                                    GIOPHdr#giop_message.byte_order),
    {'reply', ReplyHeader, Rest, Len, GIOPHdr#giop_message.byte_order};
checkheaders(#giop_message{fragments = false,
                           message_type = ?GIOP_MSG_LOCATE_REPLY} = GIOPHdr) ->
    {LocateReplyHeader, Rest, Len} =
        cdr_decode:dec_locate_reply_header(GIOPHdr#giop_message.giop_version,
                                           GIOPHdr#giop_message.message,
                                           ?GIOP_HEADER_SIZE,
                                           GIOPHdr#giop_message.byte_order),
    {'locate_reply', LocateReplyHeader, Rest, Len, GIOPHdr#giop_message.byte_order};
checkheaders(What) ->
    orber:dbg("[~p] orber_iiop_outproxy:checkheaders(~p) Un-recognized GIOP header.",
              [?LINE, What], ?DEBUG_LEVEL),
    {error, no_reply}.

%% Cancels a request timer. 'infinity' is what start_timer/3 returns
%% when no timer was started.
cancel_timer(infinity) ->
    ok;
cancel_timer(TRef) ->
    erlang:cancel_timer(TRef).

%% Starts the timer for a request. The first argument is the timeout
%% given by the client and the second the default; the default is used
%% when the client passed 'infinity'. The timer delivers
%% {timeout, TRef, RequestId} to this process.
start_timer(infinity, infinity, _) ->
    infinity;
start_timer(infinity, Timeout, RequestId) ->
    erlang:start_timer(Timeout, self(), RequestId);
start_timer(Timeout, _, RequestId) ->
    erlang:start_timer(Timeout, self(), RequestId).

%% Runs in the client process. Collects the fragments of a fragmented
%% Reply/LocateReply and, once the last one has arrived, returns the
%% same tuple request/5 would have returned for an unfragmented message.
%%   GIOPHdr1 - header of the initial (fragmented) message
%%   InBuffer - fragment bodies received so far, newest first
%%   Bytes    - the raw bytes of the initial message
%% Raises MARSHAL if a fragment or the assembled header cannot be
%% decoded, COMM_FAILURE if the proxy terminates, and any exception
%% the proxy delivers for the request.
collect_fragments(GIOPHdr1, InBuffer, Bytes, Proxy, RequestId, MRef) ->
    receive
        %% There are more fragments to come; just collect this message and
        %% wait for the rest.
        {fragment, #giop_message{message = Message,
                                 fragments = true} = GIOPHdr2,
         RequestId, MRef} ->
            case catch cdr_decode:dec_message_header(null, GIOPHdr2, Message) of
                {_, #fragment_header{}, FragBody, _, _} ->
                    collect_fragments(GIOPHdr1, [FragBody|InBuffer],
                                      Bytes, Proxy, RequestId, MRef);
                Other ->
                    %% Give up on the request and drain whatever the
                    %% proxy has sent, or will send, for it.
                    cancel(Proxy, RequestId, MRef),
                    clear_queue(Proxy, RequestId, MRef),
                    orber:dbg("[~p] orber_iiop_outproxy:collect_fragments(~p)",
                              [?LINE, Other], ?DEBUG_LEVEL),
                    corba:raise(#'MARSHAL'{minor=(?ORBER_VMCID bor 18),
                                           completion_status=?COMPLETED_YES})
            end;
        %% This is the last fragment. Now we can put together the fragments,
        %% decode the reply and send it to the client.
        {fragment, #giop_message{byte_order = ByteOrder,
                                 message = Message} = GIOPHdr2,
         RequestId, MRef} ->
            erlang:demonitor(MRef, [flush]),
            case catch cdr_decode:dec_message_header(null, GIOPHdr2, Message) of
                {_, #fragment_header{}, FragBody, _, _} ->
                    %% This buffer is all the fragments concatenated.
                    Buffer = lists:reverse([FragBody|InBuffer]),
                    %% Create a GIOP-message which is exactly as if it
                    %% hadn't been fragmented.
                    NewGIOP = GIOPHdr1#giop_message
                                {message = list_to_binary([GIOPHdr1#giop_message.message|Buffer]),
                                 fragments = false},
                    %% ByteOrder is already bound, so the clauses below
                    %% also require the byte orders to agree.
                    case checkheaders(NewGIOP) of
                        {'reply', ReplyHeader, Rest, Len, ByteOrder} ->
                            %% We must create a copy of all bytes, as if the
                            %% message wasn't fragmented, to be able to handle
                            %% TypeCode indirection.
                            {'reply', ReplyHeader, Rest, Len, ByteOrder,
                             list_to_binary([Bytes|Buffer])};
                        {'locate_reply', ReplyHdr, Rest, Len, ByteOrder} ->
                            {'locate_reply', ReplyHdr, Rest, Len, ByteOrder};
                        Error ->
                            orber:dbg("[~p] orber_iiop_outproxy:collect_fragments(~p, ~p); Unable to decode Reply or LocateReply header",
                                      [?LINE, NewGIOP, Error], ?DEBUG_LEVEL),
                            corba:raise(#'MARSHAL'{minor=(?ORBER_VMCID bor 18),
                                                   completion_status=?COMPLETED_YES})
                    end;
                Other ->
                    orber:dbg("[~p] orber_iiop_outproxy:collect_fragments(~p);",
                              [?LINE, Other], ?DEBUG_LEVEL),
                    corba:raise(#'MARSHAL'{minor=(?ORBER_VMCID bor 18),
                                           completion_status=?COMPLETED_YES})
            end;
        {MRef, {'EXCEPTION', E}} ->
            orber:dbg("[~p] orber_iiop_outproxy:collect_fragments(~p);",
                      [?LINE, E], ?DEBUG_LEVEL),
            erlang:demonitor(MRef, [flush]),
            corba:raise(E);
        {'DOWN', MRef, _, Proxy, Reason} when is_pid(Proxy) ->
            orber:dbg("[~p] orber_iiop_outproxy:collect_fragments(~p);~n"
                      "Monitor generated a DOWN message.",
                      [?LINE, Reason], ?DEBUG_LEVEL),
            %% Clear the proxy's EXIT message (if any) from the queue;
            %% EXIT messages from other processes are left untouched.
            receive
                {'EXIT', Proxy, _What} ->
                    ok
            after 0 ->
                    ok
            end,
            corba:raise(#'COMM_FAILURE'{completion_status=?COMPLETED_MAYBE})
    end.

%% Runs in the client process after cancel/3. Removes everything the
%% proxy has sent for the request and returns once the proxy has
%% acknowledged the cancellation or has terminated.
clear_queue(Proxy, RequestId, MRef) ->
    receive
        {fragment, _, RequestId, MRef} ->
            clear_queue(Proxy, RequestId, MRef);
        {MRef, {'EXCEPTION', _}} ->
            %% The request timed out, or the proxy is terminating, at the
            %% same time as we cancelled it. Drop the message and keep
            %% waiting: the proxy answers every cancel/3, and if it
            %% terminates instead we get a 'DOWN' message.
            clear_queue(Proxy, RequestId, MRef);
        {MRef, RequestId, cancelled} ->
            %% This is the last message that the proxy will send
            %% after we've cancelled the request.
            erlang:demonitor(MRef, [flush]),
            ok;
        {'DOWN', MRef, _, Proxy, _Reason} ->
            %% The proxy terminated. Clear EXIT message from queue
            receive
                {'EXIT', Proxy, _What} ->
                    ok
            after 0 ->
                    ok
            end
    end.