diff options
Diffstat (limited to 'lib/diameter/src/transport')
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 428 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp_sup.erl | 3 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 680 |
3 files changed, 709 insertions, 402 deletions
diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index f48e4347ee..64b34da690 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -52,21 +52,20 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). %% The default port for a listener. -define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 -%% Remote addresses to accept connections from. --define(DEFAULT_ACCEPT, []). %% any - %% How long to wait for a transport process to attach after %% association establishment. -define(ACCEPT_TIMEOUT, 5000). -type connect_option() :: {raddr, inet:ip_address()} | {rport, inet:port_number()} + | option() | term(). %% gen_sctp:open_option(). -type match() :: inet:ip_address() @@ -74,8 +73,14 @@ | [match()]. -type listen_option() :: {accept, match()} + | option() | term(). %% gen_sctp:open_option(). +-type option() :: {sender, boolean()} + | sender + | {packet, boolean() | raw} + | {message_cb, false | diameter:eval()}. + -type uint() :: non_neg_integer(). %% Accepting/connecting transport process state. @@ -87,20 +92,38 @@ %% {RAs, RP, Errors} | connect, socket :: gen_sctp:sctp_socket() | undefined, - assoc_id :: gen_sctp:assoc_id(), %% association identifier + active = false :: boolean(), %% is socket active? + recv = true :: boolean(), %% should it be active? + assoc_id :: gen_sctp:assoc_id() %% association identifier + | undefined + | true, peer :: {[inet:ip_address()], uint()} %% {RAs, RP} | undefined, streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, - os = 0 :: uint()}). %% next output stream + os = 0 :: uint(), %% next output stream + rotate = 1 :: boolean() | 0 | 1, %% rotate os? + unordered = false :: boolean() %% always send unordered? + | pos_integer(),% or if =< N outbound streams? + packet = true :: boolean() %% legacy transport_data? + | raw, + message_cb = false :: false | diameter:eval(), + send = false :: pid() | boolean()}). %% sending process + +%% Monitor process state. +-record(monitor, + {transport :: pid(), + ack = false :: boolean(), + socket :: gen_sctp:sctp_socket(), + assoc_id :: gen_sctp:assoc_id()}). %% Listener process state. -record(listener, {ref :: reference(), socket :: gen_sctp:sctp_socket(), - service = false :: false | pid(), %% service process + service :: pid(), %% service process pending = {0, queue:new()}, - accept :: [match()]}). + opts :: [[match()] | boolean() | diameter:eval()]}). %% Field pending implements two queues: the first of transport-to-be %% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived @@ -132,24 +155,19 @@ start(T, Svc, Opts) when is_list(Opts) -> #diameter_service{capabilities = Caps, - pid = SPid} + pid = Pid} = Svc, diameter_sctp_sup:start(), %% start supervisors on demand Addrs = Caps#diameter_caps.host_ip_address, - s(T, Addrs, SPid, lists:map(fun ip/1, Opts)). - -ip({ifaddr, A}) -> - {ip, A}; -ip(T) -> - T. + s(T, Addrs, Pid, Opts). %% A listener spawns transports either as a consequence of this call %% when there is not yet an association to assign it, or at comm_up on %% a new association in which case the call retrieves a transport from %% the pending queue. -s({accept, Ref} = A, Addrs, SPid, Opts) -> - {ok, LPid, LAs} = listener(Ref, {Opts, Addrs}), - try gen_server:call(LPid, {A, self(), SPid}, infinity) of +s({accept, Ref} = A, Addrs, SvcPid, Opts) -> + {ok, LPid, LAs} = listener(Ref, {Opts, SvcPid, Addrs}), + try gen_server:call(LPid, {A, self()}, infinity) of {ok, TPid} -> {ok, TPid, LAs}; No -> @@ -162,7 +180,7 @@ s({accept, Ref} = A, Addrs, SPid, Opts) -> %% gen_sctp in order to be able to accept a new association only %% *after* an accepting transport has been spawned. -s({connect = C, Ref}, Addrs, _SPid, Opts) -> +s({connect = C, Ref}, Addrs, _SvcPid, Opts) -> diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}). %% start_link/1 @@ -216,22 +234,46 @@ init(T) -> %% i/1 +i(#monitor{transport = TPid} = S) -> + monitor(process, TPid), + putr(?TRANSPORT_KEY, TPid), + proc_lib:init_ack({ok, self()}), + S; + %% A process owning a listening socket. -i({listen, Ref, {Opts, Addrs}}) -> +i({listen, Ref, {Opts, SvcPid, Addrs}}) -> + monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {[Matches], Rest} = proplists:split(Opts, [accept]), + {Split, Rest} = proplists:split(Opts, [accept, + packet, + sender, + message_cb, + unordered]), + OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), proc_lib:init_ack({ok, self(), LAs}), #listener{ref = Ref, + service = SvcPid, socket = Sock, - accept = [[M] || {accept, M} <- Matches]}; + opts = [[[M] || {accept, M} <- OwnOpts], + proplists:get_value(packet, OwnOpts, true) + | [proplists:get_value(K, OwnOpts, false) + || K <- [sender, message_cb, unordered]]]}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> - {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]), - RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As], + {[Ps | Split], Rest} = proplists:split(Opts, [rport, + raddr, + packet, + sender, + message_cb, + unordered]), + OwnOpts = lists:append(Split), + CB = proplists:get_value(message_cb, OwnOpts, false), + false == CB orelse (Pid ! {diameter, ack}), + RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts], [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], {LAs, Sock} = open(Addrs, Rest, 0), putr(?REF_KEY, Ref), @@ -239,7 +281,11 @@ i({connect, Pid, Opts, Addrs, Ref}) -> monitor(process, Pid), #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, - socket = Sock}; + socket = Sock, + message_cb = CB, + unordered = proplists:get_value(ordered, OwnOpts, false), + packet = proplists:get_value(packet, OwnOpts, true), + send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an %% association. @@ -273,11 +319,17 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> receive {Ref, Pid} when K == parent -> %% transport process started S#transport{parent = Pid}; - {K, T, Matches} when K == peeloff -> %% association + {K, T, Opts} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, + [Matches, Packet, Sender, CB, Unordered] = Opts, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), - t(T, S#transport{socket = Sock}); + false == CB orelse (S#transport.parent ! {diameter, ack}), + t(T, S#transport{socket = Sock, + message_cb = CB, + unordered = Unordered, + packet = Packet, + send = Sender}); accept_timeout = T -> x(T); {'DOWN', _, process, _, _} = T -> @@ -309,23 +361,35 @@ l([], Ref, T) -> %% open/3 open(Addrs, Opts, PortNr) -> - {LAs, Os} = addrs(Addrs, Opts), - {LAs, case gen_sctp:open(gen_opts(portnr(Os, PortNr))) of - {ok, Sock} -> - Sock; - {error, Reason} -> - x({open, Reason}) - end}. + case gen_sctp:open(gen_opts(portnr(addrs(Addrs, Opts), PortNr))) of + {ok, Sock} -> + {addrs(Sock), Sock}; + {error, Reason} -> + x({open, Reason}) + end. addrs(Addrs, Opts) -> - case proplists:split(Opts, [ip]) of - {[[]], _} -> - {Addrs, Opts ++ [{ip, A} || A <- Addrs]}; - {[As], Os} -> - LAs = [diameter_lib:ipaddr(A) || {ip, A} <- As], - {LAs, Os ++ [{ip, A} || A <- LAs]} + case lists:mapfoldl(fun ipaddr/2, false, Opts) of + {Os, true} -> + Os; + {_, false} -> + Opts ++ [{ip, A} || A <- Addrs] end. +ipaddr({K,A}, _) + when K == ifaddr; + K == ip -> + {{ip, ipaddr(A)}, true}; +ipaddr(T, B) -> + {T, B}. + +ipaddr(A) + when A == loopback; + A == any -> + A; +ipaddr(A) -> + diameter_lib:ipaddr(A). + portnr(Opts, PortNr) -> case proplists:get_value(port, Opts) of undefined -> @@ -334,6 +398,14 @@ portnr(Opts, PortNr) -> Opts end. +addrs(Sock) -> + case inet:socknames(Sock) of + {ok, As} -> + [A || {A,_} <- As]; + {error, Reason} -> + x({socknames, Reason}) + end. + %% x/1 x(Reason) -> @@ -374,13 +446,9 @@ handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref} = S) -> {TPid, NewS} = accept(Ref, Pid, S), {reply, {ok, TPid}, NewS}; -handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) -> - handle_call({T, Pid}, From, if not is_pid(P), is_pid(SPid) -> - monitor(process, SPid), - S#listener{service = SPid}; - true -> - S - end); +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; handle_call(_, _, State) -> {reply, nok, State}. @@ -400,9 +468,13 @@ handle_info(T, #transport{} = S) -> {noreply, #transport{} = t(T,S)}; handle_info(T, #listener{} = S) -> - {noreply, #listener{} = l(T,S)}. + {noreply, #listener{} = l(T,S)}; + +handle_info(T, #monitor{} = S) -> + m(T,S), + {noreply, S}. -%% Prior to the possiblity of setting pool_size on in transport +%% Prior to the possibility of setting pool_size on in transport %% configuration, a new accepting transport was only started following %% the death of a predecessor, so that there was only at most one %% previously started transport process waiting for an association. @@ -422,6 +494,9 @@ code_change(_, State, _) -> %% # terminate/2 %% --------------------------------------------------------------------------- +terminate(_, #monitor{}) -> + ok; + terminate(_, #transport{assoc_id = undefined}) -> ok; @@ -445,11 +520,11 @@ getr(Key) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, - accept = Matches} + opts = Opts} = S) -> Id = assoc_id(Data), {TPid, NewS} = accept(S), - TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches}, + TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Opts}, setopts(Sock), NewS; @@ -503,12 +578,21 @@ t(T,S) -> %% Incoming message. transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> - setopts(Sock), - recv(Data, S); + setopts(S, recv(Data, S#transport{active = false})); %% Outgoing message. transition({diameter, {send, Msg}}, S) -> - send(Msg, S); + message(send, Msg, S); + +%% Monitor has sent an outgoing message. +transition(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + message(ack, Msg, S); + +%% Deferred actions from a message_cb. +transition({actions, Dir, Acts}, S) -> + setopts(ok, actions(Acts, Dir, S)); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> @@ -522,8 +606,18 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> stop; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + send = MPid}) -> + is_boolean(MPid) + orelse ok == (catch gen_server:call(MPid, {stop, Pid})) + orelse exit(MPid, kill), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop; %% Timeout after transport process has been started. @@ -536,6 +630,18 @@ transition({resolve_port, Pid}, #transport{socket = Sock}) Pid ! inet:port(Sock), ok. +%% m/2 + +m({Msg, StreamId}, #monitor{socket = Sock, + transport = TPid, + assoc_id = AId, + ack = B}) -> + send(Sock, AId, StreamId, Msg), + B andalso (TPid ! Msg); + +m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) -> + x(T). + %% Crash on anything unexpected. ok({ok, T}) -> @@ -578,33 +684,57 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% send/2 +%% Start monitor process on first send. +send(Msg, #transport{send = true, + socket = Sock, + assoc_id = AId, + message_cb = CB} + = S) -> + {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(), + socket = Sock, + assoc_id = AId, + ack = false /= CB}), + monitor(process, MPid), + send(Msg, S#transport{send = MPid}); + %% Outbound Diameter message on a specified stream ... -send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, +send(#diameter_packet{transport_data = {outstream, SId}} + = Msg, #transport{streams = {_, OS}} = S) -> - send(SId rem OS, Bin, S), - S; + send(SId rem OS, Msg, S); -%% ... or not: rotate through all streams. -send(#diameter_packet{bin = Bin}, S) -> - send(Bin, S); -send(Bin, #transport{streams = {_, OS}, +%% ... or not: rotate when sending on multiple streams ... +send(Msg, #transport{rotate = true, + streams = {_, OS}, os = N} - = S) - when is_binary(Bin) -> - send(N, Bin, S), - S#transport{os = (N + 1) rem OS}. + = S) -> + send(N, Msg, S#transport{os = (N + 1) rem OS}); + +%% ... or send on the only stream available. +send(Msg, S) -> + send(0, Msg, S). %% send/3 -send(StreamId, Bin, #transport{socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin). +send(StreamId, Msg, #transport{send = false, + socket = Sock, + assoc_id = AId} + = S) -> + send(Sock, AId, StreamId, Msg), + message(ack, Msg, S); + +send(StreamId, Msg, #transport{send = MPid} = S) -> + MPid ! {Msg, StreamId}, + S. %% send/4 -send(Sock, AssocId, Stream, Bin) -> - case gen_sctp:send(Sock, AssocId, Stream, Bin) of +send(Sock, AssocId, StreamId, #diameter_packet{bin = Bin}) -> + send(Sock, AssocId, StreamId, Bin); + +send(Sock, AssocId, StreamId, Bin) -> + case gen_sctp:send(Sock, AssocId, StreamId, Bin) of ok -> ok; {error, Reason} -> @@ -624,7 +754,9 @@ recv({_, #sctp_assoc_change{state = comm_up, = S) -> Ref = getr(?REF_KEY), publish(T, Ref, Id, Sock), - up(S#transport{assoc_id = Id, + %% Deal with different association id after peeloff on Solaris by + %% taking the id from the first reception. + up(S#transport{assoc_id = T == accept orelse Id, streams = {IS, OS}}); %% ... or not: try the next address. @@ -639,17 +771,19 @@ recv({_, #sctp_assoc_change{} = E}, recv({_, #sctp_assoc_change{}}, _) -> stop; +%% First inbound on an accepting transport. +recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} + = T, + #transport{assoc_id = true} + = S) -> + recv(T, S#transport{assoc_id = Id}); + %% Inbound Diameter message. -recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid}) +recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S) when is_binary(Bin) -> - diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id}, - bin = Bin}), - ok; + message(recv, Msg, recv(S)); -recv({_, #sctp_shutdown_event{assoc_id = A}}, - #transport{assoc_id = Id}) - when A == Id; - A == 0 -> +recv({_, #sctp_shutdown_event{}}, _) -> stop; %% Note that diameter_sctp(3) documents that sctp_events cannot be @@ -667,6 +801,41 @@ recv({_, #sctp_paddr_change{}}, _) -> recv({_, #sctp_pdapi_event{}}, _) -> ok. +%% recv/1 +%% +%% Start sending unordered after the second reception, so that an +%% outgoing CER/CEA will arrive at the peer before another request. + +recv(#transport{rotate = B} = S) + when is_boolean(B) -> + S; + +recv(#transport{rotate = 0, + streams = {_,OS}, + socket = Sock, + unordered = B} + = S) -> + ok = unordered(Sock, OS, B), + S#transport{rotate = 1 < OS}; + +recv(#transport{rotate = N} = S) -> + S#transport{rotate = N-1}. + +%% unordered/3 + +unordered(Sock, OS, B) + when B; + is_integer(B), OS =< B -> + inet:setopts(Sock, [{sctp_default_send_param, + #sctp_sndrcvinfo{flags = [unordered]}}]); + +unordered(_, OS, B) + when not B; + is_integer(B), B < OS -> + ok. + +%% publish/4 + publish(T, Ref, Id, Sock) -> true = diameter_reg:add_new({?MODULE, T, {Ref, {Id, Sock}}}), putr(?INFO_KEY, {gen_sctp, Sock}). %% for info/1 @@ -765,6 +934,23 @@ connect(Sock, [Addr | AT] = As, Port, Reasons) -> connect(Sock, AT, Port, [{Addr, E} | Reasons]) end. +%% setopts/2 + +setopts(_, #transport{socket = Sock, + active = A, + recv = B} + = S) + when B, not A -> + setopts(Sock), + S#transport{active = true}; + +setopts(_, #transport{} = S) -> + S; + +setopts(#transport{socket = Sock}, T) -> + setopts(Sock), + T. + %% setopts/1 setopts(Sock) -> @@ -772,3 +958,83 @@ setopts(Sock) -> ok -> ok; X -> x({setopts, Sock, X}) %% possibly on peer disconnect end. + +%% A message_cb is invoked whenever a message is sent or received, or +%% to provide acknowledgement of a completed send or discarded +%% request. See diameter_tcp for semantics, the only difference being +%% that a recv callback can get a diameter_packet record as Msg +%% depending on how/if option packet has been specified. + +%% message/3 + +message(send, false = M, S) -> + message(ack, M, S); + +message(ack, _, #transport{message_cb = false} = S) -> + S; + +message(Dir, Msg, S) -> + setopts(S, actions(cb(S, Dir, Msg), Dir, S)). + +%% actions/3 + +actions([], _, S) -> + S; + +actions([B | As], Dir, S) + when is_boolean(B) -> + actions(As, Dir, S#transport{recv = B}); + +actions([Dir | As], _, S) + when Dir == send; + Dir == recv -> + actions(As, Dir, S); + +actions([Msg | As], send = Dir, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + actions(As, Dir, send(Msg, S)); + +actions([Msg | As], recv = Dir, #transport{parent = Pid} = S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + diameter_peer:recv(Pid, Msg), + actions(As, Dir, S); + +actions([{defer, Tmo, Acts} | As], Dir, S) -> + erlang:send_after(Tmo, self(), {actions, Dir, Acts}), + actions(As, Dir, S); + +actions(CB, _, S) -> + S#transport{message_cb = CB}. + +%% cb/3 + +cb(#transport{message_cb = false, packet = P}, recv, Msg) -> + [pkt(P, true, Msg)]; + +cb(#transport{message_cb = CB, packet = P}, recv = D, Msg) -> + cb(CB, D, pkt(P, false, Msg)); + +cb(#transport{message_cb = CB}, Dir, Msg) -> + cb(CB, Dir, Msg); + +cb(false, send, Msg) -> + [Msg]; + +cb(CB, Dir, Msg) -> + diameter_lib:eval([CB, Dir, Msg]). + +%% pkt/3 + +pkt(false, _, {_Info, Bin}) -> + Bin; + +pkt(true, _, {[#sctp_sndrcvinfo{stream = Id}], Bin}) -> + #diameter_packet{bin = Bin, transport_data = {stream, Id}}; + +pkt(raw, true, {[Info], Bin}) -> + #diameter_packet{bin = Bin, transport_data = Info}; + +pkt(raw, false, {[_], _} = Msg) -> + Msg. diff --git a/lib/diameter/src/transport/diameter_sctp_sup.erl b/lib/diameter/src/transport/diameter_sctp_sup.erl index 36050aaf28..e8e26ec7c5 100644 --- a/lib/diameter/src/transport/diameter_sctp_sup.erl +++ b/lib/diameter/src/transport/diameter_sctp_sup.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ start() -> start_child(T) -> SupRef = case element(1,T) of + monitor -> ?TRANSPORT_SUP; connect -> ?TRANSPORT_SUP; accept -> ?TRANSPORT_SUP; listen -> ?LISTENER_SUP diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 44abc5c3b4..a8639baa11 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ %% -module(diameter_tcp). --dialyzer({no_fail_call, throttle/2}). -behaviour(gen_server). @@ -53,6 +52,7 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). @@ -68,20 +68,26 @@ %% The same gen_server implementation supports three different kinds %% of processes: an actual transport process, one that will club it to %% death should the parent die before a connection is established, and -%% a process owning the listening port. +%% a process owning the listening port. The monitor process +%% historically died after connection establishment, but can now live +%% on as the sender of outgoing messages, so that a blocking send +%% doesn't prevent messages from being received. %% Listener process state. -record(listener, {socket :: inet:socket(), + module :: module(), service = false :: false | pid()}). %% service process %% Monitor process state. -record(monitor, - {parent :: pid(), - transport = self() :: pid()}). + {parent :: reference() | false | pid(), + transport = self() :: pid(), + ack = false :: boolean(), + socket :: inet:socket() | ssl:sslsocket() | undefined, + module :: module() | undefined}). -type length() :: 0..16#FFFFFF. %% message length from Diameter header --type size() :: non_neg_integer(). %% accumulated binary size --type frag() :: {length(), size(), binary(), list(binary())} +-type frag() :: maybe_improper_list(length(), binary()) | binary(). -type connect_option() :: {raddr, inet:ip_address()} @@ -97,25 +103,30 @@ -type listen_option() :: {accept, match()} | {ssl_options, true | [ssl:listen_option()]} + | option() | ssl:listen_option() | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} - | {fragment_timer, 0..16#FFFFFFFF} - | {throttle_cb, diameter:evaluable()}. + | {sender, boolean()} + | sender + | {message_cb, false | diameter:eval()} + | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. -record(transport, {socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket + active = false :: boolean(), %% is socket active? + recv = true :: boolean(), %% should it be active? parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module - frag = <<>> :: frag(), %% message fragment ssl :: [term()] | boolean(), %% ssl options, ssl or not + frag = <<>> :: frag(), %% message fragment timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled :: boolean() | binary()}). %% stopped receiving? + message_cb :: false | diameter:eval(), + send :: pid() | false}). %% sending process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -131,19 +142,18 @@ -> {ok, pid(), [inet:ip_address()]} when Ref :: diameter:transport_ref(); ({connect, Ref}, #diameter_service{}, [connect_option()]) - -> {ok, pid(), [inet:ip_address()]} - | {ok, pid()} + -> {ok, pid()} when Ref :: diameter:transport_ref(). start({T, Ref}, Svc, Opts) -> #diameter_service{capabilities = Caps, - pid = SPid} + pid = SvcPid} = Svc, diameter_tcp_sup:start(), %% start tcp supervisors on demand {Mod, Rest} = split(Opts), Addrs = Caps#diameter_caps.host_ip_address, - Arg = {T, Ref, Mod, self(), Rest, Addrs, SPid}, + Arg = {T, Ref, Mod, self(), Rest, Addrs, SvcPid}, diameter_tcp_sup:start_child(Arg). split([{module, M} | Opts]) -> @@ -197,74 +207,63 @@ init(T) -> %% i/1 %% A transport process. -i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) +i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid}) when T == accept; T == connect -> monitor(process, Pid), %% Since accept/connect might block indefinitely, spawn a process - %% that does nothing but kill us with the parent until call - %% returns. - {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), + %% that kills us with the parent until call returns, and then + %% sends outgoing messages. {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, - fragment_timer, - throttle_cb]), + sender, + message_cb, + fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), + [CB, Sender] = [proplists:get_value(K, OwnOpts, false) + || K <- [message_cb, sender]], ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), - Throttle = proplists:get_value(throttle_cb, OwnOpts, false), - Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid), - MPid ! {stop, self()}, %% tell the monitor to die + {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), + Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid), M = if SslOpts -> ssl; true -> Mod end, + Sender andalso monitor(process, MPid), + false == CB orelse (Pid ! {diameter, ack}), + MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB}, putr(?REF_KEY, Ref), - throttle(#transport{parent = Pid, - module = M, - socket = Sock, - ssl = SslOpts, - timeout = Tmo, - throttle_cb = Throttle, - throttled = false /= Throttle}); + setopts(#transport{parent = Pid, + module = M, + socket = Sock, + ssl = SslOpts, + message_cb = CB, + timeout = Tmo, + send = Sender andalso MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. -i({T, _Ref, _Mod, _Pid, _Opts, _Addrs} = Arg) %% from old code - when T == accept; - T == connect -> - i(erlang:append_element(Arg, _SPid = false)); - %% A monitor process to kill the transport if the parent dies. i(#monitor{parent = Pid, transport = TPid} = S) -> + putr(?TRANSPORT_KEY, TPid), proc_lib:init_ack({ok, self()}), - monitor(process, Pid), monitor(process, TPid), - S; + S#monitor{parent = monitor(process, Pid)}; %% In principle a link between the transport and killer processes %% could do the same thing: have the accepting/connecting process be %% killed when the killer process dies as a consequence of parent %% death. However, a link can be unlinked and this is exactly what -%% gen_tcp seems to so. Links should be left to supervisors. - -i({listen = L, Ref, _APid, T}) -> %% from old code - i({L, Ref, T}); +%% gen_tcp seems to do. Links should be left to supervisors. i({listen, Ref, {Mod, Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {[LA, LP], Rest} = proplists:split(Opts, [ip, port]), - LAddrOpt = get_addr(LA, Addrs), - LPort = get_port(LP), - {ok, LSock} = Mod:listen(LPort, gen_opts(LAddrOpt, Rest)), - LAddr = laddr(LAddrOpt, Mod, LSock), + {[LP], Rest} = proplists:split(Opts, [port]), + {ok, LSock} = Mod:listen(get_port(LP), gen_opts(Addrs, Rest)), + {ok, {LAddr, _}} = sockname(Mod, LSock), true = diameter_reg:add_new({?MODULE, listener, {Ref, {LAddr, LSock}}}), proc_lib:init_ack({ok, self(), {LAddr, LSock}}), - #listener{socket = LSock}. - -laddr([], Mod, Sock) -> - {ok, {Addr, _Port}} = sockname(Mod, Sock), - Addr; -laddr([{ip, Addr}], _, _) -> - Addr. + #listener{socket = LSock, + module = Mod}. ssl_opts([]) -> false; @@ -279,19 +278,19 @@ ssl_opts(T) -> %% init/8 %% Establish a TLS connection before capabilities exchange ... -init(Type, Ref, Mod, Pid, true, Opts, Addrs, SPid) -> - init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SPid); +init(Type, Ref, Mod, Pid, true, Opts, Addrs, SvcPid) -> + init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SvcPid); %% ... or not. -init(Type, Ref, Mod, Pid, _, Opts, Addrs, SPid) -> - init(Type, Ref, Mod, Pid, Opts, Addrs, SPid). +init(Type, Ref, Mod, Pid, _, Opts, Addrs, SvcPid) -> + init(Type, Ref, Mod, Pid, Opts, Addrs, SvcPid). %% init/7 -init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) -> +init(accept = T, Ref, Mod, Pid, Opts, Addrs, SvcPid) -> {[Matches], Rest} = proplists:split(Opts, [accept]), {ok, LPid, {LAddr, LSock}} = listener(Ref, {Mod, Rest, Addrs}), - ok = gen_server:call(LPid, {accept, SPid}, infinity), + ok = gen_server:call(LPid, {accept, SvcPid}, infinity), proc_lib:init_ack({ok, self(), [LAddr]}), Sock = ok(accept(Mod, LSock)), ok = accept_peer(Mod, Sock, accept(Matches)), @@ -299,25 +298,17 @@ init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) -> diameter_peer:up(Pid), Sock; -init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SPid) -> - {[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]), - LAddrOpt = get_addr(LA, Addrs), +init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SvcPid) -> + {[RA, RP], Rest} = proplists:split(Opts, [raddr, rport]), RAddr = get_addr(RA), RPort = get_port(RP), - proc_lib:init_ack(init_rc(LAddrOpt)), - Sock = ok(connect(Mod, RAddr, RPort, gen_opts(LAddrOpt, Rest))), + proc_lib:init_ack({ok, self()}), + Sock = ok(connect(Mod, RAddr, RPort, gen_opts(Addrs, Rest))), publish(Mod, T, Ref, Sock), - up(Pid, {RAddr, RPort}, LAddrOpt, Mod, Sock), + up(Pid, {RAddr, RPort}, Mod, Sock), Sock. -init_rc([{ip, Addr}]) -> - {ok, self(), [Addr]}; -init_rc([]) -> - {ok, self()}. - -up(Pid, Remote, [{ip, _Addr}], _, _) -> - diameter_peer:up(Pid, Remote); -up(Pid, Remote, [], Mod, Sock) -> +up(Pid, Remote, Mod, Sock) -> {Addr, _Port} = ok(sockname(Mod, Sock)), diameter_peer:up(Pid, Remote, [Addr]). @@ -374,25 +365,41 @@ l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> l([], Ref, T) -> diameter_tcp_sup:start_child({listen, Ref, T}). -%% get_addr/1 +%% addrs/2 +%% +%% Take the first address from the service if several are specified +%% and not address is configured. + +addrs(Addrs, Opts) -> + case lists:mapfoldr(fun ipaddr/2, [], Opts) of + {Os, [_]} -> + Os; + {_, []} -> + Opts ++ [{ip, A} || [A|_] <- [Addrs]]; + {_, As} -> + ?ERROR({invalid_addrs, As, Addrs}) + end. -get_addr(As) -> - diameter_lib:ipaddr(addr(As, [])). +ipaddr({K,A}, As) + when K == ifaddr; + K == ip -> + {{ip, ipaddr(A)}, [A | As]}; +ipaddr(T, B) -> + {T, B}. -%% get_addr/2 +ipaddr(A) + when A == loopback; + A == any -> + A; +ipaddr(A) -> + diameter_lib:ipaddr(A). -get_addr([], []) -> - []; -get_addr(As, Def) -> - [{ip, diameter_lib:ipaddr(addr(As, Def))}]. +%% get_addr/1 -%% Take the first address from the service if several are unspecified. -addr([], [Addr | _]) -> - Addr; -addr([{_, Addr}], _) -> - Addr; -addr(As, Addrs) -> - ?ERROR({invalid_addrs, As, Addrs}). +get_addr([{_, Addr}]) -> + diameter_lib:ipaddr(Addr); +get_addr(Addrs) -> + ?ERROR({invalid_addrs, Addrs}). %% get_port/1 @@ -405,10 +412,15 @@ get_port(Ps) -> %% gen_opts/2 -gen_opts(LAddrOpt, Opts) -> +gen_opts(Addrs, Opts) -> + gen_opts(addrs(Addrs, Opts)). + +%% gen_opts/1 + +gen_opts(Opts) -> {L,_} = proplists:split(Opts, [binary, packet, active]), [[],[],[]] == L orelse ?ERROR({reserved_options, Opts}), - [binary, {packet, 0}, {active, false}] ++ LAddrOpt ++ Opts. + [binary, {packet, 0}, {active, false} | Opts]. %% --------------------------------------------------------------------------- %% # ports/1 @@ -451,14 +463,18 @@ portnr(Sock) -> %% # handle_call/3 %% --------------------------------------------------------------------------- -handle_call({accept, SPid}, _From, #listener{service = P} = S) -> - {reply, ok, if not is_pid(P), is_pid(SPid) -> - monitor(process, SPid), - S#listener{service = SPid}; +handle_call({accept, SvcPid}, _From, #listener{service = P} = S) -> + {reply, ok, if not is_pid(P), is_pid(SvcPid) -> + monitor(process, SvcPid), + S#listener{service = SvcPid}; true -> S end}; - + +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; + handle_call(_, _, State) -> {reply, nok, State}. @@ -480,8 +496,7 @@ handle_info(T, #listener{} = S) -> {noreply, #listener{} = l(T,S)}; handle_info(T, #monitor{} = S) -> - m(T,S), - x(T). + {noreply, #monitor{} = m(T,S)}. %% --------------------------------------------------------------------------- %% # code_change/3 @@ -497,6 +512,7 @@ code_change(_, State, _) -> terminate(_, _) -> ok. + %% --------------------------------------------------------------------------- putr(Key, Val) -> @@ -509,18 +525,47 @@ getr(Key) -> %% %% Transition monitor state. +%% Outgoing message. +m(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + send(Msg, S), + S; + +%% Transport has established a connection. Stop monitoring on the +%% parent so as not to die before a send from the transport. +m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) -> + case T of + {Sock, Mod} -> + demonitor(S#monitor.parent, [flush]), + S#monitor{parent = false, + socket = Sock, + module = Mod, + ack = Ack}; + false -> %% monitor not sending + x(M) + end; + %% Transport is telling us to die. -m({stop, TPid}, #monitor{transport = TPid}) -> - ok; +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); -%% Transport has died. -m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) -> - ok; +%% Transport is telling us to die. +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); -%% Transport parent has died. -m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, - transport = TPid}) -> - exit(TPid, {shutdown, parent}). +%% Transport is telling us that TLS has been negotiated after +%% capabilities exchange. +m({tls, SSock}, S) -> + S#monitor{socket = SSock, + module = ssl}; + +%% Transport or parent has died. +m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef, + transport = TPid}) + when M == MRef; + P == TPid -> + x(T). %% l/2 %% @@ -528,18 +573,16 @@ m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, %% Service process has died. l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid, - socket = Sock}) -> - gen_tcp:close(Sock), + socket = Sock, + module = M}) -> + M:close(Sock), x(T); %% Transport has been removed. -l({transport, remove, _} = T, #listener{socket = Sock}) -> - gen_tcp:close(Sock), - x(T); - -%% Possibly death of an accepting process monitored in old code. -l(_, S) -> - S. +l({transport, remove, _} = T, #listener{socket = Sock, + module = M}) -> + M:close(Sock), + x(T). %% t/2 %% @@ -557,21 +600,14 @@ t(T,S) -> %% transition/2 -%% Incoming message. +%% Incoming packets. transition({P, Sock, Bin}, #transport{socket = Sock, ssl = B, - throttled = T} + frag = Frag} = S) when P == ssl, true == B; P == tcp -> - false = T, %% assert - recv(Bin, S); - -%% Make a new throttling callback after a timeout. -transition(throttle, #transport{throttled = false}) -> - ok; -transition(throttle, S) -> - throttle(S); + recv(acc(Frag, Bin), S); %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} @@ -581,7 +617,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = NS = tls_handshake(Type, B, S), Pid ! {diameter, {tls, Ref}}, - throttle(NS#transport{ssl = B}); + NS#transport{ssl = B}; transition({C, Sock}, #transport{socket = Sock, ssl = B}) @@ -597,8 +633,18 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, ?ERROR({T,S}); %% Outgoing message. -transition({diameter, {send, Bin}}, S) -> - send(Bin, S); +transition({diameter, {send, Msg}}, #transport{} = S) -> + message(send, Msg, S); + +%% Monitor has sent an outgoing message. +transition(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + message(ack, Msg, S); + +%% Deferred actions from a message_cb. +transition({actions, Dir, Acts}, S) -> + setopts(actions(Acts, Dir, S)); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -618,8 +664,18 @@ transition({resolve_port, Pid}, #transport{socket = Sock, Pid ! portnr(M, Sock), ok; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + send = MPid}) -> + false == MPid + orelse (ok == gen_server:call(MPid, {stop, self()}, 1000)) + orelse exit(MPid, {shutdown, parent}), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop. %% Crash on anything unexpected. @@ -643,11 +699,13 @@ tls_handshake(_, true, #transport{ssl = false}) -> %% Capabilities exchange negotiated TLS: upgrade the connection. tls_handshake(Type, true, #transport{socket = Sock, module = M, - ssl = Opts} + ssl = Opts, + send = MPid} = S) -> {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]), Ref = getr(?REF_KEY), true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}), + false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process S#transport{socket = SSock, module = ssl}; @@ -666,92 +724,77 @@ tls(accept, Sock, Opts) -> %% using Nagle. %% Receive packets until a full message is received, -recv(Bin, #transport{frag = Head, throttled = false} = S) -> - case rcv(Head, Bin) of - {Msg, B} -> - throttle(S#transport{frag = B, throttled = Msg}); - Frag -> - setopts(S), - start_fragment_timer(S#transport{frag = Frag, - flush = false}) - end. -%% recv/1 +recv({Msg, Rest}, S) -> %% have a complete message ... + recv(acc(Rest), message(recv, Msg, S)); -recv(#transport{throttled = false} = S) -> - recv(<<>>, S); +recv(Frag, #transport{recv = B, + socket = Sock, + module = M} + = S) -> %% or not + B andalso setopts(M, Sock), + start_fragment_timer(S#transport{frag = Frag, + flush = false, + active = B}). -recv(#transport{} = S) -> - S. +%% acc/2 -%% rcv/2 +%% Know how many bytes to extract. +acc([Len | Acc], Bin) -> + acc1(Len, <<Acc/binary, Bin/binary>>); -%% No previous fragment. -rcv(<<>>, Bin) -> - rcv(Bin); +%% Or not. +acc(Head, Bin) -> + acc(<<Head/binary, Bin/binary>>). -%% Not even the first four bytes of the header. -rcv(Head, Bin) - when is_binary(Head) -> - rcv(<<Head/binary, Bin/binary>>); - -%% Or enough to know how many bytes to extract. -rcv({Len, N, Head, Acc}, Bin) -> - rcv(Len, N + size(Bin), Head, [Bin | Acc]). - -%% rcv/4 +%% acc1/3 %% Extract a message for which we have all bytes. -rcv(Len, N, Head, Acc) - when Len =< N -> - recv1(Len, bin(Head, Acc)); +acc1(Len, Bin) + when Len =< byte_size(Bin) -> + split_binary(Bin, Len); %% Wait for more packets. -rcv(Len, N, Head, Acc) -> - {Len, N, Head, Acc}. - -%% rcv/1 - -%% Nothing left. -rcv(<<>> = Bin) -> - Bin; - -%% The Message Length isn't even sufficient for a header. Chances are -%% things will go south from here but if we're lucky then the bytes we -%% have extend to an intended message boundary and we can recover by -%% simply receiving them. Make it so. -rcv(<<_:1/binary, Len:24, _/binary>> = Bin) - when Len < 20 -> - {Bin, <<>>}; - -%% Enough bytes to extract a message. -rcv(<<_:1/binary, Len:24, _/binary>> = Bin) - when Len =< size(Bin) -> - recv1(Len, Bin); - -%% Or not: wait for more packets. -rcv(<<_:1/binary, Len:24, _/binary>> = Head) -> - {Len, size(Head), Head, []}; +acc1(Len, Bin) -> + [Len | Bin]. + +%% acc/1 + +%% Don't match on Bin since this results in it being copied at the +%% next append according to the Efficiency Guide. This is also the +%% reason that the Len is extracted and maintained when accumulating +%% messages. The simplest implementation is just to accumulate a +%% binary and match <<_, Len:24, _/binary>> each time the length is +%% required, but the performance of this decays quadratically with the +%% message length, since the binary is then copied with each append of +%% additional bytes from gen_tcp. + +acc(Bin) + when 3 < byte_size(Bin) -> + {Head, _} = split_binary(Bin, 4), + [_,A,B,C] = binary_to_list(Head), + Len = (A bsl 16) bor (B bsl 8) bor C, + if Len < 20 -> + %% Message length isn't sufficient for a Diameter Header. + %% Chances are things will go south from here but if we're + %% lucky then the bytes we have extend to an intended + %% message boundary and we can recover by simply receiving + %% them. Make it so. + {Bin, <<>>}; + true -> + acc1(Len, Bin) + end; %% Not even 4 bytes yet. -rcv(Head) -> - Head. - -%% recv1/2 - -recv1(Len, Bin) -> - <<Msg:Len/binary, Rest/binary>> = Bin, - {Msg, Rest}. +acc(Bin) -> + Bin. -%% bin/1-2 +%% bin/1 -bin(Head, Acc) -> - list_to_binary([Head | lists:reverse(Acc)]). +bin([_ | Bin]) -> + Bin; -bin({_, _, Head, Acc}) -> - bin(Head, Acc); -bin(Bin) - when is_binary(Bin) -> +bin(Bin) -> Bin. %% flush/1 @@ -768,9 +811,7 @@ bin(Bin) %% also eventually lead to watchdog failover. %% No fragment to flush or not receiving messages. -flush(#transport{frag = Frag, throttled = B} = S) - when Frag == <<>>; - B /= false -> +flush(#transport{frag = <<>>} = S) -> S; %% Messages have been received since last timer expiry. @@ -778,9 +819,8 @@ flush(#transport{flush = false} = S) -> start_fragment_timer(S#transport{flush = true}); %% No messages since last expiry. -flush(#transport{frag = Frag, parent = Pid} = S) -> - diameter_peer:recv(Pid, bin(Frag)), - S#transport{frag = <<>>}. +flush(#transport{frag = Frag} = S) -> + message(recv, bin(Frag), S#transport{frag = <<>>}). %% start_fragment_timer/1 %% @@ -813,9 +853,27 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(Bin, #transport{socket = Sock, - module = M}) -> - case send(M, Sock, Bin) of +send(Msg, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) -> + send1(M, Sock, Msg), + B andalso (TPid ! Msg); + +send(Msg, #transport{socket = Sock, module = M, send = false} = S) -> + send1(M, Sock, Msg), + message(ack, Msg, S); + +%% Send from the monitor process to avoid deadlock if both the +%% receiver and the peer were to block in send. +send(Msg, #transport{send = Pid} = S) -> + Pid ! Msg, + S. + +%% send1/3 + +send1(Mod, Sock, #diameter_packet{bin = Bin}) -> + send1(Mod, Sock, Bin); + +send1(Mod, Sock, Bin) -> + case send(Mod, Sock, Bin) of ok -> ok; {error, Reason} -> @@ -842,119 +900,24 @@ setopts(M, Sock, Opts) -> %% setopts/1 -setopts(#transport{socket = Sock, module = M}) -> - setopts(M, Sock). +setopts(#transport{socket = Sock, + active = A, + recv = B, + module = M} + = S) + when B, not A -> + setopts(M, Sock), + S#transport{active = true}; + +setopts(S) -> + S. %% setopts/2 setopts(M, Sock) -> case setopts(M, Sock, [{active, once}]) of ok -> ok; - X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect - end. - -%% throttle/1 - -%% Still collecting packets for a complete message: keep receiving. -throttle(#transport{throttled = false} = S) -> - recv(S); - -%% Decide whether to receive another, or whether to accept a message -%% that's been received. -throttle(#transport{throttle_cb = F, throttled = T} = S) -> - Res = cb(F, T), - - try throttle(Res, S) of - #transport{ssl = SB} = NS when is_boolean(SB) -> - throttle(defrag(NS)); - #transport{throttled = Msg} = NS when is_binary(Msg) -> - %% Initial incoming message when we might need to upgrade - %% to TLS: wait for reception of a tls tuple. - defrag(NS) - catch - #transport{} = NS -> - recv(NS) - end. - -%% cb/2 - -cb(false, _) -> - ok; - -cb(F, B) -> - diameter_lib:eval([F, true /= B andalso B]). - -%% throttle/2 - -%% Callback says to receive another message. -throttle(ok, #transport{throttled = true} = S) -> - throw(S#transport{throttled = false}); - -%% Callback says to accept a received message. -throttle(ok, #transport{parent = Pid, throttled = Msg} = S) - when is_binary(Msg) -> - diameter_peer:recv(Pid, Msg), - S; - -throttle({ok = T, F}, S) -> - throttle(T, S#transport{throttle_cb = F}); - -%% Callback says to accept a received message and acknowledged the -%% returned pid with a {request, Pid} message if a request pid is -%% spawned, a discard message otherwise. The latter does not mean that -%% the message was necessarily discarded: it could have been an -%% answer. -throttle(NPid, #transport{parent = Pid, throttled = Msg} = S) - when is_pid(NPid), is_binary(Msg) -> - diameter_peer:recv(Pid, {Msg, NPid}), - S; - -throttle({NPid, F}, #transport{throttled = Msg} = S) - when is_pid(NPid), is_binary(Msg) -> - throttle(NPid, S#transport{throttle_cb = F}); - -%% Callback to accept a received message says to discard it. -throttle(discard, #transport{throttled = Msg} = S) - when is_binary(Msg) -> - S; - -throttle({discard = T, F}, #transport{throttled = Msg} = S) - when is_binary(Msg) -> - throttle(T, S#transport{throttle_cb = F}); - -%% Callback to accept a received message says to answer it with the -%% supplied binary. -throttle(Bin, #transport{throttled = Msg} = S) - when is_binary(Bin), is_binary(Msg) -> - send(Bin, S), - S; - -throttle({Bin, F}, #transport{throttled = Msg} = S) - when is_binary(Bin), is_binary(Msg) -> - throttle(Bin, S#transport{throttle_cb = F}); - -%% Callback says to ask again in the specified number of milliseconds. -throttle({timeout, Tmo}, S) -> - erlang:send_after(Tmo, self(), throttle), - throw(S); - -throttle({timeout = T, Tmo, F}, S) -> - throttle({T, Tmo}, S#transport{throttle_cb = F}); - -throttle(T, #transport{throttle_cb = F}) -> - ?ERROR({invalid_return, T, F}). - -%% defrag/1 -%% -%% Try to extract another message from packets already read before -%% another throttling callback. - -defrag(#transport{frag = Head} = S) -> - case rcv(Head, <<>>) of - {Msg, B} -> - S#transport{throttled = Msg, frag = B}; - _ -> - S#transport{throttled = true} + X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect end. %% portnr/2 @@ -990,3 +953,80 @@ getstat(gen_tcp, Sock) -> getstat(M, Sock) -> M:getstat(Sock). %% Note that ssl:getstat/1 doesn't yet exist in R15B01. + +%% A message_cb is invoked whenever a message is sent or received, or +%% to provide acknowledgement of a completed send or discarded +%% request. Ignoring possible extra arguments, calls are of the +%% following form. +%% +%% cb(recv, Msg) Receive a message into diameter? +%% cb(send, Msg) Send a message on the socket? +%% cb(ack, Msg) Acknowledgement of a completed send. +%% cb(ack, false) Acknowledgement of a discarded request. +%% +%% Msg will be binary() in a recv callback, but can be a +%% diameter_packet record in a send/ack callback if a recv/send +%% callback returns a record. Callbacks return a list of the following +%% form. +%% +%% [boolean() | send | recv | binary() | #diameter_packet{}] +%% +%% The atoms are meaningless by themselves, but say whether subsequent +%% messages are to be sent or received. A boolean says whether or not +%% to continue reading on the socket. Messages can be received even +%% after false is returned if these arrived in the same packet. A +%% leading recv or send is implicit on the corresponding callbacks. A +%% new callback can be returned as the tail of a returned list: any +%% value not of the aforementioned list type is interpreted as a +%% callback. + +%% message/3 + +message(send, false = M, S) -> + message(ack, M, S); + +message(ack, _, #transport{message_cb = false} = S) -> + S; + +message(Dir, Msg, #transport{message_cb = CB} = S) -> + setopts(actions(cb(CB, Dir, Msg), Dir, S)). + +%% actions/3 + +actions([], _, S) -> + S; + +actions([B | As], Dir, S) + when is_boolean(B) -> + actions(As, Dir, S#transport{recv = B}); + +actions([Dir | As], _, S) + when Dir == send; + Dir == recv -> + actions(As, Dir, S); + +actions([Msg | As], send = Dir, S) + when is_binary(Msg); + is_record(Msg, diameter_packet) -> + actions(As, Dir, send(Msg, S)); + +actions([Msg | As], recv = Dir, #transport{parent = Pid} = S) + when is_binary(Msg); + is_record(Msg, diameter_packet) -> + diameter_peer:recv(Pid, Msg), + actions(As, Dir, S); + +actions([{defer, Tmo, Acts} | As], Dir, S) -> + erlang:send_after(Tmo, self(), {actions, Dir, Acts}), + actions(As, Dir, S); + +actions(CB, _, S) -> + S#transport{message_cb = CB}. + +%% cb/3 + +cb(false, _, Msg) -> + [Msg]; + +cb(CB, Dir, Msg) -> + diameter_lib:eval([CB, Dir, Msg]). |