aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2015-06-22 00:40:56 +0200
committerAnders Svensson <[email protected]>2015-06-22 00:40:56 +0200
commit72f5c50ae1929cbbbdafe073477214e3c4015ef2 (patch)
tree15b21ab25716556f27a288481ff72a71458098e9 /lib/diameter/src
parent5fff4543e890229527c2b8875fbc96aa88c87ad9 (diff)
parentb409496fdb0eaa293a63f5c98c8f642aa8ce7aaa (diff)
downloadotp-72f5c50ae1929cbbbdafe073477214e3c4015ef2.tar.gz
otp-72f5c50ae1929cbbbdafe073477214e3c4015ef2.tar.bz2
otp-72f5c50ae1929cbbbdafe073477214e3c4015ef2.zip
Merge branch 'anders/diameter/sctp/OTP-12768'
* anders/diameter/sctp/OTP-12768: Fix connection timeouts in test transports Fix start order of alternate transports Log discarded answers Ensure accepting processes are first in, first out Remove upgrade-related code Be less parallel in traffic suite Increase send/receive buffers for testsuite SCTP listeners Decrease unnecessarily long testsuite timetraps Simplify accepting transport start Simplify peeloff signaling Simplify socket close at terminate Don't monitor listener after peeloff Don't receive initial messages out of order Remove assumption that SCTP association ids will be unique
Diffstat (limited to 'lib/diameter/src')
-rw-r--r--lib/diameter/src/base/diameter_peer.erl13
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl3
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl331
3 files changed, 133 insertions, 214 deletions
diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl
index 89b63c8a92..a814e52f29 100644
--- a/lib/diameter/src/base/diameter_peer.erl
+++ b/lib/diameter/src/base/diameter_peer.erl
@@ -121,7 +121,7 @@ pair([{transport_module, M} | Rest], Mods, Acc) ->
pair([{transport_config = T, C} | Rest], Mods, Acc) ->
pair([{T, C, ?DEFAULT_TTMO} | Rest], Mods, Acc);
pair([{transport_config, C, Tmo} | Rest], Mods, Acc) ->
- pair(Rest, [], acc({Mods, C, Tmo}, Acc));
+ pair(Rest, [], acc({lists:reverse(Mods), C, Tmo}, Acc));
pair([_ | Rest], Mods, Acc) ->
pair(Rest, Mods, Acc);
@@ -130,13 +130,16 @@ pair([_ | Rest], Mods, Acc) ->
pair([], [], []) ->
[{[?DEFAULT_TMOD], ?DEFAULT_TCFG, ?DEFAULT_TTMO}];
-%% One transport_module, one transport_config.
-pair([], [M], [{[], Cfg, Tmo}]) ->
- [{[M], Cfg, Tmo}];
+%% One transport_module, one transport_config: ignore option order.
+%% That is, interpret [{transport_config, _}, {transport_module, _}]
+%% as if the order was reversed, not as config with default module and
+%% module with default config.
+pair([], [_] = Mods, [{[], Cfg, Tmo}]) ->
+ [{Mods, Cfg, Tmo}];
%% Trailing transport_module: default transport_config.
pair([], [_|_] = Mods, Acc) ->
- lists:reverse(acc({Mods, ?DEFAULT_TCFG, ?DEFAULT_TTMO}, Acc));
+ pair([{transport_config, ?DEFAULT_TCFG}], Mods, Acc);
pair([], [], Acc) ->
lists:reverse(def(Acc)).
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index eb4bbae931..230a05fa11 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -261,7 +261,8 @@ recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) ->
%% any others are discarded.
%% ... or not.
-recv(false, false, TPid, _, _, _) ->
+recv(false, false, TPid, Pkt, _, _) ->
+ ?LOG(discarded, Pkt#diameter_packet.header),
incr(TPid, {{unknown, 0}, recv, discarded}),
ok.
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index f80de0a816..bb0bf82f04 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -103,15 +103,11 @@
{ref :: reference(),
socket :: gen_sctp:sctp_socket(),
count = 0 :: uint(), %% attached transport processes
- tmap = ets:new(?MODULE, []) :: ets:tid(),
- %% {MRef, Pid|AssocId}, {AssocId, Pid}
- pending = {0, ets:new(?MODULE, [ordered_set])},
+ pending = {0, queue:new()},
tref :: reference(),
accept :: [match()]}).
-%% Field tmap is used to map an incoming message or event to the
-%% relevant transport process. Field pending implements two queues:
-%% the first of transport-to-be processes to which an association has
-%% been assigned (at comm_up and written into tmap) but for which
+%% Field pending implements two queues: the first of transport-to-be
+%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
%% state of affairs as a new transport is spawned as a consequence of
%% a peer being taken up, transport processes being spawned by the
@@ -125,8 +121,7 @@
%% queue or spawned and placed in the first queue. Thus, there are
%% only elements in one queue at a time, so share an ets table queue
%% and tag it with a positive length if it contains the first queue, a
-%% negative length if it contains the second queue. The case -1 is
-%% handled differently for backwards compatibility reasons.
+%% negative length if it contains the second queue.
%% ---------------------------------------------------------------------------
%% # start/3
@@ -228,7 +223,7 @@ i({listen, Ref, {Opts, Addrs}}) ->
proc_lib:init_ack({ok, self(), LAs}),
start_timer(#listener{ref = Ref,
socket = Sock,
- accept = accept(Matches)});
+ accept = [[M] || {accept, M} <- Matches]});
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
@@ -243,43 +238,49 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
mode = {connect, connect(Sock, RAs, RP, [])},
socket = Sock};
-%% An accepting transport spawned by diameter.
-i({accept, Pid, LPid, Sock, Ref})
+%% An accepting transport spawned by diameter, not yet owning an
+%% association.
+i({accept, Ref, LPid, Pid})
when is_pid(Pid) ->
putr(?REF_KEY, Ref),
proc_lib:init_ack({ok, self()}),
monitor(process, Pid),
- monitor(process, LPid),
- #transport{parent = Pid,
- mode = {accept, LPid},
- socket = Sock};
+ MRef = monitor(process, LPid),
+ wait([{peeloff, MRef}], #transport{parent = Pid,
+ mode = {accept, LPid}});
-%% An accepting transport spawned at association establishment.
-i({accept, Ref, LPid, Sock, Id}) ->
+%% An accepting transport spawned at association establishment, whose
+%% parent is not yet known.
+i({accept, Ref, LPid}) ->
putr(?REF_KEY, Ref),
proc_lib:init_ack({ok, self()}),
+ erlang:send_after(?ACCEPT_TIMEOUT, self(), accept_timeout),
MRef = monitor(process, LPid),
- %% Wait for a signal that the transport has been started before
- %% processing other messages.
+ wait([{parent, Ref}, {peeloff, MRef}], #transport{mode = {accept, LPid}}).
+
+%% wait/2
+%%
+%% Wait for diameter to start the transport process and for the
+%% association to be peeled off before processing other messages.
+
+wait(Keys, S) ->
+ lists:foldl(fun i/2, S, Keys).
+
+i({K, Ref}, #transport{mode = {accept, _}} = S) ->
receive
- {Ref, Pid} -> %% transport started
- #transport{parent = Pid,
- mode = {accept, LPid},
- socket = Sock};
- {'DOWN', MRef, process, _, _} = T -> %% listener down
- close(Sock, Id),
+ {Ref, Pid} when K == parent -> %% transport process started
+ S#transport{parent = Pid};
+ {K, T, Matches} when K == peeloff -> %% association
+ {sctp, Sock, _RA, _RP, _Data} = T,
+ ok = accept_peer(Sock, Matches),
+ demonitor(Ref, [flush]),
+ t(T, S#transport{socket = Sock});
+ accept_timeout = T ->
+ x(T);
+ {'DOWN', _, process, _, _} = T ->
x(T)
- after ?ACCEPT_TIMEOUT ->
- close(Sock, Id),
- x(timeout)
end.
-%% close/2
-
-close(Sock, Id) ->
- gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Id}).
-%% Having to pass a record here is hokey.
-
%% listener/2
%% Accepting processes can be started concurrently: ensure only one
@@ -297,7 +298,7 @@ listener({LRef, T}) ->
l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
{LAs, _Sock} = AS,
{LPid, LAs};
-
+
%% ... or not.
l([], LRef, T) ->
{ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}),
@@ -367,17 +368,11 @@ type(T) ->
%% # handle_call/3
%% ---------------------------------------------------------------------------
-handle_call(T, From, #listener{pending = L} = S)
- when is_list(L) ->
- handle_call(T, From, upgrade(S));
-
handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref,
- pending = {N,Q},
count = K}
= S) ->
- TPid = accept(Ref, Pid, S),
- {reply, {ok, TPid}, downgrade(S#listener{pending = {N-1,Q},
- count = K+1})};
+ {TPid, NewS} = accept(Ref, Pid, S),
+ {reply, {ok, TPid}, NewS#listener{count = K+1}};
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -396,18 +391,9 @@ handle_cast(_, State) ->
handle_info(T, #transport{} = S) ->
{noreply, #transport{} = t(T,S)};
-handle_info(T, #listener{pending = L} = S)
- when is_list(L) ->
- handle_info(T, upgrade(S));
-
handle_info(T, #listener{} = S) ->
- {noreply, downgrade(#listener{} = l(T,S))}.
-
-%% upgrade/1
+ {noreply, #listener{} = l(T,S)}.
-upgrade(#listener{pending = [TPid | {0,Q}]} = S) ->
- ets:insert(Q, {TPid, now()}),
- S#listener{pending = {-1,Q}}.
%% Prior to the possiblity of setting pool_size on in transport
%% configuration, a new accepting transport was only started following
%% the death of a predecessor, so that there was only at most one
@@ -416,26 +402,6 @@ upgrade(#listener{pending = [TPid | {0,Q}]} = S) ->
%% several accepting transports are started concurrently. Deal with
%% this by placing the started transports in a new queue of transport
%% processes waiting for an association.
-%%
-%% Since only one of this queue and the existing queue of controlling
-%% processes waiting for a transport to be started can be non-empty at
-%% any given time, implement both queues in the same ets table. The
-%% absolute value of the first element of the 2-tuple is the queue
-%% length, the sign says which queue it is.
-
-%% downgrade/1
-%%
-%% Revert to the pre-pool_size representation when possible, for
-%% backwards compatibility in the case that the pool_size option
-%% hasn't been used.
-
-downgrade(#listener{pending = {-1,Q}} = S) ->
- TPid = ets:first(Q),
- ets:delete(Q, TPid),
- S#listener{pending = [TPid | {0,Q}]};
-
-downgrade(S) ->
- S.
%% ---------------------------------------------------------------------------
%% # code_change/3
@@ -451,16 +417,6 @@ code_change(_, State, _) ->
terminate(_, #transport{assoc_id = undefined}) ->
ok;
-terminate(_, #transport{socket = Sock,
- mode = accept,
- assoc_id = Id}) ->
- close(Sock, Id);
-
-terminate(_, #transport{socket = Sock,
- mode = {accept, _},
- assoc_id = Id}) ->
- close(Sock, Id);
-
terminate(_, #transport{socket = Sock}) ->
gen_sctp:close(Sock);
@@ -487,21 +443,17 @@ start_timer(S) ->
%% Transition listener state.
%% Incoming message from SCTP.
-l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) ->
+l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
+ accept = Matches}
+ = S) ->
Id = assoc_id(Data),
+ {TPid, NewS} = accept(S),
+ TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches},
+ setopts(Sock),
+ NewS;
- try find(Id, Data, S) of
- {TPid, NewS} ->
- TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept},
- NewS;
- false ->
- S
- after
- setopts(Sock)
- end;
-
-l({'DOWN', MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
- down(ets:member(Q, TPid), MRef, TPid, S);
+l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
+ down(queue:member(TPid, Q), TPid, S);
%% Timeout after the last accepting process has died.
l({timeout, TRef, close = T}, #listener{tref = TRef,
@@ -510,36 +462,25 @@ l({timeout, TRef, close = T}, #listener{tref = TRef,
l({timeout, _, close}, #listener{} = S) ->
S.
-%% down/4
+%% down/3
+%%
+%% Accepting transport has died.
+
+%% One that's waiting for transport start in the pending queue ...
+down(true, TPid, #listener{pending = {N,Q},
+ count = K}
+ = S) ->
+ NQ = queue:filter(fun(P) -> P /= TPid end, Q),
+ if N < 0 -> %% awaiting an association ...
+ start_timer(S#listener{count = K-1,
+ pending = {N+1, NQ}});
+ true -> %% ... or one has been assigned
+ S#listener{pending = {N-1, NQ}}
+ end;
-%% Accepting transport has died. One that's awaiting an association ...
-down(true, MRef, TPid, #listener{pending = {N,Q},
- tmap = T,
- count = K}
- = S)
- when N < 0 ->
- ets:delete(Q, TPid),
- ets:delete(T, MRef),
- ets:delete(T, TPid),
- start_timer(S#listener{count = K-1,
- pending = {N+1,Q}});
-
-%% ... or one that already has one.
-down(B, MRef, TPid, #listener{socket = Sock,
- tmap = T,
- count = K,
- pending = {N,Q}}
- = S) ->
- [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId
- ets:delete(T, MRef),
- ets:delete(T, Id),
- Id == TPid orelse close(Sock, Id),
- if B -> %% Waiting for attachment in the pending queue ...
- ets:delete(Q, TPid),
- S#listener{pending = {N-1,Q}};
- true -> %% ... or already attached
- start_timer(S#listener{count = K-1})
- end.
+%% ... or one that's already attached.
+down(false, _TPid, #listener{count = K} = S) ->
+ start_timer(S#listener{count = K-1}).
%% t/2
%%
@@ -557,20 +498,10 @@ t(T,S) ->
%% transition/2
-%% Listening process is transfering ownership of an association.
-transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches},
- #transport{mode = {accept, _},
- socket = LSock}
- = S) ->
- ok = accept_peer(Sock, Matches),
- transition(Msg, S#transport{socket = Sock});
-
%% Incoming message.
-transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
+transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
setopts(Sock),
recv(Data, S);
-%% Don't match on Sock since in R15B01 it can be the listening socket
-%% in the (peeled-off) accept case, which is likely a bug.
%% Outgoing message.
transition({diameter, {send, Msg}}, S) ->
@@ -592,13 +523,8 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
stop;
-%% Listener process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) ->
- stop;
-
-%% Ditto but we have ownership of the association. It might be that
-%% we'll go down anyway though.
-transition({'DOWN', _, process, _Pid, _}, #transport{mode = accept}) ->
+%% Timeout after transport process has been started.
+transition(accept_timeout, _) ->
ok;
%% Request for the local port number.
@@ -625,37 +551,27 @@ accept_peer(Sock, Matches) ->
orelse x({accept, RAddrs, Matches}),
ok.
-%% accept/1
-
-accept(Opts) ->
- [[M] || {accept, M} <- Opts].
-
%% accept/3
%%
%% Start a new transport process or use one that's already been
-%% started as a consequence of association establishment.
+%% started as a consequence of diameter requesting a transport
+%% process.
-%% No pending associations: spawn a new transport.
-accept(Ref, Pid, #listener{socket = Sock,
- tmap = T,
- pending = {N,Q}})
- when N =< 0 ->
- Arg = {accept, Pid, self(), Sock, Ref},
- {ok, TPid} = diameter_sctp_sup:start_child(Arg),
- MRef = monitor(process, TPid),
- ets:insert(T, [{MRef, TPid}, {TPid, MRef}]),
- ets:insert(Q, {TPid, now()}),
- TPid;
-%% Placing the transport in the second pending table makes it
-%% available to the next association.
+accept(Ref, Pid, #listener{pending = {N,_}} = S) ->
+ {TPid, NQ} = q(Ref, Pid, S),
+ {TPid, S#listener{pending = {N-1, NQ}}}.
%% Pending associations: attach to the first in the queue.
-accept(_, Pid, #listener{ref = Ref,
- pending = {_,Q}}) ->
- TPid = ets:first(Q),
+q(_, Pid, #listener{ref = Ref,
+ pending = {N,Q}})
+ when 0 < N ->
+ {TPid, _} = T = dq(Q),
TPid ! {Ref, Pid},
- ets:delete(Q, TPid),
- TPid.
+ T;
+
+%% No pending associations: spawn a new transport.
+q(Ref, Pid, #listener{pending = {_,Q}}) ->
+ nq({accept, Ref, self(), Pid}, Q).
%% send/2
@@ -716,7 +632,7 @@ recv({_, #sctp_assoc_change{} = E},
= S) ->
S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}};
-%% Lost association after establishment.
+%% Association failure.
recv({_, #sctp_assoc_change{}}, _) ->
stop;
@@ -727,8 +643,10 @@ recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid})
bin = Bin}),
ok;
-recv({_, #sctp_shutdown_event{assoc_id = Id}},
- #transport{assoc_id = Id}) ->
+recv({_, #sctp_shutdown_event{assoc_id = A}},
+ #transport{assoc_id = Id})
+ when A == Id;
+ A == 0 ->
stop;
%% Note that diameter_sctp(3) documents that sctp_events cannot be
@@ -764,52 +682,49 @@ up(#transport{parent = Pid,
diameter_peer:up(Pid),
S#transport{mode = A}.
-%% find/3
-
-find(Id, Data, #listener{tmap = T} = S) ->
- f(ets:lookup(T, Id), Data, S).
-
-%% New association ...
-f([],
- {_, #sctp_assoc_change{state = comm_up, assoc_id = Id}},
- #listener{pending = {N,Q}}
- = S) ->
- {find(Id, S), S#listener{pending = {N+1,Q}}};
-
-%% Known association ...
-f([{_, TPid}], _, S) ->
- {TPid, S};
-
-%% ... or not: discard.
-f([], _, _) ->
- false.
+%% accept/1
+%%
+%% Start a new transport process or use one that's already been
+%% started as a consequence of an event to a listener process.
-%% find/2
+accept(#listener{pending = {N,_}} = S) ->
+ {TPid, NQ} = q(S),
+ {TPid, S#listener{pending = {N+1, NQ}}}.
%% Transport waiting for an association: use it.
-find(Id, #listener{tmap = T,
- pending = {N,Q}})
+q(#listener{pending = {N,Q}})
when N < 0 ->
- TPid = ets:first(Q),
- [{TPid, MRef}] = ets:lookup(T, TPid),
- ets:insert(T, [{MRef, Id}, {Id, TPid}]),
- ets:delete(T, TPid),
- ets:delete(Q, TPid),
- TPid;
+ dq(Q);
%% No transport start yet: spawn one and queue.
-find(Id, #listener{ref = Ref,
- socket = Sock,
- tmap = T,
- pending = {_,Q}}) ->
- Arg = {accept, Ref, self(), Sock, Id},
+q(#listener{ref = Ref,
+ pending = {_,Q}}) ->
+ nq({accept, Ref, self()}, Q).
+
+%% nq/2
+%%
+%% Place a transport process in the second pending queue to make it
+%% available to the next association.
+
+nq(Arg, Q) ->
{ok, TPid} = diameter_sctp_sup:start_child(Arg),
- MRef = monitor(process, TPid),
- ets:insert(T, [{MRef, Id}, {Id, TPid}]),
- ets:insert(Q, {TPid, now()}),
- TPid.
+ monitor(process, TPid),
+ {TPid, queue:in(TPid, Q)}.
+
+%% dq/1
+%%
+%% Remove a transport process from the first pending queue to assign
+%% it to an existing association.
+
+dq(Q) ->
+ {{value, TPid}, NQ} = queue:out(Q),
+ {TPid, NQ}.
%% assoc_id/1
+%%
+%% It's unclear if this is needed, or if the first message on an
+%% association is always sctp_assoc_change, but don't assume since
+%% SCTP behaviour differs between operating systems.
assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) ->
Id;