aboutsummaryrefslogtreecommitdiffstats
path: root/erts/emulator/beam/dist.c
diff options
context:
space:
mode:
authorRickard Green <[email protected]>2010-10-19 14:31:31 +0200
committerRickard Green <[email protected]>2010-11-02 13:39:50 +0100
commit9fb85488909f45d65409f3d8158398f6ad3bbbf2 (patch)
tree76b47deac61cda333bb94babb23cf90cb98d34bd /erts/emulator/beam/dist.c
parent158ed71a5ddc5050809723a214a8d8c841022871 (diff)
downloadotp-9fb85488909f45d65409f3d8158398f6ad3bbbf2.tar.gz
otp-9fb85488909f45d65409f3d8158398f6ad3bbbf2.tar.bz2
otp-9fb85488909f45d65409f3d8158398f6ad3bbbf2.zip
Be less eager to set dist entry in busy state
The runtime system is now less eager to suspend processes sending messages over the distribution. The default value of the distribution buffer busy limit has also been increased from 128 KB to 1 MB. This in order to improve throughput.
Diffstat (limited to 'erts/emulator/beam/dist.c')
-rw-r--r--erts/emulator/beam/dist.c89
1 files changed, 36 insertions, 53 deletions
diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c
index 4497e17d79..694460d702 100644
--- a/erts/emulator/beam/dist.c
+++ b/erts/emulator/beam/dist.c
@@ -162,7 +162,7 @@ Uint erts_dist_cache_size(void)
static ErtsProcList *
get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs)
{
- ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&dep->qlock));
+ ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&dep->qlock));
dep->qflgs &= ~unset_qflgs;
if (dep->qflgs & ERTS_DE_QFLG_EXIT) {
/* No resume when exit has been scheduled */
@@ -455,17 +455,17 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason)
if (dep->status & ERTS_DE_SFLG_EXITING) {
#ifdef DEBUG
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
#endif
}
else {
dep->status |= ERTS_DE_SFLG_EXITING;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
dep->qflgs |= ERTS_DE_QFLG_EXIT;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
erts_smp_de_links_lock(dep);
@@ -579,7 +579,7 @@ static void clear_dist_entry(DistEntry *dep)
erts_smp_de_links_unlock(dep);
#endif
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
if (!dep->out_queue.last)
obuf = dep->finalized_out_queue.first;
@@ -595,7 +595,7 @@ static void clear_dist_entry(DistEntry *dep)
dep->status = 0;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
erts_smp_atomic_set(&dep->dist_cmd_scheduled, 0);
dep->send = NULL;
erts_smp_de_rwunlock(dep);
@@ -613,10 +613,10 @@ static void clear_dist_entry(DistEntry *dep)
}
if (obufsize) {
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
}
@@ -1538,18 +1538,18 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
}
else {
ErtsProcList *plp = NULL;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
dep->qsize += size_obuf(obuf);
if (dep->qsize >= erts_dist_buf_busy_limit)
dep->qflgs |= ERTS_DE_QFLG_BUSY;
if (!force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) {
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
plp = erts_proclist_create(c_p);
plp->next = NULL;
erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL);
suspended = 1;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
}
/* Enqueue obuf on dist entry */
@@ -1575,7 +1575,7 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy)
}
}
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
erts_schedule_dist_command(NULL, dep);
erts_smp_de_runlock(dep);
@@ -1708,10 +1708,8 @@ erts_dist_command(Port *prt, int reds_limit)
{
Sint reds = ERTS_PORT_REDS_DIST_CMD_START;
int prt_busy;
- int de_busy;
Uint32 status;
Uint32 flags;
- Uint32 qflgs;
Sint obufsize = 0;
ErtsDistOutputQueue oq, foq;
DistEntry *dep = prt->dist_entry;
@@ -1746,13 +1744,12 @@ erts_dist_command(Port *prt, int reds_limit)
* a mess.
*/
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
oq.first = dep->out_queue.first;
oq.last = dep->out_queue.last;
dep->out_queue.first = NULL;
dep->out_queue.last = NULL;
- qflgs = dep->qflgs;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
foq.first = dep->finalized_out_queue.first;
foq.last = dep->finalized_out_queue.last;
@@ -1763,17 +1760,8 @@ erts_dist_command(Port *prt, int reds_limit)
goto preempted;
prt_busy = (int) (prt->status & ERTS_PORT_SFLG_PORT_BUSY);
- de_busy = (int) (qflgs & ERTS_DE_QFLG_BUSY);
- if (prt_busy) {
- if (!de_busy) {
- erts_smp_spin_lock(&dep->qlock);
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- erts_smp_spin_unlock(&dep->qlock);
- de_busy = 1;
- }
- }
- else if (foq.first) {
+ if (!prt_busy && foq.first) {
int preempt = 0;
do {
Uint size;
@@ -1791,10 +1779,7 @@ erts_dist_command(Port *prt, int reds_limit)
free_dist_obuf(fob);
preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD);
if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) {
- erts_smp_spin_lock(&dep->qlock);
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- erts_smp_spin_unlock(&dep->qlock);
- de_busy = prt_busy = 1;
+ prt_busy = 1;
break;
}
} while (foq.first && !preempt);
@@ -1877,10 +1862,7 @@ erts_dist_command(Port *prt, int reds_limit)
free_dist_obuf(fob);
preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD);
if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) {
- erts_smp_spin_lock(&dep->qlock);
- dep->qflgs |= ERTS_DE_QFLG_BUSY;
- erts_smp_spin_unlock(&dep->qlock);
- de_busy = prt_busy = 1;
+ prt_busy = 1;
if (oq.first && !preempt)
goto finalize_only;
}
@@ -1907,22 +1889,23 @@ erts_dist_command(Port *prt, int reds_limit)
* dist entry in a non-busy state and resume suspended
* processes.
*/
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
obufsize = 0;
- if (de_busy && !prt_busy && dep->qsize < erts_dist_buf_busy_limit) {
+ if (!prt_busy
+ && (dep->qflgs & ERTS_DE_QFLG_BUSY)
+ && dep->qsize < erts_dist_buf_busy_limit) {
ErtsProcList *suspendees;
int resumed;
suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
resumed = erts_resume_processes(suspendees);
reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED;
- de_busy = 0;
}
else
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
ASSERT(!oq.first && !oq.last);
@@ -1931,10 +1914,10 @@ erts_dist_command(Port *prt, int reds_limit)
if (obufsize != 0) {
ASSERT(obufsize > 0);
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize >= obufsize);
dep->qsize -= obufsize;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
ASSERT(foq.first || !foq.last);
@@ -1984,9 +1967,9 @@ erts_dist_command(Port *prt, int reds_limit)
foq.last = NULL;
#ifdef DEBUG
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize == obufsize);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
#endif
}
else {
@@ -1995,14 +1978,14 @@ erts_dist_command(Port *prt, int reds_limit)
* Unhandle buffers need to be put back first
* in out_queue.
*/
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
dep->qsize -= obufsize;
obufsize = 0;
oq.last->next = dep->out_queue.first;
dep->out_queue.first = oq.first;
if (!dep->out_queue.last)
dep->out_queue.last = oq.last;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
}
erts_schedule_dist_command(prt, NULL);
@@ -2026,10 +2009,10 @@ erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id)
dep->status |= ERTS_DE_SFLG_EXITING;
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT));
dep->qflgs |= ERTS_DE_QFLG_EXIT;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
erts_schedule_dist_command(NULL, dep);
}
@@ -2400,13 +2383,13 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
ErtsProcList *plp = erts_proclist_create(BIF_P);
plp->next = NULL;
erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL);
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
if (dep->suspended.last)
dep->suspended.last->next = plp;
else
dep->suspended.first = plp;
dep->suspended.last = plp;
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
goto yield;
}
@@ -2434,9 +2417,9 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3)
ASSERT(dep->send);
#ifdef DEBUG
- erts_smp_spin_lock(&dep->qlock);
+ erts_smp_mtx_lock(&dep->qlock);
ASSERT(dep->qsize == 0);
- erts_smp_spin_unlock(&dep->qlock);
+ erts_smp_mtx_unlock(&dep->qlock);
#endif
erts_set_dist_entry_connected(dep, BIF_ARG_2, flags);