From 083ae6d719a8ab5d626ad4b28e836475b6c1a92b Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 19 Jun 2015 00:56:55 +0200 Subject: Don't receive initial messages out of order Forwarding an sctp message from the listener process at the same time that the controlling process is changed means there's no guarantee that the message order will be preserved. Selectively receive the peeloff message before entering the gen_server loop to ensure the order is preserved. --- lib/diameter/src/transport/diameter_sctp.erl | 62 ++++++++++++++++------------ 1 file changed, 36 insertions(+), 26 deletions(-) diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 43039ebd6e..29ac73a6a6 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -239,35 +239,49 @@ i({connect, Pid, Opts, Addrs, Ref}) -> mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock}; -%% An accepting transport spawned by diameter. -i({accept, Pid, LPid, Sock, Ref}) +%% An accepting transport spawned by diameter, not yet owning an +%% association. +i({accept, Pid, LPid, LSock, Ref}) when is_pid(Pid) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), monitor(process, Pid), monitor(process, LPid), - #transport{parent = Pid, - mode = {accept, LPid}, - socket = Sock}; + wait([peeloff], #transport{parent = Pid, + mode = {accept, LPid}, + socket = LSock}); %% An accepting transport spawned at association establishment. -i({accept, Ref, LPid, Sock, Id}) -> +i({accept, Ref, LPid, LSock, _Id}) -> putr(?REF_KEY, Ref), proc_lib:init_ack({ok, self()}), - MRef = monitor(process, LPid), - %% Wait for a signal that the transport has been started before - %% processing other messages. + erlang:send_after(?ACCEPT_TIMEOUT, self(), accept_timeout), + monitor(process, LPid), + wait([Ref, peeloff], #transport{mode = {accept, LPid}, + socket = LSock}). + +%% wait/2 +%% +%% Wait for diameter to start the transport process and for the +%% association to be peeled off before processing other messages. + +wait(Keys, S) -> + lists:foldl(fun i/2, S, Keys). + +i(K, #transport{mode = {accept, _}, + socket = LSock} + = S) -> receive - {Ref, Pid} -> %% transport started - #transport{parent = Pid, - mode = {accept, LPid}, - socket = Sock}; - {'DOWN', MRef, process, _, _} = T -> %% listener down - close(Sock, Id), + {K, Pid} when is_reference(K) -> %% transport process started + S#transport{parent = Pid}; + {K, Sock, T, Matches} when K == peeloff -> %% association + {sctp, LSock, _RA, _RP, _Data} = T, %% assert + ok = accept_peer(Sock, Matches), + t(setelement(2, T, Sock), S#transport{socket = Sock}); + accept_timeout = T -> + x(T); + {'DOWN', _, process, _, _} = T -> x(T) - after ?ACCEPT_TIMEOUT -> - close(Sock, Id), - x(timeout) end. %% close/2 @@ -536,14 +550,6 @@ t(T,S) -> %% transition/2 -%% Listening process is transfering ownership of an association. -transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches}, - #transport{mode = {accept, _}, - socket = LSock} - = S) -> - ok = accept_peer(Sock, Matches), - transition(setelement(2, Msg, Sock), S#transport{socket = Sock}); - %% Incoming message. transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> setopts(Sock), @@ -578,6 +584,10 @@ transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) -> transition({'DOWN', _, process, _Pid, _}, #transport{mode = accept}) -> ok; +%% Timeout after transport process has been started. +transition(accept_timeout, _) -> + ok; + %% Request for the local port number. transition({resolve_port, Pid}, #transport{socket = Sock}) when is_pid(Pid) -> -- cgit v1.2.3