diff options
Diffstat (limited to 'lib/cosNotification/src/PullerConsumer_impl.erl')
| -rw-r--r-- | lib/cosNotification/src/PullerConsumer_impl.erl | 774 | 
1 files changed, 0 insertions, 774 deletions
| diff --git a/lib/cosNotification/src/PullerConsumer_impl.erl b/lib/cosNotification/src/PullerConsumer_impl.erl deleted file mode 100644 index 52bd13918f..0000000000 --- a/lib/cosNotification/src/PullerConsumer_impl.erl +++ /dev/null @@ -1,774 +0,0 @@ -%%-------------------------------------------------------------------- -%% -%% %CopyrightBegin% -%%  -%% Copyright Ericsson AB 1999-2016. All Rights Reserved. -%%  -%% Licensed under the Apache License, Version 2.0 (the "License"); -%% you may not use this file except in compliance with the License. -%% You may obtain a copy of the License at -%% -%%     http://www.apache.org/licenses/LICENSE-2.0 -%% -%% Unless required by applicable law or agreed to in writing, software -%% distributed under the License is distributed on an "AS IS" BASIS, -%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -%% See the License for the specific language governing permissions and -%% limitations under the License. -%%  -%% %CopyrightEnd% -%% -%% -%%---------------------------------------------------------------------- -%% File    : PullerConsumer_impl.erl -%% Purpose :  -%%---------------------------------------------------------------------- - --module('PullerConsumer_impl'). - -%%--------------- INCLUDES ----------------------------------- --include_lib("orber/include/corba.hrl"). --include_lib("orber/include/ifr_types.hrl"). -%% cosEvent files. --include_lib("cosEvent/include/CosEventChannelAdmin.hrl"). --include_lib("cosEvent/include/CosEventComm.hrl"). -%% Application files --include("CosNotification.hrl"). --include("CosNotifyChannelAdmin.hrl"). --include("CosNotifyComm.hrl"). --include("CosNotifyFilter.hrl"). --include("CosNotification_Definitions.hrl"). - -%%--------------- EXPORTS ------------------------------------ -%%--------------- External ----------------------------------- -%%----- CosNotifyChannelAdmin::ProxyPullConsumer ------------- --export([connect_any_pull_supplier/4]). - -%%----- CosNotifyChannelAdmin::SequenceProxyPullConsumer ----- --export([connect_sequence_pull_supplier/4]). - -%%----- CosNotifyChannelAdmin::StructuredProxyPullConsumer --- --export([connect_structured_pull_supplier/4]). - -%%----- CosNotifyChannelAdmin::*ProxyPullConsumer ------------ --export([suspend_connection/3,  -	 resume_connection/3]). - -%%----- Inherit from CosNotifyChannelAdmin::ProxyConsumer ---- --export([obtain_subscription_types/4, -	 validate_event_qos/4]). - -%%----- Inherit from CosNotification::QoSAdmin --------------- --export([get_qos/3, -	 set_qos/4, -	 validate_qos/4]). - -%%----- Inherit from CosNotifyComm::NotifyPublish ------------ --export([offer_change/5]). - -%%----- Inherit from CosNotifyFilter::FilterAdmin ------------ --export([add_filter/4,  -	 remove_filter/4,  -	 get_filter/4, -	 get_all_filters/3,  -	 remove_all_filters/3]). - -%%----- Inherit from CosEventComm::PullConsumer ------------- --export([disconnect_pull_consumer/3]). - -%%----- Inherit from CosNotifyComm::SequencePullConsumer ---- --export([disconnect_sequence_pull_consumer/3]). - -%%----- Inherit from CosNotifyComm::StructuredPullConsumer -- --export([disconnect_structured_pull_consumer/3]). - -%%----- Inherit from CosEventChannelAdmin::ProxyPullConsumer --export([connect_pull_supplier/4]). - - -%% Attributes (external) CosNotifyChannelAdmin::ProxySupplier --export(['_get_MyType'/3,  -	 '_get_MyAdmin'/3]). - -%%--------------- gen_server specific exports ---------------- --export([handle_info/2, code_change/3]). --export([init/1, terminate/2]). - -%%--------------- LOCAL DEFINITIONS -------------------------- -%% Data structures --record(state, {myType, -	        myAdmin, -	        myAdminPid, -		myChannel, -		myFilters = [], -		myOperator, -		idCounter = 0, -		client, -		qosGlobal, -		qosLocal, -		suspended = false, -		pullTimer, -		pullInterval, -		publishType = false, -		etsR, -		eventCounter = 0, -		eventDB, -		this}). - -%% Data structures constructors --define(get_InitState(_MyT, _MyA, _MyAP, _QS, _LQS, _Ch, _PI, _MyOP, _GT, _GL, _TR),  -	#state{myType       = _MyT, -	       myAdmin      = _MyA, -	       myAdminPid   = _MyAP, -	       myChannel    = _Ch, -	       myOperator   = _MyOP, -	       qosGlobal    = _QS, -	       qosLocal     = _LQS, -	       pullInterval = _PI, -	       etsR         = ets:new(oe_ets, [set, protected]), -	       eventDB      = cosNotification_eventDB:create_db(_LQS, _GT, _GL, _TR)}). - -%%-------------- Data structures selectors ----------------- -%% Attributes --define(get_MyType(S),           S#state.myType). --define(get_MyAdmin(S),          S#state.myAdmin). --define(get_MyAdminPid(S),       S#state.myAdminPid). --define(get_MyChannel(S),        S#state.myChannel). --define(get_MyOperator(S),       S#state.myOperator). -%% Client Object --define(get_Client(S),           S#state.client). -%% QoS --define(get_GlobalQoS(S),        S#state.qosGlobal). --define(get_LocalQoS(S),         S#state.qosLocal). --define(get_BothQoS(S),          {S#state.qosGlobal, S#state.qosLocal}). -%% Filters --define(get_Filter(S, I),        find_obj(lists:keysearch(I, 1, S#state.myFilters))). --define(get_AllFilter(S),        S#state.myFilters). --define(get_AllFilterID(S),      find_ids(S#state.myFilters)). -%% Admin --define(get_PullInterval(S),     S#state.pullInterval). --define(get_PullTimer(S),        S#state.pullTimer). --define(get_PacingInterval(S),   round(?not_GetPacingInterval((S#state.qosLocal))/10000000)). --define(get_BatchLimit(S),       ?not_GetMaximumBatchSize((S#state.qosLocal))). -%% Publish --define(get_AllPublish(S),       lists:flatten(ets:match(S#state.etsR, -							 {'$1',publish}))). --define(get_PublishType(S),      S#state.publishType). -%% ID --define(get_IdCounter(S),        S#state.idCounter). -%% Event --define(get_Event(S),            cosNotification_eventDB:get_event(S#state.eventDB)). --define(get_Events(S,M),         cosNotification_eventDB:get_events(S#state.eventDB, M)). --define(get_EventCounter(S),     S#state.eventCounter). - -%%-------------- Data structures modifiers ----------------- -%% Client Object --define(set_Client(S,D),         S#state{client=D}). --define(del_Client(S),           S#state{client=undefined}). --define(set_Suspended(S),        S#state{client=true}). --define(set_NotSuspended(S),     S#state{client=false}). --define(set_Unconnected(S),      S#state{client=undefined}). -%% QoS --define(set_LocalQoS(S,D),       S#state{qosLocal=D}). --define(set_GlobalQoS(S,D),      S#state{qosGlobal=D}). --define(set_BothQoS(S,GD,LD),    S#state{qosGlobal=GD, qosLocal=LD}). -%% Filters --define(add_Filter(S,I,O),       S#state{myFilters=[{I,O}|S#state.myFilters]}). --define(del_Filter(S,I),         S#state{myFilters= -					 delete_obj(lists:keydelete(I, 1, S#state.myFilters), -						    S#state.myFilters)}). --define(del_AllFilter(S),        S#state{myFilters=[]}). -%% Admin --define(set_PullInterval(S,V),   S#state{pullInterval=V}). --define(set_PullTimer(S,T),      S#state{pullTimer=T}). -%% Publish --define(add_Publish(S,E),        ets:insert(S#state.etsR, {E, publish})). --define(del_Publish(S,E),        ets:delete(S#state.etsR, E)). --define(set_PublishType(S,T),    S#state{publishType=T}). -%% ID --define(set_IdCounter(S,V),      S#state{idCounter=V}). --define(new_Id(S),               'CosNotification_Common':create_id(S#state.idCounter)). -%% Event --define(add_Event(S,E),          cosNotification_eventDB:add_event(S#state.eventDB, E)). --define(update_EventDB(S,Q),     S#state{eventDB= -					 cosNotification_eventDB:update(S#state.eventDB, Q)}). - --define(set_EventCounter(S,V),   S#state{eventCounter=V}). --define(add_to_EventCounter(S,V),S#state{eventCounter=S#state.eventCounter+V}). --define(reset_EventCounter(S),   S#state{eventCounter=0}). --define(increase_EventCounter(S),S#state{eventCounter=(S#state.eventCounter+1)}). --define(decrease_EventCounter(S),S#state{eventCounter=S#state.eventCounter-1}). --define(add_ToEventCounter(S,A), S#state{eventCounter=(S#state.eventCounter+A)}). --define(sub_FromEventCounter(S,_A), S#state{eventCounter=(S#state.eventCounter-_A)}). --define(set_EventCounterTo(S,V), S#state{eventCounter=V}). - -%%-------------- MISC ---------------------------------------- --define(is_ANY(S),               S#state.myType == 'PULL_ANY'). --define(is_STRUCTURED(S),        S#state.myType == 'PULL_STRUCTURED'). --define(is_SEQUENCE(S),          S#state.myType == 'PULL_SEQUENCE'). --define(is_ANDOP(S),             S#state.myOperator == 'AND_OP'). --define(is_UnConnected(S),       S#state.client == undefined). --define(is_Connected(S),         S#state.client =/= undefined). --define(is_Suspended(S),         S#state.suspended == true). --define(is_NotSuspended(S),      S#state.suspended == false). --define(is_PersistentConnection(S), -	?not_GetConnectionReliability((S#state.qosLocal)) == ?not_Persistent). --define(is_PersistentEvent(S), -	?not_GetEventReliability((S#state.qosLocal)) == ?not_Persistent). - -%%-----------------------------------------------------------% -%% function : handle_info, code_change -%% Arguments:  -%% Returns  :  -%% Effect   : Functions demanded by the gen_server module.  -%%------------------------------------------------------------ - -code_change(_OldVsn, State, _Extra) -> -    {ok, State}. - -handle_info(Info, State) -> -    ?DBG("INFO: ~p~n", [Info]), -    case Info of -        {'EXIT', Pid, Reason} when ?get_MyAdminPid(State) == Pid-> -            ?DBG("PARENT ADMIN: ~p  TERMINATED.~n",[Reason]), -	    {stop, Reason, State}; -        {'EXIT', _Pid, _Reason} -> -            ?DBG("PROXYPUSHSUPPLIER: ~p  TERMINATED.~n",[_Reason]), -            {noreply, State}; -        pull -> -	    try_pull_events(State); -        _ -> -            {noreply, State} -    end. - -%%----------------------------------------------------------% -%% function : init, terminate -%% Arguments:  -%%----------------------------------------------------------- - -init([MyType, MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel, Options, Operator]) -> -    process_flag(trap_exit, true), -    Secs = timer:seconds('CosNotification_Common':get_option(pullInterval,  -							     Options,  -							     ?not_DEFAULT_SETTINGS)), -    GCTime = 'CosNotification_Common':get_option(gcTime, Options,  -						 ?not_DEFAULT_SETTINGS), -    GCLimit = 'CosNotification_Common':get_option(gcLimit, Options,  -						  ?not_DEFAULT_SETTINGS), -    TimeRef = 'CosNotification_Common':get_option(timeService, Options,  -						  ?not_DEFAULT_SETTINGS), -    timer:start(), -    {ok, ?get_InitState(MyType, MyAdmin, MyAdminPid, InitQoS,  -			LQS, MyChannel, Secs, Operator, GCTime,  -			GCLimit, TimeRef)}. - -terminate(_Reason, State) when ?is_UnConnected(State) -> -    %% We are currently not connected to a client. Hence, no need for sending -    %% a disconnect request. -    stop_timer(State), -    ok; -terminate(_Reason, State) when ?is_ANY(State) -> -    stop_timer(State), -    'CosNotification_Common':disconnect('CosEventComm_PullSupplier',  -					disconnect_pull_supplier,  -					?get_Client(State)); -terminate(_Reason, State) when ?is_SEQUENCE(State) -> -    stop_timer(State), -    'CosNotification_Common':disconnect('CosNotifyComm_SequencePullSupplier', -					disconnect_sequence_pull_supplier, -					?get_Client(State)); -terminate(_Reason, State) when ?is_STRUCTURED(State) -> -    stop_timer(State), -    'CosNotification_Common':disconnect('CosNotifyComm_StructuredPullSupplier',  -					disconnect_structured_pull_supplier,  -					?get_Client(State)). - -%%----------------------------------------------------------- -%%----- CosNotifyChannelAdmin_ProxyConsumer attributes ------ -%%----------------------------------------------------------- -%%----------------------------------------------------------% -%% Attribute: '_get_MyType' -%% Type     : readonly -%% Returns  :  -%%----------------------------------------------------------- -'_get_MyType'(_OE_THIS, _OE_FROM, State) -> -    {reply, ?get_MyType(State), State}. - -%%----------------------------------------------------------% -%% Attribute: '_get_MyAdmin' -%% Type     : readonly -%% Returns  :  -%%----------------------------------------------------------- -'_get_MyAdmin'(_OE_THIS, _OE_FROM, State) -> -    {reply, ?get_MyAdmin(State), State}. - -%%----------------------------------------------------------- -%%------- Exported external functions ----------------------- -%%----------------------------------------------------------- -%%----- CosEventChannelAdmin::ProxyPullConsumer ------------- -%%----------------------------------------------------------% -%% function : connect_pull_supplier -%% Arguments: Client - CosEventComm::PullSupplier -%% Returns  :  ok | {'EXCEPTION', #'AlreadyConnected'{}} | -%%            {'EXCEPTION', #'TypeError'{}} | -%%            {'EXCEPTION', #'BAD_OPERATION'{}} -%%            Both exceptions from CosEventChannelAdmin!!! -%%----------------------------------------------------------- -connect_pull_supplier(OE_THIS, OE_FROM, State, Client) -> -    connect_any_pull_supplier(OE_THIS, OE_FROM, State, Client). - -%%----- CosNotifyChannelAdmin::ProxyPullConsumer ------------ -%%----------------------------------------------------------% -%% function : connect_any_pull_supplier -%% Arguments: Client - CosEventComm::PullSupplier -%% Returns  :  ok | {'EXCEPTION', #'AlreadyConnected'{}} | -%%            {'EXCEPTION', #'TypeError'{}} | -%%            {'EXCEPTION', #'BAD_OPERATION'{}} -%%            Both exceptions from CosEventChannelAdmin!!! -%%----------------------------------------------------------- -connect_any_pull_supplier(OE_THIS, _OE_FROM, State, Client) when ?is_ANY(State) -> -    'CosNotification_Common':type_check(Client, 'CosEventComm_PullSupplier'), -    if -	?is_Connected(State) -> -	    corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}); -	true -> -	    NewState = start_timer(State), -	    {reply, ok, NewState#state{client = Client, this = OE_THIS}} -    end; -connect_any_pull_supplier(_, _, _,_) -> -    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). - -%%----- CosNotifyChannelAdmin::SequenceProxyPullConsumer ---- -%%----------------------------------------------------------% -%% function : connect_sequence_pull_supplier -%% Arguments: Client - CosNotifyComm::SequencePullSupplier -%% Returns  :  ok | {'EXCEPTION', #'AlreadyConnected'{}} | -%%            {'EXCEPTION', #'TypeError'{}} | -%%            {'EXCEPTION', #'BAD_OPERATION'{}} -%%----------------------------------------------------------- -connect_sequence_pull_supplier(OE_THIS, _OE_FROM, State, Client) when ?is_SEQUENCE(State) -> -    'CosNotification_Common':type_check(Client, 'CosNotifyComm_SequencePullSupplier'), -    if -	?is_Connected(State) -> -	    corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}); -	true -> -	    NewState = start_timer(State), -	    {reply, ok, NewState#state{client = Client, this = OE_THIS}} -    end; -connect_sequence_pull_supplier(_, _, _, _) -> -    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). - -%%----- CosNotifyChannelAdmin::StructuredProxyPullConsumer -- -%%----------------------------------------------------------% -%% function : connect_structured_pull_supplier -%% Arguments: Client - CosNotifyComm::StructuredPullSupplier -%% Returns  :  ok | {'EXCEPTION', #'AlreadyConnected'{}} | -%%            {'EXCEPTION', #'TypeError'{}} | -%%            {'EXCEPTION', #'BAD_OPERATION'{}} -%%----------------------------------------------------------- -connect_structured_pull_supplier(OE_THIS, _OE_FROM, State, Client) when ?is_STRUCTURED(State) -> -    'CosNotification_Common':type_check(Client, 'CosNotifyComm_StructuredPullSupplier'), -    if -	?is_Connected(State) -> -	    corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}); -	true -> -	    NewState = start_timer(State), -	    {reply, ok, NewState#state{client = Client, this = OE_THIS}} -    end; -connect_structured_pull_supplier(_, _, _, _)  -> -    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). - -%%----- CosNotifyChannelAdmin::*ProxyPullConsumer ----------- -%%----------------------------------------------------------% -%% function : suspend_connection -%% Arguments:  -%% Returns  : ok | {'EXCEPTION', #'ConnectionAlreadyInactive'{}} | -%%            {'EXCEPTION', #'NotConneced'{}} -%%----------------------------------------------------------- -suspend_connection(_OE_THIS, _OE_FROM, State) when ?is_Connected(State) -> -    if -	?is_Suspended(State) -> -	    corba:raise(#'CosNotifyChannelAdmin_ConnectionAlreadyInactive'{}); -	true -> -	    stop_timer(State), -	    {reply, ok, ?set_Suspended(State)} -    end; -suspend_connection(_, _, _) -> -    corba:raise(#'CosNotifyChannelAdmin_NotConnected'{}). - -%%----------------------------------------------------------% -%% function : resume_connection -%% Arguments:  -%% Returns  :  ok | {'EXCEPTION', #'ConnectionAlreadyActive'{}} | -%%            {'EXCEPTION', #'NotConneced'{}} -%%----------------------------------------------------------- -resume_connection(_OE_THIS, _OE_FROM, State) when ?is_Connected(State) -> -    if -	?is_NotSuspended(State) -> -	    corba:raise(#'CosNotifyChannelAdmin_ConnectionAlreadyActive'{}); -	true -> -	    NewState = start_timer(State), -	    {reply, ok, ?set_NotSuspended(NewState)} -    end; -resume_connection(_, _, _) -> -    corba:raise(#'CosNotifyChannelAdmin_NotConnected'{}). - -%%----- Inherit from CosNotifyChannelAdmin::ProxyConsumer --- -%%----------------------------------------------------------% -%% function : obtain_subscription_types -%% Arguments: Mode - enum 'ObtainInfoMode' (CosNotifyChannelAdmin) -%% Returns  : CosNotification::EventTypeSeq -%%----------------------------------------------------------- -obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_OFF') -> -    {reply, ?get_AllPublish(State), ?set_PublishType(State, false)}; -obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_ON') -> -    {reply, ?get_AllPublish(State), ?set_PublishType(State, true)}; -obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_OFF') -> -    {reply, [], ?set_PublishType(State, false)}; -obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_ON') -> -    {reply, [], ?set_PublishType(State, true)}; -obtain_subscription_types(_,_,_,What) -> -    orber:dbg("[~p] PullerConsumer:obtain_subscription_types(~p);~n" -	      "Incorrect enumerant", [?LINE, What], ?DEBUG_LEVEL), -    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}). - -%%----------------------------------------------------------% -%% function : validate_event_qos -%% Arguments: RequiredQoS - CosNotification::QoSProperties -%% Returns  : ok | {'EXCEPTION', #'UnsupportedQoS'{}} -%%            AvilableQoS - CosNotification::NamedPropertyRangeSeq (out) -%%----------------------------------------------------------- -validate_event_qos(_OE_THIS, _OE_FROM, State, RequiredQoS) -> -    AvilableQoS = 'CosNotification_Common':validate_event_qos(RequiredQoS, -							      ?get_LocalQoS(State)), -    {reply, {ok, AvilableQoS}, State}. - -%%----- Inherit from CosNotification::QoSAdmin -------------- -%%----------------------------------------------------------% -%% function : get_qos -%% Arguments:  -%% Returns  :  -%%----------------------------------------------------------- -get_qos(_OE_THIS, _OE_FROM, State) -> -    {reply, ?get_GlobalQoS(State), State}.     - -%%----------------------------------------------------------% -%% function : set_qos -%% Arguments: QoS - CosNotification::QoSProperties, i.e., -%%            [#'Property'{name, value}, ...] where name eq. string() -%%            and value eq. any(). -%% Returns  : ok | {'EXCEPTION', CosNotification::UnsupportedQoS} -%%----------------------------------------------------------- -set_qos(_OE_THIS, _OE_FROM, State, QoS) -> -    {NewQoS, LQS} = 'CosNotification_Common':set_qos(QoS, ?get_BothQoS(State),  -						     proxy, ?get_MyAdmin(State),  -						     false), -    NewState = ?update_EventDB(State, LQS), -    {reply, ok, ?set_BothQoS(NewState, NewQoS, LQS)}. - -%%----------------------------------------------------------% -%% function : validate_qos -%% Arguments: Required_qos - CosNotification::QoSProperties -%%            [#'Property'{name, value}, ...] where name eq. string() -%%            and value eq. any(). -%% Returns  : {'EXCEPTION', CosNotification::UnsupportedQoS} -%%            {ok, CosNotification::NamedPropertyRangeSeq} -%%----------------------------------------------------------- -validate_qos(_OE_THIS, _OE_FROM, State, Required_qos) -> -    QoS = 'CosNotification_Common':validate_qos(Required_qos, ?get_BothQoS(State),  -						proxy, ?get_MyAdmin(State),  -						false), -    {reply, {ok, QoS}, State}. - -%%----- Inherit from CosNotifyComm::NotifyPublish ----------- -%%----------------------------------------------------------% -%% function : offer_change -%% Arguments: Added - #'CosNotification_EventType'{} -%%            Removed - #'CosNotification_EventType'{} -%% Returns  : ok |  -%%            {'EXCEPTION', #'CosNotifyComm_InvalidEventType'{}} -%%----------------------------------------------------------- -offer_change(_OE_THIS, _OE_FROM, State, Added, Removed) -> -    cosNotification_Filter:validate_types(Added),  -    cosNotification_Filter:validate_types(Removed), -    %% On this "side" we don't really care about which -    %% type of events the client will supply.  -    %% Perhaps, later on, if we want to check this against Filters -    %% associated with this object we may change this approach, i.e., if -    %% the filter will not allow passing certain event types. But the -    %% user should see to that that situation never occurs. It would add -    %% extra overhead. Also see PusherSupplier- and PullerSuppler- -    %% 'subscription_change'. -    update_publish(add, State, Added), -    update_publish(remove, State, Removed), -    case ?get_PublishType(State) of -	true -> -	    %% Perhaps we should handle exception here?! -	    %% Probably not. Better to stay "on-line". -	    catch 'CosNotifyComm_NotifySubscribe': -		subscription_change(?get_Client(State), Added, Removed), -	    ok; -	_-> -	    ok -    end,	 -    {reply, ok, State}. - -update_publish(_, _, [])-> -    ok; -update_publish(add, State, [H|T]) -> -    ?add_Publish(State, H), -    update_publish(add, State, T); -update_publish(remove, State, [H|T]) -> -    ?del_Publish(State, H), -    update_publish(remove, State, T). - -%%----- Inherit from CosNotifyFilter::FilterAdmin ----------- -%%----------------------------------------------------------% -%% function : add_filter -%% Arguments: Filter - CosNotifyFilter::Filter -%% Returns  : FilterID - long -%%----------------------------------------------------------- -add_filter(_OE_THIS, _OE_FROM, State, Filter) -> -    'CosNotification_Common':type_check(Filter, 'CosNotifyFilter_Filter'), -    FilterID = ?new_Id(State), -    NewState = ?set_IdCounter(State, FilterID), -    {reply, FilterID, ?add_Filter(NewState, FilterID, Filter)}. - -%%----------------------------------------------------------% -%% function : remove_filter -%% Arguments: FilterID - long -%% Returns  : ok -%%----------------------------------------------------------- -remove_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) -> -    {reply, ok, ?del_Filter(State, FilterID)}; -remove_filter(_,_,_,What) -> -    orber:dbg("[~p] PullerConsumer:remove_filter(~p); Not an integer",  -	      [?LINE, What], ?DEBUG_LEVEL), -    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}). - -%%----------------------------------------------------------% -%% function : get_filter -%% Arguments: FilterID - long -%% Returns  : Filter - CosNotifyFilter::Filter | -%%            {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}} -%%----------------------------------------------------------- -get_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) -> -    {reply, ?get_Filter(State, FilterID), State}; -get_filter(_,_,_,What) -> -    orber:dbg("[~p] PullerConsumer:get_filter(~p); Not an integer",  -	      [?LINE, What], ?DEBUG_LEVEL), -    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}). - -%%----------------------------------------------------------% -%% function : get_all_filters -%% Arguments: - -%% Returns  : Filter - CosNotifyFilter::FilterIDSeq -%%----------------------------------------------------------- -get_all_filters(_OE_THIS, _OE_FROM, State) -> -    {reply, ?get_AllFilterID(State), State}. - -%%----------------------------------------------------------% -%% function : remove_all_filters -%% Arguments: - -%% Returns  : ok -%%----------------------------------------------------------- -remove_all_filters(_OE_THIS, _OE_FROM, State) -> -    {reply, ok, ?del_AllFilter(State)}. - -%%----- Inherit from CosEventComm::PullConsumer ------------- -%%----------------------------------------------------------% -%% function : disconnect_pull_consumer -%% Arguments: - -%% Returns  : ok -%%----------------------------------------------------------- -disconnect_pull_consumer(_OE_THIS, _OE_FROM, State) -> -    {stop, normal, ok, ?set_Unconnected(State)}. - -%%----- Inherit from CosNotifyComm::SequencePullConsumer ---- -%%----------------------------------------------------------% -%% function : disconnect_sequence_pull_consumer -%% Arguments: - -%% Returns  : ok  -%%----------------------------------------------------------- -disconnect_sequence_pull_consumer(_OE_THIS, _OE_FROM, State) -> -    {stop, normal, ok, ?set_Unconnected(State)}. - -%%----- Inherit from CosNotifyComm::StructuredPullConsumer ---- -%%----------------------------------------------------------% -%% function : disconnect_structured_pull_consumer -%% Arguments: - -%% Returns  : ok -%%----------------------------------------------------------- -disconnect_structured_pull_consumer(_OE_THIS, _OE_FROM, State) -> -    {stop, normal, ok, ?set_Unconnected(State)}. - -%%--------------- LOCAL FUNCTIONS ---------------------------- -find_obj({value, {_, Obj}}) -> Obj; -find_obj(_) -> {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}. - -find_ids(List) ->           find_ids(List, []). -find_ids([], Acc) ->        Acc; -find_ids([{I,_}|T], Acc) -> find_ids(T, [I|Acc]); -find_ids(What, _) ->  -    orber:dbg("[~p] PullerConsumer:find_ids();~n" -	      "Id corrupt: ~p", [?LINE, What], ?DEBUG_LEVEL), -    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}). - -%% Delete a single object. -%% The list don not differ, i.e., no filter removed, raise exception. -delete_obj(List,List) -> corba:raise(#'CosNotifyFilter_FilterNotFound'{}); -delete_obj(List,_) -> List. - -%% Start timer which send a message each time we should pull for new events. -start_timer(State) -> -    case catch timer:send_interval(?get_PullInterval(State), pull) of -	{ok,PullTRef} -> -	    ?DBG("PULL CONSUMER STARTED PULL TIMER ~p~n", -			 [?get_PullInterval(State)]), -	    ?set_PullTimer(State, PullTRef); -	What -> -	    orber:dbg("[~p] PullerConsumer:start_timer();~n" -		      "Unable to invoke timer:send_interval/2: ~p",  -		      [?LINE, What], ?DEBUG_LEVEL), -	    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}) -    end. -stop_timer(State) -> -    ?DBG("PULL CONSUMER STOPPED TIMER~n",[]), -    timer:cancel(?get_PullTimer(State)). - -%% Try pull event(s); which method is determined by which type this proxy is. -try_pull_events(State) when ?is_ANY(State) -> -    case catch 'CosEventComm_PullSupplier':try_pull(?get_Client(State)) of -	{_,false} -> -	    {noreply, State}; -	{Event, true} -> -	    case ?not_isConvertedStructured(Event) of -		true -> -		    forward(seq, State,    -			    cosNotification_eventDB:filter_events([any:get_value(Event)],  -								  ?get_AllFilter(State))); -		_ -> -		    forward(any, State,   -			    cosNotification_eventDB:filter_events([Event],  -								  ?get_AllFilter(State))) -	    end; -	_-> -	    {noreply, State} -    end; -try_pull_events(State) when ?is_SEQUENCE(State) -> -    case catch 'CosNotifyComm_SequencePullSupplier': -	try_pull_structured_events(?get_Client(State), ?get_BatchLimit(State)) of -	{_,false} -> -	    {noreply, State}; -	{EventSeq, true} -> -	    %% We cannot convert parts of the sequence to any, event though they -	    %% are converted from any to structured. Would be 'impossible' to send. -	    forward(seq, State,  -		    cosNotification_eventDB:filter_events(EventSeq,  -							  ?get_AllFilter(State))); -	_-> -	    {noreply, State} -    end; -try_pull_events(State) when ?is_STRUCTURED(State) -> -    case catch 'CosNotifyComm_StructuredPullSupplier': -	try_pull_structured_event(?get_Client(State)) of -	{_,false} -> -	    {noreply, State}; -	{Event, true} when ?not_isConvertedAny(Event) -> -	    forward(any, State,  -		    cosNotification_eventDB:filter_events([Event#'CosNotification_StructuredEvent'.remainder_of_body],  -							  ?get_AllFilter(State)));  -	{Event, true} -> -	    forward(seq, State,  -		    cosNotification_eventDB:filter_events([Event],  -							  ?get_AllFilter(State))); -	_-> -	    {noreply, State} -    end. -   - -					 -%% Forward events -forward(_, State, {[], _}) when ?is_ANDOP(State) -> -    %% Did not pass filtering. Since AND no need to pass on. -    {noreply, State}; -forward(Type, State, {[], Failed}) -> -    %% Did not pass filtering, but since OR it may pass Admin filters, hence, pass  -    %% on to Admin -    forward(Type, State, Failed, ?get_MyAdmin(State), 'MATCH'); -forward(Type, State, {Passed, _}) when ?is_ANDOP(State) -> -    %% Did pass filtering, but since AND we must pass it to Admin to check against -    %% its Filters. Just ignore the ones that failed. -    forward(Type, State, Passed, ?get_MyAdmin(State), 'MATCH'); -forward(Type, State, {Passed, []}) -> -    %% Did pass filtering, and since OR we can pass it to the Channel directly. -    forward(Type, State, Passed, ?get_MyChannel(State), 'MATCHED'); -forward(Type, State, {Passed, Failed}) -> -    %% Some passed filtering, and since OR we can pass the ones that passed directly -    %% to the channel and the other ones via the admin. -    forward(Type, State, Passed, ?get_MyChannel(State), 'MATCHED'), -    forward(Type, State, Failed, ?get_MyAdmin(State), 'MATCH'). - -forward(any, State, [Event], SendTo, Status) -> -    case catch oe_CosNotificationComm_Event:callAny(SendTo, Event, Status) of -	ok -> -	    ?DBG("PROXY FORWARD ANY: ~p~n",[Event]), -	    {noreply, State}; -	{'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') orelse -			      is_record(E, 'NO_PERMISSION') orelse -			      is_record(E, 'CosEventComm_Disconnected') -> -	    orber:dbg("[~p] PullerConsumer:forward();~n" -		      "Admin/Channel no longer exists; terminating and dropping: ~p",  -		      [?LINE, Event], ?DEBUG_LEVEL), -	    'CosNotification_Common':notify([{proxy, State#state.this}, -					     {client, ?get_Client(State)},  -					     {reason, {'EXCEPTION', E}}]), -	    {stop, normal, State}; -	R when ?is_PersistentConnection(State) -> -	    orber:dbg("[~p] PullerConsumer:forward();~n" -		      "Admin/Channel respond incorrect: ~p~n" -		      "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL), -	    {noreply, State}; -	R -> -	    orber:dbg("[~p] PullerConsumer:forward();~n" -		      "Admin/Channel respond incorrect: ~p~n" -		      "Terminating and dropping: ~p",  -		      [?LINE, R, Event], ?DEBUG_LEVEL), -	    'CosNotification_Common':notify([{proxy, State#state.this}, -					     {client, ?get_Client(State)},  -					     {reason, R}]), -	    {stop, normal, State} -    end; -forward(seq, State, Event, SendTo, Status) -> -    case catch oe_CosNotificationComm_Event:callSeq(SendTo, Event, Status) of -	ok -> -	    ?DBG("PROXY FORWARD SEQUENCE: ~p~n",[Event]), -	    {noreply, State}; -	{'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') orelse -			      is_record(E, 'NO_PERMISSION') orelse -			      is_record(E, 'CosEventComm_Disconnected') -> -	    orber:dbg("[~p] PullerConsumer:forward();~n" -		      "Admin/Channel no longer exists; terminating and dropping: ~p",  -		      [?LINE, Event], ?DEBUG_LEVEL), -	    'CosNotification_Common':notify([{proxy, State#state.this}, -					     {client, ?get_Client(State)},  -					     {reason, {'EXCEPTION', E}}]), -	    {stop, normal, State}; -	R when ?is_PersistentConnection(State) -> -	    orber:dbg("[~p] PullerConsumer:forward();~n" -		      "Admin/Channel respond incorrect: ~p~n" -		      "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL), -	    {noreply, State}; -	R -> -	    orber:dbg("[~p] PullerConsumer:forward();~n" -		      "Admin/Channel respond incorrect: ~p~n" -		      "Terminating and dropping: ~p", -		      [?LINE, R, Event], ?DEBUG_LEVEL), -	    'CosNotification_Common':notify([{proxy, State#state.this}, -					     {client, ?get_Client(State)},  -					     {reason, R}]), -	    {stop, normal, State} -    end. - -%%--------------- MISC FUNCTIONS, E.G. DEBUGGING ------------- -%%--------------- END OF MODULE ------------------------------ | 
