%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%%     http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%

%%
%% This module implements (as a process) the RFC 3588/6733 Peer State
%% Machine modulo the necessity of adapting the peer election to the
%% fact that we don't know the identity of a peer until we've received
%% a CER/CEA from it.
%%

-module(diameter_peer_fsm).
-behaviour(gen_server).

%% Interface towards diameter_watchdog.
-export([start/3,
         result_code/2]).

%% Interface towards diameter.
-export([find/1]).

%% gen_server callbacks
-export([init/1,
         handle_call/3,
         handle_cast/2,
         handle_info/2,
         terminate/2,
         code_change/3]).

%% diameter_peer_fsm_sup callback
-export([start_link/1]).

%% internal callbacks
-export([match/1]).

-include_lib("diameter/include/diameter.hrl").
-include("diameter_internal.hrl").

%% Values of Disconnect-Cause in DPR.
-define(GOAWAY, 2).  %% DO_NOT_WANT_TO_TALK_TO_YOU
-define(BUSY,   1).  %% BUSY
-define(REBOOT, 0).  %% REBOOTING

%% Values of Inband-Security-Id.
-define(NO_INBAND_SECURITY, 0).
-define(TLS, 1).

%% Note that a common dictionary hrl is purposely not included since
%% the common dictionary is an argument to start/3.

%% Keys in process dictionary (always wrapped as {?MODULE, Key} by
%% putr/getr/eraser).
-define(CB_KEY, cb).    %% capabilities callback
-define(DPR_KEY, dpr).  %% disconnect callback
-define(DPA_KEY, dpa).  %% {DprTimeout, DpaTimeout}: timeout for
                        %% incoming DPA, or shutdown after outgoing
                        %% DPA
-define(REF_KEY, ref).  %% transport_ref()
-define(Q_KEY, q).      %% transport start queue
-define(START_KEY, start).  %% start of connected transport
-define(SEQUENCE_KEY, mask).  %% mask for sequence numbers
-define(RESTRICT_KEY, restrict).  %% nodes for connection check

%% The default sequence mask.
-define(NOMASK, {0,32}).

%% A 2xxx series Result-Code. Not necessarily 2001.
-define(IS_SUCCESS(N), 2 == (N) div 1000).

%% Guards.
-define(IS_UINT32(N), (is_integer(N) andalso 0 =< N andalso 0 == N bsr 32)).
-define(IS_TIMEOUT(N), ?IS_UINT32(N)).
%% Expands to a guard *sequence* (alternatives separated by ';'), so it
%% can only be used as a whole guard, not inside an expression.
-define(IS_CAUSE(N), N == ?REBOOT; N == rebooting; N == ?GOAWAY; N == goaway; N == ?BUSY; N == busy).

%% RFC 6733:
%%
%%   Timeout   An application-defined timer has expired while waiting
%%             for some event.
%%

%% Default timeout for reception of CER/CEA.
-define(CAPX_TIMEOUT, 10000).

%% Default timeout for DPA to be received in response to an outgoing
%% DPR. A bit short but the timeout used to be hardcoded. (So it could
%% be worse.)
-define(DPA_TIMEOUT, 1000).

%% Default timeout for the connection to be closed by the peer
%% following an outgoing DPA in response to an incoming DPR. It's the
%% recipient of DPA that should close the connection according to the
%% RFC.
-define(DPR_TIMEOUT, 5000).

-type uint32() :: diameter:'Unsigned32'().

-record(state,
        {state %% of RFC 3588 Peer State Machine
              :: {'Wait-Conn-Ack', uint32()}
               | recv_CER
               | {'Wait-CEA', uint32(), uint32()}
               | 'Open',
         mode :: accept | connect | {connect, reference()},
         parent :: pid(),          %% watchdog process
         transport :: pid(),       %% transport process
         dictionary :: module(),   %% common dictionary
         service :: #diameter_service{} | undefined,
         dpr = false :: false
                      | true  %% DPR received, DPA sent
                      | {boolean(), uint32(), uint32()},
                      %% hop by hop and end to end identifiers in
                      %% outgoing DPR; boolean says whether or not
                      %% the request was sent explicitly with
                      %% diameter:call/4.
         codec :: #{decode_format := diameter:decode_format(),
                    string_decode := boolean(),
                    strict_mbit := boolean(),
                    rfc := 3588 | 6733,
                    ordered_encode := false},
         strict :: boolean(),
         ack = false :: boolean(),
         length_errors :: exit | handle | discard,
         incoming_maxlen :: integer() | infinity}).

%% There are non-3588 states possible as a consequence of 5.6.1 of the
%% standard and the corresponding problem for incoming CEAs: we don't
%% know who we're talking to until either a CER or CEA has been
%% received. The CEA problem in particular makes it impossible to
%% follow the state machine exactly as documented in 3588: there can
%% be no election until the CEA arrives and we have an Origin-Host to
%% elect.

%%
%% Once upon a time start/2 started a process akin to that started by
%% start/3 below, which in turn started a watchdog/transport process
%% with the result that the watchdog could send DWR/DWA regardless of
%% whether or not the corresponding Peer State Machine was in its open
%% state; that is, before capabilities exchange had taken place. This
%% is not what RFCs 3588 and 3539 say (albeit not very clearly).
%% Watchdog messages are only exchanged on *open* connections, so the
%% 3539 state machine is more naturally placed on top of the 3588 Peer
%% State Machine rather than closer to the transport. This is what we
%% now do below: connect/accept call diameter_watchdog and return the
%% pid of the watchdog process, and the watchdog in turn calls start/3
%% below to start the process implementing the Peer State Machine.
%%

%% ---------------------------------------------------------------------------
%% # start/3
%% ---------------------------------------------------------------------------

%% Returns a monitor reference on the started peer_fsm process together
%% with its pid.
-spec start(T, [Opt], {map(), [node()], module(), #diameter_service{}})
   -> {reference(), pid()}
 when T   :: {connect|accept, diameter:transport_ref()},
      Opt :: diameter:transport_opt().
%% Start a peer_fsm process as a child of diameter_peer_fsm_sup.
%%
%% The child blocks (in wait/2) until it receives the acknowledgement
%% reference created here. That reference is sent from the 'after'
%% section, once the monitor has been set up, so the caller can't miss
%% the child's death.
%%
%% NOTE(review): an older comment here described checking that the
%% service's list of applications is still non-empty; no such check is
%% made in this function. Confirm that it happens elsewhere.

start({_,_} = Type, Opts, S) ->
    AckRef = make_ref(),
    ChildArg = {AckRef, self(), Type, Opts, S},
    {ok, FsmPid} = diameter_peer_fsm_sup:start_child(ChildArg),
    try
        MRef = erlang:monitor(process, FsmPid),
        {MRef, FsmPid}
    after
        FsmPid ! AckRef  %% release the child from wait/2
    end.

%% diameter_peer_fsm_sup callback: start the process with init/1 via
%% proc_lib, waiting indefinitely for its acknowledgement.

start_link(T) ->
    SpawnOpts = diameter_lib:spawn_opts(server, []),
    {ok, _} = proc_lib:start_link(?MODULE, init, [T], infinity, SpawnOpts).

%% find/1
%%
%% Identify both pids of a peer_fsm/transport pair given either one.
%% Returns {PeerFsmPid, TransportPid}, or false if no registration is
%% found.

find(Pid) ->
    AsTransport = {?MODULE, '_', Pid},
    AsPeerFsm = {?MODULE, Pid, '_'},
    findl([AsTransport, AsPeerFsm]).

findl([Pat | Rest]) ->
    %% Anything other than exactly one registration made by the
    %% peer_fsm itself fails the match; fall back to the next pattern.
    try
        [{{_, FsmPid, TPid}, FsmPid}] = diameter_reg:match(Pat),
        {FsmPid, TPid}
    catch
        error: _ ->
            findl(Rest)
    end;
findl([]) ->
    false.

%% ---------------------------------------------------------------------------
%% ---------------------------------------------------------------------------

%% init/1
%%
%% Acknowledge the start to proc_lib before doing any real work, then
%% build the initial state with i/1 and enter the gen_server loop.

init(T) ->
    proc_lib:init_ack({ok, self()}),
    State = i(T),
    gen_server:enter_loop(?MODULE, [], State).
%% i/1
%%
%% Build the initial #state{}. This function:
%%
%% - monitors the watchdog and waits (wait/2) until start/3 has
%%   monitored us;
%% - stashes callbacks, the transport reference, the sequence mask,
%%   nodes and DPR/DPA timeouts in the process dictionary;
%% - starts the transport process.
%%
%% The transport returns its local ip addresses so that different
%% transports on the same service can use different local addresses.
%% These are put into Host-IP-Address AVPs when sending capabilities
%% exchange messages, but only if none are configured (see svc/2).

i({AckRef, WatchdogPid, {Mode, TRef} = Type, Opts, {SvcOpts, Nodes, Dict0, Svc}}) ->
    erlang:monitor(process, WatchdogPid),
    wait(AckRef, WatchdogPid),
    diameter_stats:reg(TRef),

    #{sequence := Mask, incoming_maxlen := Maxlen} = SvcOpts,

    {[CapsCbs, DisconnectCbs], TransportOpts}
        = proplists:split(Opts, [capabilities_cb, disconnect_cb]),
    putr(?CB_KEY, {TRef, [Fun || {_, Fun} <- CapsCbs]}),
    putr(?DPR_KEY, [Fun || {_, Fun} <- DisconnectCbs]),
    putr(?REF_KEY, TRef),
    putr(?SEQUENCE_KEY, Mask),
    putr(?RESTRICT_KEY, Nodes),

    DprTmo = proplists:get_value(dpr_timeout, Opts, ?DPR_TIMEOUT),
    DpaTmo = proplists:get_value(dpa_timeout, Opts, ?DPA_TIMEOUT),
    putr(?DPA_KEY, {DprTmo, DpaTmo}),

    CapxTmo = proplists:get_value(capx_timeout, Opts, ?CAPX_TIMEOUT),
    Strict = proplists:get_value(strict_capx, Opts, true),
    LengthErrors = proplists:get_value(length_errors, Opts, exit),

    {TPid, LocalAddrs} = start_transport(Type, TransportOpts, Svc),

    %% Register the pair so that find/1 can discover it.
    diameter_reg:add({?MODULE, self(), TPid}),

    CodecKeys = [decode_format, string_decode, strict_mbit, rfc, ordered_encode],
    Codec = maps:with(CodecKeys, SvcOpts#{ordered_encode => false}),

    #state{state = {'Wait-Conn-Ack', CapxTmo},
           parent = WatchdogPid,
           transport = TPid,
           dictionary = Dict0,
           mode = Mode,
           service = svc(Svc, LocalAddrs),
           length_errors = LengthErrors,
           strict = Strict,
           incoming_maxlen = Maxlen,
           codec = Codec}.

%% wait/2
%%
%% Wait for the caller of start/3 to have a monitor on us, to avoid a
%% race with our death (since the exit reason is used in
%% diameter_service). Exit if the watchdog dies first.

wait(AckRef, WatchdogPid) ->
    receive
        AckRef ->
            ok;
        {'DOWN', _, process, WatchdogPid, _} = Down ->
            x(Down)
    end.

%% x/1
%%
%% Exit with a shutdown reason wrapping the given term.

x(Reason) ->
    exit({shutdown, Reason}).

%% start_transport/3
%%
%% Start the first transport process. The configured local addresses
%% are passed along so that q_next/4 can save them for start_next/1.

start_transport(Type, Opts, #diameter_service{capabilities = LCaps} = Svc) ->
    ConfiguredAddrs = LCaps#diameter_caps.host_ip_address,
    start_transport(ConfiguredAddrs, {Type, Opts, Svc}).
%% start_transport/2
%%
%% Start a transport process with diameter_peer:start/1 and monitor it.
%% Returns {TPid, LocalAddrs}. Exits with
%% {shutdown, {no_connection, _}} if no transport could be started.

start_transport(Addrs0, T) ->
    case diameter_peer:start(T) of
        {error, Reason} ->
            x({no_connection, Reason});
        {TPid, LocalAddrs, Tmo, Data} ->
            erlang:monitor(process, TPid),
            q_next(TPid, Addrs0, Tmo, Data),
            {TPid, LocalAddrs}
    end.

%% svc/2
%%
%% Use the addresses returned by the transport as Host-IP-Address only
%% if none have been configured on the service.

svc(#diameter_service{capabilities = LCaps} = Svc, Addrs) ->
    #diameter_caps{host_ip_address = Configured} = LCaps,
    case Configured of
        [_|_] ->
            Svc;
        [] ->
            readdr(Svc, Addrs)
    end.

%% readdr/2
%%
%% Unconditionally replace Host-IP-Address in the local capabilities.

readdr(#diameter_service{capabilities = LCaps} = Svc, Addrs) ->
    Svc#diameter_service{capabilities = LCaps#diameter_caps{host_ip_address = Addrs}}.

%% q_next/4
%%
%% The 4-tuple Data returned from diameter_peer:start/1 identifies the
%% transport module/config used to start the transport process in
%% question, as well as any alternates to try if a connection isn't
%% established within Tmo. Arm the connection timer and queue the
%% information for start_next/1.

q_next(TPid, Addrs0, Tmo, {_,_,_,_} = Data) ->
    send_after(Tmo, {connection_timeout, TPid}),
    putr(?Q_KEY, {Addrs0, Tmo, Data}).

%% keep_transport/1
%%
%% Connection has been established: drop the start queue and retain
%% the started pid/module/config in the process dictionary. This is
%% part of the interface defined by this module, so that the transport
%% pid can be found when constructing service_info (in order to
%% extract further information from it).

keep_transport(TPid) ->
    {_Addrs0, _Tmo, {{_,_,_} = Started, _, _, _}} = eraser(?Q_KEY),
    putr(?START_KEY, {TPid, Started}).

%% send_after/2
%%
%% Send Msg to ourselves after Tmo milliseconds; no timer for infinity.

send_after(Tmo, Msg) ->
    case Tmo of
        infinity ->
            ok;
        _ ->
            erlang:send_after(Tmo, self(), Msg)
    end.

%% handle_call/3
%%
%% No calls are supported: always reply nok.

handle_call(_Request, _From, State) ->
    {reply, nok, State}.

%% handle_cast/2
%%
%% Casts are ignored.

handle_cast(_Msg, State) ->
    {noreply, State}.
%% handle_info/2
%%
%% Every message is handled by transition/2, whose result is mapped
%% onto a gen_server return value:
%%
%%   ok              - no state change
%%   #state{}        - new state (logged if the PSM state changed)
%%   {stop, Reason}  - stop with reason {shutdown, Reason}
%%   stop            - stop with reason {shutdown, Msg}
%%
%% Two exceptions are also turned into a shutdown: an encode failure
%% exited by diameter_codec, and a close/1 throw.
%%
%% Note that there's no guarantee that the service and transport
%% capabilities are good enough to build a CER/CEA that can be
%% successfully encoded. It's not checked at diameter:add_transport/2
%% since this can be called before creating the service.

handle_info(T, #state{} = State) ->
    try transition(T, State) of
        ok ->
            {noreply, State};
        #state{state = X} = S ->
            ?LOGC(X /= State#state.state, transition, X),
            {noreply, S};
        {stop, Reason} ->
            ?LOG(stop, Reason),
            {stop, {shutdown, Reason}, State};
        stop ->
            ?LOG(stop, truncate(T)),
            {stop, {shutdown, T}, State}
    catch
        %% The encode failure is bound to a fresh variable. T is
        %% already bound to the incoming message, so reusing it would
        %% restrict this pattern to exits whose third element equals
        %% that message, presumably never, leaving encode failures
        %% uncaught.
        exit: {diameter_codec, encode, E} = Reason ->
            incr_error(send, E, State#state.dictionary),
            ?LOG(stop, Reason),
            {stop, {shutdown, Reason}, State};
        %% The form of the throw caught here is historical. It's
        %% significant that it's not a 2-tuple, as in ?FAILURE(Reason),
        %% since these are caught elsewhere.
        {?MODULE, Tag, Reason} ->
            ?LOG(stop, Tag),
            {stop, {shutdown, Reason}, State}
    end.

%% terminate/2

terminate(_, _) ->
    ok.

%% code_change/3

code_change(_, State, _) ->
    {ok, State}.

%% ---------------------------------------------------------------------------
%% ---------------------------------------------------------------------------

%% truncate/1
%%
%% Shorten a 'DOWN' message for logging.

truncate({'DOWN' = T, _, process, Pid, _}) ->
    {T, Pid};
truncate(T) ->
    T.

%% putr/2, getr/1, eraser/1
%%
%% Process dictionary access with keys scoped by ?MODULE.

putr(Key, Val) ->
    put({?MODULE, Key}, Val).

getr(Key) ->
    get({?MODULE, Key}).

eraser(Key) ->
    erase({?MODULE, Key}).

%% transition/2
%%
%% Handle one message. Returns ok, a new #state{}, {stop, Reason} or
%% stop (see handle_info/2). Clause order is significant.

%% Connection to peer.
transition({diameter, {TPid, connected, Remote}},
           #state{transport = TPid,
                  state = PS,
                  mode = M}
           = S) ->
    {'Wait-Conn-Ack', _} = PS,  %% assert
    connect = M,                %%
    keep_transport(TPid),
    send_CER(S#state{mode = {M, Remote}});

transition({diameter, {TPid, connected, Remote, LAddrs}},
           #state{transport = TPid,
                  service = Svc}
           = S) ->
    transition({diameter, {TPid, connected, Remote}},
               S#state{service = svc(Svc, LAddrs)});

%% Connection from peer.
transition({diameter, {TPid, connected}},
           #state{transport = TPid,
                  state = PS,
                  mode = M,
                  parent = Pid}
           = S) ->
    {'Wait-Conn-Ack', Tmo} = PS,  %% assert
    accept = M,                   %%
    keep_transport(TPid),
    Pid ! {accepted, self()},
    start_timer(Tmo, S#state{state = recv_CER});

%% Connection established after receiving a connection_timeout
%% message. This may be followed by an incoming message which arrived
%% before the transport was killed and this can't be distinguished
%% from one from the transport that's been started to replace it.
transition({diameter, T}, _)
  when tuple_size(T) < 5, connected == element(2,T) ->
    {stop, connection_timeout};

%% Connection has timed out: start an alternate.
transition({connection_timeout = T, TPid},
           #state{transport = TPid,
                  state = {'Wait-Conn-Ack', _}}
           = S) ->
    exit(TPid, {shutdown, T}),
    start_next(S);

%% Connect timeout after connection or alternate start: ignore.
transition({connection_timeout, _}, _) ->
    ok;

%% Requests for acknowledgements to the transport.
transition({diameter, ack}, S) ->
    S#state{ack = true};

%% Incoming message from the transport.
transition({diameter, {recv, Msg}}, S) ->
    incoming(recv(Msg, S), S);

%% Handler of an incoming request is telling of its existence.
transition({handler, Pid}, _) ->
    put_route(Pid),
    ok;

%% Timeout when still in the same state ...
transition({timeout = T, PS}, #state{state = PS}) ->
    {stop, {capx(PS), T}};

%% ... or not.
transition({timeout, _}, _) ->
    ok;

%% Outgoing message.
transition({send, Msg}, S) ->
    outgoing(Msg, S);

transition({send, Msg, Route}, S) ->
    route_outgoing(Route),
    outgoing(Msg, S);

%% Request for graceful shutdown at remove_transport, stop_service or
%% application shutdown.
transition({shutdown, Pid, Reason}, #state{parent = Pid, dpr = false} = S) ->
    dpr(Reason, S);
transition({shutdown, Pid, _}, #state{parent = Pid}) ->
    ok;

%% DPA reception has timed out, or peer has not closed the connection
%% as a result of outgoing DPA.
transition(dpa_timeout, _) ->
    stop;

%% Someone wants to know a resolved port: forward to the transport process.
transition({resolve_port, _Pid} = T, #state{transport = TPid}) ->
    TPid ! T,
    ok;

%% Parent has died.
transition({'DOWN', _, process, WPid, _},
           #state{parent = WPid}) ->
    stop;

%% Transport has died before connection timeout.
transition({'DOWN', _, process, TPid, _},
           #state{transport = TPid}
           = S) ->
    start_next(S#state{ack = false});

%% Transport has died after connection timeout, or handler process has
%% died.
transition({'DOWN', _, process, Pid, _}, #state{transport = TPid}) ->
    is_reference(erase_route(Pid))
        andalso send(TPid, false),  %% answer not forthcoming
    ok;

%% State query.
transition({state, Pid}, #state{state = S, transport = TPid}) ->
    Pid ! {self(), [S, TPid]},
    ok.

%% Crash on anything unexpected.

%% route_outgoing/1
%%
%% Map identifiers in an outgoing request to be able to lookup the
%% handler process when the answer is received.

route_outgoing({Pid, Ref, Seqs}) ->  %% request
    MRef = monitor(process, Pid),
    put(Pid, Seqs),
    put(Seqs, {Pid, Ref, MRef});

%% Remove a mapping made for an incoming request.
route_outgoing(Pid)
  when is_pid(Pid) ->  %% answer
    MRef = erase_route(Pid),
    undefined == MRef orelse demonitor(MRef).

%% put_route/1
%%
%% Monitor on a handler process for an incoming request.

put_route(Pid) ->
    MRef = monitor(process, Pid),
    put(Pid, MRef).

%% get_route/3

%% Incoming answer.
get_route(_, _, #diameter_packet{header = #diameter_header{is_request = false}}
                 = Pkt) ->
    Seqs = diameter_codec:sequence_numbers(Pkt),
    case erase(Seqs) of
        {Pid, Ref, MRef} ->
            demonitor(MRef),
            erase(Pid),
            {Pid, Ref, self()};
        undefined ->  %% request unknown
            false
    end;

%% Requests answered here ...
get_route(_, N, _)
  when N == 'CER';
       N == 'DPR' ->
    false;

%% ... or not.
get_route(Ack, _, _) ->
    Ack.

%% erase_route/1
%%
%% Erase the mapping for a handler pid. Returns the monitor reference
%% for an incoming-request handler (put_route/1), the {Pid, Ref, MRef}
%% entry for an outgoing-request handler (route_outgoing/1), or
%% undefined if there is no mapping.

erase_route(Pid) ->
    case erase(Pid) of
        {_,_} = Seqs ->
            erase(Seqs);
        T ->
            T
    end.

%% capx/1
%%
%% Name of the message awaited in a capabilities exchange state.

capx(recv_CER) ->
    'CER';
capx({'Wait-CEA', _, _}) ->
    'CEA'.
%% start_next/1
%%
%% Start the next transport from the queue saved by q_next/4, first
%% restoring the local addresses that were configured on the service.
%% Returns the updated #state{}, or stop if the queue is gone (it is
%% erased by keep_transport/1 once a connection is established).

start_next(#state{service = Svc0} = S) ->
    case getr(?Q_KEY) of
        undefined ->
            stop;
        {Addrs0, Tmo, Data} ->
            Svc = readdr(Svc0, Addrs0),
            {TPid, LocalAddrs} = start_transport(Addrs0, {Svc, Tmo, Data}),
            S#state{transport = TPid,
                    service = svc(Svc, LocalAddrs)}
    end.

%% send_CER/1
%%
%% Send CER on a newly connected outgoing transport and move to
%% Wait-CEA. The state remembers the hop-by-hop and end-to-end
%% identifiers that the CEA must carry. Closes if a connection to the
%% same remote endpoint is already registered.

send_CER(#state{state = {'Wait-Conn-Ack', Tmo},
                mode = {connect, Remote},
                service = #diameter_service{capabilities = LCaps},
                transport = TPid,
                dictionary = Dict,
                codec = Opts}
         = S) ->
    OriginHost = LCaps#diameter_caps.origin_host,
    req_send_CER(OriginHost, Remote)
        orelse close({already_connected, Remote, LCaps}),
    Pkt = encode(build_CER(S), Opts, Dict),
    #diameter_packet{header = #diameter_header{end_to_end_id = Eid,
                                               hop_by_hop_id = Hid}}
        = Pkt,
    incr(send, Pkt, Dict),
    send(TPid, Pkt),
    ?LOG(send, 'CER'),
    start_timer(Tmo, S#state{state = {'Wait-CEA', Hid, Eid}}).

%% req_send_CER/2
%%
%% Register ourselves as connecting to the remote endpoint in
%% question. This isn't strictly necessary since a peer implementing
%% the 3588 Peer State Machine should reject duplicate connections
%% from the same peer, but there's little point in us setting up a
%% duplicate connection in the first place. This could also include
%% the transport protocol being used but, since we're blind to
%% transport, just avoid duplicate connections to the same host/port.

req_send_CER(OriginHost, Remote) ->
    Key = {?MODULE, connection, OriginHost, {remote, Remote}},
    register_everywhere(Key).

%% start_timer/2
%%
%% Arm a timeout tagged with the current PSM state, so that a timeout
%% arriving after a state change can be recognized as stale.

start_timer(Tmo, #state{state = PS} = S) ->
    erlang:send_after(Tmo, self(), {timeout, PS}),
    S.

%% build_CER/1

build_CER(#state{dictionary = Dict,
                 service = #diameter_service{capabilities = LCaps}}) ->
    {ok, CER} = diameter_capx:build_CER(LCaps, Dict),
    CER.

%% encode/3
%%
%% Encode a message generated by this process, using one fresh
%% sequence number as both end-to-end and hop-by-hop identifier.

encode(Rec, Opts, Dict) ->
    {_,_} = Mask = getr(?SEQUENCE_KEY),
    Seq = diameter_session:sequence(Mask),
    Hdr = #diameter_header{version = ?DIAMETER_VERSION,
                           end_to_end_id = Seq,
                           hop_by_hop_id = Seq},
    diameter_codec:encode(Dict, Opts, #diameter_packet{header = Hdr, msg = Rec}).
%% incoming/2
%%
%% Act on the result of recv/2 for an incoming message. A header or
%% binary is returned by recv/2 only for a message that is being
%% discarded. If acknowledgements have been requested and the
%% discarded message is a request, tell the transport that no answer
%% is forthcoming. For a binary, the bit after the first 32 is the R
%% flag. Any other result (from handle/3) is passed back to
%% transition/2.

incoming(#diameter_header{is_request = R}, #state{transport = TPid,
                                                  ack = Ack}) ->
    R andalso Ack andalso send(TPid, false),
    ok;

incoming(<<_:32, 1:1, _/bits>>, #state{ack = true} = S) ->
    send(S#state.transport, false),
    ok;

incoming(<<_/bits>>, _) ->
    ok;

incoming(T, _) ->
    T.

%% recv/2
%%
%% The transport delivers either a #diameter_packet{} or a bare binary.

recv(#diameter_packet{bin = Bin} = Pkt, S) ->
    recv(Bin, Pkt, S);
recv(Bin, S) ->
    recv(Bin, Bin, S).

%% recv/3

recv(Bin, Msg, S) ->
    recv(diameter_codec:decode_header(Bin), Bin, Msg, S).

%% recv/4
%%
%% Apply length checks before passing the message on to recv1/4.
%% With length_errors = handle the message is always passed on, so
%% neither the length nor incoming_maxlen is checked in that case.
%% Otherwise a length error is handled by invalid/3, which closes the
%% connection if length_errors = exit. Exceeding incoming_maxlen never
%% closes.

recv(false, Bin, _, #state{length_errors = E}) ->
    invalid(E, truncated_header, Bin),
    Bin;

recv(#diameter_header{length = Len} = H, Bin, Msg, #state{length_errors = E, incoming_maxlen = M, dictionary = Dict0} = S)
  when E == handle;
       0 == Len rem 4, bit_size(Bin) == 8*Len, size(Bin) =< M ->
    recv1(diameter_codec:msg_name(Dict0, H), H, Msg, S);

recv(H, Bin, _, #state{incoming_maxlen = M})
  when M < size(Bin) ->
    invalid(false, incoming_maxlen_exceeded, {size(Bin), H}),
    H;

recv(H, Bin, _, #state{length_errors = E}) ->
    T = {size(Bin), bit_size(Bin) rem 8, H},
    invalid(E, message_length_mismatch, T),
    H.

%% recv1/4
%%
%% Decide whether a message with a usable header is discarded
%% (returning its header) or passed on to handle/3. Clause order is
%% significant.

%% Ignore anything but an expected CER/CEA if so configured. This is
%% non-standard behaviour.
recv1(Name, H, _, #state{state = {'Wait-CEA', _, _}, strict = false})
  when Name /= 'CEA' ->
    H;

recv1(Name, H, _, #state{state = recv_CER, strict = false})
  when Name /= 'CER' ->
    H;

%% Incoming request after outgoing DPR: discard. Don't discard DPR, so
%% both ends don't do so when sending simultaneously.
recv1(Name, #diameter_header{is_request = true} = H, _, #state{dpr = {_,_,_}})
  when Name /= 'DPR' ->
    invalid(false, recv_after_outgoing_dpr, H),
    H;

%% Incoming request after incoming DPR: discard.
recv1(_, #diameter_header{is_request = true} = H, _, #state{dpr = true}) ->
    invalid(false, recv_after_incoming_dpr, H),
    H;

%% DPA with identifier mismatch, or in response to a DPR initiated by
%% the service. Handled here without notifying the watchdog.
recv1('DPA' = Name,
      #diameter_header{hop_by_hop_id = Hid, end_to_end_id = Eid}
      = H,
      Msg,
      #state{dpr = {X,HI,EI}}
      = S)
  when HI /= Hid;
       EI /= Eid;
       not X ->
    Pkt = pkt(H, Msg),
    handle(Name, Pkt, S);

%% Any other message with a header and no length errors: pass it to the
%% watchdog along with its route, then handle it locally.
recv1(Name, H, Msg, #state{parent = Pid, ack = Ack} = S) ->
    Pkt = pkt(H, Msg),
    Pid ! {recv, self(), get_route(Ack, Name, Pkt), Name, Pkt},
    handle(Name, Pkt, S).

%% pkt/2
%%
%% Wrap a binary in a #diameter_packet{}, or set the header of an
%% existing one.

pkt(H, Bin)
  when is_binary(Bin) ->
    #diameter_packet{header = H, bin = Bin};

pkt(H, Pkt) ->
    Pkt#diameter_packet{header = H}.

%% invalid/3
%%
%% Count and log a discarded message, closing the connection (via a
%% close/1 throw) if E == exit.
%%
%% Note that counters here only count discarded messages.

invalid(E, Reason, T) ->
    diameter_stats:incr(Reason),
    E == exit andalso close({Reason, T}),
    ?LOG(Reason, T),
    ok.

%% handle/3
%%
%% Act on an incoming message that hasn't been discarded. Clause order
%% is significant.

%% Incoming CEA.
handle('CEA' = N,
       #diameter_packet{header = #diameter_header{end_to_end_id = Eid,
                                                  hop_by_hop_id = Hid}}
       = Pkt,
       #state{state = {'Wait-CEA', Hid, Eid}}
       = S) ->
    ?LOG(recv, N),
    handle_CEA(Pkt, S);

%% Incoming CER
handle('CER' = N, Pkt, #state{state = recv_CER} = S) ->
    handle_request(N, Pkt, S);

%% Anything but CER/CEA in a non-Open state is an error, as is
%% CER/CEA in anything but recv_CER/Wait-CEA.
handle(Name, _, #state{state = PS})
  when PS /= 'Open';
       Name == 'CER';
       Name == 'CEA' ->
    {stop, {Name, PS}};

handle('DPR' = N, Pkt, S) ->
    handle_request(N, Pkt, S);

%% DPA in response to DPR, with the expected identifiers.
handle('DPA' = N,
       #diameter_packet{header = #diameter_header{end_to_end_id = Eid,
                                                  hop_by_hop_id = Hid}
                                 = H}
       = Pkt,
       #state{dictionary = Dict0,
              transport = TPid,
              dpr = {X, Hid, Eid},
              codec = Opts}) ->
    ?LOG(recv, N),
    X orelse begin
                 %% Only count DPA in response to a DPR sent by the
                 %% service: explicit DPR is counted in the same way
                 %% as other explicitly sent requests.
                 incr(recv, H, Dict0),
                 {_, RecPkt} = decode(Dict0, Opts, Pkt),
                 incr_rc(recv, RecPkt, Dict0)
             end,
    diameter_peer:close(TPid),
    {stop, N};

%% Ignore an unsolicited DPA in particular. Note that dpa_timeout
%% deals with the case in which the peer sends the wrong identifiers
%% in DPA.
handle('DPA' = N, #diameter_packet{header = H}, _) ->
    ?LOG(ignored, N),
    %% Note that these aren't counted in the normal recv counter.
    diameter_stats:incr({diameter_codec:msg_id(H), recv, ignored}),
    ok;

handle(_, _, _) ->
    ok.

%% incr/3

incr(Dir, Hdr, Dict0) ->
    diameter_traffic:incr(Dir, Hdr, self(), Dict0).

%% incr_rc/3

incr_rc(Dir, Pkt, Dict0) ->
    diameter_traffic:incr_rc(Dir, Pkt, self(), Dict0).

%% incr_error/3

incr_error(Dir, Pkt, Dict0) ->
    diameter_traffic:incr_error(Dir, Pkt, self(), Dict0).

%% send/2
%%
%% Msg here could be a #diameter_packet or a binary depending on who's
%% sending. In particular, the watchdog will send DWR as a binary
%% while messages coming from clients will be in a #diameter_packet.

send(Pid, Msg) ->
    diameter_peer:send(Pid, Msg).

%% outgoing/2
%%
%% Send a message on behalf of the watchdog or a client, subject to
%% DPR state. Clause order is significant.

%% Explicit DPR.
outgoing(#diameter_packet{header = #diameter_header{application_id = 0,
                                                    cmd_code = 282,
                                                    is_request = true}
                                   = H}
         = Pkt,
         #state{dpr = T, parent = Pid}
         = S) ->
    if T == false ->
            inform_dpr(Pid),
            send_dpr(true, Pkt, dpa_timeout(), S);
       T == true ->
            invalid(false, dpr_after_dpa, H);  %% DPA sent: discard
       true ->
            invalid(false, dpr_after_dpr, H)   %% DPR sent: discard
    end;

%% Explicit CER or DWR: discard. These are sent by us.
outgoing(#diameter_packet{header = #diameter_header{application_id = 0,
                                                    cmd_code = C,
                                                    is_request = true}
                                   = H},
         _)
  when 257 == C;    %% CER
       280 == C ->  %% DWR
    invalid(false, invalid_request, H);

%% DPR not sent: send.
outgoing(Msg, #state{transport = TPid, dpr = false}) ->
    send(TPid, Msg),
    ok;

%% Outgoing answer: send.
outgoing(#diameter_packet{header = #diameter_header{is_request = false}}
         = Pkt,
         #state{transport = TPid}) ->
    send(TPid, Pkt),
    ok;

%% Outgoing request: discard.
outgoing(Msg, #state{}) ->
    invalid(false, send_after_dpr, header(Msg)).

%% header/1
%%
%% Header of an outgoing message, decoding it from a binary if needed.

header(#diameter_packet{header = H}) ->
    H;
header(Bin) ->  %% DWR
    diameter_codec:decode_header(Bin).
%% handle_request/3
%%
%% Incoming CER or DPR: log and count it, decode it, then answer.

handle_request(Name, #diameter_packet{header = H} = Pkt, #state{dictionary = Dict0, codec = Opts} = S) ->
    ?LOG(recv, Name),
    incr(recv, H, Dict0),
    Decoded = decode(Dict0, Opts, Pkt),
    send_answer(Name, Decoded, S).

%% decode/3
%%
%% Decode the message twice: as a record for diameter_capx, and in the
%% configured format for events. Returns {ConfiguredFormat, Record}.

decode(Dict0, Opts, Pkt) ->
    DecPkt = diameter_codec:decode(Dict0, Opts, Pkt),
    RecPkt = diameter_codec:decode(Dict0, Opts#{decode_format := record}, Pkt),
    {DecPkt, RecPkt}.

%% send_answer/3
%%
%% Build, encode, count and send the answer to an incoming CER/DPR,
%% then run the post-answer action returned by build_answer/4.

send_answer(Type, {DecPkt, RecPkt}, #state{transport = TPid,
                                           dictionary = Dict,
                                           codec = Opts}
                                    = S) ->
    incr_error(recv, RecPkt, Dict),

    #diameter_packet{header = ReqHdr, transport_data = TData} = RecPkt,

    {Msg, PostF} = build_answer(Type, DecPkt, RecPkt, S),

    %% An answer message clears the R and T flags and retains the P
    %% flag. The E flag is set at encode.
    AnsHdr = ReqHdr#diameter_header{version = ?DIAMETER_VERSION,
                                    is_request = false,
                                    is_error = undefined,
                                    is_retransmitted = false},
    AnsPkt = diameter_codec:encode(Dict,
                                   Opts,
                                   #diameter_packet{header = AnsHdr,
                                                    msg = Msg,
                                                    transport_data = TData}),

    incr(send, AnsPkt, Dict),
    incr_rc(send, AnsPkt, Dict),
    send(TPid, AnsPkt),
    ?LOG(send, ans(Type)),
    eval(PostF, S).

%% ans/1
%%
%% Name of the answer to a request handled here.

ans('CER') -> 'CEA';
ans('DPR') -> 'DPA'.

%% eval/2
%%
%% Run the post-answer action: [Fun | Args] is applied with the state
%% appended to Args; anything else is a reason to close.

eval([Fun | Args], S) ->
    erlang:apply(Fun, Args ++ [S]);
eval(Reason, _) ->
    close(Reason).
%% build_answer/4
%%
%% Return {AnswerMsg, PostAction} for an incoming CER/DPR. PostAction
%% is evaluated by eval/2 after the answer has been sent.

%% Error-free CER with the supported version: run capabilities
%% exchange, the election and capabilities callbacks.
build_answer('CER',
             DecPkt,
             #diameter_packet{msg = CER,
                              header = #diameter_header{version = ?DIAMETER_VERSION,
                                                        is_error = false},
                              errors = []},
             #state{dictionary = Dict0}
             = S) ->
    {SupportedApps, RCaps, CEA} = recv_CER(CER, S),
    [RC, IS] = Dict0:'#get-'(['Result-Code', 'Inband-Security-Id'], CEA),
    Caps = capz(caps(S), RCaps),
    #diameter_caps{origin_host = {OH, DH}} = Caps,
    try
        RC == 2001                                  %% DIAMETER_SUCCESS
            orelse ?THROW(RC),
        register_everywhere({?MODULE, connection, OH, DH})
            orelse ?THROW(4003),                    %% DIAMETER_ELECTION_LOST
        caps_cb(Caps)
    of
        CbResult ->
            Open = [fun open/5,
                    DecPkt,
                    SupportedApps,
                    Caps,
                    {accept, inband_security(IS)}],
            {cea(CEA, CbResult, Dict0), Open}
    catch
        ?FAILURE(Reason) ->
            rejected(Reason, {'CER', Reason, Caps, DecPkt}, S)
    end;

%% The error checks below are similar to those in diameter_traffic for
%% other messages. Should factor out the commonality.
build_answer(Type, DecPkt, #diameter_packet{header = H, errors = Es}, S) ->
    {RC, FailedAVP} = result_code(Type, H, Es),
    Answer = answer(Type, RC, FailedAVP, S),
    {Answer, post(Type, RC, DecPkt, S)}.

%% inband_security/1
%%
%% At most one Inband-Security-Id is expected; none means no security.

inband_security([IS]) ->
    IS;
inband_security([]) ->
    ?NO_INBAND_SECURITY.

%% cea/3
%%
%% Set Result-Code in the CEA unless callbacks accepted with ok or
%% DIAMETER_SUCCESS.

cea(CEA, RC, Dict0) ->
    case RC of
        ok ->
            CEA;
        2001 ->
            CEA;
        _ ->
            Dict0:'#set-'({'Result-Code', RC}, CEA)
    end.

%% post/4
%%
%% Post-answer action for a request answered via the error path: close
%% after a failed CER; after DPA, arm the DPR timer, tell the watchdog
%% and mark DPR as received.

post('DPR', _, _, #state{parent = Pid}) ->
    [fun(State) -> dpr_timer(), inform_dpr(Pid), dpr(State) end];
post('CER' = T, RC, Pkt, S) ->
    {T, caps(S), {RC, Pkt}}.

%% dpr/1
%%
%% Mark an incoming DPR unless a DPR has already been sent or received.

dpr(S) ->
    case S of
        #state{dpr = false} ->  %% not awaiting DPA
            S#state{dpr = true};  %% DPR received
        _ ->  %% DPR already sent or received
            S
    end.

%% inform_dpr/1
%%
%% Tell the watchdog to die with us.

inform_dpr(Pid) ->
    erlang:send(Pid, {'DPR', self()}).

%% rejected/3
%%
%% Answer (or close) for a CER that failed capabilities exchange, the
%% election or a capabilities callback.

rejected({capabilities_cb, _F, Why}, Ctx, S) ->
    rejected(Why, Ctx, S);
rejected(discard, Ctx, _) ->
    close(Ctx);
rejected({RC, Errors}, Ctx, S) ->
    {answer('CER', RC, failed_avp(RC, Errors), S), Ctx};
rejected(RC, Ctx, S) ->
    {answer('CER', RC, [], S), Ctx}.

%% failed_avp/2
%%
%% Failed-AVP for the first error entry carrying the given
%% Result-Code, or [] if there is none.

failed_avp(_, []) ->
    [];
failed_avp(RC, [{RC, Avp} | _]) ->
    [{'Failed-AVP', [[{'AVP', [Avp]}]]}];
failed_avp(RC, [_ | Rest]) ->
    failed_avp(RC, Rest).
%% answer/4
%%
%% Answer message of the given type with Result-Code and Failed-AVP.

answer(Type, RC, FailedAVP, S) ->
    Ans = answer(Type, RC, S),
    set(Ans, FailedAVP).

%% answer/3

answer(Type, RC, S) ->
    Skeleton = answer(Type, S),
    answer_message(Skeleton, RC).

%% answer_message/2
%%
%% A 3xxx (protocol error) Result-Code turns the answer into an
%% answer-message that keeps only the Origin-* AVPs.

answer_message([_ | Avps], RC)
  when RC >= 3000, RC < 4000 ->
    Origin = [Avp || Avp <- Avps, is_origin(Avp)],
    ['answer-message', {'Result-Code', RC} | Origin];

answer_message(Msg, RC) ->
    Msg ++ [{'Result-Code', RC}].

%% is_origin/1

is_origin({Name, _}) ->
    lists:member(Name, ['Origin-Host', 'Origin-Realm', 'Origin-State-Id']).

%% set/2
%%
%% Append Failed-AVP, wrapped as an AVP list in an answer-message.

set(Ans, []) ->
    Ans;
set(['answer-message' | _] = Ans, FailedAvp) ->
    Ans ++ [{'AVP', [FailedAvp]}];
set([_|_] = Ans, FailedAvp) ->
    Ans ++ FailedAvp.

%% result_code/3
%%
%% Be lenient with errors in DPR since there's no reason to be
%% otherwise. Rejecting may cause the peer to misinterpret the error
%% as meaning that the connection should not be closed, which may well
%% lead to more problems than any errors in the DPR.

result_code(Type, H, Es) ->
    case Type of
        'DPR' ->
            {2001, []};
        'CER' ->
            result_code(H, Es)
    end.

%% result_code/2
%%
%% {ResultCode, FailedAVP} for a request, given its header and decode
%% errors.

result_code(#diameter_header{is_error = true}, _) ->
    {3008, []};  %% DIAMETER_INVALID_HDR_BITS

result_code(#diameter_header{version = ?DIAMETER_VERSION}, Es) ->
    rc(Es);

result_code(_, _) ->
    {5011, []}.  %% DIAMETER_UNSUPPORTED_VERSION

%% rc/1
%%
%% The first decode error decides the Result-Code; an entry of the form
%% {RC, Avp} also supplies Failed-AVP.

rc(Errors) ->
    case Errors of
        [] ->
            {2001, []};  %% DIAMETER_SUCCESS
        [{RC, _} | _] ->
            {RC, failed_avp(RC, Errors)};
        [RC | _] ->
            {RC, []}
    end.

%% DIAMETER_INVALID_HDR_BITS 3008
%%   A request was received whose bits in the Diameter header were
%%   either set to an invalid combination, or to a value that is
%%   inconsistent with the command code's definition.

%% DIAMETER_INVALID_AVP_BITS 3009
%%   A request was received that included an AVP whose flag bits are
%%   set to an unrecognized value, or that is inconsistent with the
%%   AVP's definition.

%% ELECTION_LOST 4003
%%   The peer has determined that it has lost the election process and
%%   has therefore disconnected the transport connection.
%% DIAMETER_NO_COMMON_APPLICATION 5010
%%   This error is returned when a CER message is received, and there
%%   are no common applications supported between the peers.

%% DIAMETER_UNSUPPORTED_VERSION 5011
%%   This error is returned when a request was received, whose version
%%   number is unsupported.

%% answer/2
%%
%% Skeleton answer to CER/DPR built from the local capabilities.

answer(Name, #state{service = #diameter_service{capabilities = Caps}}) ->
    a(Name, Caps).

a('CER', #diameter_caps{vendor_id = VendorId,
                        origin_host = OriginHost,
                        origin_realm = OriginRealm,
                        host_ip_address = HostAddrs,
                        product_name = ProductName,
                        origin_state_id = OriginStateId}) ->
    ['CEA', {'Origin-Host', OriginHost},
            {'Origin-Realm', OriginRealm},
            {'Host-IP-Address', HostAddrs},
            {'Vendor-Id', VendorId},
            {'Product-Name', ProductName},
            {'Origin-State-Id', OriginStateId}];

a('DPR', #diameter_caps{origin_host = {OriginHost, _},
                        origin_realm = {OriginRealm, _}}) ->
    ['DPA', {'Origin-Host', OriginHost},
            {'Origin-Realm', OriginRealm}].

%% recv_CER/2
%%
%% Capabilities exchange for an incoming CER; close on failure.

recv_CER(CER, #state{service = Svc, dictionary = Dict}) ->
    case diameter_capx:recv_CER(CER, Svc, Dict) of
        {error, Reason} ->
            close({'CER', CER, Svc, Dict, Reason});
        {ok, Result} ->
            Result
    end.

%% handle_CEA/2
%%
%% Incoming CEA with the expected identifiers. Open the connection if:
%%
%% - the Result-Code is 2xxx;
%% - there is a common application and a common security;
%% - no connection to the peer is registered;
%% - the capabilities callbacks accept.
%%
%% Otherwise close.

handle_CEA(#diameter_packet{header = H} = Pkt,
           #state{dictionary = Dict0,
                  service = #diameter_service{capabilities = LCaps},
                  codec = Opts}
           = S) ->
    incr(recv, H, Dict0),

    {DecPkt, RecPkt} = decode(Dict0, Opts, Pkt),

    RC = result_code(incr_rc(recv, RecPkt, Dict0)),

    {SApps, IS, RCaps} = recv_CEA(RecPkt, S),

    Caps = capz(LCaps, RCaps),
    #diameter_caps{origin_host = {OH, DH}} = Caps,

    %% Ensure that we don't already have a connection to the peer in
    %% question. This isn't the peer election of 3588 except in the
    %% sense that, since we don't know who we're talking to until we
    %% receive a CER/CEA, the first that arrives wins the right to a
    %% connection with the peer.
    %%
    %% Check more than the result code since the peer could send
    %% success regardless. If not 2001 then a peer_up callback could do
    %% anything required. It's not unimaginable that a peer agreeing to
    %% TLS after capabilities exchange could send
    %% DIAMETER_LIMITED_SUCCESS = 2002, even if this isn't required by
    %% RFC 3588.
    try
        is_integer(RC) andalso ?IS_SUCCESS(RC)
            orelse ?THROW(RC),
        SApps == [] andalso ?THROW(no_common_application),
        IS == [] andalso ?THROW(no_common_security),
        register_everywhere({?MODULE, connection, OH, DH})
            orelse ?THROW(election_lost),
        caps_cb(Caps)
    of
        _ ->
            open(DecPkt, SApps, Caps, {connect, hd([_] = IS)}, S)
    catch
        ?FAILURE(Reason) ->
            close({'CEA', Reason, Caps, DecPkt})
    end.

%% result_code/1
%%
%% Result-Code value from the result of incr_rc/3, or undefined.

result_code(T) ->
    case T of
        {'Result-Code', N} ->
            N;
        _ ->
            undefined
    end.

%% recv_CEA/2
%%
%% Capabilities exchange for an incoming CEA; close if the CEA has
%% header or decode errors, or if capabilities exchange fails.

recv_CEA(#diameter_packet{header = #diameter_header{version = ?DIAMETER_VERSION,
                                                    is_error = false},
                          msg = CEA,
                          errors = []},
         #state{service = Svc,
                dictionary = Dict}) ->
    case diameter_capx:recv_CEA(CEA, Svc, Dict) of
        {error, Reason} ->
            close({'CEA', CEA, Svc, Dict, Reason});
        {ok, Result} ->
            Result
    end;

recv_CEA(Pkt, S) ->
    close({'CEA', caps(S), Pkt}).

%% caps/1
%%
%% Capabilities of a service, or of the service in a state.

caps(#state{service = Svc}) ->
    caps(Svc);
caps(#diameter_service{capabilities = Caps}) ->
    Caps.

%% caps_cb/1
%%
%% Run the capabilities callbacks (erased from the process dictionary,
%% so at most once per process). Returns ok, or the first 2xxx result,
%% which accepts immediately without further callbacks. Throws on
%% rejection.

caps_cb(Caps) ->
    {Ref, Callbacks} = eraser(?CB_KEY),
    caps_cb(Callbacks, [Ref, Caps]).

caps_cb([], _) ->
    ok;

caps_cb([F | Rest], Args) ->
    case diameter_lib:eval([F | Args]) of
        ok ->
            caps_cb(Rest, Args);
        N when ?IS_SUCCESS(N) ->  %% 2xxx result code: accept immediately
            N;
        Other ->
            ?THROW({capabilities_cb, F, rejected(Other)})
    end.

%% rejected/1
%%
%% Map a rejecting callback result to a Result-Code (or discard).

rejected(discard = Discard) ->
    Discard;
rejected(unknown) ->
    3010;  %% DIAMETER_UNKNOWN_PEER
rejected(RC)
  when is_integer(RC) ->
    RC.
%% open/5
%%
%% Complete the capabilities exchange:
%%  - if TLS was advertised locally, settle the TLS decision with the
%%    transport;
%%  - notify the parent process;
%%  - move to the Open state.

open(Pkt, SupportedApps, Caps, {Type, IS}, #state{parent = Parent,
                                                  service = Service}
                                           = S) ->
    #diameter_caps{origin_host = {_, _} = HostPair,
                   inband_security_id = {LocalSecIds, _}}
        = Caps,
    AdvertisedTls = lists:member(?TLS, LocalSecIds),
    tls_ack(AdvertisedTls, Caps, Type, IS, S),
    Parent ! {open, self(), HostPair, {Caps, SupportedApps, Pkt}},
    %% Replace capabilities record with local/remote pairs.
    S#state{state = 'Open',
            service = Service#diameter_service{capabilities = Caps}}.

%% tls_ack/5
%%
%% If TLS was not advertised, send nothing to the transport, so that
%% transports not supporting TLS work as before without modification.
%%
%% Otherwise, tell the transport whether TLS was agreed. Then wait for
%% one of:
%%  - the transport's acknowledgement (handshake complete);
%%  - a 'DOWN' for the transport, which is turned into close/1.

tls_ack(false, _Caps, _Type, _IS, _S) ->
    ok;
tls_ack(true, Caps, Type, IS, #state{transport = TPid}) ->
    Ref = make_ref(),
    UseTls = (IS == ?TLS),
    TPid ! {diameter, {tls, Ref, Type, UseTls}},
    %% NOTE(review): no timeout here; relies on a monitor of TPid
    %% established elsewhere to deliver 'DOWN'.
    receive
        {diameter, {tls, Ref}} ->
            ok;
        {'DOWN', _MRef, process, TPid, Reason} ->
            close({tls_ack, Reason, Caps})
    end.

%% capz/2
%%
%% Merge two capabilities records field by field into one whose every
%% field is a {Local, Remote} pair.

capz(#diameter_caps{} = Local, #diameter_caps{} = Remote) ->
    [diameter_caps | LocalVals] = tuple_to_list(Local),
    [diameter_caps | RemoteVals] = tuple_to_list(Remote),
    Paired = lists:zip(LocalVals, RemoteVals),
    #diameter_caps{} = list_to_tuple([diameter_caps | Paired]).

%% close/1
%%
%% Abandon the current operation by throwing {?MODULE, close, Reason}.
%% A good function to trace on in case of problems with capabilities
%% exchange.

close(Reason) ->
    erlang:throw({?MODULE, close, Reason}).

%% dpr/2
%%
%% The RFC isn't clear on whether DPR should be sent in a non-Open
%% state. The Peer State Machine transitions it documents aren't
%% exhaustive (no Stop in Wait-I-CEA for example), so assume it's up to
%% the implementation and transition to Closed (ie. die) if we haven't
%% yet reached Open.

dpr(Reason, #state{state = 'Open',
                   dpr = false,
                   service = #diameter_service{capabilities = Caps}}
            = S) ->
    %% Connection is open and no DPR has been sent: consult the
    %% disconnect callbacks.
    Callbacks = getr(?DPR_KEY),
    Args = [Reason, getr(?REF_KEY), {self(), Caps}],
    dpr(Callbacks, Args, S);
dpr(_Reason, #state{state = 'Open'}) ->
    %% Connection is open, DPR already sent or received.
    ok;
dpr(_Reason, _S) ->
    %% Connection not open.
    stop.
%% dpr/3
%%
%% Evaluate disconnect callbacks in turn, each with Args appended,
%% until one decides. Possible returns:
%%  - dpr or {dpr, Opts} sends DPR;
%%  - close stops without sending DPR;
%%  - ignore defers to the next callback;
%%  - anything else is an error.
%% When every callback ignores, DPR is sent with default options.
%%
%% Note that an implementation that wants to do something
%% transport_module-specific can lookup the pid of the transport
%% process and contact it. (eg. diameter:service_info/2)

dpr([], [Reason | _], S) ->
    send_dpr(Reason, [], S);
dpr([CB | CBs], [Reason | _] = Args, S) ->
    case diameter_lib:eval([CB | Args]) of
        ignore ->
            dpr(CBs, Args, S);
        dpr ->
            send_dpr(Reason, [], S);
        {dpr, Opts} when is_list(Opts) ->
            send_dpr(Reason, Opts, S);
        close ->
            {stop, {disconnect_cb, close}};
        Other ->
            ?ERROR({disconnect_cb, CB, Args, Other})
    end.

-record(opts, {cause, timeout}).

%% send_dpr/3
%%
%% Build and send a DPR originated by the service itself.
%%
%% Disconnect-Cause defaults to DO_NOT_WANT_TO_TALK_TO_YOU when Reason
%% is transport, and to REBOOTING otherwise. The DPA timeout defaults
%% to dpa_timeout(). DprOpts may override either (see opt/2).

send_dpr(Reason, DprOpts, #state{dictionary = Dict,
                                 service = #diameter_service{capabilities = Caps},
                                 codec = Codec}
                          = S) ->
    DefaultCause = case Reason of
                       transport -> ?GOAWAY;
                       _         -> ?REBOOT
                   end,
    Defaults = #opts{cause = DefaultCause, timeout = dpa_timeout()},
    #opts{cause = Cause, timeout = Tmo} = lists:foldl(fun opt/2, Defaults, DprOpts),
    #diameter_caps{origin_host = {OwnHost, _},
                   origin_realm = {OwnRealm, _}}
        = Caps,
    Msg = ['DPR', {'Origin-Host', OwnHost},
                  {'Origin-Realm', OwnRealm},
                  {'Disconnect-Cause', Cause}],
    send_dpr(false, encode(Msg, Codec, Dict), Tmo, S).

%% send_dpr/4
%%
%% Send an encoded DPR to the transport and start the DPA timer.
%% Record {Explicit, HopByHopId, EndToEndId} in the dpr field of the
%% returned state. Explicit is false for a DPR built by send_dpr/3.

send_dpr(Explicit,
         #diameter_packet{header = #diameter_header{end_to_end_id = E2EId,
                                                    hop_by_hop_id = HbHId}}
         = Pkt,
         Tmo,
         #state{transport = TPid,
                dictionary = Dict}
         = S) ->
    %% Only count DPR sent by the service: explicit DPR is counted in
    %% the same way as other explicitly sent requests.
    Explicit orelse incr(send, Pkt, Dict),
    send(TPid, Pkt),
    dpa_timer(Tmo),
    ?LOG(send, 'DPR'),
    S#state{dpr = {Explicit, HbHId, E2EId}}.

%% opt/2
%%
%% Fold one DPR option into the #opts{} accumulator. Anything other
%% than a valid timeout or cause is an error.

opt({timeout, Tmo}, Acc) when ?IS_TIMEOUT(Tmo) ->
    Acc#opts{timeout = Tmo};
opt({cause, Cause}, Acc) when ?IS_CAUSE(Cause) ->
    Acc#opts{cause = cause(Cause)};
opt(Bad, _Acc) ->
    ?ERROR({invalid_option, Bad}).

%% cause/1
%%
%% Map a named Disconnect-Cause to its integer value. Any other value
%% accepted by ?IS_CAUSE is returned unchanged; the rest is an error.

cause(rebooting) -> ?REBOOT;
cause(goaway)    -> ?GOAWAY;
cause(busy)      -> ?BUSY;
cause(N) when ?IS_CAUSE(N) ->
    N;
cause(Bad) ->
    ?ERROR({invalid_cause, Bad}).

%% Arrange for dpa_timeout to be sent to this process after Tmo.

dpa_timer(Tmo) ->
    erlang:send_after(Tmo, self(), dpa_timeout).

%% DPA timeout: the second element of the tuple stored under ?DPA_KEY.

dpa_timeout() ->
    {_DprTmo, DpaTmo} = getr(?DPA_KEY),
    DpaTmo.

%% Start a timer for the DPR timeout. Expiry is signalled with the
%% same dpa_timeout message as dpa_timer/1.

dpr_timer() ->
    DprTmo = dpr_timeout(),
    dpa_timer(DprTmo).
%% DPR timeout: the first element of the tuple stored under ?DPA_KEY.

dpr_timeout() ->
    {DprTmo, _DpaTmo} = getr(?DPA_KEY),
    DprTmo.

%% register_everywhere/1
%%
%% Register a term and ensure it's not registered elsewhere. Note that
%% two processes that simultaneously register the same term may well
%% both fail to do so, so this isn't foolproof.
%%
%% Everywhere is no longer everywhere, it's where a
%% restrict_connections service_opt() specifies.

register_everywhere(T) ->
    Nodes = getr(?RESTRICT_KEY),
    reg(Nodes, T).

%% Register T locally, then check that no node in Nodes already holds
%% it. Registration uses diameter_reg:add_new/1 if this node is among
%% Nodes, and diameter_reg:add/1 otherwise.

reg(Nodes, T) ->
    LocalIncluded = lists:member(node(), Nodes),
    add(LocalIncluded, T) andalso unregistered(Nodes, T).

add(false, T) ->
    diameter_reg:add(T);
add(true, T) ->
    diameter_reg:add_new(T).

%% unregistered/2
%%
%% Ensure that the term in question isn't registered on other nodes.
%% Nodes that don't answer the multicall are returned separately by
%% rpc:multicall/4 and are not considered.

unregistered(Nodes, T) ->
    {Replies, _BadNodes} = rpc:multicall(Nodes, ?MODULE, match, [{node(), T}]),
    lists:all(fun nomatch/1, Replies).

%% A reply means "not registered" if it is an empty match list, or if
%% the call failed with undef (no diameter on the remote node).

nomatch({badrpc, {'EXIT', {undef, _}}}) ->
    true;
nomatch(Reply) ->
    Reply == [].

%% match/1
%%
%% Called via rpc from unregistered/2. On the calling node itself, where
%% reg/2 has just added T, the answer is always []. Elsewhere, any
%% exception from diameter_reg:match/1 is deliberately treated as no
%% match.

match({Node, _T}) when Node == node() ->
    [];
match({_Node, T}) ->
    try
        diameter_reg:match(T)
    catch
        _:_ ->
            []
    end.