aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base/diameter_service.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base/diameter_service.erl')
-rw-r--r--lib/diameter/src/base/diameter_service.erl196
1 files changed, 48 insertions, 148 deletions
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index e4f77e3a24..a976a8b998 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -88,12 +88,6 @@
%% outside of the service process.
-define(STATE_TABLE, ?MODULE).
-%% The default sequence mask.
--define(NOMASK, {0,32}).
-
-%% The default restrict_connections.
--define(RESTRICT, nodes).
-
%% Workaround for dialyzer's lack of understanding of match specs.
-type match(T)
:: T | '_' | '$1' | '$2'.
@@ -110,21 +104,17 @@
service :: #diameter_service{},
watchdogT = ets_new(watchdogs) %% #watchdog{} at start
:: ets:tid(),
- peerT, %% undefined in new code, but remain for upgrade
- shared_peers, %% reasons. Replaced by local/remote.
- local_peers, %%
local :: {ets:tid(), ets:tid(), ets:tid()},
remote :: {ets:tid(), ets:tid(), ets:tid()},
monitor = false :: false | pid(), %% process to die with
- options
- :: [{sequence, diameter:sequence()} %% sequence mask
- | {share_peers, diameter:remotes()} %% broadcast to
- | {use_shared_peers, diameter:remotes()} %% use from
- | {restrict_connections, diameter:restriction()}
- | {strict_mbit, boolean()}
- | {string_decode, boolean()}
- | {incoming_maxlen, diameter:message_length()}]}).
-%% shared_peers reflects the peers broadcast from remote nodes.
+ options :: #{sequence := diameter:sequence(), %% sequence mask
+ share_peers := diameter:remotes(),%% broadcast to
+ use_shared_peers := diameter:remotes(),%% use from
+ restrict_connections := diameter:restriction(),
+ incoming_maxlen := diameter:message_length(),
+ strict_mbit := boolean(),
+ string_decode := boolean(),
+ spawn_opt := list() | {module(), atom(), list()}}}).
%% Record representing an RFC 3539 watchdog process implemented by
%% diameter_watchdog.
@@ -284,7 +274,7 @@ whois(SvcName) ->
%% ---------------------------------------------------------------------------
-spec pick_peer(SvcName, AppOrAlias, Opts)
- -> {{TPid, Caps, App}, Mask, SvcOpts}
+ -> {{{TPid, Caps}, App}, SvcOpts}
| false %% no selection
| {error, no_service}
when SvcName :: diameter:service_name(),
@@ -292,14 +282,12 @@ whois(SvcName) ->
| {alias, diameter:app_alias()},
Opts :: {fun((Dict :: module()) -> [term()]),
diameter:peer_filter(),
- Xtra :: list()},
+ Xtra :: list(),
+ [diameter:peer_ref()]},
TPid :: pid(),
Caps :: #diameter_caps{},
App :: #diameter_app{},
- Mask :: diameter:sequence(),
- SvcOpts :: [diameter:service_opt()].
-%% Extract Mask in the returned tuple so that diameter_traffic doesn't
-%% need to know about the ordering of SvcOpts used here.
+ SvcOpts :: map().
pick_peer(SvcName, App, Opts) ->
pick(lookup_state(SvcName), App, Opts).
@@ -319,16 +307,16 @@ pick(#state{service = #diameter_service{applications = Apps}}
pick(_, false = No, _) ->
No;
-pick(#state{options = [{_, Mask} | SvcOpts]}
+pick(#state{options = SvcOpts}
= S,
#diameter_app{module = ModX, dictionary = Dict}
= App0,
- {DestF, Filter, Xtra}) ->
+ {DestF, Filter, Xtra, TPids}) ->
App = App0#diameter_app{module = ModX ++ Xtra},
[_,_] = RealmAndHost = diameter_lib:eval([DestF, Dict]),
- case pick_peer(App, RealmAndHost, Filter, S) of
- {TPid, Caps} ->
- {{TPid, Caps, App}, Mask, SvcOpts};
+ case pick_peer(App, RealmAndHost, [Filter | TPids], S) of
+ {_TPid, _Caps} = TC ->
+ {{TC, App}, SvcOpts};
false = No ->
No
end.
@@ -556,81 +544,9 @@ terminate(Reason, #state{service_name = Name, local = {PeerT, _, _}} = S) ->
%% # code_change/3
%% ---------------------------------------------------------------------------
-code_change(_FromVsn, #state{} = S, _Extra) ->
- {ok, S};
-
-%% Don't support downgrade since we won't in appup.
-code_change({down = T, _}, _, _Extra) ->
- {error, T};
-
-%% Upgrade local/shared peers dicts populated in old code. Don't
-code_change(_FromVsn, S0, _Extra) ->
- {state, Id, SvcName, Svc, WT, PeerT, SDict, LDict, Monitor, Opts}
- = S0,
-
- init_peers(LT = setelement(1, {PT, _, _} = init_peers(), PeerT),
- fun({_,A}) -> A end),
- init_peers(init_peers(RT = init_peers(), SDict),
- fun(A) -> A end),
-
- S = #state{id = Id,
- service_name = SvcName,
- service = Svc,
- watchdogT = WT,
- peerT = PT, %% empty
- shared_peers = SDict,
- local_peers = LDict,
- local = LT,
- remote = RT,
- monitor = Monitor,
- options = Opts},
-
- %% Replacing the table entry and deleting the old shared tables
- %% can make outgoing requests return {error, no_connection} until
- %% everyone is running new code. Don't delete the tables to avoid
- %% crashing request processes.
- ets:delete_all_objects(SDict),
- ets:delete_all_objects(LDict),
- ets:insert(?STATE_TABLE, S),
+code_change(_FromVsn, S, _Extra) ->
{ok, S}.
-%% init_peers/2
-
-%% Populate app and identity bags from a new-style #peer{} sets.
-init_peers({PeerT, _, _} = T, F)
- when is_function(F) ->
- ets:foldl(fun(#peer{pid = P, apps = As, caps = C}, N) ->
- insert_peer(P, lists:map(F, As), C, T),
- N+1
- end,
- 0,
- PeerT);
-
-%% Populate #peer{} table given a shared peers dict.
-init_peers({PeerT, _, _}, SDict) ->
- dict:fold(fun(P, As, N) ->
- ets:update_element(PeerT, P, {#peer.apps, As}),
- N+1
- end,
- 0,
- diameter_dict:fold(fun(A, Ps, D) ->
- init_peers(A, Ps, PeerT, D)
- end,
- dict:new(),
- SDict)).
-
-%% init_peers/4
-
-init_peers(App, TCs, PeerT, Dict) ->
- lists:foldl(fun({P,C}, D) ->
- ets:insert(PeerT, #peer{pid = P,
- apps = [],
- caps = C}),
- dict:append(P, App, D)
- end,
- Dict,
- TCs).
-
%% ===========================================================================
%% ===========================================================================
@@ -768,7 +684,7 @@ cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts},
local = init_peers(),
remote = init_peers(),
monitor = mref(get_value(monitor, Opts)),
- options = service_options(Opts)},
+ options = service_options(lists:keydelete(monitor, 1, Opts))},
{S, Acc};
cfg_acc({_Ref, Type, _Opts} = T, {S, Acc})
@@ -784,24 +700,14 @@ init_peers() ->
%% TPid}
service_options(Opts) ->
- [{sequence, proplists:get_value(sequence, Opts, ?NOMASK)},
- {share_peers, get_value(share_peers, Opts)},
- {use_shared_peers, get_value(use_shared_peers, Opts)},
- {restrict_connections, proplists:get_value(restrict_connections,
- Opts,
- ?RESTRICT)},
- {spawn_opt, proplists:get_value(spawn_opt, Opts, [])},
- {string_decode, proplists:get_value(string_decode, Opts, true)},
- {incoming_maxlen, proplists:get_value(incoming_maxlen, Opts, 16#FFFFFF)},
- {strict_mbit, proplists:get_value(strict_mbit, Opts, true)}].
-%% The order of options is significant since we match against the list.
+ maps:from_list(Opts).
mref(false = No) ->
No;
mref(P) ->
monitor(process, P).
-init_shared(#state{options = [_, _, {_,T} | _],
+init_shared(#state{options = #{use_shared_peers := T},
service_name = Svc}) ->
notify(T, Svc, {service, self()}).
@@ -899,7 +805,8 @@ start(Ref, Type, Opts, State) ->
start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT,
local = {PeerT, _, _},
- options = SvcOpts,
+ options = #{string_decode := SD}
+ = SvcOpts0,
service_name = SvcName,
service = Svc0})
when Type == connect;
@@ -907,21 +814,25 @@ start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT,
#diameter_service{applications = Apps}
= Svc1
= merge_service(Opts, Svc0),
- Svc = binary_caps(Svc1, proplists:get_value(string_decode, SvcOpts, true)),
- RecvData = diameter_traffic:make_recvdata([SvcName,
- PeerT,
- Apps,
- SvcOpts]),
- T = {{spawn_opts([Opts, SvcOpts]), RecvData}, Opts, SvcOpts, Svc},
+ Svc = binary_caps(Svc1, SD),
+ SvcOpts = merge_options(Opts, SvcOpts0),
+ RecvData = diameter_traffic:make_recvdata([SvcName, PeerT, Apps, SvcOpts]),
+ T = {Opts, SvcOpts, RecvData, Svc},
Rec = #watchdog{type = Type,
ref = Ref,
options = Opts},
+
diameter_lib:fold_n(fun(_,A) ->
[wd(Type, Ref, T, WatchdogT, Rec) | A]
end,
[],
N).
+merge_options(Opts, SvcOpts) ->
+ Keys = maps:keys(SvcOpts),
+ Map = maps:from_list([KV || {K,_} = KV <- Opts, lists:member(K, Keys)]),
+ maps:merge(SvcOpts, Map).
+
binary_caps(Svc, true) ->
Svc;
binary_caps(#diameter_service{capabilities = Caps} = Svc, false) ->
@@ -936,12 +847,6 @@ wd(Type, Ref, T, WatchdogT, Rec) ->
%% record so that each watchdog may get a different record. This
%% record is what is passed back into application callbacks.
-spawn_opts(Optss) ->
- SpawnOpts = get_value(spawn_opt, Optss, []),
- [T || T <- SpawnOpts,
- T /= link,
- T /= monitor].
-
start_watchdog(Type, Ref, T) ->
{_MRef, Pid} = diameter_watchdog:start({Type, Ref}, T),
Pid.
@@ -1154,18 +1059,6 @@ keyfind([Key | Rest], Pos, L) ->
T
end.
-%% get_value/3
-
-get_value(_, [], Def) ->
- Def;
-get_value(Key, [L | Rest], Def) ->
- case lists:keyfind(Key, 1, L) of
- {_,V} ->
- V;
- _ ->
- get_value(Key, Rest, Def)
- end.
-
%% find_outgoing_app/2
find_outgoing_app(Alias, Apps) ->
@@ -1463,19 +1356,19 @@ send_event(#diameter_event{service = SvcName} = E) ->
%% # share_peer/5
%% ---------------------------------------------------------------------------
-share_peer(up, Caps, Apps, TPid, #state{options = [_, {_,T} | _],
+share_peer(up, Caps, Apps, TPid, #state{options = #{share_peers := SP},
service_name = Svc}) ->
- notify(T, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps});
+ notify(SP, Svc, {peer, TPid, [A || {_,A} <- Apps], Caps});
-share_peer(down, _Caps, _Apps, TPid, #state{options = [_, {_,T} | _],
+share_peer(down, _Caps, _Apps, TPid, #state{options = #{share_peers := SP},
service_name = Svc}) ->
- notify(T, Svc, {peer, TPid}).
+ notify(SP, Svc, {peer, TPid}).
%% ---------------------------------------------------------------------------
%% # share_peers/2
%% ---------------------------------------------------------------------------
-share_peers(Pid, #state{options = [_, {_,SP} | _],
+share_peers(Pid, #state{options = #{share_peers := SP},
local = {PeerT, AppT, _}}) ->
is_remote(Pid, SP)
andalso ets:foldl(fun(T, N) -> N + sp(Pid, AppT, T) end,
@@ -1507,7 +1400,8 @@ is_remote(Pid, T) ->
%% # remote_peer_up/4
%% ---------------------------------------------------------------------------
-remote_peer_up(TPid, Aliases, Caps, #state{options = [_, _, {_,T} | _]} = S) ->
+remote_peer_up(TPid, Aliases, Caps, #state{options = #{use_shared_peers := T}}
+ = S) ->
is_remote(TPid, T) andalso rpu(TPid, Aliases, Caps, S).
rpu(TPid, Aliases, Caps, #state{service = Svc, remote = RT}) ->
@@ -1629,8 +1523,14 @@ pick_peer(Local,
%% peers/4
-peers(Alias, RH, Filter, T) ->
- filter(Alias, RH, Filter, T, true).
+%% No peer options pointing at specific peers: search for them.
+peers(Alias, RH, [Filter], T) ->
+ filter(Alias, RH, Filter, T, true);
+
+%% Or just lookup.
+peers(_Alias, RH, [Filter | TPids], {PeerT, _AppT, _IdentT}) ->
+ {Ts, _} = filter(caps(PeerT, TPids), RH, Filter),
+ Ts.
%% filter/5
%%