aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_sctp.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/diameter/src/transport/diameter_sctp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl428
1 files changed, 347 insertions, 81 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index f48e4347ee..64b34da690 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -52,21 +52,20 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
%% The default port for a listener.
-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
-%% Remote addresses to accept connections from.
--define(DEFAULT_ACCEPT, []). %% any
-
%% How long to wait for a transport process to attach after
%% association establishment.
-define(ACCEPT_TIMEOUT, 5000).
-type connect_option() :: {raddr, inet:ip_address()}
| {rport, inet:port_number()}
+ | option()
| term(). %% gen_sctp:open_option().
-type match() :: inet:ip_address()
@@ -74,8 +73,14 @@
| [match()].
-type listen_option() :: {accept, match()}
+ | option()
| term(). %% gen_sctp:open_option().
+-type option() :: {sender, boolean()}
+ | sender
+ | {packet, boolean() | raw}
+ | {message_cb, false | diameter:eval()}.
+
-type uint() :: non_neg_integer().
%% Accepting/connecting transport process state.
@@ -87,20 +92,38 @@
%% {RAs, RP, Errors}
| connect,
socket :: gen_sctp:sctp_socket() | undefined,
- assoc_id :: gen_sctp:assoc_id(), %% association identifier
+ active = false :: boolean(), %% is socket active?
+ recv = true :: boolean(), %% should it be active?
+ assoc_id :: gen_sctp:assoc_id() %% association identifier
+ | undefined
+ | true,
peer :: {[inet:ip_address()], uint()} %% {RAs, RP}
| undefined,
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
- os = 0 :: uint()}). %% next output stream
+ os = 0 :: uint(), %% next output stream
+ rotate = 1 :: boolean() | 0 | 1, %% rotate os?
+ unordered = false :: boolean() %% always send unordered?
+ | pos_integer(),% or if =< N outbound streams?
+ packet = true :: boolean() %% legacy transport_data?
+ | raw,
+ message_cb = false :: false | diameter:eval(),
+ send = false :: pid() | boolean()}). %% sending process
+
+%% Monitor process state.
+-record(monitor,
+ {transport :: pid(),
+ ack = false :: boolean(),
+ socket :: gen_sctp:sctp_socket(),
+ assoc_id :: gen_sctp:assoc_id()}).
%% Listener process state.
-record(listener,
{ref :: reference(),
socket :: gen_sctp:sctp_socket(),
- service = false :: false | pid(), %% service process
+ service :: pid(), %% service process
pending = {0, queue:new()},
- accept :: [match()]}).
+ opts :: [[match()] | boolean() | diameter:eval()]}).
%% Field pending implements two queues: the first of transport-to-be
%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
@@ -132,24 +155,19 @@
start(T, Svc, Opts)
when is_list(Opts) ->
#diameter_service{capabilities = Caps,
- pid = SPid}
+ pid = Pid}
= Svc,
diameter_sctp_sup:start(), %% start supervisors on demand
Addrs = Caps#diameter_caps.host_ip_address,
- s(T, Addrs, SPid, lists:map(fun ip/1, Opts)).
-
-ip({ifaddr, A}) ->
- {ip, A};
-ip(T) ->
- T.
+ s(T, Addrs, Pid, Opts).
%% A listener spawns transports either as a consequence of this call
%% when there is not yet an association to assign it, or at comm_up on
%% a new association in which case the call retrieves a transport from
%% the pending queue.
-s({accept, Ref} = A, Addrs, SPid, Opts) ->
- {ok, LPid, LAs} = listener(Ref, {Opts, Addrs}),
- try gen_server:call(LPid, {A, self(), SPid}, infinity) of
+s({accept, Ref} = A, Addrs, SvcPid, Opts) ->
+ {ok, LPid, LAs} = listener(Ref, {Opts, SvcPid, Addrs}),
+ try gen_server:call(LPid, {A, self()}, infinity) of
{ok, TPid} ->
{ok, TPid, LAs};
No ->
@@ -162,7 +180,7 @@ s({accept, Ref} = A, Addrs, SPid, Opts) ->
%% gen_sctp in order to be able to accept a new association only
%% *after* an accepting transport has been spawned.
-s({connect = C, Ref}, Addrs, _SPid, Opts) ->
+s({connect = C, Ref}, Addrs, _SvcPid, Opts) ->
diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}).
%% start_link/1
@@ -216,22 +234,46 @@ init(T) ->
%% i/1
+i(#monitor{transport = TPid} = S) ->
+ monitor(process, TPid),
+ putr(?TRANSPORT_KEY, TPid),
+ proc_lib:init_ack({ok, self()}),
+ S;
+
%% A process owning a listening socket.
-i({listen, Ref, {Opts, Addrs}}) ->
+i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
+ monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {[Matches], Rest} = proplists:split(Opts, [accept]),
+ {Split, Rest} = proplists:split(Opts, [accept,
+ packet,
+ sender,
+ message_cb,
+ unordered]),
+ OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
proc_lib:init_ack({ok, self(), LAs}),
#listener{ref = Ref,
+ service = SvcPid,
socket = Sock,
- accept = [[M] || {accept, M} <- Matches]};
+ opts = [[[M] || {accept, M} <- OwnOpts],
+ proplists:get_value(packet, OwnOpts, true)
+ | [proplists:get_value(K, OwnOpts, false)
+ || K <- [sender, message_cb, unordered]]]};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
- {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]),
- RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As],
+ {[Ps | Split], Rest} = proplists:split(Opts, [rport,
+ raddr,
+ packet,
+ sender,
+ message_cb,
+ unordered]),
+ OwnOpts = lists:append(Split),
+ CB = proplists:get_value(message_cb, OwnOpts, false),
+ false == CB orelse (Pid ! {diameter, ack}),
+ RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts],
[RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
{LAs, Sock} = open(Addrs, Rest, 0),
putr(?REF_KEY, Ref),
@@ -239,7 +281,11 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
monitor(process, Pid),
#transport{parent = Pid,
mode = {connect, connect(Sock, RAs, RP, [])},
- socket = Sock};
+ socket = Sock,
+ message_cb = CB,
+ unordered = proplists:get_value(ordered, OwnOpts, false),
+ packet = proplists:get_value(packet, OwnOpts, true),
+ send = proplists:get_value(sender, OwnOpts, false)};
%% An accepting transport spawned by diameter, not yet owning an
%% association.
@@ -273,11 +319,17 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
receive
{Ref, Pid} when K == parent -> %% transport process started
S#transport{parent = Pid};
- {K, T, Matches} when K == peeloff -> %% association
+ {K, T, Opts} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
+ [Matches, Packet, Sender, CB, Unordered] = Opts,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
- t(T, S#transport{socket = Sock});
+ false == CB orelse (S#transport.parent ! {diameter, ack}),
+ t(T, S#transport{socket = Sock,
+ message_cb = CB,
+ unordered = Unordered,
+ packet = Packet,
+ send = Sender});
accept_timeout = T ->
x(T);
{'DOWN', _, process, _, _} = T ->
@@ -309,23 +361,35 @@ l([], Ref, T) ->
%% open/3
open(Addrs, Opts, PortNr) ->
- {LAs, Os} = addrs(Addrs, Opts),
- {LAs, case gen_sctp:open(gen_opts(portnr(Os, PortNr))) of
- {ok, Sock} ->
- Sock;
- {error, Reason} ->
- x({open, Reason})
- end}.
+ case gen_sctp:open(gen_opts(portnr(addrs(Addrs, Opts), PortNr))) of
+ {ok, Sock} ->
+ {addrs(Sock), Sock};
+ {error, Reason} ->
+ x({open, Reason})
+ end.
addrs(Addrs, Opts) ->
- case proplists:split(Opts, [ip]) of
- {[[]], _} ->
- {Addrs, Opts ++ [{ip, A} || A <- Addrs]};
- {[As], Os} ->
- LAs = [diameter_lib:ipaddr(A) || {ip, A} <- As],
- {LAs, Os ++ [{ip, A} || A <- LAs]}
+ case lists:mapfoldl(fun ipaddr/2, false, Opts) of
+ {Os, true} ->
+ Os;
+ {_, false} ->
+ Opts ++ [{ip, A} || A <- Addrs]
end.
+ipaddr({K,A}, _)
+ when K == ifaddr;
+ K == ip ->
+ {{ip, ipaddr(A)}, true};
+ipaddr(T, B) ->
+ {T, B}.
+
+ipaddr(A)
+ when A == loopback;
+ A == any ->
+ A;
+ipaddr(A) ->
+ diameter_lib:ipaddr(A).
+
portnr(Opts, PortNr) ->
case proplists:get_value(port, Opts) of
undefined ->
@@ -334,6 +398,14 @@ portnr(Opts, PortNr) ->
Opts
end.
+addrs(Sock) ->
+ case inet:socknames(Sock) of
+ {ok, As} ->
+ [A || {A,_} <- As];
+ {error, Reason} ->
+ x({socknames, Reason})
+ end.
+
%% x/1
x(Reason) ->
@@ -374,13 +446,9 @@ handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref} = S) ->
{TPid, NewS} = accept(Ref, Pid, S),
{reply, {ok, TPid}, NewS};
-handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) ->
- handle_call({T, Pid}, From, if not is_pid(P), is_pid(SPid) ->
- monitor(process, SPid),
- S#listener{service = SPid};
- true ->
- S
- end);
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -400,9 +468,13 @@ handle_info(T, #transport{} = S) ->
{noreply, #transport{} = t(T,S)};
handle_info(T, #listener{} = S) ->
- {noreply, #listener{} = l(T,S)}.
+ {noreply, #listener{} = l(T,S)};
+
+handle_info(T, #monitor{} = S) ->
+ m(T,S),
+ {noreply, S}.
-%% Prior to the possiblity of setting pool_size on in transport
+%% Prior to the possibility of setting pool_size on in transport
%% configuration, a new accepting transport was only started following
%% the death of a predecessor, so that there was only at most one
%% previously started transport process waiting for an association.
@@ -422,6 +494,9 @@ code_change(_, State, _) ->
%% # terminate/2
%% ---------------------------------------------------------------------------
+terminate(_, #monitor{}) ->
+ ok;
+
terminate(_, #transport{assoc_id = undefined}) ->
ok;
@@ -445,11 +520,11 @@ getr(Key) ->
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
- accept = Matches}
+ opts = Opts}
= S) ->
Id = assoc_id(Data),
{TPid, NewS} = accept(S),
- TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches},
+ TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Opts},
setopts(Sock),
NewS;
@@ -503,12 +578,21 @@ t(T,S) ->
%% Incoming message.
transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
- setopts(Sock),
- recv(Data, S);
+ setopts(S, recv(Data, S#transport{active = false}));
%% Outgoing message.
transition({diameter, {send, Msg}}, S) ->
- send(Msg, S);
+ message(send, Msg, S);
+
+%% Monitor has sent an outgoing message.
+transition(Msg, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ message(ack, Msg, S);
+
+%% Deferred actions from a message_cb.
+transition({actions, Dir, Acts}, S) ->
+ setopts(ok, actions(Acts, Dir, S));
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
@@ -522,8 +606,18 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
stop;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ send = MPid}) ->
+ is_boolean(MPid)
+ orelse ok == (catch gen_server:call(MPid, {stop, Pid}))
+ orelse exit(MPid, kill),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop;
%% Timeout after transport process has been started.
@@ -536,6 +630,18 @@ transition({resolve_port, Pid}, #transport{socket = Sock})
Pid ! inet:port(Sock),
ok.
+%% m/2
+
+m({Msg, StreamId}, #monitor{socket = Sock,
+ transport = TPid,
+ assoc_id = AId,
+ ack = B}) ->
+ send(Sock, AId, StreamId, Msg),
+ B andalso (TPid ! Msg);
+
+m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) ->
+ x(T).
+
%% Crash on anything unexpected.
ok({ok, T}) ->
@@ -578,33 +684,57 @@ q(Ref, Pid, #listener{pending = {_,Q}}) ->
%% send/2
+%% Start monitor process on first send.
+send(Msg, #transport{send = true,
+ socket = Sock,
+ assoc_id = AId,
+ message_cb = CB}
+ = S) ->
+ {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(),
+ socket = Sock,
+ assoc_id = AId,
+ ack = false /= CB}),
+ monitor(process, MPid),
+ send(Msg, S#transport{send = MPid});
+
%% Outbound Diameter message on a specified stream ...
-send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
+send(#diameter_packet{transport_data = {outstream, SId}}
+ = Msg,
#transport{streams = {_, OS}}
= S) ->
- send(SId rem OS, Bin, S),
- S;
+ send(SId rem OS, Msg, S);
-%% ... or not: rotate through all streams.
-send(#diameter_packet{bin = Bin}, S) ->
- send(Bin, S);
-send(Bin, #transport{streams = {_, OS},
+%% ... or not: rotate when sending on multiple streams ...
+send(Msg, #transport{rotate = true,
+ streams = {_, OS},
os = N}
- = S)
- when is_binary(Bin) ->
- send(N, Bin, S),
- S#transport{os = (N + 1) rem OS}.
+ = S) ->
+ send(N, Msg, S#transport{os = (N + 1) rem OS});
+
+%% ... or send on the only stream available.
+send(Msg, S) ->
+ send(0, Msg, S).
%% send/3
-send(StreamId, Bin, #transport{socket = Sock,
- assoc_id = AId}) ->
- send(Sock, AId, StreamId, Bin).
+send(StreamId, Msg, #transport{send = false,
+ socket = Sock,
+ assoc_id = AId}
+ = S) ->
+ send(Sock, AId, StreamId, Msg),
+ message(ack, Msg, S);
+
+send(StreamId, Msg, #transport{send = MPid} = S) ->
+ MPid ! {Msg, StreamId},
+ S.
%% send/4
-send(Sock, AssocId, Stream, Bin) ->
- case gen_sctp:send(Sock, AssocId, Stream, Bin) of
+send(Sock, AssocId, StreamId, #diameter_packet{bin = Bin}) ->
+ send(Sock, AssocId, StreamId, Bin);
+
+send(Sock, AssocId, StreamId, Bin) ->
+ case gen_sctp:send(Sock, AssocId, StreamId, Bin) of
ok ->
ok;
{error, Reason} ->
@@ -624,7 +754,9 @@ recv({_, #sctp_assoc_change{state = comm_up,
= S) ->
Ref = getr(?REF_KEY),
publish(T, Ref, Id, Sock),
- up(S#transport{assoc_id = Id,
+ %% Deal with different association id after peeloff on Solaris by
+ %% taking the id from the first reception.
+ up(S#transport{assoc_id = T == accept orelse Id,
streams = {IS, OS}});
%% ... or not: try the next address.
@@ -639,17 +771,19 @@ recv({_, #sctp_assoc_change{} = E},
recv({_, #sctp_assoc_change{}}, _) ->
stop;
+%% First inbound on an accepting transport.
+recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin}
+ = T,
+ #transport{assoc_id = true}
+ = S) ->
+ recv(T, S#transport{assoc_id = Id});
+
%% Inbound Diameter message.
-recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid})
+recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S)
when is_binary(Bin) ->
- diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id},
- bin = Bin}),
- ok;
+ message(recv, Msg, recv(S));
-recv({_, #sctp_shutdown_event{assoc_id = A}},
- #transport{assoc_id = Id})
- when A == Id;
- A == 0 ->
+recv({_, #sctp_shutdown_event{}}, _) ->
stop;
%% Note that diameter_sctp(3) documents that sctp_events cannot be
@@ -667,6 +801,41 @@ recv({_, #sctp_paddr_change{}}, _) ->
recv({_, #sctp_pdapi_event{}}, _) ->
ok.
+%% recv/1
+%%
+%% Start sending unordered after the second reception, so that an
+%% outgoing CER/CEA will arrive at the peer before another request.
+
+recv(#transport{rotate = B} = S)
+ when is_boolean(B) ->
+ S;
+
+recv(#transport{rotate = 0,
+ streams = {_,OS},
+ socket = Sock,
+ unordered = B}
+ = S) ->
+ ok = unordered(Sock, OS, B),
+ S#transport{rotate = 1 < OS};
+
+recv(#transport{rotate = N} = S) ->
+ S#transport{rotate = N-1}.
+
+%% unordered/3
+
+unordered(Sock, OS, B)
+ when B;
+ is_integer(B), OS =< B ->
+ inet:setopts(Sock, [{sctp_default_send_param,
+ #sctp_sndrcvinfo{flags = [unordered]}}]);
+
+unordered(_, OS, B)
+ when not B;
+ is_integer(B), B < OS ->
+ ok.
+
+%% publish/4
+
publish(T, Ref, Id, Sock) ->
true = diameter_reg:add_new({?MODULE, T, {Ref, {Id, Sock}}}),
putr(?INFO_KEY, {gen_sctp, Sock}). %% for info/1
@@ -765,6 +934,23 @@ connect(Sock, [Addr | AT] = As, Port, Reasons) ->
connect(Sock, AT, Port, [{Addr, E} | Reasons])
end.
+%% setopts/2
+
+setopts(_, #transport{socket = Sock,
+ active = A,
+ recv = B}
+ = S)
+ when B, not A ->
+ setopts(Sock),
+ S#transport{active = true};
+
+setopts(_, #transport{} = S) ->
+ S;
+
+setopts(#transport{socket = Sock}, T) ->
+ setopts(Sock),
+ T.
+
%% setopts/1
setopts(Sock) ->
@@ -772,3 +958,83 @@ setopts(Sock) ->
ok -> ok;
X -> x({setopts, Sock, X}) %% possibly on peer disconnect
end.
+
+%% A message_cb is invoked whenever a message is sent or received, or
+%% to provide acknowledgement of a completed send or discarded
+%% request. See diameter_tcp for semantics, the only difference being
+%% that a recv callback can get a diameter_packet record as Msg
+%% depending on how/if option packet has been specified.
+
+%% message/3
+
+message(send, false = M, S) ->
+ message(ack, M, S);
+
+message(ack, _, #transport{message_cb = false} = S) ->
+ S;
+
+message(Dir, Msg, S) ->
+ setopts(S, actions(cb(S, Dir, Msg), Dir, S)).
+
+%% actions/3
+
+actions([], _, S) ->
+ S;
+
+actions([B | As], Dir, S)
+ when is_boolean(B) ->
+ actions(As, Dir, S#transport{recv = B});
+
+actions([Dir | As], _, S)
+ when Dir == send;
+ Dir == recv ->
+ actions(As, Dir, S);
+
+actions([Msg | As], send = Dir, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ actions(As, Dir, send(Msg, S));
+
+actions([Msg | As], recv = Dir, #transport{parent = Pid} = S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ diameter_peer:recv(Pid, Msg),
+ actions(As, Dir, S);
+
+actions([{defer, Tmo, Acts} | As], Dir, S) ->
+ erlang:send_after(Tmo, self(), {actions, Dir, Acts}),
+ actions(As, Dir, S);
+
+actions(CB, _, S) ->
+ S#transport{message_cb = CB}.
+
+%% cb/3
+
+cb(#transport{message_cb = false, packet = P}, recv, Msg) ->
+ [pkt(P, true, Msg)];
+
+cb(#transport{message_cb = CB, packet = P}, recv = D, Msg) ->
+ cb(CB, D, pkt(P, false, Msg));
+
+cb(#transport{message_cb = CB}, Dir, Msg) ->
+ cb(CB, Dir, Msg);
+
+cb(false, send, Msg) ->
+ [Msg];
+
+cb(CB, Dir, Msg) ->
+ diameter_lib:eval([CB, Dir, Msg]).
+
+%% pkt/3
+
+pkt(false, _, {_Info, Bin}) ->
+ Bin;
+
+pkt(true, _, {[#sctp_sndrcvinfo{stream = Id}], Bin}) ->
+ #diameter_packet{bin = Bin, transport_data = {stream, Id}};
+
+pkt(raw, true, {[Info], Bin}) ->
+ #diameter_packet{bin = Bin, transport_data = Info};
+
+pkt(raw, false, {[_], _} = Msg) ->
+ Msg.