aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2015-06-19 13:26:19 +0200
committerAnders Svensson <[email protected]>2015-06-19 14:27:27 +0200
commitcb89a27002558f19f236febef877c5cf135c4c8d (patch)
tree26cb2e1d3bc1078472db43f517dc4dacfb0ed663
parent207de95c123a0df754a6886011fc05a4b513a6a5 (diff)
downloadotp-cb89a27002558f19f236febef877c5cf135c4c8d.tar.gz
otp-cb89a27002558f19f236febef877c5cf135c4c8d.tar.bz2
otp-cb89a27002558f19f236febef877c5cf135c4c8d.zip
Ensure accepting processes are first in, first out
A listener process in diameter_sctp starts accepting transport processes as required, either as associations are established or as diameter asks for a processes to be started. Since this can happen in any order, the listener maintains two queues: one for processes that diameter has requested and which are waiting to be given an association, another for processes that have been started to become owners of an association but are waiting for diameter to request them. Only one queue at a time is non-empty. The first queue's length is bounded by the number of accepting processes configured as pool_size. Entries in the second queue are short-lived since diameter starts a replacement transport process whenever an existing one dies or communicates that it has an association. The two queues were previously implemented in an ets ordered_set, whose keys were the pid() of transport processes. Removing an element from the queue was then done with ets:first/1. The problem with this it's not really a queue: there's no guarantee that pid-ordering is the same as the order in which processes are started. If it isn't then it's possible that an established association never be given to diameter as a transport process if there's always a newer association whose pid sorts first. This isn't a problem in practice since it would require new associations to be established faster than diameter starts transport processes, but redo the implementation as a queue, with strict FIFO semantics.
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl51
1 files changed, 25 insertions, 26 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 0fe25106cc..bb0bf82f04 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -103,7 +103,7 @@
{ref :: reference(),
socket :: gen_sctp:sctp_socket(),
count = 0 :: uint(), %% attached transport processes
- pending = {0, ets:new(?MODULE, [ordered_set])},
+ pending = {0, queue:new()},
tref :: reference(),
accept :: [match()]}).
%% Field pending implements two queues: the first of transport-to-be
@@ -369,12 +369,10 @@ type(T) ->
%% ---------------------------------------------------------------------------
handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref,
- pending = {N,Q},
count = K}
= S) ->
- TPid = accept(Ref, Pid, S),
- {reply, {ok, TPid}, S#listener{pending = {N-1,Q},
- count = K+1}};
+ {TPid, NewS} = accept(Ref, Pid, S),
+ {reply, {ok, TPid}, NewS#listener{count = K+1}};
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -455,7 +453,7 @@ l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
NewS;
l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
- down(ets:member(Q, TPid), TPid, S);
+ down(queue:member(TPid, Q), TPid, S);
%% Timeout after the last accepting process has died.
l({timeout, TRef, close = T}, #listener{tref = TRef,
@@ -472,12 +470,12 @@ l({timeout, _, close}, #listener{} = S) ->
down(true, TPid, #listener{pending = {N,Q},
count = K}
= S) ->
- ets:delete(Q, TPid),
+ NQ = queue:filter(fun(P) -> P /= TPid end, Q),
if N < 0 -> %% awaiting an association ...
start_timer(S#listener{count = K-1,
- pending = {N+1,Q}});
+ pending = {N+1, NQ}});
true -> %% ... or one has been assigned
- S#listener{pending = {N-1,Q}}
+ S#listener{pending = {N-1, NQ}}
end;
%% ... or one that's already attached.
@@ -559,16 +557,20 @@ accept_peer(Sock, Matches) ->
%% started as a consequence of diameter requesting a transport
%% process.
+accept(Ref, Pid, #listener{pending = {N,_}} = S) ->
+ {TPid, NQ} = q(Ref, Pid, S),
+ {TPid, S#listener{pending = {N-1, NQ}}}.
+
%% Pending associations: attach to the first in the queue.
-accept(_, Pid, #listener{ref = Ref,
- pending = {N,Q}})
+q(_, Pid, #listener{ref = Ref,
+ pending = {N,Q}})
when 0 < N ->
- TPid = dq(Q),
+ {TPid, _} = T = dq(Q),
TPid ! {Ref, Pid},
- TPid;
+ T;
%% No pending associations: spawn a new transport.
-accept(Ref, Pid, #listener{pending = {_,Q}}) ->
+q(Ref, Pid, #listener{pending = {_,Q}}) ->
nq({accept, Ref, self(), Pid}, Q).
%% send/2
@@ -685,19 +687,18 @@ up(#transport{parent = Pid,
%% Start a new transport process or use one that's already been
%% started as a consequence of an event to a listener process.
-accept(#listener{pending = {N,Q}} = S) ->
- {tpid(S), S#listener{pending = {N+1,Q}}}.
-
-%% tpid/1
+accept(#listener{pending = {N,_}} = S) ->
+ {TPid, NQ} = q(S),
+ {TPid, S#listener{pending = {N+1, NQ}}}.
%% Transport waiting for an association: use it.
-tpid(#listener{pending = {N,Q}})
+q(#listener{pending = {N,Q}})
when N < 0 ->
dq(Q);
%% No transport start yet: spawn one and queue.
-tpid(#listener{ref = Ref,
- pending = {_,Q}}) ->
+q(#listener{ref = Ref,
+ pending = {_,Q}}) ->
nq({accept, Ref, self()}, Q).
%% nq/2
@@ -708,8 +709,7 @@ tpid(#listener{ref = Ref,
nq(Arg, Q) ->
{ok, TPid} = diameter_sctp_sup:start_child(Arg),
monitor(process, TPid),
- ets:insert(Q, {TPid, now()}),
- TPid.
+ {TPid, queue:in(TPid, Q)}.
%% dq/1
%%
@@ -717,9 +717,8 @@ nq(Arg, Q) ->
%% it to an existing association.
dq(Q) ->
- TPid = ets:first(Q),
- ets:delete(Q, TPid),
- TPid.
+ {{value, TPid}, NQ} = queue:out(Q),
+ {TPid, NQ}.
%% assoc_id/1
%%