diff options
Diffstat (limited to 'lib/dialyzer/test/user_tests_SUITE_data/src/gcpFlowControl.erl')
-rw-r--r-- | lib/dialyzer/test/user_tests_SUITE_data/src/gcpFlowControl.erl | 397 |
1 files changed, 397 insertions, 0 deletions
diff --git a/lib/dialyzer/test/user_tests_SUITE_data/src/gcpFlowControl.erl b/lib/dialyzer/test/user_tests_SUITE_data/src/gcpFlowControl.erl new file mode 100644 index 0000000000..1653220352 --- /dev/null +++ b/lib/dialyzer/test/user_tests_SUITE_data/src/gcpFlowControl.erl @@ -0,0 +1,397 @@ +%%%------------------------------------------------------------------- +%%% File : gcpFlowControl.erl +%%% Author : EAB/UPD/AV +%%% Description : Implements overload protection. +%%%------------------------------------------------------------------- +-module(gcpFlowControl). +-id('24/190 55-CNA 113 033 Ux'). +-vsn('/main/R1A/14'). +-date('2005-05-04'). +-author('uabasve'). +%%% ---------------------------------------------------------- +%%% %CCaseTemplateFile: module.erl % +%%% %CCaseTemplateId: 16/002 01-FEA 202 714 Ux, Rev: /main/4 % +%%% +%%% Copyright (C) 2001-2005 by Ericsson Telecom AB +%%% SE-126 25 STOCKHOLM +%%% SWEDEN, tel int + 46 8 719 0000 +%%% +%%% The program may be used and/or copied only with the written +%%% permission from Ericsson Telecom AB, or in accordance with +%%% the terms and conditions stipulated in the agreement/contract +%%% under which the program has been supplied. +%%% +%%% All rights reserved +%%% +%%% +%%% ---------------------------------------------------------- +%%% #1. REVISION LOG +%%% ---------------------------------------------------------- +%%% Rev Date Name What +%%% -------- -------- -------- ------------------------ +%%% R1A/1-2 05-02-07 ejojmjn Copied from EAS R7A/11. +%%% R1A/3-14 05-03-14 uabasve Clean. +%%%-------------------------------------------------------------------- + +-include_lib("megaco/include/megaco.hrl"). +-include_lib("megaco/include/megaco_message_v1.hrl"). +-include("gcp.hrl"). + +-export([send_request/4, %% user send from gcpInterface + receive_reply/2, %% from callback in gcpTransaction + init_ets_tables/1, + init_data/2]). + +-define(PRIO_INFINITY, 16). +-define(MIN_WINDOW, 10). +-define(MAX_WINDOW, 100). + +-define(BUCKET_MAX, 100). +-define(BUCKET_THRESH_HIGH, 80). +-define(BUCKET_THRESH_LOW, 20). + +-define(ALLOW_TIMEOUT, 1000). + +%% Holds counters for flow control in GCP +-record(gcpFlowControlTable, + {key, + window = 50, + available = 50, + bucket = 0, + q = 0, + sent = 0, %% Counts all attempts + rejectable = 0, %% Counts rejectable attempts + t95, + errors = 0, + rejects = 0, + replies = 0}). + +-record(gcpFlowControlBitmap, + {key, + count = 0}). + +%%==================================================================== +%% External functions +%%==================================================================== + +%%-------------------------------------------------------------------- +%% Function: send_request/4 +%% +%% Output: ok | {error, Reason} +%%-------------------------------------------------------------------- + +send_request(ActiveLink, TimerOptions, ActionRequests, UserData) -> + #gcpActiveLinkTable{key = Key, + conn_handle = ConnHandle} + = ActiveLink, + Prio = prio(ActionRequests), + incr(Key, sent), + case allow(Key, Prio) of + {true, Timestamp} -> + grant_request(user_data(ConnHandle), + Key, + Prio, + Timestamp, + ConnHandle, + TimerOptions, + ActionRequests, + UserData); + false -> + {error, rejected} + end. + +%%-------------------------------------------------------------------- +%% Function: receive_reply/2 +%% Description: +%%-------------------------------------------------------------------- + +receive_reply(Key, Timestamp) -> + incr(Key, available), + incr(Key, replies), + release(Key), + report_time(Key, Timestamp). + +%%-------------------------------------------------------------------- +%% Func: init_ets_tables/1 +%% +%% Returns: ok +%%-------------------------------------------------------------------- + +init_ets_tables(Role) -> + create_ets(Role, gcpFlowControlTable, #gcpFlowControlTable.key), + create_ets(Role, gcpFlowControlBitmap, #gcpFlowControlBitmap.key), + ok. + +create_ets(Role, Table, Pos) when integer(Pos) -> + create_ets(Role, + Table, + [named_table, ordered_set, public, {keypos, Pos}]); + +create_ets(test, Table, ArgList) -> + ets:new(Table, ArgList); +create_ets(Role, Table, ArgList) -> + case ets:info(Table) of + undefined -> + sysCmd:ets_new(Table, ArgList); + _ when Role == ch -> + sysCmd:inherit_tables([Table]); + _ when Role == om -> + ok + end. + +%%-------------------------------------------------------------------- +%% Func: init_data/2 +%%-------------------------------------------------------------------- + +init_data(Key, T95) -> + ets:insert(gcpFlowControlTable, #gcpFlowControlTable{key = Key, + t95 = T95}). + +%%-------------------------------------------------------------------- +%%% Internal functions +%%-------------------------------------------------------------------- + +%%% ---------------------------------------------------------- +%%% incr +%%% ---------------------------------------------------------- + +cntr(Key, Field) -> + incr(Key, Field, 0). + +incr(Key, Field) -> + incr(Key, Field, 1). + +-define(INCR(Field), + incr(Key, Field, X) -> upd_c(Key, {#gcpFlowControlTable.Field, X})). + +?INCR(sent); +?INCR(replies); +?INCR(q); +?INCR(t95); +?INCR(errors); +?INCR(rejects); +?INCR(rejectable); +?INCR(window); +?INCR(available); + +incr(Key, bucket, X)-> + upd_c(Key, {#gcpFlowControlTable.bucket, X, ?BUCKET_MAX, ?BUCKET_MAX}). + +upd_c(Key, N) -> + ets:update_counter(gcpFlowControlTable, Key, N). + +%%% ---------------------------------------------------------- +%%% decr +%%% +%%% Beware that decr is implemented as incr, care has to be taken +%%% not to bungle things when max/min values are used. +%%% ---------------------------------------------------------- + +decr(Key, available, X) -> + upd_c(Key, {#gcpFlowControlTable.available, -X}); +decr(Key, window, X) -> + upd_c(Key, {#gcpFlowControlTable.window, -X}); +decr(Key, bucket, X) -> + upd_c(Key, {#gcpFlowControlTable.bucket, -X, 0, 0}). + +decr(Key, Field) -> + decr(Key, Field, 1). + +%%% ---------------------------------------------------------- +%%% allow +%%% ---------------------------------------------------------- + +allow(Key, ?PRIO_INFINITY) -> + decr(Key, available), + {true, now()}; + +allow(Key, Prio) -> + incr(Key, rejectable), + case decr(Key, available) of + N when N > 0 -> + {true, no_stamp}; + _ -> + %% We did not send it, therefore incr available again + incr(Key, available), + queue(Key, Prio) + end. + +%%% ---------------------------------------------------------- +%%% queue +%%% ---------------------------------------------------------- + +queue(Key, Prio) -> + incr(Key, q), + T = {Key, Prio, now(), self()}, + ets:insert(gcpFlowControlBitmap, #gcpFlowControlBitmap{key = T}), + wait(T). + +%%% ---------------------------------------------------------- +%%% wait +%%% ---------------------------------------------------------- + +wait({Key, _Prio, _When, _Self} = T) -> + receive + allow -> + ets:delete(gcpFlowControlBitmap, T), + decr(Key, available), + {true, no_stamp} + after ?ALLOW_TIMEOUT -> + timeout(T), + adjust_window(Key), + incr(Key, rejects), + false + end. + +timeout(T) -> + case ets:update_counter(gcpFlowControlBitmap, T, 1) of + 1 -> + %% Got the lock: no one has released Key and sent 'allow'. + ets:delete(gcpFlowControlBitmap, T), + ok; + _ -> + %% A releasing process got the lock: 'allow' has been + %% sent. Try to remove the message before proceeding. + %% (This is to keep mdisp from complaining apparently.) + ets:delete(gcpFlowControlBitmap, T), + receive + allow -> + ok + after ?ALLOW_TIMEOUT -> + io:format("~p: errant allow: ~p~n", [?MODULE, T]) + end + end. + +%% Now, if we reject and our general response time is low +%% (i.e. low bucket) then we increase the window size. +adjust_window(Key) -> + adjust_window(Key, + cntr(Key, bucket) < ?BUCKET_THRESH_LOW + andalso cntr(Key, window) < ?MAX_WINDOW). + +adjust_window(Key, true) -> + incr(Key, window), + incr(Key, available), + incr(Key, bucket, 20); +adjust_window(_, false) -> + ok. + +%%-------------------------------------------------------------------- +%% Func: report_time/2 +%%-------------------------------------------------------------------- + +report_time(_, no_stamp) -> + ok; +report_time(Key, {MS, S, Ms})-> + {MegaSecs, Secs, MicroSecs} = now(), + p(Key, + MicroSecs - Ms + 1000000*(Secs - S + 1000000*(MegaSecs - MS)), + cntr(Key, t95)). + +%%% ---------------------------------------------------------- +%%% p +%%% ---------------------------------------------------------- + +p(Key, Time, T95) when Time =< T95 -> + decr(Key, bucket); +p(Key, _Time, _T95) -> + %% If we have a long response time, then increase the leaky + %% bucket. If the bucket is over the high watermark and the window + %% is not already at its minimum size, then decrease the window + %% and available. + case {cntr(Key, window), incr(Key, bucket, 20)} of + {Window, Bucket} when Window > ?MIN_WINDOW, + Bucket > ?BUCKET_THRESH_HIGH -> + decr(Key, window), + decr(Key, available); + _ -> + ok + end. + +%%% ---------------------------------------------------------- +%%% release +%%% ---------------------------------------------------------- + +release(Key) -> + %% The choice of the key below will cause ets:prev/2 to return + %% the key with the highest priority which was queued most + %% recently. This relies on the fact that integers sort before + %% atoms, the atom 'prio' in this case. The atoms 'queued' and + %% 'pid' are of no significance. + release(Key, {Key, prio, queued, pid}). + +%% This isn't a (FIFO) queue within each priority, but a (LIFO) stack. + +release(Key, T) -> + release(Key, cntr(Key, available), ets:prev(gcpFlowControlBitmap, T)). + +%% Note that only keys on the same Key are matched. +release(Key, N, {Key, _Prio, _When, Pid} = T) when N > 0 -> + case catch ets:update_counter(gcpFlowControlBitmap, T, 1) of + 1 -> + Pid ! allow; + _ -> + %% Another process has released this key. + release(Key, T) + end; + +release(_, _, _)-> + ok. + +%%% ---------------------------------------------------------- +%%% user_data +%%% ---------------------------------------------------------- + +user_data(ConnHandle) -> + case catch megaco:conn_info(ConnHandle, reply_data) of + {'EXIT', _Reason} -> + false; + Rec -> + {value, Rec} + end. + +%%% ---------------------------------------------------------- +%%% grant_request +%%% ---------------------------------------------------------- + +grant_request({value, Rec}, + Key, Prio, Time, + ConnHandle, Options, ActionRequests, UserData) -> + ReplyData = Rec#gcpReplyData{user_data = UserData, + prio = Prio, + timestamp = Time}, + cast_rc(megaco:cast(ConnHandle, + ActionRequests, + [{reply_data, ReplyData} | Options]), + Key, + ActionRequests); + +grant_request(false, Key, _, _, _, _, _, _) -> + incr(Key, available), + {error, reply_data}. + +cast_rc(ok = Ok, _, _) -> + Ok; +cast_rc({error, Reason}, Key, ActionRequests) -> + incr(Key, available), + gcpLib:error_report(?MODULE, send_request, [ActionRequests], + "send failed", + Reason), + {error, {encode, Reason}}. + +%%-------------------------------------------------------------------- +%% Func: prio/1 +%% Returns: The priority of the request +%%-------------------------------------------------------------------- + +prio([ActionRequest | _]) -> + #'ActionRequest'{contextId = ContextId, + contextRequest = ContextRequest} + = ActionRequest, + prio(ContextId, ContextRequest). + +prio(?megaco_choose_context_id, #'ContextRequest'{priority = Prio}) + when integer(Prio) -> + Prio; +prio(_, _) -> + ?PRIO_INFINITY. |