From 373cd07c28bbe3e299eaca1c96b1441623ad4979 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 11 Jun 2017 01:13:36 +0200 Subject: Add diameter_sctp send/recv callbacks Corresponding to diameter_tcp callbacks a few commits back. Exercise the callbacks in the traffic suite. --- lib/diameter/src/transport/diameter_sctp.erl | 183 ++++++++++++++++++++------- lib/diameter/test/diameter_traffic_SUITE.erl | 12 +- 2 files changed, 144 insertions(+), 51 deletions(-) (limited to 'lib') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 3919596cb1..470a6aa9bf 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -59,9 +59,6 @@ %% The default port for a listener. -define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 -%% Remote addresses to accept connections from. --define(DEFAULT_ACCEPT, []). %% any - %% How long to wait for a transport process to attach after %% association establishment. -define(ACCEPT_TIMEOUT, 5000). @@ -80,7 +77,8 @@ | term(). %% gen_sctp:open_option(). -type option() :: {sender, boolean()} - | sender. + | sender + | {message_cb, false | diameter:evaluable()}. -type uint() :: non_neg_integer(). @@ -93,6 +91,8 @@ %% {RAs, RP, Errors} | connect, socket :: gen_sctp:sctp_socket() | undefined, + active = false :: boolean(), %% is socket active? + recv = true :: boolean(), %% should it be active? assoc_id :: gen_sctp:assoc_id() %% association identifier | undefined | true, @@ -101,11 +101,13 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream + message_cb :: false | diameter:evaluable(), send = false :: pid() | boolean()}). %% sending process %% Monitor process state. -record(monitor, {transport :: pid(), + ack = false :: boolean(), socket :: gen_sctp:sctp_socket(), assoc_id :: gen_sctp:assoc_id()}). %% next output stream @@ -115,8 +117,7 @@ socket :: gen_sctp:sctp_socket(), service :: pid(), %% service process pending = {0, queue:new()}, - accept :: [match()], - sender :: boolean()}). + opts :: [[match()] | boolean() | diameter:evaluable()]}). %% Field pending implements two queues: the first of transport-to-be %% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived @@ -242,7 +243,7 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {Split, Rest} = proplists:split(Opts, [accept, sender]), + {Split, Rest} = proplists:split(Opts, [accept, sender, message_cb]), OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), @@ -251,13 +252,17 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> #listener{ref = Ref, service = SvcPid, socket = Sock, - accept = [[M] || {accept, M} <- OwnOpts], - sender = proplists:get_value(sender, OwnOpts, false)}; + opts = [[[M] || {accept, M} <- OwnOpts] + | [proplists:get_value(K, OwnOpts, false) + || K <- [sender, message_cb]]]}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> - {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]), + {[Ps | Split], Rest} + = proplists:split(Opts, [rport, raddr, sender, message_cb]), OwnOpts = lists:append(Split), + CB = proplists:get_value(message_cb, OwnOpts, false), + false == CB orelse (Pid ! {diameter, ack}), RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts], [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], {LAs, Sock} = open(Addrs, Rest, 0), @@ -267,6 +272,7 @@ i({connect, Pid, Opts, Addrs, Ref}) -> #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock, + message_cb = CB, send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an @@ -301,12 +307,15 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> receive {Ref, Pid} when K == parent -> %% transport process started S#transport{parent = Pid}; - {K, T, Matches, Bool} when K == peeloff -> %% association + {K, T, Opts} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, + [Matches, Sender, CB] = Opts, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), + false == CB orelse (S#transport.parent ! {diameter, ack}), t(T, S#transport{socket = Sock, - send = Bool}); + message_cb = CB, + send = Sender}); accept_timeout = T -> x(T); {'DOWN', _, process, _, _} = T -> @@ -477,12 +486,11 @@ getr(Key) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, - accept = Matches, - sender = Sender} + opts = Opts} = S) -> Id = assoc_id(Data), {TPid, NewS} = accept(S), - TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender}, + TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Opts}, setopts(Sock), NewS; @@ -536,12 +544,21 @@ t(T,S) -> %% Incoming message. transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> - setopts(Sock), - recv(Data, S); + setopts(S, recv(Data, S#transport{active = false})); %% Outgoing message. transition({diameter, {send, Msg}}, S) -> - send(Msg, S); + message(send, Msg, S); + +%% Monitor has sent an outgoing message. +transition(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + message(ack, Msg, S); + +%% Deferred actions from a message_cb. +transition({actions, Dir, Acts}, S) -> + actions(Acts, Dir, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> @@ -581,8 +598,12 @@ transition({resolve_port, Pid}, #transport{socket = Sock}) %% m/2 -m({Bin, StreamId}, #monitor{} = S) -> - send(StreamId, Bin, S); +m({Msg, StreamId}, #monitor{socket = Sock, + transport = TPid, + assoc_id = AId, + ack = B}) -> + send(Sock, AId, StreamId, Msg), + B andalso (TPid ! Msg); m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) -> x(T). @@ -632,48 +653,47 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% Start monitor process on first send. send(Msg, #transport{send = true, socket = Sock, - assoc_id = AId} + assoc_id = AId, + message_cb = CB} = S) -> {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(), socket = Sock, - assoc_id = AId}), + assoc_id = AId, + ack = false /= CB}), monitor(process, MPid), send(Msg, S#transport{send = MPid}); %% Outbound Diameter message on a specified stream ... -send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, +send(#diameter_packet{transport_data = {outstream, SId}} + = Msg, #transport{streams = {_, OS}} = S) -> - send(SId rem OS, Bin, S), - S; + send(SId rem OS, Msg, S); %% ... or not: rotate through all streams. -send(#diameter_packet{bin = Bin}, S) -> - send(Bin, S); -send(Bin, #transport{streams = {_, OS}, +send(Msg, #transport{streams = {_, OS}, os = N} - = S) - when is_binary(Bin) -> - send(N, Bin, S), - S#transport{os = (N + 1) rem OS}. + = S) -> + send(N, Msg, S#transport{os = (N + 1) rem OS}). %% send/3 -send(StreamId, Bin, #transport{send = false, +send(StreamId, Msg, #transport{send = false, socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin); - -send(StreamId, Bin, #transport{send = MPid}) -> - MPid ! {Bin, StreamId}, - MPid; + assoc_id = AId} + = S) -> + send(Sock, AId, StreamId, Msg), + message(ack, Msg, S); -send(StreamId, Bin, #monitor{socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin). +send(StreamId, Msg, #transport{send = MPid} = S) -> + MPid ! {Msg, StreamId}, + S. %% send/4 +send(Sock, AssocId, StreamId, #diameter_packet{bin = Bin}) -> + send(Sock, AssocId, StreamId, Bin); + send(Sock, AssocId, StreamId, Bin) -> case gen_sctp:send(Sock, AssocId, StreamId, Bin) of ok -> @@ -720,11 +740,10 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} recv(T, S#transport{assoc_id = Id}); %% Inbound Diameter message. -recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid} = S) +recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, S) when is_binary(Bin) -> - diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id}, - bin = Bin}), - S; + Pkt = #diameter_packet{bin = Bin, transport_data = {stream, Id}}, + message(recv, Pkt, S); recv({_, #sctp_shutdown_event{}}, _) -> stop; @@ -842,6 +861,23 @@ connect(Sock, [Addr | AT] = As, Port, Reasons) -> connect(Sock, AT, Port, [{Addr, E} | Reasons]) end. +%% setopts/2 + +setopts(_, #transport{socket = Sock, + active = A, + recv = B} + = S) + when B, not A -> + setopts(Sock), + S#transport{active = true}; + +setopts(_, #transport{} = S) -> + S; + +setopts(#transport{socket = Sock}, T) -> + setopts(Sock), + T. + %% setopts/1 setopts(Sock) -> @@ -849,3 +885,58 @@ setopts(Sock) -> ok -> ok; X -> x({setopts, Sock, X}) %% possibly on peer disconnect end. + +%% A message_cb is invoked whenever a message is sent or received, or +%% to provide acknowledgement of a completed send or discarded +%% request. See diameter_tcp for semantics. + +%% message/3 + +message(send, false = M, S) -> + message(ack, M, S); + +message(ack, _, #transport{message_cb = false} = S) -> + S; + +message(Dir, Msg, #transport{message_cb = CB} = S) -> + setopts(S, actions(cb(CB, Dir, Msg), Dir, S)). + +%% actions/3 + +actions([], _, S) -> + S; + +actions([B | As], Dir, S) + when is_boolean(B) -> + actions(As, Dir, S#transport{recv = B}); + +actions([Dir | As], _, S) + when Dir == send; + Dir == recv -> + actions(As, Dir, S); + +actions([Msg | As], send = Dir, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + actions(As, Dir, send(Msg, S)); + +actions([Msg | As], recv = Dir, #transport{parent = Pid} = S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + diameter_peer:recv(Pid, Msg), + actions(As, Dir, S); + +actions([{defer, Tmo, Acts} | As], Dir, S) -> + erlang:send_after(Tmo, self(), {actions, Dir, Acts}), + actions(As, Dir, S); + +actions(CB, _, S) -> + S#transport{message_cb = CB}. + +%% cb/3 + +cb(false, _, Msg) -> + [Msg]; + +cb(CB, Dir, Msg) -> + diameter_lib:eval([CB, Dir, Msg]). diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index bb10638cd2..95339127d4 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -108,7 +108,7 @@ handle_error/6, handle_request/3]). -%% diameter_tcp callbacks +%% diameter_{tcp,sctp} callbacks -export([message/3]). -include("diameter.hrl"). @@ -158,7 +158,7 @@ %% Send from a dedicated process? -define(SENDERS, [true, false]). -%% Message callbacks from diameter_tcp? +%% Message callbacks from diameter_{tcp,sctp}? -define(CALLBACKS, [true, false]). -record(group, @@ -465,9 +465,8 @@ add_transports(Config) -> = group(Config), LRef = ?util:listen(SN, [T, - {sender, SS} - | [{message_cb, {?MODULE, message, [4]}} - || ST andalso T == tcp]], + {sender, SS}, + {message_cb, ST andalso {?MODULE, message, [4]}}], [{capabilities_cb, fun capx/2}, {pool_size, 8}, {spawn_opt, [{min_heap_size, 8096}]}, @@ -1510,6 +1509,9 @@ request(#diameter_base_RAR{}, _Caps) -> %% Limit the number of messages received. More can be received if read %% in the same packet. +message(Dir, #diameter_packet{bin = Bin}, N) -> + message(Dir, Bin, N); + %% incoming request message(recv, <<_:32, 1, _/bits>> = Bin, N) -> [Bin, 1 < N, fun ?MODULE:message/3, N-1]; -- cgit v1.2.3