From fd2850798f68c9a3c502ad9d66ef46561816ab6f Mon Sep 17 00:00:00 2001 From: Anders Svensson Date: Mon, 12 Jun 2017 21:54:50 +0200 Subject: Let spawn_opt config replace erlang:spawn_opt/2 for request processes By accepting an MFA that is applied to the fun that is otherwise spawned for each incoming request, to allow handler processes to be reused. This is not yet documented and may change, but the motivation is to let spawn be replaced by process pool, from which the MFA selects. A list-valued spawn_opt is equivalent to {erlang, spawn_opt, [Opts]}. --- lib/diameter/src/base/diameter_config.erl | 7 +++++ lib/diameter/src/base/diameter_service.erl | 2 +- lib/diameter/src/base/diameter_traffic.erl | 42 ++++++++++++++++++++++++---- lib/diameter/test/diameter_traffic_SUITE.erl | 7 ++--- 4 files changed, 47 insertions(+), 11 deletions(-) (limited to 'lib/diameter') diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl index 1db9b52dfa..99d8c8c6ec 100644 --- a/lib/diameter/src/base/diameter_config.erl +++ b/lib/diameter/src/base/diameter_config.erl @@ -610,6 +610,9 @@ opt({watchdog_timer, Tmo}) -> opt({watchdog_config, L}) -> is_list(L) andalso lists:all(fun wdopt/1, L); +opt({spawn_opt, {M,F,A}}) + when is_atom(M), is_atom(F), is_list(A) -> + true; opt({spawn_opt = K, Opts}) -> if is_list(Opts) -> {value, {K, spawn_opts(Opts)}}; @@ -739,6 +742,10 @@ opt(incoming_maxlen, N) when 0 =< N, N < 1 bsl 24 -> N; +opt(spawn_opt, {M,F,A} = T) + when is_atom(M), is_atom(F), is_list(A) -> + T; + opt(spawn_opt, L) when is_list(L) -> spawn_opts(L); diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl index be50e87179..8e383818ea 100644 --- a/lib/diameter/src/base/diameter_service.erl +++ b/lib/diameter/src/base/diameter_service.erl @@ -114,7 +114,7 @@ incoming_maxlen := diameter:message_length(), strict_mbit := boolean(), string_decode := boolean(), - spawn_opt := list()}}). + spawn_opt := list() | {module(), atom(), list()}}}). %% Record representing an RFC 3539 watchdog process implemented by %% diameter_watchdog. diff --git a/lib/diameter/src/base/diameter_traffic.erl b/lib/diameter/src/base/diameter_traffic.erl index 54f39afbf0..af7ac10f13 100644 --- a/lib/diameter/src/base/diameter_traffic.erl +++ b/lib/diameter/src/base/diameter_traffic.erl @@ -212,8 +212,9 @@ incr_rc(Dir, Pkt, TPid, Dict0) -> %% --------------------------------------------------------------------------- -spec receive_message(pid(), Route, #diameter_packet{}, module(), RecvData) - -> pid() - | boolean() + -> pid() %% request handler + | boolean() %% answer, known request or not + | discard %% request discarded by MFA when Route :: {Handler, RequestRef, Seqs} | Ack, RecvData :: {[SpawnOpt], #recvdata{}}, @@ -230,9 +231,10 @@ receive_message(TPid, Route, Pkt, Dict0, RecvData) -> %% recv/6 %% Incoming request ... -recv(true, Ack, TPid, Pkt, Dict0, RecvData) +recv(true, Ack, TPid, Pkt, Dict0, T) when is_boolean(Ack) -> - spawn_request(Ack, TPid, Pkt, Dict0, RecvData); + {Opts, RecvData} = T, + spawn_request(Ack, TPid, Pkt, Dict0, RecvData, Opts); %% ... answer to known request ... recv(false, {Pid, Ref, TPid}, _, Pkt, Dict0, _) -> @@ -254,18 +256,46 @@ recv(false, false, TPid, Pkt, _, _) -> incr(TPid, {{unknown, 0}, recv, discarded}), false. -%% spawn_request/5 +%% spawn_request/6 -spawn_request(Ack, TPid, Pkt, Dict0, {Opts, RecvData}) -> +%% An MFA should return a pid() or the atom 'discard'. The latter +%% results in an acknowledgment back to the transport process when +%% appropriate, to ensure that send/recv callbacks can count +%% outstanding requests. Acknowledgement is implicit if the +%% handler process dies (in a handle_request callback for example). +spawn_request(Ack, TPid, Pkt, Dict0, RecvData, {M,F,A}) -> + ReqF = fun() -> + ack(Ack, TPid, recv_request(Ack, TPid, Pkt, Dict0, RecvData)) + end, + ack(Ack, TPid, apply(M, F, [ReqF | A])); + +%% A spawned process acks implicitly when it dies, so there's no need +%% to handle 'discard'. +spawn_request(Ack, TPid, Pkt, Dict0, RecvData, Opts) -> spawn_opt(fun() -> recv_request(Ack, TPid, Pkt, Dict0, RecvData) end, Opts). +%% ack/3 + +ack(Ack, TPid, RC) -> + RC == discard andalso Ack andalso (TPid ! {send, false}), + RC. + %% --------------------------------------------------------------------------- %% recv_request/5 %% --------------------------------------------------------------------------- +-spec recv_request(Ack :: boolean(), + TPid :: pid(), + #diameter_packet{}, + Dict0 :: module(), + #recvdata{}) + -> ok %% answer was sent + | discard %% or not + | false. %% no transport + recv_request(Ack, TPid, #diameter_packet{header = #diameter_header{application_id = Id}} diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl index f2e796005d..7c36b3f82b 100644 --- a/lib/diameter/test/diameter_traffic_SUITE.erl +++ b/lib/diameter/test/diameter_traffic_SUITE.erl @@ -203,8 +203,7 @@ {'Acct-Application-Id', [?DIAMETER_APP_ID_ACCOUNTING]}, {restrict_connections, false}, {string_decode, Decode}, - {incoming_maxlen, 1 bsl 21}, - {spawn_opt, [{min_heap_size, 5000}]} + {incoming_maxlen, 1 bsl 21} | [{application, [{dictionary, D}, {module, ?MODULE}, {answer_errors, callback}]} @@ -466,8 +465,8 @@ add_transports(Config) -> || T == sctp andalso CS]], [{capabilities_cb, fun capx/2}, {pool_size, 8}, - {spawn_opt, [{min_heap_size, 8096}]}, - {applications, apps(rfc3588)}]), + {applications, apps(rfc3588)}] + ++ [{spawn_opt, {erlang, spawn, []}} || CS]), Cs = [?util:connect(CN, [T, {sender, CS}], LRef, -- cgit v1.2.3