From 84adefa331c4159d432d22840663c38f155cd4c1 Mon Sep 17 00:00:00 2001 From: Erlang/OTP Date: Fri, 20 Nov 2009 14:54:40 +0000 Subject: The R13B03 release. --- lib/cosNotification/src/PullerConsumer_impl.erl | 773 ++++++++++++++++++++++++ 1 file changed, 773 insertions(+) create mode 100644 lib/cosNotification/src/PullerConsumer_impl.erl (limited to 'lib/cosNotification/src/PullerConsumer_impl.erl') diff --git a/lib/cosNotification/src/PullerConsumer_impl.erl b/lib/cosNotification/src/PullerConsumer_impl.erl new file mode 100644 index 0000000000..fe6f9f8968 --- /dev/null +++ b/lib/cosNotification/src/PullerConsumer_impl.erl @@ -0,0 +1,773 @@ +%%-------------------------------------------------------------------- +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 1999-2009. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% +%% +%%---------------------------------------------------------------------- +%% File : PullerConsumer_impl.erl +%% Purpose : +%%---------------------------------------------------------------------- + +-module('PullerConsumer_impl'). + +%%--------------- INCLUDES ----------------------------------- +-include_lib("orber/include/corba.hrl"). +-include_lib("orber/include/ifr_types.hrl"). +%% cosEvent files. +-include_lib("cosEvent/include/CosEventChannelAdmin.hrl"). +-include_lib("cosEvent/include/CosEventComm.hrl"). +%% Application files +-include("CosNotification.hrl"). +-include("CosNotifyChannelAdmin.hrl"). +-include("CosNotifyComm.hrl"). +-include("CosNotifyFilter.hrl"). +-include("CosNotification_Definitions.hrl"). + +%%--------------- EXPORTS ------------------------------------ +%%--------------- External ----------------------------------- +%%----- CosNotifyChannelAdmin::ProxyPullConsumer ------------- +-export([connect_any_pull_supplier/4]). + +%%----- CosNotifyChannelAdmin::SequenceProxyPullConsumer ----- +-export([connect_sequence_pull_supplier/4]). + +%%----- CosNotifyChannelAdmin::StructuredProxyPullConsumer --- +-export([connect_structured_pull_supplier/4]). + +%%----- CosNotifyChannelAdmin::*ProxyPullConsumer ------------ +-export([suspend_connection/3, + resume_connection/3]). + +%%----- Inherit from CosNotifyChannelAdmin::ProxyConsumer ---- +-export([obtain_subscription_types/4, + validate_event_qos/4]). + +%%----- Inherit from CosNotification::QoSAdmin --------------- +-export([get_qos/3, + set_qos/4, + validate_qos/4]). + +%%----- Inherit from CosNotifyComm::NotifyPublish ------------ +-export([offer_change/5]). + +%%----- Inherit from CosNotifyFilter::FilterAdmin ------------ +-export([add_filter/4, + remove_filter/4, + get_filter/4, + get_all_filters/3, + remove_all_filters/3]). + +%%----- Inherit from CosEventComm::PullConsumer ------------- +-export([disconnect_pull_consumer/3]). + +%%----- Inherit from CosNotifyComm::SequencePullConsumer ---- +-export([disconnect_sequence_pull_consumer/3]). + +%%----- Inherit from CosNotifyComm::StructuredPullConsumer -- +-export([disconnect_structured_pull_consumer/3]). + +%%----- Inherit from CosEventChannelAdmin::ProxyPullConsumer +-export([connect_pull_supplier/4]). + + +%% Attributes (external) CosNotifyChannelAdmin::ProxySupplier +-export(['_get_MyType'/3, + '_get_MyAdmin'/3]). + +%%--------------- gen_server specific exports ---------------- +-export([handle_info/2, code_change/3]). +-export([init/1, terminate/2]). + +%%--------------- LOCAL DEFINITIONS -------------------------- +%% Data structures +-record(state, {myType, + myAdmin, + myAdminPid, + myChannel, + myFilters = [], + myOperator, + idCounter = 0, + client, + qosGlobal, + qosLocal, + suspended = false, + pullTimer, + pullInterval, + publishType = false, + etsR, + eventCounter = 0, + eventDB, + this}). + +%% Data structures constructors +-define(get_InitState(_MyT, _MyA, _MyAP, _QS, _LQS, _Ch, _PI, _MyOP, _GT, _GL, _TR), + #state{myType = _MyT, + myAdmin = _MyA, + myAdminPid = _MyAP, + myChannel = _Ch, + myOperator = _MyOP, + qosGlobal = _QS, + qosLocal = _LQS, + pullInterval = _PI, + etsR = ets:new(oe_ets, [set, protected]), + eventDB = cosNotification_eventDB:create_db(_LQS, _GT, _GL, _TR)}). + +%%-------------- Data structures selectors ----------------- +%% Attributes +-define(get_MyType(S), S#state.myType). +-define(get_MyAdmin(S), S#state.myAdmin). +-define(get_MyAdminPid(S), S#state.myAdminPid). +-define(get_MyChannel(S), S#state.myChannel). +-define(get_MyOperator(S), S#state.myOperator). +%% Client Object +-define(get_Client(S), S#state.client). +%% QoS +-define(get_GlobalQoS(S), S#state.qosGlobal). +-define(get_LocalQoS(S), S#state.qosLocal). +-define(get_BothQoS(S), {S#state.qosGlobal, S#state.qosLocal}). +%% Filters +-define(get_Filter(S, I), find_obj(lists:keysearch(I, 1, S#state.myFilters))). +-define(get_AllFilter(S), S#state.myFilters). +-define(get_AllFilterID(S), find_ids(S#state.myFilters)). +%% Admin +-define(get_PullInterval(S), S#state.pullInterval). +-define(get_PullTimer(S), S#state.pullTimer). +-define(get_PacingInterval(S), round(?not_GetPacingInterval((S#state.qosLocal))/10000000)). +-define(get_BatchLimit(S), ?not_GetMaximumBatchSize((S#state.qosLocal))). +%% Publish +-define(get_AllPublish(S), lists:flatten(ets:match(S#state.etsR, + {'$1',publish}))). +-define(get_PublishType(S), S#state.publishType). +%% ID +-define(get_IdCounter(S), S#state.idCounter). +%% Event +-define(get_Event(S), cosNotification_eventDB:get_event(S#state.eventDB)). +-define(get_Events(S,M), cosNotification_eventDB:get_events(S#state.eventDB, M)). +-define(get_EventCounter(S), S#state.eventCounter). + +%%-------------- Data structures modifiers ----------------- +%% Client Object +-define(set_Client(S,D), S#state{client=D}). +-define(del_Client(S), S#state{client=undefined}). +-define(set_Suspended(S), S#state{client=true}). +-define(set_NotSuspended(S), S#state{client=false}). +-define(set_Unconnected(S), S#state{client=undefined}). +%% QoS +-define(set_LocalQoS(S,D), S#state{qosLocal=D}). +-define(set_GlobalQoS(S,D), S#state{qosGlobal=D}). +-define(set_BothQoS(S,GD,LD), S#state{qosGlobal=GD, qosLocal=LD}). +%% Filters +-define(add_Filter(S,I,O), S#state{myFilters=[{I,O}|S#state.myFilters]}). +-define(del_Filter(S,I), S#state{myFilters= + delete_obj(lists:keydelete(I, 1, S#state.myFilters), + S#state.myFilters)}). +-define(del_AllFilter(S), S#state{myFilters=[]}). +%% Admin +-define(set_PullInterval(S,V), S#state{pullInterval=V}). +-define(set_PullTimer(S,T), S#state{pullTimer=T}). +%% Publish +-define(add_Publish(S,E), ets:insert(S#state.etsR, {E, publish})). +-define(del_Publish(S,E), ets:delete(S#state.etsR, E)). +-define(set_PublishType(S,T), S#state{publishType=T}). +%% ID +-define(set_IdCounter(S,V), S#state{idCounter=V}). +-define(new_Id(S), 'CosNotification_Common':create_id(S#state.idCounter)). +%% Event +-define(add_Event(S,E), cosNotification_eventDB:add_event(S#state.eventDB, E)). +-define(update_EventDB(S,Q), S#state{eventDB= + cosNotification_eventDB:update(S#state.eventDB, Q)}). + +-define(set_EventCounter(S,V), S#state{eventCounter=V}). +-define(add_to_EventCounter(S,V),S#state{eventCounter=S#state.eventCounter+V}). +-define(reset_EventCounter(S), S#state{eventCounter=0}). +-define(increase_EventCounter(S),S#state{eventCounter=(S#state.eventCounter+1)}). +-define(decrease_EventCounter(S),S#state{eventCounter=S#state.eventCounter-1}). +-define(add_ToEventCounter(S,A), S#state{eventCounter=(S#state.eventCounter+A)}). +-define(sub_FromEventCounter(S,_A), S#state{eventCounter=(S#state.eventCounter-_A)}). +-define(set_EventCounterTo(S,V), S#state{eventCounter=V}). + +%%-------------- MISC ---------------------------------------- +-define(is_ANY(S), S#state.myType == 'PULL_ANY'). +-define(is_STRUCTURED(S), S#state.myType == 'PULL_STRUCTURED'). +-define(is_SEQUENCE(S), S#state.myType == 'PULL_SEQUENCE'). +-define(is_ANDOP(S), S#state.myOperator == 'AND_OP'). +-define(is_UnConnected(S), S#state.client == undefined). +-define(is_Connected(S), S#state.client =/= undefined). +-define(is_Suspended(S), S#state.suspended == true). +-define(is_NotSuspended(S), S#state.suspended == false). +-define(is_PersistentConnection(S), + ?not_GetConnectionReliability((S#state.qosLocal)) == ?not_Persistent). +-define(is_PersistentEvent(S), + ?not_GetEventReliability((S#state.qosLocal)) == ?not_Persistent). + +%%-----------------------------------------------------------% +%% function : handle_info, code_change +%% Arguments: +%% Returns : +%% Effect : Functions demanded by the gen_server module. +%%------------------------------------------------------------ + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +handle_info(Info, State) -> + ?DBG("INFO: ~p~n", [Info]), + case Info of + {'EXIT', Pid, Reason} when ?get_MyAdminPid(State) == Pid-> + ?DBG("PARENT ADMIN: ~p TERMINATED.~n",[Reason]), + {stop, Reason, State}; + {'EXIT', _Pid, _Reason} -> + ?DBG("PROXYPUSHSUPPLIER: ~p TERMINATED.~n",[_Reason]), + {noreply, State}; + pull -> + try_pull_events(State); + _ -> + {noreply, State} + end. + +%%----------------------------------------------------------% +%% function : init, terminate +%% Arguments: +%%----------------------------------------------------------- + +init([MyType, MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel, Options, Operator]) -> + process_flag(trap_exit, true), + Secs = timer:seconds('CosNotification_Common':get_option(pullInterval, + Options, + ?not_DEFAULT_SETTINGS)), + GCTime = 'CosNotification_Common':get_option(gcTime, Options, + ?not_DEFAULT_SETTINGS), + GCLimit = 'CosNotification_Common':get_option(gcLimit, Options, + ?not_DEFAULT_SETTINGS), + TimeRef = 'CosNotification_Common':get_option(timeService, Options, + ?not_DEFAULT_SETTINGS), + timer:start(), + {ok, ?get_InitState(MyType, MyAdmin, MyAdminPid, InitQoS, + LQS, MyChannel, Secs, Operator, GCTime, + GCLimit, TimeRef)}. + +terminate(_Reason, State) when ?is_UnConnected(State) -> + %% We are currently not connected to a client. Hence, no need for sending + %% a disconnect request. + stop_timer(State), + ok; +terminate(_Reason, State) when ?is_ANY(State) -> + stop_timer(State), + 'CosNotification_Common':disconnect('CosEventComm_PullSupplier', + disconnect_pull_supplier, + ?get_Client(State)); +terminate(_Reason, State) when ?is_SEQUENCE(State) -> + stop_timer(State), + 'CosNotification_Common':disconnect('CosNotifyComm_SequencePullSupplier', + disconnect_sequence_pull_supplier, + ?get_Client(State)); +terminate(_Reason, State) when ?is_STRUCTURED(State) -> + stop_timer(State), + 'CosNotification_Common':disconnect('CosNotifyComm_StructuredPullSupplier', + disconnect_structured_pull_supplier, + ?get_Client(State)). + +%%----------------------------------------------------------- +%%----- CosNotifyChannelAdmin_ProxyConsumer attributes ------ +%%----------------------------------------------------------- +%%----------------------------------------------------------% +%% Attribute: '_get_MyType' +%% Type : readonly +%% Returns : +%%----------------------------------------------------------- +'_get_MyType'(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_MyType(State), State}. + +%%----------------------------------------------------------% +%% Attribute: '_get_MyAdmin' +%% Type : readonly +%% Returns : +%%----------------------------------------------------------- +'_get_MyAdmin'(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_MyAdmin(State), State}. + +%%----------------------------------------------------------- +%%------- Exported external functions ----------------------- +%%----------------------------------------------------------- +%%----- CosEventChannelAdmin::ProxyPullConsumer ------------- +%%----------------------------------------------------------% +%% function : connect_pull_supplier +%% Arguments: Client - CosEventComm::PullSupplier +%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} | +%% {'EXCEPTION', #'TypeError'{}} | +%% {'EXCEPTION', #'BAD_OPERATION'{}} +%% Both exceptions from CosEventChannelAdmin!!! +%%----------------------------------------------------------- +connect_pull_supplier(OE_THIS, OE_FROM, State, Client) -> + connect_any_pull_supplier(OE_THIS, OE_FROM, State, Client). + +%%----- CosNotifyChannelAdmin::ProxyPullConsumer ------------ +%%----------------------------------------------------------% +%% function : connect_any_pull_supplier +%% Arguments: Client - CosEventComm::PullSupplier +%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} | +%% {'EXCEPTION', #'TypeError'{}} | +%% {'EXCEPTION', #'BAD_OPERATION'{}} +%% Both exceptions from CosEventChannelAdmin!!! +%%----------------------------------------------------------- +connect_any_pull_supplier(OE_THIS, _OE_FROM, State, Client) when ?is_ANY(State) -> + 'CosNotification_Common':type_check(Client, 'CosEventComm_PullSupplier'), + if + ?is_Connected(State) -> + corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}); + true -> + NewState = start_timer(State), + {reply, ok, NewState#state{client = Client, this = OE_THIS}} + end; +connect_any_pull_supplier(_, _, _,_) -> + corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). + +%%----- CosNotifyChannelAdmin::SequenceProxyPullConsumer ---- +%%----------------------------------------------------------% +%% function : connect_sequence_pull_supplier +%% Arguments: Client - CosNotifyComm::SequencePullSupplier +%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} | +%% {'EXCEPTION', #'TypeError'{}} | +%% {'EXCEPTION', #'BAD_OPERATION'{}} +%%----------------------------------------------------------- +connect_sequence_pull_supplier(OE_THIS, _OE_FROM, State, Client) when ?is_SEQUENCE(State) -> + 'CosNotification_Common':type_check(Client, 'CosNotifyComm_SequencePullSupplier'), + if + ?is_Connected(State) -> + corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}); + true -> + NewState = start_timer(State), + {reply, ok, NewState#state{client = Client, this = OE_THIS}} + end; +connect_sequence_pull_supplier(_, _, _, _) -> + corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). + +%%----- CosNotifyChannelAdmin::StructuredProxyPullConsumer -- +%%----------------------------------------------------------% +%% function : connect_structured_pull_supplier +%% Arguments: Client - CosNotifyComm::StructuredPullSupplier +%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} | +%% {'EXCEPTION', #'TypeError'{}} | +%% {'EXCEPTION', #'BAD_OPERATION'{}} +%%----------------------------------------------------------- +connect_structured_pull_supplier(OE_THIS, _OE_FROM, State, Client) when ?is_STRUCTURED(State) -> + 'CosNotification_Common':type_check(Client, 'CosNotifyComm_StructuredPullSupplier'), + if + ?is_Connected(State) -> + corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{}); + true -> + NewState = start_timer(State), + {reply, ok, NewState#state{client = Client, this = OE_THIS}} + end; +connect_structured_pull_supplier(_, _, _, _) -> + corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}). + +%%----- CosNotifyChannelAdmin::*ProxyPullConsumer ----------- +%%----------------------------------------------------------% +%% function : suspend_connection +%% Arguments: +%% Returns : ok | {'EXCEPTION', #'ConnectionAlreadyInactive'{}} | +%% {'EXCEPTION', #'NotConneced'{}} +%%----------------------------------------------------------- +suspend_connection(_OE_THIS, _OE_FROM, State) when ?is_Connected(State) -> + if + ?is_Suspended(State) -> + corba:raise(#'CosNotifyChannelAdmin_ConnectionAlreadyInactive'{}); + true -> + stop_timer(State), + {reply, ok, ?set_Suspended(State)} + end; +suspend_connection(_, _, _) -> + corba:raise(#'CosNotifyChannelAdmin_NotConnected'{}). + +%%----------------------------------------------------------% +%% function : resume_connection +%% Arguments: +%% Returns : ok | {'EXCEPTION', #'ConnectionAlreadyActive'{}} | +%% {'EXCEPTION', #'NotConneced'{}} +%%----------------------------------------------------------- +resume_connection(_OE_THIS, _OE_FROM, State) when ?is_Connected(State) -> + if + ?is_NotSuspended(State) -> + corba:raise(#'CosNotifyChannelAdmin_ConnectionAlreadyActive'{}); + true -> + NewState = start_timer(State), + {reply, ok, ?set_NotSuspended(NewState)} + end; +resume_connection(_, _, _) -> + corba:raise(#'CosNotifyChannelAdmin_NotConnected'{}). + +%%----- Inherit from CosNotifyChannelAdmin::ProxyConsumer --- +%%----------------------------------------------------------% +%% function : obtain_subscription_types +%% Arguments: Mode - enum 'ObtainInfoMode' (CosNotifyChannelAdmin) +%% Returns : CosNotification::EventTypeSeq +%%----------------------------------------------------------- +obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_OFF') -> + {reply, ?get_AllPublish(State), ?set_PublishType(State, false)}; +obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_ON') -> + {reply, ?get_AllPublish(State), ?set_PublishType(State, true)}; +obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_OFF') -> + {reply, [], ?set_PublishType(State, false)}; +obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_ON') -> + {reply, [], ?set_PublishType(State, true)}; +obtain_subscription_types(_,_,_,What) -> + orber:dbg("[~p] PullerConsumer:obtain_subscription_types(~p);~n" + "Incorrect enumerant", [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}). + +%%----------------------------------------------------------% +%% function : validate_event_qos +%% Arguments: RequiredQoS - CosNotification::QoSProperties +%% Returns : ok | {'EXCEPTION', #'UnsupportedQoS'{}} +%% AvilableQoS - CosNotification::NamedPropertyRangeSeq (out) +%%----------------------------------------------------------- +validate_event_qos(_OE_THIS, _OE_FROM, State, RequiredQoS) -> + AvilableQoS = 'CosNotification_Common':validate_event_qos(RequiredQoS, + ?get_LocalQoS(State)), + {reply, {ok, AvilableQoS}, State}. + +%%----- Inherit from CosNotification::QoSAdmin -------------- +%%----------------------------------------------------------% +%% function : get_qos +%% Arguments: +%% Returns : +%%----------------------------------------------------------- +get_qos(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_GlobalQoS(State), State}. + +%%----------------------------------------------------------% +%% function : set_qos +%% Arguments: QoS - CosNotification::QoSProperties, i.e., +%% [#'Property'{name, value}, ...] where name eq. string() +%% and value eq. any(). +%% Returns : ok | {'EXCEPTION', CosNotification::UnsupportedQoS} +%%----------------------------------------------------------- +set_qos(_OE_THIS, _OE_FROM, State, QoS) -> + {NewQoS, LQS} = 'CosNotification_Common':set_qos(QoS, ?get_BothQoS(State), + proxy, ?get_MyAdmin(State), + false), + NewState = ?update_EventDB(State, LQS), + {reply, ok, ?set_BothQoS(NewState, NewQoS, LQS)}. + +%%----------------------------------------------------------% +%% function : validate_qos +%% Arguments: Required_qos - CosNotification::QoSProperties +%% [#'Property'{name, value}, ...] where name eq. string() +%% and value eq. any(). +%% Returns : {'EXCEPTION', CosNotification::UnsupportedQoS} +%% {ok, CosNotification::NamedPropertyRangeSeq} +%%----------------------------------------------------------- +validate_qos(_OE_THIS, _OE_FROM, State, Required_qos) -> + QoS = 'CosNotification_Common':validate_qos(Required_qos, ?get_BothQoS(State), + proxy, ?get_MyAdmin(State), + false), + {reply, {ok, QoS}, State}. + +%%----- Inherit from CosNotifyComm::NotifyPublish ----------- +%%----------------------------------------------------------% +%% function : offer_change +%% Arguments: Added - #'CosNotification_EventType'{} +%% Removed - #'CosNotification_EventType'{} +%% Returns : ok | +%% {'EXCEPTION', #'CosNotifyComm_InvalidEventType'{}} +%%----------------------------------------------------------- +offer_change(_OE_THIS, _OE_FROM, State, Added, Removed) -> + cosNotification_Filter:validate_types(Added), + cosNotification_Filter:validate_types(Removed), + %% On this "side" we don't really care about which + %% type of events the client will supply. + %% Perhaps, later on, if we want to check this against Filters + %% associated with this object we may change this approach, i.e., if + %% the filter will not allow passing certain event types. But the + %% user should see to that that situation never occurs. It would add + %% extra overhead. Also see PusherSupplier- and PullerSuppler- + %% 'subscription_change'. + update_publish(add, State, Added), + update_publish(remove, State, Removed), + case ?get_PublishType(State) of + true -> + %% Perhaps we should handle exception here?! + %% Probably not. Better to stay "on-line". + catch 'CosNotifyComm_NotifySubscribe': + subscription_change(?get_Client(State), Added, Removed), + ok; + _-> + ok + end, + {reply, ok, State}. + +update_publish(_, _, [])-> + ok; +update_publish(add, State, [H|T]) -> + ?add_Publish(State, H), + update_publish(add, State, T); +update_publish(remove, State, [H|T]) -> + ?del_Publish(State, H), + update_publish(remove, State, T). + +%%----- Inherit from CosNotifyFilter::FilterAdmin ----------- +%%----------------------------------------------------------% +%% function : add_filter +%% Arguments: Filter - CosNotifyFilter::Filter +%% Returns : FilterID - long +%%----------------------------------------------------------- +add_filter(_OE_THIS, _OE_FROM, State, Filter) -> + 'CosNotification_Common':type_check(Filter, 'CosNotifyFilter_Filter'), + FilterID = ?new_Id(State), + NewState = ?set_IdCounter(State, FilterID), + {reply, FilterID, ?add_Filter(NewState, FilterID, Filter)}. + +%%----------------------------------------------------------% +%% function : remove_filter +%% Arguments: FilterID - long +%% Returns : ok +%%----------------------------------------------------------- +remove_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) -> + {reply, ok, ?del_Filter(State, FilterID)}; +remove_filter(_,_,_,What) -> + orber:dbg("[~p] PullerConsumer:remove_filter(~p); Not an integer", + [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}). + +%%----------------------------------------------------------% +%% function : get_filter +%% Arguments: FilterID - long +%% Returns : Filter - CosNotifyFilter::Filter | +%% {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}} +%%----------------------------------------------------------- +get_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) -> + {reply, ?get_Filter(State, FilterID), State}; +get_filter(_,_,_,What) -> + orber:dbg("[~p] PullerConsumer:get_filter(~p); Not an integer", + [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}). + +%%----------------------------------------------------------% +%% function : get_all_filters +%% Arguments: - +%% Returns : Filter - CosNotifyFilter::FilterIDSeq +%%----------------------------------------------------------- +get_all_filters(_OE_THIS, _OE_FROM, State) -> + {reply, ?get_AllFilterID(State), State}. + +%%----------------------------------------------------------% +%% function : remove_all_filters +%% Arguments: - +%% Returns : ok +%%----------------------------------------------------------- +remove_all_filters(_OE_THIS, _OE_FROM, State) -> + {reply, ok, ?del_AllFilter(State)}. + +%%----- Inherit from CosEventComm::PullConsumer ------------- +%%----------------------------------------------------------% +%% function : disconnect_pull_consumer +%% Arguments: - +%% Returns : ok +%%----------------------------------------------------------- +disconnect_pull_consumer(_OE_THIS, _OE_FROM, State) -> + {stop, normal, ok, ?set_Unconnected(State)}. + +%%----- Inherit from CosNotifyComm::SequencePullConsumer ---- +%%----------------------------------------------------------% +%% function : disconnect_sequence_pull_consumer +%% Arguments: - +%% Returns : ok +%%----------------------------------------------------------- +disconnect_sequence_pull_consumer(_OE_THIS, _OE_FROM, State) -> + {stop, normal, ok, ?set_Unconnected(State)}. + +%%----- Inherit from CosNotifyComm::StructuredPullConsumer ---- +%%----------------------------------------------------------% +%% function : disconnect_structured_pull_consumer +%% Arguments: - +%% Returns : ok +%%----------------------------------------------------------- +disconnect_structured_pull_consumer(_OE_THIS, _OE_FROM, State) -> + {stop, normal, ok, ?set_Unconnected(State)}. + +%%--------------- LOCAL FUNCTIONS ---------------------------- +find_obj({value, {_, Obj}}) -> Obj; +find_obj(_) -> {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}. + +find_ids(List) -> find_ids(List, []). +find_ids([], Acc) -> Acc; +find_ids([{I,_}|T], Acc) -> find_ids(T, [I|Acc]); +find_ids(What, _) -> + orber:dbg("[~p] PullerConsumer:find_ids();~n" + "Id corrupt: ~p", [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}). + +%% Delete a single object. +%% The list don not differ, i.e., no filter removed, raise exception. +delete_obj(List,List) -> corba:raise(#'CosNotifyFilter_FilterNotFound'{}); +delete_obj(List,_) -> List. + +%% Start timer which send a message each time we should pull for new events. +start_timer(State) -> + case catch timer:send_interval(?get_PullInterval(State), pull) of + {ok,PullTRef} -> + ?DBG("PULL CONSUMER STARTED PULL TIMER ~p~n", + [?get_PullInterval(State)]), + ?set_PullTimer(State, PullTRef); + What -> + orber:dbg("[~p] PullerConsumer:start_timer();~n" + "Unable to invoke timer:send_interval/2: ~p", + [?LINE, What], ?DEBUG_LEVEL), + corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}) + end. +stop_timer(State) -> + ?DBG("PULL CONSUMER STOPPED TIMER~n",[]), + timer:cancel(?get_PullTimer(State)). + +%% Try pull event(s); which method is determined by which type this proxy is. +try_pull_events(State) when ?is_ANY(State) -> + case catch 'CosEventComm_PullSupplier':try_pull(?get_Client(State)) of + {_,false} -> + {noreply, State}; + {Event, true} -> + case ?not_isConvertedStructured(Event) of + true -> + forward(seq, State, + cosNotification_eventDB:filter_events([any:get_value(Event)], + ?get_AllFilter(State))); + _ -> + forward(any, State, + cosNotification_eventDB:filter_events([Event], + ?get_AllFilter(State))) + end; + _-> + {noreply, State} + end; +try_pull_events(State) when ?is_SEQUENCE(State) -> + case catch 'CosNotifyComm_SequencePullSupplier': + try_pull_structured_events(?get_Client(State), ?get_BatchLimit(State)) of + {_,false} -> + {noreply, State}; + {EventSeq, true} -> + %% We cannot convert parts of the sequence to any, event though they + %% are converted from any to structured. Would be 'impossible' to send. + forward(seq, State, + cosNotification_eventDB:filter_events(EventSeq, + ?get_AllFilter(State))); + _-> + {noreply, State} + end; +try_pull_events(State) when ?is_STRUCTURED(State) -> + case catch 'CosNotifyComm_StructuredPullSupplier': + try_pull_structured_event(?get_Client(State)) of + {_,false} -> + {noreply, State}; + {Event, true} when ?not_isConvertedAny(Event) -> + forward(any, State, + cosNotification_eventDB:filter_events([Event#'CosNotification_StructuredEvent'.remainder_of_body], + ?get_AllFilter(State))); + {Event, true} -> + forward(seq, State, + cosNotification_eventDB:filter_events([Event], + ?get_AllFilter(State))); + _-> + {noreply, State} + end. + + + +%% Forward events +forward(_, State, {[], _}) when ?is_ANDOP(State) -> + %% Did not pass filtering. Since AND no need to pass on. + {noreply, State}; +forward(Type, State, {[], Failed}) -> + %% Did not pass filtering, but since OR it may pass Admin filters, hence, pass + %% on to Admin + forward(Type, State, Failed, ?get_MyAdmin(State), 'MATCH'); +forward(Type, State, {Passed, _}) when ?is_ANDOP(State) -> + %% Did pass filtering, but since AND we must pass it to Admin to check against + %% its Filters. Just ignore the ones that failed. + forward(Type, State, Passed, ?get_MyAdmin(State), 'MATCH'); +forward(Type, State, {Passed, []}) -> + %% Did pass filtering, and since OR we can pass it to the Channel directly. + forward(Type, State, Passed, ?get_MyChannel(State), 'MATCHED'); +forward(Type, State, {Passed, Failed}) -> + %% Some passed filtering, and since OR we can pass the ones that passed directly + %% to the channel and the other ones via the admin. + forward(Type, State, Passed, ?get_MyChannel(State), 'MATCHED'), + forward(Type, State, Failed, ?get_MyAdmin(State), 'MATCH'). + +forward(any, State, [Event], SendTo, Status) -> + case catch oe_CosNotificationComm_Event:callAny(SendTo, Event, Status) of + ok -> + ?DBG("PROXY FORWARD ANY: ~p~n",[Event]), + {noreply, State}; + {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') orelse + is_record(E, 'NO_PERMISSION') orelse + is_record(E, 'CosEventComm_Disconnected') -> + orber:dbg("[~p] PullerConsumer:forward();~n" + "Admin/Channel no longer exists; terminating and dropping: ~p", + [?LINE, Event], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, ?get_Client(State)}, + {reason, {'EXCEPTION', E}}]), + {stop, normal, State}; + R when ?is_PersistentConnection(State) -> + orber:dbg("[~p] PullerConsumer:forward();~n" + "Admin/Channel respond incorrect: ~p~n" + "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL), + {noreply, State}; + R -> + orber:dbg("[~p] PullerConsumer:forward();~n" + "Admin/Channel respond incorrect: ~p~n" + "Terminating and dropping: ~p", + [?LINE, R, Event], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, ?get_Client(State)}, + {reason, R}]), + {stop, normal, State} + end; +forward(seq, State, Event, SendTo, Status) -> + case catch oe_CosNotificationComm_Event:callSeq(SendTo, Event, Status) of + ok -> + ?DBG("PROXY FORWARD SEQUENCE: ~p~n",[Event]), + {noreply, State}; + {'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') orelse + is_record(E, 'NO_PERMISSION') orelse + is_record(E, 'CosEventComm_Disconnected') -> + orber:dbg("[~p] PullerConsumer:forward();~n" + "Admin/Channel no longer exists; terminating and dropping: ~p", + [?LINE, Event], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, ?get_Client(State)}, + {reason, {'EXCEPTION', E}}]), + {stop, normal, State}; + R when ?is_PersistentConnection(State) -> + orber:dbg("[~p] PullerConsumer:forward();~n" + "Admin/Channel respond incorrect: ~p~n" + "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL), + {noreply, State}; + R -> + orber:dbg("[~p] PullerConsumer:forward();~n" + "Admin/Channel respond incorrect: ~p~n" + "Terminating and dropping: ~p", + [?LINE, R, Event], ?DEBUG_LEVEL), + 'CosNotification_Common':notify([{proxy, State#state.this}, + {client, ?get_Client(State)}, + {reason, R}]), + {stop, normal, State} + end. + +%%--------------- MISC FUNCTIONS, E.G. DEBUGGING ------------- +%%--------------- END OF MODULE ------------------------------ -- cgit v1.2.3