diff options
Diffstat (limited to 'lib/diameter/src/transport/diameter_sctp.erl')
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 117 |
1 files changed, 61 insertions, 56 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 678dc9b5d6..f48e4347ee 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2015. All Rights Reserved. +%% Copyright Ericsson AB 2010-2016. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -61,10 +61,6 @@ %% Remote addresses to accept connections from. -define(DEFAULT_ACCEPT, []). %% any -%% How long a listener with no associations lives before offing -%% itself. --define(LISTENER_TIMEOUT, 30000). - %% How long to wait for a transport process to attach after %% association establishment. -define(ACCEPT_TIMEOUT, 5000). @@ -84,25 +80,26 @@ %% Accepting/connecting transport process state. -record(transport, - {parent :: pid(), + {parent :: pid() | undefined, mode :: {accept, pid()} | accept | {connect, {[inet:ip_address()], uint(), list()}} %% {RAs, RP, Errors} | connect, - socket :: gen_sctp:sctp_socket(), + socket :: gen_sctp:sctp_socket() | undefined, assoc_id :: gen_sctp:assoc_id(), %% association identifier - peer :: {[inet:ip_address()], uint()}, %% {RAs, RP} - streams :: {uint(), uint()}, %% {InStream, OutStream} counts + peer :: {[inet:ip_address()], uint()} %% {RAs, RP} + | undefined, + streams :: {uint(), uint()} %% {InStream, OutStream} counts + | undefined, os = 0 :: uint()}). %% next output stream %% Listener process state. -record(listener, {ref :: reference(), socket :: gen_sctp:sctp_socket(), - count = 0 :: uint(), %% attached transport processes + service = false :: false | pid(), %% service process pending = {0, queue:new()}, - tref :: reference(), accept :: [match()]}). %% Field pending implements two queues: the first of transport-to-be %% processes to which an association has been assigned but for which @@ -132,11 +129,14 @@ -> {ok, pid(), [inet:ip_address()]} when Ref :: diameter:transport_ref(). -start(T, #diameter_service{capabilities = Caps}, Opts) +start(T, Svc, Opts) when is_list(Opts) -> + #diameter_service{capabilities = Caps, + pid = SPid} + = Svc, diameter_sctp_sup:start(), %% start supervisors on demand Addrs = Caps#diameter_caps.host_ip_address, - s(T, Addrs, lists:map(fun ip/1, Opts)). + s(T, Addrs, SPid, lists:map(fun ip/1, Opts)). ip({ifaddr, A}) -> {ip, A}; @@ -147,18 +147,22 @@ ip(T) -> %% when there is not yet an association to assign it, or at comm_up on %% a new association in which case the call retrieves a transport from %% the pending queue. -s({accept, Ref} = A, Addrs, Opts) -> - {LPid, LAs} = listener(Ref, {Opts, Addrs}), - try gen_server:call(LPid, {A, self()}, infinity) of - {ok, TPid} -> {ok, TPid, LAs} +s({accept, Ref} = A, Addrs, SPid, Opts) -> + {ok, LPid, LAs} = listener(Ref, {Opts, Addrs}), + try gen_server:call(LPid, {A, self(), SPid}, infinity) of + {ok, TPid} -> + {ok, TPid, LAs}; + No -> + {error, No} catch - exit: Reason -> {error, Reason} + exit: Reason -> + {error, Reason} end; %% This implementation is due to there being no accept call in %% gen_sctp in order to be able to accept a new association only %% *after* an accepting transport has been spawned. -s({connect = C, Ref}, Addrs, Opts) -> +s({connect = C, Ref}, Addrs, _SPid, Opts) -> diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}). %% start_link/1 @@ -214,14 +218,15 @@ init(T) -> %% A process owning a listening socket. i({listen, Ref, {Opts, Addrs}}) -> + [_] = diameter_config:subscribe(Ref, transport), %% assert existence {[Matches], Rest} = proplists:split(Opts, [accept]), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), proc_lib:init_ack({ok, self(), LAs}), - start_timer(#listener{ref = Ref, - socket = Sock, - accept = [[M] || {accept, M} <- Matches]}); + #listener{ref = Ref, + socket = Sock, + accept = [[M] || {accept, M} <- Matches]}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> @@ -283,24 +288,23 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> %% Accepting processes can be started concurrently: ensure only one %% listener is started. -listener(LRef, T) -> - diameter_sync:call({?MODULE, listener, LRef}, - {?MODULE, listener, [{LRef, T}]}, +listener(Ref, T) -> + diameter_sync:call({?MODULE, listener, Ref}, + {?MODULE, listener, [{Ref, T}]}, infinity, infinity). -listener({LRef, T}) -> - l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T). +listener({Ref, T}) -> + l(diameter_reg:match({?MODULE, listener, {Ref, '_'}}), Ref, T). %% Existing listening process ... l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> - {LAs, _Sock} = AS, - {LPid, LAs}; + {LAs, _Sock} = AS, + {ok, LPid, LAs}; %% ... or not. -l([], LRef, T) -> - {ok, LPid, LAs} = diameter_sctp_sup:start_child({listen, LRef, T}), - {LPid, LAs}. +l([], Ref, T) -> + diameter_sctp_sup:start_child({listen, Ref, T}). %% open/3 @@ -366,11 +370,17 @@ type(T) -> %% # handle_call/3 %% --------------------------------------------------------------------------- -handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref, - count = K} - = S) -> +handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref} = S) -> {TPid, NewS} = accept(Ref, Pid, S), - {reply, {ok, TPid}, NewS#listener{count = K+1}}; + {reply, {ok, TPid}, NewS}; + +handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) -> + handle_call({T, Pid}, From, if not is_pid(P), is_pid(SPid) -> + monitor(process, SPid), + S#listener{service = SPid}; + true -> + S + end); handle_call(_, _, State) -> {reply, nok, State}. @@ -429,13 +439,6 @@ putr(Key, Val) -> getr(Key) -> get({?MODULE, Key}). -%% start_timer/1 - -start_timer(#listener{count = 0} = S) -> - S#listener{tref = erlang:start_timer(?LISTENER_TIMEOUT, self(), close)}; -start_timer(S) -> - S. - %% l/2 %% %% Transition listener state. @@ -450,35 +453,37 @@ l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, setopts(Sock), NewS; +%% Service process has died. +l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid, + socket = Sock}) -> + gen_sctp:close(Sock), + x(T); + +%% Accepting process has died. l({'DOWN', _MRef, process, TPid, _}, #listener{pending = {_,Q}} = S) -> down(queue:member(TPid, Q), TPid, S); -%% Timeout after the last accepting process has died. -l({timeout, TRef, close = T}, #listener{tref = TRef, - count = 0}) -> - x(T); -l({timeout, _, close}, #listener{} = S) -> - S. +%% Transport has been removed. +l({transport, remove, _} = T, #listener{socket = Sock}) -> + gen_sctp:close(Sock), + x(T). %% down/3 %% %% Accepting transport has died. %% One that's waiting for transport start in the pending queue ... -down(true, TPid, #listener{pending = {N,Q}, - count = K} - = S) -> +down(true, TPid, #listener{pending = {N,Q}} = S) -> NQ = queue:filter(fun(P) -> P /= TPid end, Q), if N < 0 -> %% awaiting an association ... - start_timer(S#listener{count = K-1, - pending = {N+1, NQ}}); + S#listener{pending = {N+1, NQ}}; true -> %% ... or one has been assigned S#listener{pending = {N-1, NQ}} end; %% ... or one that's already attached. -down(false, _TPid, #listener{count = K} = S) -> - start_timer(S#listener{count = K-1}). +down(false, _TPid, S) -> + S. %% t/2 %% |