aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base/diameter_watchdog.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base/diameter_watchdog.erl')
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl63
1 files changed, 50 insertions, 13 deletions
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index 53f5f42396..b37a1a10e9 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -43,6 +43,7 @@
-include("diameter_internal.hrl").
-define(DEFAULT_TW_INIT, 30000). %% RFC 3539 ch 3.4.1
+-define(NOMASK, {0,32}). %% default sequence mask
-record(watchdog,
{%% PCB - Peer Control Block; see RFC 3539, Appendix A
@@ -56,7 +57,8 @@
parent = self() :: pid(),
transport :: pid() | undefined,
tref :: reference(), %% reference for current watchdog timer
- message_data}). %% term passed into diameter_service with message
+ message_data, %% term passed into diameter_service with message
+ sequence :: diameter:sequence()}). %% mask
%% start/2
%%
@@ -118,12 +120,20 @@ make_state({T, Pid, {RecvData,
random:seed(now()),
putr(restart, {T, Opts, Svc}), %% save seeing it in trace
putr(dwr, dwr(Caps)), %%
+ {_,_} = Mask = call(Pid, sequence),
#watchdog{parent = Pid,
- transport = monitor(diameter_peer_fsm:start(T, Opts, Svc)),
+ transport = monitor(diameter_peer_fsm:start(T,
+ Opts,
+ {Mask, Svc})),
tw = proplists:get_value(watchdog_timer,
Opts,
?DEFAULT_TW_INIT),
- message_data = {RecvData, SvcName, Apps}}.
+ message_data = {RecvData, SvcName, Apps, Mask},
+ sequence = Mask}.
+
+%% Retrieve the sequence mask from the parent from the parent, rather
+%% than having it passed into init/1, for upgrade reasons: the call to
+%% diameter_service:receive_message/3 passes back the mask.
%% handle_call/3
@@ -137,7 +147,7 @@ handle_cast(_, State) ->
%% handle_info/2
-handle_info(T, State) ->
+handle_info(T, #watchdog{} = State) ->
case transition(T, State) of
ok ->
{noreply, State};
@@ -148,7 +158,11 @@ handle_info(T, State) ->
?LOG(stop, T),
event(State, State#watchdog{status = down}),
{stop, {shutdown, T}, State}
- end.
+ end;
+
+handle_info(T, S) -> %% upgrade
+ handle_info(T, #watchdog{} = list_to_tuple(tuple_to_list(S)
+ ++ [?NOMASK])).
event(#watchdog{status = T}, #watchdog{status = T}) ->
ok;
@@ -258,12 +272,15 @@ transition({open, TPid, Hosts, T} = Open,
transition({open = P, TPid, _Hosts, T},
#watchdog{transport = TPid,
+ parent = Pid,
status = down}
= S) ->
%% Store the info we need to notify the parent to reopen the
%% connection after the requisite DWA's are received, at which
- %% time we eraser(open).
+ %% time we eraser(open). The reopen message is a later addition,
+ %% to communicate the new capabilities as soon as they're known.
putr(P, {TPid, T}),
+ Pid ! {reopen, self(), {TPid, T}},
set_watchdog(send_watchdog(S#watchdog{status = reopen,
num_dwa = 0}));
@@ -312,6 +329,15 @@ transition({state, Pid}, #watchdog{status = S}) ->
%% ===========================================================================
+%% Only call "upwards", to the parent service.
+call(Pid, Req) ->
+ try
+ gen_server:call(Pid, Req, infinity)
+ catch
+ exit: Reason ->
+ exit({shutdown, {Req, Reason}})
+ end.
+
monitor(Pid) ->
erlang:monitor(process, Pid),
Pid.
@@ -325,10 +351,16 @@ getr(Key) ->
eraser(Key) ->
erase({?MODULE, Key}).
-%% encode/1
+%% encode/2
-encode(Msg) ->
- #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Msg),
+encode(Msg, Mask) ->
+ Seq = diameter_session:sequence(Mask),
+ Hdr = #diameter_header{version = ?DIAMETER_VERSION,
+ end_to_end_id = Seq,
+ hop_by_hop_id = Seq},
+ Pkt = #diameter_packet{header = Hdr,
+ msg = Msg},
+ #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt),
Bin.
%% okay/2
@@ -400,9 +432,10 @@ close(#watchdog{parent = Pid}) ->
%% send_watchdog/1
send_watchdog(#watchdog{pending = false,
- transport = TPid}
+ transport = TPid,
+ sequence = Mask}
= S) ->
- TPid ! {send, encode(getr(dwr))},
+ TPid ! {send, encode(getr(dwr), Mask)},
?LOG(send, 'DWR'),
S#watchdog{pending = true}.
@@ -606,9 +639,13 @@ restart(S) ->
%% it's actually in state down rather then initial when receiving
%% notification of an open connection.
-restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid} = S) ->
+restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid,
+ sequence = Mask}
+ = S) ->
Pid ! {reconnect, self()},
- S#watchdog{transport = monitor(diameter_peer_fsm:start(T, Opts, Svc))};
+ S#watchdog{transport = monitor(diameter_peer_fsm:start(T,
+ Opts,
+ {Mask, Svc}))};
restart({{accept, _}, _, _}, S) ->
S.
%% Don't currently use Opts/Svc in the accept case but having them in