diff options
Diffstat (limited to 'lib/diameter/src/transport/diameter_sctp.erl')
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 81 |
1 files changed, 65 insertions, 16 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index cb024c77b1..209f8c01c1 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -37,6 +37,9 @@ code_change/3, terminate/2]). +-export([ports/0, + ports/1]). + -include_lib("kernel/include/inet_sctp.hrl"). -include_lib("diameter/include/diameter.hrl"). @@ -118,8 +121,8 @@ s({accept, Ref} = A, Addrs, Opts) -> %% gen_sctp in order to be able to accept a new association only %% *after* an accepting transport has been spawned. -s({connect = C, _}, Addrs, Opts) -> - diameter_sctp_sup:start_child({C, self(), Opts, Addrs}). +s({connect = C, Ref}, Addrs, Opts) -> + diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}). %% start_link/1 @@ -149,28 +152,36 @@ i({listen, Ref, {Opts, Addrs}}) -> socket = Sock}); %% A connecting transport. -i({connect, Pid, Opts, Addrs}) -> +i({connect, Pid, Opts, Addrs, Ref}) -> {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]), RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As], [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], {LAs, Sock} = open(Addrs, Rest, 0), + putr(ref, Ref), proc_lib:init_ack({ok, self(), LAs}), erlang:monitor(process, Pid), #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock}; +i({connect, _, _, _} = T) -> %% from old code + x(T); %% An accepting transport spawned by diameter. -i({accept, Pid, LPid, Sock}) -> +i({accept, Pid, LPid, Sock, Ref}) + when is_pid(Pid) -> + putr(ref, Ref), proc_lib:init_ack({ok, self()}), erlang:monitor(process, Pid), erlang:monitor(process, LPid), #transport{parent = Pid, mode = {accept, LPid}, socket = Sock}; +i({accept, _, _, _} = T) -> %% from old code + x(T); %% An accepting transport spawned at association establishment. i({accept, Ref, LPid, Sock, Id}) -> + putr(ref, Ref), proc_lib:init_ack({ok, self()}), MRef = erlang:monitor(process, LPid), %% Wait for a signal that the transport has been started before @@ -250,13 +261,33 @@ gen_opts(Opts) -> [binary, {active, once} | Opts]. %% --------------------------------------------------------------------------- +%% # ports/0-1 +%% --------------------------------------------------------------------------- + +ports() -> + Ts = diameter_reg:match({?MODULE, '_', '_'}), + [{type(T), N, Pid} || {{?MODULE, T, {_, {_, S}}}, Pid} <- Ts, + {ok, N} <- [inet:port(S)]]. + +ports(Ref) -> + Ts = diameter_reg:match({?MODULE, '_', {Ref, '_'}}), + [{type(T), N, Pid} || {{?MODULE, T, {R, {_, S}}}, Pid} <- Ts, + R == Ref, + {ok, N} <- [inet:port(S)]]. + +type(listener) -> + listen; +type(T) -> + T. + +%% --------------------------------------------------------------------------- %% # handle_call/3 %% --------------------------------------------------------------------------- handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref, count = N} = S) -> - {TPid, NewS} = accept(Pid, S), + {TPid, NewS} = accept(Ref, Pid, S), {reply, {ok, TPid}, NewS#listener{count = N+1}}; handle_call(_, _, State) -> @@ -306,6 +337,12 @@ terminate(_, #listener{socket = Sock}) -> %% --------------------------------------------------------------------------- +putr(Key, Val) -> + put({?MODULE, Key}, Val). + +getr(Key) -> + get({?MODULE, Key}). + %% start_timer/1 start_timer(#listener{count = 0} = S) -> @@ -425,21 +462,27 @@ transition({'DOWN', _, process, Pid, _}, #transport{mode = {accept, Pid}}) -> %% Parent process has died. transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> - stop. + stop; + +%% Request for the local port number. +transition({resolve_port, Pid}, #transport{socket = Sock}) + when is_pid(Pid) -> + Pid ! inet:port(Sock), + ok. %% Crash on anything unexpected. -%% accept/2 +%% accept/3 %% %% Start a new transport process or use one that's already been %% started as a consequence of association establishment. %% No pending associations: spawn a new transport. -accept(Pid, #listener{socket = Sock, - tmap = T, - pending = {0,_} = Q} - = S) -> - Arg = {accept, Pid, self(), Sock}, +accept(Ref, Pid, #listener{socket = Sock, + tmap = T, + pending = {0,_} = Q} + = S) -> + Arg = {accept, Pid, self(), Sock, Ref}, {ok, TPid} = diameter_sctp_sup:start_child(Arg), MRef = erlang:monitor(process, TPid), ets:insert(T, [{MRef, TPid}, {TPid, MRef}]), @@ -450,12 +493,12 @@ accept(Pid, #listener{socket = Sock, %% Accepting transport has died. This can happen if a new transport is %% started before the DOWN has arrived. -accept(Pid, #listener{pending = [TPid | {0,_} = Q]} = S) -> +accept(Ref, Pid, #listener{pending = [TPid | {0,_} = Q]} = S) -> false = is_process_alive(TPid), %% assert - accept(Pid, S#listener{pending = Q}); + accept(Ref, Pid, S#listener{pending = Q}); %% Pending associations: attach to the first in the queue. -accept(Pid, #listener{ref = Ref, pending = {N,Q}} = S) -> +accept(_, Pid, #listener{ref = Ref, pending = {N,Q}} = S) -> TPid = ets:first(Q), TPid ! {Ref, Pid}, ets:delete(Q, TPid), @@ -507,8 +550,14 @@ recv({[], #sctp_assoc_change{state = comm_up, outbound_streams = OS, inbound_streams = IS, assoc_id = Id}}, - #transport{assoc_id = undefined} + #transport{assoc_id = undefined, + mode = {T, _}, + socket = Sock} = S) -> + Ref = getr(ref), + is_reference(Ref) %% started in new code + andalso + (true = diameter_reg:add_new({?MODULE, T, {Ref, {Id, Sock}}})), up(S#transport{assoc_id = Id, streams = {IS, OS}}); |