aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/test/distribution_SUITE.erl
diff options
context:
space:
mode:
Diffstat (limited to 'erts/emulator/test/distribution_SUITE.erl')
-rw-r--r--erts/emulator/test/distribution_SUITE.erl414
1 files changed, 389 insertions, 25 deletions
diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl
index 885c66331c..6e31c73e10 100644
--- a/erts/emulator/test/distribution_SUITE.erl
+++ b/erts/emulator/test/distribution_SUITE.erl
@@ -19,7 +19,6 @@
%%
-module(distribution_SUITE).
--compile(r16).
-define(VERSION_MAGIC, 131).
@@ -39,6 +38,8 @@
-define(Line,).
-export([all/0, suite/0, groups/0,
+ init_per_suite/1, end_per_suite/1,
+ init_per_group/2, end_per_group/2,
ping/1, bulk_send_small/1,
group_leader/1,
optimistic_dflags/1,
@@ -53,7 +54,6 @@
dist_parallel_send/1,
atom_roundtrip/1,
unicode_atom_roundtrip/1,
- atom_roundtrip_r16b/1,
contended_atom_cache_entry/1,
contended_unicode_atom_cache_entry/1,
bad_dist_structure/1,
@@ -62,7 +62,12 @@
bad_dist_ext_control/1,
bad_dist_ext_connection_id/1,
bad_dist_ext_size/1,
- start_epmd_false/1, epmd_module/1]).
+ start_epmd_false/1, epmd_module/1,
+ bad_dist_fragments/1,
+ message_latency_large_message/1,
+ message_latency_large_link_exit/1,
+ message_latency_large_monitor_exit/1,
+ message_latency_large_exit2/1]).
%% Internal exports.
-export([sender/3, receiver2/2, dummy_waiter/0, dead_process/0,
@@ -88,9 +93,9 @@ all() ->
ref_port_roundtrip, nil_roundtrip, stop_dist,
{group, trap_bif}, {group, dist_auto_connect},
dist_parallel_send, atom_roundtrip, unicode_atom_roundtrip,
- atom_roundtrip_r16b,
contended_atom_cache_entry, contended_unicode_atom_cache_entry,
- bad_dist_structure, {group, bad_dist_ext},
+ {group, message_latency},
+ {group, bad_dist}, {group, bad_dist_ext},
start_epmd_false, epmd_module].
groups() ->
@@ -100,10 +105,40 @@ groups() ->
{trap_bif, [], [trap_bif_1, trap_bif_2, trap_bif_3]},
{dist_auto_connect, [],
[dist_auto_connect_never, dist_auto_connect_once]},
+ {bad_dist, [],
+ [bad_dist_structure, bad_dist_fragments]},
{bad_dist_ext, [],
[bad_dist_ext_receive, bad_dist_ext_process_info,
bad_dist_ext_size,
- bad_dist_ext_control, bad_dist_ext_connection_id]}].
+ bad_dist_ext_control, bad_dist_ext_connection_id]},
+ {message_latency, [],
+ [message_latency_large_message,
+ message_latency_large_link_exit,
+ message_latency_large_monitor_exit,
+ message_latency_large_exit2]}
+ ].
+
+init_per_suite(Config) ->
+ {ok, Apps} = application:ensure_all_started(os_mon),
+ [{started_apps, Apps} | Config].
+
+end_per_suite(Config) ->
+ Apps = proplists:get_value(started_apps, Config),
+ [application:stop(App) || App <- lists:reverse(Apps)],
+ Config.
+
+init_per_group(message_latency, Config) ->
+ Free = free_memory(),
+ if Free < 2048 ->
+ {skip, "Not enough memory"};
+ true ->
+ Config
+ end;
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
%% Tests pinging a node in different ways.
ping(Config) when is_list(Config) ->
@@ -568,10 +603,20 @@ do_busy_test(Node, Fun) ->
%% Don't match arity; it is different in debug and
%% optimized emulator
[{status, suspended},
- {current_function, {erlang, bif_return_trap, _}}] = Pinfo,
+ {current_function, {Mod, Func, _}}] = Pinfo,
+ if
+ Mod =:= erlang andalso Func =:= bif_return_trap ->
+ true;
+ Mod =:= erts_internal andalso Func =:= dsend_continue_trap ->
+ true;
+ true ->
+ ct:fail({incorrect, pinfo, Pinfo})
+ end,
receive
{'DOWN', M, process, P, Reason} ->
io:format("~p died with exit reason ~p~n", [P, Reason]),
+ verify_nc(node()),
+ verify_nc(Node),
normal = Reason
end
end.
@@ -720,6 +765,7 @@ link_to_dead_new_node(Config) when is_list(Config) ->
{'EXIT', Pid, noproc} ->
ok;
Other ->
+ stop_node(Node),
ct:fail({unexpected_message, Other})
after 5000 ->
ct:fail(nothing_received)
@@ -931,7 +977,9 @@ dist_auto_connect_never(Config) when is_list(Config) ->
ok;
{do_dist_auto_connect, Error} ->
{error, Error};
- Other ->
+ %% The io:formats in dos_dist_auto_connect will
+ %% generate port output messages that are ok
+ Other when not is_port(element(1, Other))->
{error, Other}
after 32000 ->
timeout
@@ -1107,23 +1155,6 @@ atom_roundtrip(Config) when is_list(Config) ->
stop_node(Node),
ok.
-atom_roundtrip_r16b(Config) when is_list(Config) ->
- case test_server:is_release_available("r16b") of
- true ->
- ct:timetrap({minutes, 6}),
- AtomData = unicode_atom_data(),
- verify_atom_data(AtomData),
- case start_node(Config, [], "r16b") of
- {ok, Node} ->
- do_atom_roundtrip(Node, AtomData),
- stop_node(Node);
- {error, timeout} ->
- {skip,"Unable to start OTP R16B release"}
- end;
- false ->
- {skip,"No OTP R16B available"}
- end.
-
unicode_atom_roundtrip(Config) when is_list(Config) ->
AtomData = unicode_atom_data(),
verify_atom_data(AtomData),
@@ -1364,6 +1395,139 @@ get_conflicting_unicode_atoms(CIX, N) ->
get_conflicting_unicode_atoms(CIX, N)
end.
+
+%% The message_latency_large tests that small distribution messages are
+%% not blocked by other large distribution messages. Basically it tests
+%% that fragmentation of distribution messages works.
+message_latency_large_message(Config) when is_list(Config) ->
+ measure_latency_large_message(?FUNCTION_NAME, fun(Dropper, Payload) -> Dropper ! Payload end).
+
+message_latency_large_exit2(Config) when is_list(Config) ->
+ measure_latency_large_message(?FUNCTION_NAME, fun erlang:exit/2).
+
+message_latency_large_link_exit(Config) when is_list(Config) ->
+ message_latency_large_exit(?FUNCTION_NAME, fun erlang:link/1).
+
+message_latency_large_monitor_exit(Config) when is_list(Config) ->
+ message_latency_large_exit(?FUNCTION_NAME, fun(Dropper) ->
+ Dropper ! {monitor, self()},
+ receive ok -> ok end
+ end).
+
+message_latency_large_exit(Nodename, ReasonFun) ->
+ measure_latency_large_message(
+ Nodename,
+ fun(Dropper, Payload) ->
+ Pid = spawn(fun() ->
+ receive go -> ok end,
+ ReasonFun(Dropper),
+ exit(Payload)
+ end),
+
+ FlushTrace = fun F() ->
+ receive
+ {trace, Pid, _, _} ->
+ F()
+ after 0 ->
+ ok
+ end
+ end,
+
+ erlang:trace(Pid, true, [exiting]),
+ Pid ! go,
+ receive
+ {trace, Pid, out_exited, 0} ->
+ FlushTrace()
+ end
+ end).
+
+measure_latency_large_message(Nodename, DataFun) ->
+
+ erlang:system_monitor(self(), [busy_dist_port]),
+
+ {ok, N} = start_node(Nodename),
+
+ Dropper = spawn(N, fun F() ->
+ process_flag(trap_exit, true),
+ receive
+ {monitor,Pid} ->
+ erlang:monitor(process, Pid),
+ Pid ! ok;
+ _ -> ok
+ end,
+ F()
+ end),
+
+ Echo = spawn(N, fun F() -> receive {From, Msg} -> From ! Msg, F() end end),
+
+ case erlang:system_info(build_type) of
+ debug ->
+ %% Test 3.2 MB and 32 MB and test the latency difference of sent messages
+ Payloads = [{I, <<0:(I * 32 * 1024 * 8)>>} || I <- [1,10]];
+ _ ->
+ %% Test 32 MB and 320 MB and test the latency difference of sent messages
+ Payloads = [{I, <<0:(I * 32 * 1024 * 1024 * 8)>>} || I <- [1,10]]
+ end,
+
+ IndexTimes = [{I, measure_latency(DataFun, Dropper, Echo, P)}
+ || {I, P} <- Payloads],
+
+ Times = [ Time || {_I, Time} <- IndexTimes],
+
+ ct:pal("~p",[IndexTimes]),
+
+ stop_node(N),
+
+ case {lists:max(Times), lists:min(Times)} of
+ {Max, Min} when Max * 0.25 > Min ->
+ ct:fail({incorrect_latency, IndexTimes});
+ _ ->
+ ok
+ end.
+
+measure_latency(DataFun, Dropper, Echo, Payload) ->
+
+ flush(),
+
+ Senders = [spawn_monitor(
+ fun F() ->
+ DataFun(Dropper, Payload),
+ receive
+ die -> ok
+ after 0 ->
+ F()
+ end
+ end) || _ <- lists:seq(1,2)],
+
+ [receive
+ {monitor, _Sender, busy_dist_port, _Info} ->
+ ok
+ end || _ <- lists:seq(1,10)],
+
+ {TS, _} =
+ timer:tc(fun() ->
+ [begin
+ Echo ! {self(), hello},
+ receive hello -> ok end
+ end || _ <- lists:seq(1,100)]
+ end),
+ [begin
+ Sender ! die,
+ receive
+ {'DOWN', Ref, process, _, _} ->
+ ok
+ end
+ end || {Sender, Ref} <- Senders],
+ TS.
+
+flush() ->
+ receive
+ _ ->
+ flush()
+ after 0 ->
+ ok
+ end.
+
-define(COOKIE, '').
-define(DOP_LINK, 1).
-define(DOP_SEND, 2).
@@ -1382,6 +1546,15 @@ get_conflicting_unicode_atoms(CIX, N) ->
-define(DOP_DEMONITOR_P, 20).
-define(DOP_MONITOR_P_EXIT, 21).
+-define(DOP_SEND_SENDER, 22).
+-define(DOP_SEND_SENDER_TT, 23).
+
+-define(DOP_PAYLOAD_EXIT, 24).
+-define(DOP_PAYLOAD_EXIT_TT, 25).
+-define(DOP_PAYLOAD_EXIT2, 26).
+-define(DOP_PAYLOAD_EXIT2_TT, 27).
+-define(DOP_PAYLOAD_MONITOR_P_EXIT, 28).
+
start_monitor(Offender,P) ->
Parent = self(),
Q = spawn(Offender,
@@ -1515,7 +1688,145 @@ bad_dist_structure(Config) when is_list(Config) ->
stop_node(Victim),
ok.
+%% Test various dist fragmentation errors
+bad_dist_fragments(Config) when is_list(Config) ->
+ ct:timetrap({seconds, 15}),
+
+ {ok, Offender} = start_node(bad_dist_fragment_offender),
+ {ok, Victim} = start_node(bad_dist_fragment_victim),
+
+ Msg = iolist_to_binary(dmsg_ext(lists:duplicate(255,255))),
+
+ start_node_monitors([Offender,Victim]),
+ Parent = self(),
+ P = spawn(Victim,
+ fun () ->
+ process_flag(trap_exit,true),
+ Parent ! {self(), started},
+ receive check_msgs -> ok end,
+ bad_dist_struct_check_msgs([one,
+ two]),
+ Parent ! {self(), messages_checked},
+ receive done -> ok end
+ end),
+ receive {P, started} -> ok end,
+ pong = rpc:call(Victim, net_adm, ping, [Offender]),
+ verify_up(Offender, Victim),
+ true = lists:member(Offender, rpc:call(Victim, erlang, nodes, [])),
+ start_monitor(Offender,P),
+ P ! one,
+
+ start_monitor(Offender,P),
+ send_bad_fragments(Offender, Victim, P,{?DOP_SEND,?COOKIE,P},3,
+ [{frg, 1, binary:part(Msg, 10,byte_size(Msg)-10)}]),
+
+ start_monitor(Offender,P),
+ send_bad_fragments(Offender, Victim, P,{?DOP_SEND,?COOKIE,P},3,
+ [{hdr, 3, binary:part(Msg, 0,10)},
+ {frg, 1, binary:part(Msg, 10,byte_size(Msg)-10)}]),
+
+ start_monitor(Offender,P),
+ send_bad_fragments(Offender, Victim, P,{?DOP_SEND,?COOKIE,P},3,
+ [{hdr, 3, binary:part(Msg, 0,10)},
+ {hdr, 3, binary:part(Msg, 0,10)}]),
+
+ start_monitor(Offender,P),
+ send_bad_fragments(Offender, Victim, P,{?DOP_SEND,?COOKIE,P,broken},3,
+ [{hdr, 1, binary:part(Msg, 10,byte_size(Msg)-10)}]),
+
+ start_monitor(Offender,P),
+ send_bad_fragments(Offender, Victim, P,{?DOP_SEND,?COOKIE,P},3,
+ [{hdr, 3, binary:part(Msg, 10,byte_size(Msg)-10)},
+ close]),
+
+ start_monitor(Offender,P),
+ ExitVictim = spawn(Victim, fun() -> receive ok -> ok end end),
+ send_bad_fragments(Offender, Victim, P,{?DOP_PAYLOAD_EXIT,P,ExitVictim},2,
+ [{hdr, 1, [131]}]),
+
+ start_monitor(Offender,P),
+ Exit2Victim = spawn(Victim, fun() -> receive ok -> ok end end),
+ send_bad_fragments(Offender, Victim, P,{?DOP_PAYLOAD_EXIT2,P,Exit2Victim},2,
+ [{hdr, 1, [132]}]),
+
+ start_monitor(Offender,P),
+ DownVictim = spawn(Victim, fun() -> receive ok -> ok end end),
+ DownRef = erlang:monitor(process, DownVictim),
+ send_bad_fragments(Offender, Victim, P,{?DOP_PAYLOAD_MONITOR_P_EXIT,P,DownVictim,DownRef},2,
+ [{hdr, 1, [133]}]),
+
+ P ! two,
+ P ! check_msgs,
+ receive
+ {P, messages_checked} -> ok
+ after 5000 ->
+ exit(victim_is_dead)
+ end,
+
+ {message_queue_len, 0}
+ = rpc:call(Victim, erlang, process_info, [P, message_queue_len]),
+
+ unlink(P),
+ P ! done,
+ stop_node(Offender),
+ stop_node(Victim),
+ ok.
+
+dmsg_frag_hdr(Frag) ->
+ dmsg_frag_hdr(erlang:phash2(self()), Frag).
+dmsg_frag_hdr(Seq, Frag) ->
+ [131, $E, uint64_be(Seq), uint64_be(Frag), 0].
+
+dmsg_frag(Frag) ->
+ dmsg_frag(erlang:phash2(self()), Frag).
+dmsg_frag(Seq, Frag) ->
+ [131, $F, uint64_be(Seq), uint64_be(Frag)].
+
+send_bad_fragments(Offender,VictimNode,Victim,Ctrl,WhereToPutSelf,Fragments) ->
+ Parent = self(),
+ Done = make_ref(),
+ ct:pal("Send: ~p",[Fragments]),
+ spawn_link(Offender,
+ fun () ->
+ Node = node(Victim),
+ pong = net_adm:ping(Node),
+ erlang:monitor_node(Node, true),
+ DCtrl = dctrl(Node),
+ Ctrl1 = case WhereToPutSelf of
+ 0 ->
+ Ctrl;
+ N when N > 0 ->
+ setelement(N,Ctrl,self())
+ end,
+
+ FragData = [case Type of
+ hdr ->
+ [dmsg_frag_hdr(FragId),
+ dmsg_ext(Ctrl1), FragPayload];
+ frg ->
+ [dmsg_frag(FragId), FragPayload]
+ end || {Type, FragId, FragPayload} <- Fragments],
+ receive {nodedown, Node} -> exit("premature nodedown")
+ after 10 -> ok
+ end,
+
+ [ dctrl_send(DCtrl, D) || D <- FragData ],
+ [ erlang:port_close(DCtrl) || close <- Fragments],
+
+ receive {nodedown, Node} -> ok
+ after 5000 -> exit("missing nodedown")
+ end,
+ Parent ! {FragData,Done}
+ end),
+ receive
+ {WhatSent,Done} ->
+ io:format("Offender sent ~p~n",[WhatSent]),
+ verify_nc(VictimNode),
+ ok
+ after 7000 ->
+ exit(unable_to_send)
+ end.
bad_dist_ext_receive(Config) when is_list(Config) ->
{ok, Offender} = start_node(bad_dist_ext_receive_offender),
@@ -2124,8 +2435,30 @@ start_node(Config, Args, Rel) when is_list(Config), is_list(Rel) ->
start_node(Name, Args, Rel).
stop_node(Node) ->
+ verify_nc(Node),
test_server:stop_node(Node).
+verify_nc(Node) ->
+ P = self(),
+ Ref = make_ref(),
+ Pid = spawn(Node,
+ fun() ->
+ R = erts_test_utils:check_node_dist(fun(E) -> E end),
+ P ! {Ref, R}
+ end),
+ MonRef = monitor(process, Pid),
+ receive
+ {Ref, ok} ->
+ demonitor(MonRef,[flush]),
+ ok;
+ {Ref, Error} ->
+ ct:log("~p",[Error]),
+ ct:fail(failed_nc_refc_check);
+ {'DOWN', MonRef, _, _, _} = Down ->
+ ct:log("~p",[Down]),
+ ct:fail(crashed_nc_refc_check)
+ end.
+
freeze_node(Node, MS) ->
Own = 300,
DoingIt = make_ref(),
@@ -2485,6 +2818,17 @@ mk_ref({NodeNameExt, Creation}, Numbers) when is_integer(Creation),
exit({unexpected_binary_to_term_result, Other})
end.
+uint64_be(Uint) when is_integer(Uint), 0 =< Uint, Uint < 1 bsl 64 ->
+ [(Uint bsr 56) band 16#ff,
+ (Uint bsr 48) band 16#ff,
+ (Uint bsr 40) band 16#ff,
+ (Uint bsr 32) band 16#ff,
+ (Uint bsr 24) band 16#ff,
+ (Uint bsr 16) band 16#ff,
+ (Uint bsr 8) band 16#ff,
+ Uint band 16#ff];
+uint64_be(Uint) ->
+ exit({badarg, uint64_be, [Uint]}).
uint32_be(Uint) when is_integer(Uint), 0 =< Uint, Uint < 1 bsl 32 ->
[(Uint bsr 24) band 16#ff,
@@ -2505,3 +2849,23 @@ uint8(Uint) when is_integer(Uint), 0 =< Uint, Uint < 1 bsl 8 ->
Uint band 16#ff;
uint8(Uint) ->
exit({badarg, uint8, [Uint]}).
+
+free_memory() ->
+ %% Free memory in MB.
+ try
+ SMD = memsup:get_system_memory_data(),
+ {value, {free_memory, Free}} = lists:keysearch(free_memory, 1, SMD),
+ TotFree = (Free +
+ case lists:keysearch(cached_memory, 1, SMD) of
+ {value, {cached_memory, Cached}} -> Cached;
+ false -> 0
+ end +
+ case lists:keysearch(buffered_memory, 1, SMD) of
+ {value, {buffered_memory, Buffed}} -> Buffed;
+ false -> 0
+ end),
+ TotFree div (1024*1024)
+ catch
+ error : undef ->
+ ct:fail({"os_mon not built"})
+ end.