diff options
Diffstat (limited to 'lib/diameter/src/base/diameter_sync.erl')
-rw-r--r-- | lib/diameter/src/base/diameter_sync.erl | 550 |
1 files changed, 550 insertions, 0 deletions
diff --git a/lib/diameter/src/base/diameter_sync.erl b/lib/diameter/src/base/diameter_sync.erl new file mode 100644 index 0000000000..ce2db4b3a2 --- /dev/null +++ b/lib/diameter/src/base/diameter_sync.erl @@ -0,0 +1,550 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +%% +%% This module implements a server that serializes requests in named +%% queues. A request is an MFA or fun and a name can be any term. A +%% request is applied in a dedicated process that terminates when +%% the request function returns. +%% + +-module(diameter_sync). +-behaviour(gen_server). + +-export([call/4, call/5, + cast/4, cast/5, + carp/1, carp/2]). + +%% supervisor callback +-export([start_link/0]). + +%% gen_server interface +-export([init/1, + terminate/2, + handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3]). + +%% test/debug +-export([state/0, + uptime/0, + flush/1, + pending/0, + pending/1, + queues/0, + pids/1]). + +-include("diameter_internal.hrl"). + +%% Locally registered server name. +-define(SERVER, ?MODULE). + +%% Message to the server to queue a request ... +-define(REQUEST(CallOrCast, Name, Req, Max, Timeout), + {request, CallOrCast, Name, Req, Max, Timeout}). + +%% ... and to retrieve the pid of the prevailing request process. +-define(CARP(Name), + {carp, Name}). + +%% Forever ... +-define(TIMEOUT, 30000). + +%% Server state. +-record(state, + {time = now(), + pending = 0 :: non_neg_integer(), %% outstanding requests + monitor = new() :: ets:tid(), %% MonitorRef -> {Name, From} + queue = new() :: ets:tid()}). %% Name -> queue of {Pid, Ref} + +%% ---------------------------------------------------------- +%% # call(Node, Name, Req, Max, Timeout) +%% # call(Name, Req, Max, Timeout) +%% +%% Input: Name = term() identifying the queue in which the request is +%% to be evaluated. +%% Req = {M,F,A} +%% | {Fun, Arg} +%% | [Fun | Args] +%% | Fun +%% Max = Upper bound for the number of outstanding requests +%% in the named queue for Req to be queued. +%% If more than this number are in the queue then +%% 'rejected' is returned to the caller. Can be any +%% term but integer() | infinity is sufficient. +%% Timeout = 32 bit integer() number of milliseconds after which +%% request is cancelled (if not already started), causing +%% 'timeout' to be returned to the caller. +%% | infinity +%% +%% Output: Req() | rejected | timeout +%% +%% Description: Serialize a request in a named queue. Note that if +%% 'timeout' is returned and the request itself does not +%% return this atom then request has not been evaluated. +%% ---------------------------------------------------------- + +call(Name, Req, Max, Timeout) -> + call(node(), Name, Req, Max, Timeout). + +call(Node, Name, Req, Max, Timeout) -> + gen_call({?SERVER, Node}, ?REQUEST(call, Name, Req, Max, Timeout)). + +%%% ---------------------------------------------------------- +%%% # cast(Node, Name, Req, Max, Timeout) +%%% # cast(Name, Req, Max, Timeout) +%%% +%%% Output: ok | rejected | timeout +%%% +%%% Description: Serialize a request without returning the result to the +%%% caller. Returns after the task is queued. +%%% ---------------------------------------------------------- + +cast(Name, Req, Max, Timeout) -> + cast(node(), Name, Req, Max, Timeout). + +cast(Node, Name, Req, Max, Timeout) -> + gen_call({?SERVER, Node}, ?REQUEST(cast, Name, Req, Max, Timeout)). + +%% 'timeout' is only return if the server process that processes +%% requests isn't alive. Ditto for call/carp. + +%%% ---------------------------------------------------------- +%%% # carp(Node, Name) +%%% # carp(Name) +%%% +%%% Output: {value, Pid} | false | timeout +%%% +%%% Description: Return the pid of the process processing the task +%%% at the head of the named queue. Note that the value +%%% returned by subsequent calls changes as tasks are +%%% completed, each task executing in a dedicated +%%% process. The exit value of this process will be +%%% {value, Req()} if the task returns. +%%% ---------------------------------------------------------- + +%% The intention of this is to let a process enqueue a task that waits +%% for a message before completing, the target pid being retrieved +%% with carp/[12]. + +carp(Name) -> + carp(node(), Name). + +carp(Node, Name) -> + gen_call({?SERVER, Node}, ?CARP(Name)). + +%%% --------------------------------------------------------- +%%% EXPORTED INTERNAL FUNCTIONS +%%% --------------------------------------------------------- + +state() -> + call(state). + +uptime() -> + call(uptime). + +flush(Name) -> + call({flush, Name}). + +pending() -> + call(pending). + +pending(Name) -> + call({pending, Name}). + +queues() -> + call(queues). + +pids(Name) -> + call({pids, Name}). + +%%% ---------------------------------------------------------- +%%% # start_link() +%%% ---------------------------------------------------------- + +start_link() -> + ServerName = {local, ?SERVER}, + Module = ?MODULE, + Args = [], + Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}], + gen_server:start_link(ServerName, Module, Args, Options). + +%%% ---------------------------------------------------------- +%%% # init(_) +%%% ---------------------------------------------------------- + +init(_) -> + {ok, #state{}}. + +%%% ---------------------------------------------------------- +%%% # handle_call(Request, From, State) +%%% ---------------------------------------------------------- + +%% Enqueue a new request. +handle_call(?REQUEST(Type, Name, Req, Max, Timeout), + From, + #state{queue = QD} = State) -> + T = find(Name, QD), + nq(queued(T) =< Max, T, {Type, From}, Name, Req, Timeout, State); + +handle_call(Request, From, State) -> + {reply, call(Request, From, State), State}. + +%% call/3 + +call(?CARP(Name), _, #state{queue = QD}) -> + pcar(find(Name, QD)); + +call(state, _, State) -> + State; + +call(uptime, _, #state{time = T}) -> + diameter_lib:now_diff(T); + +call({flush, Name}, _, #state{queue = QD}) -> + cancel(find(Name, QD)); + +call(pending, _, #state{pending = N}) -> + N; + +call({pending, Name}, _, #state{queue = QD}) -> + queued(find(Name, QD)); + +call(queues, _, #state{queue = QD}) -> + fetch_keys(QD); + +call({pids, Name}, _, #state{queue = QD}) -> + plist(find(Name, QD)); + +call(Req, From, _State) -> %% ignore + ?UNEXPECTED(handle_call, [Req, From]), + nok. + +%%% ---------------------------------------------------------- +%%% handle_cast(Request, State) +%%% ---------------------------------------------------------- + +handle_cast(Msg, State) -> + ?UNEXPECTED([Msg]), + {noreply, State}. + +%%% ---------------------------------------------------------- +%%% handle_info(Request, State) +%%% ---------------------------------------------------------- + +handle_info(Request, State) -> + {noreply, info(Request, State)}. + +%% info/2 + +%% A request has completed execution or timed out. +info({'DOWN', MRef, process, Pid, Info}, + #state{pending = N, + monitor = MD, + queue = QD} + = State) -> + {Name, From} = fetch(MRef, MD), + reply(From, rc(Info)), + State#state{pending = N-1, + monitor = erase(MRef, MD), + queue = dq(fetch(Name, QD), Pid, Info, Name, QD)}; + +info(Info, State) -> + ?UNEXPECTED(handle_info, [Info]), + State. + +reply({call, From}, T) -> + gen_server:reply(From, T); +reply(cast, _) -> + ok. + +rc({value, T}) -> + T; +rc(_) -> + timeout. + +%%% ---------------------------------------------------------- +%%% code_change(OldVsn, State, Extra) +%%% ---------------------------------------------------------- + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%%% ---------------------------------------------------------- +%%% terminate(Reason, State) +%%% ---------------------------------------------------------- + +terminate(_Reason, _State)-> + ok. + +%%% --------------------------------------------------------- +%%% INTERNAL FUNCTIONS +%%% --------------------------------------------------------- + +%% queued/1 + +queued({ok, {N,_}}) -> + N; +queued(error) -> + 0. + +%% nq/7 + +%% Maximum number of pending requests exceeded ... +nq(false, _, _, _Name, _Req, _Timeout, State) -> + {reply, rejected, State}; + +%% ... or not. +nq(true, T, From, Name, Req, Timeout, #state{pending = N, + monitor = MD, + queue = QD} + = State) -> + Ref = make_ref(), + Pid = init(Ref, Req, timeout(Timeout, T)), + MRef = erlang:monitor(process, Pid), + {noreply, State#state{pending = N+1, + monitor = store(MRef, {Name, from(From)}, MD), + queue = store(Name, nq(T, {Pid, Ref}), QD)}}. + +from({call, _} = T) -> + T; +from({cast = T, From}) -> + gen_server:reply(From, ok), + T. + +%% nq/2 + +%% Other requests in the queue: append. +nq({ok, {N,Q}}, T) -> + {N+1, queue:in(T,Q)}; + +%% Queue is empty: start execution. +nq(error, T) -> + go(T), + {1, queue:from_list([T])}. + +%% Don't timeout if the request is evaluated immediately so as to +%% avoid a race between getting a 'go' and a 'timeout'. Queueing a +%% request in an empty queue always results in execution. +timeout(_, error) -> + infinity; +timeout(Timeout, _) -> + Timeout. + +%% dq/5 +%% +%% A request process has terminated. + +dq({N,Q}, Pid, _Info, Name, QD) -> + {{value, T}, TQ} = queue:out(Q), + dq(N-1, Pid, T, TQ, Name, QD). + +%% dq/6 + +%% Request was at the head of the queue: start another. +dq(N, Pid, {Pid, _}, TQ, Name, QD) -> + dq(N, TQ, Name, QD); + +%% Or not: remove the offender from the queue. +dq(N, Pid, T, TQ, Name, QD) -> + store(Name, {N, req(Pid, queue:from_list([T]), TQ)}, QD). + +%% dq/4 + +%% Queue is empty: erase. +dq(0, TQ, Name, QD) -> + true = queue:is_empty(TQ), %% assert + erase(Name, QD); + +%% Start the next request. +dq(N, TQ, Name, QD) -> + go(queue:head(TQ)), + store(Name, {N, TQ}, QD). + +%% req/3 +%% +%% Find and remove the queue element for the specified pid. + +req(Pid, HQ, Q) -> + {{value, T}, TQ} = queue:out(Q), + req(Pid, T, HQ, TQ). + +req(Pid, {Pid, _}, HQ, TQ) -> + queue:join(HQ, TQ); +req(Pid, T, HQ, TQ) -> + req(Pid, queue:in(T,HQ), TQ). + +%% go/1 + +go({Pid, Ref}) -> + Pid ! {Ref, ok}. + +%% init/4 +%% +%% Start the dedicated process for handling a request. The exit value +%% is as promised by carp/1. + +init(Ref, Req, Timeout) -> + spawn(fun() -> exit(i(Ref, Req, Timeout)) end). + +i(Ref, Req, Timeout) -> + Timer = send_timeout(Ref, Timeout), + MRef = erlang:monitor(process, ?SERVER), + receive + {Ref, ok} -> %% Do the deed. + %% Ensure we don't leave messages in the mailbox since the + %% request itself might receive. Alternatively, could have + %% done the eval in a new process but then we'd have to + %% relay messages arriving at this one. + cancel_timer(Timer), + erlang:demonitor(MRef, [flush]), + %% Ref is to ensure that we don't extract any message that + %% a client may have sent after retrieving self() with + %% carp/1, there being no guarantee that the message + %% banged by go/1 is received before the pid becomes + %% accessible. + {value, eval(Req)}; + {Ref, timeout = T} -> + T; + {'DOWN', MRef, process, _Pid, _Info} = D -> %% server death + D + end. + +send_timeout(_Ref, infinity = No) -> + No; +send_timeout(Ref, Ms) -> + Msg = {Ref, timeout}, + TRef = erlang:send_after(Ms, self(), Msg), + {TRef, Msg}. + +cancel_timer(infinity = No) -> + No; +cancel_timer({TRef, Msg}) -> + flush(Msg, erlang:cancel_timer(TRef)). + +flush(Msg, false) -> %% Message has already been sent ... + %% 'error' should never happen but crash if it does so as not to + %% hang the process. + ok = receive Msg -> ok after ?TIMEOUT -> error end; +flush(_, _) -> %% ... or not. + ok. + +eval({M,F,A}) -> + apply(M,F,A); +eval([Fun | Args]) -> + apply(Fun, Args); +eval({Fun, A}) -> + Fun(A); +eval(Fun) -> + Fun(). + +%% pcar/1 + +pcar({ok, {_,Q}}) -> + {Pid, _Ref} = queue:head(Q), + {value, Pid}; +pcar(error) -> + false. + +%% plist/1 + +plist({ok, {_,Q}}) -> + lists:map(fun({Pid, _Ref}) -> Pid end, queue:to_list(Q)); +plist(error) -> + []. + +%% cancel/1 +%% +%% Cancel all but the active request from the named queue. Return the +%% number of requests cancelled. + +%% Just send timeout messages to each request to make them die. Note +%% that these are guaranteed to arrive before a go message after the +%% current request completes since both messages are sent from the +%% server process. +cancel({ok, {N,Q}}) -> + {_,TQ} = queue:split(1,Q), + foreach(fun({Pid, Ref}) -> Pid ! {Ref, timeout} end, N-1, TQ), + N-1; +cancel(error) -> + 0. + +%% foreach/3 + +foreach(_, 0, _) -> + ok; +foreach(Fun, N, Q) -> + Fun(queue:head(Q)), + foreach(Fun, N-1, queue:tail(Q)). + +%% call/1 + +%% gen_server:call/3 will exit if the target process dies. +call(Request) -> + try + gen_server:call(?SERVER, Request, ?TIMEOUT) + catch + exit: Reason -> + {error, Reason} + end. + +%% dict-like table manipulation. + +erase(Key, Dict) -> + ets:delete(Dict, Key), + Dict. + +fetch(Key, Dict) -> + {ok, V} = find(Key, Dict), + V. + +fetch_keys(Dict) -> + ets:foldl(fun({K,_}, Acc) -> [K | Acc] end, [], Dict). + +find(Key, Dict) -> + case ets:lookup(Dict, Key) of + [{Key, V}] -> + {ok, V}; + [] -> + error + end. + +new() -> + ets:new(?MODULE, [set]). + +store(Key, Value, Dict) -> + store({Key, Value}, Dict). + +store({_,_} = T, Dict) -> + ets:insert(Dict, T), + Dict. + +%% gen_call/1 + +gen_call(Server, Req) -> + gen_call(Server, Req, infinity). + +gen_call(Server, Req, Timeout) -> + try + gen_server:call(Server, Req, Timeout) + catch + exit: _ -> + timeout + end. |