aboutsummaryrefslogtreecommitdiffstats
path: root/erts
diff options
context:
space:
mode:
authorRickard Green <rickard@erlang.org>2010-10-19 14:31:31 +0200
committerRickard Green <rickard@erlang.org>2010-11-02 13:39:50 +0100
commit9fb85488909f45d65409f3d8158398f6ad3bbbf2 (patch)
tree76b47deac61cda333bb94babb23cf90cb98d34bd /erts
parent158ed71a5ddc5050809723a214a8d8c841022871 (diff)
downloadotp-9fb85488909f45d65409f3d8158398f6ad3bbbf2.tar.gz
otp-9fb85488909f45d65409f3d8158398f6ad3bbbf2.tar.bz2
otp-9fb85488909f45d65409f3d8158398f6ad3bbbf2.zip
Be less eager to set dist entry in busy state
The runtime system is now less eager to suspend processes sending messages over the distribution. The default value of the distribution buffer busy limit has also been increased from 128 KB to 1 MB. This in order to improve throughput.
Diffstat (limited to 'erts')
-rw-r--r--erts/doc/src/erl.xml2
-rw-r--r--erts/emulator/beam/dist.c89
-rw-r--r--erts/emulator/beam/dist.h6
-rw-r--r--erts/emulator/beam/erl_node_tables.c10
-rw-r--r--erts/emulator/beam/erl_node_tables.h2
5 files changed, 46 insertions, 63 deletions
diff --git a/erts/doc/src/erl.xml b/erts/doc/src/erl.xml
index 09c9cf6812..e8a9501ace 100644
--- a/erts/doc/src/erl.xml
+++ b/erts/doc/src/erl.xml
@@ -914,7 +914,7 @@
<item>
<p>Set the distribution buffer busy limit
(<seealso marker="erlang#system_info_dist_buf_busy_limit">dist_buf_busy_limit</seealso>)
- in kilobytes. Valid range is 1-2097151. Default is 128.</p>
+ in kilobytes. Valid range is 1-2097151. Default is 1024.</p>
<p>A larger buffer limit will allow processes to buffer
more outgoing messages over the distribution. When the
buffer limit has been reached, sending processes will be
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 4497e17d79..694460d702 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -162,7 +162,7 @@ Uint erts_dist_cache_size(void)
static ErtsProcList *
get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs)
{
- ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&dep->qlock));
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&dep->qlock));
dep->qflgs &= ~unset_qflgs;
if (dep->qflgs & ERTS_DE_QFLG_EXIT) {
/* No resume when exit has been scheduled */
@@ -455,17 +455,17 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
if (dep->status & ERTS_DE_SFLG_EXITING) {
#ifdef DEBUG
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
#endif
}
else {
dep->status |= ERTS_DE_SFLG_EXITING;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
dep->qflgs |= ERTS_DE_QFLG_EXIT;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
erts_smp_de_links_lock(dep);
@@ -579,7 +579,7 @@ static void clear_dist_entry(DistEntry *dep)
erts_smp_de_links_unlock(dep);
#endif
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
if (!dep->out_queue.last)
obuf = dep->finalized_out_queue.first;
@@ -595,7 +595,7 @@ static void clear_dist_entry(DistEntry *dep)
dep->status = 0;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
erts_smp_atomic_set(&dep->dist_cmd_scheduled, 0);
dep->send = NULL;
erts_smp_de_rwunlock(dep);
@@ -613,10 +613,10 @@ static void clear_dist_entry(DistEntry *dep)
}
if (obufsize) {
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
}
@@ -1538,18 +1538,18 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
}
else {
ErtsProcList *plp = NULL;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
dep->qsize += size_obuf(obuf);
if (dep->qsize >= erts_dist_buf_busy_limit)
dep->qflgs |= ERTS_DE_QFLG_BUSY;
if (!force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) {
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
plp = erts_proclist_create(c_p);
plp->next = NULL;
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
suspended = 1;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
}
/* Enqueue obuf on dist entry */
@@ -1575,7 +1575,7 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
}
}
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
erts_schedule_dist_command(NULL, dep);
erts_smp_de_runlock(dep);
@@ -1708,10 +1708,8 @@ erts_dist_command(Port *prt, int reds_limit)
{
Sint reds = ERTS_PORT_REDS_DIST_CMD_START;
int prt_busy;
- int de_busy;
Uint32 status;
Uint32 flags;
- Uint32 qflgs;
Sint obufsize = 0;
ErtsDistOutputQueue oq, foq;
DistEntry *dep = prt->dist_entry;
@@ -1746,13 +1744,12 @@ erts_dist_command(Port *prt, int reds_limit)
* a mess.
*/
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
oq.first = dep->out_queue.first;
oq.last = dep->out_queue.last;
dep->out_queue.first = NULL;
dep->out_queue.last = NULL;
- qflgs = dep->qflgs;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
foq.first = dep->finalized_out_queue.first;
foq.last = dep->finalized_out_queue.last;
@@ -1763,17 +1760,8 @@ erts_dist_command(Port *prt, int reds_limit)
goto preempted;
prt_busy = (int) (prt->status & ERTS_PORT_SFLG_PORT_BUSY);
- de_busy = (int) (qflgs & ERTS_DE_QFLG_BUSY);
- if (prt_busy) {
- if (!de_busy) {
- erts_smp_spin_lock(&dep->qlock);
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- erts_smp_spin_unlock(&dep->qlock);
- de_busy = 1;
- }
- }
- else if (foq.first) {
+ if (!prt_busy && foq.first) {
int preempt = 0;
do {
Uint size;
@@ -1791,10 +1779,7 @@ erts_dist_command(Port *prt, int reds_limit)
free_dist_obuf(fob);
preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD);
if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) {
- erts_smp_spin_lock(&dep->qlock);
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- erts_smp_spin_unlock(&dep->qlock);
- de_busy = prt_busy = 1;
+ prt_busy = 1;
break;
}
} while (foq.first && !preempt);
@@ -1877,10 +1862,7 @@ erts_dist_command(Port *prt, int reds_limit)
free_dist_obuf(fob);
preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD);
if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) {
- erts_smp_spin_lock(&dep->qlock);
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- erts_smp_spin_unlock(&dep->qlock);
- de_busy = prt_busy = 1;
+ prt_busy = 1;
if (oq.first && !preempt)
goto finalize_only;
}
@@ -1907,22 +1889,23 @@ erts_dist_command(Port *prt, int reds_limit)
* dist entry in a non-busy state and resume suspended
* processes.
*/
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
obufsize = 0;
- if (de_busy && !prt_busy && dep->qsize < erts_dist_buf_busy_limit) {
+ if (!prt_busy
+ && (dep->qflgs & ERTS_DE_QFLG_BUSY)
+ && dep->qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
int resumed;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
resumed = erts_resume_processes(suspendees);
reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
- de_busy = 0;
}
else
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
ASSERT(!oq.first && !oq.last);
@@ -1931,10 +1914,10 @@ erts_dist_command(Port *prt, int reds_limit)
if (obufsize != 0) {
ASSERT(obufsize > 0);
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
ASSERT(foq.first || !foq.last);
@@ -1984,9 +1967,9 @@ erts_dist_command(Port *prt, int reds_limit)
foq.last = NULL;
#ifdef DEBUG
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize == obufsize);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
#endif
}
else {
@@ -1995,14 +1978,14 @@ erts_dist_command(Port *prt, int reds_limit)
* Unhandle buffers need to be put back first
* in out_queue.
*/
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
dep->qsize -= obufsize;
obufsize = 0;
oq.last->next = dep->out_queue.first;
dep->out_queue.first = oq.first;
if (!dep->out_queue.last)
dep->out_queue.last = oq.last;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
erts_schedule_dist_command(prt, NULL);
@@ -2026,10 +2009,10 @@ erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
dep->status |= ERTS_DE_SFLG_EXITING;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
dep->qflgs |= ERTS_DE_QFLG_EXIT;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
erts_schedule_dist_command(NULL, dep);
}
@@ -2400,13 +2383,13 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
ErtsProcList *plp = erts_proclist_create(BIF_P);
plp->next = NULL;
erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
if (dep->suspended.last)
dep->suspended.last->next = plp;
else
dep->suspended.first = plp;
dep->suspended.last = plp;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
goto yield;
}
@@ -2434,9 +2417,9 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
ASSERT(dep->send);
#ifdef DEBUG
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize == 0);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
#endif
erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);
diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h
index 28cdd05c3c..64caf34550 100644
--- a/erts/emulator/beam/dist.h
+++ b/erts/emulator/beam/dist.h
@@ -99,7 +99,7 @@ typedef struct {
#define ERTS_DE_IS_CONNECTED(DEP) \
(!ERTS_DE_IS_NOT_CONNECTED((DEP)))
-#define ERTS_DE_BUSY_LIMIT (128*1024)
+#define ERTS_DE_BUSY_LIMIT (1024*1024)
extern int erts_dist_buf_busy_limit;
extern int erts_is_alive;
@@ -154,10 +154,10 @@ erts_dsig_prepare(ErtsDSigData *dsdp,
}
if (no_suspend) {
failure = ERTS_DSIG_PREP_CONNECTED;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
if (dep->qflgs & ERTS_DE_QFLG_BUSY)
failure = ERTS_DSIG_PREP_WOULD_SUSPEND;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
if (failure == ERTS_DSIG_PREP_WOULD_SUSPEND)
goto fail;
}
diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c
index d0b08bf72e..8cdda395df 100644
--- a/erts/emulator/beam/erl_node_tables.c
+++ b/erts/emulator/beam/erl_node_tables.c
@@ -107,7 +107,7 @@ dist_table_alloc(void *dep_tmpl)
dep->nlinks = NULL;
dep->monitors = NULL;
- erts_smp_spinlock_init_x(&dep->qlock, "dist_entry_out_queue", chnl_nr);
+ erts_smp_mtx_init_x(&dep->qlock, "dist_entry_out_queue", chnl_nr);
dep->qflgs = 0;
dep->qsize = 0;
dep->out_queue.first = NULL;
@@ -172,7 +172,7 @@ dist_table_free(void *vdep)
ASSERT(!dep->cache);
erts_smp_rwmtx_destroy(&dep->rwmtx);
erts_smp_mtx_destroy(&dep->lnk_mtx);
- erts_smp_spinlock_destroy(&dep->qlock);
+ erts_smp_mtx_destroy(&dep->qlock);
#ifdef DEBUG
sys_memset(vdep, 0x77, sizeof(DistEntry));
@@ -755,9 +755,9 @@ void erts_init_node_tables(void)
erts_this_dist_entry->nlinks = NULL;
erts_this_dist_entry->monitors = NULL;
- erts_smp_spinlock_init_x(&erts_this_dist_entry->qlock,
- "dist_entry_out_queue",
- make_small(ERST_INTERNAL_CHANNEL_NO));
+ erts_smp_mtx_init_x(&erts_this_dist_entry->qlock,
+ "dist_entry_out_queue",
+ make_small(ERST_INTERNAL_CHANNEL_NO));
erts_this_dist_entry->qflgs = 0;
erts_this_dist_entry->qsize = 0;
erts_this_dist_entry->out_queue.first = NULL;
diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h
index eb759b87e9..b0a63ae035 100644
--- a/erts/emulator/beam/erl_node_tables.h
+++ b/erts/emulator/beam/erl_node_tables.h
@@ -131,7 +131,7 @@ typedef struct dist_entry_ {
ErtsLink *nlinks; /* Link tree with subtrees */
ErtsMonitor *monitors; /* Monitor tree */
- erts_smp_spinlock_t qlock; /* Protects qflgs and out_queue */
+ erts_smp_mtx_t qlock; /* Protects qflgs and out_queue */
Uint32 qflgs;
Sint qsize;
ErtsDistOutputQueue out_queue;