diff options
author | Scott Lystig Fritchie <[email protected]> | 2010-10-22 15:25:10 -0500 |
---|---|---|
committer | Rickard Green <[email protected]> | 2010-11-02 11:48:44 +0100 |
commit | 8faf1746ece60fc5fa634e5fd16e98df1ef7f3ba (patch) | |
tree | 7420747bcb3589811ec177c09f110d424312debb | |
parent | 499082d25170aa7edf922a6aee0465a7ffb144d4 (diff) | |
download | otp-8faf1746ece60fc5fa634e5fd16e98df1ef7f3ba.tar.gz otp-8faf1746ece60fc5fa634e5fd16e98df1ef7f3ba.tar.bz2 otp-8faf1746ece60fc5fa634e5fd16e98df1ef7f3ba.zip |
Add flag-based setting for the distribution buffer busy limit
Id: OTP-8912
This patch creates a new family of flags with the "+z" prefix. It
further creates a new configuration option called "dbbl" (which is the
first letter of the name dist_buf_busy_limit). Example usage of this
flag would be "+zdbbl 1048576".
This patch creates an adjustable buffer limit for the amount of data
that may be buffered by the erlang distribution code (in dist.c
specifically). Before this patch, this hard-coded constant was used:
#define ERTS_DE_BUSY_LIMIT (128*1024)
When large binaries are transmitted between nodes (or simply a lot of
medium-sized binaries), it is very easy to hit the old 128KB limit.
Processes that use the erlang:system_monitor() BIF to monitor system
events can be spammed by {monitor, busy_dist_port, ...} message tuples
at rates of tens to even hundreds of messages/second.
A larger buffer limit will allow processes to buffer more outgoing
messages over the distribution. When the buffer limit has been
reached, sending processes will be suspended until the buffer size has
shrunk. The buffer limit is per distribution channel. A higher limit
will give lower latency and higher throughput at the expense of
higher memory usage.
A variation of this patch has been in commercial production use in at
least two companies that the author is aware of. Larger buffer values
can reduce the number of {monitor, busy_dist_port, ...} system
messages drastically, lower overall messaging latencies, and prevent
false timeouts and 'nodedown' messages in extremely busy Mnesia systems.
Test suite: there are two tests:
a. In erlexec_SUITE.erl to test basic set & get of the value
b. In distribution_SUITE.erl, to verify that setting +zdbbl very
low will actually change behavior.
-rw-r--r-- | erts/doc/src/erl.xml | 19 | ||||
-rw-r--r-- | erts/doc/src/erlang.xml | 7 | ||||
-rw-r--r-- | erts/emulator/beam/dist.c | 8 | ||||
-rw-r--r-- | erts/emulator/beam/dist.h | 3 | ||||
-rw-r--r-- | erts/emulator/beam/erl_bif_info.c | 7 | ||||
-rw-r--r-- | erts/emulator/beam/erl_init.c | 25 | ||||
-rw-r--r-- | erts/emulator/test/distribution_SUITE.erl | 93 | ||||
-rw-r--r-- | erts/etc/common/erlexec.c | 22 | ||||
-rw-r--r-- | erts/test/erlexec_SUITE.erl | 24 |
9 files changed, 196 insertions, 12 deletions
diff --git a/erts/doc/src/erl.xml b/erts/doc/src/erl.xml index e36d0adb0d..09c9cf6812 100644 --- a/erts/doc/src/erl.xml +++ b/erts/doc/src/erl.xml @@ -906,6 +906,25 @@ <seealso marker="kernel:error_logger#warning_map/0">error_logger(3)</seealso> for further information.</p> </item> + <tag><c><![CDATA[+zFlag Value]]></c></tag> + <item> + <p>Miscellaneous flags.</p> + <taglist> + <tag><marker id="+zdbbl"><c>+zdbbl size</c></marker></tag> + <item> + <p>Set the distribution buffer busy limit + (<seealso marker="erlang#system_info_dist_buf_busy_limit">dist_buf_busy_limit</seealso>) + in kilobytes. Valid range is 1-2097151. Default is 128.</p> + <p>A larger buffer limit will allow processes to buffer + more outgoing messages over the distribution. When the + buffer limit has been reached, sending processes will be + suspended until the buffer size has shrunk. The buffer + limit is per distribution channel. A higher limit will + give lower latency and higher throughput at the expense + of higher memory usage.</p> + </item> + </taglist> + </item> </taglist> </section> diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index 59ac3dc66c..25c92bdbb7 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -5624,6 +5624,13 @@ true</pre> The return value will always be <c>false</c> since the elib_malloc allocator has been removed.</p> </item> + <tag><marker id="system_info_dist_buf_busy_limit"><c>dist_buf_busy_limit</c></marker></tag> + <item> + <p>Returns the value of the distribution buffer busy limit + in bytes. This limit can be set on startup by passing the + <seealso marker="erl#+zdbbl">+zdbbl</seealso> command line + flag to <c>erl</c>.</p> + </item> <tag><c>fullsweep_after</c></tag> <item> <p>Returns <c>{fullsweep_after, int()}</c> which is the diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 16b6aeac3f..4497e17d79 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -97,6 +97,8 @@ dist_msg_dbg(ErtsDistExternal *edep, char *what, byte *buf, int sz) #define PASS_THROUGH 'p' /* This code should go */ int erts_is_alive; /* System must be blocked on change */ +int erts_dist_buf_busy_limit; + /* distribution trap functions */ Export* dsend2_trap = NULL; @@ -1453,8 +1455,6 @@ int erts_net_message(Port *prt, return -1; } -#define ERTS_DE_BUSY_LIMIT (128*1024) - static int dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy) { @@ -1540,7 +1540,7 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy) ErtsProcList *plp = NULL; erts_smp_spin_lock(&dep->qlock); dep->qsize += size_obuf(obuf); - if (dep->qsize >= ERTS_DE_BUSY_LIMIT) + if (dep->qsize >= erts_dist_buf_busy_limit) dep->qflgs |= ERTS_DE_QFLG_BUSY; if (!force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) { erts_smp_spin_unlock(&dep->qlock); @@ -1911,7 +1911,7 @@ erts_dist_command(Port *prt, int reds_limit) ASSERT(dep->qsize >= obufsize); dep->qsize -= obufsize; obufsize = 0; - if (de_busy && !prt_busy && dep->qsize < ERTS_DE_BUSY_LIMIT) { + if (de_busy && !prt_busy && dep->qsize < erts_dist_buf_busy_limit) { ErtsProcList *suspendees; int resumed; suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index fa19c7fb45..28cdd05c3c 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -99,7 +99,8 @@ typedef struct { #define ERTS_DE_IS_CONNECTED(DEP) \ (!ERTS_DE_IS_NOT_CONNECTED((DEP))) - +#define ERTS_DE_BUSY_LIMIT (128*1024) +extern int erts_dist_buf_busy_limit; extern int erts_is_alive; /* diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 40d8dc097c..801263ec26 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -2533,6 +2533,13 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1) BIF_RET(erts_nif_taints(BIF_P)); } else if (ERTS_IS_ATOM_STR("reader_groups_map", BIF_ARG_1)) { BIF_RET(erts_get_reader_groups_map(BIF_P)); + } else if (ERTS_IS_ATOM_STR("dist_buf_busy_limit", BIF_ARG_1)) { + Uint hsz = 0; + + (void) erts_bld_uint(NULL, &hsz, erts_dist_buf_busy_limit); + hp = hsz ? HAlloc(BIF_P, hsz) : NULL; + res = erts_bld_uint(&hp, NULL, erts_dist_buf_busy_limit); + BIF_RET(res); } BIF_ERROR(BIF_P, BADARG); diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index 4ae656a3ad..36fefc5cba 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -535,7 +535,8 @@ void erts_usage(void) erts_fprintf(stderr, "-W<i|w> set error logger warnings mapping,\n"); erts_fprintf(stderr, " see error_logger documentation for details\n"); - + erts_fprintf(stderr, "-zdbbl size set the distribution buffer busy limit in kilobytes\n"); + erts_fprintf(stderr, " valid range is [1-%d]\n", INT_MAX/1024); erts_fprintf(stderr, "\n"); erts_fprintf(stderr, "Note that if the emulator is started with erlexec (typically\n"); erts_fprintf(stderr, "from the erl script), these flags should be specified with +.\n"); @@ -818,7 +819,7 @@ early_init(int *argc, char **argv) /* erl_sys_args(argc, argv); erts_ets_realloc_always_moves = 0; - + erts_dist_buf_busy_limit = ERTS_DE_BUSY_LIMIT; } #ifndef ERTS_SMP @@ -1346,6 +1347,26 @@ erl_start(int argc, char **argv) } break; + case 'z': { + char *sub_param = argv[i]+2; + int new_limit; + + if (has_prefix("dbbl", sub_param)) { + arg = get_arg(sub_param+4, argv[i+1], &i); + new_limit = atoi(arg); + if (new_limit < 1 || INT_MAX/1024 < new_limit) { + erts_fprintf(stderr, "Invalid dbbl limit: %d\n", new_limit); + erts_usage(); + } else { + erts_dist_buf_busy_limit = new_limit*1024; + } + } else { + erts_fprintf(stderr, "bad -z option %s\n", argv[i]); + erts_usage(); + } + break; + } + default: erts_fprintf(stderr, "%s unknown flag %s\n", argv[0], argv[i]); erts_usage(); diff --git a/erts/emulator/test/distribution_SUITE.erl b/erts/emulator/test/distribution_SUITE.erl index 7c19274696..79252d0593 100644 --- a/erts/emulator/test/distribution_SUITE.erl +++ b/erts/emulator/test/distribution_SUITE.erl @@ -27,6 +27,7 @@ -export([all/1, ping/1, bulk_send/1, bulk_send_small/1, bulk_send_big/1, + bulk_send_bigbig/1, local_send/1, local_send_small/1, local_send_big/1, local_send_legal/1, link_to_busy/1, exit_to_busy/1, lost_exit/1, link_to_dead/1, link_to_dead_new_node/1, @@ -50,7 +51,8 @@ -export([sender/3, receiver2/2, dummy_waiter/0, dead_process/0, roundtrip/1, bounce/1, do_dist_auto_connect/1, inet_rpc_server/1, dist_parallel_sender/3, dist_parallel_receiver/0, - dist_evil_parallel_receiver/0]). + dist_evil_parallel_receiver/0, + sendersender/4, sendersender2/4]). all(suite) -> [ ping, bulk_send, local_send, link_to_busy, exit_to_busy, @@ -121,7 +123,7 @@ bulk_send(doc) -> "the time. This tests that a process that is suspended on a ", "busy port will eventually be resumed."]; bulk_send(suite) -> - [bulk_send_small, bulk_send_big]. + [bulk_send_small, bulk_send_big, bulk_send_bigbig]. bulk_send_small(Config) when is_list(Config) -> ?line bulk_send(64, 32). @@ -129,6 +131,9 @@ bulk_send_small(Config) when is_list(Config) -> bulk_send_big(Config) when is_list(Config) -> ?line bulk_send(32, 64). +bulk_send_bigbig(Config) when is_list(Config) -> + ?line bulk_sendsend(32*5, 4). + bulk_send(Terms, BinSize) -> ?line Dog = test_server:timetrap(test_server:seconds(30)), @@ -145,6 +150,53 @@ bulk_send(Terms, BinSize) -> ?line test_server:timetrap_cancel(Dog), {comment, integer_to_list(trunc(Size/1024/Elapsed+0.5)) ++ " K/s"}. +bulk_sendsend(Terms, BinSize) -> + {Rate1, MonitorCount1} = bulk_sendsend2(Terms, BinSize, 5), + {Rate2, MonitorCount2} = bulk_sendsend2(Terms, BinSize, 995), + Ratio = if MonitorCount2 == 0 -> MonitorCount1 / 1.0; + true -> MonitorCount1 / MonitorCount2 + end, + %% A somewhat arbitrary ratio, but hopefully one that will accomodate + %% a wide range of CPU speeds. + true = (Ratio > 8.0), + {comment, + integer_to_list(Rate1) ++ " K/s, " ++ + integer_to_list(Rate2) ++ " K/s, " ++ + integer_to_list(MonitorCount1) ++ " monitor msgs, " ++ + integer_to_list(MonitorCount2) ++ " monitor msgs, " ++ + float_to_list(Ratio) ++ " monitor ratio"}. + +bulk_sendsend2(Terms, BinSize, BusyBufSize) -> + ?line Dog = test_server:timetrap(test_server:seconds(30)), + + ?line io:format("Sending ~w binaries, each of size ~w K", + [Terms, BinSize]), + ?line {ok, NodeRecv} = start_node(bulk_receiver), + ?line Recv = spawn(NodeRecv, erlang, apply, [fun receiver/2, [0, 0]]), + ?line Bin = list_to_binary(lists:duplicate(BinSize*1024, 253)), + ?line Size = Terms*size(Bin), + + %% SLF LEFT OFF HERE. + %% When the caller uses small hunks, like 4k via + %% bulk_sendsend(32*5, 4), then (on my laptop at least), we get + %% zero monitor messages. But if we use "+zdbbl 5", then we + %% get a lot of monitor messages. So, if we can count up the + %% total number of monitor messages that we get when running both + %% default busy size and "+zdbbl 5", and if the 5 case gets + %% "many many more" monitor messages, then we know we're working. + + ?line {ok, NodeSend} = start_node(bulk_sender, "+zdbbl " ++ integer_to_list(BusyBufSize)), + ?line _Send = spawn(NodeSend, erlang, apply, [fun sendersender/4, [self(), Recv, Bin, Terms]]), + ?line {Elapsed, {TermsN, SizeN}, MonitorCount} = + receive {sendersender, BigRes} -> + BigRes + end, + ?line stop_node(NodeRecv), + ?line stop_node(NodeSend), + + ?line test_server:timetrap_cancel(Dog), + {trunc(SizeN/1024/Elapsed+0.5), MonitorCount}. + sender(To, _Bin, 0) -> To ! {done, self()}, receive @@ -155,6 +207,43 @@ sender(To, Bin, Left) -> To ! {term, Bin}, sender(To, Bin, Left-1). +%% Sender process to be run on a slave node + +sendersender(Parent, To, Bin, Left) -> + erlang:system_monitor(self(), [busy_dist_port]), + [spawn(fun() -> sendersender2(To, Bin, Left, false) end) || + _ <- lists:seq(1,1)], + {USec, {Res, MonitorCount}} = + timer:tc(?MODULE, sendersender2, [To, Bin, Left, true]), + Parent ! {sendersender, {USec/1000000, Res, MonitorCount}}. + +sendersender2(To, Bin, Left, SendDone) -> + sendersender3(To, Bin, Left, SendDone, 0). + +sendersender3(To, _Bin, 0, SendDone, MonitorCount) -> + if SendDone -> + To ! {done, self()}; + true -> + ok + end, + receive + {monitor, _Pid, _Type, _Info} = M -> + sendersender3(To, _Bin, 0, SendDone, MonitorCount + 1) + after 0 -> + if SendDone -> + receive + Any when is_tuple(Any), size(Any) == 2 -> + {Any, MonitorCount} + end; + true -> + exit(normal) + end + end; +sendersender3(To, Bin, Left, SendDone, MonitorCount) -> + To ! {term, Bin}, + %%timer:sleep(50), + sendersender3(To, Bin, Left-1, SendDone, MonitorCount). + %% Receiver process to be run on a slave node. receiver(Terms, Size) -> diff --git a/erts/etc/common/erlexec.c b/erts/etc/common/erlexec.c index c1fc2aebee..d65a55f223 100644 --- a/erts/etc/common/erlexec.c +++ b/erts/etc/common/erlexec.c @@ -138,6 +138,12 @@ static char *plusr_val_switches[] = { NULL }; +/* +z arguments with values */ +static char *plusz_val_switches[] = { + "dbbl", + NULL +}; + /* * Define sleep(seconds) in terms of Sleep() on Windows. @@ -909,6 +915,20 @@ int main(int argc, char **argv) i++; } break; + case 'z': + if (!is_one_of_strings(&argv[i][2], plusz_val_switches)) { + goto the_default; + } else { + if (i+1 >= argc + || argv[i+1][0] == '-' + || argv[i+1][0] == '+') + usage(argv[i]); + argv[i][0] = '-'; + add_Eargs(argv[i]); + add_Eargs(argv[i+1]); + i++; + } + break; default: the_default: argv[i][0] = '-'; /* Change +option to -option. */ @@ -1096,7 +1116,7 @@ usage_aux(void) "[+l] [+M<SUBSWITCH> <ARGUMENT>] [+P MAX_PROCS] [+R COMPAT_REL] " "[+r] [+rg READER_GROUPS_LIMIT] [+s SCHEDULER_OPTION] " "[+S NO_SCHEDULERS:NO_SCHEDULERS_ONLINE] [+T LEVEL] [+V] [+v] " - "[+W<i|w>] [args ...]\n"); + "[+W<i|w>] [+z MISC_OPTION] [args ...]\n"); exit(1); } diff --git a/erts/test/erlexec_SUITE.erl b/erts/test/erlexec_SUITE.erl index 164ce9faaf..6adb865f6d 100644 --- a/erts/test/erlexec_SUITE.erl +++ b/erts/test/erlexec_SUITE.erl @@ -33,7 +33,7 @@ -export([all/1, init_per_testcase/2, fin_per_testcase/2]). --export([args_file/1, evil_args_file/1, env/1, args_file_env/1, otp_7461/1, otp_7461_remote/1, otp_8209/1]). +-export([args_file/1, evil_args_file/1, env/1, args_file_env/1, otp_7461/1, otp_7461_remote/1, otp_8209/1, zdbbl_dist_buf_busy_limit/1]). -include_lib("test_server/include/test_server.hrl"). @@ -53,7 +53,8 @@ fin_per_testcase(_Case, Config) -> all(doc) -> []; all(suite) -> - [args_file, evil_args_file, env, args_file_env, otp_7461, otp_8209]. + [args_file, evil_args_file, env, args_file_env, otp_7461, otp_8209, + zdbbl_dist_buf_busy_limit]. otp_8209(doc) -> @@ -330,6 +331,25 @@ otp_7461_remote([halt, Pid]) -> io:format("halt order from ~p to node ~p\n",[Pid,node()]), halt(). +zdbbl_dist_buf_busy_limit(doc) -> + ["Check +zdbbl flag"]; +zdbbl_dist_buf_busy_limit(suite) -> + []; +zdbbl_dist_buf_busy_limit(Config) when is_list(Config) -> + LimKB = 1122233, + LimB = LimKB*1024, + ?line {ok,[[PName]]} = init:get_argument(progname), + ?line SNameS = "erlexec_test_02", + ?line SName = list_to_atom(SNameS++"@"++ + hd(tl(string:tokens(atom_to_list(node()),"@")))), + ?line Cmd = PName ++ " -sname "++SNameS++" -setcookie "++ + atom_to_list(erlang:get_cookie()) ++ + " +zdbbl " ++ integer_to_list(LimKB), + ?line open_port({spawn,Cmd},[]), + ?line pong = loop_ping(SName,40), + ?line LimB = rpc:call(SName,erlang,system_info,[dist_buf_busy_limit]), + ?line ok = cleanup_node(SNameS, 10), + ok. %% |