aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl51
1 files changed, 25 insertions, 26 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 0fe25106cc..bb0bf82f04 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -103,7 +103,7 @@
{ref :: reference(),
socket :: gen_sctp:sctp_socket(),
count = 0 :: uint(), %% attached transport processes
- pending = {0, ets:new(?MODULE, [ordered_set])},
+ pending = {0, queue:new()},
tref :: reference(),
accept :: [match()]}).
%% Field pending implements two queues: the first of transport-to-be
@@ -369,12 +369,10 @@ type(T) ->
%% ---------------------------------------------------------------------------
handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref,
- pending = {N,Q},
count = K}
= S) ->
- TPid = accept(Ref, Pid, S),
- {reply, {ok, TPid}, S#listener{pending = {N-1,Q},
- count = K+1}};
+ {TPid, NewS} = accept(Ref, Pid, S),
+ {reply, {ok, TPid}, NewS#listener{count = K+1}};
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -455,7 +453,7 @@ l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
NewS;
l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
- down(ets:member(Q, TPid), TPid, S);
+ down(queue:member(TPid, Q), TPid, S);
%% Timeout after the last accepting process has died.
l({timeout, TRef, close = T}, #listener{tref = TRef,
@@ -472,12 +470,12 @@ l({timeout, _, close}, #listener{} = S) ->
down(true, TPid, #listener{pending = {N,Q},
count = K}
= S) ->
- ets:delete(Q, TPid),
+ NQ = queue:filter(fun(P) -> P /= TPid end, Q),
if N < 0 -> %% awaiting an association ...
start_timer(S#listener{count = K-1,
- pending = {N+1,Q}});
+ pending = {N+1, NQ}});
true -> %% ... or one has been assigned
- S#listener{pending = {N-1,Q}}
+ S#listener{pending = {N-1, NQ}}
end;
%% ... or one that's already attached.
@@ -559,16 +557,20 @@ accept_peer(Sock, Matches) ->
%% started as a consequence of diameter requesting a transport
%% process.
+accept(Ref, Pid, #listener{pending = {N,_}} = S) ->
+ {TPid, NQ} = q(Ref, Pid, S),
+ {TPid, S#listener{pending = {N-1, NQ}}}.
+
%% Pending associations: attach to the first in the queue.
-accept(_, Pid, #listener{ref = Ref,
- pending = {N,Q}})
+q(_, Pid, #listener{ref = Ref,
+ pending = {N,Q}})
when 0 < N ->
- TPid = dq(Q),
+ {TPid, _} = T = dq(Q),
TPid ! {Ref, Pid},
- TPid;
+ T;
%% No pending associations: spawn a new transport.
-accept(Ref, Pid, #listener{pending = {_,Q}}) ->
+q(Ref, Pid, #listener{pending = {_,Q}}) ->
nq({accept, Ref, self(), Pid}, Q).
%% send/2
@@ -685,19 +687,18 @@ up(#transport{parent = Pid,
%% Start a new transport process or use one that's already been
%% started as a consequence of an event to a listener process.
-accept(#listener{pending = {N,Q}} = S) ->
- {tpid(S), S#listener{pending = {N+1,Q}}}.
-
-%% tpid/1
+accept(#listener{pending = {N,_}} = S) ->
+ {TPid, NQ} = q(S),
+ {TPid, S#listener{pending = {N+1, NQ}}}.
%% Transport waiting for an association: use it.
-tpid(#listener{pending = {N,Q}})
+q(#listener{pending = {N,Q}})
when N < 0 ->
dq(Q);
%% No transport start yet: spawn one and queue.
-tpid(#listener{ref = Ref,
- pending = {_,Q}}) ->
+q(#listener{ref = Ref,
+ pending = {_,Q}}) ->
nq({accept, Ref, self()}, Q).
%% nq/2
@@ -708,8 +709,7 @@ tpid(#listener{ref = Ref,
nq(Arg, Q) ->
{ok, TPid} = diameter_sctp_sup:start_child(Arg),
monitor(process, TPid),
- ets:insert(Q, {TPid, now()}),
- TPid.
+ {TPid, queue:in(TPid, Q)}.
%% dq/1
%%
@@ -717,9 +717,8 @@ nq(Arg, Q) ->
%% it to an existing association.
dq(Q) ->
- TPid = ets:first(Q),
- ets:delete(Q, TPid),
- TPid.
+ {{value, TPid}, NQ} = queue:out(Q),
+ {TPid, NQ}.
%% assoc_id/1
%%