%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2015. All Rights Reserved.
%%
%% The contents of this file are subject to the Erlang Public License,
%% Version 1.1, (the "License"); you may not use this file except in
%% compliance with the License. You should have received a copy of the
%% Erlang Public License along with this software. If not, it can be
%% retrieved online at http://www.erlang.org/.
%%
%% Software distributed under the License is distributed on an "AS IS"
%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
%% the License for the specific language governing rights and limitations
%% under the License.
%%
%% %CopyrightEnd%
%%

%% SCTP transport module for diameter, built on gen_sctp. start/3 is
%% the entry point called to start a transport process, which either
%% connects to a remote peer or accepts an association that a shared
%% listener process (owning the listening socket) peels off to it.

-module(diameter_sctp).
-behaviour(gen_server).

%% now/0 below means diameter_lib:now/0, not the auto-imported BIF.
-compile({no_auto_import, [now/0]}).
-import(diameter_lib, [now/0]).

%% interface
-export([start/3]).

%% child start from supervisor
-export([start_link/1]).

%% child start from here
-export([init/1]).

%% gen_server callbacks
-export([handle_call/3,
         handle_cast/2,
         handle_info/2,
         code_change/3,
         terminate/2]).

-export([listener/1,%% diameter_sync callback
         info/1]).  %% service_info callback

-export([ports/0,
         ports/1]).

-export_type([listen_option/0,
              connect_option/0]).

-include_lib("kernel/include/inet_sctp.hrl").
-include_lib("diameter/include/diameter.hrl").

%% Keys into the process dictionary (see putr/2 and getr/1).
-define(INFO_KEY, info).
-define(REF_KEY, ref).

%% Raise an error tagged with the module and source line.
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).

%% The default port for a listener; also the default remote port of a
%% connecting transport when no rport option is given.
-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1

%% Remote addresses to accept connections from.
-define(DEFAULT_ACCEPT, []). %% any

%% How long (ms) a listener with no attached transport processes lives
%% before terminating itself.
-define(LISTENER_TIMEOUT, 30000).

%% How long (ms) to wait for a transport process to attach after
%% association establishment.
-define(ACCEPT_TIMEOUT, 5000).

-type connect_option() :: {raddr, inet:ip_address()}
                        | {rport, inet:port_number()}
                        | term(). %% gen_sctp:open_option().

%% An accept match: an address, a string, or a list of either.
-type match() :: inet:ip_address()
               | string()
               | [match()].

-type listen_option() :: {accept, match()}
                       | term(). %% gen_sctp:open_option().

-type uint() :: non_neg_integer().

%% Accepting/connecting transport process state.
-record(transport,
        {parent  :: pid(),
         mode :: {accept, pid()}
               | accept
               | {connect, {[inet:ip_address()], uint(), list()}}
                 %% {RAs, RP, Errors}
               | connect,
         socket   :: gen_sctp:sctp_socket(),
         assoc_id :: gen_sctp:assoc_id(),  %% association identifier
         peer     :: {[inet:ip_address()], uint()}, %% {RAs, RP}
         streams  :: {uint(), uint()},     %% {InStream, OutStream} counts
         os = 0   :: uint()}).             %% next output stream

%% Listener process state.
-record(listener,
        {ref       :: reference(),
         socket    :: gen_sctp:sctp_socket(),
         count = 0 :: uint(),  %% attached transport processes
         pending = {0, ets:new(?MODULE, [ordered_set])},
         tref      :: reference(),
         accept    :: [match()]}).

%% Field pending implements two queues:
%%
%%   1. Transport-to-be processes to which an association has been
%%      assigned but which diameter has not yet asked for via start/3.
%%      This is a short-lived state of affairs: a new association causes
%%      a peer to be taken up, which in turn causes diameter to start a
%%      new transport.
%%
%%   2. Transport processes started by diameter that have not yet been
%%      assigned an association.
%%
%% When diameter calls start/3, the transport process is either taken
%% from the first queue or spawned and placed in the second queue
%% until an association is established. When an association is
%% established, a controlling process is either taken from the second
%% queue or spawned and placed in the first queue. Thus, there are
%% only elements in one queue at a time, so share an ets table queue
%% and tag it with a positive length if it contains the first queue, a
%% negative length if it contains the second queue. The case -1 is
%% handled differently for backwards compatibility reasons: between
%% messages it is stored in the older representation [TPid | {0, Tab}]
%% (see upgrade/1 and downgrade/1).
%% ---------------------------------------------------------------------------
%% # start/3
%% ---------------------------------------------------------------------------

%% Start a transport process for diameter, returning its pid and the
%% local addresses of its socket.
%%
%% The return spec includes {error, term()} because both paths can
%% return an error:
%%
%%   - accept: s/3 returns {error, Reason} if the call to the listener
%%     exits;
%%   - connect: s/3 returns the result of diameter_sctp_sup:start_child/1,
%%     which may presumably be an error tuple too.

-spec start({accept, Ref}, #diameter_service{}, [listen_option()])
   -> {ok, pid(), [inet:ip_address()]} | {error, term()}
 when Ref :: diameter:transport_ref();
           ({connect, Ref}, #diameter_service{}, [connect_option()])
   -> {ok, pid(), [inet:ip_address()]} | {error, term()}
 when Ref :: diameter:transport_ref().

start(T, #diameter_service{capabilities = Caps}, Opts)
  when is_list(Opts) ->
    diameter_sctp_sup:start(),  %% start supervisors on demand
    Addrs = Caps#diameter_caps.host_ip_address,
    s(T, Addrs, lists:map(fun ip/1, Opts)).

%% ip/1
%%
%% Accept {ifaddr, A} as a synonym for {ip, A}; other options pass
%% through unchanged.

ip({ifaddr, A}) ->
    {ip, A};
ip(T) ->
    T.

%% s/3
%%
%% A listener spawns transports either as a consequence of this call
%% when there is not yet an association to assign it, or at comm_up on
%% a new association in which case the call retrieves a transport from
%% the pending queue.

s({accept, Ref} = A, Addrs, Opts) ->
    {LPid, LAs} = listener(Ref, {Opts, Addrs}),
    try gen_server:call(LPid, {A, self()}, infinity) of
        {ok, TPid} -> {ok, TPid, LAs}
    catch
        exit: Reason -> {error, Reason}
    end;

%% This implementation is due to there being no accept call in
%% gen_sctp in order to be able to accept a new association only
%% *after* an accepting transport has been spawned.

s({connect = C, Ref}, Addrs, Opts) ->
    diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}).

%% start_link/1
%%
%% Start a listener or transport process; init/1 acknowledges the start
%% via proc_lib:init_ack/1.

start_link(T) ->
    proc_lib:start_link(?MODULE,
                        init,
                        [T],
                        infinity,
                        diameter_lib:spawn_opts(server, [])).

%% ---------------------------------------------------------------------------
%% # info/1
%% ---------------------------------------------------------------------------

%% service_info callback: local addresses, peer addresses and socket
%% statistics of the socket, omitting any item whose inet call fails.

info({gen_sctp, Sock}) ->
    lists:flatmap(fun(K) -> info(K, Sock) end,
                  [{socket, socknames},
                   {peer, peernames},
                   {statistics, getstat}]).

%% info/2
%%
%% Call inet:F/1 on the socket and tag a successful result with K.

info({K,F}, Sock) ->
    case inet:F(Sock) of
        {ok, V} ->
            [{K, map(F,V)}];
        _ ->
            []
    end.
%% map/2
%%
%% inet:{sock,peer}names/1 returns [{Addr, Port}] but the port number
%% should be the same in each tuple. Map to a {[Addr], Port} tuple if
%% so; otherwise return the list unchanged. Other values pass through.

map(K, [{_, Port} | _] = APs)
  when K == socknames;
       K == peernames ->
    try [A || {A,P} <- APs, P == Port orelse throw(?MODULE)] of
        As -> {As, Port}
    catch
        ?MODULE -> APs
    end;

map(_, V) ->
    V.

%% ---------------------------------------------------------------------------
%% # init/1
%% ---------------------------------------------------------------------------

%% Build the initial state with i/1 (which acknowledges the start to
%% proc_lib itself) and enter the gen_server loop.

init(T) ->
    gen_server:enter_loop(?MODULE, [], i(T)).

%% i/1

%% A process owning a listening socket.
i({listen, Ref, {Opts, Addrs}}) ->
    {[Matches], Rest} = proplists:split(Opts, [accept]),
    {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
    ok = gen_sctp:listen(Sock, true),
    true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
    proc_lib:init_ack({ok, self(), LAs}),
    start_timer(#listener{ref = Ref,
                          socket = Sock,
                          accept = accept(Matches)});

%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
    {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]),
    RAs  = [diameter_lib:ipaddr(A) || {raddr, A} <- As],
    %% Exactly one rport, defaulting to ?DEFAULT_PORT if none is given.
    [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
    {LAs, Sock} = open(Addrs, Rest, 0),
    putr(?REF_KEY, Ref),
    proc_lib:init_ack({ok, self(), LAs}),
    monitor(process, Pid),
    #transport{parent = Pid,
               mode = {connect, connect(Sock, RAs, RP, [])},
               socket = Sock};

%% An accepting transport spawned by diameter.
i({accept, Pid, LPid, Sock, Ref})
  when is_pid(Pid) ->
    putr(?REF_KEY, Ref),
    proc_lib:init_ack({ok, self()}),
    monitor(process, Pid),
    monitor(process, LPid),
    #transport{parent = Pid,
               mode = {accept, LPid},
               socket = Sock};

%% An accepting transport spawned at association establishment.
i({accept, Ref, LPid, Sock, Id}) ->
    putr(?REF_KEY, Ref),
    proc_lib:init_ack({ok, self()}),
    MRef = monitor(process, LPid),
    %% Wait for a signal that the transport has been started before
    %% processing other messages.
    receive
        {Ref, Pid} ->  %% transport started
            %% Monitor the parent as the other transport clauses do, so
            %% that its death produces the 'DOWN' that transition/2
            %% turns into stop. Without it, this transport would keep
            %% the association open after its parent has gone. (A
            %% monitor on an already-dead process delivers 'DOWN' at
            %% once.)
            monitor(process, Pid),
            #transport{parent = Pid,
                       mode = {accept, LPid},
                       socket = Sock};
        {'DOWN', MRef, process, _, _} = T ->  %% listener down
            close(Sock, Id),
            x(T)
    after ?ACCEPT_TIMEOUT ->
            close(Sock, Id),
            x(timeout)
    end.

%% close/2
%%
%% Gracefully end association Id on the socket with gen_sctp:eof/2.

close(Sock, Id) ->
    gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Id}).
%% Having to pass a record here is hokey.

%% listener/2
%%
%% Accepting processes can be started concurrently: ensure only one
%% listener is started. The lookup/start is serialized through
%% diameter_sync on a key per transport reference, which calls back
%% to listener/1. Returns {ListenerPid, LocalAddrs}.

listener(LRef, T) ->
    diameter_sync:call({?MODULE, listener, LRef},
                       {?MODULE, listener, [{LRef, T}]},
                       infinity,
                       infinity).

%% listener/1
%%
%% diameter_sync callback: look up a registered listener for LRef.

listener({LRef, T}) ->
    l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T).

%% l/3

%% Existing listening process ...
l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
    {LAs, _Sock} = AS,
    {LPid, LAs};

%% ... or not.
l([], LRef, T) ->
    {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}),
    {LPid, LAs}.

%% open/3
%%
%% Open an SCTP socket bound to the local addresses (see addrs/2),
%% using PortNr as the local port unless a port option is given.
%% Returns {LocalAddrs, Socket}; exits on failure.

open(Addrs, Opts, PortNr) ->
    {LAs, Os} = addrs(Addrs, Opts),
    {LAs, case gen_sctp:open(gen_opts(portnr(Os, PortNr))) of
              {ok, Sock} ->
                  Sock;
              {error, Reason} ->
                  x({open, Reason})
          end}.

%% addrs/2
%%
%% Local addresses are taken from any ip options, otherwise from the
%% service's host addresses. Either way they are (re)appended to the
%% options as {ip, A} so that gen_sctp binds to them.

addrs(Addrs, Opts) ->
    case proplists:split(Opts, [ip]) of
        {[[]], _} ->
            {Addrs, Opts ++ [{ip, A} || A <- Addrs]};
        {[As], Os} ->
            LAs = [diameter_lib:ipaddr(A) || {ip, A} <- As],
            {LAs, Os ++ [{ip, A} || A <- LAs]}
    end.

%% portnr/2
%%
%% Add {port, PortNr} unless a port option is already present.

portnr(Opts, PortNr) ->
    case proplists:get_value(port, Opts) of
        undefined ->
            [{port, PortNr} | Opts];
        _ ->
            Opts
    end.

%% x/1
%%
%% Exit with a {shutdown, _} reason, which gen_server does not report
%% as an error.

x(Reason) ->
    exit({shutdown, Reason}).

%% gen_opts/1
%%
%% Reject options that this module controls itself and prepend binary
%% mode and {active, once}.

gen_opts(Opts) ->
    {L,_} = proplists:split(Opts, [binary, list, mode, active, sctp_events]),
    [[],[],[],[],[]] == L orelse ?ERROR({reserved_options, Opts}),
    [binary, {active, once} | Opts].
%% ---------------------------------------------------------------------------
%% # ports/0-1
%% ---------------------------------------------------------------------------

%% Report {Type, PortNr, Pid} for each registered listener and transport
%% process of this module (ports/0), or only for those registered under
%% the given transport reference (ports/1). Registrations whose socket
%% port cannot be read are left out.

ports() ->
    port_entries(diameter_reg:match({?MODULE, '_', '_'}),
                 fun(_) -> true end).

ports(Ref) ->
    port_entries(diameter_reg:match({?MODULE, '_', {Ref, '_'}}),
                 fun(R) -> R == Ref end).

%% port_entries/2
%%
%% Map registrations {{?MODULE, Type, {Ref, {_, Sock}}}, Pid} to
%% {Type, PortNr, Pid}, keeping those whose Ref satisfies Keep.

port_entries(Regs, Keep) ->
    [{type(Kind), PortNr, Owner}
     || {{?MODULE, Kind, {R, {_, Sock}}}, Owner} <- Regs,
        Keep(R),
        {ok, PortNr} <- [inet:port(Sock)]].

%% type/1
%%
%% Listeners register as listener but are reported as listen.

type(listener) ->
    listen;
type(Kind) ->
    Kind.

%% ---------------------------------------------------------------------------
%% # handle_call/3
%% ---------------------------------------------------------------------------

%% A pending field in the pre-pool_size (list) form is converted before
%% the request is handled.
handle_call(Req, From, #listener{pending = P} = S)
  when is_list(P) ->
    handle_call(Req, From, upgrade(S));

%% diameter asks for an accepting transport (see s/3): hand over or
%% spawn one, and account for it as attached.
handle_call({{accept, Ref}, Pid}, _From, #listener{ref = Ref,
                                                  pending = {Len, Q},
                                                  count = Count}
                                         = S) ->
    TPid = accept(Ref, Pid, S),
    NewS = S#listener{pending = {Len - 1, Q}, count = Count + 1},
    {reply, {ok, TPid}, downgrade(NewS)};

handle_call(_Req, _From, State) ->
    {reply, nok, State}.

%% ---------------------------------------------------------------------------
%% # handle_cast/2
%% ---------------------------------------------------------------------------

handle_cast(_Msg, State) ->
    {noreply, State}.

%% ---------------------------------------------------------------------------
%% # handle_info/2
%% ---------------------------------------------------------------------------

%% Transport processes step their state with t/2, listeners with l/2;
%% a crash follows if either returns a different kind of state.

handle_info(Msg, #transport{} = S) ->
    #transport{} = NewS = t(Msg, S),
    {noreply, NewS};

handle_info(Msg, #listener{pending = P} = S)
  when is_list(P) ->
    handle_info(Msg, upgrade(S));

handle_info(Msg, #listener{} = S) ->
    #listener{} = NewS = l(Msg, S),
    {noreply, downgrade(NewS)}.

%% upgrade/1
%%
%% Convert the pre-pool_size representation [TPid | {0, Q}] (one
%% started transport awaiting an association) into {-1, Q} with TPid
%% stored in the ets queue.

upgrade(#listener{pending = [Waiting | {0, Q}]} = S) ->
    ets:insert(Q, {Waiting, now()}),
    S#listener{pending = {-1, Q}}.
%% Prior to the possibility of setting pool_size in transport
%% configuration, a new accepting transport was only started following
%% the death of a predecessor, so that there was at most one
%% previously started transport process waiting for an association.
%% This assumption no longer holds with pool_size > 1, in which case
%% several accepting transports are started concurrently. Deal with
%% this by placing the started transports in a new queue of transport
%% processes waiting for an association.
%%
%% Since only one of this queue and the existing queue of controlling
%% processes waiting for a transport to be started can be non-empty at
%% any given time, implement both queues in the same ets table. The
%% absolute value of the first element of the 2-tuple is the queue
%% length; the sign says which queue it is (negative: started
%% transports awaiting an association, positive: transports with an
%% association awaiting diameter's start/3).

%% downgrade/1
%%
%% Revert to the pre-pool_size representation when possible, for
%% backwards compatibility in the case that the pool_size option
%% hasn't been used: a queue of exactly one started transport is taken
%% out of the ets table and held as [TPid | {0, Q}].

downgrade(#listener{pending = {Len, Tab}} = S)
  when Len =:= -1 ->
    Waiting = ets:first(Tab),
    ets:delete(Tab, Waiting),
    S#listener{pending = [Waiting | {0, Tab}]};
downgrade(S) ->
    S.

%% ---------------------------------------------------------------------------
%% # code_change/3
%% ---------------------------------------------------------------------------

code_change(_OldVsn, State, _Extra) ->
    {ok, State}.

%% ---------------------------------------------------------------------------
%% # terminate/2
%% ---------------------------------------------------------------------------

%% A transport without an association has nothing to close explicitly.
%% One with an association ends it with close/2 (eof) if accepting,
%% otherwise closes its socket. A listener closes its listening socket.

terminate(_Reason, #transport{assoc_id = undefined}) ->
    ok;

terminate(_Reason, #transport{socket = Sock, mode = Mode, assoc_id = Id}) ->
    case Mode of
        accept ->
            close(Sock, Id);
        {accept, _} ->
            close(Sock, Id);
        _ ->
            gen_sctp:close(Sock)
    end;

terminate(_Reason, #listener{socket = Sock}) ->
    gen_sctp:close(Sock).
%% ---------------------------------------------------------------------------

%% putr/2, getr/1
%%
%% Process dictionary access under keys scoped by this module.

putr(Key, Val) ->
    put({?MODULE, Key}, Val).

getr(Key) ->
    get({?MODULE, Key}).

%% start_timer/1
%%
%% Arm the idle timer of a listener that has no attached transports;
%% otherwise leave the state as is.

start_timer(#listener{count = 0} = S) ->
    TRef = erlang:start_timer(?LISTENER_TIMEOUT, self(), close),
    S#listener{tref = TRef};
start_timer(S) ->
    S.

%% l/2
%%
%% Transition listener state.

%% Incoming message from SCTP: assign the association to a transport
%% process, peel it off to that process and forward the message.
l({sctp, LSock, _RA, _RP, Data} = Msg, #listener{socket = LSock,
                                                accept = Matches}
                                       = S) ->
    Id = assoc_id(Data),
    {TPid, NewS} = accept(Id, S),
    TPid ! {peeloff, peeloff(LSock, Id, TPid), Msg, Matches},
    setopts(LSock),
    NewS;

%% A monitored transport process has died.
l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_, Q}} = S) ->
    down(ets:member(Q, TPid), TPid, S);

%% Timeout after the last accepting process has died.
l({timeout, TRef, close = T}, #listener{tref = TRef, count = 0}) ->
    x(T);
l({timeout, _, close}, #listener{} = S) ->
    S.

%% down/3
%%
%% Accepting transport has died.

%% One in the pending queue started by diameter and still awaiting an
%% association: it was counted as attached, so uncount it ...
down(true, TPid, #listener{pending = {Len, Q}, count = Count} = S)
  when Len < 0 ->
    ets:delete(Q, TPid),
    start_timer(S#listener{count = Count - 1, pending = {Len + 1, Q}});

%% ... or one in the pending queue that has been assigned an
%% association but not yet started by diameter ...
down(true, TPid, #listener{pending = {Len, Q}} = S) ->
    ets:delete(Q, TPid),
    S#listener{pending = {Len - 1, Q}};

%% ... or one that's already attached.
down(false, _TPid, #listener{count = Count} = S) ->
    start_timer(S#listener{count = Count - 1}).

%% t/2
%%
%% Transition transport state: transition/2 returns ok (no change), a
%% new state, or stop (exit with the triggering message as reason).

t(Msg, S) ->
    case transition(Msg, S) of
        ok ->
            S;
        #transport{} = NewS ->
            NewS;
        stop ->
            x(Msg)
    end.

%% transition/2

%% Listening process is transferring ownership of an association:
%% check the peer against the accept matches, adopt the peeled-off
%% socket and process the forwarded message as if received on it.
transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches},
           #transport{mode = {accept, _},
                      socket = LSock}
           = S) ->
    ok = accept_peer(Sock, Matches),
    transition(setelement(2, Msg, Sock), S#transport{socket = Sock});

%% Incoming message.
transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
    setopts(Sock),
    recv(Data, S);

%% Outgoing message.
transition({diameter, {send, Msg}}, S) ->
    send(Msg, S);

%% Request to close the transport connection.
transition({diameter, {close, Parent}}, #transport{parent = Parent}) ->
    stop;

%% TLS over SCTP is described in RFC 3436 but has limitations as
%% described in RFC 6083. The latter describes DTLS over SCTP, which
%% addresses these limitations, DTLS itself being described in RFC
%% 4347. TLS is primarily used over TCP, which RFC 6733 acknowledges
%% by equating TLS with TLS/TCP and DTLS/SCTP.
transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
    stop;

%% Parent process has died.
transition({'DOWN', _, process, Parent, _}, #transport{parent = Parent}) ->
    stop;

%% Listener process has died.
transition({'DOWN', _, process, LPid, _}, #transport{mode = {accept, LPid}}) ->
    stop;

%% Ditto but we have ownership of the association. It might be that
%% we'll go down anyway though.
transition({'DOWN', _, process, _Pid, _}, #transport{mode = accept}) ->
    ok;

%% Request for the local port number.
transition({resolve_port, From}, #transport{socket = Sock})
  when is_pid(From) ->
    From ! inet:port(Sock),
    ok.

%% ok/1
%%
%% Unwrap {ok, T}; crash on anything unexpected.

ok({ok, T}) ->
    T;
ok(T) ->
    x(T).

%% accept_peer/2
%%
%% Exit unless some peer address of the association satisfies the
%% listener's accept matches; no matches means accept any peer.

accept_peer(_Sock, []) ->
    ok;
accept_peer(Sock, Matches) ->
    Peers = ok(inet:peernames(Sock)),
    RAddrs = [Addr || {Addr, _Port} <- Peers],
    diameter_peer:match(RAddrs, Matches) orelse x({accept, RAddrs, Matches}),
    ok.

%% accept/1
%%
%% Collect accept options, each wrapped in a list.

accept(Opts) ->
    [[Match] || {accept, Match} <- Opts].

%% accept/3
%%
%% Start a new transport process or use one that's already been
%% started as a consequence of an association being established.

%% Pending associations: attach to the first in the queue and tell it
%% who its parent is.
accept(_, Parent, #listener{ref = Ref, pending = {Len, Q}})
  when 0 < Len ->
    TPid = dq(Q),
    TPid ! {Ref, Parent},
    TPid;

%% No pending associations: spawn a new transport.
accept(Ref, Parent, #listener{socket = LSock, pending = {_, Q}}) ->
    nq({accept, Parent, self(), LSock, Ref}, Q).

%% send/2
%%
%% Outbound Diameter message on a specified stream ...
send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
     #transport{streams = {_, NumOut}} = S) ->
    %% The requested stream id is taken modulo the number of outbound
    %% streams recorded at comm_up.
    send(SId rem NumOut, Bin, S),
    S;

%% ... or not: rotate through all streams.
send(#diameter_packet{bin = Bin}, S) ->
    send(Bin, S);

send(Bin, #transport{streams = {_, NumOut}, os = Next} = S)
  when is_binary(Bin) ->
    send(Next, Bin, S),
    S#transport{os = (Next + 1) rem NumOut}.

%% send/3
%%
%% Send on the transport's own socket and association.

send(StreamId, Bin, #transport{socket = Sock, assoc_id = AId}) ->
    send(Sock, AId, StreamId, Bin).

%% send/4
%%
%% Exit the transport if gen_sctp reports a send error.

send(Sock, AssocId, Stream, Bin) ->
    case gen_sctp:send(Sock, AssocId, Stream, Bin) of
        ok ->
            ok;
        {error, Reason} ->
            x({send, Reason})
    end.

%% recv/2
%%
%% Handle data or an event received on the transport's socket. Returns
%% a new #transport{}, ok (state unchanged) or stop (see t/2).

%% Association established: register, record the stream counts and
%% report the peer up.
recv({_, #sctp_assoc_change{state = comm_up,
                            outbound_streams = OS,
                            inbound_streams = IS,
                            assoc_id = Id}},
     #transport{assoc_id = undefined,
                mode = {Type, _},
                socket = Sock}
     = S) ->
    publish(Type, getr(?REF_KEY), Id, Sock),
    up(S#transport{assoc_id = Id, streams = {IS, OS}});

%% ... or not: try the next address.
recv({_, #sctp_assoc_change{} = Event},
     #transport{assoc_id = undefined,
                socket = Sock,
                mode = {connect, {[RA | RAs], RP, Errors}}}
     = S) ->
    Next = connect(Sock, RAs, RP, [{RA, Event} | Errors]),
    S#transport{mode = {connect, Next}};

%% Association failure.
recv({_, #sctp_assoc_change{}}, _) ->
    stop;

%% Inbound Diameter message.
recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid})
  when is_binary(Bin) ->
    Pkt = #diameter_packet{transport_data = {stream, Id}, bin = Bin},
    diameter_peer:recv(Pid, Pkt),
    ok;

%% Shutdown of our association (or with assoc_id 0).
recv({_, #sctp_shutdown_event{assoc_id = A}}, #transport{assoc_id = Id})
  when A == Id;
       A == 0 ->
    stop;

%% Note that diameter_sctp(3) documents that sctp_events cannot be
%% specified in the list of options passed to gen_sctp and that
%% gen_opts/1 guards against this. This is to ensure that we know what
%% events to expect and also to ensure that we receive
%% #sctp_sndrcvinfo{} with each incoming message (data_io_event =
%% true). Adaptation layer events (ie. #sctp_adaptation_event{}) are
%% disabled by default so don't handle it. We could simply disable
%% events we don't react to but don't.

recv({_, #sctp_paddr_change{}}, _) ->
    ok;

recv({_, #sctp_pdapi_event{}}, _) ->
    ok.

%% publish/4
%%
%% Register the transport under {?MODULE, Type, {Ref, {Id, Sock}}}
%% (the form ports/0-1 read) and remember the socket for info/1.

publish(Type, Ref, Id, Sock) ->
    true = diameter_reg:add_new({?MODULE, Type, {Ref, {Id, Sock}}}),
    putr(?INFO_KEY, {gen_sctp, Sock}).

%% up/1
%%
%% Report the peer up to the parent and collapse mode to a bare atom.
%% A connecting transport also reports the remote address and port in
%% use.

up(#transport{parent = Pid, mode = {connect, {[RA | _], RP, _}}} = S) ->
    diameter_peer:up(Pid, {RA, RP}),
    S#transport{mode = connect};
up(#transport{parent = Pid, mode = {accept, _}} = S) ->
    diameter_peer:up(Pid),
    S#transport{mode = accept}.

%% accept/2
%%
%% Start a new transport process or use one that's already been
%% started as a consequence of diameter requesting a transport
%% process, returning it with the pending count incremented.

accept(Id, #listener{pending = {Len, Q}} = S) ->
    TPid = tpid(Id, S),
    {TPid, S#listener{pending = {Len + 1, Q}}}.

%% tpid/2

%% Transport waiting for an association: use it.
tpid(_Id, #listener{pending = {Len, Q}})
  when Len < 0 ->
    dq(Q);

%% No transport started yet: spawn one and queue it.
tpid(Id, #listener{ref = Ref, socket = LSock, pending = {_, Q}}) ->
    nq({accept, Ref, self(), LSock, Id}, Q).

%% nq/2
%%
%% Start a transport process under the supervisor, monitor it and
%% append it to the pending ets queue. Which logical queue it joins
%% depends on the caller: accept/3 queues a transport started at
%% diameter's request (awaiting an association), tpid/2 one started
%% for a new association (awaiting diameter's start/3).

nq(Arg, Q) ->
    {ok, TPid} = diameter_sctp_sup:start_child(Arg),
    monitor(process, TPid),
    ets:insert(Q, {TPid, now()}),
    TPid.

%% dq/1
%%
%% Remove and return the first transport process in the pending ets
%% queue. accept/3 uses this to hand a transport with an association
%% to diameter, tpid/2 to give a new association to a started
%% transport. The monitor set by nq/2 is kept.

dq(Q) ->
    Head = ets:first(Q),
    ets:delete(Q, Head),
    Head.

%% assoc_id/1
%%
%% It's unclear if this is needed, or if the first message on an
%% association is always sctp_assoc_change, but don't assume since
%% SCTP behaviour differs between operating systems.

assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) ->
    Id;
assoc_id({_, Rec}) ->
    id(Rec).

%% id/1
%%
%% The association id carried by an SCTP event record.

id(#sctp_shutdown_event{assoc_id = Id}) ->
    Id;
id(#sctp_assoc_change{assoc_id = Id}) ->
    Id;
id(#sctp_sndrcvinfo{assoc_id = Id}) ->
    Id;
id(#sctp_paddr_change{assoc_id = Id}) ->
    Id;
id(#sctp_adaptation_event{assoc_id = Id}) ->
    Id.
%% peeloff/3
%%
%% Branch association Id off the listening socket onto a socket of its
%% own and make transport process TPid its controlling process.

peeloff(LSock, Id, TPid) ->
    {ok, Sock} = gen_sctp:peeloff(LSock, Id),
    ok = gen_sctp:controlling_process(Sock, TPid),
    Sock.

%% connect/4
%%
%% Initiate an association to the first remote address for which
%% gen_sctp:connect_init/4 succeeds. Returns {Addrs, Port, Reasons},
%% with the address being tried at the head of Addrs. The outcome
%% arrives later as an #sctp_assoc_change{} (see recv/2). Failing
%% addresses are skipped with their errors accumulated most recent
%% first; if none remain, exit with the errors in attempt order.

connect(_Sock, [], _Port, Reasons) ->
    x({connect, lists:reverse(Reasons)});
connect(Sock, [Addr | Rest] = Addrs, Port, Reasons) ->
    case gen_sctp:connect_init(Sock, Addr, Port, []) of
        ok ->
            {Addrs, Port, Reasons};
        {error, _} = Error ->
            connect(Sock, Rest, Port, [{Addr, Error} | Reasons])
    end.

%% setopts/1
%%
%% Re-arm {active, once} so that the next message is delivered, and
%% exit on failure.

setopts(Sock) ->
    Result = inet:setopts(Sock, [{active, once}]),
    case Result of
        ok ->
            ok;
        _ ->
            x({setopts, Sock, Result})  %% possibly on peer disconnect
    end.