diff options
author | Anders Svensson <[email protected]> | 2011-05-18 18:58:01 +0200 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2011-05-18 18:58:01 +0200 |
commit | 1756ed583f24ba2206a0f573635a6fa3cdea5c54 (patch) | |
tree | c762a6e700f09fb580f8c9945fd6886ecc2c9923 /lib/diameter/src/transport/diameter_sctp.erl | |
parent | e993da4426a76bb172290a10999267d3023120d5 (diff) | |
parent | 3c15ff32e89e401b4dde2b8acc9699be2614b996 (diff) | |
download | otp-1756ed583f24ba2206a0f573635a6fa3cdea5c54.tar.gz otp-1756ed583f24ba2206a0f573635a6fa3cdea5c54.tar.bz2 otp-1756ed583f24ba2206a0f573635a6fa3cdea5c54.zip |
Merge branch 'anders/diameter_import/OTP-9321' into dev
* anders/diameter_import/OTP-9321:
Initial commit of the diameter application.
Diffstat (limited to 'lib/diameter/src/transport/diameter_sctp.erl')
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 624 |
1 files changed, 624 insertions, 0 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl new file mode 100644 index 0000000000..92aa8488a0 --- /dev/null +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -0,0 +1,624 @@ +%% +%% %CopyrightBegin% +%% +%% Copyright Ericsson AB 2010-2011. All Rights Reserved. +%% +%% The contents of this file are subject to the Erlang Public License, +%% Version 1.1, (the "License"); you may not use this file except in +%% compliance with the License. You should have received a copy of the +%% Erlang Public License along with this software. If not, it can be +%% retrieved online at http://www.erlang.org/. +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and limitations +%% under the License. +%% +%% %CopyrightEnd% +%% + +-module(diameter_sctp). + +-behaviour(gen_server). + +%% interface +-export([start/3]). + +%% child start from supervisor +-export([start_link/1]). + +%% child start from here +-export([init/1]). + +%% gen_server callbacks +-export([handle_call/3, + handle_cast/2, + handle_info/2, + code_change/3, + terminate/2]). + +-include_lib("kernel/include/inet_sctp.hrl"). +-include_lib("diameter/include/diameter.hrl"). + +-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). + +%% The default port for a listener. +-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 + +%% How long a listener with no associations lives before offing +%% itself. +-define(LISTENER_TIMEOUT, 30000). + +%% How long to wait for a transport process to attach after +%% association establishment. +-define(ACCEPT_TIMEOUT, 5000). + +-type uint() :: non_neg_integer(). + +%% Accepting/connecting transport process state. +-record(transport, + {parent :: pid(), + mode :: {accept, pid()} + | {connect, {list(inet:ip_address()), uint(), list()}} + %% {RAs, RP, Errors} + | connect, + socket :: gen_sctp:sctp_socket(), + assoc_id :: gen_sctp:assoc_id(), %% association identifier + peer :: {[inet:ip_address()], uint()}, %% {RAs, RP} + streams :: {uint(), uint()}, %% {InStream, OutStream} counts + os = 0 :: uint()}). %% next output stream + +%% Listener process state. +-record(listener, + {ref :: reference(), + socket :: gen_sctp:sctp_socket(), + count = 0 :: uint(), + tmap = ets:new(?MODULE, []) :: ets:tid(), + %% {MRef, Pid|AssocId}, {AssocId, Pid} + pending = {0, ets:new(?MODULE, [ordered_set])}, + tref :: reference()}). +%% Field tmap is used to map an incoming message or event to the +%% relevent transport process. Field pending implements a queue of +%% transport processes to which an association has been assigned (at +%% comm_up and written into tmap) but for which diameter hasn't yet +%% spawned a transport process: a short-lived state of affairs as a +%% new transport is spawned as a consequence of a peer being taken up, +%% transport processes being spawned by the listener on demand. In +%% case diameter starts a transport before comm_up on a new +%% association, pending is set to an improper list with the spawned +%% transport as head and the queue as tail. + +%% --------------------------------------------------------------------------- +%% # start/3 +%% --------------------------------------------------------------------------- + +start(T, #diameter_service{capabilities = Caps}, Opts) + when is_list(Opts) -> + diameter_sctp_sup:start(), %% start supervisors on demand + Addrs = Caps#diameter_caps.host_ip_address, + s(T, Addrs, lists:map(fun ip/1, Opts)). + +ip({ifaddr, A}) -> + {ip, A}; +ip(T) -> + T. + +%% A listener spawns transports either as a consequence of this call +%% when there is not yet an association to associate with it, or at +%% comm_up on a new association in which case the call retrieves a +%% transport from the pending queue. +s({accept, Ref} = A, Addrs, Opts) -> + {LPid, LAs} = listener(Ref, {Opts, Addrs}), + try gen_server:call(LPid, {A, self()}, infinity) of + {ok, TPid} -> {ok, TPid, LAs} + catch + exit: Reason -> {error, Reason} + end; +%% This implementation is due to there being no accept call in +%% gen_sctp in order to be able to accept a new association only +%% *after* an accepting transport has been spawned. + +s({connect = C, _}, Addrs, Opts) -> + diameter_sctp_sup:start_child({C, self(), Opts, Addrs}). + +%% start_link/1 + +start_link(T) -> + proc_lib:start_link(?MODULE, + init, + [T], + infinity, + diameter_lib:spawn_opts(server, [])). + +%% --------------------------------------------------------------------------- +%% # init/1 +%% --------------------------------------------------------------------------- + +init(T) -> + gen_server:enter_loop(?MODULE, [], i(T)). + +%% i/1 + +%% A process owning a listening socket. +i({listen, Ref, {Opts, Addrs}}) -> + {LAs, Sock} = AS = open(Addrs, Opts, ?DEFAULT_PORT), + proc_lib:init_ack({ok, self(), LAs}), + ok = gen_sctp:listen(Sock, true), + true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), + start_timer(#listener{ref = Ref, + socket = Sock}); + +%% A connecting transport. +i({connect, Pid, Opts, Addrs}) -> + {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]), + RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As], + [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], + {LAs, Sock} = open(Addrs, Rest, 0), + proc_lib:init_ack({ok, self(), LAs}), + erlang:monitor(process, Pid), + #transport{parent = Pid, + mode = {connect, connect(Sock, RAs, RP, [])}, + socket = Sock}; + +%% An accepting transport spawned by diameter. +i({accept, Pid, LPid, Sock}) -> + proc_lib:init_ack({ok, self()}), + erlang:monitor(process, Pid), + erlang:monitor(process, LPid), + #transport{parent = Pid, + mode = {accept, LPid}, + socket = Sock}; + +%% An accepting transport spawned at association establishment. +i({accept, Ref, LPid, Sock, Id}) -> + proc_lib:init_ack({ok, self()}), + MRef = erlang:monitor(process, LPid), + %% Wait for a signal that the transport has been started before + %% processing other messages. + receive + {Ref, Pid} -> %% transport started + #transport{parent = Pid, + mode = {accept, LPid}, + socket = Sock}; + {'DOWN', MRef, process, _, _} = T -> %% listener down + close(Sock, Id), + x(T) + after ?ACCEPT_TIMEOUT -> + close(Sock, Id), + x(timeout) + end. + +%% close/2 + +close(Sock, Id) -> + gen_sctp:eof(Sock, #sctp_assoc_change{assoc_id = Id}). +%% Having to pass a record here is hokey. + +%% listener/2 + +listener(LRef, T) -> + l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T). + +%% Existing process with the listening socket ... +l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> + {LAs, _Sock} = AS, + {LPid, LAs}; + +%% ... or not: start one. +l([], LRef, T) -> + {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}), + {LPid, LAs}. + +%% open/3 + +open(Addrs, Opts, PortNr) -> + {LAs, Os} = addrs(Addrs, Opts), + {LAs, case gen_sctp:open(gen_opts(portnr(Os, PortNr))) of + {ok, Sock} -> + Sock; + {error, Reason} -> + x({open, Reason}) + end}. + +addrs(Addrs, Opts) -> + case proplists:split(Opts, [ip]) of + {[[]], _} -> + {Addrs, Opts ++ [{ip, A} || A <- Addrs]}; + {[As], Os} -> + LAs = [diameter_lib:ipaddr(A) || {ip, A} <- As], + {LAs, Os ++ [{ip, A} || A <- LAs]} + end. + +portnr(Opts, PortNr) -> + case proplists:get_value(port, Opts) of + undefined -> + [{port, PortNr} | Opts]; + _ -> + Opts + end. + +%% x/1 + +x(Reason) -> + exit({shutdown, Reason}). + +%% gen_opts/1 + +gen_opts(Opts) -> + {L,_} = proplists:split(Opts, [binary, list, mode, active, sctp_events]), + [[],[],[],[],[]] == L orelse ?ERROR({reserved_options, Opts}), + [binary, {active, once} | Opts]. + +%% --------------------------------------------------------------------------- +%% # handle_call/3 +%% --------------------------------------------------------------------------- + +handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref, + count = N} + = S) -> + {TPid, NewS} = accept(Pid, S), + {reply, {ok, TPid}, NewS#listener{count = N+1}}; + +handle_call(_, _, State) -> + {reply, nok, State}. + +%% --------------------------------------------------------------------------- +%% # handle_cast/2 +%% --------------------------------------------------------------------------- + +handle_cast(_, State) -> + {noreply, State}. + +%% --------------------------------------------------------------------------- +%% # handle_info/2 +%% --------------------------------------------------------------------------- + +handle_info(T, #transport{} = S) -> + {noreply, #transport{} = t(T,S)}; + +handle_info(T, #listener{} = S) -> + {noreply, #listener{} = l(T,S)}. + +%% --------------------------------------------------------------------------- +%% # code_change/3 +%% --------------------------------------------------------------------------- + +code_change(_, State, _) -> + {ok, State}. + +%% --------------------------------------------------------------------------- +%% # terminate/2 +%% --------------------------------------------------------------------------- + +terminate(_, #transport{assoc_id = undefined}) -> + ok; + +terminate(_, #transport{socket = Sock, + mode = {accept, _}, + assoc_id = Id}) -> + close(Sock, Id); + +terminate(_, #transport{socket = Sock}) -> + gen_sctp:close(Sock); + +terminate(_, #listener{socket = Sock}) -> + gen_sctp:close(Sock). + +%% --------------------------------------------------------------------------- + +%% start_timer/1 + +start_timer(#listener{count = 0} = S) -> + S#listener{tref = erlang:start_timer(?LISTENER_TIMEOUT, self(), close)}; +start_timer(S) -> + S. + +%% l/2 +%% +%% Transition listener state. + +%% Incoming message from SCTP. +l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) -> + setopts(Sock), + case find(Data, S) of + {TPid, NewS} -> + TPid ! Msg, + NewS; + false -> + S + end; + +%% Transport is asking message to be sent. See send/3 for why the send +%% isn't directly from the transport. +l({send, AssocId, StreamId, Bin}, #listener{socket = Sock} = S) -> + send(Sock, AssocId, StreamId, Bin), + S; + +%% Accepting transport has died. One that's awaiting an association ... +l({'DOWN', MRef, process, TPid, _}, #listener{pending = [TPid | Q], + tmap = T, + count = N} + = S) -> + ets:delete(T, MRef), + ets:delete(T, TPid), + start_timer(S#listener{count = N-1, + pending = Q}); + +%% ... ditto and a new transport has already been started ... +l({'DOWN', _, process, _, _} = T, #listener{pending = [TPid | Q]} + = S) -> + #listener{pending = NQ} + = NewS + = l(T, S#listener{pending = Q}), + NewS#listener{pending = [TPid | NQ]}; + +%% ... or not. +l({'DOWN', MRef, process, TPid, _}, #listener{socket = Sock, + tmap = T, + count = N, + pending = {P,Q}} + = S) -> + [{MRef, Id}] = ets:lookup(T, MRef), %% Id = TPid | AssocId + ets:delete(T, MRef), + ets:delete(T, Id), + Id == TPid orelse close(Sock, Id), + case ets:lookup(Q, TPid) of + [{TPid, _}] -> %% transport in the pending queue ... + ets:delete(Q, TPid), + S#listener{pending = {P-1, Q}}; + [] -> %% ... or not + start_timer(S#listener{count = N-1}) + end; + +%% Timeout after the last accepting process has died. +l({timeout, TRef, close = T}, #listener{tref = TRef, + count = 0}) -> + x(T); +l({timeout, _, close}, #listener{} = S) -> + S. + +%% t/2 +%% +%% Transition transport state. + +t(T,S) -> + case transition(T,S) of + ok -> + S; + #transport{} = NS -> + NS; + stop -> + x(T) + end. + +%% transition/2 + +%% Incoming message. +transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock, + mode = {accept, _}} + = S) -> + recv(Data, S); + +transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> + setopts(Sock), + recv(Data, S); + +%% Outgoing message. +transition({diameter, {send, Msg}}, S) -> + send(Msg, S); + +%% Request to close the transport connection. +transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> + stop; + +%% Listener process has died. +transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) -> + stop; + +%% Parent process has died. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> + stop. + +%% Crash on anything unexpected. + +%% accept/2 +%% +%% Start a new transport process or use one that's already been +%% started as a consequence of association establishment. + +%% No pending associations: spawn a new transport. +accept(Pid, #listener{socket = Sock, + tmap = T, + pending = {0,_} = Q} + = S) -> + Arg = {accept, Pid, self(), Sock}, + {ok, TPid} = diameter_sctp_sup:start_child(Arg), + MRef = erlang:monitor(process, TPid), + ets:insert(T, [{MRef, TPid}, {TPid, MRef}]), + {TPid, S#listener{pending = [TPid | Q]}}; +%% Placing the transport in the pending field makes it available to +%% the next association. The stack starts a new accepting transport +%% only after this one brings the connection up (or dies). + +%% Accepting transport has died. This can happen if a new transport is +%% started before the DOWN has arrived. +accept(Pid, #listener{pending = [TPid | {0,_} = Q]} = S) -> + false = is_process_alive(TPid), %% assert + accept(Pid, S#listener{pending = Q}); + +%% Pending associations: attach to the first in the queue. +accept(Pid, #listener{ref = Ref, pending = {N,Q}} = S) -> + TPid = ets:first(Q), + TPid ! {Ref, Pid}, + ets:delete(Q, TPid), + {TPid, S#listener{pending = {N-1, Q}}}. + +%% send/2 + +%% Outbound Diameter message on a specified stream ... +send(#diameter_packet{bin = Bin, transport_data = {stream, SId}}, S) -> + send(SId, Bin, S), + S; + +%% ... or not: rotate through all steams. +send(Bin, #transport{streams = {_, OS}, + os = N} + = S) + when is_binary(Bin) -> + send(N, Bin, S), + S#transport{os = (N + 1) rem OS}. + +%% send/3 + +%% Messages have to be sent from the controlling process, which is +%% probably a bug. Sending from here causes an inet_reply, Sock, +%% Status} message to be sent to the controlling process while +%% gen_sctp:send/4 here hangs. +send(StreamId, Bin, #transport{assoc_id = AId, + mode = {accept, LPid}}) -> + LPid ! {send, AId, StreamId, Bin}; + +send(StreamId, Bin, #transport{socket = Sock, + assoc_id = AId}) -> + send(Sock, AId, StreamId, Bin). + +%% send/4 + +send(Sock, AssocId, Stream, Bin) -> + case gen_sctp:send(Sock, AssocId, Stream, Bin) of + ok -> + ok; + {error, Reason} -> + x({send, Reason}) + end. + +%% recv/2 + +%% Association established ... +recv({[], #sctp_assoc_change{state = comm_up, + outbound_streams = OS, + inbound_streams = IS, + assoc_id = Id}}, + #transport{assoc_id = undefined} + = S) -> + up(S#transport{assoc_id = Id, + streams = {IS, OS}}); + +%% ... or not: try the next address. +recv({[], #sctp_assoc_change{} = E}, + #transport{assoc_id = undefined, + socket = Sock, + mode = {connect = C, {[RA|RAs], RP, Es}}} + = S) -> + S#transport{mode = {C, connect(Sock, RAs, RP, [{RA,E} | Es])}}; + +%% Lost association after establishment. +recv({[], #sctp_assoc_change{}}, _) -> + stop; + +%% Inbound Diameter message. +recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid}) + when is_binary(Bin) -> + diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id}, + bin = Bin}), + ok; + +recv({[], #sctp_shutdown_event{assoc_id = Id}}, + #transport{assoc_id = Id}) -> + stop. + +%% up/1 + +up(#transport{parent = Pid, + mode = {connect = C, {[RA | _], RP, _}}} + = S) -> + diameter_peer:up(Pid, {RA,RP}), + S#transport{mode = C}; + +up(#transport{parent = Pid, + mode = {accept, _}} + = S) -> + diameter_peer:up(Pid), + S. + +%% find/2 + +find({[#sctp_sndrcvinfo{assoc_id = Id}], _} + = Data, + #listener{tmap = T} + = S) -> + f(ets:lookup(T, Id), Data, S); + +find({_, Rec} = Data, #listener{tmap = T} = S) -> + f(ets:lookup(T, assoc_id(Rec)), Data, S). + +%% New association and a transport waiting for one: use it. +f([], + {_, #sctp_assoc_change{state = comm_up, + assoc_id = Id}}, + #listener{tmap = T, + pending = [TPid | {_,_} = Q]} + = S) -> + [{TPid, MRef}] = ets:lookup(T, TPid), + ets:insert(T, [{MRef, Id}, {Id, TPid}]), + ets:delete(T, TPid), + {TPid, S#listener{pending = Q}}; + +%% New association and no transport start yet: spawn one and place it +%% in the queue. +f([], + {_, #sctp_assoc_change{state = comm_up, + assoc_id = Id}}, + #listener{ref = Ref, + socket = Sock, + tmap = T, + pending = {N,Q}} + = S) -> + Arg = {accept, Ref, self(), Sock, Id}, + {ok, TPid} = diameter_sctp_sup:start_child(Arg), + MRef = erlang:monitor(process, TPid), + ets:insert(T, [{MRef, Id}, {Id, TPid}]), + ets:insert(Q, {TPid, now()}), + {TPid, S#listener{pending = {N+1, Q}}}; + +%% Known association ... +f([{_, TPid}], _, S) -> + {TPid, S}; + +%% ... or not: discard. +f([], _, _) -> + false. + +%% assoc_id/1 + +assoc_id(#sctp_shutdown_event{assoc_id = Id}) -> %% undocumented + Id; +assoc_id(#sctp_assoc_change{assoc_id = Id}) -> + Id; +assoc_id(#sctp_sndrcvinfo{assoc_id = Id}) -> + Id; +assoc_id(#sctp_paddr_change{assoc_id = Id}) -> + Id; +assoc_id(#sctp_adaptation_event{assoc_id = Id}) -> + Id. + +%% connect/4 + +connect(_, [], _, Reasons) -> + x({connect, lists:reverse(Reasons)}); + +connect(Sock, [Addr | AT] = As, Port, Reasons) -> + case gen_sctp:connect_init(Sock, Addr, Port, []) of + ok -> + {As, Port, Reasons}; + {error, _} = E -> + connect(Sock, AT, Port, [{Addr, E} | Reasons]) + end. + +%% setopts/1 + +setopts(Sock) -> + case inet:setopts(Sock, [{active, once}]) of + ok -> ok; + X -> x({setopts, Sock, X}) %% possibly on peer disconnect + end. |