aboutsummaryrefslogblamecommitdiffstats
path: root/lib/cosEvent/src/oe_CosEventComm_Channel_impl.erl
blob: 531edaa0af24654c583f194f505eec515411427c (plain) (tree)




















































































































































































































































                                                                                                     
%%--------------------------------------------------------------------
%%
%% %CopyrightBegin%
%% 
%% Copyright Ericsson AB 2001-2009. All Rights Reserved.
%% 
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%% 
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%% 
%% %CopyrightEnd%
%%
%%
%%----------------------------------------------------------------------
%% File        : oe_CosEventComm_Channel_impl.erl
%% Description : 
%%
%%----------------------------------------------------------------------
-module(oe_CosEventComm_Channel_impl).

%%----------------------------------------------------------------------
%% Include files
%%----------------------------------------------------------------------
-include_lib("orber/include/corba.hrl").
-include("cosEventApp.hrl").


%%----------------------------------------------------------------------
%% External exports
%%----------------------------------------------------------------------
%% Mandatory
-export([init/1,
	 terminate/2,
	 code_change/3,
         handle_info/2]).

%% Exports from "CosEventChannelAdmin::EventChannel"
-export([for_consumers/3, 
	 for_suppliers/3, 
	 destroy/3]).
 

%%----------------------------------------------------------------------
%% Internal exports
%%----------------------------------------------------------------------
%% Exports from "oe_CosEventComm::Event"
-export([send/3, send_sync/4]).

%%----------------------------------------------------------------------
%% Records
%%----------------------------------------------------------------------
-record(state, {typecheck, pull_interval, maxevents, blocking, cadmins = [],
		server_options}).

%%----------------------------------------------------------------------
%% Macros
%%----------------------------------------------------------------------

%%======================================================================
%% External functions
%%======================================================================
%%----------------------------------------------------------------------
%% Function   : init/1
%% Returns    : {ok, State}          |
%%              {ok, State, Timeout} |
%%              ignore               |
%%              {stop, Reason}
%% Description: Initiates the server
%%----------------------------------------------------------------------
init([Options, ServerOpts]) ->
    process_flag(trap_exit, true),
    PullI = cosEventApp:get_option(?PULL_INTERVAL, Options),
    TC = cosEventApp:get_option(?TYPECHECK, Options),
    Max = cosEventApp:get_option(?MAXEVENTS, Options),
    Blocking = cosEventApp:get_option(?BLOCKING, Options),
    {ok, #state{typecheck = TC, pull_interval = PullI, maxevents = Max,
		blocking = Blocking, server_options = ServerOpts}}.

%%----------------------------------------------------------------------
%% Function   : terminate/2
%% Returns    : any (ignored by gen_server)
%% Description: Shutdown the server
%%----------------------------------------------------------------------
terminate(_Reason, _State) ->
    ?DBG("Terminating ~p~n", [_Reason]),
    ok.

%%----------------------------------------------------------------------
%% Function   : code_change/3
%% Returns    : {ok, NewState}
%% Description: Convert process state when code is changed
%%----------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%%---------------------------------------------------------------------%
%% function : handle_info
%% Arguments: 
%% Returns  : {noreply, State} | 
%%            {stop, Reason, State}
%% Effect   : Functions demanded by the gen_server module. 
%%----------------------------------------------------------------------
handle_info({'EXIT', Pid, _Reason}, #state{cadmins = CAdmins} = State) ->
    ?DBG("Probably a child terminated with Reason: ~p~n", [_Reason]),
    {noreply, State#state{cadmins = lists:keydelete(Pid, 2, CAdmins)}};
handle_info(_Info, State) ->
    ?DBG("Unknown Info ~p~n", [_Info]),
    {noreply, State}.


%%----------------------------------------------------------------------
%% Function   : for_consumers
%% Arguments  : 
%% Returns    : 
%% Description: 
%%----------------------------------------------------------------------
for_consumers(_, _, #state{server_options = ServerOpts} = State) ->
    case catch 'oe_CosEventComm_CAdmin':oe_create_link([self(),
							State#state.typecheck,
							State#state.maxevents,
							ServerOpts],
						       [{sup_child, true}|ServerOpts]) of
	{ok, Pid, AdminCo} ->
	    ?DBG("Created a new oe_CosEventComm_CAdmin.~n", []),
	    {reply, AdminCo,
	     State#state{cadmins = [{AdminCo, Pid}|State#state.cadmins]}};
	Other ->
	    orber:dbg("[~p] oe_CosEventComm_Channel:for_consumers(); Error: ~p", 
		      [?LINE, Other], ?DEBUG_LEVEL),
	    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
    end.

%%----------------------------------------------------------------------
%% Function   : for_suppliers
%% Arguments  : 
%% Returns    : 
%% Description: 
%%----------------------------------------------------------------------
for_suppliers(OE_This, _, #state{server_options = ServerOpts} = State) ->
    case catch 'CosEventChannelAdmin_SupplierAdmin':oe_create_link([OE_This, self(),
								    State#state.typecheck, 
								    State#state.pull_interval,
								    ServerOpts],
								   [{sup_child, true}|ServerOpts]) of
	{ok, _Pid, AdminSu} ->
	    ?DBG("Created a new CosEventChannelAdmin_SupplierAdmin.~n", []),
	    {reply, AdminSu, State};
	Other ->
	    orber:dbg("[~p] oe_CosEventComm_Channel:for_suppliers();~nError: ~p", 
		      [?LINE, Other], ?DEBUG_LEVEL),
	    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
    end.

%%----------------------------------------------------------------------
%% Function   : destroy
%% Arguments  : 
%% Returns    : 
%% Description: 
%%----------------------------------------------------------------------
destroy(_, _, State) ->
    ?DBG("Destroy invoked.", []),
    {stop, normal, ok, State}.

%%----------------------------------------------------------------------
%% Function   : send
%% Arguments  : 
%% Returns    : 
%% Description: 
%%----------------------------------------------------------------------
send(_OE_This, #state{cadmins = CAdmins} = State, Any) ->
    ?DBG("Received Event ~p~n", [Any]),
    case send_helper(CAdmins, Any, [], false) of
	ok ->
	    ?DBG("Received Event and forwarded it successfully.~n", []),
	    {noreply, State};
	{error, Dropped} ->
	    ?DBG("Received Event but forward failed for: ~p~n", [Dropped]),
	    RemainingAdmins = delete_cadmin(Dropped, CAdmins),
	    {noreply, State#state{cadmins = RemainingAdmins}}
    end.

%%----------------------------------------------------------------------
%% Function   : send_sync
%% Arguments  : 
%% Returns    : 
%% Description: 
%%----------------------------------------------------------------------
send_sync(_OE_This, OE_From, #state{cadmins = CAdmins, blocking = BL} = State, Any) ->
    ?DBG("Received Event ~p~n", [Any]),
    corba:reply(OE_From, ok),
    case send_helper(CAdmins, Any, [], BL) of
	ok ->
	    ?DBG("Received Event and forwarded (sync) it successfully.~n", []),
	    {reply, ok, State};
	{error, Dropped} ->
	    ?DBG("Received Event but forward (sync) failed for: ~p~n", [Dropped]),
	    RemainingAdmins = delete_cadmin(Dropped, CAdmins),
	    {reply, ok, State#state{cadmins = RemainingAdmins}}
    end.


%%======================================================================
%% Internal functions
%%======================================================================
send_helper([], _, [], _) ->
    ok;
send_helper([], _, Dropped, _) ->
    {error, Dropped};
send_helper([{ObjRef, Pid}|T], Event, Dropped, false) ->
    case catch 'oe_CosEventComm_CAdmin':send(ObjRef, Event) of
	ok ->
	    send_helper(T, Event, Dropped, false);
	What ->
	    orber:dbg("[~p] oe_CosEventComm_Channel:send_helper(~p, ~p);~n" 
		      "Bad return value ~p. Closing connection.", 
		      [?LINE, ObjRef, Event, What], ?DEBUG_LEVEL),
	    send_helper(T, Event, [{ObjRef, Pid}|Dropped], false)
    end;
send_helper([{ObjRef, Pid}|T], Event, Dropped, Sync) ->
    case catch 'oe_CosEventComm_CAdmin':send_sync(ObjRef, Event) of
	ok ->
	    send_helper(T, Event, Dropped, Sync);
	What ->
	    orber:dbg("[~p] oe_CosEventComm_Channel:send_helper(~p, ~p);~n" 
		      "Bad return value ~p. Closing connection.", 
		      [?LINE, ObjRef, Event, What], ?DEBUG_LEVEL),
	    send_helper(T, Event, [{ObjRef, Pid}|Dropped], Sync)
    end.


delete_cadmin([], RemainingAdmins) ->
    RemainingAdmins;
delete_cadmin([{_,Pid}|T], CAdmins) ->
    Rest = lists:keydelete(Pid, 2, CAdmins),
    delete_cadmin(T, Rest).

%%======================================================================
%% END OF MODULE
%%======================================================================