diff options
author | Rickard Green <rickard@erlang.org> | 2010-10-19 14:31:31 +0200 |
---|---|---|
committer | Rickard Green <rickard@erlang.org> | 2010-11-02 13:39:50 +0100 |
commit | 9fb85488909f45d65409f3d8158398f6ad3bbbf2 (patch) | |
tree | 76b47deac61cda333bb94babb23cf90cb98d34bd /erts | |
parent | 158ed71a5ddc5050809723a214a8d8c841022871 (diff) | |
download | otp-9fb85488909f45d65409f3d8158398f6ad3bbbf2.tar.gz otp-9fb85488909f45d65409f3d8158398f6ad3bbbf2.tar.bz2 otp-9fb85488909f45d65409f3d8158398f6ad3bbbf2.zip |
Be less eager to set dist entry in busy state
The runtime system is now less eager to suspend
processes sending messages over the distribution. The
default value of the distribution buffer busy limit
has also been increased from 128 KB to 1 MB. This in
order to improve throughput.
Diffstat (limited to 'erts')
-rw-r--r-- | erts/doc/src/erl.xml | 2 | ||||
-rw-r--r-- | erts/emulator/beam/dist.c | 89 | ||||
-rw-r--r-- | erts/emulator/beam/dist.h | 6 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.c | 10 | ||||
-rw-r--r-- | erts/emulator/beam/erl_node_tables.h | 2 |
5 files changed, 46 insertions, 63 deletions
diff --git a/erts/doc/src/erl.xml b/erts/doc/src/erl.xml index 09c9cf6812..e8a9501ace 100644 --- a/erts/doc/src/erl.xml +++ b/erts/doc/src/erl.xml @@ -914,7 +914,7 @@ <item> <p>Set the distribution buffer busy limit (<seealso marker="erlang#system_info_dist_buf_busy_limit">dist_buf_busy_limit</seealso>) - in kilobytes. Valid range is 1-2097151. Default is 128.</p> + in kilobytes. Valid range is 1-2097151. Default is 1024.</p> <p>A larger buffer limit will allow processes to buffer more outgoing messages over the distribution. When the buffer limit has been reached, sending processes will be diff --git a/erts/emulator/beam/dist.c b/erts/emulator/beam/dist.c index 4497e17d79..694460d702 100644 --- a/erts/emulator/beam/dist.c +++ b/erts/emulator/beam/dist.c @@ -162,7 +162,7 @@ Uint erts_dist_cache_size(void) static ErtsProcList * get_suspended_on_de(DistEntry *dep, Uint32 unset_qflgs) { - ERTS_SMP_LC_ASSERT(erts_smp_lc_spinlock_is_locked(&dep->qlock)); + ERTS_SMP_LC_ASSERT(erts_smp_lc_mtx_is_locked(&dep->qlock)); dep->qflgs &= ~unset_qflgs; if (dep->qflgs & ERTS_DE_QFLG_EXIT) { /* No resume when exit has been scheduled */ @@ -455,17 +455,17 @@ int erts_do_net_exits(DistEntry *dep, Eterm reason) if (dep->status & ERTS_DE_SFLG_EXITING) { #ifdef DEBUG - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); ASSERT(dep->qflgs & ERTS_DE_QFLG_EXIT); - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); #endif } else { dep->status |= ERTS_DE_SFLG_EXITING; - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT)); dep->qflgs |= ERTS_DE_QFLG_EXIT; - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); } erts_smp_de_links_lock(dep); @@ -579,7 +579,7 @@ static void clear_dist_entry(DistEntry *dep) erts_smp_de_links_unlock(dep); #endif - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); if (!dep->out_queue.last) obuf = dep->finalized_out_queue.first; @@ -595,7 +595,7 @@ static void clear_dist_entry(DistEntry *dep) dep->status = 0; suspendees = get_suspended_on_de(dep, ERTS_DE_QFLGS_ALL); - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); erts_smp_atomic_set(&dep->dist_cmd_scheduled, 0); dep->send = NULL; erts_smp_de_rwunlock(dep); @@ -613,10 +613,10 @@ static void clear_dist_entry(DistEntry *dep) } if (obufsize) { - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); ASSERT(dep->qsize >= obufsize); dep->qsize -= obufsize; - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); } } @@ -1538,18 +1538,18 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy) } else { ErtsProcList *plp = NULL; - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); dep->qsize += size_obuf(obuf); if (dep->qsize >= erts_dist_buf_busy_limit) dep->qflgs |= ERTS_DE_QFLG_BUSY; if (!force_busy && (dep->qflgs & ERTS_DE_QFLG_BUSY)) { - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); plp = erts_proclist_create(c_p); plp->next = NULL; erts_suspend(c_p, ERTS_PROC_LOCK_MAIN, NULL); suspended = 1; - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); } /* Enqueue obuf on dist entry */ @@ -1575,7 +1575,7 @@ dsig_send(ErtsDSigData *dsdp, Eterm ctl, Eterm msg, int force_busy) } } - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); erts_schedule_dist_command(NULL, dep); erts_smp_de_runlock(dep); @@ -1708,10 +1708,8 @@ erts_dist_command(Port *prt, int reds_limit) { Sint reds = ERTS_PORT_REDS_DIST_CMD_START; int prt_busy; - int de_busy; Uint32 status; Uint32 flags; - Uint32 qflgs; Sint obufsize = 0; ErtsDistOutputQueue oq, foq; DistEntry *dep = prt->dist_entry; @@ -1746,13 +1744,12 @@ erts_dist_command(Port *prt, int reds_limit) * a mess. */ - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); oq.first = dep->out_queue.first; oq.last = dep->out_queue.last; dep->out_queue.first = NULL; dep->out_queue.last = NULL; - qflgs = dep->qflgs; - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); foq.first = dep->finalized_out_queue.first; foq.last = dep->finalized_out_queue.last; @@ -1763,17 +1760,8 @@ erts_dist_command(Port *prt, int reds_limit) goto preempted; prt_busy = (int) (prt->status & ERTS_PORT_SFLG_PORT_BUSY); - de_busy = (int) (qflgs & ERTS_DE_QFLG_BUSY); - if (prt_busy) { - if (!de_busy) { - erts_smp_spin_lock(&dep->qlock); - dep->qflgs |= ERTS_DE_QFLG_BUSY; - erts_smp_spin_unlock(&dep->qlock); - de_busy = 1; - } - } - else if (foq.first) { + if (!prt_busy && foq.first) { int preempt = 0; do { Uint size; @@ -1791,10 +1779,7 @@ erts_dist_command(Port *prt, int reds_limit) free_dist_obuf(fob); preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD); if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) { - erts_smp_spin_lock(&dep->qlock); - dep->qflgs |= ERTS_DE_QFLG_BUSY; - erts_smp_spin_unlock(&dep->qlock); - de_busy = prt_busy = 1; + prt_busy = 1; break; } } while (foq.first && !preempt); @@ -1877,10 +1862,7 @@ erts_dist_command(Port *prt, int reds_limit) free_dist_obuf(fob); preempt = reds > reds_limit || (prt->status & ERTS_PORT_SFLGS_DEAD); if (prt->status & ERTS_PORT_SFLG_PORT_BUSY) { - erts_smp_spin_lock(&dep->qlock); - dep->qflgs |= ERTS_DE_QFLG_BUSY; - erts_smp_spin_unlock(&dep->qlock); - de_busy = prt_busy = 1; + prt_busy = 1; if (oq.first && !preempt) goto finalize_only; } @@ -1907,22 +1889,23 @@ erts_dist_command(Port *prt, int reds_limit) * dist entry in a non-busy state and resume suspended * processes. */ - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); ASSERT(dep->qsize >= obufsize); dep->qsize -= obufsize; obufsize = 0; - if (de_busy && !prt_busy && dep->qsize < erts_dist_buf_busy_limit) { + if (!prt_busy + && (dep->qflgs & ERTS_DE_QFLG_BUSY) + && dep->qsize < erts_dist_buf_busy_limit) { ErtsProcList *suspendees; int resumed; suspendees = get_suspended_on_de(dep, ERTS_DE_QFLG_BUSY); - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); resumed = erts_resume_processes(suspendees); reds += resumed*ERTS_PORT_REDS_DIST_CMD_RESUMED; - de_busy = 0; } else - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); } ASSERT(!oq.first && !oq.last); @@ -1931,10 +1914,10 @@ erts_dist_command(Port *prt, int reds_limit) if (obufsize != 0) { ASSERT(obufsize > 0); - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); ASSERT(dep->qsize >= obufsize); dep->qsize -= obufsize; - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); } ASSERT(foq.first || !foq.last); @@ -1984,9 +1967,9 @@ erts_dist_command(Port *prt, int reds_limit) foq.last = NULL; #ifdef DEBUG - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); ASSERT(dep->qsize == obufsize); - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); #endif } else { @@ -1995,14 +1978,14 @@ erts_dist_command(Port *prt, int reds_limit) * Unhandle buffers need to be put back first * in out_queue. */ - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); dep->qsize -= obufsize; obufsize = 0; oq.last->next = dep->out_queue.first; dep->out_queue.first = oq.first; if (!dep->out_queue.last) dep->out_queue.last = oq.last; - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); } erts_schedule_dist_command(prt, NULL); @@ -2026,10 +2009,10 @@ erts_kill_dist_connection(DistEntry *dep, Uint32 connection_id) dep->status |= ERTS_DE_SFLG_EXITING; - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); ASSERT(!(dep->qflgs & ERTS_DE_QFLG_EXIT)); dep->qflgs |= ERTS_DE_QFLG_EXIT; - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); erts_schedule_dist_command(NULL, dep); } @@ -2400,13 +2383,13 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) ErtsProcList *plp = erts_proclist_create(BIF_P); plp->next = NULL; erts_suspend(BIF_P, ERTS_PROC_LOCK_MAIN, NULL); - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); if (dep->suspended.last) dep->suspended.last->next = plp; else dep->suspended.first = plp; dep->suspended.last = plp; - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); goto yield; } @@ -2434,9 +2417,9 @@ BIF_RETTYPE setnode_3(BIF_ALIST_3) ASSERT(dep->send); #ifdef DEBUG - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); ASSERT(dep->qsize == 0); - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); #endif erts_set_dist_entry_connected(dep, BIF_ARG_2, flags); diff --git a/erts/emulator/beam/dist.h b/erts/emulator/beam/dist.h index 28cdd05c3c..64caf34550 100644 --- a/erts/emulator/beam/dist.h +++ b/erts/emulator/beam/dist.h @@ -99,7 +99,7 @@ typedef struct { #define ERTS_DE_IS_CONNECTED(DEP) \ (!ERTS_DE_IS_NOT_CONNECTED((DEP))) -#define ERTS_DE_BUSY_LIMIT (128*1024) +#define ERTS_DE_BUSY_LIMIT (1024*1024) extern int erts_dist_buf_busy_limit; extern int erts_is_alive; @@ -154,10 +154,10 @@ erts_dsig_prepare(ErtsDSigData *dsdp, } if (no_suspend) { failure = ERTS_DSIG_PREP_CONNECTED; - erts_smp_spin_lock(&dep->qlock); + erts_smp_mtx_lock(&dep->qlock); if (dep->qflgs & ERTS_DE_QFLG_BUSY) failure = ERTS_DSIG_PREP_WOULD_SUSPEND; - erts_smp_spin_unlock(&dep->qlock); + erts_smp_mtx_unlock(&dep->qlock); if (failure == ERTS_DSIG_PREP_WOULD_SUSPEND) goto fail; } diff --git a/erts/emulator/beam/erl_node_tables.c b/erts/emulator/beam/erl_node_tables.c index d0b08bf72e..8cdda395df 100644 --- a/erts/emulator/beam/erl_node_tables.c +++ b/erts/emulator/beam/erl_node_tables.c @@ -107,7 +107,7 @@ dist_table_alloc(void *dep_tmpl) dep->nlinks = NULL; dep->monitors = NULL; - erts_smp_spinlock_init_x(&dep->qlock, "dist_entry_out_queue", chnl_nr); + erts_smp_mtx_init_x(&dep->qlock, "dist_entry_out_queue", chnl_nr); dep->qflgs = 0; dep->qsize = 0; dep->out_queue.first = NULL; @@ -172,7 +172,7 @@ dist_table_free(void *vdep) ASSERT(!dep->cache); erts_smp_rwmtx_destroy(&dep->rwmtx); erts_smp_mtx_destroy(&dep->lnk_mtx); - erts_smp_spinlock_destroy(&dep->qlock); + erts_smp_mtx_destroy(&dep->qlock); #ifdef DEBUG sys_memset(vdep, 0x77, sizeof(DistEntry)); @@ -755,9 +755,9 @@ void erts_init_node_tables(void) erts_this_dist_entry->nlinks = NULL; erts_this_dist_entry->monitors = NULL; - erts_smp_spinlock_init_x(&erts_this_dist_entry->qlock, - "dist_entry_out_queue", - make_small(ERST_INTERNAL_CHANNEL_NO)); + erts_smp_mtx_init_x(&erts_this_dist_entry->qlock, + "dist_entry_out_queue", + make_small(ERST_INTERNAL_CHANNEL_NO)); erts_this_dist_entry->qflgs = 0; erts_this_dist_entry->qsize = 0; erts_this_dist_entry->out_queue.first = NULL; diff --git a/erts/emulator/beam/erl_node_tables.h b/erts/emulator/beam/erl_node_tables.h index eb759b87e9..b0a63ae035 100644 --- a/erts/emulator/beam/erl_node_tables.h +++ b/erts/emulator/beam/erl_node_tables.h @@ -131,7 +131,7 @@ typedef struct dist_entry_ { ErtsLink *nlinks; /* Link tree with subtrees */ ErtsMonitor *monitors; /* Monitor tree */ - erts_smp_spinlock_t qlock; /* Protects qflgs and out_queue */ + erts_smp_mtx_t qlock; /* Protects qflgs and out_queue */ Uint32 qflgs; Sint qsize; ErtsDistOutputQueue out_queue; |