%%%------------------------------------------------------------------- %%% File : gcpFlowControl.erl %%% Author : EAB/UPD/AV %%% Description : Implements overload protection. %%%------------------------------------------------------------------- -module(gcpFlowControl). -id('24/190 55-CNA 113 033 Ux'). -vsn('/main/R1A/14'). -date('2005-05-04'). -author('uabasve'). %%% ---------------------------------------------------------- %%% %CCaseTemplateFile: module.erl % %%% %CCaseTemplateId: 16/002 01-FEA 202 714 Ux, Rev: /main/4 % %%% %%% Copyright (C) 2001-2005 by Ericsson Telecom AB %%% SE-126 25 STOCKHOLM %%% SWEDEN, tel int + 46 8 719 0000 %%% %%% The program may be used and/or copied only with the written %%% permission from Ericsson Telecom AB, or in accordance with %%% the terms and conditions stipulated in the agreement/contract %%% under which the program has been supplied. %%% %%% All rights reserved %%% %%% %%% ---------------------------------------------------------- %%% #1. REVISION LOG %%% ---------------------------------------------------------- %%% Rev Date Name What %%% -------- -------- -------- ------------------------ %%% R1A/1-2 05-02-07 ejojmjn Copied from EAS R7A/11. %%% R1A/3-14 05-03-14 uabasve Clean. %%%-------------------------------------------------------------------- -include_lib("megaco/include/megaco.hrl"). -include_lib("megaco/include/megaco_message_v1.hrl"). -include("gcp.hrl"). -export([send_request/4, %% user send from gcpInterface receive_reply/2, %% from callback in gcpTransaction init_ets_tables/1, init_data/2]). -define(PRIO_INFINITY, 16). -define(MIN_WINDOW, 10). -define(MAX_WINDOW, 100). -define(BUCKET_MAX, 100). -define(BUCKET_THRESH_HIGH, 80). -define(BUCKET_THRESH_LOW, 20). -define(ALLOW_TIMEOUT, 1000). %% Holds counters for flow control in GCP -record(gcpFlowControlTable, {key, window = 50, available = 50, bucket = 0, q = 0, sent = 0, %% Counts all attempts rejectable = 0, %% Counts rejectable attempts t95, errors = 0, rejects = 0, replies = 0}). -record(gcpFlowControlBitmap, {key, count = 0}). %%==================================================================== %% External functions %%==================================================================== %%-------------------------------------------------------------------- %% Function: send_request/4 %% %% Output: ok | {error, Reason} %%-------------------------------------------------------------------- send_request(ActiveLink, TimerOptions, ActionRequests, UserData) -> #gcpActiveLinkTable{key = Key, conn_handle = ConnHandle} = ActiveLink, Prio = prio(ActionRequests), incr(Key, sent), case allow(Key, Prio) of {true, Timestamp} -> grant_request(user_data(ConnHandle), Key, Prio, Timestamp, ConnHandle, TimerOptions, ActionRequests, UserData); false -> {error, rejected} end. %%-------------------------------------------------------------------- %% Function: receive_reply/2 %% Description: %%-------------------------------------------------------------------- receive_reply(Key, Timestamp) -> incr(Key, available), incr(Key, replies), release(Key), report_time(Key, Timestamp). %%-------------------------------------------------------------------- %% Func: init_ets_tables/1 %% %% Returns: ok %%-------------------------------------------------------------------- init_ets_tables(Role) -> create_ets(Role, gcpFlowControlTable, #gcpFlowControlTable.key), create_ets(Role, gcpFlowControlBitmap, #gcpFlowControlBitmap.key), ok. create_ets(Role, Table, Pos) when integer(Pos) -> create_ets(Role, Table, [named_table, ordered_set, public, {keypos, Pos}]); create_ets(test, Table, ArgList) -> ets:new(Table, ArgList); create_ets(Role, Table, ArgList) -> case ets:info(Table) of undefined -> sysCmd:ets_new(Table, ArgList); _ when Role == ch -> sysCmd:inherit_tables([Table]); _ when Role == om -> ok end. %%-------------------------------------------------------------------- %% Func: init_data/2 %%-------------------------------------------------------------------- init_data(Key, T95) -> ets:insert(gcpFlowControlTable, #gcpFlowControlTable{key = Key, t95 = T95}). %%-------------------------------------------------------------------- %%% Internal functions %%-------------------------------------------------------------------- %%% ---------------------------------------------------------- %%% incr %%% ---------------------------------------------------------- cntr(Key, Field) -> incr(Key, Field, 0). incr(Key, Field) -> incr(Key, Field, 1). -define(INCR(Field), incr(Key, Field, X) -> upd_c(Key, {#gcpFlowControlTable.Field, X})). ?INCR(sent); ?INCR(replies); ?INCR(q); ?INCR(t95); ?INCR(errors); ?INCR(rejects); ?INCR(rejectable); ?INCR(window); ?INCR(available); incr(Key, bucket, X)-> upd_c(Key, {#gcpFlowControlTable.bucket, X, ?BUCKET_MAX, ?BUCKET_MAX}). upd_c(Key, N) -> ets:update_counter(gcpFlowControlTable, Key, N). %%% ---------------------------------------------------------- %%% decr %%% %%% Beware that decr is implemented as incr, care has to be taken %%% not to bungle things when max/min values are used. %%% ---------------------------------------------------------- decr(Key, available, X) -> upd_c(Key, {#gcpFlowControlTable.available, -X}); decr(Key, window, X) -> upd_c(Key, {#gcpFlowControlTable.window, -X}); decr(Key, bucket, X) -> upd_c(Key, {#gcpFlowControlTable.bucket, -X, 0, 0}). decr(Key, Field) -> decr(Key, Field, 1). %%% ---------------------------------------------------------- %%% allow %%% ---------------------------------------------------------- allow(Key, ?PRIO_INFINITY) -> decr(Key, available), {true, now()}; allow(Key, Prio) -> incr(Key, rejectable), case decr(Key, available) of N when N > 0 -> {true, no_stamp}; _ -> %% We did not send it, therefore incr available again incr(Key, available), queue(Key, Prio) end. %%% ---------------------------------------------------------- %%% queue %%% ---------------------------------------------------------- queue(Key, Prio) -> incr(Key, q), T = {Key, Prio, now(), self()}, ets:insert(gcpFlowControlBitmap, #gcpFlowControlBitmap{key = T}), wait(T). %%% ---------------------------------------------------------- %%% wait %%% ---------------------------------------------------------- wait({Key, _Prio, _When, _Self} = T) -> receive allow -> ets:delete(gcpFlowControlBitmap, T), decr(Key, available), {true, no_stamp} after ?ALLOW_TIMEOUT -> timeout(T), adjust_window(Key), incr(Key, rejects), false end. timeout(T) -> case ets:update_counter(gcpFlowControlBitmap, T, 1) of 1 -> %% Got the lock: no one has released Key and sent 'allow'. ets:delete(gcpFlowControlBitmap, T), ok; _ -> %% A releasing process got the lock: 'allow' has been %% sent. Try to remove the message before proceeding. %% (This is to keep mdisp from complaining apparently.) ets:delete(gcpFlowControlBitmap, T), receive allow -> ok after ?ALLOW_TIMEOUT -> io:format("~p: errant allow: ~p~n", [?MODULE, T]) end end. %% Now, if we reject and our general response time is low %% (i.e. low bucket) then we increase the window size. adjust_window(Key) -> adjust_window(Key, cntr(Key, bucket) < ?BUCKET_THRESH_LOW andalso cntr(Key, window) < ?MAX_WINDOW). adjust_window(Key, true) -> incr(Key, window), incr(Key, available), incr(Key, bucket, 20); adjust_window(_, false) -> ok. %%-------------------------------------------------------------------- %% Func: report_time/2 %%-------------------------------------------------------------------- report_time(_, no_stamp) -> ok; report_time(Key, {MS, S, Ms})-> {MegaSecs, Secs, MicroSecs} = now(), p(Key, MicroSecs - Ms + 1000000*(Secs - S + 1000000*(MegaSecs - MS)), cntr(Key, t95)). %%% ---------------------------------------------------------- %%% p %%% ---------------------------------------------------------- p(Key, Time, T95) when Time =< T95 -> decr(Key, bucket); p(Key, _Time, _T95) -> %% If we have a long response time, then increase the leaky %% bucket. If the bucket is over the high watermark and the window %% is not already at its minimum size, then decrease the window %% and available. case {cntr(Key, window), incr(Key, bucket, 20)} of {Window, Bucket} when Window > ?MIN_WINDOW, Bucket > ?BUCKET_THRESH_HIGH -> decr(Key, window), decr(Key, available); _ -> ok end. %%% ---------------------------------------------------------- %%% release %%% ---------------------------------------------------------- release(Key) -> %% The choice of the key below will cause ets:prev/2 to return %% the key with the highest priority which was queued most %% recently. This relies on the fact that integers sort before %% atoms, the atom 'prio' in this case. The atoms 'queued' and %% 'pid' are of no significance. release(Key, {Key, prio, queued, pid}). %% This isn't a (FIFO) queue within each priority, but a (LIFO) stack. release(Key, T) -> release(Key, cntr(Key, available), ets:prev(gcpFlowControlBitmap, T)). %% Note that only keys on the same Key are matched. release(Key, N, {Key, _Prio, _When, Pid} = T) when N > 0 -> case catch ets:update_counter(gcpFlowControlBitmap, T, 1) of 1 -> Pid ! allow; _ -> %% Another process has released this key. release(Key, T) end; release(_, _, _)-> ok. %%% ---------------------------------------------------------- %%% user_data %%% ---------------------------------------------------------- user_data(ConnHandle) -> case catch megaco:conn_info(ConnHandle, reply_data) of {'EXIT', _Reason} -> false; Rec -> {value, Rec} end. %%% ---------------------------------------------------------- %%% grant_request %%% ---------------------------------------------------------- grant_request({value, Rec}, Key, Prio, Time, ConnHandle, Options, ActionRequests, UserData) -> ReplyData = Rec#gcpReplyData{user_data = UserData, prio = Prio, timestamp = Time}, cast_rc(megaco:cast(ConnHandle, ActionRequests, [{reply_data, ReplyData} | Options]), Key, ActionRequests); grant_request(false, Key, _, _, _, _, _, _) -> incr(Key, available), {error, reply_data}. cast_rc(ok = Ok, _, _) -> Ok; cast_rc({error, Reason}, Key, ActionRequests) -> incr(Key, available), gcpLib:error_report(?MODULE, send_request, [ActionRequests], "send failed", Reason), {error, {encode, Reason}}. %%-------------------------------------------------------------------- %% Func: prio/1 %% Returns: The priority of the request %%-------------------------------------------------------------------- prio([ActionRequest | _]) -> #'ActionRequest'{contextId = ContextId, contextRequest = ContextRequest} = ActionRequest, prio(ContextId, ContextRequest). prio(?megaco_choose_context_id, #'ContextRequest'{priority = Prio}) when integer(Prio) -> Prio; prio(_, _) -> ?PRIO_INFINITY.