aboutsummaryrefslogblamecommitdiffstats
path: root/lib/dialyzer/test/user_SUITE_data/src/gcpFlowControl.erl
blob: 8598efb5d1e51bc2e5f94c6563546d9029435db8 (plain) (tree)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
















                                                                      
   



                                                                 
   
                       
   














































































                                                                       
               










































                                                                      
 




                                                                      
            



























                                                                            
            















                                                                     
             

















                                                                 
             








                                                                     
            

























                                                                      
                   

































                                                                      
         


















                                                                      
               








































                                                                          
                     








                                                               
                                       
































                                                                      
%%%-------------------------------------------------------------------
%%% File    : gcpFlowControl.erl
%%% Author  : EAB/UPD/AV
%%% Description : Implements overload protection.
%%%-------------------------------------------------------------------
-module(gcpFlowControl).
-id('24/190 55-CNA 113 033 Ux').
-vsn('/main/R1A/14').
-date('2005-05-04').
-author('uabasve').
%%% ----------------------------------------------------------
%%% %CCaseTemplateFile: module.erl %
%%% %CCaseTemplateId: 16/002 01-FEA 202 714 Ux, Rev: /main/4 %
%%%
%%% Copyright (C) 2001-2005 by Ericsson Telecom AB
%%% SE-126 25  STOCKHOLM
%%% SWEDEN, tel int + 46 8 719 0000
%%%
%%% The program may be used and/or copied only with the written
%%% permission from Ericsson Telecom AB, or in accordance with
%%% the terms and conditions stipulated in the agreement/contract
%%% under which the program has been supplied.
%%%
%%% All rights reserved
%%%
%%%
%%% ----------------------------------------------------------
%%% #1.    REVISION LOG
%%% ----------------------------------------------------------
%%% Rev        Date       Name        What
%%% --------   --------   --------    ------------------------
%%% R1A/1-2    05-02-07   ejojmjn     Copied from EAS R7A/11.
%%% R1A/3-14   05-03-14   uabasve     Clean.
%%%--------------------------------------------------------------------

-include_lib("megaco/include/megaco.hrl").
-include_lib("megaco/include/megaco_message_v1.hrl").
-include("gcp.hrl").

-export([send_request/4,     %% user send from gcpInterface
         receive_reply/2,    %% from callback in gcpTransaction
         init_ets_tables/1,
         init_data/2]).

-define(PRIO_INFINITY, 16).
-define(MIN_WINDOW, 10).
-define(MAX_WINDOW, 100).

-define(BUCKET_MAX, 100).
-define(BUCKET_THRESH_HIGH, 80).
-define(BUCKET_THRESH_LOW, 20).

-define(ALLOW_TIMEOUT, 1000).

%% Holds counters for flow control in GCP
-record(gcpFlowControlTable,
	{key,
	 window        = 50,
	 available     = 50,
	 bucket        = 0,
	 q             = 0,
	 sent          = 0,   %% Counts all attempts
	 rejectable    = 0,   %% Counts rejectable attempts
	 t95,
	 errors        = 0,
	 rejects       = 0,
	 replies       = 0}).

-record(gcpFlowControlBitmap,
        {key,
         count = 0}).

%%====================================================================
%% External functions
%%====================================================================

%%--------------------------------------------------------------------
%% Function: send_request/4
%%
%% Output: ok | {error, Reason}
%%--------------------------------------------------------------------

%% Tries to send ActionRequests on the connection of ActiveLink, subject
%% to flow control. Every attempt is counted in 'sent'. When allow/2
%% grants the attempt, the request is handed over to grant_request/8
%% together with the reply data template fetched by user_data/1;
%% otherwise nothing is sent.
%%
%% Returns the result of grant_request/8, or {error, rejected} when
%% allow/2 refused the attempt.
send_request(ActiveLink, TimerOptions, ActionRequests, UserData) ->
    #gcpActiveLinkTable{key = LinkKey, conn_handle = Handle} = ActiveLink,
    Priority = prio(ActionRequests),
    incr(LinkKey, sent),
    case allow(LinkKey, Priority) of
        false ->
            {error, rejected};
        {true, Stamp} ->
            ReplyTemplate = user_data(Handle),
            grant_request(ReplyTemplate, LinkKey, Priority, Stamp,
                          Handle, TimerOptions, ActionRequests, UserData)
    end.

%%--------------------------------------------------------------------
%% Function: receive_reply/2
%% Description:
%%--------------------------------------------------------------------

%% Bookkeeping for a reply received on Key: gives the slot back
%% ('available'), counts the reply ('replies'), lets a queued sender
%% proceed through release/1 and finally reports the response time.
%% Timestamp is whatever allow/2 handed out for the request: no_stamp or
%% a {MegaSecs, Secs, MicroSecs} tuple (see report_time/2).
receive_reply(Key, Timestamp) ->
    lists:foreach(fun(Counter) -> incr(Key, Counter) end,
                  [available, replies]),
    release(Key),
    report_time(Key, Timestamp).

%%--------------------------------------------------------------------
%% Func: init_ets_tables/1
%%
%% Returns: ok
%%--------------------------------------------------------------------

%% Sets up the two ETS tables used by this module through create_ets/3,
%% each keyed on the 'key' field of its record. Always returns ok.
init_ets_tables(Role) ->
    Tables = [{gcpFlowControlTable, #gcpFlowControlTable.key},
              {gcpFlowControlBitmap, #gcpFlowControlBitmap.key}],
    lists:foreach(fun({Table, KeyPos}) ->
                          create_ets(Role, Table, KeyPos)
                  end,
                  Tables),
    ok.

%% create_ets(Role, Table, KeyPosOrOptions)
%%
%% Called with an integer key position, it expands the position into
%% the option list shared by all tables of this module (named, public,
%% ordered_set) and calls itself again with that list.
%%
%% Called with an option list:
%%   Role 'test'  - always creates the table directly with ets:new/2.
%%   other roles  - if the table does not exist it is created through
%%                  sysCmd:ets_new/2; if it already exists, 'ch' calls
%%                  sysCmd:inherit_tables/1 and 'om' does nothing. An
%%                  existing table with any other role is a case_clause
%%                  error.
create_ets(Role, Table, Pos) when is_integer(Pos) ->
    create_ets(Role,
               Table,
               [named_table, ordered_set, public, {keypos, Pos}]);

create_ets(test, Table, ArgList) ->
    ets:new(Table, ArgList);
create_ets(Role, Table, ArgList) ->
    case ets:info(Table) of
        undefined ->
            sysCmd:ets_new(Table, ArgList);
        _ when Role =:= ch ->
            sysCmd:inherit_tables([Table]);
        _ when Role =:= om ->
            ok
    end.

%%--------------------------------------------------------------------
%% Func: init_data/2
%%--------------------------------------------------------------------

%% (Re)initialises the counter row for Key: every counter starts at its
%% record default and 't95' is set to T95, the response time limit that
%% p/3 compares measured times against. Returns the result of
%% ets:insert/2.
init_data(Key, T95) ->
    Row = #gcpFlowControlTable{key = Key, t95 = T95},
    ets:insert(gcpFlowControlTable, Row).

%%--------------------------------------------------------------------
%%% Internal functions
%%--------------------------------------------------------------------

%%% ----------------------------------------------------------
%%%     incr
%%% ----------------------------------------------------------

%% Reads the current value of counter Counter in the row for Id. It is
%% implemented as an increment by zero, so reading goes through the same
%% update path as every other counter operation.
cntr(Id, Counter) ->
    incr(Id, Counter, 0).

%% Adds one to counter Counter in the row for Id and returns the new
%% value.
incr(Id, Counter) ->
    incr(Id, Counter, 1).

%% incr(Key, Field, X) adds X to counter Field in the row for Key and
%% returns the new value. There is one clause per counter; an unknown
%% Field is a function_clause error.
incr(Key, sent, X) ->
    upd_c(Key, {#gcpFlowControlTable.sent, X});
incr(Key, replies, X) ->
    upd_c(Key, {#gcpFlowControlTable.replies, X});
incr(Key, q, X) ->
    upd_c(Key, {#gcpFlowControlTable.q, X});
incr(Key, t95, X) ->
    upd_c(Key, {#gcpFlowControlTable.t95, X});
incr(Key, errors, X) ->
    upd_c(Key, {#gcpFlowControlTable.errors, X});
incr(Key, rejects, X) ->
    upd_c(Key, {#gcpFlowControlTable.rejects, X});
incr(Key, rejectable, X) ->
    upd_c(Key, {#gcpFlowControlTable.rejectable, X});
incr(Key, window, X) ->
    upd_c(Key, {#gcpFlowControlTable.window, X});
incr(Key, available, X) ->
    upd_c(Key, {#gcpFlowControlTable.available, X});

%% The bucket is capped: the {Pos, Incr, Threshold, SetValue} form makes
%% ets:update_counter/3 store ?BUCKET_MAX whenever the sum would exceed
%% ?BUCKET_MAX.
incr(Key, bucket, X) ->
    upd_c(Key, {#gcpFlowControlTable.bucket, X, ?BUCKET_MAX, ?BUCKET_MAX}).

%% Applies the ets:update_counter/3 operation UpdateOp to the row for
%% Key in gcpFlowControlTable and returns the resulting counter value.
upd_c(Key, UpdateOp) ->
    ets:update_counter(gcpFlowControlTable, Key, UpdateOp).

%%% ----------------------------------------------------------
%%%     decr
%%%
%%%     Beware that decr is implemented as incr, care has to be taken
%%%     not to bungle things when max/min values are used.
%%% ----------------------------------------------------------

%% decr(Key, Field, X) subtracts X from counter Field in the row for Key
%% and returns the new value. Only 'available', 'window' and 'bucket'
%% can be decremented; any other Field is a function_clause error.
%%
%% The bucket has a floor of 0 and therefore needs its own update
%% operation: going through incr/3 would apply the upper cap instead.
decr(Key, bucket, X) ->
    upd_c(Key, {#gcpFlowControlTable.bucket, -X, 0, 0});
%% The uncapped counters are simply incremented by the negated step.
decr(Key, Field, X) when Field =:= available; Field =:= window ->
    incr(Key, Field, -X).

%% Subtracts one from counter Counter in the row for Id and returns the
%% new value.
decr(Id, Counter) ->
    decr(Id, Counter, 1).

%%% ----------------------------------------------------------
%%%     allow
%%% ----------------------------------------------------------

%% allow(Key, Prio) -> {true, Timestamp} | false
%%
%% Decides whether a request of priority Prio may be sent on Key.
%%
%% A request with ?PRIO_INFINITY is never queued or rejected: it always
%% takes a slot from 'available' and is stamped with the current time so
%% that report_time/2 can measure its response time. The stamp comes
%% from erlang:timestamp/0, which yields the same {MegaSecs, Secs,
%% MicroSecs} shape as the deprecated now/0.
%%
%% Any other request is counted as 'rejectable'. It is granted at once
%% (without a time stamp) only if slots remain after taking one;
%% otherwise the slot is handed back and the caller is queued, in which
%% case the result is whatever queue/2 returns.
allow(Key, ?PRIO_INFINITY) ->
    decr(Key, available),
    {true, erlang:timestamp()};

allow(Key, Prio) ->
    incr(Key, rejectable),
    case decr(Key, available) of
        N when N > 0 ->
            {true, no_stamp};
        _ ->
            %% We did not send it, therefore incr available again
            incr(Key, available),
            queue(Key, Prio)
    end.

%%% ----------------------------------------------------------
%%%     queue
%%% ----------------------------------------------------------

%% Puts the calling process on the wait list for Key and blocks in
%% wait/1 until it is let through or times out. The wait list entry is
%% keyed on {Key, Prio, QueuedAt, Pid}, which is also the sort order of
%% the ordered_set table that release/1 walks. Every call is counted
%% in 'q'.
queue(Key, Prio) ->
    incr(Key, q),
    Entry = {Key, Prio, now(), self()},
    Row = #gcpFlowControlBitmap{key = Entry},
    ets:insert(gcpFlowControlBitmap, Row),
    wait(Entry).

%%% ----------------------------------------------------------
%%%     wait
%%% ----------------------------------------------------------

%% Blocks the queued caller until release/3 sends it 'allow' or until
%% ?ALLOW_TIMEOUT milliseconds have passed.
%% Returns {true, no_stamp} when let through, false when timed out.
wait({Key, _, _, _} = Entry) ->
    receive
        allow ->
            wait_granted(Key, Entry)
    after ?ALLOW_TIMEOUT ->
            wait_rejected(Key, Entry)
    end.

%% Let through: drop the wait list entry and take a slot.
wait_granted(Key, Entry) ->
    ets:delete(gcpFlowControlBitmap, Entry),
    decr(Key, available),
    {true, no_stamp}.

%% Timed out: withdraw from the wait list, give the window a chance to
%% grow and count the reject.
wait_rejected(Key, Entry) ->
    timeout(Entry),
    adjust_window(Key),
    incr(Key, rejects),
    false.

%% Withdraws the wait list entry Entry after its owner timed out.
%%
%% The 'count' field of the entry serves as a lock shared with
%% release/3: whoever bumps it to 1 owns the entry. The entry is removed
%% in either case.
timeout(Entry) ->
    Count = ets:update_counter(gcpFlowControlBitmap, Entry, 1),
    ets:delete(gcpFlowControlBitmap, Entry),
    case Count of
        1 ->
            %% We own the entry: nobody has released it, so no 'allow'
            %% is on its way.
            ok;
        _ ->
            %% A releasing process got there first and has sent
            %% 'allow'. Try to take that message out of the mailbox
            %% before going on, and report it if it never shows up.
            receive
                allow ->
                    ok
            after ?ALLOW_TIMEOUT ->
                    io:format("~p: errant allow: ~p~n", [?MODULE, Entry])
            end
    end.

%% Now, if we reject and our general response time is low
%% (i.e. low bucket) then we increase the window size.
%% Called after a reject. The window is widened by one slot when the
%% bucket is below ?BUCKET_THRESH_LOW (i.e. response times have been
%% good) and the window has not yet reached ?MAX_WINDOW.
adjust_window(Key) ->
    MayGrow = cntr(Key, bucket) < ?BUCKET_THRESH_LOW
        andalso cntr(Key, window) < ?MAX_WINDOW,
    adjust_window(Key, MayGrow).

%% Widening adds one slot to both 'window' and 'available' and charges
%% the bucket with 20. Returns the new bucket level when the window was
%% widened, ok otherwise.
adjust_window(_, false) ->
    ok;
adjust_window(Key, true) ->
    incr(Key, window),
    incr(Key, available),
    incr(Key, bucket, 20).

%%--------------------------------------------------------------------
%% Func: report_time/2
%%--------------------------------------------------------------------

%% report_time(Key, Timestamp)
%%
%% Feeds the response time of a request into the window regulation.
%% Requests that were granted without a time stamp (no_stamp) are
%% ignored. For a {MegaSecs, Secs, MicroSecs} stamp, the time elapsed
%% since the stamp is computed in microseconds and passed to p/3
%% together with the 't95' limit stored for Key.
%%
%% erlang:timestamp/0 is used in place of the deprecated now/0; both
%% return the same tuple shape, so stamps taken with either are handled.
report_time(_, no_stamp) ->
    ok;
report_time(Key, {_, _, _} = SentAt) ->
    Elapsed = timer:now_diff(erlang:timestamp(), SentAt),
    p(Key, Elapsed, cntr(Key, t95)).

%%% ----------------------------------------------------------
%%%     p
%%% ----------------------------------------------------------

%% p(Key, Time, T95) updates the leaky bucket for Key from one measured
%% response time.
%%
%% A response within the T95 limit drains the bucket by one.
p(Key, Time, T95) when Time =< T95 ->
    decr(Key, bucket);
%% A slow response charges the bucket with 20. If the bucket then is
%% above ?BUCKET_THRESH_HIGH and the window is still larger than
%% ?MIN_WINDOW, both 'window' and 'available' shrink by one.
p(Key, _Time, _T95) ->
    %% The window is read before the bucket is charged.
    Window = cntr(Key, window),
    Bucket = incr(Key, bucket, 20),
    case Window > ?MIN_WINDOW andalso Bucket > ?BUCKET_THRESH_HIGH of
        true ->
            decr(Key, window),
            decr(Key, available);
        false ->
            ok
    end.

%%% ----------------------------------------------------------
%%%     release
%%% ----------------------------------------------------------

%% Lets at most one queued sender on Key proceed, provided slots are
%% available.
release(Key) ->
    %% The search starts from a sentinel that sorts after every real
    %% wait list entry for Key: real entries carry an integer priority
    %% in the second position, and integers sort before atoms such as
    %% 'prio'. ets:prev/2 from the sentinel therefore yields the entry
    %% with the highest priority and, within that priority, the most
    %% recent queue time. The atoms 'queued' and 'pid' only fill out
    %% the tuple.
    Sentinel = {Key, prio, queued, pid},
    release(Key, Sentinel).

%% This isn't a (FIFO) queue within each priority, but a (LIFO) stack.

%% Reads the number of available slots for Key, steps to the wait list
%% entry just before From and lets release/3 decide what to do with it.
release(Key, From) ->
    Available = cntr(Key, available),
    Candidate = ets:prev(gcpFlowControlBitmap, From),
    release(Key, Available, Candidate).

%% Note that only keys on the same Key are matched.
%% release(Key, Available, Candidate)
%%
%% With slots available and a candidate entry that belongs to Key, tries
%% to take the entry's lock by bumping its counter. Getting 1 back means
%% this process owns the entry and the waiting process is sent 'allow'.
%% Any other value means someone else took the entry first, and a badarg
%% means the entry is already gone from the table; in both cases the
%% search goes on with the entry before it.
%%
%% Only the expected badarg from ets:update_counter/3 is absorbed; any
%% other failure is left to propagate.
release(Key, N, {Key, _Prio, _When, Pid} = T) when N > 0 ->
    try ets:update_counter(gcpFlowControlBitmap, T, 1) of
        1 ->
            Pid ! allow;
        _ ->
            %% Another process has released this key.
            release(Key, T)
    catch
        error:badarg ->
            %% The entry was removed before we could lock it.
            release(Key, T)
    end;

%% No slots left, end of table reached, or the candidate belongs to
%% another Key: nothing to release.
release(_, _, _)->
    ok.

%%% ----------------------------------------------------------
%%%     user_data
%%% ----------------------------------------------------------

%% Fetches the reply data stored for the connection ConnHandle.
%%
%% Returns {value, ReplyData} on success and false when
%% megaco:conn_info/2 fails with an error or an exit. A value that is
%% thrown is not mistaken for reply data; it propagates to the caller.
user_data(ConnHandle) ->
    try megaco:conn_info(ConnHandle, reply_data) of
        Rec ->
            {value, Rec}
    catch
        error:_Reason ->
            false;
        exit:_Reason ->
            false
    end.

%%% ----------------------------------------------------------
%%%     grant_request
%%% ----------------------------------------------------------

%% grant_request(ReplyTemplate, Key, Prio, Time,
%%               ConnHandle, Options, ActionRequests, UserData)
%%
%% Sends a request that has passed flow control. ReplyTemplate is the
%% result of user_data/1.
%%
%% Without reply data nothing can be sent: the slot taken by allow/2 is
%% handed back and {error, reply_data} is returned.
grant_request(false, Key, _Prio, _Time, _ConnHandle, _Options, _Requests, _UserData) ->
    incr(Key, available),
    {error, reply_data};

%% With reply data, the template is completed with the user data,
%% priority and time stamp of this request and passed to megaco:cast/3
%% as the reply_data option in front of the caller's options. The
%% outcome is interpreted by cast_rc/3.
grant_request({value, Template},
              Key, Prio, Time,
              ConnHandle, Options, ActionRequests, UserData) ->
    ReplyData = Template#gcpReplyData{user_data = UserData,
                                      prio      = Prio,
                                      timestamp = Time},
    CastOptions = [{reply_data, ReplyData} | Options],
    CastResult = megaco:cast(ConnHandle, ActionRequests, CastOptions),
    cast_rc(CastResult, Key, ActionRequests).

%% Interprets the result of megaco:cast/3. Success is passed through as
%% ok. On failure the slot taken for the request is handed back, the
%% failure is reported through gcpLib:error_report/5 and
%% {error, {encode, Reason}} is returned.
cast_rc(ok, _Key, _ActionRequests) ->
    ok;
cast_rc({error, Why}, Key, ActionRequests) ->
    incr(Key, available),
    gcpLib:error_report(?MODULE, send_request, [ActionRequests],
                        "send failed",
                        Why),
    {error, {encode, Why}}.

%%--------------------------------------------------------------------
%% Func: prio/1
%% Returns: The priority of the request
%%--------------------------------------------------------------------

%% The priority of a list of action requests is taken from the first
%% request in the list only; see prio/2 for how it is derived from the
%% context id and context request.
prio([First | _]) ->
    #'ActionRequest'{contextId = CtxId, contextRequest = CtxReq} = First,
    prio(CtxId, CtxReq).

%% prio(ContextId, ContextRequest)
%%
%% Only a request on ?megaco_choose_context_id whose context request
%% carries an integer priority gets that priority. Everything else gets
%% ?PRIO_INFINITY, which allow/2 never queues or rejects.
prio(?megaco_choose_context_id, #'ContextRequest'{priority = Prio})
  when is_integer(Prio) ->
    Prio;
prio(_, _) ->
    ?PRIO_INFINITY.