diff options
author | Anders Svensson <[email protected]> | 2015-03-05 01:34:41 +0100 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2015-03-05 01:34:41 +0100 |
commit | ac82610a03c8b17b6bcae91aa4252a1930c51e85 (patch) | |
tree | 23c9bd58e6792ed656af6889cafbda8805377b55 /lib/diameter/src | |
parent | 2456cf80733d3ee5d1c0dca3c52366166d8ad2b6 (diff) | |
parent | 71b52f31772210a7160317966ca46e36140b935a (diff) | |
download | otp-ac82610a03c8b17b6bcae91aa4252a1930c51e85.tar.gz otp-ac82610a03c8b17b6bcae91aa4252a1930c51e85.tar.bz2 otp-ac82610a03c8b17b6bcae91aa4252a1930c51e85.zip |
Merge branch 'maint'
Diffstat (limited to 'lib/diameter/src')
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 3 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_config.erl | 10 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_lib.erl | 140 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer.erl | 6 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_reg.erl | 7 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 88 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_session.erl | 4 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_stats.erl | 6 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_sync.erl | 5 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 5 | ||||
-rw-r--r-- | lib/diameter/src/modules.mk | 4 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 272 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 37 |
13 files changed, 409 insertions, 178 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index d74e091e11..1bbdf6e34d 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2014. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -337,6 +337,7 @@ call(SvcName, App, Message) -> :: {transport_module, atom()} | {transport_config, any()} | {transport_config, any(), 'Unsigned32'() | infinity} + | {pool_size, pos_integer()} | {applications, [app_alias()]} | {capabilities, [capability()]} | {capabilities_cb, evaluable()} diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index dd1c9b73bb..c0a4f7df69 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2014. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -35,10 +35,11 @@ %% -module(diameter_config). --compile({no_auto_import, [monitor/2]}). - -behaviour(gen_server). +-compile({no_auto_import, [monitor/2, now/0]}). +-import(diameter_lib, [now/0]). + -export([start_service/2, stop_service/1, add_transport/2, @@ -554,6 +555,9 @@ opt({watchdog_config, L}) -> opt({spawn_opt, Opts}) -> is_list(Opts); +opt({pool_size, N}) -> + is_integer(N) andalso 0 < N; + %% Options that we can't validate. opt({K, _}) when K == transport_config; diff --git a/lib/diameter/src/base/diameter_lib.erl b/lib/diameter/src/base/diameter_lib.erl index b5f1ae3937..d0d730f47c 100644 --- a/lib/diameter/src/base/diameter_lib.erl +++ b/lib/diameter/src/base/diameter_lib.erl @@ -18,12 +18,18 @@ %% -module(diameter_lib). +-compile({no_auto_import, [now/0]}). -export([info_report/2, error_report/2, warning_report/2, + now/0, + timestamp/1, now_diff/1, + micro_diff/1, + micro_diff/2, time/1, + seed/0, eval/1, eval_name/1, get_stacktrace/0, @@ -31,6 +37,8 @@ spawn_opts/2, wait/1, fold_tuple/3, + fold_n/3, + for_n/2, log/4]). %% --------------------------------------------------------------------------- @@ -90,13 +98,50 @@ fmt(T) -> end. %% --------------------------------------------------------------------------- +%% # now/0 +%% --------------------------------------------------------------------------- + +-type timestamp() :: {non_neg_integer(), 0..999999, 0..999999}. +-type now() :: integer() %% monotonic time + | timestamp(). + +-spec now() + -> now(). + +%% Use monotonic time if it exists, fall back to erlang:now() +%% otherwise. + +now() -> + try + erlang:monotonic_time() + catch + error: undef -> erlang:now() + end. + +%% --------------------------------------------------------------------------- +%% # timestamp/1 +%% --------------------------------------------------------------------------- + +-spec timestamp(NowT :: now()) + -> timestamp(). + +timestamp({_,_,_} = T) -> %% erlang:now() + T; + +timestamp(MonoT) -> %% monotonic time + MicroSecs = erlang:convert_time_resolution(MonoT + erlang:time_offset(), + erlang:time_resolution(), + 1000000), + Secs = MicroSecs div 1000000, + {Secs div 1000000, Secs rem 1000000, MicroSecs rem 1000000}. + +%% --------------------------------------------------------------------------- %% # now_diff/1 %% --------------------------------------------------------------------------- --spec now_diff(NowT) +-spec now_diff(NowT :: now()) -> {Hours, Mins, Secs, MicroSecs} - when NowT :: {non_neg_integer(), 0..999999, 0..999999}, - Hours :: non_neg_integer(), + when Hours :: non_neg_integer(), Mins :: 0..59, Secs :: 0..59, MicroSecs :: 0..999999. @@ -104,8 +149,41 @@ fmt(T) -> %% Return timer:now_diff(now(), NowT) as an {H, M, S, MicroS} tuple %% instead of as integer microseconds. -now_diff({_,_,_} = Time) -> - time(timer:now_diff(now(), Time)). +now_diff(Time) -> + time(micro_diff(Time)). + +%% --------------------------------------------------------------------------- +%% # micro_diff/1 +%% --------------------------------------------------------------------------- + +-spec micro_diff(NowT :: now()) + -> MicroSecs + when MicroSecs :: non_neg_integer(). + +micro_diff({_,_,_} = T0) -> + timer:now_diff(erlang:now(), T0); + +micro_diff(T0) -> %% monotonic time + erlang:convert_time_resolution(erlang:monotonic_time() - T0, + erlang:time_resolution(), + 1000000). + +%% --------------------------------------------------------------------------- +%% # micro_diff/2 +%% --------------------------------------------------------------------------- + +-spec micro_diff(T1 :: now(), T0 :: now()) + -> MicroSecs + when MicroSecs :: non_neg_integer(). + +micro_diff(T1, T0) + when is_integer(T1), is_integer(T0) -> %% monotonic time + erlang:convert_time_resolution(T1 - T0, + erlang:time_resolution(), + 1000000); + +micro_diff(T1, T0) -> %% at least one erlang:now() + timer:now_diff(timestamp(T1), timestamp(T0)). %% --------------------------------------------------------------------------- %% # time/1 @@ -115,7 +193,7 @@ now_diff({_,_,_} = Time) -> -spec time(NowT | Diff) -> {Hours, Mins, Secs, MicroSecs} - when NowT :: {non_neg_integer(), 0..999999, 0..999999}, + when NowT :: timestamp(), Diff :: non_neg_integer(), Hours :: non_neg_integer(), Mins :: 0..59, @@ -134,6 +212,27 @@ time(Micro) -> %% elapsed time {H, M, S, Micro rem 1000000}. %% --------------------------------------------------------------------------- +%% # seed/0 +%% --------------------------------------------------------------------------- + +-spec seed() + -> {timestamp(), {integer(), integer(), integer()}}. + +%% Return an argument for random:seed/1. + +seed() -> + T = now(), + {timestamp(T), seed(T)}. + +%% seed/1 + +seed({_,_,_} = T) -> + T; + +seed(T) -> %% monotonic time + {erlang:phash2(node()), T, erlang:unique_integer()}. + +%% --------------------------------------------------------------------------- %% # eval/1 %% %% Evaluate a function in various forms. @@ -292,6 +391,35 @@ ft(Value, {Idx, T}) -> setelement(Idx, T, Value). %% --------------------------------------------------------------------------- +%% # fold_n/3 +%% --------------------------------------------------------------------------- + +-spec fold_n(F, Acc0, N) + -> term() + when F :: fun((non_neg_integer(), term()) -> term()), + Acc0 :: term(), + N :: non_neg_integer(). + +fold_n(F, Acc, N) + when is_integer(N), 0 < N -> + fold_n(F, F(N, Acc), N-1); + +fold_n(_, Acc, _) -> + Acc. + +%% --------------------------------------------------------------------------- +%% # for_n/2 +%% --------------------------------------------------------------------------- + +-spec for_n(F, N) + -> non_neg_integer() + when F :: fun((non_neg_integer()) -> term()), + N :: non_neg_integer(). + +for_n(F, N) -> + fold_n(fun(M,A) -> F(M), A+1 end, 0, N). + +%% --------------------------------------------------------------------------- %% # log/4 %% %% Called to have something to trace on for happenings of interest. diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index e5d4b28766..ea326dd03e 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -18,9 +18,11 @@ %% -module(diameter_peer). - -behaviour(gen_server). +-compile({no_auto_import, [now/0]}). +-import(diameter_lib, [now/0]). + %% Interface towards transport modules ... -export([recv/2, up/1, diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 3197c1aee1..f785777874 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -22,10 +22,11 @@ %% -module(diameter_reg). --compile({no_auto_import, [monitor/2]}). - -behaviour(gen_server). +-compile({no_auto_import, [monitor/2, now/0]}). +-import(diameter_lib, [now/0]). + -export([add/1, add_new/1, del/1, diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index f4afa6eb94..04401a3d87 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -24,6 +24,9 @@ -module(diameter_service). -behaviour(gen_server). +-compile({no_auto_import, [now/0]}). +-import(diameter_lib, [now/0]). + %% towards diameter_service_sup -export([start_link/1]). @@ -766,8 +769,9 @@ reason(failure) -> start(Ref, {T, Opts}, S) when T == connect; T == listen -> + N = proplists:get_value(pool_size, Opts, 1), try - {ok, start(Ref, type(T), Opts, S)} + {ok, start(Ref, type(T), Opts, N, S)} catch ?FAILURE(Reason) -> {error, Reason} @@ -785,11 +789,16 @@ type(connect = T) -> T. %% start/4 -start(Ref, Type, Opts, #state{watchdogT = WatchdogT, - peerT = PeerT, - options = SvcOpts, - service_name = SvcName, - service = Svc0}) +start(Ref, Type, Opts, State) -> + start(Ref, Type, Opts, 1, State). + +%% start/5 + +start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT, + peerT = PeerT, + options = SvcOpts, + service_name = SvcName, + service = Svc0}) when Type == connect; Type == accept -> #diameter_service{applications = Apps} @@ -797,14 +806,19 @@ start(Ref, Type, Opts, #state{watchdogT = WatchdogT, = merge_service(Opts, Svc0), {_,_} = Mask = proplists:get_value(sequence, SvcOpts), RecvData = diameter_traffic:make_recvdata([SvcName, PeerT, Apps, Mask]), - Pid = s(Type, Ref, {{spawn_opts([Opts, SvcOpts]), RecvData}, - Opts, - SvcOpts, - Svc}), - insert(WatchdogT, #watchdog{pid = Pid, - type = Type, - ref = Ref, - options = Opts}), + T = {{spawn_opts([Opts, SvcOpts]), RecvData}, Opts, SvcOpts, Svc}, + Rec = #watchdog{type = Type, + ref = Ref, + options = Opts}, + diameter_lib:fold_n(fun(_,A) -> + [wd(Type, Ref, T, WatchdogT, Rec) | A] + end, + [], + N). + +wd(Type, Ref, T, WatchdogT, Rec) -> + Pid = wd(Type, Ref, T), + insert(WatchdogT, Rec#watchdog{pid = Pid}), Pid. %% Note that the service record passed into the watchdog is the merged @@ -817,7 +831,7 @@ spawn_opts(Optss) -> T /= link, T /= monitor]. -s(Type, Ref, T) -> +wd(Type, Ref, T) -> {_MRef, Pid} = diameter_watchdog:start({Type, Ref}, T), Pid. @@ -1186,7 +1200,7 @@ connect_timer(Opts, Def0) -> %% continuous restarted in case of faulty config or other problems. tc(Time, Tc) -> choose(Tc > ?RESTART_TC - orelse timer:now_diff(now(), Time) > 1000*?RESTART_TC, + orelse diameter_lib:micro_diff(Time) > 1000*?RESTART_TC, Tc, ?RESTART_TC). @@ -1719,31 +1733,43 @@ info_transport(S) -> [], PeerD). -%% Only a config entry for a listening transport: use it. -transport([[{type, listen}, _] = L]) -> - L ++ [{accept, []}]; - -%% Only one config or peer entry for a connecting transport: use it. -transport([[{type, connect} | _] = L]) -> - L; +%% Single config entry. Distinguish between pool_size config or not on +%% a connecting transport for backwards compatibility: with the option +%% the form is similar to the listening case, with connections grouped +%% in a pool tuple (for lack of a better name), without as before. +transport([[{type, Type}, {options, Opts}] = L]) + when Type == listen; + Type == connect -> + L ++ [{K, []} || [{_,K}] <- [keys(Type, Opts)]]; %% Peer entries: discard config. Note that the peer entries have %% length at least 3. transport([[_,_] | L]) -> transport(L); -%% Possibly many peer entries for a listening transport. Note that all -%% have the same options by construction, which is not terribly space -%% efficient. -transport([[{type, accept}, {options, Opts} | _] | _] = Ls) -> - [{type, listen}, +%% Multiple tranports. Note that all have the same options by +%% construction, which is not terribly space efficient. +transport([[{type, Type}, {options, Opts} | _] | _] = Ls) -> + transport(keys(Type, Opts), Ls). + +%% Group transports in an accept or pool tuple ... +transport([{Type, Key}], [[{type, _}, {options, Opts} | _] | _] = Ls) -> + [{type, Type}, {options, Opts}, - {accept, [lists:nthtail(2,L) || L <- Ls]}]. + {Key, [tl(tl(L)) || L <- Ls]}]; + +%% ... or not: there can only be one. +transport([], [L]) -> + L. + +keys(connect = T, Opts) -> + [{T, pool} || lists:keymember(pool_size, 1, Opts)]; +keys(_, _) -> + [{listen, accept}]. peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) -> try ets:tab2list(WatchdogT) of - L -> - lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L) + L -> lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L) catch error: badarg -> Dict0 %% service has gone down end. diff --git a/lib/diameter/src/base/diameter_session.erl b/lib/diameter/src/base/diameter_session.erl index 3b236f109a..c5ea0428b5 100644 --- a/lib/diameter/src/base/diameter_session.erl +++ b/lib/diameter/src/base/diameter_session.erl @@ -157,8 +157,8 @@ session_id(Host) -> %% --------------------------------------------------------------------------- init() -> - Now = now(), - random:seed(Now), + {Now, Seed} = diameter_lib:seed(), + random:seed(Seed), Time = time32(Now), Seq = (?INT32 band (Time bsl 20)) bor (random:uniform(1 bsl 20) - 1), ets:insert(diameter_sequence, [{origin_state_id, Time}, diff --git a/lib/diameter/src/base/diameter_stats.erl b/lib/diameter/src/base/diameter_stats.erl index 8353613d32..64ea082be0 100644 --- a/lib/diameter/src/base/diameter_stats.erl +++ b/lib/diameter/src/base/diameter_stats.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2014. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -22,9 +22,11 @@ %% -module(diameter_stats). - -behaviour(gen_server). +-compile({no_auto_import, [now/0]}). +-import(diameter_lib, [now/0]). + -export([reg/2, reg/1, incr/3, incr/1, read/1, diff --git a/lib/diameter/src/base/diameter_sync.erl b/lib/diameter/src/base/diameter_sync.erl index ce2db4b3a2..90eabece3d 100644 --- a/lib/diameter/src/base/diameter_sync.erl +++ b/lib/diameter/src/base/diameter_sync.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -27,6 +27,9 @@ -module(diameter_sync). -behaviour(gen_server). +-compile({no_auto_import, [now/0]}). +-import(diameter_lib, [now/0]). + -export([call/4, call/5, cast/4, cast/5, carp/1, carp/2]). diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index b7f2d24941..67715906e8 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2014. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -122,7 +122,8 @@ i({Ack, T, Pid, {RecvData, = Svc}}) -> erlang:monitor(process, Pid), wait(Ack, Pid), - random:seed(now()), + {_, Seed} = diameter_lib:seed(), + random:seed(Seed), putr(restart, {T, Opts, Svc}), %% save seeing it in trace putr(dwr, dwr(Caps)), %% {_,_} = Mask = proplists:get_value(sequence, SvcOpts), diff --git a/lib/diameter/src/modules.mk b/lib/diameter/src/modules.mk index a2a7a51892..c9dd4e683a 100644 --- a/lib/diameter/src/modules.mk +++ b/lib/diameter/src/modules.mk @@ -1,7 +1,7 @@ # %CopyrightBegin% # -# Copyright Ericsson AB 2010-2014. All Rights Reserved. +# Copyright Ericsson AB 2010-2015. All Rights Reserved. # # The contents of this file are subject to the Erlang Public License, # Version 1.1, (the "License"); you may not use this file except in @@ -94,7 +94,7 @@ BINS = \ # Released files relative to ../examples. EXAMPLES = \ code/GNUmakefile \ - code/peer.erl \ + code/node.erl \ code/client.erl \ code/client_cb.erl \ code/server.erl \ diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 32e7aaca39..2c8d6f0a14 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2014. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -18,9 +18,11 @@ %% -module(diameter_sctp). - -behaviour(gen_server). +-compile({no_auto_import, [now/0]}). +-import(diameter_lib, [now/0]). + %% interface -export([start/3]). @@ -37,7 +39,8 @@ code_change/3, terminate/2]). --export([info/1]). %% service_info callback +-export([listener/1,%% diameter_sync callback + info/1]). %% service_info callback -export([ports/0, ports/1]). @@ -99,22 +102,31 @@ -record(listener, {ref :: reference(), socket :: gen_sctp:sctp_socket(), - count = 0 :: uint(), + count = 0 :: uint(), %% attached transport processes tmap = ets:new(?MODULE, []) :: ets:tid(), %% {MRef, Pid|AssocId}, {AssocId, Pid} pending = {0, ets:new(?MODULE, [ordered_set])}, tref :: reference(), accept :: [match()]}). %% Field tmap is used to map an incoming message or event to the -%% relevent transport process. Field pending implements a queue of -%% transport processes to which an association has been assigned (at -%% comm_up and written into tmap) but for which diameter hasn't yet -%% spawned a transport process: a short-lived state of affairs as a -%% new transport is spawned as a consequence of a peer being taken up, -%% transport processes being spawned by the listener on demand. In -%% case diameter starts a transport before comm_up on a new -%% association, pending is set to an improper list with the spawned -%% transport as head and the queue as tail. +%% relevant transport process. Field pending implements two queues: +%% the first of transport-to-be processes to which an association has +%% been assigned (at comm_up and written into tmap) but for which +%% diameter hasn't yet spawned a transport process, a short-lived +%% state of affairs as a new transport is spawned as a consequence of +%% a peer being taken up, transport processes being spawned by the +%% listener on demand; the second of started transport processes that +%% have not yet been assigned an association. +%% +%% When diameter calls start/3, the transport process is either taken +%% from the first queue or spawned and placed in the second queue +%% until an association is established. When an association is +%% established, a controlling process is either taken from the second +%% queue or spawned and placed in the first queue. Thus, there are +%% only elements in one queue at a time, so share an ets table queue +%% and tag it with a positive length if it contains the first queue, a +%% negative length if it contains the second queue. The case -1 is +%% handled differently for backwards compatibility reasons. %% --------------------------------------------------------------------------- %% # start/3 @@ -139,9 +151,9 @@ ip(T) -> T. %% A listener spawns transports either as a consequence of this call -%% when there is not yet an association to associate with it, or at -%% comm_up on a new association in which case the call retrieves a -%% transport from the pending queue. +%% when there is not yet an association to assign it, or at comm_up on +%% a new association in which case the call retrieves a transport from +%% the pending queue. s({accept, Ref} = A, Addrs, Opts) -> {LPid, LAs} = listener(Ref, {Opts, Addrs}), try gen_server:call(LPid, {A, self()}, infinity) of @@ -226,7 +238,7 @@ i({connect, Pid, Opts, Addrs, Ref}) -> {LAs, Sock} = open(Addrs, Rest, 0), putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self(), LAs}), - erlang:monitor(process, Pid), + monitor(process, Pid), #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock}; @@ -236,8 +248,8 @@ i({accept, Pid, LPid, Sock, Ref}) when is_pid(Pid) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), - erlang:monitor(process, Pid), - erlang:monitor(process, LPid), + monitor(process, Pid), + monitor(process, LPid), #transport{parent = Pid, mode = {accept, LPid}, socket = Sock}; @@ -246,7 +258,7 @@ i({accept, Pid, LPid, Sock, Ref}) i({accept, Ref, LPid, Sock, Id}) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), - MRef = erlang:monitor(process, LPid), + MRef = monitor(process, LPid), %% Wait for a signal that the transport has been started before %% processing other messages. receive @@ -270,15 +282,23 @@ close(Sock, Id) -> %% listener/2 +%% Accepting processes can be started concurrently: ensure only one +%% listener is started. listener(LRef, T) -> + diameter_sync:call({?MODULE, listener, LRef}, + {?MODULE, listener, [{LRef, T}]}, + infinity, + infinity). + +listener({LRef, T}) -> l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T). -%% Existing process with the listening socket ... +%% Existing listening process ... l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> - {LAs, _Sock} = AS, - {LPid, LAs}; - -%% ... or not: start one. + {LAs, _Sock} = AS, + {LPid, LAs}; + +%% ... or not. l([], LRef, T) -> {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}), {LPid, LAs}. @@ -347,11 +367,17 @@ type(T) -> %% # handle_call/3 %% --------------------------------------------------------------------------- +handle_call(T, From, #listener{pending = L} = S) + when is_list(L) -> + handle_call(T, From, upgrade(S)); + handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref, - count = N} + pending = {N,Q}, + count = K} = S) -> - {TPid, NewS} = accept(Ref, Pid, S), - {reply, {ok, TPid}, NewS#listener{count = N+1}}; + TPid = accept(Ref, Pid, S), + {reply, {ok, TPid}, downgrade(S#listener{pending = {N-1,Q}, + count = K+1})}; handle_call(_, _, State) -> {reply, nok, State}. @@ -370,8 +396,46 @@ handle_cast(_, State) -> handle_info(T, #transport{} = S) -> {noreply, #transport{} = t(T,S)}; +handle_info(T, #listener{pending = L} = S) + when is_list(L) -> + handle_info(T, upgrade(S)); + handle_info(T, #listener{} = S) -> - {noreply, #listener{} = l(T,S)}. + {noreply, downgrade(#listener{} = l(T,S))}. + +%% upgrade/1 + +upgrade(#listener{pending = [TPid | {0,Q}]} = S) -> + ets:insert(Q, {TPid, now()}), + S#listener{pending = {-1,Q}}. +%% Prior to the possiblity of setting pool_size on in transport +%% configuration, a new accepting transport was only started following +%% the death of a predecessor, so that there was only at most one +%% previously started transport process waiting for an association. +%% This assumption no longer holds with pool_size > 1, in which case +%% several accepting transports are started concurrently. Deal with +%% this by placing the started transports in a new queue of transport +%% processes waiting for an association. +%% +%% Since only one of this queue and the existing queue of controlling +%% processes waiting for a transport to be started can be non-empty at +%% any given time, implement both queues in the same ets table. The +%% absolute value of the first element of the 2-tuple is the queue +%% length, the sign says which queue it is. + +%% downgrade/1 +%% +%% Revert to the pre-pool_size representation when possible, for +%% backwards compatibility in the case that the pool_size option +%% hasn't been used. + +downgrade(#listener{pending = {-1,Q}} = S) -> + TPid = ets:first(Q), + ets:delete(Q, TPid), + S#listener{pending = [TPid | {0,Q}]}; + +downgrade(S) -> + S. %% --------------------------------------------------------------------------- %% # code_change/3 @@ -436,54 +500,46 @@ l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) -> setopts(Sock) end; -%% Transport is asking message to be sent. See send/3 for why the send -%% isn't directly from the transport. -l({send, AssocId, StreamId, Bin}, #listener{socket = Sock} = S) -> - send(Sock, AssocId, StreamId, Bin), - S; +l({'DOWN', MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> + down(ets:member(Q, TPid), MRef, TPid, S); + +%% Timeout after the last accepting process has died. +l({timeout, TRef, close = T}, #listener{tref = TRef, + count = 0}) -> + x(T); +l({timeout, _, close}, #listener{} = S) -> + S. + +%% down/4 %% Accepting transport has died. One that's awaiting an association ... -l({'DOWN', MRef, process, TPid, _}, #listener{pending = [TPid | Q], - tmap = T, - count = N} - = S) -> +down(true, MRef, TPid, #listener{pending = {N,Q}, + tmap = T, + count = K} + = S) + when N < 0 -> + ets:delete(Q, TPid), ets:delete(T, MRef), ets:delete(T, TPid), - start_timer(S#listener{count = N-1, - pending = Q}); - -%% ... ditto and a new transport has already been started ... -l({'DOWN', _, process, _, _} = T, #listener{pending = [TPid | Q]} - = S) -> - #listener{pending = NQ} - = NewS - = l(T, S#listener{pending = Q}), - NewS#listener{pending = [TPid | NQ]}; - -%% ... or not. -l({'DOWN', MRef, process, TPid, _}, #listener{socket = Sock, - tmap = T, - count = N, - pending = {P,Q}} - = S) -> + start_timer(S#listener{count = K-1, + pending = {N+1,Q}}); + +%% ... or one that already has one. +down(B, MRef, TPid, #listener{socket = Sock, + tmap = T, + count = K, + pending = {N,Q}} + = S) -> [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId ets:delete(T, MRef), ets:delete(T, Id), Id == TPid orelse close(Sock, Id), - case ets:lookup(Q, TPid) of - [{TPid, _}] -> %% transport in the pending queue ... + if B -> %% Waiting for attachment in the pending queue ... ets:delete(Q, TPid), - S#listener{pending = {P-1, Q}}; - [] -> %% ... or not - start_timer(S#listener{count = N-1}) - end; - -%% Timeout after the last accepting process has died. -l({timeout, TRef, close = T}, #listener{tref = TRef, - count = 0}) -> - x(T); -l({timeout, _, close}, #listener{} = S) -> - S. + S#listener{pending = {N-1,Q}}; + true -> %% ... or already attached + start_timer(S#listener{count = K-1}) + end. %% t/2 %% @@ -582,29 +638,24 @@ accept(Opts) -> %% No pending associations: spawn a new transport. accept(Ref, Pid, #listener{socket = Sock, tmap = T, - pending = {0,_} = Q} - = S) -> + pending = {N,Q}}) + when N =< 0 -> Arg = {accept, Pid, self(), Sock, Ref}, {ok, TPid} = diameter_sctp_sup:start_child(Arg), - MRef = erlang:monitor(process, TPid), + MRef = monitor(process, TPid), ets:insert(T, [{MRef, TPid}, {TPid, MRef}]), - {TPid, S#listener{pending = [TPid | Q]}}; -%% Placing the transport in the pending field makes it available to -%% the next association. The stack starts a new accepting transport -%% only after this one brings the connection up (or dies). - -%% Accepting transport has died. This can happen if a new transport is -%% started before the DOWN has arrived. -accept(Ref, Pid, #listener{pending = [TPid | {0,_} = Q]} = S) -> - false = is_process_alive(TPid), %% assert - accept(Ref, Pid, S#listener{pending = Q}); + ets:insert(Q, {TPid, now()}), + TPid; +%% Placing the transport in the second pending table makes it +%% available to the next association. %% Pending associations: attach to the first in the queue. -accept(_, Pid, #listener{ref = Ref, pending = {N,Q}} = S) -> +accept(_, Pid, #listener{ref = Ref, + pending = {_,Q}}) -> TPid = ets:first(Q), TPid ! {Ref, Pid}, ets:delete(Q, TPid), - {TPid, S#listener{pending = {N-1, Q}}}. + TPid. %% send/2 @@ -718,34 +769,12 @@ up(#transport{parent = Pid, find(Id, Data, #listener{tmap = T} = S) -> f(ets:lookup(T, Id), Data, S). -%% New association and a transport waiting for one: use it. -f([], - {_, #sctp_assoc_change{state = comm_up, - assoc_id = Id}}, - #listener{tmap = T, - pending = [TPid | {_,_} = Q]} - = S) -> - [{TPid, MRef}] = ets:lookup(T, TPid), - ets:insert(T, [{MRef, Id}, {Id, TPid}]), - ets:delete(T, TPid), - {TPid, S#listener{pending = Q}}; - -%% New association and no transport start yet: spawn one and place it -%% in the queue. +%% New association ... f([], - {_, #sctp_assoc_change{state = comm_up, - assoc_id = Id}}, - #listener{ref = Ref, - socket = Sock, - tmap = T, - pending = {N,Q}} + {_, #sctp_assoc_change{state = comm_up, assoc_id = Id}}, + #listener{pending = {N,Q}} = S) -> - Arg = {accept, Ref, self(), Sock, Id}, - {ok, TPid} = diameter_sctp_sup:start_child(Arg), - MRef = erlang:monitor(process, TPid), - ets:insert(T, [{MRef, Id}, {Id, TPid}]), - ets:insert(Q, {TPid, now()}), - {TPid, S#listener{pending = {N+1, Q}}}; + {find(Id, S), S#listener{pending = {N+1,Q}}}; %% Known association ... f([{_, TPid}], _, S) -> @@ -755,6 +784,31 @@ f([{_, TPid}], _, S) -> f([], _, _) -> false. +%% find/2 + +%% Transport waiting for an association: use it. +find(Id, #listener{tmap = T, + pending = {N,Q}}) + when N < 0 -> + TPid = ets:first(Q), + [{TPid, MRef}] = ets:lookup(T, TPid), + ets:insert(T, [{MRef, Id}, {Id, TPid}]), + ets:delete(T, TPid), + ets:delete(Q, TPid), + TPid; + +%% No transport start yet: spawn one and queue. +find(Id, #listener{ref = Ref, + socket = Sock, + tmap = T, + pending = {_,Q}}) -> + Arg = {accept, Ref, self(), Sock, Id}, + {ok, TPid} = diameter_sctp_sup:start_child(Arg), + MRef = monitor(process, TPid), + ets:insert(T, [{MRef, Id}, {Id, TPid}]), + ets:insert(Q, {TPid, now()}), + TPid. + %% assoc_id/1 assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) -> diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 4d1b8bec51..0b26f429fb 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -37,7 +37,8 @@ code_change/3, terminate/2]). --export([info/1]). %% service_info callback +-export([listener/1,%% diameter_sync callback + info/1]). %% service_info callback -export([ports/0, ports/1]). @@ -191,7 +192,7 @@ init(T) -> i({T, Ref, Mod, Pid, Opts, Addrs}) when T == accept; T == connect -> - erlang:monitor(process, Pid), + monitor(process, Pid), %% Since accept/connect might block indefinitely, spawn a process %% that does nothing but kill us with the parent until call %% returns. @@ -218,8 +219,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) %% A monitor process to kill the transport if the parent dies. i(#monitor{parent = Pid, transport = TPid} = S) -> proc_lib:init_ack({ok, self()}), - erlang:monitor(process, Pid), - erlang:monitor(process, TPid), + monitor(process, Pid), + monitor(process, TPid), S; %% In principle a link between the transport and killer processes %% could do the same thing: have the accepting/connecting process be @@ -235,7 +236,7 @@ i({listen, LRef, APid, {Mod, Opts, Addrs}}) -> LAddr = laddr(LAddrOpt, Mod, LSock), true = diameter_reg:add_new({?MODULE, listener, {LRef, {LAddr, LSock}}}), proc_lib:init_ack({ok, self(), {LAddr, LSock}}), - erlang:monitor(process, APid), + monitor(process, APid), start_timer(#listener{socket = LSock}). laddr([], Mod, Sock) -> @@ -336,17 +337,25 @@ accept(Opts) -> %% listener/2 +%% Accepting processes can be started concurrently: ensure only one +%% listener is started. listener(LRef, T) -> - l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T). + diameter_sync:call({?MODULE, listener, LRef}, + {?MODULE, listener, [{LRef, T, self()}]}, + infinity, + infinity). -%% Existing process with the listening socket ... -l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> - LPid ! {accept, self()}, +listener({LRef, T, TPid}) -> + l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T, TPid). + +%% Existing listening process ... +l([{{?MODULE, listener, {_, AS}}, LPid}], _, _, TPid) -> + LPid ! {accept, TPid}, AS; -%% ... or not: start one. -l([], LRef, T) -> - {ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, self(), T}), +%% ... or not. +l([], LRef, T, TPid) -> + {ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, TPid, T}), AS. %% get_addr/1 @@ -502,7 +511,7 @@ m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, %% Another accept transport is attaching. l({accept, TPid}, #listener{count = N} = S) -> - erlang:monitor(process, TPid), + monitor(process, TPid), S#listener{count = N+1}; %% Accepting process has died. |