aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src')
-rw-r--r--lib/diameter/src/base/diameter_codec.erl3
-rw-r--r--lib/diameter/src/base/diameter_peer.erl13
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl31
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl17
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl5
-rw-r--r--lib/diameter/src/diameter.appup.src82
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl331
7 files changed, 152 insertions, 330 deletions
diff --git a/lib/diameter/src/base/diameter_codec.erl b/lib/diameter/src/base/diameter_codec.erl
index c8bd0ebd15..34276a1674 100644
--- a/lib/diameter/src/base/diameter_codec.erl
+++ b/lib/diameter/src/base/diameter_codec.erl
@@ -591,6 +591,7 @@ split_head(<<Code:32, 0:1, M:1, P:1, _:5, Len:24, _/binary>>) ->
%% Header is truncated.
split_head(Bin) ->
?THROW({5014, #diameter_avp{data = Bin}}).
+%% Note that pack_avp/1 will pad this at encode if sent in a Failed-AVP.
%% 3588:
%%
@@ -620,7 +621,7 @@ split_head(Bin) ->
%% AVP header with zero up to the minimum AVP header length.
%%
%% The underlined clause must be in error since (1) a header less than
-%% the minimum value mean we don't know the identity of the AVP and
+%% the minimum value mean we might not know the identity of the AVP and
%% (2) the last sentence covers this case.
%% split_data/3
diff --git a/lib/diameter/src/base/diameter_peer.erl b/lib/diameter/src/base/diameter_peer.erl
index e19f011920..acec91c43f 100644
--- a/lib/diameter/src/base/diameter_peer.erl
+++ b/lib/diameter/src/base/diameter_peer.erl
@@ -122,7 +122,7 @@ pair([{transport_module, M} | Rest], Mods, Acc) ->
pair([{transport_config = T, C} | Rest], Mods, Acc) ->
pair([{T, C, ?DEFAULT_TTMO} | Rest], Mods, Acc);
pair([{transport_config, C, Tmo} | Rest], Mods, Acc) ->
- pair(Rest, [], acc({Mods, C, Tmo}, Acc));
+ pair(Rest, [], acc({lists:reverse(Mods), C, Tmo}, Acc));
pair([_ | Rest], Mods, Acc) ->
pair(Rest, Mods, Acc);
@@ -131,13 +131,16 @@ pair([_ | Rest], Mods, Acc) ->
pair([], [], []) ->
[{[?DEFAULT_TMOD], ?DEFAULT_TCFG, ?DEFAULT_TTMO}];
-%% One transport_module, one transport_config.
-pair([], [M], [{[], Cfg, Tmo}]) ->
- [{[M], Cfg, Tmo}];
+%% One transport_module, one transport_config: ignore option order.
+%% That is, interpret [{transport_config, _}, {transport_module, _}]
+%% as if the order was reversed, not as config with default module and
+%% module with default config.
+pair([], [_] = Mods, [{[], Cfg, Tmo}]) ->
+ [{Mods, Cfg, Tmo}];
%% Trailing transport_module: default transport_config.
pair([], [_|_] = Mods, Acc) ->
- lists:reverse(acc({Mods, ?DEFAULT_TCFG, ?DEFAULT_TTMO}, Acc));
+ pair([{transport_config, ?DEFAULT_TCFG}], Mods, Acc);
pair([], [], Acc) ->
lists:reverse(def(Acc)).
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 9e8bf2ffcd..f5e04d3eae 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -120,7 +120,6 @@
service :: #diameter_service{},
dpr = false :: false
| true %% DPR received, DPA sent
- | {uint32(), uint32()} %% set in old code
| {boolean(), uint32(), uint32()},
%% hop by hop and end to end identifiers in
%% outgoing DPR; boolean says whether or not
@@ -156,8 +155,7 @@
%% # start/3
%% ---------------------------------------------------------------------------
--spec start(T, [Opt], {[diameter:service_opt()]
- | diameter:sequence(), %% from old code
+-spec start(T, [Opt], {[diameter:service_opt()],
[node()],
module(),
#diameter_service{}})
@@ -196,9 +194,6 @@ init(T) ->
proc_lib:init_ack({ok, self()}),
gen_server:enter_loop(?MODULE, [], i(T)).
-i({Ack, WPid, T, Opts, {{_,_} = Mask, Nodes, Dict0, Svc}}) -> %% from old code
- i({Ack, WPid, T, Opts, {[{sequence, Mask}], Nodes, Dict0, Svc}});
-
i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) ->
erlang:monitor(process, WPid),
wait(Ack, WPid),
@@ -330,14 +325,11 @@ handle_info(T, #state{} = State) ->
{?MODULE, Tag, Reason} ->
?LOG(stop, Tag),
{stop, {shutdown, Reason}, State}
- end;
+ end.
%% The form of the throw caught here is historical. It's
%% significant that it's not a 2-tuple, as in ?FAILURE(Reason),
%% since these are caught elsewhere.
-handle_info(T, S) -> %% started in old code
- handle_info(T, #state{} = erlang:append_element(S, infinity)).
-
%% Note that there's no guarantee that the service and transport
%% capabilities are good enough to build a CER/CEA that can be
%% succesfully encoded. It's not checked at diameter:add_transport/2
@@ -367,9 +359,6 @@ eraser(Key) ->
%% transition/2
-transition(T, #state{dpr = {Hid, Eid}} = S) -> %% DPR sent from old code
- transition(T, S#state{dpr = {false, Hid, Eid}});
-
%% Connection to peer.
transition({diameter, {TPid, connected, Remote}},
#state{transport = TPid,
@@ -1297,25 +1286,15 @@ dpa_timer(Tmo) ->
erlang:send_after(Tmo, self(), dpa_timeout).
dpa_timeout() ->
- dpa_timeout(getr(?DPA_KEY)).
-
-dpa_timeout({_, Tmo}) ->
- Tmo;
-dpa_timeout(undefined) -> %% set in old code
- ?DPA_TIMEOUT;
-dpa_timeout(Tmo) -> %% ditto
+ {_, Tmo} = getr(?DPA_KEY),
Tmo.
dpr_timer() ->
dpa_timer(dpr_timeout()).
dpr_timeout() ->
- dpr_timeout(getr(?DPA_KEY)).
-
-dpr_timeout({Tmo, _}) ->
- Tmo;
-dpr_timeout(_) -> %% set in old code
- ?DPR_TIMEOUT.
+ {Tmo, _} = getr(?DPA_KEY),
+ Tmo.
%% register_everywhere/1
%%
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index 9d6fbc9113..692a01e651 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -98,9 +98,6 @@
%% # make_recvdata/1
%% ---------------------------------------------------------------------------
-make_recvdata([SvcName, PeerT, Apps, {_,_} = Mask | _]) -> %% from old code
- make_recvdata([SvcName, PeerT, Apps, [{sequence, Mask}]]);
-
make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) ->
{_,_} = Mask = proplists:get_value(sequence, SvcOpts),
#recvdata{service_name = SvcName,
@@ -262,7 +259,8 @@ recv(false, #request{ref = Ref, handler = Pid} = Req, _, Pkt, Dict0, _) ->
%% any others are discarded.
%% ... or not.
-recv(false, false, TPid, _, _, _) ->
+recv(false, false, TPid, Pkt, _, _) ->
+ ?LOG(discarded, Pkt#diameter_packet.header),
incr(TPid, {{unknown, 0}, recv, discarded}),
ok.
@@ -301,13 +299,7 @@ recv_request(TPid,
RecvData),
TPid,
Dict0,
- RecvData);
-
-recv_request(TPid, Pkt, Dict0, RecvData) -> %% from old code
- recv_request(TPid,
- Pkt,
- Dict0,
- #recvdata{} = erlang:append_element(RecvData, [])).
+ RecvData).
%% recv_R/5
@@ -1642,9 +1634,6 @@ pick_peer(SvcName,
Filter,
Xtra})).
-pick({{_,_,_} = Transport, Mask}) -> %% from old code; dialyzer complains
- {Transport, Mask, []}; %% about this
-
pick(false) ->
{error, no_connection};
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index 5f333684bc..c6d0a0b6ed 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -811,9 +811,6 @@ restart(S) -> %% reconnect has won race with timeout
%% state down rather then initial when receiving notification of an
%% open connection.
-restart({T, Opts, Svc}, S) -> %% put in old code
- restart({T, Opts, Svc, []}, S);
-
restart({{connect, _} = T, Opts, Svc, SvcOpts},
#watchdog{parent = Pid,
restrict = {R,_},
@@ -828,7 +825,7 @@ restart({{connect, _} = T, Opts, Svc, SvcOpts},
%% die. Note that a state machine never enters state REOPEN in this
%% case.
restart({{accept, _}, _, _, _}, #watchdog{restrict = {_, false}}) ->
- stop; %% 'DOWN' was in old code: 'close' was not sent
+ stop;
%% Otherwise hang around until told to die, either by the service or
%% by another watchdog.
diff --git a/lib/diameter/src/diameter.appup.src b/lib/diameter/src/diameter.appup.src
index 8e82eb8014..86d231179c 100644
--- a/lib/diameter/src/diameter.appup.src
+++ b/lib/diameter/src/diameter.appup.src
@@ -38,42 +38,11 @@
{"1.5", [{restart_application, diameter}]}, %% R16B03
{"1.6", [{restart_application, diameter}]}, %% 17.0
{"1.7", [{restart_application, diameter}]}, %% 17.[12]
- {<<"^1\\.(7\\.1|8)$">>, %% 17.[34]
- [{load_module, diameter_lib},
- {load_module, diameter_peer},
- {load_module, diameter_reg},
- {load_module, diameter_session},
- {load_module, diameter_stats},
- {load_module, diameter_sync},
- {load_module, diameter_capx},
- {load_module, diameter_codec},
- {load_module, diameter_types},
- {load_module, diameter_traffic},
- {load_module, diameter_service},
- {load_module, diameter_peer_fsm},
- {load_module, diameter_watchdog},
- {load_module, diameter_tcp},
- {load_module, diameter_sctp},
- {load_module, diameter_config},
- {load_module, diameter},
- {load_module, diameter_gen_base_rfc6733},
- {load_module, diameter_gen_acct_rfc6733},
- {load_module, diameter_gen_base_rfc3588},
- {load_module, diameter_gen_base_accounting},
- {load_module, diameter_gen_relay},
- {update, diameter_transport_sup, supervisor},
- {update, diameter_service_sup, supervisor},
- {update, diameter_sup, supervisor}]},
- {"1.9", [{load_module, diameter_codec}, %% 17.5
- {load_module, diameter_traffic},
- {load_module, diameter_sctp},
- {load_module, diameter_gen_base_rfc6733},
- {load_module, diameter_gen_acct_rfc6733},
- {load_module, diameter_gen_base_rfc3588},
- {load_module, diameter_gen_base_accounting},
- {load_module, diameter_gen_relay}]},
- {"1.9.1", [{load_module, diameter_traffic}, %% 17.5.3
- {load_module, diameter_sctp}]}
+ {"1.7.1", [{restart_application, diameter}]}, %% 17.3
+ {"1.8", [{restart_application, diameter}]}, %% 17.4
+ {"1.9", [{restart_application, diameter}]}, %% 17.5
+ {"1.9.1", [{restart_application, diameter}]}, %% 17.5.3
+ {"1.9.2", [{restart_application, diameter}]} %% 17.5.5
],
[
{"0.9", [{restart_application, diameter}]},
@@ -93,41 +62,10 @@
{"1.5", [{restart_application, diameter}]},
{"1.6", [{restart_application, diameter}]},
{"1.7", [{restart_application, diameter}]},
- {<<"^1\\.(7\\.1|8)$">>,
- [{update, diameter_sup, supervisor},
- {update, diameter_service_sup, supervisor},
- {update, diameter_transport_sup, supervisor},
- {load_module, diameter_gen_relay},
- {load_module, diameter_gen_base_accounting},
- {load_module, diameter_gen_base_rfc3588},
- {load_module, diameter_gen_acct_rfc6733},
- {load_module, diameter_gen_base_rfc6733},
- {load_module, diameter},
- {load_module, diameter_config},
- {load_module, diameter_sctp},
- {load_module, diameter_tcp},
- {load_module, diameter_watchdog},
- {load_module, diameter_peer_fsm},
- {load_module, diameter_service},
- {load_module, diameter_traffic},
- {load_module, diameter_types},
- {load_module, diameter_codec},
- {load_module, diameter_capx},
- {load_module, diameter_sync},
- {load_module, diameter_stats},
- {load_module, diameter_session},
- {load_module, diameter_reg},
- {load_module, diameter_peer},
- {load_module, diameter_lib}]},
- {"1.9", [{load_module, diameter_gen_relay},
- {load_module, diameter_gen_base_accounting},
- {load_module, diameter_gen_base_rfc3588},
- {load_module, diameter_gen_acct_rfc6733},
- {load_module, diameter_gen_base_rfc6733},
- {load_module, diameter_sctp},
- {load_module, diameter_traffic},
- {load_module, diameter_codec}]},
- {"1.9.1", [{load_module, diameter_sctp},
- {load_module, diameter_traffic}]}
+ {"1.7.1", [{restart_application, diameter}]},
+ {"1.8", [{restart_application, diameter}]},
+ {"1.9", [{restart_application, diameter}]},
+ {"1.9.1", [{restart_application, diameter}]},
+ {"1.9.2", [{restart_application, diameter}]}
]
}.
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 110fa2c6e7..3e08b78ea1 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -104,15 +104,11 @@
{ref :: reference(),
socket :: gen_sctp:sctp_socket(),
count = 0 :: uint(), %% attached transport processes
- tmap = ets:new(?MODULE, []) :: ets:tid(),
- %% {MRef, Pid|AssocId}, {AssocId, Pid}
- pending = {0, ets:new(?MODULE, [ordered_set])},
+ pending = {0, queue:new()},
tref :: reference(),
accept :: [match()]}).
-%% Field tmap is used to map an incoming message or event to the
-%% relevant transport process. Field pending implements two queues:
-%% the first of transport-to-be processes to which an association has
-%% been assigned (at comm_up and written into tmap) but for which
+%% Field pending implements two queues: the first of transport-to-be
+%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
%% state of affairs as a new transport is spawned as a consequence of
%% a peer being taken up, transport processes being spawned by the
@@ -126,8 +122,7 @@
%% queue or spawned and placed in the first queue. Thus, there are
%% only elements in one queue at a time, so share an ets table queue
%% and tag it with a positive length if it contains the first queue, a
-%% negative length if it contains the second queue. The case -1 is
-%% handled differently for backwards compatibility reasons.
+%% negative length if it contains the second queue.
%% ---------------------------------------------------------------------------
%% # start/3
@@ -229,7 +224,7 @@ i({listen, Ref, {Opts, Addrs}}) ->
proc_lib:init_ack({ok, self(), LAs}),
start_timer(#listener{ref = Ref,
socket = Sock,
- accept = accept(Matches)});
+ accept = [[M] || {accept, M} <- Matches]});
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
@@ -244,43 +239,49 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
mode = {connect, connect(Sock, RAs, RP, [])},
socket = Sock};
-%% An accepting transport spawned by diameter.
-i({accept, Pid, LPid, Sock, Ref})
+%% An accepting transport spawned by diameter, not yet owning an
+%% association.
+i({accept, Ref, LPid, Pid})
when is_pid(Pid) ->
putr(?REF_KEY, Ref),
proc_lib:init_ack({ok, self()}),
monitor(process, Pid),
- monitor(process, LPid),
- #transport{parent = Pid,
- mode = {accept, LPid},
- socket = Sock};
+ MRef = monitor(process, LPid),
+ wait([{peeloff, MRef}], #transport{parent = Pid,
+ mode = {accept, LPid}});
-%% An accepting transport spawned at association establishment.
-i({accept, Ref, LPid, Sock, Id}) ->
+%% An accepting transport spawned at association establishment, whose
+%% parent is not yet known.
+i({accept, Ref, LPid}) ->
putr(?REF_KEY, Ref),
proc_lib:init_ack({ok, self()}),
+ erlang:send_after(?ACCEPT_TIMEOUT, self(), accept_timeout),
MRef = monitor(process, LPid),
- %% Wait for a signal that the transport has been started before
- %% processing other messages.
+ wait([{parent, Ref}, {peeloff, MRef}], #transport{mode = {accept, LPid}}).
+
+%% wait/2
+%%
+%% Wait for diameter to start the transport process and for the
+%% association to be peeled off before processing other messages.
+
+wait(Keys, S) ->
+ lists:foldl(fun i/2, S, Keys).
+
+i({K, Ref}, #transport{mode = {accept, _}} = S) ->
receive
- {Ref, Pid} -> %% transport started
- #transport{parent = Pid,
- mode = {accept, LPid},
- socket = Sock};
- {'DOWN', MRef, process, _, _} = T -> %% listener down
- close(Sock, Id),
+ {Ref, Pid} when K == parent -> %% transport process started
+ S#transport{parent = Pid};
+ {K, T, Matches} when K == peeloff -> %% association
+ {sctp, Sock, _RA, _RP, _Data} = T,
+ ok = accept_peer(Sock, Matches),
+ demonitor(Ref, [flush]),
+ t(T, S#transport{socket = Sock});
+ accept_timeout = T ->
+ x(T);
+ {'DOWN', _, process, _, _} = T ->
x(T)
- after ?ACCEPT_TIMEOUT ->
- close(Sock, Id),
- x(timeout)
end.
-%% close/2
-
-close(Sock, Id) ->
- gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Id}).
-%% Having to pass a record here is hokey.
-
%% listener/2
%% Accepting processes can be started concurrently: ensure only one
@@ -298,7 +299,7 @@ listener({LRef, T}) ->
l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
{LAs, _Sock} = AS,
{LPid, LAs};
-
+
%% ... or not.
l([], LRef, T) ->
{ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}),
@@ -368,17 +369,11 @@ type(T) ->
%% # handle_call/3
%% ---------------------------------------------------------------------------
-handle_call(T, From, #listener{pending = L} = S)
- when is_list(L) ->
- handle_call(T, From, upgrade(S));
-
handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref,
- pending = {N,Q},
count = K}
= S) ->
- TPid = accept(Ref, Pid, S),
- {reply, {ok, TPid}, downgrade(S#listener{pending = {N-1,Q},
- count = K+1})};
+ {TPid, NewS} = accept(Ref, Pid, S),
+ {reply, {ok, TPid}, NewS#listener{count = K+1}};
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -397,18 +392,9 @@ handle_cast(_, State) ->
handle_info(T, #transport{} = S) ->
{noreply, #transport{} = t(T,S)};
-handle_info(T, #listener{pending = L} = S)
- when is_list(L) ->
- handle_info(T, upgrade(S));
-
handle_info(T, #listener{} = S) ->
- {noreply, downgrade(#listener{} = l(T,S))}.
-
-%% upgrade/1
+ {noreply, #listener{} = l(T,S)}.
-upgrade(#listener{pending = [TPid | {0,Q}]} = S) ->
- ets:insert(Q, {TPid, now()}),
- S#listener{pending = {-1,Q}}.
%% Prior to the possiblity of setting pool_size on in transport
%% configuration, a new accepting transport was only started following
%% the death of a predecessor, so that there was only at most one
@@ -417,26 +403,6 @@ upgrade(#listener{pending = [TPid | {0,Q}]} = S) ->
%% several accepting transports are started concurrently. Deal with
%% this by placing the started transports in a new queue of transport
%% processes waiting for an association.
-%%
-%% Since only one of this queue and the existing queue of controlling
-%% processes waiting for a transport to be started can be non-empty at
-%% any given time, implement both queues in the same ets table. The
-%% absolute value of the first element of the 2-tuple is the queue
-%% length, the sign says which queue it is.
-
-%% downgrade/1
-%%
-%% Revert to the pre-pool_size representation when possible, for
-%% backwards compatibility in the case that the pool_size option
-%% hasn't been used.
-
-downgrade(#listener{pending = {-1,Q}} = S) ->
- TPid = ets:first(Q),
- ets:delete(Q, TPid),
- S#listener{pending = [TPid | {0,Q}]};
-
-downgrade(S) ->
- S.
%% ---------------------------------------------------------------------------
%% # code_change/3
@@ -452,16 +418,6 @@ code_change(_, State, _) ->
terminate(_, #transport{assoc_id = undefined}) ->
ok;
-terminate(_, #transport{socket = Sock,
- mode = accept,
- assoc_id = Id}) ->
- close(Sock, Id);
-
-terminate(_, #transport{socket = Sock,
- mode = {accept, _},
- assoc_id = Id}) ->
- close(Sock, Id);
-
terminate(_, #transport{socket = Sock}) ->
gen_sctp:close(Sock);
@@ -488,21 +444,17 @@ start_timer(S) ->
%% Transition listener state.
%% Incoming message from SCTP.
-l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) ->
+l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
+ accept = Matches}
+ = S) ->
Id = assoc_id(Data),
+ {TPid, NewS} = accept(S),
+ TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches},
+ setopts(Sock),
+ NewS;
- try find(Id, Data, S) of
- {TPid, NewS} ->
- TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept},
- NewS;
- false ->
- S
- after
- setopts(Sock)
- end;
-
-l({'DOWN', MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
- down(ets:member(Q, TPid), MRef, TPid, S);
+l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
+ down(queue:member(TPid, Q), TPid, S);
%% Timeout after the last accepting process has died.
l({timeout, TRef, close = T}, #listener{tref = TRef,
@@ -511,36 +463,25 @@ l({timeout, TRef, close = T}, #listener{tref = TRef,
l({timeout, _, close}, #listener{} = S) ->
S.
-%% down/4
+%% down/3
+%%
+%% Accepting transport has died.
+
+%% One that's waiting for transport start in the pending queue ...
+down(true, TPid, #listener{pending = {N,Q},
+ count = K}
+ = S) ->
+ NQ = queue:filter(fun(P) -> P /= TPid end, Q),
+ if N < 0 -> %% awaiting an association ...
+ start_timer(S#listener{count = K-1,
+ pending = {N+1, NQ}});
+ true -> %% ... or one has been assigned
+ S#listener{pending = {N-1, NQ}}
+ end;
-%% Accepting transport has died. One that's awaiting an association ...
-down(true, MRef, TPid, #listener{pending = {N,Q},
- tmap = T,
- count = K}
- = S)
- when N < 0 ->
- ets:delete(Q, TPid),
- ets:delete(T, MRef),
- ets:delete(T, TPid),
- start_timer(S#listener{count = K-1,
- pending = {N+1,Q}});
-
-%% ... or one that already has one.
-down(B, MRef, TPid, #listener{socket = Sock,
- tmap = T,
- count = K,
- pending = {N,Q}}
- = S) ->
- [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId
- ets:delete(T, MRef),
- ets:delete(T, Id),
- Id == TPid orelse close(Sock, Id),
- if B -> %% Waiting for attachment in the pending queue ...
- ets:delete(Q, TPid),
- S#listener{pending = {N-1,Q}};
- true -> %% ... or already attached
- start_timer(S#listener{count = K-1})
- end.
+%% ... or one that's already attached.
+down(false, _TPid, #listener{count = K} = S) ->
+ start_timer(S#listener{count = K-1}).
%% t/2
%%
@@ -558,20 +499,10 @@ t(T,S) ->
%% transition/2
-%% Listening process is transfering ownership of an association.
-transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches},
- #transport{mode = {accept, _},
- socket = LSock}
- = S) ->
- ok = accept_peer(Sock, Matches),
- transition(Msg, S#transport{socket = Sock});
-
%% Incoming message.
-transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
+transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
setopts(Sock),
recv(Data, S);
-%% Don't match on Sock since in R15B01 it can be the listening socket
-%% in the (peeled-off) accept case, which is likely a bug.
%% Outgoing message.
transition({diameter, {send, Msg}}, S) ->
@@ -593,13 +524,8 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
stop;
-%% Listener process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) ->
- stop;
-
-%% Ditto but we have ownership of the association. It might be that
-%% we'll go down anyway though.
-transition({'DOWN', _, process, _Pid, _}, #transport{mode = accept}) ->
+%% Timeout after transport process has been started.
+transition(accept_timeout, _) ->
ok;
%% Request for the local port number.
@@ -626,37 +552,27 @@ accept_peer(Sock, Matches) ->
orelse x({accept, RAddrs, Matches}),
ok.
-%% accept/1
-
-accept(Opts) ->
- [[M] || {accept, M} <- Opts].
-
%% accept/3
%%
%% Start a new transport process or use one that's already been
-%% started as a consequence of association establishment.
+%% started as a consequence of diameter requesting a transport
+%% process.
-%% No pending associations: spawn a new transport.
-accept(Ref, Pid, #listener{socket = Sock,
- tmap = T,
- pending = {N,Q}})
- when N =< 0 ->
- Arg = {accept, Pid, self(), Sock, Ref},
- {ok, TPid} = diameter_sctp_sup:start_child(Arg),
- MRef = monitor(process, TPid),
- ets:insert(T, [{MRef, TPid}, {TPid, MRef}]),
- ets:insert(Q, {TPid, now()}),
- TPid;
-%% Placing the transport in the second pending table makes it
-%% available to the next association.
+accept(Ref, Pid, #listener{pending = {N,_}} = S) ->
+ {TPid, NQ} = q(Ref, Pid, S),
+ {TPid, S#listener{pending = {N-1, NQ}}}.
%% Pending associations: attach to the first in the queue.
-accept(_, Pid, #listener{ref = Ref,
- pending = {_,Q}}) ->
- TPid = ets:first(Q),
+q(_, Pid, #listener{ref = Ref,
+ pending = {N,Q}})
+ when 0 < N ->
+ {TPid, _} = T = dq(Q),
TPid ! {Ref, Pid},
- ets:delete(Q, TPid),
- TPid.
+ T;
+
+%% No pending associations: spawn a new transport.
+q(Ref, Pid, #listener{pending = {_,Q}}) ->
+ nq({accept, Ref, self(), Pid}, Q).
%% send/2
@@ -717,7 +633,7 @@ recv({_, #sctp_assoc_change{} = E},
= S) ->
S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}};
-%% Lost association after establishment.
+%% Association failure.
recv({_, #sctp_assoc_change{}}, _) ->
stop;
@@ -728,8 +644,10 @@ recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid})
bin = Bin}),
ok;
-recv({_, #sctp_shutdown_event{assoc_id = Id}},
- #transport{assoc_id = Id}) ->
+recv({_, #sctp_shutdown_event{assoc_id = A}},
+ #transport{assoc_id = Id})
+ when A == Id;
+ A == 0 ->
stop;
%% Note that diameter_sctp(3) documents that sctp_events cannot be
@@ -765,52 +683,49 @@ up(#transport{parent = Pid,
diameter_peer:up(Pid),
S#transport{mode = A}.
-%% find/3
-
-find(Id, Data, #listener{tmap = T} = S) ->
- f(ets:lookup(T, Id), Data, S).
-
-%% New association ...
-f([],
- {_, #sctp_assoc_change{state = comm_up, assoc_id = Id}},
- #listener{pending = {N,Q}}
- = S) ->
- {find(Id, S), S#listener{pending = {N+1,Q}}};
-
-%% Known association ...
-f([{_, TPid}], _, S) ->
- {TPid, S};
-
-%% ... or not: discard.
-f([], _, _) ->
- false.
+%% accept/1
+%%
+%% Start a new transport process or use one that's already been
+%% started as a consequence of an event to a listener process.
-%% find/2
+accept(#listener{pending = {N,_}} = S) ->
+ {TPid, NQ} = q(S),
+ {TPid, S#listener{pending = {N+1, NQ}}}.
%% Transport waiting for an association: use it.
-find(Id, #listener{tmap = T,
- pending = {N,Q}})
+q(#listener{pending = {N,Q}})
when N < 0 ->
- TPid = ets:first(Q),
- [{TPid, MRef}] = ets:lookup(T, TPid),
- ets:insert(T, [{MRef, Id}, {Id, TPid}]),
- ets:delete(T, TPid),
- ets:delete(Q, TPid),
- TPid;
+ dq(Q);
%% No transport start yet: spawn one and queue.
-find(Id, #listener{ref = Ref,
- socket = Sock,
- tmap = T,
- pending = {_,Q}}) ->
- Arg = {accept, Ref, self(), Sock, Id},
+q(#listener{ref = Ref,
+ pending = {_,Q}}) ->
+ nq({accept, Ref, self()}, Q).
+
+%% nq/2
+%%
+%% Place a transport process in the second pending queue to make it
+%% available to the next association.
+
+nq(Arg, Q) ->
{ok, TPid} = diameter_sctp_sup:start_child(Arg),
- MRef = monitor(process, TPid),
- ets:insert(T, [{MRef, Id}, {Id, TPid}]),
- ets:insert(Q, {TPid, now()}),
- TPid.
+ monitor(process, TPid),
+ {TPid, queue:in(TPid, Q)}.
+
+%% dq/1
+%%
+%% Remove a transport process from the first pending queue to assign
+%% it to an existing association.
+
+dq(Q) ->
+ {{value, TPid}, NQ} = queue:out(Q),
+ {TPid, NQ}.
%% assoc_id/1
+%%
+%% It's unclear if this is needed, or if the first message on an
+%% association is always sctp_assoc_change, but don't assume since
+%% SCTP behaviour differs between operating systems.
assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) ->
Id;