From 9ff8491996381cb2297671b94b7282a7ffb2136f Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 9 Feb 2017 17:18:21 +0100 Subject: Don't send from receiving transport processes Both diameter_tcp and diameter_sctp are susceptible to deadlock since a peer that blocks send also prevents additional messages from being received. Send from a process that's paired with the transport process to avoid this. Use the existing monitor process in the TCP case, add one in the SCTP case. This has been the reason for many sporadic testcase failures, mostly in diameter_traffic_SUITE. --- lib/diameter/src/transport/diameter_sctp.erl | 76 +++++++++++++++++++++++----- 1 file changed, 64 insertions(+), 12 deletions(-) (limited to 'lib/diameter/src/transport/diameter_sctp.erl') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index f48e4347ee..f9feb68874 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -52,6 +52,7 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). @@ -92,7 +93,14 @@ | undefined, streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, - os = 0 :: uint()}). %% next output stream + os = 0 :: uint(), %% next output stream + monitor :: pid() | undefined}). %% sending process + +%% Monitor process state. +-record(monitor, + {transport :: pid(), + socket :: gen_sctp:sctp_socket(), + assoc_id :: gen_sctp:assoc_id()}). %% next output stream %% Listener process state. -record(listener, @@ -216,6 +224,12 @@ init(T) -> %% i/1 +i(#monitor{transport = TPid} = S) -> + monitor(process, TPid), + putr(?TRANSPORT_KEY, TPid), + proc_lib:init_ack({ok, self()}), + S; + %% A process owning a listening socket. i({listen, Ref, {Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence @@ -382,6 +396,10 @@ handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) -> S end); +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; + handle_call(_, _, State) -> {reply, nok, State}. @@ -400,7 +418,11 @@ handle_info(T, #transport{} = S) -> {noreply, #transport{} = t(T,S)}; handle_info(T, #listener{} = S) -> - {noreply, #listener{} = l(T,S)}. + {noreply, #listener{} = l(T,S)}; + +handle_info(T, #monitor{} = S) -> + m(T,S), + {noreply, S}. %% Prior to the possiblity of setting pool_size on in transport %% configuration, a new accepting transport was only started following @@ -422,6 +444,9 @@ code_change(_, State, _) -> %% # terminate/2 %% --------------------------------------------------------------------------- +terminate(_, #monitor{}) -> + ok; + terminate(_, #transport{assoc_id = undefined}) -> ok; @@ -522,8 +547,17 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> stop; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + monitor = MPid}) -> + undefined == MPid + orelse ok == (catch gen_server:call(MPid, {stop, Pid})) + orelse exit(MPid, kill), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> stop; %% Timeout after transport process has been started. @@ -536,6 +570,14 @@ transition({resolve_port, Pid}, #transport{socket = Sock}) Pid ! inet:port(Sock), ok. +%% m/2 + +m({Bin, StreamId}, #monitor{} = S) -> + send(StreamId, Bin, S); + +m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) -> + x(T). + %% Crash on anything unexpected. ok({ok, T}) -> @@ -578,6 +620,17 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% send/2 +%% Start monitor process on first send. +send(Msg, #transport{monitor = undefined, + socket = Sock, + assoc_id = AId} + = S) -> + {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(), + socket = Sock, + assoc_id = AId}), + monitor(process, MPid), + send(Msg, S#transport{monitor = MPid}); + %% Outbound Diameter message on a specified stream ... send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, #transport{streams = {_, OS}} @@ -597,14 +650,13 @@ send(Bin, #transport{streams = {_, OS}, %% send/3 -send(StreamId, Bin, #transport{socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin). - -%% send/4 +send(StreamId, Bin, #transport{monitor = MPid}) -> + MPid ! {Bin, StreamId}, + MPid; -send(Sock, AssocId, Stream, Bin) -> - case gen_sctp:send(Sock, AssocId, Stream, Bin) of +send(StreamId, Bin, #monitor{socket = Sock, + assoc_id = AId}) -> + case gen_sctp:send(Sock, AId, StreamId, Bin) of ok -> ok; {error, Reason} -> -- cgit v1.2.3 From 618acfe5dcb0aa3d8ec0101704cd8fc774ac6c90 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 10 Apr 2017 15:28:43 +0200 Subject: Deal with SCTP association id quirk on Solaris In particular, that the association id received in messages on a one-to-one socket after peeloff may be different from the id received on the listen socket at comm_up. This seems odd, since it's then not possible to send until the id is discovered by reception of an SCTP message containing it, but it's unclear if this is a bug or a feature, or if it's specific to certain platforms. Treat it as a feature in this commit, and get the association id as mentioned, an incoming CER being expected before anything is sent. Commit da3e5d67 has more history. --- lib/diameter/src/transport/diameter_sctp.erl | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) (limited to 'lib/diameter/src/transport/diameter_sctp.erl') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index f9feb68874..2059a0e029 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -88,7 +88,9 @@ %% {RAs, RP, Errors} | connect, socket :: gen_sctp:sctp_socket() | undefined, - assoc_id :: gen_sctp:assoc_id(), %% association identifier + assoc_id :: gen_sctp:assoc_id() %% association identifier + | undefined + | true, peer :: {[inet:ip_address()], uint()} %% {RAs, RP} | undefined, streams :: {uint(), uint()} %% {InStream, OutStream} counts @@ -676,7 +678,9 @@ recv({_, #sctp_assoc_change{state = comm_up, = S) -> Ref = getr(?REF_KEY), publish(T, Ref, Id, Sock), - up(S#transport{assoc_id = Id, + %% Deal with different association id after peeloff on Solaris by + %% taking the id from the first reception. + up(S#transport{assoc_id = T == accept orelse Id, streams = {IS, OS}}); %% ... or not: try the next address. @@ -691,16 +695,24 @@ recv({_, #sctp_assoc_change{} = E}, recv({_, #sctp_assoc_change{}}, _) -> stop; +%% First inbound on an accepting transport. +recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} + = T, + #transport{assoc_id = true} + = S) -> + recv(T, S#transport{assoc_id = Id}); + %% Inbound Diameter message. -recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid}) +recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid} = S) when is_binary(Bin) -> diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id}, bin = Bin}), - ok; + S; recv({_, #sctp_shutdown_event{assoc_id = A}}, #transport{assoc_id = Id}) - when A == Id; + when Id; + A == Id; A == 0 -> stop; -- cgit v1.2.3 From 9a90f20f365a90612ee5e5f7db14fd71b4875b84 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 24 Apr 2017 11:07:27 +0200 Subject: Deal with (another) SCTP association id quirk on Solaris Shutdown events have been seen to get a different association id. For example, first incoming message with association id = 0: + {trace_ts,<6421.268.0>,call, {diameter_sctp,handle_info, [{sctp,#Port<6421.2588>, {10,67,16,179}, 44159, {[{sctp_sndrcvinfo,0,0,[],0,0,0,269950872,269950872,0}], <<1,0,0,156,128,0,1,1,0,0,0,0,6,193,40,137,6,193,40,137,0,0, 1,8,64,0,0,30,67,45,49,51,52,50,49,55,52,52,49,46,101,114, 108,97,110,103,46,111,114,103,0,0,0,0,1,40,64,0,0,18,101, 114,108,97,110,103,46,111,114,103,0,0,0,0,1,1,64,0,0,14,0, 1,127,0,0,1,0,0,0,0,1,10,64,0,0,12,0,0,48,57,0,0,1,13,0,0, 0,20,79,84,80,47,100,105,97,109,101,116,101,114,0,0,1,22, 64,0,0,12,0,0,0,1,0,0,1,2,64,0,0,12,0,0,0,0,0,0,1,3,64,0,0, 12,0,0,0,3>>}}, {transport,<6421.252.0>,accept,#Port<6421.2588>,true,undefined, {32,32}, 0,undefined}]}, {1493,21505,577938}} Later, a shutdown event with association id 1536: + {trace_ts,<6421.268.0>,call, {diameter_sctp,handle_info, [{sctp,#Port<6421.2588>, {10,67,16,179}, 44159, {[],{sctp_shutdown_event,1536}}}, {transport,<6421.252.0>,accept,#Port<6421.2588>,0,undefined, {32,32}, 2,<6421.304.0>}]}, {1493,21505,746929}} Both this and the grandparent commit are on this: $ uname -a SunOS beren 5.10 Generic_118833-33 sun4v sparc SUNW,Netra-T2000 --- lib/diameter/src/transport/diameter_sctp.erl | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) (limited to 'lib/diameter/src/transport/diameter_sctp.erl') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 2059a0e029..1e64804666 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -709,11 +709,7 @@ recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid} = S) bin = Bin}), S; -recv({_, #sctp_shutdown_event{assoc_id = A}}, - #transport{assoc_id = Id}) - when Id; - A == Id; - A == 0 -> +recv({_, #sctp_shutdown_event{}}, _) -> stop; %% Note that diameter_sctp(3) documents that sctp_events cannot be -- cgit v1.2.3 From 034089ed1ba3f78c732edcfc84d85a6ed4a4854e Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 10 Jun 2017 06:34:44 +0200 Subject: Remove upgrade from diameter_sctp; tweak diameter_tcp to match Added in commit 2afd1fe5. Only rename variables in diameter_tcp, no functional change. --- lib/diameter/src/transport/diameter_sctp.erl | 26 ++++++++++---------------- 1 file changed, 10 insertions(+), 16 deletions(-) (limited to 'lib/diameter/src/transport/diameter_sctp.erl') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 1e64804666..d23e56b413 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -108,7 +108,7 @@ -record(listener, {ref :: reference(), socket :: gen_sctp:sctp_socket(), - service = false :: false | pid(), %% service process + service :: pid(), %% service process pending = {0, queue:new()}, accept :: [match()]}). %% Field pending implements two queues: the first of transport-to-be @@ -142,11 +142,11 @@ start(T, Svc, Opts) when is_list(Opts) -> #diameter_service{capabilities = Caps, - pid = SPid} + pid = Pid} = Svc, diameter_sctp_sup:start(), %% start supervisors on demand Addrs = Caps#diameter_caps.host_ip_address, - s(T, Addrs, SPid, lists:map(fun ip/1, Opts)). + s(T, Addrs, Pid, lists:map(fun ip/1, Opts)). ip({ifaddr, A}) -> {ip, A}; @@ -157,9 +157,9 @@ ip(T) -> %% when there is not yet an association to assign it, or at comm_up on %% a new association in which case the call retrieves a transport from %% the pending queue. -s({accept, Ref} = A, Addrs, SPid, Opts) -> - {ok, LPid, LAs} = listener(Ref, {Opts, Addrs}), - try gen_server:call(LPid, {A, self(), SPid}, infinity) of +s({accept, Ref} = A, Addrs, SvcPid, Opts) -> + {ok, LPid, LAs} = listener(Ref, {Opts, SvcPid, Addrs}), + try gen_server:call(LPid, {A, self()}, infinity) of {ok, TPid} -> {ok, TPid, LAs}; No -> @@ -172,7 +172,7 @@ s({accept, Ref} = A, Addrs, SPid, Opts) -> %% gen_sctp in order to be able to accept a new association only %% *after* an accepting transport has been spawned. -s({connect = C, Ref}, Addrs, _SPid, Opts) -> +s({connect = C, Ref}, Addrs, _SvcPid, Opts) -> diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}). %% start_link/1 @@ -233,7 +233,8 @@ i(#monitor{transport = TPid} = S) -> S; %% A process owning a listening socket. -i({listen, Ref, {Opts, Addrs}}) -> +i({listen, Ref, {Opts, SvcPid, Addrs}}) -> + monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence {[Matches], Rest} = proplists:split(Opts, [accept]), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), @@ -241,6 +242,7 @@ i({listen, Ref, {Opts, Addrs}}) -> true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), proc_lib:init_ack({ok, self(), LAs}), #listener{ref = Ref, + service = SvcPid, socket = Sock, accept = [[M] || {accept, M} <- Matches]}; @@ -390,14 +392,6 @@ handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref} = S) -> {TPid, NewS} = accept(Ref, Pid, S), {reply, {ok, TPid}, NewS}; -handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) -> - handle_call({T, Pid}, From, if not is_pid(P), is_pid(SPid) -> - monitor(process, SPid), - S#listener{service = SPid}; - true -> - S - end); - %% Transport is telling us of parent death. handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> {stop, {shutdown, Reason}, ok, S}; -- cgit v1.2.3 From eadf4efc7e264fe8dd30befb42a42a02cdef58f1 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 10 Jun 2017 23:40:53 +0200 Subject: Make diameter_{tcp,sctp} sender configurable With sends still from the receiving process by default, since changing the default behaviour may well have negative effects. A separate sender probably implies a greater need for some form of load regulation for one, since a blocking send would no longer imply that incoming messages are no longer recevied. Dealing with this could result in the same deadlock that the sending process intends to avoid, but the user should be in control over how/when incoming traffic is regulated. --- lib/diameter/src/transport/diameter_sctp.erl | 59 +++++++++++++++++++--------- 1 file changed, 41 insertions(+), 18 deletions(-) (limited to 'lib/diameter/src/transport/diameter_sctp.erl') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index d23e56b413..3919596cb1 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -68,6 +68,7 @@ -type connect_option() :: {raddr, inet:ip_address()} | {rport, inet:port_number()} + | option() | term(). %% gen_sctp:open_option(). -type match() :: inet:ip_address() @@ -75,8 +76,12 @@ | [match()]. -type listen_option() :: {accept, match()} + | option() | term(). %% gen_sctp:open_option(). +-type option() :: {sender, boolean()} + | sender. + -type uint() :: non_neg_integer(). %% Accepting/connecting transport process state. @@ -96,7 +101,7 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream - monitor :: pid() | undefined}). %% sending process + send = false :: pid() | boolean()}). %% sending process %% Monitor process state. -record(monitor, @@ -110,7 +115,8 @@ socket :: gen_sctp:sctp_socket(), service :: pid(), %% service process pending = {0, queue:new()}, - accept :: [match()]}). + accept :: [match()], + sender :: boolean()}). %% Field pending implements two queues: the first of transport-to-be %% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived @@ -236,7 +242,8 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {[Matches], Rest} = proplists:split(Opts, [accept]), + {Split, Rest} = proplists:split(Opts, [accept, sender]), + OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), @@ -244,12 +251,14 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> #listener{ref = Ref, service = SvcPid, socket = Sock, - accept = [[M] || {accept, M} <- Matches]}; + accept = [[M] || {accept, M} <- OwnOpts], + sender = proplists:get_value(sender, OwnOpts, false)}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> - {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]), - RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As], + {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]), + OwnOpts = lists:append(Split), + RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts], [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], {LAs, Sock} = open(Addrs, Rest, 0), putr(?REF_KEY, Ref), @@ -257,7 +266,8 @@ i({connect, Pid, Opts, Addrs, Ref}) -> monitor(process, Pid), #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, - socket = Sock}; + socket = Sock, + send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an %% association. @@ -291,11 +301,12 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> receive {Ref, Pid} when K == parent -> %% transport process started S#transport{parent = Pid}; - {K, T, Matches} when K == peeloff -> %% association + {K, T, Matches, Bool} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), - t(T, S#transport{socket = Sock}); + t(T, S#transport{socket = Sock, + send = Bool}); accept_timeout = T -> x(T); {'DOWN', _, process, _, _} = T -> @@ -466,11 +477,12 @@ getr(Key) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, - accept = Matches} + accept = Matches, + sender = Sender} = S) -> Id = assoc_id(Data), {TPid, NewS} = accept(S), - TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches}, + TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender}, setopts(Sock), NewS; @@ -546,14 +558,15 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> %% Parent process has died: call the monitor to not close the socket %% during an ongoing send, but don't let it take forever. transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, - monitor = MPid}) -> - undefined == MPid + send = MPid}) -> + is_boolean(MPid) orelse ok == (catch gen_server:call(MPid, {stop, Pid})) orelse exit(MPid, kill), stop; %% Monitor process has died. -transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop; %% Timeout after transport process has been started. @@ -617,7 +630,7 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% send/2 %% Start monitor process on first send. -send(Msg, #transport{monitor = undefined, +send(Msg, #transport{send = true, socket = Sock, assoc_id = AId} = S) -> @@ -625,7 +638,7 @@ send(Msg, #transport{monitor = undefined, socket = Sock, assoc_id = AId}), monitor(process, MPid), - send(Msg, S#transport{monitor = MPid}); + send(Msg, S#transport{send = MPid}); %% Outbound Diameter message on a specified stream ... send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, @@ -646,13 +659,23 @@ send(Bin, #transport{streams = {_, OS}, %% send/3 -send(StreamId, Bin, #transport{monitor = MPid}) -> +send(StreamId, Bin, #transport{send = false, + socket = Sock, + assoc_id = AId}) -> + send(Sock, AId, StreamId, Bin); + +send(StreamId, Bin, #transport{send = MPid}) -> MPid ! {Bin, StreamId}, MPid; send(StreamId, Bin, #monitor{socket = Sock, assoc_id = AId}) -> - case gen_sctp:send(Sock, AId, StreamId, Bin) of + send(Sock, AId, StreamId, Bin). + +%% send/4 + +send(Sock, AssocId, StreamId, Bin) -> + case gen_sctp:send(Sock, AssocId, StreamId, Bin) of ok -> ok; {error, Reason} -> -- cgit v1.2.3 From 373cd07c28bbe3e299eaca1c96b1441623ad4979 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 11 Jun 2017 01:13:36 +0200 Subject: Add diameter_sctp send/recv callbacks Corresponding to diameter_tcp callbacks a few commits back. Exercise the callbacks in the traffic suite. --- lib/diameter/src/transport/diameter_sctp.erl | 183 ++++++++++++++++++++------- 1 file changed, 137 insertions(+), 46 deletions(-) (limited to 'lib/diameter/src/transport/diameter_sctp.erl') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 3919596cb1..470a6aa9bf 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -59,9 +59,6 @@ %% The default port for a listener. -define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 -%% Remote addresses to accept connections from. --define(DEFAULT_ACCEPT, []). %% any - %% How long to wait for a transport process to attach after %% association establishment. -define(ACCEPT_TIMEOUT, 5000). @@ -80,7 +77,8 @@ | term(). %% gen_sctp:open_option(). -type option() :: {sender, boolean()} - | sender. + | sender + | {message_cb, false | diameter:evaluable()}. -type uint() :: non_neg_integer(). @@ -93,6 +91,8 @@ %% {RAs, RP, Errors} | connect, socket :: gen_sctp:sctp_socket() | undefined, + active = false :: boolean(), %% is socket active? + recv = true :: boolean(), %% should it be active? assoc_id :: gen_sctp:assoc_id() %% association identifier | undefined | true, @@ -101,11 +101,13 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream + message_cb :: false | diameter:evaluable(), send = false :: pid() | boolean()}). %% sending process %% Monitor process state. -record(monitor, {transport :: pid(), + ack = false :: boolean(), socket :: gen_sctp:sctp_socket(), assoc_id :: gen_sctp:assoc_id()}). %% next output stream @@ -115,8 +117,7 @@ socket :: gen_sctp:sctp_socket(), service :: pid(), %% service process pending = {0, queue:new()}, - accept :: [match()], - sender :: boolean()}). + opts :: [[match()] | boolean() | diameter:evaluable()]}). %% Field pending implements two queues: the first of transport-to-be %% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived @@ -242,7 +243,7 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {Split, Rest} = proplists:split(Opts, [accept, sender]), + {Split, Rest} = proplists:split(Opts, [accept, sender, message_cb]), OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), @@ -251,13 +252,17 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> #listener{ref = Ref, service = SvcPid, socket = Sock, - accept = [[M] || {accept, M} <- OwnOpts], - sender = proplists:get_value(sender, OwnOpts, false)}; + opts = [[[M] || {accept, M} <- OwnOpts] + | [proplists:get_value(K, OwnOpts, false) + || K <- [sender, message_cb]]]}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> - {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]), + {[Ps | Split], Rest} + = proplists:split(Opts, [rport, raddr, sender, message_cb]), OwnOpts = lists:append(Split), + CB = proplists:get_value(message_cb, OwnOpts, false), + false == CB orelse (Pid ! {diameter, ack}), RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts], [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], {LAs, Sock} = open(Addrs, Rest, 0), @@ -267,6 +272,7 @@ i({connect, Pid, Opts, Addrs, Ref}) -> #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock, + message_cb = CB, send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an @@ -301,12 +307,15 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> receive {Ref, Pid} when K == parent -> %% transport process started S#transport{parent = Pid}; - {K, T, Matches, Bool} when K == peeloff -> %% association + {K, T, Opts} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, + [Matches, Sender, CB] = Opts, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), + false == CB orelse (S#transport.parent ! {diameter, ack}), t(T, S#transport{socket = Sock, - send = Bool}); + message_cb = CB, + send = Sender}); accept_timeout = T -> x(T); {'DOWN', _, process, _, _} = T -> @@ -477,12 +486,11 @@ getr(Key) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, - accept = Matches, - sender = Sender} + opts = Opts} = S) -> Id = assoc_id(Data), {TPid, NewS} = accept(S), - TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender}, + TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Opts}, setopts(Sock), NewS; @@ -536,12 +544,21 @@ t(T,S) -> %% Incoming message. transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> - setopts(Sock), - recv(Data, S); + setopts(S, recv(Data, S#transport{active = false})); %% Outgoing message. transition({diameter, {send, Msg}}, S) -> - send(Msg, S); + message(send, Msg, S); + +%% Monitor has sent an outgoing message. +transition(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + message(ack, Msg, S); + +%% Deferred actions from a message_cb. +transition({actions, Dir, Acts}, S) -> + actions(Acts, Dir, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> @@ -581,8 +598,12 @@ transition({resolve_port, Pid}, #transport{socket = Sock}) %% m/2 -m({Bin, StreamId}, #monitor{} = S) -> - send(StreamId, Bin, S); +m({Msg, StreamId}, #monitor{socket = Sock, + transport = TPid, + assoc_id = AId, + ack = B}) -> + send(Sock, AId, StreamId, Msg), + B andalso (TPid ! Msg); m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) -> x(T). @@ -632,48 +653,47 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% Start monitor process on first send. send(Msg, #transport{send = true, socket = Sock, - assoc_id = AId} + assoc_id = AId, + message_cb = CB} = S) -> {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(), socket = Sock, - assoc_id = AId}), + assoc_id = AId, + ack = false /= CB}), monitor(process, MPid), send(Msg, S#transport{send = MPid}); %% Outbound Diameter message on a specified stream ... -send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, +send(#diameter_packet{transport_data = {outstream, SId}} + = Msg, #transport{streams = {_, OS}} = S) -> - send(SId rem OS, Bin, S), - S; + send(SId rem OS, Msg, S); %% ... or not: rotate through all streams. -send(#diameter_packet{bin = Bin}, S) -> - send(Bin, S); -send(Bin, #transport{streams = {_, OS}, +send(Msg, #transport{streams = {_, OS}, os = N} - = S) - when is_binary(Bin) -> - send(N, Bin, S), - S#transport{os = (N + 1) rem OS}. + = S) -> + send(N, Msg, S#transport{os = (N + 1) rem OS}). %% send/3 -send(StreamId, Bin, #transport{send = false, +send(StreamId, Msg, #transport{send = false, socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin); - -send(StreamId, Bin, #transport{send = MPid}) -> - MPid ! {Bin, StreamId}, - MPid; + assoc_id = AId} + = S) -> + send(Sock, AId, StreamId, Msg), + message(ack, Msg, S); -send(StreamId, Bin, #monitor{socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin). +send(StreamId, Msg, #transport{send = MPid} = S) -> + MPid ! {Msg, StreamId}, + S. %% send/4 +send(Sock, AssocId, StreamId, #diameter_packet{bin = Bin}) -> + send(Sock, AssocId, StreamId, Bin); + send(Sock, AssocId, StreamId, Bin) -> case gen_sctp:send(Sock, AssocId, StreamId, Bin) of ok -> @@ -720,11 +740,10 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} recv(T, S#transport{assoc_id = Id}); %% Inbound Diameter message. -recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid} = S) +recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, S) when is_binary(Bin) -> - diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id}, - bin = Bin}), - S; + Pkt = #diameter_packet{bin = Bin, transport_data = {stream, Id}}, + message(recv, Pkt, S); recv({_, #sctp_shutdown_event{}}, _) -> stop; @@ -842,6 +861,23 @@ connect(Sock, [Addr | AT] = As, Port, Reasons) -> connect(Sock, AT, Port, [{Addr, E} | Reasons]) end. +%% setopts/2 + +setopts(_, #transport{socket = Sock, + active = A, + recv = B} + = S) + when B, not A -> + setopts(Sock), + S#transport{active = true}; + +setopts(_, #transport{} = S) -> + S; + +setopts(#transport{socket = Sock}, T) -> + setopts(Sock), + T. + %% setopts/1 setopts(Sock) -> @@ -849,3 +885,58 @@ setopts(Sock) -> ok -> ok; X -> x({setopts, Sock, X}) %% possibly on peer disconnect end. + +%% A message_cb is invoked whenever a message is sent or received, or +%% to provide acknowledgement of a completed send or discarded +%% request. See diameter_tcp for semantics. + +%% message/3 + +message(send, false = M, S) -> + message(ack, M, S); + +message(ack, _, #transport{message_cb = false} = S) -> + S; + +message(Dir, Msg, #transport{message_cb = CB} = S) -> + setopts(S, actions(cb(CB, Dir, Msg), Dir, S)). + +%% actions/3 + +actions([], _, S) -> + S; + +actions([B | As], Dir, S) + when is_boolean(B) -> + actions(As, Dir, S#transport{recv = B}); + +actions([Dir | As], _, S) + when Dir == send; + Dir == recv -> + actions(As, Dir, S); + +actions([Msg | As], send = Dir, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + actions(As, Dir, send(Msg, S)); + +actions([Msg | As], recv = Dir, #transport{parent = Pid} = S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + diameter_peer:recv(Pid, Msg), + actions(As, Dir, S); + +actions([{defer, Tmo, Acts} | As], Dir, S) -> + erlang:send_after(Tmo, self(), {actions, Dir, Acts}), + actions(As, Dir, S); + +actions(CB, _, S) -> + S#transport{message_cb = CB}. + +%% cb/3 + +cb(false, _, Msg) -> + [Msg]; + +cb(CB, Dir, Msg) -> + diameter_lib:eval([CB, Dir, Msg]). -- cgit v1.2.3 From c591056bb2ce9147cc946e068e980050be67dcc1 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 12 Jun 2017 15:26:45 +0200 Subject: Add diameter_sctp option packet To determine the wrapping of messages passed to recv callbacks and into diameter. The default passing of the input stream in transport_data is probably of no practical use, but has been set since time immemorial. --- lib/diameter/src/transport/diameter_sctp.erl | 53 ++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 11 deletions(-) (limited to 'lib/diameter/src/transport/diameter_sctp.erl') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 470a6aa9bf..e47febaf99 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -78,6 +78,7 @@ -type option() :: {sender, boolean()} | sender + | {packet, boolean() | raw} | {message_cb, false | diameter:evaluable()}. -type uint() :: non_neg_integer(). @@ -101,6 +102,8 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream + packet = true :: boolean() %% legacy transport_data? + | raw, message_cb :: false | diameter:evaluable(), send = false :: pid() | boolean()}). %% sending process @@ -243,7 +246,8 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {Split, Rest} = proplists:split(Opts, [accept, sender, message_cb]), + {Split, Rest} + = proplists:split(Opts, [accept, packet, sender, message_cb]), OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), @@ -252,14 +256,15 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> #listener{ref = Ref, service = SvcPid, socket = Sock, - opts = [[[M] || {accept, M} <- OwnOpts] + opts = [[[M] || {accept, M} <- OwnOpts], + proplists:get_value(packet, OwnOpts, true) | [proplists:get_value(K, OwnOpts, false) || K <- [sender, message_cb]]]}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> {[Ps | Split], Rest} - = proplists:split(Opts, [rport, raddr, sender, message_cb]), + = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]), OwnOpts = lists:append(Split), CB = proplists:get_value(message_cb, OwnOpts, false), false == CB orelse (Pid ! {diameter, ack}), @@ -273,6 +278,7 @@ i({connect, Pid, Opts, Addrs, Ref}) -> mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock, message_cb = CB, + packet = proplists:get_value(packet, OwnOpts, true), send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an @@ -309,12 +315,13 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> S#transport{parent = Pid}; {K, T, Opts} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, - [Matches, Sender, CB] = Opts, + [Matches, Packet, Sender, CB] = Opts, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), false == CB orelse (S#transport.parent ! {diameter, ack}), t(T, S#transport{socket = Sock, message_cb = CB, + packet = Packet, send = Sender}); accept_timeout = T -> x(T); @@ -740,10 +747,9 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} recv(T, S#transport{assoc_id = Id}); %% Inbound Diameter message. -recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, S) +recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S) when is_binary(Bin) -> - Pkt = #diameter_packet{bin = Bin, transport_data = {stream, Id}}, - message(recv, Pkt, S); + message(recv, Msg, S); recv({_, #sctp_shutdown_event{}}, _) -> stop; @@ -888,7 +894,9 @@ setopts(Sock) -> %% A message_cb is invoked whenever a message is sent or received, or %% to provide acknowledgement of a completed send or discarded -%% request. See diameter_tcp for semantics. +%% request. See diameter_tcp for semantics, the only difference being +%% that a recv callback can get a diameter_packet record as Msg +%% depending on how/if option packet has been specified. %% message/3 @@ -898,8 +906,8 @@ message(send, false = M, S) -> message(ack, _, #transport{message_cb = false} = S) -> S; -message(Dir, Msg, #transport{message_cb = CB} = S) -> - setopts(S, actions(cb(CB, Dir, Msg), Dir, S)). +message(Dir, Msg, S) -> + setopts(S, actions(cb(S, Dir, Msg), Dir, S)). %% actions/3 @@ -935,8 +943,31 @@ actions(CB, _, S) -> %% cb/3 -cb(false, _, Msg) -> +cb(#transport{message_cb = false, packet = P}, recv, Msg) -> + [pkt(P, true, Msg)]; + +cb(#transport{message_cb = CB, packet = P}, recv = D, Msg) -> + cb(CB, D, pkt(P, false, Msg)); + +cb(#transport{message_cb = CB}, Dir, Msg) -> + cb(CB, Dir, Msg); + +cb(false, send, Msg) -> [Msg]; cb(CB, Dir, Msg) -> diameter_lib:eval([CB, Dir, Msg]). + +%% pkt/3 + +pkt(false, _, {_Info, Bin}) -> + Bin; + +pkt(true, _, {[#sctp_sndrcvinfo{stream = Id}], Bin}) -> + #diameter_packet{bin = Bin, transport_data = {stream, Id}}; + +pkt(raw, true, {[Info], Bin}) -> + #diameter_packet{bin = Bin, transport_data = Info}; + +pkt(raw, false, {[_], _} = Msg) -> + Msg. -- cgit v1.2.3 From 19ad1805bfc895b12ea692e713cbfd252c44d147 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 12 Jun 2017 20:42:52 +0200 Subject: Fix dialyzer warnings diameter_sctp.erl:292: Record construction #transport{parent::pid(),mode::{'accept',atom() | pid() | port() | {atom(),atom()}},active::'false',recv::'true',os::0,packet::'true',message_cb::'undefined',send::'false'} violates the declared type of field message_cb::'false' | fun() | maybe_improper_list(fun() | maybe_improper_list(any(),[any()]) | {atom(),atom(),[any()]},[any()]) | {atom(),atom(),[any()]} diameter_sctp.erl:302: Record construction #transport{mode::{'accept',atom() | pid() | port() | {atom(),atom()}},active::'false',recv::'true',os::0,packet::'true',message_cb::'undefined',send::'false'} violates the declared type of field message_cb::'false' | fun() | maybe_improper_list(fun() | maybe_improper_list(any(),[any()]) | {atom(),atom(),[any()]},[any()]) | {atom(),atom(),[any()]} --- lib/diameter/src/transport/diameter_sctp.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/diameter/src/transport/diameter_sctp.erl') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index e47febaf99..f243f2b7cd 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -104,7 +104,7 @@ os = 0 :: uint(), %% next output stream packet = true :: boolean() %% legacy transport_data? | raw, - message_cb :: false | diameter:evaluable(), + message_cb = false :: false | diameter:evaluable(), send = false :: pid() | boolean()}). %% sending process %% Monitor process state. -- cgit v1.2.3