%%--------------------------------------------------------------------
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2001-2009. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%
%%
%%----------------------------------------------------------------------
%% File : oe_CosEventComm_Channel_impl.erl
%% Description :
%%
%%----------------------------------------------------------------------
-module(oe_CosEventComm_Channel_impl).
%%----------------------------------------------------------------------
%% Include files
%%----------------------------------------------------------------------
-include_lib("orber/include/corba.hrl").
-include("cosEventApp.hrl").
%%----------------------------------------------------------------------
%% External exports
%%----------------------------------------------------------------------
%% Mandatory
-export([init/1,
terminate/2,
code_change/3,
handle_info/2]).
%% Exports from "CosEventChannelAdmin::EventChannel"
-export([for_consumers/3,
for_suppliers/3,
destroy/3]).
%%----------------------------------------------------------------------
%% Internal exports
%%----------------------------------------------------------------------
%% Exports from "oe_CosEventComm::Event"
-export([send/3, send_sync/4]).
%%----------------------------------------------------------------------
%% Records
%%----------------------------------------------------------------------
-record(state, {typecheck, pull_interval, maxevents, blocking, cadmins = [],
server_options}).
%%----------------------------------------------------------------------
%% Macros
%%----------------------------------------------------------------------
%%======================================================================
%% External functions
%%======================================================================
%%----------------------------------------------------------------------
%% Function : init/1
%% Returns : {ok, State} |
%% {ok, State, Timeout} |
%% ignore |
%% {stop, Reason}
%% Description: Initiates the server
%%----------------------------------------------------------------------
init([Options, ServerOpts]) ->
process_flag(trap_exit, true),
PullI = cosEventApp:get_option(?PULL_INTERVAL, Options),
TC = cosEventApp:get_option(?TYPECHECK, Options),
Max = cosEventApp:get_option(?MAXEVENTS, Options),
Blocking = cosEventApp:get_option(?BLOCKING, Options),
{ok, #state{typecheck = TC, pull_interval = PullI, maxevents = Max,
blocking = Blocking, server_options = ServerOpts}}.
%%----------------------------------------------------------------------
%% Function : terminate/2
%% Returns : any (ignored by gen_server)
%% Description: Shutdown the server
%%----------------------------------------------------------------------
terminate(_Reason, _State) ->
?DBG("Terminating ~p~n", [_Reason]),
ok.
%%----------------------------------------------------------------------
%% Function : code_change/3
%% Returns : {ok, NewState}
%% Description: Convert process state when code is changed
%%----------------------------------------------------------------------
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
%%---------------------------------------------------------------------%
%% function : handle_info
%% Arguments:
%% Returns : {noreply, State} |
%% {stop, Reason, State}
%% Effect : Functions demanded by the gen_server module.
%%----------------------------------------------------------------------
handle_info({'EXIT', Pid, _Reason}, #state{cadmins = CAdmins} = State) ->
?DBG("Probably a child terminated with Reason: ~p~n", [_Reason]),
{noreply, State#state{cadmins = lists:keydelete(Pid, 2, CAdmins)}};
handle_info(_Info, State) ->
?DBG("Unknown Info ~p~n", [_Info]),
{noreply, State}.
%%----------------------------------------------------------------------
%% Function : for_consumers
%% Arguments :
%% Returns :
%% Description:
%%----------------------------------------------------------------------
for_consumers(_, _, #state{server_options = ServerOpts} = State) ->
case catch 'oe_CosEventComm_CAdmin':oe_create_link([self(),
State#state.typecheck,
State#state.maxevents,
ServerOpts],
[{sup_child, true}|ServerOpts]) of
{ok, Pid, AdminCo} ->
?DBG("Created a new oe_CosEventComm_CAdmin.~n", []),
{reply, AdminCo,
State#state{cadmins = [{AdminCo, Pid}|State#state.cadmins]}};
Other ->
orber:dbg("[~p] oe_CosEventComm_Channel:for_consumers(); Error: ~p",
[?LINE, Other], ?DEBUG_LEVEL),
corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
end.
%%----------------------------------------------------------------------
%% Function : for_suppliers
%% Arguments :
%% Returns :
%% Description:
%%----------------------------------------------------------------------
for_suppliers(OE_This, _, #state{server_options = ServerOpts} = State) ->
case catch 'CosEventChannelAdmin_SupplierAdmin':oe_create_link([OE_This, self(),
State#state.typecheck,
State#state.pull_interval,
ServerOpts],
[{sup_child, true}|ServerOpts]) of
{ok, _Pid, AdminSu} ->
?DBG("Created a new CosEventChannelAdmin_SupplierAdmin.~n", []),
{reply, AdminSu, State};
Other ->
orber:dbg("[~p] oe_CosEventComm_Channel:for_suppliers();~nError: ~p",
[?LINE, Other], ?DEBUG_LEVEL),
corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
end.
%%----------------------------------------------------------------------
%% Function : destroy
%% Arguments :
%% Returns :
%% Description:
%%----------------------------------------------------------------------
destroy(_, _, State) ->
?DBG("Destroy invoked.", []),
{stop, normal, ok, State}.
%%----------------------------------------------------------------------
%% Function : send
%% Arguments :
%% Returns :
%% Description:
%%----------------------------------------------------------------------
send(_OE_This, #state{cadmins = CAdmins} = State, Any) ->
?DBG("Received Event ~p~n", [Any]),
case send_helper(CAdmins, Any, [], false) of
ok ->
?DBG("Received Event and forwarded it successfully.~n", []),
{noreply, State};
{error, Dropped} ->
?DBG("Received Event but forward failed for: ~p~n", [Dropped]),
RemainingAdmins = delete_cadmin(Dropped, CAdmins),
{noreply, State#state{cadmins = RemainingAdmins}}
end.
%%----------------------------------------------------------------------
%% Function : send_sync
%% Arguments :
%% Returns :
%% Description:
%%----------------------------------------------------------------------
send_sync(_OE_This, OE_From, #state{cadmins = CAdmins, blocking = BL} = State, Any) ->
?DBG("Received Event ~p~n", [Any]),
corba:reply(OE_From, ok),
case send_helper(CAdmins, Any, [], BL) of
ok ->
?DBG("Received Event and forwarded (sync) it successfully.~n", []),
{reply, ok, State};
{error, Dropped} ->
?DBG("Received Event but forward (sync) failed for: ~p~n", [Dropped]),
RemainingAdmins = delete_cadmin(Dropped, CAdmins),
{reply, ok, State#state{cadmins = RemainingAdmins}}
end.
%%======================================================================
%% Internal functions
%%======================================================================
send_helper([], _, [], _) ->
ok;
send_helper([], _, Dropped, _) ->
{error, Dropped};
send_helper([{ObjRef, Pid}|T], Event, Dropped, false) ->
case catch 'oe_CosEventComm_CAdmin':send(ObjRef, Event) of
ok ->
send_helper(T, Event, Dropped, false);
What ->
orber:dbg("[~p] oe_CosEventComm_Channel:send_helper(~p, ~p);~n"
"Bad return value ~p. Closing connection.",
[?LINE, ObjRef, Event, What], ?DEBUG_LEVEL),
send_helper(T, Event, [{ObjRef, Pid}|Dropped], false)
end;
send_helper([{ObjRef, Pid}|T], Event, Dropped, Sync) ->
case catch 'oe_CosEventComm_CAdmin':send_sync(ObjRef, Event) of
ok ->
send_helper(T, Event, Dropped, Sync);
What ->
orber:dbg("[~p] oe_CosEventComm_Channel:send_helper(~p, ~p);~n"
"Bad return value ~p. Closing connection.",
[?LINE, ObjRef, Event, What], ?DEBUG_LEVEL),
send_helper(T, Event, [{ObjRef, Pid}|Dropped], Sync)
end.
delete_cadmin([], RemainingAdmins) ->
RemainingAdmins;
delete_cadmin([{_,Pid}|T], CAdmins) ->
Rest = lists:keydelete(Pid, 2, CAdmins),
delete_cadmin(T, Rest).
%%======================================================================
%% END OF MODULE
%%======================================================================