%%--------------------------------------------------------------------
%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1999-2015. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
%%
%%----------------------------------------------------------------------
%% File : PullerSupplier_impl.erl
%% Purpose :
%%----------------------------------------------------------------------
-module('PullerSupplier_impl').
%%--------------- INCLUDES -----------------------------------
-include_lib("orber/include/corba.hrl").
-include_lib("orber/include/ifr_types.hrl").
%% cosEvent files.
-include_lib("cosEvent/include/CosEventChannelAdmin.hrl").
%% Application files
-include("CosNotification.hrl").
-include("CosNotifyChannelAdmin.hrl").
-include("CosNotifyComm.hrl").
-include("CosNotifyFilter.hrl").
-include("CosNotification_Definitions.hrl").
%%--------------- EXPORTS ------------------------------------
%%--------------- External -----------------------------------
%%----- CosNotifyChannelAdmin::ProxyPullSupplier -------------
-export([connect_any_pull_consumer/4]).
%%----- CosNotifyChannelAdmin::SequenceProxyPullSupplier -----
-export([connect_sequence_pull_consumer/4]).
%%----- CosNotifyChannelAdmin::StructuredProxyPullSupplier ---
-export([connect_structured_pull_consumer/4]).
%%----- Inherit from CosNotifyChannelAdmin::ProxySupplier ----
-export([obtain_offered_types/4,
validate_event_qos/4]).
%%----- Inherit from CosNotification::QoSAdmin ---------------
-export([get_qos/3,
set_qos/4,
validate_qos/4]).
%%----- Inherit from CosNotifyComm::NotifySubscribe ----------
-export([subscription_change/5]).
%%----- Inherit from CosNotifyFilter::FilterAdmin ------------
-export([add_filter/4,
remove_filter/4,
get_filter/4,
get_all_filters/3,
remove_all_filters/3]).
%%----- Inherit from CosEventComm::PullSupplier -------------
-export([pull/3,
try_pull/3,
disconnect_pull_supplier/3]).
%%----- Inherit from CosNotifyComm::SequencePullSupplier --
-export([pull_structured_events/4,
try_pull_structured_events/4,
disconnect_sequence_pull_supplier/3]).
%%----- Inherit from CosNotifyComm::StructuredPullSupplier --
-export([pull_structured_event/3,
try_pull_structured_event/3,
disconnect_structured_pull_supplier/3]).
%%----- Inherit from CosEventChannelAdmin::ProxyPullSupplier
-export([connect_pull_consumer/4]).
%% Attributes (external) CosNotifyChannelAdmin::ProxySupplier
-export(['_get_MyType'/3,
'_get_MyAdmin'/3,
'_get_priority_filter'/3,
'_set_priority_filter'/4,
'_get_lifetime_filter'/3,
'_set_lifetime_filter'/4]).
%%--------------- Internal -----------------------------------
%%----- Inherit from cosNotificationComm --------------------
-export([callAny/5,
callSeq/5]).
%%--------------- gen_server specific exports ----------------
-export([handle_info/2, code_change/3]).
-export([init/1, terminate/2]).
%%--------------- LOCAL DEFINITIONS --------------------------
%% Data structures
%% Server state of one proxy pull supplier.
%% The first field, myType, is compared against 'PULL_ANY', 'PULL_STRUCTURED'
%% and 'PULL_SEQUENCE' by the is_ANY/is_STRUCTURED/is_SEQUENCE macros.
-record(state, {myType,
%% Admin value returned by '_get_MyAdmin' and handed to
%% 'CosNotification_Common':set_qos/validate_qos.
myAdmin,
%% Third element of the init/1 argument list (MyAdminPid).
myAdminPid,
%% Sixth element of the init/1 argument list (MyChannel).
myChannel,
%% List of {FilterID, Filter} tuples, newest first.
myFilters = [],
%% Compared against 'AND_OP' by the is_ANDOP macro.
myOperator,
%% Last filter id handed out by add_filter.
idCounter = 0,
%% Priority and lifetime filters; passed along when events are added
%% to the event DB.
prioFil,
lifetFil,
%% Connected pull consumer; undefined while unconnected.
client,
qosGlobal,
qosLocal,
%% {TimerRef, Timestamp} once start_timer/1 has run.
pacingTimer,
%% Pending pull caller: OE_FROM for single-event pulls, {OE_FROM, Max}
%% for sequence pulls; undefined when nobody is waiting.
respondTo,
%% When true, subscription_change forwards offer_change to the client.
subscribeType = false,
%% true, or {Which, WC} as returned by cosNotification_Filter:check_types.
subscribeData = true,
%% ETS table holding {EventType, subscribe} entries.
etsR,
%% Handle returned by cosNotification_eventDB:create_db.
eventDB}).
%% Data structures constructors
%% Builds the initial #state{} for init/1.
%% Side effects at expansion site: creates a new protected ETS set (oe_ets)
%% and an event DB via cosNotification_eventDB:create_db/4 using the local
%% QoS (_LQS) and the _GT, _GL and _TR arguments.
-define(get_InitState(_MyT, _MyA, _MyAP, _QS, _LQS, _Ch, _MyOp, _GT, _GL, _TR),
#state{myType = _MyT,
myAdmin = _MyA,
myAdminPid= _MyAP,
myChannel = _Ch,
myOperator= _MyOp,
qosGlobal = _QS,
qosLocal = _LQS,
etsR = ets:new(oe_ets, [set, protected]),
eventDB = cosNotification_eventDB:create_db(_LQS, _GT, _GL, _TR)}).
%% Data structures selectors
%% Attributes
-define(get_MyType(S), S#state.myType).
-define(get_MyAdmin(S), S#state.myAdmin).
%% Must read the myAdminPid field (the value init/1 receives as MyAdminPid),
%% not myAdmin: handle_info/2 compares this value with the Pid carried by an
%% 'EXIT' message to detect that the parent admin has terminated.
-define(get_MyAdminPid(S), S#state.myAdminPid).
-define(get_MyChannel(S), S#state.myChannel).
-define(get_MyOperator(S), S#state.myOperator).
-define(get_PrioFil(S), S#state.prioFil).
-define(get_LifeTFil(S), S#state.lifetFil).
%% Client Object
-define(get_Client(S), S#state.client).
%% QoS
-define(get_GlobalQoS(S), S#state.qosGlobal).
-define(get_LocalQoS(S), S#state.qosLocal).
-define(get_BothQoS(S), {S#state.qosGlobal, S#state.qosLocal}).
%% Filters
%% get_Filter yields the filter object or an {'EXCEPTION', ...} tuple
%% (see find_obj/1).
-define(get_Filter(S, I), find_obj(lists:keysearch(I, 1, S#state.myFilters))).
-define(get_AllFilter(S), S#state.myFilters).
-define(get_AllFilterID(S), find_ids(S#state.myFilters)).
%% Event
-define(get_Event(S), cosNotification_eventDB:get_event(S#state.eventDB)).
-define(get_Events(S,M), cosNotification_eventDB:get_events(S#state.eventDB, M)).
-define(get_RespondTo(S), S#state.respondTo).
%% Admin
-define(get_PacingTimer(S), S#state.pacingTimer).
%% The local-QoS pacing interval divided by 10000000 and rounded; start_timer/1
%% feeds the result to timer:seconds/1.
%% NOTE(review): presumably converts 100 ns units to seconds - confirm.
-define(get_PacingInterval(S), round(?not_GetPacingInterval((S#state.qosLocal))/10000000)).
-define(get_BatchLimit(S), ?not_GetMaximumBatchSize((S#state.qosLocal))).
%% Subscribe
%% All keys stored in the subscribe table, as a flat list.
-define(get_AllSubscribe(S), lists:flatten(ets:match(S#state.etsR,
{'$1',subscribe}))).
-define(get_SubscribeType(S), S#state.subscribeType).
-define(get_SubscribeData(S), S#state.subscribeData).
%% ID
-define(get_IdCounter(S), S#state.idCounter).
-define(get_SubscribeDB(S), S#state.etsR).
%% Data structures modifiers
%% Attributes
-define(set_PrioFil(S,D), S#state{prioFil=D}).
-define(set_LifeTFil(S,D), S#state{lifetFil=D}).
%% Client Object
-define(set_Client(S,D), S#state{client=D}).
-define(del_Client(S), S#state{client=undefined}).
%% QoS
-define(set_LocalQoS(S,D), S#state{qosLocal=D}).
-define(set_GlobalQoS(S,D), S#state{qosGlobal=D}).
-define(set_BothQoS(S,GD,LD), S#state{qosGlobal=GD, qosLocal=LD}).
-define(update_EventDB(S,Q), S#state{eventDB=
cosNotification_eventDB:update(S#state.eventDB, Q)}).
%% Filters
-define(add_Filter(S,I,O), S#state{myFilters=[{I,O}|S#state.myFilters]}).
%% del_Filter raises CosNotifyFilter_FilterNotFound (via delete_obj/2) when
%% no entry with the given id was present.
-define(del_Filter(S,I), S#state{myFilters=
delete_obj(lists:keydelete(I, 1, S#state.myFilters),
S#state.myFilters)}).
-define(del_AllFilter(S), S#state{myFilters=[]}).
-define(set_Unconnected(S), S#state{client=undefined}).
-define(reset_RespondTo(S), S#state{respondTo=undefined}).
-define(set_RespondTo(S,F), S#state{respondTo=F}).
%% Event
%% Both macros wrap the event DB call in 'catch', so a failure shows up as
%% the caught term rather than as an exception.
-define(add_Event(S,E), catch cosNotification_eventDB:
add_event(S#state.eventDB, E, S#state.lifetFil, S#state.prioFil)).
-define(addAndGet_Event(S,E), catch cosNotification_eventDB:
add_and_get_event(S#state.eventDB, E, S#state.lifetFil, S#state.prioFil)).
%% Admin
-define(set_PacingTimer(S,T), S#state{pacingTimer=T}).
%% Subscribe
-define(add_Subscribe(S,E), ets:insert(S#state.etsR, {E, subscribe})).
-define(del_Subscribe(S,E), ets:delete(S#state.etsR, E)).
-define(set_SubscribeType(S,T), S#state{subscribeType=T}).
-define(set_SubscribeData(S,D), S#state{subscribeData=D}).
%% ID
-define(set_IdCounter(S,V), S#state{idCounter=V}).
-define(new_Id(S), 'CosNotification_Common':create_id(S#state.idCounter)).
%% MISC
%% The predicates below are plain comparisons, so they are usable in guards.
-define(is_ANY(S), S#state.myType == 'PULL_ANY').
-define(is_STRUCTURED(S), S#state.myType == 'PULL_STRUCTURED').
-define(is_SEQUENCE(S), S#state.myType == 'PULL_SEQUENCE').
-define(is_ANDOP(S), S#state.myOperator == 'AND_OP').
-define(is_UnConnected(S), S#state.client == undefined).
-define(is_Connected(S), S#state.client =/= undefined).
-define(is_Waiting(S), S#state.respondTo =/= undefined).
-define(is_SubscribedFor(S,K), ets:lookup(S#state.etsR, K) =/= []).
%% Asks the event DB for its batchLimit status given the local-QoS maximum
%% batch size and the caller's limit M. Not a guard expression.
-define(is_BatchLimitReached(S,M), cosNotification_eventDB:
status(S#state.eventDB, {batchLimit,
?not_GetMaximumBatchSize((S#state.qosLocal)), M})).
%%----------------------------------------------------------%
%% function : handle_info, code_change
%% Arguments:
%% Returns :
%% Effect : Functions demanded by the gen_server module.
%%-----------------------------------------------------------
%% No state conversion is performed on code upgrade; the state is kept as is.
code_change(_FromVsn, CurrentState, _UpgradeData) ->
    {ok, CurrentState}.
%% Handles plain messages: 'EXIT' signals (the process traps exits) and the
%% {pacing, Timestamp} message scheduled by start_timer/1. Anything else is
%% ignored.
handle_info(Info, State) ->
    ?DBG("INFO: ~p~n", [Info]),
    dispatch_info(Info, State).

%% One clause per kind of message; the clause order is significant.
dispatch_info({'EXIT', Pid, Reason}, State) when ?get_MyAdminPid(State)==Pid ->
    ?DBG("PARENT ADMIN: ~p TERMINATED.~n",[Reason]),
    {stop, Reason, State};
dispatch_info({'EXIT', _Pid, _Reason}, State) ->
    ?DBG("PROXYPUSHSUPPLIER: ~p TERMINATED.~n",[_Reason]),
    {noreply, State};
dispatch_info({pacing, TS}, State) when ?is_Waiting(State) ->
    pacing_expired(?get_PacingTimer(State), TS, State);
dispatch_info({pacing, _}, State) ->
    ?DBG("PULL SUPPLIER PACING LIMIT REACHED BUT NO CLIENT; IMPOSSIBLE!!!~n",[]),
    {noreply, State};
dispatch_info(_Other, State) ->
    {noreply, State}.

%% The timestamp in the message must equal the one stored with the current
%% timer; only then is the waiting caller answered with whatever events are
%% available (at most Max).
pacing_expired({_, TS}, TS, State) ->
    ?DBG("PULL SUPPLIER PACING LIMIT REACHED~n",[]),
    {RespondTo, Max} = ?get_RespondTo(State),
    {EventSeq, _} = ?get_Events(State, Max),
    corba:reply(RespondTo, EventSeq),
    {noreply, ?reset_RespondTo(State)};
pacing_expired(_StaleTimer, _TS, State) ->
    %% Must have been an old timer event, i.e., we reached the
    %% Batch Limit before Pace limit and we were not able
    %% to stop the timer before it triggered an event.
    ?DBG("PULL SUPPLIER OLD PACING LIMIT REACHED~n",[]),
    {noreply, State}.
%%----------------------------------------------------------%
%% function : init, terminate
%% Arguments:
%%-----------------------------------------------------------
%% Starts trapping exits, looks up the gcTime, gcLimit and timeService
%% options (falling back to ?not_DEFAULT_SETTINGS) and builds the initial
%% state from them and the remaining arguments.
init([MyType, MyAdmin, MyAdminPid, InitQoS, LQS, MyChannel, Options, Operator]) ->
    process_flag(trap_exit, true),
    Lookup = fun(Key) ->
                     'CosNotification_Common':get_option(Key, Options,
                                                         ?not_DEFAULT_SETTINGS)
             end,
    GCTime = Lookup(gcTime),
    GCLimit = Lookup(gcLimit),
    TimeRef = Lookup(timeService),
    InitialState = ?get_InitState(MyType, MyAdmin, MyAdminPid, InitQoS, LQS,
                                  MyChannel, Operator, GCTime, GCLimit, TimeRef),
    {ok, InitialState}.
%% On shutdown, tells a still connected, non-nil client that it has been
%% disconnected, using the consumer interface matching this proxy's type.
%% Nothing is done when no client is registered or the nil check fails.
terminate(_Reason, State) when ?is_UnConnected(State) ->
    ok;
terminate(_Reason, State) ->
    Client = ?get_Client(State),
    IsNil = (catch corba_object:is_nil(Client)),
    case IsNil of
        false when ?is_ANY(State) ->
            'CosNotification_Common':disconnect('CosEventComm_PullConsumer',
                                                disconnect_pull_consumer,
                                                Client);
        false when ?is_SEQUENCE(State) ->
            'CosNotification_Common':disconnect('CosNotifyComm_SequencePullConsumer',
                                                disconnect_sequence_pull_consumer,
                                                Client);
        false when ?is_STRUCTURED(State) ->
            'CosNotification_Common':disconnect('CosNotifyComm_StructuredPullConsumer',
                                                disconnect_structured_pull_consumer,
                                                Client);
        _NilOrFailed ->
            ok
    end.
%%-----------------------------------------------------------
%%----- CosNotifyChannelAdmin_ProxySupplier attributes ------
%%-----------------------------------------------------------
%%----------------------------------------------------------%
%% Attribute: '_get_MyType'
%% Type : readonly
%% Returns :
%%-----------------------------------------------------------
%% Replies with the proxy type kept in the state.
'_get_MyType'(_This, _From, State) ->
    MyType = ?get_MyType(State),
    {reply, MyType, State}.
%%----------------------------------------------------------%
%% Attribute: '_get_MyAdmin'
%% Type : readonly
%% Returns :
%%-----------------------------------------------------------
%% Replies with the admin value kept in the state.
'_get_MyAdmin'(_This, _From, State) ->
    MyAdmin = ?get_MyAdmin(State),
    {reply, MyAdmin, State}.
%%----------------------------------------------------------%
%% Attribute: '_*et_priority_filter'
%% Type : read/write
%% Returns :
%%-----------------------------------------------------------
%% Replies with the currently stored priority filter.
'_get_priority_filter'(_This, _From, State) ->
    PriorityFilter = ?get_PrioFil(State),
    {reply, PriorityFilter, State}.
%% Stores a new priority filter and replies ok.
'_set_priority_filter'(_This, _From, State, PrioF) ->
    Updated = ?set_PrioFil(State, PrioF),
    {reply, ok, Updated}.
%%----------------------------------------------------------%
%% Attribute: '_*et_lifetime_filter'
%% Type : read/write
%% Returns :
%%-----------------------------------------------------------
%% Replies with the currently stored lifetime filter.
'_get_lifetime_filter'(_This, _From, State) ->
    LifetimeFilter = ?get_LifeTFil(State),
    {reply, LifetimeFilter, State}.
%% Stores a new lifetime filter and replies ok.
'_set_lifetime_filter'(_This, _From, State, LifeTF) ->
    Updated = ?set_LifeTFil(State, LifeTF),
    {reply, ok, Updated}.
%%-----------------------------------------------------------
%%------- Exported external functions -----------------------
%%-----------------------------------------------------------
%%----- CosEventChannelAdmin::ProxyPullSupplier -------------
%%----------------------------------------------------------%
%% function : connect_pull_consumer
%% Arguments: Client - CosEventComm::PullConsumer
%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} |
%% {'EXCEPTION', #'TypeError'{}} |
%% {'EXCEPTION', #'BAD_OPERATION'{}}
%% Both exceptions from CosEventChannelAdmin!!!
%%-----------------------------------------------------------
%% CosEventChannelAdmin entry point; simply delegates to
%% connect_any_pull_consumer/4 with the same arguments.
connect_pull_consumer(This, From, State, Client) ->
    connect_any_pull_consumer(This, From, State, Client).
%%----- CosNotifyChannelAdmin::ProxyPullSupplier ------------
%%----------------------------------------------------------%
%% function : connect_any_pull_consumer
%% Arguments: Client - CosEventComm::PullConsumer
%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} |
%% {'EXCEPTION', #'TypeError'{}} |
%% {'EXCEPTION', #'BAD_OPERATION'{}}
%% Both exceptions from CosEventChannelAdmin!!!
%%-----------------------------------------------------------
%% Registers Client as this proxy's consumer. Only valid for a 'PULL_ANY'
%% proxy (otherwise BAD_OPERATION is raised); raises AlreadyConnected when a
%% client is already registered. The client is type checked first.
connect_any_pull_consumer(_This, _From, State, Client) when ?is_ANY(State) ->
    ?not_TypeCheck(Client, 'CosEventComm_PullConsumer'),
    case ?get_Client(State) of
        undefined ->
            {reply, ok, ?set_Client(State, Client)};
        _AlreadyThere ->
            corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{})
    end;
connect_any_pull_consumer(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).
%%----- CosNotifyChannelAdmin::SequenceProxyPullSupplier ----
%%----------------------------------------------------------%
%% function : connect_sequence_pull_consumer
%% Arguments: Client - CosNotifyComm::SequencePullConsumer
%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} |
%% {'EXCEPTION', #'TypeError'{}} |
%% {'EXCEPTION', #'BAD_OPERATION'{}}
%%-----------------------------------------------------------
%% Registers Client as this proxy's consumer. Only valid for a
%% 'PULL_SEQUENCE' proxy (otherwise BAD_OPERATION is raised); raises
%% AlreadyConnected when a client is already registered.
connect_sequence_pull_consumer(_This, _From, State, Client) when ?is_SEQUENCE(State) ->
    ?not_TypeCheck(Client, 'CosNotifyComm_SequencePullConsumer'),
    case ?get_Client(State) of
        undefined ->
            {reply, ok, ?set_Client(State, Client)};
        _AlreadyThere ->
            corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{})
    end;
connect_sequence_pull_consumer(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).
%%----- CosNotifyChannelAdmin::StructuredProxyPullSupplier --
%%----------------------------------------------------------%
%% function : connect_structured_pull_consumer
%% Arguments: Client - CosNotifyComm::StructuredPullConsumer
%% Returns : ok | {'EXCEPTION', #'AlreadyConnected'{}} |
%% {'EXCEPTION', #'TypeError'{}} |
%% {'EXCEPTION', #'BAD_OPERATION'{}}
%%-----------------------------------------------------------
%% Registers Client as this proxy's consumer. Only valid for a
%% 'PULL_STRUCTURED' proxy (otherwise BAD_OPERATION is raised); raises
%% AlreadyConnected when a client is already registered.
connect_structured_pull_consumer(_This, _From, State, Client) when ?is_STRUCTURED(State) ->
    ?not_TypeCheck(Client, 'CosNotifyComm_StructuredPullConsumer'),
    case ?get_Client(State) of
        undefined ->
            {reply, ok, ?set_Client(State, Client)};
        _AlreadyThere ->
            corba:raise(#'CosEventChannelAdmin_AlreadyConnected'{})
    end;
connect_structured_pull_consumer(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).
%%----- Inherit from CosNotifyChannelAdmin::ProxySupplier ---
%%----------------------------------------------------------%
%% function : obtain_offered_types
%% Arguments: Mode - enum 'ObtainInfoMode' (CosNotifyChannelAdmin)
%% Returns : CosNotification::EventTypeSeq
%%-----------------------------------------------------------
%% The mode decides two things: whether the current subscribe keys are
%% returned ('ALL_NOW_*') or an empty list ('NONE_NOW_*'), and whether the
%% subscribeType flag is switched on ('*_UPDATES_ON') or off. Any other mode
%% is logged and answered with BAD_PARAM.
obtain_offered_types(_This, _From, State, Mode) ->
    case Mode of
        'ALL_NOW_UPDATES_OFF' ->
            {reply, ?get_AllSubscribe(State), ?set_SubscribeType(State, false)};
        'ALL_NOW_UPDATES_ON' ->
            {reply, ?get_AllSubscribe(State), ?set_SubscribeType(State, true)};
        'NONE_NOW_UPDATES_OFF' ->
            {reply, [], ?set_SubscribeType(State, false)};
        'NONE_NOW_UPDATES_ON' ->
            {reply, [], ?set_SubscribeType(State, true)};
        What ->
            orber:dbg("[~p] PullerSupplier:obtain_offered_types(~p);~n"
                      "Incorrect enumerant", [?LINE, What], ?DEBUG_LEVEL),
            corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO})
    end.
%%----------------------------------------------------------%
%% function : validate_event_qos
%% Arguments: RequiredQoS - CosNotification::QoSProperties
%% Returns : ok | {'EXCEPTION', #'UnsupportedQoS'{}}
%% AvailableQoS - CosNotification::NamedPropertyRangeSeq (out)
%%-----------------------------------------------------------
%% Checks the required per-event QoS against the local QoS and replies
%% {ok, Available} with whatever 'CosNotification_Common' returns.
validate_event_qos(_This, _From, State, RequiredQoS) ->
    LocalQoS = ?get_LocalQoS(State),
    Available = 'CosNotification_Common':validate_event_qos(RequiredQoS, LocalQoS),
    {reply, {ok, Available}, State}.
%%----- Inherit from CosNotification::QoSAdmin --------------
%%----------------------------------------------------------%
%% function : get_qos
%% Arguments:
%% Returns :
%%-----------------------------------------------------------
%% Replies with the global QoS kept in the state.
get_qos(_This, _From, State) ->
    GlobalQoS = ?get_GlobalQoS(State),
    {reply, GlobalQoS, State}.
%%----------------------------------------------------------%
%% function : set_qos
%% Arguments: QoS - CosNotification::QoSProperties, i.e.,
%% [#'Property'{name, value}, ...] where name eq. string()
%% and value eq. any().
%% Returns : ok | {'EXCEPTION', CosNotification::UnsupportedQoS}
%%-----------------------------------------------------------
%% Lets 'CosNotification_Common':set_qos/5 compute the new global and local
%% QoS, updates the event DB with the new local QoS and stores both values.
set_qos(_This, _From, State, QoS) ->
    Current = ?get_BothQoS(State),
    Parent = ?get_MyAdmin(State),
    {NewQoS, LQS} = 'CosNotification_Common':set_qos(QoS, Current, proxy,
                                                     Parent, false),
    WithNewDB = ?update_EventDB(State, LQS),
    {reply, ok, ?set_BothQoS(WithNewDB, NewQoS, LQS)}.
%%----------------------------------------------------------%
%% function : validate_qos
%% Arguments: Required_qos - CosNotification::QoSProperties
%% [#'Property'{name, value}, ...] where name eq. string()
%% and value eq. any().
%% Returns : {'EXCEPTION', CosNotification::UnsupportedQoS}
%% {ok, CosNotification::NamedPropertyRangeSeq}
%%-----------------------------------------------------------
%% Validates the required QoS against the current global/local QoS without
%% changing the state; replies {ok, Result}.
validate_qos(_This, _From, State, Required_qos) ->
    Current = ?get_BothQoS(State),
    Parent = ?get_MyAdmin(State),
    Result = 'CosNotification_Common':validate_qos(Required_qos, Current, proxy,
                                                   Parent, false),
    {reply, {ok, Result}, State}.
%%----- Inherit from CosNotifyComm::NotifySubscribe ---------
%%----------------------------------------------------------%
%% function : subscription_change
%% Arguments: Added - Removed - CosNotification::EventTypeSeq
%% Returns : ok
%%-----------------------------------------------------------
%% Applies a subscription change: validates both type lists, removes the
%% Removed types from the subscribe table, recomputes subscribeData from
%% Added plus the remaining types, stores the Added types and, when
%% subscribeType is true, forwards offer_change to the client.
subscription_change(_This, _From, State, Added, Removed) ->
    cosNotification_Filter:validate_types(Added),
    cosNotification_Filter:validate_types(Removed),
    %% On this "side", we care about which type of events the client
    %% will require, since the client (or an agent) clearly stated
    %% that it's only interested in these types of events.
    %% Also see PusherConsumer- and PullerConsumer-'offer_change'.
    update_subscribe(remove, State, Removed),
    Remaining = ?get_AllSubscribe(State),
    SubscribeData =
        case cosNotification_Filter:check_types(Added++Remaining) of
            true ->
                %% Types supplied does in some way cause all events to be valid.
                %% Smart? Would have been better to not supply any at all.
                true;
            {ok, Which, WC} ->
                {Which, WC}
        end,
    NewState = ?set_SubscribeData(State, SubscribeData),
    update_subscribe(add, NewState, Added),
    forward_offer_change(?get_SubscribeType(NewState), ?get_Client(NewState),
                         Added, Removed),
    {reply, ok, NewState}.

%% Forwards the change to the client only when updates are switched on.
%% Failures of the remote call are deliberately ignored so that the proxy
%% stays "on-line".
forward_offer_change(true, Client, Added, Removed) ->
    catch 'CosNotifyComm_NotifyPublish':offer_change(Client, Added, Removed),
    ok;
forward_offer_change(_UpdatesOff, _Client, _Added, _Removed) ->
    ok.
%% Inserts (add) or deletes (remove) every listed event type in the
%% subscribe table. An empty list is a no-op for any operation.
update_subscribe(_Op, _State, []) ->
    ok;
update_subscribe(add, State, [_|_] = Types) ->
    lists:foreach(fun(Type) -> ?add_Subscribe(State, Type) end, Types);
update_subscribe(remove, State, [_|_] = Types) ->
    lists:foreach(fun(Type) -> ?del_Subscribe(State, Type) end, Types).
%%----- Inherit from CosNotifyFilter::FilterAdmin -----------
%%----------------------------------------------------------%
%% function : add_filter
%% Arguments: Filter - CosNotifyFilter::Filter
%% Returns : FilterID - long
%%-----------------------------------------------------------
%% Type checks the filter, derives a new id from the id counter, stores the
%% id as the new counter value and prepends {Id, Filter} to the filter list.
%% Replies with the new id.
add_filter(_This, _From, State, Filter) ->
    'CosNotification_Common':type_check(Filter, 'CosNotifyFilter_Filter'),
    Id = ?new_Id(State),
    Counted = ?set_IdCounter(State, Id),
    {reply, Id, ?add_Filter(Counted, Id, Filter)}.
%%----------------------------------------------------------%
%% function : remove_filter
%% Arguments: FilterID - long
%% Returns : ok
%%-----------------------------------------------------------
%% Drops the filter with the given integer id (FilterNotFound is raised by
%% the del_Filter macro when there is none). A non-integer id is logged and
%% answered with BAD_PARAM.
remove_filter(_This, _From, State, FilterID) when is_integer(FilterID) ->
    Pruned = ?del_Filter(State, FilterID),
    {reply, ok, Pruned};
remove_filter(_, _, _, What) ->
    orber:dbg("[~p] PullerSupplier:remove_filter(~p); Not an integer",
              [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).
%%----------------------------------------------------------%
%% function : get_filter
%% Arguments: FilterID - long
%% Returns : Filter - CosNotifyFilter::Filter |
%% {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}
%%-----------------------------------------------------------
%% Replies with the filter stored under the given integer id, or with the
%% {'EXCEPTION', ...} tuple produced by find_obj/1 when there is none.
%% A non-integer id is logged and answered with BAD_PARAM.
get_filter(_This, _From, State, FilterID) when is_integer(FilterID) ->
    Found = ?get_Filter(State, FilterID),
    {reply, Found, State};
get_filter(_, _, _, What) ->
    orber:dbg("[~p] PullerSupplier:get_filter(~p); Not an integer",
              [?LINE, What], ?DEBUG_LEVEL),
    corba:raise(#'BAD_PARAM'{completion_status=?COMPLETED_NO}).
%%----------------------------------------------------------%
%% function : get_all_filters
%% Arguments: -
%% Returns : Filter - CosNotifyFilter::FilterIDSeq
%%-----------------------------------------------------------
%% Replies with the ids of all registered filters.
get_all_filters(_This, _From, State) ->
    Ids = ?get_AllFilterID(State),
    {reply, Ids, State}.
%%----------------------------------------------------------%
%% function : remove_all_filters
%% Arguments: -
%% Returns : ok
%%-----------------------------------------------------------
%% Empties the filter list and replies ok.
remove_all_filters(_This, _From, State) ->
    Cleared = ?del_AllFilter(State),
    {reply, ok, Cleared}.
%%----- Inherit from CosEventComm::PullSupplier -------------
%%----------------------------------------------------------%
%% function : disconnect_pull_supplier
%% Arguments: -
%% Returns : ok
%%-----------------------------------------------------------
%% Replies ok and stops the server normally. The client is cleared first, so
%% terminate/2 takes its unconnected clause.
disconnect_pull_supplier(_This, _From, State) ->
    Unconnected = ?set_Unconnected(State),
    {stop, normal, ok, Unconnected}.
%%----------------------------------------------------------%
%% function : pull
%% Arguments: -
%% Returns : any - CORBA::ANY
%%-----------------------------------------------------------
%% Pull for a 'PULL_ANY' proxy: replies with the next event if the event DB
%% has one, otherwise records the caller in respondTo and defers the reply.
%% Other proxy types get BAD_OPERATION.
pull(_This, OE_FROM, State) when ?is_ANY(State) ->
    Fetched = ?get_Event(State),
    case Fetched of
        {[], _} ->
            Parked = ?set_RespondTo(State, OE_FROM),
            {noreply, Parked};
        {Event, _} ->
            {reply, Event, State}
    end;
pull(_, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).
%%----------------------------------------------------------%
%% function : try_pull
%% Arguments: -
%% Returns : any - CORBA::ANY
%% HasEvent - boolean (out-type)
%%-----------------------------------------------------------
%% Try-pull for a 'PULL_ANY' proxy: replies {Event, Flag} as returned by the
%% event DB, or {NullAny, false} when no event is available.
%% Other proxy types get BAD_OPERATION.
try_pull(_This, _From, State) when ?is_ANY(State) ->
    Fetched = ?get_Event(State),
    case Fetched of
        {[], _} ->
            NullAny = any:create(orber_tc:null(), null),
            {reply, {NullAny, false}, State};
        {Event, Bool} ->
            {reply, {Event, Bool}, State}
    end;
try_pull(_, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).
%%----- Inherit from CosNotifyComm::SequencePullSupplier ----
%%----------------------------------------------------------%
%% function : disconnect_sequence_pull_supplier
%% Arguments: -
%% Returns : ok
%%-----------------------------------------------------------
%% Replies ok and stops the server normally. The client is cleared first, so
%% terminate/2 takes its unconnected clause.
disconnect_sequence_pull_supplier(_This, _From, State) ->
    Unconnected = ?set_Unconnected(State),
    {stop, normal, ok, Unconnected}.
%%----------------------------------------------------------%
%% function : pull_structured_events
%% Arguments: Max - long()
%% Returns : [StructuredEvent, ..]
%%-----------------------------------------------------------
%% Pull for a 'PULL_SEQUENCE' proxy. When the batch limit is reached and the
%% event DB hands out events, they are returned at once. Otherwise the
%% pacing timer is armed and the caller is parked as {OE_FROM, Max}.
%% Other proxy types get BAD_OPERATION.
pull_structured_events(_This, OE_FROM, State, Max) when ?is_SEQUENCE(State) ->
    case ?is_BatchLimitReached(State, Max) of
        true ->
            %% This test is not fool-proof; if Events have been stored
            %% using StartTime they will still be there but we cannot
            %% deliver them anyway. To solve this "problem" would cost!
            %% Hence, since it works fine otherwise it will do.
            case ?get_Events(State, Max) of
                {[], false} ->
                    park_sequence_puller(State, OE_FROM, Max);
                {Event, _} ->
                    {reply, Event, State}
            end;
        _ ->
            park_sequence_puller(State, OE_FROM, Max)
    end;
pull_structured_events(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).

%% Arms the pacing timer and remembers who to answer and with how many
%% events at most.
park_sequence_puller(State, From, Max) ->
    Timed = start_timer(State),
    {noreply, ?set_RespondTo(Timed, {From, Max})}.
%%----------------------------------------------------------%
%% function : try_pull_structured_events
%% Arguments: Max - long()
%% Returns : [StructuredEvent, ..]
%% HasEvent - Boolean()
%%-----------------------------------------------------------
%% Try-pull for a 'PULL_SEQUENCE' proxy: replies with whatever the event DB
%% returns for at most Max events. Other proxy types get BAD_OPERATION.
try_pull_structured_events(_This, _From, State, Max) when ?is_SEQUENCE(State) ->
    Fetched = ?get_Events(State, Max),
    {reply, Fetched, State};
try_pull_structured_events(_, _, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).
%%----- Inherit from CosNotifyComm::StructuredPullSupplier --
%%----------------------------------------------------------%
%% function : disconnect_structured_pull_supplier
%% Arguments: -
%% Returns : ok
%%-----------------------------------------------------------
%% Replies ok and stops the server normally. The client is cleared first, so
%% terminate/2 takes its unconnected clause.
disconnect_structured_pull_supplier(_This, _From, State) ->
    Unconnected = ?set_Unconnected(State),
    {stop, normal, ok, Unconnected}.
%%----------------------------------------------------------%
%% function : pull_structured_event
%% Arguments: -
%% Returns :
%%-----------------------------------------------------------
%% Pull for a 'PULL_STRUCTURED' proxy: replies with the next event if the
%% event DB has one, otherwise records the caller in respondTo and defers
%% the reply. Other proxy types get BAD_OPERATION.
pull_structured_event(_This, OE_FROM, State) when ?is_STRUCTURED(State) ->
    Fetched = ?get_Event(State),
    case Fetched of
        {[], _} ->
            Parked = ?set_RespondTo(State, OE_FROM),
            {noreply, Parked};
        {Event, _} ->
            {reply, Event, State}
    end;
pull_structured_event(_, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).
%%----------------------------------------------------------%
%% function : try_pull_structured_event
%% Arguments: -
%% Returns :
%%-----------------------------------------------------------
%% Try-pull for a 'PULL_STRUCTURED' proxy: replies {Event, Flag} as returned
%% by the event DB, or an empty structured event paired with false when no
%% event is available. Other proxy types get BAD_OPERATION.
try_pull_structured_event(_This, _From, State) when ?is_STRUCTURED(State) ->
    Fetched = ?get_Event(State),
    case Fetched of
        {[], _} ->
            Placeholder = ?not_CreateSE("","","",[],[],any:create(orber_tc:null(), null)),
            {reply, {Placeholder, false}, State};
        {Event, Bool} ->
            {reply, {Event, Bool}, State}
    end;
try_pull_structured_event(_, _, _) ->
    corba:raise(#'BAD_OPERATION'{completion_status=?COMPLETED_NO}).
%%--------------- LOCAL FUNCTIONS ----------------------------
%% Turns a lists:keysearch/3 result into the stored object, or into a
%% FilterNotFound exception tuple (returned, not raised) for anything else.
find_obj(SearchResult) ->
    case SearchResult of
        {value, {_Key, Obj}} ->
            Obj;
        _NoHit ->
            {'EXCEPTION', #'CosNotifyFilter_FilterNotFound'{}}
    end.
%% Collects the first element of every {Id, _} tuple in the list. The ids
%% come out in reverse list order because the accumulator is not reversed.
%% Any other element or tail raises INTERNAL.
find_ids(Pairs) ->
    find_ids(Pairs, []).

find_ids([{Id, _Obj} | Rest], Collected) ->
    find_ids(Rest, [Id | Collected]);
find_ids([], Collected) ->
    Collected;
find_ids(_Malformed, _Collected) ->
    corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO}).
%% Checks the outcome of deleting a single object.
%% If the list after deletion equals the list before it, nothing was removed
%% and FilterNotFound is raised; otherwise the shortened list is returned.
delete_obj(After, Before) when After =:= Before ->
    corba:raise(#'CosNotifyFilter_FilterNotFound'{});
delete_obj(After, _Before) ->
    After.
%%-----------------------------------------------------------
%% function : callSeq
%% Arguments:
%% Returns :
%%-----------------------------------------------------------
%% Receives a sequence of structured events forwarded to this proxy.
%% The sender is acknowledged immediately; the events are then validated
%% against the subscription data, the filters and the subscribe table.
%% Events that pass are either handed straight to a waiting pull caller
%% (stored in respondTo) or stored in the event DB for later pulls.
%% Always returns {noreply, NewState}.
callSeq(_OE_THIS, OE_FROM, State, EventsIn, Status) ->
    %% We should do something here, i.e., see what QoS this Object offers and
    %% act accordingly.
    corba:reply(OE_FROM, ok),
    case cosNotification_eventDB:validate_event(?get_SubscribeData(State), EventsIn,
                                                ?get_AllFilter(State),
                                                ?get_SubscribeDB(State),
                                                Status) of
        {[], _} ->
            ?DBG("PROXY NOSUBSCRIPTION SEQUENCE/STRUCTURED: ~p~n",[EventsIn]),
            {noreply, State};
        %% Just one event and we got a client waiting => there is no need to store
        %% the event, just transform it and pass it on.
        {[Event], _} when ?is_ANY(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED SEQUENCE[1]==>ANY: ~p~n",[Event]),
            AnyEvent = any:create('CosNotification_StructuredEvent':tc(),Event),
            case ?addAndGet_Event(State, AnyEvent) of
                {[], _} ->
                    ?DBG("PROXY RECEIVED UNDELIVERABLE SEQUENCE[1]: ~p~n",
                         [Event]),
                    %% Cannot deliver the event at the moment; perhaps Starttime
                    %% set or Deadline passed.
                    {noreply, State};
                {PossiblyOtherEvent, _} ->
                    ?DBG("PROXY RECEIVED SEQUENCE[1] ~p; DELIVER: ~p~n",
                         [Event, PossiblyOtherEvent]),
                    corba:reply(?get_RespondTo(State), PossiblyOtherEvent),
                    {noreply, ?reset_RespondTo(State)}
            end;
        {[Event],_} when ?is_STRUCTURED(State), ?is_Waiting(State) ->
            case ?addAndGet_Event(State, Event) of
                {[], _} ->
                    ?DBG("PROXY RECEIVED UNDELIVERABLE SEQUENCE[1]: ~p~n",
                         [Event]),
                    %% Cannot deliver the event at the moment; perhaps Starttime
                    %% set or Deadline passed.
                    {noreply, State};
                {PossiblyOtherEvent, _} ->
                    ?DBG("PROXY RECEIVED SEQUENCE[1] ~p; DELIVER: ~p~n",
                         [Event, PossiblyOtherEvent]),
                    corba:reply(?get_RespondTo(State), PossiblyOtherEvent),
                    {noreply, ?reset_RespondTo(State)}
            end;
        %% A sequence of events => store them and extract the first (according to QoS)
        %% event and forward it.
        {Events,_} when ?is_ANY(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED SEQUENCE==>ANY: ~p~n",[Events]),
            store_events(State, Events),
            case ?get_Event(State) of
                {[], _} ->
                    {noreply, State};
                {AnyEvent, _} ->
                    corba:reply(?get_RespondTo(State), AnyEvent),
                    {noreply, ?reset_RespondTo(State)}
            end;
        {Events, _} when ?is_STRUCTURED(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Events]),
            store_events(State, Events),
            case ?get_Event(State) of
                {[], _} ->
                    {noreply, State};
                {StrEvent, _} ->
                    %% The waiting caller was parked by pull_structured_event/3
                    %% and expects ONE structured event, so answer with the
                    %% event fetched from the DB. Replying with the whole
                    %% input list would hand the caller the wrong type and
                    %% drop the event that was just fetched.
                    corba:reply(?get_RespondTo(State), StrEvent),
                    {noreply, ?reset_RespondTo(State)}
            end;
        {Events, _} when ?is_SEQUENCE(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Events]),
            %% Store them first and extract Max events in QoS order.
            store_events(State, Events),
            {RespondTo, Max} = ?get_RespondTo(State),
            case ?is_BatchLimitReached(State, Max) of
                true ->
                    {EventSeq, _} = ?get_Events(State, Max),
                    corba:reply(RespondTo, EventSeq),
                    stop_timer(State),
                    {noreply, ?reset_RespondTo(State)};
                _->
                    {noreply, State}
            end;
        %% No client waiting. Store the event(s).
        {Events, _} ->
            ?DBG("PROXY RECEIVED SEQUENCE: ~p~n",[Events]),
            store_events(State, Events),
            {noreply, State}
    end.
%% Adds every event to the event DB. For a 'PULL_ANY' proxy each structured
%% event is first wrapped in an any carrying the StructuredEvent typecode.
%% Results of the individual add operations are ignored; returns ok.
store_events(_State, []) ->
    ok;
store_events(State, [_|_] = Events) when ?is_ANY(State) ->
    lists:foreach(
      fun(Structured) ->
              Wrapped = any:create('CosNotification_StructuredEvent':tc(), Structured),
              ?add_Event(State, Wrapped)
      end, Events);
store_events(State, [_|_] = Events) ->
    lists:foreach(fun(Structured) -> ?add_Event(State, Structured) end, Events).
%%-----------------------------------------------------------
%% function : callAny
%% Arguments:
%% Returns :
%%-----------------------------------------------------------
%% Receives a single any event forwarded to this proxy. The sender is
%% acknowledged immediately; the event is then validated against the
%% subscription data, the filters and the subscribe table. An accepted event
%% is handed to a waiting pull caller if possible, otherwise stored. For
%% structured and sequence proxies the any is wrapped in a structured event
%% with type name "%ANY". Always returns {noreply, NewState}.
callAny(_OE_THIS, OE_FROM, State, EventIn, Status) ->
    corba:reply(OE_FROM, ok),
    Validated = cosNotification_eventDB:validate_event(?get_SubscribeData(State), EventIn,
                                                       ?get_AllFilter(State),
                                                       ?get_SubscribeDB(State),
                                                       Status),
    case Validated of
        {[],_} ->
            ?DBG("PROXY NOSUBSCRIPTION ANY: ~p~n",[EventIn]),
            {noreply, State};
        {Event,_} when ?is_ANY(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED ANY: ~p~n",[Event]),
            any_add_and_deliver(State, Event);
        {Event,_} when ?is_ANY(State) ->
            ?DBG("PROXY RECEIVED ANY: ~p~n",[Event]),
            ?add_Event(State,Event),
            {noreply, State};
        {Event,_} when ?is_STRUCTURED(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED ANY==>STRUCTURED: ~p~n",[Event]),
            any_add_and_deliver(State, ?not_CreateSE("","%ANY","",[],[],Event));
        {Event,_} when ?is_SEQUENCE(State), ?is_Waiting(State) ->
            ?DBG("PROXY RECEIVED ANY==>SEQUENCE[1]: ~p~n",[Event]),
            ?add_Event(State,?not_CreateSE("","%ANY","",[],[],Event)),
            any_deliver_batch(State);
        {Event,_} ->
            ?DBG("PROXY RECEIVED ANY==>STRUCTURED/SEQUENCE: ~p~n",[Event]),
            ?add_Event(State,?not_CreateSE("","%ANY","",[],[],Event)),
            {noreply, State}
    end.

%% Adds the event and asks for the next deliverable one in a single step;
%% answers the waiting caller when the DB hands one out.
any_add_and_deliver(State, Stored) ->
    case ?addAndGet_Event(State, Stored) of
        {[],_} ->
            %% Unable to deliver the event (Starttime, Deadline etc).
            {noreply, State};
        {MaybeOtherEvent, _} ->
            corba:reply(?get_RespondTo(State), MaybeOtherEvent),
            {noreply, ?reset_RespondTo(State)}
    end.

%% Answers the waiting sequence caller once the batch limit is reached and
%% cancels the pacing timer; otherwise keeps waiting.
any_deliver_batch(State) ->
    {RespondTo, Max} = ?get_RespondTo(State),
    case ?is_BatchLimitReached(State, Max) of
        true ->
            {EventSeq, _} = ?get_Events(State, Max),
            corba:reply(RespondTo, EventSeq),
            stop_timer(State),
            {noreply, ?reset_RespondTo(State)};
        _ ->
            {noreply, State}
    end.
%% Arms a one-shot pacing timer for a pending sequence pull: after the pacing
%% interval a {pacing, Timestamp} message is sent to this process. The timer
%% reference and the timestamp are stored together so that handle_info/2 can
%% tell a current timer message from a stale one.
%% Returns the updated state; raises INTERNAL if the timer cannot be started.
start_timer(State) ->
    TS = erlang:timestamp(),
    case catch timer:send_after(timer:seconds(?get_PacingInterval(State)),
                                {pacing, TS}) of
        {ok,PacTRef} ->
            ?DBG("PULL SUPPLIER STARTED TIMER, BATCH LIMIT: ~p~n",
                 [?get_BatchLimit(State)]),
            ?set_PacingTimer(State, {PacTRef, TS});
        What ->
            %% The diagnostic names the function that is actually called.
            orber:dbg("[~p] PullerSupplier:start_timer();~n"
                      "Unable to invoke timer:send_after/2: ~p",
                      [?LINE, What], ?DEBUG_LEVEL),
            corba:raise(#'INTERNAL'{completion_status=?COMPLETED_NO})
    end.
%% Cancels the pacing timer if one has been stored; the state itself is not
%% changed. Returns ok when no timer exists, otherwise the result of
%% timer:cancel/1.
stop_timer(State) ->
    Pacing = ?get_PacingTimer(State),
    case Pacing of
        {TimerRef, _Stamp} ->
            ?DBG("PULL SUPPLIER STOPPED TIMER~n",[]),
            timer:cancel(TimerRef);
        undefined ->
            ok
    end.
%%--------------- MISC FUNCTIONS, E.G. DEBUGGING -------------
%%--------------- END OF MODULE ------------------------------