aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/base/diameter_peer_fsm.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/base/diameter_peer_fsm.erl')
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl155
1 files changed, 123 insertions, 32 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 99644814d2..bbda62c32b 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -54,6 +54,12 @@
-define(NO_INBAND_SECURITY, 0).
-define(TLS, 1).
+%% Keys in process dictionary.
+-define(CB_KEY, cb). %% capabilities callback
+-define(DWA_KEY, dwa). %% outgoing DWA
+-define(Q_KEY, q). %% transport start queue
+-define(START_KEY, start). %% start of connected transport
+
%% A 2xxx series Result-Code. Not necessarily 2001.
-define(IS_SUCCESS(N), 2 == (N) div 1000).
@@ -115,6 +121,11 @@
%%% Output: Pid
%%% ---------------------------------------------------------------------------
+-spec start(T, [Opt], #diameter_service{})
+ -> pid()
+ when T :: {connect|accept, diameter:transport_ref()},
+ Opt :: diameter:transport_opt().
+
%% diameter_config requires a non-empty list of applications on the
%% service but diameter_service then constrains the list to any
%% specified on the transport in question. Check here that the list is
@@ -143,18 +154,17 @@ init(T) ->
proc_lib:init_ack({ok, self()}),
gen_server:enter_loop(?MODULE, [], i(T)).
-i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc0}) ->
- putr(dwa, dwa(Caps)),
+i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc}) ->
+ putr(?DWA_KEY, dwa(Caps)),
{M, Ref} = T,
{[Ts], Rest} = proplists:split(Opts, [capabilities_cb]),
- putr(capabilities_cb, {Ref, [F || {_,F} <- Ts]}),
- {ok, TPid, Svc} = start_transport(T, Rest, Svc0),
- erlang:monitor(process, TPid),
+ putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}),
erlang:monitor(process, WPid),
+ {TPid, Addrs} = start_transport(T, Rest, Svc),
#state{parent = WPid,
transport = TPid,
mode = M,
- service = Svc}.
+ service = svc(Svc, Addrs)}.
%% The transport returns its local ip addresses so that different
%% transports on the same service can use different local addresses.
%% The local addresses are put into Host-IP-Address avps here when
@@ -164,18 +174,56 @@ i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc0}) ->
%% watchdog start (start/2) succeeds regardless so as not to crash the
%% service.
-start_transport(T, Opts, Svc) ->
- case diameter_peer:start(T, Opts, Svc) of
- {ok, TPid} ->
- {ok, TPid, Svc};
- {ok, TPid, [_|_] = Addrs} ->
- #diameter_service{capabilities = Caps0} = Svc,
- Caps = Caps0#diameter_caps{host_ip_address = Addrs},
- {ok, TPid, Svc#diameter_service{capabilities = Caps}};
+start_transport(T, Opts, #diameter_service{capabilities = Caps} = Svc) ->
+ Addrs0 = Caps#diameter_caps.host_ip_address,
+ start_transport(Addrs0, {T, Opts, Svc}).
+
+start_transport(Addrs0, T) ->
+ case diameter_peer:start(T) of
+ {TPid, Addrs, Tmo, Data} ->
+ erlang:monitor(process, TPid),
+ q_next(TPid, Addrs0, Tmo, Data),
+ {TPid, addrs(Addrs, Addrs0)};
No ->
exit({shutdown, No})
end.
+addrs([], Addrs0) ->
+ Addrs0;
+addrs(Addrs, _) ->
+ Addrs.
+
+svc(Svc, []) ->
+ Svc;
+svc(Svc, Addrs) ->
+ readdr(Svc, Addrs).
+
+readdr(#diameter_service{capabilities = Caps0} = Svc, Addrs) ->
+ Caps = Caps0#diameter_caps{host_ip_address = Addrs},
+ Svc#diameter_service{capabilities = Caps}.
+
+%% The 4-tuple Data returned from diameter_peer:start/1 identifies the
+%% transport module/config use to start the transport process in
+%% question as well as any alternates to try if a connection isn't
+%% established within Tmo.
+q_next(TPid, Addrs0, Tmo, {_,_,_,_} = Data) ->
+ send_after(Tmo, {connection_timeout, TPid}),
+ putr(?Q_KEY, {Addrs0, Tmo, Data}).
+
+%% Connection has been established: retain the started
+%% pid/module/config in the process dictionary. This is a part of the
+%% interface defined by this module, so that the transport pid can be
+%% found when constructing service_info (in order to extract further
+%% information from it).
+keep_transport(TPid) ->
+ {_, _, {{_,_,_} = T, _, _, _}} = eraser(?Q_KEY),
+ putr(?START_KEY, {TPid, T}).
+
+send_after(infinity, _) ->
+ ok;
+send_after(Tmo, T) ->
+ erlang:send_after(Tmo, self(), T).
+
%% handle_call/3
handle_call(_, _, State) ->
@@ -240,25 +288,48 @@ eraser(Key) ->
%% Connection to peer.
transition({diameter, {TPid, connected, Remote}},
- #state{state = PS,
+ #state{transport = TPid,
+ state = PS,
mode = M}
= S) ->
'Wait-Conn-Ack' = PS, %% assert
connect = M, %%
- send_CER(S#state{mode = {M, Remote},
- transport = TPid});
+ keep_transport(TPid),
+ send_CER(S#state{mode = {M, Remote}});
%% Connection from peer.
transition({diameter, {TPid, connected}},
- #state{state = PS,
+ #state{transport = TPid,
+ state = PS,
mode = M,
parent = Pid}
= S) ->
'Wait-Conn-Ack' = PS, %% assert
accept = M, %%
+ keep_transport(TPid),
Pid ! {accepted, self()},
- start_timer(S#state{state = recv_CER,
- transport = TPid});
+ start_timer(S#state{state = recv_CER});
+
+%% Connection established after receiving a connection_timeout
+%% message. This may be followed by an incoming message which arrived
+%% before the transport was killed and this can't be distinguished
+%% from one from the transport that's been started to replace it.
+transition({diameter, {_, connected}}, _) ->
+ {stop, connection_timeout};
+transition({diameter, {_, connected, _}}, _) ->
+ {stop, connection_timeout};
+
+%% Connection has timed out: start an alternate.
+transition({connection_timeout = T, TPid},
+ #state{transport = TPid,
+ state = 'Wait-Conn-Ack'}
+ = S) ->
+ exit(TPid, {shutdown, T}),
+ start_next(S);
+
+%% Connect timeout after connection or alternate start: ignore.
+transition({connection_timeout, _}, _) ->
+ ok;
%% Incoming message from the transport.
transition({diameter, {recv, Pkt}}, S) ->
@@ -305,14 +376,21 @@ transition({resolve_port, _Pid} = T, #state{transport = TPid}) ->
TPid ! T,
ok;
-%% Parent or transport has died.
-transition({'DOWN', _, process, P, _},
- #state{parent = Pid,
- transport = TPid})
- when P == Pid;
- P == TPid ->
+%% Parent has died.
+transition({'DOWN', _, process, WPid, _},
+ #state{parent = WPid}) ->
stop;
+%% Transport has died before connection timeout.
+transition({'DOWN', _, process, TPid, _},
+ #state{transport = TPid}
+ = S) ->
+ start_next(S);
+
+%% Transport has died after connection timeout.
+transition({'DOWN', _, process, _, _}, _) ->
+ ok;
+
%% State query.
transition({state, Pid}, #state{state = S, transport = TPid}) ->
Pid ! {self(), [S, TPid]},
@@ -320,6 +398,19 @@ transition({state, Pid}, #state{state = S, transport = TPid}) ->
%% Crash on anything unexpected.
+%% start_next/1
+
+start_next(#state{service = Svc0} = S) ->
+ case getr(?Q_KEY) of
+ {Addrs0, Tmo, Data} ->
+ Svc = readdr(Svc0, Addrs0),
+ {TPid, Addrs} = start_transport(Addrs0, {Svc, Tmo, Data}),
+ S#state{transport = TPid,
+ service = svc(Svc, Addrs)};
+ undefined ->
+ stop
+ end.
+
%% send_CER/1
send_CER(#state{mode = {connect, Remote},
@@ -649,7 +740,7 @@ rc([RC|_]) ->
%% answer/2
answer('DWR', _) ->
- getr(dwa);
+ getr(?DWA_KEY);
answer(Name, #state{service = #diameter_service{capabilities = Caps}}) ->
a(Name, Caps).
@@ -749,15 +840,15 @@ caps(#state{service = Svc}) ->
%% caps_cb/1
caps_cb(Caps) ->
- {Ref, Ts} = eraser(capabilities_cb),
- ccb(Ts, [Ref, Caps]).
+ {Ref, Ts} = eraser(?CB_KEY),
+ caps_cb(Ts, [Ref, Caps]).
-ccb([], _) ->
+caps_cb([], _) ->
ok;
-ccb([F | Rest], T) ->
+caps_cb([F | Rest], T) ->
case diameter_lib:eval([F|T]) of
ok ->
- ccb(Rest, T);
+ caps_cb(Rest, T);
N when ?IS_SUCCESS(N) -> %% 2xxx result code: accept immediately
N;
Res ->