aboutsummaryrefslogtreecommitdiffstats
path: root/lib/diameter/src
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2017-06-14 09:29:05 +0200
committerAnders Svensson <[email protected]>2017-06-14 09:29:05 +0200
commit1bf842f3cd603ddd6246d874e188e4f75b0cc692 (patch)
treed47b488ce7b0e6402241ac99ee4161e0ebffee6b /lib/diameter/src
parentd4fea060349a72fb58267e82c2d6bfa7b638b2c9 (diff)
parent69c5a74179e13e145da3da70e02dd43881a82008 (diff)
downloadotp-1bf842f3cd603ddd6246d874e188e4f75b0cc692.tar.gz
otp-1bf842f3cd603ddd6246d874e188e4f75b0cc692.tar.bz2
otp-1bf842f3cd603ddd6246d874e188e4f75b0cc692.zip
Merge branch 'anders/diameter/transport/ERL-332'
* anders/diameter/transport/ERL-332: (35 commits) Capitulate on SCTP vs sparc-sun-solaris2.10 Remove obsolete traffic testcase Fix dialyzer warnings Remove client/server string decode from traffic suite Add diameter_sctp option packet Add diameter_sctp send/recv callbacks Let diameter_tcp send/recv callbacks deal in diameter_packet Randomly select traffic testcases Exercise diameter_tcp message callbacks in traffic suite Exercise diameter_{tcp,sctp} sender in traffic suite Remove upgrade from diameter_traffic Add diameter_tcp send/recv callbacks Make diameter_{tcp,sctp} sender configurable Remove upgrade from diameter_sctp; tweak diameter_tcp to match Fix incomprehensible dialyzer warning Simplify acks to transport processes Strip throttling callbacks from diameter_tcp Deal with (another) SCTP association id quirk on Solaris Use binary:copy/2 when generating largish data in test suites Deal with SCTP association id quirk on Solaris ...
Diffstat (limited to 'lib/diameter/src')
-rw-r--r--lib/diameter/src/base/diameter_config.erl2
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl146
-rw-r--r--lib/diameter/src/base/diameter_reg.erl2
-rw-r--r--lib/diameter/src/base/diameter_traffic.erl84
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl39
-rw-r--r--lib/diameter/src/transport/diameter_sctp.erl319
-rw-r--r--lib/diameter/src/transport/diameter_sctp_sup.erl3
-rw-r--r--lib/diameter/src/transport/diameter_tcp.erl491
8 files changed, 665 insertions, 421 deletions
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl
index e10804c931..cea671f275 100644
--- a/lib/diameter/src/base/diameter_config.erl
+++ b/lib/diameter/src/base/diameter_config.erl
@@ -277,7 +277,7 @@ start_link() ->
start_link(T) ->
proc_lib:start_link(?MODULE, init, [T], infinity, []).
-
+
state() ->
call(state).
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 46d231da74..601e48e817 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -129,6 +129,7 @@
%% the request was sent explicitly with
%% diameter:call/4.
strict :: boolean(),
+ ack = false :: boolean(),
length_errors :: exit | handle | discard,
incoming_maxlen :: integer() | infinity}).
@@ -235,7 +236,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) ->
Tmo = proplists:get_value(capx_timeout, Opts, ?CAPX_TIMEOUT),
Strictness = proplists:get_value(capx_strictness, Opts, true),
- OnLengthErr = proplists:get_value(length_errors, Opts, exit),
+ LengthErr = proplists:get_value(length_errors, Opts, exit),
{TPid, Addrs} = start_transport(T, Rest, Svc),
@@ -247,7 +248,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) ->
dictionary = Dict0,
mode = M,
service = svc(Svc, Addrs),
- length_errors = OnLengthErr,
+ length_errors = LengthErr,
strict = Strictness,
incoming_maxlen = Maxlen}.
%% The transport returns its local ip addresses so that different
@@ -442,9 +443,18 @@ transition({connection_timeout = T, TPid},
transition({connection_timeout, _}, _) ->
ok;
+%% Requests for acknowledgements to the transport.
+transition({diameter, ack}, S) ->
+ S#state{ack = true};
+
%% Incoming message from the transport.
-transition({diameter, {recv, MsgT}}, S) ->
- incoming(MsgT, S);
+transition({diameter, {recv, Msg}}, S) ->
+ incoming(recv(Msg, S), S);
+
+%% Handler of an incoming request is telling of its existence.
+transition({handler, Pid}, _) ->
+ put_route(Pid),
+ ok;
%% Timeout when still in the same state ...
transition({timeout = T, PS}, #state{state = PS}) ->
@@ -458,7 +468,7 @@ transition({timeout, _}, _) ->
transition({send, Msg}, S) ->
outgoing(Msg, S);
transition({send, Msg, Route}, S) ->
- put_route(Route),
+ route_outgoing(Route),
outgoing(Msg, S);
%% Request for graceful shutdown at remove_transport, stop_service of
@@ -487,12 +497,13 @@ transition({'DOWN', _, process, WPid, _},
transition({'DOWN', _, process, TPid, _},
#state{transport = TPid}
= S) ->
- start_next(S);
+ start_next(S#state{ack = false});
%% Transport has died after connection timeout, or handler process has
%% died.
-transition({'DOWN', _, process, Pid, _}, _) ->
- erase_route(Pid),
+transition({'DOWN', _, process, Pid, _}, #state{transport = TPid}) ->
+ is_reference(erase_route(Pid))
+ andalso send(TPid, false), %% answer not forthcoming
ok;
%% State query.
@@ -502,37 +513,56 @@ transition({state, Pid}, #state{state = S, transport = TPid}) ->
%% Crash on anything unexpected.
-%% put_route/1
-%%
+%% route_outgoing/1
+
%% Map identifiers in an outgoing request to be able to lookup the
%% handler process when the answer is received.
-
-put_route({Pid, Ref, Seqs}) ->
+route_outgoing({Pid, Ref, Seqs}) -> %% request
MRef = monitor(process, Pid),
put(Pid, Seqs),
- put(Seqs, {Pid, Ref, MRef}).
+ put(Seqs, {Pid, Ref, MRef});
+
+%% Remove a mapping made for an incoming request.
+route_outgoing(Pid)
+ when is_pid(Pid) -> %% answer
+ MRef = erase_route(Pid),
+ undefined == MRef orelse demonitor(MRef).
-%% get_route/1
+%% put_route/1
+
+%% Monitor on a handler process for an incoming request.
+put_route(Pid) ->
+ MRef = monitor(process, Pid),
+ put(Pid, MRef).
-get_route(#diameter_packet{header = #diameter_header{is_request = false}}
- = Pkt) ->
+%% get_route/2
+
+%% incoming answer
+get_route(_, #diameter_packet{header = #diameter_header{is_request = false}}
+ = Pkt) ->
Seqs = diameter_codec:sequence_numbers(Pkt),
case erase(Seqs) of
{Pid, Ref, MRef} ->
demonitor(MRef),
erase(Pid),
{Pid, Ref, self()};
- undefined ->
+ undefined -> %% request unknown
false
end;
-get_route(_) ->
- false.
+%% incoming request
+get_route(Ack, _) ->
+ Ack.
%% erase_route/1
erase_route(Pid) ->
- erase(erase(Pid)).
+ case erase(Pid) of
+ {_,_} = Seqs ->
+ erase(Seqs);
+ T ->
+ T
+ end.
%% capx/1
@@ -611,29 +641,24 @@ encode(Rec, Dict) ->
%% incoming/2
-incoming({Msg, NPid}, S) ->
- try recv(Msg, S) of
- T ->
- NPid ! {diameter, discard},
- T
- catch
- {?MODULE, Name, Pkt} ->
- incoming(Name, Pkt, NPid, S)
- end;
+incoming({recv = T, Name, Pkt}, #state{parent = Pid, ack = Ack} = S) ->
+ Pid ! {T, self(), get_route(Ack, Pkt), Name, Pkt},
+ rcv(Name, Pkt, S);
-incoming(Msg, S) ->
- try
- recv(Msg, S)
- catch
- {?MODULE, Name, Pkt} ->
- incoming(Name, Pkt, false, S)
- end.
+incoming(#diameter_header{is_request = R}, #state{transport = TPid,
+ ack = Ack}) ->
+ R andalso Ack andalso send(TPid, false),
+ ok;
+
+incoming(<<_:32, 1:1, _/bits>>, #state{ack = true} = S) ->
+ send(S#state.transport, false),
+ ok;
-%% incoming/4
+incoming(<<_/bits>>, _) ->
+ ok;
-incoming(Name, Pkt, NPid, #state{parent = Pid} = S) ->
- Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid},
- rcv(Name, Pkt, S).
+incoming(T, _) ->
+ T.
%% recv/2
@@ -658,18 +683,19 @@ recv1(_,
#diameter_packet{header = H, bin = Bin},
#state{incoming_maxlen = M})
when M < size(Bin) ->
- invalid(false, incoming_maxlen_exceeded, {size(Bin), H});
+ invalid(false, incoming_maxlen_exceeded, {size(Bin), H}),
+ H;
%% Ignore anything but an expected CER/CEA if so configured. This is
%% non-standard behaviour.
-recv1(Name, _, #state{state = {'Wait-CEA', _, _},
- strict = false})
+recv1(Name, #diameter_packet{header = H}, #state{state = {'Wait-CEA', _, _},
+ strict = false})
when Name /= 'CEA' ->
- ok;
-recv1(Name, _, #state{state = recv_CER,
- strict = false})
+ H;
+recv1(Name, #diameter_packet{header = H}, #state{state = recv_CER,
+ strict = false})
when Name /= 'CER' ->
- ok;
+ H;
%% Incoming request after outgoing DPR: discard. Don't discard DPR, so
%% both ends don't do so when sending simultaneously.
@@ -677,13 +703,15 @@ recv1(Name,
#diameter_packet{header = #diameter_header{is_request = true} = H},
#state{dpr = {_,_,_}})
when Name /= 'DPR' ->
- invalid(false, recv_after_outgoing_dpr, H);
+ invalid(false, recv_after_outgoing_dpr, H),
+ H;
%% Incoming request after incoming DPR: discard.
recv1(_,
#diameter_packet{header = #diameter_header{is_request = true} = H},
#state{dpr = true}) ->
- invalid(false, recv_after_incoming_dpr, H);
+ invalid(false, recv_after_incoming_dpr, H),
+ H;
%% DPA with identifier mismatch, or in response to a DPR initiated by
%% the service.
@@ -701,7 +729,7 @@ recv1('DPA' = N,
%% Any other message with a header and no length errors: send to the
%% parent.
recv1(Name, Pkt, #state{}) ->
- throw({?MODULE, Name, Pkt}).
+ {recv, Name, Pkt}.
%% recv/3
@@ -720,10 +748,12 @@ recv(#diameter_header{}
#diameter_packet{bin = Bin},
#state{length_errors = E}) ->
T = {size(Bin), bit_size(Bin) rem 8, H},
- invalid(E, message_length_mismatch, T);
+ invalid(E, message_length_mismatch, T),
+ Bin;
recv(false, #diameter_packet{bin = Bin}, #state{length_errors = E}) ->
- invalid(E, truncated_header, Bin).
+ invalid(E, truncated_header, Bin),
+ Bin.
%% Note that counters here only count discarded messages.
invalid(E, Reason, T) ->
@@ -779,14 +809,10 @@ rcv('DPA' = N,
diameter_peer:close(TPid),
{stop, N};
-%% Ignore anything else, an unsolicited DPA in particular. Note that
-%% dpa_timeout deals with the case in which the peer sends the wrong
-%% identifiers in DPA.
-rcv(N, #diameter_packet{header = H}, _)
- when N == 'CER';
- N == 'CEA';
- N == 'DPR';
- N == 'DPA' ->
+%% Ignore an unsolicited DPA in particular. Note that dpa_timeout
+%% deals with the case in which the peer sends the wrong identifiers
+%% in DPA.
+rcv('DPA' = N, #diameter_packet{header = H}, _) ->
?LOG(ignored, N),
%% Note that these aren't counted in the normal recv counter.
diameter_stats:incr({diameter_codec:msg_id(H), recv, ignored}),
diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl
index 9027130063..4910979219 100644
--- a/lib/diameter/src/base/diameter_reg.erl
+++ b/lib/diameter/src/base/diameter_reg.erl
@@ -137,7 +137,7 @@ match(Pat) ->
match(Pat, Pid) ->
ets:match_object(?TABLE, {Pat, Pid}).
-
+
%% ===========================================================================
%% # wait(Pat)
%%
diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl
index bc1ccf4feb..ccfab22e9c 100644
--- a/lib/diameter/src/base/diameter_traffic.erl
+++ b/lib/diameter/src/base/diameter_traffic.erl
@@ -30,7 +30,7 @@
-export([send_request/4]).
%% towards diameter_watchdog
--export([receive_message/6]).
+-export([receive_message/5]).
%% towards diameter_peer_fsm and diameter_watchdog
-export([incr/4,
@@ -54,9 +54,6 @@
-define(RELAY, ?DIAMETER_DICT_RELAY).
-define(BASE, ?DIAMETER_DICT_COMMON). %% Note: the RFC 3588 dictionary
--define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests
--define(DEFAULT_SPAWN_OPTS, []).
-
%% Table containing outgoing entries that live and die with
%% peer_up/down. The name is historic, since the table used to contain
%% information about outgoing requests for which an answer has yet to
@@ -67,7 +64,7 @@
-record(options,
{filter = none :: diameter:peer_filter(),
extra = [] :: list(),
- timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF,
+ timeout = 5000 :: 0..16#FFFFFFFF, %% for outgoing requests
detach = false :: boolean()}).
%% Term passed back to receive_message/6 with every incoming message.
@@ -93,7 +90,7 @@
packet :: #diameter_packet{} | undefined}). %% of request
%% ---------------------------------------------------------------------------
-%% # make_recvdata/1
+%% make_recvdata/1
%% ---------------------------------------------------------------------------
make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) ->
@@ -206,42 +203,38 @@ incr_rc(Dir, Pkt, TPid, Dict0) ->
incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}).
%% ---------------------------------------------------------------------------
-%% # receive_message/6
+%% receive_message/5
%%
-%% Handle an incoming Diameter message.
+%% Handle an incoming Diameter message in a watchdog process.
%% ---------------------------------------------------------------------------
-%% Handle an incoming Diameter message in the watchdog process.
-
-receive_message(TPid, Route, Pkt, false, Dict0, RecvData) ->
- incoming(TPid, Route, Pkt, Dict0, RecvData);
-
-receive_message(TPid, Route, Pkt, NPid, Dict0, RecvData) ->
- NPid ! {diameter, incoming(TPid, Route, Pkt, Dict0, RecvData)}.
-
-%% incoming/4
-
-incoming(TPid, Route, Pkt, Dict0, RecvData)
- when is_pid(TPid) ->
+-spec receive_message(pid(), Route, #diameter_packet{}, module(), RecvData)
+ -> pid()
+ | boolean()
+ when Route :: {Handler, RequestRef, Seqs}
+ | Ack,
+ RecvData :: {[SpawnOpt], #recvdata{}},
+ SpawnOpt :: term(),
+ Handler :: pid(),
+ RequestRef :: reference(),
+ Seqs :: {0..16#FFFFFFFF, 0..16#FFFFFFFF},
+ Ack :: boolean().
+
+receive_message(TPid, Route, Pkt, Dict0, RecvData) ->
#diameter_packet{header = #diameter_header{is_request = R}} = Pkt,
recv(R, Route, TPid, Pkt, Dict0, RecvData).
%% recv/6
%% Incoming request ...
-recv(true, false, TPid, Pkt, Dict0, T) ->
- try
- {request, spawn_request(TPid, Pkt, Dict0, T)}
- catch
- error: system_limit = E -> %% discard
- ?LOG(error, E),
- discard
- end;
+recv(true, Ack, TPid, Pkt, Dict0, T)
+ when is_boolean(Ack) ->
+ spawn_request(Ack, TPid, Pkt, Dict0, T);
%% ... answer to known request ...
recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) ->
Pid ! {answer, Ref, TPid, Dict0, Pkt},
- {answer, Pid};
+ true;
%% Note that failover could have happened prior to this message being
%% received and triggering failback. That is, both a failover message
@@ -256,23 +249,22 @@ recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) ->
recv(false, false, TPid, Pkt, _, _) ->
?LOG(discarded, Pkt#diameter_packet.header),
incr(TPid, {{unknown, 0}, recv, discarded}),
- discard.
-
-%% spawn_request/4
+ false.
-spawn_request(TPid, Pkt, Dict0, {Opts, RecvData}) ->
- spawn_request(TPid, Pkt, Dict0, Opts, RecvData);
-spawn_request(TPid, Pkt, Dict0, RecvData) ->
- spawn_request(TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData).
+%% spawn_request/5
-spawn_request(TPid, Pkt, Dict0, Opts, RecvData) ->
- spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts).
+spawn_request(Ack, TPid, Pkt, Dict0, {Opts, RecvData}) ->
+ spawn_opt(fun() ->
+ recv_request(Ack, TPid, Pkt, Dict0, RecvData)
+ end,
+ Opts).
%% ---------------------------------------------------------------------------
-%% recv_request/4
+%% recv_request/5
%% ---------------------------------------------------------------------------
-recv_request(TPid,
+recv_request(Ack,
+ TPid,
#diameter_packet{header = #diameter_header{application_id = Id}}
= Pkt,
Dict0,
@@ -280,6 +272,7 @@ recv_request(TPid,
apps = Apps,
codec = Opts}
= RecvData) ->
+ Ack andalso (TPid ! {handler, self()}),
diameter_codec:setopts([{common_dictionary, Dict0} | Opts]),
send_A(recv_R(diameter_service:find_incoming_app(PeerT, TPid, Id, Apps),
TPid,
@@ -511,7 +504,7 @@ send_A(T, TPid, {AppDict, Dict0} = DictT0, ReqPkt, EvalPktFs, EvalFs) ->
{MsgDict, Pkt} = reply(T, TPid, DictT0, EvalPktFs, ReqPkt),
incr(send, Pkt, TPid, AppDict),
incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}), %% count outgoing
- send(TPid, Pkt),
+ send(TPid, Pkt, _Route = self()),
lists:foreach(fun diameter_lib:eval/1, EvalFs).
%% answer/6
@@ -1207,7 +1200,7 @@ x(T) ->
exit(T).
%% ---------------------------------------------------------------------------
-%% # send_request/4
+%% send_request/4
%%
%% Handle an outgoing Diameter request.
%% ---------------------------------------------------------------------------
@@ -1296,7 +1289,7 @@ mo(T, _) ->
?ERROR({invalid_option, T}).
%% ---------------------------------------------------------------------------
-%% # send_request/6
+%% send_request/6
%% ---------------------------------------------------------------------------
%% Send an outgoing request in its dedicated process.
@@ -1745,11 +1738,6 @@ recv(TPid, Pid, TRef, {LocalTRef, MRef}) ->
exit({timeout, LocalTRef, TPid} = T)
end.
-%% send/2
-
-send(Pid, Pkt) ->
- Pid ! {send, Pkt}.
-
%% send/3
send(Pid, Pkt, Route) ->
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index f28b8f2910..a2eb661870 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -283,7 +283,7 @@ event(Msg,
?LOG(transition, {From, To}).
data(Msg, TPid, reopen, okay) ->
- {recv, TPid, false, 'DWA', _Pkt, _NPid} = Msg, %% assert
+ {recv, TPid, _, 'DWA', _Pkt} = Msg, %% assert
{TPid, T} = eraser(open),
[T];
@@ -302,6 +302,8 @@ tpid(_, Pid)
tpid(Pid, _) ->
Pid.
+%% send/2
+
send(Pid, T) ->
Pid ! T.
@@ -447,14 +449,15 @@ transition({'DOWN', _, process, TPid, _Reason} = D,
end;
%% Incoming message.
-transition({recv, TPid, Route, Name, Pkt, NPid},
+transition({recv, TPid, Route, Name, Pkt},
#watchdog{transport = TPid}
= S) ->
- try
- incoming(Name, Pkt, NPid, S)
- catch
+ try incoming(Route, Name, Pkt, S) of
#watchdog{dictionary = Dict0, receive_data = T} = NS ->
- diameter_traffic:receive_message(TPid, Route, Pkt, NPid, Dict0, T),
+ diameter_traffic:receive_message(TPid, Route, Pkt, Dict0, T),
+ NS
+ catch
+ #watchdog{} = NS ->
NS
end;
@@ -586,25 +589,13 @@ send_watchdog(#watchdog{pending = false,
%% incoming/4
-incoming(Name, Pkt, false, S) ->
- recv(Name, Pkt, S);
-
-incoming(Name, Pkt, NPid, S) ->
- try
- recv(Name, Pkt, S)
- after
- NPid ! {diameter, discard}
- end.
-
-%% recv/3
-
-recv(Name, Pkt, S) ->
- try rcv(Name, Pkt, rcv(Name, S)) of
- #watchdog{} = NS ->
- throw(NS)
+incoming(Route, Name, Pkt, S) ->
+ try rcv(Name, S) of
+ NS -> rcv(Name, Pkt, NS)
catch
- #watchdog{} = NS -> %% throwaway
- NS
+ #watchdog{transport = TPid} = NS when Route -> %% incoming request
+ send(TPid, {send, false}), %% requiring ack
+ throw(NS)
end.
%% rcv/3
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl
index 76aacabcb8..6a9f1f940b 100644
--- a/lib/diameter/src/transport/diameter_sctp.erl
+++ b/lib/diameter/src/transport/diameter_sctp.erl
@@ -52,21 +52,20 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
%% The default port for a listener.
-define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1
-%% Remote addresses to accept connections from.
--define(DEFAULT_ACCEPT, []). %% any
-
%% How long to wait for a transport process to attach after
%% association establishment.
-define(ACCEPT_TIMEOUT, 5000).
-type connect_option() :: {raddr, inet:ip_address()}
| {rport, inet:port_number()}
+ | option()
| term(). %% gen_sctp:open_option().
-type match() :: inet:ip_address()
@@ -74,8 +73,14 @@
| [match()].
-type listen_option() :: {accept, match()}
+ | option()
| term(). %% gen_sctp:open_option().
+-type option() :: {sender, boolean()}
+ | sender
+ | {packet, boolean() | raw}
+ | {message_cb, false | diameter:evaluable()}.
+
-type uint() :: non_neg_integer().
%% Accepting/connecting transport process state.
@@ -87,20 +92,35 @@
%% {RAs, RP, Errors}
| connect,
socket :: gen_sctp:sctp_socket() | undefined,
- assoc_id :: gen_sctp:assoc_id(), %% association identifier
+ active = false :: boolean(), %% is socket active?
+ recv = true :: boolean(), %% should it be active?
+ assoc_id :: gen_sctp:assoc_id() %% association identifier
+ | undefined
+ | true,
peer :: {[inet:ip_address()], uint()} %% {RAs, RP}
| undefined,
streams :: {uint(), uint()} %% {InStream, OutStream} counts
| undefined,
- os = 0 :: uint()}). %% next output stream
+ os = 0 :: uint(), %% next output stream
+ packet = true :: boolean() %% legacy transport_data?
+ | raw,
+ message_cb = false :: false | diameter:evaluable(),
+ send = false :: pid() | boolean()}). %% sending process
+
+%% Monitor process state.
+-record(monitor,
+ {transport :: pid(),
+ ack = false :: boolean(),
+ socket :: gen_sctp:sctp_socket(),
+ assoc_id :: gen_sctp:assoc_id()}). %% next output stream
%% Listener process state.
-record(listener,
{ref :: reference(),
socket :: gen_sctp:sctp_socket(),
- service = false :: false | pid(), %% service process
+ service :: pid(), %% service process
pending = {0, queue:new()},
- accept :: [match()]}).
+ opts :: [[match()] | boolean() | diameter:evaluable()]}).
%% Field pending implements two queues: the first of transport-to-be
%% processes to which an association has been assigned but for which
%% diameter hasn't yet spawned a transport process, a short-lived
@@ -132,11 +152,11 @@
start(T, Svc, Opts)
when is_list(Opts) ->
#diameter_service{capabilities = Caps,
- pid = SPid}
+ pid = Pid}
= Svc,
diameter_sctp_sup:start(), %% start supervisors on demand
Addrs = Caps#diameter_caps.host_ip_address,
- s(T, Addrs, SPid, lists:map(fun ip/1, Opts)).
+ s(T, Addrs, Pid, lists:map(fun ip/1, Opts)).
ip({ifaddr, A}) ->
{ip, A};
@@ -147,9 +167,9 @@ ip(T) ->
%% when there is not yet an association to assign it, or at comm_up on
%% a new association in which case the call retrieves a transport from
%% the pending queue.
-s({accept, Ref} = A, Addrs, SPid, Opts) ->
- {ok, LPid, LAs} = listener(Ref, {Opts, Addrs}),
- try gen_server:call(LPid, {A, self(), SPid}, infinity) of
+s({accept, Ref} = A, Addrs, SvcPid, Opts) ->
+ {ok, LPid, LAs} = listener(Ref, {Opts, SvcPid, Addrs}),
+ try gen_server:call(LPid, {A, self()}, infinity) of
{ok, TPid} ->
{ok, TPid, LAs};
No ->
@@ -162,7 +182,7 @@ s({accept, Ref} = A, Addrs, SPid, Opts) ->
%% gen_sctp in order to be able to accept a new association only
%% *after* an accepting transport has been spawned.
-s({connect = C, Ref}, Addrs, _SPid, Opts) ->
+s({connect = C, Ref}, Addrs, _SvcPid, Opts) ->
diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}).
%% start_link/1
@@ -216,22 +236,39 @@ init(T) ->
%% i/1
+i(#monitor{transport = TPid} = S) ->
+ monitor(process, TPid),
+ putr(?TRANSPORT_KEY, TPid),
+ proc_lib:init_ack({ok, self()}),
+ S;
+
%% A process owning a listening socket.
-i({listen, Ref, {Opts, Addrs}}) ->
+i({listen, Ref, {Opts, SvcPid, Addrs}}) ->
+ monitor(process, SvcPid),
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
- {[Matches], Rest} = proplists:split(Opts, [accept]),
+ {Split, Rest}
+ = proplists:split(Opts, [accept, packet, sender, message_cb]),
+ OwnOpts = lists:append(Split),
{LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT),
ok = gen_sctp:listen(Sock, true),
true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}),
proc_lib:init_ack({ok, self(), LAs}),
#listener{ref = Ref,
+ service = SvcPid,
socket = Sock,
- accept = [[M] || {accept, M} <- Matches]};
+ opts = [[[M] || {accept, M} <- OwnOpts],
+ proplists:get_value(packet, OwnOpts, true)
+ | [proplists:get_value(K, OwnOpts, false)
+ || K <- [sender, message_cb]]]};
%% A connecting transport.
i({connect, Pid, Opts, Addrs, Ref}) ->
- {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]),
- RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As],
+ {[Ps | Split], Rest}
+ = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]),
+ OwnOpts = lists:append(Split),
+ CB = proplists:get_value(message_cb, OwnOpts, false),
+ false == CB orelse (Pid ! {diameter, ack}),
+ RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts],
[RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps],
{LAs, Sock} = open(Addrs, Rest, 0),
putr(?REF_KEY, Ref),
@@ -239,7 +276,10 @@ i({connect, Pid, Opts, Addrs, Ref}) ->
monitor(process, Pid),
#transport{parent = Pid,
mode = {connect, connect(Sock, RAs, RP, [])},
- socket = Sock};
+ socket = Sock,
+ message_cb = CB,
+ packet = proplists:get_value(packet, OwnOpts, true),
+ send = proplists:get_value(sender, OwnOpts, false)};
%% An accepting transport spawned by diameter, not yet owning an
%% association.
@@ -273,11 +313,16 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) ->
receive
{Ref, Pid} when K == parent -> %% transport process started
S#transport{parent = Pid};
- {K, T, Matches} when K == peeloff -> %% association
+ {K, T, Opts} when K == peeloff -> %% association
{sctp, Sock, _RA, _RP, _Data} = T,
+ [Matches, Packet, Sender, CB] = Opts,
ok = accept_peer(Sock, Matches),
demonitor(Ref, [flush]),
- t(T, S#transport{socket = Sock});
+ false == CB orelse (S#transport.parent ! {diameter, ack}),
+ t(T, S#transport{socket = Sock,
+ message_cb = CB,
+ packet = Packet,
+ send = Sender});
accept_timeout = T ->
x(T);
{'DOWN', _, process, _, _} = T ->
@@ -374,13 +419,9 @@ handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref} = S) ->
{TPid, NewS} = accept(Ref, Pid, S),
{reply, {ok, TPid}, NewS};
-handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) ->
- handle_call({T, Pid}, From, if not is_pid(P), is_pid(SPid) ->
- monitor(process, SPid),
- S#listener{service = SPid};
- true ->
- S
- end);
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -400,7 +441,11 @@ handle_info(T, #transport{} = S) ->
{noreply, #transport{} = t(T,S)};
handle_info(T, #listener{} = S) ->
- {noreply, #listener{} = l(T,S)}.
+ {noreply, #listener{} = l(T,S)};
+
+handle_info(T, #monitor{} = S) ->
+ m(T,S),
+ {noreply, S}.
%% Prior to the possibility of setting pool_size on in transport
%% configuration, a new accepting transport was only started following
@@ -422,6 +467,9 @@ code_change(_, State, _) ->
%% # terminate/2
%% ---------------------------------------------------------------------------
+terminate(_, #monitor{}) ->
+ ok;
+
terminate(_, #transport{assoc_id = undefined}) ->
ok;
@@ -445,11 +493,11 @@ getr(Key) ->
%% Incoming message from SCTP.
l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock,
- accept = Matches}
+ opts = Opts}
= S) ->
Id = assoc_id(Data),
{TPid, NewS} = accept(S),
- TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches},
+ TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Opts},
setopts(Sock),
NewS;
@@ -503,12 +551,21 @@ t(T,S) ->
%% Incoming message.
transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) ->
- setopts(Sock),
- recv(Data, S);
+ setopts(S, recv(Data, S#transport{active = false}));
%% Outgoing message.
transition({diameter, {send, Msg}}, S) ->
- send(Msg, S);
+ message(send, Msg, S);
+
+%% Monitor has sent an outgoing message.
+transition(Msg, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ message(ack, Msg, S);
+
+%% Deferred actions from a message_cb.
+transition({actions, Dir, Acts}, S) ->
+ actions(Acts, Dir, S);
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
@@ -522,8 +579,18 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) ->
transition({diameter, {tls, _Ref, _Type, _Bool}}, _) ->
stop;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ send = MPid}) ->
+ is_boolean(MPid)
+ orelse ok == (catch gen_server:call(MPid, {stop, Pid}))
+ orelse exit(MPid, kill),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop;
%% Timeout after transport process has been started.
@@ -536,6 +603,18 @@ transition({resolve_port, Pid}, #transport{socket = Sock})
Pid ! inet:port(Sock),
ok.
+%% m/2
+
+m({Msg, StreamId}, #monitor{socket = Sock,
+ transport = TPid,
+ assoc_id = AId,
+ ack = B}) ->
+ send(Sock, AId, StreamId, Msg),
+ B andalso (TPid ! Msg);
+
+m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) ->
+ x(T).
+
%% Crash on anything unexpected.
ok({ok, T}) ->
@@ -578,33 +657,52 @@ q(Ref, Pid, #listener{pending = {_,Q}}) ->
%% send/2
+%% Start monitor process on first send.
+send(Msg, #transport{send = true,
+ socket = Sock,
+ assoc_id = AId,
+ message_cb = CB}
+ = S) ->
+ {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(),
+ socket = Sock,
+ assoc_id = AId,
+ ack = false /= CB}),
+ monitor(process, MPid),
+ send(Msg, S#transport{send = MPid});
+
%% Outbound Diameter message on a specified stream ...
-send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}},
+send(#diameter_packet{transport_data = {outstream, SId}}
+ = Msg,
#transport{streams = {_, OS}}
= S) ->
- send(SId rem OS, Bin, S),
- S;
+ send(SId rem OS, Msg, S);
%% ... or not: rotate through all streams.
-send(#diameter_packet{bin = Bin}, S) ->
- send(Bin, S);
-send(Bin, #transport{streams = {_, OS},
+send(Msg, #transport{streams = {_, OS},
os = N}
- = S)
- when is_binary(Bin) ->
- send(N, Bin, S),
- S#transport{os = (N + 1) rem OS}.
+ = S) ->
+ send(N, Msg, S#transport{os = (N + 1) rem OS}).
%% send/3
-send(StreamId, Bin, #transport{socket = Sock,
- assoc_id = AId}) ->
- send(Sock, AId, StreamId, Bin).
+send(StreamId, Msg, #transport{send = false,
+ socket = Sock,
+ assoc_id = AId}
+ = S) ->
+ send(Sock, AId, StreamId, Msg),
+ message(ack, Msg, S);
+
+send(StreamId, Msg, #transport{send = MPid} = S) ->
+ MPid ! {Msg, StreamId},
+ S.
%% send/4
-send(Sock, AssocId, Stream, Bin) ->
- case gen_sctp:send(Sock, AssocId, Stream, Bin) of
+send(Sock, AssocId, StreamId, #diameter_packet{bin = Bin}) ->
+ send(Sock, AssocId, StreamId, Bin);
+
+send(Sock, AssocId, StreamId, Bin) ->
+ case gen_sctp:send(Sock, AssocId, StreamId, Bin) of
ok ->
ok;
{error, Reason} ->
@@ -624,7 +722,9 @@ recv({_, #sctp_assoc_change{state = comm_up,
= S) ->
Ref = getr(?REF_KEY),
publish(T, Ref, Id, Sock),
- up(S#transport{assoc_id = Id,
+ %% Deal with different association id after peeloff on Solaris by
+ %% taking the id from the first reception.
+ up(S#transport{assoc_id = T == accept orelse Id,
streams = {IS, OS}});
%% ... or not: try the next address.
@@ -639,17 +739,19 @@ recv({_, #sctp_assoc_change{} = E},
recv({_, #sctp_assoc_change{}}, _) ->
stop;
+%% First inbound on an accepting transport.
+recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin}
+ = T,
+ #transport{assoc_id = true}
+ = S) ->
+ recv(T, S#transport{assoc_id = Id});
+
%% Inbound Diameter message.
-recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid})
+recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S)
when is_binary(Bin) ->
- diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id},
- bin = Bin}),
- ok;
+ message(recv, Msg, S);
-recv({_, #sctp_shutdown_event{assoc_id = A}},
- #transport{assoc_id = Id})
- when A == Id;
- A == 0 ->
+recv({_, #sctp_shutdown_event{}}, _) ->
stop;
%% Note that diameter_sctp(3) documents that sctp_events cannot be
@@ -765,6 +867,23 @@ connect(Sock, [Addr | AT] = As, Port, Reasons) ->
connect(Sock, AT, Port, [{Addr, E} | Reasons])
end.
+%% setopts/2
+
+setopts(_, #transport{socket = Sock,
+ active = A,
+ recv = B}
+ = S)
+ when B, not A ->
+ setopts(Sock),
+ S#transport{active = true};
+
+setopts(_, #transport{} = S) ->
+ S;
+
+setopts(#transport{socket = Sock}, T) ->
+ setopts(Sock),
+ T.
+
%% setopts/1
setopts(Sock) ->
@@ -772,3 +891,83 @@ setopts(Sock) ->
ok -> ok;
X -> x({setopts, Sock, X}) %% possibly on peer disconnect
end.
+
+%% A message_cb is invoked whenever a message is sent or received, or
+%% to provide acknowledgement of a completed send or discarded
+%% request. See diameter_tcp for semantics, the only difference being
+%% that a recv callback can get a diameter_packet record as Msg
+%% depending on how/if option packet has been specified.
+
+%% message/3
+
+message(send, false = M, S) ->
+ message(ack, M, S);
+
+message(ack, _, #transport{message_cb = false} = S) ->
+ S;
+
+message(Dir, Msg, S) ->
+ setopts(S, actions(cb(S, Dir, Msg), Dir, S)).
+
+%% actions/3
+
+actions([], _, S) ->
+ S;
+
+actions([B | As], Dir, S)
+ when is_boolean(B) ->
+ actions(As, Dir, S#transport{recv = B});
+
+actions([Dir | As], _, S)
+ when Dir == send;
+ Dir == recv ->
+ actions(As, Dir, S);
+
+actions([Msg | As], send = Dir, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ actions(As, Dir, send(Msg, S));
+
+actions([Msg | As], recv = Dir, #transport{parent = Pid} = S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ diameter_peer:recv(Pid, Msg),
+ actions(As, Dir, S);
+
+actions([{defer, Tmo, Acts} | As], Dir, S) ->
+ erlang:send_after(Tmo, self(), {actions, Dir, Acts}),
+ actions(As, Dir, S);
+
+actions(CB, _, S) ->
+ S#transport{message_cb = CB}.
+
+%% cb/3
+
+cb(#transport{message_cb = false, packet = P}, recv, Msg) ->
+ [pkt(P, true, Msg)];
+
+cb(#transport{message_cb = CB, packet = P}, recv = D, Msg) ->
+ cb(CB, D, pkt(P, false, Msg));
+
+cb(#transport{message_cb = CB}, Dir, Msg) ->
+ cb(CB, Dir, Msg);
+
+cb(false, send, Msg) ->
+ [Msg];
+
+cb(CB, Dir, Msg) ->
+ diameter_lib:eval([CB, Dir, Msg]).
+
+%% pkt/3
+
+pkt(false, _, {_Info, Bin}) ->
+ Bin;
+
+pkt(true, _, {[#sctp_sndrcvinfo{stream = Id}], Bin}) ->
+ #diameter_packet{bin = Bin, transport_data = {stream, Id}};
+
+pkt(raw, true, {[Info], Bin}) ->
+ #diameter_packet{bin = Bin, transport_data = Info};
+
+pkt(raw, false, {[_], _} = Msg) ->
+ Msg.
diff --git a/lib/diameter/src/transport/diameter_sctp_sup.erl b/lib/diameter/src/transport/diameter_sctp_sup.erl
index 36050aaf28..e8e26ec7c5 100644
--- a/lib/diameter/src/transport/diameter_sctp_sup.erl
+++ b/lib/diameter/src/transport/diameter_sctp_sup.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -49,6 +49,7 @@ start() ->
start_child(T) ->
SupRef = case element(1,T) of
+ monitor -> ?TRANSPORT_SUP;
connect -> ?TRANSPORT_SUP;
accept -> ?TRANSPORT_SUP;
listen -> ?LISTENER_SUP
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl
index 44abc5c3b4..a2f393d5d4 100644
--- a/lib/diameter/src/transport/diameter_tcp.erl
+++ b/lib/diameter/src/transport/diameter_tcp.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2010-2016. All Rights Reserved.
+%% Copyright Ericsson AB 2010-2017. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -19,7 +19,6 @@
%%
-module(diameter_tcp).
--dialyzer({no_fail_call, throttle/2}).
-behaviour(gen_server).
@@ -53,6 +52,7 @@
%% Keys into process dictionary.
-define(INFO_KEY, info).
-define(REF_KEY, ref).
+-define(TRANSPORT_KEY, transport).
-define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})).
@@ -68,16 +68,23 @@
%% The same gen_server implementation supports three different kinds
%% of processes: an actual transport process, one that will club it to
%% death should the parent die before a connection is established, and
-%% a process owning the listening port.
+%% a process owning the listening port. The monitor process
+%% historically died after connection establishment, but can now live
+%% on as the sender of outgoing messages, so that a blocking send
+%% doesn't prevent messages from being received.
%% Listener process state.
-record(listener, {socket :: inet:socket(),
+ module :: module(),
service = false :: false | pid()}). %% service process
%% Monitor process state.
-record(monitor,
- {parent :: pid(),
- transport = self() :: pid()}).
+ {parent :: reference() | false | pid(),
+ transport = self() :: pid(),
+ ack = false :: boolean(),
+ socket :: inet:socket() | ssl:sslsocket() | undefined,
+ module :: module() | undefined}).
-type length() :: 0..16#FFFFFF. %% message length from Diameter header
-type size() :: non_neg_integer(). %% accumulated binary size
@@ -97,25 +104,30 @@
-type listen_option() :: {accept, match()}
| {ssl_options, true | [ssl:listen_option()]}
+ | option()
| ssl:listen_option()
| gen_tcp:listen_option().
-type option() :: {port, non_neg_integer()}
- | {fragment_timer, 0..16#FFFFFFFF}
- | {throttle_cb, diameter:evaluable()}.
+ | {sender, boolean()}
+ | sender
+ | {message_cb, false | diameter:evaluable()}
+ | {fragment_timer, 0..16#FFFFFFFF}.
%% Accepting/connecting transport process state.
-record(transport,
{socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket
+ active = false :: boolean(), %% is socket active?
+ recv = true :: boolean(), %% should it be active?
parent :: pid(), %% of process that started us
module :: module(), %% gen_tcp-like module
- frag = <<>> :: frag(), %% message fragment
ssl :: [term()] | boolean(), %% ssl options, ssl or not
+ frag = <<>> :: frag(), %% message fragment
timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout
tref = false :: false | reference(), %% fragment timer reference
flush = false :: boolean(), %% flush fragment at timeout?
- throttle_cb :: false | diameter:evaluable(), %% ask to receive
- throttled :: boolean() | binary()}). %% stopped receiving?
+ message_cb :: false | diameter:evaluable(),
+ send :: pid() | false}). %% sending process
%% The usual transport using gen_tcp can be replaced by anything
%% sufficiently gen_tcp-like by passing a 'module' option as the first
@@ -137,13 +149,13 @@
start({T, Ref}, Svc, Opts) ->
#diameter_service{capabilities = Caps,
- pid = SPid}
+ pid = SvcPid}
= Svc,
diameter_tcp_sup:start(), %% start tcp supervisors on demand
{Mod, Rest} = split(Opts),
Addrs = Caps#diameter_caps.host_ip_address,
- Arg = {T, Ref, Mod, self(), Rest, Addrs, SPid},
+ Arg = {T, Ref, Mod, self(), Rest, Addrs, SvcPid},
diameter_tcp_sup:start_child(Arg).
split([{module, M} | Opts]) ->
@@ -197,57 +209,53 @@ init(T) ->
%% i/1
%% A transport process.
-i({T, Ref, Mod, Pid, Opts, Addrs, SPid})
+i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid})
when T == accept;
T == connect ->
monitor(process, Pid),
%% Since accept/connect might block indefinitely, spawn a process
- %% that does nothing but kill us with the parent until call
- %% returns.
- {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
+ %% that kills us with the parent until call returns, and then
+ %% sends outgoing messages.
{[SO|TO], Rest} = proplists:split(Opts, [ssl_options,
- fragment_timer,
- throttle_cb]),
+ sender,
+ message_cb,
+ fragment_timer]),
SslOpts = ssl_opts(SO),
OwnOpts = lists:append(TO),
Tmo = proplists:get_value(fragment_timer,
OwnOpts,
?DEFAULT_FRAGMENT_TIMEOUT),
+ [CB, Sender] = [proplists:get_value(K, OwnOpts, false)
+ || K <- [message_cb, sender]],
?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}),
- Throttle = proplists:get_value(throttle_cb, OwnOpts, false),
- Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid),
- MPid ! {stop, self()}, %% tell the monitor to die
+ {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}),
+ Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid),
M = if SslOpts -> ssl; true -> Mod end,
+ Sender andalso monitor(process, MPid),
+ false == CB orelse (Pid ! {diameter, ack}),
+ MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB},
putr(?REF_KEY, Ref),
- throttle(#transport{parent = Pid,
- module = M,
- socket = Sock,
- ssl = SslOpts,
- timeout = Tmo,
- throttle_cb = Throttle,
- throttled = false /= Throttle});
+ setopts(#transport{parent = Pid,
+ module = M,
+ socket = Sock,
+ ssl = SslOpts,
+ message_cb = CB,
+ timeout = Tmo,
+ send = Sender andalso MPid});
%% Put the reference in the process dictionary since we now use it
%% advertise the ssl socket after TLS upgrade.
-i({T, _Ref, _Mod, _Pid, _Opts, _Addrs} = Arg) %% from old code
- when T == accept;
- T == connect ->
- i(erlang:append_element(Arg, _SPid = false));
-
%% A monitor process to kill the transport if the parent dies.
i(#monitor{parent = Pid, transport = TPid} = S) ->
+ putr(?TRANSPORT_KEY, TPid),
proc_lib:init_ack({ok, self()}),
- monitor(process, Pid),
monitor(process, TPid),
- S;
+ S#monitor{parent = monitor(process, Pid)};
%% In principle a link between the transport and killer processes
%% could do the same thing: have the accepting/connecting process be
%% killed when the killer process dies as a consequence of parent
%% death. However, a link can be unlinked and this is exactly what
-%% gen_tcp seems to so. Links should be left to supervisors.
-
-i({listen = L, Ref, _APid, T}) -> %% from old code
- i({L, Ref, T});
+%% gen_tcp seems to do. Links should be left to supervisors.
i({listen, Ref, {Mod, Opts, Addrs}}) ->
[_] = diameter_config:subscribe(Ref, transport), %% assert existence
@@ -258,7 +266,8 @@ i({listen, Ref, {Mod, Opts, Addrs}}) ->
LAddr = laddr(LAddrOpt, Mod, LSock),
true = diameter_reg:add_new({?MODULE, listener, {Ref, {LAddr, LSock}}}),
proc_lib:init_ack({ok, self(), {LAddr, LSock}}),
- #listener{socket = LSock}.
+ #listener{socket = LSock,
+ module = Mod}.
laddr([], Mod, Sock) ->
{ok, {Addr, _Port}} = sockname(Mod, Sock),
@@ -279,19 +288,19 @@ ssl_opts(T) ->
%% init/8
%% Establish a TLS connection before capabilities exchange ...
-init(Type, Ref, Mod, Pid, true, Opts, Addrs, SPid) ->
- init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SPid);
+init(Type, Ref, Mod, Pid, true, Opts, Addrs, SvcPid) ->
+ init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SvcPid);
%% ... or not.
-init(Type, Ref, Mod, Pid, _, Opts, Addrs, SPid) ->
- init(Type, Ref, Mod, Pid, Opts, Addrs, SPid).
+init(Type, Ref, Mod, Pid, _, Opts, Addrs, SvcPid) ->
+ init(Type, Ref, Mod, Pid, Opts, Addrs, SvcPid).
%% init/7
-init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) ->
+init(accept = T, Ref, Mod, Pid, Opts, Addrs, SvcPid) ->
{[Matches], Rest} = proplists:split(Opts, [accept]),
{ok, LPid, {LAddr, LSock}} = listener(Ref, {Mod, Rest, Addrs}),
- ok = gen_server:call(LPid, {accept, SPid}, infinity),
+ ok = gen_server:call(LPid, {accept, SvcPid}, infinity),
proc_lib:init_ack({ok, self(), [LAddr]}),
Sock = ok(accept(Mod, LSock)),
ok = accept_peer(Mod, Sock, accept(Matches)),
@@ -299,7 +308,7 @@ init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) ->
diameter_peer:up(Pid),
Sock;
-init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SPid) ->
+init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SvcPid) ->
{[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]),
LAddrOpt = get_addr(LA, Addrs),
RAddr = get_addr(RA),
@@ -451,14 +460,18 @@ portnr(Sock) ->
%% # handle_call/3
%% ---------------------------------------------------------------------------
-handle_call({accept, SPid}, _From, #listener{service = P} = S) ->
- {reply, ok, if not is_pid(P), is_pid(SPid) ->
- monitor(process, SPid),
- S#listener{service = SPid};
+handle_call({accept, SvcPid}, _From, #listener{service = P} = S) ->
+ {reply, ok, if not is_pid(P), is_pid(SvcPid) ->
+ monitor(process, SvcPid),
+ S#listener{service = SvcPid};
true ->
S
end};
-
+
+%% Transport is telling us of parent death.
+handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) ->
+ {stop, {shutdown, Reason}, ok, S};
+
handle_call(_, _, State) ->
{reply, nok, State}.
@@ -480,8 +493,7 @@ handle_info(T, #listener{} = S) ->
{noreply, #listener{} = l(T,S)};
handle_info(T, #monitor{} = S) ->
- m(T,S),
- x(T).
+ {noreply, #monitor{} = m(T,S)}.
%% ---------------------------------------------------------------------------
%% # code_change/3
@@ -497,6 +509,7 @@ code_change(_, State, _) ->
terminate(_, _) ->
ok.
+
%% ---------------------------------------------------------------------------
putr(Key, Val) ->
@@ -509,18 +522,47 @@ getr(Key) ->
%%
%% Transition monitor state.
+%% Outgoing message.
+m(Msg, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ send(Msg, S),
+ S;
+
+%% Transport has established a connection. Stop monitoring on the
+%% parent so as not to die before a send from the transport.
+m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) ->
+ case T of
+ {Sock, Mod} ->
+ demonitor(S#monitor.parent, [flush]),
+ S#monitor{parent = false,
+ socket = Sock,
+ module = Mod,
+ ack = Ack};
+ false -> %% monitor not sending
+ x(M)
+ end;
+
%% Transport is telling us to die.
-m({stop, TPid}, #monitor{transport = TPid}) ->
- ok;
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
-%% Transport has died.
-m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) ->
- ok;
+%% Transport is telling us to die.
+m({stop, TPid} = T, #monitor{transport = TPid}) ->
+ x(T);
-%% Transport parent has died.
-m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
- transport = TPid}) ->
- exit(TPid, {shutdown, parent}).
+%% Transport is telling us that TLS has been negotiated after
+%% capabilities exchange.
+m({tls, SSock}, S) ->
+ S#monitor{socket = SSock,
+ module = ssl};
+
+%% Transport or parent has died.
+m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef,
+ transport = TPid})
+ when M == MRef;
+ P == TPid ->
+ x(T).
%% l/2
%%
@@ -528,18 +570,16 @@ m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid,
%% Service process has died.
l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid,
- socket = Sock}) ->
- gen_tcp:close(Sock),
+ socket = Sock,
+ module = M}) ->
+ M:close(Sock),
x(T);
%% Transport has been removed.
-l({transport, remove, _} = T, #listener{socket = Sock}) ->
- gen_tcp:close(Sock),
- x(T);
-
-%% Possibly death of an accepting process monitored in old code.
-l(_, S) ->
- S.
+l({transport, remove, _} = T, #listener{socket = Sock,
+ module = M}) ->
+ M:close(Sock),
+ x(T).
%% t/2
%%
@@ -557,21 +597,13 @@ t(T,S) ->
%% transition/2
-%% Incoming message.
+%% Incoming packets.
transition({P, Sock, Bin}, #transport{socket = Sock,
- ssl = B,
- throttled = T}
+ ssl = B}
= S)
when P == ssl, true == B;
P == tcp ->
- false = T, %% assert
- recv(Bin, S);
-
-%% Make a new throttling callback after a timeout.
-transition(throttle, #transport{throttled = false}) ->
- ok;
-transition(throttle, S) ->
- throttle(S);
+ recv(Bin, S#transport{active = false});
%% Capabilties exchange has decided on whether or not to run over TLS.
transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
@@ -581,7 +613,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid}
= NS
= tls_handshake(Type, B, S),
Pid ! {diameter, {tls, Ref}},
- throttle(NS#transport{ssl = B});
+ NS#transport{ssl = B};
transition({C, Sock}, #transport{socket = Sock,
ssl = B})
@@ -597,8 +629,18 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock,
?ERROR({T,S});
%% Outgoing message.
-transition({diameter, {send, Bin}}, S) ->
- send(Bin, S);
+transition({diameter, {send, Msg}}, #transport{} = S) ->
+ message(send, Msg, S);
+
+%% Monitor has sent an outgoing message.
+transition(Msg, S)
+ when is_record(Msg, diameter_packet);
+ is_binary(Msg) ->
+ message(ack, Msg, S);
+
+%% Deferred actions from a message_cb.
+transition({actions, Dir, Acts}, S) ->
+ actions(Acts, Dir, S);
%% Request to close the transport connection.
transition({diameter, {close, Pid}}, #transport{parent = Pid,
@@ -618,8 +660,18 @@ transition({resolve_port, Pid}, #transport{socket = Sock,
Pid ! portnr(M, Sock),
ok;
-%% Parent process has died.
-transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) ->
+%% Parent process has died: call the monitor to not close the socket
+%% during an ongoing send, but don't let it take forever.
+transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid,
+ send = MPid}) ->
+ false == MPid
+ orelse (ok == gen_server:call(MPid, {stop, self()}, 1000))
+ orelse exit(MPid, {shutdown, parent}),
+ stop;
+
+%% Monitor process has died.
+transition({'DOWN', _, process, MPid, _}, #transport{send = MPid})
+ when is_pid(MPid) ->
stop.
%% Crash on anything unexpected.
@@ -643,11 +695,13 @@ tls_handshake(_, true, #transport{ssl = false}) ->
%% Capabilities exchange negotiated TLS: upgrade the connection.
tls_handshake(Type, true, #transport{socket = Sock,
module = M,
- ssl = Opts}
+ ssl = Opts,
+ send = MPid}
= S) ->
{ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]),
Ref = getr(?REF_KEY),
true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}),
+ false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process
S#transport{socket = SSock,
module = ssl};
@@ -666,24 +720,15 @@ tls(accept, Sock, Opts) ->
%% using Nagle.
%% Receive packets until a full message is received,
-recv(Bin, #transport{frag = Head, throttled = false} = S) ->
+recv(Bin, #transport{frag = Head} = S) ->
case rcv(Head, Bin) of
- {Msg, B} ->
- throttle(S#transport{frag = B, throttled = Msg});
- Frag ->
- setopts(S),
- start_fragment_timer(S#transport{frag = Frag,
- flush = false})
+ {Msg, B} -> %% have a complete message ...
+ message(recv, Msg, S#transport{frag = B});
+ Frag -> %% read more on the socket
+ start_fragment_timer(setopts(S#transport{frag = Frag,
+ flush = false}))
end.
-%% recv/1
-
-recv(#transport{throttled = false} = S) ->
- recv(<<>>, S);
-
-recv(#transport{} = S) ->
- S.
-
%% rcv/2
%% No previous fragment.
@@ -743,13 +788,16 @@ recv1(Len, Bin) ->
<<Msg:Len/binary, Rest/binary>> = Bin,
{Msg, Rest}.
-%% bin/1-2
+%% bin/2
bin(Head, Acc) ->
list_to_binary([Head | lists:reverse(Acc)]).
+%% bin/1
+
bin({_, _, Head, Acc}) ->
bin(Head, Acc);
+
bin(Bin)
when is_binary(Bin) ->
Bin.
@@ -768,9 +816,7 @@ bin(Bin)
%% also eventually lead to watchdog failover.
%% No fragment to flush or not receiving messages.
-flush(#transport{frag = Frag, throttled = B} = S)
- when Frag == <<>>;
- B /= false ->
+flush(#transport{frag = <<>>} = S) ->
S;
%% Messages have been received since last timer expiry.
@@ -778,9 +824,8 @@ flush(#transport{flush = false} = S) ->
start_fragment_timer(S#transport{flush = true});
%% No messages since last expiry.
-flush(#transport{frag = Frag, parent = Pid} = S) ->
- diameter_peer:recv(Pid, bin(Frag)),
- S#transport{frag = <<>>}.
+flush(#transport{frag = Frag} = S) ->
+ message(recv, bin(Frag), S#transport{frag = <<>>}).
%% start_fragment_timer/1
%%
@@ -813,9 +858,27 @@ connect(Mod, Host, Port, Opts) ->
%% send/2
-send(Bin, #transport{socket = Sock,
- module = M}) ->
- case send(M, Sock, Bin) of
+send(Msg, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) ->
+ send1(M, Sock, Msg),
+ B andalso (TPid ! Msg);
+
+send(Msg, #transport{socket = Sock, module = M, send = false} = S) ->
+ send1(M, Sock, Msg),
+ message(ack, Msg, S);
+
+%% Send from the monitor process to avoid deadlock if both the
+%% receiver and the peer were to block in send.
+send(Msg, #transport{send = Pid} = S) ->
+ Pid ! Msg,
+ S.
+
+%% send1/3
+
+send1(Mod, Sock, #diameter_packet{bin = Bin}) ->
+ send1(Mod, Sock, Bin);
+
+send1(Mod, Sock, Bin) ->
+ case send(Mod, Sock, Bin) of
ok ->
ok;
{error, Reason} ->
@@ -842,120 +905,19 @@ setopts(M, Sock, Opts) ->
%% setopts/1
-setopts(#transport{socket = Sock, module = M}) ->
- setopts(M, Sock).
-
-%% setopts/2
-
-setopts(M, Sock) ->
+setopts(#transport{socket = Sock,
+ active = A,
+ recv = B,
+ module = M}
+ = S)
+ when B, not A ->
case setopts(M, Sock, [{active, once}]) of
- ok -> ok;
- X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect
- end.
-
-%% throttle/1
-
-%% Still collecting packets for a complete message: keep receiving.
-throttle(#transport{throttled = false} = S) ->
- recv(S);
-
-%% Decide whether to receive another, or whether to accept a message
-%% that's been received.
-throttle(#transport{throttle_cb = F, throttled = T} = S) ->
- Res = cb(F, T),
-
- try throttle(Res, S) of
- #transport{ssl = SB} = NS when is_boolean(SB) ->
- throttle(defrag(NS));
- #transport{throttled = Msg} = NS when is_binary(Msg) ->
- %% Initial incoming message when we might need to upgrade
- %% to TLS: wait for reception of a tls tuple.
- defrag(NS)
- catch
- #transport{} = NS ->
- recv(NS)
- end.
-
-%% cb/2
-
-cb(false, _) ->
- ok;
-
-cb(F, B) ->
- diameter_lib:eval([F, true /= B andalso B]).
-
-%% throttle/2
-
-%% Callback says to receive another message.
-throttle(ok, #transport{throttled = true} = S) ->
- throw(S#transport{throttled = false});
-
-%% Callback says to accept a received message.
-throttle(ok, #transport{parent = Pid, throttled = Msg} = S)
- when is_binary(Msg) ->
- diameter_peer:recv(Pid, Msg),
- S;
-
-throttle({ok = T, F}, S) ->
- throttle(T, S#transport{throttle_cb = F});
-
-%% Callback says to accept a received message and acknowledged the
-%% returned pid with a {request, Pid} message if a request pid is
-%% spawned, a discard message otherwise. The latter does not mean that
-%% the message was necessarily discarded: it could have been an
-%% answer.
-throttle(NPid, #transport{parent = Pid, throttled = Msg} = S)
- when is_pid(NPid), is_binary(Msg) ->
- diameter_peer:recv(Pid, {Msg, NPid}),
- S;
-
-throttle({NPid, F}, #transport{throttled = Msg} = S)
- when is_pid(NPid), is_binary(Msg) ->
- throttle(NPid, S#transport{throttle_cb = F});
-
-%% Callback to accept a received message says to discard it.
-throttle(discard, #transport{throttled = Msg} = S)
- when is_binary(Msg) ->
- S;
-
-throttle({discard = T, F}, #transport{throttled = Msg} = S)
- when is_binary(Msg) ->
- throttle(T, S#transport{throttle_cb = F});
-
-%% Callback to accept a received message says to answer it with the
-%% supplied binary.
-throttle(Bin, #transport{throttled = Msg} = S)
- when is_binary(Bin), is_binary(Msg) ->
- send(Bin, S),
- S;
-
-throttle({Bin, F}, #transport{throttled = Msg} = S)
- when is_binary(Bin), is_binary(Msg) ->
- throttle(Bin, S#transport{throttle_cb = F});
-
-%% Callback says to ask again in the specified number of milliseconds.
-throttle({timeout, Tmo}, S) ->
- erlang:send_after(Tmo, self(), throttle),
- throw(S);
-
-throttle({timeout = T, Tmo, F}, S) ->
- throttle({T, Tmo}, S#transport{throttle_cb = F});
-
-throttle(T, #transport{throttle_cb = F}) ->
- ?ERROR({invalid_return, T, F}).
-
-%% defrag/1
-%%
-%% Try to extract another message from packets already read before
-%% another throttling callback.
+ ok -> S#transport{active = true};
+ X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect
+ end;
-defrag(#transport{frag = Head} = S) ->
- case rcv(Head, <<>>) of
- {Msg, B} ->
- S#transport{throttled = Msg, frag = B};
- _ ->
- S#transport{throttled = true}
- end.
+setopts(S) ->
+ S.
%% portnr/2
@@ -990,3 +952,80 @@ getstat(gen_tcp, Sock) ->
getstat(M, Sock) ->
M:getstat(Sock).
%% Note that ssl:getstat/1 doesn't yet exist in R15B01.
+
+%% A message_cb is invoked whenever a message is sent or received, or
+%% to provide acknowledgement of a completed send or discarded
+%% request. Ignoring possible extra arguments, calls are of the
+%% following form.
+%%
+%% cb(recv, Msg) Receive a message into diameter?
+%% cb(send, Msg) Send a message on the socket?
+%% cb(ack, Msg) Acknowledgement of a completed send.
+%% cb(ack, false) Acknowledgement of a discarded request.
+%%
+%% Msg will be binary() in a recv callback, but can be a
+%% diameter_packet record in a send/ack callback if a recv/send
+%% callback returns a record. Callbacks return a list of the following
+%% form.
+%%
+%% [boolean() | send | recv | binary() | #diameter_packet{}]
+%%
+%% The atoms are meaningless by themselves, but say whether subsequent
+%% messages are to be sent or received. A boolean says whether or not
+%% to continue reading on the socket. Messages can be received even
+%% after false is returned if these arrived in the same packet. A
+%% leading recv or send is implicit on the corresponding callbacks. A
+%% new callback can be returned as the tail of a returned list: any
+%% value not of the aforementioned list type is interpreted as a
+%% callback.
+
+%% message/3
+
+message(send, false = M, S) ->
+ message(ack, M, S);
+
+message(ack, _, #transport{message_cb = false} = S) ->
+ S;
+
+message(Dir, Msg, #transport{message_cb = CB} = S) ->
+ recv(<<>>, actions(cb(CB, Dir, Msg), Dir, S)).
+
+%% actions/3
+
+actions([], _, S) ->
+ S;
+
+actions([B | As], Dir, S)
+ when is_boolean(B) ->
+ actions(As, Dir, S#transport{recv = B});
+
+actions([Dir | As], _, S)
+ when Dir == send;
+ Dir == recv ->
+ actions(As, Dir, S);
+
+actions([Msg | As], send = Dir, S)
+ when is_binary(Msg);
+ is_record(Msg, diameter_packet) ->
+ actions(As, Dir, send(Msg, S));
+
+actions([Msg | As], recv = Dir, #transport{parent = Pid} = S)
+ when is_binary(Msg);
+ is_record(Msg, diameter_packet) ->
+ diameter_peer:recv(Pid, Msg),
+ actions(As, Dir, S);
+
+actions([{defer, Tmo, Acts} | As], Dir, S) ->
+ erlang:send_after(Tmo, self(), {actions, Dir, Acts}),
+ actions(As, Dir, S);
+
+actions(CB, _, S) ->
+ S#transport{message_cb = CB}.
+
+%% cb/3
+
+cb(false, _, Msg) ->
+ [Msg];
+
+cb(CB, Dir, Msg) ->
+ diameter_lib:eval([CB, Dir, Msg]).