aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src/transport/diameter_sctp.erl
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-06-14 09:29:05 +0200
committerAnders Svensson <[email protected]>2017-06-14 09:29:05 +0200
commit1bf842f3cd603ddd6246d874e188e4f75b0cc692 (patch)
treed47b488ce7b0e6402241ac99ee4161e0ebffee6b /lib/diameter/src/transport/diameter_sctp.erl
parentd4fea060349a72fb58267e82c2d6bfa7b638b2c9 (diff)
parent69c5a74179e13e145da3da70e02dd43881a82008 (diff)
downloadotp-1bf842f3cd603ddd6246d874e188e4f75b0cc692.tar.gz
otp-1bf842f3cd603ddd6246d874e188e4f75b0cc692.tar.bz2
otp-1bf842f3cd603ddd6246d874e188e4f75b0cc692.zip
Merge branch 'anders/diameter/transport/ERL-332'
* anders/diameter/transport/ERL-332: (35 commits) Capitulate on SCTP vs sparc-sun-solaris2.10 Remove obsolete traffic testcase Fix dialyzer warnings Remove client/server string decode from traffic suite Add diameter_sctp option packet Add diameter_sctp send/recv callbacks Let diameter_tcp send/recv callbacks deal in diameter_packet Randomly select traffic testcases Exercise diameter_tcp message callbacks in traffic suite Exercise diameter_{tcp,sctp} sender in traffic suite Remove upgrade from diameter_traffic Add diameter_tcp send/recv callbacks Make diameter_{tcp,sctp} sender configurable Remove upgrade from diameter_sctp; tweak diameter_tcp to match Fix incomprehensible dialyzer warning Simplify acks to transport processes Strip throttling callbacks from diameter_tcp Deal with (another) SCTP association id quirk on Solaris Use binary:copy/2 when generating largish data in test suites Deal with SCTP association id quirk on Solaris ...
Diffstat (limited to 'lib/diameter/src/transport/diameter_sctp.erl')
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl319
1 files changed, 259 insertions, 60 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 76aacabcb8..6a9f1f940b 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -52,21 +52,20 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
%% The default port for a listener.
-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
-%% Remote addresses to accept connections from.
--define(DEFAULT_ACCEPT, []). %% any
-
%% How long to wait for a transport process to attach after
%% association establishment.
-define(ACCEPT_TIMEOUT, 5000).
-type connect_option() :: {raddr, inet:ip_address()}
| {rport, inet:port_number()}
+ | option()
| term(). %% gen_sctp:open_option().
-type match() :: inet:ip_address()
@@ -74,8 +73,14 @@
| [match()].
-type listen_option() :: {accept, match()}
+ | option()
| term(). %% gen_sctp:open_option().
+-type option() :: {sender, boolean()}
+ | sender
+ | {packet, boolean() | raw}
+ | {message_cb, false | diameter:evaluable()}.
+
-type uint() :: non_neg_integer().
%% Accepting/connecting transport process state.
@@ -87,20 +92,35 @@
%% {RAs, RP, Errors}
| connect,
socket :: gen_sctp:sctp_socket() | undefined,
- assoc_id :: gen_sctp:assoc_id(), %% association identifier
+ active = false :: boolean(), %% is socket active?
+ recv = true :: boolean(), %% should it be active?
+ assoc_id :: gen_sctp:assoc_id() %% association identifier
+ | undefined
+ | true,
peer :: {[inet:ip_address()], uint()} %% {RAs, RP}
| undefined,
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
- os = 0 :: uint()}). %% next output stream
+ os = 0 :: uint(), %% next output stream
+ packet = true :: boolean() %% legacy transport_data?
+ | raw,
+ message_cb = false :: false | diameter:evaluable(),
+ send = false :: pid() | boolean()}). %% sending process
+
+%% Monitor process state.
+-record(monitor,
+ {transport :: pid(),
+ ack = false :: boolean(),
+ socket :: gen_sctp:sctp_socket(),
+ assoc_id :: gen_sctp:assoc_id()}). %% next output stream
%% Listener process state.
-record(listener,
{ref :: reference(),
socket :: gen_sctp:sctp_socket(),
- service = false :: false | pid(), %% service process
+ service :: pid(), %% service process
pending = {0, queue:new()},
- accept :: [match()]}).
+ opts :: [[match()] | boolean() | diameter:evaluable()]}).
%% Field pending implements two queues: the first of transport-to-be
%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
@@ -132,11 +152,11 @@
start(T, Svc, Opts)
when is_list(Opts) ->
#diameter_service{capabilities = Caps,
- pid = SPid}
+ pid = Pid}
= Svc,
diameter_sctp_sup:start(), %% start supervisors on demand
Addrs = Caps#diameter_caps.host_ip_address,
- s(T, Addrs, SPid, lists:map(fun ip/1, Opts)).
+ s(T, Addrs, Pid, lists:map(fun ip/1, Opts)).
ip({ifaddr, A}) ->
{ip, A};
@@ -147,9 +167,9 @@ ip(T) ->
%% when there is not yet an association to assign it, or at comm_up on
%% a new association in which case the call retrieves a transport from
%% the pending queue.
-s({accept, Ref} = A, Addrs, SPid, Opts) ->
- {ok, LPid, LAs} = listener(Ref, {Opts, Addrs}),
- try gen_server:call(LPid, {A, self(), SPid}, infinity) of
+s({accept, Ref} = A, Addrs, SvcPid, Opts) ->
+ {ok, LPid, LAs} = listener(Ref, {Opts, SvcPid, Addrs}),
+ try gen_server:call(LPid, {A, self()}, infinity) of
{ok, TPid} ->
{ok, TPid, LAs};
No ->
@@ -162,7 +182,7 @@ s({accept, Ref} = A, Addrs, SPid, Opts) ->
%% gen_sctp in order to be able to accept a new association only
%% *after* an accepting transport has been spawned.
-s({connect = C, Ref}, Addrs, _SPid, Opts) ->
+s({connect = C, Ref}, Addrs, _SvcPid, Opts) ->
diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}).
%% start_link/1
@@ -216,22 +236,39 @@ init(T) ->
%% i/1
+i(#monitor{transport = TPid} = S) ->
+ monitor(process, TPid),
+ putr(?TRANSPORT_KEY, TPid),
+ proc_lib:init_ack({ok, self()}),
+ S;
+
%% A process owning a listening socket.
-i({listen, Ref, {Opts, Addrs}}) ->
+i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
+ monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {[Matches], Rest} = proplists:split(Opts, [accept]),
+ {Split, Rest}
+ = proplists:split(Opts, [accept, packet, sender, message_cb]),
+ OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
proc_lib:init_ack({ok, self(), LAs}),
#listener{ref = Ref,
+ service = SvcPid,
socket = Sock,
- accept = [[M] || {accept, M} <- Matches]};
+ opts = [[[M] || {accept, M} <- OwnOpts],
+ proplists:get_value(packet, OwnOpts, true)
+ | [proplists:get_value(K, OwnOpts, false)
+ || K <- [sender, message_cb]]]};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
- {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]),
- RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As],
+ {[Ps | Split], Rest}
+ = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]),
+ OwnOpts = lists:append(Split),
+ CB = proplists:get_value(message_cb, OwnOpts, false),
+ false == CB orelse (Pid ! {diameter, ack}),
+ RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts],
[RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
{LAs, Sock} = open(Addrs, Rest, 0),
putr(?REF_KEY, Ref),
@@ -239,7 +276,10 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
monitor(process, Pid),
#transport{parent = Pid,
mode = {connect, connect(Sock, RAs, RP, [])},
- socket = Sock};
+ socket = Sock,
+ message_cb = CB,
+ packet = proplists:get_value(packet, OwnOpts, true),
+ send = proplists:get_value(sender, OwnOpts, false)};
%% An accepting transport spawned by diameter, not yet owning an
%% association.
@@ -273,11 +313,16 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
receive
{Ref, Pid} when K == parent -> %% transport process started
S#transport{parent = Pid};
- {K, T, Matches} when K == peeloff -> %% association
+ {K, T, Opts} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
+ [Matches, Packet, Sender, CB] = Opts,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
- t(T, S#transport{socket = Sock});
+ false == CB orelse (S#transport.parent ! {diameter, ack}),
+ t(T, S#transport{socket = Sock,
+ message_cb = CB,
+ packet = Packet,
+ send = Sender});
accept_timeout = T ->
x(T);
{'DOWN', _, process, _, _} = T ->
@@ -374,13 +419,9 @@ handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref} = S) ->
{TPid, NewS} = accept(Ref, Pid, S),
{reply, {ok, TPid}, NewS};
-handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) ->
- handle_call({T, Pid}, From, if not is_pid(P), is_pid(SPid) ->
- monitor(process, SPid),
- S#listener{service = SPid};
- true ->
- S
- end);
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -400,7 +441,11 @@ handle_info(T, #transport{} = S) ->
{noreply, #transport{} = t(T,S)};
handle_info(T, #listener{} = S) ->
- {noreply, #listener{} = l(T,S)}.
+ {noreply, #listener{} = l(T,S)};
+
+handle_info(T, #monitor{} = S) ->
+ m(T,S),
+ {noreply, S}.
%% Prior to the possibility of setting pool_size on in transport
%% configuration, a new accepting transport was only started following
@@ -422,6 +467,9 @@ code_change(_, State, _) ->
%% # terminate/2
%% ---------------------------------------------------------------------------
+terminate(_, #monitor{}) ->
+ ok;
+
terminate(_, #transport{assoc_id = undefined}) ->
ok;
@@ -445,11 +493,11 @@ getr(Key) ->
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
- accept = Matches}
+ opts = Opts}
= S) ->
Id = assoc_id(Data),
{TPid, NewS} = accept(S),
- TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches},
+ TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Opts},
setopts(Sock),
NewS;
@@ -503,12 +551,21 @@ t(T,S) ->
%% Incoming message.
transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
- setopts(Sock),
- recv(Data, S);
+ setopts(S, recv(Data, S#transport{active = false}));
%% Outgoing message.
transition({diameter, {send, Msg}}, S) ->
- send(Msg, S);
+ message(send, Msg, S);
+
+%% Monitor has sent an outgoing message.
+transition(Msg, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ message(ack, Msg, S);
+
+%% Deferred actions from a message_cb.
+transition({actions, Dir, Acts}, S) ->
+ actions(Acts, Dir, S);
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
@@ -522,8 +579,18 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
stop;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ send = MPid}) ->
+ is_boolean(MPid)
+ orelse ok == (catch gen_server:call(MPid, {stop, Pid}))
+ orelse exit(MPid, kill),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop;
%% Timeout after transport process has been started.
@@ -536,6 +603,18 @@ transition({resolve_port, Pid}, #transport{socket = Sock})
Pid ! inet:port(Sock),
ok.
+%% m/2
+
+m({Msg, StreamId}, #monitor{socket = Sock,
+ transport = TPid,
+ assoc_id = AId,
+ ack = B}) ->
+ send(Sock, AId, StreamId, Msg),
+ B andalso (TPid ! Msg);
+
+m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) ->
+ x(T).
+
%% Crash on anything unexpected.
ok({ok, T}) ->
@@ -578,33 +657,52 @@ q(Ref, Pid, #listener{pending = {_,Q}}) ->
%% send/2
+%% Start monitor process on first send.
+send(Msg, #transport{send = true,
+ socket = Sock,
+ assoc_id = AId,
+ message_cb = CB}
+ = S) ->
+ {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(),
+ socket = Sock,
+ assoc_id = AId,
+ ack = false /= CB}),
+ monitor(process, MPid),
+ send(Msg, S#transport{send = MPid});
+
%% Outbound Diameter message on a specified stream ...
-send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
+send(#diameter_packet{transport_data = {outstream, SId}}
+ = Msg,
#transport{streams = {_, OS}}
= S) ->
- send(SId rem OS, Bin, S),
- S;
+ send(SId rem OS, Msg, S);
%% ... or not: rotate through all streams.
-send(#diameter_packet{bin = Bin}, S) ->
- send(Bin, S);
-send(Bin, #transport{streams = {_, OS},
+send(Msg, #transport{streams = {_, OS},
os = N}
- = S)
- when is_binary(Bin) ->
- send(N, Bin, S),
- S#transport{os = (N + 1) rem OS}.
+ = S) ->
+ send(N, Msg, S#transport{os = (N + 1) rem OS}).
%% send/3
-send(StreamId, Bin, #transport{socket = Sock,
- assoc_id = AId}) ->
- send(Sock, AId, StreamId, Bin).
+send(StreamId, Msg, #transport{send = false,
+ socket = Sock,
+ assoc_id = AId}
+ = S) ->
+ send(Sock, AId, StreamId, Msg),
+ message(ack, Msg, S);
+
+send(StreamId, Msg, #transport{send = MPid} = S) ->
+ MPid ! {Msg, StreamId},
+ S.
%% send/4
-send(Sock, AssocId, Stream, Bin) ->
- case gen_sctp:send(Sock, AssocId, Stream, Bin) of
+send(Sock, AssocId, StreamId, #diameter_packet{bin = Bin}) ->
+ send(Sock, AssocId, StreamId, Bin);
+
+send(Sock, AssocId, StreamId, Bin) ->
+ case gen_sctp:send(Sock, AssocId, StreamId, Bin) of
ok ->
ok;
{error, Reason} ->
@@ -624,7 +722,9 @@ recv({_, #sctp_assoc_change{state = comm_up,
= S) ->
Ref = getr(?REF_KEY),
publish(T, Ref, Id, Sock),
- up(S#transport{assoc_id = Id,
+ %% Deal with different association id after peeloff on Solaris by
+ %% taking the id from the first reception.
+ up(S#transport{assoc_id = T == accept orelse Id,
streams = {IS, OS}});
%% ... or not: try the next address.
@@ -639,17 +739,19 @@ recv({_, #sctp_assoc_change{} = E},
recv({_, #sctp_assoc_change{}}, _) ->
stop;
+%% First inbound on an accepting transport.
+recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin}
+ = T,
+ #transport{assoc_id = true}
+ = S) ->
+ recv(T, S#transport{assoc_id = Id});
+
%% Inbound Diameter message.
-recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid})
+recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S)
when is_binary(Bin) ->
- diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id},
- bin = Bin}),
- ok;
+ message(recv, Msg, S);
-recv({_, #sctp_shutdown_event{assoc_id = A}},
- #transport{assoc_id = Id})
- when A == Id;
- A == 0 ->
+recv({_, #sctp_shutdown_event{}}, _) ->
stop;
%% Note that diameter_sctp(3) documents that sctp_events cannot be
@@ -765,6 +867,23 @@ connect(Sock, [Addr | AT] = As, Port, Reasons) ->
connect(Sock, AT, Port, [{Addr, E} | Reasons])
end.
+%% setopts/2
+
+setopts(_, #transport{socket = Sock,
+ active = A,
+ recv = B}
+ = S)
+ when B, not A ->
+ setopts(Sock),
+ S#transport{active = true};
+
+setopts(_, #transport{} = S) ->
+ S;
+
+setopts(#transport{socket = Sock}, T) ->
+ setopts(Sock),
+ T.
+
%% setopts/1
setopts(Sock) ->
@@ -772,3 +891,83 @@ setopts(Sock) ->
ok -> ok;
X -> x({setopts, Sock, X}) %% possibly on peer disconnect
end.
+
+%% A message_cb is invoked whenever a message is sent or received, or
+%% to provide acknowledgement of a completed send or discarded
+%% request. See diameter_tcp for semantics, the only difference being
+%% that a recv callback can get a diameter_packet record as Msg
+%% depending on how/if option packet has been specified.
+
+%% message/3
+
+message(send, false = M, S) ->
+ message(ack, M, S);
+
+message(ack, _, #transport{message_cb = false} = S) ->
+ S;
+
+message(Dir, Msg, S) ->
+ setopts(S, actions(cb(S, Dir, Msg), Dir, S)).
+
+%% actions/3
+
+actions([], _, S) ->
+ S;
+
+actions([B | As], Dir, S)
+ when is_boolean(B) ->
+ actions(As, Dir, S#transport{recv = B});
+
+actions([Dir | As], _, S)
+ when Dir == send;
+ Dir == recv ->
+ actions(As, Dir, S);
+
+actions([Msg | As], send = Dir, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ actions(As, Dir, send(Msg, S));
+
+actions([Msg | As], recv = Dir, #transport{parent = Pid} = S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ diameter_peer:recv(Pid, Msg),
+ actions(As, Dir, S);
+
+actions([{defer, Tmo, Acts} | As], Dir, S) ->
+ erlang:send_after(Tmo, self(), {actions, Dir, Acts}),
+ actions(As, Dir, S);
+
+actions(CB, _, S) ->
+ S#transport{message_cb = CB}.
+
+%% cb/3
+
+cb(#transport{message_cb = false, packet = P}, recv, Msg) ->
+ [pkt(P, true, Msg)];
+
+cb(#transport{message_cb = CB, packet = P}, recv = D, Msg) ->
+ cb(CB, D, pkt(P, false, Msg));
+
+cb(#transport{message_cb = CB}, Dir, Msg) ->
+ cb(CB, Dir, Msg);
+
+cb(false, send, Msg) ->
+ [Msg];
+
+cb(CB, Dir, Msg) ->
+ diameter_lib:eval([CB, Dir, Msg]).
+
+%% pkt/3
+
+pkt(false, _, {_Info, Bin}) ->
+ Bin;
+
+pkt(true, _, {[#sctp_sndrcvinfo{stream = Id}], Bin}) ->
+ #diameter_packet{bin = Bin, transport_data = {stream, Id}};
+
+pkt(raw, true, {[Info], Bin}) ->
+ #diameter_packet{bin = Bin, transport_data = Info};
+
+pkt(raw, false, {[_], _} = Msg) ->
+ Msg.