diff options
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 51 |
1 files changed, 25 insertions, 26 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 0fe25106cc..bb0bf82f04 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -103,7 +103,7 @@ {ref :: reference(), socket :: gen_sctp:sctp_socket(), count = 0 :: uint(), %% attached transport processes - pending = {0, ets:new(?MODULE, [ordered_set])}, + pending = {0, queue:new()}, tref :: reference(), accept :: [match()]}). %% Field pending implements two queues: the first of transport-to-be @@ -369,12 +369,10 @@ type(T) -> %% --------------------------------------------------------------------------- handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref, - pending = {N,Q}, count = K} = S) -> - TPid = accept(Ref, Pid, S), - {reply, {ok, TPid}, S#listener{pending = {N-1,Q}, - count = K+1}}; + {TPid, NewS} = accept(Ref, Pid, S), + {reply, {ok, TPid}, NewS#listener{count = K+1}}; handle_call(_, _, State) -> {reply, nok, State}. @@ -455,7 +453,7 @@ l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, NewS; l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> - down(ets:member(Q, TPid), TPid, S); + down(queue:member(TPid, Q), TPid, S); %% Timeout after the last accepting process has died. l({timeout, TRef, close = T}, #listener{tref = TRef, @@ -472,12 +470,12 @@ l({timeout, _, close}, #listener{} = S) -> down(true, TPid, #listener{pending = {N,Q}, count = K} = S) -> - ets:delete(Q, TPid), + NQ = queue:filter(fun(P) -> P /= TPid end, Q), if N < 0 -> %% awaiting an association ... start_timer(S#listener{count = K-1, - pending = {N+1,Q}}); + pending = {N+1, NQ}}); true -> %% ... or one has been assigned - S#listener{pending = {N-1,Q}} + S#listener{pending = {N-1, NQ}} end; %% ... or one that's already attached. @@ -559,16 +557,20 @@ accept_peer(Sock, Matches) -> %% started as a consequence of diameter requesting a transport %% process. +accept(Ref, Pid, #listener{pending = {N,_}} = S) -> + {TPid, NQ} = q(Ref, Pid, S), + {TPid, S#listener{pending = {N-1, NQ}}}. + %% Pending associations: attach to the first in the queue. -accept(_, Pid, #listener{ref = Ref, - pending = {N,Q}}) +q(_, Pid, #listener{ref = Ref, + pending = {N,Q}}) when 0 < N -> - TPid = dq(Q), + {TPid, _} = T = dq(Q), TPid ! {Ref, Pid}, - TPid; + T; %% No pending associations: spawn a new transport. -accept(Ref, Pid, #listener{pending = {_,Q}}) -> +q(Ref, Pid, #listener{pending = {_,Q}}) -> nq({accept, Ref, self(), Pid}, Q). %% send/2 @@ -685,19 +687,18 @@ up(#transport{parent = Pid, %% Start a new transport process or use one that's already been %% started as a consequence of an event to a listener process. -accept(#listener{pending = {N,Q}} = S) -> - {tpid(S), S#listener{pending = {N+1,Q}}}. - -%% tpid/1 +accept(#listener{pending = {N,_}} = S) -> + {TPid, NQ} = q(S), + {TPid, S#listener{pending = {N+1, NQ}}}. %% Transport waiting for an association: use it. -tpid(#listener{pending = {N,Q}}) +q(#listener{pending = {N,Q}}) when N < 0 -> dq(Q); %% No transport start yet: spawn one and queue. -tpid(#listener{ref = Ref, - pending = {_,Q}}) -> +q(#listener{ref = Ref, + pending = {_,Q}}) -> nq({accept, Ref, self()}, Q). %% nq/2 @@ -708,8 +709,7 @@ tpid(#listener{ref = Ref, nq(Arg, Q) -> {ok, TPid} = diameter_sctp_sup:start_child(Arg), monitor(process, TPid), - ets:insert(Q, {TPid, now()}), - TPid. + {TPid, queue:in(TPid, Q)}. %% dq/1 %% @@ -717,9 +717,8 @@ nq(Arg, Q) -> %% it to an existing association. dq(Q) -> - TPid = ets:first(Q), - ets:delete(Q, TPid), - TPid. + {{value, TPid}, NQ} = queue:out(Q), + {TPid, NQ}. %% assoc_id/1 %% |