aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-08-23 12:21:14 +0200
committerAnders Svensson <[email protected]>2017-08-25 13:08:41 +0200
commit5f3becaddef9b18c81a1ec8bd7bf955384c1a225 (patch)
tree23b87c1dd78c561c51c9bf534e7b7b479c9d4c44 /lib/diameter/src
parent2d540b95755a6f628b0cfc5a9bab4ff41046fe4b (diff)
downloadotp-5f3becaddef9b18c81a1ec8bd7bf955384c1a225.tar.gz
otp-5f3becaddef9b18c81a1ec8bd7bf955384c1a225.tar.bz2
otp-5f3becaddef9b18c81a1ec8bd7bf955384c1a225.zip
Let a service configure default transport options
Only a default spawn_opt has been possible to configure, but there's no reason why most others should need to be configured per transport. Those options that still only make sense on a transport are transport_module/config (because of the semantics of multiple values), applications/capabilities (since these override service options), and private (since it's only to allow user-specific options in a backwards compatible way).
Diffstat (limited to 'lib/diameter/src')
-rw-r--r--lib/diameter/src/base/diameter.erl31
-rw-r--r--lib/diameter/src/base/diameter_config.erl297
-rw-r--r--lib/diameter/src/base/diameter_service.erl64
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl35
4 files changed, 223 insertions, 204 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index ad6aaab440..0f919f6c25 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -348,6 +348,22 @@ call(SvcName, App, Message) ->
| encode
| decode.
+%% Options common to both start_service/2 and add_transport/2.
+
+-type common_opt()
+ :: {pool_size, pos_integer()}
+ | {capabilities_cb, eval()}
+ | {capx_timeout, 'Unsigned32'()}
+ | {strict_capx, boolean()}
+ | {disconnect_cb, eval()}
+ | {dpr_timeout, 'Unsigned32'()}
+ | {dpa_timeout, 'Unsigned32'()}
+ | {length_errors, exit | handle | discard}
+ | {connect_timer, 'Unsigned32'()}
+ | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}}
+ | {watchdog_config, [{okay|suspect, non_neg_integer()}]}
+ | {spawn_opt, list()}.
+
%% Options passed to start_service/2
-type service_opt()
@@ -363,7 +379,7 @@ call(SvcName, App, Message) ->
| {strict_mbit, boolean()}
| {incoming_maxlen, message_length()}
| {use_shared_peers, remotes()}
- | {spawn_opt, list()}.
+ | common_opt().
-type application_opt()
:: {alias, app_alias()}
@@ -393,20 +409,9 @@ call(SvcName, App, Message) ->
:: {transport_module, atom()}
| {transport_config, any()}
| {transport_config, any(), 'Unsigned32'() | infinity}
- | {pool_size, pos_integer()}
| {applications, [app_alias()]}
| {capabilities, [capability()]}
- | {capabilities_cb, eval()}
- | {capx_timeout, 'Unsigned32'()}
- | {strict_capx, boolean()}
- | {disconnect_cb, eval()}
- | {dpr_timeout, 'Unsigned32'()}
- | {dpa_timeout, 'Unsigned32'()}
- | {length_errors, exit | handle | discard}
- | {connect_timer, 'Unsigned32'()}
- | {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}}
- | {watchdog_config, [{okay|suspect, non_neg_integer()}]}
- | {spawn_opt, list()}
+ | common_opt()
| {private, any()}.
%% Predicate passed to remove_transport/2
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl
index d54dbd9976..a4496f9725 100644
--- a/lib/diameter/src/base/diameter_config.erl
+++ b/lib/diameter/src/base/diameter_config.erl
@@ -102,9 +102,6 @@
-record(monitor, {mref = make_ref() :: reference(),
service}). %% name
-%% The default sequence mask.
--define(NOMASK, {0,32}).
-
%% Time to lay low before restarting a dead service.
-define(RESTART_SLEEP, 2000).
@@ -560,89 +557,184 @@ add(SvcName, Type, Opts0) ->
end.
transport_opts(Opts) ->
- lists:map(fun topt/1, Opts).
+ [setopt(transport, T) || T <- Opts].
+
+%% setopt/2
-topt(T) ->
- case opt(T) of
+setopt(K, T) ->
+ case opt(K, T) of
{value, X} ->
X;
true ->
T;
false ->
- ?THROW({invalid, T})
+ ?THROW({invalid, T});
+ {error, Reason} ->
+ ?THROW({invalid, T, Reason})
end.
-opt({transport_module, M}) ->
+%% opt/2
+
+opt(service, {incoming_maxlen, N})
+ when 0 =< N, N < 1 bsl 24 ->
+ true;
+
+opt(service, {K, B})
+ when K == strict_mbit;
+ K == string_decode;
+ K == traffic_counters ->
+ is_boolean(B);
+
+opt(service, {K, false})
+ when K == share_peers;
+ K == use_shared_peers;
+ K == monitor;
+ K == restrict_connections;
+ K == strict_arities;
+ K == decode_format ->
+ true;
+
+opt(service, {K, true})
+ when K == share_peers;
+ K == use_shared_peers;
+ K == strict_arities ->
+ true;
+
+opt(service, {decode_format, T})
+ when T == record;
+ T == list;
+ T == map;
+ T == record_from_map ->
+ true;
+
+opt(service, {strict_arities, T})
+ when T == encode;
+ T == decode ->
+ true;
+
+opt(service, {restrict_connections, T})
+ when T == node;
+ T == nodes ->
+ true;
+
+opt(service, {K, T})
+ when (K == share_peers
+ orelse K == use_shared_peers
+ orelse K == restrict_connections), ([] == T
+ orelse is_atom(hd(T))) ->
+ true;
+
+opt(service, {monitor, P}) ->
+ is_pid(P);
+
+opt(service, {K, F})
+ when K == restrict_connections;
+ K == share_peers;
+ K == use_shared_peers ->
+ try diameter_lib:eval(F) of %% but no guarantee that it won't fail later
+ Nodes ->
+ is_list(Nodes) orelse {error, Nodes}
+ catch
+ E:R ->
+ {error, {E, R, ?STACK}}
+ end;
+
+opt(service, {sequence, {H,N}}) ->
+ 0 =< N andalso N =< 32
+ andalso is_integer(H)
+ andalso 0 =< H
+ andalso 0 == H bsr (32-N);
+
+opt(service = S, {sequence = K, F}) ->
+ try diameter_lib:eval(F) of
+ {_,_} = T ->
+ KT = {K,T},
+ opt(S, KT) andalso {value, KT};
+ V ->
+ {error, V}
+ catch
+ E:R ->
+ {error, {E, R, ?STACK}}
+ end;
+
+opt(transport, {transport_module, M}) ->
is_atom(M);
-opt({transport_config, _, Tmo}) ->
+opt(transport, {transport_config, _, Tmo}) ->
?IS_UINT32(Tmo) orelse Tmo == infinity;
-opt({applications, As}) ->
+opt(transport, {applications, As}) ->
is_list(As);
-opt({capabilities, Os}) ->
- is_list(Os) andalso ok == encode_CER(Os);
+opt(transport, {capabilities, Os}) ->
+ is_list(Os) andalso case encode_CER(Os) of
+ ok -> true;
+ No -> {error, No}
+ end;
-opt({K, Tmo})
+opt(_, {K, Tmo})
when K == capx_timeout;
K == dpr_timeout;
K == dpa_timeout ->
?IS_UINT32(Tmo);
-opt({capx_strictness, B}) ->
+opt(_, {capx_strictness, B}) ->
is_boolean(B) andalso {value, {strict_capx, B}};
-opt({strict_capx, B}) ->
+opt(_, {strict_capx, B}) ->
is_boolean(B);
-opt({length_errors, T}) ->
+opt(_, {length_errors, T}) ->
lists:member(T, [exit, handle, discard]);
-opt({K, Tmo})
- when K == reconnect_timer; %% deprecated
- K == connect_timer ->
+opt(transport, {reconnect_timer, Tmo}) -> %% deprecated
+ ?IS_UINT32(Tmo) andalso {value, {connect_timer, Tmo}};
+opt(_, {connect_timer, Tmo}) ->
?IS_UINT32(Tmo);
-opt({watchdog_timer, {M,F,A}})
+opt(_, {watchdog_timer, {M,F,A}})
when is_atom(M), is_atom(F), is_list(A) ->
true;
-opt({watchdog_timer, Tmo}) ->
+opt(_, {watchdog_timer, Tmo}) ->
?IS_UINT32(Tmo);
-opt({watchdog_config, L}) ->
- is_list(L) andalso lists:all(fun wdopt/1, L);
+opt(_, {watchdog_config, L}) ->
+ is_list(L) andalso lists:all(fun wd/1, L);
-opt({spawn_opt, {M,F,A}})
+opt(_, {spawn_opt, {M,F,A}})
when is_atom(M), is_atom(F), is_list(A) ->
true;
-opt({spawn_opt = K, Opts}) ->
+opt(_, {spawn_opt = K, Opts}) ->
if is_list(Opts) ->
{value, {K, spawn_opts(Opts)}};
true ->
false
end;
-opt({pool_size, N}) ->
+opt(_, {pool_size, N}) ->
is_integer(N) andalso 0 < N;
-%% Options that we can't validate.
-opt({K, _})
+%% Options we can't validate.
+opt(_, {K, _})
+ when K == disconnect_cb;
+ K == capabilities_cb ->
+ true;
+opt(transport, {K, _})
when K == transport_config;
- K == capabilities_cb;
- K == disconnect_cb;
K == private ->
true;
-%% Anything else, which is ignored by us. This makes options sensitive
-%% to spelling mistakes but arbitrary options are passed by some users
-%% as a way to identify transports. (That is, can't just do away with
-%% it.)
-opt(_) ->
- true.
+%% Anything else, which is ignored in transport config. This makes
+%% options sensitive to spelling mistakes, but arbitrary options are
+%% passed by some users as a way to identify transports so can't just
+%% do away with it.
+opt(K, _) ->
+ K == transport.
+
+%% wd/1
-wdopt({K,N}) ->
+wd({K,N}) ->
(K == okay orelse K == suspect) andalso is_integer(N) andalso 0 =< N;
-wdopt(_) ->
+wd(_) ->
false.
%% start_transport/2
@@ -707,19 +799,7 @@ make_config(SvcName, Opts) ->
ok = encode_CER(CapOpts),
- SvcOpts = make_opts((Opts -- AppOpts) -- CapOpts,
- [{false, share_peers},
- {false, use_shared_peers},
- {false, monitor},
- {?NOMASK, sequence},
- {nodes, restrict_connections},
- {16#FFFFFF, incoming_maxlen},
- {true, strict_arities},
- {true, strict_mbit},
- {record, decode_format},
- {true, traffic_counters},
- {true, string_decode},
- {[], spawn_opt}]),
+ SvcOpts = service_opts((Opts -- AppOpts) -- CapOpts),
D = proplists:get_value(string_decode, SvcOpts, true),
@@ -733,115 +813,22 @@ binary_caps(Caps, true) ->
binary_caps(Caps, false) ->
diameter_capx:binary_caps(Caps).
-%% make_opts/2
-
-make_opts(Opts, Defs) ->
- Known = [{K, get_opt(K, Opts, D)} || {D,K} <- Defs],
- Unknown = Opts -- Known,
+%% service_opts/1
- [] == Unknown orelse ?THROW({invalid, hd(Unknown)}),
-
- [{K, opt(K,V)} || {K,V} <- Known].
-
-opt(incoming_maxlen, N)
- when 0 =< N, N < 1 bsl 24 ->
- N;
-
-opt(spawn_opt, {M,F,A} = T)
- when is_atom(M), is_atom(F), is_list(A) ->
- T;
-
-opt(spawn_opt, L)
- when is_list(L) ->
- spawn_opts(L);
-
-opt(K, false = B)
- when K == share_peers;
- K == use_shared_peers;
- K == monitor;
- K == restrict_connections;
- K == strict_arities;
- K == strict_mbit;
- K == decode_format;
- K == traffic_counters;
- K == string_decode ->
- B;
-
-opt(K, true = B)
- when K == share_peers;
- K == use_shared_peers;
- K == strict_arities;
- K == strict_mbit;
- K == traffic_counters;
- K == string_decode ->
- B;
-
-opt(decode_format, T)
- when T == record;
- T == list;
- T == map;
- T == record_from_map ->
- T;
-
-opt(strict_arities, T)
- when T == encode;
- T == decode ->
- T;
-
-opt(restrict_connections, T)
- when T == node;
- T == nodes ->
- T;
-
-opt(K, T)
- when (K == share_peers
- orelse K == use_shared_peers
- orelse K == restrict_connections), ([] == T
- orelse is_atom(hd(T))) ->
- T;
-
-opt(monitor, P)
- when is_pid(P) ->
- P;
-
-opt(K, F)
- when K == restrict_connections;
- K == share_peers;
- K == use_shared_peers ->
- try diameter_lib:eval(F) of %% but no guarantee that it won't fail later
- Nodes when is_list(Nodes) ->
- F;
- V ->
- ?THROW({value, {K,V}})
- catch
- E:R ->
- ?THROW({value, {K, E, R, ?STACK}})
- end;
-
-opt(sequence, {_,_} = T) ->
- sequence(T);
-
-opt(sequence = K, F) ->
- try diameter_lib:eval(F) of
- T -> sequence(T)
- catch
- E:R ->
- ?THROW({value, {K, E, R, ?STACK}})
- end;
-
-opt(K, _) ->
- ?THROW({value, K}).
+service_opts(Opts) ->
+ Res = [setopt(service, T) || T <- Opts],
+ Keys = sets:to_list(sets:from_list([K || {K,_} <- Res])), %% unique
+ Dups = lists:foldl(fun(K,A) -> lists:keydelete(K, 1, A) end, Res, Keys),
+ [] == Dups orelse ?THROW({duplicate, Dups}),
+ Res.
+%% Reject duplicates on a service, but not on a transport. There's no
+%% particular reason for the inconsistency, but the historic behaviour
+%% ignores all but the first of a transport_opt(), and there's no real
+%% reason to change it.
spawn_opts(L) ->
[T || T <- L, T /= link, T /= monitor].
-sequence({H,N} = T)
- when 0 =< N, N =< 32, 0 =< H, 0 == H bsr (32-N) ->
- T;
-
-sequence(_) ->
- ?THROW({value, sequence}).
-
make_caps(Caps, Opts) ->
case diameter_capx:make_caps(Caps, Opts) of
{ok, T} ->
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 07f1fd3a4a..c7b0e706a5 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -117,6 +117,18 @@
decode_format := diameter:decode_format(),
traffic_counters := boolean(),
string_decode := boolean(),
+ capabilities_cb => diameter:evaluable(),
+ pool_size => pos_integer(),
+ capx_timeout => diameter:'Unsigned32'(),
+ strict_capx => boolean(),
+ disconnect_cb => diameter:evaluable(),
+ dpr_timeout => diameter:'Unsigned32'(),
+ dpa_timeout => diameter:'Unsigned32'(),
+ length_errors => exit | handle | discard,
+ connect_timer => diameter:'Unsigned32'(),
+ watchdog_timer => diameter:'Unsigned32'()
+ | {module(), atom(), list()},
+ watchdog_config => [{okay|suspect, non_neg_integer()}],
spawn_opt := list() | {module(), atom(), list()}}}).
%% Record representing an RFC 3539 watchdog process implemented by
@@ -682,12 +694,15 @@ i(SvcName) ->
cfg_acc({SvcName, #diameter_service{applications = Apps} = Rec, Opts},
{false, Acc}) ->
lists:foreach(fun init_mod/1, Apps),
+ #{monitor := M}
+ = SvcOpts
+ = service_opts(Opts),
S = #state{service_name = SvcName,
service = Rec#diameter_service{pid = self()},
local = init_peers(),
remote = init_peers(),
- monitor = mref(get_value(monitor, Opts)),
- options = service_options(lists:keydelete(monitor, 1, Opts))},
+ monitor = mref(M),
+ options = maps:remove(monitor, SvcOpts)},
{S, Acc};
cfg_acc({_Ref, Type, _Opts} = T, {S, Acc})
@@ -702,8 +717,23 @@ init_peers() ->
%% Alias,
%% TPid}
-service_options(Opts) ->
- maps:from_list(lists:delete({strict_arities, true}, Opts)).
+service_opts(Opts) ->
+ T = {strict_arities, true},
+ maps:merge(maps:from_list([{monitor, false} | def_opts() -- [T]]),
+ maps:from_list(Opts -- [T])).
+
+def_opts() -> %% defaults on the service map
+ [{share_peers, false},
+ {use_shared_peers, false},
+ {sequence, {0,32}},
+ {restrict_connections, nodes},
+ {incoming_maxlen, 16#FFFFFF},
+ {strict_arities, true},
+ {strict_mbit, true},
+ {decode_format, record},
+ {traffic_counters, true},
+ {string_decode, true},
+ {spawn_opt, []}].
mref(false = No) ->
No;
@@ -721,10 +751,6 @@ init_mod(#diameter_app{alias = Alias,
start_fsm({Ref, Type, Opts}, S) ->
start(Ref, {Type, Opts}, S).
-get_value(Key, Vs) ->
- {_, V} = lists:keyfind(Key, 1, Vs),
- V.
-
notify(Share, SvcName, T) ->
Nodes = remotes(Share),
[] /= Nodes andalso diameter_peer:notify(Nodes, SvcName, T).
@@ -809,7 +835,7 @@ start(Ref, Type, Opts, State) ->
start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT,
local = {PeerT, _, _},
options = #{string_decode := SD}
- = SvcOpts0,
+ = SvcOpts,
service_name = SvcName,
service = Svc0})
when Type == connect;
@@ -818,12 +844,12 @@ start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT,
= Svc1
= merge_service(Opts, Svc0),
Svc = binary_caps(Svc1, SD),
- SvcOpts = merge_options(Opts, SvcOpts0),
- RecvData = diameter_traffic:make_recvdata([SvcName, PeerT, Apps, SvcOpts]),
- T = {Opts, SvcOpts, RecvData, Svc},
+ {SOpts, TOpts} = merge_opts(SvcOpts, Opts),
+ RecvData = diameter_traffic:make_recvdata([SvcName, PeerT, Apps, SOpts]),
+ T = {TOpts, SOpts, RecvData, Svc},
Rec = #watchdog{type = Type,
ref = Ref,
- options = Opts},
+ options = TOpts},
diameter_lib:fold_n(fun(_,A) ->
[wd(Type, Ref, T, WatchdogT, Rec) | A]
@@ -831,10 +857,14 @@ start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT,
[],
N).
-merge_options(Opts, SvcOpts) ->
- Keys = maps:keys(SvcOpts),
- Map = maps:from_list([KV || {K,_} = KV <- Opts, lists:member(K, Keys)]),
- maps:merge(SvcOpts, Map).
+merge_opts(SvcOpts, Opts) ->
+ Keys = [K || {K,_} <- def_opts()],
+ SO = [T || {K,_} = T <- Opts, lists:member(K, Keys)],
+ TO = Opts -- SO,
+ {maps:merge(maps:with(Keys, SvcOpts), maps:from_list(SO)),
+ TO ++ [T || {K,_} = T <- maps:to_list(SvcOpts),
+ not lists:member(K, Keys),
+ not lists:keymember(K, 1, Opts)]}.
binary_caps(Svc, true) ->
Svc;
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index b2172356ee..bb671e9860 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -76,10 +76,8 @@
string_decode := false,
strict_arities => diameter:strict_arities(),
strict_mbit := boolean(),
- failed_avp := false,
rfc := 3588 | 6733,
- ordered_encode := false,
- incoming_maxlen := diameter:message_length()},
+ ordered_encode := false},
shutdown = false :: boolean()}).
%% ---------------------------------------------------------------------------
@@ -137,15 +135,6 @@ i({Ack, T, Pid, {Opts,
putr(restart, {T, Opts, Svc, SvcOpts}), %% save seeing it in trace
putr(dwr, dwr(Caps)), %%
Nodes = restrict_nodes(Restrict),
- CodecKeys = [decode_format,
- string_decode,
- strict_arities,
- strict_mbit,
- incoming_maxlen,
- spawn_opt,
- rfc,
- ordered_encode],
-
#watchdog{parent = Pid,
transport = start(T, Opts, SvcOpts, Nodes, Dict0, Svc),
tw = proplists:get_value(watchdog_timer,
@@ -153,13 +142,21 @@ i({Ack, T, Pid, {Opts,
?DEFAULT_TW_INIT),
receive_data = RecvData,
dictionary = Dict0,
- config =
- maps:without([traffic_counters | CodecKeys],
- config(SvcOpts#{restrict => restrict(Nodes),
- suspect => 1,
- okay => 3},
- Opts)),
- codec = maps:with(CodecKeys -- [strict_arities],
+ config = maps:with([sequence,
+ restrict_connections,
+ restrict,
+ suspect,
+ okay],
+ config(SvcOpts#{restrict => restrict(Nodes),
+ suspect => 1,
+ okay => 3},
+ Opts)),
+ codec = maps:with([decode_format,
+ strict_arities,
+ strict_mbit,
+ string_decode,
+ rfc,
+ ordered_encode],
SvcOpts#{decode_format := false,
string_decode := false,
ordered_encode => false})}.