diff options
author | Anders Svensson <anders@erlang.org> | 2015-06-19 13:26:19 +0200 |
---|---|---|
committer | Anders Svensson <anders@erlang.org> | 2015-06-19 14:27:27 +0200 |
commit | cb89a27002558f19f236febef877c5cf135c4c8d (patch) | |
tree | 26cb2e1d3bc1078472db43f517dc4dacfb0ed663 /lib | |
parent | 207de95c123a0df754a6886011fc05a4b513a6a5 (diff) | |
download | otp-cb89a27002558f19f236febef877c5cf135c4c8d.tar.gz otp-cb89a27002558f19f236febef877c5cf135c4c8d.tar.bz2 otp-cb89a27002558f19f236febef877c5cf135c4c8d.zip |
Ensure accepting processes are first in, first out
A listener process in diameter_sctp starts accepting transport processes
as required, either as associations are established or as diameter asks
for a processes to be started. Since this can happen in any order, the
listener maintains two queues: one for processes that diameter has
requested and which are waiting to be given an association, another for
processes that have been started to become owners of an association but
are waiting for diameter to request them. Only one queue at a time is
non-empty. The first queue's length is bounded by the number of
accepting processes configured as pool_size. Entries in the second queue
are short-lived since diameter starts a replacement transport process
whenever an existing one dies or communicates that it has an
association.
The two queues were previously implemented in an ets ordered_set, whose
keys were the pid() of transport processes. Removing an element from the
queue was then done with ets:first/1. The problem with this it's not
really a queue: there's no guarantee that pid-ordering is the same as
the order in which processes are started. If it isn't then it's possible
that an established association never be given to diameter as a
transport process if there's always a newer association whose pid sorts
first. This isn't a problem in practice since it would require new
associations to be established faster than diameter starts transport
processes, but redo the implementation as a queue, with strict FIFO
semantics.
Diffstat (limited to 'lib')
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 51 |
1 files changed, 25 insertions, 26 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 0fe25106cc..bb0bf82f04 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -103,7 +103,7 @@ {ref :: reference(), socket :: gen_sctp:sctp_socket(), count = 0 :: uint(), %% attached transport processes - pending = {0, ets:new(?MODULE, [ordered_set])}, + pending = {0, queue:new()}, tref :: reference(), accept :: [match()]}). %% Field pending implements two queues: the first of transport-to-be @@ -369,12 +369,10 @@ type(T) -> %% --------------------------------------------------------------------------- handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref, - pending = {N,Q}, count = K} = S) -> - TPid = accept(Ref, Pid, S), - {reply, {ok, TPid}, S#listener{pending = {N-1,Q}, - count = K+1}}; + {TPid, NewS} = accept(Ref, Pid, S), + {reply, {ok, TPid}, NewS#listener{count = K+1}}; handle_call(_, _, State) -> {reply, nok, State}. @@ -455,7 +453,7 @@ l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, NewS; l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> - down(ets:member(Q, TPid), TPid, S); + down(queue:member(TPid, Q), TPid, S); %% Timeout after the last accepting process has died. l({timeout, TRef, close = T}, #listener{tref = TRef, @@ -472,12 +470,12 @@ l({timeout, _, close}, #listener{} = S) -> down(true, TPid, #listener{pending = {N,Q}, count = K} = S) -> - ets:delete(Q, TPid), + NQ = queue:filter(fun(P) -> P /= TPid end, Q), if N < 0 -> %% awaiting an association ... start_timer(S#listener{count = K-1, - pending = {N+1,Q}}); + pending = {N+1, NQ}}); true -> %% ... or one has been assigned - S#listener{pending = {N-1,Q}} + S#listener{pending = {N-1, NQ}} end; %% ... or one that's already attached. @@ -559,16 +557,20 @@ accept_peer(Sock, Matches) -> %% started as a consequence of diameter requesting a transport %% process. +accept(Ref, Pid, #listener{pending = {N,_}} = S) -> + {TPid, NQ} = q(Ref, Pid, S), + {TPid, S#listener{pending = {N-1, NQ}}}. + %% Pending associations: attach to the first in the queue. -accept(_, Pid, #listener{ref = Ref, - pending = {N,Q}}) +q(_, Pid, #listener{ref = Ref, + pending = {N,Q}}) when 0 < N -> - TPid = dq(Q), + {TPid, _} = T = dq(Q), TPid ! {Ref, Pid}, - TPid; + T; %% No pending associations: spawn a new transport. -accept(Ref, Pid, #listener{pending = {_,Q}}) -> +q(Ref, Pid, #listener{pending = {_,Q}}) -> nq({accept, Ref, self(), Pid}, Q). %% send/2 @@ -685,19 +687,18 @@ up(#transport{parent = Pid, %% Start a new transport process or use one that's already been %% started as a consequence of an event to a listener process. -accept(#listener{pending = {N,Q}} = S) -> - {tpid(S), S#listener{pending = {N+1,Q}}}. - -%% tpid/1 +accept(#listener{pending = {N,_}} = S) -> + {TPid, NQ} = q(S), + {TPid, S#listener{pending = {N+1, NQ}}}. %% Transport waiting for an association: use it. -tpid(#listener{pending = {N,Q}}) +q(#listener{pending = {N,Q}}) when N < 0 -> dq(Q); %% No transport start yet: spawn one and queue. -tpid(#listener{ref = Ref, - pending = {_,Q}}) -> +q(#listener{ref = Ref, + pending = {_,Q}}) -> nq({accept, Ref, self()}, Q). %% nq/2 @@ -708,8 +709,7 @@ tpid(#listener{ref = Ref, nq(Arg, Q) -> {ok, TPid} = diameter_sctp_sup:start_child(Arg), monitor(process, TPid), - ets:insert(Q, {TPid, now()}), - TPid. + {TPid, queue:in(TPid, Q)}. %% dq/1 %% @@ -717,9 +717,8 @@ nq(Arg, Q) -> %% it to an existing association. dq(Q) -> - TPid = ets:first(Q), - ets:delete(Q, TPid), - TPid. + {{value, TPid}, NQ} = queue:out(Q), + {TPid, NQ}. %% assoc_id/1 %% |