aboutsummaryrefslogtreecommitdiffstats
path: root/lib/orber/src/orber_iiop_outproxy.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/orber/src/orber_iiop_outproxy.erl')
-rw-r--r--lib/orber/src/orber_iiop_outproxy.erl511
1 files changed, 0 insertions, 511 deletions
diff --git a/lib/orber/src/orber_iiop_outproxy.erl b/lib/orber/src/orber_iiop_outproxy.erl
deleted file mode 100644
index 1406a1ad56..0000000000
--- a/lib/orber/src/orber_iiop_outproxy.erl
+++ /dev/null
@@ -1,511 +0,0 @@
-%%--------------------------------------------------------------------
-%%
-%% %CopyrightBegin%
-%%
-%% Copyright Ericsson AB 1999-2016. All Rights Reserved.
-%%
-%% Licensed under the Apache License, Version 2.0 (the "License");
-%% you may not use this file except in compliance with the License.
-%% You may obtain a copy of the License at
-%%
-%% http://www.apache.org/licenses/LICENSE-2.0
-%%
-%% Unless required by applicable law or agreed to in writing, software
-%% distributed under the License is distributed on an "AS IS" BASIS,
-%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-%% See the License for the specific language governing permissions and
-%% limitations under the License.
-%%
-%% %CopyrightEnd%
-%%
-%%
-%%-----------------------------------------------------------------
-%% File: orber_iiop_outproxy.erl
-%%
-%% Description:
-%% This file contains the IIOP "proxy" for outgoing connections
-%%
-%%
-%%-----------------------------------------------------------------
--module(orber_iiop_outproxy).
-
--behaviour(gen_server).
-
--include_lib("orber/src/orber_iiop.hrl").
--include_lib("orber/include/corba.hrl").
-
-%%-----------------------------------------------------------------
-%% External exports
-%%-----------------------------------------------------------------
--export([start/0, start/1, request/5, cancel/2, cancel/3]).
-
-%%-----------------------------------------------------------------
-%% Internal exports
-%%-----------------------------------------------------------------
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- code_change/3, terminate/2, stop/2, stop/1, checkheaders/1]).
-
-%%-----------------------------------------------------------------
-%% Macros/Defines
-%%-----------------------------------------------------------------
--define(DEBUG_LEVEL, 7).
-
--record(state, {stype, socket, db, timeout, client_timeout, host, port, parent,
- error_reason = {'EXCEPTION', #'COMM_FAILURE'
- {completion_status=?COMPLETED_MAYBE}}}).
-
-%%-----------------------------------------------------------------
-%% External interface functions
-%%-----------------------------------------------------------------
-start() ->
- ignore.
-
-start(Opts) ->
- gen_server:start_link(orber_iiop_outproxy, Opts, []).
-
-request(Pid, true, Timeout, Msg, RequestId) ->
- %% Why not simply use gen_server:call? We must be able to receive
- %% more than one reply (i.e. fragmented messages).
- MRef = erlang:monitor(process, Pid),
- gen_server:cast(Pid, {request, Timeout, Msg, RequestId, self(), MRef}),
- receive
- {MRef, Reply} ->
- erlang:demonitor(MRef, [flush]),
- Reply;
- {'DOWN', MRef, _, Pid, _Reason} when is_pid(Pid) ->
- receive
- %% Clear EXIT message from queue
- {'EXIT', _Pid, _What} ->
- corba:raise(#'COMM_FAILURE'{completion_status=?COMPLETED_MAYBE})
- after 0 ->
- corba:raise(#'COMM_FAILURE'{completion_status=?COMPLETED_MAYBE})
- end;
- {fragmented, GIOPHdr, Bytes, RequestId, MRef} ->
- collect_fragments(GIOPHdr, [], Bytes, Pid, RequestId, MRef)
- end;
-request(Pid, _, _, Msg, _RequestId) ->
- %% No response expected
- gen_server:cast(Pid, {oneway_request, Msg}).
-
-cancel(Pid, RequestId) ->
- gen_server:cast(Pid, {cancel, RequestId}).
-
-cancel(Pid, RequestId, MRef) ->
- gen_server:cast(Pid, {cancel, RequestId, MRef, self()}).
-
-%%-----------------------------------------------------------------
-%% Internal interface functions
-%%-----------------------------------------------------------------
-%%-----------------------------------------------------------------
-%% Func: stop/2
-%%-----------------------------------------------------------------
-stop(Pid, Timeout) ->
- gen_server:call(Pid, stop, Timeout).
-stop(Pid) ->
- gen_server:cast(Pid, stop).
-
-
-%%-----------------------------------------------------------------
-%% Server functions
-%%-----------------------------------------------------------------
-%%-----------------------------------------------------------------
-%% Func: init/1
-%%-----------------------------------------------------------------
-init({connect, Host, Port, SocketType, SocketOptions, Parent, Key, NewKey}) ->
- process_flag(trap_exit, true),
- case catch orber_socket:connect(SocketType, Host, Port,
- orber_socket:get_ip_family_opts(Host) ++ SocketOptions) of
- {'EXCEPTION', _E} ->
- ignore;
- %% We used to reply the below but since this would generate a CRASH REPORT
- %% if '-boot start_sasl' used. Due to a request to change this behaviour
- %% we did.
- %% {stop, {'EXCEPTION', E}};
- Socket ->
- SockData = orber_socket:sockdata(SocketType, Socket),
- orber_iiop_pm:add_connection(Key, NewKey, SockData),
- Timeout = orber:iiop_connection_timeout(),
- {ok, #state{stype = SocketType, socket = Socket,
- db = ets:new(orber_outgoing_requests, [set]),
- timeout = Timeout, client_timeout = orber:iiop_timeout(),
- host = Host, port = Port, parent = Parent}, Timeout}
- end.
-
-%%-----------------------------------------------------------------
-%% Func: terminate/2
-%%-----------------------------------------------------------------
-terminate(_Reason, #state{db = OutRequests, error_reason = ER}) ->
- %% Kill all proxies and delete table before terminating
- notify_clients(OutRequests, ets:first(OutRequests), ER),
- ets:delete(OutRequests),
- ok.
-
-notify_clients(_, '$end_of_table', _ER) ->
- ok;
-notify_clients(OutRequests, Key, ER) ->
- case ets:lookup(OutRequests, Key) of
- [{_, Pid, TRef, MRef}] ->
- cancel_timer(TRef),
- Pid ! {MRef, ER},
- notify_clients(OutRequests, ets:next(OutRequests, Key), ER)
- end.
-
-%%-----------------------------------------------------------------
-%% Func: handle_call/3
-%%-----------------------------------------------------------------
-handle_call(stop, _From, State) ->
- {stop, normal, ok, State};
-handle_call(X, From, State) ->
- orber:dbg("[~p] orber_iiop_outproxy:handle_call(~p);~n"
- "Un-recognized call from ~p", [?LINE, X, From], ?DEBUG_LEVEL),
- {noreply, State, State#state.timeout}.
-
-%%-----------------------------------------------------------------
-%% Func: handle_cast/2
-%%-----------------------------------------------------------------
-handle_cast({request, Timeout, Msg, RequestId, From, MRef},
- #state{client_timeout = DefaultTimeout} = State) ->
- orber_socket:write(State#state.stype, State#state.socket, Msg),
- true = ets:insert(State#state.db, {RequestId, From,
- start_timer(Timeout, DefaultTimeout, RequestId),
- MRef}),
- {noreply, State, State#state.timeout};
-handle_cast({oneway_request, Msg}, State) ->
- orber_socket:write(State#state.stype, State#state.socket, Msg),
- {noreply, State, State#state.timeout};
-handle_cast({cancel, ReqId}, State) ->
- case ets:lookup(State#state.db, ReqId) of
- [{ReqId, _From, TRef, _MRef}] ->
- cancel_timer(TRef),
- ets:delete(State#state.db, ReqId),
- orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p);~n"
- "Request cancelled", [?LINE, State], ?DEBUG_LEVEL),
- {noreply, State, State#state.timeout};
- _ ->
- {noreply, State, State#state.timeout}
- end;
-handle_cast({cancel, ReqId, MRef, From}, State) ->
- case ets:lookup(State#state.db, ReqId) of
- [{ReqId, From, TRef, MRef}] ->
- cancel_timer(TRef),
- ets:delete(State#state.db, ReqId),
- From ! {MRef, ReqId, cancelled},
- orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p);
-Request cancelled", [?LINE, State], ?DEBUG_LEVEL),
- {noreply, State, State#state.timeout};
- _ ->
- From ! {MRef, ReqId, cancelled},
- {noreply, State, State#state.timeout}
- end;
-handle_cast(stop, State) ->
- {stop, normal, State};
-handle_cast(X, State) ->
- orber:dbg("[~p] orber_iiop_outproxy:handle_cast(~p);
-Un-recognized cast.", [?LINE, X], ?DEBUG_LEVEL),
- {noreply, State, State#state.timeout}.
-
-%%-----------------------------------------------------------------
-%% Func: handle_info/2
-%%-----------------------------------------------------------------
-handle_info({tcp, _Socket, Bytes}, State) ->
- handle_reply(Bytes, State);
-handle_info({ssl, _Socket, Bytes}, State) ->
- handle_reply(Bytes, State);
-handle_info({tcp_closed, _Socket}, State) ->
- {stop, normal, State};
-handle_info({ssl_closed, _Socket}, State) ->
- {stop, normal, State};
-handle_info({tcp_error, Socket, Reason}, #state{socket = Socket, host = Host,
- port = Port} = State) ->
- orber:error("[~p] IIOP proxy received the TCP error message: ~p~n"
- "The server-side ORB is located at '~p:~p'~n"
- "See the gen_tcp/inet documentation for more information.",
- [?LINE, Reason, Host, Port], ?DEBUG_LEVEL),
- {stop, normal, State};
-handle_info({ssl_error, Socket, Reason}, #state{socket = Socket, host = Host,
- port = Port} = State) ->
- orber:error("[~p] IIOP proxy received the SSL error message: ~p~n"
- "The server-side ORB is located at '~p:~p'~n"
- "See the SSL-application documentation for more information.",
- [?LINE, Reason, Host, Port], ?DEBUG_LEVEL),
- {stop, normal, State};
-handle_info({timeout, _TRef, ReqId}, State) ->
- case ets:lookup(State#state.db, ReqId) of
- [{ReqId, Pid, _, MRef}] ->
- ets:delete(State#state.db, ReqId),
- Pid ! {MRef, {'EXCEPTION', #'TIMEOUT'{completion_status=?COMPLETED_MAYBE}}},
- orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p, ~p);~n"
- "Request timed out",
- [?LINE, State#state.host, State#state.port], ?DEBUG_LEVEL),
- {noreply, State, State#state.timeout};
- _ ->
- {noreply, State, State#state.timeout}
- end;
-handle_info(stop, State) ->
- {stop, normal, State};
-handle_info(timeout, State) ->
- case ets:info(State#state.db, size) of
- 0 ->
- orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p, ~p);~n"
- "Outgoing connection timed out after ~p msec",
- [?LINE, State#state.host, State#state.port,
- State#state.timeout], ?DEBUG_LEVEL),
- {stop, normal, State};
- _Amount ->
- %% Still pending request, cannot close the connection.
- {noreply, State, State#state.timeout}
- end;
-handle_info({'EXIT', Parent, Reason}, #state{parent = Parent} = State) ->
- orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p);~nParent terminated.",
- [?LINE, Reason], ?DEBUG_LEVEL),
- {stop, normal, State};
-handle_info({reconfigure, _Options}, State) ->
- %% Currently there are no parameters that can be changed.
- {noreply, State, State#state.timeout};
-handle_info(X, State) ->
- orber:dbg("[~p] orber_iiop_outproxy:handle_info(~p);~nUn-recognized info.",
- [?LINE, X], ?DEBUG_LEVEL),
- {noreply, State, State#state.timeout}.
-
-
-handle_reply(Bytes, State) ->
- %% Check IIOP headers and fetch request id
- case catch checkheaders(cdr_decode:dec_giop_message_header(Bytes)) of
- {'reply', ReplyHeader, Rest, Len, ByteOrder} ->
- case ets:lookup(State#state.db, ReplyHeader#reply_header.request_id) of
- [{_, Pid, TRef, MRef}] ->
- %% Send reply to the correct request process
- cancel_timer(TRef),
- Pid ! {MRef, {reply, ReplyHeader, Rest, Len, ByteOrder, Bytes}},
- ets:delete(State#state.db, ReplyHeader#reply_header.request_id),
- {noreply, State, State#state.timeout};
- _ ->
- {noreply, State, State#state.timeout}
- end;
- {'locate_reply', LocateReplyHeader, LocateRest, LocateLen, LocateByteOrder} ->
- case ets:lookup(State#state.db,
- LocateReplyHeader#locate_reply_header.request_id) of
- [{_, Pid, TRef, MRef}] ->
- %% Send reply to the correct request process
- cancel_timer(TRef),
- Pid ! {MRef, {locate_reply, LocateReplyHeader,
- LocateRest, LocateLen, LocateByteOrder}},
- ets:delete(State#state.db,
- LocateReplyHeader#locate_reply_header.request_id),
- {noreply, State, State#state.timeout};
- _ ->
- {noreply, State, State#state.timeout}
- end;
- {fragment, GIOPHdr, ReqId, false} ->
- %% Last fragment, cancel timer and remove from DB.
- case ets:lookup(State#state.db, ReqId) of
- [{_, Pid, TRef, MRef}] ->
- cancel_timer(TRef),
- Pid ! {fragment, GIOPHdr, ReqId, MRef},
- ets:delete(State#state.db, ReqId),
- {noreply, State, State#state.timeout};
- _ ->
- %% Probably cancelled
- {noreply, State, State#state.timeout}
- end;
- {fragment, GIOPHdr, ReqId, _} ->
- %% More fragments expected
- case ets:lookup(State#state.db, ReqId) of
- [{_, Pid, _, MRef}] ->
- Pid ! {fragment, GIOPHdr, ReqId, MRef},
- {noreply, State, State#state.timeout};
- _ ->
- %% Probably cancelled
- {noreply, State, State#state.timeout}
- end;
- {fragmented, GIOPHdr, ReqId} ->
- %% This the initial message (i.e. a LocateReply or Reply).
- case ets:lookup(State#state.db, ReqId) of
- [{_, Pid, _TRef, MRef}] ->
- Pid ! {fragmented, GIOPHdr, Bytes, ReqId, MRef},
- {noreply, State, State#state.timeout};
- _ ->
- {noreply, State, State#state.timeout}
- end;
- {'EXCEPTION', DecodeException} ->
- orber:dbg("[~p] orber_iiop_outproxy:handle_reply(~p); decode exception(~p).",
- [?LINE, Bytes, DecodeException], ?DEBUG_LEVEL),
- {noreply, State, State#state.timeout};
- {'EXIT', message_error} ->
- orber:dbg("[~p] orber_iiop_outproxy:handle_reply(~p); message error.",
- [?LINE, Bytes], ?DEBUG_LEVEL),
- ME = cdr_encode:enc_message_error(#giop_env{version =
- orber:giop_version()}),
- orber_socket:write(State#state.stype, State#state.socket, ME),
- {noreply, State, State#state.timeout};
- {'EXIT', R} ->
- orber:dbg("[~p] orber_iiop_outproxy:handle_reply(~p); got exit(~p)",
- [?LINE, Bytes, R], ?DEBUG_LEVEL),
- {noreply, State, State#state.timeout};
- close_connection ->
- orber:dbg("[~p] orber_iiop_outproxy:handle_reply();
-The Server-side ORB closed the connection.", [?LINE], ?DEBUG_LEVEL),
- {stop, normal, State};
- {error, no_reply} ->
- {noreply, State, State#state.timeout};
- X ->
- orber:dbg("[~p] orber_iiop_outproxy:handle_reply(~p); message error(~p).",
- [?LINE, Bytes, X], ?DEBUG_LEVEL),
- {noreply, State, State#state.timeout}
- end.
-
-
-%%-----------------------------------------------------------------
-%% Func: code_change/3
-%%-----------------------------------------------------------------
-code_change(_OldVsn, State, _Extra) ->
- {ok, State}.
-
-%%-----------------------------------------------------------------
-%% Internal functions
-%%-----------------------------------------------------------------
-checkheaders(#giop_message{message_type = ?GIOP_MSG_CLOSE_CONNECTION}) ->
- close_connection;
-checkheaders(#giop_message{message_type = ?GIOP_MSG_FRAGMENT,
- giop_version = {1,2},
- fragments = MoreFrag} = GIOPHdr) ->
- %% A fragment; we must have received a Request or LocateRequest
- %% with fragment-flag set to true.
- %% We need to decode the header to get the request-id.
- ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order,
- GIOPHdr#giop_message.message),
- {fragment, GIOPHdr, ReqId, MoreFrag};
-checkheaders(#giop_message{fragments = true,
- giop_version = {1,2}} = GIOPHdr) ->
- %% Must be a Reply or LocateReply which have been fragmented.
- %% We need to decode the header to get the request-id.
- ReqId = cdr_decode:peek_request_id(GIOPHdr#giop_message.byte_order,
- GIOPHdr#giop_message.message),
- {fragmented, GIOPHdr, ReqId};
-checkheaders(#giop_message{fragments = false,
- message_type = ?GIOP_MSG_REPLY} = GIOPHdr) ->
- {ReplyHeader, Rest, Len} =
- cdr_decode:dec_reply_header(GIOPHdr#giop_message.giop_version,
- GIOPHdr#giop_message.message,
- ?GIOP_HEADER_SIZE,
- GIOPHdr#giop_message.byte_order),
- {'reply', ReplyHeader, Rest, Len, GIOPHdr#giop_message.byte_order};
-checkheaders(#giop_message{fragments = false,
- message_type = ?GIOP_MSG_LOCATE_REPLY} = GIOPHdr) ->
- {LocateReplyHeader, Rest, Len} =
- cdr_decode:dec_locate_reply_header(GIOPHdr#giop_message.giop_version,
- GIOPHdr#giop_message.message,
- ?GIOP_HEADER_SIZE,
- GIOPHdr#giop_message.byte_order),
- {'locate_reply', LocateReplyHeader, Rest, Len, GIOPHdr#giop_message.byte_order};
-checkheaders(What) ->
- orber:dbg("[~p] orber_iiop_outproxy:checkheaders(~p)
-Un-recognized GIOP header.", [?LINE, What], ?DEBUG_LEVEL),
- {error, no_reply}.
-
-
-cancel_timer(infinity) ->
- ok;
-cancel_timer(TRef) ->
- erlang:cancel_timer(TRef).
-
-start_timer(infinity, infinity, _) ->
- infinity;
-start_timer(infinity, Timeout, RequestId) ->
- erlang:start_timer(Timeout, self(), RequestId);
-start_timer(Timeout, _, RequestId) ->
- erlang:start_timer(Timeout, self(), RequestId).
-
-
-
-collect_fragments(GIOPHdr1, InBuffer, Bytes, Proxy, RequestId, MRef) ->
- receive
- %% There are more framents to come; just collect this message and wait for
- %% the rest.
- {fragment, #giop_message{byte_order = _ByteOrder,
- message = Message,
- fragments = true} = GIOPHdr2, RequestId, MRef} ->
- case catch cdr_decode:dec_message_header(null, GIOPHdr2, Message) of
- {_, #fragment_header{}, FragBody, _, _} ->
- collect_fragments(GIOPHdr1, [FragBody|InBuffer],
- Bytes, Proxy, RequestId, MRef);
- Other ->
- cancel(Proxy, RequestId, MRef),
- clear_queue(Proxy, RequestId, MRef),
- orber:dbg("[~p] orber_iiop:collect_fragments(~p)",
- [?LINE, Other], ?DEBUG_LEVEL),
- corba:raise(#'MARSHAL'{minor=(?ORBER_VMCID bor 18),
- completion_status=?COMPLETED_YES})
- end;
- %% This is the last fragment. Now we can but together the fragments, decode
- %% the reply and send it to the client.
- {fragment, #giop_message{byte_order = ByteOrder,
- message = Message} = GIOPHdr2, RequestId, MRef} ->
- erlang:demonitor(MRef, [flush]),
- case catch cdr_decode:dec_message_header(null, GIOPHdr2, Message) of
- {_, #fragment_header{}, FragBody, _, _} ->
- %% This buffer is all the fragments concatenated.
- Buffer = lists:reverse([FragBody|InBuffer]),
-
- %% Create a GIOP-message which is exactly as if hadn't been fragmented.
- NewGIOP = GIOPHdr1#giop_message
- {message = list_to_binary([GIOPHdr1#giop_message.message|Buffer]),
- fragments = false},
- case checkheaders(NewGIOP) of
- {'reply', ReplyHeader, Rest, Len, ByteOrder} ->
- %% We must keep create a copy of all bytes, as if the
- %% message wasn't fragmented, to be able handle TypeCode
- %% indirection.
- {'reply', ReplyHeader, Rest, Len, ByteOrder,
- list_to_binary([Bytes|Buffer])};
- {'locate_reply', ReplyHdr, Rest, Len, ByteOrder} ->
- {'locate_reply', ReplyHdr, Rest, Len, ByteOrder};
- Error ->
- orber:dbg("[~p] orber_iiop:collect_fragments(~p, ~p);
-Unable to decode Reply or LocateReply header",[?LINE, NewGIOP, Error], ?DEBUG_LEVEL),
- corba:raise(#'MARSHAL'{minor=(?ORBER_VMCID bor 18),
- completion_status=?COMPLETED_YES})
- end;
- Other ->
- orber:dbg("[~p] orber_iiop:collect_fragments(~p);",
- [?LINE, Other], ?DEBUG_LEVEL),
- corba:raise(#'MARSHAL'{minor=(?ORBER_VMCID bor 18),
- completion_status=?COMPLETED_YES})
- end;
- {MRef, {'EXCEPTION', E}} ->
- orber:dbg("[~p] orber_iiop:collect_fragments(~p);",
- [?LINE, E], ?DEBUG_LEVEL),
- erlang:demonitor(MRef, [flush]),
- corba:raise(E);
- {'DOWN', MRef, _, Proxy, Reason} when is_pid(Proxy) ->
- orber:dbg("[~p] orber_iiop:collect_fragments(~p);~n"
- "Monitor generated a DOWN message.",
- [?LINE, Reason], ?DEBUG_LEVEL),
- receive
- %% Clear EXIT message from queue
- {'EXIT', _Proxy, _What} ->
- corba:raise(#'COMM_FAILURE'{completion_status=?COMPLETED_MAYBE})
- after 0 ->
- corba:raise(#'COMM_FAILURE'{completion_status=?COMPLETED_MAYBE})
- end
- end.
-
-clear_queue(Proxy, RequestId, MRef) ->
- receive
- {fragment, _, RequestId, MRef} ->
- clear_queue(Proxy, RequestId, MRef);
- {MRef, RequestId, cancelled} ->
- %% This is the last message that the proxy will send
- %% after we've cancelled the request.
- erlang:demonitor(MRef, [flush]),
- ok;
- {'DOWN', MRef, _, Proxy, _Reason} ->
- %% The proxy terminated. Clear EXIT message from queue
- receive
- {'EXIT', Proxy, _What} ->
- ok
- after 0 ->
- ok
- end
- end.
-