aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base/diameter_sync.erl
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2011-10-21 10:43:17 +0200
committerAnders Svensson <[email protected]>2011-10-21 10:43:17 +0200
commit5dc64cffb084e8abc6e5908025833481331f38de (patch)
tree43e8643b235791b81ade3caf9f3671d206a44527 /lib/diameter/src/base/diameter_sync.erl
parent1cc659425002846f4f553add9d027ca620b42a22 (diff)
parent6c048c57a4714e033f484ff79425ce847e9a43e9 (diff)
downloadotp-5dc64cffb084e8abc6e5908025833481331f38de.tar.gz
otp-5dc64cffb084e8abc6e5908025833481331f38de.tar.bz2
otp-5dc64cffb084e8abc6e5908025833481331f38de.zip
Merge branch 'anders/diameter/make/OTP-9638'
* anders/diameter/make/OTP-9638: Dumb down release target to Solaris /usr/ucb/install Dumb down opt/release targets to make 3.80 Minor tweaks and cleanup Need absolute -pa for bootstrap build Simpler release targets for src subdirectories Use secondary expansion for src subdirectory rules One makefile for src build instead of recursion Remove app dependency on compiler to avoid forced recompilation Move diameter_exprecs to compiler directory
Diffstat (limited to 'lib/diameter/src/base/diameter_sync.erl')
-rw-r--r--lib/diameter/src/base/diameter_sync.erl550
1 files changed, 550 insertions, 0 deletions
diff --git a/lib/diameter/src/base/diameter_sync.erl b/lib/diameter/src/base/diameter_sync.erl
new file mode 100644
index 0000000000..ce2db4b3a2
--- /dev/null
+++ b/lib/diameter/src/base/diameter_sync.erl
@@ -0,0 +1,550 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% This module implements a server that serializes requests in named
+%% queues. A request is an MFA or fun and a name can be any term. A
+%% request is applied in a dedicated process that terminates when
+%% the request function returns.
+%%
+
+-module(diameter_sync).
+-behaviour(gen_server).
+
+-export([call/4, call/5,
+ cast/4, cast/5,
+ carp/1, carp/2]).
+
+%% supervisor callback
+-export([start_link/0]).
+
+%% gen_server interface
+-export([init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3]).
+
+%% test/debug
+-export([state/0,
+ uptime/0,
+ flush/1,
+ pending/0,
+ pending/1,
+ queues/0,
+ pids/1]).
+
+-include("diameter_internal.hrl").
+
+%% Locally registered server name.
+-define(SERVER, ?MODULE).
+
+%% Message to the server to queue a request ...
+-define(REQUEST(CallOrCast, Name, Req, Max, Timeout),
+ {request, CallOrCast, Name, Req, Max, Timeout}).
+
+%% ... and to retrieve the pid of the prevailing request process.
+-define(CARP(Name),
+ {carp, Name}).
+
+%% Forever ...
+-define(TIMEOUT, 30000).
+
+%% Server state.
+-record(state,
+ {time = now(),
+ pending = 0 :: non_neg_integer(), %% outstanding requests
+ monitor = new() :: ets:tid(), %% MonitorRef -> {Name, From}
+ queue = new() :: ets:tid()}). %% Name -> queue of {Pid, Ref}
+
+%% ----------------------------------------------------------
+%% # call(Node, Name, Req, Max, Timeout)
+%% # call(Name, Req, Max, Timeout)
+%%
+%% Input: Name = term() identifying the queue in which the request is
+%% to be evaluated.
+%% Req = {M,F,A}
+%% | {Fun, Arg}
+%% | [Fun | Args]
+%% | Fun
+%% Max = Upper bound for the number of outstanding requests
+%% in the named queue for Req to be queued.
+%% If more than this number are in the queue then
+%% 'rejected' is returned to the caller. Can be any
+%% term but integer() | infinity is sufficient.
+%% Timeout = 32 bit integer() number of milliseconds after which
+%% request is cancelled (if not already started), causing
+%% 'timeout' to be returned to the caller.
+%% | infinity
+%%
+%% Output: Req() | rejected | timeout
+%%
+%% Description: Serialize a request in a named queue. Note that if
+%% 'timeout' is returned and the request itself does not
+%% return this atom then request has not been evaluated.
+%% ----------------------------------------------------------
+
+call(Name, Req, Max, Timeout) ->
+ call(node(), Name, Req, Max, Timeout).
+
+call(Node, Name, Req, Max, Timeout) ->
+ gen_call({?SERVER, Node}, ?REQUEST(call, Name, Req, Max, Timeout)).
+
+%%% ----------------------------------------------------------
+%%% # cast(Node, Name, Req, Max, Timeout)
+%%% # cast(Name, Req, Max, Timeout)
+%%%
+%%% Output: ok | rejected | timeout
+%%%
+%%% Description: Serialize a request without returning the result to the
+%%% caller. Returns after the task is queued.
+%%% ----------------------------------------------------------
+
+cast(Name, Req, Max, Timeout) ->
+ cast(node(), Name, Req, Max, Timeout).
+
+cast(Node, Name, Req, Max, Timeout) ->
+ gen_call({?SERVER, Node}, ?REQUEST(cast, Name, Req, Max, Timeout)).
+
+%% 'timeout' is only return if the server process that processes
+%% requests isn't alive. Ditto for call/carp.
+
+%%% ----------------------------------------------------------
+%%% # carp(Node, Name)
+%%% # carp(Name)
+%%%
+%%% Output: {value, Pid} | false | timeout
+%%%
+%%% Description: Return the pid of the process processing the task
+%%% at the head of the named queue. Note that the value
+%%% returned by subsequent calls changes as tasks are
+%%% completed, each task executing in a dedicated
+%%% process. The exit value of this process will be
+%%% {value, Req()} if the task returns.
+%%% ----------------------------------------------------------
+
+%% The intention of this is to let a process enqueue a task that waits
+%% for a message before completing, the target pid being retrieved
+%% with carp/[12].
+
+carp(Name) ->
+ carp(node(), Name).
+
+carp(Node, Name) ->
+ gen_call({?SERVER, Node}, ?CARP(Name)).
+
+%%% ---------------------------------------------------------
+%%% EXPORTED INTERNAL FUNCTIONS
+%%% ---------------------------------------------------------
+
+state() ->
+ call(state).
+
+uptime() ->
+ call(uptime).
+
+flush(Name) ->
+ call({flush, Name}).
+
+pending() ->
+ call(pending).
+
+pending(Name) ->
+ call({pending, Name}).
+
+queues() ->
+ call(queues).
+
+pids(Name) ->
+ call({pids, Name}).
+
+%%% ----------------------------------------------------------
+%%% # start_link()
+%%% ----------------------------------------------------------
+
+start_link() ->
+ ServerName = {local, ?SERVER},
+ Module = ?MODULE,
+ Args = [],
+ Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}],
+ gen_server:start_link(ServerName, Module, Args, Options).
+
+%%% ----------------------------------------------------------
+%%% # init(_)
+%%% ----------------------------------------------------------
+
+init(_) ->
+ {ok, #state{}}.
+
+%%% ----------------------------------------------------------
+%%% # handle_call(Request, From, State)
+%%% ----------------------------------------------------------
+
+%% Enqueue a new request.
+handle_call(?REQUEST(Type, Name, Req, Max, Timeout),
+ From,
+ #state{queue = QD} = State) ->
+ T = find(Name, QD),
+ nq(queued(T) =< Max, T, {Type, From}, Name, Req, Timeout, State);
+
+handle_call(Request, From, State) ->
+ {reply, call(Request, From, State), State}.
+
+%% call/3
+
+call(?CARP(Name), _, #state{queue = QD}) ->
+ pcar(find(Name, QD));
+
+call(state, _, State) ->
+ State;
+
+call(uptime, _, #state{time = T}) ->
+ diameter_lib:now_diff(T);
+
+call({flush, Name}, _, #state{queue = QD}) ->
+ cancel(find(Name, QD));
+
+call(pending, _, #state{pending = N}) ->
+ N;
+
+call({pending, Name}, _, #state{queue = QD}) ->
+ queued(find(Name, QD));
+
+call(queues, _, #state{queue = QD}) ->
+ fetch_keys(QD);
+
+call({pids, Name}, _, #state{queue = QD}) ->
+ plist(find(Name, QD));
+
+call(Req, From, _State) -> %% ignore
+ ?UNEXPECTED(handle_call, [Req, From]),
+ nok.
+
+%%% ----------------------------------------------------------
+%%% handle_cast(Request, State)
+%%% ----------------------------------------------------------
+
+handle_cast(Msg, State) ->
+ ?UNEXPECTED([Msg]),
+ {noreply, State}.
+
+%%% ----------------------------------------------------------
+%%% handle_info(Request, State)
+%%% ----------------------------------------------------------
+
+handle_info(Request, State) ->
+ {noreply, info(Request, State)}.
+
+%% info/2
+
+%% A request has completed execution or timed out.
+info({'DOWN', MRef, process, Pid, Info},
+ #state{pending = N,
+ monitor = MD,
+ queue = QD}
+ = State) ->
+ {Name, From} = fetch(MRef, MD),
+ reply(From, rc(Info)),
+ State#state{pending = N-1,
+ monitor = erase(MRef, MD),
+ queue = dq(fetch(Name, QD), Pid, Info, Name, QD)};
+
+info(Info, State) ->
+ ?UNEXPECTED(handle_info, [Info]),
+ State.
+
+reply({call, From}, T) ->
+ gen_server:reply(From, T);
+reply(cast, _) ->
+ ok.
+
+rc({value, T}) ->
+ T;
+rc(_) ->
+ timeout.
+
+%%% ----------------------------------------------------------
+%%% code_change(OldVsn, State, Extra)
+%%% ----------------------------------------------------------
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%% ----------------------------------------------------------
+%%% terminate(Reason, State)
+%%% ----------------------------------------------------------
+
+terminate(_Reason, _State)->
+ ok.
+
+%%% ---------------------------------------------------------
+%%% INTERNAL FUNCTIONS
+%%% ---------------------------------------------------------
+
+%% queued/1
+
+queued({ok, {N,_}}) ->
+ N;
+queued(error) ->
+ 0.
+
+%% nq/7
+
+%% Maximum number of pending requests exceeded ...
+nq(false, _, _, _Name, _Req, _Timeout, State) ->
+ {reply, rejected, State};
+
+%% ... or not.
+nq(true, T, From, Name, Req, Timeout, #state{pending = N,
+ monitor = MD,
+ queue = QD}
+ = State) ->
+ Ref = make_ref(),
+ Pid = init(Ref, Req, timeout(Timeout, T)),
+ MRef = erlang:monitor(process, Pid),
+ {noreply, State#state{pending = N+1,
+ monitor = store(MRef, {Name, from(From)}, MD),
+ queue = store(Name, nq(T, {Pid, Ref}), QD)}}.
+
+from({call, _} = T) ->
+ T;
+from({cast = T, From}) ->
+ gen_server:reply(From, ok),
+ T.
+
+%% nq/2
+
+%% Other requests in the queue: append.
+nq({ok, {N,Q}}, T) ->
+ {N+1, queue:in(T,Q)};
+
+%% Queue is empty: start execution.
+nq(error, T) ->
+ go(T),
+ {1, queue:from_list([T])}.
+
+%% Don't timeout if the request is evaluated immediately so as to
+%% avoid a race between getting a 'go' and a 'timeout'. Queueing a
+%% request in an empty queue always results in execution.
+timeout(_, error) ->
+ infinity;
+timeout(Timeout, _) ->
+ Timeout.
+
+%% dq/5
+%%
+%% A request process has terminated.
+
+dq({N,Q}, Pid, _Info, Name, QD) ->
+ {{value, T}, TQ} = queue:out(Q),
+ dq(N-1, Pid, T, TQ, Name, QD).
+
+%% dq/6
+
+%% Request was at the head of the queue: start another.
+dq(N, Pid, {Pid, _}, TQ, Name, QD) ->
+ dq(N, TQ, Name, QD);
+
+%% Or not: remove the offender from the queue.
+dq(N, Pid, T, TQ, Name, QD) ->
+ store(Name, {N, req(Pid, queue:from_list([T]), TQ)}, QD).
+
+%% dq/4
+
+%% Queue is empty: erase.
+dq(0, TQ, Name, QD) ->
+ true = queue:is_empty(TQ), %% assert
+ erase(Name, QD);
+
+%% Start the next request.
+dq(N, TQ, Name, QD) ->
+ go(queue:head(TQ)),
+ store(Name, {N, TQ}, QD).
+
+%% req/3
+%%
+%% Find and remove the queue element for the specified pid.
+
+req(Pid, HQ, Q) ->
+ {{value, T}, TQ} = queue:out(Q),
+ req(Pid, T, HQ, TQ).
+
+req(Pid, {Pid, _}, HQ, TQ) ->
+ queue:join(HQ, TQ);
+req(Pid, T, HQ, TQ) ->
+ req(Pid, queue:in(T,HQ), TQ).
+
+%% go/1
+
+go({Pid, Ref}) ->
+ Pid ! {Ref, ok}.
+
+%% init/4
+%%
+%% Start the dedicated process for handling a request. The exit value
+%% is as promised by carp/1.
+
+init(Ref, Req, Timeout) ->
+ spawn(fun() -> exit(i(Ref, Req, Timeout)) end).
+
+i(Ref, Req, Timeout) ->
+ Timer = send_timeout(Ref, Timeout),
+ MRef = erlang:monitor(process, ?SERVER),
+ receive
+ {Ref, ok} -> %% Do the deed.
+ %% Ensure we don't leave messages in the mailbox since the
+ %% request itself might receive. Alternatively, could have
+ %% done the eval in a new process but then we'd have to
+ %% relay messages arriving at this one.
+ cancel_timer(Timer),
+ erlang:demonitor(MRef, [flush]),
+ %% Ref is to ensure that we don't extract any message that
+ %% a client may have sent after retrieving self() with
+ %% carp/1, there being no guarantee that the message
+ %% banged by go/1 is received before the pid becomes
+ %% accessible.
+ {value, eval(Req)};
+ {Ref, timeout = T} ->
+ T;
+ {'DOWN', MRef, process, _Pid, _Info} = D -> %% server death
+ D
+ end.
+
+send_timeout(_Ref, infinity = No) ->
+ No;
+send_timeout(Ref, Ms) ->
+ Msg = {Ref, timeout},
+ TRef = erlang:send_after(Ms, self(), Msg),
+ {TRef, Msg}.
+
+cancel_timer(infinity = No) ->
+ No;
+cancel_timer({TRef, Msg}) ->
+ flush(Msg, erlang:cancel_timer(TRef)).
+
+flush(Msg, false) -> %% Message has already been sent ...
+ %% 'error' should never happen but crash if it does so as not to
+ %% hang the process.
+ ok = receive Msg -> ok after ?TIMEOUT -> error end;
+flush(_, _) -> %% ... or not.
+ ok.
+
+eval({M,F,A}) ->
+ apply(M,F,A);
+eval([Fun | Args]) ->
+ apply(Fun, Args);
+eval({Fun, A}) ->
+ Fun(A);
+eval(Fun) ->
+ Fun().
+
+%% pcar/1
+
+pcar({ok, {_,Q}}) ->
+ {Pid, _Ref} = queue:head(Q),
+ {value, Pid};
+pcar(error) ->
+ false.
+
+%% plist/1
+
+plist({ok, {_,Q}}) ->
+ lists:map(fun({Pid, _Ref}) -> Pid end, queue:to_list(Q));
+plist(error) ->
+ [].
+
+%% cancel/1
+%%
+%% Cancel all but the active request from the named queue. Return the
+%% number of requests cancelled.
+
+%% Just send timeout messages to each request to make them die. Note
+%% that these are guaranteed to arrive before a go message after the
+%% current request completes since both messages are sent from the
+%% server process.
+cancel({ok, {N,Q}}) ->
+ {_,TQ} = queue:split(1,Q),
+ foreach(fun({Pid, Ref}) -> Pid ! {Ref, timeout} end, N-1, TQ),
+ N-1;
+cancel(error) ->
+ 0.
+
+%% foreach/3
+
+foreach(_, 0, _) ->
+ ok;
+foreach(Fun, N, Q) ->
+ Fun(queue:head(Q)),
+ foreach(Fun, N-1, queue:tail(Q)).
+
+%% call/1
+
+%% gen_server:call/3 will exit if the target process dies.
+call(Request) ->
+ try
+ gen_server:call(?SERVER, Request, ?TIMEOUT)
+ catch
+ exit: Reason ->
+ {error, Reason}
+ end.
+
+%% dict-like table manipulation.
+
+erase(Key, Dict) ->
+ ets:delete(Dict, Key),
+ Dict.
+
+fetch(Key, Dict) ->
+ {ok, V} = find(Key, Dict),
+ V.
+
+fetch_keys(Dict) ->
+ ets:foldl(fun({K,_}, Acc) -> [K | Acc] end, [], Dict).
+
+find(Key, Dict) ->
+ case ets:lookup(Dict, Key) of
+ [{Key, V}] ->
+ {ok, V};
+ [] ->
+ error
+ end.
+
+new() ->
+ ets:new(?MODULE, [set]).
+
+store(Key, Value, Dict) ->
+ store({Key, Value}, Dict).
+
+store({_,_} = T, Dict) ->
+ ets:insert(Dict, T),
+ Dict.
+
+%% gen_call/1
+
+gen_call(Server, Req) ->
+ gen_call(Server, Req, infinity).
+
+gen_call(Server, Req, Timeout) ->
+ try
+ gen_server:call(Server, Req, Timeout)
+ catch
+ exit: _ ->
+ timeout
+ end.