%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(diameter_sctp).
-behaviour(gen_server).
%% interface
-export([start/3]).
%% child start from supervisor
-export([start_link/1]).
%% child start from here
-export([init/1]).
%% gen_server callbacks
-export([handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3,
terminate/2]).
-export([listener/1,%% diameter_sync callback
info/1]). %% service_info callback
-export([ports/0,
ports/1]).
-export_type([listen_option/0,
connect_option/0]).
-include_lib("kernel/include/inet_sctp.hrl").
-include_lib("diameter/include/diameter.hrl").
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
%% The default port for a listener.
-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
%% Remote addresses to accept connections from.
-define(DEFAULT_ACCEPT, []). %% any
%% How long to wait for a transport process to attach after
%% association establishment.
-define(ACCEPT_TIMEOUT, 5000).
-type connect_option() :: {raddr, inet:ip_address()}
| {rport, inet:port_number()}
| term(). %% gen_sctp:open_option().
-type match() :: inet:ip_address()
| string()
| [match()].
-type listen_option() :: {accept, match()}
| term(). %% gen_sctp:open_option().
-type uint() :: non_neg_integer().
%% Accepting/connecting transport process state.
-record(transport,
{parent :: pid() | undefined,
mode :: {accept, pid()}
| accept
| {connect, {[inet:ip_address()], uint(), list()}}
%% {RAs, RP, Errors}
| connect,
socket :: gen_sctp:sctp_socket() | undefined,
assoc_id :: gen_sctp:assoc_id(), %% association identifier
peer :: {[inet:ip_address()], uint()} %% {RAs, RP}
| undefined,
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
os = 0 :: uint()}). %% next output stream
%% Listener process state.
-record(listener,
{ref :: reference(),
socket :: gen_sctp:sctp_socket(),
service = false :: false | pid(), %% service process
pending = {0, queue:new()},
accept :: [match()]}).
%% Field pending implements two queues: the first of transport-to-be
%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
%% state of affairs as a new transport is spawned as a consequence of
%% a peer being taken up, transport processes being spawned by the
%% listener on demand; the second of started transport processes that
%% have not yet been assigned an association.
%%
%% When diameter calls start/3, the transport process is either taken
%% from the first queue or spawned and placed in the second queue
%% until an association is established. When an association is
%% established, a controlling process is either taken from the second
%% queue or spawned and placed in the first queue. Thus, there are
%% only elements in one queue at a time, so share an ets table queue
%% and tag it with a positive length if it contains the first queue, a
%% negative length if it contains the second queue.
%% ---------------------------------------------------------------------------
%% # start/3
%% ---------------------------------------------------------------------------
-spec start({accept, Ref}, #diameter_service{}, [listen_option()])
-> {ok, pid(), [inet:ip_address()]}
when Ref :: diameter:transport_ref();
({connect, Ref}, #diameter_service{}, [connect_option()])
-> {ok, pid(), [inet:ip_address()]}
when Ref :: diameter:transport_ref().
start(T, Svc, Opts)
when is_list(Opts) ->
#diameter_service{capabilities = Caps,
pid = SPid}
= Svc,
diameter_sctp_sup:start(), %% start supervisors on demand
Addrs = Caps#diameter_caps.host_ip_address,
s(T, Addrs, SPid, lists:map(fun ip/1, Opts)).
ip({ifaddr, A}) ->
{ip, A};
ip(T) ->
T.
%% A listener spawns transports either as a consequence of this call
%% when there is not yet an association to assign it, or at comm_up on
%% a new association in which case the call retrieves a transport from
%% the pending queue.
s({accept, Ref} = A, Addrs, SPid, Opts) ->
{ok, LPid, LAs} = listener(Ref, {Opts, Addrs}),
try gen_server:call(LPid, {A, self(), SPid}, infinity) of
{ok, TPid} ->
{ok, TPid, LAs};
No ->
{error, No}
catch
exit: Reason ->
{error, Reason}
end;
%% This implementation is due to there being no accept call in
%% gen_sctp in order to be able to accept a new association only
%% *after* an accepting transport has been spawned.
s({connect = C, Ref}, Addrs, _SPid, Opts) ->
diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}).
%% start_link/1
start_link(T) ->
proc_lib:start_link(?MODULE,
init,
[T],
infinity,
diameter_lib:spawn_opts(server, [])).
%% ---------------------------------------------------------------------------
%% # info/1
%% ---------------------------------------------------------------------------
info({gen_sctp, Sock}) ->
lists:flatmap(fun(K) -> info(K, Sock) end,
[{socket, socknames},
{peer, peernames},
{statistics, getstat}]).
info({K,F}, Sock) ->
case inet:F(Sock) of
{ok, V} ->
[{K, map(F,V)}];
_ ->
[]
end.
%% inet:{sock,peer}names/1 returns [{Addr, Port}] but the port number
%% should be the same in each tuple. Map to a {[Addr], Port} tuple if
%% so.
map(K, [{_, Port} | _] = APs)
when K == socknames;
K == peernames ->
try [A || {A,P} <- APs, P == Port orelse throw(?MODULE)] of
As -> {As, Port}
catch
?MODULE -> APs
end;
map(_, V) ->
V.
%% ---------------------------------------------------------------------------
%% # init/1
%% ---------------------------------------------------------------------------
init(T) ->
gen_server:enter_loop(?MODULE, [], i(T)).
%% i/1
%% A process owning a listening socket.
i({listen, Ref, {Opts, Addrs}}) ->
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
{[Matches], Rest} = proplists:split(Opts, [accept]),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
proc_lib:init_ack({ok, self(), LAs}),
#listener{ref = Ref,
socket = Sock,
accept = [[M] || {accept, M} <- Matches]};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
{[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]),
RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As],
[RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
{LAs, Sock} = open(Addrs, Rest, 0),
putr(?REF_KEY, Ref),
proc_lib:init_ack({ok, self(), LAs}),
monitor(process, Pid),
#transport{parent = Pid,
mode = {connect, connect(Sock, RAs, RP, [])},
socket = Sock};
%% An accepting transport spawned by diameter, not yet owning an
%% association.
i({accept, Ref, LPid, Pid})
when is_pid(Pid) ->
putr(?REF_KEY, Ref),
proc_lib:init_ack({ok, self()}),
monitor(process, Pid),
MRef = monitor(process, LPid),
wait([{peeloff, MRef}], #transport{parent = Pid,
mode = {accept, LPid}});
%% An accepting transport spawned at association establishment, whose
%% parent is not yet known.
i({accept, Ref, LPid}) ->
putr(?REF_KEY, Ref),
proc_lib:init_ack({ok, self()}),
erlang:send_after(?ACCEPT_TIMEOUT, self(), accept_timeout),
MRef = monitor(process, LPid),
wait([{parent, Ref}, {peeloff, MRef}], #transport{mode = {accept, LPid}}).
%% wait/2
%%
%% Wait for diameter to start the transport process and for the
%% association to be peeled off before processing other messages.
wait(Keys, S) ->
lists:foldl(fun i/2, S, Keys).
i({K, Ref}, #transport{mode = {accept, _}} = S) ->
receive
{Ref, Pid} when K == parent -> %% transport process started
S#transport{parent = Pid};
{K, T, Matches} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
t(T, S#transport{socket = Sock});
accept_timeout = T ->
x(T);
{'DOWN', _, process, _, _} = T ->
x(T)
end.
%% listener/2
%% Accepting processes can be started concurrently: ensure only one
%% listener is started.
listener(Ref, T) ->
diameter_sync:call({?MODULE, listener, Ref},
{?MODULE, listener, [{Ref, T}]},
infinity,
infinity).
listener({Ref, T}) ->
l(diameter_reg:match({?MODULE, listener, {Ref, '_'}}), Ref, T).
%% Existing listening process ...
l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
{LAs, _Sock} = AS,
{ok, LPid, LAs};
%% ... or not.
l([], Ref, T) ->
diameter_sctp_sup:start_child({listen, Ref, T}).
%% open/3
open(Addrs, Opts, PortNr) ->
{LAs, Os} = addrs(Addrs, Opts),
{LAs, case gen_sctp:open(gen_opts(portnr(Os, PortNr))) of
{ok, Sock} ->
Sock;
{error, Reason} ->
x({open, Reason})
end}.
addrs(Addrs, Opts) ->
case proplists:split(Opts, [ip]) of
{[[]], _} ->
{Addrs, Opts ++ [{ip, A} || A <- Addrs]};
{[As], Os} ->
LAs = [diameter_lib:ipaddr(A) || {ip, A} <- As],
{LAs, Os ++ [{ip, A} || A <- LAs]}
end.
portnr(Opts, PortNr) ->
case proplists:get_value(port, Opts) of
undefined ->
[{port, PortNr} | Opts];
_ ->
Opts
end.
%% x/1
x(Reason) ->
exit({shutdown, Reason}).
%% gen_opts/1
gen_opts(Opts) ->
{L,_} = proplists:split(Opts, [binary, list, mode, active, sctp_events]),
[[],[],[],[],[]] == L orelse ?ERROR({reserved_options, Opts}),
[binary, {active, once} | Opts].
%% ---------------------------------------------------------------------------
%% # ports/0-1
%% ---------------------------------------------------------------------------
ports() ->
Ts = diameter_reg:match({?MODULE, '_', '_'}),
[{type(T), N, Pid} || {{?MODULE, T, {_, {_, S}}}, Pid} <- Ts,
{ok, N} <- [inet:port(S)]].
ports(Ref) ->
Ts = diameter_reg:match({?MODULE, '_', {Ref, '_'}}),
[{type(T), N, Pid} || {{?MODULE, T, {R, {_, S}}}, Pid} <- Ts,
R == Ref,
{ok, N} <- [inet:port(S)]].
type(listener) ->
listen;
type(T) ->
T.
%% ---------------------------------------------------------------------------
%% # handle_call/3
%% ---------------------------------------------------------------------------
handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref} = S) ->
{TPid, NewS} = accept(Ref, Pid, S),
{reply, {ok, TPid}, NewS};
handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) ->
handle_call({T, Pid}, From, if not is_pid(P), is_pid(SPid) ->
monitor(process, SPid),
S#listener{service = SPid};
true ->
S
end);
handle_call(_, _, State) ->
{reply, nok, State}.
%% ---------------------------------------------------------------------------
%% # handle_cast/2
%% ---------------------------------------------------------------------------
handle_cast(_, State) ->
{noreply, State}.
%% ---------------------------------------------------------------------------
%% # handle_info/2
%% ---------------------------------------------------------------------------
handle_info(T, #transport{} = S) ->
{noreply, #transport{} = t(T,S)};
handle_info(T, #listener{} = S) ->
{noreply, #listener{} = l(T,S)}.
%% Prior to the possibility of setting pool_size on in transport
%% configuration, a new accepting transport was only started following
%% the death of a predecessor, so that there was only at most one
%% previously started transport process waiting for an association.
%% This assumption no longer holds with pool_size > 1, in which case
%% several accepting transports are started concurrently. Deal with
%% this by placing the started transports in a new queue of transport
%% processes waiting for an association.
%% ---------------------------------------------------------------------------
%% # code_change/3
%% ---------------------------------------------------------------------------
code_change(_, State, _) ->
{ok, State}.
%% ---------------------------------------------------------------------------
%% # terminate/2
%% ---------------------------------------------------------------------------
terminate(_, #transport{assoc_id = undefined}) ->
ok;
terminate(_, #transport{socket = Sock}) ->
gen_sctp:close(Sock);
terminate(_, #listener{socket = Sock}) ->
gen_sctp:close(Sock).
%% ---------------------------------------------------------------------------
putr(Key, Val) ->
put({?MODULE, Key}, Val).
getr(Key) ->
get({?MODULE, Key}).
%% l/2
%%
%% Transition listener state.
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
accept = Matches}
= S) ->
Id = assoc_id(Data),
{TPid, NewS} = accept(S),
TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches},
setopts(Sock),
NewS;
%% Service process has died.
l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid,
socket = Sock}) ->
gen_sctp:close(Sock),
x(T);
%% Accepting process has died.
l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
down(queue:member(TPid, Q), TPid, S);
%% Transport has been removed.
l({transport, remove, _} = T, #listener{socket = Sock}) ->
gen_sctp:close(Sock),
x(T).
%% down/3
%%
%% Accepting transport has died.
%% One that's waiting for transport start in the pending queue ...
down(true, TPid, #listener{pending = {N,Q}} = S) ->
NQ = queue:filter(fun(P) -> P /= TPid end, Q),
if N < 0 -> %% awaiting an association ...
S#listener{pending = {N+1, NQ}};
true -> %% ... or one has been assigned
S#listener{pending = {N-1, NQ}}
end;
%% ... or one that's already attached.
down(false, _TPid, S) ->
S.
%% t/2
%%
%% Transition transport state.
t(T,S) ->
case transition(T,S) of
ok ->
S;
#transport{} = NS ->
NS;
stop ->
x(T)
end.
%% transition/2
%% Incoming message.
transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
setopts(Sock),
recv(Data, S);
%% Outgoing message.
transition({diameter, {send, Msg}}, S) ->
send(Msg, S);
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
stop;
%% TLS over SCTP is described in RFC 3436 but has limitations as
%% described in RFC 6083. The latter describes DTLS over SCTP, which
%% addresses these limitations, DTLS itself being described in RFC
%% 4347. TLS is primarily used over TCP, which RFC 6733 acknowledges
%% by equating TLS with TLS/TCP and DTLS/SCTP.
transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
stop;
%% Parent process has died.
transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
stop;
%% Timeout after transport process has been started.
transition(accept_timeout, _) ->
ok;
%% Request for the local port number.
transition({resolve_port, Pid}, #transport{socket = Sock})
when is_pid(Pid) ->
Pid ! inet:port(Sock),
ok.
%% Crash on anything unexpected.
ok({ok, T}) ->
T;
ok(T) ->
x(T).
%% accept_peer/2
accept_peer(_, []) ->
ok;
accept_peer(Sock, Matches) ->
RAddrs = [A || {A,_} <- ok(inet:peernames(Sock))],
diameter_peer:match(RAddrs, Matches)
orelse x({accept, RAddrs, Matches}),
ok.
%% accept/3
%%
%% Start a new transport process or use one that's already been
%% started as a consequence of diameter requesting a transport
%% process.
accept(Ref, Pid, #listener{pending = {N,_}} = S) ->
{TPid, NQ} = q(Ref, Pid, S),
{TPid, S#listener{pending = {N-1, NQ}}}.
%% Pending associations: attach to the first in the queue.
q(_, Pid, #listener{ref = Ref,
pending = {N,Q}})
when 0 < N ->
{TPid, _} = T = dq(Q),
TPid ! {Ref, Pid},
T;
%% No pending associations: spawn a new transport.
q(Ref, Pid, #listener{pending = {_,Q}}) ->
nq({accept, Ref, self(), Pid}, Q).
%% send/2
%% Outbound Diameter message on a specified stream ...
send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
#transport{streams = {_, OS}}
= S) ->
send(SId rem OS, Bin, S),
S;
%% ... or not: rotate through all streams.
send(#diameter_packet{bin = Bin}, S) ->
send(Bin, S);
send(Bin, #transport{streams = {_, OS},
os = N}
= S)
when is_binary(Bin) ->
send(N, Bin, S),
S#transport{os = (N + 1) rem OS}.
%% send/3
send(StreamId, Bin, #transport{socket = Sock,
assoc_id = AId}) ->
send(Sock, AId, StreamId, Bin).
%% send/4
send(Sock, AssocId, Stream, Bin) ->
case gen_sctp:send(Sock, AssocId, Stream, Bin) of
ok ->
ok;
{error, Reason} ->
x({send, Reason})
end.
%% recv/2
%% Association established ...
recv({_, #sctp_assoc_change{state = comm_up,
outbound_streams = OS,
inbound_streams = IS,
assoc_id = Id}},
#transport{assoc_id = undefined,
mode = {T, _},
socket = Sock}
= S) ->
Ref = getr(?REF_KEY),
publish(T, Ref, Id, Sock),
up(S#transport{assoc_id = Id,
streams = {IS, OS}});
%% ... or not: try the next address.
recv({_, #sctp_assoc_change{} = E},
#transport{assoc_id = undefined,
socket = Sock,
mode = {connect = C, {[RA|RAs], RP, Es}}}
= S) ->
S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}};
%% Association failure.
recv({_, #sctp_assoc_change{}}, _) ->
stop;
%% Inbound Diameter message.
recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid})
when is_binary(Bin) ->
diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id},
bin = Bin}),
ok;
recv({_, #sctp_shutdown_event{assoc_id = A}},
#transport{assoc_id = Id})
when A == Id;
A == 0 ->
stop;
%% Note that diameter_sctp(3) documents that sctp_events cannot be
%% specified in the list of options passed to gen_sctp and that
%% gen_opts/1 guards against this. This is to ensure that we know what
%% events to expect and also to ensure that we receive
%% #sctp_sndrcvinfo{} with each incoming message (data_io_event =
%% true). Adaptation layer events (ie. #sctp_adaptation_event{}) are
%% disabled by default so don't handle it. We could simply disable
%% events we don't react to but don't.
recv({_, #sctp_paddr_change{}}, _) ->
ok;
recv({_, #sctp_pdapi_event{}}, _) ->
ok.
publish(T, Ref, Id, Sock) ->
true = diameter_reg:add_new({?MODULE, T, {Ref, {Id, Sock}}}),
putr(?INFO_KEY, {gen_sctp, Sock}). %% for info/1
%% up/1
up(#transport{parent = Pid,
mode = {connect = C, {[RA | _], RP, _}}}
= S) ->
diameter_peer:up(Pid, {RA,RP}),
S#transport{mode = C};
up(#transport{parent = Pid,
mode = {accept = A, _}}
= S) ->
diameter_peer:up(Pid),
S#transport{mode = A}.
%% accept/1
%%
%% Start a new transport process or use one that's already been
%% started as a consequence of an event to a listener process.
accept(#listener{pending = {N,_}} = S) ->
{TPid, NQ} = q(S),
{TPid, S#listener{pending = {N+1, NQ}}}.
%% Transport waiting for an association: use it.
q(#listener{pending = {N,Q}})
when N < 0 ->
dq(Q);
%% No transport start yet: spawn one and queue.
q(#listener{ref = Ref,
pending = {_,Q}}) ->
nq({accept, Ref, self()}, Q).
%% nq/2
%%
%% Place a transport process in the second pending queue to make it
%% available to the next association.
nq(Arg, Q) ->
{ok, TPid} = diameter_sctp_sup:start_child(Arg),
monitor(process, TPid),
{TPid, queue:in(TPid, Q)}.
%% dq/1
%%
%% Remove a transport process from the first pending queue to assign
%% it to an existing association.
dq(Q) ->
{{value, TPid}, NQ} = queue:out(Q),
{TPid, NQ}.
%% assoc_id/1
%%
%% It's unclear if this is needed, or if the first message on an
%% association is always sctp_assoc_change, but don't assume since
%% SCTP behaviour differs between operating systems.
assoc_id({[#sctp_sndrcvinfo{assoc_id = Id}], _}) ->
Id;
assoc_id({_, Rec}) ->
id(Rec).
id(#sctp_shutdown_event{assoc_id = Id}) ->
Id;
id(#sctp_assoc_change{assoc_id = Id}) ->
Id;
id(#sctp_sndrcvinfo{assoc_id = Id}) ->
Id;
id(#sctp_paddr_change{assoc_id = Id}) ->
Id;
id(#sctp_adaptation_event{assoc_id = Id}) ->
Id.
%% peeloff/3
peeloff(LSock, Id, TPid) ->
{ok, Sock} = gen_sctp:peeloff(LSock, Id),
ok = gen_sctp:controlling_process(Sock, TPid),
Sock.
%% connect/4
connect(_, [], _, Reasons) ->
x({connect, lists:reverse(Reasons)});
connect(Sock, [Addr | AT] = As, Port, Reasons) ->
case gen_sctp:connect_init(Sock, Addr, Port, []) of
ok ->
{As, Port, Reasons};
{error, _} = E ->
connect(Sock, AT, Port, [{Addr, E} | Reasons])
end.
%% setopts/1
setopts(Sock) ->
case inet:setopts(Sock, [{active, once}]) of
ok -> ok;
X -> x({setopts, Sock, X}) %% possibly on peer disconnect
end.