diff options
author | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
---|---|---|
committer | Erlang/OTP <[email protected]> | 2009-11-20 14:54:40 +0000 |
commit | 84adefa331c4159d432d22840663c38f155cd4c1 (patch) | |
tree | bff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/cosNotification/src/PusherSupplier_impl.erl | |
download | otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2 otp-84adefa331c4159d432d22840663c38f155cd4c1.zip |
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/cosNotification/src/PusherSupplier_impl.erl')
-rw-r--r-- | lib/cosNotification/src/PusherSupplier_impl.erl | 1052 |
1 files changed, 1052 insertions, 0 deletions
diff --git a/lib/cosNotification/src/PusherSupplier_impl.erl b/lib/cosNotification/src/PusherSupplier_impl.erl new file mode 100644 index 0000000000..51949b8c46 --- /dev/null +++ b/lib/cosNotification/src/PusherSupplier_impl.erl @@ -0,0 +1,1052 @@ +%%-------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1999-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%---------------------------------------------------------------------- +%% File : PusherSupplier_impl.erl +%% Purpose : +%%---------------------------------------------------------------------- + +-module('PusherSupplier_impl'). + + +%%--------------- INCLUDES ----------------------------------- +-include_lib("orber/include/corba.hrl"). +-include_lib("orber/include/ifr_types.hrl"). +%% cosEvent files. +-include_lib("cosEvent/include/CosEventChannelAdmin.hrl"). +-include_lib("cosEvent/include/CosEventComm.hrl"). +%% Application files +-include("CosNotification.hrl"). +-include("CosNotifyChannelAdmin.hrl"). +-include("CosNotifyComm.hrl"). +-include("CosNotifyFilter.hrl"). + +-include("CosNotification_Definitions.hrl"). + +%%--------------- EXPORTS ------------------------------------ +%%--------------- External ----------------------------------- +%%----- CosNotifyChannelAdmin::ProxyPushSupplier ------------- +-export([connect_any_push_consumer/4]). + +%%----- CosNotifyChannelAdmin::StructuredProxyPushSupplier --- +-export([connect_structured_push_consumer/4]). + +%%----- CosNotifyChannelAdmin::SequenceProxyPushSupplier ----- +-export([connect_sequence_push_consumer/4]). + +%%----- CosNotifyChannelAdmin::*ProxyPushSupplier ------------ +-export([suspend_connection/3, + resume_connection/3]). + +%%----- Inherit from CosNotifyChannelAdmin::ProxySupplier ---- +-export([obtain_offered_types/4, + validate_event_qos/4]). + +%%----- Inherit from CosNotification::QoSAdmin --------------- +-export([get_qos/3, + set_qos/4, + validate_qos/4]). + +%%----- Inherit from CosNotifyComm::NotifySubscribe ---------- +-export([subscription_change/5]). + +%%----- Inherit from CosNotifyFilter::FilterAdmin ------------ +-export([add_filter/4, + remove_filter/4, + get_filter/4, + get_all_filters/3, + remove_all_filters/3]). + +%%----- Inherit from CosEventComm::PushSupplier ------------- +-export([disconnect_push_supplier/3]). + +%%----- Inherit from CosNotifyComm::StructuredPushSupplier -- +-export([disconnect_structured_push_supplier/3]). + +%%----- Inherit from CosNotifyComm::SequencePushSupplier ---- +-export([disconnect_sequence_push_supplier/3]). + +%%----- Inherit from CosEventChannelAdmin::ProxyPushSupplier +-export([connect_push_consumer/4]). + +%% Attributes (external) CosNotifyChannelAdmin::ProxySupplier +-export(['_get_MyType'/3, + '_get_MyAdmin'/3, + '_get_priority_filter'/3, + '_set_priority_filter'/4, + '_get_lifetime_filter'/3, + '_set_lifetime_filter'/4]). + +%%--------------- Internal ----------------------------------- +%%----- Inherit from cosNotificationComm --------------------- +-export([callAny/5, + callSeq/5]). + +%%--------------- gen_server specific exports ---------------- +-export([handle_info/2, code_change/3]). +-export([init/1, terminate/2]). + +%%--------------- LOCAL DEFINITIONS -------------------------- +%% Data structures +-record(state, {myType, + myAdmin, + myAdminPid, + myChannel, + myFilters = [], + myOperator, + idCounter = 0, + prioFil, + lifetFil, + client, + qosGlobal, + qosLocal, + suspended = false, + pacingTimer, + subscribeType = false, + subscribeData = true, + etsR, + eventDB, + this, + maxCache, + cacheTimeout, + cacheInterval}). + +%% Data structures constructors +-define(get_InitState(_MyT, _MyA, _MyAP, _QS, _LQS, _Ch, _MyOp, _GT, _GL, _TR), + #state{myType = _MyT, + myAdmin = _MyA, + myAdminPid= _MyAP, + qosGlobal = _QS, + qosLocal = _LQS, + myChannel = _Ch, + myOperator=_MyOp, + etsR = ets:new(oe_ets, [set, protected]), + eventDB = cosNotification_eventDB:create_db(_LQS, _GT, _GL, _TR), + maxCache = cosNotificationApp:max_events()}). + +%% Data structures selectors +%%-------------- Data structures selectors ----------------- +%% Attributes +-define(get_MyType(S), S#state.myType). +-define(get_MyAdmin(S), S#state.myAdmin). +-define(get_MyAdminPid(S), S#state.myAdminPid). +-define(get_MyChannel(S), S#state.myChannel). +-define(get_MyOperator(S), S#state.myOperator). +-define(get_PrioFil(S), S#state.prioFil). +-define(get_LifeTFil(S), S#state.lifetFil). +%% Client Object +-define(get_Client(S), S#state.client). +%% QoS +-define(get_GlobalQoS(S), S#state.qosGlobal). +-define(get_LocalQoS(S), S#state.qosLocal). +-define(get_BothQoS(S), {S#state.qosGlobal, S#state.qosLocal}). +%% Filters +-define(get_Filter(S, I), find_obj(lists:keysearch(I, 1, S#state.myFilters))). +-define(get_AllFilter(S), S#state.myFilters). +-define(get_AllFilterID(S), find_ids(S#state.myFilters)). +%% Amin +-define(get_PacingTimer(S), S#state.pacingTimer). +-define(get_PacingInterval(S), round(?not_GetPacingInterval((S#state.qosLocal))/10000000)). +-define(get_BatchLimit(S), ?not_GetMaximumBatchSize((S#state.qosLocal))). +%% Subscribe +-define(get_AllSubscribe(S), lists:flatten(ets:match(S#state.etsR, + {'$1',subscribe}))). +-define(get_SubscribeType(S), S#state.subscribeType). +-define(get_SubscribeData(S), S#state.subscribeData). +%% ID +-define(get_IdCounter(S), S#state.idCounter). +-define(get_SubscribeDB(S), S#state.etsR). +%% Event +-define(is_PersistentConnection(S), + (?not_GetConnectionReliability((S#state.qosLocal)) == ?not_Persistent)). +-define(is_PersistentEvent(S), + (?not_GetEventReliability((S#state.qosLocal)) == ?not_Persistent)). + +-define(get_Event(S), cosNotification_eventDB:get_event(S#state.eventDB, + false)). +% (not ?is_PersistentEvent(S)))). +-define(get_Events(S,M), cosNotification_eventDB:get_events(S#state.eventDB, M, false)). +% (not ?is_PersistentEvent(S)))). + +%%-------------- Data structures modifiers ----------------- +%% Attributes +-define(set_PrioFil(S,D), S#state{prioFil=D}). +-define(set_LifeTFil(S,D), S#state{lifetFil=D}). +%% Client Object +-define(set_Client(S,D), S#state{client=D}). +-define(del_Client(S), S#state{client=undefined}). +-define(set_Unconnected(S), S#state{client=undefined}). +-define(set_Suspended(S), S#state{suspended=true}). +-define(set_NotSuspended(S), S#state{suspended=false}). +%% QoS +-define(set_LocalQoS(S,D), S#state{qosLocal=D}). +-define(set_GlobalQoS(S,D), S#state{qosGlobal=D}). +-define(set_BothQoS(S,GD,LD), S#state{qosGlobal=GD, qosLocal=LD}). +%% Filters +-define(add_Filter(S,I,O), S#state{myFilters=[{I,O}|S#state.myFilters]}). +-define(del_Filter(S,I), S#state{myFilters= + delete_obj(lists:keydelete(I, 1, S#state.myFilters), + S#state.myFilters)}). +-define(del_AllFilter(S), S#state{myFilters=[]}). +%% Admin +-define(set_PacingTimer(S,T), S#state{pacingTimer=T}). +%% Publish +%% Subscribe +-define(add_Subscribe(S,E), ets:insert(S#state.etsR, {E, subscribe})). +-define(del_Subscribe(S,E), ets:delete(S#state.etsR, E)). +-define(set_SubscribeType(S,T), S#state{subscribeType=T}). +-define(set_SubscribeData(S,D), S#state{subscribeData=D}). +%% ID +-define(set_IdCounter(S,V), S#state{idCounter=V}). +-define(new_Id(S), 'CosNotification_Common':create_id(S#state.idCounter)). +%% Events +-define(add_Event(S,E), catch cosNotification_eventDB: + add_event(S#state.eventDB, E, S#state.lifetFil, S#state.prioFil)). +-define(addAndGet_Event(S,E), catch cosNotification_eventDB: + add_and_get_event(S#state.eventDB, E, S#state.lifetFil, S#state.prioFil, + false)). +% ?is_PersistentEvent(S))). +-define(update_EventDB(S,Q), S#state{eventDB= + cosNotification_eventDB:update(S#state.eventDB, Q)}). + + +%%-------------- MISC ---------------------------------------- +-define(is_ANY(S), S#state.myType == 'PUSH_ANY'). +-define(is_STRUCTURED(S), S#state.myType == 'PUSH_STRUCTURED'). +-define(is_SEQUENCE(S), S#state.myType == 'PUSH_SEQUENCE'). +-define(is_ANDOP(S), S#state.myOperator == 'AND_OP'). +-define(is_UnConnected(S), S#state.client == undefined). +-define(is_Connected(S), S#state.client =/= undefined). +-define(is_Suspended(S), S#state.suspended == true). +-define(is_NotSuspended(S), S#state.suspended == false). +-define(is_BatchLimitReached(S), cosNotification_eventDB:status(S#state.eventDB, + {batchLimit, + ?not_GetMaximumBatchSize((S#state.qosLocal))})). +-define(has_Filters(S), S#state.myFilters =/= []). + +%%----------------------------------------------------------% +%% function : handle_info, code_change +%% Arguments: +%% Returns : +%% Effect : Functions demanded by the gen_server module. +%%----------------------------------------------------------- + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_info(Info, #state{cacheTimeout = Timeout, + cacheInterval = Interval} = State) -> + ?DBG("INFO: ~p~n", [Info]), + case Info of + {'EXIT', Pid, Reason} when ?get_MyAdminPid(State)==Pid -> + ?DBG("PARENT ADMIN: ~p TERMINATED.~n",[Reason]), + {stop, Reason, State}; + {'EXIT', _Pid, _Reason} -> + ?DBG("PROXYPUSHSUPPLIER: ~p TERMINATED.~n",[_Reason]), + {noreply, State}; + pacing -> + lookup_and_push(State, true); + cacheInterval -> + lookup_and_push(State, true); + cacheTimeout when Timeout == undefined, Interval == undefined -> + %% Late message, do not terminate + {noreply, State}; + cacheTimeout -> + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, State#state.client}, + {reason, + {timer, "Reached upper limit"}}]), + {stop, normal, State}; + _ -> + {noreply, State} + end. + +%%----------------------------------------------------------% +%% function : init, terminate +%% Arguments: +%%----------------------------------------------------------- + +init([MyType, MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel, Options, Operator]) -> + process_flag(trap_exit, true), + GCTime = 'CosNotification_Common':get_option(gcTime, Options, + ?not_DEFAULT_SETTINGS), + GCLimit = 'CosNotification_Common':get_option(gcLimit, Options, + ?not_DEFAULT_SETTINGS), + TimeRef = 'CosNotification_Common':get_option(timeService, Options, + ?not_DEFAULT_SETTINGS), + timer:start(), + {ok, ?get_InitState(MyType, MyAdmin, MyAdminPid, + InitQoS, LQS, MyChannel, Operator, GCTime, GCLimit, TimeRef)}. + +terminate(_Reason, State) when ?is_UnConnected(State) -> + stop_timer(State#state.cacheTimeout), + stop_timer(State#state.cacheInterval), + stop_timer(State#state.pacingTimer), + %% We are not connected to a Client. Hence, no need to invoke disconnect. + ok; +terminate(_Reason, State) when ?is_ANY(State) -> + stop_timer(State#state.cacheTimeout), + stop_timer(State#state.cacheInterval), + stop_timer(State#state.pacingTimer), + 'CosNotification_Common':disconnect('CosEventComm_PushConsumer', + disconnect_push_consumer, + ?get_Client(State)); +terminate(_Reason, State) when ?is_SEQUENCE(State) -> + stop_timer(State#state.cacheTimeout), + stop_timer(State#state.cacheInterval), + stop_timer(State#state.pacingTimer), + 'CosNotification_Common':disconnect('CosNotifyComm_SequencePushConsumer', + disconnect_sequence_push_consumer, + ?get_Client(State)); +terminate(_Reason, State) when ?is_STRUCTURED(State) -> + stop_timer(State#state.cacheTimeout), + stop_timer(State#state.cacheInterval), + stop_timer(State#state.pacingTimer), + 'CosNotification_Common':disconnect('CosNotifyComm_StructuredPushConsumer', + disconnect_structured_push_consumer, + ?get_Client(State)). + +%%----------------------------------------------------------- +%%----- CosNotifyChannelAdmin_ProxySupplier attributes ------ +%%----------------------------------------------------------- +%%----------------------------------------------------------% +%% Attribute: '_get_MyType' +%% Type : readonly +%% Returns : +%%----------------------------------------------------------- +'_get_MyType'(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_MyType(State), State}. + +%%----------------------------------------------------------% +%% Attribute: '_get_MyAdmin' +%% Type : readonly +%% Returns : +%%----------------------------------------------------------- +'_get_MyAdmin'(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_MyAdmin(State), State}. + +%%----------------------------------------------------------% +%% Attribute: '_*et_priority_filter' +%% Type : read/write +%% Returns : +%%----------------------------------------------------------- +'_get_priority_filter'(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_PrioFil(State), State}. +'_set_priority_filter'(_OE_THIS, _OE_FROM, State, PrioF) -> + {reply, ok, ?set_PrioFil(State, PrioF)}. + +%%----------------------------------------------------------% +%% Attribute: '_*et_lifetime_filter' +%% Type : read/write +%% Returns : +%%----------------------------------------------------------- +'_get_lifetime_filter'(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_LifeTFil(State), State}. +'_set_lifetime_filter'(_OE_THIS, _OE_FROM, State, LifeTF) -> + {reply, ok, ?set_LifeTFil(State, LifeTF)}. + +%%----------------------------------------------------------- +%%------- Exported external functions ----------------------- +%%----------------------------------------------------------- +%%----- CosEventChannelAdmin::ProxyPushSupplier ------------- +%%----------------------------------------------------------% +%% function : connect_push_consumer +%% Arguments: Client - CosEventComm::PushConsumer +%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} | +%% {'EXCEPTION', #'TypeError'{}} +%% Both exceptions from CosEventChannelAdmin!!!! +%%----------------------------------------------------------- +connect_push_consumer(OE_THIS, OE_FROM, State, Client) -> + connect_any_push_consumer(OE_THIS, OE_FROM, State, Client). + +%%----- CosNotifyChannelAdmin::ProxyPushSupplier ------------ +%%----------------------------------------------------------% +%% function : connect_any_push_consumer +%% Arguments: Client - CosEventComm::PushConsumer +%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} | +%% {'EXCEPTION', #'TypeError'{}} +%% Both exceptions from CosEventChannelAdmin!!!! +%%----------------------------------------------------------- +connect_any_push_consumer(OE_THIS, _OE_FROM, State, Client) when ?is_ANY(State) -> + 'CosNotification_Common':type_check(Client, 'CosEventComm_PushConsumer'), + if + ?is_Connected(State) -> + corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}); + true -> + {reply, ok, State#state{client = Client, this = OE_THIS}} + end; +connect_any_push_consumer(_, _, _, _) -> + corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). + +%%----- CosNotifyChannelAdmin::SequenceProxyPushSupplier ---- +%%----------------------------------------------------------% +%% function : connect_sequence_push_consumer +%% Arguments: Client - CosNotifyComm::SequencePushConsumer +%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} | +%% {'EXCEPTION', #'TypeError'{}} +%%----------------------------------------------------------- +connect_sequence_push_consumer(OE_THIS, _OE_FROM, State, Client) when ?is_SEQUENCE(State) -> + 'CosNotification_Common':type_check(Client, + 'CosNotifyComm_SequencePushConsumer'), + if + ?is_Connected(State) -> + corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}); + true -> + NewState = start_timer(State), + {reply, ok, NewState#state{client = Client, this = OE_THIS}} + end; +connect_sequence_push_consumer(_, _, _, _) -> + corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). + +%%----- CosNotifyChannelAdmin::StructuredProxyPushSupplier --- +%%----------------------------------------------------------% +%% function : connect_structured_push_consumer +%% Arguments: Client - CosNotifyComm::StructuredPushConsumer +%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} | +%% {'EXCEPTION', #'TypeError'{}} +%%----------------------------------------------------------- +connect_structured_push_consumer(OE_THIS, _OE_FROM, State, Client) when ?is_STRUCTURED(State) -> + 'CosNotification_Common':type_check(Client, + 'CosNotifyComm_StructuredPushConsumer'), + if + ?is_Connected(State) -> + corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}); + true -> + {reply, ok, State#state{client = Client, this = OE_THIS}} + end; +connect_structured_push_consumer(_, _, _, _) -> + corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). + +%%----- CosNotifyChannelAdmin::*ProxyPushSupplier ----------- +%%----------------------------------------------------------% +%% function : suspend_connection +%% Arguments: +%% Returns : ok | {'EXCEPTION', #'ConnectionAlreadyInactive'{}} | +%% {'EXCEPTION', #'NotConneced'{}} +%%----------------------------------------------------------- +suspend_connection(_OE_THIS, _OE_FROM, State) when ?is_Connected(State) -> + if + ?is_Suspended(State) -> + corba:raise(#'CosNotifyChannelAdmin_ConnectionAlreadyInactive'{}); + true -> + stop_timer(State#state.pacingTimer), + {reply, ok, State#state{pacingTimer = undefined, + suspended=true}} + end; +suspend_connection(_,_,_)-> + corba:raise(#'CosNotifyChannelAdmin_NotConnected'{}). + +%%----------------------------------------------------------% +%% function : resume_connection +%% Arguments: +%% Returns : ok | {'EXCEPTION', #'ConnectionAlreadyActive'{}} | +%% {'EXCEPTION', #'NotConneced'{}} +%%----------------------------------------------------------- +resume_connection(_OE_THIS, OE_FROM, State) when ?is_Connected(State) -> + if + ?is_NotSuspended(State) -> + corba:raise(#'CosNotifyChannelAdmin_ConnectionAlreadyActive'{}); + true -> + corba:reply(OE_FROM, ok), + if + ?is_SEQUENCE(State) -> + start_timer(State); + true -> + ok + end, + lookup_and_push(?set_NotSuspended(State)) + end; +resume_connection(_,_,_) -> + corba:raise(#'CosNotifyChannelAdmin_NotConnected'{}). + +%%----- Inherit from CosNotifyChannelAdmin::ProxySupplier --- +%%----------------------------------------------------------% +%% function : obtain_offered_types +%% Arguments: Mode - enum 'ObtainInfoMode' (CosNotifyChannelAdmin) +%% Returns : CosNotification::EventTypeSeq +%%----------------------------------------------------------- +obtain_offered_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_OFF') -> + {reply, ?get_AllSubscribe(State), ?set_SubscribeType(State, false)}; +obtain_offered_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_ON') -> + {reply, ?get_AllSubscribe(State), ?set_SubscribeType(State, true)}; +obtain_offered_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_OFF') -> + {reply, [], ?set_SubscribeType(State, false)}; +obtain_offered_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_ON') -> + {reply, [], ?set_SubscribeType(State, true)}; +obtain_offered_types(_,_,_,What) -> + orber:dbg("[~p] PusherSupplier:obtain_offered_types(~p);~n" + "Bad enumerant", [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). + +%%----------------------------------------------------------% +%% function : validate_event_qos +%% Arguments: RequiredQoS - CosNotification::QoSProperties +%% Returns : ok | {'EXCEPTION', #'UnsupportedQoS'{}} +%% AvilableQoS - CosNotification::NamedPropertyRangeSeq (out) +%%----------------------------------------------------------- +validate_event_qos(_OE_THIS, _OE_FROM, State, RequiredQoS) -> + AvilableQoS = 'CosNotification_Common':validate_event_qos(RequiredQoS, + ?get_LocalQoS(State)), + {reply, {ok, AvilableQoS}, State}. + +%%----- Inherit from CosNotification::QoSAdmin -------------- +%%----------------------------------------------------------% +%% function : get_qos +%% Arguments: +%% Returns : +%%----------------------------------------------------------- +get_qos(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_GlobalQoS(State), State}. + +%%----------------------------------------------------------% +%% function : set_qos +%% Arguments: QoS - CosNotification::QoSProperties, i.e., +%% [#'Property'{name, value}, ...] where name eq. string() +%% and value eq. any(). +%% Returns : ok | {'EXCEPTION', CosNotification::UnsupportedQoS} +%%----------------------------------------------------------- +set_qos(_OE_THIS, _OE_FROM, State, QoS) -> + {NewQoS, LQS} = 'CosNotification_Common':set_qos(QoS, ?get_BothQoS(State), + proxy, ?get_MyAdmin(State), + false), + NewState = ?update_EventDB(State, LQS), + {reply, ok, ?set_BothQoS(NewState, NewQoS, LQS)}. + +%%----------------------------------------------------------% +%% function : validate_qos +%% Arguments: Required_qos - CosNotification::QoSProperties +%% [#'Property'{name, value}, ...] where name eq. string() +%% and value eq. any(). +%% Returns : {'EXCEPTION', CosNotification::UnsupportedQoS} +%% {ok, CosNotification::NamedPropertyRangeSeq} +%%----------------------------------------------------------- +validate_qos(_OE_THIS, _OE_FROM, State, Required_qos) -> + QoS = 'CosNotification_Common':validate_qos(Required_qos, ?get_BothQoS(State), + proxy, ?get_MyAdmin(State), + false), + {reply, {ok, QoS}, State}. + +%%----- Inherit from CosNotifyComm::NotifySubscribe --------- +%%----------------------------------------------------------% +%% function : subscription_change +%% Arguments: Added - #'CosNotification_EventType'{} +%% Removed - #'CosNotification_EventType'{} +%% Returns : ok | +%% {'EXCEPTION', #'CosNotifyComm_InvalidEventType'{}} +%%----------------------------------------------------------- +subscription_change(_OE_THIS, _OE_FROM, State, Added, Removed) -> + cosNotification_Filter:validate_types(Added), + cosNotification_Filter:validate_types(Removed), + %% On this "side", we care about which type of events the client + %% will require, since the client (or an agent) clearly stated + %% that it's only interested in these types of events. + %% Also see PusherConsumer- and PullerConsumer-'offer_change'. + update_subscribe(remove, State, Removed), + CurrentSub = ?get_AllSubscribe(State), + NewState = + case cosNotification_Filter:check_types(Added++CurrentSub) of + true -> + %% Types supplied does in some way cause all events to be valid. + %% Smart? Would have been better to not supply any at all. + ?set_SubscribeData(State, true); + {ok, Which, WC} -> + ?set_SubscribeData(State, {Which, WC}) + end, + update_subscribe(add, NewState, Added), + case ?get_SubscribeType(NewState) of + true -> + %% Perhaps we should handle exception here?! + %% Probably not. Better to stay "on-line". + catch 'CosNotifyComm_NotifyPublish': + offer_change(?get_Client(NewState), Added, Removed), + ok; + _-> + ok + end, + {reply, ok, NewState}. + +update_subscribe(_, _, [])-> + ok; +update_subscribe(add, State, [H|T]) -> + ?add_Subscribe(State, H), + update_subscribe(add, State, T); +update_subscribe(remove, State, [H|T]) -> + ?del_Subscribe(State, H), + update_subscribe(remove, State, T). + +%%----- Inherit from CosNotifyFilter::FilterAdmin ----------- +%%----------------------------------------------------------% +%% function : add_filter +%% Arguments: Filter - CosNotifyFilter::Filter +%% Returns : FilterID - long +%%----------------------------------------------------------- +add_filter(_OE_THIS, _OE_FROM, State, Filter) -> + 'CosNotification_Common':type_check(Filter, 'CosNotifyFilter_Filter'), + FilterID = ?new_Id(State), + NewState = ?set_IdCounter(State, FilterID), + {reply, FilterID, ?add_Filter(NewState, FilterID, Filter)}. + +%%----------------------------------------------------------% +%% function : remove_filter +%% Arguments: FilterID - long +%% Returns : ok +%%----------------------------------------------------------- +remove_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) -> + {reply, ok, ?del_Filter(State, FilterID)}; +remove_filter(_,_,_,What) -> + orber:dbg("[~p] PusherSupplier:remove_filter(~p); Not an integer", + [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}). + +%%----------------------------------------------------------% +%% function : get_filter +%% Arguments: FilterID - long +%% Returns : Filter - CosNotifyFilter::Filter | +%% {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}} +%%----------------------------------------------------------- +get_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) -> + {reply, ?get_Filter(State, FilterID), State}; +get_filter(_,_,_,What) -> + orber:dbg("[~p] PusherSupplier:get_filter(~p); Not an integer", + [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}). + +%%----------------------------------------------------------% +%% function : get_all_filters +%% Arguments: - +%% Returns : Filter - CosNotifyFilter::FilterIDSeq +%%----------------------------------------------------------- +get_all_filters(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_AllFilterID(State), State}. + +%%----------------------------------------------------------% +%% function : remove_all_filters +%% Arguments: - +%% Returns : ok +%%----------------------------------------------------------- +remove_all_filters(_OE_THIS, _OE_FROM, State) -> + {reply, ok, ?del_AllFilter(State)}. + + +%%----- Inherit from CosEventComm::PushSupplier ------------- +%%----------------------------------------------------------% +%% function : disconnect_push_supplier +%% Arguments: - +%% Returns : ok +%%----------------------------------------------------------- +disconnect_push_supplier(_OE_THIS, _OE_FROM, State) -> + {stop, normal, ok, ?set_Unconnected(State)}. + +%%----- Inherit from CosNotifyComm::StructuredPushSupplier -- +%%----------------------------------------------------------% +%% function : disconnect_structured_push_supplier +%% Arguments: - +%% Returns : ok +%%----------------------------------------------------------- +disconnect_structured_push_supplier(_OE_THIS, _OE_FROM, State) -> + {stop, normal, ok, ?set_Unconnected(State)}. + +%%----- Inherit from CosNotifyComm::SequencePushSupplier ---- +%%----------------------------------------------------------% +%% function : disconnect_sequence_push_supplier +%% Arguments: - +%% Returns : ok +%%----------------------------------------------------------- +disconnect_sequence_push_supplier(_OE_THIS, _OE_FROM, State) -> + {stop, normal, ok, ?set_Unconnected(State)}. + +%%--------------- LOCAL FUNCTIONS ---------------------------- +find_obj({value, {_, Obj}}) -> Obj; +find_obj(_) -> {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}. + +find_ids(List) -> find_ids(List, []). +find_ids([], Acc) -> Acc; +find_ids([{I,_}|T], Acc) -> find_ids(T, [I|Acc]); +find_ids(What, _) -> + orber:dbg("[~p] PusherSupplier:find_ids();~n" + "Id corrupt: ~p", [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}). + +%% Delete a single object. +%% The list do not differ, i.e., no filter removed, raise exception. +delete_obj(List,List) -> corba:raise(#'CosNotifyFilter_FilterNotFound'{}); +delete_obj(List,_) -> List. + +%%----------------------------------------------------------- +%% function : callSeq +%% Arguments: +%% Returns : +%%----------------------------------------------------------- +callSeq(_OE_THIS, OE_FROM, State, EventsIn, Status) -> + corba:reply(OE_FROM, ok), + case cosNotification_eventDB:validate_event(?get_SubscribeData(State), EventsIn, + ?get_AllFilter(State), + ?get_SubscribeDB(State), + Status) of + {[],_} -> + ?DBG("PROXY NOSUBSCRIPTION SEQUENCE/STRUCTURED: ~p~n",[EventsIn]), + {noreply, State}; + {Events,_} when ?is_Suspended(State) -> + store_events(State, Events), + {noreply, State}; + {Events,_} when ?is_UnConnected(State) -> + orber:dbg("[~p] PusherSupplier:callAny();~n" + "Not connected, dropping event(s): ~p", + [?LINE, Events], ?DEBUG_LEVEL), + {noreply, State}; + {[Event],_} when ?is_STRUCTURED(State) -> + ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Event]), + empty_db(State, ?addAndGet_Event(State, Event)); + {[Event],_} when ?is_ANY(State) -> + ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Event]), + AnyEvent = any:create('CosNotification_StructuredEvent':tc(),Event), + empty_db(State, ?addAndGet_Event(State, AnyEvent)); + {Events,_} -> + ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Events]), + store_events(State, Events), + lookup_and_push(State) + end. + +%%----------------------------------------------------------- +%% function : callAny +%% Arguments: +%% Returns : +%%----------------------------------------------------------- +callAny(_OE_THIS, OE_FROM, State, EventIn, Status) -> + corba:reply(OE_FROM, ok), + case cosNotification_eventDB:validate_event(?get_SubscribeData(State), EventIn, + ?get_AllFilter(State), + ?get_SubscribeDB(State), + Status) of + {[],_} -> + ?DBG("PROXY NOSUBSCRIPTION ANY: ~p~n",[EventIn]), + %% To be on the safe side, test if there are any events that not + %% have been forwarded (should only be possible if StartTime is used). + lookup_and_push(State); + {Event,_} when ?is_Suspended(State), ?is_ANY(State) -> + ?add_Event(State, Event), + {noreply, State}; + {Event,_} when ?is_Suspended(State) -> + ?add_Event(State, ?not_CreateSE("","%ANY","",[],[],Event)), + {noreply, State}; + {Event,_} when ?is_UnConnected(State) -> + orber:dbg("[~p] PusherSupplier:callAny();~n" + "Not connected, dropping event: ~p", + [?LINE, Event], ?DEBUG_LEVEL), + {noreply, State}; + {Event,_} when ?is_ANY(State) -> + ?DBG("PROXY RECEIVED ANY: ~p~n",[Event]), + %% We must store the event since there may be other events that should + %% be delivered first, e.g., higher priority. + empty_db(State, ?addAndGet_Event(State, Event)); + {Event,_} when ?is_SEQUENCE(State) -> + ?DBG("PROXY RECEIVED ANY==>SEQUENCE: ~p~n",[Event]), + StrEvent = ?not_CreateSE("","%ANY","",[],[],Event), + ?add_Event(State, StrEvent), + lookup_and_push(State); + {Event,_} -> + ?DBG("PROXY RECEIVED ANY==>STRUCTURED: ~p~n",[Event]), + StrEvent = ?not_CreateSE("","%ANY","",[],[],Event), + empty_db(State, ?addAndGet_Event(State, StrEvent)) + end. + +%% Lookup and push "the correct" amount of events. +lookup_and_push(State) -> + %% The boolean indicates, if false, that we will only push events if we have + %% passed the BatchLimit. If true we will ignore this limit and push events + %% anyway (typcially invoked when pacing limit passed). + lookup_and_push(State, false). +lookup_and_push(State, false) when ?is_SEQUENCE(State) -> + case ?is_BatchLimitReached(State) of + true -> + case ?get_Events(State, ?get_BatchLimit(State)) of + {[], _, _} -> + ?DBG("BATCHLIMIT (~p) REACHED BUT NO EVENTS FOUND~n", + [?get_BatchLimit(State)]), + {noreply, State}; + {Events, _, Keys} -> + ?DBG("BATCHLIMIT (~p) REACHED, EVENTS FOUND: ~p~n", + [?get_BatchLimit(State), Events]), + case catch 'CosNotifyComm_SequencePushConsumer': + push_structured_events(?get_Client(State), Events) of + ok -> + cosNotification_eventDB:delete_events(Keys), + lookup_and_push(reset_cache(State), false); + {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') orelse + is_record(E, 'NO_PERMISSION') orelse + is_record(E, 'CosEventComm_Disconnected') -> + ?DBG("PUSH SUPPLIER CLIENT NO LONGER EXIST~n", []), + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, ?get_Client(State)}, + {reason, {'EXCEPTION', E}}]), + {stop, normal, State}; + What when ?is_PersistentEvent(State), + ?is_PersistentConnection(State) -> + orber:dbg("[~p] PusherSupplier:lookup_and_push();~n" + "Client respond incorrect: ~p", + [?LINE, What], ?DEBUG_LEVEL), + check_cache(State); + What when ?is_PersistentConnection(State) -> + %% Here we should do something when we want to handle + %% Persistent EventReliability. + orber:dbg("[~p] PusherSupplier:lookup_and_push();~n" + "Client respond incorrect: ~p~n" + "Dropping events: ~p", + [?LINE, What, Events], ?DEBUG_LEVEL), + cosNotification_eventDB:delete_events(Keys), + {noreply, State}; + WhatII -> + orber:dbg("[~p] PusherSupplier:lookup_and_push();~n" + "Client respond incorrect: ~p~n" + "Terminating and dropping events: ~p", + [?LINE, WhatII, Events], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, ?get_Client(State)}, + {reason, WhatII}]), + {stop, normal, State} + end + end; + _ -> + ?DBG("BATCHLIMIT (~p) NOT REACHED~n",[?get_BatchLimit(State)]), + {noreply, State} + end; +lookup_and_push(State, true) when ?is_SEQUENCE(State) -> + case ?get_Events(State, ?get_BatchLimit(State)) of + {[], _, _} -> + ?DBG("PACELIMIT REACHED BUT NO EVENTS FOUND~n", []), + {noreply, State}; + {Events, _, Keys} -> + ?DBG("PACELIMIT REACHED, EVENTS FOUND: ~p~n", [Events]), + case catch 'CosNotifyComm_SequencePushConsumer': + push_structured_events(?get_Client(State), Events) of + ok -> + cosNotification_eventDB:delete_events(Keys), + lookup_and_push(reset_cache(State), false); + {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') orelse + is_record(E, 'NO_PERMISSION') orelse + is_record(E, 'CosEventComm_Disconnected') -> + orber:dbg("[~p] PusherSupplier:lookup_and_push();~n" + "Client no longer exists; terminating and dropping events: ~p", + [?LINE, Events], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, ?get_Client(State)}, + {reason, {'EXCEPTION', E}}]), + {stop, normal, State}; + What when ?is_PersistentEvent(State), + ?is_PersistentConnection(State) -> + orber:dbg("[~p] PusherSupplier:lookup_and_push();~n" + "Client respond incorrect: ~p", + [?LINE, What], ?DEBUG_LEVEL), + check_cache(State); + What when ?is_PersistentConnection(State) -> + %% Here we should do something when we want to handle + %% Persistent EventReliability. + orber:dbg("[~p] PusherSupplier:lookup_and_push();~n" + "Client respond incorrect: ~p~n" + "Dropping events: ~p", + [?LINE, What, Events], ?DEBUG_LEVEL), + cosNotification_eventDB:delete_events(Keys), + {noreply, State}; + WhatII -> + orber:dbg("[~p] PusherSupplier:lookup_and_push();~n" + "Client respond incorrect: ~p~n" + "Terminating and dropping events: ~p", + [?LINE, WhatII, Events], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, ?get_Client(State)}, + {reason, WhatII}]), + {stop, normal, State} + end + end; +lookup_and_push(State, _) -> + empty_db(State, ?get_Event(State)). + + +%% Push all events stored while not connected or received in sequence. +empty_db(State, {[], _, _}) -> + {noreply, State}; +empty_db(State, {Event, _, Keys}) when ?is_STRUCTURED(State) -> + case catch 'CosNotifyComm_StructuredPushConsumer': + push_structured_event(?get_Client(State), Event) of + ok -> + cosNotification_eventDB:delete_events(Keys), + NewState = reset_cache(State), + empty_db(NewState, ?get_Event(NewState)); + {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') orelse + is_record(E, 'NO_PERMISSION') orelse + is_record(E, 'CosEventComm_Disconnected') -> + orber:dbg("[~p] PusherSupplier:empty_db();~n" + "Client no longer exists; terminating and dropping: ~p", + [?LINE, Event], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, ?get_Client(State)}, + {reason, {'EXCEPTION', E}}]), + {stop, normal, State}; + What when ?is_PersistentEvent(State), + ?is_PersistentConnection(State) -> + orber:dbg("[~p] PusherSupplier:lookup_and_push();~n" + "Client respond incorrect: ~p", + [?LINE, What], ?DEBUG_LEVEL), + check_cache(State); + What when ?is_PersistentConnection(State) -> + %% Here we should do something when we want to handle + %% Persistent EventReliability. + orber:dbg("[~p] PusherSupplier:empty_db();~n" + "Client respond incorrect: ~p~n" + "Dropping event: ~p", + [?LINE, What, Event], ?DEBUG_LEVEL), + cosNotification_eventDB:delete_events(Keys), + {noreply, State}; + WhatII -> + orber:dbg("[~p] PusherSupplier:empty_db();~n" + "Client respond incorrect: ~p~n" + "Terminating and dropping: ~p", + [?LINE, WhatII, Event], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, ?get_Client(State)}, + {reason, WhatII}]), + {stop, normal, State} + end; +empty_db(State, {Event, _, Keys}) when ?is_ANY(State) -> + case catch 'CosEventComm_PushConsumer':push(?get_Client(State), Event) of + ok -> + cosNotification_eventDB:delete_events(Keys), + NewState = reset_cache(State), + empty_db(NewState, ?get_Event(NewState)); + {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') orelse + is_record(E, 'NO_PERMISSION') orelse + is_record(E, 'CosEventComm_Disconnected') -> + orber:dbg("[~p] PusherSupplier:empty_db();~n" + "Client no longer exists; terminating and dropping: ~p", + [?LINE, Event], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, ?get_Client(State)}, + {reason, {'EXCEPTION', E}}]), + {stop, normal, State}; + What when ?is_PersistentEvent(State), + ?is_PersistentConnection(State) -> + orber:dbg("[~p] PusherSupplier:lookup_and_push();~n" + "Client respond incorrect: ~p", + [?LINE, What], ?DEBUG_LEVEL), + check_cache(State); + What when ?is_PersistentConnection(State) -> + %% Here we should do something when we want to handle + %% Persistent EventReliability. + orber:dbg("[~p] PusherSupplier:empty_db();~n" + "Client respond incorrect: ~p~n" + "Dropping Event: ~p", + [?LINE, What, Event], ?DEBUG_LEVEL), + cosNotification_eventDB:delete_events(Keys), + {noreply, State}; + WhatII -> + orber:dbg("[~p] PusherSupplier:empty_db();~n" + "Client respond incorrect: ~p~n" + "Terminating and dropping: ~p", + [?LINE, WhatII, Event], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, ?get_Client(State)}, + {reason, WhatII}]), + {stop, normal, State} + end. + +reset_cache(#state{cacheTimeout = undefined, + cacheInterval = undefined} = State) -> + State; +reset_cache(State) -> + stop_timer(State#state.cacheTimeout), + stop_timer(State#state.cacheInterval), + State#state{cacheTimeout = undefined, + cacheInterval = undefined}. + +check_cache(#state{maxCache = Max, cacheTimeout = Timeout, + cacheInterval = Interval} = State) -> + case cosNotification_eventDB:status(State#state.eventDB, eventCounter) of + Count when Count > Max -> + %% Reached the upper limit, terminate. + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, State#state.client}, + {reason, {max_events, Max}}]), + {stop, normal, State}; + _ when Timeout == undefined, Interval == undefined -> + case {timer:send_interval(cosNotificationApp:interval_events(), + cacheInterval), + timer:send_after(cosNotificationApp:timeout_events(), + cacheTimeout)} of + {{ok, IntervalRef}, {ok, TimeoutRef}} -> + {noreply, State#state{cacheTimeout = TimeoutRef, + cacheInterval = IntervalRef}}; + Error -> + orber:dbg("[~p] PusherSupplier:check_cache();~n" + "Unable to start timers: ~p", + [?LINE, Error], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, State#state.client}, + {reason, {timer, Error}}]), + {stop, normal, State} + end; + _ -> + %% Timers already started. + {noreply, State} + end. + +store_events(_State, []) -> + ok; +store_events(State, [Event|Rest]) when ?is_ANY(State) -> + AnyEvent = any:create('CosNotification_StructuredEvent':tc(),Event), + ?add_Event(State, AnyEvent), + store_events(State, Rest); +store_events(State, [Event|Rest]) -> + ?add_Event(State, Event), + store_events(State, Rest). + +%% Start timers which send a message each time we should push events. Only used +%% when this objects is defined to supply sequences. +start_timer(State) -> + case ?get_PacingInterval(State) of + 0 -> + ?DBG("PUSH SUPPLIER STARTED NO TIMER (0), BATCH LIMIT: ~p~n", + [?get_BatchLimit(State)]), + + State; + PacInt -> + case catch timer:send_interval(timer:seconds(PacInt), pacing) of + {ok,PacTRef} -> + ?DBG("PUSH SUPPLIER STARTED TIMER, BATCH LIMIT: ~p~n", + [?get_BatchLimit(State)]), + ?set_PacingTimer(State, PacTRef); + What -> + orber:dbg("[~p] PusherSupplier:start_timer();~n" + "Unable to invoke timer:send_interval/2: ~p", + [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}) + end + end. + +stop_timer(undefined) -> + ?DBG("PUSH SUPPLIER HAVE NO TIMER TO STOP~n",[]), + ok; +stop_timer(Timer) -> + ?DBG("PUSH SUPPLIER STOPPED TIMER~n",[]), + timer:cancel(Timer), + ok. + + +%%--------------- MISC FUNCTIONS, E.G. DEBUGGING ------------- +%%--------------- END OF MODULE ------------------------------ |