diff options
author | Sverker Eriksson <[email protected]> | 2017-08-30 21:00:35 +0200 |
---|---|---|
committer | Sverker Eriksson <[email protected]> | 2017-08-30 21:00:35 +0200 |
commit | 44a83c8860bbd00878c720a7b9d940b4630bab8a (patch) | |
tree | 101b3c52ec505a94f56c8f70e078ecb8a2e8c6cd /lib/diameter/src/transport/diameter_tcp.erl | |
parent | 7c67bbddb53c364086f66260701bc54a61c9659c (diff) | |
parent | 040bdce67f88d833bfb59adae130a4ffb4c180f0 (diff) | |
download | otp-44a83c8860bbd00878c720a7b9d940b4630bab8a.tar.gz otp-44a83c8860bbd00878c720a7b9d940b4630bab8a.tar.bz2 otp-44a83c8860bbd00878c720a7b9d940b4630bab8a.zip |
Merge tag 'OTP-20.0' into sverker/20/binary_to_atom-utf8-crash/ERL-474/OTP-14590
Diffstat (limited to 'lib/diameter/src/transport/diameter_tcp.erl')
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 533 |
1 files changed, 296 insertions, 237 deletions
diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 546c2cfa5e..a2f393d5d4 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ %% -module(diameter_tcp). --dialyzer({no_fail_call, throttle/2}). -behaviour(gen_server). @@ -53,6 +52,7 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). @@ -68,19 +68,23 @@ %% The same gen_server implementation supports three different kinds %% of processes: an actual transport process, one that will club it to %% death should the parent die before a connection is established, and -%% a process owning the listening port. +%% a process owning the listening port. The monitor process +%% historically died after connection establishment, but can now live +%% on as the sender of outgoing messages, so that a blocking send +%% doesn't prevent messages from being received. %% Listener process state. --record(listener, {socket :: inet:socket(), - count = 1 :: non_neg_integer()}). %% accepting processes -%% The count of accepting processes was previously used to terminate -%% the listening process, but diameter_reg:subscribe/2 is now used for -%% this. Leave the the count for trace purposes. +-record(listener, {socket :: inet:socket(), + module :: module(), + service = false :: false | pid()}). %% service process %% Monitor process state. -record(monitor, - {parent :: pid(), - transport = self() :: pid()}). + {parent :: reference() | false | pid(), + transport = self() :: pid(), + ack = false :: boolean(), + socket :: inet:socket() | ssl:sslsocket() | undefined, + module :: module() | undefined}). -type length() :: 0..16#FFFFFF. %% message length from Diameter header -type size() :: non_neg_integer(). %% accumulated binary size @@ -100,25 +104,30 @@ -type listen_option() :: {accept, match()} | {ssl_options, true | [ssl:listen_option()]} + | option() | ssl:listen_option() | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} - | {fragment_timer, 0..16#FFFFFFFF} - | {throttle_cb, diameter:evaluable()}. + | {sender, boolean()} + | sender + | {message_cb, false | diameter:evaluable()} + | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. -record(transport, {socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket + active = false :: boolean(), %% is socket active? + recv = true :: boolean(), %% should it be active? parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module - frag = <<>> :: frag(), %% message fragment ssl :: [term()] | boolean(), %% ssl options, ssl or not + frag = <<>> :: frag(), %% message fragment timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled :: boolean() | binary()}). %% stopped receiving? + message_cb :: false | diameter:evaluable(), + send :: pid() | false}). %% sending process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -138,11 +147,15 @@ | {ok, pid()} when Ref :: diameter:transport_ref(). -start({T, Ref}, #diameter_service{capabilities = Caps}, Opts) -> +start({T, Ref}, Svc, Opts) -> + #diameter_service{capabilities = Caps, + pid = SvcPid} + = Svc, + diameter_tcp_sup:start(), %% start tcp supervisors on demand {Mod, Rest} = split(Opts), Addrs = Caps#diameter_caps.host_ip_address, - Arg = {T, Ref, Mod, self(), Rest, Addrs}, + Arg = {T, Ref, Mod, self(), Rest, Addrs, SvcPid}, diameter_tcp_sup:start_child(Arg). split([{module, M} | Opts]) -> @@ -196,61 +209,65 @@ init(T) -> %% i/1 %% A transport process. -i({T, Ref, Mod, Pid, Opts, Addrs}) +i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid}) when T == accept; T == connect -> monitor(process, Pid), %% Since accept/connect might block indefinitely, spawn a process - %% that does nothing but kill us with the parent until call - %% returns. - {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), + %% that kills us with the parent until call returns, and then + %% sends outgoing messages. {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, - fragment_timer, - throttle_cb]), + sender, + message_cb, + fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), + [CB, Sender] = [proplists:get_value(K, OwnOpts, false) + || K <- [message_cb, sender]], ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), - Throttle = proplists:get_value(throttle_cb, OwnOpts, false), - Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs), - MPid ! {stop, self()}, %% tell the monitor to die + {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), + Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid), M = if SslOpts -> ssl; true -> Mod end, + Sender andalso monitor(process, MPid), + false == CB orelse (Pid ! {diameter, ack}), + MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB}, putr(?REF_KEY, Ref), - throttle(#transport{parent = Pid, - module = M, - socket = Sock, - ssl = SslOpts, - timeout = Tmo, - throttle_cb = Throttle, - throttled = false /= Throttle}); + setopts(#transport{parent = Pid, + module = M, + socket = Sock, + ssl = SslOpts, + message_cb = CB, + timeout = Tmo, + send = Sender andalso MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. %% A monitor process to kill the transport if the parent dies. i(#monitor{parent = Pid, transport = TPid} = S) -> + putr(?TRANSPORT_KEY, TPid), proc_lib:init_ack({ok, self()}), - monitor(process, Pid), monitor(process, TPid), - S; + S#monitor{parent = monitor(process, Pid)}; %% In principle a link between the transport and killer processes %% could do the same thing: have the accepting/connecting process be %% killed when the killer process dies as a consequence of parent %% death. However, a link can be unlinked and this is exactly what -%% gen_tcp seems to so. Links should be left to supervisors. +%% gen_tcp seems to do. Links should be left to supervisors. -i({listen, LRef, APid, {Mod, Opts, Addrs}}) -> - [_] = diameter_config:subscribe(LRef, transport), %% assert existence +i({listen, Ref, {Mod, Opts, Addrs}}) -> + [_] = diameter_config:subscribe(Ref, transport), %% assert existence {[LA, LP], Rest} = proplists:split(Opts, [ip, port]), LAddrOpt = get_addr(LA, Addrs), LPort = get_port(LP), {ok, LSock} = Mod:listen(LPort, gen_opts(LAddrOpt, Rest)), LAddr = laddr(LAddrOpt, Mod, LSock), - true = diameter_reg:add_new({?MODULE, listener, {LRef, {LAddr, LSock}}}), + true = diameter_reg:add_new({?MODULE, listener, {Ref, {LAddr, LSock}}}), proc_lib:init_ack({ok, self(), {LAddr, LSock}}), - monitor(process, APid), - #listener{socket = LSock}. + #listener{socket = LSock, + module = Mod}. laddr([], Mod, Sock) -> {ok, {Addr, _Port}} = sockname(Mod, Sock), @@ -268,21 +285,22 @@ ssl_opts([{ssl_options, Opts}]) ssl_opts(T) -> ?ERROR({ssl_options, T}). -%% init/7 +%% init/8 %% Establish a TLS connection before capabilities exchange ... -init(Type, Ref, Mod, Pid, true, Opts, Addrs) -> - init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs); +init(Type, Ref, Mod, Pid, true, Opts, Addrs, SvcPid) -> + init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SvcPid); %% ... or not. -init(Type, Ref, Mod, Pid, _, Opts, Addrs) -> - init(Type, Ref, Mod, Pid, Opts, Addrs). +init(Type, Ref, Mod, Pid, _, Opts, Addrs, SvcPid) -> + init(Type, Ref, Mod, Pid, Opts, Addrs, SvcPid). -%% init/6 +%% init/7 -init(accept = T, Ref, Mod, Pid, Opts, Addrs) -> +init(accept = T, Ref, Mod, Pid, Opts, Addrs, SvcPid) -> {[Matches], Rest} = proplists:split(Opts, [accept]), - {LAddr, LSock} = listener(Ref, {Mod, Rest, Addrs}), + {ok, LPid, {LAddr, LSock}} = listener(Ref, {Mod, Rest, Addrs}), + ok = gen_server:call(LPid, {accept, SvcPid}, infinity), proc_lib:init_ack({ok, self(), [LAddr]}), Sock = ok(accept(Mod, LSock)), ok = accept_peer(Mod, Sock, accept(Matches)), @@ -290,7 +308,7 @@ init(accept = T, Ref, Mod, Pid, Opts, Addrs) -> diameter_peer:up(Pid), Sock; -init(connect = T, Ref, Mod, Pid, Opts, Addrs) -> +init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SvcPid) -> {[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]), LAddrOpt = get_addr(LA, Addrs), RAddr = get_addr(RA), @@ -344,24 +362,26 @@ accept(Opts) -> %% Accepting processes can be started concurrently: ensure only one %% listener is started. -listener(LRef, T) -> - diameter_sync:call({?MODULE, listener, LRef}, - {?MODULE, listener, [{LRef, T, self()}]}, +listener(Ref, T) -> + diameter_sync:call({?MODULE, listener, Ref}, + {?MODULE, listener, [{Ref, T, self()}]}, infinity, infinity). -listener({LRef, T, TPid}) -> - l(diameter_reg:match({?MODULE, listener, {LRef, '_'}}), LRef, T, TPid). +%% listener/1 + +listener({Ref, T, _TPid}) -> + l(diameter_reg:match({?MODULE, listener, {Ref, '_'}}), Ref, T). + +%% l/3 %% Existing listening process ... -l([{{?MODULE, listener, {_, AS}}, LPid}], _, _, TPid) -> - LPid ! {accept, TPid}, - AS; +l([{{?MODULE, listener, {_, AS}}, LPid}], _, _) -> + {ok, LPid, AS}; %% ... or not. -l([], LRef, T, TPid) -> - {ok, _, AS} = diameter_tcp_sup:start_child({listen, LRef, TPid, T}), - AS. +l([], Ref, T) -> + diameter_tcp_sup:start_child({listen, Ref, T}). %% get_addr/1 @@ -440,6 +460,18 @@ portnr(Sock) -> %% # handle_call/3 %% --------------------------------------------------------------------------- +handle_call({accept, SvcPid}, _From, #listener{service = P} = S) -> + {reply, ok, if not is_pid(P), is_pid(SvcPid) -> + monitor(process, SvcPid), + S#listener{service = SvcPid}; + true -> + S + end}; + +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; + handle_call(_, _, State) -> {reply, nok, State}. @@ -461,8 +493,7 @@ handle_info(T, #listener{} = S) -> {noreply, #listener{} = l(T,S)}; handle_info(T, #monitor{} = S) -> - m(T,S), - x(T). + {noreply, #monitor{} = m(T,S)}. %% --------------------------------------------------------------------------- %% # code_change/3 @@ -478,6 +509,7 @@ code_change(_, State, _) -> terminate(_, _) -> ok. + %% --------------------------------------------------------------------------- putr(Key, Val) -> @@ -490,35 +522,63 @@ getr(Key) -> %% %% Transition monitor state. -%% Transport is telling us to die. -m({stop, TPid}, #monitor{transport = TPid}) -> - ok; +%% Outgoing message. +m(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + send(Msg, S), + S; -%% Transport has died. -m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) -> - ok; +%% Transport has established a connection. Stop monitoring on the +%% parent so as not to die before a send from the transport. +m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) -> + case T of + {Sock, Mod} -> + demonitor(S#monitor.parent, [flush]), + S#monitor{parent = false, + socket = Sock, + module = Mod, + ack = Ack}; + false -> %% monitor not sending + x(M) + end; -%% Transport parent has died. -m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, - transport = TPid}) -> - exit(TPid, {shutdown, parent}). +%% Transport is telling us to die. +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); + +%% Transport is telling us to die. +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); + +%% Transport is telling us that TLS has been negotiated after +%% capabilities exchange. +m({tls, SSock}, S) -> + S#monitor{socket = SSock, + module = ssl}; + +%% Transport or parent has died. +m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef, + transport = TPid}) + when M == MRef; + P == TPid -> + x(T). %% l/2 %% %% Transition listener state. -%% An accepting transport is attaching. -l({accept, TPid}, #listener{count = N} = S) -> - monitor(process, TPid), - S#listener{count = N+1}; - -%% Accepting process has died. -l({'DOWN', _, process, _, _}, #listener{count = N} = S) -> - S#listener{count = N-1}; +%% Service process has died. +l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid, + socket = Sock, + module = M}) -> + M:close(Sock), + x(T); %% Transport has been removed. -l({transport, remove, _} = T, #listener{socket = Sock}) -> - gen_tcp:close(Sock), +l({transport, remove, _} = T, #listener{socket = Sock, + module = M}) -> + M:close(Sock), x(T). %% t/2 @@ -537,21 +597,13 @@ t(T,S) -> %% transition/2 -%% Incoming message. +%% Incoming packets. transition({P, Sock, Bin}, #transport{socket = Sock, - ssl = B, - throttled = T} + ssl = B} = S) when P == ssl, true == B; P == tcp -> - false = T, %% assert - recv(Bin, S); - -%% Make a new throttling callback after a timeout. -transition(throttle, #transport{throttled = false}) -> - ok; -transition(throttle, S) -> - throttle(S); + recv(Bin, S#transport{active = false}); %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} @@ -561,7 +613,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = NS = tls_handshake(Type, B, S), Pid ! {diameter, {tls, Ref}}, - throttle(NS#transport{ssl = B}); + NS#transport{ssl = B}; transition({C, Sock}, #transport{socket = Sock, ssl = B}) @@ -577,8 +629,18 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, ?ERROR({T,S}); %% Outgoing message. -transition({diameter, {send, Bin}}, S) -> - send(Bin, S); +transition({diameter, {send, Msg}}, #transport{} = S) -> + message(send, Msg, S); + +%% Monitor has sent an outgoing message. +transition(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + message(ack, Msg, S); + +%% Deferred actions from a message_cb. +transition({actions, Dir, Acts}, S) -> + actions(Acts, Dir, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -598,8 +660,18 @@ transition({resolve_port, Pid}, #transport{socket = Sock, Pid ! portnr(M, Sock), ok; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + send = MPid}) -> + false == MPid + orelse (ok == gen_server:call(MPid, {stop, self()}, 1000)) + orelse exit(MPid, {shutdown, parent}), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop. %% Crash on anything unexpected. @@ -623,11 +695,13 @@ tls_handshake(_, true, #transport{ssl = false}) -> %% Capabilities exchange negotiated TLS: upgrade the connection. tls_handshake(Type, true, #transport{socket = Sock, module = M, - ssl = Opts} + ssl = Opts, + send = MPid} = S) -> {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]), Ref = getr(?REF_KEY), true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}), + false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process S#transport{socket = SSock, module = ssl}; @@ -646,24 +720,15 @@ tls(accept, Sock, Opts) -> %% using Nagle. %% Receive packets until a full message is received, -recv(Bin, #transport{frag = Head, throttled = false} = S) -> +recv(Bin, #transport{frag = Head} = S) -> case rcv(Head, Bin) of - {Msg, B} -> - throttle(S#transport{frag = B, throttled = Msg}); - Frag -> - setopts(S), - start_fragment_timer(S#transport{frag = Frag, - flush = false}) + {Msg, B} -> %% have a complete message ... + message(recv, Msg, S#transport{frag = B}); + Frag -> %% read more on the socket + start_fragment_timer(setopts(S#transport{frag = Frag, + flush = false})) end. -%% recv/1 - -recv(#transport{throttled = false} = S) -> - recv(<<>>, S); - -recv(#transport{} = S) -> - S. - %% rcv/2 %% No previous fragment. @@ -723,13 +788,16 @@ recv1(Len, Bin) -> <<Msg:Len/binary, Rest/binary>> = Bin, {Msg, Rest}. -%% bin/1-2 +%% bin/2 bin(Head, Acc) -> list_to_binary([Head | lists:reverse(Acc)]). +%% bin/1 + bin({_, _, Head, Acc}) -> bin(Head, Acc); + bin(Bin) when is_binary(Bin) -> Bin. @@ -748,9 +816,7 @@ bin(Bin) %% also eventually lead to watchdog failover. %% No fragment to flush or not receiving messages. -flush(#transport{frag = Frag, throttled = B} = S) - when Frag == <<>>; - B /= false -> +flush(#transport{frag = <<>>} = S) -> S; %% Messages have been received since last timer expiry. @@ -758,9 +824,8 @@ flush(#transport{flush = false} = S) -> start_fragment_timer(S#transport{flush = true}); %% No messages since last expiry. -flush(#transport{frag = Frag, parent = Pid} = S) -> - diameter_peer:recv(Pid, bin(Frag)), - S#transport{frag = <<>>}. +flush(#transport{frag = Frag} = S) -> + message(recv, bin(Frag), S#transport{frag = <<>>}). %% start_fragment_timer/1 %% @@ -793,9 +858,27 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(Bin, #transport{socket = Sock, - module = M}) -> - case send(M, Sock, Bin) of +send(Msg, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) -> + send1(M, Sock, Msg), + B andalso (TPid ! Msg); + +send(Msg, #transport{socket = Sock, module = M, send = false} = S) -> + send1(M, Sock, Msg), + message(ack, Msg, S); + +%% Send from the monitor process to avoid deadlock if both the +%% receiver and the peer were to block in send. +send(Msg, #transport{send = Pid} = S) -> + Pid ! Msg, + S. + +%% send1/3 + +send1(Mod, Sock, #diameter_packet{bin = Bin}) -> + send1(Mod, Sock, Bin); + +send1(Mod, Sock, Bin) -> + case send(Mod, Sock, Bin) of ok -> ok; {error, Reason} -> @@ -822,120 +905,19 @@ setopts(M, Sock, Opts) -> %% setopts/1 -setopts(#transport{socket = Sock, module = M}) -> - setopts(M, Sock). - -%% setopts/2 - -setopts(M, Sock) -> +setopts(#transport{socket = Sock, + active = A, + recv = B, + module = M} + = S) + when B, not A -> case setopts(M, Sock, [{active, once}]) of - ok -> ok; - X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect - end. - -%% throttle/1 - -%% Still collecting packets for a complete message: keep receiving. -throttle(#transport{throttled = false} = S) -> - recv(S); - -%% Decide whether to receive another, or whether to accept a message -%% that's been received. -throttle(#transport{throttle_cb = F, throttled = T} = S) -> - Res = cb(F, T), - - try throttle(Res, S) of - #transport{ssl = SB} = NS when is_boolean(SB) -> - throttle(defrag(NS)); - #transport{throttled = Msg} = NS when is_binary(Msg) -> - %% Initial incoming message when we might need to upgrade - %% to TLS: wait for reception of a tls tuple. - defrag(NS) - catch - #transport{} = NS -> - recv(NS) - end. - -%% cb/2 - -cb(false, _) -> - ok; - -cb(F, B) -> - diameter_lib:eval([F, true /= B andalso B]). - -%% throttle/2 - -%% Callback says to receive another message. -throttle(ok, #transport{throttled = true} = S) -> - throw(S#transport{throttled = false}); - -%% Callback says to accept a received message. -throttle(ok, #transport{parent = Pid, throttled = Msg} = S) - when is_binary(Msg) -> - diameter_peer:recv(Pid, Msg), - S; - -throttle({ok = T, F}, S) -> - throttle(T, S#transport{throttle_cb = F}); - -%% Callback says to accept a received message and acknowledged the -%% returned pid with a {request, Pid} message if a request pid is -%% spawned, a discard message otherwise. The latter does not mean that -%% the message was necessarily discarded: it could have been an -%% answer. -throttle(NPid, #transport{parent = Pid, throttled = Msg} = S) - when is_pid(NPid), is_binary(Msg) -> - diameter_peer:recv(Pid, {Msg, NPid}), - S; - -throttle({NPid, F}, #transport{throttled = Msg} = S) - when is_pid(NPid), is_binary(Msg) -> - throttle(NPid, S#transport{throttle_cb = F}); - -%% Callback to accept a received message says to discard it. -throttle(discard, #transport{throttled = Msg} = S) - when is_binary(Msg) -> - S; - -throttle({discard = T, F}, #transport{throttled = Msg} = S) - when is_binary(Msg) -> - throttle(T, S#transport{throttle_cb = F}); - -%% Callback to accept a received message says to answer it with the -%% supplied binary. -throttle(Bin, #transport{throttled = Msg} = S) - when is_binary(Bin), is_binary(Msg) -> - send(Bin, S), - S; - -throttle({Bin, F}, #transport{throttled = Msg} = S) - when is_binary(Bin), is_binary(Msg) -> - throttle(Bin, S#transport{throttle_cb = F}); - -%% Callback says to ask again in the specified number of milliseconds. -throttle({timeout, Tmo}, S) -> - erlang:send_after(Tmo, self(), throttle), - throw(S); - -throttle({timeout = T, Tmo, F}, S) -> - throttle({T, Tmo}, S#transport{throttle_cb = F}); - -throttle(T, #transport{throttle_cb = F}) -> - ?ERROR({invalid_return, T, F}). - -%% defrag/1 -%% -%% Try to extract another message from packets already read before -%% another throttling callback. + ok -> S#transport{active = true}; + X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect + end; -defrag(#transport{frag = Head} = S) -> - case rcv(Head, <<>>) of - {Msg, B} -> - S#transport{throttled = Msg, frag = B}; - _ -> - S#transport{throttled = true} - end. +setopts(S) -> + S. %% portnr/2 @@ -970,3 +952,80 @@ getstat(gen_tcp, Sock) -> getstat(M, Sock) -> M:getstat(Sock). %% Note that ssl:getstat/1 doesn't yet exist in R15B01. + +%% A message_cb is invoked whenever a message is sent or received, or +%% to provide acknowledgement of a completed send or discarded +%% request. Ignoring possible extra arguments, calls are of the +%% following form. +%% +%% cb(recv, Msg) Receive a message into diameter? +%% cb(send, Msg) Send a message on the socket? +%% cb(ack, Msg) Acknowledgement of a completed send. +%% cb(ack, false) Acknowledgement of a discarded request. +%% +%% Msg will be binary() in a recv callback, but can be a +%% diameter_packet record in a send/ack callback if a recv/send +%% callback returns a record. Callbacks return a list of the following +%% form. +%% +%% [boolean() | send | recv | binary() | #diameter_packet{}] +%% +%% The atoms are meaningless by themselves, but say whether subsequent +%% messages are to be sent or received. A boolean says whether or not +%% to continue reading on the socket. Messages can be received even +%% after false is returned if these arrived in the same packet. A +%% leading recv or send is implicit on the corresponding callbacks. A +%% new callback can be returned as the tail of a returned list: any +%% value not of the aforementioned list type is interpreted as a +%% callback. + +%% message/3 + +message(send, false = M, S) -> + message(ack, M, S); + +message(ack, _, #transport{message_cb = false} = S) -> + S; + +message(Dir, Msg, #transport{message_cb = CB} = S) -> + recv(<<>>, actions(cb(CB, Dir, Msg), Dir, S)). + +%% actions/3 + +actions([], _, S) -> + S; + +actions([B | As], Dir, S) + when is_boolean(B) -> + actions(As, Dir, S#transport{recv = B}); + +actions([Dir | As], _, S) + when Dir == send; + Dir == recv -> + actions(As, Dir, S); + +actions([Msg | As], send = Dir, S) + when is_binary(Msg); + is_record(Msg, diameter_packet) -> + actions(As, Dir, send(Msg, S)); + +actions([Msg | As], recv = Dir, #transport{parent = Pid} = S) + when is_binary(Msg); + is_record(Msg, diameter_packet) -> + diameter_peer:recv(Pid, Msg), + actions(As, Dir, S); + +actions([{defer, Tmo, Acts} | As], Dir, S) -> + erlang:send_after(Tmo, self(), {actions, Dir, Acts}), + actions(As, Dir, S); + +actions(CB, _, S) -> + S#transport{message_cb = CB}. + +%% cb/3 + +cb(false, _, Msg) -> + [Msg]; + +cb(CB, Dir, Msg) -> + diameter_lib:eval([CB, Dir, Msg]). |