diff options
author | Anders Svensson <[email protected]> | 2011-10-14 19:40:29 +0200 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2011-10-17 12:30:58 +0200 |
commit | 14bf1dc855bbd973bb15578f418e37ab2d4f17fe (patch) | |
tree | ed67388edcead44b9867eb034c84b4394d0cf117 /lib/diameter/src/app/diameter_sync.erl | |
parent | f4c38ecd803451280d0021bcfa7f2ad25b96cbcd (diff) | |
download | otp-14bf1dc855bbd973bb15578f418e37ab2d4f17fe.tar.gz otp-14bf1dc855bbd973bb15578f418e37ab2d4f17fe.tar.bz2 otp-14bf1dc855bbd973bb15578f418e37ab2d4f17fe.zip |
One makefile for src build instead of recursion
Simpler, no duplication of similar makefiles and makes for
better dependencies. (Aka, recursive make considered harmful.)
Diffstat (limited to 'lib/diameter/src/app/diameter_sync.erl')
-rw-r--r-- | lib/diameter/src/app/diameter_sync.erl | 550 |
1 files changed, 0 insertions, 550 deletions
diff --git a/lib/diameter/src/app/diameter_sync.erl b/lib/diameter/src/app/diameter_sync.erl deleted file mode 100644 index ce2db4b3a2..0000000000 --- a/lib/diameter/src/app/diameter_sync.erl +++ /dev/null @@ -1,550 +0,0 @@ -%% -%% %CopyrightBegin% -%% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. -%% -%% The contents of this file are subject to the Erlang Public License, -%% Version 1.1, (the "License"); you may not use this file except in -%% compliance with the License. You should have received a copy of the -%% Erlang Public License along with this software. If not, it can be -%% retrieved online at http://www.erlang.org/. -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and limitations -%% under the License. -%% -%% %CopyrightEnd% -%% - -%% -%% This module implements a server that serializes requests in named -%% queues. A request is an MFA or fun and a name can be any term. A -%% request is applied in a dedicated process that terminates when -%% the request function returns. -%% - --module(diameter_sync). --behaviour(gen_server). - --export([call/4, call/5, - cast/4, cast/5, - carp/1, carp/2]). - -%% supervisor callback --export([start_link/0]). - -%% gen_server interface --export([init/1, - terminate/2, - handle_call/3, - handle_cast/2, - handle_info/2, - code_change/3]). - -%% test/debug --export([state/0, - uptime/0, - flush/1, - pending/0, - pending/1, - queues/0, - pids/1]). - --include("diameter_internal.hrl"). - -%% Locally registered server name. --define(SERVER, ?MODULE). - -%% Message to the server to queue a request ... --define(REQUEST(CallOrCast, Name, Req, Max, Timeout), - {request, CallOrCast, Name, Req, Max, Timeout}). - -%% ... and to retrieve the pid of the prevailing request process. --define(CARP(Name), - {carp, Name}). - -%% Forever ... --define(TIMEOUT, 30000). - -%% Server state. --record(state, - {time = now(), - pending = 0 :: non_neg_integer(), %% outstanding requests - monitor = new() :: ets:tid(), %% MonitorRef -> {Name, From} - queue = new() :: ets:tid()}). %% Name -> queue of {Pid, Ref} - -%% ---------------------------------------------------------- -%% # call(Node, Name, Req, Max, Timeout) -%% # call(Name, Req, Max, Timeout) -%% -%% Input: Name = term() identifying the queue in which the request is -%% to be evaluated. -%% Req = {M,F,A} -%% | {Fun, Arg} -%% | [Fun | Args] -%% | Fun -%% Max = Upper bound for the number of outstanding requests -%% in the named queue for Req to be queued. -%% If more than this number are in the queue then -%% 'rejected' is returned to the caller. Can be any -%% term but integer() | infinity is sufficient. -%% Timeout = 32 bit integer() number of milliseconds after which -%% request is cancelled (if not already started), causing -%% 'timeout' to be returned to the caller. -%% | infinity -%% -%% Output: Req() | rejected | timeout -%% -%% Description: Serialize a request in a named queue. Note that if -%% 'timeout' is returned and the request itself does not -%% return this atom then request has not been evaluated. -%% ---------------------------------------------------------- - -call(Name, Req, Max, Timeout) -> - call(node(), Name, Req, Max, Timeout). - -call(Node, Name, Req, Max, Timeout) -> - gen_call({?SERVER, Node}, ?REQUEST(call, Name, Req, Max, Timeout)). - -%%% ---------------------------------------------------------- -%%% # cast(Node, Name, Req, Max, Timeout) -%%% # cast(Name, Req, Max, Timeout) -%%% -%%% Output: ok | rejected | timeout -%%% -%%% Description: Serialize a request without returning the result to the -%%% caller. Returns after the task is queued. -%%% ---------------------------------------------------------- - -cast(Name, Req, Max, Timeout) -> - cast(node(), Name, Req, Max, Timeout). - -cast(Node, Name, Req, Max, Timeout) -> - gen_call({?SERVER, Node}, ?REQUEST(cast, Name, Req, Max, Timeout)). - -%% 'timeout' is only return if the server process that processes -%% requests isn't alive. Ditto for call/carp. - -%%% ---------------------------------------------------------- -%%% # carp(Node, Name) -%%% # carp(Name) -%%% -%%% Output: {value, Pid} | false | timeout -%%% -%%% Description: Return the pid of the process processing the task -%%% at the head of the named queue. Note that the value -%%% returned by subsequent calls changes as tasks are -%%% completed, each task executing in a dedicated -%%% process. The exit value of this process will be -%%% {value, Req()} if the task returns. -%%% ---------------------------------------------------------- - -%% The intention of this is to let a process enqueue a task that waits -%% for a message before completing, the target pid being retrieved -%% with carp/[12]. - -carp(Name) -> - carp(node(), Name). - -carp(Node, Name) -> - gen_call({?SERVER, Node}, ?CARP(Name)). - -%%% --------------------------------------------------------- -%%% EXPORTED INTERNAL FUNCTIONS -%%% --------------------------------------------------------- - -state() -> - call(state). - -uptime() -> - call(uptime). - -flush(Name) -> - call({flush, Name}). - -pending() -> - call(pending). - -pending(Name) -> - call({pending, Name}). - -queues() -> - call(queues). - -pids(Name) -> - call({pids, Name}). - -%%% ---------------------------------------------------------- -%%% # start_link() -%%% ---------------------------------------------------------- - -start_link() -> - ServerName = {local, ?SERVER}, - Module = ?MODULE, - Args = [], - Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}], - gen_server:start_link(ServerName, Module, Args, Options). - -%%% ---------------------------------------------------------- -%%% # init(_) -%%% ---------------------------------------------------------- - -init(_) -> - {ok, #state{}}. - -%%% ---------------------------------------------------------- -%%% # handle_call(Request, From, State) -%%% ---------------------------------------------------------- - -%% Enqueue a new request. -handle_call(?REQUEST(Type, Name, Req, Max, Timeout), - From, - #state{queue = QD} = State) -> - T = find(Name, QD), - nq(queued(T) =< Max, T, {Type, From}, Name, Req, Timeout, State); - -handle_call(Request, From, State) -> - {reply, call(Request, From, State), State}. - -%% call/3 - -call(?CARP(Name), _, #state{queue = QD}) -> - pcar(find(Name, QD)); - -call(state, _, State) -> - State; - -call(uptime, _, #state{time = T}) -> - diameter_lib:now_diff(T); - -call({flush, Name}, _, #state{queue = QD}) -> - cancel(find(Name, QD)); - -call(pending, _, #state{pending = N}) -> - N; - -call({pending, Name}, _, #state{queue = QD}) -> - queued(find(Name, QD)); - -call(queues, _, #state{queue = QD}) -> - fetch_keys(QD); - -call({pids, Name}, _, #state{queue = QD}) -> - plist(find(Name, QD)); - -call(Req, From, _State) -> %% ignore - ?UNEXPECTED(handle_call, [Req, From]), - nok. - -%%% ---------------------------------------------------------- -%%% handle_cast(Request, State) -%%% ---------------------------------------------------------- - -handle_cast(Msg, State) -> - ?UNEXPECTED([Msg]), - {noreply, State}. - -%%% ---------------------------------------------------------- -%%% handle_info(Request, State) -%%% ---------------------------------------------------------- - -handle_info(Request, State) -> - {noreply, info(Request, State)}. - -%% info/2 - -%% A request has completed execution or timed out. -info({'DOWN', MRef, process, Pid, Info}, - #state{pending = N, - monitor = MD, - queue = QD} - = State) -> - {Name, From} = fetch(MRef, MD), - reply(From, rc(Info)), - State#state{pending = N-1, - monitor = erase(MRef, MD), - queue = dq(fetch(Name, QD), Pid, Info, Name, QD)}; - -info(Info, State) -> - ?UNEXPECTED(handle_info, [Info]), - State. - -reply({call, From}, T) -> - gen_server:reply(From, T); -reply(cast, _) -> - ok. - -rc({value, T}) -> - T; -rc(_) -> - timeout. - -%%% ---------------------------------------------------------- -%%% code_change(OldVsn, State, Extra) -%%% ---------------------------------------------------------- - -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%% ---------------------------------------------------------- -%%% terminate(Reason, State) -%%% ---------------------------------------------------------- - -terminate(_Reason, _State)-> - ok. - -%%% --------------------------------------------------------- -%%% INTERNAL FUNCTIONS -%%% --------------------------------------------------------- - -%% queued/1 - -queued({ok, {N,_}}) -> - N; -queued(error) -> - 0. - -%% nq/7 - -%% Maximum number of pending requests exceeded ... -nq(false, _, _, _Name, _Req, _Timeout, State) -> - {reply, rejected, State}; - -%% ... or not. -nq(true, T, From, Name, Req, Timeout, #state{pending = N, - monitor = MD, - queue = QD} - = State) -> - Ref = make_ref(), - Pid = init(Ref, Req, timeout(Timeout, T)), - MRef = erlang:monitor(process, Pid), - {noreply, State#state{pending = N+1, - monitor = store(MRef, {Name, from(From)}, MD), - queue = store(Name, nq(T, {Pid, Ref}), QD)}}. - -from({call, _} = T) -> - T; -from({cast = T, From}) -> - gen_server:reply(From, ok), - T. - -%% nq/2 - -%% Other requests in the queue: append. -nq({ok, {N,Q}}, T) -> - {N+1, queue:in(T,Q)}; - -%% Queue is empty: start execution. -nq(error, T) -> - go(T), - {1, queue:from_list([T])}. - -%% Don't timeout if the request is evaluated immediately so as to -%% avoid a race between getting a 'go' and a 'timeout'. Queueing a -%% request in an empty queue always results in execution. -timeout(_, error) -> - infinity; -timeout(Timeout, _) -> - Timeout. - -%% dq/5 -%% -%% A request process has terminated. - -dq({N,Q}, Pid, _Info, Name, QD) -> - {{value, T}, TQ} = queue:out(Q), - dq(N-1, Pid, T, TQ, Name, QD). - -%% dq/6 - -%% Request was at the head of the queue: start another. -dq(N, Pid, {Pid, _}, TQ, Name, QD) -> - dq(N, TQ, Name, QD); - -%% Or not: remove the offender from the queue. -dq(N, Pid, T, TQ, Name, QD) -> - store(Name, {N, req(Pid, queue:from_list([T]), TQ)}, QD). - -%% dq/4 - -%% Queue is empty: erase. -dq(0, TQ, Name, QD) -> - true = queue:is_empty(TQ), %% assert - erase(Name, QD); - -%% Start the next request. -dq(N, TQ, Name, QD) -> - go(queue:head(TQ)), - store(Name, {N, TQ}, QD). - -%% req/3 -%% -%% Find and remove the queue element for the specified pid. - -req(Pid, HQ, Q) -> - {{value, T}, TQ} = queue:out(Q), - req(Pid, T, HQ, TQ). - -req(Pid, {Pid, _}, HQ, TQ) -> - queue:join(HQ, TQ); -req(Pid, T, HQ, TQ) -> - req(Pid, queue:in(T,HQ), TQ). - -%% go/1 - -go({Pid, Ref}) -> - Pid ! {Ref, ok}. - -%% init/4 -%% -%% Start the dedicated process for handling a request. The exit value -%% is as promised by carp/1. - -init(Ref, Req, Timeout) -> - spawn(fun() -> exit(i(Ref, Req, Timeout)) end). - -i(Ref, Req, Timeout) -> - Timer = send_timeout(Ref, Timeout), - MRef = erlang:monitor(process, ?SERVER), - receive - {Ref, ok} -> %% Do the deed. - %% Ensure we don't leave messages in the mailbox since the - %% request itself might receive. Alternatively, could have - %% done the eval in a new process but then we'd have to - %% relay messages arriving at this one. - cancel_timer(Timer), - erlang:demonitor(MRef, [flush]), - %% Ref is to ensure that we don't extract any message that - %% a client may have sent after retrieving self() with - %% carp/1, there being no guarantee that the message - %% banged by go/1 is received before the pid becomes - %% accessible. - {value, eval(Req)}; - {Ref, timeout = T} -> - T; - {'DOWN', MRef, process, _Pid, _Info} = D -> %% server death - D - end. - -send_timeout(_Ref, infinity = No) -> - No; -send_timeout(Ref, Ms) -> - Msg = {Ref, timeout}, - TRef = erlang:send_after(Ms, self(), Msg), - {TRef, Msg}. - -cancel_timer(infinity = No) -> - No; -cancel_timer({TRef, Msg}) -> - flush(Msg, erlang:cancel_timer(TRef)). - -flush(Msg, false) -> %% Message has already been sent ... - %% 'error' should never happen but crash if it does so as not to - %% hang the process. - ok = receive Msg -> ok after ?TIMEOUT -> error end; -flush(_, _) -> %% ... or not. - ok. - -eval({M,F,A}) -> - apply(M,F,A); -eval([Fun | Args]) -> - apply(Fun, Args); -eval({Fun, A}) -> - Fun(A); -eval(Fun) -> - Fun(). - -%% pcar/1 - -pcar({ok, {_,Q}}) -> - {Pid, _Ref} = queue:head(Q), - {value, Pid}; -pcar(error) -> - false. - -%% plist/1 - -plist({ok, {_,Q}}) -> - lists:map(fun({Pid, _Ref}) -> Pid end, queue:to_list(Q)); -plist(error) -> - []. - -%% cancel/1 -%% -%% Cancel all but the active request from the named queue. Return the -%% number of requests cancelled. - -%% Just send timeout messages to each request to make them die. Note -%% that these are guaranteed to arrive before a go message after the -%% current request completes since both messages are sent from the -%% server process. -cancel({ok, {N,Q}}) -> - {_,TQ} = queue:split(1,Q), - foreach(fun({Pid, Ref}) -> Pid ! {Ref, timeout} end, N-1, TQ), - N-1; -cancel(error) -> - 0. - -%% foreach/3 - -foreach(_, 0, _) -> - ok; -foreach(Fun, N, Q) -> - Fun(queue:head(Q)), - foreach(Fun, N-1, queue:tail(Q)). - -%% call/1 - -%% gen_server:call/3 will exit if the target process dies. -call(Request) -> - try - gen_server:call(?SERVER, Request, ?TIMEOUT) - catch - exit: Reason -> - {error, Reason} - end. - -%% dict-like table manipulation. - -erase(Key, Dict) -> - ets:delete(Dict, Key), - Dict. - -fetch(Key, Dict) -> - {ok, V} = find(Key, Dict), - V. - -fetch_keys(Dict) -> - ets:foldl(fun({K,_}, Acc) -> [K | Acc] end, [], Dict). - -find(Key, Dict) -> - case ets:lookup(Dict, Key) of - [{Key, V}] -> - {ok, V}; - [] -> - error - end. - -new() -> - ets:new(?MODULE, [set]). - -store(Key, Value, Dict) -> - store({Key, Value}, Dict). - -store({_,_} = T, Dict) -> - ets:insert(Dict, T), - Dict. - -%% gen_call/1 - -gen_call(Server, Req) -> - gen_call(Server, Req, infinity). - -gen_call(Server, Req, Timeout) -> - try - gen_server:call(Server, Req, Timeout) - catch - exit: _ -> - timeout - end. |