aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl76
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl104
2 files changed, 108 insertions, 72 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 4a005b853d..f48e4347ee 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -98,7 +98,7 @@
-record(listener,
{ref :: reference(),
socket :: gen_sctp:sctp_socket(),
- count = 0 :: uint(), %% attached transport processes
+ service = false :: false | pid(), %% service process
pending = {0, queue:new()},
accept :: [match()]}).
%% Field pending implements two queues: the first of transport-to-be
@@ -129,11 +129,14 @@
-> {ok, pid(), [inet:ip_address()]}
when Ref :: diameter:transport_ref().
-start(T, #diameter_service{capabilities = Caps}, Opts)
+start(T, Svc, Opts)
when is_list(Opts) ->
+ #diameter_service{capabilities = Caps,
+ pid = SPid}
+ = Svc,
diameter_sctp_sup:start(), %% start supervisors on demand
Addrs = Caps#diameter_caps.host_ip_address,
- s(T, Addrs, lists:map(fun ip/1, Opts)).
+ s(T, Addrs, SPid, lists:map(fun ip/1, Opts)).
ip({ifaddr, A}) ->
{ip, A};
@@ -144,18 +147,22 @@ ip(T) ->
%% when there is not yet an association to assign it, or at comm_up on
%% a new association in which case the call retrieves a transport from
%% the pending queue.
-s({accept, Ref} = A, Addrs, Opts) ->
- {LPid, LAs} = listener(Ref, {Opts, Addrs}),
- try gen_server:call(LPid, {A, self()}, infinity) of
- {ok, TPid} -> {ok, TPid, LAs}
+s({accept, Ref} = A, Addrs, SPid, Opts) ->
+ {ok, LPid, LAs} = listener(Ref, {Opts, Addrs}),
+ try gen_server:call(LPid, {A, self(), SPid}, infinity) of
+ {ok, TPid} ->
+ {ok, TPid, LAs};
+ No ->
+ {error, No}
catch
- exit: Reason -> {error, Reason}
+ exit: Reason ->
+ {error, Reason}
end;
%% This implementation is due to there being no accept call in
%% gen_sctp in order to be able to accept a new association only
%% *after* an accepting transport has been spawned.
-s({connect = C, Ref}, Addrs, Opts) ->
+s({connect = C, Ref}, Addrs, _SPid, Opts) ->
diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}).
%% start_link/1
@@ -281,24 +288,23 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
%% Accepting processes can be started concurrently: ensure only one
%% listener is started.
-listener(LRef, T) ->
- diameter_sync:call({?MODULE, listener, LRef},
- {?MODULE, listener, [{LRef, T}]},
+listener(Ref, T) ->
+ diameter_sync:call({?MODULE, listener, Ref},
+ {?MODULE, listener, [{Ref, T}]},
infinity,
infinity).
-listener({LRef, T}) ->
- l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T).
+listener({Ref, T}) ->
+ l(diameter_reg:match({?MODULE, listener, {Ref, '_'}}), Ref, T).
%% Existing listening process ...
l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
- {LAs, _Sock} = AS,
- {LPid, LAs};
+ {LAs, _Sock} = AS,
+ {ok, LPid, LAs};
%% ... or not.
-l([], LRef, T) ->
- {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}),
- {LPid, LAs}.
+l([], Ref, T) ->
+ diameter_sctp_sup:start_child({listen, Ref, T}).
%% open/3
@@ -364,11 +370,17 @@ type(T) ->
%% # handle_call/3
%% ---------------------------------------------------------------------------
-handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref,
- count = K}
- = S) ->
+handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref} = S) ->
{TPid, NewS} = accept(Ref, Pid, S),
- {reply, {ok, TPid}, NewS#listener{count = K+1}};
+ {reply, {ok, TPid}, NewS};
+
+handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) ->
+ handle_call({T, Pid}, From, if not is_pid(P), is_pid(SPid) ->
+ monitor(process, SPid),
+ S#listener{service = SPid};
+ true ->
+ S
+ end);
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -441,6 +453,13 @@ l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
setopts(Sock),
NewS;
+%% Service process has died.
+l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid,
+ socket = Sock}) ->
+ gen_sctp:close(Sock),
+ x(T);
+
+%% Accepting process has died.
l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) ->
down(queue:member(TPid, Q), TPid, S);
@@ -454,20 +473,17 @@ l({transport, remove, _} = T, #listener{socket = Sock}) ->
%% Accepting transport has died.
%% One that's waiting for transport start in the pending queue ...
-down(true, TPid, #listener{pending = {N,Q},
- count = K}
- = S) ->
+down(true, TPid, #listener{pending = {N,Q}} = S) ->
NQ = queue:filter(fun(P) -> P /= TPid end, Q),
if N < 0 -> %% awaiting an association ...
- S#listener{count = K-1,
- pending = {N+1, NQ}};
+ S#listener{pending = {N+1, NQ}};
true -> %% ... or one has been assigned
S#listener{pending = {N-1, NQ}}
end;
%% ... or one that's already attached.
-down(false, _TPid, #listener{count = K} = S) ->
- S#listener{count = K-1}.
+down(false, _TPid, S) ->
+ S.
%% t/2
%%
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 546c2cfa5e..44abc5c3b4 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -71,11 +71,8 @@
%% a process owning the listening port.
%% Listener process state.
--record(listener, {socket :: inet:socket(),
- count = 1 :: non_neg_integer()}). %% accepting processes
-%% The count of accepting processes was previously used to terminate
-%% the listening process, but diameter_reg:subscribe/2 is now used for
-%% this. Leave the the count for trace purposes.
+-record(listener, {socket :: inet:socket(),
+ service = false :: false | pid()}). %% service process
%% Monitor process state.
-record(monitor,
@@ -138,11 +135,15 @@
| {ok, pid()}
when Ref :: diameter:transport_ref().
-start({T, Ref}, #diameter_service{capabilities = Caps}, Opts) ->
+start({T, Ref}, Svc, Opts) ->
+ #diameter_service{capabilities = Caps,
+ pid = SPid}
+ = Svc,
+
diameter_tcp_sup:start(), %% start tcp supervisors on demand
{Mod, Rest} = split(Opts),
Addrs = Caps#diameter_caps.host_ip_address,
- Arg = {T, Ref, Mod, self(), Rest, Addrs},
+ Arg = {T, Ref, Mod, self(), Rest, Addrs, SPid},
diameter_tcp_sup:start_child(Arg).
split([{module, M} | Opts]) ->
@@ -196,7 +197,7 @@ init(T) ->
%% i/1
%% A transport process.
-i({T, Ref, Mod, Pid, Opts, Addrs})
+i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
when T == accept;
T == connect ->
monitor(process, Pid),
@@ -214,7 +215,7 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
?DEFAULT_FRAGMENT_TIMEOUT),
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
Throttle = proplists:get_value(throttle_cb, OwnOpts, false),
- Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs),
+ Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid),
MPid ! {stop, self()}, %% tell the monitor to die
M = if SslOpts -> ssl; true -> Mod end,
putr(?REF_KEY, Ref),
@@ -228,6 +229,11 @@ i({T, Ref, Mod, Pid, Opts, Addrs})
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
+i({T, _Ref, _Mod, _Pid, _Opts, _Addrs} = Arg) %% from old code
+ when T == accept;
+ T == connect ->
+ i(erlang:append_element(Arg, _SPid = false));
+
%% A monitor process to kill the transport if the parent dies.
i(#monitor{parent = Pid, transport = TPid} = S) ->
proc_lib:init_ack({ok, self()}),
@@ -240,16 +246,18 @@ i(#monitor{parent = Pid, transport = TPid} = S) ->
%% death. However, a link can be unlinked and this is exactly what
%% gen_tcp seems to so. Links should be left to supervisors.
-i({listen, LRef, APid, {Mod, Opts, Addrs}}) ->
- [_] = diameter_config:subscribe(LRef, transport), %% assert existence
+i({listen = L, Ref, _APid, T}) -> %% from old code
+ i({L, Ref, T});
+
+i({listen, Ref, {Mod, Opts, Addrs}}) ->
+ [_] = diameter_config:subscribe(Ref, transport), %% assert existence
{[LA, LP], Rest} = proplists:split(Opts, [ip, port]),
LAddrOpt = get_addr(LA, Addrs),
LPort = get_port(LP),
{ok, LSock} = Mod:listen(LPort, gen_opts(LAddrOpt, Rest)),
LAddr = laddr(LAddrOpt, Mod, LSock),
- true = diameter_reg:add_new({?MODULE, listener, {LRef, {LAddr, LSock}}}),
+ true = diameter_reg:add_new({?MODULE, listener, {Ref, {LAddr, LSock}}}),
proc_lib:init_ack({ok, self(), {LAddr, LSock}}),
- monitor(process, APid),
#listener{socket = LSock}.
laddr([], Mod, Sock) ->
@@ -268,21 +276,22 @@ ssl_opts([{ssl_options, Opts}])
ssl_opts(T) ->
?ERROR({ssl_options, T}).
-%% init/7
+%% init/8
%% Establish a TLS connection before capabilities exchange ...
-init(Type, Ref, Mod, Pid, true, Opts, Addrs) ->
- init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs);
+init(Type, Ref, Mod, Pid, true, Opts, Addrs, SPid) ->
+ init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SPid);
%% ... or not.
-init(Type, Ref, Mod, Pid, _, Opts, Addrs) ->
- init(Type, Ref, Mod, Pid, Opts, Addrs).
+init(Type, Ref, Mod, Pid, _, Opts, Addrs, SPid) ->
+ init(Type, Ref, Mod, Pid, Opts, Addrs, SPid).
-%% init/6
+%% init/7
-init(accept = T, Ref, Mod, Pid, Opts, Addrs) ->
+init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) ->
{[Matches], Rest} = proplists:split(Opts, [accept]),
- {LAddr, LSock} = listener(Ref, {Mod, Rest, Addrs}),
+ {ok, LPid, {LAddr, LSock}} = listener(Ref, {Mod, Rest, Addrs}),
+ ok = gen_server:call(LPid, {accept, SPid}, infinity),
proc_lib:init_ack({ok, self(), [LAddr]}),
Sock = ok(accept(Mod, LSock)),
ok = accept_peer(Mod, Sock, accept(Matches)),
@@ -290,7 +299,7 @@ init(accept = T, Ref, Mod, Pid, Opts, Addrs) ->
diameter_peer:up(Pid),
Sock;
-init(connect = T, Ref, Mod, Pid, Opts, Addrs) ->
+init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SPid) ->
{[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]),
LAddrOpt = get_addr(LA, Addrs),
RAddr = get_addr(RA),
@@ -344,24 +353,26 @@ accept(Opts) ->
%% Accepting processes can be started concurrently: ensure only one
%% listener is started.
-listener(LRef, T) ->
- diameter_sync:call({?MODULE, listener, LRef},
- {?MODULE, listener, [{LRef, T, self()}]},
+listener(Ref, T) ->
+ diameter_sync:call({?MODULE, listener, Ref},
+ {?MODULE, listener, [{Ref, T, self()}]},
infinity,
infinity).
-listener({LRef, T, TPid}) ->
- l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T, TPid).
+%% listener/1
+
+listener({Ref, T, _TPid}) ->
+ l(diameter_reg:match({?MODULE, listener, {Ref, '_'}}), Ref, T).
+
+%% l/3
%% Existing listening process ...
-l([{{?MODULE, listener, {_, AS}}, LPid}], _, _, TPid) ->
- LPid ! {accept, TPid},
- AS;
+l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) ->
+ {ok, LPid, AS};
%% ... or not.
-l([], LRef, T, TPid) ->
- {ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, TPid, T}),
- AS.
+l([], Ref, T) ->
+ diameter_tcp_sup:start_child({listen, Ref, T}).
%% get_addr/1
@@ -440,6 +451,14 @@ portnr(Sock) ->
%% # handle_call/3
%% ---------------------------------------------------------------------------
+handle_call({accept, SPid}, _From, #listener{service = P} = S) ->
+ {reply, ok, if not is_pid(P), is_pid(SPid) ->
+ monitor(process, SPid),
+ S#listener{service = SPid};
+ true ->
+ S
+ end};
+
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -507,19 +526,20 @@ m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
%%
%% Transition listener state.
-%% An accepting transport is attaching.
-l({accept, TPid}, #listener{count = N} = S) ->
- monitor(process, TPid),
- S#listener{count = N+1};
-
-%% Accepting process has died.
-l({'DOWN', _, process, _, _}, #listener{count = N} = S) ->
- S#listener{count = N-1};
+%% Service process has died.
+l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid,
+ socket = Sock}) ->
+ gen_tcp:close(Sock),
+ x(T);
%% Transport has been removed.
l({transport, remove, _} = T, #listener{socket = Sock}) ->
gen_tcp:close(Sock),
- x(T).
+ x(T);
+
+%% Possibly death of an accepting process monitored in old code.
+l(_, S) ->
+ S.
%% t/2
%%