aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src')
-rw-r--r--lib/diameter/src/base/diameter.erl4
-rw-r--r--lib/diameter/src/base/diameter_codec.erl21
-rw-r--r--lib/diameter/src/base/diameter_config.erl14
-rw-r--r--lib/diameter/src/base/diameter_lib.erl155
-rw-r--r--lib/diameter/src/base/diameter_peer.erl6
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl168
-rw-r--r--lib/diameter/src/base/diameter_reg.erl7
-rw-r--r--lib/diameter/src/base/diameter_service.erl93
-rw-r--r--lib/diameter/src/base/diameter_service_sup.erl4
-rw-r--r--lib/diameter/src/base/diameter_session.erl4
-rw-r--r--lib/diameter/src/base/diameter_stats.erl6
-rw-r--r--lib/diameter/src/base/diameter_sup.erl4
-rw-r--r--lib/diameter/src/base/diameter_sync.erl5
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl35
-rw-r--r--lib/diameter/src/base/diameter_types.erl2
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl17
-rw-r--r--lib/diameter/src/modules.mk4
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl272
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl37
-rw-r--r--lib/diameter/src/transport/diameter_transport_sup.erl4
20 files changed, 616 insertions, 246 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index d74e091e11..a45d84f95b 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2014. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -337,11 +337,13 @@ call(SvcName, App, Message) ->
:: {transport_module, atom()}
| {transport_config, any()}
| {transport_config, any(), 'Unsigned32'() | infinity}
+ | {pool_size, pos_integer()}
| {applications, [app_alias()]}
| {capabilities, [capability()]}
| {capabilities_cb, evaluable()}
| {capx_timeout, 'Unsigned32'()}
| {disconnect_cb, evaluable()}
+ | {dpa_timeout, 'Unsigned32'()}
| {length_errors, exit | handle | discard}
| {connect_timer, 'Unsigned32'()}
| {watchdog_timer, 'Unsigned32'() | {module(), atom(), list()}}
diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl
index 07ad5f97d7..3a0a030fc9 100644
--- a/lib/diameter/src/base/diameter_codec.erl
+++ b/lib/diameter/src/base/diameter_codec.erl
@@ -414,6 +414,9 @@ sequence_numbers(#diameter_packet{bin = Bin})
sequence_numbers(#diameter_packet{header = #diameter_header{} = H}) ->
sequence_numbers(H);
+sequence_numbers(#diameter_packet{msg = [#diameter_header{} = H | _]}) ->
+ sequence_numbers(H);
+
sequence_numbers(#diameter_header{hop_by_hop_id = H,
end_to_end_id = E}) ->
{H,E};
@@ -585,14 +588,14 @@ split_data(Bin, Len) ->
<<Data:Len/binary, _:Pad/binary, Rest/binary>> ->
{Data, Rest};
_ ->
- %% Header length points past the end of the message. As
- %% stated in the 6733 text above, it's sufficient to
- %% return a zero-filled minimal payload if this is a
- %% request. Do this (in cases that we know the type) by
- %% inducing a decode failure and letting the dictionary's
- %% decode (in diameter_gen) deal with it. Here we don't
- %% know type. If the type isn't known, then the decode
- %% just strips the extra bit.
+ %% Header length points past the end of the message, or
+ %% doesn't span the header. As stated in the 6733 text
+ %% above, it's sufficient to return a zero-filled minimal
+ %% payload if this is a request. Do this (in cases that we
+ %% know the type) by inducing a decode failure and letting
+ %% the dictionary's decode (in diameter_gen) deal with it.
+ %% Here we don't know type. If the type isn't known, then
+ %% the decode just strips the extra bit.
{<<0:1, Bin/binary>>, <<>>}
end.
@@ -606,6 +609,8 @@ split_data(Bin, Len) ->
%% dictionary doesn't know about specific AVP's.
%% Grouped AVP whose components need packing ...
+pack_avp([#diameter_avp{} = A | Avps]) ->
+ pack_avp(A#diameter_avp{data = Avps});
pack_avp(#diameter_avp{data = [#diameter_avp{} | _] = Avps} = A) ->
pack_avp(A#diameter_avp{data = encode_avps(Avps)});
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl
index dd1c9b73bb..aa4d6e5a20 100644
--- a/lib/diameter/src/base/diameter_config.erl
+++ b/lib/diameter/src/base/diameter_config.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2014. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -35,10 +35,11 @@
%%
-module(diameter_config).
--compile({no_auto_import, [monitor/2]}).
-
-behaviour(gen_server).
+-compile({no_auto_import, [monitor/2, now/0]}).
+-import(diameter_lib, [now/0]).
+
-export([start_service/2,
stop_service/1,
add_transport/2,
@@ -531,7 +532,9 @@ opt({applications, As}) ->
opt({capabilities, Os}) ->
is_list(Os) andalso ok == encode_CER(Os);
-opt({capx_timeout, Tmo}) ->
+opt({K, Tmo})
+ when K == capx_timeout;
+ K == dpa_timeout ->
?IS_UINT32(Tmo);
opt({length_errors, T}) ->
@@ -554,6 +557,9 @@ opt({watchdog_config, L}) ->
opt({spawn_opt, Opts}) ->
is_list(Opts);
+opt({pool_size, N}) ->
+ is_integer(N) andalso 0 < N;
+
%% Options that we can't validate.
opt({K, _})
when K == transport_config;
diff --git a/lib/diameter/src/base/diameter_lib.erl b/lib/diameter/src/base/diameter_lib.erl
index 5b3a2063f8..3f327f3653 100644
--- a/lib/diameter/src/base/diameter_lib.erl
+++ b/lib/diameter/src/base/diameter_lib.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2014. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -18,12 +18,18 @@
%%
-module(diameter_lib).
+-compile({no_auto_import, [now/0]}).
-export([info_report/2,
error_report/2,
warning_report/2,
+ now/0,
+ timestamp/1,
now_diff/1,
+ micro_diff/1,
+ micro_diff/2,
time/1,
+ seed/0,
eval/1,
eval_name/1,
get_stacktrace/0,
@@ -31,6 +37,8 @@
spawn_opts/2,
wait/1,
fold_tuple/3,
+ fold_n/3,
+ for_n/2,
log/4]).
%% ---------------------------------------------------------------------------
@@ -90,13 +98,51 @@ fmt(T) ->
end.
%% ---------------------------------------------------------------------------
+%% # now/0
+%% ---------------------------------------------------------------------------
+
+-type timestamp() :: {non_neg_integer(), 0..999999, 0..999999}.
+-type now() :: integer() %% monotonic time
+ | timestamp().
+
+-spec now()
+ -> now().
+
+%% Use monotonic time if it exists, fall back to erlang:now()
+%% otherwise.
+
+now() ->
+ try
+ erlang:monotonic_time()
+ catch
+ error: undef -> erlang:now()
+ end.
+
+%% ---------------------------------------------------------------------------
+%% # timestamp/1
+%% ---------------------------------------------------------------------------
+
+-spec timestamp(NowT :: now())
+ -> timestamp().
+
+timestamp({_,_,_} = T) -> %% erlang:now()
+ T;
+
+timestamp(MonoT) -> %% monotonic time
+ MicroSecs = monotonic_to_microseconds(MonoT + erlang:time_offset()),
+ Secs = MicroSecs div 1000000,
+ {Secs div 1000000, Secs rem 1000000, MicroSecs rem 1000000}.
+
+monotonic_to_microseconds(MonoT) ->
+ erlang:convert_time_unit(MonoT, native, micro_seconds).
+
+%% ---------------------------------------------------------------------------
%% # now_diff/1
%% ---------------------------------------------------------------------------
--spec now_diff(NowT)
+-spec now_diff(NowT :: now())
-> {Hours, Mins, Secs, MicroSecs}
- when NowT :: {non_neg_integer(), 0..999999, 0..999999},
- Hours :: non_neg_integer(),
+ when Hours :: non_neg_integer(),
Mins :: 0..59,
Secs :: 0..59,
MicroSecs :: 0..999999.
@@ -104,8 +150,37 @@ fmt(T) ->
%% Return timer:now_diff(now(), NowT) as an {H, M, S, MicroS} tuple
%% instead of as integer microseconds.
-now_diff({_,_,_} = Time) ->
- time(timer:now_diff(now(), Time)).
+now_diff(Time) ->
+ time(micro_diff(Time)).
+
+%% ---------------------------------------------------------------------------
+%% # micro_diff/1
+%% ---------------------------------------------------------------------------
+
+-spec micro_diff(NowT :: now())
+ -> MicroSecs
+ when MicroSecs :: non_neg_integer().
+
+micro_diff({_,_,_} = T0) ->
+ timer:now_diff(erlang:now(), T0);
+
+micro_diff(T0) -> %% monotonic time
+ monotonic_to_microseconds(erlang:monotonic_time() - T0).
+
+%% ---------------------------------------------------------------------------
+%% # micro_diff/2
+%% ---------------------------------------------------------------------------
+
+-spec micro_diff(T1 :: now(), T0 :: now())
+ -> MicroSecs
+ when MicroSecs :: non_neg_integer().
+
+micro_diff(T1, T0)
+ when is_integer(T1), is_integer(T0) -> %% monotonic time
+ monotonic_to_microseconds(T1 - T0);
+
+micro_diff(T1, T0) -> %% at least one erlang:now()
+ timer:now_diff(timestamp(T1), timestamp(T0)).
%% ---------------------------------------------------------------------------
%% # time/1
@@ -115,7 +190,7 @@ now_diff({_,_,_} = Time) ->
-spec time(NowT | Diff)
-> {Hours, Mins, Secs, MicroSecs}
- when NowT :: {non_neg_integer(), 0..999999, 0..999999},
+ when NowT :: timestamp(),
Diff :: non_neg_integer(),
Hours :: non_neg_integer(),
Mins :: 0..59,
@@ -134,6 +209,27 @@ time(Micro) -> %% elapsed time
{H, M, S, Micro rem 1000000}.
%% ---------------------------------------------------------------------------
+%% # seed/0
+%% ---------------------------------------------------------------------------
+
+-spec seed()
+ -> {timestamp(), {integer(), integer(), integer()}}.
+
+%% Return an argument for random:seed/1.
+
+seed() ->
+ T = now(),
+ {timestamp(T), seed(T)}.
+
+%% seed/1
+
+seed({_,_,_} = T) ->
+ T;
+
+seed(T) -> %% monotonic time
+ {erlang:phash2(node()), T, erlang:unique_integer()}.
+
+%% ---------------------------------------------------------------------------
%% # eval/1
%%
%% Evaluate a function in various forms.
@@ -247,17 +343,19 @@ opts(HeapSize, Opts) ->
%% # wait/1
%% ---------------------------------------------------------------------------
--spec wait([pid()])
+-spec wait([pid() | reference()])
-> ok.
wait(L) ->
- down([erlang:monitor(process, P) || P <- L]).
+ lists:foreach(fun down/1, L).
-down([]) ->
- ok;
-down([MRef|T]) ->
- receive {'DOWN', MRef, process, _, _} -> ok end,
- down(T).
+down(Pid)
+ when is_pid(Pid) ->
+ down(monitor(process, Pid));
+
+down(MRef)
+ when is_reference(MRef) ->
+ receive {'DOWN', MRef, process, _, _} = T -> T end.
%% ---------------------------------------------------------------------------
%% # fold_tuple/3
@@ -290,6 +388,35 @@ ft(Value, {Idx, T}) ->
setelement(Idx, T, Value).
%% ---------------------------------------------------------------------------
+%% # fold_n/3
+%% ---------------------------------------------------------------------------
+
+-spec fold_n(F, Acc0, N)
+ -> term()
+ when F :: fun((non_neg_integer(), term()) -> term()),
+ Acc0 :: term(),
+ N :: non_neg_integer().
+
+fold_n(F, Acc, N)
+ when is_integer(N), 0 < N ->
+ fold_n(F, F(N, Acc), N-1);
+
+fold_n(_, Acc, _) ->
+ Acc.
+
+%% ---------------------------------------------------------------------------
+%% # for_n/2
+%% ---------------------------------------------------------------------------
+
+-spec for_n(F, N)
+ -> non_neg_integer()
+ when F :: fun((non_neg_integer()) -> term()),
+ N :: non_neg_integer().
+
+for_n(F, N) ->
+ fold_n(fun(M,A) -> F(M), A+1 end, 0, N).
+
+%% ---------------------------------------------------------------------------
%% # log/4
%%
%% Called to have something to trace on for happenings of interest.
diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl
index e5d4b28766..ea326dd03e 100644
--- a/lib/diameter/src/base/diameter_peer.erl
+++ b/lib/diameter/src/base/diameter_peer.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -18,9 +18,11 @@
%%
-module(diameter_peer).
-
-behaviour(gen_server).
+-compile({no_auto_import, [now/0]}).
+-import(diameter_lib, [now/0]).
+
%% Interface towards transport modules ...
-export([recv/2,
up/1,
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index ee6e7dd89e..9ff6845ab7 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2014. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -63,6 +63,7 @@
%% Keys in process dictionary.
-define(CB_KEY, cb). %% capabilities callback
-define(DPR_KEY, dpr). %% disconnect callback
+-define(DPA_KEY, dpa). %% timeout for DPA reception
-define(REF_KEY, ref). %% transport_ref()
-define(Q_KEY, q). %% transport start queue
-define(START_KEY, start). %% start of connected transport
@@ -107,7 +108,8 @@
transport :: pid(), %% transport process
dictionary :: module(), %% common dictionary
service :: #diameter_service{},
- dpr = false :: false | {uint32(), uint32()},
+ dpr = false :: false | {uint32(), uint32()} %% set in old code
+ | {boolean(), uint32(), uint32()},
%% | hop by hop and end to end identifiers
length_errors :: exit | handle | discard}).
@@ -187,6 +189,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {Mask, Nodes, Dict0, Svc}}) ->
putr(?REF_KEY, Ref),
putr(?SEQUENCE_KEY, Mask),
putr(?RESTRICT_KEY, Nodes),
+ putr(?DPA_KEY, proplists:get_value(dpa_timeout, Opts, ?DPA_TIMEOUT)),
Tmo = proplists:get_value(capx_timeout, Opts, ?EVENT_TIMEOUT),
OnLengthErr = proplists:get_value(length_errors, Opts, exit),
@@ -212,9 +215,12 @@ wait(Ref, Pid) ->
Ref ->
ok;
{'DOWN', _, process, Pid, _} = D ->
- exit({shutdown, D})
+ x(D)
end.
+x(T) ->
+ exit({shutdown, T}).
+
start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) ->
Addrs0 = LCaps#diameter_caps.host_ip_address,
start_transport(Addrs0, {T, Opts, Svc}).
@@ -226,7 +232,7 @@ start_transport(Addrs0, T) ->
q_next(TPid, Addrs0, Tmo, Data),
{TPid, Addrs};
{error, No} ->
- exit({shutdown, {no_connection, No}})
+ x({no_connection, No})
end.
svc(#diameter_service{capabilities = LCaps0} = Svc, Addrs) ->
@@ -333,6 +339,9 @@ eraser(Key) ->
%% transition/2
+transition(T, #state{dpr = {Hid, Eid}} = S) -> %% DPR sent from old code
+ transition(T, S#state{dpr = {false, Hid, Eid}});
+
%% Connection to peer.
transition({diameter, {TPid, connected, Remote}},
#state{transport = TPid,
@@ -397,9 +406,8 @@ transition({timeout, _}, _) ->
ok;
%% Outgoing message.
-transition({send, Msg}, #state{transport = TPid}) ->
- send(TPid, Msg),
- ok;
+transition({send, Msg}, S) ->
+ outgoing(Msg, S);
%% Request for graceful shutdown at remove_transport, stop_service of
%% application shutdown.
@@ -516,12 +524,9 @@ encode(Rec, Dict) ->
recv(#diameter_packet{header = #diameter_header{} = Hdr}
= Pkt,
- #state{parent = Pid,
- dictionary = Dict0}
+ #state{dictionary = Dict0}
= S) ->
- Name = diameter_codec:msg_name(Dict0, Hdr),
- Pid ! {recv, self(), Name, Pkt},
- rcv(Name, Pkt, S);
+ recv1(diameter_codec:msg_name(Dict0, Hdr), Pkt, S);
recv(#diameter_packet{header = undefined,
bin = Bin}
@@ -532,6 +537,35 @@ recv(#diameter_packet{header = undefined,
recv(Bin, S) ->
recv(#diameter_packet{bin = Bin}, S).
+%% recv1/3
+
+%% Incoming request after DPR has been sent: discard. Don't discard
+%% DPR, so both ends don't do so when sending simultaneously.
+recv1(Name,
+ #diameter_packet{header = #diameter_header{is_request = true} = H},
+ #state{dpr = {_,_,_}})
+ when Name /= 'DPR' ->
+ invalid(false, recv_after_dpr, H);
+
+%% DPA with identifier mismatch, or in response to a DPR initiated by
+%% the service.
+recv1('DPA' = N,
+ #diameter_packet{header = #diameter_header{hop_by_hop_id = Hid,
+ end_to_end_id = Eid}}
+ = Pkt,
+ #state{dpr = {X,H,E}}
+ = S)
+ when H /= Hid;
+ E /= Eid;
+ not X ->
+ rcv(N, Pkt, S);
+
+%% Any other message with a header and no length errors: send to the
+%% parent.
+recv1(Name, Pkt, #state{parent = Pid} = S) ->
+ Pid ! {recv, self(), Name, Pkt},
+ rcv(Name, Pkt, S).
+
%% recv/3
recv(#diameter_header{length = Len}
@@ -588,7 +622,7 @@ rcv(Name, _, #state{state = PS})
rcv('DPR' = N, Pkt, S) ->
handle_request(N, Pkt, S);
-%% DPA in response to DPR and with the expected identifiers.
+%% DPA in response to DPR, with the expected identifiers.
rcv('DPA' = N,
#diameter_packet{header = #diameter_header{end_to_end_id = Eid,
hop_by_hop_id = Hid}
@@ -596,10 +630,15 @@ rcv('DPA' = N,
= Pkt,
#state{dictionary = Dict0,
transport = TPid,
- dpr = {Hid, Eid}}) ->
+ dpr = {X, Hid, Eid}}) ->
?LOG(recv, N),
- incr(recv, H, Dict0),
- incr_rc(recv, diameter_codec:decode(Dict0, Pkt), Dict0),
+ X orelse begin
+ %% Only count DPA in response to a DPR sent by the
+ %% service: explicit DPR is counted in the same way
+ %% as other explicitly sent requests.
+ incr(recv, H, Dict0),
+ incr_rc(recv, diameter_codec:decode(Dict0, Pkt), Dict0)
+ end,
diameter_peer:close(TPid),
{stop, N};
@@ -637,9 +676,59 @@ incr_error(Dir, Pkt, Dict0) ->
%% Msg here could be a #diameter_packet or a binary depending on who's
%% sending. In particular, the watchdog will send DWR as a binary
%% while messages coming from clients will be in a #diameter_packet.
+
send(Pid, Msg) ->
diameter_peer:send(Pid, Msg).
+%% outgoing/2
+
+%% Explicit DPR.
+outgoing(#diameter_packet{header = #diameter_header{application_id = 0,
+ cmd_code = 282,
+ is_request = true}
+ = H}
+ = Pkt,
+ #state{dpr = T,
+ parent = Pid}
+ = S) ->
+ if T == false ->
+ inform_dpr(Pid),
+ send_dpr(true, Pkt, dpa_timeout(), S);
+ true ->
+ invalid(false, dpr_after_dpr, H) %% already sent: discard
+ end;
+
+%% Explict CER or DWR: discard. These are sent by us.
+outgoing(#diameter_packet{header = #diameter_header{application_id = 0,
+ cmd_code = C,
+ is_request = true}
+ = H},
+ _)
+ when 257 == C; %% CER
+ 280 == C -> %% DWR
+ invalid(false, invalid_request, H);
+
+%% DPR not sent: send.
+outgoing(Msg, #state{transport = TPid, dpr = false}) ->
+ send(TPid, Msg),
+ ok;
+
+%% Outgoing answer: send.
+outgoing(#diameter_packet{header = #diameter_header{is_request = false}}
+ = Pkt,
+ #state{transport = TPid}) ->
+ send(TPid, Pkt),
+ ok;
+
+%% Outgoing request: discard.
+outgoing(Msg, #state{dpr = {_,_,_}}) ->
+ invalid(false, send_after_dpr, header(Msg)).
+
+header(#diameter_packet{header = H}) ->
+ H;
+header(Bin) -> %% DWR
+ diameter_codec:decode_header(Bin).
+
%% handle_request/3
%%
%% Incoming CER or DPR.
@@ -748,8 +837,11 @@ cea(CEA, RC, Dict0) ->
post('CER' = T, RC, Pkt, S) ->
{T, caps(S), {RC, Pkt}};
-post('DPR' = T, _, _, #state{parent = Pid}) ->
- [fun(S) -> Pid ! {T, self()}, S end].
+post('DPR', _, _, #state{parent = Pid}) ->
+ [fun(S) -> inform_dpr(Pid), S end].
+
+inform_dpr(Pid) ->
+ Pid ! {'DPR', self()}. %% tell watchdog to die with us
rejected({capabilities_cb, _F, Reason}, T, S) ->
rejected(Reason, T, S);
@@ -1073,10 +1165,9 @@ dpr([CB|Rest], [Reason | _] = Args, S) ->
dpr([], [Reason | _], S) ->
send_dpr(Reason, [], S).
--record(opts, {cause, timeout = ?DPA_TIMEOUT}).
+-record(opts, {cause, timeout}).
-send_dpr(Reason, Opts, #state{transport = TPid,
- dictionary = Dict,
+send_dpr(Reason, Opts, #state{dictionary = Dict,
service = #diameter_service{capabilities = Caps}}
= S) ->
#opts{cause = Cause, timeout = Tmo}
@@ -1085,24 +1176,37 @@ send_dpr(Reason, Opts, #state{transport = TPid,
transport -> ?GOAWAY;
_ -> ?REBOOT
end,
- timeout = ?DPA_TIMEOUT},
+ timeout = dpa_timeout()},
Opts),
#diameter_caps{origin_host = {OH, _},
origin_realm = {OR, _}}
= Caps,
- #diameter_packet{header = #diameter_header{end_to_end_id = Eid,
- hop_by_hop_id = Hid}}
- = Pkt
- = encode(['DPR', {'Origin-Host', OH},
+ Pkt = encode(['DPR', {'Origin-Host', OH},
{'Origin-Realm', OR},
{'Disconnect-Cause', Cause}],
Dict),
- incr(send, Pkt, Dict),
+ send_dpr(false, Pkt, Tmo, S).
+
+%% send_dpr/4
+
+send_dpr(X,
+ #diameter_packet{header = #diameter_header{end_to_end_id = Eid,
+ hop_by_hop_id = Hid}}
+ = Pkt,
+ Tmo,
+ #state{transport = TPid,
+ dictionary = Dict}
+ = S) ->
+ %% Only count DPR sent by the service: explicit DPR is counted in
+ %% the same way as other explicitly sent requests.
+ X orelse incr(send, Pkt, Dict),
send(TPid, Pkt),
dpa_timer(Tmo),
?LOG(send, 'DPR'),
- S#state{dpr = {Hid, Eid}}.
+ S#state{dpr = {X, Hid, Eid}}.
+
+%% opt/2
opt({timeout, Tmo}, Rec)
when ?IS_TIMEOUT(Tmo) ->
@@ -1125,6 +1229,14 @@ cause(N) ->
dpa_timer(Tmo) ->
erlang:send_after(Tmo, self(), dpa_timeout).
+dpa_timeout() ->
+ dpa_timeout(getr(?DPA_KEY)).
+
+dpa_timeout(undefined) ->
+ ?DPA_TIMEOUT;
+dpa_timeout(Tmo) ->
+ Tmo.
+
%% register_everywhere/1
%%
%% Register a term and ensure it's not registered elsewhere. Note that
diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl
index 3197c1aee1..f785777874 100644
--- a/lib/diameter/src/base/diameter_reg.erl
+++ b/lib/diameter/src/base/diameter_reg.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -22,10 +22,11 @@
%%
-module(diameter_reg).
--compile({no_auto_import, [monitor/2]}).
-
-behaviour(gen_server).
+-compile({no_auto_import, [monitor/2, now/0]}).
+-import(diameter_lib, [now/0]).
+
-export([add/1,
add_new/1,
del/1,
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 76b05a2ad4..04401a3d87 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2014. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -24,6 +24,9 @@
-module(diameter_service).
-behaviour(gen_server).
+-compile({no_auto_import, [now/0]}).
+-import(diameter_lib, [now/0]).
+
%% towards diameter_service_sup
-export([start_link/1]).
@@ -610,8 +613,9 @@ st(#watchdog{ref = Ref, pid = Pid}, Refs) ->
%% st/3
st(#watchdog{pid = Pid}, Reason, Acc) ->
+ MRef = monitor(process, Pid),
Pid ! {shutdown, self(), Reason},
- [Pid | Acc].
+ [MRef | Acc].
%% ---------------------------------------------------------------------------
%% # call_service/2
@@ -765,8 +769,9 @@ reason(failure) ->
start(Ref, {T, Opts}, S)
when T == connect;
T == listen ->
+ N = proplists:get_value(pool_size, Opts, 1),
try
- {ok, start(Ref, type(T), Opts, S)}
+ {ok, start(Ref, type(T), Opts, N, S)}
catch
?FAILURE(Reason) ->
{error, Reason}
@@ -784,11 +789,16 @@ type(connect = T) -> T.
%% start/4
-start(Ref, Type, Opts, #state{watchdogT = WatchdogT,
- peerT = PeerT,
- options = SvcOpts,
- service_name = SvcName,
- service = Svc0})
+start(Ref, Type, Opts, State) ->
+ start(Ref, Type, Opts, 1, State).
+
+%% start/5
+
+start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT,
+ peerT = PeerT,
+ options = SvcOpts,
+ service_name = SvcName,
+ service = Svc0})
when Type == connect;
Type == accept ->
#diameter_service{applications = Apps}
@@ -796,14 +806,19 @@ start(Ref, Type, Opts, #state{watchdogT = WatchdogT,
= merge_service(Opts, Svc0),
{_,_} = Mask = proplists:get_value(sequence, SvcOpts),
RecvData = diameter_traffic:make_recvdata([SvcName, PeerT, Apps, Mask]),
- Pid = s(Type, Ref, {{spawn_opts([Opts, SvcOpts]), RecvData},
- Opts,
- SvcOpts,
- Svc}),
- insert(WatchdogT, #watchdog{pid = Pid,
- type = Type,
- ref = Ref,
- options = Opts}),
+ T = {{spawn_opts([Opts, SvcOpts]), RecvData}, Opts, SvcOpts, Svc},
+ Rec = #watchdog{type = Type,
+ ref = Ref,
+ options = Opts},
+ diameter_lib:fold_n(fun(_,A) ->
+ [wd(Type, Ref, T, WatchdogT, Rec) | A]
+ end,
+ [],
+ N).
+
+wd(Type, Ref, T, WatchdogT, Rec) ->
+ Pid = wd(Type, Ref, T),
+ insert(WatchdogT, Rec#watchdog{pid = Pid}),
Pid.
%% Note that the service record passed into the watchdog is the merged
@@ -816,7 +831,7 @@ spawn_opts(Optss) ->
T /= link,
T /= monitor].
-s(Type, Ref, T) ->
+wd(Type, Ref, T) ->
{_MRef, Pid} = diameter_watchdog:start({Type, Ref}, T),
Pid.
@@ -1185,7 +1200,7 @@ connect_timer(Opts, Def0) ->
%% continuous restarted in case of faulty config or other problems.
tc(Time, Tc) ->
choose(Tc > ?RESTART_TC
- orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC,
+ orelse diameter_lib:micro_diff(Time) > 1000*?RESTART_TC,
Tc,
?RESTART_TC).
@@ -1718,31 +1733,43 @@ info_transport(S) ->
[],
PeerD).
-%% Only a config entry for a listening transport: use it.
-transport([[{type, listen}, _] = L]) ->
- L ++ [{accept, []}];
-
-%% Only one config or peer entry for a connecting transport: use it.
-transport([[{type, connect} | _] = L]) ->
- L;
+%% Single config entry. Distinguish between pool_size config or not on
+%% a connecting transport for backwards compatibility: with the option
+%% the form is similar to the listening case, with connections grouped
+%% in a pool tuple (for lack of a better name), without as before.
+transport([[{type, Type}, {options, Opts}] = L])
+ when Type == listen;
+ Type == connect ->
+ L ++ [{K, []} || [{_,K}] <- [keys(Type, Opts)]];
%% Peer entries: discard config. Note that the peer entries have
%% length at least 3.
transport([[_,_] | L]) ->
transport(L);
-%% Possibly many peer entries for a listening transport. Note that all
-%% have the same options by construction, which is not terribly space
-%% efficient.
-transport([[{type, accept}, {options, Opts} | _] | _] = Ls) ->
- [{type, listen},
+%% Multiple tranports. Note that all have the same options by
+%% construction, which is not terribly space efficient.
+transport([[{type, Type}, {options, Opts} | _] | _] = Ls) ->
+ transport(keys(Type, Opts), Ls).
+
+%% Group transports in an accept or pool tuple ...
+transport([{Type, Key}], [[{type, _}, {options, Opts} | _] | _] = Ls) ->
+ [{type, Type},
{options, Opts},
- {accept, [lists:nthtail(2,L) || L <- Ls]}].
+ {Key, [tl(tl(L)) || L <- Ls]}];
+
+%% ... or not: there can only be one.
+transport([], [L]) ->
+ L.
+
+keys(connect = T, Opts) ->
+ [{T, pool} || lists:keymember(pool_size, 1, Opts)];
+keys(_, _) ->
+ [{listen, accept}].
peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) ->
try ets:tab2list(WatchdogT) of
- L ->
- lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L)
+ L -> lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L)
catch
error: badarg -> Dict0 %% service has gone down
end.
diff --git a/lib/diameter/src/base/diameter_service_sup.erl b/lib/diameter/src/base/diameter_service_sup.erl
index 153fff902f..e3177f0083 100644
--- a/lib/diameter/src/base/diameter_service_sup.erl
+++ b/lib/diameter/src/base/diameter_service_sup.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -58,7 +58,7 @@ init([]) ->
ChildSpec = {Mod,
{Mod, start_link, []},
temporary,
- 1000,
+ 5000,
worker,
[Mod]},
{ok, {Flags, [ChildSpec]}}.
diff --git a/lib/diameter/src/base/diameter_session.erl b/lib/diameter/src/base/diameter_session.erl
index 3b236f109a..c5ea0428b5 100644
--- a/lib/diameter/src/base/diameter_session.erl
+++ b/lib/diameter/src/base/diameter_session.erl
@@ -157,8 +157,8 @@ session_id(Host) ->
%% ---------------------------------------------------------------------------
init() ->
- Now = now(),
- random:seed(Now),
+ {Now, Seed} = diameter_lib:seed(),
+ random:seed(Seed),
Time = time32(Now),
Seq = (?INT32 band (Time bsl 20)) bor (random:uniform(1 bsl 20) - 1),
ets:insert(diameter_sequence, [{origin_state_id, Time},
diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl
index 8353613d32..64ea082be0 100644
--- a/lib/diameter/src/base/diameter_stats.erl
+++ b/lib/diameter/src/base/diameter_stats.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2014. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -22,9 +22,11 @@
%%
-module(diameter_stats).
-
-behaviour(gen_server).
+-compile({no_auto_import, [now/0]}).
+-import(diameter_lib, [now/0]).
+
-export([reg/2, reg/1,
incr/3, incr/1,
read/1,
diff --git a/lib/diameter/src/base/diameter_sup.erl b/lib/diameter/src/base/diameter_sup.erl
index e5afd23dcd..4ede4086d8 100644
--- a/lib/diameter/src/base/diameter_sup.erl
+++ b/lib/diameter/src/base/diameter_sup.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -64,7 +64,7 @@ spec(Mod) ->
{Mod,
{Mod, start_link, []},
permanent,
- 1000,
+ infinity,
supervisor,
[Mod]}.
diff --git a/lib/diameter/src/base/diameter_sync.erl b/lib/diameter/src/base/diameter_sync.erl
index ce2db4b3a2..90eabece3d 100644
--- a/lib/diameter/src/base/diameter_sync.erl
+++ b/lib/diameter/src/base/diameter_sync.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -27,6 +27,9 @@
-module(diameter_sync).
-behaviour(gen_server).
+-compile({no_auto_import, [now/0]}).
+-import(diameter_lib, [now/0]).
+
-export([call/4, call/5,
cast/4, cast/5,
carp/1, carp/2]).
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index 3717e43e4a..0c62ccea49 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -162,24 +162,28 @@ incr_error(Dir, Id, TPid) ->
%% incr_rc/4
%% ---------------------------------------------------------------------------
--spec incr_rc(send|recv, Pkt, TPid, Dict0)
+-spec incr_rc(send|recv, Pkt, TPid, DictT)
-> {Counter, non_neg_integer()}
| Reason
when Pkt :: #diameter_packet{},
TPid :: pid(),
- Dict0 :: module(),
+ DictT :: module() | {module(), module(), module()},
Counter :: {'Result-Code', integer()}
| {'Experimental-Result', integer(), integer()},
Reason :: atom().
-incr_rc(Dir, Pkt, TPid, Dict0) ->
+incr_rc(Dir, Pkt, TPid, {Dict, _, _} = DictT) ->
try
- incr_result(Dir, Pkt, TPid, {Dict0, Dict0, Dict0})
+ incr_result(Dir, Pkt, TPid, DictT)
catch
exit: {E,_} when E == no_result_code;
E == invalid_error_bit ->
+ incr(TPid, {msg_id(Pkt#diameter_packet.header, Dict), Dir, E}),
E
- end.
+ end;
+
+incr_rc(Dir, Pkt, TPid, Dict0) ->
+ incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}).
%% ---------------------------------------------------------------------------
%% pending/1
@@ -678,7 +682,7 @@ local(Msg, TPid, {Dict, AppDict, Dict0} = DictT, Fs, ReqPkt) ->
reset(make_answer_packet(Msg, ReqPkt), Dict, Dict0),
Fs),
incr(send, Pkt, TPid, AppDict),
- incr_result(send, Pkt, TPid, DictT), %% count outgoing
+ incr_rc(send, Pkt, TPid, DictT), %% count outgoing
send(TPid, Pkt).
%% reset/3
@@ -1388,6 +1392,21 @@ make_request_packet(#diameter_packet{header = Hdr} = Pkt,
make_request_packet(Msg, Pkt) ->
Pkt#diameter_packet{msg = Msg}.
+%% make_retransmit_packet/2
+
+make_retransmit_packet(#diameter_packet{msg = [#diameter_header{} = Hdr
+ | Avps]}
+ = Pkt) ->
+ Pkt#diameter_packet{msg = [make_retransmit_header(Hdr) | Avps]};
+
+make_retransmit_packet(#diameter_packet{header = Hdr} = Pkt) ->
+ Pkt#diameter_packet{header = make_retransmit_header(Hdr)}.
+
+%% make_retransmit_header/1
+
+make_retransmit_header(Hdr) ->
+ Hdr#diameter_header{is_retransmitted = true}.
+
%% fold_record/2
fold_record(undefined, R) ->
@@ -1674,9 +1693,7 @@ retransmit({TPid, Caps, App}
have_request(Pkt0, TPid) %% Don't failover to a peer we've
andalso ?THROW(timeout), %% already sent to.
- #diameter_packet{header = Hdr0} = Pkt0,
- Hdr = Hdr0#diameter_header{is_retransmitted = true},
- Pkt = Pkt0#diameter_packet{header = Hdr},
+ Pkt = make_retransmit_packet(Pkt0),
retransmit(cb(App, prepare_retransmit, [Pkt, SvcName, {TPid, Caps}]),
Transport,
diff --git a/lib/diameter/src/base/diameter_types.erl b/lib/diameter/src/base/diameter_types.erl
index ca3338be5f..442d90c98b 100644
--- a/lib/diameter/src/base/diameter_types.erl
+++ b/lib/diameter/src/base/diameter_types.erl
@@ -75,7 +75,7 @@
%% message indicating this error MUST include the offending AVPs
%% within a Failed-AVP AVP.
%%
--define(INVALID_LENGTH(Bin), erlang:error({'DIAMETER', 5014, Bin})).
+-define(INVALID_LENGTH(Bitstr), erlang:error({'DIAMETER', 5014, Bitstr})).
%% -------------------------------------------------------------------------
%% 3588, 4.2. Basic AVP Data Formats
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index b7f2d24941..ff51c6dcf7 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2014. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -122,7 +122,8 @@ i({Ack, T, Pid, {RecvData,
= Svc}}) ->
erlang:monitor(process, Pid),
wait(Ack, Pid),
- random:seed(now()),
+ {_, Seed} = diameter_lib:seed(),
+ random:seed(Seed),
putr(restart, {T, Opts, Svc}), %% save seeing it in trace
putr(dwr, dwr(Caps)), %%
{_,_} = Mask = proplists:get_value(sequence, SvcOpts),
@@ -332,8 +333,9 @@ transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid,
send(TPid, {T, self(), Reason}),
S#watchdog{shutdown = true};
-%% Transport is telling us that DPA has been sent in response to DPR:
-%% its death should lead to ours.
+%% Transport is telling us that DPA has been sent in response to DPR,
+%% or that DPR has been explicitly sent: transport death should lead
+%% to ours.
transition({'DPR', TPid}, #watchdog{transport = TPid} = S) ->
S#watchdog{shutdown = true};
@@ -549,7 +551,7 @@ send_watchdog(#watchdog{pending = false,
?LOG(send, 'DWR'),
S#watchdog{pending = true}.
-%% Dont' count encode errors since we don't expect any on DWR/DWA.
+%% Don't count encode errors since we don't expect any on DWR/DWA.
%% recv/3
@@ -590,9 +592,10 @@ rcv('DWA', Pkt, #watchdog{transport = TPid,
rcv(N, _, _)
when N == 'CER';
N == 'CEA';
- N == 'DPR';
- N == 'DPA' ->
+ N == 'DPR' ->
false;
+%% DPR can be sent explicitly with diameter:call/4. Only the
+%% corresponding DPAs arrive here.
rcv(_, Pkt, #watchdog{transport = TPid,
dictionary = Dict0,
diff --git a/lib/diameter/src/modules.mk b/lib/diameter/src/modules.mk
index a2a7a51892..c9dd4e683a 100644
--- a/lib/diameter/src/modules.mk
+++ b/lib/diameter/src/modules.mk
@@ -1,7 +1,7 @@
# %CopyrightBegin%
#
-# Copyright Ericsson AB 2010-2014. All Rights Reserved.
+# Copyright Ericsson AB 2010-2015. All Rights Reserved.
#
# The contents of this file are subject to the Erlang Public License,
# Version 1.1, (the "License"); you may not use this file except in
@@ -94,7 +94,7 @@ BINS = \
# Released files relative to ../examples.
EXAMPLES = \
code/GNUmakefile \
- code/peer.erl \
+ code/node.erl \
code/client.erl \
code/client_cb.erl \
code/server.erl \
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 32e7aaca39..2c8d6f0a14 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2014. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -18,9 +18,11 @@
%%
-module(diameter_sctp).
-
-behaviour(gen_server).
+-compile({no_auto_import, [now/0]}).
+-import(diameter_lib, [now/0]).
+
%% interface
-export([start/3]).
@@ -37,7 +39,8 @@
code_change/3,
terminate/2]).
--export([info/1]). %% service_info callback
+-export([listener/1,%% diameter_sync callback
+ info/1]). %% service_info callback
-export([ports/0,
ports/1]).
@@ -99,22 +102,31 @@
-record(listener,
{ref :: reference(),
socket :: gen_sctp:sctp_socket(),
- count = 0 :: uint(),
+ count = 0 :: uint(), %% attached transport processes
tmap = ets:new(?MODULE, []) :: ets:tid(),
%% {MRef, Pid|AssocId}, {AssocId, Pid}
pending = {0, ets:new(?MODULE, [ordered_set])},
tref :: reference(),
accept :: [match()]}).
%% Field tmap is used to map an incoming message or event to the
-%% relevent transport process. Field pending implements a queue of
-%% transport processes to which an association has been assigned (at
-%% comm_up and written into tmap) but for which diameter hasn't yet
-%% spawned a transport process: a short-lived state of affairs as a
-%% new transport is spawned as a consequence of a peer being taken up,
-%% transport processes being spawned by the listener on demand. In
-%% case diameter starts a transport before comm_up on a new
-%% association, pending is set to an improper list with the spawned
-%% transport as head and the queue as tail.
+%% relevant transport process. Field pending implements two queues:
+%% the first of transport-to-be processes to which an association has
+%% been assigned (at comm_up and written into tmap) but for which
+%% diameter hasn't yet spawned a transport process, a short-lived
+%% state of affairs as a new transport is spawned as a consequence of
+%% a peer being taken up, transport processes being spawned by the
+%% listener on demand; the second of started transport processes that
+%% have not yet been assigned an association.
+%%
+%% When diameter calls start/3, the transport process is either taken
+%% from the first queue or spawned and placed in the second queue
+%% until an association is established. When an association is
+%% established, a controlling process is either taken from the second
+%% queue or spawned and placed in the first queue. Thus, there are
+%% only elements in one queue at a time, so share an ets table queue
+%% and tag it with a positive length if it contains the first queue, a
+%% negative length if it contains the second queue. The case -1 is
+%% handled differently for backwards compatibility reasons.
%% ---------------------------------------------------------------------------
%% # start/3
@@ -139,9 +151,9 @@ ip(T) ->
T.
%% A listener spawns transports either as a consequence of this call
-%% when there is not yet an association to associate with it, or at
-%% comm_up on a new association in which case the call retrieves a
-%% transport from the pending queue.
+%% when there is not yet an association to assign it, or at comm_up on
+%% a new association in which case the call retrieves a transport from
+%% the pending queue.
s({accept, Ref} = A, Addrs, Opts) ->
{LPid, LAs} = listener(Ref, {Opts, Addrs}),
try gen_server:call(LPid, {A, self()}, infinity) of
@@ -226,7 +238,7 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
{LAs, Sock} = open(Addrs, Rest, 0),
putr(?REF_KEY, Ref),
proc_lib:init_ack({ok, self(), LAs}),
- erlang:monitor(process, Pid),
+ monitor(process, Pid),
#transport{parent = Pid,
mode = {connect, connect(Sock, RAs, RP, [])},
socket = Sock};
@@ -236,8 +248,8 @@ i({accept, Pid, LPid, Sock, Ref})
when is_pid(Pid) ->
putr(?REF_KEY, Ref),
proc_lib:init_ack({ok, self()}),
- erlang:monitor(process, Pid),
- erlang:monitor(process, LPid),
+ monitor(process, Pid),
+ monitor(process, LPid),
#transport{parent = Pid,
mode = {accept, LPid},
socket = Sock};
@@ -246,7 +258,7 @@ i({accept, Pid, LPid, Sock, Ref})
i({accept, Ref, LPid, Sock, Id}) ->
putr(?REF_KEY, Ref),
proc_lib:init_ack({ok, self()}),
- MRef = erlang:monitor(process, LPid),
+ MRef = monitor(process, LPid),
%% Wait for a signal that the transport has been started before
%% processing other messages.
receive
@@ -270,15 +282,23 @@ close(Sock, Id) ->
%% listener/2
+%% Accepting processes can be started concurrently: ensure only one
+%% listener is started.
listener(LRef, T) ->
+ diameter_sync:call({?MODULE, listener, LRef},
+ {?MODULE, listener, [{LRef, T}]},
+ infinity,
+ infinity).
+
+listener({LRef, T}) ->
l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T).
-%% Existing process with the listening socket ...
+%% Existing listening process ...
l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
- {LAs, _Sock} = AS,
- {LPid, LAs};
-
-%% ... or not: start one.
+ {LAs, _Sock} = AS,
+ {LPid, LAs};
+
+%% ... or not.
l([], LRef, T) ->
{ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}),
{LPid, LAs}.
@@ -347,11 +367,17 @@ type(T) ->
%% # handle_call/3
%% ---------------------------------------------------------------------------
+handle_call(T, From, #listener{pending = L} = S)
+ when is_list(L) ->
+ handle_call(T, From, upgrade(S));
+
handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref,
- count = N}
+ pending = {N,Q},
+ count = K}
= S) ->
- {TPid, NewS} = accept(Ref, Pid, S),
- {reply, {ok, TPid}, NewS#listener{count = N+1}};
+ TPid = accept(Ref, Pid, S),
+ {reply, {ok, TPid}, downgrade(S#listener{pending = {N-1,Q},
+ count = K+1})};
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -370,8 +396,46 @@ handle_cast(_, State) ->
handle_info(T, #transport{} = S) ->
{noreply, #transport{} = t(T,S)};
+handle_info(T, #listener{pending = L} = S)
+ when is_list(L) ->
+ handle_info(T, upgrade(S));
+
handle_info(T, #listener{} = S) ->
- {noreply, #listener{} = l(T,S)}.
+ {noreply, downgrade(#listener{} = l(T,S))}.
+
+%% upgrade/1
+
+upgrade(#listener{pending = [TPid | {0,Q}]} = S) ->
+ ets:insert(Q, {TPid, now()}),
+ S#listener{pending = {-1,Q}}.
+%% Prior to the possiblity of setting pool_size on in transport
+%% configuration, a new accepting transport was only started following
+%% the death of a predecessor, so that there was only at most one
+%% previously started transport process waiting for an association.
+%% This assumption no longer holds with pool_size > 1, in which case
+%% several accepting transports are started concurrently. Deal with
+%% this by placing the started transports in a new queue of transport
+%% processes waiting for an association.
+%%
+%% Since only one of this queue and the existing queue of controlling
+%% processes waiting for a transport to be started can be non-empty at
+%% any given time, implement both queues in the same ets table. The
+%% absolute value of the first element of the 2-tuple is the queue
+%% length, the sign says which queue it is.
+
+%% downgrade/1
+%%
+%% Revert to the pre-pool_size representation when possible, for
+%% backwards compatibility in the case that the pool_size option
+%% hasn't been used.
+
+downgrade(#listener{pending = {-1,Q}} = S) ->
+ TPid = ets:first(Q),
+ ets:delete(Q, TPid),
+ S#listener{pending = [TPid | {0,Q}]};
+
+downgrade(S) ->
+ S.
%% ---------------------------------------------------------------------------
%% # code_change/3
@@ -436,54 +500,46 @@ l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) ->
setopts(Sock)
end;
-%% Transport is asking message to be sent. See send/3 for why the send
-%% isn't directly from the transport.
-l({send, AssocId, StreamId, Bin}, #listener{socket = Sock} = S) ->
- send(Sock, AssocId, StreamId, Bin),
- S;
+l({'DOWN', MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
+ down(ets:member(Q, TPid), MRef, TPid, S);
+
+%% Timeout after the last accepting process has died.
+l({timeout, TRef, close = T}, #listener{tref = TRef,
+ count = 0}) ->
+ x(T);
+l({timeout, _, close}, #listener{} = S) ->
+ S.
+
+%% down/4
%% Accepting transport has died. One that's awaiting an association ...
-l({'DOWN', MRef, process, TPid, _}, #listener{pending = [TPid | Q],
- tmap = T,
- count = N}
- = S) ->
+down(true, MRef, TPid, #listener{pending = {N,Q},
+ tmap = T,
+ count = K}
+ = S)
+ when N < 0 ->
+ ets:delete(Q, TPid),
ets:delete(T, MRef),
ets:delete(T, TPid),
- start_timer(S#listener{count = N-1,
- pending = Q});
-
-%% ... ditto and a new transport has already been started ...
-l({'DOWN', _, process, _, _} = T, #listener{pending = [TPid | Q]}
- = S) ->
- #listener{pending = NQ}
- = NewS
- = l(T, S#listener{pending = Q}),
- NewS#listener{pending = [TPid | NQ]};
-
-%% ... or not.
-l({'DOWN', MRef, process, TPid, _}, #listener{socket = Sock,
- tmap = T,
- count = N,
- pending = {P,Q}}
- = S) ->
+ start_timer(S#listener{count = K-1,
+ pending = {N+1,Q}});
+
+%% ... or one that already has one.
+down(B, MRef, TPid, #listener{socket = Sock,
+ tmap = T,
+ count = K,
+ pending = {N,Q}}
+ = S) ->
[{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId
ets:delete(T, MRef),
ets:delete(T, Id),
Id == TPid orelse close(Sock, Id),
- case ets:lookup(Q, TPid) of
- [{TPid, _}] -> %% transport in the pending queue ...
+ if B -> %% Waiting for attachment in the pending queue ...
ets:delete(Q, TPid),
- S#listener{pending = {P-1, Q}};
- [] -> %% ... or not
- start_timer(S#listener{count = N-1})
- end;
-
-%% Timeout after the last accepting process has died.
-l({timeout, TRef, close = T}, #listener{tref = TRef,
- count = 0}) ->
- x(T);
-l({timeout, _, close}, #listener{} = S) ->
- S.
+ S#listener{pending = {N-1,Q}};
+ true -> %% ... or already attached
+ start_timer(S#listener{count = K-1})
+ end.
%% t/2
%%
@@ -582,29 +638,24 @@ accept(Opts) ->
%% No pending associations: spawn a new transport.
accept(Ref, Pid, #listener{socket = Sock,
tmap = T,
- pending = {0,_} = Q}
- = S) ->
+ pending = {N,Q}})
+ when N =< 0 ->
Arg = {accept, Pid, self(), Sock, Ref},
{ok, TPid} = diameter_sctp_sup:start_child(Arg),
- MRef = erlang:monitor(process, TPid),
+ MRef = monitor(process, TPid),
ets:insert(T, [{MRef, TPid}, {TPid, MRef}]),
- {TPid, S#listener{pending = [TPid | Q]}};
-%% Placing the transport in the pending field makes it available to
-%% the next association. The stack starts a new accepting transport
-%% only after this one brings the connection up (or dies).
-
-%% Accepting transport has died. This can happen if a new transport is
-%% started before the DOWN has arrived.
-accept(Ref, Pid, #listener{pending = [TPid | {0,_} = Q]} = S) ->
- false = is_process_alive(TPid), %% assert
- accept(Ref, Pid, S#listener{pending = Q});
+ ets:insert(Q, {TPid, now()}),
+ TPid;
+%% Placing the transport in the second pending table makes it
+%% available to the next association.
%% Pending associations: attach to the first in the queue.
-accept(_, Pid, #listener{ref = Ref, pending = {N,Q}} = S) ->
+accept(_, Pid, #listener{ref = Ref,
+ pending = {_,Q}}) ->
TPid = ets:first(Q),
TPid ! {Ref, Pid},
ets:delete(Q, TPid),
- {TPid, S#listener{pending = {N-1, Q}}}.
+ TPid.
%% send/2
@@ -718,34 +769,12 @@ up(#transport{parent = Pid,
find(Id, Data, #listener{tmap = T} = S) ->
f(ets:lookup(T, Id), Data, S).
-%% New association and a transport waiting for one: use it.
-f([],
- {_, #sctp_assoc_change{state = comm_up,
- assoc_id = Id}},
- #listener{tmap = T,
- pending = [TPid | {_,_} = Q]}
- = S) ->
- [{TPid, MRef}] = ets:lookup(T, TPid),
- ets:insert(T, [{MRef, Id}, {Id, TPid}]),
- ets:delete(T, TPid),
- {TPid, S#listener{pending = Q}};
-
-%% New association and no transport start yet: spawn one and place it
-%% in the queue.
+%% New association ...
f([],
- {_, #sctp_assoc_change{state = comm_up,
- assoc_id = Id}},
- #listener{ref = Ref,
- socket = Sock,
- tmap = T,
- pending = {N,Q}}
+ {_, #sctp_assoc_change{state = comm_up, assoc_id = Id}},
+ #listener{pending = {N,Q}}
= S) ->
- Arg = {accept, Ref, self(), Sock, Id},
- {ok, TPid} = diameter_sctp_sup:start_child(Arg),
- MRef = erlang:monitor(process, TPid),
- ets:insert(T, [{MRef, Id}, {Id, TPid}]),
- ets:insert(Q, {TPid, now()}),
- {TPid, S#listener{pending = {N+1, Q}}};
+ {find(Id, S), S#listener{pending = {N+1,Q}}};
%% Known association ...
f([{_, TPid}], _, S) ->
@@ -755,6 +784,31 @@ f([{_, TPid}], _, S) ->
f([], _, _) ->
false.
+%% find/2
+
+%% Transport waiting for an association: use it.
+find(Id, #listener{tmap = T,
+ pending = {N,Q}})
+ when N < 0 ->
+ TPid = ets:first(Q),
+ [{TPid, MRef}] = ets:lookup(T, TPid),
+ ets:insert(T, [{MRef, Id}, {Id, TPid}]),
+ ets:delete(T, TPid),
+ ets:delete(Q, TPid),
+ TPid;
+
+%% No transport start yet: spawn one and queue.
+find(Id, #listener{ref = Ref,
+ socket = Sock,
+ tmap = T,
+ pending = {_,Q}}) ->
+ Arg = {accept, Ref, self(), Sock, Id},
+ {ok, TPid} = diameter_sctp_sup:start_child(Arg),
+ MRef = monitor(process, TPid),
+ ets:insert(T, [{MRef, Id}, {Id, TPid}]),
+ ets:insert(Q, {TPid, now()}),
+ TPid.
+
%% assoc_id/1
assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) ->
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 4d1b8bec51..0b26f429fb 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2013. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -37,7 +37,8 @@
code_change/3,
terminate/2]).
--export([info/1]). %% service_info callback
+-export([listener/1,%% diameter_sync callback
+ info/1]). %% service_info callback
-export([ports/0,
ports/1]).
@@ -191,7 +192,7 @@ init(T) ->
i({T, Ref, Mod, Pid, Opts, Addrs})
when T == accept;
T == connect ->
- erlang:monitor(process, Pid),
+ monitor(process, Pid),
%% Since accept/connect might block indefinitely, spawn a process
%% that does nothing but kill us with the parent until call
%% returns.
@@ -218,8 +219,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
%% A monitor process to kill the transport if the parent dies.
i(#monitor{parent = Pid, transport = TPid} = S) ->
proc_lib:init_ack({ok, self()}),
- erlang:monitor(process, Pid),
- erlang:monitor(process, TPid),
+ monitor(process, Pid),
+ monitor(process, TPid),
S;
%% In principle a link between the transport and killer processes
%% could do the same thing: have the accepting/connecting process be
@@ -235,7 +236,7 @@ i({listen, LRef, APid, {Mod, Opts, Addrs}}) ->
LAddr = laddr(LAddrOpt, Mod, LSock),
true = diameter_reg:add_new({?MODULE, listener, {LRef, {LAddr, LSock}}}),
proc_lib:init_ack({ok, self(), {LAddr, LSock}}),
- erlang:monitor(process, APid),
+ monitor(process, APid),
start_timer(#listener{socket = LSock}).
laddr([], Mod, Sock) ->
@@ -336,17 +337,25 @@ accept(Opts) ->
%% listener/2
+%% Accepting processes can be started concurrently: ensure only one
+%% listener is started.
listener(LRef, T) ->
- l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T).
+ diameter_sync:call({?MODULE, listener, LRef},
+ {?MODULE, listener, [{LRef, T, self()}]},
+ infinity,
+ infinity).
-%% Existing process with the listening socket ...
-l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
- LPid ! {accept, self()},
+listener({LRef, T, TPid}) ->
+ l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T, TPid).
+
+%% Existing listening process ...
+l([{{?MODULE, listener, {_, AS}}, LPid}], _, _, TPid) ->
+ LPid ! {accept, TPid},
AS;
-%% ... or not: start one.
-l([], LRef, T) ->
- {ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, self(), T}),
+%% ... or not.
+l([], LRef, T, TPid) ->
+ {ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, TPid, T}),
AS.
%% get_addr/1
@@ -502,7 +511,7 @@ m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
%% Another accept transport is attaching.
l({accept, TPid}, #listener{count = N} = S) ->
- erlang:monitor(process, TPid),
+ monitor(process, TPid),
S#listener{count = N+1};
%% Accepting process has died.
diff --git a/lib/diameter/src/transport/diameter_transport_sup.erl b/lib/diameter/src/transport/diameter_transport_sup.erl
index 6457ab78b0..284a41a752 100644
--- a/lib/diameter/src/transport/diameter_transport_sup.erl
+++ b/lib/diameter/src/transport/diameter_transport_sup.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
@@ -54,7 +54,7 @@ start_child(Name, Module) ->
Spec = {Name,
{Module, start_link, [Name]},
permanent,
- 1000,
+ infinity,
supervisor,
[Module]},
supervisor:start_child(?MODULE, Spec).