From da3e5d67b3600f1844b6d4f734eff05ba577ad95 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 18 Jun 2015 13:25:38 +0200 Subject: Remove assumption that SCTP association ids will be unique This is not the case under Solaris for one: successive associations can receive the same association id as a result of peeloff, the id only being unique for the controlling port, not for the listening port as is the case under Linux for example. This made for many failures in the diameter test suites, the traffic suite in particular. Peeloff in diameter_sctp was introduced in 9a671bf0, before which the assumption was fine since it was the listening process that owned all associations. (Which obviously had other drawbacks.) Other remnants of the pre-peeloff implementation have also been removed: that the listener process might receive a message on a socket after peeloff for one. Peeloff in gen_sctp became available in commit 067cfe79, after the original implementation of diameter_sctp. This is trace on the unpatched code showing id reuse under Solaris: + {trace_ts,<0.103.0>,call, {diameter_sctp,handle_info, [{sctp,#Port<0.1625>, {127,0,0,1}, 35904, {[],{sctp_assoc_change,comm_up,0,32,32,1}}}, {listener,#Ref<0.0.1.948>,#Port<0.1625>,4, 57384, {-4,61481}, #Ref<0.0.8.12>, []}]}, {1432,458752,612168}} + {trace_ts,<0.103.0>,call, {diameter_sctp,handle_info, [{sctp,#Port<0.1625>, {127,0,0,1}, 35905, {[],{sctp_assoc_change,comm_up,0,32,32,1}}}, {listener,#Ref<0.0.1.948>,#Port<0.1625>,4, 57384, {-3,61481}, #Ref<0.0.8.12>, []}]}, {1432,458752,613042}} The result was this, when the second association was incorrectly forwarded to the first association's controlling process: ** {function_clause, [{diameter_sctp,transition, [{peeloff,#Port<0.1635>, {sctp,#Port<0.1625>, {127,0,0,1}, 35892, {[],{sctp_assoc_change,comm_up,0,32,32,1}}}, []}, {transport,<0.107.0>,accept,#Port<0.1634>,1,undefined,{32,32},0}], [{file,"transport/diameter_sctp.erl"},{line,561}]}, 
{diameter_sctp,t,2,[{file,"transport/diameter_sctp.erl"},{line,549}]}, {diameter_sctp,handle_info,2, [{file,"transport/diameter_sctp.erl"},{line,397}]}, {gen_server,try_dispatch,4,[{file,"gen_server.erl"},{line,614}]}, {gen_server,handle_msg,5,[{file,"gen_server.erl"},{line,680}]}, {proc_lib,init_p_do_apply,3,[{file,"proc_lib.erl"},{line,238}]}]} --- lib/diameter/src/transport/diameter_sctp.erl | 182 ++++++++++++--------------- 1 file changed, 77 insertions(+), 105 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index f80de0a816..43039ebd6e 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -103,15 +103,11 @@ {ref :: reference(), socket :: gen_sctp:sctp_socket(), count = 0 :: uint(), %% attached transport processes - tmap = ets:new(?MODULE, []) :: ets:tid(), - %% {MRef, Pid|AssocId}, {AssocId, Pid} pending = {0, ets:new(?MODULE, [ordered_set])}, tref :: reference(), accept :: [match()]}). -%% Field tmap is used to map an incoming message or event to the -%% relevant transport process. Field pending implements two queues: -%% the first of transport-to-be processes to which an association has -%% been assigned (at comm_up and written into tmap) but for which +%% Field pending implements two queues: the first of transport-to-be +%% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived %% state of affairs as a new transport is spawned as a consequence of %% a peer being taken up, transport processes being spawned by the @@ -297,7 +293,7 @@ listener({LRef, T}) -> l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> {LAs, _Sock} = AS, {LPid, LAs}; - + %% ... or not. l([], LRef, T) -> {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}), @@ -489,19 +485,13 @@ start_timer(S) -> %% Incoming message from SCTP. 
l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) -> Id = assoc_id(Data), + {TPid, NewS} = accept(Id, S), + TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept}, + setopts(Sock), + NewS; - try find(Id, Data, S) of - {TPid, NewS} -> - TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept}, - NewS; - false -> - S - after - setopts(Sock) - end; - -l({'DOWN', MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> - down(ets:member(Q, TPid), MRef, TPid, S); +l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> + down(ets:member(Q, TPid), TPid, S); %% Timeout after the last accepting process has died. l({timeout, TRef, close = T}, #listener{tref = TRef, @@ -510,36 +500,25 @@ l({timeout, TRef, close = T}, #listener{tref = TRef, l({timeout, _, close}, #listener{} = S) -> S. -%% down/4 +%% down/3 +%% +%% Accepting transport has died. -%% Accepting transport has died. One that's awaiting an association ... -down(true, MRef, TPid, #listener{pending = {N,Q}, - tmap = T, - count = K} - = S) - when N < 0 -> +%% One that's waiting for transport start in the pending queue ... +down(true, TPid, #listener{pending = {N,Q}, + count = K} + = S) -> ets:delete(Q, TPid), - ets:delete(T, MRef), - ets:delete(T, TPid), - start_timer(S#listener{count = K-1, - pending = {N+1,Q}}); - -%% ... or one that already has one. -down(B, MRef, TPid, #listener{socket = Sock, - tmap = T, - count = K, - pending = {N,Q}} - = S) -> - [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId - ets:delete(T, MRef), - ets:delete(T, Id), - Id == TPid orelse close(Sock, Id), - if B -> %% Waiting for attachment in the pending queue ... - ets:delete(Q, TPid), - S#listener{pending = {N-1,Q}}; - true -> %% ... or already attached - start_timer(S#listener{count = K-1}) - end. + if N < 0 -> %% awaiting an association ... + start_timer(S#listener{count = K-1, + pending = {N+1,Q}}); + true -> %% ... 
or one has been assigned + S#listener{pending = {N-1,Q}} + end; + +%% ... or one that's already attached. +down(false, _TPid, #listener{count = K} = S) -> + start_timer(S#listener{count = K-1}). %% t/2 %% @@ -563,14 +542,12 @@ transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches}, socket = LSock} = S) -> ok = accept_peer(Sock, Matches), - transition(Msg, S#transport{socket = Sock}); + transition(setelement(2, Msg, Sock), S#transport{socket = Sock}); %% Incoming message. -transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> +transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> setopts(Sock), recv(Data, S); -%% Don't match on Sock since in R15B01 it can be the listening socket -%% in the (peeled-off) accept case, which is likely a bug. %% Outgoing message. transition({diameter, {send, Msg}}, S) -> @@ -633,29 +610,21 @@ accept(Opts) -> %% accept/3 %% %% Start a new transport process or use one that's already been -%% started as a consequence of association establishment. - -%% No pending associations: spawn a new transport. -accept(Ref, Pid, #listener{socket = Sock, - tmap = T, - pending = {N,Q}}) - when N =< 0 -> - Arg = {accept, Pid, self(), Sock, Ref}, - {ok, TPid} = diameter_sctp_sup:start_child(Arg), - MRef = monitor(process, TPid), - ets:insert(T, [{MRef, TPid}, {TPid, MRef}]), - ets:insert(Q, {TPid, now()}), - TPid; -%% Placing the transport in the second pending table makes it -%% available to the next association. +%% started as a consequence of diameter requesting a transport +%% process. %% Pending associations: attach to the first in the queue. accept(_, Pid, #listener{ref = Ref, - pending = {_,Q}}) -> - TPid = ets:first(Q), + pending = {N,Q}}) + when 0 < N -> + TPid = dq(Q), TPid ! {Ref, Pid}, - ets:delete(Q, TPid), - TPid. + TPid; + +%% No pending associations: spawn a new transport. +accept(Ref, Pid, #listener{socket = Sock, + pending = {_,Q}}) -> + nq({accept, Pid, self(), Sock, Ref}, Q). 
%% send/2 @@ -716,7 +685,7 @@ recv({_, #sctp_assoc_change{} = E}, = S) -> S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}}; -%% Lost association after establishment. +%% Association failure. recv({_, #sctp_assoc_change{}}, _) -> stop; @@ -727,8 +696,10 @@ recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid}) bin = Bin}), ok; -recv({_, #sctp_shutdown_event{assoc_id = Id}}, - #transport{assoc_id = Id}) -> +recv({_, #sctp_shutdown_event{assoc_id = A}}, + #transport{assoc_id = Id}) + when A == Id; + A == 0 -> stop; %% Note that diameter_sctp(3) documents that sctp_events cannot be @@ -764,52 +735,53 @@ up(#transport{parent = Pid, diameter_peer:up(Pid), S#transport{mode = A}. -%% find/3 - -find(Id, Data, #listener{tmap = T} = S) -> - f(ets:lookup(T, Id), Data, S). - -%% New association ... -f([], - {_, #sctp_assoc_change{state = comm_up, assoc_id = Id}}, - #listener{pending = {N,Q}} - = S) -> - {find(Id, S), S#listener{pending = {N+1,Q}}}; - -%% Known association ... -f([{_, TPid}], _, S) -> - {TPid, S}; +%% accept/2 +%% +%% Start a new transport process or use one that's already been +%% started as a consequence of an event to a listener process. -%% ... or not: discard. -f([], _, _) -> - false. +accept(Id, #listener{pending = {N,Q}} = S) -> + {tpid(Id, S), S#listener{pending = {N+1,Q}}}. -%% find/2 +%% tpid/2 %% Transport waiting for an association: use it. -find(Id, #listener{tmap = T, - pending = {N,Q}}) +tpid(_Id, #listener{pending = {N,Q}}) when N < 0 -> - TPid = ets:first(Q), - [{TPid, MRef}] = ets:lookup(T, TPid), - ets:insert(T, [{MRef, Id}, {Id, TPid}]), - ets:delete(T, TPid), - ets:delete(Q, TPid), - TPid; + dq(Q); %% No transport start yet: spawn one and queue. -find(Id, #listener{ref = Ref, +tpid(Id, #listener{ref = Ref, socket = Sock, - tmap = T, pending = {_,Q}}) -> - Arg = {accept, Ref, self(), Sock, Id}, + nq({accept, Ref, self(), Sock, Id}, Q). 
+ +%% nq/2 +%% +%% Place a transport process in the second pending queue to make it +%% available to the next association. + +nq(Arg, Q) -> {ok, TPid} = diameter_sctp_sup:start_child(Arg), - MRef = monitor(process, TPid), - ets:insert(T, [{MRef, Id}, {Id, TPid}]), + monitor(process, TPid), ets:insert(Q, {TPid, now()}), TPid. +%% dq/1 +%% +%% Remove a transport process from the first pending queue to assign +%% it to an existing association. + +dq(Q) -> + TPid = ets:first(Q), + ets:delete(Q, TPid), + TPid. + %% assoc_id/1 +%% +%% It's unclear if this is needed, or if the first message on an +%% association is always sctp_assoc_change, but don't assume since +%% SCTP behaviour differs between operating systems. assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) -> Id; -- cgit v1.2.3 From 083ae6d719a8ab5d626ad4b28e836475b6c1a92b Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 19 Jun 2015 00:56:55 +0200 Subject: Don't receive initial messages out of order Forwarding an sctp message from the listener process at the same time that the controlling process is changed means there's no guarantee that the message order will be preserved. Selectively receive the peeloff message before entering the gen_server loop to ensure the order is preserved. --- lib/diameter/src/transport/diameter_sctp.erl | 62 ++++++++++++++++------------ 1 file changed, 36 insertions(+), 26 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 43039ebd6e..29ac73a6a6 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -239,35 +239,49 @@ i({connect, Pid, Opts, Addrs, Ref}) -> mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock}; -%% An accepting transport spawned by diameter. -i({accept, Pid, LPid, Sock, Ref}) +%% An accepting transport spawned by diameter, not yet owning an +%% association. 
+i({accept, Pid, LPid, LSock, Ref}) when is_pid(Pid) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), monitor(process, Pid), monitor(process, LPid), - #transport{parent = Pid, - mode = {accept, LPid}, - socket = Sock}; + wait([peeloff], #transport{parent = Pid, + mode = {accept, LPid}, + socket = LSock}); %% An accepting transport spawned at association establishment. -i({accept, Ref, LPid, Sock, Id}) -> +i({accept, Ref, LPid, LSock, _Id}) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), - MRef = monitor(process, LPid), - %% Wait for a signal that the transport has been started before - %% processing other messages. + erlang:send_after(?ACCEPT_TIMEOUT, self(), accept_timeout), + monitor(process, LPid), + wait([Ref, peeloff], #transport{mode = {accept, LPid}, + socket = LSock}). + +%% wait/2 +%% +%% Wait for diameter to start the transport process and for the +%% association to be peeled off before processing other messages. + +wait(Keys, S) -> + lists:foldl(fun i/2, S, Keys). + +i(K, #transport{mode = {accept, _}, + socket = LSock} + = S) -> receive - {Ref, Pid} -> %% transport started - #transport{parent = Pid, - mode = {accept, LPid}, - socket = Sock}; - {'DOWN', MRef, process, _, _} = T -> %% listener down - close(Sock, Id), + {K, Pid} when is_reference(K) -> %% transport process started + S#transport{parent = Pid}; + {K, Sock, T, Matches} when K == peeloff -> %% association + {sctp, LSock, _RA, _RP, _Data} = T, %% assert + ok = accept_peer(Sock, Matches), + t(setelement(2, T, Sock), S#transport{socket = Sock}); + accept_timeout = T -> + x(T); + {'DOWN', _, process, _, _} = T -> x(T) - after ?ACCEPT_TIMEOUT -> - close(Sock, Id), - x(timeout) end. %% close/2 @@ -536,14 +550,6 @@ t(T,S) -> %% transition/2 -%% Listening process is transfering ownership of an association. 
-transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches}, - #transport{mode = {accept, _}, - socket = LSock} - = S) -> - ok = accept_peer(Sock, Matches), - transition(setelement(2, Msg, Sock), S#transport{socket = Sock}); - %% Incoming message. transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> setopts(Sock), @@ -578,6 +584,10 @@ transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) -> transition({'DOWN', _, process, _Pid, _}, #transport{mode = accept}) -> ok; +%% Timeout after transport process has been started. +transition(accept_timeout, _) -> + ok; + %% Request for the local port number. transition({resolve_port, Pid}, #transport{socket = Sock}) when is_pid(Pid) -> -- cgit v1.2.3 From 1d3827223b0c4174ab39fa6af3e969e4a62f598f Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 19 Jun 2015 01:26:06 +0200 Subject: Don't monitor listener after peeloff Listener death should have no effect on a peeled off association. --- lib/diameter/src/transport/diameter_sctp.erl | 32 +++++++++++----------------- 1 file changed, 12 insertions(+), 20 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 29ac73a6a6..51b4858ab0 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -246,19 +246,19 @@ i({accept, Pid, LPid, LSock, Ref}) putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), monitor(process, Pid), - monitor(process, LPid), - wait([peeloff], #transport{parent = Pid, - mode = {accept, LPid}, - socket = LSock}); + MRef = monitor(process, LPid), + wait([{peeloff, MRef}], #transport{parent = Pid, + mode = {accept, LPid}, + socket = LSock}); %% An accepting transport spawned at association establishment. 
i({accept, Ref, LPid, LSock, _Id}) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), erlang:send_after(?ACCEPT_TIMEOUT, self(), accept_timeout), - monitor(process, LPid), - wait([Ref, peeloff], #transport{mode = {accept, LPid}, - socket = LSock}). + MRef = monitor(process, LPid), + wait([{parent, Ref}, {peeloff, MRef}], #transport{mode = {accept, LPid}, + socket = LSock}). %% wait/2 %% @@ -268,15 +268,16 @@ i({accept, Ref, LPid, LSock, _Id}) -> wait(Keys, S) -> lists:foldl(fun i/2, S, Keys). -i(K, #transport{mode = {accept, _}, - socket = LSock} - = S) -> +i({K, Ref}, #transport{mode = {accept, _}, + socket = LSock} + = S) -> receive - {K, Pid} when is_reference(K) -> %% transport process started + {Ref, Pid} when K == parent -> %% transport process started S#transport{parent = Pid}; {K, Sock, T, Matches} when K == peeloff -> %% association {sctp, LSock, _RA, _RP, _Data} = T, %% assert ok = accept_peer(Sock, Matches), + demonitor(Ref, [flush]), t(setelement(2, T, Sock), S#transport{socket = Sock}); accept_timeout = T -> x(T); @@ -575,15 +576,6 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> stop; -%% Listener process has died. -transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) -> - stop; - -%% Ditto but we have ownership of the association. It might be that -%% we'll go down anyway though. -transition({'DOWN', _, process, _Pid, _}, #transport{mode = accept}) -> - ok; - %% Timeout after transport process has been started. transition(accept_timeout, _) -> ok; -- cgit v1.2.3 From 29d23ca325f52c86517f1c6d0b53328c14e4af65 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 19 Jun 2015 01:35:07 +0200 Subject: Simplify socket close at terminate The existing code was a remnant of the pre-peeloff implementation. There's no need to close anything but the whole socket. 
--- lib/diameter/src/transport/diameter_sctp.erl | 16 ---------------- 1 file changed, 16 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 51b4858ab0..7e54acaa01 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -285,12 +285,6 @@ i({K, Ref}, #transport{mode = {accept, _}, x(T) end. -%% close/2 - -close(Sock, Id) -> - gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Id}). -%% Having to pass a record here is hokey. - %% listener/2 %% Accepting processes can be started concurrently: ensure only one @@ -462,16 +456,6 @@ code_change(_, State, _) -> terminate(_, #transport{assoc_id = undefined}) -> ok; -terminate(_, #transport{socket = Sock, - mode = accept, - assoc_id = Id}) -> - close(Sock, Id); - -terminate(_, #transport{socket = Sock, - mode = {accept, _}, - assoc_id = Id}) -> - close(Sock, Id); - terminate(_, #transport{socket = Sock}) -> gen_sctp:close(Sock); -- cgit v1.2.3 From c6c18a2eca711e738b187b4f7aa0bd2a4765bea3 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 19 Jun 2015 02:39:29 +0200 Subject: Simplify peeloff signaling In particular, don't give the accepting transport process the listening socket. It was used to match the initial sctp message received in a peeloff message, but replace the socket in the forwarded message instead. --- lib/diameter/src/transport/diameter_sctp.erl | 34 ++++++++++++---------------- 1 file changed, 15 insertions(+), 19 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 7e54acaa01..f2949c1242 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -241,24 +241,22 @@ i({connect, Pid, Opts, Addrs, Ref}) -> %% An accepting transport spawned by diameter, not yet owning an %% association. 
-i({accept, Pid, LPid, LSock, Ref}) +i({accept, Pid, LPid, Ref}) when is_pid(Pid) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), monitor(process, Pid), MRef = monitor(process, LPid), wait([{peeloff, MRef}], #transport{parent = Pid, - mode = {accept, LPid}, - socket = LSock}); + mode = {accept, LPid}}); %% An accepting transport spawned at association establishment. -i({accept, Ref, LPid, LSock, _Id}) -> +i({accept, Ref, LPid, _Id}) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), erlang:send_after(?ACCEPT_TIMEOUT, self(), accept_timeout), MRef = monitor(process, LPid), - wait([{parent, Ref}, {peeloff, MRef}], #transport{mode = {accept, LPid}, - socket = LSock}). + wait([{parent, Ref}, {peeloff, MRef}], #transport{mode = {accept, LPid}}). %% wait/2 %% @@ -268,17 +266,15 @@ i({accept, Ref, LPid, LSock, _Id}) -> wait(Keys, S) -> lists:foldl(fun i/2, S, Keys). -i({K, Ref}, #transport{mode = {accept, _}, - socket = LSock} - = S) -> +i({K, Ref}, #transport{mode = {accept, _}} = S) -> receive {Ref, Pid} when K == parent -> %% transport process started S#transport{parent = Pid}; - {K, Sock, T, Matches} when K == peeloff -> %% association - {sctp, LSock, _RA, _RP, _Data} = T, %% assert + {K, T, Matches} when K == peeloff -> %% association + {sctp, Sock, _RA, _RP, _Data} = T, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), - t(setelement(2, T, Sock), S#transport{socket = Sock}); + t(T, S#transport{socket = Sock}); accept_timeout = T -> x(T); {'DOWN', _, process, _, _} = T -> @@ -482,10 +478,12 @@ start_timer(S) -> %% Transition listener state. %% Incoming message from SCTP. -l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) -> +l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, + accept = Matches} + = S) -> Id = assoc_id(Data), {TPid, NewS} = accept(Id, S), - TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept}, + TPid ! 
{peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches}, setopts(Sock), NewS; @@ -608,9 +606,8 @@ accept(_, Pid, #listener{ref = Ref, TPid; %% No pending associations: spawn a new transport. -accept(Ref, Pid, #listener{socket = Sock, - pending = {_,Q}}) -> - nq({accept, Pid, self(), Sock, Ref}, Q). +accept(Ref, Pid, #listener{pending = {_,Q}}) -> + nq({accept, Pid, self(), Ref}, Q). %% send/2 @@ -738,9 +735,8 @@ tpid(_Id, #listener{pending = {N,Q}}) %% No transport start yet: spawn one and queue. tpid(Id, #listener{ref = Ref, - socket = Sock, pending = {_,Q}}) -> - nq({accept, Ref, self(), Sock, Id}, Q). + nq({accept, Ref, self(), Id}, Q). %% nq/2 %% -- cgit v1.2.3 From 25f280a5af09bba20ca9db84ebfd9d0d67f3c7ff Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 19 Jun 2015 02:45:05 +0200 Subject: Simplify accepting transport start Don't pass an association id that's no longer used. --- lib/diameter/src/transport/diameter_sctp.erl | 34 ++++++++++++---------------- 1 file changed, 15 insertions(+), 19 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index f2949c1242..41f5ea7ce8 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -224,7 +224,7 @@ i({listen, Ref, {Opts, Addrs}}) -> proc_lib:init_ack({ok, self(), LAs}), start_timer(#listener{ref = Ref, socket = Sock, - accept = accept(Matches)}); + accept = [[M] || {accept, M} <- Matches]}); %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> @@ -241,7 +241,7 @@ i({connect, Pid, Opts, Addrs, Ref}) -> %% An accepting transport spawned by diameter, not yet owning an %% association. 
-i({accept, Pid, LPid, Ref}) +i({accept, Ref, LPid, Pid}) when is_pid(Pid) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), @@ -250,8 +250,9 @@ i({accept, Pid, LPid, Ref}) wait([{peeloff, MRef}], #transport{parent = Pid, mode = {accept, LPid}}); -%% An accepting transport spawned at association establishment. -i({accept, Ref, LPid, _Id}) -> +%% An accepting transport spawned at association establishment, whose +%% parent is not yet known. +i({accept, Ref, LPid}) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), erlang:send_after(?ACCEPT_TIMEOUT, self(), accept_timeout), @@ -482,7 +483,7 @@ l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, accept = Matches} = S) -> Id = assoc_id(Data), - {TPid, NewS} = accept(Id, S), + {TPid, NewS} = accept(S), TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches}, setopts(Sock), NewS; @@ -586,11 +587,6 @@ accept_peer(Sock, Matches) -> orelse x({accept, RAddrs, Matches}), ok. -%% accept/1 - -accept(Opts) -> - [[M] || {accept, M} <- Opts]. - %% accept/3 %% %% Start a new transport process or use one that's already been @@ -607,7 +603,7 @@ accept(_, Pid, #listener{ref = Ref, %% No pending associations: spawn a new transport. accept(Ref, Pid, #listener{pending = {_,Q}}) -> - nq({accept, Pid, self(), Ref}, Q). + nq({accept, Ref, self(), Pid}, Q). %% send/2 @@ -718,25 +714,25 @@ up(#transport{parent = Pid, diameter_peer:up(Pid), S#transport{mode = A}. -%% accept/2 +%% accept/1 %% %% Start a new transport process or use one that's already been %% started as a consequence of an event to a listener process. -accept(Id, #listener{pending = {N,Q}} = S) -> - {tpid(Id, S), S#listener{pending = {N+1,Q}}}. +accept(#listener{pending = {N,Q}} = S) -> + {tpid(S), S#listener{pending = {N+1,Q}}}. -%% tpid/2 +%% tpid/1 %% Transport waiting for an association: use it. -tpid(_Id, #listener{pending = {N,Q}}) +tpid(#listener{pending = {N,Q}}) when N < 0 -> dq(Q); %% No transport start yet: spawn one and queue. 
-tpid(Id, #listener{ref = Ref, - pending = {_,Q}}) -> - nq({accept, Ref, self(), Id}, Q). +tpid(#listener{ref = Ref, + pending = {_,Q}}) -> + nq({accept, Ref, self()}, Q). %% nq/2 %% -- cgit v1.2.3 From 207de95c123a0df754a6886011fc05a4b513a6a5 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 18 Jun 2015 01:44:58 +0200 Subject: Remove upgrade-related code The changes in some of the previous commits assume application restart. --- lib/diameter/src/transport/diameter_sctp.erl | 42 +++------------------------- 1 file changed, 4 insertions(+), 38 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 41f5ea7ce8..0fe25106cc 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -121,8 +121,7 @@ %% queue or spawned and placed in the first queue. Thus, there are %% only elements in one queue at a time, so share an ets table queue %% and tag it with a positive length if it contains the first queue, a -%% negative length if it contains the second queue. The case -1 is -%% handled differently for backwards compatibility reasons. +%% negative length if it contains the second queue. %% --------------------------------------------------------------------------- %% # start/3 @@ -369,17 +368,13 @@ type(T) -> %% # handle_call/3 %% --------------------------------------------------------------------------- -handle_call(T, From, #listener{pending = L} = S) - when is_list(L) -> - handle_call(T, From, upgrade(S)); - handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref, pending = {N,Q}, count = K} = S) -> TPid = accept(Ref, Pid, S), - {reply, {ok, TPid}, downgrade(S#listener{pending = {N-1,Q}, - count = K+1})}; + {reply, {ok, TPid}, S#listener{pending = {N-1,Q}, + count = K+1}}; handle_call(_, _, State) -> {reply, nok, State}. 
@@ -398,18 +393,9 @@ handle_cast(_, State) -> handle_info(T, #transport{} = S) -> {noreply, #transport{} = t(T,S)}; -handle_info(T, #listener{pending = L} = S) - when is_list(L) -> - handle_info(T, upgrade(S)); - handle_info(T, #listener{} = S) -> - {noreply, downgrade(#listener{} = l(T,S))}. - -%% upgrade/1 + {noreply, #listener{} = l(T,S)}. -upgrade(#listener{pending = [TPid | {0,Q}]} = S) -> - ets:insert(Q, {TPid, now()}), - S#listener{pending = {-1,Q}}. %% Prior to the possiblity of setting pool_size on in transport %% configuration, a new accepting transport was only started following %% the death of a predecessor, so that there was only at most one @@ -418,26 +404,6 @@ upgrade(#listener{pending = [TPid | {0,Q}]} = S) -> %% several accepting transports are started concurrently. Deal with %% this by placing the started transports in a new queue of transport %% processes waiting for an association. -%% -%% Since only one of this queue and the existing queue of controlling -%% processes waiting for a transport to be started can be non-empty at -%% any given time, implement both queues in the same ets table. The -%% absolute value of the first element of the 2-tuple is the queue -%% length, the sign says which queue it is. - -%% downgrade/1 -%% -%% Revert to the pre-pool_size representation when possible, for -%% backwards compatibility in the case that the pool_size option -%% hasn't been used. - -downgrade(#listener{pending = {-1,Q}} = S) -> - TPid = ets:first(Q), - ets:delete(Q, TPid), - S#listener{pending = [TPid | {0,Q}]}; - -downgrade(S) -> - S. 
%% --------------------------------------------------------------------------- %% # code_change/3 -- cgit v1.2.3 From cb89a27002558f19f236febef877c5cf135c4c8d Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 19 Jun 2015 13:26:19 +0200 Subject: Ensure accepting processes are first in, first out A listener process in diameter_sctp starts accepting transport processes as required, either as associations are established or as diameter asks for a process to be started. Since this can happen in any order, the listener maintains two queues: one for processes that diameter has requested and which are waiting to be given an association, another for processes that have been started to become owners of an association but are waiting for diameter to request them. Only one queue at a time is non-empty. The first queue's length is bounded by the number of accepting processes configured as pool_size. Entries in the second queue are short-lived since diameter starts a replacement transport process whenever an existing one dies or communicates that it has an association. The two queues were previously implemented in an ets ordered_set, whose keys were the pid() of transport processes. Removing an element from the queue was then done with ets:first/1. The problem with this is that it's not really a queue: there's no guarantee that pid-ordering is the same as the order in which processes are started. If it isn't then it's possible that an established association is never given to diameter as a transport process if there's always a newer association whose pid sorts first. This isn't a problem in practice since it would require new associations to be established faster than diameter starts transport processes, but redo the implementation as a queue, with strict FIFO semantics.
--- lib/diameter/src/transport/diameter_sctp.erl | 51 ++++++++++++++-------------- 1 file changed, 25 insertions(+), 26 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 0fe25106cc..bb0bf82f04 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -103,7 +103,7 @@ {ref :: reference(), socket :: gen_sctp:sctp_socket(), count = 0 :: uint(), %% attached transport processes - pending = {0, ets:new(?MODULE, [ordered_set])}, + pending = {0, queue:new()}, tref :: reference(), accept :: [match()]}). %% Field pending implements two queues: the first of transport-to-be @@ -369,12 +369,10 @@ type(T) -> %% --------------------------------------------------------------------------- handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref, - pending = {N,Q}, count = K} = S) -> - TPid = accept(Ref, Pid, S), - {reply, {ok, TPid}, S#listener{pending = {N-1,Q}, - count = K+1}}; + {TPid, NewS} = accept(Ref, Pid, S), + {reply, {ok, TPid}, NewS#listener{count = K+1}}; handle_call(_, _, State) -> {reply, nok, State}. @@ -455,7 +453,7 @@ l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, NewS; l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> - down(ets:member(Q, TPid), TPid, S); + down(queue:member(TPid, Q), TPid, S); %% Timeout after the last accepting process has died. l({timeout, TRef, close = T}, #listener{tref = TRef, @@ -472,12 +470,12 @@ l({timeout, _, close}, #listener{} = S) -> down(true, TPid, #listener{pending = {N,Q}, count = K} = S) -> - ets:delete(Q, TPid), + NQ = queue:filter(fun(P) -> P /= TPid end, Q), if N < 0 -> %% awaiting an association ... start_timer(S#listener{count = K-1, - pending = {N+1,Q}}); + pending = {N+1, NQ}}); true -> %% ... or one has been assigned - S#listener{pending = {N-1,Q}} + S#listener{pending = {N-1, NQ}} end; %% ... or one that's already attached. 
@@ -559,16 +557,20 @@ accept_peer(Sock, Matches) -> %% started as a consequence of diameter requesting a transport %% process. +accept(Ref, Pid, #listener{pending = {N,_}} = S) -> + {TPid, NQ} = q(Ref, Pid, S), + {TPid, S#listener{pending = {N-1, NQ}}}. + %% Pending associations: attach to the first in the queue. -accept(_, Pid, #listener{ref = Ref, - pending = {N,Q}}) +q(_, Pid, #listener{ref = Ref, + pending = {N,Q}}) when 0 < N -> - TPid = dq(Q), + {TPid, _} = T = dq(Q), TPid ! {Ref, Pid}, - TPid; + T; %% No pending associations: spawn a new transport. -accept(Ref, Pid, #listener{pending = {_,Q}}) -> +q(Ref, Pid, #listener{pending = {_,Q}}) -> nq({accept, Ref, self(), Pid}, Q). %% send/2 @@ -685,19 +687,18 @@ up(#transport{parent = Pid, %% Start a new transport process or use one that's already been %% started as a consequence of an event to a listener process. -accept(#listener{pending = {N,Q}} = S) -> - {tpid(S), S#listener{pending = {N+1,Q}}}. - -%% tpid/1 +accept(#listener{pending = {N,_}} = S) -> + {TPid, NQ} = q(S), + {TPid, S#listener{pending = {N+1, NQ}}}. %% Transport waiting for an association: use it. -tpid(#listener{pending = {N,Q}}) +q(#listener{pending = {N,Q}}) when N < 0 -> dq(Q); %% No transport start yet: spawn one and queue. -tpid(#listener{ref = Ref, - pending = {_,Q}}) -> +q(#listener{ref = Ref, + pending = {_,Q}}) -> nq({accept, Ref, self()}, Q). %% nq/2 @@ -708,8 +709,7 @@ tpid(#listener{ref = Ref, nq(Arg, Q) -> {ok, TPid} = diameter_sctp_sup:start_child(Arg), monitor(process, TPid), - ets:insert(Q, {TPid, now()}), - TPid. + {TPid, queue:in(TPid, Q)}. %% dq/1 %% @@ -717,9 +717,8 @@ nq(Arg, Q) -> %% it to an existing association. dq(Q) -> - TPid = ets:first(Q), - ets:delete(Q, TPid), - TPid. + {{value, TPid}, NQ} = queue:out(Q), + {TPid, NQ}. 
%% assoc_id/1 %% -- cgit v1.2.3 From 1f483bd5eb6371a1bf2517188658cc4a9cb57cae Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 19 Jun 2015 15:12:19 +0200 Subject: Log discarded answers To diameter_lib:log/4, which was last motivated in commit 39acfdb0. --- lib/diameter/src/base/diameter_traffic.erl | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index eb4bbae931..230a05fa11 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -261,7 +261,8 @@ recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) -> %% any others are discarded. %% ... or not. -recv(false, false, TPid, _, _, _) -> +recv(false, false, TPid, Pkt, _, _) -> + ?LOG(discarded, Pkt#diameter_packet.header), incr(TPid, {{unknown, 0}, recv, discarded}), ok. -- cgit v1.2.3 From fc9e8a6bade31c7216a8d6a194ba221f6f15459c Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 21 Jun 2015 09:55:17 +0200 Subject: Fix start order of alternate transports A transport configured with diameter:add_transport/2 can be passed multiple transport_module/transport_config tuples in order to specify alternate configuration, modules being attempted in order until one succeeds. 
This is primarily for the connecting case, to allow a transport to be configured to first attempt connection over SCTP, and then TCP in case SCTP fails, with configuration like that documented: {transport_module, diameter_sctp}, {transport_config, [...], 5000}, {transport_module, diameter_tcp}, {transport_config, [...]} If the options are the same in both cases, another possibility would be configuration like this, which attaches the same transport_config to both modules: {transport_module, diameter_sctp}, {transport_module, diameter_tcp}, {transport_config, [...], 5000}, However, in this case the start order was reversed relative to the documented order: first tcp, then sctp. This commit restores the intended order. OTP-12851 --- lib/diameter/src/base/diameter_peer.erl | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl index 89b63c8a92..a814e52f29 100644 --- a/lib/diameter/src/base/diameter_peer.erl +++ b/lib/diameter/src/base/diameter_peer.erl @@ -121,7 +121,7 @@ pair([{transport_module, M} | Rest], Mods, Acc) -> pair([{transport_config = T, C} | Rest], Mods, Acc) -> pair([{T, C, ?DEFAULT_TTMO} | Rest], Mods, Acc); pair([{transport_config, C, Tmo} | Rest], Mods, Acc) -> - pair(Rest, [], acc({Mods, C, Tmo}, Acc)); + pair(Rest, [], acc({lists:reverse(Mods), C, Tmo}, Acc)); pair([_ | Rest], Mods, Acc) -> pair(Rest, Mods, Acc); @@ -130,13 +130,16 @@ pair([_ | Rest], Mods, Acc) -> pair([], [], []) -> [{[?DEFAULT_TMOD], ?DEFAULT_TCFG, ?DEFAULT_TTMO}]; -%% One transport_module, one transport_config. -pair([], [M], [{[], Cfg, Tmo}]) -> - [{[M], Cfg, Tmo}]; +%% One transport_module, one transport_config: ignore option order. +%% That is, interpret [{transport_config, _}, {transport_module, _}] +%% as if the order was reversed, not as config with default module and +%% module with default config. 
+pair([], [_] = Mods, [{[], Cfg, Tmo}]) -> + [{Mods, Cfg, Tmo}]; %% Trailing transport_module: default transport_config. pair([], [_|_] = Mods, Acc) -> - lists:reverse(acc({Mods, ?DEFAULT_TCFG, ?DEFAULT_TTMO}, Acc)); + pair([{transport_config, ?DEFAULT_TCFG}], Mods, Acc); pair([], [], Acc) -> lists:reverse(def(Acc)). -- cgit v1.2.3