From c591056bb2ce9147cc946e068e980050be67dcc1 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 12 Jun 2017 15:26:45 +0200 Subject: Add diameter_sctp option packet To determine the wrapping of messages passed to recv callbacks and into diameter. The default passing of the input stream in transport_data is probably of no practical use, but has been set since time immemorial. --- lib/diameter/src/transport/diameter_sctp.erl | 53 ++++++++++++++++++++++------ lib/diameter/test/diameter_traffic_SUITE.erl | 6 +++- 2 files changed, 47 insertions(+), 12 deletions(-) (limited to 'lib') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 470a6aa9bf..e47febaf99 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -78,6 +78,7 @@ -type option() :: {sender, boolean()} | sender + | {packet, boolean() | raw} | {message_cb, false | diameter:evaluable()}. -type uint() :: non_neg_integer(). @@ -101,6 +102,8 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream + packet = true :: boolean() %% legacy transport_data? + | raw, message_cb :: false | diameter:evaluable(), send = false :: pid() | boolean()}). %% sending process @@ -243,7 +246,8 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {Split, Rest} = proplists:split(Opts, [accept, sender, message_cb]), + {Split, Rest} + = proplists:split(Opts, [accept, packet, sender, message_cb]), OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), @@ -252,14 +256,15 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> #listener{ref = Ref, service = SvcPid, socket = Sock, - opts = [[[M] || {accept, M} <- OwnOpts] + opts = [[[M] || {accept, M} <- OwnOpts], + proplists:get_value(packet, OwnOpts, true) | [proplists:get_value(K, OwnOpts, false) || K <- [sender, message_cb]]]}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> {[Ps | Split], Rest} - = proplists:split(Opts, [rport, raddr, sender, message_cb]), + = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]), OwnOpts = lists:append(Split), CB = proplists:get_value(message_cb, OwnOpts, false), false == CB orelse (Pid ! {diameter, ack}), @@ -273,6 +278,7 @@ i({connect, Pid, Opts, Addrs, Ref}) -> mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock, message_cb = CB, + packet = proplists:get_value(packet, OwnOpts, true), send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an @@ -309,12 +315,13 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> S#transport{parent = Pid}; {K, T, Opts} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, - [Matches, Sender, CB] = Opts, + [Matches, Packet, Sender, CB] = Opts, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), false == CB orelse (S#transport.parent ! {diameter, ack}), t(T, S#transport{socket = Sock, message_cb = CB, + packet = Packet, send = Sender}); accept_timeout = T -> x(T); @@ -740,10 +747,9 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} recv(T, S#transport{assoc_id = Id}); %% Inbound Diameter message. -recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, S) +recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S) when is_binary(Bin) -> - Pkt = #diameter_packet{bin = Bin, transport_data = {stream, Id}}, - message(recv, Pkt, S); + message(recv, Msg, S); recv({_, #sctp_shutdown_event{}}, _) -> stop; @@ -888,7 +894,9 @@ setopts(Sock) -> %% A message_cb is invoked whenever a message is sent or received, or %% to provide acknowledgement of a completed send or discarded -%% request. See diameter_tcp for semantics. +%% request. See diameter_tcp for semantics, the only difference being +%% that a recv callback can get a diameter_packet record as Msg +%% depending on how/if option packet has been specified. %% message/3 @@ -898,8 +906,8 @@ message(send, false = M, S) -> message(ack, _, #transport{message_cb = false} = S) -> S; -message(Dir, Msg, #transport{message_cb = CB} = S) -> - setopts(S, actions(cb(CB, Dir, Msg), Dir, S)). +message(Dir, Msg, S) -> + setopts(S, actions(cb(S, Dir, Msg), Dir, S)). %% actions/3 @@ -935,8 +943,31 @@ actions(CB, _, S) -> %% cb/3 -cb(false, _, Msg) -> +cb(#transport{message_cb = false, packet = P}, recv, Msg) -> + [pkt(P, true, Msg)]; + +cb(#transport{message_cb = CB, packet = P}, recv = D, Msg) -> + cb(CB, D, pkt(P, false, Msg)); + +cb(#transport{message_cb = CB}, Dir, Msg) -> + cb(CB, Dir, Msg); + +cb(false, send, Msg) -> [Msg]; cb(CB, Dir, Msg) -> diameter_lib:eval([CB, Dir, Msg]). + +%% pkt/3 + +pkt(false, _, {_Info, Bin}) -> + Bin; + +pkt(true, _, {[#sctp_sndrcvinfo{stream = Id}], Bin}) -> + #diameter_packet{bin = Bin, transport_data = {stream, Id}}; + +pkt(raw, true, {[Info], Bin}) -> + #diameter_packet{bin = Bin, transport_data = Info}; + +pkt(raw, false, {[_], _} = Msg) -> + Msg. diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index 95339127d4..f567a6f367 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -466,7 +466,9 @@ add_transports(Config) -> LRef = ?util:listen(SN, [T, {sender, SS}, - {message_cb, ST andalso {?MODULE, message, [4]}}], + {message_cb, ST andalso {?MODULE, message, [4]}} + | [{packet, hd(?util:scramble([false, raw]))} + || T == sctp andalso CS]], [{capabilities_cb, fun capx/2}, {pool_size, 8}, {spawn_opt, [{min_heap_size, 8096}]}, @@ -1509,6 +1511,8 @@ request(#diameter_base_RAR{}, _Caps) -> %% Limit the number of messages received. More can be received if read %% in the same packet. +message(recv = D, {[_], Bin}, N) -> + message(D, Bin, N); message(Dir, #diameter_packet{bin = Bin}, N) -> message(Dir, Bin, N); -- cgit v1.2.3