diff options
author | Rickard Green <[email protected]> | 2012-08-30 23:25:34 +0200 |
---|---|---|
committer | Rickard Green <[email protected]> | 2012-12-03 21:18:03 +0100 |
commit | d827ac847517ed43339a6a86493c7c017be782a5 (patch) | |
tree | 6e7a85323ac80f99f73ee838badff0236bcc1276 /erts | |
parent | 88126e785de24f5f41068c610bc13840dcab4a7d (diff) | |
download | otp-d827ac847517ed43339a6a86493c7c017be782a5.tar.gz otp-d827ac847517ed43339a6a86493c7c017be782a5.tar.bz2 otp-d827ac847517ed43339a6a86493c7c017be782a5.zip |
Implement functionality for delaying thread progress from unmanaged threads
Diffstat (limited to 'erts')
-rw-r--r-- | erts/emulator/beam/erl_process_lock.c | 36 | ||||
-rw-r--r-- | erts/emulator/beam/erl_thr_progress.c | 285 | ||||
-rw-r--r-- | erts/emulator/beam/erl_thr_progress.h | 62 | ||||
-rw-r--r-- | erts/emulator/test/driver_SUITE.erl | 63 | ||||
-rw-r--r-- | erts/emulator/test/driver_SUITE_data/Makefile.src | 3 | ||||
-rw-r--r-- | erts/emulator/test/driver_SUITE_data/thr_msg_blast_drv.c | 178 |
6 files changed, 543 insertions, 84 deletions
diff --git a/erts/emulator/beam/erl_process_lock.c b/erts/emulator/beam/erl_process_lock.c index b864340782..fa935af315 100644 --- a/erts/emulator/beam/erl_process_lock.c +++ b/erts/emulator/beam/erl_process_lock.c @@ -66,6 +66,7 @@ #endif #include "erl_process.h" +#include "erl_thr_progress.h" const Process erts_proc_lock_busy = {ERTS_INVALID_PID}; @@ -671,7 +672,7 @@ erts_proc_lock_prepare_proc_lock_waiter(void) */ static void -proc_safelock(int is_sched, +proc_safelock(int is_managed, Process *a_proc, ErtsProcLocks a_have_locks, ErtsProcLocks a_need_locks, @@ -797,7 +798,7 @@ proc_safelock(int is_sched, if (unlock_locks) { have_locks1 &= ~unlock_locks; need_locks1 |= unlock_locks; - if (!is_sched && !have_locks1) { + if (!is_managed && !have_locks1) { refc1 = 1; erts_smp_proc_inc_refc(p1); } @@ -807,7 +808,7 @@ proc_safelock(int is_sched, if (unlock_locks) { have_locks2 &= ~unlock_locks; need_locks2 |= unlock_locks; - if (!is_sched && !have_locks2) { + if (!is_managed && !have_locks2) { refc2 = 1; erts_smp_proc_inc_refc(p2); } @@ -888,7 +889,7 @@ proc_safelock(int is_sched, } #endif - if (!is_sched) { + if (!is_managed) { if (refc1) erts_smp_proc_dec_refc(p1); if (refc2) @@ -921,7 +922,7 @@ erts_pid2proc_opt(Process *c_p, int flags) { Process *dec_refc_proc = NULL; - int need_ptl; + ErtsThrPrgrDelayHandle dhndl; ErtsProcLocks need_locks; Uint pix; Process *proc; @@ -959,10 +960,7 @@ erts_pid2proc_opt(Process *c_p, } } - need_ptl = !erts_get_scheduler_id(); - - if (need_ptl) - erts_smp_rwmtx_rwlock(&erts_proc_tab_rwmtx); + dhndl = erts_thr_progress_unmanaged_delay(); proc = (Process *) erts_smp_atomic_read_ddrb(&erts_proc.tab[pix]); @@ -1026,6 +1024,7 @@ erts_pid2proc_opt(Process *c_p, if (flags & ERTS_P2P_FLG_TRY_LOCK) proc = ERTS_PROC_LOCK_BUSY; else { + int managed; if (flags & ERTS_P2P_FLG_SMP_INC_REFC) erts_smp_proc_inc_refc(proc); @@ -1033,14 +1032,21 @@ erts_pid2proc_opt(Process *c_p, erts_lcnt_proc_lock_unaquire(&proc->lock, lcnt_locks); #endif - if (need_ptl) { + managed = dhndl == ERTS_THR_PRGR_DHANDLE_MANAGED; + if (!managed) { erts_smp_proc_inc_refc(proc); + erts_thr_progress_unmanaged_continue(dhndl); dec_refc_proc = proc; - erts_smp_rwmtx_rwunlock(&erts_proc_tab_rwmtx); - need_ptl = 0; + + /* + * We don't want to call + * erts_thr_progress_unmanaged_continue() + * again. + */ + dhndl = ERTS_THR_PRGR_DHANDLE_MANAGED; } - proc_safelock(!need_ptl, + proc_safelock(managed, c_p, c_p_have_locks, c_p_have_locks, @@ -1052,8 +1058,8 @@ erts_pid2proc_opt(Process *c_p, } } - if (need_ptl) - erts_smp_rwmtx_rwunlock(&erts_proc_tab_rwmtx); + if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) + erts_thr_progress_unmanaged_continue(dhndl); if (need_locks && proc diff --git a/erts/emulator/beam/erl_thr_progress.c b/erts/emulator/beam/erl_thr_progress.c index 88524bdd4c..3b2f95d43b 100644 --- a/erts/emulator/beam/erl_thr_progress.c +++ b/erts/emulator/beam/erl_thr_progress.c @@ -96,17 +96,14 @@ #define ERTS_THR_PRGR_LFLG_BLOCK (((erts_aint32_t) 1) << 31) #define ERTS_THR_PRGR_LFLG_NO_LEADER (((erts_aint32_t) 1) << 30) -#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~(ERTS_THR_PRGR_LFLG_NO_LEADER \ - | ERTS_THR_PRGR_LFLG_BLOCK)) +#define ERTS_THR_PRGR_LFLG_WAITING_UM (((erts_aint32_t) 1) << 29) +#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~(ERTS_THR_PRGR_LFLG_NO_LEADER \ + | ERTS_THR_PRGR_LFLG_BLOCK \ + | ERTS_THR_PRGR_LFLG_WAITING_UM)) -#define ERTS_THR_PRGR_LFLGS_ACTIVE(LFLGS) \ +#define ERTS_THR_PRGR_LFLGS_ACTIVE(LFLGS) \ ((LFLGS) & ERTS_THR_PRGR_LFLG_ACTIVE_MASK) -#define ERTS_THR_PRGR_LFLGS_ALL_WAITING(LFLGS) \ - (((LFLGS) & (ERTS_THR_PRGR_LFLG_NO_LEADER \ - |ERTS_THR_PRGR_LFLG_ACTIVE_MASK)) \ - == ERTS_THR_PRGR_LFLG_NO_LEADER) - /* * We use a 64-bit value for thread progress. By this wrapping of * the thread progress will more or less never occur. @@ -262,6 +259,11 @@ typedef struct { erts_atomic32_t managed_count; erts_atomic32_t managed_id; erts_atomic32_t unmanaged_id; + int chk_next_ix; + struct { + int waiting; + erts_atomic32_t current; + } umrefc_ix; } ErtsThrPrgrMiscData; typedef struct { @@ -276,12 +278,18 @@ typedef union { char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrPrgrElement))]; } ErtsThrPrgrArray; +typedef union { + erts_atomic_t refc; + char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_atomic_t))]; +} ErtsThrPrgrUnmanagedRefc; + typedef struct { union { ErtsThrPrgrMiscData data; char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE( sizeof(ErtsThrPrgrMiscData))]; } misc; + ErtsThrPrgrUnmanagedRefc umrefc[2]; ErtsThrPrgrArray *thr; struct { int no; @@ -346,7 +354,9 @@ init_tmp_thr_prgr_data(ErtsThrPrgrData *tpd) tpd->is_managed = 0; tpd->is_blocking = 0; tpd->is_temporary = 1; - +#ifdef ERTS_ENABLE_LOCK_CHECK + tpd->is_delaying = 0; +#endif erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd); } @@ -461,6 +471,9 @@ erts_thr_progress_init(int no_schedulers, int managed, int unmanaged) erts_atomic32_init_nob(&intrnl->misc.data.managed_count, 0); erts_atomic32_init_nob(&intrnl->misc.data.managed_id, no_schedulers); erts_atomic32_init_nob(&intrnl->misc.data.unmanaged_id, -1); + intrnl->misc.data.chk_next_ix = 0; + intrnl->misc.data.umrefc_ix.waiting = -1; + erts_atomic32_init_nob(&intrnl->misc.data.umrefc_ix.current, 0); intrnl->thr = (ErtsThrPrgrArray *) ptr; ptr += thr_arr_sz; @@ -547,6 +560,9 @@ erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrCallbacks *callbacks) tpd->is_managed = 0; tpd->is_blocking = is_blocking; tpd->is_temporary = 0; +#ifdef ERTS_ENABLE_LOCK_CHECK + tpd->is_delaying = 0; +#endif ASSERT(tpd->id >= 0); if (tpd->id >= intrnl->unmanaged.no) erl_exit(ERTS_ABORT_EXIT, @@ -600,6 +616,9 @@ erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp, tpd->is_managed = 1; tpd->is_blocking = is_blocking; tpd->is_temporary = 0; +#ifdef ERTS_ENABLE_LOCK_CHECK + tpd->is_delaying = 1; +#endif init_wakeup_request_array(&tpd->wakeup_request[0]); @@ -607,8 +626,8 @@ erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp, tpd->leader = 0; tpd->active = 1; - tpd->previous.local = 0; - tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING; + tpd->confirmed = 0; + tpd->leader_state.current = ERTS_THR_PRGR_VAL_WAITING; erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd); erts_atomic32_inc_nob(&intrnl->misc.data.lflgs); @@ -651,60 +670,113 @@ leader_update(ErtsThrPrgrData *tpd) block_thread(tpd); } else { + ErtsThrPrgrVal current; + int ix, chk_next_ix, umrefc_ix, my_ix, no_managed, waiting_unmanaged; erts_aint32_t lflgs; ErtsThrPrgrVal next; - int ix, sz, make_progress; + erts_aint_t refc; - if (tpd->previous.current == ERTS_THR_PRGR_VAL_WAITING) { - /* Took over as leader from another thread */ - tpd->previous.current = read_acqb(&erts_thr_prgr__.current); - tpd->previous.next = tpd->previous.current; - tpd->previous.next++; - if (tpd->previous.next == ERTS_THR_PRGR_VAL_WAITING) - tpd->previous.next = 0; - } + my_ix = tpd->id; - if (tpd->previous.local == tpd->previous.current) { - ErtsThrPrgrVal val = tpd->previous.current + 1; - if (val == ERTS_THR_PRGR_VAL_WAITING) - val = 0; - tpd->previous.local = val; - set_mb(&intrnl->thr[tpd->id].data.current, val); + if (tpd->leader_state.current == ERTS_THR_PRGR_VAL_WAITING) { + /* Took over as leader from another thread */ + tpd->leader_state.current = read_nob(&erts_thr_prgr__.current); + tpd->leader_state.next = tpd->leader_state.current; + tpd->leader_state.next++; + if (tpd->leader_state.next == ERTS_THR_PRGR_VAL_WAITING) + tpd->leader_state.next = 0; + tpd->leader_state.chk_next_ix = intrnl->misc.data.chk_next_ix; + tpd->leader_state.umrefc_ix.waiting = intrnl->misc.data.umrefc_ix.waiting; + tpd->leader_state.umrefc_ix.current = + (int) erts_atomic32_read_nob(&intrnl->misc.data.umrefc_ix.current); + + if (tpd->confirmed == tpd->leader_state.current) { + ErtsThrPrgrVal val = tpd->leader_state.current + 1; + if (val == ERTS_THR_PRGR_VAL_WAITING) + val = 0; + tpd->confirmed = val; + set_mb(&intrnl->thr[my_ix].data.current, val); + } } - next = tpd->previous.next; - make_progress = 1; - sz = intrnl->managed.no; - for (ix = 0; ix < sz; ix++) { - ErtsThrPrgrVal tmp; - tmp = read_nob(&intrnl->thr[ix].data.current); - if (tmp != next && tmp != ERTS_THR_PRGR_VAL_WAITING) { - make_progress = 0; - ASSERT(erts_thr_progress_has_passed__(next, tmp)); - break; + next = tpd->leader_state.next; + + waiting_unmanaged = 0; + umrefc_ix = -1; /* Shut up annoying warning */ + + chk_next_ix = tpd->leader_state.chk_next_ix; + no_managed = intrnl->managed.no; + ASSERT(0 <= chk_next_ix && chk_next_ix <= no_managed); + /* Check manged threads */ + if (chk_next_ix < no_managed) { + for (ix = chk_next_ix; ix < no_managed; ix++) { + ErtsThrPrgrVal tmp; + if (ix == my_ix) + continue; + tmp = read_nob(&intrnl->thr[ix].data.current); + if (tmp != next && tmp != ERTS_THR_PRGR_VAL_WAITING) { + tpd->leader_state.chk_next_ix = ix; + ASSERT(erts_thr_progress_has_passed__(next, tmp)); + goto done; + } } } - if (make_progress) { - ErtsThrPrgrVal current = next; + /* Check unmanged threads */ + waiting_unmanaged = tpd->leader_state.umrefc_ix.waiting != -1; + umrefc_ix = (waiting_unmanaged + ? tpd->leader_state.umrefc_ix.waiting + : tpd->leader_state.umrefc_ix.current); + refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc); + ASSERT(refc >= 0); + if (refc != 0) { + int new_umrefc_ix; + + if (waiting_unmanaged) + goto done; + + new_umrefc_ix = (umrefc_ix + 1) & 0x1; + tpd->leader_state.umrefc_ix.waiting = umrefc_ix; + tpd->leader_state.chk_next_ix = no_managed; + erts_atomic32_set_nob(&intrnl->misc.data.umrefc_ix.current, + (erts_aint32_t) new_umrefc_ix); + ETHR_MEMBAR(ETHR_StoreLoad); + refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc); + ASSERT(refc >= 0); + waiting_unmanaged = 1; + if (refc != 0) + goto done; + } + + /* Make progress */ + current = next; - next++; - if (next == ERTS_THR_PRGR_VAL_WAITING) - next = 0; + next++; + if (next == ERTS_THR_PRGR_VAL_WAITING) + next = 0; - set_nob(&intrnl->thr[tpd->id].data.current, next); - set_mb(&erts_thr_prgr__.current, current); - tpd->previous.local = next; - tpd->previous.next = next; - tpd->previous.current = current; + set_nob(&intrnl->thr[my_ix].data.current, next); + set_mb(&erts_thr_prgr__.current, current); + tpd->confirmed = next; + tpd->leader_state.next = next; + tpd->leader_state.current = current; #if ERTS_THR_PRGR_PRINT_VAL - if (current % 1000 == 0) - erts_fprintf(stderr, "%b64u\n", current); + if (current % 1000 == 0) + erts_fprintf(stderr, "%b64u\n", current); #endif - handle_wakeup_requests(current); + handle_wakeup_requests(current); + + if (waiting_unmanaged) { + waiting_unmanaged = 0; + tpd->leader_state.umrefc_ix.waiting = -1; + erts_atomic32_read_band_nob(&intrnl->misc.data.lflgs, + ~ERTS_THR_PRGR_LFLG_WAITING_UM); } + tpd->leader_state.chk_next_ix = 0; + + done: if (tpd->active) { lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); @@ -712,20 +784,44 @@ leader_update(ErtsThrPrgrData *tpd) (void) block_thread(tpd); } else { + int force_wakeup_check = 0; + erts_aint32_t set_flags = ERTS_THR_PRGR_LFLG_NO_LEADER; tpd->leader = 0; - tpd->previous.current = ERTS_THR_PRGR_VAL_WAITING; + tpd->leader_state.current = ERTS_THR_PRGR_VAL_WAITING; #if ERTS_THR_PRGR_PRINT_LEADER erts_fprintf(stderr, "L <- %d\n", tpd->id); #endif ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 0); + if (waiting_unmanaged) + set_flags |= ERTS_THR_PRGR_LFLG_WAITING_UM; + lflgs = erts_atomic32_read_bor_relb(&intrnl->misc.data.lflgs, - ERTS_THR_PRGR_LFLG_NO_LEADER); + set_flags); + lflgs |= set_flags; if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK) lflgs = block_thread(tpd); - if (ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0 && got_sched_wakeups()) + + if (waiting_unmanaged) { + /* Need to check umrefc again */ + ETHR_MEMBAR(ETHR_StoreLoad); + refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc); + if (refc == 0) { + /* Need to force wakeup check */ + force_wakeup_check = 1; + } + } + + if ((force_wakeup_check + || ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER + | ERTS_THR_PRGR_LFLG_WAITING_UM + | ERTS_THR_PRGR_LFLG_ACTIVE_MASK)) + == ERTS_THR_PRGR_LFLG_NO_LEADER)) + && got_sched_wakeups()) { + /* Someone need to make progress */ wakeup_managed(0); + } } } @@ -744,11 +840,11 @@ update(ErtsThrPrgrData *tpd) erts_aint32_t lflgs; res = 0; val = read_acqb(&erts_thr_prgr__.current); - if (tpd->previous.local == val) { + if (tpd->confirmed == val) { val++; if (val == ERTS_THR_PRGR_VAL_WAITING) val = 0; - tpd->previous.local = val; + tpd->confirmed = val; set_mb(&intrnl->thr[tpd->id].data.current, val); } @@ -801,12 +897,19 @@ erts_thr_progress_prepare_wait(ErtsSchedulerData *esdp) block_count_dec(); - tpd->previous.local = ERTS_THR_PRGR_VAL_WAITING; + tpd->confirmed = ERTS_THR_PRGR_VAL_WAITING; set_mb(&intrnl->thr[tpd->id].data.current, ERTS_THR_PRGR_VAL_WAITING); lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); - if (ERTS_THR_PRGR_LFLGS_ALL_WAITING(lflgs) && got_sched_wakeups()) - wakeup_managed(0); /* Someone need to make progress */ + + if ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER + | ERTS_THR_PRGR_LFLG_WAITING_UM + | ERTS_THR_PRGR_LFLG_ACTIVE_MASK)) + == ERTS_THR_PRGR_LFLG_NO_LEADER + && got_sched_wakeups()) { + /* Someone need to make progress */ + wakeup_managed(0); + } } void @@ -828,7 +931,7 @@ erts_thr_progress_finalize_wait(ErtsSchedulerData *esdp) val++; if (val == ERTS_THR_PRGR_VAL_WAITING) val = 0; - tpd->previous.local = val; + tpd->confirmed = val; set_mb(&intrnl->thr[tpd->id].data.current, val); val = read_acqb(&erts_thr_prgr__.current); if (current == val) @@ -875,6 +978,68 @@ erts_thr_progress_active(ErtsSchedulerData *esdp, int on) } +static ERTS_INLINE void +unmanaged_continue(ErtsThrPrgrDelayHandle handle) +{ + int umrefc_ix = (int) handle; + erts_aint_t refc; + + ASSERT(umrefc_ix == 0 || umrefc_ix == 1); + refc = erts_atomic_dec_read_relb(&intrnl->umrefc[umrefc_ix].refc); + ASSERT(refc >= 0); + if (refc == 0) { + erts_aint_t lflgs; + ERTS_THR_READ_MEMORY_BARRIER; + lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs); + if ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER + | ERTS_THR_PRGR_LFLG_WAITING_UM + | ERTS_THR_PRGR_LFLG_ACTIVE_MASK)) + == (ERTS_THR_PRGR_LFLG_NO_LEADER|ERTS_THR_PRGR_LFLG_WAITING_UM) + && got_sched_wakeups()) { + /* Others waiting for us... */ + wakeup_managed(0); + } + } +} + +void +erts_thr_progress_unmanaged_continue__(ErtsThrPrgrDelayHandle handle) +{ +#ifdef ERTS_ENABLE_LOCK_CHECK + ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL); + ERTS_LC_ASSERT(tpd && tpd->is_delaying); + tpd->is_delaying = 0; + return_tmp_thr_prgr_data(tpd); +#endif + ASSERT(!erts_thr_progress_is_managed_thread()); + + unmanaged_continue(handle); +} + +ErtsThrPrgrDelayHandle +erts_thr_progress_unmanaged_delay__(void) +{ + int umrefc_ix; + ASSERT(!erts_thr_progress_is_managed_thread()); + umrefc_ix = (int) erts_atomic32_read_acqb(&intrnl->misc.data.umrefc_ix.current); + while (1) { + int tmp_ix; + erts_atomic_inc_acqb(&intrnl->umrefc[umrefc_ix].refc); + tmp_ix = (int) erts_atomic32_read_acqb(&intrnl->misc.data.umrefc_ix.current); + if (tmp_ix == umrefc_ix) + break; + unmanaged_continue(umrefc_ix); + umrefc_ix = tmp_ix; + } +#ifdef ERTS_ENABLE_LOCK_CHECK + { + ErtsThrPrgrData *tpd = tmp_thr_prgr_data(NULL); + tpd->is_delaying = 1; + } +#endif + return (ErtsThrPrgrDelayHandle) umrefc_ix; +} + static ERTS_INLINE int has_reached_wakeup(ErtsThrPrgrVal wakeup) { @@ -931,7 +1096,7 @@ request_wakeup_managed(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value) */ ASSERT(tpd->is_managed); - ASSERT(tpd->previous.local != ERTS_THR_PRGR_VAL_WAITING); + ASSERT(tpd->confirmed != ERTS_THR_PRGR_VAL_WAITING); if (has_reached_wakeup(value)) { wakeup_managed(tpd->id); @@ -946,7 +1111,7 @@ request_wakeup_managed(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value) tpd->wakeup_request[wix])); - if (tpd->previous.local == value) { + if (tpd->confirmed == value) { /* * We have already confirmed this value. We need to request * wakeup for a value later than our latest confirmed value in diff --git a/erts/emulator/beam/erl_thr_progress.h b/erts/emulator/beam/erl_thr_progress.h index e72321cf48..0e63d41fda 100644 --- a/erts/emulator/beam/erl_thr_progress.h +++ b/erts/emulator/beam/erl_thr_progress.h @@ -53,9 +53,22 @@ typedef Uint64 ErtsThrPrgrVal; #define ERTS_THR_PRGR_WAKEUP_DATA_SIZE 4 /* Need to be an even power of 2. */ typedef struct { + ErtsThrPrgrVal next; + ErtsThrPrgrVal current; + int chk_next_ix; + struct { + int current; + int waiting; + } umrefc_ix; +} ErtsThrPrgrLeaderState; + +typedef struct { int id; int is_managed; int is_blocking; +#ifdef ERTS_ENABLE_LOCK_CHECK + int is_delaying; /* managed is always delaying */ +#endif int is_temporary; /* --- Part below only for registered threads --- */ @@ -66,11 +79,8 @@ typedef struct { int leader; /* Needs to be first in the managed threads part */ int active; - struct { - ErtsThrPrgrVal local; - ErtsThrPrgrVal next; - ErtsThrPrgrVal current; - } previous; + ErtsThrPrgrVal confirmed; + ErtsThrPrgrLeaderState leader_state; } ErtsThrPrgrData; void erts_thr_progress_fatal_error_block(SWord timeout, @@ -121,6 +131,10 @@ typedef struct { ERTS_THR_PRGR_ATOMIC current; } ErtsThrPrgr; +typedef int ErtsThrPrgrDelayHandle; +#define ERTS_THR_PRGR_DHANDLE_MANAGED ((ErtsThrPrgrDelayHandle) -1) +/* ERTS_THR_PRGR_DHANDLE_MANAGED implies managed thread */ + extern ErtsThrPrgr erts_thr_prgr__; void erts_thr_progress_pre_init(void); @@ -136,6 +150,8 @@ int erts_thr_progress_update(ErtsSchedulerData *esdp); int erts_thr_progress_leader_update(ErtsSchedulerData *esdp); void erts_thr_progress_prepare_wait(ErtsSchedulerData *esdp); void erts_thr_progress_finalize_wait(ErtsSchedulerData *esdp); +ErtsThrPrgrDelayHandle erts_thr_progress_unmanaged_delay__(void); +void erts_thr_progress_unmanaged_continue__(int umrefc_ix); void erts_thr_progress_dbg_print_state(void); @@ -148,6 +164,11 @@ ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_prgr_read_acqb__(ERTS_THR_PRGR_ATOMIC *a ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_prgr_read_mb__(ERTS_THR_PRGR_ATOMIC *atmc); ERTS_GLB_INLINE int erts_thr_progress_is_managed_thread(void); +ERTS_GLB_INLINE ErtsThrPrgrDelayHandle erts_thr_progress_unmanaged_delay(void); +ERTS_GLB_INLINE void erts_thr_progress_unmanaged_continue(ErtsThrPrgrDelayHandle handle); +#ifdef ERTS_ENABLE_LOCK_CHECK +ERTS_GLB_INLINE int erts_thr_progress_lc_is_delaying(void); +#endif ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_progress_current_to_later__(ErtsThrPrgrVal val); ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_progress_later(ErtsSchedulerData *); ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_progress_current(void); @@ -229,6 +250,35 @@ erts_thr_progress_is_managed_thread(void) return tpd && tpd->is_managed; } +ERTS_GLB_INLINE ErtsThrPrgrDelayHandle +erts_thr_progress_unmanaged_delay(void) +{ + if (erts_thr_progress_is_managed_thread()) + return ERTS_THR_PRGR_DHANDLE_MANAGED; /* Nothing to do */ + else + return erts_thr_progress_unmanaged_delay__(); +} + +ERTS_GLB_INLINE void +erts_thr_progress_unmanaged_continue(ErtsThrPrgrDelayHandle handle) +{ + ASSERT(handle != ERTS_THR_PRGR_DHANDLE_MANAGED + || erts_thr_progress_is_managed_thread()); + if (handle != ERTS_THR_PRGR_DHANDLE_MANAGED) + erts_thr_progress_unmanaged_continue__(handle); +} + +#ifdef ERTS_ENABLE_LOCK_CHECK + +ERTS_GLB_INLINE int +erts_thr_progress_lc_is_delaying(void) +{ + ErtsThrPrgrData *tpd = erts_tsd_get(erts_thr_prgr_data_key__); + return tpd && tpd->is_delaying; +} + +#endif + ERTS_GLB_INLINE ErtsThrPrgrVal erts_thr_progress_current_to_later__(ErtsThrPrgrVal val) { @@ -248,7 +298,7 @@ erts_thr_progress_later(ErtsSchedulerData *esdp) if (esdp) { tpd = &esdp->thr_progress_data; managed_thread: - val = tpd->previous.local; + val = tpd->confirmed; ERTS_THR_MEMORY_BARRIER; } else { diff --git a/erts/emulator/test/driver_SUITE.erl b/erts/emulator/test/driver_SUITE.erl index 643357263c..9a66257aff 100644 --- a/erts/emulator/test/driver_SUITE.erl +++ b/erts/emulator/test/driver_SUITE.erl @@ -77,7 +77,8 @@ thread_mseg_alloc_cache_clean/1, otp_9302/1, thr_free_drv/1, - async_blast/1]). + async_blast/1, + thr_msg_blast/1]). -export([bin_prefix/2]). @@ -147,7 +148,8 @@ all() -> thread_mseg_alloc_cache_clean, otp_9302, thr_free_drv, - async_blast]. + async_blast, + thr_msg_blast]. groups() -> [{timer, [], @@ -2010,7 +2012,64 @@ async_blast(Config) when is_list(Config) -> ?line erlang:display({async_blast_time, AsyncBlastTime}), ?line ok. +thr_msg_blast_receiver(_Port, N, N) -> + ok; +thr_msg_blast_receiver(Port, N, Max) -> + receive + {Port, hi} -> + thr_msg_blast_receiver(Port, N+1, Max) + end. + +thr_msg_blast_receiver_proc(Port, Max, Parent, Done) -> + case port_control(Port, 0, "") of + "receiver" -> + spawn(fun () -> + thr_msg_blast_receiver_proc(Port, Max+1, Parent, Done) + end), + thr_msg_blast_receiver(Port, 0, Max); + "done" -> + Parent ! Done + end. +thr_msg_blast(Config) when is_list(Config) -> + case erlang:system_info(smp_support) of + false -> + {skipped, "Non-SMP emulator; nothing to test..."}; + true -> + Path = ?config(data_dir, Config), + erl_ddll:start(), + ok = load_driver(Path, thr_msg_blast_drv), + MemBefore = driver_alloc_size(), + Start = os:timestamp(), + Port = open_port({spawn, thr_msg_blast_drv}, []), + true = is_port(Port), + Done = make_ref(), + Me = self(), + spawn(fun () -> + thr_msg_blast_receiver_proc(Port, 1, Me, Done) + end), + receive + Done -> ok + end, + ok = thr_msg_blast_receiver(Port, 0, 32*10000), + port_close(Port), + End = os:timestamp(), + receive + Garbage -> + ?t:fail({received_garbage, Port, Garbage}) + after 2000 -> + ok + end, + MemAfter = driver_alloc_size(), + io:format("MemBefore=~p, MemAfter=~p~n", + [MemBefore, MemAfter]), + ThrMsgBlastTime = timer:now_diff(End,Start)/1000000, + io:format("ThrMsgBlastTime=~p~n", [ThrMsgBlastTime]), + MemBefore = MemAfter, + Res = {thr_msg_blast_time, ThrMsgBlastTime}, + erlang:display(Res), + Res + end. %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% Utilities diff --git a/erts/emulator/test/driver_SUITE_data/Makefile.src b/erts/emulator/test/driver_SUITE_data/Makefile.src index 9cc107cc66..b667dff6b6 100644 --- a/erts/emulator/test/driver_SUITE_data/Makefile.src +++ b/erts/emulator/test/driver_SUITE_data/Makefile.src @@ -14,7 +14,8 @@ MISC_DRVS = outputv_drv@dll@ \ thr_alloc_drv@dll@ \ otp_9302_drv@dll@ \ thr_free_drv@dll@ \ - async_blast_drv@dll@ + async_blast_drv@dll@ \ + thr_msg_blast_drv@dll@ SYS_INFO_DRVS = sys_info_base_drv@dll@ \ sys_info_prev_drv@dll@ \ diff --git a/erts/emulator/test/driver_SUITE_data/thr_msg_blast_drv.c b/erts/emulator/test/driver_SUITE_data/thr_msg_blast_drv.c new file mode 100644 index 0000000000..1070678d7b --- /dev/null +++ b/erts/emulator/test/driver_SUITE_data/thr_msg_blast_drv.c @@ -0,0 +1,178 @@ +/* + * %CopyrightBegin% + * + * Copyright Ericsson AB 2012. All Rights Reserved. + * + * The contents of this file are subject to the Erlang Public License, + * Version 1.1, (the "License"); you may not use this file except in + * compliance with the License. You should have received a copy of the + * Erlang Public License along with this software. If not, it can be + * retrieved online at http://www.erlang.org/. + * + * Software distributed under the License is distributed on an "AS IS" + * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See + * the License for the specific language governing rights and limitations + * under the License. + * + * %CopyrightEnd% + */ + +#include "erl_driver.h" + +#define THR_MSG_BLAST_NO_PROCS 10 +#define THR_MSG_BLAST_NO_SENDS_PER_PROC 10000 + +#define THR_MSG_BLAST_THREADS 32 + +static void stop(ErlDrvData drv_data); +static ErlDrvData start(ErlDrvPort port, + char *command); +static ErlDrvSSizeT control(ErlDrvData drv_data, + unsigned int command, + char *buf, ErlDrvSizeT len, + char **rbuf, ErlDrvSizeT rlen); + +static ErlDrvEntry thr_msg_blast_drv_entry = { + NULL /* init */, + start, + stop, + NULL /* output */, + NULL /* ready_input */, + NULL /* ready_output */, + "thr_msg_blast_drv", + NULL /* finish */, + NULL /* handle */, + control, + NULL /* timeout */, + NULL /* outputv */, + NULL /* ready_async */, + NULL /* flush */, + NULL /* call */, + NULL /* event */, + ERL_DRV_EXTENDED_MARKER, + ERL_DRV_EXTENDED_MAJOR_VERSION, + ERL_DRV_EXTENDED_MINOR_VERSION, + ERL_DRV_FLAG_USE_PORT_LOCKING, + NULL /* handle2 */, + NULL /* handle_monitor */ +}; + +typedef struct { + ErlDrvPort port; + ErlDrvTermData td_port; + ErlDrvTermData hi; + ErlDrvTid tid[THR_MSG_BLAST_THREADS]; + int no_thrs; + ErlDrvTermData proc[THR_MSG_BLAST_NO_PROCS]; + int no_procs; +} thr_msg_blast_data_t; + + +DRIVER_INIT(thr_msg_blast_drv) +{ + return &thr_msg_blast_drv_entry; +} + +static void stop(ErlDrvData drv_data) +{ + int i; + thr_msg_blast_data_t *tmbd = (thr_msg_blast_data_t *) drv_data; + for (i = 0; i < tmbd->no_thrs; i++) + erl_drv_thread_join(tmbd->tid[i], NULL); + driver_free((void *) tmbd); +} + +static ErlDrvData start(ErlDrvPort port, + char *command) +{ + thr_msg_blast_data_t *tmbd; + + tmbd = driver_alloc(sizeof(thr_msg_blast_data_t)); + if (!tmbd) + return ERL_DRV_ERROR_GENERAL; + + tmbd->port = port; + tmbd->td_port = driver_mk_port(port); + tmbd->hi = driver_mk_atom("hi"); + tmbd->no_thrs = 0; + tmbd->no_procs = 1; + tmbd->proc[0] = driver_caller(port); + + return (ErlDrvData) tmbd; +} + +static void *thread(void *); + +static ErlDrvSSizeT control(ErlDrvData drv_data, + unsigned int command, + char *buf, ErlDrvSizeT len, + char **rbuf, ErlDrvSizeT rlen) +{ + thr_msg_blast_data_t *tmbd = (thr_msg_blast_data_t *) drv_data; + char *res_str = "error"; + + if (tmbd->no_procs >= THR_MSG_BLAST_NO_PROCS) { + int i; + for (i = 0; i < tmbd->no_thrs; i++) + erl_drv_thread_join(tmbd->tid[i], NULL); + tmbd->no_thrs = 0; + res_str = "done"; + } + else { + + tmbd->proc[tmbd->no_procs++] = driver_caller(tmbd->port); + + if (tmbd->no_procs == THR_MSG_BLAST_NO_PROCS) { + for (tmbd->no_thrs = 0; + tmbd->no_thrs < THR_MSG_BLAST_THREADS; + tmbd->no_thrs++) { + int res = erl_drv_thread_create("test", + &tmbd->tid[tmbd->no_thrs], + thread, + tmbd, + NULL); + if (res != 0) { + driver_failure_posix(tmbd->port, res); + goto done; + } + } + } + + res_str = "receiver"; + } + + done: { + ErlDrvSSizeT res_len = strlen(res_str); + if (res_len > rlen) { + char *abuf = driver_alloc(sizeof(char)*res_len); + if (!abuf) + return 0; + *rbuf = abuf; + } + + memcpy((void *) *rbuf, (void *) res_str, res_len); + + return res_len; + } +} + +static void *thread(void *varg) +{ + int s, p; + thr_msg_blast_data_t *tmbd = (thr_msg_blast_data_t *) varg; + ErlDrvTermData spec[] = { + ERL_DRV_PORT, tmbd->td_port, + ERL_DRV_ATOM, tmbd->hi, + ERL_DRV_TUPLE, 2 + }; + + for (s = 0; s < THR_MSG_BLAST_NO_SENDS_PER_PROC; s++) { + for (p = 0; p < THR_MSG_BLAST_NO_PROCS; p++) { + int res = driver_send_term(tmbd->port, tmbd->proc[p], + spec, sizeof(spec)/sizeof(spec[0])); + if (p == 0 && res <= 0) + abort(); /* Could not send to creator */ + } + } + return NULL; +} |