From 5027671e5d7871f565fd0ce4165ca0039661a79d Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 29 Jan 2013 14:30:59 +0100 Subject: Tweak (make) silent rules support --- lib/diameter/src/Makefile | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/Makefile b/lib/diameter/src/Makefile index a08c204a23..2b02f1de2b 100644 --- a/lib/diameter/src/Makefile +++ b/lib/diameter/src/Makefile @@ -119,7 +119,8 @@ ERL_COMPILE_FLAGS += \ # erl/hrl from dictionary file. gen/diameter_gen_%.erl gen/diameter_gen_%.hrl: dict/%.dia - $(dia_verbose)../bin/diameterc -o gen -i $(EBIN) $< + $(dia_verbose) \ + ../bin/diameterc -o gen -i $(EBIN) $< opt: $(TARGET_FILES) @@ -134,11 +135,13 @@ debug: # The dictionary parser. gen/$(DICT_YRL).erl: compiler/$(DICT_YRL).yrl - $(yecc_verbose)$(ERLC) -Werror -o $(@D) $< + $(yecc_verbose) \ + $(ERLC) -Werror -o $(@D) $< # Generate the app file. $(APP_TARGET): $(APP_SRC) ../vsn.mk modules.mk - $(gen_verbose)M=`echo $(notdir $(APP_MODULES)) | tr ' ' ,`; \ + $(gen_verbose) \ + M=`echo $(notdir $(APP_MODULES)) | tr ' ' ,`; \ R=`echo $(REGISTERED) | tr ' ' ,`; \ sed -e 's;%VSN%;$(VSN);' \ -e "s;%MODULES%;$$M;" \ @@ -146,7 +149,8 @@ $(APP_TARGET): $(APP_SRC) ../vsn.mk modules.mk $< > $@ $(APPUP_TARGET): $(APPUP_SRC) ../vsn.mk - $(vsn_verbose)sed -e 's;%VSN%;$(VSN);' $< > $@ + $(vsn_verbose) \ + sed -e 's;%VSN%;$(VSN);' $< > $@ app: $(APP_TARGET) $(APPUP_TARGET) dict: $(DICT_ERLS) @@ -262,7 +266,7 @@ depend: depend.mk # Generate dependencies makefile. depend.mk: depend.sed $(MODULES:%=%.erl) Makefile - $(gen_verbose)(for f in $(MODULES); do \ + (for f in $(MODULES); do \ (echo $$f; cat $$f.erl) | sed -f $<; \ done) \ > $@ -- cgit v1.2.3 From 74755d8d18aa1769840b0914fe73e9c5c4b3219b Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 24 Jan 2013 14:34:56 +0100 Subject: Remove upgrade code not needed after application restart Which will be the case with R16B in this case. --- lib/diameter/src/base/diameter_reg.erl | 9 +-------- lib/diameter/src/base/diameter_service.erl | 26 ++++---------------------- lib/diameter/src/base/diameter_watchdog.erl | 9 +-------- lib/diameter/src/transport/diameter_tcp.erl | 2 -- 4 files changed, 6 insertions(+), 40 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 619b12ecad..ac58e4bf5b 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2012. All Rights Reserved. +%% Copyright Ericsson AB 2010-2013. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in @@ -208,10 +208,6 @@ init(_) -> %% # handle_call/3 %% ---------------------------------------------------------- -handle_call(Req, From, S) - when not is_record(S, state) -> - handle_call(Req, From, upgrade(S)); - handle_call({add, Fun, Key, Pid}, _, S) -> B = Fun(?TABLE, {Key, Pid}), monitor(B andalso no_monitor(Pid), Pid), @@ -281,9 +277,6 @@ code_change(_OldVsn, State, _Extra) -> %% =========================================================================== -upgrade(S) -> - #state{} = list_to_tuple(tuple_to_list(S) ++ [[]]). - monitor(true, Pid) -> ets:insert(?TABLE, ?MONITOR(Pid, erlang:monitor(process, Pid))); monitor(false, _) -> diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index d2a416166f..4e6cb3cd79 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -151,7 +151,7 @@ ref :: match(reference()), %% key into diameter_config options :: match([diameter:transport_opt()]),%% from start_transport op_state = {?STATE_DOWN, ?WD_INITIAL} - :: match(op_state() | {op_state(), wd_state()}), + :: match({op_state(), wd_state()}), started = now(), %% at process start conn = false :: match(boolean() | pid())}). %% true at accepted, pid() at connection_up or reopen @@ -742,11 +742,8 @@ shutdown(Who, Reason, T) -> [], T)). -shutdown(conn = Who, #peer{op_state = {OS,_}} = P, Reason, Acc) -> - shutdown(Who, P#peer{op_state = OS}, Reason, Acc); - shutdown(conn, - #peer{pid = Pid, op_state = ?STATE_UP, conn = TPid}, + #peer{pid = Pid, op_state = {?STATE_UP, _}, conn = TPid}, Reason, Acc) -> TPid ! {shutdown, Pid, Reason}, @@ -962,14 +959,7 @@ accepted(Pid, _TPid, #state{peerT = PeerT} = S) -> fetch(Tid, Key) -> [T] = ets:lookup(Tid, Key), - case T of - #peer{op_state = ?STATE_UP} = P -> - P#peer{op_state = {?STATE_UP, ?WD_OKAY}}; - #peer{op_state = ?STATE_DOWN} = P -> - P#peer{op_state = {?STATE_DOWN, ?WD_DOWN}}; - _ -> - T - end. + T. %%% --------------------------------------------------------------------------- %%% # connection_up/3 @@ -3173,10 +3163,9 @@ peer_acc(ConnT, Acc, #peer{pid = Pid, type = Type, ref = Ref, options = Opts, - op_state = OS, + op_state = {_, WS}, started = T, conn = TPid}) -> - WS = wd_state(OS), dict:append(Ref, [{type, Type}, {options, Opts}, @@ -3210,13 +3199,6 @@ config_acc({Ref, T, Opts}, Dict) config_acc(_, Dict) -> Dict. -wd_state({_,S}) -> - S; -wd_state(?STATE_UP) -> - ?WD_OKAY; -wd_state(?STATE_DOWN) -> - ?WD_DOWN. - info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) -> [{peer, {Pid, T}}, {apps, SApps}, diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index a5429c967c..ad4a142f76 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -157,14 +157,7 @@ handle_info(T, #watchdog{} = State) -> ?LOG(stop, T), event(State, State#watchdog{status = down}), {stop, {shutdown, T}, State} - end; - -handle_info(T, S) -> - handle_info(T, upgrade(S)). - -upgrade(S) -> - #watchdog{} = list_to_tuple(tuple_to_list(S) - ++ [?NOMASK, {nodes, true}, false]). + end. event(#watchdog{status = T}, #watchdog{status = T}) -> ok; diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 7ec7b1c5e7..596e582ab0 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -368,8 +368,6 @@ handle_info(T, #monitor{} = S) -> %% # code_change/3 %% --------------------------------------------------------------------------- -code_change(_, {transport, _, _, _, _} = S, _) -> - {ok, #transport{} = list_to_tuple(tuple_to_list(S) ++ [false])}; code_change(_, State, _) -> {ok, State}. -- cgit v1.2.3 From 34b008c323591b0c02f42287c86b9a788aab51ad Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 24 Jan 2013 16:16:05 +0100 Subject: Simplify transport shutdown Service process informs the watchdog process which informs the peer process. (Instead of going directly to the latter in one case.) --- lib/diameter/src/base/diameter_peer_fsm.erl | 2 -- lib/diameter/src/base/diameter_service.erl | 48 ++++++++++------------------- lib/diameter/src/base/diameter_watchdog.erl | 16 +++++----- 3 files changed, 24 insertions(+), 42 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 5dab6214b1..b515a599ed 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -393,8 +393,6 @@ transition({send, Msg}, #state{transport = TPid}) -> %% Request for graceful shutdown at remove_transport, stop_service of %% application shutdown. -transition({shutdown = T, Pid}, S) -> - transition({T, Pid, transport}, S); transition({shutdown, Pid, Reason}, #state{parent = Pid, dpr = false} = S) -> dpr(Reason, S); transition({shutdown, Pid, _}, #state{parent = Pid}) -> diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 4e6cb3cd79..d01a5e5ab6 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -716,46 +716,30 @@ mod_state(Alias, ModS) -> %%% # shutdown/2 %%% --------------------------------------------------------------------------- -%% remove_transport: ask watchdogs to terminate their transport. +%% remove_transport shutdown(Refs, #state{peerT = PeerT}) when is_list(Refs) -> - ets:foldl(fun(P,ok) -> sp(P, Refs), ok end, ok, PeerT); + ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, PeerT); -%% application/service shutdown: ask transports to terminate themselves. -shutdown(Reason, #state{peerT = PeerT}) -> - %% A transport might not be alive to receive the shutdown request - %% but give those that are a chance to shutdown gracefully. - shutdown(conn, Reason, PeerT), - %% Kill the watchdogs explicitly in case there was no transport. - shutdown(peer, Reason, PeerT). +%% application/service shutdown +shutdown(Reason, #state{peerT = PeerT}) + when Reason == application; + Reason == service -> + diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end, + [], + PeerT)). -%% sp/2 +%% st/2 -sp(#peer{ref = Ref, pid = Pid}, Refs) -> +st(#peer{ref = Ref, pid = Pid}, Refs) -> lists:member(Ref, Refs) - andalso (Pid ! {shutdown, self()}). %% 'DOWN' cleans up - -%% shutdown/3 - -shutdown(Who, Reason, T) -> - diameter_lib:wait(ets:foldl(fun(X,A) -> shutdown(Who, X, Reason, A) end, - [], - T)). + andalso (Pid ! {shutdown, self(), transport}). %% 'DOWN' cleans up -shutdown(conn, - #peer{pid = Pid, op_state = {?STATE_UP, _}, conn = TPid}, - Reason, - Acc) -> - TPid ! {shutdown, Pid, Reason}, - [TPid | Acc]; +%% st/3 -shutdown(peer, #peer{pid = Pid}, _Reason, Acc) - when is_pid(Pid) -> - exit(Pid, shutdown), - [Pid | Acc]; - -shutdown(_, #peer{}, _, Acc) -> - Acc. +st(#peer{pid = Pid}, Reason, Acc) -> + Pid ! {shutdown, self(), Reason}, + [Pid | Acc]. %%% --------------------------------------------------------------------------- %%% # call_service/2 diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index ad4a142f76..49b6776a84 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -208,15 +208,15 @@ transition(close, #watchdog{}) -> ok; %% Service is asking for the peer to be taken down gracefully. -transition({shutdown, Pid}, #watchdog{parent = Pid, - transport = undefined, - status = S}) -> - down = S, %% sanity check +transition({shutdown, Pid, _}, #watchdog{parent = Pid, + transport = undefined, + status = S}) -> + down = S, %% assert stop; -transition({shutdown = T, Pid}, #watchdog{parent = Pid, - transport = TPid} - = S) -> - TPid ! {T, self()}, +transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid, + transport = TPid} + = S) -> + TPid ! {T, self(), Reason}, S#watchdog{shutdown = true}; %% Parent process has died, -- cgit v1.2.3 From f115a9f7428abd12b8ec50d4cbeb654b3efa0eb1 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 25 Jan 2013 18:49:50 +0100 Subject: Simplify watchdog transitions in service process In particular, use watchdog messages as input and do away with the older connection_up/down (and other) messages. Also, only maintain the watchdog state, not the older up/down op state. --- lib/diameter/src/base/diameter_service.erl | 253 ++++++++++++---------------- lib/diameter/src/base/diameter_watchdog.erl | 76 +++------ 2 files changed, 137 insertions(+), 192 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index d01a5e5ab6..a0af92c2a2 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -64,15 +64,7 @@ -include_lib("diameter/include/diameter.hrl"). -include("diameter_internal.hrl"). -%% The states mirrored by peer_up/peer_down callbacks. --define(STATE_UP, up). --define(STATE_DOWN, down). - --type op_state() :: ?STATE_UP - | ?STATE_DOWN. - -%% The RFC 3539 watchdog states that are now maintained, albeit -%% along with the old up/down. okay = up, else down. +%% RFC 3539 watchdog states. -define(WD_INITIAL, initial). -define(WD_OKAY, okay). -define(WD_SUSPECT, suspect). @@ -150,11 +142,10 @@ type :: match(connect | accept), ref :: match(reference()), %% key into diameter_config options :: match([diameter:transport_opt()]),%% from start_transport - op_state = {?STATE_DOWN, ?WD_INITIAL} - :: match({op_state(), wd_state()}), + wd_state = ?WD_INITIAL :: match(wd_state()), started = now(), %% at process start conn = false :: match(boolean() | pid())}). - %% true at accepted, pid() at connection_up or reopen + %% true at accepted, pid() at okay/reopen %% Record representing a peer process as implemented by %% diameter_peer_fsm. The term "conn" is historical. Despite the name @@ -520,67 +511,26 @@ transition({accepted, Pid, TPid}, S) -> accepted(Pid, TPid, S), ok; -%% Peer process has a new open connection. -transition({connection_up, Pid, T}, S) -> - connection_up(Pid, T, S), - ok; - -%% Watchdog has a new connection that will be opened after DW[RA] -%% exchange. This message was added long after connection_up, to -%% communicate the information as soon as it's available. Leave -%% connection_up as is it for now, duplicated information and all. -transition({reopen, Pid, T}, S) -> - reopen(Pid, T, S), - ok; - -%% Watchdog has left state OKAY. -transition({connection_down, Pid}, S) -> - connection_down(Pid, S), - ok; - -%% Watchdog has returned to state OKAY. -transition({connection_up, Pid}, S) -> - connection_up(Pid, S), - ok; - -%% Accepting transport has lost connectivity. -transition({close, Pid}, S) -> - close(Pid, S), - ok; - %% Connecting transport is being restarted by watchdog. transition({reconnect, Pid}, S) -> reconnect(Pid, S), ok; -%% Watchdog is sending notification of a state transition. Note that -%% the connection_up/down messages pre-date this message and are still -%% used. A watchdog message will follow these and communicate the same -%% state as was set in handling connection_up/down. -transition({watchdog, Pid, {TPid, From, To}}, #state{service_name = SvcName, - peerT = PeerT}) -> - #peer{ref = Ref, type = T, options = Opts, op_state = {OS,_}} +%% Watchdog is sending notification of a state transition. +transition({watchdog, Pid, {[TPid | Data], From, To}}, + #state{service_name = SvcName, + peerT = PeerT} + = S) -> + #peer{ref = Ref, type = T, options = Opts} = P = fetch(PeerT, Pid), - insert(PeerT, P#peer{op_state = {OS, To}}), + watchdog(TPid, Data, From, To, P, S), send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}), ok; %% Death of a watchdog process (#peer.pid) results in the removal of -%% it's peer and any associated conn record when 'DOWN' is received -%% (after this) but the states will be {?STATE_UP, ?WD_DOWN} for a -%% short time. (No real problem since ?WD_* is only used in -%% service_info.) We set ?WD_OKAY as a consequence of connection_up -%% since we know a watchdog is coming. We can't set anything at -%% connection_down since we don't know if the subsequent watchdog -%% message will be ?WD_DOWN or ?WD_SUSPECT. We don't (yet) set -%% ?STATE_* as a consequence of a watchdog message since this requires -%% changing some of the matching on ?STATE_*. -%% +%% it's peer and any associated conn record when 'DOWN' is received. %% Death of a peer process process (#conn.pid, #peer.conn) results in -%% connection_down followed by watchdog ?WD_DOWN. The latter doesn't -%% result in the conn record being deleted since 'DOWN' from death of -%% its watchdog doesn't (yet) deal with the record having been -%% removed. +%% ?WD_DOWN. %% Monitor process has died. Just die with a reason that tells %% diameter_config about the happening. If a cleaner shutdown is @@ -945,21 +895,53 @@ fetch(Tid, Key) -> [T] = ets:lookup(Tid, Key), T. +%%% --------------------------------------------------------------------------- +%%% # watchdog/6 +%%% --------------------------------------------------------------------------- + +%% Watchdog has a new open connection. +watchdog(TPid, [T], _, ?WD_OKAY, Peer, State) -> + connection_up({TPid, T}, Peer, State); + +%% Watchdog has a new connection that will be opened after DW[RA] +%% exchange. +watchdog(TPid, [T], _, ?WD_REOPEN, Peer, State) -> + reopen({TPid, T}, Peer, State); + +%% Watchdog has recovered a suspect connection. +watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Peer, State) -> + #peer{conn = TPid} = Peer, %% assert + connection_up(Peer, State); + +%% Watchdog has an unresponsive connection. +watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Peer, State) -> + #peer{conn = TPid} = Peer, %% assert + connection_down(Peer, To, State); + +%% Watchdog has lost its connection. +watchdog(TPid, [], _, ?WD_DOWN = To, Peer, #state{connT = ConnT} = S) -> + close(Peer, S), + connection_down(Peer, To, S), + ets:delete(ConnT, TPid); + +watchdog(_, [], _, _, _, _) -> + ok. + %%% --------------------------------------------------------------------------- %%% # connection_up/3 %%% --------------------------------------------------------------------------- %% Watchdog process has reached state OKAY. -connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT, - connT = ConnT} - = S) -> - P = fetch(PeerT, Pid), +connection_up({TPid, {Caps, SApps, Pkt}}, + #peer{pid = Pid} + = P, + #state{connT = ConnT} + = S) -> C = #conn{pid = TPid, apps = SApps, caps = Caps, peer = Pid}, - insert(ConnT, C), connection_up([Pkt], P#peer{conn = TPid}, C, S). @@ -967,56 +949,45 @@ connection_up(Pid, {TPid, {Caps, SApps, Pkt}}, #state{peerT = PeerT, %%% # reopen/3 %%% --------------------------------------------------------------------------- -%% Note that this connection_up/3 rewrites the same #conn{} now -%% written here. Both do so in case reopen has not happened in old -%% code. - -reopen(Pid, {TPid, {Caps, SApps, _Pkt}}, #state{peerT = PeerT, - connT = ConnT}) -> - P = fetch(PeerT, Pid), - C = #conn{pid = TPid, - apps = SApps, - caps = Caps, - peer = Pid}, - - insert(ConnT, C), - #peer{op_state = {?STATE_DOWN, _}} - = P, - insert(PeerT, P#peer{op_state = {?STATE_DOWN, ?WD_REOPEN}, +reopen({TPid, {Caps, SApps, _Pkt}}, + #peer{pid = Pid} + = P, + #state{peerT = PeerT, + connT = ConnT}) -> + insert(ConnT, #conn{pid = TPid, + apps = SApps, + caps = Caps, + peer = Pid}), + insert(PeerT, P#peer{wd_state = ?WD_REOPEN, conn = TPid}). %%% --------------------------------------------------------------------------- %%% # connection_up/2 %%% --------------------------------------------------------------------------- -%% Peer process has transitioned back into the open state. Note that there -%% has been no new capabilties exchange in this case. +%% Watchdog has recovered as suspect connection. Note that there has +%% been no new capabilties exchange in this case. -connection_up(Pid, #state{peerT = PeerT, - connT = ConnT} - = S) -> - #peer{conn = TPid} = P = fetch(PeerT, Pid), - C = fetch(ConnT, TPid), - connection_up([], P, C, S). +connection_up(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> + connection_up([], P, fetch(ConnT, TPid), S). %% connection_up/4 -connection_up(T, P, C, #state{peerT = PeerT, - local_peers = LDict, - service_name = SvcName, - service - = #diameter_service{applications = Apps}} - = S) -> - #peer{conn = TPid, op_state = {?STATE_DOWN, _}} - = P, - #conn{apps = SApps, caps = Caps} - = C, - - insert(PeerT, P#peer{op_state = {?STATE_UP, ?WD_OKAY}}), - +connection_up(Extra, + #peer{conn = TPid} + = P, + #conn{apps = SApps, caps = Caps} + = C, + #state{peerT = PeerT, + local_peers = LDict, + service_name = SvcName, + service + = #diameter_service{applications = Apps}} + = S) -> + insert(PeerT, P#peer{wd_state = ?WD_OKAY}), request_peer_up(TPid), insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - report_status(up, P, C, S, T). + report_status(up, P, C, S, Extra). insert_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps). @@ -1052,30 +1023,11 @@ peer_cb(MFA, Alias) -> end. %%% --------------------------------------------------------------------------- -%%% # connection_down/2 +%%% # connection_down/3 %%% --------------------------------------------------------------------------- -%% Watchdog has transitioned out of state OKAY. - -connection_down(Pid, #state{peerT = PeerT, - connT = ConnT} - = S) -> - #peer{op_state = {?STATE_UP, WS}, %% assert - conn = TPid} - = P - = fetch(PeerT, Pid), - - C = fetch(ConnT, TPid), - insert(PeerT, P#peer{op_state = {?STATE_DOWN, WS}}), - connection_down(P,C,S). - -%% connection_down/3 - -connection_down(#peer{op_state = {?STATE_DOWN, _}}, _, _) -> - ok; - -connection_down(#peer{conn = TPid, - op_state = {?STATE_UP, _}} +connection_down(#peer{wd_state = ?WD_OKAY, + conn = TPid} = P, #conn{caps = Caps, apps = SApps} @@ -1086,7 +1038,23 @@ connection_down(#peer{conn = TPid, = S) -> report_status(down, P, C, S, []), remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - request_peer_down(TPid, S). + request_peer_down(TPid, S); + +connection_down(#peer{}, #conn{}, _) -> + ok; + +connection_down(#peer{wd_state = WS, + conn = TPid} + = P, + To, + #state{peerT = PeerT, + connT = ConnT} + = S) + when is_atom(To) -> + insert(PeerT, P#peer{wd_state = To}), + ?WD_OKAY == WS + andalso + connection_down(P, fetch(ConnT, TPid), S). remove_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). @@ -1118,11 +1086,12 @@ peer_down(Pid, Reason, #state{peerT = PeerT} = S) -> %% Send an event at connection establishment failure. closed({shutdown, {close, _TPid, Reason}}, - #peer{op_state = {?STATE_DOWN, _}, + #peer{wd_state = WS, ref = Ref, type = Type, options = Opts}, - #state{service_name = SvcName}) -> + #state{service_name = SvcName}) + when WS /= ?WD_OKAY -> send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}); closed(_, _, _) -> ok. @@ -1134,9 +1103,8 @@ peer_down(#peer{conn = B}, _) %% ... or maybe it has. peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> - #conn{} = C = fetch(ConnT, TPid), - ets:delete_object(ConnT, C), - connection_down(P,C,S). + connection_down(P, ?WD_DOWN, S), + ets:delete(ConnT, TPid). %% restart/2 @@ -1222,14 +1190,13 @@ tc(false = No, _, _) -> %% removed %% the accepting watchdog upon reception of a CER from the previously %% connected peer, or us after reconnect_timer timeout. -close(Pid, #state{service_name = SvcName, - peerT = PeerT}) -> - #peer{pid = Pid, - type = accept, - ref = Ref, - options = Opts} - = fetch(PeerT, Pid), - +close(#peer{type = connect}, _) -> + ok; +close(#peer{type = accept, + pid = Pid, + ref = Ref, + options = Opts}, + #state{service_name = SvcName}) -> c(Pid, diameter_config:have_transport(SvcName, Ref), Opts). %% Tell watchdog to (maybe) die later ... @@ -1716,6 +1683,8 @@ have_request(Pkt, TPid) -> %% request_peer_up/1 +%% Insert an element that is used to detect whether or not there has +%% been a failover when inserting an outgoing request. request_peer_up(TPid) -> ets:insert(?REQUEST_TABLE, {TPid}). @@ -3147,7 +3116,7 @@ peer_acc(ConnT, Acc, #peer{pid = Pid, type = Type, ref = Ref, options = Opts, - op_state = {_, WS}, + wd_state = WS, started = T, conn = TPid}) -> dict:append(Ref, diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 49b6776a84..8d10755180 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -151,29 +151,45 @@ handle_info(T, #watchdog{} = State) -> ok -> {noreply, State}; #watchdog{} = S -> - event(State, S), + event(T, State, S), {noreply, S}; stop -> ?LOG(stop, T), - event(State, State#watchdog{status = down}), + event(T, State, State#watchdog{status = down}), {stop, {shutdown, T}, State} end. -event(#watchdog{status = T}, #watchdog{status = T}) -> +event(_, #watchdog{status = T}, #watchdog{status = T}) -> ok; -event(#watchdog{transport = undefined}, #watchdog{transport = undefined}) -> +event(_, #watchdog{transport = undefined}, #watchdog{transport = undefined}) -> ok; -event(#watchdog{status = From, transport = F, parent = Pid}, +event(Msg, + #watchdog{status = From, transport = F, parent = Pid}, #watchdog{status = To, transport = T}) -> - E = {tpid(F,T), From, To}, + TPid = tpid(F,T), + E = {[TPid | data(Msg, TPid, From, To)], From, To}, notify(Pid, E), ?LOG(transition, {self(), E}). +data(Msg, TPid, reopen, okay) -> + {recv, TPid, 'DWA', _Pkt} = Msg, %% assert + {TPid, T} = eraser(open), + [T]; + +data({open, TPid, _Hosts, T}, TPid, _From, To) + when To == okay; + To == reopen -> + [T]; + +data(_, _, _, _) -> + []. + tpid(_, Pid) when is_pid(Pid) -> Pid; + tpid(Pid, _) -> Pid. @@ -248,15 +264,13 @@ transition({close, TPid, _Reason}, #watchdog{transport = TPid}) -> %% know the identity of the peer (ie. now) that we know that we're in %% state down rather than initial. -transition({open, TPid, Hosts, T} = Open, +transition({open, TPid, Hosts, _} = Open, #watchdog{transport = TPid, status = initial, - parent = Pid, restrict = {_, R}} = S) -> case okay(getr(restart), Hosts, R) of okay -> - open(Pid, {TPid, T}), set_watchdog(S#watchdog{status = okay}); reopen -> transition(Open, S#watchdog{status = down}) @@ -267,17 +281,15 @@ transition({open, TPid, Hosts, T} = Open, %% SetWatchdog() %% Pending = TRUE REOPEN -transition({open = P, TPid, _Hosts, T}, +transition({open = Key, TPid, _Hosts, T}, #watchdog{transport = TPid, - parent = Pid, status = down} = S) -> %% Store the info we need to notify the parent to reopen the %% connection after the requisite DWA's are received, at which %% time we eraser(open). The reopen message is a later addition, %% to communicate the new capabilities as soon as they're known. - putr(P, {TPid, T}), - Pid ! {reopen, self(), {TPid, T}}, + putr(Key, {TPid, T}), set_watchdog(send_watchdog(S#watchdog{status = reopen, num_dwa = 0})); @@ -300,8 +312,6 @@ transition({'DOWN', _, process, TPid, _}, transition({'DOWN', _, process, TPid, _}, #watchdog{transport = TPid} = S) -> - failover(S), - close(S), set_watchdog(S#watchdog{status = down, pending = false, transport = undefined}); @@ -401,29 +411,6 @@ tw(T) tw({M,F,A}) -> apply(M,F,A). -%% open/2 - -open(Pid, {_,_} = T) -> - Pid ! {connection_up, self(), T}. - -%% failover/1 - -failover(#watchdog{status = okay, - parent = Pid}) -> - Pid ! {connection_down, self()}; - -failover(_) -> - ok. - -%% close/1 - -close(#watchdog{status = down}) -> - ok; - -close(#watchdog{parent = Pid}) -> - {{T, _}, _, _} = getr(restart), - T == accept andalso (Pid ! {close, self()}). - %% send_watchdog/1 send_watchdog(#watchdog{pending = false, @@ -511,12 +498,10 @@ rcv(_, #watchdog{status = okay} = S) -> %% SetWatchdog() OKAY rcv('DWA', #watchdog{status = suspect} = S) -> - failback(S), set_watchdog(S#watchdog{status = okay, pending = false}); rcv(_, #watchdog{status = suspect} = S) -> - failback(S), set_watchdog(S#watchdog{status = okay}); %% REOPEN Receive DWA & Pending = FALSE @@ -524,10 +509,8 @@ rcv(_, #watchdog{status = suspect} = S) -> %% Failback() OKAY rcv('DWA', #watchdog{status = reopen, - num_dwa = 2 = N, - parent = Pid} + num_dwa = 2 = N} = S) -> - open(Pid, eraser(open)), S#watchdog{status = okay, num_dwa = N+1, pending = false}; @@ -546,11 +529,6 @@ rcv('DWA', #watchdog{status = reopen, rcv(_, #watchdog{status = reopen} = S) -> throwaway(S). -%% failback/1 - -failback(#watchdog{parent = Pid}) -> - Pid ! {connection_up, self()}. - %% timeout/1 %% %% The caller sets the watchdog on the return value. @@ -575,7 +553,6 @@ timeout(#watchdog{status = T, timeout(#watchdog{status = okay, pending = true} = S) -> - failover(S), S#watchdog{status = suspect}; %% SUSPECT Timer expires CloseConnection() @@ -592,7 +569,6 @@ timeout(#watchdog{status = T, when T == suspect; T == reopen, P, N < 0 -> exit(TPid, {shutdown, watchdog_timeout}), - close(S), S#watchdog{status = down}; %% REOPEN Timer expires & NumDWA = -1 -- cgit v1.2.3 From 0a4f11378b9cd5ef3d8358205fc41eb485e7cab8 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 28 Jan 2013 09:40:41 +0100 Subject: Rename records peer/conn -> watchdog/peer in diameter_service That is, make the naming match that of the corresponding modules. This has long been fairly confusing. --- lib/diameter/src/base/diameter_service.erl | 331 ++++++++++++++--------------- 1 file changed, 163 insertions(+), 168 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index a0af92c2a2..efce8c8f10 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -114,8 +114,10 @@ {id = now(), service_name, %% as passed to start_service/2, key in ?STATE_TABLE service :: #diameter_service{}, - peerT = ets_new(peers) :: ets:tid(),%% #peer{} at start_fsm - connT = ets_new(conns) :: ets:tid(),%% #conn{} at connection_up/reopen + watchdogT = ets_new(watchdogs) %% #watchdog{} at start + :: ets:tid(), + peerT = ets_new(peers) %% #peer{pid = TPid} at okay/reopen + :: ets:tid(), shared_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] local_peers = ?Dict:new(), %% Alias -> [{TPid, Caps}, ...] monitor = false :: false | pid(), %% process to die with @@ -131,35 +133,26 @@ %% service record is used to determine whether or not we need to call %% the process for a pick_peer callback. -%% Record representing a watchdog process as implemented by -%% diameter_watchdog. The term "peer" here is historical, made -%% especially confusing by the fact that a peer_ref() in the -%% documentation is the key of a #conn{} record, not a #peer{} record. -%% The name is also unfortunate given the meaning of peer in the -%% Diameter sense. --record(peer, +%% Record representing an RFC 3539 watchdog process implemented by +%% diameter_watchdog. +-record(watchdog, {pid :: match(pid()), type :: match(connect | accept), ref :: match(reference()), %% key into diameter_config options :: match([diameter:transport_opt()]),%% from start_transport - wd_state = ?WD_INITIAL :: match(wd_state()), + state = ?WD_INITIAL :: match(wd_state()), started = now(), %% at process start - conn = false :: match(boolean() | pid())}). + peer = false :: match(boolean() | pid())}). %% true at accepted, pid() at okay/reopen -%% Record representing a peer process as implemented by -%% diameter_peer_fsm. The term "conn" is historical. Despite the name -%% here, comments refer to watchdog and peer processes, that are keys -%% in #peer{} and #conn{} records respectively. To add to the -%% confusion, a #request.transport is a peer process = key in a -%% #conn{} record. The actual transport process (that the peer process -%% knows about and that has a transport connection) isn't seen here. --record(conn, +%% Record representing an Peer State Machine processes implemented by +%% diameter_peer_fsm. +-record(peer, {pid :: pid(), apps :: [{0..16#FFFFFFFF, diameter:app_alias()}], %% {Id, Alias} caps :: #diameter_caps{}, started = now(), %% at process start - peer :: pid()}). %% key into peerT + watchdog :: pid()}). %% key into watchdogT %% Record stored in diameter_request for each outgoing request. -record(request, @@ -519,17 +512,17 @@ transition({reconnect, Pid}, S) -> %% Watchdog is sending notification of a state transition. transition({watchdog, Pid, {[TPid | Data], From, To}}, #state{service_name = SvcName, - peerT = PeerT} + watchdogT = WatchdogT} = S) -> - #peer{ref = Ref, type = T, options = Opts} - = P - = fetch(PeerT, Pid), - watchdog(TPid, Data, From, To, P, S), + #watchdog{ref = Ref, type = T, options = Opts} + = Wd + = fetch(WatchdogT, Pid), + watchdog(TPid, Data, From, To, Wd, S), send_event(SvcName, {watchdog, Ref, TPid, {From, To}, {T, Opts}}), ok; -%% Death of a watchdog process (#peer.pid) results in the removal of +%% Death of a watchdog process (#watchdog.pid) results in the removal of %% it's peer and any associated conn record when 'DOWN' is received. -%% Death of a peer process process (#conn.pid, #peer.conn) results in +%% Death of a peer process process (#peer.pid, #watchdog.peer) results in %% ?WD_DOWN. %% Monitor process has died. Just die with a reason that tells @@ -667,27 +660,27 @@ mod_state(Alias, ModS) -> %%% --------------------------------------------------------------------------- %% remove_transport -shutdown(Refs, #state{peerT = PeerT}) +shutdown(Refs, #state{watchdogT = WatchdogT}) when is_list(Refs) -> - ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, PeerT); + ets:foldl(fun(P,ok) -> st(P, Refs), ok end, ok, WatchdogT); %% application/service shutdown -shutdown(Reason, #state{peerT = PeerT}) +shutdown(Reason, #state{watchdogT = WatchdogT}) when Reason == application; Reason == service -> diameter_lib:wait(ets:foldl(fun(P,A) -> st(P, Reason, A) end, [], - PeerT)). + WatchdogT)). %% st/2 -st(#peer{ref = Ref, pid = Pid}, Refs) -> +st(#watchdog{ref = Ref, pid = Pid}, Refs) -> lists:member(Ref, Refs) andalso (Pid ! {shutdown, self(), transport}). %% 'DOWN' cleans up %% st/3 -st(#peer{pid = Pid}, Reason, Acc) -> +st(#watchdog{pid = Pid}, Reason, Acc) -> Pid ! {shutdown, self(), Reason}, [Pid | Acc]. @@ -822,22 +815,22 @@ type(connect = T) -> T. %% start/4 -start(Ref, Type, Opts, #state{peerT = PeerT, - connT = ConnT, +start(Ref, Type, Opts, #state{watchdogT = WatchdogT, + peerT = PeerT, options = SvcOpts, service_name = SvcName, service = Svc}) when Type == connect; Type == accept -> - Pid = s(Type, Ref, {ConnT, + Pid = s(Type, Ref, {PeerT, Opts, SvcName, SvcOpts, merge_service(Opts, Svc)}), - insert(PeerT, #peer{pid = Pid, - type = Type, - ref = Ref, - options = Opts}), + insert(WatchdogT, #watchdog{pid = Pid, + type = Type, + ref = Ref, + options = Opts}), Pid. %% Note that the service record passed into the watchdog is the merged @@ -884,12 +877,12 @@ ms(_, Svc) -> %%% # accepted/3 %%% --------------------------------------------------------------------------- -accepted(Pid, _TPid, #state{peerT = PeerT} = S) -> - #peer{ref = Ref, type = accept = T, conn = false, options = Opts} - = P - = fetch(PeerT, Pid), - insert(PeerT, P#peer{conn = true}), %% mark replacement as started - start(Ref, T, Opts, S). %% start new watchdog +accepted(Pid, _TPid, #state{watchdogT = WatchdogT} = S) -> + #watchdog{ref = Ref, type = accept = T, peer = false, options = Opts} + = Wd + = fetch(WatchdogT, Pid), + insert(WatchdogT, Wd#watchdog{peer = true}),%% mark replacement as started + start(Ref, T, Opts, S). %% start new watchdog fetch(Tid, Key) -> [T] = ets:lookup(Tid, Key), @@ -900,29 +893,29 @@ fetch(Tid, Key) -> %%% --------------------------------------------------------------------------- %% Watchdog has a new open connection. -watchdog(TPid, [T], _, ?WD_OKAY, Peer, State) -> - connection_up({TPid, T}, Peer, State); +watchdog(TPid, [T], _, ?WD_OKAY, Wd, State) -> + connection_up({TPid, T}, Wd, State); %% Watchdog has a new connection that will be opened after DW[RA] %% exchange. -watchdog(TPid, [T], _, ?WD_REOPEN, Peer, State) -> - reopen({TPid, T}, Peer, State); +watchdog(TPid, [T], _, ?WD_REOPEN, Wd, State) -> + reopen({TPid, T}, Wd, State); %% Watchdog has recovered a suspect connection. -watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Peer, State) -> - #peer{conn = TPid} = Peer, %% assert - connection_up(Peer, State); +watchdog(TPid, [], ?WD_SUSPECT, ?WD_OKAY, Wd, State) -> + #watchdog{peer = TPid} = Wd, %% assert + connection_up(Wd, State); %% Watchdog has an unresponsive connection. -watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Peer, State) -> - #peer{conn = TPid} = Peer, %% assert - connection_down(Peer, To, State); +watchdog(TPid, [], ?WD_OKAY, ?WD_SUSPECT = To, Wd, State) -> + #watchdog{peer = TPid} = Wd, %% assert + connection_down(Wd, To, State); %% Watchdog has lost its connection. -watchdog(TPid, [], _, ?WD_DOWN = To, Peer, #state{connT = ConnT} = S) -> - close(Peer, S), - connection_down(Peer, To, S), - ets:delete(ConnT, TPid); +watchdog(TPid, [], _, ?WD_DOWN = To, Wd, #state{peerT = PeerT} = S) -> + close(Wd, S), + connection_down(Wd, To, S), + ets:delete(PeerT, TPid); watchdog(_, [], _, _, _, _) -> ok. @@ -934,32 +927,32 @@ watchdog(_, [], _, _, _, _) -> %% Watchdog process has reached state OKAY. connection_up({TPid, {Caps, SApps, Pkt}}, - #peer{pid = Pid} - = P, - #state{connT = ConnT} + #watchdog{pid = Pid} + = Wd, + #state{peerT = PeerT} = S) -> - C = #conn{pid = TPid, - apps = SApps, - caps = Caps, - peer = Pid}, - insert(ConnT, C), - connection_up([Pkt], P#peer{conn = TPid}, C, S). + Pr = #peer{pid = TPid, + apps = SApps, + caps = Caps, + watchdog = Pid}, + insert(PeerT, Pr), + connection_up([Pkt], Wd#watchdog{peer = TPid}, Pr, S). %%% --------------------------------------------------------------------------- %%% # reopen/3 %%% --------------------------------------------------------------------------- reopen({TPid, {Caps, SApps, _Pkt}}, - #peer{pid = Pid} - = P, - #state{peerT = PeerT, - connT = ConnT}) -> - insert(ConnT, #conn{pid = TPid, + #watchdog{pid = Pid} + = Wd, + #state{watchdogT = WatchdogT, + peerT = PeerT}) -> + insert(PeerT, #peer{pid = TPid, apps = SApps, caps = Caps, - peer = Pid}), - insert(PeerT, P#peer{wd_state = ?WD_REOPEN, - conn = TPid}). + watchdog = Pid}), + insert(WatchdogT, Wd#watchdog{state = ?WD_REOPEN, + peer = TPid}). %%% --------------------------------------------------------------------------- %%% # connection_up/2 @@ -968,26 +961,26 @@ reopen({TPid, {Caps, SApps, _Pkt}}, %% Watchdog has recovered as suspect connection. Note that there has %% been no new capabilties exchange in this case. -connection_up(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> - connection_up([], P, fetch(ConnT, TPid), S). +connection_up(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> + connection_up([], Wd, fetch(PeerT, TPid), S). %% connection_up/4 connection_up(Extra, - #peer{conn = TPid} - = P, - #conn{apps = SApps, caps = Caps} - = C, - #state{peerT = PeerT, + #watchdog{peer = TPid} + = Wd, + #peer{apps = SApps, caps = Caps} + = Pr, + #state{watchdogT = WatchdogT, local_peers = LDict, service_name = SvcName, service = #diameter_service{applications = Apps}} = S) -> - insert(PeerT, P#peer{wd_state = ?WD_OKAY}), + insert(WatchdogT, Wd#watchdog{state = ?WD_OKAY}), request_peer_up(TPid), insert_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), - report_status(up, P, C, S, Extra). + report_status(up, Wd, Pr, S, Extra). insert_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> ilp(A, T, D) end, LDict, SApps). @@ -1026,35 +1019,35 @@ peer_cb(MFA, Alias) -> %%% # connection_down/3 %%% --------------------------------------------------------------------------- -connection_down(#peer{wd_state = ?WD_OKAY, - conn = TPid} - = P, - #conn{caps = Caps, +connection_down(#watchdog{state = ?WD_OKAY, + peer = TPid} + = Wd, + #peer{caps = Caps, apps = SApps} - = C, + = Pr, #state{service_name = SvcName, service = #diameter_service{applications = Apps}, local_peers = LDict} = S) -> - report_status(down, P, C, S, []), + report_status(down, Wd, Pr, S, []), remove_local_peer(SApps, {{TPid, Caps}, {SvcName, Apps}}, LDict), request_peer_down(TPid, S); -connection_down(#peer{}, #conn{}, _) -> +connection_down(#watchdog{}, #peer{}, _) -> ok; -connection_down(#peer{wd_state = WS, - conn = TPid} - = P, +connection_down(#watchdog{state = WS, + peer = TPid} + = Wd, To, - #state{peerT = PeerT, - connT = ConnT} + #state{watchdogT = WatchdogT, + peerT = PeerT} = S) when is_atom(To) -> - insert(PeerT, P#peer{wd_state = To}), + insert(WatchdogT, Wd#watchdog{state = To}), ?WD_OKAY == WS andalso - connection_down(P, fetch(ConnT, TPid), S). + connection_down(Wd, fetch(PeerT, TPid), S). remove_local_peer(SApps, T, LDict) -> lists:foldl(fun(A,D) -> rlp(A, T, D) end, LDict, SApps). @@ -1077,19 +1070,19 @@ down_conn(Id, Alias, TC, {SvcName, Apps}) -> %% Watchdog process has died. -peer_down(Pid, Reason, #state{peerT = PeerT} = S) -> - P = fetch(PeerT, Pid), - ets:delete_object(PeerT, P), +peer_down(Pid, Reason, #state{watchdogT = WatchdogT} = S) -> + P = fetch(WatchdogT, Pid), + ets:delete_object(WatchdogT, P), closed(Reason, P, S), restart(P,S), peer_down(P,S). %% Send an event at connection establishment failure. closed({shutdown, {close, _TPid, Reason}}, - #peer{wd_state = WS, - ref = Ref, - type = Type, - options = Opts}, + #watchdog{state = WS, + ref = Ref, + type = Type, + options = Opts}, #state{service_name = SvcName}) when WS /= ?WD_OKAY -> send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}); @@ -1097,14 +1090,14 @@ closed(_, _, _) -> ok. %% The watchdog has never reached OKAY ... -peer_down(#peer{conn = B}, _) +peer_down(#watchdog{peer = B}, _) when is_boolean(B) -> ok; %% ... or maybe it has. -peer_down(#peer{conn = TPid} = P, #state{connT = ConnT} = S) -> - connection_down(P, ?WD_DOWN, S), - ets:delete(ConnT, TPid). +peer_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> + connection_down(Wd, ?WD_DOWN, S), + ets:delete(PeerT, TPid). %% restart/2 @@ -1114,22 +1107,22 @@ restart(P,S) -> %% restart/1 %% Always try to reconnect. -restart(#peer{ref = Ref, - type = connect = T, - options = Opts, - started = Time}) -> +restart(#watchdog{ref = Ref, + type = connect = T, + options = Opts, + started = Time}) -> {Time, {Ref, T, Opts}}; %% Transport connection hasn't yet been accepted ... -restart(#peer{ref = Ref, - type = accept = T, - options = Opts, - conn = false, - started = Time}) -> +restart(#watchdog{ref = Ref, + type = accept = T, + options = Opts, + peer = false, + started = Time}) -> {Time, {Ref, T, Opts}}; %% ... or it has: a replacement has already been spawned. -restart(#peer{type = accept}) -> +restart(#watchdog{type = accept}) -> false. %% q_restart/2 @@ -1190,12 +1183,12 @@ tc(false = No, _, _) -> %% removed %% the accepting watchdog upon reception of a CER from the previously %% connected peer, or us after reconnect_timer timeout. -close(#peer{type = connect}, _) -> +close(#watchdog{type = connect}, _) -> ok; -close(#peer{type = accept, - pid = Pid, - ref = Ref, - options = Opts}, +close(#watchdog{type = accept, + pid = Pid, + ref = Ref, + options = Opts}, #state{service_name = SvcName}) -> c(Pid, diameter_config:have_transport(SvcName, Ref), Opts). @@ -1219,11 +1212,11 @@ c(Pid, false, _Opts) -> %%% --------------------------------------------------------------------------- reconnect(Pid, #state{service_name = SvcName, - peerT = PeerT}) -> - #peer{ref = Ref, - type = connect, - options = Opts} - = fetch(PeerT, Pid), + watchdogT = WatchdogT}) -> + #watchdog{ref = Ref, + type = connect, + options = Opts} + = fetch(WatchdogT, Pid), send_event(SvcName, {reconnect, Ref, Opts}). %%% --------------------------------------------------------------------------- @@ -1703,8 +1696,8 @@ request_peer_down(TPid, S) -> %%% recv_request/3 %%% --------------------------------------------------------------------------- -recv_request(TPid, Pkt, {ConnT, SvcName, Apps, Mask}) -> - try ets:lookup(ConnT, TPid) of +recv_request(TPid, Pkt, {PeerT, SvcName, Apps, Mask}) -> + try ets:lookup(PeerT, TPid) of [C] -> recv_request(C, TPid, Pkt, SvcName, Apps, Mask); [] -> %% transport has gone down @@ -1716,7 +1709,7 @@ recv_request(TPid, Pkt, {ConnT, SvcName, Apps, Mask}) -> %% recv_request/5 -recv_request(#conn{apps = SApps, caps = Caps}, +recv_request(#peer{apps = SApps, caps = Caps}, TPid, Pkt, SvcName, @@ -2536,11 +2529,11 @@ rt(#request{packet = #diameter_packet{msg = Msg}, %%% --------------------------------------------------------------------------- report_status(Status, - #peer{ref = Ref, - conn = TPid, - type = Type, - options = Opts}, - #conn{apps = [_|_] = As, + #watchdog{ref = Ref, + peer = TPid, + type = Type, + options = Opts}, + #peer{apps = [_|_] = As, caps = Caps}, #state{service_name = SvcName} = S, @@ -2909,8 +2902,8 @@ eq(Any, Id, PeerId) -> %% transports/1 -transports(#state{peerT = PeerT}) -> - ets:select(PeerT, [{#peer{conn = '$1', _ = '_'}, +transports(#state{watchdogT = WatchdogT}) -> + ets:select(WatchdogT, [{#watchdog{peer = '$1', _ = '_'}, [{'is_pid', '$1'}], ['$1']}]). @@ -2964,11 +2957,12 @@ tagged_info(Item, S) undefined end; -tagged_info(TPid, #state{peerT = PT, connT = CT}) +tagged_info(TPid, #state{watchdogT = WatchdogT, peerT = PeerT}) when is_pid(TPid) -> try - [#conn{peer = Pid}] = ets:lookup(CT, TPid), - [#peer{ref = Ref, type = Type, options = Opts}] = ets:lookup(PT, Pid), + [#peer{watchdog = Pid}] = ets:lookup(PeerT, TPid), + [#watchdog{ref = Ref, type = Type, options = Opts}] + = ets:lookup(WatchdogT, Pid), [{ref, Ref}, {type, Type}, {options, Opts}] @@ -3051,11 +3045,11 @@ complete(Pre) -> %% info_stats/1 -info_stats(#state{peerT = PeerT}) -> - MatchSpec = [{#peer{ref = '$1', conn = '$2', _ = '_'}, +info_stats(#state{watchdogT = WatchdogT}) -> + MatchSpec = [{#watchdog{ref = '$1', peer = '$2', _ = '_'}, [{'is_pid', '$2'}], [['$1', '$2']]}], - try ets:select(PeerT, MatchSpec) of + try ets:select(WatchdogT, MatchSpec) of L -> diameter_stats:read(lists:append(L)) catch @@ -3065,7 +3059,8 @@ info_stats(#state{peerT = PeerT}) -> %% info_transport/1 %% %% One entry per configured transport. Statistics for each entry are -%% the accumulated values for the ref and associated peer pids. +%% the accumulated values for the ref and associated watchdog/peer +%% pids. info_transport(S) -> PeerD = peer_dict(S, config_dict(S)), @@ -3098,42 +3093,42 @@ transport([[_,_] | L]) -> %% Possibly many peer entries for a listening transport. Note that all %% have the same options by construction, which is not terribly space -%% efficient. (TODO: all entries for the same Ref should share options.) +%% efficient. transport([[{type, accept}, {options, Opts} | _] | _] = Ls) -> [{type, listen}, {options, Opts}, {accept, [lists:nthtail(2,L) || L <- Ls]}]. -peer_dict(#state{peerT = PeerT, connT = ConnT}, Dict0) -> - try ets:tab2list(PeerT) of +peer_dict(#state{watchdogT = WatchdogT, peerT = PeerT}, Dict0) -> + try ets:tab2list(WatchdogT) of L -> - lists:foldl(fun(T,A) -> peer_acc(ConnT, A, T) end, Dict0, L) + lists:foldl(fun(T,A) -> peer_acc(PeerT, A, T) end, Dict0, L) catch error: badarg -> Dict0 %% service has gone down end. -peer_acc(ConnT, Acc, #peer{pid = Pid, - type = Type, - ref = Ref, - options = Opts, - wd_state = WS, - started = T, - conn = TPid}) -> +peer_acc(PeerT, Acc, #watchdog{pid = Pid, + type = Type, + ref = Ref, + options = Opts, + state = WS, + started = At, + peer = TPid}) -> dict:append(Ref, [{type, Type}, {options, Opts}, - {watchdog, {Pid, T, WS}} - | info_conn(ConnT, TPid, WS /= ?WD_DOWN)], + {watchdog, {Pid, At, WS}} + | info_peer(PeerT, TPid, WS)], Acc). -info_conn(ConnT, TPid, true) - when is_pid(TPid) -> - try ets:lookup(ConnT, TPid) of - T -> info_conn(T) +info_peer(PeerT, TPid, WS) + when is_pid(TPid), WS /= ?WD_DOWN -> + try ets:lookup(PeerT, TPid) of + T -> info_peer(T) catch error: badarg -> [] %% service has gone down end; -info_conn(_, _, _) -> +info_peer(_, _, _) -> []. %% The point of extracting the config here is so that 'transport' info @@ -3152,12 +3147,12 @@ config_acc({Ref, T, Opts}, Dict) config_acc(_, Dict) -> Dict. -info_conn([#conn{pid = Pid, apps = SApps, caps = Caps, started = T}]) -> +info_peer([#peer{pid = Pid, apps = SApps, caps = Caps, started = T}]) -> [{peer, {Pid, T}}, {apps, SApps}, {caps, info_caps(Caps)} | try [{port, info_port(Pid)}] catch _:_ -> [] end]; -info_conn([] = No) -> +info_peer([] = No) -> No. %% Extract information that the processes involved are expected to -- cgit v1.2.3 From 5b96ba5ec98c31b32ca5f30dc47b3f60421cea24 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 24 Jan 2013 17:46:54 +0100 Subject: Comment fixes --- lib/diameter/src/base/diameter_service.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index efce8c8f10..4d21d28512 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -711,8 +711,6 @@ cs(undefined, _) -> %%% --------------------------------------------------------------------------- %%% # i/1 -%%% -%%% Output: #state{} %%% --------------------------------------------------------------------------- %% Intialize the state of a service gen_server. @@ -890,6 +888,8 @@ fetch(Tid, Key) -> %%% --------------------------------------------------------------------------- %%% # watchdog/6 +%%% +%%% React to a watchdog state transition. %%% --------------------------------------------------------------------------- %% Watchdog has a new open connection. @@ -2612,7 +2612,7 @@ rpd(Pid, Alias, PDict) -> %%% --------------------------------------------------------------------------- %%% find_transport/[34] %%% -%%% Output: {TransportPid, #diameter_caps{}, #diameter_app{}} +%%% Return: {TransportPid, #diameter_caps{}, #diameter_app{}} %%% | false %%% | {error, Reason} %%% --------------------------------------------------------------------------- @@ -2750,7 +2750,7 @@ avp_decode(_, _, #diameter_avp{value = V}) -> %%% --------------------------------------------------------------------------- %%% # pick_peer(App, [DestRealm, DestHost], Filter, #state{}) %%% -%%% Output: {TransportPid, #diameter_caps{}, App} +%%% Return: {TransportPid, #diameter_caps{}, App} %%% | false %%% | {error, Reason} %%% --------------------------------------------------------------------------- -- cgit v1.2.3 From 32a34109c6547c16a787e1788954b4dc2736976c Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 28 Jan 2013 18:29:43 +0100 Subject: Fix faulty watchdog transition DOWN -> INITIAL This was the result of the watchdog process exiting as a consequence of peer death in some casesi, causing a restarted transport to enter INITIAL when it should enter REOPEN. The watchdog now remains alive as long as peer shutdown isn't requested and a 'close' message to the service process (instead of watchdog death) generates 'closed' events from the service. --- lib/diameter/src/base/diameter_peer_fsm.erl | 57 ++++++++--------------------- lib/diameter/src/base/diameter_service.erl | 52 +++++++++++++------------- lib/diameter/src/base/diameter_watchdog.erl | 48 ++++++++++++------------ 3 files changed, 65 insertions(+), 92 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index b515a599ed..d593d0ab84 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -126,12 +126,7 @@ %% State Machine rather than closer to the transport. This is what we %% now do below: connect/accept call diameter_watchdog and return the %% pid of the watchdog process, and the watchdog in turn calls start/3 -%% below to start the process implementing the Peer State Machine. The -%% former is a "peer" in diameter_service while the latter is a -%% "conn". In a sense, diameter_service sees the watchdog as -%% implementing the Peer State Machine and the process implemented -%% here as being the transport, not being aware of the watchdog at -%% all. +%% below to start the process implementing the Peer State Machine. %% %%% --------------------------------------------------------------------------- @@ -274,13 +269,12 @@ handle_info(T, #state{} = State) -> {noreply, S}; {stop, Reason} -> ?LOG(stop, Reason), - x(Reason, State); + {stop, {shutdown, Reason}, State}; stop -> ?LOG(stop, T), - x(T, State) + {stop, {shutdown, T}, State} catch exit: {diameter_codec, encode, _} = Reason -> - close_wd(Reason, State#state.parent), ?LOG(stop, Reason), %% diameter_codec:encode/2 emits an error report. Only %% indicate the probable reason here. @@ -300,10 +294,6 @@ handle_info(T, #state{} = State) -> %% succesfully encoded. It's not checked at diameter:add_transport/2 %% since this can be called before creating the service. -x(Reason, #state{} = S) -> - close_wd(Reason, S), - {stop, {shutdown, Reason}, S}. - %% terminate/2 terminate(_, _) -> @@ -378,9 +368,8 @@ transition({diameter, {recv, Pkt}}, S) -> recv(Pkt, S); %% Timeout when still in the same state ... -transition({timeout = T, PS}, #state{state = PS} = S) -> - close({capx(PS), T}, S), - stop; +transition({timeout = T, PS}, #state{state = PS}) -> + {stop, {capx(PS), T}}; %% ... or not. transition({timeout, _}, _) -> @@ -457,7 +446,7 @@ send_CER(#state{state = {'Wait-Conn-Ack', Tmo}, OH = LCaps#diameter_caps.origin_host, req_send_CER(OH, Remote) orelse - close({already_connected, Remote, LCaps}, S), + close({already_connected, Remote, LCaps}), CER = build_CER(S), ?LOG(send, 'CER'), #diameter_packet{header = #diameter_header{end_to_end_id = Eid, @@ -690,17 +679,17 @@ cea(CEA, RC) -> CEA#diameter_base_CEA{'Result-Code' = RC}. post('CER' = T, RC, Pkt, S) -> - [fun close/2, {T, caps(S), {RC, Pkt}}]; + [fun(_) -> close({T, caps(S), {RC, Pkt}}) end]; post(_, _, _, _) -> ok. rejected({capabilities_cb, _F, Reason}, T, S) -> rejected(Reason, T, S); -rejected(discard, T, S) -> - close(T, S); +rejected(discard, T, _) -> + close(T); rejected({N, Es}, T, S) -> - {answer('CER', N, Es, S), [fun close/2, T]}; + {answer('CER', N, Es, S), [fun(_) -> close(T) end]}; rejected(N, T, S) -> rejected({N, []}, T, S). @@ -860,7 +849,7 @@ handle_CEA(#diameter_packet{bin = Bin} of _ -> open(DPkt, SApps, Caps, {connect, hd([_] = IS)}, S) catch - ?FAILURE(Reason) -> close({'CEA', Reason, Caps, DPkt}, S) + ?FAILURE(Reason) -> close({'CEA', Reason, Caps, DPkt}) end. %% Check more than the result code since the peer could send success %% regardless. If not 2001 then a peer_up callback could do anything @@ -880,7 +869,7 @@ recv_CEA(#diameter_packet{header = #diameter_header{version T; recv_CEA(Pkt, S) -> - close({'CEA', caps(S), Pkt}, S). + close({'CEA', caps(S), Pkt}). caps(#diameter_service{capabilities = Caps}) -> Caps; @@ -933,14 +922,14 @@ open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid, %% We've advertised TLS support: tell the transport the result %% and expect a reply when the handshake is complete. -tls_ack(true, Caps, Type, IS, #state{transport = TPid} = S) -> +tls_ack(true, Caps, Type, IS, #state{transport = TPid}) -> Ref = make_ref(), TPid ! {diameter, {tls, Ref, Type, IS == ?TLS}}, receive {diameter, {tls, Ref}} -> ok; {'DOWN', _, process, TPid, Reason} -> - close({tls_ack, Reason, Caps}, S) + close({tls_ack, Reason, Caps}) end; %% Or not. Don't send anything to the transport so that transports @@ -953,25 +942,11 @@ capz(#diameter_caps{} = L, #diameter_caps{} = R) -> = list_to_tuple([diameter_caps | lists:zip(tl(tuple_to_list(L)), tl(tuple_to_list(R)))]). -%% close/2 +%% close/1 -%% Tell the watchdog that our death isn't due to transport failure. -close(Reason, #state{parent = Pid}) -> - close_wd(Reason, Pid), +close(Reason) -> throw({?MODULE, close, Reason}). -%% close_wd/2 - -%% Ensure the watchdog dies if DPR has been sent ... -close_wd(_, #state{dpr = false}) -> - ok; -close_wd(Reason, #state{parent = Pid}) -> - close_wd(Reason, Pid); - -%% ... or otherwise -close_wd(Reason, Pid) -> - Pid ! {close, self(), Reason}. - %% dwa/1 dwa(#diameter_caps{origin_host = OH, diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index 4d21d28512..3cab914fdb 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -509,6 +509,19 @@ transition({reconnect, Pid}, S) -> reconnect(Pid, S), ok; +%% Watchdog is sending notification of transport death. +transition({close, Pid, Reason}, #state{service_name = SvcName, + watchdogT = WatchdogT}) -> + #watchdog{state = WS, + ref = Ref, + type = Type, + options = Opts} + = fetch(WatchdogT, Pid), + WS /= ?WD_OKAY + andalso + send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}), + ok; + %% Watchdog is sending notification of a state transition. transition({watchdog, Pid, {[TPid | Data], From, To}}, #state{service_name = SvcName, @@ -532,9 +545,9 @@ transition({'DOWN', MRef, process, _, Reason}, #state{monitor = MRef}) -> {stop, {monitor, Reason}}; %% Local watchdog process has died. -transition({'DOWN', _, process, Pid, Reason}, S) +transition({'DOWN', _, process, Pid, _Reason}, S) when node(Pid) == node() -> - peer_down(Pid, Reason, S), + watchdog_down(Pid, S), ok; %% Remote service wants to know about shared peers. @@ -1065,44 +1078,31 @@ down_conn(Id, Alias, TC, {SvcName, Apps}) -> peer_cb({ModX, peer_down, [SvcName, TC]}, Alias). %%% --------------------------------------------------------------------------- -%%% # peer_down/3 +%%% # watchdog_down/2 %%% --------------------------------------------------------------------------- %% Watchdog process has died. -peer_down(Pid, Reason, #state{watchdogT = WatchdogT} = S) -> - P = fetch(WatchdogT, Pid), - ets:delete_object(WatchdogT, P), - closed(Reason, P, S), - restart(P,S), - peer_down(P,S). - -%% Send an event at connection establishment failure. -closed({shutdown, {close, _TPid, Reason}}, - #watchdog{state = WS, - ref = Ref, - type = Type, - options = Opts}, - #state{service_name = SvcName}) - when WS /= ?WD_OKAY -> - send_event(SvcName, {closed, Ref, Reason, {type(Type), Opts}}); -closed(_, _, _) -> - ok. +watchdog_down(Pid, #state{watchdogT = WatchdogT} = S) -> + Wd = fetch(WatchdogT, Pid), + ets:delete_object(WatchdogT, Wd), + restart(Wd,S), + wd_down(Wd,S). -%% The watchdog has never reached OKAY ... -peer_down(#watchdog{peer = B}, _) +%% Watchdog has never reached OKAY ... +wd_down(#watchdog{peer = B}, _) when is_boolean(B) -> ok; %% ... or maybe it has. -peer_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> +wd_down(#watchdog{peer = TPid} = Wd, #state{peerT = PeerT} = S) -> connection_down(Wd, ?WD_DOWN, S), ets:delete(PeerT, TPid). %% restart/2 -restart(P,S) -> - q_restart(restart(P), S). +restart(Wd, S) -> + q_restart(restart(Wd), S). %% restart/1 diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 8d10755180..ece5f0e359 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -82,7 +82,7 @@ start({_,_} = Type, T) -> try {erlang:monitor(process, Pid), Pid} after - Pid ! Ref + send(Pid, Ref) end. start_link(T) -> @@ -151,7 +151,8 @@ handle_info(T, #watchdog{} = State) -> ok -> {noreply, State}; #watchdog{} = S -> - event(T, State, S), + close(T, State), %% service expects 'close' message + event(T, State, S), %% before 'watchdog' {noreply, S}; stop -> ?LOG(stop, T), @@ -159,6 +160,14 @@ handle_info(T, #watchdog{} = State) -> {stop, {shutdown, T}, State} end. +close({'DOWN', _, process, TPid, {shutdown, Reason}}, + #watchdog{transport = TPid, + parent = Pid}) -> + send(Pid, {close, self(), Reason}); + +close(_, _) -> + ok. + event(_, #watchdog{status = T}, #watchdog{status = T}) -> ok; @@ -170,7 +179,7 @@ event(Msg, #watchdog{status = To, transport = T}) -> TPid = tpid(F,T), E = {[TPid | data(Msg, TPid, From, To)], From, To}, - notify(Pid, E), + send(Pid, {watchdog, self(), E}), ?LOG(transition, {self(), E}). data(Msg, TPid, reopen, okay) -> @@ -193,8 +202,8 @@ tpid(_, Pid) tpid(Pid, _) -> Pid. -notify(Pid, E) -> - Pid ! {watchdog, self(), E}. +send(Pid, T) -> + Pid ! T. %% terminate/2 @@ -232,7 +241,7 @@ transition({shutdown, Pid, _}, #watchdog{parent = Pid, transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid, transport = TPid} = S) -> - TPid ! {T, self(), Reason}, + send(TPid, {T, self(), Reason}), S#watchdog{shutdown = true}; %% Parent process has died, @@ -243,13 +252,9 @@ transition({'DOWN', _, process, Pid, _Reason}, %% Transport has accepted a connection. transition({accepted = T, TPid}, #watchdog{transport = TPid, parent = Pid}) -> - Pid ! {T, self(), TPid}, + send(Pid, {T, self(), TPid}), ok; -%% Transport is telling us that its impending death isn't failure. -transition({close, TPid, _Reason}, #watchdog{transport = TPid}) -> - stop; - %% STATE Event Actions New State %% ===== ------ ------- ---------- %% INITIAL Connection up SetWatchdog() OKAY @@ -301,24 +306,17 @@ transition({open = Key, TPid, _Hosts, T}, %% REOPEN Connection down CloseConnection() %% SetWatchdog() DOWN -transition({'DOWN', _, process, TPid, _}, +transition({'DOWN', _, process, TPid, _Reason}, #watchdog{transport = TPid, - status = S, - shutdown = D}) - when S == initial; - D -> + shutdown = true}) -> stop; -transition({'DOWN', _, process, TPid, _}, +transition({'DOWN', _, process, TPid, _Reason}, #watchdog{transport = TPid} = S) -> set_watchdog(S#watchdog{status = down, pending = false, transport = undefined}); -%% Any outstanding pending (or other messages from the transport) will -%% have arrived before 'DOWN' since the message comes from the same -%% process. Note that we could also get this message in the initial -%% state. %% Incoming message. transition({recv, TPid, Name, Pkt}, #watchdog{transport = TPid} = S) -> @@ -334,7 +332,7 @@ transition({timeout, _, tw}, #watchdog{}) -> %% State query. transition({state, Pid}, #watchdog{status = S}) -> - Pid ! {self(), S}, + send(Pid, {self(), S}), ok. %% =========================================================================== @@ -389,7 +387,7 @@ okay([{_,P}]) -> %% ... or it has. okay(C) -> - [_|_] = [P ! close || {_,P} <- C, self() /= P], + [_|_] = [send(P, close) || {_,P} <- C, self() /= P], reopen. %% set_watchdog/1 @@ -417,7 +415,7 @@ send_watchdog(#watchdog{pending = false, transport = TPid, sequence = Mask} = S) -> - TPid ! {send, encode(getr(dwr), Mask)}, + send(TPid, {send, encode(getr(dwr), Mask)}), ?LOG(send, 'DWR'), S#watchdog{pending = true}. @@ -628,7 +626,7 @@ restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid, sequence = Mask, restrict = {R,_}} = S) -> - Pid ! {reconnect, self()}, + send(Pid, {reconnect, self()}), Nodes = restrict_nodes(R), S#watchdog{transport = monitor(diameter_peer_fsm:start(T, Opts, -- cgit v1.2.3 From ef5fddcb9f4a352253050251a7f145c69618e5d8 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 29 Jan 2013 18:11:34 +0100 Subject: Fix faulty watchdog transition INITIAL -> DOWN There is no such transition in RFC 3539, the state remains in INITIAL. --- lib/diameter/src/base/diameter_peer_fsm.erl | 52 ++++++++++++-------- lib/diameter/src/base/diameter_watchdog.erl | 73 +++++++++++++++-------------- 2 files changed, 71 insertions(+), 54 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index d593d0ab84..de341741db 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -129,16 +129,14 @@ %% below to start the process implementing the Peer State Machine. %% -%%% --------------------------------------------------------------------------- -%%% # start({connect|accept, Ref}, Opts, Service) -%%% -%%% Output: Pid -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% # start/3 +%% --------------------------------------------------------------------------- -spec start(T, [Opt], {diameter:sequence(), diameter:restriction(), #diameter_service{}}) - -> pid() + -> {reference(), pid()} when T :: {connect|accept, diameter:transport_ref()}, Opt :: diameter:transport_opt(). @@ -147,9 +145,15 @@ %% specified on the transport in question. Check here that the list is %% still non-empty. -start({_,_} = Type, Opts, MS) -> - {ok, Pid} = diameter_peer_fsm_sup:start_child({self(), Type, Opts, MS}), - Pid. +start({_,_} = Type, Opts, S) -> + Ack = make_ref(), + T = {Ack, self(), Type, Opts, S}, + {ok, Pid} = diameter_peer_fsm_sup:start_child(T), + try + {erlang:monitor(process, Pid), Pid} + after + Pid ! Ack + end. start_link(T) -> {ok, _} = proc_lib:start_link(?MODULE, @@ -158,8 +162,8 @@ start_link(T) -> infinity, diameter_lib:spawn_opts(server, [])). -%%% --------------------------------------------------------------------------- -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- %% init/1 @@ -167,12 +171,13 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, - capabilities = LCaps} - = Svc}}) -> - [] /= Apps orelse ?ERROR({no_apps, T, Opts}), +i({Ack, WPid, {M, Ref} = T, Opts, {Mask, + Nodes, + #diameter_service{capabilities = LCaps} + = Svc}}) -> + erlang:monitor(process, WPid), + wait(Ack, WPid), putr(?DWA_KEY, dwa(LCaps)), - {M, Ref} = T, diameter_stats:reg(Ref), {[Cs,Ds], Rest} = proplists:split(Opts, [capabilities_cb, disconnect_cb]), putr(?CB_KEY, {Ref, [F || {_,F} <- Cs]}), @@ -180,7 +185,6 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, putr(?REF_KEY, Ref), putr(?SEQUENCE_KEY, Mask), putr(?RESTRICT_KEY, Nodes), - erlang:monitor(process, WPid), {TPid, Addrs} = start_transport(T, Rest, Svc), Tmo = proplists:get_value(capx_timeout, Opts, ?EVENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({invalid, {capx_timeout, Tmo}}), @@ -198,6 +202,16 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, %% watchdog start (start/2) succeeds regardless so as not to crash the %% service. +%% Wait for the caller to have a monitor to avoid a race with our +%% death. (Since the exit reason is used in diameter_service.) +wait(Ref, Pid) -> + receive + Ref -> + ok; + {'DOWN', _, process, Pid, _} = D -> + exit({shutdown, D}) + end. + start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) -> Addrs0 = LCaps#diameter_caps.host_ip_address, start_transport(Addrs0, {T, Opts, Svc}). @@ -304,8 +318,8 @@ terminate(_, _) -> code_change(_, State, _) -> {ok, State}. -%%% --------------------------------------------------------------------------- -%%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- +%% --------------------------------------------------------------------------- putr(Key, Val) -> put({?MODULE, Key}, Val). diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index ece5f0e359..10ab246b88 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -62,11 +62,13 @@ restrict :: {diameter:restriction(), boolean()}, shutdown = false :: boolean()}). +%% --------------------------------------------------------------------------- %% start/2 %% %% Start a monitor before the watchdog is allowed to proceed to ensure %% that a failed capabilities exchange produces the desired exit %% reason. +%% --------------------------------------------------------------------------- -spec start(Type, {RecvData, [Opt], SvcName, SvcOpts, #diameter_service{}}) -> {reference(), pid()} @@ -77,12 +79,12 @@ SvcName :: diameter:service_name(). start({_,_} = Type, T) -> - Ref = make_ref(), - {ok, Pid} = diameter_watchdog_sup:start_child({Ref, {Type, self(), T}}), + Ack = make_ref(), + {ok, Pid} = diameter_watchdog_sup:start_child({Ack, Type, self(), T}), try {erlang:monitor(process, Pid), Pid} after - send(Pid, Ref) + send(Pid, Ack) end. start_link(T) -> @@ -101,22 +103,15 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({Ref, {_, Pid, _} = T}) -> - MRef = erlang:monitor(process, Pid), - receive - Ref -> - make_state(T); - {'DOWN', MRef, process, _, _} = D -> - exit({shutdown, D}) - end. - -make_state({T, Pid, {RecvData, - Opts, - SvcName, - SvcOpts, - #diameter_service{applications = Apps, - capabilities = Caps} - = Svc}}) -> +i({Ack, T, Pid, {RecvData, + Opts, + SvcName, + SvcOpts, + #diameter_service{applications = Apps, + capabilities = Caps} + = Svc}}) -> + erlang:monitor(process, Pid), + wait(Ack, Pid), random:seed(now()), putr(restart, {T, Opts, Svc}), %% save seeing it in trace putr(dwr, dwr(Caps)), %% @@ -124,9 +119,7 @@ make_state({T, Pid, {RecvData, Restrict = proplists:get_value(restrict_connections, SvcOpts), Nodes = restrict_nodes(Restrict), #watchdog{parent = Pid, - transport = monitor(diameter_peer_fsm:start(T, - Opts, - {Mask, Nodes, Svc})), + transport = start(T, Opts, Mask, Nodes, Svc), tw = proplists:get_value(watchdog_timer, Opts, ?DEFAULT_TW_INIT), @@ -134,6 +127,21 @@ make_state({T, Pid, {RecvData, sequence = Mask, restrict = {Restrict, lists:member(node(), Nodes)}}. +wait(Ref, Pid) -> + receive + Ref -> + ok; + {'DOWN', _, process, Pid, _} = D -> + exit({shutdown, D}) + end. + +%% start/5 + +start(T, Opts, Mask, Nodes, Svc) -> + {_MRef, Pid} + = diameter_peer_fsm:start(T, Opts, {Mask, Nodes, Svc}), + Pid. + %% handle_call/3 handle_call(_, _, State) -> @@ -234,9 +242,7 @@ transition(close, #watchdog{}) -> %% Service is asking for the peer to be taken down gracefully. transition({shutdown, Pid, _}, #watchdog{parent = Pid, - transport = undefined, - status = S}) -> - down = S, %% assert + transport = undefined}) -> stop; transition({shutdown = T, Pid, Reason}, #watchdog{parent = Pid, transport = TPid} @@ -312,9 +318,10 @@ transition({'DOWN', _, process, TPid, _Reason}, stop; transition({'DOWN', _, process, TPid, _Reason}, - #watchdog{transport = TPid} + #watchdog{transport = TPid, + status = T} = S) -> - set_watchdog(S#watchdog{status = down, + set_watchdog(S#watchdog{status = case T of initial -> T; _ -> down end, pending = false, transport = undefined}); @@ -337,10 +344,6 @@ transition({state, Pid}, #watchdog{status = S}) -> %% =========================================================================== -monitor(Pid) -> - erlang:monitor(process, Pid), - Pid. - putr(Key, Val) -> put({?MODULE, Key}, Val). @@ -600,7 +603,9 @@ timeout(#watchdog{status = reopen, %% process has died. We only need to handle state down since we start %% the first watchdog when transitioning out of initial. -timeout(#watchdog{status = down} = S) -> +timeout(#watchdog{status = T} = S) + when T == initial; + T == down -> restart(S). %% restart/1 @@ -628,9 +633,7 @@ restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid, = S) -> send(Pid, {reconnect, self()}), Nodes = restrict_nodes(R), - S#watchdog{transport = monitor(diameter_peer_fsm:start(T, - Opts, - {Mask, Nodes, Svc})), + S#watchdog{transport = start(T, Opts, Mask, Nodes, Svc), restrict = {R, lists:member(node(), Nodes)}}; %% No restriction on the number of connections to the same peer: just -- cgit v1.2.3