Diffstat (limited to 'lib/diameter/src/transport')
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 331
1 file changed, 123 insertions, 208 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index f80de0a816..bb0bf82f04 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -103,15 +103,11 @@
         {ref :: reference(),
          socket :: gen_sctp:sctp_socket(),
          count = 0 :: uint(),  %% attached transport processes
-         tmap = ets:new(?MODULE, []) :: ets:tid(),
-         %% {MRef, Pid|AssocId}, {AssocId, Pid}
-         pending = {0, ets:new(?MODULE, [ordered_set])},
+         pending = {0, queue:new()},
          tref :: reference(),
          accept :: [match()]}).
-%% Field tmap is used to map an incoming message or event to the
-%% relevant transport process. Field pending implements two queues:
-%% the first of transport-to-be processes to which an association has
-%% been assigned (at comm_up and written into tmap) but for which
+%% Field pending implements two queues: the first of transport-to-be
+%% processes to which an association has been assigned but for which
 %% diameter hasn't yet spawned a transport process, a short-lived
 %% state of affairs as a new transport is spawned as a consequence of
 %% a peer being taken up, transport processes being spawned by the
@@ -125,8 +121,7 @@
 %% queue or spawned and placed in the first queue. Thus, there are
 %% only elements in one queue at a time, so share an ets table queue
 %% and tag it with a positive length if it contains the first queue, a
-%% negative length if it contains the second queue. The case -1 is
-%% handled differently for backwards compatibility reasons.
+%% negative length if it contains the second queue.

 %% ---------------------------------------------------------------------------
 %% # start/3
@@ -228,7 +223,7 @@ i({listen, Ref, {Opts, Addrs}}) ->
     proc_lib:init_ack({ok, self(), LAs}),
     start_timer(#listener{ref = Ref,
                           socket = Sock,
-                          accept = accept(Matches)});
+                          accept = [[M] || {accept, M} <- Matches]});

 %% A connecting transport.
 i({connect, Pid, Opts, Addrs, Ref}) ->
@@ -243,43 +238,49 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
                mode = {connect, connect(Sock, RAs, RP, [])},
                socket = Sock};

-%% An accepting transport spawned by diameter.
-i({accept, Pid, LPid, Sock, Ref})
+%% An accepting transport spawned by diameter, not yet owning an
+%% association.
+i({accept, Ref, LPid, Pid})
   when is_pid(Pid) ->
     putr(?REF_KEY, Ref),
     proc_lib:init_ack({ok, self()}),
     monitor(process, Pid),
-    monitor(process, LPid),
-    #transport{parent = Pid,
-               mode = {accept, LPid},
-               socket = Sock};
+    MRef = monitor(process, LPid),
+    wait([{peeloff, MRef}], #transport{parent = Pid,
+                                       mode = {accept, LPid}});

-%% An accepting transport spawned at association establishment.
-i({accept, Ref, LPid, Sock, Id}) ->
+%% An accepting transport spawned at association establishment, whose
+%% parent is not yet known.
+i({accept, Ref, LPid}) ->
     putr(?REF_KEY, Ref),
     proc_lib:init_ack({ok, self()}),
+    erlang:send_after(?ACCEPT_TIMEOUT, self(), accept_timeout),
     MRef = monitor(process, LPid),
-    %% Wait for a signal that the transport has been started before
-    %% processing other messages.
+    wait([{parent, Ref}, {peeloff, MRef}], #transport{mode = {accept, LPid}}).
+
+%% wait/2
+%%
+%% Wait for diameter to start the transport process and for the
+%% association to be peeled off before processing other messages.
+
+wait(Keys, S) ->
+    lists:foldl(fun i/2, S, Keys).
+
+i({K, Ref}, #transport{mode = {accept, _}} = S) ->
     receive
-        {Ref, Pid} ->  %% transport started
-            #transport{parent = Pid,
-                       mode = {accept, LPid},
-                       socket = Sock};
-        {'DOWN', MRef, process, _, _} = T ->  %% listener down
-            close(Sock, Id),
+        {Ref, Pid} when K == parent ->        %% transport process started
+            S#transport{parent = Pid};
+        {K, T, Matches} when K == peeloff ->  %% association
+            {sctp, Sock, _RA, _RP, _Data} = T,
+            ok = accept_peer(Sock, Matches),
+            demonitor(Ref, [flush]),
+            t(T, S#transport{socket = Sock});
+        accept_timeout = T ->
+            x(T);
+        {'DOWN', _, process, _, _} = T ->
            x(T)
-    after ?ACCEPT_TIMEOUT ->
-            close(Sock, Id),
-            x(timeout)
     end.

-%% close/2
-
-close(Sock, Id) ->
-    gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Id}).
-%% Having to pass a record here is hokey.
-
 %% listener/2

 %% Accepting processes can be started concurrently: ensure only one
@@ -297,7 +298,7 @@ listener({LRef, T}) ->
 l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
     {LAs, _Sock} = AS,
     {LPid, LAs};
-
+
 %% ... or not.
 l([], LRef, T) ->
     {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}),
@@ -367,17 +368,11 @@ type(T) ->
 %% # handle_call/3
 %% ---------------------------------------------------------------------------

-handle_call(T, From, #listener{pending = L} = S)
-  when is_list(L) ->
-    handle_call(T, From, upgrade(S));
-
 handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref,
-                                               pending = {N,Q},
                                                count = K}
                                      = S) ->
-    TPid = accept(Ref, Pid, S),
-    {reply, {ok, TPid}, downgrade(S#listener{pending = {N-1,Q},
-                                             count = K+1})};
+    {TPid, NewS} = accept(Ref, Pid, S),
+    {reply, {ok, TPid}, NewS#listener{count = K+1}};

 handle_call(_, _, State) ->
     {reply, nok, State}.
@@ -396,18 +391,9 @@ handle_cast(_, State) ->
 handle_info(T, #transport{} = S) ->
     {noreply, #transport{} = t(T,S)};

-handle_info(T, #listener{pending = L} = S)
-  when is_list(L) ->
-    handle_info(T, upgrade(S));
-
 handle_info(T, #listener{} = S) ->
-    {noreply, downgrade(#listener{} = l(T,S))}.
-
-%% upgrade/1
+    {noreply, #listener{} = l(T,S)}.

-upgrade(#listener{pending = [TPid | {0,Q}]} = S) ->
-    ets:insert(Q, {TPid, now()}),
-    S#listener{pending = {-1,Q}}.
 %% Prior to the possiblity of setting pool_size on in transport
 %% configuration, a new accepting transport was only started following
 %% the death of a predecessor, so that there was only at most one
@@ -416,26 +402,6 @@ upgrade(#listener{pending = [TPid | {0,Q}]} = S) ->
 %% several accepting transports are started concurrently. Deal with
 %% this by placing the started transports in a new queue of transport
 %% processes waiting for an association.
-%%
-%% Since only one of this queue and the existing queue of controlling
-%% processes waiting for a transport to be started can be non-empty at
-%% any given time, implement both queues in the same ets table. The
-%% absolute value of the first element of the 2-tuple is the queue
-%% length, the sign says which queue it is.
-
-%% downgrade/1
-%%
-%% Revert to the pre-pool_size representation when possible, for
-%% backwards compatibility in the case that the pool_size option
-%% hasn't been used.
-
-downgrade(#listener{pending = {-1,Q}} = S) ->
-    TPid = ets:first(Q),
-    ets:delete(Q, TPid),
-    S#listener{pending = [TPid | {0,Q}]};
-
-downgrade(S) ->
-    S.

 %% ---------------------------------------------------------------------------
 %% # code_change/3
@@ -451,16 +417,6 @@ code_change(_, State, _) ->
 terminate(_, #transport{assoc_id = undefined}) ->
     ok;

-terminate(_, #transport{socket = Sock,
-                        mode = accept,
-                        assoc_id = Id}) ->
-    close(Sock, Id);
-
-terminate(_, #transport{socket = Sock,
-                        mode = {accept, _},
-                        assoc_id = Id}) ->
-    close(Sock, Id);
-
 terminate(_, #transport{socket = Sock}) ->
     gen_sctp:close(Sock);

@@ -487,21 +443,17 @@ start_timer(S) ->
 %% Transition listener state.

 %% Incoming message from SCTP.
-l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) ->
+l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
+                                              accept = Matches}
+                                    = S) ->
     Id = assoc_id(Data),
+    {TPid, NewS} = accept(S),
+    TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches},
+    setopts(Sock),
+    NewS;

-    try find(Id, Data, S) of
-        {TPid, NewS} ->
-            TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept},
-            NewS;
-        false ->
-            S
-    after
-        setopts(Sock)
-    end;
-
-l({'DOWN', MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
-    down(ets:member(Q, TPid), MRef, TPid, S);
+l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
+    down(queue:member(TPid, Q), TPid, S);

 %% Timeout after the last accepting process has died.
 l({timeout, TRef, close = T}, #listener{tref = TRef,
@@ -510,36 +462,25 @@ l({timeout, TRef, close = T}, #listener{tref = TRef,
                                         count = 0}) ->
     x(T);
 l({timeout, _, close}, #listener{} = S) ->
     S.
-%% down/4
+%% down/3
+%%
+%% Accepting transport has died.
+
+%% One that's waiting for transport start in the pending queue ...
+down(true, TPid, #listener{pending = {N,Q},
+                           count = K}
+                 = S) ->
+    NQ = queue:filter(fun(P) -> P /= TPid end, Q),
+    if N < 0 ->  %% awaiting an association ...
+           start_timer(S#listener{count = K-1,
+                                  pending = {N+1, NQ}});
+       true ->   %% ... or one has been assigned
+           S#listener{pending = {N-1, NQ}}
+    end;

-%% Accepting transport has died. One that's awaiting an association ...
-down(true, MRef, TPid, #listener{pending = {N,Q},
-                                 tmap = T,
-                                 count = K}
-                       = S)
-  when N < 0 ->
-    ets:delete(Q, TPid),
-    ets:delete(T, MRef),
-    ets:delete(T, TPid),
-    start_timer(S#listener{count = K-1,
-                           pending = {N+1,Q}});
-
-%% ... or one that already has one.
-down(B, MRef, TPid, #listener{socket = Sock,
-                              tmap = T,
-                              count = K,
-                              pending = {N,Q}}
-                    = S) ->
-    [{MRef, Id}] = ets:lookup(T, MRef),  %% Id = TPid | AssocId
-    ets:delete(T, MRef),
-    ets:delete(T, Id),
-    Id == TPid orelse close(Sock, Id),
-    if B ->      %% Waiting for attachment in the pending queue ...
-           ets:delete(Q, TPid),
-           S#listener{pending = {N-1,Q}};
-       true ->   %% ... or already attached
-           start_timer(S#listener{count = K-1})
-    end.
+%% ... or one that's already attached.
+down(false, _TPid, #listener{count = K} = S) ->
+    start_timer(S#listener{count = K-1}).
 %% t/2
 %%
@@ -557,20 +498,10 @@ t(T,S) ->

 %% transition/2

-%% Listening process is transfering ownership of an association.
-transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches},
-           #transport{mode = {accept, _},
-                      socket = LSock}
-           = S) ->
-    ok = accept_peer(Sock, Matches),
-    transition(Msg, S#transport{socket = Sock});
-
 %% Incoming message.
-transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
+transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
     setopts(Sock),
     recv(Data, S);
-%% Don't match on Sock since in R15B01 it can be the listening socket
-%% in the (peeled-off) accept case, which is likely a bug.

 %% Outgoing message.
 transition({diameter, {send, Msg}}, S) ->
@@ -592,13 +523,8 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
 transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
     stop;

-%% Listener process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) ->
-    stop;
-
-%% Ditto but we have ownership of the association. It might be that
-%% we'll go down anyway though.
-transition({'DOWN', _, process, _Pid, _}, #transport{mode = accept}) ->
+%% Timeout after transport process has been started.
+transition(accept_timeout, _) ->
     ok;

 %% Request for the local port number.
@@ -625,37 +551,27 @@ accept_peer(Sock, Matches) ->
         orelse x({accept, RAddrs, Matches}),
     ok.

-%% accept/1
-
-accept(Opts) ->
-    [[M] || {accept, M} <- Opts].
-
 %% accept/3
 %%
 %% Start a new transport process or use one that's already been
-%% started as a consequence of association establishment.
+%% started as a consequence of diameter requesting a transport
+%% process.

-%% No pending associations: spawn a new transport.
-accept(Ref, Pid, #listener{socket = Sock,
-                           tmap = T,
-                           pending = {N,Q}})
-  when N =< 0 ->
-    Arg = {accept, Pid, self(), Sock, Ref},
-    {ok, TPid} = diameter_sctp_sup:start_child(Arg),
-    MRef = monitor(process, TPid),
-    ets:insert(T, [{MRef, TPid}, {TPid, MRef}]),
-    ets:insert(Q, {TPid, now()}),
-    TPid;
-%% Placing the transport in the second pending table makes it
-%% available to the next association.
+accept(Ref, Pid, #listener{pending = {N,_}} = S) ->
+    {TPid, NQ} = q(Ref, Pid, S),
+    {TPid, S#listener{pending = {N-1, NQ}}}.

 %% Pending associations: attach to the first in the queue.
-accept(_, Pid, #listener{ref = Ref,
-                         pending = {_,Q}}) ->
-    TPid = ets:first(Q),
+q(_, Pid, #listener{ref = Ref,
+                    pending = {N,Q}})
+  when 0 < N ->
+    {TPid, _} = T = dq(Q),
     TPid ! {Ref, Pid},
-    ets:delete(Q, TPid),
-    TPid.
+    T;
+
+%% No pending associations: spawn a new transport.
+q(Ref, Pid, #listener{pending = {_,Q}}) ->
+    nq({accept, Ref, self(), Pid}, Q).

 %% send/2

@@ -716,7 +632,7 @@ recv({_, #sctp_assoc_change{} = E},
            = S) ->
     S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}};

-%% Lost association after establishment.
+%% Association failure.
 recv({_, #sctp_assoc_change{}}, _) ->
     stop;

@@ -727,8 +643,10 @@ recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid})
                              bin = Bin}),
     ok;

-recv({_, #sctp_shutdown_event{assoc_id = Id}},
-     #transport{assoc_id = Id}) ->
+recv({_, #sctp_shutdown_event{assoc_id = A}},
+     #transport{assoc_id = Id})
+  when A == Id;
+       A == 0 ->
     stop;

 %% Note that diameter_sctp(3) documents that sctp_events cannot be
@@ -764,52 +682,49 @@ up(#transport{parent = Pid,
     diameter_peer:up(Pid),
     S#transport{mode = A}.

-%% find/3
-
-find(Id, Data, #listener{tmap = T} = S) ->
-    f(ets:lookup(T, Id), Data, S).
-
-%% New association ...
-f([],
-  {_, #sctp_assoc_change{state = comm_up, assoc_id = Id}},
-  #listener{pending = {N,Q}}
-  = S) ->
-    {find(Id, S), S#listener{pending = {N+1,Q}}};
-
-%% Known association ...
-f([{_, TPid}], _, S) ->
-    {TPid, S};
-
-%% ... or not: discard.
-f([], _, _) ->
-    false.
+%% accept/1
+%%
+%% Start a new transport process or use one that's already been
+%% started as a consequence of an event to a listener process.

-%% find/2
+accept(#listener{pending = {N,_}} = S) ->
+    {TPid, NQ} = q(S),
+    {TPid, S#listener{pending = {N+1, NQ}}}.

 %% Transport waiting for an association: use it.
-find(Id, #listener{tmap = T,
-                   pending = {N,Q}})
+q(#listener{pending = {N,Q}})
   when N < 0 ->
-    TPid = ets:first(Q),
-    [{TPid, MRef}] = ets:lookup(T, TPid),
-    ets:insert(T, [{MRef, Id}, {Id, TPid}]),
-    ets:delete(T, TPid),
-    ets:delete(Q, TPid),
-    TPid;
+    dq(Q);

 %% No transport start yet: spawn one and queue.
-find(Id, #listener{ref = Ref,
-                   socket = Sock,
-                   tmap = T,
-                   pending = {_,Q}}) ->
-    Arg = {accept, Ref, self(), Sock, Id},
+q(#listener{ref = Ref,
+            pending = {_,Q}}) ->
+    nq({accept, Ref, self()}, Q).
+
+%% nq/2
+%%
+%% Place a transport process in the second pending queue to make it
+%% available to the next association.
+
+nq(Arg, Q) ->
     {ok, TPid} = diameter_sctp_sup:start_child(Arg),
-    MRef = monitor(process, TPid),
-    ets:insert(T, [{MRef, Id}, {Id, TPid}]),
-    ets:insert(Q, {TPid, now()}),
-    TPid.
+    monitor(process, TPid),
+    {TPid, queue:in(TPid, Q)}.
+
+%% dq/1
+%%
+%% Remove a transport process from the first pending queue to assign
+%% it to an existing association.
+
+dq(Q) ->
+    {{value, TPid}, NQ} = queue:out(Q),
+    {TPid, NQ}.

 %% assoc_id/1
+%%
+%% It's unclear if this is needed, or if the first message on an
+%% association is always sctp_assoc_change, but don't assume since
+%% SCTP behaviour differs between operating systems.

 assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) ->
     Id;
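The heart of the change is that the two ets-backed pending queues are replaced by a single queue:queue() paired with a signed length: positive while it holds transport processes waiting for an association, negative while it holds processes waiting for diameter to start a transport, and zero when both are empty. The following standalone Erlang sketch is illustrative only (the module and function names are not part of the patch); it models the same signed-length convention using the stdlib queue module, under the assumption that at most one of the two logical queues is ever non-empty.

%% pending_sketch.erl - illustrative only, not part of the patch.
%% Models the {SignedLength, Queue} convention used by the listener's
%% pending field: a positive length means the queue holds transport
%% processes waiting for an association, a negative length means it
%% holds processes waiting for diameter to start a transport, and at
%% most one of the two is non-empty at any given time.
-module(pending_sketch).

-export([new/0, in_waiting_assoc/2, in_waiting_start/2, out/1, drop/2]).

new() ->
    {0, queue:new()}.

%% Enqueue a process waiting for an association (first queue).
in_waiting_assoc(TPid, {N, Q}) when N >= 0 ->
    {N + 1, queue:in(TPid, Q)}.

%% Enqueue a process waiting for a transport start (second queue).
in_waiting_start(TPid, {N, Q}) when N =< 0 ->
    {N - 1, queue:in(TPid, Q)}.

%% Dequeue from whichever queue is populated, as dq/1 does above.
out({N, Q}) when N /= 0 ->
    {{value, TPid}, NQ} = queue:out(Q),
    {TPid, {towards_zero(N), NQ}}.

%% Remove a process that has died, as down/3 does with queue:filter/2.
drop(TPid, {N, Q}) ->
    case queue:member(TPid, Q) of
        true ->
            {towards_zero(N), queue:filter(fun(P) -> P /= TPid end, Q)};
        false ->
            {N, Q}
    end.

towards_zero(N) when N > 0 -> N - 1;
towards_zero(N) when N < 0 -> N + 1.

In the patch itself the same arithmetic is spread across accept/3 (pending becomes {N-1, NQ}), accept/1 ({N+1, NQ}) and down/3, with nq/2 and dq/1 wrapping queue:in/2 and queue:out/1.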