%%
%% %CopyrightBegin%
%%
%% Copyright Ericsson AB 1997-2018. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%
%% %CopyrightEnd%
%%
-module(distribution_SUITE).
%% Numeric constants whose names match tags of the Erlang external term
%% format. NOTE(review): no use of them is visible in this part of the file;
%% confirm the values against the ERTS "External Term Format" documentation.
-define(VERSION_MAGIC, 131).
-define(ATOM_EXT, 100).
-define(REFERENCE_EXT, 101).
-define(PORT_EXT, 102).
-define(PID_EXT, 103).
-define(NEW_REFERENCE_EXT, 114).
-define(ATOM_UTF8_EXT, 118).
-define(SMALL_ATOM_UTF8_EXT, 119).
%% Tests distribution and the tcp driver.
-include_lib("common_test/include/ct.hrl").
%% ?Line is a line-tracing hook placed in front of statements. As defined
%% here it expands to nothing; the commented-out alternative below would
%% instead call erlang:display({line,?LINE}) before each marked statement.
%-define(Line, erlang:display({line,?LINE}),).
-define(Line,).
-export([all/0, suite/0, groups/0,
init_per_suite/1, end_per_suite/1,
init_per_group/2, end_per_group/2,
ping/1, bulk_send_small/1,
group_leader/1,
optimistic_dflags/1,
bulk_send_big/1, bulk_send_bigbig/1,
local_send_small/1, local_send_big/1,
local_send_legal/1, link_to_busy/1, exit_to_busy/1,
lost_exit/1, link_to_dead/1, link_to_dead_new_node/1,
ref_port_roundtrip/1, nil_roundtrip/1,
trap_bif_1/1, trap_bif_2/1, trap_bif_3/1,
stop_dist/1,
dist_auto_connect_never/1, dist_auto_connect_once/1,
dist_parallel_send/1,
atom_roundtrip/1,
unicode_atom_roundtrip/1,
contended_atom_cache_entry/1,
contended_unicode_atom_cache_entry/1,
bad_dist_structure/1,
bad_dist_ext_receive/1,
bad_dist_ext_process_info/1,
bad_dist_ext_control/1,
bad_dist_ext_connection_id/1,
bad_dist_ext_size/1,
start_epmd_false/1, epmd_module/1,
bad_dist_fragments/1,
message_latency_large_message/1,
message_latency_large_link_exit/1,
message_latency_large_monitor_exit/1,
message_latency_large_exit2/1,
system_limit/1]).
%% Internal exports.
-export([sender/3, receiver2/2, dummy_waiter/0, dead_process/0,
group_leader_1/1,
optimistic_dflags_echo/0, optimistic_dflags_sender/1,
roundtrip/1, bounce/1, do_dist_auto_connect/1, inet_rpc_server/1,
dist_parallel_sender/3, dist_parallel_receiver/0,
dist_evil_parallel_receiver/0, make_busy/2]).
%% epmd_module exports
-export([start_link/0, register_node/2, register_node/3, port_please/2, address_please/3]).
%% Common Test callback: suite-wide defaults. Installs the ts_install_cth
%% hook and gives every test case a four minute timetrap.
suite() ->
    Hooks = {ct_hooks, [ts_install_cth]},
    TimeTrap = {timetrap, {minutes, 4}},
    [Hooks, TimeTrap].
%% Common Test callback: every test case and group of the suite, in the
%% order in which they are listed for execution.
all() ->
    Basic = [ping, {group, bulk_send}, {group, local_send},
             group_leader, optimistic_dflags],
    BusyAndLinks = [link_to_busy, exit_to_busy, lost_exit, link_to_dead,
                    link_to_dead_new_node],
    Roundtrips = [ref_port_roundtrip, nil_roundtrip, stop_dist],
    Connect = [{group, trap_bif}, {group, dist_auto_connect},
               dist_parallel_send],
    Atoms = [atom_roundtrip, unicode_atom_roundtrip,
             contended_atom_cache_entry, contended_unicode_atom_cache_entry],
    Robustness = [{group, message_latency}, {group, bad_dist},
                  {group, bad_dist_ext}],
    Misc = [start_epmd_false, epmd_module, system_limit],
    lists:append([Basic, BusyAndLinks, Roundtrips, Connect, Atoms,
                  Robustness, Misc]).
%% Common Test callback: the test case groups referenced from all/0.
%% Every group is declared with an empty property list.
groups() ->
    [{Group, [], Cases} ||
        {Group, Cases} <-
            [{bulk_send,
              [bulk_send_small, bulk_send_big, bulk_send_bigbig]},
             {local_send,
              [local_send_small, local_send_big, local_send_legal]},
             {trap_bif,
              [trap_bif_1, trap_bif_2, trap_bif_3]},
             {dist_auto_connect,
              [dist_auto_connect_never, dist_auto_connect_once]},
             {bad_dist,
              [bad_dist_structure, bad_dist_fragments]},
             {bad_dist_ext,
              [bad_dist_ext_receive, bad_dist_ext_process_info,
               bad_dist_ext_size,
               bad_dist_ext_control, bad_dist_ext_connection_id]},
             {message_latency,
              [message_latency_large_message,
               message_latency_large_link_exit,
               message_latency_large_monitor_exit,
               message_latency_large_exit2]}]].
%% Common Test callback: starts os_mon together with its dependencies and
%% records the started applications under `started_apps' in the config so
%% that end_per_suite/1 can stop them again.
init_per_suite(Config) ->
    {ok, Started} = application:ensure_all_started(os_mon),
    [{started_apps, Started} | Config].
%% Common Test callback: stops the applications recorded under
%% `started_apps' in the config, in reverse order of the recorded list,
%% and returns the config unchanged.
end_per_suite(Config) ->
    Started = proplists:get_value(started_apps, Config),
    lists:foreach(fun(App) -> application:stop(App) end,
                  lists:reverse(Started)),
    Config.
%% Common Test callback. The message_latency group is skipped when
%% free_memory() reports less than 2048 (presumably megabytes -- confirm
%% against free_memory/0); every other group runs with the config as given.
init_per_group(message_latency, Config) ->
    case free_memory() < 2048 of
        true ->
            {skip, "Not enough memory"};
        false ->
            Config
    end;
init_per_group(_Group, Config) ->
    Config.
%% Common Test callback: no group needs any teardown, so the config is
%% returned untouched.
end_per_group(_Group, Config) ->
    Config.
%% Tests pinging a node in different ways: a node that does not exist,
%% a freshly started peer node, and the local node itself. Each target is
%% pinged 1024 times and every single ping must give the expected answer.
ping(Config) when is_list(Config) ->
    Rounds = 1024,
    PingMany = fun(Target, Expected) ->
                       test_server:do_times(
                         Rounds,
                         fun() -> Expected = net_adm:ping(Target) end)
               end,

    %% Ping a non-existing node many times. This used to crash the emulator
    %% on Windows.
    Missing = list_to_atom("__pucko__@" ++ hostname()),
    io:format("Pinging ~s (assumed to not exist)", [Missing]),
    PingMany(Missing, pang),

    %% Pings another node.
    {ok, Peer} = start_node(distribution_SUITE_other),
    io:format("Pinging ~s (assumed to exist)", [Peer]),
    PingMany(Peer, pong),
    stop_node(Peer),

    %% Pings our own node many times.
    Local = node(),
    io:format("Pinging ~s (the same node)", [Local]),
    PingMany(Local, pong),
    ok.
%% Test erlang:group_leader(_, ExternalPid), i.e. DOP_GROUP_LEADER
%% Starts two relay nodes and asks the first one (through do_inet_rpc/4) to
%% run group_leader_1/1 with the node name of the second one; the call must
%% answer {ok, ok}. Both relay nodes are stopped in the `after' section, so
%% they are shut down whether or not the check succeeded.
group_leader(Config) when is_list(Config) ->
?Line Sock = start_relay_node(group_leader_1, []),
?Line Sock2 = start_relay_node(group_leader_2, []),
try
?Line Node2 = inet_rpc_nodename(Sock2),
?Line {ok, ok} = do_inet_rpc(Sock, ?MODULE, group_leader_1, [Node2])
after
?Line stop_relay_node(Sock),
?Line stop_relay_node(Sock2)
end,
ok.
%% Body of the group_leader test case; exported so that it can be invoked
%% on a relay node. Spawns a process on Node2 that answers every
%% {From, group_leader} request with its current group leader, then checks
%% that group_leader/2 with that external pid takes effect, both over an
%% established connection and after the connection has been torn down.
%% Each answer is awaited with receive_one/0, so a missing message shows up
%% as a badmatch against `timeout'.
group_leader_1(Node2) ->
?Line ExtPid = spawn(Node2, fun F() ->
receive {From, group_leader} ->
From ! {self(), group_leader, group_leader()}
end,
F()
end),
%% First round: make this process the group leader of the remote process.
?Line GL1 = self(),
?Line group_leader(GL1, ExtPid),
?Line ExtPid ! {self(), group_leader},
?Line {ExtPid, group_leader, GL1} = receive_one(),
%% Kill connection and repeat test when group_leader/2 triggers auto-connect
?Line net_kernel:monitor_nodes(true),
?Line net_kernel:disconnect(Node2),
?Line {nodedown, Node2} = receive_one(),
%% GL2 only serves as a distinct pid value; its fun returns immediately.
?Line GL2 = spawn(fun() -> dummy end),
?Line group_leader(GL2, ExtPid),
?Line {nodeup, Node2} = receive_one(),
?Line ExtPid ! {self(), group_leader},
?Line {ExtPid, group_leader, GL2} = receive_one(),
ok.
%% Test optimistic distribution flags toward pending connections (DFLAG_DIST_HOPEFULLY)
%% Starts a "sender" and an "echo" relay node. The echo process is set up on
%% the echo node first, then optimistic_dflags_sender/1 is run on the sender
%% node with the echo node's name. Both relay nodes are always stopped in
%% the `after' section.
optimistic_dflags(Config) when is_list(Config) ->
?Line Sender = start_relay_node(optimistic_dflags_sender, []),
?Line Echo = start_relay_node(optimistic_dflags_echo, []),
try
?Line {ok, ok} = do_inet_rpc(Echo, ?MODULE, optimistic_dflags_echo, []),
?Line EchoNode = inet_rpc_nodename(Echo),
?Line {ok, ok} = do_inet_rpc(Sender, ?MODULE, optimistic_dflags_sender, [EchoNode])
after
?Line stop_relay_node(Sender),
?Line stop_relay_node(Echo)
end,
ok.
%% Spawns a process that answers every {From, Term} message with
%% {self(), Term}, registers it as `optimistic_dflags_echo', and checks with
%% a local `hello' round trip that it works before returning ok.
optimistic_dflags_echo() ->
P = spawn(fun F() ->
receive {From, Term} ->
From ! {self(), Term}
end,
F()
end),
register(optimistic_dflags_echo, P),
%% Sanity check through the registered name.
optimistic_dflags_echo ! {self(), hello},
{P, hello} = receive_one(),
ok.
%% Subscribes to nodeup/nodedown messages and runs optimistic_dflags_do/2
%% against EchoNode twice: once with the one-bit bitstring <<1:1>> and once
%% with the external fun `fun lists:map/2'.
optimistic_dflags_sender(EchoNode) ->
?Line net_kernel:monitor_nodes(true),
optimistic_dflags_do(EchoNode, <<1:1>>),
optimistic_dflags_do(EchoNode, fun lists:map/2),
ok.
%% Sends Term to the echo process on EchoNode while no connection exists,
%% first addressed by registered name and then by pid, and expects in both
%% cases a nodeup message followed by the echoed term. The connection is
%% dropped after each round so that the next send has to set it up again.
%% The caller must already have called net_kernel:monitor_nodes(true).
optimistic_dflags_do(EchoNode, Term) ->
?Line {optimistic_dflags_echo, EchoNode} ! {self(), Term},
?Line {nodeup, EchoNode} = receive_one(),
?Line {EchoPid, Term} = receive_one(),
%% repeat with pid destination
?Line net_kernel:disconnect(EchoNode),
?Line {nodedown, EchoNode} = receive_one(),
?Line EchoPid ! {self(), Term},
?Line {nodeup, EchoNode} = receive_one(),
?Line {EchoPid, Term} = receive_one(),
%% Leave the nodes disconnected for whatever runs next.
?Line net_kernel:disconnect(EchoNode),
?Line {nodedown, EchoNode} = receive_one(),
ok.
%% Returns the next message in the caller's mailbox, or the atom `timeout'
%% when nothing has arrived within one second.
receive_one() ->
    receive
        Msg ->
            Msg
    after 1000 ->
            timeout
    end.
%% Bulk send test with 64 binaries of 32 K each.
bulk_send_small(Config) when is_list(Config) ->
    Terms = 64,
    KBytesPerTerm = 32,
    bulk_send(Terms, KBytesPerTerm).
%% Bulk send test with 32 binaries of 64 K each.
bulk_send_big(Config) when is_list(Config) ->
    Terms = 32,
    KBytesPerTerm = 64,
    bulk_send(Terms, KBytesPerTerm).
%% Sends Terms binaries of BinSize kilobytes each to a receiver/2 process on
%% a freshly started node, and checks that the receiver reports exactly that
%% number of terms and bytes. The return value is a {comment, _} tuple with
%% the throughput; the elapsed time is clamped to at least 1 before the
%% division.
bulk_send(Terms, BinSize) ->
    ct:timetrap({seconds, 30}),

    io:format("Sending ~w binaries, each of size ~w K", [Terms, BinSize]),
    {ok, Peer} = start_node(bulk_receiver),
    Receiver = spawn(Peer, erlang, apply, [fun receiver/2, [0, 0]]),
    Payload = binary:copy(<<253>>, BinSize*1024),
    TotalBytes = Terms * byte_size(Payload),
    %% The match also asserts what the receiver counted.
    {Elapsed, {Terms, TotalBytes}} =
        test_server:timecall(?MODULE, sender, [Receiver, Payload, Terms]),
    stop_node(Peer),
    KBytes = TotalBytes / 1024,
    Rate = round(KBytes / max(1, Elapsed)),
    {comment, integer_to_list(Rate) ++ " K/s"}.
%% Sends {term, Bin} to To Left times, then {done, self()}, and finally
%% returns the first message that arrives in the caller's mailbox.
sender(To, Bin, Left) ->
    case Left of
        0 ->
            To ! {done, self()},
            receive
                Reply ->
                    Reply
            end;
        _ ->
            To ! {term, Bin},
            sender(To, Bin, Left - 1)
    end.
%% Runs bulk_sendsend2/3 twice with 160 binaries of 4 K, once with busy
%% limit argument 5 and once with 995, and reports both transfer rates, both
%% busy_dist_port monitor message counts, and the ratio between the counts
%% as the test case comment. Nothing is asserted about the numbers.
bulk_send_bigbig(Config) when is_list(Config) ->
    Terms = 32*5,
    BinSize = 4,
    {SmallBufRate, SmallBufMonitors} = bulk_sendsend2(Terms, BinSize, 5),
    {LargeBufRate, LargeBufMonitors} = bulk_sendsend2(Terms, BinSize, 995),
    %% Avoid dividing by zero when the second run saw no monitor messages.
    Divisor = case LargeBufMonitors == 0 of
                  true -> 1.0;
                  false -> LargeBufMonitors
              end,
    Ratio = SmallBufMonitors / Divisor,
    Text = lists:flatten(
             io_lib:format("~p K/s, ~p K/s, "
                           "~p monitor msgs, ~p monitor msgs, "
                           "~.1f monitor ratio",
                           [SmallBufRate, LargeBufRate, SmallBufMonitors,
                            LargeBufMonitors, Ratio])),
    {comment, Text}.
%% Starts a receiver node and a sender node (the latter with the emulator
%% flag "+zdbbl BusyBufSize"), lets sendersender/4 on the sender node push
%% binaries of BinSize kilobytes to a receiver/2 process on the receiver
%% node, and waits for the sender's report.
%% Returns {Rate, MonitorCount}: the transfer rate in K/s computed from the
%% byte count reported by the receiver, and the number of busy_dist_port
%% monitor messages counted by the sender.
bulk_sendsend2(Terms, BinSize, BusyBufSize) ->
ct:timetrap({seconds, 30}),
io:format("\nSending ~w binaries, each of size ~w K",
[Terms, BinSize]),
{ok, NodeRecv} = start_node(bulk_receiver),
Recv = spawn(NodeRecv, erlang, apply, [fun receiver/2, [0, 0]]),
Bin = binary:copy(<<253>>, BinSize*1024),
%% SLF LEFT OFF HERE.
%% When the caller uses small hunks, like 4k via
%% bulk_sendsend(32*5, 4), then (on my laptop at least), we get
%% zero monitor messages. But if we use "+zdbbl 5", then we
%% get a lot of monitor messages. So, if we can count up the
%% total number of monitor messages that we get when running both
%% default busy size and "+zdbbl 5", and if the 5 case gets
%% "many many more" monitor messages, then we know we're working.
{ok, NodeSend} = start_node(bulk_sender, "+zdbbl " ++
integer_to_list(BusyBufSize)),
_Send = spawn(NodeSend, erlang, apply,
[fun sendersender/4, [self(), Recv, Bin, Terms]]),
{Elapsed, {_TermsN, SizeN}, MonitorCount} =
receive
%% On some platforms (Windows), the time taken is 0 so we
%% simulate that some little time has passed.
{sendersender, {0.0,T,MC}} ->
{0.0015, T, MC};
{sendersender, BigRes} ->
BigRes
end,
stop_node(NodeRecv),
stop_node(NodeSend),
{round(SizeN/1024/Elapsed), MonitorCount}.
%% Sender process to be run on a slave node
%% Registers itself as system monitor for busy_dist_port events, then sends
%% Left copies of Bin to To from two processes at once: a spawned helper and
%% this process. This process alone sends the final {done, self()} request
%% and times the whole operation with timer:tc/1.
%% Reports {sendersender, {Seconds, Reply, MonitorCount}} to Parent, where
%% Reply is whatever count_monitors/1 picked up as the answer.
sendersender(Parent, To, Bin, Left) ->
erlang:system_monitor(self(), [busy_dist_port]),
_ = spawn(fun() ->
sendersender_send(To, Bin, Left),
exit(normal)
end),
{USec, {Res, MonitorCount}} =
timer:tc(fun() ->
sendersender_send(To, Bin, Left),
To ! {done, self()},
count_monitors(0)
end),
%% timer:tc reports microseconds; the parent expects seconds.
Parent ! {sendersender, {USec/1000000, Res, MonitorCount}}.
%% Sends {term, Bin} to To exactly Left times and returns ok.
sendersender_send(To, Bin, Left) when Left =/= 0 ->
    To ! {term, Bin},
    sendersender_send(To, Bin, Left - 1);
sendersender_send(_To, _Bin, 0) ->
    ok.
%% Counts the {monitor, _, _, _} messages that are already in the mailbox,
%% starting from MonitorCount. Once none is left (`after 0'), blocks until
%% some two-element tuple arrives and returns {ThatTuple, Count}.
%% Monitor messages that arrive while waiting for the two-tuple are not
%% counted; being four-tuples they do not match and stay in the mailbox.
count_monitors(MonitorCount) ->
receive
{monitor, _Pid, _Type, _Info} ->
count_monitors(MonitorCount + 1)
after 0 ->
receive
{_,_}=Any ->
{Any,MonitorCount}
end
end.
%% Receiver process to be run on a slave node.
%% Accumulates the number of {term, Bin} messages and the total number of
%% bytes they carried, until {done, ReplyTo} arrives; then the totals are
%% sent to ReplyTo as {Terms, Size} and the function returns.
receiver(Terms, Size) ->
    receive
        {done, ReplyTo} ->
            ReplyTo ! {Terms, Size};
        {term, Payload} ->
            receiver(Terms + 1, Size + byte_size(Payload))
    end.
%% Sends a big message 4096 times to a name that is not registered on the
%% local node, addressed as {Name, node()}. Every send must return the
%% message itself.
local_send_big(Config) when is_list(Config) ->
    Text = ["Tests sending small and big messages to a non-existing ",
            "local registered process."],
    Nested = [Text, [Text, Text, [Text], Text], Text],
    Flat = lists:flatten(Nested),
    %% The binary ends up as the tail of the list, i.e. the message is an
    %% improper list.
    Msg = Text ++ (Flat ++ list_to_binary(Flat)),
    SendOnce = fun() ->
                       Dest = {arbitrary_name, node()},
                       Msg = Dest ! Msg
               end,
    test_server:do_times(4096, SendOnce),
    ok.
%% Sends a small message 4096 times to a name that is not registered on the
%% local node, addressed as {Name, node()}. Every send must return the
%% message itself.
local_send_small(Config) when is_list(Config) ->
    Msg = {some_stupid, "arbitrary", 'Data'},
    SendOnce = fun() ->
                       Dest = {unregistered_name, node()},
                       Msg = Dest ! Msg
               end,
    test_server:do_times(4096, SendOnce),
    ok.
%% Sends data to a registered process on the local node, as if it was on
%% another node, i.e. addressed as {Name, node()}. A receiver2/2 process
%% counts the messages and their sizes; afterwards the counts must equal
%% the number of sends and that number times the tuple size of the message.
local_send_legal(Config) when is_list(Config) ->
    Rounds = 16384,
    Text = "Some Not so random Data",
    Msg = {[Text,Text,Text], [Text,Text,Text]},
    Counter = spawn(?MODULE, receiver2, [0, 0]),
    true = register(registered_process, Counter),
    Dest = {registered_process, node()},
    ExpectedSize = tuple_size(Msg) * Rounds,
    test_server:do_times(Rounds, fun() -> Msg = Dest ! Msg end),
    %% Check that all msgs really came through.
    Me = self(),
    {done, Me} = Dest ! {done, Me},
    receive
        {Rounds, ExpectedSize} ->
            ok;
        _ ->
            ct:fail("Wrong number of msgs received.")
    end,
    ok.
%% Counts incoming messages and sums up their size/1 until {done, ReplyTo}
%% arrives; then {Num, TotSize} is sent to ReplyTo and the function returns.
%% Every message other than the `done' request must be a tuple or a binary,
%% since size/1 is applied to it.
receiver2(Num, TotSize) ->
    receive
        Msg ->
            case Msg of
                {done, ReplyTo} ->
                    ReplyTo ! {Num, TotSize};
                _ ->
                    receiver2(Num + 1, TotSize + size(Msg))
            end
    end.
%% Test that link/1 to a busy distribution port works.
%% Runs do_busy_test/2 three times against a sink process on a fresh node,
%% with link/1 called directly, through apply/3, and through apply/3 in tail
%% position. Busy-port tracing is switched on when the OS environment
%% variable TRACE_BUSY_DIST_PORT is "true".
link_to_busy(Config) when is_list(Config) ->
ct:timetrap({seconds, 60}),
{ok, Node} = start_node(link_to_busy),
Recv = spawn(Node, erlang, apply, [fun sink/1, [link_to_busy_sink]]),
Tracer = case os:getenv("TRACE_BUSY_DIST_PORT") of
"true" -> start_busy_dist_port_tracer();
_ -> false
end,
%% We will spawn off a process which will try to link to the other
%% node. The linker process will not actually run until this
%% process is suspended due to the busy distribution port (because
%% of the big send). When the link/1 is run, the linker
%% process will block, too, because of the busy port,
%% and will later be restarted.
do_busy_test(Node, fun () -> linker(Recv) end),
%% Same thing, but we apply link/1 instead of calling it directly.
do_busy_test(Node, fun () -> applied_linker(Recv) end),
%% Same thing again, but we apply link/1 in the tail of a function.
do_busy_test(Node, fun () -> tail_applied_linker(Recv) end),
%% Done.
stop_node(Node),
stop_busy_dist_port_tracer(Tracer),
ok.
%% Links to Pid with a direct call to link/1 and verifies that Pid then
%% shows up among the calling process' links.
linker(Pid) ->
true = link(Pid),
{links, Links} = process_info(self(), links),
true = lists:member(Pid, Links).
%% Same as linker/1, but link/1 is invoked through apply/3 rather than
%% called directly; the indirection is the point of the test.
applied_linker(Pid) ->
true = apply(erlang, link, [Pid]),
{links, Links} = process_info(self(), links),
true = lists:member(Pid, Links).
%% Invokes link/1 through apply/3 as the last expression of the function
%% (tail position). No follow-up check is made, so that the apply stays
%% the tail call.
tail_applied_linker(Pid) ->
apply(erlang, link, [Pid]).
%% Test that exit/2 to a busy distribution port works.
%% For each of four ways of invoking exit/2 (direct call, direct tail call,
%% apply/3, apply/3 in tail position) a monitored sink process is started on
%% a fresh node, do_busy_test/2 is run with a fun that kills it, and the
%% resulting 'DOWN' message must carry the reason joey_said_die.
%% Busy-port tracing is switched on when the OS environment variable
%% TRACE_BUSY_DIST_PORT is "true".
exit_to_busy(Config) when is_list(Config) ->
ct:timetrap({seconds, 60}),
{ok, Node} = start_node(exit_to_busy),
Tracer = case os:getenv("TRACE_BUSY_DIST_PORT") of
"true" -> start_busy_dist_port_tracer();
_ -> false
end,
%% We will spawn off a process which will try to exit a process on
%% the other node. That process will not actually run until this
%% process is suspended due to the busy distribution port
%% The process executing exit/2 will block,
%% too, because of the busy distribution port, and will be allowed
%% to continue when the port becomes non-busy.
Recv1 = spawn(Node, fun () -> sink(exit_to_busy_sink) end),
M1 = erlang:monitor(process, Recv1),
do_busy_test(Node, fun () -> joey_killer(Recv1) end),
receive
{'DOWN', M1, process, Recv1, R1} ->
joey_said_die = R1
end,
%% Same thing, but tail call to exit/2.
Recv2 = spawn(Node, fun () -> sink(exit_to_busy_sink) end),
M2 = erlang:monitor(process, Recv2),
do_busy_test(Node, fun () -> tail_joey_killer(Recv2) end),
receive
{'DOWN', M2, process, Recv2, R2} ->
joey_said_die = R2
end,
%% Same thing, but we apply exit/2 instead of calling it directly.
Recv3 = spawn(Node, fun () -> sink(exit_to_busy_sink) end),
M3 = erlang:monitor(process, Recv3),
do_busy_test(Node, fun () -> applied_joey_killer(Recv3) end),
receive
{'DOWN', M3, process, Recv3, R3} ->
joey_said_die = R3
end,
%% Same thing again, but we apply exit/2 in the tail of a function.
Recv4 = spawn(Node, fun () -> sink(exit_to_busy_sink) end),
M4 = erlang:monitor(process, Recv4),
do_busy_test(Node, fun () -> tail_applied_joey_killer(Recv4) end),
receive
{'DOWN', M4, process, Recv4, R4} ->
joey_said_die = R4
end,
%% Done.
stop_node(Node),
stop_busy_dist_port_tracer(Tracer),
ok.
%% Returns the 1 MB binary (every byte 253) used to fill up a distribution
%% channel. The binary is created on first use and cached in the process
%% dictionary under '__busy__port__data__', so each process builds it at
%% most once; a cached value is sanity-checked before it is returned.
%%
%% The binary is produced with binary:copy/2 instead of going through an
%% intermediate list of a million integers.
make_busy_data() ->
    Size = 1024*1024,
    Key = '__busy__port__data__',
    case get(Key) of
        undefined ->
            Data = binary:copy(<<253>>, Size),
            put(Key, Data),
            Data;
        Data ->
            true = is_binary(Data),
            true = byte_size(Data) =:= Size,
            Data
    end.
%% Makes the distribution channel to Node busy and returns the pid of the
%% linked helper process that keeps sending on it; pass that pid to
%% unmake_busy/1 to stop it.
%% Node is first frozen with freeze_node/2 for Time plus 500 ms
%% (presumably milliseconds during which the node stops reading from the
%% connection -- confirm against freeze_node/2). A helper process then
%% sends the 1 MB busy data in a loop until it is seen suspended (when
%% dctrl/1 returned a port) or waiting (when it returned a pid). Finally
%% the dist entry itself is made busy with nosuspend sends from the
%% calling process.
make_busy(Node, Time) when is_integer(Time) ->
Own = 500,
freeze_node(Node, Time+Own),
Data = make_busy_data(),
DCtrl = dctrl(Node),
%% first make port busy
Pid = spawn_link(fun () ->
forever(fun () ->
dctrl_dop_reg_send(Node,
'__noone__',
Data)
end)
end),
%% Give the helper Own milliseconds before polling its status.
receive after Own -> ok end,
until(fun () ->
case {DCtrl, process_info(Pid, status)} of
{DPrt, {status, suspended}} when is_port(DPrt) -> true;
{DPid, {status, waiting}} when is_pid(DPid) -> true;
_ -> false
end
end),
%% then dist entry
make_busy(Node, [nosuspend], Data),
Pid.
%% Keeps sending Data to the name '__noone__' on Node with the send options
%% Opts until erlang:send/3 answers `nosuspend', which is then returned.
make_busy(Node, Opts, Data) ->
    Outcome = erlang:send({'__noone__', Node}, Data, Opts),
    if
        Outcome =:= nosuspend ->
            nosuspend;
        true ->
            make_busy(Node, Opts, Data)
    end.
%% Stops the helper process returned by make_busy/2: it is unlinked first
%% so that its death does not propagate to the caller, and is then sent an
%% exit signal with reason `bang'.
unmake_busy(BusyPid) ->
    unlink(BusyPid),
    exit(BusyPid, bang).
%% Runs Fun in a monitored process while the distribution channel to Node is
%% busy, and verifies that the process is blocked by the busy channel.
%% 100 ms after the spawn the process must be `suspended' with
%% erlang:bif_return_trap or erts_internal:dsend_continue_trap as current
%% function. The channel is then released, and the process must terminate
%% with reason `normal'. A process that is already gone when sampled fails
%% the test with premature_death.
do_busy_test(Node, Fun) ->
Busy = make_busy(Node, 1000),
{P, M} = spawn_monitor(Fun),
receive after 100 -> ok end,
%% Sample the state before releasing the channel.
Pinfo = process_info(P, [status, current_function]),
unmake_busy(Busy),
io:format("~p : ~p~n", [P, Pinfo]),
case Pinfo of
undefined ->
receive
{'DOWN', M, process, P, Reason} ->
io:format("~p died with exit reason ~p~n", [P, Reason])
end,
ct:fail(premature_death);
_ ->
%% Don't match arity; it is different in debug and
%% optimized emulator
[{status, suspended},
{current_function, {Mod, Func, _}}] = Pinfo,
if
Mod =:= erlang andalso Func =:= bif_return_trap ->
true;
Mod =:= erts_internal andalso Func =:= dsend_continue_trap ->
true;
true ->
ct:fail({incorrect, pinfo, Pinfo})
end,
receive
{'DOWN', M, process, P, Reason} ->
io:format("~p died with exit reason ~p~n", [P, Reason]),
verify_nc(node()),
verify_nc(Node),
normal = Reason
end
end.
%% Asks the node that Pid belongs to, through rpc:call/4, whether the
%% process is alive, and returns the answer of that call.
remote_is_process_alive(Pid) ->
    Home = node(Pid),
    rpc:call(Home, erlang, is_process_alive, [Pid]).
%% Sends an exit signal with reason joey_said_die to Pid with a direct call
%% to exit/2, then polls with until/1 until the remote process is reported
%% dead.
joey_killer(Pid) ->
exit(Pid, joey_said_die),
until(fun () -> false == remote_is_process_alive(Pid) end).
%% Sends an exit signal with reason joey_said_die to Pid, with exit/2 as
%% the last expression of the function (tail call). No follow-up check is
%% made, so that the call stays in tail position.
tail_joey_killer(Pid) ->
exit(Pid, joey_said_die).
%% Same as joey_killer/1, but exit/2 is invoked through apply/3 rather than
%% called directly; the indirection is the point of the test.
applied_joey_killer(Pid) ->
apply(erlang, exit, [Pid, joey_said_die]),
until(fun () -> false == remote_is_process_alive(Pid) end).
%% Invokes exit/2 through apply/3 as the last expression of the function
%% (tail position), sending joey_said_die to Pid.
tail_applied_joey_killer(Pid) ->
apply(erlang, exit, [Pid, joey_said_die]).
%% Registers the calling process as Name and then discards every message it
%% receives, forever.
sink(Name) ->
    true = register(Name, self()),
    sink1().
%% Message black hole: receives and drops messages one at a time and never
%% returns.
sink1() ->
    receive
        _Dropped ->
            ok
    end,
    sink1().
%% Test that EXIT and DOWN messages sent to another node are not lost if
%% the distribution port is busy.
%% Set-up: R1 is a monitored process that waits forever; L1 links to R1 and
%% exits with controlled_suicide when it receives the reference Die; R2
%% monitors L1 and reports the 'DOWN' reason back to this process.
%% With the channel to Node made busy, L1 is told to die. After the channel
%% has been released, both R1 (through the link) and R2 (through its
%% monitor) must have seen controlled_suicide.
%% NOTE(review): R1, L1 and R2 are all spawned on the local node here;
%% confirm that this is intended given the purpose stated above.
lost_exit(Config) when is_list(Config) ->
{ok, Node} = start_node(lost_exit),
Tracer = case os:getenv("TRACE_BUSY_DIST_PORT") of
"true" -> start_busy_dist_port_tracer();
_ -> false
end,
Self = self(),
Die = make_ref(),
R1 = spawn(fun () -> receive after infinity -> ok end end),
MR1 = erlang:monitor(process, R1),
{L1, ML1} = spawn_monitor(fun() ->
link(R1),
Self ! {self(), linked},
receive
Die ->
exit(controlled_suicide)
end
end),
R2 = spawn(fun () ->
M = erlang:monitor(process, L1),
receive
{'DOWN', M, process, L1, R} ->
Self ! {self(), got_down_message, L1, R}
end
end),
%% Do not go on until the link between L1 and R1 is in place.
receive {L1, linked} -> ok end,
Busy = make_busy(Node, 2000),
receive after 100 -> ok end,
L1 ! Die,
receive
{'DOWN', ML1, process, L1, RL1} ->
controlled_suicide = RL1
end,
receive after 500 -> ok end,
unmake_busy(Busy),
receive
{'DOWN', MR1, process, R1, RR1} ->
controlled_suicide = RR1
end,
receive
{R2, got_down_message, L1, RR2} ->
controlled_suicide = RR2
end,
%% Done.
stop_busy_dist_port_tracer(Tracer),
stop_node(Node),
ok.
%% Blocks the calling process forever without consuming any messages.
dummy_waiter() ->
    timer:sleep(infinity).
%% Test that linking to a dead remote process gives an EXIT message
%% AND that the link is torn down.
%% A process that crashes immediately is spawned on a fresh node. Five
%% seconds later this (exit-trapping) process links to it and must receive
%% {'EXIT', Pid, noproc} within another five seconds, with Pid absent from
%% its link list afterwards. Finally no further message may arrive within
%% three seconds after the node has been stopped.
link_to_dead(Config) when is_list(Config) ->
process_flag(trap_exit, true),
{ok, Node} = start_node(link_to_dead),
% monitor_node(Node, true),
net_adm:ping(Node), %% Ts_cross_server workaround.
Pid = spawn(Node, ?MODULE, dead_process, []),
%% Plain five second pause before linking.
receive
after 5000 -> ok
end,
link(Pid),
receive
{'EXIT', Pid, noproc} ->
ok;
Other ->
ct:fail({unexpected_message, Other})
after 5000 ->
ct:fail(nothing_received)
end,
{links, Links} = process_info(self(), links),
io:format("Pid=~p, links=~p", [Pid, Links]),
false = lists:member(Pid, Links),
stop_node(Node),
%% Stopping the node must not produce any late message.
receive
Message ->
ct:fail({unexpected_message, Message})
after 3000 ->
ok
end,
ok.
%% Entry point for processes that are supposed to die at once: raises an
%% error exception with reason `die'.
dead_process() ->
    error(die).
%% Test that linking to a pid on a node that has gone and restarted gives
%% the correct EXIT message (OTP-2304).
%% A pid is obtained from a node which is then stopped and started again
%% under the same name. Linking to the stale pid must give
%% {'EXIT', Pid, noproc} within five seconds and must not leave a link
%% behind. Finally no further message may arrive within three seconds after
%% the node has been stopped.
link_to_dead_new_node(Config) when is_list(Config) ->
process_flag(trap_exit, true),
%% Start the node, get a Pid and stop the node again.
{ok, Node} = start_node(link_to_dead_new_node),
Pid = spawn(Node, ?MODULE, dead_process, []),
stop_node(Node),
%% Start a new node with the same name.
{ok, Node} = start_node(link_to_dead_new_node),
link(Pid),
receive
{'EXIT', Pid, noproc} ->
ok;
Other ->
%% Only this failure path stops the node before failing; the timeout
%% path below does not.
stop_node(Node),
ct:fail({unexpected_message, Other})
after 5000 ->
ct:fail(nothing_received)
end,
%% Make sure that the link wasn't created.
{links, Links} = process_info(self(), links),
io:format("Pid=~p, links=~p", [Pid, Links]),
false = lists:member(Pid, Links),
stop_node(Node),
receive
Message ->
ct:fail({unexpected_message, Message})
after 3000 ->
ok
end,
ok.
%% Test that sending a port or reference to another node and back again
%% doesn't change them in any way.
%% A local port and a fresh reference are passed as argument to a linked
%% process on another node, which exits with that term as its reason. The
%% {'EXIT', Pid, Reason} message received here must carry a term that
%% matches the original {Port, Ref}.
ref_port_roundtrip(Config) when is_list(Config) ->
process_flag(trap_exit, true),
Port = make_port(),
Ref = make_ref(),
{ok, Node} = start_node(ref_port_roundtrip),
net_adm:ping(Node),
Term = {Port, Ref},
io:format("Term before: ~p", [show_term(Term)]),
Pid = spawn_link(Node, ?MODULE, roundtrip, [Term]),
receive after 5000 -> ok end,
stop_node(Node),
receive
{'EXIT', Pid, {Port, Ref}} ->
io:format("Term after: ~p", [show_term(Term)]),
ok;
Other ->
io:format("Term after: ~p", [show_term(Term)]),
ct:fail({unexpected, Other})
after 10000 ->
ct:fail(timeout)
end,
ok.
%% Returns some existing port: the first element of erlang:ports().
make_port() ->
    Ports = erlang:ports(),
    hd(Ports).
%% Terminates the calling process with Term as exit reason, which hands
%% the term back to whoever is linked to the process.
roundtrip(Term) ->
    erlang:exit(Term).
%% Test that the smallest external term [] aka NIL can be sent to
%% another node and back again.
%% [] is sent to a linked bounce/1 process on another node. It must come
%% back both as a plain message and as the exit reason of that process.
%% Neither receive has a timeout of its own.
nil_roundtrip(Config) when is_list(Config) ->
process_flag(trap_exit, true),
{ok, Node} = start_node(nil_roundtrip),
net_adm:ping(Node),
Pid = spawn_link(Node, ?MODULE, bounce, [self()]),
Pid ! [],
receive
[] ->
receive
{'EXIT', Pid, []} ->
stop_node(Node),
ok
end
end.
%% Waits for one message, forwards it to Dest, and then terminates with the
%% same message as exit reason.
bounce(Dest) ->
    Msg = receive
              Any -> Any
          end,
    Dest ! Msg,
    exit(Msg).
%% Returns the external term format encoding of Term as a list of bytes,
%% suitable for printing.
show_term(Term) ->
    Encoded = term_to_binary(Term),
    binary_to_list(Encoded).
%% Tests behaviour after net_kernel:stop (OTP-2586).
%% Starts a separate emulator with the suite's data directory in the code
%% path and "-s run", i.e. the function run:start/0 found there does the
%% actual work. The test passes when the output of that emulator begins
%% with "true\n".
stop_dist(Config) when is_list(Config) ->
Str = os:cmd(ct:get_progname()
++ " -noshell -pa "
++ proplists:get_value(data_dir, Config)
++ " -s run"),
%% The "true" may be followed by an error report, so ignore anything that
%% follows it.
"true\n"++_ = Str,
%% "May fail on FreeBSD due to differently configured name lookup - ask Arndt",
%% if you can find him.
ok.
%% monitor_node/2 called directly on a non-existing node must return true.
trap_bif_1(Config) when is_list(Config) ->
    Result = tr1(),
    {true} = Result,
    ok.
%% monitor_node/2 called through apply/3 on a non-existing node must
%% return true.
trap_bif_2(Config) when is_list(Config) ->
    Result = tr2(),
    {true} = Result,
    ok.
%% Sending to a registered name on a non-existing node must return the
%% message that was sent.
trap_bif_3(Config) when is_list(Config) ->
    Result = tr3(),
    {hoo} = Result,
    ok.
%% Calls erlang:monitor_node/2 directly for the node name 'abc@boromir'
%% (assumed not to exist) and returns the result wrapped in a one-element
%% tuple, which keeps the BIF call out of tail position.
tr1() ->
NonExisting = 'abc@boromir',
X = erlang:monitor_node(NonExisting, true),
{X}.
%% Same as tr1/0, but erlang:monitor_node/2 is invoked through apply/3
%% rather than called directly.
tr2() ->
NonExisting = 'abc@boromir',
X = apply(erlang, monitor_node, [NonExisting, true]),
{X}.
%% Sends `hoo' to the name `glirp' on the node 'abc@boromir' (assumed not
%% to exist) and returns the value of the send expression wrapped in a
%% one-element tuple.
tr3() ->
NonExisting = 'abc@boromir',
X = {NonExisting, glirp} ! hoo,
{X}.
% This has to be done by nodes with different cookies, otherwise global
% will connect nodes, which is correct, but makes it hard to test.
% * Start two nodes, n1 and n2. n2 with the dist_auto_connect once parameter
% * n2 pings n1 -> connection
% * check that they now know each other
% * Kill n1
% * Make sure n2 gets pang when pinging n1
% * restart n1
% * Make sure n2 *still gets pang*!
% * Ping n2 from n1 -> pong
% * n2 now also gets pong when pinging n1
% * disconnect n2 from n1
% * n2 gets pang when pinging n1
% * n2 forces connection by using net_kernel:connect_node (overrides)
% * n2 gets pong when pinging n1.
%% Test the dist_auto_connect once kernel parameter
%% Uses two relay nodes controlled through do_inet_rpc/4: a plain one (NN)
%% and one started with "-kernel dist_auto_connect once" (NN2). After a
%% first successful connection the link between them is broken, and the
%% test verifies that NN2 no longer connects on its own (ping gives pang,
%% and quickly), that monitor_node/3 with allow_passive_connect waits for
%% roughly the connect time, that a ping from the NN side restores the
%% connection, and that net_kernel:connect_node/1 on NN2 forces one.
dist_auto_connect_once(Config) when is_list(Config) ->
Sock = start_relay_node(dist_auto_connect_relay_node,[]),
NN = inet_rpc_nodename(Sock),
Sock2 = start_relay_node(dist_auto_connect_once_node,
"-kernel dist_auto_connect once"),
NN2 = inet_rpc_nodename(Sock2),
{ok,[]} = do_inet_rpc(Sock,erlang,nodes,[]),
{ok, pong} = do_inet_rpc(Sock2,net_adm,ping,[NN]),
{ok,[NN2]} = do_inet_rpc(Sock,erlang,nodes,[]),
{ok,[NN]} = do_inet_rpc(Sock2,erlang,nodes,[]),
[_,HostPartPeer] = string:lexemes(atom_to_list(NN),"@"),
[_,MyHostPart] = string:lexemes(atom_to_list(node()),"@"),
% Give net_kernel a chance to change the state of the node to up to.
receive after 1000 -> ok end,
%% Break the connection: when the relay node runs on this host it is
%% stopped altogether, otherwise it is only told to disconnect.
case HostPartPeer of
MyHostPart ->
ok = stop_relay_node(Sock),
{ok,pang} = do_inet_rpc(Sock2,net_adm,ping,[NN]);
_ ->
{ok, true} = do_inet_rpc(Sock,net_kernel,disconnect,[NN2]),
receive
after 500 -> ok
end
end,
{ok, []} = do_inet_rpc(Sock2,erlang,nodes,[]),
%% Sock3 is the control socket of the relay node from here on; a new
%% relay node is started if the old one was stopped above.
Sock3 = case HostPartPeer of
MyHostPart ->
start_relay_node(dist_auto_connect_relay_node,[]);
_ ->
Sock
end,
%% The ping must fail faster than the reference time RefT...
TS1 = timestamp(),
{ok, pang} = do_inet_rpc(Sock2,net_adm,ping,[NN]),
TS2 = timestamp(),
RefT = net_kernel:connecttime() - 1000,
true = ((TS2 - TS1) < RefT),
%% ...whereas monitor_node with allow_passive_connect must take longer.
TS3 = timestamp(),
{ok, true} = do_inet_rpc(Sock2,erlang,monitor_node,
[NN,true,[allow_passive_connect]]),
TS4 = timestamp(),
true = ((TS4 - TS3) > RefT),
{ok, pong} = do_inet_rpc(Sock3,net_adm,ping,[NN2]),
{ok, pong} = do_inet_rpc(Sock2,net_adm,ping,[NN]),
{ok, true} = do_inet_rpc(Sock3,net_kernel,disconnect,[NN2]),
receive
after 500 -> ok
end,
{ok, pang} = do_inet_rpc(Sock2,net_adm,ping,[NN]),
{ok, true} = do_inet_rpc(Sock2,net_kernel,connect_node,[NN]),
{ok, pong} = do_inet_rpc(Sock2,net_adm,ping,[NN]),
stop_relay_node(Sock3),
stop_relay_node(Sock2).
%% Start a relay node and a lonely (dist_auto_connect never) node.
%% Lonely node pings relay node. That should fail.
%% Lonely node connects to relay node with net_kernel:connect_node/1.
%% Result is sent here through relay node.
%% The test passes when the relayed result is `ok', the lonely node could
%% be stopped, and no further {do_dist_auto_connect, _} message is left in
%% the mailbox afterwards.
dist_auto_connect_never(Config) when is_list(Config) ->
Self = self(),
{ok, RelayNode} = start_node(dist_auto_connect_relay),
spawn(RelayNode,
fun() ->
register(dist_auto_connect_relay, self()),
dist_auto_connect_relay(Self)
end),
{ok, Handle} = dist_auto_connect_start(dist_auto_connect, never),
Result = receive
{do_dist_auto_connect, ok} ->
ok;
{do_dist_auto_connect, Error} ->
{error, Error};
%% The io:formats in do_dist_auto_connect will
%% generate port output messages that are ok
Other when not is_port(element(1, Other))->
{error, Other}
after 32000 ->
timeout
end,
stop_node(RelayNode),
Stopped = dist_auto_connect_stop(Handle),
%% A second result message would be an error.
Junk = receive
{do_dist_auto_connect, _} = J -> J
after 0 -> ok
end,
{ok, ok, ok} = {Result, Stopped, Junk},
ok.
%% Entry point of the separately started "lonely" node; invoked through
%% "-s ?MODULE do_dist_auto_connect Value", so the argument arrives as a
%% list of atoms.
%% With [never]: pings the relay node on this host, which is expected to
%% give pang, then connects explicitly with net_kernel:connect_node/1 and
%% sends {do_dist_auto_connect, Result} to the relay process. Result is `ok'
%% when the ping failed as expected and {error, Ping} otherwise.
%% With any other argument: logs the argument, waits ten seconds, and halts
%% the node.
do_dist_auto_connect([never]) ->
Node = list_to_atom("dist_auto_connect_relay@" ++ hostname()),
io:format("~p:do_dist_auto_connect([false]) Node=~p~n", [?MODULE, Node]),
Ping = net_adm:ping(Node),
io:format("~p:do_dist_auto_connect([false]) Ping=~p~n", [?MODULE, Ping]),
Result = case Ping of
pang -> ok;
_ -> {error, Ping}
end,
io:format("~p:do_dist_auto_connect([false]) Result=~p~n", [?MODULE, Result]),
net_kernel:connect_node(Node),
catch {dist_auto_connect_relay, Node} ! {do_dist_auto_connect, Result};
% receive after 1000 -> ok end,
% halt();
do_dist_auto_connect(Arg) ->
io:format("~p:do_dist_auto_connect(~p)~n", [?MODULE, Arg]),
receive after 10000 -> ok end,
halt().
%% Starts a detached emulator named Name on this host with the kernel
%% parameter dist_auto_connect set to Value, the cookie of the current
%% node, the directory of this module in its code path, and
%% ?MODULE:do_dist_auto_connect/1 as start function (given Value as
%% argument). Name may be an atom or a string.
%% Returns {ok, {Port, Node}}, the handle expected by
%% dist_auto_connect_stop/1.
dist_auto_connect_start(Name, Value) when is_atom(Name) ->
dist_auto_connect_start(atom_to_list(Name), Value);
dist_auto_connect_start(Name, Value) when is_list(Name), is_atom(Value) ->
Node = list_to_atom(lists:append([Name, "@", hostname()])),
ModuleDir = filename:dirname(code:which(?MODULE)),
ValueStr = atom_to_list(Value),
Cookie = atom_to_list(erlang:get_cookie()),
Cmd = lists:append(
[%"xterm -e ",
ct:get_progname(),
% " -noinput ",
" -detached ",
long_or_short(), " ", Name,
" -setcookie ", Cookie,
" -pa ", ModuleDir,
" -s ", atom_to_list(?MODULE),
" do_dist_auto_connect ", ValueStr,
" -kernel dist_auto_connect ", ValueStr]),
io:format("~p:dist_auto_connect_start() cmd: ~p~n", [?MODULE, Cmd]),
Port = open_port({spawn, Cmd}, [stream]),
{ok, {Port, Node}}.
%% Stops the node behind a handle returned by dist_auto_connect_start/2.
%% A linked helper process asks the node to halt through rpc:call/4, and
%% the node is then polled for up to 5000 ms until it no longer answers.
%% Returns ok, or {error, node_not_down} when the node is still up.
dist_auto_connect_stop({Port, Node}) ->
    HaltNode = fun() -> rpc:call(Node, erlang, halt, []) end,
    Halter = spawn_link(HaltNode),
    dist_auto_connect_stop(Port, Node, Halter, 5000).
%% Polls Node with net_adm:ping/1 every 100 ms until it stops answering or
%% the remaining time budget N (milliseconds) is used up.
%%   Port - port of the external emulator; closed on both outcomes
%%   Node - name of the node that is expected to go down
%%   Pid  - helper process (linked to the caller) that asked the node to
%%          halt
%%   N    - remaining time in milliseconds
%% Returns ok when the node is down, {error, node_not_down} on timeout.
%%
%% On both outcomes the helper is unlinked and killed. An exit signal with
%% reason `normal' is ignored by a process that does not trap exits, so it
%% would leave a helper stuck in its rpc call alive; unlinking first keeps
%% the kill from propagating to the caller.
dist_auto_connect_stop(Port, _Node, Pid, N) when is_integer(N), N =< 0 ->
    unlink(Pid),
    exit(Pid, kill),
    %% Best effort: the port may already be closed.
    catch erlang:port_close(Port),
    Result = {error, node_not_down},
    io:format("~p:dist_auto_connect_stop() ~p~n", [?MODULE, Result]),
    Result;
dist_auto_connect_stop(Port, Node, Pid, N) when is_integer(N) ->
    case net_adm:ping(Node) of
        pong ->
            receive after 100 -> ok end,
            dist_auto_connect_stop(Port, Node, Pid, N-100);
        pang ->
            unlink(Pid),
            exit(Pid, kill),
            %% Best effort: the port may already be closed.
            catch erlang:port_close(Port),
            io:format("~p:dist_auto_connect_stop() ok~n", [?MODULE]),
            ok
    end.
%% Forwards every message it receives to Parent, forever. A failing send is
%% ignored.
dist_auto_connect_relay(Parent) ->
    receive
        Msg ->
            ok
    end,
    catch Parent ! Msg,
    dist_auto_connect_relay(Parent).
%% Lets 64 processes on a sender node each send 1000 messages in parallel to
%% one process on a receiver node. This is done twice: first against a
%% receiver that just drops the messages, then against an "evil" receiver
%% that disconnects from the sender's node for every message it gets.
%% A watchdog process calls abort/1 on both nodes if the test has not
%% finished within two minutes.
dist_parallel_send(Config) when is_list(Config) ->
{ok, RNode} = start_node(dist_parallel_receiver),
{ok, SNode} = start_node(dist_parallel_sender),
WatchDog = spawn_link(
fun () ->
TRef = erlang:start_timer((2*60*1000), self(), oops),
receive
{timeout, TRef, _ } ->
spawn(SNode, fun () -> abort(timeout) end),
spawn(RNode, fun () -> abort(timeout) end)
%% rpc:cast(SNode, erlang, halt,
%% ["Timetrap (sender)"]),
%% rpc:cast(RNode, erlang, halt,
%% ["Timetrap (receiver)"])
end
end),
%% Creates the 64 sender processes; they wait for {go, Parent}.
MkSndrs = fun (Receiver) ->
lists:map(fun (_) ->
spawn_link(SNode,
?MODULE,
dist_parallel_sender,
[self(), Receiver, 1000])
end, lists:seq(1, 64))
end,
%% Releases the senders from a process on the sender node.
SndrsStart = fun (Sndrs) ->
Parent = self(),
spawn_link(SNode,
fun () ->
lists:foreach(fun (P) ->
P ! {go, Parent}
end, Sndrs)
end)
end,
%% Waits for the {Pid, done} message of every sender.
SndrsWait = fun (Sndrs) ->
lists:foreach(fun (P) ->
receive {P, done} -> ok end
end, Sndrs)
end,
DPR = spawn_link(RNode, ?MODULE, dist_parallel_receiver, []),
Sndrs1 = MkSndrs(DPR),
SndrsStart(Sndrs1),
SndrsWait(Sndrs1),
unlink(DPR),
exit(DPR, bang),
DEPR = spawn_link(RNode, ?MODULE, dist_evil_parallel_receiver, []),
Sndrs2 = MkSndrs(DEPR),
SndrsStart(Sndrs2),
SndrsWait(Sndrs2),
unlink(DEPR),
exit(DEPR, bang),
unlink(WatchDog),
exit(WatchDog, bang),
stop_node(RNode),
stop_node(SNode),
ok.
%% Sends {self(), "Some data"} to Receiver N times, then reports
%% {self(), done} to Parent.
do_dist_parallel_sender(Parent, Receiver, N) when N =/= 0 ->
    Receiver ! {self(), "Some data"},
    do_dist_parallel_sender(Parent, Receiver, N - 1);
do_dist_parallel_sender(Parent, _Receiver, 0) ->
    Parent ! {self(), done}.
%% Entry point of a parallel sender process: waits for the {go, Parent}
%% start signal and then sends N messages to Receiver.
dist_parallel_sender(Parent, Receiver, N) ->
    receive
        {go, Parent} ->
            do_dist_parallel_sender(Parent, Receiver, N)
    end.
%% Well-behaved receiver: consumes {Sender, Data} messages forever and
%% drops them.
dist_parallel_receiver() ->
    receive
        {_Sender, _Data} ->
            dist_parallel_receiver()
    end.
%% Evil receiver: for every {Sender, Data} message it disconnects from the
%% node of the sender, forever.
dist_evil_parallel_receiver() ->
    receive
        {Sender, _Data} ->
            net_kernel:disconnect(node(Sender)),
            dist_evil_parallel_receiver()
    end.
%% Sends the atom_data/0 test set to another node and back, and checks
%% that it survives the trip unchanged.
atom_roundtrip(Config) when is_list(Config) ->
    Data = atom_data(),
    verify_atom_data(Data),
    {ok, Peer} = start_node(Config),
    do_atom_roundtrip(Peer, Data),
    stop_node(Peer),
    ok.
%% Sends the unicode_atom_data/0 test set to another node and back, and
%% checks that it survives the trip unchanged.
unicode_atom_roundtrip(Config) when is_list(Config) ->
    Data = unicode_atom_data(),
    verify_atom_data(Data),
    {ok, Peer} = start_node(Config),
    do_atom_roundtrip(Peer, Data),
    stop_node(Peer),
    ok.
%% Starts a verify_atom_data_loop/1 process on Node and bounces AtomData
%% off it three times: twice as is and once reversed. Every echo must match
%% what was sent. The remote process is unlinked and killed afterwards.
do_atom_roundtrip(Node, AtomData) ->
    Parent = self(),
    Verifier = spawn_link(Node, fun () -> verify_atom_data_loop(Parent) end),
    Roundtrip = fun(Data) ->
                        Verifier ! {Parent, Data},
                        receive {Verifier, Echoed} -> Data = Echoed end
                end,
    Roundtrip(AtomData),
    Roundtrip(AtomData),
    Roundtrip(lists:reverse(AtomData)),
    unlink(Verifier),
    exit(Verifier, bang),
    ok.
%% Server loop for the remote end of the atom roundtrip tests: every
%% {From, AtomData} request is checked with verify_atom_data/1 and then
%% echoed back to From as {self(), AtomData}.
verify_atom_data_loop(From) ->
    receive
        {From, AtomData} ->
            verify_atom_data(AtomData),
            From ! {self(), AtomData}
    end,
    verify_atom_data_loop(From).
%% Returns 2000 {Atom, Text} pairs, a1 .. a2000, each with the name of the
%% atom as a string.
atom_data() ->
    [{list_to_atom(Text), Text} ||
        N <- lists:seq(1, 2000),
        Text <- ["a" ++ integer_to_list(N)]].
%% Checks a list of {Term, Text} pairs. When Term is an atom, Text must be
%% its name; otherwise Term is taken to be a pid, port or reference, and
%% Text must be the name of the node it belongs to. Returns ok, or crashes
%% with a badmatch on the first pair that is wrong.
verify_atom_data(AtomData) ->
    Check = fun ({Atom, Text}) when is_atom(Atom) ->
                    Text = atom_to_list(Atom);
                ({NodeTerm, Text}) ->
                    %% Pid, Port, or Ref
                    Text = atom_to_list(node(NodeTerm))
            end,
    lists:foreach(Check, AtomData).
%% Builds an {Atom, Text} test pair from the code point list ATxt, using
%% string_to_atom/1, and asserts that the atom converts back to the same
%% text.
uc_atom_tup(ATxt) ->
    Created = string_to_atom(ATxt),
    ATxt = atom_to_list(Created),
    {Created, ATxt}.
%% Builds a {Pid, Text} test pair where the pid, made with mk_pid/3, belongs
%% to a node named ATxt, and asserts that node/1 gives that name back.
%% The numeric arguments are passed on to mk_pid/3 as they are (presumably
%% creation, number and serial -- confirm against mk_pid/3).
uc_pid_tup(ATxt) ->
    Pid = mk_pid({string_to_atom_ext(ATxt), 1}, 4711, 17),
    true = is_pid(Pid),
    NodeName = node(Pid),
    true = is_atom(NodeName),
    ATxt = atom_to_list(NodeName),
    {Pid, ATxt}.
%% Builds a {Port, Text} test pair where the port, made with mk_port/2,
%% belongs to a node named ATxt, and asserts that node/1 gives that name
%% back.
uc_port_tup(ATxt) ->
    Port = mk_port({string_to_atom_ext(ATxt), 2}, 4711),
    true = is_port(Port),
    NodeName = node(Port),
    true = is_atom(NodeName),
    ATxt = atom_to_list(NodeName),
    {Port, ATxt}.
%% Builds a {Ref, Text} test pair where the reference, made with mk_ref/2,
%% belongs to a node named ATxt, and asserts that node/1 gives that name
%% back.
uc_ref_tup(ATxt) ->
    Ref = mk_ref({string_to_atom_ext(ATxt), 3}, [4711, 17, 4711]),
    true = is_reference(Ref),
    NodeName = node(Ref),
    true = is_atom(NodeName),
    ATxt = atom_to_list(NodeName),
    {Ref, ATxt}.
%% Data set for unicode_atom_roundtrip/1. In order:
%%   * pids, ports and refs whose node names consist of code points
%%     starting at 16#1f600 (a long and a short variant each) plus "@host";
%%   * atoms made of fixed code point ranges of length 255 and 64;
%%   * 2000 atoms of 255 code points each, from a window sliding upwards
%%     from 64001.
unicode_atom_data() ->
    Base = 16#1f600,
    LongNode = lists:seq(Base, Base+249) ++ "@host",
    ShortNode = lists:seq(Base, Base+30) ++ "@host",
    NodeTups = [Mk(NodeTxt) || Mk <- [fun uc_pid_tup/1,
                                      fun uc_port_tup/1,
                                      fun uc_ref_tup/1],
                               NodeTxt <- [LongNode, ShortNode]],
    Ranges = [{Base, Base+254},
              {Base, Base+63},
              {0, 254},
              {100, 163},
              {200, 354},
              {200, 263},
              {2000, 2254},
              {2000, 2063},
              {65500, 65754},
              {65500, 65563}],
    FixedAtoms = [uc_atom_tup(lists:seq(First, Last)) || {First, Last} <- Ranges],
    SlidingAtoms = [uc_atom_tup(lists:seq(64000+N, 64254+N))
                    || N <- lists:seq(1, 2000)],
    NodeTups ++ FixedAtoms ++ SlidingAtoms.
%% Test case: runs contended_atom_cache_entry_test/2 with atoms generated
%% by the latin1 variant.
contended_atom_cache_entry(Config) when is_list(Config) ->
contended_atom_cache_entry_test(Config, latin1).
%% Test case: runs contended_atom_cache_entry_test/2 with atoms generated
%% by the unicode variant.
contended_unicode_atom_cache_entry(Config) when is_list(Config) ->
contended_atom_cache_entry_test(Config, unicode).
%% Shared body of the contended atom cache entry test cases.
%%
%% Starts a sender node (SNode) and a receiver node (RNode). A master
%% process on SNode picks a cache index with get_cix/0 and generates
%% ProcessPairs atoms that all map to that index (latin1 or unicode
%% generator, selected by Type). For every atom it spawns one receiver on
%% RNode and one sender on SNode. A sender first transmits the atom's text
%% and then sends {Ref, Atom} Msgs times; the matching receiver rebuilds the
%% atom from the text and checks every received value against it
%% (receive_ref_atom/3). The test succeeds once every receiver has reported
%% success to the master, which then notifies the test process.
contended_atom_cache_entry_test(Config, Type) ->
TestServer = self(),
ProcessPairs = 10,
Msgs = 100000,
{ok, SNode} = start_node(Config),
{ok, RNode} = start_node(Config),
Success = make_ref(),
spawn_link(
SNode,
fun () ->
Master = self(),
CIX = get_cix(),
TestAtoms = case Type of
latin1 ->
get_conflicting_atoms(CIX,
ProcessPairs);
unicode ->
get_conflicting_unicode_atoms(CIX,
ProcessPairs)
end,
io:format("Testing with the following atoms all using "
"cache index ~p:~n ~w~n",
[CIX, TestAtoms]),
Ps = lists:map(
fun (A) ->
Ref = make_ref(),
%% Receiver: learns the atom from its text first, then
%% validates Msgs messages carrying the atom itself.
R = spawn_link(RNode,
fun () ->
Atom = receive
{Ref, txt, ATxt} ->
case Type of
latin1 ->
list_to_atom(ATxt);
unicode ->
string_to_atom(ATxt)
end
end,
receive_ref_atom(Ref,
Atom,
Msgs),
Master ! {self(), success}
end),
%% Sender: held back until `go` so that all pairs start
%% sending at roughly the same time.
S = spawn_link(SNode,
fun () ->
receive go -> ok end,
R ! {Ref,
txt,
atom_to_list(A)},
send_ref_atom(R, Ref, A, Msgs)
end),
{S, R}
end,
TestAtoms),
lists:foreach(fun ({S, _}) ->
S ! go
end,
Ps),
lists:foreach(fun ({_, R}) ->
receive {R, success} -> ok end
end,
Ps),
TestServer ! Success
end),
receive
Success ->
ok
end,
stop_node(SNode),
stop_node(RNode),
ok.
%% Sends the message {Ref, Atom} to To exactly N times and returns ok.
send_ref_atom(To, Ref, Atom, N) ->
    case N of
        0 ->
            ok;
        _ ->
            To ! {Ref, Atom},
            send_ref_atom(To, Ref, Atom, N-1)
    end.
%% Receives N messages of the form {Ref, Value} and requires every Value to
%% equal Atom; crashes with badmatch on the first one that differs.
%% Returns ok after N messages.
receive_ref_atom(Ref, Atom, N) ->
    case N of
        0 ->
            ok;
        _ ->
            receive
                {Ref, Received} ->
                    Atom = Received
            end,
            receive_ref_atom(Ref, Atom, N-1)
    end.
%% Picks an atom cache index that is not among the indices returned by
%% unwanted_cixs/0. The search starts at 1000 by default.
get_cix() ->
    get_cix(1000).

%% As get_cix/0, starting the search at CIX (negative values start at 0).
get_cix(CIX) when is_integer(CIX) ->
    Start = max(CIX, 0),
    get_cix(Start,
            unwanted_cixs(),
            get_internal_state(max_atom_out_cache_index)).

%% Walks upwards from CIX, wrapping to 0 after MaxCIX, until an index that
%% is not a member of Unwanted is found.
get_cix(CIX, Unwanted, MaxCIX) ->
    if
        CIX > MaxCIX ->
            get_cix(0, Unwanted, MaxCIX);
        true ->
            case lists:member(CIX, Unwanted) of
                false -> CIX;
                true -> get_cix(CIX+1, Unwanted, MaxCIX)
            end
    end.
%% Returns the atom cache index of the name of every currently connected
%% node (one entry per element of nodes()).
unwanted_cixs() ->
    [get_internal_state({atom_out_cache_index, Node}) || Node <- nodes()].
%% Returns N freshly generated atoms ("atom" followed by a unique positive
%% integer) that all have atom cache index CIX. Candidates with another
%% index are discarded and a new one is tried.
get_conflicting_atoms(_CIX, 0) ->
    [];
get_conflicting_atoms(CIX, N) ->
    Name = "atom" ++ integer_to_list(erlang:unique_integer([positive])),
    Candidate = list_to_atom(Name),
    case get_internal_state({atom_out_cache_index, Candidate}) =:= CIX of
        true ->
            [Candidate | get_conflicting_atoms(CIX, N-1)];
        false ->
            get_conflicting_atoms(CIX, N)
    end.
%% Unicode counterpart of get_conflicting_atoms/2: the generated atoms
%% start with code point 16#1f608 followed by "atom" and a unique positive
%% integer, and are created with string_to_atom/1.
get_conflicting_unicode_atoms(_CIX, 0) ->
    [];
get_conflicting_unicode_atoms(CIX, N) ->
    Name = [16#1f608] ++ "atom" ++ integer_to_list(erlang:unique_integer([positive])),
    Candidate = string_to_atom(Name),
    case get_internal_state({atom_out_cache_index, Candidate}) =:= CIX of
        true ->
            [Candidate | get_conflicting_unicode_atoms(CIX, N-1)];
        false ->
            get_conflicting_unicode_atoms(CIX, N)
    end.
%% The message_latency_large_* test cases verify that small distribution
%% messages are not blocked by other large distribution messages. Basically
%% they test that fragmentation of distribution messages works.
%% Latency test where the large payload is delivered as an ordinary
%% message send to the dropper process.
message_latency_large_message(Config) when is_list(Config) ->
    SendPayload = fun(Dropper, Payload) -> Dropper ! Payload end,
    measure_latency_large_message(?FUNCTION_NAME, SendPayload).
%% Latency test where the large payload is delivered as the reason of an
%% exit/2 signal aimed at the dropper process.
message_latency_large_exit2(Config) when is_list(Config) ->
    ExitWithPayload = fun erlang:exit/2,
    measure_latency_large_message(?FUNCTION_NAME, ExitWithPayload).
%% Latency test where the large payload is the exit reason of a local
%% process that has linked itself to the dropper process.
message_latency_large_link_exit(Config) when is_list(Config) ->
    LinkToDropper = fun erlang:link/1,
    message_latency_large_exit(?FUNCTION_NAME, LinkToDropper).
%% Latency test where the large payload is the exit reason of a local
%% process that the dropper process monitors. The local process asks the
%% dropper to set up the monitor and waits for its `ok` before going on.
message_latency_large_monitor_exit(Config) when is_list(Config) ->
    GetMonitored = fun(Dropper) ->
                           Dropper ! {monitor, self()},
                           receive ok -> ok end
                   end,
    message_latency_large_exit(?FUNCTION_NAME, GetMonitored).
%% Runs measure_latency_large_message/2 with a data fun that delivers the
%% payload as a process exit reason.
%%
%% For each invocation the data fun spawns a local process which waits for
%% `go`, calls ReasonFun(Dropper) (expected to tie the process to Dropper,
%% e.g. by a link or monitor) and then exits with Payload as reason. The
%% data fun traces the process with the `exiting` flag and returns only
%% after it has seen the {trace, Pid, out_exited, 0} message, i.e. it blocks
%% until the process has completed its exit.
message_latency_large_exit(Nodename, ReasonFun) ->
measure_latency_large_message(
Nodename,
fun(Dropper, Payload) ->
Pid = spawn(fun() ->
receive go -> ok end,
ReasonFun(Dropper),
exit(Payload)
end),
%% Removes any remaining trace messages about Pid from the mailbox.
FlushTrace = fun F() ->
receive
{trace, Pid, _, _} ->
F()
after 0 ->
ok
end
end,
%% Tracing is enabled before `go` is sent so no exit event is missed.
erlang:trace(Pid, true, [exiting]),
Pid ! go,
receive
{trace, Pid, out_exited, 0} ->
FlushTrace()
end
end).
%% Core of the message_latency_large_* test cases.
%%
%% Starts node Nodename with two processes on it: a dropper, which discards
%% everything it receives (setting up a monitor and answering `ok` when
%% asked with {monitor, Pid}), and an echo process, which returns Msg to
%% From for every {From, Msg}. The echo latency is then measured with
%% measure_latency/4 under load from two payload sizes that differ by a
%% factor of 10. On `opt` builds the test fails if the fastest run took
%% less than a quarter of the time of the slowest run.
measure_latency_large_message(Nodename, DataFun) ->
    erlang:system_monitor(self(), [busy_dist_port]),
    {ok, N} = start_node(Nodename),
    DropLoop = fun Loop() ->
                       process_flag(trap_exit, true),
                       receive
                           {monitor, Pid} ->
                               erlang:monitor(process, Pid),
                               Pid ! ok;
                           _ -> ok
                       end,
                       Loop()
               end,
    Dropper = spawn(N, DropLoop),
    EchoLoop = fun Loop() ->
                       receive
                           {From, Msg} ->
                               From ! Msg,
                               Loop()
                       end
               end,
    Echo = spawn(N, EchoLoop),
    BuildType = erlang:system_info(build_type),
    WordSize = erlang:system_info(wordsize),
    UnitBits = if
                   BuildType =/= opt; WordSize =:= 4 ->
                       %% Non-opt or 32-bit emulator: payloads of
                       %% 32 KB and 320 KB.
                       32 * 1024 * 8;
                   true ->
                       %% Payloads of 32 MB and 320 MB.
                       32 * 1024 * 1024 * 8
               end,
    Payloads = [{I, <<0:(I * UnitBits)>>} || I <- [1,10]],
    IndexTimes = [{I, measure_latency(DataFun, Dropper, Echo, P)}
                  || {I, P} <- Payloads],
    Times = [Time || {_I, Time} <- IndexTimes],
    ct:pal("~p",[IndexTimes]),
    stop_node(N),
    Slowest = lists:max(Times),
    Fastest = lists:min(Times),
    if
        Slowest * 0.25 > Fastest, BuildType =:= opt ->
            ct:fail({incorrect_latency, IndexTimes});
        true ->
            ok
    end.
%% Measures the echo latency towards Echo while the distribution channel is
%% kept busy with Payload.
%%
%% Two local sender processes call DataFun(Dropper, Payload) in a tight
%% loop until told to `die`. Once ten busy_dist_port system monitor
%% messages have been seen, 100 `hello` roundtrips to Echo are timed.
%% Per-roundtrip statistics (in milliseconds) are logged, the senders are
%% stopped, and the total time of the 100 roundtrips, as reported by
%% timer:tc/1 (microseconds), is returned.
measure_latency(DataFun, Dropper, Echo, Payload) ->
    flush(),
    Senders = [spawn_monitor(
                 fun F() ->
                         DataFun(Dropper, Payload),
                         receive
                             die -> ok
                         after 0 ->
                                 F()
                         end
                 end) || _ <- lists:seq(1,2)],
    %% Do not start measuring until the channel is demonstrably saturated.
    [receive
         {monitor, _Sender, busy_dist_port, _Info} ->
             ok
     end || _ <- lists:seq(1,10)],
    {TS, Times} =
        timer:tc(fun() ->
                         [begin
                              %% Ask for nanoseconds explicitly: the native
                              %% time unit is platform dependent, so the
                              %% division below is only a conversion to
                              %% milliseconds with a known input unit.
                              T0 = erlang:monotonic_time(nanosecond),
                              Echo ! {self(), hello},
                              receive hello -> ok end,
                              (erlang:monotonic_time(nanosecond) - T0) / 1000000
                          end || _ <- lists:seq(1,100)]
                 end),
    Avg = lists:sum(Times) / length(Times),
    StdDev = math:sqrt(lists:sum([math:pow(V - Avg,2) || V <- Times]) / length(Times)),
    %% The last value is the standard deviation, not the variance.
    ct:pal("Times: Avg: ~p Max: ~p Min: ~p StdDev: ~p",
           [Avg, lists:max(Times), lists:min(Times), StdDev]),
    [begin
         Sender ! die,
         receive
             {'DOWN', Ref, process, _, _} ->
                 ok
         end
     end || {Sender, Ref} <- Senders],
    TS.
%% Discards every message currently in the caller's mailbox. Returns ok.
flush() ->
    Empty = receive
                _Any -> false
            after 0 ->
                    true
            end,
    case Empty of
        true -> ok;
        false -> flush()
    end.
%% Test case wrapper for test_system_limit/1. Only runs on 64-bit
%% emulators on machines where memsup reports more than 6 GB of total
%% system memory; otherwise the case is skipped.
system_limit(Config) when is_list(Config) ->
    case erlang:system_info(wordsize) of
        4 ->
            {skipped, "Only interesting on 64-bit builds"};
        8 ->
            MemData = memsup:get_system_memory_data(),
            case proplists:get_value(system_total_memory, MemData) of
                Total when is_integer(Total),
                           Total > 6*1024*1024*1024 ->
                    test_system_limit(Config),
                    garbage_collect(),
                    ok;
                _ ->
                    {skipped, "Not enough memory on this machine"}
            end
    end.
%% Exercises distributed operations with terms that cannot be encoded: a
%% binary of 2^32+1 bytes, and a list containing that binary.
%%
%% Sends, exit/2 signals and registered sends carrying such a term must
%% raise error:system_limit in the caller and leave the connection up.
%% Link exits and monitor 'DOWN' signals carrying such a term as exit
%% reason are expected to bring the connection down (nodedown).
test_system_limit(Config) when is_list(Config) ->
    Bits = ((1 bsl 32)+1)*8,
    HugeBin = <<0:Bits>>,
    HugeListBin = [lists:duplicate(2000000,2000000), HugeBin],

    %% Local helpers -----------------------------------------------------
    Idle = fun () ->
                   receive after infinity -> ok end
           end,
    StartMonitored = fun () ->
                             {ok, Node} = start_node(Config),
                             monitor_node(Node, true),
                             Node
                     end,
    NoNodedown = fun (Node) ->
                         receive
                             {nodedown, Node} ->
                                 ct:fail({unexpected_nodedown, Node})
                         after 0 ->
                                 ok
                         end
                 end,
    AwaitNodedown = fun (Node) ->
                            receive {nodedown, Node} -> ok end
                    end,
    %% Op must raise error:system_limit; if it returns, exit with Oops.
    ExpectLimit = fun (Op, Oops) ->
                          try
                              Op(),
                              exit(Oops)
                          catch
                              error:system_limit -> ok
                          end
                  end,
    %% A local process links to Pid and dies with Reason.
    LinkExit = fun (Pid, Reason) ->
                       spawn(fun () ->
                                     link(Pid),
                                     exit(Reason)
                             end)
               end,
    %% A process on a fresh node monitors a local process, which then dies
    %% with Reason; the connection to that node is expected to go down.
    MonitorDown = fun (Reason) ->
                          Node = StartMonitored(),
                          Go = make_ref(),
                          LP = spawn(fun () ->
                                             receive Go -> ok end,
                                             exit(Reason)
                                     end),
                          _ = spawn(Node,
                                    fun () ->
                                            _ = erlang:monitor(process, LP),
                                            LP ! Go,
                                            receive after infinity -> ok end
                                    end),
                          AwaitNodedown(Node)
                  end,

    %% Operations that must fail locally and keep the connection ----------
    N1 = StartMonitored(),
    NoNodedown(N1),
    P1 = spawn(N1, Idle),

    io:format("~n** distributed send **~n~n", []),
    ExpectLimit(fun () -> P1 ! HugeBin end, oops1),
    ExpectLimit(fun () -> P1 ! HugeListBin end, oops2),

    io:format("~n** distributed exit **~n~n", []),
    ExpectLimit(fun () -> exit(P1, HugeBin) end, oops3),
    ExpectLimit(fun () -> exit(P1, HugeListBin) end, oops4),

    io:format("~n** distributed registered send **~n~n", []),
    ExpectLimit(fun () -> {missing_proc, N1} ! HugeBin end, oops5),
    ExpectLimit(fun () -> {missing_proc, N1} ! HugeListBin end, oops6),

    NoNodedown(N1),

    %%
    %% system_limit in exit reasons brings the
    %% connection down...
    %%
    io:format("~n** distributed link exit **~n~n", []),
    LinkExit(P1, HugeBin),
    AwaitNodedown(N1),

    N2 = StartMonitored(),
    P2 = spawn(N2, Idle),
    LinkExit(P2, HugeListBin),
    AwaitNodedown(N2),

    io:format("~n** distributed monitor down **~n~n", []),
    MonitorDown(HugeBin),
    MonitorDown(HugeListBin),
    ok.
%% Cookie value placed in the control tuples this suite builds by hand
%% (e.g. {?DOP_SEND, ?COOKIE, To}).
-define(COOKIE, '').
%% Operation codes used as the first element of the distribution control
%% tuples constructed by the test helpers below.
-define(DOP_LINK, 1).
-define(DOP_SEND, 2).
-define(DOP_EXIT, 3).
-define(DOP_UNLINK, 4).
-define(DOP_REG_SEND, 6).
-define(DOP_GROUP_LEADER, 7).
-define(DOP_EXIT2, 8).
-define(DOP_SEND_TT, 12).
-define(DOP_EXIT_TT, 13).
-define(DOP_REG_SEND_TT, 16).
-define(DOP_EXIT2_TT, 18).
-define(DOP_MONITOR_P, 19).
-define(DOP_DEMONITOR_P, 20).
-define(DOP_MONITOR_P_EXIT, 21).
-define(DOP_SEND_SENDER, 22).
-define(DOP_SEND_SENDER_TT, 23).
-define(DOP_PAYLOAD_EXIT, 24).
-define(DOP_PAYLOAD_EXIT_TT, 25).
-define(DOP_PAYLOAD_EXIT2, 26).
-define(DOP_PAYLOAD_EXIT2_TT, 27).
-define(DOP_PAYLOAD_MONITOR_P_EXIT, 28).
%% Spawns a process on node Offender that monitors P and then stays alive
%% until it is sent `just_stay_alive`. The monitor reference it reports is
%% printed; if no report arrives within 5 seconds the atom `error` is
%% printed instead. Always returns ok.
start_monitor(Offender,P) ->
    Tester = self(),
    Watcher = spawn(Offender,
                    fun () ->
                            MRef = erlang:monitor(process, P),
                            Tester ! {self(), ref, MRef},
                            receive
                                just_stay_alive -> ok
                            end
                    end),
    Reported = receive
                   {Watcher, ref, Value} ->
                       Value
               after 5000 ->
                       error
               end,
    io:format("Ref is ~p~n",[Reported]),
    ok.
%% Spawns a process on node Offender that traps exits, links to P and then
%% stays alive until it is sent `just_stay_alive`. The value it reports
%% back (P itself) is printed; if no report arrives within 5 seconds the
%% atom `error` is printed instead. Always returns ok.
start_link(Offender,P) ->
    Tester = self(),
    Linker = spawn(Offender,
                   fun () ->
                           process_flag(trap_exit, true),
                           link(P),
                           Tester ! {self(), ref, P},
                           receive
                               just_stay_alive -> ok
                           end
                   end),
    Reported = receive
                   {Linker, ref, Value} ->
                       Value
               after 5000 ->
                       error
               end,
    io:format("Ref is ~p~n",[Reported]),
    ok.
%% Test dist messages with valid structure (binary to term ok) but malformed control content
%%
%% A process P on the Victim node is fed the messages `one` and `two`, with
%% a long series of malformed control messages sent from the Offender node
%% in between. Every send_bad_structure/4,5 call requires the Offender to
%% see a nodedown for the Victim. Before most calls a monitor or link from
%% the Offender to P is set up again with start_monitor/2 or start_link/2.
%% In the end P must find exactly `one` and `two` in its mailbox (exit
%% messages are ignored by bad_dist_struct_check_msgs/1) and nothing else.
bad_dist_structure(Config) when is_list(Config) ->
ct:timetrap({seconds, 15}),
{ok, Offender} = start_node(bad_dist_structure_offender),
{ok, Victim} = start_node(bad_dist_structure_victim),
start_node_monitors([Offender,Victim]),
Parent = self(),
P = spawn(Victim,
fun () ->
process_flag(trap_exit,true),
Parent ! {self(), started},
receive check_msgs -> ok end,
bad_dist_struct_check_msgs([one,
two]),
Parent ! {self(), messages_checked},
receive done -> ok end
end),
receive {P, started} -> ok end,
pong = rpc:call(Victim, net_adm, ping, [Offender]),
verify_up(Offender, Victim),
true = lists:member(Offender, rpc:call(Victim, erlang, nodes, [])),
start_monitor(Offender,P),
P ! one,
%% The last argument of send_bad_structure is the tuple position that is
%% overwritten with the sending process' pid (0 = leave the tuple as is).
send_bad_structure(Offender, P,{?DOP_MONITOR_P_EXIT,'replace',P,normal},2),
start_monitor(Offender,P),
send_bad_structure(Offender, P,{?DOP_MONITOR_P_EXIT,'replace',P,normal,normal},2),
start_link(Offender,P),
send_bad_structure(Offender, P,{?DOP_LINK},0),
start_link(Offender,P),
send_bad_structure(Offender, P,{?DOP_UNLINK,'replace'},2),
start_link(Offender,P),
send_bad_structure(Offender, P,{?DOP_UNLINK,'replace',make_ref()},2),
start_link(Offender,P),
send_bad_structure(Offender, P,{?DOP_UNLINK,make_ref(),P},0),
start_link(Offender,P),
send_bad_structure(Offender, P,{?DOP_UNLINK,normal,normal},0),
start_monitor(Offender,P),
send_bad_structure(Offender, P,{?DOP_MONITOR_P,'replace',P},2),
start_monitor(Offender,P),
send_bad_structure(Offender, P,{?DOP_MONITOR_P,'replace',P,normal},2),
start_monitor(Offender,P),
send_bad_structure(Offender, P,{?DOP_DEMONITOR_P,'replace',P},2),
start_monitor(Offender,P),
send_bad_structure(Offender, P,{?DOP_DEMONITOR_P,'replace',P,normal},2),
send_bad_structure(Offender, P,{?DOP_EXIT,'replace',P},2),
send_bad_structure(Offender, P,{?DOP_EXIT,make_ref(),normal,normal},0),
send_bad_structure(Offender, P,{?DOP_EXIT_TT,'replace',token,P},2),
send_bad_structure(Offender, P,{?DOP_EXIT_TT,make_ref(),token,normal,normal},0),
send_bad_structure(Offender, P,{?DOP_EXIT2,'replace',P},2),
send_bad_structure(Offender, P,{?DOP_EXIT2,make_ref(),normal,normal},0),
send_bad_structure(Offender, P,{?DOP_EXIT2_TT,'replace',token,P},2),
send_bad_structure(Offender, P,{?DOP_EXIT2_TT,make_ref(),token,normal,normal},0),
send_bad_structure(Offender, P,{?DOP_GROUP_LEADER,'replace'},2),
send_bad_structure(Offender, P,{?DOP_GROUP_LEADER,'replace','atomic'},2),
send_bad_structure(Offender, P,{?DOP_GROUP_LEADER,'replace',P},0),
%% The variants below additionally carry a {message} payload.
send_bad_structure(Offender, P,{?DOP_REG_SEND_TT,'replace','',name},2,{message}),
send_bad_structure(Offender, P,{?DOP_REG_SEND_TT,'replace','',name,token},0,{message}),
send_bad_structure(Offender, P,{?DOP_REG_SEND,'replace',''},2,{message}),
send_bad_structure(Offender, P,{?DOP_REG_SEND,'replace','',P},0,{message}),
send_bad_structure(Offender, P,{?DOP_REG_SEND,'replace','',name},0,{message}),
send_bad_structure(Offender, P,{?DOP_REG_SEND,'replace','',name,{token}},2,{message}),
send_bad_structure(Offender, P,{?DOP_SEND_TT,'',P},0,{message}),
send_bad_structure(Offender, P,{?DOP_SEND_TT,'',name,token},0,{message}),
send_bad_structure(Offender, P,{?DOP_SEND,''},0,{message}),
send_bad_structure(Offender, P,{?DOP_SEND,'',name},0,{message}),
send_bad_structure(Offender, P,{?DOP_SEND,'',P,{token}},0,{message}),
P ! two,
P ! check_msgs,
receive
{P, messages_checked} -> ok
after 5000 ->
exit(victim_is_dead)
end,
{message_queue_len, 0}
= rpc:call(Victim, erlang, process_info, [P, message_queue_len]),
unlink(P),
P ! done,
stop_node(Offender),
stop_node(Victim),
ok.
%% Test various dist fragmentation errors
%%
%% Same layout as bad_dist_structure/1: a process P on the Victim node is
%% sent `one` and `two`, with malformed fragment sequences sent from the
%% Offender node in between (see send_bad_fragments/6, which requires a
%% nodedown for each sequence). Each fragment spec is {hdr | frg, FragId,
%% Payload}; the atom `close` makes the Offender close its distribution
%% port after sending. P must end up with exactly `one` and `two`.
bad_dist_fragments(Config) when is_list(Config) ->
ct:timetrap({seconds, 15}),
{ok, Offender} = start_node(bad_dist_fragment_offender),
{ok, Victim} = start_node(bad_dist_fragment_victim),
%% Encoded term whose bytes are cut into pieces and used as fragment payloads.
Msg = iolist_to_binary(dmsg_ext(lists:duplicate(255,255))),
start_node_monitors([Offender,Victim]),
Parent = self(),
P = spawn(Victim,
fun () ->
process_flag(trap_exit,true),
Parent ! {self(), started},
receive check_msgs -> ok end,
bad_dist_struct_check_msgs([one,
two]),
Parent ! {self(), messages_checked},
receive done -> ok end
end),
receive {P, started} -> ok end,
pong = rpc:call(Victim, net_adm, ping, [Offender]),
verify_up(Offender, Victim),
true = lists:member(Offender, rpc:call(Victim, erlang, nodes, [])),
start_monitor(Offender,P),
P ! one,
start_monitor(Offender,P),
%% A continuation fragment that is not preceded by any header fragment.
send_bad_fragments(Offender, Victim, P,{?DOP_SEND,?COOKIE,P},3,
[{frg, 1, binary:part(Msg, 10,byte_size(Msg)-10)}]),
start_monitor(Offender,P),
%% Header with fragment id 3 followed directly by fragment id 1.
send_bad_fragments(Offender, Victim, P,{?DOP_SEND,?COOKIE,P},3,
[{hdr, 3, binary:part(Msg, 0,10)},
{frg, 1, binary:part(Msg, 10,byte_size(Msg)-10)}]),
start_monitor(Offender,P),
%% The same header fragment sent twice.
send_bad_fragments(Offender, Victim, P,{?DOP_SEND,?COOKIE,P},3,
[{hdr, 3, binary:part(Msg, 0,10)},
{hdr, 3, binary:part(Msg, 0,10)}]),
start_monitor(Offender,P),
%% Control tuple with an extra element.
send_bad_fragments(Offender, Victim, P,{?DOP_SEND,?COOKIE,P,broken},3,
[{hdr, 1, binary:part(Msg, 10,byte_size(Msg)-10)}]),
start_monitor(Offender,P),
%% Header announcing more fragments, then the port is closed.
send_bad_fragments(Offender, Victim, P,{?DOP_SEND,?COOKIE,P},3,
[{hdr, 3, binary:part(Msg, 10,byte_size(Msg)-10)},
close]),
start_monitor(Offender,P),
%% Payload-carrying exit/exit2/monitor-exit signals with a payload that
%% consists of a single byte.
ExitVictim = spawn(Victim, fun() -> receive ok -> ok end end),
send_bad_fragments(Offender, Victim, P,{?DOP_PAYLOAD_EXIT,P,ExitVictim},2,
[{hdr, 1, [131]}]),
start_monitor(Offender,P),
Exit2Victim = spawn(Victim, fun() -> receive ok -> ok end end),
send_bad_fragments(Offender, Victim, P,{?DOP_PAYLOAD_EXIT2,P,Exit2Victim},2,
[{hdr, 1, [132]}]),
start_monitor(Offender,P),
DownVictim = spawn(Victim, fun() -> receive ok -> ok end end),
DownRef = erlang:monitor(process, DownVictim),
send_bad_fragments(Offender, Victim, P,{?DOP_PAYLOAD_MONITOR_P_EXIT,P,DownVictim,DownRef},2,
[{hdr, 1, [133]}]),
P ! two,
P ! check_msgs,
receive
{P, messages_checked} -> ok
after 5000 ->
exit(victim_is_dead)
end,
{message_queue_len, 0}
= rpc:call(Victim, erlang, process_info, [P, message_queue_len]),
unlink(P),
P ! done,
stop_node(Offender),
stop_node(Victim),
ok.
%% Builds the leading bytes of a first (header) fragment with fragment id
%% Frag, using a hash of the calling process' pid as sequence id.
dmsg_frag_hdr(Frag) ->
    SeqId = erlang:phash2(self()),
    dmsg_frag_hdr(SeqId, Frag).

%% As dmsg_frag_hdr/1 with an explicit sequence id. The result is an
%% iolist: version magic 131, tag $E, Seq and Frag as encoded by
%% uint64_be/1 (defined elsewhere in this file), and a trailing zero byte.
dmsg_frag_hdr(Seq, Frag) ->
    SeqBytes = uint64_be(Seq),
    FragBytes = uint64_be(Frag),
    [131, $E, SeqBytes, FragBytes, 0].
%% Builds the leading bytes of a continuation fragment with fragment id
%% Frag, using a hash of the calling process' pid as sequence id.
dmsg_frag(Frag) ->
    SeqId = erlang:phash2(self()),
    dmsg_frag(SeqId, Frag).

%% As dmsg_frag/1 with an explicit sequence id. The result is an iolist:
%% version magic 131, tag $F, then Seq and Frag as encoded by uint64_be/1
%% (defined elsewhere in this file).
dmsg_frag(Seq, Frag) ->
    SeqBytes = uint64_be(Seq),
    FragBytes = uint64_be(Frag),
    [131, $F, SeqBytes, FragBytes].
%% Sends a sequence of hand-built distribution fragments from node Offender
%% to the node of process Victim and requires that the connection goes down.
%%
%% Ctrl is the control tuple placed in every `hdr` fragment; when
%% WhereToPutSelf > 0 that element of Ctrl is first replaced by the pid of
%% the sending process. Fragments is a list of {hdr | frg, FragId, Payload}
%% specs, optionally containing the atom `close`, which makes the sender
%% close its distribution port after all fragments are sent.
%%
%% The sender exits unless it sees a nodedown for the victim node within
%% 5 seconds. The caller waits up to 7 seconds for the sender's report,
%% then runs verify_nc/1 on VictimNode; on timeout it exits with
%% unable_to_send.
send_bad_fragments(Offender,VictimNode,Victim,Ctrl,WhereToPutSelf,Fragments) ->
Parent = self(),
Done = make_ref(),
ct:pal("Send: ~p",[Fragments]),
spawn_link(Offender,
fun () ->
Node = node(Victim),
pong = net_adm:ping(Node),
erlang:monitor_node(Node, true),
DCtrl = dctrl(Node),
Ctrl1 = case WhereToPutSelf of
0 ->
Ctrl;
N when N > 0 ->
setelement(N,Ctrl,self())
end,
%% The generator pattern only matches 3-tuples, so `close` is skipped here.
FragData = [case Type of
hdr ->
[dmsg_frag_hdr(FragId),
dmsg_ext(Ctrl1), FragPayload];
frg ->
[dmsg_frag(FragId), FragPayload]
end || {Type, FragId, FragPayload} <- Fragments],
%% The connection must still be up before anything bad is sent.
receive {nodedown, Node} -> exit("premature nodedown")
after 10 -> ok
end,
[ dctrl_send(DCtrl, D) || D <- FragData ],
[ erlang:port_close(DCtrl) || close <- Fragments],
receive {nodedown, Node} -> ok
after 5000 -> exit("missing nodedown")
end,
Parent ! {FragData,Done}
end),
receive
{WhatSent,Done} ->
io:format("Offender sent ~p~n",[WhatSent]),
verify_nc(VictimNode),
ok
after 7000 ->
exit(unable_to_send)
end.
%% Test case: undecodable messages (see send_bad_msgs/3) sitting in the
%% message queue of a process on the Victim node.
%%
%% Phase 1: P is sent `one`, a bad message and `two` while it waits in a
%% selective receive; the connection is expected to go down and P's queue
%% to hold 2 messages.
%% Phase 2: P is suspended, the nodes reconnect, and 5 + 5 bad messages are
%% sent around `three`; while P is suspended the connection must stay up
%% and the queue must hold 13 messages. After the suspending process is
%% killed the connection is expected to go down again, leaving 3 messages,
%% which P finally checks to be `one`, `two`, `three`.
bad_dist_ext_receive(Config) when is_list(Config) ->
{ok, Offender} = start_node(bad_dist_ext_receive_offender),
{ok, Victim} = start_node(bad_dist_ext_receive_victim),
start_node_monitors([Offender,Victim]),
Parent = self(),
P = spawn_link(Victim,
fun () ->
Parent ! {self(), started},
receive check_msgs -> ok end,
bad_dist_ext_check_msgs([one,
two,
three]),
Parent ! {self(), messages_checked},
receive done -> ok end
end),
receive {P, started} -> ok end,
pong = rpc:call(Victim, net_adm, ping, [Offender]),
verify_up(Offender, Victim),
true = lists:member(Offender, rpc:call(Victim, erlang, nodes, [])),
P ! one,
send_bad_msg(Offender, P),
P ! two,
verify_down(Offender, connection_closed, Victim, killed),
{message_queue_len, 2}
= rpc:call(Victim, erlang, process_info, [P, message_queue_len]),
%% S suspends P; the suspension is presumably released when S dies
%% (S is killed further down to let P run again).
Suspended = make_ref(),
S = spawn(Victim,
fun () ->
erlang:suspend_process(P),
Parent ! Suspended,
receive after infinity -> ok end
end),
MS = erlang:monitor(process, S),
receive Suspended -> ok end,
pong = rpc:call(Victim, net_adm, ping, [Offender]),
verify_up(Offender, Victim),
true = lists:member(Offender, rpc:call(Victim, erlang, nodes, [])),
send_bad_msgs(Offender, P, 5),
true = lists:member(Offender, rpc:call(Victim, erlang, nodes, [])),
P ! three,
send_bad_msgs(Offender, P, 5),
%% Make sure bad msgs has reached Victim
rpc:call(Offender, rpc, call, [Victim, erlang, node, []]),
verify_still_up(Offender, Victim),
%% one + two + three + 10 bad messages
{message_queue_len, 13}
= rpc:call(Victim, erlang, process_info, [P, message_queue_len]),
exit(S, bang),
receive {'DOWN', MS, process, S, bang} -> ok end,
verify_down(Offender, connection_closed, Victim, killed),
{message_queue_len, 3}
= rpc:call(Victim, erlang, process_info, [P, message_queue_len]),
P ! check_msgs,
receive {P, messages_checked} -> ok end,
{message_queue_len, 0}
= rpc:call(Victim, erlang, process_info, [P, message_queue_len]),
P ! done,
unlink(P),
verify_no_down(Offender, Victim),
stop_node(Offender),
stop_node(Victim).
%% Test case: undecodable messages discovered through process_info/2.
%%
%% P on the Victim node is suspended with `one` in its queue; 5 + 5 bad
%% messages are sent from the Offender around `two`. Asking only for
%% message_queue_len must report 12 and leave the connection up. Asking
%% for the messages themselves must report just [one, two], after which
%% the connection is expected to be down.
bad_dist_ext_process_info(Config) when is_list(Config) ->
{ok, Offender} = start_node(bad_dist_ext_process_info_offender),
{ok, Victim} = start_node(bad_dist_ext_process_info_victim),
start_node_monitors([Offender,Victim]),
Parent = self(),
P = spawn_link(Victim,
fun () ->
Parent ! {self(), started},
receive check_msgs -> ok end,
bad_dist_ext_check_msgs([one, two]),
Parent ! {self(), messages_checked},
receive done -> ok end
end),
receive {P, started} -> ok end,
P ! one,
Suspended = make_ref(),
S = spawn(Victim,
fun () ->
erlang:suspend_process(P),
Parent ! Suspended,
receive after infinity -> ok end
end),
receive Suspended -> ok end,
pong = rpc:call(Victim, net_adm, ping, [Offender]),
verify_up(Offender, Victim),
send_bad_msgs(Offender, P, 5),
P ! two,
send_bad_msgs(Offender, P, 5),
%% Make sure bad msgs has reached Victim
rpc:call(Offender, rpc, call, [Victim, erlang, node, []]),
verify_still_up(Offender, Victim),
%% one + two + 10 bad messages
{message_queue_len, 12}
= rpc:call(Victim, erlang, process_info, [P, message_queue_len]),
verify_still_up(Offender, Victim),
[{message_queue_len, 2},
{messages, [one, two]}]
= rpc:call(Victim, erlang, process_info, [P, [message_queue_len,
messages]]),
verify_down(Offender, connection_closed, Victim, killed),
P ! check_msgs,
exit(S, bang),
receive {P, messages_checked} -> ok end,
{message_queue_len, 0}
= rpc:call(Victim, erlang, process_info, [P, message_queue_len]),
P ! done,
unlink(P),
verify_no_down(Offender, Victim),
stop_node(Offender),
stop_node(Victim).
%% Test case: an invalid distribution header (send_bad_dhdr/2) and an
%% invalid control message (send_bad_ctl/2) sent from Offender to Victim
%% are each expected to bring the connection down. The nodes are
%% reconnected in between.
bad_dist_ext_control(Config) when is_list(Config) ->
{ok, Offender} = start_node(bad_dist_ext_control_offender),
{ok, Victim} = start_node(bad_dist_ext_control_victim),
start_node_monitors([Offender,Victim]),
pong = rpc:call(Victim, net_adm, ping, [Offender]),
verify_up(Offender, Victim),
send_bad_dhdr(Offender, Victim),
verify_down(Offender, connection_closed, Victim, killed),
pong = rpc:call(Victim, net_adm, ping, [Offender]),
verify_up(Offender, Victim),
send_bad_ctl(Offender, Victim),
verify_down(Offender, connection_closed, Victim, killed),
verify_no_down(Offender, Victim),
stop_node(Offender),
stop_node(Victim).
%% Test case: a bad message received over an old connection must not take
%% down a newer connection between the same two nodes.
%%
%% A bad message is queued at suspended process P, the Offender then
%% disconnects and reconnects, and only after that is P allowed to run and
%% look at its queue. The connection must still be up afterwards and P
%% must find no messages at all.
bad_dist_ext_connection_id(Config) when is_list(Config) ->
{ok, Offender} = start_node(bad_dist_ext_connection_id_offender),
{ok, Victim} = start_node(bad_dist_ext_connection_id_victim),
start_node_monitors([Offender,Victim]),
Parent = self(),
P = spawn_link(Victim,
fun () ->
Parent ! {self(), started},
receive check_msgs -> ok end,
bad_dist_ext_check_msgs([]),
Parent ! {self(), messages_checked},
receive done -> ok end
end),
receive {P, started} -> ok end,
Suspended = make_ref(),
S = spawn(Victim,
fun () ->
erlang:suspend_process(P),
Parent ! Suspended,
receive after infinity -> ok end
end),
MS = erlang:monitor(process, S),
receive Suspended -> ok end,
pong = rpc:call(Victim, net_adm, ping, [Offender]),
verify_up(Offender, Victim),
send_bad_msg(Offender, P),
%% Make sure bad msg has reached Victim
rpc:call(Offender, rpc, call, [Victim, erlang, node, []]),
{message_queue_len, 1}
= rpc:call(Victim, erlang, process_info, [P, message_queue_len]),
true = rpc:call(Offender, net_kernel, disconnect, [Victim]),
verify_down(Offender, disconnect, Victim, connection_closed),
pong = rpc:call(Offender, net_adm, ping, [Victim]),
verify_up(Offender, Victim),
%% We have a new connection between Offender and Victim, bad message
%% should not bring it down.
{message_queue_len, 1}
= rpc:call(Victim, erlang, process_info, [P, message_queue_len]),
exit(S, bang),
receive {'DOWN', MS, process, S, bang} -> ok end,
%% Wait for a while (if the connection is taken down it might take a
%% while).
receive after 2000 -> ok end,
verify_still_up(Offender, Victim),
P ! check_msgs,
receive {P, messages_checked} -> ok end,
{message_queue_len, 0}
= rpc:call(Victim, erlang, process_info, [P, message_queue_len]),
verify_still_up(Offender, Victim),
P ! done,
unlink(P),
verify_no_down(Offender, Victim),
stop_node(Offender),
stop_node(Victim).
%% OTP-14661: Bad message is discovered by erts_msg_attached_data_size
%%
%% P on the Victim node is suspended with `one` in its queue and is then
%% sent one message with a bad tag (dmsg_bad_tag/0). The connection must
%% survive until process_info(P, total_heap_size) is called on the Victim,
%% after which it is expected to be down. P must then find only `one`.
bad_dist_ext_size(Config) when is_list(Config) ->
    %% Node names are specific to this test case so that a node left
    %% behind by bad_dist_ext_process_info/1 cannot collide with them.
    {ok, Offender} = start_node(bad_dist_ext_size_offender),
    %% Rel argument of start_node/3; [] starts the Victim on the default
    %% emulator.
    Prog = [],
    {ok, Victim} = start_node(bad_dist_ext_size_victim, [], Prog),
    start_node_monitors([Offender,Victim]),
    Parent = self(),
    P = spawn_opt(Victim,
                  fun () ->
                          Parent ! {self(), started},
                          receive check_msgs -> ok end, %% DID CRASH HERE
                          bad_dist_ext_check_msgs([one]),
                          Parent ! {self(), messages_checked}
                  end,
                  [link,
                   %% on_heap to force total_heap_size to inspect msg queue
                   {message_queue_data, on_heap}]),
    receive {P, started} -> ok end,
    P ! one,
    Suspended = make_ref(),
    S = spawn(Victim,
              fun () ->
                      erlang:suspend_process(P),
                      Parent ! Suspended,
                      receive after infinity -> ok end
              end),
    receive Suspended -> ok end,
    pong = rpc:call(Victim, net_adm, ping, [Offender]),
    verify_up(Offender, Victim),
    send_bad_msgs(Offender, P, 1, dmsg_bad_tag()),
    %% Make sure bad msgs has reached Victim
    rpc:call(Offender, rpc, call, [Victim, erlang, node, []]),
    verify_still_up(Offender, Victim),
    %% Let process_info(P, total_heap_size) find bad msg and disconnect
    rpc:call(Victim, erlang, process_info, [P, total_heap_size]),
    verify_down(Offender, connection_closed, Victim, killed),
    P ! check_msgs,
    exit(S, bang), % resume Victim
    receive {P, messages_checked} -> ok end,
    unlink(P),
    verify_no_down(Offender, Victim),
    stop_node(Offender),
    stop_node(Victim).
%% Checks that the caller's mailbox holds exactly the given messages, in
%% order, ignoring (and logging) any {'EXIT', _, _} message found while
%% expected messages remain. A differing message crashes with badmatch; a
%% message left over once the list is exhausted makes the caller exit
%% with {unexpected_message, Msg}.
bad_dist_struct_check_msgs([]) ->
    receive
        Extra ->
            exit({unexpected_message, Extra})
    after 0 ->
            ok
    end;
bad_dist_struct_check_msgs([Expected | Rest] = Remaining) ->
    receive
        {'EXIT', _, _} = ExitMsg ->
            io:format("Ignoring exit message: ~p~n",[ExitMsg]),
            bad_dist_struct_check_msgs(Remaining);
        Received ->
            Expected = Received,
            bad_dist_struct_check_msgs(Rest)
    end.
%% Checks that the caller's mailbox holds exactly the given messages, in
%% order. A differing message crashes with badmatch; a message left over
%% once the list is exhausted makes the caller exit with
%% {unexpected_message, Msg}.
bad_dist_ext_check_msgs([]) ->
    receive
        Extra ->
            exit({unexpected_message, Extra})
    after 0 ->
            ok
    end;
bad_dist_ext_check_msgs([Expected | Rest]) ->
    receive
        Received ->
            Expected = Received,
            bad_dist_ext_check_msgs(Rest)
    end.
%% Returns the distribution controller for Node (see dctrl/1). If there
%% is none yet, Node is pinged first (which must succeed) and the lookup
%% is repeated.
ensure_dctrl(Node) ->
    Existing = dctrl(Node),
    if
        Existing =:= undefined ->
            pong = net_adm:ping(Node),
            dctrl(Node);
        true ->
            Existing
    end.
%% Pushes raw Data out through a distribution controller. A port
%% controller gets the data via port_command/2. A pid controller is sent
%% {send, Caller, Ref, Data} and its {Ref, Result} answer is awaited;
%% Result is returned.
dctrl_send(Port, Data) when is_port(Port) ->
    port_command(Port, Data);
dctrl_send(Pid, Data) when is_pid(Pid) ->
    Tag = make_ref(),
    Pid ! {send, self(), Tag, Data},
    receive
        {Tag, Result} ->
            Result
    end.
%% Sends Msg to the process registered as Name on Node by writing a
%% hand-built DOP_REG_SEND packet (plain dist header, control tuple with
%% the caller as sender, encoded message) to Node's distribution
%% controller.
dctrl_dop_reg_send(Node, Name, Msg) ->
    DCtrl = ensure_dctrl(Node),
    Control = dmsg_ext({?DOP_REG_SEND, self(), ?COOKIE, Name}),
    Packet = [dmsg_hdr(), Control, dmsg_ext(Msg)],
    dctrl_send(DCtrl, Packet).
%% Sends Msg to the pid To by writing a hand-built DOP_SEND packet (plain
%% dist header, control tuple, encoded message) to the distribution
%% controller of To's node.
dctrl_dop_send(To, Msg) ->
    DCtrl = ensure_dctrl(node(To)),
    Control = dmsg_ext({?DOP_SEND, ?COOKIE, To}),
    Packet = [dmsg_hdr(), Control, dmsg_ext(Msg)],
    dctrl_send(DCtrl, Packet).
%% As send_bad_structure/5 without a message payload.
send_bad_structure(Offender,Victim,Bad,WhereToPutSelf) ->
send_bad_structure(Offender,Victim,Bad,WhereToPutSelf,[]).
%% Sends the control tuple Bad from node Offender to the node of process
%% Victim and requires that the connection goes down.
%%
%% When WhereToPutSelf > 0 that element of Bad is first replaced by the pid
%% of the sending process; 0 leaves Bad untouched. Unless PayLoad is [], it
%% is encoded and appended after the control tuple.
%%
%% The sender exits unless it sees a nodedown for the victim node within
%% 5 seconds. The caller waits up to 5 seconds for the sender's report and
%% exits with unable_to_send on timeout.
send_bad_structure(Offender,Victim,Bad,WhereToPutSelf,PayLoad) ->
Parent = self(),
Done = make_ref(),
spawn_link(Offender,
fun () ->
Node = node(Victim),
pong = net_adm:ping(Node),
erlang:monitor_node(Node, true),
DCtrl = dctrl(Node),
Bad1 = case WhereToPutSelf of
0 ->
Bad;
N when N > 0 ->
setelement(N,Bad,self())
end,
DData = [dmsg_hdr(),
dmsg_ext(Bad1)] ++
case PayLoad of
[] -> [];
_Other -> [dmsg_ext(PayLoad)]
end,
%% The connection must still be up before anything bad is sent.
receive {nodedown, Node} -> exit("premature nodedown")
after 10 -> ok
end,
dctrl_send(DCtrl, DData),
receive {nodedown, Node} -> ok
after 5000 -> exit("missing nodedown")
end,
Parent ! {DData,Done}
end),
receive
{WhatSent,Done} ->
io:format("Offender sent ~p~n",[WhatSent]),
ok
after 5000 ->
exit(unable_to_send)
end.
%% send_bad_msg() / send_bad_msgs():
%% Send a valid distribution header and control message
%% but an invalid message. This invalid message will be
%% enqueued in the receiver's message queue.

%% Sends a single bad message from BadNode to the process To.
send_bad_msg(BadNode, To) ->
    send_bad_msgs(BadNode, To, 1).

%% Sends Repeat bad messages whose message part is an invalid atom cache
%% reference.
send_bad_msgs(BadNode, To, Repeat) ->
    send_bad_msgs(BadNode, To, Repeat, dmsg_bad_atom_cache_ref()).

%% Sends Repeat messages with BadTerm as the (raw, already encoded) message
%% part. The sending is done by a process on BadNode; the call returns
%% once that process reports that everything has been sent.
send_bad_msgs(BadNode, To, Repeat, BadTerm) when is_atom(BadNode),
                                                 is_pid(To),
                                                 is_integer(Repeat) ->
    Tester = self(),
    Ack = make_ref(),
    Sender = fun () ->
                     Node = node(To),
                     pong = net_adm:ping(Node),
                     DCtrl = dctrl(Node),
                     Packet = [dmsg_hdr(),
                               dmsg_ext({?DOP_SEND, ?COOKIE, To}),
                               BadTerm],
                     repeat(fun () -> dctrl_send(DCtrl, Packet) end, Repeat),
                     Tester ! Ack
             end,
    spawn_link(BadNode, Sender),
    receive
        Ack -> ok
    end.
%% send_bad_ctl():
%% Send a valid distribution header but an invalid control message.
%% The sending is done by a process on BadNode; the call returns once
%% that process reports that the packet has been sent.
send_bad_ctl(BadNode, ToNode) when is_atom(BadNode), is_atom(ToNode) ->
    Tester = self(),
    Ack = make_ref(),
    Sender = fun () ->
                     pong = net_adm:ping(ToNode),
                     %% We create a valid ctl msg and replace an
                     %% atom with an invalid atom cache reference
                     <<131, Replace/binary>> = term_to_binary(replace),
                     Ctl = dmsg_ext({?DOP_REG_SEND,
                                     self(),
                                     ?COOKIE,
                                     replace}),
                     %% The encoded atom `replace` is the tail of Ctl;
                     %% keep everything in front of it.
                     PrefixSize = byte_size(Ctl) - byte_size(Replace),
                     <<CtlPrefix:PrefixSize/binary, Replace/binary>> = Ctl,
                     DCtrl = dctrl(ToNode),
                     Packet = [dmsg_fake_hdr2(),
                               CtlPrefix,
                               dmsg_bad_atom_cache_ref(),
                               dmsg_ext({a, message})],
                     dctrl_send(DCtrl, Packet),
                     Tester ! Ack
             end,
    spawn_link(BadNode, Sender),
    receive
        Ack -> ok
    end.
%% send_bad_dhdr():
%% Send an invalid distribution header (see dmsg_bad_hdr/0) from BadNode
%% to ToNode. The sending is done by a process on BadNode; the call
%% returns once that process reports that the header has been sent.
send_bad_dhdr(BadNode, ToNode) when is_atom(BadNode), is_atom(ToNode) ->
    Tester = self(),
    Ack = make_ref(),
    Sender = fun () ->
                     pong = net_adm:ping(ToNode),
                     dctrl_send(dctrl(ToNode), dmsg_bad_hdr()),
                     Tester ! Ack
             end,
    spawn_link(BadNode, Sender),
    receive
        Ack -> ok
    end.
%% Looks up the distribution controller used for the connection to Node
%% via the emulator's internal debug state. Callers in this file handle
%% the result being a port, a pid, or `undefined`.
dctrl(Node) when is_atom(Node) ->
get_internal_state({dist_ctrl, Node}).
%% Wrapper around erts_debug:get_internal_state/1. If the first call
%% fails with error:undef, access to the internal state is switched on
%% and the call is retried once.
get_internal_state(Op) ->
    try
        erts_debug:get_internal_state(Op)
    catch
        error:undef ->
            erts_debug:set_internal_state(available_internal_state, true),
            erts_debug:get_internal_state(Op)
    end.
%% Wrapper around erts_debug:set_internal_state/2. If the first call
%% fails with error:undef, access to the internal state is switched on
%% and the call is retried once.
set_internal_state(Op, Val) ->
    try
        erts_debug:set_internal_state(Op, Val)
    catch
        error:undef ->
            erts_debug:set_internal_state(available_internal_state, true),
            erts_debug:set_internal_state(Op, Val)
    end.
%% Minimal distribution header, as a list of bytes:
%%   131 - version magic
%%   $D  - dist header tag
%%   0   - no atom cache references
dmsg_hdr() ->
    VersionMagic = 131,
    AtomCacheRefs = 0,
    [VersionMagic, $D, AtomCacheRefs].
%% Invalid distribution header, as a list of bytes:
%%   131 - version magic
%%   $D  - dist header tag
%%   255 - claims 255 atom cache references, none of which follow
dmsg_bad_hdr() ->
    VersionMagic = 131,
    AtomCacheRefs = 255,
    [VersionMagic, $D, AtomCacheRefs].
%% dmsg_fake_hdr1() ->
%% A = <<"fake header atom 1">>,
%% [131, % Version Magic
%% $D, 1, 16#8, 0, size(A), A]. % Fake header
%% Hand-built distribution header announcing three atom cache entries.
%% Layout: version magic 131, dist header tag $D, the entry count 3, two
%% flag bytes, then for each entry an index byte, the length of the atom
%% text and the text itself.
dmsg_fake_hdr2() ->
    Atoms = [{0, <<"fake header atom 1">>},
             {1, <<"atom 2">>},
             {2, <<"atom 3">>}],
    Entries = lists:append([[Ix, byte_size(Text), Text] || {Ix, Text} <- Atoms]),
    [131, % Version Magic
     $D,
     length(Atoms),
     16#88, 16#08 % Flags
     | Entries].
%% Returns the external term format encoding of Term with the leading
%% version magic byte (131) stripped off.
dmsg_ext(Term) ->
    Encoded = term_to_binary(Term),
    <<131, Stripped/binary>> = Encoded,
    Stripped.
%% Raw bytes used by this suite as an invalid atom cache reference:
%% tag $R followed by index 137.
dmsg_bad_atom_cache_ref() ->
    CacheIndex = 137,
    [$R, CacheIndex].
%% Raw bytes used by this suite as a term with a bad tag ($?).
%% Will fail early at heap size calculation.
dmsg_bad_tag() ->
    BadTag = $?,
    [BadTag, 66].
%% Test case: a node started with "-start_epmd false" must still be
%% reachable.
start_epmd_false(Config) when is_list(Config) ->
    %% Start a node with the option -start_epmd false.
    {ok, Peer} = start_node(start_epmd_false, "-start_epmd false"),
    %% We should be able to ping it, as epmd was started by us:
    pong = net_adm:ping(Peer),
    stop_node(Peer),
    ok.
%% Test case: two relay nodes are started with this module as their
%% -epmd_module. The second node is told the first node's listen port on
%% its command line, so only the second can find (and ping) the first.
epmd_module(Config) when is_list(Config) ->
%% We need a relay node to test this, since the test node uses the
%% standard epmd module.
Sock1 = start_relay_node(epmd_module_node1, "-epmd_module " ++ ?MODULE_STRING),
Node1 = inet_rpc_nodename(Sock1),
%% Ask what port it's listening on - it won't have registered with
%% epmd.
{ok, {ok, Port1}} = do_inet_rpc(Sock1, application, get_env, [kernel, dist_listen_port]),
%% Start a second node, passing the port number as a secret
%% argument.
Sock2 = start_relay_node(epmd_module_node2, "-epmd_module " ++ ?MODULE_STRING
++ " -other_node_port " ++ integer_to_list(Port1)),
Node2 = inet_rpc_nodename(Sock2),
%% Node 1 can't ping node 2
{ok, pang} = do_inet_rpc(Sock1, net_adm, ping, [Node2]),
{ok, []} = do_inet_rpc(Sock1, erlang, nodes, []),
{ok, []} = do_inet_rpc(Sock2, erlang, nodes, []),
%% But node 2 can ping node 1
{ok, pong} = do_inet_rpc(Sock2, net_adm, ping, [Node1]),
{ok, [Node2]} = do_inet_rpc(Sock1, erlang, nodes, []),
{ok, [Node1]} = do_inet_rpc(Sock2, erlang, nodes, []),
stop_relay_node(Sock2),
stop_relay_node(Sock1).
%% epmd_module functions:
%% Part of the epmd module interface this suite implements for the
%% epmd_module/1 test case. Always returns `ignore`.
start_link() ->
ignore.
%% epmd module interface: register_node/2 delegates to register_node/3
%% with the inet_tcp driver.
register_node(Name, Port) ->
    register_node(Name, Port, inet_tcp).

%% Instead of registering anywhere, stores Port in the kernel application
%% environment (dist_listen_port), where epmd_module/1 reads it back, and
%% returns {ok, Creation} with a random creation in 1..3.
register_node(_Name, Port, _Driver) ->
    %% Save the port number we're listening on.
    application:set_env(kernel, dist_listen_port, Port),
    {ok, rand:uniform(3)}.
%% epmd module interface: answers every lookup with {port, Port, 5},
%% where Port comes from the -other_node_port command line argument, or
%% is 42 when that argument was not given.
port_please(_Name, _Ip) ->
    Port = case init:get_argument(other_node_port) of
               error ->
                   %% None specified. Default to 42.
                   42;
               {ok, [[PortS]]} ->
                   %% Port number given on command line.
                   list_to_integer(PortS)
           end,
    Version = 5,
    {port, Port, Version}.
%% epmd_module callback: every node is resolved to the IPv4 loopback
%% address, whatever name, address or address family is asked for.
address_please(_Name, _Address, _AddressFamily) ->
    %% Use localhost.
    {ok, {127,0,0,1}}.
%%% Utilities
%% Returns the current Erlang monotonic time in milliseconds.
timestamp() ->
    NowMs = erlang:monotonic_time(millisecond),
    NowMs.
%% Starts a node with no extra command line arguments and no release
%% specification; see start_node/3 for the accepted first argument.
start_node(NameOrConfig) ->
    NoArgs = [],
    NoRelease = [],
    start_node(NameOrConfig, NoArgs, NoRelease).
%% Starts a node with the given command line arguments and no release
%% specification; see start_node/3 for the accepted first argument.
start_node(NameOrConfig, Args) ->
    NoRelease = [],
    start_node(NameOrConfig, Args, NoRelease).
%% Starts a slave node through test_server:start_node/3.
%%
%% First argument is either
%%   - an atom, used as the node name as is, or
%%   - a Common Test Config list, in which case a name is generated from
%%     the module name, the 'testcase' value in Config, the current system
%%     time in seconds and a positive unique integer.
%% Args is a string of extra command line arguments; the cookie of the
%% current node and the directory of this module (as -pa) are appended.
%% Rel, when not empty, is passed on as {erl, [{release, Rel}]}.
%% Returns whatever test_server:start_node/3 returns.
start_node(Name, Args, Rel) when is_atom(Name), is_list(Rel) ->
    CodeDir = filename:dirname(code:which(?MODULE)),
    CookieStr = atom_to_list(erlang:get_cookie()),
    NodeArgs = lists:append([Args,
                             " -setcookie ", CookieStr,
                             " -pa \"", CodeDir, "\""]),
    ReleaseOpts =
        case Rel of
            [] -> [];
            _ -> [{erl, [{release, Rel}]}]
        end,
    test_server:start_node(Name, slave, [{args, NodeArgs} | ReleaseOpts]);
start_node(Config, Args, Rel) when is_list(Config), is_list(Rel) ->
    ModulePart = atom_to_list(?MODULE),
    CasePart = atom_to_list(proplists:get_value(testcase, Config)),
    TimePart = integer_to_list(erlang:system_time(second)),
    UniquePart = integer_to_list(erlang:unique_integer([positive])),
    Name = list_to_atom(lists:append([ModulePart, "-", CasePart, "-",
                                      TimePart, "-", UniquePart])),
    start_node(Name, Args, Rel).
%% Runs the verify_nc/1 check on Node and then stops the node with
%% test_server:stop_node/1, whose result is returned.
stop_node(Node) ->
verify_nc(Node),
test_server:stop_node(Node).
%% Runs erts_test_utils:check_node_dist/1 in a process spawned on Node and
%% waits for its verdict. Returns ok when the check reports ok. Any other
%% report, or the death of the checking process before it reports, is
%% logged and fails the test case.
verify_nc(Node) ->
    Parent = self(),
    Tag = make_ref(),
    Checker =
        spawn(Node,
              fun() ->
                      Parent ! {Tag, erts_test_utils:check_node_dist(fun(E) -> E end)}
              end),
    Mon = monitor(process, Checker),
    receive
        {Tag, ok} ->
            %% Success; drop the monitor together with any pending 'DOWN'.
            demonitor(Mon, [flush]),
            ok;
        {Tag, Problem} ->
            ct:log("~p", [Problem]),
            ct:fail(failed_nc_refc_check);
        {'DOWN', Mon, _, _, _} = DownMsg ->
            ct:log("~p", [DownMsg]),
            ct:fail(crashed_nc_refc_check)
    end.
%% Spawns a linked process on Node that
%%   1. sends a reference back to the caller with dctrl_dop_send/2,
%%   2. waits 300 ms, and
%%   3. calls set_internal_state(block, MS + 300).
%% The caller waits for the reference and then for another 300 ms before
%% returning ok.
%% NOTE(review): set_internal_state(block, T) presumably blocks the remote
%% emulator for T milliseconds -- confirm against its definition.
freeze_node(Node, MS) ->
    Margin = 300,
    Started = make_ref(),
    Caller = self(),
    spawn_link(Node,
               fun() ->
                       dctrl_dop_send(Caller, Started),
                       receive after Margin -> ok end,
                       set_internal_state(block, MS + Margin)
               end),
    receive
        Started -> ok
    end,
    receive
    after Margin -> ok
    end.
%% Builds the node name atom 'N@H' from a relay handle {N, H, Socket},
%% where N and H are strings.
inet_rpc_nodename({N, H, _Sock}) ->
    list_to_atom(N ++ [$@ | H]).
%% Performs a remote call through a relay handle {_, _, Socket}: sends
%% {M, F, A} encoded with term_to_binary/1 and waits for one reply packet.
%% Returns {ok, Term} with the decoded reply, or {error, RecvResult} when
%% gen_tcp:recv/2 does not return {ok, _}.
do_inet_rpc({_, _, Sock}, M, F, A) ->
    Request = term_to_binary({M, F, A}),
    gen_tcp:send(Sock, Request),
    case gen_tcp:recv(Sock, 0) of
        {ok, Reply} ->
            {ok, binary_to_term(Reply)};
        Failure ->
            {error, Failure}
    end.
%% Entry point of the relay server: connects to Host on the port given as
%% the string PortList (binary mode, 4-byte packet headers, passive) and
%% then serves requests in inet_rpc_server_loop/1.
inet_rpc_server([Host, PortList]) ->
    SockOpts = [binary, {packet, 4}, {active, false}],
    {ok, Sock} = gen_tcp:connect(Host, list_to_integer(PortList), SockOpts),
    inet_rpc_server_loop(Sock).
%% Serves relay requests forever: each received packet is decoded to
%% {M, F, A}, applied, and the result (or the caught exception term) is
%% sent back encoded with term_to_binary/1. When receiving fails for any
%% reason the whole emulator is halted.
inet_rpc_server_loop(Sock) ->
    case gen_tcp:recv(Sock, 0) of
        {ok, Request} ->
            {M, F, A} = binary_to_term(Request),
            Outcome = (catch apply(M, F, A)),
            Reply = term_to_binary(Outcome),
            gen_tcp:send(Sock, Reply),
            inet_rpc_server_loop(Sock);
        _ ->
            erlang:halt()
    end.
%% Starts a peer node that is deliberately given a different cookie
%% ("NOT" ++ our cookie), so it cannot be reached through ordinary
%% distribution, and that runs inet_rpc_server/1 connected back to a
%% temporary listen socket opened here.
%%
%% Node is the name for test_server:start_node/3 and Args a string of
%% extra command line arguments.
%% Returns the relay handle {Name, Host, Socket}, with Name and Host the
%% two parts of the started node's name as strings, for use with
%% do_inet_rpc/4, inet_rpc_nodename/1 and stop_relay_node/1.
start_relay_node(Node, Args) ->
    Pa = filename:dirname(code:which(?MODULE)),
    Cookie = "NOT" ++ atom_to_list(erlang:get_cookie()),
    {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 4}, {active, false}]),
    try
        {ok, Port} = inet:port(LSock),
        {ok, Host} = inet:gethostname(),
        RunArg = "-run " ++ atom_to_list(?MODULE) ++ " inet_rpc_server " ++
            Host ++ " " ++ integer_to_list(Port),
        {ok, NN} = test_server:start_node(Node, peer,
                                          [{args, Args ++
                                                " -setcookie " ++ Cookie ++
                                                " -pa " ++ Pa ++ " " ++
                                                RunArg}]),
        [N, H] = string:lexemes(atom_to_list(NN), "@"),
        {ok, Sock} = gen_tcp:accept(LSock),
        %% The cookies differ, so a regular ping must fail.
        pang = net_adm:ping(NN),
        {N, H, Sock}
    after
        %% Only one connection is ever accepted; do not leak the listen
        %% socket, neither on success nor when a step above fails.
        gen_tcp:close(LSock)
    end.
%% Stops a relay node started with start_relay_node/2: asks it to halt,
%% closes the relay socket and then waits (see wait_dead/3, 10 attempts)
%% for the node to disappear. Returns the result of wait_dead/3.
stop_relay_node({N, H, Sock} = Relay) ->
    %% do_inet_rpc/4 takes the whole relay handle, not the bare socket.
    %% Passing only the socket made the call fail with function_clause,
    %% silently swallowed by 'catch', so the halt request was never sent.
    %% Both steps stay best-effort: the node may already be gone.
    catch do_inet_rpc(Relay, erlang, halt, []),
    catch gen_tcp:close(Sock),
    wait_dead(N, H, 10).
%% Polls erl_epmd:port_please/2 for node N on host H, at most X times
%% with one second between attempts.
%% Returns ok once the answer is noport, {error, {not_dead, N, H}} when
%% the attempts run out, and {error, {unexpected, Answer}} for any other
%% answer.
wait_dead(N, H, 0) ->
    {error, {not_dead, N, H}};
wait_dead(N, H, AttemptsLeft) ->
    case erl_epmd:port_please(N, H) of
        noport ->
            ok;
        {port, _, _} ->
            %% Still registered; give it another second.
            timer:sleep(1000),
            wait_dead(N, H, AttemptsLeft - 1);
        Unexpected ->
            {error, {unexpected, Unexpected}}
    end.
%% Spawns one node_monitor/1 process on every node in Nodes; all of them
%% report to the calling process. Returns ok.
start_node_monitors(Nodes) ->
    Master = self(),
    _ = [spawn(Node, fun() -> node_monitor(Master) end) || Node <- Nodes],
    ok.
%% Subscribes to node up/down events (with nodedown_reason, all node
%% types) and reports the nodes that are already connected to Master as
%% {nodeup, node(), Node} before entering node_monitor_loop/1.
%% The set of connected nodes is sampled before and after subscribing; if
%% the two samples differ, the subscription is undone, pending node events
%% are flushed and the whole procedure is retried.
node_monitor(Master) ->
    MonitorOpts = [nodedown_reason, {node_type, all}],
    Before = nodes(connected),
    net_kernel:monitor_nodes(true, MonitorOpts),
    After = nodes(connected),
    Stable = lists:sort(Before) == lists:sort(After),
    if
        Stable ->
            _ = [Master ! {nodeup, node(), Node} || Node <- Before],
            io:format("~p ~p: ~p~n",
                      [node(), erlang:system_time(microsecond), Before]),
            node_monitor_loop(Master);
        true ->
            %% The connection set changed while subscribing; start over.
            net_kernel:monitor_nodes(false, MonitorOpts),
            flush_node_changes(),
            node_monitor(Master)
    end.
%% Removes every {nodeup, _, _} and {nodedown, _, _} message currently in
%% the caller's mailbox, leaving other messages in place. Returns ok.
flush_node_changes() ->
    receive
        {nodeup, _Node, _InfoList} ->
            flush_node_changes();
        {nodedown, _Node, _InfoList} ->
            flush_node_changes()
    after 0 ->
        ok
    end.
%% Forwards node events to Master forever:
%%   {nodeup, Node, _}      -> Master ! {nodeup, node(), Node}
%%   {nodedown, Node, Info} -> Master ! {nodedown, node(), Node, Reason}
%% where Reason is the nodedown_reason entry of Info, or 'undefined' when
%% there is none. Every event is also printed with a timestamp.
node_monitor_loop(Master) ->
    Event =
        receive
            {nodeup, Node, _InfoList} = Up ->
                Master ! {nodeup, node(), Node},
                Up;
            {nodedown, Node, InfoList} = Down ->
                Reason =
                    case lists:keyfind(nodedown_reason, 1, InfoList) of
                        {nodedown_reason, R} -> R;
                        _ -> undefined
                    end,
                Master ! {nodedown, node(), Node, Reason},
                Down
        end,
    io:format("~p ~p: ~p~n", [node(), erlang:system_time(microsecond), Event]),
    node_monitor_loop(Master).
%% Blocks until the caller has received {nodeup, A, B} and then
%% {nodeup, B, A}, i.e. until both nodes have reported the other as up.
%% Returns ok.
verify_up(A, B) ->
    lists:foreach(fun({Observer, Seen}) ->
                          receive
                              {nodeup, Observer, Seen} -> ok
                          end
                  end,
                  [{A, B}, {B, A}]).
%% Asserts that A and B currently list each other among their connected
%% nodes and that no nodedown report about the pair is pending (see
%% verify_no_down/2).
verify_still_up(A, B) ->
    ConnectedToA = rpc:call(A, erlang, nodes, [connected]),
    true = lists:member(B, ConnectedToA),
    ConnectedToB = rpc:call(B, erlang, nodes, [connected]),
    true = lists:member(A, ConnectedToB),
    verify_no_down(A, B).
%% Fails the test case (with the offending message as reason) if the
%% caller's mailbox holds a {nodedown, A, B, _} or {nodedown, B, A, _}
%% report. Does not wait; returns ok when neither is present.
verify_no_down(A, B) ->
    lists:foreach(fun({Observer, Lost}) ->
                          receive
                              {nodedown, Observer, Lost, _} = Msg ->
                                  ct:fail(Msg)
                          after 0 ->
                              ok
                          end
                  end,
                  [{A, B}, {B, A}]).
%% verify_down(A, B) ->
%% receive {nodedown, A, B, _} -> ok end,
%% receive {nodedown, B, A, _} -> ok end.
%% Blocks until A has reported B down and then until B has reported A
%% down. The reason in A's report must match ReasonA and the one in B's
%% report ReasonB; a mismatch crashes with badmatch on the whole message.
%% Returns ok.
verify_down(A, ReasonA, B, ReasonB) ->
    ExpectDown =
        fun(Observer, Lost, Reason) ->
                receive
                    {nodedown, Observer, Lost, _} = Msg ->
                        {nodedown, Observer, Lost, Reason} = Msg
                end
        end,
    ExpectDown(A, B, ReasonA),
    ExpectDown(B, A, ReasonB),
    ok.
%% Returns the host part of the current node name, i.e. everything after
%% the first $@ in atom_to_list(node()), as a string.
hostname() ->
    NodeStr = atom_to_list(node()),
    from($@, NodeStr).
%% Returns what follows the first occurrence of H in the list, or [] when
%% H does not occur at all.
from(H, List) ->
    case lists:dropwhile(fun(Elem) -> Elem =/= H end, List) of
        [_Found | Tail] -> Tail;
        [] -> []
    end.
%% fun_spawn(Fun) ->
%% fun_spawn(Fun, []).
%% fun_spawn(Fun, Args) ->
%% spawn_link(erlang, apply, [Fun, Args]).
%% Returns the command line flag (with surrounding spaces) that gives a
%% new node the same kind of name as this one: " -name " when
%% net_kernel:longnames() is true, " -sname " when it is false.
long_or_short() ->
    UsesLongNames = net_kernel:longnames(),
    case UsesLongNames of
        false -> " -sname ";
        true -> " -name "
    end.
%% Calls Fun() repeatedly, pausing 10 ms after every 'false', until it
%% returns 'true'. Returns ok.
until(Fun) ->
    Done = Fun(),
    case Done of
        false ->
            timer:sleep(10),
            until(Fun);
        true ->
            ok
    end.
%% Calls Fun() over and over and never returns normally; the only way out
%% is an exception raised by Fun or the process being terminated.
forever(Fun) ->
    _Ignored = Fun(),
    forever(Fun).
%% Calls set_internal_state(abort, Why).
%% NOTE(review): presumably this makes the emulator abort with Why as the
%% reason -- confirm against set_internal_state/2.
abort(Why) ->
set_internal_state(abort, Why).
%% Spawns a linked busy_dist_port_tracer/0 process and installs it as the
%% system monitor for busy_dist_port events. Returns the tracer pid, to be
%% handed to stop_busy_dist_port_tracer/1.
start_busy_dist_port_tracer() ->
    Tracer = spawn_link(fun busy_dist_port_tracer/0),
    erlang:system_monitor(Tracer, [busy_dist_port]),
    Tracer.
%% Stops a tracer process: when given a pid it is unlinked and sent an
%% exit signal with reason 'bye'; any other argument is ignored.
%% Returns true in both cases.
stop_busy_dist_port_tracer(Tracer) ->
    case is_pid(Tracer) of
        true ->
            unlink(Tracer),
            exit(Tracer, bye);
        false ->
            true
    end.
%% Tracer loop: prints every {monitor, _, busy_dist_port, _} message it
%% receives with erlang:display/1 and keeps waiting for the next one.
%% Never returns.
busy_dist_port_tracer() ->
    receive
        {monitor, _SuspendedProcess, busy_dist_port, _Port} = Event ->
            erlang:display(Event)
    end,
    busy_dist_port_tracer().
%% Calls Fun() N times, discarding the results, and returns ok.
%% N counts down by one per call and the recursion ends only at exactly 0.
repeat(Fun, N) ->
    case N of
        0 ->
            ok;
        _ ->
            Fun(),
            repeat(Fun, N - 1)
    end.
%% Encodes a list of code points as an atom in external term format
%% (without the leading version byte), as a list of bytes:
%%   - fewer than 256 UTF-8 bytes: SMALL_ATOM_UTF8_EXT, 1-byte length, bytes
%%   - otherwise:                  ATOM_UTF8_EXT, 2-byte big-endian length,
%%                                 bytes
string_to_atom_ext(String) ->
    Utf8 = string_to_utf8_list(String),
    case length(Utf8) of
        Len when Len < 256 ->
            [?SMALL_ATOM_UTF8_EXT, Len | Utf8];
        Len ->
            [?ATOM_UTF8_EXT, Len div 256, Len rem 256 | Utf8]
    end.
%% Creates an atom from a list of code points by building its external
%% term format encoding (version byte followed by string_to_atom_ext/1)
%% and decoding it with binary_to_term/1.
string_to_atom(String) ->
    Encoded = list_to_binary([?VERSION_MAGIC | string_to_atom_ext(String)]),
    binary_to_term(Encoded).
%% Converts a list of integer code points (0..16#10FFFF) to the list of
%% bytes of their UTF-8 encoding. Each code point becomes 1 to 4 bytes
%% depending on its magnitude; anything outside the range, or a
%% non-integer element, matches no clause.
string_to_utf8_list([]) ->
    [];
string_to_utf8_list([CP | Rest]) when is_integer(CP),
                                      0 =< CP,
                                      CP =< 16#7F ->
    %% 0xxxxxxx
    [CP | string_to_utf8_list(Rest)];
string_to_utf8_list([CP | Rest]) when is_integer(CP),
                                      16#80 =< CP,
                                      CP =< 16#7FF ->
    %% 110xxxxx 10xxxxxx
    <<B1, B2>> = <<2#110:3, (CP bsr 6):5,
                   2#10:2, CP:6>>,
    [B1, B2 | string_to_utf8_list(Rest)];
string_to_utf8_list([CP | Rest]) when is_integer(CP),
                                      16#800 =< CP,
                                      CP =< 16#FFFF ->
    %% 1110xxxx 10xxxxxx 10xxxxxx
    <<B1, B2, B3>> = <<2#1110:4, (CP bsr 12):4,
                       2#10:2, (CP bsr 6):6,
                       2#10:2, CP:6>>,
    [B1, B2, B3 | string_to_utf8_list(Rest)];
string_to_utf8_list([CP | Rest]) when is_integer(CP),
                                      16#10000 =< CP,
                                      CP =< 16#10FFFF ->
    %% 11110xxx 10xxxxxx 10xxxxxx 10xxxxxx
    <<B1, B2, B3, B4>> = <<2#11110:5, (CP bsr 18):3,
                           2#10:2, (CP bsr 12):6,
                           2#10:2, (CP bsr 6):6,
                           2#10:2, CP:6>>,
    [B1, B2, B3, B4 | string_to_utf8_list(Rest)].
%% Fabricates a pid by decoding a hand-built PID_EXT term:
%%   version byte, PID_EXT, node name, Number (32 bits), Serial (32 bits),
%%   Creation (8 bits).
%% The node may be given as an atom or as an already encoded node name
%% (external format without the version byte).
%% Exits with {badarg, mk_pid, Args} when decoding fails with badarg, and
%% with {unexpected_binary_to_term_result, Other} for any other outcome
%% that is not a pid.
mk_pid({NodeName, Creation}, Number, Serial) when is_atom(NodeName) ->
    <<?VERSION_MAGIC, EncodedName/binary>> = term_to_binary(NodeName),
    mk_pid({EncodedName, Creation}, Number, Serial);
mk_pid({NodeNameExt, Creation}, Number, Serial) ->
    %% 'catch' also turns exits from the uint*/1 helpers into values.
    Decoded = (catch binary_to_term(list_to_binary([?VERSION_MAGIC,
                                                    ?PID_EXT,
                                                    NodeNameExt,
                                                    uint32_be(Number),
                                                    uint32_be(Serial),
                                                    uint8(Creation)]))),
    case Decoded of
        Pid when is_pid(Pid) ->
            Pid;
        {'EXIT', {badarg, _}} ->
            exit({badarg, mk_pid, [{NodeNameExt, Creation}, Number, Serial]});
        Other ->
            exit({unexpected_binary_to_term_result, Other})
    end.
%% Fabricates a port identifier by decoding a hand-built PORT_EXT term:
%%   version byte, PORT_EXT, node name, Number (32 bits), Creation (8 bits).
%% The node may be given as an atom or as an already encoded node name
%% (external format without the version byte).
%% Exits with {badarg, mk_port, Args} when decoding fails with badarg, and
%% with {unexpected_binary_to_term_result, Other} for any other outcome
%% that is not a port.
mk_port({NodeName, Creation}, Number) when is_atom(NodeName) ->
    <<?VERSION_MAGIC, EncodedName/binary>> = term_to_binary(NodeName),
    mk_port({EncodedName, Creation}, Number);
mk_port({NodeNameExt, Creation}, Number) ->
    %% 'catch' also turns exits from the uint*/1 helpers into values.
    Decoded = (catch binary_to_term(list_to_binary([?VERSION_MAGIC,
                                                    ?PORT_EXT,
                                                    NodeNameExt,
                                                    uint32_be(Number),
                                                    uint8(Creation)]))),
    case Decoded of
        Port when is_port(Port) ->
            Port;
        {'EXIT', {badarg, _}} ->
            exit({badarg, mk_port, [{NodeNameExt, Creation}, Number]});
        Other ->
            exit({unexpected_binary_to_term_result, Other})
    end.
%% Fabricates a reference by decoding a hand-built external term.
%%   - A single-element number list produces a REFERENCE_EXT term:
%%       version byte, REFERENCE_EXT, node name, Number (32 bits),
%%       Creation (8 bits).
%%   - Any other number list produces a NEW_REFERENCE_EXT term:
%%       version byte, NEW_REFERENCE_EXT, length (16 bits), node name,
%%       Creation (8 bits), then each number as 32 bits.
%% The node may be given as an atom or as an already encoded node name
%% (external format without the version byte).
%% Exits with {badarg, mk_ref, Args} when decoding fails with badarg, and
%% with {unexpected_binary_to_term_result, Other} for any other outcome
%% that is not a reference.
mk_ref({NodeName, Creation}, [Number] = Numbers) when is_atom(NodeName),
                                                      is_integer(Creation),
                                                      is_integer(Number) ->
    <<?VERSION_MAGIC, EncodedName/binary>> = term_to_binary(NodeName),
    mk_ref({EncodedName, Creation}, Numbers);
mk_ref({NodeNameExt, Creation}, [Number]) when is_integer(Creation),
                                               is_integer(Number) ->
    %% 'catch' also turns exits from the uint*/1 helpers into values.
    Decoded = (catch binary_to_term(list_to_binary([?VERSION_MAGIC,
                                                    ?REFERENCE_EXT,
                                                    NodeNameExt,
                                                    uint32_be(Number),
                                                    uint8(Creation)]))),
    case Decoded of
        Ref when is_reference(Ref) ->
            Ref;
        {'EXIT', {badarg, _}} ->
            exit({badarg, mk_ref, [{NodeNameExt, Creation}, [Number]]});
        Other ->
            exit({unexpected_binary_to_term_result, Other})
    end;
mk_ref({NodeName, Creation}, Numbers) when is_atom(NodeName),
                                           is_integer(Creation),
                                           is_list(Numbers) ->
    <<?VERSION_MAGIC, EncodedName/binary>> = term_to_binary(NodeName),
    mk_ref({EncodedName, Creation}, Numbers);
mk_ref({NodeNameExt, Creation}, Numbers) when is_integer(Creation),
                                              is_list(Numbers) ->
    %% 'catch' also turns exits from the uint*/1 helpers into values.
    Decoded = (catch binary_to_term(
                       list_to_binary([?VERSION_MAGIC,
                                       ?NEW_REFERENCE_EXT,
                                       uint16_be(length(Numbers)),
                                       NodeNameExt,
                                       uint8(Creation),
                                       [uint32_be(N) || N <- Numbers]]))),
    case Decoded of
        Ref when is_reference(Ref) ->
            Ref;
        {'EXIT', {badarg, _}} ->
            exit({badarg, mk_ref, [{NodeNameExt, Creation}, Numbers]});
        Other ->
            exit({unexpected_binary_to_term_result, Other})
    end.
%% Returns the 8 bytes of an unsigned 64-bit integer, most significant
%% byte first, as a list. Exits with {badarg, uint64_be, [Uint]} for
%% anything that is not an integer in the range 0..2^64-1.
uint64_be(Uint) when is_integer(Uint), 0 =< Uint, Uint < 1 bsl 64 ->
    binary_to_list(<<Uint:64/big-unsigned>>);
uint64_be(Uint) ->
    exit({badarg, uint64_be, [Uint]}).
%% Returns the 4 bytes of an unsigned 32-bit integer, most significant
%% byte first, as a list. Exits with {badarg, uint32_be, [Uint]} for
%% anything that is not an integer in the range 0..2^32-1.
uint32_be(Uint) when is_integer(Uint), 0 =< Uint, Uint < 1 bsl 32 ->
    binary_to_list(<<Uint:32/big-unsigned>>);
uint32_be(Uint) ->
    exit({badarg, uint32_be, [Uint]}).
%% Returns the 2 bytes of an unsigned 16-bit integer, most significant
%% byte first, as a list. Exits with {badarg, uint16_be, [Uint]} for
%% anything that is not an integer in the range 0..65535.
uint16_be(Uint) when is_integer(Uint), 0 =< Uint, Uint < 1 bsl 16 ->
    binary_to_list(<<Uint:16/big-unsigned>>);
uint16_be(Uint) ->
    exit({badarg, uint16_be, [Uint]}).
%% Returns Uint itself when it is an integer in the range 0..255 (a single
%% byte, not a list). Exits with {badarg, uint8, [Uint]} otherwise.
uint8(Uint) when is_integer(Uint), 0 =< Uint, Uint < 1 bsl 8 ->
    <<Byte>> = <<Uint:8/unsigned>>,
    Byte;
uint8(Uint) ->
    exit({badarg, uint8, [Uint]}).
%% Free memory in MB.
%% Sums the free_memory entry of memsup:get_system_memory_data() with the
%% cached_memory and buffered_memory entries (each counted as 0 when
%% absent) and divides by 1024*1024. An 'undef' error, e.g. when memsup
%% is not available, fails the test case with {"os_mon not built"}.
free_memory() ->
    try
        SMD = memsup:get_system_memory_data(),
        %% Value of an optional entry, 0 when the key is missing.
        Optional =
            fun(Key) ->
                    case lists:keysearch(Key, 1, SMD) of
                        {value, {Key, Amount}} -> Amount;
                        false -> 0
                    end
            end,
        {value, {free_memory, Free}} = lists:keysearch(free_memory, 1, SMD),
        TotFree = Free + Optional(cached_memory) + Optional(buffered_memory),
        TotFree div (1024 * 1024)
    catch
        error:undef ->
            ct:fail({"os_mon not built"})
    end.