From 4bad839a0931d7a926118c49f20cfab6ebcfdb91 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 11 Jun 2017 16:27:21 +0200 Subject: Remove trailing whitespace From commits 5ca5fb71 and 58091992. --- lib/diameter/src/base/diameter_config.erl | 2 +- lib/diameter/src/base/diameter_reg.erl | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index 245a3ea7ac..386ae967f2 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -277,7 +277,7 @@ start_link() -> start_link(T) -> proc_lib:start_link(?MODULE, init, [T], infinity, []). - + state() -> call(state). diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 9027130063..4910979219 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -137,7 +137,7 @@ match(Pat) -> match(Pat, Pid) -> ets:match_object(?TABLE, {Pat, Pid}). - + %% =========================================================================== %% # wait(Pat) %% -- cgit v1.2.3 From 6b7ec6db6bd999a509e867aef438b3b8769cae43 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 15 Mar 2017 16:42:44 +0100 Subject: Fix broken discard acknowledgement A transport process can request acknowledgement of the fate of an incoming message to a specified pid, causing it to receive one of {diameter, {request|answer, pid()} | discard} depending on whether or not diameter passes the message off to a handler process. This was broken in commit a4da06a5 (since recv/3 threw a message that should be received), but is of little consequence since the interface isn't yet documented and is only used from diameter_tcp with configuration that will soon change. --- lib/diameter/src/base/diameter_watchdog.erl | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index f28b8f2910..4484b7ee2c 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -590,11 +590,9 @@ incoming(Name, Pkt, false, S) -> recv(Name, Pkt, S); incoming(Name, Pkt, NPid, S) -> - try - recv(Name, Pkt, S) - after - NPid ! {diameter, discard} - end. + NS = recv(Name, Pkt, S), + NPid ! {diameter, discard}, + NS. %% recv/3 -- cgit v1.2.3 From 2d74fa618f3a34a5487f5de37c4f6e2870b58273 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 15 Mar 2017 22:33:17 +0100 Subject: Fix handling of message length errors Message length errors in incoming messages were misinterpreted with transport_opt() {length_errors, exit} due to the throw introduced in commit 2ffb288: the corresponding catch in incoming/2 caught errors thrown by close/1, leading to failure when the error reason was interpreted as a diameter_packet record. Do away with the throw, that also caused woe in the parent commit. --- lib/diameter/src/base/diameter_peer_fsm.erl | 42 +++++++++++++---------------- 1 file changed, 19 insertions(+), 23 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 7ee1e5fe59..d394156367 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -444,7 +444,8 @@ transition({connection_timeout, _}, _) -> %% Incoming message from the transport. transition({diameter, {recv, MsgT}}, S) -> - incoming(MsgT, S); + {Msg, NPid} = msg(MsgT), + incoming(recv(Msg, S), NPid, S); %% Timeout when still in the same state ... transition({timeout = T, PS}, #state{state = PS}) -> @@ -609,31 +610,26 @@ encode(Rec, Dict) -> diameter_codec:encode(Dict, #diameter_packet{header = Hdr, msg = Rec}). -%% incoming/2 +%% incoming/3 -incoming({Msg, NPid}, S) -> - try recv(Msg, S) of - T -> - NPid ! {diameter, discard}, - T - catch - {?MODULE, Name, Pkt} -> - incoming(Name, Pkt, NPid, S) - end; +incoming({recv, Name, Pkt}, NPid, #state{parent = Pid} = S) -> + Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid}, + rcv(Name, Pkt, S); -incoming(Msg, S) -> - try - recv(Msg, S) - catch - {?MODULE, Name, Pkt} -> - incoming(Name, Pkt, false, S) - end. +incoming(T, false, _) -> + T; -%% incoming/4 +incoming(T, NPid, _) -> + NPid ! {diameter, discard}, + T. -incoming(Name, Pkt, NPid, #state{parent = Pid} = S) -> - Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid}, - rcv(Name, Pkt, S). +%% msg/1 + +msg({_,_} = T) -> + T; + +msg(Msg) -> + {Msg, false}. %% recv/2 @@ -701,7 +697,7 @@ recv1('DPA' = N, %% Any other message with a header and no length errors: send to the %% parent. recv1(Name, Pkt, #state{}) -> - throw({?MODULE, Name, Pkt}). + {recv, Name, Pkt}. %% recv/3 -- cgit v1.2.3 From eb69b55b1f2a490f87e5f3f976fcd1351b82eafb Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Fri, 10 Mar 2017 17:09:58 +0100 Subject: Remove clauses supporting old code Since smooth upgrade won't be supported in this branch. --- lib/diameter/src/transport/diameter_tcp.erl | 16 ++-------------- 1 file changed, 2 insertions(+), 14 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 44abc5c3b4..63642a8168 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -229,11 +229,6 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. -i({T, _Ref, _Mod, _Pid, _Opts, _Addrs} = Arg) %% from old code - when T == accept; - T == connect -> - i(erlang:append_element(Arg, _SPid = false)); - %% A monitor process to kill the transport if the parent dies. i(#monitor{parent = Pid, transport = TPid} = S) -> proc_lib:init_ack({ok, self()}), @@ -246,9 +241,6 @@ i(#monitor{parent = Pid, transport = TPid} = S) -> %% death. However, a link can be unlinked and this is exactly what %% gen_tcp seems to so. Links should be left to supervisors. -i({listen = L, Ref, _APid, T}) -> %% from old code - i({L, Ref, T}); - i({listen, Ref, {Mod, Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence {[LA, LP], Rest} = proplists:split(Opts, [ip, port]), @@ -535,11 +527,7 @@ l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid, %% Transport has been removed. l({transport, remove, _} = T, #listener{socket = Sock}) -> gen_tcp:close(Sock), - x(T); - -%% Possibly death of an accepting process monitored in old code. -l(_, S) -> - S. + x(T). %% t/2 %% -- cgit v1.2.3 From 69b0c1878a95bdfcfe9043fbccf8a0f7b4545bdc Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 11 Mar 2017 00:04:58 +0100 Subject: Fix gen_tcp close of ssl socket Should be ssl:close/1. --- lib/diameter/src/transport/diameter_tcp.erl | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 63642a8168..8e400bc6ee 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -72,6 +72,7 @@ %% Listener process state. -record(listener, {socket :: inet:socket(), + module :: module(), service = false :: false | pid()}). %% service process %% Monitor process state. @@ -250,7 +251,8 @@ i({listen, Ref, {Mod, Opts, Addrs}}) -> LAddr = laddr(LAddrOpt, Mod, LSock), true = diameter_reg:add_new({?MODULE, listener, {Ref, {LAddr, LSock}}}), proc_lib:init_ack({ok, self(), {LAddr, LSock}}), - #listener{socket = LSock}. + #listener{socket = LSock, + module = Mod}. laddr([], Mod, Sock) -> {ok, {Addr, _Port}} = sockname(Mod, Sock), @@ -520,13 +522,15 @@ m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, %% Service process has died. l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid, - socket = Sock}) -> - gen_tcp:close(Sock), + socket = Sock, + module = M}) -> + M:close(Sock), x(T); %% Transport has been removed. -l({transport, remove, _} = T, #listener{socket = Sock}) -> - gen_tcp:close(Sock), +l({transport, remove, _} = T, #listener{socket = Sock, + module = M}) -> + M:close(Sock), x(T). %% t/2 -- cgit v1.2.3 From 9ff8491996381cb2297671b94b7282a7ffb2136f Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Thu, 9 Feb 2017 17:18:21 +0100 Subject: Don't send from receiving transport processes Both diameter_tcp and diameter_sctp are susceptible to deadlock since a peer that blocks send also prevents additional messages from being received. Send from a process that's paired with the transport process to avoid this. Use the existing monitor process in the TCP case, add one in the SCTP case. This has been the reason for many sporadic testcase failures, mostly in diameter_traffic_SUITE. --- lib/diameter/src/transport/diameter_sctp.erl | 76 ++++++++++++--- lib/diameter/src/transport/diameter_sctp_sup.erl | 3 +- lib/diameter/src/transport/diameter_tcp.erl | 119 ++++++++++++++++------- 3 files changed, 149 insertions(+), 49 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index f48e4347ee..f9feb68874 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -52,6 +52,7 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). @@ -92,7 +93,14 @@ | undefined, streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, - os = 0 :: uint()}). %% next output stream + os = 0 :: uint(), %% next output stream + monitor :: pid() | undefined}). %% sending process + +%% Monitor process state. +-record(monitor, + {transport :: pid(), + socket :: gen_sctp:sctp_socket(), + assoc_id :: gen_sctp:assoc_id()}). %% next output stream %% Listener process state. -record(listener, @@ -216,6 +224,12 @@ init(T) -> %% i/1 +i(#monitor{transport = TPid} = S) -> + monitor(process, TPid), + putr(?TRANSPORT_KEY, TPid), + proc_lib:init_ack({ok, self()}), + S; + %% A process owning a listening socket. i({listen, Ref, {Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence @@ -382,6 +396,10 @@ handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) -> S end); +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; + handle_call(_, _, State) -> {reply, nok, State}. @@ -400,7 +418,11 @@ handle_info(T, #transport{} = S) -> {noreply, #transport{} = t(T,S)}; handle_info(T, #listener{} = S) -> - {noreply, #listener{} = l(T,S)}. + {noreply, #listener{} = l(T,S)}; + +handle_info(T, #monitor{} = S) -> + m(T,S), + {noreply, S}. %% Prior to the possiblity of setting pool_size on in transport %% configuration, a new accepting transport was only started following @@ -422,6 +444,9 @@ code_change(_, State, _) -> %% # terminate/2 %% --------------------------------------------------------------------------- +terminate(_, #monitor{}) -> + ok; + terminate(_, #transport{assoc_id = undefined}) -> ok; @@ -522,8 +547,17 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> stop; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + monitor = MPid}) -> + undefined == MPid + orelse ok == (catch gen_server:call(MPid, {stop, Pid})) + orelse exit(MPid, kill), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> stop; %% Timeout after transport process has been started. @@ -536,6 +570,14 @@ transition({resolve_port, Pid}, #transport{socket = Sock}) Pid ! inet:port(Sock), ok. +%% m/2 + +m({Bin, StreamId}, #monitor{} = S) -> + send(StreamId, Bin, S); + +m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) -> + x(T). + %% Crash on anything unexpected. ok({ok, T}) -> @@ -578,6 +620,17 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% send/2 +%% Start monitor process on first send. +send(Msg, #transport{monitor = undefined, + socket = Sock, + assoc_id = AId} + = S) -> + {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(), + socket = Sock, + assoc_id = AId}), + monitor(process, MPid), + send(Msg, S#transport{monitor = MPid}); + %% Outbound Diameter message on a specified stream ... send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, #transport{streams = {_, OS}} @@ -597,14 +650,13 @@ send(Bin, #transport{streams = {_, OS}, %% send/3 -send(StreamId, Bin, #transport{socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin). - -%% send/4 +send(StreamId, Bin, #transport{monitor = MPid}) -> + MPid ! {Bin, StreamId}, + MPid; -send(Sock, AssocId, Stream, Bin) -> - case gen_sctp:send(Sock, AssocId, Stream, Bin) of +send(StreamId, Bin, #monitor{socket = Sock, + assoc_id = AId}) -> + case gen_sctp:send(Sock, AId, StreamId, Bin) of ok -> ok; {error, Reason} -> diff --git a/lib/diameter/src/transport/diameter_sctp_sup.erl b/lib/diameter/src/transport/diameter_sctp_sup.erl index 36050aaf28..e8e26ec7c5 100644 --- a/lib/diameter/src/transport/diameter_sctp_sup.erl +++ b/lib/diameter/src/transport/diameter_sctp_sup.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ start() -> start_child(T) -> SupRef = case element(1,T) of + monitor -> ?TRANSPORT_SUP; connect -> ?TRANSPORT_SUP; accept -> ?TRANSPORT_SUP; listen -> ?LISTENER_SUP diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 8e400bc6ee..2a580753a0 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -53,6 +53,7 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). @@ -68,17 +69,22 @@ %% The same gen_server implementation supports three different kinds %% of processes: an actual transport process, one that will club it to %% death should the parent die before a connection is established, and -%% a process owning the listening port. +%% a process owning the listening port. The monitor process +%% historically died after connection establishment, but now lives on +%% as the sender of outgoing messages, so that a blocking send doesn't +%% prevent messages from being received. %% Listener process state. -record(listener, {socket :: inet:socket(), module :: module(), service = false :: false | pid()}). %% service process -%% Monitor process state. +%% Monitor process state. The name monitor predates its role as sender. -record(monitor, - {parent :: pid(), - transport = self() :: pid()}). + {parent :: reference() | false, + transport = self() :: pid(), + socket :: inet:socket() | ssl:sslsocket() | undefined, + module :: module() | undefined}). -type length() :: 0..16#FFFFFF. %% message length from Diameter header -type size() :: non_neg_integer(). %% accumulated binary size @@ -116,7 +122,8 @@ tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled :: boolean() | binary()}). %% stopped receiving? + throttled :: boolean() | binary(), %% stopped receiving? + monitor :: pid()}). %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -203,8 +210,8 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) T == connect -> monitor(process, Pid), %% Since accept/connect might block indefinitely, spawn a process - %% that does nothing but kill us with the parent until call - %% returns. + %% that kills us with the parent until call returns, and then + %% sends outgoing messages. {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, fragment_timer, @@ -217,8 +224,9 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), Throttle = proplists:get_value(throttle_cb, OwnOpts, false), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid), - MPid ! {stop, self()}, %% tell the monitor to die M = if SslOpts -> ssl; true -> Mod end, + monitor(process, MPid), + MPid ! {start, self(), Sock, M}, %% prepare monitor for sending putr(?REF_KEY, Ref), throttle(#transport{parent = Pid, module = M, @@ -226,21 +234,22 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) ssl = SslOpts, timeout = Tmo, throttle_cb = Throttle, - throttled = false /= Throttle}); + throttled = false /= Throttle, + monitor = MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. %% A monitor process to kill the transport if the parent dies. i(#monitor{parent = Pid, transport = TPid} = S) -> + putr(?TRANSPORT_KEY, TPid), proc_lib:init_ack({ok, self()}), - monitor(process, Pid), monitor(process, TPid), - S; + S#monitor{parent = monitor(process, Pid)}; %% In principle a link between the transport and killer processes %% could do the same thing: have the accepting/connecting process be %% killed when the killer process dies as a consequence of parent %% death. However, a link can be unlinked and this is exactly what -%% gen_tcp seems to so. Links should be left to supervisors. +%% gen_tcp seems to do. Links should be left to supervisors. i({listen, Ref, {Mod, Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence @@ -452,7 +461,11 @@ handle_call({accept, SPid}, _From, #listener{service = P} = S) -> true -> S end}; - + +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; + handle_call(_, _, State) -> {reply, nok, State}. @@ -474,8 +487,7 @@ handle_info(T, #listener{} = S) -> {noreply, #listener{} = l(T,S)}; handle_info(T, #monitor{} = S) -> - m(T,S), - x(T). + {noreply, #monitor{} = m(T,S)}. %% --------------------------------------------------------------------------- %% # code_change/3 @@ -491,6 +503,7 @@ code_change(_, State, _) -> terminate(_, _) -> ok. + %% --------------------------------------------------------------------------- putr(Key, Val) -> @@ -503,18 +516,38 @@ getr(Key) -> %% %% Transition monitor state. -%% Transport is telling us to die. -m({stop, TPid}, #monitor{transport = TPid}) -> - ok; +%% Outgoing message. +m(Bin, #monitor{} = S) + when is_binary(Bin) -> + send(Bin, S), + S; -%% Transport has died. -m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) -> - ok; +%% Transport is telling us to be ready to send. Stop monitoring on the +%% parent so as not to die before a send from the transport. +m({start, TPid, Sock, Mod}, #monitor{parent = MRef, + transport = TPid} + = S) -> + demonitor(MRef, [flush]), + S#monitor{parent = false, + socket = Sock, + module = Mod}; -%% Transport parent has died. -m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, - transport = TPid}) -> - exit(TPid, {shutdown, parent}). +%% Transport is telling us to die. +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); + +%% Transport is telling us that TLS has been negotiated after +%% capabilities exchange. +m({tls, SSock}, #monitor{} = S) -> + S#monitor{socket = SSock, + module = ssl}; + +%% Transport or parent has died. +m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef, + transport = TPid}) + when M == MRef; + P == TPid -> + x(T). %% l/2 %% @@ -589,8 +622,9 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, ?ERROR({T,S}); %% Outgoing message. -transition({diameter, {send, Bin}}, S) -> - send(Bin, S); +transition({diameter, {send, Bin}}, #transport{} = S) -> + send(Bin, S), + ok; %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -610,8 +644,16 @@ transition({resolve_port, Pid}, #transport{socket = Sock, Pid ! portnr(M, Sock), ok; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + monitor = MPid}) -> + ok == (catch gen_server:call(MPid, {stop, Pid})) + orelse exit(MPid, kill), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> stop. %% Crash on anything unexpected. @@ -635,11 +677,13 @@ tls_handshake(_, true, #transport{ssl = false}) -> %% Capabilities exchange negotiated TLS: upgrade the connection. tls_handshake(Type, true, #transport{socket = Sock, module = M, - ssl = Opts} + ssl = Opts, + monitor = MPid} = S) -> {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]), Ref = getr(?REF_KEY), true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}), + MPid ! {tls, SSock}, %% tell the monitor process S#transport{socket = SSock, module = ssl}; @@ -805,14 +849,20 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(Bin, #transport{socket = Sock, - module = M}) -> +send(Bin, #monitor{socket = Sock, + module = M}) -> case send(M, Sock, Bin) of ok -> ok; {error, Reason} -> x({send, Reason}) - end. + end; + +%% Send from the monitor process to avoid deadlock if both the +%% receiver and the peer were to block in send. +send(Bin, #transport{monitor = MPid}) -> + MPid ! Bin, + MPid. %% send/3 @@ -909,7 +959,6 @@ throttle({NPid, F}, #transport{throttled = Msg} = S) throttle(discard, #transport{throttled = Msg} = S) when is_binary(Msg) -> S; - throttle({discard = T, F}, #transport{throttled = Msg} = S) when is_binary(Msg) -> throttle(T, S#transport{throttle_cb = F}); @@ -920,7 +969,6 @@ throttle(Bin, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> send(Bin, S), S; - throttle({Bin, F}, #transport{throttled = Msg} = S) when is_binary(Bin), is_binary(Msg) -> throttle(Bin, S#transport{throttle_cb = F}); @@ -929,7 +977,6 @@ throttle({Bin, F}, #transport{throttled = Msg} = S) throttle({timeout, Tmo}, S) -> erlang:send_after(Tmo, self(), throttle), throw(S); - throttle({timeout = T, Tmo, F}, S) -> throttle({T, Tmo}, S#transport{throttle_cb = F}); -- cgit v1.2.3 From c9dd8410fb44034821c4068e1cc4df253b21f9f9 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 21 Mar 2017 16:38:28 +0100 Subject: Add missing dialyzer types Which dialyzer itself has never complained about. --- lib/diameter/src/transport/diameter_tcp.erl | 1 + 1 file changed, 1 insertion(+) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 2a580753a0..f745e2a6f7 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -104,6 +104,7 @@ -type listen_option() :: {accept, match()} | {ssl_options, true | [ssl:listen_option()]} + | option() | ssl:listen_option() | gen_tcp:listen_option(). -- cgit v1.2.3 From 618acfe5dcb0aa3d8ec0101704cd8fc774ac6c90 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 10 Apr 2017 15:28:43 +0200 Subject: Deal with SCTP association id quirk on Solaris In particular, that the association id received in messages on a one-to-one socket after peeloff may be different from the id received on the listen socket at comm_up. This seems odd, since it's then not possible to send until the id is discovered by reception of an SCTP message containing it, but it's unclear if this is a bug or a feature, or if it's specific to certain platforms. Treat it as a feature in this commit, and get the association id as mentioned, an incoming CER being expected before anything is sent. Commit da3e5d67 has more history. --- lib/diameter/src/transport/diameter_sctp.erl | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index f9feb68874..2059a0e029 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -88,7 +88,9 @@ %% {RAs, RP, Errors} | connect, socket :: gen_sctp:sctp_socket() | undefined, - assoc_id :: gen_sctp:assoc_id(), %% association identifier + assoc_id :: gen_sctp:assoc_id() %% association identifier + | undefined + | true, peer :: {[inet:ip_address()], uint()} %% {RAs, RP} | undefined, streams :: {uint(), uint()} %% {InStream, OutStream} counts @@ -676,7 +678,9 @@ recv({_, #sctp_assoc_change{state = comm_up, = S) -> Ref = getr(?REF_KEY), publish(T, Ref, Id, Sock), - up(S#transport{assoc_id = Id, + %% Deal with different association id after peeloff on Solaris by + %% taking the id from the first reception. + up(S#transport{assoc_id = T == accept orelse Id, streams = {IS, OS}}); %% ... or not: try the next address. @@ -691,16 +695,24 @@ recv({_, #sctp_assoc_change{} = E}, recv({_, #sctp_assoc_change{}}, _) -> stop; +%% First inbound on an accepting transport. +recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} + = T, + #transport{assoc_id = true} + = S) -> + recv(T, S#transport{assoc_id = Id}); + %% Inbound Diameter message. -recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid}) +recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid} = S) when is_binary(Bin) -> diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id}, bin = Bin}), - ok; + S; recv({_, #sctp_shutdown_event{assoc_id = A}}, #transport{assoc_id = Id}) - when A == Id; + when Id; + A == Id; A == 0 -> stop; -- cgit v1.2.3 From 9a90f20f365a90612ee5e5f7db14fd71b4875b84 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 24 Apr 2017 11:07:27 +0200 Subject: Deal with (another) SCTP association id quirk on Solaris Shutdown events have been seen to get a different association id. For example, first incoming message with association id = 0: + {trace_ts,<6421.268.0>,call, {diameter_sctp,handle_info, [{sctp,#Port<6421.2588>, {10,67,16,179}, 44159, {[{sctp_sndrcvinfo,0,0,[],0,0,0,269950872,269950872,0}], <<1,0,0,156,128,0,1,1,0,0,0,0,6,193,40,137,6,193,40,137,0,0, 1,8,64,0,0,30,67,45,49,51,52,50,49,55,52,52,49,46,101,114, 108,97,110,103,46,111,114,103,0,0,0,0,1,40,64,0,0,18,101, 114,108,97,110,103,46,111,114,103,0,0,0,0,1,1,64,0,0,14,0, 1,127,0,0,1,0,0,0,0,1,10,64,0,0,12,0,0,48,57,0,0,1,13,0,0, 0,20,79,84,80,47,100,105,97,109,101,116,101,114,0,0,1,22, 64,0,0,12,0,0,0,1,0,0,1,2,64,0,0,12,0,0,0,0,0,0,1,3,64,0,0, 12,0,0,0,3>>}}, {transport,<6421.252.0>,accept,#Port<6421.2588>,true,undefined, {32,32}, 0,undefined}]}, {1493,21505,577938}} Later, a shutdown event with association id 1536: + {trace_ts,<6421.268.0>,call, {diameter_sctp,handle_info, [{sctp,#Port<6421.2588>, {10,67,16,179}, 44159, {[],{sctp_shutdown_event,1536}}}, {transport,<6421.252.0>,accept,#Port<6421.2588>,0,undefined, {32,32}, 2,<6421.304.0>}]}, {1493,21505,746929}} Both this and the grandparent commit are on this: $ uname -a SunOS beren 5.10 Generic_118833-33 sun4v sparc SUNW,Netra-T2000 --- lib/diameter/src/transport/diameter_sctp.erl | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 2059a0e029..1e64804666 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -709,11 +709,7 @@ recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid} = S) bin = Bin}), S; -recv({_, #sctp_shutdown_event{assoc_id = A}}, - #transport{assoc_id = Id}) - when Id; - A == Id; - A == 0 -> +recv({_, #sctp_shutdown_event{}}, _) -> stop; %% Note that diameter_sctp(3) documents that sctp_events cannot be -- cgit v1.2.3 From ea339754d579b7e917dab0afa9a4694689f1f990 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 11 Mar 2017 21:11:48 +0100 Subject: Strip throttling callbacks from diameter_tcp Commits starting at 472a080c added a throttle_cb option to diameter_tcp to let a callback apply backpressure when it decides that additional requests should not be read. It didn't provide a hook for knowing that an answer was sent however, which is needed when sends no longer take place in the receiver process, and is more complicated than it should be. Strip it all away, in preparation for a simpler incarnation. --- lib/diameter/src/transport/diameter_tcp.erl | 221 +++++++--------------------- 1 file changed, 51 insertions(+), 170 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index f745e2a6f7..c81701b624 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -19,7 +19,6 @@ %% -module(diameter_tcp). --dialyzer({no_fail_call, throttle/2}). -behaviour(gen_server). @@ -109,22 +108,19 @@ | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} - | {fragment_timer, 0..16#FFFFFFFF} - | {throttle_cb, diameter:evaluable()}. + | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. -record(transport, {socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module - frag = <<>> :: frag(), %% message fragment ssl :: [term()] | boolean(), %% ssl options, ssl or not + frag = <<>> :: frag(), %% message fragment timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled :: boolean() | binary(), %% stopped receiving? - monitor :: pid()}). + monitor :: pid()}). %% monitor/sender process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -213,30 +209,26 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) %% Since accept/connect might block indefinitely, spawn a process %% that kills us with the parent until call returns, and then %% sends outgoing messages. - {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, - fragment_timer, - throttle_cb]), + fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), - Throttle = proplists:get_value(throttle_cb, OwnOpts, false), + {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid), M = if SslOpts -> ssl; true -> Mod end, monitor(process, MPid), MPid ! {start, self(), Sock, M}, %% prepare monitor for sending putr(?REF_KEY, Ref), - throttle(#transport{parent = Pid, - module = M, - socket = Sock, - ssl = SslOpts, - timeout = Tmo, - throttle_cb = Throttle, - throttled = false /= Throttle, - monitor = MPid}); + setopts(#transport{parent = Pid, + module = M, + socket = Sock, + ssl = SslOpts, + timeout = Tmo, + monitor = MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -518,7 +510,7 @@ getr(Key) -> %% Transition monitor state. %% Outgoing message. -m(Bin, #monitor{} = S) +m(Bin, S) when is_binary(Bin) -> send(Bin, S), S; @@ -539,7 +531,7 @@ m({stop, TPid} = T, #monitor{transport = TPid}) -> %% Transport is telling us that TLS has been negotiated after %% capabilities exchange. -m({tls, SSock}, #monitor{} = S) -> +m({tls, SSock}, S) -> S#monitor{socket = SSock, module = ssl}; @@ -583,22 +575,14 @@ t(T,S) -> %% transition/2 -%% Incoming message. +%% Incoming packets. transition({P, Sock, Bin}, #transport{socket = Sock, - ssl = B, - throttled = T} + ssl = B} = S) when P == ssl, true == B; P == tcp -> - false = T, %% assert recv(Bin, S); -%% Make a new throttling callback after a timeout. -transition(throttle, #transport{throttled = false}) -> - ok; -transition(throttle, S) -> - throttle(S); - %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = S) -> @@ -607,7 +591,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = NS = tls_handshake(Type, B, S), Pid ! {diameter, {tls, Ref}}, - throttle(NS#transport{ssl = B}); + NS#transport{ssl = B}; transition({C, Sock}, #transport{socket = Sock, ssl = B}) @@ -623,8 +607,8 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, ?ERROR({T,S}); %% Outgoing message. -transition({diameter, {send, Bin}}, #transport{} = S) -> - send(Bin, S), +transition({diameter, {send, Msg}}, #transport{} = S) -> + send(Msg, S), ok; %% Request to close the transport connection. @@ -703,24 +687,16 @@ tls(accept, Sock, Opts) -> %% using Nagle. %% Receive packets until a full message is received, -recv(Bin, #transport{frag = Head, throttled = false} = S) -> +recv(Bin, #transport{parent = Pid, frag = Head} = S) -> case rcv(Head, Bin) of - {Msg, B} -> - throttle(S#transport{frag = B, throttled = Msg}); - Frag -> - setopts(S), - start_fragment_timer(S#transport{frag = Frag, - flush = false}) + {Msg, B} -> %% have a complete message ... + diameter_peer:recv(Pid, Msg), + recv(<<>>, S#transport{frag = B}); + Frag -> %% read more on the socket + start_fragment_timer(setopts(S#transport{frag = Frag, + flush = false})) end. -%% recv/1 - -recv(#transport{throttled = false} = S) -> - recv(<<>>, S); - -recv(#transport{} = S) -> - S. - %% rcv/2 %% No previous fragment. @@ -780,13 +756,16 @@ recv1(Len, Bin) -> <> = Bin, {Msg, Rest}. -%% bin/1-2 +%% bin/2 bin(Head, Acc) -> list_to_binary([Head | lists:reverse(Acc)]). +%% bin/1 + bin({_, _, Head, Acc}) -> bin(Head, Acc); + bin(Bin) when is_binary(Bin) -> Bin. @@ -805,9 +784,7 @@ bin(Bin) %% also eventually lead to watchdog failover. %% No fragment to flush or not receiving messages. -flush(#transport{frag = Frag, throttled = B} = S) - when Frag == <<>>; - B /= false -> +flush(#transport{frag = <<>>} = S) -> S; %% Messages have been received since last timer expiry. @@ -850,14 +827,11 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(Bin, #monitor{socket = Sock, - module = M}) -> - case send(M, Sock, Bin) of - ok -> - ok; - {error, Reason} -> - x({send, Reason}) - end; +send(#diameter_packet{bin = Bin}, S) -> + send(Bin, S); + +send(Bin, #monitor{} = S) -> + send1(Bin, S); %% Send from the monitor process to avoid deadlock if both the %% receiver and the peer were to block in send. @@ -865,6 +839,17 @@ send(Bin, #transport{monitor = MPid}) -> MPid ! Bin, MPid. +%% send1/2 + +send1(Bin, #monitor{socket = Sock, + module = M}) -> + case send(M, Sock, Bin) of + ok -> + ok; + {error, Reason} -> + x({send, Reason}) + end. + %% send/3 send(gen_tcp, Sock, Bin) -> @@ -885,116 +870,12 @@ setopts(M, Sock, Opts) -> %% setopts/1 -setopts(#transport{socket = Sock, module = M}) -> - setopts(M, Sock). - -%% setopts/2 - -setopts(M, Sock) -> +setopts(#transport{socket = Sock, + module = M} + = S)-> case setopts(M, Sock, [{active, once}]) of - ok -> ok; - X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect - end. - -%% throttle/1 - -%% Still collecting packets for a complete message: keep receiving. -throttle(#transport{throttled = false} = S) -> - recv(S); - -%% Decide whether to receive another, or whether to accept a message -%% that's been received. -throttle(#transport{throttle_cb = F, throttled = T} = S) -> - Res = cb(F, T), - - try throttle(Res, S) of - #transport{ssl = SB} = NS when is_boolean(SB) -> - throttle(defrag(NS)); - #transport{throttled = Msg} = NS when is_binary(Msg) -> - %% Initial incoming message when we might need to upgrade - %% to TLS: wait for reception of a tls tuple. - defrag(NS) - catch - #transport{} = NS -> - recv(NS) - end. - -%% cb/2 - -cb(false, _) -> - ok; - -cb(F, B) -> - diameter_lib:eval([F, true /= B andalso B]). - -%% throttle/2 - -%% Callback says to receive another message. -throttle(ok, #transport{throttled = true} = S) -> - throw(S#transport{throttled = false}); - -%% Callback says to accept a received message. -throttle(ok, #transport{parent = Pid, throttled = Msg} = S) - when is_binary(Msg) -> - diameter_peer:recv(Pid, Msg), - S; - -throttle({ok = T, F}, S) -> - throttle(T, S#transport{throttle_cb = F}); - -%% Callback says to accept a received message and acknowledged the -%% returned pid with a {request, Pid} message if a request pid is -%% spawned, a discard message otherwise. The latter does not mean that -%% the message was necessarily discarded: it could have been an -%% answer. -throttle(NPid, #transport{parent = Pid, throttled = Msg} = S) - when is_pid(NPid), is_binary(Msg) -> - diameter_peer:recv(Pid, {Msg, NPid}), - S; - -throttle({NPid, F}, #transport{throttled = Msg} = S) - when is_pid(NPid), is_binary(Msg) -> - throttle(NPid, S#transport{throttle_cb = F}); - -%% Callback to accept a received message says to discard it. -throttle(discard, #transport{throttled = Msg} = S) - when is_binary(Msg) -> - S; -throttle({discard = T, F}, #transport{throttled = Msg} = S) - when is_binary(Msg) -> - throttle(T, S#transport{throttle_cb = F}); - -%% Callback to accept a received message says to answer it with the -%% supplied binary. -throttle(Bin, #transport{throttled = Msg} = S) - when is_binary(Bin), is_binary(Msg) -> - send(Bin, S), - S; -throttle({Bin, F}, #transport{throttled = Msg} = S) - when is_binary(Bin), is_binary(Msg) -> - throttle(Bin, S#transport{throttle_cb = F}); - -%% Callback says to ask again in the specified number of milliseconds. -throttle({timeout, Tmo}, S) -> - erlang:send_after(Tmo, self(), throttle), - throw(S); -throttle({timeout = T, Tmo, F}, S) -> - throttle({T, Tmo}, S#transport{throttle_cb = F}); - -throttle(T, #transport{throttle_cb = F}) -> - ?ERROR({invalid_return, T, F}). - -%% defrag/1 -%% -%% Try to extract another message from packets already read before -%% another throttling callback. - -defrag(#transport{frag = Head} = S) -> - case rcv(Head, <<>>) of - {Msg, B} -> - S#transport{throttled = Msg, frag = B}; - _ -> - S#transport{throttled = true} + ok -> S; + X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect end. %% portnr/2 -- cgit v1.2.3 From ca09cf7b697798aca5a4f81a11d5ad1d90f4107e Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Wed, 15 Mar 2017 17:04:44 +0100 Subject: Simplify acks to transport processes What's interesting when implementing some form of load regulation is when an incoming request has been answered or discarded. Acknowledge exactly this, not the identity of handler processes as previously. A transport process can request acks of nonforthcoming answers by sending {diameter, ack} to the parent peer_fsm, a handler processes identifies itself with a {handler, pid()} message, and the peer_fsm monitors on this to be able to send a notification to the transport if the handler dies before sending an answer. --- lib/diameter/src/base/diameter_peer_fsm.erl | 138 +++++++++++++++++----------- lib/diameter/src/base/diameter_traffic.erl | 80 ++++++++-------- lib/diameter/src/base/diameter_watchdog.erl | 37 +++----- 3 files changed, 136 insertions(+), 119 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index d394156367..d2af8fe425 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -129,6 +129,7 @@ %% the request was sent explicitly with %% diameter:call/4. strict :: boolean(), + ack = false :: boolean(), length_errors :: exit | handle | discard, incoming_maxlen :: integer() | infinity}). @@ -235,7 +236,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> Tmo = proplists:get_value(capx_timeout, Opts, ?CAPX_TIMEOUT), Strictness = proplists:get_value(capx_strictness, Opts, true), - OnLengthErr = proplists:get_value(length_errors, Opts, exit), + LengthErr = proplists:get_value(length_errors, Opts, exit), {TPid, Addrs} = start_transport(T, Rest, Svc), @@ -247,7 +248,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> dictionary = Dict0, mode = M, service = svc(Svc, Addrs), - length_errors = OnLengthErr, + length_errors = LengthErr, strict = Strictness, incoming_maxlen = Maxlen}. %% The transport returns its local ip addresses so that different @@ -442,10 +443,18 @@ transition({connection_timeout = T, TPid}, transition({connection_timeout, _}, _) -> ok; +%% Requests for acknowledgements to the transport. +transition({diameter, ack}, S) -> + S#state{ack = true}; + %% Incoming message from the transport. -transition({diameter, {recv, MsgT}}, S) -> - {Msg, NPid} = msg(MsgT), - incoming(recv(Msg, S), NPid, S); +transition({diameter, {recv, Msg}}, S) -> + incoming(recv(Msg, S), S); + +%% Handler of an incoming request is telling of its existence. +transition({handler, Pid}, _) -> + put_route(Pid), + ok; %% Timeout when still in the same state ... transition({timeout = T, PS}, #state{state = PS}) -> @@ -459,7 +468,7 @@ transition({timeout, _}, _) -> transition({send, Msg}, S) -> outgoing(Msg, S); transition({send, Msg, Route}, S) -> - put_route(Route), + route_outgoing(Route), outgoing(Msg, S); %% Request for graceful shutdown at remove_transport, stop_service of @@ -488,12 +497,13 @@ transition({'DOWN', _, process, WPid, _}, transition({'DOWN', _, process, TPid, _}, #state{transport = TPid} = S) -> - start_next(S); + start_next(S#state{ack = false}); %% Transport has died after connection timeout, or handler process has %% died. -transition({'DOWN', _, process, Pid, _}, _) -> - erase_route(Pid), +transition({'DOWN', _, process, Pid, _}, #state{transport = TPid}) -> + is_reference(erase_route(Pid)) + andalso send(TPid, false), %% answer not forthcoming ok; %% State query. @@ -503,37 +513,56 @@ transition({state, Pid}, #state{state = S, transport = TPid}) -> %% Crash on anything unexpected. -%% put_route/1 -%% +%% route_outgoing/1 + %% Map identifiers in an outgoing request to be able to lookup the %% handler process when the answer is received. - -put_route({Pid, Ref, Seqs}) -> +route_outgoing({Pid, Ref, Seqs}) -> %% request MRef = monitor(process, Pid), put(Pid, Seqs), - put(Seqs, {Pid, Ref, MRef}). + put(Seqs, {Pid, Ref, MRef}); -%% get_route/1 +%% Remove a mapping made for an incoming request. +route_outgoing(Pid) + when is_pid(Pid) -> %% answer + MRef = erase_route(Pid), + undefined == MRef orelse demonitor(MRef). -get_route(#diameter_packet{header = #diameter_header{is_request = false}} - = Pkt) -> +%% put_route/1 + +%% Monitor on a handler process for an incoming request. +put_route(Pid) -> + MRef = monitor(process, Pid), + put(Pid, MRef). + +%% get_route/2 + +%% incoming answer +get_route(_, #diameter_packet{header = #diameter_header{is_request = false}} + = Pkt) -> Seqs = diameter_codec:sequence_numbers(Pkt), case erase(Seqs) of {Pid, Ref, MRef} -> demonitor(MRef), erase(Pid), {Pid, Ref, self()}; - undefined -> + undefined -> %% request unknown false end; -get_route(_) -> - false. +%% incoming request +get_route(Ack, _) -> + Ack. %% erase_route/1 erase_route(Pid) -> - erase(erase(Pid)). + case erase(Pid) of + {_,_} = Seqs -> + erase(Seqs); + T -> + T + end. %% capx/1 @@ -610,26 +639,26 @@ encode(Rec, Dict) -> diameter_codec:encode(Dict, #diameter_packet{header = Hdr, msg = Rec}). -%% incoming/3 +%% incoming/2 -incoming({recv, Name, Pkt}, NPid, #state{parent = Pid} = S) -> - Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid}, +incoming({recv = T, Name, Pkt}, #state{parent = Pid, ack = Ack} = S) -> + Pid ! {T, self(), get_route(Ack, Pkt), Name, Pkt}, rcv(Name, Pkt, S); -incoming(T, false, _) -> - T; - -incoming(T, NPid, _) -> - NPid ! {diameter, discard}, - T. +incoming(#diameter_header{is_request = R}, #state{transport = TPid, + ack = Ack}) -> + R andalso Ack andalso send(TPid, false), + ok; -%% msg/1 +incoming(<<_:32, 1:1, _/bits>>, #state{ack = true} = S) -> + send(S#state.transport, false), + ok; -msg({_,_} = T) -> - T; +incoming(<<_/bits>>, _) -> + ok; -msg(Msg) -> - {Msg, false}. +incoming(T, _) -> + T. %% recv/2 @@ -654,18 +683,19 @@ recv1(_, #diameter_packet{header = H, bin = Bin}, #state{incoming_maxlen = M}) when M < size(Bin) -> - invalid(false, incoming_maxlen_exceeded, {size(Bin), H}); + invalid(false, incoming_maxlen_exceeded, {size(Bin), H}), + H; %% Ignore anything but an expected CER/CEA if so configured. This is %% non-standard behaviour. -recv1(Name, _, #state{state = {'Wait-CEA', _, _}, - strict = false}) +recv1(Name, #diameter_packet{header = H}, #state{state = {'Wait-CEA', _, _}, + strict = false}) when Name /= 'CEA' -> - ok; -recv1(Name, _, #state{state = recv_CER, - strict = false}) + H; +recv1(Name, #diameter_packet{header = H}, #state{state = recv_CER, + strict = false}) when Name /= 'CER' -> - ok; + H; %% Incoming request after outgoing DPR: discard. Don't discard DPR, so %% both ends don't do so when sending simultaneously. @@ -673,13 +703,15 @@ recv1(Name, #diameter_packet{header = #diameter_header{is_request = true} = H}, #state{dpr = {_,_,_}}) when Name /= 'DPR' -> - invalid(false, recv_after_outgoing_dpr, H); + invalid(false, recv_after_outgoing_dpr, H), + H; %% Incoming request after incoming DPR: discard. recv1(_, #diameter_packet{header = #diameter_header{is_request = true} = H}, #state{dpr = true}) -> - invalid(false, recv_after_incoming_dpr, H); + invalid(false, recv_after_incoming_dpr, H), + H; %% DPA with identifier mismatch, or in response to a DPR initiated by %% the service. @@ -716,10 +748,12 @@ recv(#diameter_header{} #diameter_packet{bin = Bin}, #state{length_errors = E}) -> T = {size(Bin), bit_size(Bin) rem 8, H}, - invalid(E, message_length_mismatch, T); + invalid(E, message_length_mismatch, T), + Bin; recv(false, #diameter_packet{bin = Bin}, #state{length_errors = E}) -> - invalid(E, truncated_header, Bin). + invalid(E, truncated_header, Bin), + Bin. %% Note that counters here only count discarded messages. invalid(E, Reason, T) -> @@ -775,14 +809,10 @@ rcv('DPA' = N, diameter_peer:close(TPid), {stop, N}; -%% Ignore anything else, an unsolicited DPA in particular. Note that -%% dpa_timeout deals with the case in which the peer sends the wrong -%% identifiers in DPA. -rcv(N, #diameter_packet{header = H}, _) - when N == 'CER'; - N == 'CEA'; - N == 'DPR'; - N == 'DPA' -> +%% Ignore an unsolicited DPA in particular. Note that dpa_timeout +%% deals with the case in which the peer sends the wrong identifiers +%% in DPA. +rcv('DPA' = N, #diameter_packet{header = H}, _) -> ?LOG(ignored, N), %% Note that these aren't counted in the normal recv counter. diameter_stats:incr({diameter_codec:msg_id(H), recv, ignored}), diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index bc1ccf4feb..96f3a307f9 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -30,7 +30,7 @@ -export([send_request/4]). %% towards diameter_watchdog --export([receive_message/6]). +-export([receive_message/5]). %% towards diameter_peer_fsm and diameter_watchdog -export([incr/4, @@ -93,7 +93,7 @@ packet :: #diameter_packet{} | undefined}). %% of request %% --------------------------------------------------------------------------- -%% # make_recvdata/1 +%% make_recvdata/1 %% --------------------------------------------------------------------------- make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) -> @@ -206,42 +206,36 @@ incr_rc(Dir, Pkt, TPid, Dict0) -> incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}). %% --------------------------------------------------------------------------- -%% # receive_message/6 +%% receive_message/5 %% -%% Handle an incoming Diameter message. +%% Handle an incoming Diameter message in a watchdog process. %% --------------------------------------------------------------------------- -%% Handle an incoming Diameter message in the watchdog process. - -receive_message(TPid, Route, Pkt, false, Dict0, RecvData) -> - incoming(TPid, Route, Pkt, Dict0, RecvData); - -receive_message(TPid, Route, Pkt, NPid, Dict0, RecvData) -> - NPid ! {diameter, incoming(TPid, Route, Pkt, Dict0, RecvData)}. - -%% incoming/4 - -incoming(TPid, Route, Pkt, Dict0, RecvData) - when is_pid(TPid) -> +-spec receive_message(pid(), Route, #diameter_packet{}, module(), #recvdata{}) + -> pid() + | boolean() + when Route :: {Handler, RequestRef, Seqs} + | Ack, + Handler :: pid(), + RequestRef :: reference(), + Seqs :: {0..16#FFFFFFFF, 0..16#FFFFFFFF}, + Ack :: boolean(). + +receive_message(TPid, Route, Pkt, Dict0, RecvData) -> #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, recv(R, Route, TPid, Pkt, Dict0, RecvData). %% recv/6 %% Incoming request ... -recv(true, false, TPid, Pkt, Dict0, T) -> - try - {request, spawn_request(TPid, Pkt, Dict0, T)} - catch - error: system_limit = E -> %% discard - ?LOG(error, E), - discard - end; +recv(true, Ack, TPid, Pkt, Dict0, T) + when is_boolean(Ack) -> + spawn_request(Ack, TPid, Pkt, Dict0, T); %% ... answer to known request ... recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) -> Pid ! {answer, Ref, TPid, Dict0, Pkt}, - {answer, Pid}; + true; %% Note that failover could have happened prior to this message being %% received and triggering failback. That is, both a failover message @@ -256,23 +250,27 @@ recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) -> recv(false, false, TPid, Pkt, _, _) -> ?LOG(discarded, Pkt#diameter_packet.header), incr(TPid, {{unknown, 0}, recv, discarded}), - discard. + false. -%% spawn_request/4 +%% spawn_request/5 -spawn_request(TPid, Pkt, Dict0, {Opts, RecvData}) -> - spawn_request(TPid, Pkt, Dict0, Opts, RecvData); -spawn_request(TPid, Pkt, Dict0, RecvData) -> - spawn_request(TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData). +spawn_request(Ack, TPid, Pkt, Dict0, {Opts, RecvData}) -> + spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData); +spawn_request(Ack, TPid, Pkt, Dict0, RecvData) -> + spawn_request(Ack, TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData). -spawn_request(TPid, Pkt, Dict0, Opts, RecvData) -> - spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts). +spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData) -> + spawn_opt(fun() -> + recv_request(Ack, TPid, Pkt, Dict0, RecvData) + end, + Opts). %% --------------------------------------------------------------------------- -%% recv_request/4 +%% recv_request/5 %% --------------------------------------------------------------------------- -recv_request(TPid, +recv_request(Ack, + TPid, #diameter_packet{header = #diameter_header{application_id = Id}} = Pkt, Dict0, @@ -280,6 +278,7 @@ recv_request(TPid, apps = Apps, codec = Opts} = RecvData) -> + Ack andalso (TPid ! {handler, self()}), diameter_codec:setopts([{common_dictionary, Dict0} | Opts]), send_A(recv_R(diameter_service:find_incoming_app(PeerT, TPid, Id, Apps), TPid, @@ -511,7 +510,7 @@ send_A(T, TPid, {AppDict, Dict0} = DictT0, ReqPkt, EvalPktFs, EvalFs) -> {MsgDict, Pkt} = reply(T, TPid, DictT0, EvalPktFs, ReqPkt), incr(send, Pkt, TPid, AppDict), incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}), %% count outgoing - send(TPid, Pkt), + send(TPid, Pkt, _Route = self()), lists:foreach(fun diameter_lib:eval/1, EvalFs). %% answer/6 @@ -1207,7 +1206,7 @@ x(T) -> exit(T). %% --------------------------------------------------------------------------- -%% # send_request/4 +%% send_request/4 %% %% Handle an outgoing Diameter request. %% --------------------------------------------------------------------------- @@ -1296,7 +1295,7 @@ mo(T, _) -> ?ERROR({invalid_option, T}). %% --------------------------------------------------------------------------- -%% # send_request/6 +%% send_request/6 %% --------------------------------------------------------------------------- %% Send an outgoing request in its dedicated process. @@ -1745,11 +1744,6 @@ recv(TPid, Pid, TRef, {LocalTRef, MRef}) -> exit({timeout, LocalTRef, TPid} = T) end. -%% send/2 - -send(Pid, Pkt) -> - Pid ! {send, Pkt}. - %% send/3 send(Pid, Pkt, Route) -> diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index 4484b7ee2c..a2eb661870 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -283,7 +283,7 @@ event(Msg, ?LOG(transition, {From, To}). data(Msg, TPid, reopen, okay) -> - {recv, TPid, false, 'DWA', _Pkt, _NPid} = Msg, %% assert + {recv, TPid, _, 'DWA', _Pkt} = Msg, %% assert {TPid, T} = eraser(open), [T]; @@ -302,6 +302,8 @@ tpid(_, Pid) tpid(Pid, _) -> Pid. +%% send/2 + send(Pid, T) -> Pid ! T. @@ -447,14 +449,15 @@ transition({'DOWN', _, process, TPid, _Reason} = D, end; %% Incoming message. -transition({recv, TPid, Route, Name, Pkt, NPid}, +transition({recv, TPid, Route, Name, Pkt}, #watchdog{transport = TPid} = S) -> - try - incoming(Name, Pkt, NPid, S) - catch + try incoming(Route, Name, Pkt, S) of #watchdog{dictionary = Dict0, receive_data = T} = NS -> - diameter_traffic:receive_message(TPid, Route, Pkt, NPid, Dict0, T), + diameter_traffic:receive_message(TPid, Route, Pkt, Dict0, T), + NS + catch + #watchdog{} = NS -> NS end; @@ -586,23 +589,13 @@ send_watchdog(#watchdog{pending = false, %% incoming/4 -incoming(Name, Pkt, false, S) -> - recv(Name, Pkt, S); - -incoming(Name, Pkt, NPid, S) -> - NS = recv(Name, Pkt, S), - NPid ! {diameter, discard}, - NS. - -%% recv/3 - -recv(Name, Pkt, S) -> - try rcv(Name, Pkt, rcv(Name, S)) of - #watchdog{} = NS -> - throw(NS) +incoming(Route, Name, Pkt, S) -> + try rcv(Name, S) of + NS -> rcv(Name, Pkt, NS) catch - #watchdog{} = NS -> %% throwaway - NS + #watchdog{transport = TPid} = NS when Route -> %% incoming request + send(TPid, {send, false}), %% requiring ack + throw(NS) end. %% rcv/3 -- cgit v1.2.3 From 200697ef596dbe689443834aaf5aa0303971ca5d Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 4 Apr 2017 11:24:48 +0200 Subject: Fix incomprehensible dialyzer warning This: diameter_tcp.erl:241: Record construction #transport{parent::'false',ssl::boolean() | maybe_improper_list(),frag::<<>>,tref::'false',flush::'false',pending::0,reset::{1 | 4,0 | 2},throttled::boolean(),q::{0,queue:queue(_)},monitor::'undefined' | pid()} violates the declared type of field parent::pid() The problem isn't #transport.pid at all, it's #monitor.pid, and the only relation is that the pid that's assigned to the latter is also (later) assigned to the former. There is no record construction that assigns false to #transport.parent. Introduced in commit 33a535e4. --- lib/diameter/src/transport/diameter_tcp.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index c81701b624..8f2e9027fc 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -80,7 +80,7 @@ %% Monitor process state. The name monitor predates its role as sender. -record(monitor, - {parent :: reference() | false, + {parent :: reference() | false | pid(), transport = self() :: pid(), socket :: inet:socket() | ssl:sslsocket() | undefined, module :: module() | undefined}). -- cgit v1.2.3 From 034089ed1ba3f78c732edcfc84d85a6ed4a4854e Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 10 Jun 2017 06:34:44 +0200 Subject: Remove upgrade from diameter_sctp; tweak diameter_tcp to match Added in commit 2afd1fe5. Only rename variables in diameter_tcp, no functional change. --- lib/diameter/src/transport/diameter_sctp.erl | 26 ++++++++++-------------- lib/diameter/src/transport/diameter_tcp.erl | 30 ++++++++++++++-------------- 2 files changed, 25 insertions(+), 31 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 1e64804666..d23e56b413 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -108,7 +108,7 @@ -record(listener, {ref :: reference(), socket :: gen_sctp:sctp_socket(), - service = false :: false | pid(), %% service process + service :: pid(), %% service process pending = {0, queue:new()}, accept :: [match()]}). %% Field pending implements two queues: the first of transport-to-be @@ -142,11 +142,11 @@ start(T, Svc, Opts) when is_list(Opts) -> #diameter_service{capabilities = Caps, - pid = SPid} + pid = Pid} = Svc, diameter_sctp_sup:start(), %% start supervisors on demand Addrs = Caps#diameter_caps.host_ip_address, - s(T, Addrs, SPid, lists:map(fun ip/1, Opts)). + s(T, Addrs, Pid, lists:map(fun ip/1, Opts)). ip({ifaddr, A}) -> {ip, A}; @@ -157,9 +157,9 @@ ip(T) -> %% when there is not yet an association to assign it, or at comm_up on %% a new association in which case the call retrieves a transport from %% the pending queue. -s({accept, Ref} = A, Addrs, SPid, Opts) -> - {ok, LPid, LAs} = listener(Ref, {Opts, Addrs}), - try gen_server:call(LPid, {A, self(), SPid}, infinity) of +s({accept, Ref} = A, Addrs, SvcPid, Opts) -> + {ok, LPid, LAs} = listener(Ref, {Opts, SvcPid, Addrs}), + try gen_server:call(LPid, {A, self()}, infinity) of {ok, TPid} -> {ok, TPid, LAs}; No -> @@ -172,7 +172,7 @@ s({accept, Ref} = A, Addrs, SPid, Opts) -> %% gen_sctp in order to be able to accept a new association only %% *after* an accepting transport has been spawned. -s({connect = C, Ref}, Addrs, _SPid, Opts) -> +s({connect = C, Ref}, Addrs, _SvcPid, Opts) -> diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}). %% start_link/1 @@ -233,7 +233,8 @@ i(#monitor{transport = TPid} = S) -> S; %% A process owning a listening socket. -i({listen, Ref, {Opts, Addrs}}) -> +i({listen, Ref, {Opts, SvcPid, Addrs}}) -> + monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence {[Matches], Rest} = proplists:split(Opts, [accept]), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), @@ -241,6 +242,7 @@ i({listen, Ref, {Opts, Addrs}}) -> true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), proc_lib:init_ack({ok, self(), LAs}), #listener{ref = Ref, + service = SvcPid, socket = Sock, accept = [[M] || {accept, M} <- Matches]}; @@ -390,14 +392,6 @@ handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref} = S) -> {TPid, NewS} = accept(Ref, Pid, S), {reply, {ok, TPid}, NewS}; -handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) -> - handle_call({T, Pid}, From, if not is_pid(P), is_pid(SPid) -> - monitor(process, SPid), - S#listener{service = SPid}; - true -> - S - end); - %% Transport is telling us of parent death. handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> {stop, {shutdown, Reason}, ok, S}; diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 8f2e9027fc..427b2395b9 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -142,13 +142,13 @@ start({T, Ref}, Svc, Opts) -> #diameter_service{capabilities = Caps, - pid = SPid} + pid = SvcPid} = Svc, diameter_tcp_sup:start(), %% start tcp supervisors on demand {Mod, Rest} = split(Opts), Addrs = Caps#diameter_caps.host_ip_address, - Arg = {T, Ref, Mod, self(), Rest, Addrs, SPid}, + Arg = {T, Ref, Mod, self(), Rest, Addrs, SvcPid}, diameter_tcp_sup:start_child(Arg). split([{module, M} | Opts]) -> @@ -202,7 +202,7 @@ init(T) -> %% i/1 %% A transport process. -i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) +i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid}) when T == accept; T == connect -> monitor(process, Pid), @@ -218,7 +218,7 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) ?DEFAULT_FRAGMENT_TIMEOUT), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), - Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid), + Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid), M = if SslOpts -> ssl; true -> Mod end, monitor(process, MPid), MPid ! {start, self(), Sock, M}, %% prepare monitor for sending @@ -275,19 +275,19 @@ ssl_opts(T) -> %% init/8 %% Establish a TLS connection before capabilities exchange ... -init(Type, Ref, Mod, Pid, true, Opts, Addrs, SPid) -> - init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SPid); +init(Type, Ref, Mod, Pid, true, Opts, Addrs, SvcPid) -> + init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SvcPid); %% ... or not. -init(Type, Ref, Mod, Pid, _, Opts, Addrs, SPid) -> - init(Type, Ref, Mod, Pid, Opts, Addrs, SPid). +init(Type, Ref, Mod, Pid, _, Opts, Addrs, SvcPid) -> + init(Type, Ref, Mod, Pid, Opts, Addrs, SvcPid). %% init/7 -init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) -> +init(accept = T, Ref, Mod, Pid, Opts, Addrs, SvcPid) -> {[Matches], Rest} = proplists:split(Opts, [accept]), {ok, LPid, {LAddr, LSock}} = listener(Ref, {Mod, Rest, Addrs}), - ok = gen_server:call(LPid, {accept, SPid}, infinity), + ok = gen_server:call(LPid, {accept, SvcPid}, infinity), proc_lib:init_ack({ok, self(), [LAddr]}), Sock = ok(accept(Mod, LSock)), ok = accept_peer(Mod, Sock, accept(Matches)), @@ -295,7 +295,7 @@ init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) -> diameter_peer:up(Pid), Sock; -init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SPid) -> +init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SvcPid) -> {[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]), LAddrOpt = get_addr(LA, Addrs), RAddr = get_addr(RA), @@ -447,10 +447,10 @@ portnr(Sock) -> %% # handle_call/3 %% --------------------------------------------------------------------------- -handle_call({accept, SPid}, _From, #listener{service = P} = S) -> - {reply, ok, if not is_pid(P), is_pid(SPid) -> - monitor(process, SPid), - S#listener{service = SPid}; +handle_call({accept, SvcPid}, _From, #listener{service = P} = S) -> + {reply, ok, if not is_pid(P), is_pid(SvcPid) -> + monitor(process, SvcPid), + S#listener{service = SvcPid}; true -> S end}; -- cgit v1.2.3 From eadf4efc7e264fe8dd30befb42a42a02cdef58f1 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 10 Jun 2017 23:40:53 +0200 Subject: Make diameter_{tcp,sctp} sender configurable With sends still from the receiving process by default, since changing the default behaviour may well have negative effects. A separate sender probably implies a greater need for some form of load regulation for one, since a blocking send would no longer imply that incoming messages are no longer recevied. Dealing with this could result in the same deadlock that the sending process intends to avoid, but the user should be in control over how/when incoming traffic is regulated. --- lib/diameter/src/transport/diameter_sctp.erl | 59 +++++++++++++------- lib/diameter/src/transport/diameter_tcp.erl | 81 +++++++++++++++++----------- 2 files changed, 90 insertions(+), 50 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index d23e56b413..3919596cb1 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -68,6 +68,7 @@ -type connect_option() :: {raddr, inet:ip_address()} | {rport, inet:port_number()} + | option() | term(). %% gen_sctp:open_option(). -type match() :: inet:ip_address() @@ -75,8 +76,12 @@ | [match()]. -type listen_option() :: {accept, match()} + | option() | term(). %% gen_sctp:open_option(). +-type option() :: {sender, boolean()} + | sender. + -type uint() :: non_neg_integer(). %% Accepting/connecting transport process state. @@ -96,7 +101,7 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream - monitor :: pid() | undefined}). %% sending process + send = false :: pid() | boolean()}). %% sending process %% Monitor process state. -record(monitor, @@ -110,7 +115,8 @@ socket :: gen_sctp:sctp_socket(), service :: pid(), %% service process pending = {0, queue:new()}, - accept :: [match()]}). + accept :: [match()], + sender :: boolean()}). %% Field pending implements two queues: the first of transport-to-be %% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived @@ -236,7 +242,8 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {[Matches], Rest} = proplists:split(Opts, [accept]), + {Split, Rest} = proplists:split(Opts, [accept, sender]), + OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), @@ -244,12 +251,14 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> #listener{ref = Ref, service = SvcPid, socket = Sock, - accept = [[M] || {accept, M} <- Matches]}; + accept = [[M] || {accept, M} <- OwnOpts], + sender = proplists:get_value(sender, OwnOpts, false)}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> - {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]), - RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As], + {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]), + OwnOpts = lists:append(Split), + RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts], [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], {LAs, Sock} = open(Addrs, Rest, 0), putr(?REF_KEY, Ref), @@ -257,7 +266,8 @@ i({connect, Pid, Opts, Addrs, Ref}) -> monitor(process, Pid), #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, - socket = Sock}; + socket = Sock, + send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an %% association. @@ -291,11 +301,12 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> receive {Ref, Pid} when K == parent -> %% transport process started S#transport{parent = Pid}; - {K, T, Matches} when K == peeloff -> %% association + {K, T, Matches, Bool} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), - t(T, S#transport{socket = Sock}); + t(T, S#transport{socket = Sock, + send = Bool}); accept_timeout = T -> x(T); {'DOWN', _, process, _, _} = T -> @@ -466,11 +477,12 @@ getr(Key) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, - accept = Matches} + accept = Matches, + sender = Sender} = S) -> Id = assoc_id(Data), {TPid, NewS} = accept(S), - TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches}, + TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender}, setopts(Sock), NewS; @@ -546,14 +558,15 @@ transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> %% Parent process has died: call the monitor to not close the socket %% during an ongoing send, but don't let it take forever. transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, - monitor = MPid}) -> - undefined == MPid + send = MPid}) -> + is_boolean(MPid) orelse ok == (catch gen_server:call(MPid, {stop, Pid})) orelse exit(MPid, kill), stop; %% Monitor process has died. -transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop; %% Timeout after transport process has been started. @@ -617,7 +630,7 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% send/2 %% Start monitor process on first send. -send(Msg, #transport{monitor = undefined, +send(Msg, #transport{send = true, socket = Sock, assoc_id = AId} = S) -> @@ -625,7 +638,7 @@ send(Msg, #transport{monitor = undefined, socket = Sock, assoc_id = AId}), monitor(process, MPid), - send(Msg, S#transport{monitor = MPid}); + send(Msg, S#transport{send = MPid}); %% Outbound Diameter message on a specified stream ... send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, @@ -646,13 +659,23 @@ send(Bin, #transport{streams = {_, OS}, %% send/3 -send(StreamId, Bin, #transport{monitor = MPid}) -> +send(StreamId, Bin, #transport{send = false, + socket = Sock, + assoc_id = AId}) -> + send(Sock, AId, StreamId, Bin); + +send(StreamId, Bin, #transport{send = MPid}) -> MPid ! {Bin, StreamId}, MPid; send(StreamId, Bin, #monitor{socket = Sock, assoc_id = AId}) -> - case gen_sctp:send(Sock, AId, StreamId, Bin) of + send(Sock, AId, StreamId, Bin). + +%% send/4 + +send(Sock, AssocId, StreamId, Bin) -> + case gen_sctp:send(Sock, AssocId, StreamId, Bin) of ok -> ok; {error, Reason} -> diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 427b2395b9..edbbec1709 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -69,16 +69,16 @@ %% of processes: an actual transport process, one that will club it to %% death should the parent die before a connection is established, and %% a process owning the listening port. The monitor process -%% historically died after connection establishment, but now lives on -%% as the sender of outgoing messages, so that a blocking send doesn't -%% prevent messages from being received. +%% historically died after connection establishment, but can now live +%% on as the sender of outgoing messages, so that a blocking send +%% doesn't prevent messages from being received. %% Listener process state. -record(listener, {socket :: inet:socket(), module :: module(), service = false :: false | pid()}). %% service process -%% Monitor process state. The name monitor predates its role as sender. +%% Monitor process state. -record(monitor, {parent :: reference() | false | pid(), transport = self() :: pid(), @@ -108,6 +108,7 @@ | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} + | {sender, boolean()} | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. @@ -120,7 +121,7 @@ timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - monitor :: pid()}). %% monitor/sender process + send :: pid() | false}). %% sending process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -210,25 +211,27 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid}) %% that kills us with the parent until call returns, and then %% sends outgoing messages. {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, + sender, fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), + Sender = proplists:get_value(sender, OwnOpts, false), ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid), M = if SslOpts -> ssl; true -> Mod end, - monitor(process, MPid), - MPid ! {start, self(), Sock, M}, %% prepare monitor for sending + Sender andalso monitor(process, MPid), + MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending putr(?REF_KEY, Ref), setopts(#transport{parent = Pid, module = M, socket = Sock, ssl = SslOpts, timeout = Tmo, - monitor = MPid}); + send = Sender andalso MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. @@ -515,15 +518,22 @@ m(Bin, S) send(Bin, S), S; -%% Transport is telling us to be ready to send. Stop monitoring on the +%% Transport has established a connection. Stop monitoring on the %% parent so as not to die before a send from the transport. -m({start, TPid, Sock, Mod}, #monitor{parent = MRef, - transport = TPid} - = S) -> - demonitor(MRef, [flush]), - S#monitor{parent = false, - socket = Sock, - module = Mod}; +m({start, TPid, T} = M, #monitor{transport = TPid} = S) -> + case T of + {Sock, Mod} -> + demonitor(S#monitor.parent, [flush]), + S#monitor{parent = false, + socket = Sock, + module = Mod}; + false -> %% monitor not sending + x(M) + end; + +%% Transport is telling us to die. +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); %% Transport is telling us to die. m({stop, TPid} = T, #monitor{transport = TPid}) -> @@ -632,13 +642,15 @@ transition({resolve_port, Pid}, #transport{socket = Sock, %% Parent process has died: call the monitor to not close the socket %% during an ongoing send, but don't let it take forever. transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, - monitor = MPid}) -> - ok == (catch gen_server:call(MPid, {stop, Pid})) - orelse exit(MPid, kill), + send = MPid}) -> + false == MPid + orelse (ok == gen_server:call(MPid, {stop, self()}, 1000)) + orelse exit(MPid, {shutdown, parent}), stop; %% Monitor process has died. -transition({'DOWN', _, process, MPid, _}, #transport{monitor = MPid}) -> +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop. %% Crash on anything unexpected. @@ -663,12 +675,12 @@ tls_handshake(_, true, #transport{ssl = false}) -> tls_handshake(Type, true, #transport{socket = Sock, module = M, ssl = Opts, - monitor = MPid} + send = MPid} = S) -> {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]), Ref = getr(?REF_KEY), true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}), - MPid ! {tls, SSock}, %% tell the monitor process + false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process S#transport{socket = SSock, module = ssl}; @@ -827,29 +839,34 @@ connect(Mod, Host, Port, Opts) -> %% send/2 +send(false, #transport{}) -> %% ack + ok; + send(#diameter_packet{bin = Bin}, S) -> send(Bin, S); -send(Bin, #monitor{} = S) -> - send1(Bin, S); +send(Bin, #transport{socket = Sock, module = M, send = false}) -> + send1(M, Sock, Bin); + +send(Bin, #monitor{socket = Sock, module = M}) -> + send1(M, Sock, Bin); %% Send from the monitor process to avoid deadlock if both the %% receiver and the peer were to block in send. -send(Bin, #transport{monitor = MPid}) -> - MPid ! Bin, - MPid. +send(Bin, #transport{send = Pid}) -> + Pid ! Bin, + ok. -%% send1/2 +%% send1/3 -send1(Bin, #monitor{socket = Sock, - module = M}) -> - case send(M, Sock, Bin) of +send1(Mod, Sock, Bin) -> + case send(Mod, Sock, Bin) of ok -> ok; {error, Reason} -> x({send, Reason}) end. - + %% send/3 send(gen_tcp, Sock, Bin) -> -- cgit v1.2.3 From 636a719927b23751c12563b8e137ea8698e2abd5 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sat, 10 Jun 2017 23:15:37 +0200 Subject: Add diameter_tcp send/recv callbacks From the receiver process, that can return binaries to send/receive and stop the transport process from reading on the socket. This is still undocumented, and may change. --- lib/diameter/src/transport/diameter_tcp.erl | 156 ++++++++++++++++++++++------ 1 file changed, 125 insertions(+), 31 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index edbbec1709..5819d52bdc 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -82,6 +82,7 @@ -record(monitor, {parent :: reference() | false | pid(), transport = self() :: pid(), + ack = false :: boolean(), socket :: inet:socket() | ssl:sslsocket() | undefined, module :: module() | undefined}). @@ -109,11 +110,15 @@ -type option() :: {port, non_neg_integer()} | {sender, boolean()} + | sender + | {message_cb, false | diameter:evaluable()} | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. -record(transport, {socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket + active = false :: boolean(), %% is socket active? + recv = true :: boolean(), %% should it be active? parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module ssl :: [term()] | boolean(), %% ssl options, ssl or not @@ -121,7 +126,8 @@ timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - send :: pid() | false}). %% sending process + message_cb :: false | diameter:evaluable(), + send :: pid() | false}). %% sending process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -212,24 +218,28 @@ i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid}) %% sends outgoing messages. {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, sender, + message_cb, fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), - Sender = proplists:get_value(sender, OwnOpts, false), + [CB, Sender] = [proplists:get_value(K, OwnOpts, false) + || K <- [message_cb, sender]], ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid), M = if SslOpts -> ssl; true -> Mod end, Sender andalso monitor(process, MPid), - MPid ! {start, self(), Sender andalso {Sock, M}}, %% prepare for sending + false == CB orelse (Pid ! {diameter, ack}), + MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB}, putr(?REF_KEY, Ref), setopts(#transport{parent = Pid, module = M, socket = Sock, ssl = SslOpts, + message_cb = CB, timeout = Tmo, send = Sender andalso MPid}); %% Put the reference in the process dictionary since we now use it @@ -520,13 +530,14 @@ m(Bin, S) %% Transport has established a connection. Stop monitoring on the %% parent so as not to die before a send from the transport. -m({start, TPid, T} = M, #monitor{transport = TPid} = S) -> +m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) -> case T of {Sock, Mod} -> demonitor(S#monitor.parent, [flush]), S#monitor{parent = false, socket = Sock, - module = Mod}; + module = Mod, + ack = Ack}; false -> %% monitor not sending x(M) end; @@ -591,7 +602,7 @@ transition({P, Sock, Bin}, #transport{socket = Sock, = S) when P == ssl, true == B; P == tcp -> - recv(Bin, S); + recv(Bin, S#transport{active = false}); %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} @@ -618,8 +629,16 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, %% Outgoing message. transition({diameter, {send, Msg}}, #transport{} = S) -> - send(Msg, S), - ok; + message(send, Msg, S); + +%% Monitor has sent an outgoing message. +transition(Bin, S) + when is_binary(Bin) -> + message(ack, Bin, S); + +%% Deferred actions from a message_cb. +transition({actions, Dir, Acts}, S) -> + actions(Acts, Dir, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -699,12 +718,11 @@ tls(accept, Sock, Opts) -> %% using Nagle. %% Receive packets until a full message is received, -recv(Bin, #transport{parent = Pid, frag = Head} = S) -> +recv(Bin, #transport{frag = Head} = S) -> case rcv(Head, Bin) of {Msg, B} -> %% have a complete message ... - diameter_peer:recv(Pid, Msg), - recv(<<>>, S#transport{frag = B}); - Frag -> %% read more on the socket + message(recv, Msg, S#transport{frag = B}); + Frag -> %% read more on the socket start_fragment_timer(setopts(S#transport{frag = Frag, flush = false})) end. @@ -804,9 +822,8 @@ flush(#transport{flush = false} = S) -> start_fragment_timer(S#transport{flush = true}); %% No messages since last expiry. -flush(#transport{frag = Frag, parent = Pid} = S) -> - diameter_peer:recv(Pid, bin(Frag)), - S#transport{frag = <<>>}. +flush(#transport{frag = Frag} = S) -> + message(recv, bin(Frag), S#transport{frag = <<>>}). %% start_fragment_timer/1 %% @@ -839,23 +856,19 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(false, #transport{}) -> %% ack - ok; - -send(#diameter_packet{bin = Bin}, S) -> - send(Bin, S); - -send(Bin, #transport{socket = Sock, module = M, send = false}) -> - send1(M, Sock, Bin); +send(Bin, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) -> + send1(M, Sock, Bin), + B andalso (TPid ! Bin); -send(Bin, #monitor{socket = Sock, module = M}) -> - send1(M, Sock, Bin); +send(Bin, #transport{socket = Sock, module = M, send = false} = S) -> + send1(M, Sock, Bin), + message(ack, Bin, S); %% Send from the monitor process to avoid deadlock if both the %% receiver and the peer were to block in send. -send(Bin, #transport{send = Pid}) -> +send(Bin, #transport{send = Pid} = S) -> Pid ! Bin, - ok. + S. %% send1/3 @@ -866,7 +879,7 @@ send1(Mod, Sock, Bin) -> {error, Reason} -> x({send, Reason}) end. - + %% send/3 send(gen_tcp, Sock, Bin) -> @@ -888,12 +901,18 @@ setopts(M, Sock, Opts) -> %% setopts/1 setopts(#transport{socket = Sock, + active = A, + recv = B, module = M} - = S)-> + = S) + when B, not A -> case setopts(M, Sock, [{active, once}]) of - ok -> S; + ok -> S#transport{active = true}; X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect - end. + end; + +setopts(S) -> + S. %% portnr/2 @@ -928,3 +947,78 @@ getstat(gen_tcp, Sock) -> getstat(M, Sock) -> M:getstat(Sock). %% Note that ssl:getstat/1 doesn't yet exist in R15B01. + +%% A message_cb is invoked whenever a message is sent or received, or +%% to provide acknowledgement of a completed send or discarded +%% request. Ignoring possible extra arguments, calls are of the +%% following form. +%% +%% cb(recv, Bin) Pass a received message into diameter? +%% cb(send, Bin) Send a message? +%% cb(ack, Bin) Acknowledgement of a completed send. +%% cb(ack, false) Acknowledgement of a discarded request. +%% +%% Callbacks return a list of the following form. +%% +%% [boolean() | send | recv | binary()] +%% +%% The atoms are meaningless by themselves, but say whether subsequent +%% binaries are to be sent or received. A boolean says whether or not +%% to continue reading on the socket. Messages can be received even +%% after false is returned if these arrived in the same packet. A +%% leading recv or send is implicit on the corresponding callbacks. A +%% new callback can be returned as the tail of a returned list: any +%% value not of the aforementioned list type is interpreted as a +%% callback. + +%% message/3 + +message(send, false = M, S) -> + message(ack, M, S); + +message(ack, _, #transport{message_cb = false} = S) -> + S; + +message(Dir, #diameter_packet{bin = Bin}, S) -> + message(Dir, Bin, S); + +message(Dir, Bin, #transport{message_cb = CB} = S) -> + recv(<<>>, actions(cb(CB, Dir, Bin), Dir, S)). + +%% actions/3 + +actions([], _, S) -> + S; + +actions([B | As], Dir, S) + when is_boolean(B) -> + actions(As, Dir, S#transport{recv = B}); + +actions([Dir | As], _, S) + when Dir == send; + Dir == recv -> + actions(As, Dir, S); + +actions([Bin | As], send = Dir, #transport{} = S) + when is_binary(Bin) -> + actions(As, Dir, send(Bin, S)); + +actions([Bin | As], recv = Dir, #transport{parent = Pid} = S) + when is_binary(Bin) -> + diameter_peer:recv(Pid, Bin), + actions(As, Dir, S); + +actions([{defer, Tmo, Acts} | As], Dir, S) -> + erlang:send_after(Tmo, self(), {actions, Dir, Acts}), + actions(As, Dir, S); + +actions(CB, _, S) -> + S#transport{message_cb = CB}. + +%% cb/3 + +cb(false, _, Bin) -> + [Bin]; + +cb(CB, Dir, Msg) -> + diameter_lib:eval([CB, Dir, Msg]). -- cgit v1.2.3 From ff367b200930d5107666e0a059d09e3218740567 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Tue, 21 Mar 2017 13:01:15 +0100 Subject: Remove upgrade from diameter_traffic --- lib/diameter/src/base/diameter_traffic.erl | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index 96f3a307f9..ccfab22e9c 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -54,9 +54,6 @@ -define(RELAY, ?DIAMETER_DICT_RELAY). -define(BASE, ?DIAMETER_DICT_COMMON). %% Note: the RFC 3588 dictionary --define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests --define(DEFAULT_SPAWN_OPTS, []). - %% Table containing outgoing entries that live and die with %% peer_up/down. The name is historic, since the table used to contain %% information about outgoing requests for which an answer has yet to @@ -67,7 +64,7 @@ -record(options, {filter = none :: diameter:peer_filter(), extra = [] :: list(), - timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF, + timeout = 5000 :: 0..16#FFFFFFFF, %% for outgoing requests detach = false :: boolean()}). %% Term passed back to receive_message/6 with every incoming message. @@ -211,11 +208,13 @@ incr_rc(Dir, Pkt, TPid, Dict0) -> %% Handle an incoming Diameter message in a watchdog process. %% --------------------------------------------------------------------------- --spec receive_message(pid(), Route, #diameter_packet{}, module(), #recvdata{}) +-spec receive_message(pid(), Route, #diameter_packet{}, module(), RecvData) -> pid() | boolean() when Route :: {Handler, RequestRef, Seqs} | Ack, + RecvData :: {[SpawnOpt], #recvdata{}}, + SpawnOpt :: term(), Handler :: pid(), RequestRef :: reference(), Seqs :: {0..16#FFFFFFFF, 0..16#FFFFFFFF}, @@ -255,11 +254,6 @@ recv(false, false, TPid, Pkt, _, _) -> %% spawn_request/5 spawn_request(Ack, TPid, Pkt, Dict0, {Opts, RecvData}) -> - spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData); -spawn_request(Ack, TPid, Pkt, Dict0, RecvData) -> - spawn_request(Ack, TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData). - -spawn_request(Ack, TPid, Pkt, Dict0, Opts, RecvData) -> spawn_opt(fun() -> recv_request(Ack, TPid, Pkt, Dict0, RecvData) end, -- cgit v1.2.3 From 84bfb4980a5d6dd806cff07c8dc1c9f2ef85fc20 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 12 Jun 2017 15:35:22 +0200 Subject: Let diameter_tcp send/recv callbacks deal in diameter_packet To let a recv callback for an incoming request set transport_data and have it returned in a send callback. --- lib/diameter/src/transport/diameter_tcp.erl | 73 ++++++++++++++++------------- 1 file changed, 40 insertions(+), 33 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 5819d52bdc..a2f393d5d4 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -523,9 +523,10 @@ getr(Key) -> %% Transition monitor state. %% Outgoing message. -m(Bin, S) - when is_binary(Bin) -> - send(Bin, S), +m(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + send(Msg, S), S; %% Transport has established a connection. Stop monitoring on the @@ -632,9 +633,10 @@ transition({diameter, {send, Msg}}, #transport{} = S) -> message(send, Msg, S); %% Monitor has sent an outgoing message. -transition(Bin, S) - when is_binary(Bin) -> - message(ack, Bin, S); +transition(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + message(ack, Msg, S); %% Deferred actions from a message_cb. transition({actions, Dir, Acts}, S) -> @@ -856,22 +858,25 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(Bin, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) -> - send1(M, Sock, Bin), - B andalso (TPid ! Bin); +send(Msg, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) -> + send1(M, Sock, Msg), + B andalso (TPid ! Msg); -send(Bin, #transport{socket = Sock, module = M, send = false} = S) -> - send1(M, Sock, Bin), - message(ack, Bin, S); +send(Msg, #transport{socket = Sock, module = M, send = false} = S) -> + send1(M, Sock, Msg), + message(ack, Msg, S); %% Send from the monitor process to avoid deadlock if both the %% receiver and the peer were to block in send. -send(Bin, #transport{send = Pid} = S) -> - Pid ! Bin, +send(Msg, #transport{send = Pid} = S) -> + Pid ! Msg, S. %% send1/3 +send1(Mod, Sock, #diameter_packet{bin = Bin}) -> + send1(Mod, Sock, Bin); + send1(Mod, Sock, Bin) -> case send(Mod, Sock, Bin) of ok -> @@ -953,17 +958,20 @@ getstat(M, Sock) -> %% request. Ignoring possible extra arguments, calls are of the %% following form. %% -%% cb(recv, Bin) Pass a received message into diameter? -%% cb(send, Bin) Send a message? -%% cb(ack, Bin) Acknowledgement of a completed send. +%% cb(recv, Msg) Receive a message into diameter? +%% cb(send, Msg) Send a message on the socket? +%% cb(ack, Msg) Acknowledgement of a completed send. %% cb(ack, false) Acknowledgement of a discarded request. %% -%% Callbacks return a list of the following form. +%% Msg will be binary() in a recv callback, but can be a +%% diameter_packet record in a send/ack callback if a recv/send +%% callback returns a record. Callbacks return a list of the following +%% form. %% -%% [boolean() | send | recv | binary()] +%% [boolean() | send | recv | binary() | #diameter_packet{}] %% %% The atoms are meaningless by themselves, but say whether subsequent -%% binaries are to be sent or received. A boolean says whether or not +%% messages are to be sent or received. A boolean says whether or not %% to continue reading on the socket. Messages can be received even %% after false is returned if these arrived in the same packet. A %% leading recv or send is implicit on the corresponding callbacks. A @@ -979,11 +987,8 @@ message(send, false = M, S) -> message(ack, _, #transport{message_cb = false} = S) -> S; -message(Dir, #diameter_packet{bin = Bin}, S) -> - message(Dir, Bin, S); - -message(Dir, Bin, #transport{message_cb = CB} = S) -> - recv(<<>>, actions(cb(CB, Dir, Bin), Dir, S)). +message(Dir, Msg, #transport{message_cb = CB} = S) -> + recv(<<>>, actions(cb(CB, Dir, Msg), Dir, S)). %% actions/3 @@ -999,13 +1004,15 @@ actions([Dir | As], _, S) Dir == recv -> actions(As, Dir, S); -actions([Bin | As], send = Dir, #transport{} = S) - when is_binary(Bin) -> - actions(As, Dir, send(Bin, S)); +actions([Msg | As], send = Dir, S) + when is_binary(Msg); + is_record(Msg, diameter_packet) -> + actions(As, Dir, send(Msg, S)); -actions([Bin | As], recv = Dir, #transport{parent = Pid} = S) - when is_binary(Bin) -> - diameter_peer:recv(Pid, Bin), +actions([Msg | As], recv = Dir, #transport{parent = Pid} = S) + when is_binary(Msg); + is_record(Msg, diameter_packet) -> + diameter_peer:recv(Pid, Msg), actions(As, Dir, S); actions([{defer, Tmo, Acts} | As], Dir, S) -> @@ -1017,8 +1024,8 @@ actions(CB, _, S) -> %% cb/3 -cb(false, _, Bin) -> - [Bin]; +cb(false, _, Msg) -> + [Msg]; cb(CB, Dir, Msg) -> diameter_lib:eval([CB, Dir, Msg]). -- cgit v1.2.3 From 373cd07c28bbe3e299eaca1c96b1441623ad4979 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Sun, 11 Jun 2017 01:13:36 +0200 Subject: Add diameter_sctp send/recv callbacks Corresponding to diameter_tcp callbacks a few commits back. Exercise the callbacks in the traffic suite. --- lib/diameter/src/transport/diameter_sctp.erl | 183 ++++++++++++++++++++------- 1 file changed, 137 insertions(+), 46 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 3919596cb1..470a6aa9bf 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -59,9 +59,6 @@ %% The default port for a listener. -define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 -%% Remote addresses to accept connections from. --define(DEFAULT_ACCEPT, []). %% any - %% How long to wait for a transport process to attach after %% association establishment. -define(ACCEPT_TIMEOUT, 5000). @@ -80,7 +77,8 @@ | term(). %% gen_sctp:open_option(). -type option() :: {sender, boolean()} - | sender. + | sender + | {message_cb, false | diameter:evaluable()}. -type uint() :: non_neg_integer(). @@ -93,6 +91,8 @@ %% {RAs, RP, Errors} | connect, socket :: gen_sctp:sctp_socket() | undefined, + active = false :: boolean(), %% is socket active? + recv = true :: boolean(), %% should it be active? assoc_id :: gen_sctp:assoc_id() %% association identifier | undefined | true, @@ -101,11 +101,13 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream + message_cb :: false | diameter:evaluable(), send = false :: pid() | boolean()}). %% sending process %% Monitor process state. -record(monitor, {transport :: pid(), + ack = false :: boolean(), socket :: gen_sctp:sctp_socket(), assoc_id :: gen_sctp:assoc_id()}). %% next output stream @@ -115,8 +117,7 @@ socket :: gen_sctp:sctp_socket(), service :: pid(), %% service process pending = {0, queue:new()}, - accept :: [match()], - sender :: boolean()}). + opts :: [[match()] | boolean() | diameter:evaluable()]}). %% Field pending implements two queues: the first of transport-to-be %% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived @@ -242,7 +243,7 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {Split, Rest} = proplists:split(Opts, [accept, sender]), + {Split, Rest} = proplists:split(Opts, [accept, sender, message_cb]), OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), @@ -251,13 +252,17 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> #listener{ref = Ref, service = SvcPid, socket = Sock, - accept = [[M] || {accept, M} <- OwnOpts], - sender = proplists:get_value(sender, OwnOpts, false)}; + opts = [[[M] || {accept, M} <- OwnOpts] + | [proplists:get_value(K, OwnOpts, false) + || K <- [sender, message_cb]]]}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> - {[Ps | Split], Rest} = proplists:split(Opts, [rport, raddr, sender]), + {[Ps | Split], Rest} + = proplists:split(Opts, [rport, raddr, sender, message_cb]), OwnOpts = lists:append(Split), + CB = proplists:get_value(message_cb, OwnOpts, false), + false == CB orelse (Pid ! {diameter, ack}), RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts], [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], {LAs, Sock} = open(Addrs, Rest, 0), @@ -267,6 +272,7 @@ i({connect, Pid, Opts, Addrs, Ref}) -> #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock, + message_cb = CB, send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an @@ -301,12 +307,15 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> receive {Ref, Pid} when K == parent -> %% transport process started S#transport{parent = Pid}; - {K, T, Matches, Bool} when K == peeloff -> %% association + {K, T, Opts} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, + [Matches, Sender, CB] = Opts, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), + false == CB orelse (S#transport.parent ! {diameter, ack}), t(T, S#transport{socket = Sock, - send = Bool}); + message_cb = CB, + send = Sender}); accept_timeout = T -> x(T); {'DOWN', _, process, _, _} = T -> @@ -477,12 +486,11 @@ getr(Key) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, - accept = Matches, - sender = Sender} + opts = Opts} = S) -> Id = assoc_id(Data), {TPid, NewS} = accept(S), - TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches, Sender}, + TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Opts}, setopts(Sock), NewS; @@ -536,12 +544,21 @@ t(T,S) -> %% Incoming message. transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> - setopts(Sock), - recv(Data, S); + setopts(S, recv(Data, S#transport{active = false})); %% Outgoing message. transition({diameter, {send, Msg}}, S) -> - send(Msg, S); + message(send, Msg, S); + +%% Monitor has sent an outgoing message. +transition(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + message(ack, Msg, S); + +%% Deferred actions from a message_cb. +transition({actions, Dir, Acts}, S) -> + actions(Acts, Dir, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> @@ -581,8 +598,12 @@ transition({resolve_port, Pid}, #transport{socket = Sock}) %% m/2 -m({Bin, StreamId}, #monitor{} = S) -> - send(StreamId, Bin, S); +m({Msg, StreamId}, #monitor{socket = Sock, + transport = TPid, + assoc_id = AId, + ack = B}) -> + send(Sock, AId, StreamId, Msg), + B andalso (TPid ! Msg); m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) -> x(T). @@ -632,48 +653,47 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% Start monitor process on first send. send(Msg, #transport{send = true, socket = Sock, - assoc_id = AId} + assoc_id = AId, + message_cb = CB} = S) -> {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(), socket = Sock, - assoc_id = AId}), + assoc_id = AId, + ack = false /= CB}), monitor(process, MPid), send(Msg, S#transport{send = MPid}); %% Outbound Diameter message on a specified stream ... -send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, +send(#diameter_packet{transport_data = {outstream, SId}} + = Msg, #transport{streams = {_, OS}} = S) -> - send(SId rem OS, Bin, S), - S; + send(SId rem OS, Msg, S); %% ... or not: rotate through all streams. -send(#diameter_packet{bin = Bin}, S) -> - send(Bin, S); -send(Bin, #transport{streams = {_, OS}, +send(Msg, #transport{streams = {_, OS}, os = N} - = S) - when is_binary(Bin) -> - send(N, Bin, S), - S#transport{os = (N + 1) rem OS}. + = S) -> + send(N, Msg, S#transport{os = (N + 1) rem OS}). %% send/3 -send(StreamId, Bin, #transport{send = false, +send(StreamId, Msg, #transport{send = false, socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin); - -send(StreamId, Bin, #transport{send = MPid}) -> - MPid ! {Bin, StreamId}, - MPid; + assoc_id = AId} + = S) -> + send(Sock, AId, StreamId, Msg), + message(ack, Msg, S); -send(StreamId, Bin, #monitor{socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin). +send(StreamId, Msg, #transport{send = MPid} = S) -> + MPid ! {Msg, StreamId}, + S. %% send/4 +send(Sock, AssocId, StreamId, #diameter_packet{bin = Bin}) -> + send(Sock, AssocId, StreamId, Bin); + send(Sock, AssocId, StreamId, Bin) -> case gen_sctp:send(Sock, AssocId, StreamId, Bin) of ok -> @@ -720,11 +740,10 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} recv(T, S#transport{assoc_id = Id}); %% Inbound Diameter message. -recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid} = S) +recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, S) when is_binary(Bin) -> - diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id}, - bin = Bin}), - S; + Pkt = #diameter_packet{bin = Bin, transport_data = {stream, Id}}, + message(recv, Pkt, S); recv({_, #sctp_shutdown_event{}}, _) -> stop; @@ -842,6 +861,23 @@ connect(Sock, [Addr | AT] = As, Port, Reasons) -> connect(Sock, AT, Port, [{Addr, E} | Reasons]) end. +%% setopts/2 + +setopts(_, #transport{socket = Sock, + active = A, + recv = B} + = S) + when B, not A -> + setopts(Sock), + S#transport{active = true}; + +setopts(_, #transport{} = S) -> + S; + +setopts(#transport{socket = Sock}, T) -> + setopts(Sock), + T. + %% setopts/1 setopts(Sock) -> @@ -849,3 +885,58 @@ setopts(Sock) -> ok -> ok; X -> x({setopts, Sock, X}) %% possibly on peer disconnect end. + +%% A message_cb is invoked whenever a message is sent or received, or +%% to provide acknowledgement of a completed send or discarded +%% request. See diameter_tcp for semantics. + +%% message/3 + +message(send, false = M, S) -> + message(ack, M, S); + +message(ack, _, #transport{message_cb = false} = S) -> + S; + +message(Dir, Msg, #transport{message_cb = CB} = S) -> + setopts(S, actions(cb(CB, Dir, Msg), Dir, S)). + +%% actions/3 + +actions([], _, S) -> + S; + +actions([B | As], Dir, S) + when is_boolean(B) -> + actions(As, Dir, S#transport{recv = B}); + +actions([Dir | As], _, S) + when Dir == send; + Dir == recv -> + actions(As, Dir, S); + +actions([Msg | As], send = Dir, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + actions(As, Dir, send(Msg, S)); + +actions([Msg | As], recv = Dir, #transport{parent = Pid} = S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + diameter_peer:recv(Pid, Msg), + actions(As, Dir, S); + +actions([{defer, Tmo, Acts} | As], Dir, S) -> + erlang:send_after(Tmo, self(), {actions, Dir, Acts}), + actions(As, Dir, S); + +actions(CB, _, S) -> + S#transport{message_cb = CB}. + +%% cb/3 + +cb(false, _, Msg) -> + [Msg]; + +cb(CB, Dir, Msg) -> + diameter_lib:eval([CB, Dir, Msg]). -- cgit v1.2.3 From c591056bb2ce9147cc946e068e980050be67dcc1 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 12 Jun 2017 15:26:45 +0200 Subject: Add diameter_sctp option packet To determine the wrapping of messages passed to recv callbacks and into diameter. The default passing of the input stream in transport_data is probably of no practical use, but has been set since time immemorial. --- lib/diameter/src/transport/diameter_sctp.erl | 53 ++++++++++++++++++++++------ 1 file changed, 42 insertions(+), 11 deletions(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 470a6aa9bf..e47febaf99 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -78,6 +78,7 @@ -type option() :: {sender, boolean()} | sender + | {packet, boolean() | raw} | {message_cb, false | diameter:evaluable()}. -type uint() :: non_neg_integer(). @@ -101,6 +102,8 @@ streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, os = 0 :: uint(), %% next output stream + packet = true :: boolean() %% legacy transport_data? + | raw, message_cb :: false | diameter:evaluable(), send = false :: pid() | boolean()}). %% sending process @@ -243,7 +246,8 @@ i(#monitor{transport = TPid} = S) -> i({listen, Ref, {Opts, SvcPid, Addrs}}) -> monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {Split, Rest} = proplists:split(Opts, [accept, sender, message_cb]), + {Split, Rest} + = proplists:split(Opts, [accept, packet, sender, message_cb]), OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), @@ -252,14 +256,15 @@ i({listen, Ref, {Opts, SvcPid, Addrs}}) -> #listener{ref = Ref, service = SvcPid, socket = Sock, - opts = [[[M] || {accept, M} <- OwnOpts] + opts = [[[M] || {accept, M} <- OwnOpts], + proplists:get_value(packet, OwnOpts, true) | [proplists:get_value(K, OwnOpts, false) || K <- [sender, message_cb]]]}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> {[Ps | Split], Rest} - = proplists:split(Opts, [rport, raddr, sender, message_cb]), + = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]), OwnOpts = lists:append(Split), CB = proplists:get_value(message_cb, OwnOpts, false), false == CB orelse (Pid ! {diameter, ack}), @@ -273,6 +278,7 @@ i({connect, Pid, Opts, Addrs, Ref}) -> mode = {connect, connect(Sock, RAs, RP, [])}, socket = Sock, message_cb = CB, + packet = proplists:get_value(packet, OwnOpts, true), send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an @@ -309,12 +315,13 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> S#transport{parent = Pid}; {K, T, Opts} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, - [Matches, Sender, CB] = Opts, + [Matches, Packet, Sender, CB] = Opts, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), false == CB orelse (S#transport.parent ! {diameter, ack}), t(T, S#transport{socket = Sock, message_cb = CB, + packet = Packet, send = Sender}); accept_timeout = T -> x(T); @@ -740,10 +747,9 @@ recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} recv(T, S#transport{assoc_id = Id}); %% Inbound Diameter message. -recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, S) +recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S) when is_binary(Bin) -> - Pkt = #diameter_packet{bin = Bin, transport_data = {stream, Id}}, - message(recv, Pkt, S); + message(recv, Msg, S); recv({_, #sctp_shutdown_event{}}, _) -> stop; @@ -888,7 +894,9 @@ setopts(Sock) -> %% A message_cb is invoked whenever a message is sent or received, or %% to provide acknowledgement of a completed send or discarded -%% request. See diameter_tcp for semantics. +%% request. See diameter_tcp for semantics, the only difference being +%% that a recv callback can get a diameter_packet record as Msg +%% depending on how/if option packet has been specified. %% message/3 @@ -898,8 +906,8 @@ message(send, false = M, S) -> message(ack, _, #transport{message_cb = false} = S) -> S; -message(Dir, Msg, #transport{message_cb = CB} = S) -> - setopts(S, actions(cb(CB, Dir, Msg), Dir, S)). +message(Dir, Msg, S) -> + setopts(S, actions(cb(S, Dir, Msg), Dir, S)). %% actions/3 @@ -935,8 +943,31 @@ actions(CB, _, S) -> %% cb/3 -cb(false, _, Msg) -> +cb(#transport{message_cb = false, packet = P}, recv, Msg) -> + [pkt(P, true, Msg)]; + +cb(#transport{message_cb = CB, packet = P}, recv = D, Msg) -> + cb(CB, D, pkt(P, false, Msg)); + +cb(#transport{message_cb = CB}, Dir, Msg) -> + cb(CB, Dir, Msg); + +cb(false, send, Msg) -> [Msg]; cb(CB, Dir, Msg) -> diameter_lib:eval([CB, Dir, Msg]). + +%% pkt/3 + +pkt(false, _, {_Info, Bin}) -> + Bin; + +pkt(true, _, {[#sctp_sndrcvinfo{stream = Id}], Bin}) -> + #diameter_packet{bin = Bin, transport_data = {stream, Id}}; + +pkt(raw, true, {[Info], Bin}) -> + #diameter_packet{bin = Bin, transport_data = Info}; + +pkt(raw, false, {[_], _} = Msg) -> + Msg. -- cgit v1.2.3 From 19ad1805bfc895b12ea692e713cbfd252c44d147 Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 12 Jun 2017 20:42:52 +0200 Subject: Fix dialyzer warnings diameter_sctp.erl:292: Record construction #transport{parent::pid(),mode::{'accept',atom() | pid() | port() | {atom(),atom()}},active::'false',recv::'true',os::0,packet::'true',message_cb::'undefined',send::'false'} violates the declared type of field message_cb::'false' | fun() | maybe_improper_list(fun() | maybe_improper_list(any(),[any()]) | {atom(),atom(),[any()]},[any()]) | {atom(),atom(),[any()]} diameter_sctp.erl:302: Record construction #transport{mode::{'accept',atom() | pid() | port() | {atom(),atom()}},active::'false',recv::'true',os::0,packet::'true',message_cb::'undefined',send::'false'} violates the declared type of field message_cb::'false' | fun() | maybe_improper_list(fun() | maybe_improper_list(any(),[any()]) | {atom(),atom(),[any()]},[any()]) | {atom(),atom(),[any()]} --- lib/diameter/src/transport/diameter_sctp.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'lib/diameter/src') diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index e47febaf99..f243f2b7cd 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -104,7 +104,7 @@ os = 0 :: uint(), %% next output stream packet = true :: boolean() %% legacy transport_data? | raw, - message_cb :: false | diameter:evaluable(), + message_cb = false :: false | diameter:evaluable(), send = false :: pid() | boolean()}). %% sending process %% Monitor process state. -- cgit v1.2.3