aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_sctp.erl
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2011-05-18 18:29:12 +0200
committerAnders Svensson <[email protected]>2011-05-18 18:29:12 +0200
commit3c15ff32e89e401b4dde2b8acc9699be2614b996 (patch)
tree184dc988fb2ab3af04a532bc59cc794a8d74fbd3 /lib/diameter/src/transport/diameter_sctp.erl
parentb1e768e86593178810c8a0b3c38443dcf6be5181 (diff)
downloadotp-3c15ff32e89e401b4dde2b8acc9699be2614b996.tar.gz
otp-3c15ff32e89e401b4dde2b8acc9699be2614b996.tar.bz2
otp-3c15ff32e89e401b4dde2b8acc9699be2614b996.zip
Initial commit of the diameter application.
The application provides an implementation of the Diameter protocol as defined in RFC 3588.
Diffstat (limited to 'lib/diameter/src/transport/diameter_sctp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl624
1 files changed, 624 insertions, 0 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
new file mode 100644
index 0000000000..92aa8488a0
--- /dev/null
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -0,0 +1,624 @@
+%%
+%% %CopyrightBegin%
+%%
+%% Copyright Ericsson AB 2010-2011. All Rights Reserved.
+%%
+%% The contents of this file are subject to the Erlang Public License,
+%% Version 1.1, (the "License"); you may not use this file except in
+%% compliance with the License. You should have received a copy of the
+%% Erlang Public License along with this software. If not, it can be
+%% retrieved online at http://www.erlang.org/.
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% %CopyrightEnd%
+%%
+
+-module(diameter_sctp).
+
+-behaviour(gen_server).
+
+%% interface
+-export([start/3]).
+
+%% child start from supervisor
+-export([start_link/1]).
+
+%% child start from here
+-export([init/1]).
+
+%% gen_server callbacks
+-export([handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ code_change/3,
+ terminate/2]).
+
+-include_lib("kernel/include/inet_sctp.hrl").
+-include_lib("diameter/include/diameter.hrl").
+
+-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
+
+%% The default port for a listener.
+-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
+
+%% How long a listener with no associations lives before offing
+%% itself.
+-define(LISTENER_TIMEOUT, 30000).
+
+%% How long to wait for a transport process to attach after
+%% association establishment.
+-define(ACCEPT_TIMEOUT, 5000).
+
+-type uint() :: non_neg_integer().
+
+%% Accepting/connecting transport process state.
+-record(transport,
+ {parent :: pid(),
+ mode :: {accept, pid()}
+ | {connect, {list(inet:ip_address()), uint(), list()}}
+ %% {RAs, RP, Errors}
+ | connect,
+ socket :: gen_sctp:sctp_socket(),
+ assoc_id :: gen_sctp:assoc_id(), %% association identifier
+ peer :: {[inet:ip_address()], uint()}, %% {RAs, RP}
+ streams :: {uint(), uint()}, %% {InStream, OutStream} counts
+ os = 0 :: uint()}). %% next output stream
+
+%% Listener process state.
+-record(listener,
+ {ref :: reference(),
+ socket :: gen_sctp:sctp_socket(),
+ count = 0 :: uint(),
+ tmap = ets:new(?MODULE, []) :: ets:tid(),
+ %% {MRef, Pid|AssocId}, {AssocId, Pid}
+ pending = {0, ets:new(?MODULE, [ordered_set])},
+ tref :: reference()}).
+%% Field tmap is used to map an incoming message or event to the
+%% relevent transport process. Field pending implements a queue of
+%% transport processes to which an association has been assigned (at
+%% comm_up and written into tmap) but for which diameter hasn't yet
+%% spawned a transport process: a short-lived state of affairs as a
+%% new transport is spawned as a consequence of a peer being taken up,
+%% transport processes being spawned by the listener on demand. In
+%% case diameter starts a transport before comm_up on a new
+%% association, pending is set to an improper list with the spawned
+%% transport as head and the queue as tail.
+
+%% ---------------------------------------------------------------------------
+%% # start/3
+%% ---------------------------------------------------------------------------
+
+start(T, #diameter_service{capabilities = Caps}, Opts)
+ when is_list(Opts) ->
+ diameter_sctp_sup:start(), %% start supervisors on demand
+ Addrs = Caps#diameter_caps.host_ip_address,
+ s(T, Addrs, lists:map(fun ip/1, Opts)).
+
+ip({ifaddr, A}) ->
+ {ip, A};
+ip(T) ->
+ T.
+
+%% A listener spawns transports either as a consequence of this call
+%% when there is not yet an association to associate with it, or at
+%% comm_up on a new association in which case the call retrieves a
+%% transport from the pending queue.
+s({accept, Ref} = A, Addrs, Opts) ->
+ {LPid, LAs} = listener(Ref, {Opts, Addrs}),
+ try gen_server:call(LPid, {A, self()}, infinity) of
+ {ok, TPid} -> {ok, TPid, LAs}
+ catch
+ exit: Reason -> {error, Reason}
+ end;
+%% This implementation is due to there being no accept call in
+%% gen_sctp in order to be able to accept a new association only
+%% *after* an accepting transport has been spawned.
+
+s({connect = C, _}, Addrs, Opts) ->
+ diameter_sctp_sup:start_child({C, self(), Opts, Addrs}).
+
+%% start_link/1
+
+start_link(T) ->
+ proc_lib:start_link(?MODULE,
+ init,
+ [T],
+ infinity,
+ diameter_lib:spawn_opts(server, [])).
+
+%% ---------------------------------------------------------------------------
+%% # init/1
+%% ---------------------------------------------------------------------------
+
+init(T) ->
+ gen_server:enter_loop(?MODULE, [], i(T)).
+
+%% i/1
+
+%% A process owning a listening socket.
+i({listen, Ref, {Opts, Addrs}}) ->
+ {LAs, Sock} = AS = open(Addrs, Opts, ?DEFAULT_PORT),
+ proc_lib:init_ack({ok, self(), LAs}),
+ ok = gen_sctp:listen(Sock, true),
+ true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
+ start_timer(#listener{ref = Ref,
+ socket = Sock});
+
+%% A connecting transport.
+i({connect, Pid, Opts, Addrs}) ->
+ {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]),
+ RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As],
+ [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
+ {LAs, Sock} = open(Addrs, Rest, 0),
+ proc_lib:init_ack({ok, self(), LAs}),
+ erlang:monitor(process, Pid),
+ #transport{parent = Pid,
+ mode = {connect, connect(Sock, RAs, RP, [])},
+ socket = Sock};
+
+%% An accepting transport spawned by diameter.
+i({accept, Pid, LPid, Sock}) ->
+ proc_lib:init_ack({ok, self()}),
+ erlang:monitor(process, Pid),
+ erlang:monitor(process, LPid),
+ #transport{parent = Pid,
+ mode = {accept, LPid},
+ socket = Sock};
+
+%% An accepting transport spawned at association establishment.
+i({accept, Ref, LPid, Sock, Id}) ->
+ proc_lib:init_ack({ok, self()}),
+ MRef = erlang:monitor(process, LPid),
+ %% Wait for a signal that the transport has been started before
+ %% processing other messages.
+ receive
+ {Ref, Pid} -> %% transport started
+ #transport{parent = Pid,
+ mode = {accept, LPid},
+ socket = Sock};
+ {'DOWN', MRef, process, _, _} = T -> %% listener down
+ close(Sock, Id),
+ x(T)
+ after ?ACCEPT_TIMEOUT ->
+ close(Sock, Id),
+ x(timeout)
+ end.
+
+%% close/2
+
+close(Sock, Id) ->
+ gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Id}).
+%% Having to pass a record here is hokey.
+
+%% listener/2
+
+listener(LRef, T) ->
+ l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T).
+
+%% Existing process with the listening socket ...
+l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
+ {LAs, _Sock} = AS,
+ {LPid, LAs};
+
+%% ... or not: start one.
+l([], LRef, T) ->
+ {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}),
+ {LPid, LAs}.
+
+%% open/3
+
+open(Addrs, Opts, PortNr) ->
+ {LAs, Os} = addrs(Addrs, Opts),
+ {LAs, case gen_sctp:open(gen_opts(portnr(Os, PortNr))) of
+ {ok, Sock} ->
+ Sock;
+ {error, Reason} ->
+ x({open, Reason})
+ end}.
+
+addrs(Addrs, Opts) ->
+ case proplists:split(Opts, [ip]) of
+ {[[]], _} ->
+ {Addrs, Opts ++ [{ip, A} || A <- Addrs]};
+ {[As], Os} ->
+ LAs = [diameter_lib:ipaddr(A) || {ip, A} <- As],
+ {LAs, Os ++ [{ip, A} || A <- LAs]}
+ end.
+
+portnr(Opts, PortNr) ->
+ case proplists:get_value(port, Opts) of
+ undefined ->
+ [{port, PortNr} | Opts];
+ _ ->
+ Opts
+ end.
+
+%% x/1
+
+x(Reason) ->
+ exit({shutdown, Reason}).
+
+%% gen_opts/1
+
+gen_opts(Opts) ->
+ {L,_} = proplists:split(Opts, [binary, list, mode, active, sctp_events]),
+ [[],[],[],[],[]] == L orelse ?ERROR({reserved_options, Opts}),
+ [binary, {active, once} | Opts].
+
+%% ---------------------------------------------------------------------------
+%% # handle_call/3
+%% ---------------------------------------------------------------------------
+
+handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref,
+ count = N}
+ = S) ->
+ {TPid, NewS} = accept(Pid, S),
+ {reply, {ok, TPid}, NewS#listener{count = N+1}};
+
+handle_call(_, _, State) ->
+ {reply, nok, State}.
+
+%% ---------------------------------------------------------------------------
+%% # handle_cast/2
+%% ---------------------------------------------------------------------------
+
+handle_cast(_, State) ->
+ {noreply, State}.
+
+%% ---------------------------------------------------------------------------
+%% # handle_info/2
+%% ---------------------------------------------------------------------------
+
+handle_info(T, #transport{} = S) ->
+ {noreply, #transport{} = t(T,S)};
+
+handle_info(T, #listener{} = S) ->
+ {noreply, #listener{} = l(T,S)}.
+
+%% ---------------------------------------------------------------------------
+%% # code_change/3
+%% ---------------------------------------------------------------------------
+
+code_change(_, State, _) ->
+ {ok, State}.
+
+%% ---------------------------------------------------------------------------
+%% # terminate/2
+%% ---------------------------------------------------------------------------
+
+terminate(_, #transport{assoc_id = undefined}) ->
+ ok;
+
+terminate(_, #transport{socket = Sock,
+ mode = {accept, _},
+ assoc_id = Id}) ->
+ close(Sock, Id);
+
+terminate(_, #transport{socket = Sock}) ->
+ gen_sctp:close(Sock);
+
+terminate(_, #listener{socket = Sock}) ->
+ gen_sctp:close(Sock).
+
+%% ---------------------------------------------------------------------------
+
+%% start_timer/1
+
+start_timer(#listener{count = 0} = S) ->
+ S#listener{tref = erlang:start_timer(?LISTENER_TIMEOUT, self(), close)};
+start_timer(S) ->
+ S.
+
+%% l/2
+%%
+%% Transition listener state.
+
+%% Incoming message from SCTP.
+l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) ->
+ setopts(Sock),
+ case find(Data, S) of
+ {TPid, NewS} ->
+ TPid ! Msg,
+ NewS;
+ false ->
+ S
+ end;
+
+%% Transport is asking message to be sent. See send/3 for why the send
+%% isn't directly from the transport.
+l({send, AssocId, StreamId, Bin}, #listener{socket = Sock} = S) ->
+ send(Sock, AssocId, StreamId, Bin),
+ S;
+
+%% Accepting transport has died. One that's awaiting an association ...
+l({'DOWN', MRef, process, TPid, _}, #listener{pending = [TPid | Q],
+ tmap = T,
+ count = N}
+ = S) ->
+ ets:delete(T, MRef),
+ ets:delete(T, TPid),
+ start_timer(S#listener{count = N-1,
+ pending = Q});
+
+%% ... ditto and a new transport has already been started ...
+l({'DOWN', _, process, _, _} = T, #listener{pending = [TPid | Q]}
+ = S) ->
+ #listener{pending = NQ}
+ = NewS
+ = l(T, S#listener{pending = Q}),
+ NewS#listener{pending = [TPid | NQ]};
+
+%% ... or not.
+l({'DOWN', MRef, process, TPid, _}, #listener{socket = Sock,
+ tmap = T,
+ count = N,
+ pending = {P,Q}}
+ = S) ->
+ [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId
+ ets:delete(T, MRef),
+ ets:delete(T, Id),
+ Id == TPid orelse close(Sock, Id),
+ case ets:lookup(Q, TPid) of
+ [{TPid, _}] -> %% transport in the pending queue ...
+ ets:delete(Q, TPid),
+ S#listener{pending = {P-1, Q}};
+ [] -> %% ... or not
+ start_timer(S#listener{count = N-1})
+ end;
+
+%% Timeout after the last accepting process has died.
+l({timeout, TRef, close = T}, #listener{tref = TRef,
+ count = 0}) ->
+ x(T);
+l({timeout, _, close}, #listener{} = S) ->
+ S.
+
+%% t/2
+%%
+%% Transition transport state.
+
+t(T,S) ->
+ case transition(T,S) of
+ ok ->
+ S;
+ #transport{} = NS ->
+ NS;
+ stop ->
+ x(T)
+ end.
+
+%% transition/2
+
+%% Incoming message.
+transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock,
+ mode = {accept, _}}
+ = S) ->
+ recv(Data, S);
+
+transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
+ setopts(Sock),
+ recv(Data, S);
+
+%% Outgoing message.
+transition({diameter, {send, Msg}}, S) ->
+ send(Msg, S);
+
+%% Request to close the transport connection.
+transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
+ stop;
+
+%% Listener process has died.
+transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) ->
+ stop;
+
+%% Parent process has died.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+ stop.
+
+%% Crash on anything unexpected.
+
+%% accept/2
+%%
+%% Start a new transport process or use one that's already been
+%% started as a consequence of association establishment.
+
+%% No pending associations: spawn a new transport.
+accept(Pid, #listener{socket = Sock,
+ tmap = T,
+ pending = {0,_} = Q}
+ = S) ->
+ Arg = {accept, Pid, self(), Sock},
+ {ok, TPid} = diameter_sctp_sup:start_child(Arg),
+ MRef = erlang:monitor(process, TPid),
+ ets:insert(T, [{MRef, TPid}, {TPid, MRef}]),
+ {TPid, S#listener{pending = [TPid | Q]}};
+%% Placing the transport in the pending field makes it available to
+%% the next association. The stack starts a new accepting transport
+%% only after this one brings the connection up (or dies).
+
+%% Accepting transport has died. This can happen if a new transport is
+%% started before the DOWN has arrived.
+accept(Pid, #listener{pending = [TPid | {0,_} = Q]} = S) ->
+ false = is_process_alive(TPid), %% assert
+ accept(Pid, S#listener{pending = Q});
+
+%% Pending associations: attach to the first in the queue.
+accept(Pid, #listener{ref = Ref, pending = {N,Q}} = S) ->
+ TPid = ets:first(Q),
+ TPid ! {Ref, Pid},
+ ets:delete(Q, TPid),
+ {TPid, S#listener{pending = {N-1, Q}}}.
+
+%% send/2
+
+%% Outbound Diameter message on a specified stream ...
+send(#diameter_packet{bin = Bin, transport_data = {stream, SId}}, S) ->
+ send(SId, Bin, S),
+ S;
+
+%% ... or not: rotate through all steams.
+send(Bin, #transport{streams = {_, OS},
+ os = N}
+ = S)
+ when is_binary(Bin) ->
+ send(N, Bin, S),
+ S#transport{os = (N + 1) rem OS}.
+
+%% send/3
+
+%% Messages have to be sent from the controlling process, which is
+%% probably a bug. Sending from here causes an inet_reply, Sock,
+%% Status} message to be sent to the controlling process while
+%% gen_sctp:send/4 here hangs.
+send(StreamId, Bin, #transport{assoc_id = AId,
+ mode = {accept, LPid}}) ->
+ LPid ! {send, AId, StreamId, Bin};
+
+send(StreamId, Bin, #transport{socket = Sock,
+ assoc_id = AId}) ->
+ send(Sock, AId, StreamId, Bin).
+
+%% send/4
+
+send(Sock, AssocId, Stream, Bin) ->
+ case gen_sctp:send(Sock, AssocId, Stream, Bin) of
+ ok ->
+ ok;
+ {error, Reason} ->
+ x({send, Reason})
+ end.
+
+%% recv/2
+
+%% Association established ...
+recv({[], #sctp_assoc_change{state = comm_up,
+ outbound_streams = OS,
+ inbound_streams = IS,
+ assoc_id = Id}},
+ #transport{assoc_id = undefined}
+ = S) ->
+ up(S#transport{assoc_id = Id,
+ streams = {IS, OS}});
+
+%% ... or not: try the next address.
+recv({[], #sctp_assoc_change{} = E},
+ #transport{assoc_id = undefined,
+ socket = Sock,
+ mode = {connect = C, {[RA|RAs], RP, Es}}}
+ = S) ->
+ S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}};
+
+%% Lost association after establishment.
+recv({[], #sctp_assoc_change{}}, _) ->
+ stop;
+
+%% Inbound Diameter message.
+recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid})
+ when is_binary(Bin) ->
+ diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id},
+ bin = Bin}),
+ ok;
+
+recv({[], #sctp_shutdown_event{assoc_id = Id}},
+ #transport{assoc_id = Id}) ->
+ stop.
+
+%% up/1
+
+up(#transport{parent = Pid,
+ mode = {connect = C, {[RA | _], RP, _}}}
+ = S) ->
+ diameter_peer:up(Pid, {RA,RP}),
+ S#transport{mode = C};
+
+up(#transport{parent = Pid,
+ mode = {accept, _}}
+ = S) ->
+ diameter_peer:up(Pid),
+ S.
+
+%% find/2
+
+find({[#sctp_sndrcvinfo{assoc_id = Id}], _}
+ = Data,
+ #listener{tmap = T}
+ = S) ->
+ f(ets:lookup(T, Id), Data, S);
+
+find({_, Rec} = Data, #listener{tmap = T} = S) ->
+ f(ets:lookup(T, assoc_id(Rec)), Data, S).
+
+%% New association and a transport waiting for one: use it.
+f([],
+ {_, #sctp_assoc_change{state = comm_up,
+ assoc_id = Id}},
+ #listener{tmap = T,
+ pending = [TPid | {_,_} = Q]}
+ = S) ->
+ [{TPid, MRef}] = ets:lookup(T, TPid),
+ ets:insert(T, [{MRef, Id}, {Id, TPid}]),
+ ets:delete(T, TPid),
+ {TPid, S#listener{pending = Q}};
+
+%% New association and no transport start yet: spawn one and place it
+%% in the queue.
+f([],
+ {_, #sctp_assoc_change{state = comm_up,
+ assoc_id = Id}},
+ #listener{ref = Ref,
+ socket = Sock,
+ tmap = T,
+ pending = {N,Q}}
+ = S) ->
+ Arg = {accept, Ref, self(), Sock, Id},
+ {ok, TPid} = diameter_sctp_sup:start_child(Arg),
+ MRef = erlang:monitor(process, TPid),
+ ets:insert(T, [{MRef, Id}, {Id, TPid}]),
+ ets:insert(Q, {TPid, now()}),
+ {TPid, S#listener{pending = {N+1, Q}}};
+
+%% Known association ...
+f([{_, TPid}], _, S) ->
+ {TPid, S};
+
+%% ... or not: discard.
+f([], _, _) ->
+ false.
+
+%% assoc_id/1
+
+assoc_id(#sctp_shutdown_event{assoc_id = Id}) -> %% undocumented
+ Id;
+assoc_id(#sctp_assoc_change{assoc_id = Id}) ->
+ Id;
+assoc_id(#sctp_sndrcvinfo{assoc_id = Id}) ->
+ Id;
+assoc_id(#sctp_paddr_change{assoc_id = Id}) ->
+ Id;
+assoc_id(#sctp_adaptation_event{assoc_id = Id}) ->
+ Id.
+
+%% connect/4
+
+connect(_, [], _, Reasons) ->
+ x({connect, lists:reverse(Reasons)});
+
+connect(Sock, [Addr | AT] = As, Port, Reasons) ->
+ case gen_sctp:connect_init(Sock, Addr, Port, []) of
+ ok ->
+ {As, Port, Reasons};
+ {error, _} = E ->
+ connect(Sock, AT, Port, [{Addr, E} | Reasons])
+ end.
+
+%% setopts/1
+
+setopts(Sock) ->
+ case inet:setopts(Sock, [{active, once}]) of
+ ok -> ok;
+ X -> x({setopts, Sock, X}) %% possibly on peer disconnect
+ end.