aboutsummaryrefslogtreecommitdiffstats
path: root/lib
diff options
context:
space:
mode:
authorAnders Svensson <[email protected]>2012-11-08 12:55:58 +0100
committerAnders Svensson <[email protected]>2012-11-08 12:55:58 +0100
commitcedda10e06e39d7d08ab98ee342d7c8eed00edc9 (patch)
treef7b47ea8cf3c16648fff0b2c0ab7a4815c6726cf /lib
parent1262a7cadb69ee503bb5c2586038059b00595c99 (diff)
parent618642cc8bf057294f86eadfb7c0968c089a0dac (diff)
downloadotp-cedda10e06e39d7d08ab98ee342d7c8eed00edc9.tar.gz
otp-cedda10e06e39d7d08ab98ee342d7c8eed00edc9.tar.bz2
otp-cedda10e06e39d7d08ab98ee342d7c8eed00edc9.zip
Merge branch 'anders/diameter/shared_transport/OTP-10443' into maint
* anders/diameter/shared_transport/OTP-10443: Use multiple connections in traffic suite Implement service_opt() restrict_connections Document service_opt() restrict_connections
Diffstat (limited to 'lib')
-rw-r--r--lib/diameter/doc/src/diameter.xml42
-rw-r--r--lib/diameter/src/base/diameter.erl9
-rw-r--r--lib/diameter/src/base/diameter_config.erl25
-rw-r--r--lib/diameter/src/base/diameter_peer_fsm.erl38
-rw-r--r--lib/diameter/src/base/diameter_service.erl16
-rw-r--r--lib/diameter/src/base/diameter_watchdog.erl99
-rw-r--r--lib/diameter/test/diameter_traffic_SUITE.erl90
7 files changed, 242 insertions, 77 deletions
diff --git a/lib/diameter/doc/src/diameter.xml b/lib/diameter/doc/src/diameter.xml
index 161fd94d5a..b7936dbacc 100644
--- a/lib/diameter/doc/src/diameter.xml
+++ b/lib/diameter/doc/src/diameter.xml
@@ -375,10 +375,15 @@ is meant in the sense of <c>eval([E|A])</c>.</p>
<warning>
<p>
-Beware of using fun expressions of the form <c>fun Name/Arity</c> (not
-fun Mod:Name/Arity) in situations in which the fun is not short-lived
+Beware of using fun expressions of the form <c>fun Name/Arity</c> in
+situations in which the fun is not short-lived
and code is to be upgraded at runtime since any processes retaining
-such a fun will have a reference to old code.</p>
+such a fun will have a reference to old code.
+In particular, such a value is typically inappropriate in
+configuration passed to <seealso
+marker="#start_service">start_service/2</seealso> or
+<seealso
+marker="#add_transport">add_transport/2</seealso>.</p>
</warning>
<marker id="peer_filter"/>
@@ -699,6 +704,35 @@ the application's <seealso marker="diameter_dict">dictionary</seealso>
file.</p>
</item>
+<tag><c>{restrict_connections, false
+ | node
+ | nodes
+ | [node()]
+ | diameter:evaluable()}</c></tag>
+<item>
+<p>
+Specifies the degree to which multiple transport connections to the
+same peer are accepted by the service.</p>
+
+<p>
+If type <c>[node()]</c> then a connection is rejected if another already
+exists on any of the specified nodes.
+Values of type <c>false</c>, <c>node</c>, <c>nodes</c> or
+<c>diameter:evaluable()</c> are equivalent to values <c>[]</c>,
+<c>[node()]</c>, <c>[node()|nodes()]</c> and the evaluated value,
+respectively, evaluation of each expression taking place whenever a
+new connection is to be established.
+Note that <c>false</c> allows an unlimited number of connections to be
+established with the same peer.</p>
+
+<p>
+Multiple connections are independent and governed
+by their own peer and watchdog state machines.</p>
+
+<p>
+Defaults to <c>nodes</c>.</p>
+</item>
+
<tag><c>{sequence, {H,N} | <seealso
marker="diameter#evaluable">diameter:evaluable()</seealso>}</c></tag>
<item>
@@ -879,7 +913,7 @@ reconnection attempts, as required by RFC 3539.</p>
For a listening transport, the timer specifies the time after which a
previously connected peer will be forgotten: a connection after this time is
regarded as an initial connection rather than a reestablishment,
-causing the RFC 3539 state machine to pass to state OPEN rather than
+causing the RFC 3539 state machine to pass to state OKAY rather than
REOPEN.
Note that these semantics are not goverened by the RFC and
that a listening transport's <c>reconnect_timer</c> should be greater
diff --git a/lib/diameter/src/base/diameter.erl b/lib/diameter/src/base/diameter.erl
index d60510db7d..3e3a6be0ef 100644
--- a/lib/diameter/src/base/diameter.erl
+++ b/lib/diameter/src/base/diameter.erl
@@ -44,6 +44,7 @@
stop/0]).
-export_type([evaluable/0,
+ restriction/0,
sequence/0,
app_alias/0,
service_name/0,
@@ -284,11 +285,19 @@ call(SvcName, App, Message) ->
-type sequence()
:: {'Unsigned32'(), 0..32}.
+-type restriction()
+ :: false
+ | node
+ | nodes
+ | [node()]
+ | evaluable().
+
%% Options passed to start_service/2
-type service_opt()
:: capability()
| {application, [application_opt()]}
+ | {restrict_connections, restriction()}
| {sequence, sequence() | evaluable()}.
-type application_opt()
diff --git a/lib/diameter/src/base/diameter_config.erl b/lib/diameter/src/base/diameter_config.erl
index fc37ca8541..63d28f25a2 100644
--- a/lib/diameter/src/base/diameter_config.erl
+++ b/lib/diameter/src/base/diameter_config.erl
@@ -555,7 +555,8 @@ make_config(SvcName, Opts) ->
Os = split(Opts, fun opt/2, [{false, share_peers},
{false, use_shared_peers},
{false, monitor},
- {?NOMASK, sequence}]),
+ {?NOMASK, sequence},
+ {nodes, restrict_connections}]),
%% share_peers and use_shared_peers are currently undocumented.
#service{name = SvcName,
@@ -579,15 +580,33 @@ opt(monitor, P)
when is_pid(P) ->
P;
+opt(restrict_connections, T)
+ when T == node;
+ T == nodes;
+ T == [];
+ is_atom(hd(T)) ->
+ T;
+
+opt(restrict_connections = K, F) ->
+ try diameter_lib:eval(F) of %% no guarantee that it won't fail later
+ Nodes when is_list(Nodes) ->
+ F;
+ V ->
+ ?THROW({value, {K,V}})
+ catch
+ E:R ->
+ ?THROW({value, {K, E, R, ?STACK}})
+ end;
+
opt(sequence, {_,_} = T) ->
sequence(T);
-opt(sequence, F) ->
+opt(sequence = K, F) ->
try diameter_lib:eval(F) of
T -> sequence(T)
catch
E:R ->
- ?THROW({value, {sequence, E, R, ?STACK}})
+ ?THROW({value, {K, E, R, ?STACK}})
end;
opt(K, _) ->
diff --git a/lib/diameter/src/base/diameter_peer_fsm.erl b/lib/diameter/src/base/diameter_peer_fsm.erl
index 8ce6ea847a..297a5d7709 100644
--- a/lib/diameter/src/base/diameter_peer_fsm.erl
+++ b/lib/diameter/src/base/diameter_peer_fsm.erl
@@ -60,6 +60,7 @@
-define(Q_KEY, q). %% transport start queue
-define(START_KEY, start). %% start of connected transport
-define(SEQUENCE_KEY, mask). %% mask for sequence numbers
+-define(RESTRICT_KEY, restrict). %% nodes for connection check
%% The default sequence mask.
-define(NOMASK, {0,32}).
@@ -126,7 +127,9 @@
%%% ---------------------------------------------------------------------------
-spec start(T, [Opt], #diameter_service{} %% from old code
- | {diameter:sequence(), #diameter_service{}})
+ | {diameter:sequence(),
+ diameter:restriction(),
+ #diameter_service{}})
-> pid()
when T :: {connect|accept, diameter:transport_ref()},
Opt :: diameter:transport_opt().
@@ -157,11 +160,11 @@ init(T) ->
gen_server:enter_loop(?MODULE, [], i(T)).
i({WPid, Type, Opts, #diameter_service{} = Svc}) -> %% from old code
- i({WPid, Type, Opts, {?NOMASK, Svc}});
+ i({WPid, Type, Opts, {?NOMASK, [node() | nodes()], Svc}});
-i({WPid, T, Opts, {Mask, #diameter_service{applications = Apps,
- capabilities = Caps}
- = Svc}}) ->
+i({WPid, T, Opts, {Mask, Nodes, #diameter_service{applications = Apps,
+ capabilities = Caps}
+ = Svc}}) ->
[] /= Apps orelse ?ERROR({no_apps, T, Opts}),
putr(?DWA_KEY, dwa(Caps)),
{M, Ref} = T,
@@ -169,6 +172,7 @@ i({WPid, T, Opts, {Mask, #diameter_service{applications = Apps,
{[Ts], Rest} = proplists:split(Opts, [capabilities_cb]),
putr(?CB_KEY, {Ref, [F || {_,F} <- Ts]}),
putr(?SEQUENCE_KEY, Mask),
+ putr(?RESTRICT_KEY, Nodes),
erlang:monitor(process, WPid),
{TPid, Addrs} = start_transport(T, Rest, Svc),
#state{parent = WPid,
@@ -990,15 +994,31 @@ dpa_timer() ->
%% Register a term and ensure it's not registered elsewhere. Note that
%% two process that simultaneously register the same term may well
%% both fail to do so this isn't foolproof.
+%%
+%% Everywhere is no longer everywhere, it's where a
+%% restrict_connections service_opt() specifies.
register_everywhere(T) ->
- diameter_reg:add_new(T)
- andalso unregistered(T).
+ reg(getr(?RESTRICT_KEY), T).
+
+reg(Nodes, T) ->
+ add(lists:member(node(), Nodes), T) andalso unregistered(Nodes, T).
+
+add(true, T) ->
+ diameter_reg:add_new(T);
+add(false, T) ->
+ diameter_reg:add(T).
-unregistered(T) ->
- {ResL, _} = rpc:multicall(?MODULE, match, [{node(), T}]),
+%% unregistered
+%%
+%% Ensure that the term in question isn't registered on other nodes.
+
+unregistered(Nodes, T) ->
+ {ResL, _} = rpc:multicall(Nodes, ?MODULE, match, [{node(), T}]),
lists:all(fun(L) -> [] == L end, ResL).
+%% match/1
+
match({Node, _})
when Node == node() ->
[];
diff --git a/lib/diameter/src/base/diameter_service.erl b/lib/diameter/src/base/diameter_service.erl
index 91e7cbd996..b4e54cc9f9 100644
--- a/lib/diameter/src/base/diameter_service.erl
+++ b/lib/diameter/src/base/diameter_service.erl
@@ -110,6 +110,9 @@
%% The default sequence mask.
-define(NOMASK, {0,32}).
+%% The default restrict_connections.
+-define(RESTRICT, nodes).
+
%% Workaround for dialyzer's lack of understanding of match specs.
-type match(T)
:: T | '_' | '$1' | '$2' | '$3' | '$4'.
@@ -126,6 +129,7 @@
monitor = false :: false | pid(), %% process to die with
options
:: [{sequence, diameter:sequence()} %% sequence mask
+ | {restrict_connections, diameter:restriction()}
| {share_peers, boolean()} %% broadcast peers to remote nodes?
| {use_shared_peers, boolean()}]}).%% use broadcasted peers?
%% shared_peers reflects the peers broadcast from remote nodes. Note
@@ -500,6 +504,10 @@ handle_call(stop, _From, S) ->
handle_call(sequence, _From, #state{options = [{_, Mask} | _]} = S) ->
{reply, Mask, S};
+%% Watchdog is asking for the nodes restriction.
+handle_call(restriction, _From, #state{options = [_,_,_,{_,R} | _]} = S) ->
+ {reply, R, S};
+
handle_call(Req, From, S) ->
unexpected(handle_call, [Req, From], S),
{reply, nok, S}.
@@ -656,7 +664,8 @@ upgrade({state, Id, Svc, Name, Svc, PT, CT, SB, UB, SD, LD, MPid}) ->
monitor = MPid,
options = [{sequence, ?NOMASK},
{share_peers, SB},
- {use_shared_peers, UB}]},
+ {use_shared_peers, UB},
+ {restrict_connections, ?RESTRICT}]},
upgrade_insert(S),
S.
@@ -867,7 +876,10 @@ cfg_acc({_Ref, Type, _Opts} = T, {S, Acc})
service_options(Opts) ->
[{sequence, proplists:get_value(sequence, Opts, ?NOMASK)},
{share_peers, get_value(share_peers, Opts)},
- {use_shared_peers, get_value(use_shared_peers, Opts)}].
+ {use_shared_peers, get_value(use_shared_peers, Opts)},
+ {restrict_connections, proplists:get_value(restrict_connections,
+ Opts,
+ ?RESTRICT)}].
%% The order of options is significant since we match against the list.
mref(false = No) ->
diff --git a/lib/diameter/src/base/diameter_watchdog.erl b/lib/diameter/src/base/diameter_watchdog.erl
index b37a1a10e9..d814f1afe2 100644
--- a/lib/diameter/src/base/diameter_watchdog.erl
+++ b/lib/diameter/src/base/diameter_watchdog.erl
@@ -57,8 +57,9 @@
parent = self() :: pid(),
transport :: pid() | undefined,
tref :: reference(), %% reference for current watchdog timer
- message_data, %% term passed into diameter_service with message
- sequence :: diameter:sequence()}). %% mask
+ message_data, %% term passed into diameter_service with message
+ sequence :: diameter:sequence(), %% mask
+ restrict :: {diameter:restriction(), boolean()}}).
%% start/2
%%
@@ -121,15 +122,18 @@ make_state({T, Pid, {RecvData,
putr(restart, {T, Opts, Svc}), %% save seeing it in trace
putr(dwr, dwr(Caps)), %%
{_,_} = Mask = call(Pid, sequence),
+ Restrict = call(Pid, restriction),
+ Nodes = restrict_nodes(Restrict),
#watchdog{parent = Pid,
transport = monitor(diameter_peer_fsm:start(T,
- Opts,
- {Mask, Svc})),
+ Opts,
+ {Mask, Nodes, Svc})),
tw = proplists:get_value(watchdog_timer,
Opts,
?DEFAULT_TW_INIT),
message_data = {RecvData, SvcName, Apps, Mask},
- sequence = Mask}.
+ sequence = Mask,
+ restrict = {Restrict, lists:member(node(), Nodes)}}.
%% Retrieve the sequence mask from the parent from the parent, rather
%% than having it passed into init/1, for upgrade reasons: the call to
@@ -160,9 +164,11 @@ handle_info(T, #watchdog{} = State) ->
{stop, {shutdown, T}, State}
end;
-handle_info(T, S) -> %% upgrade
- handle_info(T, #watchdog{} = list_to_tuple(tuple_to_list(S)
- ++ [?NOMASK])).
+handle_info(T, S) ->
+ handle_info(T, upgrade(S)).
+
+upgrade(S) ->
+ #watchdog{} = list_to_tuple(tuple_to_list(S) ++ [?NOMASK, {nodes, true}]).
event(#watchdog{status = T}, #watchdog{status = T}) ->
ok;
@@ -255,9 +261,10 @@ transition({close, TPid, _Reason}, #watchdog{transport = TPid}) ->
transition({open, TPid, Hosts, T} = Open,
#watchdog{transport = TPid,
status = initial,
- parent = Pid}
+ parent = Pid,
+ restrict = {_, R}}
= S) ->
- case okay(getr(restart), Hosts) of
+ case okay(getr(restart), Hosts, R) of
okay ->
open(Pid, {TPid, T}),
set_watchdog(S#watchdog{status = okay});
@@ -363,20 +370,24 @@ encode(Msg, Mask) ->
#diameter_packet{bin = Bin} = diameter_codec:encode(?BASE, Pkt),
Bin.
-%% okay/2
+%% okay/3
-okay({{accept, Ref}, _, _}, Hosts) ->
+okay({{accept, Ref}, _, _}, Hosts, Restrict) ->
T = {?MODULE, connection, Ref, Hosts},
diameter_reg:add(T),
- okay(diameter_reg:match(T));
+ if Restrict ->
+ okay(diameter_reg:match(T));
+ true ->
+ okay
+ end;
%% Register before matching so that at least one of two registering
-%% processes will match the other. (Which can't happen as long as
-%% diameter_peer_fsm guarantees at most one open connection to the same
-%% peer.)
+%% processes will match the other.
-okay({{connect, _}, _, _}, _) ->
+okay({{connect, _}, _, _}, _, _) ->
okay.
+%% okay/2
+
%% The peer hasn't been connected recently ...
okay([{_,P}]) ->
P = self(), %% assert
@@ -633,23 +644,40 @@ restart(#watchdog{transport = undefined} = S) ->
restart(S) ->
S.
+%% restart/2
+%%
%% Only restart the transport in the connecting case. For an accepting
-%% transport, we've registered the peer connection when leaving state
-%% initial and this is used by a new accepting process to realize that
-%% it's actually in state down rather then initial when receiving
-%% notification of an open connection.
+%% transport, there's no guarantee that an accepted connection in a
+%% restarted transport if from the peer we've lost contact with so
+%% have to be prepared for another watchdog to handle it. This is what
+%% the diameter_reg registration in this module is for: the peer
+%% connection is registered when leaving state initial and this is
+%% used by a new accepting watchdog to realize that it's actually in
+%% state down rather then initial when receiving notification of an
+%% open connection.
restart({{connect, _} = T, Opts, Svc}, #watchdog{parent = Pid,
- sequence = Mask}
+ sequence = Mask,
+ restrict = {R,_}}
= S) ->
Pid ! {reconnect, self()},
+ Nodes = restrict_nodes(R),
S#watchdog{transport = monitor(diameter_peer_fsm:start(T,
- Opts,
- {Mask, Svc}))};
+ Opts,
+ {Mask, Nodes, Svc})),
+ restrict = {R, lists:member(node(), Nodes)}};
+
+%% No restriction on the number of connections to the same peer: just
+%% die. Note that a state machine never enters state REOPEN in this
+%% case.
+restart({{accept, _}, _, _}, #watchdog{restrict = {_, false}}) ->
+ stop;
+
+%% Otherwise hang around until told to die.
restart({{accept, _}, _, _}, S) ->
S.
-%% Don't currently use Opts/Svc in the accept case but having them in
-%% the process dictionary is helpful if the process dies unexpectedly.
+
+%% Don't currently use Opts/Svc in the accept case.
%% dwr/1
@@ -659,3 +687,22 @@ dwr(#diameter_caps{origin_host = OH,
['DWR', {'Origin-Host', OH},
{'Origin-Realm', OR},
{'Origin-State-Id', OSI}].
+
+%% restrict_nodes/1
+
+restrict_nodes(false) ->
+ [];
+
+restrict_nodes(nodes) ->
+ [node() | nodes()];
+
+restrict_nodes(node) ->
+ [node()];
+
+restrict_nodes(Nodes)
+ when [] == Nodes;
+ is_atom(hd(Nodes)) ->
+ Nodes;
+
+restrict_nodes(F) ->
+ diameter_lib:eval(F).
diff --git a/lib/diameter/test/diameter_traffic_SUITE.erl b/lib/diameter/test/diameter_traffic_SUITE.erl
index 863d16e792..c391ba0317 100644
--- a/lib/diameter/test/diameter_traffic_SUITE.erl
+++ b/lib/diameter/test/diameter_traffic_SUITE.erl
@@ -81,11 +81,11 @@
%% diameter callbacks
-export([peer_up/3,
peer_down/3,
- pick_peer/5, pick_peer/6,
- prepare_request/4, prepare_request/5,
- prepare_retransmit/4,
- handle_answer/5, handle_answer/6,
- handle_error/5,
+ pick_peer/6, pick_peer/7,
+ prepare_request/5, prepare_request/6,
+ prepare_retransmit/5,
+ handle_answer/6, handle_answer/7,
+ handle_error/6,
handle_request/3]).
-include("diameter.hrl").
@@ -115,6 +115,9 @@
%% messages as lists or records.
-define(ENCODINGS, [list, record]).
+%% Identifers for client connections.
+-define(CONNECTIONS, [c1,c2,c3]).
+
%% Not really what we should be setting unless the message is sent in
%% the common application but diameter doesn't care.
-define(APP_ID, ?DIAMETER_APP_ID_COMMON).
@@ -127,7 +130,8 @@
{'Vendor-Id', 12345},
{'Product-Name', "OTP/diameter"},
{'Auth-Application-Id', [?DIAMETER_APP_ID_COMMON]},
- {'Acct-Application-Id', [?DIAMETER_APP_ID_ACCOUNTING]}
+ {'Acct-Application-Id', [?DIAMETER_APP_ID_ACCOUNTING]},
+ {restrict_connections, false}
| [{application, [{dictionary, D},
{module, ?MODULE},
{answer_errors, callback}]}
@@ -178,12 +182,14 @@ suite() ->
all() ->
[start, start_services, add_transports, result_codes]
- ++ [{group, name([E]), P} || E <- ?ENCODINGS, P <- [[], [parallel]]]
+ ++ [{group, name([E,C]), P} || E <- ?ENCODINGS,
+ C <- ?CONNECTIONS,
+ P <- [[], [parallel]]]
++ [remove_transports, stop_services, stop].
groups() ->
Ts = tc(),
- [{name([E]), [], Ts} || E <- ?ENCODINGS].
+ [{name([E,C]), [], Ts} || E <- ?ENCODINGS, C <- ?CONNECTIONS].
init_per_group(Name, Config) ->
[{group, Name} | Config].
@@ -252,12 +258,12 @@ start_services(_Config) ->
add_transports(Config) ->
LRef = ?util:listen(?SERVER, tcp, [{capabilities_cb, fun capx/2}]),
- CRef = ?util:connect(?CLIENT, tcp, LRef),
- ?util:write_priv(Config, "transport", {LRef, CRef}).
+ Cs = [?util:connect(?CLIENT, tcp, LRef, [{id, C}]) || C <- ?CONNECTIONS],
+ ?util:write_priv(Config, "transport", [LRef | Cs]).
remove_transports(Config) ->
- {LRef, CRef} = ?util:read_priv(Config, "transport"),
- ?util:disconnect(?CLIENT, CRef, ?SERVER, LRef).
+ [LRef | Cs] = ?util:read_priv(Config, "transport"),
+ [?util:disconnect(?CLIENT, C, ?SERVER, LRef) || C <- Cs].
stop_services(_Config) ->
ok = diameter:stop_service(?CLIENT),
@@ -543,11 +549,11 @@ call(Config, Req) ->
call(Config, Req, Opts) ->
Name = proplists:get_value(testcase, Config),
- [Enc] = name(proplists:get_value(group, Config)),
+ [Encoding, Client] = name(proplists:get_value(group, Config)),
diameter:call(?CLIENT,
dict(Req),
- req(Req, Enc),
- [{extra, [Name]} | Opts]).
+ req(Req, Encoding),
+ [{extra, [Name, Client]} | Opts]).
req(['ACR' = H | T], record) ->
?ACCT:'#new-'(?ACCT:msg2rec(H), T);
@@ -584,7 +590,7 @@ set(_, _, _, Rec) ->
Rec.
%% Contruct and deconstruct names to work around group names being
-%% restricted to atoms. (Not really used yet.)
+%% restricted to atoms.
name(Names)
when is_list(Names) ->
@@ -607,27 +613,45 @@ peer_up(_SvcName, _Peer, State) ->
peer_down(_SvcName, _Peer, State) ->
State.
-%% pick_peer/5/6
+%% pick_peer/6-7
-pick_peer([Peer], _, ?CLIENT, _State, Name)
+pick_peer(Peers, _, ?CLIENT, _State, Name, Id)
when Name /= send_detach ->
- {ok, Peer}.
+ find(Id, Peers).
-pick_peer([_Peer], _, ?CLIENT, _State, send_nopeer, ?EXTRA) ->
+pick_peer(_Peers, _, ?CLIENT, _State, send_nopeer, _, ?EXTRA) ->
false;
-pick_peer([Peer], _, ?CLIENT, _State, send_detach, {_,_}) ->
- {ok, Peer}.
+pick_peer(Peers, _, ?CLIENT, _State, send_detach, Id, {_,_}) ->
+ find(Id, Peers).
+
+find(Id, Peers) ->
+ [P] = lists:flatmap(fun(C) -> peer(Id, C) end,
+ diameter:service_info(?CLIENT, transport)),
+ case lists:keyfind(P, 1, Peers) of %% OTP-10470 will provide a better way.
+ {_,_} = TC ->
+ {ok, TC};
+ false = No ->
+ No
+ end.
+
+peer(Id, [{ref, _},
+ {type, connect},
+ {options, Opts},
+ {watchdog, _},
+ {peer, {PeerRef, _}}
+ | _]) ->
+ [PeerRef || lists:member({id, Id}, Opts)].
-%% prepare_request/4/5
+%% prepare_request/5-6
-prepare_request(_Pkt, ?CLIENT, {_Ref, _Caps}, send_discard) ->
+prepare_request(_Pkt, ?CLIENT, {_Ref, _Caps}, send_discard, _) ->
{discard, unprepared};
-prepare_request(Pkt, ?CLIENT, {_Ref, Caps}, Name) ->
+prepare_request(Pkt, ?CLIENT, {_Ref, Caps}, Name, _) ->
{send, prepare(Pkt, Caps, Name)}.
-prepare_request(Pkt, ?CLIENT, {_Ref, Caps}, send_detach, _) ->
+prepare_request(Pkt, ?CLIENT, {_Ref, Caps}, send_detach, _, _) ->
{eval_packet, {send, prepare(Pkt, Caps)}, [fun log/2, detach]}.
log(#diameter_packet{} = P, T) ->
@@ -714,25 +738,25 @@ prepare(#diameter_packet{msg = Req}, Caps)
{'Destination-Realm', DR},
{'Auth-Application-Id', ?APP_ID}]).
-%% prepare_retransmit/4
+%% prepare_retransmit/5
-prepare_retransmit(_Pkt, false, _Peer, _Name) ->
+prepare_retransmit(_Pkt, false, _Peer, _Name, _Id) ->
discard.
-%% handle_answer/5/6
+%% handle_answer/6-7
-handle_answer(Pkt, Req, ?CLIENT, Peer, Name) ->
+handle_answer(Pkt, Req, ?CLIENT, Peer, Name, _Id) ->
answer(Pkt, Req, Peer, Name).
-handle_answer(Pkt, _Req, ?CLIENT, _Peer, send_detach, {Pid, Ref}) ->
+handle_answer(Pkt, _Req, ?CLIENT, _Peer, send_detach, _Id, {Pid, Ref}) ->
Pid ! {Ref, Pkt}.
answer(#diameter_packet{msg = Rec, errors = []}, _Req, _Peer, _) ->
Rec.
-%% handle_error/5
+%% handle_error/6
-handle_error(Reason, _Req, ?CLIENT, _Peer, _Name) ->
+handle_error(Reason, _Req, ?CLIENT, _Peer, _Name, _Id) ->
{error, Reason}.
%% handle_request/3