aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter')
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl52
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl73
2 files changed, 71 insertions, 54 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index d593d0ab84..de341741db 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -129,16 +129,14 @@
%% below to start the process implementing the Peer State Machine.
%%
-%%% ---------------------------------------------------------------------------
-%%% # start({connect|accept, Ref}, Opts, Service)
-%%%
-%%% Output: Pid
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% # start/3
+%% ---------------------------------------------------------------------------
-spec start(T, [Opt], {diameter:sequence(),
diameter:restriction(),
#diameter_service{}})
- -> pid()
+ -> {reference(), pid()}
when T :: {connect|accept, diameter:transport_ref()},
Opt :: diameter:transport_opt().
@@ -147,9 +145,15 @@
%% specified on the transport in question. Check here that the list is
%% still non-empty.
-start({_,_} = Type, Opts, MS) ->
- {ok, Pid} = diameter_peer_fsm_sup:start_child({self(), Type, Opts, MS}),
- Pid.
+start({_,_} = Type, Opts, S) ->
+ Ack = make_ref(),
+ T = {Ack, self(), Type, Opts, S},
+ {ok, Pid} = diameter_peer_fsm_sup:start_child(T),
+ try
+ {erlang:monitor(process, Pid), Pid}
+ after
+ Pid ! Ack
+ end.
start_link(T) ->
{ok, _} = proc_lib:start_link(?MODULE,
@@ -158,8 +162,8 @@ start_link(T) ->
infinity,
diameter_lib:spawn_opts(server, [])).
-%%% ---------------------------------------------------------------------------
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
%% init/1
@@ -167,12 +171,13 @@ init(T) ->
proc_lib:init_ack({ok, self()}),
gen_server:enter_loop(?MODULE, [], i(T)).
-i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
- capabilities = LCaps}
- = Svc}}) ->
- [] /= Apps orelse ?ERROR({no_apps, T, Opts}),
+i({Ack, WPid, {M, Ref} = T, Opts, {Mask,
+ Nodes,
+ #diameter_service{capabilities = LCaps}
+ = Svc}}) ->
+ erlang:monitor(process, WPid),
+ wait(Ack, WPid),
putr(?DWA_KEY, dwa(LCaps)),
- {M, Ref} = T,
diameter_stats:reg(Ref),
{[Cs,Ds], Rest} = proplists:split(Opts, [capabilities_cb, disconnect_cb]),
putr(?CB_KEY, {Ref, [F || {_,F} <- Cs]}),
@@ -180,7 +185,6 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
putr(?REF_KEY, Ref),
putr(?SEQUENCE_KEY, Mask),
putr(?RESTRICT_KEY, Nodes),
- erlang:monitor(process, WPid),
{TPid, Addrs} = start_transport(T, Rest, Svc),
Tmo = proplists:get_value(capx_timeout, Opts, ?EVENT_TIMEOUT),
?IS_TIMEOUT(Tmo) orelse ?ERROR({invalid, {capx_timeout, Tmo}}),
@@ -198,6 +202,16 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
%% watchdog start (start/2) succeeds regardless so as not to crash the
%% service.
+%% Wait for the caller to have a monitor to avoid a race with our
+%% death. (Since the exit reason is used in diameter_service.)
+wait(Ref, Pid) ->
+ receive
+ Ref ->
+ ok;
+ {'DOWN', _, process, Pid, _} = D ->
+ exit({shutdown, D})
+ end.
+
start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) ->
Addrs0 = LCaps#diameter_caps.host_ip_address,
start_transport(Addrs0, {T, Opts, Svc}).
@@ -304,8 +318,8 @@ terminate(_, _) ->
code_change(_, State, _) ->
{ok, State}.
-%%% ---------------------------------------------------------------------------
-%%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
+%% ---------------------------------------------------------------------------
putr(Key, Val) ->
put({?MODULE, Key}, Val).
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index ece5f0e359..10ab246b88 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -62,11 +62,13 @@
restrict :: {diameter:restriction(), boolean()},
shutdown = false :: boolean()}).
+%% ---------------------------------------------------------------------------
%% start/2
%%
%% Start a monitor before the watchdog is allowed to proceed to ensure
%% that a failed capabilities exchange produces the desired exit
%% reason.
+%% ---------------------------------------------------------------------------
-spec start(Type, {RecvData, [Opt], SvcName, SvcOpts, #diameter_service{}})
-> {reference(), pid()}
@@ -77,12 +79,12 @@
SvcName :: diameter:service_name().
start({_,_} = Type, T) ->
- Ref = make_ref(),
- {ok, Pid} = diameter_watchdog_sup:start_child({Ref, {Type, self(), T}}),
+ Ack = make_ref(),
+ {ok, Pid} = diameter_watchdog_sup:start_child({Ack, Type, self(), T}),
try
{erlang:monitor(process, Pid), Pid}
after
- send(Pid, Ref)
+ send(Pid, Ack)
end.
start_link(T) ->
@@ -101,22 +103,15 @@ init(T) ->
proc_lib:init_ack({ok, self()}),
gen_server:enter_loop(?MODULE, [], i(T)).
-i({Ref, {_, Pid, _} = T}) ->
- MRef = erlang:monitor(process, Pid),
- receive
- Ref ->
- make_state(T);
- {'DOWN', MRef, process, _, _} = D ->
- exit({shutdown, D})
- end.
-
-make_state({T, Pid, {RecvData,
- Opts,
- SvcName,
- SvcOpts,
- #diameter_service{applications = Apps,
- capabilities = Caps}
- = Svc}}) ->
+i({Ack, T, Pid, {RecvData,
+ Opts,
+ SvcName,
+ SvcOpts,
+ #diameter_service{applications = Apps,
+ capabilities = Caps}
+ = Svc}}) ->
+ erlang:monitor(process, Pid),
+ wait(Ack, Pid),
random:seed(now()),
putr(restart, {T, Opts, Svc}), %% save seeing it in trace
putr(dwr, dwr(Caps)), %%
@@ -124,9 +119,7 @@ make_state({T, Pid, {RecvData,
Restrict = proplists:get_value(restrict_connections, SvcOpts),
Nodes = restrict_nodes(Restrict),
#watchdog{parent = Pid,
- transport = monitor(diameter_peer_fsm:start(T,
- Opts,
- {Mask, Nodes, Svc})),
+ transport = start(T, Opts, Mask, Nodes, Svc),
tw = proplists:get_value(watchdog_timer,
Opts,
?DEFAULT_TW_INIT),
@@ -134,6 +127,21 @@ make_state({T, Pid, {RecvData,
sequence = Mask,
restrict = {Restrict, lists:member(node(), Nodes)}}.
+wait(Ref, Pid) ->
+ receive
+ Ref ->
+ ok;
+ {'DOWN', _, process, Pid, _} = D ->
+ exit({shutdown, D})
+ end.
+
+%% start/5
+
+start(T, Opts, Mask, Nodes, Svc) ->
+ {_MRef, Pid}
+ = diameter_peer_fsm:start(T, Opts, {Mask, Nodes, Svc}),
+ Pid.
+
%% handle_call/3
handle_call(_, _, State) ->
@@ -234,9 +242,7 @@ transition(close, #watchdog{}) ->
%% Service is asking for the peer to be taken down gracefully.
transition({shutdown, Pid, _}, #watchdog{parent = Pid,
- transport = undefined,
- status = S}) ->
- down = S, %% assert
+ transport = undefined}) ->
stop;
transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid,
transport = TPid}
@@ -312,9 +318,10 @@ transition({'DOWN', _, process, TPid, _Reason},
stop;
transition({'DOWN', _, process, TPid, _Reason},
- #watchdog{transport = TPid}
+ #watchdog{transport = TPid,
+ status = T}
= S) ->
- set_watchdog(S#watchdog{status = down,
+ set_watchdog(S#watchdog{status = case T of initial -> T; _ -> down end,
pending = false,
transport = undefined});
@@ -337,10 +344,6 @@ transition({state, Pid}, #watchdog{status = S}) ->
%% ===========================================================================
-monitor(Pid) ->
- erlang:monitor(process, Pid),
- Pid.
-
putr(Key, Val) ->
put({?MODULE, Key}, Val).
@@ -600,7 +603,9 @@ timeout(#watchdog{status = reopen,
%% process has died. We only need to handle state down since we start
%% the first watchdog when transitioning out of initial.
-timeout(#watchdog{status = down} = S) ->
+timeout(#watchdog{status = T} = S)
+ when T == initial;
+ T == down ->
restart(S).
%% restart/1
@@ -628,9 +633,7 @@ restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid,
= S) ->
send(Pid, {reconnect, self()}),
Nodes = restrict_nodes(R),
- S#watchdog{transport = monitor(diameter_peer_fsm:start(T,
- Opts,
- {Mask, Nodes, Svc})),
+ S#watchdog{transport = start(T, Opts, Mask, Nodes, Svc),
restrict = {R, lists:member(node(), Nodes)}};
%% No restriction on the number of connections to the same peer: just