diff options
Diffstat (limited to 'lib/diameter')
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 182 |
1 files changed, 77 insertions, 105 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index f80de0a816..43039ebd6e 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -103,15 +103,11 @@ {ref :: reference(), socket :: gen_sctp:sctp_socket(), count = 0 :: uint(), %% attached transport processes - tmap = ets:new(?MODULE, []) :: ets:tid(), - %% {MRef, Pid|AssocId}, {AssocId, Pid} pending = {0, ets:new(?MODULE, [ordered_set])}, tref :: reference(), accept :: [match()]}). -%% Field tmap is used to map an incoming message or event to the -%% relevant transport process. Field pending implements two queues: -%% the first of transport-to-be processes to which an association has -%% been assigned (at comm_up and written into tmap) but for which +%% Field pending implements two queues: the first of transport-to-be +%% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived %% state of affairs as a new transport is spawned as a consequence of %% a peer being taken up, transport processes being spawned by the @@ -297,7 +293,7 @@ listener({LRef, T}) -> l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> {LAs, _Sock} = AS, {LPid, LAs}; - + %% ... or not. l([], LRef, T) -> {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}), @@ -489,19 +485,13 @@ start_timer(S) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) -> Id = assoc_id(Data), + {TPid, NewS} = accept(Id, S), + TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept}, + setopts(Sock), + NewS; - try find(Id, Data, S) of - {TPid, NewS} -> - TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept}, - NewS; - false -> - S - after - setopts(Sock) - end; - -l({'DOWN', MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> - down(ets:member(Q, TPid), MRef, TPid, S); +l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> + down(ets:member(Q, TPid), TPid, S); %% Timeout after the last accepting process has died. l({timeout, TRef, close = T}, #listener{tref = TRef, @@ -510,36 +500,25 @@ l({timeout, TRef, close = T}, #listener{tref = TRef, l({timeout, _, close}, #listener{} = S) -> S. -%% down/4 +%% down/3 +%% +%% Accepting transport has died. -%% Accepting transport has died. One that's awaiting an association ... -down(true, MRef, TPid, #listener{pending = {N,Q}, - tmap = T, - count = K} - = S) - when N < 0 -> +%% One that's waiting for transport start in the pending queue ... +down(true, TPid, #listener{pending = {N,Q}, + count = K} + = S) -> ets:delete(Q, TPid), - ets:delete(T, MRef), - ets:delete(T, TPid), - start_timer(S#listener{count = K-1, - pending = {N+1,Q}}); - -%% ... or one that already has one. -down(B, MRef, TPid, #listener{socket = Sock, - tmap = T, - count = K, - pending = {N,Q}} - = S) -> - [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId - ets:delete(T, MRef), - ets:delete(T, Id), - Id == TPid orelse close(Sock, Id), - if B -> %% Waiting for attachment in the pending queue ... - ets:delete(Q, TPid), - S#listener{pending = {N-1,Q}}; - true -> %% ... or already attached - start_timer(S#listener{count = K-1}) - end. + if N < 0 -> %% awaiting an association ... + start_timer(S#listener{count = K-1, + pending = {N+1,Q}}); + true -> %% ... or one has been assigned + S#listener{pending = {N-1,Q}} + end; + +%% ... or one that's already attached. +down(false, _TPid, #listener{count = K} = S) -> + start_timer(S#listener{count = K-1}). %% t/2 %% @@ -563,14 +542,12 @@ transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches}, socket = LSock} = S) -> ok = accept_peer(Sock, Matches), - transition(Msg, S#transport{socket = Sock}); + transition(setelement(2, Msg, Sock), S#transport{socket = Sock}); %% Incoming message. -transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> +transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> setopts(Sock), recv(Data, S); -%% Don't match on Sock since in R15B01 it can be the listening socket -%% in the (peeled-off) accept case, which is likely a bug. %% Outgoing message. transition({diameter, {send, Msg}}, S) -> @@ -633,29 +610,21 @@ accept(Opts) -> %% accept/3 %% %% Start a new transport process or use one that's already been -%% started as a consequence of association establishment. - -%% No pending associations: spawn a new transport. -accept(Ref, Pid, #listener{socket = Sock, - tmap = T, - pending = {N,Q}}) - when N =< 0 -> - Arg = {accept, Pid, self(), Sock, Ref}, - {ok, TPid} = diameter_sctp_sup:start_child(Arg), - MRef = monitor(process, TPid), - ets:insert(T, [{MRef, TPid}, {TPid, MRef}]), - ets:insert(Q, {TPid, now()}), - TPid; -%% Placing the transport in the second pending table makes it -%% available to the next association. +%% started as a consequence of diameter requesting a transport +%% process. %% Pending associations: attach to the first in the queue. accept(_, Pid, #listener{ref = Ref, - pending = {_,Q}}) -> - TPid = ets:first(Q), + pending = {N,Q}}) + when 0 < N -> + TPid = dq(Q), TPid ! {Ref, Pid}, - ets:delete(Q, TPid), - TPid. + TPid; + +%% No pending associations: spawn a new transport. +accept(Ref, Pid, #listener{socket = Sock, + pending = {_,Q}}) -> + nq({accept, Pid, self(), Sock, Ref}, Q). %% send/2 @@ -716,7 +685,7 @@ recv({_, #sctp_assoc_change{} = E}, = S) -> S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}}; -%% Lost association after establishment. +%% Association failure. recv({_, #sctp_assoc_change{}}, _) -> stop; @@ -727,8 +696,10 @@ recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid}) bin = Bin}), ok; -recv({_, #sctp_shutdown_event{assoc_id = Id}}, - #transport{assoc_id = Id}) -> +recv({_, #sctp_shutdown_event{assoc_id = A}}, + #transport{assoc_id = Id}) + when A == Id; + A == 0 -> stop; %% Note that diameter_sctp(3) documents that sctp_events cannot be @@ -764,52 +735,53 @@ up(#transport{parent = Pid, diameter_peer:up(Pid), S#transport{mode = A}. -%% find/3 - -find(Id, Data, #listener{tmap = T} = S) -> - f(ets:lookup(T, Id), Data, S). - -%% New association ... -f([], - {_, #sctp_assoc_change{state = comm_up, assoc_id = Id}}, - #listener{pending = {N,Q}} - = S) -> - {find(Id, S), S#listener{pending = {N+1,Q}}}; - -%% Known association ... -f([{_, TPid}], _, S) -> - {TPid, S}; +%% accept/2 +%% +%% Start a new transport process or use one that's already been +%% started as a consequence of an event to a listener process. -%% ... or not: discard. -f([], _, _) -> - false. +accept(Id, #listener{pending = {N,Q}} = S) -> + {tpid(Id, S), S#listener{pending = {N+1,Q}}}. -%% find/2 +%% tpid/2 %% Transport waiting for an association: use it. -find(Id, #listener{tmap = T, - pending = {N,Q}}) +tpid(_Id, #listener{pending = {N,Q}}) when N < 0 -> - TPid = ets:first(Q), - [{TPid, MRef}] = ets:lookup(T, TPid), - ets:insert(T, [{MRef, Id}, {Id, TPid}]), - ets:delete(T, TPid), - ets:delete(Q, TPid), - TPid; + dq(Q); %% No transport start yet: spawn one and queue. -find(Id, #listener{ref = Ref, +tpid(Id, #listener{ref = Ref, socket = Sock, - tmap = T, pending = {_,Q}}) -> - Arg = {accept, Ref, self(), Sock, Id}, + nq({accept, Ref, self(), Sock, Id}, Q). + +%% nq/2 +%% +%% Place a transport process in the second pending queue to make it +%% available to the next association. + +nq(Arg, Q) -> {ok, TPid} = diameter_sctp_sup:start_child(Arg), - MRef = monitor(process, TPid), - ets:insert(T, [{MRef, Id}, {Id, TPid}]), + monitor(process, TPid), ets:insert(Q, {TPid, now()}), TPid. +%% dq/1 +%% +%% Remove a transport process from the first pending queue to assign +%% it to an existing association. + +dq(Q) -> + TPid = ets:first(Q), + ets:delete(Q, TPid), + TPid. + %% assoc_id/1 +%% +%% It's unclear if this is needed, or if the first message on an +%% association is always sctp_assoc_change, but don't assume since +%% SCTP behaviour differs between operating systems. assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) -> Id; |