%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 1997-2011. All Rights Reserved. %% %% The contents of this file are subject to the Erlang Public License, %% Version 1.1, (the "License"); you may not use this file except in %% compliance with the License. You should have received a copy of the %% Erlang Public License along with this software. If not, it can be %% retrieved online at http://www.erlang.org/. %% %% Software distributed under the License is distributed on an "AS IS" %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See %% the License for the specific language governing rights and limitations %% under the License. %% %% %CopyrightEnd% %% -module(distribution_SUITE). -compile(r12). %% Tests distribution and the tcp driver. -include_lib("test_server/include/test_server.hrl"). -export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1, init_per_group/2,end_per_group/2, ping/1, bulk_send_small/1, bulk_send_big/1, bulk_send_bigbig/1, local_send_small/1, local_send_big/1, local_send_legal/1, link_to_busy/1, exit_to_busy/1, lost_exit/1, link_to_dead/1, link_to_dead_new_node/1, applied_monitor_node/1, ref_port_roundtrip/1, nil_roundtrip/1, trap_bif_1/1, trap_bif_2/1, trap_bif_3/1, stop_dist/1, dist_auto_connect_never/1, dist_auto_connect_once/1, dist_parallel_send/1, atom_roundtrip/1, atom_roundtrip_r12b/1, contended_atom_cache_entry/1, bad_dist_structure/1, bad_dist_ext_receive/1, bad_dist_ext_process_info/1, bad_dist_ext_control/1, bad_dist_ext_connection_id/1]). -export([init_per_testcase/2, end_per_testcase/2]). %% Internal exports. -export([sender/3, receiver2/2, dummy_waiter/0, dead_process/0, roundtrip/1, bounce/1, do_dist_auto_connect/1, inet_rpc_server/1, dist_parallel_sender/3, dist_parallel_receiver/0, dist_evil_parallel_receiver/0, sendersender/4, sendersender2/4]). suite() -> [{ct_hooks,[ts_install_cth]}]. all() -> [ping, {group, bulk_send}, {group, local_send}, link_to_busy, exit_to_busy, lost_exit, link_to_dead, link_to_dead_new_node, applied_monitor_node, ref_port_roundtrip, nil_roundtrip, stop_dist, {group, trap_bif}, {group, dist_auto_connect}, dist_parallel_send, atom_roundtrip, atom_roundtrip_r12b, contended_atom_cache_entry, bad_dist_structure, {group, bad_dist_ext}]. groups() -> [{bulk_send, [], [bulk_send_small, bulk_send_big, bulk_send_bigbig]}, {local_send, [], [local_send_small, local_send_big, local_send_legal]}, {trap_bif, [], [trap_bif_1, trap_bif_2, trap_bif_3]}, {dist_auto_connect, [], [dist_auto_connect_never, dist_auto_connect_once]}, {bad_dist_ext, [], [bad_dist_ext_receive, bad_dist_ext_process_info, bad_dist_ext_control, bad_dist_ext_connection_id]}]. init_per_suite(Config) -> Config. end_per_suite(_Config) -> ok. init_per_group(_GroupName, Config) -> Config. end_per_group(_GroupName, Config) -> Config. -define(DEFAULT_TIMETRAP, 4*60*1000). init_per_testcase(Func, Config) when is_atom(Func), is_list(Config) -> Dog=?t:timetrap(?DEFAULT_TIMETRAP), [{watchdog, Dog},{testcase, Func}|Config]. end_per_testcase(Func, Config) when is_atom(Func), is_list(Config) -> Dog=?config(watchdog, Config), ?t:timetrap_cancel(Dog). %%% Don't be too hard on vxworks, the cross server gets nodedown %%% cause the card is too busy if we don't sleep a little between pings. sleep() -> case os:type() of vxworks -> receive after 10 -> ok end; _ -> ok end. ping(doc) -> ["Tests pinging a node in different ways."]; ping(Config) when is_list(Config) -> Times = 1024, %% Ping a non-existing node many times. This used to crash the emulator %% on Windows. ?line Host = hostname(), ?line BadName = list_to_atom("__pucko__@" ++ Host), ?line io:format("Pinging ~s (assumed to not exist)", [BadName]), ?line test_server:do_times(Times, fun() -> pang = net_adm:ping(BadName), sleep() end), %% Pings another node. ?line {ok, OtherNode} = start_node(distribution_SUITE_other), ?line io:format("Pinging ~s (assumed to exist)", [OtherNode]), ?line test_server:do_times(Times, fun() -> pong = net_adm:ping(OtherNode),sleep() end), ?line stop_node(OtherNode), %% Pings our own node many times. ?line Node = node(), ?line io:format("Pinging ~s (the same node)", [Node]), ?line test_server:do_times(Times, fun() -> pong = net_adm:ping(Node),sleep() end), ok. bulk_send_small(Config) when is_list(Config) -> ?line bulk_send(64, 32). bulk_send_big(Config) when is_list(Config) -> ?line bulk_send(32, 64). bulk_send_bigbig(Config) when is_list(Config) -> ?line bulk_sendsend(32*5, 4). bulk_send(Terms, BinSize) -> ?line Dog = test_server:timetrap(test_server:seconds(30)), ?line io:format("Sending ~w binaries, each of size ~w K", [Terms, BinSize]), ?line {ok, Node} = start_node(bulk_receiver), ?line Recv = spawn(Node, erlang, apply, [fun receiver/2, [0, 0]]), ?line Bin = list_to_binary(lists:duplicate(BinSize*1024, 253)), ?line Size = Terms*size(Bin), ?line {Elapsed, {Terms, Size}} = test_server:timecall(?MODULE, sender, [Recv, Bin, Terms]), ?line stop_node(Node), ?line test_server:timetrap_cancel(Dog), {comment, integer_to_list(trunc(Size/1024/Elapsed+0.5)) ++ " K/s"}. bulk_sendsend(Terms, BinSize) -> {Rate1, MonitorCount1} = bulk_sendsend2(Terms, BinSize, 5), {Rate2, MonitorCount2} = bulk_sendsend2(Terms, BinSize, 995), Ratio = if MonitorCount2 == 0 -> MonitorCount1 / 1.0; true -> MonitorCount1 / MonitorCount2 end, %% A somewhat arbitrary ratio, but hopefully one that will accommodate %% a wide range of CPU speeds. true = (Ratio > 8.0), {comment, integer_to_list(Rate1) ++ " K/s, " ++ integer_to_list(Rate2) ++ " K/s, " ++ integer_to_list(MonitorCount1) ++ " monitor msgs, " ++ integer_to_list(MonitorCount2) ++ " monitor msgs, " ++ float_to_list(Ratio) ++ " monitor ratio"}. bulk_sendsend2(Terms, BinSize, BusyBufSize) -> ?line Dog = test_server:timetrap(test_server:seconds(30)), ?line io:format("Sending ~w binaries, each of size ~w K", [Terms, BinSize]), ?line {ok, NodeRecv} = start_node(bulk_receiver), ?line Recv = spawn(NodeRecv, erlang, apply, [fun receiver/2, [0, 0]]), ?line Bin = list_to_binary(lists:duplicate(BinSize*1024, 253)), %%?line Size = Terms*size(Bin), %% SLF LEFT OFF HERE. %% When the caller uses small hunks, like 4k via %% bulk_sendsend(32*5, 4), then (on my laptop at least), we get %% zero monitor messages. But if we use "+zdbbl 5", then we %% get a lot of monitor messages. So, if we can count up the %% total number of monitor messages that we get when running both %% default busy size and "+zdbbl 5", and if the 5 case gets %% "many many more" monitor messages, then we know we're working. ?line {ok, NodeSend} = start_node(bulk_sender, "+zdbbl " ++ integer_to_list(BusyBufSize)), ?line _Send = spawn(NodeSend, erlang, apply, [fun sendersender/4, [self(), Recv, Bin, Terms]]), ?line {Elapsed, {_TermsN, SizeN}, MonitorCount} = receive {sendersender, BigRes} -> BigRes end, ?line stop_node(NodeRecv), ?line stop_node(NodeSend), ?line test_server:timetrap_cancel(Dog), {trunc(SizeN/1024/Elapsed+0.5), MonitorCount}. sender(To, _Bin, 0) -> To ! {done, self()}, receive Any -> Any end; sender(To, Bin, Left) -> To ! {term, Bin}, sender(To, Bin, Left-1). %% Sender process to be run on a slave node sendersender(Parent, To, Bin, Left) -> erlang:system_monitor(self(), [busy_dist_port]), [spawn(fun() -> sendersender2(To, Bin, Left, false) end) || _ <- lists:seq(1,1)], {USec, {Res, MonitorCount}} = timer:tc(?MODULE, sendersender2, [To, Bin, Left, true]), Parent ! {sendersender, {USec/1000000, Res, MonitorCount}}. sendersender2(To, Bin, Left, SendDone) -> sendersender3(To, Bin, Left, SendDone, 0). sendersender3(To, _Bin, 0, SendDone, MonitorCount) -> if SendDone -> To ! {done, self()}; true -> ok end, receive {monitor, _Pid, _Type, _Info} -> sendersender3(To, _Bin, 0, SendDone, MonitorCount + 1) after 0 -> if SendDone -> receive Any when is_tuple(Any), size(Any) == 2 -> {Any, MonitorCount} end; true -> exit(normal) end end; sendersender3(To, Bin, Left, SendDone, MonitorCount) -> To ! {term, Bin}, %%timer:sleep(50), sendersender3(To, Bin, Left-1, SendDone, MonitorCount). %% Receiver process to be run on a slave node. receiver(Terms, Size) -> receive {term, Bin} -> receiver(Terms+1, Size+size(Bin)); {done, ReplyTo} -> ReplyTo ! {Terms, Size} end. local_send_big(doc) -> ["Sends several big message to an non-registered process on ", "the local node."]; local_send_big(Config) when is_list(Config) -> Data0=local_send_big(doc)++ ["Tests sending small and big messages to a non-existing ", "local registered process."], Data1=[Data0,[Data0, Data0, [Data0], Data0],Data0], Data2=Data0++lists:flatten(Data1)++ list_to_binary(lists:flatten(Data1)), Func=fun() -> Data2= {arbitrary_name, node()} ! Data2 end, ?line test_server:do_times(4096, Func), ok. local_send_small(doc) -> ["Sends a small message to an non-registered process on the ", "local node."]; local_send_small(Config) when is_list(Config) -> Data={some_stupid, "arbitrary", 'Data'}, Func=fun() -> Data= {unregistered_name, node()} ! Data end, ?line test_server:do_times(4096, Func), ok. local_send_legal(doc) -> ["Sends data to a registered process on the local node, ", "as if it was on another node."]; local_send_legal(Config) when is_list(Config) -> Times=16384, Data={local_send_legal(doc), local_send_legal(doc)}, Pid=spawn(?MODULE,receiver2, [0, 0]) , ?line true=register(registered_process, Pid), Func=fun() -> Data={registered_process, node()} ! Data end, TotalSize=size(Data)*Times, ?line test_server:do_times(Times, Func), % Check that all msgs really came through. Me=self(), ?line {done, Me}= {registered_process, node()} ! {done, Me}, receive {Times, TotalSize} -> ok; _ -> test_server:fail("Wrong number of msgs received.") end, ok. receiver2(Num, TotSize) -> receive {done, ReplyTo} -> ReplyTo ! {Num, TotSize}; Stuff -> receiver2(Num+1, TotSize+size(Stuff)) end. link_to_busy(doc) -> "Test that link/1 to a busy distribution port works."; link_to_busy(Config) when is_list(Config) -> ?line Dog = test_server:timetrap(test_server:seconds(30)), ?line {ok, Node} = start_node(link_to_busy), ?line Recv = spawn(Node, erlang, apply, [fun sink/1, [link_to_busy_sink]]), Tracer = case os:getenv("TRACE_BUSY_DIST_PORT") of "true" -> start_busy_dist_port_tracer(); _ -> false end, %% We will spawn off a process which will try to link to the other %% node. The linker process will not actually run until this %% process is suspended due to the busy distribution port (because %% of the big send). When the link/1 is run, the linker %% process will block, too, because of the because busy port, %% and will later be restarted. ?line do_busy_test(Node, fun () -> linker(Recv) end), %% Same thing, but we apply link/1 instead of calling it directly. ?line do_busy_test(Node, fun () -> applied_linker(Recv) end), %% Same thing again, but we apply link/1 in the tail of a function. ?line do_busy_test(Node, fun () -> tail_applied_linker(Recv) end), %% Done. ?line stop_node(Node), ?line stop_busy_dist_port_tracer(Tracer), ?line test_server:timetrap_cancel(Dog), ok. linker(Pid) -> true = link(Pid), {links, Links} = process_info(self(), links), true = lists:member(Pid, Links). applied_linker(Pid) -> true = apply(erlang, link, [Pid]), {links, Links} = process_info(self(), links), true = lists:member(Pid, Links). tail_applied_linker(Pid) -> apply(erlang, link, [Pid]). exit_to_busy(doc) -> "Test that exit/2 to a busy distribution port works."; exit_to_busy(Config) when is_list(Config) -> ?line Dog = test_server:timetrap(test_server:seconds(30)), ?line {ok, Node} = start_node(exit_to_busy), Tracer = case os:getenv("TRACE_BUSY_DIST_PORT") of "true" -> start_busy_dist_port_tracer(); _ -> false end, %% We will spawn off a process which will try to exit a process on %% the other node. That process will not actually run until this %% process is suspended due to the busy distribution port %% The process executing exit/2 will block, %% too, because of the busy distribution port, and will be allowed %% to continue when the port becomes non-busy. ?line Recv1 = spawn(Node, fun () -> sink(exit_to_busy_sink) end), ?line M1 = erlang:monitor(process, Recv1), ?line do_busy_test(Node, fun () -> joey_killer(Recv1) end), ?line receive {'DOWN', M1, process, Recv1, R1} -> ?line joey_said_die = R1 end, %% Same thing, but tail call to exit/2. ?line Recv2 = spawn(Node, fun () -> sink(exit_to_busy_sink) end), ?line M2 = erlang:monitor(process, Recv2), ?line do_busy_test(Node, fun () -> tail_joey_killer(Recv2) end), ?line receive {'DOWN', M2, process, Recv2, R2} -> ?line joey_said_die = R2 end, %% Same thing, but we apply exit/2 instead of calling it directly. ?line Recv3 = spawn(Node, fun () -> sink(exit_to_busy_sink) end), ?line M3 = erlang:monitor(process, Recv3), ?line do_busy_test(Node, fun () -> applied_joey_killer(Recv3) end), ?line receive {'DOWN', M3, process, Recv3, R3} -> ?line joey_said_die = R3 end, %% Same thing again, but we apply exit/2 in the tail of a function. ?line Recv4 = spawn(Node, fun () -> sink(exit_to_busy_sink) end), ?line M4 = erlang:monitor(process, Recv4), ?line do_busy_test(Node, fun () -> tail_applied_joey_killer(Recv4) end), ?line receive {'DOWN', M4, process, Recv4, R4} -> ?line joey_said_die = R4 end, %% Done. ?line stop_node(Node), ?line stop_busy_dist_port_tracer(Tracer), ?line test_server:timetrap_cancel(Dog), ok. make_busy_data() -> Size = 1024*1024, Key = '__busy__port__data__', case get(Key) of undefined -> Data = list_to_binary(lists:duplicate(Size, 253)), put(Key, Data), Data; Data -> true = is_binary(Data), true = size(Data) == Size, Data end. make_busy(Node, Time) when is_integer(Time) -> Own = 500, freeze_node(Node, Time+Own), Data = make_busy_data(), %% first make port busy Pid = spawn_link(fun () -> forever(fun () -> dport_reg_send(Node, '__noone__', Data) end) end), receive after Own -> ok end, until(fun () -> case process_info(Pid, status) of {status, suspended} -> true; _ -> false end end), %% then dist entry make_busy(Node, [nosuspend], Data), Pid. make_busy(Node, Opts, Data) -> case erlang:send({'__noone__', Node}, Data, Opts) of nosuspend -> nosuspend; _ -> make_busy(Node, Opts, Data) end. unmake_busy(Pid) -> unlink(Pid), exit(Pid, bang). do_busy_test(Node, Fun) -> Busy = make_busy(Node, 1000), {P, M} = spawn_monitor(Fun), receive after 100 -> ok end, Pinfo = process_info(P, [status, current_function]), unmake_busy(Busy), ?t:format("~p : ~p~n", [P, Pinfo]), case Pinfo of undefined -> receive {'DOWN', M, process, P, Reason} -> ?t:format("~p died with exit reason ~p~n", [P, Reason]) end, ?t:fail(premature_death); _ -> %% Don't match arity; it is different in debug and %% optimized emulator [{status, suspended}, {current_function, {erlang, bif_return_trap, _}}] = Pinfo, receive {'DOWN', M, process, P, Reason} -> ?t:format("~p died with exit reason ~p~n", [P, Reason]), normal = Reason end end. remote_is_process_alive(Pid) -> rpc:call(node(Pid), erlang, is_process_alive, [Pid]). joey_killer(Pid) -> exit(Pid, joey_said_die), until(fun () -> false == remote_is_process_alive(Pid) end). tail_joey_killer(Pid) -> exit(Pid, joey_said_die). applied_joey_killer(Pid) -> apply(erlang, exit, [Pid, joey_said_die]), until(fun () -> false == remote_is_process_alive(Pid) end). tail_applied_joey_killer(Pid) -> apply(erlang, exit, [Pid, joey_said_die]). sink(Name) -> register(Name, self()), sink1(). sink1() -> receive _Any -> sink1() end. lost_exit(doc) -> "Test that EXIT and DOWN messages send to another node are not lost if " "the distribution port is busy."; lost_exit(Config) when is_list(Config) -> ?line {ok, Node} = start_node(lost_exit), Tracer = case os:getenv("TRACE_BUSY_DIST_PORT") of "true" -> start_busy_dist_port_tracer(); _ -> false end, Self = self(), Die = make_ref(), ?line R1 = spawn(fun () -> receive after infinity -> ok end end), ?line MR1 = erlang:monitor(process, R1), ?line {L1, ML1} = spawn_monitor(fun() -> link(R1), Self ! {self(), linked}, receive Die -> exit(controlled_suicide) end end), ?line R2 = spawn(fun () -> M = erlang:monitor(process, L1), receive {'DOWN', M, process, L1, R} -> Self ! {self(), got_down_message, L1, R} end end), ?line receive {L1, linked} -> ok end, Busy = make_busy(Node, 2000), receive after 100 -> ok end, L1 ! Die, ?line receive {'DOWN', ML1, process, L1, RL1} -> ?line controlled_suicide = RL1 end, receive after 500 -> ok end, unmake_busy(Busy), ?line receive {'DOWN', MR1, process, R1, RR1} -> ?line controlled_suicide = RR1 end, ?line receive {R2, got_down_message, L1, RR2} -> ?line controlled_suicide = RR2 end, %% Done. ?line stop_busy_dist_port_tracer(Tracer), ?line stop_node(Node), ok. dummy_waiter() -> receive after infinity -> ok end. link_to_dead(doc) -> ["Test that linking to a dead remote process gives an EXIT message ", "AND that the link is teared down."]; link_to_dead(Config) when is_list(Config) -> ?line process_flag(trap_exit, true), ?line {ok, Node} = start_node(link_to_dead), % ?line monitor_node(Node, true), ?line net_adm:ping(Node), %% Ts_cross_server workaround. ?line Pid = spawn(Node, ?MODULE, dead_process, []), receive after 5000 -> ok end, ?line link(Pid), ?line receive {'EXIT', Pid, noproc} -> ok; Other -> ?line test_server:fail({unexpected_message, Other}) after 5000 -> ?line test_server:fail(nothing_received) end, ?line {links, Links} = process_info(self(), links), ?line io:format("Pid=~p, links=~p", [Pid, Links]), ?line false = lists:member(Pid, Links), ?line stop_node(Node), ?line receive Message -> ?line test_server:fail({unexpected_message, Message}) after 3000 -> ok end, ok. dead_process() -> erlang:error(die). link_to_dead_new_node(doc) -> ["Test that linking to a pid on node that has gone and restarted gives ", "the correct EXIT message (OTP-2304)."]; link_to_dead_new_node(Config) when is_list(Config) -> ?line process_flag(trap_exit, true), %% Start the node, get a Pid and stop the node again. ?line {ok, Node} = start_node(link_to_dead_new_node), ?line Pid = spawn(Node, ?MODULE, dead_process, []), ?line stop_node(Node), %% Start a new node with the same name. ?line {ok, Node} = start_node(link_to_dead_new_node), ?line link(Pid), ?line receive {'EXIT', Pid, noproc} -> ok; Other -> ?line test_server:fail({unexpected_message, Other}) after 5000 -> ?line test_server:fail(nothing_received) end, %% Make sure that the link wasn't created. ?line {links, Links} = process_info(self(), links), ?line io:format("Pid=~p, links=~p", [Pid, Links]), ?line false = lists:member(Pid, Links), ?line stop_node(Node), ?line receive Message -> ?line test_server:fail({unexpected_message, Message}) after 3000 -> ok end, ok. applied_monitor_node(doc) -> "Test that monitor_node/2 works when applied."; applied_monitor_node(Config) when is_list(Config) -> ?line NonExisting = list_to_atom("__non_existing__@" ++ hostname()), %% Tail-recursive call to apply (since the node is non-existing, %% there will be a trap). ?line true = tail_apply(erlang, monitor_node, [NonExisting, true]), ?line [{nodedown, NonExisting}] = test_server:messages_get(), %% Ordinary call (with trap). ?line true = apply(erlang, monitor_node, [NonExisting, true]), ?line [{nodedown, NonExisting}] = test_server:messages_get(), ok. tail_apply(M, F, A) -> apply(M, F, A). ref_port_roundtrip(doc) -> "Test that sending a port or reference to another node and back again " "doesn't correct them in any way."; ref_port_roundtrip(Config) when is_list(Config) -> ?line process_flag(trap_exit, true), ?line Port = open_port({spawn, efile}, []), ?line Ref = make_ref(), ?line {ok, Node} = start_node(ref_port_roundtrip), ?line net_adm:ping(Node), ?line Term = {Port, Ref}, ?line io:format("Term before: ~p", [show_term(Term)]), ?line Pid = spawn_link(Node, ?MODULE, roundtrip, [Term]), ?line receive after 5000 -> ok end, ?line stop_node(Node), ?line receive {'EXIT', Pid, {Port, Ref}} -> ?line io:format("Term after: ~p", [show_term(Term)]), ok; Other -> ?line io:format("Term after: ~p", [show_term(Term)]), ?line test_server:fail({unexpected, Other}) after 10000 -> ?line test_server:fail(timeout) end, ok. roundtrip(Term) -> exit(Term). nil_roundtrip(doc) -> "Test that the smallest external term [] aka NIL can be sent to " "another node node and back again."; nil_roundtrip(Config) when is_list(Config) -> ?line process_flag(trap_exit, true), ?line {ok, Node} = start_node(nil_roundtrip), ?line net_adm:ping(Node), ?line Pid = spawn_link(Node, ?MODULE, bounce, [self()]), ?line Pid ! [], ?line receive [] -> ?line receive {'EXIT', Pid, []} -> ?line stop_node(Node), ok end end. bounce(Dest) -> receive Msg -> Dest ! Msg, exit(Msg) end. show_term(Term) -> binary_to_list(term_to_binary(Term)). stop_dist(doc) -> ["Tests behaviour after net_kernel:stop (OTP-2586)."]; stop_dist(Config) when is_list(Config) -> ?line Str = os:cmd(atom_to_list(lib:progname()) ++ " -noshell -pa " ++ ?config(data_dir, Config) ++ " -s run"), %% The "true" may be followed by an error report, so ignore anything that %% follows it. ?line "true\n"++_ = Str, %% "May fail on FreeBSD due to differently configured name lookup - ask Arndt", %% if you can find him. ok. trap_bif_1(doc) -> [""]; trap_bif_1(Config) when is_list(Config) -> ?line {true} = tr1(), ok. trap_bif_2(doc) -> [""]; trap_bif_2(Config) when is_list(Config) -> ?line {true} = tr2(), ok. trap_bif_3(doc) -> [""]; trap_bif_3(Config) when is_list(Config) -> ?line {hoo} = tr3(), ok. tr1() -> ?line NonExisting = 'abc@boromir', ?line X = erlang:monitor_node(NonExisting, true), {X}. tr2() -> ?line NonExisting = 'abc@boromir', ?line X = apply(erlang, monitor_node, [NonExisting, true]), {X}. tr3() -> ?line NonExisting = 'abc@boromir', ?line X = {NonExisting, glirp} ! hoo, {X}. % This has to be done by nodes with differrent cookies, otherwise global % will connect nodes, which is correct, but makes it hard to test. % * Start two nodes, n1 and n2. n2 with the dist_auto_connect once parameter % * n2 pings n1 -> connection % * check that they now know each other % * Kill n1 % * Make sure n2 gets pang when pinging n1 % * restart n1 % * Make sure n2 *still gets pang*! % * Ping n2 from n1 -> pong % * n2 now also gets pong when pinging n1 % * disconnect n2 from n1 % * n2 gets pang when pinging n1 % * n2 forces connection by using net_kernel:connect_node (ovverrides) % * n2 gets pong when pinging n1. dist_auto_connect_once(doc) -> "Test the dist_auto_connect once kernel parameter"; dist_auto_connect_once(Config) when is_list(Config) -> ?line Sock = start_relay_node(dist_auto_connect_relay_node,[]), ?line NN = inet_rpc_nodename(Sock), ?line Sock2 = start_relay_node(dist_auto_connect_once_node, "-kernel dist_auto_connect once"), ?line NN2 = inet_rpc_nodename(Sock2), ?line {ok,[]} = do_inet_rpc(Sock,erlang,nodes,[]), ?line {ok, pong} = do_inet_rpc(Sock2,net_adm,ping,[NN]), ?line {ok,[NN2]} = do_inet_rpc(Sock,erlang,nodes,[]), ?line {ok,[NN]} = do_inet_rpc(Sock2,erlang,nodes,[]), ?line [_,HostPartPeer] = string:tokens(atom_to_list(NN),"@"), ?line [_,MyHostPart] = string:tokens(atom_to_list(node()),"@"), % Give net_kernel a chance to change the state of the node to up to. ?line receive after 1000 -> ok end, case HostPartPeer of MyHostPart -> ?line ok = stop_relay_node(Sock), ?line {ok,pang} = do_inet_rpc(Sock2,net_adm,ping,[NN]); _ -> ?line {ok, true} = do_inet_rpc(Sock,net_kernel,disconnect,[NN2]), receive after 500 -> ok end end, ?line {ok, []} = do_inet_rpc(Sock2,erlang,nodes,[]), Sock3 = case HostPartPeer of MyHostPart -> ?line start_relay_node(dist_auto_connect_relay_node,[]); _ -> Sock end, ?line TS1 = timestamp(), ?line {ok, pang} = do_inet_rpc(Sock2,net_adm,ping,[NN]), ?line TS2 = timestamp(), RefT = net_kernel:connecttime() - 1000, ?line true = ((TS2 - TS1) < RefT), ?line TS3 = timestamp(), ?line {ok, true} = do_inet_rpc(Sock2,erlang,monitor_node, [NN,true,[allow_passive_connect]]), ?line TS4 = timestamp(), ?line true = ((TS4 - TS3) > RefT), ?line {ok, pong} = do_inet_rpc(Sock3,net_adm,ping,[NN2]), ?line {ok, pong} = do_inet_rpc(Sock2,net_adm,ping,[NN]), ?line {ok, true} = do_inet_rpc(Sock3,net_kernel,disconnect,[NN2]), receive after 500 -> ok end, ?line {ok, pang} = do_inet_rpc(Sock2,net_adm,ping,[NN]), ?line {ok, true} = do_inet_rpc(Sock2,net_kernel,connect_node,[NN]), ?line {ok, pong} = do_inet_rpc(Sock2,net_adm,ping,[NN]), ?line stop_relay_node(Sock3), ?line stop_relay_node(Sock2). %% Start a relay node and a lonely (dist_auto_connect never) node. %% Lonely node pings relay node. That should fail. %% Lonely node connects to relay node with net_kernel:connect_node/1. %% Result is sent here through relay node. dist_auto_connect_never(Config) when is_list(Config) -> Self = self(), ?line {ok, RelayNode} = start_node(dist_auto_connect_relay), ?line spawn(RelayNode, fun() -> register(dist_auto_connect_relay, self()), dist_auto_connect_relay(Self) end), ?line {ok, Handle} = dist_auto_connect_start(dist_auto_connect, never), ?line Result = receive {do_dist_auto_connect, ok} -> ok; {do_dist_auto_connect, Error} -> {error, Error}; Other -> {error, Other} after 32000 -> timeout end, ?line stop_node(RelayNode), ?line Stopped = dist_auto_connect_stop(Handle), ?line Junk = receive {do_dist_auto_connect, _} = J -> J after 0 -> ok end, ?line {ok, ok, ok} = {Result, Stopped, Junk}, ok. do_dist_auto_connect([never]) -> Node = list_to_atom("dist_auto_connect_relay@" ++ hostname()), io:format("~p:do_dist_auto_connect([false]) Node=~p~n", [?MODULE, Node]), Ping = net_adm:ping(Node), io:format("~p:do_dist_auto_connect([false]) Ping=~p~n", [?MODULE, Ping]), Result = case Ping of pang -> ok; _ -> {error, Ping} end, io:format("~p:do_dist_auto_connect([false]) Result=~p~n", [?MODULE, Result]), net_kernel:connect_node(Node), catch {dist_auto_connect_relay, Node} ! {do_dist_auto_connect, Result}; % receive after 1000 -> ok end, % halt(); do_dist_auto_connect(Arg) -> io:format("~p:do_dist_auto_connect(~p)~n", [?MODULE, Arg]), receive after 10000 -> ok end, halt(). dist_auto_connect_start(Name, Value) when is_atom(Name) -> dist_auto_connect_start(atom_to_list(Name), Value); dist_auto_connect_start(Name, Value) when is_list(Name), is_atom(Value) -> Node = list_to_atom(lists:append([Name, "@", hostname()])), ModuleDir = filename:dirname(code:which(?MODULE)), ValueStr = atom_to_list(Value), Cookie = atom_to_list(erlang:get_cookie()), Cmd = lists:concat( [%"xterm -e ", atom_to_list(lib:progname()), % " -noinput ", " -detached ", long_or_short(), " ", Name, " -setcookie ", Cookie, " -pa ", ModuleDir, " -s ", atom_to_list(?MODULE), " do_dist_auto_connect ", ValueStr, " -kernel dist_auto_connect ", ValueStr]), io:format("~p:dist_auto_connect_start() cmd: ~p~n", [?MODULE, Cmd]), Port = open_port({spawn, Cmd}, [stream]), {ok, {Port, Node}}. dist_auto_connect_stop({Port, Node}) -> Pid = spawn_link(fun() -> rpc:call(Node, erlang, halt, []) end), dist_auto_connect_stop(Port, Node, Pid, 5000). dist_auto_connect_stop(Port, _Node, Pid, N) when is_integer(N), N =< 0 -> exit(Pid, normal), catch erlang:port_close(Port), Result = {error, node_not_down}, io:format("~p:dist_auto_connect_stop() ~p~n", [?MODULE, Result]), Result; dist_auto_connect_stop(Port, Node, Pid, N) when is_integer(N) -> case net_adm:ping(Node) of pong -> receive after 100 -> ok end, dist_auto_connect_stop(Port, Node, Pid, N-100); pang -> exit(Pid, normal), catch erlang:port_close(Port), io:format("~p:dist_auto_connect_stop() ok~n", [?MODULE]), ok end. dist_auto_connect_relay(Parent) -> receive X -> catch Parent ! X end, dist_auto_connect_relay(Parent). dist_parallel_send(doc) -> []; dist_parallel_send(suite) -> []; dist_parallel_send(Config) when is_list(Config) -> ?line {ok, RNode} = start_node(dist_parallel_receiver), ?line {ok, SNode} = start_node(dist_parallel_sender), ?line WatchDog = spawn_link( fun () -> TRef = erlang:start_timer((?DEFAULT_TIMETRAP div 2), self(), oops), receive {timeout, TRef, _ } -> spawn(SNode, fun () -> abort(timeout) end), spawn(RNode, fun () -> abort(timeout) end) %% rpc:cast(SNode, erlang, halt, %% ["Timetrap (sender)"]), %% rpc:cast(RNode, erlang, halt, %% ["Timetrap (receiver)"]) end end), ?line MkSndrs = fun (Receiver) -> lists:map(fun (_) -> spawn_link(SNode, ?MODULE, dist_parallel_sender, [self(), Receiver, 1000]) end, lists:seq(1, 64)) end, ?line SndrsStart = fun (Sndrs) -> Parent = self(), spawn_link( SNode, fun () -> lists:foreach(fun (P) -> P ! {go, Parent} end, Sndrs) end) end, ?line SndrsWait = fun (Sndrs) -> lists:foreach(fun (P) -> receive {P, done} -> ok end end, Sndrs) end, ?line DPR = spawn_link(RNode, ?MODULE, dist_parallel_receiver, []), ?line Sndrs1 = MkSndrs(DPR), ?line SndrsStart(Sndrs1), ?line SndrsWait(Sndrs1), ?line unlink(DPR), ?line exit(DPR, bang), ?line DEPR = spawn_link(RNode, ?MODULE, dist_evil_parallel_receiver, []), ?line Sndrs2 = MkSndrs(DEPR), ?line SndrsStart(Sndrs2), ?line SndrsWait(Sndrs2), ?line unlink(DEPR), ?line exit(DEPR, bang), ?line unlink(WatchDog), ?line exit(WatchDog, bang), ?line stop_node(RNode), ?line stop_node(SNode), ?line ok. do_dist_parallel_sender(Parent, _Receiver, 0) -> Parent ! {self(), done}; do_dist_parallel_sender(Parent, Receiver, N) -> Receiver ! {self(), "Some data"}, do_dist_parallel_sender(Parent, Receiver, N-1). dist_parallel_sender(Parent, Receiver, N) -> receive {go, Parent} -> ok end, do_dist_parallel_sender(Parent, Receiver, N). dist_parallel_receiver() -> receive {_Sender, _Data} -> ok end, dist_parallel_receiver(). dist_evil_parallel_receiver() -> receive {Sender, _Data} -> ok end, net_kernel:disconnect(node(Sender)), dist_evil_parallel_receiver(). atom_roundtrip(Config) when is_list(Config) -> ?line AtomData = atom_data(), ?line verify_atom_data(AtomData), ?line {ok, Node} = start_node(Config), ?line do_atom_roundtrip(Node, AtomData), ?line stop_node(Node), ?line ok. atom_roundtrip_r12b(Config) when is_list(Config) -> case ?t:is_release_available("r12b") of true -> ?line AtomData = atom_data(), ?line verify_atom_data(AtomData), ?line {ok, Node} = start_node(Config, [], "r12b"), ?line do_atom_roundtrip(Node, AtomData), ?line stop_node(Node), ?line ok; false -> ?line {skip,"No OTP R12B available"} end. do_atom_roundtrip(Node, AtomData) -> ?line Parent = self(), ?line Proc = spawn_link(Node, fun () -> verify_atom_data_loop(Parent) end), ?line Proc ! {self(), AtomData}, ?line receive {Proc, AD1} -> AtomData = AD1 end, ?line Proc ! {self(), AtomData}, ?line receive {Proc, AD2} -> AtomData = AD2 end, ?line RevAtomData = lists:reverse(AtomData), ?line Proc ! {self(), RevAtomData}, ?line receive {Proc, RAD1} -> RevAtomData = RAD1 end, ?line unlink(Proc), ?line exit(Proc, bang), ?line ok. verify_atom_data_loop(From) -> receive {From, AtomData} -> verify_atom_data(AtomData), From ! {self(), AtomData}, verify_atom_data_loop(From) end. atom_data() -> lists:map(fun (N) -> ATxt = "a"++integer_to_list(N), {list_to_atom(ATxt), ATxt} end, lists:seq(1, 2000)). verify_atom_data(AtomData) -> lists:foreach(fun ({Atom, AtomTxt}) -> AtomTxt = atom_to_list(Atom) end, AtomData). contended_atom_cache_entry(Config) when is_list(Config) -> ?line TestServer = self(), ?line ProcessPairs = 10, ?line Msgs = 100000, ?line {ok, SNode} = start_node(Config), ?line {ok, RNode} = start_node(Config), ?line Success = make_ref(), ?line spawn_link( SNode, fun () -> erts_debug:set_internal_state(available_internal_state, true), Master = self(), CIX = get_cix(), TestAtoms = get_conflicting_atoms(CIX, ProcessPairs), io:format("Testing with the following atoms all using " "cache index ~p:~n ~p~n", [CIX, TestAtoms]), Ps = lists:map( fun (A) -> Ref = make_ref(), R = spawn_link( RNode, fun () -> Atom = receive {Ref, txt, ATxt} -> list_to_atom( ATxt) end, receive_ref_atom(Ref, Atom, Msgs), Master ! {self(), success} end), S = spawn_link( SNode, fun () -> receive go -> ok end, R ! {Ref, txt, atom_to_list(A)}, send_ref_atom(R, Ref, A, Msgs) end), {S, R} end, TestAtoms), lists:foreach(fun ({S, _}) -> S ! go end, Ps), lists:foreach(fun ({_, R}) -> receive {R, success} -> ok end end, Ps), TestServer ! Success end), ?line receive Success -> ok end, ?line stop_node(SNode), ?line stop_node(RNode), ?line ok. send_ref_atom(_To, _Ref, _Atom, 0) -> ok; send_ref_atom(To, Ref, Atom, N) -> To ! {Ref, Atom}, send_ref_atom(To, Ref, Atom, N-1). receive_ref_atom(_Ref, _Atom, 0) -> ok; receive_ref_atom(Ref, Atom, N) -> receive {Ref, Value} -> Atom = Value end, receive_ref_atom(Ref, Atom, N-1). get_cix() -> get_cix(1000). get_cix(CIX) when is_integer(CIX), CIX < 0 -> get_cix(0); get_cix(CIX) when is_integer(CIX) -> get_cix(CIX, unwanted_cixs(), erts_debug:get_internal_state(max_atom_out_cache_index)). get_cix(CIX, Unwanted, MaxCIX) when CIX > MaxCIX -> get_cix(0, Unwanted, MaxCIX); get_cix(CIX, Unwanted, MaxCIX) -> case lists:member(CIX, Unwanted) of true -> get_cix(CIX+1, Unwanted, MaxCIX); false -> CIX end. unwanted_cixs() -> lists:map(fun (Node) -> erts_debug:get_internal_state({atom_out_cache_index, Node}) end, nodes()). get_conflicting_atoms(_CIX, 0) -> []; get_conflicting_atoms(CIX, N) -> {A, B, C} = now(), Atom = list_to_atom("atom" ++ integer_to_list(A*1000000000000 + B*1000000 + C)), case erts_debug:get_internal_state({atom_out_cache_index, Atom}) of CIX -> [Atom|get_conflicting_atoms(CIX, N-1)]; _ -> get_conflicting_atoms(CIX, N) end. -define(COOKIE, ''). -define(DOP_LINK, 1). -define(DOP_SEND, 2). -define(DOP_EXIT, 3). -define(DOP_UNLINK, 4). -define(DOP_REG_SEND, 6). -define(DOP_GROUP_LEADER, 7). -define(DOP_EXIT2, 8). -define(DOP_SEND_TT, 12). -define(DOP_EXIT_TT, 13). -define(DOP_REG_SEND_TT, 16). -define(DOP_EXIT2_TT, 18). -define(DOP_MONITOR_P, 19). -define(DOP_DEMONITOR_P, 20). -define(DOP_MONITOR_P_EXIT, 21). start_monitor(Offender,P) -> ?line Parent = self(), ?line Q = spawn(Offender, fun () -> Ref = erlang:monitor(process,P), Parent ! {self(),ref,Ref}, receive just_stay_alive -> ok end end), ?line Ref = receive {Q,ref,R} -> R after 5000 -> error end, io:format("Ref is ~p~n",[Ref]), ok. start_link(Offender,P) -> ?line Parent = self(), ?line Q = spawn(Offender, fun () -> process_flag(trap_exit,true), link(P), Parent ! {self(),ref,P}, receive just_stay_alive -> ok end end), ?line Ref = receive {Q,ref,R} -> R after 5000 -> error end, io:format("Ref is ~p~n",[Ref]), ok. bad_dist_structure(suite) -> []; bad_dist_structure(doc) -> ["Test dist messages with valid structure (binary to term ok) but malformed" "control content"]; bad_dist_structure(Config) when is_list(Config) -> %process_flag(trap_exit,true), ODog = ?config(watchdog, Config), ?t:timetrap_cancel(ODog), Dog = ?t:timetrap(?t:seconds(15)), ?line {ok, Offender} = start_node(bad_dist_structure_offender), ?line {ok, Victim} = start_node(bad_dist_structure_victim), ?line start_node_monitors([Offender,Victim]), ?line Parent = self(), ?line P = spawn(Victim, fun () -> process_flag(trap_exit,true), Parent ! {self(), started}, receive check_msgs -> ok end, bad_dist_struct_check_msgs([one, two]), Parent ! {self(), messages_checked}, receive done -> ok end end), ?line receive {P, started} -> ok end, ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line verify_up(Offender, Victim), ?line true = lists:member(Offender, rpc:call(Victim, erlang, nodes, [])), ?line start_monitor(Offender,P), ?line P ! one, ?line send_bad_structure(Offender, P,{?DOP_MONITOR_P_EXIT,'replace',P,normal},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line start_monitor(Offender,P), ?line send_bad_structure(Offender, P,{?DOP_MONITOR_P_EXIT,'replace',P,normal,normal},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line start_link(Offender,P), ?line send_bad_structure(Offender, P,{?DOP_LINK},0), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line start_link(Offender,P), ?line send_bad_structure(Offender, P,{?DOP_UNLINK,'replace'},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line start_link(Offender,P), ?line send_bad_structure(Offender, P,{?DOP_UNLINK,'replace',make_ref()},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line start_link(Offender,P), ?line send_bad_structure(Offender, P,{?DOP_UNLINK,make_ref(),P},0), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line start_link(Offender,P), ?line send_bad_structure(Offender, P,{?DOP_UNLINK,normal,normal},0), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line start_monitor(Offender,P), ?line send_bad_structure(Offender, P,{?DOP_MONITOR_P,'replace',P},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line start_monitor(Offender,P), ?line send_bad_structure(Offender, P,{?DOP_MONITOR_P,'replace',P,normal},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line start_monitor(Offender,P), ?line send_bad_structure(Offender, P,{?DOP_DEMONITOR_P,'replace',P},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line start_monitor(Offender,P), ?line send_bad_structure(Offender, P,{?DOP_DEMONITOR_P,'replace',P,normal},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_EXIT,'replace',P},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_EXIT,make_ref(),normal,normal},0), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_EXIT_TT,'replace',token,P},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_EXIT_TT,make_ref(),token,normal,normal},0), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_EXIT2,'replace',P},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_EXIT2,make_ref(),normal,normal},0), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_EXIT2_TT,'replace',token,P},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_EXIT2_TT,make_ref(),token,normal,normal},0), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_GROUP_LEADER,'replace'},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_GROUP_LEADER,'replace','atomic'},2), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_GROUP_LEADER,'replace',P},0), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_REG_SEND_TT,'replace','',name},2,{message}), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_REG_SEND_TT,'replace','',name,token},0,{message}), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_REG_SEND,'replace',''},2,{message}), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_REG_SEND,'replace','',P},0,{message}), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_REG_SEND,'replace','',name},0,{message}), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_REG_SEND,'replace','',name,{token}},2,{message}), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_SEND_TT,'',P},0,{message}), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_SEND_TT,'',name,token},0,{message}), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_SEND,''},0,{message}), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_SEND,'',name},0,{message}), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line send_bad_structure(Offender, P,{?DOP_SEND,'',P,{token}},0,{message}), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line P ! two, ?line P ! check_msgs, ?line receive {P, messages_checked} -> ok after 5000 -> exit(victim_is_dead) end, ?line {message_queue_len, 0} = rpc:call(Victim, erlang, process_info, [P, message_queue_len]), ?line unlink(P), ?line P ! done, ?line stop_node(Offender), ?line stop_node(Victim), ?t:timetrap_cancel(Dog), ok. bad_dist_ext_receive(Config) when is_list(Config) -> ?line {ok, Offender} = start_node(bad_dist_ext_receive_offender), ?line {ok, Victim} = start_node(bad_dist_ext_receive_victim), ?line start_node_monitors([Offender,Victim]), ?line Parent = self(), ?line P = spawn_link(Victim, fun () -> Parent ! {self(), started}, receive check_msgs -> ok end, bad_dist_ext_check_msgs([one, two, three]), Parent ! {self(), messages_checked}, receive done -> ok end end), ?line receive {P, started} -> ok end, ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line verify_up(Offender, Victim), ?line true = lists:member(Offender, rpc:call(Victim, erlang, nodes, [])), ?line P ! one, ?line send_bad_msg(Offender, P), ?line P ! two, ?line verify_down(Offender, connection_closed, Victim, killed), ?line {message_queue_len, 2} = rpc:call(Victim, erlang, process_info, [P, message_queue_len]), ?line Suspended = make_ref(), ?line S = spawn(Victim, fun () -> erlang:suspend_process(P), Parent ! Suspended, receive after infinity -> ok end end), ?line MS = erlang:monitor(process, S), ?line receive Suspended -> ok end, ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line verify_up(Offender, Victim), ?line true = lists:member(Offender, rpc:call(Victim, erlang, nodes, [])), ?line send_bad_msgs(Offender, P, 5), ?line true = lists:member(Offender, rpc:call(Victim, erlang, nodes, [])), ?line P ! three, ?line send_bad_msgs(Offender, P, 5), %% Make sure bad msgs has reached Victim ?line rpc:call(Offender, rpc, call, [Victim, erlang, node, []]), ?line verify_still_up(Offender, Victim), ?line {message_queue_len, 13} = rpc:call(Victim, erlang, process_info, [P, message_queue_len]), ?line exit(S, bang), ?line receive {'DOWN', MS, process, S, bang} -> ok end, ?line verify_down(Offender, connection_closed, Victim, killed), ?line {message_queue_len, 3} = rpc:call(Victim, erlang, process_info, [P, message_queue_len]), ?line P ! check_msgs, ?line receive {P, messages_checked} -> ok end, ?line {message_queue_len, 0} = rpc:call(Victim, erlang, process_info, [P, message_queue_len]), ?line P ! done, ?line unlink(P), ?line verify_no_down(Offender, Victim), ?line stop_node(Offender), ?line stop_node(Victim). bad_dist_ext_process_info(Config) when is_list(Config) -> ?line {ok, Offender} = start_node(bad_dist_ext_process_info_offender), ?line {ok, Victim} = start_node(bad_dist_ext_process_info_victim), ?line start_node_monitors([Offender,Victim]), ?line Parent = self(), ?line P = spawn_link(Victim, fun () -> Parent ! {self(), started}, receive check_msgs -> ok end, bad_dist_ext_check_msgs([one, two]), Parent ! {self(), messages_checked}, receive done -> ok end end), ?line receive {P, started} -> ok end, ?line P ! one, ?line Suspended = make_ref(), ?line S = spawn(Victim, fun () -> erlang:suspend_process(P), Parent ! Suspended, receive after infinity -> ok end end), ?line receive Suspended -> ok end, ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line verify_up(Offender, Victim), ?line send_bad_msgs(Offender, P, 5), ?line P ! two, ?line send_bad_msgs(Offender, P, 5), %% Make sure bad msgs has reached Victim ?line rpc:call(Offender, rpc, call, [Victim, erlang, node, []]), ?line verify_still_up(Offender, Victim), ?line {message_queue_len, 12} = rpc:call(Victim, erlang, process_info, [P, message_queue_len]), ?line verify_still_up(Offender, Victim), ?line [{message_queue_len, 2}, {messages, [one, two]}] = rpc:call(Victim, erlang, process_info, [P, [message_queue_len, messages]]), ?line verify_down(Offender, connection_closed, Victim, killed), ?line P ! check_msgs, ?line exit(S, bang), ?line receive {P, messages_checked} -> ok end, ?line {message_queue_len, 0} = rpc:call(Victim, erlang, process_info, [P, message_queue_len]), ?line P ! done, ?line unlink(P), ?line verify_no_down(Offender, Victim), ?line stop_node(Offender), ?line stop_node(Victim). bad_dist_ext_control(Config) when is_list(Config) -> ?line {ok, Offender} = start_node(bad_dist_ext_control_offender), ?line {ok, Victim} = start_node(bad_dist_ext_control_victim), ?line start_node_monitors([Offender,Victim]), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line verify_up(Offender, Victim), ?line send_bad_dhdr(Offender, Victim), ?line verify_down(Offender, connection_closed, Victim, killed), ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line verify_up(Offender, Victim), ?line send_bad_ctl(Offender, Victim), ?line verify_down(Offender, connection_closed, Victim, killed), ?line verify_no_down(Offender, Victim), ?line stop_node(Offender), ?line stop_node(Victim). bad_dist_ext_connection_id(Config) when is_list(Config) -> ?line {ok, Offender} = start_node(bad_dist_ext_connection_id_offender), ?line {ok, Victim} = start_node(bad_dist_ext_connection_id_victim), ?line start_node_monitors([Offender,Victim]), ?line Parent = self(), ?line P = spawn_link(Victim, fun () -> Parent ! {self(), started}, receive check_msgs -> ok end, bad_dist_ext_check_msgs([]), Parent ! {self(), messages_checked}, receive done -> ok end end), ?line receive {P, started} -> ok end, ?line Suspended = make_ref(), ?line S = spawn(Victim, fun () -> erlang:suspend_process(P), Parent ! Suspended, receive after infinity -> ok end end), ?line MS = erlang:monitor(process, S), ?line receive Suspended -> ok end, ?line pong = rpc:call(Victim, net_adm, ping, [Offender]), ?line verify_up(Offender, Victim), ?line send_bad_msg(Offender, P), %% Make sure bad msg has reached Victim ?line rpc:call(Offender, rpc, call, [Victim, erlang, node, []]), ?line {message_queue_len, 1} = rpc:call(Victim, erlang, process_info, [P, message_queue_len]), ?line true = rpc:call(Offender, net_kernel, disconnect, [Victim]), ?line verify_down(Offender, disconnect, Victim, connection_closed), ?line pong = rpc:call(Offender, net_adm, ping, [Victim]), ?line verify_up(Offender, Victim), %% We have a new connection between Offender and Victim, bad message %% should not bring it down. ?line {message_queue_len, 1} = rpc:call(Victim, erlang, process_info, [P, message_queue_len]), ?line exit(S, bang), ?line receive {'DOWN', MS, process, S, bang} -> ok end, %% Wait for a while (if the connection is taken down it might take a %% while). ?line receive after 2000 -> ok end, ?line verify_still_up(Offender, Victim), ?line P ! check_msgs, ?line receive {P, messages_checked} -> ok end, ?line {message_queue_len, 0} = rpc:call(Victim, erlang, process_info, [P, message_queue_len]), ?line verify_still_up(Offender, Victim), ?line P ! done, ?line unlink(P), ?line verify_no_down(Offender, Victim), ?line stop_node(Offender), ?line stop_node(Victim). bad_dist_struct_check_msgs([]) -> receive Msg -> exit({unexpected_message, Msg}) after 0 -> ok end; bad_dist_struct_check_msgs([M|Ms]) -> receive {'EXIT',_,_} = EM -> io:format("Ignoring exit message: ~p~n",[EM]), bad_dist_struct_check_msgs([M|Ms]); Msg -> M = Msg, bad_dist_struct_check_msgs(Ms) end. bad_dist_ext_check_msgs([]) -> receive Msg -> exit({unexpected_message, Msg}) after 0 -> ok end; bad_dist_ext_check_msgs([M|Ms]) -> receive Msg -> M = Msg, bad_dist_ext_check_msgs(Ms) end. dport_reg_send(Node, Name, Msg) -> DPrt = case dport(Node) of undefined -> pong = net_adm:ping(Node), dport(Node); Prt -> Prt end, port_command(DPrt, [dmsg_hdr(), dmsg_ext({?DOP_REG_SEND, self(), ?COOKIE, Name}), dmsg_ext(Msg)]). dport_send(To, Msg) -> Node = node(To), DPrt = case dport(Node) of undefined -> pong = net_adm:ping(Node), dport(Node); Prt -> Prt end, port_command(DPrt, [dmsg_hdr(), dmsg_ext({?DOP_SEND, ?COOKIE, To}), dmsg_ext(Msg)]). send_bad_structure(Offender,Victim,Bad,WhereToPutSelf) -> send_bad_structure(Offender,Victim,Bad,WhereToPutSelf,[]). send_bad_structure(Offender,Victim,Bad,WhereToPutSelf,PayLoad) -> Parent = self(), Done = make_ref(), spawn(Offender, fun () -> Node = node(Victim), pong = net_adm:ping(Node), DPrt = dport(Node), Bad1 = case WhereToPutSelf of 0 -> Bad; N when N > 0 -> setelement(N,Bad,self()) end, DData = [dmsg_hdr(), dmsg_ext(Bad1)] ++ case PayLoad of [] -> []; _Other -> [dmsg_ext(PayLoad)] end, port_command(DPrt, DData), Parent ! {DData,Done} end), receive {WhatSent,Done} -> io:format("Offender sent ~p~n",[WhatSent]), ok after 5000 -> exit(unable_to_send) end. %% send_bad_msgs(): %% Send a valid distribution header and control message %% but an invalid message. This invalid message will be %% enqueued in the receivers message queue. send_bad_msg(BadNode, To) -> send_bad_msgs(BadNode, To, 1). send_bad_msgs(BadNode, To, Repeat) when is_atom(BadNode), is_pid(To), is_integer(Repeat) -> Parent = self(), Done = make_ref(), spawn_link(BadNode, fun () -> Node = node(To), pong = net_adm:ping(Node), DPrt = dport(Node), DData = [dmsg_hdr(), dmsg_ext({?DOP_SEND, ?COOKIE, To}), dmsg_bad_atom_cache_ref()], repeat(fun () -> port_command(DPrt, DData) end, Repeat), Parent ! Done end), receive Done -> ok end. %% send_bad_ctl(): %% Send a valid distribution header but an invalid control message. send_bad_ctl(BadNode, ToNode) when is_atom(BadNode), is_atom(ToNode) -> Parent = self(), Done = make_ref(), spawn_link(BadNode, fun () -> pong = net_adm:ping(ToNode), %% We creat a valid ctl msg and replace an %% atom with an invalid atom cache reference <<131,Replace/binary>> = term_to_binary(replace), Ctl = dmsg_ext({?DOP_REG_SEND, self(), ?COOKIE, replace}), CtlBeginSize = size(Ctl) - size(Replace), <> = Ctl, port_command(dport(ToNode), [dmsg_fake_hdr2(), CtlBegin, dmsg_bad_atom_cache_ref(), dmsg_ext({a, message})]), Parent ! Done end), receive Done -> ok end. %% send_bad_dhr(): %% Send an invalid distribution header send_bad_dhdr(BadNode, ToNode) when is_atom(BadNode), is_atom(ToNode) -> Parent = self(), Done = make_ref(), spawn_link(BadNode, fun () -> pong = net_adm:ping(ToNode), port_command(dport(ToNode), dmsg_bad_hdr()), Parent ! Done end), receive Done -> ok end. dport(Node) when is_atom(Node) -> case catch erts_debug:get_internal_state(available_internal_state) of true -> true; _ -> erts_debug:set_internal_state(available_internal_state, true) end, erts_debug:get_internal_state({dist_port, Node}). dmsg_hdr() -> [131, % Version Magic $D, % Dist header 0]. % No atom cache referenses dmsg_bad_hdr() -> [131, % Version Magic $D, % Dist header 255]. % 255 atom references %% dmsg_fake_hdr1() -> %% A = <<"fake header atom 1">>, %% [131, % Version Magic %% $D, 1, 16#8, 0, size(A), A]. % Fake header dmsg_fake_hdr2() -> A1 = <<"fake header atom 1">>, A2 = <<"atom 2">>, A3 = <<"atom 3">>, [131, % Version Magic $D, 3, 16#88, 16#08, % Flags 0, size(A1), A1, 1, size(A2), A2, 2, size(A3), A3]. dmsg_ext(Term) -> <<131, Res/binary>> = term_to_binary(Term), Res. dmsg_bad_atom_cache_ref() -> [$R, 137]. %%% Utilities timestamp() -> {A,B,C} = erlang:now(), (C div 1000) + (B * 1000) + (A * 1000000000). start_node(X) -> start_node(X, [], []). start_node(X, Y) -> start_node(X, Y, []). start_node(Name, Args, Rel) when is_atom(Name), is_list(Rel) -> Pa = filename:dirname(code:which(?MODULE)), Cookie = atom_to_list(erlang:get_cookie()), RelArg = case Rel of [] -> []; _ -> [{erl,[{release,Rel}]}] end, test_server:start_node(Name, slave, [{args, Args++" -setcookie "++Cookie++" -pa "++Pa} | RelArg]); start_node(Config, Args, Rel) when is_list(Config), is_list(Rel) -> Name = list_to_atom((atom_to_list(?MODULE) ++ "-" ++ atom_to_list(?config(testcase, Config)) ++ "-" ++ integer_to_list(timestamp()))), start_node(Name, Args, Rel). stop_node(Node) -> test_server:stop_node(Node). freeze_node(Node, MS) -> Own = 300, DoingIt = make_ref(), Freezer = self(), spawn_link(Node, fun () -> erts_debug:set_internal_state(available_internal_state, true), dport_send(Freezer, DoingIt), receive after Own -> ok end, erts_debug:set_internal_state(block, MS+Own) end), receive DoingIt -> ok end, receive after Own -> ok end. inet_rpc_nodename({N,H,_Sock}) -> list_to_atom(N++"@"++H). do_inet_rpc({_,_,Sock},M,F,A) -> Bin = term_to_binary({M,F,A}), gen_tcp:send(Sock,Bin), case gen_tcp:recv(Sock,0) of {ok, Bin2} -> T = binary_to_term(Bin2), {ok,T}; Else -> {error, Else} end. inet_rpc_server([Host, PortList]) -> Port = list_to_integer(PortList), {ok, Sock} = gen_tcp:connect(Host, Port,[binary, {packet, 4}, {active, false}]), inet_rpc_server_loop(Sock). inet_rpc_server_loop(Sock) -> case gen_tcp:recv(Sock,0) of {ok, Bin} -> {M,F,A} = binary_to_term(Bin), Res = (catch apply(M,F,A)), RB = term_to_binary(Res), gen_tcp:send(Sock,RB), inet_rpc_server_loop(Sock); _ -> erlang:halt() end. start_relay_node(Node, Args) -> Pa = filename:dirname(code:which(?MODULE)), Cookie = "NOT"++atom_to_list(erlang:get_cookie()), {ok, LSock} = gen_tcp:listen(0, [binary, {packet, 4}, {active, false}]), {ok, Port} = inet:port(LSock), {ok, Host} = inet:gethostname(), RunArg = "-run " ++ atom_to_list(?MODULE) ++ " inet_rpc_server " ++ Host ++ " " ++ integer_to_list(Port), {ok, NN} = test_server:start_node(Node, peer, [{args, Args ++ " -setcookie "++Cookie++" -pa "++Pa++" "++ RunArg}]), [N,H] = string:tokens(atom_to_list(NN),"@"), {ok, Sock} = gen_tcp:accept(LSock), pang = net_adm:ping(NN), {N,H,Sock}. stop_relay_node({N,H,Sock}) -> catch do_inet_rpc(Sock,erlang,halt,[]), catch gen_tcp:close(Sock), wait_dead(N,H,10). wait_dead(N,H,0) -> {error,{not_dead,N,H}}; wait_dead(N,H,X) -> case erl_epmd:port_please(N,H) of {port,_,_} -> receive after 1000 -> ok end, wait_dead(N,H,X-1); noport -> ok; Else -> {error, {unexpected, Else}} end. start_node_monitors(Nodes) -> Master = self(), lists:foreach(fun (Node) -> spawn(Node, fun () -> node_monitor(Master) end) end, Nodes), ok. node_monitor(Master) -> Opts = [nodedown_reason,{node_type,all}], Nodes0 = nodes(connected), net_kernel:monitor_nodes(true, Opts), Nodes1 = nodes(connected), case lists:sort(Nodes0) == lists:sort(Nodes1) of true -> lists:foreach(fun (Node) -> Master ! {nodeup, node(), Node} end, Nodes0), ?t:format("~p ~p: ~p~n", [node(), erlang:now(), Nodes0]), node_monitor_loop(Master); false -> net_kernel:monitor_nodes(false, Opts), flush_node_changes(), node_monitor(Master) end. flush_node_changes() -> receive {NodeChange, _Node, _InfoList} when NodeChange == nodeup; NodeChange == nodedown -> flush_node_changes() after 0 -> ok end. node_monitor_loop(Master) -> receive {nodeup, Node, _InfoList} = Msg -> Master ! {nodeup, node(), Node}, ?t:format("~p ~p: ~p~n", [node(), erlang:now(), Msg]), node_monitor_loop(Master); {nodedown, Node, InfoList} = Msg -> Reason = case lists:keysearch(nodedown_reason, 1, InfoList) of {value, {nodedown_reason, R}} -> R; _ -> undefined end, Master ! {nodedown, node(), Node, Reason}, ?t:format("~p ~p: ~p~n", [node(), erlang:now(), Msg]), node_monitor_loop(Master) end. verify_up(A, B) -> receive {nodeup, A, B} -> ok end, receive {nodeup, B, A} -> ok end. verify_still_up(A, B) -> true = lists:member(B, rpc:call(A, erlang, nodes, [connected])), true = lists:member(A, rpc:call(B, erlang, nodes, [connected])), verify_no_down(A, B). verify_no_down(A, B) -> receive {nodedown, A, B, _} = Msg0 -> ?t:fail(Msg0) after 0 -> ok end, receive {nodedown, B, A, _} = Msg1 -> ?t:fail(Msg1) after 0 -> ok end. %% verify_down(A, B) -> %% receive {nodedown, A, B, _} -> ok end, %% receive {nodedown, B, A, _} -> ok end. verify_down(A, ReasonA, B, ReasonB) -> receive {nodedown, A, B, _} = Msg0 -> {nodedown, A, B, ReasonA} = Msg0 end, receive {nodedown, B, A, _} = Msg1 -> {nodedown, B, A, ReasonB} = Msg1 end, ok. hostname() -> from($@, atom_to_list(node())). from(H, [H | T]) -> T; from(H, [_ | T]) -> from(H, T); from(_, []) -> []. %% fun_spawn(Fun) -> %% fun_spawn(Fun, []). %% fun_spawn(Fun, Args) -> %% spawn_link(erlang, apply, [Fun, Args]). long_or_short() -> case net_kernel:longnames() of true -> " -name "; false -> " -sname " end. until(Fun) -> case Fun() of true -> ok; false -> receive after 10 -> ok end, until(Fun) end. forever(Fun) -> Fun(), forever(Fun). abort(Why) -> erts_debug:set_internal_state(available_internal_state, true), erts_debug:set_internal_state(abort, Why). start_busy_dist_port_tracer() -> Tracer = spawn_link(fun () -> busy_dist_port_tracer() end), erlang:system_monitor(Tracer, [busy_dist_port]), Tracer. stop_busy_dist_port_tracer(Tracer) when is_pid(Tracer) -> unlink(Tracer), exit(Tracer, bye); stop_busy_dist_port_tracer(_) -> true. busy_dist_port_tracer() -> receive {monitor, _SuspendedProcess, busy_dist_port, _Port} = M -> erlang:display(M), busy_dist_port_tracer() end. repeat(_Fun, 0) -> ok; repeat(Fun, N) -> Fun(), repeat(Fun, N-1).