diff options
author | Anders Svensson <[email protected]> | 2015-06-18 13:25:38 +0200 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2015-06-19 03:03:14 +0200 |
commit | da3e5d67b3600f1844b6d4f734eff05ba577ad95 (patch) | |
tree | 11ebaea71c94e8abad74577f24a479398c2ec325 /lib | |
parent | f3fefbae24a2569a13b538d80d0e99129963ebef (diff) | |
download | otp-da3e5d67b3600f1844b6d4f734eff05ba577ad95.tar.gz otp-da3e5d67b3600f1844b6d4f734eff05ba577ad95.tar.bz2 otp-da3e5d67b3600f1844b6d4f734eff05ba577ad95.zip |
Remove assumption that SCTP association ids will be unique
This is not the case under Solaris for one: successive
associations can receive the same association id as a result of peeloff,
the id only being unique for the controlling port, not for the listening
port as is the case under Linux for example. This made for many failures
in the diameter test suites, the traffic suite in particular.
Peeloff in diameter_sctp was introduced in 9a671bf0, before which the
assumption was fine since it was the listening process that owned all
associations. (Which obviously had other drawbacks.) Other remnants of
the pre-peeloff implementation have also been removed: that the listener
process might receive a message on a socket after peeloff for one.
Peeloff in gen_sctp became available in commit 067cfe79, after the
original implementation of diameter_sctp.
This is trace on the unpatched code showing id reuse under Solaris:
+ {trace_ts,<0.103.0>,call,
{diameter_sctp,handle_info,
[{sctp,#Port<0.1625>,
{127,0,0,1},
35904,
{[],{sctp_assoc_change,comm_up,0,32,32,1}}},
{listener,#Ref<0.0.1.948>,#Port<0.1625>,4,
57384,
{-4,61481},
#Ref<0.0.8.12>,
[]}]},
{1432,458752,612168}}
+ {trace_ts,<0.103.0>,call,
{diameter_sctp,handle_info,
[{sctp,#Port<0.1625>,
{127,0,0,1},
35905,
{[],{sctp_assoc_change,comm_up,0,32,32,1}}},
{listener,#Ref<0.0.1.948>,#Port<0.1625>,4,
57384,
{-3,61481},
#Ref<0.0.8.12>,
[]}]},
{1432,458752,613042}}
The result was this, when the second association was incorrectly
forwarded to the first association's controlling process:
** {function_clause,
[{diameter_sctp,transition,
[{peeloff,#Port<0.1635>,
{sctp,#Port<0.1625>,
{127,0,0,1},
35892,
{[],{sctp_assoc_change,comm_up,0,32,32,1}}},
[]},
{transport,<0.107.0>,accept,#Port<0.1634>,1,undefined,{32,32},0}],
[{file,"transport/diameter_sctp.erl"},{line,561}]},
{diameter_sctp,t,2,[{file,"transport/diameter_sctp.erl"},{line,549}]},
{diameter_sctp,handle_info,2,
[{file,"transport/diameter_sctp.erl"},{line,397}]},
{gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,614}]},
{gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,680}]},
{proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,238}]}]}
Diffstat (limited to 'lib')
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 182 |
1 files changed, 77 insertions, 105 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index f80de0a816..43039ebd6e 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -103,15 +103,11 @@ {ref :: reference(), socket :: gen_sctp:sctp_socket(), count = 0 :: uint(), %% attached transport processes - tmap = ets:new(?MODULE, []) :: ets:tid(), - %% {MRef, Pid|AssocId}, {AssocId, Pid} pending = {0, ets:new(?MODULE, [ordered_set])}, tref :: reference(), accept :: [match()]}). -%% Field tmap is used to map an incoming message or event to the -%% relevant transport process. Field pending implements two queues: -%% the first of transport-to-be processes to which an association has -%% been assigned (at comm_up and written into tmap) but for which +%% Field pending implements two queues: the first of transport-to-be +%% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived %% state of affairs as a new transport is spawned as a consequence of %% a peer being taken up, transport processes being spawned by the @@ -297,7 +293,7 @@ listener({LRef, T}) -> l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> {LAs, _Sock} = AS, {LPid, LAs}; - + %% ... or not. l([], LRef, T) -> {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}), @@ -489,19 +485,13 @@ start_timer(S) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) -> Id = assoc_id(Data), + {TPid, NewS} = accept(Id, S), + TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept}, + setopts(Sock), + NewS; - try find(Id, Data, S) of - {TPid, NewS} -> - TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept}, - NewS; - false -> - S - after - setopts(Sock) - end; - -l({'DOWN', MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> - down(ets:member(Q, TPid), MRef, TPid, S); +l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> + down(ets:member(Q, TPid), TPid, S); %% Timeout after the last accepting process has died. l({timeout, TRef, close = T}, #listener{tref = TRef, @@ -510,36 +500,25 @@ l({timeout, TRef, close = T}, #listener{tref = TRef, l({timeout, _, close}, #listener{} = S) -> S. -%% down/4 +%% down/3 +%% +%% Accepting transport has died. -%% Accepting transport has died. One that's awaiting an association ... -down(true, MRef, TPid, #listener{pending = {N,Q}, - tmap = T, - count = K} - = S) - when N < 0 -> +%% One that's waiting for transport start in the pending queue ... +down(true, TPid, #listener{pending = {N,Q}, + count = K} + = S) -> ets:delete(Q, TPid), - ets:delete(T, MRef), - ets:delete(T, TPid), - start_timer(S#listener{count = K-1, - pending = {N+1,Q}}); - -%% ... or one that already has one. -down(B, MRef, TPid, #listener{socket = Sock, - tmap = T, - count = K, - pending = {N,Q}} - = S) -> - [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId - ets:delete(T, MRef), - ets:delete(T, Id), - Id == TPid orelse close(Sock, Id), - if B -> %% Waiting for attachment in the pending queue ... - ets:delete(Q, TPid), - S#listener{pending = {N-1,Q}}; - true -> %% ... or already attached - start_timer(S#listener{count = K-1}) - end. + if N < 0 -> %% awaiting an association ... + start_timer(S#listener{count = K-1, + pending = {N+1,Q}}); + true -> %% ... or one has been assigned + S#listener{pending = {N-1,Q}} + end; + +%% ... or one that's already attached. +down(false, _TPid, #listener{count = K} = S) -> + start_timer(S#listener{count = K-1}). %% t/2 %% @@ -563,14 +542,12 @@ transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches}, socket = LSock} = S) -> ok = accept_peer(Sock, Matches), - transition(Msg, S#transport{socket = Sock}); + transition(setelement(2, Msg, Sock), S#transport{socket = Sock}); %% Incoming message. -transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> +transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> setopts(Sock), recv(Data, S); -%% Don't match on Sock since in R15B01 it can be the listening socket -%% in the (peeled-off) accept case, which is likely a bug. %% Outgoing message. transition({diameter, {send, Msg}}, S) -> @@ -633,29 +610,21 @@ accept(Opts) -> %% accept/3 %% %% Start a new transport process or use one that's already been -%% started as a consequence of association establishment. - -%% No pending associations: spawn a new transport. -accept(Ref, Pid, #listener{socket = Sock, - tmap = T, - pending = {N,Q}}) - when N =< 0 -> - Arg = {accept, Pid, self(), Sock, Ref}, - {ok, TPid} = diameter_sctp_sup:start_child(Arg), - MRef = monitor(process, TPid), - ets:insert(T, [{MRef, TPid}, {TPid, MRef}]), - ets:insert(Q, {TPid, now()}), - TPid; -%% Placing the transport in the second pending table makes it -%% available to the next association. +%% started as a consequence of diameter requesting a transport +%% process. %% Pending associations: attach to the first in the queue. accept(_, Pid, #listener{ref = Ref, - pending = {_,Q}}) -> - TPid = ets:first(Q), + pending = {N,Q}}) + when 0 < N -> + TPid = dq(Q), TPid ! {Ref, Pid}, - ets:delete(Q, TPid), - TPid. + TPid; + +%% No pending associations: spawn a new transport. +accept(Ref, Pid, #listener{socket = Sock, + pending = {_,Q}}) -> + nq({accept, Pid, self(), Sock, Ref}, Q). %% send/2 @@ -716,7 +685,7 @@ recv({_, #sctp_assoc_change{} = E}, = S) -> S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}}; -%% Lost association after establishment. +%% Association failure. recv({_, #sctp_assoc_change{}}, _) -> stop; @@ -727,8 +696,10 @@ recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid}) bin = Bin}), ok; -recv({_, #sctp_shutdown_event{assoc_id = Id}}, - #transport{assoc_id = Id}) -> +recv({_, #sctp_shutdown_event{assoc_id = A}}, + #transport{assoc_id = Id}) + when A == Id; + A == 0 -> stop; %% Note that diameter_sctp(3) documents that sctp_events cannot be @@ -764,52 +735,53 @@ up(#transport{parent = Pid, diameter_peer:up(Pid), S#transport{mode = A}. -%% find/3 - -find(Id, Data, #listener{tmap = T} = S) -> - f(ets:lookup(T, Id), Data, S). - -%% New association ... -f([], - {_, #sctp_assoc_change{state = comm_up, assoc_id = Id}}, - #listener{pending = {N,Q}} - = S) -> - {find(Id, S), S#listener{pending = {N+1,Q}}}; - -%% Known association ... -f([{_, TPid}], _, S) -> - {TPid, S}; +%% accept/2 +%% +%% Start a new transport process or use one that's already been +%% started as a consequence of an event to a listener process. -%% ... or not: discard. -f([], _, _) -> - false. +accept(Id, #listener{pending = {N,Q}} = S) -> + {tpid(Id, S), S#listener{pending = {N+1,Q}}}. -%% find/2 +%% tpid/2 %% Transport waiting for an association: use it. -find(Id, #listener{tmap = T, - pending = {N,Q}}) +tpid(_Id, #listener{pending = {N,Q}}) when N < 0 -> - TPid = ets:first(Q), - [{TPid, MRef}] = ets:lookup(T, TPid), - ets:insert(T, [{MRef, Id}, {Id, TPid}]), - ets:delete(T, TPid), - ets:delete(Q, TPid), - TPid; + dq(Q); %% No transport start yet: spawn one and queue. -find(Id, #listener{ref = Ref, +tpid(Id, #listener{ref = Ref, socket = Sock, - tmap = T, pending = {_,Q}}) -> - Arg = {accept, Ref, self(), Sock, Id}, + nq({accept, Ref, self(), Sock, Id}, Q). + +%% nq/2 +%% +%% Place a transport process in the second pending queue to make it +%% available to the next association. + +nq(Arg, Q) -> {ok, TPid} = diameter_sctp_sup:start_child(Arg), - MRef = monitor(process, TPid), - ets:insert(T, [{MRef, Id}, {Id, TPid}]), + monitor(process, TPid), ets:insert(Q, {TPid, now()}), TPid. +%% dq/1 +%% +%% Remove a transport process from the first pending queue to assign +%% it to an existing association. + +dq(Q) -> + TPid = ets:first(Q), + ets:delete(Q, TPid), + TPid. + %% assoc_id/1 +%% +%% It's unclear if this is needed, or if the first message on an +%% association is always sctp_assoc_change, but don't assume since +%% SCTP behaviour differs between operating systems. assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) -> Id; |