From f5c6195f7cfaea66ab3137ba5fa30f90626c8213 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Gustavsson?= Date: Tue, 30 May 2017 16:14:32 +0200 Subject: distribution_SUITE: Refactor bulk_send_bigbig/1 Refactor bulk_send_bigbig/1 to make it somewhat easier to follow. --- erts/emulator/test/distribution_SUITE.erl | 122 +++++++++++++++--------------- 1 file changed, 59 insertions(+), 63 deletions(-) (limited to 'erts/emulator') diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index 434e729310..78d67ef2ae 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -62,8 +62,7 @@ -export([sender/3, receiver2/2, dummy_waiter/0, dead_process/0, roundtrip/1, bounce/1, do_dist_auto_connect/1, inet_rpc_server/1, dist_parallel_sender/3, dist_parallel_receiver/0, - dist_evil_parallel_receiver/0, - sendersender/4, sendersender2/4]). + dist_evil_parallel_receiver/0]). %% epmd_module exports -export([start_link/0, register_node/2, register_node/3, port_please/2]). @@ -129,33 +128,43 @@ bulk_send_small(Config) when is_list(Config) -> bulk_send_big(Config) when is_list(Config) -> bulk_send(32, 64). -bulk_send_bigbig(Config) when is_list(Config) -> - bulk_sendsend(32*5, 4). - bulk_send(Terms, BinSize) -> ct:timetrap({seconds, 30}), io:format("Sending ~w binaries, each of size ~w K", [Terms, BinSize]), {ok, Node} = start_node(bulk_receiver), Recv = spawn(Node, erlang, apply, [fun receiver/2, [0, 0]]), - Bin = list_to_binary(lists:duplicate(BinSize*1024, 253)), + Bin = binary:copy(<<253>>, BinSize*1024), Size = Terms*size(Bin), {Elapsed, {Terms, Size}} = test_server:timecall(?MODULE, sender, [Recv, Bin, Terms]), stop_node(Node), - {comment, integer_to_list(trunc(Size/1024/max(1,Elapsed)+0.5)) ++ " K/s"}. + {comment, integer_to_list(round(Size/1024/max(1,Elapsed))) ++ " K/s"}. + +sender(To, _Bin, 0) -> + To ! {done, self()}, + receive + Any -> + Any + end; +sender(To, Bin, Left) -> + To ! {term, Bin}, + sender(To, Bin, Left-1). -bulk_sendsend(Terms, BinSize) -> +bulk_send_bigbig(Config) when is_list(Config) -> + Terms = 32*5, + BinSize = 4, {Rate1, MonitorCount1} = bulk_sendsend2(Terms, BinSize, 5), {Rate2, MonitorCount2} = bulk_sendsend2(Terms, BinSize, 995), Ratio = if MonitorCount2 == 0 -> MonitorCount1 / 1.0; true -> MonitorCount1 / MonitorCount2 end, - Comment = integer_to_list(Rate1) ++ " K/s, " ++ - integer_to_list(Rate2) ++ " K/s, " ++ - integer_to_list(MonitorCount1) ++ " monitor msgs, " ++ - integer_to_list(MonitorCount2) ++ " monitor msgs, " ++ - float_to_list(Ratio) ++ " monitor ratio", + Comment0 = io_lib:format("~p K/s, ~p K/s, " + "~p monitor msgs, ~p monitor msgs, " + "~.1f monitor ratio", + [Rate1,Rate2,MonitorCount1, + MonitorCount2,Ratio]), + Comment = lists:flatten(Comment0), if %% A somewhat arbitrary ratio, but hopefully one that will %% accommodate a wide range of CPU speeds. @@ -169,12 +178,11 @@ bulk_sendsend(Terms, BinSize) -> bulk_sendsend2(Terms, BinSize, BusyBufSize) -> ct:timetrap({seconds, 30}), - io:format("Sending ~w binaries, each of size ~w K", + io:format("\nSending ~w binaries, each of size ~w K", [Terms, BinSize]), {ok, NodeRecv} = start_node(bulk_receiver), Recv = spawn(NodeRecv, erlang, apply, [fun receiver/2, [0, 0]]), - Bin = list_to_binary(lists:duplicate(BinSize*1024, 253)), - %%Size = Terms*size(Bin), + Bin = binary:copy(<<253>>, BinSize*1024), %% SLF LEFT OFF HERE. %% When the caller uses small hunks, like 4k via @@ -185,74 +193,62 @@ bulk_sendsend2(Terms, BinSize, BusyBufSize) -> %% default busy size and "+zdbbl 5", and if the 5 case gets %% "many many more" monitor messages, then we know we're working. - {ok, NodeSend} = start_node(bulk_sender, "+zdbbl " ++ integer_to_list(BusyBufSize)), - _Send = spawn(NodeSend, erlang, apply, [fun sendersender/4, [self(), Recv, Bin, Terms]]), + {ok, NodeSend} = start_node(bulk_sender, "+zdbbl " ++ + integer_to_list(BusyBufSize)), + _Send = spawn(NodeSend, erlang, apply, + [fun sendersender/4, [self(), Recv, Bin, Terms]]), {Elapsed, {_TermsN, SizeN}, MonitorCount} = - receive - %% On some platforms (windows), the time taken is 0 so we - %% simulate that some little time has passed. - {sendersender, {0.0,T,MC}} -> - {0.0015, T, MC}; - {sendersender, BigRes} -> - BigRes - end, + receive + %% On some platforms (Windows), the time taken is 0 so we + %% simulate that some little time has passed. + {sendersender, {0.0,T,MC}} -> + {0.0015, T, MC}; + {sendersender, BigRes} -> + BigRes + end, stop_node(NodeRecv), stop_node(NodeSend), - {trunc(SizeN/1024/Elapsed+0.5), MonitorCount}. - -sender(To, _Bin, 0) -> - To ! {done, self()}, - receive - Any -> - Any - end; -sender(To, Bin, Left) -> - To ! {term, Bin}, - sender(To, Bin, Left-1). + {round(SizeN/1024/Elapsed), MonitorCount}. %% Sender process to be run on a slave node sendersender(Parent, To, Bin, Left) -> erlang:system_monitor(self(), [busy_dist_port]), - [spawn(fun() -> sendersender2(To, Bin, Left, false) end) || - _ <- lists:seq(1,1)], + _ = spawn(fun() -> + sendersender_send(To, Bin, Left), + exit(normal) + end), {USec, {Res, MonitorCount}} = - timer:tc(?MODULE, sendersender2, [To, Bin, Left, true]), + timer:tc(fun() -> + sendersender_send(To, Bin, Left), + To ! {done, self()}, + count_monitors(0) + end), Parent ! {sendersender, {USec/1000000, Res, MonitorCount}}. -sendersender2(To, Bin, Left, SendDone) -> - sendersender3(To, Bin, Left, SendDone, 0). +sendersender_send(_To, _Bin, 0) -> + ok; +sendersender_send(To, Bin, Left) -> + To ! {term, Bin}, + sendersender_send(To, Bin, Left-1). -sendersender3(To, _Bin, 0, SendDone, MonitorCount) -> - if SendDone -> - To ! {done, self()}; - true -> - ok - end, +count_monitors(MonitorCount) -> receive {monitor, _Pid, _Type, _Info} -> - sendersender3(To, _Bin, 0, SendDone, MonitorCount + 1) + count_monitors(MonitorCount + 1) after 0 -> - if SendDone -> - receive - Any when is_tuple(Any), size(Any) == 2 -> - {Any, MonitorCount} - end; - true -> - exit(normal) - end - end; -sendersender3(To, Bin, Left, SendDone, MonitorCount) -> - To ! {term, Bin}, - %%timer:sleep(50), - sendersender3(To, Bin, Left-1, SendDone, MonitorCount). + receive + {_,_}=Any -> + {Any,MonitorCount} + end + end. %% Receiver process to be run on a slave node. receiver(Terms, Size) -> receive {term, Bin} -> - receiver(Terms+1, Size+size(Bin)); + receiver(Terms+1, Size+byte_size(Bin)); {done, ReplyTo} -> ReplyTo ! {Terms, Size} end. -- cgit v1.2.3