aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/app/diameter_sync.erl
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2011-05-18 18:29:12 +0200
committerAnders Svensson <[email protected]>2011-05-18 18:29:12 +0200
commit3c15ff32e89e401b4dde2b8acc9699be2614b996 (patch)
tree184dc988fb2ab3af04a532bc59cc794a8d74fbd3 /lib/diameter/src/app/diameter_sync.erl
parentb1e768e86593178810c8a0b3c38443dcf6be5181 (diff)
downloadotp-3c15ff32e89e401b4dde2b8acc9699be2614b996.tar.gz
otp-3c15ff32e89e401b4dde2b8acc9699be2614b996.tar.bz2
otp-3c15ff32e89e401b4dde2b8acc9699be2614b996.zip
Initial commit of the diameter application.
The application provides an implementation of the Diameter protocol as defined in RFC 3588.
Diffstat (limited to 'lib/diameter/src/app/diameter_sync.erl')
-rw-r--r--lib/diameter/src/app/diameter_sync.erl555
1 files changed, 555 insertions, 0 deletions
diff --git a/lib/diameter/src/app/diameter_sync.erl b/lib/diameter/src/app/diameter_sync.erl
new file mode 100644
index 0000000000..f7777ae809
--- /dev/null
+++ b/lib/diameter/src/app/diameter_sync.erl
@@ -0,0 +1,555 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+%%
+%% This module implements a server that serializes requests in named
+%% queues. A request is an MFA or fun and a name can be any term. A
+%% request is applied in a dedicated process that terminates when
+%% the request function returns.
+%%
+
+-module(diameter_sync).
+-behaviour(gen_server).
+
+-export([call/4, call/5,
+ cast/4, cast/5,
+ carp/1, carp/2]).
+
+%% supervisor callback
+-export([start_link/0]).
+
+%% gen_server interface
+-export([init/1,
+ terminate/2,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3]).
+
+%% test/debug
+-export([state/0,
+ uptime/0,
+ flush/1,
+ pending/0,
+ pending/1,
+ queues/0,
+ pids/1]).
+
+-include("diameter_internal.hrl").
+
+%% Locally registered server name.
+-define(SERVER, ?MODULE).
+
+%% Message to the server to queue a request ...
+-define(REQUEST(CallOrCast, Name, Req, Max, Timeout),
+ {request, CallOrCast, Name, Req, Max, Timeout}).
+
+%% ... and to retrieve the pid of the prevailing request process.
+-define(CARP(Name),
+ {carp, Name}).
+
+%% Forever ...
+-define(TIMEOUT, 30000).
+
+%% Server state.
+-record(state,
+ {time = now(),
+ pending = 0 :: non_neg_integer(), %% outstanding requests
+ monitor = new() :: ets:tid(), %% MonitorRef -> {Name, From}
+ queue = new() :: ets:tid()}). %% Name -> queue of {Pid, Ref}
+
+%% ----------------------------------------------------------
+%% # call(Node, Name, Req, Max, Timeout)
+%% # call(Name, Req, Max, Timeout)
+%%
+%% Input: Name = term() identifying the queue in which the request is
+%% to be evaluated.
+%% Req = {M,F,A}
+%% | {Fun, Arg}
+%% | [Fun | Args]
+%% | Fun
+%% Max = Upper bound for the number of outstanding requests
+%% in the named queue for Req to be queued.
+%% If more than this number are in the queue then
+%% 'rejected' is returned to the caller. Can be any
+%% term but integer() | infinity is sufficient.
+%% Timeout = 32 bit integer() number of milliseconds after which
+%% request is cancelled (if not already started), causing
+%% 'timeout' to be returned to the caller.
+%% | infinity
+%%
+%% Output: Req() | rejected | timeout
+%%
+%% Description: Serialize a request in a named queue. Note that if
+%% 'timeout' is returned and the request itself does not
+%% return this atom then request has not been evaluated.
+%% ----------------------------------------------------------
+
+call(Name, Req, Max, Timeout) ->
+ call(node(), Name, Req, Max, Timeout).
+
+call(Node, Name, Req, Max, Timeout) ->
+ gen_call({?SERVER, Node}, ?REQUEST(call, Name, Req, Max, Timeout)).
+
+%%% ----------------------------------------------------------
+%%% # cast(Node, Name, Req, Max, Timeout)
+%%% # cast(Name, Req, Max, Timeout)
+%%%
+%%% Output: ok | rejected | timeout
+%%%
+%%% Description: Serialize a request without returning the result to the
+%%% caller. Returns after the task is queued.
+%%% ----------------------------------------------------------
+
+cast(Name, Req, Max, Timeout) ->
+ cast(node(), Name, Req, Max, Timeout).
+
+cast(Node, Name, Req, Max, Timeout) ->
+ gen_call({?SERVER, Node}, ?REQUEST(cast, Name, Req, Max, Timeout)).
+
+%% 'timeout' is only return if the server process that processes
+%% requests isn't alive. Ditto for call/carp.
+
+%%% ----------------------------------------------------------
+%%% # carp(Node, Name)
+%%% # carp(Name)
+%%%
+%%% Output: {value, Pid} | false | timeout
+%%%
+%%% Description: Return the pid of the process processing the task
+%%% at the head of the named queue. Note that the value
+%%% returned by subsequent calls changes as tasks are
+%%% completed, each task executing in a dedicated
+%%% process. The exit value of this process will be
+%%% {value, Req()} if the task returns.
+%%% ----------------------------------------------------------
+
+%% The intention of this is to let a process enqueue a task that waits
+%% for a message before completing, the target pid being retrieved
+%% with carp/[12].
+
+carp(Name) ->
+ carp(node(), Name).
+
+carp(Node, Name) ->
+ gen_call({?SERVER, Node}, ?CARP(Name)).
+
+%%% ---------------------------------------------------------
+%%% EXPORTED INTERNAL FUNCTIONS
+%%% ---------------------------------------------------------
+
+state() ->
+ call(state).
+
+uptime() ->
+ call(uptime).
+
+flush(Name) ->
+ call({flush, Name}).
+
+pending() ->
+ call(pending).
+
+pending(Name) ->
+ call({pending, Name}).
+
+queues() ->
+ call(queues).
+
+pids(Name) ->
+ call({pids, Name}).
+
+%%% ----------------------------------------------------------
+%%% # start_link()
+%%% ----------------------------------------------------------
+
+start_link() ->
+ ServerName = {local, ?SERVER},
+ Module = ?MODULE,
+ Args = [],
+ Options = [{spawn_opt, diameter_lib:spawn_opts(server, [])}],
+ gen_server:start_link(ServerName, Module, Args, Options).
+
+%%% ----------------------------------------------------------
+%%% # init(_)
+%%% ----------------------------------------------------------
+
+init(_) ->
+ {ok, #state{}}.
+
+%%% ----------------------------------------------------------
+%%% # handle_call(Request, From, State)
+%%% ----------------------------------------------------------
+
+%% Enqueue a new request.
+handle_call(?REQUEST(Type, Name, Req, Max, Timeout),
+ From,
+ #state{queue = QD} = State) ->
+ T = find(Name, QD),
+ nq(queued(T) =< Max, T, {Type, From}, Name, Req, Timeout, State);
+
+handle_call(Request, _From, State) ->
+ {reply, call(Request, State), State}.
+
+%% call/2
+
+call(?CARP(Name), #state{queue = QD}) ->
+ pcar(find(Name, QD));
+
+call(state, State) ->
+ State;
+
+call(uptime, #state{time = T}) ->
+ diameter_lib:now_diff(T);
+
+call({flush, Name}, #state{queue = QD}) ->
+ cancel(find(Name, QD));
+
+call(pending, #state{pending = N}) ->
+ N;
+
+call({pending, Name}, #state{queue = QD}) ->
+ queued(find(Name, QD));
+
+call(queues, #state{queue = QD}) ->
+ fetch_keys(QD);
+
+call({pids, Name}, #state{queue = QD}) ->
+ plist(find(Name, QD));
+
+call(Req, _State) -> %% ignore
+ warning_msg("received unexpected request:~n~w", [Req]),
+ nok.
+
+%%% ----------------------------------------------------------
+%%% handle_cast(Request, State)
+%%% ----------------------------------------------------------
+
+handle_cast(Msg, State) ->
+ warning_msg("received unexpected message:~n~w", [Msg]),
+ {noreply, State}.
+
+%%% ----------------------------------------------------------
+%%% handle_info(Request, State)
+%%% ----------------------------------------------------------
+
+handle_info(Request, State) ->
+ {noreply, info(Request, State)}.
+
+%% info/2
+
+%% A request has completed execution or timed out.
+info({'DOWN', MRef, process, Pid, Info},
+ #state{pending = N,
+ monitor = MD,
+ queue = QD}
+ = State) ->
+ {Name, From} = fetch(MRef, MD),
+ reply(From, rc(Info)),
+ State#state{pending = N-1,
+ monitor = erase(MRef, MD),
+ queue = dq(fetch(Name, QD), Pid, Info, Name, QD)};
+
+info(Info, State) ->
+ warning_msg("received unknown info:~n~w", [Info]),
+ State.
+
+reply({call, From}, T) ->
+ gen_server:reply(From, T);
+reply(cast, _) ->
+ ok.
+
+rc({value, T}) ->
+ T;
+rc(_) ->
+ timeout.
+
+%%% ----------------------------------------------------------
+%%% code_change(OldVsn, State, Extra)
+%%% ----------------------------------------------------------
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%% ----------------------------------------------------------
+%%% terminate(Reason, State)
+%%% ----------------------------------------------------------
+
+terminate(_Reason, _State)->
+ ok.
+
+%%% ---------------------------------------------------------
+%%% INTERNAL FUNCTIONS
+%%% ---------------------------------------------------------
+
+%% queued/1
+
+queued({ok, {N,_}}) ->
+ N;
+queued(error) ->
+ 0.
+
+%% nq/7
+
+%% Maximum number of pending requests exceeded ...
+nq(false, _, _, _Name, _Req, _Timeout, State) ->
+ {reply, rejected, State};
+
+%% ... or not.
+nq(true, T, From, Name, Req, Timeout, #state{pending = N,
+ monitor = MD,
+ queue = QD}
+ = State) ->
+ Ref = make_ref(),
+ Pid = init(Ref, Req, timeout(Timeout, T)),
+ MRef = erlang:monitor(process, Pid),
+ {noreply, State#state{pending = N+1,
+ monitor = store(MRef, {Name, from(From)}, MD),
+ queue = store(Name, nq(T, {Pid, Ref}), QD)}}.
+
+from({call, _} = T) ->
+ T;
+from({cast = T, From}) ->
+ gen_server:reply(From, ok),
+ T.
+
+%% nq/2
+
+%% Other requests in the queue: append.
+nq({ok, {N,Q}}, T) ->
+ {N+1, queue:in(T,Q)};
+
+%% Queue is empty: start execution.
+nq(error, T) ->
+ go(T),
+ {1, queue:from_list([T])}.
+
+%% Don't timeout if the request is evaluated immediately so as to
+%% avoid a race between getting a 'go' and a 'timeout'. Queueing a
+%% request in an empty queue always results in execution.
+timeout(_, error) ->
+ infinity;
+timeout(Timeout, _) ->
+ Timeout.
+
+%% dq/5
+%%
+%% A request process has terminated.
+
+dq({N,Q}, Pid, _Info, Name, QD) ->
+ {{value, T}, TQ} = queue:out(Q),
+ dq(N-1, Pid, T, TQ, Name, QD).
+
+%% dq/6
+
+%% Request was at the head of the queue: start another.
+dq(N, Pid, {Pid, _}, TQ, Name, QD) ->
+ dq(N, TQ, Name, QD);
+
+%% Or not: remove the offender from the queue.
+dq(N, Pid, T, TQ, Name, QD) ->
+ store(Name, {N, req(Pid, queue:from_list([T]), TQ)}, QD).
+
+%% dq/4
+
+%% Queue is empty: erase.
+dq(0, TQ, Name, QD) ->
+ true = queue:is_empty(TQ), %% assert
+ erase(Name, QD);
+
+%% Start the next request.
+dq(N, TQ, Name, QD) ->
+ go(queue:head(TQ)),
+ store(Name, {N, TQ}, QD).
+
+%% req/3
+%%
+%% Find and remove the queue element for the specified pid.
+
+req(Pid, HQ, Q) ->
+ {{value, T}, TQ} = queue:out(Q),
+ req(Pid, T, HQ, TQ).
+
+req(Pid, {Pid, _}, HQ, TQ) ->
+ queue:join(HQ, TQ);
+req(Pid, T, HQ, TQ) ->
+ req(Pid, queue:in(T,HQ), TQ).
+
+%% go/1
+
+go({Pid, Ref}) ->
+ Pid ! {Ref, ok}.
+
+%% init/4
+%%
+%% Start the dedicated process for handling a request. The exit value
+%% is as promised by carp/1.
+
+init(Ref, Req, Timeout) ->
+ spawn(fun() -> exit(i(Ref, Req, Timeout)) end).
+
+i(Ref, Req, Timeout) ->
+ Timer = send_timeout(Ref, Timeout),
+ MRef = erlang:monitor(process, ?SERVER),
+ receive
+ {Ref, ok} -> %% Do the deed.
+ %% Ensure we don't leave messages in the mailbox since the
+ %% request itself might receive. Alternatively, could have
+ %% done the eval in a new process but then we'd have to
+ %% relay messages arriving at this one.
+ cancel_timer(Timer),
+ erlang:demonitor(MRef, [flush]),
+ %% Ref is to ensure that we don't extract any message that
+ %% a client may have sent after retrieving self() with
+ %% carp/1, there being no guarantee that the message
+ %% banged by go/1 is received before the pid becomes
+ %% accessible.
+ {value, eval(Req)};
+ {Ref, timeout = T} ->
+ T;
+ {'DOWN', MRef, process, _Pid, _Info} = D -> %% server death
+ D
+ end.
+
+send_timeout(_Ref, infinity = No) ->
+ No;
+send_timeout(Ref, Ms) ->
+ Msg = {Ref, timeout},
+ TRef = erlang:send_after(Ms, self(), Msg),
+ {TRef, Msg}.
+
+cancel_timer(infinity = No) ->
+ No;
+cancel_timer({TRef, Msg}) ->
+ flush(Msg, erlang:cancel_timer(TRef)).
+
+flush(Msg, false) -> %% Message has already been sent ...
+ %% 'error' should never happen but crash if it does so as not to
+ %% hang the process.
+ ok = receive Msg -> ok after ?TIMEOUT -> error end;
+flush(_, _) -> %% ... or not.
+ ok.
+
+eval({M,F,A}) ->
+ apply(M,F,A);
+eval([Fun | Args]) ->
+ apply(Fun, Args);
+eval({Fun, A}) ->
+ Fun(A);
+eval(Fun) ->
+ Fun().
+
+%% pcar/1
+
+pcar({ok, {_,Q}}) ->
+ {Pid, _Ref} = queue:head(Q),
+ {value, Pid};
+pcar(error) ->
+ false.
+
+%% plist/1
+
+plist({ok, {_,Q}}) ->
+ lists:map(fun({Pid, _Ref}) -> Pid end, queue:to_list(Q));
+plist(error) ->
+ [].
+
+%% cancel/1
+%%
+%% Cancel all but the active request from the named queue. Return the
+%% number of requests cancelled.
+
+%% Just send timeout messages to each request to make them die. Note
+%% that these are guaranteed to arrive before a go message after the
+%% current request completes since both messages are sent from the
+%% server process.
+cancel({ok, {N,Q}}) ->
+ {_,TQ} = queue:split(1,Q),
+ foreach(fun({Pid, Ref}) -> Pid ! {Ref, timeout} end, N-1, TQ),
+ N-1;
+cancel(error) ->
+ 0.
+
+%% foreach/3
+
+foreach(_, 0, _) ->
+ ok;
+foreach(Fun, N, Q) ->
+ Fun(queue:head(Q)),
+ foreach(Fun, N-1, queue:tail(Q)).
+
+%% call/1
+
+%% gen_server:call/3 will exit if the target process dies.
+call(Request) ->
+ try
+ gen_server:call(?SERVER, Request, ?TIMEOUT)
+ catch
+ exit: Reason ->
+ {error, Reason}
+ end.
+
+%% dict-like table manipulation.
+
+erase(Key, Dict) ->
+ ets:delete(Dict, Key),
+ Dict.
+
+fetch(Key, Dict) ->
+ {ok, V} = find(Key, Dict),
+ V.
+
+fetch_keys(Dict) ->
+ ets:foldl(fun({K,_}, Acc) -> [K | Acc] end, [], Dict).
+
+find(Key, Dict) ->
+ case ets:lookup(Dict, Key) of
+ [{Key, V}] ->
+ {ok, V};
+ [] ->
+ error
+ end.
+
+new() ->
+ ets:new(?MODULE, [set]).
+
+store(Key, Value, Dict) ->
+ store({Key, Value}, Dict).
+
+store({_,_} = T, Dict) ->
+ ets:insert(Dict, T),
+ Dict.
+
+%% gen_call/1
+
+gen_call(Server, Req) ->
+ gen_call(Server, Req, infinity).
+
+gen_call(Server, Req, Timeout) ->
+ try
+ gen_server:call(Server, Req, Timeout)
+ catch
+ exit: _ ->
+ timeout
+ end.
+
+%% warning_msg/2
+
+warning_msg(F, A) ->
+ ?diameter_warning("~p: " ++ F, [?MODULE | A]).