aboutsummaryrefslogtreecommitdiffstats
path: root/lib/cosEvent/src/oe_CosEventComm_Channel_impl.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/cosEvent/src/oe_CosEventComm_Channel_impl.erl')
-rw-r--r--lib/cosEvent/src/oe_CosEventComm_Channel_impl.erl246
1 files changed, 246 insertions, 0 deletions
diff --git a/lib/cosEvent/src/oe_CosEventComm_Channel_impl.erl b/lib/cosEvent/src/oe_CosEventComm_Channel_impl.erl
new file mode 100644
index 0000000000..531edaa0af
--- /dev/null
+++ b/lib/cosEvent/src/oe_CosEventComm_Channel_impl.erl
@@ -0,0 +1,246 @@
+%%--------------------------------------------------------------------
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2001-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+%%
+%%----------------------------------------------------------------------
+%% File : oe_CosEventComm_Channel_impl.erl
+%% Description :
+%%
+%%----------------------------------------------------------------------
+-module(oe_CosEventComm_Channel_impl).
+
+%%----------------------------------------------------------------------
+%% Include files
+%%----------------------------------------------------------------------
+-include_lib("orber/include/corba.hrl").
+-include("cosEventApp.hrl").
+
+
+%%----------------------------------------------------------------------
+%% External exports
+%%----------------------------------------------------------------------
+%% Mandatory
+-export([init/1,
+ terminate/2,
+ code_change/3,
+ handle_info/2]).
+
+%% Exports from "CosEventChannelAdmin::EventChannel"
+-export([for_consumers/3,
+ for_suppliers/3,
+ destroy/3]).
+
+
+%%----------------------------------------------------------------------
+%% Internal exports
+%%----------------------------------------------------------------------
+%% Exports from "oe_CosEventComm::Event"
+-export([send/3, send_sync/4]).
+
+%%----------------------------------------------------------------------
+%% Records
+%%----------------------------------------------------------------------
+-record(state, {typecheck, pull_interval, maxevents, blocking, cadmins = [],
+ server_options}).
+
+%%----------------------------------------------------------------------
+%% Macros
+%%----------------------------------------------------------------------
+
+%%======================================================================
+%% External functions
+%%======================================================================
+%%----------------------------------------------------------------------
+%% Function : init/1
+%% Returns : {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%%----------------------------------------------------------------------
+init([Options, ServerOpts]) ->
+ process_flag(trap_exit, true),
+ PullI = cosEventApp:get_option(?PULL_INTERVAL, Options),
+ TC = cosEventApp:get_option(?TYPECHECK, Options),
+ Max = cosEventApp:get_option(?MAXEVENTS, Options),
+ Blocking = cosEventApp:get_option(?BLOCKING, Options),
+ {ok, #state{typecheck = TC, pull_interval = PullI, maxevents = Max,
+ blocking = Blocking, server_options = ServerOpts}}.
+
+%%----------------------------------------------------------------------
+%% Function : terminate/2
+%% Returns : any (ignored by gen_server)
+%% Description: Shutdown the server
+%%----------------------------------------------------------------------
+terminate(_Reason, _State) ->
+ ?DBG("Terminating ~p~n", [_Reason]),
+ ok.
+
+%%----------------------------------------------------------------------
+%% Function : code_change/3
+%% Returns : {ok, NewState}
+%% Description: Convert process state when code is changed
+%%----------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%---------------------------------------------------------------------%
+%% function : handle_info
+%% Arguments:
+%% Returns : {noreply, State} |
+%% {stop, Reason, State}
+%% Effect : Functions demanded by the gen_server module.
+%%----------------------------------------------------------------------
+handle_info({'EXIT', Pid, _Reason}, #state{cadmins = CAdmins} = State) ->
+ ?DBG("Probably a child terminated with Reason: ~p~n", [_Reason]),
+ {noreply, State#state{cadmins = lists:keydelete(Pid, 2, CAdmins)}};
+handle_info(_Info, State) ->
+ ?DBG("Unknown Info ~p~n", [_Info]),
+ {noreply, State}.
+
+
+%%----------------------------------------------------------------------
+%% Function : for_consumers
+%% Arguments :
+%% Returns :
+%% Description:
+%%----------------------------------------------------------------------
+for_consumers(_, _, #state{server_options = ServerOpts} = State) ->
+ case catch 'oe_CosEventComm_CAdmin':oe_create_link([self(),
+ State#state.typecheck,
+ State#state.maxevents,
+ ServerOpts],
+ [{sup_child, true}|ServerOpts]) of
+ {ok, Pid, AdminCo} ->
+ ?DBG("Created a new oe_CosEventComm_CAdmin.~n", []),
+ {reply, AdminCo,
+ State#state{cadmins = [{AdminCo, Pid}|State#state.cadmins]}};
+ Other ->
+ orber:dbg("[~p] oe_CosEventComm_Channel:for_consumers(); Error: ~p",
+ [?LINE, Other], ?DEBUG_LEVEL),
+ corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
+ end.
+
+%%----------------------------------------------------------------------
+%% Function : for_suppliers
+%% Arguments :
+%% Returns :
+%% Description:
+%%----------------------------------------------------------------------
+for_suppliers(OE_This, _, #state{server_options = ServerOpts} = State) ->
+ case catch 'CosEventChannelAdmin_SupplierAdmin':oe_create_link([OE_This, self(),
+ State#state.typecheck,
+ State#state.pull_interval,
+ ServerOpts],
+ [{sup_child, true}|ServerOpts]) of
+ {ok, _Pid, AdminSu} ->
+ ?DBG("Created a new CosEventChannelAdmin_SupplierAdmin.~n", []),
+ {reply, AdminSu, State};
+ Other ->
+ orber:dbg("[~p] oe_CosEventComm_Channel:for_suppliers();~nError: ~p",
+ [?LINE, Other], ?DEBUG_LEVEL),
+ corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
+ end.
+
+%%----------------------------------------------------------------------
+%% Function : destroy
+%% Arguments :
+%% Returns :
+%% Description:
+%%----------------------------------------------------------------------
+destroy(_, _, State) ->
+ ?DBG("Destroy invoked.", []),
+ {stop, normal, ok, State}.
+
+%%----------------------------------------------------------------------
+%% Function : send
+%% Arguments :
+%% Returns :
+%% Description:
+%%----------------------------------------------------------------------
+send(_OE_This, #state{cadmins = CAdmins} = State, Any) ->
+ ?DBG("Received Event ~p~n", [Any]),
+ case send_helper(CAdmins, Any, [], false) of
+ ok ->
+ ?DBG("Received Event and forwarded it successfully.~n", []),
+ {noreply, State};
+ {error, Dropped} ->
+ ?DBG("Received Event but forward failed for: ~p~n", [Dropped]),
+ RemainingAdmins = delete_cadmin(Dropped, CAdmins),
+ {noreply, State#state{cadmins = RemainingAdmins}}
+ end.
+
+%%----------------------------------------------------------------------
+%% Function : send_sync
+%% Arguments :
+%% Returns :
+%% Description:
+%%----------------------------------------------------------------------
+send_sync(_OE_This, OE_From, #state{cadmins = CAdmins, blocking = BL} = State, Any) ->
+ ?DBG("Received Event ~p~n", [Any]),
+ corba:reply(OE_From, ok),
+ case send_helper(CAdmins, Any, [], BL) of
+ ok ->
+ ?DBG("Received Event and forwarded (sync) it successfully.~n", []),
+ {reply, ok, State};
+ {error, Dropped} ->
+ ?DBG("Received Event but forward (sync) failed for: ~p~n", [Dropped]),
+ RemainingAdmins = delete_cadmin(Dropped, CAdmins),
+ {reply, ok, State#state{cadmins = RemainingAdmins}}
+ end.
+
+
+%%======================================================================
+%% Internal functions
+%%======================================================================
+send_helper([], _, [], _) ->
+ ok;
+send_helper([], _, Dropped, _) ->
+ {error, Dropped};
+send_helper([{ObjRef, Pid}|T], Event, Dropped, false) ->
+ case catch 'oe_CosEventComm_CAdmin':send(ObjRef, Event) of
+ ok ->
+ send_helper(T, Event, Dropped, false);
+ What ->
+ orber:dbg("[~p] oe_CosEventComm_Channel:send_helper(~p, ~p);~n"
+ "Bad return value ~p. Closing connection.",
+ [?LINE, ObjRef, Event, What], ?DEBUG_LEVEL),
+ send_helper(T, Event, [{ObjRef, Pid}|Dropped], false)
+ end;
+send_helper([{ObjRef, Pid}|T], Event, Dropped, Sync) ->
+ case catch 'oe_CosEventComm_CAdmin':send_sync(ObjRef, Event) of
+ ok ->
+ send_helper(T, Event, Dropped, Sync);
+ What ->
+ orber:dbg("[~p] oe_CosEventComm_Channel:send_helper(~p, ~p);~n"
+ "Bad return value ~p. Closing connection.",
+ [?LINE, ObjRef, Event, What], ?DEBUG_LEVEL),
+ send_helper(T, Event, [{ObjRef, Pid}|Dropped], Sync)
+ end.
+
+
+delete_cadmin([], RemainingAdmins) ->
+ RemainingAdmins;
+delete_cadmin([{_,Pid}|T], CAdmins) ->
+ Rest = lists:keydelete(Pid, 2, CAdmins),
+ delete_cadmin(T, Rest).
+
+%%======================================================================
+%% END OF MODULE
+%%======================================================================