%%--------------------------------------------------------------------
%%
%% %CopyrightBegin%
%% 
%% Copyright Ericsson AB 1999-2009. All Rights Reserved.
%% 
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%% 
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%% 
%% %CopyrightEnd%
%%
%%
%%----------------------------------------------------------------------
%% File    : PusherConsumer_impl.erl
%% Purpose : 
%%----------------------------------------------------------------------

-module('PusherConsumer_impl').

%%--------------- INCLUDES -----------------------------------
-include_lib("orber/include/corba.hrl").
-include_lib("orber/include/ifr_types.hrl").
%% cosEvent files.
-include_lib("cosEvent/include/CosEventChannelAdmin.hrl").
-include_lib("cosEvent/include/CosEventComm.hrl").
%% Application files
-include("CosNotification.hrl").
-include("CosNotifyChannelAdmin.hrl").
-include("CosNotifyComm.hrl").
-include("CosNotifyFilter.hrl").

-include("CosNotification_Definitions.hrl").

%%--------------- EXPORTS ------------------------------------
%%--------------- External -----------------------------------
%%----- CosNotifyChannelAdmin::ProxyPushConsumer -------------
-export([connect_any_push_supplier/4]).

%%----- CosNotifyChannelAdmin::SequenceProxyPushConsumer -----
-export([connect_sequence_push_supplier/4]).

%%----- CosNotifyChannelAdmin::StructuredProxyPushConsumer ---
-export([connect_structured_push_supplier/4]).

%%----- Inherit from CosNotifyChannelAdmin::ProxyConsumer ----
-export([obtain_subscription_types/4,
	 validate_event_qos/4]).

%%----- Inherit from CosNotification::QoSAdmin ---------------
-export([get_qos/3,
	 set_qos/4,
	 validate_qos/4]).

%%----- Inherit from CosNotifyComm::NotifyPublish ------------
-export([offer_change/5]).

%%----- Inherit from CosNotifyFilter::FilterAdmin ------------
-export([add_filter/4, 
	 remove_filter/4, 
	 get_filter/4,
	 get_all_filters/3, 
	 remove_all_filters/3]).

%%----- Inherit from CosEventComm::PushConsumer -------------
-export([push/4,
	 disconnect_push_consumer/3]).

%%----- Inherit from CosNotifyComm::SequencePushConsumer ----
-export([push_structured_events/4,
	 disconnect_sequence_push_consumer/3]).

%%----- Inherit from CosNotifyComm::StructuredPushConsumer --
-export([push_structured_event/4,
	 disconnect_structured_push_consumer/3]).

%%----- Inherit from CosEventChannelAdmin::ProxyPushConsumer
-export([connect_push_supplier/4]).

%% Attributes (external) CosNotifyChannelAdmin::ProxySupplier
-export(['_get_MyType'/3, 
	 '_get_MyAdmin'/3]).

%%--------------- gen_server specific exports ----------------
-export([handle_info/2, code_change/3]).
-export([init/1, terminate/2]).

%%--------------- LOCAL DEFINITIONS --------------------------
%% Data structures
-record(state, {myType,
	        myAdmin,
	        myAdminPid,
		myChannel,
		myFilters = [],
		myOperator,
		idCounter = 0,
		client,
		qosGlobal,
		qosLocal,
		publishType = false,
		etsR,
		eventDB}).

%% Data structures constructors
-define(get_InitState(_MyT, _MyA, _MyAP, _QS, _LQS, _Ch, _EDB, _MyOP), 
	#state{myType    = _MyT,
	       myAdmin   = _MyA,
	       myAdminPid= _MyAP,
	       myChannel = _Ch,
	       myOperator= _MyOP,
	       qosGlobal = _QS,
	       qosLocal  = _LQS,
	       etsR      = ets:new(oe_ets, [set, protected]),
	       eventDB   = _EDB}).

%%-------------- Data structures selectors -----------------
%% Attributes
-define(get_MyType(S),           S#state.myType).
-define(get_MyAdmin(S),          S#state.myAdmin).
-define(get_MyAdminPid(S),       S#state.myAdminPid).
-define(get_MyChannel(S),        S#state.myChannel).
-define(get_MyOperator(S),       S#state.myOperator).
%% Client Object
-define(get_Client(S),           S#state.client).
%% QoS
-define(get_GlobalQoS(S),        S#state.qosGlobal).
-define(get_LocalQoS(S),         S#state.qosLocal).
-define(get_BothQoS(S),          {S#state.qosGlobal, S#state.qosLocal}).
%% Filters
-define(get_Filter(S, I),        find_obj(lists:keysearch(I, 1, S#state.myFilters))).
-define(get_AllFilter(S),        S#state.myFilters).
-define(get_AllFilterID(S),      find_ids(S#state.myFilters)).
%% Publish
-define(get_AllPublish(S),       lists:flatten(ets:match(S#state.etsR,
							 {'$1',publish}))).
-define(get_PublishType(S),      S#state.publishType).
%% ID
-define(get_IdCounter(S),        S#state.idCounter).
%% Event
-define(get_Event(S),            cosNotification_eventDB:get_event(S#state.eventDB)).
-define(get_Events(S,M),         cosNotification_eventDB:get_events(S#state.eventDB, M)).

-define(get_EventCounter(S),     S#state.eventCounter).
%% Admin
-define(get_BatchLimit(S),       ?not_GetMaximumBatchSize((S#state.qosLocal))).

%%-------------- Data structures modifiers -----------------
%% Client Object
-define(set_Client(S,D),         S#state{client=D}).
-define(del_Client(S),           S#state{client=undefined}).
-define(set_Unconnected(S),      S#state{client=undefined}).
%% QoS
-define(set_LocalQoS(S,D),       S#state{qosLocal=D}).
-define(set_GlobalQoS(S,D),      S#state{qosGlobal=D}).
-define(set_BothQoS(S,GD,LD),    S#state{qosGlobal=GD, qosLocal=LD}).
%% Filters
-define(add_Filter(S,I,O),       S#state{myFilters=[{I,O}|S#state.myFilters]}).
-define(del_Filter(S,I),         S#state{myFilters=
					 delete_obj(lists:keydelete(I, 1, S#state.myFilters),
						    S#state.myFilters)}).
-define(del_AllFilter(S),        S#state{myFilters=[]}).
%% Publish
-define(add_Publish(S,E),        ets:insert(S#state.etsR, {E, publish})).
-define(del_Publish(S,E),        ets:delete(S#state.etsR, E)).
-define(set_PublishType(S,T),    S#state{publishType=T}).
%% ID
-define(set_IdCounter(S,V),      S#state{idCounter=V}).
-define(new_Id(S),               'CosNotification_Common':create_id(S#state.idCounter)).
%% Event
-define(add_Event(S,E),          cosNotification_eventDB:add_event(S#state.eventDB, E)).
-define(update_EventDB(S,Q),     S#state{eventDB=
					 cosNotification_eventDB:update(S#state.eventDB, Q)}).

-define(set_EventCounter(S,V),   S#state{eventCounter=V}).
-define(add_to_EventCounter(S,V),S#state{eventCounter=S#state.eventCounter+V}).
-define(reset_EventCounter(S),   S#state{eventCounter=0}).
-define(increase_EventCounter(S),S#state{eventCounter=(S#state.eventCounter+1)}).
-define(decrease_EventCounter(S),S#state{eventCounter=S#state.eventCounter-1}).
-define(add_ToEventCounter(S,A), S#state{eventCounter=(S#state.eventCounter+A)}).
-define(sub_FromEventCounter(S,_A), S#state{eventCounter=(S#state.eventCounter-_A)}).
-define(set_EventCounterTo(S,V), S#state{eventCounter=V}).

%% MISC
-define(is_ANY(S),               S#state.myType == 'PUSH_ANY').
-define(is_STRUCTURED(S),        S#state.myType == 'PUSH_STRUCTURED').
-define(is_SEQUENCE(S),          S#state.myType == 'PUSH_SEQUENCE').
-define(is_ANDOP(S),             S#state.myOperator == 'AND_OP').
-define(is_UnConnected(S),       S#state.client == undefined).
-define(is_Connected(S),         S#state.client =/= undefined).
-define(is_PersistentConnection(S),
	?not_GetConnectionReliability((S#state.qosLocal)) == ?not_Persistent).
-define(is_PersistentEvent(S),
	?not_GetEventReliability((S#state.qosLocal)) == ?not_Persistent).
-define(is_BatchLimitReached(S), S#state.eventCounter >= 
                                 ?not_GetMaximumBatchSize((S#state.qosLocal))).


%%----------------------------------------------------------%
%% function : handle_info, code_change
%% Arguments: 
%% Returns  : 
%% Effect   : Functions demanded by the gen_server module. 
%%-----------------------------------------------------------

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

handle_info(Info, State) ->
    ?DBG("INFO: ~p~n", [Info]),
    case Info of
        {'EXIT', Pid, Reason} when ?get_MyAdminPid(State)==Pid->
            ?DBG("PARENT ADMIN: ~p  TERMINATED.~n",[Reason]),
	    {stop, Reason, State};
        {'EXIT', _Pid, _Reason} ->
            ?DBG("PROXYPUSHCONSUMER: ~p  TERMINATED.~n",[_Reason]),
            {noreply, State};
        _ ->
            {noreply, State}
    end.

%%----------------------------------------------------------%
%% function : init, terminate
%% Arguments: 
%%-----------------------------------------------------------

init(['PUSH_SEQUENCE', MyAdmin, MyAdminPid, InitQoS, LQS, 
      MyChannel, Options, Operator]) ->
    process_flag(trap_exit, true),
    %% Only if MyType is 'PUSH_SEQUENCE' we need an ets to store events in.
    %% Otherwise we'll forward them at once. Why? We don't know when the next event
    %% is due.
    GCTime = 'CosNotification_Common':get_option(gcTime, Options, 
						 ?not_DEFAULT_SETTINGS),
    GCLimit = 'CosNotification_Common':get_option(gcLimit, Options, 
						  ?not_DEFAULT_SETTINGS),
    TimeRef = 'CosNotification_Common':get_option(timeService, Options, 
						  ?not_DEFAULT_SETTINGS),
    {ok, ?get_InitState('PUSH_SEQUENCE', MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel,
			cosNotification_eventDB:create_db(LQS, GCTime, GCLimit, TimeRef), 
			Operator)};
init([MyType, MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel, _Options, Operator]) ->
    process_flag(trap_exit, true),
    {ok, ?get_InitState(MyType, MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel, 
			undefined, Operator)}.

terminate(_Reason, State) when ?is_UnConnected(State) ->
    ok;
terminate(_Reason, State) ->
    Client = ?get_Client(State),
    case catch corba_object:is_nil(Client) of
	false when ?is_ANY(State) ->
	    'CosNotification_Common':disconnect('CosEventComm_PushSupplier', 
						disconnect_push_supplier,
						Client);
	false when ?is_SEQUENCE(State) ->
	    'CosNotification_Common':disconnect('CosNotifyComm_SequencePushSupplier',
						disconnect_sequence_push_supplier,
						Client);
	false when ?is_STRUCTURED(State) ->
	    'CosNotification_Common':disconnect('CosNotifyComm_StructuredPushSupplier',
						disconnect_structured_push_supplier,
						Client);
	_ ->
	    ok
    end.

%%-----------------------------------------------------------
%%----- CosNotifyChannelAdmin_ProxyConsumer attributes ------
%%-----------------------------------------------------------
%%----------------------------------------------------------%
%% Attribute: '_get_MyType'
%% Type     : readonly
%% Returns  : 
%%-----------------------------------------------------------
'_get_MyType'(_OE_THIS, _OE_FROM, State) ->
    {reply, ?get_MyType(State), State}.

%%----------------------------------------------------------%
%% Attribute: '_get_MyAdmin'
%% Type     : readonly
%% Returns  : 
%%-----------------------------------------------------------
'_get_MyAdmin'(_OE_THIS, _OE_FROM, State) ->
    {reply, ?get_MyAdmin(State), State}.

%%-----------------------------------------------------------
%%------- Exported external functions -----------------------
%%-----------------------------------------------------------
%%----- CosEventChannelAdmin::ProxyPushConsumer -------------
%%----------------------------------------------------------%
%% function : connect_push_supplier
%% Arguments: Client - CosEventComm::PushSupplier
%% Returns  :  ok | {'EXCEPTION', #'AlreadyConnected'{}} |
%%            {'EXCEPTION', #'TypeError'{}}
%%            Both exceptions from CosEventChannelAdmin!!!
%%-----------------------------------------------------------
connect_push_supplier(OE_THIS, OE_FROM, State, Client) ->
    connect_any_push_supplier(OE_THIS, OE_FROM, State, Client).

%%----- CosNotifyChannelAdmin::ProxyPushConsumer ------------
%%----------------------------------------------------------%
%% function : connect_any_push_supplier
%% Arguments: Client - CosEventComm::PushSupplier
%% Returns  :  ok | {'EXCEPTION', #'AlreadyConnected'{}} |
%%            {'EXCEPTION', #'TypeError'{}}
%%            Both exceptions from CosEventChannelAdmin!!!
%%-----------------------------------------------------------
connect_any_push_supplier(_OE_THIS, _OE_FROM, State, Client) when ?is_ANY(State) ->
    ?not_TypeCheck(Client, 'CosEventComm_PushSupplier'),
    if
	?is_Connected(State) ->
	    corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{});
	true ->
	    {reply, ok, ?set_Client(State, Client)}
    end;
connect_any_push_supplier(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----- CosNotifyChannelAdmin::SequenceProxyPushConsumer ----
%%----------------------------------------------------------%
%% function : connect_sequence_push_supplier
%% Arguments: Client - CosNotifyComm::SequencePushSupplier
%% Returns  :  ok | {'EXCEPTION', #'AlreadyConnected'{}} |
%%            {'EXCEPTION', #'TypeError'{}}
%%-----------------------------------------------------------
connect_sequence_push_supplier(_OE_THIS, _OE_FROM, State, Client) when ?is_SEQUENCE(State) ->
    ?not_TypeCheck(Client, 'CosNotifyComm_SequencePushSupplier'),
     if
	?is_Connected(State) ->
	    corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{});
	true ->
	    {reply, ok, ?set_Client(State, Client)}
    end;
connect_sequence_push_supplier(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----- CosNotifyChannelAdmin::StructuredProxyPushConsumer --
%%----------------------------------------------------------%
%% function : connect_structured_push_supplier
%% Arguments: Client - CosNotifyComm::StructuredPushSupplier
%% Returns  :  ok | {'EXCEPTION', #'AlreadyConnected'{}} |
%%            {'EXCEPTION', #'TypeError'{}}
%%-----------------------------------------------------------
connect_structured_push_supplier(_OE_THIS, _OE_FROM, State, Client) when ?is_STRUCTURED(State) ->
    ?not_TypeCheck(Client, 'CosNotifyComm_StructuredPushSupplier'),
     if
	?is_Connected(State) ->
	    corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{});
	true ->
	    {reply, ok, ?set_Client(State, Client)}
    end;
connect_structured_push_supplier(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----- Inherit from CosNotifyChannelAdmin::ProxyConsumer ---
%%----------------------------------------------------------%
%% function : obtain_subscription_types
%% Arguments: Mode - enum 'ObtainInfoMode' (CosNotifyChannelAdmin)
%% Returns  : CosNotification::EventTypeSeq
%%-----------------------------------------------------------
obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_OFF') ->
    {reply, ?get_AllPublish(State), ?set_PublishType(State, false)};
obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'ALL_NOW_UPDATES_ON') ->
    {reply, ?get_AllPublish(State), ?set_PublishType(State, true)};
obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_OFF') ->
    {reply, [], ?set_PublishType(State, false)};
obtain_subscription_types(_OE_THIS, _OE_FROM, State, 'NONE_NOW_UPDATES_ON') ->
    {reply, [], ?set_PublishType(State, true)};
obtain_subscription_types(_,_,_,What) ->
    orber:dbg("[~p] PusherConsumer:obtain_subscription_types(~p);~n"
	      "Incorrect enumerant", [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : validate_event_qos
%% Arguments: RequiredQoS - CosNotification::QoSProperties
%% Returns  : ok | {'EXCEPTION', #'UnsupportedQoS'{}}
%%            AvilableQoS - CosNotification::NamedPropertyRangeSeq (out)
%%-----------------------------------------------------------
validate_event_qos(_OE_THIS, _OE_FROM, State, RequiredQoS) ->
    AvilableQoS = 'CosNotification_Common':validate_event_qos(RequiredQoS,
							      ?get_LocalQoS(State)),
    {reply, {ok, AvilableQoS}, State}.

%%----- Inherit from CosNotification::QoSAdmin --------------
%%----------------------------------------------------------%
%% function : get_qos
%% Arguments: 
%% Returns  : 
%%-----------------------------------------------------------
get_qos(_OE_THIS, _OE_FROM, State) ->
    {reply, ?get_GlobalQoS(State), State}.    

%%----------------------------------------------------------%
%% function : set_qos
%% Arguments: QoS - CosNotification::QoSProperties, i.e.,
%%            [#'Property'{name, value}, ...] where name eq. string()
%%            and value eq. any().
%% Returns  : ok | {'EXCEPTION', CosNotification::UnsupportedQoS}
%%-----------------------------------------------------------
set_qos(_OE_THIS, _OE_FROM, State, QoS) ->
    {NewQoS, LQS} = 'CosNotification_Common':set_qos(QoS, ?get_BothQoS(State), 
						     proxy, ?get_MyAdmin(State), 
						     false),
    NewState = ?update_EventDB(State, LQS),
    {reply, ok, ?set_BothQoS(NewState, NewQoS, LQS)}.

%%----------------------------------------------------------%
%% function : validate_qos
%% Arguments: Required_qos - CosNotification::QoSProperties
%%            [#'Property'{name, value}, ...] where name eq. string()
%%            and value eq. any().
%% Returns  : {'EXCEPTION', CosNotification::UnsupportedQoS}
%%            {ok, CosNotification::NamedPropertyRangeSeq}
%%-----------------------------------------------------------
validate_qos(_OE_THIS, _OE_FROM, State, Required_qos) ->
    QoS = 'CosNotification_Common':validate_qos(Required_qos, ?get_BothQoS(State), 
						proxy, ?get_MyAdmin(State), 
						false),
    {reply, {ok, QoS}, State}.

%%----- Inherit from CosNotifyComm::NotifyPublish -----------
%%----------------------------------------------------------%
%% function : offer_change
%% Arguments: Added - #'CosNotification_EventType'{}
%%            Removed - #'CosNotification_EventType'{}
%% Returns  : ok | 
%%            {'EXCEPTION', #'CosNotifyComm_InvalidEventType'{}}
%%-----------------------------------------------------------
offer_change(_OE_THIS, _OE_FROM, State, Added, Removed) ->
    cosNotification_Filter:validate_types(Added), 
    cosNotification_Filter:validate_types(Removed),
    %% On this "side" we don't really care about which
    %% type of events the client will supply. 
    %% Perhaps, later on, if we want to check this against Filters
    %% associated with this object we may change this approach, i.e., if
    %% the filter will not allow passing certain event types. But the
    %% user should see to that that situation never occurs. It would add
    %% extra overhead. Also see PusherSupplier- and PullerSuppler-
    %% 'subscription_change'.
    update_publish(add, State, Added),
    update_publish(remove, State, Removed),
    case ?get_PublishType(State) of
	true ->
	    %% Perhaps we should handle exception here?!
	    %% Probably not. Better to stay "on-line".
	    catch 'CosNotifyComm_NotifySubscribe':
			subscription_change(?get_Client(State), Added, Removed),
	    ok;
	_->
	    ok
    end,	
    {reply, ok, State}.

update_publish(_, _, [])->
    ok;
update_publish(add, State, [H|T]) ->
    ?add_Publish(State, H),
    update_publish(add, State, T);
update_publish(remove, State, [H|T]) ->
    ?del_Publish(State, H),
    update_publish(remove, State, T).
    
%%----- Inherit from CosNotifyFilter::FilterAdmin -----------
%%----------------------------------------------------------%
%% function : add_filter
%% Arguments: Filter - CosNotifyFilter::Filter
%% Returns  : FilterID - long
%%-----------------------------------------------------------
add_filter(_OE_THIS, _OE_FROM, State, Filter) ->
    'CosNotification_Common':type_check(Filter, 'CosNotifyFilter_Filter'),
    FilterID = ?new_Id(State),
    NewState = ?set_IdCounter(State, FilterID),
    {reply, FilterID, ?add_Filter(NewState, FilterID, Filter)}.

%%----------------------------------------------------------%
%% function : remove_filter
%% Arguments: FilterID - long
%% Returns  : ok
%%-----------------------------------------------------------
remove_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
    {reply, ok, ?del_Filter(State, FilterID)};
remove_filter(_,_,_,What) ->
    orber:dbg("[~p] PusherConsumer:remove_filter(~p); Not an integer", 
	      [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : get_filter
%% Arguments: FilterID - long
%% Returns  : Filter - CosNotifyFilter::Filter |
%%            {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}
%%-----------------------------------------------------------
get_filter(_OE_THIS, _OE_FROM, State, FilterID) when is_integer(FilterID) ->
    {reply, ?get_Filter(State, FilterID), State};
get_filter(_,_,_,What) ->
    orber:dbg("[~p] PusherConsumer:get_filter(~p); Not an integer", 
	      [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).

%%----------------------------------------------------------%
%% function : get_all_filters
%% Arguments: -
%% Returns  : Filter - CosNotifyFilter::FilterIDSeq
%%-----------------------------------------------------------
get_all_filters(_OE_THIS, _OE_FROM, State) ->
    {reply, ?get_AllFilterID(State), State}.

%%----------------------------------------------------------%
%% function : remove_all_filters
%% Arguments: -
%% Returns  : ok
%%-----------------------------------------------------------
remove_all_filters(_OE_THIS, _OE_FROM, State) ->
    {reply, ok, ?del_AllFilter(State)}.

%%----- Inherit from CosEventComm::PushConsumer -------------
%%----------------------------------------------------------%
%% function : disconnect_push_consumer
%% Arguments: -
%% Returns  : ok
%%-----------------------------------------------------------
disconnect_push_consumer(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----------------------------------------------------------%
%% function : push
%% Arguments: AnyEvent
%% Returns  : ok | 
%%-----------------------------------------------------------
push(OE_THIS, OE_FROM, State, Event) when ?is_ANY(State) ->
    corba:reply(OE_FROM, ok),
    case {?not_isConvertedStructured(Event),
	  cosNotification_eventDB:filter_events([Event], ?get_AllFilter(State))} of
	{_, {[],_}} when ?is_ANDOP(State) ->
	    {noreply, State};
	{true, {[],[_]}} ->
	    %% Is OR and converted, change back and forward to Admin
	    forward(seq, ?get_MyAdmin(State), State, [any:get_value(Event)], 
		    'MATCH', OE_THIS);
	{_, {[],[_]}} ->
	    %% Is OR and not converted, forward to Admin
	    forward(any, ?get_MyAdmin(State), State, Event, 'MATCH', OE_THIS);
	{true, {[_],_}} when ?is_ANDOP(State) ->
	    %% Is AND and converted, change back and forward to Admin
	    forward(seq, ?get_MyAdmin(State), State, [any:get_value(Event)], 
		    'MATCH', OE_THIS);
	{true, {[_],_}} ->
	    %% Is OR and converted, change back and forward to Channel
	    forward(seq, ?get_MyChannel(State), State, [any:get_value(Event)], 
		    'MATCHED', OE_THIS);
	{_, {[_],_}} when ?is_ANDOP(State) ->
	    %% Is AND and not converted, forward to Admin
	    forward(any, ?get_MyAdmin(State), State, Event, 'MATCH', OE_THIS);
	_ ->
	    %% Is OR and not converted, forward to Channel
	    forward(any, ?get_MyChannel(State), State, Event, 'MATCHED', OE_THIS)
    end;
push(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----- Inherit from CosNotifyComm::SequencePushConsumer ----
%%----------------------------------------------------------%
%% function : disconnect_sequence_push_consumer
%% Arguments: -
%% Returns  : ok
%%-----------------------------------------------------------
disconnect_sequence_push_consumer(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----------------------------------------------------------%
%% function : push_structured_events
%% Arguments: CosNotification::EventBatch
%% Returns  : ok | 
%%-----------------------------------------------------------
push_structured_events(OE_THIS, OE_FROM, State, Events) when ?is_SEQUENCE(State) ->
    corba:reply(OE_FROM, ok),
    %% We cannot convert parts of the sequence to any, event though they
    %% are converted from any to structured. Would be 'impossible' to send.
    case cosNotification_eventDB:filter_events(Events, ?get_AllFilter(State)) of
	{[],_} when ?is_ANDOP(State) ->
	    {noreply, State};
	{[],Failed} ->
	    forward(seq, ?get_MyAdmin(State), State, Failed, 'MATCH', OE_THIS);
	{Passed, _} when ?is_ANDOP(State) ->
	    forward(seq, ?get_MyAdmin(State), State, Passed, 'MATCH', OE_THIS);
	{Passed, []} ->
	    forward(seq, ?get_MyChannel(State), State, Passed, 'MATCHED', OE_THIS);
	{Passed, Failed} ->
	    %% Is OR, send Passed to channel and Failed to Admin.
	    forward(seq, ?get_MyChannel(State), State, Passed, 'MATCHED', OE_THIS),
	    forward(seq, ?get_MyAdmin(State), State, Failed, 'MATCH', OE_THIS)
    end;
push_structured_events(_,_,_,_) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%----- Inherit from CosNotifyComm::StructuredPushConsumer --
%%----------------------------------------------------------%
%% function : disconnect_structured_push_consumer
%% Arguments: -
%% Returns  : ok
%%-----------------------------------------------------------
disconnect_structured_push_consumer(_OE_THIS, _OE_FROM, State) ->
    {stop, normal, ok, ?set_Unconnected(State)}.

%%----------------------------------------------------------%
%% function : push_structured_event
%% Arguments: CosNotification::StructuredEvent
%% Returns  : ok | 
%%-----------------------------------------------------------
push_structured_event(OE_THIS, OE_FROM, State, Event) when ?is_STRUCTURED(State) ->
    corba:reply(OE_FROM, ok),
    case {?not_isConvertedAny(Event),
	  cosNotification_eventDB:filter_events([Event], ?get_AllFilter(State))} of
	{_, {[],_}} when ?is_ANDOP(State) ->
	    {noreply, State};
	{true, {[],[_]}} ->
	    %% Is OR and converted, change back and forward to Admin
	    forward(any, ?get_MyAdmin(State), State, 
		    Event#'CosNotification_StructuredEvent'.remainder_of_body, 
		    'MATCH', OE_THIS);
	{_, {[],[_]}} ->
	    %% Is OR and not converted, forward to Admin
	    forward(seq, ?get_MyAdmin(State), State, [Event], 'MATCH', OE_THIS);
	{true, {[_],_}} when ?is_ANDOP(State) ->
	    %% Is AND and converted, change back and forward to Admin
	    forward(any, ?get_MyAdmin(State), State, 
		    Event#'CosNotification_StructuredEvent'.remainder_of_body, 
		    'MATCH', OE_THIS);
	{true, {[_],_}} ->
	    %% Is OR and converted, change back and forward to Channel
	    forward(any, ?get_MyChannel(State), State, 
		    Event#'CosNotification_StructuredEvent'.remainder_of_body, 
		    'MATCHED', OE_THIS);
	{_, {[_],_}} when ?is_ANDOP(State) ->
	    %% Is AND and not converted, forward to Admin
	    forward(seq, ?get_MyAdmin(State), State, [Event], 'MATCH', OE_THIS);
	_ ->
	    %% Is OR and not converted, forward to Channel
	    forward(seq, ?get_MyChannel(State), State, [Event], 'MATCHED', OE_THIS)
    end;
push_structured_event(_,_,_,_) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%%--------------- LOCAL FUNCTIONS ----------------------------
find_obj({value, {_, Obj}}) -> Obj;
find_obj(_) -> {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}.

find_ids(List) ->           find_ids(List, []).
find_ids([], Acc) ->        Acc;
find_ids([{I,_}|T], Acc) -> find_ids(T, [I|Acc]);
find_ids(What, _) ->
    orber:dbg("[~p] PusherConsumer:find_ids();~n"
	      "Id corrupt: ~p", [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}).

%% Delete a single object.
%% The list don not differ, i.e., no filter removed, raise exception.
delete_obj(List,List) -> corba:raise(#'CosNotifyFilter_FilterNotFound'{});
delete_obj(List,_) -> List.

%% Forward events
forward(any, SendTo, State, Event, Status, OE_THIS) ->
    case catch oe_CosNotificationComm_Event:callAny(SendTo, Event, Status) of
	ok ->
	    ?DBG("PROXY FORWARD ANY: ~p~n",[Event]),
	    {noreply, State};
	{'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') orelse
			      is_record(E, 'NO_PERMISSION') orelse
			      is_record(E, 'CosEventComm_Disconnected') ->
	    orber:dbg("[~p] PusherConsumer:forward();~n"
		      "Admin/Channel no longer exists; terminating and dropping: ~p", 
		      [?LINE, Event], ?DEBUG_LEVEL),
	    'CosNotification_Common':notify([{proxy, OE_THIS},
					     {client, ?get_Client(State)}, 
					     {reason, {'EXCEPTION', E}}]),
	    {stop, normal, State};
	R when ?is_PersistentConnection(State) ->
	    orber:dbg("[~p] PusherConsumer:forward();~n"
		      "Admin/Channel respond incorrect: ~p~n"
		      "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL),
	    {noreply, State};
	R ->
	    orber:dbg("[~p] PusherConsumer:forward();~n"
		      "Admin/Channel respond incorrect: ~p~n"
		      "Terminating and dropping: ~p", 
		      [?LINE, R, Event], ?DEBUG_LEVEL),
	    'CosNotification_Common':notify([{proxy, OE_THIS},
					     {client, ?get_Client(State)}, 
					     {reason, R}]),
	    {stop, normal, State}
    end;
forward(seq, SendTo, State, Event, Status, OE_THIS) ->
    case catch oe_CosNotificationComm_Event:callSeq(SendTo, Event, Status) of
	ok ->
	    {noreply, State};
	{'EXCEPTION', E} when is_record(E, 'OBJECT_NOT_EXIST') orelse
			      is_record(E, 'NO_PERMISSION') orelse
			      is_record(E, 'CosEventComm_Disconnected') ->
	    ?DBG("ADMIN NO LONGER EXIST; DROPPING: ~p~n", [Event]),
	    'CosNotification_Common':notify([{proxy, OE_THIS},
					     {client, ?get_Client(State)}, 
					     {reason, {'EXCEPTION', E}}]),
	    {stop, normal, State};
	R when ?is_PersistentConnection(State) ->
	    orber:dbg("[~p] PusherConsumer:forward();~n"
		      "Admin/Channel respond incorrect: ~p~n"
		      "Dropping: ~p", [?LINE, R, Event], ?DEBUG_LEVEL),
	    {noreply, State};
	R ->
	    orber:dbg("[~p] PusherConsumer:forward();~n"
		      "Admin/Channel respond incorrect: ~p~n"
		      "Terminating and dropping: ~p", 
		      [?LINE, R, Event], ?DEBUG_LEVEL),
	    'CosNotification_Common':notify([{proxy, OE_THIS},
					     {client, ?get_Client(State)}, 
					     {reason, R}]),
	    {stop, normal, State}
    end.

%%--------------- MISC FUNCTIONS, E.G. DEBUGGING -------------
%%--------------- END OF MODULE ------------------------------