aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Anders Svensson <[email protected]> 2015-06-18 13:25:38 +0200
committer Anders Svensson <[email protected]> 2015-06-19 03:03:14 +0200
commit da3e5d67b3600f1844b6d4f734eff05ba577ad95 (patch)
tree 11ebaea71c94e8abad74577f24a479398c2ec325
parent f3fefbae24a2569a13b538d80d0e99129963ebef (diff)
download otp-da3e5d67b3600f1844b6d4f734eff05ba577ad95.tar.gz
otp-da3e5d67b3600f1844b6d4f734eff05ba577ad95.tar.bz2
otp-da3e5d67b3600f1844b6d4f734eff05ba577ad95.zip
Remove assumption that SCTP association ids will be unique
This is not the case under Solaris for one: successive associations can receive the same association id as a result of peeloff, the id only being unique for the controlling port, not for the listening port as is the case under Linux for example. This made for many failures in the diameter test suites, the traffic suite in particular.

Peeloff in diameter_sctp was introduced in 9a671bf0, before which the assumption was fine since it was the listening process that owned all associations. (Which obviously had other drawbacks.) Other remnants of the pre-peeloff implementation have also been removed: that the listener process might receive a message on a socket after peeloff for one.

Peeloff in gen_sctp became available in commit 067cfe79, after the original implementation of diameter_sctp.

This is a trace from the unpatched code showing id reuse under Solaris:

+ {trace_ts,<0.103.0>,call,
      {diameter_sctp,handle_info,
          [{sctp,#Port<0.1625>,
               {127,0,0,1},
               35904,
               {[],{sctp_assoc_change,comm_up,0,32,32,1}}},
           {listener,#Ref<0.0.1.948>,#Port<0.1625>,4,
               57384,
               {-4,61481},
               #Ref<0.0.8.12>,
               []}]},
      {1432,458752,612168}}
+ {trace_ts,<0.103.0>,call,
      {diameter_sctp,handle_info,
          [{sctp,#Port<0.1625>,
               {127,0,0,1},
               35905,
               {[],{sctp_assoc_change,comm_up,0,32,32,1}}},
           {listener,#Ref<0.0.1.948>,#Port<0.1625>,4,
               57384,
               {-3,61481},
               #Ref<0.0.8.12>,
               []}]},
      {1432,458752,613042}}

The result was this, when the second association was incorrectly forwarded to the first association's controlling process:

** {function_clause,
       [{diameter_sctp,transition,
            [{peeloff,#Port<0.1635>,
                 {sctp,#Port<0.1625>,
                     {127,0,0,1},
                     35892,
                     {[],{sctp_assoc_change,comm_up,0,32,32,1}}},
                 []},
             {transport,<0.107.0>,accept,#Port<0.1634>,1,undefined,{32,32},0}],
            [{file,"transport/diameter_sctp.erl"},{line,561}]},
        {diameter_sctp,t,2,[{file,"transport/diameter_sctp.erl"},{line,549}]},
        {diameter_sctp,handle_info,2,
            [{file,"transport/diameter_sctp.erl"},{line,397}]},
        {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,614}]},
        {gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,680}]},
        {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,238}]}]}
-rw-r--r-- lib/diameter/src/transport/diameter_sctp.erl 182
1 files changed, 77 insertions, 105 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index f80de0a816..43039ebd6e 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -103,15 +103,11 @@
{ref :: reference(),
socket :: gen_sctp:sctp_socket(),
count = 0 :: uint(), %% attached transport processes
- tmap = ets:new(?MODULE, []) :: ets:tid(),
- %% {MRef, Pid|AssocId}, {AssocId, Pid}
pending = {0, ets:new(?MODULE, [ordered_set])},
tref :: reference(),
accept :: [match()]}).
-%% Field tmap is used to map an incoming message or event to the
-%% relevant transport process. Field pending implements two queues:
-%% the first of transport-to-be processes to which an association has
-%% been assigned (at comm_up and written into tmap) but for which
+%% Field pending implements two queues: the first of transport-to-be
+%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
%% state of affairs as a new transport is spawned as a consequence of
%% a peer being taken up, transport processes being spawned by the
@@ -297,7 +293,7 @@ listener({LRef, T}) ->
l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
{LAs, _Sock} = AS,
{LPid, LAs};
-
+
%% ... or not.
l([], LRef, T) ->
{ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}),
@@ -489,19 +485,13 @@ start_timer(S) ->
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) ->
Id = assoc_id(Data),
+ {TPid, NewS} = accept(Id, S),
+ TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept},
+ setopts(Sock),
+ NewS;
- try find(Id, Data, S) of
- {TPid, NewS} ->
- TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept},
- NewS;
- false ->
- S
- after
- setopts(Sock)
- end;
-
-l({'DOWN', MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
- down(ets:member(Q, TPid), MRef, TPid, S);
+l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
+ down(ets:member(Q, TPid), TPid, S);
%% Timeout after the last accepting process has died.
l({timeout, TRef, close = T}, #listener{tref = TRef,
@@ -510,36 +500,25 @@ l({timeout, TRef, close = T}, #listener{tref = TRef,
l({timeout, _, close}, #listener{} = S) ->
S.
-%% down/4
+%% down/3
+%%
+%% Accepting transport has died.
-%% Accepting transport has died. One that's awaiting an association ...
-down(true, MRef, TPid, #listener{pending = {N,Q},
- tmap = T,
- count = K}
- = S)
- when N < 0 ->
+%% One that's waiting for transport start in the pending queue ...
+down(true, TPid, #listener{pending = {N,Q},
+ count = K}
+ = S) ->
ets:delete(Q, TPid),
- ets:delete(T, MRef),
- ets:delete(T, TPid),
- start_timer(S#listener{count = K-1,
- pending = {N+1,Q}});
-
-%% ... or one that already has one.
-down(B, MRef, TPid, #listener{socket = Sock,
- tmap = T,
- count = K,
- pending = {N,Q}}
- = S) ->
- [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId
- ets:delete(T, MRef),
- ets:delete(T, Id),
- Id == TPid orelse close(Sock, Id),
- if B -> %% Waiting for attachment in the pending queue ...
- ets:delete(Q, TPid),
- S#listener{pending = {N-1,Q}};
- true -> %% ... or already attached
- start_timer(S#listener{count = K-1})
- end.
+ if N < 0 -> %% awaiting an association ...
+ start_timer(S#listener{count = K-1,
+ pending = {N+1,Q}});
+ true -> %% ... or one has been assigned
+ S#listener{pending = {N-1,Q}}
+ end;
+
+%% ... or one that's already attached.
+down(false, _TPid, #listener{count = K} = S) ->
+ start_timer(S#listener{count = K-1}).
%% t/2
%%
@@ -563,14 +542,12 @@ transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches},
socket = LSock}
= S) ->
ok = accept_peer(Sock, Matches),
- transition(Msg, S#transport{socket = Sock});
+ transition(setelement(2, Msg, Sock), S#transport{socket = Sock});
%% Incoming message.
-transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
+transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
setopts(Sock),
recv(Data, S);
-%% Don't match on Sock since in R15B01 it can be the listening socket
-%% in the (peeled-off) accept case, which is likely a bug.
%% Outgoing message.
transition({diameter, {send, Msg}}, S) ->
@@ -633,29 +610,21 @@ accept(Opts) ->
%% accept/3
%%
%% Start a new transport process or use one that's already been
-%% started as a consequence of association establishment.
-
-%% No pending associations: spawn a new transport.
-accept(Ref, Pid, #listener{socket = Sock,
- tmap = T,
- pending = {N,Q}})
- when N =< 0 ->
- Arg = {accept, Pid, self(), Sock, Ref},
- {ok, TPid} = diameter_sctp_sup:start_child(Arg),
- MRef = monitor(process, TPid),
- ets:insert(T, [{MRef, TPid}, {TPid, MRef}]),
- ets:insert(Q, {TPid, now()}),
- TPid;
-%% Placing the transport in the second pending table makes it
-%% available to the next association.
+%% started as a consequence of diameter requesting a transport
+%% process.
%% Pending associations: attach to the first in the queue.
accept(_, Pid, #listener{ref = Ref,
- pending = {_,Q}}) ->
- TPid = ets:first(Q),
+ pending = {N,Q}})
+ when 0 < N ->
+ TPid = dq(Q),
TPid ! {Ref, Pid},
- ets:delete(Q, TPid),
- TPid.
+ TPid;
+
+%% No pending associations: spawn a new transport.
+accept(Ref, Pid, #listener{socket = Sock,
+ pending = {_,Q}}) ->
+ nq({accept, Pid, self(), Sock, Ref}, Q).
%% send/2
@@ -716,7 +685,7 @@ recv({_, #sctp_assoc_change{} = E},
= S) ->
S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}};
-%% Lost association after establishment.
+%% Association failure.
recv({_, #sctp_assoc_change{}}, _) ->
stop;
@@ -727,8 +696,10 @@ recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid})
bin = Bin}),
ok;
-recv({_, #sctp_shutdown_event{assoc_id = Id}},
- #transport{assoc_id = Id}) ->
+recv({_, #sctp_shutdown_event{assoc_id = A}},
+ #transport{assoc_id = Id})
+ when A == Id;
+ A == 0 ->
stop;
%% Note that diameter_sctp(3) documents that sctp_events cannot be
@@ -764,52 +735,53 @@ up(#transport{parent = Pid,
diameter_peer:up(Pid),
S#transport{mode = A}.
-%% find/3
-
-find(Id, Data, #listener{tmap = T} = S) ->
- f(ets:lookup(T, Id), Data, S).
-
-%% New association ...
-f([],
- {_, #sctp_assoc_change{state = comm_up, assoc_id = Id}},
- #listener{pending = {N,Q}}
- = S) ->
- {find(Id, S), S#listener{pending = {N+1,Q}}};
-
-%% Known association ...
-f([{_, TPid}], _, S) ->
- {TPid, S};
+%% accept/2
+%%
+%% Start a new transport process or use one that's already been
+%% started as a consequence of an event to a listener process.
-%% ... or not: discard.
-f([], _, _) ->
- false.
+accept(Id, #listener{pending = {N,Q}} = S) ->
+ {tpid(Id, S), S#listener{pending = {N+1,Q}}}.
-%% find/2
+%% tpid/2
%% Transport waiting for an association: use it.
-find(Id, #listener{tmap = T,
- pending = {N,Q}})
+tpid(_Id, #listener{pending = {N,Q}})
when N < 0 ->
- TPid = ets:first(Q),
- [{TPid, MRef}] = ets:lookup(T, TPid),
- ets:insert(T, [{MRef, Id}, {Id, TPid}]),
- ets:delete(T, TPid),
- ets:delete(Q, TPid),
- TPid;
+ dq(Q);
%% No transport start yet: spawn one and queue.
-find(Id, #listener{ref = Ref,
+tpid(Id, #listener{ref = Ref,
socket = Sock,
- tmap = T,
pending = {_,Q}}) ->
- Arg = {accept, Ref, self(), Sock, Id},
+ nq({accept, Ref, self(), Sock, Id}, Q).
+
+%% nq/2
+%%
+%% Place a transport process in the second pending queue to make it
+%% available to the next association.
+
+nq(Arg, Q) ->
{ok, TPid} = diameter_sctp_sup:start_child(Arg),
- MRef = monitor(process, TPid),
- ets:insert(T, [{MRef, Id}, {Id, TPid}]),
+ monitor(process, TPid),
ets:insert(Q, {TPid, now()}),
TPid.
+%% dq/1
+%%
+%% Remove a transport process from the first pending queue to assign
+%% it to an existing association.
+
+dq(Q) ->
+ TPid = ets:first(Q),
+ ets:delete(Q, TPid),
+ TPid.
+
%% assoc_id/1
+%%
+%% It's unclear if this is needed, or if the first message on an
+%% association is always sctp_assoc_change, but don't assume since
+%% SCTP behaviour differs between operating systems.
assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) ->
Id;