aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl182
1 files changed, 77 insertions, 105 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index f80de0a816..43039ebd6e 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -103,15 +103,11 @@
{ref :: reference(),
socket :: gen_sctp:sctp_socket(),
count = 0 :: uint(), %% attached transport processes
- tmap = ets:new(?MODULE, []) :: ets:tid(),
- %% {MRef, Pid|AssocId}, {AssocId, Pid}
pending = {0, ets:new(?MODULE, [ordered_set])},
tref :: reference(),
accept :: [match()]}).
-%% Field tmap is used to map an incoming message or event to the
-%% relevant transport process. Field pending implements two queues:
-%% the first of transport-to-be processes to which an association has
-%% been assigned (at comm_up and written into tmap) but for which
+%% Field pending implements two queues: the first of transport-to-be
+%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
%% state of affairs as a new transport is spawned as a consequence of
%% a peer being taken up, transport processes being spawned by the
@@ -297,7 +293,7 @@ listener({LRef, T}) ->
l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
{LAs, _Sock} = AS,
{LPid, LAs};
-
+
%% ... or not.
l([], LRef, T) ->
{ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}),
@@ -489,19 +485,13 @@ start_timer(S) ->
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) ->
Id = assoc_id(Data),
+ {TPid, NewS} = accept(Id, S),
+ TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept},
+ setopts(Sock),
+ NewS;
- try find(Id, Data, S) of
- {TPid, NewS} ->
- TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept},
- NewS;
- false ->
- S
- after
- setopts(Sock)
- end;
-
-l({'DOWN', MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
- down(ets:member(Q, TPid), MRef, TPid, S);
+l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
+ down(ets:member(Q, TPid), TPid, S);
%% Timeout after the last accepting process has died.
l({timeout, TRef, close = T}, #listener{tref = TRef,
@@ -510,36 +500,25 @@ l({timeout, TRef, close = T}, #listener{tref = TRef,
l({timeout, _, close}, #listener{} = S) ->
S.
-%% down/4
+%% down/3
+%%
+%% Accepting transport has died.
-%% Accepting transport has died. One that's awaiting an association ...
-down(true, MRef, TPid, #listener{pending = {N,Q},
- tmap = T,
- count = K}
- = S)
- when N < 0 ->
+%% One that's waiting for transport start in the pending queue ...
+down(true, TPid, #listener{pending = {N,Q},
+ count = K}
+ = S) ->
ets:delete(Q, TPid),
- ets:delete(T, MRef),
- ets:delete(T, TPid),
- start_timer(S#listener{count = K-1,
- pending = {N+1,Q}});
-
-%% ... or one that already has one.
-down(B, MRef, TPid, #listener{socket = Sock,
- tmap = T,
- count = K,
- pending = {N,Q}}
- = S) ->
- [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId
- ets:delete(T, MRef),
- ets:delete(T, Id),
- Id == TPid orelse close(Sock, Id),
- if B -> %% Waiting for attachment in the pending queue ...
- ets:delete(Q, TPid),
- S#listener{pending = {N-1,Q}};
- true -> %% ... or already attached
- start_timer(S#listener{count = K-1})
- end.
+ if N < 0 -> %% awaiting an association ...
+ start_timer(S#listener{count = K-1,
+ pending = {N+1,Q}});
+ true -> %% ... or one has been assigned
+ S#listener{pending = {N-1,Q}}
+ end;
+
+%% ... or one that's already attached.
+down(false, _TPid, #listener{count = K} = S) ->
+ start_timer(S#listener{count = K-1}).
%% t/2
%%
@@ -563,14 +542,12 @@ transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches},
socket = LSock}
= S) ->
ok = accept_peer(Sock, Matches),
- transition(Msg, S#transport{socket = Sock});
+ transition(setelement(2, Msg, Sock), S#transport{socket = Sock});
%% Incoming message.
-transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
+transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
setopts(Sock),
recv(Data, S);
-%% Don't match on Sock since in R15B01 it can be the listening socket
-%% in the (peeled-off) accept case, which is likely a bug.
%% Outgoing message.
transition({diameter, {send, Msg}}, S) ->
@@ -633,29 +610,21 @@ accept(Opts) ->
%% accept/3
%%
%% Start a new transport process or use one that's already been
-%% started as a consequence of association establishment.
-
-%% No pending associations: spawn a new transport.
-accept(Ref, Pid, #listener{socket = Sock,
- tmap = T,
- pending = {N,Q}})
- when N =< 0 ->
- Arg = {accept, Pid, self(), Sock, Ref},
- {ok, TPid} = diameter_sctp_sup:start_child(Arg),
- MRef = monitor(process, TPid),
- ets:insert(T, [{MRef, TPid}, {TPid, MRef}]),
- ets:insert(Q, {TPid, now()}),
- TPid;
-%% Placing the transport in the second pending table makes it
-%% available to the next association.
+%% started as a consequence of diameter requesting a transport
+%% process.
%% Pending associations: attach to the first in the queue.
accept(_, Pid, #listener{ref = Ref,
- pending = {_,Q}}) ->
- TPid = ets:first(Q),
+ pending = {N,Q}})
+ when 0 < N ->
+ TPid = dq(Q),
TPid ! {Ref, Pid},
- ets:delete(Q, TPid),
- TPid.
+ TPid;
+
+%% No pending associations: spawn a new transport.
+accept(Ref, Pid, #listener{socket = Sock,
+ pending = {_,Q}}) ->
+ nq({accept, Pid, self(), Sock, Ref}, Q).
%% send/2
@@ -716,7 +685,7 @@ recv({_, #sctp_assoc_change{} = E},
= S) ->
S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}};
-%% Lost association after establishment.
+%% Association failure.
recv({_, #sctp_assoc_change{}}, _) ->
stop;
@@ -727,8 +696,10 @@ recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid})
bin = Bin}),
ok;
-recv({_, #sctp_shutdown_event{assoc_id = Id}},
- #transport{assoc_id = Id}) ->
+recv({_, #sctp_shutdown_event{assoc_id = A}},
+ #transport{assoc_id = Id})
+ when A == Id;
+ A == 0 ->
stop;
%% Note that diameter_sctp(3) documents that sctp_events cannot be
@@ -764,52 +735,53 @@ up(#transport{parent = Pid,
diameter_peer:up(Pid),
S#transport{mode = A}.
-%% find/3
-
-find(Id, Data, #listener{tmap = T} = S) ->
- f(ets:lookup(T, Id), Data, S).
-
-%% New association ...
-f([],
- {_, #sctp_assoc_change{state = comm_up, assoc_id = Id}},
- #listener{pending = {N,Q}}
- = S) ->
- {find(Id, S), S#listener{pending = {N+1,Q}}};
-
-%% Known association ...
-f([{_, TPid}], _, S) ->
- {TPid, S};
+%% accept/2
+%%
+%% Start a new transport process or use one that's already been
+%% started as a consequence of an event to a listener process.
-%% ... or not: discard.
-f([], _, _) ->
- false.
+accept(Id, #listener{pending = {N,Q}} = S) ->
+ {tpid(Id, S), S#listener{pending = {N+1,Q}}}.
-%% find/2
+%% tpid/2
%% Transport waiting for an association: use it.
-find(Id, #listener{tmap = T,
- pending = {N,Q}})
+tpid(_Id, #listener{pending = {N,Q}})
when N < 0 ->
- TPid = ets:first(Q),
- [{TPid, MRef}] = ets:lookup(T, TPid),
- ets:insert(T, [{MRef, Id}, {Id, TPid}]),
- ets:delete(T, TPid),
- ets:delete(Q, TPid),
- TPid;
+ dq(Q);
%% No transport start yet: spawn one and queue.
-find(Id, #listener{ref = Ref,
+tpid(Id, #listener{ref = Ref,
socket = Sock,
- tmap = T,
pending = {_,Q}}) ->
- Arg = {accept, Ref, self(), Sock, Id},
+ nq({accept, Ref, self(), Sock, Id}, Q).
+
+%% nq/2
+%%
+%% Place a transport process in the second pending queue to make it
+%% available to the next association.
+
+nq(Arg, Q) ->
{ok, TPid} = diameter_sctp_sup:start_child(Arg),
- MRef = monitor(process, TPid),
- ets:insert(T, [{MRef, Id}, {Id, TPid}]),
+ monitor(process, TPid),
ets:insert(Q, {TPid, now()}),
TPid.
+%% dq/1
+%%
+%% Remove a transport process from the first pending queue to assign
+%% it to an existing association.
+
+dq(Q) ->
+ TPid = ets:first(Q),
+ ets:delete(Q, TPid),
+ TPid.
+
%% assoc_id/1
+%%
+%% It's unclear if this is needed, or if the first message on an
+%% association is always sctp_assoc_change, but don't assume since
+%% SCTP behaviour differs between operating systems.
assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) ->
Id;