aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-06-11 01:13:36 +0200
committerAnders Svensson <[email protected]>2017-06-12 16:13:03 +0200
commit373cd07c28bbe3e299eaca1c96b1441623ad4979 (patch)
tree726877cfda5687f446d66e62816d0a4180753e02
parent84bfb4980a5d6dd806cff07c8dc1c9f2ef85fc20 (diff)
downloadotp-373cd07c28bbe3e299eaca1c96b1441623ad4979.tar.gz
otp-373cd07c28bbe3e299eaca1c96b1441623ad4979.tar.bz2
otp-373cd07c28bbe3e299eaca1c96b1441623ad4979.zip
Add diameter_sctp send/recv callbacks
Corresponding to diameter_tcp callbacks a few commits back. Exercise the callbacks in the traffic suite.
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl183
-rw-r--r--lib/diameter/test/diameter_traffic_SUITE.erl12
2 files changed, 144 insertions, 51 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 3919596cb1..470a6aa9bf 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -59,9 +59,6 @@
%% The default port for a listener.
-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
-%% Remote addresses to accept connections from.
--define(DEFAULT_ACCEPT, []). %% any
-
%% How long to wait for a transport process to attach after
%% association establishment.
-define(ACCEPT_TIMEOUT, 5000).
@@ -80,7 +77,8 @@
| term(). %% gen_sctp:open_option().
-type option() :: {sender, boolean()}
- | sender.
+ | sender
+ | {message_cb, false | diameter:evaluable()}.
-type uint() :: non_neg_integer().
@@ -93,6 +91,8 @@
%% {RAs, RP, Errors}
| connect,
socket :: gen_sctp:sctp_socket() | undefined,
+ active = false :: boolean(), %% is socket active?
+ recv = true :: boolean(), %% should it be active?
assoc_id :: gen_sctp:assoc_id() %% association identifier
| undefined
| true,
@@ -101,11 +101,13 @@
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
os = 0 :: uint(), %% next output stream
+ message_cb :: false | diameter:evaluable(),
send = false :: pid() | boolean()}). %% sending process
%% Monitor process state.
-record(monitor,
{transport :: pid(),
+ ack = false :: boolean(),
socket :: gen_sctp:sctp_socket(),
assoc_id :: gen_sctp:assoc_id()}). %% next output stream
@@ -115,8 +117,7 @@
socket :: gen_sctp:sctp_socket(),
service :: pid(), %% service process
pending = {0, queue:new()},
- accept :: [match()],
- sender :: boolean()}).
+ opts :: [[match()] | boolean() | diameter:evaluable()]}).
%% Field pending implements two queues: the first of transport-to-be
%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
@@ -242,7 +243,7 @@ i(#monitor{transport = TPid} = S) ->
i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {Split, Rest} = proplists:split(Opts, [accept, sender]),
+ {Split, Rest} = proplists:split(Opts, [accept, sender, message_cb]),
OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
@@ -251,13 +252,17 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
#listener{ref = Ref,
service = SvcPid,
socket = Sock,
- accept = [[M] || {accept, M} <- OwnOpts],
- sender = proplists:get_value(sender, OwnOpts, false)};
+ opts = [[[M] || {accept, M} <- OwnOpts]
+ | [proplists:get_value(K, OwnOpts, false)
+ || K <- [sender, message_cb]]]};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
- {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]),
+ {[Ps | Split], Rest}
+ = proplists:split(Opts, [rport, raddr, sender, message_cb]),
OwnOpts = lists:append(Split),
+ CB = proplists:get_value(message_cb, OwnOpts, false),
+ false == CB orelse (Pid ! {diameter, ack}),
RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts],
[RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
{LAs, Sock} = open(Addrs, Rest, 0),
@@ -267,6 +272,7 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
#transport{parent = Pid,
mode = {connect, connect(Sock, RAs, RP, [])},
socket = Sock,
+ message_cb = CB,
send = proplists:get_value(sender, OwnOpts, false)};
%% An accepting transport spawned by diameter, not yet owning an
@@ -301,12 +307,15 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
receive
{Ref, Pid} when K == parent -> %% transport process started
S#transport{parent = Pid};
- {K, T, Matches, Bool} when K == peeloff -> %% association
+ {K, T, Opts} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
+ [Matches, Sender, CB] = Opts,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
+ false == CB orelse (S#transport.parent ! {diameter, ack}),
t(T, S#transport{socket = Sock,
- send = Bool});
+ message_cb = CB,
+ send = Sender});
accept_timeout = T ->
x(T);
{'DOWN', _, process, _, _} = T ->
@@ -477,12 +486,11 @@ getr(Key) ->
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
- accept = Matches,
- sender = Sender}
+ opts = Opts}
= S) ->
Id = assoc_id(Data),
{TPid, NewS} = accept(S),
- TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender},
+ TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Opts},
setopts(Sock),
NewS;
@@ -536,12 +544,21 @@ t(T,S) ->
%% Incoming message.
transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
- setopts(Sock),
- recv(Data, S);
+ setopts(S, recv(Data, S#transport{active = false}));
%% Outgoing message.
transition({diameter, {send, Msg}}, S) ->
- send(Msg, S);
+ message(send, Msg, S);
+
+%% Monitor has sent an outgoing message.
+transition(Msg, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ message(ack, Msg, S);
+
+%% Deferred actions from a message_cb.
+transition({actions, Dir, Acts}, S) ->
+ actions(Acts, Dir, S);
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
@@ -581,8 +598,12 @@ transition({resolve_port, Pid}, #transport{socket = Sock})
%% m/2
-m({Bin, StreamId}, #monitor{} = S) ->
- send(StreamId, Bin, S);
+m({Msg, StreamId}, #monitor{socket = Sock,
+ transport = TPid,
+ assoc_id = AId,
+ ack = B}) ->
+ send(Sock, AId, StreamId, Msg),
+ B andalso (TPid ! Msg);
m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) ->
x(T).
@@ -632,48 +653,47 @@ q(Ref, Pid, #listener{pending = {_,Q}}) ->
%% Start monitor process on first send.
send(Msg, #transport{send = true,
socket = Sock,
- assoc_id = AId}
+ assoc_id = AId,
+ message_cb = CB}
= S) ->
{ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(),
socket = Sock,
- assoc_id = AId}),
+ assoc_id = AId,
+ ack = false /= CB}),
monitor(process, MPid),
send(Msg, S#transport{send = MPid});
%% Outbound Diameter message on a specified stream ...
-send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
+send(#diameter_packet{transport_data = {outstream, SId}}
+ = Msg,
#transport{streams = {_, OS}}
= S) ->
- send(SId rem OS, Bin, S),
- S;
+ send(SId rem OS, Msg, S);
%% ... or not: rotate through all streams.
-send(#diameter_packet{bin = Bin}, S) ->
- send(Bin, S);
-send(Bin, #transport{streams = {_, OS},
+send(Msg, #transport{streams = {_, OS},
os = N}
- = S)
- when is_binary(Bin) ->
- send(N, Bin, S),
- S#transport{os = (N + 1) rem OS}.
+ = S) ->
+ send(N, Msg, S#transport{os = (N + 1) rem OS}).
%% send/3
-send(StreamId, Bin, #transport{send = false,
+send(StreamId, Msg, #transport{send = false,
socket = Sock,
- assoc_id = AId}) ->
- send(Sock, AId, StreamId, Bin);
-
-send(StreamId, Bin, #transport{send = MPid}) ->
- MPid ! {Bin, StreamId},
- MPid;
+ assoc_id = AId}
+ = S) ->
+ send(Sock, AId, StreamId, Msg),
+ message(ack, Msg, S);
-send(StreamId, Bin, #monitor{socket = Sock,
- assoc_id = AId}) ->
- send(Sock, AId, StreamId, Bin).
+send(StreamId, Msg, #transport{send = MPid} = S) ->
+ MPid ! {Msg, StreamId},
+ S.
%% send/4
+send(Sock, AssocId, StreamId, #diameter_packet{bin = Bin}) ->
+ send(Sock, AssocId, StreamId, Bin);
+
send(Sock, AssocId, StreamId, Bin) ->
case gen_sctp:send(Sock, AssocId, StreamId, Bin) of
ok ->
@@ -720,11 +740,10 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin}
recv(T, S#transport{assoc_id = Id});
%% Inbound Diameter message.
-recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid} = S)
+recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, S)
when is_binary(Bin) ->
- diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id},
- bin = Bin}),
- S;
+ Pkt = #diameter_packet{bin = Bin, transport_data = {stream, Id}},
+ message(recv, Pkt, S);
recv({_, #sctp_shutdown_event{}}, _) ->
stop;
@@ -842,6 +861,23 @@ connect(Sock, [Addr | AT] = As, Port, Reasons) ->
connect(Sock, AT, Port, [{Addr, E} | Reasons])
end.
+%% setopts/2
+
+setopts(_, #transport{socket = Sock,
+ active = A,
+ recv = B}
+ = S)
+ when B, not A ->
+ setopts(Sock),
+ S#transport{active = true};
+
+setopts(_, #transport{} = S) ->
+ S;
+
+setopts(#transport{socket = Sock}, T) ->
+ setopts(Sock),
+ T.
+
%% setopts/1
setopts(Sock) ->
@@ -849,3 +885,58 @@ setopts(Sock) ->
ok -> ok;
X -> x({setopts, Sock, X}) %% possibly on peer disconnect
end.
+
+%% A message_cb is invoked whenever a message is sent or received, or
+%% to provide acknowledgement of a completed send or discarded
+%% request. See diameter_tcp for semantics.
+
+%% message/3
+
+message(send, false = M, S) ->
+ message(ack, M, S);
+
+message(ack, _, #transport{message_cb = false} = S) ->
+ S;
+
+message(Dir, Msg, #transport{message_cb = CB} = S) ->
+ setopts(S, actions(cb(CB, Dir, Msg), Dir, S)).
+
+%% actions/3
+
+actions([], _, S) ->
+ S;
+
+actions([B | As], Dir, S)
+ when is_boolean(B) ->
+ actions(As, Dir, S#transport{recv = B});
+
+actions([Dir | As], _, S)
+ when Dir == send;
+ Dir == recv ->
+ actions(As, Dir, S);
+
+actions([Msg | As], send = Dir, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ actions(As, Dir, send(Msg, S));
+
+actions([Msg | As], recv = Dir, #transport{parent = Pid} = S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ diameter_peer:recv(Pid, Msg),
+ actions(As, Dir, S);
+
+actions([{defer, Tmo, Acts} | As], Dir, S) ->
+ erlang:send_after(Tmo, self(), {actions, Dir, Acts}),
+ actions(As, Dir, S);
+
+actions(CB, _, S) ->
+ S#transport{message_cb = CB}.
+
+%% cb/3
+
+cb(false, _, Msg) ->
+ [Msg];
+
+cb(CB, Dir, Msg) ->
+ diameter_lib:eval([CB, Dir, Msg]).
diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl
index bb10638cd2..95339127d4 100644
--- a/lib/diameter/test/diameter_traffic_SUITE.erl
+++ b/lib/diameter/test/diameter_traffic_SUITE.erl
@@ -108,7 +108,7 @@
handle_error/6,
handle_request/3]).
-%% diameter_tcp callbacks
+%% diameter_{tcp,sctp} callbacks
-export([message/3]).
-include("diameter.hrl").
@@ -158,7 +158,7 @@
%% Send from a dedicated process?
-define(SENDERS, [true, false]).
-%% Message callbacks from diameter_tcp?
+%% Message callbacks from diameter_{tcp,sctp}?
-define(CALLBACKS, [true, false]).
-record(group,
@@ -465,9 +465,8 @@ add_transports(Config) ->
= group(Config),
LRef = ?util:listen(SN,
[T,
- {sender, SS}
- | [{message_cb, {?MODULE, message, [4]}}
- || ST andalso T == tcp]],
+ {sender, SS},
+ {message_cb, ST andalso {?MODULE, message, [4]}}],
[{capabilities_cb, fun capx/2},
{pool_size, 8},
{spawn_opt, [{min_heap_size, 8096}]},
@@ -1510,6 +1509,9 @@ request(#diameter_base_RAR{}, _Caps) ->
%% Limit the number of messages received. More can be received if read
%% in the same packet.
+message(Dir, #diameter_packet{bin = Bin}, N) ->
+ message(Dir, Bin, N);
+
%% incoming request
message(recv, <<_:32, 1, _/bits>> = Bin, N) ->
[Bin, 1 < N, fun ?MODULE:message/3, N-1];