diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 52 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 73 |
2 files changed, 71 insertions, 54 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index d593d0ab84..de341741db 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -129,16 +129,14 @@ %% below to start the process implementing the Peer State Machine. %% -%%% --------------------------------------------------------------------------- -%%% # start({connect|accept, Ref}, Opts, Service) -%%% -%%% Output: Pid -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # start/3 +%% --------------------------------------------------------------------------- -spec start(T, [Opt], {diameter:sequence(), diameter:restriction(), #diameter_service{}}) - -> pid() + -> {reference(), pid()} when T :: {connect|accept, diameter:transport_ref()}, Opt :: diameter:transport_opt(). @@ -147,9 +145,15 @@ %% specified on the transport in question. Check here that the list is %% still non-empty. -start({_,_} = Type, Opts, MS) -> - {ok, Pid} = diameter_peer_fsm_sup:start_child({self(), Type, Opts, MS}), - Pid. +start({_,_} = Type, Opts, S) -> + Ack = make_ref(), + T = {Ack, self(), Type, Opts, S}, + {ok, Pid} = diameter_peer_fsm_sup:start_child(T), + try + {erlang:monitor(process, Pid), Pid} + after + Pid ! Ack + end. start_link(T) -> {ok, _} = proc_lib:start_link(?MODULE, @@ -158,8 +162,8 @@ start_link(T) -> infinity, diameter_lib:spawn_opts(server, [])). -%%% --------------------------------------------------------------------------- -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- %% init/1 @@ -167,12 +171,13 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, - capabilities = LCaps} - = Svc}}) -> - [] /= Apps orelse ?ERROR({no_apps, T, Opts}), +i({Ack, WPid, {M, Ref} = T, Opts, {Mask, + Nodes, + #diameter_service{capabilities = LCaps} + = Svc}}) -> + erlang:monitor(process, WPid), + wait(Ack, WPid), putr(?DWA_KEY, dwa(LCaps)), - {M, Ref} = T, diameter_stats:reg(Ref), {[Cs,Ds], Rest} = proplists:split(Opts, [capabilities_cb, disconnect_cb]), putr(?CB_KEY, {Ref, [F || {_,F} <- Cs]}), @@ -180,7 +185,6 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, putr(?REF_KEY, Ref), putr(?SEQUENCE_KEY, Mask), putr(?RESTRICT_KEY, Nodes), - erlang:monitor(process, WPid), {TPid, Addrs} = start_transport(T, Rest, Svc), Tmo = proplists:get_value(capx_timeout, Opts, ?EVENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({invalid, {capx_timeout, Tmo}}), @@ -198,6 +202,16 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, %% watchdog start (start/2) succeeds regardless so as not to crash the %% service. +%% Wait for the caller to have a monitor to avoid a race with our +%% death. (Since the exit reason is used in diameter_service.) +wait(Ref, Pid) -> + receive + Ref -> + ok; + {'DOWN', _, process, Pid, _} = D -> + exit({shutdown, D}) + end. + start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) -> Addrs0 = LCaps#diameter_caps.host_ip_address, start_transport(Addrs0, {T, Opts, Svc}). @@ -304,8 +318,8 @@ terminate(_, _) -> code_change(_, State, _) -> {ok, State}. -%%% --------------------------------------------------------------------------- -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- putr(Key, Val) -> put({?MODULE, Key}, Val). diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index ece5f0e359..10ab246b88 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -62,11 +62,13 @@ restrict :: {diameter:restriction(), boolean()}, shutdown = false :: boolean()}). +%% --------------------------------------------------------------------------- %% start/2 %% %% Start a monitor before the watchdog is allowed to proceed to ensure %% that a failed capabilities exchange produces the desired exit %% reason. +%% --------------------------------------------------------------------------- -spec start(Type, {RecvData, [Opt], SvcName, SvcOpts, #diameter_service{}}) -> {reference(), pid()} @@ -77,12 +79,12 @@ SvcName :: diameter:service_name(). start({_,_} = Type, T) -> - Ref = make_ref(), - {ok, Pid} = diameter_watchdog_sup:start_child({Ref, {Type, self(), T}}), + Ack = make_ref(), + {ok, Pid} = diameter_watchdog_sup:start_child({Ack, Type, self(), T}), try {erlang:monitor(process, Pid), Pid} after - send(Pid, Ref) + send(Pid, Ack) end. start_link(T) -> @@ -101,22 +103,15 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({Ref, {_, Pid, _} = T}) -> - MRef = erlang:monitor(process, Pid), - receive - Ref -> - make_state(T); - {'DOWN', MRef, process, _, _} = D -> - exit({shutdown, D}) - end. - -make_state({T, Pid, {RecvData, - Opts, - SvcName, - SvcOpts, - #diameter_service{applications = Apps, - capabilities = Caps} - = Svc}}) -> +i({Ack, T, Pid, {RecvData, + Opts, + SvcName, + SvcOpts, + #diameter_service{applications = Apps, + capabilities = Caps} + = Svc}}) -> + erlang:monitor(process, Pid), + wait(Ack, Pid), random:seed(now()), putr(restart, {T, Opts, Svc}), %% save seeing it in trace putr(dwr, dwr(Caps)), %% @@ -124,9 +119,7 @@ make_state({T, Pid, {RecvData, Restrict = proplists:get_value(restrict_connections, SvcOpts), Nodes = restrict_nodes(Restrict), #watchdog{parent = Pid, - transport = monitor(diameter_peer_fsm:start(T, - Opts, - {Mask, Nodes, Svc})), + transport = start(T, Opts, Mask, Nodes, Svc), tw = proplists:get_value(watchdog_timer, Opts, ?DEFAULT_TW_INIT), @@ -134,6 +127,21 @@ make_state({T, Pid, {RecvData, sequence = Mask, restrict = {Restrict, lists:member(node(), Nodes)}}. +wait(Ref, Pid) -> + receive + Ref -> + ok; + {'DOWN', _, process, Pid, _} = D -> + exit({shutdown, D}) + end. + +%% start/5 + +start(T, Opts, Mask, Nodes, Svc) -> + {_MRef, Pid} + = diameter_peer_fsm:start(T, Opts, {Mask, Nodes, Svc}), + Pid. + %% handle_call/3 handle_call(_, _, State) -> @@ -234,9 +242,7 @@ transition(close, #watchdog{}) -> %% Service is asking for the peer to be taken down gracefully. transition({shutdown, Pid, _}, #watchdog{parent = Pid, - transport = undefined, - status = S}) -> - down = S, %% assert + transport = undefined}) -> stop; transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid, transport = TPid} @@ -312,9 +318,10 @@ transition({'DOWN', _, process, TPid, _Reason}, stop; transition({'DOWN', _, process, TPid, _Reason}, - #watchdog{transport = TPid} + #watchdog{transport = TPid, + status = T} = S) -> - set_watchdog(S#watchdog{status = down, + set_watchdog(S#watchdog{status = case T of initial -> T; _ -> down end, pending = false, transport = undefined}); @@ -337,10 +344,6 @@ transition({state, Pid}, #watchdog{status = S}) -> %% =========================================================================== -monitor(Pid) -> - erlang:monitor(process, Pid), - Pid. - putr(Key, Val) -> put({?MODULE, Key}, Val). @@ -600,7 +603,9 @@ timeout(#watchdog{status = reopen, %% process has died. We only need to handle state down since we start %% the first watchdog when transitioning out of initial. -timeout(#watchdog{status = down} = S) -> +timeout(#watchdog{status = T} = S) + when T == initial; + T == down -> restart(S). %% restart/1 @@ -628,9 +633,7 @@ restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid, = S) -> send(Pid, {reconnect, self()}), Nodes = restrict_nodes(R), - S#watchdog{transport = monitor(diameter_peer_fsm:start(T, - Opts, - {Mask, Nodes, Svc})), + S#watchdog{transport = start(T, Opts, Mask, Nodes, Svc), restrict = {R, lists:member(node(), Nodes)}}; %% No restriction on the number of connections to the same peer: just |