diff options
author | Anders Svensson <[email protected]> | 2017-06-14 09:29:05 +0200 |
---|---|---|
committer | Anders Svensson <[email protected]> | 2017-06-14 09:29:05 +0200 |
commit | 1bf842f3cd603ddd6246d874e188e4f75b0cc692 (patch) | |
tree | d47b488ce7b0e6402241ac99ee4161e0ebffee6b /lib/diameter/src | |
parent | d4fea060349a72fb58267e82c2d6bfa7b638b2c9 (diff) | |
parent | 69c5a74179e13e145da3da70e02dd43881a82008 (diff) | |
download | otp-1bf842f3cd603ddd6246d874e188e4f75b0cc692.tar.gz otp-1bf842f3cd603ddd6246d874e188e4f75b0cc692.tar.bz2 otp-1bf842f3cd603ddd6246d874e188e4f75b0cc692.zip |
Merge branch 'anders/diameter/transport/ERL-332'
* anders/diameter/transport/ERL-332: (35 commits)
Capitulate on SCTP vs sparc-sun-solaris2.10
Remove obsolete traffic testcase
Fix dialyzer warnings
Remove client/server string decode from traffic suite
Add diameter_sctp option packet
Add diameter_sctp send/recv callbacks
Let diameter_tcp send/recv callbacks deal in diameter_packet
Randomly select traffic testcases
Exercise diameter_tcp message callbacks in traffic suite
Exercise diameter_{tcp,sctp} sender in traffic suite
Remove upgrade from diameter_traffic
Add diameter_tcp send/recv callbacks
Make diameter_{tcp,sctp} sender configurable
Remove upgrade from diameter_sctp; tweak diameter_tcp to match
Fix incomprehensible dialyzer warning
Simplify acks to transport processes
Strip throttling callbacks from diameter_tcp
Deal with (another) SCTP association id quirk on Solaris
Use binary:copy/2 when generating largish data in test suites
Deal with SCTP association id quirk on Solaris
...
Diffstat (limited to 'lib/diameter/src')
-rw-r--r-- | lib/diameter/src/base/diameter_config.erl | 2 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_peer_fsm.erl | 146 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_reg.erl | 2 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_traffic.erl | 84 | ||||
-rw-r--r-- | lib/diameter/src/base/diameter_watchdog.erl | 39 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp.erl | 319 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_sctp_sup.erl | 3 | ||||
-rw-r--r-- | lib/diameter/src/transport/diameter_tcp.erl | 491 |
8 files changed, 665 insertions, 421 deletions
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index e10804c931..cea671f275 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -277,7 +277,7 @@ start_link() -> start_link(T) -> proc_lib:start_link(?MODULE, init, [T], infinity, []). - + state() -> call(state). diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl index 46d231da74..601e48e817 100644 --- a/lib/diameter/src/base/diameter_peer_fsm.erl +++ b/lib/diameter/src/base/diameter_peer_fsm.erl @@ -129,6 +129,7 @@ %% the request was sent explicitly with %% diameter:call/4. strict :: boolean(), + ack = false :: boolean(), length_errors :: exit | handle | discard, incoming_maxlen :: integer() | infinity}). @@ -235,7 +236,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> Tmo = proplists:get_value(capx_timeout, Opts, ?CAPX_TIMEOUT), Strictness = proplists:get_value(capx_strictness, Opts, true), - OnLengthErr = proplists:get_value(length_errors, Opts, exit), + LengthErr = proplists:get_value(length_errors, Opts, exit), {TPid, Addrs} = start_transport(T, Rest, Svc), @@ -247,7 +248,7 @@ i({Ack, WPid, {M, Ref} = T, Opts, {SvcOpts, Nodes, Dict0, Svc}}) -> dictionary = Dict0, mode = M, service = svc(Svc, Addrs), - length_errors = OnLengthErr, + length_errors = LengthErr, strict = Strictness, incoming_maxlen = Maxlen}. %% The transport returns its local ip addresses so that different @@ -442,9 +443,18 @@ transition({connection_timeout = T, TPid}, transition({connection_timeout, _}, _) -> ok; +%% Requests for acknowledgements to the transport. +transition({diameter, ack}, S) -> + S#state{ack = true}; + %% Incoming message from the transport. -transition({diameter, {recv, MsgT}}, S) -> - incoming(MsgT, S); +transition({diameter, {recv, Msg}}, S) -> + incoming(recv(Msg, S), S); + +%% Handler of an incoming request is telling of its existence. +transition({handler, Pid}, _) -> + put_route(Pid), + ok; %% Timeout when still in the same state ... transition({timeout = T, PS}, #state{state = PS}) -> @@ -458,7 +468,7 @@ transition({timeout, _}, _) -> transition({send, Msg}, S) -> outgoing(Msg, S); transition({send, Msg, Route}, S) -> - put_route(Route), + route_outgoing(Route), outgoing(Msg, S); %% Request for graceful shutdown at remove_transport, stop_service of @@ -487,12 +497,13 @@ transition({'DOWN', _, process, WPid, _}, transition({'DOWN', _, process, TPid, _}, #state{transport = TPid} = S) -> - start_next(S); + start_next(S#state{ack = false}); %% Transport has died after connection timeout, or handler process has %% died. -transition({'DOWN', _, process, Pid, _}, _) -> - erase_route(Pid), +transition({'DOWN', _, process, Pid, _}, #state{transport = TPid}) -> + is_reference(erase_route(Pid)) + andalso send(TPid, false), %% answer not forthcoming ok; %% State query. @@ -502,37 +513,56 @@ transition({state, Pid}, #state{state = S, transport = TPid}) -> %% Crash on anything unexpected. -%% put_route/1 -%% +%% route_outgoing/1 + %% Map identifiers in an outgoing request to be able to lookup the %% handler process when the answer is received. - -put_route({Pid, Ref, Seqs}) -> +route_outgoing({Pid, Ref, Seqs}) -> %% request MRef = monitor(process, Pid), put(Pid, Seqs), - put(Seqs, {Pid, Ref, MRef}). + put(Seqs, {Pid, Ref, MRef}); + +%% Remove a mapping made for an incoming request. +route_outgoing(Pid) + when is_pid(Pid) -> %% answer + MRef = erase_route(Pid), + undefined == MRef orelse demonitor(MRef). -%% get_route/1 +%% put_route/1 + +%% Monitor on a handler process for an incoming request. +put_route(Pid) -> + MRef = monitor(process, Pid), + put(Pid, MRef). -get_route(#diameter_packet{header = #diameter_header{is_request = false}} - = Pkt) -> +%% get_route/2 + +%% incoming answer +get_route(_, #diameter_packet{header = #diameter_header{is_request = false}} + = Pkt) -> Seqs = diameter_codec:sequence_numbers(Pkt), case erase(Seqs) of {Pid, Ref, MRef} -> demonitor(MRef), erase(Pid), {Pid, Ref, self()}; - undefined -> + undefined -> %% request unknown false end; -get_route(_) -> - false. +%% incoming request +get_route(Ack, _) -> + Ack. %% erase_route/1 erase_route(Pid) -> - erase(erase(Pid)). + case erase(Pid) of + {_,_} = Seqs -> + erase(Seqs); + T -> + T + end. %% capx/1 @@ -611,29 +641,24 @@ encode(Rec, Dict) -> %% incoming/2 -incoming({Msg, NPid}, S) -> - try recv(Msg, S) of - T -> - NPid ! {diameter, discard}, - T - catch - {?MODULE, Name, Pkt} -> - incoming(Name, Pkt, NPid, S) - end; +incoming({recv = T, Name, Pkt}, #state{parent = Pid, ack = Ack} = S) -> + Pid ! {T, self(), get_route(Ack, Pkt), Name, Pkt}, + rcv(Name, Pkt, S); -incoming(Msg, S) -> - try - recv(Msg, S) - catch - {?MODULE, Name, Pkt} -> - incoming(Name, Pkt, false, S) - end. +incoming(#diameter_header{is_request = R}, #state{transport = TPid, + ack = Ack}) -> + R andalso Ack andalso send(TPid, false), + ok; + +incoming(<<_:32, 1:1, _/bits>>, #state{ack = true} = S) -> + send(S#state.transport, false), + ok; -%% incoming/4 +incoming(<<_/bits>>, _) -> + ok; -incoming(Name, Pkt, NPid, #state{parent = Pid} = S) -> - Pid ! {recv, self(), get_route(Pkt), Name, Pkt, NPid}, - rcv(Name, Pkt, S). +incoming(T, _) -> + T. %% recv/2 @@ -658,18 +683,19 @@ recv1(_, #diameter_packet{header = H, bin = Bin}, #state{incoming_maxlen = M}) when M < size(Bin) -> - invalid(false, incoming_maxlen_exceeded, {size(Bin), H}); + invalid(false, incoming_maxlen_exceeded, {size(Bin), H}), + H; %% Ignore anything but an expected CER/CEA if so configured. This is %% non-standard behaviour. -recv1(Name, _, #state{state = {'Wait-CEA', _, _}, - strict = false}) +recv1(Name, #diameter_packet{header = H}, #state{state = {'Wait-CEA', _, _}, + strict = false}) when Name /= 'CEA' -> - ok; -recv1(Name, _, #state{state = recv_CER, - strict = false}) + H; +recv1(Name, #diameter_packet{header = H}, #state{state = recv_CER, + strict = false}) when Name /= 'CER' -> - ok; + H; %% Incoming request after outgoing DPR: discard. Don't discard DPR, so %% both ends don't do so when sending simultaneously. @@ -677,13 +703,15 @@ recv1(Name, #diameter_packet{header = #diameter_header{is_request = true} = H}, #state{dpr = {_,_,_}}) when Name /= 'DPR' -> - invalid(false, recv_after_outgoing_dpr, H); + invalid(false, recv_after_outgoing_dpr, H), + H; %% Incoming request after incoming DPR: discard. recv1(_, #diameter_packet{header = #diameter_header{is_request = true} = H}, #state{dpr = true}) -> - invalid(false, recv_after_incoming_dpr, H); + invalid(false, recv_after_incoming_dpr, H), + H; %% DPA with identifier mismatch, or in response to a DPR initiated by %% the service. @@ -701,7 +729,7 @@ recv1('DPA' = N, %% Any other message with a header and no length errors: send to the %% parent. recv1(Name, Pkt, #state{}) -> - throw({?MODULE, Name, Pkt}). + {recv, Name, Pkt}. %% recv/3 @@ -720,10 +748,12 @@ recv(#diameter_header{} #diameter_packet{bin = Bin}, #state{length_errors = E}) -> T = {size(Bin), bit_size(Bin) rem 8, H}, - invalid(E, message_length_mismatch, T); + invalid(E, message_length_mismatch, T), + Bin; recv(false, #diameter_packet{bin = Bin}, #state{length_errors = E}) -> - invalid(E, truncated_header, Bin). + invalid(E, truncated_header, Bin), + Bin. %% Note that counters here only count discarded messages. invalid(E, Reason, T) -> @@ -779,14 +809,10 @@ rcv('DPA' = N, diameter_peer:close(TPid), {stop, N}; -%% Ignore anything else, an unsolicited DPA in particular. Note that -%% dpa_timeout deals with the case in which the peer sends the wrong -%% identifiers in DPA. -rcv(N, #diameter_packet{header = H}, _) - when N == 'CER'; - N == 'CEA'; - N == 'DPR'; - N == 'DPA' -> +%% Ignore an unsolicited DPA in particular. Note that dpa_timeout +%% deals with the case in which the peer sends the wrong identifiers +%% in DPA. +rcv('DPA' = N, #diameter_packet{header = H}, _) -> ?LOG(ignored, N), %% Note that these aren't counted in the normal recv counter. diameter_stats:incr({diameter_codec:msg_id(H), recv, ignored}), diff --git a/lib/diameter/src/base/diameter_reg.erl b/lib/diameter/src/base/diameter_reg.erl index 9027130063..4910979219 100644 --- a/lib/diameter/src/base/diameter_reg.erl +++ b/lib/diameter/src/base/diameter_reg.erl @@ -137,7 +137,7 @@ match(Pat) -> match(Pat, Pid) -> ets:match_object(?TABLE, {Pat, Pid}). - + %% =========================================================================== %% # wait(Pat) %% diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index bc1ccf4feb..ccfab22e9c 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -30,7 +30,7 @@ -export([send_request/4]). %% towards diameter_watchdog --export([receive_message/6]). +-export([receive_message/5]). %% towards diameter_peer_fsm and diameter_watchdog -export([incr/4, @@ -54,9 +54,6 @@ -define(RELAY, ?DIAMETER_DICT_RELAY). -define(BASE, ?DIAMETER_DICT_COMMON). %% Note: the RFC 3588 dictionary --define(DEFAULT_TIMEOUT, 5000). %% for outgoing requests --define(DEFAULT_SPAWN_OPTS, []). - %% Table containing outgoing entries that live and die with %% peer_up/down. The name is historic, since the table used to contain %% information about outgoing requests for which an answer has yet to @@ -67,7 +64,7 @@ -record(options, {filter = none :: diameter:peer_filter(), extra = [] :: list(), - timeout = ?DEFAULT_TIMEOUT :: 0..16#FFFFFFFF, + timeout = 5000 :: 0..16#FFFFFFFF, %% for outgoing requests detach = false :: boolean()}). %% Term passed back to receive_message/6 with every incoming message. @@ -93,7 +90,7 @@ packet :: #diameter_packet{} | undefined}). %% of request %% --------------------------------------------------------------------------- -%% # make_recvdata/1 +%% make_recvdata/1 %% --------------------------------------------------------------------------- make_recvdata([SvcName, PeerT, Apps, SvcOpts | _]) -> @@ -206,42 +203,38 @@ incr_rc(Dir, Pkt, TPid, Dict0) -> incr_rc(Dir, Pkt, TPid, {Dict0, Dict0, Dict0}). %% --------------------------------------------------------------------------- -%% # receive_message/6 +%% receive_message/5 %% -%% Handle an incoming Diameter message. +%% Handle an incoming Diameter message in a watchdog process. %% --------------------------------------------------------------------------- -%% Handle an incoming Diameter message in the watchdog process. - -receive_message(TPid, Route, Pkt, false, Dict0, RecvData) -> - incoming(TPid, Route, Pkt, Dict0, RecvData); - -receive_message(TPid, Route, Pkt, NPid, Dict0, RecvData) -> - NPid ! {diameter, incoming(TPid, Route, Pkt, Dict0, RecvData)}. - -%% incoming/4 - -incoming(TPid, Route, Pkt, Dict0, RecvData) - when is_pid(TPid) -> +-spec receive_message(pid(), Route, #diameter_packet{}, module(), RecvData) + -> pid() + | boolean() + when Route :: {Handler, RequestRef, Seqs} + | Ack, + RecvData :: {[SpawnOpt], #recvdata{}}, + SpawnOpt :: term(), + Handler :: pid(), + RequestRef :: reference(), + Seqs :: {0..16#FFFFFFFF, 0..16#FFFFFFFF}, + Ack :: boolean(). + +receive_message(TPid, Route, Pkt, Dict0, RecvData) -> #diameter_packet{header = #diameter_header{is_request = R}} = Pkt, recv(R, Route, TPid, Pkt, Dict0, RecvData). %% recv/6 %% Incoming request ... -recv(true, false, TPid, Pkt, Dict0, T) -> - try - {request, spawn_request(TPid, Pkt, Dict0, T)} - catch - error: system_limit = E -> %% discard - ?LOG(error, E), - discard - end; +recv(true, Ack, TPid, Pkt, Dict0, T) + when is_boolean(Ack) -> + spawn_request(Ack, TPid, Pkt, Dict0, T); %% ... answer to known request ... recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) -> Pid ! {answer, Ref, TPid, Dict0, Pkt}, - {answer, Pid}; + true; %% Note that failover could have happened prior to this message being %% received and triggering failback. That is, both a failover message @@ -256,23 +249,22 @@ recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) -> recv(false, false, TPid, Pkt, _, _) -> ?LOG(discarded, Pkt#diameter_packet.header), incr(TPid, {{unknown, 0}, recv, discarded}), - discard. - -%% spawn_request/4 + false. -spawn_request(TPid, Pkt, Dict0, {Opts, RecvData}) -> - spawn_request(TPid, Pkt, Dict0, Opts, RecvData); -spawn_request(TPid, Pkt, Dict0, RecvData) -> - spawn_request(TPid, Pkt, Dict0, ?DEFAULT_SPAWN_OPTS, RecvData). +%% spawn_request/5 -spawn_request(TPid, Pkt, Dict0, Opts, RecvData) -> - spawn_opt(fun() -> recv_request(TPid, Pkt, Dict0, RecvData) end, Opts). +spawn_request(Ack, TPid, Pkt, Dict0, {Opts, RecvData}) -> + spawn_opt(fun() -> + recv_request(Ack, TPid, Pkt, Dict0, RecvData) + end, + Opts). %% --------------------------------------------------------------------------- -%% recv_request/4 +%% recv_request/5 %% --------------------------------------------------------------------------- -recv_request(TPid, +recv_request(Ack, + TPid, #diameter_packet{header = #diameter_header{application_id = Id}} = Pkt, Dict0, @@ -280,6 +272,7 @@ recv_request(TPid, apps = Apps, codec = Opts} = RecvData) -> + Ack andalso (TPid ! {handler, self()}), diameter_codec:setopts([{common_dictionary, Dict0} | Opts]), send_A(recv_R(diameter_service:find_incoming_app(PeerT, TPid, Id, Apps), TPid, @@ -511,7 +504,7 @@ send_A(T, TPid, {AppDict, Dict0} = DictT0, ReqPkt, EvalPktFs, EvalFs) -> {MsgDict, Pkt} = reply(T, TPid, DictT0, EvalPktFs, ReqPkt), incr(send, Pkt, TPid, AppDict), incr_rc(send, Pkt, TPid, {MsgDict, AppDict, Dict0}), %% count outgoing - send(TPid, Pkt), + send(TPid, Pkt, _Route = self()), lists:foreach(fun diameter_lib:eval/1, EvalFs). %% answer/6 @@ -1207,7 +1200,7 @@ x(T) -> exit(T). %% --------------------------------------------------------------------------- -%% # send_request/4 +%% send_request/4 %% %% Handle an outgoing Diameter request. %% --------------------------------------------------------------------------- @@ -1296,7 +1289,7 @@ mo(T, _) -> ?ERROR({invalid_option, T}). %% --------------------------------------------------------------------------- -%% # send_request/6 +%% send_request/6 %% --------------------------------------------------------------------------- %% Send an outgoing request in its dedicated process. @@ -1745,11 +1738,6 @@ recv(TPid, Pid, TRef, {LocalTRef, MRef}) -> exit({timeout, LocalTRef, TPid} = T) end. -%% send/2 - -send(Pid, Pkt) -> - Pid ! {send, Pkt}. - %% send/3 send(Pid, Pkt, Route) -> diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl index f28b8f2910..a2eb661870 100644 --- a/lib/diameter/src/base/diameter_watchdog.erl +++ b/lib/diameter/src/base/diameter_watchdog.erl @@ -283,7 +283,7 @@ event(Msg, ?LOG(transition, {From, To}). data(Msg, TPid, reopen, okay) -> - {recv, TPid, false, 'DWA', _Pkt, _NPid} = Msg, %% assert + {recv, TPid, _, 'DWA', _Pkt} = Msg, %% assert {TPid, T} = eraser(open), [T]; @@ -302,6 +302,8 @@ tpid(_, Pid) tpid(Pid, _) -> Pid. +%% send/2 + send(Pid, T) -> Pid ! T. @@ -447,14 +449,15 @@ transition({'DOWN', _, process, TPid, _Reason} = D, end; %% Incoming message. -transition({recv, TPid, Route, Name, Pkt, NPid}, +transition({recv, TPid, Route, Name, Pkt}, #watchdog{transport = TPid} = S) -> - try - incoming(Name, Pkt, NPid, S) - catch + try incoming(Route, Name, Pkt, S) of #watchdog{dictionary = Dict0, receive_data = T} = NS -> - diameter_traffic:receive_message(TPid, Route, Pkt, NPid, Dict0, T), + diameter_traffic:receive_message(TPid, Route, Pkt, Dict0, T), + NS + catch + #watchdog{} = NS -> NS end; @@ -586,25 +589,13 @@ send_watchdog(#watchdog{pending = false, %% incoming/4 -incoming(Name, Pkt, false, S) -> - recv(Name, Pkt, S); - -incoming(Name, Pkt, NPid, S) -> - try - recv(Name, Pkt, S) - after - NPid ! {diameter, discard} - end. - -%% recv/3 - -recv(Name, Pkt, S) -> - try rcv(Name, Pkt, rcv(Name, S)) of - #watchdog{} = NS -> - throw(NS) +incoming(Route, Name, Pkt, S) -> + try rcv(Name, S) of + NS -> rcv(Name, Pkt, NS) catch - #watchdog{} = NS -> %% throwaway - NS + #watchdog{transport = TPid} = NS when Route -> %% incoming request + send(TPid, {send, false}), %% requiring ack + throw(NS) end. %% rcv/3 diff --git a/lib/diameter/src/transport/diameter_sctp.erl b/lib/diameter/src/transport/diameter_sctp.erl index 76aacabcb8..6a9f1f940b 100644 --- a/lib/diameter/src/transport/diameter_sctp.erl +++ b/lib/diameter/src/transport/diameter_sctp.erl @@ -52,21 +52,20 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). %% The default port for a listener. -define(DEFAULT_PORT, 3868). %% RFC 3588, ch 2.1 -%% Remote addresses to accept connections from. --define(DEFAULT_ACCEPT, []). %% any - %% How long to wait for a transport process to attach after %% association establishment. -define(ACCEPT_TIMEOUT, 5000). -type connect_option() :: {raddr, inet:ip_address()} | {rport, inet:port_number()} + | option() | term(). %% gen_sctp:open_option(). -type match() :: inet:ip_address() @@ -74,8 +73,14 @@ | [match()]. -type listen_option() :: {accept, match()} + | option() | term(). %% gen_sctp:open_option(). +-type option() :: {sender, boolean()} + | sender + | {packet, boolean() | raw} + | {message_cb, false | diameter:evaluable()}. + -type uint() :: non_neg_integer(). %% Accepting/connecting transport process state. @@ -87,20 +92,35 @@ %% {RAs, RP, Errors} | connect, socket :: gen_sctp:sctp_socket() | undefined, - assoc_id :: gen_sctp:assoc_id(), %% association identifier + active = false :: boolean(), %% is socket active? + recv = true :: boolean(), %% should it be active? + assoc_id :: gen_sctp:assoc_id() %% association identifier + | undefined + | true, peer :: {[inet:ip_address()], uint()} %% {RAs, RP} | undefined, streams :: {uint(), uint()} %% {InStream, OutStream} counts | undefined, - os = 0 :: uint()}). %% next output stream + os = 0 :: uint(), %% next output stream + packet = true :: boolean() %% legacy transport_data? + | raw, + message_cb = false :: false | diameter:evaluable(), + send = false :: pid() | boolean()}). %% sending process + +%% Monitor process state. +-record(monitor, + {transport :: pid(), + ack = false :: boolean(), + socket :: gen_sctp:sctp_socket(), + assoc_id :: gen_sctp:assoc_id()}). %% next output stream %% Listener process state. -record(listener, {ref :: reference(), socket :: gen_sctp:sctp_socket(), - service = false :: false | pid(), %% service process + service :: pid(), %% service process pending = {0, queue:new()}, - accept :: [match()]}). + opts :: [[match()] | boolean() | diameter:evaluable()]}). %% Field pending implements two queues: the first of transport-to-be %% processes to which an association has been assigned but for which %% diameter hasn't yet spawned a transport process, a short-lived @@ -132,11 +152,11 @@ start(T, Svc, Opts) when is_list(Opts) -> #diameter_service{capabilities = Caps, - pid = SPid} + pid = Pid} = Svc, diameter_sctp_sup:start(), %% start supervisors on demand Addrs = Caps#diameter_caps.host_ip_address, - s(T, Addrs, SPid, lists:map(fun ip/1, Opts)). + s(T, Addrs, Pid, lists:map(fun ip/1, Opts)). ip({ifaddr, A}) -> {ip, A}; @@ -147,9 +167,9 @@ ip(T) -> %% when there is not yet an association to assign it, or at comm_up on %% a new association in which case the call retrieves a transport from %% the pending queue. -s({accept, Ref} = A, Addrs, SPid, Opts) -> - {ok, LPid, LAs} = listener(Ref, {Opts, Addrs}), - try gen_server:call(LPid, {A, self(), SPid}, infinity) of +s({accept, Ref} = A, Addrs, SvcPid, Opts) -> + {ok, LPid, LAs} = listener(Ref, {Opts, SvcPid, Addrs}), + try gen_server:call(LPid, {A, self()}, infinity) of {ok, TPid} -> {ok, TPid, LAs}; No -> @@ -162,7 +182,7 @@ s({accept, Ref} = A, Addrs, SPid, Opts) -> %% gen_sctp in order to be able to accept a new association only %% *after* an accepting transport has been spawned. -s({connect = C, Ref}, Addrs, _SPid, Opts) -> +s({connect = C, Ref}, Addrs, _SvcPid, Opts) -> diameter_sctp_sup:start_child({C, self(), Opts, Addrs, Ref}). %% start_link/1 @@ -216,22 +236,39 @@ init(T) -> %% i/1 +i(#monitor{transport = TPid} = S) -> + monitor(process, TPid), + putr(?TRANSPORT_KEY, TPid), + proc_lib:init_ack({ok, self()}), + S; + %% A process owning a listening socket. -i({listen, Ref, {Opts, Addrs}}) -> +i({listen, Ref, {Opts, SvcPid, Addrs}}) -> + monitor(process, SvcPid), [_] = diameter_config:subscribe(Ref, transport), %% assert existence - {[Matches], Rest} = proplists:split(Opts, [accept]), + {Split, Rest} + = proplists:split(Opts, [accept, packet, sender, message_cb]), + OwnOpts = lists:append(Split), {LAs, Sock} = AS = open(Addrs, Rest, ?DEFAULT_PORT), ok = gen_sctp:listen(Sock, true), true = diameter_reg:add_new({?MODULE, listener, {Ref, AS}}), proc_lib:init_ack({ok, self(), LAs}), #listener{ref = Ref, + service = SvcPid, socket = Sock, - accept = [[M] || {accept, M} <- Matches]}; + opts = [[[M] || {accept, M} <- OwnOpts], + proplists:get_value(packet, OwnOpts, true) + | [proplists:get_value(K, OwnOpts, false) + || K <- [sender, message_cb]]]}; %% A connecting transport. i({connect, Pid, Opts, Addrs, Ref}) -> - {[As, Ps], Rest} = proplists:split(Opts, [raddr, rport]), - RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- As], + {[Ps | Split], Rest} + = proplists:split(Opts, [rport, raddr, packet, sender, message_cb]), + OwnOpts = lists:append(Split), + CB = proplists:get_value(message_cb, OwnOpts, false), + false == CB orelse (Pid ! {diameter, ack}), + RAs = [diameter_lib:ipaddr(A) || {raddr, A} <- OwnOpts], [RP] = [P || {rport, P} <- Ps] ++ [P || P <- [?DEFAULT_PORT], [] == Ps], {LAs, Sock} = open(Addrs, Rest, 0), putr(?REF_KEY, Ref), @@ -239,7 +276,10 @@ i({connect, Pid, Opts, Addrs, Ref}) -> monitor(process, Pid), #transport{parent = Pid, mode = {connect, connect(Sock, RAs, RP, [])}, - socket = Sock}; + socket = Sock, + message_cb = CB, + packet = proplists:get_value(packet, OwnOpts, true), + send = proplists:get_value(sender, OwnOpts, false)}; %% An accepting transport spawned by diameter, not yet owning an %% association. @@ -273,11 +313,16 @@ i({K, Ref}, #transport{mode = {accept, _}} = S) -> receive {Ref, Pid} when K == parent -> %% transport process started S#transport{parent = Pid}; - {K, T, Matches} when K == peeloff -> %% association + {K, T, Opts} when K == peeloff -> %% association {sctp, Sock, _RA, _RP, _Data} = T, + [Matches, Packet, Sender, CB] = Opts, ok = accept_peer(Sock, Matches), demonitor(Ref, [flush]), - t(T, S#transport{socket = Sock}); + false == CB orelse (S#transport.parent ! {diameter, ack}), + t(T, S#transport{socket = Sock, + message_cb = CB, + packet = Packet, + send = Sender}); accept_timeout = T -> x(T); {'DOWN', _, process, _, _} = T -> @@ -374,13 +419,9 @@ handle_call({{accept, Ref}, Pid}, _, #listener{ref = Ref} = S) -> {TPid, NewS} = accept(Ref, Pid, S), {reply, {ok, TPid}, NewS}; -handle_call({{accept, _} = T, Pid, SPid}, From, #listener{service = P} = S) -> - handle_call({T, Pid}, From, if not is_pid(P), is_pid(SPid) -> - monitor(process, SPid), - S#listener{service = SPid}; - true -> - S - end); +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; handle_call(_, _, State) -> {reply, nok, State}. @@ -400,7 +441,11 @@ handle_info(T, #transport{} = S) -> {noreply, #transport{} = t(T,S)}; handle_info(T, #listener{} = S) -> - {noreply, #listener{} = l(T,S)}. + {noreply, #listener{} = l(T,S)}; + +handle_info(T, #monitor{} = S) -> + m(T,S), + {noreply, S}. %% Prior to the possibility of setting pool_size on in transport %% configuration, a new accepting transport was only started following @@ -422,6 +467,9 @@ code_change(_, State, _) -> %% # terminate/2 %% --------------------------------------------------------------------------- +terminate(_, #monitor{}) -> + ok; + terminate(_, #transport{assoc_id = undefined}) -> ok; @@ -445,11 +493,11 @@ getr(Key) -> %% Incoming message from SCTP. l({sctp, Sock, _RA, _RP, Data} = T, #listener{socket = Sock, - accept = Matches} + opts = Opts} = S) -> Id = assoc_id(Data), {TPid, NewS} = accept(S), - TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Matches}, + TPid ! {peeloff, setelement(2, T, peeloff(Sock, Id, TPid)), Opts}, setopts(Sock), NewS; @@ -503,12 +551,21 @@ t(T,S) -> %% Incoming message. transition({sctp, Sock, _RA, _RP, Data}, #transport{socket = Sock} = S) -> - setopts(Sock), - recv(Data, S); + setopts(S, recv(Data, S#transport{active = false})); %% Outgoing message. transition({diameter, {send, Msg}}, S) -> - send(Msg, S); + message(send, Msg, S); + +%% Monitor has sent an outgoing message. +transition(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + message(ack, Msg, S); + +%% Deferred actions from a message_cb. +transition({actions, Dir, Acts}, S) -> + actions(Acts, Dir, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> @@ -522,8 +579,18 @@ transition({diameter, {close, Pid}}, #transport{parent = Pid}) -> transition({diameter, {tls, _Ref, _Type, _Bool}}, _) -> stop; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + send = MPid}) -> + is_boolean(MPid) + orelse ok == (catch gen_server:call(MPid, {stop, Pid})) + orelse exit(MPid, kill), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop; %% Timeout after transport process has been started. @@ -536,6 +603,18 @@ transition({resolve_port, Pid}, #transport{socket = Sock}) Pid ! inet:port(Sock), ok. +%% m/2 + +m({Msg, StreamId}, #monitor{socket = Sock, + transport = TPid, + assoc_id = AId, + ack = B}) -> + send(Sock, AId, StreamId, Msg), + B andalso (TPid ! Msg); + +m({'DOWN', _, process, TPid, _} = T, #monitor{transport = TPid}) -> + x(T). + %% Crash on anything unexpected. ok({ok, T}) -> @@ -578,33 +657,52 @@ q(Ref, Pid, #listener{pending = {_,Q}}) -> %% send/2 +%% Start monitor process on first send. +send(Msg, #transport{send = true, + socket = Sock, + assoc_id = AId, + message_cb = CB} + = S) -> + {ok, MPid} = diameter_sctp_sup:start_child(#monitor{transport = self(), + socket = Sock, + assoc_id = AId, + ack = false /= CB}), + monitor(process, MPid), + send(Msg, S#transport{send = MPid}); + %% Outbound Diameter message on a specified stream ... -send(#diameter_packet{bin = Bin, transport_data = {outstream, SId}}, +send(#diameter_packet{transport_data = {outstream, SId}} + = Msg, #transport{streams = {_, OS}} = S) -> - send(SId rem OS, Bin, S), - S; + send(SId rem OS, Msg, S); %% ... or not: rotate through all streams. -send(#diameter_packet{bin = Bin}, S) -> - send(Bin, S); -send(Bin, #transport{streams = {_, OS}, +send(Msg, #transport{streams = {_, OS}, os = N} - = S) - when is_binary(Bin) -> - send(N, Bin, S), - S#transport{os = (N + 1) rem OS}. + = S) -> + send(N, Msg, S#transport{os = (N + 1) rem OS}). %% send/3 -send(StreamId, Bin, #transport{socket = Sock, - assoc_id = AId}) -> - send(Sock, AId, StreamId, Bin). +send(StreamId, Msg, #transport{send = false, + socket = Sock, + assoc_id = AId} + = S) -> + send(Sock, AId, StreamId, Msg), + message(ack, Msg, S); + +send(StreamId, Msg, #transport{send = MPid} = S) -> + MPid ! {Msg, StreamId}, + S. %% send/4 -send(Sock, AssocId, Stream, Bin) -> - case gen_sctp:send(Sock, AssocId, Stream, Bin) of +send(Sock, AssocId, StreamId, #diameter_packet{bin = Bin}) -> + send(Sock, AssocId, StreamId, Bin); + +send(Sock, AssocId, StreamId, Bin) -> + case gen_sctp:send(Sock, AssocId, StreamId, Bin) of ok -> ok; {error, Reason} -> @@ -624,7 +722,9 @@ recv({_, #sctp_assoc_change{state = comm_up, = S) -> Ref = getr(?REF_KEY), publish(T, Ref, Id, Sock), - up(S#transport{assoc_id = Id, + %% Deal with different association id after peeloff on Solaris by + %% taking the id from the first reception. + up(S#transport{assoc_id = T == accept orelse Id, streams = {IS, OS}}); %% ... or not: try the next address. @@ -639,17 +739,19 @@ recv({_, #sctp_assoc_change{} = E}, recv({_, #sctp_assoc_change{}}, _) -> stop; +%% First inbound on an accepting transport. +recv({[#sctp_sndrcvinfo{assoc_id = Id}], _Bin} + = T, + #transport{assoc_id = true} + = S) -> + recv(T, S#transport{assoc_id = Id}); + %% Inbound Diameter message. -recv({[#sctp_sndrcvinfo{stream = Id}], Bin}, #transport{parent = Pid}) +recv({[#sctp_sndrcvinfo{}], Bin} = Msg, S) when is_binary(Bin) -> - diameter_peer:recv(Pid, #diameter_packet{transport_data = {stream, Id}, - bin = Bin}), - ok; + message(recv, Msg, S); -recv({_, #sctp_shutdown_event{assoc_id = A}}, - #transport{assoc_id = Id}) - when A == Id; - A == 0 -> +recv({_, #sctp_shutdown_event{}}, _) -> stop; %% Note that diameter_sctp(3) documents that sctp_events cannot be @@ -765,6 +867,23 @@ connect(Sock, [Addr | AT] = As, Port, Reasons) -> connect(Sock, AT, Port, [{Addr, E} | Reasons]) end. +%% setopts/2 + +setopts(_, #transport{socket = Sock, + active = A, + recv = B} + = S) + when B, not A -> + setopts(Sock), + S#transport{active = true}; + +setopts(_, #transport{} = S) -> + S; + +setopts(#transport{socket = Sock}, T) -> + setopts(Sock), + T. + %% setopts/1 setopts(Sock) -> @@ -772,3 +891,83 @@ setopts(Sock) -> ok -> ok; X -> x({setopts, Sock, X}) %% possibly on peer disconnect end. + +%% A message_cb is invoked whenever a message is sent or received, or +%% to provide acknowledgement of a completed send or discarded +%% request. See diameter_tcp for semantics, the only difference being +%% that a recv callback can get a diameter_packet record as Msg +%% depending on how/if option packet has been specified. + +%% message/3 + +message(send, false = M, S) -> + message(ack, M, S); + +message(ack, _, #transport{message_cb = false} = S) -> + S; + +message(Dir, Msg, S) -> + setopts(S, actions(cb(S, Dir, Msg), Dir, S)). + +%% actions/3 + +actions([], _, S) -> + S; + +actions([B | As], Dir, S) + when is_boolean(B) -> + actions(As, Dir, S#transport{recv = B}); + +actions([Dir | As], _, S) + when Dir == send; + Dir == recv -> + actions(As, Dir, S); + +actions([Msg | As], send = Dir, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + actions(As, Dir, send(Msg, S)); + +actions([Msg | As], recv = Dir, #transport{parent = Pid} = S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + diameter_peer:recv(Pid, Msg), + actions(As, Dir, S); + +actions([{defer, Tmo, Acts} | As], Dir, S) -> + erlang:send_after(Tmo, self(), {actions, Dir, Acts}), + actions(As, Dir, S); + +actions(CB, _, S) -> + S#transport{message_cb = CB}. + +%% cb/3 + +cb(#transport{message_cb = false, packet = P}, recv, Msg) -> + [pkt(P, true, Msg)]; + +cb(#transport{message_cb = CB, packet = P}, recv = D, Msg) -> + cb(CB, D, pkt(P, false, Msg)); + +cb(#transport{message_cb = CB}, Dir, Msg) -> + cb(CB, Dir, Msg); + +cb(false, send, Msg) -> + [Msg]; + +cb(CB, Dir, Msg) -> + diameter_lib:eval([CB, Dir, Msg]). + +%% pkt/3 + +pkt(false, _, {_Info, Bin}) -> + Bin; + +pkt(true, _, {[#sctp_sndrcvinfo{stream = Id}], Bin}) -> + #diameter_packet{bin = Bin, transport_data = {stream, Id}}; + +pkt(raw, true, {[Info], Bin}) -> + #diameter_packet{bin = Bin, transport_data = Info}; + +pkt(raw, false, {[_], _} = Msg) -> + Msg. diff --git a/lib/diameter/src/transport/diameter_sctp_sup.erl b/lib/diameter/src/transport/diameter_sctp_sup.erl index 36050aaf28..e8e26ec7c5 100644 --- a/lib/diameter/src/transport/diameter_sctp_sup.erl +++ b/lib/diameter/src/transport/diameter_sctp_sup.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -49,6 +49,7 @@ start() -> start_child(T) -> SupRef = case element(1,T) of + monitor -> ?TRANSPORT_SUP; connect -> ?TRANSPORT_SUP; accept -> ?TRANSPORT_SUP; listen -> ?LISTENER_SUP diff --git a/lib/diameter/src/transport/diameter_tcp.erl b/lib/diameter/src/transport/diameter_tcp.erl index 44abc5c3b4..a2f393d5d4 100644 --- a/lib/diameter/src/transport/diameter_tcp.erl +++ b/lib/diameter/src/transport/diameter_tcp.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2010-2016. All Rights Reserved. +%% Copyright Ericsson AB 2010-2017. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -19,7 +19,6 @@ %% -module(diameter_tcp). --dialyzer({no_fail_call, throttle/2}). -behaviour(gen_server). @@ -53,6 +52,7 @@ %% Keys into process dictionary. -define(INFO_KEY, info). -define(REF_KEY, ref). +-define(TRANSPORT_KEY, transport). -define(ERROR(T), erlang:error({T, ?MODULE, ?LINE})). @@ -68,16 +68,23 @@ %% The same gen_server implementation supports three different kinds %% of processes: an actual transport process, one that will club it to %% death should the parent die before a connection is established, and -%% a process owning the listening port. +%% a process owning the listening port. The monitor process +%% historically died after connection establishment, but can now live +%% on as the sender of outgoing messages, so that a blocking send +%% doesn't prevent messages from being received. %% Listener process state. -record(listener, {socket :: inet:socket(), + module :: module(), service = false :: false | pid()}). %% service process %% Monitor process state. -record(monitor, - {parent :: pid(), - transport = self() :: pid()}). + {parent :: reference() | false | pid(), + transport = self() :: pid(), + ack = false :: boolean(), + socket :: inet:socket() | ssl:sslsocket() | undefined, + module :: module() | undefined}). -type length() :: 0..16#FFFFFF. %% message length from Diameter header -type size() :: non_neg_integer(). %% accumulated binary size @@ -97,25 +104,30 @@ -type listen_option() :: {accept, match()} | {ssl_options, true | [ssl:listen_option()]} + | option() | ssl:listen_option() | gen_tcp:listen_option(). -type option() :: {port, non_neg_integer()} - | {fragment_timer, 0..16#FFFFFFFF} - | {throttle_cb, diameter:evaluable()}. + | {sender, boolean()} + | sender + | {message_cb, false | diameter:evaluable()} + | {fragment_timer, 0..16#FFFFFFFF}. %% Accepting/connecting transport process state. -record(transport, {socket :: inet:socket() | ssl:sslsocket(), %% accept/connect socket + active = false :: boolean(), %% is socket active? + recv = true :: boolean(), %% should it be active? parent :: pid(), %% of process that started us module :: module(), %% gen_tcp-like module - frag = <<>> :: frag(), %% message fragment ssl :: [term()] | boolean(), %% ssl options, ssl or not + frag = <<>> :: frag(), %% message fragment timeout :: infinity | 0..16#FFFFFFFF, %% fragment timeout tref = false :: false | reference(), %% fragment timer reference flush = false :: boolean(), %% flush fragment at timeout? - throttle_cb :: false | diameter:evaluable(), %% ask to receive - throttled :: boolean() | binary()}). %% stopped receiving? + message_cb :: false | diameter:evaluable(), + send :: pid() | false}). %% sending process %% The usual transport using gen_tcp can be replaced by anything %% sufficiently gen_tcp-like by passing a 'module' option as the first @@ -137,13 +149,13 @@ start({T, Ref}, Svc, Opts) -> #diameter_service{capabilities = Caps, - pid = SPid} + pid = SvcPid} = Svc, diameter_tcp_sup:start(), %% start tcp supervisors on demand {Mod, Rest} = split(Opts), Addrs = Caps#diameter_caps.host_ip_address, - Arg = {T, Ref, Mod, self(), Rest, Addrs, SPid}, + Arg = {T, Ref, Mod, self(), Rest, Addrs, SvcPid}, diameter_tcp_sup:start_child(Arg). split([{module, M} | Opts]) -> @@ -197,57 +209,53 @@ init(T) -> %% i/1 %% A transport process. -i({T, Ref, Mod, Pid, Opts, Addrs, SPid}) +i({T, Ref, Mod, Pid, Opts, Addrs, SvcPid}) when T == accept; T == connect -> monitor(process, Pid), %% Since accept/connect might block indefinitely, spawn a process - %% that does nothing but kill us with the parent until call - %% returns. - {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), + %% that kills us with the parent until call returns, and then + %% sends outgoing messages. {[SO|TO], Rest} = proplists:split(Opts, [ssl_options, - fragment_timer, - throttle_cb]), + sender, + message_cb, + fragment_timer]), SslOpts = ssl_opts(SO), OwnOpts = lists:append(TO), Tmo = proplists:get_value(fragment_timer, OwnOpts, ?DEFAULT_FRAGMENT_TIMEOUT), + [CB, Sender] = [proplists:get_value(K, OwnOpts, false) + || K <- [message_cb, sender]], ?IS_TIMEOUT(Tmo) orelse ?ERROR({fragment_timer, Tmo}), - Throttle = proplists:get_value(throttle_cb, OwnOpts, false), - Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SPid), - MPid ! {stop, self()}, %% tell the monitor to die + {ok, MPid} = diameter_tcp_sup:start_child(#monitor{parent = Pid}), + Sock = init(T, Ref, Mod, Pid, SslOpts, Rest, Addrs, SvcPid), M = if SslOpts -> ssl; true -> Mod end, + Sender andalso monitor(process, MPid), + false == CB orelse (Pid ! {diameter, ack}), + MPid ! {start, self(), Sender andalso {Sock, M}, false /= CB}, putr(?REF_KEY, Ref), - throttle(#transport{parent = Pid, - module = M, - socket = Sock, - ssl = SslOpts, - timeout = Tmo, - throttle_cb = Throttle, - throttled = false /= Throttle}); + setopts(#transport{parent = Pid, + module = M, + socket = Sock, + ssl = SslOpts, + message_cb = CB, + timeout = Tmo, + send = Sender andalso MPid}); %% Put the reference in the process dictionary since we now use it %% advertise the ssl socket after TLS upgrade. -i({T, _Ref, _Mod, _Pid, _Opts, _Addrs} = Arg) %% from old code - when T == accept; - T == connect -> - i(erlang:append_element(Arg, _SPid = false)); - %% A monitor process to kill the transport if the parent dies. i(#monitor{parent = Pid, transport = TPid} = S) -> + putr(?TRANSPORT_KEY, TPid), proc_lib:init_ack({ok, self()}), - monitor(process, Pid), monitor(process, TPid), - S; + S#monitor{parent = monitor(process, Pid)}; %% In principle a link between the transport and killer processes %% could do the same thing: have the accepting/connecting process be %% killed when the killer process dies as a consequence of parent %% death. However, a link can be unlinked and this is exactly what -%% gen_tcp seems to so. Links should be left to supervisors. - -i({listen = L, Ref, _APid, T}) -> %% from old code - i({L, Ref, T}); +%% gen_tcp seems to do. Links should be left to supervisors. i({listen, Ref, {Mod, Opts, Addrs}}) -> [_] = diameter_config:subscribe(Ref, transport), %% assert existence @@ -258,7 +266,8 @@ i({listen, Ref, {Mod, Opts, Addrs}}) -> LAddr = laddr(LAddrOpt, Mod, LSock), true = diameter_reg:add_new({?MODULE, listener, {Ref, {LAddr, LSock}}}), proc_lib:init_ack({ok, self(), {LAddr, LSock}}), - #listener{socket = LSock}. + #listener{socket = LSock, + module = Mod}. laddr([], Mod, Sock) -> {ok, {Addr, _Port}} = sockname(Mod, Sock), @@ -279,19 +288,19 @@ ssl_opts(T) -> %% init/8 %% Establish a TLS connection before capabilities exchange ... -init(Type, Ref, Mod, Pid, true, Opts, Addrs, SPid) -> - init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SPid); +init(Type, Ref, Mod, Pid, true, Opts, Addrs, SvcPid) -> + init(Type, Ref, ssl, Pid, [{cb_info, ?TCP_CB(Mod)} | Opts], Addrs, SvcPid); %% ... or not. -init(Type, Ref, Mod, Pid, _, Opts, Addrs, SPid) -> - init(Type, Ref, Mod, Pid, Opts, Addrs, SPid). +init(Type, Ref, Mod, Pid, _, Opts, Addrs, SvcPid) -> + init(Type, Ref, Mod, Pid, Opts, Addrs, SvcPid). %% init/7 -init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) -> +init(accept = T, Ref, Mod, Pid, Opts, Addrs, SvcPid) -> {[Matches], Rest} = proplists:split(Opts, [accept]), {ok, LPid, {LAddr, LSock}} = listener(Ref, {Mod, Rest, Addrs}), - ok = gen_server:call(LPid, {accept, SPid}, infinity), + ok = gen_server:call(LPid, {accept, SvcPid}, infinity), proc_lib:init_ack({ok, self(), [LAddr]}), Sock = ok(accept(Mod, LSock)), ok = accept_peer(Mod, Sock, accept(Matches)), @@ -299,7 +308,7 @@ init(accept = T, Ref, Mod, Pid, Opts, Addrs, SPid) -> diameter_peer:up(Pid), Sock; -init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SPid) -> +init(connect = T, Ref, Mod, Pid, Opts, Addrs, _SvcPid) -> {[LA, RA, RP], Rest} = proplists:split(Opts, [ip, raddr, rport]), LAddrOpt = get_addr(LA, Addrs), RAddr = get_addr(RA), @@ -451,14 +460,18 @@ portnr(Sock) -> %% # handle_call/3 %% --------------------------------------------------------------------------- -handle_call({accept, SPid}, _From, #listener{service = P} = S) -> - {reply, ok, if not is_pid(P), is_pid(SPid) -> - monitor(process, SPid), - S#listener{service = SPid}; +handle_call({accept, SvcPid}, _From, #listener{service = P} = S) -> + {reply, ok, if not is_pid(P), is_pid(SvcPid) -> + monitor(process, SvcPid), + S#listener{service = SvcPid}; true -> S end}; - + +%% Transport is telling us of parent death. +handle_call({stop, _Pid} = Reason, _From, #monitor{} = S) -> + {stop, {shutdown, Reason}, ok, S}; + handle_call(_, _, State) -> {reply, nok, State}. @@ -480,8 +493,7 @@ handle_info(T, #listener{} = S) -> {noreply, #listener{} = l(T,S)}; handle_info(T, #monitor{} = S) -> - m(T,S), - x(T). + {noreply, #monitor{} = m(T,S)}. %% --------------------------------------------------------------------------- %% # code_change/3 @@ -497,6 +509,7 @@ code_change(_, State, _) -> terminate(_, _) -> ok. + %% --------------------------------------------------------------------------- putr(Key, Val) -> @@ -509,18 +522,47 @@ getr(Key) -> %% %% Transition monitor state. +%% Outgoing message. +m(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + send(Msg, S), + S; + +%% Transport has established a connection. Stop monitoring on the +%% parent so as not to die before a send from the transport. +m({start, TPid, T, Ack} = M, #monitor{transport = TPid} = S) -> + case T of + {Sock, Mod} -> + demonitor(S#monitor.parent, [flush]), + S#monitor{parent = false, + socket = Sock, + module = Mod, + ack = Ack}; + false -> %% monitor not sending + x(M) + end; + %% Transport is telling us to die. -m({stop, TPid}, #monitor{transport = TPid}) -> - ok; +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); -%% Transport has died. -m({'DOWN', _, process, TPid, _}, #monitor{transport = TPid}) -> - ok; +%% Transport is telling us to die. +m({stop, TPid} = T, #monitor{transport = TPid}) -> + x(T); -%% Transport parent has died. -m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, - transport = TPid}) -> - exit(TPid, {shutdown, parent}). +%% Transport is telling us that TLS has been negotiated after +%% capabilities exchange. +m({tls, SSock}, S) -> + S#monitor{socket = SSock, + module = ssl}; + +%% Transport or parent has died. +m({'DOWN', M, process, P, _} = T, #monitor{parent = MRef, + transport = TPid}) + when M == MRef; + P == TPid -> + x(T). %% l/2 %% @@ -528,18 +570,16 @@ m({'DOWN', _, process, Pid, _}, #monitor{parent = Pid, %% Service process has died. l({'DOWN', _, process, Pid, _} = T, #listener{service = Pid, - socket = Sock}) -> - gen_tcp:close(Sock), + socket = Sock, + module = M}) -> + M:close(Sock), x(T); %% Transport has been removed. -l({transport, remove, _} = T, #listener{socket = Sock}) -> - gen_tcp:close(Sock), - x(T); - -%% Possibly death of an accepting process monitored in old code. -l(_, S) -> - S. +l({transport, remove, _} = T, #listener{socket = Sock, + module = M}) -> + M:close(Sock), + x(T). %% t/2 %% @@ -557,21 +597,13 @@ t(T,S) -> %% transition/2 -%% Incoming message. +%% Incoming packets. transition({P, Sock, Bin}, #transport{socket = Sock, - ssl = B, - throttled = T} + ssl = B} = S) when P == ssl, true == B; P == tcp -> - false = T, %% assert - recv(Bin, S); - -%% Make a new throttling callback after a timeout. -transition(throttle, #transport{throttled = false}) -> - ok; -transition(throttle, S) -> - throttle(S); + recv(Bin, S#transport{active = false}); %% Capabilties exchange has decided on whether or not to run over TLS. transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} @@ -581,7 +613,7 @@ transition({diameter, {tls, Ref, Type, B}}, #transport{parent = Pid} = NS = tls_handshake(Type, B, S), Pid ! {diameter, {tls, Ref}}, - throttle(NS#transport{ssl = B}); + NS#transport{ssl = B}; transition({C, Sock}, #transport{socket = Sock, ssl = B}) @@ -597,8 +629,18 @@ transition({E, Sock, _Reason} = T, #transport{socket = Sock, ?ERROR({T,S}); %% Outgoing message. -transition({diameter, {send, Bin}}, S) -> - send(Bin, S); +transition({diameter, {send, Msg}}, #transport{} = S) -> + message(send, Msg, S); + +%% Monitor has sent an outgoing message. +transition(Msg, S) + when is_record(Msg, diameter_packet); + is_binary(Msg) -> + message(ack, Msg, S); + +%% Deferred actions from a message_cb. +transition({actions, Dir, Acts}, S) -> + actions(Acts, Dir, S); %% Request to close the transport connection. transition({diameter, {close, Pid}}, #transport{parent = Pid, @@ -618,8 +660,18 @@ transition({resolve_port, Pid}, #transport{socket = Sock, Pid ! portnr(M, Sock), ok; -%% Parent process has died. -transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid}) -> +%% Parent process has died: call the monitor to not close the socket +%% during an ongoing send, but don't let it take forever. +transition({'DOWN', _, process, Pid, _}, #transport{parent = Pid, + send = MPid}) -> + false == MPid + orelse (ok == gen_server:call(MPid, {stop, self()}, 1000)) + orelse exit(MPid, {shutdown, parent}), + stop; + +%% Monitor process has died. +transition({'DOWN', _, process, MPid, _}, #transport{send = MPid}) + when is_pid(MPid) -> stop. %% Crash on anything unexpected. @@ -643,11 +695,13 @@ tls_handshake(_, true, #transport{ssl = false}) -> %% Capabilities exchange negotiated TLS: upgrade the connection. tls_handshake(Type, true, #transport{socket = Sock, module = M, - ssl = Opts} + ssl = Opts, + send = MPid} = S) -> {ok, SSock} = tls(Type, Sock, [{cb_info, ?TCP_CB(M)} | Opts]), Ref = getr(?REF_KEY), true = diameter_reg:add_new({?MODULE, Type, {Ref, SSock}}), + false == MPid orelse (MPid ! {tls, SSock}), %% tell the sender process S#transport{socket = SSock, module = ssl}; @@ -666,24 +720,15 @@ tls(accept, Sock, Opts) -> %% using Nagle. %% Receive packets until a full message is received, -recv(Bin, #transport{frag = Head, throttled = false} = S) -> +recv(Bin, #transport{frag = Head} = S) -> case rcv(Head, Bin) of - {Msg, B} -> - throttle(S#transport{frag = B, throttled = Msg}); - Frag -> - setopts(S), - start_fragment_timer(S#transport{frag = Frag, - flush = false}) + {Msg, B} -> %% have a complete message ... + message(recv, Msg, S#transport{frag = B}); + Frag -> %% read more on the socket + start_fragment_timer(setopts(S#transport{frag = Frag, + flush = false})) end. -%% recv/1 - -recv(#transport{throttled = false} = S) -> - recv(<<>>, S); - -recv(#transport{} = S) -> - S. - %% rcv/2 %% No previous fragment. @@ -743,13 +788,16 @@ recv1(Len, Bin) -> <<Msg:Len/binary, Rest/binary>> = Bin, {Msg, Rest}. -%% bin/1-2 +%% bin/2 bin(Head, Acc) -> list_to_binary([Head | lists:reverse(Acc)]). +%% bin/1 + bin({_, _, Head, Acc}) -> bin(Head, Acc); + bin(Bin) when is_binary(Bin) -> Bin. @@ -768,9 +816,7 @@ bin(Bin) %% also eventually lead to watchdog failover. %% No fragment to flush or not receiving messages. -flush(#transport{frag = Frag, throttled = B} = S) - when Frag == <<>>; - B /= false -> +flush(#transport{frag = <<>>} = S) -> S; %% Messages have been received since last timer expiry. @@ -778,9 +824,8 @@ flush(#transport{flush = false} = S) -> start_fragment_timer(S#transport{flush = true}); %% No messages since last expiry. -flush(#transport{frag = Frag, parent = Pid} = S) -> - diameter_peer:recv(Pid, bin(Frag)), - S#transport{frag = <<>>}. +flush(#transport{frag = Frag} = S) -> + message(recv, bin(Frag), S#transport{frag = <<>>}). %% start_fragment_timer/1 %% @@ -813,9 +858,27 @@ connect(Mod, Host, Port, Opts) -> %% send/2 -send(Bin, #transport{socket = Sock, - module = M}) -> - case send(M, Sock, Bin) of +send(Msg, #monitor{socket = Sock, module = M, transport = TPid, ack = B}) -> + send1(M, Sock, Msg), + B andalso (TPid ! Msg); + +send(Msg, #transport{socket = Sock, module = M, send = false} = S) -> + send1(M, Sock, Msg), + message(ack, Msg, S); + +%% Send from the monitor process to avoid deadlock if both the +%% receiver and the peer were to block in send. +send(Msg, #transport{send = Pid} = S) -> + Pid ! Msg, + S. + +%% send1/3 + +send1(Mod, Sock, #diameter_packet{bin = Bin}) -> + send1(Mod, Sock, Bin); + +send1(Mod, Sock, Bin) -> + case send(Mod, Sock, Bin) of ok -> ok; {error, Reason} -> @@ -842,120 +905,19 @@ setopts(M, Sock, Opts) -> %% setopts/1 -setopts(#transport{socket = Sock, module = M}) -> - setopts(M, Sock). - -%% setopts/2 - -setopts(M, Sock) -> +setopts(#transport{socket = Sock, + active = A, + recv = B, + module = M} + = S) + when B, not A -> case setopts(M, Sock, [{active, once}]) of - ok -> ok; - X -> x({setopts, M, Sock, X}) %% possibly on peer disconnect - end. - -%% throttle/1 - -%% Still collecting packets for a complete message: keep receiving. -throttle(#transport{throttled = false} = S) -> - recv(S); - -%% Decide whether to receive another, or whether to accept a message -%% that's been received. -throttle(#transport{throttle_cb = F, throttled = T} = S) -> - Res = cb(F, T), - - try throttle(Res, S) of - #transport{ssl = SB} = NS when is_boolean(SB) -> - throttle(defrag(NS)); - #transport{throttled = Msg} = NS when is_binary(Msg) -> - %% Initial incoming message when we might need to upgrade - %% to TLS: wait for reception of a tls tuple. - defrag(NS) - catch - #transport{} = NS -> - recv(NS) - end. - -%% cb/2 - -cb(false, _) -> - ok; - -cb(F, B) -> - diameter_lib:eval([F, true /= B andalso B]). - -%% throttle/2 - -%% Callback says to receive another message. -throttle(ok, #transport{throttled = true} = S) -> - throw(S#transport{throttled = false}); - -%% Callback says to accept a received message. -throttle(ok, #transport{parent = Pid, throttled = Msg} = S) - when is_binary(Msg) -> - diameter_peer:recv(Pid, Msg), - S; - -throttle({ok = T, F}, S) -> - throttle(T, S#transport{throttle_cb = F}); - -%% Callback says to accept a received message and acknowledged the -%% returned pid with a {request, Pid} message if a request pid is -%% spawned, a discard message otherwise. The latter does not mean that -%% the message was necessarily discarded: it could have been an -%% answer. -throttle(NPid, #transport{parent = Pid, throttled = Msg} = S) - when is_pid(NPid), is_binary(Msg) -> - diameter_peer:recv(Pid, {Msg, NPid}), - S; - -throttle({NPid, F}, #transport{throttled = Msg} = S) - when is_pid(NPid), is_binary(Msg) -> - throttle(NPid, S#transport{throttle_cb = F}); - -%% Callback to accept a received message says to discard it. -throttle(discard, #transport{throttled = Msg} = S) - when is_binary(Msg) -> - S; - -throttle({discard = T, F}, #transport{throttled = Msg} = S) - when is_binary(Msg) -> - throttle(T, S#transport{throttle_cb = F}); - -%% Callback to accept a received message says to answer it with the -%% supplied binary. -throttle(Bin, #transport{throttled = Msg} = S) - when is_binary(Bin), is_binary(Msg) -> - send(Bin, S), - S; - -throttle({Bin, F}, #transport{throttled = Msg} = S) - when is_binary(Bin), is_binary(Msg) -> - throttle(Bin, S#transport{throttle_cb = F}); - -%% Callback says to ask again in the specified number of milliseconds. -throttle({timeout, Tmo}, S) -> - erlang:send_after(Tmo, self(), throttle), - throw(S); - -throttle({timeout = T, Tmo, F}, S) -> - throttle({T, Tmo}, S#transport{throttle_cb = F}); - -throttle(T, #transport{throttle_cb = F}) -> - ?ERROR({invalid_return, T, F}). - -%% defrag/1 -%% -%% Try to extract another message from packets already read before -%% another throttling callback. + ok -> S#transport{active = true}; + X -> x({setopts, Sock, M, X}) %% possibly on peer disconnect + end; -defrag(#transport{frag = Head} = S) -> - case rcv(Head, <<>>) of - {Msg, B} -> - S#transport{throttled = Msg, frag = B}; - _ -> - S#transport{throttled = true} - end. +setopts(S) -> + S. %% portnr/2 @@ -990,3 +952,80 @@ getstat(gen_tcp, Sock) -> getstat(M, Sock) -> M:getstat(Sock). %% Note that ssl:getstat/1 doesn't yet exist in R15B01. + +%% A message_cb is invoked whenever a message is sent or received, or +%% to provide acknowledgement of a completed send or discarded +%% request. Ignoring possible extra arguments, calls are of the +%% following form. +%% +%% cb(recv, Msg) Receive a message into diameter? +%% cb(send, Msg) Send a message on the socket? +%% cb(ack, Msg) Acknowledgement of a completed send. +%% cb(ack, false) Acknowledgement of a discarded request. +%% +%% Msg will be binary() in a recv callback, but can be a +%% diameter_packet record in a send/ack callback if a recv/send +%% callback returns a record. Callbacks return a list of the following +%% form. +%% +%% [boolean() | send | recv | binary() | #diameter_packet{}] +%% +%% The atoms are meaningless by themselves, but say whether subsequent +%% messages are to be sent or received. A boolean says whether or not +%% to continue reading on the socket. Messages can be received even +%% after false is returned if these arrived in the same packet. A +%% leading recv or send is implicit on the corresponding callbacks. A +%% new callback can be returned as the tail of a returned list: any +%% value not of the aforementioned list type is interpreted as a +%% callback. + +%% message/3 + +message(send, false = M, S) -> + message(ack, M, S); + +message(ack, _, #transport{message_cb = false} = S) -> + S; + +message(Dir, Msg, #transport{message_cb = CB} = S) -> + recv(<<>>, actions(cb(CB, Dir, Msg), Dir, S)). + +%% actions/3 + +actions([], _, S) -> + S; + +actions([B | As], Dir, S) + when is_boolean(B) -> + actions(As, Dir, S#transport{recv = B}); + +actions([Dir | As], _, S) + when Dir == send; + Dir == recv -> + actions(As, Dir, S); + +actions([Msg | As], send = Dir, S) + when is_binary(Msg); + is_record(Msg, diameter_packet) -> + actions(As, Dir, send(Msg, S)); + +actions([Msg | As], recv = Dir, #transport{parent = Pid} = S) + when is_binary(Msg); + is_record(Msg, diameter_packet) -> + diameter_peer:recv(Pid, Msg), + actions(As, Dir, S); + +actions([{defer, Tmo, Acts} | As], Dir, S) -> + erlang:send_after(Tmo, self(), {actions, Dir, Acts}), + actions(As, Dir, S); + +actions(CB, _, S) -> + S#transport{message_cb = CB}. + +%% cb/3 + +cb(false, _, Msg) -> + [Msg]; + +cb(CB, Dir, Msg) -> + diameter_lib:eval([CB, Dir, Msg]). |