diff options
Diffstat (limited to 'lib/diameter/src/transport')
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 74 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 64 |
2 files changed, 117 insertions, 21 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 8b8c2a6694..49a530b4eb 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -42,6 +42,9 @@ -export([ports/0, ports/1]). +-export_type([listen_option/0, + connect_option/0]). + -include_lib("kernel/include/inet_sctp.hrl"). -include_lib("diameter/include/diameter.hrl"). @@ -54,6 +57,9 @@ %% The default port for a listener. -define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 +%% Remote addresses to accept connections from. +-define(DEFAULT_ACCEPT, []). %% any + %% How long a listener with no associations lives before offing %% itself. -define(LISTENER_TIMEOUT, 30000). @@ -62,6 +68,17 @@ %% association establishment. -define(ACCEPT_TIMEOUT, 5000). +-type connect_option() :: {raddr, inet:ip_address()} + | {rport, inet:port_number()} + | gen_sctp:open_option(). + +-type match() :: inet:ip_address() + | string() + | [match()]. + +-type listen_option() :: {accept, match()} + | gen_sctp:open_option(). + -type uint() :: non_neg_integer(). %% Accepting/connecting transport process state. @@ -69,7 +86,7 @@ {parent :: pid(), mode :: {accept, pid()} | accept - | {connect, {list(inet:ip_address()), uint(), list()}} + | {connect, {[inet:ip_address()], uint(), list()}} %% {RAs, RP, Errors} | connect, socket :: gen_sctp:sctp_socket(), @@ -86,7 +103,8 @@ tmap = ets:new(?MODULE, []) :: ets:tid(), %% {MRef, Pid|AssocId}, {AssocId, Pid} pending = {0, ets:new(?MODULE, [ordered_set])}, - tref :: reference()}). + tref :: reference(), + accept :: [match()]}). %% Field tmap is used to map an incoming message or event to the %% relevent transport process. Field pending implements a queue of %% transport processes to which an association has been assigned (at @@ -102,6 +120,13 @@ %% # start/3 %% --------------------------------------------------------------------------- +-spec start({accept, Ref}, #diameter_service{}, [listen_option()]) + -> {ok, pid(), [inet:ip_address()]} + when Ref :: diameter:transport_ref(); + ({connect, Ref}, #diameter_service{}, [connect_option()]) + -> {ok, pid(), [inet:ip_address()]} + when Ref :: diameter:transport_ref(). + start(T, #diameter_service{capabilities = Caps}, Opts) when is_list(Opts) -> diameter_sctp_sup:start(), %% start supervisors on demand @@ -169,12 +194,14 @@ init(T) -> %% A process owning a listening socket. i({listen, Ref, {Opts, Addrs}}) -> - {LAs, Sock} = AS = open(Addrs, Opts, ?DEFAULT_PORT), + {[Matches], Rest} = proplists:split(Opts, [accept]), + {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), proc_lib:init_ack({ok, self(), LAs}), ok = gen_sctp:listen(Sock, true), true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), start_timer(#listener{ref = Ref, - socket = Sock}); + socket = Sock, + accept = accept(Matches)}); %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> @@ -311,6 +338,9 @@ handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref, {TPid, NewS} = accept(Ref, Pid, S), {reply, {ok, TPid}, NewS#listener{count = N+1}}; +handle_call(T, From, {listener,_,_,_,_,_,_} = S) -> % started in old code + handle_call(T, From, upgrade(S)); + handle_call(_, _, State) -> {reply, nok, State}. @@ -329,7 +359,10 @@ handle_info(T, #transport{} = S) -> {noreply, #transport{} = t(T,S)}; handle_info(T, #listener{} = S) -> - {noreply, #listener{} = l(T,S)}. + {noreply, #listener{} = l(T,S)}; + +handle_info(T, {listener,_,_,_,_,_,_} = S) -> % started in old code + handle_info(T, upgrade(S)). %% --------------------------------------------------------------------------- %% # code_change/3 @@ -363,6 +396,9 @@ terminate(_, #listener{socket = Sock}) -> %% --------------------------------------------------------------------------- +upgrade(S) -> + #listener{} = erlang:append_element(S, ?DEFAULT_ACCEPT). + putr(Key, Val) -> put({?MODULE, Key}, Val). @@ -386,7 +422,7 @@ l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) -> try find(Id, Data, S) of {TPid, NewS} -> - TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg}, + TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept}, NewS; false -> S @@ -460,11 +496,14 @@ t(T,S) -> %% transition/2 %% Listening process is transfering ownership of an association. -transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg}, +transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches}, #transport{mode = {accept, _}, socket = LSock} = S) -> + ok = accept_peer(Sock, Matches), transition(Msg, S#transport{socket = Sock}); +transition({peeloff = T, _Sock, _Msg} = T, #transport{} = S) ->% from old code + transition(erlang:append_element(T, ?DEFAULT_ACCEPT), S); %% Incoming message. transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> @@ -510,6 +549,27 @@ transition({resolve_port, Pid}, #transport{socket = Sock}) %% Crash on anything unexpected. +ok({ok, T}) -> + T; +ok(T) -> + x(T). + +%% accept_peer/2 + +accept_peer(_, []) -> + ok; + +accept_peer(Sock, Matches) -> + {RAddrs, _} = ok(inet:peername(Sock)), + diameter_peer:match(RAddrs, Matches) + orelse x({accept, RAddrs, Matches}), + ok. + +%% accept/1 + +accept(Opts) -> + [[M] || {accept, M} <- Opts]. + %% accept/3 %% %% Start a new transport process or use one that's already been diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index cbbba714ac..4d1b8bec51 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -42,6 +42,9 @@ -export([ports/0, ports/1]). +-export_type([connect_option/0, + listen_option/0]). + -include_lib("diameter/include/diameter.hrl"). %% Keys into process dictionary. @@ -80,6 +83,25 @@ -type frag() :: {length(), size(), binary(), list(binary())} | binary(). +-type connect_option() :: {raddr, inet:ip_address()} + | {rport, pos_integer()} + | {ssl_options, true | [ssl:connect_option()]} + | option() + | ssl:connect_option() + | gen_tcp:connect_option(). + +-type match() :: inet:ip_address() + | string() + | [match()]. + +-type listen_option() :: {accept, match()} + | {ssl_options, true | [ssl:listen_option()]} + | ssl:listen_option() + | gen_tcp:listen_option(). + +-type option() :: {port, non_neg_integer()} + | {fragment_timer, 0..16#FFFFFFFF}. + %% Accepting/connecting transport process state. -record(transport, {socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket @@ -100,18 +122,14 @@ %% # start/3 %% --------------------------------------------------------------------------- --spec start({accept, Ref}, Svc, [Opt]) +-spec start({accept, Ref}, #diameter_service{}, [listen_option()]) -> {ok, pid(), [inet:ip_address()]} - when Ref :: diameter:transport_ref(), - Svc :: #diameter_service{}, - Opt :: diameter:transport_opt(); - ({connect, Ref}, Svc, [Opt]) + when Ref :: diameter:transport_ref(); + ({connect, Ref}, #diameter_service{}, [connect_option()]) -> {ok, pid(), [inet:ip_address()]} | {ok, pid()} - when Ref :: diameter:transport_ref(), - Svc :: #diameter_service{}, - Opt :: diameter:transport_opt(). - + when Ref :: diameter:transport_ref(). + start({T, Ref}, #diameter_service{capabilities = Caps}, Opts) -> diameter_tcp_sup:start(), %% start tcp supervisors on demand {Mod, Rest} = split(Opts), @@ -225,10 +243,10 @@ laddr([], Mod, Sock) -> Addr; laddr([{ip, Addr}], _, _) -> Addr. - + own(Opts) -> - {Own, Rest} = proplists:split(Opts, [fragment_timer]), - {lists:append(Own), Rest}. + {[Own], Rest} = proplists:split(Opts, [fragment_timer]), + {Own, Rest}. ssl(Opts) -> {[SslOpts], Rest} = proplists:split(Opts, [ssl_options]), @@ -257,9 +275,11 @@ init(Type, Ref, Mod, Pid, _, Opts, Addrs) -> %% init/6 init(accept = T, Ref, Mod, Pid, Opts, Addrs) -> - {LAddr, LSock} = listener(Ref, {Mod, Opts, Addrs}), + {[Matches], Rest} = proplists:split(Opts, [accept]), + {LAddr, LSock} = listener(Ref, {Mod, Rest, Addrs}), proc_lib:init_ack({ok, self(), [LAddr]}), Sock = ok(accept(Mod, LSock)), + ok = accept_peer(Mod, Sock, accept(Matches)), publish(Mod, T, Ref, Sock), diameter_peer:up(Pid), Sock; @@ -282,7 +302,7 @@ init_rc([]) -> up(Pid, Remote, [{ip, _Addr}], _, _) -> diameter_peer:up(Pid, Remote); -up(Pid, Remote, [], Mod, Sock) -> +up(Pid, Remote, [], Mod, Sock) -> {Addr, _Port} = ok(sockname(Mod, Sock)), diameter_peer:up(Pid, Remote, [Addr]). @@ -298,6 +318,22 @@ ok(No) -> x(Reason) -> exit({shutdown, Reason}). +%% accept_peer/3 + +accept_peer(_Mod, _Sock, []) -> + ok; + +accept_peer(Mod, Sock, Matches) -> + {RAddr, _} = ok(peername(Mod, Sock)), + diameter_peer:match([RAddr], Matches) + orelse x({accept, RAddr, Matches}), + ok. + +%% accept/1 + +accept(Opts) -> + [[M] || {accept, M} <- Opts]. + %% listener/2 listener(LRef, T) -> |