aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl53
-rw-r--r--lib/diameter/test/diameter_traffic_SUITE.erl6
2 files changed, 47 insertions, 12 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 470a6aa9bf..e47febaf99 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -78,6 +78,7 @@
-type option() :: {sender, boolean()}
| sender
+ | {packet, boolean() | raw}
| {message_cb, false | diameter:evaluable()}.
-type uint() :: non_neg_integer().
@@ -101,6 +102,8 @@
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
os = 0 :: uint(), %% next output stream
+ packet = true :: boolean() %% legacy transport_data?
+ | raw,
message_cb :: false | diameter:evaluable(),
send = false :: pid() | boolean()}). %% sending process
@@ -243,7 +246,8 @@ i(#monitor{transport = TPid} = S) ->
i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {Split, Rest} = proplists:split(Opts, [accept, sender, message_cb]),
+ {Split, Rest}
+ = proplists:split(Opts, [accept, packet, sender, message_cb]),
OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
@@ -252,14 +256,15 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
#listener{ref = Ref,
service = SvcPid,
socket = Sock,
- opts = [[[M] || {accept, M} <- OwnOpts]
+ opts = [[[M] || {accept, M} <- OwnOpts],
+ proplists:get_value(packet, OwnOpts, true)
| [proplists:get_value(K, OwnOpts, false)
|| K <- [sender, message_cb]]]};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
{[Ps | Split], Rest}
- = proplists:split(Opts, [rport, raddr, sender, message_cb]),
+ = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]),
OwnOpts = lists:append(Split),
CB = proplists:get_value(message_cb, OwnOpts, false),
false == CB orelse (Pid ! {diameter, ack}),
@@ -273,6 +278,7 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
mode = {connect, connect(Sock, RAs, RP, [])},
socket = Sock,
message_cb = CB,
+ packet = proplists:get_value(packet, OwnOpts, true),
send = proplists:get_value(sender, OwnOpts, false)};
%% An accepting transport spawned by diameter, not yet owning an
@@ -309,12 +315,13 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
S#transport{parent = Pid};
{K, T, Opts} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
- [Matches, Sender, CB] = Opts,
+ [Matches, Packet, Sender, CB] = Opts,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
false == CB orelse (S#transport.parent ! {diameter, ack}),
t(T, S#transport{socket = Sock,
message_cb = CB,
+ packet = Packet,
send = Sender});
accept_timeout = T ->
x(T);
@@ -740,10 +747,9 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin}
recv(T, S#transport{assoc_id = Id});
%% Inbound Diameter message.
-recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, S)
+recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S)
when is_binary(Bin) ->
- Pkt = #diameter_packet{bin = Bin, transport_data = {stream, Id}},
- message(recv, Pkt, S);
+ message(recv, Msg, S);
recv({_, #sctp_shutdown_event{}}, _) ->
stop;
@@ -888,7 +894,9 @@ setopts(Sock) ->
%% A message_cb is invoked whenever a message is sent or received, or
%% to provide acknowledgement of a completed send or discarded
-%% request. See diameter_tcp for semantics.
+%% request. See diameter_tcp for semantics, the only difference being
+%% that a recv callback can get a diameter_packet record as Msg
+%% depending on how/if option packet has been specified.
%% message/3
@@ -898,8 +906,8 @@ message(send, false = M, S) ->
message(ack, _, #transport{message_cb = false} = S) ->
S;
-message(Dir, Msg, #transport{message_cb = CB} = S) ->
- setopts(S, actions(cb(CB, Dir, Msg), Dir, S)).
+message(Dir, Msg, S) ->
+ setopts(S, actions(cb(S, Dir, Msg), Dir, S)).
%% actions/3
@@ -935,8 +943,31 @@ actions(CB, _, S) ->
%% cb/3
-cb(false, _, Msg) ->
+cb(#transport{message_cb = false, packet = P}, recv, Msg) ->
+ [pkt(P, true, Msg)];
+
+cb(#transport{message_cb = CB, packet = P}, recv = D, Msg) ->
+ cb(CB, D, pkt(P, false, Msg));
+
+cb(#transport{message_cb = CB}, Dir, Msg) ->
+ cb(CB, Dir, Msg);
+
+cb(false, send, Msg) ->
[Msg];
cb(CB, Dir, Msg) ->
diameter_lib:eval([CB, Dir, Msg]).
+
+%% pkt/3
+
+pkt(false, _, {_Info, Bin}) ->
+ Bin;
+
+pkt(true, _, {[#sctp_sndrcvinfo{stream = Id}], Bin}) ->
+ #diameter_packet{bin = Bin, transport_data = {stream, Id}};
+
+pkt(raw, true, {[Info], Bin}) ->
+ #diameter_packet{bin = Bin, transport_data = Info};
+
+pkt(raw, false, {[_], _} = Msg) ->
+ Msg.
diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl
index 95339127d4..f567a6f367 100644
--- a/lib/diameter/test/diameter_traffic_SUITE.erl
+++ b/lib/diameter/test/diameter_traffic_SUITE.erl
@@ -466,7 +466,9 @@ add_transports(Config) ->
LRef = ?util:listen(SN,
[T,
{sender, SS},
- {message_cb, ST andalso {?MODULE, message, [4]}}],
+ {message_cb, ST andalso {?MODULE, message, [4]}}
+ | [{packet, hd(?util:scramble([false, raw]))}
+ || T == sctp andalso CS]],
[{capabilities_cb, fun capx/2},
{pool_size, 8},
{spawn_opt, [{min_heap_size, 8096}]},
@@ -1509,6 +1511,8 @@ request(#diameter_base_RAR{}, _Caps) ->
%% Limit the number of messages received. More can be received if read
%% in the same packet.
+message(recv = D, {[_], Bin}, N) ->
+ message(D, Bin, N);
message(Dir, #diameter_packet{bin = Bin}, N) ->
message(Dir, Bin, N);