author     Rickard Green <[email protected]>  2012-08-30 23:25:34 +0200
committer  Rickard Green <[email protected]>  2012-12-03 21:18:03 +0100
commit     d827ac847517ed43339a6a86493c7c017be782a5 (patch)
tree       6e7a85323ac80f99f73ee838badff0236bcc1276
parent     88126e785de24f5f41068c610bc13840dcab4a7d (diff)
Implement functionality for delaying thread progress from unmanaged threads
-rw-r--r--  erts/emulator/beam/erl_process_lock.c                      36
-rw-r--r--  erts/emulator/beam/erl_thr_progress.c                      285
-rw-r--r--  erts/emulator/beam/erl_thr_progress.h                      62
-rw-r--r--  erts/emulator/test/driver_SUITE.erl                        63
-rw-r--r--  erts/emulator/test/driver_SUITE_data/Makefile.src          3
-rw-r--r--  erts/emulator/test/driver_SUITE_data/thr_msg_blast_drv.c   178
6 files changed, 543 insertions, 84 deletions
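
Note (not part of the patch): the commit lets unmanaged threads (for example driver or async helper threads) delay thread progress instead of read-locking erts_proc_tab_rwmtx while they inspect the process table. The hunk in erl_process_lock.c below shows the real use in erts_pid2proc_opt(); the fragment here is only a minimal sketch of that delay/continue pattern, using a hypothetical helper name and assuming the emulator-internal headers are available.

    /* Minimal sketch, assuming erl_thr_progress.h and erl_process.h are
     * available; lookup_from_unmanaged() is a hypothetical helper name. */
    static Process *
    lookup_from_unmanaged(Uint pix)
    {
        Process *proc;
        ErtsThrPrgrDelayHandle dhndl;

        /* No-op for managed threads (returns ERTS_THR_PRGR_DHANDLE_MANAGED) */
        dhndl = erts_thr_progress_unmanaged_delay();

        /* While progress is delayed, the table slot cannot be deallocated */
        proc = (Process *) erts_smp_atomic_read_ddrb(&erts_proc.tab[pix]);
        if (proc)
            erts_smp_proc_inc_refc(proc);   /* pin before progress may resume */

        if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
            erts_thr_progress_unmanaged_continue(dhndl);

        return proc; /* caller drops the reference with erts_smp_proc_dec_refc() */
    }

On the other side, the leader in erl_thr_progress.c now also waits for the per-index unmanaged reference counters (umrefc) to drain before advancing the global progress value.
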
diff --git a/erts/emulator/beam/erl_process_lock.c b/erts/emulator/beam/erl_process_lock.c
index b864340782..fa935af315 100644
--- a/erts/emulator/beam/erl_process_lock.c
+++ b/erts/emulator/beam/erl_process_lock.c
@@ -66,6 +66,7 @@
#endif
#include "erl_process.h"
+#include "erl_thr_progress.h"
const Process erts_proc_lock_busy = {ERTS_INVALID_PID};
@@ -671,7 +672,7 @@ erts_proc_lock_prepare_proc_lock_waiter(void)
*/
static void
-proc_safelock(int is_sched,
+proc_safelock(int is_managed,
Process *a_proc,
ErtsProcLocks a_have_locks,
ErtsProcLocks a_need_locks,
@@ -797,7 +798,7 @@ proc_safelock(int is_sched,
if (unlock_locks) {
have_locks1 &= ~unlock_locks;
need_locks1 |= unlock_locks;
- if (!is_sched && !have_locks1) {
+ if (!is_managed && !have_locks1) {
refc1 = 1;
erts_smp_proc_inc_refc(p1);
}
@@ -807,7 +808,7 @@ proc_safelock(int is_sched,
if (unlock_locks) {
have_locks2 &= ~unlock_locks;
need_locks2 |= unlock_locks;
- if (!is_sched && !have_locks2) {
+ if (!is_managed && !have_locks2) {
refc2 = 1;
erts_smp_proc_inc_refc(p2);
}
@@ -888,7 +889,7 @@ proc_safelock(int is_sched,
}
#endif
- if (!is_sched) {
+ if (!is_managed) {
if (refc1)
erts_smp_proc_dec_refc(p1);
if (refc2)
@@ -921,7 +922,7 @@ erts_pid2proc_opt(Process *c_p,
int flags)
{
Process *dec_refc_proc = NULL;
- int need_ptl;
+ ErtsThrPrgrDelayHandle dhndl;
ErtsProcLocks need_locks;
Uint pix;
Process *proc;
@@ -959,10 +960,7 @@ erts_pid2proc_opt(Process *c_p,
}
}
- need_ptl = !erts_get_scheduler_id();
-
- if (need_ptl)
- erts_smp_rwmtx_rwlock(&erts_proc_tab_rwmtx);
+ dhndl = erts_thr_progress_unmanaged_delay();
proc = (Process *) erts_smp_atomic_read_ddrb(&erts_proc.tab[pix]);
@@ -1026,6 +1024,7 @@ erts_pid2proc_opt(Process *c_p,
if (flags & ERTS_P2P_FLG_TRY_LOCK)
proc = ERTS_PROC_LOCK_BUSY;
else {
+ int managed;
if (flags & ERTS_P2P_FLG_SMP_INC_REFC)
erts_smp_proc_inc_refc(proc);
@@ -1033,14 +1032,21 @@ erts_pid2proc_opt(Process *c_p,
erts_lcnt_proc_lock_unaquire(&proc->lock, lcnt_locks);
#endif
- if (need_ptl) {
+ managed = dhndl == ERTS_THR_PRGR_DHANDLE_MANAGED;
+ if (!managed) {
erts_smp_proc_inc_refc(proc);
+ erts_thr_progress_unmanaged_continue(dhndl);
dec_refc_proc = proc;
- erts_smp_rwmtx_rwunlock(&erts_proc_tab_rwmtx);
- need_ptl = 0;
+
+ /*
+ * We don't want to call
+ * erts_thr_progress_unmanaged_continue()
+ * again.
+ */
+ dhndl = ERTS_THR_PRGR_DHANDLE_MANAGED;
}
- proc_safelock(!need_ptl,
+ proc_safelock(managed,
c_p,
c_p_have_locks,
c_p_have_locks,
@@ -1052,8 +1058,8 @@ erts_pid2proc_opt(Process *c_p,
}
}
- if (need_ptl)
- erts_smp_rwmtx_rwunlock(&erts_proc_tab_rwmtx);
+ if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_thr_progress_unmanaged_continue(dhndl);
if (need_locks
&& proc
diff --git a/erts/emulator/beam/erl_thr_progress.c b/erts/emulator/beam/erl_thr_progress.c
index 88524bdd4c..3b2f95d43b 100644
--- a/erts/emulator/beam/erl_thr_progress.c
+++ b/erts/emulator/beam/erl_thr_progress.c
@@ -96,17 +96,14 @@
#define ERTS_THR_PRGR_LFLG_BLOCK (((erts_aint32_t) 1) << 31)
#define ERTS_THR_PRGR_LFLG_NO_LEADER (((erts_aint32_t) 1) << 30)
-#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~(ERTS_THR_PRGR_LFLG_NO_LEADER \
- | ERTS_THR_PRGR_LFLG_BLOCK))
+#define ERTS_THR_PRGR_LFLG_WAITING_UM (((erts_aint32_t) 1) << 29)
+#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~(ERTS_THR_PRGR_LFLG_NO_LEADER \
+ | ERTS_THR_PRGR_LFLG_BLOCK \
+ | ERTS_THR_PRGR_LFLG_WAITING_UM))
-#define ERTS_THR_PRGR_LFLGS_ACTIVE(LFLGS) \
+#define ERTS_THR_PRGR_LFLGS_ACTIVE(LFLGS) \
((LFLGS) & ERTS_THR_PRGR_LFLG_ACTIVE_MASK)
-#define ERTS_THR_PRGR_LFLGS_ALL_WAITING(LFLGS) \
- (((LFLGS) & (ERTS_THR_PRGR_LFLG_NO_LEADER \
- |ERTS_THR_PRGR_LFLG_ACTIVE_MASK)) \
- == ERTS_THR_PRGR_LFLG_NO_LEADER)
-
/*
* We use a 64-bit value for thread progress. By this, wrapping of
* the thread progress will more or less never occur.
@@ -262,6 +259,11 @@ typedef struct {
erts_atomic32_t managed_count;
erts_atomic32_t managed_id;
erts_atomic32_t unmanaged_id;
+ int chk_next_ix;
+ struct {
+ int waiting;
+ erts_atomic32_t current;
+ } umrefc_ix;
} ErtsThrPrgrMiscData;
typedef struct {
@@ -276,12 +278,18 @@ typedef union {
char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrPrgrElement))];
} ErtsThrPrgrArray;
+typedef union {
+ erts_atomic_t refc;
+ char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_atomic_t))];
+} ErtsThrPrgrUnmanagedRefc;
+
typedef struct {
union {
ErtsThrPrgrMiscData data;
char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(
sizeof(ErtsThrPrgrMiscData))];
} misc;
+ ErtsThrPrgrUnmanagedRefc umrefc[2];
ErtsThrPrgrArray *thr;
struct {
int no;
@@ -346,7 +354,9 @@ init_tmp_thr_prgr_data(ErtsThrPrgrData *tpd)
tpd->is_managed = 0;
tpd->is_blocking = 0;
tpd->is_temporary = 1;
-
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ tpd->is_delaying = 0;
+#endif
erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
}
@@ -461,6 +471,9 @@ erts_thr_progress_init(int no_schedulers, int managed, int unmanaged)
erts_atomic32_init_nob(&intrnl->misc.data.managed_count, 0);
erts_atomic32_init_nob(&intrnl->misc.data.managed_id, no_schedulers);
erts_atomic32_init_nob(&intrnl->misc.data.unmanaged_id, -1);
+ intrnl->misc.data.chk_next_ix = 0;
+ intrnl->misc.data.umrefc_ix.waiting = -1;
+ erts_atomic32_init_nob(&intrnl->misc.data.umrefc_ix.current, 0);
intrnl->thr = (ErtsThrPrgrArray *) ptr;
ptr += thr_arr_sz;
@@ -547,6 +560,9 @@ erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrCallbacks *callbacks)
tpd->is_managed = 0;
tpd->is_blocking = is_blocking;
tpd->is_temporary = 0;
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ tpd->is_delaying = 0;
+#endif
ASSERT(tpd->id >= 0);
if (tpd->id >= intrnl->unmanaged.no)
erl_exit(ERTS_ABORT_EXIT,
@@ -600,6 +616,9 @@ erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp,
tpd->is_managed = 1;
tpd->is_blocking = is_blocking;
tpd->is_temporary = 0;
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ tpd->is_delaying = 1;
+#endif
init_wakeup_request_array(&tpd->wakeup_request[0]);
@@ -607,8 +626,8 @@ erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp,
tpd->leader = 0;
tpd->active = 1;
- tpd->previous.local = 0;
- tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING;
+ tpd->confirmed = 0;
+ tpd->leader_state.current = ERTS_THR_PRGR_VAL_WAITING;
erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
erts_atomic32_inc_nob(&intrnl->misc.data.lflgs);
@@ -651,60 +670,113 @@ leader_update(ErtsThrPrgrData *tpd)
block_thread(tpd);
}
else {
+ ErtsThrPrgrVal current;
+ int ix, chk_next_ix, umrefc_ix, my_ix, no_managed, waiting_unmanaged;
erts_aint32_t lflgs;
ErtsThrPrgrVal next;
- int ix, sz, make_progress;
+ erts_aint_t refc;
- if (tpd->previous.current == ERTS_THR_PRGR_VAL_WAITING) {
- /* Took over as leader from another thread */
- tpd->previous.current = read_acqb(&erts_thr_prgr__.current);
- tpd->previous.next = tpd->previous.current;
- tpd->previous.next++;
- if (tpd->previous.next == ERTS_THR_PRGR_VAL_WAITING)
- tpd->previous.next = 0;
- }
+ my_ix = tpd->id;
- if (tpd->previous.local == tpd->previous.current) {
- ErtsThrPrgrVal val = tpd->previous.current + 1;
- if (val == ERTS_THR_PRGR_VAL_WAITING)
- val = 0;
- tpd->previous.local = val;
- set_mb(&intrnl->thr[tpd->id].data.current, val);
+ if (tpd->leader_state.current == ERTS_THR_PRGR_VAL_WAITING) {
+ /* Took over as leader from another thread */
+ tpd->leader_state.current = read_nob(&erts_thr_prgr__.current);
+ tpd->leader_state.next = tpd->leader_state.current;
+ tpd->leader_state.next++;
+ if (tpd->leader_state.next == ERTS_THR_PRGR_VAL_WAITING)
+ tpd->leader_state.next = 0;
+ tpd->leader_state.chk_next_ix = intrnl->misc.data.chk_next_ix;
+ tpd->leader_state.umrefc_ix.waiting = intrnl->misc.data.umrefc_ix.waiting;
+ tpd->leader_state.umrefc_ix.current =
+ (int) erts_atomic32_read_nob(&intrnl->misc.data.umrefc_ix.current);
+
+ if (tpd->confirmed == tpd->leader_state.current) {
+ ErtsThrPrgrVal val = tpd->leader_state.current + 1;
+ if (val == ERTS_THR_PRGR_VAL_WAITING)
+ val = 0;
+ tpd->confirmed = val;
+ set_mb(&intrnl->thr[my_ix].data.current, val);
+ }
}
- next = tpd->previous.next;
- make_progress = 1;
- sz = intrnl->managed.no;
- for (ix = 0; ix < sz; ix++) {
- ErtsThrPrgrVal tmp;
- tmp = read_nob(&intrnl->thr[ix].data.current);
- if (tmp != next && tmp != ERTS_THR_PRGR_VAL_WAITING) {
- make_progress = 0;
- ASSERT(erts_thr_progress_has_passed__(next, tmp));
- break;
+ next = tpd->leader_state.next;
+
+ waiting_unmanaged = 0;
+ umrefc_ix = -1; /* Shut up annoying warning */
+
+ chk_next_ix = tpd->leader_state.chk_next_ix;
+ no_managed = intrnl->managed.no;
+ ASSERT(0 <= chk_next_ix && chk_next_ix <= no_managed);
+ /* Check managed threads */
+ if (chk_next_ix < no_managed) {
+ for (ix = chk_next_ix; ix < no_managed; ix++) {
+ ErtsThrPrgrVal tmp;
+ if (ix == my_ix)
+ continue;
+ tmp = read_nob(&intrnl->thr[ix].data.current);
+ if (tmp != next && tmp != ERTS_THR_PRGR_VAL_WAITING) {
+ tpd->leader_state.chk_next_ix = ix;
+ ASSERT(erts_thr_progress_has_passed__(next, tmp));
+ goto done;
+ }
}
}
- if (make_progress) {
- ErtsThrPrgrVal current = next;
+ /* Check unmanaged threads */
+ waiting_unmanaged = tpd->leader_state.umrefc_ix.waiting != -1;
+ umrefc_ix = (waiting_unmanaged
+ ? tpd->leader_state.umrefc_ix.waiting
+ : tpd->leader_state.umrefc_ix.current);
+ refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
+ ASSERT(refc >= 0);
+ if (refc != 0) {
+ int new_umrefc_ix;
+
+ if (waiting_unmanaged)
+ goto done;
+
+ new_umrefc_ix = (umrefc_ix + 1) & 0x1;
+ tpd->leader_state.umrefc_ix.waiting = umrefc_ix;
+ tpd->leader_state.chk_next_ix = no_managed;
+ erts_atomic32_set_nob(&intrnl->misc.data.umrefc_ix.current,
+ (erts_aint32_t) new_umrefc_ix);
+ ETHR_MEMBAR(ETHR_StoreLoad);
+ refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
+ ASSERT(refc >= 0);
+ waiting_unmanaged = 1;
+ if (refc != 0)
+ goto done;
+ }
+
+ /* Make progress */
+ current = next;
- next++;
- if (next == ERTS_THR_PRGR_VAL_WAITING)
- next = 0;
+ next++;
+ if (next == ERTS_THR_PRGR_VAL_WAITING)
+ next = 0;
- set_nob(&intrnl->thr[tpd->id].data.current, next);
- set_mb(&erts_thr_prgr__.current, current);
- tpd->previous.local = next;
- tpd->previous.next = next;
- tpd->previous.current = current;
+ set_nob(&intrnl->thr[my_ix].data.current, next);
+ set_mb(&erts_thr_prgr__.current, current);
+ tpd->confirmed = next;
+ tpd->leader_state.next = next;
+ tpd->leader_state.current = current;
#if ERTS_THR_PRGR_PRINT_VAL
- if (current % 1000 == 0)
- erts_fprintf(stderr, "%b64u\n", current);
+ if (current % 1000 == 0)
+ erts_fprintf(stderr, "%b64u\n", current);
#endif
- handle_wakeup_requests(current);
+ handle_wakeup_requests(current);
+
+ if (waiting_unmanaged) {
+ waiting_unmanaged = 0;
+ tpd->leader_state.umrefc_ix.waiting = -1;
+ erts_atomic32_read_band_nob(&intrnl->misc.data.lflgs,
+ ~ERTS_THR_PRGR_LFLG_WAITING_UM);
}
+ tpd->leader_state.chk_next_ix = 0;
+
+ done:
if (tpd->active) {
lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
@@ -712,20 +784,44 @@ leader_update(ErtsThrPrgrData *tpd)
(void) block_thread(tpd);
}
else {
+ int force_wakeup_check = 0;
+ erts_aint32_t set_flags = ERTS_THR_PRGR_LFLG_NO_LEADER;
tpd->leader = 0;
- tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING;
+ tpd->leader_state.current = ERTS_THR_PRGR_VAL_WAITING;
#if ERTS_THR_PRGR_PRINT_LEADER
erts_fprintf(stderr, "L <- %d\n", tpd->id);
#endif
ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 0);
+ if (waiting_unmanaged)
+ set_flags |= ERTS_THR_PRGR_LFLG_WAITING_UM;
+
lflgs = erts_atomic32_read_bor_relb(&intrnl->misc.data.lflgs,
- ERTS_THR_PRGR_LFLG_NO_LEADER);
+ set_flags);
+ lflgs |= set_flags;
if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
lflgs = block_thread(tpd);
- if (ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0 && got_sched_wakeups())
+
+ if (waiting_unmanaged) {
+ /* Need to check umrefc again */
+ ETHR_MEMBAR(ETHR_StoreLoad);
+ refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
+ if (refc == 0) {
+ /* Need to force wakeup check */
+ force_wakeup_check = 1;
+ }
+ }
+
+ if ((force_wakeup_check
+ || ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER
+ | ERTS_THR_PRGR_LFLG_WAITING_UM
+ | ERTS_THR_PRGR_LFLG_ACTIVE_MASK))
+ == ERTS_THR_PRGR_LFLG_NO_LEADER))
+ && got_sched_wakeups()) {
+ /* Someone needs to make progress */
wakeup_managed(0);
+ }
}
}
@@ -744,11 +840,11 @@ update(ErtsThrPrgrData *tpd)
erts_aint32_t lflgs;
res = 0;
val = read_acqb(&erts_thr_prgr__.current);
- if (tpd->previous.local == val) {
+ if (tpd->confirmed == val) {
val++;
if (val == ERTS_THR_PRGR_VAL_WAITING)
val = 0;
- tpd->previous.local = val;
+ tpd->confirmed = val;
set_mb(&intrnl->thr[tpd->id].data.current, val);
}
@@ -801,12 +897,19 @@ erts_thr_progress_prepare_wait(ErtsSchedulerData *esdp)
block_count_dec();
- tpd->previous.local = ERTS_THR_PRGR_VAL_WAITING;
+ tpd->confirmed = ERTS_THR_PRGR_VAL_WAITING;
set_mb(&intrnl->thr[tpd->id].data.current, ERTS_THR_PRGR_VAL_WAITING);
lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
- if (ERTS_THR_PRGR_LFLGS_ALL_WAITING(lflgs) && got_sched_wakeups())
- wakeup_managed(0); /* Someone need to make progress */
+
+ if ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER
+ | ERTS_THR_PRGR_LFLG_WAITING_UM
+ | ERTS_THR_PRGR_LFLG_ACTIVE_MASK))
+ == ERTS_THR_PRGR_LFLG_NO_LEADER
+ && got_sched_wakeups()) {
+ /* Someone needs to make progress */
+ wakeup_managed(0);
+ }
}
void
@@ -828,7 +931,7 @@ erts_thr_progress_finalize_wait(ErtsSchedulerData *esdp)
val++;
if (val == ERTS_THR_PRGR_VAL_WAITING)
val = 0;
- tpd->previous.local = val;
+ tpd->confirmed = val;
set_mb(&intrnl->thr[tpd->id].data.current, val);
val = read_acqb(&erts_thr_prgr__.current);
if (current == val)
@@ -875,6 +978,68 @@ erts_thr_progress_active(ErtsSchedulerData *esdp, int on)
}
+static ERTS_INLINE void
+unmanaged_continue(ErtsThrPrgrDelayHandle handle)
+{
+ int umrefc_ix = (int) handle;
+ erts_aint_t refc;
+
+ ASSERT(umrefc_ix == 0 || umrefc_ix == 1);
+ refc = erts_atomic_dec_read_relb(&intrnl->umrefc[umrefc_ix].refc);
+ ASSERT(refc >= 0);
+ if (refc == 0) {
+ erts_aint_t lflgs;
+ ERTS_THR_READ_MEMORY_BARRIER;
+ lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
+ if ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER
+ | ERTS_THR_PRGR_LFLG_WAITING_UM
+ | ERTS_THR_PRGR_LFLG_ACTIVE_MASK))
+ == (ERTS_THR_PRGR_LFLG_NO_LEADER|ERTS_THR_PRGR_LFLG_WAITING_UM)
+ && got_sched_wakeups()) {
+ /* Others waiting for us... */
+ wakeup_managed(0);
+ }
+ }
+}
+
+void
+erts_thr_progress_unmanaged_continue__(ErtsThrPrgrDelayHandle handle)
+{
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
+ ERTS_LC_ASSERT(tpd && tpd->is_delaying);
+ tpd->is_delaying = 0;
+ return_tmp_thr_prgr_data(tpd);
+#endif
+ ASSERT(!erts_thr_progress_is_managed_thread());
+
+ unmanaged_continue(handle);
+}
+
+ErtsThrPrgrDelayHandle
+erts_thr_progress_unmanaged_delay__(void)
+{
+ int umrefc_ix;
+ ASSERT(!erts_thr_progress_is_managed_thread());
+ umrefc_ix = (int) erts_atomic32_read_acqb(&intrnl->misc.data.umrefc_ix.current);
+ while (1) {
+ int tmp_ix;
+ erts_atomic_inc_acqb(&intrnl->umrefc[umrefc_ix].refc);
+ tmp_ix = (int) erts_atomic32_read_acqb(&intrnl->misc.data.umrefc_ix.current);
+ if (tmp_ix == umrefc_ix)
+ break;
+ unmanaged_continue(umrefc_ix);
+ umrefc_ix = tmp_ix;
+ }
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ {
+ ErtsThrPrgrData *tpd = tmp_thr_prgr_data(NULL);
+ tpd->is_delaying = 1;
+ }
+#endif
+ return (ErtsThrPrgrDelayHandle) umrefc_ix;
+}
+
static ERTS_INLINE int
has_reached_wakeup(ErtsThrPrgrVal wakeup)
{
@@ -931,7 +1096,7 @@ request_wakeup_managed(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value)
*/
ASSERT(tpd->is_managed);
- ASSERT(tpd->previous.local != ERTS_THR_PRGR_VAL_WAITING);
+ ASSERT(tpd->confirmed != ERTS_THR_PRGR_VAL_WAITING);
if (has_reached_wakeup(value)) {
wakeup_managed(tpd->id);
@@ -946,7 +1111,7 @@ request_wakeup_managed(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value)
tpd->wakeup_request[wix]));
- if (tpd->previous.local == value) {
+ if (tpd->confirmed == value) {
/*
* We have already confirmed this value. We need to request
* wakeup for a value later than our latest confirmed value in
diff --git a/erts/emulator/beam/erl_thr_progress.h b/erts/emulator/beam/erl_thr_progress.h
index e72321cf48..0e63d41fda 100644
--- a/erts/emulator/beam/erl_thr_progress.h
+++ b/erts/emulator/beam/erl_thr_progress.h
@@ -53,9 +53,22 @@ typedef Uint64 ErtsThrPrgrVal;
#define ERTS_THR_PRGR_WAKEUP_DATA_SIZE 4 /* Need to be an even power of 2. */
typedef struct {
+ ErtsThrPrgrVal next;
+ ErtsThrPrgrVal current;
+ int chk_next_ix;
+ struct {
+ int current;
+ int waiting;
+ } umrefc_ix;
+} ErtsThrPrgrLeaderState;
+
+typedef struct {
int id;
int is_managed;
int is_blocking;
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ int is_delaying; /* managed is always delaying */
+#endif
int is_temporary;
/* --- Part below only for registered threads --- */
@@ -66,11 +79,8 @@ typedef struct {
int leader; /* Needs to be first in the managed threads part */
int active;
- struct {
- ErtsThrPrgrVal local;
- ErtsThrPrgrVal next;
- ErtsThrPrgrVal current;
- } previous;
+ ErtsThrPrgrVal confirmed;
+ ErtsThrPrgrLeaderState leader_state;
} ErtsThrPrgrData;
void erts_thr_progress_fatal_error_block(SWord timeout,
@@ -121,6 +131,10 @@ typedef struct {
ERTS_THR_PRGR_ATOMIC current;
} ErtsThrPrgr;
+typedef int ErtsThrPrgrDelayHandle;
+#define ERTS_THR_PRGR_DHANDLE_MANAGED ((ErtsThrPrgrDelayHandle) -1)
+/* ERTS_THR_PRGR_DHANDLE_MANAGED implies managed thread */
+
extern ErtsThrPrgr erts_thr_prgr__;
void erts_thr_progress_pre_init(void);
@@ -136,6 +150,8 @@ int erts_thr_progress_update(ErtsSchedulerData *esdp);
int erts_thr_progress_leader_update(ErtsSchedulerData *esdp);
void erts_thr_progress_prepare_wait(ErtsSchedulerData *esdp);
void erts_thr_progress_finalize_wait(ErtsSchedulerData *esdp);
+ErtsThrPrgrDelayHandle erts_thr_progress_unmanaged_delay__(void);
+void erts_thr_progress_unmanaged_continue__(int umrefc_ix);
void erts_thr_progress_dbg_print_state(void);
@@ -148,6 +164,11 @@ ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_prgr_read_acqb__(ERTS_THR_PRGR_ATOMIC *a
ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_prgr_read_mb__(ERTS_THR_PRGR_ATOMIC *atmc);
ERTS_GLB_INLINE int erts_thr_progress_is_managed_thread(void);
+ERTS_GLB_INLINE ErtsThrPrgrDelayHandle erts_thr_progress_unmanaged_delay(void);
+ERTS_GLB_INLINE void erts_thr_progress_unmanaged_continue(ErtsThrPrgrDelayHandle handle);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ERTS_GLB_INLINE int erts_thr_progress_lc_is_delaying(void);
+#endif
ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_progress_current_to_later__(ErtsThrPrgrVal val);
ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_progress_later(ErtsSchedulerData *);
ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_progress_current(void);
@@ -229,6 +250,35 @@ erts_thr_progress_is_managed_thread(void)
return tpd && tpd->is_managed;
}
+ERTS_GLB_INLINE ErtsThrPrgrDelayHandle
+erts_thr_progress_unmanaged_delay(void)
+{
+ if (erts_thr_progress_is_managed_thread())
+ return ERTS_THR_PRGR_DHANDLE_MANAGED; /* Nothing to do */
+ else
+ return erts_thr_progress_unmanaged_delay__();
+}
+
+ERTS_GLB_INLINE void
+erts_thr_progress_unmanaged_continue(ErtsThrPrgrDelayHandle handle)
+{
+ ASSERT(handle != ERTS_THR_PRGR_DHANDLE_MANAGED
+ || erts_thr_progress_is_managed_thread());
+ if (handle != ERTS_THR_PRGR_DHANDLE_MANAGED)
+ erts_thr_progress_unmanaged_continue__(handle);
+}
+
+#ifdef ERTS_ENABLE_LOCK_CHECK
+
+ERTS_GLB_INLINE int
+erts_thr_progress_lc_is_delaying(void)
+{
+ ErtsThrPrgrData *tpd = erts_tsd_get(erts_thr_prgr_data_key__);
+ return tpd && tpd->is_delaying;
+}
+
+#endif
+
ERTS_GLB_INLINE ErtsThrPrgrVal
erts_thr_progress_current_to_later__(ErtsThrPrgrVal val)
{
@@ -248,7 +298,7 @@ erts_thr_progress_later(ErtsSchedulerData *esdp)
if (esdp) {
tpd = &esdp->thr_progress_data;
managed_thread:
- val = tpd->previous.local;
+ val = tpd->confirmed;
ERTS_THR_MEMORY_BARRIER;
}
else {
diff --git a/erts/emulator/test/driver_SUITE.erl b/erts/emulator/test/driver_SUITE.erl
index 643357263c..9a66257aff 100644
--- a/erts/emulator/test/driver_SUITE.erl
+++ b/erts/emulator/test/driver_SUITE.erl
@@ -77,7 +77,8 @@
thread_mseg_alloc_cache_clean/1,
otp_9302/1,
thr_free_drv/1,
- async_blast/1]).
+ async_blast/1,
+ thr_msg_blast/1]).
-export([bin_prefix/2]).
@@ -147,7 +148,8 @@ all() ->
thread_mseg_alloc_cache_clean,
otp_9302,
thr_free_drv,
- async_blast].
+ async_blast,
+ thr_msg_blast].
groups() ->
[{timer, [],
@@ -2010,7 +2012,64 @@ async_blast(Config) when is_list(Config) ->
?line erlang:display({async_blast_time, AsyncBlastTime}),
?line ok.
+thr_msg_blast_receiver(_Port, N, N) ->
+ ok;
+thr_msg_blast_receiver(Port, N, Max) ->
+ receive
+ {Port, hi} ->
+ thr_msg_blast_receiver(Port, N+1, Max)
+ end.
+
+thr_msg_blast_receiver_proc(Port, Max, Parent, Done) ->
+ case port_control(Port, 0, "") of
+ "receiver" ->
+ spawn(fun () ->
+ thr_msg_blast_receiver_proc(Port, Max+1, Parent, Done)
+ end),
+ thr_msg_blast_receiver(Port, 0, Max);
+ "done" ->
+ Parent ! Done
+ end.
+thr_msg_blast(Config) when is_list(Config) ->
+ case erlang:system_info(smp_support) of
+ false ->
+ {skipped, "Non-SMP emulator; nothing to test..."};
+ true ->
+ Path = ?config(data_dir, Config),
+ erl_ddll:start(),
+ ok = load_driver(Path, thr_msg_blast_drv),
+ MemBefore = driver_alloc_size(),
+ Start = os:timestamp(),
+ Port = open_port({spawn, thr_msg_blast_drv}, []),
+ true = is_port(Port),
+ Done = make_ref(),
+ Me = self(),
+ spawn(fun () ->
+ thr_msg_blast_receiver_proc(Port, 1, Me, Done)
+ end),
+ receive
+ Done -> ok
+ end,
+ ok = thr_msg_blast_receiver(Port, 0, 32*10000),
+ port_close(Port),
+ End = os:timestamp(),
+ receive
+ Garbage ->
+ ?t:fail({received_garbage, Port, Garbage})
+ after 2000 ->
+ ok
+ end,
+ MemAfter = driver_alloc_size(),
+ io:format("MemBefore=~p, MemAfter=~p~n",
+ [MemBefore, MemAfter]),
+ ThrMsgBlastTime = timer:now_diff(End,Start)/1000000,
+ io:format("ThrMsgBlastTime=~p~n", [ThrMsgBlastTime]),
+ MemBefore = MemAfter,
+ Res = {thr_msg_blast_time, ThrMsgBlastTime},
+ erlang:display(Res),
+ Res
+ end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Utilities
diff --git a/erts/emulator/test/driver_SUITE_data/Makefile.src b/erts/emulator/test/driver_SUITE_data/Makefile.src
index 9cc107cc66..b667dff6b6 100644
--- a/erts/emulator/test/driver_SUITE_data/Makefile.src
+++ b/erts/emulator/test/driver_SUITE_data/Makefile.src
@@ -14,7 +14,8 @@ MISC_DRVS = outputv_drv@dll@ \
thr_alloc_drv@dll@ \
otp_9302_drv@dll@ \
thr_free_drv@dll@ \
- async_blast_drv@dll@
+ async_blast_drv@dll@ \
+ thr_msg_blast_drv@dll@
SYS_INFO_DRVS = sys_info_base_drv@dll@ \
sys_info_prev_drv@dll@ \
diff --git a/erts/emulator/test/driver_SUITE_data/thr_msg_blast_drv.c b/erts/emulator/test/driver_SUITE_data/thr_msg_blast_drv.c
new file mode 100644
index 0000000000..1070678d7b
--- /dev/null
+++ b/erts/emulator/test/driver_SUITE_data/thr_msg_blast_drv.c
@@ -0,0 +1,178 @@
+/*
+ * %CopyrightBegin%
+ *
+ * Copyright Ericsson AB 2012. All Rights Reserved.
+ *
+ * The contents of this file are subject to the Erlang Public License,
+ * Version 1.1, (the "License"); you may not use this file except in
+ * compliance with the License. You should have received a copy of the
+ * Erlang Public License along with this software. If not, it can be
+ * retrieved online at http://www.erlang.org/.
+ *
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+ * the License for the specific language governing rights and limitations
+ * under the License.
+ *
+ * %CopyrightEnd%
+ */
+
+#include <string.h>  /* strlen(), memcpy() */
+#include <stdlib.h>  /* abort() */
+#include "erl_driver.h"
+
+#define THR_MSG_BLAST_NO_PROCS 10
+#define THR_MSG_BLAST_NO_SENDS_PER_PROC 10000
+
+#define THR_MSG_BLAST_THREADS 32
+
+static void stop(ErlDrvData drv_data);
+static ErlDrvData start(ErlDrvPort port,
+ char *command);
+static ErlDrvSSizeT control(ErlDrvData drv_data,
+ unsigned int command,
+ char *buf, ErlDrvSizeT len,
+ char **rbuf, ErlDrvSizeT rlen);
+
+static ErlDrvEntry thr_msg_blast_drv_entry = {
+ NULL /* init */,
+ start,
+ stop,
+ NULL /* output */,
+ NULL /* ready_input */,
+ NULL /* ready_output */,
+ "thr_msg_blast_drv",
+ NULL /* finish */,
+ NULL /* handle */,
+ control,
+ NULL /* timeout */,
+ NULL /* outputv */,
+ NULL /* ready_async */,
+ NULL /* flush */,
+ NULL /* call */,
+ NULL /* event */,
+ ERL_DRV_EXTENDED_MARKER,
+ ERL_DRV_EXTENDED_MAJOR_VERSION,
+ ERL_DRV_EXTENDED_MINOR_VERSION,
+ ERL_DRV_FLAG_USE_PORT_LOCKING,
+ NULL /* handle2 */,
+ NULL /* handle_monitor */
+};
+
+typedef struct {
+ ErlDrvPort port;
+ ErlDrvTermData td_port;
+ ErlDrvTermData hi;
+ ErlDrvTid tid[THR_MSG_BLAST_THREADS];
+ int no_thrs;
+ ErlDrvTermData proc[THR_MSG_BLAST_NO_PROCS];
+ int no_procs;
+} thr_msg_blast_data_t;
+
+
+DRIVER_INIT(thr_msg_blast_drv)
+{
+ return &thr_msg_blast_drv_entry;
+}
+
+static void stop(ErlDrvData drv_data)
+{
+ int i;
+ thr_msg_blast_data_t *tmbd = (thr_msg_blast_data_t *) drv_data;
+ for (i = 0; i < tmbd->no_thrs; i++)
+ erl_drv_thread_join(tmbd->tid[i], NULL);
+ driver_free((void *) tmbd);
+}
+
+static ErlDrvData start(ErlDrvPort port,
+ char *command)
+{
+ thr_msg_blast_data_t *tmbd;
+
+ tmbd = driver_alloc(sizeof(thr_msg_blast_data_t));
+ if (!tmbd)
+ return ERL_DRV_ERROR_GENERAL;
+
+ tmbd->port = port;
+ tmbd->td_port = driver_mk_port(port);
+ tmbd->hi = driver_mk_atom("hi");
+ tmbd->no_thrs = 0;
+ tmbd->no_procs = 1;
+ tmbd->proc[0] = driver_caller(port);
+
+ return (ErlDrvData) tmbd;
+}
+
+static void *thread(void *);
+
+static ErlDrvSSizeT control(ErlDrvData drv_data,
+ unsigned int command,
+ char *buf, ErlDrvSizeT len,
+ char **rbuf, ErlDrvSizeT rlen)
+{
+ thr_msg_blast_data_t *tmbd = (thr_msg_blast_data_t *) drv_data;
+ char *res_str = "error";
+
+ if (tmbd->no_procs >= THR_MSG_BLAST_NO_PROCS) {
+ int i;
+ for (i = 0; i < tmbd->no_thrs; i++)
+ erl_drv_thread_join(tmbd->tid[i], NULL);
+ tmbd->no_thrs = 0;
+ res_str = "done";
+ }
+ else {
+
+ tmbd->proc[tmbd->no_procs++] = driver_caller(tmbd->port);
+
+ if (tmbd->no_procs == THR_MSG_BLAST_NO_PROCS) {
+ for (tmbd->no_thrs = 0;
+ tmbd->no_thrs < THR_MSG_BLAST_THREADS;
+ tmbd->no_thrs++) {
+ int res = erl_drv_thread_create("test",
+ &tmbd->tid[tmbd->no_thrs],
+ thread,
+ tmbd,
+ NULL);
+ if (res != 0) {
+ driver_failure_posix(tmbd->port, res);
+ goto done;
+ }
+ }
+ }
+
+ res_str = "receiver";
+ }
+
+ done: {
+ ErlDrvSSizeT res_len = strlen(res_str);
+ if (res_len > rlen) {
+ char *abuf = driver_alloc(sizeof(char)*res_len);
+ if (!abuf)
+ return 0;
+ *rbuf = abuf;
+ }
+
+ memcpy((void *) *rbuf, (void *) res_str, res_len);
+
+ return res_len;
+ }
+}
+
+static void *thread(void *varg)
+{
+ int s, p;
+ thr_msg_blast_data_t *tmbd = (thr_msg_blast_data_t *) varg;
+ ErlDrvTermData spec[] = {
+ ERL_DRV_PORT, tmbd->td_port,
+ ERL_DRV_ATOM, tmbd->hi,
+ ERL_DRV_TUPLE, 2
+ };
+
+ for (s = 0; s < THR_MSG_BLAST_NO_SENDS_PER_PROC; s++) {
+ for (p = 0; p < THR_MSG_BLAST_NO_PROCS; p++) {
+ int res = driver_send_term(tmbd->port, tmbd->proc[p],
+ spec, sizeof(spec)/sizeof(spec[0]));
+ if (p == 0 && res <= 0)
+ abort(); /* Could not send to creator */
+ }
+ }
+ return NULL;
+}