From cb89a27002558f19f236febef877c5cf135c4c8d Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 19 Jun 2015 13:26:19 +0200 Subject: Ensure accepting processes are first in, first out A listener process in diameter_sctp starts accepting transport processes as required, either as associations are established or as diameter asks for a processes to be started. Since this can happen in any order, the listener maintains two queues: one for processes that diameter has requested and which are waiting to be given an association, another for processes that have been started to become owners of an association but are waiting for diameter to request them. Only one queue at a time is non-empty. The first queue's length is bounded by the number of accepting processes configured as pool_size. Entries in the second queue are short-lived since diameter starts a replacement transport process whenever an existing one dies or communicates that it has an association. The two queues were previously implemented in an ets ordered_set, whose keys were the pid() of transport processes. Removing an element from the queue was then done with ets:first/1. The problem with this it's not really a queue: there's no guarantee that pid-ordering is the same as the order in which processes are started. If it isn't then it's possible that an established association never be given to diameter as a transport process if there's always a newer association whose pid sorts first. This isn't a problem in practice since it would require new associations to be established faster than diameter starts transport processes, but redo the implementation as a queue, with strict FIFO semantics. --- lib/diameter/src/transport/diameter_sctp.erl | 51 ++++++++++++++-------------- 1 file changed, 25 insertions(+), 26 deletions(-) (limited to 'lib') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 0fe25106cc..bb0bf82f04 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -103,7 +103,7 @@ {ref :: reference(), socket :: gen_sctp:sctp_socket(), count = 0 :: uint(), %% attached transport processes - pending = {0, ets:new(?MODULE, [ordered_set])}, + pending = {0, queue:new()}, tref :: reference(), accept :: [match()]}). %% Field pending implements two queues: the first of transport-to-be @@ -369,12 +369,10 @@ type(T) -> %% --------------------------------------------------------------------------- handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref, - pending = {N,Q}, count = K} = S) -> - TPid = accept(Ref, Pid, S), - {reply, {ok, TPid}, S#listener{pending = {N-1,Q}, - count = K+1}}; + {TPid, NewS} = accept(Ref, Pid, S), + {reply, {ok, TPid}, NewS#listener{count = K+1}}; handle_call(_, _, State) -> {reply, nok, State}. @@ -455,7 +453,7 @@ l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, NewS; l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> - down(ets:member(Q, TPid), TPid, S); + down(queue:member(TPid, Q), TPid, S); %% Timeout after the last accepting process has died. l({timeout, TRef, close = T}, #listener{tref = TRef, @@ -472,12 +470,12 @@ l({timeout, _, close}, #listener{} = S) -> down(true, TPid, #listener{pending = {N,Q}, count = K} = S) -> - ets:delete(Q, TPid), + NQ = queue:filter(fun(P) -> P /= TPid end, Q), if N < 0 -> %% awaiting an association ... start_timer(S#listener{count = K-1, - pending = {N+1,Q}}); + pending = {N+1, NQ}}); true -> %% ... or one has been assigned - S#listener{pending = {N-1,Q}} + S#listener{pending = {N-1, NQ}} end; %% ... or one that's already attached. @@ -559,16 +557,20 @@ accept_peer(Sock, Matches) -> %% started as a consequence of diameter requesting a transport %% process. +accept(Ref, Pid, #listener{pending = {N,_}} = S) -> + {TPid, NQ} = q(Ref, Pid, S), + {TPid, S#listener{pending = {N-1, NQ}}}. + %% Pending associations: attach to the first in the queue. -accept(_, Pid, #listener{ref = Ref, - pending = {N,Q}}) +q(_, Pid, #listener{ref = Ref, + pending = {N,Q}}) when 0 < N -> - TPid = dq(Q), + {TPid, _} = T = dq(Q), TPid ! {Ref, Pid}, - TPid; + T; %% No pending associations: spawn a new transport. -accept(Ref, Pid, #listener{pending = {_,Q}}) -> +q(Ref, Pid, #listener{pending = {_,Q}}) -> nq({accept, Ref, self(), Pid}, Q). %% send/2 @@ -685,19 +687,18 @@ up(#transport{parent = Pid, %% Start a new transport process or use one that's already been %% started as a consequence of an event to a listener process. -accept(#listener{pending = {N,Q}} = S) -> - {tpid(S), S#listener{pending = {N+1,Q}}}. - -%% tpid/1 +accept(#listener{pending = {N,_}} = S) -> + {TPid, NQ} = q(S), + {TPid, S#listener{pending = {N+1, NQ}}}. %% Transport waiting for an association: use it. -tpid(#listener{pending = {N,Q}}) +q(#listener{pending = {N,Q}}) when N < 0 -> dq(Q); %% No transport start yet: spawn one and queue. -tpid(#listener{ref = Ref, - pending = {_,Q}}) -> +q(#listener{ref = Ref, + pending = {_,Q}}) -> nq({accept, Ref, self()}, Q). %% nq/2 @@ -708,8 +709,7 @@ tpid(#listener{ref = Ref, nq(Arg, Q) -> {ok, TPid} = diameter_sctp_sup:start_child(Arg), monitor(process, TPid), - ets:insert(Q, {TPid, now()}), - TPid. + {TPid, queue:in(TPid, Q)}. %% dq/1 %% @@ -717,9 +717,8 @@ nq(Arg, Q) -> %% it to an existing association. dq(Q) -> - TPid = ets:first(Q), - ets:delete(Q, TPid), - TPid. + {{value, TPid}, NQ} = queue:out(Q), + {TPid, NQ}. %% assoc_id/1 %% -- cgit v1.2.3