diff options
Diffstat (limited to 'lib/diameter/src/base/diameter_peer_fsm.erl')
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 155 |
1 files changed, 123 insertions, 32 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 99644814d2..bbda62c32b 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -54,6 +54,12 @@ -define(NO_INBAND_SECURITY, 0). -define(TLS, 1). +%% Keys in process dictionary. +-define(CB_KEY, cb). %% capabilities callback +-define(DWA_KEY, dwa). %% outgoing DWA +-define(Q_KEY, q). %% transport start queue +-define(START_KEY, start). %% start of connected transport + %% A 2xxx series Result-Code. Not necessarily 2001. -define(IS_SUCCESS(N), 2 == (N) div 1000). @@ -115,6 +121,11 @@ %%% Output: Pid %%% --------------------------------------------------------------------------- +-spec start(T, [Opt], #diameter_service{}) + -> pid() + when T :: {connect|accept, diameter:transport_ref()}, + Opt :: diameter:transport_opt(). + %% diameter_config requires a non-empty list of applications on the %% service but diameter_service then constrains the list to any %% specified on the transport in question. Check here that the list is @@ -143,18 +154,17 @@ init(T) -> proc_lib:init_ack({ok, self()}), gen_server:enter_loop(?MODULE, [], i(T)). -i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc0}) -> - putr(dwa, dwa(Caps)), +i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc}) -> + putr(?DWA_KEY, dwa(Caps)), {M, Ref} = T, {[Ts], Rest} = proplists:split(Opts, [capabilities_cb]), - putr(capabilities_cb, {Ref, [F || {_,F} <- Ts]}), - {ok, TPid, Svc} = start_transport(T, Rest, Svc0), - erlang:monitor(process, TPid), + putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}), erlang:monitor(process, WPid), + {TPid, Addrs} = start_transport(T, Rest, Svc), #state{parent = WPid, transport = TPid, mode = M, - service = Svc}. + service = svc(Svc, Addrs)}. %% The transport returns its local ip addresses so that different %% transports on the same service can use different local addresses. %% The local addresses are put into Host-IP-Address avps here when @@ -164,18 +174,56 @@ i({WPid, T, Opts, #diameter_service{capabilities = Caps} = Svc0}) -> %% watchdog start (start/2) succeeds regardless so as not to crash the %% service. -start_transport(T, Opts, Svc) -> - case diameter_peer:start(T, Opts, Svc) of - {ok, TPid} -> - {ok, TPid, Svc}; - {ok, TPid, [_|_] = Addrs} -> - #diameter_service{capabilities = Caps0} = Svc, - Caps = Caps0#diameter_caps{host_ip_address = Addrs}, - {ok, TPid, Svc#diameter_service{capabilities = Caps}}; +start_transport(T, Opts, #diameter_service{capabilities = Caps} = Svc) -> + Addrs0 = Caps#diameter_caps.host_ip_address, + start_transport(Addrs0, {T, Opts, Svc}). + +start_transport(Addrs0, T) -> + case diameter_peer:start(T) of + {TPid, Addrs, Tmo, Data} -> + erlang:monitor(process, TPid), + q_next(TPid, Addrs0, Tmo, Data), + {TPid, addrs(Addrs, Addrs0)}; No -> exit({shutdown, No}) end. +addrs([], Addrs0) -> + Addrs0; +addrs(Addrs, _) -> + Addrs. + +svc(Svc, []) -> + Svc; +svc(Svc, Addrs) -> + readdr(Svc, Addrs). + +readdr(#diameter_service{capabilities = Caps0} = Svc, Addrs) -> + Caps = Caps0#diameter_caps{host_ip_address = Addrs}, + Svc#diameter_service{capabilities = Caps}. + +%% The 4-tuple Data returned from diameter_peer:start/1 identifies the +%% transport module/config use to start the transport process in +%% question as well as any alternates to try if a connection isn't +%% established within Tmo. +q_next(TPid, Addrs0, Tmo, {_,_,_,_} = Data) -> + send_after(Tmo, {connection_timeout, TPid}), + putr(?Q_KEY, {Addrs0, Tmo, Data}). + +%% Connection has been established: retain the started +%% pid/module/config in the process dictionary. This is a part of the +%% interface defined by this module, so that the transport pid can be +%% found when constructing service_info (in order to extract further +%% information from it). +keep_transport(TPid) -> + {_, _, {{_,_,_} = T, _, _, _}} = eraser(?Q_KEY), + putr(?START_KEY, {TPid, T}). + +send_after(infinity, _) -> + ok; +send_after(Tmo, T) -> + erlang:send_after(Tmo, self(), T). + %% handle_call/3 handle_call(_, _, State) -> @@ -240,25 +288,48 @@ eraser(Key) -> %% Connection to peer. transition({diameter, {TPid, connected, Remote}}, - #state{state = PS, + #state{transport = TPid, + state = PS, mode = M} = S) -> 'Wait-Conn-Ack' = PS, %% assert connect = M, %% - send_CER(S#state{mode = {M, Remote}, - transport = TPid}); + keep_transport(TPid), + send_CER(S#state{mode = {M, Remote}}); %% Connection from peer. transition({diameter, {TPid, connected}}, - #state{state = PS, + #state{transport = TPid, + state = PS, mode = M, parent = Pid} = S) -> 'Wait-Conn-Ack' = PS, %% assert accept = M, %% + keep_transport(TPid), Pid ! {accepted, self()}, - start_timer(S#state{state = recv_CER, - transport = TPid}); + start_timer(S#state{state = recv_CER}); + +%% Connection established after receiving a connection_timeout +%% message. This may be followed by an incoming message which arrived +%% before the transport was killed and this can't be distinguished +%% from one from the transport that's been started to replace it. +transition({diameter, {_, connected}}, _) -> + {stop, connection_timeout}; +transition({diameter, {_, connected, _}}, _) -> + {stop, connection_timeout}; + +%% Connection has timed out: start an alternate. +transition({connection_timeout = T, TPid}, + #state{transport = TPid, + state = 'Wait-Conn-Ack'} + = S) -> + exit(TPid, {shutdown, T}), + start_next(S); + +%% Connect timeout after connection or alternate start: ignore. +transition({connection_timeout, _}, _) -> + ok; %% Incoming message from the transport. transition({diameter, {recv, Pkt}}, S) -> @@ -305,14 +376,21 @@ transition({resolve_port, _Pid} = T, #state{transport = TPid}) -> TPid ! T, ok; -%% Parent or transport has died. -transition({'DOWN', _, process, P, _}, - #state{parent = Pid, - transport = TPid}) - when P == Pid; - P == TPid -> +%% Parent has died. +transition({'DOWN', _, process, WPid, _}, + #state{parent = WPid}) -> stop; +%% Transport has died before connection timeout. +transition({'DOWN', _, process, TPid, _}, + #state{transport = TPid} + = S) -> + start_next(S); + +%% Transport has died after connection timeout. +transition({'DOWN', _, process, _, _}, _) -> + ok; + %% State query. transition({state, Pid}, #state{state = S, transport = TPid}) -> Pid ! {self(), [S, TPid]}, @@ -320,6 +398,19 @@ transition({state, Pid}, #state{state = S, transport = TPid}) -> %% Crash on anything unexpected. +%% start_next/1 + +start_next(#state{service = Svc0} = S) -> + case getr(?Q_KEY) of + {Addrs0, Tmo, Data} -> + Svc = readdr(Svc0, Addrs0), + {TPid, Addrs} = start_transport(Addrs0, {Svc, Tmo, Data}), + S#state{transport = TPid, + service = svc(Svc, Addrs)}; + undefined -> + stop + end. + %% send_CER/1 send_CER(#state{mode = {connect, Remote}, @@ -649,7 +740,7 @@ rc([RC|_]) -> %% answer/2 answer('DWR', _) -> - getr(dwa); + getr(?DWA_KEY); answer(Name, #state{service = #diameter_service{capabilities = Caps}}) -> a(Name, Caps). @@ -749,15 +840,15 @@ caps(#state{service = Svc}) -> %% caps_cb/1 caps_cb(Caps) -> - {Ref, Ts} = eraser(capabilities_cb), - ccb(Ts, [Ref, Caps]). + {Ref, Ts} = eraser(?CB_KEY), + caps_cb(Ts, [Ref, Caps]). -ccb([], _) -> +caps_cb([], _) -> ok; -ccb([F | Rest], T) -> +caps_cb([F | Rest], T) -> case diameter_lib:eval([F|T]) of ok -> - ccb(Rest, T); + caps_cb(Rest, T); N when ?IS_SUCCESS(N) -> %% 2xxx result code: accept immediately N; Res -> |