diff options
Diffstat (limited to 'lib/cosEvent/src')
-rw-r--r-- | lib/cosEvent/src/CosEventChannelAdmin.cfg | 6 | ||||
-rw-r--r-- | lib/cosEvent/src/CosEventChannelAdmin.idl | 66 | ||||
-rw-r--r-- | lib/cosEvent/src/CosEventChannelAdmin_ProxyPullConsumer_impl.erl | 205 | ||||
-rw-r--r-- | lib/cosEvent/src/CosEventChannelAdmin_ProxyPushConsumer_impl.erl | 169 | ||||
-rw-r--r-- | lib/cosEvent/src/CosEventChannelAdmin_SupplierAdmin_impl.erl | 159 | ||||
-rw-r--r-- | lib/cosEvent/src/CosEventComm.idl | 37 | ||||
-rw-r--r-- | lib/cosEvent/src/Makefile | 214 | ||||
-rw-r--r-- | lib/cosEvent/src/cosEvent.app.src | 45 | ||||
-rw-r--r-- | lib/cosEvent/src/cosEvent.appup.src | 6 | ||||
-rw-r--r-- | lib/cosEvent/src/cosEventApp.cfg | 15 | ||||
-rw-r--r-- | lib/cosEvent/src/cosEventApp.erl | 290 | ||||
-rw-r--r-- | lib/cosEvent/src/cosEventApp.hrl | 62 | ||||
-rw-r--r-- | lib/cosEvent/src/cosEventApp.idl | 26 | ||||
-rw-r--r-- | lib/cosEvent/src/oe_CosEventComm_CAdmin_impl.erl | 233 | ||||
-rw-r--r-- | lib/cosEvent/src/oe_CosEventComm_Channel_impl.erl | 246 | ||||
-rw-r--r-- | lib/cosEvent/src/oe_CosEventComm_PullerS_impl.erl | 280 | ||||
-rw-r--r-- | lib/cosEvent/src/oe_CosEventComm_PusherS_impl.erl | 217 |
17 files changed, 2276 insertions, 0 deletions
diff --git a/lib/cosEvent/src/CosEventChannelAdmin.cfg b/lib/cosEvent/src/CosEventChannelAdmin.cfg new file mode 100644 index 0000000000..0de579bd6f --- /dev/null +++ b/lib/cosEvent/src/CosEventChannelAdmin.cfg @@ -0,0 +1,6 @@ +{this, "CosEventChannelAdmin::SupplierAdmin"}. +{{handle_info, "CosEventChannelAdmin::SupplierAdmin"}, true}. +{this, "CosEventChannelAdmin::ProxyPushConsumer"}. +{{handle_info, "CosEventChannelAdmin::ProxyPushConsumer"}, true}. +{this, "CosEventChannelAdmin::ProxyPullConsumer"}. +{{handle_info, "CosEventChannelAdmin::ProxyPullConsumer"}, true}. diff --git a/lib/cosEvent/src/CosEventChannelAdmin.idl b/lib/cosEvent/src/CosEventChannelAdmin.idl new file mode 100644 index 0000000000..d5cb92c4e0 --- /dev/null +++ b/lib/cosEvent/src/CosEventChannelAdmin.idl @@ -0,0 +1,66 @@ +#ifndef _COSEVENTCHANELADMIN_IDL +#define _COSEVENTCHANELADMIN_IDL + +#include "CosEventComm.idl" + +#pragma prefix "omg.org" + +module CosEventChannelAdmin +{ + exception AlreadyConnected{}; + exception TypeError{}; + + interface ProxyPushConsumer: CosEventComm::PushConsumer + { + void connect_push_supplier(in CosEventComm:: + PushSupplier push_supplier) + raises (AlreadyConnected); + }; + + interface ProxyPullSupplier: CosEventComm::PullSupplier + { + void connect_pull_consumer(in CosEventComm:: + PullConsumer pull_consumer) + raises (AlreadyConnected); + }; + + interface ProxyPullConsumer: CosEventComm::PullConsumer + { + void connect_pull_supplier(in CosEventComm:: + PullSupplier pull_supplier) + raises (AlreadyConnected, TypeError); + }; + + interface ProxyPushSupplier: CosEventComm::PushSupplier + { + void connect_push_consumer(in CosEventComm:: + PushConsumer push_consumer) + raises (AlreadyConnected, TypeError); + }; + + interface ConsumerAdmin + { + ProxyPushSupplier obtain_push_supplier(); + ProxyPullSupplier obtain_pull_supplier(); + }; + + interface SupplierAdmin + { + ProxyPushConsumer obtain_push_consumer(); + ProxyPullConsumer obtain_pull_consumer(); + }; + + interface EventChannel + { + ConsumerAdmin for_consumers(); + SupplierAdmin for_suppliers(); + void destroy(); + }; + +}; + +#endif + + + + diff --git a/lib/cosEvent/src/CosEventChannelAdmin_ProxyPullConsumer_impl.erl b/lib/cosEvent/src/CosEventChannelAdmin_ProxyPullConsumer_impl.erl new file mode 100644 index 0000000000..26269ad4f7 --- /dev/null +++ b/lib/cosEvent/src/CosEventChannelAdmin_ProxyPullConsumer_impl.erl @@ -0,0 +1,205 @@ +%%-------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2001-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%---------------------------------------------------------------------- +%% File : CosEventChannelAdmin_ProxyPullConsumer_impl.erl +%% Description : +%% +%%---------------------------------------------------------------------- +-module('CosEventChannelAdmin_ProxyPullConsumer_impl'). + +%%---------------------------------------------------------------------- +%% Include files +%%---------------------------------------------------------------------- +-include("CosEventChannelAdmin.hrl"). +-include("CosEventComm.hrl"). +-include("cosEventApp.hrl"). + +%%---------------------------------------------------------------------- +%% External exports +%%---------------------------------------------------------------------- +%% Mandatory +-export([init/1, + terminate/2, + code_change/3, + handle_info/2]). + +%% Interface functions +-export([connect_pull_supplier/3]). + +%% Exports from "CosEventComm::PullConsumer" +-export([disconnect_pull_consumer/2]). + + +%%---------------------------------------------------------------------- +%% Internal exports +%%---------------------------------------------------------------------- + +%%---------------------------------------------------------------------- +%% Records +%%---------------------------------------------------------------------- +-record(state, {admin, admin_pid, channel, client, + typecheck, pull_interval, timer_ref}). + +%%---------------------------------------------------------------------- +%% Macros +%%---------------------------------------------------------------------- + +%%====================================================================== +%% External functions +%%====================================================================== +%%---------------------------------------------------------------------- +%% Function : init/1 +%% Returns : {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%---------------------------------------------------------------------- +init([Admin, AdminPid, Channel, TypeCheck, PullInterval]) -> + process_flag(trap_exit, true), + Secs = timer:seconds(PullInterval), + timer:start(), + {ok, #state{admin = Admin, admin_pid = AdminPid, channel = Channel, + typecheck = TypeCheck, pull_interval = Secs}}. + +%%---------------------------------------------------------------------- +%% Function : terminate/2 +%% Returns : any (ignored by gen_server) +%% Description: Shutdown the server +%%---------------------------------------------------------------------- +terminate(_Reason, #state{client = undefined}) -> + ?DBG("Terminating ~p; no client connected.~n", [_Reason]), + ok; +terminate(_Reason, #state{client = Client} = State) -> + stop_timer(State), + ?DBG("Terminating ~p~n", [_Reason]), + cosEventApp:disconnect('CosEventComm_PullSupplier', + disconnect_pull_supplier, Client), + ok. + +%%---------------------------------------------------------------------- +%% Function : code_change/3 +%% Returns : {ok, NewState} +%% Description: Convert process state when code is changed +%%---------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------% +%% function : handle_info +%% Arguments: +%% Returns : {noreply, State} | +%% {stop, Reason, State} +%% Effect : If the Parent Admin or the Channel terminates so must this object. +%%---------------------------------------------------------------------- +handle_info({'EXIT', Pid, Reason}, #state{admin_pid = Pid} = State) -> + ?DBG("Parent Admin terminated ~p~n", [Reason]), + orber:dbg("[~p] CosEventChannelAdmin_ProxyPullConsumer:handle_info(~p);~n" + "My Admin terminated and so will I.", [?LINE, Reason], ?DEBUG_LEVEL), + {stop, Reason, State}; +handle_info(try_pull_event, State) -> + try_pull_event(State); +handle_info(_Info, State) -> + ?DBG("Unknown Info ~p~n", [_Info]), + {noreply, State}. + +%%---------------------------------------------------------------------- +%% Function : connect_pull_supplier +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +connect_pull_supplier(_OE_This, #state{client = undefined, + typecheck = TypeCheck} = State, NewClient) -> + case corba_object:is_nil(NewClient) of + true -> + ?DBG("A NIL client supplied.~n", []), + orber:dbg("[~p] CosEventChannelAdmin_ProxyPullConsumer:connect_pull_supplier(..);~n" + "Supplied a NIL reference which is not allowed.", + [?LINE], ?DEBUG_LEVEL), + corba:raise(#'BAD_PARAM'{completion_status = ?COMPLETED_NO}); + false -> + cosEventApp:type_check(NewClient, 'CosEventComm_PullSupplier', TypeCheck), + NewState = start_timer(State), + {reply, ok, NewState#state{client = NewClient}} + end; +connect_pull_supplier(_, _, _) -> + corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}). + + +%%---------------------------------------------------------------------- +%% Function : disconnect_pull_consumer +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +disconnect_pull_consumer(_OE_This, State) -> + NewState = stop_timer(State), + ?DBG("Disconnect invoked ~p~n", [NewState]), + {stop, normal, ok, NewState#state{client = undefined}}. + +%%====================================================================== +%% Internal functions +%%====================================================================== +%% Start timer which send a message each time we should pull for new events. +start_timer(State) -> + case catch timer:send_interval(State#state.pull_interval, try_pull_event) of + {ok,PullTRef} -> + ?DBG("Started timer: ~p~n", [State#state.pull_interval]), + State#state{timer_ref = PullTRef}; + _ -> + corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}) + end. +stop_timer(#state{timer_ref = undefined} = State) -> + ?DBG("No timer to stop~n",[]), + State; +stop_timer(State) -> + ?DBG("Stopped timer~n",[]), + timer:cancel(State#state.timer_ref), + State#state{timer_ref = undefined}. + + +try_pull_event(State) -> + case catch 'CosEventComm_PullSupplier':try_pull(State#state.client) of + {_,false} -> + ?DBG("Client did not supply event~n", []), + {noreply, State}; + {Any, true} -> + 'oe_CosEventComm_Channel':send_sync(State#state.channel, Any), + ?DBG("Received Event ~p and forwarded it successfully.~n", [Any]), + {noreply, State}; + {'EXCEPTION', #'CosEventComm_Disconnected'{}} -> + ?DBG("Client claims we are disconnectedwhen trying to pull event.~n", []), + orber:dbg("[~p] CosEventChannelAdmin_ProxyPullConsumer:try_pull_event();~n" + "Client claims we are disconnected when trying to pull event so I terminate.", + [?LINE], ?DEBUG_LEVEL), + {stop, normal, State#state{client = undefined}}; + What -> + orber:dbg("[~p] CosEventChannelAdmin_ProxyPullConsumer:try_pull_event(~p);~n" + "My Client behaves badly so I terminate.", + [?LINE, What], ?DEBUG_LEVEL), + {stop, normal, State} + end. + + +%%====================================================================== +%% END OF MODULE +%%====================================================================== diff --git a/lib/cosEvent/src/CosEventChannelAdmin_ProxyPushConsumer_impl.erl b/lib/cosEvent/src/CosEventChannelAdmin_ProxyPushConsumer_impl.erl new file mode 100644 index 0000000000..969beb1d04 --- /dev/null +++ b/lib/cosEvent/src/CosEventChannelAdmin_ProxyPushConsumer_impl.erl @@ -0,0 +1,169 @@ +%%-------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2001-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%---------------------------------------------------------------------- +%% File : CosEventChannelAdmin_ProxyPushConsumer_impl.erl +%% Description : +%% +%%---------------------------------------------------------------------- +-module('CosEventChannelAdmin_ProxyPushConsumer_impl'). + +%%---------------------------------------------------------------------- +%% Include files +%%---------------------------------------------------------------------- +-include("CosEventChannelAdmin.hrl"). +-include("CosEventComm.hrl"). +-include("cosEventApp.hrl"). + +%%---------------------------------------------------------------------- +%% External exports +%%---------------------------------------------------------------------- +%% Mandatory +-export([init/1, + terminate/2, + code_change/3, + handle_info/2]). + +%% Exports from "CosEventChannelAdmin::ProxyPushConsumer" +-export([connect_push_supplier/3]). + +%% Exports from "CosEventComm::PushConsumer" +-export([push/3, + disconnect_push_consumer/2]). + +%%---------------------------------------------------------------------- +%% Internal exports +%%---------------------------------------------------------------------- + +%%---------------------------------------------------------------------- +%% Records +%%---------------------------------------------------------------------- +-record(state, {admin, admin_pid, channel, client, typecheck}). + +%%---------------------------------------------------------------------- +%% Macros +%%---------------------------------------------------------------------- + +%%====================================================================== +%% External functions +%%====================================================================== +%%---------------------------------------------------------------------- +%% Function : init/1 +%% Returns : {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%---------------------------------------------------------------------- +init([Admin, AdminPid, Channel, TypeCheck]) -> + process_flag(trap_exit, true), + {ok, #state{admin = Admin, admin_pid = AdminPid, channel = Channel, + typecheck = TypeCheck}}. + +%%---------------------------------------------------------------------- +%% Function : terminate/2 +%% Returns : any (ignored by gen_server) +%% Description: Shutdown the server +%%---------------------------------------------------------------------- +terminate(_Reason, #state{client = undefined}) -> + ?DBG("Terminating ~p; no client connected.~n", [_Reason]), + ok; +terminate(_Reason, #state{client = Client} = _State) -> + ?DBG("Terminating ~p~n", [_Reason]), + cosEventApp:disconnect('CosEventComm_PushSupplier', + disconnect_push_supplier, Client), + ok. + +%%---------------------------------------------------------------------- +%% Function : code_change/3 +%% Returns : {ok, NewState} +%% Description: Convert process state when code is changed +%%---------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------% +%% function : handle_info +%% Arguments: +%% Returns : {noreply, State} | +%% {stop, Reason, State} +%% Effect : If the Parnet Admin or the Channel terminates so must this object. +%%---------------------------------------------------------------------- +handle_info({'EXIT', Pid, Reason}, #state{admin_pid = Pid} = State) -> + ?DBG("Parent Admin terminated ~p~n", [Reason]), + orber:dbg("[~p] CosEventChannelAdmin_ProxyPushConsumer:handle_info(~p);~n" + "My Admin terminated and so will I.", + [?LINE, Reason], ?DEBUG_LEVEL), + {stop, Reason, State}; +handle_info(_Info, State) -> + ?DBG("Unknown Info ~p~n", [_Info]), + {noreply, State}. + +%%---------------------------------------------------------------------- +%% Function : connect_push_supplier +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +connect_push_supplier(_OE_This, #state{client = undefined, + typecheck = TypeCheck} = State, NewClient) -> + case corba_object:is_nil(NewClient) of + true -> + ?DBG("A NIL client supplied.~n", []), + {reply, ok, State}; + false -> + cosEventApp:type_check(NewClient, 'CosEventComm_PushSupplier', TypeCheck), + ?DBG("Connected to client.~n", []), + {reply, ok, State#state{client = NewClient}} + end; +connect_push_supplier(_, _, _) -> + corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}). + + +%%---------------------------------------------------------------------- +%% Function : push +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +push(_OE_This, State, Any) -> + %% We should not use corba:reply here since if we block incoming + %% events this will prevent producers to flood the system. + ?DBG("Received Event ~p and forwarded it successfully.~n", [Any]), + 'oe_CosEventComm_Channel':send_sync(State#state.channel, Any), + {reply, ok, State}. + +%%---------------------------------------------------------------------- +%% Function : disconnect_push_consumer +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +disconnect_push_consumer(_OE_This, State) -> + ?DBG("Disconnect invoked ~p~n", [State]), + {stop, normal, ok, State#state{client = undefined}}. + +%%====================================================================== +%% Internal functions +%%====================================================================== + +%%====================================================================== +%% END OF MODULE +%%====================================================================== diff --git a/lib/cosEvent/src/CosEventChannelAdmin_SupplierAdmin_impl.erl b/lib/cosEvent/src/CosEventChannelAdmin_SupplierAdmin_impl.erl new file mode 100644 index 0000000000..c7cf0bd869 --- /dev/null +++ b/lib/cosEvent/src/CosEventChannelAdmin_SupplierAdmin_impl.erl @@ -0,0 +1,159 @@ +%%-------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1997-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%---------------------------------------------------------------------- +%% File : CosEventChannelAdmin_SupplierAdmin_impl.erl +%% Created : 21 Mar 2001 +%% Description : +%% +%%---------------------------------------------------------------------- +-module('CosEventChannelAdmin_SupplierAdmin_impl'). + + +%%---------------------------------------------------------------------- +%% Include files +%%---------------------------------------------------------------------- +-include("cosEventApp.hrl"). + + +%%---------------------------------------------------------------------- +%% External exports +%%---------------------------------------------------------------------- +%% Mandatory +-export([init/1, + terminate/2, + code_change/3, + handle_info/2]). + +%% Exports from "CosEventChannelAdmin::SupplierAdmin" +-export([obtain_push_consumer/2, + obtain_pull_consumer/2]). + +%%---------------------------------------------------------------------- +%% Internal exports +%%---------------------------------------------------------------------- + +%%---------------------------------------------------------------------- +%% Records +%%---------------------------------------------------------------------- +-record(state, {channel, channel_pid, typecheck, pull_interval, server_options}). + +%%---------------------------------------------------------------------- +%% Macros +%%---------------------------------------------------------------------- + +%%====================================================================== +%% External functions +%%====================================================================== +%%---------------------------------------------------------------------- +%% Function : init/1 +%% Returns : {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%---------------------------------------------------------------------- +init([Channel, ChannelPid, TypeCheck, PullInterval, ServerOpts]) -> + process_flag(trap_exit, true), + {ok, #state{channel = Channel, channel_pid = ChannelPid, typecheck = TypeCheck, + pull_interval = PullInterval, server_options = ServerOpts}}. + +%%---------------------------------------------------------------------- +%% Function : terminate/2 +%% Returns : any (ignored by gen_server) +%% Description: Shutdown the server +%%---------------------------------------------------------------------- +terminate(_Reason, _State) -> + ?DBG("Terminating ~p~n", [_Reason]), + ok. + +%%---------------------------------------------------------------------- +%% Function : code_change/3 +%% Returns : {ok, NewState} +%% Description: Convert process state when code is changed +%%---------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------% +%% function : handle_info +%% Arguments: +%% Returns : {noreply, State} | +%% {stop, Reason, State} +%% Effect : Functions demanded by the gen_server module. +%%---------------------------------------------------------------------- +handle_info({'EXIT', Pid, Reason}, #state{channel_pid = Pid} = State) -> + ?DBG("Parent Channel terminated ~p~n", [Reason]), + orber:dbg("[~p] CosEventChannelAdmin_SupplierAdmin:handle_info(~p);~n" + "My Channel terminated and so will I.", + [?LINE, Reason], ?DEBUG_LEVEL), + {stop, Reason, State}; +handle_info(_Info, State) -> + ?DBG("Unknown Info ~p~n", [_Info]), + {noreply, State}. + + +%%---------------------------------------------------------------------- +%% Function : obtain_push_consumer +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +obtain_push_consumer(OE_This, #state{channel = Channel, + channel_pid = _ChannelPid, + typecheck = TypeCheck, + server_options = ServerOpts} = State) -> + ?DBG("Starting a new CosEventChannelAdmin_ProxyPushConsumer.~n", []), + {reply, + 'CosEventChannelAdmin_ProxyPushConsumer':oe_create_link([OE_This, + self(), + Channel, + TypeCheck], + ServerOpts), + State}. + +%%---------------------------------------------------------------------- +%% Function : obtain_pull_consumer +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +obtain_pull_consumer(OE_This, #state{channel = Channel, + channel_pid = _ChannelPid, + typecheck = TypeCheck, + pull_interval= PullInterval, + server_options = ServerOpts} = State) -> + ?DBG("Starting a new CosEventChannelAdmin_ProxyPullConsumer.~n", []), + {reply, + 'CosEventChannelAdmin_ProxyPullConsumer':oe_create_link([OE_This, + self(), + Channel, + TypeCheck, + PullInterval], + ServerOpts), + State}. + +%%====================================================================== +%% Internal functions +%%====================================================================== + +%%====================================================================== +%% END OF MODULE +%%====================================================================== diff --git a/lib/cosEvent/src/CosEventComm.idl b/lib/cosEvent/src/CosEventComm.idl new file mode 100644 index 0000000000..bb0c107394 --- /dev/null +++ b/lib/cosEvent/src/CosEventComm.idl @@ -0,0 +1,37 @@ + +#ifndef _COSEVENTCOMM_IDL +#define _COSEVENTCOMM_IDL + +#pragma prefix "omg.org" + +module CosEventComm +{ + exception Disconnected{}; + + interface PushConsumer + { + void push(in any data) raises (Disconnected); + void disconnect_push_consumer(); + }; + + + interface PushSupplier + { + void disconnect_push_supplier(); + }; + + interface PullSupplier + { + any pull() raises(Disconnected); + any try_pull(out boolean has_event) raises(Disconnected); + void disconnect_pull_supplier(); + }; + + interface PullConsumer + { + void disconnect_pull_consumer(); + }; +}; + +#endif + diff --git a/lib/cosEvent/src/Makefile b/lib/cosEvent/src/Makefile new file mode 100644 index 0000000000..a62d47ce74 --- /dev/null +++ b/lib/cosEvent/src/Makefile @@ -0,0 +1,214 @@ +# +# %CopyrightBegin% +# +# Copyright Ericsson AB 1997-2009. All Rights Reserved. +# +# The contents of this file are subject to the Erlang Public License, +# Version 1.1, (the "License"); you may not use this file except in +# compliance with the License. You should have received a copy of the +# Erlang Public License along with this software. If not, it can be +# retrieved online at http://www.erlang.org/. +# +# Software distributed under the License is distributed on an "AS IS" +# basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +# the License for the specific language governing rights and limitations +# under the License. +# +# %CopyrightEnd% +# +# +include $(ERL_TOP)/make/target.mk +ifeq ($(TYPE),debug) +ERL_COMPILE_FLAGS += -Ddebug -W +endif +EBIN=../ebin +include $(ERL_TOP)/make/$(TARGET)/otp.mk + +# ---------------------------------------------------- +# Application version +# ---------------------------------------------------- +include ../vsn.mk +VSN=$(COSEVENT_VSN) + +# ---------------------------------------------------- +# Release directory specification +# ---------------------------------------------------- +RELSYSDIR = $(RELEASE_PATH)/lib/cosEvent-$(VSN) +# ---------------------------------------------------- +# Target Specs +# ---------------------------------------------------- + +MODULES = \ + CosEventChannelAdmin_ProxyPullConsumer_impl \ + CosEventChannelAdmin_ProxyPushConsumer_impl \ + CosEventChannelAdmin_SupplierAdmin_impl \ + oe_CosEventComm_CAdmin_impl \ + oe_CosEventComm_Channel_impl \ + oe_CosEventComm_PullerS_impl \ + oe_CosEventComm_PusherS_impl \ + cosEventApp + + + +ERL_FILES = $(MODULES:%=%.erl) +HRL_FILES = cosEventApp.hrl + + +GEN_ERL_FILES1 = \ + oe_CosEventChannelAdmin.erl \ + CosEventChannelAdmin_ConsumerAdmin.erl \ + CosEventChannelAdmin_EventChannel.erl \ + CosEventChannelAdmin_ProxyPullConsumer.erl \ + CosEventChannelAdmin_ProxyPullSupplier.erl \ + CosEventChannelAdmin_ProxyPushConsumer.erl \ + CosEventChannelAdmin_ProxyPushSupplier.erl \ + CosEventChannelAdmin_SupplierAdmin.erl \ + CosEventChannelAdmin_AlreadyConnected.erl \ + CosEventChannelAdmin_TypeError.erl + +GEN_ERL_FILES2 = \ + oe_CosEventComm_CAdmin.erl \ + oe_CosEventComm_Channel.erl \ + oe_CosEventComm_Event.erl \ + oe_CosEventComm_PullerS.erl \ + oe_CosEventComm_PusherS.erl \ + oe_cosEventApp.erl + +GEN_ERL_FILES3 = \ + oe_CosEventComm.erl \ + CosEventComm_Disconnected.erl \ + CosEventComm_PullConsumer.erl \ + CosEventComm_PullSupplier.erl \ + CosEventComm_PushConsumer.erl \ + CosEventComm_PushSupplier.erl + +GEN_ERL_FILES = \ + $(GEN_ERL_FILES1) $(GEN_ERL_FILES2) $(GEN_ERL_FILES3) + +EXTERNAL_INC_PATH = ../include + +GEN_HRL_FILES1 = \ + oe_CosEventChannelAdmin.hrl \ + CosEventChannelAdmin.hrl \ + CosEventChannelAdmin_ConsumerAdmin.hrl \ + CosEventChannelAdmin_EventChannel.hrl \ + CosEventChannelAdmin_ProxyPullConsumer.hrl \ + CosEventChannelAdmin_ProxyPullSupplier.hrl \ + CosEventChannelAdmin_ProxyPushConsumer.hrl \ + CosEventChannelAdmin_ProxyPushSupplier.hrl \ + CosEventChannelAdmin_SupplierAdmin.hrl + +EXTERNAL_GEN_HRL_FILES1 = $(GEN_HRL_FILES1:%=$(EXTERNAL_INC_PATH)/%) + +GEN_HRL_FILES2 = \ + oe_CosEventComm_PullerS.hrl \ + oe_CosEventComm_CAdmin.hrl \ + oe_CosEventComm_PusherS.hrl \ + oe_CosEventComm_Channel.hrl \ + oe_cosEventApp.hrl \ + oe_CosEventComm_Event.hrl + +GEN_HRL_FILES3 = \ + oe_CosEventComm.hrl \ + CosEventComm.hrl \ + CosEventComm_PullConsumer.hrl \ + CosEventComm_PullSupplier.hrl \ + CosEventComm_PushConsumer.hrl \ + CosEventComm_PushSupplier.hrl + +EXTERNAL_GEN_HRL_FILES3 = $(GEN_HRL_FILES3:%=$(EXTERNAL_INC_PATH)/%) + +GEN_HRL_FILES = \ + $(EXTERNAL_GEN_HRL_FILES1) $(GEN_HRL_FILES2) $(EXTERNAL_GEN_HRL_FILES3) + +TARGET_FILES = \ + $(GEN_ERL_FILES:%.erl=$(EBIN)/%.$(EMULATOR)) \ + $(MODULES:%=$(EBIN)/%.$(EMULATOR)) + +GEN_FILES = $(GEN_HRL_FILES) $(GEN_ERL_FILES) + +IDL_FILES = \ + CosEventChannelAdmin.idl \ + CosEventComm.idl \ + cosEventApp.idl + +APPUP_FILE = cosEvent.appup +APPUP_SRC = $(APPUP_FILE).src +APPUP_TARGET = $(EBIN)/$(APPUP_FILE) + +APP_FILE = cosEvent.app +APP_SRC = $(APP_FILE).src +APP_TARGET = $(EBIN)/$(APP_FILE) + +# ---------------------------------------------------- +# FLAGS +# ---------------------------------------------------- +ERL_IDL_FLAGS += -pa $(ERL_TOP)/lib/cosEvent/ebin -pa $(ERL_TOP)/lib/ic/ebin +# The -pa option is just used temporary until erlc can handle +# includes from other directories than ../include . +ERL_COMPILE_FLAGS += \ + $(ERL_IDL_FLAGS) \ + -I$(ERL_TOP)/lib/orber/include \ + -I$(ERL_TOP)/lib/cosEvent/include \ + +'{parse_transform,sys_pre_attributes}' \ + +'{attribute,insert,app_vsn,"cosEvent_$(COSEVENT_VSN)"}' + +YRL_FLAGS = + +# ---------------------------------------------------- +# Targets +# ---------------------------------------------------- +opt: $(TARGET_FILES) $(APP_TARGET) $(APPUP_TARGET) + +debug: + @${MAKE} TYPE=debug opt + +clean: + rm -f $(TARGET_FILES) $(GEN_FILES) $(APP_TARGET) $(APPUP_TARGET) + rm -f errs core *~ + +$(APP_TARGET): $(APP_SRC) + sed -e 's;%VSN%;$(VSN);' $(APP_SRC) > $(APP_TARGET) +$(APPUP_TARGET): $(APPUP_SRC) + sed -e 's;%VSN%;$(VSN);' $(APPUP_SRC) > $(APPUP_TARGET) + +docs: + +# ---------------------------------------------------- +# Special Build Targets +# ---------------------------------------------------- +$(GEN_ERL_FILES1) $(EXTERNAL_GEN_HRL_FILES1): CosEventChannelAdmin.idl + erlc $(ERL_IDL_FLAGS) +'{cfgfile,"CosEventChannelAdmin.cfg"}' CosEventChannelAdmin.idl + mv $(GEN_HRL_FILES1) $(EXTERNAL_INC_PATH) + +$(GEN_ERL_FILES2) $(GEN_HRL_FILES2): cosEventApp.idl + erlc $(ERL_IDL_FLAGS) +'{cfgfile,"cosEventApp.cfg"}' cosEventApp.idl + +$(GEN_ERL_FILES3) $(EXTERNAL_GEN_HRL_FILES3): CosEventComm.idl + erlc $(ERL_IDL_FLAGS) CosEventComm.idl + mv $(GEN_HRL_FILES3) $(EXTERNAL_INC_PATH) + +# ---------------------------------------------------- +# Release Target +# ---------------------------------------------------- +include $(ERL_TOP)/make/otp_release_targets.mk + + +release_spec: opt + $(INSTALL_DIR) $(RELSYSDIR) + $(INSTALL_DATA) ../info $(RELSYSDIR) + $(INSTALL_DIR) $(RELSYSDIR)/ebin + $(INSTALL_DATA) $(TARGET_FILES) $(APP_TARGET) $(APPUP_TARGET) $(RELSYSDIR)/ebin + $(INSTALL_DIR) $(RELSYSDIR)/src + $(INSTALL_DATA) $(ERL_FILES) $(HRL_FILES) $(GEN_ERL_FILES) $(IDL_FILES) $(RELSYSDIR)/src + $(INSTALL_DIR) $(RELSYSDIR)/include + $(INSTALL_DATA) $(GEN_HRL_FILES) $(RELSYSDIR)/include + + +release_docs_spec: + + + + + + diff --git a/lib/cosEvent/src/cosEvent.app.src b/lib/cosEvent/src/cosEvent.app.src new file mode 100644 index 0000000000..c1cb9e0cc9 --- /dev/null +++ b/lib/cosEvent/src/cosEvent.app.src @@ -0,0 +1,45 @@ +{application, cosEvent, + [{description, "The Erlang CosEvent application"}, + {vsn, "%VSN%"}, + {modules, + [ + 'CosEventChannelAdmin_ProxyPullConsumer_impl', + 'CosEventChannelAdmin_ProxyPushConsumer_impl', + 'CosEventChannelAdmin_SupplierAdmin_impl', + 'oe_CosEventComm_CAdmin_impl', + 'oe_CosEventComm_Channel_impl', + 'oe_CosEventComm_PullerS_impl', + 'oe_CosEventComm_PusherS_impl', + 'cosEventApp', + 'oe_CosEventChannelAdmin', + 'CosEventChannelAdmin_AlreadyConnected', + 'CosEventChannelAdmin_ConsumerAdmin', + 'CosEventChannelAdmin_EventChannel', + 'CosEventChannelAdmin_ProxyPullConsumer', + 'CosEventChannelAdmin_ProxyPullSupplier', + 'CosEventChannelAdmin_ProxyPushConsumer', + 'CosEventChannelAdmin_ProxyPushSupplier', + 'CosEventChannelAdmin_SupplierAdmin', + 'CosEventChannelAdmin_TypeError', + 'oe_CosEventComm_CAdmin', + 'oe_CosEventComm_Channel', + 'oe_CosEventComm_Event', + 'oe_CosEventComm_PullerS', + 'oe_CosEventComm_PusherS', + 'oe_cosEventApp', + 'oe_CosEventComm', + 'CosEventComm_PushSupplier', + 'CosEventComm_PushConsumer', + 'CosEventComm_PullSupplier', + 'CosEventComm_PullConsumer', + 'CosEventComm_Disconnected' + ] + }, + {registered, []}, + {applications, [orber, stdlib, kernel]}, + {env, []}, + {mod, {cosEventApp, []}} +]}. + + + diff --git a/lib/cosEvent/src/cosEvent.appup.src b/lib/cosEvent/src/cosEvent.appup.src new file mode 100644 index 0000000000..d69b2ef20c --- /dev/null +++ b/lib/cosEvent/src/cosEvent.appup.src @@ -0,0 +1,6 @@ +{"%VSN%", + [ + ], + [ + ] +} diff --git a/lib/cosEvent/src/cosEventApp.cfg b/lib/cosEvent/src/cosEventApp.cfg new file mode 100644 index 0000000000..bbacd134f7 --- /dev/null +++ b/lib/cosEvent/src/cosEventApp.cfg @@ -0,0 +1,15 @@ +{this, "oe_CosEventComm::Event"}. +{from, "oe_CosEventComm::Event"}. +{{handle_info, "oe_CosEventComm::Event"}, true}. +{this, "oe_CosEventComm::Channel"}. +{from, "oe_CosEventComm::Channel"}. +{{handle_info, "oe_CosEventComm::Channel"}, true}. +{this, "oe_CosEventComm::CAdmin"}. +{from, "oe_CosEventComm::CAdmin"}. +{{handle_info, "oe_CosEventComm::CAdmin"}, true}. +{this, "oe_CosEventComm::PullerS"}. +{from, "oe_CosEventComm::PullerS"}. +{{handle_info, "oe_CosEventComm::PullerS"}, true}. +{this, "oe_CosEventComm::PusherS"}. +{from, "oe_CosEventComm::PusherS"}. +{{handle_info, "oe_CosEventComm::PusherS"}, true}. diff --git a/lib/cosEvent/src/cosEventApp.erl b/lib/cosEvent/src/cosEventApp.erl new file mode 100644 index 0000000000..084490f845 --- /dev/null +++ b/lib/cosEvent/src/cosEventApp.erl @@ -0,0 +1,290 @@ +%%-------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2001-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%---------------------------------------------------------------------- +%% File : cosEventApp.erl +%% Description : +%% +%%---------------------------------------------------------------------- +-module(cosEventApp). + +%%--------------- INCLUDES ----------------------------------- +-include_lib("orber/include/corba.hrl"). +-include("cosEventApp.hrl"). + + +%%--------------- EXPORTS------------------------------------- +%% cosEvent API external +-export([start/0, stop/0, install/0, uninstall/0, start_channel/0, start_channel/1, + start_channel_link/0, start_channel_link/1, stop_channel/1]). + +%% cosEvent API internal +-export([create_link/3, get_option/2, type_check/3, disconnect/3, do_disconnect/3]). + +%% Application callbacks +-export([start/2, init/1, stop/1]). + +%%--------------- DEFINES ------------------------------------ +-define(IDL_MODULES, ['oe_CosEventComm', + 'oe_CosEventChannelAdmin', + 'oe_cosEventApp']). + +-define(SUPERVISOR_NAME, oe_cosEventSup). +-define(SUP_FLAG, {simple_one_for_one,50,10}). + +-define(SUP_SPEC(Name, Args), + ['CosEventChannel_EventChannel',Args, + [{sup_child, true}, {regname, {global, Name}}]]). +-define(SUP_CHILD, + {"oe_EventChild", + {cosEventApp,create_link, []}, + transient,100000,worker, + ['CosEventChannel_EventChannel']}). + + +%%-----------------------------------------------------------% +%% function : install +%% Arguments: - +%% Returns : ok | EXIT | EXCEPTION +%% Effect : Install necessary data in the IFR DB +%%------------------------------------------------------------ +install() -> + install_loop(?IDL_MODULES, []). + +install_loop([], _) -> + ok; +install_loop([H|T], Accum) -> + case catch H:'oe_register'() of + {'EXIT',{unregistered,App}} -> + ?write_ErrorMsg("Unable to register '~p'; application ~p not registered. +Trying to unregister ~p~n", [H,App,Accum]), + uninstall_loop(Accum, {exit, register}); + {'EXCEPTION',_} -> + ?write_ErrorMsg("Unable to register '~p'; propably already registered. +You are adviced to confirm this. +Trying to unregister ~p~n", [H,Accum]), + uninstall_loop(Accum, {exit, register}); + ok -> + install_loop(T, [H|Accum]); + _ -> + ?write_ErrorMsg("Unable to register '~p'; reason unknown. +Trying to unregister ~p~n", [H,Accum]), + uninstall_loop(Accum, {exit, register}) + end. + +%%-----------------------------------------------------------% +%% function : uninstall +%% Arguments: - +%% Returns : ok | EXIT | EXCEPTION +%% Effect : Remove data related to cosEvent from the IFR DB +%%------------------------------------------------------------ +uninstall() -> + uninstall_loop(lists:reverse(?IDL_MODULES), ok). + +uninstall_loop([],ok) -> + ok; +uninstall_loop([],{exit, register}) -> + exit({?MODULE, "oe_register failed"}); +uninstall_loop([],{exit, unregister}) -> + exit({?MODULE, "oe_unregister failed"}); +uninstall_loop([],{exit, both}) -> + exit({?MODULE, "oe_register and, for some of those already registered, oe_unregister failed"}); +uninstall_loop([H|T], Status) -> + case catch H:'oe_unregister'() of + ok -> + uninstall_loop(T, Status); + _ when Status == ok -> + ?write_ErrorMsg("Unable to unregister '~p'; propably already unregistered. +You are adviced to confirm this.~n",[H]), + uninstall_loop(T, {exit, unregister}); + _ -> + ?write_ErrorMsg("Unable to unregister '~p'; propably already unregistered. +You are adviced to confirm this.~n",[H]), + uninstall_loop(T, {exit, both}) + end. + +%%-----------------------------------------------------------% +%% function : start/stop +%% Arguments: +%% Returns : +%% Effect : Starts or stops the cosTime application. +%%------------------------------------------------------------ + +start() -> + application:start(cosEvent). +stop() -> + application:stop(cosEvent). + +%%-----------------------------------------------------------% +%% function : start +%% Arguments: Type - see module application +%% Arg - see module application +%% Returns : +%% Effect : Module callback for application +%%------------------------------------------------------------ + +start(_, _) -> + supervisor:start_link({local, ?SUPERVISOR_NAME}, cosEventApp, app_init). + + +%%-----------------------------------------------------------% +%% function : stop +%% Arguments: Arg - see module application +%% Returns : +%% Effect : Module callback for application +%%------------------------------------------------------------ + +stop(_) -> + ok. + +%%-----------------------------------------------------------% +%% function : start_channel +%% Arguments: - +%% Returns : +%% Effect : +%%------------------------------------------------------------ +start_channel() -> + start_channel(?DEFAULT_OPTIONS). + +start_channel(Options) when is_list(Options) -> + ServerOpts = get_option(?SERVER, Options), + 'oe_CosEventComm_Channel':oe_create([Options, ServerOpts], ServerOpts); +start_channel(Options) -> + orber:dbg("[~p] cosEventApp:start_channel(~p);~n" + "Options not correct.", [?LINE, Options], ?DEBUG_LEVEL), + corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}). + +%%-----------------------------------------------------------% +%% function : start_channel +%% Arguments: - +%% Returns : +%% Effect : +%%------------------------------------------------------------ +start_channel_link() -> + start_channel_link(?DEFAULT_OPTIONS). + +start_channel_link(Options) when is_list(Options) -> + ServerOpts = get_option(?SERVER, Options), + 'oe_CosEventComm_Channel':oe_create_link([Options, ServerOpts], ServerOpts); +start_channel_link(Options) -> + orber:dbg("[~p] cosEventApp:start_channel_link(~p);~n" + "Options not correct.", [?LINE, Options], ?DEBUG_LEVEL), + corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}). + +%%-----------------------------------------------------------% +%% function : stop_factory +%% Arguments: ChannelObj +%% Returns : +%% Effect : +%%------------------------------------------------------------ +stop_channel(ChannelObj) -> + corba:dispose(ChannelObj). + +%%-----------------------------------------------------------% +%% function : init +%% Arguments: +%% Returns : +%% Effect : +%%------------------------------------------------------------ + +%% Starting using create_factory/X +init(own_init) -> + {ok,{?SUP_FLAG, [?SUP_CHILD]}}; +%% When starting as an application. +init(app_init) -> + {ok,{?SUP_FLAG, [?SUP_CHILD]}}. + +%%-----------------------------------------------------------% +%% function : create_link +%% Arguments: Module - which Module to call +%% Env/ArgList - ordinary oe_create arguments. +%% Returns : +%% Exception: +%% Effect : Necessary since we want the supervisor to be a +%% 'simple_one_for_one'. Otherwise, using for example, +%% 'one_for_one', we have to call supervisor:delete_child +%% to remove the childs startspecification from the +%% supervisors internal state. +%%------------------------------------------------------------ +create_link(Module, Env, ArgList) -> + Module:oe_create_link(Env, ArgList). + + +%%-----------------------------------------------------------% +%% function : get_option +%% Arguments: +%% Returns : +%% Exception: +%% Effect : +%%------------------------------------------------------------ +get_option(Key, OptionList) -> + case lists:keysearch(Key, 1, OptionList) of + {value,{Key,Value}} -> + Value; + _ -> + case lists:keysearch(Key, 1, ?DEFAULT_OPTIONS) of + {value,{Key,Value}} -> + Value; + _-> + {error, "Invalid option"} + end + end. + +%%-----------------------------------------------------------% +%% function : type_check +%% Arguments: Obj - objectrefernce to test. +%% Mod - Module which contains typeID/0. +%% Returns : 'ok' or raises exception. +%% Effect : +%%------------------------------------------------------------ +type_check(_Obj, _Mod, false) -> + ok; +type_check(Obj, Mod, _) -> + case catch corba_object:is_a(Obj, Mod:typeID()) of + true -> + ok; + _ -> + orber:dbg("[~p] cosEventApp:type_check(~p) failed; Should be ~p", + [?LINE, Obj, Mod], ?DEBUG_LEVEL), + corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}) + end. + +%%-----------------------------------------------------------% +%% function : disconnect +%% Arguments: Module - one of the interfaces defined in CosEventComm. +%% Function - the appropriate disconnect function. +%% Object - the client object reference. +%% Returns : ok +%% Exception: +%% Effect : If the process would try to diconnect itself it could +%% result in a deadlock. Hence, we spawn a new process to do it. +%%------------------------------------------------------------ +disconnect(Module, Function, Object) -> + spawn(cosEventApp, do_disconnect, [Module, Function, Object]), + ok. + +do_disconnect(Module, Function, Object) -> + catch Module:Function(Object), + ?DBG("Disconnect ~p:~p(..).~n", [Module, Function]), + ok. + +%%--------------- END OF MODULE ------------------------------ + + diff --git a/lib/cosEvent/src/cosEventApp.hrl b/lib/cosEvent/src/cosEventApp.hrl new file mode 100644 index 0000000000..ef72277bd6 --- /dev/null +++ b/lib/cosEvent/src/cosEventApp.hrl @@ -0,0 +1,62 @@ +%%---------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2001-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%---------------------------------------------------------------------- +%% File : cosEventApp.hrl +%% Description : +%% +%%---------------------------------------------------------------------- + +%%--------------- INCLUDES ----------------------------------- +%% External +-include_lib("orber/include/corba.hrl"). +-include_lib("orber/include/ifr_types.hrl"). + +-define(write_ErrorMsg(Txt, Arg), +error_logger:error_msg("================ CosEvent =================~n" + Txt + "===========================================~n", + Arg)). + + +-define(PULL_INTERVAL, pull_interval). +-define(TYPECHECK, typecheck). +-define(MAXEVENTS, maxEvents). +-define(BLOCKING, blocking). +-define(SERVER, server_options). +-define(DEFAULT_OPTIONS, [{?PULL_INTERVAL, 20}, + {?BLOCKING, true}, + {?TYPECHECK, false}, + {?MAXEVENTS, 300}, + {?SERVER, []}]). + +-define(DEBUG_LEVEL, 3). + +-ifdef(debug). +-define(DBG(F,A), + io:format("[~p (~p)] "++F,[?MODULE, ?LINE]++A)). +-else. +-define(DBG(F,A), ok). +-endif. + + + + +%%--------------- END OF MODULE ---------------------------------------- diff --git a/lib/cosEvent/src/cosEventApp.idl b/lib/cosEvent/src/cosEventApp.idl new file mode 100644 index 0000000000..e5a134685f --- /dev/null +++ b/lib/cosEvent/src/cosEventApp.idl @@ -0,0 +1,26 @@ +#ifndef _COS_EVENT_APP_IDL_ +#define _COS_EVENT_APP_IDL_ + +#include<CosEventChannelAdmin.idl> + + +module oe_CosEventComm { + + + interface Event { + oneway void send(in any event); + void send_sync(in any event); + }; + + interface Channel : CosEventChannelAdmin::EventChannel, Event {}; + + interface CAdmin : CosEventChannelAdmin::ConsumerAdmin, Event {}; + + interface PullerS : CosEventChannelAdmin::ProxyPullSupplier, Event {}; + + interface PusherS : CosEventChannelAdmin::ProxyPushSupplier, Event {}; + +}; + + +#endif diff --git a/lib/cosEvent/src/oe_CosEventComm_CAdmin_impl.erl b/lib/cosEvent/src/oe_CosEventComm_CAdmin_impl.erl new file mode 100644 index 0000000000..976c6dbab5 --- /dev/null +++ b/lib/cosEvent/src/oe_CosEventComm_CAdmin_impl.erl @@ -0,0 +1,233 @@ +%%-------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2001-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%---------------------------------------------------------------------- +%% File : oe_CosEventComm_CAdmin_impl.erl +%% Description : +%% +%%---------------------------------------------------------------------- +-module(oe_CosEventComm_CAdmin_impl). + +%%---------------------------------------------------------------------- +%% Include files +%%---------------------------------------------------------------------- +-include_lib("orber/include/corba.hrl"). +-include("cosEventApp.hrl"). + +%%---------------------------------------------------------------------- +%% External exports +%%---------------------------------------------------------------------- +-export([init/1, + terminate/2, + code_change/3, + handle_info/2]). + +%% Exports from "CosEventChannelAdmin::ConsumerAdmin" +-export([obtain_push_supplier/3, + obtain_pull_supplier/3]). + + +%%---------------------------------------------------------------------- +%% Internal exports +%%---------------------------------------------------------------------- +%% Exports from "oe_CosEventComm::Event" +-export([send/3, send_sync/4]). + +%%---------------------------------------------------------------------- +%% Records +%%---------------------------------------------------------------------- +-record(state, {channel_pid, typecheck, maxevents, proxies = [], + server_options}). + +%%---------------------------------------------------------------------- +%% Macros +%%---------------------------------------------------------------------- + + +%%====================================================================== +%% External functions +%%====================================================================== +%%---------------------------------------------------------------------- +%% Function : init/1 +%% Returns : {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%---------------------------------------------------------------------- +init([ChannelPid, TypeCheck, MaxEvents, ServerOpts]) -> + process_flag(trap_exit, true), + {ok, #state{channel_pid = ChannelPid, typecheck = TypeCheck, + maxevents = MaxEvents, server_options = ServerOpts}}. + +%%---------------------------------------------------------------------- +%% Function : terminate/2 +%% Returns : any (ignored by gen_server) +%% Description: Shutdown the server +%%---------------------------------------------------------------------- +terminate(_Reason, _State) -> + ?DBG("Terminating ~p~n", [_Reason]), + ok. + +%%---------------------------------------------------------------------- +%% Function : code_change/3 +%% Returns : {ok, NewState} +%% Description: Convert process state when code is changed +%%---------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------% +%% function : handle_info +%% Arguments: +%% Returns : {noreply, State} | +%% {stop, Reason, State} +%% Effect : Functions demanded by the gen_server module. +%%---------------------------------------------------------------------- +handle_info({'EXIT', Pid, Reason}, #state{channel_pid = Pid} = State) -> + ?DBG("Parent Channel terminated ~p~n", [Reason]), + orber:dbg("[~p] oe_CosEventComm_PullerS_impl:handle_info(~p);~n" + "My Channel terminated and so will I which will cause" + " my children to do the same thing.", + [?LINE, Reason], ?DEBUG_LEVEL), + {stop, Reason, State}; +handle_info({'EXIT', Pid, _Reason}, #state{proxies = Proxies} = State) -> + %% A child terminated which is normal. Hence, no logging. + ?DBG("Probably a child terminated ~p~n", [_Reason]), + {noreply, State#state{proxies = lists:keydelete(Pid, 2, Proxies)}}; +handle_info(_Info, State) -> + ?DBG("Unknown Info ~p~n", [_Info]), + {noreply, State}. + +%%---------------------------------------------------------------------- +%% Function : obtain_push_supplier +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +obtain_push_supplier(_, _, #state{server_options = ServerOpts} = State) -> + case catch 'oe_CosEventComm_PusherS':oe_create_link([self(), + State#state.typecheck], + [{sup_child, true}|ServerOpts]) of + {ok, Pid, Proxy} -> + ?DBG("Started a new oe_CosEventComm_PusherS.~n", []), + {reply, Proxy, State#state{proxies = [{Proxy, Pid}|State#state.proxies]}}; + Other -> + orber:dbg("[~p] oe_CosEventComm_CAdmin:obtain_push_supplier();~nError: ~p", + [?LINE, Other], ?DEBUG_LEVEL), + corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}) + end. + +%%---------------------------------------------------------------------- +%% Function : obtain_pull_supplier +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +obtain_pull_supplier(_, _, #state{server_options = ServerOpts} = State) -> + case catch 'oe_CosEventComm_PullerS':oe_create_link([self(), + State#state.typecheck, + State#state.maxevents], + [{sup_child, true}|ServerOpts]) of + {ok, Pid, Proxy} -> + ?DBG("Started a new oe_CosEventComm_PullerS.~n", []), + {reply, Proxy, State#state{proxies = [{Proxy, Pid}|State#state.proxies]}}; + Other -> + orber:dbg("[~p] oe_CosEventComm_CAdmin:obtain_pull_supplier();~nError: ~p", + [?LINE, Other], ?DEBUG_LEVEL), + corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}) + end. + + +%%---------------------------------------------------------------------- +%% Function : send +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +send(_, #state{proxies = Proxies} = State, Any) -> + ?DBG("Received Event ~p~n", [Any]), + case send_helper(Proxies, Any, [], false) of + ok -> + ?DBG("Received Event and forwarded it successfully.~n", []), + {noreply, State}; + {error, Dropped} -> + ?DBG("Received Event but forward failed to: ~p~n", [Dropped]), + RemainingProxies = delete_proxies(Dropped, Proxies), + {noreply, State#state{proxies = RemainingProxies}} + end. + +%%---------------------------------------------------------------------- +%% Function : send_sync +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +send_sync(_, OE_From, #state{proxies = Proxies} = State, Any) -> + ?DBG("Received Event ~p~n", [Any]), + corba:reply(OE_From, ok), + case send_helper(Proxies, Any, [], true) of + ok -> + ?DBG("Received Event and forwarded (sync) it successfully.~n", []), + {noreply, State}; + {error, Dropped} -> + ?DBG("Received Event but forward (sync) failed to: ~p~n", [Dropped]), + RemainingProxies = delete_proxies(Dropped, Proxies), + {noreply, State#state{proxies = RemainingProxies}} + end. + + +%%====================================================================== +%% Internal functions +%%====================================================================== +send_helper([], _, [], _) -> + ok; +send_helper([], _, Dropped, _) -> + {error, Dropped}; +send_helper([{ObjRef, Pid}|T], Event, Dropped, false) -> + case catch 'oe_CosEventComm_Event':send(ObjRef, Event) of + ok -> + send_helper(T, Event, Dropped, false); + What -> + orber:dbg("[~p] oe_CosEventComm_CAdmin:send_helper(~p, ~p);~n" + "Bad return value ~p. Closing connection.", + [?LINE, ObjRef, Event, What], ?DEBUG_LEVEL), + send_helper(T, Event, [{ObjRef, Pid}|Dropped], false) + end; +send_helper([{ObjRef, Pid}|T], Event, Dropped, Sync) -> + case catch 'oe_CosEventComm_Event':send_sync(ObjRef, Event) of + ok -> + send_helper(T, Event, Dropped, Sync); + What -> + orber:dbg("[~p] oe_CosEventComm_CAdmin:send_helper(~p, ~p);~n" + "Bad return value ~p. Closing connection.", + [?LINE, ObjRef, Event, What], ?DEBUG_LEVEL), + send_helper(T, Event, [{ObjRef, Pid}|Dropped], Sync) + end. + +delete_proxies([], RemainingProxies) -> + RemainingProxies; +delete_proxies([{_,Pid}|T], Proxies) -> + Rest = lists:keydelete(Pid, 2, Proxies), + delete_proxies(T, Rest). + +%%====================================================================== +%% END OF MODULE +%%====================================================================== diff --git a/lib/cosEvent/src/oe_CosEventComm_Channel_impl.erl b/lib/cosEvent/src/oe_CosEventComm_Channel_impl.erl new file mode 100644 index 0000000000..531edaa0af --- /dev/null +++ b/lib/cosEvent/src/oe_CosEventComm_Channel_impl.erl @@ -0,0 +1,246 @@ +%%-------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2001-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%---------------------------------------------------------------------- +%% File : oe_CosEventComm_Channel_impl.erl +%% Description : +%% +%%---------------------------------------------------------------------- +-module(oe_CosEventComm_Channel_impl). + +%%---------------------------------------------------------------------- +%% Include files +%%---------------------------------------------------------------------- +-include_lib("orber/include/corba.hrl"). +-include("cosEventApp.hrl"). + + +%%---------------------------------------------------------------------- +%% External exports +%%---------------------------------------------------------------------- +%% Mandatory +-export([init/1, + terminate/2, + code_change/3, + handle_info/2]). + +%% Exports from "CosEventChannelAdmin::EventChannel" +-export([for_consumers/3, + for_suppliers/3, + destroy/3]). + + +%%---------------------------------------------------------------------- +%% Internal exports +%%---------------------------------------------------------------------- +%% Exports from "oe_CosEventComm::Event" +-export([send/3, send_sync/4]). + +%%---------------------------------------------------------------------- +%% Records +%%---------------------------------------------------------------------- +-record(state, {typecheck, pull_interval, maxevents, blocking, cadmins = [], + server_options}). + +%%---------------------------------------------------------------------- +%% Macros +%%---------------------------------------------------------------------- + +%%====================================================================== +%% External functions +%%====================================================================== +%%---------------------------------------------------------------------- +%% Function : init/1 +%% Returns : {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%---------------------------------------------------------------------- +init([Options, ServerOpts]) -> + process_flag(trap_exit, true), + PullI = cosEventApp:get_option(?PULL_INTERVAL, Options), + TC = cosEventApp:get_option(?TYPECHECK, Options), + Max = cosEventApp:get_option(?MAXEVENTS, Options), + Blocking = cosEventApp:get_option(?BLOCKING, Options), + {ok, #state{typecheck = TC, pull_interval = PullI, maxevents = Max, + blocking = Blocking, server_options = ServerOpts}}. + +%%---------------------------------------------------------------------- +%% Function : terminate/2 +%% Returns : any (ignored by gen_server) +%% Description: Shutdown the server +%%---------------------------------------------------------------------- +terminate(_Reason, _State) -> + ?DBG("Terminating ~p~n", [_Reason]), + ok. + +%%---------------------------------------------------------------------- +%% Function : code_change/3 +%% Returns : {ok, NewState} +%% Description: Convert process state when code is changed +%%---------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------% +%% function : handle_info +%% Arguments: +%% Returns : {noreply, State} | +%% {stop, Reason, State} +%% Effect : Functions demanded by the gen_server module. +%%---------------------------------------------------------------------- +handle_info({'EXIT', Pid, _Reason}, #state{cadmins = CAdmins} = State) -> + ?DBG("Probably a child terminated with Reason: ~p~n", [_Reason]), + {noreply, State#state{cadmins = lists:keydelete(Pid, 2, CAdmins)}}; +handle_info(_Info, State) -> + ?DBG("Unknown Info ~p~n", [_Info]), + {noreply, State}. + + +%%---------------------------------------------------------------------- +%% Function : for_consumers +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +for_consumers(_, _, #state{server_options = ServerOpts} = State) -> + case catch 'oe_CosEventComm_CAdmin':oe_create_link([self(), + State#state.typecheck, + State#state.maxevents, + ServerOpts], + [{sup_child, true}|ServerOpts]) of + {ok, Pid, AdminCo} -> + ?DBG("Created a new oe_CosEventComm_CAdmin.~n", []), + {reply, AdminCo, + State#state{cadmins = [{AdminCo, Pid}|State#state.cadmins]}}; + Other -> + orber:dbg("[~p] oe_CosEventComm_Channel:for_consumers(); Error: ~p", + [?LINE, Other], ?DEBUG_LEVEL), + corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}) + end. + +%%---------------------------------------------------------------------- +%% Function : for_suppliers +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +for_suppliers(OE_This, _, #state{server_options = ServerOpts} = State) -> + case catch 'CosEventChannelAdmin_SupplierAdmin':oe_create_link([OE_This, self(), + State#state.typecheck, + State#state.pull_interval, + ServerOpts], + [{sup_child, true}|ServerOpts]) of + {ok, _Pid, AdminSu} -> + ?DBG("Created a new CosEventChannelAdmin_SupplierAdmin.~n", []), + {reply, AdminSu, State}; + Other -> + orber:dbg("[~p] oe_CosEventComm_Channel:for_suppliers();~nError: ~p", + [?LINE, Other], ?DEBUG_LEVEL), + corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}) + end. + +%%---------------------------------------------------------------------- +%% Function : destroy +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +destroy(_, _, State) -> + ?DBG("Destroy invoked.", []), + {stop, normal, ok, State}. + +%%---------------------------------------------------------------------- +%% Function : send +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +send(_OE_This, #state{cadmins = CAdmins} = State, Any) -> + ?DBG("Received Event ~p~n", [Any]), + case send_helper(CAdmins, Any, [], false) of + ok -> + ?DBG("Received Event and forwarded it successfully.~n", []), + {noreply, State}; + {error, Dropped} -> + ?DBG("Received Event but forward failed for: ~p~n", [Dropped]), + RemainingAdmins = delete_cadmin(Dropped, CAdmins), + {noreply, State#state{cadmins = RemainingAdmins}} + end. + +%%---------------------------------------------------------------------- +%% Function : send_sync +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +send_sync(_OE_This, OE_From, #state{cadmins = CAdmins, blocking = BL} = State, Any) -> + ?DBG("Received Event ~p~n", [Any]), + corba:reply(OE_From, ok), + case send_helper(CAdmins, Any, [], BL) of + ok -> + ?DBG("Received Event and forwarded (sync) it successfully.~n", []), + {reply, ok, State}; + {error, Dropped} -> + ?DBG("Received Event but forward (sync) failed for: ~p~n", [Dropped]), + RemainingAdmins = delete_cadmin(Dropped, CAdmins), + {reply, ok, State#state{cadmins = RemainingAdmins}} + end. + + +%%====================================================================== +%% Internal functions +%%====================================================================== +send_helper([], _, [], _) -> + ok; +send_helper([], _, Dropped, _) -> + {error, Dropped}; +send_helper([{ObjRef, Pid}|T], Event, Dropped, false) -> + case catch 'oe_CosEventComm_CAdmin':send(ObjRef, Event) of + ok -> + send_helper(T, Event, Dropped, false); + What -> + orber:dbg("[~p] oe_CosEventComm_Channel:send_helper(~p, ~p);~n" + "Bad return value ~p. Closing connection.", + [?LINE, ObjRef, Event, What], ?DEBUG_LEVEL), + send_helper(T, Event, [{ObjRef, Pid}|Dropped], false) + end; +send_helper([{ObjRef, Pid}|T], Event, Dropped, Sync) -> + case catch 'oe_CosEventComm_CAdmin':send_sync(ObjRef, Event) of + ok -> + send_helper(T, Event, Dropped, Sync); + What -> + orber:dbg("[~p] oe_CosEventComm_Channel:send_helper(~p, ~p);~n" + "Bad return value ~p. Closing connection.", + [?LINE, ObjRef, Event, What], ?DEBUG_LEVEL), + send_helper(T, Event, [{ObjRef, Pid}|Dropped], Sync) + end. + + +delete_cadmin([], RemainingAdmins) -> + RemainingAdmins; +delete_cadmin([{_,Pid}|T], CAdmins) -> + Rest = lists:keydelete(Pid, 2, CAdmins), + delete_cadmin(T, Rest). + +%%====================================================================== +%% END OF MODULE +%%====================================================================== diff --git a/lib/cosEvent/src/oe_CosEventComm_PullerS_impl.erl b/lib/cosEvent/src/oe_CosEventComm_PullerS_impl.erl new file mode 100644 index 0000000000..5f2733e72d --- /dev/null +++ b/lib/cosEvent/src/oe_CosEventComm_PullerS_impl.erl @@ -0,0 +1,280 @@ +%%-------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2001-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%---------------------------------------------------------------------- +%% File : oe_CosEventComm_PullerS_impl.erl +%% Description : +%% +%%---------------------------------------------------------------------- +-module(oe_CosEventComm_PullerS_impl). + +%%---------------------------------------------------------------------- +%% Include files +%%---------------------------------------------------------------------- +-include_lib("orber/include/corba.hrl"). +-include("CosEventChannelAdmin.hrl"). +-include("CosEventComm.hrl"). +-include("cosEventApp.hrl"). + +%%---------------------------------------------------------------------- +%% External exports +%%---------------------------------------------------------------------- +-export([init/1, + terminate/2, + code_change/3, + handle_info/2]). +%% Exports from "CosEventChannelAdmin::ProxyPullSupplier" +-export([connect_pull_consumer/4]). + +%% Exports from "CosEventComm::PullSupplier" +-export([pull/3, + try_pull/3, + disconnect_pull_supplier/3]). + +%%---------------------------------------------------------------------- +%% Internal exports +%%---------------------------------------------------------------------- +%% Exports from "oe_CosEventComm::Event +-export([send/3, send_sync/4]). + +%%---------------------------------------------------------------------- +%% Records +%%---------------------------------------------------------------------- +-record(state, {admin_pid, client, db, respond_to, typecheck, maxevents}). + +%%---------------------------------------------------------------------- +%% Macros +%%---------------------------------------------------------------------- + +%%====================================================================== +%% External functions +%%====================================================================== +%%---------------------------------------------------------------------% +%% Function : init/1 +%% Returns : {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%---------------------------------------------------------------------- +init([AdminPid, TypeCheck, MaxEvents]) -> + process_flag(trap_exit, true), + {ok, #state{admin_pid = AdminPid, + db = ets:new(oe_ets, [set, private, ordered_set]), + typecheck = TypeCheck, maxevents = MaxEvents}}. + +%%---------------------------------------------------------------------% +%% function : handle_info +%% Arguments: +%% Returns : {noreply, State} | +%% {stop, Reason, State} +%% Effect : Functions demanded by the gen_server module. +%% The CosEvent specification states: +%% "A nil object reference may be passed to the connect_pull_consumer operation; +%% if so a channel cannot invoke a disconnect_pull_consumer operation on the +%% consumer; the consumer may be disconnected from the channel without being +%% informed." +%% If we would invoke the disconnect_pull_consumer operation +%% at the same time as the client tries to pull an event it +%% would cause a dead-lock. We can solve this by spawning a process +%% but as is the client will discover that the object no longer exists +%% the next time it tries to pull an event. +%%---------------------------------------------------------------------- +handle_info({'EXIT', Pid, Reason}, #state{admin_pid = Pid} = State) -> + orber:dbg("[~p] oe_CosEventComm_PullerS_impl:handle_info(~p);~n" + "My Admin terminated and so will I.", + [?LINE, Reason], ?DEBUG_LEVEL), + {stop, Reason, State}; +handle_info(_Info, State) -> + ?DBG("Unknown Info ~p~n", [_Info]), + {noreply, State}. + +%%---------------------------------------------------------------------% +%% Function : terminate/2 +%% Returns : any (ignored by gen_server) +%% Description: Shutdown the server +%%---------------------------------------------------------------------- +terminate(_Reason, #state{client = undefined, respond_to = undefined, db = DB}) -> + ?DBG("Terminating ~p; no client connected and no pending pull's.~n", [_Reason]), + ets:delete(DB), + ok; +terminate(_Reason, #state{client = undefined, respond_to = ReplyTo, db = DB}) -> + ?DBG("Terminating ~p; no client connected but a pending pull.~n", [_Reason]), + corba:reply(ReplyTo, {'EXCEPTION', #'CosEventComm_Disconnected'{}}), + ets:delete(DB), + ok; +terminate(_Reason, #state{client = Client, respond_to = undefined, db = DB}) -> + ?DBG("Terminating ~p; no pending pull~n", [_Reason]), + cosEventApp:disconnect('CosEventComm_PullConsumer', + disconnect_pull_consumer, Client), + ets:delete(DB), + ok; +terminate(_Reason, #state{client = Client, respond_to = ReplyTo, db = DB}) -> + ?DBG("Terminating ~p; pending pull~n", [_Reason]), + corba:reply(ReplyTo, {'EXCEPTION', #'CosEventComm_Disconnected'{}}), + cosEventApp:disconnect('CosEventComm_PullConsumer', + disconnect_pull_consumer, Client), + ets:delete(DB), + ok. + +%%---------------------------------------------------------------------% +%% Function : code_change/3 +%% Returns : {ok, NewState} +%% Description: Convert process state when code is changed +%%---------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------% +%% Function : connect_pull_consumer +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +connect_pull_consumer(_OE_This, _OE_From, #state{client = undefined, + typecheck = TypeCheck} = State, + NewClient) -> + case corba_object:is_nil(NewClient) of + true -> + ?DBG("A NIL client supplied.~n", []), + {reply, ok, State}; + false -> + cosEventApp:type_check(NewClient, 'CosEventComm_PullConsumer', TypeCheck), + ?DBG("Connected to client.~n", []), + {reply, ok, State#state{client = NewClient}} + end; +connect_pull_consumer(_, _, _, _) -> + corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}). + + +%%---------------------------------------------------------------------% +%% Function : pull +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +pull(_OE_This, OE_From, State) -> + case get_event(State#state.db) of + false -> + ?DBG("pull invoked but no event stored; put the client on hold.~n", []), + {noreply, State#state{respond_to = OE_From}}; + Event -> + ?DBG("pull invoked and returned: ~p~n", [Event]), + {reply, Event, State} + end. + +%%---------------------------------------------------------------------% +%% Function : try_pull +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +try_pull(_OE_This, _OE_From, State) -> + case get_event(State#state.db) of + false -> + ?DBG("try_pull invoked but no event stored.~n", []), + {reply, {any:create(orber_tc:long(), 0), false}, State}; + Event -> + ?DBG("try_pull invoked and returned: ~p~n", [Event]), + {reply, {Event, true}, State} + end. + +%%---------------------------------------------------------------------% +%% Function : disconnect_pull_supplier +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +disconnect_pull_supplier(_OE_This, _OE_From, State) -> + ?DBG("Disconnect invoked ~p ~n", [State]), + {stop, normal, ok, State#state{client = undefined}}. + + +%%====================================================================== +%% Internal functions +%%====================================================================== +%%---------------------------------------------------------------------% +%% Function : send +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +send(_OE_This, #state{respond_to = undefined} = State, Any) -> + ?DBG("Received event ~p and stored it.~n", [Any]), + store_event(State#state.db, State#state.maxevents, Any), + {noreply, State}; +send(_OE_This, State, Any) -> + ?DBG("Received event ~p and sent it to pending client.~n", [Any]), + corba:reply(State#state.respond_to, Any), + {noreply, State#state{respond_to = undefined}}. + +%%---------------------------------------------------------------------% +%% Function : send_sync +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +send_sync(_OE_This, _OE_From, #state{respond_to = undefined} = State, Any) -> + ?DBG("Received event ~p and stored it (sync).~n", [Any]), + store_event(State#state.db, State#state.maxevents, Any), + {reply, ok, State}; +send_sync(_OE_This, _OE_From, State, Any) -> + ?DBG("Received event ~p and sent it to pending client (sync).~n", [Any]), + corba:reply(State#state.respond_to, Any), + {reply, ok, State#state{respond_to = undefined}}. + + +%%---------------------------------------------------------------------% +%% Function : store_event +%% Arguments : DB - ets reference +%% Event - CORBA::Any +%% Returns : true +%% Description: Insert the event in FIFO order. +%%---------------------------------------------------------------------- +store_event(DB, Max, Event) -> + case ets:info(DB, size) of + CurrentSize when CurrentSize < Max -> + ets:insert(DB, {now(), Event}); + _ -> + orber:dbg("[~p] oe_CosEventComm_PullerS:store_event(~p); DB full drop event.", + [?LINE, Event], ?DEBUG_LEVEL), + true + end. + +%%---------------------------------------------------------------------% +%% Function : get_event +%% Arguments : DB - ets reference +%% Event - CORBA::Any +%% Returns : false | Event (CORBA::Any) +%% Description: Lookup event in FIFO order; return false if no event exists. +%%---------------------------------------------------------------------- +get_event(DB) -> + case ets:first(DB) of + '$end_of_table' -> + false; + Key -> + [{_, Event}] = ets:lookup(DB, Key), + ets:delete(DB, Key), + Event + end. + +%%====================================================================== +%% END OF MODULE +%%====================================================================== diff --git a/lib/cosEvent/src/oe_CosEventComm_PusherS_impl.erl b/lib/cosEvent/src/oe_CosEventComm_PusherS_impl.erl new file mode 100644 index 0000000000..c64b01ea1d --- /dev/null +++ b/lib/cosEvent/src/oe_CosEventComm_PusherS_impl.erl @@ -0,0 +1,217 @@ +%%-------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2001-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%---------------------------------------------------------------------- +%% File : oe_CosEventComm_PusherS_impl.erl +%% Description : +%% +%%---------------------------------------------------------------------- +-module(oe_CosEventComm_PusherS_impl). + +%%---------------------------------------------------------------------- +%% Include files +%%---------------------------------------------------------------------- +-include_lib("orber/include/corba.hrl"). +-include("CosEventChannelAdmin.hrl"). +-include("CosEventComm.hrl"). +-include("cosEventApp.hrl"). + + +%%---------------------------------------------------------------------- +%% External exports +%%---------------------------------------------------------------------- +-export([init/1, + terminate/2, + code_change/3, + handle_info/2]). + +%% Exports from "CosEventChannelAdmin::ProxyPushSupplier" +-export([connect_push_consumer/4]). + +%% Exports from "CosEventComm::PushSupplier" +-export([disconnect_push_supplier/3]). + + +%%---------------------------------------------------------------------- +%% Internal exports +%%---------------------------------------------------------------------- +%% Exports from "oe_CosEventComm::Event" +-export([send/3, send_sync/4]). + +%%---------------------------------------------------------------------- +%% Records +%%---------------------------------------------------------------------- +-record(state, {admin_pid, client, typecheck}). + +%%---------------------------------------------------------------------- +%% Macros +%%---------------------------------------------------------------------- + +%%====================================================================== +%% External functions +%%====================================================================== +%%---------------------------------------------------------------------- +%% Function : init/1 +%% Returns : {ok, State} | +%% {ok, State, Timeout} | +%% ignore | +%% {stop, Reason} +%% Description: Initiates the server +%%---------------------------------------------------------------------- +init([AdminPid, TypeCheck]) -> + process_flag(trap_exit, true), + {ok, #state{admin_pid = AdminPid, typecheck = TypeCheck}}. + +%%---------------------------------------------------------------------- +%% Function : terminate/2 +%% Returns : any (ignored by gen_server) +%% Description: Shutdown the server +%%---------------------------------------------------------------------- +terminate(_Reason, #state{client = undefined}) -> + ?DBG("Terminating ~p; no client connected.~n", [_Reason]), + ok; +terminate(_Reason, #state{client = Client} = _State) -> + ?DBG("Terminating ~p~n", [_Reason]), + cosEventApp:disconnect('CosEventComm_PushConsumer', + disconnect_push_consumer, Client), + ok. + +%%---------------------------------------------------------------------- +%% Function : code_change/3 +%% Returns : {ok, NewState} +%% Description: Convert process state when code is changed +%%---------------------------------------------------------------------- +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%---------------------------------------------------------------------% +%% function : handle_info +%% Arguments: +%% Returns : {noreply, State} | +%% {stop, Reason, State} +%% Effect : Functions demanded by the gen_server module. +%%---------------------------------------------------------------------- +handle_info({'EXIT', Pid, Reason}, #state{admin_pid = Pid} = State) -> + ?DBG("Parent Admin terminated ~p~n", [Reason]), + orber:dbg("[~p] oe_CosEventComm_PusherS_impl:handle_info(~p);~n" + "My Admin terminated and so will I.", + [?LINE, Reason], ?DEBUG_LEVEL), + {stop, Reason, State}; +handle_info(_Info, State) -> + ?DBG("Unknown Info ~p~n", [_Info]), + {noreply, State}. + +%%---------------------------------------------------------------------% +%% Function : connect_push_consumer +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +connect_push_consumer(_OE_This, _, #state{client = undefined, + typecheck = TypeCheck} = State, NewClient) -> + case corba_object:is_nil(NewClient) of + true -> + orber:dbg("[~p] oe_CosEventComm_PusherS_impl:connect_push_consumer(..);~n" + "Supplied a NIL reference which is not allowed.", + [?LINE], ?DEBUG_LEVEL), + corba:raise(#'BAD_PARAM'{completion_status = ?COMPLETED_NO}); + false -> + cosEventApp:type_check(NewClient, 'CosEventComm_PushConsumer', TypeCheck), + ?DBG("Connected to client.~n", []), + {reply, ok, State#state{client = NewClient}} + end; +connect_push_consumer(_, _, _, _) -> + corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}). + + +%%---------------------------------------------------------------------% +%% Function : disconnect_push_supplier +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +disconnect_push_supplier(_OE_This, _, State) -> + ?DBG("Disconnect invoked ~p ~n", [State]), + {stop, normal, ok, State#state{client = undefined}}. + +%%====================================================================== +%% Internal functions +%%====================================================================== +%%---------------------------------------------------------------------- +%% Function : send +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +send(_OE_This, #state{client = undefined} = State, _Any) -> + %% No consumer connected. + ?DBG("Received event ~p but have no client.~n", [_Any]), + {noreply, State}; +send(_OE_This, #state{client = Client} = State, Any) -> + %% Push Data + case catch 'CosEventComm_PushConsumer':push(Client, Any) of + ok -> + ?DBG("Received event ~p and delivered it client.~n", [Any]), + {noreply, State}; + {'EXCEPTION', #'CosEventComm_Disconnected'{}} -> + ?DBG("Received event ~p but failed to deliver it since the client claims we are disconnected.~n", [Any]), + {stop, normal, State#state{client = undefined}}; + Other -> + ?DBG("Received event ~p but failed to deliver it to client.~n", [Any]), + orber:dbg("[~p] oe_CosEventComm_PusherS_impl:send(~p);~n" + "My Client behaves badly, returned ~p, so I will terminate.", + [?LINE, Any, Other], ?DEBUG_LEVEL), + {stop, normal, State} + end. + + +%%---------------------------------------------------------------------- +%% Function : send_sync +%% Arguments : +%% Returns : +%% Description: +%%---------------------------------------------------------------------- +send_sync(_OE_This, _OE_From, #state{client = undefined} = State, _Any) -> + %% No consumer connected. + ?DBG("Received event ~p but have no client.~n", [_Any]), + {reply, ok, State}; +send_sync(_OE_This, OE_From, #state{client = Client} = State, Any) -> + corba:reply(OE_From, ok), + %% Push Data + case catch 'CosEventComm_PushConsumer':push(Client, Any) of + ok -> + ?DBG("Received event ~p and delivered (sync) it client.~n", [Any]), + {noreply, State}; + {'EXCEPTION', #'CosEventComm_Disconnected'{}} -> + ?DBG("Received event ~p but failed to deliver (sync) it since the client claims we are disconnected.~n", [Any]), + {stop, normal, State#state{client = undefined}}; + Other -> + ?DBG("Received event ~p but failed to deliver (sync) it to client.~n", [Any]), + orber:dbg("[~p] oe_CosEventComm_PusherS_impl:send_sync(~p);~n" + "My Client behaves badly, returned ~p, so I will terminate.", + [?LINE, Any, Other], ?DEBUG_LEVEL), + {stop, normal, State} + end. + + +%%====================================================================== +%% END OF MODULE +%%====================================================================== + |