aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base/diameter_service.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base/diameter_service.erl')
-rw-r--r--lib/diameter/src/base/diameter_service.erl2689
1 files changed, 673 insertions, 2016 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 725cccda1e..f1342df16c 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2012. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -24,33 +24,38 @@
-module(diameter_service).
-behaviour(gen_server).
+%% towards diameter_service_sup
+-export([start_link/1]).
+
+%% towards diameter
+-export([subscribe/1,
+ unsubscribe/1,
+ services/0,
+ info/2]).
+
+%% towards diameter_config
-export([start/1,
stop/1,
start_transport/2,
- stop_transport/2,
- info/2,
- call/4]).
+ stop_transport/2]).
-%% towards diameter_watchdog
--export([receive_message/3]).
+%% towards diameter_peer
+-export([notify/2]).
-%% service supervisor
--export([start_link/1]).
+%% towards diameter_traffic
+-export([find_incoming_app/4,
+ pick_peer/3]).
--export([subscribe/1,
- unsubscribe/1,
+%% test/debug
+-export([services/1,
subscriptions/1,
subscriptions/0,
- services/0,
- services/1,
- whois/1]).
-
-%% test/debug
--export([call_module/3,
+ call_module/3,
+ whois/1,
state/1,
uptime/1]).
-%%% gen_server callbacks
+%% gen_server callbacks
-export([init/1,
handle_call/3,
handle_cast/2,
@@ -58,21 +63,10 @@
terminate/2,
code_change/3]).
-%% Other callbacks.
--export([send/1]).
-
-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").
-%% The "old" states maintained in this module historically.
--define(STATE_UP, up).
--define(STATE_DOWN, down).
-
--type op_state() :: ?STATE_UP
- | ?STATE_DOWN.
-
-%% The RFC 3539 watchdog states that are now maintained, albeit
-%% along with the old up/down. okay = up, else down.
+%% RFC 3539 watchdog states.
-define(WD_INITIAL, initial).
-define(WD_OKAY, okay).
-define(WD_SUSPECT, suspect).
@@ -86,11 +80,8 @@
| ?WD_REOPEN.
-define(DEFAULT_TC, 30000). %% RFC 3588 ch 2.1
--define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests
-define(RESTART_TC, 1000). %% if restart was this recent
--define(RELAY, ?DIAMETER_DICT_RELAY).
-
%% Used to be able to swap this with anything else dict-like but now
%% rely on the fact that a service's #state{} record does not change
%% in storing in it ?STATE table and not always going through the
@@ -98,95 +89,71 @@
%% a ?Dict don't change the handle to it.
-define(Dict, diameter_dict).
-%% Table containing outgoing requests for which a reply has yet to be
-%% received.
--define(REQUEST_TABLE, diameter_request).
-
%% Maintains state in a table. In contrast to previously, a service's
%% stat is not constant and is accessed outside of the service
%% process.
-define(STATE_TABLE, ?MODULE).
+%% The default sequence mask.
+-define(NOMASK, {0,32}).
+
+%% The default restrict_connections.
+-define(RESTRICT, nodes).
+
%% Workaround for dialyzer's lack of understanding of match specs.
-type match(T)
- :: T | '_' | '$1' | '$2' | '$3' | '$4'.
-
-%% State of service gen_server.
+ :: T | '_' | '$1' | '$2'.
+
+%% State of service gen_server. Note that the state term itself
+%% doesn't change, which is relevant for the stateless application
+%% callbacks since the state is retrieved from ?STATE_TABLE from
+%% outside the service process. The pid in the service record is used
+%% to determine whether or not we need to call the process for a
+%% pick_peer callback in the statefull case.
-record(state,
{id = now(),
- service_name, %% as passed to start_service/2, key in ?STATE_TABLE
+ service_name :: diameter:service_name(), %% key in ?STATE_TABLE
service :: #diameter_service{},
- peerT = ets_new(peers) :: ets:tid(), %% #peer{} at start_fsm
- connT = ets_new(conns) :: ets:tid(), %% #conn{} at connection_up
- share_peers = false :: boolean(), %% broadcast peers to remote nodes?
- use_shared_peers = false :: boolean(), %% use broadcasted peers?
- shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
- local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...]
- monitor = false :: false | pid()}). %% process to die with
-%% shared_peers reflects the peers broadcast from remote nodes. Note
-%% that the state term itself doesn't change, which is relevant for
-%% the stateless application callbacks since the state is retrieved
-%% from ?STATE_TABLE from outside the service process. The pid in the
-%% service record is used to determine whether or not we need to call
-%% the process for a pick_peer callback.
-
-%% Record representing a watchdog process.
--record(peer,
+ watchdogT = ets_new(watchdogs) %% #watchdog{} at start
+ :: ets:tid(),
+ peerT = ets_new(peers) %% #peer{pid = TPid} at okay/reopen
+ :: ets:tid(),
+ shared_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...]
+ :: ets:tid(),
+ local_peers = ?Dict:new() %% Alias -> [{TPid, Caps}, ...]
+ :: ets:tid(),
+ monitor = false :: false | pid(), %% process to die with
+ options
+ :: [{sequence, diameter:sequence()} %% sequence mask
+ | {restrict_connections, diameter:restriction()}
+ | {share_peers, boolean()} %% broadcast peers to remote nodes?
+ | {use_shared_peers, boolean()}]}).%% use broadcasted peers?
+%% shared_peers reflects the peers broadcast from remote nodes.
+
+%% Record representing an RFC 3539 watchdog process implemented by
+%% diameter_watchdog.
+-record(watchdog,
{pid :: match(pid()),
type :: match(connect | accept),
ref :: match(reference()), %% key into diameter_config
options :: match([diameter:transport_opt()]),%% from start_transport
- op_state = {?STATE_DOWN, ?WD_INITIAL}
- :: match(op_state() | {op_state(), wd_state()}),
+ state = ?WD_INITIAL :: match(wd_state()),
started = now(), %% at process start
- conn = false :: match(boolean() | pid())}).
- %% true at accept, pid() at connection_up (connT key)
+ peer = false :: match(boolean() | pid())}).
+ %% true at accepted, pid() at okay/reopen
-%% Record representing a peer_fsm process.
--record(conn,
+%% Record representing an Peer State Machine processes implemented by
+%% diameter_peer_fsm.
+-record(peer,
{pid :: pid(),
apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias}
caps :: #diameter_caps{},
started = now(), %% at process start
- peer :: pid()}). %% key into peerT
-
-%% Record stored in diameter_request for each outgoing request.
--record(request,
- {from, %% arg 2 of handle_call/3
- handler :: match(pid()), %% request process
- transport :: match(pid()), %% peer process
- caps :: match(#diameter_caps{}),
- app :: match(diameter:app_alias()), %% #diameter_app.alias
- dictionary :: match(module()), %% #diameter_app.dictionary
- module :: match([module() | list()]),
- %% #diameter_app.module
- filter :: match(diameter:peer_filter()),
- packet :: match(#diameter_packet{})}).
-
-%% Record call/4 options are parsed into.
--record(options,
- {filter = none :: diameter:peer_filter(),
- extra = [] :: list(),
- timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF,
- detach = false :: boolean()}).
-
-%% Since RFC 3588 requires that a Diameter agent not modify End-to-End
-%% Identifiers, the possibility of explicitly setting an End-to-End
-%% Identifier would be needed to be able to implement an agent in
-%% which one side of the communication is not implemented on top of
-%% diameter. For example, Diameter being sent or received encapsulated
-%% in some other protocol, or even another Diameter stack in a
-%% non-Erlang environment. (Not that this is likely to be a normal
-%% case.)
-%%
-%% The implemented solution is not an option but to respect any header
-%% values set in a diameter_header record returned from a
-%% prepare_request callback. A call to diameter:call/4 can communicate
-%% values to the callback using the 'extra' option if so desired.
+ watchdog :: pid()}). %% key into watchdogT
-%%% ---------------------------------------------------------------------------
-%%% # start(SvcName)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # start/1
+%% ---------------------------------------------------------------------------
start(SvcName) ->
diameter_service_sup:start_child(SvcName).
@@ -197,9 +164,9 @@ start_link(SvcName) ->
%% Put the arbitrary term SvcName in a list in case we ever want to
%% send more than this and need to distinguish old from new.
-%%% ---------------------------------------------------------------------------
-%%% # stop(SvcName)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # stop/1
+%% ---------------------------------------------------------------------------
stop(SvcName) ->
case whois(SvcName) of
@@ -215,165 +182,43 @@ stop(ok, Pid) ->
stop(No, _) ->
No.
-%%% ---------------------------------------------------------------------------
-%%% # start_transport(SvcName, {Ref, Type, Opts})
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # start_transport/3
+%% ---------------------------------------------------------------------------
-start_transport(SvcName, {_,_,_} = T) ->
+start_transport(SvcName, {_Ref, _Type, _Opts} = T) ->
call_service_by_name(SvcName, {start, T}).
-%%% ---------------------------------------------------------------------------
-%%% # stop_transport(SvcName, Refs)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # stop_transport/2
+%% ---------------------------------------------------------------------------
stop_transport(_, []) ->
ok;
stop_transport(SvcName, [_|_] = Refs) ->
call_service_by_name(SvcName, {stop, Refs}).
-%%% ---------------------------------------------------------------------------
-%%% # info(SvcName, Item)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # info/2
+%% ---------------------------------------------------------------------------
info(SvcName, Item) ->
- info_rc(call_service_by_name(SvcName, {info, Item})).
-
-info_rc({error, _}) ->
- undefined;
-info_rc(Info) ->
- Info.
-
-%%% ---------------------------------------------------------------------------
-%%% # receive_message(TPid, Pkt, MessageData)
-%%% ---------------------------------------------------------------------------
-
-%% Handle an incoming message in the watchdog process. This used to
-%% come through the service process but this avoids that becoming a
-%% bottleneck.
-
-receive_message(TPid, Pkt, T)
- when is_pid(TPid) ->
- #diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
- recv(R, (not R) andalso lookup_request(Pkt, TPid), TPid, Pkt, T).
-
-%% Incoming request ...
-recv(true, false, TPid, Pkt, T) ->
- try
- spawn(fun() -> recv_request(TPid, Pkt, T) end)
- catch
- error: system_limit = E -> %% discard
- ?LOG({error, E}, now())
- end;
-
-%% ... answer to known request ...
-recv(false, #request{from = {_, Ref}, handler = Pid} = Req, _, Pkt, _) ->
- Pid ! {answer, Ref, Req, Pkt};
-%% Note that failover could have happened prior to this message being
-%% received and triggering failback. That is, both a failover message
-%% and answer may be on their way to the handler process. In the worst
-%% case the request process gets notification of the failover and
-%% sends to the alternate peer before an answer arrives, so it's
-%% always the case that we can receive more than one answer after
-%% failover. The first answer received by the request process wins,
-%% any others are discarded.
-
-%% ... or not.
-recv(false, false, _, _, _) ->
- ok.
-
-%%% ---------------------------------------------------------------------------
-%%% # call(SvcName, App, Msg, Options)
-%%% ---------------------------------------------------------------------------
-
-call(SvcName, App, Msg, Options)
- when is_list(Options) ->
- Rec = make_options(Options),
- Ref = make_ref(),
- Caller = {self(), Ref},
- Fun = fun() -> exit({Ref, call(SvcName, App, Msg, Rec, Caller)}) end,
- try spawn_monitor(Fun) of
- {_, MRef} ->
- recv(MRef, Ref, Rec#options.detach, false)
- catch
- error: system_limit = E ->
- {error, E}
- end.
-
-%% Don't rely on gen_server:call/3 for the timeout handling since it
-%% makes no guarantees about not leaving a reply message in the
-%% mailbox if we catch its exit at timeout. It currently *can* do so,
-%% which is also undocumented.
-
-recv(MRef, _, true, true) ->
- erlang:demonitor(MRef, [flush]),
- ok;
-
-recv(MRef, Ref, Detach, Sent) ->
- receive
- Ref -> %% send has been attempted
- recv(MRef, Ref, Detach, true);
- {'DOWN', MRef, process, _, Reason} ->
- call_rc(Reason, Ref, Sent)
+ case lookup_state(SvcName) of
+ [#state{} = S] ->
+ service_info(Item, S);
+ [] ->
+ undefined
end.
-%% call/5 has returned ...
-call_rc({Ref, Ans}, Ref, _) ->
- Ans;
-
-%% ... or not. In this case failure/encode are documented.
-call_rc(_, _, Sent) ->
- {error, choose(Sent, failure, encode)}.
-
-%% call/5
-%%
-%% In the process spawned for the outgoing request.
-
-call(SvcName, App, Msg, Opts, Caller) ->
- c(ets:lookup(?STATE_TABLE, SvcName), App, Msg, Opts, Caller).
-
-c([#state{service_name = SvcName} = S], App, Msg, Opts, Caller) ->
- case find_transport(App, Msg, Opts, S) of
- {_,_,_} = T ->
- send_request(T, Msg, Opts, Caller, SvcName);
- false ->
- {error, no_connection};
- {error, _} = No ->
- No
- end;
-
-c([], _, _, _, _) ->
- {error, no_service}.
+%% lookup_state/1
-%% make_options/1
+lookup_state(SvcName) ->
+ ets:lookup(?STATE_TABLE, SvcName).
-make_options(Options) ->
- lists:foldl(fun mo/2, #options{}, Options).
-
-mo({timeout, T}, Rec)
- when is_integer(T), 0 =< T ->
- Rec#options{timeout = T};
-
-mo({filter, F}, #options{filter = none} = Rec) ->
- Rec#options{filter = F};
-mo({filter, F}, #options{filter = {all, Fs}} = Rec) ->
- Rec#options{filter = {all, [F | Fs]}};
-mo({filter, F}, #options{filter = F0} = Rec) ->
- Rec#options{filter = {all, [F0, F]}};
-
-mo({extra, L}, #options{extra = X} = Rec)
- when is_list(L) ->
- Rec#options{extra = X ++ L};
-
-mo(detach, Rec) ->
- Rec#options{detach = true};
-
-mo(T, _) ->
- ?ERROR({invalid_option, T}).
-
-%%% ---------------------------------------------------------------------------
-%%% # subscribe(SvcName)
-%%% # unsubscribe(SvcName)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # subscribe/1
+%% # unsubscribe/1
+%% ---------------------------------------------------------------------------
subscribe(SvcName) ->
diameter_reg:add({?MODULE, subscriber, SvcName}).
@@ -390,9 +235,9 @@ subscriptions() ->
pmap(Props) ->
lists:map(fun({{?MODULE, _, Name}, Pid}) -> {Name, Pid} end, Props).
-%%% ---------------------------------------------------------------------------
-%%% # services(Pattern)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # services/1
+%% ---------------------------------------------------------------------------
services(Pat) ->
pmap(diameter_reg:match({?MODULE, service, Pat})).
@@ -408,6 +253,86 @@ whois(SvcName) ->
undefined
end.
+%% ---------------------------------------------------------------------------
+%% # pick_peer/3
+%% ---------------------------------------------------------------------------
+
+-spec pick_peer(SvcName, AppOrAlias, Opts)
+ -> {{TPid, Caps, App}, Mask}
+ | false
+ | {error, term()}
+ when SvcName :: diameter:service_name(),
+ AppOrAlias :: {alias, diameter:app_alias()} | #diameter_app{},
+ Opts :: tuple(),
+ TPid :: pid(),
+ Caps :: #diameter_caps{},
+ App :: #diameter_app{},
+ Mask :: diameter:sequence().
+
+pick_peer(SvcName, App, Opts) ->
+ pick(lookup_state(SvcName), App, Opts).
+
+pick([], _, _) ->
+ {error, no_service};
+
+pick([S], App, Opts) ->
+ pick(S, App, Opts);
+
+pick(#state{service = #diameter_service{applications = Apps}}
+ = S,
+ {alias, Alias},
+ Opts) -> %% initial call from diameter:call/4
+ pick(S, find_outgoing_app(Alias, Apps), Opts);
+
+pick(_, false, _) ->
+ false;
+
+pick(#state{options = [{_, Mask} | _]}
+ = S,
+ #diameter_app{module = ModX, dictionary = Dict}
+ = App0,
+ {DestF, Filter, Xtra}) ->
+ App = App0#diameter_app{module = ModX ++ Xtra},
+ [_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]),
+ case pick_peer(App, RealmAndHost, Filter, S) of
+ {TPid, Caps} ->
+ {{TPid, Caps, App}, Mask};
+ false = No ->
+ No
+ end.
+
+%% ---------------------------------------------------------------------------
+%% # find_incoming_app/4
+%% ---------------------------------------------------------------------------
+
+-spec find_incoming_app(PeerT, TPid, Id, Apps)
+ -> {#diameter_app{}, #diameter_caps{}} %% connection and suitable app
+ | #diameter_caps{} %% connection but no suitable app
+ | false %% no connection
+ when PeerT :: ets:tid(),
+ TPid :: pid(),
+ Id :: non_neg_integer(),
+ Apps :: [#diameter_app{}].
+
+find_incoming_app(PeerT, TPid, Id, Apps) ->
+ try ets:lookup(PeerT, TPid) of
+ [#peer{} = P] ->
+ find_incoming_app(P, Id, Apps);
+ [] -> %% transport has gone down
+ false
+ catch
+ error: badarg -> %% service has gone down (and taken table with it)
+ false
+ end.
+
+%% ---------------------------------------------------------------------------
+%% # notify/2
+%% ---------------------------------------------------------------------------
+
+notify(SvcName, Msg) ->
+ Pid = whois(SvcName),
+ is_pid(Pid) andalso (Pid ! Msg).
+
%% ===========================================================================
%% ===========================================================================
@@ -422,9 +347,9 @@ uptime(Svc) ->
call_module(Service, AppMod, Request) ->
call_service(Service, {call_module, AppMod, Request}).
-%%% ---------------------------------------------------------------------------
-%%% # init([SvcName])
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # init/1
+%% ---------------------------------------------------------------------------
init([SvcName]) ->
process_flag(trap_exit, true), %% ensure terminate(shutdown, _)
@@ -435,9 +360,9 @@ i(SvcName, true) ->
i(_, false) ->
{stop, {shutdown, already_started}}.
-%%% ---------------------------------------------------------------------------
-%%% # handle_call(Req, From, State)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # handle_call/3
+%% ---------------------------------------------------------------------------
handle_call(state, _, S) ->
{reply, S, S};
@@ -462,11 +387,8 @@ handle_call({pick_peer, Local, Remote, App}, _From, S) ->
handle_call({call_module, AppMod, Req}, From, S) ->
call_module(AppMod, Req, From, S);
-handle_call({info, Item}, _From, S) ->
- {reply, service_info(Item, S), S};
-
handle_call(stop, _From, S) ->
- shutdown(S),
+ shutdown(service, S),
{stop, normal, ok, S};
%% The server currently isn't guaranteed to be dead when the caller
%% gets the reply. We deal with this in the call to the server,
@@ -476,24 +398,22 @@ handle_call(Req, From, S) ->
unexpected(handle_call, [Req, From], S),
{reply, nok, S}.
-%%% ---------------------------------------------------------------------------
-%%% # handle_cast(Req, State)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # handle_cast/2
+%% ---------------------------------------------------------------------------
handle_cast(Req, S) ->
unexpected(handle_cast, [Req], S),
{noreply, S}.
-%%% ---------------------------------------------------------------------------
-%%% # handle_info(Req, State)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # handle_info/2
+%% ---------------------------------------------------------------------------
-handle_info(T,S) ->
+handle_info(T, #state{} = S) ->
case transition(T,S) of
ok ->
{noreply, S};
- #state{} = NS ->
- {noreply, NS};
{stop, Reason} ->
{stop, {shutdown, Reason}, S}
end.
@@ -505,55 +425,39 @@ transition({accepted, Pid, TPid}, S) ->
accepted(Pid, TPid, S),
ok;
-%% Peer process has a new open connection.
-transition({connection_up, Pid, T}, S) ->
- connection_up(Pid, T, S);
-
-%% Peer process has left state open.
-transition({connection_down, Pid}, S) ->
- connection_down(Pid, S);
-
-%% Peer process has returned to state open.
-transition({connection_up, Pid}, S) ->
- connection_up(Pid, S);
-
-%% Accepting transport has lost connectivity.
-transition({close, Pid}, S) ->
- close(Pid, S),
- ok;
-
%% Connecting transport is being restarted by watchdog.
transition({reconnect, Pid}, S) ->
reconnect(Pid, S),
ok;
-%% Watchdog is sending notification of a state transition. Note that
-%% the connection_up/down messages are pre-date this message and are
-%% still used. A 'watchdog' message will follow these and communicate
-%% the same state as was set in handling connection_up/down.
-transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName,
- peerT = PeerT}) ->
- #peer{ref = Ref, type = T, options = Opts, op_state = {OS,_}}
- = P
- = fetch(PeerT, Pid),
- insert(PeerT, P#peer{op_state = {OS, To}}),
+%% Watchdog is sending notification of transport death.
+transition({close, Pid, Reason}, #state{service_name = SvcName,
+ watchdogT = WatchdogT}) ->
+ #watchdog{state = WS,
+ ref = Ref,
+ type = Type,
+ options = Opts}
+ = fetch(WatchdogT, Pid),
+ WS /= ?WD_OKAY
+ andalso
+ send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}),
+ ok;
+
+%% Watchdog is sending notification of a state transition.
+transition({watchdog, Pid, {[TPid | Data], From, To}},
+ #state{service_name = SvcName,
+ watchdogT = WatchdogT}
+ = S) ->
+ #watchdog{ref = Ref, type = T, options = Opts}
+ = Wd
+ = fetch(WatchdogT, Pid),
+ watchdog(TPid, Data, From, To, Wd, S),
send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}),
ok;
-%% Death of a peer process results in the removal of it's peer and any
-%% associated conn record when 'DOWN' is received (after this) but the
-%% states will be {?STATE_UP, ?WD_DOWN} for a short time. (No real
-%% problem since ?WD_* is only used in service_info.) We set ?WD_OKAY
-%% as a consequence of connection_up since we know a watchdog is
-%% coming. We can't set anything at connection_down since we don't
-%% know if the subsequent watchdog message will be ?WD_DOWN or
-%% ?WD_SUSPECT. We don't (yet) set ?STATE_* as a consequence of a
-%% watchdog message since this requires changing some of the matching
-%% on ?STATE_*.
-%%
-%% Death of a conn process results in connection_down followed by
-%% watchdog ?WD_DOWN. The latter doesn't result in the conn record
-%% being deleted since 'DOWN' from death of its peer doesn't (yet)
-%% deal with the record having been removed.
+%% Death of a watchdog process (#watchdog.pid) results in the removal of
+%% it's peer and any associated conn record when 'DOWN' is received.
+%% Death of a peer process process (#peer.pid, #watchdog.peer) results in
+%% ?WD_DOWN.
%% Monitor process has died. Just die with a reason that tells
%% diameter_config about the happening. If a cleaner shutdown is
@@ -561,52 +465,49 @@ transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName,
transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) ->
{stop, {monitor, Reason}};
-%% Local peer process has died.
-transition({'DOWN', _, process, Pid, Reason}, S)
+%% Local watchdog process has died.
+transition({'DOWN', _, process, Pid, _Reason}, S)
when node(Pid) == node() ->
- peer_down(Pid, Reason, S);
+ watchdog_down(Pid, S),
+ ok;
-%% Remote service wants to know about shared transports.
+%% Remote service wants to know about shared peers.
transition({service, Pid}, S) ->
share_peers(Pid, S),
ok;
%% Remote service is communicating a shared peer.
transition({peer, TPid, Aliases, Caps}, S) ->
- remote_peer_up(TPid, Aliases, Caps, S);
+ remote_peer_up(TPid, Aliases, Caps, S),
+ ok;
%% Remote peer process has died.
transition({'DOWN', _, process, TPid, _}, S) ->
- remote_peer_down(TPid, S);
+ remote_peer_down(TPid, S),
+ ok;
%% Restart after tc expiry.
transition({tc_timeout, T}, S) ->
tc_timeout(T, S),
ok;
-%% Request process is telling us it may have missed a failover message
-%% after a transport went down and the service process looked up
-%% outstanding requests.
-transition({failover, TRef, Seqs}, S) ->
- failover(TRef, Seqs, S),
- ok;
-
transition(Req, S) ->
unexpected(handle_info, [Req], S),
ok.
-%%% ---------------------------------------------------------------------------
-%%% # terminate(Reason, State)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # terminate/2
+%% ---------------------------------------------------------------------------
terminate(Reason, #state{service_name = Name} = S) ->
+ send_event(Name, stop),
ets:delete(?STATE_TABLE, Name),
shutdown == Reason %% application shutdown
- andalso shutdown(S).
+ andalso shutdown(application, S).
-%%% ---------------------------------------------------------------------------
-%%% # code_change(FromVsn, State, Extra)
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # code_change/3
+%% ---------------------------------------------------------------------------
code_change(FromVsn,
#state{service_name = SvcName,
@@ -632,30 +533,20 @@ code_change(FromVsn, SvcName, Extra, #diameter_app{alias = Alias} = A) ->
unexpected(F, A, #state{service_name = Name}) ->
?UNEXPECTED(F, A ++ [Name]).
-cb([_|_] = M, F, A) ->
- eval(M, F, A);
-cb(Rec, F, A) ->
- {_, M} = app(Rec),
+cb(#diameter_app{module = [_|_] = M}, F, A) ->
eval(M, F, A).
-app(#request{app = A, module = M}) ->
- {A,M};
-app(#diameter_app{alias = A, module = M}) ->
- {A,M}.
-
eval([M|X], F, A) ->
apply(M, F, A ++ X).
%% Callback with state.
-state_cb(#diameter_app{mutable = false, init_state = S}, {ModX, F, A}) ->
+state_cb(#diameter_app{module = ModX, mutable = false, init_state = S},
+ pick_peer = F,
+ A) ->
eval(ModX, F, A ++ [S]);
-state_cb(#diameter_app{mutable = true, alias = Alias}, {_,_,_} = MFA) ->
- state_cb(MFA, Alias);
-
-state_cb({ModX,F,A}, Alias)
- when is_list(ModX) ->
+state_cb(#diameter_app{module = ModX, alias = Alias}, F, A) ->
eval(ModX, F, A ++ [mod_state(Alias)]).
choose(true, X, _) -> X;
@@ -681,49 +572,38 @@ mod_state(Alias) ->
mod_state(Alias, ModS) ->
put({?MODULE, mod_state, Alias}, ModS).
-%%% ---------------------------------------------------------------------------
-%%% # shutdown/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # shutdown/2
+%% ---------------------------------------------------------------------------
-shutdown(Refs, #state{peerT = PeerT}) ->
- ets:foldl(fun(P,ok) -> s(P, Refs), ok end, ok, PeerT).
+%% remove_transport
+shutdown(Refs, #state{watchdogT = WatchdogT})
+ when is_list(Refs) ->
+ ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, WatchdogT);
-s(#peer{ref = Ref, pid = Pid}, Refs) ->
- s(lists:member(Ref, Refs), Pid);
+%% application/service shutdown
+shutdown(Reason, #state{watchdogT = WatchdogT})
+ when Reason == application;
+ Reason == service ->
+ diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end,
+ [],
+ WatchdogT)).
-s(true, Pid) ->
- Pid ! {shutdown, self()}; %% 'DOWN' will cleanup as usual
-s(false, _) ->
- ok.
-
-%%% ---------------------------------------------------------------------------
-%%% # shutdown/1
-%%% ---------------------------------------------------------------------------
+%% st/2
-shutdown(#state{peerT = PeerT}) ->
- %% A transport might not be alive to receive the shutdown request
- %% but give those that are a chance to shutdown gracefully.
- wait(fun st/2, PeerT),
- %% Kill the watchdogs explicitly in case there was no transport.
- wait(fun sw/2, PeerT).
+st(#watchdog{ref = Ref, pid = Pid}, Refs) ->
+ lists:member(Ref, Refs)
+ andalso (Pid ! {shutdown, self(), transport}). %% 'DOWN' cleans up
-wait(Fun, T) ->
- diameter_lib:wait(ets:foldl(Fun, [], T)).
+%% st/3
-st(#peer{conn = B}, Acc)
- when is_boolean(B) ->
- Acc;
-st(#peer{conn = Pid}, Acc) ->
- Pid ! shutdown,
+st(#watchdog{pid = Pid}, Reason, Acc) ->
+ Pid ! {shutdown, self(), Reason},
[Pid | Acc].
-sw(#peer{pid = Pid}, Acc) ->
- exit(Pid, shutdown),
- [Pid | Acc].
-
-%%% ---------------------------------------------------------------------------
-%%% # call_service/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # call_service/2
+%% ---------------------------------------------------------------------------
call_service(Pid, Req)
when is_pid(Pid) ->
@@ -746,11 +626,9 @@ cs(Pid, Req)
cs(undefined, _) ->
{error, no_service}.
-%%% ---------------------------------------------------------------------------
-%%% # i/1
-%%%
-%%% Output: #state{}
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # i/1
+%% ---------------------------------------------------------------------------
%% Intialize the state of a service gen_server.
@@ -769,6 +647,7 @@ i(SvcName) ->
true = ets:insert_new(?STATE_TABLE, S),
%% Start fsms for each transport.
+ send_event(SvcName, start),
lists:foreach(fun(T) -> start_fsm(T,S) end, CL),
init_shared(S),
@@ -779,9 +658,8 @@ cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts},
lists:foreach(fun init_mod/1, Apps),
S = #state{service_name = SvcName,
service = Rec#diameter_service{pid = self()},
- share_peers = get_value(share_peers, Opts),
- use_shared_peers = get_value(use_shared_peers, Opts),
- monitor = mref(get_value(monitor, Opts))},
+ monitor = mref(get_value(monitor, Opts)),
+ options = service_options(Opts)},
{S, Acc};
cfg_acc({_Ref, Type, _Opts} = T, {S, Acc})
@@ -789,15 +667,24 @@ cfg_acc({_Ref, Type, _Opts} = T, {S, Acc})
Type == listen ->
{S, [T | Acc]}.
+service_options(Opts) ->
+ [{sequence, proplists:get_value(sequence, Opts, ?NOMASK)},
+ {share_peers, get_value(share_peers, Opts)},
+ {use_shared_peers, get_value(use_shared_peers, Opts)},
+ {restrict_connections, proplists:get_value(restrict_connections,
+ Opts,
+ ?RESTRICT)}].
+%% The order of options is significant since we match against the list.
+
mref(false = No) ->
No;
mref(P) ->
erlang:monitor(process, P).
-init_shared(#state{use_shared_peers = true,
+init_shared(#state{options = [_, _, {_, true} | _],
service_name = Svc}) ->
diameter_peer:notify(Svc, {service, self()});
-init_shared(#state{use_shared_peers = false}) ->
+init_shared(#state{options = [_, _, {_, false} | _]}) ->
ok.
init_mod(#diameter_app{alias = Alias,
@@ -811,9 +698,9 @@ get_value(Key, Vs) ->
{_, V} = lists:keyfind(Key, 1, Vs),
V.
-%%% ---------------------------------------------------------------------------
-%%% # start/3
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # start/3
+%% ---------------------------------------------------------------------------
%% If the initial start/3 at service/transport start succeeds then
%% subsequent calls to start/4 on the same service will also succeed
@@ -843,35 +730,37 @@ type(connect = T) -> T.
%% start/4
-start(Ref, Type, Opts, #state{peerT = PeerT,
- connT = ConnT,
+start(Ref, Type, Opts, #state{watchdogT = WatchdogT,
+ peerT = PeerT,
+ options = SvcOpts,
service_name = SvcName,
- service = Svc})
+ service = Svc0})
when Type == connect;
Type == accept ->
- Pid = s(Type, Ref, {ConnT,
+ #diameter_service{applications = Apps}
+ = Svc
+ = merge_service(Opts, Svc0),
+ {_,_} = Mask = proplists:get_value(sequence, SvcOpts),
+ Pid = s(Type, Ref, {diameter_traffic:make_recvdata([SvcName,
+ PeerT,
+ Apps,
+ Mask]),
Opts,
- SvcName,
- merge_service(Opts, Svc)}),
- insert(PeerT, #peer{pid = Pid,
- type = Type,
- ref = Ref,
- options = Opts}),
+ SvcOpts,
+ Svc}),
+ insert(WatchdogT, #watchdog{pid = Pid,
+ type = Type,
+ ref = Ref,
+ options = Opts}),
Pid.
%% Note that the service record passed into the watchdog is the merged
-%% record so that each watchdog (and peer_fsm) may get a different
-%% record. This record is what is passed back into application
-%% callbacks.
+%% record so that each watchdog may get a different record. This
+%% record is what is passed back into application callbacks.
s(Type, Ref, T) ->
- case diameter_watchdog:start({Type, Ref}, T) of
- {_MRef, Pid} ->
- Pid;
- Pid when is_pid(Pid) -> %% from old code
- erlang:monitor(process, Pid),
- Pid
- end.
+ {_MRef, Pid} = diameter_watchdog:start({Type, Ref}, T),
+ Pid.
%% merge_service/2
@@ -905,84 +794,115 @@ ms({capabilities, Opts}, #diameter_service{capabilities = Caps0} = Svc)
ms(_, Svc) ->
Svc.
-%%% ---------------------------------------------------------------------------
-%%% # accepted/3
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # accepted/3
+%% ---------------------------------------------------------------------------
-accepted(Pid, _TPid, #state{peerT = PeerT} = S) ->
- #peer{ref = Ref, type = accept = T, conn = false, options = Opts}
- = P
- = fetch(PeerT, Pid),
- insert(PeerT, P#peer{conn = true}), %% mark replacement transport started
- start(Ref, T, Opts, S). %% start new peer
+accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) ->
+ #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts}
+ = Wd
+ = fetch(WatchdogT, Pid),
+ insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement as started
+ start(Ref, T, Opts, S). %% start new watchdog
fetch(Tid, Key) ->
[T] = ets:lookup(Tid, Key),
- case T of
- #peer{op_state = ?STATE_UP} = P ->
- P#peer{op_state = {?STATE_UP, ?WD_OKAY}};
- #peer{op_state = ?STATE_DOWN} = P ->
- P#peer{op_state = {?STATE_DOWN, ?WD_DOWN}};
- _ ->
- T
- end.
+ T.
+
+%% ---------------------------------------------------------------------------
+%% # watchdog/6
+%%
+%% React to a watchdog state transition.
+%% ---------------------------------------------------------------------------
+
+%% Watchdog has a new open connection.
+watchdog(TPid, [T], _, ?WD_OKAY, Wd, State) ->
+ connection_up({TPid, T}, Wd, State);
+
+%% Watchdog has a new connection that will be opened after DW[RA]
+%% exchange.
+watchdog(TPid, [T], _, ?WD_REOPEN, Wd, State) ->
+ reopen({TPid, T}, Wd, State);
+
+%% Watchdog has recovered a suspect connection.
+watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) ->
+ #watchdog{peer = TPid} = Wd, %% assert
+ connection_up(Wd, State);
-%%% ---------------------------------------------------------------------------
-%%% # connection_up/3
-%%%
-%%% Output: #state{}
-%%% ---------------------------------------------------------------------------
-
-%% Peer process has reached the open state.
-
-connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT,
- connT = ConnT}
- = S) ->
- P = fetch(PeerT, Pid),
- C = #conn{pid = TPid,
- apps = SApps,
- caps = Caps,
- peer = Pid},
-
- insert(ConnT, C),
- connection_up([Pkt], P#peer{conn = TPid}, C, S).
-
-%%% ---------------------------------------------------------------------------
-%%% # connection_up/2
-%%%
-%%% Output: #state{}
-%%% ---------------------------------------------------------------------------
-
-%% Peer process has transitioned back into the open state. Note that there
-%% has been no new capabilties exchange in this case.
-
-connection_up(Pid, #state{peerT = PeerT,
- connT = ConnT}
- = S) ->
- #peer{conn = TPid} = P = fetch(PeerT, Pid),
- C = fetch(ConnT, TPid),
- connection_up([], P, C, S).
+%% Watchdog has an unresponsive connection.
+watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) ->
+ #watchdog{peer = TPid} = Wd, %% assert
+ connection_down(Wd, To, State);
+
+%% Watchdog has lost its connection.
+watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) ->
+ close(Wd, S),
+ connection_down(Wd, To, S),
+ ets:delete(PeerT, TPid);
+
+watchdog(_, [], _, _, _, _) ->
+ ok.
+
+%% ---------------------------------------------------------------------------
+%% # connection_up/3
+%% ---------------------------------------------------------------------------
+
+%% Watchdog process has reached state OKAY.
+
+connection_up({TPid, {Caps, SupportedApps, Pkt}},
+ #watchdog{pid = Pid}
+ = Wd,
+ #state{peerT = PeerT}
+ = S) ->
+ Pr = #peer{pid = TPid,
+ apps = SupportedApps,
+ caps = Caps,
+ watchdog = Pid},
+ insert(PeerT, Pr),
+ connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S).
+
+%% ---------------------------------------------------------------------------
+%% # reopen/3
+%% ---------------------------------------------------------------------------
+
+reopen({TPid, {Caps, SupportedApps, _Pkt}},
+ #watchdog{pid = Pid}
+ = Wd,
+ #state{watchdogT = WatchdogT,
+ peerT = PeerT}) ->
+ insert(PeerT, #peer{pid = TPid,
+ apps = SupportedApps,
+ caps = Caps,
+ watchdog = Pid}),
+ insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN,
+ peer = TPid}).
+
+%% ---------------------------------------------------------------------------
+%% # connection_up/2
+%% ---------------------------------------------------------------------------
+
+%% Watchdog has recovered as suspect connection. Note that there has
+%% been no new capabilties exchange in this case.
+
+connection_up(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
+ connection_up([], Wd, fetch(PeerT, TPid), S).
%% connection_up/4
-connection_up(T, P, C, #state{peerT = PeerT,
- local_peers = LDict,
- service_name = SvcName,
- service
- = #diameter_service{applications = Apps}}
- = S) ->
- #peer{conn = TPid, op_state = {?STATE_DOWN, _}}
- = P,
- #conn{apps = SApps, caps = Caps}
- = C,
-
- insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}),
-
- request_peer_up(TPid),
- report_status(up, P, C, S, T),
- S#state{local_peers = insert_local_peer(SApps,
- {{TPid, Caps}, {SvcName, Apps}},
- LDict)}.
+connection_up(Extra,
+ #watchdog{peer = TPid}
+ = Wd,
+ #peer{apps = SApps, caps = Caps}
+ = Pr,
+ #state{watchdogT = WatchdogT,
+ local_peers = LDict,
+ service_name = SvcName,
+ service = #diameter_service{applications = Apps}}
+ = S) ->
+ insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}),
+ diameter_traffic:peer_up(TPid),
+ insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
+ report_status(up, Wd, Pr, S, Extra).
insert_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps).
@@ -992,78 +912,109 @@ ilp({Id, Alias}, {TC, SA}, LDict) ->
?Dict:append(Alias, TC, LDict).
init_conn(Id, Alias, {TPid, _} = TC, {SvcName, Apps}) ->
- #diameter_app{module = ModX,
- id = Id} %% assert
+ #diameter_app{id = Id} %% assert
+ = App
= find_app(Alias, Apps),
- peer_cb({ModX, peer_up, [SvcName, TC]}, Alias)
+ peer_cb(App, peer_up, [SvcName, TC])
orelse exit(TPid, kill). %% fake transport failure
-%% find_app/2
+%% ---------------------------------------------------------------------------
+%% # find_incoming_app/3
+%% ---------------------------------------------------------------------------
-find_app(Alias, Apps) ->
- case lists:keyfind(Alias, #diameter_app.alias, Apps) of
- #diameter_app{options = E} = A when is_atom(E) -> %% upgrade
- A#diameter_app{options = [{answer_errors, E}]};
+%% No one should be sending the relay identifier.
+find_incoming_app(#peer{caps = Caps}, ?APP_ID_RELAY, _) ->
+ Caps;
+
+find_incoming_app(Peer, Id, Apps)
+ when is_integer(Id) ->
+ find_incoming_app(Peer, [Id, ?APP_ID_RELAY], Apps);
+
+%% Note that the apps represented in SApps may be a strict subset of
+%% those in Apps.
+find_incoming_app(#peer{apps = SApps, caps = Caps}, Ids, Apps) ->
+ case keyfind(Ids, 1, SApps) of
+ {_Id, Alias} ->
+ {#diameter_app{} = find_app(Alias, Apps), Caps};
+ false ->
+ Caps
+ end.
+
+%% keyfind/3
+
+keyfind([], _, _) ->
+ false;
+keyfind([Key | Rest], Pos, L) ->
+ case lists:keyfind(Key, Pos, L) of
+ false ->
+ keyfind(Rest, Pos, L);
+ T ->
+ T
+ end.
+
+%% find_outgoing_app/2
+
+find_outgoing_app(Alias, Apps) ->
+ case find_app(Alias, Apps) of
+ #diameter_app{id = ?APP_ID_RELAY} ->
+ false;
A ->
A
end.
+%% find_app/2
+
+find_app(Alias, Apps) ->
+ lists:keyfind(Alias, #diameter_app.alias, Apps).
+
%% Don't bring down the service (and all associated connections)
%% regardless of what happens.
-peer_cb(MFA, Alias) ->
- try state_cb(MFA, Alias) of
+peer_cb(App, F, A) ->
+ try state_cb(App, F, A) of
ModS ->
- mod_state(Alias, ModS),
+ mod_state(App#diameter_app.alias, ModS),
true
catch
E:R ->
- diameter_lib:error_report({failure, {E, R, Alias, ?STACK}}, MFA),
+ diameter_lib:error_report({failure, {E, R, ?STACK}},
+ {App, F, A}),
false
end.
-%%% ---------------------------------------------------------------------------
-%%% # connection_down/2
-%%%
-%%% Output: #state{}
-%%% ---------------------------------------------------------------------------
-
-%% Peer process has transitioned out of the open state.
-
-connection_down(Pid, #state{peerT = PeerT,
- connT = ConnT}
- = S) ->
- #peer{op_state = {?STATE_UP, WS}, %% assert
- conn = TPid}
- = P
- = fetch(PeerT, Pid),
-
- C = fetch(ConnT, TPid),
- insert(PeerT, P#peer{op_state = {?STATE_DOWN, WS}}),
- connection_down(P,C,S).
+%% ---------------------------------------------------------------------------
+%% # connection_down/3
+%% ---------------------------------------------------------------------------
-%% connection_down/3
-
-connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, S) ->
- S;
-
-connection_down(#peer{conn = TPid,
- op_state = {?STATE_UP, _}}
- = P,
- #conn{caps = Caps,
+connection_down(#watchdog{state = ?WD_OKAY,
+ peer = TPid}
+ = Wd,
+ #peer{caps = Caps,
apps = SApps}
- = C,
+ = Pr,
#state{service_name = SvcName,
service = #diameter_service{applications = Apps},
local_peers = LDict}
= S) ->
- report_status(down, P, C, S, []),
- NewS = S#state{local_peers
- = remove_local_peer(SApps,
- {{TPid, Caps}, {SvcName, Apps}},
- LDict)},
- request_peer_down(TPid, NewS),
- NewS.
+ report_status(down, Wd, Pr, S, []),
+ remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict),
+ diameter_traffic:peer_down(TPid);
+
+connection_down(#watchdog{}, #peer{}, _) ->
+ ok;
+
+connection_down(#watchdog{state = WS,
+ peer = TPid}
+ = Wd,
+ To,
+ #state{watchdogT = WatchdogT,
+ peerT = PeerT}
+ = S)
+ when is_atom(To) ->
+ insert(WatchdogT, Wd#watchdog{state = To}),
+ ?WD_OKAY == WS
+ andalso
+ connection_down(Wd, fetch(PeerT, TPid), S).
remove_local_peer(SApps, T, LDict) ->
lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps).
@@ -1074,73 +1025,58 @@ rlp({Id, Alias}, {TC, SA}, LDict) ->
?Dict:store(Alias, lists:delete(TC, L), LDict).
down_conn(Id, Alias, TC, {SvcName, Apps}) ->
- #diameter_app{module = ModX,
- id = Id} %% assert
+ #diameter_app{id = Id} %% assert
+ = App
= find_app(Alias, Apps),
- peer_cb({ModX, peer_down, [SvcName, TC]}, Alias).
-
-%%% ---------------------------------------------------------------------------
-%%% # peer_down/3
-%%%
-%%% Output: #state{}
-%%% ---------------------------------------------------------------------------
-
-%% Peer process has died.
-
-peer_down(Pid, Reason, #state{peerT = PeerT} = S) ->
- P = fetch(PeerT, Pid),
- ets:delete_object(PeerT, P),
- closed(Reason, P, S),
- restart(P,S),
- peer_down(P,S).
-
-%% Send an event at connection establishment failure.
-closed({shutdown, {close, _TPid, Reason}},
- #peer{op_state = {?STATE_DOWN, _},
- ref = Ref,
- type = Type,
- options = Opts},
- #state{service_name = SvcName}) ->
- send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}});
-closed(_, _, _) ->
- ok.
+ peer_cb(App, peer_down, [SvcName, TC]).
+
+%% ---------------------------------------------------------------------------
+%% # watchdog_down/2
+%% ---------------------------------------------------------------------------
+
+%% Watchdog process has died.
+
+watchdog_down(Pid, #state{watchdogT = WatchdogT} = S) ->
+ Wd = fetch(WatchdogT, Pid),
+ ets:delete_object(WatchdogT, Wd),
+ restart(Wd,S),
+ wd_down(Wd,S).
-%% The peer has never come up ...
-peer_down(#peer{conn = B}, S)
+%% Watchdog has never reached OKAY ...
+wd_down(#watchdog{peer = B}, _)
when is_boolean(B) ->
- S;
+ ok;
-%% ... or it has.
-peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) ->
- #conn{} = C = fetch(ConnT, TPid),
- ets:delete_object(ConnT, C),
- connection_down(P,C,S).
+%% ... or maybe it has.
+wd_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) ->
+ connection_down(Wd, ?WD_DOWN, S),
+ ets:delete(PeerT, TPid).
%% restart/2
-restart(P,S) ->
- q_restart(restart(P), S).
+restart(Wd, S) ->
+ q_restart(restart(Wd), S).
%% restart/1
%% Always try to reconnect.
-restart(#peer{ref = Ref,
- type = connect = T,
- options = Opts,
- started = Time}) ->
+restart(#watchdog{ref = Ref,
+ type = connect = T,
+ options = Opts,
+ started = Time}) ->
{Time, {Ref, T, Opts}};
%% Transport connection hasn't yet been accepted ...
-restart(#peer{ref = Ref,
- type = accept = T,
- options = Opts,
- conn = false,
- started = Time}) ->
+restart(#watchdog{ref = Ref,
+ type = accept = T,
+ options = Opts,
+ peer = false,
+ started = Time}) ->
{Time, {Ref, T, Opts}};
-%% ... or it has: a replacement transport has already been spawned.
-restart(#peer{type = accept}) ->
+%% ... or it has: a replacement has already been spawned.
+restart(#watchdog{type = accept}) ->
false.
%% q_restart/2
@@ -1165,8 +1101,8 @@ default_tc(connect, Opts) ->
default_tc(accept, _) ->
0.
-%% Bound tc below if the peer was restarted recently to avoid
-%% continuous in case of faulty config or other problems.
+%% Bound tc below if the watchdog was restarted recently to avoid
+%% continuous restarted in case of faulty config or other problems.
tc(Time, Tc) ->
choose(Tc > ?RESTART_TC
orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC,
@@ -1190,9 +1126,9 @@ tc(true, {Ref, Type, Opts}, #state{service_name = SvcName}
tc(false = No, _, _) -> %% removed
No.
-%%% ---------------------------------------------------------------------------
-%%% # close/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # close/2
+%% ---------------------------------------------------------------------------
%% The watchdog doesn't start a new fsm in the accept case, it
%% simply stays alive until someone tells it to die in order for
@@ -1201,14 +1137,13 @@ tc(false = No, _, _) -> %% removed
%% the accepting watchdog upon reception of a CER from the previously
%% connected peer, or us after reconnect_timer timeout.
-close(Pid, #state{service_name = SvcName,
- peerT = PeerT}) ->
- #peer{pid = Pid,
- type = accept,
- ref = Ref,
- options = Opts}
- = fetch(PeerT, Pid),
-
+close(#watchdog{type = connect}, _) ->
+ ok;
+close(#watchdog{type = accept,
+ pid = Pid,
+ ref = Ref,
+ options = Opts},
+ #state{service_name = SvcName}) ->
c(Pid, diameter_config:have_transport(SvcName, Ref), Opts).
%% Tell watchdog to (maybe) die later ...
@@ -1226,21 +1161,21 @@ c(Pid, false, _Opts) ->
%% which a new connection attempt is expected of a connecting peer.
%% The value should be greater than the peer's Tc + jitter.
-%%% ---------------------------------------------------------------------------
-%%% # reconnect/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # reconnect/2
+%% ---------------------------------------------------------------------------
reconnect(Pid, #state{service_name = SvcName,
- peerT = PeerT}) ->
- #peer{ref = Ref,
- type = connect,
- options = Opts}
- = fetch(PeerT, Pid),
+ watchdogT = WatchdogT}) ->
+ #watchdog{ref = Ref,
+ type = connect,
+ options = Opts}
+ = fetch(WatchdogT, Pid),
send_event(SvcName, {reconnect, Ref, Opts}).
-%%% ---------------------------------------------------------------------------
-%%% # call_module/4
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # call_module/4
+%% ---------------------------------------------------------------------------
%% Backwards compatibility and never documented/advertised. May be
%% removed.
@@ -1262,10 +1197,10 @@ call_module(Mod, Req, From, #state{service
{reply, {error, Reason}, S}
end.
-cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) ->
- MFA = {ModX, handle_call, [Req, From, Svc]},
+cm([#diameter_app{alias = Alias} = App], Req, From, Svc) ->
+ Args = [Req, From, Svc],
- try state_cb(MFA, Alias) of
+ try state_cb(App, handle_call, Args) of
{noreply = T, ModS} ->
mod_state(Alias, ModS),
T;
@@ -1273,11 +1208,13 @@ cm([#diameter_app{module = ModX, alias = Alias}], Req, From, Svc) ->
mod_state(Alias, ModS),
{T, RC};
T ->
- diameter_lib:error_report({invalid, T}, MFA),
+ diameter_lib:error_report({invalid, T},
+ {App, handle_call, Args}),
invalid
catch
E: Reason ->
- diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA),
+ diameter_lib:error_report({failure, {E, Reason, ?STACK}},
+ {App, handle_call, Args}),
failure
end;
@@ -1287,1167 +1224,16 @@ cm([], _, _, _) ->
cm([_,_|_], _, _, _) ->
multiple.
-%%% ---------------------------------------------------------------------------
-%%% # send_request/5
-%%% ---------------------------------------------------------------------------
-
-%% Send an outgoing request in its dedicated process.
-%%
-%% Note that both encode of the outgoing request and of the received
-%% answer happens in this process. It's also this process that replies
-%% to the caller. The service process only handles the state-retaining
-%% callbacks.
-%%
-%% The mod field of the #diameter_app{} here includes any extra
-%% arguments passed to diameter:call/2.
-
-send_request({TPid, Caps, App}, Msg, Opts, Caller, SvcName) ->
- #diameter_app{module = ModX}
- = App,
-
- Pkt = make_request_packet(Msg),
-
- case cb(ModX, prepare_request, [Pkt, SvcName, {TPid, Caps}]) of
- {send, P} ->
- send_request(make_request_packet(P, Pkt),
- TPid,
- Caps,
- App,
- Opts,
- Caller,
- SvcName);
- {discard, Reason} ->
- {error, Reason};
- discard ->
- {error, discarded};
- T ->
- ?ERROR({invalid_return, prepare_request, App, T})
- end.
-
-%% make_request_packet/1
-%%
-%% Turn an outgoing request as passed to call/4 into a diameter_packet
-%% record in preparation for a prepare_request callback.
-
-make_request_packet(Bin)
- when is_binary(Bin) ->
- #diameter_packet{header = diameter_codec:decode_header(Bin),
- bin = Bin};
-
-make_request_packet(#diameter_packet{msg = [#diameter_header{} = Hdr | Avps]}
- = Pkt) ->
- Pkt#diameter_packet{msg = [make_request_header(Hdr) | Avps]};
-
-make_request_packet(#diameter_packet{header = Hdr} = Pkt) ->
- Pkt#diameter_packet{header = make_request_header(Hdr)};
-
-make_request_packet(Msg) ->
- make_request_packet(#diameter_packet{msg = Msg}).
-
-%% make_request_header/1
-
-make_request_header(undefined) ->
- Seq = diameter_session:sequence(),
- make_request_header(#diameter_header{end_to_end_id = Seq,
- hop_by_hop_id = Seq});
-
-make_request_header(#diameter_header{version = undefined} = Hdr) ->
- make_request_header(Hdr#diameter_header{version = ?DIAMETER_VERSION});
-
-make_request_header(#diameter_header{end_to_end_id = undefined} = H) ->
- Seq = diameter_session:sequence(),
- make_request_header(H#diameter_header{end_to_end_id = Seq});
-
-make_request_header(#diameter_header{hop_by_hop_id = undefined} = H) ->
- Seq = diameter_session:sequence(),
- make_request_header(H#diameter_header{hop_by_hop_id = Seq});
-
-make_request_header(#diameter_header{} = Hdr) ->
- Hdr;
-
-make_request_header(T) ->
- ?ERROR({invalid_header, T}).
-
-%% make_request_packet/2
-%%
-%% Reconstruct a diameter_packet from the return value of
-%% prepare_request or prepare_retransmit callback.
-
-make_request_packet(Bin, _)
- when is_binary(Bin) ->
- make_request_packet(Bin);
-
-make_request_packet(#diameter_packet{msg = [#diameter_header{} | _]}
- = Pkt,
- _) ->
- Pkt;
-
-%% Returning a diameter_packet with no header from a prepare_request
-%% or prepare_retransmit callback retains the header passed into it.
-%% This is primarily so that the end to end and hop by hop identifiers
-%% are retained.
-make_request_packet(#diameter_packet{header = Hdr} = Pkt,
- #diameter_packet{header = Hdr0}) ->
- Pkt#diameter_packet{header = fold_record(Hdr0, Hdr)};
-
-make_request_packet(Msg, Pkt) ->
- Pkt#diameter_packet{msg = Msg}.
-
-%% fold_record/2
-
-fold_record(undefined, R) ->
- R;
-fold_record(Rec, R) ->
- diameter_lib:fold_tuple(2, Rec, R).
-
-%% send_request/7
-
-send_request(Pkt, TPid, Caps, App, Opts, Caller, SvcName) ->
- #diameter_app{alias = Alias,
- dictionary = Dict,
- module = ModX,
- options = [{answer_errors, AE} | _]}
- = App,
-
- EPkt = encode(Dict, Pkt),
-
- #options{filter = Filter,
- timeout = Timeout}
- = Opts,
-
- Req = #request{packet = Pkt,
- from = Caller,
- handler = self(),
- transport = TPid,
- caps = Caps,
- app = Alias,
- filter = Filter,
- dictionary = Dict,
- module = ModX},
-
- try
- TRef = send_request(TPid, EPkt, Req, Timeout),
- ack(Caller),
- handle_answer(SvcName, AE, recv_answer(Timeout, SvcName, {TRef, Req}))
- after
- erase_request(EPkt)
- end.
-
-%% Tell caller a send has been attempted.
-ack({Pid, Ref}) ->
- Pid ! Ref.
-
-%% recv_answer/3
-
-recv_answer(Timeout,
- SvcName,
- {TRef, #request{from = {_, Ref}, packet = RPkt} = Req}
- = T) ->
-
- %% Matching on TRef below ensures we ignore messages that pertain
- %% to a previous transport prior to failover. The answer message
- %% includes the #request{} since it's not necessarily Req; that
- %% is, from the last peer to which we've transmitted.
-
- receive
- {answer = A, Ref, Rq, Pkt} -> %% Answer from peer
- {A, Rq, Pkt};
- {timeout = Reason, TRef, _} -> %% No timely reply
- {error, Req, Reason};
- {failover = Reason, TRef, false} -> %% No alternate peer
- {error, Req, Reason};
- {failover, TRef, Transport} -> %% Resend to alternate peer
- try_retransmit(Timeout, SvcName, Req, Transport);
- {failover, TRef} -> %% May have missed failover notification
- Seqs = diameter_codec:sequence_numbers(RPkt),
- Pid = whois(SvcName),
- is_pid(Pid) andalso (Pid ! {failover, TRef, Seqs}),
- recv_answer(Timeout, SvcName, T)
- end.
-%% Note that failover starts a new timer and that expiry of an old
-%% timer value is ignored. This means that an answer could be accepted
-%% from a peer after timeout in the case of failover.
-
-try_retransmit(Timeout, SvcName, Req, Transport) ->
- try retransmit(Transport, Req, SvcName, Timeout) of
- T -> recv_answer(Timeout, SvcName, T)
- catch
- ?FAILURE(Reason) -> {error, Req, Reason}
- end.
-
-%% handle_error/3
-
-handle_error(Req, Reason, SvcName) ->
- #request{module = ModX,
- packet = Pkt,
- transport = TPid,
- caps = Caps}
- = Req,
- cb(ModX, handle_error, [Reason, msg(Pkt), SvcName, {TPid, Caps}]).
-
-msg(#diameter_packet{msg = undefined, bin = Bin}) ->
- Bin;
-msg(#diameter_packet{msg = Msg}) ->
- Msg.
-
-%% encode/2
-
-%% Note that prepare_request can return a diameter_packet containing
-%% header or transport_data. Even allow the returned record to contain
-%% an encoded binary. This isn't the usual case but could some in
-%% handy, for test at least. (For example, to send garbage.)
-
-%% The normal case: encode the returned message.
-encode(Dict, #diameter_packet{msg = Msg, bin = undefined} = Pkt) ->
- D = pick_dictionary([Dict, ?BASE], Msg),
- diameter_codec:encode(D, Pkt);
-
-%% Callback has returned an encoded binary: just send.
-encode(_, #diameter_packet{} = Pkt) ->
- Pkt.
-
-%% pick_dictionary/2
-
-%% Pick the first dictionary that declares the application id in the
-%% specified header.
-pick_dictionary(Ds, [#diameter_header{application_id = Id} | _]) ->
- pd(Ds, fun(D) -> Id = D:id() end);
-
-%% Pick the first dictionary that knows the specified message name.
-pick_dictionary(Ds, [MsgName|_]) ->
- pd(Ds, fun(D) -> D:msg2rec(MsgName) end);
-
-%% Pick the first dictionary that knows the name of the specified
-%% message record.
-pick_dictionary(Ds, Rec) ->
- Name = element(1, Rec),
- pd(Ds, fun(D) -> D:rec2msg(Name) end).
-
-pd([D|Ds], F) ->
- try
- F(D),
- D
- catch
- error:_ ->
- pd(Ds, F)
- end;
-
-pd([], _) ->
- ?ERROR(no_dictionary).
-
-%% send_request/4
-
-send_request(TPid, #diameter_packet{bin = Bin} = Pkt, Req, Timeout)
- when node() == node(TPid) ->
- %% Store the outgoing request before sending to avoid a race with
- %% reply reception.
- TRef = store_request(TPid, Bin, Req, Timeout),
- send(TPid, Pkt),
- TRef;
-
-%% Send using a remote transport: spawn a process on the remote node
-%% to relay the answer.
-send_request(TPid, #diameter_packet{} = Pkt, Req, Timeout) ->
- TRef = erlang:start_timer(Timeout, self(), timeout),
- T = {TPid, Pkt, Req, Timeout, TRef},
- spawn(node(TPid), ?MODULE, send, [T]),
- TRef.
-
-%% send/1
-
-send({TPid, Pkt, #request{handler = Pid} = Req, Timeout, TRef}) ->
- Ref = send_request(TPid, Pkt, Req#request{handler = self()}, Timeout),
- Pid ! reref(receive T -> T end, Ref, TRef).
-
-reref({T, Ref, R}, Ref, TRef) ->
- {T, TRef, R};
-reref(T, _, _) ->
- T.
-
-%% send/2
-
-send(Pid, Pkt) ->
- Pid ! {send, Pkt}.
-
-%% retransmit/4
-
-retransmit({TPid, Caps, #diameter_app{alias = Alias} = App},
- #request{app = Alias,
- packet = Pkt}
- = Req,
- SvcName,
- Timeout) ->
- have_request(Pkt, TPid) %% Don't failover to a peer we've
- andalso ?THROW(timeout), %% already sent to.
-
- case cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]) of
- {send, P} ->
- retransmit(make_request_packet(P, Pkt), TPid, Caps, Req, Timeout);
- {discard, Reason} ->
- ?THROW(Reason);
- discard ->
- ?THROW(discarded);
- T ->
- ?ERROR({invalid_return, prepare_retransmit, App, T})
- end.
-
-%% retransmit/5
-
-retransmit(Pkt, TPid, Caps, #request{dictionary = Dict} = Req, Timeout) ->
- EPkt = encode(Dict, Pkt),
-
- NewReq = Req#request{transport = TPid,
- packet = Pkt,
- caps = Caps},
-
- ?LOG(retransmission, NewReq),
- TRef = send_request(TPid, EPkt, NewReq, Timeout),
- {TRef, NewReq}.
-
-%% store_request/4
-
-store_request(TPid, Bin, Req, Timeout) ->
- Seqs = diameter_codec:sequence_numbers(Bin),
- TRef = erlang:start_timer(Timeout, self(), timeout),
- ets:insert(?REQUEST_TABLE, {Seqs, Req, TRef}),
- ets:member(?REQUEST_TABLE, TPid)
- orelse (self() ! {failover, TRef}), %% possibly missed failover
- TRef.
-
-%% lookup_request/2
-
-lookup_request(Msg, TPid)
- when is_pid(TPid) ->
- lookup(Msg, TPid, '_');
-
-lookup_request(Msg, TRef)
- when is_reference(TRef) ->
- lookup(Msg, '_', TRef).
-
-lookup(Msg, TPid, TRef) ->
- Seqs = diameter_codec:sequence_numbers(Msg),
- Spec = [{{Seqs, #request{transport = TPid, _ = '_'}, TRef},
- [],
- ['$_']}],
- case ets:select(?REQUEST_TABLE, Spec) of
- [{_, Req, _}] ->
- Req;
- [] ->
- false
- end.
-
-%% erase_request/1
-
-erase_request(Pkt) ->
- ets:delete(?REQUEST_TABLE, diameter_codec:sequence_numbers(Pkt)).
-
-%% match_requests/1
-
-match_requests(TPid) ->
- Pat = {'_', #request{transport = TPid, _ = '_'}, '_'},
- ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}]).
-
-%% have_request/2
-
-have_request(Pkt, TPid) ->
- Seqs = diameter_codec:sequence_numbers(Pkt),
- Pat = {Seqs, #request{transport = TPid, _ = '_'}, '_'},
- '$end_of_table' /= ets:select(?REQUEST_TABLE, [{Pat, [], ['$_']}], 1).
-
-%% request_peer_up/1
-
-request_peer_up(TPid) ->
- ets:insert(?REQUEST_TABLE, {TPid}).
-
-%% request_peer_down/2
-
-request_peer_down(TPid, S) ->
- ets:delete(?REQUEST_TABLE, TPid),
- lists:foreach(fun(T) -> failover(T,S) end, match_requests(TPid)).
-%% Note that a request process can store its request after failover
-%% notifications are sent here: store_request/4 sends the notification
-%% in that case. Note also that we'll send as many notifications to a
-%% given handler as there are peers its sent to. All but one of these
-%% will be ignored.
-
-%%% ---------------------------------------------------------------------------
-%%% recv_request/3
-%%% ---------------------------------------------------------------------------
-
-recv_request(TPid, Pkt, {ConnT, SvcName, Apps}) ->
- try ets:lookup(ConnT, TPid) of
- [C] ->
- recv_request(C, TPid, Pkt, SvcName, Apps);
- [] -> %% transport has gone down
- ok
- catch
- error: badarg -> %% service has gone down (and taken table with it)
- ok
- end.
-
-%% recv_request/5
-
-recv_request(#conn{apps = SApps, caps = Caps}, TPid, Pkt, SvcName, Apps) ->
- #diameter_caps{origin_host = {OH,_},
- origin_realm = {OR,_}}
- = Caps,
-
- #diameter_packet{header = #diameter_header{application_id = Id}}
- = Pkt,
-
- recv_request(find_recv_app(Id, SApps),
- {SvcName, OH, OR},
- TPid,
- Apps,
- Caps,
- Pkt).
-
-%% find_recv_app/2
-
-%% No one should be sending the relay identifier.
-find_recv_app(?APP_ID_RELAY, _) ->
- false;
-
-%% With any other id we either support it locally or as a relay.
-find_recv_app(Id, SApps) ->
- keyfind([Id, ?APP_ID_RELAY], 1, SApps).
-
-%% keyfind/3
-
-keyfind([], _, _) ->
- false;
-keyfind([Key | Rest], Pos, L) ->
- case lists:keyfind(Key, Pos, L) of
- false ->
- keyfind(Rest, Pos, L);
- T ->
- T
- end.
-
-%% recv_request/6
-
-recv_request({Id, Alias}, T, TPid, Apps, Caps, Pkt) ->
- #diameter_app{dictionary = Dict}
- = A
- = find_app(Alias, Apps),
- recv_request(T, {TPid, Caps}, A, diameter_codec:decode(Id, Dict, Pkt));
-%% Note that the decode is different depending on whether or not Id is
-%% ?APP_ID_RELAY.
-
-%% DIAMETER_APPLICATION_UNSUPPORTED 3007
-%% A request was sent for an application that is not supported.
-
-recv_request(false, T, TPid, _, _, Pkt) ->
- As = collect_avps(Pkt),
- protocol_error(3007, T, TPid, Pkt#diameter_packet{avps = As}).
-
-collect_avps(Pkt) ->
- case diameter_codec:collect_avps(Pkt) of
- {_Bs, As} ->
- As;
- As ->
- As
- end.
-
-%% recv_request/4
-
-%% Wrong number of bits somewhere in the message: reply.
-%%
-%% DIAMETER_INVALID_AVP_BITS 3009
-%% A request was received that included an AVP whose flag bits are
-%% set to an unrecognized value, or that is inconsistent with the
-%% AVP's definition.
-%%
-recv_request(T, {TPid, _}, _, #diameter_packet{errors = [Bs | _]} = Pkt)
- when is_bitstring(Bs) ->
- protocol_error(3009, T, TPid, Pkt);
-
-%% Either we support this application but don't recognize the command
-%% or we're a relay and the command isn't proxiable.
-%%
-%% DIAMETER_COMMAND_UNSUPPORTED 3001
-%% The Request contained a Command-Code that the receiver did not
-%% recognize or support. This MUST be used when a Diameter node
-%% receives an experimental command that it does not understand.
-%%
-recv_request(T,
- {TPid, _},
- #diameter_app{id = Id},
- #diameter_packet{header = #diameter_header{is_proxiable = P},
- msg = M}
- = Pkt)
- when ?APP_ID_RELAY /= Id, undefined == M;
- ?APP_ID_RELAY == Id, not P ->
- protocol_error(3001, T, TPid, Pkt);
-
-%% Error bit was set on a request.
-%%
-%% DIAMETER_INVALID_HDR_BITS 3008
-%% A request was received whose bits in the Diameter header were
-%% either set to an invalid combination, or to a value that is
-%% inconsistent with the command code's definition.
-%%
-recv_request(T,
- {TPid, _},
- _,
- #diameter_packet{header = #diameter_header{is_error = true}}
- = Pkt) ->
- protocol_error(3008, T, TPid, Pkt);
-
-%% A message in a locally supported application or a proxiable message
-%% in the relay application. Don't distinguish between the two since
-%% each application has its own callback config. That is, the user can
-%% easily distinguish between the two cases.
-recv_request(T, TC, App, Pkt) ->
- request_cb(T, TC, App, examine(Pkt)).
-
-%% Note that there may still be errors but these aren't protocol
-%% (3xxx) errors that lead to an answer-message.
-
-request_cb({SvcName, _OH, _OR} = T, TC, App, Pkt) ->
- request_cb(cb(App, handle_request, [Pkt, SvcName, TC]), App, T, TC, Pkt).
-
-%% examine/1
-%%
-%% Look for errors in a decoded message. Length errors result in
-%% decode failure in diameter_codec.
-
-examine(#diameter_packet{header = #diameter_header{version
- = ?DIAMETER_VERSION}}
- = Pkt) ->
- Pkt;
-
-%% DIAMETER_UNSUPPORTED_VERSION 5011
-%% This error is returned when a request was received, whose version
-%% number is unsupported.
-
-examine(#diameter_packet{errors = Es} = Pkt) ->
- Pkt#diameter_packet{errors = [5011 | Es]}.
-%% It's odd/unfortunate that this isn't a protocol error.
-
-%% request_cb/5
-
-%% A reply may be an answer-message, constructed either here or by
-%% the handle_request callback. The header from the incoming request
-%% is passed into the encode so that it can retrieve the relevant
-%% command code in this case. It will also then ignore Dict and use
-%% the base encoder.
-request_cb({reply, Ans},
- #diameter_app{dictionary = Dict},
- _,
- {TPid, _},
- Pkt) ->
- reply(Ans, Dict, TPid, Pkt);
-
-%% An 3xxx result code, for which the E-bit is set in the header.
-request_cb({protocol_error, RC}, _, T, {TPid, _}, Pkt)
- when 3000 =< RC, RC < 4000 ->
- protocol_error(RC, T, TPid, Pkt);
-
-%% RFC 3588 says we must reply 3001 to anything unrecognized or
-%% unsupported. 'noreply' is undocumented (and inappropriately named)
-%% backwards compatibility for this, protocol_error the documented
-%% alternative.
-request_cb(noreply, _, T, {TPid, _}, Pkt) ->
- protocol_error(3001, T, TPid, Pkt);
-
-%% Relay a request to another peer. This is equivalent to doing an
-%% explicit call/4 with the message in question except that (1) a loop
-%% will be detected by examining Route-Record AVP's, (3) a
-%% Route-Record AVP will be added to the outgoing request and (3) the
-%% End-to-End Identifier will default to that in the
-%% #diameter_header{} without the need for an end_to_end_identifier
-%% option.
-%%
-%% relay and proxy are similar in that they require the same handling
-%% with respect to Route-Record and End-to-End identifier. The
-%% difference is that a proxy advertises specific applications, while
-%% a relay advertises the relay application. If a callback doesn't
-%% want to distinguish between the cases in the callback return value
-%% then 'resend' is a neutral alternative.
-%%
-request_cb({A, Opts},
- #diameter_app{id = Id}
- = App,
- T,
- TC,
- Pkt)
- when A == relay, Id == ?APP_ID_RELAY;
- A == proxy, Id /= ?APP_ID_RELAY;
- A == resend ->
- resend(Opts, App, T, TC, Pkt);
-
-request_cb(discard, _, _, _, _) ->
- ok;
-
-request_cb({eval, RC, F}, App, T, TC, Pkt) ->
- request_cb(RC, App, T, TC, Pkt),
- diameter_lib:eval(F).
-
-%% protocol_error/4
-
-protocol_error(RC, {_, OH, OR}, TPid, #diameter_packet{avps = Avps} = Pkt) ->
- ?LOG({error, RC}, Pkt),
- reply(answer_message({OH, OR, RC}, Avps), ?BASE, TPid, Pkt).
-
-%% resend/5
-%%
-%% Resend a message as a relay or proxy agent.
-
-resend(Opts,
- #diameter_app{} = App,
- {_SvcName, OH, _OR} = T,
- {_TPid, _Caps} = TC,
- #diameter_packet{avps = Avps} = Pkt) ->
- {Code, _Flags, Vid} = ?BASE:avp_header('Route-Record'),
- resend(is_loop(Code, Vid, OH, Avps), Opts, App, T, TC, Pkt).
-
-%% DIAMETER_LOOP_DETECTED 3005
-%% An agent detected a loop while trying to get the message to the
-%% intended recipient. The message MAY be sent to an alternate peer,
-%% if one is available, but the peer reporting the error has
-%% identified a configuration problem.
-
-resend(true, _, _, T, {TPid, _}, Pkt) -> %% Route-Record loop
- protocol_error(3005, T, TPid, Pkt);
-
-%% 6.1.8. Relaying and Proxying Requests
-%%
-%% A relay or proxy agent MUST append a Route-Record AVP to all requests
-%% forwarded. The AVP contains the identity of the peer the request was
-%% received from.
-
-resend(false,
- Opts,
- App,
- {SvcName, _, _} = T,
- {TPid, #diameter_caps{origin_host = {_, OH}}},
- #diameter_packet{header = Hdr0,
- avps = Avps}
- = Pkt) ->
- Route = #diameter_avp{data = {?BASE, 'Route-Record', OH}},
- Seq = diameter_session:sequence(),
- Hdr = Hdr0#diameter_header{hop_by_hop_id = Seq},
- Msg = [Hdr, Route | Avps],
- resend(call(SvcName, App, Msg, Opts), T, TPid, Pkt).
-%% The incoming request is relayed with the addition of a
-%% Route-Record. Note the requirement on the return from call/4 below,
-%% which places a requirement on the value returned by the
-%% handle_answer callback of the application module in question.
-%%
-%% Note that there's nothing stopping the request from being relayed
-%% back to the sender. A pick_peer callback may want to avoid this but
-%% a smart peer might recognize the potential loop and choose another
-%% route. A less smart one will probably just relay the request back
-%% again and force us to detect the loop. A pick_peer that wants to
-%% avoid this can specify filter to avoid the possibility.
-%% Eg. {neg, {host, OH} where #diameter_caps{origin_host = {OH, _}}.
-%%
-%% RFC 6.3 says that a relay agent does not modify Origin-Host but
-%% says nothing about a proxy. Assume it should behave the same way.
-
-%% resend/4
-%%
-%% Relay a reply to a relayed request.
-
-%% Answer from the peer: reset the hop by hop identifier and send.
-resend(#diameter_packet{bin = B}
- = Pkt,
- _,
- TPid,
- #diameter_packet{header = #diameter_header{hop_by_hop_id = Id},
- transport_data = TD}) ->
- send(TPid, Pkt#diameter_packet{bin = diameter_codec:hop_by_hop_id(Id, B),
- transport_data = TD});
-%% TODO: counters
-
-%% Or not: DIAMETER_UNABLE_TO_DELIVER.
-resend(_, T, TPid, Pkt) ->
- protocol_error(3002, T, TPid, Pkt).
-
-%% is_loop/4
-%%
-%% Is there a Route-Record AVP with our Origin-Host?
-
-is_loop(Code,
- Vid,
- Bin,
- [#diameter_avp{code = Code, vendor_id = Vid, data = Bin} | _]) ->
- true;
-
-is_loop(_, _, _, []) ->
- false;
-
-is_loop(Code, Vid, OH, [_ | Avps])
- when is_binary(OH) ->
- is_loop(Code, Vid, OH, Avps);
-
-is_loop(Code, Vid, OH, Avps) ->
- is_loop(Code, Vid, ?BASE:avp(encode, OH, 'Route-Record'), Avps).
-
-%% reply/4
-%%
-%% Send a locally originating reply.
-
-%% Skip the setting of Result-Code and Failed-AVP's below.
-reply([Msg], Dict, TPid, Pkt)
- when is_list(Msg);
- is_tuple(Msg) ->
- reply(Msg, Dict, TPid, Pkt#diameter_packet{errors = []});
-
-%% No errors or a diameter_header/avp list.
-reply(Msg, Dict, TPid, #diameter_packet{errors = Es,
- transport_data = TD}
- = ReqPkt)
- when [] == Es;
- is_record(hd(Msg), diameter_header) ->
- Pkt = diameter_codec:encode(Dict, make_answer_packet(Msg, ReqPkt)),
- incr(send, Pkt, Dict, TPid), %% count result codes in sent answers
- send(TPid, Pkt#diameter_packet{transport_data = TD});
-
-%% Or not: set Result-Code and Failed-AVP AVP's.
-reply(Msg, Dict, TPid, #diameter_packet{errors = [H|_] = Es} = Pkt) ->
- reply(rc(Msg, rc(H), [A || {_,A} <- Es], Dict),
- Dict,
- TPid,
- Pkt#diameter_packet{errors = []}).
-
-%% make_answer_packet/2
-
-%% Binaries and header/avp lists are sent as-is.
-make_answer_packet(Bin, _)
- when is_binary(Bin) ->
- #diameter_packet{bin = Bin};
-make_answer_packet([#diameter_header{} | _] = Msg, _) ->
- #diameter_packet{msg = Msg};
-
-%% Otherwise a reply message clears the R and T flags and retains the
-%% P flag. The E flag will be set at encode. 6.2 of 3588 requires the
-%% same P flag on an answer as on the request.
-make_answer_packet(Msg, #diameter_packet{header = ReqHdr}) ->
- Hdr = ReqHdr#diameter_header{version = ?DIAMETER_VERSION,
- is_request = false,
- is_error = undefined,
- is_retransmitted = false},
- #diameter_packet{header = Hdr,
- msg = Msg}.
-
-%% rc/1
-
-rc({RC, _}) ->
- RC;
-rc(RC) ->
- RC.
-
-%% rc/4
-
-rc(Rec, RC, Failed, Dict)
- when is_integer(RC) ->
- set(Rec,
- lists:append([rc(Rec, {'Result-Code', RC}, Dict),
- failed_avp(Rec, Failed, Dict)]),
- Dict).
-
-%% Reply as name and tuple list ...
-set([_|_] = Ans, Avps, _) ->
- Ans ++ Avps; %% Values nearer tail take precedence.
-
-%% ... or record.
-set(Rec, Avps, Dict) ->
- Dict:'#set-'(Avps, Rec).
-
-%% rc/3
-%%
-%% Turn the result code into a list if its optional and only set it if
-%% the arity is 1 or {0,1}. In other cases (which probably shouldn't
-%% exist in practise) we can't know what's appropriate.
-
-rc([MsgName | _], {'Result-Code' = K, RC} = T, Dict) ->
- case Dict:avp_arity(MsgName, 'Result-Code') of
- 1 -> [T];
- {0,1} -> [{K, [RC]}];
- _ -> []
- end;
-
-rc(Rec, T, Dict) ->
- rc([Dict:rec2msg(element(1, Rec))], T, Dict).
-
-%% failed_avp/3
-
-failed_avp(_, [] = No, _) ->
- No;
-
-failed_avp(Rec, Failed, Dict) ->
- [fa(Rec, [{'AVP', Failed}], Dict)].
-
-%% Reply as name and tuple list ...
-fa([MsgName | Values], FailedAvp, Dict) ->
- R = Dict:msg2rec(MsgName),
- try
- Dict:'#info-'(R, {index, 'Failed-AVP'}),
- {'Failed-AVP', [FailedAvp]}
- catch
- error: _ ->
- Avps = proplists:get_value('AVP', Values, []),
- A = #diameter_avp{name = 'Failed-AVP',
- value = FailedAvp},
- {'AVP', [A|Avps]}
- end;
-
-%% ... or record.
-fa(Rec, FailedAvp, Dict) ->
- try
- {'Failed-AVP', [FailedAvp]}
- catch
- error: _ ->
- Avps = Dict:'get-'('AVP', Rec),
- A = #diameter_avp{name = 'Failed-AVP',
- value = FailedAvp},
- {'AVP', [A|Avps]}
- end.
-
-%% 3. Diameter Header
-%%
-%% E(rror) - If set, the message contains a protocol error,
-%% and the message will not conform to the ABNF
-%% described for this command. Messages with the 'E'
-%% bit set are commonly referred to as error
-%% messages. This bit MUST NOT be set in request
-%% messages. See Section 7.2.
-
-%% 3.2. Command Code ABNF specification
-%%
-%% e-bit = ", ERR"
-%% ; If present, the 'E' bit in the Command
-%% ; Flags is set, indicating that the answer
-%% ; message contains a Result-Code AVP in
-%% ; the "protocol error" class.
-
-%% 7.1.3. Protocol Errors
-%%
-%% Errors that fall within the Protocol Error category SHOULD be treated
-%% on a per-hop basis, and Diameter proxies MAY attempt to correct the
-%% error, if it is possible. Note that these and only these errors MUST
-%% only be used in answer messages whose 'E' bit is set.
-
-%% Thus, only construct answers to protocol errors. Other errors
-%% require an message-specific answer and must be handled by the
-%% application.
-
-%% 6.2. Diameter Answer Processing
-%%
-%% When a request is locally processed, the following procedures MUST be
-%% applied to create the associated answer, in addition to any
-%% additional procedures that MAY be discussed in the Diameter
-%% application defining the command:
-%%
-%% - The same Hop-by-Hop identifier in the request is used in the
-%% answer.
-%%
-%% - The local host's identity is encoded in the Origin-Host AVP.
-%%
-%% - The Destination-Host and Destination-Realm AVPs MUST NOT be
-%% present in the answer message.
-%%
-%% - The Result-Code AVP is added with its value indicating success or
-%% failure.
-%%
-%% - If the Session-Id is present in the request, it MUST be included
-%% in the answer.
-%%
-%% - Any Proxy-Info AVPs in the request MUST be added to the answer
-%% message, in the same order they were present in the request.
-%%
-%% - The 'P' bit is set to the same value as the one in the request.
-%%
-%% - The same End-to-End identifier in the request is used in the
-%% answer.
-%%
-%% Note that the error messages (see Section 7.3) are also subjected to
-%% the above processing rules.
-
-%% 7.3. Error-Message AVP
-%%
-%% The Error-Message AVP (AVP Code 281) is of type UTF8String. It MAY
-%% accompany a Result-Code AVP as a human readable error message. The
-%% Error-Message AVP is not intended to be useful in real-time, and
-%% SHOULD NOT be expected to be parsed by network entities.
-
-%% answer_message/2
-
-answer_message({OH, OR, RC}, Avps) ->
- {Code, _, Vid} = ?BASE:avp_header('Session-Id'),
- ['answer-message', {'Origin-Host', OH},
- {'Origin-Realm', OR},
- {'Result-Code', RC}
- | session_id(Code, Vid, Avps)].
-
-session_id(Code, Vid, Avps)
- when is_list(Avps) ->
- try
- {value, #diameter_avp{data = D}} = find_avp(Code, Vid, Avps),
- [{'Session-Id', [?BASE:avp(decode, D, 'Session-Id')]}]
- catch
- error: _ ->
- []
- end.
-
-%% find_avp/3
-
-find_avp(Code, Vid, Avps)
- when is_integer(Code), (undefined == Vid orelse is_integer(Vid)) ->
- find(fun(A) -> is_avp(Code, Vid, A) end, Avps).
-
-%% The final argument here could be a list of AVP's, depending on the case,
-%% but we're only searching at the top level.
-is_avp(Code, Vid, #diameter_avp{code = Code, vendor_id = Vid}) ->
- true;
-is_avp(_, _, _) ->
- false.
-
-find(_, []) ->
- false;
-find(Pred, [H|T]) ->
- case Pred(H) of
- true ->
- {value, H};
- false ->
- find(Pred, T)
- end.
-
-%% 7. Error Handling
-%%
-%% There are certain Result-Code AVP application errors that require
-%% additional AVPs to be present in the answer. In these cases, the
-%% Diameter node that sets the Result-Code AVP to indicate the error
-%% MUST add the AVPs. Examples are:
-%%
-%% - An unrecognized AVP is received with the 'M' bit (Mandatory bit)
-%% set, causes an answer to be sent with the Result-Code AVP set to
-%% DIAMETER_AVP_UNSUPPORTED, and the Failed-AVP AVP containing the
-%% offending AVP.
-%%
-%% - An AVP that is received with an unrecognized value causes an
-%% answer to be returned with the Result-Code AVP set to
-%% DIAMETER_INVALID_AVP_VALUE, with the Failed-AVP AVP containing the
-%% AVP causing the error.
-%%
-%% - A command is received with an AVP that is omitted, yet is
-%% mandatory according to the command's ABNF. The receiver issues an
-%% answer with the Result-Code set to DIAMETER_MISSING_AVP, and
-%% creates an AVP with the AVP Code and other fields set as expected
-%% in the missing AVP. The created AVP is then added to the Failed-
-%% AVP AVP.
-%%
-%% The Result-Code AVP describes the error that the Diameter node
-%% encountered in its processing. In case there are multiple errors,
-%% the Diameter node MUST report only the first error it encountered
-%% (detected possibly in some implementation dependent order). The
-%% specific errors that can be described by this AVP are described in
-%% the following section.
-
-%% 7.5. Failed-AVP AVP
-%%
-%% The Failed-AVP AVP (AVP Code 279) is of type Grouped and provides
-%% debugging information in cases where a request is rejected or not
-%% fully processed due to erroneous information in a specific AVP. The
-%% value of the Result-Code AVP will provide information on the reason
-%% for the Failed-AVP AVP.
-%%
-%% The possible reasons for this AVP are the presence of an improperly
-%% constructed AVP, an unsupported or unrecognized AVP, an invalid AVP
-%% value, the omission of a required AVP, the presence of an explicitly
-%% excluded AVP (see tables in Section 10), or the presence of two or
-%% more occurrences of an AVP which is restricted to 0, 1, or 0-1
-%% occurrences.
-%%
-%% A Diameter message MAY contain one Failed-AVP AVP, containing the
-%% entire AVP that could not be processed successfully. If the failure
-%% reason is omission of a required AVP, an AVP with the missing AVP
-%% code, the missing vendor id, and a zero filled payload of the minimum
-%% required length for the omitted AVP will be added.
-
-%%% ---------------------------------------------------------------------------
-%%% # handle_answer/3
-%%% ---------------------------------------------------------------------------
-
-%% Process an answer message in call-specific process.
-
-handle_answer(SvcName, _, {error, Req, Reason}) ->
- handle_error(Req, Reason, SvcName);
-
-handle_answer(SvcName,
- AnswerErrors,
- {answer, #request{dictionary = Dict} = Req, Pkt}) ->
- answer(examine(diameter_codec:decode(Dict, Pkt)),
- SvcName,
- AnswerErrors,
- Req).
-
-%% We don't really need to do a full decode if we're a relay and will
-%% just resend with a new hop by hop identifier, but might a proxy
-%% want to examine the answer?
-
-answer(Pkt, SvcName, AE, #request{transport = TPid,
- dictionary = Dict}
- = Req) ->
- try
- incr(recv, Pkt, Dict, TPid)
- of
- _ -> a(Pkt, SvcName, AE, Req)
- catch
- exit: {invalid_error_bit, _} = E ->
- a(Pkt#diameter_packet{errors = [E]}, SvcName, AE, Req)
- end.
-
-a(#diameter_packet{errors = Es} = Pkt, SvcName, AE, #request{transport = TPid,
- caps = Caps,
- packet = P}
- = Req)
- when [] == Es;
- callback == AE ->
- cb(Req, handle_answer, [Pkt, msg(P), SvcName, {TPid, Caps}]);
-
-a(Pkt, SvcName, report, Req) ->
- x(errors, handle_answer, [SvcName, Req, Pkt]);
-
-a(Pkt, SvcName, discard, Req) ->
- x({errors, handle_answer, [SvcName, Req, Pkt]}).
-
-%% Note that we don't check that the application id in the answer's
-%% header is what we expect. (TODO: Does the rfc says anything about
-%% this?)
-
-%% incr/4
-%%
-%% Increment a stats counter for an incoming or outgoing message.
-
-%% TODO: fix
-incr(_, #diameter_packet{msg = undefined}, _, _) ->
- ok;
-
-incr(recv = D, #diameter_packet{header = H, errors = [_|_]}, _, TPid) ->
- incr(TPid, {diameter_codec:msg_id(H), D, error});
-
-incr(Dir, Pkt, Dict, TPid) ->
- #diameter_packet{header = #diameter_header{is_error = E}
- = Hdr,
- msg = Rec}
- = Pkt,
-
- RC = int(get_avp_value(Dict, 'Result-Code', Rec)),
- PE = is_protocol_error(RC),
-
- %% Check that the E bit is set only for 3xxx result codes.
- (not (E orelse PE))
- orelse (E andalso PE)
- orelse x({invalid_error_bit, RC}, answer, [Dir, Pkt]),
-
- irc(TPid, Hdr, Dir, rc_counter(Dict, Rec, RC)).
-
-irc(_, _, _, undefined) ->
- false;
-
-irc(TPid, Hdr, Dir, Ctr) ->
- incr(TPid, {diameter_codec:msg_id(Hdr), Dir, Ctr}).
-
-%% incr/2
-
-incr(TPid, Counter) ->
- diameter_stats:incr(Counter, TPid, 1).
-
-%% error_counter/2
-
-%% RFC 3588, 7.6:
-%%
-%% All Diameter answer messages defined in vendor-specific
-%% applications MUST include either one Result-Code AVP or one
-%% Experimental-Result AVP.
-%%
-%% Maintain statistics assuming one or the other, not both, which is
-%% surely the intent of the RFC.
-
-rc_counter(Dict, Rec, undefined) ->
- er(get_avp_value(Dict, 'Experimental-Result', Rec));
-rc_counter(_, _, RC) ->
- {'Result-Code', RC}.
-
-%% Outgoing answers may be in any of the forms messages can be sent
-%% in. Incoming messages will be records. We're assuming here that the
-%% arity of the result code AVP's is 0 or 1.
-
-er([{_,_,N} = T | _])
- when is_integer(N) ->
- T;
-er({_,_,N} = T)
- when is_integer(N) ->
- T;
-er(_) ->
- undefined.
-
-%% Extract the first good looking integer. There's no guarantee
-%% that what we're looking for has arity 1.
-int([N|_])
- when is_integer(N) ->
- N;
-int(N)
- when is_integer(N) ->
- N;
-int(_) ->
- undefined.
-
-is_protocol_error(RC) ->
- 3000 =< RC andalso RC < 4000.
-
--spec x(any(), atom(), list()) -> no_return().
-
-%% Warn and exit request process on errors in an incoming answer.
-x(Reason, F, A) ->
- diameter_lib:warning_report(Reason, {?MODULE, F, A}),
- x(Reason).
-
-x(T) ->
- exit(T).
-
-%%% ---------------------------------------------------------------------------
-%%% # failover/[23]
-%%% ---------------------------------------------------------------------------
-
-%% Failover as a consequence of request_peer_down/2.
-failover({_, #request{handler = Pid} = Req, TRef}, S) ->
- Pid ! {failover, TRef, rt(Req, S)}.
-
-%% Failover as a consequence of store_request/4.
-failover(TRef, Seqs, S)
- when is_reference(TRef) ->
- case lookup_request(Seqs, TRef) of
- #request{} = Req ->
- failover({Seqs, Req, TRef}, S);
- false ->
- ok
- end.
-
-%% prepare_request returned a binary ...
-rt(#request{packet = #diameter_packet{msg = undefined}}, _) ->
- false; %% TODO: Not what we should do.
-
-%% ... or not.
-rt(#request{packet = #diameter_packet{msg = Msg},
- dictionary = Dict}
- = Req,
- S) ->
- find_transport(get_destination(Dict, Msg), Req, S).
-
-%%% ---------------------------------------------------------------------------
-%%% # report_status/5
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # report_status/5
+%% ---------------------------------------------------------------------------
report_status(Status,
- #peer{ref = Ref,
- conn = TPid,
- type = Type,
- options = Opts},
- #conn{apps = [_|_] = As,
+ #watchdog{ref = Ref,
+ peer = TPid,
+ type = Type,
+ options = Opts},
+ #peer{apps = [_|_] = As,
caps = Caps},
#state{service_name = SvcName}
= S,
@@ -2465,230 +1251,78 @@ send_event(SvcName, Info) ->
send_event(#diameter_event{service = SvcName} = E) ->
lists:foreach(fun({_, Pid}) -> Pid ! E end, subscriptions(SvcName)).
-%%% ---------------------------------------------------------------------------
-%%% # share_peer/5
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # share_peer/5
+%% ---------------------------------------------------------------------------
-share_peer(up, Caps, Aliases, TPid, #state{share_peers = true,
+share_peer(up, Caps, Aliases, TPid, #state{options = [_, {_, true} | _],
service_name = Svc}) ->
diameter_peer:notify(Svc, {peer, TPid, Aliases, Caps});
share_peer(_, _, _, _, _) ->
ok.
-%%% ---------------------------------------------------------------------------
-%%% # share_peers/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # share_peers/2
+%% ---------------------------------------------------------------------------
-share_peers(Pid, #state{share_peers = true,
+share_peers(Pid, #state{options = [_, {_, true} | _],
local_peers = PDict}) ->
?Dict:fold(fun(A,Ps,ok) -> sp(Pid, A, Ps), ok end, ok, PDict);
-share_peers(_, #state{share_peers = false}) ->
+share_peers(_, _) ->
ok.
sp(Pid, Alias, Peers) ->
lists:foreach(fun({P,C}) -> Pid ! {peer, P, [Alias], C} end, Peers).
-%%% ---------------------------------------------------------------------------
-%%% # remote_peer_up/4
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # remote_peer_up/4
+%% ---------------------------------------------------------------------------
-remote_peer_up(Pid, Aliases, Caps, #state{use_shared_peers = true,
+remote_peer_up(Pid, Aliases, Caps, #state{options = [_, _, {_, true} | _],
service = Svc,
- shared_peers = PDict}
- = S) ->
+ shared_peers = PDict}) ->
#diameter_service{applications = Apps} = Svc,
- Update = lists:filter(fun(A) ->
- lists:keymember(A, #diameter_app.alias, Apps)
- end,
- Aliases),
- S#state{shared_peers = rpu(Pid, Caps, PDict, Update)};
+ Key = #diameter_app.alias,
+ As = lists:filter(fun(A) -> lists:keymember(A, Key, Apps) end, Aliases),
+ rpu(Pid, Caps, PDict, As);
-remote_peer_up(_, _, _, #state{use_shared_peers = false} = S) ->
- S.
+remote_peer_up(_, _, _, #state{options = [_, _, {_, false} | _]}) ->
+ ok.
rpu(_, _, PDict, []) ->
PDict;
rpu(Pid, Caps, PDict, Aliases) ->
erlang:monitor(process, Pid),
T = {Pid, Caps},
- lists:foldl(fun(A,D) -> ?Dict:append(A, T, D) end,
- PDict,
- Aliases).
+ lists:foreach(fun(A) -> ?Dict:append(A, T, PDict) end, Aliases).
-%%% ---------------------------------------------------------------------------
-%%% # remote_peer_down/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # remote_peer_down/2
+%% ---------------------------------------------------------------------------
-remote_peer_down(Pid, #state{use_shared_peers = true,
- shared_peers = PDict}
- = S) ->
- S#state{shared_peers = lists:foldl(fun(A,D) -> rpd(Pid, A, D) end,
- PDict,
- ?Dict:fetch_keys(PDict))}.
+remote_peer_down(Pid, #state{options = [_, _, {_, true} | _],
+ shared_peers = PDict}) ->
+ lists:foreach(fun(A) -> rpd(Pid, A, PDict) end, ?Dict:fetch_keys(PDict)).
rpd(Pid, Alias, PDict) ->
?Dict:update(Alias, fun(Ps) -> lists:keydelete(Pid, 1, Ps) end, PDict).
-%%% ---------------------------------------------------------------------------
-%%% find_transport/[34]
-%%%
-%%% Output: {TransportPid, #diameter_caps{}, #diameter_app{}}
-%%% | false
-%%% | {error, Reason}
-%%% ---------------------------------------------------------------------------
-
-%% Initial call, from an arbitrary process.
-find_transport({alias, Alias}, Msg, Opts, #state{service = Svc} = S) ->
- #diameter_service{applications = Apps} = Svc,
- ft(find_send_app(Alias, Apps), Msg, Opts, S);
-
-%% Relay or proxy send.
-find_transport(#diameter_app{} = App, Msg, Opts, S) ->
- ft(App, Msg, Opts, S).
-
-ft(#diameter_app{module = Mod, dictionary = Dict} = App, Msg, Opts, S) ->
- #options{filter = Filter,
- extra = Xtra}
- = Opts,
- pick_peer(App#diameter_app{module = Mod ++ Xtra},
- get_destination(Dict, Msg),
- Filter,
- S);
-ft(false = No, _, _, _) ->
- No.
-
-%% This can't be used if we're a relay and sending a message
-%% in an application not known locally. (TODO)
-find_send_app(Alias, Apps) ->
- case lists:keyfind(Alias, #diameter_app.alias, Apps) of
- #diameter_app{id = ?APP_ID_RELAY} ->
- false;
- T ->
- T
- end.
-
-%% Retransmission, in the service process.
-find_transport([_,_] = RH,
- Req,
- #state{service = #diameter_service{pid = Pid,
- applications = Apps}}
- = S)
- when self() == Pid ->
- #request{app = Alias,
- filter = Filter,
- module = ModX}
- = Req,
- #diameter_app{}
- = App
- = lists:keyfind(Alias, #diameter_app.alias, Apps),
-
- pick_peer(App#diameter_app{module = ModX},
- RH,
- Filter,
- S).
-
-%% get_destination/2
-
-get_destination(Dict, Msg) ->
- [str(get_avp_value(Dict, 'Destination-Realm', Msg)),
- str(get_avp_value(Dict, 'Destination-Host', Msg))].
-
-%% This is not entirely correct. The avp could have an arity 1, in
-%% which case an empty list is a DiameterIdentity of length 0 rather
-%% than the list of no values we treat it as by mapping to undefined.
-%% This behaviour is documented.
-str([]) ->
- undefined;
-str(T) ->
- T.
-
-%% get_avp_value/3
-%%
-%% Find an AVP in a message of one of three forms:
-%%
-%% - a message record (as generated from a .dia spec) or
-%% - a list of an atom message name followed by 2-tuple, avp name/value pairs.
-%% - a list of a #diameter_header{} followed by #diameter_avp{} records,
-%%
-%% In the first two forms a dictionary module is used at encode to
-%% identify the type of the AVP and its arity in the message in
-%% question. The third form allows messages to be sent as is, without
-%% a dictionary, which is needed in the case of relay agents, for one.
-
-%% Messages will be header/avps list as a relay and the only AVP's we
-%% look for are in the common dictionary. This is required since the
-%% relay dictionary doesn't inherit the common dictionary (which maybe
-%% it should).
-get_avp_value(?RELAY, Name, Msg) ->
- get_avp_value(?BASE, Name, Msg);
-
-%% Message sent as a header/avps list, probably a relay case but not
-%% necessarily.
-get_avp_value(Dict, Name, [#diameter_header{} | Avps]) ->
- try
- {Code, _, VId} = Dict:avp_header(Name),
- [A|_] = lists:dropwhile(fun(#diameter_avp{code = C, vendor_id = V}) ->
- C /= Code orelse V /= VId
- end,
- Avps),
- avp_decode(Dict, Name, A)
- catch
- error: _ ->
- undefined
- end;
-
-%% Outgoing message as a name/values list.
-get_avp_value(_, Name, [_MsgName | Avps]) ->
- case lists:keyfind(Name, 1, Avps) of
- {_, V} ->
- V;
- _ ->
- undefined
- end;
-
-%% Record might be an answer message in the common dictionary.
-get_avp_value(Dict, Name, Rec)
- when Dict /= ?BASE, element(1, Rec) == 'diameter_base_answer-message' ->
- get_avp_value(?BASE, Name, Rec);
-
-%% Message is typically a record but not necessarily: diameter:call/4
-%% can be passed an arbitrary term.
-get_avp_value(Dict, Name, Rec) ->
- try
- Dict:'#get-'(Name, Rec)
- catch
- error:_ ->
- undefined
- end.
-
-avp_decode(Dict, Name, #diameter_avp{value = undefined,
- data = Bin}) ->
- Dict:avp(decode, Bin, Name);
-avp_decode(_, _, #diameter_avp{value = V}) ->
- V.
-
-%%% ---------------------------------------------------------------------------
-%%% # pick_peer(App, [DestRealm, DestHost], Filter, #state{})
-%%%
-%%% Output: {TransportPid, #diameter_caps{}, App}
-%%% | false
-%%% | {error, Reason}
-%%% ---------------------------------------------------------------------------
-
-%% Find transports to a given realm/host.
+%% ---------------------------------------------------------------------------
+%% pick_peer/4
+%% ---------------------------------------------------------------------------
pick_peer(#diameter_app{alias = Alias}
= App,
- [_,_] = RH,
+ RealmAndHost,
Filter,
#state{local_peers = L,
shared_peers = S,
service_name = SvcName,
service = #diameter_service{pid = Pid}}) ->
- pick_peer(peers(Alias, RH, Filter, L),
- peers(Alias, RH, Filter, S),
+ pick_peer(peers(Alias, RealmAndHost, Filter, L),
+ peers(Alias, RealmAndHost, Filter, S),
Pid,
SvcName,
App).
@@ -2701,7 +1335,12 @@ pick_peer([], [], _, _, _) ->
%% App state is mutable but we're not in the service process: go there.
pick_peer(Local, Remote, Pid, _SvcName, #diameter_app{mutable = true} = App)
when self() /= Pid ->
- call_service(Pid, {pick_peer, Local, Remote, App});
+ case call_service(Pid, {pick_peer, Local, Remote, App}) of
+ {TPid, _} = T when is_pid(TPid) ->
+ T;
+ {error, _} ->
+ false
+ end;
%% App state isn't mutable or it is and we're in the service process:
%% do the deed.
@@ -2709,19 +1348,18 @@ pick_peer(Local,
Remote,
_Pid,
SvcName,
- #diameter_app{module = ModX,
- alias = Alias,
+ #diameter_app{alias = Alias,
init_state = S,
mutable = M}
= App) ->
- MFA = {ModX, pick_peer, [Local, Remote, SvcName]},
+ Args = [Local, Remote, SvcName],
- try state_cb(App, MFA) of
- {ok, {TPid, #diameter_caps{} = Caps}} when is_pid(TPid) ->
- {TPid, Caps, App};
- {{TPid, #diameter_caps{} = Caps}, ModS} when is_pid(TPid), M ->
+ try state_cb(App, pick_peer, Args) of
+ {ok, {TPid, #diameter_caps{}} = T} when is_pid(TPid) ->
+ T;
+ {{TPid, #diameter_caps{}} = T, ModS} when is_pid(TPid), M ->
mod_state(Alias, ModS),
- {TPid, Caps, App};
+ T;
{false = No, ModS} when M ->
mod_state(Alias, ModS),
No;
@@ -2729,15 +1367,17 @@ pick_peer(Local,
No;
false = No ->
No;
- {{TPid, #diameter_caps{} = Caps}, S} when is_pid(TPid) ->
- {TPid, Caps, App}; %% Accept returned state in the immutable
+ {{TPid, #diameter_caps{}} = T, S} when is_pid(TPid) ->
+ T; %% Accept returned state in the immutable
{false = No, S} -> %% case as long it isn't changed.
No;
T ->
- diameter_lib:error_report({invalid, T, App}, MFA)
+ diameter_lib:error_report({invalid, T, App},
+ {App, pick_peer, Args})
catch
E: Reason ->
- diameter_lib:error_report({failure, {E, Reason, ?STACK}}, MFA)
+ diameter_lib:error_report({failure, {E, Reason, ?STACK}},
+ {App, pick_peer, Args})
end.
%% peers/4
@@ -2824,14 +1464,14 @@ eq(Any, Id, PeerId) ->
%% transports/1
-transports(#state{peerT = PeerT}) ->
- ets:select(PeerT, [{#peer{conn = '$1', _ = '_'},
+transports(#state{watchdogT = WatchdogT}) ->
+ ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'},
[{'is_pid', '$1'}],
['$1']}]).
-%%% ---------------------------------------------------------------------------
-%%% # service_info/2
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # service_info/2
+%% ---------------------------------------------------------------------------
%% The config passed to diameter:start_service/2.
-define(CAP_INFO, ['Origin-Host',
@@ -2851,7 +1491,8 @@ transports(#state{peerT = PeerT}) ->
-define(ALL_INFO, [capabilities,
applications,
transport,
- pending]).
+ pending,
+ options]).
%% The rest.
-define(OTHER_INFO, [connections,
@@ -2878,6 +1519,20 @@ tagged_info(Item, S)
undefined
end;
+tagged_info(TPid, #state{watchdogT = WatchdogT, peerT = PeerT})
+ when is_pid(TPid) ->
+ try
+ [#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid),
+ [#watchdog{ref = Ref, type = Type, options = Opts}]
+ = ets:lookup(WatchdogT, Pid),
+ [{ref, Ref},
+ {type, Type},
+ {options, Opts}]
+ catch
+ error:_ ->
+ []
+ end;
+
tagged_info(Items, S)
when is_list(Items) ->
[T || I <- Items, T <- [tagged_info(I,S)], T /= undefined, T /= []];
@@ -2928,6 +1583,7 @@ complete_info(Item, #state{service = Svc} = S) ->
capabilities -> service_info(?CAP_INFO, S);
applications -> info_apps(S);
transport -> info_transport(S);
+ options -> info_options(S);
pending -> info_pending(S);
keys -> ?ALL_INFO ++ ?CAP_INFO ++ ?OTHER_INFO;
all -> service_info(?ALL_INFO, S);
@@ -2951,16 +1607,22 @@ complete(Pre) ->
%% info_stats/1
-info_stats(#state{peerT = PeerT}) ->
- MatchSpec = [{#peer{ref = '$1', conn = '$2', _ = '_'},
+info_stats(#state{watchdogT = WatchdogT}) ->
+ MatchSpec = [{#watchdog{ref = '$1', peer = '$2', _ = '_'},
[{'is_pid', '$2'}],
[['$1', '$2']]}],
- diameter_stats:read(lists:append(ets:select(PeerT, MatchSpec))).
+ try ets:select(WatchdogT, MatchSpec) of
+ L ->
+ diameter_stats:read(lists:append(L))
+ catch
+ error: badarg -> [] %% service has gone down
+ end.
%% info_transport/1
%%
%% One entry per configured transport. Statistics for each entry are
-%% the accumulated values for the ref and associated peer pids.
+%% the accumulated values for the ref and associated watchdog/peer
+%% pids.
info_transport(S) ->
PeerD = peer_dict(S, config_dict(S)),
@@ -2993,34 +1655,42 @@ transport([[_,_] | L]) ->
%% Possibly many peer entries for a listening transport. Note that all
%% have the same options by construction, which is not terribly space
-%% efficient. (TODO: all entries for the same Ref should share options.)
+%% efficient.
transport([[{type, accept}, {options, Opts} | _] | _] = Ls) ->
[{type, listen},
{options, Opts},
{accept, [lists:nthtail(2,L) || L <- Ls]}].
-peer_dict(#state{peerT = PeerT, connT = ConnT}, Dict0) ->
- ets:foldl(fun(T,A) -> peer_acc(ConnT, A, T) end, Dict0, PeerT).
-
-peer_acc(ConnT, Acc, #peer{pid = Pid,
- type = Type,
- ref = Ref,
- options = Opts,
- op_state = OS,
- started = T,
- conn = TPid}) ->
- WS = wd_state(OS),
+peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) ->
+ try ets:tab2list(WatchdogT) of
+ L ->
+ lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L)
+ catch
+ error: badarg -> Dict0 %% service has gone down
+ end.
+
+peer_acc(PeerT, Acc, #watchdog{pid = Pid,
+ type = Type,
+ ref = Ref,
+ options = Opts,
+ state = WS,
+ started = At,
+ peer = TPid}) ->
dict:append(Ref,
[{type, Type},
{options, Opts},
- {watchdog, {Pid, T, WS}}
- | info_conn(ConnT, TPid, WS /= ?WD_DOWN)],
+ {watchdog, {Pid, At, WS}}
+ | info_peer(PeerT, TPid, WS)],
Acc).
-info_conn(ConnT, TPid, true)
- when is_pid(TPid) ->
- info_conn(ets:lookup(ConnT, TPid));
-info_conn(_, _, _) ->
+info_peer(PeerT, TPid, WS)
+ when is_pid(TPid), WS /= ?WD_DOWN ->
+ try ets:lookup(PeerT, TPid) of
+ T -> info_peer(T)
+ catch
+ error: badarg -> [] %% service has gone down
+ end;
+info_peer(_, _, _) ->
[].
%% The point of extracting the config here is so that 'transport' info
@@ -3039,19 +1709,12 @@ config_acc({Ref, T, Opts}, Dict)
config_acc(_, Dict) ->
Dict.
-wd_state({_,S}) ->
- S;
-wd_state(?STATE_UP) ->
- ?WD_OKAY;
-wd_state(?STATE_DOWN) ->
- ?WD_DOWN.
-
-info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
+info_peer([#peer{pid = Pid, apps = SApps, caps = Caps, started = T}]) ->
[{peer, {Pid, T}},
{apps, SApps},
{caps, info_caps(Caps)}
| try [{port, info_port(Pid)}] catch _:_ -> [] end];
-info_conn([] = No) ->
+info_peer([] = No) ->
No.
%% Extract information that the processes involved are expected to
@@ -3085,18 +1748,7 @@ mk_app(#diameter_app{} = A) ->
%% One entry for each outgoing request whose answer is outstanding.
info_pending(#state{} = S) ->
- MatchSpec = [{{'$1',
- #request{transport = '$2',
- from = '$3',
- app = '$4',
- _ = '_'},
- '_'},
- [?ORCOND([{'==', T, '$2'} || T <- transports(S)])],
- [{{'$1', [{{app, '$4'}},
- {{transport, '$2'}},
- {{from, '$3'}}]}}]}],
-
- ets:select(?REQUEST_TABLE, MatchSpec).
+ diameter_traffic:pending(transports(S)).
%% info_connections/1
%%
@@ -3151,3 +1803,8 @@ peer_acc(Peer, {PeerD, RefD}) ->
[{TPid, _}, [{origin_host, {_, OH}} | _]]
= [proplists:get_value(K, Peer) || K <- [peer, caps]],
{dict:append(OH, Peer, PeerD), dict:append(OH, TPid, RefD)}.
+
+%% info_options/1
+
+info_options(S) ->
+ S#state.options.