Diffstat (limited to 'lib/diameter/src/base/diameter_service.erl')
-rw-r--r--  lib/diameter/src/base/diameter_service.erl | 295
1 file changed, 133 insertions(+), 162 deletions(-)
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index e4f77e3a24..cbe66ef27a 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -88,12 +88,6 @@
%% outside of the service process.
-define(STATE_TABLE, ?MODULE).
-%% The default sequence mask.
--define(NOMASK, {0,32}).
-
-%% The default restrict_connections.
--define(RESTRICT, nodes).
-
%% Workaround for dialyzer's lack of understanding of match specs.
-type match(T)
:: T | '_' | '$1' | '$2'.
@@ -110,21 +104,33 @@
service :: #diameter_service{},
watchdogT = ets_new(watchdogs) %% #watchdog{} at start
:: ets:tid(),
- peerT, %% undefined in new code, but remain for upgrade
- shared_peers, %% reasons. Replaced by local/remote.
- local_peers, %%
local :: {ets:tid(), ets:tid(), ets:tid()},
remote :: {ets:tid(), ets:tid(), ets:tid()},
monitor = false :: false | pid(), %% process to die with
- options
- :: [{sequence, diameter:sequence()} %% sequence mask
- | {share_peers, diameter:remotes()} %% broadcast to
- | {use_shared_peers, diameter:remotes()} %% use from
- | {restrict_connections, diameter:restriction()}
- | {strict_mbit, boolean()}
- | {string_decode, boolean()}
- | {incoming_maxlen, diameter:message_length()}]}).
-%% shared_peers reflects the peers broadcast from remote nodes.
+ options :: #{sequence := diameter:sequence(), %% sequence mask
+ share_peers := diameter:remotes(),%% broadcast to
+ use_shared_peers := diameter:remotes(),%% use from
+ restrict_connections := diameter:restriction(),
+ incoming_maxlen := diameter:message_length(),
+ strict_arities => diameter:strict_arities(),
+ strict_mbit := boolean(),
+ decode_format := diameter:decode_format(),
+ avp_dictionaries => nonempty_list(module()),
+ traffic_counters := boolean(),
+ string_decode := boolean(),
+ capabilities_cb => diameter:evaluable(),
+ pool_size => pos_integer(),
+ capx_timeout => diameter:'Unsigned32'(),
+ strict_capx => boolean(),
+ disconnect_cb => diameter:evaluable(),
+ dpr_timeout => diameter:'Unsigned32'(),
+ dpa_timeout => diameter:'Unsigned32'(),
+ length_errors => exit | handle | discard,
+ connect_timer => diameter:'Unsigned32'(),
+ watchdog_timer => diameter:'Unsigned32'()
+ | {module(), atom(), list()},
+ watchdog_config => [{okay|suspect, non_neg_integer()}],
+ spawn_opt := list() | {module(), atom(), list()}}}).
%% Record representing an RFC 3539 watchdog process implemented by
%% diameter_watchdog.
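
For orientation, a minimal sketch of a populated options map, using only
the mandatory (:=) keys with the defaults listed in def_opts/0 further
down; the values are those defaults, not taken from a live service:

    %% Sketch only: the optional (=>) keys such as strict_arities and
    %% avp_dictionaries are omitted here.
    Opts = #{sequence             => {0,32},
             share_peers          => false,
             use_shared_peers     => false,
             restrict_connections => nodes,
             incoming_maxlen      => 16#FFFFFF,
             strict_mbit          => true,
             decode_format        => record,
             traffic_counters     => true,
             string_decode        => true,
             spawn_opt            => []},
    {0,32} = maps:get(sequence, Opts).
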
@@ -145,7 +151,7 @@
apps :: match([{0..16#FFFFFFFF, diameter:app_alias()}] %% {Id, Alias}
| [diameter:app_alias()]), %% remote
caps :: match(#diameter_caps{}),
- started = diameter_lib:now(), %% at process start or sharing
+ started = diameter_lib:now(), %% at connection_up
watchdog :: match(pid() %% key into watchdogT
| undefined)}). %% undefined if remote
@@ -284,7 +290,7 @@ whois(SvcName) ->
%% ---------------------------------------------------------------------------
-spec pick_peer(SvcName, AppOrAlias, Opts)
- -> {{TPid, Caps, App}, Mask, SvcOpts}
+ -> {{{TPid, Caps}, App}, SvcOpts}
| false %% no selection
| {error, no_service}
when SvcName :: diameter:service_name(),
@@ -292,14 +298,12 @@ whois(SvcName) ->
| {alias, diameter:app_alias()},
Opts :: {fun((Dict :: module()) -> [term()]),
diameter:peer_filter(),
- Xtra :: list()},
+ Xtra :: list(),
+ [diameter:peer_ref()]},
TPid :: pid(),
Caps :: #diameter_caps{},
App :: #diameter_app{},
- Mask :: diameter:sequence(),
- SvcOpts :: [diameter:service_opt()].
-%% Extract Mask in the returned tuple so that diameter_traffic doesn't
-%% need to know about the ordering of SvcOpts used here.
+ SvcOpts :: map().
pick_peer(SvcName, App, Opts) ->
pick(lookup_state(SvcName), App, Opts).
@@ -319,16 +323,16 @@ pick(#state{service = #diameter_service{applications = Apps}}
pick(_, false = No, _) ->
No;
-pick(#state{options = [{_, Mask} | SvcOpts]}
+pick(#state{options = SvcOpts}
= S,
#diameter_app{module = ModX, dictionary = Dict}
= App0,
- {DestF, Filter, Xtra}) ->
+ {DestF, Filter, Xtra, TPids}) ->
App = App0#diameter_app{module = ModX ++ Xtra},
[_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]),
- case pick_peer(App, RealmAndHost, Filter, S) of
- {TPid, Caps} ->
- {{TPid, Caps, App}, Mask, SvcOpts};
+ case pick_peer(App, RealmAndHost, [Filter | TPids], S) of
+ {_TPid, _Caps} = TC ->
+ {{TC, App}, SvcOpts};
false = No ->
No
end.
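
A hedged sketch of how a caller might consume the new return value: the
sequence mask that was previously extracted into its own tuple element
is now fetched from the options map. send_request/5 is a hypothetical
consumer, not a function in this module:

    case diameter_service:pick_peer(SvcName, {alias, Alias}, Opts) of
        {{{TPid, Caps}, App}, #{sequence := Mask} = SvcOpts} ->
            send_request(TPid, Caps, App, Mask, SvcOpts);
        false ->
            {error, no_connection};
        {error, no_service} = No ->
            No
    end.
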
@@ -526,6 +530,13 @@ transition({tc_timeout, T}, S) ->
tc_timeout(T, S),
ok;
+transition({nodeup, Node, _}, S) ->
+ nodeup(Node, S),
+ ok;
+
+transition({nodedown, _Node, _}, _) ->
+ ok;
+
transition(Req, S) ->
unexpected(handle_info, [Req], S),
ok.
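
These clauses consume the messages produced by the
net_kernel:monitor_nodes/2 subscription made in init_shared/1 below. A
sketch of the subscription and the message shapes it delivers;
notify_service/1 is a hypothetical handler standing in for nodeup/2:

    ok = net_kernel:monitor_nodes(true, [{node_type, visible},
                                         nodedown_reason]),
    receive
        {nodeup, Node, _Info} ->     %% _Info = [{node_type, visible}]
            notify_service(Node);
        {nodedown, _Node, _Info} ->  %% _Info includes {nodedown_reason, R}
            ok                       %% process monitors clean up peers
    end.
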
@@ -543,94 +554,32 @@ terminate(Reason, #state{service_name = Name, local = {PeerT, _, _}} = S) ->
%% wait for watchdog state changes to take care of it. That this
%% takes place after deleting the state entry ensures that the
%% resulting failover by request processes accomplishes nothing.
- ets:foldl(fun(#peer{pid = TPid}, _) ->
- diameter_traffic:peer_down(TPid)
- end,
- ok,
- PeerT),
+ ets:foldl(fun peer_down/2, ok, PeerT),
shutdown == Reason %% application shutdown
andalso shutdown(application, S).
+%% peer_down/2
+%%
+%% Entries with watchdog state SUSPECT are already down: ignore the
+%% expected failure. This assumes the current implementation; checking
+%% the watchdog state first would double the number of lookups in the
+%% typical case, which could be the greater evil if there are many
+%% peer connections.
+
+peer_down(#peer{pid = TPid}, _) ->
+ try
+ diameter_traffic:peer_down(TPid)
+ catch
+ error: {badmatch, []} -> ok
+ end.
+
%% ---------------------------------------------------------------------------
%% # code_change/3
%% ---------------------------------------------------------------------------
-code_change(_FromVsn, #state{} = S, _Extra) ->
- {ok, S};
-
-%% Don't support downgrade since we won't in appup.
-code_change({down = T, _}, _, _Extra) ->
- {error, T};
-
-%% Upgrade local/shared peers dicts populated in old code. Don't
-code_change(_FromVsn, S0, _Extra) ->
- {state, Id, SvcName, Svc, WT, PeerT, SDict, LDict, Monitor, Opts}
- = S0,
-
- init_peers(LT = setelement(1, {PT, _, _} = init_peers(), PeerT),
- fun({_,A}) -> A end),
- init_peers(init_peers(RT = init_peers(), SDict),
- fun(A) -> A end),
-
- S = #state{id = Id,
- service_name = SvcName,
- service = Svc,
- watchdogT = WT,
- peerT = PT, %% empty
- shared_peers = SDict,
- local_peers = LDict,
- local = LT,
- remote = RT,
- monitor = Monitor,
- options = Opts},
-
- %% Replacing the table entry and deleting the old shared tables
- %% can make outgoing requests return {error, no_connection} until
- %% everyone is running new code. Don't delete the tables to avoid
- %% crashing request processes.
- ets:delete_all_objects(SDict),
- ets:delete_all_objects(LDict),
- ets:insert(?STATE_TABLE, S),
+code_change(_FromVsn, S, _Extra) ->
{ok, S}.
-%% init_peers/2
-
-%% Populate app and identity bags from a new-style #peer{} sets.
-init_peers({PeerT, _, _} = T, F)
- when is_function(F) ->
- ets:foldl(fun(#peer{pid = P, apps = As, caps = C}, N) ->
- insert_peer(P, lists:map(F, As), C, T),
- N+1
- end,
- 0,
- PeerT);
-
-%% Populate #peer{} table given a shared peers dict.
-init_peers({PeerT, _, _}, SDict) ->
- dict:fold(fun(P, As, N) ->
- ets:update_element(PeerT, P, {#peer.apps, As}),
- N+1
- end,
- 0,
- diameter_dict:fold(fun(A, Ps, D) ->
- init_peers(A, Ps, PeerT, D)
- end,
- dict:new(),
- SDict)).
-
-%% init_peers/4
-
-init_peers(App, TCs, PeerT, Dict) ->
- lists:foldl(fun({P,C}, D) ->
- ets:insert(PeerT, #peer{pid = P,
- apps = [],
- caps = C}),
- dict:append(P, App, D)
- end,
- Dict,
- TCs).
-
%% ===========================================================================
%% ===========================================================================
@@ -763,12 +712,15 @@ i(SvcName) ->
cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts},
{false, Acc}) ->
lists:foreach(fun init_mod/1, Apps),
+ #{monitor := M}
+ = SvcOpts
+ = service_opts(Opts),
S = #state{service_name = SvcName,
service = Rec#diameter_service{pid = self()},
local = init_peers(),
remote = init_peers(),
- monitor = mref(get_value(monitor, Opts)),
- options = service_options(Opts)},
+ monitor = mref(M),
+ options = maps:remove(monitor, SvcOpts)},
{S, Acc};
cfg_acc({_Ref, Type, _Opts} = T, {S, Acc})
@@ -783,26 +735,39 @@ init_peers() ->
%% Alias,
%% TPid}
-service_options(Opts) ->
- [{sequence, proplists:get_value(sequence, Opts, ?NOMASK)},
- {share_peers, get_value(share_peers, Opts)},
- {use_shared_peers, get_value(use_shared_peers, Opts)},
- {restrict_connections, proplists:get_value(restrict_connections,
- Opts,
- ?RESTRICT)},
- {spawn_opt, proplists:get_value(spawn_opt, Opts, [])},
- {string_decode, proplists:get_value(string_decode, Opts, true)},
- {incoming_maxlen, proplists:get_value(incoming_maxlen, Opts, 16#FFFFFF)},
- {strict_mbit, proplists:get_value(strict_mbit, Opts, true)}].
-%% The order of options is significant since we match against the list.
+service_opts(Opts) ->
+ remove([{strict_arities, true},
+ {avp_dictionaries, []}],
+ maps:merge(maps:from_list([{monitor, false} | def_opts()]),
+ maps:from_list(Opts))).
+
+remove(List, Map) ->
+ maps:filter(fun(K,V) -> not lists:member({K,V}, List) end,
+ Map).
+
+def_opts() -> %% defaults on the service map
+ [{share_peers, false},
+ {use_shared_peers, false},
+ {sequence, {0,32}},
+ {restrict_connections, nodes},
+ {incoming_maxlen, 16#FFFFFF},
+ {strict_arities, true},
+ {strict_mbit, true},
+ {decode_format, record},
+ {avp_dictionaries, []},
+ {traffic_counters, true},
+ {string_decode, true},
+ {spawn_opt, []}].
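
A hedged example of what service_opts/1 produces; the sequence value is
invented. User options overlay the defaults, and strict_arities or
avp_dictionaries entries that merely restate their defaults are removed
again:

    SvcOpts = service_opts([{sequence, {1,32}}, {strict_arities, true}]),
    {1,32} = maps:get(sequence, SvcOpts),
    false = maps:get(monitor, SvcOpts),            %% removed in cfg_acc/2
    false = maps:is_key(strict_arities, SvcOpts).  %% restated default
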
mref(false = No) ->
No;
mref(P) ->
monitor(process, P).
-init_shared(#state{options = [_, _, {_,T} | _],
+init_shared(#state{options = #{use_shared_peers := T},
service_name = Svc}) ->
+ T == false orelse net_kernel:monitor_nodes(true, [{node_type, visible},
+ nodedown_reason]),
notify(T, Svc, {service, self()}).
init_mod(#diameter_app{alias = Alias,
@@ -812,16 +777,17 @@ init_mod(#diameter_app{alias = Alias,
start_fsm({Ref, Type, Opts}, S) ->
start(Ref, {Type, Opts}, S).
-get_value(Key, Vs) ->
- {_, V} = lists:keyfind(Key, 1, Vs),
- V.
-
notify(Share, SvcName, T) ->
Nodes = remotes(Share),
[] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T).
%% Test for the empty list for upgrade reasons: there's no
%% diameter_peer:notify/3 in old code.
+nodeup(Node, #state{options = #{share_peers := SP},
+ service_name = SvcName}) ->
+ lists:member(Node, remotes(SP))
+ andalso diameter_peer:notify([Node], SvcName, {service, self()}).
+
remotes(false) ->
[];
@@ -899,7 +865,8 @@ start(Ref, Type, Opts, State) ->
start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT,
local = {PeerT, _, _},
- options = SvcOpts,
+ options = #{string_decode := SD}
+ = SvcOpts,
service_name = SvcName,
service = Svc0})
when Type == connect;
@@ -907,21 +874,29 @@ start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT,
#diameter_service{applications = Apps}
= Svc1
= merge_service(Opts, Svc0),
- Svc = binary_caps(Svc1, proplists:get_value(string_decode, SvcOpts, true)),
- RecvData = diameter_traffic:make_recvdata([SvcName,
- PeerT,
- Apps,
- SvcOpts]),
- T = {{spawn_opts([Opts, SvcOpts]), RecvData}, Opts, SvcOpts, Svc},
+ Svc = binary_caps(Svc1, SD),
+ {SOpts, TOpts} = merge_opts(SvcOpts, Opts),
+ RecvData = diameter_traffic:make_recvdata([SvcName, PeerT, Apps, SOpts]),
+ T = {TOpts, SOpts, RecvData, Svc},
Rec = #watchdog{type = Type,
ref = Ref,
- options = Opts},
+ options = TOpts},
+
diameter_lib:fold_n(fun(_,A) ->
[wd(Type, Ref, T, WatchdogT, Rec) | A]
end,
[],
N).
+merge_opts(SvcOpts, Opts) ->
+ Keys = [K || {K,_} <- def_opts()],
+ SO = [T || {K,_} = T <- Opts, lists:member(K, Keys)],
+ TO = Opts -- SO,
+ {maps:merge(maps:with(Keys, SvcOpts), maps:from_list(SO)),
+ TO ++ [T || {K,_} = T <- maps:to_list(SvcOpts),
+ not lists:member(K, Keys),
+ not lists:keymember(K, 1, Opts)]}.
+
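
A hedged illustration of the split, with invented values: options whose
keys occur in def_opts/0 are merged onto the service map, while the
rest stay transport-side, together with any non-service entries of
SvcOpts that Opts doesn't override:

    {SO, TO} = merge_opts(#{sequence => {0,32}, capx_timeout => 10000},
                          [{sequence, {1,32}},
                           {transport_module, diameter_tcp}]),
    {1,32} = maps:get(sequence, SO),
    true = lists:member({transport_module, diameter_tcp}, TO),
    true = lists:member({capx_timeout, 10000}, TO).
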
binary_caps(Svc, true) ->
Svc;
binary_caps(#diameter_service{capabilities = Caps} = Svc, false) ->
@@ -936,12 +911,6 @@ wd(Type, Ref, T, WatchdogT, Rec) ->
%% record so that each watchdog may get a different record. This
%% record is what is passed back into application callbacks.
-spawn_opts(Optss) ->
- SpawnOpts = get_value(spawn_opt, Optss, []),
- [T || T <- SpawnOpts,
- T /= link,
- T /= monitor].
-
start_watchdog(Type, Ref, T) ->
{_MRef, Pid} = diameter_watchdog:start({Type, Ref}, T),
Pid.
@@ -1154,18 +1123,6 @@ keyfind([Key | Rest], Pos, L) ->
T
end.
-%% get_value/3
-
-get_value(_, [], Def) ->
- Def;
-get_value(Key, [L | Rest], Def) ->
- case lists:keyfind(Key, 1, L) of
- {_,V} ->
- V;
- _ ->
- get_value(Key, Rest, Def)
- end.
-
%% find_outgoing_app/2
find_outgoing_app(Alias, Apps) ->
@@ -1463,19 +1420,19 @@ send_event(#diameter_event{service = SvcName} = E) ->
%% # share_peer/5
%% ---------------------------------------------------------------------------
-share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _],
+share_peer(up, Caps, Apps, TPid, #state{options = #{share_peers := SP},
service_name = Svc}) ->
- notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps});
+ notify(SP, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps});
-share_peer(down, _Caps, _Apps, TPid, #state{options = [_, {_,T} | _],
+share_peer(down, _Caps, _Apps, TPid, #state{options = #{share_peers := SP},
service_name = Svc}) ->
- notify(T, Svc, {peer, TPid}).
+ notify(SP, Svc, {peer, TPid}).
%% ---------------------------------------------------------------------------
%% # share_peers/2
%% ---------------------------------------------------------------------------
-share_peers(Pid, #state{options = [_, {_,SP} | _],
+share_peers(Pid, #state{options = #{share_peers := SP},
local = {PeerT, AppT, _}}) ->
is_remote(Pid, SP)
andalso ets:foldl(fun(T, N) -> N + sp(Pid, AppT, T) end,
@@ -1507,8 +1464,15 @@ is_remote(Pid, T) ->
%% # remote_peer_up/4
%% ---------------------------------------------------------------------------
-remote_peer_up(TPid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) ->
- is_remote(TPid, T) andalso rpu(TPid, Aliases, Caps, S).
+remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T},
+ remote = {PeerT, _, _}}
+ = S) ->
+ is_remote(TPid, T)
+ andalso not ets:member(PeerT, TPid)
+ andalso rpu(TPid, Aliases, Caps, S).
+
+%% Notifications can arrive more than once since remote nodes push
+%% and the local node pulls.
rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) ->
#diameter_service{applications = Apps} = Svc,
@@ -1518,6 +1482,7 @@ rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) ->
rpu(_, [] = No, _, _) ->
No;
+
rpu(TPid, Aliases, Caps, {PeerT, _, _} = RT) ->
monitor(process, TPid),
ets:insert(PeerT, #peer{pid = TPid,
@@ -1629,8 +1594,14 @@ pick_peer(Local,
%% peers/4
-peers(Alias, RH, Filter, T) ->
- filter(Alias, RH, Filter, T, true).
+%% No peer options pointing at specific peers: search for them.
+peers(Alias, RH, [Filter], T) ->
+ filter(Alias, RH, Filter, T, true);
+
+%% Or just lookup.
+peers(_Alias, RH, [Filter | TPids], {PeerT, _AppT, _IdentT}) ->
+ {Ts, _} = filter(caps(PeerT, TPids), RH, Filter),
+ Ts.
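
The lookup clause relies on a caps/2 helper that isn't part of this
hunk. A hypothetical sketch of what it might do, assuming PeerT is
keyed on #peer.pid; the module's real caps/2 may differ:

    caps(PeerT, TPids) ->
        lists:flatmap(fun(P) ->
                              case ets:lookup(PeerT, P) of
                                  [#peer{caps = C}] -> [{P, C}];
                                  [] -> []
                              end
                      end,
                      TPids).
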
%% filter/5
%%