aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/transport')
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl74
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl64
2 files changed, 117 insertions, 21 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 8b8c2a6694..49a530b4eb 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -42,6 +42,9 @@
-export([ports/0,
ports/1]).
+-export_type([listen_option/0,
+ connect_option/0]).
+
-include_lib("kernel/include/inet_sctp.hrl").
-include_lib("diameter/include/diameter.hrl").
@@ -54,6 +57,9 @@
%% The default port for a listener.
-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
+%% Remote addresses to accept connections from.
+-define(DEFAULT_ACCEPT, []). %% any
+
%% How long a listener with no associations lives before offing
%% itself.
-define(LISTENER_TIMEOUT, 30000).
@@ -62,6 +68,17 @@
%% association establishment.
-define(ACCEPT_TIMEOUT, 5000).
+-type connect_option() :: {raddr, inet:ip_address()}
+ | {rport, inet:port_number()}
+ | gen_sctp:open_option().
+
+-type match() :: inet:ip_address()
+ | string()
+ | [match()].
+
+-type listen_option() :: {accept, match()}
+ | gen_sctp:open_option().
+
-type uint() :: non_neg_integer().
%% Accepting/connecting transport process state.
@@ -69,7 +86,7 @@
{parent :: pid(),
mode :: {accept, pid()}
| accept
- | {connect, {list(inet:ip_address()), uint(), list()}}
+ | {connect, {[inet:ip_address()], uint(), list()}}
%% {RAs, RP, Errors}
| connect,
socket :: gen_sctp:sctp_socket(),
@@ -86,7 +103,8 @@
tmap = ets:new(?MODULE, []) :: ets:tid(),
%% {MRef, Pid|AssocId}, {AssocId, Pid}
pending = {0, ets:new(?MODULE, [ordered_set])},
- tref :: reference()}).
+ tref :: reference(),
+ accept :: [match()]}).
%% Field tmap is used to map an incoming message or event to the
%% relevent transport process. Field pending implements a queue of
%% transport processes to which an association has been assigned (at
@@ -102,6 +120,13 @@
%% # start/3
%% ---------------------------------------------------------------------------
+-spec start({accept, Ref}, #diameter_service{}, [listen_option()])
+ -> {ok, pid(), [inet:ip_address()]}
+ when Ref :: diameter:transport_ref();
+ ({connect, Ref}, #diameter_service{}, [connect_option()])
+ -> {ok, pid(), [inet:ip_address()]}
+ when Ref :: diameter:transport_ref().
+
start(T, #diameter_service{capabilities = Caps}, Opts)
when is_list(Opts) ->
diameter_sctp_sup:start(), %% start supervisors on demand
@@ -169,12 +194,14 @@ init(T) ->
%% A process owning a listening socket.
i({listen, Ref, {Opts, Addrs}}) ->
- {LAs, Sock} = AS = open(Addrs, Opts, ?DEFAULT_PORT),
+ {[Matches], Rest} = proplists:split(Opts, [accept]),
+ {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
proc_lib:init_ack({ok, self(), LAs}),
ok = gen_sctp:listen(Sock, true),
true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
start_timer(#listener{ref = Ref,
- socket = Sock});
+ socket = Sock,
+ accept = accept(Matches)});
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
@@ -311,6 +338,9 @@ handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref,
{TPid, NewS} = accept(Ref, Pid, S),
{reply, {ok, TPid}, NewS#listener{count = N+1}};
+handle_call(T, From, {listener,_,_,_,_,_,_} = S) -> % started in old code
+ handle_call(T, From, upgrade(S));
+
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -329,7 +359,10 @@ handle_info(T, #transport{} = S) ->
{noreply, #transport{} = t(T,S)};
handle_info(T, #listener{} = S) ->
- {noreply, #listener{} = l(T,S)}.
+ {noreply, #listener{} = l(T,S)};
+
+handle_info(T, {listener,_,_,_,_,_,_} = S) -> % started in old code
+ handle_info(T, upgrade(S)).
%% ---------------------------------------------------------------------------
%% # code_change/3
@@ -363,6 +396,9 @@ terminate(_, #listener{socket = Sock}) ->
%% ---------------------------------------------------------------------------
+upgrade(S) ->
+ #listener{} = erlang:append_element(S, ?DEFAULT_ACCEPT).
+
putr(Key, Val) ->
put({?MODULE, Key}, Val).
@@ -386,7 +422,7 @@ l({sctp, Sock, _RA, _RP, Data} = Msg, #listener{socket = Sock} = S) ->
try find(Id, Data, S) of
{TPid, NewS} ->
- TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg},
+ TPid ! {peeloff, peeloff(Sock, Id, TPid), Msg, S#listener.accept},
NewS;
false ->
S
@@ -460,11 +496,14 @@ t(T,S) ->
%% transition/2
%% Listening process is transfering ownership of an association.
-transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg},
+transition({peeloff, Sock, {sctp, LSock, _RA, _RP, _Data} = Msg, Matches},
#transport{mode = {accept, _},
socket = LSock}
= S) ->
+ ok = accept_peer(Sock, Matches),
transition(Msg, S#transport{socket = Sock});
+transition({peeloff = T, _Sock, _Msg} = T, #transport{} = S) ->% from old code
+ transition(erlang:append_element(T, ?DEFAULT_ACCEPT), S);
%% Incoming message.
transition({sctp, _Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
@@ -510,6 +549,27 @@ transition({resolve_port, Pid}, #transport{socket = Sock})
%% Crash on anything unexpected.
+ok({ok, T}) ->
+ T;
+ok(T) ->
+ x(T).
+
+%% accept_peer/2
+
+accept_peer(_, []) ->
+ ok;
+
+accept_peer(Sock, Matches) ->
+ {RAddrs, _} = ok(inet:peername(Sock)),
+ diameter_peer:match(RAddrs, Matches)
+ orelse x({accept, RAddrs, Matches}),
+ ok.
+
+%% accept/1
+
+accept(Opts) ->
+ [[M] || {accept, M} <- Opts].
+
%% accept/3
%%
%% Start a new transport process or use one that's already been
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index cbbba714ac..4d1b8bec51 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -42,6 +42,9 @@
-export([ports/0,
ports/1]).
+-export_type([connect_option/0,
+ listen_option/0]).
+
-include_lib("diameter/include/diameter.hrl").
%% Keys into process dictionary.
@@ -80,6 +83,25 @@
-type frag() :: {length(), size(), binary(), list(binary())}
| binary().
+-type connect_option() :: {raddr, inet:ip_address()}
+ | {rport, pos_integer()}
+ | {ssl_options, true | [ssl:connect_option()]}
+ | option()
+ | ssl:connect_option()
+ | gen_tcp:connect_option().
+
+-type match() :: inet:ip_address()
+ | string()
+ | [match()].
+
+-type listen_option() :: {accept, match()}
+ | {ssl_options, true | [ssl:listen_option()]}
+ | ssl:listen_option()
+ | gen_tcp:listen_option().
+
+-type option() :: {port, non_neg_integer()}
+ | {fragment_timer, 0..16#FFFFFFFF}.
+
%% Accepting/connecting transport process state.
-record(transport,
{socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket
@@ -100,18 +122,14 @@
%% # start/3
%% ---------------------------------------------------------------------------
--spec start({accept, Ref}, Svc, [Opt])
+-spec start({accept, Ref}, #diameter_service{}, [listen_option()])
-> {ok, pid(), [inet:ip_address()]}
- when Ref :: diameter:transport_ref(),
- Svc :: #diameter_service{},
- Opt :: diameter:transport_opt();
- ({connect, Ref}, Svc, [Opt])
+ when Ref :: diameter:transport_ref();
+ ({connect, Ref}, #diameter_service{}, [connect_option()])
-> {ok, pid(), [inet:ip_address()]}
| {ok, pid()}
- when Ref :: diameter:transport_ref(),
- Svc :: #diameter_service{},
- Opt :: diameter:transport_opt().
-
+ when Ref :: diameter:transport_ref().
+
start({T, Ref}, #diameter_service{capabilities = Caps}, Opts) ->
diameter_tcp_sup:start(), %% start tcp supervisors on demand
{Mod, Rest} = split(Opts),
@@ -225,10 +243,10 @@ laddr([], Mod, Sock) ->
Addr;
laddr([{ip, Addr}], _, _) ->
Addr.
-
+
own(Opts) ->
- {Own, Rest} = proplists:split(Opts, [fragment_timer]),
- {lists:append(Own), Rest}.
+ {[Own], Rest} = proplists:split(Opts, [fragment_timer]),
+ {Own, Rest}.
ssl(Opts) ->
{[SslOpts], Rest} = proplists:split(Opts, [ssl_options]),
@@ -257,9 +275,11 @@ init(Type, Ref, Mod, Pid, _, Opts, Addrs) ->
%% init/6
init(accept = T, Ref, Mod, Pid, Opts, Addrs) ->
- {LAddr, LSock} = listener(Ref, {Mod, Opts, Addrs}),
+ {[Matches], Rest} = proplists:split(Opts, [accept]),
+ {LAddr, LSock} = listener(Ref, {Mod, Rest, Addrs}),
proc_lib:init_ack({ok, self(), [LAddr]}),
Sock = ok(accept(Mod, LSock)),
+ ok = accept_peer(Mod, Sock, accept(Matches)),
publish(Mod, T, Ref, Sock),
diameter_peer:up(Pid),
Sock;
@@ -282,7 +302,7 @@ init_rc([]) ->
up(Pid, Remote, [{ip, _Addr}], _, _) ->
diameter_peer:up(Pid, Remote);
-up(Pid, Remote, [], Mod, Sock) ->
+up(Pid, Remote, [], Mod, Sock) ->
{Addr, _Port} = ok(sockname(Mod, Sock)),
diameter_peer:up(Pid, Remote, [Addr]).
@@ -298,6 +318,22 @@ ok(No) ->
x(Reason) ->
exit({shutdown, Reason}).
+%% accept_peer/3
+
+accept_peer(_Mod, _Sock, []) ->
+ ok;
+
+accept_peer(Mod, Sock, Matches) ->
+ {RAddr, _} = ok(peername(Mod, Sock)),
+ diameter_peer:match([RAddr], Matches)
+ orelse x({accept, RAddr, Matches}),
+ ok.
+
+%% accept/1
+
+accept(Opts) ->
+ [[M] || {accept, M} <- Opts].
+
%% listener/2
listener(LRef, T) ->