aboutsummaryrefslogtreecommitdiffstats
path: root/lib/cosNotification/src/cosNotification_eventDB.erl
diff options
context:
space:
mode:
authorErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
committerErlang/OTP <[email protected]>2009-11-20 14:54:40 +0000
commit84adefa331c4159d432d22840663c38f155cd4c1 (patch)
treebff9a9c66adda4df2106dfd0e5c053ab182a12bd /lib/cosNotification/src/cosNotification_eventDB.erl
downloadotp-84adefa331c4159d432d22840663c38f155cd4c1.tar.gz
otp-84adefa331c4159d432d22840663c38f155cd4c1.tar.bz2
otp-84adefa331c4159d432d22840663c38f155cd4c1.zip
The R13B03 release.OTP_R13B03
Diffstat (limited to 'lib/cosNotification/src/cosNotification_eventDB.erl')
-rw-r--r--lib/cosNotification/src/cosNotification_eventDB.erl1350
1 files changed, 1350 insertions, 0 deletions
diff --git a/lib/cosNotification/src/cosNotification_eventDB.erl b/lib/cosNotification/src/cosNotification_eventDB.erl
new file mode 100644
index 0000000000..89332d53f2
--- /dev/null
+++ b/lib/cosNotification/src/cosNotification_eventDB.erl
@@ -0,0 +1,1350 @@
+%%--------------------------------------------------------------------
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2000-2009. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+%%
+%%----------------------------------------------------------------------
+%% File : cosNotification_eventDB.erl
+%% Purpose :
+%% Purpose : This module is supposed to centralize Event storage.
+%% Comments:
+%% * Setting Order Policy to AnyOrder eq. Priority order
+%%
+%% * Setting Discard Policy to AnyOrder eq. RejectNewEvents.
+%%
+%% * DB ordering: Since the deliver- and discard-order may differ we need
+%% two ets-tables, both of type 'ordered_set'. They contain:
+%% - table 1 (T1): deliver order key and the associated event.
+%% - table 2 (T2): discard order key.
+%%
+%% When adding a new event we add, if necessary, related keys in T2.
+%% For example, if we should discard events in FIFO order, the delivery
+%% order may be set to Priority order. If the Max Event limit is reached
+%% we first look in T2 to find out which event to discard by using and
+%% reorder the key elements. T2 gives {TimeStamp, Priority}, which is used
+%% to lookup in T1 as {Priority, TimeStamp}.
+%% A TimeStamp is always included in the DB keys, even if FIFO or LIFO
+%% is used, since lots of events probably will have the same prioity and
+%% with a little bit of bad luck some events will never be delivered.
+%%
+%% Note: deliver order AnyOrder and PriorityOrder is equal since the later
+%% is defined as default.
+%% discard order AnyOrder and RejectNewEvents is equal since the later
+%% is defined as default.
+%% The keys used is ('-' indicates T2 is not needed and, thus, not instantiated):
+%%
+%% T1 policy T1 Key T2 Policy T2 Key
+%% ------------------------------------------------------------------
+%% DeadlineOrder {DL, Key, Prio} PriorityOrder {Prio, Key, DL}
+%% DeadlineOrder {DL, Key} FifoOrder {Key, DL}
+%% DeadlineOrder {DL, Key} LifoOrder {Key, DL}
+%% DeadlineOrder {DL, Key} RejectNewEvents -
+%% DeadlineOrder {DL, Key} DeadlineOrder -
+%% FifoOrder {Key, DL} DeadlineOrder {DL, Key}
+%% FifoOrder {Key, Prio} PriorityOrder {Prio, Key}
+%% FifoOrder Key RejectNewEvents -
+%% FifoOrder Key Fifo -
+%% FifoOrder Key Lifo -
+%% PriorityOrder {Prio, Key, DL} DeadlineOrder {DL, Key, Prio}
+%% PriorityOrder {Prio, Key} Fifo {Key, Prio}
+%% PriorityOrder {Prio, Key} Lifo {Key, Prio}
+%% PriorityOrder {Prio, Key} RejectNewEvents -
+%% ------------------------------------------------------------------
+%% DL == Deadline, Key == TimeStamp, Prio == Priority
+%%
+%% NOTE: If defined, the Discard DB Keys are the same as in Event DB, except
+%% that the first and last Key change place. {K1,K2}<->{K2,K1} and
+%% {K1,K2,K3}<->{K3,K2,K1}.
+%%----------------------------------------------------------------------
+
+-module(cosNotification_eventDB).
+
+
+%%--------------- INCLUDES -----------------------------------
+-include_lib("orber/include/corba.hrl").
+-include_lib("orber/include/ifr_types.hrl").
+-include_lib("cosTime/include/TimeBase.hrl").
+
+%% Application files
+-include("CosNotification.hrl").
+-include("CosNotifyChannelAdmin.hrl").
+-include("CosNotifyComm.hrl").
+-include("CosNotifyFilter.hrl").
+
+-include("CosNotification_Definitions.hrl").
+
+%%--------------- EXPORTS ------------------------------------
+%% Internal Filter Functions
+-export([validate_event/5,
+ create_db/4,
+ destroy_db/1,
+ get_event/1,
+ get_event/2,
+ get_events/2,
+ get_events/3,
+ delete_events/1,
+ update/2,
+ update/4,
+ add_event/2,
+ add_event/4,
+ add_and_get_event/2,
+ add_and_get_event/3,
+ add_and_get_event/4,
+ add_and_get_event/5,
+ gc_events/2,
+ gc_events_local/4,
+ gc_start/2,
+ filter_events/2,
+ filter_events/3,
+ status/2]).
+
+%%--------------- DATA STRUCTURES ----------------------------
+-record(dbRef, {orderRef, discardRef, orderPolicy, discardPolicy,
+ defPriority, maxEvents, defStopT, startTsupport,
+ stopTsupport, gcTime, gcLimit, timeRef}).
+
+
+
+%%--------------- DEFINES ------------------------------------
+
+-define(CreateRef(OR, DR, O, D, DP, ME, DS, StaT, StoT, GT, GL, TR),
+ #dbRef{orderRef=OR, discardRef=DR, orderPolicy=O, discardPolicy=D,
+ defPriority=DP, maxEvents=ME, defStopT=DS, startTsupport=StaT,
+ stopTsupport=StoT, gcTime=GT, gcLimit=round(ME*GL/100),
+ timeRef=TR}).
+
+
+-define(get_OrderP(DR), DR#dbRef.orderPolicy).
+-define(get_DiscardP(DR), DR#dbRef.discardPolicy).
+-define(get_OrderRef(DR), DR#dbRef.orderRef).
+-define(get_DiscardRef(DR), DR#dbRef.orderRef).
+-define(get_DefPriority(DR), DR#dbRef.defPriority).
+-define(get_MaxEvents(DR), DR#dbRef.maxEvents).
+-define(get_DefStopT(DR), DR#dbRef.defStopT).
+-define(get_StartTsupport(DR), DR#dbRef.startTsupport).
+-define(get_StopTsupport(DR), DR#dbRef.stopTsupport).
+-define(get_GCTime(DR), DR#dbRef.gcTime).
+-define(get_GCLimit(DR), DR#dbRef.gcLimit).
+-define(get_TimeRef(DR), DR#dbRef.timeRef).
+
+-define(set_OrderP(DR, O), DR#dbRef{orderPolicy = O}).
+-define(set_DiscardP(DR, D), DR#dbRef{discardPolicy = D}).
+-define(set_OrderRef(DR, E), DR#dbRef{orderRef = E}).
+-define(set_DiscardRef(DR, E), DR#dbRef{orderRef = E}).
+-define(set_DefPriority(DR, DP), DR#dbRef{defPriority = DP}).
+-define(set_MaxEvents(DR, ME), DR#dbRef{maxEvents = ME}).
+-define(set_DefStopT(DR, DS), DR#dbRef{defStopT = DS}).
+-define(set_StartTsupport(DR, B), DR#dbRef{startTsupport = B}).
+-define(set_StopTsupport(DR, B), DR#dbRef{stopTsupport = B}).
+
+-define(is_StartTNotSupported(DR), DR#dbRef.startTsupport == false).
+-define(is_StopTNotSupported(DR), DR#dbRef.stopTsupport == false).
+-define(is_TimeoutNotUsed(DR), DR#dbRef.defStopT == 0).
+
+
+%%------------------------------------------------------------
+%% function : status
+%% Arguments: DBRef
+%% Key - which information we want.
+%% Returns : Data related to the Key.
+%%------------------------------------------------------------
+status(DBRef, eventCounter) ->
+ ets:info(?get_OrderRef(DBRef), size);
+status(DBRef, {batchLimit, Limit}) ->
+ case ets:info(?get_OrderRef(DBRef), size) of
+ Current when is_integer(Current) andalso Current >= Limit ->
+ ?debug_print("BATCH LIMIT (~p) REACHED, CONTAINS: ~p~n", [Limit, Current]),
+ true;
+ _Other ->
+ ?debug_print("BATCH LIMIT (~p) NOT REACHED, CONTAINS: ~p~n",
+ [Limit, _Other]),
+ false
+ end;
+status(DBRef, {batchLimit, Limit, TemporaryMax}) ->
+ case ets:info(?get_OrderRef(DBRef), size) of
+ Current when is_integer(Current) andalso Current >= TemporaryMax ->
+ ?debug_print("MAX LIMIT (~p) REACHED, CONTAINS: ~p~n",
+ [TemporaryMax, Current]),
+ true;
+ Current when is_integer(Current) andalso Current >= Limit ->
+ ?debug_print("BATCH LIMIT (~p) REACHED, CONTAINS: ~p~n", [Limit, Current]),
+ true;
+ _Other ->
+ ?debug_print("BATCH LIMIT (~p) NOT REACHED, CONTAINS: ~p~n",
+ [Limit, _Other]),
+ false
+ end;
+status(_, _) ->
+ error.
+
+
+%%------------------------------------------------------------
+%% function : gc_events_local
+%% Arguments: DBRef
+%% Returns :
+%% Comment : This function is intended for "emergency" GC, i.e.,
+%% when the DB must discard events we should first try
+%% to remove events with expired deadlines.
+%%------------------------------------------------------------
+gc_events_local(_, _, false, _) ->
+ ok;
+gc_events_local(_, _, _, 0) ->
+ ok;
+gc_events_local(ORef, DRef, _, _) ->
+ gc_loop(ets:first(ORef), ORef, DRef).
+
+%%------------------------------------------------------------
+%% function : gc_events
+%% Arguments: DBRef
+%% Priority - 'low', 'medium' or 'high'; will determine
+%% how important a fast gc is.
+%% Returns :
+%% Comment : This function is intended for background GC.
+%%------------------------------------------------------------
+gc_events(DBRef, _Priority) when ?is_TimeoutNotUsed(DBRef) ->
+ ok;
+gc_events(DBRef, _Priority) when ?is_StopTNotSupported(DBRef) ->
+ ok;
+gc_events(DBRef, Priority) ->
+ {M,S,U} = now(),
+ case get(oe_GC_timestamp) of
+ Num when {M,S,U} > Num ->
+ put(oe_GC_timestamp, {M,S+?get_GCTime(DBRef),U}),
+ spawn_link(?MODULE, gc_start, [DBRef, Priority]);
+ _->
+ ok
+ end.
+
+
+%%------------------------------------------------------------
+%% function : gc_start
+%% Arguments:
+%% Returns :
+%%------------------------------------------------------------
+gc_start(#dbRef{orderRef = ORef, discardRef = DRef}, Priority) ->
+ process_flag(priority, Priority),
+ gc_loop(ets:first(ORef), ORef, DRef).
+
+gc_loop('$end_of_table', _, _) ->
+ ok;
+gc_loop(Key, ORef, DRef) ->
+ [{Keys,DL,_,_,_}]=ets:lookup(ORef, Key),
+ case check_deadline(DL) of
+ true when DRef == undefined ->
+ ets:delete(ORef, Key);
+ true ->
+ ets:delete(ORef, Key),
+ gc_discard_DB(Keys, DRef);
+ _ ->
+ ok
+ end,
+ gc_loop(ets:next(ORef, Key), ORef, DRef).
+
+gc_discard_DB({Key1, Key2}, DRef) ->
+ ets:delete(DRef, {Key2, Key1});
+gc_discard_DB({Key1, Key2, Key3}, DRef) ->
+ ets:delete(DRef, {Key3, Key2, Key1}).
+
+%%------------------------------------------------------------
+%% function : create_FIFO_Key
+%% Arguments:
+%% Returns :
+%%------------------------------------------------------------
+create_FIFO_Key() ->
+ {M, S, U} = erlang:now(),
+ -M*1000000000000 - S*1000000 - U.
+
+%%------------------------------------------------------------
+%% function : convert_FIFO_Key
+%% Arguments:
+%% Returns : A now tuple
+%% Comment : Used when we must reuse a timestamp, i.e., only
+%% when we must reorder the DB.
+%%------------------------------------------------------------
+convert_FIFO_Key(Key) ->
+ K = abs(Key),
+ Secs = trunc(K/1000000),
+ M = trunc(K/1000000000000),
+ S = Secs-M*1000000,
+ U = K - S*1000000-M*1000000000000,
+ {M, S, U}.
+
+%%------------------------------------------------------------
+%% function : extract_priority
+%% Arguments: Event
+%% Defalt Value
+%% Mapping Filter Value
+%% - false value not needed (depends on QoS type)
+%% - undefined value needed but no filter associated.
+%% Returns :
+%%------------------------------------------------------------
+extract_priority(_, _, false) ->
+ false;
+extract_priority(#'CosNotification_StructuredEvent'
+ {header = #'CosNotification_EventHeader'
+ {variable_header = VH}}, DefPriority, undefined) ->
+ extract_value(VH, ?not_Priority, DefPriority);
+%% Maybe a unstructured event.
+extract_priority(_, DefPriority, undefined) ->
+ DefPriority;
+extract_priority(_, _, PriorityOverride) ->
+ %% Must have an associated MappingFilter for Priority.
+ PriorityOverride.
+
+%%------------------------------------------------------------
+%% function : extract_start_time
+%% Arguments:
+%% Returns :
+%%------------------------------------------------------------
+extract_start_time(_, false, _) ->
+ false;
+extract_start_time(#'CosNotification_StructuredEvent'
+ {header = #'CosNotification_EventHeader'
+ {variable_header = VH}}, _, TRef) ->
+ ST = case extract_value(VH, ?not_StartTime, undefined) of
+ UTC when is_record(UTC, 'TimeBase_UtcT') ->
+ UTC;
+ _ ->
+ false
+ end,
+ convert_time(ST, TRef, now());
+extract_start_time(_, _, _) ->
+ false.
+
+%%------------------------------------------------------------
+%% function : extract_deadline
+%% Arguments: Structured Event
+%% Default Timeout Value - TimeT or UtcT (see cosTime).
+%% StopTSupported - boolean().
+%% TRef - reference to TimeService
+%% Mapping Filter Value
+%% - false eq. value not needed (depends on QoS type)
+%% - undefined eq. value needed but no filter associated.
+%% Now - used when we want to reuse old TimeStamp which
+%% must be done when changing QoS.
+%% Returns : A modified return from now().
+%%------------------------------------------------------------
+extract_deadline(_, _, _, _, false) ->
+ false;
+extract_deadline(Event, DefaultT, StopTSupported, TRef, MappingVal) ->
+ extract_deadline(Event, DefaultT, StopTSupported, TRef, MappingVal, now()).
+
+extract_deadline(_, _, _, _, false, _) ->
+ false;
+extract_deadline(#'CosNotification_StructuredEvent'
+ {header = #'CosNotification_EventHeader'
+ {variable_header = VH}}, DefaultT, StopTSupported,
+ TRef, undefined, Now) ->
+ DL = case extract_value(VH, ?not_Timeout, undefined) of
+ undefined when StopTSupported == true, TRef =/= undefined ->
+ case extract_value(VH, ?not_StopTime, undefined) of
+ undefined ->
+ DefaultT;
+ DefinedTime ->
+ DefinedTime
+ end;
+ undefined ->
+ DefaultT;
+ DefinedTime ->
+ DefinedTime
+ end,
+ convert_time(DL, TRef, Now);
+%% Maybe a unstructured event.
+extract_deadline(_, Time, _, TRef, undefined, Now) ->
+ convert_time(Time, TRef, Now);
+extract_deadline(_, _, _, TRef, DOverride, Now) ->
+ %% Must have an associated MappingFilter defining a Deadline.
+ convert_time(DOverride, TRef, Now).
+
+convert_time(0, _, _) ->
+ false;
+convert_time(UTC, TRef, {M,S,U}) when is_record(UTC, 'TimeBase_UtcT') ->
+ case catch get_time_diff(UTC, TRef) of
+ {'EXCEPTION', _} ->
+ false;
+ {'EXIT', _} ->
+ false;
+ DL ->
+ MicroSecs = round(DL/10),
+ Secs = round(MicroSecs/1000000),
+ MegaSecs = round(Secs/1000000),
+ {-M-MegaSecs, -S-Secs+MegaSecs, -U-MicroSecs+Secs}
+ end;
+convert_time(DL, _, {M,S,U}) when is_integer(DL) ->
+ MicroSecs = round(DL/10),
+ Secs = round(MicroSecs/1000000),
+ MegaSecs = round(Secs/1000000),
+ {-M-MegaSecs, -S-Secs+MegaSecs, -U-MicroSecs+Secs};
+convert_time(_, _, _) ->
+ false.
+
+
+get_time_diff(UTC, TRef) ->
+ UTO = 'CosTime_TimeService':universal_time(TRef),
+ UTO2 = 'CosTime_TimeService':uto_from_utc(TRef, UTC),
+ TIO = 'CosTime_UTO':time_to_interval(UTO, UTO2),
+ #'TimeBase_IntervalT'{lower_bound=LB, upper_bound = UB} =
+ 'CosTime_TIO':'_get_time_interval'(TIO),
+ UB-LB.
+
+check_deadline(DL) when is_tuple(DL) ->
+ {M,S,U} = now(),
+ DL >= {-M,-S,-U};
+check_deadline(_DL) ->
+ %% This case will cover if no timeout is set.
+ false.
+
+check_start_time(ST) when is_tuple(ST) ->
+ {M,S,U} = now(),
+ ST >= {-M,-S,-U};
+check_start_time(_ST) ->
+ %% This case will cover if no earliest delivery time is set.
+ true.
+
+%%------------------------------------------------------------
+%% function : extract_value
+%% Arguments: A Property Sequence
+%% ID - wanted property string()
+%% Other - default-value.
+%% Returns : Value associated with given ID or default value.
+%%------------------------------------------------------------
+extract_value([], _, Other) ->
+ Other;
+extract_value([#'CosNotification_Property'{name=ID, value=V}|_], ID, _) ->
+ any:get_value(V);
+extract_value([_H|T], ID, Other) ->
+ extract_value(T, ID, Other).
+
+%%------------------------------------------------------------
+%% function : get_event
+%% Arguments:
+%% Returns :
+%%------------------------------------------------------------
+get_event(DBRef) ->
+ get_event(DBRef, true).
+get_event(DBRef, Delete) ->
+ case get_events(DBRef, 1, Delete) of
+ {[], false} ->
+ {[], false};
+ {[], false, Keys} ->
+ {[], false, Keys};
+ {[Event], Bool} ->
+ {Event, Bool};
+ {[Event], Bool, Keys} ->
+ {Event, Bool, Keys}
+ end.
+
+%%------------------------------------------------------------
+%% function : get_events
+%% Arguments:
+%% Returns : A list of events (possibly empty) and a boolean
+%% indicating if event found.
+%% Comments : Try to extract Max events from the database.
+%%------------------------------------------------------------
+get_events(#dbRef{orderRef = ORef, discardRef = DRef}, Max) ->
+ event_loop(ets:last(ORef), ORef, DRef, Max, [], [], true).
+
+get_events(#dbRef{orderRef = ORef, discardRef = DRef}, Max, Delete) ->
+ event_loop(ets:last(ORef), ORef, DRef, Max, [], [], Delete).
+
+event_loop('$end_of_table', _, _, _, [], _, true) ->
+ {[], false};
+event_loop('$end_of_table', _, _, _, [], [], _) ->
+ {[], false, []};
+event_loop('$end_of_table', _ORef, _, _, Accum, _Keys, true) ->
+ {lists:reverse(Accum), true};
+event_loop('$end_of_table', _ORef, _, _, Accum, Keys, _) ->
+ {lists:reverse(Accum), true, Keys};
+event_loop(_, _ORef, _, 0, [], _Keys, true) ->
+ %% Only possible if some tries to pull a sequence of 0 events.
+ %% Should we really test for this case?
+ {[], false};
+event_loop(_, _ORef, _, 0, [], Keys, _) ->
+ {[], false, Keys};
+event_loop(_, _ORef, _, 0, Accum, _Keys, true) ->
+ {lists:reverse(Accum), true};
+event_loop(_, _ORef, _, 0, Accum, Keys, _) ->
+ {lists:reverse(Accum), true, Keys};
+event_loop(Key, ORef, undefined, Left, Accum, Keys, Delete) ->
+ [{_,DL,ST,_PO,Event}]=ets:lookup(ORef, Key),
+ case check_deadline(DL) of
+ true ->
+ ets:delete(ORef, Key),
+ event_loop(ets:prev(ORef, Key), ORef, undefined,
+ Left, Accum, Keys, Delete);
+ false ->
+ case check_start_time(ST) of
+ true when Delete == true ->
+ ets:delete(ORef, Key),
+ event_loop(ets:prev(ORef, Key), ORef, undefined,
+ Left-1, [Event|Accum], Keys, Delete);
+ true ->
+ event_loop(ets:prev(ORef, Key), ORef, undefined,
+ Left-1, [Event|Accum], [{ORef, Key}|Keys], Delete);
+ false ->
+ event_loop(ets:prev(ORef, Key), ORef, undefined,
+ Left, Accum, Keys, Delete)
+ end
+ end;
+event_loop({Key1, Key2}, ORef, DRef, Left, Accum, Keys, Delete) ->
+ [{_,DL,ST,_PO,Event}]=ets:lookup(ORef, {Key1, Key2}),
+ case check_deadline(DL) of
+ true ->
+ ets:delete(ORef, {Key1, Key2}),
+ ets:delete(DRef, {Key2, Key1}),
+ event_loop(ets:prev(ORef, {Key1, Key2}), ORef, DRef,
+ Left, Accum, Keys, Delete);
+ false ->
+ case check_start_time(ST) of
+ true when Delete == true ->
+ ets:delete(ORef, {Key1, Key2}),
+ ets:delete(DRef, {Key2, Key1}),
+ event_loop(ets:prev(ORef, {Key1, Key2}), ORef, DRef,
+ Left-1, [Event|Accum], Keys, Delete);
+ true ->
+ event_loop(ets:prev(ORef, {Key1, Key2}), ORef, DRef,
+ Left-1, [Event|Accum],
+ [{ORef, {Key1, Key2}}, {DRef, {Key2, Key1}}|Keys],
+ Delete);
+ false ->
+ event_loop(ets:prev(ORef, {Key1, Key2}), ORef, DRef,
+ Left, Accum, Keys, Delete)
+ end
+ end;
+event_loop({Key1, Key2, Key3}, ORef, DRef, Left, Accum, Keys, Delete) ->
+ [{_,DL,ST,_PO,Event}]=ets:lookup(ORef, {Key1, Key2, Key3}),
+ case check_deadline(DL) of
+ true ->
+ ets:delete(ORef, {Key1, Key2, Key3}),
+ ets:delete(DRef, {Key3, Key2, Key1}),
+ event_loop(ets:prev(ORef, {Key1, Key2, Key3}), ORef, DRef,
+ Left, Accum, Keys, Delete);
+ false ->
+ case check_start_time(ST) of
+ true when Delete == true ->
+ ets:delete(ORef, {Key1, Key2, Key3}),
+ ets:delete(DRef, {Key3, Key2, Key1}),
+ event_loop(ets:prev(ORef, {Key1, Key2, Key3}), ORef, DRef,
+ Left-1, [Event|Accum], Keys, Delete);
+ true ->
+ event_loop(ets:prev(ORef, {Key1, Key2, Key3}), ORef, DRef,
+ Left-1, [Event|Accum],
+ [{ORef, {Key1, Key2, Key3}},
+ {DRef, {Key3, Key2, Key1}}|Keys], Delete);
+ false ->
+ event_loop(ets:prev(ORef, {Key1, Key2, Key3}), ORef, DRef,
+ Left, Accum, Keys, Delete)
+ end
+ end.
+
+%%------------------------------------------------------------
+%% function : delete_events
+%% Arguments: EventList - what's returned by get_event, get_events
+%% and add_and_get_event.
+%% Returns :
+%% Comment : Shall be invoked when it's safe to premanently remove
+%% the events found in the EventList.
+%%
+%%------------------------------------------------------------
+delete_events([]) ->
+ ok;
+delete_events([{DB, Key}|T]) ->
+ ets:delete(DB, Key),
+ delete_events(T).
+
+%%------------------------------------------------------------
+%% function : update
+%% Arguments:
+%% Returns :
+%% Comment : As default we shall deliver Events in Priority order.
+%% Hence, if AnyOrder set we will still deliver in
+%% Priority order.
+%%------------------------------------------------------------
+update(undefined, _QoS) ->
+ ok;
+update(DBRef, QoS) ->
+ update(DBRef, QoS, undefined, undefined).
+
+update(DBRef, QoS, LifeFilter, PrioFilter) ->
+ case updated_order(DBRef, ?not_GetOrderPolicy(QoS)) of
+ false ->
+ case updated_discard(DBRef, ?not_GetDiscardPolicy(QoS)) of
+ false ->
+ DBR2 = ?set_DefPriority(DBRef, ?not_GetPriority(QoS)),
+ DBR3 = ?set_MaxEvents(DBR2, ?not_GetMaxEventsPerConsumer(QoS)),
+ DBR4 = ?set_DefStopT(DBR3, ?not_GetTimeout(QoS)),
+ DBR5 = ?set_StartTsupport(DBR4, ?not_GetStartTimeSupported(QoS)),
+ DBR6 = ?set_StopTsupport(DBR5, ?not_GetStopTimeSupported(QoS)),
+ case ets:info(?get_OrderRef(DBR6), size) of
+ N when N =< ?get_MaxEvents(DBR6) ->
+ %% Even if the QoS MaxEvents have been changed
+ %% we don't reach the limit.
+ DBR6;
+ N ->
+ %% The QoS MaxEvents must have been decreased.
+ discard_events(DBR6, N-?get_MaxEvents(DBR6)),
+ DBR6
+ end;
+ true ->
+ destroy_discard_db(DBRef),
+ NewDBRef = create_db(QoS, ?get_GCTime(DBRef), ?get_GCLimit(DBRef),
+ ?get_TimeRef(DBRef)),
+ move_events(DBRef, NewDBRef, ets:first(?get_OrderRef(DBRef)),
+ LifeFilter, PrioFilter)
+ end;
+ true ->
+ destroy_discard_db(DBRef),
+ NewDBRef = create_db(QoS, ?get_GCTime(DBRef), ?get_GCLimit(DBRef),
+ ?get_TimeRef(DBRef)),
+ move_events(DBRef, NewDBRef, ets:first(?get_OrderRef(DBRef)),
+ LifeFilter, PrioFilter)
+ end.
+
+updated_order(#dbRef{orderPolicy = Equal}, Equal) -> false;
+updated_order(#dbRef{orderPolicy = ?not_PriorityOrder}, ?not_AnyOrder) -> false;
+updated_order(#dbRef{orderPolicy = ?not_AnyOrder}, ?not_PriorityOrder) -> false;
+updated_order(_, _) -> true.
+
+updated_discard(#dbRef{discardPolicy = Equal}, Equal) -> false;
+updated_discard(#dbRef{discardPolicy = ?not_RejectNewEvents}, ?not_AnyOrder) -> false;
+updated_discard(#dbRef{discardPolicy = ?not_AnyOrder}, ?not_RejectNewEvents) -> false;
+updated_discard(_, _) -> true.
+
+move_events(DBRef, NewDBRef, '$end_of_table', _, _) ->
+ destroy_order_db(DBRef),
+ case ets:info(?get_OrderRef(NewDBRef), size) of
+ N when N =< ?get_MaxEvents(NewDBRef) ->
+ %% Even if the QoS MaxEvents have been changed
+ %% we don't reach the limit.
+ NewDBRef;
+ N ->
+ %% The QoS MaxEvents must have been decreased.
+ discard_events(DBRef, N-?get_MaxEvents(NewDBRef)),
+ NewDBRef
+ end;
+move_events(DBRef, NewDBRef, Key, LifeFilter, PrioFilter) ->
+ [{Keys, DeadLine, StartTime, PriorityOverride, Event}] =
+ ets:lookup(?get_OrderRef(DBRef), Key),
+ case check_deadline(DeadLine) of
+ true ->
+ ok;
+ _->
+ write_event(?get_OrderP(DBRef),
+ {Keys, DeadLine, StartTime, PriorityOverride, Event},
+ DBRef, NewDBRef, Key, LifeFilter, PrioFilter)
+ end,
+ ets:delete(?get_OrderRef(DBRef), Key),
+ move_events(DBRef, NewDBRef, ets:next(?get_OrderRef(DBRef), Key),
+ LifeFilter, PrioFilter).
+
+%% We cannot use do_add_event directly since we MUST lookup the timestamp (TS).
+write_event(?not_DeadlineOrder, {{_, TS, _Prio}, DL, ST, PO, Event}, _DBRef, NewDBRef,
+ _Key, _LifeFilter, _PrioFilter) ->
+ StartT = update_starttime(NewDBRef, Event, ST),
+ %% Deadline and Priority previously extracted.
+ do_add_event(NewDBRef, Event, TS, DL, StartT, PO);
+write_event(?not_DeadlineOrder, {{_, TS}, DL, _ST, PO, Event}, _DBRef, NewDBRef,
+ _Key, _LifeFilter, PrioFilter) ->
+ %% Priority not previously extracted.
+ POverride = update_priority(NewDBRef, PrioFilter, Event, PO),
+ StartT = extract_start_time(Event, ?get_StartTsupport(NewDBRef),
+ ?get_TimeRef(NewDBRef)),
+ do_add_event(NewDBRef, Event, TS, DL, StartT, POverride);
+write_event(?not_FifoOrder, {{TS, _PorD}, DL, ST, PO, Event}, _DBRef, NewDBRef,
+ _Key, LifeFilter, PrioFilter) ->
+ %% Priority or Deadline have been extracted before but we cannot tell which.
+ POverride = update_priority(NewDBRef, PrioFilter, Event, PO),
+ DeadL = update_deadline(NewDBRef, LifeFilter, Event, TS, DL),
+ StartT = update_starttime(NewDBRef, Event, ST),
+ do_add_event(NewDBRef, Event, TS, DeadL, StartT, POverride);
+write_event(?not_FifoOrder, {TS, DL, ST, PO, Event}, _DBRef, NewDBRef,
+ _Key, LifeFilter, PrioFilter) ->
+ %% Priority and Deadline not extracetd before. Do it now.
+ POverride = update_priority(NewDBRef, PrioFilter, Event, PO),
+ DeadL = update_deadline(NewDBRef, LifeFilter, Event, TS, DL),
+ StartT = update_starttime(NewDBRef, Event, ST),
+ do_add_event(NewDBRef, Event, TS, DeadL, StartT, POverride);
+%% Order Policy must be AnyOrder or PriorityOrder.
+write_event(_, {{_Prio, TS}, DL, ST, PO, Event}, _DBRef, NewDBRef,
+ _Key, LifeFilter, _PrioFilter) ->
+ DeadL = update_deadline(NewDBRef, LifeFilter, Event, TS, DL),
+ StartT = update_starttime(NewDBRef, Event, ST),
+ do_add_event(NewDBRef, Event, TS, DeadL, StartT, PO);
+write_event(_, {{_Prio, TS, DL}, DL, ST, PO, Event}, _DBRef, NewDBRef, _Key, _, _) ->
+ %% Both Deadline and Priority have been extracetd before.
+ StartT = update_starttime(NewDBRef, Event, ST),
+ do_add_event(NewDBRef, Event, TS, DL, StartT, PO).
+
+
+%%------------------------------------------------------------
+%% function : update_priority
+%% Arguments:
+%% Returns :
+%% Comment : The purpose with this function is to avoid
+%% calling MappingFilter priority again, especially
+%% deadline again (we especially want to avoid calling
+%% since it may require intra-ORB communication.
+%% Use only when doing an update.
+%%------------------------------------------------------------
+update_priority(DBRef, PrioFilter, Event, OldPrio) when is_atom(OldPrio) ->
+ get_prio_mapping_value(DBRef, PrioFilter, Event);
+update_priority(_DBRef, _PrioFilter, _Event, OldPrio) ->
+ OldPrio.
+
+%%------------------------------------------------------------
+%% function : update_deadline
+%% Arguments:
+%% Returns :
+%% Comment : The purpose with this function is to avoid
+%% calling MappingFilter or parsing the events for
+%% deadline again (we especially want to avoid calling
+%% the MappingFilter since it may require intra-ORB
+%% communication. Use only when doing an update.
+%%------------------------------------------------------------
+update_deadline(DBRef, _LifeFilter, _Event, _TS, _OldDeadL) when
+ ?get_DiscardP(DBRef) =/= ?not_DeadlineOrder,
+ ?get_OrderP(DBRef) =/= ?not_DeadlineOrder,
+ ?is_StopTNotSupported(DBRef) ->
+ %% We do not need to extract the Deadline since it will not be used.
+ false;
+update_deadline(DBRef, LifeFilter, Event, TS, OldDeadL) when is_atom(OldDeadL) ->
+ %% We need the Deadline and it have not been extracetd before.
+ DOverride = get_life_mapping_value(DBRef, LifeFilter, Event),
+ %% We must find out when the event was delivered; setting a deadline using
+ %% a new timestamp would not be accurate since we cannot tell for how long
+ %% the event have been waiting.
+ OldNow = convert_FIFO_Key(TS),
+ extract_deadline(Event, ?get_DefStopT(DBRef), ?get_StopTsupport(DBRef),
+ ?get_TimeRef(DBRef), DOverride, OldNow);
+update_deadline(_DBRef, _LifeFilter, _Event, _TS, OldDeadL) ->
+ %% We need the Deadline and it have been extracetd before.
+ OldDeadL.
+
+%%------------------------------------------------------------
+%% function : update_starttime
+%% Arguments:
+%% Returns :
+%% Comment : The purpose with this function is to avoid
+%% parsing the events for starttime again.
+%% Use only when doing an update.
+%%------------------------------------------------------------
+update_starttime(DBRef, Event, OldStartT) when is_atom(OldStartT) ->
+ %% Probably not previously extracted; try to get it.
+ extract_start_time(Event, ?get_StartTsupport(DBRef), ?get_TimeRef(DBRef));
+update_starttime(_DBRef, _Event, OldStartT) ->
+ %% Previously extracted.
+ OldStartT.
+
+%%------------------------------------------------------------
+%% function : discard_events
+%% Arguments: DBRef
+%% N - number of events we must discard.
+%% Returns :
+%% Comment : As default we shall Reject New Events when the limit
+%% is reached. Any discard order will do the same.
+%%
+%% This function can only be used for the discard policies
+%% Fifo, Priority and Deadline. Any or RejectNewEvents
+%% will not allow events to be stored at all, i.e., no events
+%% to discard. Lifo will not be stored either since when
+%% trying to add an event it is definitely the last event in.
+%%------------------------------------------------------------
+%% Since no Discard DB must the same Order policy.
+discard_events(#dbRef{orderRef = ORef, discardRef = undefined,
+ discardPolicy = ?not_DeadlineOrder}, N) ->
+ ?debug_print("Discarding ~p events Deadline Order.",[N]),
+ index_loop_backward(ets:last(ORef), undefined, ORef, N);
+discard_events(#dbRef{orderRef = ORef, discardRef = DRef,
+ discardPolicy = ?not_DeadlineOrder}, N) ->
+ ?debug_print("Discarding ~p events Deadline Order.",[N]),
+ index_loop_backward(ets:last(DRef), DRef, ORef, N);
+%% Fifo.
+discard_events(#dbRef{orderRef = ORef, discardRef = undefined,
+ discardPolicy = ?not_FifoOrder}, N) ->
+ ?debug_print("Discarding ~p events Fifo Order.",[N]),
+ index_loop_backward(ets:last(ORef), undefined, ORef, N);
+discard_events(#dbRef{orderRef = ORef, discardRef = DRef,
+ discardPolicy = ?not_FifoOrder}, N) ->
+ ?debug_print("Discarding ~p events Fifo Order.",[N]),
+ index_loop_backward(ets:last(DRef), DRef, ORef, N);
+%% Lifo- or Priority-Order
+discard_events(#dbRef{orderRef = ORef, discardRef = undefined}, N) ->
+ ?debug_print("Discarding ~p events Lifo- or Priority-Order.",[N]),
+ index_loop_forward(ets:first(ORef), undefined, ORef, N);
+discard_events(#dbRef{orderRef = ORef, discardRef = DRef}, N) ->
+ ?debug_print("Discarding ~p events Lifo- or Priority-Order.",[N]),
+ index_loop_forward(ets:first(DRef), DRef, ORef, N).
+
+
+index_loop_forward('$end_of_table', _, _, _Left) ->
+ ok;
+index_loop_forward(_, _, _, 0) ->
+ ok;
+index_loop_forward(Key, undefined, ORef, Left) ->
+ ets:delete(ORef, Key),
+ NewKey=ets:next(ORef, Key),
+ index_loop_forward(NewKey, undefined, ORef, Left-1);
+
+index_loop_forward({Key1, Key2, Key3}, DRef, ORef, Left) ->
+ ets:delete(DRef, {Key1, Key2, Key3}),
+ ets:delete(ORef, {Key3, Key2, Key1}),
+ NewKey=ets:next(DRef, {Key1, Key2, Key3}),
+ index_loop_forward(NewKey, DRef, ORef, Left-1);
+
+index_loop_forward({Key1, Key2}, DRef, ORef, Left) ->
+ ets:delete(DRef, {Key1, Key2}),
+ ets:delete(ORef, {Key2, Key1}),
+ NewKey=ets:next(DRef, {Key1, Key2}),
+ index_loop_forward(NewKey, DRef, ORef, Left-1).
+
+index_loop_backward('$end_of_table', _, _, _) ->
+ ok;
+index_loop_backward(_, _, _, 0) ->
+ ok;
+index_loop_backward(Key, undefined, ORef, Left) ->
+ ets:delete(ORef, Key),
+ NewKey=ets:prev(ORef, Key),
+ index_loop_backward(NewKey, undefined, ORef, Left-1);
+index_loop_backward({Key1, Key2}, DRef, ORef, Left) ->
+ ets:delete(DRef, {Key1, Key2}),
+ ets:delete(ORef, {Key2, Key1}),
+ NewKey=ets:prev(DRef, {Key1, Key2}),
+ index_loop_backward(NewKey, DRef, ORef, Left-1);
+index_loop_backward({Key1, Key2, Key3}, DRef, ORef, Left) ->
+ ets:delete(DRef, {Key1, Key2, Key3}),
+ ets:delete(ORef, {Key3, Key2, Key1}),
+ NewKey=ets:prev(DRef, {Key1, Key2, Key3}),
+ index_loop_backward(NewKey, DRef, ORef, Left-1).
+
+%%------------------------------------------------------------
+%% function : add_and_get_event
+%% Arguments: DBRef and Event
+%% Returns : {[], bool()} | {Event, bool()}
+%% Comment : This function is a mixture of ad anf get events.
+%% The intended use to avoid storing an event when
+%% not necessary.
+%%------------------------------------------------------------
+add_and_get_event(DBRef, Event) ->
+ add_and_get_event(DBRef, Event, undefined, undefined, true).
+
+add_and_get_event(DBRef, Event, Delete) ->
+ add_and_get_event(DBRef, Event, undefined, undefined, Delete).
+
+add_and_get_event(DBRef, Event, LifeFilter, PrioFilter) ->
+ add_and_get_event(DBRef, Event, LifeFilter, PrioFilter, true).
+
+add_and_get_event(DBRef, Event, LifeFilter, PrioFilter, Delete) ->
+ case ets:info(?get_OrderRef(DBRef), size) of
+ 0 when ?is_StartTNotSupported(DBRef), ?is_StopTNotSupported(DBRef),
+ Delete == true ->
+ %% No stored events and no timeouts used; just return the event.
+ {Event, false};
+ 0 when ?is_StartTNotSupported(DBRef), ?is_StopTNotSupported(DBRef) ->
+ %% No stored events and no timeouts used; just return the event.
+ {Event, false, []};
+ 0 when ?is_StartTNotSupported(DBRef) ->
+ %% Only deadline supported, lookup values and cehck if ok.
+ DOverride = get_life_mapping_value(DBRef, LifeFilter, Event),
+ DL = extract_deadline(Event, ?get_DefStopT(DBRef),
+ ?get_StopTsupport(DBRef), ?get_TimeRef(DBRef),
+ DOverride),
+ case check_deadline(DL) of
+ true when Delete == true ->
+ %% Expired, just discard the event.
+ {[], false};
+ true ->
+ {[], false, []};
+ _ when Delete == true ->
+ %% Not expired, we can safely return the event.
+ {Event, false};
+ _ ->
+ %% Not expired, we can safely return the event.
+ {Event, false, []}
+ end;
+ 0 when ?is_StopTNotSupported(DBRef) ->
+ %% Only starttime allowed, test if we can deliver the event now.
+ ST = extract_start_time(Event, ?get_StartTsupport(DBRef),
+ ?get_TimeRef(DBRef)),
+ case check_start_time(ST) of
+ false when Delete == true ->
+ DOverride = get_life_mapping_value(DBRef, LifeFilter, Event),
+ POverride = get_prio_mapping_value(DBRef, PrioFilter, Event),
+ DL = extract_deadline(Event, ?get_DefStopT(DBRef),
+ ?get_StopTsupport(DBRef),
+ ?get_TimeRef(DBRef), DOverride),
+ do_add_event(DBRef, Event, create_FIFO_Key(), DL, ST, POverride),
+ {[], true};
+ false ->
+ DOverride = get_life_mapping_value(DBRef, LifeFilter, Event),
+ POverride = get_prio_mapping_value(DBRef, PrioFilter, Event),
+ DL = extract_deadline(Event, ?get_DefStopT(DBRef),
+ ?get_StopTsupport(DBRef),
+ ?get_TimeRef(DBRef), DOverride),
+ do_add_event(DBRef, Event, create_FIFO_Key(), DL, ST, POverride),
+ {[], true, []};
+ _ when Delete == true ->
+ %% Starttime ok, just return the event.
+ {Event, false};
+ _ ->
+ %% Starttime ok, just return the event.
+ {Event, false, []}
+ end;
+ _->
+ %% Event already stored, just have to accept the overhead.
+ ST = extract_start_time(Event, ?get_StartTsupport(DBRef),
+ ?get_TimeRef(DBRef)),
+ DOverride = get_life_mapping_value(DBRef, LifeFilter, Event),
+ POverride = get_prio_mapping_value(DBRef, PrioFilter, Event),
+ DL = extract_deadline(Event, ?get_DefStopT(DBRef),
+ ?get_StopTsupport(DBRef),
+ ?get_TimeRef(DBRef), DOverride),
+ do_add_event(DBRef, Event, create_FIFO_Key(), DL, ST, POverride),
+ get_event(DBRef, Delete)
+ end.
+
+%%------------------------------------------------------------
+%% function : add_event
+%% Arguments: DBRef and Event
+%% Returns : true (or whatever 'ets:insert' returns) |
+%% {'EXCEPTION', #'IMP_LIMIT'{}}
+%% Comment : As default we shall deliver Events in Priority order.
+%% Hence, if AnyOrder set we will still deliver in
+%% Priority order. But we cannot use only the Priority
+%% value since if "all" events have the same priority
+%% there is a risk that some never will be delivered if
+%% the EventDB always contain events.
+%%
+%% When discard and order policy is equal we only use one
+%% DB since all we have to do is to "read from the other
+%% end" to discard the correct event(s).
+%%
+%% In the discard DB we must also store keys necessary to
+%% lookup the event in the order DB.
+%%
+%% If event limit reached 'IMPL_LIMIT' is raised if
+%% the discard policy is RejectNewEvents or AnyOrder.
+%% Theses two policies we currently define to be equal.
+%%------------------------------------------------------------
+
+add_event(DBRef, Event) ->
+ %% Save overhead by first checking if we really need to extract
+ %% Deadline and/or Priority.
+ Deadline = get_life_mapping_value(DBRef, undefined, Event),
+ Priority = get_prio_mapping_value(DBRef, undefined, Event),
+ add_event_helper(DBRef, Event, Deadline, Priority).
+
+add_event(DBRef, Event, LifeFilter, PrioFilter) ->
+ %% Save overhead by first checking if we really need to extract
+ %% Deadline and/or Priority.
+ Deadline = get_life_mapping_value(DBRef, LifeFilter, Event),
+ Priority = get_prio_mapping_value(DBRef, PrioFilter, Event),
+ add_event_helper(DBRef, Event, Deadline, Priority).
+
+add_event_helper(DBRef, Event, DOverride, POverride) ->
+ case ets:info(?get_OrderRef(DBRef), size) of
+ N when N < ?get_MaxEvents(DBRef), N > ?get_GCLimit(DBRef) ->
+ gc_events(DBRef, low),
+ DL = extract_deadline(Event, ?get_DefStopT(DBRef),
+ ?get_StopTsupport(DBRef), ?get_TimeRef(DBRef),
+ DOverride),
+ case check_deadline(DL) of
+ true ->
+ true;
+ _ ->
+ ST = extract_start_time(Event, ?get_StartTsupport(DBRef),
+ ?get_TimeRef(DBRef)),
+ do_add_event(DBRef, Event, create_FIFO_Key(), DL, ST, POverride)
+ end;
+ N when N < ?get_MaxEvents(DBRef) ->
+ DL = extract_deadline(Event, ?get_DefStopT(DBRef),
+ ?get_StopTsupport(DBRef), ?get_TimeRef(DBRef),
+ DOverride),
+ case check_deadline(DL) of
+ true ->
+ true;
+ _ ->
+ ST = extract_start_time(Event, ?get_StartTsupport(DBRef),
+ ?get_TimeRef(DBRef)),
+ do_add_event(DBRef, Event, create_FIFO_Key(), DL, ST, POverride)
+ end;
+ _N when ?get_DiscardP(DBRef) == ?not_RejectNewEvents ->
+ gc_events(DBRef, low),
+ corba:raise(#'IMP_LIMIT'{completion_status=?COMPLETED_NO});
+ _N when ?get_DiscardP(DBRef) == ?not_AnyOrder ->
+ gc_events(DBRef, low),
+ corba:raise(#'IMP_LIMIT'{completion_status=?COMPLETED_NO});
+ _N when ?get_DiscardP(DBRef) == ?not_LifoOrder ->
+ gc_events(DBRef, low),
+ corba:raise(#'IMP_LIMIT'{completion_status=?COMPLETED_NO});
+ _N ->
+ gc_events(DBRef, low),
+ %% Other discard policy; we must first store the event
+ %% and the look up in the Discard DB which event we
+ %% should remove.
+ DL = extract_deadline(Event, ?get_DefStopT(DBRef),
+ ?get_StopTsupport(DBRef), ?get_TimeRef(DBRef),
+ DOverride),
+ case check_deadline(DL) of
+ true ->
+ true;
+ _ ->
+ ST = extract_start_time(Event, ?get_StartTsupport(DBRef),
+ ?get_TimeRef(DBRef)),
+ do_add_event(DBRef, Event, create_FIFO_Key(), DL, ST, POverride),
+ discard_events(DBRef, 1)
+ end
+ end.
+
+
+do_add_event(#dbRef{orderRef = ORef, orderPolicy = ?not_DeadlineOrder,
+ discardRef = DRef, discardPolicy = ?not_PriorityOrder,
+ defPriority = DefPrio, defStopT = _DefStopT}, Event, Key, DL, ST, PO) ->
+ Prio = extract_priority(Event, DefPrio, PO),
+ ets:insert(ORef, {{DL, Key, Prio}, DL, ST, PO, Event}),
+ ets:insert(DRef, {{Prio, Key, DL}});
+do_add_event(#dbRef{orderRef = ORef, orderPolicy = ?not_DeadlineOrder,
+ discardRef = DRef, discardPolicy = ?not_FifoOrder,
+ defStopT = _DefStopT}, Event, Key, DL, ST, PO) ->
+ ets:insert(ORef, {{DL, Key}, DL, ST, PO, Event}),
+ ets:insert(DRef, {{Key, DL}});
+do_add_event(#dbRef{orderRef = ORef, orderPolicy = ?not_DeadlineOrder,
+ discardRef = DRef, discardPolicy = ?not_LifoOrder,
+ defStopT = _DefStopT}, Event, Key, DL, ST, PO) ->
+ ets:insert(ORef, {{DL, Key}, DL, ST, PO, Event}),
+ ets:insert(DRef, {{Key, DL}});
+%% Either the same (DeadlineOrder), RejectNewEvents or AnyOrder. No need
+%% to store anything in the discard policy, i.e., if the same we'll just
+%% read "from the other end" and AnyOrder and RejectNewEvents is equal.
+do_add_event(#dbRef{orderRef = ORef, orderPolicy = ?not_DeadlineOrder,
+ defStopT = _DefStopT}, Event, Key, DL, ST, PO) ->
+ ets:insert(ORef, {{DL, Key}, DL, ST, PO, Event});
+
+
+do_add_event(#dbRef{orderRef = ORef, orderPolicy = ?not_FifoOrder,
+ discardRef = DRef, discardPolicy = ?not_DeadlineOrder,
+ defStopT = _DefStopT}, Event, Key, DL, ST, PO) ->
+ ets:insert(ORef, {{Key, DL}, DL, ST, PO, Event}),
+ ets:insert(DRef, {{DL, Key}});
+do_add_event(#dbRef{orderRef = ORef, orderPolicy = ?not_FifoOrder,
+ discardRef = DRef, discardPolicy = ?not_PriorityOrder,
+ defPriority = DefPrio}, Event, Key, DL, ST, PO) ->
+ Prio = extract_priority(Event, DefPrio, PO),
+ ets:insert(ORef, {{Key, Prio}, DL, ST, PO, Event}),
+ ets:insert(DRef, {{Prio, Key}});
+%% The discard policy must RejectNewEvents, AnyOrder, Fifo or Lifo order.
+do_add_event(#dbRef{orderRef = ORef, orderPolicy = ?not_FifoOrder,
+ discardRef = _DRef}, Event, Key, DL, ST, PO) ->
+ ets:insert(ORef, {Key, DL, ST, PO, Event});
+
+%% Order Policy must be AnyOrder or PriorityOrder.
+do_add_event(#dbRef{orderRef = ORef,
+ discardRef = DRef, discardPolicy = ?not_DeadlineOrder,
+ defPriority = DefPrio, defStopT = _DefStopT}, Event, Key, DL, ST, PO) ->
+ Prio = extract_priority(Event, DefPrio, PO),
+ ets:insert(ORef, {{Prio, Key, DL}, DL, ST, PO, Event}),
+ ets:insert(DRef, {{DL, Key, Prio}});
+do_add_event(#dbRef{orderRef = ORef,
+ discardRef = DRef, discardPolicy = ?not_FifoOrder,
+ defPriority = DefPrio}, Event, Key, DL, ST, PO) ->
+ Prio = extract_priority(Event, DefPrio, PO),
+ ets:insert(ORef, {{Prio, Key}, DL, ST, PO, Event}),
+ ets:insert(DRef, {{Key, Prio}});
+
+do_add_event(#dbRef{orderRef = ORef,
+ discardRef = DRef, discardPolicy = ?not_LifoOrder,
+ defPriority = DefPrio}, Event, Key, DL, ST, PO) ->
+ Prio = extract_priority(Event, DefPrio, PO),
+ ets:insert(ORef, {{Prio, Key}, DL, ST, PO, Event}),
+ ets:insert(DRef, {{Key, Prio}});
+
+%% Order Policy must be AnyOrder or PriorityOrder and Discard Policy must be
+%% AnyOrder or RejectNewEvents
+do_add_event(#dbRef{orderRef = ORef, defPriority = DefPrio}, Event, Key, DL, ST, PO) ->
+ Prio = extract_priority(Event, DefPrio, PO),
+ ets:insert(ORef, {{Prio, Key}, DL, ST, PO, Event}).
+
+%%------------------------------------------------------------
+%% function : destroy_db
+%% Arguments: A DB reference
+%% Returns :
+%%------------------------------------------------------------
+destroy_db(#dbRef{orderRef = ORef, discardRef = undefined}) ->
+ ets:delete(ORef);
+destroy_db(#dbRef{orderRef = ORef, discardRef = DRef}) ->
+ ets:delete(ORef),
+ ets:delete(DRef).
+
+%%------------------------------------------------------------
+%% function : destroy_discard_db
+%% Arguments: A DB reference
+%% Returns :
+%%------------------------------------------------------------
+destroy_discard_db(#dbRef{discardRef = undefined}) ->
+ ok;
+destroy_discard_db(#dbRef{discardRef = DRef}) ->
+ ets:delete(DRef).
+
+%%------------------------------------------------------------
+%% function : destroy_order_db
+%% Arguments: A DB reference
+%% Returns :
+%%------------------------------------------------------------
+destroy_order_db(#dbRef{orderRef = ORef}) ->
+ ets:delete(ORef).
+
+%%------------------------------------------------------------
+%% function : create_db
+%% Arguments: QoS (local representation).
+%% Returns : A DB reference
+%%------------------------------------------------------------
+create_db(QoS, GCTime, GCLimit, TimeRef) ->
+ DiscardRef =
+ case {?not_GetDiscardPolicy(QoS), ?not_GetOrderPolicy(QoS)} of
+ {Equal, Equal} ->
+ undefined;
+ {?not_PriorityOrder, ?not_AnyOrder} ->
+ %% NOTE: Any- and Priority-Order delivery policy is equal.
+ undefined;
+ {?not_RejectNewEvents, _} ->
+ undefined;
+ {?not_AnyOrder, _} ->
+ undefined;
+ {?not_LifoOrder, ?not_FifoOrder} ->
+ undefined;
+ _ ->
+ ets:new(oe_ets, [set, public, ordered_set])
+ end,
+ DBRef = ?CreateRef(ets:new(oe_ets, [set, public, ordered_set]),
+ DiscardRef,
+ ?not_GetOrderPolicy(QoS), ?not_GetDiscardPolicy(QoS),
+ ?not_GetPriority(QoS), ?not_GetMaxEventsPerConsumer(QoS),
+ ?not_GetTimeout(QoS), ?not_GetStartTimeSupported(QoS),
+ ?not_GetStopTimeSupported(QoS), GCTime, GCLimit, TimeRef),
+ if
+ ?is_TimeoutNotUsed(DBRef), ?is_StopTNotSupported(DBRef) ->
+ ok;
+ true ->
+ {M,S,U} = now(),
+ put(oe_GC_timestamp, {M,S+GCTime,U})
+ end,
+ DBRef.
+
+%%------------------------------------------------------------
+%% function : get_prio_mapping_value
+%% Arguments: A MappingFilter reference | undefined
+%% Event (Any or Structured)
+%% Returns : undefined | Data
+%%------------------------------------------------------------
+get_prio_mapping_value(DBRef, _, _) when ?get_DiscardP(DBRef) =/= ?not_PriorityOrder,
+ ?get_OrderP(DBRef) =/= ?not_AnyOrder,
+ ?get_OrderP(DBRef) =/= ?not_PriorityOrder ->
+ false;
+get_prio_mapping_value(_, undefined, _) ->
+ undefined;
+get_prio_mapping_value(_, MFilter, Event) when is_record(Event, 'any') ->
+ case catch 'CosNotifyFilter_MappingFilter':match(MFilter, Event) of
+ {false, DefVal} when is_record(DefVal, 'any') ->
+ any:get_value(DefVal);
+ {true, Matched} when is_record(Matched, 'any') ->
+ any:get_value(Matched);
+ _ ->
+ undefined
+ end;
+get_prio_mapping_value(_, MFilter, Event) ->
+ case catch 'CosNotifyFilter_MappingFilter':match_structured(MFilter, Event) of
+ {false, DefVal} when is_record(DefVal, 'any') ->
+ any:get_value(DefVal);
+ {true, Matched} when is_record(Matched, 'any') ->
+ any:get_value(Matched);
+ _ ->
+ undefined
+ end.
+
+%%------------------------------------------------------------
+%% function : get_life_mapping_value
+%% Arguments: A MappingFilter reference | undefined
+%% Event (Any or Structured)
+%% Returns : undefined | Data
+%%------------------------------------------------------------
+get_life_mapping_value(DBRef, _, _) when ?get_DiscardP(DBRef) =/= ?not_DeadlineOrder,
+ ?get_OrderP(DBRef) =/= ?not_DeadlineOrder,
+ ?is_StopTNotSupported(DBRef) ->
+ false;
+get_life_mapping_value(_, undefined, _) ->
+ undefined;
+get_life_mapping_value(_, MFilter, Event) when is_record(Event, 'any') ->
+ case catch 'CosNotifyFilter_MappingFilter':match(MFilter, Event) of
+ {false, DefVal} when is_record(DefVal, 'any') ->
+ any:get_value(DefVal);
+ {true, Matched} when is_record(Matched, 'any') ->
+ any:get_value(Matched);
+ _ ->
+ undefined
+ end;
+get_life_mapping_value(_, MFilter, Event) ->
+ case catch 'CosNotifyFilter_MappingFilter':match_structured(MFilter, Event) of
+ {false, DefVal} when is_record(DefVal, 'any') ->
+ any:get_value(DefVal);
+ {true, Matched} when is_record(Matched, 'any') ->
+ any:get_value(Matched);
+ _ ->
+ undefined
+ end.
+
+%%------------------------------------------------------------
+%% function : validate_event
+%% Arguments: Subscribe data
+%% A sequence of Events, 'structured' or an 'any' record
+%% A list of filter references
+%% Status, i.e., do we have to filter the events or just check subscr.
+%% Returns : A tuple of two lists; list1 the events that passed
+%% and list2 the events that didn't pass.
+%%------------------------------------------------------------
+validate_event(true, Events, Filters, _, 'MATCH') ->
+ filter_events(Events, Filters, false);
+validate_event(true, Events, _Filters, _, _) ->
+ {Events, []};
+validate_event({_Which, _WC}, Event, Filters, _, 'MATCH') when is_record(Event, any) ->
+ filter_events(Event, Filters, false);
+validate_event({_Which, _WC}, Event, _Filters, _, _) when is_record(Event, any) ->
+ {Event, []};
+validate_event({Which, WC}, Events, Filters, DBRef, 'MATCH') ->
+ Passed=validate_event2(DBRef, Events, Which, WC, []),
+ filter_events(Passed, Filters, true);
+validate_event({Which, WC}, Events, _Filters, DBRef, _) ->
+ Passed=validate_event2(DBRef, Events, Which, WC, []),
+ {lists:reverse(Passed), []}.
+
+validate_event2(_, [], _, _, []) ->
+ [];
+validate_event2(_, [], _, _, Acc) ->
+ Acc;
+validate_event2(DBRef, [Event|T], Which, WC, Acc) ->
+ ET = ((Event#'CosNotification_StructuredEvent'.header)
+ #'CosNotification_EventHeader'.fixed_header)
+ #'CosNotification_FixedEventHeader'.event_type,
+ CheckList =
+ case Which of
+ both ->
+ [ET];
+ domain ->
+ [ET,
+ ET#'CosNotification_EventType'{type_name=""},
+ ET#'CosNotification_EventType'{type_name="*"}];
+ type ->
+ [ET,
+ ET#'CosNotification_EventType'{domain_name=""},
+ ET#'CosNotification_EventType'{domain_name="*"}];
+ _ ->
+ [ET,
+ ET#'CosNotification_EventType'{type_name=""},
+ ET#'CosNotification_EventType'{type_name="*"},
+ ET#'CosNotification_EventType'{domain_name=""},
+ ET#'CosNotification_EventType'{domain_name="*"}]
+ end,
+ case check_subscription(DBRef, CheckList) of
+ true ->
+ validate_event2(DBRef, T, Which, WC, [Event|Acc]);
+ _->
+ case catch cosNotification_Filter:match_types(
+ ET#'CosNotification_EventType'.domain_name,
+ ET#'CosNotification_EventType'.type_name,
+ WC) of
+ true ->
+ validate_event2(DBRef, T, Which, WC, [Event|Acc]);
+ _->
+ validate_event2(DBRef, T, Which, WC, Acc)
+ end
+ end.
+
+check_subscription(_, []) ->
+ false;
+check_subscription(DBRef, [H|T]) ->
+ case ets:lookup(DBRef, H) of
+ [] ->
+ check_subscription(DBRef, T);
+ _ ->
+ true
+ end.
+
+
+%%------------------------------------------------------------
+%% function : filter_events
+%% Arguments: A sequence of structured Events or #any
+%% Returns : A tuple of two lists; list1 the events that passed
+%% and list2 the events that didn't pass.
+%%------------------------------------------------------------
+
+filter_events(Events, []) ->
+ {Events, []};
+filter_events(Events, Filters) ->
+ filter_events(Events, Filters, [], [], false).
+
+filter_events(Events, [], false) ->
+ {Events, []};
+filter_events(Events, [], _) ->
+ {lists:reverse(Events), []};
+filter_events(Events, Filters, Reversed) ->
+ filter_events(Events, Filters, [], [], Reversed).
+
+filter_events([], _, AccPassed, AccFailed, false) ->
+ {lists:reverse(AccPassed), lists:reverse(AccFailed)};
+filter_events([], _, AccPassed, AccFailed, _) ->
+ {AccPassed, AccFailed};
+filter_events([H|T], Filters, AccPassed, AccFailed, Reversed) ->
+ case call_filters(Filters, H) of
+ true ->
+ filter_events(T, Filters, [H|AccPassed], AccFailed, Reversed);
+ _ ->
+ filter_events(T, Filters, AccPassed, [H|AccFailed], Reversed)
+ end;
+filter_events(Any, Filters, _AccPassed, _AccFailed, _Reversed) ->
+ case call_filters(Filters, Any) of
+ true ->
+ {Any, []};
+ _ ->
+ {[], Any}
+ end.
+
+call_filters([], _) ->
+ false;
+call_filters([{_,H}|T], Event) when is_record(Event, any) ->
+ case catch 'CosNotifyFilter_Filter':match(H, Event) of
+ true ->
+ true;
+ _->
+ call_filters(T, Event)
+ end;
+call_filters([{_,H}|T], Event) when ?not_isConvertedAny(Event) ->
+ case catch 'CosNotifyFilter_Filter':match(H,
+ Event#'CosNotification_StructuredEvent'.remainder_of_body) of
+ true ->
+ true;
+ _->
+ call_filters(T, Event)
+ end;
+call_filters([{_,H}|T], Event) ->
+ case catch 'CosNotifyFilter_Filter':match_structured(H, Event) of
+ true ->
+ true;
+ _->
+ call_filters(T, Event)
+ end.
+
+
+%%--------------- END OF MODULE ------------------------------