diff options
Diffstat (limited to 'lib/diameter/src')
-rw-r--r-- | lib/diameter/src/base/diameter.erl | 3 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_config.erl | 5 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_lib.erl | 31 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_service.erl | 83 | ||||
-rw-r--r-- | lib/diameter/src/modules.mk | 4 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 268 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 37 |
7 files changed, 275 insertions, 156 deletions
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl index d74e091e11..1bbdf6e34d 100644 --- a/lib/diameter/src/base/diameter.erl +++ b/lib/diameter/src/base/diameter.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2014. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -337,6 +337,7 @@ call(SvcName, App, Message) -> :: {transport_module, atom()} | {transport_config, any()} | {transport_config, any(), 'Unsigned32'() | infinity} + | {pool_size, pos_integer()} | {applications, [app_alias()]} | {capabilities, [capability()]} | {capabilities_cb, evaluable()} diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index dd1c9b73bb..270faa9542 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2014. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -554,6 +554,9 @@ opt({watchdog_config, L}) -> opt({spawn_opt, Opts}) -> is_list(Opts); +opt({pool_size, N}) -> + is_integer(N) andalso 0 < N; + %% Options that we can't validate. opt({K, _}) when K == transport_config; diff --git a/lib/diameter/src/base/diameter_lib.erl b/lib/diameter/src/base/diameter_lib.erl index b5f1ae3937..cd88619b66 100644 --- a/lib/diameter/src/base/diameter_lib.erl +++ b/lib/diameter/src/base/diameter_lib.erl @@ -31,6 +31,8 @@ spawn_opts/2, wait/1, fold_tuple/3, + fold_n/3, + for_n/2, log/4]). %% --------------------------------------------------------------------------- @@ -292,6 +294,35 @@ ft(Value, {Idx, T}) -> setelement(Idx, T, Value). %% --------------------------------------------------------------------------- +%% # fold_n/3 +%% --------------------------------------------------------------------------- + +-spec fold_n(F, Acc0, N) + -> term() + when F :: fun((non_neg_integer(), term()) -> term()), + Acc0 :: term(), + N :: non_neg_integer(). + +fold_n(F, Acc, N) + when is_integer(N), 0 < N -> + fold_n(F, F(N, Acc), N-1); + +fold_n(_, Acc, _) -> + Acc. + +%% --------------------------------------------------------------------------- +%% # for_n/2 +%% --------------------------------------------------------------------------- + +-spec for_n(F, N) + -> non_neg_integer() + when F :: fun((non_neg_integer()) -> term()), + N :: non_neg_integer(). + +for_n(F, N) -> + fold_n(fun(M,A) -> F(M), A+1 end, 0, N). + +%% --------------------------------------------------------------------------- %% # log/4 %% %% Called to have something to trace on for happenings of interest. diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index f4afa6eb94..e1cc7bdfc9 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -766,8 +766,9 @@ reason(failure) -> start(Ref, {T, Opts}, S) when T == connect; T == listen -> + N = proplists:get_value(pool_size, Opts, 1), try - {ok, start(Ref, type(T), Opts, S)} + {ok, start(Ref, type(T), Opts, N, S)} catch ?FAILURE(Reason) -> {error, Reason} @@ -785,11 +786,16 @@ type(connect = T) -> T. %% start/4 -start(Ref, Type, Opts, #state{watchdogT = WatchdogT, - peerT = PeerT, - options = SvcOpts, - service_name = SvcName, - service = Svc0}) +start(Ref, Type, Opts, State) -> + start(Ref, Type, Opts, 1, State). + +%% start/5 + +start(Ref, Type, Opts, N, #state{watchdogT = WatchdogT, + peerT = PeerT, + options = SvcOpts, + service_name = SvcName, + service = Svc0}) when Type == connect; Type == accept -> #diameter_service{applications = Apps} @@ -797,14 +803,19 @@ start(Ref, Type, Opts, #state{watchdogT = WatchdogT, = merge_service(Opts, Svc0), {_,_} = Mask = proplists:get_value(sequence, SvcOpts), RecvData = diameter_traffic:make_recvdata([SvcName, PeerT, Apps, Mask]), - Pid = s(Type, Ref, {{spawn_opts([Opts, SvcOpts]), RecvData}, - Opts, - SvcOpts, - Svc}), - insert(WatchdogT, #watchdog{pid = Pid, - type = Type, - ref = Ref, - options = Opts}), + T = {{spawn_opts([Opts, SvcOpts]), RecvData}, Opts, SvcOpts, Svc}, + Rec = #watchdog{type = Type, + ref = Ref, + options = Opts}, + diameter_lib:fold_n(fun(_,A) -> + [wd(Type, Ref, T, WatchdogT, Rec) | A] + end, + [], + N). + +wd(Type, Ref, T, WatchdogT, Rec) -> + Pid = wd(Type, Ref, T), + insert(WatchdogT, Rec#watchdog{pid = Pid}), Pid. %% Note that the service record passed into the watchdog is the merged @@ -817,7 +828,7 @@ spawn_opts(Optss) -> T /= link, T /= monitor]. -s(Type, Ref, T) -> +wd(Type, Ref, T) -> {_MRef, Pid} = diameter_watchdog:start({Type, Ref}, T), Pid. @@ -1719,31 +1730,43 @@ info_transport(S) -> [], PeerD). -%% Only a config entry for a listening transport: use it. -transport([[{type, listen}, _] = L]) -> - L ++ [{accept, []}]; - -%% Only one config or peer entry for a connecting transport: use it. -transport([[{type, connect} | _] = L]) -> - L; +%% Single config entry. Distinguish between pool_size config or not on +%% a connecting transport for backwards compatibility: with the option +%% the form is similar to the listening case, with connections grouped +%% in a pool tuple (for lack of a better name), without as before. +transport([[{type, Type}, {options, Opts}] = L]) + when Type == listen; + Type == connect -> + L ++ [{K, []} || [{_,K}] <- [keys(Type, Opts)]]; %% Peer entries: discard config. Note that the peer entries have %% length at least 3. transport([[_,_] | L]) -> transport(L); -%% Possibly many peer entries for a listening transport. Note that all -%% have the same options by construction, which is not terribly space -%% efficient. -transport([[{type, accept}, {options, Opts} | _] | _] = Ls) -> - [{type, listen}, +%% Multiple tranports. Note that all have the same options by +%% construction, which is not terribly space efficient. +transport([[{type, Type}, {options, Opts} | _] | _] = Ls) -> + transport(keys(Type, Opts), Ls). + +%% Group transports in an accept or pool tuple ... +transport([{Type, Key}], [[{type, _}, {options, Opts} | _] | _] = Ls) -> + [{type, Type}, {options, Opts}, - {accept, [lists:nthtail(2,L) || L <- Ls]}]. + {Key, [tl(tl(L)) || L <- Ls]}]; + +%% ... or not: there can only be one. +transport([], [L]) -> + L. + +keys(connect = T, Opts) -> + [{T, pool} || lists:keymember(pool_size, 1, Opts)]; +keys(_, _) -> + [{listen, accept}]. peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) -> try ets:tab2list(WatchdogT) of - L -> - lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L) + L -> lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L) catch error: badarg -> Dict0 %% service has gone down end. diff --git a/lib/diameter/src/modules.mk b/lib/diameter/src/modules.mk index a2a7a51892..c9dd4e683a 100644 --- a/lib/diameter/src/modules.mk +++ b/lib/diameter/src/modules.mk @@ -1,7 +1,7 @@ # %CopyrightBegin% # -# Copyright Ericsson AB 2010-2014. All Rights Reserved. +# Copyright Ericsson AB 2010-2015. All Rights Reserved. # # The contents of this file are subject to the Erlang Public License, # Version 1.1, (the "License"); you may not use this file except in @@ -94,7 +94,7 @@ BINS = \ # Released files relative to ../examples. EXAMPLES = \ code/GNUmakefile \ - code/peer.erl \ + code/node.erl \ code/client.erl \ code/client_cb.erl \ code/server.erl \ diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 32e7aaca39..66cffa41c0 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2014. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -37,7 +37,8 @@ code_change/3, terminate/2]). --export([info/1]). %% service_info callback +-export([listener/1,%% diameter_sync callback + info/1]). %% service_info callback -export([ports/0, ports/1]). @@ -99,22 +100,31 @@ -record(listener, {ref :: reference(), socket :: gen_sctp:sctp_socket(), - count = 0 :: uint(), + count = 0 :: uint(), %% attached transport processes tmap = ets:new(?MODULE, []) :: ets:tid(), %% {MRef, Pid|AssocId}, {AssocId, Pid} pending = {0, ets:new(?MODULE, [ordered_set])}, tref :: reference(), accept :: [match()]}). %% Field tmap is used to map an incoming message or event to the -%% relevent transport process. Field pending implements a queue of -%% transport processes to which an association has been assigned (at -%% comm_up and written into tmap) but for which diameter hasn't yet -%% spawned a transport process: a short-lived state of affairs as a -%% new transport is spawned as a consequence of a peer being taken up, -%% transport processes being spawned by the listener on demand. In -%% case diameter starts a transport before comm_up on a new -%% association, pending is set to an improper list with the spawned -%% transport as head and the queue as tail. +%% relevant transport process. Field pending implements two queues: +%% the first of transport-to-be processes to which an association has +%% been assigned (at comm_up and written into tmap) but for which +%% diameter hasn't yet spawned a transport process, a short-lived +%% state of affairs as a new transport is spawned as a consequence of +%% a peer being taken up, transport processes being spawned by the +%% listener on demand; the second of started transport processes that +%% have not yet been assigned an association. +%% +%% When diameter calls start/3, the transport process is either taken +%% from the first queue or spawned and placed in the second queue +%% until an association is established. When an association is +%% established, a controlling process is either taken from the second +%% queue or spawned and placed in the first queue. Thus, there are +%% only elements in one queue at a time, so share an ets table queue +%% and tag it with a positive length if it contains the first queue, a +%% negative length if it contains the second queue. The case -1 is +%% handled differently for backwards compatibility reasons. %% --------------------------------------------------------------------------- %% # start/3 @@ -139,9 +149,9 @@ ip(T) -> T. %% A listener spawns transports either as a consequence of this call -%% when there is not yet an association to associate with it, or at -%% comm_up on a new association in which case the call retrieves a -%% transport from the pending queue. +%% when there is not yet an association to assign it, or at comm_up on +%% a new association in which case the call retrieves a transport from +%% the pending queue. s({accept, Ref} = A, Addrs, Opts) -> {LPid, LAs} = listener(Ref, {Opts, Addrs}), try gen_server:call(LPid, {A, self()}, infinity) of @@ -226,7 +236,7 @@ i({connect, Pid, Opts, Addrs, Ref}) -> {LAs, Sock} = open(Addrs, Rest, 0), putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self(), LAs}), - erlang:monitor(process, Pid), + monitor(process, Pid), #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock}; @@ -236,8 +246,8 @@ i({accept, Pid, LPid, Sock, Ref}) when is_pid(Pid) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), - erlang:monitor(process, Pid), - erlang:monitor(process, LPid), + monitor(process, Pid), + monitor(process, LPid), #transport{parent = Pid, mode = {accept, LPid}, socket = Sock}; @@ -246,7 +256,7 @@ i({accept, Pid, LPid, Sock, Ref}) i({accept, Ref, LPid, Sock, Id}) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), - MRef = erlang:monitor(process, LPid), + MRef = monitor(process, LPid), %% Wait for a signal that the transport has been started before %% processing other messages. receive @@ -270,15 +280,23 @@ close(Sock, Id) -> %% listener/2 +%% Accepting processes can be started concurrently: ensure only one +%% listener is started. listener(LRef, T) -> + diameter_sync:call({?MODULE, listener, LRef}, + {?MODULE, listener, [{LRef, T}]}, + infinity, + infinity). + +listener({LRef, T}) -> l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T). -%% Existing process with the listening socket ... +%% Existing listening process ... l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> - {LAs, _Sock} = AS, - {LPid, LAs}; - -%% ... or not: start one. + {LAs, _Sock} = AS, + {LPid, LAs}; + +%% ... or not. l([], LRef, T) -> {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}), {LPid, LAs}. @@ -347,11 +365,17 @@ type(T) -> %% # handle_call/3 %% --------------------------------------------------------------------------- +handle_call(T, From, #listener{pending = L} = S) + when is_list(L) -> + handle_call(T, From, upgrade(S)); + handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref, - count = N} + pending = {N,Q}, + count = K} = S) -> - {TPid, NewS} = accept(Ref, Pid, S), - {reply, {ok, TPid}, NewS#listener{count = N+1}}; + TPid = accept(Ref, Pid, S), + {reply, {ok, TPid}, downgrade(S#listener{pending = {N-1,Q}, + count = K+1})}; handle_call(_, _, State) -> {reply, nok, State}. @@ -370,8 +394,46 @@ handle_cast(_, State) -> handle_info(T, #transport{} = S) -> {noreply, #transport{} = t(T,S)}; +handle_info(T, #listener{pending = L} = S) + when is_list(L) -> + handle_info(T, upgrade(S)); + handle_info(T, #listener{} = S) -> - {noreply, #listener{} = l(T,S)}. + {noreply, downgrade(#listener{} = l(T,S))}. + +%% upgrade/1 + +upgrade(#listener{pending = [TPid | {0,Q}]} = S) -> + ets:insert(Q, {TPid, now()}), + S#listener{pending = {-1,Q}}. +%% Prior to the possiblity of setting pool_size on in transport +%% configuration, a new accepting transport was only started following +%% the death of a predecessor, so that there was only at most one +%% previously started transport process waiting for an association. +%% This assumption no longer holds with pool_size > 1, in which case +%% several accepting transports are started concurrently. Deal with +%% this by placing the started transports in a new queue of transport +%% processes waiting for an association. +%% +%% Since only one of this queue and the existing queue of controlling +%% processes waiting for a transport to be started can be non-empty at +%% any given time, implement both queues in the same ets table. The +%% absolute value of the first element of the 2-tuple is the queue +%% length, the sign says which queue it is. + +%% downgrade/1 +%% +%% Revert to the pre-pool_size representation when possible, for +%% backwards compatibility in the case that the pool_size option +%% hasn't been used. + +downgrade(#listener{pending = {-1,Q}} = S) -> + TPid = ets:first(Q), + ets:delete(Q, TPid), + S#listener{pending = [TPid | {0,Q}]}; + +downgrade(S) -> + S. %% --------------------------------------------------------------------------- %% # code_change/3 @@ -436,54 +498,46 @@ l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) -> setopts(Sock) end; -%% Transport is asking message to be sent. See send/3 for why the send -%% isn't directly from the transport. -l({send, AssocId, StreamId, Bin}, #listener{socket = Sock} = S) -> - send(Sock, AssocId, StreamId, Bin), - S; +l({'DOWN', MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> + down(ets:member(Q, TPid), MRef, TPid, S); + +%% Timeout after the last accepting process has died. +l({timeout, TRef, close = T}, #listener{tref = TRef, + count = 0}) -> + x(T); +l({timeout, _, close}, #listener{} = S) -> + S. + +%% down/4 %% Accepting transport has died. One that's awaiting an association ... -l({'DOWN', MRef, process, TPid, _}, #listener{pending = [TPid | Q], - tmap = T, - count = N} - = S) -> +down(true, MRef, TPid, #listener{pending = {N,Q}, + tmap = T, + count = K} + = S) + when N < 0 -> + ets:delete(Q, TPid), ets:delete(T, MRef), ets:delete(T, TPid), - start_timer(S#listener{count = N-1, - pending = Q}); - -%% ... ditto and a new transport has already been started ... -l({'DOWN', _, process, _, _} = T, #listener{pending = [TPid | Q]} - = S) -> - #listener{pending = NQ} - = NewS - = l(T, S#listener{pending = Q}), - NewS#listener{pending = [TPid | NQ]}; - -%% ... or not. -l({'DOWN', MRef, process, TPid, _}, #listener{socket = Sock, - tmap = T, - count = N, - pending = {P,Q}} - = S) -> + start_timer(S#listener{count = K-1, + pending = {N+1,Q}}); + +%% ... or one that already has one. +down(B, MRef, TPid, #listener{socket = Sock, + tmap = T, + count = K, + pending = {N,Q}} + = S) -> [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId ets:delete(T, MRef), ets:delete(T, Id), Id == TPid orelse close(Sock, Id), - case ets:lookup(Q, TPid) of - [{TPid, _}] -> %% transport in the pending queue ... + if B -> %% Waiting for attachment in the pending queue ... ets:delete(Q, TPid), - S#listener{pending = {P-1, Q}}; - [] -> %% ... or not - start_timer(S#listener{count = N-1}) - end; - -%% Timeout after the last accepting process has died. -l({timeout, TRef, close = T}, #listener{tref = TRef, - count = 0}) -> - x(T); -l({timeout, _, close}, #listener{} = S) -> - S. + S#listener{pending = {N-1,Q}}; + true -> %% ... or already attached + start_timer(S#listener{count = K-1}) + end. %% t/2 %% @@ -582,29 +636,24 @@ accept(Opts) -> %% No pending associations: spawn a new transport. accept(Ref, Pid, #listener{socket = Sock, tmap = T, - pending = {0,_} = Q} - = S) -> + pending = {N,Q}}) + when N =< 0 -> Arg = {accept, Pid, self(), Sock, Ref}, {ok, TPid} = diameter_sctp_sup:start_child(Arg), - MRef = erlang:monitor(process, TPid), + MRef = monitor(process, TPid), ets:insert(T, [{MRef, TPid}, {TPid, MRef}]), - {TPid, S#listener{pending = [TPid | Q]}}; -%% Placing the transport in the pending field makes it available to -%% the next association. The stack starts a new accepting transport -%% only after this one brings the connection up (or dies). - -%% Accepting transport has died. This can happen if a new transport is -%% started before the DOWN has arrived. -accept(Ref, Pid, #listener{pending = [TPid | {0,_} = Q]} = S) -> - false = is_process_alive(TPid), %% assert - accept(Ref, Pid, S#listener{pending = Q}); + ets:insert(Q, {TPid, now()}), + TPid; +%% Placing the transport in the second pending table makes it +%% available to the next association. %% Pending associations: attach to the first in the queue. -accept(_, Pid, #listener{ref = Ref, pending = {N,Q}} = S) -> +accept(_, Pid, #listener{ref = Ref, + pending = {_,Q}}) -> TPid = ets:first(Q), TPid ! {Ref, Pid}, ets:delete(Q, TPid), - {TPid, S#listener{pending = {N-1, Q}}}. + TPid. %% send/2 @@ -718,34 +767,12 @@ up(#transport{parent = Pid, find(Id, Data, #listener{tmap = T} = S) -> f(ets:lookup(T, Id), Data, S). -%% New association and a transport waiting for one: use it. +%% New association ... f([], - {_, #sctp_assoc_change{state = comm_up, - assoc_id = Id}}, - #listener{tmap = T, - pending = [TPid | {_,_} = Q]} + {_, #sctp_assoc_change{state = comm_up, assoc_id = Id}}, + #listener{pending = {N,Q}} = S) -> - [{TPid, MRef}] = ets:lookup(T, TPid), - ets:insert(T, [{MRef, Id}, {Id, TPid}]), - ets:delete(T, TPid), - {TPid, S#listener{pending = Q}}; - -%% New association and no transport start yet: spawn one and place it -%% in the queue. -f([], - {_, #sctp_assoc_change{state = comm_up, - assoc_id = Id}}, - #listener{ref = Ref, - socket = Sock, - tmap = T, - pending = {N,Q}} - = S) -> - Arg = {accept, Ref, self(), Sock, Id}, - {ok, TPid} = diameter_sctp_sup:start_child(Arg), - MRef = erlang:monitor(process, TPid), - ets:insert(T, [{MRef, Id}, {Id, TPid}]), - ets:insert(Q, {TPid, now()}), - {TPid, S#listener{pending = {N+1, Q}}}; + {find(Id, S), S#listener{pending = {N+1,Q}}}; %% Known association ... f([{_, TPid}], _, S) -> @@ -755,6 +782,31 @@ f([{_, TPid}], _, S) -> f([], _, _) -> false. +%% find/2 + +%% Transport waiting for an association: use it. +find(Id, #listener{tmap = T, + pending = {N,Q}}) + when N < 0 -> + TPid = ets:first(Q), + [{TPid, MRef}] = ets:lookup(T, TPid), + ets:insert(T, [{MRef, Id}, {Id, TPid}]), + ets:delete(T, TPid), + ets:delete(Q, TPid), + TPid; + +%% No transport start yet: spawn one and queue. +find(Id, #listener{ref = Ref, + socket = Sock, + tmap = T, + pending = {_,Q}}) -> + Arg = {accept, Ref, self(), Sock, Id}, + {ok, TPid} = diameter_sctp_sup:start_child(Arg), + MRef = monitor(process, TPid), + ets:insert(T, [{MRef, Id}, {Id, TPid}]), + ets:insert(Q, {TPid, now()}), + TPid. + %% assoc_id/1 assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) -> diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 4d1b8bec51..0b26f429fb 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2013. All Rights Reserved. +%% Copyright Ericsson AB 2010-2015. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -37,7 +37,8 @@ code_change/3, terminate/2]). --export([info/1]). %% service_info callback +-export([listener/1,%% diameter_sync callback + info/1]). %% service_info callback -export([ports/0, ports/1]). @@ -191,7 +192,7 @@ init(T) -> i({T, Ref, Mod, Pid, Opts, Addrs}) when T == accept; T == connect -> - erlang:monitor(process, Pid), + monitor(process, Pid), %% Since accept/connect might block indefinitely, spawn a process %% that does nothing but kill us with the parent until call %% returns. @@ -218,8 +219,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs}) %% A monitor process to kill the transport if the parent dies. i(#monitor{parent = Pid, transport = TPid} = S) -> proc_lib:init_ack({ok, self()}), - erlang:monitor(process, Pid), - erlang:monitor(process, TPid), + monitor(process, Pid), + monitor(process, TPid), S; %% In principle a link between the transport and killer processes %% could do the same thing: have the accepting/connecting process be @@ -235,7 +236,7 @@ i({listen, LRef, APid, {Mod, Opts, Addrs}}) -> LAddr = laddr(LAddrOpt, Mod, LSock), true = diameter_reg:add_new({?MODULE, listener, {LRef, {LAddr, LSock}}}), proc_lib:init_ack({ok, self(), {LAddr, LSock}}), - erlang:monitor(process, APid), + monitor(process, APid), start_timer(#listener{socket = LSock}). laddr([], Mod, Sock) -> @@ -336,17 +337,25 @@ accept(Opts) -> %% listener/2 +%% Accepting processes can be started concurrently: ensure only one +%% listener is started. listener(LRef, T) -> - l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T). + diameter_sync:call({?MODULE, listener, LRef}, + {?MODULE, listener, [{LRef, T, self()}]}, + infinity, + infinity). -%% Existing process with the listening socket ... -l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> - LPid ! {accept, self()}, +listener({LRef, T, TPid}) -> + l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T, TPid). + +%% Existing listening process ... +l([{{?MODULE, listener, {_, AS}}, LPid}], _, _, TPid) -> + LPid ! {accept, TPid}, AS; -%% ... or not: start one. -l([], LRef, T) -> - {ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, self(), T}), +%% ... or not. +l([], LRef, T, TPid) -> + {ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, TPid, T}), AS. %% get_addr/1 @@ -502,7 +511,7 @@ m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, %% Another accept transport is attaching. l({accept, TPid}, #listener{count = N} = S) -> - erlang:monitor(process, TPid), + monitor(process, TPid), S#listener{count = N+1}; %% Accepting process has died. |