diff options
Diffstat (limited to 'lib/diameter/src/base/diameter_peer_fsm.erl')
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 289 |
1 files changed, 208 insertions, 81 deletions
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 3f4945f7a6..c4320fcb99 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -48,15 +48,19 @@ -include("diameter_internal.hrl"). -include("diameter_gen_base_rfc3588.hrl"). +%% Values of Disconnect-Cause in DPR. -define(GOAWAY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_DO_NOT_WANT_TO_TALK_TO_YOU'). -define(REBOOT, ?'DIAMETER_BASE_DISCONNECT-CAUSE_REBOOTING'). +-define(BUSY, ?'DIAMETER_BASE_DISCONNECT-CAUSE_BUSY'). -define(NO_INBAND_SECURITY, 0). -define(TLS, 1). %% Keys in process dictionary. -define(CB_KEY, cb). %% capabilities callback +-define(DPR_KEY, dpr). %% disconnect callback -define(DWA_KEY, dwa). %% outgoing DWA +-define(REF_KEY, ref). %% transport_ref() -define(Q_KEY, q). %% transport start queue -define(START_KEY, start). %% start of connected transport -define(SEQUENCE_KEY, mask). %% mask for sequence numbers @@ -68,28 +72,40 @@ %% A 2xxx series Result-Code. Not necessarily 2001. -define(IS_SUCCESS(N), 2 == (N) div 1000). +%% Guards. +-define(IS_UINT32(N), (is_integer(N) andalso 0 =< N andalso 0 == N bsr 32)). +-define(IS_TIMEOUT(N), ?IS_UINT32(N)). +-define(IS_CAUSE(N), N == ?REBOOT; N == rebooting; + N == ?GOAWAY; N == goaway; + N == ?BUSY; N == busy). + %% RFC 3588: %% %% Timeout An application-defined timer has expired while waiting %% for some event. %% -define(EVENT_TIMEOUT, 10000). +%% Default timeout for reception of CER/CEA. -%% How long to wait for a DPA in response to DPR before simply -%% aborting. Used to distinguish between shutdown and not but there's -%% not really any need. Stopping a service will require a timeout if -%% the peer doesn't answer DPR so the value should be short-ish. +%% Default timeout for DPA in response to DPR. A bit short but the +%% timeout used to be hardcoded. (So it could be worse.) -define(DPA_TIMEOUT, 1000). +-type uint32() :: diameter:'Unsigned32'(). + -record(state, - {state = 'Wait-Conn-Ack' %% state of RFC 3588 Peer State Machine - :: 'Wait-Conn-Ack' | recv_CER | 'Wait-CEA' | 'Open', + {state %% of RFC 3588 Peer State Machine + :: 'Wait-Conn-Ack' %% old code + | {'Wait-Conn-Ack', uint32()} + | recv_CER + | 'Wait-CEA' %% old code + | {'Wait-CEA', uint32(), uint32()} + | 'Open', mode :: accept | connect | {connect, reference()}, - parent :: pid(), - transport :: pid(), + parent :: pid(), %% watchdog process + transport :: pid(), %% transport process service :: #diameter_service{}, - dpr = false :: false | {diameter:'Unsigned32'(), - diameter:'Unsigned32'()}}). + dpr = false :: false | {uint32(), uint32()}}). %% | hop by hop and end to end identifiers %% There are non-3588 states possible as a consequence of 5.6.1 of the @@ -163,19 +179,24 @@ i({WPid, Type, Opts, #diameter_service{} = Svc}) -> %% from old code i({WPid, Type, Opts, {?NOMASK, [node() | nodes()], Svc}}); i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, - capabilities = Caps} + capabilities = LCaps} = Svc}}) -> [] /= Apps orelse ?ERROR({no_apps, T, Opts}), - putr(?DWA_KEY, dwa(Caps)), + putr(?DWA_KEY, dwa(LCaps)), {M, Ref} = T, diameter_stats:reg(Ref), - {[Ts], Rest} = proplists:split(Opts, [capabilities_cb]), - putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}), + {[Cs,Ds], Rest} = proplists:split(Opts, [capabilities_cb, disconnect_cb]), + putr(?CB_KEY, {Ref, [F || {_,F} <- Cs]}), + putr(?DPR_KEY, [F || {_, F} <- Ds]), + putr(?REF_KEY, Ref), putr(?SEQUENCE_KEY, Mask), putr(?RESTRICT_KEY, Nodes), erlang:monitor(process, WPid), {TPid, Addrs} = start_transport(T, Rest, Svc), - #state{parent = WPid, + Tmo = proplists:get_value(capx_timeout, Opts, ?EVENT_TIMEOUT), + ?IS_TIMEOUT(Tmo) orelse ?ERROR({invalid, {capx_timeout, Tmo}}), + #state{state = {'Wait-Conn-Ack', Tmo}, + parent = WPid, transport = TPid, mode = M, service = svc(Svc, Addrs)}. @@ -188,8 +209,8 @@ i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps, %% watchdog start (start/2) succeeds regardless so as not to crash the %% service. -start_transport(T, Opts, #diameter_service{capabilities = Caps} = Svc) -> - Addrs0 = Caps#diameter_caps.host_ip_address, +start_transport(T, Opts, #diameter_service{capabilities = LCaps} = Svc) -> + Addrs0 = LCaps#diameter_caps.host_ip_address, start_transport(Addrs0, {T, Opts, Svc}). start_transport(Addrs0, T) -> @@ -212,9 +233,9 @@ svc(Svc, []) -> svc(Svc, Addrs) -> readdr(Svc, Addrs). -readdr(#diameter_service{capabilities = Caps0} = Svc, Addrs) -> - Caps = Caps0#diameter_caps{host_ip_address = Addrs}, - Svc#diameter_service{capabilities = Caps}. +readdr(#diameter_service{capabilities = LCaps0} = Svc, Addrs) -> + LCaps = LCaps0#diameter_caps{host_ip_address = Addrs}, + Svc#diameter_service{capabilities = LCaps}. %% The 4-tuple Data returned from diameter_peer:start/1 identifies the %% transport module/config use to start the transport process in @@ -313,13 +334,17 @@ eraser(Key) -> %% transition/2 +%% Started in old code. +transition(T, #state{state = 'Wait-Conn-Ack' = PS} = S) -> + transition(T, S#state{state = {PS, ?EVENT_TIMEOUT}}); + %% Connection to peer. transition({diameter, {TPid, connected, Remote}}, #state{transport = TPid, state = PS, mode = M} = S) -> - 'Wait-Conn-Ack' = PS, %% assert + {'Wait-Conn-Ack', _} = PS, %% assert connect = M, %% keep_transport(TPid), send_CER(S#state{mode = {M, Remote}}); @@ -331,11 +356,11 @@ transition({diameter, {TPid, connected}}, mode = M, parent = Pid} = S) -> - 'Wait-Conn-Ack' = PS, %% assert + {'Wait-Conn-Ack', Tmo} = PS, %% assert accept = M, %% keep_transport(TPid), Pid ! {accepted, self()}, - start_timer(S#state{state = recv_CER}); + start_timer(Tmo, S#state{state = recv_CER}); %% Connection established after receiving a connection_timeout %% message. This may be followed by an incoming message which arrived @@ -349,7 +374,7 @@ transition({diameter, {_, connected, _}}, _) -> %% Connection has timed out: start an alternate. transition({connection_timeout = T, TPid}, #state{transport = TPid, - state = 'Wait-Conn-Ack'} + state = {'Wait-Conn-Ack', _}} = S) -> exit(TPid, {shutdown, T}), start_next(S); @@ -364,7 +389,7 @@ transition({diameter, {recv, Pkt}}, S) -> %% Timeout when still in the same state ... transition({timeout, PS}, #state{state = PS}) -> - stop; + {stop, {capx(PS), timeout}}; %% ... or not. transition({timeout, _}, _) -> @@ -375,25 +400,19 @@ transition({send, Msg}, #state{transport = TPid}) -> send(TPid, Msg), ok; -%% Request for graceful shutdown. -transition({shutdown, Pid}, #state{parent = Pid, dpr = false} = S) -> - dpr(?GOAWAY, S); -transition({shutdown, Pid}, #state{parent = Pid}) -> - ok; - -%% Application shutdown. -transition(shutdown, #state{dpr = false} = S) -> - dpr(?REBOOT, S); -transition(shutdown, _) -> %% DPR already send: ensure expected timeout - dpa_timer(), +%% Messages from old (diameter_service) code. +transition(shutdown = T, #state{parent = Pid} = S) -> + transition({T, Pid, service}, S); %% Reason irrelevant: old code has no cb + +%% Request for graceful shutdown at remove_transport, stop_service of +%% application shutdown. +transition({shutdown = T, Pid}, S) -> + transition({T, Pid, transport}, S); +transition({shutdown, Pid, Reason}, #state{parent = Pid, dpr = false} = S) -> + dpr(Reason, S); +transition({shutdown, Pid, _}, #state{parent = Pid}) -> ok; -%% Request to close the transport connection. -transition({close = T, Pid}, #state{parent = Pid, - transport = TPid}) -> - diameter_peer:close(TPid), - {stop, T}; - %% DPA reception has timed out. transition(dpa_timeout, _) -> stop; @@ -425,6 +444,11 @@ transition({state, Pid}, #state{state = S, transport = TPid}) -> %% Crash on anything unexpected. +capx(recv_CER) -> + 'CER'; +capx({'Wait-CEA', _, _}) -> + 'CEA'. + %% start_next/1 start_next(#state{service = Svc0} = S) -> @@ -440,18 +464,23 @@ start_next(#state{service = Svc0} = S) -> %% send_CER/1 -send_CER(#state{mode = {connect, Remote}, - service = #diameter_service{capabilities = Caps}, +send_CER(#state{state = {'Wait-Conn-Ack', Tmo}, + mode = {connect, Remote}, + service = #diameter_service{capabilities = LCaps}, transport = TPid} = S) -> - OH = Caps#diameter_caps.origin_host, + OH = LCaps#diameter_caps.origin_host, req_send_CER(OH, Remote) orelse - close({already_connected, Remote, Caps}, S), + close({already_connected, Remote, LCaps}, S), CER = build_CER(S), ?LOG(send, 'CER'), - send(TPid, encode(CER)), - start_timer(S#state{state = 'Wait-CEA'}). + #diameter_packet{header = #diameter_header{end_to_end_id = Eid, + hop_by_hop_id = Hid}} + = Pkt + = encode(CER), + send(TPid, Pkt), + start_timer(Tmo, S#state{state = {'Wait-CEA', Hid, Eid}}). %% Register ourselves as connecting to the remote endpoint in %% question. This isn't strictly necessary since a peer implementing @@ -463,16 +492,16 @@ send_CER(#state{mode = {connect, Remote}, req_send_CER(OriginHost, Remote) -> register_everywhere({?MODULE, connection, OriginHost, {remote, Remote}}). -%% start_timer/1 +%% start_timer/2 -start_timer(#state{state = PS} = S) -> - erlang:send_after(?EVENT_TIMEOUT, self(), {timeout, PS}), +start_timer(Tmo, #state{state = PS} = S) -> + erlang:send_after(Tmo, self(), {timeout, PS}), S. %% build_CER/1 -build_CER(#state{service = #diameter_service{capabilities = Caps}}) -> - {ok, CER} = diameter_capx:build_CER(Caps), +build_CER(#state{service = #diameter_service{capabilities = LCaps}}) -> + {ok, CER} = diameter_capx:build_CER(LCaps), CER. %% encode/1 @@ -482,10 +511,8 @@ encode(Rec) -> Hdr = #diameter_header{version = ?DIAMETER_VERSION, end_to_end_id = Seq, hop_by_hop_id = Seq}, - Pkt = #diameter_packet{header = Hdr, - msg = Rec}, - #diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt), - Bin. + diameter_codec:encode(?BASE, #diameter_packet{header = Hdr, + msg = Rec}). sequence() -> case getr(?SEQUENCE_KEY) of @@ -553,7 +580,14 @@ discard(Reason, F, A) -> %% rcv/3 %% Incoming CEA. -rcv('CEA', Pkt, #state{state = 'Wait-CEA'} = S) -> +rcv('CEA', + #diameter_packet{header = #diameter_header{end_to_end_id = Eid, + hop_by_hop_id = Hid}} + = Pkt, + #state{state = {'Wait-CEA' = T, Hid, Eid}} + = S) -> + handle_CEA(Pkt, S#state{state = T}); +rcv('CEA', Pkt, #state{state = 'Wait-CEA'} = S) -> %% old code handle_CEA(Pkt, S); %% Incoming CER @@ -573,16 +607,16 @@ rcv(N, Pkt, S) N == 'DPR' -> handle_request(N, Pkt, S); -%% DPA even though we haven't sent DPR: ignore. -rcv('DPA', _Pkt, #state{dpr = false}) -> - ok; - -%% DPA in response to DPR. We could check the sequence numbers but -%% don't bother, just close. -rcv('DPA' = N, _Pkt, #state{transport = TPid}) -> +%% DPA in response to DPR and with the expected identifiers. +rcv('DPA' = N, + #diameter_packet{header = #diameter_header{end_to_end_id = Eid, + hop_by_hop_id = Hid}}, + #state{transport = TPid, + dpr = {Hid, Eid}}) -> diameter_peer:close(TPid), {stop, N}; +%% Ignore anything else, an unsolicited DPA in particular. rcv(_, _, _) -> ok. @@ -800,8 +834,8 @@ a('CER', #diameter_caps{vendor_id = Vid, {'Product-Name', Name}, {'Origin-State-Id', OSI}]; -a('DPR', #diameter_caps{origin_host = Host, - origin_realm = Realm}) -> +a('DPR', #diameter_caps{origin_host = {Host, _}, + origin_realm = {Realm, _}}) -> ['DPA', {'Origin-Host', Host}, {'Origin-Realm', Realm}]. @@ -909,7 +943,9 @@ rejected(N) %% open/5 -open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid} = S) -> +open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid, + service = Svc} + = S) -> #diameter_caps{origin_host = {_,_} = H, inband_security_id = {LS,_}} = Caps, @@ -917,7 +953,9 @@ open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Pid} = S) -> tls_ack(lists:member(?TLS, LS), Caps, Type, IS, S), Pid ! {open, self(), H, {Caps, SupportedApps, Pkt}}, - S#state{state = 'Open'}. + %% Replace capabilities record with local/remote pairs. + S#state{state = 'Open', + service = Svc#diameter_service{capabilities = Caps}}. %% We've advertised TLS support: tell the transport the result %% and expect a reply when the handshake is complete. @@ -970,24 +1008,113 @@ dwa(#diameter_caps{origin_host = OH, {'Origin-State-Id', OSI}]. %% dpr/2 +%% +%% The RFC isn't clear on whether DPR should be send in a non-Open +%% state. The Peer State Machine transitions it documents aren't +%% exhaustive (no Stop in Wait-I-CEA for example) so assume it's up to +%% the implementation and transition to Closed (ie. die) if we haven't +%% yet reached Open. + +%% Connection is open, DPR has not been sent. +dpr(Reason, #state{state = 'Open', + dpr = false, + service = #diameter_service{capabilities = Caps}} + = S) -> + case getr(?DPR_KEY) of + CBs when is_list(CBs) -> + Ref = getr(?REF_KEY), + Peer = {self(), Caps}, + dpr(CBs, [Reason, Ref, Peer], S); + undefined -> %% started in old code + send_dpr(Reason, [], S) + end; -dpr(Cause, #state{transport = TPid, - service = #diameter_service{capabilities = Caps}} - = S) -> - #diameter_caps{origin_host = OH, - origin_realm = OR} +%% Connection is open, DPR already sent. +dpr(_, #state{state = 'Open'}) -> + ok; + +%% Connection not open. +dpr(_Reason, _S) -> + stop. + +%% dpr/3 +%% +%% Note that an implementation that wants to do something +%% transport_module-specific can lookup the pid of the transport +%% process and contact it. (eg. diameter:service_info/2) + +dpr([CB|Rest], [Reason | _] = Args, S) -> + try diameter_lib:eval([CB | Args]) of + {dpr, Opts} when is_list(Opts) -> + send_dpr(Reason, Opts, S); + dpr -> + send_dpr(Reason, [], S); + close = T -> + {stop, {disconnect_cb, T}}; + ignore -> + dpr(Rest, Args, S); + T -> + No = {disconnect_cb, T}, + diameter_lib:error_report(invalid, No), + {stop, No} + catch + E:R -> + No = {disconnect_cb, E, R, ?STACK}, + diameter_lib:error_report(failure, No), + {stop, No} + end; + +dpr([], [Reason | _], S) -> + send_dpr(Reason, [], S). + +-record(opts, {cause, timeout = ?DPA_TIMEOUT}). + +send_dpr(Reason, Opts, #state{transport = TPid, + service = #diameter_service{capabilities = Caps}} + = S) -> + #opts{cause = Cause, timeout = Tmo} + = lists:foldl(fun opt/2, + #opts{cause = case Reason of + transport -> ?GOAWAY; + _ -> ?REBOOT + end, + timeout = ?DPA_TIMEOUT}, + Opts), + #diameter_caps{origin_host = {OH, _}, + origin_realm = {OR, _}} = Caps, - Bin = encode(['DPR', {'Origin-Host', OH}, + #diameter_packet{header = #diameter_header{end_to_end_id = Eid, + hop_by_hop_id = Hid}} + = Pkt + = encode(['DPR', {'Origin-Host', OH}, {'Origin-Realm', OR}, {'Disconnect-Cause', Cause}]), - send(TPid, Bin), - dpa_timer(), + send(TPid, Pkt), + dpa_timer(Tmo), ?LOG(send, 'DPR'), - S#state{dpr = diameter_codec:sequence_numbers(Bin)}. - -dpa_timer() -> - erlang:send_after(?DPA_TIMEOUT, self(), dpa_timeout). + S#state{dpr = {Hid, Eid}}. + +opt({timeout, Tmo}, Rec) + when ?IS_TIMEOUT(Tmo) -> + Rec#opts{timeout = Tmo}; +opt({cause, Cause}, Rec) + when ?IS_CAUSE(Cause) -> + Rec#opts{cause = cause(Cause)}; +opt(T, _) -> + ?ERROR({invalid_option, T}). + +cause(rebooting) -> ?REBOOT; +cause(goaway) -> ?GOAWAY; +cause(busy) -> ?BUSY; +cause(N) + when ?IS_CAUSE(N) -> + N; +cause(N) -> + ?ERROR({invalid_cause, N}). + +dpa_timer(Tmo) -> + erlang:send_after(Tmo, self(), dpa_timeout). %% register_everywhere/1 %% |